Documentation ¶
Index ¶
- Constants
- Variables
- func CalculateNextRetry(currentRetry int, strategy string, initialDelay, maxDelay time.Duration, factor float64) time.Time
- func GenerateCorrelationID() string
- func GenerateDedupeKey(taskType TaskType, payload []byte) string
- func GetPriorityWeight(priority Priority) int
- func IsTerminalStatus(status TaskStatus) bool
- func IsValidPriority(priority Priority) bool
- func IsValidTaskStatus(status TaskStatus) bool
- func IsValidTaskType(taskType TaskType) bool
- type APIConfig
- type APIKeyConfig
- type AppConfig
- type AuthConfig
- type AuthType
- type BatchPayload
- type BatchTask
- type CircuitBreaker
- type CircuitBreakerConfig
- type CircuitBreakerState
- type Config
- type DataOperation
- type DataOperationType
- type DataProcessPayload
- type DataSourceConfig
- type DataSourceType
- type DataTargetConfig
- type DataTargetType
- type EmailAttachment
- type EmailPayload
- type EncryptionConfig
- type ErrorCode
- type Field
- type ImageOperation
- type ImageOperationType
- type ImageProcessPayload
- type JWTConfig
- type LimitConfig
- type Logger
- type LoggingConfig
- type MetricsCollector
- type MetricsConfig
- type PostgresConfig
- type PrefixedIDGenerator
- type Priority
- type QueueBackend
- type QueueConfig
- type QueueStats
- type RateLimitConfig
- type RateLimiter
- type RedisConfig
- type ScheduledPayload
- type Scheduler
- type SchedulerConfig
- type SecurityConfig
- type ServiceCircuitConfig
- type TLSConfig
- type Task
- func (t *Task) Age() time.Duration
- func (t *Task) CanRetry() bool
- func (t *Task) Clone() *Task
- func (t *Task) Duration() time.Duration
- func (t *Task) IsExpired() bool
- func (t *Task) QueueTime() time.Duration
- func (t *Task) SetPayload(v interface{}) error
- func (t *Task) UnmarshalPayload(v interface{}) error
- type TaskBuilder
- func (b *TaskBuilder) Build() (*Task, error)
- func (b *TaskBuilder) MustBuild() *Task
- func (b *TaskBuilder) WithCorrelationID(correlationID string) *TaskBuilder
- func (b *TaskBuilder) WithDeadline(deadline time.Time) *TaskBuilder
- func (b *TaskBuilder) WithDedupeKey(key string) *TaskBuilder
- func (b *TaskBuilder) WithID(id string) *TaskBuilder
- func (b *TaskBuilder) WithMaxRetries(maxRetries int) *TaskBuilder
- func (b *TaskBuilder) WithMetadata(key string, value interface{}) *TaskBuilder
- func (b *TaskBuilder) WithPayload(payload interface{}) *TaskBuilder
- func (b *TaskBuilder) WithPriority(priority Priority) *TaskBuilder
- func (b *TaskBuilder) WithQueue(queue string) *TaskBuilder
- func (b *TaskBuilder) WithRawPayload(payload json.RawMessage) *TaskBuilder
- func (b *TaskBuilder) WithScheduledAt(scheduledAt time.Time) *TaskBuilder
- func (b *TaskBuilder) WithTenantID(tenantID string) *TaskBuilder
- func (b *TaskBuilder) WithTimeout(timeout time.Duration) *TaskBuilder
- type TaskError
- type TaskIDGenerator
- type TaskOptions
- type TaskProcessor
- type TaskResult
- type TaskStatus
- type TaskTransformer
- type TaskType
- type TaskValidator
- type UUIDGenerator
- type ValidationError
- type ValidationErrors
- type WebhookPayload
- type Worker
- type WorkerConfig
- type WorkerInfo
- type WorkerStatus
Constants ¶
const (
	// Default retry settings
	DefaultMaxRetries = 3
	DefaultRetryBackoff = "exponential" // exponential, linear, fixed
	DefaultInitialDelay = "1s"
	DefaultMaxDelay = "5m"
	DefaultBackoffFactor = 2.0

	// Default timeouts
	DefaultTaskTimeout = "5m"
	DefaultDequeueTimeout = "30s"
	DefaultShutdownTimeout = "30s"

	// Default queue settings
	DefaultQueueName = "default"
	DefaultPriority = PriorityNormal
	DefaultBatchSize = 10
	DefaultMaxQueueSize = 10000

	// Default worker settings
	DefaultWorkerConcurrency = 5
	DefaultHeartbeatInterval = "30s"
	DefaultWorkerTimeout = "5m"

	// Circuit breaker defaults
	DefaultCircuitBreakerThreshold = 5 // failures before opening
	DefaultCircuitBreakerTimeout = "60s" // how long to stay open
	DefaultCircuitBreakerMaxRequests = 3 // max requests in half-open state

	// Rate limiting defaults
	DefaultRateLimit = 100 // requests per second
	DefaultRateBurst = 10 // burst size

	// Monitoring defaults
	DefaultMetricsPort = 9090
	DefaultHealthPath = "/health"
	DefaultMetricsPath = "/metrics"
	DefaultReadyPath = "/ready"
)
Constants for default values
Variables ¶
var (
	// Task errors
	ErrTaskNotFound = errors.New("task not found")
	ErrTaskInvalidStatus = errors.New("invalid task status")
	ErrTaskTimeout = errors.New("task execution timeout")
	ErrTaskCancelled = errors.New("task was cancelled")
	ErrTaskDeadline = errors.New("task deadline exceeded")
	ErrTaskRetryExceeded = errors.New("maximum retries exceeded")
	ErrTaskDuplicate = errors.New("duplicate task detected")

	// Queue errors
	ErrQueueNotFound = errors.New("queue not found")
	ErrQueueFull = errors.New("queue is full")
	ErrQueueEmpty = errors.New("queue is empty")
	ErrQueueClosed = errors.New("queue is closed")

	// Worker errors
	ErrWorkerNotFound = errors.New("worker not found")
	ErrWorkerOffline = errors.New("worker is offline")
	ErrWorkerOverloaded = errors.New("worker is overloaded")
	ErrWorkerShutdown = errors.New("worker is shutting down")

	// Processor errors
	ErrProcessorNotFound = errors.New("no processor found for task type")
	ErrProcessorFailed = errors.New("task processor failed")

	// Validation errors
	ErrInvalidPayload = errors.New("invalid task payload")
	ErrInvalidPriority = errors.New("invalid task priority")
	ErrInvalidTaskType = errors.New("invalid task type")
	ErrMissingRequired = errors.New("missing required field")

	// Backend errors
	ErrBackendTimeout = errors.New("backend operation timeout")
	ErrBackendConnection = errors.New("backend connection error")

	// Rate limiting errors
	ErrRateLimitExceeded = errors.New("rate limit exceeded")

	// Circuit breaker errors
	ErrCircuitBreakerOpen = errors.New("circuit breaker is open")
)
Common error variables
var QueuePriorities = []Priority{ PriorityCritical, PriorityHigh, PriorityNormal, PriorityLow, }
QueuePriorities defines the processing order for different priority levels
var RetryableErrorCodes = []ErrorCode{ ErrorCodeTimeout, ErrorCodeNetworkError, ErrorCodeBackendUnavailable, ErrorCodeRateLimited, ErrorCodeCircuitOpen, ErrorCodeWorkerOverloaded, ErrorCodeTemporaryFailure, }
RetryableErrorCodes lists all error codes that should trigger retries
var SupportedTaskTypes = []TaskType{ TaskTypeWebhook, TaskTypeEmail, TaskTypeImageProcess, TaskTypeDataProcess, TaskTypeScheduled, TaskTypeBatch, }
SupportedTaskTypes lists all supported task types
Functions ¶
func CalculateNextRetry ¶
func CalculateNextRetry(currentRetry int, strategy string, initialDelay, maxDelay time.Duration, factor float64) time.Time
CalculateNextRetry calculates the next retry time based on backoff strategy
func GenerateCorrelationID ¶
func GenerateCorrelationID() string
GenerateCorrelationID generates a correlation ID for request tracing
func GenerateDedupeKey ¶
func GenerateDedupeKey(taskType TaskType, payload []byte) string
GenerateDedupeKey generates a deduplication key based on task content
func GetPriorityWeight ¶
func GetPriorityWeight(priority Priority) int
GetPriorityWeight returns a numeric weight for priority-based sorting
func IsTerminalStatus ¶
func IsTerminalStatus(status TaskStatus) bool
IsTerminalStatus checks if a task status is terminal (won't change)
func IsValidPriority ¶
func IsValidPriority(priority Priority) bool
IsValidPriority checks if a priority is valid
func IsValidTaskStatus ¶
func IsValidTaskStatus(status TaskStatus) bool
IsValidTaskStatus checks if a task status is valid
func IsValidTaskType ¶
func IsValidTaskType(taskType TaskType) bool
IsValidTaskType checks if a task type is valid
Types ¶
type APIConfig ¶
type APIConfig struct {
Enabled bool `json:"enabled" yaml:"enabled"`
Host string `json:"host" yaml:"host"`
Port int `json:"port" yaml:"port"`
TLS TLSConfig `json:"tls" yaml:"tls"`
// Timeouts
ReadTimeout time.Duration `json:"read_timeout" yaml:"read_timeout"`
WriteTimeout time.Duration `json:"write_timeout" yaml:"write_timeout"`
IdleTimeout time.Duration `json:"idle_timeout" yaml:"idle_timeout"`
// Middleware
EnableCORS bool `json:"enable_cors" yaml:"enable_cors"`
EnableMetrics bool `json:"enable_metrics" yaml:"enable_metrics"`
EnableAuth bool `json:"enable_auth" yaml:"enable_auth"`
// Rate limiting
RateLimit int `json:"rate_limit" yaml:"rate_limit"` // requests per second
RateBurst int `json:"rate_burst" yaml:"rate_burst"` // burst size
}
APIConfig contains API server configuration
type APIKeyConfig ¶
type APIKeyConfig struct {
HeaderName string `json:"header_name" yaml:"header_name"`
Keys []string `json:"keys" yaml:"keys"`
}
APIKeyConfig contains API key configuration
type AppConfig ¶
type AppConfig struct {
Name string `json:"name" yaml:"name"`
Version string `json:"version" yaml:"version"`
Environment string `json:"environment" yaml:"environment"` // dev, staging, prod
Debug bool `json:"debug" yaml:"debug"`
}
AppConfig contains general application settings
type AuthConfig ¶
type AuthConfig struct {
Type AuthType `json:"type" yaml:"type"`
Enabled bool `json:"enabled" yaml:"enabled"`
Required bool `json:"required" yaml:"required"`
}
AuthConfig contains authentication configuration
type BatchPayload ¶
type BatchPayload struct {
Tasks []BatchTask `json:"tasks" validate:"required,min=1"`
Sequential bool `json:"sequential,omitempty"` // Process tasks in sequence
StopOnError bool `json:"stop_on_error,omitempty"` // Stop batch if any task fails
// Progress tracking
CallbackURL string `json:"callback_url,omitempty"` // Progress webhook
CallbackHeaders map[string]string `json:"callback_headers,omitempty"`
}
BatchPayload represents the payload for batch operations
type BatchTask ¶
type BatchTask struct {
ID string `json:"id"` // Unique ID within batch
Type TaskType `json:"type" validate:"required"`
Payload interface{} `json:"payload" validate:"required"`
Priority Priority `json:"priority,omitempty"`
Options TaskOptions `json:"options,omitempty"`
}
BatchTask represents a single task within a batch
type CircuitBreaker ¶
type CircuitBreaker interface {
// Execute runs the function with circuit breaker protection
Execute(fn func() error) error
// State returns the current circuit breaker state
State() CircuitBreakerState
// Reset manually resets the circuit breaker
Reset()
}
CircuitBreaker defines the interface for circuit breaker pattern
type CircuitBreakerConfig ¶
type CircuitBreakerConfig struct {
Enabled bool `json:"enabled" yaml:"enabled"`
Threshold int `json:"threshold" yaml:"threshold"` // failures before opening
Timeout time.Duration `json:"timeout" yaml:"timeout"` // how long to stay open
MaxRequests int `json:"max_requests" yaml:"max_requests"` // max requests in half-open
ResetTimeout time.Duration `json:"reset_timeout" yaml:"reset_timeout"` // time to reset counters
// Per-service settings
Services map[string]ServiceCircuitConfig `json:"services" yaml:"services"`
}
CircuitBreakerConfig contains circuit breaker configuration
type CircuitBreakerState ¶
type CircuitBreakerState string
CircuitBreakerState represents the state of a circuit breaker
const (
	CircuitBreakerClosed CircuitBreakerState = "closed" // Normal operation
	CircuitBreakerOpen CircuitBreakerState = "open" // Failing fast
	CircuitBreakerHalfOpen CircuitBreakerState = "half_open" // Testing if service recovered
)
type Config ¶
type Config struct {
// Application settings
App AppConfig `json:"app" yaml:"app"`
// Queue backend configuration
Queue QueueConfig `json:"queue" yaml:"queue"`
// Worker configuration
Worker WorkerConfig `json:"worker" yaml:"worker"`
// Scheduler configuration
Scheduler SchedulerConfig `json:"scheduler" yaml:"scheduler"`
// API server configuration
API APIConfig `json:"api" yaml:"api"`
// Monitoring and observability
Metrics MetricsConfig `json:"metrics" yaml:"metrics"`
Logging LoggingConfig `json:"logging" yaml:"logging"`
// Circuit breaker configuration
CircuitBreaker CircuitBreakerConfig `json:"circuit_breaker" yaml:"circuit_breaker"`
// Rate limiting configuration
RateLimit RateLimitConfig `json:"rate_limit" yaml:"rate_limit"`
// Security configuration
Security SecurityConfig `json:"security" yaml:"security"`
}
Config represents the main configuration for TaskForge
func DefaultConfig ¶
func DefaultConfig() *Config
DefaultConfig returns a configuration with sensible defaults
type DataOperation ¶
type DataOperation struct {
Type DataOperationType `json:"type" validate:"required"`
Parameters map[string]interface{} `json:"parameters,omitempty"`
}
DataOperation represents a data transformation operation
type DataOperationType ¶
type DataOperationType string
DataOperationType defines available data operations
const (
	DataOpFilter DataOperationType = "filter" // Filter rows
	DataOpTransform DataOperationType = "transform" // Transform columns
	DataOpAggregate DataOperationType = "aggregate" // Group and aggregate
	DataOpJoin DataOperationType = "join" // Join with other data
	DataOpSort DataOperationType = "sort" // Sort data
	DataOpDedupe DataOperationType = "dedupe" // Remove duplicates
	DataOpValidate DataOperationType = "validate" // Validate data quality
)
type DataProcessPayload ¶
type DataProcessPayload struct {
SourceType DataSourceType `json:"source_type" validate:"required"`
SourceConfig DataSourceConfig `json:"source_config" validate:"required"`
TargetType DataTargetType `json:"target_type" validate:"required"`
TargetConfig DataTargetConfig `json:"target_config" validate:"required"`
Operations []DataOperation `json:"operations,omitempty"`
// Processing options
BatchSize int `json:"batch_size,omitempty"` // Records per batch
MaxErrors int `json:"max_errors,omitempty"` // Max errors before failing
// Progress tracking
CallbackURL string `json:"callback_url,omitempty"` // Progress updates
}
DataProcessPayload represents the payload for data processing tasks
type DataSourceConfig ¶
type DataSourceConfig struct {
URL string `json:"url,omitempty"`
Path string `json:"path,omitempty"`
Query string `json:"query,omitempty"` // SQL query for database
Headers map[string]string `json:"headers,omitempty"` // HTTP headers for API
Auth AuthConfig `json:"auth,omitempty"` // Authentication config
Parameters map[string]interface{} `json:"parameters,omitempty"` // Source-specific params
}
DataSourceConfig contains source-specific configuration
type DataSourceType ¶
type DataSourceType string
DataSourceType defines supported data sources
const (
	DataSourceCSV DataSourceType = "csv"
	DataSourceJSON DataSourceType = "json"
	DataSourceDatabase DataSourceType = "database"
	DataSourceS3 DataSourceType = "s3"
	DataSourceAPI DataSourceType = "api"
)
type DataTargetConfig ¶
type DataTargetConfig struct {
URL string `json:"url,omitempty"`
Path string `json:"path,omitempty"`
Table string `json:"table,omitempty"` // Database table
Headers map[string]string `json:"headers,omitempty"` // HTTP headers for API
Auth AuthConfig `json:"auth,omitempty"` // Authentication config
Parameters map[string]interface{} `json:"parameters,omitempty"` // Target-specific params
}
DataTargetConfig contains target-specific configuration
type DataTargetType ¶
type DataTargetType string
DataTargetType defines supported data targets
const (
	DataTargetCSV DataTargetType = "csv"
	DataTargetJSON DataTargetType = "json"
	DataTargetDatabase DataTargetType = "database"
	DataTargetS3 DataTargetType = "s3"
	DataTargetAPI DataTargetType = "api"
)
type EmailAttachment ¶
type EmailAttachment struct {
Filename string `json:"filename" validate:"required"`
ContentType string `json:"content_type,omitempty"`
Content []byte `json:"content,omitempty"` // Base64 encoded content
ContentURL string `json:"content_url,omitempty"` // URL to fetch content
Size int64 `json:"size,omitempty"` // Size in bytes
}
EmailAttachment represents an email attachment
type EmailPayload ¶
type EmailPayload struct {
To []string `json:"to" validate:"required,min=1"`
CC []string `json:"cc,omitempty"`
BCC []string `json:"bcc,omitempty"`
From string `json:"from" validate:"required,email"`
ReplyTo string `json:"reply_to,omitempty"`
Subject string `json:"subject" validate:"required"`
// Content
TextBody string `json:"text_body,omitempty"`
HTMLBody string `json:"html_body,omitempty"`
TemplateID string `json:"template_id,omitempty"` // Email template identifier
TemplateData map[string]interface{} `json:"template_data,omitempty"` // Data for template rendering
// Attachments
Attachments []EmailAttachment `json:"attachments,omitempty"`
// Email provider settings
Provider string `json:"provider,omitempty"` // sendgrid, ses, smtp, etc.
Tags []string `json:"tags,omitempty"` // For categorization/analytics
Metadata map[string]string `json:"metadata,omitempty"` // Provider-specific metadata
// Tracking
TrackOpens bool `json:"track_opens,omitempty"`
TrackClicks bool `json:"track_clicks,omitempty"`
}
EmailPayload represents the payload for email processing tasks
type EncryptionConfig ¶
type EncryptionConfig struct {
Enabled bool `json:"enabled" yaml:"enabled"`
Algorithm string `json:"algorithm" yaml:"algorithm"` // AES-256-GCM
KeyFile string `json:"key_file" yaml:"key_file"`
}
EncryptionConfig contains encryption configuration
type ErrorCode ¶
type ErrorCode string
ErrorCode represents specific error types with retry behavior
const (
	// Retryable errors (temporary failures)
	ErrorCodeTimeout ErrorCode = "timeout"
	ErrorCodeNetworkError ErrorCode = "network_error"
	ErrorCodeRateLimited ErrorCode = "rate_limited"
	ErrorCodeCircuitOpen ErrorCode = "circuit_open"
	ErrorCodeWorkerOverloaded ErrorCode = "worker_overloaded"
	ErrorCodeTemporaryFailure ErrorCode = "temporary_failure"

	// Non-retryable errors (permanent failures)
	ErrorCodeInvalidPayload ErrorCode = "invalid_payload"
	ErrorCodeValidationFailed ErrorCode = "validation_failed"
	ErrorCodeForbidden ErrorCode = "forbidden"
	ErrorCodeNotFound ErrorCode = "not_found"
	ErrorCodeDuplicateTask ErrorCode = "duplicate_task"
	ErrorCodeDeadlineExceeded ErrorCode = "deadline_exceeded"
	ErrorCodeCancelled ErrorCode = "cancelled"
	ErrorCodePermanentFailure ErrorCode = "permanent_failure"

	// System errors
	ErrorCodeInternalError ErrorCode = "internal_error"
	ErrorCodeConfigError ErrorCode = "config_error"
	ErrorCodeUnknownError ErrorCode = "unknown_error"
)
func (ErrorCode) IsRetryable ¶
IsRetryable returns true if the error code indicates a retryable failure
type Field ¶
type Field struct {
Key string
Value interface{}
}
Field represents a structured log field
type ImageOperation ¶
type ImageOperation struct {
Type ImageOperationType `json:"type" validate:"required"`
Parameters map[string]interface{} `json:"parameters,omitempty"`
}
ImageOperation represents a single image processing operation
type ImageOperationType ¶
type ImageOperationType string
ImageOperationType defines available image operations
const (
	ImageOpResize ImageOperationType = "resize" // Resize image
	ImageOpCrop ImageOperationType = "crop" // Crop image
	ImageOpWatermark ImageOperationType = "watermark" // Add watermark
	ImageOpFilter ImageOperationType = "filter" // Apply filters (blur, sharpen, etc.)
	ImageOpRotate ImageOperationType = "rotate" // Rotate image
	ImageOpFlip ImageOperationType = "flip" // Flip horizontal/vertical
	ImageOpCompress ImageOperationType = "compress" // Compress/optimize
)
type ImageProcessPayload ¶
type ImageProcessPayload struct {
SourceURL string `json:"source_url,omitempty"` // URL to source image
SourceData []byte `json:"source_data,omitempty"` // Raw image data
SourcePath string `json:"source_path,omitempty"` // File system path
Operations []ImageOperation `json:"operations" validate:"required,min=1"`
OutputPath string `json:"output_path,omitempty"` // Where to save result
OutputURL string `json:"output_url,omitempty"` // Where to upload result
// Processing options
Quality int `json:"quality,omitempty"` // JPEG quality (1-100)
Format string `json:"format,omitempty"` // Output format (jpg, png, webp)
Progressive bool `json:"progressive,omitempty"` // Progressive JPEG
// Metadata preservation
PreserveMetadata bool `json:"preserve_metadata,omitempty"`
// Callback
CallbackURL string `json:"callback_url,omitempty"` // Webhook when complete
}
ImageProcessPayload represents the payload for image processing tasks
type JWTConfig ¶
type JWTConfig struct {
SecretKey string `json:"secret_key" yaml:"secret_key"`
ExpirationTime time.Duration `json:"expiration_time" yaml:"expiration_time"`
Issuer string `json:"issuer" yaml:"issuer"`
Audience string `json:"audience" yaml:"audience"`
}
JWTConfig contains JWT configuration
type LimitConfig ¶
type LimitConfig struct {
Limit int `json:"limit" yaml:"limit"` // requests per second
Burst int `json:"burst" yaml:"burst"` // burst size
}
LimitConfig contains specific rate limit settings
type Logger ¶
type Logger interface {
Debug(msg string, fields ...Field)
Info(msg string, fields ...Field)
Warn(msg string, fields ...Field)
Error(msg string, fields ...Field)
With(fields ...Field) Logger
}
Logger defines the interface for structured logging
type LoggingConfig ¶
type LoggingConfig struct {
Level string `json:"level" yaml:"level"` // debug, info, warn, error
Format string `json:"format" yaml:"format"` // json, text
Output string `json:"output" yaml:"output"` // stdout, stderr, file
File string `json:"file" yaml:"file"` // log file path
MaxSize int `json:"max_size" yaml:"max_size"` // MB
MaxBackups int `json:"max_backups" yaml:"max_backups"`
MaxAge int `json:"max_age" yaml:"max_age"` // days
Compress bool `json:"compress" yaml:"compress"`
// Structured logging fields
Fields map[string]interface{} `json:"fields" yaml:"fields"`
}
LoggingConfig contains logging configuration
type MetricsCollector ¶
type MetricsCollector interface {
// Task metrics
RecordTaskEnqueued(taskType TaskType, priority Priority, queue string)
RecordTaskStarted(taskType TaskType, priority Priority, queue string)
RecordTaskCompleted(taskType TaskType, priority Priority, queue string, duration time.Duration)
RecordTaskFailed(taskType TaskType, priority Priority, queue string, duration time.Duration, errorType string)
// Queue metrics
UpdateQueueDepth(queue string, depth int64)
UpdateActiveWorkers(queue string, count int64)
// Worker metrics
RecordWorkerRegistered(workerID string, queues []string)
RecordWorkerUnregistered(workerID string)
UpdateWorkerStatus(workerID string, status WorkerStatus)
// Circuit breaker metrics
RecordCircuitBreakerOpen(service string)
RecordCircuitBreakerClosed(service string)
RecordCircuitBreakerHalfOpen(service string)
}
MetricsCollector defines the interface for metrics collection
type MetricsConfig ¶
type MetricsConfig struct {
Enabled bool `json:"enabled" yaml:"enabled"`
Port int `json:"port" yaml:"port"`
Path string `json:"path" yaml:"path"`
// Prometheus settings
Namespace string `json:"namespace" yaml:"namespace"`
Subsystem string `json:"subsystem" yaml:"subsystem"`
// Collection intervals
CollectInterval time.Duration `json:"collect_interval" yaml:"collect_interval"`
// Health check endpoints
HealthPath string `json:"health_path" yaml:"health_path"`
ReadyPath string `json:"ready_path" yaml:"ready_path"`
}
MetricsConfig contains monitoring configuration
type PostgresConfig ¶
type PostgresConfig struct {
Host string `json:"host" yaml:"host"`
Port int `json:"port" yaml:"port"`
Database string `json:"database" yaml:"database"`
Username string `json:"username" yaml:"username"`
Password string `json:"password" yaml:"password"`
SSLMode string `json:"ssl_mode" yaml:"ssl_mode"`
MaxConnections int `json:"max_connections" yaml:"max_connections"`
MaxIdleTime time.Duration `json:"max_idle_time" yaml:"max_idle_time"`
MaxLifetime time.Duration `json:"max_lifetime" yaml:"max_lifetime"`
ConnectTimeout time.Duration `json:"connect_timeout" yaml:"connect_timeout"`
}
PostgresConfig contains PostgreSQL-specific configuration
type PrefixedIDGenerator ¶
type PrefixedIDGenerator struct {
Prefix string
}
PrefixedIDGenerator generates IDs with a prefix
func (*PrefixedIDGenerator) Generate ¶
func (g *PrefixedIDGenerator) Generate() string
type QueueBackend ¶
type QueueBackend interface {
// Task operations
Enqueue(ctx context.Context, task *Task) error
Dequeue(ctx context.Context, queue string, timeout time.Duration) (*Task, error)
Ack(ctx context.Context, taskID string) error
Nack(ctx context.Context, taskID string, reason string) error
// Batch operations for efficiency
EnqueueBatch(ctx context.Context, tasks []*Task) error
DequeueBatch(ctx context.Context, queue string, count int, timeout time.Duration) ([]*Task, error)
// Task management
GetTask(ctx context.Context, taskID string) (*Task, error)
UpdateTask(ctx context.Context, task *Task) error
DeleteTask(ctx context.Context, taskID string) error
// Queue management
GetQueueStats(ctx context.Context, queue string) (*QueueStats, error)
ListQueues(ctx context.Context) ([]string, error)
PurgeQueue(ctx context.Context, queue string) error
// Dead letter queue operations
MoveToDLQ(ctx context.Context, taskID string, reason string) error
RequeueFromDLQ(ctx context.Context, taskID string) error
// Retry and scheduling
ScheduleRetry(ctx context.Context, taskID string, retryAt time.Time) error
GetScheduledTasks(ctx context.Context, before time.Time, limit int) ([]*Task, error)
// Health and monitoring
HealthCheck(ctx context.Context) error
Close() error
}
QueueBackend defines the interface for different queue implementations. This allows us to swap between Redis, PostgreSQL, NATS, etc.
type QueueConfig ¶
type QueueConfig struct {
Backend string `json:"backend" yaml:"backend"` // redis, postgres, nats
URL string `json:"url" yaml:"url"` // Connection URL
MaxRetries int `json:"max_retries" yaml:"max_retries"` // Connection retries
Timeout time.Duration `json:"timeout" yaml:"timeout"` // Operation timeout
// Redis-specific settings
Redis RedisConfig `json:"redis" yaml:"redis"`
// PostgreSQL-specific settings
Postgres PostgresConfig `json:"postgres" yaml:"postgres"`
// Default queue settings
DefaultQueue string `json:"default_queue" yaml:"default_queue"`
MaxQueueSize int `json:"max_queue_size" yaml:"max_queue_size"`
// Task retention
CompletedTaskTTL time.Duration `json:"completed_task_ttl" yaml:"completed_task_ttl"`
FailedTaskTTL time.Duration `json:"failed_task_ttl" yaml:"failed_task_ttl"`
}
QueueConfig contains queue backend configuration
type QueueStats ¶
type QueueStats struct {
QueueName string `json:"queue_name"`
PendingTasks int64 `json:"pending_tasks"`
RunningTasks int64 `json:"running_tasks"`
CompletedTasks int64 `json:"completed_tasks"`
FailedTasks int64 `json:"failed_tasks"`
DeadLetterTasks int64 `json:"dead_letter_tasks"`
TasksByPriority map[Priority]int64 `json:"tasks_by_priority"`
TasksByType map[TaskType]int64 `json:"tasks_by_type"`
LastUpdated time.Time `json:"last_updated"`
}
QueueStats provides statistics about a queue
type RateLimitConfig ¶
type RateLimitConfig struct {
Enabled bool `json:"enabled" yaml:"enabled"`
DefaultLimit int `json:"default_limit" yaml:"default_limit"` // requests per second
DefaultBurst int `json:"default_burst" yaml:"default_burst"` // burst size
CleanupInterval time.Duration `json:"cleanup_interval" yaml:"cleanup_interval"`
// Per-client and per-task-type limits
ClientLimits map[string]LimitConfig `json:"client_limits" yaml:"client_limits"`
TaskTypeLimits map[TaskType]LimitConfig `json:"task_type_limits" yaml:"task_type_limits"`
}
RateLimitConfig contains rate limiting configuration
type RateLimiter ¶
type RateLimiter interface {
// Allow checks if an operation is allowed under the rate limit
Allow(ctx context.Context, key string) (bool, error)
// AllowN checks if N operations are allowed
AllowN(ctx context.Context, key string, n int) (bool, error)
// Reset resets the rate limiter for a specific key
Reset(ctx context.Context, key string) error
}
RateLimiter defines the interface for rate limiting
type RedisConfig ¶
type RedisConfig struct {
DB int `json:"db" yaml:"db"`
Password string `json:"password" yaml:"password"`
MaxRetries int `json:"max_retries" yaml:"max_retries"`
PoolSize int `json:"pool_size" yaml:"pool_size"`
MinIdleConns int `json:"min_idle_conns" yaml:"min_idle_conns"`
DialTimeout time.Duration `json:"dial_timeout" yaml:"dial_timeout"`
ReadTimeout time.Duration `json:"read_timeout" yaml:"read_timeout"`
WriteTimeout time.Duration `json:"write_timeout" yaml:"write_timeout"`
PoolTimeout time.Duration `json:"pool_timeout" yaml:"pool_timeout"`
}
RedisConfig contains Redis-specific configuration
type ScheduledPayload ¶
type ScheduledPayload struct {
CronExpression string `json:"cron_expression,omitempty"` // Cron expression for recurring
Timezone string `json:"timezone,omitempty"` // Timezone for execution
MaxRuns int `json:"max_runs,omitempty"` // Limit number of executions
StartDate *time.Time `json:"start_date,omitempty"` // When to start schedule
EndDate *time.Time `json:"end_date,omitempty"` // When to end schedule
// The actual task to execute
TaskType TaskType `json:"task_type" validate:"required"`
TaskPayload interface{} `json:"task_payload" validate:"required"`
TaskOptions TaskOptions `json:"task_options,omitempty"`
}
ScheduledPayload represents the payload for scheduled/cron tasks
type Scheduler ¶
type Scheduler interface {
// Start begins the scheduling process
Start(ctx context.Context) error
// Stop gracefully stops the scheduler
Stop(ctx context.Context) error
// ScheduleTask adds a task to be executed at a specific time
ScheduleTask(ctx context.Context, task *Task, executeAt time.Time) error
// ScheduleCron adds a recurring task with cron expression
ScheduleCron(ctx context.Context, cronExpr string, taskTemplate *Task) error
// CancelScheduledTask removes a scheduled task
CancelScheduledTask(ctx context.Context, taskID string) error
// GetScheduledTasks returns upcoming scheduled tasks
GetScheduledTasks(ctx context.Context, from, to time.Time) ([]*Task, error)
}
Scheduler defines the interface for task scheduling
type SchedulerConfig ¶
type SchedulerConfig struct {
Enabled bool `json:"enabled" yaml:"enabled"`
CheckInterval time.Duration `json:"check_interval" yaml:"check_interval"`
MaxScheduledTasks int `json:"max_scheduled_tasks" yaml:"max_scheduled_tasks"`
Timezone string `json:"timezone" yaml:"timezone"`
// Cleanup settings
CleanupInterval time.Duration `json:"cleanup_interval" yaml:"cleanup_interval"`
CleanupAge time.Duration `json:"cleanup_age" yaml:"cleanup_age"`
}
SchedulerConfig contains scheduler configuration
type SecurityConfig ¶
type SecurityConfig struct {
// Authentication
Auth AuthConfig `json:"auth" yaml:"auth"`
// JWT settings
JWT JWTConfig `json:"jwt" yaml:"jwt"`
// API keys
APIKeys APIKeyConfig `json:"api_keys" yaml:"api_keys"`
// Encryption
Encryption EncryptionConfig `json:"encryption" yaml:"encryption"`
}
SecurityConfig contains security-related configuration
type ServiceCircuitConfig ¶
type ServiceCircuitConfig struct {
Threshold int `json:"threshold" yaml:"threshold"`
Timeout time.Duration `json:"timeout" yaml:"timeout"`
MaxRequests int `json:"max_requests" yaml:"max_requests"`
}
ServiceCircuitConfig contains service-specific circuit breaker settings
type TLSConfig ¶
type TLSConfig struct {
Enabled bool `json:"enabled" yaml:"enabled"`
CertFile string `json:"cert_file" yaml:"cert_file"`
KeyFile string `json:"key_file" yaml:"key_file"`
CAFile string `json:"ca_file" yaml:"ca_file"`
}
TLSConfig contains TLS configuration
type Task ¶
// Task represents a unit of work in the TaskForge system.
//
// Every field carries both a json and a bson tag; ID is stored under "_id"
// in BSON. Pointer-typed timestamps whose json tag has no omitempty
// (ScheduledAt, StartedAt, CompletedAt, NextRetryAt, DeadlineAt) are written
// as JSON null when nil rather than being left out.
type Task struct {
// Core identifiers
ID string `json:"id" bson:"_id"` // Unique task identifier
Type TaskType `json:"type" bson:"type"` // Type of task
Priority Priority `json:"priority" bson:"priority"` // Execution priority
Status TaskStatus `json:"status" bson:"status"` // Current status
Queue string `json:"queue" bson:"queue"` // Target queue name
// Payload and metadata
// Payload contains task-specific data as a raw JSON object.
// The expected structure of Payload depends on the Type field and should match
// the corresponding typed payload struct defined in payloads.go (e.g., WebhookPayload, EmailPayload, etc.).
// When creating or processing a Task, marshal/unmarshal Payload to/from the appropriate struct
// based on Task.Type. See payloads.go for definitions and expected formats.
Payload json.RawMessage `json:"payload" bson:"payload"` // Task-specific data
Metadata map[string]interface{} `json:"metadata" bson:"metadata"` // Additional metadata
// Scheduling and timing
CreatedAt time.Time `json:"created_at" bson:"created_at"` // When task was created
ScheduledAt *time.Time `json:"scheduled_at" bson:"scheduled_at"` // When to execute (nil = immediate)
StartedAt *time.Time `json:"started_at" bson:"started_at"` // When processing began
CompletedAt *time.Time `json:"completed_at" bson:"completed_at"` // When processing finished
// Retry and error handling
MaxRetries int `json:"max_retries" bson:"max_retries"` // Maximum retry attempts
CurrentRetries int `json:"current_retries" bson:"current_retries"` // Current retry count
LastError string `json:"last_error,omitempty" bson:"last_error"` // Last error message
NextRetryAt *time.Time `json:"next_retry_at" bson:"next_retry_at"` // When to retry next
// Processing context
WorkerID string `json:"worker_id,omitempty" bson:"worker_id"` // ID of processing worker
CorrelationID string `json:"correlation_id,omitempty" bson:"correlation_id"` // For request tracing
// Multi-tenancy and isolation
TenantID string `json:"tenant_id,omitempty" bson:"tenant_id"` // Tenant identifier
// Timeout and deadlines
// Timeout is a *time.Duration; encoding/json writes a Duration as an integer
// number of nanoseconds, and the omitempty tag drops the field when nil.
Timeout *time.Duration `json:"timeout,omitempty" bson:"timeout"` // Max execution time
DeadlineAt *time.Time `json:"deadline_at" bson:"deadline_at"` // Hard deadline
// Deduplication
DedupeKey string `json:"dedupe_key,omitempty" bson:"dedupe_key"` // For preventing duplicates
}
Task represents a unit of work in the TaskForge system
func (*Task) SetPayload ¶
SetPayload marshals the provided value and sets it as the task payload
func (*Task) UnmarshalPayload ¶
UnmarshalPayload unmarshals the task payload into the provided interface. Example:
var webhook WebhookPayload
err := task.UnmarshalPayload(&webhook)
type TaskBuilder ¶
// TaskBuilder provides a fluent interface for building tasks. It is created
// with NewTaskBuilder; each With* method returns the same *TaskBuilder so calls
// can be chained, and Build (or MustBuild) yields the resulting *Task.
type TaskBuilder struct {
// contains filtered or unexported fields
}
TaskBuilder provides a fluent interface for building tasks
func NewTaskBuilder ¶
func NewTaskBuilder(taskType TaskType) *TaskBuilder
NewTaskBuilder creates a new task builder
func (*TaskBuilder) Build ¶
func (b *TaskBuilder) Build() (*Task, error)
Build returns the constructed task or an error if building failed
func (*TaskBuilder) MustBuild ¶
func (b *TaskBuilder) MustBuild() *Task
MustBuild returns the constructed task or panics if there's an error. Use this only in tests or when you're certain the build will succeed.
func (*TaskBuilder) WithCorrelationID ¶
func (b *TaskBuilder) WithCorrelationID(correlationID string) *TaskBuilder
WithCorrelationID sets the correlation ID
func (*TaskBuilder) WithDeadline ¶
func (b *TaskBuilder) WithDeadline(deadline time.Time) *TaskBuilder
WithDeadline sets the task deadline
func (*TaskBuilder) WithDedupeKey ¶
func (b *TaskBuilder) WithDedupeKey(key string) *TaskBuilder
WithDedupeKey sets the deduplication key
func (*TaskBuilder) WithID ¶
func (b *TaskBuilder) WithID(id string) *TaskBuilder
WithID sets the task ID
func (*TaskBuilder) WithMaxRetries ¶
func (b *TaskBuilder) WithMaxRetries(maxRetries int) *TaskBuilder
WithMaxRetries sets the maximum retry attempts
func (*TaskBuilder) WithMetadata ¶
func (b *TaskBuilder) WithMetadata(key string, value interface{}) *TaskBuilder
WithMetadata adds metadata to the task
func (*TaskBuilder) WithPayload ¶
func (b *TaskBuilder) WithPayload(payload interface{}) *TaskBuilder
WithPayload sets the task payload. The payload must be a valid JSON-serializable value.
func (*TaskBuilder) WithPriority ¶
func (b *TaskBuilder) WithPriority(priority Priority) *TaskBuilder
WithPriority sets the task priority
func (*TaskBuilder) WithQueue ¶
func (b *TaskBuilder) WithQueue(queue string) *TaskBuilder
WithQueue sets the target queue
func (*TaskBuilder) WithRawPayload ¶
func (b *TaskBuilder) WithRawPayload(payload json.RawMessage) *TaskBuilder
WithRawPayload sets the task payload from already-serialized JSON
func (*TaskBuilder) WithScheduledAt ¶
func (b *TaskBuilder) WithScheduledAt(scheduledAt time.Time) *TaskBuilder
WithScheduledAt sets when the task should be executed
func (*TaskBuilder) WithTenantID ¶
func (b *TaskBuilder) WithTenantID(tenantID string) *TaskBuilder
WithTenantID sets the tenant ID
func (*TaskBuilder) WithTimeout ¶
func (b *TaskBuilder) WithTimeout(timeout time.Duration) *TaskBuilder
WithTimeout sets the task timeout
type TaskError ¶
// TaskError represents a task-specific error with additional context: the
// task it belongs to, an ErrorCode, a human-readable message, and a flag
// saying whether the failure is retryable.
type TaskError struct {
TaskID string `json:"task_id"`
TaskType TaskType `json:"task_type"`
Code ErrorCode `json:"code"`
Message string `json:"message"`
// Details is left out of the JSON output when empty.
Details string `json:"details,omitempty"`
Retryable bool `json:"retryable"`
// Cause is tagged json:"-", so it never appears in the JSON encoding and is
// not restored when decoding.
Cause error `json:"-"` // Original error (not serialized)
}
TaskError represents a task-specific error with additional context
func NewTaskError ¶
NewTaskError creates a new TaskError
type TaskIDGenerator ¶
// TaskIDGenerator generates unique task IDs. *UUIDGenerator in this package
// has a matching Generate method.
type TaskIDGenerator interface {
// Generate returns a task ID as a string.
Generate() string
}
TaskIDGenerator generates unique task IDs
type TaskOptions ¶
// TaskOptions provides configuration for task creation. Every field is tagged
// omitempty, so unset options are left out of the JSON encoding entirely.
type TaskOptions struct {
Priority Priority `json:"priority,omitempty"`
Queue string `json:"queue,omitempty"`
// MaxRetries is a pointer, presumably so that "not set" (nil) can be told
// apart from an explicit 0 — TODO confirm against the code that applies options.
MaxRetries *int `json:"max_retries,omitempty"`
// Timeout, when set, is written by encoding/json as integer nanoseconds.
Timeout *time.Duration `json:"timeout,omitempty"`
ScheduledAt *time.Time `json:"scheduled_at,omitempty"`
DeadlineAt *time.Time `json:"deadline_at,omitempty"`
DedupeKey string `json:"dedupe_key,omitempty"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
CorrelationID string `json:"correlation_id,omitempty"`
TenantID string `json:"tenant_id,omitempty"`
}
TaskOptions provides configuration for task creation
type TaskProcessor ¶
// TaskProcessor defines the interface for task execution. A processor
// advertises which task types it handles and any extra capabilities, and
// executes one task at a time through Process.
type TaskProcessor interface {
// Process executes a task and returns the result
Process(ctx context.Context, task *Task) (*TaskResult, error)
// GetSupportedTypes returns the task types this processor can handle
GetSupportedTypes() []TaskType
// GetCapabilities returns additional capabilities (e.g., "image-resize", "webhook-v2")
GetCapabilities() []string
}
TaskProcessor defines the interface for task execution
type TaskResult ¶
// TaskResult represents the outcome of task execution. Result, Error and
// Metadata are optional in the JSON form (omitempty); the remaining fields
// are always written.
type TaskResult struct {
TaskID string `json:"task_id"`
Status TaskStatus `json:"status"`
Result json.RawMessage `json:"result,omitempty"` // Success result data
Error string `json:"error,omitempty"` // Error message if failed
// Duration is a time.Duration, which encoding/json writes as an integer
// number of nanoseconds.
Duration time.Duration `json:"duration"` // Execution time
Metadata map[string]interface{} `json:"metadata,omitempty"` // Additional result metadata
CompletedAt time.Time `json:"completed_at"`
WorkerID string `json:"worker_id"`
}
TaskResult represents the outcome of task execution
type TaskStatus ¶
// TaskStatus represents the current state of a task.
type TaskStatus string

// Task states. The string values are what appears in serialized tasks
// (for example in the "status" field of Task and TaskResult).
const (
	TaskStatusPending    TaskStatus = "pending"     // Queued, awaiting processing
	TaskStatusRunning    TaskStatus = "running"     // Currently being processed
	TaskStatusCompleted  TaskStatus = "completed"   // Successfully completed
	TaskStatusFailed     TaskStatus = "failed"      // Failed (will retry if attempts remain)
	TaskStatusDeadLetter TaskStatus = "dead_letter" // Failed permanently, moved to DLQ
	TaskStatusCancelled  TaskStatus = "cancelled"   // Cancelled by user
)
type TaskTransformer ¶
// TaskTransformer defines the interface for task transformation. Transform
// takes and returns a *Task, so an implementation may hand back either the
// same task or a different one.
type TaskTransformer interface {
// Transform modifies a task before it's enqueued (e.g., add defaults, enrich data)
Transform(task *Task) (*Task, error)
// GetSupportedTypes returns the task types this transformer handles
GetSupportedTypes() []TaskType
}
TaskTransformer defines the interface for task transformation
type TaskType ¶
// TaskType defines the type of task to be executed.
type TaskType string

// Known task types. The string values are what appears in serialized tasks
// (for example in the "type" field of Task).
const (
	TaskTypeWebhook      TaskType = "webhook"       // HTTP webhook delivery
	TaskTypeEmail        TaskType = "email"         // Email processing
	TaskTypeImageProcess TaskType = "image_process" // Image operations
	TaskTypeDataProcess  TaskType = "data_process"  // Data transformations
	TaskTypeScheduled    TaskType = "scheduled"     // Cron/scheduled tasks
	TaskTypeBatch        TaskType = "batch"         // Bulk operations
)
type TaskValidator ¶
// TaskValidator defines the interface for task validation. It offers a
// whole-task check and a payload-only check that works on raw bytes plus the
// task type.
type TaskValidator interface {
// Validate checks if a task is valid and can be processed
Validate(task *Task) error
// ValidatePayload validates task-specific payload
ValidatePayload(taskType TaskType, payload []byte) error
}
TaskValidator defines the interface for task validation
type UUIDGenerator ¶
// UUIDGenerator generates UUID-based task IDs. It is an empty struct, so it
// carries no state of its own; its Generate method has a pointer receiver.
type UUIDGenerator struct{}
UUIDGenerator generates UUID-based task IDs
func (*UUIDGenerator) Generate ¶
func (g *UUIDGenerator) Generate() string
type ValidationError ¶
// ValidationError represents a single field validation error. *ValidationError
// has an Error method and therefore satisfies the error interface.
type ValidationError struct {
// Field names the field that failed validation.
Field string `json:"field"`
// Value is the offending value; being interface{}, it is encoded as whatever
// concrete value it holds.
Value interface{} `json:"value"`
// Tag is presumably the validation rule that failed (e.g. "required") — TODO confirm.
Tag string `json:"tag"`
Message string `json:"message"`
}
ValidationError represents field validation errors
func (*ValidationError) Error ¶
func (e *ValidationError) Error() string
type ValidationErrors ¶
// ValidationErrors represents multiple validation errors. It is a slice of
// *ValidationError with a value-receiver Error method, so it can itself be
// used as an error.
type ValidationErrors []*ValidationError
ValidationErrors represents multiple validation errors
func (ValidationErrors) Error ¶
func (e ValidationErrors) Error() string
type WebhookPayload ¶
// WebhookPayload represents the payload for HTTP webhook delivery tasks.
// URL and Method carry `validate` tags: both are required, URL must be a URL,
// and Method must be one of GET, POST, PUT, PATCH or DELETE.
type WebhookPayload struct {
URL string `json:"url" validate:"required,url"`
Method string `json:"method" validate:"required,oneof=GET POST PUT PATCH DELETE"`
Headers map[string]string `json:"headers,omitempty"`
Body interface{} `json:"body,omitempty"`
// Webhook-specific options
// Timeout is written by encoding/json as integer nanoseconds and omitted when zero.
Timeout time.Duration `json:"timeout,omitempty"` // Request timeout
FollowRedirects bool `json:"follow_redirects,omitempty"` // Follow HTTP redirects
// NOTE(review): with omitempty a false VerifySSL is dropped from the JSON, and
// a payload that omits the key decodes to false. Confirm the consumer does not
// read that zero value as "skip certificate verification".
VerifySSL bool `json:"verify_ssl,omitempty"` // Verify SSL certificates
ExpectedStatus []int `json:"expected_status,omitempty"` // Expected HTTP status codes
Secret string `json:"secret,omitempty"` // For HMAC signature
SignatureHeader string `json:"signature_header,omitempty"` // Header name for signature
// Circuit breaker settings
CircuitBreakerKey string `json:"circuit_breaker_key,omitempty"` // Group webhooks for circuit breaking
}
WebhookPayload represents the payload for HTTP webhook delivery tasks
type Worker ¶
// Worker defines the interface for task workers: lifecycle control
// (Start/Stop), processor registration per task type, status reporting, and
// heartbeats. Start, Stop and Heartbeat all take a context.Context.
type Worker interface {
// Start begins processing tasks from specified queues
Start(ctx context.Context, queues []string) error
// Stop gracefully stops the worker, finishing current tasks
Stop(ctx context.Context) error
// RegisterProcessor adds a task processor for specific task types
RegisterProcessor(taskType TaskType, processor TaskProcessor) error
// GetInfo returns current worker information
GetInfo() *WorkerInfo
// Heartbeat updates the worker's status and metadata
Heartbeat(ctx context.Context) error
}
Worker defines the interface for task workers
type WorkerConfig ¶
// WorkerConfig contains worker configuration: identity and queues, concurrency
// and timeouts, retry/backoff parameters, resource limits, and task type
// filters. Every field has matching JSON and YAML keys and none is omitempty.
type WorkerConfig struct {
ID string `json:"id" yaml:"id"` // Worker identifier
Queues []string `json:"queues" yaml:"queues"` // Queues to process
Concurrency int `json:"concurrency" yaml:"concurrency"` // Concurrent tasks
Timeout time.Duration `json:"timeout" yaml:"timeout"` // Task timeout
HeartbeatInterval time.Duration `json:"heartbeat_interval" yaml:"heartbeat_interval"`
ShutdownTimeout time.Duration `json:"shutdown_timeout" yaml:"shutdown_timeout"`
// Retry configuration
MaxRetries int `json:"max_retries" yaml:"max_retries"`
RetryBackoff string `json:"retry_backoff" yaml:"retry_backoff"` // exponential, linear, fixed
InitialDelay time.Duration `json:"initial_delay" yaml:"initial_delay"`
MaxDelay time.Duration `json:"max_delay" yaml:"max_delay"`
// BackoffFactor is presumably the multiplier used by the exponential strategy — TODO confirm.
BackoffFactor float64 `json:"backoff_factor" yaml:"backoff_factor"`
// Resource limits
MaxMemoryMB int `json:"max_memory_mb" yaml:"max_memory_mb"`
MaxCPUPercent int `json:"max_cpu_percent" yaml:"max_cpu_percent"`
// Task type filters
SupportedTypes []TaskType `json:"supported_types" yaml:"supported_types"`
Capabilities []string `json:"capabilities" yaml:"capabilities"`
}
WorkerConfig contains worker configuration
type WorkerInfo ¶
// WorkerInfo represents information about a worker: identity, the queues it
// serves, its current status, registration and heartbeat timestamps, and the
// IDs of the tasks it is processing. No field is omitempty, so nil slices and
// maps are written as JSON null.
type WorkerInfo struct {
ID string `json:"id"`
Hostname string `json:"hostname"`
Version string `json:"version"`
Queues []string `json:"queues"` // Queues this worker processes
Status WorkerStatus `json:"status"`
RegisteredAt time.Time `json:"registered_at"`
LastHeartbeat time.Time `json:"last_heartbeat"`
CurrentTasks []string `json:"current_tasks"` // Currently processing task IDs
// NOTE(review): the comment below calls these "task types", but
// TaskProcessor.GetCapabilities describes capabilities as extra labels such as
// "image-resize", and WorkerConfig keeps SupportedTypes separate from
// Capabilities. Confirm which meaning applies here.
Capabilities []string `json:"capabilities"` // Task types this worker can handle
Metadata map[string]string `json:"metadata"`
}
WorkerInfo represents information about a worker
type WorkerStatus ¶
// WorkerStatus represents the current state of a worker.
type WorkerStatus string

// Worker states. The string values are what appears in serialized worker
// information (for example in the "status" field of WorkerInfo).
const (
	WorkerStatusIdle        WorkerStatus = "idle"        // Available for work
	WorkerStatusBusy        WorkerStatus = "busy"        // Processing tasks
	WorkerStatusDraining    WorkerStatus = "draining"    // Finishing current tasks, no new ones
	WorkerStatusOffline     WorkerStatus = "offline"     // Disconnected
	WorkerStatusMaintenance WorkerStatus = "maintenance" // Under maintenance
)