fix(scheduler): fix queue maxConcurrency initialization from config #1485
The SubCmdBuilder.Dequeue method was missing the queue name parameter when building the dequeue command. The CLI command expects the queue name as the first positional argument after "dequeue", but it was not being included, causing dequeue operations via the API to fail.

This change:
- Updates Dequeue to use dag.ProcGroup() to get the queue name
- Includes the queue name as the first argument after "dequeue"
- Updates tests to verify the queue name is included
- Adds a test case for DAGs with custom queue names

Fixes the issue where dequeuing from the API failed because the queue name was not being passed to the CLI command.
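A minimal sketch of the argument-building fix described above. The `dag` type, `ProcGroup` fallback, and `buildDequeueArgs` helper here are illustrative stand-ins, not the project's actual API; the point is only that the queue name becomes the first positional argument after "dequeue".

```go
package main

import "fmt"

// dag stands in for the real DAG type; ProcGroup returns the queue name,
// falling back to the DAG name when no explicit queue is set.
// Names and signatures here are hypothetical.
type dag struct {
	Name  string
	Queue string
}

func (d dag) ProcGroup() string {
	if d.Queue != "" {
		return d.Queue
	}
	return d.Name
}

// buildDequeueArgs mirrors the fix: the queue name is the first
// positional argument after "dequeue".
func buildDequeueArgs(d dag, dagRunID string) []string {
	return []string{"dequeue", d.ProcGroup(), "--run-id=" + dagRunID}
}

func main() {
	d := dag{Name: "etl", Queue: "batch"}
	fmt.Println(buildDequeueArgs(d, "run-123"))
	// Before the fix, the queue name was omitted entirely, so the CLI
	// could not tell which queue to dequeue from.
}
```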
When queues are discovered dynamically (when items are enqueued), they were auto-created with maxConcurrency=1 by default, even if the queue was configured in the config file with a higher value. This caused only 1 DAG to run at a time instead of the configured maxConcurrency value.

The fix ensures that:
1. When a new queue is discovered, the config is checked first and the configured maxConcurrency value is used if found
2. Existing queues that were created before the config was loaded are updated to use config values on subsequent processing cycles
3. Debug logging was added to help diagnose queue configuration issues

This ensures queue-level concurrency (maxConcurrency=n) works correctly, allowing n DAGs to run concurrently from the same queue as configured.
Walkthrough

This PR enhances the queue processor to dynamically handle global queues sourced from configuration. It introduces logic to distinguish between global queues (from config, with preserved maxConcurrency) and DAG-based queues, while adding thread-safe state management and contextual logging throughout the queue initialization and processing lifecycle.
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes
Pre-merge checks: ✅ Passed (3 passed)
Actionable comments posted: 0
🧹 Nitpick comments (3)
internal/service/scheduler/queue_processor.go (3)
229-250: Consider holding the lock for the entire check-and-update to avoid a TOCTOU pattern.

The current pattern reads `isGlobal` under lock (lines 229-231), releases it, then acquires it again to update (lines 239-242). While safe in the current single-loop design, this could introduce a race if the code is refactored for concurrent execution.

```diff
-        existingQ.mu.Lock()
-        existingIsGlobal := existingQ.isGlobal
-        existingQ.mu.Unlock()
-
-        // If it's not a global queue, check if it should be now
-        if !existingIsGlobal {
-                for _, queueConfig := range p.queuesConfig.Config {
-                        if queueConfig.Name == queueName {
-                                // Update to global queue with config value
-                                newMaxConc := max(queueConfig.MaxActiveRuns, 1)
-                                existingQ.mu.Lock()
-                                existingQ.isGlobal = true
-                                existingQ.maxConcurrency = newMaxConc
-                                existingQ.mu.Unlock()
+        existingQ.mu.Lock()
+        if !existingQ.isGlobal {
+                for _, queueConfig := range p.queuesConfig.Config {
+                        if queueConfig.Name == queueName {
+                                existingQ.isGlobal = true
+                                existingQ.maxConcurrency = max(queueConfig.MaxActiveRuns, 1)
                                 logger.Info(ctx, "Updated queue to global queue from config",
                                         tag.Queue(queueName),
-                                        tag.MaxConcurrency(newMaxConc),
+                                        tag.MaxConcurrency(existingQ.maxConcurrency),
                                 )
                                 break
                         }
                 }
         }
+        existingQ.mu.Unlock()
```
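The check-and-update-under-one-lock idea from this suggestion can be shown in isolation. The `queue` type and `promoteToGlobal` helper below are illustrative, not the project's code: because the read of `isGlobal` and the write happen inside a single critical section, no other goroutine can interleave between them.

```go
package main

import (
	"fmt"
	"sync"
)

// queue is a minimal, hypothetical version of the processor's queue type.
type queue struct {
	mu             sync.Mutex
	isGlobal       bool
	maxConcurrency int
}

// promoteToGlobal performs the check and the update under one lock
// acquisition, eliminating the time-of-check/time-of-use window that
// exists when the lock is released between the read and the write.
func promoteToGlobal(q *queue, configured int) bool {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.isGlobal {
		return false // already promoted; nothing to do
	}
	q.isGlobal = true
	q.maxConcurrency = max(configured, 1)
	return true
}

func main() {
	q := &queue{maxConcurrency: 1}
	fmt.Println(promoteToGlobal(q, 4)) // first promotion succeeds
	fmt.Println(promoteToGlobal(q, 8)) // second promotion is a no-op
	fmt.Println(q.maxConcurrency)      // keeps the first configured value
}
```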
359-371: Consider adding tag helpers for consistency with structured logging patterns.

Lines 362 and 369-370 use raw `slog.Bool("isGlobal", ...)` and `slog.Int("free", ...)` / `slog.Int("queued", ...)` directly. Per the coding guidelines, the codebase prefers structured logging helpers from `internal/common/logger/tag`. Consider adding `tag.IsGlobal()`, `tag.Free()`, and `tag.Queued()` helpers for consistency.
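Such helpers might look like the following sketch. These `IsGlobal`, `Free`, and `Queued` functions are hypothetical (the real helpers would live in `internal/common/logger/tag`); the pattern is simply wrapping raw `log/slog` attribute constructors behind named functions so the key strings live in one place.

```go
package main

import (
	"fmt"
	"log/slog"
)

// Hypothetical tag helpers in the style the review suggests. Centralizing
// the attribute keys here prevents typos like "isglobal" vs "isGlobal"
// scattered across call sites.
func IsGlobal(v bool) slog.Attr { return slog.Bool("isGlobal", v) }
func Free(n int) slog.Attr      { return slog.Int("free", n) }
func Queued(n int) slog.Attr    { return slog.Int("queued", n) }

func main() {
	// Call sites then read as tag.IsGlobal(...), tag.Free(...), etc.
	for _, a := range []slog.Attr{IsGlobal(true), Free(2), Queued(5)} {
		fmt.Println(a.Key, "=", a.Value)
	}
}
```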
488-489: Handle the case where the queue might not exist.

The second return value from `p.queues.Load()` is ignored. If the queue was removed between the check at line 310 and here, this would cause a nil pointer dereference at line 489. While the current control flow makes this unlikely (the loop waits for processing to complete before cleanup), defensive coding would be safer.

```diff
-    queueVal, _ := p.queues.Load(queueName)
-    queue := queueVal.(*queue)
+    queueVal, ok := p.queues.Load(queueName)
+    if !ok {
+        logger.Warn(ctx, "Queue no longer exists", tag.Queue(queueName))
+        return false
+    }
+    queue := queueVal.(*queue)
```
📒 Files selected for processing (1)
- `internal/service/scheduler/queue_processor.go` (10 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
`**/*.go`

📄 CodeRabbit inference engine (AGENTS.md)

`**/*.go`: Backend entrypoint in `cmd/` orchestrates the scheduler and CLI; runtime, persistence, and service layers sit under `internal/*` (for example `internal/runtime`, `internal/persistence`)
Keep Go files `gofmt`/`goimports` clean; use tabs, PascalCase for exported symbols (`SchedulerClient`), lowerCamelCase for locals, and `Err...` names for package-level errors
Repository linting relies on `golangci-lint`; prefer idiomatic Go patterns, minimal global state, and structured logging helpers in `internal/common`

Files:
- `internal/service/scheduler/queue_processor.go`
🧬 Code graph analysis (1)

internal/service/scheduler/queue_processor.go (3)
- internal/common/config/config.go (1): `Queues` (240-243)
- internal/common/logger/tag/tag.go (6): `Config` (354-356), `Name` (271-273), `Queue` (126-128), `MaxConcurrency` (146-148), `Alive` (151-153), `Count` (141-143)
- internal/common/logger/context.go (3): `Info` (40-42), `Warn` (45-47), `Debug` (35-37)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Test on ubuntu-latest
🔇 Additional comments (9)
internal/service/scheduler/queue_processor.go (9)
46-61: LGTM! Clean struct additions for global queue support.

The `queuesConfig` field and `isGlobal` flag are well-placed additions that enable the new queue configuration behavior.
75-79: LGTM!Thread-safe accessor follows the established pattern in this struct.
116-122: LGTM!

Correctly initializes global queues from config, with proper `isGlobal: true` marking and configured concurrency values.
319-322: LGTM!

Thread-safe read of `isGlobal` for debug logging purposes.
490-501: LGTM! Clear distinction between global and DAG-based queue concurrency handling.The lock is correctly held for the entire check-and-update operation. The "never decrease" logic for DAG-based queues is documented and prevents accidental concurrency reduction during active processing.
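The "never decrease" rule this comment describes can be captured in one small function. This `applyDAGConcurrency` helper is illustrative, not the project's code: while a queue is actively processing, a lower value read from a DAG definition must not shrink `maxConcurrency` mid-cycle.

```go
package main

import "fmt"

// applyDAGConcurrency returns the new maxConcurrency for a DAG-based
// queue: it may raise the limit but never lowers it during processing,
// so in-flight runs are not stranded above a suddenly reduced cap.
// Names here are hypothetical.
func applyDAGConcurrency(current, fromDAG int) int {
	if fromDAG > current {
		return fromDAG
	}
	return current
}

func main() {
	fmt.Println(applyDAGConcurrency(3, 5)) // raised to the DAG's value
	fmt.Println(applyDAGConcurrency(5, 2)) // unchanged: never decreased
}
```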
586-609: LGTM!

A clean helper method that properly extracts the DAG reading logic and uses the thread-safe `setMaxConc()` accessor.
380-389: LGTM!The added comments clarify the queue-level concurrency semantics, and the enhanced logging will aid in debugging queue capacity issues.
255-277: LGTM!Correctly preserves global queues while cleaning up inactive DAG-based queues.
337-347: LGTM! Key fix for the PR objective.

This proactively updates `maxConcurrency` for DAG-based queues before calculating available slots, ensuring the correct concurrency is applied from the first processing cycle rather than defaulting to 1.
@kriyanshii Thanks a lot for opening the PR! I might be wrong, but I believe the issue below has already been fixed in the latest version.
@yottahmd The observations appear to be from the latest main branch, based on the timeline. Still, if you think it's worthwhile, we can confirm by running a relevant test case on the latest release (1.26.4) to be doubly sure.
```go
for _, queueConfig := range queuesConfig.Config {
    conc := max(queueConfig.MaxActiveRuns, 1)
    p.queues.Store(queueConfig.Name, &queue{
```
It seems that the global queue config is already stored in the queue list here. If I understand correctly, this PR might not result in any actual changes.
It looks like I might have worked on a stale commit. I didn't pull after my PR was merged. I should have realized when I got a merge conflict 🥲
Thanks @kriyanshii |
Gimme some time to test the latest commit tho |
Closing, since this was based on a stale commit. Sorry!
Codecov Report

❌ Patch coverage is
Additional details and impacted files:

```
@@            Coverage Diff             @@
##             main    #1485      +/-   ##
==========================================
- Coverage   59.89%   59.82%   -0.07%
==========================================
  Files         195      195
  Lines       21918    21978      +60
==========================================
+ Hits        13127    13148      +21
- Misses       7385     7424      +39
  Partials     1406     1406
```

... and 2 files with indirect coverage changes. Continue to review the full report in Codecov by Sentry.
No worries! Thank you very much. |