cluster

package
v0.9.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 19, 2021 License: MIT Imports: 17 Imported by: 0

Documentation

Overview

description: Utility to perform master election/failover using etcd.

Index

Constants

View Source
// Shared error identifier and two fixed durations.
const (
	// ErrFailedOnNotLeader is the error identifier string "E_FAILED_ON_NOT_LEADER".
	ErrFailedOnNotLeader = "E_FAILED_ON_NOT_LEADER"
	// APIShortTo is 3 seconds (presumably the short API timeout; confirm at call sites).
	APIShortTo           = time.Second * 3
	// APILongTo is 10 seconds (presumably the long API timeout; confirm at call sites).
	APILongTo            = time.Second * 10
)
View Source
// Numeric error codes. Going by the names, 501 marks an unreachable etcd and
// 502 an HTTP status that is not otherwise handled.
const (
	ErrCodeEtcdNotReachable    = 501
	ErrCodeUnhandledHTTPStatus = 502
)
View Source
// Untyped watch event kinds, numbered from 0 by iota.
const (
	EVENT_WATCH_L_CREATE = iota // 0
	EVENT_WATCH_L_DELETE        // 1
)
View Source
// Fixed name strings for cluster metadata.
// NOTE(review): these look like path components of the metadata tree kept in
// the register (etcd); confirm against the code that builds the key paths.
const (
	ROOT_DIR               = "ZanRedisDBMetaData"
	CLUSTER_META_INFO      = "ClusterMeta"
	NAMESPACE_DIR          = "Namespaces"
	NAMESPACE_META         = "NamespaceMeta"
	NAMESPACE_SCHEMA       = "NamespaceSchema"
	NAMESPACE_REPLICA_INFO = "ReplicaInfo"
	NAMESPACE_REAL_LEADER  = "RealLeader"
	DATA_NODE_DIR          = "DataNodes"
	PD_ROOT_DIR            = "PDInfo"
	PD_NODE_DIR            = "PDNodes"
	PD_LEADER_SESSION      = "PDLeaderSession"
	DataKVDir              = "DataKV"
)
View Source
// ETCD_LOCK_NAMESPACE is the fixed string "zanredisdb".
// NOTE(review): by its name it namespaces etcd lock keys; confirm at the use site.
const ETCD_LOCK_NAMESPACE = "zanredisdb"
View Source
// MAX_WRITE_RETRY is the untyped constant 10.
// NOTE(review): by its name it caps write retry attempts; confirm at the use site.
const MAX_WRITE_RETRY = 10
View Source
// RETRY_SLEEP is the untyped constant 200.
// NOTE(review): the unit is not visible here (presumably milliseconds); confirm at the use site.
const RETRY_SLEEP = 200

Variables

View Source
// Predefined coordinator errors. Each value is built by NewCoordErr or
// NewCoordErrWithCode from a message, a CoordErrType and, for the latter, an
// ErrRPCRetCode.
var (
	ErrNamespaceInfoNotFound          = NewCoordErr("namespace info not found", CoordClusterErr)
	ErrNamespaceCoordTmpConflicted    = NewCoordErrWithCode("namespace coordinator is conflicted temporally", CoordClusterErr, RpcErrNamespaceCoordConflicted)
	ErrMissingNamespaceCoord          = NewCoordErrWithCode("missing namespace coordinator", CoordClusterErr, RpcErrMissingNamespaceCoord)
	ErrNamespaceCoordStateInvalid     = NewCoordErrWithCode("invalid coordinator state", CoordClusterErr, RpcErrNamespaceCoordStateInvalid)
	ErrClusterChanged                 = NewCoordErrWithCode("cluster changed ", CoordTmpErr, RpcNoErr)
	ErrNamespaceCatchupAlreadyRunning = NewCoordErrWithCode("already try joining the raft group", CoordTmpErr, RpcNoErr)
	ErrCatchupRunningBusy             = NewCoordErrWithCode("too much namespace node joining the raft group", CoordTmpErr, RpcNoErr)
	// ErrNamespaceExiting has its own message; it used to carry the
	// ErrCatchupRunningBusy text, which made the two indistinguishable in logs.
	ErrNamespaceExiting               = NewCoordErrWithCode("namespace is exiting", CoordLocalErr, RpcNoErr)
	ErrEpochLessThanCurrent           = NewCoordErrWithCode("epoch should be increased", CoordClusterErr, RpcNoErr)
	ErrLocalInitNamespaceFailed       = NewCoordErrWithCode("init local namespace failed", CoordLocalErr, RpcNoErr)
	ErrNamespaceNotCreated            = NewCoordErrWithCode("namespace not created", CoordLocalErr, RpcNoErr)
	ErrNamespaceConfInvalid           = NewCoordErrWithCode("namespace config is invalid", CoordClusterErr, RpcNoErr)
	ErrNamespaceWaitingSync           = NewCoordErrWithCode("namespace is still waiting sync", CoordTmpErr, RpcNoErr)
	ErrRegisterServiceUnstable        = NewCoordErr("the register service is unstable", CoordTmpErr)
	ErrNoCoordRegister                = NewCoordErr("pd coordinator register is not set", CoordLocalErr)
)
View Source
// Sentinel errors created with errors.New, plus the DCInfoTag string.
var (
	ErrKeyAlreadyExist           = errors.New("Key already exist")
	ErrKeyNotFound               = errors.New("Key not found")
	ErrLearnerRoleInvalidChanged = errors.New("node learner role should never be changed")
	ErrLearnerRoleUnsupported    = errors.New("node learner role is not supported")
	// DCInfoTag is the string "dc_info".
	// NOTE(review): presumably a key into a Tags map; confirm at the use site.
	DCInfoTag                    = "dc_info"
)
View Source
// EtcdTTL is a package-level int variable initialised to 30.
// NOTE(review): by its name it is a TTL used with etcd, presumably in seconds; confirm at the use site.
var EtcdTTL = 30

Functions

func AddCounter

func AddCounter(name string) uint32

func CoordLog

func CoordLog() *common.LevelLogger

func ExtractNodeInfoFromID

func ExtractNodeInfoFromID(nid string) (ip string, rpc string, redis string, http string)

func ExtractRegIDFromGenID

func ExtractRegIDFromGenID(nid string) uint64

func FilterList

func FilterList(l []string, filter []string) []string

func FindSlice

func FindSlice(in []string, e string) int

func GenNodeID

func GenNodeID(n *NodeInfo, extra string) string

func IncCounter

func IncCounter(id uint32)

func IncCounterBy

func IncCounterBy(id uint32, amount uint64)

func IsEtcdNodeExist

func IsEtcdNodeExist(err error) bool

func IsEtcdNotFile

func IsEtcdNotFile(err error) bool

func IsEtcdNotReachable added in v0.4.3

func IsEtcdNotReachable(err error) bool

func IsEtcdWatchExpired added in v0.4.3

func IsEtcdWatchExpired(err error) bool

func MergeList

func MergeList(l []string, r []string) []string

func SetLogLevel

func SetLogLevel(level int)

func SetLogger

func SetLogger(level int32, logger common.Logger)

Types

type ClusterMetaInfo

// ClusterMetaInfo holds two cluster-wide counters.
// NOTE(review): presumably the largest allocated group ID and register ID; confirm.
type ClusterMetaInfo struct {
	MaxGID   int64
	MaxRegID uint64
}

type CommonCoordErr

// CommonCoordErr embeds CoordErr. The package declares an Error() string
// method on *CommonCoordErr, so it can be used as a Go error value.
type CommonCoordErr struct {
	CoordErr
}

func (*CommonCoordErr) Error

func (self *CommonCoordErr) Error() string

type ConsistentStore

// ConsistentStore is a string key/value store abstraction with write, read
// and list operations.
type ConsistentStore interface {
	// WriteKey stores value under key.
	WriteKey(key, value string) error
	// ReadKey returns the value stored under key.
	ReadKey(key string) (string, error)
	// ListKey returns a list of strings for key.
	// NOTE(review): presumably the child keys under key; confirm with an implementation.
	ListKey(key string) ([]string, error)
}

type CoordErr

// CoordErr describes a coordinator error as a message, an RPC return code
// and an error category.
type CoordErr struct {
	ErrMsg  string
	ErrCode ErrRPCRetCode
	ErrType CoordErrType
}

Note: because gorpc treats the error type specially, CoordErr must not implement the error interface when it is used as a response type.

func NewCoordErr

func NewCoordErr(msg string, etype CoordErrType) *CoordErr

func NewCoordErrWithCode

func NewCoordErrWithCode(msg string, etype CoordErrType, code ErrRPCRetCode) *CoordErr

func (*CoordErr) CanRetryWrite

func (self *CoordErr) CanRetryWrite(retryTimes int) bool

func (*CoordErr) HasError

func (self *CoordErr) HasError() bool

func (*CoordErr) IsEqual

func (self *CoordErr) IsEqual(other *CoordErr) bool

func (*CoordErr) IsLocalErr

func (self *CoordErr) IsLocalErr() bool

func (*CoordErr) IsNetErr

func (self *CoordErr) IsNetErr() bool

func (*CoordErr) String

func (self *CoordErr) String() string

func (*CoordErr) ToErrorType

func (self *CoordErr) ToErrorType() error

type CoordErrStats

// CoordErrStats pairs the CoordErrStatsData counters with an embedded mutex.
// NOTE(review): the mutex presumably guards the counters; confirm in the methods.
type CoordErrStats struct {
	sync.Mutex
	CoordErrStatsData
}

func (*CoordErrStats) GetCopy

func (self *CoordErrStats) GetCopy() *CoordErrStatsData

type CoordErrStatsData

// CoordErrStatsData holds coordinator error counters: two dedicated counters
// and a map of further counters keyed by string.
type CoordErrStatsData struct {
	NamespaceCoordMissingError int64
	LocalErr                   int64
	// OtherCoordErrs counts other errors per string key.
	// NOTE(review): the key is presumably the error message; confirm.
	OtherCoordErrs             map[string]int64
}

type CoordErrType

// CoordErrType is the category of a CoordErr.
type CoordErrType int

// CoordErrType values, numbered from 0 by iota.
const (
	CoordNoErr CoordErrType = iota // 0
	CoordCommonErr                 // 1
	CoordNetErr                    // 2
	CoordClusterErr                // 3
	CoordLocalErr                  // 4
	CoordLocalTmpErr               // 5
	CoordTmpErr                    // 6
	CoordClusterNoRetryWriteErr    // 7
	CoordRegisterErr               // 8
)

type CoordStats

// CoordStats bundles the error counters with a list of per-namespace
// coordinator stats.
type CoordStats struct {
	// ErrStats has no JSON tag, so it is encoded under its field name.
	ErrStats     CoordErrStatsData
	NsCoordStats []NamespaceCoordStat `json:"ns_coord_stats"`
}

type DNEtcdRegister

// DNEtcdRegister embeds *EtcdRegister and a mutex. Its declared methods
// match the ones DataNodeRegister adds on top of Register.
// NOTE(review): it appears to be the etcd-backed DataNodeRegister
// implementation; confirm.
type DNEtcdRegister struct {
	*EtcdRegister
	sync.Mutex
	// contains filtered or unexported fields
}

func NewDNEtcdRegister

func NewDNEtcdRegister(host string) (*DNEtcdRegister, error)

func (*DNEtcdRegister) GetNamespaceLeader

func (etcdReg *DNEtcdRegister) GetNamespaceLeader(ns string, partition int) (string, EpochType, error)

func (*DNEtcdRegister) GetNodeInfo

func (etcdReg *DNEtcdRegister) GetNodeInfo(nid string) (NodeInfo, error)

func (*DNEtcdRegister) NewRegisterNodeID

func (etcdReg *DNEtcdRegister) NewRegisterNodeID() (uint64, error)

func (*DNEtcdRegister) Register

func (etcdReg *DNEtcdRegister) Register(nodeData *NodeInfo) error

func (*DNEtcdRegister) Unregister

func (etcdReg *DNEtcdRegister) Unregister(nodeData *NodeInfo) error

func (*DNEtcdRegister) UpdateNamespaceLeader

func (etcdReg *DNEtcdRegister) UpdateNamespaceLeader(ns string, partition int, rl RealLeader, oldGen EpochType) (EpochType, error)

func (*DNEtcdRegister) WatchPDLeader

func (etcdReg *DNEtcdRegister) WatchPDLeader(leader chan *NodeInfo, stop chan struct{}) error

type DataNodeRegister

// DataNodeRegister extends Register with node registration, PD leader
// watching, node lookup and namespace-leader bookkeeping.
type DataNodeRegister interface {
	Register
	// check the learner role before register, should never change the role
	Register(nodeData *NodeInfo) error // update
	Unregister(nodeData *NodeInfo) error
	// get the newest pd leader and watch the change of it.
	WatchPDLeader(leader chan *NodeInfo, stop chan struct{}) error
	GetNodeInfo(nid string) (NodeInfo, error)
	// while losing leader, update to empty nid
	// while became the new leader, update to my node
	UpdateNamespaceLeader(ns string, partition int, rl RealLeader, oldGen EpochType) (EpochType, error)
	GetNamespaceLeader(ns string, partition int) (string, EpochType, error)
	NewRegisterNodeID() (uint64, error)
}

type EVENT_TYPE added in v0.4.3

// EVENT_TYPE is the kind of a MasterEvent.
type EVENT_TYPE int

// EVENT_TYPE values, numbered from 0 by iota.
const (
	MASTER_ADD EVENT_TYPE = iota // 0
	MASTER_DELETE                // 1
	MASTER_MODIFY                // 2
	MASTER_ERROR                 // 3
)

type EpochType

// EpochType is the int64 type used for epoch/generation values throughout
// the register APIs (for example the oldGen parameters and Epoch methods).
type EpochType int64

type ErrRPCRetCode

// ErrRPCRetCode is the RPC return code carried in CoordErr.ErrCode.
type ErrRPCRetCode int

// Generic return codes, numbered from 0.
const (
	RpcNoErr ErrRPCRetCode = iota // 0
	RpcCommonErr                  // 1
)

// Specific return codes. iota restarts at 0 in this block, so the +10 offset
// makes the values run from 10 to 18.
const (
	RpcErrLeavingISRWait ErrRPCRetCode = iota + 10 // 10
	RpcErrNoLeader                                 // 11
	RpcErrLeaderSessionMismatch                    // 12
	RpcErrNamespaceNotExist                        // 13
	RpcErrMissingNamespaceCoord                    // 14
	RpcErrNamespaceCoordConflicted                 // 15
	RpcErrNamespaceCoordStateInvalid               // 16
	RpcErrNamespaceLoading                         // 17
	RpcErrWriteOnNonISR                            // 18
)

type EtcdClient added in v0.4.3

// EtcdClient is an opaque etcd client handle created with NewEClient. Its
// methods (Get, Set, Create, CompareAndSwap, Watch, ...) return the etcd
// client package's Response and Watcher types.
type EtcdClient struct {
	// contains filtered or unexported fields
}

func NewEClient added in v0.4.3

func NewEClient(host string) (*EtcdClient, error)

func (*EtcdClient) CompareAndDelete added in v0.4.3

func (self *EtcdClient) CompareAndDelete(key string, prevValue string, prevIndex uint64) (*client.Response, error)

func (*EtcdClient) CompareAndSwap added in v0.4.3

func (self *EtcdClient) CompareAndSwap(key string, value string, ttl uint64, prevValue string, prevIndex uint64) (*client.Response, error)

func (*EtcdClient) Create added in v0.4.3

func (self *EtcdClient) Create(key string, value string, ttl uint64) (*client.Response, error)

func (*EtcdClient) CreateDir added in v0.4.3

func (self *EtcdClient) CreateDir(key string, ttl uint64) (*client.Response, error)

func (*EtcdClient) CreateInOrder added in v0.4.3

func (self *EtcdClient) CreateInOrder(dir string, value string, ttl uint64) (*client.Response, error)

func (*EtcdClient) Delete added in v0.4.3

func (self *EtcdClient) Delete(key string, recursive bool) (*client.Response, error)

func (*EtcdClient) Get added in v0.4.3

func (self *EtcdClient) Get(key string, sort, recursive bool) (*client.Response, error)

func (*EtcdClient) GetNewest added in v0.5.2

func (self *EtcdClient) GetNewest(key string, sort, recursive bool) (*client.Response, error)

func (*EtcdClient) Set added in v0.4.3

func (self *EtcdClient) Set(key string, value string, ttl uint64) (*client.Response, error)

func (*EtcdClient) SetWithTTL added in v0.4.3

func (self *EtcdClient) SetWithTTL(key string, ttl uint64) (*client.Response, error)

func (*EtcdClient) Update added in v0.4.3

func (self *EtcdClient) Update(key string, value string, ttl uint64) (*client.Response, error)

func (*EtcdClient) Watch added in v0.4.3

func (self *EtcdClient) Watch(key string, waitIndex uint64, recursive bool) client.Watcher

type EtcdLock added in v0.4.3

// EtcdLock embeds a mutex and keeps its state unexported. Its method set
// (Start, Stop, GetEventsChan, GetKey, GetMaster) matches the Master
// interface.
// NOTE(review): presumably the concrete type returned by NewMaster; confirm.
type EtcdLock struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func (*EtcdLock) GetEventsChan added in v0.4.3

func (self *EtcdLock) GetEventsChan() <-chan *MasterEvent

func (*EtcdLock) GetKey added in v0.4.3

func (self *EtcdLock) GetKey() string

func (*EtcdLock) GetMaster added in v0.4.3

func (self *EtcdLock) GetMaster() string

func (*EtcdLock) Start added in v0.4.3

func (self *EtcdLock) Start()

func (*EtcdLock) Stop added in v0.4.3

func (self *EtcdLock) Stop()

type EtcdRegister

// EtcdRegister is an opaque register handle created with NewEtcdRegister.
// Its declared methods match the Register interface, and it is embedded by
// DNEtcdRegister and PDEtcdRegister.
type EtcdRegister struct {
	// contains filtered or unexported fields
}

func NewEtcdRegister

func NewEtcdRegister(host string) (*EtcdRegister, error)

func (*EtcdRegister) GetAllNamespaces

func (etcdReg *EtcdRegister) GetAllNamespaces() (map[string]map[int]PartitionMetaInfo, EpochType, error)

func (*EtcdRegister) GetAllPDNodes

func (etcdReg *EtcdRegister) GetAllPDNodes() ([]NodeInfo, error)

func (*EtcdRegister) GetKV added in v0.9.0

func (etcdReg *EtcdRegister) GetKV(key string) (string, error)

func (*EtcdRegister) GetNamespaceInfo

func (etcdReg *EtcdRegister) GetNamespaceInfo(ns string) ([]PartitionMetaInfo, error)

func (*EtcdRegister) GetNamespaceMetaInfo

func (etcdReg *EtcdRegister) GetNamespaceMetaInfo(ns string) (NamespaceMetaInfo, error)

func (*EtcdRegister) GetNamespacePartInfo

func (etcdReg *EtcdRegister) GetNamespacePartInfo(ns string, partition int) (*PartitionMetaInfo, error)

func (*EtcdRegister) GetNamespaceSchemas

func (etcdReg *EtcdRegister) GetNamespaceSchemas(ns string) (map[string]SchemaInfo, error)

func (*EtcdRegister) GetNamespaceTableSchema

func (etcdReg *EtcdRegister) GetNamespaceTableSchema(ns string, table string) (*SchemaInfo, error)

func (*EtcdRegister) GetNamespacesNotifyChan

func (etcdReg *EtcdRegister) GetNamespacesNotifyChan() chan struct{}

func (*EtcdRegister) GetRemoteNamespaceReplicaInfo

func (etcdReg *EtcdRegister) GetRemoteNamespaceReplicaInfo(ns string, partition int) (*PartitionReplicaInfo, error)

func (*EtcdRegister) InitClusterID

func (etcdReg *EtcdRegister) InitClusterID(id string)

func (*EtcdRegister) SaveKV added in v0.9.0

func (etcdReg *EtcdRegister) SaveKV(key string, value string) error

func (*EtcdRegister) Start

func (etcdReg *EtcdRegister) Start()

func (*EtcdRegister) Stop

func (etcdReg *EtcdRegister) Stop()

type ISRStat

// ISRStat identifies one node by host name and node ID; it is the element
// type of NamespaceCoordStat.ISRStats.
type ISRStat struct {
	HostName string `json:"hostname"`
	NodeID   string `json:"node_id"`
}

type Master added in v0.4.3

// Master is the handle returned by NewMaster. It can be started and stopped,
// exposes a receive-only channel of MasterEvent values, and reports its key
// and the current master as strings.
type Master interface {
	Start()
	Stop()
	GetEventsChan() <-chan *MasterEvent
	GetKey() string
	GetMaster() string
}

func NewMaster added in v0.4.3

func NewMaster(etcdClient *EtcdClient, name, value string, ttl uint64) Master

type MasterChanInfo

// MasterChanInfo is an opaque type: all of its fields are unexported and no
// exported methods are declared for it.
type MasterChanInfo struct {
	// contains filtered or unexported fields
}

type MasterEvent added in v0.4.3

// MasterEvent is the value delivered on the channel returned by
// Master.GetEventsChan.
type MasterEvent struct {
	// Type is the kind of event (MASTER_ADD, MASTER_DELETE, MASTER_MODIFY or MASTER_ERROR).
	Type          EVENT_TYPE
	Master        string
	// NOTE(review): presumably the etcd modified index of the watched key; confirm.
	ModifiedIndex uint64
}

type NamespaceCoordStat

// NamespaceCoordStat is one coordinator stat entry, identified by node,
// name and partition and carrying a list of ISRStat values. It is the
// element type of CoordStats.NsCoordStats.
type NamespaceCoordStat struct {
	Node      string    `json:"node"`
	Name      string    `json:"name"`
	Partition int       `json:"partition"`
	ISRStats  []ISRStat `json:"isr_stats"`
}

type NamespaceMetaInfo

// NamespaceMetaInfo is the namespace-level metadata. DeepClone returns a
// copy and MetaEpoch returns an EpochType for it.
type NamespaceMetaInfo struct {
	PartitionNum int
	Replica      int
	// to verify the data of the create -> delete -> create with same namespace
	MagicCode int64
	MinGID    int64

	EngType          string
	OptimizedFsync   bool
	SnapCount        int
	Tags             map[string]interface{}
	ExpirationPolicy string
	DataVersion      string
	// contains filtered or unexported fields
}

func (*NamespaceMetaInfo) DeepClone added in v0.4.3

func (self *NamespaceMetaInfo) DeepClone() NamespaceMetaInfo

func (*NamespaceMetaInfo) MetaEpoch

func (self *NamespaceMetaInfo) MetaEpoch() EpochType

type NamespaceNameInfo

// NamespaceNameInfo pairs a namespace name with a partition number. A
// String method is declared on *NamespaceNameInfo.
type NamespaceNameInfo struct {
	NamespaceName      string
	NamespacePartition int
}

func (*NamespaceNameInfo) String

func (self *NamespaceNameInfo) String() string

type NodeInfo

// NodeInfo describes one node: its register and string IDs, addresses and
// ports, version, tags and data locations. Epoch, GetID and GetRegisterID
// are declared as accessors on *NodeInfo.
type NodeInfo struct {
	RegID             uint64
	ID                string
	NodeIP            string
	Hostname          string
	// Ports are kept as strings, not integers.
	RedisPort         string
	HttpPort          string
	RpcPort           string
	RaftTransportAddr string
	Version           string
	Tags              map[string]interface{}
	DataRoot          string
	RsyncModule       string
	// NOTE(review): the package has learner-role errors saying the role must
	// never change and may be unsupported; the valid values are not visible here.
	LearnerRole       string
	// contains filtered or unexported fields
}

func (*NodeInfo) Epoch

func (self *NodeInfo) Epoch() EpochType

func (*NodeInfo) GetID

func (self *NodeInfo) GetID() string

func (*NodeInfo) GetRegisterID

func (self *NodeInfo) GetRegisterID() uint64

type Options

// Options is a set of configuration values.
// NOTE(review): going by the names these configure automatic balancing and
// migration, a data directory and a namespace filter; the units of
// BalanceStart/BalanceEnd and the format of FilterNamespaces are not visible
// here and need confirming.
type Options struct {
	AutoBalanceAndMigrate bool
	BalanceStart          int
	BalanceEnd            int
	BalanceVer            string
	DataDir               string
	FilterNamespaces      string
}

type PDEtcdRegister

// PDEtcdRegister embeds *EtcdRegister. Its declared methods match the ones
// PDRegister adds on top of Register, and it also declares CheckIfLeader.
type PDEtcdRegister struct {
	*EtcdRegister
	// contains filtered or unexported fields
}

placement driver register

func NewPDEtcdRegister

func NewPDEtcdRegister(host string) (*PDEtcdRegister, error)

func (*PDEtcdRegister) AcquireAndWatchLeader

func (etcdReg *PDEtcdRegister) AcquireAndWatchLeader(leader chan *NodeInfo, stop chan struct{})

func (*PDEtcdRegister) CheckIfLeader

func (etcdReg *PDEtcdRegister) CheckIfLeader() bool

func (*PDEtcdRegister) CreateNamespace

func (etcdReg *PDEtcdRegister) CreateNamespace(ns string, meta *NamespaceMetaInfo) error

func (*PDEtcdRegister) CreateNamespacePartition

func (etcdReg *PDEtcdRegister) CreateNamespacePartition(ns string, partition int) error

func (*PDEtcdRegister) DeleteNamespacePart

func (etcdReg *PDEtcdRegister) DeleteNamespacePart(ns string, partition int) error

func (*PDEtcdRegister) DeleteWholeNamespace

func (etcdReg *PDEtcdRegister) DeleteWholeNamespace(ns string) error

func (*PDEtcdRegister) GetClusterEpoch

func (etcdReg *PDEtcdRegister) GetClusterEpoch() (EpochType, error)

func (*PDEtcdRegister) GetClusterMetaInfo

func (etcdReg *PDEtcdRegister) GetClusterMetaInfo() (ClusterMetaInfo, error)

func (*PDEtcdRegister) GetDataNodes

func (etcdReg *PDEtcdRegister) GetDataNodes() ([]NodeInfo, error)

func (*PDEtcdRegister) IsExistNamespace

func (etcdReg *PDEtcdRegister) IsExistNamespace(ns string) (bool, error)

func (*PDEtcdRegister) IsExistNamespacePartition

func (etcdReg *PDEtcdRegister) IsExistNamespacePartition(ns string, partitionNum int) (bool, error)

func (*PDEtcdRegister) PrepareNamespaceMinGID

func (etcdReg *PDEtcdRegister) PrepareNamespaceMinGID() (int64, error)

func (*PDEtcdRegister) Register

func (etcdReg *PDEtcdRegister) Register(value *NodeInfo) error

func (*PDEtcdRegister) Unregister

func (etcdReg *PDEtcdRegister) Unregister(value *NodeInfo) error

func (*PDEtcdRegister) UpdateNamespaceMetaInfo

func (etcdReg *PDEtcdRegister) UpdateNamespaceMetaInfo(ns string, meta *NamespaceMetaInfo, oldGen EpochType) error

func (*PDEtcdRegister) UpdateNamespacePartReplicaInfo

func (etcdReg *PDEtcdRegister) UpdateNamespacePartReplicaInfo(ns string, partition int,
	replicaInfo *PartitionReplicaInfo, oldGen EpochType) error

func (*PDEtcdRegister) UpdateNamespaceSchema

func (etcdReg *PDEtcdRegister) UpdateNamespaceSchema(ns string, table string, schema *SchemaInfo) error

func (*PDEtcdRegister) WatchDataNodes

func (etcdReg *PDEtcdRegister) WatchDataNodes(dataNodesChan chan []NodeInfo, stop chan struct{})

type PDRegister

// PDRegister extends Register with node registration, leader acquisition,
// data-node watching and namespace/partition/schema management.
type PDRegister interface {
	Register
	Register(nodeData *NodeInfo) error // update
	Unregister(nodeData *NodeInfo) error
	// the cluster root modify index
	GetClusterEpoch() (EpochType, error)
	GetClusterMetaInfo() (ClusterMetaInfo, error)
	AcquireAndWatchLeader(leader chan *NodeInfo, stop chan struct{})

	GetDataNodes() ([]NodeInfo, error)
	// watching the cluster data node, should return the newest for the first time.
	WatchDataNodes(nodeC chan []NodeInfo, stopC chan struct{})
	// create and write the meta info to meta node
	CreateNamespace(ns string, meta *NamespaceMetaInfo) error
	UpdateNamespaceMetaInfo(ns string, meta *NamespaceMetaInfo, oldGen EpochType) error
	// create partition path
	CreateNamespacePartition(ns string, partition int) error
	IsExistNamespace(ns string) (bool, error)
	IsExistNamespacePartition(ns string, partition int) (bool, error)
	DeleteNamespacePart(ns string, partition int) error
	DeleteWholeNamespace(ns string) error
	//
	// update the replica info about replica node list, epoch for partition
	// Note: update should do check-and-set to avoid unexpected override.
	// the epoch in replicaInfo should be updated to the new epoch
	// if no partition, replica info node should create only once.
	UpdateNamespacePartReplicaInfo(ns string, partition int, replicaInfo *PartitionReplicaInfo, oldGen EpochType) error
	PrepareNamespaceMinGID() (int64, error)
	UpdateNamespaceSchema(ns string, table string, schema *SchemaInfo) error
}

We need to check leadership before making any modification to etcd. All returned values should be copies, so that they cannot be modified from outside.

type PartitionMetaInfo

// PartitionMetaInfo is the full description of one namespace partition: its
// name and partition number plus the embedded namespace metadata and
// partition replica info.
type PartitionMetaInfo struct {
	Name      string
	Partition int

	// Both types are embedded, so their fields and methods are promoted.
	NamespaceMetaInfo
	PartitionReplicaInfo
	// contains filtered or unexported fields
}

func (*PartitionMetaInfo) GetCopy

func (self *PartitionMetaInfo) GetCopy() *PartitionMetaInfo

func (*PartitionMetaInfo) GetDesp

func (self *PartitionMetaInfo) GetDesp() string

func (*PartitionMetaInfo) GetRealLeader

func (self *PartitionMetaInfo) GetRealLeader() string

func (*PartitionMetaInfo) IsISRQuorum

func (self *PartitionMetaInfo) IsISRQuorum() bool

type PartitionReplicaInfo

// PartitionReplicaInfo is the replica-level state of a partition. DeepClone,
// Epoch, GetISR and IsLearner are declared on *PartitionReplicaInfo.
type PartitionReplicaInfo struct {
	RaftNodes    []string
	// NOTE(review): map keys are presumably node IDs; confirm.
	RaftIDs      map[string]uint64
	Removings    map[string]RemovingInfo
	MaxRaftID    int64
	// NOTE(review): presumably learner role -> node IDs; confirm.
	LearnerNodes map[string][]string
	// contains filtered or unexported fields
}

func (*PartitionReplicaInfo) DeepClone

func (self *PartitionReplicaInfo) DeepClone() PartitionReplicaInfo

func (*PartitionReplicaInfo) Epoch

func (self *PartitionReplicaInfo) Epoch() EpochType

func (*PartitionReplicaInfo) GetISR

func (self *PartitionReplicaInfo) GetISR() []string

func (*PartitionReplicaInfo) IsLearner

func (self *PartitionReplicaInfo) IsLearner(nid string) bool

type RealLeader

// RealLeader carries a leader identifier string; it is the value passed to
// UpdateNamespaceLeader.
type RealLeader struct {
	Leader string
	// contains filtered or unexported fields
}

type Register

// Register is the base register interface, embedded by both DataNodeRegister
// and PDRegister. It covers lifecycle (InitClusterID, Start, Stop),
// read access to PD nodes, namespaces, partitions and schemas, and a small
// key/value store.
type Register interface {
	InitClusterID(id string)
	Start()
	// all registered pd nodes.
	GetAllPDNodes() ([]NodeInfo, error)
	// should return both the meta info for namespace and the replica info for partition
	// epoch should be updated while return
	GetNamespacePartInfo(ns string, partition int) (*PartitionMetaInfo, error)
	// get directly from register without cache
	GetRemoteNamespaceReplicaInfo(ns string, partition int) (*PartitionReplicaInfo, error)
	// get meta info only
	GetNamespaceMetaInfo(ns string) (NamespaceMetaInfo, error)
	GetNamespaceInfo(ns string) ([]PartitionMetaInfo, error)
	GetAllNamespaces() (map[string]map[int]PartitionMetaInfo, EpochType, error)
	GetNamespacesNotifyChan() chan struct{}
	GetNamespaceSchemas(ns string) (map[string]SchemaInfo, error)
	GetNamespaceTableSchema(ns string, table string) (*SchemaInfo, error)
	// the saved key should have the node info prefix to avoid conflict with each other data node
	// if it is designed to shared between data node, should use carefully with concurrent modify
	// note: the data key will be under the cluster root data path
	SaveKV(key string, value string) error
	GetKV(key string) (string, error)
	Stop()
}

type RemovingInfo

// RemovingInfo records a removal time and the replica ID being removed; it
// is the value type of PartitionReplicaInfo.Removings.
type RemovingInfo struct {
	// NOTE(review): the time unit/epoch of RemoveTime is not visible here; confirm.
	RemoveTime      int64
	RemoveReplicaID uint64
}

type SchemaInfo

// SchemaInfo is a raw schema payload together with its epoch; it is the
// type read and written by the namespace schema methods.
type SchemaInfo struct {
	Schema []byte
	Epoch  EpochType
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL