Documentation ¶
Index ¶
- func MigrateUp(pool *pgxpool.Pool) error
- type DB
- type EnqueueOption
- type Job
- type JobQueue
- type JobQueueOption
- func WithBaseRetryDelay(d time.Duration) JobQueueOption
- func WithFIFO(isFIFO bool) JobQueueOption
- func WithLogger(logger *slog.Logger) JobQueueOption
- func WithMaxRetryDelay(d time.Duration) JobQueueOption
- func WithPollInterval(interval time.Duration) JobQueueOption
- func WithQueueName(name string) JobQueueOption
- type Querier
- type RetryPolicy
- type Worker
Functions ¶
Types ¶
type DB ¶
type DB interface {
database.DBTX
// BeginTx creates a new (pgx) transaction
BeginTx(ctx context.Context, options pgx.TxOptions) (pgx.Tx, error)
}
DB abstracts any (pgx) database connection that can also create transactions.
type EnqueueOption ¶
type EnqueueOption func(*enqueueConfig)
EnqueueOption defines a functional option for configuring job enqueuing.
func WithMaxRetries ¶
func WithMaxRetries(n int32) EnqueueOption
WithMaxRetries sets the maximum number of retries for the enqueued job.
func WithRetryPolicy ¶
func WithRetryPolicy(policy RetryPolicy) EnqueueOption
WithRetryPolicy sets the retry policy for the enqueued job. Possible policies are RetryPolicyConstant, RetryPolicyLinear, and RetryPolicyExponential.
type Job ¶
type Job[T any] struct {
	// ID is the database identifier for the job.
	ID int32
	// Args contains the job payload of type T.
	Args T
}
Job represents a queued job with its arguments and database ID.
type JobQueue ¶
type JobQueue[T any] struct {
	// contains filtered or unexported fields
}
JobQueue manages the lifecycle of jobs in a named queue. It polls the database for scheduled jobs, executes them via a Worker, and handles retries with exponential backoff or other configured policies. Jobs are processed concurrently.
T is the type of the job arguments payload and must be JSON serializable.
JobQueues should be created using the New function. Example:
queue := jobqueue.New(db, &MyWorker{},
jobqueue.WithQueueName("my-queue"),
jobqueue.WithPollInterval(500*time.Millisecond),
)
func New ¶
func New[T any](db DB, worker Worker[T], opts ...JobQueueOption) *JobQueue[T]
New creates a new JobQueue with the given database connection and worker. Configure behavior using functional options like WithPollInterval, WithQueueName, etc.
func (*JobQueue[T]) Enqueue ¶
Enqueue adds a new job with the given arguments to the queue. Optional parameters can be set using EnqueueOption functions like WithMaxRetries and WithRetryPolicy. It returns the created Job or an error. Example:
job, err := queue.Enqueue(ctx, MyJobArgs{...},
jobqueue.WithMaxRetries(5),
jobqueue.WithRetryPolicy(jobqueue.RetryPolicyExponential),
)
func (*JobQueue[T]) Receive ¶
Receive starts polling the queue for scheduled jobs and processes them using the configured Worker. It runs until the provided context is cancelled. Errors during job processing are logged but do not stop the polling loop. Failed jobs are retried or marked as failed after the retry policy has been exhausted.
Example:
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go queue.Receive(ctx)
// Run for 10 minutes
time.Sleep(10 * time.Minute)
cancel()
type JobQueueOption ¶
type JobQueueOption func(*jobQueueConfig)
JobQueueOption defines a functional option for configuring a JobQueue.
func WithBaseRetryDelay ¶
func WithBaseRetryDelay(d time.Duration) JobQueueOption
WithBaseRetryDelay sets the base delay used for calculating retry backoff.
func WithFIFO ¶
func WithFIFO(isFIFO bool) JobQueueOption
WithFIFO configures the queue to process jobs in a first-in-first-out manner.
func WithLogger ¶
func WithLogger(logger *slog.Logger) JobQueueOption
WithLogger sets a custom logger for the JobQueue.
func WithMaxRetryDelay ¶
func WithMaxRetryDelay(d time.Duration) JobQueueOption
WithMaxRetryDelay sets the maximum delay allowed between retry attempts.
func WithPollInterval ¶
func WithPollInterval(interval time.Duration) JobQueueOption
WithPollInterval sets the polling interval for checking the queue for new jobs.
func WithQueueName ¶
func WithQueueName(name string) JobQueueOption
WithQueueName sets the name of the job queue.
type RetryPolicy ¶
type RetryPolicy int
RetryPolicy defines the strategy for retrying failed jobs.
const (
	// RetryPolicyConstant retries jobs with a constant delay. Formula:
	//
	//	next_delay = base_delay
	//
	RetryPolicyConstant RetryPolicy = iota
	// RetryPolicyLinear retries jobs with a linearly increasing delay. Formula:
	//
	//	next_delay = base_delay * (attempt_number + 1)
	//
	RetryPolicyLinear
	// RetryPolicyExponential retries jobs with an exponentially increasing delay. Formula:
	//
	//	next_delay = base_delay ^ (attempt_number + 1)
	//
	RetryPolicyExponential
)
type Worker ¶
type Worker[T any] interface {
	// Work processes a single job. Returning an error triggers retry logic
	// based on the job's retry policy and max retry count. If max retries is
	// exceeded, the job is marked as failed.
	Work(ctx context.Context, job *Job[T]) error
}
Worker processes jobs of type T. Implementations should be idempotent, as jobs may be retried multiple times on failure.