fix(tracing): add missing spans #1161
📝 Walkthrough

Wide rollout of tracing instrumentation across services, added error checking/logging for chunk batch writes, and reorganized shutdown/concurrency handling for mempool and chunk ingress (new internal `max_concurrent_tasks` and semaphore-driven drain). Also adjusted observability/docker Elasticsearch ILM and JVM memory settings. No public API signature changes.
Sequence diagram (mermaid, partially recovered): Operator->>ChunkIngress: initiate shutdown

Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ 3 passed
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
crates/actors/src/data_sync_service.rs (1)
621-632: ⚠️ Potential issue | 🟠 Major

Remove the span guard that wraps `tokio::select!` and use function-level `#[instrument]` instead.

Lines 621–631 hold a `Span::enter()` guard across multiple `.await` points in the select loop, which violates tracing best practices and can cause incorrect span propagation and `Send`-bound friction.

Since the function already has `#[tracing::instrument(level = "trace", skip_all)]`, replace the manual span and guard with a function attribute that sets the custom name:

```diff
-    #[tracing::instrument(level = "trace", skip_all)]
+    #[tracing::instrument(level = "info", name = "data_sync_service", skip_all)]
     async fn start(mut self) -> eyre::Result<()> {
-        let service_span = tracing::info_span!("data_sync_service");
         tracing::info!("starting DataSync Service");
         loop {
-            let _guard = service_span.enter();
             tokio::select! {
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/actors/src/data_sync_service.rs` around lines 621 - 632, The code creates a tracing span via service_span = tracing::info_span!("data_sync_service") and holds a guard across the tokio::select! loop (entering the span before await points), which should be removed; delete the manual span and the _guard = service_span.enter() usage around the select, and instead annotate the async function (data_sync_service) with a function-level tracing attribute that sets the span name (e.g., #[tracing::instrument(name = "data_sync_service", level = "trace", skip_all)]); leave the rest of the loop (interval, peer_events_rx, tokio::select!) untouched so tracing uses the function-level instrumented span.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/actors/src/chunk_ingress_service/chunk_data_writer.rs`:
- Line 146: The calls that currently discard the Result from write_batch (the
lines using let _ = self.write_batch(...)) should surface failures instead of
swallowing them; update those call sites to either propagate the Result
(returning or mapping Err(QueueError::WriteFailed) up the call chain from the
caller) or at minimum inspect the Result and log errors with error! including
context (e.g., batch_size and any identifying metadata) so failures are
observable; locate the failing call sites by the pattern let _ =
self.write_batch(...) and modify them to handle the Result rather than ignoring
it.
In `@crates/actors/src/chunk_migration_service.rs`:
- Around line 502-507: The manual span guard created by service_span and its
enter() (_guard) is being held across an await in the tokio::select! loop
(symbols: service_span, _guard, tracing::info_span!("chunk_migration_service")),
which violates tracing async semantics; remove the manual span and guard and
instead annotate the function with #[tracing::instrument(name =
"chunk_migration_service", level = "trace", skip_all, err)] (or adjust the
existing #[instrument] on the enclosing function) so the span is applied per
await-aware invocation and do not call service_span.enter() around the loop or
across await points.
In `@crates/database/src/data_ledger.rs`:
- Around line 81-82: The tracing span for get_expired_slot_indexes currently
uses skip_all and declares fields(epoch_height) which creates an empty field;
update the attribute on pub fn get_expired_slot_indexes(&self, epoch_height:
u64) to explicitly record the argument value by changing the span fields to
fields(epoch_height = %epoch_height) so the epoch_height is captured in the
trace.
In `@crates/p2p/src/chain_sync.rs`:
- Around line 347-360: The running-flag reset is only done on normal completion
and will remain true if the spawned task panics; create a small RAII guard type
(e.g., WhitelistRunningGuard) whose Drop impl sets
stake_and_pledge_whitelist_running_flag.store(false, Ordering::Relaxed) and
instantiate it at the start of the async block that calls
handler.pull_and_process_stake_and_pledge_whitelist().await so the flag is
cleared even if the task panics; you can then remove or keep the explicit
store(false) call at the end, but ensure the guard variable lives for the whole
task scope so Drop runs on unwind.
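For the `chunk_data_writer.rs` finding above, here is a minimal, std-only sketch of surfacing a `write_batch` failure instead of discarding it with `let _ = ...`. `QueueError::WriteFailed` comes from the comment; `flush_batch`, the `Vec<u8>` batch element type, and passing the writer in as a closure are hypothetical simplifications, and `eprintln!` stands in for a `tracing` `error!` event.

```rust
use std::fmt;

/// Hypothetical error type standing in for the writer's `QueueError`.
#[derive(Debug, Clone, PartialEq)]
pub enum QueueError {
    WriteFailed(String),
}

impl fmt::Display for QueueError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            QueueError::WriteFailed(reason) => write!(f, "batch write failed: {}", reason),
        }
    }
}

/// Flushes one batch through `write_batch` and surfaces the outcome instead of
/// discarding it. On failure it logs with context (the batch size) and
/// propagates the error to the caller.
pub fn flush_batch<F>(batch: &[Vec<u8>], mut write_batch: F) -> Result<usize, QueueError>
where
    F: FnMut(&[Vec<u8>]) -> Result<(), QueueError>,
{
    match write_batch(batch) {
        Ok(()) => Ok(batch.len()),
        Err(e) => {
            // With `tracing`, this would typically be an `error!` event.
            eprintln!("failed to write chunk batch (batch_size = {}): {}", batch.len(), e);
            Err(e)
        }
    }
}
```

Returning the error lets the caller decide whether to retry or abort, while the log line keeps the failure observable even if a caller further up chooses to ignore it.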
ℹ️ Review info
Configuration used: Repository UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (16)
- crates/actors/src/chunk_ingress_service/chunk_data_writer.rs
- crates/actors/src/chunk_migration_service.rs
- crates/actors/src/data_sync_service.rs
- crates/actors/src/mempool_service.rs
- crates/actors/src/partition_mining_service.rs
- crates/chain/src/chain.rs
- crates/database/src/data_ledger.rs
- crates/database/src/database.rs
- crates/domain/src/snapshots/epoch_snapshot/epoch_snapshot.rs
- crates/p2p/src/chain_sync.rs
- crates/p2p/src/gossip_client.rs
- crates/p2p/src/peer_network_service.rs
- crates/vdf/src/lib.rs
- crates/vdf/src/state.rs
- crates/vdf/src/vdf.rs
- docker/observation/docker-compose.yaml
```diff
 tokio::spawn(
     async move {
         match handler.pull_and_process_stake_and_pledge_whitelist().await {
             Ok(()) => {
                 info!("Successfully updated stake and pledge whitelist",);
             }
             Err(e) => {
                 error!("Failed to update stake and pledge whitelist: {:?}", e);
             }
         }
+        stake_and_pledge_whitelist_running_flag.store(false, Ordering::Relaxed);
+    }
-    stake_and_pledge_whitelist_running_flag.store(false, Ordering::Relaxed);
-});
+    .instrument(tracing::info_span!("stake_and_pledge_whitelist_update")),
+);
```
🧩 Analysis chain

🏁 Script executed:

```shell
# Check the current code at lines 347-360 and search for panic-safe mechanisms
rg -n -C3 'spawn_stake_and_pledge_update_task|stake_and_pledge_whitelist_running_flag|store\(false, Ordering::Relaxed\)' crates/p2p/src/chain_sync.rs
echo "---"
rg -n -C2 'impl Drop|scopeguard|defer!' crates/p2p/src/chain_sync.rs
echo "---"
# Also check the exact lines 347-360
sed -n '347,360p' crates/p2p/src/chain_sync.rs
```

Repository: Irys-xyz/irys

Length of output: 3334
Protect whitelist-task running flag against panic paths.
stake_and_pledge_whitelist_running_flag is reset only on normal completion at line 357. If the spawned task panics during pull_and_process_stake_and_pledge_whitelist(), the flag is never reset, leaving it stuck at true and permanently blocking future whitelist update attempts via the guard check at lines 338–342.
Use an RAII guard with a `Drop` impl to ensure the flag is reset even on panic:
Proposed fix

```diff
 tokio::spawn(
     async move {
+        struct ResetWhitelistFlagOnDrop(Arc<AtomicBool>);
+        impl Drop for ResetWhitelistFlagOnDrop {
+            fn drop(&mut self) {
+                self.0.store(false, Ordering::Relaxed);
+            }
+        }
+
+        let _reset_flag =
+            ResetWhitelistFlagOnDrop(stake_and_pledge_whitelist_running_flag.clone());
+
         match handler.pull_and_process_stake_and_pledge_whitelist().await {
             Ok(()) => {
                 info!("Successfully updated stake and pledge whitelist",);
             }
             Err(e) => {
                 error!("Failed to update stake and pledge whitelist: {:?}", e);
             }
         }
-        stake_and_pledge_whitelist_running_flag.store(false, Ordering::Relaxed);
     }
     .instrument(tracing::info_span!("stake_and_pledge_whitelist_update")),
 );
```
^ If we don't panic when this task panics, we'll need to do something like the above.
@JesseTheRobot, you're absolutely right! Since you're opting for graceful panic handling here (rather than letting the task panic propagate), the RAII guard pattern is the correct solution.
This differs from the fire-and-forget mempool ingestion tasks (where panics are acceptable because re-ingestion happens during block validation). For the whitelist update task, the stake_and_pledge_whitelist_running_flag must be reset on panic; otherwise, the guard check at lines 338-342 will permanently block all future whitelist updates.
The suggested `ResetWhitelistFlagOnDrop` guard ensures the flag is cleared even on unwind, so it can never stay stuck at `true`. Feel free to proceed with that approach! 🐰
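A std-only sketch of the same RAII pattern, runnable outside the service: `std::thread::spawn` stands in for `tokio::spawn`, and `try_mark_running` / `spawn_update` are hypothetical stand-ins for the guard check and the whitelist update task. Under the default `panic = "unwind"` strategy, a panicking task unwinds and drops its locals, so the `Drop` impl clears the flag.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};

/// Resets the shared "task running" flag when dropped. Drop runs both on
/// normal completion and while unwinding from a panic.
pub struct ResetFlagOnDrop(pub Arc<AtomicBool>);

impl Drop for ResetFlagOnDrop {
    fn drop(&mut self) {
        self.0.store(false, Ordering::Relaxed);
    }
}

/// Hypothetical stand-in for the guard check before spawning:
/// succeeds only if no update is currently running.
pub fn try_mark_running(flag: &AtomicBool) -> bool {
    flag.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
        .is_ok()
}

/// Spawns a thread-based update task; `fail` simulates a panic inside
/// the update work.
pub fn spawn_update(flag: Arc<AtomicBool>, fail: bool) -> JoinHandle<()> {
    thread::spawn(move || {
        // Must be bound to a named variable: `let _ = ...` drops it immediately.
        let _reset_flag = ResetFlagOnDrop(flag);
        if fail {
            panic!("simulated whitelist update failure");
        }
    })
}
```

Binding the guard to a named variable such as `_reset_flag` matters: `let _ = ResetFlagOnDrop(...)` would drop it at once and clear the flag while the update is still running, allowing a second concurrent update.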
🧠 Learnings used
Learnt from: antouhou
Repo: Irys-xyz/irys PR: 1097
File: crates/p2p/src/block_pool.rs:834-867
Timestamp: 2026-02-11T21:49:10.701Z
Learning: In crates/p2p/src/block_pool.rs, the spawned background task for mempool ingestion (handle_commitment_transaction_ingress_gossip and handle_data_transaction_ingress_gossip) intentionally uses a fire-and-forget pattern without panic handling because failures at this stage don't matter—block validation will re-ingest the transactions anyway.
Learnt from: antouhou
Repo: Irys-xyz/irys PR: 1057
File: crates/p2p/src/gossip_client.rs:143-170
Timestamp: 2026-01-05T16:21:34.820Z
Learning: In crates/p2p/src/gossip_client.rs, methods like pull_block_body_from_v1_peer intentionally convert all errors (Err(_)) to Ok(GossipResponse::Accepted(None)) because these methods must never return errors by design. This protocol will be replaced with direct socket communication later, so the current approach of treating network failures uniformly as "data not found" is acceptable.
Learnt from: roberts-pumpurs
Repo: Irys-xyz/irys PR: 1118
File: crates/actors/src/mempool_service/ingress_proofs.rs:286-299
Timestamp: 2026-02-02T14:41:28.988Z
Learning: In ingress proof handling (e.g., `crates/actors/src/mempool_service/ingress_proofs.rs`), the staking checks are intentionally asymmetric: proof ingestion (`handle_ingest_ingress_proof`) accepts proofs from nodes staked in either `epoch_snapshot` or `commitment_snapshot` to minimize fork issues, but proof generation (`generate_and_store_ingress_proof`, `reanchor_and_store_ingress_proof`) only checks `epoch_snapshot` to ensure nodes generate proofs only when definitively staked in the current epoch.
Learnt from: glottologist
Repo: Irys-xyz/irys PR: 1090
File: crates/actors/src/supply_state_calculator.rs:121-130
Timestamp: 2026-01-20T13:27:23.880Z
Learning: In the supply state backfill logic (crates/actors/src/supply_state_calculator.rs), if the backfill process encounters failures such as batch count mismatches, the code should panic rather than return an error, as these represent critical invariant violations.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (4)
crates/database/src/data_ledger.rs (1)
131-140: 🧹 Nitpick | 🔵 Trivial

Consider instrumenting `expire_old_slots` with its own span.

`expire_old_slots` is the public mutation entry-point that calls the now-instrumented `get_expired_slot_indexes`, but it carries no span of its own. Any caller that invokes `expire_old_slots` directly will have an unattributed gap in the trace tree.

💡 Suggested addition

```diff
+    #[tracing::instrument(level = "debug", skip_all, fields(epoch_height = %epoch_height))]
     pub fn expire_old_slots(&mut self, epoch_height: u64) -> Vec<usize> {
```

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@crates/database/src/data_ledger.rs` around lines 131 - 140, Add a tracing span to the public mutation entry-point expire_old_slots so callers are attributed in traces; create a span (or add #[tracing::instrument] on pub fn expire_old_slots) that records epoch_height (and optionally the number of expired slots) and enter it before calling get_expired_slot_indexes and mutating self.slots, so the trace tree has no unattributed gap when this method is invoked directly.

crates/actors/src/block_validation.rs (1)
1140-1148: ⚠️ Potential issue | 🟡 Minor

Spurious blank line between `#[tracing::instrument]` and `pub fn poa_is_valid` will be removed by `cargo fmt`.

Line 1147 is an empty line sitting between the closing `), err)]` of the attribute and the `pub fn` declaration. `cargo fmt` collapses blank lines between outer attributes and their associated item, so this will be reformatted on the next `cargo fmt --all` run.

🖌️ Proposed fix

```diff
     #[tracing::instrument(level = "debug", skip_all, fields(
         block.miner_address = ?miner_address,
         poa.chunk_offset = ?poa.partition_chunk_offset,
         poa.partition_hash = ?poa.partition_hash,
         config.entropy_packing_iterations = ?config.entropy_packing_iterations,
         config.chunk_size = ?config.chunk_size
     ), err)]
-
     pub fn poa_is_valid(
```

As per coding guidelines: "Run `cargo fmt --all` before pushing or creating PRs".

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@crates/actors/src/block_validation.rs` around lines 1140 - 1148, Remove the spurious blank line between the #[tracing::instrument(..., err)] attribute and the function declaration so the attribute attaches directly to pub fn poa_is_valid; i.e., edit the block containing the tracing::instrument attribute (which references fields like miner_address, poa.partition_chunk_offset, poa.partition_hash, config.entropy_packing_iterations, config.chunk_size) to eliminate the empty line before the pub fn poa_is_valid declaration and then run cargo fmt --all to ensure formatting is stable.

crates/actors/src/data_sync_service.rs (1)
213-221: 🧹 Nitpick | 🔵 Trivial

Remove dead commented-out code.

The `else` branch of `optimize_peer_concurrency` contains only a commented-out `debug!` block. Clippy's `clippy::collapsible_else_if` / dead-code lints may not flag this, but it's noise and violates the spirit of running `cargo clippy` clean before pushing.

🧹 Proposed cleanup

```diff
-        } else {
-            // debug!(
-            //     "Not increasing concurrency for peer {} max_concurrency {} (concurrent utilization: {:.1}%, health: {:.2})",
-            //     peer_addr,
-            //     current_max,
-            //     utilization_ratio * 100.0,
-            //     health_score
-            // );
         }
```

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@crates/actors/src/data_sync_service.rs` around lines 213 - 221, The else branch inside the optimize_peer_concurrency function contains only a dead, commented-out debug! block (referencing peer_addr, current_max, utilization_ratio, health_score); remove that commented code and the empty else branch entirely (or collapse the if/else into a single if) so the function contains no noisy commented-out debug lines.

crates/actors/src/mempool_service.rs (1)
3174-3178: 🧹 Nitpick | 🔵 Trivial

`channel_name` is invisible in the generated span due to `skip_all`.

`skip_all` suppresses all arguments, so the `"mempool_shutdown"` / reorg span won't carry a `channel` field. The name is only visible inside the `tracing::debug!`/`warn!` log messages emitted within the function body, which are harder to query in a trace viewer.

✨ Optional: add a `fields` clause to retain the channel name in the span

```diff
-#[tracing::instrument(level = "trace", skip_all)]
+#[tracing::instrument(level = "trace", skip(result), fields(channel = channel_name))]
 pub fn handle_broadcast_recv<T>(
     result: Result<T, broadcast::error::RecvError>,
     channel_name: &str,
 ) -> Option<T> {
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/actors/src/mempool_service.rs` around lines 3174 - 3178, The span currently hides the channel_name because #[tracing::instrument(..., skip_all)] suppresses all args; change the attribute on handle_broadcast_recv to include the channel as a span field and only skip non-serializable args: replace skip_all with skip(result) (or skip the specific non-serializable param) and add fields(channel = %channel_name) so the span carries a channel field while still skipping the receive result in the span.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/actors/src/data_sync_service.rs`:
- Line 619: The tracing span on the data_sync_service start function is missing
error capture; update the tracing::instrument attribute on the start function
(the one annotated with name = "data_sync_service_start") to include err so
errors returned from start (eyre::Result<()>) are recorded in the span—match the
pattern used in storage_module_service_start and chunk_migration_service_start
by adding err to the attribute.
ℹ️ Review info
Configuration used: Repository UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (14)
- crates/actors/src/block_discovery.rs
- crates/actors/src/block_producer.rs
- crates/actors/src/block_tree_service.rs
- crates/actors/src/block_validation.rs
- crates/actors/src/cache_service.rs
- crates/actors/src/chunk_ingress_service.rs
- crates/actors/src/chunk_ingress_service/chunk_data_writer.rs
- crates/actors/src/chunk_migration_service.rs
- crates/actors/src/data_sync_service.rs
- crates/actors/src/mempool_service.rs
- crates/actors/src/storage_module_service.rs
- crates/actors/src/validation_service.rs
- crates/actors/src/validation_service/block_validation_task.rs
- crates/database/src/data_ledger.rs
```diff
     }

-    #[tracing::instrument(level = "trace", skip_all)]
+    #[tracing::instrument(name = "data_sync_service_start", level = "trace", skip_all)]
```
🧩 Analysis chain

🏁 Script executed:

```shell
rg -n 'instrument\(.*_service_start' --type rust -A 1
```

Repository: Irys-xyz/irys

Length of output: 1688
Add err to align error capture with other service spans in this PR.
The start function returns eyre::Result<()> and propagates errors via ? (e.g., from handle_message on Line 642). Verification shows inconsistency across the PR: storage_module_service_start and chunk_migration_service_start include err, while five other services (including this one) omit it. Adding err ensures that any error causing the service to exit is automatically recorded in the span, and aligns the observability approach across all service start spans.
Proposed change

```diff
-    #[tracing::instrument(name = "data_sync_service_start", level = "trace", skip_all)]
+    #[tracing::instrument(name = "data_sync_service_start", level = "trace", skip_all, err)]
```
@JesseTheRobot, thanks for confirming! Adding err is the right call—it will automatically emit an ERROR-level event whenever the start function returns an Err, capturing the full error context in the span without additional manual logging. This aligns the observability approach with the other instrumented service start spans in this PR and ensures any early-exit errors (from handle_message? or tick?) are recorded.
Feel free to commit this change directly, or let me know if you'd like me to help implement it!
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/actors/src/chunk_ingress_service.rs`:
- Around line 188-193: Validate and bound the configured concurrent-task value
before constructing the semaphore and setting max_concurrent_tasks: ensure the
configured max_concurrent_chunk_ingress_tasks is at least 1 and fits into u32
(or cap it to u32::MAX or return an error), compute a single bounded value
(e.g., bounded_permits) and use that for both Semaphore::new(bounded_permits)
and u32::try_from(bounded_permits) so you don't create a 0-permit semaphore or
call expect on an out-of-range conversion. Update the same pattern in the second
occurrence (the block around the other message_handler_semaphore /
max_concurrent_tasks assignment).
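A minimal sketch of the suggested bounding, assuming the configured value is a `usize` (the real field type is not shown here). `bounded_permits` is a hypothetical helper that returns one value suitable for `Semaphore::new` (which takes a `usize`) and its `u32` counterpart for `max_concurrent_tasks` (presumably a `u32` because tokio's `acquire_many` takes one).

```rust
use std::convert::TryFrom;

/// Bounds a configured concurrency limit to 1..=u32::MAX so the semaphore
/// never has zero permits and the u32 conversion cannot fail.
/// Returns `(permits, permits_u32)`.
pub fn bounded_permits(raw: usize) -> (usize, u32) {
    // On targets where u32::MAX does not fit in usize, fall back to usize::MAX.
    let max_permits = usize::try_from(u32::MAX).unwrap_or(usize::MAX);
    let permits = raw.clamp(1, max_permits);
    if permits != raw {
        // Stand-in for a `warn!` event about the adjusted config value.
        eprintln!("max concurrent task limit adjusted from {} to {}", raw, permits);
    }
    // Cannot fail: `permits <= u32::MAX` by construction of the clamp.
    let permits_u32 = u32::try_from(permits).expect("clamped into u32 range");
    (permits, permits_u32)
}
```

Computing the bounded value once and deriving both the semaphore size and the `u32` from it keeps the two in sync, which is what the comment asks for in both occurrences.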
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
crates/actors/src/mempool_service.rs (1)
3041-3041: 🧹 Nitpick | 🔵 Trivial

Inconsistent span strategy: propagate-current vs. named span.

`chunk_ingress_service.rs` wraps its spawned task with `tracing::info_span!("chunk_ingress_service")`, giving the service a stable, named root span. Here, `tracing::Span::current()` is propagated instead; if the calling context has no active named span, the service's traces will be rootless and harder to filter.

🔍 Suggested fix

```diff
-    .instrument(tracing::Span::current()),
+    .instrument(tracing::info_span!("mempool_service")),
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/actors/src/mempool_service.rs` at line 3041, The spawned task in mempool_service uses .instrument(tracing::Span::current()), which propagates the caller's span and can leave the service without a stable root span; replace this with creating and using a named span like tracing::info_span!("mempool_service") and attach it to the spawned future (e.g., create let span = tracing::info_span!("mempool_service"); tokio::spawn(my_future.instrument(span))); update the .instrument(...) call in the mempool_service spawn site to use that named span so traces are consistently rooted and filterable (mirror chunk_ingress_service's approach).
♻️ Duplicate comments (2)
crates/actors/src/chunk_ingress_service.rs (2)
158-201: Clamping + `max_concurrent_tasks` initialization correctly addresses the previous review concern.

The prior comment flagged that a 0-configured value could silently push all ingress into timeout/drop behavior and that the `u32::try_from(...)` conversion followed by `expect` could panic. Both are resolved: the lower bound is now 1, and the `u32::try_from` conversion is guaranteed to succeed after the preceding clamp.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/actors/src/chunk_ingress_service.rs` around lines 158 - 201, The clamp of mempool_config.max_concurrent_chunk_ingress_tasks to range 1..=u32::MAX and the subsequent use of u32::try_from when initializing max_concurrent_tasks resolves the prior issues; ensure the clamp logic (raw_max_concurrent.clamp(1, MAX_PERMITS)) is kept, the warning log for adjusted values remains, and the u32::try_from call on max_concurrent_chunk_ingress_tasks in ChunkIngressServiceInner initialization is preserved so it cannot panic.
307-331: ⚠️ Potential issue | 🟡 Minor

Same 10-second budget shared between `acquire_many` and drain: same concern as `mempool_service.rs`.

`acquire_many(self.inner.max_concurrent_tasks)` and the subsequent `try_recv` loop share the single `10s` timeout window (line 326). Slow in-flight handlers can exhaust the budget before any queued message is drained. Consider the same two-phase split suggested in `mempool_service.rs`.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/actors/src/chunk_ingress_service.rs` around lines 307 - 331, The current tokio::time::timeout wraps the entire process_remaining closure so acquire_many(self.inner.max_concurrent_tasks) and the drain loop share the same 10s budget; split this into two phases: first await acquire_many with its own short timeout (so we don't block waiting for in-flight handlers indefinitely), then run the try_recv drain loop inside a separate timeout (or with its own bounded wait) to ensure queued messages get time to be processed; update the logic around the process_remaining closure, acquire_many, msg_rx.try_recv, wait_with_progress and handle_message to use two distinct timeouts and preserve the existing logs/behavior on Ok/Err.
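The two-phase split described above can be sketched with std primitives only, under these assumptions: an `InFlight` counter with a `Condvar` stands in for the semaphore and `acquire_many`, a `std::sync::mpsc` receiver stands in for `msg_rx`, and the drain deadline is checked between messages rather than enforced by `tokio::time::timeout`. All names here are hypothetical.

```rust
use std::sync::mpsc::{self, Receiver, TryRecvError};
use std::sync::{Condvar, Mutex};
use std::time::{Duration, Instant};

/// Counts in-flight handlers; a stand-in for the message-handler semaphore.
pub struct InFlight {
    count: Mutex<usize>,
    idle: Condvar,
}

impl InFlight {
    pub fn new(active: usize) -> Self {
        InFlight { count: Mutex::new(active), idle: Condvar::new() }
    }

    /// Called by a handler when it finishes.
    pub fn finish_one(&self) {
        let mut count = self.count.lock().unwrap();
        *count -= 1;
        if *count == 0 {
            self.idle.notify_all();
        }
    }
}

/// Each phase is reported separately so logs can tell "in-flight handlers
/// were slow" apart from "the queue drain ran out of time".
#[derive(Debug, PartialEq)]
pub struct ShutdownReport {
    pub in_flight_settled: bool,
    pub drained: usize,
    pub drain_timed_out: bool,
}

pub fn shutdown<M, F: FnMut(M)>(
    in_flight: &InFlight,
    rx: &Receiver<M>,
    mut handle: F,
    settle_budget: Duration,
    drain_budget: Duration,
) -> ShutdownReport {
    // Phase 1 (like `acquire_many`): wait for in-flight handlers, bounded by its own budget.
    let guard = in_flight.count.lock().unwrap();
    let (guard, wait) = in_flight
        .idle
        .wait_timeout_while(guard, settle_budget, |active| *active > 0)
        .unwrap();
    drop(guard);
    let in_flight_settled = !wait.timed_out();

    // Phase 2: drain queued messages under a fresh, independent budget.
    // The deadline is checked between messages; one slow handler is not interrupted.
    let deadline = Instant::now() + drain_budget;
    let mut drained = 0;
    let mut drain_timed_out = false;
    loop {
        if Instant::now() >= deadline {
            drain_timed_out = true;
            break;
        }
        match rx.try_recv() {
            Ok(msg) => {
                handle(msg);
                drained += 1;
            }
            // Queue empty (or all senders gone): nothing left to drain.
            Err(TryRecvError::Empty) | Err(TryRecvError::Disconnected) => break,
        }
    }

    ShutdownReport { in_flight_settled, drained, drain_timed_out }
}
```

Because each phase has its own budget, a handler that never finishes costs at most `settle_budget`; the queued messages still get the full `drain_budget`.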
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/actors/src/chunk_ingress_service.rs`:
- Line 210: Replace the inline tokio::runtime::Handle::current() call passed to
.start(...) with the same explicit handle-cloning pattern used in
mempool_service: clone the existing runtime_handle into a local name (e.g.,
handle_for_inner) before spawning the task and pass that cloned handle to the
chunk ingress service's start(...) call so the code uses runtime_handle.clone()
-> handle_for_inner and .start(handle_for_inner) instead of
.start(tokio::runtime::Handle::current()).
In `@crates/actors/src/mempool_service.rs`:
- Around line 3148-3181: The shutdown drain currently uses one 10s timeout for
both acquiring all permits and draining the queue (process_remaining), which
lets slow in-flight handlers block the drain; split the phases so acquire_many
on message_handler_semaphore / self.inner.max_concurrent_tasks is awaited under
its own timeout (or allowed to complete without the 10s limit) and then run the
loop that calls self.msg_rx.try_recv() +
wait_with_progress(self.inner.handle_message(...)) under a separate
tokio::time::timeout(Duration::from_secs(10)); update logging to distinguish
failures when acquiring permits vs when draining the queue and ensure you still
call .instrument(span) and handle Err results from each phase (referencing
process_remaining, acquire_many, message_handler_semaphore,
max_concurrent_tasks, msg_rx, try_recv, wait_with_progress, and
tokio::time::timeout).
---
Outside diff comments:
In `@crates/actors/src/mempool_service.rs`:
- Line 3041: The spawned task in mempool_service uses
.instrument(tracing::Span::current()), which propagates the caller's span and
can leave the service without a stable root span; replace this with creating and
using a named span like tracing::info_span!("mempool_service") and attach it to
the spawned future (e.g., create let span =
tracing::info_span!("mempool_service");
tokio::spawn(my_future.instrument(span))); update the .instrument(...) call in
the mempool_service spawn site to use that named span so traces are consistently
rooted and filterable (mirror chunk_ingress_service's approach).
---
Duplicate comments:
In `@crates/actors/src/chunk_ingress_service.rs`:
- Around line 158-201: The clamp of
mempool_config.max_concurrent_chunk_ingress_tasks to range 1..=u32::MAX and the
subsequent use of u32::try_from when initializing max_concurrent_tasks resolves
the prior issues; ensure the clamp logic (raw_max_concurrent.clamp(1,
MAX_PERMITS)) is kept, the warning log for adjusted values remains, and the
u32::try_from call on max_concurrent_chunk_ingress_tasks in
ChunkIngressServiceInner initialization is preserved so it cannot panic.
- Around line 307-331: The current tokio::time::timeout wraps the entire
process_remaining closure so acquire_many(self.inner.max_concurrent_tasks) and
the drain loop share the same 10s budget; split this into two phases: first
await acquire_many with its own short timeout (so we don't block waiting for
in-flight handlers indefinitely), then run the try_recv drain loop inside a
separate timeout (or with its own bounded wait) to ensure queued messages get
time to be processed; update the logic around the process_remaining closure,
acquire_many, msg_rx.try_recv, wait_with_progress and handle_message to use two
distinct timeouts and preserve the existing logs/behavior on Ok/Err.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/actors/src/mempool_service.rs`:
- Line 3211: The traced span on handle_broadcast_recv at
#[tracing::instrument(level = "trace", skip_all)] is low-value noise; either
remove the attribute entirely or change it to a less-chatty level (e.g., level =
"debug") and add meaningful fields to the span if you need context (for example
request ids or peer identifiers) so the outer span provides useful data; update
the attribute on the handle_broadcast_recv function accordingly.
```rust
)
.inspect_err(|err| tracing::error!(
{
let poa_span = tracing::info_span!(
```
did the previous instrument call not work?
spurious, fixed
```rust
/// Submits the EVM payload to reth for execution layer validation.
/// This should only be called after all consensus layer validations have passed.
#[tracing::instrument(level = "trace", skip_all, err, fields(
#[tracing::instrument(level = "debug", skip_all, err, fields(
```
why is the level changing from trace to debug for just these spans?
this is the errant child, will fix
```rust
let max_concurrent_mempool_tasks = mempool_config.max_concurrent_mempool_tasks;
let raw_max_concurrent = mempool_config.max_concurrent_mempool_tasks;
const MAX_PERMITS: usize = u32::MAX as usize;
let max_concurrent_mempool_tasks = raw_max_concurrent.clamp(1, MAX_PERMITS);
```
the minimum should probably be closer to 100 than 1 - maybe 10? 20?
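A minimal sketch of the clamp with a raised floor, along the lines the reviewer suggests. `MIN_PERMITS = 10` is an assumption picked from the "10? 20?" range, and `clamp_max_concurrent` is a hypothetical helper, not code from this PR; the `u32` ceiling reflects that tokio's `Semaphore::acquire_many` takes its permit count as a `u32`.

```rust
use std::convert::TryFrom;

/// Assumed floor, picked from the reviewer's "10? 20?" suggestion.
const MIN_PERMITS: usize = 10;
/// Ceiling: the whole pool is later acquired with a `u32` count at shutdown.
const MAX_PERMITS: usize = u32::MAX as usize;

/// Clamps a configured task limit into [MIN_PERMITS, MAX_PERMITS].
/// Returns the limit as `u32` plus whether the configured value was adjusted,
/// so the caller can log a warning when it was.
fn clamp_max_concurrent(raw: usize) -> (u32, bool) {
    let clamped = raw.clamp(MIN_PERMITS, MAX_PERMITS);
    // Cannot fail after the clamp above, so bad config never causes a panic here.
    let permits = u32::try_from(clamped).expect("clamped value fits in u32");
    (permits, clamped != raw)
}
```

A floor well above 1 keeps a single slow handler from serializing the whole service; the exact value remains a tuning decision.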
```rust
// Phase 2: drain queued messages with its own budget
let drain_fut = async {
    while let Ok(traced) = self.msg_rx.try_recv() {
        let (msg, parent_span) = traced.into_parts();
        let msg_type = msg.variant_name();
        let span = tracing::info_span!(parent: &parent_span, "mempool_handle_message", msg_type = %msg_type);
        let task_info = format!("shutdown drain: {}", msg_type);
        if let Err(e) = wait_with_progress(
            self.inner.handle_message(msg),
            20,
            &task_info,
        )
        .instrument(span)
        .await
        {
            tracing::error!("Error handling message during shutdown drain: {:?}", e);
        }
    }
};
```
I would be tempted to flip these: wait until the rx queue is empty, then try to grab all the permits - this way shutdown message processing isn't one-by-one
JesseTheRobot left a comment
LGTM, just some nonblocking feedback
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
crates/actors/src/block_validation.rs (2)
1734-1741: ⚠️ Potential issue | 🟠 Major
Avoid serializing full `vdf_info` in invalid-seed diagnostics.
This currently logs and stores the entire `vdf_info` debug payload, which can be large and amplify memory/log volume on invalid-block spam. Log and return only the minimal seed fields needed for triage.
Proposed fix
```diff
-error!(
-    "Seed data is invalid. Expected: {:?}, got: {:?}",
-    expected_seed_data, vdf_info
-);
-ValidationResult::Invalid(ValidationError::SeedDataInvalid(format!(
-    "Expected: {:?}, got: {:?}",
-    expected_seed_data, vdf_info
-)))
+error!(
+    expected_next_seed = ?expected_seed_data.0,
+    expected_seed = ?expected_seed_data.1,
+    got_next_seed = ?vdf_info.next_seed,
+    got_seed = ?vdf_info.seed,
+    got_global_step_number = vdf_info.global_step_number,
+    "Seed data is invalid"
+);
+ValidationResult::Invalid(ValidationError::SeedDataInvalid(format!(
+    "seed mismatch: expected_next_seed={:?}, expected_seed={:?}, got_next_seed={:?}, got_seed={:?}, step={}",
+    expected_seed_data.0,
+    expected_seed_data.1,
+    vdf_info.next_seed,
+    vdf_info.seed,
+    vdf_info.global_step_number
+)))
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/actors/src/block_validation.rs` around lines 1734 - 1741, The code currently logs and returns the entire vdf_info via "{:?}" in the error! and ValidationError::SeedDataInvalid call; replace that with a compact diagnostic composed only of the minimal seed fields needed for triage (for example extract vdf_info.seed and vdf_info.seed_height or equivalent minimal identifiers), build a small string like "expected: X, got: seed=<...>, height=<...>" and use that string in both error! and the ValidationError::SeedDataInvalid message instead of printing the full vdf_info; update the code around expected_seed_data, vdf_info, and the ValidationError::SeedDataInvalid construction to reference those extracted fields.
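A std-only sketch of the compact diagnostic. `VdfInfo` here is a hypothetical stand-in with `u64` seeds (the real seed and header types differ), and `seed_mismatch_message` is a hypothetical helper. The point is that only the seed fields and the step number reach the log line and the error value, never the potentially large step payload.

```rust
/// Hypothetical stand-in for the real VDF info; seeds are simplified to u64.
struct VdfInfo {
    seed: u64,
    next_seed: u64,
    global_step_number: u64,
    /// Potentially large payload that must stay out of diagnostics.
    steps: Vec<[u8; 32]>,
}

/// Builds a bounded-size message from the fields needed for triage only.
/// `expected` is (expected_next_seed, expected_seed), mirroring the proposed fix.
fn seed_mismatch_message(expected: (u64, u64), got: &VdfInfo) -> String {
    format!(
        "seed mismatch: expected_next_seed={}, expected_seed={}, got_next_seed={}, got_seed={}, step={}",
        expected.0, expected.1, got.next_seed, got.seed, got.global_step_number
    )
}
```

The same fields can also go to `error!` as structured key/value pairs, as the proposed fix does, so log backends can index them individually.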
1727-1728: 🧹 Nitpick | 🔵 Trivial
Track the consensus TODO explicitly.
This TODO marks a known validation gap in a consensus-critical path. Please link an issue (or add one) so it’s not lost.
If helpful, I can draft an issue with acceptance criteria and test cases for the missing difficulty-related seed validation.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/actors/src/block_validation.rs` around lines 1727 - 1728, The TODO in block_validation.rs notes a consensus-critical missing difficulty validation but has no tracking; create a formal issue in the repo (include acceptance criteria and tests for seed/difficulty validation) and then update the TODO comment in block_validation.rs to include the issue tracker reference (e.g. "CONSENSUS-TODO: see issue `#123`" or URL) and a short actionable note pointing to the code area to be fixed; also add the issue ID to any consensus-related tracking document (e.g. issues.md or CHANGELOG) so the gap is discoverable and not lost.
crates/actors/src/chunk_ingress_service.rs (1)
314-347: ⚠️ Potential issue | 🟠 Major
Shutdown continues into drain/flush even when handler quiescence fails.
At Line 314, a timeout on `acquire_many(...)` means handlers may still be running, but the code still drains and flushes. That can race with in-flight writes and produce a non-deterministic shutdown state.
🛠️ Proposed guard: only run final drain/flush after successful permit acquisition
```diff
-let _all_permits = match tokio::time::timeout(Duration::from_secs(30), acquire_fut).await {
-    Ok(Ok(p)) => Some(p),
-    Ok(Err(_)) => {
-        error!("Semaphore closed during chunk ingress shutdown drain");
-        None
-    }
-    Err(_) => {
-        warn!("Timed out waiting for in-flight chunk ingress handlers; proceeding without full drain");
-        None
-    }
-};
+let _all_permits = match tokio::time::timeout(Duration::from_secs(30), acquire_fut).await {
+    Ok(Ok(p)) => p,
+    Ok(Err(_)) => {
+        error!("Semaphore closed during chunk ingress shutdown drain; skipping drain/flush");
+        info!("ChunkIngressService shut down");
+        return Ok(());
+    }
+    Err(_) => {
+        warn!("Timed out waiting for in-flight chunk ingress handlers; skipping drain/flush");
+        info!("ChunkIngressService shut down");
+        return Ok(());
+    }
+};
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/actors/src/chunk_ingress_service.rs` around lines 314 - 347, The shutdown currently proceeds to drain queued messages and flush the chunk writer even if acquiring the shutdown permits failed; change the logic so that the Phase 2 drain (loop using self.msg_rx.try_recv(), wait_with_progress(self.inner.handle_message(...)), and the final self.inner.chunk_data_writer.flush().await) only run when the permit acquisition succeeded (i.e., when the match produced Some(p) for the acquire_fut result). Use the existing _all_permits result (rename to all_permits or check .is_some()) to gate running the drain_fut and flush, and log/skipping the drain+flush path when permits were not acquired to avoid racing with in-flight handlers.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/actors/src/mempool_service.rs`:
- Around line 3188-3213: The shutdown currently persists the mempool regardless
of whether acquire_many succeeded, risking inconsistent snapshots if handlers
timed out; modify the logic so persist_mempool_to_disk()
(self.inner.persist_mempool_to_disk()) is only called when the acquire_many
branch succeeded (the Ok(Ok(p)) case / when _all_permits is Some), e.g., move
the tokio::time::timeout(... self.inner.persist_mempool_to_disk() ...) call into
the successful branch or add an explicit check that _all_permits.is_some()
before attempting the 10s timeout persist; keep the existing timeout and error
handling for the persist call unchanged.
---
Outside diff comments:
In `@crates/actors/src/block_validation.rs`:
- Around line 1734-1741: The code currently logs and returns the entire vdf_info
via "{:?}" in the error! and ValidationError::SeedDataInvalid call; replace that
with a compact diagnostic composed only of the minimal seed fields needed for
triage (for example extract vdf_info.seed and vdf_info.seed_height or equivalent
minimal identifiers), build a small string like "expected: X, got: seed=<...>,
height=<...>" and use that string in both error! and the
ValidationError::SeedDataInvalid message instead of printing the full vdf_info;
update the code around expected_seed_data, vdf_info, and the
ValidationError::SeedDataInvalid construction to reference those extracted
fields.
- Around line 1727-1728: The TODO in block_validation.rs notes a
consensus-critical missing difficulty validation but has no tracking; create a
formal issue in the repo (include acceptance criteria and tests for
seed/difficulty validation) and then update the TODO comment in
block_validation.rs to include the issue tracker reference (e.g.
"CONSENSUS-TODO: see issue `#123`" or URL) and a short actionable note pointing to
the code area to be fixed; also add the issue ID to any consensus-related
tracking document (e.g. issues.md or CHANGELOG) so the gap is discoverable and
not lost.
In `@crates/actors/src/chunk_ingress_service.rs`:
- Around line 314-347: The shutdown currently proceeds to drain queued messages
and flush the chunk writer even if acquiring the shutdown permits failed; change
the logic so that the Phase 2 drain (loop using self.msg_rx.try_recv(),
wait_with_progress(self.inner.handle_message(...)), and the final
self.inner.chunk_data_writer.flush().await) only run when the permit acquisition
succeeded (i.e., when the match produced Some(p) for the acquire_fut result).
Use the existing _all_permits result (rename to all_permits or check .is_some())
to gate running the drain_fut and flush, and log/skipping the drain+flush path
when permits were not acquired to avoid racing with in-flight handlers.
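The ordering these comments converge on (quiesce first, then drain under a separate budget, then flush only if quiescence succeeded) can be sketched with std primitives. This is a hypothetical, synchronous analogue rather than the service code: `InFlight` stands in for holding all of the tokio semaphore's permits, a std `mpsc::Receiver` stands in for `msg_rx`, and the `handle`/`flush` closures stand in for `handle_message` and `chunk_data_writer.flush()`. The real service would express each phase with its own `tokio::time::timeout`.

```rust
use std::sync::mpsc::Receiver;
use std::sync::{Condvar, Mutex};
use std::time::{Duration, Instant};

/// Counts in-flight handlers; a std-only stand-in for the message-handler semaphore.
struct InFlight {
    count: Mutex<usize>,
    idle: Condvar,
}

impl InFlight {
    fn new() -> Self {
        InFlight { count: Mutex::new(0), idle: Condvar::new() }
    }
    fn start(&self) {
        *self.count.lock().unwrap() += 1;
    }
    fn finish(&self) {
        let mut n = self.count.lock().unwrap();
        *n -= 1;
        if *n == 0 {
            self.idle.notify_all();
        }
    }
    /// Returns true only if every handler finished within `timeout`.
    fn wait_idle(&self, timeout: Duration) -> bool {
        let guard = self.count.lock().unwrap();
        let (_guard, res) = self
            .idle
            .wait_timeout_while(guard, timeout, |n| *n > 0)
            .unwrap();
        !res.timed_out()
    }
}

#[derive(Debug, PartialEq)]
enum Shutdown {
    /// Handlers quiesced; this many queued messages were processed, then flushed.
    Drained(usize),
    /// Handlers did not quiesce; drain and flush were skipped to avoid racing them.
    Skipped,
}

fn shutdown<M>(
    rx: &Receiver<M>,
    in_flight: &InFlight,
    quiesce_timeout: Duration,
    drain_budget: Duration,
    mut handle: impl FnMut(M),
    mut flush: impl FnMut(),
) -> Shutdown {
    // Phase 1: wait for in-flight handlers, with its own timeout.
    if !in_flight.wait_idle(quiesce_timeout) {
        return Shutdown::Skipped;
    }
    // Phase 2: drain the queue under a separate budget (checked between messages).
    let deadline = Instant::now() + drain_budget;
    let mut drained = 0;
    while Instant::now() < deadline {
        match rx.try_recv() {
            Ok(msg) => {
                handle(msg);
                drained += 1;
            }
            Err(_) => break,
        }
    }
    // Flush only after quiescence, so no handler can still be writing.
    flush();
    Shutdown::Drained(drained)
}
```

Skipping the flush after a failed quiescence trades completeness for consistency: anything still queued is abandoned, but nothing is flushed while a handler may still be writing.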
ℹ️ Review info
Configuration used: Repository UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (7)
- crates/actors/src/block_validation.rs
- crates/actors/src/chunk_ingress_service.rs
- crates/actors/src/mempool_service.rs
- crates/actors/src/validation_service/block_validation_task.rs
- docker/observation/configs/elasticsearch/index-settings.json
- docker/observation/configs/elasticsearch/index-template.json
- docker/observation/scripts/setup_elasticsearch_ilm.sh
♻️ Duplicate comments (1)
crates/actors/src/mempool_service.rs (1)
3226-3226: 🧹 Nitpick | 🔵 Trivial
Trace-level instrument on hot utility function; previously flagged as low-value.
This function is called on every reorg broadcast tick and already logs internally. The outer span with `skip_all` adds minimal observability value. Consider removing the attribute or adding meaningful fields if context is needed.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/actors/src/mempool_service.rs` at line 3226, The #[tracing::instrument(level = "trace", skip_all)] attribute on the hot utility function (the function annotated with #[tracing::instrument(level = "trace", skip_all)] that runs on every reorg broadcast tick) should be removed to avoid cheap, low-value spans; either delete the attribute entirely or replace it with a lighter-weight/meaningful span (e.g., remove skip_all and add specific fields like reorg_id or tick_count) so that only useful context is recorded and you don't instrument this hot path unnecessarily.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@crates/actors/src/mempool_service.rs`:
- Line 3226: The #[tracing::instrument(level = "trace", skip_all)] attribute on
the hot utility function (the function annotated with
#[tracing::instrument(level = "trace", skip_all)] that runs on every reorg
broadcast tick) should be removed to avoid cheap, low-value spans; either delete
the attribute entirely or replace it with a lighter-weight/meaningful span
(e.g., remove skip_all and add specific fields like reorg_id or tick_count) so
that only useful context is recorded and you don't instrument this hot path
unnecessarily.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
crates/database/src/database.rs (1)
267-271: 🧹 Nitpick | 🔵 Trivial
LGTM - consider adding span fields for better traceability.
Trace level is appropriate for high-frequency chunk caching. Optionally, capturing `data_root` in the span fields would improve trace correlation, similar to how `insert_block_header` captures `block_hash`:
💡 Optional: Add data_root field to span
```diff
-#[tracing::instrument(level = "trace", skip_all)]
+#[tracing::instrument(level = "trace", skip_all, fields(data_root = ?chunk.data_root))]
 pub fn cache_chunk_verified<T: DbTx + DbTxMut>(
     tx: &T,
     chunk: &UnpackedChunk,
 ) -> eyre::Result<IsDuplicate> {
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/database/src/database.rs` around lines 267 - 271, The tracing span for cache_chunk_verified should include the chunk's data_root for better traceability; update the #[tracing::instrument(...)] on the cache_chunk_verified function to add a span field that records the chunk's data_root (i.e., capture chunk.data_root or UnpackedChunk's data_root property) while keeping the level="trace" and skip_all behavior, mirroring how insert_block_header includes block_hash in its span fields.
♻️ Duplicate comments (2)
crates/p2p/src/chain_sync.rs (1)
357-370: ⚠️ Potential issue | 🟠 Major
Ensure whitelist running-flag is cleared even on panic.
At Line 367, the flag reset runs only on normal completion. If the spawned task panics, `is_update_whitelist_task_running` can stay `true` and permanently suppress future updates.
Proposed fix
```diff
 tokio::spawn(
     async move {
+        struct WhitelistRunningGuard(Arc<AtomicBool>);
+        impl Drop for WhitelistRunningGuard {
+            fn drop(&mut self) {
+                self.0.store(false, Ordering::Relaxed);
+            }
+        }
+
+        let _guard = WhitelistRunningGuard(stake_and_pledge_whitelist_running_flag.clone());
+
         match handler.pull_and_process_stake_and_pledge_whitelist().await {
             Ok(()) => {
                 info!("Successfully updated stake and pledge whitelist",);
             }
             Err(e) => {
                 error!("Failed to update stake and pledge whitelist: {:?}", e);
             }
         }
-        stake_and_pledge_whitelist_running_flag.store(false, Ordering::Relaxed);
     }
     .instrument(tracing::info_span!("stake_and_pledge_whitelist_update")),
 );
```
```bash
#!/bin/bash
# Verify flag reset paths and whether a Drop-based guard exists.
rg -n -C4 'spawn_stake_and_pledge_update_task|is_update_whitelist_task_running|store\(false, Ordering::Relaxed\)|impl Drop' crates/p2p/src/chain_sync.rs
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/p2p/src/chain_sync.rs` around lines 357 - 370, The spawned task must always clear stake_and_pledge_whitelist_running_flag even if it panics; wrap the task body with a panic-safe cleanup (e.g., create a small ResetFlagGuard that stores false in its Drop impl, or call std::panic::catch_unwind around the async body) and instantiate it at the start of the async closure that calls handler.pull_and_process_stake_and_pledge_whitelist().await so the flag is set to false in all exit paths; keep the tracing::info_span!("stake_and_pledge_whitelist_update") instrumentation around the spawned task.
crates/actors/src/chunk_ingress_service.rs (1)
314-326: ⚠️ Potential issue | 🟠 Major
Proceeding after failed quiescence can race with shutdown flush.
If `acquire_many(...)` times out or the semaphore is closed, shutdown still continues. That means in-flight handlers may still enqueue chunk writes while the service proceeds toward `chunk_data_writer.flush()`, which can produce incomplete final draining.
🛠️ Suggested guard (only final-drain/flush when handlers quiesce)
```diff
-let _all_permits = match tokio::time::timeout(Duration::from_secs(30), acquire_fut).await {
+let handlers_quiesced = match tokio::time::timeout(Duration::from_secs(30), acquire_fut).await {
     Ok(Ok(p)) => Some(p),
     Ok(Err(_)) => {
         error!("Semaphore closed during chunk ingress shutdown drain");
         None
     }
     Err(_) => {
         warn!(
             "Timed out waiting for in-flight chunk ingress handlers; proceeding without full drain"
         );
         None
     }
 };
+if handlers_quiesced.is_none() {
+    warn!("Skipping drain/flush because handlers did not fully quiesce");
+    info!("ChunkIngressService shut down");
+    return Ok(());
+}
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/actors/src/chunk_ingress_service.rs` around lines 314 - 326, The shutdown currently proceeds to call chunk_data_writer.flush() even if acquire_many (the acquire_fut assigned to _all_permits) timed out or the semaphore closed, which risks missing in-flight enqueues; change the shutdown logic so it only performs the final drain/flush when acquire_many succeeded (i.e., _all_permits is Some(p)). Specifically, inspect the match result from the tokio::time::timeout block (the variable created from acquire_fut / acquire_many) and if it returned Ok(Ok(p)) proceed with handing p to chunk_data_writer.flush(); otherwise log and skip or retry waiting rather than flushing immediately. Ensure references to acquire_many/acquire_fut/_all_permits and chunk_data_writer.flush() are used so the guard is clearly enforced.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/actors/src/chunk_ingress_service.rs`:
- Around line 340-344: The timeout branch of the tokio::time::timeout match
abandons remaining queued messages, causing callers to receive generic
closed-channel errors; update the Err(_) arm (the timeout path around drain_fut)
to explicitly notify queued request/response items with a typed
overload/shutdown error by reusing the existing timeout notifier helper (use the
same notifier used elsewhere in this module to signal timeouts) or by iterating
the remaining items in the chunk ingress queue and sending a proper error via
their responder channels; reference the drain_fut match and the request/response
enum variants in chunk_ingress_service.rs and ensure each pending message uses
the existing notifier API to deliver the explicit error before returning.
In `@crates/actors/src/mempool_service.rs`:
- Around line 3159-3188: The loop currently pops a message from msg_rx
(traced.into_parts()) before attempting semaphore.acquire_owned() with timeout,
so when acquire times out the popped message is dropped and remaining messages
aren't drained; fix by acquiring the permit (using
inner.message_handler_semaphore.acquire_owned() with the same timeout) before
removing a message from msg_rx, then after successfully obtaining the permit
call msg_rx.try_recv() (or try_recv in a small loop) to get the next message and
spawn the task that calls inner.handle_message wrapped by wait_with_progress;
keep the tracing span creation and use of runtime_handle.spawn, and handle Err
from acquire_owned (semaphore closed) and timeout without discarding messages so
no queued messages are lost during shutdown drain.
---
Outside diff comments:
In `@crates/database/src/database.rs`:
- Around line 267-271: The tracing span for cache_chunk_verified should include
the chunk's data_root for better traceability; update the
#[tracing::instrument(...)] on the cache_chunk_verified function to add a span
field that records the chunk's data_root (i.e., capture chunk.data_root or
UnpackedChunk's data_root property) while keeping the level="trace" and skip_all
behavior, mirroring how insert_block_header includes block_hash in its span
fields.
---
Duplicate comments:
In `@crates/actors/src/chunk_ingress_service.rs`:
- Around line 314-326: The shutdown currently proceeds to call
chunk_data_writer.flush() even if acquire_many (the acquire_fut assigned to
_all_permits) timed out or the semaphore closed, which risks missing in-flight
enqueues; change the shutdown logic so it only performs the final drain/flush
when acquire_many succeeded (i.e., _all_permits is Some(p)). Specifically,
inspect the match result from the tokio::time::timeout block (the variable
created from acquire_fut / acquire_many) and if it returned Ok(Ok(p)) proceed
with handing p to chunk_data_writer.flush(); otherwise log and skip or retry
waiting rather than flushing immediately. Ensure references to
acquire_many/acquire_fut/_all_permits and chunk_data_writer.flush() are used so
the guard is clearly enforced.
In `@crates/p2p/src/chain_sync.rs`:
- Around line 357-370: The spawned task must always clear
stake_and_pledge_whitelist_running_flag even if it panics; wrap the task body
with a panic-safe cleanup (e.g., create a small ResetFlagGuard that stores false
in its Drop impl, or call std::panic::catch_unwind around the async body) and
instantiate it at the start of the async closure that calls
handler.pull_and_process_stake_and_pledge_whitelist().await so the flag is set
to false in all exit paths; keep the
tracing::info_span!("stake_and_pledge_whitelist_update") instrumentation around
the spawned task.
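The Drop-guard approach suggested above, in a std-only form. `RunningFlagGuard` and `run_update` are hypothetical names, and `std::panic::catch_unwind` appears only in the usage check to show that the guard fires during unwinding. Inside `tokio::spawn`, a panicking task's future is dropped after the panic is caught, so the guard's `Drop` runs there as well, assuming the default `panic = "unwind"` strategy.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Clears the shared "update running" flag when dropped: on success, error, or panic.
struct RunningFlagGuard(Arc<AtomicBool>);

impl Drop for RunningFlagGuard {
    fn drop(&mut self) {
        self.0.store(false, Ordering::Relaxed);
    }
}

/// Hypothetical stand-in for the whitelist update task body.
/// The caller has already set `flag` to true before starting the task.
fn run_update(flag: Arc<AtomicBool>, fail: bool) {
    let _guard = RunningFlagGuard(flag);
    if fail {
        panic!("simulated whitelist update failure");
    }
}
```

Creating the guard as the first statement of the task body means every later exit path, including an early `return` or `?`, resets the flag.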
ℹ️ Review info
Configuration used: Repository UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (22)
- crates/actors/src/block_discovery.rs
- crates/actors/src/block_producer.rs
- crates/actors/src/block_tree_service.rs
- crates/actors/src/block_validation.rs
- crates/actors/src/cache_service.rs
- crates/actors/src/chunk_ingress_service.rs
- crates/actors/src/chunk_ingress_service/chunk_data_writer.rs
- crates/actors/src/chunk_migration_service.rs
- crates/actors/src/data_sync_service.rs
- crates/actors/src/mempool_service.rs
- crates/actors/src/partition_mining_service.rs
- crates/actors/src/storage_module_service.rs
- crates/actors/src/validation_service.rs
- crates/actors/src/validation_service/block_validation_task.rs
- crates/chain/src/chain.rs
- crates/database/src/data_ledger.rs
- crates/database/src/database.rs
- crates/domain/src/snapshots/epoch_snapshot/epoch_snapshot.rs
- crates/p2p/src/chain_sync.rs
- crates/p2p/src/gossip_client.rs
- crates/vdf/src/state.rs
- crates/vdf/src/vdf.rs
```rust
match tokio::time::timeout(Duration::from_secs(10), drain_fut).await {
    Ok(()) => tracing::debug!("Processed remaining chunk ingress messages successfully"),
    Err(_) => tracing::warn!(
        "Timeout processing remaining chunk ingress messages, continuing shutdown"
    ),
    Err(_) => {
        warn!("Timeout draining remaining chunk ingress messages, continuing shutdown")
    }
```
Timeout path drops remaining queued requests without explicit caller error.
When shutdown drain times out, remaining queued messages are abandoned. For request/response variants, callers get a generic closed-channel failure instead of a typed overload/shutdown error.
🛠️ Suggested follow-up (reuse existing timeout notifier)
```diff
 match tokio::time::timeout(Duration::from_secs(10), drain_fut).await {
     Ok(()) => tracing::debug!("Processed remaining chunk ingress messages successfully"),
     Err(_) => {
-        warn!("Timeout draining remaining chunk ingress messages, continuing shutdown")
+        warn!("Timeout draining remaining chunk ingress messages, continuing shutdown");
+        while let Ok(traced) = self.msg_rx.try_recv() {
+            let (msg, _) = traced.into_parts();
+            Self::send_timeout_errors(msg);
+        }
     }
 }
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@crates/actors/src/chunk_ingress_service.rs` around lines 340 - 344, The
timeout branch of the tokio::time::timeout match abandons remaining queued
messages, causing callers to receive generic closed-channel errors; update the
Err(_) arm (the timeout path around drain_fut) to explicitly notify queued
request/response items with a typed overload/shutdown error by reusing the
existing timeout notifier helper (use the same notifier used elsewhere in this
module to signal timeouts) or by iterating the remaining items in the chunk
ingress queue and sending a proper error via their responder channels; reference
the drain_fut match and the request/response enum variants in
chunk_ingress_service.rs and ensure each pending message uses the existing
notifier API to deliver the explicit error before returning.
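A std-only sketch of that follow-up. `Request`, `IngressError`, and `reject_remaining` are hypothetical; the review refers to an existing `send_timeout_errors` helper whose signature is not shown here, so this only illustrates the shape: once the drain budget runs out, pop each remaining message and answer its reply channel with a typed error.

```rust
use std::sync::mpsc::{Receiver, Sender};

/// Hypothetical typed error for requests that were never processed.
#[derive(Debug, Clone, PartialEq)]
enum IngressError {
    ShuttingDown,
}

/// Hypothetical request/response message: the caller waits on `reply`.
struct Request {
    reply: Sender<Result<(), IngressError>>,
}

/// Runs after the drain budget expires: every still-queued request gets an
/// explicit `ShuttingDown` error instead of a bare closed-channel failure.
/// Returns how many requests were rejected.
fn reject_remaining(rx: &Receiver<Request>) -> usize {
    let mut rejected = 0;
    while let Ok(req) = rx.try_recv() {
        // The caller may have stopped waiting; a failed send is not an error here.
        let _ = req.reply.send(Err(IngressError::ShuttingDown));
        rejected += 1;
    }
    rejected
}
```

Fire-and-forget variants without a reply channel would simply be dropped by this loop, which matches the current behaviour for them.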
```rust
// Phase 1: drain queued messages concurrently using semaphore permits
while let Ok(traced) = self.msg_rx.try_recv() {
    let (msg, parent_span) = traced.into_parts();
    let span = tracing::info_span!(parent: &parent_span, "mempool_handle_message", msg_type = %msg.variant_name());
    self.inner.handle_message(msg).instrument(span).await?;
    let msg_type = msg.variant_name();
    let span = tracing::info_span!(parent: &parent_span, "mempool_handle_message", msg_type = %msg_type);

    let inner = Arc::clone(&self.inner);
    let semaphore = inner.message_handler_semaphore.clone();
    match tokio::time::timeout(Duration::from_secs(10), semaphore.acquire_owned()).await {
        Ok(Ok(permit)) => {
            runtime_handle.spawn(async move {
                let _permit = permit;
                let task_info = format!("shutdown drain: {}", msg_type);
                if let Err(e) = wait_with_progress(
                    inner.handle_message(msg),
                    20,
                    &task_info,
                ).await {
                    tracing::error!("Error handling message during shutdown drain: {:?}", e);
                }
            }.instrument(span));
        }
        Ok(Err(_)) => {
            tracing::error!("Semaphore closed during shutdown drain");
            break;
        }
        Err(_) => {
            tracing::warn!("Timed out acquiring permit during shutdown drain, skipping remaining messages");
            break;
        }
```
Shutdown drain can silently drop queued messages on permit-acquire timeout.
In this loop, a message is popped from msg_rx before permit acquisition. If acquire_owned() times out, the code breaks, dropping that popped message and leaving the rest undrained. This can lose final mempool operations during shutdown.
🛠️ Suggested direction (avoid pop-before-permit timeout loss)
```diff
-// Phase 1: drain queued messages concurrently using semaphore permits
-while let Ok(traced) = self.msg_rx.try_recv() {
-    let (msg, parent_span) = traced.into_parts();
-    ...
-    match tokio::time::timeout(Duration::from_secs(10), semaphore.acquire_owned()).await {
-        Ok(Ok(permit)) => { runtime_handle.spawn(...); }
-        Ok(Err(_)) => { ...; break; }
-        Err(_) => { ...; break; }
-    }
-}
-
-// Phase 2: acquire all permits to wait for in-flight + drain-spawned handlers
+// Phase 1: wait for all in-flight handlers first.
 let acquire_fut = self
     .inner
     .message_handler_semaphore
     .acquire_many(self.inner.max_concurrent_tasks);
-let handlers_quiesced = match tokio::time::timeout(Duration::from_secs(30), acquire_fut).await {
+let handlers_quiesced = match tokio::time::timeout(Duration::from_secs(30), acquire_fut).await {
     ...
 };
-if handlers_quiesced {
+// Phase 2: once quiesced, drain synchronously so no popped message is dropped on permit timeouts.
+if handlers_quiesced {
+    while let Ok(traced) = self.msg_rx.try_recv() {
+        let (msg, parent_span) = traced.into_parts();
+        let msg_type = msg.variant_name();
+        let span = tracing::info_span!(parent: &parent_span, "mempool_handle_message", msg_type = %msg_type);
+        let task_info = format!("shutdown drain: {}", msg_type);
+        if let Err(e) = wait_with_progress(self.inner.handle_message(msg), 20, &task_info)
+            .instrument(span)
+            .await
+        {
+            tracing::error!("Error handling message during shutdown drain: {:?}", e);
+        }
+    }
+
     match tokio::time::timeout(
         Duration::from_secs(10),
         self.inner.persist_mempool_to_disk(),
     ).await { ... }
 }
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@crates/actors/src/mempool_service.rs` around lines 3159 - 3188, The loop
currently pops a message from msg_rx (traced.into_parts()) before attempting
semaphore.acquire_owned() with timeout, so when acquire times out the popped
message is dropped and remaining messages aren't drained; fix by acquiring the
permit (using inner.message_handler_semaphore.acquire_owned() with the same
timeout) before removing a message from msg_rx, then after successfully
obtaining the permit call msg_rx.try_recv() (or try_recv in a small loop) to get
the next message and spawn the task that calls inner.handle_message wrapped by
wait_with_progress; keep the tracing span creation and use of
runtime_handle.spawn, and handle Err from acquire_owned (semaphore closed) and
timeout without discarding messages so no queued messages are lost during
shutdown drain.
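A std-only sketch of the "acquire before pop" ordering this prompt describes. `Permits` is a hypothetical minimal counting semaphore standing in for `message_handler_semaphore`, and `drain_without_loss` is a hypothetical helper. Handling is synchronous here, whereas the service would move the permit into a spawned task and release it when that task finishes.

```rust
use std::sync::mpsc::Receiver;
use std::sync::{Condvar, Mutex};
use std::time::Duration;

/// Minimal counting semaphore (std-only stand-in for tokio's `Semaphore`).
struct Permits {
    available: Mutex<usize>,
    freed: Condvar,
}

impl Permits {
    fn new(n: usize) -> Self {
        Permits { available: Mutex::new(n), freed: Condvar::new() }
    }
    /// Takes one permit, waiting at most `timeout`; false means none became free.
    fn acquire_timeout(&self, timeout: Duration) -> bool {
        let guard = self.available.lock().unwrap();
        let (mut n, res) = self
            .freed
            .wait_timeout_while(guard, timeout, |avail| *avail == 0)
            .unwrap();
        if res.timed_out() {
            return false;
        }
        *n -= 1;
        true
    }
    fn release(&self) {
        *self.available.lock().unwrap() += 1;
        self.freed.notify_one();
    }
}

/// Drains `rx`, acquiring a permit *before* popping each message, so a permit
/// timeout leaves that message (and everything behind it) in the queue.
fn drain_without_loss<M>(
    rx: &Receiver<M>,
    permits: &Permits,
    timeout: Duration,
    mut handle: impl FnMut(M),
) -> usize {
    let mut handled = 0;
    while permits.acquire_timeout(timeout) {
        match rx.try_recv() {
            Ok(msg) => {
                handle(msg);
                handled += 1;
                permits.release();
            }
            Err(_) => {
                // Queue empty: hand the unused permit back and stop.
                permits.release();
                break;
            }
        }
    }
    handled
}
```

When no permit frees up in time, the loop exits before calling `try_recv`, so the queue is left intact for whatever fallback handles it.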
♻️ Duplicate comments (1)
crates/actors/src/mempool_service.rs (1)
3166-3197: ⚠️ Potential issue | 🟠 Major
Shutdown inline fallback bypasses semaphore bound (regression).
At Line 3184, the `NoPermits` branch processes `handle_message` inline without holding a permit, so shutdown can run above `max_concurrent_tasks` while permit-held handlers are still active. This reintroduces the same class of drain-path concurrency issue.
🛠️ Suggested fix (keep popped message, still honor semaphore)
```diff
 match inner.message_handler_semaphore.clone().try_acquire_owned() {
     Ok(permit) => {
         runtime_handle.spawn(async move {
             let _permit = permit;
             let task_info = format!("shutdown drain: {}", msg_type);
             if let Err(e) = wait_with_progress(
                 inner.handle_message(msg),
                 20,
                 &task_info,
             ).await {
                 tracing::error!("Error handling message during shutdown drain: {:?}", e);
             }
         }.instrument(span));
     }
     Err(tokio::sync::TryAcquireError::Closed) => {
         tracing::error!("Semaphore closed during shutdown drain");
         break;
     }
     Err(tokio::sync::TryAcquireError::NoPermits) => {
-        let task_info = format!("shutdown drain (inline): {}", msg_type);
-        if let Err(e) = wait_with_progress(
-            inner.handle_message(msg),
-            20,
-            &task_info,
-        )
-        .instrument(span)
-        .await
-        {
-            tracing::error!("Error handling message during shutdown drain: {:?}", e);
-        }
+        // Preserve message handling but still respect concurrency bound.
+        match inner.message_handler_semaphore.clone().acquire_owned().await {
+            Ok(permit) => {
+                let _permit = permit;
+                let task_info = format!("shutdown drain (inline): {}", msg_type);
+                if let Err(e) = wait_with_progress(
+                    inner.handle_message(msg),
+                    20,
+                    &task_info,
+                )
+                .instrument(span)
+                .await
+                {
+                    tracing::error!("Error handling message during shutdown drain: {:?}", e);
+                }
+            }
+            Err(_) => {
+                tracing::error!("Semaphore closed while waiting in shutdown drain");
+                break;
+            }
+        }
     }
 }
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/actors/src/mempool_service.rs` around lines 3166 - 3197, The shutdown inline fallback in the NoPermits branch bypasses inner.message_handler_semaphore and can exceed max_concurrent_tasks; modify the NoPermits handling so it still respects the semaphore: instead of invoking inner.handle_message(msg) directly, re-acquire a permit (e.g., await inner.message_handler_semaphore.acquire_owned().await) before calling handle_message, and then run the handler under the acquired permit (using the same wait_with_progress, task_info and span logic) so both the inline path and spawned tasks honor the semaphore bound and the popped message is still processed during shutdown.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@crates/actors/src/mempool_service.rs`:
- Around line 3166-3197: The shutdown inline fallback in the NoPermits branch
bypasses inner.message_handler_semaphore and can exceed max_concurrent_tasks;
modify the NoPermits handling so it still respects the semaphore: instead of
invoking inner.handle_message(msg) directly, re-acquire a permit (e.g., await
inner.message_handler_semaphore.acquire_owned().await) before calling
handle_message, and then run the handler under the acquired permit (using the
same wait_with_progress, task_info and span logic) so both the inline path and
spawned tasks honor the semaphore bound and the popped message is still
processed during shutdown.