zongzi

Version: v0.0.8
Published: Dec 9, 2023 License: Apache-2.0 Imports: 24 Imported by: 0

README

Zongzi

A cluster coordinator for Dragonboat

This package provides a centralized coordination layer for Dragonboat multi-group Raft consensus clusters.

Components
  • Registry
    • Stores desired state of all hosts, shards and replicas in the cluster
    • Cluster state stored in raft shard zero (shardID: 0)
    • All cluster state changes pass through the Zongzi agent
  • Message Bus
    • Internal gRPC
    • Facilitates cluster bootstrap
    • Forwards proposals and queries between nodes
  • Host Controller
    • Manages replicas on all hosts (start, stop, recover, delete, etc.)
    • Responds automatically to changes in cluster state registry
  • Shard Controller
    • Reads placement policies from shard tags
    • Creates and destroys replicas to reconcile cluster state
  • Host Client
    • Used to make proposals and queries to replicas on specific hosts
    • gRPC over message bus
  • Shard Client (in progress)
    • Intelligently routes proposals and queries to active shard replicas
    • Selects nearest replica based on ping
    • Load balances across replica groups

Usage

The Zongzi Agent simplifies multi-host operations using an internal API that automatically coordinates the necessary multi-host actions required to achieve the desired cluster state.

  1. Call zongzi.(*Agent).CreateReplica from any host in the cluster
  2. The desired replica state will be stored in the registry
  3. The responsible host controller will start the replica on the desired host
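
The three steps above amount to a reconcile loop: the desired state lives in the registry and each host controller converges its local replicas toward it. A minimal sketch of that idea follows, using only the standard library. The type `desiredReplica` and the function `reconcile` are hypothetical illustrations and are not part of the zongzi API.

```go
package main

import (
	"fmt"
	"sort"
)

// desiredReplica is a hypothetical stand-in for a replica record in the registry.
type desiredReplica struct {
	ID      uint64
	ShardID uint64
	HostID  string
}

// reconcile compares the desired replicas for one host against the replicas
// currently running there and returns the replica IDs to start and to stop.
func reconcile(hostID string, desired []desiredReplica, running map[uint64]bool) (start, stop []uint64) {
	want := map[uint64]bool{}
	for _, r := range desired {
		if r.HostID != hostID {
			continue // another host controller is responsible for this replica
		}
		want[r.ID] = true
		if !running[r.ID] {
			start = append(start, r.ID)
		}
	}
	for id := range running {
		if !want[id] {
			stop = append(stop, id)
		}
	}
	// Sort for deterministic output; map iteration order is random in Go.
	sort.Slice(start, func(i, j int) bool { return start[i] < start[j] })
	sort.Slice(stop, func(i, j int) bool { return stop[i] < stop[j] })
	return start, stop
}

func main() {
	registry := []desiredReplica{
		{ID: 1, ShardID: 1, HostID: "host-a"},
		{ID: 2, ShardID: 1, HostID: "host-b"},
		{ID: 4, ShardID: 2, HostID: "host-a"},
	}
	running := map[uint64]bool{1: true, 3: true}
	start, stop := reconcile("host-a", registry, running)
	fmt.Println("start:", start, "stop:", stop)
}
```

Because the function only diffs desired against actual state, it can be rerun safely after every registry change.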

The cluster state registry is replicated to every host in the cluster, so every host always has an eventually consistent snapshot of the cluster topology for command and query forwarding. Cluster changes are proposed through the internal API: a change is a write to the registry, and the host controllers reconcile the difference.

There is no need to import the dragonboat package or interact with the dragonboat host directly.

Architectural Constraints

Zongzi imposes some additional architectural constraints on top of Dragonboat:

  1. Although Dragonboat statemachine reads accept and return interface{}, all queries and responses sent through Zongzi must be expressed as []byte, just like command proposals. This serialization overhead is necessary for request forwarding because empty interfaces are not serializable. See ADR: Message Bus

  2. Although Dragonboat replica IDs are unique per shard, Zongzi replica IDs are unique per cluster. Having independent replica IDs simplifies many replica operations which may have previously required both a shard ID and replica ID to be passed together up and down the call stack. The loss of address space (uint64 * uint64 vs uint64) is not expected to be a concern as 18.4 quintillion is still an astronomically large number. Having a materialized view of the global cluster state replicated to every host in the cluster makes it simple and efficient to dereference a replicaID to the correct host and shard. Dragonboat can't do this alone because its cluster state is decentralized.

  3. A host may have at most one active replica of any given shard. This aligns with many Dragonboat host operations, such as (*NodeHost).StopShard, which assume no more than one active replica per shard per host.

  4. Dragonboat sessions are not supported (they didn't work with on-disk state machines anyway). Proposal deduplication is delegated to the developer. See ADR: Raft Sessions

  5. Initializing a Zongzi cluster requires at least 3 peer agents. Single host clusters are not supported.
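
Constraints 1 and 4 combine naturally: commands travel as []byte, and the state machine itself must recognize retried proposals. The sketch below shows one possible approach, assuming JSON encoding and a per-client sequence number as the idempotency key. The `command` and `counter` types are hypothetical, and any other codec or deduplication scheme would work equally well.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// command is a hypothetical proposal envelope. ClientID and Seq together form
// an idempotency key so that a retried proposal can be recognized.
// Seq is assumed to start at 1 and increase with every new proposal.
type command struct {
	ClientID string `json:"clientID"`
	Seq      uint64 `json:"seq"`
	Delta    int64  `json:"delta"`
}

// counter is a toy state machine that applies each (ClientID, Seq) at most once.
type counter struct {
	value   int64
	lastSeq map[string]uint64 // highest sequence number applied per client
}

func newCounter() *counter {
	return &counter{lastSeq: map[string]uint64{}}
}

// apply decodes a []byte command and applies it unless it is a duplicate.
// It reports whether the command changed the state.
func (c *counter) apply(raw []byte) (bool, error) {
	var cmd command
	if err := json.Unmarshal(raw, &cmd); err != nil {
		return false, err
	}
	if cmd.Seq <= c.lastSeq[cmd.ClientID] {
		return false, nil // already applied: ignore the retry
	}
	c.lastSeq[cmd.ClientID] = cmd.Seq
	c.value += cmd.Delta
	return true, nil
}

func main() {
	c := newCounter()
	raw, err := json.Marshal(command{ClientID: "client-1", Seq: 1, Delta: 5})
	if err != nil {
		panic(err)
	}
	first, _ := c.apply(raw)
	retry, _ := c.apply(raw) // the same bytes proposed a second time
	fmt.Println(first, retry, c.value)
}
```

In a real state machine the deduplication table would need to be included in snapshots, otherwise a replica restored from a snapshot could apply a retried command twice.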

Setup

Requirements for make gen

  1. Install protoc
  2. Install protoc-gen-go

Project Status

This library is in alpha (v0.0.x). Expect regular breaking changes to interfaces and serious defects outside the green path. When the API is stable and the code is well tested, beta releases will begin (v0.1.0). General availability will come after the library has been thoroughly tested in at least one production deployment for an extended period of time (v1.0.0).

Documentation

Constants

const (
	DefaultApiAddress    = "127.0.0.1:17001"
	DefaultRaftAddress   = "127.0.0.1:17002"
	DefaultGossipAddress = "127.0.0.1:17003"

	ZongziShardID = 0
)
const (
	LogLevelCritical = logger.CRITICAL
	LogLevelError    = logger.ERROR
	LogLevelWarning  = logger.WARNING
	LogLevelInfo     = logger.INFO
	LogLevelDebug    = logger.DEBUG
)
const (
	AgentStatus_Initializing = AgentStatus("initializing")
	AgentStatus_Joining      = AgentStatus("joining")
	AgentStatus_Pending      = AgentStatus("pending")
	AgentStatus_Ready        = AgentStatus("ready")
	AgentStatus_Rejoining    = AgentStatus("rejoining")
	AgentStatus_Stopped      = AgentStatus("stopped")

	HostStatus_Active     = HostStatus("active")
	HostStatus_Gone       = HostStatus("gone")
	HostStatus_Missing    = HostStatus("missing")
	HostStatus_New        = HostStatus("new")
	HostStatus_Recovering = HostStatus("recovering")

	ShardStatus_Active      = ShardStatus("active")
	ShardStatus_Closed      = ShardStatus("closed")
	ShardStatus_New         = ShardStatus("new")
	ShardStatus_Unavailable = ShardStatus("unavailable")

	ReplicaStatus_Active        = ReplicaStatus("active")
	ReplicaStatus_Closed        = ReplicaStatus("closed")
	ReplicaStatus_Bootstrapping = ReplicaStatus("bootstrapping")
	ReplicaStatus_Joining       = ReplicaStatus("joining")
	ReplicaStatus_New           = ReplicaStatus("new")
)

Variables

var (
	DefaultHostConfig = HostConfig{
		NodeHostDir:    "/var/lib/zongzi/raft",
		RaftAddress:    DefaultRaftAddress,
		RTTMillisecond: 10,
		NotifyCommit:   true,
		WALDir:         "/var/lib/zongzi/wal",
		Expert: config.ExpertConfig{
			LogDBFactory: tan.Factory,
		},
	}
	DefaultReplicaConfig = ReplicaConfig{
		CheckQuorum:             true,
		CompactionOverhead:      1000,
		ElectionRTT:             100,
		HeartbeatRTT:            10,
		OrderedConfigChange:     true,
		Quiesce:                 false,
		SnapshotCompressionType: config.Snappy,
		SnapshotEntries:         1000,
	}
)
var (
	ErrAborted       = dragonboat.ErrAborted
	ErrCanceled      = dragonboat.ErrCanceled
	ErrRejected      = dragonboat.ErrRejected
	ErrShardClosed   = dragonboat.ErrShardClosed
	ErrShardNotReady = dragonboat.ErrShardNotReady
	ErrTimeout       = dragonboat.ErrTimeout

	ErrAgentNotReady     = fmt.Errorf("Agent not ready")
	ErrHostNotFound      = fmt.Errorf(`Host not found`)
	ErrIDOutOfRange      = fmt.Errorf(`ID out of range`)
	ErrInvalidFactory    = fmt.Errorf(`Invalid Factory`)
	ErrReplicaNotActive  = fmt.Errorf("Replica not active")
	ErrReplicaNotAllowed = fmt.Errorf("Replica not allowed")
	ErrReplicaNotFound   = fmt.Errorf("Replica not found")
	ErrShardExists       = fmt.Errorf(`Shard already exists`)
	ErrShardNotFound     = fmt.Errorf(`Shard not found`)

	// ErrClusterNameInvalid indicates that the clusterName is invalid
	// Base36 supports only lowercase alphanumeric characters
	ErrClusterNameInvalid = fmt.Errorf("Invalid cluster name (base36 maxlen 12)")
	ClusterNameRegex      = `^[a-z0-9]{1,12}$`

	// ErrNotifyCommitDisabled is logged when non-linearizable writes are requested but disabled.
	// Set property `NotifyCommit` to `true` in `HostConfig` to add support for non-linearizable writes.
	ErrNotifyCommitDisabled = fmt.Errorf("Attempted to make a non-linearizable write while NotifyCommit is disabled")
)
var GetLogger = logger.GetLogger
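
A cluster name can be checked against the ClusterNameRegex pattern listed above before an agent is constructed. The sketch below copies the pattern string into a local variable so that it runs with only the standard library; the helper `validClusterName` is hypothetical and not part of the package.

```go
package main

import (
	"fmt"
	"regexp"
)

// clusterNamePattern copies the ClusterNameRegex value listed above:
// 1 to 12 lowercase alphanumeric characters.
var clusterNamePattern = regexp.MustCompile(`^[a-z0-9]{1,12}$`)

// validClusterName reports whether name matches the documented pattern.
func validClusterName(name string) bool {
	return clusterNamePattern.MatchString(name)
}

func main() {
	for _, name := range []string{"prod01", "Prod01", "averyveryverylongname", ""} {
		fmt.Printf("%q valid=%v\n", name, validClusterName(name))
	}
}
```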

Functions

func SetLogLevel

func SetLogLevel(level LogLevel)

SetLogLevel sets log level for all zongzi and dragonboat loggers.

Recommend LogLevelWarning for production.

func SetLogLevelDebug

func SetLogLevelDebug()

SetLogLevelDebug sets a debug log level for most loggers but filters out loggers that produce large volumes of debug output.

func SetLogLevelProduction

func SetLogLevelProduction()

SetLogLevelProduction sets a good log level for production (gossip logger is a bit noisy).

Types

type Agent

type Agent struct {
	// contains filtered or unexported fields
}

func NewAgent

func NewAgent(clusterName string, peers []string, opts ...AgentOption) (a *Agent, err error)

func (*Agent) Client added in v0.0.5

func (a *Agent) Client(hostID string) (c *Client)

Client returns a Client for a specific host.

func (*Agent) HostID added in v0.0.3

func (a *Agent) HostID() (id string)

HostID returns host ID if host is initialized, otherwise empty string.

func (*Agent) Read

func (a *Agent) Read(ctx context.Context, fn func(*State), stale ...bool) (err error)

Read executes a callback function passing a snapshot of the cluster state.

err := agent.Read(ctx, func(s *State) {
    log.Println(s.Index())
})

Linearizable reads are enabled by default to achieve "Read Your Writes" consistency following a proposal. Pass the optional argument stale as true to disable linearizable reads (for higher performance). State always provides snapshot isolation, even for stale reads.

Read will block indefinitely if the prime shard is unavailable. This may prevent the agent from stopping gracefully. Pass a timeout context to avoid blocking indefinitely.

Read is thread safe and will not block writes.

func (*Agent) RegisterShard added in v0.0.6

func (a *Agent) RegisterShard(ctx context.Context, uri string, opts ...ShardOption) (shard Shard, created bool, err error)

RegisterShard creates a new shard. If the shard name option is provided and a shard with that name already exists, the existing shard is updated.

func (*Agent) RegisterStateMachine

func (a *Agent) RegisterStateMachine(uri string, factory StateMachineFactory, config ...ReplicaConfig)

RegisterStateMachine registers a non-persistent shard type. Call before starting the agent.

func (*Agent) RegisterStateMachinePersistent added in v0.0.7

func (a *Agent) RegisterStateMachinePersistent(uri string, factory PersistentStateMachineFactory, config ...ReplicaConfig)

RegisterStateMachinePersistent registers a persistent shard type. Call before starting the agent.

func (*Agent) Start

func (a *Agent) Start() (err error)

func (*Agent) Status added in v0.0.3

func (a *Agent) Status() AgentStatus

Status returns the agent status

func (*Agent) Stop

func (a *Agent) Stop()

Stop stops the agent

type AgentOption

type AgentOption func(*Agent) error

func WithApiAddress

func WithApiAddress(advertiseAddress string, bindAddress ...string) AgentOption

func WithGossipAddress

func WithGossipAddress(advertiseAddress string, bindAddress ...string) AgentOption

func WithHostConfig

func WithHostConfig(cfg HostConfig) AgentOption

func WithHostTags added in v0.0.5

func WithHostTags(tags ...string) AgentOption

func WithRaftAddress added in v0.0.8

func WithRaftAddress(raftAddress string) AgentOption

func WithRaftEventListener added in v0.0.6

func WithRaftEventListener(listener RaftEventListener) AgentOption

func WithReplicaConfig

func WithReplicaConfig(cfg ReplicaConfig) AgentOption

func WithSystemEventListener added in v0.0.6

func WithSystemEventListener(listener SystemEventListener) AgentOption

type AgentStatus

type AgentStatus string

type Client added in v0.0.5

type Client struct {
	// contains filtered or unexported fields
}

func (*Client) Apply added in v0.0.5

func (c *Client) Apply(ctx context.Context, shardID uint64, cmd []byte) (value uint64, data []byte, err error)

func (*Client) Commit added in v0.0.5

func (c *Client) Commit(ctx context.Context, shardID uint64, cmd []byte) (err error)

func (*Client) Ping added in v0.0.5

func (c *Client) Ping(ctx context.Context) (err error)

func (*Client) Query added in v0.0.5

func (c *Client) Query(ctx context.Context, shardID uint64, query []byte, stale ...bool) (value uint64, data []byte, err error)

type Entry

type Entry = statemachine.Entry

type GossipConfig

type GossipConfig = config.GossipConfig

type Host

type Host struct {
	ID      string            `json:"id"`
	Created uint64            `json:"created"`
	Updated uint64            `json:"updated"`
	Status  HostStatus        `json:"status"`
	Tags    map[string]string `json:"tags"`

	ApiAddress  string   `json:"apiAddress"`
	RaftAddress string   `json:"raftAddress"`
	ShardTypes  []string `json:"shardTypes"`
}

type HostConfig

type HostConfig = config.NodeHostConfig

type HostStatus

type HostStatus string

type LeaderInfo

type LeaderInfo = raftio.LeaderInfo

type LogLevel

type LogLevel = logger.LogLevel

type Logger added in v0.0.8

type Logger = logger.ILogger

type PersistentStateMachine

type PersistentStateMachine interface {
	Open(stopc <-chan struct{}) (index uint64, err error)
	Update(entries []Entry) []Entry
	Query(ctx context.Context, query []byte) *Result
	Watch(ctx context.Context, query []byte, result chan<- *Result)
	PrepareSnapshot() (cursor any, err error)
	SaveSnapshot(cursor any, w io.Writer, close <-chan struct{}) error
	RecoverFromSnapshot(r io.Reader, close <-chan struct{}) error
	Sync() error
	Close() error
}

PersistentStateMachine is a StateMachine where the state is persisted to a medium (such as disk) that can survive restart. During compaction, calls to Snapshot are replaced with calls to Sync which effectively flushes state to the persistent medium. SaveSnapshot and RecoverFromSnapshot are used to replicate full on-disk state to new replicas.

type PersistentStateMachineFactory

type PersistentStateMachineFactory = func(shardID uint64, replicaID uint64) PersistentStateMachine

PersistentStateMachineFactory is a function that returns a PersistentStateMachine

type RaftEventListener added in v0.0.6

type RaftEventListener = raftio.IRaftEventListener

type Replica

type Replica struct {
	ID      uint64            `json:"id"`
	Created uint64            `json:"created"`
	Updated uint64            `json:"updated"`
	Status  ReplicaStatus     `json:"status"`
	Tags    map[string]string `json:"tags"`

	HostID      string `json:"hostID"`
	IsNonVoting bool   `json:"isNonVoting"`
	IsWitness   bool   `json:"isWitness"`
	ShardID     uint64 `json:"shardID"`
}

type ReplicaConfig

type ReplicaConfig = config.Config

type ReplicaStatus

type ReplicaStatus string

type Result

type Result = statemachine.Result

func GetResult

func GetResult() *Result

GetResult can be used to efficiently retrieve an empty Result from a global pool. It is recommended to use this method to instantiate Result objects returned by Query or sent over Watch channels, as they will be automatically returned to the pool to reduce allocation overhead.
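
One common way to build a pool like the one described above is sync.Pool. The sketch below shows the general pattern with a local `result` type. Whether zongzi uses sync.Pool internally is an assumption, and `getResult` and `putResult` are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// result is a simplified local stand-in for the Result type.
type result struct {
	Value uint64
	Data  []byte
}

// resultPool hands out reusable result objects to reduce allocations.
var resultPool = sync.Pool{New: func() any { return &result{} }}

// getResult returns an empty result, either recycled or newly allocated.
func getResult() *result {
	return resultPool.Get().(*result)
}

// putResult clears a result and returns it to the pool for reuse.
func putResult(r *result) {
	r.Value = 0
	r.Data = r.Data[:0] // keep the backing array, drop the contents
	resultPool.Put(r)
}

func main() {
	r := getResult()
	r.Value = 42
	r.Data = append(r.Data, "payload"...)
	fmt.Println(r.Value, string(r.Data))
	putResult(r) // hand the object back once the response has been consumed
}
```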

type Shard

type Shard struct {
	ID      uint64            `json:"id"`
	Created uint64            `json:"created"`
	Updated uint64            `json:"updated"`
	Status  ShardStatus       `json:"status"`
	Tags    map[string]string `json:"tags"`

	Type string `json:"type"`
	Name string `json:"name"`
}

type ShardOption added in v0.0.6

type ShardOption func(*Shard) error

func WithName added in v0.0.6

func WithName(name string) ShardOption

func WithPlacementCover added in v0.0.8

func WithPlacementCover(tagKeys ...string) ShardOption

func WithPlacementMembers added in v0.0.6

func WithPlacementMembers(n int, tags ...string) ShardOption

func WithPlacementReplicas added in v0.0.6

func WithPlacementReplicas(group string, n int, tags ...string) ShardOption

func WithPlacementVary added in v0.0.6

func WithPlacementVary(tagKeys ...string) ShardOption

type ShardStatus

type ShardStatus string

type SnapshotFile

type SnapshotFile = statemachine.SnapshotFile

type SnapshotFileCollection

type SnapshotFileCollection = statemachine.ISnapshotFileCollection

type State

type State struct {
	// contains filtered or unexported fields
}

func (*State) Host added in v0.0.5

func (fsm *State) Host(id string) (h Host, ok bool)

Host returns the host with the specified ID or ok false if not found.

func (*State) HostID added in v0.0.8

func (fsm *State) HostID() string

HostID returns the ID of the last created Host

func (*State) HostIterate added in v0.0.2

func (fsm *State) HostIterate(fn func(h Host) bool)

HostIterate executes a callback for every host in the cluster. Return true to continue iterating, false to stop.

var hostCount int
agent.Read(ctx, func(s *zongzi.State) {
    s.HostIterate(func(h zongzi.Host) bool {
        hostCount++
        return true
    })
})

func (*State) HostIterateByShardType added in v0.0.5

func (fsm *State) HostIterateByShardType(shardType string, fn func(h Host) bool)

HostIterateByShardType executes a callback for every host in the cluster supporting the provided shard type, ordered by host id ascending. Return true to continue iterating, false to stop.

func (*State) HostIterateByTag added in v0.0.5

func (fsm *State) HostIterateByTag(tag string, fn func(h Host) bool)

HostIterateByTag executes a callback for every host in the cluster matching the specified tag, ordered by host id ascending. Return true to continue iterating, false to stop.

func (*State) Index

func (fsm *State) Index() (val uint64)

func (*State) Replica added in v0.0.5

func (fsm *State) Replica(id uint64) (r Replica, ok bool)

Replica returns the replica with the specified id or ok false if not found.

func (*State) ReplicaID added in v0.0.8

func (fsm *State) ReplicaID() uint64

ReplicaID returns the ID of the last created Replica

func (*State) ReplicaIterate added in v0.0.2

func (fsm *State) ReplicaIterate(fn func(r Replica) bool)

ReplicaIterate executes a callback for every replica in the cluster ordered by replica id ascending. Return true to continue iterating, false to stop.

func (*State) ReplicaIterateByHostID added in v0.0.2

func (fsm *State) ReplicaIterateByHostID(hostID string, fn func(r Replica) bool)

ReplicaIterateByHostID executes a callback for every replica in the cluster belonging to the provided host id, ordered by replica id ascending. Return true to continue iterating, false to stop.

func (*State) ReplicaIterateByShardID added in v0.0.2

func (fsm *State) ReplicaIterateByShardID(shardID uint64, fn func(r Replica) bool)

ReplicaIterateByShardID executes a callback for every replica in the cluster belonging to the provided shard id, ordered by replica id ascending. Return true to continue iterating, false to stop.

func (*State) ReplicaIterateByTag added in v0.0.5

func (fsm *State) ReplicaIterateByTag(tag string, fn func(r Replica) bool)

ReplicaIterateByTag executes a callback for every replica in the cluster matching the specified tag, ordered by replica id ascending. Return true to continue iterating, false to stop.

func (*State) Save added in v0.0.2

func (fsm *State) Save(w io.Writer) error

func (*State) Shard added in v0.0.5

func (fsm *State) Shard(id uint64) (s Shard, ok bool)

Shard returns the shard with the specified id or ok false if not found.

func (*State) ShardFindByName added in v0.0.6

func (fsm *State) ShardFindByName(name string) (s Shard, ok bool)

ShardFindByName returns the shard with the specified name or ok false if not found.

func (*State) ShardID added in v0.0.8

func (fsm *State) ShardID() uint64

ShardID returns the ID of the last created Shard

func (*State) ShardIterate added in v0.0.2

func (fsm *State) ShardIterate(fn func(s Shard) bool)

ShardIterate executes a callback for every shard in the cluster ordered by shard id ascending. Return true to continue iterating, false to stop.

func (*State) ShardIterateByTag added in v0.0.5

func (fsm *State) ShardIterateByTag(tag string, fn func(r Shard) bool)

ShardIterateByTag executes a callback for every shard in the cluster matching the specified tag, ordered by shard id ascending. Return true to continue iterating, false to stop.

func (*State) ShardIterateUpdatedAfter added in v0.0.6

func (fsm *State) ShardIterateUpdatedAfter(index uint64, fn func(r Shard) bool)

ShardIterateUpdatedAfter executes a callback for every shard in the cluster having an updated index greater than the supplied index

func (*State) ShardMembers added in v0.0.4

func (fsm *State) ShardMembers(id uint64) map[uint64]string

ShardMembers returns a map of shard members (replicaID: hostID)

type StateMachine

type StateMachine interface {
	Update(entries []Entry) []Entry
	Query(ctx context.Context, query []byte) *Result
	Watch(ctx context.Context, query []byte, result chan<- *Result)
	PrepareSnapshot() (cursor any, err error)
	SaveSnapshot(cursor any, w io.Writer, c SnapshotFileCollection, close <-chan struct{}) error
	RecoverFromSnapshot(r io.Reader, f []SnapshotFile, close <-chan struct{}) error
	Close() error
}

StateMachine is a deterministic finite state machine. Snapshots are requested during log compaction to ensure that the in-memory state can be recovered following a restart. If you expect a dataset larger than memory, a persistent state machine may be more appropriate.

Query may be called concurrently with Update and SaveSnapshot. It is the implementer's responsibility to ensure that snapshots are generated using snapshot isolation. This can be achieved using Multi Version Concurrency Control (MVCC). A simple mutex can also be used if blocking writes during reads is acceptable.
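
The simple mutex option mentioned above can be sketched with a sync.RWMutex: updates take the write lock and reads take the read lock, so a read never observes a partially applied batch. The `entry`, `result` and `register` types below are simplified local stand-ins declared only to keep the sketch self-contained. They do not implement the full StateMachine interface.

```go
package main

import (
	"fmt"
	"sync"
)

// result and entry are simplified local stand-ins for the Result and Entry aliases.
type result struct {
	Value uint64
	Data  []byte
}

type entry struct {
	Index  uint64
	Cmd    []byte
	Result result
}

// register stores the most recent command and guards it with a RWMutex.
type register struct {
	mu    sync.RWMutex
	index uint64
	data  []byte
}

// update applies a batch of entries in order while holding the write lock.
func (r *register) update(entries []entry) []entry {
	r.mu.Lock()
	defer r.mu.Unlock()
	for i, e := range entries {
		r.data = append([]byte(nil), e.Cmd...) // copy: do not retain the caller's slice
		r.index = e.Index
		entries[i].Result = result{Value: 1}
	}
	return entries
}

// query copies the current value while holding the read lock.
func (r *register) query() result {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return result{Value: r.index, Data: append([]byte(nil), r.data...)}
}

func main() {
	r := &register{}
	r.update([]entry{{Index: 1, Cmd: []byte("a")}, {Index: 2, Cmd: []byte("b")}})
	res := r.query()
	fmt.Println(res.Value, string(res.Data))
}
```

The same read lock could be held while writing a snapshot, at the cost of blocking updates for the duration of the save.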

type StateMachineFactory

type StateMachineFactory = func(shardID uint64, replicaID uint64) StateMachine

StateMachineFactory is a function that returns a StateMachine

type SystemEventListener added in v0.0.6

type SystemEventListener = raftio.ISystemEventListener

Directories

Path Synopsis
_example
kv1
