Этот проект представляет собой реализацию паттерна "Transactional Outbox" на Go. Он обеспечивает надежную асинхронную доставку сообщений из микросервисов в брокер сообщений (по умолчанию Kafka), даже в случае сбоев.
Для установки пакета выполните команду:
go get github.com/overtonx/outbox/v2- Сохранение события: Вместо прямой отправки сообщения в брокер, сервис сохраняет его как событие (
Event) в специальную таблицуoutbox_eventsв своей базе данных. Это происходит в рамках той же транзакции, что и основная бизнес-логика. Это гарантирует, что событие будет сохранено только в том случае, если бизнес-транзакция успешно завершена. - Фоновая обработка: Отдельный процесс, Диспетчер (
Dispatcher), периодически опрашивает таблицуoutbox_eventsна наличие новых, необработанных событий. - Публикация: Обнаружив новые события,
Dispatcherс помощью Публикатора (Publisher) отправляет их в брокер сообщений. - Обновление статуса: После успешной отправки
Dispatcherпомечает событие в таблице как обработанное. В случае сбоя отправки, он увеличивает счетчик попыток и планирует повторную отправку с использованием настраиваемой стратегии отсрочки (backoff). - Dead-Letter Queue: Если событие не удается доставить после максимального количества попыток, оно перемещается в таблицу "мертвых писем" (
outbox_deadletters) для последующего анализа.
outbox: Основной пакет для создания и сохранения событий (SaveEvent).Dispatcher: Ядро системы. Управляет воркерами, которые опрашивают базу данных, обрабатывают и публикуют события, а также выполняют очистку.Publisher: Интерфейс для отправки сообщений. По умолчанию предоставляетсяKafkaPublisher. Вы можете реализовать свой собственныйPublisherдля интеграции с другими брокерами (например, RabbitMQ).- Воркеры (
Worker): Фоновые процессы, управляемыеDispatcher, для выполнения конкретных задач:EventProcessor: Обрабатывает и публикует новые события.DeadLetterService: Перемещает неисправимые события в DLQ.StuckEventService: Восстанавливает "зависшие" события, которые находились в обработке слишком долго.CleanupService: Удаляет старые обработанные события и записи из DLQ.
Диспетчер создается с помощью NewDispatcher и настраивается через функциональные опции (DispatcherOption).
// Пример создания с опциями
dispatcher, err := outbox.NewDispatcher(
db, // *sql.DB
outbox.WithPollInterval(5 * time.Second),
outbox.WithMaxAttempts(5),
outbox.WithPublisher(myCustomPublisher),
)
if err != nil {
// ...
}
// Запуск диспетчера
go dispatcher.Start(context.Background())WithPollInterval(time.Duration): Интервал опроса таблицыoutbox_eventsна наличие новых событий. (По умолчанию: 2 секунды)WithBatchSize(int): Максимальное количество событий, запрашиваемых из БД за один раз. (По умолчанию: 100)WithMaxAttempts(int): Максимальное количество попыток отправки события. (По умолчанию: 3)WithBackoffStrategy(BackoffStrategy): Стратегия вычисления задержки между повторными попытками.WithPublisher(Publisher): Позволяет указать собственную реализациюPublisher.WithLogger(*zap.Logger): Настройка логирования.WithStuckEventTimeout(time.Duration): Время, по истечении которого событие в статусе "в обработке" считается "зависшим". (По умолчанию: 10 минут)WithCleanupInterval(time.Duration): Интервал запуска воркера очистки. (По умолчанию: 1 час)WithSentEventsRetention(time.Duration): Как долго хранить успешно отправленные события. (По умолчанию: 24 часа)
По умолчанию используется KafkaPublisher. Его можно тонко настроить с помощью NewKafkaPublisherWithConfig.
kafkaConfig := outbox.DefaultKafkaConfig()
kafkaConfig.Topic = "my-default-topic"
kafkaConfig.ProducerProps["bootstrap.servers"] = "kafka1:9092,kafka2:9092"
publisher, err := outbox.NewKafkaPublisherWithConfig(logger, kafkaConfig)
if err != nil {
// ...
}
// Передача настроенного публикатора в диспетчер
dispatcher, err := outbox.NewDispatcher(db, outbox.WithPublisher(publisher))Topic (string): Имя топика по умолчанию, которое будет использоваться, если топик не указан в самом событии.ProducerProps (kafka.ConfigMap): Карта для настройки нативного Kafka-продюсера изconfluent-kafka-go. Позволяет задавать любые параметры, такие какbootstrap.servers,acks,compression.typeи т.д.HeaderBuilder (KafkaHeaderBuilder): Функция для создания заголовков Kafka-сообщения.
Headers Builder (KafkaHeaderBuilder) — это функция, которая преобразует метаданные события (event_id, event_type, aggregate_id, trace_id и т.д.) в нативные заголовки Kafka-сообщения.
Пример пользовательского конструктора заголовков:
func myCustomHeaderBuilder(record outbox.EventRecord) []kafka.Header {
// Начинаем с заголовков по умолчанию
headers := outbox.BuildKafkaHeaders(record)
// Добавляем пользовательский заголовок
headers = append(headers, kafka.Header{
Key: "X-Custom-Header",
Value: []byte("my-value"),
})
return headers
}
// Затем назначаем его в конфигурации
kafkaConfig := outbox.KafkaConfig{
// ...
HeaderBuilder: myCustomHeaderBuilder,
}Логика выбора топика для публикации сообщения следующая:
-
Приоритет у события: Если при создании события (
NewOutboxEvent) вы указали конкретный топик, сообщение будет отправлено именно в него.// Сообщение будет отправлено в топик "user-events" event, _ := NewOutboxEvent(..., "user-events", payload) SaveEvent(ctx, tx, event)
-
Топик по умолчанию: Если при создании события поле
Topicосталось пустым, будет использован топик по умолчанию, заданный вKafkaConfig.Topicпри конфигурацииKafkaPublisher.// Topic не указан, будет использован топик из KafkaConfig event, _ := NewOutboxEvent(..., "", payload) SaveEvent(ctx, tx, event)
Такой подход обеспечивает гибкость: вы можете как направлять все события в один общий топик, так и маршрутизировать их по разным топикам в зависимости от бизнес-логики.
В версии v2 изменилась схема таблицы outbox_events и outbox_deadletters. Поля trace_id и span_id были заменены одним универсальным полем headers типа JSON для большей гибкости при хранении метаданных.
Для ручного обновления схемы базы данных (на примере MySQL) выполните следующие запросы:
-
Добавьте колонки
headers:ALTER TABLE outbox_events ADD COLUMN headers JSON DEFAULT NULL AFTER payload; ALTER TABLE outbox_deadletters ADD COLUMN headers JSON DEFAULT NULL AFTER payload;
-
Удалите старые колонки:
ALTER TABLE outbox_events DROP COLUMN trace_id, DROP COLUMN span_id; ALTER TABLE outbox_deadletters DROP COLUMN trace_id, DROP COLUMN span_id;