derecho

v0.0.0-...-d474bd5
Published: Jan 1, 2026 License: MIT Imports: 15 Imported by: 0

README

Derecho

Durable workflow execution through event sourcing. About 10k lines of Go.

Notice

This is an export from our internal monorepo. We use it in production and wanted to share it with the community.

No support. We will not respond to issues, questions, or feature requests.

No contributions. We will not accept pull requests. Fork it if you want to take it somewhere.

No stability guarantees. The API may change without warning. We version for ourselves.

Use it, learn from it, build on it. Just know you're on your own.

Introduction

Your function crashes. The machine dies. The process restarts.

Derecho replays your workflow from its event log and picks up exactly where it left off. Same variables, same state, same execution path.

func OrderWorkflow(ctx derecho.Context, order Order) (Receipt, error) {
    // Crash here? On restart, skip this. Result already in log.
    reserved, err := inventory.Execute(ctx, order.Items).Get(ctx)
    if err != nil {
        return Receipt{}, err
    }

    // Crash here? Payment result replays from log.
    payment, err := charge.Execute(ctx, order.Payment).Get(ctx)
    if err != nil {
        return Receipt{}, err
    }

    // Weeks of downtime? Execution resumes at this exact point.
    // Execute takes one input, so bundle both values (ShipReq is illustrative).
    return ship.Execute(ctx, ShipReq{Reserved: reserved, Payment: payment}).Get(ctx)
}

Why Derecho

Durable execution, one database. Temporal wants multiple services, Cassandra clusters, UI servers, someone to keep it all running. Derecho embeds into your app. One binary. One database. Done.

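Determinism is structural. Derecho runs one fiber at a time through cooperative yields, and the scheduler controls the order, so interleaving can't drift between runs. Clock reads go through derecho.Now(ctx) instead of time.Now(), so they can't break replay either.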

Small enough to understand. 10k lines. Something always breaks. When it does, you read the code and figure out why. Takes an afternoon.

When Derecho Fits

  • Behind-the-firewall apps where you own the infrastructure
  • Single-region deploys (or active-passive failover)
  • Teams that want to own their workflow engine
  • Workflow counts in the millions

When It Doesn't

  • Multi-datacenter active-active (we don't coordinate across regions)
  • You need a workflow UI yesterday (we don't have one)
  • You need workflow versioning (planned, still cooking)
  • You'd rather pay Temporal (reasonable, they're good)

How It Works

Event sourcing. Every scheduling decision becomes an immutable event. Replay the log, get identical execution.

Cooperative fibers. Write straight-line code. Await, Sleep, Future.Get are yield points. Scheduler switches fibers there.

Determinism by construction. One scheduler loop. One fiber running at a time. Same events in, same events out. Always.
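
A minimal sketch of the record-and-replay idea, not Derecho's implementation. The journal type, its step method, and the string results are all hypothetical; the point is only that a step with a recorded result returns it from the log instead of running again.

```go
package main

import "fmt"

// journal is a hypothetical append-only log of step results.
type journal struct {
	events []string // recorded results, in execution order
	cursor int      // next event to consume during replay
}

// step returns the recorded result if one exists at the cursor;
// otherwise it runs fn once and appends its result to the log.
func (j *journal) step(fn func() string) string {
	if j.cursor < len(j.events) {
		res := j.events[j.cursor]
		j.cursor++
		return res
	}
	res := fn()
	j.events = append(j.events, res)
	j.cursor++
	return res
}

// runWorkflow executes two steps from the start of the log and
// counts how often real work actually ran.
func runWorkflow(j *journal, calls *int) string {
	j.cursor = 0
	a := j.step(func() string { *calls++; return "reserved" })
	b := j.step(func() string { *calls++; return "charged" })
	return a + "," + b
}

func main() {
	j := &journal{}
	calls := 0
	first := runWorkflow(j, &calls)  // first execution: both steps run
	second := runWorkflow(j, &calls) // replay: both results come from the log
	fmt.Println(first, second, calls)
}
```

Running the workflow twice against the same journal yields the same result, and the step functions execute only on the first pass.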

Storage

Derecho talks to a journal.Store interface. Pick your poison:

// Development and single-process: everything in memory
store := derecho.NewMemoryStore()

// Single node: SQLite (planned)
// store := derechosqlite.New(db)

// Multiple nodes: PostgreSQL (planned)
// store := derechopgx.New(pool)

MemoryStore

Ships with Derecho. Good for development, testing, and single-process deployments where you don't need persistence across restarts.

SQLite (Planned)

One server. WAL mode. Thousands of concurrent workflows on a laptop.

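// _journal and _synchronous are mattn/go-sqlite3 DSN parameters; that driver registers as "sqlite3".
// With modernc.org/sqlite (driver name "sqlite"), use _pragma=journal_mode(WAL) instead.
db, _ := sql.Open("sqlite3", "workflows.db?_journal=WAL&_synchronous=NORMAL")
store := derechosqlite.New(db)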

Good for edge deployments, single-server apps, anywhere "just run Postgres" is annoying.

PostgreSQL (Planned)

Multiple engine instances. Coordination through SELECT ... FOR UPDATE SKIP LOCKED. Worker affinity so workflows stick to nodes that have them cached.

pool, _ := pgxpool.New(ctx, "postgres://localhost/workflows")
store := derechopgx.New(pool,
    derechopgx.WithWorkerAffinity(true),
)

Good for anything that needs to survive a server dying.

Codecs

Derecho serializes workflow inputs, activity results, and signals. Interface is obvious:

type Codec interface {
    Encode(v any) ([]byte, error)
    Decode(data []byte, v any) error
}

Default is JSON. JSON is fine. Strong feelings about protobuf? Wire it up:

engine, err := derecho.NewEngine(store,
    derecho.WithEngineCodec(protoCodec{}),
)
if err != nil {
    log.Fatal(err)
}

Protobuf

type protoCodec struct{}

func (protoCodec) Encode(v any) ([]byte, error) {
    msg, ok := v.(proto.Message)
    if !ok {
        return nil, fmt.Errorf("not a proto.Message: %T", v)
    }
    return proto.Marshal(msg)
}

func (protoCodec) Decode(data []byte, v any) error {
    msg, ok := v.(proto.Message)
    if !ok {
        return fmt.Errorf("not a proto.Message: %T", v)
    }
    return proto.Unmarshal(data, msg)
}

Versioning

Codecs don't version. Your types do. Two ways:

Envelope with version tag:

type Envelope struct {
    Version int             `json:"v"`
    Payload json.RawMessage `json:"p"`
}

// Decode, switch on version, unmarshal the right type
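
One way the "decode, switch on version" step could look, as a sketch. OrderV1, OrderV2, and decodeOrder are hypothetical names invented for illustration; only the Envelope shape comes from the snippet above.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type Envelope struct {
	Version int             `json:"v"`
	Payload json.RawMessage `json:"p"`
}

// OrderV1 and OrderV2 are hypothetical payload versions.
type OrderV1 struct {
	Items []string `json:"items"`
}

type OrderV2 struct {
	Items    []string `json:"items"`
	Priority int      `json:"priority"`
}

// decodeOrder unwraps the envelope and upgrades old payloads to OrderV2.
func decodeOrder(data []byte) (OrderV2, error) {
	var env Envelope
	if err := json.Unmarshal(data, &env); err != nil {
		return OrderV2{}, err
	}
	switch env.Version {
	case 1:
		var v1 OrderV1
		if err := json.Unmarshal(env.Payload, &v1); err != nil {
			return OrderV2{}, err
		}
		return OrderV2{Items: v1.Items}, nil // Priority stays at its zero value
	case 2:
		var v2 OrderV2
		err := json.Unmarshal(env.Payload, &v2)
		return v2, err
	default:
		return OrderV2{}, fmt.Errorf("unknown envelope version %d", env.Version)
	}
}

func main() {
	old := []byte(`{"v":1,"p":{"items":["a"]}}`)
	order, err := decodeOrder(old)
	fmt.Println(order.Items, order.Priority, err)
}
```

Workflow code only ever sees the newest type; the upgrade logic lives in one place.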

Additive fields:

type Order struct {
    Items    []string `json:"items"`
    Priority int      `json:"priority,omitempty"` // New field, zero = default
}

Old events decode with Priority: 0. New code treats zero as "normal priority." Works.

Changing codecs mid-deployment means migrating stored events. Pick a codec early. Save yourself pain.

Activities

Activities are where side effects live. HTTP calls. Database writes. Emails. Anything touching the outside world.

// Typed reference. Use this in workflows.
var sendEmail = derecho.NewActivityRef[EmailReq, EmailResp]("send-email")

// Implementation. Runs on activity workers.
derecho.RegisterActivity(engine, "send-email", func(ctx context.Context, req EmailReq) (EmailResp, error) {
    return emailClient.Send(req)
})

// In your workflow
resp, err := sendEmail.Execute(ctx, req).Get(ctx)

Activities run outside the deterministic bubble. Results get recorded. On replay, recorded result comes back. A completed activity doesn't run again.

Retries and Timeouts

resp, err := sendEmail.Execute(ctx, req,
    derecho.WithRetry(derecho.RetryPolicy{
        InitialInterval:    time.Second,
        BackoffCoefficient: 2.0,
        MaxInterval:        time.Minute,
        MaxAttempts:        5,
    }),
    derecho.WithScheduleToCloseTimeout(5*time.Minute),  // Total budget
    derecho.WithStartToCloseTimeout(30*time.Second),    // Per attempt
    derecho.WithHeartbeatTimeout(10*time.Second),       // Liveness check
).Get(ctx)
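
To get a feel for what the retry policy above implies, here is a sketch that computes the waits between attempts. It assumes conventional exponential backoff (each delay is the previous one times BackoffCoefficient, capped at MaxInterval) and that MaxAttempts counts the first attempt. Neither detail is confirmed by the source, and retryPolicy is a local stand-in, not the library type.

```go
package main

import (
	"fmt"
	"time"
)

// retryPolicy mirrors the fields shown above; it is a local stand-in,
// not the library's RetryPolicy type.
type retryPolicy struct {
	InitialInterval    time.Duration
	BackoffCoefficient float64
	MaxInterval        time.Duration
	MaxAttempts        int
}

// delays returns the wait before each retry, assuming delay n is
// InitialInterval * BackoffCoefficient^n, capped at MaxInterval.
// MaxAttempts attempts leave room for MaxAttempts-1 retries.
func delays(p retryPolicy) []time.Duration {
	var out []time.Duration
	d := float64(p.InitialInterval)
	for i := 0; i < p.MaxAttempts-1; i++ {
		if d > float64(p.MaxInterval) {
			d = float64(p.MaxInterval)
		}
		out = append(out, time.Duration(d))
		d *= p.BackoffCoefficient
	}
	return out
}

func main() {
	p := retryPolicy{
		InitialInterval:    time.Second,
		BackoffCoefficient: 2.0,
		MaxInterval:        time.Minute,
		MaxAttempts:        5,
	}
	fmt.Println(delays(p)) // [1s 2s 4s 8s]
}
```

Under these assumptions the four retries wait 15 seconds in total, well inside the 5-minute ScheduleToClose budget, so the per-attempt timeout is what bounds a slow call.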

Timeouts:

  • ScheduleToClose: Total time from scheduling to completion, across all retries
  • StartToClose: Time for one attempt
  • ScheduleToStart: Time waiting for a worker to pick it up
  • Heartbeat: Silence before we assume the worker died

Heartbeats

Long-running activities should heartbeat. Otherwise can't tell if working or dead.

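func processLargeFile(ctx context.Context, req FileReq) (Result, error) {
    chunks := split(req) // split is illustrative: chunk the file however you like
    for i, chunk := range chunks {
        process(chunk)
        // Heartbeat returns an error; stop if progress can't be recorded.
        if err := derecho.Heartbeat(ctx, Progress{Chunk: i, Total: len(chunks)}); err != nil {
            return Result{}, err
        }
    }
    return Result{}, nil
}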

Heartbeat progress gets saved. Activity crashes, next attempt resumes from checkpoint.

Controlling Retries

Activities can override retry behavior:

func myActivity(ctx context.Context, req Request) (Response, error) {
    err := doWork()
    if isFatal(err) {
        return Response{}, derecho.NonRetryable(err)  // Stop retrying
    }
    if needsBackoff(err) {
        return Response{}, derecho.RetryAfter(30*time.Second, err)  // Custom delay
    }
    return Response{}, err  // Normal retry with policy backoff
}

Parallelism

Spawn fibers with Go. Wait for conditions with Await. Fibers are cooperative. Safe to share memory because only one runs at a time.

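func ProcessBatch(ctx derecho.Context, items []Item) ([]Result, error) {
    results := make([]Result, len(items))
    errs := make([]error, len(items))
    done := 0 // Safe to share: only one fiber runs at a time

    for i, item := range items {
        i, item := i, item // Per-iteration copies (needed before Go 1.22)
        derecho.Go(ctx, func(ctx derecho.Context) {
            results[i], errs[i] = process.Execute(ctx, item).Get(ctx)
            done++
        })
    }

    // Count completions instead of comparing against Result{}: a failed or
    // legitimately zero-valued result would otherwise block forever.
    derecho.Await(ctx, func() bool { return done == len(items) })

    return results, errors.Join(errs...)
}

AwaitWithTimeout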

Wait for a condition with a deadline:

approved := derecho.AwaitWithTimeout(ctx, 24*time.Hour, func() bool {
    return approvalReceived
})
if !approved {
    return errors.New("approval timed out")
}

Timers

Sleep survives restarts. Set timer for a week. Kill process. Restart three days later. Timer fires on schedule.

derecho.Sleep(ctx, 7 * 24 * time.Hour)

// Cancellable version
timer := derecho.NewTimer(ctx, 30 * time.Minute)
// ... later ...
derecho.CancelTimer(ctx, timer)

Workflow Context

Access workflow metadata and deterministic time:

// Deterministic time (replays return original execution time)
now := derecho.Now(ctx)

// Workflow metadata
info := derecho.GetInfo(ctx)
// Fields: WorkflowID, RunID, WorkflowType, StartTime

derecho.Now(ctx) returns logical time, not wall clock. Safe for replay. Use SideEffect if you need actual wall time recorded.

Signals

External events into workflows. User approves something. Webhook fires. Whatever.

func ApprovalWorkflow(ctx derecho.Context, req ApprovalReq) (bool, error) {
    approvals := derecho.GetSignalChannel[Approval](ctx, "approval")
    deadline := derecho.NewTimer(ctx, 72 * time.Hour)

    sel := derecho.NewSelector()
    derecho.AddFuture(sel, approvals.ReceiveFuture(), func(a Approval, _ error) {
        // Handle approval
    })
    derecho.AddFuture(sel, deadline, func(_ time.Time, _ error) {
        // Handle timeout
    })
    sel.Select(ctx)
    // ...
}

// From outside
client.SignalWorkflow(ctx, "workflow-123", "approval", Approval{Approved: true})

Child Workflows

Workflow section generates tons of events? Loops, fan-outs? Split it into child workflows. Each gets own event log. Parent stays small. Replays stay fast.

var processChunk = derecho.NewChildWorkflowRef[Chunk, ChunkResult]("process-chunk")

func BigWorkflow(ctx derecho.Context, data LargeDataset) (Summary, error) {
    var futures []derecho.Future[ChunkResult]

    for i, chunk := range data.Split(1000) {
        f := processChunk.Execute(ctx, fmt.Sprintf("chunk-%d", i), chunk,
            derecho.WithClosePolicy(journal.ParentClosePolicyAbandon),
        )
        futures = append(futures, f)
    }
    // ...
}

SideEffect

Non-deterministic operations too small for an activity. UUIDs. Wall-clock reads. Random numbers.

id := derecho.SideEffect(ctx, func() string {
    return uuid.New().String()
})
// On replay, returns same UUID

Continue-As-New

Long-running workflows accumulate events. Eventually replay gets slow. ContinueAsNew restarts with fresh history, carrying forward state.

func InfinitePoller(ctx derecho.Context, cursor string) error {
    for i := 0; i < 1000; i++ {
        result, err := poll.Execute(ctx, cursor).Get(ctx)
        if err != nil {
            return err // Don't overwrite the cursor with a zero value
        }
        cursor = result.NextCursor
        derecho.Sleep(ctx, time.Minute)
    }
    return derecho.NewContinueAsNewError(cursor)
}

Running It

store := derecho.NewMemoryStore()
engine, err := derecho.NewEngine(store,
    derecho.WithWorkerID("worker-1"),
    derecho.WithDefaultRetryPolicy(defaultPolicy),
)
if err != nil {
    log.Fatal(err)
}

derecho.RegisterWorkflow(engine, "order", OrderWorkflow)
derecho.RegisterActivity(engine, "charge", chargeActivity)

client := engine.Client()
wf, err := client.StartWorkflow(ctx, "order", "order-123", orderData)
if err != nil {
    log.Fatal(err)
}

go engine.Run(ctx)

var receipt Receipt
wf.Get(ctx, &receipt)

Graceful Shutdown

engine.Actor() returns (execute, interrupt) functions compatible with oklog/run or similar. For a standalone process, engine.Run with a signal-aware context is enough:

ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer cancel()

if err := engine.Run(ctx); err != nil && !errors.Is(err, context.Canceled) {
    log.Fatal(err)
}

Observability

Derecho logs through slog. Workflow ID, run ID, activity name in context. Something goes wrong at 3am, grep for the workflow.

For metrics, instrument the Store interface or wrap activity functions. No built-in metrics interface yet.

Testing

Unit Tests

derechotest stubs activities. Test workflow logic without I/O.

func TestOrderWorkflow(t *testing.T) {
    env := derechotest.New(t)

    env.StubActivity("reserve-inventory", InventoryResp{Reserved: true})
    env.StubActivity("charge-payment", PaymentResp{TransactionID: "txn-123"})
    env.StubActivity("ship-order", ShipResp{TrackingNumber: "track-456"})

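    result, err := derechotest.Run(env, OrderWorkflow, Order{Items: []Item{{SKU: "ABC"}}})
    if err != nil {
        t.Fatalf("workflow failed: %v", err)
    }
    _ = result // Assert on the receipt fields here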

    env.AssertActivityCalled("reserve-inventory")
    env.AssertActivityCalled("charge-payment")
}

Dynamic Stubs

env.StubActivityFunc("charge-payment", func(input json.RawMessage) (any, error) {
    var req PaymentReq
    if err := json.Unmarshal(input, &req); err != nil {
        return nil, err
    }
    if req.Amount > 10000 {
        return nil, errors.New("amount exceeds limit")
    }
    return PaymentResp{TransactionID: "txn-123"}, nil
})

Replay Testing

Catch non-determinism before production. Save event logs from real executions, replay against new code.

func TestOrderWorkflow_Replay(t *testing.T) {
    events := loadEvents(t, "testdata/order-workflow-v1.json")
    err := derecho.Replay(OrderWorkflow, events)
    if err != nil {
        t.Fatalf("workflow replay failed: %v", err)
    }
}

Run in CI. Every deploy.

The Rules

Workflows must be deterministic. Same inputs, same scheduling decisions.

Don't:

  • time.Now(): use derecho.Now(ctx)
  • rand.Int(): use derecho.SideEffect
  • Global mutable state
  • Goroutines: use derecho.Go
  • Direct I/O: use activities

Activities handle the real world. HTTP calls, database writes, file I/O. All activities. They fail, retry, time out. Results are durable.

Replay catches mistakes. Workflow makes different decisions on replay, you get NondeterminismError. Good. Finds bugs before they corrupt your event log.
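
A sketch of what "different decisions on replay" means mechanically. The mismatch type, the check function, and the event names are hypothetical and chosen for illustration; the library's actual comparison is not shown in this document.

```go
package main

import "fmt"

// mismatch is a hypothetical stand-in for a nondeterminism error.
type mismatch struct {
	Seq           int
	Expected, Got string
}

func (m *mismatch) Error() string {
	return fmt.Sprintf("event %d: history has %q, workflow produced %q", m.Seq, m.Expected, m.Got)
}

// check compares the events a workflow emits on replay against the
// recorded history and reports the first divergence.
func check(history, emitted []string) error {
	for i, got := range emitted {
		if i >= len(history) {
			return nil // past the end of history: new execution, nothing to compare
		}
		if history[i] != got {
			return &mismatch{Seq: i, Expected: history[i], Got: got}
		}
	}
	return nil
}

func main() {
	history := []string{"ActivityScheduled:reserve", "ActivityScheduled:charge"}
	// New code reordered the two activities, so replay diverges at event 0.
	emitted := []string{"ActivityScheduled:charge", "ActivityScheduled:reserve"}
	fmt.Println(check(history, emitted))
}
```

Reordering two activity calls is enough to trip the check, which is why replay tests belong in CI.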

Architecture

┌─────────────────────────────────────────────────────────────┐
│                         Engine                              │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐      │
│  │   Workflow   │  │   Activity   │  │    Timer     │      │
│  │    Worker    │  │    Worker    │  │    Worker    │      │
│  └──────┬───────┘  └──────┬───────┘  └──────┬───────┘      │
│         │                 │                 │               │
│         └─────────────────┼─────────────────┘               │
│                           │                                 │
│                    ┌──────▼──────┐                          │
│                    │journal.Store│                          │
│                    └──────┬──────┘                          │
└───────────────────────────┼─────────────────────────────────┘
                            │
              ┌─────────────┼─────────────┐
              │             │             │
        ┌─────▼─────┐ ┌─────▼─────┐ ┌─────▼─────┐
        │  SQLite   │ │PostgreSQL │ │  Memory   │
        └───────────┘ └───────────┘ └───────────┘

Four workers (the diagram shows three; the Timeout Worker runs alongside them):

  • Workflow Worker: Loads history, replays scheduler, persists new events
  • Activity Worker: Runs activities, handles retries and timeouts
  • Timer Worker: Fires timers when due
  • Timeout Worker: Fails activities that exceed their timeout

Workers coordinate through Store. For horizontal scaling, run multiple engines against PostgreSQL (planned).


Derecho: Spanish for "straight" or "direct." Also a powerful straight-line windstorm.

Documentation

Variables

var ErrCancelled = errors.New("derecho: workflow cancelled")

var ErrChannelClosed = errors.New("derecho: channel closed")

ErrChannelClosed is returned by ReceiveFuture.Get when the channel was closed.

var ErrContinuedAsNew = errors.New("derecho: workflow continued as new")

var ErrNotCompleted = errors.New("derecho: workflow not completed")

ErrNotCompleted is returned by Get with NonBlocking option when the workflow hasn't finished.

var ErrTimerCancelled = errors.New("derecho: timer cancelled")

ErrTimerCancelled is returned by Future.Get when the timer was cancelled.

Functions

func Await

func Await(ctx Context, checkFn func() bool)

Await blocks until checkFn returns true. Each call to checkFn is a yield point - the scheduler runs other fibers between checks.

func AwaitWithTimeout

func AwaitWithTimeout(ctx Context, timeout time.Duration, checkFn func() bool) bool

AwaitWithTimeout blocks until checkFn returns true or timeout elapses. Returns true if condition was met, false if timed out.

func BindWorkflowInput

func BindWorkflowInput(fn any, encodedInput []byte) func(Context)

BindWorkflowInput binds encoded input to a typed workflow, returning an executable func(Context). The returned function handles input unmarshaling and emits WorkflowCompleted/Failed events.

func CancelTimer

func CancelTimer(ctx Context, timer Future[time.Time]) error

CancelTimer cancels a pending timer. The timer's Future.Get will return ErrTimerCancelled. Returns an error if the timer has already fired or been cancelled.

func Go

func Go(ctx Context, goFn func(Context))

Go spawns a new fiber to run goFn concurrently within the workflow. Child fibers are cooperatively scheduled with the parent.
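
A sketch of cooperative scheduling built on goroutines and channels, to show why fibers can share memory without locks. This is not the library's scheduler: the fiber and scheduler types, spawn, and the yield callback are all hypothetical, and this simplified version does not support spawning new fibers while run is in progress.

```go
package main

import "fmt"

// fiber is a goroutine that only runs while the scheduler has resumed it.
type fiber struct {
	resume chan struct{} // scheduler -> fiber: run until the next yield
	yield  chan bool     // fiber -> scheduler: true when finished
}

type scheduler struct {
	fibers []*fiber
}

// spawn registers fn as a fiber. fn calls yield() at its yield points.
func (s *scheduler) spawn(fn func(yield func())) {
	f := &fiber{resume: make(chan struct{}), yield: make(chan bool)}
	s.fibers = append(s.fibers, f)
	go func() {
		<-f.resume // wait for the first turn
		fn(func() {
			f.yield <- false // hand control back to the scheduler
			<-f.resume       // wait for the next turn
		})
		f.yield <- true // finished
	}()
}

// run gives each live fiber one turn per round, in spawn order,
// until every fiber has finished.
func (s *scheduler) run() {
	for len(s.fibers) > 0 {
		var live []*fiber
		for _, f := range s.fibers {
			f.resume <- struct{}{}
			if done := <-f.yield; !done {
				live = append(live, f)
			}
		}
		s.fibers = live
	}
}

// trace runs two fibers that append to a shared slice without locks.
func trace() []string {
	var log []string // only one fiber runs at a time, so no mutex is needed
	s := &scheduler{}
	for _, name := range []string{"a", "b"} {
		name := name
		s.spawn(func(yield func()) {
			log = append(log, name+"1")
			yield()
			log = append(log, name+"2")
		})
	}
	s.run()
	return log
}

func main() {
	fmt.Println(trace()) // [a1 b1 a2 b2]
}
```

Because the scheduler hands out turns in a fixed order and each fiber runs only between a resume and its next yield, the interleaving is the same on every run.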

func Heartbeat

func Heartbeat(ctx context.Context, details any) error

Heartbeat signals activity liveness and optionally records progress. The details parameter is encoded and stored for checkpointing on retry. No-op if the activity was not configured with HeartbeatTimeout.

func NonRetryable

func NonRetryable(err error) error

NonRetryable wraps an error to indicate it should not be retried.

func Now

func Now(ctx Context) time.Time

Now returns the workflow's current logical time. This is deterministic during replay - it returns the time from when the workflow originally executed, not wall-clock time.

func RegisterActivity

func RegisterActivity[I, O any](r Registrar, name string, fn Activity[I, O]) error

RegisterActivity registers an activity function with compile-time type safety. The function must have signature: func(context.Context, I) (O, error)

func RegisterWorkflow

func RegisterWorkflow[I, O any](r Registrar, name string, wf Workflow[I, O]) error

RegisterWorkflow registers a workflow function with compile-time type safety. The function must have signature: func(Context, I) (O, error)

func Replay

func Replay(workflowFn any, events []journal.Event) error

Replay replays a workflow against recorded events. Returns NondeterminismError if workflow diverges from history. Works with complete and incomplete workflows.

func ReplayFromStore

func ReplayFromStore(ctx context.Context, store journal.Store, workflowID, runID string, workflowFn any) error

ReplayFromStore loads events from a store and replays.

func RetryAfter

func RetryAfter(delay time.Duration, err error) error

RetryAfter wraps an error with a specific delay before the next retry.

func RunWorker

func RunWorker(ctx context.Context, w Worker) error

func SideEffect

func SideEffect[T any](ctx Context, fn func() T) T

SideEffect executes fn and records its result. On replay, fn is not called - the recorded result is returned instead. Use for non-deterministic operations like generating UUIDs or reading wall-clock time.

func SignalExternalWorkflow

func SignalExternalWorkflow(ctx Context, targetWorkflowID, signalName string, payload any)

SignalExternalWorkflow sends a signal to another running workflow. This is fire-and-forget - no confirmation of delivery is provided.

func Sleep

func Sleep(ctx Context, d time.Duration)

Sleep pauses the workflow for duration d. The timer is durable - on replay, Sleep returns immediately if the original timer has already fired.

func WithActivityInfo

func WithActivityInfo(ctx context.Context, info ActivityInfo) context.Context

WithActivityInfo returns a context with ActivityInfo set. Intended for testing; production code uses automatic injection.

func WithHeartbeatRecorder

func WithHeartbeatRecorder(ctx context.Context, recorder func(details any) error) context.Context

WithHeartbeatRecorder returns a context with a custom heartbeat handler. Intended for testing; production code uses automatic injection.

Types

type Activity

type Activity[I, O any] func(context.Context, I) (O, error)

type ActivityInfo

type ActivityInfo struct {
	WorkflowID    string
	RunID         string
	ActivityName  string
	ScheduledAt   int       // Event ID - stable across retries
	Attempt       int       // 1-indexed; use with ScheduledAt for retry-aware idempotency
	ScheduledTime time.Time // When the activity was first scheduled
}

ActivityInfo provides execution context to running activities. Use this to build idempotency keys for external calls.

func GetActivityInfo

func GetActivityInfo(ctx context.Context) ActivityInfo

GetActivityInfo retrieves execution context from an activity's context. Returns zero value if called outside an activity.

func (ActivityInfo) IdempotencyKey

func (i ActivityInfo) IdempotencyKey() string

IdempotencyKey returns a stable key for this activity execution. Format: {workflowID}/{scheduledAt}/{attempt}
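
A sketch of the documented key format next to a variant without the attempt number. The info struct is a local stand-in for the relevant ActivityInfo fields, and perActivityKey is an assumption for illustration, not a library function.

```go
package main

import "fmt"

// info is a local stand-in carrying the ActivityInfo fields used for keys.
type info struct {
	WorkflowID  string
	ScheduledAt int
	Attempt     int
}

// perAttemptKey follows the documented format and changes on every retry.
func perAttemptKey(i info) string {
	return fmt.Sprintf("%s/%d/%d", i.WorkflowID, i.ScheduledAt, i.Attempt)
}

// perActivityKey drops the attempt, so every retry of the same scheduled
// activity shares one key.
func perActivityKey(i info) string {
	return fmt.Sprintf("%s/%d", i.WorkflowID, i.ScheduledAt)
}

func main() {
	first := info{WorkflowID: "order-123", ScheduledAt: 7, Attempt: 1}
	retry := info{WorkflowID: "order-123", ScheduledAt: 7, Attempt: 2}
	fmt.Println(perAttemptKey(first), perAttemptKey(retry))
	fmt.Println(perActivityKey(first) == perActivityKey(retry))
}
```

Which one to send to an external API depends on whether a retry should count as the same request (payment charges usually should) or as a fresh one.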

type ActivityOption

type ActivityOption func(*activityOptions)

ActivityOption configures activity execution.

func WithHeartbeatTimeout

func WithHeartbeatTimeout(d time.Duration) ActivityOption

WithHeartbeatTimeout sets the maximum time between heartbeats. Activities must call Heartbeat periodically to reset this deadline.

func WithNoRetry

func WithNoRetry() ActivityOption

WithNoRetry disables retries for the activity.

func WithRetry

func WithRetry(policy RetryPolicy) ActivityOption

WithRetry sets the retry policy for the activity.

func WithScheduleToCloseTimeout

func WithScheduleToCloseTimeout(d time.Duration) ActivityOption

WithScheduleToCloseTimeout sets the maximum total time from scheduling to completion.

func WithScheduleToStartTimeout

func WithScheduleToStartTimeout(d time.Duration) ActivityOption

WithScheduleToStartTimeout sets the maximum time from scheduling to worker pickup.

func WithStartToCloseTimeout

func WithStartToCloseTimeout(d time.Duration) ActivityOption

WithStartToCloseTimeout sets the maximum time from worker start to completion.

type ActivityRef

type ActivityRef[I, O any] struct {
	// contains filtered or unexported fields
}

func NewActivityRef

func NewActivityRef[I, O any](name string) ActivityRef[I, O]

func (ActivityRef[I, O]) Execute

func (r ActivityRef[I, O]) Execute(ctx Context, input I, opts ...ActivityOption) Future[O]

func (ActivityRef[I, O]) Name

func (r ActivityRef[I, O]) Name() string

type ActivityResolver

type ActivityResolver interface {
	ResolveActivity(name string) (fn any, ok bool)
}

ActivityResolver looks up activity functions by name. Returned fn must have signature func(context.Context, I) (O, error) - use RegisterActivity to ensure this.

type CancelInfo

type CancelInfo struct {
	Reason      string
	RequestedAt time.Time
}

type Channel

type Channel[T any] struct {
	// contains filtered or unexported fields
}

Channel provides typed, fiber-safe communication within a workflow.

func NewBufferedChannel

func NewBufferedChannel[T any](ctx Context, capacity int) *Channel[T]

NewBufferedChannel creates a buffered channel with the given capacity.

func NewChannel

func NewChannel[T any](ctx Context) *Channel[T]

NewChannel creates an unbuffered channel.

func (*Channel[T]) Close

func (ch *Channel[T]) Close(ctx Context)

Close closes the channel.

func (*Channel[T]) Receive

func (ch *Channel[T]) Receive(ctx Context) (T, bool)

Receive receives a value. Returns (zero, false) if closed.

func (*Channel[T]) ReceiveFuture

func (ch *Channel[T]) ReceiveFuture() Future[T]

ReceiveFuture returns a Future for use with Selector.

func (*Channel[T]) Send

func (ch *Channel[T]) Send(ctx Context, value T)

Send sends a value on the channel. Panics if closed.

type ChildWorkflowOption

type ChildWorkflowOption func(*childWorkflowOptions)

ChildWorkflowOption configures child workflow execution.

func WithClosePolicy

func WithClosePolicy(policy journal.ParentClosePolicy) ChildWorkflowOption

WithClosePolicy sets the parent close policy for the child workflow.

type ChildWorkflowRef

type ChildWorkflowRef[I, O any] struct {
	// contains filtered or unexported fields
}

func NewChildWorkflowRef

func NewChildWorkflowRef[I, O any](name string) ChildWorkflowRef[I, O]

func (ChildWorkflowRef[I, O]) Execute

func (r ChildWorkflowRef[I, O]) Execute(ctx Context, workflowID string, input I, opts ...ChildWorkflowOption) Future[O]

Execute schedules a child workflow and returns a future for its result. The workflowID must be provided explicitly for deduplication.

func (ChildWorkflowRef[I, O]) Name

func (r ChildWorkflowRef[I, O]) Name() string

type Client

type Client interface {
	StartWorkflow(ctx context.Context, workflowType, workflowID string, input any) (Run, error)
	GetWorkflow(ctx context.Context, workflowID, runID string) (Run, error)
	SignalWorkflow(ctx context.Context, workflowID, signalName string, payload any) error
	CancelWorkflow(ctx context.Context, workflowID, reason string) error
	ListWorkflows(ctx context.Context, opts ...journal.ListWorkflowsOption) (*journal.ListWorkflowsResult, error)
}

type Clock

type Clock interface {
	Now() time.Time
}

Clock abstracts time for deterministic testing.

type Codec

type Codec interface {
	Encode(v any) ([]byte, error)
	Decode(data []byte, v any) error
}

var DefaultCodec Codec = jsonCodec{}

type Context

type Context interface {
	Err() error
	// contains filtered or unexported methods
}

Context is a sealed interface for workflow execution. The unexported method prevents external implementation, so internal type assertions to capability interfaces (yielder, spawner, etc) are safe - only workflowContext exists.

type ContinueAsNewError

type ContinueAsNewError struct {
	Input any
}

ContinueAsNewError restarts the workflow with fresh history. Use to avoid unbounded event accumulation in long-running workflows.

func NewContinueAsNewError

func NewContinueAsNewError(input any) *ContinueAsNewError

func (*ContinueAsNewError) Error

func (e *ContinueAsNewError) Error() string

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

func NewEngine

func NewEngine(store journal.Store, opts ...EngineOption) (*Engine, error)

func (*Engine) ActivityWorker

func (e *Engine) ActivityWorker() *activityWorker

func (*Engine) Actor

func (e *Engine) Actor() (execute func() error, interrupt func(error))

Actor returns execute and interrupt functions for use with run.Group.

func (*Engine) Client

func (e *Engine) Client() Client

func (*Engine) Run

func (e *Engine) Run(ctx context.Context) error

Run starts all workers and blocks until ctx is cancelled or a worker fails.

func (*Engine) TimeoutWorker

func (e *Engine) TimeoutWorker() *timeoutWorker

func (*Engine) TimerWorker

func (e *Engine) TimerWorker() *timerWorker

func (*Engine) WorkerID

func (e *Engine) WorkerID() string

func (*Engine) WorkflowWorker

func (e *Engine) WorkflowWorker() *workflowWorker

type EngineOption

type EngineOption func(*engineConfig)

func WithActivityResolver

func WithActivityResolver(r ActivityResolver) EngineOption

WithActivityResolver sets a custom activity resolver. When set, RegisterActivity cannot be used on this engine.

func WithCacheSize

func WithCacheSize(size int) EngineOption

func WithClock

func WithClock(clock Clock) EngineOption

func WithDefaultRetryPolicy

func WithDefaultRetryPolicy(p RetryPolicy) EngineOption

WithDefaultRetryPolicy sets the default retry policy for activities.

func WithEngineCodec

func WithEngineCodec(codec Codec) EngineOption

func WithEngineLogger

func WithEngineLogger(l *slog.Logger) EngineOption

WithEngineLogger sets the logger for the engine and its workers.

func WithWorkerConcurrency

func WithWorkerConcurrency(n int) EngineOption

WithWorkerConcurrency sets the number of goroutines per worker type.

func WithWorkerID

func WithWorkerID(id string) EngineOption

func WithWorkflowResolver

func WithWorkflowResolver(r WorkflowResolver) EngineOption

WithWorkflowResolver sets a custom workflow resolver. When set, RegisterWorkflow cannot be used on this engine.

func WithWorkflowTaskTimeout

func WithWorkflowTaskTimeout(d time.Duration) EngineOption

WithWorkflowTaskTimeout sets the lease duration for workflow tasks. Expired tasks are released for reprocessing; leases are not renewed.

type ExecutionState

type ExecutionState interface {
	NewRecorder(workflowTaskScheduledID int) Recorder
	GetByScheduledID(scheduledEventID int) []journal.Event
	GetSignals(signalName string) []journal.SignalReceived
	GetCancelRequest() *journal.WorkflowCancelRequested
}

ExecutionState feeds events to the scheduler and manages visibility for deterministic replay of a single workflow execution.

type FakeClock

type FakeClock struct {
	// contains filtered or unexported fields
}

FakeClock provides controllable time for tests.

func NewFakeClock

func NewFakeClock(t time.Time) *FakeClock

func (*FakeClock) Advance

func (c *FakeClock) Advance(d time.Duration)

func (*FakeClock) Now

func (c *FakeClock) Now() time.Time

func (*FakeClock) Set

func (c *FakeClock) Set(t time.Time)

type Future

type Future[T any] interface {
	Get(ctx Context) (T, error)
}

func Cancelled

func Cancelled(ctx Context) Future[CancelInfo]

func NewTimer

func NewTimer(ctx Context, d time.Duration) Future[time.Time]

NewTimer schedules a timer that fires after duration d. Returns a Future that resolves to the time when the timer fired.

type GetOption

type GetOption func(*getOptions)

GetOption configures Get behavior.

func NonBlocking

func NonBlocking() GetOption

NonBlocking returns ErrNotCompleted instead of blocking if the workflow hasn't finished.

type MemoryStore

type MemoryStore struct {
	// contains filtered or unexported fields
}

func NewMemoryStore

func NewMemoryStore(opts ...MemoryStoreOption) *MemoryStore

func (*MemoryStore) Append

func (ms *MemoryStore) Append(ctx context.Context, workflowID, runID string, events []journal.Event, scheduledByEventID int) ([]int, error)

func (*MemoryStore) CancelWorkflow

func (ms *MemoryStore) CancelWorkflow(ctx context.Context, workflowID, reason string) error

func (*MemoryStore) CreateWorkflow

func (ms *MemoryStore) CreateWorkflow(ctx context.Context, workflowID, workflowType string, input []byte, startedAt time.Time) (string, error)

func (*MemoryStore) GetStatus

func (ms *MemoryStore) GetStatus(ctx context.Context, workflowID, runID string) (journal.WorkflowStatus, error)

func (*MemoryStore) GetTimedOutActivities

func (ms *MemoryStore) GetTimedOutActivities(ctx context.Context, now time.Time) ([]journal.TimedOutActivity, error)

func (*MemoryStore) GetTimersToFire

func (ms *MemoryStore) GetTimersToFire(ctx context.Context, now time.Time) ([]journal.PendingTimerTask, error)

func (*MemoryStore) ListWorkflows

func (*MemoryStore) Load

func (ms *MemoryStore) Load(ctx context.Context, workflowID, runID string) ([]journal.Event, error)

func (*MemoryStore) LoadFrom

func (ms *MemoryStore) LoadFrom(ctx context.Context, workflowID, runID string, afterEventID int) ([]journal.Event, error)

func (*MemoryStore) RecordHeartbeat

func (ms *MemoryStore) RecordHeartbeat(ctx context.Context, workflowID, runID string, scheduledAt int, details []byte) error

func (*MemoryStore) ReleaseExpiredWorkflowTasks

func (ms *MemoryStore) ReleaseExpiredWorkflowTasks(ctx context.Context, now time.Time, timeout time.Duration) (int, error)

func (*MemoryStore) RequeueForRetry

func (ms *MemoryStore) RequeueForRetry(ctx context.Context, workflowID, runID string, scheduledAt int, info journal.RequeueInfo) error

func (*MemoryStore) SignalWorkflow

func (ms *MemoryStore) SignalWorkflow(ctx context.Context, workflowID, signalName string, payload []byte) error

func (*MemoryStore) WaitForActivityTasks

func (ms *MemoryStore) WaitForActivityTasks(ctx context.Context, workerID string, maxActivities int) ([]journal.PendingActivityTask, error)

func (*MemoryStore) WaitForCompletion

func (ms *MemoryStore) WaitForCompletion(ctx context.Context, workflowID, runID string) (journal.Event, error)

func (*MemoryStore) WaitForWorkflowTasks

func (ms *MemoryStore) WaitForWorkflowTasks(ctx context.Context, workerID string, maxNew int) ([]journal.PendingWorkflowTask, error)

type MemoryStoreOption

type MemoryStoreOption func(*MemoryStore)

func WithIDGenerator

func WithIDGenerator(gen func() string) MemoryStoreOption

WithIDGenerator sets the function used to generate run IDs.

func WithStoreClock

func WithStoreClock(clock Clock) MemoryStoreOption

type NonRetryableError

type NonRetryableError struct {
	Err error
}

NonRetryableError marks an error as permanent: no retries are attempted.

func (*NonRetryableError) Error

func (e *NonRetryableError) Error() string

func (*NonRetryableError) Unwrap

func (e *NonRetryableError) Unwrap() error

type NondeterminismError

type NondeterminismError struct {
	WorkflowID string
	RunID      string
	EventSeq   int
	Expected   string
	Got        string
}

func (*NondeterminismError) Error

func (e *NondeterminismError) Error() string
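The Expected and Got fields suggest a comparison between the recorded history and what the workflow code produces on replay. The sketch below illustrates that idea only; it is not the scheduler's actual check. The struct is redeclared locally, the Error format is an assumption, checkReplay is a hypothetical helper, the event type names are made up, and 1-based EventSeq numbering is also an assumption.

```go
package main

import "fmt"

// NondeterminismError mirrors the documented struct, redeclared locally.
type NondeterminismError struct {
	WorkflowID string
	RunID      string
	EventSeq   int
	Expected   string
	Got        string
}

// Error uses an assumed message format; the real one is not documented.
func (e *NondeterminismError) Error() string {
	return fmt.Sprintf("nondeterminism in %s/%s at event %d: expected %s, got %s",
		e.WorkflowID, e.RunID, e.EventSeq, e.Expected, e.Got)
}

// checkReplay is a hypothetical helper. It compares the event types stored
// in history with the types the workflow code emits when re-run, and
// reports the first position where they disagree.
func checkReplay(workflowID, runID string, recorded, emitted []string) error {
	for i, got := range emitted {
		if i >= len(recorded) {
			return nil // past recorded history: these are new events
		}
		if recorded[i] != got {
			return &NondeterminismError{
				WorkflowID: workflowID,
				RunID:      runID,
				EventSeq:   i + 1, // assumption: sequence numbers start at 1
				Expected:   recorded[i],
				Got:        got,
			}
		}
	}
	return nil
}

func main() {
	// Hypothetical event type names.
	recorded := []string{"ActivityScheduled", "TimerScheduled"}
	emitted := []string{"ActivityScheduled", "ActivityScheduled"}
	fmt.Println(checkReplay("order-1", "run-1", recorded, emitted))
	fmt.Println(checkReplay("order-1", "run-1", recorded, recorded))
}
```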

type RealClock

type RealClock struct{}

func (RealClock) Now

func (RealClock) Now() time.Time

type ReceiveFuture

type ReceiveFuture[T any] struct {
	// contains filtered or unexported fields
}

ReceiveFuture adapts a channel receive for Selector integration.

func (*ReceiveFuture[T]) Get

func (f *ReceiveFuture[T]) Get(ctx Context) (T, error)

Get implements Future[T].

type Recorder

type Recorder interface {
	// Record returns the event and its index in pending. During replay, index
	// is -1 since the event already has the correct ID. During first execution,
	// index identifies this event for RegisterPendingFuture.
	Record(eventType string, generate func() journal.Event) (event journal.Event, pendingIndex int, err error)
	// RegisterPendingFuture associates a future with a pending event so Commit
	// can update the future's scheduled ID with the real ID from the store.
	RegisterPendingFuture(pendingIndex int, f ScheduledIDReceiver)
	Commit(ctx context.Context, scheduledAtTask int) error
	PendingCount() int
}

type Registrar

type Registrar interface {
	// contains filtered or unexported methods
}

Registrar allows registration of workflows and activities. The interface is sealed via unexported methods; use RegisterWorkflow and RegisterActivity with an Engine instance.

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry is the default WorkflowResolver and ActivityResolver implementation. It provides simple name-based lookup. Users can substitute custom resolvers for dynamic loading, versioning, multi-tenancy, or other routing strategies.

func (*Registry) ResolveActivity

func (r *Registry) ResolveActivity(name string) (any, bool)

ResolveActivity implements ActivityResolver.

func (*Registry) ResolveWorkflow

func (r *Registry) ResolveWorkflow(name string) (any, bool)

ResolveWorkflow implements WorkflowResolver.

type ReplayResult

type ReplayResult struct {
	// NewEvents contains events generated beyond the recorded history.
	// Empty for complete workflows replayed deterministically.
	NewEvents []journal.Event

	// Complete is true if the workflow reached a terminal state (Completed, Failed, or Cancelled).
	Complete bool

	// Result is the JSON-encoded result if Complete is true and workflow succeeded.
	Result []byte

	// Error is the workflow error if Complete is true and workflow failed.
	Error error
}

ReplayResult contains the result of a successful replay.

func ReplayWithResult

func ReplayWithResult(workflowFn any, events []journal.Event) (*ReplayResult, error)

ReplayWithResult replays a workflow and returns detailed result information.

type RetryPolicy

type RetryPolicy struct {
	InitialInterval    time.Duration
	BackoffCoefficient float64
	MaxInterval        time.Duration
	MaxAttempts        int // 0 = unlimited
	NonRetryableErrors []journal.ErrorKind
}

RetryPolicy configures automatic retry behavior for activities.

func DefaultRetryPolicy

func DefaultRetryPolicy() RetryPolicy

DefaultRetryPolicy returns a sensible default retry policy.

func NoRetryPolicy

func NoRetryPolicy() RetryPolicy

NoRetryPolicy returns a policy that allows no retries.

func PolicyFromPayload

func PolicyFromPayload(p *journal.RetryPolicyPayload) RetryPolicy

PolicyFromPayload converts a serialized payload back to a RetryPolicy.

func (RetryPolicy) NextDelay

func (p RetryPolicy) NextDelay(attempt int) time.Duration

NextDelay calculates the backoff delay for the given attempt (1-indexed).

func (RetryPolicy) ShouldRetry

func (p RetryPolicy) ShouldRetry(kind journal.ErrorKind, attempt int) bool

ShouldRetry returns true if the given error kind should be retried at the given attempt.
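One plausible reading of ShouldRetry combines the two documented limits: the attempt budget (MaxAttempts, where 0 means unlimited) and the NonRetryableErrors list. The sketch below is an assumption-laden illustration, not the package's logic: ErrorKind is modeled as a string because journal.ErrorKind is not shown here, the kind values are invented, and attempt is assumed to count attempts already made.

```go
package main

import "fmt"

// ErrorKind stands in for journal.ErrorKind, whose definition is not
// shown in the docs. Modeling it as a string is an assumption.
type ErrorKind string

// RetryPolicy mirrors the documented fields used by this sketch.
type RetryPolicy struct {
	MaxAttempts        int // 0 = unlimited
	NonRetryableErrors []ErrorKind
}

// shouldRetry is a hypothetical implementation. attempt is assumed to be
// the 1-indexed number of attempts already made.
func shouldRetry(p RetryPolicy, kind ErrorKind, attempt int) bool {
	// Attempt budget exhausted (skipped when MaxAttempts is 0).
	if p.MaxAttempts > 0 && attempt >= p.MaxAttempts {
		return false
	}
	// Error kinds listed as non-retryable are never retried.
	for _, k := range p.NonRetryableErrors {
		if k == kind {
			return false
		}
	}
	return true
}

func main() {
	// "validation" and "timeout" are invented kind values.
	p := RetryPolicy{MaxAttempts: 3, NonRetryableErrors: []ErrorKind{"validation"}}
	fmt.Println(shouldRetry(p, "timeout", 1))    // true
	fmt.Println(shouldRetry(p, "timeout", 3))    // false: budget used up
	fmt.Println(shouldRetry(p, "validation", 1)) // false: listed kind
}
```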

func (RetryPolicy) ToPayload

func (p RetryPolicy) ToPayload() *journal.RetryPolicyPayload

ToPayload converts the policy to a serializable payload.

type RetryableError

type RetryableError struct {
	Err       error
	NextDelay time.Duration // 0 = use policy default
}

RetryableError provides hints for retry behavior.

func (*RetryableError) Error

func (e *RetryableError) Error() string

func (*RetryableError) Unwrap

func (e *RetryableError) Unwrap() error

type Run

type Run interface {
	ID() string
	RunID() string
	Get(ctx context.Context, result any, opts ...GetOption) error
}

type ScheduledIDReceiver

type ScheduledIDReceiver interface {
	SetScheduledID(id int)
}

ScheduledIDReceiver is implemented by futures that need their scheduled ID updated after Commit assigns real event IDs.

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

func NewScheduler

func NewScheduler(state ExecutionState, workflowFn func(Context), info WorkflowInfo, opts ...SchedulerOption) *Scheduler

func (*Scheduler) Advance

func (s *Scheduler) Advance(ctx context.Context, scheduledAtTask int, workflowTime time.Time) error

Advance runs the scheduler until quiescence or completion. scheduledAtTask is the ID of the WorkflowTaskScheduled event that triggered this advance.

func (*Scheduler) Close

func (s *Scheduler) Close()

type SchedulerOption

type SchedulerOption func(*Scheduler)

func WithCodec

func WithCodec(c Codec) SchedulerOption

func WithSchedulerRetryPolicy

func WithSchedulerRetryPolicy(p RetryPolicy) SchedulerOption

type Selector

type Selector struct {
	// contains filtered or unexported fields
}

func AddFuture

func AddFuture[T any](s *Selector, f Future[T], callback func(T, error)) *Selector

func NewSelector

func NewSelector() *Selector

func (*Selector) HasPending

func (s *Selector) HasPending() bool

func (*Selector) Select

func (s *Selector) Select(ctx Context)

type SignalChannel

type SignalChannel[T any] struct {
	// contains filtered or unexported fields
}

SignalChannel provides typed signal reception within a workflow. Signals arrive from external clients or other workflows.

func GetSignalChannel

func GetSignalChannel[T any](ctx Context, name string) *SignalChannel[T]

GetSignalChannel returns a channel for receiving signals with the given name.

func (*SignalChannel[T]) Receive

func (ch *SignalChannel[T]) Receive(ctx Context) (T, bool)

Receive blocks until a signal arrives, returning (value, true). During replay, returns signals in the same order they were originally received.

func (*SignalChannel[T]) ReceiveFuture

func (ch *SignalChannel[T]) ReceiveFuture() Future[T]

ReceiveFuture returns a Future for use with Selector.

func (*SignalChannel[T]) TryReceive

func (ch *SignalChannel[T]) TryReceive(ctx Context) (T, bool)

TryReceive returns a signal if one is available, without blocking. Returns (value, true) if a signal was available, (zero, false) otherwise.

type StubExecutionState

type StubExecutionState struct {
	// contains filtered or unexported fields
}

StubExecutionState is a minimal ExecutionState for unit tests. Use for scheduler tests and as a base for test harnesses.

func NewStubExecutionState

func NewStubExecutionState() *StubExecutionState

func (*StubExecutionState) AddExternalEvent

func (s *StubExecutionState) AddExternalEvent(ev journal.Event) int

AddExternalEvent appends an event as if it arrived from outside the workflow.

func (*StubExecutionState) Events

func (s *StubExecutionState) Events() []journal.Event

Events returns all recorded events.

func (*StubExecutionState) GetByScheduledID

func (s *StubExecutionState) GetByScheduledID(scheduledEventID int) []journal.Event

func (*StubExecutionState) GetCancelRequest

func (s *StubExecutionState) GetCancelRequest() *journal.WorkflowCancelRequested

func (*StubExecutionState) GetSignals

func (s *StubExecutionState) GetSignals(signalName string) []journal.SignalReceived

func (*StubExecutionState) NewEvents

func (s *StubExecutionState) NewEvents() []journal.Event

NewEvents returns events generated by the scheduler (not external events).

func (*StubExecutionState) NewRecorder

func (s *StubExecutionState) NewRecorder(afterEventID int) Recorder

type TimeoutPolicy

type TimeoutPolicy struct {
	ScheduleToStartTimeout time.Duration
	StartToCloseTimeout    time.Duration
	ScheduleToCloseTimeout time.Duration
	HeartbeatTimeout       time.Duration
}

TimeoutPolicy configures activity timeout behavior.

func (*TimeoutPolicy) ToPayload

func (p *TimeoutPolicy) ToPayload() *journal.TimeoutPolicyPayload

ToPayload converts the policy to a serializable payload.

type Worker

type Worker interface {
	Process(ctx context.Context) error
}

type Workflow

type Workflow[I, O any] func(Context, I) (O, error)

type WorkflowInfo

type WorkflowInfo struct {
	WorkflowID   string
	RunID        string
	WorkflowType string
	StartTime    time.Time
}

func GetInfo

func GetInfo(ctx Context) WorkflowInfo

GetInfo returns metadata about the currently executing workflow.

type WorkflowNotFoundError

type WorkflowNotFoundError = journal.WorkflowNotFoundError

WorkflowNotFoundError is an alias kept for backward compatibility after the type moved to the journal package.

type WorkflowResolver

type WorkflowResolver interface {
	ResolveWorkflow(name string) (fn any, ok bool)
}

WorkflowResolver looks up workflow functions by name. The returned fn must have the signature func(Context, I) (O, error); use RegisterWorkflow to ensure this.

Directories

Path Synopsis
storetest
Package storetest provides conformance tests for journal.Store implementations.