Documentation
¶
Index ¶
- Variables
- func Await(ctx Context, checkFn func() bool)
- func AwaitWithTimeout(ctx Context, timeout time.Duration, checkFn func() bool) bool
- func BindWorkflowInput(fn any, encodedInput []byte) func(Context)
- func CancelTimer(ctx Context, timer Future[time.Time]) error
- func Go(ctx Context, goFn func(Context))
- func Heartbeat(ctx context.Context, details any) error
- func NonRetryable(err error) error
- func Now(ctx Context) time.Time
- func RegisterActivity[I, O any](r Registrar, name string, fn Activity[I, O]) error
- func RegisterWorkflow[I, O any](r Registrar, name string, wf Workflow[I, O]) error
- func Replay(workflowFn any, events []journal.Event) error
- func ReplayFromStore(ctx context.Context, store journal.Store, workflowID, runID string, ...) error
- func RetryAfter(delay time.Duration, err error) error
- func RunWorker(ctx context.Context, w Worker) error
- func SideEffect[T any](ctx Context, fn func() T) T
- func SignalExternalWorkflow(ctx Context, targetWorkflowID, signalName string, payload any)
- func Sleep(ctx Context, d time.Duration)
- func WithActivityInfo(ctx context.Context, info ActivityInfo) context.Context
- func WithHeartbeatRecorder(ctx context.Context, recorder func(details any) error) context.Context
- type Activity
- type ActivityInfo
- type ActivityOption
- func WithHeartbeatTimeout(d time.Duration) ActivityOption
- func WithNoRetry() ActivityOption
- func WithRetry(policy RetryPolicy) ActivityOption
- func WithScheduleToCloseTimeout(d time.Duration) ActivityOption
- func WithScheduleToStartTimeout(d time.Duration) ActivityOption
- func WithStartToCloseTimeout(d time.Duration) ActivityOption
- type ActivityRef
- type ActivityResolver
- type CancelInfo
- type Channel
- type ChildWorkflowOption
- type ChildWorkflowRef
- type Client
- type Clock
- type Codec
- type Context
- type ContinueAsNewError
- type Engine
- func (e *Engine) ActivityWorker() *activityWorker
- func (e *Engine) Actor() (execute func() error, interrupt func(error))
- func (e *Engine) Client() Client
- func (e *Engine) Run(ctx context.Context) error
- func (e *Engine) TimeoutWorker() *timeoutWorker
- func (e *Engine) TimerWorker() *timerWorker
- func (e *Engine) WorkerID() string
- func (e *Engine) WorkflowWorker() *workflowWorker
- type EngineOption
- func WithActivityResolver(r ActivityResolver) EngineOption
- func WithCacheSize(size int) EngineOption
- func WithClock(clock Clock) EngineOption
- func WithDefaultRetryPolicy(p RetryPolicy) EngineOption
- func WithEngineCodec(codec Codec) EngineOption
- func WithEngineLogger(l *slog.Logger) EngineOption
- func WithWorkerConcurrency(n int) EngineOption
- func WithWorkerID(id string) EngineOption
- func WithWorkflowResolver(r WorkflowResolver) EngineOption
- func WithWorkflowTaskTimeout(d time.Duration) EngineOption
- type ExecutionState
- type FakeClock
- type Future
- type GetOption
- type MemoryStore
- func (ms *MemoryStore) Append(ctx context.Context, workflowID, runID string, events []journal.Event, ...) ([]int, error)
- func (ms *MemoryStore) CancelWorkflow(ctx context.Context, workflowID, reason string) error
- func (ms *MemoryStore) CreateWorkflow(ctx context.Context, workflowID, workflowType string, input []byte, ...) (string, error)
- func (ms *MemoryStore) GetStatus(ctx context.Context, workflowID, runID string) (journal.WorkflowStatus, error)
- func (ms *MemoryStore) GetTimedOutActivities(ctx context.Context, now time.Time) ([]journal.TimedOutActivity, error)
- func (ms *MemoryStore) GetTimersToFire(ctx context.Context, now time.Time) ([]journal.PendingTimerTask, error)
- func (ms *MemoryStore) ListWorkflows(ctx context.Context, opts ...journal.ListWorkflowsOption) (*journal.ListWorkflowsResult, error)
- func (ms *MemoryStore) Load(ctx context.Context, workflowID, runID string) ([]journal.Event, error)
- func (ms *MemoryStore) LoadFrom(ctx context.Context, workflowID, runID string, afterEventID int) ([]journal.Event, error)
- func (ms *MemoryStore) RecordHeartbeat(ctx context.Context, workflowID, runID string, scheduledAt int, details []byte) error
- func (ms *MemoryStore) ReleaseExpiredWorkflowTasks(ctx context.Context, now time.Time, timeout time.Duration) (int, error)
- func (ms *MemoryStore) RequeueForRetry(ctx context.Context, workflowID, runID string, scheduledAt int, ...) error
- func (ms *MemoryStore) SignalWorkflow(ctx context.Context, workflowID, signalName string, payload []byte) error
- func (ms *MemoryStore) WaitForActivityTasks(ctx context.Context, workerID string, maxActivities int) ([]journal.PendingActivityTask, error)
- func (ms *MemoryStore) WaitForCompletion(ctx context.Context, workflowID, runID string) (journal.Event, error)
- func (ms *MemoryStore) WaitForWorkflowTasks(ctx context.Context, workerID string, maxNew int) ([]journal.PendingWorkflowTask, error)
- type MemoryStoreOption
- type NonRetryableError
- type NondeterminismError
- type RealClock
- type ReceiveFuture
- type Recorder
- type Registrar
- type Registry
- type ReplayResult
- type RetryPolicy
- type RetryableError
- type Run
- type ScheduledIDReceiver
- type Scheduler
- type SchedulerOption
- type Selector
- type SignalChannel
- type StubExecutionState
- func (s *StubExecutionState) AddExternalEvent(ev journal.Event) int
- func (s *StubExecutionState) Events() []journal.Event
- func (s *StubExecutionState) GetByScheduledID(scheduledEventID int) []journal.Event
- func (s *StubExecutionState) GetCancelRequest() *journal.WorkflowCancelRequested
- func (s *StubExecutionState) GetSignals(signalName string) []journal.SignalReceived
- func (s *StubExecutionState) NewEvents() []journal.Event
- func (s *StubExecutionState) NewRecorder(afterEventID int) Recorder
- type TimeoutPolicy
- type Worker
- type Workflow
- type WorkflowInfo
- type WorkflowNotFoundError
- type WorkflowResolver
Constants ¶
This section is empty.
Variables ¶
var ErrCancelled = errors.New("derecho: workflow cancelled")
var ErrChannelClosed = errors.New("derecho: channel closed")
ErrChannelClosed is returned by ReceiveFuture.Get when the channel was closed.
var ErrContinuedAsNew = errors.New("derecho: workflow continued as new")
var ErrNotCompleted = errors.New("derecho: workflow not completed")
ErrNotCompleted is returned by Get with the NonBlocking option when the workflow hasn't finished.
var ErrTimerCancelled = errors.New("derecho: timer cancelled")
ErrTimerCancelled is returned by Future.Get when the timer was cancelled.
Functions ¶
func Await ¶
Await blocks until checkFn returns true. Each call to checkFn is a yield point - the scheduler runs other fibers between checks.
func AwaitWithTimeout ¶
AwaitWithTimeout blocks until checkFn returns true or the timeout elapses. It returns true if the condition was met and false if the timeout elapsed first.
func BindWorkflowInput ¶
BindWorkflowInput binds encoded input to a typed workflow, returning an executable func(Context). The returned function handles input unmarshaling and emits WorkflowCompleted/Failed events.
func CancelTimer ¶
CancelTimer cancels a pending timer. The timer's Future.Get will return ErrTimerCancelled. Returns an error if the timer has already fired or been cancelled.
func Go ¶
Go spawns a new fiber to run goFn concurrently within the workflow. Child fibers are cooperatively scheduled with the parent.
func Heartbeat ¶
Heartbeat signals activity liveness and optionally records progress. The details parameter is encoded and stored for checkpointing on retry. Heartbeat is a no-op if the activity was not configured with a heartbeat timeout (see WithHeartbeatTimeout).
func NonRetryable ¶
NonRetryable wraps an error to indicate it should not be retried.
func Now ¶
Now returns the workflow's current logical time. This is deterministic during replay - it returns the time from when the workflow originally executed, not wall-clock time.
func RegisterActivity ¶
RegisterActivity registers an activity function with compile-time type safety. The function must have signature: func(context.Context, I) (O, error)
func RegisterWorkflow ¶
RegisterWorkflow registers a workflow function with compile-time type safety. The function must have signature: func(Context, I) (O, error)
func Replay ¶
Replay replays a workflow against recorded events. It returns a NondeterminismError if the workflow diverges from the recorded history. It works with both complete and incomplete workflows.
func ReplayFromStore ¶
func ReplayFromStore(ctx context.Context, store journal.Store, workflowID, runID string, workflowFn any) error
ReplayFromStore loads the events for the given workflowID and runID from store and replays workflowFn against them.
func RetryAfter ¶
RetryAfter wraps an error with a specific delay before the next retry.
func SideEffect ¶
SideEffect executes fn and records its result. On replay, fn is not called - the recorded result is returned instead. Use for non-deterministic operations like generating UUIDs or reading wall-clock time.
func SignalExternalWorkflow ¶
SignalExternalWorkflow sends a signal to another running workflow. This is fire-and-forget - no confirmation of delivery is provided.
func Sleep ¶
Sleep pauses the workflow for duration d. The timer is durable - if the workflow replays, Sleep returns immediately once the original timer would have fired.
func WithActivityInfo ¶
func WithActivityInfo(ctx context.Context, info ActivityInfo) context.Context
WithActivityInfo returns a context with ActivityInfo set. Intended for testing; production code uses automatic injection.
Types ¶
type ActivityInfo ¶
type ActivityInfo struct {
WorkflowID string
RunID string
ActivityName string
ScheduledAt int // Event ID - stable across retries
Attempt int // 1-indexed; use with ScheduledAt for retry-aware idempotency
ScheduledTime time.Time // When the activity was first scheduled
}
ActivityInfo provides execution context to running activities. Use this to build idempotency keys for external calls.
func GetActivityInfo ¶
func GetActivityInfo(ctx context.Context) ActivityInfo
GetActivityInfo retrieves execution context from an activity's context. Returns zero value if called outside an activity.
func (ActivityInfo) IdempotencyKey ¶
func (i ActivityInfo) IdempotencyKey() string
IdempotencyKey returns a stable key for this activity execution. Format: {workflowID}/{scheduledAt}/{attempt}
type ActivityOption ¶
type ActivityOption func(*activityOptions)
ActivityOption configures activity execution.
func WithHeartbeatTimeout ¶
func WithHeartbeatTimeout(d time.Duration) ActivityOption
WithHeartbeatTimeout sets the maximum time between heartbeats. Activities must call Heartbeat periodically to reset this deadline.
func WithRetry ¶
func WithRetry(policy RetryPolicy) ActivityOption
WithRetry sets the retry policy for the activity.
func WithScheduleToCloseTimeout ¶
func WithScheduleToCloseTimeout(d time.Duration) ActivityOption
WithScheduleToCloseTimeout sets the maximum total time from scheduling to completion.
func WithScheduleToStartTimeout ¶
func WithScheduleToStartTimeout(d time.Duration) ActivityOption
WithScheduleToStartTimeout sets the maximum time from scheduling to worker pickup.
func WithStartToCloseTimeout ¶
func WithStartToCloseTimeout(d time.Duration) ActivityOption
WithStartToCloseTimeout sets the maximum time from worker start to completion.
type ActivityRef ¶
type ActivityRef[I, O any] struct { // contains filtered or unexported fields }
func NewActivityRef ¶
func NewActivityRef[I, O any](name string) ActivityRef[I, O]
func (ActivityRef[I, O]) Execute ¶
func (r ActivityRef[I, O]) Execute(ctx Context, input I, opts ...ActivityOption) Future[O]
func (ActivityRef[I, O]) Name ¶
func (r ActivityRef[I, O]) Name() string
type ActivityResolver ¶
ActivityResolver looks up activity functions by name. The returned fn must have the signature func(context.Context, I) (O, error); use RegisterActivity to ensure this.
type CancelInfo ¶
type Channel ¶
type Channel[T any] struct { // contains filtered or unexported fields }
Channel provides typed, fiber-safe communication within a workflow.
func NewBufferedChannel ¶
NewBufferedChannel creates a buffered channel with the given capacity.
func NewChannel ¶
NewChannel creates an unbuffered channel.
func (*Channel[T]) ReceiveFuture ¶
ReceiveFuture returns a Future for use with Selector.
type ChildWorkflowOption ¶
type ChildWorkflowOption func(*childWorkflowOptions)
ChildWorkflowOption configures child workflow execution.
func WithClosePolicy ¶
func WithClosePolicy(policy journal.ParentClosePolicy) ChildWorkflowOption
WithClosePolicy sets the parent close policy for the child workflow.
type ChildWorkflowRef ¶
type ChildWorkflowRef[I, O any] struct { // contains filtered or unexported fields }
func NewChildWorkflowRef ¶
func NewChildWorkflowRef[I, O any](name string) ChildWorkflowRef[I, O]
func (ChildWorkflowRef[I, O]) Execute ¶
func (r ChildWorkflowRef[I, O]) Execute(ctx Context, workflowID string, input I, opts ...ChildWorkflowOption) Future[O]
Execute schedules a child workflow and returns a future for its result. The workflowID must be provided explicitly for deduplication.
func (ChildWorkflowRef[I, O]) Name ¶
func (r ChildWorkflowRef[I, O]) Name() string
type Client ¶
type Client interface {
StartWorkflow(ctx context.Context, workflowType, workflowID string, input any) (Run, error)
GetWorkflow(ctx context.Context, workflowID, runID string) (Run, error)
SignalWorkflow(ctx context.Context, workflowID, signalName string, payload any) error
CancelWorkflow(ctx context.Context, workflowID, reason string) error
ListWorkflows(ctx context.Context, opts ...journal.ListWorkflowsOption) (*journal.ListWorkflowsResult, error)
}
type Context ¶
type Context interface {
Err() error
// contains filtered or unexported methods
}
Context is a sealed interface for workflow execution. The unexported method prevents external implementation, so internal type assertions to capability interfaces (yielder, spawner, etc.) are safe: workflowContext is the only implementation.
type ContinueAsNewError ¶
type ContinueAsNewError struct {
Input any
}
ContinueAsNewError signals that the workflow should be restarted with a fresh history. Use it to avoid unbounded event accumulation in long-running workflows.
func NewContinueAsNewError ¶
func NewContinueAsNewError(input any) *ContinueAsNewError
func (*ContinueAsNewError) Error ¶
func (e *ContinueAsNewError) Error() string
type Engine ¶
type Engine struct {
// contains filtered or unexported fields
}
func (*Engine) ActivityWorker ¶
func (e *Engine) ActivityWorker() *activityWorker
func (*Engine) TimeoutWorker ¶
func (e *Engine) TimeoutWorker() *timeoutWorker
func (*Engine) TimerWorker ¶
func (e *Engine) TimerWorker() *timerWorker
func (*Engine) WorkflowWorker ¶
func (e *Engine) WorkflowWorker() *workflowWorker
type EngineOption ¶
type EngineOption func(*engineConfig)
func WithActivityResolver ¶
func WithActivityResolver(r ActivityResolver) EngineOption
WithActivityResolver sets a custom activity resolver. When set, RegisterActivity cannot be used on this engine.
func WithCacheSize ¶
func WithCacheSize(size int) EngineOption
func WithClock ¶
func WithClock(clock Clock) EngineOption
func WithDefaultRetryPolicy ¶
func WithDefaultRetryPolicy(p RetryPolicy) EngineOption
WithDefaultRetryPolicy sets the default retry policy for activities.
func WithEngineCodec ¶
func WithEngineCodec(codec Codec) EngineOption
func WithEngineLogger ¶
func WithEngineLogger(l *slog.Logger) EngineOption
WithEngineLogger sets the logger for the engine and its workers.
func WithWorkerConcurrency ¶
func WithWorkerConcurrency(n int) EngineOption
WithWorkerConcurrency sets the number of goroutines per worker type.
func WithWorkerID ¶
func WithWorkerID(id string) EngineOption
func WithWorkflowResolver ¶
func WithWorkflowResolver(r WorkflowResolver) EngineOption
WithWorkflowResolver sets a custom workflow resolver. When set, RegisterWorkflow cannot be used on this engine.
func WithWorkflowTaskTimeout ¶
func WithWorkflowTaskTimeout(d time.Duration) EngineOption
WithWorkflowTaskTimeout sets the lease duration for workflow tasks. Expired tasks are released for reprocessing; leases are not renewed.
type ExecutionState ¶
type ExecutionState interface {
NewRecorder(workflowTaskScheduledID int) Recorder
GetByScheduledID(scheduledEventID int) []journal.Event
GetSignals(signalName string) []journal.SignalReceived
GetCancelRequest() *journal.WorkflowCancelRequested
}
ExecutionState feeds events to the scheduler and manages visibility for deterministic replay of a single workflow execution.
type FakeClock ¶
type FakeClock struct {
// contains filtered or unexported fields
}
FakeClock provides controllable time for tests.
func NewFakeClock ¶
type GetOption ¶
type GetOption func(*getOptions)
GetOption configures Get behavior.
func NonBlocking ¶
func NonBlocking() GetOption
NonBlocking makes Get return ErrNotCompleted instead of blocking if the workflow hasn't finished.
type MemoryStore ¶
type MemoryStore struct {
// contains filtered or unexported fields
}
func NewMemoryStore ¶
func NewMemoryStore(opts ...MemoryStoreOption) *MemoryStore
func (*MemoryStore) CancelWorkflow ¶
func (ms *MemoryStore) CancelWorkflow(ctx context.Context, workflowID, reason string) error
func (*MemoryStore) CreateWorkflow ¶
func (*MemoryStore) GetStatus ¶
func (ms *MemoryStore) GetStatus(ctx context.Context, workflowID, runID string) (journal.WorkflowStatus, error)
func (*MemoryStore) GetTimedOutActivities ¶
func (ms *MemoryStore) GetTimedOutActivities(ctx context.Context, now time.Time) ([]journal.TimedOutActivity, error)
func (*MemoryStore) GetTimersToFire ¶
func (ms *MemoryStore) GetTimersToFire(ctx context.Context, now time.Time) ([]journal.PendingTimerTask, error)
func (*MemoryStore) ListWorkflows ¶
func (ms *MemoryStore) ListWorkflows(ctx context.Context, opts ...journal.ListWorkflowsOption) (*journal.ListWorkflowsResult, error)
func (*MemoryStore) RecordHeartbeat ¶
func (*MemoryStore) ReleaseExpiredWorkflowTasks ¶
func (*MemoryStore) RequeueForRetry ¶
func (ms *MemoryStore) RequeueForRetry(ctx context.Context, workflowID, runID string, scheduledAt int, info journal.RequeueInfo) error
func (*MemoryStore) SignalWorkflow ¶
func (*MemoryStore) WaitForActivityTasks ¶
func (ms *MemoryStore) WaitForActivityTasks(ctx context.Context, workerID string, maxActivities int) ([]journal.PendingActivityTask, error)
func (*MemoryStore) WaitForCompletion ¶
func (*MemoryStore) WaitForWorkflowTasks ¶
func (ms *MemoryStore) WaitForWorkflowTasks(ctx context.Context, workerID string, maxNew int) ([]journal.PendingWorkflowTask, error)
type MemoryStoreOption ¶
type MemoryStoreOption func(*MemoryStore)
func WithIDGenerator ¶
func WithIDGenerator(gen func() string) MemoryStoreOption
WithIDGenerator sets the function used to generate run IDs.
func WithStoreClock ¶
func WithStoreClock(clock Clock) MemoryStoreOption
type NonRetryableError ¶
type NonRetryableError struct {
Err error
}
NonRetryableError marks an error as permanent - no retries will be attempted.
func (*NonRetryableError) Error ¶
func (e *NonRetryableError) Error() string
func (*NonRetryableError) Unwrap ¶
func (e *NonRetryableError) Unwrap() error
type NondeterminismError ¶
type NondeterminismError struct {
WorkflowID string
RunID string
EventSeq int
Expected string
Got string
}
func (*NondeterminismError) Error ¶
func (e *NondeterminismError) Error() string
type ReceiveFuture ¶
type ReceiveFuture[T any] struct { // contains filtered or unexported fields }
ReceiveFuture adapts a channel receive for Selector integration.
func (*ReceiveFuture[T]) Get ¶
func (f *ReceiveFuture[T]) Get(ctx Context) (T, error)
Get implements Future[T].
type Recorder ¶
type Recorder interface {
// Record returns the event and its index in pending. During replay, index
// is -1 since the event already has the correct ID. During first execution,
// index identifies this event for RegisterPendingFuture.
Record(eventType string, generate func() journal.Event) (event journal.Event, pendingIndex int, err error)
// RegisterPendingFuture associates a future with a pending event so Commit
// can update the future's scheduled ID with the real ID from the store.
RegisterPendingFuture(pendingIndex int, f ScheduledIDReceiver)
Commit(ctx context.Context, scheduledAtTask int) error
PendingCount() int
}
type Registrar ¶
type Registrar interface {
// contains filtered or unexported methods
}
Registrar allows registration of workflows and activities. Sealed via unexported methods - use RegisterWorkflow and RegisterActivity with an Engine instance.
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
Registry is the default WorkflowResolver and ActivityResolver implementation. It provides simple name-based lookup. Users can substitute custom resolvers for dynamic loading, versioning, multi-tenancy, or other routing strategies.
func (*Registry) ResolveActivity ¶
ResolveActivity implements ActivityResolver.
type ReplayResult ¶
type ReplayResult struct {
// NewEvents contains events generated beyond the recorded history.
// Empty for complete workflows replayed deterministically.
NewEvents []journal.Event
// Complete is true if the workflow reached a terminal state (Completed, Failed, or Cancelled).
Complete bool
// Result is the JSON-encoded result if Complete is true and workflow succeeded.
Result []byte
// Error is the workflow error if Complete is true and workflow failed.
Error error
}
ReplayResult contains the result of a successful replay.
func ReplayWithResult ¶
func ReplayWithResult(workflowFn any, events []journal.Event) (*ReplayResult, error)
ReplayWithResult replays a workflow and returns detailed result information.
type RetryPolicy ¶
type RetryPolicy struct {
InitialInterval time.Duration
BackoffCoefficient float64
MaxInterval time.Duration
MaxAttempts int // 0 = unlimited
NonRetryableErrors []journal.ErrorKind
}
RetryPolicy configures automatic retry behavior for activities.
func DefaultRetryPolicy ¶
func DefaultRetryPolicy() RetryPolicy
DefaultRetryPolicy returns a sensible default retry policy.
func NoRetryPolicy ¶
func NoRetryPolicy() RetryPolicy
NoRetryPolicy returns a policy that allows no retries.
func PolicyFromPayload ¶
func PolicyFromPayload(p *journal.RetryPolicyPayload) RetryPolicy
PolicyFromPayload converts a serialized payload back to a RetryPolicy.
func (RetryPolicy) NextDelay ¶
func (p RetryPolicy) NextDelay(attempt int) time.Duration
NextDelay calculates the backoff delay for the given attempt (1-indexed).
func (RetryPolicy) ShouldRetry ¶
func (p RetryPolicy) ShouldRetry(kind journal.ErrorKind, attempt int) bool
ShouldRetry returns true if the given error kind should be retried at the given attempt.
func (RetryPolicy) ToPayload ¶
func (p RetryPolicy) ToPayload() *journal.RetryPolicyPayload
ToPayload converts the policy to a serializable payload.
type RetryableError ¶
RetryableError provides hints for retry behavior.
func (*RetryableError) Error ¶
func (e *RetryableError) Error() string
func (*RetryableError) Unwrap ¶
func (e *RetryableError) Unwrap() error
type ScheduledIDReceiver ¶
type ScheduledIDReceiver interface {
SetScheduledID(id int)
}
ScheduledIDReceiver is implemented by futures that need their scheduled ID updated after Commit assigns real event IDs.
type Scheduler ¶
type Scheduler struct {
// contains filtered or unexported fields
}
func NewScheduler ¶
func NewScheduler(state ExecutionState, workflowFn func(Context), info WorkflowInfo, opts ...SchedulerOption) *Scheduler
type SchedulerOption ¶
type SchedulerOption func(*Scheduler)
func WithCodec ¶
func WithCodec(c Codec) SchedulerOption
func WithSchedulerRetryPolicy ¶
func WithSchedulerRetryPolicy(p RetryPolicy) SchedulerOption
type Selector ¶
type Selector struct {
// contains filtered or unexported fields
}
func NewSelector ¶
func NewSelector() *Selector
func (*Selector) HasPending ¶
type SignalChannel ¶
type SignalChannel[T any] struct { // contains filtered or unexported fields }
SignalChannel provides typed signal reception within a workflow. Signals arrive from external clients or other workflows.
func GetSignalChannel ¶
func GetSignalChannel[T any](ctx Context, name string) *SignalChannel[T]
GetSignalChannel returns a channel for receiving signals with the given name.
func (*SignalChannel[T]) Receive ¶
func (ch *SignalChannel[T]) Receive(ctx Context) (T, bool)
Receive blocks until a signal arrives, returning (value, true). During replay, returns signals in the same order they were originally received.
func (*SignalChannel[T]) ReceiveFuture ¶
func (ch *SignalChannel[T]) ReceiveFuture() Future[T]
ReceiveFuture returns a Future for use with Selector.
func (*SignalChannel[T]) TryReceive ¶
func (ch *SignalChannel[T]) TryReceive(ctx Context) (T, bool)
TryReceive returns a signal if one is available, without blocking. Returns (value, true) if a signal was available, (zero, false) otherwise.
type StubExecutionState ¶
type StubExecutionState struct {
// contains filtered or unexported fields
}
StubExecutionState is a minimal ExecutionState for unit tests. Use for scheduler tests and as a base for test harnesses.
func NewStubExecutionState ¶
func NewStubExecutionState() *StubExecutionState
func (*StubExecutionState) AddExternalEvent ¶
func (s *StubExecutionState) AddExternalEvent(ev journal.Event) int
AddExternalEvent appends an event as if it arrived from outside the workflow.
func (*StubExecutionState) Events ¶
func (s *StubExecutionState) Events() []journal.Event
Events returns all recorded events.
func (*StubExecutionState) GetByScheduledID ¶
func (s *StubExecutionState) GetByScheduledID(scheduledEventID int) []journal.Event
func (*StubExecutionState) GetCancelRequest ¶
func (s *StubExecutionState) GetCancelRequest() *journal.WorkflowCancelRequested
func (*StubExecutionState) GetSignals ¶
func (s *StubExecutionState) GetSignals(signalName string) []journal.SignalReceived
func (*StubExecutionState) NewEvents ¶
func (s *StubExecutionState) NewEvents() []journal.Event
NewEvents returns events generated by the scheduler (not external events).
func (*StubExecutionState) NewRecorder ¶
func (s *StubExecutionState) NewRecorder(afterEventID int) Recorder
type TimeoutPolicy ¶
type TimeoutPolicy struct {
ScheduleToStartTimeout time.Duration
StartToCloseTimeout time.Duration
ScheduleToCloseTimeout time.Duration
HeartbeatTimeout time.Duration
}
TimeoutPolicy configures activity timeout behavior.
func (*TimeoutPolicy) ToPayload ¶
func (p *TimeoutPolicy) ToPayload() *journal.TimeoutPolicyPayload
ToPayload converts the policy to a serializable payload.
type WorkflowInfo ¶
func GetInfo ¶
func GetInfo(ctx Context) WorkflowInfo
GetInfo returns metadata about the currently executing workflow.
type WorkflowNotFoundError ¶
type WorkflowNotFoundError = journal.WorkflowNotFoundError
WorkflowNotFoundError is an alias for journal.WorkflowNotFoundError, kept for backward compatibility after the type moved to the journal package.
type WorkflowResolver ¶
WorkflowResolver looks up workflow functions by name. The returned fn must have the signature func(Context, I) (O, error); use RegisterWorkflow to ensure this.
Source Files
¶
- activity.go
- activity_worker.go
- cancel.go
- channel.go
- child_workflow.go
- client.go
- codec.go
- context.go
- derecho.go
- engine.go
- execution_state.go
- fiber.go
- future.go
- memory_store.go
- registry.go
- replay.go
- resolver.go
- retry.go
- scheduler.go
- scheduler_cache.go
- selector.go
- signal.go
- stub_state.go
- timeout_worker.go
- timer_worker.go
- workflow_worker.go