notify

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 23, 2026 License: MIT Imports: 11 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var CloseEvent = Event{Type: "close"}

CloseEvent is a sentinel Event value that the SSE handler sends when the server is shutting down (ctx cancelled). Signals clients to reconnect later.

Functions

This section is empty.

Types

type Event

type Event struct {
	// Channel is the logical event channel (e.g., "products", "refresh").
	Channel string

	// Payload is the raw JSON payload from the NOTIFY message, already extracted
	// from the outer notifyMessage envelope. May be nil for control events.
	Payload []byte

	// Type is the SSE event type. Empty means a standard data event. "close" and
	// "refresh" are sentinel values recognised by the SSE HTTP handler.
	Type string
}

Event represents a notification received from PostgreSQL LISTEN/NOTIFY and fan-out to SSE subscribers.

The Type field controls SSE event framing:

  • Empty string (zero value): data-only event (standard SSE data frame).
  • "close": signals graceful server shutdown; the SSE handler should write "event: close\ndata: {}\n\n" and terminate the connection (SSE-03).
  • "refresh": backpressure signal indicating the subscriber's buffer was full and events were dropped; the client should reload the resource state (SSE-05).

type Executor

type Executor interface {
	Exec(ctx context.Context, sql string, args ...any) (pgconn.CommandTag, error)
}

Executor is the minimal interface required by PostgresHub.Publish. It is satisfied by *pgx.Conn, *pgxpool.Pool, and pgx.Tx — any type that can execute SQL. The notify package does not import the generated actions package to avoid circular dependencies.

type NotifyHub

type NotifyHub interface {
	// Subscribe registers a subscriber for events on the given channel scoped
	// to the given tenant. Returns a Subscription whose Events channel receives
	// fan-out notifications. Call Subscription.Close to unsubscribe.
	Subscribe(channel string, tenantID uuid.UUID) *Subscription

	// Publish sends a pg_notify payload to all subscribers on the given channel
	// scoped to the given tenant. The payload must be < 8000 bytes (PostgreSQL limit).
	Publish(ctx context.Context, channel string, tenantID uuid.UUID, payload []byte) error

	// Start begins listening for PostgreSQL notifications. It blocks until ctx is
	// cancelled (SSE-03: graceful shutdown via context propagation). Any error
	// returned indicates a fatal configuration problem; reconnection errors are
	// handled internally by pgxlisten.
	Start(ctx context.Context) error
}

NotifyHub is the interface for publishing and subscribing to real-time events via PostgreSQL LISTEN/NOTIFY. The interface allows swapping the backend to Redis, NATS, or any other pub/sub system (SSE-06).

type PostgresHub

type PostgresHub struct {
	// contains filtered or unexported fields
}

PostgresHub implements NotifyHub using a single dedicated PostgreSQL LISTEN connection (via pgxlisten) that fans out events to per-subscriber buffered Go channels.

One connection handles all channels — routing is done in the payload JSON. This avoids per-client LISTEN connections which do not scale (SSE-04).

func NewPostgresHub

func NewPostgresHub(connConfig *pgx.ConnConfig, db Executor, bufferSize int) *PostgresHub

NewPostgresHub creates a PostgresHub.

  • connConfig: pgx connection config for the dedicated LISTEN connection. Must be a separate connection from the application pool — LISTEN state is connection-scoped and cannot be shared with query traffic (SSE-04).
  • db: Executor for pg_notify calls in Publish (e.g., the application pool).
  • bufferSize: per-subscriber channel capacity. Defaults to 32 when 0.

func (*PostgresHub) Publish

func (h *PostgresHub) Publish(ctx context.Context, channel string, tenantID uuid.UUID, payload []byte) error

Publish sends a pg_notify message on the "forge_events" PostgreSQL channel. The payload is wrapped in a JSON envelope with channel and tenantID for in-process routing. Returns an error if the marshaled payload exceeds the 8000-byte PostgreSQL NOTIFY limit.

func (*PostgresHub) Start

func (h *PostgresHub) Start(ctx context.Context) error

Start begins listening on the "forge_events" PostgreSQL NOTIFY channel. It blocks until ctx is cancelled, enabling graceful shutdown (SSE-03). Non-fatal errors (e.g., transient connection failures) are logged; pgxlisten handles reconnection automatically with the configured ReconnectDelay.

func (*PostgresHub) Subscribe

func (h *PostgresHub) Subscribe(channel string, tenantID uuid.UUID) *Subscription

Subscribe registers a new subscriber for events on the given channel scoped to the given tenant. Returns a Subscription with a buffered Events channel. The subscription is active immediately; any in-flight pg_notify dispatches that arrive after Subscribe returns will be delivered to the channel.

type Subscription

type Subscription struct {
	// Events is the read-only channel the consumer reads events from.
	Events <-chan Event
	// contains filtered or unexported fields
}

Subscription is the consumer-facing handle returned by NotifyHub.Subscribe. Events is a read-only channel; call Close to unsubscribe and release resources.

func (*Subscription) Close

func (s *Subscription) Close()

Close unsubscribes from the hub and drains/closes the underlying channel. Calling Close more than once is safe.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL