Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 93 additions & 69 deletions api/service/stagedstreamsync/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ type (
config Config
logger zerolog.Logger

lock sync.Mutex
syncMutex sync.Mutex
lock sync.Mutex
}
)

Expand Down Expand Up @@ -228,91 +229,114 @@ func (d *Downloader) loop() {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()

// for shard chain and beacon chain node, first we start with initSync=true to
// make sure it goes through the long range sync first.
// for epoch chain we do only need to go through epoch sync process
// Shard chain and beacon chain nodes start with initSync=true
// to ensure they first go through long-range sync.
// Epoch chain nodes only require epoch sync.
initSync := !d.stagedSyncInstance.isEpochChain

trigger := func() {
select {
case d.downloadC <- struct{}{}:
case <-d.closeC:
case d.downloadC <- struct{}{}: // Notify to start syncing
case <-d.closeC: // Stop if downloader is closing
default:
}
}

// Start an initial sync trigger immediately
go trigger()

for {
select {
case <-ticker.C:
go trigger()
trigger()

case <-d.downloadC:
bnBeforeSync := d.bc.CurrentBlock().NumberU64()
estimatedHeight, addedBN, err := d.stagedSyncInstance.doSync(d.ctx, initSync)
if err == ErrNotEnoughStreams {
d.waitForEnoughStreams(d.config.MinStreams)
}
if err != nil {
//TODO: if there is a bad block which can't be resolved
if d.stagedSyncInstance.invalidBlock.Active {
numTriedStreams := len(d.stagedSyncInstance.invalidBlock.StreamID)
// if many streams couldn't solve it, then that's an unresolvable bad block
if numTriedStreams >= d.config.InitStreams {
if !d.stagedSyncInstance.invalidBlock.IsLogged {
d.logger.Error().
Uint64("bad block number", d.stagedSyncInstance.invalidBlock.Number).
Msg(WrapStagedSyncMsg("unresolvable bad block"))
d.stagedSyncInstance.invalidBlock.IsLogged = true
}
//TODO: if we don't have any new or untried stream in the list, sleep or panic
}
}
// If any error happens, sleep 5 seconds and retry
d.logger.Error().
Err(err).
Bool("initSync", initSync).
Msg(WrapStagedSyncMsg("sync loop failed"))
go func() {
time.Sleep(5 * time.Second)
trigger()
}()
time.Sleep(1 * time.Second)
break
}
if initSync {
d.logger.Info().Int("block added", addedBN).
Uint64("current height", d.bc.CurrentBlock().NumberU64()).
Bool("initSync", initSync).
Uint32("shard", d.bc.ShardID()).
Msg(WrapStagedSyncMsg("sync finished"))
go func() {
d.syncMutex.Lock()
defer d.syncMutex.Unlock()
d.handleDownload(&initSync, trigger)
}()

case <-d.closeC:
return
}
}
}

func (d *Downloader) handleDownload(initSync *bool, trigger func()) {
d.syncMutex.Lock()
defer d.syncMutex.Unlock()

bnBeforeSync := d.bc.CurrentBlock().NumberU64()

// Perform sync and get estimated height and blocks added
estimatedHeight, addedBN, err := d.stagedSyncInstance.doSync(d.ctx, *initSync)

switch err {
case nil:
// Log completion when finishing initial long-range sync
if *initSync {
d.logger.Info().
Int("block added", addedBN).
Uint64("current height", d.bc.CurrentBlock().NumberU64()).
Bool("initSync", *initSync).
Uint32("shard", d.bc.ShardID()).
Msg(WrapStagedSyncMsg("sync finished"))
}

// If new blocks were added, trigger another sync and process last-mile blocks
if addedBN != 0 {
trigger()
if d.bh != nil {
d.bh.insertSync()
}
// If block number has been changed, trigger another sync
if addedBN != 0 {
go trigger()
// try to add last mile from pub-sub (blocking)
if d.bh != nil {
d.bh.insertSync()
}
}

// Transition from long-range sync to short-range sync if nearing the latest height.
// This prevents staying in long-range mode when only a few blocks remain.
if *initSync && addedBN > 0 {
bnAfterSync := d.bc.CurrentBlock().NumberU64()
distanceBeforeSync := estimatedHeight - bnBeforeSync
distanceAfterSync := estimatedHeight - bnAfterSync

// Switch to short-range sync if both before and after sync distances are small.
if distanceBeforeSync <= uint64(LastMileBlocksThreshold) &&
distanceAfterSync <= uint64(LastMileBlocksThreshold) {
*initSync = false
}
// If the last sync operation only a few blocks (less than LastMileBlocksThreshold)
// and the node is now fully synced, switch to short-range syncing.
// We check distanceBeforeSync to handle cases where the previous sync covered a long distance.
// In such cases, it’s likely that new blocks were added to other nodes during the sync process,
// so the node should remain in long-range mode to catch up with those blocks.
if initSync && addedBN > 0 {
bnAfterSync := d.bc.CurrentBlock().NumberU64()
distanceBeforeSync := estimatedHeight - bnBeforeSync
distanceAfterSync := estimatedHeight - bnAfterSync
// If after completing a full sync cycle, the node is still within the last mile block range,
// switch to short-range sync.
if distanceBeforeSync <= uint64(LastMileBlocksThreshold) &&
distanceAfterSync <= uint64(LastMileBlocksThreshold) {
initSync = false
}
}

case ErrNotEnoughStreams:
// Wait for enough available streams before retrying
d.waitForEnoughStreams(d.config.MinStreams)

default:
// Handle unresolvable bad blocks
if d.stagedSyncInstance.invalidBlock.Active {
numTriedStreams := len(d.stagedSyncInstance.invalidBlock.StreamID)

// If multiple streams fail to resolve the bad block, mark it as unresolvable
if numTriedStreams >= d.config.InitStreams && !d.stagedSyncInstance.invalidBlock.IsLogged {
d.logger.Error().
Uint64("bad block number", d.stagedSyncInstance.invalidBlock.Number).
Str("bad block hash", d.stagedSyncInstance.invalidBlock.Hash.String()).
Msg(WrapStagedSyncMsg("unresolvable bad block"))
d.stagedSyncInstance.invalidBlock.IsLogged = true

// TODO: If no new untried streams exist, consider sleeping or panicking
}
case <-d.closeC:
return
}

// Log sync failure and retry after a short delay
d.logger.Error().
Err(err).
Bool("initSync", *initSync).
Msg(WrapStagedSyncMsg("sync loop failed"))

// Retry sync after 5 seconds
go func() {
time.Sleep(5 * time.Second)
trigger()
}()
}
}