Documentation
¶
Overview ¶
Package gorums provide protobuf options for gRPC-based quorum calls.
Index ¶
- Constants
- Variables
- func AsProto[T proto.Message](msg *Message) T
- func Closer(t testing.TB, c io.Closer) func()
- func Multicast[Req proto.Message](ctx *ConfigContext, msg Req, method string, opts ...CallOption) error
- func RPCCall(ctx *NodeContext, msg proto.Message, method string) (proto.Message, error)
- func Range(n int) iter.Seq[int]
- func TestContext(t testing.TB, timeout time.Duration) context.Context
- func TestServers(t testing.TB, numServers int, srvFn func(i int) ServerIface) []string
- func Unicast[Req proto.Message](ctx *NodeContext, req Req, method string, opts ...CallOption) error
- type Async
- type CallOption
- type ClientCtx
- type Codec
- type ConfigContext
- type Configuration
- func (c Configuration) And(d Configuration) NodeListOption
- func (cfg Configuration) Context(parent context.Context) *ConfigContext
- func (c Configuration) Equal(b Configuration) bool
- func (c Configuration) Except(rm Configuration) NodeListOption
- func (c Configuration) Manager() *Manager
- func (c Configuration) NodeIDs() []uint32
- func (c Configuration) Nodes() []*Node
- func (c Configuration) Size() int
- func (c Configuration) WithNewNodes(newNodes NodeListOption) NodeListOption
- func (c Configuration) WithoutErrors(err QuorumCallError, errorTypes ...error) NodeListOption
- func (c Configuration) WithoutNodes(ids ...uint32) NodeListOption
- type Correctable
- type EnforceVersion
- type Handler
- type Interceptor
- type Manager
- type ManagerOption
- func InsecureDialOptions(_ testing.TB) ManagerOption
- func WithBackoff(backoff backoff.Config) ManagerOption
- func WithDialOptions(opts ...grpc.DialOption) ManagerOption
- func WithLogger(logger *log.Logger) ManagerOption
- func WithMetadata(md metadata.MD) ManagerOption
- func WithPerNodeMetadata(f func(uint32) metadata.MD) ManagerOption
- func WithSendBufferSize(size uint) ManagerOption
- type Message
- type MultiSorter
- type Node
- func (n *Node) Address() string
- func (n *Node) Context(parent context.Context) *NodeContext
- func (n *Node) FullString() string
- func (n *Node) Host() string
- func (n *Node) ID() uint32
- func (n *Node) LastErr() error
- func (n *Node) Latency() time.Duration
- func (n *Node) Port() string
- func (n *Node) String() string
- type NodeContext
- type NodeListOption
- type NodeResponse
- type Option
- type QuorumCallError
- type QuorumInterceptor
- type ResponseSeq
- type Responses
- func NewResponses[Req, Resp msg](ctx *ClientCtx[Req, Resp]) *Responses[Resp]
- func QuorumCall[Req, Resp msg](ctx *ConfigContext, req Req, method string, opts ...CallOption) *Responses[Resp]
- func QuorumCallStream[Req, Resp msg](ctx *ConfigContext, req Req, method string, opts ...CallOption) *Responses[Resp]
- func (r *Responses[Resp]) All() (Resp, error)
- func (r *Responses[Resp]) AsyncAll() *Async[Resp]
- func (r *Responses[Resp]) AsyncFirst() *Async[Resp]
- func (r *Responses[Resp]) AsyncMajority() *Async[Resp]
- func (r *Responses[Resp]) AsyncThreshold(threshold int) *Async[Resp]
- func (r *Responses[Resp]) Correctable(threshold int) *Correctable[Resp]
- func (r *Responses[Resp]) First() (Resp, error)
- func (r *Responses[Resp]) Majority() (Resp, error)
- func (r *Responses[Resp]) Seq() ResponseSeq[Resp]
- func (r *Responses[Resp]) Size() int
- func (r *Responses[Resp]) Threshold(threshold int) (resp Resp, err error)
- type Server
- type ServerCtx
- type ServerIface
- type ServerOption
- type System
- type TestOption
Constants ¶
const ( // MaxVersion is the maximum supported version for generated .pb.go files. // It is always the current version of the module. MaxVersion = version.Minor // GenVersion is the runtime version required by generated .pb.go files. // This is incremented when generated code relies on new functionality // in the runtime. GenVersion = 11 // MinVersion is the minimum supported version for generated .pb.go files. // This is incremented when the runtime drops support for old code. MinVersion = 11 )
const ContentSubtype = "gorums"
ContentSubtype is the subtype used by gorums when sending messages via gRPC.
const LevelNotSet = -1
LevelNotSet is the zero value level used to indicate that no level (and thereby no reply) has been set for a correctable quorum call.
Variables ¶
var ( // call types // // optional bool rpc = 50001; E_Rpc = &file_gorums_proto_extTypes[0] // optional bool unicast = 50002; E_Unicast = &file_gorums_proto_extTypes[1] // optional bool multicast = 50003; E_Multicast = &file_gorums_proto_extTypes[2] // optional bool quorumcall = 50004; E_Quorumcall = &file_gorums_proto_extTypes[3] )
Extension fields to descriptorpb.MethodOptions.
var ErrIncomplete = errors.New("incomplete call")
ErrIncomplete is the error returned by a quorum call when the call cannot be completed due to insufficient non-error replies to form a quorum according to the quorum function.
var ErrSendFailure = errors.New("send failure")
ErrSendFailure is the error returned by a multicast call when message sending fails for one or more nodes.
var ErrTypeMismatch = errors.New("response type mismatch")
ErrTypeMismatch is returned when a response cannot be cast to the expected type.
var File_gorums_proto protoreflect.FileDescriptor
var ID = func(n1, n2 *Node) bool {
return n1.id < n2.id
}
ID sorts nodes by their identifier in increasing order.
var LastNodeError = func(n1, n2 *Node) bool { if n1.channel.lastErr() != nil && n2.channel.lastErr() == nil { return false } return true }
LastNodeError sorts nodes by their LastErr() status in increasing order. A node with LastErr() != nil is larger than a node with LastErr() == nil.
var Port = func(n1, n2 *Node) bool { p1, _ := strconv.Atoi(n1.Port()) p2, _ := strconv.Atoi(n2.Port()) return p1 < p2 }
Port sorts nodes by their port number in increasing order. Warning: This function may be removed in the future.
Functions ¶
func AsProto ¶ added in v0.10.0
AsProto returns msg's underlying protobuf message of the specified type T. If msg is nil or the contained message is not of type T, the zero value of T is returned.
func Multicast ¶ added in v0.11.0
func Multicast[Req proto.Message](ctx *ConfigContext, msg Req, method string, opts ...CallOption) error
Multicast is a one-way call; no replies are returned to the client.
By default, this method blocks until messages have been sent to all nodes. This ensures that send operations complete before the caller proceeds, which can be useful for observing context cancellation or for pacing message sends. If the sending fails, the error is returned to the caller.
With the IgnoreErrors call option, the method returns nil immediately after enqueueing messages to all nodes (fire-and-forget semantics).
Multicast supports request transformation interceptors via the gorums.Interceptors option. Use gorums.MapRequest to transform requests per-node.
This method should be used by generated code only.
func RPCCall ¶ added in v0.11.0
RPCCall executes a remote procedure call on the node.
This method should be used by generated code only.
func TestContext ¶ added in v0.11.0
TestContext creates a context with timeout for testing. It uses t.Context() as the parent and automatically cancels on cleanup.
func TestServers ¶ added in v0.11.0
TestServers starts numServers gRPC servers using the given registration function. Servers are automatically stopped when the test finishes via t.Cleanup. The cleanup is registered first, so it runs after any subsequently registered cleanups (e.g., manager.Close()), ensuring proper shutdown ordering.
Goroutine leak detection via goleak is automatically enabled and runs after all other cleanup functions complete.
The provided srvFn is used to create and register the server handlers. If srvFn is nil, a default mock server implementation is used.
Example usage:
addrs := gorums.TestServers(t, 3, serverFn) mgr := gorums.NewManager(gorums.InsecureGrpcDialOptions(t)) t.Cleanup(mgr.Close) ...
This function can be used by other packages for testing purposes, as long as the required service, method, and message types are registered in the global protobuf registry before calling this function.
func Unicast ¶ added in v0.11.0
func Unicast[Req proto.Message](ctx *NodeContext, req Req, method string, opts ...CallOption) error
Unicast is a one-way call; no replies are returned to the client.
By default, this method blocks until the message has been sent to the node. This ensures that send operations complete before the caller proceeds, which can be useful for observing context cancellation or for pacing message sends. If the sending fails, the error is returned to the caller.
With the IgnoreErrors call option, the method returns nil immediately after enqueueing the message (fire-and-forget semantics).
This method should be used by generated code only.
Types ¶
type Async ¶ added in v0.3.0
type Async[Resp any] struct { // contains filtered or unexported fields }
Async is a generic future type for asynchronous quorum calls. It encapsulates the state of an asynchronous call and provides methods for checking the status or waiting for completion.
Type parameter Resp is the response type from nodes.
type CallOption ¶ added in v0.3.0
type CallOption func(*callOptions)
CallOption is a function that sets a value in the given callOptions struct
func IgnoreErrors ¶ added in v0.11.0
func IgnoreErrors() CallOption
IgnoreErrors ignores send errors from Unicast or Multicast methods and returns immediately instead of blocking until the message has been sent. By default, Unicast and Multicast methods return an error if the message could not be sent or the context was canceled.
func Interceptors ¶ added in v0.11.0
func Interceptors[Req, Resp proto.Message](interceptors ...QuorumInterceptor[Req, Resp]) CallOption
Interceptors returns a CallOption that adds quorum call interceptors. Interceptors are executed in the order provided, modifying the Responses object before the user calls a terminal method.
Example:
resp, err := ReadQC(ctx, req,
gorums.Interceptors(loggingInterceptor, filterInterceptor),
).Majority()
type ClientCtx ¶ added in v0.11.0
ClientCtx provides context and access to the quorum call state for interceptors. It exposes the request, configuration, metadata about the call, and the response iterator.
func (*ClientCtx[Req, Resp]) Config ¶ added in v0.11.0
func (c *ClientCtx[Req, Resp]) Config() Configuration
Config returns the configuration (set of nodes) for this quorum call.
func (*ClientCtx[Req, Resp]) Method ¶ added in v0.11.0
Method returns the name of the RPC method being called.
func (*ClientCtx[Req, Resp]) Nodes ¶ added in v0.11.0
Nodes returns the slice of nodes in this configuration.
type Codec ¶ added in v0.3.0
type Codec struct {
// contains filtered or unexported fields
}
Codec is the gRPC codec used by gorums.
type ConfigContext ¶ added in v0.11.0
ConfigContext is a context that carries a configuration for quorum calls. It embeds context.Context and provides access to the Configuration.
Use Configuration.Context to create a ConfigContext from an existing context.
func (ConfigContext) Configuration ¶ added in v0.11.0
func (c ConfigContext) Configuration() Configuration
Configuration returns the Configuration associated with this context.
type Configuration ¶ added in v0.3.0
type Configuration []*Node
Configuration represents a static set of nodes on which quorum calls may be invoked.
Mutating the configuration is not supported; instead, use NewConfiguration to create a new configuration.
func NewConfig ¶ added in v0.11.0
func NewConfig(opts ...Option) (Configuration, error)
NewConfig returns a new Configuration based on the provided [gorums.Option]s. It accepts exactly one gorums.NodeListOption and multiple [gorums.ManagerOption]s. You may use this function to create the initial configuration for a new manager.
Example:
cfg, err := NewConfig(
gorums.WithNodeList([]string{"localhost:8080", "localhost:8081", "localhost:8082"}),
gorums.WithDialOptions(grpc.WithTransportCredentials(insecure.NewCredentials())),
)
This is a convenience function for creating a configuration without explicitly creating a manager first. However, the manager can be accessed using the Configuration.Manager method. This method should only be used once since it creates a new manager; if a manager already exists, use NewConfiguration instead, and provide the existing manager as the first argument.
func NewConfiguration ¶ added in v0.3.0
func NewConfiguration(mgr *Manager, opt NodeListOption) (nodes Configuration, err error)
NewConfiguration returns a configuration based on the provided list of nodes. Nodes can be supplied using WithNodeMap or WithNodeList, or WithNodeIDs. A new configuration can also be created from an existing configuration, using the And, WithNewNodes, Except, and WithoutNodes methods.
func TestConfiguration ¶ added in v0.11.0
func TestConfiguration(t testing.TB, numServers int, srvFn func(i int) ServerIface, opts ...TestOption) Configuration
TestConfiguration creates servers and a configuration for testing. Both server and manager cleanup are handled via t.Cleanup in the correct order: manager is closed first, then servers are stopped.
The provided srvFn is used to create and register the server handlers. If srvFn is nil, a default mock server implementation is used.
Optional TestOptions can be provided to customize the manager, server, or configuration.
By default, nodes are assigned sequential IDs (0, 1, 2, ...) matching the server creation order. This can be overridden by providing a NodeListOption.
This is the recommended way to set up tests that need both servers and a configuration. It ensures proper cleanup and detects goroutine leaks.
func (Configuration) And ¶ added in v0.4.0
func (c Configuration) And(d Configuration) NodeListOption
And returns a NodeListOption that can be used to create a new configuration combining c and d.
func (Configuration) Context ¶ added in v0.11.0
func (cfg Configuration) Context(parent context.Context) *ConfigContext
Context creates a new ConfigContext from the given parent context and this configuration.
Example:
config, _ := gorums.NewConfiguration(mgr, gorums.WithNodeList(addrs)) cfgCtx := config.Context(context.Background()) resp, err := paxos.Prepare(cfgCtx, req)
func (Configuration) Equal ¶ added in v0.3.0
func (c Configuration) Equal(b Configuration) bool
Equal returns true if configurations b and c have the same set of nodes.
func (Configuration) Except ¶ added in v0.4.0
func (c Configuration) Except(rm Configuration) NodeListOption
Except returns a NodeListOption that can be used to create a new configuration from c without the nodes in rm.
func (Configuration) Manager ¶ added in v0.11.0
func (c Configuration) Manager() *Manager
Manager returns the Manager that manages this configuration's nodes. Returns nil if the configuration is empty.
func (Configuration) NodeIDs ¶ added in v0.3.0
func (c Configuration) NodeIDs() []uint32
NodeIDs returns a slice of this configuration's Node IDs.
func (Configuration) Nodes ¶ added in v0.3.0
func (c Configuration) Nodes() []*Node
Nodes returns the nodes in this configuration.
func (Configuration) Size ¶ added in v0.3.0
func (c Configuration) Size() int
Size returns the number of nodes in this configuration.
func (Configuration) WithNewNodes ¶ added in v0.4.0
func (c Configuration) WithNewNodes(newNodes NodeListOption) NodeListOption
WithNewNodes returns a NodeListOption that can be used to create a new configuration combining c and the new nodes.
func (Configuration) WithoutErrors ¶ added in v0.11.0
func (c Configuration) WithoutErrors(err QuorumCallError, errorTypes ...error) NodeListOption
WithoutErrors returns a NodeListOption that creates a new configuration excluding nodes that failed in the given QuorumCallError. If specific error types are provided, only nodes whose errors match one of those types (using errors.Is) will be excluded. If no error types are provided, all failed nodes are excluded.
func (Configuration) WithoutNodes ¶ added in v0.4.0
func (c Configuration) WithoutNodes(ids ...uint32) NodeListOption
WithoutNodes returns a NodeListOption that can be used to create a new configuration from c without the given node IDs.
type Correctable ¶ added in v0.3.0
type Correctable[Resp any] struct { // contains filtered or unexported fields }
Correctable is a generic type for correctable quorum calls. It encapsulates the state of a correctable call and provides methods for checking the status or waiting for completion at specific levels.
Type parameter Resp is the response type from nodes.
func NewCorrectable ¶ added in v0.11.0
func NewCorrectable[Resp any]() *Correctable[Resp]
NewCorrectable creates a new Correctable object.
func (*Correctable[Resp]) Done ¶ added in v0.3.0
func (c *Correctable[Resp]) Done() <-chan struct{}
Done returns a channel that will close when the correctable call is completed.
func (*Correctable[Resp]) Get ¶ added in v0.3.0
func (c *Correctable[Resp]) Get() (Resp, int, error)
Get returns the latest response, the current level, and the last error.
func (*Correctable[Resp]) Watch ¶ added in v0.3.0
func (c *Correctable[Resp]) Watch(level int) <-chan struct{}
Watch returns a channel that will close when the correctable call has reached a specified level.
type EnforceVersion ¶ added in v0.4.0
type EnforceVersion uint
EnforceVersion is used by code generated by protoc-gen-gorums to statically enforce minimum and maximum versions of this package. A compilation failure implies either that:
- the runtime package is too old and needs to be updated OR
- the generated code is too old and needs to be regenerated.
The runtime package can be upgraded by running:
go get github.com/relab/gorums
The generated code can be regenerated by running:
protoc --gorums_out=${PROTOC_GEN_GORUMS_ARGS} ${PROTO_FILES}
Example usage by generated code:
const ( // Verify that this generated code is sufficiently up-to-date. _ = gorums.EnforceVersion(genVersion - gorums.MinVersion) // Verify that runtime/protoimpl is sufficiently up-to-date. _ = gorums.EnforceVersion(gorums.MaxVersion - genVersion) )
The genVersion is the current minor version used to generated the code. This compile-time check relies on negative integer overflow of a uint being a compilation failure (guaranteed by the Go specification).
type Handler ¶ added in v0.10.0
Handler is a function that processes a request message and returns a response message.
type Interceptor ¶ added in v0.10.0
Interceptor is a function that can intercept and modify incoming requests and outgoing responses. It receives a ServerCtx, the incoming Message, and a Handler representing the next element in the chain (either another Interceptor or the actual server method). It returns a Message and an error.
type Manager ¶ added in v0.3.0
type Manager struct {
// contains filtered or unexported fields
}
Manager maintains a connection pool of nodes on which quorum calls can be performed.
func NewManager ¶ added in v0.3.0
func NewManager(opts ...ManagerOption) *Manager
NewManager returns a new Manager for managing connection to nodes added to the manager. This function accepts manager options used to configure various aspects of the manager.
func (*Manager) NodeIDs ¶ added in v0.3.0
NodeIDs returns the identifier of each available node. IDs are returned in the same order as they were provided in the creation of the Manager.
type ManagerOption ¶ added in v0.3.0
type ManagerOption func(*managerOptions)
ManagerOption provides a way to set different options on a new Manager.
func InsecureDialOptions ¶ added in v0.11.0
func InsecureDialOptions(_ testing.TB) ManagerOption
InsecureDialOptions returns the default insecure gRPC dial options for testing.
func WithBackoff ¶ added in v0.3.0
func WithBackoff(backoff backoff.Config) ManagerOption
WithBackoff allows for changing the backoff delays used by Gorums.
func WithDialOptions ¶ added in v0.11.0
func WithDialOptions(opts ...grpc.DialOption) ManagerOption
WithDialOptions returns a ManagerOption which sets any gRPC dial options the Manager should use when initially connecting to each node in its pool.
func WithLogger ¶ added in v0.3.0
func WithLogger(logger *log.Logger) ManagerOption
WithLogger returns a ManagerOption which sets an optional error logger for the Manager.
func WithMetadata ¶ added in v0.3.0
func WithMetadata(md metadata.MD) ManagerOption
WithMetadata returns a ManagerOption that sets the metadata that is sent to each node when the connection is initially established. This metadata can be retrieved from the server-side method handlers.
func WithPerNodeMetadata ¶ added in v0.3.0
func WithPerNodeMetadata(f func(uint32) metadata.MD) ManagerOption
WithPerNodeMetadata returns a ManagerOption that allows you to set metadata for each node individually.
func WithSendBufferSize ¶ added in v0.3.0
func WithSendBufferSize(size uint) ManagerOption
WithSendBufferSize allows for changing the size of the send buffer used by Gorums. A larger buffer might achieve higher throughput for asynchronous calltypes, but at the cost of latency.
type Message ¶ added in v0.3.0
type Message struct {
// contains filtered or unexported fields
}
Message encapsulates a protobuf message and metadata.
This struct should be used by generated code only.
func NewRequestMessage ¶ added in v0.11.0
NewRequestMessage creates a new Gorums Message for the given metadata and request message.
This function should be used by generated code and tests only.
func NewResponseMessage ¶ added in v0.10.0
NewResponseMessage creates a new Gorums Message for the given metadata and response message.
This function should be used by generated code only.
func (*Message) GetMessageID ¶ added in v0.10.0
GetMessageID returns the message ID from the message metadata.
func (*Message) GetMetadata ¶ added in v0.10.0
GetMetadata returns the metadata of the message.
func (*Message) GetMethod ¶ added in v0.10.0
GetMethod returns the method name from the message metadata.
func (*Message) GetProtoMessage ¶ added in v0.10.0
GetProtoMessage returns the protobuf message contained in the Message.
type MultiSorter ¶ added in v0.3.0
type MultiSorter struct {
// contains filtered or unexported fields
}
MultiSorter implements the Sort interface, sorting the nodes within.
func OrderedBy ¶ added in v0.3.0
func OrderedBy(less ...lessFunc) *MultiSorter
OrderedBy returns a Sorter that sorts using the less functions, in order. Call its Sort method to sort the data.
func (*MultiSorter) Len ¶ added in v0.3.0
func (ms *MultiSorter) Len() int
Len is part of sort.Interface.
func (*MultiSorter) Less ¶ added in v0.3.0
func (ms *MultiSorter) Less(i, j int) bool
Less is part of sort.Interface. It is implemented by looping along the less functions until it finds a comparison that is either Less or not Less. Note that it can call the less functions twice per call. We could change the functions to return -1, 0, 1 and reduce the number of calls for greater efficiency: an exercise for the reader.
func (*MultiSorter) Sort ¶ added in v0.3.0
func (ms *MultiSorter) Sort(nodes []*Node)
Sort sorts the argument slice according to the less functions passed to OrderedBy.
func (*MultiSorter) Swap ¶ added in v0.3.0
func (ms *MultiSorter) Swap(i, j int)
Swap is part of sort.Interface.
type Node ¶ added in v0.3.0
type Node struct {
// contains filtered or unexported fields
}
Node encapsulates the state of a node on which a remote procedure call can be performed.
func TestNode ¶ added in v0.11.0
func TestNode(t testing.TB, srvFn func(i int) ServerIface, opts ...TestOption) *Node
TestNode creates a single server and returns the node for testing. Both server and manager cleanup are handled via t.Cleanup in the correct order.
The provided srvFn is used to create and register the server handler. If srvFn is nil, a default mock server implementation is used.
Optional TestOptions can be provided to customize the manager, server, or configuration.
This is the recommended way to set up tests that need only a single server node. It ensures proper cleanup and detects goroutine leaks.
func (*Node) Context ¶ added in v0.11.0
func (n *Node) Context(parent context.Context) *NodeContext
Context creates a new NodeContext from the given parent context and this node.
Example:
nodeCtx := node.Context(context.Background()) resp, err := service.GRPCCall(nodeCtx, req)
func (*Node) FullString ¶ added in v0.3.0
FullString returns a more descriptive string representation of n that includes id, network address and latency information.
func (*Node) LastErr ¶ added in v0.3.0
LastErr returns the last error encountered (if any) for this node.
type NodeContext ¶ added in v0.11.0
NodeContext is a context that carries a node for unicast and RPC calls. It embeds context.Context and provides access to the Node.
Use Node.Context to create a NodeContext from an existing context.
func (NodeContext) Node ¶ added in v0.11.0
func (c NodeContext) Node() *Node
Node returns the Node associated with this context.
type NodeListOption ¶ added in v0.4.0
type NodeListOption interface {
Option
// contains filtered or unexported methods
}
NodeListOption must be implemented by node providers.
func WithNodeIDs ¶ added in v0.3.0
func WithNodeIDs(ids []uint32) NodeListOption
WithNodeIDs returns a NodeListOption containing a list of node IDs. This assumes that the provided node IDs have already been registered with the manager.
func WithNodeList ¶ added in v0.3.0
func WithNodeList(addrsList []string) NodeListOption
WithNodeList returns a NodeListOption containing the provided list of node addresses. With this option, node IDs are generated by the Manager.
func WithNodeMap ¶ added in v0.3.0
func WithNodeMap(idMap map[string]uint32) NodeListOption
WithNodeMap returns a NodeListOption containing the provided mapping from node addresses to application-specific IDs.
type NodeResponse ¶ added in v0.11.0
NodeResponse wraps a response value from node ID, and an error if any.
type Option ¶ added in v0.11.0
type Option interface {
// contains filtered or unexported methods
}
Option is a marker interface for options to NewConfig.
type QuorumCallError ¶ added in v0.3.0
type QuorumCallError struct {
// contains filtered or unexported fields
}
QuorumCallError reports on a failed quorum call. It provides detailed information about which nodes failed.
func TestQuorumCallError ¶ added in v0.11.0
func TestQuorumCallError(_ testing.TB, nodeErrors map[uint32]error) QuorumCallError
TestQuorumCallError creates a QuorumCallError for testing. The nodeErrors map contains node IDs and their corresponding errors.
func (QuorumCallError) Cause ¶ added in v0.11.0
func (e QuorumCallError) Cause() error
Cause returns the underlying cause of the quorum call failure. Common causes include ErrIncomplete and ErrSendFailure.
func (QuorumCallError) Error ¶ added in v0.3.0
func (e QuorumCallError) Error() string
func (QuorumCallError) Is ¶ added in v0.8.0
func (e QuorumCallError) Is(target error) bool
Is reports whether the target error is the same as the cause of the QuorumCallError.
func (QuorumCallError) NodeErrors ¶ added in v0.11.0
func (e QuorumCallError) NodeErrors() int
NodeErrors returns the number of nodes that failed during the quorum call.
func (QuorumCallError) Unwrap ¶ added in v0.11.0
func (e QuorumCallError) Unwrap() (errs []error)
Unwrap returns all the underlying node errors as a slice. This allows the error to work with errors.Is and errors.As for any wrapped errors.
type QuorumInterceptor ¶ added in v0.11.0
type QuorumInterceptor[Req, Resp msg] func(ctx *ClientCtx[Req, Resp], next ResponseSeq[Resp]) ResponseSeq[Resp]
QuorumInterceptor intercepts and processes quorum calls, allowing modification of requests, responses, and aggregation logic. Interceptors can be chained together.
Type parameters:
- Req: The request message type sent to nodes
- Resp: The response message type from individual nodes
The interceptor receives the ClientCtx for metadata access, the current response iterator (next), and returns a new response iterator. This pattern allows interceptors to wrap the response stream with custom logic.
Custom interceptors can be created like this:
func LoggingInterceptor[Req, Resp proto.Message](
ctx *gorums.ClientCtx[Req, Resp],
next gorums.ResponseSeq[Resp],
) gorums.ResponseSeq[Resp] {
return func(yield func(gorums.NodeResponse[Resp]) bool) {
for resp := range next {
log.Printf("Response from node %d", resp.NodeID)
if !yield(resp) { return }
}
}
}
func MapRequest ¶ added in v0.11.0
func MapRequest[Req, Resp msg](fn func(Req, *Node) Req) QuorumInterceptor[Req, Resp]
MapRequest returns an interceptor that applies per-node request transformations. Multiple interceptors can be chained together, with transforms applied in order.
The fn receives the original request and a node, and returns the transformed request to send to that node. If the function returns an invalid message or nil, the request to that node is skipped.
func MapResponse ¶ added in v0.11.0
func MapResponse[Req, Resp msg](fn func(Resp, *Node) Resp) QuorumInterceptor[Req, Resp]
MapResponse returns an interceptor that applies per-node response transformations.
The fn receives the response from a node and the node itself, and returns the transformed response.
type ResponseSeq ¶ added in v0.11.0
type ResponseSeq[T msg] iter.Seq[NodeResponse[T]]
ResponseSeq is an iterator that yields NodeResponse[T] values from a quorum call.
func (ResponseSeq[Resp]) CollectAll ¶ added in v0.11.0
func (seq ResponseSeq[Resp]) CollectAll() map[uint32]Resp
CollectAll collects all responses, including errors, from the iterator into a map by node ID.
Example:
responses := QuorumCall(ctx, req) // Collect all responses (including errors) replies := responses.CollectAll() // or collect all successful responses replies = responses.IgnoreErrors().CollectAll()
func (ResponseSeq[Resp]) CollectN ¶ added in v0.11.0
func (seq ResponseSeq[Resp]) CollectN(n int) map[uint32]Resp
CollectN collects up to n responses, including errors, from the iterator into a map by node ID. It returns early if n responses are collected or the iterator is exhausted.
Example:
responses := QuorumCall(ctx, req) // Collect the first 2 responses (including errors) replies := responses.CollectN(2) // or collect 2 successful responses replies = responses.IgnoreErrors().CollectN(2)
func (ResponseSeq[Resp]) Filter ¶ added in v0.11.0
func (seq ResponseSeq[Resp]) Filter(keep func(NodeResponse[Resp]) bool) ResponseSeq[Resp]
Filter returns an iterator that yields only the responses for which the provided keep function returns true. This is useful for verifying or filtering responses from servers before further processing.
Example:
responses := QuorumCall(ctx, req)
// Filter to only responses from a specific node
for resp := range responses.Filter(func(r NodeResponse[Resp]) bool {
return r.NodeID == 1
}) {
// process resp
}
func (ResponseSeq[Resp]) IgnoreErrors ¶ added in v0.11.0
func (seq ResponseSeq[Resp]) IgnoreErrors() ResponseSeq[Resp]
IgnoreErrors returns an iterator that yields only successful responses, discarding any responses with errors. This is useful when you want to process only valid responses from nodes.
Example:
responses := QuorumCall(ctx, Request_builder{Num: uint64(42)}.Build())
var sum int32
for resp := range responses.IgnoreErrors() {
// resp is guaranteed to be a successful response
sum += resp.Value.GetValue()
}
type Responses ¶ added in v0.11.0
type Responses[Resp msg] struct {
ResponseSeq[Resp]
// contains filtered or unexported fields
}
Responses provides access to quorum call responses and terminal methods. It is returned by quorum call functions and allows fluent-style API usage:
resp, err := ReadQuorumCall(ctx, req).Majority() // or resp, err := ReadQuorumCall(ctx, req).First() // or replies := ReadQuorumCall(ctx, req).IgnoreErrors().CollectAll()
Type parameter:
- Resp: The response message type
func NewResponses ¶ added in v0.11.0
func QuorumCall ¶ added in v0.11.0
func QuorumCall[Req, Resp msg]( ctx *ConfigContext, req Req, method string, opts ...CallOption, ) *Responses[Resp]
QuorumCall performs a quorum call and returns a Responses object that provides access to node responses via terminal methods and fluent iteration.
Type parameters:
- Req: The request message type
- Resp: The response message type from individual nodes
The opts parameter accepts CallOption values such as Interceptors. Interceptors are applied in the order they are provided via Interceptors, modifying the clientCtx before the user calls a terminal method.
Note: Messages are not sent to nodes until a terminal method (like Majority, First) or iterator method (like Seq) is called, applying any registered request transformations. This lazy sending is necessary to allow interceptors to register transformations prior to dispatch.
This function should be used by generated code only.
func QuorumCallStream ¶ added in v0.11.0
func QuorumCallStream[Req, Resp msg]( ctx *ConfigContext, req Req, method string, opts ...CallOption, ) *Responses[Resp]
QuorumCallStream performs a streaming quorum call and returns a Responses object. This is used for correctable stream methods where the server sends multiple responses.
In streaming mode, the response iterator continues indefinitely until the context is canceled, allowing the server to send multiple responses over time.
This function should be used by generated code only.
func (*Responses[Resp]) All ¶ added in v0.11.0
All returns the first response once all nodes have responded successfully. If any node fails, it returns an error.
func (*Responses[Resp]) AsyncAll ¶ added in v0.11.0
AsyncAll returns an Async future that resolves when all nodes have responded. Messages are sent immediately (synchronously) to preserve ordering.
func (*Responses[Resp]) AsyncFirst ¶ added in v0.11.0
AsyncFirst returns an Async future that resolves when the first response is received. Messages are sent immediately (synchronously) to preserve ordering.
func (*Responses[Resp]) AsyncMajority ¶ added in v0.11.0
AsyncMajority returns an Async future that resolves when a majority quorum is reached. Messages are sent immediately (synchronously) to preserve ordering when multiple async calls are created in sequence.
func (*Responses[Resp]) AsyncThreshold ¶ added in v0.11.0
AsyncThreshold returns an Async future that resolves when the threshold is reached. Messages are sent immediately (synchronously) to preserve ordering when multiple async calls are created in sequence.
func (*Responses[Resp]) Correctable ¶ added in v0.11.0
func (r *Responses[Resp]) Correctable(threshold int) *Correctable[Resp]
Correctable returns a Correctable that provides progressive updates as responses arrive. The level increases with each successful response. Use this for correctable quorum patterns where you want to observe intermediate states.
Example:
corr := ReadQC(ctx, req).Correctable(2) // Wait for level 2 to be reached <-corr.Watch(2) resp, level, err := corr.Get()
func (*Responses[Resp]) First ¶ added in v0.11.0
First returns the first successful response received from any node. This is useful for read-any patterns where any single response is sufficient.
func (*Responses[Resp]) Majority ¶ added in v0.11.0
Majority returns the first response once a simple majority (⌈(n+1)/2⌉) of successful responses are received.
func (*Responses[Resp]) Seq ¶ added in v0.11.0
func (r *Responses[Resp]) Seq() ResponseSeq[Resp]
Seq returns the underlying response iterator that yields node responses as they arrive. It returns a single-use iterator. Users can use this to implement custom aggregation logic. This method triggers lazy sending of requests.
The iterator will:
- Yield responses as they arrive from nodes
- Continue until the context is canceled or all expected responses have been received
- Allow early termination by breaking from the range loop
Example usage:
for result := range ReadQuorumCall(ctx, req).Seq() {
if result.Err != nil {
// Handle node error
continue
}
// Process result.Value
}
type Server ¶ added in v0.3.0
type Server struct {
// contains filtered or unexported fields
}
Server serves all ordering based RPCs using registered handlers.
func NewServer ¶ added in v0.3.0
func NewServer(opts ...ServerOption) *Server
NewServer returns a new instance of gorums.Server.
func (*Server) GracefulStop ¶ added in v0.3.0
func (s *Server) GracefulStop()
GracefulStop waits for all RPCs to finish before stopping.
func (*Server) RegisterHandler ¶ added in v0.3.0
RegisterHandler registers a request handler for the specified method name.
This function should only be used by generated code.
type ServerCtx ¶ added in v0.6.0
ServerCtx is a context that is passed from the Gorums server to the handler. It allows the handler to release its lock on the server, allowing the next request to be processed. This happens automatically when the handler returns.
func (*ServerCtx) Release ¶ added in v0.6.0
func (ctx *ServerCtx) Release()
Release releases this handler's lock on the server, which allows the next request to be processed concurrently. Use Release only when the handler no longer needs exclusive access to the server's state. It is safe to call Release multiple times.
func (*ServerCtx) SendMessage ¶ added in v0.10.0
SendMessage attempts to send the given message to the client. This may fail if the stream was closed or the stream context got canceled.
This function should be used by generated code only.
type ServerIface ¶ added in v0.3.0
ServerIface is the interface that must be implemented by a server in order to support the TestSetup function.
func EchoServerFn ¶ added in v0.11.0
func EchoServerFn(_ int) ServerIface
func StreamBenchmarkServerFn ¶ added in v0.11.0
func StreamBenchmarkServerFn(_ int) ServerIface
func StreamServerFn ¶ added in v0.11.0
func StreamServerFn(_ int) ServerIface
type ServerOption ¶ added in v0.3.0
type ServerOption func(*serverOptions)
ServerOption is used to change settings for the GorumsServer
func WithConnectCallback ¶ added in v0.8.0
func WithConnectCallback(callback func(context.Context)) ServerOption
WithConnectCallback registers a callback function that will be called by the server whenever a node connects or reconnects to the server. This allows access to the node's stream context, which is passed to the callback function. The stream context can be used to extract the metadata and peer information, if available.
func WithGRPCServerOptions ¶ added in v0.3.0
func WithGRPCServerOptions(opts ...grpc.ServerOption) ServerOption
WithGRPCServerOptions allows to set gRPC options for the server.
func WithInterceptors ¶ added in v0.10.0
func WithInterceptors(i ...Interceptor) ServerOption
WithInterceptors registers server-side interceptors to run for every incoming request. Interceptors are executed for each registered handler. Interceptors may modify both the request and/or response messages, or perform additional actions before or after calling the next handler in the chain. Interceptors are executed in the order they are provided: the first element is executed first, and the last element calls the actual server method handler.
func WithReceiveBufferSize ¶ added in v0.3.0
func WithReceiveBufferSize(size uint) ServerOption
WithReceiveBufferSize sets the buffer size for the server. A larger buffer may result in higher throughput at the cost of higher latency.
type System ¶ added in v0.11.0
type System struct {
// contains filtered or unexported fields
}
System encapsulates the state of a Gorums system, including the server, listener, and any registered closers (e.g. managers).
func NewSystem ¶ added in v0.11.0
func NewSystem(addr string, opts ...ServerOption) (*System, error)
NewSystem creates a new Gorums System listening on the specified address with the provided server options.
func (*System) RegisterService ¶ added in v0.11.0
RegisterService registers the service with the server using the provided register function. The closer is added to the list of closers to be closed when the system is stopped.
Example usage:
gs := NewSystem(lis)
mgr := NewManager(...)
impl := &srvImpl{}
gs.RegisterService(mgr, func(srv *Server) {
pb.RegisterMultiPaxosServer(srv, impl)
})
func (*System) Stop ¶ added in v0.11.0
Stop stops the Gorums server and closes all registered closers. It immediately closes all open connections and listeners. It cancels all active RPCs on the server side and the corresponding pending RPCs on the client side will get notified by connection errors.
type TestOption ¶ added in v0.11.0
type TestOption any
TestOption is a marker interface that can hold ManagerOption, ServerOption, or NodeListOption. This allows test helpers to accept a single variadic parameter that can be filtered and passed to the appropriate constructors (NewManager, NewServer, NewConfiguration).
Each option type (ManagerOption, ServerOption, NodeListOption) embeds this interface, so they can be passed directly without wrapping:
SetupConfiguration(t, 3, nil, WithBackoff(...), // ManagerOption WithReceiveBufferSize(10), // ServerOption WithNodeMap(...), // NodeListOption )
func WithManager ¶ added in v0.11.0
func WithManager(_ testing.TB, mgr *Manager) TestOption
WithManager returns a TestOption that provides an existing manager to use instead of creating a new one. This is useful when creating multiple configurations that should share the same manager.
When using WithManager, the caller is responsible for closing the manager. SetupConfiguration will NOT register a cleanup function for the manager.
This option is intended for testing purposes only.
func WithPreConnect ¶ added in v0.11.0
func WithPreConnect(_ testing.TB, fn func(stopServers func())) TestOption
WithPreConnect returns a TestOption that registers a function to be called after servers are started but before nodes attempt to connect. The function receives a stopServers callback that can be used to stop the test servers.
This is useful for testing error handling when servers are unavailable:
node := gorums.TestNode(t, nil, gorums.WithPreConnect(t, func(stopServers func()) {
stopServers()
time.Sleep(300 * time.Millisecond) // wait for server to fully stop
}))
This option is intended for testing purposes only.
func WithStopFunc ¶ added in v0.11.0
func WithStopFunc(_ testing.TB, fn *func(...int)) TestOption
WithStopFunc returns a TestOption that captures the variadic server stop function, allowing tests to stop servers at any point during test execution. Call with no arguments to stop all servers, or with specific indices to stop those servers. This is useful for testing server failure scenarios.
Usage:
var stopServers func(...int) config := gorums.TestConfiguration(t, 3, nil, gorums.WithStopFunc(t, &stopServers)) // ... send some messages ... stopServers() // stop all servers // OR stopServers(0, 2) // stop servers at indices 0 and 2 // ... verify error handling ...
This option is intended for testing purposes only.
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
cmd
|
|
|
benchmark
command
|
|
|
protoc-gen-gorums
command
|
|
|
protoc-gen-gorums/gengorums
Package gengorums is internal to the gorums protobuf module.
|
Package gengorums is internal to the gorums protobuf module. |
|
examples
module
|
|
|
internal
|
|
|
version
Package version records versioning information about this module.
|
Package version records versioning information about this module. |