gorums

v0.11.0
Published: Jan 4, 2026 License: MIT Imports: 39 Imported by: 35

README

Gorums

Gorums [1] is a novel framework for building fault-tolerant distributed systems. Gorums offers a flexible and simple quorum call abstraction, used to communicate with a set of processes, and to collect and process their responses. Gorums provides separate abstractions for (a) selecting processes for a quorum call and (b) processing replies. These abstractions simplify the main control flow of protocol implementations, especially for quorum-based systems, where only a subset of the replies to a quorum call need to be processed.

Gorums uses code generation to produce an RPC library that clients can use to invoke quorum calls. Gorums is a wrapper around the gRPC library. Services are defined using the protocol buffers interface definition language.

System Requirements

To build and deploy Gorums, you need the following software installed:

  • Protobuf compiler (protoc)
  • Make
  • Ansible (used by benchmark script)

Contributors Guide

We value your contributions. Before starting a contribution, please reach out to us by posting on an existing issue or creating a new one. Students and other contributors are encouraged to follow these guidelines:

  • We recommend using VSCode with the following plugins
    • Go plugin with the
      • gopls language server enabled
      • golangci-lint enabled
    • Code Spell Checker
    • markdownlint
    • vscode-proto3
  • Code should regularly be merged into master through pull requests.

Examples

The original EPaxos implementation modified to use Gorums can be found here.

A collection of different algorithms for reconfigurable atomic storage implemented using Gorums can be found here.

Publications

[1] Tormod Erevik Lea, Leander Jehl, and Hein Meling. Towards New Abstractions for Implementing Quorum-based Systems. In 37th International Conference on Distributed Computing Systems (ICDCS), Jun 2017.

[2] Sebastian Pedersen, Hein Meling, and Leander Jehl. An Analysis of Quorum-based Abstractions: A Case Study using Gorums to Implement Raft. In Proceedings of the 2018 Workshop on Advanced Tools, Programming Languages, and PLatforms for Implementing and Evaluating Algorithms for Distributed systems.

Authors

  • Hein Meling
  • John Ingve Olsen
  • Tormod Erevik Lea
  • Leander Jehl

Documentation

Overview

Package gorums provides protobuf options for gRPC-based quorum calls.

Constants

const (
	// MaxVersion is the maximum supported version for generated .pb.go files.
	// It is always the current version of the module.
	MaxVersion = version.Minor

	// GenVersion is the runtime version required by generated .pb.go files.
	// This is incremented when generated code relies on new functionality
	// in the runtime.
	GenVersion = 11

	// MinVersion is the minimum supported version for generated .pb.go files.
	// This is incremented when the runtime drops support for old code.
	MinVersion = 11
)
const ContentSubtype = "gorums"

ContentSubtype is the subtype used by gorums when sending messages via gRPC.

const LevelNotSet = -1

LevelNotSet is the zero value level used to indicate that no level (and thereby no reply) has been set for a correctable quorum call.

Variables

var (
	// call types
	//
	// optional bool rpc = 50001;
	E_Rpc = &file_gorums_proto_extTypes[0]
	// optional bool unicast = 50002;
	E_Unicast = &file_gorums_proto_extTypes[1]
	// optional bool multicast = 50003;
	E_Multicast = &file_gorums_proto_extTypes[2]
	// optional bool quorumcall = 50004;
	E_Quorumcall = &file_gorums_proto_extTypes[3]
)

Extension fields to descriptorpb.MethodOptions.

var ErrIncomplete = errors.New("incomplete call")

ErrIncomplete is the error returned by a quorum call when the call cannot be completed due to insufficient non-error replies to form a quorum according to the quorum function.

var ErrSendFailure = errors.New("send failure")

ErrSendFailure is the error returned by a multicast call when message sending fails for one or more nodes.

var ErrTypeMismatch = errors.New("response type mismatch")

ErrTypeMismatch is returned when a response cannot be cast to the expected type.

var File_gorums_proto protoreflect.FileDescriptor
var ID = func(n1, n2 *Node) bool {
	return n1.id < n2.id
}

ID sorts nodes by their identifier in increasing order.

var LastNodeError = func(n1, n2 *Node) bool {
	if n1.channel.lastErr() != nil && n2.channel.lastErr() == nil {
		return false
	}
	return true
}

LastNodeError sorts nodes by their LastErr() status in increasing order. A node with LastErr() != nil is larger than a node with LastErr() == nil.

var Port = func(n1, n2 *Node) bool {
	p1, _ := strconv.Atoi(n1.Port())
	p2, _ := strconv.Atoi(n2.Port())
	return p1 < p2
}

Port sorts nodes by their port number in increasing order. Warning: This function may be removed in the future.

Functions

func AsProto added in v0.10.0

func AsProto[T proto.Message](msg *Message) T

AsProto returns msg's underlying protobuf message of the specified type T. If msg is nil or the contained message is not of type T, the zero value of T is returned.
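The zero-value fallback described above can be illustrated with a generic type assertion. This is a minimal sketch, not the library's implementation: envelope, asType, and readRequest are hypothetical stand-ins (envelope plays the role of Message), and the payload is held as a plain any rather than a proto.Message.

```go
package main

import "fmt"

// envelope is a hypothetical stand-in for gorums.Message.
type envelope struct{ payload any }

// readRequest is a hypothetical request type used only for this sketch.
type readRequest struct{ key string }

// asType returns the payload as T, or the zero value of T when the
// envelope is nil or holds a payload of a different type.
func asType[T any](e *envelope) T {
	var zero T
	if e == nil {
		return zero
	}
	if v, ok := e.payload.(T); ok {
		return v
	}
	return zero
}

func main() {
	e := &envelope{payload: &readRequest{key: "x"}}
	fmt.Println(asType[*readRequest](e).key)      // payload has the requested type
	fmt.Println(asType[*readRequest](nil) == nil) // nil envelope yields the zero value
	fmt.Println(asType[string](e) == "")          // type mismatch yields the zero value
}
```

Because a mismatch is reported as a zero value rather than an error, callers of a function with this contract typically check the result for nil before using it.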

func Closer added in v0.11.0

func Closer(t testing.TB, c io.Closer) func()

Closer returns a cleanup function that closes the given io.Closer.

func Multicast added in v0.11.0

func Multicast[Req proto.Message](ctx *ConfigContext, msg Req, method string, opts ...CallOption) error

Multicast is a one-way call; no replies are returned to the client.

By default, this method blocks until messages have been sent to all nodes. This ensures that send operations complete before the caller proceeds, which can be useful for observing context cancellation or for pacing message sends. If the sending fails, the error is returned to the caller.

With the IgnoreErrors call option, the method returns nil immediately after enqueueing messages to all nodes (fire-and-forget semantics).

Multicast supports request transformation interceptors via the gorums.Interceptors option. Use gorums.MapRequest to transform requests per-node.

This method should be used by generated code only.

func RPCCall added in v0.11.0

func RPCCall(ctx *NodeContext, msg proto.Message, method string) (proto.Message, error)

RPCCall executes a remote procedure call on the node.

This method should be used by generated code only.

func Range added in v0.11.0

func Range(n int) iter.Seq[int]

func TestContext added in v0.11.0

func TestContext(t testing.TB, timeout time.Duration) context.Context

TestContext creates a context with timeout for testing. It uses t.Context() as the parent and automatically cancels on cleanup.

func TestServers added in v0.11.0

func TestServers(t testing.TB, numServers int, srvFn func(i int) ServerIface) []string

TestServers starts numServers gRPC servers using the given registration function. Servers are automatically stopped when the test finishes via t.Cleanup. The cleanup is registered first, so it runs after any subsequently registered cleanups (e.g., manager.Close()), ensuring proper shutdown ordering.

Goroutine leak detection via goleak is automatically enabled and runs after all other cleanup functions complete.

The provided srvFn is used to create and register the server handlers. If srvFn is nil, a default mock server implementation is used.

Example usage:

addrs := gorums.TestServers(t, 3, serverFn)
mgr := gorums.NewManager(gorums.InsecureDialOptions(t))
t.Cleanup(gorums.Closer(t, mgr))
...

This function can be used by other packages for testing purposes, as long as the required service, method, and message types are registered in the global protobuf registry before calling this function.

func Unicast added in v0.11.0

func Unicast[Req proto.Message](ctx *NodeContext, req Req, method string, opts ...CallOption) error

Unicast is a one-way call; no replies are returned to the client.

By default, this method blocks until the message has been sent to the node. This ensures that send operations complete before the caller proceeds, which can be useful for observing context cancellation or for pacing message sends. If the sending fails, the error is returned to the caller.

With the IgnoreErrors call option, the method returns nil immediately after enqueueing the message (fire-and-forget semantics).

This method should be used by generated code only.

Types

type Async added in v0.3.0

type Async[Resp any] struct {
	// contains filtered or unexported fields
}

Async is a generic future type for asynchronous quorum calls. It encapsulates the state of an asynchronous call and provides methods for checking the status or waiting for completion.

Type parameter Resp is the response type from nodes.

func (*Async[Resp]) Done added in v0.3.0

func (f *Async[Resp]) Done() bool

Done reports if a reply and/or error is available for the called method.

func (*Async[Resp]) Get added in v0.3.0

func (f *Async[Resp]) Get() (Resp, error)

Get returns the reply and any error associated with the called method. The method blocks until a reply or error is available.
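The Done and Get semantics follow the usual future pattern. The sketch below shows one common way to build such a type on top of a channel; future and newFuture are hypothetical names, and nothing here is taken from the Gorums source.

```go
package main

import "fmt"

// future is a hypothetical, simplified stand-in for gorums.Async.
type future[Resp any] struct {
	reply Resp
	err   error
	done  chan struct{} // closed once reply and err have been set
}

// newFuture runs call in a goroutine and returns a future for its result.
func newFuture[Resp any](call func() (Resp, error)) *future[Resp] {
	f := &future[Resp]{done: make(chan struct{})}
	go func() {
		f.reply, f.err = call()
		close(f.done)
	}()
	return f
}

// Done reports, without blocking, whether a result is available.
func (f *future[Resp]) Done() bool {
	select {
	case <-f.done:
		return true
	default:
		return false
	}
}

// Get blocks until the result is available and then returns it.
func (f *future[Resp]) Get() (Resp, error) {
	<-f.done
	return f.reply, f.err
}

func main() {
	f := newFuture(func() (string, error) { return "value", nil })
	reply, err := f.Get()
	fmt.Println(reply, err, f.Done())
}
```

Closing the channel after the fields are written is what makes the reads in Get safe without an additional lock.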

type CallOption added in v0.3.0

type CallOption func(*callOptions)

CallOption is a function that sets a value in the given callOptions struct.

func IgnoreErrors added in v0.11.0

func IgnoreErrors() CallOption

IgnoreErrors ignores send errors from Unicast or Multicast methods and returns immediately instead of blocking until the message has been sent. By default, Unicast and Multicast methods return an error if the message could not be sent or the context was canceled.
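CallOption and IgnoreErrors follow the functional options pattern. As a rough illustration, the sketch below shows how such options are typically defined and applied; the field name ignoreErrors and the helper applyOptions are assumptions made for this example, since the real callOptions struct is unexported and not shown here.

```go
package main

import "fmt"

// callOptions is a hypothetical options struct; the real one is unexported.
type callOptions struct {
	ignoreErrors bool
}

// callOption mirrors the shape of gorums.CallOption.
type callOption func(*callOptions)

// ignoreErrors returns an option that turns on fire-and-forget behavior.
func ignoreErrors() callOption {
	return func(o *callOptions) { o.ignoreErrors = true }
}

// applyOptions starts from the defaults and applies each option in order.
func applyOptions(opts ...callOption) callOptions {
	var o callOptions
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	fmt.Println(applyOptions().ignoreErrors)               // default: report send errors
	fmt.Println(applyOptions(ignoreErrors()).ignoreErrors) // option applied
}
```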

func Interceptors added in v0.11.0

func Interceptors[Req, Resp proto.Message](interceptors ...QuorumInterceptor[Req, Resp]) CallOption

Interceptors returns a CallOption that adds quorum call interceptors. Interceptors are executed in the order provided, modifying the Responses object before the user calls a terminal method.

Example:

resp, err := ReadQC(ctx, req,
    gorums.Interceptors(loggingInterceptor, filterInterceptor),
).Majority()

type ClientCtx added in v0.11.0

type ClientCtx[Req, Resp msg] struct {
	context.Context
	// contains filtered or unexported fields
}

ClientCtx provides context and access to the quorum call state for interceptors. It exposes the request, configuration, metadata about the call, and the response iterator.

func (*ClientCtx[Req, Resp]) Config added in v0.11.0

func (c *ClientCtx[Req, Resp]) Config() Configuration

Config returns the configuration (set of nodes) for this quorum call.

func (*ClientCtx[Req, Resp]) Method added in v0.11.0

func (c *ClientCtx[Req, Resp]) Method() string

Method returns the name of the RPC method being called.

func (*ClientCtx[Req, Resp]) Node added in v0.11.0

func (c *ClientCtx[Req, Resp]) Node(id uint32) *Node

Node returns the node with the given ID.

func (*ClientCtx[Req, Resp]) Nodes added in v0.11.0

func (c *ClientCtx[Req, Resp]) Nodes() []*Node

Nodes returns the slice of nodes in this configuration.

func (*ClientCtx[Req, Resp]) Request added in v0.11.0

func (c *ClientCtx[Req, Resp]) Request() Req

Request returns the original request message for this quorum call.

func (*ClientCtx[Req, Resp]) Size added in v0.11.0

func (c *ClientCtx[Req, Resp]) Size() int

Size returns the number of nodes in this configuration.

type Codec added in v0.3.0

type Codec struct {
	// contains filtered or unexported fields
}

Codec is the gRPC codec used by gorums.

func NewCodec added in v0.3.0

func NewCodec() *Codec

NewCodec returns a new Codec.

func (Codec) Marshal added in v0.3.0

func (c Codec) Marshal(m any) (b []byte, err error)

Marshal marshals the message m into a byte slice.

func (Codec) Name added in v0.3.0

func (Codec) Name() string

Name returns the name of the Codec.

func (Codec) String added in v0.3.0

func (Codec) String() string

func (Codec) Unmarshal added in v0.3.0

func (c Codec) Unmarshal(b []byte, m any) (err error)

Unmarshal unmarshals a byte slice into m.

type ConfigContext added in v0.11.0

type ConfigContext struct {
	context.Context
	// contains filtered or unexported fields
}

ConfigContext is a context that carries a configuration for quorum calls. It embeds context.Context and provides access to the Configuration.

Use Configuration.Context to create a ConfigContext from an existing context.

func (ConfigContext) Configuration added in v0.11.0

func (c ConfigContext) Configuration() Configuration

Configuration returns the Configuration associated with this context.

type Configuration added in v0.3.0

type Configuration []*Node

Configuration represents a static set of nodes on which quorum calls may be invoked.

Mutating the configuration is not supported; instead, use NewConfiguration to create a new configuration.

func NewConfig added in v0.11.0

func NewConfig(opts ...Option) (Configuration, error)

NewConfig returns a new Configuration based on the provided gorums.Option values. It accepts exactly one gorums.NodeListOption and multiple gorums.ManagerOption values. You may use this function to create the initial configuration for a new manager.

Example:

cfg, err := NewConfig(
    gorums.WithNodeList([]string{"localhost:8080", "localhost:8081", "localhost:8082"}),
    gorums.WithDialOptions(grpc.WithTransportCredentials(insecure.NewCredentials())),
)

This is a convenience function for creating a configuration without explicitly creating a manager first. The manager can still be accessed using the Configuration.Manager method. NewConfig should only be called once since it creates a new manager; if a manager already exists, use NewConfiguration instead, and provide the existing manager as the first argument.

func NewConfiguration added in v0.3.0

func NewConfiguration(mgr *Manager, opt NodeListOption) (nodes Configuration, err error)

NewConfiguration returns a configuration based on the provided list of nodes. Nodes can be supplied using WithNodeMap, WithNodeList, or WithNodeIDs. A new configuration can also be created from an existing configuration, using the And, WithNewNodes, Except, and WithoutNodes methods.

func TestConfiguration added in v0.11.0

func TestConfiguration(t testing.TB, numServers int, srvFn func(i int) ServerIface, opts ...TestOption) Configuration

TestConfiguration creates servers and a configuration for testing. Both server and manager cleanup are handled via t.Cleanup in the correct order: manager is closed first, then servers are stopped.

The provided srvFn is used to create and register the server handlers. If srvFn is nil, a default mock server implementation is used.

Optional TestOptions can be provided to customize the manager, server, or configuration.

By default, nodes are assigned sequential IDs (0, 1, 2, ...) matching the server creation order. This can be overridden by providing a NodeListOption.

This is the recommended way to set up tests that need both servers and a configuration. It ensures proper cleanup and detects goroutine leaks.

func (Configuration) And added in v0.4.0

And returns a NodeListOption that can be used to create a new configuration combining c and d.

func (Configuration) Context added in v0.11.0

func (cfg Configuration) Context(parent context.Context) *ConfigContext

Context creates a new ConfigContext from the given parent context and this configuration.

Example:

config, _ := gorums.NewConfiguration(mgr, gorums.WithNodeList(addrs))
cfgCtx := config.Context(context.Background())
resp, err := paxos.Prepare(cfgCtx, req)

func (Configuration) Equal added in v0.3.0

func (c Configuration) Equal(b Configuration) bool

Equal returns true if configurations b and c have the same set of nodes.

func (Configuration) Except added in v0.4.0

Except returns a NodeListOption that can be used to create a new configuration from c without the nodes in rm.

func (Configuration) Manager added in v0.11.0

func (c Configuration) Manager() *Manager

Manager returns the Manager that manages this configuration's nodes. Returns nil if the configuration is empty.

func (Configuration) NodeIDs added in v0.3.0

func (c Configuration) NodeIDs() []uint32

NodeIDs returns a slice of this configuration's Node IDs.

func (Configuration) Nodes added in v0.3.0

func (c Configuration) Nodes() []*Node

Nodes returns the nodes in this configuration.

func (Configuration) Size added in v0.3.0

func (c Configuration) Size() int

Size returns the number of nodes in this configuration.

func (Configuration) WithNewNodes added in v0.4.0

func (c Configuration) WithNewNodes(newNodes NodeListOption) NodeListOption

WithNewNodes returns a NodeListOption that can be used to create a new configuration combining c and the new nodes.

func (Configuration) WithoutErrors added in v0.11.0

func (c Configuration) WithoutErrors(err QuorumCallError, errorTypes ...error) NodeListOption

WithoutErrors returns a NodeListOption that creates a new configuration excluding nodes that failed in the given QuorumCallError. If specific error types are provided, only nodes whose errors match one of those types (using errors.Is) will be excluded. If no error types are provided, all failed nodes are excluded.

func (Configuration) WithoutNodes added in v0.4.0

func (c Configuration) WithoutNodes(ids ...uint32) NodeListOption

WithoutNodes returns a NodeListOption that can be used to create a new configuration from c without the given node IDs.

type Correctable added in v0.3.0

type Correctable[Resp any] struct {
	// contains filtered or unexported fields
}

Correctable is a generic type for correctable quorum calls. It encapsulates the state of a correctable call and provides methods for checking the status or waiting for completion at specific levels.

Type parameter Resp is the response type from nodes.

func NewCorrectable added in v0.11.0

func NewCorrectable[Resp any]() *Correctable[Resp]

NewCorrectable creates a new Correctable object.

func (*Correctable[Resp]) Done added in v0.3.0

func (c *Correctable[Resp]) Done() <-chan struct{}

Done returns a channel that will close when the correctable call is completed.

func (*Correctable[Resp]) Get added in v0.3.0

func (c *Correctable[Resp]) Get() (Resp, int, error)

Get returns the latest response, the current level, and the last error.

func (*Correctable[Resp]) Watch added in v0.3.0

func (c *Correctable[Resp]) Watch(level int) <-chan struct{}

Watch returns a channel that will close when the correctable call has reached a specified level.
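To make the interplay of Get, Done, and Watch concrete, here is a simplified sketch of a level-based correctable. It is only an illustration of the idea: the type correctable, the update method, and the rule that a watcher fires once the current level is at least the watched level are assumptions for this example, not the library's code.

```go
package main

import (
	"fmt"
	"sync"
)

// levelNotSet mirrors gorums.LevelNotSet: no reply has been recorded yet.
const levelNotSet = -1

type watcher struct {
	level int
	ch    chan struct{}
}

// correctable is a hypothetical, simplified stand-in for gorums.Correctable.
type correctable[Resp any] struct {
	mu       sync.Mutex
	reply    Resp
	level    int
	err      error
	done     bool
	doneCh   chan struct{}
	watchers []watcher
}

func newCorrectable[Resp any]() *correctable[Resp] {
	return &correctable[Resp]{level: levelNotSet, doneCh: make(chan struct{})}
}

// Get returns the latest reply, the current level, and the last error.
func (c *correctable[Resp]) Get() (Resp, int, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.reply, c.level, c.err
}

// Done returns a channel that is closed when the call has completed.
func (c *correctable[Resp]) Done() <-chan struct{} { return c.doneCh }

// Watch returns a channel that is closed once the given level is reached.
func (c *correctable[Resp]) Watch(level int) <-chan struct{} {
	c.mu.Lock()
	defer c.mu.Unlock()
	ch := make(chan struct{})
	if c.done || level <= c.level {
		close(ch) // already reached: signal immediately
		return ch
	}
	c.watchers = append(c.watchers, watcher{level: level, ch: ch})
	return ch
}

// update records a new reply (assumed helper) and wakes satisfied watchers.
func (c *correctable[Resp]) update(reply Resp, level int, err error, done bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.done {
		return // ignore updates after completion
	}
	c.reply, c.level, c.err, c.done = reply, level, err, done
	pending := c.watchers[:0]
	for _, w := range c.watchers {
		if done || w.level <= level {
			close(w.ch)
		} else {
			pending = append(pending, w)
		}
	}
	c.watchers = pending
	if done {
		close(c.doneCh)
	}
}

func main() {
	c := newCorrectable[string]()
	weak := c.Watch(1)
	go func() {
		c.update("weak reply", 1, nil, false)
		c.update("strong reply", 2, nil, true)
	}()
	<-weak     // a level-1 reply is available
	<-c.Done() // the call has completed
	reply, level, err := c.Get()
	fmt.Println(reply, level, err)
}
```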

type EnforceVersion added in v0.4.0

type EnforceVersion uint

EnforceVersion is used by code generated by protoc-gen-gorums to statically enforce minimum and maximum versions of this package. A compilation failure implies either that:

  • the runtime package is too old and needs to be updated OR
  • the generated code is too old and needs to be regenerated.

The runtime package can be upgraded by running:

go get github.com/relab/gorums

The generated code can be regenerated by running:

protoc --gorums_out=${PROTOC_GEN_GORUMS_ARGS} ${PROTO_FILES}

Example usage by generated code:

const (
	// Verify that this generated code is sufficiently up-to-date.
	_ = gorums.EnforceVersion(genVersion - gorums.MinVersion)
	// Verify that runtime/protoimpl is sufficiently up-to-date.
	_ = gorums.EnforceVersion(gorums.MaxVersion - genVersion)
)

The genVersion is the current minor version used to generate the code. This compile-time check relies on negative integer overflow of a uint being a compilation failure (guaranteed by the Go specification).
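The compile-time check can be reproduced in isolation. In the sketch below, enforceVersion, minVersion, maxVersion, and genVersion are local stand-ins with assumed values; changing genVersion to 10 or 12 makes one of the constant conversions negative, and the program no longer compiles.

```go
package main

import "fmt"

// enforceVersion mirrors the shape of gorums.EnforceVersion.
type enforceVersion uint

const (
	minVersion = 11 // assumed value for this sketch
	maxVersion = 11 // assumed value for this sketch
	genVersion = 11 // hypothetical version recorded in generated code
)

const (
	// Fails to compile if genVersion < minVersion (negative constant).
	_ = enforceVersion(genVersion - minVersion)
	// Fails to compile if genVersion > maxVersion (negative constant).
	_ = enforceVersion(maxVersion - genVersion)
)

func main() {
	fmt.Println("generated version", genVersion, "is within", minVersion, "and", maxVersion)
}
```

The check costs nothing at run time because both expressions are untyped constants evaluated by the compiler.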

type Handler added in v0.10.0

type Handler func(ServerCtx, *Message) (*Message, error)

Handler is a function that processes a request message and returns a response message.

type Interceptor added in v0.10.0

type Interceptor func(ServerCtx, *Message, Handler) (*Message, error)

Interceptor is a function that can intercept and modify incoming requests and outgoing responses. It receives a ServerCtx, the incoming Message, and a Handler representing the next element in the chain (either another Interceptor or the actual server method). It returns a Message and an error.
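The chaining described above can be sketched with simplified local types. Everything in this example is hypothetical: message, serverCtx, and the chain helper are stand-ins, and the choice to run the first interceptor outermost is an assumption about ordering rather than a statement about how Gorums composes its chain.

```go
package main

import (
	"fmt"
	"strings"
)

// message and serverCtx are hypothetical, simplified stand-ins for
// gorums.Message and gorums.ServerCtx.
type message struct{ body string }
type serverCtx struct{}

type handler func(serverCtx, *message) (*message, error)
type interceptor func(serverCtx, *message, handler) (*message, error)

// chain wraps final so that the first interceptor runs outermost.
func chain(final handler, interceptors ...interceptor) handler {
	h := final
	for i := len(interceptors) - 1; i >= 0; i-- {
		ic, next := interceptors[i], h
		h = func(ctx serverCtx, m *message) (*message, error) {
			return ic(ctx, m, next)
		}
	}
	return h
}

// trim modifies the incoming request before passing it on.
func trim(ctx serverCtx, m *message, next handler) (*message, error) {
	return next(ctx, &message{body: strings.TrimSpace(m.body)})
}

// tag modifies the outgoing response after the next handler returns.
func tag(ctx serverCtx, m *message, next handler) (*message, error) {
	resp, err := next(ctx, m)
	if err != nil {
		return nil, err
	}
	return &message{body: "[" + resp.body + "]"}, nil
}

// echo plays the role of the actual server method.
func echo(_ serverCtx, m *message) (*message, error) {
	return &message{body: "echo:" + m.body}, nil
}

func main() {
	h := chain(echo, trim, tag)
	resp, err := h(serverCtx{}, &message{body: "  hi  "})
	fmt.Println(resp.body, err)
}
```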

type Manager added in v0.3.0

type Manager struct {
	// contains filtered or unexported fields
}

Manager maintains a connection pool of nodes on which quorum calls can be performed.

func NewManager added in v0.3.0

func NewManager(opts ...ManagerOption) *Manager

NewManager returns a new Manager for managing connections to nodes added to the manager. This function accepts manager options used to configure various aspects of the manager.

func (*Manager) Close added in v0.3.0

func (m *Manager) Close() error

Close closes all node connections and any client streams.

func (*Manager) Node added in v0.3.0

func (m *Manager) Node(id uint32) (node *Node, found bool)

Node returns the node with the given identifier if present.

func (*Manager) NodeIDs added in v0.3.0

func (m *Manager) NodeIDs() []uint32

NodeIDs returns the identifier of each available node. IDs are returned in the same order as they were provided in the creation of the Manager.

func (*Manager) Nodes added in v0.3.0

func (m *Manager) Nodes() []*Node

Nodes returns a slice of each available node. Nodes are returned in the same order as they were provided in the creation of the Manager.

func (*Manager) Size added in v0.3.0

func (m *Manager) Size() (nodes int)

Size returns the number of nodes in the Manager.

type ManagerOption added in v0.3.0

type ManagerOption func(*managerOptions)

ManagerOption provides a way to set different options on a new Manager.

func InsecureDialOptions added in v0.11.0

func InsecureDialOptions(_ testing.TB) ManagerOption

InsecureDialOptions returns the default insecure gRPC dial options for testing.

func WithBackoff added in v0.3.0

func WithBackoff(backoff backoff.Config) ManagerOption

WithBackoff allows for changing the backoff delays used by Gorums.

func WithDialOptions added in v0.11.0

func WithDialOptions(opts ...grpc.DialOption) ManagerOption

WithDialOptions returns a ManagerOption which sets any gRPC dial options the Manager should use when initially connecting to each node in its pool.

func WithLogger added in v0.3.0

func WithLogger(logger *log.Logger) ManagerOption

WithLogger returns a ManagerOption which sets an optional error logger for the Manager.

func WithMetadata added in v0.3.0

func WithMetadata(md metadata.MD) ManagerOption

WithMetadata returns a ManagerOption that sets the metadata that is sent to each node when the connection is initially established. This metadata can be retrieved from the server-side method handlers.

func WithPerNodeMetadata added in v0.3.0

func WithPerNodeMetadata(f func(uint32) metadata.MD) ManagerOption

WithPerNodeMetadata returns a ManagerOption that allows you to set metadata for each node individually.

func WithSendBufferSize added in v0.3.0

func WithSendBufferSize(size uint) ManagerOption

WithSendBufferSize allows for changing the size of the send buffer used by Gorums. A larger buffer might achieve higher throughput for asynchronous call types, but at the cost of latency.

type Message added in v0.3.0

type Message struct {
	// contains filtered or unexported fields
}

Message encapsulates a protobuf message and metadata.

This struct should be used by generated code only.

func NewRequestMessage added in v0.11.0

func NewRequestMessage(md *ordering.Metadata, req proto.Message) *Message

NewRequestMessage creates a new Gorums Message for the given metadata and request message.

This function should be used by generated code and tests only.

func NewResponseMessage added in v0.10.0

func NewResponseMessage(md *ordering.Metadata, resp proto.Message) *Message

NewResponseMessage creates a new Gorums Message for the given metadata and response message.

This function should be used by generated code only.

func (*Message) GetMessageID added in v0.10.0

func (m *Message) GetMessageID() uint64

GetMessageID returns the message ID from the message metadata.

func (*Message) GetMetadata added in v0.10.0

func (m *Message) GetMetadata() *ordering.Metadata

GetMetadata returns the metadata of the message.

func (*Message) GetMethod added in v0.10.0

func (m *Message) GetMethod() string

GetMethod returns the method name from the message metadata.

func (*Message) GetProtoMessage added in v0.10.0

func (m *Message) GetProtoMessage() proto.Message

GetProtoMessage returns the protobuf message contained in the Message.

func (*Message) GetStatus added in v0.10.0

func (m *Message) GetStatus() *status.Status

type MultiSorter added in v0.3.0

type MultiSorter struct {
	// contains filtered or unexported fields
}

MultiSorter implements the Sort interface, sorting the nodes within.

func OrderedBy added in v0.3.0

func OrderedBy(less ...lessFunc) *MultiSorter

OrderedBy returns a MultiSorter that sorts using the less functions, in order. Call its Sort method to sort the data.

func (*MultiSorter) Len added in v0.3.0

func (ms *MultiSorter) Len() int

Len is part of sort.Interface.

func (*MultiSorter) Less added in v0.3.0

func (ms *MultiSorter) Less(i, j int) bool

Less is part of sort.Interface. It is implemented by looping along the less functions until it finds a comparison that is either Less or not Less. Note that it can call the less functions twice per call. We could change the functions to return -1, 0, 1 and reduce the number of calls for greater efficiency: an exercise for the reader.

func (*MultiSorter) Sort added in v0.3.0

func (ms *MultiSorter) Sort(nodes []*Node)

Sort sorts the argument slice according to the less functions passed to OrderedBy.
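The multi-key sorting technique behind OrderedBy can be shown on a small scale. This sketch uses a hypothetical node struct and two illustrative comparators, byHealth and byID, which only approximate the spirit of LastNodeError and ID; it is not the library's implementation.

```go
package main

import (
	"fmt"
	"sort"
)

// node is a hypothetical stand-in for gorums.Node.
type node struct {
	id     uint32
	failed bool
}

type lessFunc func(a, b *node) bool

// multiSorter sorts nodes using several less functions in priority order.
type multiSorter struct {
	nodes []*node
	less  []lessFunc
}

func orderedBy(less ...lessFunc) *multiSorter { return &multiSorter{less: less} }

func (ms *multiSorter) Sort(nodes []*node) {
	ms.nodes = nodes
	sort.Sort(ms)
}

func (ms *multiSorter) Len() int      { return len(ms.nodes) }
func (ms *multiSorter) Swap(i, j int) { ms.nodes[i], ms.nodes[j] = ms.nodes[j], ms.nodes[i] }

// Less tries each less function until one of them distinguishes the pair.
func (ms *multiSorter) Less(i, j int) bool {
	p, q := ms.nodes[i], ms.nodes[j]
	for _, less := range ms.less {
		if less(p, q) {
			return true
		}
		if less(q, p) {
			return false
		}
	}
	return false // equal under every key
}

// byHealth orders nodes without a recorded failure first.
func byHealth(a, b *node) bool { return !a.failed && b.failed }

// byID orders nodes by increasing identifier.
func byID(a, b *node) bool { return a.id < b.id }

func main() {
	nodes := []*node{{id: 3}, {id: 1, failed: true}, {id: 2}}
	orderedBy(byHealth, byID).Sort(nodes)
	for _, n := range nodes {
		fmt.Println(n.id, n.failed)
	}
}
```

Later less functions act only as tie-breakers, so the order in which they are passed to orderedBy determines their priority.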

func (*MultiSorter) Swap added in v0.3.0

func (ms *MultiSorter) Swap(i, j int)

Swap is part of sort.Interface.

type Node added in v0.3.0

type Node struct {
	// contains filtered or unexported fields
}

Node encapsulates the state of a node on which a remote procedure call can be performed.

func TestNode added in v0.11.0

func TestNode(t testing.TB, srvFn func(i int) ServerIface, opts ...TestOption) *Node

TestNode creates a single server and returns the node for testing. Both server and manager cleanup are handled via t.Cleanup in the correct order.

The provided srvFn is used to create and register the server handler. If srvFn is nil, a default mock server implementation is used.

Optional TestOptions can be provided to customize the manager, server, or configuration.

This is the recommended way to set up tests that need only a single server node. It ensures proper cleanup and detects goroutine leaks.

func (*Node) Address added in v0.3.0

func (n *Node) Address() string

Address returns the network address of n.

func (*Node) Context added in v0.11.0

func (n *Node) Context(parent context.Context) *NodeContext

Context creates a new NodeContext from the given parent context and this node.

Example:

nodeCtx := node.Context(context.Background())
resp, err := service.GRPCCall(nodeCtx, req)

func (*Node) FullString added in v0.3.0

func (n *Node) FullString() string

FullString returns a more descriptive string representation of n that includes id, network address and latency information.

func (*Node) Host added in v0.6.0

func (n *Node) Host() string

Host returns the network host of n.

func (*Node) ID added in v0.3.0

func (n *Node) ID() uint32

ID returns the ID of n.

func (*Node) LastErr added in v0.3.0

func (n *Node) LastErr() error

LastErr returns the last error encountered (if any) for this node.

func (*Node) Latency added in v0.5.0

func (n *Node) Latency() time.Duration

Latency returns the latency between the client and this node.

func (*Node) Port added in v0.3.0

func (n *Node) Port() string

Port returns the network port of n.

func (*Node) String added in v0.3.0

func (n *Node) String() string

type NodeContext added in v0.11.0

type NodeContext struct {
	context.Context
	// contains filtered or unexported fields
}

NodeContext is a context that carries a node for unicast and RPC calls. It embeds context.Context and provides access to the Node.

Use Node.Context to create a NodeContext from an existing context.

func (NodeContext) Node added in v0.11.0

func (c NodeContext) Node() *Node

Node returns the Node associated with this context.

type NodeListOption added in v0.4.0

type NodeListOption interface {
	Option
	// contains filtered or unexported methods
}

NodeListOption must be implemented by node providers.

func WithNodeIDs added in v0.3.0

func WithNodeIDs(ids []uint32) NodeListOption

WithNodeIDs returns a NodeListOption containing a list of node IDs. This assumes that the provided node IDs have already been registered with the manager.

func WithNodeList added in v0.3.0

func WithNodeList(addrsList []string) NodeListOption

WithNodeList returns a NodeListOption containing the provided list of node addresses. With this option, node IDs are generated by the Manager.

func WithNodeMap added in v0.3.0

func WithNodeMap(idMap map[string]uint32) NodeListOption

WithNodeMap returns a NodeListOption containing the provided mapping from node addresses to application-specific IDs.

type NodeResponse added in v0.11.0

type NodeResponse[T any] struct {
	NodeID uint32
	Value  T
	Err    error
}

NodeResponse wraps a response value from the node identified by NodeID, along with an error, if any.

type Option added in v0.11.0

type Option interface {
	// contains filtered or unexported methods
}

Option is a marker interface for options to NewConfig.

type QuorumCallError added in v0.3.0

type QuorumCallError struct {
	// contains filtered or unexported fields
}

QuorumCallError reports on a failed quorum call. It provides detailed information about which nodes failed.

func TestQuorumCallError added in v0.11.0

func TestQuorumCallError(_ testing.TB, nodeErrors map[uint32]error) QuorumCallError

TestQuorumCallError creates a QuorumCallError for testing. The nodeErrors map contains node IDs and their corresponding errors.

func (QuorumCallError) Cause added in v0.11.0

func (e QuorumCallError) Cause() error

Cause returns the underlying cause of the quorum call failure. Common causes include ErrIncomplete and ErrSendFailure.

func (QuorumCallError) Error added in v0.3.0

func (e QuorumCallError) Error() string

func (QuorumCallError) Is added in v0.8.0

func (e QuorumCallError) Is(target error) bool

Is reports whether the target error is the same as the cause of the QuorumCallError.

func (QuorumCallError) NodeErrors added in v0.11.0

func (e QuorumCallError) NodeErrors() int

NodeErrors returns the number of nodes that failed during the quorum call.

func (QuorumCallError) Unwrap added in v0.11.0

func (e QuorumCallError) Unwrap() (errs []error)

Unwrap returns all the underlying node errors as a slice. This allows the error to work with errors.Is and errors.As for any wrapped errors.

type QuorumInterceptor added in v0.11.0

type QuorumInterceptor[Req, Resp msg] func(ctx *ClientCtx[Req, Resp], next ResponseSeq[Resp]) ResponseSeq[Resp]

QuorumInterceptor intercepts and processes quorum calls, allowing modification of requests, responses, and aggregation logic. Interceptors can be chained together.

Type parameters:

  • Req: The request message type sent to nodes
  • Resp: The response message type from individual nodes

The interceptor receives the ClientCtx for metadata access, the current response iterator (next), and returns a new response iterator. This pattern allows interceptors to wrap the response stream with custom logic.

Custom interceptors can be created like this:

func LoggingInterceptor[Req, Resp proto.Message](
    ctx *gorums.ClientCtx[Req, Resp],
    next gorums.ResponseSeq[Resp],
) gorums.ResponseSeq[Resp] {
    return func(yield func(gorums.NodeResponse[Resp]) bool) {
        for resp := range next {
            log.Printf("Response from node %d", resp.NodeID)
            if !yield(resp) { return }
        }
    }
}
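To illustrate how interceptors of this shape compose, here is a self-contained sketch that does not import Gorums. The response, seq, and interceptor types are hypothetical stand-ins (seq has the same function shape as iter.Seq), and the chain helper shows one plausible composition order, not necessarily the one Gorums uses internally.

```go
package main

import "fmt"

// response and seq are hypothetical stand-ins for NodeResponse and ResponseSeq.
type response struct {
	NodeID uint32
	Value  int
}

type seq func(yield func(response) bool)

// interceptor wraps a response sequence and returns a new one.
type interceptor func(next seq) seq

// chain applies interceptors in the order given: each one wraps the result
// of the previous one.
func chain(src seq, interceptors ...interceptor) seq {
	for _, ic := range interceptors {
		src = ic(src)
	}
	return src
}

// double multiplies every value by two.
func double(next seq) seq {
	return func(yield func(response) bool) {
		next(func(r response) bool {
			r.Value *= 2
			return yield(r)
		})
	}
}

// onlyEvenNodes drops responses from nodes with odd IDs.
func onlyEvenNodes(next seq) seq {
	return func(yield func(response) bool) {
		next(func(r response) bool {
			if r.NodeID%2 != 0 {
				return true // skip this response but keep iterating
			}
			return yield(r)
		})
	}
}

// fromSlice yields the given responses in order.
func fromSlice(rs []response) seq {
	return func(yield func(response) bool) {
		for _, r := range rs {
			if !yield(r) {
				return
			}
		}
	}
}

// collect drains a sequence into a slice.
func collect(s seq) []response {
	var out []response
	s(func(r response) bool {
		out = append(out, r)
		return true
	})
	return out
}

func main() {
	src := fromSlice([]response{{1, 10}, {2, 20}, {3, 30}})
	fmt.Println(collect(chain(src, double, onlyEvenNodes))) // [{2 40}]
}
```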

func MapRequest added in v0.11.0

func MapRequest[Req, Resp msg](fn func(Req, *Node) Req) QuorumInterceptor[Req, Resp]

MapRequest returns an interceptor that applies per-node request transformations. Multiple interceptors can be chained together, with transforms applied in order.

The fn receives the original request and a node, and returns the transformed request to send to that node. If the function returns an invalid message or nil, the request to that node is skipped.
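The per-node transform with skip-on-nil behavior can be sketched without Gorums as follows. The node and request types and the perNode helper are hypothetical and exist only to illustrate the idea; the real MapRequest operates on protobuf messages and *Node values.

```go
package main

import "fmt"

// node and request are hypothetical stand-ins for *Node and a protobuf request.
type node struct{ ID uint32 }
type request struct{ Shard string }

// perNode applies fn once per node and keeps the non-nil results.
// A nil result means "do not send to this node".
func perNode(req *request, nodes []node, fn func(*request, node) *request) map[uint32]*request {
	out := make(map[uint32]*request)
	for _, n := range nodes {
		if r := fn(req, n); r != nil {
			out[n.ID] = r
		}
	}
	return out
}

// shardFor tags the request with a per-node shard name and skips node 2.
func shardFor(req *request, n node) *request {
	if n.ID == 2 {
		return nil // skipped
	}
	return &request{Shard: fmt.Sprintf("%s-%d", req.Shard, n.ID)}
}

func main() {
	nodes := []node{{1}, {2}, {3}}
	out := perNode(&request{Shard: "shard"}, nodes, shardFor)
	fmt.Println(len(out), out[1].Shard, out[3].Shard) // 2 shard-1 shard-3
}
```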

func MapResponse added in v0.11.0

func MapResponse[Req, Resp msg](fn func(Resp, *Node) Resp) QuorumInterceptor[Req, Resp]

MapResponse returns an interceptor that applies per-node response transformations.

The fn receives the response from a node and the node itself, and returns the transformed response.

type ResponseSeq added in v0.11.0

type ResponseSeq[T msg] iter.Seq[NodeResponse[T]]

ResponseSeq is an iterator that yields NodeResponse[T] values from a quorum call.

func (ResponseSeq[Resp]) CollectAll added in v0.11.0

func (seq ResponseSeq[Resp]) CollectAll() map[uint32]Resp

CollectAll collects all responses, including errors, from the iterator into a map by node ID.

Example:

responses := QuorumCall(ctx, req)
// Collect all responses (including errors)
replies := responses.CollectAll()
// or collect all successful responses
replies = responses.IgnoreErrors().CollectAll()

func (ResponseSeq[Resp]) CollectN added in v0.11.0

func (seq ResponseSeq[Resp]) CollectN(n int) map[uint32]Resp

CollectN collects up to n responses, including errors, from the iterator into a map by node ID. It returns early if n responses are collected or the iterator is exhausted.

Example:

responses := QuorumCall(ctx, req)
// Collect the first 2 responses (including errors)
replies := responses.CollectN(2)
// or collect 2 successful responses
replies = responses.IgnoreErrors().CollectN(2)
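The early-return behavior can be sketched with a plain push-style iterator. The response and seq types below are hypothetical stand-ins, and collectN is an illustrative reimplementation, not the Gorums source. The counter shows that the producer stops after n items.

```go
package main

import "fmt"

// response and seq are hypothetical stand-ins for NodeResponse and ResponseSeq.
type response struct {
	NodeID uint32
	Value  string
}

type seq func(yield func(response) bool)

// collectN gathers up to n responses keyed by node ID and then stops the
// iterator by returning false from the yield callback.
func collectN(s seq, n int) map[uint32]string {
	out := make(map[uint32]string)
	if n <= 0 {
		return out
	}
	s(func(r response) bool {
		out[r.NodeID] = r.Value
		return len(out) < n
	})
	return out
}

// counting yields rs in order and records how many items it produced.
func counting(rs []response, produced *int) seq {
	return func(yield func(response) bool) {
		for _, r := range rs {
			*produced++
			if !yield(r) {
				return
			}
		}
	}
}

func main() {
	rs := []response{{1, "a"}, {2, "b"}, {3, "c"}}
	var produced int
	got := collectN(counting(rs, &produced), 2)
	fmt.Println(len(got), produced) // 2 2
}
```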

func (ResponseSeq[Resp]) Filter added in v0.11.0

func (seq ResponseSeq[Resp]) Filter(keep func(NodeResponse[Resp]) bool) ResponseSeq[Resp]

Filter returns an iterator that yields only the responses for which the provided keep function returns true. This is useful for verifying or filtering responses from servers before further processing.

Example:

responses := QuorumCall(ctx, req)
// Filter to only responses from a specific node
for resp := range responses.Filter(func(r NodeResponse[Resp]) bool {
	return r.NodeID == 1
}) {
	// process resp
}

func (ResponseSeq[Resp]) IgnoreErrors added in v0.11.0

func (seq ResponseSeq[Resp]) IgnoreErrors() ResponseSeq[Resp]

IgnoreErrors returns an iterator that yields only successful responses, discarding any responses with errors. This is useful when you want to process only valid responses from nodes.

Example:

responses := QuorumCall(ctx, Request_builder{Num: uint64(42)}.Build())
var sum int32
for resp := range responses.IgnoreErrors() {
	// resp is guaranteed to be a successful response
	sum += resp.Value.GetValue()
}

type Responses added in v0.11.0

type Responses[Resp msg] struct {
	ResponseSeq[Resp]
	// contains filtered or unexported fields
}

Responses provides access to quorum call responses and terminal methods. It is returned by quorum call functions and allows fluent-style API usage:

resp, err := ReadQuorumCall(ctx, req).Majority()
// or
resp, err = ReadQuorumCall(ctx, req).First()
// or
replies := ReadQuorumCall(ctx, req).IgnoreErrors().CollectAll()

Type parameter:

  • Resp: The response message type

func NewResponses added in v0.11.0

func NewResponses[Req, Resp msg](ctx *ClientCtx[Req, Resp]) *Responses[Resp]

func QuorumCall added in v0.11.0

func QuorumCall[Req, Resp msg](
	ctx *ConfigContext,
	req Req,
	method string,
	opts ...CallOption,
) *Responses[Resp]

QuorumCall performs a quorum call and returns a Responses object that provides access to node responses via terminal methods and fluent iteration.

Type parameters:

  • Req: The request message type
  • Resp: The response message type from individual nodes

The opts parameter accepts CallOption values such as Interceptors. Interceptors are applied in the order they are provided via Interceptors, modifying the ClientCtx before the user calls a terminal method.

Note: Messages are not sent to nodes until a terminal method (like Majority, First) or iterator method (like Seq) is called, applying any registered request transformations. This lazy sending is necessary to allow interceptors to register transformations prior to dispatch.

This function should be used by generated code only.

func QuorumCallStream added in v0.11.0

func QuorumCallStream[Req, Resp msg](
	ctx *ConfigContext,
	req Req,
	method string,
	opts ...CallOption,
) *Responses[Resp]

QuorumCallStream performs a streaming quorum call and returns a Responses object. This is used for correctable stream methods where the server sends multiple responses.

In streaming mode, the response iterator continues indefinitely until the context is canceled, allowing the server to send multiple responses over time.

This function should be used by generated code only.

func (*Responses[Resp]) All added in v0.11.0

func (r *Responses[Resp]) All() (Resp, error)

All returns the first response once all nodes have responded successfully. If any node fails, it returns an error.

func (*Responses[Resp]) AsyncAll added in v0.11.0

func (r *Responses[Resp]) AsyncAll() *Async[Resp]

AsyncAll returns an Async future that resolves when all nodes have responded. Messages are sent immediately (synchronously) to preserve ordering.

func (*Responses[Resp]) AsyncFirst added in v0.11.0

func (r *Responses[Resp]) AsyncFirst() *Async[Resp]

AsyncFirst returns an Async future that resolves when the first response is received. Messages are sent immediately (synchronously) to preserve ordering.

func (*Responses[Resp]) AsyncMajority added in v0.11.0

func (r *Responses[Resp]) AsyncMajority() *Async[Resp]

AsyncMajority returns an Async future that resolves when a majority quorum is reached. Messages are sent immediately (synchronously) to preserve ordering when multiple async calls are created in sequence.

func (*Responses[Resp]) AsyncThreshold added in v0.11.0

func (r *Responses[Resp]) AsyncThreshold(threshold int) *Async[Resp]

AsyncThreshold returns an Async future that resolves when the threshold is reached. Messages are sent immediately (synchronously) to preserve ordering when multiple async calls are created in sequence.

func (*Responses[Resp]) Correctable added in v0.11.0

func (r *Responses[Resp]) Correctable(threshold int) *Correctable[Resp]

Correctable returns a Correctable that provides progressive updates as responses arrive. The level increases with each successful response. Use this for correctable quorum patterns where you want to observe intermediate states.

Example:

corr := ReadQC(ctx, req).Correctable(2)
// Wait for level 2 to be reached
<-corr.Watch(2)
resp, level, err := corr.Get()

func (*Responses[Resp]) First added in v0.11.0

func (r *Responses[Resp]) First() (Resp, error)

First returns the first successful response received from any node. This is useful for read-any patterns where any single response is sufficient.

func (*Responses[Resp]) Majority added in v0.11.0

func (r *Responses[Resp]) Majority() (Resp, error)

Majority returns the first response once a simple majority (⌈(n+1)/2⌉) of successful responses are received.
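As a worked example of the arithmetic, the simple-majority size ⌈(n+1)/2⌉ equals ⌊n/2⌋+1 for every positive n. The helper below is a hypothetical illustration of that calculation, not a function exported by Gorums.

```go
package main

import "fmt"

// majority returns the smallest number of nodes that forms a simple
// majority of n nodes: floor(n/2) + 1.
func majority(n int) int {
	return n/2 + 1
}

// ceilHalfPlusOne computes ceil((n+1)/2) using integer arithmetic.
func ceilHalfPlusOne(n int) int {
	return (n + 2) / 2
}

func main() {
	for n := 1; n <= 5; n++ {
		fmt.Println(n, majority(n), ceilHalfPlusOne(n))
	}
	// Prints:
	// 1 1 1
	// 2 2 2
	// 3 2 2
	// 4 3 3
	// 5 3 3
}
```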

func (*Responses[Resp]) Seq added in v0.11.0

func (r *Responses[Resp]) Seq() ResponseSeq[Resp]

Seq returns the underlying response iterator that yields node responses as they arrive. It returns a single-use iterator. Users can use this to implement custom aggregation logic. This method triggers lazy sending of requests.

The iterator will:

  • Yield responses as they arrive from nodes
  • Continue until the context is canceled or all expected responses have been received
  • Allow early termination by breaking from the range loop

Example usage:

for result := range ReadQuorumCall(ctx, req).Seq() {
    if result.Err != nil {
        // Handle node error
        continue
    }
    // Process result.Value
}

func (*Responses[Resp]) Size added in v0.11.0

func (r *Responses[Resp]) Size() int

Size returns the number of nodes in the configuration.

func (*Responses[Resp]) Threshold added in v0.11.0

func (r *Responses[Resp]) Threshold(threshold int) (resp Resp, err error)

Threshold waits for a threshold number of successful responses. It returns the first response once the threshold is reached.

type Server added in v0.3.0

type Server struct {
	// contains filtered or unexported fields
}

Server serves all ordering-based RPCs using registered handlers.

func NewServer added in v0.3.0

func NewServer(opts ...ServerOption) *Server

NewServer returns a new instance of gorums.Server.

func (*Server) GracefulStop added in v0.3.0

func (s *Server) GracefulStop()

GracefulStop waits for all RPCs to finish before stopping.

func (*Server) RegisterHandler added in v0.3.0

func (s *Server) RegisterHandler(method string, handler Handler)

RegisterHandler registers a request handler for the specified method name.

This function should only be used by generated code.

func (*Server) Serve added in v0.3.0

func (s *Server) Serve(listener net.Listener) error

Serve starts serving on the listener.

func (*Server) Stop added in v0.3.0

func (s *Server) Stop()

Stop stops the server immediately.

type ServerCtx added in v0.6.0

type ServerCtx struct {
	context.Context
	// contains filtered or unexported fields
}

ServerCtx is a context that is passed from the Gorums server to the handler. It allows the handler to release its lock on the server, allowing the next request to be processed. This happens automatically when the handler returns.

func (*ServerCtx) Release added in v0.6.0

func (ctx *ServerCtx) Release()

Release releases this handler's lock on the server, which allows the next request to be processed concurrently. Use Release only when the handler no longer needs exclusive access to the server's state. It is safe to call Release multiple times.

func (*ServerCtx) SendMessage added in v0.10.0

func (ctx *ServerCtx) SendMessage(msg *Message) error

SendMessage attempts to send the given message to the client. This may fail if the stream was closed or the stream context got canceled.

This function should be used by generated code only.

type ServerIface added in v0.3.0

type ServerIface interface {
	Serve(net.Listener) error
	Stop()
}

ServerIface is the interface that must be implemented by a server in order to support the TestSetup function.

func EchoServerFn added in v0.11.0

func EchoServerFn(_ int) ServerIface

func StreamBenchmarkServerFn added in v0.11.0

func StreamBenchmarkServerFn(_ int) ServerIface

func StreamServerFn added in v0.11.0

func StreamServerFn(_ int) ServerIface

type ServerOption added in v0.3.0

type ServerOption func(*serverOptions)

ServerOption is used to change settings for the Gorums server.

func WithConnectCallback added in v0.8.0

func WithConnectCallback(callback func(context.Context)) ServerOption

WithConnectCallback registers a callback function that will be called by the server whenever a node connects or reconnects to the server. This allows access to the node's stream context, which is passed to the callback function. The stream context can be used to extract the metadata and peer information, if available.

func WithGRPCServerOptions added in v0.3.0

func WithGRPCServerOptions(opts ...grpc.ServerOption) ServerOption

WithGRPCServerOptions sets gRPC options for the server.

func WithInterceptors added in v0.10.0

func WithInterceptors(i ...Interceptor) ServerOption

WithInterceptors registers server-side interceptors to run for every incoming request. Interceptors are executed for each registered handler. Interceptors may modify both the request and/or response messages, or perform additional actions before or after calling the next handler in the chain. Interceptors are executed in the order they are provided: the first element is executed first, and the last element calls the actual server method handler.
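The execution order described above (first interceptor runs first, last interceptor calls the handler) can be sketched with simplified types. The handler and interceptor signatures below are hypothetical and use strings instead of Gorums messages; they do not match the real Interceptor type.

```go
package main

import "fmt"

// handler and interceptor are simplified, hypothetical signatures.
type handler func(req string) string
type interceptor func(req string, next handler) string

// chain builds a handler in which ics[0] runs first and the last
// interceptor calls final.
func chain(final handler, ics ...interceptor) handler {
	h := final
	for i := len(ics) - 1; i >= 0; i-- {
		ic, next := ics[i], h
		h = func(req string) string { return ic(req, next) }
	}
	return h
}

// tag returns an interceptor that appends its name to the request
// before passing it on.
func tag(name string) interceptor {
	return func(req string, next handler) string {
		return next(req + ">" + name)
	}
}

// echo is the final handler; it returns the request it received.
func echo(req string) string { return req }

func main() {
	h := chain(echo, tag("a"), tag("b"))
	fmt.Println(h("req")) // req>a>b
}
```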

func WithReceiveBufferSize added in v0.3.0

func WithReceiveBufferSize(size uint) ServerOption

WithReceiveBufferSize sets the buffer size for the server. A larger buffer may result in higher throughput at the cost of higher latency.

type System added in v0.11.0

type System struct {
	// contains filtered or unexported fields
}

System encapsulates the state of a Gorums system, including the server, listener, and any registered closers (e.g. managers).

func NewSystem added in v0.11.0

func NewSystem(addr string, opts ...ServerOption) (*System, error)

NewSystem creates a new Gorums System listening on the specified address with the provided server options.

func (*System) Addr added in v0.11.0

func (s *System) Addr() string

Addr returns the address the system is listening on.

func (*System) RegisterService added in v0.11.0

func (s *System) RegisterService(closer io.Closer, registerFunc func(*Server))

RegisterService registers the service with the server using the provided register function. The closer is added to the list of closers to be closed when the system is stopped.

Example usage:

gs, err := NewSystem(addr)
if err != nil {
	// handle error
}
mgr := NewManager(...)
impl := &srvImpl{}
gs.RegisterService(mgr, func(srv *Server) {
	pb.RegisterMultiPaxosServer(srv, impl)
})

func (*System) Serve added in v0.11.0

func (s *System) Serve() error

Serve starts the server.

func (*System) Stop added in v0.11.0

func (s *System) Stop() (errs error)

Stop stops the Gorums server and closes all registered closers. It immediately closes all open connections and listeners. It cancels all active RPCs on the server side and the corresponding pending RPCs on the client side will get notified by connection errors.

type TestOption added in v0.11.0

type TestOption any

TestOption is a marker interface that can hold ManagerOption, ServerOption, or NodeListOption. This allows test helpers to accept a single variadic parameter that can be filtered and passed to the appropriate constructors (NewManager, NewServer, NewConfiguration).

Each option type (ManagerOption, ServerOption, NodeListOption) embeds this interface, so they can be passed directly without wrapping:

SetupConfiguration(t, 3, nil,
	WithBackoff(...),           // ManagerOption
	WithReceiveBufferSize(10),  // ServerOption
	WithNodeMap(...),           // NodeListOption
)
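Filtering a mixed variadic option list by type can be done with a type switch. The sketch below uses hypothetical option and config types to show the idea; it is not the Gorums test helper.

```go
package main

import "fmt"

// Hypothetical config and option types for this sketch.
type managerCfg struct{ backoff int }
type serverCfg struct{ bufSize uint }

type managerOption func(*managerCfg)
type serverOption func(*serverCfg)

func withBackoff(n int) managerOption { return func(c *managerCfg) { c.backoff = n } }
func withBufSize(n uint) serverOption { return func(c *serverCfg) { c.bufSize = n } }

// split sorts a mixed option list into per-constructor slices.
// Values of any other type are ignored.
func split(opts ...any) (m []managerOption, s []serverOption) {
	for _, o := range opts {
		switch o := o.(type) {
		case managerOption:
			m = append(m, o)
		case serverOption:
			s = append(s, o)
		}
	}
	return m, s
}

// buildServerCfg applies server options in order.
func buildServerCfg(opts []serverOption) serverCfg {
	var c serverCfg
	for _, o := range opts {
		o(&c)
	}
	return c
}

func main() {
	m, s := split(withBackoff(2), withBufSize(10), withBufSize(20))
	fmt.Println(len(m), len(s), buildServerCfg(s).bufSize) // 1 2 20
}
```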

func WithManager added in v0.11.0

func WithManager(_ testing.TB, mgr *Manager) TestOption

WithManager returns a TestOption that provides an existing manager to use instead of creating a new one. This is useful when creating multiple configurations that should share the same manager.

When using WithManager, the caller is responsible for closing the manager. SetupConfiguration will NOT register a cleanup function for the manager.

This option is intended for testing purposes only.

func WithPreConnect added in v0.11.0

func WithPreConnect(_ testing.TB, fn func(stopServers func())) TestOption

WithPreConnect returns a TestOption that registers a function to be called after servers are started but before nodes attempt to connect. The function receives a stopServers callback that can be used to stop the test servers.

This is useful for testing error handling when servers are unavailable:

node := gorums.TestNode(t, nil, gorums.WithPreConnect(t, func(stopServers func()) {
	stopServers()
	time.Sleep(300 * time.Millisecond) // wait for server to fully stop
}))

This option is intended for testing purposes only.

func WithStopFunc added in v0.11.0

func WithStopFunc(_ testing.TB, fn *func(...int)) TestOption

WithStopFunc returns a TestOption that captures the variadic server stop function, allowing tests to stop servers at any point during test execution. Call with no arguments to stop all servers, or with specific indices to stop those servers. This is useful for testing server failure scenarios.

Usage:

var stopServers func(...int)
config := gorums.TestConfiguration(t, 3, nil, gorums.WithStopFunc(t, &stopServers))
// ... send some messages ...
stopServers() // stop all servers
// OR
stopServers(0, 2) // stop servers at indices 0 and 2
// ... verify error handling ...

This option is intended for testing purposes only.

Directories

Path Synopsis
cmd
benchmark command
protoc-gen-gorums/gengorums
Package gengorums is internal to the gorums protobuf module.
examples module
internal
version
Package version records versioning information about this module.