goqueue

package module
v0.0.0-...-2844660 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 15, 2026 License: GPL-3.0 Imports: 17 Imported by: 0

README

goqueue

License Go Report Card

A simple, efficient, and reliable queue implementation for Go with PostgreSQL backend support.

Table of Contents

Features

  • Concurrent Processing: Efficient worker-based job processing
  • FIFO Queues: First-In-First-Out job queues for strict ordering requirements
  • Retry Mechanism: Customizable retry logic for failed jobs
  • PostgreSQL Backend: Reliable, persistent job storage
  • Structured Logging: Built-in support for Go's slog package

Getting Started

Prerequisites
  • Go 1.21 or higher
  • PostgreSQL 13 or higher
  • sqlc for generating type-safe SQL bindings
  • Make for build automation
Installation
git clone https://github.com/flohansen/goqueue
cd goqueue

Development

Available Commands
Command Description
make generate Generate Go code from SQL files (required after any .sql file changes)
make test Run all unit tests
make test-it Run all integration tests
Testing
Unit Tests

Unit tests follow Go conventions and are located alongside implementation files:

.
├── queue.go
├── queue_test.go
└── ...

Run unit tests:

make test

Or manually:

go test ./... -cover -v
Integration Tests

Integration tests are located in tests/integration/ and require the integration build tag:

//go:build integration

Run integration tests:

make test-it

Or manually:

go test -tags=integration ./test/integration/... -v
Working with SQL
Migrations

SQL migration files are stored in sql/migrations/ and follow this naming convention:

<version>_<name>.[up|down].sql

Example: 000001_create_jobs_table.up.sql

Queries

SQL query files are stored in sql/queries/. Go bindings are automatically generated from these files using sqlc.

Important: After modifying any .sql file, regenerate the Go bindings:

make generate

Using Nix (Optional)

This repository includes a flake.nix for simplified dependency management. With Nix installed, you can:

Spawn a development shell with all dependencies:

nix develop

Or run commands directly:

nix develop --command make generate

Contributing

Contributions are welcome! Here's how you can help:

  1. Fork the repository
  2. Create a feature branch (git checkout -b feature/amazing-feature)
  3. Make your changes and add tests
  4. Run tests to ensure everything works (make test && make test-it)
  5. Commit your changes (git commit -m 'feat: add amazing feature')
  6. Push to your branch (git push origin feature/amazing-feature)
  7. Open a Pull Request

Please ensure your code follows Go best practices and includes appropriate tests.

License

This project is licensed under the GPL-3.0 License. See the LICENSE file for details.


Questions or Issues? Feel free to open an issue on GitHub.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func MigrateUp

func MigrateUp(pool *pgxpool.Pool) error

MigrateUp applies all up migrations to the database connected via the given pgxpool.Pool.

Types

type DB

type DB interface {
	database.DBTX

	// BeginTx creates a new (pgx) transaction
	BeginTx(ctx context.Context, options pgx.TxOptions) (pgx.Tx, error)
}

DB is the abstraction for any (pgx) database connection with allowing to create transactions.

type EnqueueOption

type EnqueueOption func(*enqueueConfig)

EnqueueOption defines a functional option for configuring job enqueuing.

func WithMaxRetries

func WithMaxRetries(n int32) EnqueueOption

WithMaxRetries sets the maximum number of retries for the enqueued job.

func WithRetryPolicy

func WithRetryPolicy(policy RetryPolicy) EnqueueOption

WithRetryPolicy sets the retry policy for the enqueued job. Possible policies are RetryPolicyConstant, RetryPolicyLinear, and RetryPolicyExponential.

type Job

type Job[T any] struct {
	// ID is the database identifier for the job.
	ID int32

	// Args contains the job payload of type T.
	Args T
}

Job represents a queued job with its arguments and database ID.

type JobQueue

type JobQueue[T any] struct {
	// contains filtered or unexported fields
}

JobQueue manages the lifecycle of jobs in a named queue. It polls the database for scheduled jobs, executes them via a Worker, and handles retries with exponential backoff or other configured policies. Jobs are processed concurrently.

T is the type of the job arguments payload and must be JSON serializable.

JobQueues should be created using the New function. Example:

queue := jobqueue.New(db, &MyWorker{},
    jobqueue.WithQueueName("my-queue"),
    jobqueue.WithPollInterval(500*time.Millisecond),
)

func New

func New[T any](db DB, worker Worker[T], opts ...JobQueueOption) *JobQueue[T]

New creates a new JobQueue with the given database connection and worker. Configure behavior using functional options like WithPollInterval, WithQueueName, etc.

func (*JobQueue[T]) Enqueue

func (jq *JobQueue[T]) Enqueue(ctx context.Context, args T, opts ...EnqueueOption) (*Job[T], error)

Enqueue adds a new job with the given arguments to the queue. Optional parameters can be set using EnqueueOption functions like WithMaxRetries and WithRetryPolicy. It returns the created Job or an error. Example:

job, err := queue.Enqueue(ctx, MyJobArgs{...},
    jobqueue.WithMaxRetries(5),
    jobqueue.WithRetryPolicy(jobqueue.RetryPolicyExponential),
)

func (*JobQueue[T]) Receive

func (jq *JobQueue[T]) Receive(ctx context.Context)

Receive starts polling the queue for scheduled jobs and processes them using the configured Worker. It runs until the provided context is cancelled. Errors during job processing are logged but do not stop the polling loop. Failed jobs are retried or marked as failed after the retry policy has been exhausted.

Example:

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

go queue.Receive(ctx)

// Run for 10 minutes
time.Sleep(10 * time.Minute)
cancel()

type JobQueueOption

type JobQueueOption func(*jobQueueConfig)

JobQueueOption defines a functional option for configuring a JobQueue.

func WithBaseRetryDelay

func WithBaseRetryDelay(d time.Duration) JobQueueOption

WithBaseRetryDelay sets the base delay used for calculating retry backoff.

func WithFIFO

func WithFIFO(isFIFO bool) JobQueueOption

WithFIFO configures the queue to process jobs in a first-in-first-out manner.

func WithLogger

func WithLogger(logger *slog.Logger) JobQueueOption

WithLogger sets a custom logger for the JobQueue.

func WithMaxRetryDelay

func WithMaxRetryDelay(d time.Duration) JobQueueOption

WithMaxRetryDelay sets the maximum delay allowed between retry attempts.

func WithPollInterval

func WithPollInterval(interval time.Duration) JobQueueOption

WithPollInterval sets the polling interval for checking the queue for new jobs.

func WithQueueName

func WithQueueName(name string) JobQueueOption

WithQueueName sets the name of the job queue.

type Querier

type Querier interface {
	database.Querier
	WithTx(tx pgx.Tx) *database.Queries
}

type RetryPolicy

type RetryPolicy int

RetryPolicy defines the strategy for retrying failed jobs.

const (
	// RetryPolicyConstant retries jobs with a constant delay. Formlar:
	//
	//	next_delay = base_delay
	//
	RetryPolicyConstant RetryPolicy = iota

	// RetryPolicyLinear retries jobs with a linearly increasing delay. Formula:
	//
	//	next_delay = base_delay * (attempt_number + 1)
	//
	RetryPolicyLinear

	// RetryPolicyExponential retries jobs with an exponentially increasing delay. Formula:
	//
	//	next_delay = base_delay ^ (attempt_number + 1)
	//
	RetryPolicyExponential
)

type Worker

type Worker[T any] interface {
	// Work processes a single job. Returning an error triggers retry logic
	// based on the job's retry policy and max retry count. If max retries is
	// exceeded, the job is marked as failed.
	Work(ctx context.Context, job *Job[T]) error
}

Worker processes jobs ob type T. Implementations should be idempotent as jobs may be retried multiple times on failure.

Directories

Path Synopsis
internal

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL