pgstore

package module
v0.0.0-...-860e172
Published: Mar 3, 2026 License: MIT Imports: 17 Imported by: 0

Documentation

Overview

Package pgstore provides PostgreSQL-backed stores for events, snapshots, and checkpoints.

PGNotifier implements subscription.StoreNotifier using PostgreSQL LISTEN/NOTIFY. After each event append, NOTIFY is sent on the configured channel with the latest global sequence. Listeners wake instantly instead of polling.
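A wiring sketch of this setup; the event type MyEvent and connection string connStr are placeholders, not part of this package:

```go
// Store sends NOTIFY after each append; the notifier wakes listeners.
store, err := pgstore.New[MyEvent](ctx, connStr,
	pgstore.WithNotifyChannel(pgstore.DefaultChannel),
)
if err != nil {
	return err
}
notifier := pgstore.NewPGNotifier(store.Pool(), pgstore.DefaultChannel)
go notifier.Start(ctx) // Start blocks, so run it in its own goroutine
```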

The EventStore implementation is built on pgx/v5.

Events are stored in a single table with JSONB data, using transaction-based optimistic concurrency control. The UNIQUE(stream_id, version) constraint acts as a final safety net against duplicate versions.

Index

Constants

const (
	// DefaultLeaseTimeout is how long a partition lease is valid without renewal.
	DefaultLeaseTimeout = 30 * time.Second

	// DefaultRenewInterval is how often to renew the lease.
	DefaultRenewInterval = 10 * time.Second

	// DefaultPartitionCount is the default number of partitions.
	DefaultPartitionCount = 8
)
const (
	// DefaultChangeChannel is the default PG NOTIFY channel for projection changes.
	DefaultChangeChannel = "eskit_changes"
)
const (
	// DefaultChannel is the default NOTIFY channel name.
	DefaultChannel = "eskit_events"
)

Variables

This section is empty.

Functions

func EnsureSchema

func EnsureSchema(ctx context.Context, pool *pgxpool.Pool, name string) error

EnsureSchema creates a Postgres schema if it doesn't exist. Safe to call on every startup (idempotent).

Use this to set up the recommended two-schema layout:

pgstore.EnsureSchema(ctx, pool, "eventstore")  // events, checkpoints, command log
pgstore.EnsureSchema(ctx, pool, "readmodel")   // all projections (disposable)

Then configure separate connection pools with search_path:

eventPool: postgres://user:pass@host/db?search_path=eventstore
readPool:  postgres://user:pass@host/db?search_path=readmodel

All eskit queries work unchanged — Postgres resolves table names via search_path.

func NewPool

func NewPool(ctx context.Context, connString string, opts ...PoolOption) (*pgxpool.Pool, error)

NewPool creates a production-tuned pgxpool.Pool with sensible defaults for event sourcing workloads.

Use this instead of pgxpool.New for production deployments. All defaults can be overridden via options:

// Event store pool (eventstore schema)
eventPool, err := pgstore.NewPool(ctx, connStr,
    pgstore.WithSchema("eventstore"),
)

// Read model pool (readmodel schema, more connections for queries)
readPool, err := pgstore.NewPool(ctx, connStr,
    pgstore.WithSchema("readmodel"),
    pgstore.WithMaxConns(20),
)

func NotifyAppend

func NotifyAppend(ctx context.Context, pool *pgxpool.Pool, channel string, sequence int64) error

NotifyAppend sends a NOTIFY with the given global sequence. Call this after appending events (inside or outside the transaction).
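A sketch for appends made through a store that was not configured with WithNotifyChannel; seq is assumed to be the latest global sequence after the append:

```go
// Manually wake listeners after an out-of-band append.
if err := pgstore.NotifyAppend(ctx, pool, pgstore.DefaultChannel, seq); err != nil {
	log.Printf("notify dropped: %v", err)
}
```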

func PoolWithSchema

func PoolWithSchema(ctx context.Context, connString string, schema string) (*pgxpool.Pool, error)

PoolWithSchema creates a new connection pool with search_path set to the given schema. This is a convenience wrapper around pgxpool.New that appends search_path to the connection string.

Example:

eventPool, err := pgstore.PoolWithSchema(ctx, "postgres://user:pass@host/db", "eventstore")
readPool, err := pgstore.PoolWithSchema(ctx, "postgres://user:pass@host/db", "readmodel")

The pool's search_path is set to "<schema>, public" so both schema-local and public tables are accessible. All eskit components (store, checkpoint, pgview) work unchanged — just pass the right pool.

Types

type CheckpointStore

type CheckpointStore struct {
	// contains filtered or unexported fields
}

CheckpointStore is a PostgreSQL-backed checkpoint for event subscriptions. Tracks each consumer's last processed global sequence number. Thread-safe: PostgreSQL handles concurrent access.

func NewCheckpointStore

func NewCheckpointStore(ctx context.Context, pool *pgxpool.Pool) (*CheckpointStore, error)

NewCheckpointStore creates a PostgreSQL-backed checkpoint store. Creates the checkpoints table if it doesn't exist.

func (*CheckpointStore) Load

func (c *CheckpointStore) Load(ctx context.Context, consumerID string) (uint64, error)

Load returns the last processed sequence for a consumer. Returns 0 if new.

func (*CheckpointStore) Save

func (c *CheckpointStore) Save(ctx context.Context, consumerID string, sequence uint64) error

Save persists the consumer's position atomically via upsert.
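A consumer-loop sketch; the consumer ID and newSeq are placeholders:

```go
cp, err := pgstore.NewCheckpointStore(ctx, pool)
if err != nil {
	return err
}
last, _ := cp.Load(ctx, "order-projector") // returns 0 for a new consumer
// ... process events with a global sequence greater than last ...
_ = cp.Save(ctx, "order-projector", newSeq) // newSeq: last sequence handled
```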

type ClusterOption

type ClusterOption func(*clusterConfig)

ClusterOption configures a ClusterStore.

func WithPool

func WithPool(pool *pgxpool.Pool) ClusterOption

WithPool sets a single pool for both reads and writes. Convenience for simple deployments without read replicas.

func WithReadPool

func WithReadPool(pool *pgxpool.Pool) ClusterOption

WithReadPool sets the replica pool for read operations. If not set, reads use the write pool.

func WithWritePool

func WithWritePool(pool *pgxpool.Pool) ClusterOption

WithWritePool sets the primary pool for write operations.

type ClusterStore

type ClusterStore[E any] struct {
	// contains filtered or unexported fields
}

ClusterStore is a PostgreSQL event store with read/write pool splitting. Writes go to the primary, reads can go to replicas for horizontal read scaling.

Compatible with:

  • Standard PostgreSQL (primary + replicas)
  • Citus (distributed Postgres)
  • Neon (serverless Postgres)
  • YugabyteDB (Postgres wire protocol)
  • CockroachDB (Postgres wire protocol — see advisory lock notes)

For CockroachDB: advisory locks are not supported. Use the CAS-based optimistic concurrency (the UNIQUE constraint) as the sole safety net, or use a separate locks table.
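Constructing a split-pool store, assuming primaryPool and replicaPool already exist:

```go
store, err := pgstore.NewClusterStore[OrderEvent](ctx,
	pgstore.WithWritePool(primaryPool),
	pgstore.WithReadPool(replicaPool),
)
// Append goes to primaryPool; Load and LoadFrom go to replicaPool.
```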

func NewClusterStore

func NewClusterStore[E any](ctx context.Context, opts ...ClusterOption) (*ClusterStore[E], error)

NewClusterStore creates a PostgreSQL event store with read/write pool splitting. Runs migration on the write pool.

func (*ClusterStore[E]) Append

func (s *ClusterStore[E]) Append(ctx context.Context, streamID string, expectedVersion int, events []E, metadata ...eskit.Metadata) ([]eskit.Event[E], error)

Append persists events using the WRITE pool (primary).

func (*ClusterStore[E]) Close

func (s *ClusterStore[E]) Close()

Close closes both connection pools.

func (*ClusterStore[E]) Load

func (s *ClusterStore[E]) Load(ctx context.Context, streamID string) ([]eskit.Event[E], error)

Load returns all events using the READ pool (replica).

func (*ClusterStore[E]) LoadFrom

func (s *ClusterStore[E]) LoadFrom(ctx context.Context, streamID string, fromVersion int) ([]eskit.Event[E], error)

LoadFrom returns events from a version using the READ pool (replica).

func (*ClusterStore[E]) Ping

func (s *ClusterStore[E]) Ping(ctx context.Context) error

Ping checks connectivity to both write and read pools.

type GlobalEvent

type GlobalEvent[E any] struct {
	GlobalSequence uint64
	StreamID       string
	EventType      string
	Version        int
	Data           E
	Timestamp      time.Time
}

GlobalEvent is a store-wide event with a global sequence number. Compatible with subscription.GlobalEvent but decoupled to avoid import cycles.

type GlobalReaderOption

type GlobalReaderOption[E any] func(*PGGlobalReader[E])

GlobalReaderOption configures a PGGlobalReader.

func WithGlobalReaderCodecRegistry

func WithGlobalReaderCodecRegistry[E any](reg *codec.Registry) GlobalReaderOption[E]

WithGlobalReaderCodecRegistry sets the codec registry for reading events with different codecs.

func WithGlobalReaderRegistry

func WithGlobalReaderRegistry[E any](reg *eskit.EventRegistry) GlobalReaderOption[E]

WithGlobalReaderRegistry enables type registry for heterogeneous global reads.

type PGChangeRelay

type PGChangeRelay struct {
	// contains filtered or unexported fields
}

PGChangeRelay broadcasts projection changes across server instances using PostgreSQL LISTEN/NOTIFY. Use for shared-consumer projections where only one server processes each event and other servers need to be notified.

Flow:

  1. Server A processes event → OnChange fires → Broadcast() sends PG NOTIFY
  2. All servers (including A) receive the NOTIFY via Start()
  3. Each server forwards the change to its local ChangeNotifier
  4. SSE handlers on all servers get notified

Safe for concurrent use.
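A wiring sketch; notifier is this server's *eskit.ChangeNotifier and change is an eskit.Change, both obtained elsewhere:

```go
relay := pgstore.NewPGChangeRelay(pool, pgstore.DefaultChangeChannel, notifier)
go relay.Start(ctx)

// After this server processes an event for a shared-consumer projection:
relay.Broadcast(change) // fans out via PG NOTIFY and the local notifier
```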

func NewPGChangeRelay

func NewPGChangeRelay(pool *pgxpool.Pool, channel string, notifier *eskit.ChangeNotifier, opts ...PGChangeRelayOption) *PGChangeRelay

NewPGChangeRelay creates a relay that bridges ChangeNotifier across servers using PostgreSQL LISTEN/NOTIFY.

The notifier receives all changes detected by this relay (from any server). Call Start() to begin listening.

func (*PGChangeRelay) Broadcast

func (r *PGChangeRelay) Broadcast(change eskit.Change)

Broadcast sends a change notification to all servers via PG NOTIFY. Also notifies the local ChangeNotifier directly for immediate delivery to SSE handlers on this server.

Non-blocking: the PG NOTIFY is fire-and-forget. If the database connection is temporarily lost, the notification is dropped (SSE handlers will catch up on the next successful notification).

func (*PGChangeRelay) Close

func (r *PGChangeRelay) Close() error

Close stops the relay.

func (*PGChangeRelay) Start

func (r *PGChangeRelay) Start(ctx context.Context)

Start begins listening for PG NOTIFY messages and forwarding them to the local ChangeNotifier. Blocks until ctx is cancelled or Close is called. Auto-reconnects on connection loss.

func (*PGChangeRelay) Wait

func (r *PGChangeRelay) Wait()

Wait blocks until the relay's listen loop has fully stopped.

type PGChangeRelayOption

type PGChangeRelayOption func(*PGChangeRelay)

PGChangeRelayOption configures a PGChangeRelay.

func WithBroadcastTimeout

func WithBroadcastTimeout(d time.Duration) PGChangeRelayOption

WithBroadcastTimeout sets the timeout for each PG NOTIFY broadcast. Default: 2s.

func WithReconnectDelay

func WithReconnectDelay(d time.Duration) PGChangeRelayOption

WithReconnectDelay sets the delay between reconnection attempts when the PostgreSQL LISTEN connection is lost. Default: 1s.

type PGCheckpoint

type PGCheckpoint struct {
	// contains filtered or unexported fields
}

PGCheckpoint implements subscription.Checkpoint backed by PostgreSQL.

func NewPGCheckpoint

func NewPGCheckpoint(ctx context.Context, pool *pgxpool.Pool) (*PGCheckpoint, error)

NewPGCheckpoint creates a PostgreSQL-backed checkpoint store.

func (*PGCheckpoint) Load

func (c *PGCheckpoint) Load(ctx context.Context, consumerID string) (uint64, error)

Load returns the last processed sequence for a consumer. Returns 0 if new.

func (*PGCheckpoint) Save

func (c *PGCheckpoint) Save(ctx context.Context, consumerID string, sequence uint64) error

Save persists the consumer's position atomically.

type PGGlobalReader

type PGGlobalReader[E any] struct {
	// contains filtered or unexported fields
}

PGGlobalReader implements subscription.GlobalReader for PostgreSQL. Reads events by global sequence (the events.id column).

func NewPGGlobalReader

func NewPGGlobalReader[E any](pool *pgxpool.Pool, opts ...GlobalReaderOption[E]) *PGGlobalReader[E]

NewPGGlobalReader creates a global reader backed by a pgxpool.

func (*PGGlobalReader[E]) LatestSequence

func (r *PGGlobalReader[E]) LatestSequence(ctx context.Context) (uint64, error)

LatestSequence returns the highest global sequence in the store, or 0 if empty.

func (*PGGlobalReader[E]) ReadFrom

func (r *PGGlobalReader[E]) ReadFrom(ctx context.Context, fromSequence uint64, limit int) ([]GlobalEvent[E], error)

ReadFrom returns events starting from the given global sequence (inclusive), up to limit.

type PGNotifier

type PGNotifier struct {
	// contains filtered or unexported fields
}

PGNotifier implements subscription.StoreNotifier using PostgreSQL LISTEN/NOTIFY. Auto-reconnects on connection loss. Safe for concurrent use.

func NewPGNotifier

func NewPGNotifier(pool *pgxpool.Pool, channel string) *PGNotifier

NewPGNotifier creates a PGNotifier that listens on the given channel. Call Start() to begin listening, or pass to subscription as a StoreNotifier.

func (*PGNotifier) Close

func (n *PGNotifier) Close() error

Close stops the notifier and closes all listener channels.

func (*PGNotifier) Notify

func (n *PGNotifier) Notify(ctx context.Context) <-chan uint64

Notify returns a channel that receives the latest global sequence when new events are appended. Implements subscription.StoreNotifier.
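A sketch of the consumer side, given an existing *pgxpool.Pool:

```go
notifier := pgstore.NewPGNotifier(pool, pgstore.DefaultChannel)
go notifier.Start(ctx)
for seq := range notifier.Notify(ctx) {
	// Events exist up to global sequence seq; read them now instead of polling.
	_ = seq
}
```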

func (*PGNotifier) Start

func (n *PGNotifier) Start(ctx context.Context)

Start begins listening for NOTIFY messages. Blocks until ctx is cancelled or Close is called. Auto-reconnects on connection loss.

func (*PGNotifier) Wait

func (n *PGNotifier) Wait()

Wait blocks until the notifier's listen loop has fully stopped.

type PartitionManager

type PartitionManager struct {
	// contains filtered or unexported fields
}

PartitionManager manages competing partition assignment using PostgreSQL. Instances claim partitions with SELECT ... FOR UPDATE SKIP LOCKED and renew leases periodically. Dead instances' partitions are auto-reclaimed.

func NewPartitionManager

func NewPartitionManager(ctx context.Context, pool *pgxpool.Pool, instanceID string, opts ...PartitionOption) (*PartitionManager, error)

NewPartitionManager creates a partition manager. Runs migration to create the partitions table and seed initial partition rows.

func (*PartitionManager) Close

func (pm *PartitionManager) Close() error

Close stops the partition manager and releases all partitions.

func (*PartitionManager) OwnedPartitions

func (pm *PartitionManager) OwnedPartitions() []int

OwnedPartitions returns the partitions currently owned by this instance.

func (*PartitionManager) OwnsPartition

func (pm *PartitionManager) OwnsPartition(partitionID int) bool

OwnsPartition returns true if this instance owns the given partition.

func (*PartitionManager) PartitionForStream

func (pm *PartitionManager) PartitionForStream(streamID string) int

PartitionForStream maps a stream ID to a partition using consistent hashing.
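A sketch of routing work by partition ownership; streamID is a placeholder:

```go
pm, err := pgstore.NewPartitionManager(ctx, pool, "instance-a")
if err != nil {
	return err
}
go pm.Start(ctx)

p := pm.PartitionForStream(streamID)
if pm.OwnsPartition(p) {
	// This instance holds the lease for p: process the event here.
}
```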

func (*PartitionManager) Start

func (pm *PartitionManager) Start(ctx context.Context)

Start begins claiming and renewing partitions. Blocks until ctx is cancelled or Close.

func (*PartitionManager) Wait

func (pm *PartitionManager) Wait()

Wait blocks until the partition manager has stopped.

type PartitionOption

type PartitionOption func(*PartitionManager)

PartitionOption configures a PartitionManager.

func WithLeaseTimeout

func WithLeaseTimeout(d time.Duration) PartitionOption

WithLeaseTimeout sets the lease timeout.

func WithPartitionCount

func WithPartitionCount(n int) PartitionOption

WithPartitionCount sets the number of partitions.

func WithRenewInterval

func WithRenewInterval(d time.Duration) PartitionOption

WithRenewInterval sets the lease renewal interval.

type PgLockRegistry

type PgLockRegistry struct {
	// contains filtered or unexported fields
}

PgLockRegistry is a distributed LockRegistry backed by PostgreSQL advisory locks. Uses pg_advisory_lock which is session-scoped and automatically released on disconnect.

All locks share a single dedicated connection. PostgreSQL advisory locks are session-scoped — one connection can hold thousands of locks simultaneously. An earlier implementation used one connection per lock, which could exhaust the pool when many subscriptions/automations were configured.

NOTE: CockroachDB does not support advisory locks. For CockroachDB, use natslock.NATSLockRegistry (from git.nullsoft.is/ash/eskit/natslock) or implement a table-based lock with CAS semantics.

func NewPgLockRegistry

func NewPgLockRegistry(pool *pgxpool.Pool) *PgLockRegistry

NewPgLockRegistry creates a PostgreSQL advisory lock registry.

func (*PgLockRegistry) Acquire

func (r *PgLockRegistry) Acquire(ctx context.Context, streamID string) (func(), error)

Acquire obtains an advisory lock for the given stream ID. Blocks until acquired. Multiple locks share a single connection. Call the returned function to release.

Returns an error if this registry already holds the lock (prevents reentrant acquisition, which would silently succeed in PG since advisory locks are reentrant within the same session).
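Typical usage, releasing via the returned function:

```go
locks := pgstore.NewPgLockRegistry(pool)
release, err := locks.Acquire(ctx, "order-1")
if err != nil {
	return err
}
defer release() // release explicitly; don't rely on disconnect cleanup
```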

func (*PgLockRegistry) Close

func (r *PgLockRegistry) Close()

Close releases the shared connection and all held locks. Locks are automatically released when the connection closes.

func (*PgLockRegistry) TryAcquire

func (r *PgLockRegistry) TryAcquire(streamID string) (func(), bool)

TryAcquire attempts to obtain the advisory lock without blocking.

type PoolConfig

type PoolConfig struct {
	// MaxConns is the maximum number of connections in the pool.
	// Default: max(4, runtime.NumCPU() * 2) — scales with available CPUs.
	// PgLockRegistry multiplexes all advisory locks over one dedicated
	// connection, so MaxConns needs headroom for only that single connection.
	MaxConns int32

	// MinConns is the minimum number of idle connections kept alive.
	// Default: 2 — avoids cold-start latency on first queries.
	MinConns int32

	// MaxConnLifetime is the maximum time a connection stays open.
	// Default: 30 minutes — recycles connections to pick up Postgres config
	// changes and prevent stale connections after network events.
	// Set longer (1h+) if advisory lock connections must not recycle.
	MaxConnLifetime time.Duration

	// MaxConnIdleTime is how long an idle connection stays in the pool.
	// Default: 5 minutes — returns resources to Postgres when idle.
	MaxConnIdleTime time.Duration

	// HealthCheckPeriod is how often idle connections are health-checked.
	// Default: 30 seconds — detects dead connections before they're used.
	// Critical for advisory locks: a dead connection = lost lock = 30s failover.
	HealthCheckPeriod time.Duration

	// Schema sets the search_path for all connections in this pool.
	// Empty string means no override (uses Postgres default, usually "public").
	Schema string
}

PoolConfig provides production-tuned defaults for pgxpool connection pools. All values can be overridden via PoolOption functions.

Defaults are tuned for event sourcing workloads:

  • High read throughput (subscriptions polling, projections reading)
  • Bursty writes (append batches)
  • Long-held connections (advisory locks for leader election)
  • Connection recycling (prevent stale connections after Postgres restart)

func DefaultPoolConfig

func DefaultPoolConfig() PoolConfig

DefaultPoolConfig returns production-tuned defaults.

type PoolOption

type PoolOption func(*PoolConfig)

PoolOption configures a PoolConfig.

func WithHealthCheckPeriod

func WithHealthCheckPeriod(d time.Duration) PoolOption

WithHealthCheckPeriod sets how often idle connections are checked.

func WithMaxConnIdleTime

func WithMaxConnIdleTime(d time.Duration) PoolOption

WithMaxConnIdleTime sets the maximum idle time before closing.

func WithMaxConnLifetime

func WithMaxConnLifetime(d time.Duration) PoolOption

WithMaxConnLifetime sets the maximum connection lifetime.

func WithMaxConns

func WithMaxConns(n int32) PoolOption

WithMaxConns sets the maximum number of connections.

func WithMinConns

func WithMinConns(n int32) PoolOption

WithMinConns sets the minimum idle connections.

func WithSchema

func WithSchema(schema string) PoolOption

WithSchema sets the search_path for schema separation. See EnsureSchema and docs/schema-separation.md.

type SnapshotStore

type SnapshotStore[S any] struct {
	// contains filtered or unexported fields
}

SnapshotStore is a PostgreSQL-backed snapshot store. Implements eskit.SnapshotStore with schema versioning and timestamps.

func NewSnapshotStore

func NewSnapshotStore[S any](ctx context.Context, pool *pgxpool.Pool) (*SnapshotStore[S], error)

NewSnapshotStore creates a PostgreSQL-backed snapshot store. Runs migration on creation.

func (*SnapshotStore[S]) Invalidate

func (s *SnapshotStore[S]) Invalidate(ctx context.Context, streamID string) error

Invalidate deletes the snapshot for a single stream.

func (*SnapshotStore[S]) InvalidateAll

func (s *SnapshotStore[S]) InvalidateAll(ctx context.Context) error

InvalidateAll deletes all snapshots.

func (*SnapshotStore[S]) LoadSnapshot

func (s *SnapshotStore[S]) LoadSnapshot(ctx context.Context, streamID string) (*eskit.Snapshot[S], error)

LoadSnapshot loads the latest snapshot for a stream. Returns nil, nil if not found.

func (*SnapshotStore[S]) SaveSnapshot

func (s *SnapshotStore[S]) SaveSnapshot(ctx context.Context, snapshot eskit.Snapshot[S]) error

SaveSnapshot persists a snapshot of decider state. Upserts on stream_id.
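A load-side sketch with a placeholder state type OrderState:

```go
snaps, err := pgstore.NewSnapshotStore[OrderState](ctx, pool)
if err != nil {
	return err
}
snap, err := snaps.LoadSnapshot(ctx, "order-1")
if err == nil && snap == nil {
	// No snapshot yet: replay the stream from version 0 instead.
}
```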

type Store

type Store[E any] struct {
	// contains filtered or unexported fields
}

Store is a PostgreSQL-backed event store using pgxpool for connection pooling. Events are serialized as JSONB for queryability and debuggability.

func New

func New[E any](ctx context.Context, connString string, opts ...StoreOption) (*Store[E], error)

New creates a new PostgreSQL event store from a connection string. It also runs the migration to ensure the events table exists.

func NewFromPool

func NewFromPool[E any](ctx context.Context, pool *pgxpool.Pool, opts ...StoreOption) (*Store[E], error)

NewFromPool wraps an existing pgxpool.Pool. Runs migration on creation.
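A minimal round-trip sketch; OrderEvent, placed, and the expectedVersion convention for a new stream are assumptions, not guarantees of this API:

```go
store, err := pgstore.NewFromPool[OrderEvent](ctx, pool)
if err != nil {
	return err
}
// expectedVersion 0: assumed convention for a not-yet-existing stream.
appended, err := store.Append(ctx, "order-1", 0, []OrderEvent{placed})
if err != nil {
	// A version mismatch surfaces here as a concurrency error.
	return err
}
_ = appended
events, _ := store.Load(ctx, "order-1")
_ = events
```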

func (*Store[E]) Append

func (s *Store[E]) Append(ctx context.Context, streamID string, expectedVersion int, events []E, metadata ...eskit.Metadata) ([]eskit.Event[E], error)

Append persists events using transaction-based optimistic concurrency; expectedVersion must match the stream's current version or the append fails.

func (*Store[E]) AppendWithOptions

func (s *Store[E]) AppendWithOptions(ctx context.Context, streamID string, expectedVersion int, events []E, opts eskit.AppendOptions) ([]eskit.Event[E], error)

AppendWithOptions persists events with idempotency and custom timestamp support.

func (*Store[E]) Archive

func (s *Store[E]) Archive(ctx context.Context, streamID string, target eskit.EventStore[E]) error

Archive moves a stream to the target store and tombstones the primary.

func (*Store[E]) ArchiveStream

func (s *Store[E]) ArchiveStream(ctx context.Context, streamID string) error

ArchiveStream marks a stream as archived. Future appends are rejected.

func (*Store[E]) Close

func (s *Store[E]) Close() error

Close closes the underlying connection pool.

func (*Store[E]) Delete

func (s *Store[E]) Delete(ctx context.Context, streamID string) error

Delete permanently removes all events for a stream. Returns ErrStreamNotFound if stream does not exist.

func (*Store[E]) DeleteStream

func (s *Store[E]) DeleteStream(ctx context.Context, streamID string) error

DeleteStream permanently removes all events in a stream.

func (*Store[E]) IsTombstoned

func (s *Store[E]) IsTombstoned(ctx context.Context, streamID string) (*eskit.Tombstone, error)

IsTombstoned checks if a stream has been tombstoned.

func (*Store[E]) LatestSequence

func (s *Store[E]) LatestSequence(ctx context.Context) (uint64, error)

LatestSequence returns the highest global sequence in the store, or 0 if empty.

func (*Store[E]) Load

func (s *Store[E]) Load(ctx context.Context, streamID string) ([]eskit.Event[E], error)

Load returns all events for a stream in version order.

func (*Store[E]) LoadFrom

func (s *Store[E]) LoadFrom(ctx context.Context, streamID string, fromVersion int) ([]eskit.Event[E], error)

LoadFrom returns the stream's events starting from fromVersion.

func (*Store[E]) LoadRaw

func (s *Store[E]) LoadRaw(ctx context.Context, streamID string) ([]*eskit.RawEvent, error)

LoadRaw loads events without deserializing the Data field.

func (*Store[E]) LoadRawWithOptions

func (s *Store[E]) LoadRawWithOptions(ctx context.Context, streamID string, opts eskit.LoadOptions) ([]*eskit.RawEvent, error)

LoadRawWithOptions loads raw events with optional filtering.

func (*Store[E]) LoadWithOptions

func (s *Store[E]) LoadWithOptions(ctx context.Context, streamID string, opts eskit.LoadOptions) ([]eskit.Event[E], error)

LoadWithOptions loads events with server-side filtering (event types, version range, limit).

func (*Store[E]) Pool

func (s *Store[E]) Pool() *pgxpool.Pool

Pool returns the underlying connection pool for advanced use cases (e.g. creating a PGNotifier or PGGlobalReader).

func (*Store[E]) ReadFrom

func (s *Store[E]) ReadFrom(ctx context.Context, fromSequence uint64, limit int) ([]eskit.Event[E], error)

ReadFrom reads events across all streams by global sequence. Implements subscription.StoreReader for direct use with StoreAdapter.

func (*Store[E]) ReadFromWithOptions

func (s *Store[E]) ReadFromWithOptions(ctx context.Context, fromSequence uint64, limit int, opts eskit.LoadOptions) ([]eskit.Event[E], error)

ReadFromWithOptions reads global events with optional event type filtering.

func (*Store[E]) Restore

func (s *Store[E]) Restore(ctx context.Context, streamID string, source eskit.EventStore[E]) error

Restore moves an archived stream back from the source store.

func (*Store[E]) RestoreStream

func (s *Store[E]) RestoreStream(ctx context.Context, streamID string) error

RestoreStream brings an archived stream back to active state.

func (*Store[E]) StreamStatus

func (s *Store[E]) StreamStatus(ctx context.Context, streamID string) (eskit.StreamState, error)

StreamStatus returns the current lifecycle state of a stream.

func (*Store[E]) Tombstone

func (s *Store[E]) Tombstone(ctx context.Context, streamID string, reason string) error

Tombstone marks a stream as deleted. Future Append calls return ErrStreamDeleted.

func (*Store[E]) TombstoneStream

func (s *Store[E]) TombstoneStream(ctx context.Context, streamID string) error

TombstoneStream marks a stream as deleted. Future appends are rejected.

func (*Store[E]) Truncate

func (s *Store[E]) Truncate(ctx context.Context) error

Truncate removes all events from the store. Intended for test isolation.

type StoreOption

type StoreOption func(*storeOptions)

StoreOption configures a Store.

func WithNotifyChannel

func WithNotifyChannel(channel string) StoreOption

WithNotifyChannel enables NOTIFY after each Append with the given channel name. Use with PGNotifier for instant subscription wake-up.

func WithPGCodec

func WithPGCodec(c codec.Codec) StoreOption

WithPGCodec sets a custom codec for event serialization and registers it for reads. By default, events are serialized as JSON using encoding/json. For multi-codec migration, use WithPGWriteCodec and WithPGCodecRegistry instead.

func WithPGCodecRegistry

func WithPGCodecRegistry(r *codec.Registry) StoreOption

WithPGCodecRegistry sets the registry used to look up codecs when reading events. If not set, a default registry with JSON, JSONiter, and CBOR is used.

func WithPGRegistry

func WithPGRegistry(reg *eskit.EventRegistry) StoreOption

WithPGRegistry enables type registry for heterogeneous event deserialization.

func WithPGUpcasters

func WithPGUpcasters(u *eskit.UpcasterRegistry) StoreOption

WithPGUpcasters enables event upcasting for schema evolution during Load.

func WithPGWriteCodec

func WithPGWriteCodec(c codec.Codec) StoreOption

WithPGWriteCodec sets the codec used for writing NEW events. Existing events are read using the codec stored in each event's codec column.

type SubscriptionAdapter

type SubscriptionAdapter[E any] struct {
	// contains filtered or unexported fields
}

SubscriptionAdapter adapts PGGlobalReader into a subscription.GlobalReader. This bridges the type gap between pgstore.GlobalEvent and subscription.GlobalEvent.

func NewSubscriptionAdapter

func NewSubscriptionAdapter[E any](reader *PGGlobalReader[E]) *SubscriptionAdapter[E]

NewSubscriptionAdapter wraps a PGGlobalReader for use with the subscription system.
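Bridging a reader into the subscription system; OrderEvent is a placeholder:

```go
reader := pgstore.NewPGGlobalReader[OrderEvent](pool)
adapter := pgstore.NewSubscriptionAdapter(reader)
// adapter now satisfies subscription.GlobalReader.
```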

func (*SubscriptionAdapter[E]) LatestSequence

func (a *SubscriptionAdapter[E]) LatestSequence(ctx context.Context) (uint64, error)

LatestSequence delegates to the underlying reader.

func (*SubscriptionAdapter[E]) ReadFrom

func (a *SubscriptionAdapter[E]) ReadFrom(ctx context.Context, fromSequence uint64, limit int) ([]subscription.GlobalEvent[E], error)

ReadFrom reads events and converts pgstore.GlobalEvent to subscription.GlobalEvent.
