Documentation
¶
Index ¶
- Constants
- Variables
- func DecodeUint64ToBytes(value []byte) uint64
- func EncodePeers(data []Peer) (result []byte)
- func EncodeUint64ToBytes(value uint64) []byte
- func MarshalBinary(entry *LogEntry, w io.Writer) error
- func MarshalBinaryWithChecksum(buffer *bytes.Buffer, w io.Writer) error
- type BoltOptions
- type BoltStore
- func (b *BoltStore) Close() error
- func (b *BoltStore) CompactLogs(index uint64) error
- func (b *BoltStore) DiscardLogs(minIndex, maxIndex uint64) error
- func (b *BoltStore) FirstIndex() (uint64, error)
- func (b *BoltStore) Get(key []byte) ([]byte, error)
- func (b *BoltStore) GetLastConfiguration() (*LogEntry, error)
- func (b *BoltStore) GetLogByIndex(index uint64) (*LogEntry, error)
- func (b *BoltStore) GetLogsByRange(minIndex, maxIndex, maxAppendEntries uint64) (response GetLogsByRangeResponse)
- func (b *BoltStore) GetMetadata() ([]byte, error)
- func (b *BoltStore) GetUint64(key []byte) uint64
- func (b *BoltStore) LastIndex() (uint64, error)
- func (b *BoltStore) Set(key, value []byte) error
- func (b *BoltStore) SetUint64(key, value []byte) error
- func (b *BoltStore) StoreLog(log *LogEntry) error
- func (b *BoltStore) StoreLogs(logs []*LogEntry) error
- func (b *BoltStore) StoreMetadata(value []byte) error
- type ClusterStore
- type Configuration
- type GetLogsByRangeResponse
- type InitialPeer
- type LogEntry
- type LogKind
- type LogStore
- type LogsCache
- func (lc *LogsCache) Close() error
- func (lc *LogsCache) CompactLogs(index uint64) error
- func (lc *LogsCache) DiscardLogs(minIndex, maxIndex uint64) error
- func (lc *LogsCache) FirstIndex() (uint64, error)
- func (lc *LogsCache) GetLastConfiguration() (*LogEntry, error)
- func (lc *LogsCache) GetLogByIndex(index uint64) (*LogEntry, error)
- func (lc *LogsCache) GetLogsByRange(minIndex, maxIndex, maxAppendEntries uint64) (response GetLogsByRangeResponse)
- func (lc *LogsCache) LastIndex() (uint64, error)
- func (lc *LogsCache) StoreLog(log *LogEntry) error
- func (lc *LogsCache) StoreLogs(logs []*LogEntry) error
- type LogsCacheOptions
- type LogsInMemory
- func (in *LogsInMemory) Close() error
- func (in *LogsInMemory) CompactLogs(index uint64) error
- func (in *LogsInMemory) DiscardLogs(minIndex, maxIndex uint64) error
- func (in *LogsInMemory) FirstIndex() (uint64, error)
- func (in *LogsInMemory) GetLastConfiguration() (*LogEntry, error)
- func (in *LogsInMemory) GetLogByIndex(index uint64) (*LogEntry, error)
- func (in *LogsInMemory) GetLogsByRange(minIndex, maxIndex, maxAppendEntries uint64) (response GetLogsByRangeResponse)
- func (in *LogsInMemory) LastIndex() (uint64, error)
- func (in *LogsInMemory) StoreLog(log *LogEntry) error
- func (in *LogsInMemory) StoreLogs(logs []*LogEntry) error
- type MembershipChange
- type Options
- type Peer
- type RPCAskNodeIDRequest
- type RPCAskNodeIDResponse
- type RPCGetLeaderRequest
- type RPCGetLeaderResponse
- type RPCPreVoteRequest
- type RPCPreVoteResponse
- type RPCRequest
- type RPCResponse
- type RPCTimeoutNowRequest
- type RPCTimeoutNowResponse
- type RPCType
- type RPCVoteRequest
- type RPCVoteResponse
- type Rafty
- func (r *Rafty) AddMember(timeout time.Duration, address, id string, isVoter bool) error
- func (r *Rafty) AskForMembership() bool
- func (r *Rafty) BootstrapCluster(timeout time.Duration) error
- func (r *Rafty) DemoteMember(timeout time.Duration, address, id string, isVoter bool) error
- func (r *Rafty) ForceRemoveMember(timeout time.Duration, address, id string, isVoter bool) error
- func (r *Rafty) IsBootstrapped() bool
- func (r *Rafty) IsLeader() bool
- func (r *Rafty) IsRunning() bool
- func (r *Rafty) Leader() (bool, string, string)
- func (r *Rafty) LeaveOnTerminateMember(timeout time.Duration, address, id string, isVoter bool) error
- func (r *Rafty) PromoteMember(timeout time.Duration, address, id string, isVoter bool) error
- func (r *Rafty) RemoveMember(timeout time.Duration, address, id string, isVoter bool) error
- func (r *Rafty) Start() error
- func (r *Rafty) Status() Status
- func (r *Rafty) Stop()
- func (r *Rafty) SubmitCommand(timeout time.Duration, logKind LogKind, command []byte) ([]byte, error)
- type Snapshot
- type SnapshotConfig
- type SnapshotManager
- type SnapshotMetadata
- type SnapshotStore
- type State
- type StateMachine
- type Status
Constants ¶
const ( // GRPCAddress defines the default address to run the grpc server GRPCAddress string = "127.0.0.1" // GRPCPort define the default port to run the grpc server GRPCPort uint16 = 50051 )
Variables ¶
var ( // ErrAppendEntriesToLeader is triggered when trying to append entries to a leader ErrAppendEntriesToLeader = errors.New("cannot send append entries to a leader") // ErrTermTooOld is triggered when the term of the peer is older than mine ErrTermTooOld = errors.New("peer term older than mine") // ErrNoLeader is triggered when there is no existing leader ErrNoLeader = errors.New("no leader") // ErrNotLeader is triggered when trying to perform an operation on a non leader ErrNotLeader = errors.New("not leader") // ErrShutdown is triggered when the node is shutting down ErrShutdown = errors.New("node is shutting down") // ErrLogNotFound is triggered when the provided log is not found on the current node ErrLogNotFound = errors.New("log not found") // ErrIndexOutOfRange is triggered when trying to fetch an index that does not exist ErrIndexOutOfRange = errors.New("index out of range") // ErrUnkownRPCType is triggered when trying to perform an rpc call with wrong parameter ErrUnkownRPCType = errors.New("unknown rpcType") // ErrTimeout is triggered when an operation timed out ErrTimeout = errors.New("operation timeout") // ErrUnkown is triggered when trying to perform an operation that does not exist ErrUnkown = errors.New("unknown error") // ErrLeadershipTransferInProgress is triggered when a node receive an append entry request // or a vote request ErrLeadershipTransferInProgress = errors.New("leadership transfer in progress") // ErrMembershipChangeNodeTooSlow is triggered by a timeout when a node is to slow to catch leader logs ErrMembershipChangeNodeTooSlow = errors.New("new node is too slow catching up leader logs") // ErrMembershipChangeInProgress is triggered by the leader when it already has a membership in progress ErrMembershipChangeInProgress = errors.New("membership change in progress") // ErrMembershipChangeNodeNotDemoted is triggered by the leader when remove a node without // demoting it first ErrMembershipChangeNodeNotDemoted = errors.New("node must 
be demoted before being removed") // ErrMembershipChangeNodeDemotionForbidden is triggered by the leader when trying to demote a node can // break cluster quorum ErrMembershipChangeNodeDemotionForbidden = errors.New("node cannot be demoted, breaking cluster") // ErrClusterNotBootstrapped is triggered by the leader when the bootstrap options is set // and a client is submitting a command ErrClusterNotBootstrapped = errors.New("cluster not bootstrapped") // ErrClusterAlreadyBootstrapped is triggered when trying to bootstrap the cluster again ErrClusterAlreadyBootstrapped = errors.New("cluster already bootstrapped") // ErrChecksumDataTooShort is triggered while decoding data with checksum ErrChecksumDataTooShort = errors.New("data to short for checksum") // ErrChecksumMistmatch is triggered while decoding data. It generally happen when // a file is corrupted ErrChecksumMistmatch = errors.New("CRC32 checksum mistmatch") // ErrKeyNotFound is triggered when trying to fetch a key that does not exist ErrKeyNotFound = errors.New("key not found") // ErrStoreClosed is triggered when trying to perform an operation // while the store is closed ErrStoreClosed = errors.New("store closed") // ErrDataDirRequired is triggered when data dir is missing ErrDataDirRequired = errors.New("data dir cannot be empty") // ErrNoSnapshotToTake is triggered when there is no snapshot to take ErrNoSnapshotToTake = errors.New("no snapshot to take") // ErrLogCommandNotAllowed is triggered when submitting a command that is not allowed ErrLogCommandNotAllowed = errors.New("log command not allowed") // ErrClient is triggered when failed to built grpc client ErrClient = errors.New("fail to get grpc client") )
Functions ¶
func DecodeUint64ToBytes ¶
DecodeUint64ToBytes permits to decode bytes to uint64
func EncodePeers ¶
EncodePeers permits to encode peers and return bytes
func EncodeUint64ToBytes ¶
EncodeUint64ToBytes permits to encode uint64 to bytes
func MarshalBinary ¶
MarshalBinary permits encoding data in binary format
Types ¶
type BoltOptions ¶
type BoltOptions struct {
// DataDir is the default data directory that will be used to store all data on the disk. It's required
DataDir string
// Options holds all bolt options
Options *bolt.Options
}
BoltOptions holds all requirements related to boltdb
type BoltStore ¶
type BoltStore struct {
// contains filtered or unexported fields
}
func NewBoltStorage ¶
func NewBoltStorage(options BoltOptions) (*BoltStore, error)
NewBoltStorage allows us to configure bolt storage with the provided options
func (*BoltStore) CompactLogs ¶
CompactLogs permits to wipe all entries lower than the provided index
func (*BoltStore) DiscardLogs ¶
DiscardLogs permits to wipe entries with the provided range indexes
func (*BoltStore) FirstIndex ¶
FirstIndex will return the first index from the raft log
func (*BoltStore) Get ¶
Get will fetch provided key from the k/v store. An error will be returned if the key is not found
func (*BoltStore) GetLastConfiguration ¶
GetLastConfiguration returns the last configuration found in logs
func (*BoltStore) GetLogByIndex ¶
GetLogByIndex permits to retrieve log from specified index
func (*BoltStore) GetLogsByRange ¶
func (b *BoltStore) GetLogsByRange(minIndex, maxIndex, maxAppendEntries uint64) (response GetLogsByRangeResponse)
GetLogsByRange will return a slice of logs with peer lastLogIndex and leader lastLogIndex capped by options.MaxAppendEntries
func (*BoltStore) GetMetadata ¶
GetMetadata will fetch rafty metadata from the k/v store
func (*BoltStore) GetUint64 ¶
GetUint64 will fetch the provided key from the k/v store and return its value as a uint64. Its signature has no error result, so a missing key is not reported as an error
func (*BoltStore) Set ¶
Set will add key/value to the k/v store. An error will be returned if necessary
func (*BoltStore) SetUint64 ¶
SetUint64 will add key/value to the k/v store. An error will be returned if necessary
func (*BoltStore) StoreMetadata ¶
StoreMetadata will store rafty metadata into the k/v bucket
type ClusterStore ¶
type ClusterStore interface {
// Close permits to close the store
Close() error
// GetMetadata will fetch rafty metadata from the k/v store
GetMetadata() ([]byte, error)
// StoreMetadata will store rafty metadata into the k/v bucket.
// This won't be replicated
StoreMetadata([]byte) error
// Set will add key/value to the k/v store
Set(key, value []byte) error
// Get will fetch provided key from the k/v store
Get(key []byte) ([]byte, error)
// SetUint64 will add key/value to the k/v store
SetUint64(key, value []byte) error
// GetUint64 will fetch provided key from the k/v store
GetUint64(key []byte) uint64
}
ClusterStore is an interface that allows us to store and retrieve data only for cluster related purpose
type Configuration ¶
type Configuration struct {
// ServerMembers holds all current members of the cluster
ServerMembers []Peer `json:"serverMembers"`
}
Configuration holds configuration related to current server
type GetLogsByRangeResponse ¶
type GetLogsByRangeResponse struct {
Logs []*LogEntry
Total, LastLogIndex, LastLogTerm uint64
SendSnapshot bool
Err error
}
GetLogsByRangeResponse returns response from GetLogsByRange func
type InitialPeer ¶
type InitialPeer struct {
// Address is the address of a peer node, must be just the ip or ip:port
Address string
// contains filtered or unexported fields
}
InitialPeer holds address of the peer
type LogEntry ¶
type LogEntry struct {
FileFormat uint32
Tombstone uint32
LogType uint32
Timestamp uint32
Term uint64
Index uint64
Command []byte
}
LogEntry holds the requirements that will be used to store logs on disk
func UnmarshalBinary ¶
UnmarshalBinary permits decoding data in binary format
func UnmarshalBinaryWithChecksum ¶
UnmarshalBinaryWithChecksum permits decoding data in binary format, validating its checksum before moving further
type LogKind ¶
type LogKind uint8
LogKind represents the kind of the log
const ( // logNoop is a log type used only by the leader // to keep the log index and term in sync with followers // when stepping up as leader LogNoop LogKind = iota // logConfiguration is a log type used between nodes // when configuration need to change LogConfiguration // logReplication is a log type used by clients to append log entries // on all nodes LogReplication // LogCommandReadLeaderLease is a log type use by clients to fetch data from the leader. // This mode is susceptible to time drift or long GC pause. // Use this method only if you don't mind to potentially have stale data LogCommandReadLeaderLease // LogCommandLinearizableRead is a command that guarantees read data validity. // No stale read can happen in this mode. LogCommandLinearizableRead )
type LogStore ¶
type LogStore interface {
// Close permits to close the store
Close() error
// StoreLogs stores multiple log entries
StoreLogs(logs []*LogEntry) error
// StoreLog stores a single log entry
StoreLog(log *LogEntry) error
// GetLogByIndex permits to retrieve log from specified index
GetLogByIndex(index uint64) (*LogEntry, error)
// GetLogsByRange will return a slice of logs
// with min and max index capped by options.MaxAppendEntries
GetLogsByRange(minLogIndex, maxLogIndex, maxAppendEntries uint64) GetLogsByRangeResponse
// DiscardLogs permits to wipe entries with the provided range indexes
DiscardLogs(from, to uint64) error
// CompactLogs permits to wipe all entries lower than the provided index
CompactLogs(index uint64) error
// FirstIndex will return the first index from the raft log
FirstIndex() (uint64, error)
// LastIndex will return the last index from the raft log
LastIndex() (uint64, error)
// GetLastConfiguration returns the last configuration found
// in logs
GetLastConfiguration() (*LogEntry, error)
}
LogStore is an interface that allows us to store and retrieve raft logs from memory
type LogsCache ¶
type LogsCache struct {
// contains filtered or unexported fields
}
LogsCache holds the requirements related to caching rafty data
func NewLogsCache ¶
func NewLogsCache(options LogsCacheOptions) *LogsCache
NewLogsCache allows us to configure the cache with the provided store option
func (*LogsCache) CompactLogs ¶
CompactLogs permits to wipe all entries lower than the provided index
func (*LogsCache) DiscardLogs ¶
DiscardLogs removes the keys from the cache and from long term storage
func (*LogsCache) FirstIndex ¶
FirstIndex returns data from the cache when it exists; otherwise data is fetched from long term storage
func (*LogsCache) GetLastConfiguration ¶
GetLastConfiguration returns data from the cache when it exists; otherwise data is fetched from long term storage
func (*LogsCache) GetLogByIndex ¶
GetLogByIndex returns data from the cache when it exists; otherwise data is fetched from long term storage
func (*LogsCache) GetLogsByRange ¶
func (lc *LogsCache) GetLogsByRange(minIndex, maxIndex, maxAppendEntries uint64) (response GetLogsByRangeResponse)
GetLogsByRange returns data from the cache when it exists; otherwise data is fetched from long term storage
func (*LogsCache) LastIndex ¶
LastIndex returns data from the cache when it exists; otherwise data is fetched from long term storage
type LogsCacheOptions ¶
type LogsCacheOptions struct {
// LogStore holds the long term storage data related to raft logs
LogStore LogStore
// CacheOnWrite when set to true will put every write
// request in cache before writing to the long term storage
CacheOnWrite bool
// TTL is the maximum amount of time to keep the entry in cache.
// Default to 30 seconds if ttl == 0
TTL time.Duration
}
LogsCacheOptions holds all cache options that will be later used by LogsCache
type LogsInMemory ¶
type LogsInMemory struct {
// contains filtered or unexported fields
}
LogsInMemory holds the requirements related to in memory rafty data
func NewLogsInMemory ¶
func NewLogsInMemory() *LogsInMemory
func (*LogsInMemory) CompactLogs ¶
func (in *LogsInMemory) CompactLogs(index uint64) error
CompactLogs permits to wipe all entries lower than the provided index
func (*LogsInMemory) DiscardLogs ¶
func (in *LogsInMemory) DiscardLogs(minIndex, maxIndex uint64) error
DiscardLogs permits to wipe entries with the provided range indexes
func (*LogsInMemory) FirstIndex ¶
func (in *LogsInMemory) FirstIndex() (uint64, error)
FirstIndex returns the first index from memory
func (*LogsInMemory) GetLastConfiguration ¶
func (in *LogsInMemory) GetLastConfiguration() (*LogEntry, error)
GetLastConfiguration returns the last configuration found in logs
func (*LogsInMemory) GetLogByIndex ¶
func (in *LogsInMemory) GetLogByIndex(index uint64) (*LogEntry, error)
GetLogByIndex permits to retrieve log from specified index
func (*LogsInMemory) GetLogsByRange ¶
func (in *LogsInMemory) GetLogsByRange(minIndex, maxIndex, maxAppendEntries uint64) (response GetLogsByRangeResponse)
GetLogsByRange will return a slice of logs with peer lastLogIndex and leader lastLogIndex capped by options.MaxAppendEntries
func (*LogsInMemory) LastIndex ¶
func (in *LogsInMemory) LastIndex() (uint64, error)
LastIndex returns the last index from memory
func (*LogsInMemory) StoreLog ¶
func (in *LogsInMemory) StoreLog(log *LogEntry) error
StoreLog stores a single log entry
func (*LogsInMemory) StoreLogs ¶
func (in *LogsInMemory) StoreLogs(logs []*LogEntry) error
StoreLogs stores multiple log entries
type MembershipChange ¶
type MembershipChange uint32
MembershipChange holds state related to membership management of the raft server.
const ( // Add is use to add a voting or read replica node in the configuration. // It will not yet participate in any requests as it need // later on to be promoted with Promote Add MembershipChange = iota // Promote is used when a new node caught up leader log entries. // Promoting this node will allow it to be fully part of the cluster. Promote // Demote is used when a node needs to shut down for maintenance. // It will still received log entries from the leader but won't be part of // the quorum or election campaign. If I'm the current node, I will step down Demote // Remove is use to remove node in the cluster after beeing demoted. Remove // ForceRemove is use to force remove node in the cluster. // Using this must be used with caution as it could break the cluster ForceRemove // LeaveOnTerminate is a boolean that allow the current node to completely remove itself // from the cluster before shutting down by sending a LeaveOnTerminate command to the leader. // It's usually used by read replicas nodes. LeaveOnTerminate )
func (MembershipChange) String ¶
func (s MembershipChange) String() string
String returns a human readable membership state of the raft server
type Options ¶
type Options struct {
// Logger expose zerolog so it can be override
Logger *zerolog.Logger
// ElectionTimeout is used to start new election campaign.
// Its value will be divided by 2 in order to randomize election timing process.
// It must be greater or equal to HeartbeatTimeout.
// Unit is in milliseconds
ElectionTimeout int
// HeartbeatTimeout is used by follower without contact from the leader
// before starting new election campaign.
// Unit is in milliseconds
HeartbeatTimeout int
// TimeMultiplier is a scaling factor that will be used during election timeout
// by electionTimeoutMin/electionTimeoutMax/leaderHeartBeatTimeout in order to avoid cluster instability
// The default value is 1 and the maximum is 10
TimeMultiplier uint
// ForceStopTimeout is the timeout after which grpc server will be forced to stop.
// Default to 60s
ForceStopTimeout time.Duration
// MinimumClusterSize is the minimum size to have before starting prevote or election campaign
// default is 3
// all members of the cluster will be contacted before any other tasks
MinimumClusterSize uint64
// MaxAppendEntries holds how many append entries the leader will send to the follower at once
MaxAppendEntries uint64
// DataDir is the default data directory that will be used to store all data on the disk. It's required
DataDir string
// IsVoter states whether this peer is a voting member node.
// When set to false, this node won't participate into any election campaign
IsVoter bool
// InitialPeers holds the list of the peers
InitialPeers []InitialPeer
// PrevoteDisabled is a boolean that allows us to directly start
// vote election without pre vote step
PrevoteDisabled bool
// ShutdownOnRemove is a boolean that allows the current node to shut down
// when a Remove command has been sent during membership change request
ShutdownOnRemove bool
// LeaveOnTerminate is a boolean that allow the current node to completely remove itself
// from the cluster before shutting down by sending a LeaveOnTerminate command to the leader.
// It's usually used by read replicas nodes.
LeaveOnTerminate bool
// IsSingleServerCluster indicate that it's a single server cluster.
// This is useful for development for example
IsSingleServerCluster bool
// BootstrapCluster indicate if the cluster must be bootstrapped before
// proceeding to normal cluster operations
BootstrapCluster bool
// SnapshotInterval is the interval at which a snapshot will be taken.
// It will be randomize with this minimum value in order to prevent
// all nodes to take a snapshot at the same time
// Default to 1h
SnapshotInterval time.Duration
// SnapshotThreshold is the threshold that must be reached between LastIncludedIndex and new logs before taking a snapshot.
// It prevents taking snapshots too frequently.
// Default to 64
SnapshotThreshold uint64
// MetricsNamespacePrefix is the namespace to use for all rafty metrics.
// When set, the full metric name will be `<MetricsNamespacePrefix>_rafty_<metric_name>`.
// Otherwise it will be `rafty_<metric_name>`.
MetricsNamespacePrefix string
// contains filtered or unexported fields
}
Options holds config that will be modified by users
type Peer ¶
type Peer struct {
// Address is the address of a peer node, must be just the ip or ip:port
Address string `json:"address"`
// ID of the current peer
ID string `json:"id"`
// IsVoter states whether this peer is a voting member node.
// When set to false, this node won't participate into any election campaign
IsVoter bool `json:"isVoter"`
// WaitToBePromoted is a boolean when set to true make sure the node
// can fully participate in raft operations.
// It's used when using AddVoter or AddNonVoter
WaitToBePromoted bool `json:"waitToBePromoted"`
// Decommissioning is a boolean when set to true will allow devops
// to put this node on maintenance or to lately send a membership
// removal command to be safely be removed from the cluster.
// DON'T confuse it with WaitToBePromoted flag
Decommissioning bool `json:"decommissioning"`
// contains filtered or unexported fields
}
Peer holds configuration needed by a peer node
func DecodePeers ¶
DecodePeers permits to decode peers previously encoded as bytes
type RPCAskNodeIDRequest ¶
RPCAskNodeIDRequest holds the requirements to ask node id
type RPCAskNodeIDResponse ¶
type RPCAskNodeIDResponse struct {
LeaderID, LeaderAddress, PeerID string
IsVoter, AskForMembership bool
}
RPCAskNodeIDResponse holds the response from RPCAskNodeIDRequest
type RPCGetLeaderRequest ¶
RPCGetLeaderRequest holds the requirements to get the leader
type RPCGetLeaderResponse ¶
type RPCGetLeaderResponse struct {
LeaderID, LeaderAddress, PeerID string
TotalPeers int
AskForMembership bool
}
RPCGetLeaderResponse holds the response from RPCGetLeaderRequest
type RPCPreVoteRequest ¶
RPCPreVoteRequest holds the requirements to send pre vote requests
type RPCPreVoteResponse ¶
RPCPreVoteResponse holds the response from RPCPreVoteRequest
type RPCRequest ¶
type RPCRequest struct {
RPCType RPCType
Request any
Timeout time.Duration
ResponseChan chan<- RPCResponse
}
RPCRequest is used by chans in order to manage rpc requests
type RPCResponse ¶
RPCResponse is used by RPCRequest in order to reply to rpc requests
type RPCTimeoutNowRequest ¶
type RPCTimeoutNowRequest struct{}
RPCTimeoutNowRequest holds the requirements to send timeout now requests for leadership transfer
type RPCTimeoutNowResponse ¶
type RPCTimeoutNowResponse struct {
Success bool
}
RPCTimeoutNowResponse holds the response from RPCTimeoutNowRequest
type RPCType ¶
type RPCType uint8
RPCType is used to build rpc requests
const ( // AskNodeID will be used to ask node id AskNodeID RPCType = iota // GetLeader will be used to ask who is the leader GetLeader // PreVoteRequest is used during pre vote request PreVoteRequest // VoteRequest is used during Vote request VoteRequest // AppendEntriesRequest is used by the leader to replicate logs to followers AppendEntriesRequest // AppendEntriesReplicationRequest is used during append entries request on follower side AppendEntriesReplicationRequest // ForwardCommandToLeader is used to forward command to leader ForwardCommandToLeader // TimeoutNow is used during leadership transfer TimeoutNowRequest // BootstrapClusterRequest is used to bootstrap the cluster BootstrapClusterRequest // InstallSnapshotRequest is used to install snapshot in the cluster InstallSnapshotRequest )
type RPCVoteRequest ¶
type RPCVoteRequest struct {
CandidateId, CandidateAddress string
CurrentTerm, LastLogIndex, LastLogTerm uint64
CandidateForLeadershipTransfer bool
}
RPCVoteRequest holds the requirements to send vote requests
type RPCVoteResponse ¶
RPCVoteResponse holds the response from RPCVoteRequest
type Rafty ¶
type Rafty struct {
// Logger expose zerolog so it can be override
Logger *zerolog.Logger
// Address is the current address of the raft server
Address net.TCPAddr
// State of the current raft server state
// Can only be Leader, Candidate, Follower, Down
State
// contains filtered or unexported fields
}
Rafty is a struct representing the raft requirements
func NewRafty ¶
func NewRafty(address net.TCPAddr, id string, options Options, logStore LogStore, clusterStore ClusterStore, fsm StateMachine, snapshot SnapshotStore) (*Rafty, error)
NewRafty instantiates rafty with default configuration with server address and its id
func (*Rafty) AddMember ¶
AddMember is used by the current node to add the provided member to the cluster. An error will be returned if any
func (*Rafty) AskForMembership ¶
AskForMembership returns a boolean indicating whether the node must ask for membership
func (*Rafty) BootstrapCluster ¶
BootstrapCluster is used by the current node to bootstrap the cluster with all initial nodes. This should only be called once and on one node. If already bootstrapped, an error will be returned
func (*Rafty) DemoteMember ¶
DemoteMember is used by the current node to demote the provided member in the cluster. An error will be returned if any
func (*Rafty) ForceRemoveMember ¶
ForceRemoveMember is used by the current node to force remove the provided member from the cluster. An error will be returned if any
func (*Rafty) IsBootstrapped ¶
IsBootstrapped returns true if the cluster has been bootstrapped. It only applies when options.BootstrapCluster is set to true
func (*Rafty) IsRunning ¶
IsRunning returns a boolean indicating whether the node is running and ready to perform its duty
func (*Rafty) LeaveOnTerminateMember ¶
func (r *Rafty) LeaveOnTerminateMember(timeout time.Duration, address, id string, isVoter bool) error
LeaveOnTerminateMember is used by the current node to remove itself from the cluster. An error will be returned if any
func (*Rafty) PromoteMember ¶
PromoteMember is used by the current node to promote the provided member in the cluster. An error will be returned if any
func (*Rafty) RemoveMember ¶
RemoveMember is used by the current node to remove the provided member from the cluster. An error will be returned if any
type Snapshot ¶
type Snapshot interface {
io.ReadWriteSeeker
io.Closer
// Name return the snapshot name
Name() string
// Metadata will return snapshot metadata
Metadata() SnapshotMetadata
// Discard will remove the snapshot actually in progress
Discard() error
}
Snapshot is the interface that SnapshotManager implements
type SnapshotConfig ¶
type SnapshotConfig struct {
// contains filtered or unexported fields
}
SnapshotConfig holds the config that will be used to manipulate snapshots
func NewSnapshot ¶
func NewSnapshot(dataDir string, maxSnapshots int) *SnapshotConfig
NewSnapshot will return the snapshot config that allows us to manage snapshots
func (*SnapshotConfig) List ¶
func (s *SnapshotConfig) List() (l []*SnapshotMetadata)
List will return the list of snapshots
func (*SnapshotConfig) PrepareSnapshotReader ¶
func (s *SnapshotConfig) PrepareSnapshotReader(name string) (Snapshot, io.ReadCloser, error)
PrepareSnapshotReader will return the appropriate config to read the snapshot name
func (*SnapshotConfig) PrepareSnapshotWriter ¶
func (s *SnapshotConfig) PrepareSnapshotWriter(lastIncludedIndex, lastIncludedTerm, lastAppliedConfigIndex, lastAppliedConfigTerm uint64, currentConfig Configuration) (Snapshot, error)
PrepareSnapshotWriter will prepare the requirements with the provided parameters to write a snapshot
type SnapshotManager ¶
type SnapshotManager struct {
io.ReadWriteSeeker
// contains filtered or unexported fields
}
SnapshotManager allows us to manage snapshots
func (*SnapshotManager) Close ¶
func (s *SnapshotManager) Close() error
Close will close the snapshot file
func (*SnapshotManager) Discard ¶
func (s *SnapshotManager) Discard() error
Discard will remove the snapshot actually in progress
func (*SnapshotManager) Metadata ¶
func (s *SnapshotManager) Metadata() SnapshotMetadata
Metadata will return snapshot metadata
func (*SnapshotManager) Name ¶
func (s *SnapshotManager) Name() string
Name will return the snapshot name
type SnapshotMetadata ¶
type SnapshotMetadata struct {
// LastIncludedIndex is the last index included in the snapshot
LastIncludedIndex uint64 `json:"lastIncludedIndex"`
// LastIncludedTerm is the term of LastIncludedIndex
LastIncludedTerm uint64 `json:"lastIncludedTerm"`
// Configuration holds server members
Configuration Configuration `json:"configuration"`
// LastAppliedConfigIndex is the index of the highest log entry configuration applied to the current raft server
LastAppliedConfigIndex uint64 `json:"lastAppliedConfigIndex"`
// LastAppliedConfigTerm is the term of the highest log entry configuration applied to the current raft server
LastAppliedConfigTerm uint64 `json:"lastAppliedConfigTerm"`
// SnapshotName is the snapshot name
SnapshotName string `json:"snapshotName"`
// Size is the snapshot size
Size int64
// contains filtered or unexported fields
}
SnapshotMetadata holds the snapshot metadata
type SnapshotStore ¶
type SnapshotStore interface {
// PrepareSnapshotWriter will prepare the requirements with the provided parameters to write a snapshot
PrepareSnapshotWriter(lastIncludedIndex, lastIncludedTerm, lastAppliedConfigIndex, lastAppliedConfigTerm uint64, currentConfig Configuration) (Snapshot, error)
// PrepareSnapshotReader will return the appropriate config to read
// the snapshot name
PrepareSnapshotReader(name string) (Snapshot, io.ReadCloser, error)
// List will return the list of snapshots
List() []*SnapshotMetadata
}
SnapshotStore is an interface that allows end users to read or take snapshots
type State ¶
type State uint32
State represents the current status of the raft server. The state can only be Leader, Candidate, Follower, Down
const ( // Down state is a node that has been unreachable for a long period of time Down State = iota // Follower state is a node that participate into the voting campaign. // It's a passive node that issue no requests on his own but simply respond from the leader. // This node can become a Precandidate if all requirements are available Follower // Candidate state is a node that participate into the voting campaign. // It can become a Leader Candidate // Leader state is a node that was previously a Candidate. // It received the majority of the votes including itself and get elected as the Leader. // It will then handle all client requests. // Writes requests can only be done on the leader Leader )
type StateMachine ¶
type StateMachine interface {
// Snapshot allow the client to take snapshots
Snapshot(snapshotWriter io.Writer) error
// Restore allow the client to restore a snapshot
Restore(snapshotReader io.Reader) error
// ApplyCommand allow the client to apply a log to the state machine
ApplyCommand(log *LogEntry) ([]byte, error)
}
StateMachine is an interface allowing clients to interact with the raft cluster
type Status ¶
type Status struct {
// State is the current state of the node
State State
// Leader is the current leader of the cluster, if any
Leader leaderMap
// CurrentTerm is the current term of the node
CurrentTerm uint64
// CommitIndex is the last committed index of the node
CommitIndex uint64
// LastApplied is the last applied index of the node
LastApplied uint64
// LastLogIndex is the last log index of the node
LastLogIndex uint64
// LastLogTerm is the last log term of the node
LastLogTerm uint64
// LastAppliedConfigIndex is the index of the last applied configuration
LastAppliedConfigIndex uint64
// LastAppliedConfigTerm is the term of the last applied configuration
LastAppliedConfigTerm uint64
// Configuration is the current configuration of the cluster
Configuration Configuration
}
Status represents the current status of the node
Source Files
¶
- client.go
- configuration.go
- draining_requests.go
- encoding.go
- errors.go
- generate.go
- grpc_connection.go
- grpc_types.go
- handlers.go
- log_replication.go
- log_replication_types.go
- logs.go
- logs_cache.go
- logs_cache_types.go
- logs_in_memory.go
- logs_in_memory_types.go
- logs_persistant.go
- logs_persistant_types.go
- logs_types.go
- membership.go
- membership_types.go
- metrics.go
- metrics_types.go
- rafty.go
- rafty_types.go
- raftypb.go
- rpcs.go
- rpcs_types.go
- snapshot.go
- snapshot_internal.go
- snapshot_interval_types.go
- snapshot_types.go
- state.go
- state_candidate.go
- state_candidate_types.go
- state_follower.go
- state_follower_types.go
- state_leader.go
- state_leader_types.go
- state_loop.go
- state_machine_types.go
- state_types.go
- timers.go
- utils.go
Directories
¶
| Path | Synopsis |
|---|---|
|
examples
|
|
|
grpc/cluster-3-nodes
command
|
|
|
grpc/cluster-auto/server
command
|
|
|
grpc/single-server-cluster
command
|
|