Allow processing partitions concurrently #395

Open: dasch wants to merge 1 commit into main from dasch-concurrent-partition-processing
dasch commented Feb 26, 2025

Contributor

Use concurrent-ruby to process each partition in a fetch response concurrently. This maintains in-order processing within partitions but allows using multiple cores to process messages in different partitions at the same time.

  • Partition offsets are committed within the concurrent blocks; rdkafka takes care of sending those commits to Kafka asynchronously.
  • Pausing and error handling should continue to work as expected, since both are already handled per partition within the process_batch method.
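
A minimal sketch of the idea described above, not the code in this PR. It uses plain Ruby threads as a stand-in for concurrent-ruby futures, and the `Message` struct and `process_concurrently` helper are hypothetical names chosen for illustration. Messages are grouped by partition, each group is handled in arrival order on its own thread, and the caller waits for every partition before moving on.

```ruby
# Hypothetical message type for illustration; Racecar's real messages differ.
Message = Struct.new(:partition, :offset, :value)

# Handle each partition's messages in order, with the partitions running
# concurrently. Returns a hash of partition => last processed offset.
def process_concurrently(messages)
  # group_by keeps the arrival order of messages within each partition.
  by_partition = messages.group_by(&:partition)

  threads = by_partition.map do |partition, batch|
    Thread.new do
      last_offset = nil
      batch.each do |message|
        yield message # caller-supplied handler
        last_offset = message.offset
      end
      [partition, last_offset]
    end
  end

  # Thread#value joins the thread and re-raises any exception it raised.
  threads.map(&:value).to_h
end

messages = [
  Message.new(0, 0, "a"), Message.new(1, 0, "x"),
  Message.new(0, 1, "b"), Message.new(1, 1, "y"),
]

seen = Hash.new { |hash, key| hash[key] = [] }
lock = Mutex.new

offsets = process_concurrently(messages) do |message|
  lock.synchronize { seen[message.partition] << message.value }
end
```

Here `offsets` stands in for the per-partition positions that would be committed. Shared state touched by the handler (the `seen` hash in this sketch) needs its own synchronization, which is presumably why the PR switches to concurrent collections.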

dasch force-pushed the dasch-concurrent-partition-processing branch 2 times, most recently from d1158bf to c7c004f (compare), February 26, 2025 10:20
dasch force-pushed the dasch-concurrent-partition-processing branch from c7c004f to 5a8e2aa (compare), February 26, 2025 12:55
dasch requested a review from Copilot, February 27, 2025 19:33

Copilot AI left a comment

PR Overview

This PR implements concurrent partition processing using concurrent-ruby, enabling messages in different partitions to be processed in parallel while maintaining sequential order within each partition. Key changes include:

  • Replacing standard arrays and hashes with concurrent collections in the consumer and test code.
  • Adding concurrent processing paths in the main runner loop with new helper methods.
  • Updating configuration, documentation, and tests to support the new concurrent_processing option.

Reviewed Changes

| File | Description |
| --- | --- |
| spec/runner_spec.rb | Updated tests to use concurrent collections and revised pause handling. |
| lib/racecar/runner.rb | Introduced concurrent processing for batches using promises; refactored pause handling. |
| racecar.gemspec | Added a runtime dependency on concurrent-ruby. |
| lib/racecar/config.rb | Added a new boolean flag for concurrent_processing. |
| README.md | Updated documentation to include concurrent processing. |
| CHANGELOG.md | Updated changelog with concurrent processing feature details. |

Copilot reviewed 7 out of 7 changed files in this pull request and generated 1 comment.

Comment thread lib/racecar/runner.rb
Comment on lines +120 to +121
# TODO: add a timeout to `wait` to avoid blocking indefinitely.
Concurrent::Promises.zip(*futures).wait

Copilot AI Feb 27, 2025

Consider adding a timeout to the wait call on the futures (e.g. using a timeout option) to avoid potential indefinite blocking if a concurrent batch takes too long or hangs.

Suggested change
- # TODO: add a timeout to `wait` to avoid blocking indefinitely.
- Concurrent::Promises.zip(*futures).wait
+ timeout = config.concurrent_processing_timeout || 30 # default to 30 seconds if not set
+ begin
+   Concurrent::Promises.zip(*futures).wait(timeout)
+ rescue Concurrent::TimeoutError
+   logger.error("Timeout while waiting for concurrent batch processing")
+   # Handle timeout appropriately, e.g., retry or abort
+ end
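
A hedged sketch of bounded waiting, again using plain Ruby threads as a stand-in for concurrent-ruby futures; `wait_with_deadline` is a hypothetical helper, not part of Racecar. It illustrates one point worth checking before adopting the suggestion: `Thread#join(limit)` signals a timeout through its return value (`nil`) rather than by raising. To my understanding, `wait(timeout)` on a concurrent-ruby future behaves similarly and returns `false` on timeout, in which case the `rescue Concurrent::TimeoutError` branch above would never run; this should be verified against the concurrent-ruby documentation. Note also that `config.concurrent_processing_timeout` is not among the options listed in the review summary, so it would have to be added.

```ruby
# Wait for all worker threads, giving up once `timeout` seconds have passed
# in total. Returns the threads that were still running at the deadline.
def wait_with_deadline(threads, timeout)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout

  threads.reject do |thread|
    remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
    # Thread#join(limit) returns the thread when it finished in time and
    # nil when the limit elapsed first; it does not raise on timeout.
    thread.join([remaining, 0].max)
  end
end

fast = Thread.new { :done }
slow = Thread.new { sleep 5 } # simulates a partition that hangs

unfinished = wait_with_deadline([fast, slow], 0.2)
warn "Timed out waiting for #{unfinished.size} partition(s)" unless unfinished.empty?

slow.kill # clean up the simulated hang
```

Whatever the waiting mechanism, the open question is what to do with partitions that miss the deadline, since their messages may still be in flight when the next fetch arrives.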

@henrikbjorn

Contributor

This would be awesome. Is it ready to test, or is something still needed to bring it to completion?

dasch commented Nov 10, 2025

Contributor Author

I think it needs thorough testing, and it's likely to break something – but if you want to drive it forward it would be much appreciated! I won't have the time anytime soon.
