Documentation ¶
Overview ¶
internal/watcher/circuitbreaker.go (NEW FILE!)
Package watcher provides a robust file system monitoring and processing pipeline.
Gordon Watcher monitors directories for new files, validates them, ensures idempotency through SHA256 hashing, and publishes them to a message queue for downstream processing.
Architecture ¶
The watcher follows an event-driven architecture with the following components:
- fsnotify integration for real-time file system events
- Worker pool for concurrent file processing
- Rate limiter to prevent system overload
- Stability checker to ensure files are fully written
- Circuit breaker for resilient queue publishing
- Distributed locks via Redis for multi-instance coordination
Basic Usage ¶
cfg := watcher.Config{
	Paths:             []string{"/data/incoming"},
	FilePatterns:      []string{"*.xml", "*.json"},
	MaxWorkers:        10,
	MaxFilesPerSecond: 100,
	WorkingDir:        "/data",
	Queue:             myQueue,
	Storage:           myStorage,
	Logger:            myLogger,
}
w, err := watcher.New(cfg)
if err != nil {
	log.Fatal(err)
}
ctx := context.Background()
if err := w.Start(ctx); err != nil {
	log.Fatal(err)
}
// Graceful shutdown
defer w.Stop(context.Background())
File Processing Flow ¶
1. File detected in monitored directory
2. Stability check (waits for file to stop changing)
3. Pattern matching and size validation
4. SHA256 hash calculation
5. Idempotency check (skip if already processed)
6. Distributed lock acquisition
7. Move to processing directory
8. Publish to message queue (with retry + circuit breaker)
9. File remains in processing until external worker completes
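Steps 4 and 5 of the flow can be sketched as follows. This is a minimal illustration and not the package's implementation: `hashFile` and `seenSet` are hypothetical names, and the in-memory map stands in for whatever the configured Storage uses to remember processed hashes.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"io"
	"os"
	"sync"
)

// hashFile streams the file through SHA-256 so large files are never held
// fully in memory, and returns the digest as lowercase hex.
func hashFile(path string) (string, error) {
	f, err := os.Open(path)
	if err != nil {
		return "", err
	}
	defer f.Close()

	h := sha256.New()
	if _, err := io.Copy(h, f); err != nil {
		return "", err
	}
	return hex.EncodeToString(h.Sum(nil)), nil
}

// seenSet is a hypothetical in-memory record of hashes already processed.
type seenSet struct {
	mu     sync.Mutex
	hashes map[string]struct{}
}

func newSeenSet() *seenSet { return &seenSet{hashes: make(map[string]struct{})} }

// firstTime records the hash and reports whether it had not been seen before.
func (s *seenSet) firstTime(hash string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, ok := s.hashes[hash]; ok {
		return false
	}
	s.hashes[hash] = struct{}{}
	return true
}

func main() {
	f, err := os.CreateTemp("", "sample-*.xml")
	if err != nil {
		fmt.Println("create temp file:", err)
		return
	}
	defer os.Remove(f.Name())
	f.WriteString("<doc/>")
	f.Close()

	seen := newSeenSet()
	// Hash the same file twice: the second pass is detected as a duplicate.
	for i := 0; i < 2; i++ {
		hash, err := hashFile(f.Name())
		if err != nil {
			fmt.Println("hash:", err)
			return
		}
		if seen.firstTime(hash) {
			fmt.Println("processing", hash[:12])
		} else {
			fmt.Println("skipping duplicate", hash[:12])
		}
	}
}
```

Keying on the content hash rather than the file name means a renamed copy of an already processed file is still skipped.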
Resilience Features ¶
- Automatic retry with exponential backoff
- Circuit breaker to prevent cascading failures
- Orphan file reconciliation on startup
- Dead Letter Queue (DLQ) for failed messages
- Graceful shutdown with in-flight request completion
Observability ¶
The watcher provides comprehensive observability:
- Prometheus metrics for monitoring
- OpenTelemetry tracing for distributed debugging
- Structured logging with configurable levels
- Health and readiness endpoints
See the Config type for all available configuration options.
Index ¶
- func ExtractZip(zipPath, destDir string) ([]string, error)
- func IsZipFile(filename string) bool
- func Retry(ctx context.Context, cfg RetryConfig, fn func() error) error
- type CircuitBreaker
- type Cleaner
- type CleanupConfig
- type CleanupScheduler
- type Config
- type RateLimiter
- type RetryConfig
- type StabilityChecker
- type State
- type SubDirectories
- type Watcher
- type WorkerPool
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ExtractZip ¶ added in v1.0.2
func ExtractZip(zipPath, destDir string) ([]string, error)
ExtractZip extracts a ZIP file to the specified destination directory.
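For orientation, a function with this shape could be built on the standard `archive/zip` package roughly as below. This is a sketch under assumptions, not the package's code: `extractZip`, `writeEntry`, and `demo` are hypothetical names, the returned slice is assumed to hold the paths of the files written, and the path check against entries that escape the destination is something this sketch adds.

```go
package main

import (
	"archive/zip"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"strings"
)

// extractZip is a hypothetical stand-in shaped like ExtractZip. It unpacks
// zipPath into destDir and returns the paths of the regular files it wrote.
func extractZip(zipPath, destDir string) ([]string, error) {
	r, err := zip.OpenReader(zipPath)
	if err != nil {
		return nil, err
	}
	defer r.Close()

	root := filepath.Clean(destDir)
	var written []string
	for _, f := range r.File {
		target := filepath.Join(root, f.Name)
		// Reject entries such as "../x" that would land outside destDir.
		if !strings.HasPrefix(target, root+string(os.PathSeparator)) {
			return nil, fmt.Errorf("illegal path in archive: %q", f.Name)
		}
		if f.FileInfo().IsDir() {
			if err := os.MkdirAll(target, 0o755); err != nil {
				return nil, err
			}
			continue
		}
		if err := writeEntry(f, target); err != nil {
			return nil, err
		}
		written = append(written, target)
	}
	return written, nil
}

// writeEntry copies a single archive entry to target, creating parent directories.
func writeEntry(f *zip.File, target string) error {
	if err := os.MkdirAll(filepath.Dir(target), 0o755); err != nil {
		return err
	}
	src, err := f.Open()
	if err != nil {
		return err
	}
	defer src.Close()
	dst, err := os.Create(target)
	if err != nil {
		return err
	}
	if _, err := io.Copy(dst, src); err != nil {
		dst.Close()
		return err
	}
	return dst.Close()
}

// demo builds a one-file archive in a temporary directory, extracts it, and
// returns the base names of the extracted files.
func demo() ([]string, error) {
	dir, err := os.MkdirTemp("", "zipdemo")
	if err != nil {
		return nil, err
	}
	defer os.RemoveAll(dir)

	zipPath := filepath.Join(dir, "in.zip")
	zf, err := os.Create(zipPath)
	if err != nil {
		return nil, err
	}
	// Write errors are ignored for brevity; extractZip fails below if the archive is bad.
	zw := zip.NewWriter(zf)
	if w, err := zw.Create("hello.txt"); err == nil {
		w.Write([]byte("hi"))
	}
	zw.Close()
	zf.Close()

	paths, err := extractZip(zipPath, filepath.Join(dir, "out"))
	if err != nil {
		return nil, err
	}
	names := make([]string, len(paths))
	for i, p := range paths {
		names[i] = filepath.Base(p)
	}
	return names, nil
}

func main() {
	names, err := demo()
	fmt.Println(names, err)
}
```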
Types ¶
type CircuitBreaker ¶
type CircuitBreaker struct {
	// contains filtered or unexported fields
}
func NewCircuitBreaker ¶
func NewCircuitBreaker(maxFailures int, resetTimeout time.Duration) *CircuitBreaker
func (*CircuitBreaker) Call ¶
func (cb *CircuitBreaker) Call(fn func() error) error
func (*CircuitBreaker) GetState ¶
func (cb *CircuitBreaker) GetState() State
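The signatures above suggest the usual closed / open / half-open state machine. The sketch below shows that pattern with the same constructor parameters; it is an assumption about the general technique, not a copy of this package's logic, and every identifier in it (`breaker`, `stateOpen`, `errCircuitOpen`, and so on) is hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

type state int

const (
	stateClosed   state = iota // calls pass through
	stateOpen                  // calls are rejected without running
	stateHalfOpen              // reset timeout elapsed, trial calls are let through
)

var (
	errCircuitOpen = errors.New("circuit breaker is open")
	errQueueDown   = errors.New("queue unavailable") // sample failure used in main
)

type breaker struct {
	mu           sync.Mutex
	state        state
	failures     int
	maxFailures  int
	resetTimeout time.Duration
	openedAt     time.Time
}

func newBreaker(maxFailures int, resetTimeout time.Duration) *breaker {
	return &breaker{maxFailures: maxFailures, resetTimeout: resetTimeout}
}

// call runs fn unless the breaker is open. Failures are counted, and the
// breaker opens once maxFailures is reached or a half-open trial fails.
func (b *breaker) call(fn func() error) error {
	b.mu.Lock()
	if b.state == stateOpen {
		if time.Since(b.openedAt) < b.resetTimeout {
			b.mu.Unlock()
			return errCircuitOpen
		}
		b.state = stateHalfOpen
	}
	b.mu.Unlock()

	err := fn()

	b.mu.Lock()
	defer b.mu.Unlock()
	if err != nil {
		b.failures++
		if b.state == stateHalfOpen || b.failures >= b.maxFailures {
			b.state = stateOpen
			b.openedAt = time.Now()
		}
		return err
	}
	// Any success closes the breaker and clears the failure count.
	b.failures = 0
	b.state = stateClosed
	return nil
}

func (b *breaker) getState() state {
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.state
}

func main() {
	b := newBreaker(3, 30*time.Second)
	publish := func() error { return errQueueDown }
	for i := 0; i < 5; i++ {
		fmt.Println(b.call(publish))
	}
	fmt.Println("open:", b.getState() == stateOpen)
}
```

Once the breaker is open, callers fail fast instead of piling more requests onto a queue that is already struggling.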
type Cleaner ¶
type Cleaner struct {
	// contains filtered or unexported fields
}
Cleaner removes empty directories
type CleanupConfig ¶ added in v1.0.2
type CleanupConfig struct {
	WorkingDir string
	SubDirs    SubDirectories
	Retention  map[string]int
	Schedule   string
	Logger     *logger.Logger
}
CleanupConfig holds cleanup configuration
type CleanupScheduler ¶ added in v1.0.2
type CleanupScheduler struct {
	// contains filtered or unexported fields
}
CleanupScheduler handles scheduled directory cleanup
func NewCleanupScheduler ¶ added in v1.0.2
func NewCleanupScheduler(cfg CleanupConfig) *CleanupScheduler
NewCleanupScheduler creates a new cleanup scheduler
func (*CleanupScheduler) Start ¶ added in v1.0.2
func (cs *CleanupScheduler) Start() error
Start starts the cleanup scheduler
func (*CleanupScheduler) Stop ¶ added in v1.0.2
func (cs *CleanupScheduler) Stop()
Stop stops the cleanup scheduler
type Config ¶
type Config struct {
	// Paths to watch
	Paths []string
	// File patterns to match
	FilePatterns []string
	// Patterns to exclude
	ExcludePatterns []string
	// File size constraints (bytes)
	MinFileSize int64
	MaxFileSize int64
	// Stability check settings
	StableAttempts int
	StableDelay    time.Duration
	// Cleanup interval for empty directories
	CleanupInterval time.Duration
	// Worker pool settings
	MaxWorkers        int
	MaxFilesPerSecond int
	WorkerQueueSize   int
	// Working directory
	WorkingDir string
	// Subdirectories
	SubDirs SubDirectories
	// Dependencies
	Queue   queue.Queue
	Storage storage.Storage
	Logger  *logger.Logger
}
Config holds watcher configuration
type RateLimiter ¶
type RateLimiter struct {
	// contains filtered or unexported fields
}
RateLimiter limits the rate of file processing
func NewRateLimiter ¶
func NewRateLimiter(maxFilesPerSecond int) *RateLimiter
NewRateLimiter creates a new rate limiter
func (*RateLimiter) Allow ¶
func (r *RateLimiter) Allow() bool
Allow checks if an operation is allowed under the rate limit
func (*RateLimiter) Wait ¶
func (r *RateLimiter) Wait()
Wait waits until the rate limit allows another operation
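The difference between a non-blocking Allow and a blocking Wait can be illustrated with a small token bucket. The documentation does not say which algorithm RateLimiter uses, so the token bucket here is an assumption, and `limiter`, `allow`, and `wait` are hypothetical names. The sketch also assumes maxPerSecond is positive.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// limiter is a token bucket: it holds up to burst tokens and refills at
// rate tokens per second.
type limiter struct {
	mu     sync.Mutex
	rate   float64
	burst  float64
	tokens float64
	last   time.Time
}

// newLimiter starts with a full bucket sized to one second of traffic.
func newLimiter(maxPerSecond int) *limiter {
	r := float64(maxPerSecond)
	return &limiter{rate: r, burst: r, tokens: r, last: time.Now()}
}

// allow consumes one token if available and reports whether it did.
// It never blocks.
func (l *limiter) allow() bool {
	l.mu.Lock()
	defer l.mu.Unlock()

	now := time.Now()
	l.tokens += now.Sub(l.last).Seconds() * l.rate
	if l.tokens > l.burst {
		l.tokens = l.burst
	}
	l.last = now

	if l.tokens < 1 {
		return false
	}
	l.tokens--
	return true
}

// wait polls until a token becomes available. A zero or negative rate
// would make it spin forever.
func (l *limiter) wait() {
	for !l.allow() {
		time.Sleep(time.Millisecond)
	}
}

func main() {
	l := newLimiter(5)
	allowed := 0
	for i := 0; i < 10; i++ {
		if l.allow() {
			allowed++
		}
	}
	fmt.Println("allowed immediately:", allowed)

	l.wait() // blocks until the bucket refills enough for one more
	fmt.Println("one more after waiting")
}
```

Allow suits callers that can drop or defer work; Wait suits callers that must process every file and can tolerate the delay.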
type RetryConfig ¶
type RetryConfig struct {
	MaxAttempts  int
	InitialDelay time.Duration
	MaxDelay     time.Duration
	Multiplier   float64
}
RetryConfig configures retry behavior
func DefaultRetryConfig ¶
func DefaultRetryConfig() RetryConfig
DefaultRetryConfig returns default retry configuration
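The four RetryConfig fields map naturally onto capped exponential backoff: wait InitialDelay after the first failure, multiply the wait by Multiplier after each further failure, and never exceed MaxDelay. The sketch below follows the shape of the Retry signature listed in the index, but it is an illustration under assumptions rather than the package's code. `retryConfig`, `backoffDelays`, and `retry` are hypothetical names, the values in `main` are illustrative and not the package defaults, and MaxAttempts is assumed to be at least 1.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// retryConfig mirrors the documented RetryConfig fields.
type retryConfig struct {
	MaxAttempts  int
	InitialDelay time.Duration
	MaxDelay     time.Duration
	Multiplier   float64
}

// backoffDelays returns the wait before each retry: MaxAttempts-1 entries,
// growing by Multiplier and capped at MaxDelay.
func backoffDelays(cfg retryConfig) []time.Duration {
	var delays []time.Duration
	delay := cfg.InitialDelay
	for i := 1; i < cfg.MaxAttempts; i++ {
		if delay > cfg.MaxDelay {
			delay = cfg.MaxDelay
		}
		delays = append(delays, delay)
		delay = time.Duration(float64(delay) * cfg.Multiplier)
	}
	return delays
}

// retry calls fn until it succeeds, attempts run out, or ctx is cancelled.
// It returns nil on success, ctx.Err() on cancellation, and otherwise the
// last error from fn.
func retry(ctx context.Context, cfg retryConfig, fn func() error) error {
	delays := backoffDelays(cfg)
	var err error
	for attempt := 0; attempt < cfg.MaxAttempts; attempt++ {
		if err = fn(); err == nil {
			return nil
		}
		if attempt == len(delays) {
			break // final attempt failed: do not sleep again
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(delays[attempt]):
		}
	}
	return err
}

func main() {
	cfg := retryConfig{
		MaxAttempts:  4,
		InitialDelay: 10 * time.Millisecond,
		MaxDelay:     50 * time.Millisecond,
		Multiplier:   2,
	}
	calls := 0
	err := retry(context.Background(), cfg, func() error {
		calls++
		if calls < 3 {
			return errors.New("queue unavailable")
		}
		return nil
	})
	fmt.Println("calls:", calls, "err:", err)
}
```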
type StabilityChecker ¶
type StabilityChecker struct {
	// contains filtered or unexported fields
}
StabilityChecker checks if a file has stabilized (stopped changing)
func NewStabilityChecker ¶
func NewStabilityChecker(attempts int, delay time.Duration) *StabilityChecker
NewStabilityChecker creates a new stability checker
func (*StabilityChecker) WaitForStability ¶
func (s *StabilityChecker) WaitForStability(ctx context.Context, path string) bool
WaitForStability waits for a file to stabilize
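One common way to decide that a file has stopped changing is to poll its size and modification time and require a run of identical readings. The sketch below does that with the same `attempts` and `delay` parameters as NewStabilityChecker. Whether the package compares size, modification time, or something else is not stated here, so treat the comparison as an assumption; `waitForStability` and `stableTempFile` are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"os"
	"time"
)

// waitForStability polls path every delay and returns true once its size and
// modification time have matched the previous reading attempts times in a
// row. It returns false if the file cannot be read or ctx is cancelled.
func waitForStability(ctx context.Context, path string, attempts int, delay time.Duration) bool {
	var lastSize int64 = -1
	var lastMod time.Time
	stable := 0
	for {
		info, err := os.Stat(path)
		if err != nil {
			return false // file vanished or is unreadable
		}
		if info.Size() == lastSize && info.ModTime().Equal(lastMod) {
			stable++
			if stable >= attempts {
				return true
			}
		} else {
			// The file changed (or this is the first reading): restart the count.
			stable = 0
			lastSize, lastMod = info.Size(), info.ModTime()
		}
		select {
		case <-ctx.Done():
			return false
		case <-time.After(delay):
		}
	}
}

// stableTempFile writes a temporary file, leaves it untouched, and checks
// that it is reported as stable.
func stableTempFile() (bool, error) {
	f, err := os.CreateTemp("", "stable-*.json")
	if err != nil {
		return false, err
	}
	defer os.Remove(f.Name())
	if _, err := f.WriteString(`{"ok":true}`); err != nil {
		f.Close()
		return false, err
	}
	if err := f.Close(); err != nil {
		return false, err
	}

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	return waitForStability(ctx, f.Name(), 3, 10*time.Millisecond), nil
}

func main() {
	ok, err := stableTempFile()
	fmt.Println("stable:", ok, "err:", err)
}
```

Passing a context with a deadline bounds the wait for a file that a slow writer keeps appending to.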
type SubDirectories ¶
type SubDirectories struct {
	Processing string
	Processed  string
	Failed     string
	Ignored    string
	Tmp        string
}
SubDirectories defines directory structure
type Watcher ¶
type Watcher struct {
	// contains filtered or unexported fields
}
Watcher monitors file system events
type WorkerPool ¶
type WorkerPool struct {
	// contains filtered or unexported fields
}
WorkerPool manages a pool of workers for processing files
func NewWorkerPool ¶
func NewWorkerPool(maxWorkers, queueSize int, processor func(context.Context, string) error) *WorkerPool
NewWorkerPool creates a new worker pool
func (*WorkerPool) Submit ¶
func (p *WorkerPool) Submit(path string)
Submit submits a file path to the worker pool
func (*WorkerPool) SubmitBlocking ¶ added in v1.0.2
func (p *WorkerPool) SubmitBlocking(path string)
SubmitBlocking submits a file path to the worker pool, blocking if the queue is full
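A buffered channel is one simple way to build a pool with both a non-blocking and a blocking submit. The sketch below assumes that the non-blocking variant gives up when the queue is full, which the documentation above does not confirm for Submit. All names (`pool`, `trySubmit`, `submitBlocking`, `shutdown`, `processAll`) are hypothetical.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// pool runs a fixed number of workers that read paths from a bounded queue.
type pool struct {
	jobs chan string
	wg   sync.WaitGroup
}

func newPool(maxWorkers, queueSize int, processor func(context.Context, string) error) *pool {
	p := &pool{jobs: make(chan string, queueSize)}
	for i := 0; i < maxWorkers; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for path := range p.jobs {
				// A real pool would log or report this error.
				_ = processor(context.Background(), path)
			}
		}()
	}
	return p
}

// trySubmit enqueues path without blocking and reports whether it was accepted.
func (p *pool) trySubmit(path string) bool {
	select {
	case p.jobs <- path:
		return true
	default:
		return false
	}
}

// submitBlocking enqueues path, waiting for space if the queue is full.
func (p *pool) submitBlocking(path string) { p.jobs <- path }

// shutdown stops accepting work and waits for queued paths to finish.
func (p *pool) shutdown() {
	close(p.jobs)
	p.wg.Wait()
}

// processAll pushes every path through a pool and returns how many were processed.
func processAll(paths []string, maxWorkers, queueSize int) int {
	var processed atomic.Int64
	p := newPool(maxWorkers, queueSize, func(ctx context.Context, path string) error {
		processed.Add(1)
		return nil
	})
	for _, path := range paths {
		if !p.trySubmit(path) {
			p.submitBlocking(path) // queue full: fall back to waiting for space
		}
	}
	p.shutdown()
	return int(processed.Load())
}

func main() {
	paths := []string{"/data/incoming/a.xml", "/data/incoming/b.json", "/data/incoming/c.xml"}
	fmt.Println("processed:", processAll(paths, 2, 1))
}
```

The blocking variant applies backpressure to the producer, which is useful when dropping a file is not acceptable.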