Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion internal/alert/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,25 @@ func (m *alertMonitor) HandleAttempt(ctx context.Context, attempt DeliveryAttemp
}

// Consecutive failure tracking
count, err := m.store.IncrementConsecutiveFailureCount(ctx, attempt.Destination.TenantID, attempt.Destination.ID, attempt.Attempt.ID)
res, err := m.store.IncrementConsecutiveFailureCount(ctx, attempt.Destination.TenantID, attempt.Destination.ID, attempt.Attempt.ID)
if err != nil {
return fmt.Errorf("failed to get alert state: %w", err)
}

// Replayed attempt (MQ redelivery, producer re-publish) that already
// completed a full evaluation — skip re-emitting alerts. Attempts that
// were counted but never marked evaluated (eval failed mid-way and the
// message was nacked) fall through and re-run as recovery.
if !res.NewlyCounted && res.AlreadyEvaluated {
m.logger.Ctx(ctx).Debug("skipping replayed attempt: already evaluated",
zap.String("attempt_id", attempt.Attempt.ID),
zap.String("tenant_id", attempt.Destination.TenantID),
zap.String("destination_id", attempt.Destination.ID),
)
return nil
}

count := res.Count
level, shouldAlert := m.evaluator.ShouldAlert(count)
if shouldAlert {
// At 100% threshold, disable the destination and emit disabled alert.
Expand Down Expand Up @@ -249,5 +263,17 @@ func (m *alertMonitor) HandleAttempt(ctx context.Context, attempt DeliveryAttemp
}
}

// Mark the attempt fully evaluated so replays skip re-emitting alerts.
// Non-fatal: on failure the attempt simply re-evaluates on replay, which
// matches the previous behavior (emit/disable are idempotent-by-design).
if err := m.store.MarkAttemptEvaluated(ctx, attempt.Destination.TenantID, attempt.Destination.ID, attempt.Attempt.ID); err != nil {
m.logger.Ctx(ctx).Warn("failed to mark attempt evaluated",
zap.Error(err),
zap.String("attempt_id", attempt.Attempt.ID),
zap.String("tenant_id", attempt.Destination.TenantID),
zap.String("destination_id", attempt.Destination.ID),
)
}

return nil
}
87 changes: 87 additions & 0 deletions internal/alert/monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,93 @@ func TestAlertMonitor_ExhaustedRetries_EmitFailureRetryClearsWindow(t *testing.T
require.Equal(t, 2, erCalls, "Should have 2 emit attempts (1 failed + 1 succeeded)")
}

func TestAlertMonitor_ReplayedAttempt_SkipsEvaluation(t *testing.T) {
t.Parallel()
ctx := context.Background()
logger := testutil.CreateTestLogger(t)
redisClient := testutil.CreateTestRedisClient(t)
emitter := &mockAlertEmitter{}
emitter.On("Emit", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)

monitor := alert.NewAlertMonitor(
logger,
redisClient,
emitter,
10,
alert.WithAutoDisableFailureCount(2),
alert.WithAlertThresholds([]int{50, 100}),
)

dest := &alert.AlertDestination{ID: "dest_replay", TenantID: "tenant_replay"}
event := &models.Event{Topic: "test.event"}
attempt := alert.DeliveryAttempt{
Event: event,
Destination: dest,
Attempt: &models.Attempt{
ID: "att_replay_1",
Status: "failed",
Code: "500",
Time: time.Now(),
},
}

// First delivery: count=1 → 50% threshold → emits
require.NoError(t, monitor.HandleAttempt(ctx, attempt))
require.Equal(t, 1, countEmitCalls(emitter, "alert.destination.consecutive_failure"))

// Replay (MQ redelivery / producer re-publish): fully evaluated → skipped
require.NoError(t, monitor.HandleAttempt(ctx, attempt))
assert.Equal(t, 1, countEmitCalls(emitter, "alert.destination.consecutive_failure"),
"replayed attempt should not re-emit the alert")
}

func TestAlertMonitor_ReplayedAttempt_PartialFailureRetries(t *testing.T) {
// When evaluation fails after the attempt was counted (e.g. emit error),
// the message is nacked and redelivered — the replay must re-run the
// evaluation, not skip it, or alerts would be silently dropped.
t.Parallel()
ctx := context.Background()
logger := testutil.CreateTestLogger(t)
redisClient := testutil.CreateTestRedisClient(t)
emitter := &mockAlertEmitter{}
emitter.On("Emit", mock.Anything, "alert.destination.consecutive_failure", mock.Anything, mock.Anything).Return(fmt.Errorf("emit failed")).Once()
emitter.On("Emit", mock.Anything, "alert.destination.consecutive_failure", mock.Anything, mock.Anything).Return(nil)

monitor := alert.NewAlertMonitor(
logger,
redisClient,
emitter,
10,
alert.WithAutoDisableFailureCount(2),
alert.WithAlertThresholds([]int{50, 100}),
)

dest := &alert.AlertDestination{ID: "dest_partial", TenantID: "tenant_partial"}
event := &models.Event{Topic: "test.event"}
attempt := alert.DeliveryAttempt{
Event: event,
Destination: dest,
Attempt: &models.Attempt{
ID: "att_partial_1",
Status: "failed",
Code: "500",
Time: time.Now(),
},
}

// First delivery: counted, but emit fails → error (entry would be nacked)
require.Error(t, monitor.HandleAttempt(ctx, attempt))

// Replay: not marked evaluated → re-runs and emits successfully
require.NoError(t, monitor.HandleAttempt(ctx, attempt))
assert.Equal(t, 2, countEmitCalls(emitter, "alert.destination.consecutive_failure"),
"replay after partial failure should re-run evaluation")

// Second replay: now fully evaluated → skipped
require.NoError(t, monitor.HandleAttempt(ctx, attempt))
assert.Equal(t, 2, countEmitCalls(emitter, "alert.destination.consecutive_failure"))
}

func countEmitCalls(emitter *mockAlertEmitter, topic string) int {
count := 0
for _, call := range emitter.Calls {
Expand Down
66 changes: 56 additions & 10 deletions internal/alert/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,27 @@ import (
)

const (
keyPrefixAlert = "alert" // Base prefix for all alert keys
keyFailures = "cf" // Set for consecutive failure attempt IDs
keyPrefixAlert = "alert" // Base prefix for all alert keys
keyFailures = "cf" // Set for consecutive failure attempt IDs
keyEvaluated = "cfeval" // Set for fully evaluated attempt IDs
alertKeyTTL = 24 * time.Hour
)

// FailureCountResult reports the state of consecutive-failure tracking
// after recording an attempt. It is the value returned by
// AlertStore.IncrementConsecutiveFailureCount.
type FailureCountResult struct {
// Count is the consecutive failure count after the attempt was recorded.
Count int // current consecutive failure count
// NewlyCounted is true when this call recorded the attempt for the first
// time; false means the attempt ID had already been counted (a replay).
NewlyCounted bool // attempt was not previously counted (first delivery of this attempt)
// AlreadyEvaluated is true when the attempt was previously marked via
// MarkAttemptEvaluated, i.e. an earlier evaluation ran to completion.
AlreadyEvaluated bool // attempt was fully evaluated before (marked via MarkAttemptEvaluated)
}

// AlertStore manages alert-related data persistence
type AlertStore interface {
IncrementConsecutiveFailureCount(ctx context.Context, tenantID, destinationID, attemptID string) (int, error)
IncrementConsecutiveFailureCount(ctx context.Context, tenantID, destinationID, attemptID string) (FailureCountResult, error)
// MarkAttemptEvaluated records that an attempt's alert evaluation fully
// completed, so replays (MQ redelivery, producer re-publish) can skip
// re-evaluating it.
MarkAttemptEvaluated(ctx context.Context, tenantID, destinationID, attemptID string) error
ResetConsecutiveFailureCount(ctx context.Context, tenantID, destinationID string) error
}

Expand All @@ -32,32 +46,60 @@ func NewRedisAlertStore(client redis.Cmdable, deploymentID string) AlertStore {
}
}

func (s *redisAlertStore) IncrementConsecutiveFailureCount(ctx context.Context, tenantID, destinationID, attemptID string) (int, error) {
func (s *redisAlertStore) IncrementConsecutiveFailureCount(ctx context.Context, tenantID, destinationID, attemptID string) (FailureCountResult, error) {
key := s.getFailuresKey(destinationID)

// Use a transaction to ensure atomicity between the SADD, SCARD, SISMEMBER, and EXPIRE operations.
// SADD is idempotent — adding the same attemptID on replay is a no-op,
// preventing double-counting when messages are redelivered.
pipe := s.client.TxPipeline()
pipe.SAdd(ctx, key, attemptID)
saddCmd := pipe.SAdd(ctx, key, attemptID)
scardCmd := pipe.SCard(ctx, key)
pipe.Expire(ctx, key, 24*time.Hour)
evaluatedCmd := pipe.SIsMember(ctx, s.getEvaluatedKey(destinationID), attemptID)
pipe.Expire(ctx, key, alertKeyTTL)

_, err := pipe.Exec(ctx)
if err != nil {
return 0, fmt.Errorf("failed to execute consecutive failure count transaction: %w", err)
return FailureCountResult{}, fmt.Errorf("failed to execute consecutive failure count transaction: %w", err)
}

added, err := saddCmd.Result()
if err != nil {
return FailureCountResult{}, fmt.Errorf("failed to record attempt: %w", err)
}

count, err := scardCmd.Result()
if err != nil {
return 0, fmt.Errorf("failed to get consecutive failure count: %w", err)
return FailureCountResult{}, fmt.Errorf("failed to get consecutive failure count: %w", err)
}

evaluated, err := evaluatedCmd.Result()
if err != nil {
return FailureCountResult{}, fmt.Errorf("failed to check evaluated state: %w", err)
}

return int(count), nil
return FailureCountResult{
Count: int(count),
NewlyCounted: added == 1,
AlreadyEvaluated: evaluated,
}, nil
}

// MarkAttemptEvaluated adds attemptID to the destination's set of evaluated
// attempts and refreshes that set's expiry to alertKeyTTL. Both commands are
// queued on a single transactional pipeline. tenantID is accepted to satisfy
// the AlertStore interface but is not used when building the key.
func (s *redisAlertStore) MarkAttemptEvaluated(ctx context.Context, tenantID, destinationID, attemptID string) error {
	evaluatedKey := s.getEvaluatedKey(destinationID)

	tx := s.client.TxPipeline()
	tx.SAdd(ctx, evaluatedKey, attemptID)
	tx.Expire(ctx, evaluatedKey, alertKeyTTL)

	_, err := tx.Exec(ctx)
	if err != nil {
		return fmt.Errorf("failed to mark attempt evaluated: %w", err)
	}
	return nil
}

func (s *redisAlertStore) ResetConsecutiveFailureCount(ctx context.Context, tenantID, destinationID string) error {
return s.client.Del(ctx, s.getFailuresKey(destinationID)).Err()
return s.client.Del(ctx, s.getFailuresKey(destinationID), s.getEvaluatedKey(destinationID)).Err()
}

func (s *redisAlertStore) deploymentPrefix() string {
Expand All @@ -70,3 +112,7 @@ func (s *redisAlertStore) deploymentPrefix() string {
// getFailuresKey returns the Redis key of the set that holds the
// consecutive-failure attempt IDs for destinationID, laid out as
// <deploymentPrefix><keyPrefixAlert>:<destinationID>:<keyFailures>.
func (s *redisAlertStore) getFailuresKey(destinationID string) string {
	return s.deploymentPrefix() + keyPrefixAlert + ":" + destinationID + ":" + keyFailures
}

// getEvaluatedKey returns the Redis key of the set that holds the IDs of
// fully evaluated attempts for destinationID, laid out as
// <deploymentPrefix><keyPrefixAlert>:<destinationID>:<keyEvaluated>.
func (s *redisAlertStore) getEvaluatedKey(destinationID string) string {
	return s.deploymentPrefix() + keyPrefixAlert + ":" + destinationID + ":" + keyEvaluated
}
Loading
Loading