batchwriter

Version: v0.2.1
This package is not in the latest version of its module.
Published: Aug 19, 2025 License: MIT Imports: 13 Imported by: 0

README

BatchWriter

A thread-safe, batch-processing writer for MongoDB with configurable batching, timeouts, and failure handling.

Features

  • Thread-safe: Safe for concurrent use across multiple goroutines
  • Configurable batching: Set maximum batch size and flush delays
  • Failure handling: Configurable sink for handling failed batches
  • Graceful shutdown: Drains queued items on shutdown and dumps any that cannot be written in time
  • MongoDB integration: Built specifically for MongoDB collections
  • Non-blocking first push (v0.1.1): Push follows the same semantics as PushMany. If the queue has room, the item is enqueued even after shutdown has begun; cancellation is honored only when the call would otherwise block.

Testing Requirements

To run the full test suite, you need a MongoDB instance. The connection can be configured via environment variables.

Quick Setup (Interactive)

For first-time setup, you can use the interactive configuration script:

./setup-mongo.sh

This will guide you through setting up MongoDB configuration and save it to a .env file.

Environment Variables

The tests automatically load configuration from a .env file if present, or from environment variables:

  • MONGO_HOST - MongoDB host (default: localhost)
  • MONGO_PORT - MongoDB port (default: 27017)
  • MONGO_USERNAME - MongoDB username (default: testuser)
  • MONGO_PASSWORD - MongoDB password (default: testpass)
  • MONGO_AUTH_DATABASE - Authentication database (default: admin)
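
If you want the same variables in your own code, the following is a minimal sketch of turning them into a connection string with the documented defaults. `mongoURIFromEnv` and `getenv` are hypothetical helpers, not part of batchwriter or its test suite, and applying MONGO_AUTH_DATABASE through the `authSource` query parameter is an assumption about how the tests use it.

```go
package main

import (
	"fmt"
	"net"
	"net/url"
	"os"
)

// getenv returns the variable's value, or def when it is unset or empty.
func getenv(lookup func(string) (string, bool), key, def string) string {
	if v, ok := lookup(key); ok && v != "" {
		return v
	}
	return def
}

// mongoURIFromEnv (hypothetical helper) builds a connection string from the
// MONGO_* variables, falling back to the documented defaults.
// url.UserPassword percent-escapes characters such as '@' in the password.
func mongoURIFromEnv(lookup func(string) (string, bool)) string {
	u := url.URL{
		Scheme: "mongodb",
		User: url.UserPassword(
			getenv(lookup, "MONGO_USERNAME", "testuser"),
			getenv(lookup, "MONGO_PASSWORD", "testpass"),
		),
		Host: net.JoinHostPort(
			getenv(lookup, "MONGO_HOST", "localhost"),
			getenv(lookup, "MONGO_PORT", "27017"),
		),
		Path:     "/",
		RawQuery: url.Values{"authSource": {getenv(lookup, "MONGO_AUTH_DATABASE", "admin")}}.Encode(),
	}
	return u.String()
}

func main() {
	// With no MONGO_* variables set, this prints
	// mongodb://testuser:testpass@localhost:27017/?authSource=admin
	fmt.Println(mongoURIFromEnv(os.LookupEnv))
}
```

Passing the lookup function in (rather than calling os.Getenv directly) keeps the helper testable without touching the real environment.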

Setup Examples

Using .env file (recommended):

# Create .env file
cat > .env << EOF
MONGO_HOST=localhost
MONGO_PORT=27017
MONGO_USERNAME=testuser
MONGO_PASSWORD=testpass
EOF

# Tests automatically load .env file
go test ./...

Using Docker (recommended for testing):

# Start MongoDB with default credentials
docker run -d \
  --name mongo-test \
  -p 27017:27017 \
  -e MONGO_INITDB_ROOT_USERNAME=testuser \
  -e MONGO_INITDB_ROOT_PASSWORD=testpass \
  mongo:7

# Run tests (will use .env or defaults)
go test ./...

Using custom MongoDB instance:

# Run tests with custom MongoDB configuration
MONGO_HOST=my-mongo-server \
MONGO_PORT=27018 \
MONGO_USERNAME=testuser \
MONGO_PASSWORD=testpass \
go test ./...

Using MongoDB Atlas or remote instance:

# Remote instances may also need settings such as TLS or an SRV (mongodb+srv://) connection string, which these variables do not cover
MONGO_HOST=cluster0.mongodb.net \
MONGO_PORT=27017 \
MONGO_USERNAME=myuser \
MONGO_PASSWORD=mypass \
MONGO_AUTH_DATABASE=admin \
go test ./...

If MongoDB is not available, the tests will be skipped automatically with a message indicating the connection details that were attempted.

Usage

package main

import (
    "context"
    "log"
    "time"

    "github.com/blackorder/batchwriter"
    "go.mongodb.org/mongo-driver/v2/mongo"
    "go.mongodb.org/mongo-driver/v2/mongo/options"
)

type Document struct {
    ID   int    `bson:"_id"`
    Name string `bson:"name"`
}

func main() {
    // Connect to MongoDB
    client, err := mongo.Connect(options.Client().ApplyURI("mongodb://localhost:27017"))
    if err != nil {
        log.Fatal(err)
    }
    defer client.Disconnect(context.Background())

    collection := client.Database("mydb").Collection("documents")

    // Shutdown context: cancelling it tells the workers to stop
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Configure the writer
    cfg := batchwriter.Config{
        MaxBatch:  100,             // Flush when 100 items have accumulated
        MaxDelay:  5 * time.Second, // or when 5 seconds have passed, whichever is first
        QueueSize: 1000,            // Queue up to 1000 items
        Workers:   4,               // Use 4 worker goroutines
        Sink:      nil,             // Optional: handle failed batches
    }

    // Create the writer; its background workers start immediately
    writer, err := batchwriter.NewWriter[Document](ctx, collection, cfg)
    if err != nil {
        log.Fatal(err)
    }

    // Push a single document
    doc := Document{ID: 1, Name: "example"}
    if err := writer.Push(context.Background(), doc); err != nil {
        // The item was dumped to the sink, if one is configured
        log.Printf("push: %v", err)
    }

    // Push multiple documents
    docs := []Document{
        {ID: 2, Name: "batch1"},
        {ID: 3, Name: "batch2"},
    }
    if err := writer.PushMany(context.Background(), docs); err != nil {
        // The remaining items were dumped to the sink, if one is configured
        log.Printf("push many: %v", err)
    }

    // Graceful shutdown: signal the workers, then wait a bounded time for the drain
    cancel()
    waitCtx, waitCancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer waitCancel()
    if err := writer.Close(waitCtx); err != nil {
        // On timeout, remaining items are dumped and waitCtx.Err() is returned
        log.Printf("close: %v", err)
    }
}

Configuration

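  • MaxBatch: Maximum number of items per batch (default: 100)
  • MaxDelay: Maximum time to wait before flushing a partial batch (default: 5s)
  • QueueSize: Capacity of the internal queue (default: 1000)
  • Workers: Number of worker goroutines (default: 1)
  • Retries, RetryBackoffMin, RetryBackoffMax, RetryTimeout: Retry policy for failed inserts
  • Sink: Optional handler for failed batches
  • JSONDump: Built-in JSON dumping, used only when Sink is nil
  • Logger: Optional logger; if nil, nothing is logged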

Thread Safety

The library is fully thread-safe. You can safely call Push() and PushMany() from multiple goroutines simultaneously.

Error Handling

When operations fail (timeouts, shutdown, database errors), items are sent to the configured Sink for custom handling. If neither Sink nor JSONDump is configured, failed items are logged through the Logger (if one is set) and discarded.

License

MIT

Documentation

Constants

const (
	DuplicateKey           = 11000
	DuplicateKeyLegacy     = 11001
	DuplicateKeyUpdateConf = 12582
)

MongoDB error codes
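
A sketch of how these codes might be used, assuming you inspect raw server error codes yourself (the official Go driver also provides `mongo.IsDuplicateKeyError` for the same purpose). `isDuplicateKeyCode` and `allDuplicateKeys` are hypothetical helpers; one plausible use is deciding not to retry a batch whose failures are all duplicate-key violations, since resending it unchanged would hit the same unique index again.

```go
package main

import "fmt"

// Copies of the documented batchwriter constants, so the sketch compiles alone.
const (
	DuplicateKey           = 11000
	DuplicateKeyLegacy     = 11001
	DuplicateKeyUpdateConf = 12582
)

// isDuplicateKeyCode reports whether a server error code is one of the
// duplicate-key codes above.
func isDuplicateKeyCode(code int) bool {
	switch code {
	case DuplicateKey, DuplicateKeyLegacy, DuplicateKeyUpdateConf:
		return true
	}
	return false
}

// allDuplicateKeys (hypothetical) reports whether every write error code from
// a failed batch is a duplicate-key code. An empty list returns false.
func allDuplicateKeys(codes []int) bool {
	if len(codes) == 0 {
		return false
	}
	for _, c := range codes {
		if !isDuplicateKeyCode(c) {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(allDuplicateKeys([]int{11000, 11000})) // true
	fmt.Println(allDuplicateKeys([]int{11000, 50}))    // false: 50 is not a duplicate-key code
}
```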

const Version = "v0.1.1"

Version is the current library version. Update when making a public release.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	MaxBatch        int           // e.g. 500
	MaxDelay        time.Duration // e.g. 100 * time.Millisecond
	QueueSize       int           // e.g. 100_000
	Retries         int           // e.g. 5
	RetryBackoffMin time.Duration // e.g. 50 * time.Millisecond
	RetryBackoffMax time.Duration // e.g. 2 * time.Second
	RetryTimeout    time.Duration // e.g. 5 * time.Second
	Workers         int           // e.g. 2-4

	// Provide either a custom sink OR configure the built-in JSON sink.
	Sink     FailureSink[any] // if set, used as-is (preferred for custom formats)
	JSONDump *JSONDumpConfig  // if Sink is nil and JSONDump != nil, enable built-in JSON dumping

	Logger Logger // optional; if nil, no logging
}

type FailureSink

type FailureSink[T any] interface {
	// Dump must be non-blocking or fast; it's called in hot paths.
	// 'reason' examples: "insert_failed", "enqueue_after_shutdown", "shutdown".
	Dump(reason string, cause error, batch []T)
	Close() error
}

FailureSink receives items that could not be persisted (e.g., after retries) or that remained in the queue at shutdown. Implementations must be thread-safe.

type JSONDumpConfig

type JSONDumpConfig struct {
	Dir           string // required
	FilePrefix    string // e.g. "batchwriter"
	FsyncEvery    int    // 0=never; e.g. 200 flushes then Sync()
	RotateBytes   int64  // 0=disable rotation; else create new file when exceeded
	IncludeReason bool   // include _reason and _error fields in JSON
}

type Logger

type Logger interface {
	Debug(msg string)
	Info(msg string)
	Warn(msg string)
	Error(msg string)
	With(key string, value any) Logger
	Err(err error) Logger
}

type Writer

type Writer[T any] struct {
	// contains filtered or unexported fields
}

func NewWriter

func NewWriter[T any](shutdownCtx context.Context, coll *mongo.Collection, cfg Config) (*Writer[T], error)

Contract: cancel shutdownCtx to stop workers; after cancel, call Close(waitCtx) to wait for drain. If waitCtx times out, remaining items are dumped and Close returns waitCtx.Err(). NewWriter starts the background workers immediately.

func (*Writer[T]) Close

func (w *Writer[T]) Close(ctx context.Context) error

func (*Writer[T]) Push

func (w *Writer[T]) Push(ctx context.Context, evt T) error

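Push enqueues one event. If the queue has room, the event is enqueued immediately, even after shutdown has begun. Otherwise Push blocks until space frees up; if the writer's shutdown context or the call context is canceled while it is blocked, the item is dumped and the corresponding error is returned.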

func (*Writer[T]) PushMany

func (w *Writer[T]) PushMany(ctx context.Context, evts []T) error

PushMany enqueues multiple events with backpressure. It blocks when the queue is full. If either the writer's shutdown context or the call context is canceled, the *remaining* items are dumped and the function returns the relevant error.
