Documentation ¶
Overview ¶
Package goWorkers provides a worker pool for concurrent task processing. The pool manages a configurable number of workers and supports task cancellation, retries of failed tasks, and graceful shutdown.
Index ¶
- Variables
- type Pool
- func (p *Pool) AddWait()
- func (p *Pool) Cancel(id string) error
- func (p *Pool) Done()
- func (p *Pool) Find(id string) (*Task, bool)
- func (p *Pool) IsProcessing(id string) bool
- func (p *Pool) Len() int
- func (p *Pool) NewTask(ctx context.Context, function func() bool) (string, error)
- func (p *Pool) Next() (*Task, error)
- func (p *Pool) RemainingTasks() int
- func (p *Pool) RunWorkers()
- func (p *Pool) Shutdown(timeout time.Duration) bool
- func (p *Pool) Size() int
- func (p *Pool) Wait()
- type Task
Constants ¶
This section is empty.
Variables ¶
var ErrNoTasksInQueue = errors.New("no tasks in queue")
ErrNoTasksInQueue is returned when trying to dequeue from an empty queue.
var ErrPoolClosed = errors.New("worker pool is closed, not accepting new tasks")
ErrPoolClosed is returned when trying to add a task to a closed pool.
var ErrTaskNotFound = errors.New("task not found")
ErrTaskNotFound is returned when a task with the specified ID is not found.
var ErrTaskProcessing = errors.New("task is processing, cancellation signal sent")
ErrTaskProcessing is returned when trying to cancel a task that is currently processing. The cancellation signal is still sent to the task.
Functions ¶
This section is empty.
Types ¶
type Pool ¶
type Pool struct {
	RemainingProcesses int // Number of tasks that still need to be processed
	MaxWorkers         int // Maximum number of concurrent workers
	MaxRetries         int // Maximum number of retries for failed tasks
	// contains filtered or unexported fields
}
Pool represents a worker pool that processes tasks concurrently. It manages a collection of tasks and a configurable number of worker goroutines that process these tasks. The pool supports features like task cancellation, retries for failed tasks, and graceful shutdown.
func New ¶
New creates a new worker pool with the specified maximum number of workers and retries. This is an alias for NewPool for backward compatibility.
The maxWorkers parameter specifies the maximum number of concurrent worker goroutines. The maxRetries parameter specifies how many times a failed task will be retried.
After creating a pool, call RunWorkers() to start processing tasks.
func NewPool ¶
NewPool creates a new worker pool with the specified maximum number of workers and retries.
The maxWorkers parameter specifies the maximum number of concurrent worker goroutines. The maxRetries parameter specifies how many times a failed task will be retried.
After creating a pool, call RunWorkers() to start processing tasks.
func NewQueue ¶
NewQueue creates a new worker pool with the specified maximum number of workers and retries.
Deprecated: Use New or NewPool instead. This function is maintained for backward compatibility and will be removed in a future version.
func (*Pool) AddWait ¶
func (p *Pool) AddWait()
AddWait increments the wait group counter. This method is used internally by the worker pool to signal that a new task is being processed. It should generally not be called directly unless you're implementing custom task processing logic.
For each call to AddWait(), there should be a corresponding call to Done() when the task completes. The Wait() method will block until all tasks that have been started with AddWait() have called Done().
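The pairing rule above is the same discipline that the standard library's sync.WaitGroup requires. The following is a minimal sketch using sync.WaitGroup directly, not the package's internals; runAll is a hypothetical helper introduced only for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// runAll is a hypothetical helper: it runs each job in its own goroutine and
// returns how many finished once every job has signalled completion.
func runAll(jobs []func()) int {
	var (
		wg        sync.WaitGroup
		mu        sync.Mutex
		completed int
	)
	for _, job := range jobs {
		wg.Add(1) // plays the role of AddWait: one increment per started job
		go func(j func()) {
			defer wg.Done() // plays the role of Done: paired with the Add above
			j()
			mu.Lock()
			completed++
			mu.Unlock()
		}(job)
	}
	wg.Wait() // plays the role of Wait: blocks until the counter returns to zero
	return completed
}

func main() {
	n := runAll([]func(){func() {}, func() {}, func() {}})
	fmt.Println("completed jobs:", n)
}
```

Deferring the decrement right after the goroutine starts keeps the counter balanced even when a job returns early.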
func (*Pool) Cancel ¶
func (p *Pool) Cancel(id string) error
Cancel cancels the task with the given ID. It returns an error if the task is not found (ErrTaskNotFound) or if it is currently processing (ErrTaskProcessing).
If the task is not currently being processed, it is removed from the pool immediately. If the task is being processed, it is marked for cancellation and will be removed after it completes. The task's context is cancelled in both cases, which allows the task function to detect cancellation.
This method is thread-safe and can be used to cancel tasks that are no longer needed.
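A caller usually needs to tell the three outcomes of Cancel apart. The sketch below shows one way to do that with errors.Is. It compiles on its own, so the sentinel errors, the canceller interface, and fakePool are local stand-ins (assumptions for illustration); real code would compare against the package's ErrTaskNotFound and ErrTaskProcessing.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the package's ErrTaskNotFound and ErrTaskProcessing.
var (
	errTaskNotFound   = errors.New("task not found")
	errTaskProcessing = errors.New("task is processing, cancellation signal sent")
)

// canceller captures only the documented Cancel signature.
type canceller interface {
	Cancel(id string) error
}

// fakePool is a hypothetical test double: the map value records whether the
// task is currently processing.
type fakePool map[string]bool

func (p fakePool) Cancel(id string) error {
	processing, ok := p[id]
	switch {
	case !ok:
		return errTaskNotFound
	case processing:
		return errTaskProcessing // the task stays until it finishes
	default:
		delete(p, id) // an idle task is removed immediately
		return nil
	}
}

// cancelOutcome translates the result of Cancel into a short status string.
func cancelOutcome(c canceller, id string) string {
	err := c.Cancel(id)
	switch {
	case err == nil:
		return "removed"
	case errors.Is(err, errTaskProcessing):
		return "signalled"
	case errors.Is(err, errTaskNotFound):
		return "not found"
	default:
		return "failed"
	}
}

func main() {
	pool := fakePool{"queued": false, "running": true}
	for _, id := range []string{"queued", "running", "missing"} {
		fmt.Printf("%s: %s\n", id, cancelOutcome(pool, id))
	}
}
```

Treating ErrTaskProcessing as a distinct, non-fatal outcome matches the description above: the signal has been sent, and the task is removed once it completes.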
func (*Pool) Done ¶
func (p *Pool) Done()
Done decrements the wait group counter. This method is used internally by the worker pool to signal that a task has completed processing. It should generally not be called directly unless you're implementing custom task processing logic.
func (*Pool) Find ¶
func (p *Pool) Find(id string) (*Task, bool)
Find returns the task with the given ID and a boolean indicating whether it was found. This method is thread-safe and can be used to retrieve a specific task by its ID.
It's useful for checking the status of a task or for implementing custom task management logic. The returned task should not be modified directly, as this could lead to race conditions.
func (*Pool) IsProcessing ¶
func (p *Pool) IsProcessing(id string) bool
IsProcessing returns whether the task with the given ID is currently being processed. If the task is not found in the pool, false is returned.
This method is thread-safe and can be used to check the status of a specific task. It's useful for monitoring long-running tasks or implementing custom task management logic.
func (*Pool) Len ¶
func (p *Pool) Len() int
Len returns the number of tasks in the queue waiting to be processed. This method is thread-safe and can be used to monitor the queue length.
Note that this differs from Size(), which returns the total number of tasks in the pool, and from RemainingTasks(), which returns the number of tasks that still need to be processed.
func (*Pool) NewTask ¶
func (p *Pool) NewTask(ctx context.Context, function func() bool) (string, error)
NewTask adds a new task to the pool with the given context and function. It returns the ID of the created task and an error if the pool is closed.
The context can be used to cancel the task before or during execution. The function should return true if the task was processed successfully, or false if it failed and should be retried (subject to the MaxRetries limit).
This method is thread-safe and can be called concurrently from multiple goroutines. If the pool is closed (after Shutdown was called), ErrPoolClosed is returned.
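Because the task function has the shape func() bool, it can only observe cancellation through a context it captures in a closure. The sketch below shows a function of that shape that checks the caller's context between steps. makeTask and runCancelledDemo are hypothetical helpers. The documentation does not say whether a task that returns false after cancellation is retried, or whether Cancel cancels the caller's context or a derived one, so the sketch covers only cancellation triggered by the caller.

```go
package main

import (
	"context"
	"fmt"
)

// makeTask is a hypothetical helper that builds a func() bool of the shape
// NewTask accepts. It checks ctx between steps and stops once ctx is cancelled.
func makeTask(ctx context.Context, steps int, work func(step int)) func() bool {
	return func() bool {
		for i := 0; i < steps; i++ {
			select {
			case <-ctx.Done():
				return false // report failure; retry behaviour here is unspecified
			default:
			}
			work(i)
		}
		return true
	}
}

// runCancelledDemo cancels the context during the second step and reports the
// task's result together with the number of steps that actually ran.
func runCancelledDemo() (ok bool, stepsRun int) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	task := makeTask(ctx, 5, func(step int) {
		stepsRun++
		if step == 1 {
			cancel() // simulate the caller cancelling mid-run
		}
	})
	ok = task()
	return ok, stepsRun
}

func main() {
	ok, n := runCancelledDemo()
	fmt.Printf("ok=%v stepsRun=%d\n", ok, n)
}
```

In real use the same ctx would be passed both to NewTask and to the closure, so that cancelling it stops the work at the next check.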
func (*Pool) Next ¶
func (p *Pool) Next() (*Task, error)
Next returns the next task to be processed from the queue. It returns an error if there are no tasks in the queue (ErrNoTasksInQueue).
This method is thread-safe and removes the task from the queue. It's primarily used internally by the worker pool to get the next task to process, but can also be used to implement custom task processing logic.
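A custom processing loop built on Next has to treat the empty-queue error as a normal stop condition. The following is a minimal stand-alone sketch of that pattern with a mutex-guarded queue of task IDs. The queue type, its FIFO ordering, and errNoTasksInQueue are assumptions made for illustration, not the package's implementation.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errNoTasksInQueue stands in for the package's ErrNoTasksInQueue.
var errNoTasksInQueue = errors.New("no tasks in queue")

// queue is a hypothetical mutex-guarded FIFO of task IDs.
type queue struct {
	mu  sync.Mutex
	ids []string
}

// Push appends an ID to the back of the queue.
func (q *queue) Push(id string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.ids = append(q.ids, id)
}

// Next removes and returns the oldest ID, or errNoTasksInQueue when empty.
func (q *queue) Next() (string, error) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.ids) == 0 {
		return "", errNoTasksInQueue
	}
	id := q.ids[0]
	q.ids = q.ids[1:]
	return id, nil
}

// drain calls Next until the queue reports that it is empty.
func drain(q *queue) []string {
	var out []string
	for {
		id, err := q.Next()
		if errors.Is(err, errNoTasksInQueue) {
			return out
		}
		out = append(out, id)
	}
}

func main() {
	q := &queue{}
	q.Push("a")
	q.Push("b")
	fmt.Println(drain(q))
}
```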
func (*Pool) RemainingTasks ¶
func (p *Pool) RemainingTasks() int
RemainingTasks returns the number of tasks that still need to be processed. This includes tasks that are currently being processed and tasks that are waiting in the queue. This method is thread-safe and can be used to monitor the progress of the worker pool.
func (*Pool) RunWorkers ¶
func (p *Pool) RunWorkers()
RunWorkers starts the worker pool and processes tasks as they are added. It will continue running until Shutdown is called.
This method should be called after creating a pool and before adding tasks. It typically runs in its own goroutine:
pool := goWorkers.NewPool(5, 3)
go pool.RunWorkers()
The worker pool will process tasks concurrently up to the MaxWorkers limit. If there are more tasks than workers, the excess tasks will be queued and processed as workers become available. Failed tasks will be retried up to MaxRetries times.
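The behaviour described above (a fixed number of workers, excess tasks waiting, failed tasks retried) can be sketched with the standard library alone. This is an illustration of the general technique, not the package's code. The process function is hypothetical, and it assumes that a task gets one initial attempt plus up to maxRetries retries, which the documentation does not spell out.

```go
package main

import (
	"fmt"
	"sync"
)

// process runs tasks on maxWorkers goroutines (maxWorkers must be at least 1)
// and returns how many tasks eventually succeeded. Assumption: each task gets
// one initial attempt plus up to maxRetries retries.
func process(tasks []func() bool, maxWorkers, maxRetries int) int {
	var (
		wg        sync.WaitGroup
		mu        sync.Mutex
		succeeded int
	)
	queue := make(chan func() bool)

	for w := 0; w < maxWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for task := range queue { // each worker pulls the next waiting task
				for attempt := 0; attempt <= maxRetries; attempt++ {
					if task() {
						mu.Lock()
						succeeded++
						mu.Unlock()
						break
					}
				}
			}
		}()
	}

	for _, t := range tasks {
		queue <- t // blocks while every worker is busy
	}
	close(queue)
	wg.Wait()
	return succeeded
}

func main() {
	calls := 0
	flaky := func() bool { calls++; return calls > 2 } // fails twice, then succeeds
	alwaysFails := func() bool { return false }

	n := process([]func() bool{flaky, alwaysFails}, 2, 3)
	fmt.Printf("succeeded=%d flakyCalls=%d\n", n, calls)
}
```

Retrying inside the worker keeps the sketch short. A pool that requeues failed tasks instead would let other waiting tasks run between attempts.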
func (*Pool) Shutdown ¶
func (p *Pool) Shutdown(timeout time.Duration) bool
Shutdown gracefully shuts down the worker pool. It stops accepting new tasks and waits for all currently running tasks to complete. If timeout is greater than 0, Shutdown waits at most that long. If timeout is 0 or negative, it waits indefinitely. It returns true if all running tasks completed before the timeout and false if the timeout was reached.
This method is thread-safe and can be called from any goroutine. After calling Shutdown, any attempts to add new tasks to the pool will return ErrPoolClosed. This method should be called when you're done with the worker pool to ensure all resources are properly released.
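The timeout rule described for Shutdown (a positive duration bounds the wait, zero or negative waits indefinitely) is a common pattern around sync.WaitGroup. The sketch below shows that pattern with the standard library only. waitTimeout and shutdownDemo are hypothetical helpers, and nothing here is taken from the package's implementation.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// waitTimeout waits for wg and reports whether it finished before the timeout.
// A zero or negative timeout waits indefinitely.
func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	if timeout <= 0 {
		<-done
		return true
	}
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case <-done:
		return true
	case <-timer.C:
		return false // the helper goroutine keeps waiting until the work ends
	}
}

// shutdownDemo starts one task lasting taskMillis milliseconds and waits for
// it for at most timeoutMillis milliseconds.
func shutdownDemo(taskMillis, timeoutMillis int) bool {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		time.Sleep(time.Duration(taskMillis) * time.Millisecond)
	}()
	return waitTimeout(&wg, time.Duration(timeoutMillis)*time.Millisecond)
}

func main() {
	fmt.Println("fast task, generous timeout:", shutdownDemo(10, 500))
	fmt.Println("slow task, short timeout:", shutdownDemo(300, 20))
	fmt.Println("fast task, no timeout:", shutdownDemo(10, 0))
}
```

A false result only means the deadline passed. The unfinished work keeps running, which is why callers often log the outcome or retry with a longer timeout.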
func (*Pool) Size ¶
func (p *Pool) Size() int
Size returns the total number of tasks in the pool, including both queued and processing tasks. This method is thread-safe and can be used to monitor the overall size of the worker pool.
Note that this differs from Len(), which returns only the number of tasks in the queue, and from RemainingTasks(), which returns the number of tasks that still need to be processed.
func (*Pool) Wait ¶
func (p *Pool) Wait()
Wait waits for all currently processing tasks to complete. This method blocks until all tasks that have been started with AddWait() have called Done().
It's useful for implementing synchronization points in your application, such as waiting for a batch of tasks to complete before proceeding. Note that this only waits for tasks that have been started, not for tasks in the queue.
type Task ¶
type Task struct {
	// contains filtered or unexported fields
}
Task represents a unit of work to be processed by the worker pool. It encapsulates the function to be executed, its execution state, and context for cancellation. Tasks are created using the NewTask function and are typically added to a Pool for execution.
func NewTask ¶
NewTask creates a new task with the given context and function. If ctx is nil, context.Background() is used.
The function parameter should return true if the task was processed successfully, or false if it failed and should be retried (subject to the MaxRetries limit of the Pool).
The returned Task is ready to be added to a Pool for execution.
func (*Task) IsCancelled ¶
IsCancelled returns whether the task has been cancelled. This method checks if the task's context has been cancelled, which can happen when the PendingCancel method is called or when the parent context is cancelled. It is used to determine if a task should continue execution.
func (*Task) IsProcessing ¶
IsProcessing returns whether the task is currently being processed. This method is thread-safe and can be used to check if a task is currently being executed.
func (*Task) PendingCancel ¶
func (t *Task) PendingCancel()
PendingCancel marks the task as pending cancellation and cancels its context. This method is thread-safe and is typically called by the Pool when a task is cancelled. It signals the task to stop execution as soon as possible by cancelling its context.
func (*Task) Processing ¶
func (t *Task) Processing()
Processing marks the task as being processed. This method is thread-safe and is typically called by the Pool when it starts processing the task.
func (*Task) ResetProcessing ¶
func (t *Task) ResetProcessing()
ResetProcessing resets the processing state of the task. This method is thread-safe and is typically called by the Pool when it needs to requeue a task for retry.
func (*Task) WithTimeout ¶
WithTimeout sets a timeout for the task execution. If the task takes longer than the specified timeout to complete, it will be automatically cancelled. A timeout of 0 or negative means no timeout. This method returns the task itself to allow for method chaining.
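The rule that a zero or negative timeout means no timeout maps naturally onto the context package. The sketch below illustrates that mapping with the standard library. withOptionalTimeout and timedOut are hypothetical helpers, and how the package itself applies the timeout is not stated in this documentation.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// withOptionalTimeout derives a context that expires after d, or one without a
// deadline when d is zero or negative.
func withOptionalTimeout(parent context.Context, d time.Duration) (context.Context, context.CancelFunc) {
	if d <= 0 {
		return context.WithCancel(parent)
	}
	return context.WithTimeout(parent, d)
}

// timedOut simulates work lasting workMillis milliseconds under a timeout of
// timeoutMillis milliseconds and reports whether the context expired first.
func timedOut(workMillis, timeoutMillis int) bool {
	ctx, cancel := withOptionalTimeout(context.Background(),
		time.Duration(timeoutMillis)*time.Millisecond)
	defer cancel()

	work := time.NewTimer(time.Duration(workMillis) * time.Millisecond)
	defer work.Stop()
	select {
	case <-ctx.Done():
		return true
	case <-work.C:
		return false
	}
}

func main() {
	fmt.Println("slow work, short timeout:", timedOut(200, 20))
	fmt.Println("fast work, long timeout:", timedOut(10, 500))
	fmt.Println("fast work, no timeout:", timedOut(10, 0))
}
```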
Directories ¶
| Path | Synopsis |
|---|---|
| examples | |
| examples/cancelTask (command) | |
| examples/gracefulShutdown (command) | |
| examples/simplePool (command) | |
| examples/taskTimeout (command) | |