Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
var CloseEvent = Event{Type: "close"}
CloseEvent is a sentinel Event value that the SSE handler sends when the server is shutting down (ctx cancelled). Signals clients to reconnect later.
Functions ¶
This section is empty.
Types ¶
type Event ¶
type Event struct {
// Channel is the logical event channel (e.g., "products", "refresh").
Channel string
// Payload is the raw JSON payload from the NOTIFY message, already extracted
// from the outer notifyMessage envelope. May be nil for control events.
Payload []byte
// Type is the SSE event type. Empty means a standard data event. "close" and
// "refresh" are sentinel values recognised by the SSE HTTP handler.
Type string
}
Event represents a notification received from PostgreSQL LISTEN/NOTIFY and fan-out to SSE subscribers.
The Type field controls SSE event framing:
- Empty string (zero value): data-only event (standard SSE data frame).
- "close": signals graceful server shutdown; the SSE handler should write "event: close\ndata: {}\n\n" and terminate the connection (SSE-03).
- "refresh": backpressure signal indicating the subscriber's buffer was full and events were dropped; the client should reload the resource state (SSE-05).
type Executor ¶
type Executor interface {
Exec(ctx context.Context, sql string, args ...any) (pgconn.CommandTag, error)
}
Executor is the minimal interface required by PostgresHub.Publish. It is satisfied by *pgx.Conn, *pgxpool.Pool, and pgx.Tx — any type that can execute SQL. The notify package does not import the generated actions package to avoid circular dependencies.
type NotifyHub ¶
type NotifyHub interface {
// Subscribe registers a subscriber for events on the given channel scoped
// to the given tenant. Returns a Subscription whose Events channel receives
// fan-out notifications. Call Subscription.Close to unsubscribe.
Subscribe(channel string, tenantID uuid.UUID) *Subscription
// Publish sends a pg_notify payload to all subscribers on the given channel
// scoped to the given tenant. The payload must be < 8000 bytes (PostgreSQL limit).
Publish(ctx context.Context, channel string, tenantID uuid.UUID, payload []byte) error
// Start begins listening for PostgreSQL notifications. It blocks until ctx is
// cancelled (SSE-03: graceful shutdown via context propagation). Any error
// returned indicates a fatal configuration problem; reconnection errors are
// handled internally by pgxlisten.
Start(ctx context.Context) error
}
NotifyHub is the interface for publishing and subscribing to real-time events via PostgreSQL LISTEN/NOTIFY. The interface allows swapping the backend to Redis, NATS, or any other pub/sub system (SSE-06).
type PostgresHub ¶
type PostgresHub struct {
// contains filtered or unexported fields
}
PostgresHub implements NotifyHub using a single dedicated PostgreSQL LISTEN connection (via pgxlisten) that fans out events to per-subscriber buffered Go channels.
One connection handles all channels — routing is done in the payload JSON. This avoids per-client LISTEN connections which do not scale (SSE-04).
func NewPostgresHub ¶
func NewPostgresHub(connConfig *pgx.ConnConfig, db Executor, bufferSize int) *PostgresHub
NewPostgresHub creates a PostgresHub.
- connConfig: pgx connection config for the dedicated LISTEN connection. Must be a separate connection from the application pool — LISTEN state is connection-scoped and cannot be shared with query traffic (SSE-04).
- db: Executor for pg_notify calls in Publish (e.g., the application pool).
- bufferSize: per-subscriber channel capacity. Defaults to 32 when 0.
func (*PostgresHub) Publish ¶
func (h *PostgresHub) Publish(ctx context.Context, channel string, tenantID uuid.UUID, payload []byte) error
Publish sends a pg_notify message on the "forge_events" PostgreSQL channel. The payload is wrapped in a JSON envelope with channel and tenantID for in-process routing. Returns an error if the marshaled payload exceeds the 8000-byte PostgreSQL NOTIFY limit.
func (*PostgresHub) Start ¶
func (h *PostgresHub) Start(ctx context.Context) error
Start begins listening on the "forge_events" PostgreSQL NOTIFY channel. It blocks until ctx is cancelled, enabling graceful shutdown (SSE-03). Non-fatal errors (e.g., transient connection failures) are logged; pgxlisten handles reconnection automatically with the configured ReconnectDelay.
func (*PostgresHub) Subscribe ¶
func (h *PostgresHub) Subscribe(channel string, tenantID uuid.UUID) *Subscription
Subscribe registers a new subscriber for events on the given channel scoped to the given tenant. Returns a Subscription with a buffered Events channel. The subscription is active immediately; any in-flight pg_notify dispatches that arrive after Subscribe returns will be delivered to the channel.
type Subscription ¶
type Subscription struct {
// Events is the read-only channel the consumer reads events from.
Events <-chan Event
// contains filtered or unexported fields
}
Subscription is the consumer-facing handle returned by NotifyHub.Subscribe. Events is a read-only channel; call Close to unsubscribe and release resources.
func (*Subscription) Close ¶
func (s *Subscription) Close()
Close unsubscribes from the hub and drains/closes the underlying channel. Calling Close more than once is safe.