outbox

package module
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 3, 2025 License: MIT Imports: 7 Imported by: 0

README

Go Outbox Pattern

A robust, production-ready implementation of the transactional outbox pattern in Go.

Overview

The outbox pattern ensures reliable message delivery by persisting events in the same database transaction as your business data, then reliably delivering them to a message broker.

This implementation provides:

  • Reliable delivery with at-least-once semantics
  • Concurrent processing with configurable concurrency limits
  • Entity-based grouping for ordered processing per entity
  • Automatic retries on errors
  • Comprehensive test coverage (unit + integration)

Installation

go get github.com/aliaksandr-biarozka/go-outbox

Quick Start

package main

import (
    "context"
    "log/slog"
    
    "github.com/aliaksandr-biarozka/go-outbox"
)

// Define your event type
type MyEvent struct {
    ID        string
    EntityID  string
    Sequence  int64
    Payload   []byte
}

func (e *MyEvent) GetId() string        { return e.ID }
func (e *MyEvent) GetEntityId() string  { return e.EntityID }
func (e *MyEvent) GetSequence() int64   { return e.Sequence }

func main() {
    logger := slog.Default()
    
    // Implement Source and Destination interfaces for your event type
    source := &MyDatabaseSource{}
    destination := &MyMessageQueueDestination{}
    
    // Create outbox with your event type
    ob := outbox.New[*MyEvent](source, destination, outbox.Config{
        BatchSize:           30,  // Items to fetch per batch
        SleepSec:            5,   // Sleep when no items
        MaxConcurrentGroups: 30,  // Concurrent entity groups
    }, logger)
    
    // Run the outbox (blocks until context is cancelled)
    ctx := context.Background()
    if err := ob.Run(ctx); err != nil {
        logger.Error("outbox failed", "error", err)
    }
}

Core Interfaces

Item
type Item interface {
    GetEntityId() string  // Context-specific ID (userId, accountId, etc.)
    GetId() string        // Unique item ID (messageId, orderId, etc.)
    GetSequence() int64   // Monotonically increasing value for ordering
}

The GetSequence() method determines the processing order within each entity group. Common implementations:

  • return time.Now().UnixMilli() - Use timestamp in milliseconds
  • return event.CreatedAt.UnixMilli() - Use database timestamp
  • return event.Version - Use custom version/sequence number
  • return event.ID - Use auto-increment database ID
Source
type Source[T Item] interface {
    GetItems(ctx context.Context, batchSize int) ([]T, error)
    Acknowledge(ctx context.Context, item T) error
}

Source is generic and works with any type T that implements the Item interface.

Destination
type Destination[T Item] interface {
    Send(ctx context.Context, item T) error
}

Destination is generic and works with any type T that implements the Item interface.

Outbox
type Outbox[T Item] struct { ... }

func New[T Item](
    source Source[T],
    destination Destination[T],
    config Config,
    logger *slog.Logger,
) *Outbox[T]

The Outbox is generic and type-safe. When creating an outbox, specify your event type:

// For pointer types
outbox.New[*MyEvent](source, destination, config, logger)

// For value types
outbox.New[MyEvent](source, destination, config, logger)

All interfaces (Source, Destination) must use the same type T. This ensures compile-time type safety and eliminates runtime type assertions.

How It Works

  1. Fetch: Retrieves a batch of unsent items from the source (database)
  2. Group: Groups items by EntityId to ensure ordered processing per entity
  3. Send: Sends each item to the destination (message queue)
  4. Acknowledge: Marks the item as sent in the source after successful delivery
  5. Repeat: Continues until context is cancelled
Entity-Based Processing

Items are grouped by EntityId and processed concurrently:

  • Items for the same entity are processed sequentially in order of GetSequence() (maintains order)
  • Items for different entities are processed in parallel (up to MaxConcurrentGroups)

Ordering Guarantee: Within each entity group, items are sorted by GetSequence() before processing. This ensures that events are delivered in the exact order they were created, even if they're fetched in batches.

Configuration

type Config struct {
    BatchSize           int      // Number of items to fetch per batch (default: 30)
    SleepSec            int      // Sleep duration when no items (default: 5)
    MaxConcurrentGroups int      // Max concurrent entity groups (default: 30)
    Metrics             Metrics  // Optional metrics collector (default: nil)
}

Observability

Metrics

The outbox supports optional metrics collection through the Metrics interface:

type Metrics interface {
    IncProcessedItems()
    RecordBatchDuration(duration time.Duration, success bool)
}

Implement this interface with your preferred metrics backend (Prometheus, OpenTelemetry, StatsD, etc.):

// Example: Prometheus implementation
type prometheusMetrics struct {
    itemsProcessed   prometheus.Counter
    batchDuration    prometheus.Histogram
    batchesSucceeded prometheus.Counter
    batchesFailed    prometheus.Counter
}

func (m *prometheusMetrics) IncProcessedItems() {
    m.itemsProcessed.Inc()
}

func (m *prometheusMetrics) RecordBatchDuration(duration time.Duration, success bool) {
    m.batchDuration.Observe(duration.Seconds())
    if success {
        m.batchesSucceeded.Inc()
    } else {
        m.batchesFailed.Inc()
    }
}

// Use it
metrics := &prometheusMetrics{...}
outbox.New(source, dest, outbox.Config{
    BatchSize: 30,
    Metrics:   metrics,
}, logger)

The IncProcessedItems() is called once per successfully processed item (after both send and acknowledge). The RecordBatchDuration() is called once per batch with the processing time and success status.

Examples

See examples/simple/main.go for a complete example.

For production examples with PostgreSQL and RabbitMQ, see:

Testing

# Run unit tests (fast, no Docker needed)
make test-unit

# Run integration tests (requires Docker)
make test-integration

# Run all tests
make test

# Generate coverage report
make test-coverage

See TESTING.md for detailed testing documentation.

Architecture

The Outbox is interface-based and doesn't dictate how you implement Source or Destination. Use any database, message broker, or acknowledgment strategy.

The integration tests demonstrate one common approach (delete after send), but you can implement it however you need.

See ARCHITECTURE.md for implementation examples and design considerations.

Database Schema Example (from integration tests)
CREATE TABLE outbox_events (
    id VARCHAR(255) PRIMARY KEY,
    entity_id VARCHAR(255) NOT NULL,
    message TEXT NOT NULL,
    created_at TIMESTAMP NOT NULL DEFAULT NOW()
);

CREATE INDEX idx_outbox_events_created_at ON outbox_events(created_at);

This is just one possible schema. Your implementation can use any structure that fits your needs.

Production Considerations

Error Handling
  • Failed sends stop processing for that entity group
  • Other entity groups continue processing independently
  • Failed items will be retried on the next fetch cycle
Concurrency
  • Limit MaxConcurrentGroups based on your database connection pool
  • Semaphore ensures controlled concurrency
  • Each entity group is processed sequentially
Monitoring
  • All operations are logged with structured logging (slog)
  • Each processing run gets a unique run_id for tracing
  • Failed operations are logged with full error details
Performance
  • Batch processing reduces database round-trips
  • Concurrent processing maximizes throughput
  • Small table size (delete approach) keeps queries fast

Dependencies

Contributing

Contributions are welcome! Please:

  1. Open an issue first to discuss changes
  2. Follow Go best practices and coding standards
  3. Add tests for new functionality
  4. Update documentation as needed

References

Documentation

Overview

Package outbox provides a robust implementation of the transactional outbox pattern for reliable message delivery between a database and a message broker.

The outbox pattern ensures at-least-once delivery semantics by:

  • Fetching unsent items from a persistent source (database)
  • Grouping items by entity ID for ordered processing
  • Sending items to a destination (message queue)
  • Marking items as sent after successful delivery

Example usage:

source := &MyDatabaseSource{}
destination := &MyMessageQueueDestination{}
config := outbox.Config{
    BatchSize:           30,
    SleepSec:            5,
    MaxConcurrentGroups: 30,
}
ob := outbox.New(source, destination, config, logger)
if err := ob.Run(ctx); err != nil {
    log.Fatal(err)
}

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	// BatchSize is the number of items to fetch from the source in each iteration.
	// Default: 30
	BatchSize int

	// SleepSec is the number of seconds to sleep when no items are available.
	// This prevents busy-waiting and reduces database load.
	// Default: 5
	SleepSec int

	// MaxConcurrentGroups is the maximum number of entity groups that can be processed concurrently.
	// Higher values increase throughput but also increase resource usage.
	// Should not exceed your database connection pool size.
	// Default: 30
	MaxConcurrentGroups int

	// Metrics is an optional metrics collector for observability.
	// If nil, no metrics are recorded.
	Metrics Metrics
}

Config holds configuration parameters for the outbox processor. Zero values will be replaced with sensible defaults.

type Destination

type Destination[T Item] interface {
	// Send sends a single item to the destination.
	// If this fails, the item will remain in the source and be retried on the next outbox iteration.
	Send(ctx context.Context, item T) error
}

Destination represents a target system (typically a message queue or API) where items are sent after being retrieved from the source.

Implementation notes:

  • Send should be idempotent when possible (same item sent twice should not cause issues)
  • Should handle context cancellation and return descriptive errors

type Item

type Item interface {
	// GetEntityId returns a context-specific identifier used for grouping items.
	// Examples: userId, accountId, tenantId, orderId
	// Items with the same entity ID are processed sequentially to maintain order.
	GetEntityId() string

	// GetId returns a unique identifier for this specific item.
	// Examples: messageId, eventId, transactionId
	// This is used for logging and tracking individual item processing.
	GetId() string

	// GetSequence returns a monotonically increasing value used for ordering items.
	// Items are processed in ascending sequence order within each entity group.
	// Common implementations:
	//   - Unix timestamp in milliseconds: time.Now().UnixMilli()
	//   - Database auto-increment ID
	//   - Custom version/sequence number
	// Items with the same sequence are processed in arbitrary order.
	GetSequence() int64
}

Item represents a single unit of work in the outbox pattern. Each item belongs to an entity (for grouping) and has a unique identifier.

Items with the same EntityId are processed sequentially to maintain ordering, while items with different EntityIds can be processed concurrently.

type Metrics

type Metrics interface {
	// IncProcessedItems increments the counter of successfully processed items by 1.
	// Called once per item after successful send and acknowledge.
	IncProcessedItems()

	// RecordBatchDuration records the time taken to process a batch of items.
	// The success parameter indicates whether the batch was fully processed without errors.
	RecordBatchDuration(duration time.Duration, success bool)
}

Metrics provides observability into outbox processing operations. Implement this interface to collect metrics using your preferred backend (Prometheus, OpenTelemetry, StatsD, etc.).

type Outbox

type Outbox[T Item] struct {
	// contains filtered or unexported fields
}

Outbox is the main processor that coordinates fetching items from a source, sending them to a destination, and tracking success/failure.

Items are automatically grouped by entity ID and processed concurrently up to MaxConcurrentGroups. Items within the same entity group are processed sequentially to maintain ordering guarantees.

func New

func New[T Item](source Source[T], destination Destination[T], config Config, logger *slog.Logger) *Outbox[T]

New creates a new Outbox processor with the given source, destination, configuration, and logger.

The source provides items to process, the destination receives the items, and the logger is used for structured logging of all operations.

Config values of 0 or negative will be replaced with defaults:

  • BatchSize: 30
  • SleepSec: 5
  • MaxConcurrentGroups: 30

Example:

ob := outbox.New(dbSource, mqDestination, outbox.Config{
    BatchSize:           50,
    SleepSec:            10,
    MaxConcurrentGroups: 20,
}, slog.Default())

func (*Outbox[T]) Run

func (o *Outbox[T]) Run(ctx context.Context) error

Run starts the outbox processor and runs until the context is cancelled.

The processor continuously:

  1. Fetches a batch of items from the source
  2. Groups items by entity ID
  3. Sorts items within each group by sequence (ascending order)
  4. Processes each group concurrently (up to MaxConcurrentGroups)
  5. Sends each item to the destination (one attempt per iteration)
  6. Acknowledges successfully sent items in the source
  7. Sleeps for SleepSec when no items are available or on errors

Ordering guarantees:

  • Items with the same entity ID are processed sequentially in sequence order
  • Items with different entity IDs can be processed concurrently
  • Sequence is determined by Item.GetSequence() (typically timestamp or version number)

Error handling:

  • GetItems errors trigger a sleep and retry (prevents hammering the database)
  • Send errors stop processing for that entity group; failed items remain in source
  • Acknowledge errors stop processing for that entity group to prevent duplicate sends
  • Failed items are automatically retried after SleepSec delay
  • Context cancellation returns ctx.Err()

This method blocks until ctx is cancelled. It's designed to run as a long-lived service.

Example:

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Handle graceful shutdown
go func() {
    <-sigterm
    cancel()
}()

if err := outbox.Run(ctx); err != context.Canceled {
    log.Printf("outbox stopped with error: %v", err)
}

type Source

type Source[T Item] interface {
	// GetItems retrieves a batch of unprocessed items from the source.
	// It should return an empty slice when no items are available.
	GetItems(ctx context.Context, batchSize int) ([]T, error)

	// Acknowledge confirms that an item was successfully sent to the destination.
	// Typical implementations:
	//   - DELETE FROM outbox WHERE id = ? (recommended for performance)
	//   - UPDATE outbox SET sent_at = NOW() WHERE id = ? (for audit trails)
	Acknowledge(ctx context.Context, item T) error
}

Source represents a persistent storage (typically a database) that provides items to be processed by the outbox. Items are fetched in batches and removed after successful delivery to the destination.

Implementation notes:

  • GetItems should return items ordered by creation time for predictable processing
  • Acknowledge should either DELETE the item (recommended) or mark it as sent with UPDATE
  • Both methods should handle context cancellation gracefully

Directories

Path Synopsis
examples
simple command

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL