feat(webhooks): add dispatch queue publisher (NAN-5339) 1/5 #5955
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 74bcc1255f
| const failedIds = (response.Failed ?? []).map((f) => f.Id).filter((id): id is string => typeof id === 'string'); | ||
| return { failedIds }; | ||
| } catch (err) { | ||
| logger.error(`SendMessageBatchCommand threw`, { error: err }); |
Intentional log here, as we don't have per-batch tracing.
Force-pushed from 74bcc12 to 269273c.
| return { | ||
| Id: indexToEntryId(index), | ||
| MessageBody: JSON.stringify(message), | ||
| MessageGroupId: messageGroupId |
no MessageDeduplicationId
We are using a standard SQS queue with fair-queue behavior via MessageGroupId, not a FIFO queue. Unfortunately, the caveat is that fair queues do not support MessageDeduplicationId, so we will dedupe on the orchestrator using the task name. I'm open to using a FIFO queue with MessageDeduplicationId, but we would lose fairness across tenants. Even then, the orchestrator would still need to dedupe transport retries (FIFO guarantees at-least-once delivery).
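To make the consumer-side dedupe concrete, here is a minimal sketch of idempotent scheduling keyed by task name. `TaskNameDeduper`, `scheduleUnique`, and the `DispatchDelivery` shape are hypothetical names for illustration only; the orchestrator's actual mechanism is not part of this PR, and a real implementation would presumably rely on a persistent unique constraint rather than an in-memory set.

```typescript
// Hypothetical sketch: drop duplicate deliveries by task name on the consumer side.
// Assumption: every dispatch message carries a stable, unique task name.
interface DispatchDelivery {
    taskName: string;
    body: string;
}

class TaskNameDeduper {
    // In-memory only; a real orchestrator would need durable storage.
    private readonly seen = new Set<string>();

    /** Returns true the first time a task name is seen, false for any redelivery. */
    accept(taskName: string): boolean {
        if (this.seen.has(taskName)) {
            return false;
        }
        this.seen.add(taskName);
        return true;
    }
}

/** Returns the task names that should actually be scheduled, in delivery order. */
function scheduleUnique(deduper: TaskNameDeduper, deliveries: DispatchDelivery[]): string[] {
    const scheduled: string[] = [];
    for (const delivery of deliveries) {
        if (deduper.accept(delivery.taskName)) {
            scheduled.push(delivery.taskName);
        }
    }
    return scheduled;
}
```

Because the check happens at the consumer, it covers both producer retries and queue redeliveries, which is why it is needed regardless of queue type.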
| /** | ||
| * Publish a list of dispatch messages in batches. Batches are fired in parallel up to | ||
| * `publishConcurrency`; failed entries within a batch are retried once inline. Any | ||
| * entries still failing after the retry are counted as `failed`. Never throws — the |
Never throws is contradicted by L112
Yeah, fixing it. I meant never throws on regular SQS operations.
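For readers following the thread, the behavior described in the doc comment (parallel batches, one inline retry, no throw on regular send failures) can be sketched roughly as below. This is not the PR's implementation: `SendBatch`, `publishAll`, `safeSend`, and the option names are hypothetical stand-ins, and the SQS client is replaced by an injected function so the sketch stays self-contained.

```typescript
// Sketch under stated assumptions; all names here are hypothetical.
interface BatchEntry {
    id: string;
    body: string;
}
type SendBatch = (entries: BatchEntry[]) => Promise<{ failedIds: string[] }>;
interface PublishResult {
    enqueued: number;
    failed: number;
}

function chunk<T>(items: T[], size: number): T[][] {
    const out: T[][] = [];
    for (let i = 0; i < items.length; i += size) {
        out.push(items.slice(i, i + size));
    }
    return out;
}

// A thrown transport error is treated as "every entry in this batch failed".
async function safeSend(send: SendBatch, entries: BatchEntry[]): Promise<string[]> {
    try {
        return (await send(entries)).failedIds;
    } catch {
        return entries.map((e) => e.id);
    }
}

// Sends one batch, retries the failed entries once, returns how many still failed.
async function publishBatch(send: SendBatch, entries: BatchEntry[]): Promise<number> {
    const failedIds = new Set(await safeSend(send, entries));
    if (failedIds.size === 0) {
        return 0;
    }
    const retryEntries = entries.filter((e) => failedIds.has(e.id));
    return (await safeSend(send, retryEntries)).length;
}

async function publishAll(
    send: SendBatch,
    entries: BatchEntry[],
    opts: { batchSize: number; concurrency: number }
): Promise<PublishResult> {
    const batches = chunk(entries, opts.batchSize);
    let failed = 0;
    let next = 0;
    // Each worker pulls the next unclaimed batch until none remain.
    const worker = async (): Promise<void> => {
        while (next < batches.length) {
            const batch = batches[next++];
            if (!batch) {
                break;
            }
            failed += await publishBatch(send, batch);
        }
    };
    const workerCount = Math.min(opts.concurrency, batches.length);
    await Promise.all(Array.from({ length: workerCount }, () => worker()));
    return { enqueued: entries.length - failed, failed };
}
```

Returning counts instead of throwing lets the caller emit the success and failure metrics from a single result object; input validation errors would still need to be documented separately, which is the gap the review comment points at.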
| if (enqueued > 0) { | ||
| metrics.increment(metrics.Types.WEBHOOK_DISPATCH_MESSAGES_ENQUEUED, enqueued, { provider }); | ||
| metrics.increment(metrics.Types.WEBHOOK_DISPATCH_PUBLISH_SUCCESS, enqueued, { provider }); |
Do we need both? We are counting the same things twice
Agreed, enqueued ended up being redundant here and also reads more like queue depth, so I removed it and kept publish.success / publish.failure
| tracerMocks.dogstatsd.decrement.mockClear(); | ||
| tracerMocks.dogstatsd.gauge.mockClear(); | ||
| tracerMocks.dogstatsd.histogram.mockClear(); | ||
| tracerMocks.dogstatsd.distribution.mockClear(); |
isn't restoreAllMocks already cleaning up the spies?
Most of these are hoisted vi.fn() mocks, not vi.spyOns, and restoreAllMocks() does not clear or reset the history of vi.fn()
Force-pushed from 907fa9e to 5fb9cd8.
| } | ||
| this.batchSize = Math.min(configuredBatchSize, SQS_BATCH_MAX_ENTRIES); | ||
| const configuredPublishConcurrency = props.publishConcurrency ?? SQS_BATCH_MAX_ENTRIES; |
Why are we using a constant for batching as a default for concurrency?
Missed this. I originally set it to the same value and forgot to split it into a new constant.
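A minimal sketch of what splitting the two defaults might look like. `DEFAULT_PUBLISH_CONCURRENCY`, its value of 5, `resolvePublisherConfig`, and the validation behavior are assumptions for illustration; only `SQS_BATCH_MAX_ENTRIES` and the `Math.min` clamp appear in the diff. The value 10 reflects the SQS limit on entries per `SendMessageBatch` request.

```typescript
// Hypothetical sketch: keep the SQS batch limit and the concurrency default separate.
const SQS_BATCH_MAX_ENTRIES = 10; // SQS allows at most 10 entries per SendMessageBatch
const DEFAULT_PUBLISH_CONCURRENCY = 5; // assumed value, not taken from the PR

interface PublisherProps {
    batchSize?: number;
    publishConcurrency?: number;
}

function assertPositiveInteger(name: string, value: number): void {
    if (!Number.isInteger(value) || value < 1) {
        throw new Error(`${name} must be a positive integer, got ${value}`);
    }
}

function resolvePublisherConfig(props: PublisherProps): { batchSize: number; publishConcurrency: number } {
    const configuredBatchSize = props.batchSize ?? SQS_BATCH_MAX_ENTRIES;
    assertPositiveInteger('batchSize', configuredBatchSize);

    const configuredPublishConcurrency = props.publishConcurrency ?? DEFAULT_PUBLISH_CONCURRENCY;
    assertPositiveInteger('publishConcurrency', configuredPublishConcurrency);

    return {
        // The batch size is clamped to the SQS limit; concurrency has no such hard cap.
        batchSize: Math.min(configuredBatchSize, SQS_BATCH_MAX_ENTRIES),
        publishConcurrency: configuredPublishConcurrency
    };
}
```

Keeping the constants separate means a future change to the concurrency default cannot silently alter batching, and vice versa.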
| @@ -67,6 +67,16 @@ export enum Types { | |||
| WEBHOOK_ASYNC_ACTION_FAILED = 'nango.webhook.async_action.failed', | |||
| WEBHOOK_INCOMING_PAYLOAD_SIZE_BYTES = 'nango.webhook.incoming.payloadSizeBytes', | |||
| WEBHOOK_DISPATCH_PUBLISH_SUCCESS = 'nango.webhook.dispatch_queue.publish.success', | |||
| WEBHOOK_DISPATCH_PUBLISH_FAILURE = 'nango.webhook.dispatch_queue.publish.failure', | |||
I imagine the following metrics are declared for future use?
Force-pushed from 2e1ff55 to b4f5512.
Introduces a DispatchQueuePublisher abstraction for publishing webhook execution messages to SQS. Uses activity log ID as the message identifier. Adds tracing and a shared webhook concurrency helper.
Force-pushed from b4f5512 to ec71242.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: ec71242da9
| return { | ||
| Id: indexToEntryId(index), | ||
| MessageBody: JSON.stringify(message), | ||
| MessageGroupId: messageGroupId |
Validate FIFO requirements before setting SQS group fields
toEntry always sends MessageGroupId but never sets MessageDeduplicationId, which only works for a FIFO queue that already has content-based dedup enabled. In environments where the queue is standard, or FIFO without content-based deduplication, SendMessageBatch will reject every entry and this publisher will return all messages as failed, causing webhook dispatches to be dropped instead of queued. Please either enforce/validate the queue mode at construction time or include a deterministic deduplication ID (for example from taskName) when publishing.
- WebhookDispatchMessage shared type in `@nangohq/types`
- DispatchQueuePublisher in server package: batched SendMessageBatch, parallel batches, one inline retry for failed entries, partial-failure emits metric + log
- New WEBHOOK_DISPATCH_* metric types in `@nangohq/utils`
- Adds `@aws-sdk/client-sqs` dependency to server

No call sites yet, wired up in NAN-5340.