Documentation ¶
Index ¶
- Constants
- Variables
- type ChunkRolePosition
- type Cluster
- type ClusterConfig
- type ClusterStore
- func (cluster *ClusterStore) CommitMigration(taskSlots SlotRange) error
- func (cluster *ClusterStore) Decode(data []byte) error
- func (cluster *ClusterStore) Encode() ([]byte, error)
- func (cluster *ClusterStore) FindChunkByProxy(proxyAddress string) (*NodeChunkStore, error)
- func (cluster *ClusterStore) HasEmptyChunks() bool
- func (cluster *ClusterStore) LimitMigration(migrationLimit int64) (*ClusterStore, error)
- func (cluster *ClusterStore) SplitSlots(newEpoch uint64) error
- type EtcdConfig
- type EtcdMetaBroker
- func (broker *EtcdMetaBroker) AddFailure(ctx context.Context, address string, reportID string) error
- func (broker *EtcdMetaBroker) ClearCache()
- func (broker *EtcdMetaBroker) GetCluster(ctx context.Context, name string) (*Cluster, error)
- func (broker *EtcdMetaBroker) GetClusterNames(ctx context.Context) ([]string, error)
- func (broker *EtcdMetaBroker) GetConfig() *EtcdConfig
- func (broker *EtcdMetaBroker) GetFailures(ctx context.Context) ([]string, error)
- func (broker *EtcdMetaBroker) GetProxy(ctx context.Context, address string) (*Host, error)
- func (broker *EtcdMetaBroker) GetProxyAddresses(ctx context.Context) ([]string, error)
- func (broker *EtcdMetaBroker) Serve(ctx context.Context) error
- type EtcdMetaManipulationBroker
- func NewEtcdMetaManipulationBroker(config *EtcdConfig, client *clientv3.Client, metaDataBroker *EtcdMetaBroker) (*EtcdMetaManipulationBroker, error)
- func NewEtcdMetaManipulationBrokerFromEndpoints(config *EtcdConfig, endpoints []string, metaDataBroker *EtcdMetaBroker) (*EtcdMetaManipulationBroker, error)
- func (broker *EtcdMetaManipulationBroker) AddNodesToCluster(ctx context.Context, clusterName string) error
- func (broker *EtcdMetaManipulationBroker) AddProxy(ctx context.Context, address string, nodes []string) error
- func (broker *EtcdMetaManipulationBroker) CommitMigration(ctx context.Context, task MigrationTaskMeta) error
- func (broker *EtcdMetaManipulationBroker) CreateCluster(ctx context.Context, clusterName string, nodeNum uint64) error
- func (broker *EtcdMetaManipulationBroker) GetMetaBroker() *EtcdMetaBroker
- func (broker *EtcdMetaManipulationBroker) InitGlobalEpoch() error
- func (broker *EtcdMetaManipulationBroker) MigrateSlots(ctx context.Context, clusterName string) error
- func (broker *EtcdMetaManipulationBroker) RemoveCluster(ctx context.Context, clusterName string) error
- func (broker *EtcdMetaManipulationBroker) RemoveProxy(ctx context.Context, address string) error
- func (broker *EtcdMetaManipulationBroker) RemoveUnusedProxiesFromCluster(ctx context.Context, clusterName string) error
- func (broker *EtcdMetaManipulationBroker) ReplaceProxy(ctx context.Context, address string) (*Host, error)
- func (broker *EtcdMetaManipulationBroker) SetConfig(ctx context.Context, clusterName string, config map[string]string) error
- type FailedProxyStore
- type Host
- type MetaDataBroker
- type MetaManipulationBroker
- type MigrationMeta
- type MigrationMetaStore
- type MigrationTagType
- type MigrationTaskMeta
- type Node
- type NodeChunkStore
- type NodeStore
- type PeerProxy
- type ProxyStore
- type ReplMeta
- type ReplPeer
- type Role
- type SlotRange
- type SlotRangeStore
- type SlotRangeTag
- type SlotRangeTagStore
- type TxnBroker
Constants ¶
const ( // CompressionStrategyDisabled disables compression CompressionStrategyDisabled = "disabled" // CompressionStrategySetGetOnly enables compression and restrict the commands // to only SET family and GET for string data type. CompressionStrategySetGetOnly = "set_get_only" // CompressionStrategyAllowAll enables compression and allow all the commands, // even though they will get the wrong results. CompressionStrategyAllowAll = "allow_all" )
const MasterRole = "master"
MasterRole represents master node.
const MaxClusterNameLen = 30
MaxClusterNameLen is the maximum length of cluter name
const MaxSlotNumber = 16384
MaxSlotNumber is specified by Redis Cluster
const ReplicaRole = "replica"
ReplicaRole represents replcia node.
Variables ¶
var ErrAllocatedProxyInUse = errors.New("allocated proxy is in use")
ErrAllocatedProxyInUse tells the client to try again.
var ErrAlreadyMigrating = errors.New("cluster is migrating")
ErrAlreadyMigrating indicates the cluster has already started migration.
var ErrCanNotMigrate = errors.New("cluster cannot migrate")
ErrCanNotMigrate indicates that the cluster does not have enough empty chunks to do the migration.
var ErrClusterExists = errors.New("cluster already exists")
ErrClusterExists indicates the cluster has already existed.
var ErrClusterNotFound = errors.New("cluster not found")
ErrClusterNotFound indicates that cluster does not exist.
var ErrEmptyChunksExist = errors.New("there're empty chunks")
ErrEmptyChunksExist indicates that the cluster still need to do the migration for the remaining empty chunks before adding more nodes.
var ErrGlobalEpochNotFound = errors.New("global epoch not found")
ErrGlobalEpochNotFound indicates that key does not exist.
var ErrHostExists = errors.New("host already existed")
ErrHostExists indicates the proxy has already existed.
var ErrInvalidAddress = errors.New("invalid address")
ErrInvalidAddress indicates that the address is empty.
var ErrInvalidClusterConfig = errors.New("invalid cluster config")
ErrInvalidClusterConfig indicates the config is invalid
var ErrInvalidClusterName = errors.New("invalid cluster name. Length should not be longer than 30. Should only contains letters, digit, and '.' '-' '_'")
ErrInvalidClusterName indicates the cluster name is not valid.
var ErrInvalidNodesNum = errors.New("invalid node number")
ErrInvalidNodesNum indicates invalid node number.
var ErrInvalidRequestedMigrationSlotRange = errors.New("invalid requested migration task")
ErrInvalidRequestedMigrationSlotRange indicates the request is not valid
var ErrInvalidRequestedNodesNum = errors.New("invalid node number")
ErrInvalidRequestedNodesNum indicates invalid node number.
var ErrInvalidRequestedProxyNum = errors.New("invalid requested proxy number")
ErrInvalidRequestedProxyNum indicates that the remaining resources can't produce enough chunks.
var ErrMigrationTaskNotFound = errors.New("migration task not found")
ErrMigrationTaskNotFound indicates we can't find the slot range.
var ErrMigrationTaskNotMatch = errors.New("migration task not found")
ErrMigrationTaskNotMatch indicates we can't find a slot range matches the task.
var ErrNoAvailableNodes = errors.New("no available nodes to start migration")
ErrNoAvailableNodes indicates no nodes with empty slots found.
var ErrNoAvailableResource = errors.New("no available resource")
ErrNoAvailableResource indicates no available resource.
var ErrNoChunkResource = errors.New("no chunk resource")
ErrNoChunkResource indicates that the remaining resources can't produce enough chunks.
var ErrProxyInUse = errors.New("proxy is in use")
ErrProxyInUse indicates that key does not exist.
var ErrProxyNotFound = errors.New("proxy not found")
ErrProxyNotFound indicates that proxy does not exist.
var ErrProxyNotInUse = errors.New("proxy not in use")
ErrProxyNotInUse indicates that key does not exist.
var ErrTryAgain = errors.New("try again")
ErrTryAgain indicates the cache is stall and client need to try again.
var ErrTxnFailed = errors.New("txn failed")
ErrTxnFailed indicates the transaction failed.
Functions ¶
This section is empty.
Types ¶
type ChunkRolePosition ¶
type ChunkRolePosition int
ChunkRolePosition indicates the roles in the chunk
const ( // ChunkRoleNormalPosition indicates each proxy has one master. ChunkRoleNormalPosition ChunkRolePosition = 0 // ChunkRoleFirstChunkMaster indicates all the masters are in the first proxy. ChunkRoleFirstChunkMaster ChunkRolePosition = 1 // ChunkRoleSecondChunkMaster indicates all the masters are in the second proxy. ChunkRoleSecondChunkMaster ChunkRolePosition = 2 )
type Cluster ¶
type Cluster struct { Name string `json:"name"` Epoch uint64 `json:"epoch"` Nodes []*Node `json:"nodes"` Config ClusterConfig `json:"config"` }
Cluster is the redis cluster we implement.
type ClusterConfig ¶ added in v0.1.1
type ClusterConfig struct {
CompressionStrategy string `json:"compression_strategy"`
}
ClusterConfig is the config of each cluster.
func NewClusterConfig ¶ added in v0.1.1
func NewClusterConfig() *ClusterConfig
NewClusterConfig creates ClusterConfig with default settings.
type ClusterStore ¶
type ClusterStore struct { Chunks []*NodeChunkStore `json:"chunks"` Config *ClusterConfig `json:"cluster_config"` }
ClusterStore stores the nodes
func (*ClusterStore) CommitMigration ¶
func (cluster *ClusterStore) CommitMigration(taskSlots SlotRange) error
CommitMigration finish the slots migration.
func (*ClusterStore) Decode ¶
func (cluster *ClusterStore) Decode(data []byte) error
Decode decodes json string
func (*ClusterStore) Encode ¶
func (cluster *ClusterStore) Encode() ([]byte, error)
Encode encodes json string
func (*ClusterStore) FindChunkByProxy ¶
func (cluster *ClusterStore) FindChunkByProxy(proxyAddress string) (*NodeChunkStore, error)
FindChunkByProxy find chunk by proxy address.
func (*ClusterStore) HasEmptyChunks ¶
func (cluster *ClusterStore) HasEmptyChunks() bool
HasEmptyChunks checks whether there're still chunks that do not have slots.
func (*ClusterStore) LimitMigration ¶
func (cluster *ClusterStore) LimitMigration(migrationLimit int64) (*ClusterStore, error)
LimitMigration reduces the concurrent migration flags. This implementation is a bit tricky. The stored data do not allow some of shards are migrating while others not. They are all set with the flags. We only reduce the migration in the query API. (1) Only the former shards finish the migration, will the later ones get started. (2) And once started, since the former one will not restart migration again, the later ones will not stop until they are done. (3) The later migration flags will be updated to server proxies with new epoch bumped by the committing of former ones.
func (*ClusterStore) SplitSlots ¶
func (cluster *ClusterStore) SplitSlots(newEpoch uint64) error
SplitSlots splits the slots from the first half to the second half.
type EtcdConfig ¶
type EtcdConfig struct { PathPrefix string FailureTTL int64 MigrationLimit int64 FailureQuorum int64 }
EtcdConfig stores broker config.
type EtcdMetaBroker ¶
type EtcdMetaBroker struct {
// contains filtered or unexported fields
}
EtcdMetaBroker implements the MetaDataBroker interface and serves as a broker backend.
func NewEtcdMetaBroker ¶
func NewEtcdMetaBroker(config *EtcdConfig, client *clientv3.Client) (*EtcdMetaBroker, error)
NewEtcdMetaBroker creates EtcdMetaBroker
func NewEtcdMetaBrokerFromEndpoints ¶
func NewEtcdMetaBrokerFromEndpoints(config *EtcdConfig, endpoints []string) (*EtcdMetaBroker, error)
NewEtcdMetaBrokerFromEndpoints creates EtcdMetaBroker from endpoints
func (*EtcdMetaBroker) AddFailure ¶
func (broker *EtcdMetaBroker) AddFailure(ctx context.Context, address string, reportID string) error
AddFailure add failures reported by coordinators
func (*EtcdMetaBroker) ClearCache ¶
func (broker *EtcdMetaBroker) ClearCache()
ClearCache clears the cached metadata.
func (*EtcdMetaBroker) GetCluster ¶
GetCluster queries a cluster by name
func (*EtcdMetaBroker) GetClusterNames ¶
func (broker *EtcdMetaBroker) GetClusterNames(ctx context.Context) ([]string, error)
GetClusterNames retrieves all the cluster names from etcd.
func (*EtcdMetaBroker) GetConfig ¶ added in v0.1.4
func (broker *EtcdMetaBroker) GetConfig() *EtcdConfig
GetConfig returns EtcdConfig
func (*EtcdMetaBroker) GetFailures ¶
func (broker *EtcdMetaBroker) GetFailures(ctx context.Context) ([]string, error)
GetFailures retrieves the failures
func (*EtcdMetaBroker) GetProxyAddresses ¶
func (broker *EtcdMetaBroker) GetProxyAddresses(ctx context.Context) ([]string, error)
GetProxyAddresses queries all proxies.
type EtcdMetaManipulationBroker ¶
type EtcdMetaManipulationBroker struct {
// contains filtered or unexported fields
}
EtcdMetaManipulationBroker is mainly for metadata modification
func NewEtcdMetaManipulationBroker ¶
func NewEtcdMetaManipulationBroker(config *EtcdConfig, client *clientv3.Client, metaDataBroker *EtcdMetaBroker) (*EtcdMetaManipulationBroker, error)
NewEtcdMetaManipulationBroker creates EtcdMetaManipulationBroker
func NewEtcdMetaManipulationBrokerFromEndpoints ¶
func NewEtcdMetaManipulationBrokerFromEndpoints(config *EtcdConfig, endpoints []string, metaDataBroker *EtcdMetaBroker) (*EtcdMetaManipulationBroker, error)
NewEtcdMetaManipulationBrokerFromEndpoints creates EtcdMetaManipulationBroker from endpoints
func (*EtcdMetaManipulationBroker) AddNodesToCluster ¶
func (broker *EtcdMetaManipulationBroker) AddNodesToCluster(ctx context.Context, clusterName string) error
AddNodesToCluster adds chunks to cluster.
func (*EtcdMetaManipulationBroker) AddProxy ¶
func (broker *EtcdMetaManipulationBroker) AddProxy(ctx context.Context, address string, nodes []string) error
AddProxy adds new proxy and removes it from failed proxies
func (*EtcdMetaManipulationBroker) CommitMigration ¶
func (broker *EtcdMetaManipulationBroker) CommitMigration(ctx context.Context, task MigrationTaskMeta) error
CommitMigration finishes the migration.
func (*EtcdMetaManipulationBroker) CreateCluster ¶
func (broker *EtcdMetaManipulationBroker) CreateCluster(ctx context.Context, clusterName string, nodeNum uint64) error
CreateCluster creates a new cluster with specified node number
func (*EtcdMetaManipulationBroker) GetMetaBroker ¶
func (broker *EtcdMetaManipulationBroker) GetMetaBroker() *EtcdMetaBroker
GetMetaBroker returens the EtcdMetaBroker
func (*EtcdMetaManipulationBroker) InitGlobalEpoch ¶
func (broker *EtcdMetaManipulationBroker) InitGlobalEpoch() error
InitGlobalEpoch initializes the global_epoch if it does not exist
func (*EtcdMetaManipulationBroker) MigrateSlots ¶
func (broker *EtcdMetaManipulationBroker) MigrateSlots(ctx context.Context, clusterName string) error
MigrateSlots splits the slots to another half cluster.
func (*EtcdMetaManipulationBroker) RemoveCluster ¶
func (broker *EtcdMetaManipulationBroker) RemoveCluster(ctx context.Context, clusterName string) error
RemoveCluster removes the cluster.
func (*EtcdMetaManipulationBroker) RemoveProxy ¶
func (broker *EtcdMetaManipulationBroker) RemoveProxy(ctx context.Context, address string) error
RemoveProxy remove a free proxy.
func (*EtcdMetaManipulationBroker) RemoveUnusedProxiesFromCluster ¶
func (broker *EtcdMetaManipulationBroker) RemoveUnusedProxiesFromCluster(ctx context.Context, clusterName string) error
RemoveUnusedProxiesFromCluster free the unused proxies.
func (*EtcdMetaManipulationBroker) ReplaceProxy ¶
func (broker *EtcdMetaManipulationBroker) ReplaceProxy(ctx context.Context, address string) (*Host, error)
ReplaceProxy changes the proxy and return the new one.
type FailedProxyStore ¶
type FailedProxyStore struct {
NodeAddresses []string `json:"node_addresses"`
}
FailedProxyStore stores
func (*FailedProxyStore) Decode ¶
func (meta *FailedProxyStore) Decode(data []byte) error
Decode decodes json string
func (*FailedProxyStore) Encode ¶
func (meta *FailedProxyStore) Encode() ([]byte, error)
Encode encodes json string
type Host ¶
type Host struct { Address string `json:"address"` Epoch uint64 `json:"epoch"` Nodes []*Node `json:"nodes"` FreeNodes []string `json:"free_nodes"` Peers []*PeerProxy `json:"peers"` ClustersConfig map[string]ClusterConfig `json:"clusters_config"` }
Host is the proxies on each physical machine.
type MetaDataBroker ¶
type MetaDataBroker interface { GetClusterNames(ctx context.Context) ([]string, error) GetCluster(ctx context.Context, name string) (*Cluster, error) GetProxyAddresses(ctx context.Context) ([]string, error) GetProxy(ctx context.Context, address string) (*Host, error) AddFailure(ctx context.Context, address string, reportID string) error GetFailures(ctx context.Context) ([]string, error) }
MetaDataBroker abstracts the ability to check meta data and detect failures.
type MetaManipulationBroker ¶
type MetaManipulationBroker interface { // Basic API ReplaceProxy(ctx context.Context, proxyAddress string) (*Host, error) CommitMigration(ctx context.Context, task MigrationTaskMeta) error // Extended API InitGlobalEpoch() error CreateCluster(ctx context.Context, clusterName string, nodeNum uint64) error AddProxy(ctx context.Context, address string, nodes []string) error AddNodesToCluster(ctx context.Context, clusterName string) error MigrateSlots(ctx context.Context, clusterName string) error RemoveProxy(ctx context.Context, address string) error RemoveUnusedProxiesFromCluster(ctx context.Context, clusterName string) error RemoveCluster(ctx context.Context, clusterName string) error SetConfig(ctx context.Context, clusterName string, config map[string]string) error }
MetaManipulationBroker abstracts the ability to manipulate clusters.
type MigrationMeta ¶
type MigrationMeta struct { Epoch uint64 `json:"epoch"` SrcProxyAddress string `json:"src_proxy_address"` SrcNodeAddress string `json:"src_node_address"` DstProxyAddress string `json:"dst_proxy_address"` DstNodeAddress string `json:"dst_node_address"` }
MigrationMeta includes the migration metadata
type MigrationMetaStore ¶
type MigrationMetaStore struct { Epoch uint64 `json:"epoch"` SrcProxyIndex uint64 `json:"src_proxy_index"` DstProxyIndex uint64 `json:"dst_proxy_index"` }
MigrationMetaStore stores the migration meta
type MigrationTagType ¶
type MigrationTagType string
MigrationTagType consists of "Migrating", "Importing", and "None"
const ( // MigratingTag is for source node MigratingTag MigrationTagType = "Migrating" // ImportingTag is for destination node ImportingTag MigrationTagType = "Importing" // NoneTag is for stable slots NoneTag MigrationTagType = "None" )
type MigrationTaskMeta ¶
type MigrationTaskMeta struct { DBName string `json:"db_name"` Slots SlotRange `json:"slot_range"` }
MigrationTaskMeta denotes the migration task.
type Node ¶
type Node struct { Address string `json:"address"` ProxyAddress string `json:"proxy_address"` ClusterName string `json:"cluster_name"` Slots []SlotRange `json:"slots"` Repl ReplMeta `json:"repl"` }
Node is redis node.
type NodeChunkStore ¶
type NodeChunkStore struct { RolePosition ChunkRolePosition // Currently []SlotRangeStore has only at most one SlotRange with NoneTag // and at most one SlotRange with Migrating or Importing Tag. Slots [2][]SlotRangeStore `json:"slots"` Nodes [4]*NodeStore `json:"nodes"` }
NodeChunkStore stores 4 nodes as a group
func (*NodeChunkStore) GetMasterNodes ¶
func (chunk *NodeChunkStore) GetMasterNodes() [2]*NodeStore
GetMasterNodes gets the master nodes
func (*NodeChunkStore) SwitchMaster ¶
func (chunk *NodeChunkStore) SwitchMaster(failedProxyAddress string) error
SwitchMaster takes over the master role
type NodeStore ¶
type NodeStore struct { NodeAddress string `json:"node_address"` ProxyAddress string `json:"proxy_address"` }
NodeStore stores the metadata of node
type PeerProxy ¶
type PeerProxy struct { ProxyAddress string `json:"proxy_address"` ClusterName string `json:"cluster_name"` Slots []SlotRange `json:"slots"` }
PeerProxy is used for server proxy to do the redirection.
type ProxyStore ¶
type ProxyStore struct { ProxyIndex uint64 `json:"proxy_index"` ClusterName string `json:"cluster_name"` NodeAddresses []string `json:"node_addresses"` }
ProxyStore stores the basic proxy metadata
func (*ProxyStore) Decode ¶
func (proxy *ProxyStore) Decode(data []byte) error
Decode decodes json string
func (*ProxyStore) Encode ¶
func (proxy *ProxyStore) Encode() ([]byte, error)
Encode encodes json string
type ReplPeer ¶
type ReplPeer struct { NodeAddress string `json:"node_address"` ProxyAddress string `json:"proxy_address"` }
ReplPeer stores the replication peer
type SlotRange ¶
type SlotRange struct { Start uint64 `json:"start"` End uint64 `json:"end"` Tag SlotRangeTag `json:"tag"` }
SlotRange is the slot range of redis cluster. Start and End will be the same the single slot.
type SlotRangeStore ¶
type SlotRangeStore struct { Start uint64 `json:"start"` End uint64 `json:"end"` Tag SlotRangeTagStore `json:"tag"` }
SlotRangeStore stores the slot range
type SlotRangeTag ¶
type SlotRangeTag struct { TagType MigrationTagType Meta *MigrationMeta }
SlotRangeTag includes the migration type and migration metadata
func (*SlotRangeTag) MarshalJSON ¶
func (slotRangeTag *SlotRangeTag) MarshalJSON() ([]byte, error)
MarshalJSON changes the json format of SlotRangeTag to the Rust Serde format.
func (*SlotRangeTag) UnmarshalJSON ¶
func (slotRangeTag *SlotRangeTag) UnmarshalJSON(data []byte) error
UnmarshalJSON changes the json format of SlotRangeTag to the Rust Serde format.
type SlotRangeTagStore ¶
type SlotRangeTagStore struct { TagType MigrationTagType `json:"tag_type"` Meta *MigrationMetaStore `json:"meta"` }
SlotRangeTagStore stores the tag and migration meta
type TxnBroker ¶
type TxnBroker struct {
// contains filtered or unexported fields
}
TxnBroker implements some building block for transaction.
func NewTxnBroker ¶
func NewTxnBroker(config *EtcdConfig, stm conc.STM) *TxnBroker
NewTxnBroker creates TxnBroker.