refactor: extract ChunkIngressService from mempool#1155

Merged
roberts-pumpurs merged 24 commits into master from rob/chunk-ingress-service
Feb 23, 2026

@roberts-pumpurs roberts-pumpurs commented Feb 18, 2026

Contributor

Summary

Extracts chunk ingestion and ingress proof handling out of MempoolService into a dedicated ChunkIngressService, simplifying the mempool by separating concerns.

The mempool previously handled data transactions, commitment transactions, block confirmation, and all chunk data ingestion in a single service. This refactoring moves chunk-related responsibilities into their own service while the mempool retains transaction-level logic only.

Key changes:

  • New ChunkIngressService with its own message loop, concurrency control (semaphore-based), and periodic sweep timer
  • Moved chunks, ingress_proofs, pending_chunks, and chunk metrics modules from mempool into the new service
  • New ChunkIngressFacadeImpl for P2P gossip handler integration
  • New max_concurrent_chunk_ingress_tasks config field for independent concurrency tuning
  • All chunk entry points (HTTP upload, gossip, block validation, data sync) now route to ChunkIngressService instead of mempool
  • Mempool notifies ChunkIngressService via ProcessPendingChunks message when a data TX is validated, so buffered chunks waiting on their TX header can be processed

Communication between services:

  • MempoolService → ChunkIngressService: ProcessPendingChunks(DataRoot) after TX validation
  • MempoolService → ChunkIngressService: GetPendingChunksCount for status reporting
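
The two messages above could be modelled roughly as follows. This is a std-only sketch, not the service's actual code: `DataRoot` is reduced to a byte array, `std::sync::mpsc` stands in for the async channels, and `run_ingress_loop` with its `pending` map is a hypothetical helper introduced for illustration.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{Receiver, Sender};

/// Stand-in for the real `DataRoot` type (assumption: a 32-byte hash).
pub type DataRoot = [u8; 32];

/// Subset of the messages the mempool sends to the chunk ingress service.
pub enum ChunkIngressMessage {
    /// Sent after a data TX is validated so buffered chunks can be replayed.
    ProcessPendingChunks(DataRoot),
    /// Request/response query used for status reporting.
    GetPendingChunksCount(Sender<usize>),
}

/// Hypothetical message loop: drains the channel until every sender is dropped.
/// `pending` maps a data root to the number of chunks buffered for it.
/// Returns the total number of chunks that were replayed.
pub fn run_ingress_loop(
    rx: Receiver<ChunkIngressMessage>,
    mut pending: HashMap<DataRoot, usize>,
) -> usize {
    let mut processed = 0;
    for msg in rx {
        match msg {
            ChunkIngressMessage::ProcessPendingChunks(root) => {
                // The TX header is now known, so its buffered chunks can be replayed.
                processed += pending.remove(&root).unwrap_or(0);
            }
            ChunkIngressMessage::GetPendingChunksCount(reply) => {
                // Ignore the error if the requester stopped waiting for the answer.
                let _ = reply.send(pending.values().sum());
            }
        }
    }
    processed
}
```

Keeping both messages one-way from the mempool's perspective means the chunk ingress side never needs a handle back into the mempool.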

Summary by CodeRabbit

  • New Features

    • Adds a dedicated chunk ingress service with messaging, facade, state handle, public errors, pending-chunks view, and API exposure; new config to control max concurrent chunk-ingress tasks with runtime validation.
  • Refactor

    • Routes chunk ingestion, ingress-proof handling, and pending-chunk processing to the new service; mempool surface simplified and status now sourced from chunk-ingress state; wiring updated across API, gossip, p2p, and runtime.
  • Chores

    • Tests and test scaffolding updated to use the new ingress channel and consume chunk-ingress messages.

Move all chunk validation, caching, storage writes, ingress proof
generation/validation, and pending chunk management out of the mempool
into a dedicated ChunkIngressService actor. The mempool's only remaining
link is a one-way ProcessPendingChunks message when a data TX is ingested.

Key changes:
- New chunk_ingress_service module with chunks, ingress_proofs,
  pending_chunks, and facade submodules
- All callers (API, P2P gossip, data sync, block validation) now route
  chunk/proof messages through the chunk_ingress channel
- cache_service uses ChunkIngressServiceInner for proof expiry checks
- GossipDataHandler gets a dedicated chunk_ingress field
- Mempool no longer owns storage_modules_guard, recent_valid_chunks,
  or pending_chunks state
@coderabbitai

coderabbitai Bot commented Feb 18, 2026

Contributor
Walkthrough

Extracts chunk ingestion and ingress-proof handling from the mempool into a new ChunkIngressService (with facade, messages, state, spawn logic), rewires call sites/tests to use it, removes pending-chunks state from mempool, and adds configuration for concurrent chunk-ingress tasks.

Changes

| Cohort | File(s) | Summary |
|---|---|---|
| Chunk ingress subsystem | crates/actors/src/chunk_ingress_service.rs, crates/actors/src/chunk_ingress_service/chunks.rs, crates/actors/src/chunk_ingress_service/facade.rs, crates/actors/src/chunk_ingress_service/ingress_proofs.rs, crates/actors/src/chunk_ingress_service/pending_chunks.rs, crates/actors/src/chunk_ingress_service/chunk_data_writer.rs | Adds new ChunkIngressService: message enum, service/state/inner, spawn logic, facade, ingress-proof error types, pending-chunks helper, and refactors chunk data writer internals/visibility and duplicate-tracking. |
| Mempool refactor | crates/actors/src/mempool_service.rs, crates/actors/src/mempool_service/data_txs.rs, crates/actors/src/mempool_service/facade.rs, crates/actors/src/mempool_service/types.rs | Removes chunk/ingress-proof variants and pending_chunks/recent_valid_chunks from mempool, delegates pending-chunk processing to ChunkIngressService, updates facade API and spawn signature to accept ChunkIngressState. |
| Service wiring & re-exports | crates/actors/src/lib.rs, crates/actors/src/services.rs, crates/actors/src/cache_service.rs | Adds chunk_ingress channel to ServiceSenders/ServiceReceivers, re-exports chunk-ingress types, updates imports to use ChunkIngressServiceInner and new module paths. |
| Call-site routing changes | crates/actors/src/block_validation.rs, crates/actors/src/data_sync_service.rs, crates/api-server/src/routes/post_chunk.rs, crates/chain/tests/utils.rs | Routes unpacked chunks and ingress-proofs to service_senders.chunk_ingress using ChunkIngressMessage variants (IngestChunk/IngestIngressProof/ProcessPendingChunks); updates error/log messages and oneshot usage. |
| Chain & API integration | crates/chain/src/chain.rs, crates/api-server/src/lib.rs | Spawns ChunkIngressService at startup, threads ChunkIngressState into IrysNodeCtx/ApiState, wires handle into runtime services, and updates mempool spawn signature. |
| P2P / Gossip wiring & tests | crates/p2p/src/gossip_data_handler.rs, crates/p2p/src/gossip_service.rs, crates/p2p/src/tests/*, crates/p2p/src/types.rs | Injects ChunkIngressFacadeImpl into gossip handlers, replaces mempool facade chunk methods with chunk_ingress facade, updates imports and test scaffolding to consume chunk_ingress messages. |
| Tests & fixtures | crates/chain/tests/*, crates/p2p/src/tests/* | Updates tests to send chunks/ingress-proofs via chunk_ingress channel, spawns test consumers for chunk_ingress, and removes mempool pending-count assertions. |
| Configuration | crates/types/src/config/mod.rs, crates/types/src/config/node.rs | Adds max_concurrent_chunk_ingress_tasks to mempool config, runtime validation in Config::validate, and defaults in testing and node config. |
| Mempool guard & data writer cleanup | crates/actors/src/mempool_guard.rs, crates/actors/src/chunk_ingress_service/chunk_data_writer.rs | Removes pending_chunk_count_for_data_root from MempoolReadGuard; narrows ChunkDataWriter visibility, drops pending-chunk counters, and simplifies duplicate tracking and flush behavior. |
| Misc / small updates | crates/actors/src/cache_service.rs, crates/actors/src/data_sync_service.rs, other small files | Namespace/import updates to use chunk_ingress modules, replace mempool-based sends with chunk_ingress messages, minor logging and re-export adjustments. |

Sequence Diagram

sequenceDiagram
    participant Client
    participant API as API Server
    participant Chunk as ChunkIngressService
    participant DB as Database
    participant Cache as Cache Service
    participant Mempool as Mempool Service

    autonumber

    rect rgba(200,150,255,0.5)
    note over Client,Cache: Old Flow (via Mempool)
    Client->>API: POST chunk
    API->>Mempool: MempoolServiceMessage::IngestChunk
    Mempool->>Mempool: process_pending_chunks_for_root
    Mempool->>DB: store chunk
    Mempool->>Cache: record_recent_valid_chunk
    end

    rect rgba(150,200,255,0.5)
    note over Client,Cache: New Flow (via ChunkIngressService)
    Client->>API: POST chunk
    API->>Chunk: ChunkIngressMessage::IngestChunk
    Chunk->>Chunk: handle_chunk_ingress_message
    Chunk->>DB: store chunk
    Chunk->>Cache: record_recent_valid_chunk
    Mempool->>Chunk: ChunkIngressMessage::ProcessPendingChunks (notify)
    end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Suggested reviewers

  • glottologist
  • antouhou
🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The PR title clearly and specifically describes the main refactoring: extracting chunk ingestion responsibilities from the mempool service into a dedicated ChunkIngressService. |
| Docstring Coverage | ✅ Passed | Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%. |

- Restore concurrent message handling with semaphore-gated spawning
- Move chunk metrics from mempool to chunk_ingress_service module
- Add error metrics on pending chunk replay path
- Add periodic 30s sweep timer for stranded pending chunks
- Downgrade ProcessPendingChunks send failure to warn (sweep retries)
- Query real pending_chunks_count from ChunkIngressService for status
- Remove ChunkIngressFacade trait, keep concrete impl
- Remove planning docs
@roberts-pumpurs roberts-pumpurs self-assigned this Feb 19, 2026

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 5

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
crates/actors/src/chunk_ingress_service/chunks.rs (1)

221-238: ⚠️ Potential issue | 🟠 Major

Consolidate the pre-header cap check with the insert under a single write lock to prevent race condition.

Between the read lock check (lines 220-226) and the write lock insert (line 238), concurrent ingests can bypass the cap. Move the check and insert under the same write lock as shown below to keep the DoS guard effective.

🔧 Suggested change
-                let current_chunk_count = self
-                    .pending_chunks
-                    .read()
-                    .await
-                    .get(&chunk.data_root)
-                    .map(lru::LruCache::len)
-                    .unwrap_or(0);
+                let mut pending = self.pending_chunks.write().await;
+                let current_chunk_count = pending
+                    .get(&chunk.data_root)
+                    .map(lru::LruCache::len)
+                    .unwrap_or(0);
                 if current_chunk_count >= preheader_chunks_per_item {
                     warn!(
                         "Dropping pre-header chunk for {} at offset {}: cache full ({}/{})",
                         &chunk.data_root,
                         &chunk.tx_offset,
                         current_chunk_count,
                         preheader_chunks_per_item
                     );
                     return Err(AdvisoryChunkIngressError::PreHeaderOffsetExceedsCap.into());
                 }
 
-                self.pending_chunks.write().await.put(chunk.clone());
+                pending.put(chunk.clone());
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/actors/src/chunk_ingress_service/chunks.rs` around lines 221 - 238,
The pre-header cap check currently uses a read lock on self.pending_chunks and
later a separate write lock for insertion, creating a race where concurrent
ingests can exceed preheader_chunks_per_item; change this by acquiring a write
lock on self.pending_chunks and perform both the get/len check and the
put(chunk.clone()) under that same write lock, preserving the warn(...) and
returning Err(AdvisoryChunkIngressError::PreHeaderOffsetExceedsCap.into()) when
current_chunk_count >= preheader_chunks_per_item, and removing the earlier
separate read lock usage.
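
A minimal sketch of the check-and-insert pattern described above, under simplifying assumptions: it uses `std::sync::RwLock` and a `Vec` per data root rather than the async lock and `lru::LruCache` in the real code, and `PendingChunks`, `try_insert`, `len_for`, and `CapExceeded` are hypothetical names.

```rust
use std::collections::HashMap;
use std::sync::RwLock;

/// Stand-in for the real `DataRoot` type (assumption: a 32-byte hash).
pub type DataRoot = [u8; 32];

/// Hypothetical error returned when the per-root cap is reached.
#[derive(Debug, PartialEq)]
pub struct CapExceeded;

/// Hypothetical pre-header buffer: raw chunk bytes grouped by data root.
pub struct PendingChunks {
    inner: RwLock<HashMap<DataRoot, Vec<Vec<u8>>>>,
    cap_per_root: usize,
}

impl PendingChunks {
    pub fn new(cap_per_root: usize) -> Self {
        Self { inner: RwLock::new(HashMap::new()), cap_per_root }
    }

    /// Checks the cap and inserts while holding a single write guard, so no
    /// other writer can slip in between the check and the insert.
    pub fn try_insert(&self, root: DataRoot, chunk: Vec<u8>) -> Result<(), CapExceeded> {
        let mut pending = self.inner.write().expect("lock poisoned");
        let current = pending.get(&root).map_or(0, Vec::len);
        if current >= self.cap_per_root {
            return Err(CapExceeded);
        }
        pending.entry(root).or_default().push(chunk);
        Ok(())
    }

    /// Number of chunks currently buffered for `root`.
    pub fn len_for(&self, root: &DataRoot) -> usize {
        self.inner.read().expect("lock poisoned").get(root).map_or(0, Vec::len)
    }
}
```

With the check and the insert under one guard, the cap holds no matter how many callers race on the same data root; the cost is that every ingest attempt takes the write lock, even the ones that end up rejected.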
crates/p2p/src/gossip_data_handler.rs (1)

239-253: ⚠️ Potential issue | 🟡 Minor

Update ingress-proof log text to reflect chunk ingress.

The log messages still say “mempool,” which is misleading after the routing change.

💡 Suggested update
-                debug!("Ingress Proof sent to mempool");
+                debug!("Ingress Proof sent to chunk ingress");
@@
-                error!("Error when sending ingress proof to mempool: {}", error);
+                error!("Error when sending ingress proof to chunk ingress: {}", error);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/p2p/src/gossip_data_handler.rs` around lines 239 - 253, The debug! and
error! log lines in the chunk_ingress handling branch still reference “mempool”;
update the messages in the block that calls
chunk_ingress.handle_ingest_ingress_proof(proof).await (the branch matching
Ok(()) | Err(GossipError::TransactionIsAlreadyHandled) and the Err(error) arm)
to refer to "chunk ingress" (or similar accurate wording) instead of "mempool"
so logs reflect the new routing; keep existing context (e.g., include the error
variable in error! and retain record_seen with
GossipCacheKey::IngressProof(proof_hash)).
crates/p2p/src/tests/util.rs (1)

1013-1038: ⚠️ Potential issue | 🟠 Major

Keep chunk_ingress receiver alive in handler stubs

data_handler_stub and data_handler_with_stubbed_pool create a ChunkIngressFacadeImpl but immediately drop service_receivers, closing the chunk_ingress channel. The handle_chunk() and handle_ingress_proof() methods invoke chunk_ingress operations that will fail if these code paths are exercised. Retain the receiver and drain it with a minimal stub spawned in a background task (matching the pattern already used in the fixture earlier in this file).

🔧 Suggested fix (apply similarly in both helpers)
-    let (service_senders, _service_receivers) =
-        irys_actors::test_helpers::build_test_service_senders();
+    let (service_senders, service_receivers) =
+        irys_actors::test_helpers::build_test_service_senders();
+    let mut chunk_ingress_rx = service_receivers.chunk_ingress;
+    tokio::spawn(async move {
+        use irys_actors::ChunkIngressMessage;
+        while let Some(message) = chunk_ingress_rx.recv().await {
+            match message {
+                ChunkIngressMessage::IngestChunk(_, reply) => {
+                    let _ = reply.send(Ok(()));
+                }
+                ChunkIngressMessage::IngestChunkFireAndForget(_) => {}
+                ChunkIngressMessage::IngestIngressProof(_, reply) => {
+                    let _ = reply.send(Ok(()));
+                }
+                ChunkIngressMessage::ProcessPendingChunks(_) => {}
+                ChunkIngressMessage::GetPendingChunksCount(reply) => {
+                    let _ = reply.send(0);
+                }
+            }
+        }
+    });

Also applies to: 1082-1089

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/p2p/src/tests/util.rs` around lines 1013 - 1038, The
ChunkIngressFacadeImpl created in data_handler_stub and
data_handler_with_stubbed_pool currently drops the service_receivers so
chunk_ingress channels close and subsequent calls to chunk_ingress in methods
like handle_chunk and handle_ingress_proof will fail; fix by keeping the
receiver alive: capture the service_receivers/receiver returned by
irys_actors::chunk_ingress_service::facade::ChunkIngressFacadeImpl::from(&service_senders)
alongside chunk_ingress and spawn a background task (as used earlier in this
file) that drains the receiver with a minimal stub loop so the channel remains
open for the lifetime of the GossipDataHandler/BlockPool stub instances. Ensure
references to ChunkIngressFacadeImpl, data_handler_stub,
data_handler_with_stubbed_pool, chunk_ingress, and service_receivers are updated
accordingly.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@crates/actors/src/chunk_ingress_service/facade.rs`:
- Around line 18-55: Both handle_chunk_ingress and handle_ingest_ingress_proof
currently call oneshot_rx.await.expect(...) which panics if the service task
dropped the responder; instead map the oneshot::error::RecvError into the
appropriate domain error and return Err. For handle_chunk_ingress replace the
expect with awaiting the oneshot and map Err(RecvError) into a ChunkIngressError
(e.g., wrap as CriticalChunkIngressError::Other or a new ChunkIngressError
variant) including chunk_data_root and chunk_tx_offset context; for
handle_ingest_ingress_proof map RecvError into IngressProofError::Other (include
data_root) and return that Result::Err. Apply the same pattern in the analogous
functions in mempool_service/facade.rs.
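
One way to picture the suggested error mapping, as a std-only sketch: `std::sync::mpsc` stands in for the oneshot channel, and `ChunkIngressError::ServiceUnavailable` and `await_reply` are hypothetical names, not the crate's actual variants or functions.

```rust
use std::sync::mpsc::Receiver;

/// Hypothetical domain error; the real `ChunkIngressError` has other variants.
#[derive(Debug, PartialEq)]
pub enum ChunkIngressError {
    /// The service dropped the responder before replying.
    ServiceUnavailable(String),
}

/// Waits for the service's reply and converts a dropped responder into a
/// domain error instead of panicking via `expect`.
pub fn await_reply(
    reply_rx: Receiver<Result<(), ChunkIngressError>>,
    data_root: &str,
    tx_offset: u32,
) -> Result<(), ChunkIngressError> {
    reply_rx
        .recv()
        // `recv` fails only when every sender was dropped without replying.
        .map_err(|_| {
            ChunkIngressError::ServiceUnavailable(format!(
                "responder dropped for chunk {data_root} at offset {tx_offset}"
            ))
        })?
}
```

The caller then sees an ordinary `Err` carrying the data root and offset, and can decide whether to retry or surface the failure.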

In `@crates/actors/src/chunk_ingress_service/mod.rs`:
- Around line 285-306: The timeout branch currently calls
inner.handle_message(msg).await which runs the handler inline and can block the
event loop; change this to avoid awaiting inline on the reactor thread. Replace
the Err(_) arm (the tokio::time::timeout result) so it either (a) immediately
send an error/dropped response for the message (use the message's response
mechanism) and continue, or (b) offload processing to a spawned background task
(tokio::spawn or tokio::task::spawn_blocking as appropriate) so
inner.handle_message(msg) runs off the event loop; update the code around
message_handler_semaphore, semaphore.acquire_owned(), and inner.handle_message
to implement one of these non-blocking behaviors.

In `@crates/actors/src/data_sync_service.rs`:
- Around line 257-266: The send to the chunk_ingress channel currently uses
expect and will panic if the receiver is closed; update the code in the data
sync path (after sm.write_data_chunk(&unpacked_chunk) failure) to handle send
errors gracefully instead of calling expect on
service_senders.chunk_ingress.send(ChunkIngressMessage::IngestChunkFireAndForget(unpacked_chunk)).
Replace the expect with a non-panicking branch (e.g., match or if let Err(err) =
...) that logs the failure (using the crate's logging/tracing facility)
including the error and chunk context, and then continues without crashing the
data sync task so behavior matches other places like post_chunk.rs and
storage_module_service.
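
The non-panicking branch might look like the sketch below. It is illustrative only: `std::sync::mpsc` replaces the real channel, the chunk is reduced to raw bytes, `eprintln!` stands in for the crate's tracing macros, and `forward_chunk` is a hypothetical helper.

```rust
use std::sync::mpsc::Sender;

/// Reduced message type for the sketch; the chunk is just raw bytes here.
pub enum ChunkIngressMessage {
    IngestChunkFireAndForget(Vec<u8>),
}

/// Forwards a chunk to chunk ingress. Returns `false` instead of panicking
/// when the receiving side has already shut down.
pub fn forward_chunk(tx: &Sender<ChunkIngressMessage>, chunk: Vec<u8>) -> bool {
    let len = chunk.len();
    match tx.send(ChunkIngressMessage::IngestChunkFireAndForget(chunk)) {
        Ok(()) => true,
        Err(err) => {
            // Log with context and carry on; the sync task keeps running.
            eprintln!("failed to forward {len}-byte chunk to chunk ingress: {err}");
            false
        }
    }
}
```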

In `@crates/actors/src/mempool_service.rs`:
- Around line 415-431: The pending chunks query in the mempool status path (call
to self.mempool_state.get_status) silently falls back to 0 when
service_senders.chunk_ingress.send or rx.await fails; add diagnostic logging on
both failure points so dropped/closed-channel and oneshot-receive errors are
visible. Specifically, inspect the result of
self.service_senders.chunk_ingress.send(crate::chunk_ingress_service::ChunkIngressMessage::GetPendingChunksCount(tx))
and log a warn/debug with context (e.g., mentioning chunk_ingress or
GetPendingChunksCount) if send returns Err, and also handle the rx.await result
explicitly instead of unwrap_or(0) so you can log when the oneshot receive
errors before keeping the 0 fallback. Ensure logs include enough context (node
id or config info) and preserve the existing fallback behavior.

In `@crates/api-server/src/routes/post_chunk.rs`:
- Around line 16-19: The doc comment wrongly says "mempool" — update it to
describe the chunk ingress service; change the summary sentence in the top doc
comment of the post_chunk handler (e.g., the function named post_chunk or
similar in post_chunk.rs) to say it handles HTTP POST requests for ingesting a
Chunk via the chunk ingress service, and ensure subsequent sentences reference
ChunkIngressMessage and "chunk ingress service" (not "mempool") throughout the
comment so documentation accurately reflects the service name.

---

Outside diff comments:
In `@crates/actors/src/chunk_ingress_service/chunks.rs`:
- Around line 221-238: The pre-header cap check currently uses a read lock on
self.pending_chunks and later a separate write lock for insertion, creating a
race where concurrent ingests can exceed preheader_chunks_per_item; change this
by acquiring a write lock on self.pending_chunks and perform both the get/len
check and the put(chunk.clone()) under that same write lock, preserving the
warn(...) and returning
Err(AdvisoryChunkIngressError::PreHeaderOffsetExceedsCap.into()) when
current_chunk_count >= preheader_chunks_per_item, and removing the earlier
separate read lock usage.

In `@crates/p2p/src/gossip_data_handler.rs`:
- Around line 239-253: The debug! and error! log lines in the chunk_ingress
handling branch still reference “mempool”; update the messages in the block that
calls chunk_ingress.handle_ingest_ingress_proof(proof).await (the branch
matching Ok(()) | Err(GossipError::TransactionIsAlreadyHandled) and the
Err(error) arm) to refer to "chunk ingress" (or similar accurate wording)
instead of "mempool" so logs reflect the new routing; keep existing context
(e.g., include the error variable in error! and retain record_seen with
GossipCacheKey::IngressProof(proof_hash)).

In `@crates/p2p/src/tests/util.rs`:
- Around line 1013-1038: The ChunkIngressFacadeImpl created in data_handler_stub
and data_handler_with_stubbed_pool currently drops the service_receivers so
chunk_ingress channels close and subsequent calls to chunk_ingress in methods
like handle_chunk and handle_ingress_proof will fail; fix by keeping the
receiver alive: capture the service_receivers/receiver returned by
irys_actors::chunk_ingress_service::facade::ChunkIngressFacadeImpl::from(&service_senders)
alongside chunk_ingress and spawn a background task (as used earlier in this
file) that drains the receiver with a minimal stub loop so the channel remains
open for the lifetime of the GossipDataHandler/BlockPool stub instances. Ensure
references to ChunkIngressFacadeImpl, data_handler_stub,
data_handler_with_stubbed_pool, chunk_ingress, and service_receivers are updated
accordingly.

Comment thread crates/actors/src/chunk_ingress_service/facade.rs Outdated
Comment thread crates/actors/src/chunk_ingress_service.rs Outdated
Comment thread crates/actors/src/data_sync_service.rs Outdated
Comment thread crates/actors/src/mempool_service.rs Outdated
Comment thread crates/api-server/src/routes/post_chunk.rs Outdated
@roberts-pumpurs roberts-pumpurs marked this pull request as ready for review February 19, 2026 18:10

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 5

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
crates/p2p/src/gossip_data_handler.rs (1)

42-48: 🧹 Nitpick | 🔵 Trivial

chunk_ingress is a concrete type while mempool is generic — consider a trait for testability.

GossipDataHandler uses a generic TMempoolFacade: MempoolFacade for the mempool but a concrete ChunkIngressFacadeImpl for chunk ingress. This makes it harder to mock chunk ingress in tests. Consider introducing a ChunkIngressFacade trait (mirroring MempoolFacade) and making this field generic, for consistency and testability.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/p2p/src/gossip_data_handler.rs` around lines 42 - 48,
GossipDataHandler currently takes a generic TMempoolFacade but a concrete
ChunkIngressFacadeImpl, which hinders mocking; introduce a ChunkIngressFacade
trait (analogous to MempoolFacade) and change the struct to accept a generic
TChunkIngress: ChunkIngressFacade (replace the chunk_ingress:
ChunkIngressFacadeImpl field with chunk_ingress: TChunkIngress), then update
constructors/impl blocks and any usages of GossipDataHandler, new(), and methods
that reference chunk_ingress to be generic over TChunkIngress so tests can
inject mocks.
crates/p2p/src/tests/util.rs (1)

996-997: ⚠️ Potential issue | 🟡 Minor

Dropped _service_receivers closes the chunk_ingress channel in test helpers.

data_handler_stub and data_handler_with_stubbed_pool both immediately drop _service_receivers after building service senders, which closes the underlying UnboundedReceiver<ChunkIngressMessage>. Any code path that calls chunk_ingress.handle_chunk_ingress() will fail with a send error on a closed channel. While current tests may not exercise these paths, this is a latent reliability gap.

The GossipServiceTestFixture already demonstrates the correct pattern: keep the receiver alive by spawning a drain task. Apply that same pattern here by extracting chunk_ingress_rx from service_receivers and spawning an async task to consume messages before the receiver would be dropped.

Suggested fix
 pub(crate) fn data_handler_stub(
     config: &Config,
     peer_list_guard: &PeerList,
     db: DatabaseProvider,
     sync_state: ChainSyncState,
 ) -> Arc<GossipDataHandler<MempoolStub, BlockDiscoveryStub>> {
     ...
-    let (service_senders, _service_receivers) =
+    let (service_senders, service_receivers) =
         irys_actors::test_helpers::build_test_service_senders();
+    // Keep chunk_ingress receiver alive so sends from the facade don't fail.
+    let mut chunk_ingress_rx = service_receivers.chunk_ingress;
+    let chunk_store_drain = Arc::clone(&mempool_stub.chunks);
+    tokio::spawn(async move {
+        use irys_actors::ChunkIngressMessage;
+        while let Some(msg) = chunk_ingress_rx.recv().await {
+            match msg {
+                ChunkIngressMessage::IngestChunk(chunk, reply) => {
+                    chunk_store_drain.write().unwrap().push(chunk);
+                    let _ = reply.send(Ok(()));
+                }
+                ChunkIngressMessage::IngestChunkFireAndForget(chunk) => {
+                    chunk_store_drain.write().unwrap().push(chunk);
+                }
+                ChunkIngressMessage::IngestIngressProof(_, reply) => { let _ = reply.send(Ok(())); }
+                ChunkIngressMessage::ProcessPendingChunks(_) => {}
+                ChunkIngressMessage::GetPendingChunksCount(reply) => { let _ = reply.send(0); }
+            }
+        }
+    });
     ...
 }

Apply the same pattern to data_handler_with_stubbed_pool.

Applies to lines 1013–1014 and 1065–1066, 1083–1084.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/p2p/src/tests/util.rs` around lines 996 - 997, data_handler_stub and
data_handler_with_stubbed_pool drop the `_service_receivers` returned by
irys_actors::test_helpers::build_test_service_senders(), which closes the
underlying UnboundedReceiver<ChunkIngressMessage> and causes sends to fail; fix
by extracting the chunk_ingress receiver from `service_receivers` (e.g., `let
mut chunk_ingress_rx = service_receivers.chunk_ingress;`) and spawn a background
task (tokio::spawn or similar) that drains it (loop receiving and ignoring
messages) so the receiver lives for the test lifetime—follow the same pattern
used in GossipServiceTestFixture to keep `chunk_ingress_rx` alive instead of
dropping `_service_receivers`.
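
The drain-task idea can be sketched without the test helpers. The version below is an assumption-laden stand-in: a plain thread and `std::sync::mpsc` replace `tokio::spawn` and the unbounded async channel, the message enum is reduced to two variants, and `spawn_drain_stub` is a hypothetical name.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread::{self, JoinHandle};

/// Reduced message type for the sketch (two of the real variants).
pub enum ChunkIngressMessage {
    IngestChunk(Vec<u8>, Sender<Result<(), String>>),
    GetPendingChunksCount(Sender<usize>),
}

/// Spawns a stub consumer that owns the receiver, so sends keep succeeding
/// for as long as the test holds a sender. The handle yields the number of
/// messages the stub consumed.
pub fn spawn_drain_stub() -> (Sender<ChunkIngressMessage>, JoinHandle<usize>) {
    let (tx, rx) = channel();
    let handle = thread::spawn(move || {
        let mut seen = 0;
        // The loop ends once every sender has been dropped.
        for msg in rx {
            seen += 1;
            match msg {
                ChunkIngressMessage::IngestChunk(_, reply) => {
                    let _ = reply.send(Ok(()));
                }
                ChunkIngressMessage::GetPendingChunksCount(reply) => {
                    let _ = reply.send(0);
                }
            }
        }
        seen
    });
    (tx, handle)
}
```

Because the stub answers every request, facade calls that wait on a reply complete instead of hanging or hitting a closed channel.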
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@crates/actors/src/chunk_ingress_service.rs`:
- Around line 51-62: ChunkIngressServiceInner currently exposes every field
publicly; tighten visibility to encapsulate internal state by changing the
fields (e.g., block_tree_read_guard, config, exec, irys_db,
message_handler_semaphore, service_senders, storage_modules_guard,
recent_valid_chunks, pending_chunks, chunk_data_writer) from pub to a more
restrictive scope such as pub(crate) or pub(super) (or make them private and add
explicit getters) and then update any external call sites that rely on direct
field access (if any) to use spawn_service-local helpers or new accessor
methods; locate uses via the ChunkIngressServiceInner type and adjust visibility
or API accordingly.
- Around line 143-164: In sweep_pending_chunks, the DB read result from
self.irys_db.view_eyre(...).ok().flatten().is_some() currently drops errors
silently; change this to explicitly match the Result from view_eyre and, on Err,
emit a trace-level log including the error (use tracing::trace!) before
continuing; keep the existing success path that calls
process_pending_chunks_for_root(data_root) when cached_data_root_by_data_root
returns Some. This should reference the sweep_pending_chunks method,
self.irys_db.view_eyre call, and the cached_data_root_by_data_root helper so you
can locate and update the error handling and add the trace log without changing
retry semantics.
- Around line 271-276: The spawn-on-each-tick in the timer handler can start
overlapping sweeps because each tick always calls
tokio::spawn(inner.sweep_pending_chunks()), so add a concurrency guard inside
the sweep path to skip starting a new run when one is active: introduce an
AtomicBool (e.g., sweep_running) or a tokio::sync::Mutex<()> and check it at the
start of sweep_pending_chunks() (or wrap the spawned closure) using
compare_exchange/set or try_lock(), return immediately if already held, and
clear/unlock when the sweep finishes (including on error); update references to
inner.sweep_pending_chunks() and the timer closure so the tick still spawns but
the guard prevents concurrent execution.
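
The `AtomicBool` guard suggested for the sweep could take roughly this shape. It is a sketch under assumptions: `SweepGuard` and `sweep_if_idle` are hypothetical names, the sweep is a synchronous closure, and an async version spawned per tick would likely share the flag through an `Arc<AtomicBool>`.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Hypothetical guard that lets at most one sweep run at a time.
pub struct SweepGuard<'a>(&'a AtomicBool);

impl<'a> SweepGuard<'a> {
    /// Returns `None` if a sweep is already running.
    pub fn try_acquire(flag: &'a AtomicBool) -> Option<Self> {
        flag.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .ok()
            .map(|_| SweepGuard(flag))
    }
}

impl Drop for SweepGuard<'_> {
    fn drop(&mut self) {
        // Runs on every exit path, including early returns and panics.
        self.0.store(false, Ordering::Release);
    }
}

/// Runs `sweep` unless another sweep is in flight; returns whether it ran.
pub fn sweep_if_idle(flag: &AtomicBool, sweep: impl FnOnce()) -> bool {
    match SweepGuard::try_acquire(flag) {
        Some(_guard) => {
            // `_guard` stays alive until the end of this arm.
            sweep();
            true
        }
        None => false,
    }
}
```

Clearing the flag in `Drop` rather than at the end of the sweep body means an error or early return cannot leave the flag stuck and block all later sweeps.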

In `@crates/actors/src/lib.rs`:
- Line 31: ChunkIngressServiceInner is unnecessarily exported publicly from the
crate; change its visibility to pub(crate) where it is defined and remove the
public re-export from lib.rs so it is only accessible inside the actors crate
(ensure all internal uses in cache_service.rs, mempool_service.rs, and the chunk
ingress service modules still reference ChunkIngressServiceInner without
changing paths). Update the struct definition to pub(crate) and delete the
ChunkIngressServiceInner entry from the public re-exports in lib.rs so
compilation and internal references remain intact.

In `@crates/actors/src/mempool_service.rs`:
- Around line 1244-1249: The call to
crate::chunk_ingress_service::ChunkIngressServiceInner::is_ingress_proof_expired_static(...)
in get_publish_txs_and_proofs is correct but overly verbose; add a local use
(e.g., use crate::chunk_ingress_service::ChunkIngressServiceInner;) or a short
alias near the top of the file and replace the fully qualified call with
ChunkIngressServiceInner::is_ingress_proof_expired_static(...) to improve
readability and reduce clutter in this hot path.

---

Outside diff comments:
In `@crates/p2p/src/gossip_data_handler.rs`:
- Around line 42-48: GossipDataHandler currently takes a generic TMempoolFacade
but a concrete ChunkIngressFacadeImpl, which hinders mocking; introduce a
ChunkIngressFacade trait (analogous to MempoolFacade) and change the struct to
accept a generic TChunkIngress: ChunkIngressFacade (replace the chunk_ingress:
ChunkIngressFacadeImpl field with chunk_ingress: TChunkIngress), then update
constructors/impl blocks and any usages of GossipDataHandler, new(), and methods
that reference chunk_ingress to be generic over TChunkIngress so tests can
inject mocks.

In `@crates/p2p/src/tests/util.rs`:
- Around line 996-997: data_handler_stub and data_handler_with_stubbed_pool drop
the `_service_receivers` returned by
irys_actors::test_helpers::build_test_service_senders(), which closes the
underlying UnboundedReceiver<ChunkIngressMessage> and causes sends to fail; fix
by extracting the chunk_ingress receiver from `service_receivers` (e.g., `let
mut chunk_ingress_rx = service_receivers.chunk_ingress;`) and spawn a background
task (tokio::spawn or similar) that drains it (loop receiving and ignoring
messages) so the receiver lives for the test lifetime—follow the same pattern
used in GossipServiceTestFixture to keep `chunk_ingress_rx` alive instead of
dropping `_service_receivers`.
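
The closed-channel failure described above is general channel behaviour and can be reproduced without the project code. The following is a minimal sketch, assuming `std::sync::mpsc` and a plain thread as stand-ins for the tokio unbounded channel and `tokio::spawn` used by the test helpers; `Msg` and both function names are hypothetical.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical stand-in for `ChunkIngressMessage`.
#[derive(Debug)]
pub struct Msg(pub u32);

/// Once the receiver has been dropped, every later send fails.
pub fn send_after_receiver_dropped() -> bool {
    let (tx, rx) = mpsc::channel::<Msg>();
    drop(rx); // mirrors the receivers going out of scope when the stub returns
    tx.send(Msg(1)).is_ok()
}

/// Moving the receiver into a draining thread keeps the channel open.
/// Returns how many messages the drain thread consumed.
pub fn send_with_drain_thread(count: u32) -> usize {
    let (tx, rx) = mpsc::channel::<Msg>();
    // Receive and ignore messages until all senders are gone.
    let drain = thread::spawn(move || rx.iter().count());
    for i in 0..count {
        tx.send(Msg(i)).expect("receiver is alive, so send succeeds");
    }
    drop(tx); // closing the last sender ends the drain loop
    drain.join().expect("drain thread panicked")
}
```

In the test stubs the same idea would apply to `service_receivers.chunk_ingress`: the receiver has to be owned by something that outlives the handler under test.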

---

Duplicate comments:
In `@crates/actors/src/chunk_ingress_service.rs`:
- Around line 293-314: The fallback branch currently calls
inner.handle_message(msg).await inline after the 60s timeout, which blocks the
select! loop; change that to spawn a background task instead: when Err(_) from
tokio::time::timeout on semaphore.acquire_owned(), call tokio::spawn(async move
{ inner.handle_message(msg).await }) (ensure msg and inner are cloned/moved so
ownership is correct) and log that processing was spawned after timeout; keep
the existing error log message but remove the inline await so shutdown/draining
is not blocked.

In `@crates/actors/src/chunk_ingress_service/facade.rs`:
- Line 33: Replace the panic-causing oneshot_rx.await.expect(...) calls in
handle_chunk_ingress and handle_ingest_ingress_proof with a mapped error path:
await the oneshot receiver and use .map_err(...) (or match the Result) to
convert the oneshot::error::RecvError into the appropriate domain error type
returned by these functions, following the get_block_header pattern in
mempool_service/facade.rs (i.e., return the domain error instead of panicking
when the response channel is dropped); ensure you reference the same error
variant/type used by get_block_header so callers receive a proper Err instead of
a panic.

In `@crates/actors/src/data_sync_service.rs`:
- Around line 257-266: The code currently calls
self.service_senders.chunk_ingress.send(...).expect("to send
ChunkIngressMessage"), which will panic if the channel is closed; change this to
handle the Result from send returned by
ChunkIngressMessage::IngestChunkFireAndForget gracefully (e.g., match or if let
Err(e) => log a warning/info and continue) instead of panicking; specifically
update the branch that follows sm.write_data_chunk(&unpacked_chunk).is_err() to
check the send result from self.service_senders.chunk_ingress and handle Err by
logging (using the existing logger/tracing) and skipping the panic so the data
sync task can shut down cleanly.

In `@crates/actors/src/mempool_service.rs`:
- Around line 416-434: The send to chunk_ingress in handle_get_mempool_status
currently drops into a silent fallback setting pending_chunks_count to 0 when
send fails; update the failure branch where
self.service_senders.chunk_ingress.send(...).is_ok() is false to emit a
diagnostic log (e.g., tracing::warn! or debug!) mentioning
ChunkIngressMessage::GetPendingChunksCount send failure and include contextual
info (self identity or node_config) so you can detect communication issues while
keeping the existing rx.await.unwrap_or(0) fallback behavior for
pending_chunks_count.

In `@crates/api-server/src/routes/post_chunk.rs`:
- Line 17: Update the doc comment that currently says "adding a chunk to the
mempool" to reference the chunk ingress service instead; locate the module/file
post_chunk.rs and the HTTP handler function (e.g., post_chunk or
handle_post_chunk) and change the comment to something like "Handles the HTTP
POST request for adding a chunk to the chunk ingress service" so the
documentation accurately reflects the service being used.

Comment thread crates/actors/src/chunk_ingress_service.rs Outdated
Comment thread crates/actors/src/chunk_ingress_service.rs Outdated
Comment thread crates/actors/src/chunk_ingress_service.rs Outdated
Comment thread crates/actors/src/lib.rs Outdated
Comment thread crates/actors/src/mempool_service.rs Outdated
…l mempool

- Remove sweep_pending_chunks periodic timer (new behavior not in original)
- Remove data_roots() method from PriorityPendingChunks (only needed by sweep)
- Add per-message tracing span (chunk_ingress_handle_message)
- Add wait_with_progress deadlock detection (20s warning interval)
- Add variant_name() to ChunkIngressMessage for structured tracing
- Add tracing::error! on fire-and-forget chunk ingress failures
- Add shutdown message draining with 10s timeout before flush
- Match original semaphore timeout behavior (drop message, don't block loop)

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@crates/actors/src/chunk_ingress_service.rs`:
- Around line 362-371: The EnteredSpan guard returned by span.enter() is being
dropped immediately by using "let _ = span.enter()", so the span is not active
during the subsequent warn!; change the code in the loop to bind the guard to a
variable that lives until after the warn! (e.g., let _guard = span.enter();) so
the span is active for the warn! call (then let it drop afterwards or explicitly
drop(_guard)); this affects the select branch that handles ticker.tick() where
span.enter(), warn!, ticker.tick(), and fut are used.
- Around line 200-203: Add a validation check for the config field used by
max_valid_chunks so it can never be zero: in the Config::validate() function
(crates/types/src/config/mod.rs) ensure the value backing max_valid_chunks is >
0 and return a clear validation error if it equals 0; this prevents panics from
NonZeroUsize::new(max_valid_chunks).unwrap() in chunk_ingress_service.rs and
enforces the invariant at startup rather than at use sites.
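
The guard-lifetime point in the first comment above is a property of Rust's `let _ =` pattern, not of tracing specifically. A minimal sketch, assuming a hypothetical `Guard` type that records enter/exit events in place of a real span guard, shows the difference in ordering:

```rust
use std::cell::RefCell;
use std::rc::Rc;

type Log = Rc<RefCell<Vec<&'static str>>>;

/// Hypothetical stand-in for a span guard: logs "enter" on creation
/// and "exit" when dropped.
pub struct Guard {
    log: Log,
}

impl Guard {
    pub fn enter(log: &Log) -> Guard {
        log.borrow_mut().push("enter");
        Guard { log: Rc::clone(log) }
    }
}

impl Drop for Guard {
    fn drop(&mut self) {
        self.log.borrow_mut().push("exit");
    }
}

/// `let _ = ...` does not bind the value, so the guard is dropped at the
/// end of that statement, before the "warn" event.
pub fn with_underscore() -> Vec<&'static str> {
    let log: Log = Rc::new(RefCell::new(Vec::new()));
    let _ = Guard::enter(&log);
    log.borrow_mut().push("warn");
    let events = log.borrow().clone();
    events
}

/// `let _guard = ...` keeps the guard alive until the end of the scope,
/// so "warn" is recorded while the guard is still active.
pub fn with_named_guard() -> Vec<&'static str> {
    let log: Log = Rc::new(RefCell::new(Vec::new()));
    {
        let _guard = Guard::enter(&log);
        log.borrow_mut().push("warn");
    }
    let events = log.borrow().clone();
    events
}
```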

---

Duplicate comments:
In `@crates/actors/src/chunk_ingress_service.rs`:
- Around line 280-308: The select! loop must not await semaphore.acquire_owned
with timeout inline; instead, when try_acquire_owned returns NoPermits, spawn a
background task (via runtime_handle.spawn) that performs the timeouted
semaphore.acquire_owned and, on success, calls
wait_with_progress(inner.handle_message(msg), 20, &task_info). Move the match
that handles tokio::time::timeout(...) into that spawned task, ensure msg and
msg_type are moved or cloned into the task, and keep the existing error logs for
Err(err) and timeout cases so the main event loop remains responsive to shutdown
signals.

Comment thread crates/actors/src/chunk_ingress_service.rs
Comment thread crates/actors/src/chunk_ingress_service.rs Outdated
- Replace .expect() panics with proper error propagation in facade.rs
  and data_sync_service.rs
- Fix TOCTOU race in pre-header chunk cap check (single write lock)
- Move semaphore timeout+acquire into spawned task to avoid blocking
  the service event loop
- Fix span.enter() guard lifetime in wait_with_progress
- Tighten ChunkIngressServiceInner visibility to pub(crate)
- Add diagnostic logging for chunk_ingress channel failures in
  mempool status path
- Add config validation for zero-value fields that would panic
- Keep chunk_ingress receiver alive in p2p test stubs
- Update stale "mempool" references in docs and log messages
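
The zero-value validation item in the list above can be sketched as follows. This is an illustration only: the `Config` struct here is a hypothetical one-field subset, and the error type is a plain `String` rather than whatever the project's `Config::validate()` actually returns.

```rust
use std::num::NonZeroUsize;

/// Hypothetical config subset; the field name mirrors the review comment.
pub struct Config {
    pub max_valid_chunks: usize,
}

impl Config {
    /// Reject zero at startup so later `NonZeroUsize` conversions cannot fail.
    pub fn validate(&self) -> Result<(), String> {
        if self.max_valid_chunks == 0 {
            return Err("max_valid_chunks must be greater than 0".to_string());
        }
        Ok(())
    }

    /// Fallible conversion for use sites that would otherwise call
    /// `NonZeroUsize::new(..).unwrap()`.
    pub fn cache_capacity(&self) -> Result<NonZeroUsize, String> {
        NonZeroUsize::new(self.max_valid_chunks)
            .ok_or_else(|| "max_valid_chunks must be greater than 0".to_string())
    }
}
```

Checking once in `validate()` keeps the invariant in one place; the fallible accessor is an optional second line of defence.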

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 3

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@crates/actors/src/chunk_ingress_service.rs`:
- Around line 114-156: Change the visibility of the two helper functions to
crate-only to match the containing type and silence the unreachable_pub lint:
update get_latest_block_height_static and get_anchor_height_static from pub to
pub(crate). Ensure any internal callers still resolve (they are within the same
module) and leave all implementations unchanged.

In `@crates/actors/src/mempool_service.rs`:
- Around line 1988-1992: AtomicMempoolState::get_status currently returns
MempoolStatus with pending_chunks_count set to 0, which is later patched by
Inner::handle_get_mempool_status; update the code and docs so callers know this
field is a placeholder: add a doc-comment on AtomicMempoolState::get_status
stating that MempoolStatus.pending_chunks_count is a placeholder and must be
enriched by the caller (or rename the field to something like
pending_chunks_count_placeholder) and adjust any callers or tests that assume
the value is accurate to either call the MempoolServiceMessage::GetMempoolStatus
path or enrich the count themselves.

In `@crates/p2p/src/tests/util.rs`:
- Around line 369-410: The chunk-ingress consumer logic for handling
ChunkIngressMessage is duplicated across three places (this block using
chunk_store, plus data_handler_stub and data_handler_with_stubbed_pool); extract
it into a single helper like spawn_test_chunk_ingress_consumer(rx,
Option<Arc<RwLock<Vec<UnpackedChunk>>>>) that takes the chunk_ingress_receiver
and an optional chunk_store, moves the same match handling (IngestChunk,
IngestChunkFireAndForget, IngestIngressProof, ProcessPendingChunks,
GetPendingChunksCount) into the helper, and replace the three inline
tokio::spawn blocks with calls to this helper so the variant handling is
centralized and the implementation in this file uses the helper with
Some(chunk_store) while the other stubs pass None.

---

Duplicate comments:
In `@crates/actors/src/mempool_service.rs`:
- Around line 1255-1261: The call to
ChunkIngressServiceInner::is_ingress_proof_expired_static(...) was shortened
using the import, which is fine; ensure the function is actually imported into
scope (e.g., a `use` bringing
ChunkIngressServiceInner::is_ingress_proof_expired_static into the module) so
the short form resolves, and verify the types passed
(self.block_tree_read_guard, self.irys_db, self.config, cached_proof.proof)
match the function signature to avoid compile errors; if the import is missing,
add the `use` for ChunkIngressServiceInner or qualify the call with the full
path.
- Around line 419-445: The change correctly queries ChunkIngressService and
handles both send and receive failures by logging warnings and falling back to
the status from mempool_state.get_status; leave handle_get_mempool_status as-is:
keep the call to self.mempool_state.get_status(&self.config.node_config).await,
the oneshot channel/tx/rx, the send via
self.service_senders.chunk_ingress.send(ChunkIngressMessage::GetPendingChunksCount(tx))
with a warn! on Err, and the match on rx.await that sets
status.pending_chunks_count on Ok or logs warn! on Err, then returns Ok(status).

Comment thread crates/actors/src/mempool_service.rs
Comment thread crates/p2p/src/tests/util.rs Outdated

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
crates/actors/src/chunk_ingress_service/chunks.rs (2)

374-392: ⚠️ Potential issue | 🟡 Minor

Invalid chunk sizes return Ok(()) instead of an error — silent acceptance.

Lines 381 and 391 return Ok(()) for chunks with incorrect sizes. The caller interprets Ok as successful ingress. While the error is logged, returning success can confuse upstream callers (e.g., HTTP upload returning 200 for a malformed chunk) and prevents the caller from distinguishing "accepted" from "rejected silently".

Consider returning a CriticalChunkIngressError variant instead.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/actors/src/chunk_ingress_service/chunks.rs` around lines 374 - 392,
The code currently logs invalid chunk sizes inside the chunk validation block
(using variables chunk_len, chunk_size, is_last_chunk and chunk.tx_offset) but
returns Ok(()) which silently treats malformed chunks as accepted; change those
early returns to return Err(CriticalChunkIngressError::InvalidChunkSize {
tx_offset: chunk.tx_offset, chunk_len, expected: if is_last_chunk {
Some(chunk_size) } else { None } }) (or the project’s existing
CriticalChunkIngressError variant that best matches) so the caller receives an
error instead of Ok—update both the "Last chunk exceeds max size" and "Non-last
chunk has wrong size" branches in the function containing this logic.
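
As a rough illustration of the change requested above, the size check could return a typed error in both branches. `ChunkSizeError` and `check_chunk_size` are hypothetical names introduced for this sketch; the project's `CriticalChunkIngressError` variants may look different.

```rust
/// Hypothetical error type standing in for a `CriticalChunkIngressError` variant.
#[derive(Debug, PartialEq, Eq)]
pub enum ChunkSizeError {
    /// The last chunk may be shorter than `chunk_size`, but never longer.
    LastChunkTooLarge { len: usize, max: usize },
    /// Every non-last chunk must be exactly `chunk_size` bytes.
    WrongSize { len: usize, expected: usize },
}

/// Returns `Err` for malformed sizes instead of logging and returning `Ok(())`.
pub fn check_chunk_size(
    len: usize,
    chunk_size: usize,
    is_last_chunk: bool,
) -> Result<(), ChunkSizeError> {
    if is_last_chunk {
        if len > chunk_size {
            return Err(ChunkSizeError::LastChunkTooLarge { len, max: chunk_size });
        }
    } else if len != chunk_size {
        return Err(ChunkSizeError::WrongSize { len, expected: chunk_size });
    }
    Ok(())
}
```

With a distinct error, an HTTP handler can map the rejection to a client error status rather than reporting success.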

854-857: ⚠️ Potential issue | 🟠 Major

Production assert_eq! will panic-crash the node on data inconsistency.

Lines 856–857 use assert_eq!, which (unlike debug_assert_eq!) still panics in release builds. If a chunk is added or removed between the count query and proof generation (a narrow but possible race), the node crashes instead of returning an error.

🛡️ Suggested: return an error instead of panicking
-    assert_eq!(actual_data_size, size);
-    assert_eq!(actual_chunk_count, expected_chunk_count);
+    if actual_data_size != size {
+        return Err(eyre::eyre!(
+            "data_size mismatch for {data_root}: expected {size}, got {actual_data_size}"
+        ));
+    }
+    if actual_chunk_count != expected_chunk_count {
+        return Err(eyre::eyre!(
+            "chunk_count mismatch for {data_root}: expected {expected_chunk_count}, got {actual_chunk_count}"
+        ));
+    }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/actors/src/chunk_ingress_service/chunks.rs` around lines 854 - 857,
Replace the production assert_eq! checks comparing actual_data_size==size and
actual_chunk_count==expected_chunk_count with guarded checks that return a
Result::Err instead of panicking; locate the verification block (the code that
computes actual_data_size, actual_chunk_count from &proof.proof and &data_root)
and change each assertion to an if mismatch { return Err(...) } that
constructs/propagates an appropriate error (e.g., a ChunkIngressError or an
eyre::Report, matching the eyre::eyre! calls in the suggested diff) containing
both expected and actual values and contextual info so
the caller can handle the inconsistency instead of crashing the node.
crates/actors/src/chunk_ingress_service/ingress_proofs.rs (1)

341-363: 🧹 Nitpick | 🔵 Trivial

Add documentation clarifying blocking call requirements.

generate_and_store_ingress_proof() contains a blocking call to std::sync::mpsc::recv() at line 354. While both current call sites (chunks.rs:579 and cache_service.rs) are safe because they execute in blocking contexts (spawn_blocking and std::thread::spawn respectively), the function has no documentation indicating this constraint. Future maintainers might call it from an async context without realizing it would block the executor. Add a doc comment noting that this function must only be called from blocking contexts.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/actors/src/chunk_ingress_service/ingress_proofs.rs` around lines 341 -
363, Add a doc comment to the function generate_and_store_ingress_proof
explaining it performs a blocking receive on std::sync::mpsc::Receiver (uses
response_receiver.recv()) and therefore must only be called from a blocking
context (e.g., std::thread::spawn or tokio::task::spawn_blocking); update the
function's top-level docstring near its signature to explicitly state this
constraint and the rationale so future callers do not invoke it from async
executors.
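
A doc comment of the kind requested above might look like the following sketch. The function here is a hypothetical, simplified stand-in (it only waits on a `std::sync::mpsc::Receiver<u64>`), not the real `generate_and_store_ingress_proof` signature.

```rust
use std::sync::mpsc;
use std::thread;

/// Waits for a result produced by a worker.
///
/// # Blocking
///
/// This calls [`mpsc::Receiver::recv`], which parks the current thread until
/// a value arrives or every sender is dropped. Call it only from a blocking
/// context (a dedicated thread, or a blocking task of an async runtime);
/// calling it directly inside an async task would stall that executor thread.
pub fn wait_for_result(rx: mpsc::Receiver<u64>) -> Result<u64, mpsc::RecvError> {
    rx.recv()
}

/// Example call site: the blocking wait runs on its own thread.
pub fn run_on_dedicated_thread() -> u64 {
    let (tx, rx) = mpsc::channel();
    let waiter = thread::spawn(move || wait_for_result(rx));
    tx.send(42).expect("waiter holds the receiver");
    waiter
        .join()
        .expect("waiter thread panicked")
        .expect("sender was dropped before sending")
}
```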
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@crates/actors/src/chunk_ingress_service.rs`:
- Around line 296-304: The timeout branch currently drops the whole message
(including any oneshot responder) which causes callers to get a closed channel
error; update the Err(_) timeout arm in chunk_ingress_service.rs to explicitly
respond before dropping: match the incoming message (the enum variants
IngestChunk and IngestIngressProof) and, if it contains a
oneshot::Sender/responder, send a descriptive error (e.g., a Backpressure or
Timeout variant) on that channel (using try_send/send and ignoring send errors),
then log the timeout as before; ensure you reference the message enum
(IngestChunk, IngestIngressProof) and the responder field when implementing this
change.

---

Outside diff comments:
In `@crates/actors/src/chunk_ingress_service/chunks.rs`:
- Around line 374-392: The code currently logs invalid chunk sizes inside the
chunk validation block (using variables chunk_len, chunk_size, is_last_chunk and
chunk.tx_offset) but returns Ok(()) which silently treats malformed chunks as
accepted; change those early returns to return
Err(CriticalChunkIngressError::InvalidChunkSize { tx_offset: chunk.tx_offset,
chunk_len, expected: if is_last_chunk { Some(chunk_size) } else { None } }) (or
the project’s existing CriticalChunkIngressError variant that best matches) so
the caller receives an error instead of Ok—update both the "Last chunk exceeds
max size" and "Non-last chunk has wrong size" branches in the function
containing this logic.
- Around line 854-857: Replace the production assert_eq! checks comparing
actual_data_size==size and actual_chunk_count==expected_chunk_count with guarded
checks that return a Result::Err instead of panicking; locate the verification
block (the code that computes actual_data_size, actual_chunk_count from
&proof.proof and &data_root) and change each assertion to an if mismatch {
return Err(...) } that constructs/propagates an appropriate error (e.g., a
ChunkIngressError or an eyre::Report, matching the eyre::eyre! calls in the
suggested diff) containing both expected and actual values
and contextual info so the caller can handle the inconsistency instead of
crashing the node.

In `@crates/actors/src/chunk_ingress_service/ingress_proofs.rs`:
- Around line 341-363: Add a doc comment to the function
generate_and_store_ingress_proof explaining it performs a blocking receive on
std::sync::mpsc::Receiver (uses response_receiver.recv()) and therefore must
only be called from a blocking context (e.g., std::thread::spawn or
tokio::task::spawn_blocking); update the function's top-level docstring near its
signature to explicitly state this constraint and the rationale so future
callers do not invoke it from async executors.

Comment thread crates/actors/src/chunk_ingress_service.rs Outdated
Replace the GetPendingChunksCount message/oneshot pattern with a shared
ChunkIngressState read handle backed by Arc<RwLock<PriorityPendingChunks>>.
This avoids unnecessary round-trips through the message loop for a simple
read, matching the AtomicMempoolState pattern already used by the mempool.
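
The shared read-handle pattern described in this commit message can be sketched roughly as below. This assumes `std::sync::RwLock` (the service may well use an async lock instead), and `PendingChunks` with its `by_root` map and the `buffer_chunk` method are hypothetical stand-ins for `PriorityPendingChunks` and its internals.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for `PriorityPendingChunks`:
/// chunks buffered per data root while their TX header is missing.
#[derive(Default)]
pub struct PendingChunks {
    by_root: HashMap<[u8; 32], Vec<Vec<u8>>>,
}

/// Cheaply cloneable handle; every clone observes the same buffer.
#[derive(Clone, Default)]
pub struct ChunkIngressState {
    pending: Arc<RwLock<PendingChunks>>,
}

impl ChunkIngressState {
    /// Direct read, with no message sent through the service loop.
    pub fn pending_chunks_count(&self) -> usize {
        let guard = self.pending.read().expect("lock poisoned");
        guard.by_root.values().map(Vec::len).sum()
    }

    /// Service-side write path (illustrative only).
    pub fn buffer_chunk(&self, data_root: [u8; 32], chunk: Vec<u8>) {
        let mut guard = self.pending.write().expect("lock poisoned");
        guard.by_root.entry(data_root).or_default().push(chunk);
    }
}
```

A status reporter holding a clone of the handle can then read the count synchronously, which is the round-trip saving the commit message refers to.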

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 3

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@crates/actors/src/chunk_ingress_service.rs`:
- Around line 292-316: When try_acquire_owned on message_handler_semaphore fails
with NoPermits we currently unconditionally runtime_handle.spawn a waiter that
captures msg/inner, which can lead to unbounded waiting tasks; add a bounded
pending-waiters guard (e.g., an AtomicUsize pending_waiters on
ChunkIngressServiceInner with a const MAX_PENDING_WAITERS) and check/increment
it before spawning (decrement in all waiter exit paths), and if the limit is
reached either process the message inline with a timeout using
tokio::time::timeout around inner.handle_message(msg) (via wait_with_progress)
or drop/log the message and return early; update the spawn branch that calls
semaphore.acquire_owned()/wait_with_progress to only run when the pending
counter was successfully incremented and ensure decrement happens on Ok(permit),
Ok(Err), and Err(_) paths to avoid leaks.
- Around line 214-221: Remove the unused ChunkDataWriter instance spawned inside
MempoolService: locate the call to chunk_data_writer::ChunkDataWriter::spawn
(and any stored field or variable holding that writer) in MempoolService, delete
the spawn invocation and the associated field/variable, and remove any unused
imports and shutdown/flush calls (e.g., flush() or queue_write() references)
related to that writer; ensure ChunkIngressService remains the single writer by
not altering its chunk_data_writer usage.

In `@crates/actors/src/mempool_service/types.rs`:
- Around line 10-13: MempoolStatus is currently populated in two phases
(AtomicMempoolState::get_status sets pending_chunks_count to 0 and callers must
call ChunkIngressState::pending_chunks_count()), which is brittle; change
AtomicMempoolState::get_status to accept either a &ChunkIngressState or a
pending_chunks_count: usize parameter and populate
MempoolStatus.pending_chunks_count inside get_status so the returned
MempoolStatus is always complete; update all callers of
AtomicMempoolState::get_status to pass the ChunkIngressState (or count) and
remove any external enrichment calls to
ChunkIngressState::pending_chunks_count(), and update the doc comment on
MempoolStatus to reflect the single-step construction.

---

Duplicate comments:
In `@crates/actors/src/chunk_ingress_service.rs`:
- Around line 286-317: The timeout path in the runtime_handle.spawn task
currently drops the message (including any oneshot::Sender) when
tokio::time::timeout(Duration::from_secs(60), semaphore.acquire_owned()) returns
Err, causing callers (e.g., those sending IngestChunk or IngestIngressProof) to
get a closed channel with no explanation; change the timeout branch to detect if
the message contains a response channel (inspect the Msg enum payload used by
inner.handle_message or the specific variants IngestChunk/IngestIngressProof)
and send an explicit error/timeout response on that oneshot::Sender before
allowing the message to be dropped, ensuring you handle send errors gracefully
and avoid double-sending.

In `@crates/actors/src/mempool_service.rs`:
- Around line 1969-1980: get_status currently hardcodes pending_chunks_count: 0
causing callers that call AtomicMempoolState::get_status directly to receive
stale data; update AtomicMempoolState::get_status to compute and populate
pending_chunks_count from the real in-memory field (e.g.,
state.pending_chunks.len() or whatever collection holds pending chunks) instead
of 0, and remove the special-case injection from
Inner::handle_get_mempool_status; after making the change, run the provided
ripgrep search to verify no remaining direct callers rely on the old placeholder
and add a short doc-comment on AtomicMempoolState::get_status noting it now
returns an accurate pending_chunks_count.

In `@crates/p2p/src/tests/util.rs`:
- Around line 369-407: The chunk-ingress consumer logic for handling
ChunkIngressMessage is duplicated; extract it into a single helper (e.g., an
async fn consume_chunk_ingress(mut receiver: Receiver<ChunkIngressMessage>,
chunk_store: Arc<RwLock<Vec<...>>>) or similarly named utility) that contains
the match arms handling IngestChunk, IngestChunkFireAndForget,
IngestIngressProof, and ProcessPendingChunks and the final "channel closed"
debug; then replace the in-place spawn blocks in this test and in
data_handler_stub and data_handler_with_stubbed_pool with a tokio::spawn call
that invokes that helper, passing chunk_ingress_receiver and
Arc::clone(&mempool_chunks) (or the local chunk_store) so future variants of
ChunkIngressMessage are maintained in one place.

Comment thread crates/actors/src/chunk_ingress_service.rs
Comment thread crates/actors/src/chunk_ingress_service.rs Outdated
Comment thread crates/actors/src/mempool_service/types.rs
roberts-pumpurs and others added 4 commits February 20, 2026 14:52
…_count directly

Accept &ChunkIngressState in get_status so MempoolStatus is always
returned fully populated in a single call, removing the brittle
two-phase pattern where callers had to manually enrich the count.

MempoolService spawned its own ChunkDataWriter but never called
queue_write; only ChunkIngressService writes chunks. Remove the
redundant spawn, field, and no-op flush on shutdown.

…_ingress_service

ChunkDataWriter is exclusively used by ChunkIngressService, so its
module belongs under the service that owns it.

Remove pending_chunk_counts DashMap, pending_chunk_count(), and
is_pending(); none were ever wired up to call sites. Simplify
write_batch to return a plain count and deduplicate pending_hashes
cleanup into a single site.

Comment on lines -72 to -84
/// Returns `true` if the chunk hash is currently pending a write.
pub fn is_pending(&self, hash: &ChunkPathHash) -> bool {
    self.pending_hashes.contains(hash)
}

/// Returns the number of chunks pending or already written for a data root.
pub fn pending_chunk_count(&self, data_root: &DataRoot) -> u64 {
    self.pending_chunk_counts
        .get(data_root)
        .map(|v| *v)
        .unwrap_or(0)
}

Contributor Author

@glottologist, these methods seemed to be unused on master, so I just removed them

@glottologist glottologist left a comment

Log the send errors; everything else is non-blocking

Comment thread crates/actors/src/chunk_ingress_service.rs Outdated
Comment thread crates/actors/src/mempool_service/data_txs.rs Outdated
Comment thread crates/actors/src/chunk_ingress_service.rs Outdated

@coderabbitai coderabbitai Bot left a comment

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Duplicate comments:
In `@crates/actors/src/chunk_ingress_service.rs`:
- Around line 312-314: The timeout branch currently just warns and drops the
message, which closes the caller's oneshot without explanation; update the
Err(_) arm in chunk_ingress_service.rs (the "Timed out waiting for chunk ingress
message handler permit" branch) to detect IngestChunk and IngestIngressProof
message variants and, before dropping, send an explicit error through the
message's oneshot::Sender (or otherwise return a clear error result) so callers
receive a meaningful backpressure error instead of RecvError; ensure you handle
the case where sending on the oneshot may fail (ignore or log) and only after
attempting send keep the existing warning/drop behavior.
- Around line 292-316: The current logic spawns an unbounded number of tasks
whenever try_acquire_owned returns NoPermits, which can leak memory under high
backpressure; modify the code around try_acquire_owned /
message_handler_semaphore handling to gate spawning: introduce a bounded counter
or semaphore (e.g., pending_waiters_counter or pending_waiters_semaphore)
checked before calling runtime_handle.spawn and increment it when scheduling a
waiter and decrement it when the waiter finishes (including all timeout and
error branches), so that runtime_handle.spawn for the waiting-acquire branch is
only invoked if pending waiters < MAX_PENDING_WAITERS; ensure the change is
applied where the code captures Arc::clone(&self.inner), msg, and spawns the
async block that awaits semaphore.acquire_owned() and still logs/handles
Ok(Ok(permit)), Ok(Err(err)) and Err(_) appropriately while always decrementing
the counter on exit.

Comment thread crates/actors/src/chunk_ingress_service.rs Outdated
Comment thread crates/chain/tests/multi_node/mempool_tests.rs
- Log send failures in handle_message instead of silently discarding
- Remove stale "sweep timer will retry" from ProcessPendingChunks warn
- Deduplicate wait_with_progress: make mempool's definition pub(crate)
  and import it in chunk_ingress_service
- Merge IngestChunk and IngestChunkFireAndForget into single variant
  with Option<oneshot::Sender>
- Fix preheader_rejects_when_cache_full test to verify response body
  contains PreHeaderOffsetExceedsCap
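
The merged-variant item in the list above can be sketched as a single message carrying an optional reply channel. This is an assumption-laden illustration: `std::sync::mpsc::Sender` stands in for `oneshot::Sender`, the chunk is a plain `Vec<u8>`, the error is a `String`, and the "empty chunk" rule is invented purely to produce both outcomes.

```rust
use std::sync::mpsc;

/// Hypothetical result type for this sketch.
pub type IngestResult = Result<(), String>;

pub enum ChunkIngressMessage {
    /// `Some(sender)`: the caller awaits the outcome.
    /// `None`: fire-and-forget.
    IngestChunk(Vec<u8>, Option<mpsc::Sender<IngestResult>>),
}

pub fn handle_message(msg: ChunkIngressMessage) {
    match msg {
        ChunkIngressMessage::IngestChunk(chunk, reply) => {
            // Invented validation rule, only here to exercise both paths.
            let result: IngestResult = if chunk.is_empty() {
                Err("empty chunk".to_string())
            } else {
                Ok(())
            };
            match reply {
                Some(tx) => {
                    // Log send failures instead of silently discarding them.
                    if tx.send(result).is_err() {
                        eprintln!("chunk ingress caller dropped its receiver");
                    }
                }
                None => {
                    // No one is waiting, so the failure is only logged.
                    if let Err(err) = result {
                        eprintln!("fire-and-forget chunk ingress failed: {err}");
                    }
                }
            }
        }
    }
}
```

One variant with an `Option` keeps the handling logic in a single match arm, so the two call styles cannot drift apart.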

@glottologist glottologist left a comment

LGTM

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (3)
crates/actors/src/chunk_ingress_service.rs (2)

292-317: ⚠️ Potential issue | 🟡 Minor

Unbounded spawning of wait-tasks under sustained load.

When try_acquire_owned returns NoPermits, a new task is unconditionally spawned (line 298) that waits up to 60s. Under sustained burst traffic, this can spawn an unbounded number of waiting tasks, each holding msg and Arc<ChunkIngressServiceInner> in memory.

Consider bounding the number of pending waiters with an AtomicUsize counter, or using a bounded queue for overflow.
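
One way to realise the `AtomicUsize` suggestion above is an RAII slot that is reserved before spawning and released on drop, so every exit path of the waiter decrements the counter. This is a sketch only; `WaiterSlot` and `try_reserve` are hypothetical names, and the limit is passed in rather than read from any real config.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical RAII token representing one pending waiter.
pub struct WaiterSlot {
    counter: Arc<AtomicUsize>,
}

impl WaiterSlot {
    /// Atomically increments the counter if it is below `max`.
    /// Returns `None` when the bound is reached, in which case the caller
    /// would drop or reject the message instead of spawning a waiter.
    pub fn try_reserve(counter: &Arc<AtomicUsize>, max: usize) -> Option<WaiterSlot> {
        counter
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |n| {
                if n < max {
                    Some(n + 1)
                } else {
                    None
                }
            })
            .ok()
            .map(|_| WaiterSlot {
                counter: Arc::clone(counter),
            })
    }
}

impl Drop for WaiterSlot {
    /// Runs on success, error, and timeout paths alike, so the count
    /// cannot leak as long as the slot is moved into the waiter task.
    fn drop(&mut self) {
        self.counter.fetch_sub(1, Ordering::AcqRel);
    }
}
```

Moving the slot into the spawned task ties the decrement to the task's lifetime, which avoids having to remember a manual decrement in each branch.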

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/actors/src/chunk_ingress_service.rs` around lines 292 - 317, The
current logic unconditionally spawns a tokio task when try_acquire_owned returns
NoPermits, allowing unbounded waiting tasks that hold msg and
Arc<ChunkIngressServiceInner> and can exhaust memory under sustained load; fix
by gating spawns with a bounded pending-waiters counter or overflow queue: add
an AtomicUsize (e.g., pending_waiters) or use a bounded mpsc/semaphore-backed
queue referenced from ChunkIngressServiceInner and check/increment it before
calling runtime_handle.spawn (reject/metric/log and drop or enqueue msg when the
bound is exceeded), and decrement on task start/exit; ensure you still use
message_handler_semaphore.acquire_owned() and inner.handle_message(msg) inside
the spawned task and instrument with the same span.

308-316: 🧹 Nitpick | 🔵 Trivial

Messages silently dropped on semaphore timeout — callers receive RecvError.

When the 60-second timeout fires (line 312), the message (including any oneshot::Sender) is dropped. Callers awaiting IngestChunk or IngestIngressProof responses will receive a channel-closed error with no indication the message was dropped due to backpressure.

Consider sending an explicit error response before dropping:

♻️ Suggested improvement
 Err(_) => {
     warn!("Timed out waiting for chunk ingress message handler permit");
+    // Send backpressure error to callers waiting for a response
+    match msg {
+        ChunkIngressMessage::IngestChunk(_, Some(tx)) => {
+            let _ = tx.send(Err(ChunkIngressError::other("backpressure timeout", false)));
+        }
+        ChunkIngressMessage::IngestIngressProof(_, tx) => {
+            let _ = tx.send(Err(IngressProofError::Other("backpressure timeout".into())));
+        }
+        _ => {}
+    }
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/actors/src/chunk_ingress_service.rs` around lines 308 - 316, The
timeout branch currently drops the whole message (including any oneshot::Sender)
causing callers of IngestChunk/IngestIngressProof to see a generic RecvError;
update the Err(_) timeout arm in the permit-awaiting block inside
chunk_ingress_service.rs so that before dropping the message you detect and send
a clear error through the included oneshot::Sender (or channel) with a
descriptive backpressure/timeout error; ensure you do not double-send in the
Ok(Err(err)) path and handle send failures (ignore or log) so the handler then
drops the message safely.
crates/p2p/src/tests/util.rs (1)

1010-1028: 🧹 Nitpick | 🔵 Trivial

Triplicated chunk-ingress consumer stub — consider a shared helper.

The match block consuming ChunkIngressMessage is repeated across three locations with minor differences (storing vs. discarding chunks). A helper like spawn_test_chunk_ingress_consumer(rx, Option<Arc<RwLock<Vec<UnpackedChunk>>>>) would reduce maintenance burden.

Also applies to: 1097-1115

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/p2p/src/tests/util.rs` around lines 1010 - 1028, The chunk-ingress
consumer logic is duplicated; extract a helper like
spawn_test_chunk_ingress_consumer(rx, Option<Arc<RwLock<Vec<UnpackedChunk>>>>)
that takes the ChunkIngress receiver (chunk_ingress_rx) and an optional shared
storage to push unpacked chunks into, then spawn the same tokio task inside that
helper and reuse it in the three places; ensure the helper matches on
ChunkIngressMessage variants and sends replies (reply.send(Ok(()))) where
present and either stores the chunk into the supplied
Arc<RwLock<Vec<UnpackedChunk>>> when Some or discards it when None so behavior
matches current per-site differences.
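
The shared helper described above might look roughly like the following std-only sketch. It uses a thread and `std::sync::mpsc` instead of a tokio task, `Chunk` is a stand-in for `UnpackedChunk`, and the `ChunkIngressMessage` enum shown here is a simplified, hypothetical version of the real one.

```rust
use std::sync::mpsc::{Receiver, Sender};
use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};

/// Stand-in for UnpackedChunk (assumption for this sketch).
pub type Chunk = Vec<u8>;

/// Simplified, hypothetical message enum.
pub enum ChunkIngressMessage {
    IngestChunk(Chunk, Option<Sender<Result<(), String>>>),
    Other,
}

/// Consume messages until every sender is dropped. Chunks are stored when
/// `store` is `Some` and discarded when it is `None`.
pub fn spawn_test_chunk_ingress_consumer(
    rx: Receiver<ChunkIngressMessage>,
    store: Option<Arc<RwLock<Vec<Chunk>>>>,
) -> JoinHandle<()> {
    thread::spawn(move || {
        for msg in rx {
            match msg {
                ChunkIngressMessage::IngestChunk(chunk, reply) => {
                    if let Some(store) = &store {
                        store.write().unwrap().push(chunk);
                    }
                    if let Some(reply) = reply {
                        // Always acknowledge so the sender is not left waiting.
                        let _ = reply.send(Ok(()));
                    }
                }
                ChunkIngressMessage::Other => {}
            }
        }
    })
}
```

Each call site would then differ only in whether it passes `Some(storage)` or `None`.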
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@crates/chain/tests/utils.rs`:
- Around line 2079-2085: The test currently ignores the Result of
peer.node_ctx.service_senders.chunk_ingress.send(...) (the IngestChunk message)
which can hide wiring failures; change the send call to assert its success
instead of discarding it (for example, replace the `let _ =
peer.node_ctx.service_senders.chunk_ingress.send(...)` with a checked send such
as calling .expect("failed to send chunk_ingress") or matching on the Result and
panicking on Err) so that failures are surfaced; keep the rest of the flow (the
irys_actors::ChunkIngressMessage::IngestChunk payload and the existing crx.await
usage) unchanged.
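
To illustrate why the discarded `Result` matters, here is a small std-only sketch. `send_checked` is a hypothetical helper and `std::sync::mpsc` stands in for the real channel type; the point is only that a send on a closed channel returns `Err`, which `let _ =` would hide.

```rust
use std::sync::mpsc::Sender;

/// Hypothetical test helper: forward a message and surface a closed channel
/// as an error instead of silently ignoring it.
pub fn send_checked<T>(tx: &Sender<T>, msg: T) -> Result<(), String> {
    tx.send(msg)
        .map_err(|err| format!("failed to send to chunk_ingress: {err}"))
}
```

In a test, `send_checked(&tx, msg).expect("chunk_ingress send")` would panic as soon as the receiving side is not wired up.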

---

Duplicate comments:
In `@crates/actors/src/chunk_ingress_service.rs`:
- Around line 292-317: The current logic unconditionally spawns a tokio task
when try_acquire_owned returns NoPermits, allowing unbounded waiting tasks that
hold msg and Arc<ChunkIngressServiceInner> and can exhaust memory under
sustained load; fix by gating spawns with a bounded pending-waiters counter or
overflow queue: add an AtomicUsize (e.g., pending_waiters) or use a bounded
mpsc/semaphore-backed queue referenced from ChunkIngressServiceInner and
check/increment it before calling runtime_handle.spawn (reject/metric/log and
drop or enqueue msg when the bound is exceeded), and decrement on task
start/exit; ensure you still use message_handler_semaphore.acquire_owned() and
inner.handle_message(msg) inside the spawned task and instrument with the same
span.
- Around line 308-316: The timeout branch currently drops the whole message
(including any oneshot::Sender) causing callers of
IngestChunk/IngestIngressProof to see a generic RecvError; update the Err(_)
timeout arm in the permit-awaiting block inside chunk_ingress_service.rs so that
before dropping the message you detect and send a clear error through the
included oneshot::Sender (or channel) with a descriptive backpressure/timeout
error; ensure you do not double-send in the Ok(Err(err)) path and handle send
failures (ignore or log) so the handler then drops the message safely.
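
The bounded pending-waiters counter suggested for lines 292-317 could be sketched as below. This is an illustration under assumptions, not the service's code: `PendingWaiters` and `WaiterSlot` are hypothetical names, and the caller is expected to try reserving a slot before spawning a waiting task and to reject the message when `try_reserve` returns `None`.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical bound on how many tasks may wait for a handler permit.
pub struct PendingWaiters {
    count: AtomicUsize,
    max: usize,
}

/// RAII guard: holding it means one waiter slot is reserved.
pub struct WaiterSlot(Arc<PendingWaiters>);

impl PendingWaiters {
    pub fn new(max: usize) -> Arc<Self> {
        Arc::new(Self { count: AtomicUsize::new(0), max })
    }

    /// Reserve a slot, or return `None` when the bound is already reached.
    pub fn try_reserve(this: &Arc<Self>) -> Option<WaiterSlot> {
        let mut current = this.count.load(Ordering::Acquire);
        loop {
            if current >= this.max {
                return None;
            }
            // Retry with the freshly observed value if another thread raced us.
            match this.count.compare_exchange_weak(
                current,
                current + 1,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => return Some(WaiterSlot(Arc::clone(this))),
                Err(observed) => current = observed,
            }
        }
    }
}

impl Drop for WaiterSlot {
    fn drop(&mut self) {
        // Release the slot when the waiting task finishes or is cancelled.
        self.0.count.fetch_sub(1, Ordering::AcqRel);
    }
}
```

Moving the `WaiterSlot` into the spawned task ties the decrement to the task's lifetime, so an early return or panic cannot leak a slot.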

In `@crates/p2p/src/tests/util.rs`:
- Around line 1010-1028: The chunk-ingress consumer logic is duplicated; extract
a helper like spawn_test_chunk_ingress_consumer(rx,
Option<Arc<RwLock<Vec<UnpackedChunk>>>>) that takes the ChunkIngress receiver
(chunk_ingress_rx) and an optional shared storage to push unpacked chunks into,
then spawn the same tokio task inside that helper and reuse it in the three
places; ensure the helper matches on ChunkIngressMessage variants and sends
replies (reply.send(Ok(()))) where present and either stores the chunk into the
supplied Arc<RwLock<Vec<UnpackedChunk>>> when Some or discards it when None so
behavior matches current per-site differences.

ℹ️ Review info

Configuration used: Repository UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 8e81da9 and 2b923c8.

📒 Files selected for processing (15)
  • crates/actors/src/block_validation.rs
  • crates/actors/src/chunk_ingress_service.rs
  • crates/actors/src/chunk_ingress_service/chunk_data_writer.rs
  • crates/actors/src/chunk_ingress_service/facade.rs
  • crates/actors/src/data_sync_service.rs
  • crates/actors/src/lib.rs
  • crates/actors/src/mempool_service.rs
  • crates/actors/src/mempool_service/data_txs.rs
  • crates/actors/src/mempool_service/types.rs
  • crates/actors/src/services.rs
  • crates/api-server/src/routes/post_chunk.rs
  • crates/chain/src/chain.rs
  • crates/chain/tests/multi_node/mempool_tests.rs
  • crates/chain/tests/utils.rs
  • crates/p2p/src/tests/util.rs

Comment thread crates/chain/tests/utils.rs Outdated
…st helpers

- Await semaphore inline on NoPermits (matching mempool pattern) to
  provide natural backpressure instead of spawning unbounded waiting tasks
- Send explicit timeout errors through oneshot channels before dropping
  messages so callers get descriptive errors instead of generic RecvError
- Assert chunk ingress send/receive results in integration test instead
  of silently discarding with let _ =
- Extract spawn_test_chunk_ingress_consumer helper in p2p test utils to
  deduplicate three identical consumer blocks

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@crates/actors/src/chunk_ingress_service.rs`:
- Around line 369-372: The code is classifying a backpressure/timeout as
IngressProofError::DatabaseError in the ChunkIngressMessage::IngestIngressProof
branch; change this to a non-database variant (e.g.,
IngressProofError::Backpressure or IngressProofError::ServiceOverloaded) and
send that via reply.send so callers/metrics aren’t misled; if the enum lacks
such a variant, add one to IngressProofError and use it here, preserving the
existing descriptive message ("service overloaded: timed out waiting for handler
permit").
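
A sketch of what a dedicated overload variant could look like. The enum below is a hypothetical, reduced version of `IngressProofError` (the real enum has other variants), and `metric_label` is an invented helper showing how a separate variant keeps overload events out of database-error metrics.

```rust
use std::fmt;

/// Reduced, hypothetical version of the error enum.
#[derive(Debug, PartialEq)]
pub enum IngressProofError {
    DatabaseError(String),
    /// Assumed new variant for backpressure and permit timeouts.
    ServiceOverloaded(String),
}

impl fmt::Display for IngressProofError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::DatabaseError(msg) => write!(f, "database error: {msg}"),
            Self::ServiceOverloaded(msg) => write!(f, "service overloaded: {msg}"),
        }
    }
}

/// Error sent to callers when no handler permit became available in time.
pub fn permit_timeout_error() -> IngressProofError {
    IngressProofError::ServiceOverloaded("timed out waiting for handler permit".to_owned())
}

/// Hypothetical metrics label, kept distinct per failure class.
pub fn metric_label(err: &IngressProofError) -> &'static str {
    match err {
        IngressProofError::DatabaseError(_) => "database",
        IngressProofError::ServiceOverloaded(_) => "overloaded",
    }
}
```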

ℹ️ Review info

Configuration used: Repository UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 8e81da9 and 17dcc47.

📒 Files selected for processing (15)
  • crates/actors/src/block_validation.rs
  • crates/actors/src/chunk_ingress_service.rs
  • crates/actors/src/chunk_ingress_service/chunk_data_writer.rs
  • crates/actors/src/chunk_ingress_service/facade.rs
  • crates/actors/src/data_sync_service.rs
  • crates/actors/src/lib.rs
  • crates/actors/src/mempool_service.rs
  • crates/actors/src/mempool_service/data_txs.rs
  • crates/actors/src/mempool_service/types.rs
  • crates/actors/src/services.rs
  • crates/api-server/src/routes/post_chunk.rs
  • crates/chain/src/chain.rs
  • crates/chain/tests/multi_node/mempool_tests.rs
  • crates/chain/tests/utils.rs
  • crates/p2p/src/tests/util.rs

Comment thread crates/actors/src/chunk_ingress_service.rs

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 3

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@crates/actors/src/chunk_ingress_service.rs`:
- Around line 312-314: The AcquireError branch currently drops the incoming msg
(including its oneshot::Sender) silently; update the Ok(Err(err)) arm where the
chunk ingress message handler permit acquisition fails to call
send_timeout_errors (or a new send_overload_errors) with the original msg so the
caller is notified of a descriptive error instead of getting a bare RecvError;
locate the branch handling AcquireError in chunk_ingress_service.rs (the
Ok(Err(err)) arm), invoke send_timeout_errors/send_overload_errors with the same
msg and an appropriate error variant/message indicating the semaphore is closed,
and adjust the helper name/signature if you prefer a more general overload error
helper used for both timeout and acquisition failures.
- Around line 296-319: The inline timeout around semaphore.acquire_owned()
blocks the select! loop and delays shutdown; change the acquisition to race
against the service shutdown future instead of using a standalone
tokio::time::timeout so the task can abort promptly. Refactor the backpressure
path (the block that clones inner, semaphore, and does
semaphore.acquire_owned()) to perform a tokio::select! between
semaphore.acquire_owned() and the existing shutdown future (or move it into a
helper that accepts the shutdown future), and on shutdown branch drop the
message and call Self::send_timeout_errors(msg) (or equivalent) instead of
waiting; keep the runtime_handle.spawn/inner.handle_message/wait_with_progress
logic unchanged for the success branch and preserve logging on semaphore
acquisition/error.
- Around line 120-162: Extract the duplicated helpers
get_latest_block_height_static and get_anchor_height_static into a new shared
utility module (e.g., block_tree_utils) and replace the copies in
chunk_ingress_service.rs and mempool_service.rs with imports from that module;
consolidate the return types (use eyre::Result<Option<u64>> for
get_anchor_height_static and use eyre::Result<u64> for
get_latest_block_height_static so callers can handle eyre errors consistently),
move the logic exactly as it is (preserve use of BlockTreeReadGuard,
DatabaseProvider, H256, irys_database calls and BlockTreeEntry::height), update
all call sites to use the new module path and remove the duplicate functions
from both files.

ℹ️ Review info

Configuration used: Repository UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 17dcc47 and 198abd7.

📒 Files selected for processing (1)
  • crates/actors/src/chunk_ingress_service.rs

Comment on lines +120 to +162
/// Helper to get the latest block height from the canonical chain.
pub(crate) fn get_latest_block_height_static(
    block_tree_read_guard: &BlockTreeReadGuard,
) -> Result<u64, String> {
    let canon_chain = block_tree_read_guard.read().get_canonical_chain();
    let latest = canon_chain
        .0
        .last()
        .ok_or_else(|| "unable to get canonical chain from block tree".to_owned())?;
    Ok(latest.height())
}

/// Resolves an anchor (block hash) to its height.
/// If it couldn't find the anchor, returns None.
/// Set canonical to true to enforce that the anchor must be part of the current canonical chain.
pub(crate) fn get_anchor_height_static(
    block_tree_read_guard: &BlockTreeReadGuard,
    irys_db: &DatabaseProvider,
    anchor: H256,
    canonical: bool,
) -> eyre::Result<Option<u64>> {
    if let Some(height) = {
        let guard = block_tree_read_guard.read();
        if canonical {
            guard
                .get_canonical_chain()
                .0
                .iter()
                .find(|b| b.block_hash() == anchor)
                .map(BlockTreeEntry::height)
        } else {
            guard.get_block(&anchor).map(|h| h.height)
        }
    } {
        Ok(Some(height))
    } else if let Some(hdr) =
        irys_db.view_eyre(|tx| irys_database::block_header_by_hash(tx, &anchor, false))?
    {
        Ok(Some(hdr.height))
    } else {
        Ok(None)
    }
}


⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check if mempool_service still has get_latest_block_height_static or get_anchor_height_static
echo "=== Searching for get_latest_block_height_static ==="
rg -n "get_latest_block_height_static" --type=rust -C 2

echo ""
echo "=== Searching for get_anchor_height_static ==="
rg -n "get_anchor_height_static" --type=rust -C 2

Repository: Irys-xyz/irys

Length of output: 3869


🏁 Script executed:

#!/bin/bash
# Get the full implementation of both methods in mempool_service
echo "=== mempool_service.rs: get_latest_block_height_static ==="
sed -n '1714,1725p' crates/actors/src/mempool_service.rs

echo ""
echo "=== chunk_ingress_service.rs: get_latest_block_height_static ==="
sed -n '121,132p' crates/actors/src/chunk_ingress_service.rs

echo ""
echo "=== mempool_service.rs: get_anchor_height_static ==="
sed -n '1449,1475p' crates/actors/src/mempool_service.rs

echo ""
echo "=== chunk_ingress_service.rs: get_anchor_height_static ==="
sed -n '135,161p' crates/actors/src/chunk_ingress_service.rs

Repository: Irys-xyz/irys

Length of output: 3229


Extract these shared helpers to a utility module to prevent duplication and potential drift.

Both get_latest_block_height_static and get_anchor_height_static are duplicated in mempool_service.rs and chunk_ingress_service.rs. The get_anchor_height_static implementations are identical, and get_latest_block_height_static is functionally equivalent despite different error types. Create a shared utility module for these block tree helpers to maintain consistency and reduce maintenance burden.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/actors/src/chunk_ingress_service.rs` around lines 120 - 162, Extract
the duplicated helpers get_latest_block_height_static and
get_anchor_height_static into a new shared utility module (e.g.,
block_tree_utils) and replace the copies in chunk_ingress_service.rs and
mempool_service.rs with imports from that module; consolidate the return types
(use eyre::Result<Option<u64>> for get_anchor_height_static and use
eyre::Result<u64> for get_latest_block_height_static so callers can handle eyre
errors consistently), move the logic exactly as it is (preserve use of
BlockTreeReadGuard, DatabaseProvider, H256, irys_database calls and
BlockTreeEntry::height), update all call sites to use the new module path and
remove the duplicate functions from both files.
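
As a rough, std-only illustration of the proposed shared module: `block_tree_utils`, `Entry`, and the closure-based `fallback` lookup are hypothetical simplifications. The real helpers take a `BlockTreeReadGuard` and a `DatabaseProvider`, support a `canonical` flag, and would return `eyre::Result` rather than the plain types used here.

```rust
pub mod block_tree_utils {
    /// Hypothetical stand-in for a canonical-chain entry.
    pub struct Entry {
        pub hash: [u8; 32],
        pub height: u64,
    }

    /// Height of the last entry of the canonical chain.
    pub fn latest_block_height(canonical: &[Entry]) -> Result<u64, String> {
        canonical
            .last()
            .map(|entry| entry.height)
            .ok_or_else(|| "canonical chain is empty".to_owned())
    }

    /// Resolve an anchor hash to a height: search the canonical chain first,
    /// then fall back to another lookup (a database query in the real code).
    pub fn anchor_height(
        canonical: &[Entry],
        anchor: &[u8; 32],
        fallback: impl Fn(&[u8; 32]) -> Option<u64>,
    ) -> Option<u64> {
        canonical
            .iter()
            .find(|entry| &entry.hash == anchor)
            .map(|entry| entry.height)
            .or_else(|| fallback(anchor))
    }
}
```

Both services would then import from the one module, so the two copies cannot drift apart.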

Comment on lines +296 to +319
// Await inline (blocking the loop) for natural backpressure,
// matching the mempool service pattern.
let inner = Arc::clone(&self.inner);
let semaphore = inner.message_handler_semaphore.clone();
match tokio::time::timeout(Duration::from_secs(60), semaphore.acquire_owned()).await {
    Ok(Ok(permit)) => {
        runtime_handle.spawn(async move {
            let _permit = permit;
            let task_info = format!("Chunk ingress message handler for {}", msg_type);
            wait_with_progress(
                inner.handle_message(msg),
                20,
                &task_info,
            ).await;
        }.instrument(span));
    }
    Ok(Err(err)) => {
        error!("Failed to acquire chunk ingress message handler permit: {:?}", err);
    }
    Err(_) => {
        warn!("Timed out waiting for chunk ingress message handler permit, dropping message");
        Self::send_timeout_errors(msg);
    }
}


🧹 Nitpick | 🔵 Trivial

Inline semaphore wait delays shutdown by up to 60 seconds.

The tokio::time::timeout(60s, semaphore.acquire_owned()).await on Line 300 blocks the select! loop, making the service unresponsive to shutdown signals during the wait. Consider racing the acquisition against the shutdown future to preserve responsiveness.

♻️ Optional: race against shutdown

One approach is to restructure so the semaphore wait is inside a nested tokio::select! that also watches the shutdown signal:

-                                    match tokio::time::timeout(Duration::from_secs(60), semaphore.acquire_owned()).await {
+                                    let timed_acquire = tokio::time::timeout(Duration::from_secs(60), semaphore.acquire_owned());
+                                    tokio::select! {
+                                        result = timed_acquire => match result {
                                         Ok(Ok(permit)) => {
                                             // ... spawn handler ...
                                         }
                                         Ok(Err(err)) => {
                                             error!("Failed to acquire chunk ingress message handler permit: {:?}", err);
+                                            Self::send_timeout_errors(msg);
                                         }
                                         Err(_) => {
                                             warn!("Timed out waiting for chunk ingress message handler permit, dropping message");
                                             Self::send_timeout_errors(msg);
                                         }
+                                        },
+                                        _ = &mut shutdown_future => {
+                                            info!("Shutdown during backpressure wait, dropping message");
+                                            Self::send_timeout_errors(msg);
+                                            break;
+                                        }
                                     }

Note: this would require restructuring the outer select! so shutdown_future is accessible in this scope (e.g., by extracting the backpressure path into a helper).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/actors/src/chunk_ingress_service.rs` around lines 296 - 319, The
inline timeout around semaphore.acquire_owned() blocks the select! loop and
delays shutdown; change the acquisition to race against the service shutdown
future instead of using a standalone tokio::time::timeout so the task can abort
promptly. Refactor the backpressure path (the block that clones inner,
semaphore, and does semaphore.acquire_owned()) to perform a tokio::select!
between semaphore.acquire_owned() and the existing shutdown future (or move it
into a helper that accepts the shutdown future), and on shutdown branch drop the
message and call Self::send_timeout_errors(msg) (or equivalent) instead of
waiting; keep the runtime_handle.spawn/inner.handle_message/wait_with_progress
logic unchanged for the success branch and preserve logging on semaphore
acquisition/error.
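
The racing idea can be shown without an async runtime. The sketch below is a std-only analogue, not the tokio `select!` version from the suggestion: permit releases and the shutdown signal are multiplexed onto one channel, and `recv_timeout` returns on whichever arrives first or when the deadline passes. `Event`, `WaitOutcome`, and `wait_for_permit` are hypothetical names.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// Hypothetical events the waiting loop cares about.
pub enum Event {
    PermitReleased,
    Shutdown,
}

#[derive(Debug, PartialEq)]
pub enum WaitOutcome {
    Acquired,
    ShutDown,
    TimedOut,
}

/// Wait for a permit, but return early on shutdown instead of sitting out
/// the full timeout.
pub fn wait_for_permit(events: &Receiver<Event>, timeout: Duration) -> WaitOutcome {
    match events.recv_timeout(timeout) {
        Ok(Event::PermitReleased) => WaitOutcome::Acquired,
        // A dropped sender is treated like shutdown: nobody can release a permit.
        Ok(Event::Shutdown) | Err(RecvTimeoutError::Disconnected) => WaitOutcome::ShutDown,
        Err(RecvTimeoutError::Timeout) => WaitOutcome::TimedOut,
    }
}
```

On `ShutDown` or `TimedOut` the caller would notify any waiting requester and drop the message, matching the behaviour the review asks for.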

Comment thread crates/actors/src/chunk_ingress_service.rs
…gress

Remove get_latest_block_height_static and get_anchor_height_static from
ChunkIngressServiceInner, reusing the identical copies already on
mempool_service::Inner. Clean up now-unused imports.

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 1

♻️ Duplicate comments (4)
crates/actors/src/chunk_ingress_service.rs (2)

267-269: AcquireError path drops the message without notifying callers.

When semaphore.acquire_owned() returns Err (semaphore closed), the msg — including any oneshot::Sender — is dropped silently. Callers waiting on the response channel receive an opaque RecvError. For consistency with the timeout path (line 272), call Self::send_timeout_errors(msg) here too.

🐛 Proposed fix
                                        Ok(Err(err)) => {
                                            error!("Failed to acquire chunk ingress message handler permit: {:?}", err);
+                                           Self::send_timeout_errors(msg);
                                        }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/actors/src/chunk_ingress_service.rs` around lines 267 - 269, The
AcquireError branch currently drops the incoming msg (from
semaphore.acquire_owned()) and never notifies callers; update the Ok(Err(err))
arm to call Self::send_timeout_errors(msg) so any oneshot::Sender in msg gets a
timeout/error response, and still log the acquire error (e.g., keep the
error!("Failed to acquire...: {:?}", err) call). Locate the
semaphore.acquire_owned() match in chunk_ingress_service.rs and replace the
silent drop in the Ok(Err(err)) path with a call to
Self::send_timeout_errors(msg) before/after logging.

247-274: Inline semaphore wait still blocks the event loop for up to 60 seconds.

When try_acquire_owned returns NoPermits (line 247), the code awaits inline with a 60-second timeout (line 255). During this wait, the select! loop cannot process shutdown signals or other messages. This was flagged in a prior review.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/actors/src/chunk_ingress_service.rs` around lines 247 - 274, When
try_acquire_owned() returns NoPermits, do not await the 60s
semaphore.acquire_owned() inline; instead spawn a background task (via
runtime_handle.spawn) that awaits timeout(Duration::from_secs(60), semaphore.acquire_owned())
and then either spawns the actual message handler (holding the permit) or calls
Self::send_timeout_errors(msg) on timeout/error. Move the
semaphore.acquire_owned() + timeout logic out of the select loop and into a
dedicated async task so the select loop can continue to process shutdown and
other messages; reference message_handler_semaphore, try_acquire_owned,
timeout(Duration::from_secs(60)), runtime_handle.spawn, inner.handle_message,
and Self::send_timeout_errors when implementing the change.
crates/actors/src/chunk_ingress_service/ingress_proofs.rs (2)

133-161: Cross-service coupling: validate_ingress_proof_anchor_static calls into mempool_service::Inner.

Lines 140 and 148 reach into crate::mempool_service::Inner for get_latest_block_height_static and get_anchor_height_static. This creates a tight coupling between the newly extracted ChunkIngressService and the mempool it was extracted from. These helpers should live in a shared utility module.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/actors/src/chunk_ingress_service/ingress_proofs.rs` around lines 133 -
161, The function validate_ingress_proof_anchor_static currently depends on
crate::mempool_service::Inner by calling get_latest_block_height_static and
get_anchor_height_static, creating cross-service coupling; extract those helper
functions into a shared utility module (e.g., crate::utils::block_helpers or
similar) and update validate_ingress_proof_anchor_static to call the new shared
helpers instead of mempool_service::Inner::get_latest_block_height_static and
mempool_service::Inner::get_anchor_height_static, adjust imports and error
mapping as needed, and remove the mempool_service dependency from the
ChunkIngressService so the ingress proof validation uses the common utilities.

313-397: Silent failures on cache_sender.send(...) in proof generation lifecycle notifications.

Lines 368, 384, 394 (and similarly in reanchor_and_store_ingress_proof at lines 440, 447, 462, 469, 477) use let _ = cache_sender.send(...), silently discarding send errors for the NotifyProofGenerationStarted and NotifyProofGenerationCompleted messages. If the cache service channel is closed, the generation-state tracking becomes inconsistent (a Started without a matching Completed, or vice versa), potentially causing AlreadyGenerating errors for future proof generation attempts for the same data root.

At minimum, log a warning on failure for the lifecycle-critical notifications.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/actors/src/chunk_ingress_service/ingress_proofs.rs` around lines 313 -
397, The send calls to cache_sender in generate_and_store_ingress_proof and
reanchor_and_store_ingress_proof silently ignore errors (using let _ =
cache_sender.send(...)), which can leave the cache in an inconsistent state;
change each
cache_sender.send(CacheServiceAction::NotifyProofGenerationStarted(data_root))
and
cache_sender.send(CacheServiceAction::NotifyProofGenerationCompleted(data_root))
to explicitly handle Err(err) by logging a warning (e.g., tracing::warn! or
log::warn!) that includes the CacheServiceAction variant, the data_root, and the
error details so lifecycle failures are visible and debuggable.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@crates/actors/src/chunk_ingress_service.rs`:
- Around line 119-133: process_pending_chunks_for_root currently removes all
chunks via pending_chunks.write().await.pop(&data_root) before processing, so
any chunk that fails in handle_chunk_ingress_message is lost; change the logic
to not permanently remove chunks until they succeed (e.g., iterate over a cloned
list or use a peek/get API instead of pop, and only remove each chunk from
pending_chunks on successful handling), and on transient failures re-queue or
leave the chunk in pending_chunks and log a warn plus record metrics (keep using
metrics::record_chunk_error and error/warn as appropriate) so failed chunks are
retried rather than dropped.
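
One way to keep failed chunks, sketched with std types only. A plain `HashMap` stands in for the pending-chunks cache, `handle` stands in for `handle_chunk_ingress_message`, and every failure is treated as transient, which the real code would need to distinguish from permanent rejection. All names are hypothetical.

```rust
use std::collections::HashMap;

pub type DataRoot = [u8; 32];
pub type Chunk = Vec<u8>;

/// Process every pending chunk for `root`; chunks whose handler fails are put
/// back so a later attempt can retry them. Returns the number re-queued.
pub fn process_pending_for_root(
    pending: &mut HashMap<DataRoot, Vec<Chunk>>,
    root: DataRoot,
    mut handle: impl FnMut(&Chunk) -> Result<(), String>,
) -> usize {
    let Some(chunks) = pending.remove(&root) else {
        return 0;
    };
    // Keep only the chunks that failed; successful ones are dropped here.
    let failed: Vec<Chunk> = chunks
        .into_iter()
        .filter(|chunk| handle(chunk).is_err())
        .collect();
    let requeued = failed.len();
    if !failed.is_empty() {
        pending.insert(root, failed);
    }
    requeued
}
```

A production version would likely also cap retries so a permanently invalid chunk cannot sit in the cache forever.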

---

Duplicate comments:
In `@crates/actors/src/chunk_ingress_service.rs`:
- Around line 267-269: The AcquireError branch currently drops the incoming msg
(from semaphore.acquire_owned()) and never notifies callers; update the
Ok(Err(err)) arm to call Self::send_timeout_errors(msg) so any oneshot::Sender
in msg gets a timeout/error response, and still log the acquire error (e.g.,
keep the error!("Failed to acquire...: {:?}", err) call). Locate the
semaphore.acquire_owned() match in chunk_ingress_service.rs and replace the
silent drop in the Ok(Err(err)) path with a call to
Self::send_timeout_errors(msg) before/after logging.
- Around line 247-274: When try_acquire_owned() returns NoPermits, do not await
the 60s semaphore.acquire_owned() inline; instead spawn a background task (via
runtime_handle.spawn) that awaits timeout(Duration::from_secs(60), semaphore.acquire_owned())
and then either spawns the actual message handler (holding the permit) or calls
Self::send_timeout_errors(msg) on timeout/error. Move the
semaphore.acquire_owned() + timeout logic out of the select loop and into a
dedicated async task so the select loop can continue to process shutdown and
other messages; reference message_handler_semaphore, try_acquire_owned,
timeout(Duration::from_secs(60)), runtime_handle.spawn, inner.handle_message,
and Self::send_timeout_errors when implementing the change.

In `@crates/actors/src/chunk_ingress_service/ingress_proofs.rs`:
- Around line 133-161: The function validate_ingress_proof_anchor_static
currently depends on crate::mempool_service::Inner by calling
get_latest_block_height_static and get_anchor_height_static, creating
cross-service coupling; extract those helper functions into a shared utility
module (e.g., crate::utils::block_helpers or similar) and update
validate_ingress_proof_anchor_static to call the new shared helpers instead of
mempool_service::Inner::get_latest_block_height_static and
mempool_service::Inner::get_anchor_height_static, adjust imports and error
mapping as needed, and remove the mempool_service dependency from the
ChunkIngressService so the ingress proof validation uses the common utilities.
- Around line 313-397: The send calls to cache_sender in
generate_and_store_ingress_proof and reanchor_and_store_ingress_proof silently
ignore errors (using let _ = cache_sender.send(...)), which can leave the cache
in an inconsistent state; change each
cache_sender.send(CacheServiceAction::NotifyProofGenerationStarted(data_root))
and
cache_sender.send(CacheServiceAction::NotifyProofGenerationCompleted(data_root))
to explicitly handle Err(err) by logging a warning (e.g., tracing::warn! or
log::warn!) that includes the CacheServiceAction variant, the data_root, and the
error details so lifecycle failures are visible and debuggable.

ℹ️ Review info

Configuration used: Repository UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 198abd7 and 2c1a88e.

📒 Files selected for processing (2)
  • crates/actors/src/chunk_ingress_service.rs
  • crates/actors/src/chunk_ingress_service/ingress_proofs.rs

Comment thread crates/actors/src/chunk_ingress_service.rs
- Send timeout errors through oneshot channels in the AcquireError path
  so callers get a descriptive error instead of a silent drop
- Log warnings on all cache_sender.send() failures in ingress proof
  generation/reanchoring instead of silently discarding with let _ =

3 participants