
fix(tracing): add missing spans#1161

Merged
glottologist merged 13 commits into master from jason/missing_spans
Feb 26, 2026

Conversation


@glottologist glottologist commented Feb 25, 2026

Contributor

Describe the changes
A clear and concise description of what the bug fix, feature, or improvement is.

Related Issue(s)
Please link to the issue(s) that will be closed with this PR.

Checklist

  • Tests have been added/updated for the changes.
  • Documentation has been updated for the changes (if applicable).
  • The code follows Rust's style guidelines.

Additional Context
Add any other context about the pull request here.

Summary by CodeRabbit

  • Bug Fixes

    • Shutdown/drain now surfaces and logs write errors for remaining batches and reliably awaits in‑flight work with timeouts and persistence.
  • New Features

    • Added seed-data validation in pre-validation to reject invalid blocks earlier.
  • Chores

    • Expanded tracing with explicit span names across many services for richer observability.
    • Enforced/clamped bounded concurrency for ingress and mempool shutdown/drain paths.
    • Increased service memory/JVM heap and configured Elasticsearch rollover alias and initial index naming for logs.


coderabbitai Bot commented Feb 25, 2026

Contributor
📝 Walkthrough

Walkthrough

Wide rollout of tracing instrumentation across services, added error checking/logging for chunk batch writes, and reorganized shutdown/concurrency handling for mempool and chunk ingress (new internal max_concurrent_tasks and semaphore-driven drain). Also adjusted observability/docker Elasticsearch ILM and JVM memory settings. No public API signature changes.

Changes

| Cohort | File(s) | Summary |
|---|---|---|
| Chunk Ingress & Chunk Data Writer | crates/actors/src/chunk_ingress_service.rs, crates/actors/src/chunk_ingress_service/chunk_data_writer.rs | Added pub(crate) max_concurrent_tasks to inner; clamped and wired the configured concurrency; per-message tracing spans; semaphore-gated task spawning; shutdown calls acquire_many(max_concurrent_tasks) and processes remaining messages via wait_with_progress; write_batch results are now checked and errors logged; write_batch instrumented with batch_size. |
| Mempool Service & Shutdown | crates/actors/src/mempool_service.rs | Added internal max_concurrent_tasks, clamp+warn on configured value; reorganized shutdown into phases (drain per-message with semaphore permits, wait for in-flight handlers via acquire_many with timeout, persist state with timeout) wrapped in a mempool_shutdown tracing span. |
| Validation & Block Validation Tasks | crates/actors/src/block_validation.rs, crates/actors/src/validation_service/block_validation_task.rs | Added is_seed_data_valid public helper; moved POA validation into spawn_blocking under a poa_validation span; seeds validation wrapped in a seeds_validation span; instrumentation added, existing logging preserved. |
| Actor service start instrumentation | crates/actors/src/block_discovery.rs, crates/actors/src/block_producer.rs, crates/actors/src/block_tree_service.rs, crates/actors/src/cache_service.rs, crates/actors/src/chunk_migration_service.rs, crates/actors/src/data_sync_service.rs, crates/actors/src/storage_module_service.rs, crates/actors/src/validation_service.rs | Start functions annotated with explicit tracing span names (instrument attributes updated). No behavior changes. |
| Chunk Migration / Partition Mining | crates/actors/src/chunk_migration_service.rs, crates/actors/src/partition_mining_service.rs | Added explicit instrumentation span names and wrapped startup tasks in an info_span (partition mining startup includes a storage_module_id field). |
| P2P: Chain Sync, Gossip, Peer Network | crates/p2p/src/chain_sync.rs, crates/p2p/src/gossip_client.rs, crates/p2p/src/peer_network_service.rs | Wrapped spawned tasks with per-task tracing spans (using .instrument and info_span!), added function-level instrumentation (e.g., block_hash, peer_id), and moved lifecycle flag resets inside instrumented tasks. |
| Chain & Database | crates/chain/src/chain.rs, crates/database/src/data_ledger.rs, crates/database/src/database.rs | Added tracing spans/attributes (e.g., reth_select_loop, get_expired_slot_indexes, insert_block_header, cache_chunk_verified); insert_block_header now logs an error when the POA chunk is missing. |
| Domain & VDF | crates/domain/src/snapshots/epoch_snapshot/epoch_snapshot.rs, crates/vdf/src/state.rs, crates/vdf/src/vdf.rs | Added trace-level instrumentation to process_slot_needs and calc_capacity; introduced a vdf_loop span in run_vdf. |
| Observability / Elasticsearch & Docker | docker/observation/docker-compose.yaml, docker/observation/configs/elasticsearch/index-settings.json, docker/observation/configs/elasticsearch/index-template.json, docker/observation/scripts/setup_elasticsearch_ilm.sh | Increased Tempo memory 4G→8G and ES JVM heap to 2g; added rollover alias irys-logs and alias mapping; initial ILM index creation changed to irys-logs-000001. |
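
The "clamp+warn on configured value" step mentioned for the ingress and mempool services can be illustrated with a small std-only sketch. The names `clamp_concurrency`, `effective_concurrency`, and `MAX_TASKS` are hypothetical, and the upper bound is an assumption (tokio's `Semaphore::acquire_many` takes a `u32` permit count, so a drain that acquires every permit needs the limit to fit in one); the bounds actually used in the PR are not shown on this page.

```rust
/// Hypothetical upper bound: tokio's `Semaphore::acquire_many` takes a `u32`,
/// so a drain that acquires every permit needs the count to fit in one.
const MAX_TASKS: usize = u32::MAX as usize;

/// Clamp a configured task limit into `1..=MAX_TASKS`.
/// Returns the effective limit and whether the input had to be adjusted,
/// so the caller can decide to emit a warning.
pub fn clamp_concurrency(configured: usize) -> (usize, bool) {
    let effective = configured.clamp(1, MAX_TASKS);
    (effective, effective != configured)
}

/// Illustrative caller: warn (here via stderr) when the value was clamped.
pub fn effective_concurrency(configured: usize) -> usize {
    let (effective, clamped) = clamp_concurrency(configured);
    if clamped {
        eprintln!("max_concurrent_tasks {configured} out of range, using {effective}");
    }
    effective
}
```

A configured value of 0 is the case that matters most here: a semaphore with zero permits would never let a handler start, so the sketch raises it to 1 and reports the adjustment.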

Sequence Diagram(s)

mermaid
sequenceDiagram
autonumber
actor Operator
participant ChunkIngress as ChunkIngressService
participant Semaphore as ConcurrencySemaphore
participant Worker as TaskWorker
participant Disk as Disk/PersistentStore
participant Tracing as Tracing/Logger

Operator->>ChunkIngress: initiate shutdown
ChunkIngress->>Semaphore: acquire_many(max_concurrent_tasks) (wait/timeout)
ChunkIngress->>Worker: spawn per-queued-message handler (instrumented span)
Worker->>Tracing: enter per-message span
Worker->>Disk: process/persist message (wait_with_progress)
Disk-->>Worker: result (ok / err)
Worker->>Tracing: log result or error
ChunkIngress->>Disk: persist final state (with timeout)
Disk-->>ChunkIngress: persist result
ChunkIngress->>Tracing: emit shutdown span end
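
The drain step in the diagram above (acquire every permit to know that no handler is still running) can be approximated without an async runtime. This is a minimal std-only sketch, not the PR's code: `Permits` and `run_and_drain` are hypothetical names, threads stand in for spawned tasks, and a `Mutex` plus `Condvar` stands in for the async semaphore.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::{Duration, Instant};

/// Minimal counting semaphore (std-only stand-in for an async semaphore).
pub struct Permits {
    available: Mutex<usize>,
    changed: Condvar,
}

impl Permits {
    pub fn new(n: usize) -> Self {
        Self { available: Mutex::new(n), changed: Condvar::new() }
    }

    /// Block until `n` permits are free, or give up after `timeout`.
    /// Returns true if the permits were taken.
    pub fn acquire_many(&self, n: usize, timeout: Duration) -> bool {
        let deadline = Instant::now() + timeout;
        let mut free = self.available.lock().unwrap();
        while *free < n {
            let remaining = deadline.saturating_duration_since(Instant::now());
            if remaining.is_zero() {
                return false;
            }
            free = self.changed.wait_timeout(free, remaining).unwrap().0;
        }
        *free -= n;
        true
    }

    /// Return `n` permits and wake any waiters.
    pub fn release(&self, n: usize) {
        *self.available.lock().unwrap() += n;
        self.changed.notify_all();
    }
}

/// Run `jobs` handlers with at most `max_concurrent_tasks` in flight, then
/// drain: taking every permit succeeds only once all handlers have finished.
pub fn run_and_drain(jobs: usize, max_concurrent_tasks: usize, timeout: Duration) -> bool {
    let permits = Arc::new(Permits::new(max_concurrent_tasks));
    for _ in 0..jobs {
        // Gate spawning on a permit; each handler returns it when done.
        if !permits.acquire_many(1, timeout) {
            return false;
        }
        let p = Arc::clone(&permits);
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(5)); // stand-in for real work
            p.release(1);
        });
    }
    // Shutdown phase: wait for in-flight work by acquiring all permits.
    permits.acquire_many(max_concurrent_tasks, timeout)
}
```

In this sketch a handler that panics before calling `release` would leak its permit, so the final `acquire_many` could never succeed; the timeout is what keeps shutdown from hanging in that case.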

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

Suggested reviewers

  • antouhou
  • DanMacDonald
🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The title 'fix(tracing): add missing spans' accurately summarizes the main change: adding tracing instrumentation and spans throughout the codebase. |
| Docstring Coverage | ✅ Passed | Docstring coverage is 100.00%, which is sufficient. The required threshold is 80.00%. |


@glottologist glottologist marked this pull request as ready for review February 25, 2026 07:55

@coderabbitai coderabbitai Bot left a comment

Contributor

Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
crates/actors/src/data_sync_service.rs (1)

621-632: ⚠️ Potential issue | 🟠 Major

Remove the span guard that wraps tokio::select! and use function-level #[instrument] instead.

Lines 621–631 hold a Span::enter() guard across multiple .await points in the select loop, which violates tracing best practices and can cause incorrect span propagation and Send-bound friction.

Since the function already has #[tracing::instrument(level = "trace", skip_all)], replace the manual span and guard with a function attribute that sets the custom name:

- #[tracing::instrument(level = "trace", skip_all)]
+ #[tracing::instrument(level = "info", name = "data_sync_service", skip_all)]
  async fn start(mut self) -> eyre::Result<()> {
-     let service_span = tracing::info_span!("data_sync_service");
      tracing::info!("starting DataSync Service");
      
      loop {
-         let _guard = service_span.enter();
          tokio::select! {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/actors/src/data_sync_service.rs` around lines 621 - 632, The code
creates a tracing span via service_span =
tracing::info_span!("data_sync_service") and holds a guard across the
tokio::select! loop (entering the span before await points), which should be
removed; delete the manual span and the _guard = service_span.enter() usage
around the select, and instead annotate the async function (data_sync_service)
with a function-level tracing attribute that sets the span name (e.g.,
#[tracing::instrument(name = "data_sync_service", level = "trace", skip_all)]);
leave the rest of the loop (interval, peer_events_rx, tokio::select!) untouched
so tracing uses the function-level instrumented span.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@crates/actors/src/chunk_ingress_service/chunk_data_writer.rs`:
- Line 146: The calls that currently discard the Result from write_batch (the
lines using let _ = self.write_batch(...)) should surface failures instead of
swallowing them; update those call sites to either propagate the Result
(returning or mapping Err(QueueError::WriteFailed) up the call chain from the
caller) or at minimum inspect the Result and log errors with error! including
context (e.g., batch_size and any identifying metadata) so failures are
observable; locate the failing call sites by the pattern let _ =
self.write_batch(...) and modify them to handle the Result rather than ignoring
it.

In `@crates/actors/src/chunk_migration_service.rs`:
- Around line 502-507: The manual span guard created by service_span and its
enter() (_guard) is being held across an await in the tokio::select! loop
(symbols: service_span, _guard, tracing::info_span!("chunk_migration_service")),
which violates tracing async semantics; remove the manual span and guard and
instead annotate the function with #[tracing::instrument(name =
"chunk_migration_service", level = "trace", skip_all, err)] (or adjust the
existing #[instrument] on the enclosing function) so the span is applied per
await-aware invocation and do not call service_span.enter() around the loop or
across await points.

In `@crates/database/src/data_ledger.rs`:
- Around line 81-82: The tracing span for get_expired_slot_indexes currently
uses skip_all and declares fields(epoch_height) which creates an empty field;
update the attribute on pub fn get_expired_slot_indexes(&self, epoch_height:
u64) to explicitly record the argument value by changing the span fields to
fields(epoch_height = %epoch_height) so the epoch_height is captured in the
trace.

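
The chunk_data_writer.rs item in the inline comments above offers two alternatives to `let _ = self.write_batch(...)`: propagate the `Result`, or inspect it and log with context. A minimal std-only sketch of both follows. Everything in it is illustrative: this `write_batch` is a free-function stand-in that fails on an empty batch, `QueueError` is re-declared locally after the variant named in the prompt, and `eprintln!` stands in for `error!`.

```rust
/// Hypothetical error type; the review prompt mentions `QueueError::WriteFailed`.
#[derive(Debug, PartialEq)]
pub enum QueueError {
    WriteFailed,
}

/// Stand-in for `write_batch`: fails on an empty batch purely for illustration.
pub fn write_batch(batch: &[u8]) -> Result<usize, QueueError> {
    if batch.is_empty() {
        Err(QueueError::WriteFailed)
    } else {
        Ok(batch.len())
    }
}

/// Option 1: propagate the failure to the caller with `?`.
pub fn flush_propagating(batch: &[u8]) -> Result<usize, QueueError> {
    let written = write_batch(batch)?;
    Ok(written)
}

/// Option 2: keep draining during shutdown, but log each failure with
/// context instead of discarding it via `let _ = ...`.
/// Returns the number of batches that failed.
pub fn flush_logging(batches: &[&[u8]]) -> usize {
    let mut failures = 0;
    for batch in batches {
        if let Err(e) = write_batch(batch) {
            eprintln!("write_batch failed: {e:?} (batch_size = {})", batch.len());
            failures += 1;
        }
    }
    failures
}
```

Option 2 fits a drain path where stopping at the first error would strand the remaining batches; option 1 fits call sites whose caller can act on the failure.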

ℹ️ Review info

Configuration used: Repository UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a100a92 and fb9ccfc.

📒 Files selected for processing (16)
  • crates/actors/src/chunk_ingress_service/chunk_data_writer.rs
  • crates/actors/src/chunk_migration_service.rs
  • crates/actors/src/data_sync_service.rs
  • crates/actors/src/mempool_service.rs
  • crates/actors/src/partition_mining_service.rs
  • crates/chain/src/chain.rs
  • crates/database/src/data_ledger.rs
  • crates/database/src/database.rs
  • crates/domain/src/snapshots/epoch_snapshot/epoch_snapshot.rs
  • crates/p2p/src/chain_sync.rs
  • crates/p2p/src/gossip_client.rs
  • crates/p2p/src/peer_network_service.rs
  • crates/vdf/src/lib.rs
  • crates/vdf/src/state.rs
  • crates/vdf/src/vdf.rs
  • docker/observation/docker-compose.yaml

Comment thread crates/actors/src/chunk_ingress_service/chunk_data_writer.rs
Comment thread crates/actors/src/chunk_migration_service.rs Outdated
Comment thread crates/database/src/data_ledger.rs Outdated
Comment on lines +347 to +360
        tokio::spawn(
            async move {
                match handler.pull_and_process_stake_and_pledge_whitelist().await {
                    Ok(()) => {
                        info!("Successfully updated stake and pledge whitelist",);
                    }
                    Err(e) => {
                        error!("Failed to update stake and pledge whitelist: {:?}", e);
                    }
                }
                stake_and_pledge_whitelist_running_flag.store(false, Ordering::Relaxed);
            }
            .instrument(tracing::info_span!("stake_and_pledge_whitelist_update")),
        );

@coderabbitai coderabbitai Bot Feb 25, 2026

Contributor

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# Check the current code at lines 347-360 and search for panic-safe mechanisms
rg -n -C3 'spawn_stake_and_pledge_update_task|stake_and_pledge_whitelist_running_flag|store\(false, Ordering::Relaxed\)' crates/p2p/src/chain_sync.rs
echo "---"
rg -n -C2 'impl Drop|scopeguard|defer!' crates/p2p/src/chain_sync.rs
echo "---"
# Also check the exact lines 347-360
sed -n '347,360p' crates/p2p/src/chain_sync.rs

Repository: Irys-xyz/irys

Length of output: 3334


Protect whitelist-task running flag against panic paths.

stake_and_pledge_whitelist_running_flag is reset only on normal completion at line 357. If the spawned task panics during pull_and_process_stake_and_pledge_whitelist(), the flag is never reset, leaving it stuck at true and permanently blocking future whitelist update attempts via the guard check at lines 338–342.

Use an RAII guard with Drop impl to ensure the flag is reset even on panic:

Proposed fix
         tokio::spawn(
             async move {
+                struct ResetWhitelistFlagOnDrop(Arc<AtomicBool>);
+                impl Drop for ResetWhitelistFlagOnDrop {
+                    fn drop(&mut self) {
+                        self.0.store(false, Ordering::Relaxed);
+                    }
+                }
+
+                let _reset_flag =
+                    ResetWhitelistFlagOnDrop(stake_and_pledge_whitelist_running_flag.clone());
+
                 match handler.pull_and_process_stake_and_pledge_whitelist().await {
                     Ok(()) => {
                         info!("Successfully updated stake and pledge whitelist",);
                     }
                     Err(e) => {
                         error!("Failed to update stake and pledge whitelist: {:?}", e);
                     }
                 }
-                stake_and_pledge_whitelist_running_flag.store(false, Ordering::Relaxed);
             }
             .instrument(tracing::info_span!("stake_and_pledge_whitelist_update")),
         );
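
The same drop-guard idea can be checked in isolation with the standard library only. In this sketch a thread stands in for the spawned tokio task, and `ResetFlagOnDrop` and `run_guarded` are illustrative names rather than code from the repository. It assumes the default `panic = "unwind"` profile; with `panic = "abort"` no destructors run and the guard cannot help.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

/// Resets the shared flag to `false` when dropped, including during unwinding.
struct ResetFlagOnDrop(Arc<AtomicBool>);

impl Drop for ResetFlagOnDrop {
    fn drop(&mut self) {
        self.0.store(false, Ordering::Relaxed);
    }
}

/// Runs `work` on a thread with the flag set for its duration.
/// Returns the flag's value after the thread has ended (normally or by panic).
pub fn run_guarded<F>(flag: Arc<AtomicBool>, work: F) -> bool
where
    F: FnOnce() + Send + 'static,
{
    flag.store(true, Ordering::Relaxed);
    let task_flag = Arc::clone(&flag);
    let handle = thread::spawn(move || {
        let _reset = ResetFlagOnDrop(task_flag); // lives for the whole task
        work();
    });
    let _ = handle.join(); // Err(_) here means the task panicked
    flag.load(Ordering::Relaxed)
}
```

Calling `run_guarded` with a closure that panics still returns `false`, because the guard's `Drop` runs while the thread unwinds. Binding the guard to a named variable such as `_reset` matters: a bare `_` pattern would drop it immediately and clear the flag before the work starts.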
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-        tokio::spawn(
-            async move {
-                match handler.pull_and_process_stake_and_pledge_whitelist().await {
-                    Ok(()) => {
-                        info!("Successfully updated stake and pledge whitelist",);
-                    }
-                    Err(e) => {
-                        error!("Failed to update stake and pledge whitelist: {:?}", e);
-                    }
-                }
-                stake_and_pledge_whitelist_running_flag.store(false, Ordering::Relaxed);
-            }
-            .instrument(tracing::info_span!("stake_and_pledge_whitelist_update")),
-        );
+        tokio::spawn(
+            async move {
+                struct ResetWhitelistFlagOnDrop(Arc<AtomicBool>);
+                impl Drop for ResetWhitelistFlagOnDrop {
+                    fn drop(&mut self) {
+                        self.0.store(false, Ordering::Relaxed);
+                    }
+                }
+                let _reset_flag =
+                    ResetWhitelistFlagOnDrop(stake_and_pledge_whitelist_running_flag.clone());
+                match handler.pull_and_process_stake_and_pledge_whitelist().await {
+                    Ok(()) => {
+                        info!("Successfully updated stake and pledge whitelist",);
+                    }
+                    Err(e) => {
+                        error!("Failed to update stake and pledge whitelist: {:?}", e);
+                    }
+                }
+            }
+            .instrument(tracing::info_span!("stake_and_pledge_whitelist_update")),
+        );
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/p2p/src/chain_sync.rs` around lines 347 - 360, The running-flag reset
is only done on normal completion and will remain true if the spawned task
panics; create a small RAII guard type (e.g., WhitelistRunningGuard) whose Drop
impl sets stake_and_pledge_whitelist_running_flag.store(false,
Ordering::Relaxed) and instantiate it at the start of the async block that calls
handler.pull_and_process_stake_and_pledge_whitelist().await so the flag is
cleared even if the task panics; you can then remove or keep the explicit
store(false) call at the end, but ensure the guard variable lives for the whole
task scope so Drop runs on unwind.

Member

^, if we don't panic if this task panics, we'll need to do something like the above

Contributor

@JesseTheRobot, you're absolutely right! Since you're opting for graceful panic handling here (rather than letting the task panic propagate), the RAII guard pattern is the correct solution.

This differs from the fire-and-forget mempool ingestion tasks (where panics are acceptable because re-ingestion happens during block validation). For the whitelist update task, the stake_and_pledge_whitelist_running_flag must be reset on panic; otherwise, the guard check at lines 338-342 will permanently block all future whitelist updates.

The suggested ResetWhitelistFlagOnDrop guard ensures the flag is cleared even on unwind, preventing the deadlock scenario. Feel free to proceed with that approach! 🐰


🧠 Learnings used
Learnt from: antouhou
Repo: Irys-xyz/irys PR: 1097
File: crates/p2p/src/block_pool.rs:834-867
Timestamp: 2026-02-11T21:49:10.701Z
Learning: In crates/p2p/src/block_pool.rs, the spawned background task for mempool ingestion (handle_commitment_transaction_ingress_gossip and handle_data_transaction_ingress_gossip) intentionally uses a fire-and-forget pattern without panic handling because failures at this stage don't matter—block validation will re-ingest the transactions anyway.

Learnt from: antouhou
Repo: Irys-xyz/irys PR: 1057
File: crates/p2p/src/gossip_client.rs:143-170
Timestamp: 2026-01-05T16:21:34.820Z
Learning: In crates/p2p/src/gossip_client.rs, methods like pull_block_body_from_v1_peer intentionally convert all errors (Err(_)) to Ok(GossipResponse::Accepted(None)) because these methods must never return errors by design. This protocol will be replaced with direct socket communication later, so the current approach of treating network failures uniformly as "data not found" is acceptable.

Learnt from: roberts-pumpurs
Repo: Irys-xyz/irys PR: 1118
File: crates/actors/src/mempool_service/ingress_proofs.rs:286-299
Timestamp: 2026-02-02T14:41:28.988Z
Learning: In ingress proof handling (e.g., `crates/actors/src/mempool_service/ingress_proofs.rs`), the staking checks are intentionally asymmetric: proof ingestion (`handle_ingest_ingress_proof`) accepts proofs from nodes staked in either `epoch_snapshot` or `commitment_snapshot` to minimize fork issues, but proof generation (`generate_and_store_ingress_proof`, `reanchor_and_store_ingress_proof`) only checks `epoch_snapshot` to ensure nodes generate proofs only when definitively staked in the current epoch.

Learnt from: glottologist
Repo: Irys-xyz/irys PR: 1090
File: crates/actors/src/supply_state_calculator.rs:121-130
Timestamp: 2026-01-20T13:27:23.880Z
Learning: In the supply state backfill logic (crates/actors/src/supply_state_calculator.rs), if the backfill process encounters failures such as batch count mismatches, the code should panic rather than return an error, as these represent critical invariant violations.

@coderabbitai coderabbitai Bot left a comment

Contributor

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (4)
crates/database/src/data_ledger.rs (1)

131-140: 🧹 Nitpick | 🔵 Trivial

Consider instrumenting expire_old_slots with its own span.

expire_old_slots is the public mutation entry-point that calls the now-instrumented get_expired_slot_indexes, but it carries no span of its own. Any caller that invokes expire_old_slots directly will have an unattributed gap in the trace tree.

💡 Suggested addition
+    #[tracing::instrument(level = "debug", skip_all, fields(epoch_height = %epoch_height))]
     pub fn expire_old_slots(&mut self, epoch_height: u64) -> Vec<usize> {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/database/src/data_ledger.rs` around lines 131 - 140, Add a tracing
span to the public mutation entry-point expire_old_slots so callers are
attributed in traces; create a span (or add #[tracing::instrument] on pub fn
expire_old_slots) that records epoch_height (and optionally the number of
expired slots) and enter it before calling get_expired_slot_indexes and mutating
self.slots, so the trace tree has no unattributed gap when this method is
invoked directly.
crates/actors/src/block_validation.rs (1)

1140-1148: ⚠️ Potential issue | 🟡 Minor

Spurious blank line between #[tracing::instrument] and pub fn poa_is_valid will be removed by cargo fmt.

Line 1147 is an empty line sitting between the closing ), err)] of the attribute and the pub fn declaration. cargo fmt collapses blank lines between outer attributes and their associated item, so this will be reformatted on the next cargo fmt --all run.

🖌️ Proposed fix
 #[tracing::instrument(level = "debug", skip_all, fields(
     block.miner_address = ?miner_address,
     poa.chunk_offset = ?poa.partition_chunk_offset,
     poa.partition_hash = ?poa.partition_hash,
     config.entropy_packing_iterations = ?config.entropy_packing_iterations,
     config.chunk_size = ?config.chunk_size
 ), err)]
-
 pub fn poa_is_valid(

As per coding guidelines: "Run cargo fmt --all before pushing or creating PRs".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/actors/src/block_validation.rs` around lines 1140 - 1148, Remove the
spurious blank line between the #[tracing::instrument(..., err)] attribute and
the function declaration so the attribute attaches directly to pub fn
poa_is_valid; i.e., edit the block containing the tracing::instrument attribute
(which references fields like miner_address, poa.partition_chunk_offset,
poa.partition_hash, config.entropy_packing_iterations, config.chunk_size) to
eliminate the empty line before the pub fn poa_is_valid declaration and then run
cargo fmt --all to ensure formatting is stable.
crates/actors/src/data_sync_service.rs (1)

213-221: 🧹 Nitpick | 🔵 Trivial

Remove dead commented-out code.

The else branch of optimize_peer_concurrency contains only a commented-out debug! block. Clippy's clippy::collapsible_else_if / dead-code lints may not flag this, but it's noise and violates the spirit of running cargo clippy clean before pushing.

🧹 Proposed cleanup
-            } else {
-                // debug!(
-                //     "Not increasing concurrency for peer {} max_concurrency {} (concurrent utilization: {:.1}%, health: {:.2})",
-                //     peer_addr,
-                //     current_max,
-                //     utilization_ratio * 100.0,
-                //     health_score
-                // );
-            }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/actors/src/data_sync_service.rs` around lines 213 - 221, The else
branch inside the optimize_peer_concurrency function contains only a dead,
commented-out debug! block (referencing peer_addr, current_max,
utilization_ratio, health_score); remove that commented code and the empty else
branch entirely (or collapse the if/else into a single if) so the function
contains no noisy commented-out debug lines.
crates/actors/src/mempool_service.rs (1)

3174-3178: 🧹 Nitpick | 🔵 Trivial

channel_name is invisible in the generated span due to skip_all.

skip_all suppresses all arguments, so the "mempool_shutdown" / reorg span won't carry a channel field. The name is only visible inside the tracing::debug!/warn! log messages emitted within the function body, which are harder to query in a trace viewer.

✨ Optional — add a `fields` clause to retain the channel name in the span
-#[tracing::instrument(level = "trace", skip_all)]
+#[tracing::instrument(level = "trace", skip(result), fields(channel = channel_name))]
 pub fn handle_broadcast_recv<T>(
     result: Result<T, broadcast::error::RecvError>,
     channel_name: &str,
 ) -> Option<T> {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/actors/src/mempool_service.rs` around lines 3174 - 3178, The span
currently hides the channel_name because #[tracing::instrument(..., skip_all)]
suppresses all args; change the attribute on handle_broadcast_recv to include
the channel as a span field and only skip non-serializable args: replace
skip_all with skip(result) (or skip the specific non-serializable param) and add
fields(channel = %channel_name) so the span carries a channel field while still
skipping the receive result in the span.

ℹ️ Review info

Configuration used: Repository UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between fb9ccfc and 4b1d619.

📒 Files selected for processing (14)
  • crates/actors/src/block_discovery.rs
  • crates/actors/src/block_producer.rs
  • crates/actors/src/block_tree_service.rs
  • crates/actors/src/block_validation.rs
  • crates/actors/src/cache_service.rs
  • crates/actors/src/chunk_ingress_service.rs
  • crates/actors/src/chunk_ingress_service/chunk_data_writer.rs
  • crates/actors/src/chunk_migration_service.rs
  • crates/actors/src/data_sync_service.rs
  • crates/actors/src/mempool_service.rs
  • crates/actors/src/storage_module_service.rs
  • crates/actors/src/validation_service.rs
  • crates/actors/src/validation_service/block_validation_task.rs
  • crates/database/src/data_ledger.rs

}

-    #[tracing::instrument(level = "trace", skip_all)]
+    #[tracing::instrument(name = "data_sync_service_start", level = "trace", skip_all)]

@coderabbitai coderabbitai Bot Feb 25, 2026

Contributor

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

rg -n 'instrument\(.*_service_start' --type rust -A 1

Repository: Irys-xyz/irys

Length of output: 1688


Add err to align error capture with other service spans in this PR.

The start function returns eyre::Result<()> and propagates errors via ? (e.g., from handle_message on Line 642). Verification shows inconsistency across the PR: storage_module_service_start and chunk_migration_service_start include err, while five other services (including this one) omit it. Adding err ensures that any error causing the service to exit is automatically recorded in the span, and aligns the observability approach across all service start spans.

Proposed change
-    #[tracing::instrument(name = "data_sync_service_start", level = "trace", skip_all)]
+    #[tracing::instrument(name = "data_sync_service_start", level = "trace", skip_all, err)]
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-#[tracing::instrument(name = "data_sync_service_start", level = "trace", skip_all)]
+#[tracing::instrument(name = "data_sync_service_start", level = "trace", skip_all, err)]
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/actors/src/data_sync_service.rs` at line 619, The tracing span on the
data_sync_service start function is missing error capture; update the
tracing::instrument attribute on the start function (the one annotated with name
= "data_sync_service_start") to include err so errors returned from start
(eyre::Result<()>) are recorded in the span—match the pattern used in
storage_module_service_start and chunk_migration_service_start by adding err to
the attribute.

Member

^

Contributor

@JesseTheRobot, thanks for confirming! Adding err is the right call—it will automatically emit an ERROR-level event whenever the start function returns an Err, capturing the full error context in the span without additional manual logging. This aligns the observability approach with the other instrumented service start spans in this PR and ensures any early-exit errors (from handle_message? or tick?) are recorded.

Feel free to commit this change directly, or let me know if you'd like me to help implement it!

Comment thread crates/actors/src/mempool_service.rs Outdated

@coderabbitai coderabbitai Bot left a comment

Contributor

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@crates/actors/src/chunk_ingress_service.rs`:
- Around line 188-193: Validate and bound the configured concurrent-task value
before constructing the semaphore and setting max_concurrent_tasks: ensure the
configured max_concurrent_chunk_ingress_tasks is at least 1 and fits into u32
(or cap it to u32::MAX or return an error), compute a single bounded value
(e.g., bounded_permits) and use that for both Semaphore::new(bounded_permits)
and u32::try_from(bounded_permits) so you don't create a 0-permit semaphore or
call expect on an out-of-range conversion. Update the same pattern in the second
occurrence (the block around the other message_handler_semaphore /
max_concurrent_tasks assignment).

ℹ️ Review info

📥 Commits

Reviewing files that changed from the base of the PR and between 4b1d619 and 2eac21c.

📒 Files selected for processing (2)
  • crates/actors/src/chunk_ingress_service.rs
  • crates/actors/src/mempool_service.rs

Comment thread crates/actors/src/chunk_ingress_service.rs

@coderabbitai coderabbitai Bot left a comment

Contributor

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
crates/actors/src/mempool_service.rs (1)

3041-3041: 🧹 Nitpick | 🔵 Trivial

Inconsistent span strategy: propagate-current vs. named span.

chunk_ingress_service.rs wraps its spawned task with tracing::info_span!("chunk_ingress_service"), giving the service a stable, named root span. Here, tracing::Span::current() is propagated instead — if the calling context has no active named span, the service's traces will be rootless and harder to filter.

🔍 Suggested fix
-        .instrument(tracing::Span::current()),
+        .instrument(tracing::info_span!("mempool_service")),
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/actors/src/mempool_service.rs` at line 3041, The spawned task in
mempool_service uses .instrument(tracing::Span::current()), which propagates the
caller's span and can leave the service without a stable root span; replace this
with creating and using a named span like tracing::info_span!("mempool_service")
and attach it to the spawned future (e.g., create let span =
tracing::info_span!("mempool_service");
tokio::spawn(my_future.instrument(span))); update the .instrument(...) call in
the mempool_service spawn site to use that named span so traces are consistently
rooted and filterable (mirror chunk_ingress_service's approach).
♻️ Duplicate comments (2)
crates/actors/src/chunk_ingress_service.rs (2)

158-201: Clamping + max_concurrent_tasks initialization correctly addresses the previous review concern.

The prior comment flagged that a 0-configured value could silently push all ingress into timeout/drop behavior and that u32::try_from could panic. Both are resolved: the lower bound is now 1, and u32::try_from is guaranteed safe by the preceding clamp.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/actors/src/chunk_ingress_service.rs` around lines 158 - 201, The clamp
of mempool_config.max_concurrent_chunk_ingress_tasks to range 1..=u32::MAX and
the subsequent use of u32::try_from when initializing max_concurrent_tasks
resolves the prior issues; ensure the clamp logic (raw_max_concurrent.clamp(1,
MAX_PERMITS)) is kept, the warning log for adjusted values remains, and the
u32::try_from call on max_concurrent_chunk_ingress_tasks in
ChunkIngressServiceInner initialization is preserved so it cannot panic.
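
The bounding described above can be sketched in isolation. This is a minimal illustration under stated assumptions, not the PR's code: `bounded_permits` is a hypothetical helper name, and only `usize::clamp` and `u32::try_from` from the standard library are used. The idea is that one clamped value feeds both the semaphore size and the `u32` field, so the conversion cannot fail on common 32-bit and 64-bit targets.

```rust
use std::convert::TryFrom;

/// Hypothetical helper (not from the PR): bound a configured task count to
/// the range 1..=u32::MAX and return it both as `usize` (for sizing a
/// semaphore) and as `u32` (for a later acquire-all call).
fn bounded_permits(raw_max_concurrent: usize) -> (usize, u32) {
    const MAX_PERMITS: usize = u32::MAX as usize;
    // A configured 0 becomes 1, so a 0-permit semaphore is never built.
    let bounded = raw_max_concurrent.clamp(1, MAX_PERMITS);
    // The clamp above keeps the value inside the u32 range.
    let as_u32 = u32::try_from(bounded).expect("value was clamped to the u32 range");
    (bounded, as_u32)
}
```

Calling `bounded_permits(0)` gives one permit rather than zero, and an oversized configuration is capped instead of panicking in the conversion.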

307-331: ⚠️ Potential issue | 🟡 Minor

Same 10-second budget shared between acquire_many and drain — same concern as mempool_service.rs.

acquire_many(self.inner.max_concurrent_tasks) and the subsequent try_recv loop share the single 10s timeout window (line 326). Slow in-flight handlers can exhaust the budget before any queued message is drained. Consider the same two-phase split suggested in mempool_service.rs.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/actors/src/chunk_ingress_service.rs` around lines 307 - 331, The
current tokio::time::timeout wraps the entire process_remaining closure so
acquire_many(self.inner.max_concurrent_tasks) and the drain loop share the same
10s budget; split this into two phases: first await acquire_many with its own
short timeout (so we don't block waiting for in-flight handlers indefinitely),
then run the try_recv drain loop inside a separate timeout (or with its own
bounded wait) to ensure queued messages get time to be processed; update the
logic around the process_remaining closure, acquire_many, msg_rx.try_recv,
wait_with_progress and handle_message to use two distinct timeouts and preserve
the existing logs/behavior on Ok/Err.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@crates/actors/src/chunk_ingress_service.rs`:
- Line 210: Replace the inline tokio::runtime::Handle::current() call passed to
.start(...) with the same explicit handle-cloning pattern used in
mempool_service: clone the existing runtime_handle into a local name (e.g.,
handle_for_inner) before spawning the task and pass that cloned handle to the
chunk ingress service's start(...) call so the code uses runtime_handle.clone()
-> handle_for_inner and .start(handle_for_inner) instead of
.start(tokio::runtime::Handle::current()).

In `@crates/actors/src/mempool_service.rs`:
- Around line 3148-3181: The shutdown drain currently uses one 10s timeout for
both acquiring all permits and draining the queue (process_remaining), which
lets slow in-flight handlers block the drain; split the phases so acquire_many
on message_handler_semaphore / self.inner.max_concurrent_tasks is awaited under
its own timeout (or allowed to complete without the 10s limit) and then run the
loop that calls self.msg_rx.try_recv() +
wait_with_progress(self.inner.handle_message(...)) under a separate
tokio::time::timeout(Duration::from_secs(10)); update logging to distinguish
failures when acquiring permits vs when draining the queue and ensure you still
call .instrument(span) and handle Err results from each phase (referencing
process_remaining, acquire_many, message_handler_semaphore,
max_concurrent_tasks, msg_rx, try_recv, wait_with_progress, and
tokio::time::timeout).
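
The two-phase split suggested above can be illustrated with a synchronous stand-in. This sketch makes several assumptions: it uses standard-library threads, an `AtomicUsize` in-flight counter, and `std::sync::mpsc` in place of the tokio semaphore and channel, and the names `wait_for_idle`, `drain_queue`, and `shutdown` plus the budget values are hypothetical. It only shows that each phase gets its own deadline, so a slow quiescence wait does not consume the drain budget.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::Receiver;
use std::thread;
use std::time::{Duration, Instant};

/// Phase 1: wait until no handlers are in flight, or until `budget` elapses.
/// Returns true only if the in-flight count reached zero in time.
fn wait_for_idle(in_flight: &AtomicUsize, budget: Duration) -> bool {
    let deadline = Instant::now() + budget;
    while in_flight.load(Ordering::Acquire) != 0 {
        if Instant::now() >= deadline {
            return false;
        }
        thread::sleep(Duration::from_millis(1));
    }
    true
}

/// Phase 2: drain already-queued messages under a separate budget.
/// Returns how many messages were handled.
fn drain_queue<T>(rx: &Receiver<T>, budget: Duration, mut handle: impl FnMut(T)) -> usize {
    let deadline = Instant::now() + budget;
    let mut handled = 0;
    while Instant::now() < deadline {
        match rx.try_recv() {
            Ok(msg) => {
                handle(msg);
                handled += 1;
            }
            // Queue is empty (or disconnected): nothing left to drain.
            Err(_) => break,
        }
    }
    handled
}

/// Runs both phases; the drain deadline starts only after phase 1 ends.
fn shutdown<T>(in_flight: &AtomicUsize, rx: &Receiver<T>, handle: impl FnMut(T)) -> (bool, usize) {
    let idle = wait_for_idle(in_flight, Duration::from_millis(20));
    let drained = drain_queue(rx, Duration::from_millis(200), handle);
    (idle, drained)
}
```

Whether the drain should run at all when phase 1 times out is a separate design question; this sketch drains regardless and reports the phase 1 outcome to the caller.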

---

Outside diff comments:
In `@crates/actors/src/mempool_service.rs`:
- Line 3041: The spawned task in mempool_service uses
.instrument(tracing::Span::current()), which propagates the caller's span and
can leave the service without a stable root span; replace this with creating and
using a named span like tracing::info_span!("mempool_service") and attach it to
the spawned future (e.g., create let span =
tracing::info_span!("mempool_service");
tokio::spawn(my_future.instrument(span))); update the .instrument(...) call in
the mempool_service spawn site to use that named span so traces are consistently
rooted and filterable (mirror chunk_ingress_service's approach).

---

Duplicate comments:
In `@crates/actors/src/chunk_ingress_service.rs`:
- Around line 158-201: The clamp of
mempool_config.max_concurrent_chunk_ingress_tasks to range 1..=u32::MAX and the
subsequent use of u32::try_from when initializing max_concurrent_tasks resolves
the prior issues; ensure the clamp logic (raw_max_concurrent.clamp(1,
MAX_PERMITS)) is kept, the warning log for adjusted values remains, and the
u32::try_from call on max_concurrent_chunk_ingress_tasks in
ChunkIngressServiceInner initialization is preserved so it cannot panic.
- Around line 307-331: The current tokio::time::timeout wraps the entire
process_remaining closure so acquire_many(self.inner.max_concurrent_tasks) and
the drain loop share the same 10s budget; split this into two phases: first
await acquire_many with its own short timeout (so we don't block waiting for
in-flight handlers indefinitely), then run the try_recv drain loop inside a
separate timeout (or with its own bounded wait) to ensure queued messages get
time to be processed; update the logic around the process_remaining closure,
acquire_many, msg_rx.try_recv, wait_with_progress and handle_message to use two
distinct timeouts and preserve the existing logs/behavior on Ok/Err.

ℹ️ Review info

📥 Commits

Reviewing files that changed from the base of the PR and between 2eac21c and 5806453.

📒 Files selected for processing (2)
  • crates/actors/src/chunk_ingress_service.rs
  • crates/actors/src/mempool_service.rs

Comment thread crates/actors/src/chunk_ingress_service.rs Outdated
Comment thread crates/actors/src/mempool_service.rs

@coderabbitai coderabbitai Bot left a comment

Contributor

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@crates/actors/src/mempool_service.rs`:
- Line 3211: The traced span on handle_broadcast_recv at
#[tracing::instrument(level = "trace", skip_all)] is low-value noise; either
remove the attribute entirely or change it to a less-chatty level (e.g., level =
"debug") and add meaningful fields to the span if you need context (for example
request ids or peer identifiers) so the outer span provides useful data; update
the attribute on the handle_broadcast_recv function accordingly.

ℹ️ Review info

📥 Commits

Reviewing files that changed from the base of the PR and between 5806453 and cdc37a0.

📒 Files selected for processing (2)
  • crates/actors/src/chunk_ingress_service.rs
  • crates/actors/src/mempool_service.rs

Comment thread crates/actors/src/mempool_service.rs
)
.inspect_err(|err| tracing::error!(
{
let poa_span = tracing::info_span!(

Member

did the previous instrument call not work?

Contributor Author

spurious, fixed

Comment thread crates/actors/src/block_validation.rs Outdated
/// Submits the EVM payload to reth for execution layer validation.
/// This should only be called after all consensus layer validations have passed.
-#[tracing::instrument(level = "trace", skip_all, err, fields(
+#[tracing::instrument(level = "debug", skip_all, err, fields(

Member

why is the level changing from trace to debug for just these spans?

Contributor Author

this is the errant child, will fix

Comment thread crates/actors/src/mempool_service.rs Outdated
-let max_concurrent_mempool_tasks = mempool_config.max_concurrent_mempool_tasks;
+let raw_max_concurrent = mempool_config.max_concurrent_mempool_tasks;
+const MAX_PERMITS: usize = u32::MAX as usize;
+let max_concurrent_mempool_tasks = raw_max_concurrent.clamp(1, MAX_PERMITS);

Member

the minimum should probably be closer to 100 than 1 - maybe 10? 20?

Comment thread crates/actors/src/mempool_service.rs Outdated
Comment on lines +3166 to +3184
// Phase 2: drain queued messages with its own budget
let drain_fut = async {
while let Ok(traced) = self.msg_rx.try_recv() {
let (msg, parent_span) = traced.into_parts();
let msg_type = msg.variant_name();
let span = tracing::info_span!(parent: &parent_span, "mempool_handle_message", msg_type = %msg_type);
let task_info = format!("shutdown drain: {}", msg_type);
if let Err(e) = wait_with_progress(
self.inner.handle_message(msg),
20,
&task_info,
)
.instrument(span)
.await
{
tracing::error!("Error handling message during shutdown drain: {:?}", e);
}
}
};

Member

I would be tempted to flip these: wait until the rx queue is empty, then try to grab all the permits. This way, shutdown message processing isn't one-by-one.

@JesseTheRobot JesseTheRobot left a comment

Member

LGTM, just some nonblocking feedback

@coderabbitai coderabbitai Bot left a comment

Contributor

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
crates/actors/src/block_validation.rs (2)

1734-1741: ⚠️ Potential issue | 🟠 Major

Avoid serializing full vdf_info in invalid-seed diagnostics.

This currently logs and stores the entire vdf_info debug payload, which can be large and amplify memory/log volume on invalid-block spam. Log and return only the minimal seed fields needed for triage.

Proposed fix
-        error!(
-            "Seed data is invalid. Expected: {:?}, got: {:?}",
-            expected_seed_data, vdf_info
-        );
-        ValidationResult::Invalid(ValidationError::SeedDataInvalid(format!(
-            "Expected: {:?}, got: {:?}",
-            expected_seed_data, vdf_info
-        )))
+        error!(
+            expected_next_seed = ?expected_seed_data.0,
+            expected_seed = ?expected_seed_data.1,
+            got_next_seed = ?vdf_info.next_seed,
+            got_seed = ?vdf_info.seed,
+            got_global_step_number = vdf_info.global_step_number,
+            "Seed data is invalid"
+        );
+        ValidationResult::Invalid(ValidationError::SeedDataInvalid(format!(
+            "seed mismatch: expected_next_seed={:?}, expected_seed={:?}, got_next_seed={:?}, got_seed={:?}, step={}",
+            expected_seed_data.0,
+            expected_seed_data.1,
+            vdf_info.next_seed,
+            vdf_info.seed,
+            vdf_info.global_step_number
+        )))
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/actors/src/block_validation.rs` around lines 1734 - 1741, The code
currently logs and returns the entire vdf_info via "{:?}" in the error! and
ValidationError::SeedDataInvalid call; replace that with a compact diagnostic
composed only of the minimal seed fields needed for triage (for example extract
vdf_info.seed and vdf_info.seed_height or equivalent minimal identifiers), build
a small string like "expected: X, got: seed=<...>, height=<...>" and use that
string in both error! and the ValidationError::SeedDataInvalid message instead
of printing the full vdf_info; update the code around expected_seed_data,
vdf_info, and the ValidationError::SeedDataInvalid construction to reference
those extracted fields.
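
A compact diagnostic along these lines might look as follows. This is a sketch under stated assumptions: the `VdfInfo` struct here is a hypothetical stand-in with `u64` fields (the real type and field types are not shown in this thread), `steps` represents whatever large payload makes the full debug output expensive, and `seed_mismatch_message` is an illustrative helper name.

```rust
/// Hypothetical stand-in for the real VDF info type; field types are assumed.
#[derive(Debug)]
struct VdfInfo {
    seed: u64,
    next_seed: u64,
    global_step_number: u64,
    /// Stand-in for the large payload that should stay out of logs.
    steps: Vec<u64>,
}

/// Builds a short mismatch message from the seed fields only.
/// `expected` is assumed to be (next_seed, seed), mirroring the tuple
/// order used in the proposed fix.
fn seed_mismatch_message(expected: (u64, u64), got: &VdfInfo) -> String {
    format!(
        "seed mismatch: expected_next_seed={:?}, expected_seed={:?}, got_next_seed={:?}, got_seed={:?}, step={}",
        expected.0, expected.1, got.next_seed, got.seed, got.global_step_number
    )
}
```

The message length is then independent of the size of `steps`, which keeps log volume bounded when many invalid blocks arrive.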

1727-1728: 🧹 Nitpick | 🔵 Trivial

Track the consensus TODO explicitly.

This TODO marks a known validation gap in a consensus-critical path. Please link an issue (or add one) so it’s not lost.

If helpful, I can draft an issue with acceptance criteria and test cases for the missing difficulty-related seed validation.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/actors/src/block_validation.rs` around lines 1727 - 1728, The TODO in
block_validation.rs notes a consensus-critical missing difficulty validation but
has no tracking; create a formal issue in the repo (include acceptance criteria
and tests for seed/difficulty validation) and then update the TODO comment in
block_validation.rs to include the issue tracker reference (e.g.
"CONSENSUS-TODO: see issue `#123`" or URL) and a short actionable note pointing to
the code area to be fixed; also add the issue ID to any consensus-related
tracking document (e.g. issues.md or CHANGELOG) so the gap is discoverable and
not lost.
crates/actors/src/chunk_ingress_service.rs (1)

314-347: ⚠️ Potential issue | 🟠 Major

Shutdown continues into drain/flush even when handler quiescence fails.

At Line 314, timeout on acquire_many(...) means handlers may still be running, but the code still drains and flushes. That can race with in-flight writes and produce a non-deterministic shutdown state.

🛠️ Proposed guard: only run final drain/flush after successful permit acquisition
-        let _all_permits = match tokio::time::timeout(Duration::from_secs(30), acquire_fut).await {
-            Ok(Ok(p)) => Some(p),
-            Ok(Err(_)) => {
-                error!("Semaphore closed during chunk ingress shutdown drain");
-                None
-            }
-            Err(_) => {
-                warn!("Timed out waiting for in-flight chunk ingress handlers; proceeding without full drain");
-                None
-            }
-        };
+        let _all_permits = match tokio::time::timeout(Duration::from_secs(30), acquire_fut).await {
+            Ok(Ok(p)) => p,
+            Ok(Err(_)) => {
+                error!("Semaphore closed during chunk ingress shutdown drain; skipping drain/flush");
+                info!("ChunkIngressService shut down");
+                return Ok(());
+            }
+            Err(_) => {
+                warn!("Timed out waiting for in-flight chunk ingress handlers; skipping drain/flush");
+                info!("ChunkIngressService shut down");
+                return Ok(());
+            }
+        };
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/actors/src/chunk_ingress_service.rs` around lines 314 - 347, The
shutdown currently proceeds to drain queued messages and flush the chunk writer
even if acquiring the shutdown permits failed; change the logic so that the
Phase 2 drain (loop using self.msg_rx.try_recv(),
wait_with_progress(self.inner.handle_message(...)), and the final
self.inner.chunk_data_writer.flush().await) only run when the permit acquisition
succeeded (i.e., when the match produced Some(p) for the acquire_fut result).
Use the existing _all_permits result (rename to all_permits or check .is_some())
to gate running the drain_fut and flush, and log and skip the drain+flush path
when permits were not acquired to avoid racing with in-flight handlers.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@crates/actors/src/mempool_service.rs`:
- Around line 3188-3213: The shutdown currently persists the mempool regardless
of whether acquire_many succeeded, risking inconsistent snapshots if handlers
timed out; modify the logic so persist_mempool_to_disk()
(self.inner.persist_mempool_to_disk()) is only called when the acquire_many
branch succeeded (the Ok(Ok(p)) case / when _all_permits is Some), e.g., move
the tokio::time::timeout(... self.inner.persist_mempool_to_disk() ...) call into
the successful branch or add an explicit check that _all_permits.is_some()
before attempting the 10s timeout persist; keep the existing timeout and error
handling for the persist call unchanged.
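
The gating described above can be sketched without tokio. The types below are hypothetical stand-ins: `Permits` for the guard returned by `acquire_many`, `Closed` for a closed-semaphore error, and `Elapsed` for a timeout error. The nested `Result` mirrors the shape produced by awaiting a timeout around an acquire call, and `persist_if_quiesced` is an illustrative name, not a function from the PR.

```rust
/// Stand-in for the permit guard returned by an acquire-all call.
struct Permits;
/// Stand-in for the error returned when the semaphore is closed.
#[derive(Debug)]
struct Closed;
/// Stand-in for the error returned when the timeout elapses.
#[derive(Debug)]
struct Elapsed;

/// Runs `persist` only when every permit was acquired, i.e. when no
/// handler can still be mutating state. Returns whether it ran.
fn persist_if_quiesced(
    outcome: Result<Result<Permits, Closed>, Elapsed>,
    persist: impl FnOnce(),
) -> bool {
    match outcome {
        Ok(Ok(_permits)) => {
            // `_permits` stays alive until the end of this arm, so the
            // snapshot is taken while all handlers are locked out.
            persist();
            true
        }
        Ok(Err(Closed)) => {
            eprintln!("semaphore closed during shutdown; skipping persist");
            false
        }
        Err(Elapsed) => {
            eprintln!("timed out waiting for handlers; skipping persist");
            false
        }
    }
}
```

Skipping the persist on timeout trades a possibly stale on-disk state for the guarantee that no partially updated snapshot is written; which trade-off is right depends on how recovery handles a missing snapshot.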

---

Outside diff comments:
In `@crates/actors/src/block_validation.rs`:
- Around line 1734-1741: The code currently logs and returns the entire vdf_info
via "{:?}" in the error! and ValidationError::SeedDataInvalid call; replace that
with a compact diagnostic composed only of the minimal seed fields needed for
triage (for example extract vdf_info.seed and vdf_info.seed_height or equivalent
minimal identifiers), build a small string like "expected: X, got: seed=<...>,
height=<...>" and use that string in both error! and the
ValidationError::SeedDataInvalid message instead of printing the full vdf_info;
update the code around expected_seed_data, vdf_info, and the
ValidationError::SeedDataInvalid construction to reference those extracted
fields.
- Around line 1727-1728: The TODO in block_validation.rs notes a
consensus-critical missing difficulty validation but has no tracking; create a
formal issue in the repo (include acceptance criteria and tests for
seed/difficulty validation) and then update the TODO comment in
block_validation.rs to include the issue tracker reference (e.g.
"CONSENSUS-TODO: see issue `#123`" or URL) and a short actionable note pointing to
the code area to be fixed; also add the issue ID to any consensus-related
tracking document (e.g. issues.md or CHANGELOG) so the gap is discoverable and
not lost.

In `@crates/actors/src/chunk_ingress_service.rs`:
- Around line 314-347: The shutdown currently proceeds to drain queued messages
and flush the chunk writer even if acquiring the shutdown permits failed; change
the logic so that the Phase 2 drain (loop using self.msg_rx.try_recv(),
wait_with_progress(self.inner.handle_message(...)), and the final
self.inner.chunk_data_writer.flush().await) only run when the permit acquisition
succeeded (i.e., when the match produced Some(p) for the acquire_fut result).
Use the existing _all_permits result (rename to all_permits or check .is_some())
to gate running the drain_fut and flush, and log and skip the drain+flush path
when permits were not acquired to avoid racing with in-flight handlers.

ℹ️ Review info

📥 Commits

Reviewing files that changed from the base of the PR and between cdc37a0 and 32b2091.

📒 Files selected for processing (7)
  • crates/actors/src/block_validation.rs
  • crates/actors/src/chunk_ingress_service.rs
  • crates/actors/src/mempool_service.rs
  • crates/actors/src/validation_service/block_validation_task.rs
  • docker/observation/configs/elasticsearch/index-settings.json
  • docker/observation/configs/elasticsearch/index-template.json
  • docker/observation/scripts/setup_elasticsearch_ilm.sh

Comment thread crates/actors/src/mempool_service.rs Outdated

@coderabbitai coderabbitai Bot left a comment

Contributor

♻️ Duplicate comments (1)
crates/actors/src/mempool_service.rs (1)

3226-3226: 🧹 Nitpick | 🔵 Trivial

Trace-level instrument on hot utility function — previously flagged as low-value.

This function is called on every reorg broadcast tick and already logs internally. The outer span with skip_all adds minimal observability value. Consider removing the attribute or adding meaningful fields if context is needed.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/actors/src/mempool_service.rs` at line 3226, The
#[tracing::instrument(level = "trace", skip_all)] attribute on the hot utility
function (the function annotated with #[tracing::instrument(level = "trace",
skip_all)] that runs on every reorg broadcast tick) should be removed to avoid
cheap, low-value spans; either delete the attribute entirely or replace it with
a lighter-weight/meaningful span (e.g., remove skip_all and add specific fields
like reorg_id or tick_count) so that only useful context is recorded and you
don't instrument this hot path unnecessarily.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Duplicate comments:
In `@crates/actors/src/mempool_service.rs`:
- Line 3226: The #[tracing::instrument(level = "trace", skip_all)] attribute on
the hot utility function (the function annotated with
#[tracing::instrument(level = "trace", skip_all)] that runs on every reorg
broadcast tick) should be removed to avoid cheap, low-value spans; either delete
the attribute entirely or replace it with a lighter-weight/meaningful span
(e.g., remove skip_all and add specific fields like reorg_id or tick_count) so
that only useful context is recorded and you don't instrument this hot path
unnecessarily.

ℹ️ Review info

📥 Commits

Reviewing files that changed from the base of the PR and between 32b2091 and fa3fe42.

📒 Files selected for processing (1)
  • crates/actors/src/mempool_service.rs

@coderabbitai coderabbitai Bot left a comment

Contributor

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
crates/database/src/database.rs (1)

267-271: 🧹 Nitpick | 🔵 Trivial

LGTM - consider adding span fields for better traceability.

Trace level is appropriate for high-frequency chunk caching. Optionally, capturing data_root in the span fields would improve trace correlation, similar to how insert_block_header captures block_hash:

💡 Optional: Add data_root field to span
-#[tracing::instrument(level = "trace", skip_all)]
+#[tracing::instrument(level = "trace", skip_all, fields(data_root = ?chunk.data_root))]
 pub fn cache_chunk_verified<T: DbTx + DbTxMut>(
     tx: &T,
     chunk: &UnpackedChunk,
 ) -> eyre::Result<IsDuplicate> {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/database/src/database.rs` around lines 267 - 271, The tracing span for
cache_chunk_verified should include the chunk's data_root for better
traceability; update the #[tracing::instrument(...)] on the cache_chunk_verified
function to add a span field that records the chunk's data_root (i.e., capture
chunk.data_root or UnpackedChunk's data_root property) while keeping the
level="trace" and skip_all behavior, mirroring how insert_block_header includes
block_hash in its span fields.
♻️ Duplicate comments (2)
crates/p2p/src/chain_sync.rs (1)

357-370: ⚠️ Potential issue | 🟠 Major

Ensure whitelist running-flag is cleared even on panic.

At Line 367, the flag reset runs only on normal completion. If the spawned task panics, is_update_whitelist_task_running can stay true and permanently suppress future updates.

Proposed fix
         tokio::spawn(
             async move {
+                struct WhitelistRunningGuard(Arc<AtomicBool>);
+                impl Drop for WhitelistRunningGuard {
+                    fn drop(&mut self) {
+                        self.0.store(false, Ordering::Relaxed);
+                    }
+                }
+
+                let _guard = WhitelistRunningGuard(stake_and_pledge_whitelist_running_flag.clone());
+
                 match handler.pull_and_process_stake_and_pledge_whitelist().await {
                     Ok(()) => {
                         info!("Successfully updated stake and pledge whitelist",);
                     }
                     Err(e) => {
                         error!("Failed to update stake and pledge whitelist: {:?}", e);
                     }
                 }
-                stake_and_pledge_whitelist_running_flag.store(false, Ordering::Relaxed);
             }
             .instrument(tracing::info_span!("stake_and_pledge_whitelist_update")),
         );
#!/bin/bash
# Verify flag reset paths and whether a Drop-based guard exists.
rg -n -C4 'spawn_stake_and_pledge_update_task|is_update_whitelist_task_running|store\(false, Ordering::Relaxed\)|impl Drop' crates/p2p/src/chain_sync.rs
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/p2p/src/chain_sync.rs` around lines 357 - 370, The spawned task must
always clear stake_and_pledge_whitelist_running_flag even if it panics; wrap the
task body with a panic-safe cleanup (e.g., create a small ResetFlagGuard that
stores false in its Drop impl, or call std::panic::catch_unwind around the async
body) and instantiate it at the start of the async closure that calls
handler.pull_and_process_stake_and_pledge_whitelist().await so the flag is set
to false in all exit paths; keep the
tracing::info_span!("stake_and_pledge_whitelist_update") instrumentation around
the spawned task.
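
The Drop-based reset can be demonstrated with a standard-library thread standing in for the spawned tokio task. This is a sketch, not the PR's code: `RunningGuard` and `run_guarded` are hypothetical names, and the behaviour relies on the default unwinding panic strategy (with `panic = "abort"` no destructor runs).

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

/// Clears the shared "running" flag when dropped, including during
/// unwinding after a panic.
struct RunningGuard(Arc<AtomicBool>);

impl Drop for RunningGuard {
    fn drop(&mut self) {
        self.0.store(false, Ordering::Release);
    }
}

/// Marks the flag as running, runs a task that may panic, and returns
/// the flag's value after the task has finished.
fn run_guarded(flag: Arc<AtomicBool>, should_panic: bool) -> bool {
    flag.store(true, Ordering::Release);
    let task_flag = Arc::clone(&flag);
    let handle = thread::spawn(move || {
        // Created first so it is dropped on every exit path.
        let _guard = RunningGuard(task_flag);
        if should_panic {
            panic!("simulated task failure");
        }
    });
    // A panicked thread yields Err here; the guard has already run.
    let _ = handle.join();
    flag.load(Ordering::Acquire)
}
```

In both the normal and the panicking case the flag reads `false` afterwards, so a later update is not suppressed by a stale `true`.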
crates/actors/src/chunk_ingress_service.rs (1)

314-326: ⚠️ Potential issue | 🟠 Major

Proceeding after failed quiescence can race with shutdown flush.

If acquire_many(...) times out or the semaphore is closed, shutdown still continues. That means in-flight handlers may still enqueue chunk writes while the service proceeds toward chunk_data_writer.flush(), which can produce incomplete final draining.

🛠️ Suggested guard (only final-drain/flush when handlers quiesce)
-        let _all_permits = match tokio::time::timeout(Duration::from_secs(30), acquire_fut).await {
+        let handlers_quiesced = match tokio::time::timeout(Duration::from_secs(30), acquire_fut).await {
             Ok(Ok(p)) => Some(p),
             Ok(Err(_)) => {
                 error!("Semaphore closed during chunk ingress shutdown drain");
                 None
             }
             Err(_) => {
                 warn!(
                     "Timed out waiting for in-flight chunk ingress handlers; proceeding without full drain"
                 );
                 None
             }
         };
+        if handlers_quiesced.is_none() {
+            warn!("Skipping drain/flush because handlers did not fully quiesce");
+            info!("ChunkIngressService shut down");
+            return Ok(());
+        }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/actors/src/chunk_ingress_service.rs` around lines 314 - 326, The
shutdown currently proceeds to call chunk_data_writer.flush() even if
acquire_many (the acquire_fut assigned to _all_permits) timed out or the
semaphore closed, which risks missing in-flight enqueues; change the shutdown
logic so it only performs the final drain/flush when acquire_many succeeded
(i.e., _all_permits is Some(p)). Specifically, inspect the match result from the
tokio::time::timeout block (the variable created from acquire_fut /
acquire_many) and if it returned Ok(Ok(p)) proceed with handing p to
chunk_data_writer.flush(); otherwise log and skip or retry waiting rather than
flushing immediately. Ensure references to acquire_many/acquire_fut/_all_permits
and chunk_data_writer.flush() are used so the guard is clearly enforced.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@crates/actors/src/chunk_ingress_service.rs`:
- Around line 340-344: The timeout branch of the tokio::time::timeout match
abandons remaining queued messages, causing callers to receive generic
closed-channel errors; update the Err(_) arm (the timeout path around drain_fut)
to explicitly notify queued request/response items with a typed
overload/shutdown error by reusing the existing timeout notifier helper (use the
same notifier used elsewhere in this module to signal timeouts) or by iterating
the remaining items in the chunk ingress queue and sending a proper error via
their responder channels; reference the drain_fut match and the request/response
enum variants in chunk_ingress_service.rs and ensure each pending message uses
the existing notifier API to deliver the explicit error before returning.

In `@crates/actors/src/mempool_service.rs`:
- Around line 3159-3188: The loop currently pops a message from msg_rx
(traced.into_parts()) before attempting semaphore.acquire_owned() with timeout,
so when acquire times out the popped message is dropped and remaining messages
aren't drained; fix by acquiring the permit (using
inner.message_handler_semaphore.acquire_owned() with the same timeout) before
removing a message from msg_rx, then after successfully obtaining the permit
call msg_rx.try_recv() (or try_recv in a small loop) to get the next message and
spawn the task that calls inner.handle_message wrapped by wait_with_progress;
keep the tracing span creation and use of runtime_handle.spawn, and handle Err
from acquire_owned (semaphore closed) and timeout without discarding messages so
no queued messages are lost during shutdown drain.

---

Outside diff comments:
In `@crates/database/src/database.rs`:
- Around line 267-271: The tracing span for cache_chunk_verified should include
the chunk's data_root for better traceability; update the
#[tracing::instrument(...)] on the cache_chunk_verified function to add a span
field that records the chunk's data_root (i.e., capture chunk.data_root or
UnpackedChunk's data_root property) while keeping the level="trace" and skip_all
behavior, mirroring how insert_block_header includes block_hash in its span
fields.

---

Duplicate comments:
In `@crates/actors/src/chunk_ingress_service.rs`:
- Around line 314-326: The shutdown currently proceeds to call
chunk_data_writer.flush() even if acquire_many (the acquire_fut assigned to
_all_permits) timed out or the semaphore closed, which risks missing in-flight
enqueues; change the shutdown logic so it only performs the final drain/flush
when acquire_many succeeded (i.e., _all_permits is Some(p)). Specifically,
inspect the match result from the tokio::time::timeout block (the variable
created from acquire_fut / acquire_many) and if it returned Ok(Ok(p)) proceed
with handing p to chunk_data_writer.flush(); otherwise log and skip or retry
waiting rather than flushing immediately. Ensure references to
acquire_many/acquire_fut/_all_permits and chunk_data_writer.flush() are used so
the guard is clearly enforced.

In `@crates/p2p/src/chain_sync.rs`:
- Around line 357-370: The spawned task must always clear
stake_and_pledge_whitelist_running_flag even if it panics; wrap the task body
with a panic-safe cleanup (e.g., create a small ResetFlagGuard that stores false
in its Drop impl, or call std::panic::catch_unwind around the async body) and
instantiate it at the start of the async closure that calls
handler.pull_and_process_stake_and_pledge_whitelist().await so the flag is set
to false in all exit paths; keep the
tracing::info_span!("stake_and_pledge_whitelist_update") instrumentation around
the spawned task.

ℹ️ Review info

Configuration used: Repository UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between fa3fe42 and def01de.

📒 Files selected for processing (22)
  • crates/actors/src/block_discovery.rs
  • crates/actors/src/block_producer.rs
  • crates/actors/src/block_tree_service.rs
  • crates/actors/src/block_validation.rs
  • crates/actors/src/cache_service.rs
  • crates/actors/src/chunk_ingress_service.rs
  • crates/actors/src/chunk_ingress_service/chunk_data_writer.rs
  • crates/actors/src/chunk_migration_service.rs
  • crates/actors/src/data_sync_service.rs
  • crates/actors/src/mempool_service.rs
  • crates/actors/src/partition_mining_service.rs
  • crates/actors/src/storage_module_service.rs
  • crates/actors/src/validation_service.rs
  • crates/actors/src/validation_service/block_validation_task.rs
  • crates/chain/src/chain.rs
  • crates/database/src/data_ledger.rs
  • crates/database/src/database.rs
  • crates/domain/src/snapshots/epoch_snapshot/epoch_snapshot.rs
  • crates/p2p/src/chain_sync.rs
  • crates/p2p/src/gossip_client.rs
  • crates/vdf/src/state.rs
  • crates/vdf/src/vdf.rs

Comment on lines +340 to +344
 match tokio::time::timeout(Duration::from_secs(10), drain_fut).await {
     Ok(()) => tracing::debug!("Processed remaining chunk ingress messages successfully"),
-    Err(_) => tracing::warn!(
-        "Timeout processing remaining chunk ingress messages, continuing shutdown"
-    ),
+    Err(_) => {
+        warn!("Timeout draining remaining chunk ingress messages, continuing shutdown")
+    }

⚠️ Potential issue | 🟡 Minor

Timeout path drops remaining queued requests without explicit caller error.

When shutdown drain times out, remaining queued messages are abandoned. For request/response variants, callers get a generic closed-channel failure instead of a typed overload/shutdown error.

🛠️ Suggested follow-up (reuse existing timeout notifier)
         match tokio::time::timeout(Duration::from_secs(10), drain_fut).await {
             Ok(()) => tracing::debug!("Processed remaining chunk ingress messages successfully"),
             Err(_) => {
                 warn!("Timeout draining remaining chunk ingress messages, continuing shutdown")
+                while let Ok(traced) = self.msg_rx.try_recv() {
+                    let (msg, _) = traced.into_parts();
+                    Self::send_timeout_errors(msg);
+                }
             }
         }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/actors/src/chunk_ingress_service.rs` around lines 340 - 344, The
timeout branch of the tokio::time::timeout match abandons remaining queued
messages, causing callers to receive generic closed-channel errors; update the
Err(_) arm (the timeout path around drain_fut) to explicitly notify queued
request/response items with a typed overload/shutdown error by reusing the
existing timeout notifier helper (use the same notifier used elsewhere in this
module to signal timeouts) or by iterating the remaining items in the chunk
ingress queue and sending a proper error via their responder channels; reference
the drain_fut match and the request/response enum variants in
chunk_ingress_service.rs and ensure each pending message uses the existing
notifier API to deliver the explicit error before returning.
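
The idea of answering every still-queued request with a typed error can be sketched as follows. This is a standard-library model, not the service's code: `IngressError`, `Message`, and `reject_pending` are hypothetical names, and `std::sync::mpsc` channels stand in for the service's message queue and responder channels.

```rust
use std::sync::mpsc;

/// Hypothetical typed error delivered to callers whose request was never processed.
#[derive(Debug, PartialEq)]
enum IngressError {
    ShuttingDown,
}

/// Hypothetical message type: one request/response variant and one
/// fire-and-forget variant.
enum Message {
    Request {
        responder: mpsc::Sender<Result<u64, IngressError>>,
    },
    Notify,
}

/// Drains whatever is still queued and answers each request with a typed
/// error instead of letting its responder be dropped silently.
/// Returns how many callers were notified.
fn reject_pending(rx: &mpsc::Receiver<Message>) -> usize {
    let mut notified = 0;
    while let Ok(msg) = rx.try_recv() {
        match msg {
            Message::Request { responder } => {
                // The caller may already be gone; a failed send is not an error here.
                if responder.send(Err(IngressError::ShuttingDown)).is_ok() {
                    notified += 1;
                }
            }
            // Fire-and-forget messages have nobody to notify.
            Message::Notify => {}
        }
    }
    notified
}
```

Callers waiting on their responder then observe `Err(IngressError::ShuttingDown)` rather than a closed-channel failure, which lets them distinguish shutdown from a crashed service.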

Comment thread crates/actors/src/mempool_service.rs Outdated
Comment on lines +3159 to +3188
+// Phase 1: drain queued messages concurrently using semaphore permits
 while let Ok(traced) = self.msg_rx.try_recv() {
     let (msg, parent_span) = traced.into_parts();
-    let span = tracing::info_span!(parent: &parent_span, "mempool_handle_message", msg_type = %msg.variant_name());
-    self.inner.handle_message(msg).instrument(span).await?;
+    let msg_type = msg.variant_name();
+    let span = tracing::info_span!(parent: &parent_span, "mempool_handle_message", msg_type = %msg_type);
+
+    let inner = Arc::clone(&self.inner);
+    let semaphore = inner.message_handler_semaphore.clone();
+    match tokio::time::timeout(Duration::from_secs(10), semaphore.acquire_owned()).await {
+        Ok(Ok(permit)) => {
+            runtime_handle.spawn(async move {
+                let _permit = permit;
+                let task_info = format!("shutdown drain: {}", msg_type);
+                if let Err(e) = wait_with_progress(
+                    inner.handle_message(msg),
+                    20,
+                    &task_info,
+                ).await {
+                    tracing::error!("Error handling message during shutdown drain: {:?}", e);
+                }
+            }.instrument(span));
+        }
+        Ok(Err(_)) => {
+            tracing::error!("Semaphore closed during shutdown drain");
+            break;
+        }
+        Err(_) => {
+            tracing::warn!("Timed out acquiring permit during shutdown drain, skipping remaining messages");
+            break;
+        }

⚠️ Potential issue | 🟠 Major

Shutdown drain can silently drop queued messages on permit-acquire timeout.

In this loop, a message is popped from msg_rx before permit acquisition. If acquire_owned() times out, the code breaks, dropping that popped message and leaving the rest undrained. This can lose final mempool operations during shutdown.

🛠️ Suggested direction (avoid pop-before-permit timeout loss)
-            // Phase 1: drain queued messages concurrently using semaphore permits
-            while let Ok(traced) = self.msg_rx.try_recv() {
-                let (msg, parent_span) = traced.into_parts();
-                ...
-                match tokio::time::timeout(Duration::from_secs(10), semaphore.acquire_owned()).await {
-                    Ok(Ok(permit)) => { runtime_handle.spawn(...); }
-                    Ok(Err(_)) => { ...; break; }
-                    Err(_) => { ...; break; }
-                }
-            }
-
-            // Phase 2: acquire all permits to wait for in-flight + drain-spawned handlers
+            // Phase 1: wait for all in-flight handlers first.
             let acquire_fut = self
                 .inner
                 .message_handler_semaphore
                 .acquire_many(self.inner.max_concurrent_tasks);
-            let handlers_quiesced = match tokio::time::timeout(Duration::from_secs(30), acquire_fut).await {
+            let handlers_quiesced = match tokio::time::timeout(Duration::from_secs(30), acquire_fut).await {
                 ...
             };
 
-            if handlers_quiesced {
+            // Phase 2: once quiesced, drain synchronously so no popped message is dropped on permit timeouts.
+            if handlers_quiesced {
+                while let Ok(traced) = self.msg_rx.try_recv() {
+                    let (msg, parent_span) = traced.into_parts();
+                    let msg_type = msg.variant_name();
+                    let span = tracing::info_span!(parent: &parent_span, "mempool_handle_message", msg_type = %msg_type);
+                    let task_info = format!("shutdown drain: {}", msg_type);
+                    if let Err(e) = wait_with_progress(self.inner.handle_message(msg), 20, &task_info)
+                        .instrument(span)
+                        .await
+                    {
+                        tracing::error!("Error handling message during shutdown drain: {:?}", e);
+                    }
+                }
+
                 match tokio::time::timeout(
                     Duration::from_secs(10),
                     self.inner.persist_mempool_to_disk(),
                 ).await { ... }
             }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/actors/src/mempool_service.rs` around lines 3159 - 3188, The loop
currently pops a message from msg_rx (traced.into_parts()) before attempting
semaphore.acquire_owned() with timeout, so when acquire times out the popped
message is dropped and remaining messages aren't drained; fix by acquiring the
permit (using inner.message_handler_semaphore.acquire_owned() with the same
timeout) before removing a message from msg_rx, then after successfully
obtaining the permit call msg_rx.try_recv() (or try_recv in a small loop) to get
the next message and spawn the task that calls inner.handle_message wrapped by
wait_with_progress; keep the tracing span creation and use of
runtime_handle.spawn, and handle Err from acquire_owned (semaphore closed) and
timeout without discarding messages so no queued messages are lost during
shutdown drain.
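
The ordering the prompt above asks for (hold a permit first, pop second) can be illustrated with a small single-threaded model. Everything here is an assumption made for illustration: `Permits` is a plain counter standing in for the real semaphore, a `VecDeque` stands in for `msg_rx`, and `drain_permit_first` is a hypothetical name.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for a semaphore: a plain counter of free permits.
struct Permits {
    free: usize,
}

impl Permits {
    /// Takes one permit if any is free; returns whether it succeeded.
    fn try_acquire(&mut self) -> bool {
        if self.free > 0 {
            self.free -= 1;
            true
        } else {
            false
        }
    }
}

/// Pops a message only after a permit is held, so a failed acquire never
/// discards a message that was already removed from the queue.
/// Returns the messages that were dispatched.
fn drain_permit_first(queue: &mut VecDeque<String>, permits: &mut Permits) -> Vec<String> {
    let mut dispatched = Vec::new();
    while !queue.is_empty() {
        if !permits.try_acquire() {
            // Nothing has been popped yet, so every remaining message
            // is still in the queue for a later attempt.
            break;
        }
        if let Some(msg) = queue.pop_front() {
            dispatched.push(msg);
        }
    }
    dispatched
}
```

In this model, a permit shortage leaves the undelivered messages in the queue instead of dropping one in flight. In concurrent code another consumer could empty the queue between the acquire and the pop, in which case the permit would have to be released again; the sketch sidesteps that by being single-threaded.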

@coderabbitai coderabbitai Bot left a comment

♻️ Duplicate comments (1)
crates/actors/src/mempool_service.rs (1)

3166-3197: ⚠️ Potential issue | 🟠 Major

Shutdown inline fallback bypasses semaphore bound (regression).

At Line 3184, the NoPermits branch processes handle_message inline without holding a permit, so shutdown can run above max_concurrent_tasks while permit-held handlers are still active. This reintroduces the same class of drain-path concurrency issue.

🛠️ Suggested fix (keep popped message, still honor semaphore)
                 match inner.message_handler_semaphore.clone().try_acquire_owned() {
                     Ok(permit) => {
                         runtime_handle.spawn(async move {
                             let _permit = permit;
                             let task_info = format!("shutdown drain: {}", msg_type);
                             if let Err(e) = wait_with_progress(
                                 inner.handle_message(msg),
                                 20,
                                 &task_info,
                             ).await {
                                 tracing::error!("Error handling message during shutdown drain: {:?}", e);
                             }
                         }.instrument(span));
                     }
                     Err(tokio::sync::TryAcquireError::Closed) => {
                         tracing::error!("Semaphore closed during shutdown drain");
                         break;
                     }
                     Err(tokio::sync::TryAcquireError::NoPermits) => {
-                        let task_info = format!("shutdown drain (inline): {}", msg_type);
-                        if let Err(e) = wait_with_progress(
-                            inner.handle_message(msg),
-                            20,
-                            &task_info,
-                        )
-                        .instrument(span)
-                        .await
-                        {
-                            tracing::error!("Error handling message during shutdown drain: {:?}", e);
-                        }
+                        // Preserve message handling but still respect concurrency bound.
+                        match inner.message_handler_semaphore.clone().acquire_owned().await {
+                            Ok(permit) => {
+                                let _permit = permit;
+                                let task_info = format!("shutdown drain (inline): {}", msg_type);
+                                if let Err(e) = wait_with_progress(
+                                    inner.handle_message(msg),
+                                    20,
+                                    &task_info,
+                                )
+                                .instrument(span)
+                                .await
+                                {
+                                    tracing::error!("Error handling message during shutdown drain: {:?}", e);
+                                }
+                            }
+                            Err(_) => {
+                                tracing::error!("Semaphore closed while waiting in shutdown drain");
+                                break;
+                            }
+                        }
                     }
                 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/actors/src/mempool_service.rs` around lines 3166 - 3197, The shutdown
inline fallback in the NoPermits branch bypasses inner.message_handler_semaphore
and can exceed max_concurrent_tasks; modify the NoPermits handling so it still
respects the semaphore: instead of invoking inner.handle_message(msg) directly,
re-acquire a permit (e.g., await
inner.message_handler_semaphore.acquire_owned().await) before calling
handle_message, and then run the handler under the acquired permit (using the
same wait_with_progress, task_info and span logic) so both the inline path and
spawned tasks honor the semaphore bound and the popped message is still
processed during shutdown.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Duplicate comments:
In `@crates/actors/src/mempool_service.rs`:
- Around line 3166-3197: The shutdown inline fallback in the NoPermits branch
bypasses inner.message_handler_semaphore and can exceed max_concurrent_tasks;
modify the NoPermits handling so it still respects the semaphore: instead of
invoking inner.handle_message(msg) directly, re-acquire a permit (e.g., await
inner.message_handler_semaphore.acquire_owned().await) before calling
handle_message, and then run the handler under the acquired permit (using the
same wait_with_progress, task_info and span logic) so both the inline path and
spawned tasks honor the semaphore bound and the popped message is still
processed during shutdown.

ℹ️ Review info

Configuration used: Repository UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between def01de and abd4120.

📒 Files selected for processing (2)
  • crates/actors/src/chunk_ingress_service.rs
  • crates/actors/src/mempool_service.rs

@glottologist glottologist merged commit 4b87ad7 into master Feb 26, 2026
18 checks passed
@glottologist glottologist deleted the jason/missing_spans branch February 26, 2026 12:36