rafty

package module
v0.0.2
Published: Oct 24, 2025 License: MIT Imports: 38 Imported by: 0

README

rafty

Rafty is yet another Go library that manages a replicated log for state machines. Details about the Raft protocol can be found here

Reading this documentation is recommended before going any further.

Checking out these websites will also be useful:

Why another library?

There are many libraries out there implementing the "In Search of an Understandable Consensus Algorithm" paper. Unfortunately, I found most of them difficult to understand, as there is not much clear documentation about how to use them. Examples include production-ready repositories like:

So let's try to reinvent the wheel with more explanations.

Supported features

Here is a list of the supported features of rafty:

  • PreVote election
  • Leader election
  • Leader lease
  • Leadership transfer
  • Logs
    • Log replication
    • Submit write commands
    • Submit read commands
      • Submit read commands linearizable (read index)
      • Submit read commands leader lease
    • Forward read/write commands to leader
    • Log compaction
  • Log storage
    • etcd bbolt for long term storage
    • log cache backed by etcd bbolt long term storage
    • in memory storage (for development)
  • Membership changes
    • Add member
    • Promote member
    • Demote member
    • Remove member (with shutdown if proper flag set)
    • forceRemove member
    • leaveOnTerminate
  • Single server cluster
  • Bootstrap cluster
  • Snapshot
    • Take snapshot (with FSM interface)
    • Restore snapshot (with FSM interface)
    • InstallSnapshot RPC call
  • Prometheus metrics

Real world app

See the real world app for an implementation example.

References

Documentation

Index

Constants

const (
	// GRPCAddress defines the default address to run the grpc server
	GRPCAddress string = "127.0.0.1"

	// GRPCPort defines the default port to run the grpc server
	GRPCPort uint16 = 50051
)
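
The two defaults combine into a host:port listen address. The following is a minimal sketch, assuming the constants are redeclared locally so that it compiles on its own (a real program would import the package instead). The helper name defaultTCPAddr is hypothetical.

```go
package main

import (
	"fmt"
	"net"
	"strconv"
)

// Local copies of the documented defaults, so the sketch is self-contained.
const (
	GRPCAddress string = "127.0.0.1"
	GRPCPort    uint16 = 50051
)

// defaultTCPAddr (hypothetical helper) joins the default address and port
// and resolves them into a *net.TCPAddr.
func defaultTCPAddr() (*net.TCPAddr, error) {
	hostPort := net.JoinHostPort(GRPCAddress, strconv.Itoa(int(GRPCPort)))
	return net.ResolveTCPAddr("tcp", hostPort)
}

func main() {
	addr, err := defaultTCPAddr()
	if err != nil {
		fmt.Println("resolve failed:", err)
		return
	}
	fmt.Println(addr.String())
}
```

NewRafty takes a net.TCPAddr value, so a caller would dereference the pointer returned here.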

Variables

var (
	// ErrAppendEntriesToLeader is triggered when trying to append entries to a leader
	ErrAppendEntriesToLeader = errors.New("cannot send append entries to a leader")

	// ErrTermTooOld is triggered when the term of the peer is older than mine
	ErrTermTooOld = errors.New("peer term older than mine")

	// ErrNoLeader is triggered when there is no existing leader
	ErrNoLeader = errors.New("no leader")

	// ErrNotLeader is triggered when trying to perform an operation on a non leader
	ErrNotLeader = errors.New("not leader")

	// ErrShutdown is triggered when the node is shutting down
	ErrShutdown = errors.New("node is shutting down")

	// ErrLogNotFound is triggered when the provided log is not found on the current node
	ErrLogNotFound = errors.New("log not found")

	// ErrIndexOutOfRange is triggered when trying to fetch an index that does not exist
	ErrIndexOutOfRange = errors.New("index out of range")

	// ErrUnkownRPCType is triggered when trying to perform an rpc call with wrong parameter
	ErrUnkownRPCType = errors.New("unknown rpcType")

	// ErrTimeout is triggered when an operation timed out
	ErrTimeout = errors.New("operation timeout")

	// ErrUnkown is triggered when trying to perform an operation that does not exist
	ErrUnkown = errors.New("unknown error")

	// ErrLeadershipTransferInProgress is triggered when a node receives an append entry request
	// or a vote request
	ErrLeadershipTransferInProgress = errors.New("leadership transfer in progress")

	// ErrMembershipChangeNodeTooSlow is triggered by a timeout when a node is too slow to catch up with leader logs
	ErrMembershipChangeNodeTooSlow = errors.New("new node is too slow catching up leader logs")

	// ErrMembershipChangeInProgress is triggered by the leader when it already has a membership change in progress
	ErrMembershipChangeInProgress = errors.New("membership change in progress")

	// ErrMembershipChangeNodeNotDemoted is triggered by the leader when removing a node without
	// demoting it first
	ErrMembershipChangeNodeNotDemoted = errors.New("node must be demoted before being removed")

	// ErrMembershipChangeNodeDemotionForbidden is triggered by the leader when demoting a node would
	// break cluster quorum
	ErrMembershipChangeNodeDemotionForbidden = errors.New("node cannot be demoted, breaking cluster")

	// ErrClusterNotBootstrapped is triggered by the leader when the bootstrap option is set
	// and a client is submitting a command
	ErrClusterNotBootstrapped = errors.New("cluster not bootstrapped")

	// ErrClusterAlreadyBootstrapped is triggered when trying to bootstrap the cluster again
	ErrClusterAlreadyBootstrapped = errors.New("cluster already bootstrapped")

	// ErrChecksumDataTooShort is triggered while decoding data with checksum
	ErrChecksumDataTooShort = errors.New("data to short for checksum")

	// ErrChecksumMistmatch is triggered while decoding data. It generally happens when
	// a file is corrupted
	ErrChecksumMistmatch = errors.New("CRC32 checksum mistmatch")

	// ErrKeyNotFound is triggered when trying to fetch a key that does not exist
	ErrKeyNotFound = errors.New("key not found")

	// ErrStoreClosed is triggered when trying to perform an operation
	// while the store is closed
	ErrStoreClosed = errors.New("store closed")

	// ErrDataDirRequired is triggered when data dir is missing
	ErrDataDirRequired = errors.New("data dir cannot be empty")

	// ErrNoSnapshotToTake is triggered when there is no snapshot to take
	ErrNoSnapshotToTake = errors.New("no snapshot to take")

	// ErrLogCommandNotAllowed is triggered when submitting a command that is not allowed
	ErrLogCommandNotAllowed = errors.New("log command not allowed")

	// ErrClient is triggered when building the grpc client fails
	ErrClient = errors.New("fail to get grpc client")
)
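
Because these are sentinel values created with errors.New, callers can match them with errors.Is even when they have been wrapped. The sketch below redeclares a few of them locally to stay self-contained. Which errors count as retryable is an assumption made for illustration, not something the package states.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins mirroring some of the documented sentinel errors.
var (
	ErrNoLeader  = errors.New("no leader")
	ErrNotLeader = errors.New("not leader")
	ErrTimeout   = errors.New("operation timeout")
	ErrShutdown  = errors.New("node is shutting down")
)

// retryable (hypothetical helper) reports whether a caller might reasonably
// retry after err. The classification is an assumption of this sketch.
func retryable(err error) bool {
	switch {
	case errors.Is(err, ErrNoLeader),
		errors.Is(err, ErrNotLeader),
		errors.Is(err, ErrTimeout):
		return true
	default:
		return false
	}
}

func main() {
	// errors.Is still matches after wrapping with %w.
	wrapped := fmt.Errorf("submit command: %w", ErrNotLeader)
	fmt.Println(retryable(wrapped))
	fmt.Println(retryable(ErrShutdown))
}
```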

Functions

func DecodeUint64ToBytes

func DecodeUint64ToBytes(value []byte) uint64

DecodeUint64ToBytes decodes bytes into a uint64

func EncodePeers

func EncodePeers(data []Peer) (result []byte)

EncodePeers encodes peers and returns bytes

func EncodeUint64ToBytes

func EncodeUint64ToBytes(value uint64) []byte

EncodeUint64ToBytes encodes a uint64 into bytes
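
A round trip between uint64 and bytes can be sketched with encoding/binary. The byte order and the fixed 8-byte width used here are assumptions, since the documentation does not state the layout, and the function names are hypothetical.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// encodeUint64 returns value as 8 big-endian bytes.
// Big-endian is an assumption of this sketch.
func encodeUint64(value uint64) []byte {
	buf := make([]byte, 8)
	binary.BigEndian.PutUint64(buf, value)
	return buf
}

// decodeUint64 reverses encodeUint64. It returns 0 when the input is not
// exactly 8 bytes long, which is a choice made for this sketch only.
func decodeUint64(value []byte) uint64 {
	if len(value) != 8 {
		return 0
	}
	return binary.BigEndian.Uint64(value)
}

func main() {
	encoded := encodeUint64(42)
	fmt.Println(encoded, decodeUint64(encoded))
}
```

One reason to pick big-endian for this kind of helper is that the encoded keys then sort bytewise in the same order as the numbers they represent.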

func MarshalBinary

func MarshalBinary(entry *LogEntry, w io.Writer) error

MarshalBinary encodes data in binary format

func MarshalBinaryWithChecksum

func MarshalBinaryWithChecksum(buffer *bytes.Buffer, w io.Writer) error

MarshalBinaryWithChecksum encodes data in binary format with a checksum before it is written to disk
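
The checksum idea can be illustrated with hash/crc32. This is a sketch under stated assumptions: the trailing 4-byte big-endian CRC32 (IEEE) layout, the function names, and the local error values are all hypothetical and are not taken from the package's on-disk format.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
	"hash/crc32"
)

// Local stand-ins for the documented checksum errors.
var (
	errDataTooShort     = errors.New("data too short for checksum")
	errChecksumMismatch = errors.New("CRC32 checksum mismatch")
)

// appendChecksum returns payload followed by its CRC32 (IEEE polynomial)
// stored as 4 big-endian bytes. The layout is an assumption.
func appendChecksum(payload []byte) []byte {
	out := make([]byte, len(payload)+4)
	copy(out, payload)
	binary.BigEndian.PutUint32(out[len(payload):], crc32.ChecksumIEEE(payload))
	return out
}

// verifyChecksum splits data into payload and checksum, then validates
// the payload before returning it.
func verifyChecksum(data []byte) ([]byte, error) {
	if len(data) < 4 {
		return nil, errDataTooShort
	}
	payload := data[:len(data)-4]
	sum := binary.BigEndian.Uint32(data[len(data)-4:])
	if crc32.ChecksumIEEE(payload) != sum {
		return nil, errChecksumMismatch
	}
	return payload, nil
}

func main() {
	framed := appendChecksum([]byte("hello"))
	payload, err := verifyChecksum(framed)
	fmt.Println(string(payload), err)
}
```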

Types

type BoltOptions

type BoltOptions struct {
	// DataDir is the default data directory that will be used to store all data on the disk. It's required
	DataDir string

	// Options holds all bolt options
	Options *bolt.Options
}

BoltOptions holds all the requirements related to boltdb

type BoltStore

type BoltStore struct {
	// contains filtered or unexported fields
}

func NewBoltStorage

func NewBoltStorage(options BoltOptions) (*BoltStore, error)

NewBoltStorage allows us to configure bolt storage with the provided options

func (*BoltStore) Close

func (b *BoltStore) Close() error

Close will close bolt database

func (*BoltStore) CompactLogs

func (b *BoltStore) CompactLogs(index uint64) error

CompactLogs wipes all entries lower than the provided index

func (*BoltStore) DiscardLogs

func (b *BoltStore) DiscardLogs(minIndex, maxIndex uint64) error

DiscardLogs wipes the entries within the provided index range

func (*BoltStore) FirstIndex

func (b *BoltStore) FirstIndex() (uint64, error)

FirstIndex will return the first index from the raft log

func (*BoltStore) Get

func (b *BoltStore) Get(key []byte) ([]byte, error)

Get will fetch provided key from the k/v store. An error will be returned if the key is not found

func (*BoltStore) GetLastConfiguration

func (b *BoltStore) GetLastConfiguration() (*LogEntry, error)

GetLastConfiguration returns the last configuration found in logs

func (*BoltStore) GetLogByIndex

func (b *BoltStore) GetLogByIndex(index uint64) (*LogEntry, error)

GetLogByIndex retrieves the log at the specified index

func (*BoltStore) GetLogsByRange

func (b *BoltStore) GetLogsByRange(minIndex, maxIndex, maxAppendEntries uint64) (response GetLogsByRangeResponse)

GetLogsByRange will return a slice of logs with peer lastLogIndex and leader lastLogIndex capped by options.MaxAppendEntries

func (*BoltStore) GetMetadata

func (b *BoltStore) GetMetadata() ([]byte, error)

GetMetadata will fetch rafty metadata from the k/v store

func (*BoltStore) GetUint64

func (b *BoltStore) GetUint64(key []byte) uint64

GetUint64 will fetch the provided key from the k/v store and return its value as a uint64. Unlike Get, its signature does not return an error

func (*BoltStore) LastIndex

func (b *BoltStore) LastIndex() (uint64, error)

LastIndex will return the last index from the raft log

func (*BoltStore) Set

func (b *BoltStore) Set(key, value []byte) error

Set will add key/value to the k/v store. An error will be returned if necessary

func (*BoltStore) SetUint64

func (b *BoltStore) SetUint64(key, value []byte) error

SetUint64 will add key/value to the k/v store. An error will be returned if necessary

func (*BoltStore) StoreLog

func (b *BoltStore) StoreLog(log *LogEntry) error

StoreLog stores a single log entry

func (*BoltStore) StoreLogs

func (b *BoltStore) StoreLogs(logs []*LogEntry) error

StoreLogs stores multiple log entries

func (*BoltStore) StoreMetadata

func (b *BoltStore) StoreMetadata(value []byte) error

StoreMetadata will store rafty metadata into the k/v bucket

type ClusterStore

type ClusterStore interface {
	// Close closes the store
	Close() error

	// GetMetadata will fetch rafty metadata from the k/v store
	GetMetadata() ([]byte, error)

	// StoreMetadata will store rafty metadata into the k/v bucket.
	// This won't be replicated
	StoreMetadata([]byte) error

	// Set will add key/value to the k/v store
	Set(key, value []byte) error

	// Get will fetch provided key from the k/v store
	Get(key []byte) ([]byte, error)

	// SetUint64 will add key/value to the k/v store
	SetUint64(key, value []byte) error

	// GetUint64 will fetch provided key from the k/v store
	GetUint64(key []byte) uint64
}

ClusterStore is an interface that allows us to store and retrieve data for cluster related purposes only

type Configuration

type Configuration struct {
	// ServerMembers holds all current members of the cluster
	ServerMembers []Peer `json:"serverMembers"`
}

Configuration holds configuration related to current server

type GetLogsByRangeResponse

type GetLogsByRangeResponse struct {
	Logs                             []*LogEntry
	Total, LastLogIndex, LastLogTerm uint64
	SendSnapshot                     bool
	Err                              error
}

GetLogsByRangeResponse returns response from GetLogsByRange func

type InitialPeer

type InitialPeer struct {
	// Address is the address of a peer node, must be just the ip or ip:port
	Address string
	// contains filtered or unexported fields
}

InitialPeer holds address of the peer

type LogEntry

type LogEntry struct {
	FileFormat uint32
	Tombstone  uint32
	LogType    uint32
	Timestamp  uint32
	Term       uint64
	Index      uint64
	Command    []byte
}

LogEntry holds the requirements that will be used to store logs on disk

func UnmarshalBinary

func UnmarshalBinary(data []byte) (*LogEntry, error)

UnmarshalBinary decodes data in binary format

func UnmarshalBinaryWithChecksum

func UnmarshalBinaryWithChecksum(data []byte) (*LogEntry, error)

UnmarshalBinaryWithChecksum decodes data in binary format, validating its checksum before going any further

type LogKind

type LogKind uint8

LogKind represents the kind of the log

const (
	// LogNoop is a log type used only by the leader
	// to keep the log index and term in sync with followers
	// when stepping up as leader
	LogNoop LogKind = iota

	// LogConfiguration is a log type used between nodes
	// when the configuration needs to change
	LogConfiguration

	// LogReplication is a log type used by clients to append log entries
	// on all nodes
	LogReplication

	// LogCommandReadLeaderLease is a log type used by clients to fetch data from the leader.
	// This mode is susceptible to time drift or long GC pauses.
	// Use this method only if you don't mind potentially reading stale data
	LogCommandReadLeaderLease

	// LogCommandLinearizableRead is a command that guarantees read data validity.
	// No stale read can happen in this mode.
	LogCommandLinearizableRead
)

func (LogKind) String

func (s LogKind) String() string

String returns a human readable representation of the log kind
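
The constants above are declared with iota, so they take the values 0 through 4 in declaration order. The sketch below mirrors them locally. The strings returned by this String method and the readKind helper are illustrative assumptions, and the names the package actually prints may differ.

```go
package main

import "fmt"

// LogKind mirrors the documented type so the sketch is self-contained.
type LogKind uint8

const (
	LogNoop LogKind = iota
	LogConfiguration
	LogReplication
	LogCommandReadLeaderLease
	LogCommandLinearizableRead
)

// String returns illustrative names only.
func (k LogKind) String() string {
	switch k {
	case LogNoop:
		return "noop"
	case LogConfiguration:
		return "configuration"
	case LogReplication:
		return "replication"
	case LogCommandReadLeaderLease:
		return "readLeaderLease"
	case LogCommandLinearizableRead:
		return "linearizableRead"
	default:
		return "unknown"
	}
}

// readKind (hypothetical helper) picks a read kind depending on whether
// the caller can tolerate potentially stale data.
func readKind(allowStale bool) LogKind {
	if allowStale {
		return LogCommandReadLeaderLease
	}
	return LogCommandLinearizableRead
}

func main() {
	fmt.Println(readKind(true), readKind(false))
}
```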

type LogStore

type LogStore interface {
	// Close closes the store
	Close() error

	// StoreLogs stores multiple log entries
	StoreLogs(logs []*LogEntry) error

	// StoreLog stores a single log entry
	StoreLog(log *LogEntry) error

	// GetLogByIndex retrieves the log at the specified index
	GetLogByIndex(index uint64) (*LogEntry, error)

	// GetLogsByRange will return a slice of logs
	// with min and max index capped by options.MaxAppendEntries
	GetLogsByRange(minLogIndex, maxLogIndex, maxAppendEntries uint64) GetLogsByRangeResponse

	// DiscardLogs wipes the entries within the provided index range
	DiscardLogs(from, to uint64) error

	// CompactLogs wipes all entries lower than the provided index
	CompactLogs(index uint64) error

	// FirstIndex will return the first index from the raft log
	FirstIndex() (uint64, error)

	// LastIndex will return the last index from the raft log
	LastIndex() (uint64, error)

	// GetLastConfiguration returns the last configuration found
	// in logs
	GetLastConfiguration() (*LogEntry, error)
}

LogStore is an interface that allows us to store and retrieve raft logs from memory
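
The difference between CompactLogs and DiscardLogs can be sketched with a small map-backed log. This is not the package's implementation: the memLog type and its methods are hypothetical, and treating the DiscardLogs range as inclusive on both ends is an assumption.

```go
package main

import (
	"fmt"
	"sort"
)

// entry is a reduced stand-in for LogEntry.
type entry struct {
	Index, Term uint64
	Command     []byte
}

// memLog is a hypothetical map-backed log keyed by index.
type memLog struct {
	entries map[uint64]entry
}

func newMemLog() *memLog { return &memLog{entries: make(map[uint64]entry)} }

func (m *memLog) store(e entry) { m.entries[e.Index] = e }

// compact removes every entry whose index is lower than index.
func (m *memLog) compact(index uint64) {
	for i := range m.entries {
		if i < index {
			delete(m.entries, i)
		}
	}
}

// discard removes entries in [from, to]. The inclusive bounds are an assumption.
func (m *memLog) discard(from, to uint64) {
	for i := range m.entries {
		if i >= from && i <= to {
			delete(m.entries, i)
		}
	}
}

// indexes returns the remaining indexes in ascending order.
func (m *memLog) indexes() []uint64 {
	out := make([]uint64, 0, len(m.entries))
	for i := range m.entries {
		out = append(out, i)
	}
	sort.Slice(out, func(a, b int) bool { return out[a] < out[b] })
	return out
}

func main() {
	l := newMemLog()
	for i := uint64(1); i <= 5; i++ {
		l.store(entry{Index: i, Term: 1})
	}
	l.compact(3)
	fmt.Println(l.indexes())
	l.discard(4, 5)
	fmt.Println(l.indexes())
}
```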

type LogsCache

type LogsCache struct {
	// contains filtered or unexported fields
}

LogsCache holds the requirements related to caching rafty data

func NewLogsCache

func NewLogsCache(options LogsCacheOptions) *LogsCache

NewLogsCache allows us to configure the cache with the provided store option

func (*LogsCache) Close

func (lc *LogsCache) Close() error

Close will close the underlying long term store of the cache

func (*LogsCache) CompactLogs

func (lc *LogsCache) CompactLogs(index uint64) error

CompactLogs wipes all entries lower than the provided index

func (*LogsCache) DiscardLogs

func (lc *LogsCache) DiscardLogs(minIndex, maxIndex uint64) error

DiscardLogs removes entries from the cache and from long term storage

func (*LogsCache) FirstIndex

func (lc *LogsCache) FirstIndex() (uint64, error)

FirstIndex returns data from the cache when it exists; otherwise data is fetched from long term storage

func (*LogsCache) GetLastConfiguration

func (lc *LogsCache) GetLastConfiguration() (*LogEntry, error)

GetLastConfiguration returns data from the cache when it exists; otherwise data is fetched from long term storage

func (*LogsCache) GetLogByIndex

func (lc *LogsCache) GetLogByIndex(index uint64) (*LogEntry, error)

GetLogByIndex returns data from the cache when it exists; otherwise data is fetched from long term storage

func (*LogsCache) GetLogsByRange

func (lc *LogsCache) GetLogsByRange(minIndex, maxIndex, maxAppendEntries uint64) (response GetLogsByRangeResponse)

GetLogsByRange returns data from the cache when it exists; otherwise data is fetched from long term storage

func (*LogsCache) LastIndex

func (lc *LogsCache) LastIndex() (uint64, error)

LastIndex returns data from the cache when it exists; otherwise data is fetched from long term storage

func (*LogsCache) StoreLog

func (lc *LogsCache) StoreLog(log *LogEntry) error

StoreLog stores a single log entry

func (*LogsCache) StoreLogs

func (lc *LogsCache) StoreLogs(logs []*LogEntry) error

StoreLogs stores multiple log entries in cache but also in long term storage

type LogsCacheOptions

type LogsCacheOptions struct {
	// LogStore holds the long term storage data related to raft logs
	LogStore LogStore

	// CacheOnWrite, when set to true, will put every write
	// request in cache before writing to the long term storage
	CacheOnWrite bool

	// TTL is the maximum amount of time to keep the entry in cache.
	// Defaults to 30 seconds if TTL == 0
	TTL time.Duration
}

LogsCacheOptions holds all cache options that will be later used by LogsCache

type LogsInMemory

type LogsInMemory struct {
	// contains filtered or unexported fields
}

LogsInMemory holds the requirements related to in memory rafty data

func NewLogsInMemory

func NewLogsInMemory() *LogsInMemory

func (*LogsInMemory) Close

func (in *LogsInMemory) Close() error

Close will close database

func (*LogsInMemory) CompactLogs

func (in *LogsInMemory) CompactLogs(index uint64) error

CompactLogs wipes all entries lower than the provided index

func (*LogsInMemory) DiscardLogs

func (in *LogsInMemory) DiscardLogs(minIndex, maxIndex uint64) error

DiscardLogs wipes the entries within the provided index range

func (*LogsInMemory) FirstIndex

func (in *LogsInMemory) FirstIndex() (uint64, error)

FirstIndex returns the first index from memory

func (*LogsInMemory) GetLastConfiguration

func (in *LogsInMemory) GetLastConfiguration() (*LogEntry, error)

GetLastConfiguration returns the last configuration found in logs

func (*LogsInMemory) GetLogByIndex

func (in *LogsInMemory) GetLogByIndex(index uint64) (*LogEntry, error)

GetLogByIndex retrieves the log at the specified index

func (*LogsInMemory) GetLogsByRange

func (in *LogsInMemory) GetLogsByRange(minIndex, maxIndex, maxAppendEntries uint64) (response GetLogsByRangeResponse)

GetLogsByRange will return a slice of logs with peer lastLogIndex and leader lastLogIndex capped by options.MaxAppendEntries

func (*LogsInMemory) LastIndex

func (in *LogsInMemory) LastIndex() (uint64, error)

LastIndex returns the last index from memory

func (*LogsInMemory) StoreLog

func (in *LogsInMemory) StoreLog(log *LogEntry) error

StoreLog stores a single log entry

func (*LogsInMemory) StoreLogs

func (in *LogsInMemory) StoreLogs(logs []*LogEntry) error

StoreLogs stores multiple log entries

type MembershipChange

type MembershipChange uint32

MembershipChange holds state related to membership management of the raft server.

const (
	// Add is used to add a voting or read replica node to the configuration.
	// It will not yet participate in any requests as it needs
	// to be promoted later on with Promote
	Add MembershipChange = iota

	// Promote is used when a new node has caught up with the leader log entries.
	// Promoting this node will allow it to be fully part of the cluster.
	Promote

	// Demote is used when a node needs to shut down for maintenance.
	// It will still receive log entries from the leader but won't be part of
	// the quorum or election campaign. If I'm the current node, I will step down
	Demote

	// Remove is used to remove a node from the cluster after being demoted.
	Remove

	// ForceRemove is used to force the removal of a node from the cluster.
	// It must be used with caution as it could break the cluster
	ForceRemove

	// LeaveOnTerminate is a command that allows the current node to completely remove itself
	// from the cluster before shutting down by sending a LeaveOnTerminate command to the leader.
	// It's usually used by read replica nodes.
	LeaveOnTerminate
)

func (MembershipChange) String

func (s MembershipChange) String() string

String returns a human readable membership state of the raft server
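
The lifecycle implied by these constants (Add, then Promote, and later Demote before Remove) can be sketched as a small state machine. This is a deliberately simplified model with a hypothetical apply function and member type. It ignores quorum checks, replication and timeouts, and only illustrates the rule behind ErrMembershipChangeNodeNotDemoted.

```go
package main

import (
	"errors"
	"fmt"
)

type MembershipChange uint32

const (
	Add MembershipChange = iota
	Promote
	Demote
	Remove
	ForceRemove
	LeaveOnTerminate
)

// member keeps the two documented Peer flags relevant to membership changes.
type member struct {
	WaitToBePromoted bool
	Decommissioning  bool
}

var (
	errNotDemoted    = errors.New("node must be demoted before being removed")
	errUnknownMember = errors.New("unknown member") // hypothetical error for this sketch
)

// apply (hypothetical) applies one membership change to an in-memory member set.
func apply(members map[string]*member, id string, change MembershipChange) error {
	m, known := members[id]
	if change == Add {
		// A freshly added node waits to be promoted.
		members[id] = &member{WaitToBePromoted: true}
		return nil
	}
	if !known {
		return errUnknownMember
	}
	switch change {
	case Promote:
		m.WaitToBePromoted = false
	case Demote:
		m.Decommissioning = true
	case Remove:
		if !m.Decommissioning {
			return errNotDemoted
		}
		delete(members, id)
	case ForceRemove, LeaveOnTerminate:
		delete(members, id)
	}
	return nil
}

func main() {
	members := map[string]*member{}
	fmt.Println(apply(members, "node4", Add))
	fmt.Println(apply(members, "node4", Remove)) // rejected: not demoted yet
	fmt.Println(apply(members, "node4", Demote))
	fmt.Println(apply(members, "node4", Remove))
}
```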

type Options

type Options struct {

	// Logger exposes zerolog so it can be overridden
	Logger *zerolog.Logger

	// ElectionTimeout is used to start a new election campaign.
	// Its value will be divided by 2 in order to randomize the election timing process.
	// It must be greater than or equal to HeartbeatTimeout.
	// Unit is in milliseconds
	ElectionTimeout int

	// HeartbeatTimeout is how long a follower waits without contact from the leader
	// before starting a new election campaign.
	// Unit is in milliseconds
	HeartbeatTimeout int

	// TimeMultiplier is a scaling factor that will be used during election timeout
	// by electionTimeoutMin/electionTimeoutMax/leaderHeartBeatTimeout in order to avoid cluster instability
	// The default value is 1 and the maximum is 10
	TimeMultiplier uint

	// ForceStopTimeout is the timeout after which the grpc server will be forced to stop.
	// Defaults to 60s
	ForceStopTimeout time.Duration

	// MinimumClusterSize is the minimum cluster size required before starting a prevote or election campaign.
	// Default is 3.
	// All members of the cluster will be contacted before any other tasks
	MinimumClusterSize uint64

	// MaxAppendEntries holds how many append entries the leader will send to a follower at once
	MaxAppendEntries uint64

	// DataDir is the default data directory that will be used to store all data on the disk. It's required
	DataDir string

	// IsVoter states whether this peer is a voting member node.
	// When set to false, this node won't participate in any election campaign
	IsVoter bool

	// InitialPeers holds the list of the peers
	InitialPeers []InitialPeer

	// PrevoteDisabled is a boolean that allows us to directly start
	// the vote election without the pre vote step
	PrevoteDisabled bool

	// ShutdownOnRemove is a boolean that allows the current node to shut down
	// when a Remove command has been sent during a membership change request
	ShutdownOnRemove bool

	// LeaveOnTerminate is a boolean that allows the current node to completely remove itself
	// from the cluster before shutting down by sending a LeaveOnTerminate command to the leader.
	// It's usually used by read replica nodes.
	LeaveOnTerminate bool

	// IsSingleServerCluster indicates that it's a single server cluster.
	// This is useful for development for example
	IsSingleServerCluster bool

	// BootstrapCluster indicates whether the cluster must be bootstrapped before
	// proceeding to normal cluster operations
	BootstrapCluster bool

	// SnapshotInterval is the interval at which a snapshot will be taken.
	// It will be randomized with this minimum value in order to prevent
	// all nodes from taking a snapshot at the same time.
	// Defaults to 1h
	SnapshotInterval time.Duration

	// SnapshotThreshold is the threshold that must be reached between LastIncludedIndex and new logs before taking a snapshot.
	// It prevents taking snapshots too frequently.
	// Defaults to 64
	SnapshotThreshold uint64

	// MetricsNamespacePrefix is the namespace to use for all rafty metrics.
	// When set, the full metric name will be `<MetricsNamespacePrefix>_rafty_<metric_name>`.
	// Otherwise it will be `rafty_<metric_name>`.
	MetricsNamespacePrefix string
	// contains filtered or unexported fields
}

Options holds config that will be modified by users
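
Two of the rules stated in the field comments are easy to express in code: the metric naming scheme for MetricsNamespacePrefix and the bounds on TimeMultiplier. The helpers below are hypothetical. In particular, clamping an out-of-range multiplier to 10 is an assumption, as the documentation only says that the default is 1 and the maximum is 10. The metric name used in main is made up.

```go
package main

import "fmt"

// metricName builds the full metric name following the documented scheme:
// "<prefix>_rafty_<name>" when a prefix is set, "rafty_<name>" otherwise.
func metricName(prefix, name string) string {
	if prefix == "" {
		return "rafty_" + name
	}
	return prefix + "_rafty_" + name
}

// clampTimeMultiplier returns 1 for an unset value and caps values above 10.
// How the package handles out-of-range values is not documented, so the
// clamping is an assumption of this sketch.
func clampTimeMultiplier(m uint) uint {
	switch {
	case m == 0:
		return 1
	case m > 10:
		return 10
	default:
		return m
	}
}

func main() {
	// "example_total" is a made-up metric name used for illustration only.
	fmt.Println(metricName("", "example_total"))
	fmt.Println(metricName("myapp", "example_total"))
	fmt.Println(clampTimeMultiplier(0), clampTimeMultiplier(4), clampTimeMultiplier(25))
}
```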

type Peer

type Peer struct {
	// Address is the address of a peer node, must be just the ip or ip:port
	Address string `json:"address"`

	// ID of the current peer
	ID string `json:"id"`

	// IsVoter states whether this peer is a voting member node.
	// When set to false, this node won't participate in any election campaign
	IsVoter bool `json:"isVoter"`

	// WaitToBePromoted is a boolean that, when set to true, indicates the node
	// must be promoted before it can fully participate in raft operations.
	// It's used when using AddVoter or AddNonVoter
	WaitToBePromoted bool `json:"waitToBePromoted"`

	// Decommissioning is a boolean that, when set to true, allows devops
	// to put this node in maintenance or to later send a membership
	// removal command so that it can be safely removed from the cluster.
	// DON'T confuse it with the WaitToBePromoted flag
	Decommissioning bool `json:"decommissioning"`
	// contains filtered or unexported fields
}

Peer holds configuration needed by a peer node

func DecodePeers

func DecodePeers(data []byte) (result []Peer, err error)

DecodePeers decodes bytes and returns peers

type RPCAskNodeIDRequest

type RPCAskNodeIDRequest struct {
	Id, Address string
	IsVoter     bool
}

RPCAskNodeIDRequest holds the requirements to ask node id

type RPCAskNodeIDResponse

type RPCAskNodeIDResponse struct {
	LeaderID, LeaderAddress, PeerID string
	IsVoter, AskForMembership       bool
}

RPCAskNodeIDResponse holds the response from RPCAskNodeIDRequest

type RPCGetLeaderRequest

type RPCGetLeaderRequest struct {
	PeerID, PeerAddress string
	TotalPeers          int
}

RPCGetLeaderRequest holds the requirements to get the leader

type RPCGetLeaderResponse

type RPCGetLeaderResponse struct {
	LeaderID, LeaderAddress, PeerID string
	TotalPeers                      int
	AskForMembership                bool
}

RPCGetLeaderResponse holds the response from RPCGetLeaderRequest

type RPCPreVoteRequest

type RPCPreVoteRequest struct {
	Id          string
	CurrentTerm uint64
}

RPCPreVoteRequest holds the requirements to send pre vote requests

type RPCPreVoteResponse

type RPCPreVoteResponse struct {
	PeerID                     string
	RequesterTerm, CurrentTerm uint64
	Granted                    bool
}

RPCPreVoteResponse holds the response from RPCPreVoteRequest

type RPCRequest

type RPCRequest struct {
	RPCType      RPCType
	Request      any
	Timeout      time.Duration
	ResponseChan chan<- RPCResponse
}

RPCRequest is used with channels in order to manage rpc requests

type RPCResponse

type RPCResponse struct {
	TargetPeer Peer
	Response   any
	Error      error
}

RPCResponse is used by RPCRequest in order to reply to rpc requests

type RPCTimeoutNowRequest

type RPCTimeoutNowRequest struct{}

RPCTimeoutNowRequest holds the requirements to send timeout now requests for leadership transfer

type RPCTimeoutNowResponse

type RPCTimeoutNowResponse struct {
	Success bool
}

RPCTimeoutNowResponse holds the response from RPCTimeoutNowRequest

type RPCType

type RPCType uint8

RPCType is used to build rpc requests

const (
	// AskNodeID will be used to ask node id
	AskNodeID RPCType = iota

	// GetLeader will be used to ask who is the leader
	GetLeader

	// PreVoteRequest is used during pre vote request
	PreVoteRequest

	// VoteRequest is used during Vote request
	VoteRequest

	// AppendEntriesRequest is used by the leader to replicate logs to followers
	AppendEntriesRequest

	// AppendEntriesReplicationRequest is used during append entries request on follower side
	AppendEntriesReplicationRequest

	// ForwardCommandToLeader is used to forward command to leader
	ForwardCommandToLeader

	// TimeoutNowRequest is used during leadership transfer
	TimeoutNowRequest

	// BootstrapClusterRequest is used to bootstrap the cluster
	BootstrapClusterRequest

	// InstallSnapshotRequest is used to install snapshot in the cluster
	InstallSnapshotRequest
)

type RPCVoteRequest

type RPCVoteRequest struct {
	CandidateId, CandidateAddress          string
	CurrentTerm, LastLogIndex, LastLogTerm uint64
	CandidateForLeadershipTransfer         bool
}

RPCVoteRequest holds the requirements to send vote requests

type RPCVoteResponse

type RPCVoteResponse struct {
	PeerID                     string
	RequesterTerm, CurrentTerm uint64
	Granted                    bool
}

RPCVoteResponse holds the response from RPCVoteRequest

type Rafty

type Rafty struct {

	// Logger exposes zerolog so it can be overridden
	Logger *zerolog.Logger

	// Address is the current address of the raft server
	Address net.TCPAddr

	// State is the current state of the raft server.
	// It can only be Leader, Candidate, Follower or Down
	State
	// contains filtered or unexported fields
}

Rafty is a struct representing the raft requirements

func NewRafty

func NewRafty(address net.TCPAddr, id string, options Options, logStore LogStore, clusterStore ClusterStore, fsm StateMachine, snapshot SnapshotStore) (*Rafty, error)

NewRafty instantiates rafty with the default configuration, the server address and its id

func (*Rafty) AddMember

func (r *Rafty) AddMember(timeout time.Duration, address, id string, isVoter bool) error

AddMember is used by the current node to add the provided member to the cluster. An error will be returned if any

func (*Rafty) AskForMembership

func (r *Rafty) AskForMembership() bool

AskForMembership returns a boolean indicating whether the node must ask for membership

func (*Rafty) BootstrapCluster

func (r *Rafty) BootstrapCluster(timeout time.Duration) error

BootstrapCluster is used by the current node to bootstrap the cluster with all initial nodes. This should only be called once and on one node. If already bootstrapped, an error will be returned

func (*Rafty) DemoteMember

func (r *Rafty) DemoteMember(timeout time.Duration, address, id string, isVoter bool) error

DemoteMember is used by the current node to demote the provided member of the cluster. An error will be returned if any

func (*Rafty) ForceRemoveMember

func (r *Rafty) ForceRemoveMember(timeout time.Duration, address, id string, isVoter bool) error

ForceRemoveMember is used by the current node to force the removal of the provided member from the cluster. An error will be returned if any

func (*Rafty) IsBootstrapped

func (r *Rafty) IsBootstrapped() bool

IsBootstrapped returns true if the cluster has been bootstrapped. It only applies when options.BootstrapCluster is set to true

func (*Rafty) IsLeader

func (r *Rafty) IsLeader() bool

IsLeader returns true if the current node is the leader

func (*Rafty) IsRunning

func (r *Rafty) IsRunning() bool

IsRunning returns a boolean indicating whether the node is running and ready to perform its duty

func (*Rafty) Leader

func (r *Rafty) Leader() (bool, string, string)

Leader returns a boolean and two strings describing the current leader

func (*Rafty) LeaveOnTerminateMember

func (r *Rafty) LeaveOnTerminateMember(timeout time.Duration, address, id string, isVoter bool) error

LeaveOnTerminateMember is used by the current node to remove itself from the cluster. An error will be returned if any

func (*Rafty) PromoteMember

func (r *Rafty) PromoteMember(timeout time.Duration, address, id string, isVoter bool) error

PromoteMember is used by the current node to promote the provided member of the cluster. An error will be returned if any

func (*Rafty) RemoveMember

func (r *Rafty) RemoveMember(timeout time.Duration, address, id string, isVoter bool) error

RemoveMember is used by the current node to remove the provided member from the cluster. An error will be returned if any

func (*Rafty) Start

func (r *Rafty) Start() error

Start starts the node with the provided configuration

func (*Rafty) Status

func (r *Rafty) Status() Status

Status returns the current status of the node

func (*Rafty) Stop

func (r *Rafty) Stop()

Stop stops the gRPC server by using a signal

func (*Rafty) SubmitCommand

func (r *Rafty) SubmitCommand(timeout time.Duration, logKind LogKind, command []byte) ([]byte, error)

SubmitCommand allows clients to submit a command to the leader. Forwarded command will be multiplied by 5

type Snapshot

type Snapshot interface {
	io.ReadWriteSeeker
	io.Closer

	// Name returns the snapshot name
	Name() string

	// Metadata will return snapshot metadata
	Metadata() SnapshotMetadata

	// Discard will remove the snapshot currently in progress
	Discard() error
}

Snapshot is the interface implemented by SnapshotManager

type SnapshotConfig

type SnapshotConfig struct {
	// contains filtered or unexported fields
}

SnapshotConfig holds the config that will be used to manipulate snapshots

func NewSnapshot

func NewSnapshot(dataDir string, maxSnapshots int) *SnapshotConfig

NewSnapshot will return the snapshot config that allows us to manage snapshots

func (*SnapshotConfig) List

func (s *SnapshotConfig) List() (l []*SnapshotMetadata)

List will return the list of snapshots

func (*SnapshotConfig) PrepareSnapshotReader

func (s *SnapshotConfig) PrepareSnapshotReader(name string) (Snapshot, io.ReadCloser, error)

PrepareSnapshotReader will return the appropriate config to read the snapshot name

func (*SnapshotConfig) PrepareSnapshotWriter

func (s *SnapshotConfig) PrepareSnapshotWriter(lastIncludedIndex, lastIncludedTerm, lastAppliedConfigIndex, lastAppliedConfigTerm uint64, currentConfig Configuration) (Snapshot, error)

PrepareSnapshotWriter will prepare the requirements with the provided parameters to write a snapshot

type SnapshotManager

type SnapshotManager struct {
	io.ReadWriteSeeker
	// contains filtered or unexported fields
}

SnapshotManager allows us to manage snapshots

func (*SnapshotManager) Close

func (s *SnapshotManager) Close() error

Close will close the snapshot file

func (*SnapshotManager) Discard

func (s *SnapshotManager) Discard() error

Discard will remove the snapshot currently in progress

func (*SnapshotManager) Metadata

func (s *SnapshotManager) Metadata() SnapshotMetadata

Metadata will return snapshot metadata

func (*SnapshotManager) Name

func (s *SnapshotManager) Name() string

Name will return the snapshot name

type SnapshotMetadata

type SnapshotMetadata struct {
	// LastIncludedIndex is the last index included in the snapshot
	LastIncludedIndex uint64 `json:"lastIncludedIndex"`

	// LastIncludedTerm is the term of LastIncludedIndex
	LastIncludedTerm uint64 `json:"lastIncludedTerm"`

	// Configuration holds server members
	Configuration Configuration `json:"configuration"`

	// LastAppliedConfigIndex is the index of the highest log entry configuration applied to the current raft server
	LastAppliedConfigIndex uint64 `json:"lastAppliedConfigIndex"`

	// LastAppliedConfigTerm is the term of the highest log entry configuration applied to the current raft server
	LastAppliedConfigTerm uint64 `json:"lastAppliedConfigTerm"`

	// SnapshotName is the snapshot name
	SnapshotName string `json:"snapshotName"`

	// Size is the snapshot size
	Size int64
	// contains filtered or unexported fields
}

SnapshotMetadata holds the snapshot metadata
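The struct tags above suggest the metadata is meant to be serialised as JSON. As a minimal sketch, assuming plain `encoding/json` encoding, the following shows which keys the tags produce. The `snapshotMetadata` type and the `encodeMetadata` helper are local stand-ins written for illustration, not part of rafty; the `Configuration` field and the unexported fields are left out.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// snapshotMetadata mirrors the exported, tagged fields documented above.
// It is a local stand-in: Configuration and unexported fields are omitted.
type snapshotMetadata struct {
	LastIncludedIndex      uint64 `json:"lastIncludedIndex"`
	LastIncludedTerm       uint64 `json:"lastIncludedTerm"`
	LastAppliedConfigIndex uint64 `json:"lastAppliedConfigIndex"`
	LastAppliedConfigTerm  uint64 `json:"lastAppliedConfigTerm"`
	SnapshotName           string `json:"snapshotName"`
	// Size carries no tag, so encoding/json uses the field name "Size" as key.
	Size int64
}

// encodeMetadata is a hypothetical helper that serialises the metadata as JSON.
func encodeMetadata(m snapshotMetadata) (string, error) {
	b, err := json.Marshal(m)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	out, err := encodeMetadata(snapshotMetadata{
		LastIncludedIndex: 42,
		LastIncludedTerm:  3,
		SnapshotName:      "example",
		Size:              128,
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(out)
}
```

Note that `Size` is the only exported field without a tag, so its key keeps the Go field name while the others are lower camel case.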

type SnapshotStore

type SnapshotStore interface {
	// PrepareSnapshotWriter will prepare the requirements with the provided parameters to write a snapshot
	PrepareSnapshotWriter(lastIncludedIndex, lastIncludedTerm, lastAppliedConfigIndex, lastAppliedConfigTerm uint64, currentConfig Configuration) (Snapshot, error)

	// PrepareSnapshotReader will return the appropriate config to read
	// the snapshot with the provided name
	PrepareSnapshotReader(name string) (Snapshot, io.ReadCloser, error)

	// List will return the list of snapshots
	List() []*SnapshotMetadata
}

SnapshotStore is an interface that allows end users to read or take snapshots
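A common need when working with a snapshot store is to find the most recent snapshot before opening it by name. The sketch below illustrates one way to do that from the result of `List`. It makes no assumption about the order in which `List` returns entries. The `snapshotMetadata`, `lister`, `fakeStore`, and `latestSnapshot` names are hypothetical stand-ins so the example compiles on its own.

```go
package main

import "fmt"

// snapshotMetadata is a local stand-in with only the fields this sketch needs.
type snapshotMetadata struct {
	LastIncludedIndex uint64
	LastIncludedTerm  uint64
	SnapshotName      string
}

// lister is a hypothetical narrow interface shaped like SnapshotStore.List.
type lister interface {
	List() []*snapshotMetadata
}

// latestSnapshot returns the entry with the highest LastIncludedIndex,
// or nil when the store holds no snapshot.
func latestSnapshot(s lister) *snapshotMetadata {
	var latest *snapshotMetadata
	for _, m := range s.List() {
		if m == nil {
			continue
		}
		if latest == nil || m.LastIncludedIndex > latest.LastIncludedIndex {
			latest = m
		}
	}
	return latest
}

// fakeStore is an in-memory lister used only to exercise the helper.
type fakeStore []*snapshotMetadata

func (f fakeStore) List() []*snapshotMetadata { return f }

func main() {
	store := fakeStore{
		{LastIncludedIndex: 10, LastIncludedTerm: 1, SnapshotName: "a"},
		{LastIncludedIndex: 57, LastIncludedTerm: 2, SnapshotName: "b"},
		{LastIncludedIndex: 31, LastIncludedTerm: 2, SnapshotName: "c"},
	}
	if m := latestSnapshot(store); m != nil {
		// The name would then be passed to PrepareSnapshotReader.
		fmt.Println(m.SnapshotName, m.LastIncludedIndex)
	}
}
```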

type State

type State uint32

State represents the current status of the raft server. The state can only be Leader, Candidate, Follower, or Down

const (
	// Down state is a node that has been unreachable for a long period of time
	Down State = iota

	// Follower state is a node that participates in the voting campaign.
	// It's a passive node that issues no requests on its own but simply responds to the leader.
	// This node can become a Precandidate if all requirements are met
	Follower

	// Candidate state is a node that participates in the voting campaign.
	// It can become a Leader
	Candidate

	// Leader state is a node that was previously a Candidate.
	// It received the majority of the votes, including its own, and was elected Leader.
	// It will then handle all client requests.
	// Write requests can only be handled by the leader
	Leader
)

func (State) String

func (s State) String() string

String returns a human-readable state of the raft server
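Because State has a String method, it satisfies `fmt.Stringer` and prints as text in logs and formatted output. The following is only a sketch of the pattern using a local `state` type that mirrors the constants above; the exact labels rafty returns are not shown on this page, so the strings here are assumptions.

```go
package main

import "fmt"

// state is a local stand-in mirroring the documented State constants.
type state uint32

const (
	down state = iota // zero value, as in the documented const block
	follower
	candidate
	leader
)

// String returns a lowercase label for the state.
// The labels are assumptions made for this sketch.
func (s state) String() string {
	switch s {
	case down:
		return "down"
	case follower:
		return "follower"
	case candidate:
		return "candidate"
	case leader:
		return "leader"
	default:
		return "unknown"
	}
}

func main() {
	// fmt picks up the String method automatically for %s and %v.
	fmt.Printf("%s %v\n", follower, leader)
}
```

Since `Down` is declared with `iota` first, an uninitialised State value reads as Down.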

type StateMachine

type StateMachine interface {
	// Snapshot allows the client to take snapshots
	Snapshot(snapshotWriter io.Writer) error

	// Restore allows the client to restore a snapshot
	Restore(snapshotReader io.Reader) error

	// ApplyCommand allows the client to apply a log to the state machine
	ApplyCommand(log *LogEntry) ([]byte, error)
}

StateMachine is an interface allowing clients to interact with the raft cluster
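To make the three methods concrete, here is a minimal sketch of an in-memory key/value state machine with the same method shapes. It is not rafty's implementation. The `logEntry` type and its `Command` field are local stand-ins for `LogEntry`, the `key=value` command format is invented for the example, and JSON is an arbitrary choice of snapshot encoding.

```go
package main

import (
	"bytes"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"sync"
)

// logEntry stands in for rafty's LogEntry; the Command field is an assumption.
type logEntry struct {
	Command []byte
}

// stateMachine mirrors the documented interface, using the local logEntry.
type stateMachine interface {
	Snapshot(snapshotWriter io.Writer) error
	Restore(snapshotReader io.Reader) error
	ApplyCommand(log *logEntry) ([]byte, error)
}

// kvStore is a toy state machine keeping string pairs in memory.
type kvStore struct {
	mu   sync.Mutex
	data map[string]string
}

var _ stateMachine = (*kvStore)(nil) // compile-time interface check

func newKVStore() *kvStore { return &kvStore{data: make(map[string]string)} }

// Snapshot writes the whole map as JSON to snapshotWriter.
func (k *kvStore) Snapshot(snapshotWriter io.Writer) error {
	k.mu.Lock()
	defer k.mu.Unlock()
	return json.NewEncoder(snapshotWriter).Encode(k.data)
}

// Restore replaces the map with the JSON content read from snapshotReader.
func (k *kvStore) Restore(snapshotReader io.Reader) error {
	data := make(map[string]string)
	if err := json.NewDecoder(snapshotReader).Decode(&data); err != nil {
		return err
	}
	k.mu.Lock()
	defer k.mu.Unlock()
	k.data = data
	return nil
}

// ApplyCommand parses a hypothetical "key=value" command and stores the pair.
func (k *kvStore) ApplyCommand(log *logEntry) ([]byte, error) {
	if log == nil {
		return nil, errors.New("nil log entry")
	}
	key, value, ok := bytes.Cut(log.Command, []byte("="))
	if !ok || len(key) == 0 {
		return nil, errors.New("malformed command, want key=value")
	}
	k.mu.Lock()
	defer k.mu.Unlock()
	k.data[string(key)] = string(value)
	return value, nil
}

// roundTrip applies commands, snapshots the store, and restores into a new one.
func roundTrip(commands ...string) (map[string]string, error) {
	src := newKVStore()
	for _, c := range commands {
		if _, err := src.ApplyCommand(&logEntry{Command: []byte(c)}); err != nil {
			return nil, err
		}
	}
	var buf bytes.Buffer
	if err := src.Snapshot(&buf); err != nil {
		return nil, err
	}
	dst := newKVStore()
	if err := dst.Restore(&buf); err != nil {
		return nil, err
	}
	return dst.data, nil
}

func main() {
	got, err := roundTrip("color=blue", "size=large")
	if err != nil {
		panic(err)
	}
	fmt.Println(got["color"], got["size"])
}
```

The mutex is there because a state machine may be snapshotted while commands are still being applied; whether rafty serialises these calls itself is not stated on this page.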

type Status

type Status struct {
	// State is the current state of the node
	State State

	// Leader is the current leader of the cluster, if any
	Leader leaderMap

	// CurrentTerm is the current term of the node
	CurrentTerm uint64

	// CommitIndex is the last committed index of the node
	CommitIndex uint64

	// LastApplied is the last applied index of the node
	LastApplied uint64

	// LastLogIndex is the last log index of the node
	LastLogIndex uint64

	// LastLogTerm is the last log term of the node
	LastLogTerm uint64

	// LastAppliedConfigIndex is the index of the last applied configuration
	LastAppliedConfigIndex uint64

	// LastAppliedConfigTerm is the term of the last applied configuration
	LastAppliedConfigTerm uint64

	// Configuration is the current configuration of the cluster
	Configuration Configuration
}

Status represents the current status of the node
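The fields of Status lend themselves to simple health checks. As an illustration only, the sketch below computes how far the applied index trails the commit index and combines it with the node state. The `status` and `state` types are local stand-ins covering a subset of the documented fields, and `applyLag` and `readyForWrites` are hypothetical helpers expressing one possible policy, not something rafty provides.

```go
package main

import "fmt"

// state mirrors the documented State constants (local stand-in).
type state uint32

const (
	down state = iota
	follower
	candidate
	leader
)

// status mirrors a subset of the documented Status fields (local stand-in).
type status struct {
	State       state
	CommitIndex uint64
	LastApplied uint64
}

// applyLag returns how many committed entries have not been applied yet.
func applyLag(s status) uint64 {
	if s.LastApplied >= s.CommitIndex {
		return 0
	}
	return s.CommitIndex - s.LastApplied
}

// readyForWrites is a hypothetical policy: the node must be the leader
// and must have applied every committed entry.
func readyForWrites(s status) bool {
	return s.State == leader && applyLag(s) == 0
}

func main() {
	s := status{State: leader, CommitIndex: 120, LastApplied: 117}
	fmt.Println(applyLag(s), readyForWrites(s))
}
```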

Directories

Path Synopsis
examples
