6 changes: 1 addition & 5 deletions api/service/stagedstreamsync/beacon_helper.go
@@ -75,7 +75,7 @@ func (bh *beaconHelper) loop() {
 		case <-t.C:
 			bh.insertAsync()
 
-		case b, ok := <-bh.blockC:
+		case b, ok := <-bh.blockC: // for side chain, it receives last block of each epoch
 			if !ok {
 				return // blockC closed. Node exited
 			}
@@ -176,10 +176,6 @@ func (bh *beaconHelper) getNextBlock(expBN uint64) *types.Block {
 		if b.NumberU64() < expBN {
 			continue
 		}
-		if b.NumberU64() > expBN {
-			bh.lastMileCache.push(b)
-			return nil
-		}
 		return b
 	}
 	return nil
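Reviewer note: with the `> expBN` branch removed, `getNextBlock` no longer re-queues blocks that are ahead of the expected number — it skips stale blocks and returns the first block at or beyond `expBN`. That matches the new comment on `blockC`, which for a side chain carries only the last block of each epoch, so consecutive block numbers are not expected. A self-contained sketch of the resulting selection logic (the `block` stand-in type and the `getNext` helper are illustrative, not part of the PR):

```go
package sketch

// block is a minimal stand-in for types.Block; only the number is needed here.
type block struct{ number uint64 }

func (b block) NumberU64() uint64 { return b.number }

// getNext mirrors the post-change getNextBlock flow: blocks behind the
// expected number are dropped, and the first block at or beyond it is
// returned instead of being pushed back into the last-mile cache.
func getNext(cache []block, expBN uint64) (*block, []block) {
	for len(cache) > 0 {
		b := cache[0]
		cache = cache[1:]
		if b.NumberU64() < expBN {
			continue // already behind the current chain head
		}
		return &b, cache
	}
	return nil, cache
}
```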
22 changes: 11 additions & 11 deletions api/service/stagedstreamsync/downloader.go
@@ -73,7 +73,7 @@ func NewDownloader(host p2p.Host,
 		Logger()
 
 	var bh *beaconHelper
-	if config.BHConfig != nil && bc.ShardID() == shard.BeaconChainShardID {
+	if config.BHConfig != nil && !isBeaconNode && bc.ShardID() == shard.BeaconChainShardID {
 		bh = newBeaconHelper(bc, logger, config.BHConfig.BlockC, config.BHConfig.InsertHook)
 	}
 
@@ -261,7 +261,7 @@ func (d *Downloader) loop() {
 	// Shard chain and beacon chain nodes start with initSync=true
 	// to ensure they first go through long-range sync.
 	// Epoch chain nodes only require epoch sync.
-	initSync := !d.stagedSyncInstance.isEpochChain
+	d.stagedSyncInstance.initSync = !d.stagedSyncInstance.isEpochChain
 
 	trigger := func() {
 		select {
@@ -283,7 +283,7 @@ func (d *Downloader) loop() {
 			if atomic.CompareAndSwapInt32(&isDownloading, 0, 1) {
 				go func() {
 					defer atomic.StoreInt32(&isDownloading, 0)
-					d.handleDownload(&initSync, trigger)
+					d.handleDownload(trigger)
 				}()
 			}
 
@@ -293,23 +293,23 @@ func (d *Downloader) loop() {
 	}
 }
 
-func (d *Downloader) handleDownload(initSync *bool, trigger func()) {
+func (d *Downloader) handleDownload(trigger func()) {
 	d.syncMutex.Lock()
 	defer d.syncMutex.Unlock()
 
 	bnBeforeSync := d.bc.CurrentBlock().NumberU64()
 
 	// Perform sync and get estimated height and blocks added
-	estimatedHeight, addedBN, err := d.stagedSyncInstance.doSync(d.ctx, *initSync)
+	estimatedHeight, addedBN, err := d.stagedSyncInstance.doSync(d.ctx)
 
 	switch err {
 	case nil:
 		// Log completion when finishing initial long-range sync
-		if *initSync {
+		if d.stagedSyncInstance.initSync {
 			d.logger.Info().
 				Int("block added", addedBN).
 				Uint64("current height", d.bc.CurrentBlock().NumberU64()).
-				Bool("initSync", *initSync).
+				Bool("initSync", d.stagedSyncInstance.initSync).
 				Uint32("shard", d.bc.ShardID()).
 				Msg(WrapStagedSyncMsg("sync finished"))
 		}
@@ -324,23 +324,23 @@ func (d *Downloader) handleDownload(initSync *bool, trigger func()) {
 
 		// Transition from long-range sync to short-range sync if nearing the latest height.
 		// This prevents staying in long-range mode when only a few blocks remain.
-		if *initSync && addedBN > 0 {
+		if d.stagedSyncInstance.initSync && addedBN > 0 {
 			bnAfterSync := d.bc.CurrentBlock().NumberU64()
 			distanceBeforeSync := estimatedHeight - bnBeforeSync
 			distanceAfterSync := estimatedHeight - bnAfterSync
 
 			// Switch to short-range sync if both before and after sync distances are small.
 			if distanceBeforeSync <= uint64(ShortRangeThreshold) &&
 				distanceAfterSync <= uint64(ShortRangeThreshold) {
-				*initSync = false
+				d.stagedSyncInstance.initSync = false
 			}
 		}
 
 	case ErrNotEnoughStreams:
 		// Log sync failure and retry after a short delay
 		d.logger.Error().
 			Err(err).
-			Bool("initSync", *initSync).
+			Bool("initSync", d.stagedSyncInstance.initSync).
 			Msg(WrapStagedSyncMsg("sync loop failed"))
 		// Wait for enough available streams before retrying
 		d.waitForEnoughStreams(d.config.MinStreams)
@@ -370,7 +370,7 @@ func (d *Downloader) handleDownload(initSync *bool, trigger func()) {
 		// Log sync failure and retry after a short delay
 		d.logger.Error().
 			Err(err).
-			Bool("initSync", *initSync).
+			Bool("initSync", d.stagedSyncInstance.initSync).
 			Msg(WrapStagedSyncMsg("sync loop failed"))
 
 		// Retry sync after 5 seconds
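Reviewer note: moving `initSync` onto the staged sync instance means the download loop and `doSync` now share a single flag instead of threading a `*bool` through `handleDownload`. The long-range → short-range handoff itself reduces to a small predicate on the distance to the estimated tip; a self-contained sketch is below (the function name and the example threshold are illustrative, not from the PR):

```go
package sketch

// shouldLeaveLongRange mirrors the decision in handleDownload: stay in
// long-range (initSync) mode until both the pre-sync and post-sync distances
// to the estimated chain height are within the short-range threshold.
func shouldLeaveLongRange(estimatedHeight, bnBeforeSync, bnAfterSync, shortRangeThreshold uint64) bool {
	distanceBeforeSync := estimatedHeight - bnBeforeSync
	distanceAfterSync := estimatedHeight - bnAfterSync
	return distanceBeforeSync <= shortRangeThreshold &&
		distanceAfterSync <= shortRangeThreshold
}

// Example: a node 3 blocks behind before syncing and fully caught up after,
// with an assumed threshold of 32, switches to short-range mode:
// shouldLeaveLongRange(1000, 997, 1000, 32) == true
```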
4 changes: 2 additions & 2 deletions api/service/stagedstreamsync/short_range_helper.go
@@ -19,7 +19,7 @@ type srHelper struct {
 	logger zerolog.Logger
 }
 
-func (sh *srHelper) getHashChain(ctx context.Context, bns []uint64) ([]common.Hash, []sttypes.StreamID, error) {
+func (sh *srHelper) getHashChain(ctx context.Context, bns []uint64, partially bool) ([]common.Hash, []sttypes.StreamID, error) {
 	results := newBlockHashResults(bns)
 
 	concurrency := sh.config.Concurrency
@@ -34,7 +34,7 @@ func (sh *srHelper) getHashChain(ctx context.Context, bns []uint64) ([]common.Ha
 		go func(index int) {
 			defer wg.Done()
 
-			hashes, stid, err := sh.doGetBlockHashesRequest(ctx, bns, false)
+			hashes, stid, err := sh.doGetBlockHashesRequest(ctx, bns, partially)
 			if err != nil {
 				sh.logger.Warn().Err(err).Str("StreamID", string(stid)).
 					Msg(WrapStagedSyncMsg("doGetBlockHashes return error"))
20 changes: 9 additions & 11 deletions api/service/stagedstreamsync/stage_blockhashes.go
@@ -31,10 +31,9 @@ type StageBlockHashesCfg struct {
 	concurrency int
 	protocol syncProtocol
 	//bgProcRunning bool
-	isBeaconShard bool
-	cachedb kv.RwDB
-	logProgress bool
-	logger zerolog.Logger
+	cachedb kv.RwDB
+	logProgress bool
+	logger zerolog.Logger
 }
 
 func NewStageBlockHashes(cfg StageBlockHashesCfg) *StageBlockHashes {
@@ -43,14 +42,13 @@ func NewStageBlockHashes(cfg StageBlockHashesCfg) *StageBlockHashes {
 	}
 }
 
-func NewStageBlockHashesCfg(bc core.BlockChain, db kv.RwDB, concurrency int, protocol syncProtocol, isBeaconShard bool, logger zerolog.Logger, logProgress bool) StageBlockHashesCfg {
+func NewStageBlockHashesCfg(bc core.BlockChain, db kv.RwDB, concurrency int, protocol syncProtocol, logger zerolog.Logger, logProgress bool) StageBlockHashesCfg {
 	return StageBlockHashesCfg{
-		bc: bc,
-		db: db,
-		concurrency: concurrency,
-		protocol: protocol,
-		isBeaconShard: isBeaconShard,
-		logProgress: logProgress,
+		bc: bc,
+		db: db,
+		concurrency: concurrency,
+		protocol: protocol,
+		logProgress: logProgress,
 		logger: logger.With().
 			Str("stage", "StageBlockHashes").
 			Str("mode", "long range").
4 changes: 1 addition & 3 deletions api/service/stagedstreamsync/stage_bodies.go
@@ -29,7 +29,6 @@ type StageBodiesCfg struct {
 	blockDBs []kv.RwDB
 	concurrency int
 	protocol syncProtocol
-	isBeaconShard bool
 	extractReceiptHashes bool
 	logProgress bool
 	logger zerolog.Logger
@@ -46,14 +45,13 @@ func NewStageBodies(cfg StageBodiesCfg) *StageBodies {
 	}
 }
 
-func NewStageBodiesCfg(bc core.BlockChain, db kv.RwDB, blockDBs []kv.RwDB, concurrency int, protocol syncProtocol, isBeaconShard bool, extractReceiptHashes bool, logger zerolog.Logger, logProgress bool) StageBodiesCfg {
+func NewStageBodiesCfg(bc core.BlockChain, db kv.RwDB, blockDBs []kv.RwDB, concurrency int, protocol syncProtocol, extractReceiptHashes bool, logger zerolog.Logger, logProgress bool) StageBodiesCfg {
 	return StageBodiesCfg{
 		bc: bc,
 		db: db,
 		blockDBs: blockDBs,
 		concurrency: concurrency,
 		protocol: protocol,
-		isBeaconShard: isBeaconShard,
 		extractReceiptHashes: extractReceiptHashes,
 		logger: logger.With().
 			Str("stage", "StageBodies").
3 changes: 2 additions & 1 deletion api/service/stagedstreamsync/stage_epoch.go
@@ -108,11 +108,12 @@ func (sr *StageEpoch) doShortRangeSyncForEpochSync(ctx context.Context, s *Stage
 		}
 	}
 	curBN := s.state.bc.CurrentBlock().NumberU64()
+	curEpoch := s.state.bc.CurrentHeader().Epoch().Uint64()
 	bns := make([]uint64, 0, BlocksPerRequest)
 	// in epoch chain, we have only the last block of each epoch, so, the current
 	// block's epoch number shows the last epoch we have. We should start
 	// from next epoch then
-	loopEpoch := s.state.bc.CurrentHeader().Epoch().Uint64() + 1
+	loopEpoch := curEpoch + 1
 	for len(bns) < BlocksPerRequest {
 		blockNum := shard.Schedule.EpochLastBlock(loopEpoch)
 		if blockNum > curBN {
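Reviewer note: the comment above captures the epoch-chain invariant — only the last block of each epoch is kept, so the request list is built from epoch-last block numbers starting at the epoch after the current header. A self-contained sketch of that selection follows; since the loop body after the `blockNum > curBN` check is cut off in this hunk, the append/advance details here are assumed, and `epochLastBlock`/`blocksPerRequest` stand in for `shard.Schedule.EpochLastBlock` and `BlocksPerRequest`:

```go
package sketch

// nextEpochBlockNumbers collects the block numbers an epoch-chain node would
// request next: the last block of each epoch after the one it already has,
// capped at blocksPerRequest. Assumes epochLastBlock grows without bound.
func nextEpochBlockNumbers(curBN, curEpoch uint64, blocksPerRequest int, epochLastBlock func(epoch uint64) uint64) []uint64 {
	bns := make([]uint64, 0, blocksPerRequest)
	for loopEpoch := curEpoch + 1; len(bns) < blocksPerRequest; loopEpoch++ {
		blockNum := epochLastBlock(loopEpoch)
		if blockNum <= curBN {
			continue // this epoch's last block is already on the chain
		}
		bns = append(bns, blockNum)
	}
	return bns
}
```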
28 changes: 13 additions & 15 deletions api/service/stagedstreamsync/stage_receipts.go
@@ -21,14 +21,13 @@ type StageReceipts struct {
 }
 
 type StageReceiptsCfg struct {
-	bc core.BlockChain
-	db kv.RwDB
-	blockDBs []kv.RwDB
-	concurrency int
-	protocol syncProtocol
-	isBeaconShard bool
-	logProgress bool
-	logger zerolog.Logger
+	bc core.BlockChain
+	db kv.RwDB
+	blockDBs []kv.RwDB
+	concurrency int
+	protocol syncProtocol
+	logProgress bool
+	logger zerolog.Logger
 }
 
 func NewStageReceipts(cfg StageReceiptsCfg) *StageReceipts {
@@ -37,14 +36,13 @@ func NewStageReceipts(cfg StageReceiptsCfg) *StageReceipts {
 	}
 }
 
-func NewStageReceiptsCfg(bc core.BlockChain, db kv.RwDB, blockDBs []kv.RwDB, concurrency int, protocol syncProtocol, isBeaconShard bool, logger zerolog.Logger, logProgress bool) StageReceiptsCfg {
+func NewStageReceiptsCfg(bc core.BlockChain, db kv.RwDB, blockDBs []kv.RwDB, concurrency int, protocol syncProtocol, logger zerolog.Logger, logProgress bool) StageReceiptsCfg {
 	return StageReceiptsCfg{
-		bc: bc,
-		db: db,
-		blockDBs: blockDBs,
-		concurrency: concurrency,
-		protocol: protocol,
-		isBeaconShard: isBeaconShard,
+		bc: bc,
+		db: db,
+		blockDBs: blockDBs,
+		concurrency: concurrency,
+		protocol: protocol,
 		logger: logger.With().
 			Str("stage", "StageReceipts").
 			Str("mode", "long range").
2 changes: 1 addition & 1 deletion api/service/stagedstreamsync/stage_short_range.go
@@ -112,7 +112,7 @@ func (sr *StageShortRange) doShortRangeSync(ctx context.Context, s *StageState)
 	}
 	curBN := sr.configs.bc.CurrentBlock().NumberU64()
 	blkNums := sh.prepareBlockHashNumbers(curBN)
-	hashChain, whitelist, err := sh.getHashChain(ctx, blkNums)
+	hashChain, whitelist, err := sh.getHashChain(ctx, blkNums, true)
 	if err != nil {
 		if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
 			return 0, nil
99 changes: 69 additions & 30 deletions api/service/stagedstreamsync/staged_stream_sync.go
@@ -60,35 +60,33 @@ func (ib *InvalidBlock) addBadStream(bsID sttypes.StreamID) {
 }
 
 type StagedStreamSync struct {
-	bc core.BlockChain
-	consensus *consensus.Consensus
-	db kv.RwDB
-	protocol syncProtocol
-	gbm *downloadManager // initialized when finished get block number
-	isEpochChain bool
-	isBeaconValidator bool
-	isBeaconShard bool
-	isExplorer bool
-	isValidator bool
-	joinConsensus bool
-	inserted int
-	config Config
-	logger zerolog.Logger
-	status *status //TODO: merge this with currentSyncCycle
-	initSync bool // if sets to true, node start long range syncing
-	UseMemDB bool
-	revertPoint *uint64 // used to run stages
-	prevRevertPoint *uint64 // used to get value from outside of staged sync after cycle (for example to notify RPCDaemon)
-	invalidBlock InvalidBlock
-	currentStage uint
-	LogProgress bool
-	currentCycle SyncCycle // current cycle
-	stages []*Stage
-	revertOrder []*Stage
-	pruningOrder []*Stage
-	timings []Timing
-	logPrefixes []string
-
+	bc core.BlockChain
+	consensus *consensus.Consensus
+	db kv.RwDB
+	protocol syncProtocol
+	gbm *downloadManager // initialized when finished get block number
+	isEpochChain bool
+	isBeaconValidator bool
+	isBeaconShard bool
+	isExplorer bool
+	isValidator bool
+	joinConsensus bool
+	inserted int
+	config Config
+	logger zerolog.Logger
+	status *status //TODO: merge this with currentSyncCycle
+	initSync bool // if sets to true, node start long range syncing
+	UseMemDB bool
+	revertPoint *uint64 // used to run stages
+	prevRevertPoint *uint64 // used to get value from outside of staged sync after cycle (for example to notify RPCDaemon)
+	invalidBlock InvalidBlock
+	currentStage uint
+	currentCycle SyncCycle // current cycle
+	stages []*Stage
+	revertOrder []*Stage
+	pruningOrder []*Stage
+	timings []Timing
+	logPrefixes []string
 	evtDownloadFinished event.Feed // channel for each download task finished
 	evtDownloadFinishedSubscribed bool
 	evtDownloadStarted event.Feed // channel for each download has started
@@ -299,9 +297,9 @@ func New(
 	consensus *consensus.Consensus,
 	db kv.RwDB,
 	stagesList []*Stage,
-	protocol syncProtocol,
 	isEpochChain bool,
 	isBeaconShard bool,
+	protocol syncProtocol,
 	isBeaconValidator bool,
 	isExplorer bool,
 	isValidator bool,
@@ -360,6 +358,7 @@ func New(
 		joinConsensus: joinConsensus,
 		gbm: nil,
 		status: status,
+		initSync: true,
 		inserted: 0,
 		config: config,
 		logger: logger,
@@ -477,6 +476,46 @@ func (sss *StagedStreamSync) Run(ctx context.Context, db kv.RwDB, tx kv.RwTx, fi
 	return nil
 }
 
+func (sss *StagedStreamSync) addConsensusLastMile(bc core.BlockChain, cs *consensus.Consensus) ([]common.Hash, error) {
+	curNumber := bc.CurrentBlock().NumberU64()
+	var hashes []common.Hash
+
+	err := cs.GetLastMileBlockIter(curNumber+1, func(blockIter *consensus.LastMileBlockIter) error {
+		for {
+			block := blockIter.Next()
+			if block == nil {
+				break
+			}
+			_, err := bc.InsertChain(types.Blocks{block}, true)
+			switch {
+			case errors.Is(err, core.ErrKnownBlock):
+			case errors.Is(err, core.ErrNotLastBlockInEpoch):
+			case err != nil:
+				return errors.Wrap(err, "failed to InsertChain")
+			default:
+				hashes = append(hashes, block.Header().Hash())
+			}
+		}
+		return nil
+	})
+	return hashes, err
+}
+
+func (sss *StagedStreamSync) RollbackLastMileBlocks(ctx context.Context, hashes []common.Hash) error {
+	if len(hashes) == 0 {
+		return nil
+	}
+	sss.logger.Info().
+		Interface("block", sss.bc.CurrentBlock()).
+		Msg("[STAGED_STREAM_SYNC] Rolling back last mile blocks")
+	if err := sss.bc.Rollback(hashes); err != nil {
+		sss.logger.Error().Err(err).
+			Msg("[STAGED_STREAM_SYNC] failed to rollback last mile blocks")
+		return err
+	}
+	return nil
+}
+
 func (sss *StagedStreamSync) canExecute(stage *Stage) bool {
 	// check range mode
 	if stage.RangeMode != LongRangeAndShortRange {
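Reviewer note: `addConsensusLastMile` and `RollbackLastMileBlocks` are naturally used as a pair — insert whatever blocks the consensus module holds past the current head, keep the hashes of the ones actually inserted, and roll them back if the follow-up work fails. A hedged usage sketch (the wrapper function and the injected `work` step are hypothetical, not part of this PR; only the two helpers and the package types come from the diff above):

```go
// Illustrative only: pairing the two new helpers around some follow-up work.
func runWithLastMile(ctx context.Context, sss *StagedStreamSync, bc core.BlockChain, cs *consensus.Consensus, work func(context.Context) error) error {
	// Insert the consensus module's last-mile blocks and remember their hashes.
	hashes, err := sss.addConsensusLastMile(bc, cs)
	if err != nil {
		return err
	}
	if err := work(ctx); err != nil {
		// Restore the chain to its pre-last-mile head before surfacing the error.
		if rbErr := sss.RollbackLastMileBlocks(ctx, hashes); rbErr != nil {
			return rbErr
		}
		return err
	}
	return nil
}
```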