raft

package
v3.0.0-...-a9a26d0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 17, 2022 License: MIT Imports: 37 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ReadShardFromFile

func ReadShardFromFile(dir string, replicaId uint64) (*gossip.RaftShardMessage, error)

ReadShardFromFile 集群启动成功,外部管理Storage的程序需要调用读取本地文件的shard信息

func WriteMetadataToFile

func WriteMetadataToFile(dir string, id uint16, meta *RaftMetadata) error

存储raft启动的meta数据

Types

type GRPCConfig

// GRPCConfig holds numeric settings whose names correspond to common gRPC
// options: a keepalive minimum time, write/read buffer sizes, maximum
// receive/send message sizes, and a concurrent-stream limit.
//
// NOTE(review): nothing visible here shows how these values are consumed.
// The units (seconds vs. another time unit for KeepAliveMinTime, bytes for
// the size fields) are assumptions — confirm against the code that reads
// this struct.
type GRPCConfig struct {
	KeepAliveMinTime     int
	WriteBufferSize      int
	ReadBufferSize       int
	MaxRecvMsgSize       int
	MaxSendMsgSize       int
	MaxConcurrentStreams uint32
}

type MoveTo

// MoveTo embeds pb.UnimplementedMoveToServer and exposes the MoveToInvoke
// and RaftNodeInvoke methods. It is constructed with NewMoveTo from a
// *Storage.
//
// NOTE(review): presumably this is the gRPC server implementation of the
// MoveTo service defined in the pb package — confirm against the proto file.
type MoveTo struct {
	pb.UnimplementedMoveToServer
	// contains filtered or unexported fields
}

func NewMoveTo

func NewMoveTo(s *Storage) *MoveTo

func (*MoveTo) MoveToInvoke

func (m *MoveTo) MoveToInvoke(ctx context.Context, req *pb.MoveToCommand) (*pb.MoveToResponse, error)

func (*MoveTo) RaftNodeInvoke

func (m *MoveTo) RaftNodeInvoke(ctx context.Context, req *pb.RaftInvokeOp) (*pb.MoveToResponse, error)

type RaftConfig

// RaftConfig is the configuration accepted by NewStorage.
type RaftConfig struct {
	LogDir   string
	LogLevel zapcore.Level

	// IP address of this machine.
	HostIP string

	// ReplicaId must never change once it has been generated.
	ReplicaId uint64

	// Shard IDs assigned to this node.
	ShardIds []uint64

	// Address used for raft communication.
	RaftAddr string

	// gRPC port used to reach the peer raft node when issuing the moveTo
	// command.
	GrpcPort uint16

	// Number of raft shard groups; used to compute a shardId from a hashKey.
	MultiGroupSize uint32

	// Directory where data is stored.
	StorageDir string

	// Whether to join the raft cluster in "join" mode.
	// NOTE(review): presumably keyed shardId -> replicaId like
	// InitialMembers — confirm.
	Join map[uint64]map[uint64]bool

	// Note on the values of this map:
	// when starting in gossip mode the value is the nodehostId (see the
	// dragonboat documentation for details); for the first start it can be
	// generated with dragonboat's id.NewNodeHostID(id uint64).
	// When starting with a fixed raftAddr, the value is the raftAddr itself.
	// Outer key: shardId, inner key: replicaId.
	InitialMembers map[uint64]map[uint64]string

	// Must be set when the raft cluster is started in gossip mode
	// (i.e. node IPs may change).
	Gossip      bool
	GossipPort  uint16
	GossipSeeds []string

	// Whether dragonboat metrics are enabled.
	Metrics bool

	// Configuration of this project's own internal gossip.
	GossipConfig gossip.GossipConfig
}

type RaftMetadata

// RaftMetadata is the raft startup metadata written by WriteMetadataToFile
// and read back by ReadMetadataFromFile. Every field carries a JSON tag.
//
// NOTE(review): the on-disk format is presumably JSON given the tags —
// confirm against the read/write implementations.
type RaftMetadata struct {
	// Number of raft shard groups; used to compute a shardId from a hashKey.
	MultiGroupSize uint32 `json:"multiGroupSize"`

	// ID of the local node.
	ReplicaId uint64 `json:"replicaId"`

	// IP address of the raft node.
	// Before starting, verify whether the machine's IP has changed.
	LocalIP string `json:"localIP"`

	// Port used for raft communication.
	RaftPort uint16 `json:"raftPort"`

	// gRPC port used for moveTo.
	GrpcPort uint16 `json:"grpcPort"`

	// Whether the node is started in gossip mode.
	Gossip bool `json:"gossip"`

	// Revision number of this node's shardId assignment.
	Revision int64 `json:"revision"`

	// Must be set when the raft cluster is started in gossip mode
	// (i.e. node IPs may change).
	GossipPort  uint16   `json:"gossipPort"`
	GossipSeeds []string `json:"gossipSeeds"`

	// Whether dragonboat metrics are enabled.
	Metrics bool `json:"metrics"`

	// Configuration of this project's own internal gossip.
	GossipConfig gossip.GossipConfig `json:"gossipConfig"`
}

func ReadMetadataFromFile

func ReadMetadataFromFile(dir string, id uint16) (*RaftMetadata, error)

读取raft启动的meta数据

type RaftOpType

// RaftOpType is a uint8 enumeration of raft operations.
type RaftOpType uint8

// RaftOpType values. The sequence starts at 1 (iota + 1), so the zero value
// of RaftOpType does not correspond to any named operation.
//
// NOTE(review): presumably these select between removing a node, adding a
// voting node, and adding an observer — confirm against the code that
// switches on them.
const (
	DELETE RaftOpType = iota + 1 // 1
	ADD                          // 2
	OBSERVER                     // 3
)

type StateMachine

// StateMachine carries the shard and replica identifiers of one raft replica
// and provides Open, Update, Lookup, Sync, PrepareSnapshot, SaveSnapshot,
// RecoverFromSnapshot and Close methods.
//
// NOTE(review): that method set appears to match dragonboat's on-disk state
// machine interface (statemachine.IOnDiskStateMachine) — confirm.
type StateMachine struct {
	ShardId   uint64
	ReplicaID uint64
	// contains filtered or unexported fields
}

func (*StateMachine) Close

func (r *StateMachine) Close() error

func (*StateMachine) Lookup

func (r *StateMachine) Lookup(query interface{}) (interface{}, error)

func (*StateMachine) Open

func (r *StateMachine) Open(stopChan <-chan struct{}) (uint64, error)

func (*StateMachine) PrepareSnapshot

func (r *StateMachine) PrepareSnapshot() (interface{}, error)

func (*StateMachine) RecoverFromSnapshot

func (r *StateMachine) RecoverFromSnapshot(reader io.Reader, stopChan <-chan struct{}) error

func (*StateMachine) SaveSnapshot

func (r *StateMachine) SaveSnapshot(snapshot interface{}, writer io.Writer, stopChan <-chan struct{}) error

func (*StateMachine) Sync

func (r *StateMachine) Sync() error

func (*StateMachine) Update

func (r *StateMachine) Update(entries []sm.Entry) ([]sm.Entry, error)

type Storage

// Storage is the handle returned by NewStorage. Its methods cover key/value
// access (Put, Get, Del, DelPrefix, Search), locking (TryLock, TryUnLock),
// raft membership changes (AddRaftNode, AddRaftObserver, RemoveRaftNode,
// ChangeRaftNodeShardIds) and shard/membership queries.
// All of its fields are unexported.
type Storage struct {
	// contains filtered or unexported fields
}

func NewStorage

func NewStorage(ServerGrpcAddr, metricsAddr string, cfg *RaftConfig) (*Storage, error)

func (*Storage) AddRaftNode

func (s *Storage) AddRaftNode(replicaId uint64, target string, shardIds []uint64) error

AddRaftNode 添加raft节点,可用于机器的新增或已有机器新增shardIds

func (*Storage) AddRaftObserver

func (s *Storage) AddRaftObserver(replicaId uint64, target string, shardIds []uint64) error

AddRaftObserver 添加raft observer节点,可用于机器的新增或已有机器新增shardIds

func (*Storage) ChangeRaftNodeShardIds

func (s *Storage) ChangeRaftNodeShardIds(add bool, shardIds []uint64) error

func (*Storage) Del

func (s *Storage) Del(cfName string, hashKey, key []byte) error

func (*Storage) DelPrefix

func (s *Storage) DelPrefix(cfName string, hashKey, prefix []byte) error

func (*Storage) Get

func (s *Storage) Get(cfName string, hashKey []byte, linearizable bool, key []byte) (uint64, []byte, error)

func (*Storage) GetAliveInstances

func (s *Storage) GetAliveInstances() map[string]bool

func (*Storage) GetLocalMembership

func (s *Storage) GetLocalMembership() map[uint64]*gossip.MemberInfo

func (*Storage) GetNodeHost

func (s *Storage) GetNodeHost() []string

func (*Storage) GetRaftMembership

func (s *Storage) GetRaftMembership() ([]*gossip.MemberInfo, error)

func (*Storage) GetReplicaId

func (s *Storage) GetReplicaId() uint64

func (*Storage) GetShardMessage

func (s *Storage) GetShardMessage() *gossip.RaftShardMessage

func (*Storage) GetTarget

func (s *Storage) GetTarget() string

func (*Storage) Put

func (s *Storage) Put(cfName string, hashKey []byte, key, val []byte) (uint64, error)

func (*Storage) RaftReady

func (s *Storage) RaftReady() error

func (*Storage) RemoveRaftNode

func (s *Storage) RemoveRaftNode(replicaId uint64, shardIds []uint64) error

RemoveRaftNode 下线当前Node所在的部分shardIds

func (*Storage) Search

func (s *Storage) Search(cfName string, hashKey []byte, linearizable bool, prefix []byte) ([][]byte, error)

func (*Storage) StopRaftNode

func (s *Storage) StopRaftNode()

func (*Storage) TryLock

func (s *Storage) TryLock(lockTimeout uint64, cfName string, key []byte) (bool, error)

func (*Storage) TryUnLock

func (s *Storage) TryUnLock(cfName string, key []byte) (bool, error)

func (*Storage) UpdateShardMessage

func (s *Storage) UpdateShardMessage(shard *gossip.RaftShardMessage)

更新集群的情况

func (*Storage) WriteShardToFile

func (s *Storage) WriteShardToFile(shard *gossip.RaftShardMessage) error

更新集群后需要存储到文件

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL