
Conversation

@kriyanshii
Contributor

@kriyanshii kriyanshii commented Dec 14, 2025

When queues were discovered dynamically (i.e., when items were enqueued), they were auto-created with maxConcurrency=1 by default, even if the queue was configured in the config file with a higher value. As a result, only one DAG ran at a time instead of the configured maxConcurrency value.

The fix ensures that:

  1. When a new queue is discovered, the config is checked first and the configured maxConcurrency value is used if found
  2. Existing queues that were created before the config was loaded are updated to use config values on subsequent processing cycles
  3. Debug logging is added to help diagnose queue configuration issues

This ensures queue-level concurrency (maxConcurrency=n) works correctly, allowing n DAGs to run concurrently from the same queue as configured.

Summary by CodeRabbit

  • Refactor
    • Enhanced queue initialization and processing with improved concurrency management based on configuration and thread-safety improvements for better system reliability and performance.


kriyanshii and others added 4 commits December 14, 2025 12:09
The SubCmdBuilder.Dequeue method was missing the queue name parameter
when building the dequeue command. The CLI command expects the queue
name as the first positional argument after "dequeue", but it was not
being included, causing dequeue operations via the API to fail.

This change:
- Updates Dequeue to use dag.ProcGroup() to get the queue name
- Includes the queue name as the first argument after "dequeue"
- Updates tests to verify the queue name is included
- Adds test case for DAGs with custom queue names

Fixes the issue where dequeuing from the API failed because the queue
name was not being passed to the CLI command.
@coderabbitai

coderabbitai bot commented Dec 14, 2025

Walkthrough

This PR enhances the queue processor to dynamically handle global queues sourced from configuration. It introduces logic to distinguish between global queues (from config with preserved maxConcurrency) and DAG-based queues, while adding thread-safe state management and contextual logging throughout the queue initialization and processing lifecycle.

Changes

Cohort / File(s): Queue Processor Global Queue Handling (internal/service/scheduler/queue_processor.go)

Summary: Added queuesConfig field and parameter to QueueProcessor and NewQueueProcessor to store global-queue configuration. Implemented dynamic queue initialization logic that inspects config to determine if a queue should be treated as global or DAG-based. Added queue upgrade logic to convert non-global queues to global when they appear in config. Enhanced ProcessQueueItems with contextual logging for missing/existing queues and capacity calculations. Updated concurrency semantics: DAG queues use MaxActiveRuns; global queues preserve configured maxConcurrency. Added mutex-guarded reads/updates for thread-safe state changes to isGlobal and maxConcurrency. Introduced debug/info logging for queue initialization, updates, and processing.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

  • Thread-safety verification: Carefully examine mutex placement around isGlobal and maxConcurrency reads/updates to ensure no race conditions exist.
  • Config-to-queue mapping logic: Verify the interaction between config-sourced global queues and dynamically created DAG-based queues, especially queue upgrade scenarios.
  • Concurrency semantics: Validate that DAG queues correctly use MaxActiveRuns while global queues preserve config-specified maxConcurrency.
  • Queue lifecycle edge cases: Check behavior of global queue persistence when inactive and cleanup of non-global queues.

Possibly related PRs

Suggested reviewers

  • yottahmd

Poem

🐰 Queues now dance with config's grace,
Global marks find their rightful place,
Mutexes guard each state with care,
Concurrency blooms beyond compare!

Pre-merge checks and finishing touches

✅ Passed checks (3 passed)
  • Description Check — ✅ Passed: Check skipped; CodeRabbit’s high-level summary is enabled.
  • Title check — ✅ Passed: The title accurately describes the main change: fixing queue maxConcurrency initialization from config in the scheduler. It directly matches the PR's primary objective of ensuring configured maxConcurrency values are properly used when queues are discovered.
  • Docstring Coverage — ✅ Passed: Docstring coverage is 100.00%, which is sufficient. The required threshold is 80.00%.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (3)
internal/service/scheduler/queue_processor.go (3)

229-250: Consider holding lock for entire check-and-update to avoid TOCTOU pattern.

The current pattern reads isGlobal under lock (lines 229-231), releases, then acquires again to update (lines 239-242). While safe in the current single-loop design, this could introduce a race if the code is refactored for concurrent execution.

-			existingQ.mu.Lock()
-			existingIsGlobal := existingQ.isGlobal
-			existingQ.mu.Unlock()
-
-			// If it's not a global queue, check if it should be now
-			if !existingIsGlobal {
-				for _, queueConfig := range p.queuesConfig.Config {
-					if queueConfig.Name == queueName {
-						// Update to global queue with config value
-						newMaxConc := max(queueConfig.MaxActiveRuns, 1)
-						existingQ.mu.Lock()
-						existingQ.isGlobal = true
-						existingQ.maxConcurrency = newMaxConc
-						existingQ.mu.Unlock()
+			existingQ.mu.Lock()
+			if !existingQ.isGlobal {
+				for _, queueConfig := range p.queuesConfig.Config {
+					if queueConfig.Name == queueName {
+						existingQ.isGlobal = true
+						existingQ.maxConcurrency = max(queueConfig.MaxActiveRuns, 1)
						logger.Info(ctx, "Updated queue to global queue from config",
							tag.Queue(queueName),
-							tag.MaxConcurrency(newMaxConc),
+							tag.MaxConcurrency(existingQ.maxConcurrency),
						)
						break
					}
				}
			}
+			existingQ.mu.Unlock()

359-371: Consider adding tag helpers for consistency with structured logging patterns.

Lines 362, 369-370 use raw slog.Bool("isGlobal", ...) and slog.Int("free", ...) / slog.Int("queued", ...) directly. Per the coding guidelines, the codebase prefers structured logging helpers from internal/common/logger/tag. Consider adding tag.IsGlobal(), tag.Free(), and tag.Queued() helpers for consistency.


488-489: Handle the case where queue might not exist.

The second return value from p.queues.Load() is ignored. If the queue was removed between the check at line 310 and here, this would cause a nil pointer dereference at line 489. While the current control flow makes this unlikely (loop waits for processing to complete before cleanup), defensive coding would be safer.

-	queueVal, _ := p.queues.Load(queueName)
-	queue := queueVal.(*queue)
+	queueVal, ok := p.queues.Load(queueName)
+	if !ok {
+		logger.Warn(ctx, "Queue no longer exists", tag.Queue(queueName))
+		return false
+	}
+	queue := queueVal.(*queue)
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between be3e71b and 83bb69b.

📒 Files selected for processing (1)
  • internal/service/scheduler/queue_processor.go (10 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.go

📄 CodeRabbit inference engine (AGENTS.md)

**/*.go: Backend entrypoint in cmd/ orchestrates the scheduler and CLI; runtime, persistence, and service layers sit under internal/* (for example internal/runtime, internal/persistence)
Keep Go files gofmt/goimports clean; use tabs, PascalCase for exported symbols (SchedulerClient), lowerCamelCase for locals, and Err... names for package-level errors
Repository linting relies on golangci-lint; prefer idiomatic Go patterns, minimal global state, and structured logging helpers in internal/common

Files:

  • internal/service/scheduler/queue_processor.go
🧬 Code graph analysis (1)
internal/service/scheduler/queue_processor.go (3)
internal/common/config/config.go (1)
  • Queues (240-243)
internal/common/logger/tag/tag.go (6)
  • Config (354-356)
  • Name (271-273)
  • Queue (126-128)
  • MaxConcurrency (146-148)
  • Alive (151-153)
  • Count (141-143)
internal/common/logger/context.go (3)
  • Info (40-42)
  • Warn (45-47)
  • Debug (35-37)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Test on ubuntu-latest
🔇 Additional comments (9)
internal/service/scheduler/queue_processor.go (9)

46-61: LGTM! Clean struct additions for global queue support.

The queuesConfig field and isGlobal flag are well-placed additions that enable the new queue configuration behavior.


75-79: LGTM!

Thread-safe accessor follows the established pattern in this struct.


116-122: LGTM!

Correctly initializes global queues from config with proper isGlobal: true marking and configured concurrency values.


319-322: LGTM!

Thread-safe read of isGlobal for debug logging purposes.


490-501: LGTM! Clear distinction between global and DAG-based queue concurrency handling.

The lock is correctly held for the entire check-and-update operation. The "never decrease" logic for DAG-based queues is documented and prevents accidental concurrency reduction during active processing.


586-609: LGTM!

Clean helper method that properly extracts the DAG reading logic and uses the thread-safe setMaxConc() accessor.


380-389: LGTM!

The added comments clarify the queue-level concurrency semantics, and the enhanced logging will aid in debugging queue capacity issues.


255-277: LGTM!

Correctly preserves global queues while cleaning up inactive DAG-based queues.


337-347: LGTM! Key fix for the PR objective.

This proactively updates maxConcurrency for DAG-based queues before calculating available slots, ensuring the correct concurrency is applied from the first processing cycle rather than defaulting to 1.

@yottahmd
Collaborator

@kriyanshii Thanks a lot for opening the PR! I might be wrong, but I believe the issue below has already been fixed in the latest version.

When queues are discovered dynamically (when items are enqueued), they were being auto-created with maxConcurrency=1 by default, even if the queue was configured in the config file with a higher value. This caused only 1 DAG to run at a time instead of the configured maxConcurrency value.

@ghansham

ghansham commented Dec 15, 2025

@yottahmd The observations appear to be from the latest main branch, based on the timeline. Still, if you think it's needed, we can confirm by running a relevant test case on the latest release (1.26.4) to be doubly sure.


for _, queueConfig := range queuesConfig.Config {
	conc := max(queueConfig.MaxActiveRuns, 1)
	p.queues.Store(queueConfig.Name, &queue{
Collaborator

It seems that the global queue config is already stored in the queue list here. If I understand correctly, this PR might not result in any actual changes.

@kriyanshii
Contributor Author

It looks like I might have worked on a stale commit. I didn't pull after my PR. I should have realized when I got a merge conflict 🥲

@ghansham

Thanks @kriyanshii
Sorry for the confusion @yottahmd

@kriyanshii
Contributor Author

Give me some time to test the latest commit, though.

@kriyanshii
Contributor Author

Closing, since this was based on a stale commit. Sorry!

@kriyanshii kriyanshii closed this Dec 15, 2025
@codecov

codecov bot commented Dec 15, 2025

Codecov Report

❌ Patch coverage is 31.34328% with 46 lines in your changes missing coverage. Please review.
✅ Project coverage is 59.82%. Comparing base (be3e71b) to head (83bb69b).
⚠️ Report is 2 commits behind head on main.

Files with missing lines Patch % Lines
internal/service/scheduler/queue_processor.go 31.34% 45 Missing and 1 partial ⚠️
Additional details and impacted files


@@            Coverage Diff             @@
##             main    #1485      +/-   ##
==========================================
- Coverage   59.89%   59.82%   -0.07%     
==========================================
  Files         195      195              
  Lines       21918    21978      +60     
==========================================
+ Hits        13127    13148      +21     
- Misses       7385     7424      +39     
  Partials     1406     1406              
Files with missing lines Coverage Δ
internal/service/scheduler/queue_processor.go 36.56% <31.34%> (-0.65%) ⬇️

... and 2 files with indirect coverage changes


Continue to review full report in Codecov by Sentry.

Legend
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update be3e71b...83bb69b.


@yottahmd
Collaborator

No worries! Thank you very much.
