Documentation ¶
Index ¶
- Constants
- Variables
- func SetLogLevel(level LogLevel)
- func SetLogLevelDebug()
- func SetLogLevelProduction()
- type Agent
- func (a *Agent) Client(hostID string) (c *Client)
- func (a *Agent) HostID() (id string)
- func (a *Agent) Read(ctx context.Context, fn func(*State), stale ...bool) (err error)
- func (a *Agent) RegisterShard(ctx context.Context, uri string, opts ...ShardOption) (shard Shard, created bool, err error)
- func (a *Agent) RegisterStateMachine(uri string, factory StateMachineFactory, config ...ReplicaConfig)
- func (a *Agent) RegisterStateMachinePersistent(uri string, factory PersistentStateMachineFactory, config ...ReplicaConfig)
- func (a *Agent) Start() (err error)
- func (a *Agent) Status() AgentStatus
- func (a *Agent) Stop()
- type AgentOption
- func WithApiAddress(advertiseAddress string, bindAddress ...string) AgentOption
- func WithGossipAddress(advertiseAddress string, bindAddress ...string) AgentOption
- func WithHostConfig(cfg HostConfig) AgentOption
- func WithHostTags(tags ...string) AgentOption
- func WithRaftAddress(raftAddress string) AgentOption
- func WithRaftEventListener(listener RaftEventListener) AgentOption
- func WithReplicaConfig(cfg ReplicaConfig) AgentOption
- func WithSystemEventListener(listener SystemEventListener) AgentOption
- type AgentStatus
- type Client
- func (c *Client) Apply(ctx context.Context, shardID uint64, cmd []byte) (value uint64, data []byte, err error)
- func (c *Client) Commit(ctx context.Context, shardID uint64, cmd []byte) (err error)
- func (c *Client) Ping(ctx context.Context) (err error)
- func (c *Client) Query(ctx context.Context, shardID uint64, query []byte, stale ...bool) (value uint64, data []byte, err error)
- type Entry
- type GossipConfig
- type Host
- type HostConfig
- type HostStatus
- type LeaderInfo
- type LogLevel
- type Logger
- type PersistentStateMachine
- type PersistentStateMachineFactory
- type RaftEventListener
- type Replica
- type ReplicaConfig
- type ReplicaStatus
- type Result
- type Shard
- type ShardOption
- type ShardStatus
- type SnapshotFile
- type SnapshotFileCollection
- type State
- func (fsm *State) Host(id string) (h Host, ok bool)
- func (fsm *State) HostID() string
- func (fsm *State) HostIterate(fn func(h Host) bool)
- func (fsm *State) HostIterateByShardType(shardType string, fn func(h Host) bool)
- func (fsm *State) HostIterateByTag(tag string, fn func(h Host) bool)
- func (fsm *State) Index() (val uint64)
- func (fsm *State) Replica(id uint64) (r Replica, ok bool)
- func (fsm *State) ReplicaID() uint64
- func (fsm *State) ReplicaIterate(fn func(r Replica) bool)
- func (fsm *State) ReplicaIterateByHostID(hostID string, fn func(r Replica) bool)
- func (fsm *State) ReplicaIterateByShardID(shardID uint64, fn func(r Replica) bool)
- func (fsm *State) ReplicaIterateByTag(tag string, fn func(r Replica) bool)
- func (fsm *State) Save(w io.Writer) error
- func (fsm *State) Shard(id uint64) (s Shard, ok bool)
- func (fsm *State) ShardFindByName(name string) (s Shard, ok bool)
- func (fsm *State) ShardID() uint64
- func (fsm *State) ShardIterate(fn func(s Shard) bool)
- func (fsm *State) ShardIterateByTag(tag string, fn func(r Shard) bool)
- func (fsm *State) ShardIterateUpdatedAfter(index uint64, fn func(r Shard) bool)
- func (fsm *State) ShardMembers(id uint64) map[uint64]string
- type StateMachine
- type StateMachineFactory
- type SystemEventListener
Constants ¶
const ( DefaultApiAddress = "127.0.0.1:17001" DefaultRaftAddress = "127.0.0.1:17002" DefaultGossipAddress = "127.0.0.1:17003" ZongziShardID = 0 )
const ( LogLevelCritical = logger.CRITICAL LogLevelError = logger.ERROR LogLevelWarning = logger.WARNING LogLevelInfo = logger.INFO LogLevelDebug = logger.DEBUG )
const ( AgentStatus_Initializing = AgentStatus("initializing") AgentStatus_Joining = AgentStatus("joining") AgentStatus_Pending = AgentStatus("pending") AgentStatus_Ready = AgentStatus("ready") AgentStatus_Rejoining = AgentStatus("rejoining") AgentStatus_Stopped = AgentStatus("stopped") HostStatus_Active = HostStatus("active") HostStatus_Gone = HostStatus("gone") HostStatus_Missing = HostStatus("missing") HostStatus_New = HostStatus("new") HostStatus_Recovering = HostStatus("recovering") ShardStatus_Active = ShardStatus("active") ShardStatus_Closed = ShardStatus("closed") ShardStatus_New = ShardStatus("new") ReplicaStatus_Active = ReplicaStatus("active") ReplicaStatus_Closed = ReplicaStatus("closed") ReplicaStatus_Bootstrapping = ReplicaStatus("bootstrapping") ReplicaStatus_Joining = ReplicaStatus("joining") ReplicaStatus_New = ReplicaStatus("new") )
Variables ¶
var ( DefaultHostConfig = HostConfig{ NodeHostDir: "/var/lib/zongzi/raft", RaftAddress: DefaultRaftAddress, RTTMillisecond: 10, NotifyCommit: true, WALDir: "/var/lib/zongzi/wal", Expert: config.ExpertConfig{ LogDBFactory: tan.Factory, }, } DefaultReplicaConfig = ReplicaConfig{ CheckQuorum: true, CompactionOverhead: 1000, ElectionRTT: 100, HeartbeatRTT: 10, OrderedConfigChange: true, Quiesce: false, SnapshotCompressionType: config.Snappy, SnapshotEntries: 1000, } )
var ( ErrAborted = dragonboat.ErrAborted ErrCanceled = dragonboat.ErrCanceled ErrRejected = dragonboat.ErrRejected ErrShardClosed = dragonboat.ErrShardClosed ErrShardNotReady = dragonboat.ErrShardNotReady ErrTimeout = dragonboat.ErrTimeout ErrAgentNotReady = fmt.Errorf("Agent not ready") ErrHostNotFound = fmt.Errorf(`Host not found`) ErrIDOutOfRange = fmt.Errorf(`ID out of range`) ErrInvalidFactory = fmt.Errorf(`Invalid Factory`) ErrReplicaNotActive = fmt.Errorf("Replica not active") ErrReplicaNotAllowed = fmt.Errorf("Replica not allowed") ErrReplicaNotFound = fmt.Errorf("Replica not found") ErrShardExists = fmt.Errorf(`Shard already exists`) ErrShardNotFound = fmt.Errorf(`Shard not found`) // ErrClusterNameInvalid indicates that the clusterName is invalid // Base36 supports only lowercase alphanumeric characters ErrClusterNameInvalid = fmt.Errorf("Invalid cluster name (base36 maxlen 12)") ClusterNameRegex = `^[a-z0-9]{1,12}$` // ErrNotifyCommitDisabled is logged when non-linearizable writes are requested but disabled. // Set property `NotifyCommit` to `true` in `HostConfig` to add support for non-linearizable writes. ErrNotifyCommitDisabled = fmt.Errorf("Attempted to make a non-linearizable write while NotifyCommit is disabled") )
var GetLogger = logger.GetLogger
Functions ¶
func SetLogLevel ¶
func SetLogLevel(level LogLevel)
SetLogLevel sets log level for all zongzi and dragonboat loggers.
Recommend LogLevelWarning for production.
func SetLogLevelDebug ¶
func SetLogLevelDebug()
SetLogLevelDebug sets a debug log level for most loggers, but filters out loggers having tons of debug output.
func SetLogLevelProduction ¶
func SetLogLevelProduction()
SetLogLevelProduction sets a good log level for production (gossip logger is a bit noisy).
Types ¶
type Agent ¶
type Agent struct {
// contains filtered or unexported fields
}
func NewAgent ¶
func NewAgent(clusterName string, peers []string, opts ...AgentOption) (a *Agent, err error)
func (*Agent) HostID ¶ added in v0.0.3
HostID returns host ID if host is initialized, otherwise empty string.
func (*Agent) Read ¶
Read executes a callback function passing a snapshot of the cluster state.
err := agent.Read(ctx, func(s *State) { log.Println(s.Index()) })
Linearizable reads are enabled by default to achieve "Read Your Writes" consistency following a proposal. Pass optional argument _stale_ as true to disable linearizable reads (for higher performance). State will always provide snapshot isolation, even for stale reads.
Read will block indefinitely if the prime shard is unavailable. This may prevent the agent from stopping gracefully. Pass a timeout context to avoid blocking indefinitely.
Read is thread safe and will not block writes.
func (*Agent) RegisterShard ¶ added in v0.0.6
func (a *Agent) RegisterShard(ctx context.Context, uri string, opts ...ShardOption) (shard Shard, created bool, err error)
RegisterShard creates a new shard. If a shard name option is provided and a shard with that name already exists, the existing shard is updated instead.
func (*Agent) RegisterStateMachine ¶
func (a *Agent) RegisterStateMachine(uri string, factory StateMachineFactory, config ...ReplicaConfig)
RegisterStateMachine registers a non-persistent shard type. Call before Starting agent.
func (*Agent) RegisterStateMachinePersistent ¶ added in v0.0.7
func (a *Agent) RegisterStateMachinePersistent(uri string, factory PersistentStateMachineFactory, config ...ReplicaConfig)
RegisterStateMachinePersistent registers a persistent shard type. Call before Starting agent.
func (*Agent) Status ¶ added in v0.0.3
func (a *Agent) Status() AgentStatus
Status returns the agent status
type AgentOption ¶
func WithApiAddress ¶
func WithApiAddress(advertiseAddress string, bindAddress ...string) AgentOption
func WithGossipAddress ¶
func WithGossipAddress(advertiseAddress string, bindAddress ...string) AgentOption
func WithHostConfig ¶
func WithHostConfig(cfg HostConfig) AgentOption
func WithHostTags ¶ added in v0.0.5
func WithHostTags(tags ...string) AgentOption
func WithRaftAddress ¶ added in v0.0.8
func WithRaftAddress(raftAddress string) AgentOption
func WithRaftEventListener ¶ added in v0.0.6
func WithRaftEventListener(listener RaftEventListener) AgentOption
func WithReplicaConfig ¶
func WithReplicaConfig(cfg ReplicaConfig) AgentOption
func WithSystemEventListener ¶ added in v0.0.6
func WithSystemEventListener(listener SystemEventListener) AgentOption
type AgentStatus ¶
type AgentStatus string
type Client ¶ added in v0.0.5
type Client struct {
// contains filtered or unexported fields
}
type Entry ¶
type Entry = statemachine.Entry
type GossipConfig ¶
type GossipConfig = config.GossipConfig
type HostConfig ¶
type HostConfig = config.NodeHostConfig
type HostStatus ¶
type HostStatus string
type LeaderInfo ¶
type LeaderInfo = raftio.LeaderInfo
type PersistentStateMachine ¶
type PersistentStateMachine interface { Open(stopc <-chan struct{}) (index uint64, err error) Update(entries []Entry) []Entry Query(ctx context.Context, query []byte) *Result Watch(ctx context.Context, query []byte, result chan<- *Result) PrepareSnapshot() (cursor any, err error) SaveSnapshot(cursor any, w io.Writer, close <-chan struct{}) error RecoverFromSnapshot(r io.Reader, close <-chan struct{}) error Sync() error Close() error }
PersistentStateMachine is a StateMachine where the state is persisted to a medium (such as disk) that can survive restart. During compaction, calls to Snapshot are replaced with calls to Sync which effectively flushes state to the persistent medium. SaveSnapshot and RecoverFromSnapshot are used to replicate full on-disk state to new replicas.
type PersistentStateMachineFactory ¶
type PersistentStateMachineFactory = func(shardID uint64, replicaID uint64) PersistentStateMachine
PersistentStateMachineFactory is a function that returns a PersistentStateMachine
type RaftEventListener ¶ added in v0.0.6
type RaftEventListener = raftio.IRaftEventListener
type Replica ¶
type Replica struct { ID uint64 `json:"id"` Created uint64 `json:"created"` Updated uint64 `json:"updated"` Status ReplicaStatus `json:"status"` Tags map[string]string `json:"tags"` HostID string `json:"hostID"` IsNonVoting bool `json:"isNonVoting"` IsWitness bool `json:"isWitness"` ShardID uint64 `json:"shardID"` }
type ReplicaConfig ¶
type ReplicaStatus ¶
type ReplicaStatus string
type Result ¶
type Result = statemachine.Result
func GetResult ¶
func GetResult() *Result
GetResult can be used to efficiently retrieve an empty Result from a global pool. It is recommended to use this method to instantiate Result objects returned by Query or sent over Watch channels as they will be automatically returned to the pool to reduce allocation overhead.
type ShardOption ¶ added in v0.0.6
func WithName ¶ added in v0.0.6
func WithName(name string) ShardOption
func WithPlacementCover ¶ added in v0.0.8
func WithPlacementCover(tagKeys ...string) ShardOption
func WithPlacementMembers ¶ added in v0.0.6
func WithPlacementMembers(n int, tags ...string) ShardOption
func WithPlacementReplicas ¶ added in v0.0.6
func WithPlacementReplicas(group string, n int, tags ...string) ShardOption
func WithPlacementVary ¶ added in v0.0.6
func WithPlacementVary(tagKeys ...string) ShardOption
type ShardStatus ¶
type ShardStatus string
type SnapshotFile ¶
type SnapshotFile = statemachine.SnapshotFile
type SnapshotFileCollection ¶
type SnapshotFileCollection = statemachine.ISnapshotFileCollection
type State ¶
type State struct {
// contains filtered or unexported fields
}
func (*State) Host ¶ added in v0.0.5
Host returns the host with the specified ID or ok false if not found.
func (*State) HostIterate ¶ added in v0.0.2
HostIterate executes a callback for every host in the cluster. Return true to continue iterating, false to stop.
var hostCount int agent.Read(ctx, func(s *zongzi.State) { s.HostIterate(func(h zongzi.Host) bool { hostCount++ return true }) })
func (*State) HostIterateByShardType ¶ added in v0.0.5
HostIterateByShardType executes a callback for every host in the cluster supporting the provided shard type, ordered by host id ascending. Return true to continue iterating, false to stop.
func (*State) HostIterateByTag ¶ added in v0.0.5
HostIterateByTag executes a callback for every host in the cluster matching the specified tag, ordered by host id ascending. Return true to continue iterating, false to stop.
func (*State) Replica ¶ added in v0.0.5
Replica returns the replica with the specified id or ok false if not found.
func (*State) ReplicaIterate ¶ added in v0.0.2
ReplicaIterate executes a callback for every replica in the cluster ordered by replica id ascending. Return true to continue iterating, false to stop.
func (*State) ReplicaIterateByHostID ¶ added in v0.0.2
ReplicaIterateByHostID executes a callback for every replica in the cluster belonging to the provided host id, ordered by replica id ascending. Return true to continue iterating, false to stop.
func (*State) ReplicaIterateByShardID ¶ added in v0.0.2
ReplicaIterateByShardID executes a callback for every replica in the cluster belonging to the provided shard id, ordered by replica id ascending. Return true to continue iterating, false to stop.
func (*State) ReplicaIterateByTag ¶ added in v0.0.5
ReplicaIterateByTag executes a callback for every replica in the cluster matching the specified tag, ordered by replica id ascending. Return true to continue iterating, false to stop.
func (*State) Shard ¶ added in v0.0.5
Shard returns the shard with the specified id or ok false if not found.
func (*State) ShardFindByName ¶ added in v0.0.6
ShardFindByName returns the shard with the specified name or ok false if not found.
func (*State) ShardIterate ¶ added in v0.0.2
ShardIterate executes a callback for every shard in the cluster ordered by shard id ascending. Return true to continue iterating, false to stop.
func (*State) ShardIterateByTag ¶ added in v0.0.5
ShardIterateByTag executes a callback for every shard in the cluster matching the specified tag, ordered by shard id ascending. Return true to continue iterating, false to stop.
func (*State) ShardIterateUpdatedAfter ¶ added in v0.0.6
ShardIterateUpdatedAfter executes a callback for every shard in the cluster having an updated index greater than the supplied index
type StateMachine ¶
type StateMachine interface { Update(entries []Entry) []Entry Query(ctx context.Context, query []byte) *Result Watch(ctx context.Context, query []byte, result chan<- *Result) PrepareSnapshot() (cursor any, err error) SaveSnapshot(cursor any, w io.Writer, c SnapshotFileCollection, close <-chan struct{}) error RecoverFromSnapshot(r io.Reader, f []SnapshotFile, close <-chan struct{}) error Close() error }
StateMachine is a deterministic finite state machine. Snapshots are requested during log compaction to ensure that the in-memory state can be recovered following a restart. If you expect a dataset larger than memory, a persistent state machine may be more appropriate.
Query may be called concurrently with Update and SaveSnapshot. It is the caller's responsibility to ensure that snapshots are generated using snapshot isolation. This can be achieved using Multi Version Concurrency Control (MVCC). A simple mutex can also be used if blocking writes during read is acceptable.
type StateMachineFactory ¶
type StateMachineFactory = func(shardID uint64, replicaID uint64) StateMachine
StateMachineFactory is a function that returns a StateMachine
type SystemEventListener ¶ added in v0.0.6
type SystemEventListener = raftio.ISystemEventListener