feat(orchestrator): serialize webhooks admision #5855

Merged: pfreixes merged 6 commits into master from pfreixes/INC-100-serialize-webhooks-admision on Apr 15, 2026

Conversation

@pfreixes pfreixes commented Apr 14, 2026

Contributor

Summary

Serialize webhook task creation in the scheduler using PostgreSQL advisory locks (pg_advisory_xact_lock). When multiple concurrent webhook enqueue requests target the same group key, the advisory lock forces them to be serialized, preventing DB overload from unbounded parallel inserts.

  • Tasks with group keys starting with webhook: acquire a transaction-scoped advisory lock keyed on enqueue:{groupKey} before creation
  • The lock is automatically released on transaction commit/rollback
  • Non-webhook tasks are unaffected
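
The admission rule in the bullets above can be sketched roughly as follows. This is a minimal illustration, not the PR's implementation: `RawExecutor`, `shouldSerialize`, `hashToInt32` and `lockForEnqueue` are hypothetical names, the hash is an FNV-1a-style stand-in for whatever `stringToHash` does in the scheduler, and the executor only models the `raw(sql, bindings)` call of a knex transaction.

```typescript
// Minimal stand-in for the part of a knex transaction used here
// (assumption: only `raw(sql, bindings)` is needed).
interface RawExecutor {
    raw(sql: string, bindings: readonly unknown[]): Promise<unknown>;
}

// Hypothetical predicate: only group keys with the `webhook:` prefix are serialized.
function shouldSerialize(groupKey: string): boolean {
    return groupKey.startsWith('webhook:');
}

// Hypothetical FNV-1a-style hash over UTF-16 code units; the scheduler's own
// stringToHash may be implemented differently.
function hashToInt32(input: string): number {
    let hash = 0x811c9dc5;
    for (let i = 0; i < input.length; i++) {
        hash ^= input.charCodeAt(i);
        hash = Math.imul(hash, 0x01000193);
    }
    return hash | 0; // signed 32-bit integer, within the bigint range the lock accepts
}

// Takes a transaction-scoped advisory lock for webhook group keys and returns
// whether a lock was requested. PostgreSQL releases the lock on commit or rollback.
async function lockForEnqueue(trx: RawExecutor, groupKey: string): Promise<boolean> {
    if (!shouldSerialize(groupKey)) {
        return false;
    }
    await trx.raw('SELECT pg_advisory_xact_lock(?)', [hashToInt32(`enqueue:${groupKey}`)]);
    return true;
}
```

Two transactions that call `lockForEnqueue` with the same group key compute the same lock id, so the second one waits until the first commits or rolls back. Distinct keys can still hash to the same id, which only causes extra waiting.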

Benchmark Results

Measured on a local testcontainers PostgreSQL 15.5 instance, 20s per scenario:

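| Scenario | Producers | Advisory Lock | Throughput (tasks/s) | DB CPU (max) |
|----------|-----------|---------------|----------------------|--------------|
| 1        | 1         | No            | 722                  | ~57%         |
| 2        | 1         | Yes           | 654                  | ~54%         |
| 3        | 2         | Yes           | 764                  | ~66%         |
| 4        | 2         | No            | 1,176                | ~142%        |
| 5        | 4         | No            | 1,670                | ~300%        |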

Key observations:

  • Lock overhead (single producer): ~9.4% throughput reduction, ~5% less CPU — the pure cost of acquiring pg_advisory_xact_lock per task
  • Without lock, CPU scales faster than throughput: 2 producers → 1.6x throughput at 2.5x CPU; 4 producers → 2.3x throughput at 5.3x CPU
  • The lock caps concurrent webhook producers to a ~constant throughput and ~constant CPU regardless of producer count, which is the intended behavior
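
For readers who want to check the ratios quoted above, the arithmetic can be reproduced from the table figures. This is only a sketch: the `Run` type and the helper names are made up for illustration, and the numbers are copied from the benchmark table rather than measured again.

```typescript
// Figures copied from the benchmark table (throughput in tasks/s, max DB CPU in %).
interface Run {
    producers: number;
    lock: boolean;
    tasksPerSec: number;
    cpuPct: number;
}

const baseline: Run = { producers: 1, lock: false, tasksPerSec: 722, cpuPct: 57 };
const lockedSingle: Run = { producers: 1, lock: true, tasksPerSec: 654, cpuPct: 54 };
const unlocked2: Run = { producers: 2, lock: false, tasksPerSec: 1176, cpuPct: 142 };
const unlocked4: Run = { producers: 4, lock: false, tasksPerSec: 1670, cpuPct: 300 };

// Round to one decimal place.
const round1 = (x: number): number => Math.round(x * 10) / 10;

// Throughput lost by taking the lock with a single producer, in percent.
const lockOverheadPct = round1(
    ((baseline.tasksPerSec - lockedSingle.tasksPerSec) / baseline.tasksPerSec) * 100
); // 9.4

// Throughput and CPU of a run relative to the single-producer, no-lock baseline.
function scaling(run: Run): { throughput: number; cpu: number } {
    return {
        throughput: round1(run.tasksPerSec / baseline.tasksPerSec),
        cpu: round1(run.cpuPct / baseline.cpuPct)
    };
}

const twoProducers = scaling(unlocked2); // { throughput: 1.6, cpu: 2.5 }
const fourProducers = scaling(unlocked4); // { throughput: 2.3, cpu: 5.3 }
```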

Test plan

  • Integration test: happy-path test for webhook group key through create/dequeue/succeed lifecycle
  • Benchmark test: npm run test:benchmark -- --dir packages/scheduler validates serialization effect
  • All existing scheduler integration tests pass (66/66)

pfreixes and others added 3 commits April 14, 2026 11:13
…sory lock

Add a happy-path integration test verifying that tasks with webhook: group
keys complete the full create/dequeue/succeed lifecycle through the advisory
lock path. Add a benchmark test suite (npm run test:benchmark) to measure
throughput with and without the advisory lock under concurrent producers.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

linear Bot commented Apr 14, 2026

@pfreixes pfreixes changed the title Pfreixes/inc 100 serialize webhooks admision feat(orchestrator): serialize webhooks admision Apr 14, 2026
@pfreixes pfreixes requested review from a team and TBonnin April 14, 2026 10:51

@TBonnin TBonnin left a comment

Collaborator

See my comment inline. I don't think this is the best way to address the problem.

Comment thread packages/scheduler/lib/scheduler.ts Outdated
}

if (shouldSerializeImmediateEnqueue(taskProps.groupKey)) {
await trx.raw(`SELECT pg_advisory_xact_lock(?) as "lock_immediate_enqueue"`, [stringToHash(`enqueue:${taskProps.groupKey}`)]);

Collaborator

A few things:

  • I think all types of immediate tasks should be handled, not just webhooks. The same exact scenario can happen with actions
  • This doesn't reduce the amount of queries. It is just distributing them over time, potentially slowing down any downstream execution.

I think a better solution would be to collapse the counting query (aka: caching promises) in order to transform all concurrency queries into a single one, at the cost of a little bit of memory.

Contributor Author

> I think all types of immediate tasks should be handled, not just webhooks. The same exact scenario can happen with actions

Yes, technically. Still, the conditions are not the same: webhooks are not rate limited in our public API, and we also fan out N tasks per webhook, which makes the webhook case much more concerning.

But yes, I would agree that any other primitive can also

> This doesn't reduce the amount of queries. It is just distributing them over time, potentially slowing down any downstream execution.

Yes, it slows down the downstream, providing less throughput since operations are serialized. But it reduces the max load that a single group key can inflict on our DB, which is the main goal.

> I think a better solution would be to collapse the counting query (aka: caching promises) in order to transform all concurrency queries into a single one, at the cost of a little bit of memory.

Yeah, good point. So you mean moving all requests that target the same group into a queue of promises? And once we have the result from the first one, we just allow the others to move ahead or reject them based on the initial counter received?

Contributor Author

I'll go for a solution that is not super complicated, by just reusing the counter returned by the first request.

Collaborator

Have a look at this PR for an example of this promise coalescing pattern: https://github.com/NangoHQ/nango/pull/5090/changes
It is not a queue of promises; it is a single promise (per group key) that is shared across requests. Sometimes using Node has some benefits 😉. So if thousands of requests hit at the same time for the same webhook, for instance, the count query can be a single promise that all requests await.
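
A minimal sketch of the shared-promise idea described in this comment, assuming nothing about the code in the linked PR: `inflight` and `coalesce` are hypothetical names, and `run` stands in for the count query.

```typescript
// In-flight promises keyed by group key (hypothetical names throughout).
const inflight = new Map<string, Promise<unknown>>();

// Returns the promise already in flight for `key`, or starts `run` and shares
// its promise with every caller that arrives before it settles.
function coalesce<T>(key: string, run: () => Promise<T>): Promise<T> {
    const existing = inflight.get(key);
    if (existing) {
        // Assumption: a given key is always used with the same result type.
        return existing as Promise<T>;
    }
    const promise = run().finally(() => {
        // Drop the entry once settled so later callers trigger a fresh query.
        inflight.delete(key);
    });
    inflight.set(key, promise);
    return promise;
}
```

Because Node runs the callers on a single thread, the check and the `set` happen without interleaving, so no lock is needed around the map. The trade-off is that callers arriving while the query is in flight see a count that may be slightly stale by the time they act on it.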

…eSizes

Replace the pg_advisory_xact_lock serialization with promise coalescing
on the queueSizes counting query. Concurrent immediate() calls for the
same group key now share a single DB count query instead of each running
their own, reducing DB load without serializing inserts or penalizing
throughput.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: d545933ec5

Comment thread packages/scheduler/lib/models/tasks.ts Outdated
const inflightQueueSizes = new Map<string, Promise<Result<Map<string, number>>>>();

export async function queueSizes(db: knex.Knex, opts: { groupKeys?: string[] | undefined }): Promise<Result<Map<string, number>>> {
const cacheKey = opts.groupKeys ? opts.groupKeys.sort().join(',') : '*';

P2: Use unambiguous key encoding for inflight queueSizes cache

The cache key opts.groupKeys.sort().join(',') is ambiguous when a group key itself contains a comma, so distinct key sets can collide (for example, ['a,b','c'] vs ['a','b,c']). In that case, a caller can reuse an in-flight promise for the wrong whereIn set and receive counts for different groups, which can incorrectly cap or admit task creation for valid inputs (group keys are free-form non-empty strings). Use a collision-free encoding (e.g., JSON.stringify([...groupKeys].sort())) for the cache key.
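
The collision described above can be reproduced in isolation. In this sketch, `joinKey` and `jsonKey` are hypothetical helpers that mirror the two encodings under discussion; they are not functions from the scheduler package.

```typescript
// Encoding flagged in the review: ambiguous when a group key contains a comma.
function joinKey(groupKeys?: string[]): string {
    return groupKeys ? [...groupKeys].sort().join(',') : '*';
}

// Suggested encoding: JSON quoting keeps element boundaries unambiguous.
function jsonKey(groupKeys?: string[]): string {
    return groupKeys ? JSON.stringify([...groupKeys].sort()) : '*';
}

const first = ['a,b', 'c'];
const second = ['a', 'b,c'];

const joinCollides = joinKey(first) === joinKey(second); // true: both are 'a,b,c'
const jsonCollides = jsonKey(first) === jsonKey(second); // false: '["a,b","c"]' vs '["a","b,c"]'
```

Both helpers copy the array before sorting, so the caller's `groupKeys` is not reordered as a side effect. The JSON form also cannot collide with the `'*'` wildcard key, since it always starts with `[`.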

Collaborator

Valid point. Group keys currently cannot contain a comma, but let's not rely on this assumption.

Contributor Author

Yeah, good point.

pfreixes and others added 2 commits April 14, 2026 17:02
Remove the benchmark config, benchmark npm script, and the webhook
group key integration test that were added for the advisory lock
approach which has been replaced by promise coalescing.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

Use JSON.stringify for the inflight cache key instead of join(',')
to avoid ambiguity when group keys contain commas.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@pfreixes pfreixes added this pull request to the merge queue Apr 15, 2026
Merged via the queue into master with commit 6b29438 Apr 15, 2026
23 checks passed
@pfreixes pfreixes deleted the pfreixes/INC-100-serialize-webhooks-admision branch April 15, 2026 06:49