Allow processing partitions concurrently #395
Use concurrent-ruby to process each partition in a fetch response concurrently. This maintains in-order processing within partitions but allows using multiple cores to process messages in *different* partitions at the same time.
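A minimal sketch of the per-partition idea follows. It uses plain Ruby `Thread`s from the standard library instead of concurrent-ruby so that it runs without extra gems. The `process_partitions` helper and the hash-of-arrays shape of the fetch response are hypothetical, not Racecar's API. Each partition gets one thread that handles its messages in fetch order, so ordering holds within a partition but not across partitions. Note that on MRI the global VM lock lets only one thread execute Ruby code at a time, so the gain comes mainly from handlers that block on IO. CPU-bound handlers would need JRuby, TruffleRuby or multiple processes to use several cores.

```ruby
# Hypothetical fetch-response shape: { partition_id => [message, ...] }.
# One thread per partition: messages in a partition run sequentially and in
# fetch order, while different partitions run concurrently.
def process_partitions(batch_by_partition, &handler)
  results = {}
  lock = Mutex.new

  threads = batch_by_partition.map do |partition, messages|
    Thread.new do
      processed = messages.map { |message| handler.call(partition, message) }
      # Only the write to the shared hash needs the lock.
      lock.synchronize { results[partition] = processed }
    end
  end

  # Block until every partition is done (Thread#join re-raises a handler error).
  threads.each(&:join)
  results
end
```

For example, `process_partitions({ 0 => %w[a b], 1 => %w[c] }) { |p, m| "#{p}:#{m}" }` yields `"0:a"` before `"0:b"` for partition 0. Partition 1 may finish first, so the key order of the returned hash is not fixed.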
PR Overview
This PR implements concurrent partition processing using concurrent-ruby, enabling messages in different partitions to be processed in parallel while maintaining sequential order within each partition. Key changes include:
- Replacing standard arrays and hashes with concurrent collections in the consumer and test code.
- Adding concurrent processing paths in the main runner loop with new helper methods.
- Updating configuration, documentation, and tests to support the new concurrent_processing option.
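Concurrent collections matter because handlers for different partitions can now record results at the same time. The following is a stdlib-only stand-in for concurrent-ruby's collections: a test double that collects processed messages in a `Thread::Queue`, which is thread-safe. `RecordingConsumer` is a hypothetical name, not the class used in the PR's spec.

```ruby
# Hypothetical test consumer that records every message it processes.
# Thread::Queue is safe to push to from several threads at once, unlike a
# plain Array, whose thread safety Ruby does not guarantee.
class RecordingConsumer
  def initialize
    @processed = Thread::Queue.new
  end

  def process(message)
    @processed << message
  end

  # Drains the queue into an Array. Call once, after all workers have joined.
  def processed_messages
    Array.new(@processed.size) { @processed.pop }
  end
end
```

Because each worker pushes sequentially into a FIFO queue, messages from one worker keep their relative order even when workers interleave.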
Reviewed Changes
| File | Description |
|---|---|
| spec/runner_spec.rb | Updated tests to use concurrent collections and revised pause handling. |
| lib/racecar/runner.rb | Introduced concurrent processing for batches using promises; refactored pause handling. |
| racecar.gemspec | Added a runtime dependency on concurrent-ruby. |
| lib/racecar/config.rb | Added a new boolean flag for concurrent_processing. |
| README.md | Updated documentation to include concurrent processing. |
| CHANGELOG.md | Updated changelog with concurrent processing feature details. |
Copilot reviewed 7 out of 7 changed files in this pull request and generated 1 comment.
```ruby
# TODO: add a timeout to `wait` to avoid blocking indefinitely.
Concurrent::Promises.zip(*futures).wait
```
Consider adding a timeout to the wait call on the futures (e.g. using a timeout option) to avoid potential indefinite blocking if a concurrent batch takes too long or hangs.
Suggested change:

```diff
- # TODO: add a timeout to `wait` to avoid blocking indefinitely.
- Concurrent::Promises.zip(*futures).wait
+ # `concurrent_processing_timeout` is a hypothetical setting, not added by this PR.
+ timeout = config.concurrent_processing_timeout || 30 # default to 30 seconds if not set
+ # With a timeout, `wait` returns false if the futures are not resolved in time; it does not raise.
+ unless Concurrent::Promises.zip(*futures).wait(timeout)
+   logger.error("Timeout while waiting for concurrent batch processing")
+   # Handle timeout appropriately, e.g., retry or abort
+ end
```
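For comparison, here is a stdlib-only sketch of waiting on a group of workers under one overall deadline. `Thread#join(limit)` returns `nil` when the limit expires instead of raising. This mirrors how concurrent-ruby's `wait(timeout)` reports a timeout through its return value. `join_all_with_timeout` is a hypothetical helper. As in the review comment, it leaves open what to do with the stragglers (retry, abort or pause the partition).

```ruby
# Waits for all threads, sharing one overall deadline between them.
# Returns true if every thread finished in time, false otherwise.
def join_all_with_timeout(threads, timeout)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  threads.all? do |thread|
    remaining = [deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC), 0].max
    # join returns nil if the thread is still alive when the limit expires;
    # join(0) still succeeds for a thread that has already finished.
    !thread.join(remaining).nil?
  end
end
```

Sharing a single deadline keeps the total wait bounded by `timeout`, rather than allowing `timeout` per thread.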
This would be awesome. Ready to test, or is something needed to bring it to completion?
I think it needs thorough testing, and it's likely to break something, but if you want to drive it forward it would be much appreciated! I won't have the time anytime soon.
`process_batch` method already.