raft

package
v1.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 18, 2024 License: Apache-2.0 Imports: 40 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewHcLogrusLogger

func NewHcLogrusLogger() hclog.Logger

Types

type BoltDbFsm

type BoltDbFsm struct {
	// contains filtered or unexported fields
}

func NewFsm

func NewFsm(dataDir string, decoders command.Decoders, indexTracker IndexTracker, eventDispatcher event2.Dispatcher) *BoltDbFsm

func (*BoltDbFsm) Apply

func (self *BoltDbFsm) Apply(log *raft.Log) interface{}

func (*BoltDbFsm) GetCurrentState

func (self *BoltDbFsm) GetCurrentState(raft *raft.Raft) (uint64, *raft.Configuration)

func (*BoltDbFsm) GetDb

func (self *BoltDbFsm) GetDb() boltz.Db

func (*BoltDbFsm) Init

func (self *BoltDbFsm) Init() error

func (*BoltDbFsm) Restore

func (self *BoltDbFsm) Restore(snapshot io.ReadCloser) error

func (*BoltDbFsm) Snapshot

func (self *BoltDbFsm) Snapshot() (raft.FSMSnapshot, error)

func (*BoltDbFsm) StoreConfiguration

func (self *BoltDbFsm) StoreConfiguration(index uint64, configuration raft.Configuration)

type ClusterEvent

type ClusterEvent uint32
const (
	ClusterEventReadOnly         ClusterEvent = 0
	ClusterEventReadWrite        ClusterEvent = 1
	ClusterEventLeadershipGained ClusterEvent = 2
	ClusterEventLeadershipLost   ClusterEvent = 3
)

func (ClusterEvent) String

func (self ClusterEvent) String() string

type ClusterState

type ClusterState uint8

func (ClusterState) IsLeader

func (c ClusterState) IsLeader() bool

func (ClusterState) IsReadWrite

func (c ClusterState) IsReadWrite() bool

func (ClusterState) String

func (c ClusterState) String() string

type Config

type Config struct {
	Recover               bool
	DataDir               string
	MinClusterSize        uint32
	AdvertiseAddress      transport.Address
	BootstrapMembers      []string
	CommandHandlerOptions struct {
		MaxQueueSize uint16
	}

	SnapshotInterval  *time.Duration
	SnapshotThreshold *uint32
	TrailingLogs      *uint32
	MaxAppendEntries  *uint32

	ElectionTimeout    *time.Duration
	CommitTimeout      *time.Duration
	HeartbeatTimeout   *time.Duration
	LeaderLeaseTimeout *time.Duration

	LogLevel *string
	Logger   hclog.Logger
}

func (*Config) Configure

func (self *Config) Configure(conf *raft.Config)

func (*Config) ConfigureReloadable

func (self *Config) ConfigureReloadable(conf *raft.ReloadableConfig)

type Controller

type Controller struct {
	Config *Config
	Mesh   mesh.Mesh
	Raft   *raft.Raft
	Fsm    *BoltDbFsm
	// contains filtered or unexported fields
}

Controller manages RAFT related state and operations

func NewController

func NewController(env Env, migrationMgr MigrationManager) *Controller

func (*Controller) ApplyEncodedCommand

func (self *Controller) ApplyEncodedCommand(encoded []byte) (uint64, error)

ApplyEncodedCommand applies the command to the RAFT distributed log

func (*Controller) ApplyWithTimeout

func (self *Controller) ApplyWithTimeout(log []byte, timeout time.Duration) (interface{}, uint64, error)

ApplyWithTimeout applies the given command to the RAFT distributed log with the given timeout

func (*Controller) Bootstrap

func (self *Controller) Bootstrap() error

func (*Controller) ConfigureMeshHandlers

func (self *Controller) ConfigureMeshHandlers(bindHandler channel.BindHandler)

func (*Controller) CtrlAddresses

func (self *Controller) CtrlAddresses() (uint64, []string)

func (*Controller) Dispatch

func (self *Controller) Dispatch(cmd command.Command) error

Dispatch dispatches the given command to the current leader. If the current node is the leader, the command will be applied and the result returned

func (*Controller) GetCloseNotify

func (self *Controller) GetCloseNotify() <-chan struct{}

func (*Controller) GetDb

func (self *Controller) GetDb() boltz.Db

GetDb returns the DB instance

func (*Controller) GetLeaderAddr

func (self *Controller) GetLeaderAddr() string

GetLeaderAddr returns the current leader address, which may be blank if there is no leader currently

func (*Controller) GetMesh

func (self *Controller) GetMesh() mesh.Mesh

GetMesh returns the related Mesh instance

func (*Controller) GetMetricsRegistry

func (self *Controller) GetMetricsRegistry() metrics.Registry

func (*Controller) GetPeers

func (self *Controller) GetPeers() map[string]channel.Channel

func (*Controller) GetRaft

func (self *Controller) GetRaft() *raft.Raft

GetRaft returns the managed raft instance

func (*Controller) GetRateLimiter added in v0.33.0

func (self *Controller) GetRateLimiter() rate.RateLimiter

func (*Controller) HandleAddPeer

func (self *Controller) HandleAddPeer(req *cmd_pb.AddPeerRequest) error

func (*Controller) HandleAddPeerAsLeader

func (self *Controller) HandleAddPeerAsLeader(req *cmd_pb.AddPeerRequest) error

func (*Controller) HandleRemovePeer

func (self *Controller) HandleRemovePeer(req *cmd_pb.RemovePeerRequest) error

func (*Controller) HandleRemovePeerAsLeader

func (self *Controller) HandleRemovePeerAsLeader(req *cmd_pb.RemovePeerRequest) error

func (*Controller) HandleTransferLeadership

func (self *Controller) HandleTransferLeadership(req *cmd_pb.TransferLeadershipRequest) error

func (*Controller) HandleTransferLeadershipAsLeader

func (self *Controller) HandleTransferLeadershipAsLeader(req *cmd_pb.TransferLeadershipRequest) error

func (*Controller) Init

func (self *Controller) Init() error

Init sets up the Mesh and Raft instances

func (*Controller) IsDistributed

func (self *Controller) IsDistributed() bool

func (*Controller) IsLeader

func (self *Controller) IsLeader() bool

IsLeader returns true if the current node is the RAFT leader

func (*Controller) IsLeaderOrLeaderless

func (self *Controller) IsLeaderOrLeaderless() bool

func (*Controller) IsReadOnlyMode

func (self *Controller) IsReadOnlyMode() bool

func (*Controller) Join

func (self *Controller) Join(req *cmd_pb.AddPeerRequest) error

Join adds the given node to the raft cluster

func (*Controller) ListMembers

func (self *Controller) ListMembers() ([]*Member, error)

func (*Controller) ObserveLeaderChanges

func (self *Controller) ObserveLeaderChanges()

func (*Controller) RegisterClusterEventHandler

func (self *Controller) RegisterClusterEventHandler(f func(event ClusterEvent, state ClusterState))

func (*Controller) RemoveServer

func (self *Controller) RemoveServer(id string) error

RemoveServer removes the node specified by the given id from the raft cluster

func (*Controller) RenderJsonConfig

func (self *Controller) RenderJsonConfig() (string, error)

type Env

type Env interface {
	GetId() *identity.TokenId
	GetVersionProvider() versions.VersionProvider
	GetCommandRateLimiterConfig() command.RateLimiterConfig
	GetRaftConfig() *Config
	GetMetricsRegistry() metrics.Registry
	GetEventDispatcher() event.Dispatcher
	GetCloseNotify() <-chan struct{}
	GetHelloHeaderProviders() []mesh.HeaderProvider
}

type IndexTracker

type IndexTracker interface {
	Index() uint64
	WaitForIndex(index uint64, deadline time.Time) error
	NotifyOfIndex(index uint64)
}

func NewIndexTracker

func NewIndexTracker() IndexTracker

type Member

type Member struct {
	Id        string `json:"id"`
	Addr      string `json:"addr"`
	Voter     bool   `json:"isVoter"`
	Leader    bool   `json:"isLeader"`
	Version   string `json:"version"`
	Connected bool   `json:"isConnected"`
}

type MigrationManager

type MigrationManager interface {
	TryInitializeRaftFromBoltDb() error
	InitializeRaftFromBoltDb(srcDb string) error
}

type RouterDispatchCallback

type RouterDispatchCallback func(*raft.Configuration) error

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL