feat(orchestrator): serialize webhooks admission #5855
…sory lock

Add a happy-path integration test verifying that tasks with `webhook:` group keys complete the full create/dequeue/succeed lifecycle through the advisory lock path. Add a benchmark test suite (`npm run test:benchmark`) to measure throughput with and without the advisory lock under concurrent producers.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
TBonnin left a comment
See my comment inline. I don't think this is the best way to address the problem.
```typescript
}

if (shouldSerializeImmediateEnqueue(taskProps.groupKey)) {
    await trx.raw(`SELECT pg_advisory_xact_lock(?) as "lock_immediate_enqueue"`, [stringToHash(`enqueue:${taskProps.groupKey}`)]);
```
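For reference, a minimal TypeScript sketch of the lock step in the diff above. The diff does not show `shouldSerializeImmediateEnqueue` or `stringToHash`, so both are hypothetical here: the gate assumes the `webhook:` group-key prefix described in the PR, and the hash assumes SHA-256 truncated to a signed 64-bit integer, since `pg_advisory_xact_lock` takes a `bigint` key. `RawRunner` is an assumed minimal interface matching the `trx.raw(sql, bindings)` call, so the sketch runs without a database.

```typescript
import { createHash } from 'node:crypto';

// Assumed minimal shape of the knex transaction used in the diff.
interface RawRunner {
    raw(sql: string, bindings: readonly unknown[]): Promise<unknown>;
}

// Hypothetical gate: only group keys with the `webhook:` prefix are serialized.
function shouldSerializeImmediateEnqueue(groupKey: string): boolean {
    return groupKey.startsWith('webhook:');
}

// Hypothetical stringToHash: pg_advisory_xact_lock takes a bigint key, so the
// string is hashed with SHA-256 and the first 8 bytes are read as a signed
// 64-bit integer, which always fits in Postgres' bigint range.
function stringToHash(value: string): bigint {
    return createHash('sha256').update(value).digest().readBigInt64BE(0);
}

// Take a transaction-scoped lock before inserting the task. Postgres releases
// it automatically when the transaction commits or rolls back.
async function lockImmediateEnqueue(trx: RawRunner, groupKey: string): Promise<boolean> {
    if (!shouldSerializeImmediateEnqueue(groupKey)) {
        return false;
    }
    // Assumption: the key is bound as a decimal string so the sketch does not
    // depend on driver-specific BigInt support; Postgres casts it to bigint.
    const key = stringToHash(`enqueue:${groupKey}`).toString();
    await trx.raw(`SELECT pg_advisory_xact_lock(?) as "lock_immediate_enqueue"`, [key]);
    return true;
}
```

Because the lock is transaction-scoped, there is no explicit unlock: a second transaction for the same group key blocks in `pg_advisory_xact_lock` until the first one commits or rolls back, which is what serializes concurrent inserts per key.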
A few things:
- I think all types of immediate tasks should be handled, not just webhooks. The same exact scenario can happen with actions
- This doesn't reduce the amount of queries. It is just distributing them over time, potentially slowing down any downstream execution.
I think a better solution would be to collapse the counting query (aka: caching promises) in order to transform all concurrency queries into a single one, at the cost of a little bit of memory.
> I think all types of immediate tasks should be handled, not just webhooks. The same exact scenario can happen with actions
Technically yes, but the conditions are not the same: webhooks are not rate-limited in our public API, and each webhook fans out into N tasks, which makes the webhook case much more concerning.
But yes, I agree that any other primitive can also be affected.
> This doesn't reduce the amount of queries. It is just distributing them over time, potentially slowing down any downstream execution.
Yes, it slows down downstream execution and lowers throughput, since operations are serialized. But it caps the maximum load that a single group key can inflict on our DB, which is the main goal.
> I think a better solution would be to collapse the counting query (aka: caching promises) in order to transform all concurrency queries into a single one, at the cost of a little bit of memory.
Good point. So you mean putting all requests that target the same group into a queue of promises, and once the first one returns a result, letting the others proceed or rejecting them based on that initial count?
I'll go for a solution that is not too complicated: just reusing the count returned by the first request.
Have a look at this PR for an example of the promise-coalescing pattern: https://github.com/NangoHQ/nango/pull/5090/changes
It is not a queue of promises; it is a single promise (per group key) that is shared across requests. Sometimes using Node has its benefits 😉. For instance, if thousands of requests hit the same webhook at the same time, the count query can be a single promise that all of them await.
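A minimal sketch of the single-shared-promise idea, assuming a hypothetical generic helper `coalesce(key, fn)`; it is not the code from this PR or from the linked Nango PR. The first caller for a key starts the query, and every concurrent caller for the same key gets the same promise back.

```typescript
// Hypothetical helper: at most one in-flight promise per key, shared by all
// concurrent callers for that key.
const inflight = new Map<string, Promise<unknown>>();

function coalesce<T>(key: string, fn: () => Promise<T>): Promise<T> {
    const existing = inflight.get(key);
    if (existing) {
        // A query for this key is already running: await the same promise
        // instead of issuing another one.
        return existing as Promise<T>;
    }
    // First caller: start the query and drop the map entry once it settles,
    // so only overlapping calls are deduplicated (no result is cached afterwards).
    const promise = fn().finally(() => {
        inflight.delete(key);
    });
    inflight.set(key, promise);
    return promise;
}
```

Deleting the entry in `finally` means the map only deduplicates calls that overlap in time: a request arriving after the query settles runs a fresh count, and the memory cost is one map entry per key with a query in flight. A rejection is shared as well, so every waiter sees the same error.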
…eSizes

Replace the `pg_advisory_xact_lock` serialization with promise coalescing on the `queueSizes` counting query. Concurrent `immediate()` calls for the same group key now share a single DB count query instead of each running their own, reducing DB load without serializing inserts or penalizing throughput.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: d545933ec5
```typescript
const inflightQueueSizes = new Map<string, Promise<Result<Map<string, number>>>>();

export async function queueSizes(db: knex.Knex, opts: { groupKeys?: string[] | undefined }): Promise<Result<Map<string, number>>> {
    const cacheKey = opts.groupKeys ? opts.groupKeys.sort().join(',') : '*';
```
Use unambiguous key encoding for inflight queueSizes cache
The cache key `opts.groupKeys.sort().join(',')` is ambiguous when a group key itself contains a comma, so distinct key sets can collide (for example, `['a,b','c']` vs `['a','b,c']`). In that case, a caller can reuse an in-flight promise for the wrong `whereIn` set and receive counts for different groups, which can incorrectly cap or admit task creation for valid inputs (group keys are free-form non-empty strings). Use a collision-free encoding (e.g., `JSON.stringify([...groupKeys].sort())`) for the cache key.
Valid point. Group keys currently cannot contain a `,`, but let's not rely on that assumption.
Remove the benchmark config, the benchmark npm script, and the webhook group key integration test that were added for the advisory lock approach, which has been replaced by promise coalescing.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Use JSON.stringify for the inflight cache key instead of join(',')
to avoid ambiguity when group keys contain commas.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
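The collision flagged in the review is easy to reproduce. A small sketch comparing the two encodings; `joinKey` and `jsonKey` are hypothetical names, with `jsonKey` following the Codex suggestion of copying before sorting. `undefined` stands for the "all group keys" case that the diff maps to `'*'`.

```typescript
// Original encoding: ambiguous when a key contains ',' and, because sort()
// works in place, it also reorders the caller's array.
function joinKey(groupKeys?: string[]): string {
    return groupKeys ? groupKeys.sort().join(',') : '*';
}

// Collision-free encoding: copy before sorting, then JSON-encode so element
// boundaries survive (a comma inside a key stays inside its quotes).
function jsonKey(groupKeys?: string[]): string {
    return groupKeys ? JSON.stringify([...groupKeys].sort()) : '*';
}
```

With `joinKey`, `['a,b', 'c']` and `['a', 'b,c']` both encode to `a,b,c`, and `['*']` collides with the `'*'` sentinel. The JSON encoding avoids both, since every encoded array starts with `[`. Copying before `sort()` also avoids reordering the caller's `groupKeys` array in place.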
Summary
Serialize webhook task creation in the scheduler using PostgreSQL advisory locks (`pg_advisory_xact_lock`). When multiple concurrent webhook enqueue requests target the same group key, the advisory lock forces them to be serialized, preventing DB overload from unbounded parallel inserts.

- Tasks with `webhook:` group keys acquire a transaction-scoped advisory lock keyed on `enqueue:{groupKey}` before creation

Benchmark Results
Measured on a local testcontainers PostgreSQL 15.5 instance, 20s per scenario:
Key observations:
- `pg_advisory_xact_lock` per task

Test plan
- `npm run test:benchmark -- --dir packages/scheduler` validates serialization effect