broker

package
v0.1.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 8, 2020 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// CompressionStrategyDisabled disables compression
	CompressionStrategyDisabled = "disabled"
	// CompressionStrategySetGetOnly enables compression and restrict the commands
	// to only SET family and GET for string data type.
	CompressionStrategySetGetOnly = "set_get_only"
	// CompressionStrategyAllowAll enables compression and allow all the commands,
	// even though they will get the wrong results.
	CompressionStrategyAllowAll = "allow_all"
)
View Source
const MasterRole = "master"

MasterRole represents master node.

View Source
const MaxClusterNameLen = 30

MaxClusterNameLen is the maximum length of cluter name

View Source
const MaxSlotNumber = 16384

MaxSlotNumber is specified by Redis Cluster

View Source
const ReplicaRole = "replica"

ReplicaRole represents replcia node.

Variables

View Source
var ErrAllocatedProxyInUse = errors.New("allocated proxy is in use")

ErrAllocatedProxyInUse tells the client to try again.

View Source
var ErrAlreadyMigrating = errors.New("cluster is migrating")

ErrAlreadyMigrating indicates the cluster has already started migration.

View Source
var ErrCanNotMigrate = errors.New("cluster cannot migrate")

ErrCanNotMigrate indicates that the cluster does not have enough empty chunks to do the migration.

View Source
var ErrClusterExists = errors.New("cluster already exists")

ErrClusterExists indicates the cluster has already existed.

View Source
var ErrClusterNotFound = errors.New("cluster not found")

ErrClusterNotFound indicates that cluster does not exist.

View Source
var ErrEmptyChunksExist = errors.New("there're empty chunks")

ErrEmptyChunksExist indicates that the cluster still need to do the migration for the remaining empty chunks before adding more nodes.

View Source
var ErrGlobalEpochNotFound = errors.New("global epoch not found")

ErrGlobalEpochNotFound indicates that key does not exist.

View Source
var ErrHostExists = errors.New("host already existed")

ErrHostExists indicates the proxy has already existed.

View Source
var ErrInvalidAddress = errors.New("invalid address")

ErrInvalidAddress indicates that the address is empty.

View Source
var ErrInvalidClusterConfig = errors.New("invalid cluster config")

ErrInvalidClusterConfig indicates the config is invalid

View Source
var ErrInvalidClusterName = errors.New("invalid cluster name. Length should not be longer than 30. Should only contains letters, digit, and '.' '-' '_'")

ErrInvalidClusterName indicates the cluster name is not valid.

View Source
var ErrInvalidNodesNum = errors.New("invalid node number")

ErrInvalidNodesNum indicates invalid node number.

View Source
var ErrInvalidRequestedMigrationSlotRange = errors.New("invalid requested migration task")

ErrInvalidRequestedMigrationSlotRange indicates the request is not valid

View Source
var ErrInvalidRequestedNodesNum = errors.New("invalid node number")

ErrInvalidRequestedNodesNum indicates invalid node number.

View Source
var ErrInvalidRequestedProxyNum = errors.New("invalid requested proxy number")

ErrInvalidRequestedProxyNum indicates that the remaining resources can't produce enough chunks.

View Source
var ErrMigrationTaskNotFound = errors.New("migration task not found")

ErrMigrationTaskNotFound indicates we can't find the slot range.

View Source
var ErrMigrationTaskNotMatch = errors.New("migration task not found")

ErrMigrationTaskNotMatch indicates we can't find a slot range matches the task.

View Source
var ErrNoAvailableNodes = errors.New("no available nodes to start migration")

ErrNoAvailableNodes indicates no nodes with empty slots found.

View Source
var ErrNoAvailableResource = errors.New("no available resource")

ErrNoAvailableResource indicates no available resource.

View Source
var ErrNoChunkResource = errors.New("no chunk resource")

ErrNoChunkResource indicates that the remaining resources can't produce enough chunks.

View Source
var ErrProxyInUse = errors.New("proxy is in use")

ErrProxyInUse indicates that key does not exist.

View Source
var ErrProxyNotFound = errors.New("proxy not found")

ErrProxyNotFound indicates that proxy does not exist.

View Source
var ErrProxyNotInUse = errors.New("proxy not in use")

ErrProxyNotInUse indicates that key does not exist.

View Source
var ErrTryAgain = errors.New("try again")

ErrTryAgain indicates the cache is stall and client need to try again.

View Source
var ErrTxnFailed = errors.New("txn failed")

ErrTxnFailed indicates the transaction failed.

Functions

This section is empty.

Types

type ChunkRolePosition

type ChunkRolePosition int

ChunkRolePosition indicates the roles in the chunk

const (
	// ChunkRoleNormalPosition indicates each proxy has one master.
	ChunkRoleNormalPosition ChunkRolePosition = 0
	// ChunkRoleFirstChunkMaster indicates all the masters are in the first proxy.
	ChunkRoleFirstChunkMaster ChunkRolePosition = 1
	// ChunkRoleSecondChunkMaster indicates all the masters are in the second proxy.
	ChunkRoleSecondChunkMaster ChunkRolePosition = 2
)

type Cluster

type Cluster struct {
	Name   string        `json:"name"`
	Epoch  uint64        `json:"epoch"`
	Nodes  []*Node       `json:"nodes"`
	Config ClusterConfig `json:"config"`
}

Cluster is the redis cluster we implement.

type ClusterConfig added in v0.1.1

type ClusterConfig struct {
	CompressionStrategy string `json:"compression_strategy"`
}

ClusterConfig is the config of each cluster.

func NewClusterConfig added in v0.1.1

func NewClusterConfig() *ClusterConfig

NewClusterConfig creates ClusterConfig with default settings.

type ClusterStore

type ClusterStore struct {
	Chunks []*NodeChunkStore `json:"chunks"`
	Config *ClusterConfig    `json:"cluster_config"`
}

ClusterStore stores the nodes

func (*ClusterStore) CommitMigration

func (cluster *ClusterStore) CommitMigration(taskSlots SlotRange) error

CommitMigration finish the slots migration.

func (*ClusterStore) Decode

func (cluster *ClusterStore) Decode(data []byte) error

Decode decodes json string

func (*ClusterStore) Encode

func (cluster *ClusterStore) Encode() ([]byte, error)

Encode encodes json string

func (*ClusterStore) FindChunkByProxy

func (cluster *ClusterStore) FindChunkByProxy(proxyAddress string) (*NodeChunkStore, error)

FindChunkByProxy find chunk by proxy address.

func (*ClusterStore) HasEmptyChunks

func (cluster *ClusterStore) HasEmptyChunks() bool

HasEmptyChunks checks whether there're still chunks that do not have slots.

func (*ClusterStore) LimitMigration

func (cluster *ClusterStore) LimitMigration(migrationLimit int64) (*ClusterStore, error)

LimitMigration reduces the concurrent migration flags. This implementation is a bit tricky. The stored data do not allow some of shards are migrating while others not. They are all set with the flags. We only reduce the migration in the query API. (1) Only the former shards finish the migration, will the later ones get started. (2) And once started, since the former one will not restart migration again, the later ones will not stop until they are done. (3) The later migration flags will be updated to server proxies with new epoch bumped by the committing of former ones.

func (*ClusterStore) SplitSlots

func (cluster *ClusterStore) SplitSlots(newEpoch uint64) error

SplitSlots splits the slots from the first half to the second half.

type EtcdConfig

type EtcdConfig struct {
	PathPrefix     string
	FailureTTL     int64
	MigrationLimit int64
	FailureQuorum  int64
}

EtcdConfig stores broker config.

type EtcdMetaBroker

type EtcdMetaBroker struct {
	// contains filtered or unexported fields
}

EtcdMetaBroker implements the MetaDataBroker interface and serves as a broker backend.

func NewEtcdMetaBroker

func NewEtcdMetaBroker(config *EtcdConfig, client *clientv3.Client) (*EtcdMetaBroker, error)

NewEtcdMetaBroker creates EtcdMetaBroker

func NewEtcdMetaBrokerFromEndpoints

func NewEtcdMetaBrokerFromEndpoints(config *EtcdConfig, endpoints []string) (*EtcdMetaBroker, error)

NewEtcdMetaBrokerFromEndpoints creates EtcdMetaBroker from endpoints

func (*EtcdMetaBroker) AddFailure

func (broker *EtcdMetaBroker) AddFailure(ctx context.Context, address string, reportID string) error

AddFailure add failures reported by coordinators

func (*EtcdMetaBroker) ClearCache

func (broker *EtcdMetaBroker) ClearCache()

ClearCache clears the cached metadata.

func (*EtcdMetaBroker) GetCluster

func (broker *EtcdMetaBroker) GetCluster(ctx context.Context, name string) (*Cluster, error)

GetCluster queries a cluster by name

func (*EtcdMetaBroker) GetClusterNames

func (broker *EtcdMetaBroker) GetClusterNames(ctx context.Context) ([]string, error)

GetClusterNames retrieves all the cluster names from etcd.

func (*EtcdMetaBroker) GetConfig added in v0.1.4

func (broker *EtcdMetaBroker) GetConfig() *EtcdConfig

GetConfig returns EtcdConfig

func (*EtcdMetaBroker) GetFailures

func (broker *EtcdMetaBroker) GetFailures(ctx context.Context) ([]string, error)

GetFailures retrieves the failures

func (*EtcdMetaBroker) GetProxy

func (broker *EtcdMetaBroker) GetProxy(ctx context.Context, address string) (*Host, error)

GetProxy query the host by address

func (*EtcdMetaBroker) GetProxyAddresses

func (broker *EtcdMetaBroker) GetProxyAddresses(ctx context.Context) ([]string, error)

GetProxyAddresses queries all proxies.

func (*EtcdMetaBroker) Serve

func (broker *EtcdMetaBroker) Serve(ctx context.Context) error

Serve runs the routine cleanup of cache

type EtcdMetaManipulationBroker

type EtcdMetaManipulationBroker struct {
	// contains filtered or unexported fields
}

EtcdMetaManipulationBroker is mainly for metadata modification

func NewEtcdMetaManipulationBroker

func NewEtcdMetaManipulationBroker(config *EtcdConfig, client *clientv3.Client, metaDataBroker *EtcdMetaBroker) (*EtcdMetaManipulationBroker, error)

NewEtcdMetaManipulationBroker creates EtcdMetaManipulationBroker

func NewEtcdMetaManipulationBrokerFromEndpoints

func NewEtcdMetaManipulationBrokerFromEndpoints(config *EtcdConfig, endpoints []string, metaDataBroker *EtcdMetaBroker) (*EtcdMetaManipulationBroker, error)

NewEtcdMetaManipulationBrokerFromEndpoints creates EtcdMetaManipulationBroker from endpoints

func (*EtcdMetaManipulationBroker) AddNodesToCluster

func (broker *EtcdMetaManipulationBroker) AddNodesToCluster(ctx context.Context, clusterName string) error

AddNodesToCluster adds chunks to cluster.

func (*EtcdMetaManipulationBroker) AddProxy

func (broker *EtcdMetaManipulationBroker) AddProxy(ctx context.Context, address string, nodes []string) error

AddProxy adds new proxy and removes it from failed proxies

func (*EtcdMetaManipulationBroker) CommitMigration

func (broker *EtcdMetaManipulationBroker) CommitMigration(ctx context.Context, task MigrationTaskMeta) error

CommitMigration finishes the migration.

func (*EtcdMetaManipulationBroker) CreateCluster

func (broker *EtcdMetaManipulationBroker) CreateCluster(ctx context.Context, clusterName string, nodeNum uint64) error

CreateCluster creates a new cluster with specified node number

func (*EtcdMetaManipulationBroker) GetMetaBroker

func (broker *EtcdMetaManipulationBroker) GetMetaBroker() *EtcdMetaBroker

GetMetaBroker returens the EtcdMetaBroker

func (*EtcdMetaManipulationBroker) InitGlobalEpoch

func (broker *EtcdMetaManipulationBroker) InitGlobalEpoch() error

InitGlobalEpoch initializes the global_epoch if it does not exist

func (*EtcdMetaManipulationBroker) MigrateSlots

func (broker *EtcdMetaManipulationBroker) MigrateSlots(ctx context.Context, clusterName string) error

MigrateSlots splits the slots to another half cluster.

func (*EtcdMetaManipulationBroker) RemoveCluster

func (broker *EtcdMetaManipulationBroker) RemoveCluster(ctx context.Context, clusterName string) error

RemoveCluster removes the cluster.

func (*EtcdMetaManipulationBroker) RemoveProxy

func (broker *EtcdMetaManipulationBroker) RemoveProxy(ctx context.Context, address string) error

RemoveProxy remove a free proxy.

func (*EtcdMetaManipulationBroker) RemoveUnusedProxiesFromCluster

func (broker *EtcdMetaManipulationBroker) RemoveUnusedProxiesFromCluster(ctx context.Context, clusterName string) error

RemoveUnusedProxiesFromCluster free the unused proxies.

func (*EtcdMetaManipulationBroker) ReplaceProxy

func (broker *EtcdMetaManipulationBroker) ReplaceProxy(ctx context.Context, address string) (*Host, error)

ReplaceProxy changes the proxy and return the new one.

func (*EtcdMetaManipulationBroker) SetConfig added in v0.1.1

func (broker *EtcdMetaManipulationBroker) SetConfig(ctx context.Context, clusterName string, config map[string]string) error

SetConfig change the config of cluster

type FailedProxyStore

type FailedProxyStore struct {
	NodeAddresses []string `json:"node_addresses"`
}

FailedProxyStore stores

func (*FailedProxyStore) Decode

func (meta *FailedProxyStore) Decode(data []byte) error

Decode decodes json string

func (*FailedProxyStore) Encode

func (meta *FailedProxyStore) Encode() ([]byte, error)

Encode encodes json string

type Host

type Host struct {
	Address        string                   `json:"address"`
	Epoch          uint64                   `json:"epoch"`
	Nodes          []*Node                  `json:"nodes"`
	FreeNodes      []string                 `json:"free_nodes"`
	Peers          []*PeerProxy             `json:"peers"`
	ClustersConfig map[string]ClusterConfig `json:"clusters_config"`
}

Host is the proxies on each physical machine.

type MetaDataBroker

type MetaDataBroker interface {
	GetClusterNames(ctx context.Context) ([]string, error)
	GetCluster(ctx context.Context, name string) (*Cluster, error)
	GetProxyAddresses(ctx context.Context) ([]string, error)
	GetProxy(ctx context.Context, address string) (*Host, error)
	AddFailure(ctx context.Context, address string, reportID string) error
	GetFailures(ctx context.Context) ([]string, error)
}

MetaDataBroker abstracts the ability to check meta data and detect failures.

type MetaManipulationBroker

type MetaManipulationBroker interface {
	// Basic API
	ReplaceProxy(ctx context.Context, proxyAddress string) (*Host, error)
	CommitMigration(ctx context.Context, task MigrationTaskMeta) error

	// Extended API
	InitGlobalEpoch() error
	CreateCluster(ctx context.Context, clusterName string, nodeNum uint64) error
	AddProxy(ctx context.Context, address string, nodes []string) error
	AddNodesToCluster(ctx context.Context, clusterName string) error
	MigrateSlots(ctx context.Context, clusterName string) error
	RemoveProxy(ctx context.Context, address string) error
	RemoveUnusedProxiesFromCluster(ctx context.Context, clusterName string) error
	RemoveCluster(ctx context.Context, clusterName string) error
	SetConfig(ctx context.Context, clusterName string, config map[string]string) error
}

MetaManipulationBroker abstracts the ability to manipulate clusters.

type MigrationMeta

type MigrationMeta struct {
	Epoch           uint64 `json:"epoch"`
	SrcProxyAddress string `json:"src_proxy_address"`
	SrcNodeAddress  string `json:"src_node_address"`
	DstProxyAddress string `json:"dst_proxy_address"`
	DstNodeAddress  string `json:"dst_node_address"`
}

MigrationMeta includes the migration metadata

type MigrationMetaStore

type MigrationMetaStore struct {
	Epoch         uint64 `json:"epoch"`
	SrcProxyIndex uint64 `json:"src_proxy_index"`
	DstProxyIndex uint64 `json:"dst_proxy_index"`
}

MigrationMetaStore stores the migration meta

type MigrationTagType

type MigrationTagType string

MigrationTagType consists of "Migrating", "Importing", and "None"

const (
	// MigratingTag is for source node
	MigratingTag MigrationTagType = "Migrating"
	// ImportingTag is for destination node
	ImportingTag MigrationTagType = "Importing"
	// NoneTag is for stable slots
	NoneTag MigrationTagType = "None"
)

type MigrationTaskMeta

type MigrationTaskMeta struct {
	DBName string    `json:"db_name"`
	Slots  SlotRange `json:"slot_range"`
}

MigrationTaskMeta denotes the migration task.

type Node

type Node struct {
	Address      string      `json:"address"`
	ProxyAddress string      `json:"proxy_address"`
	ClusterName  string      `json:"cluster_name"`
	Slots        []SlotRange `json:"slots"`
	Repl         ReplMeta    `json:"repl"`
}

Node is redis node.

type NodeChunkStore

type NodeChunkStore struct {
	RolePosition ChunkRolePosition
	// Currently []SlotRangeStore has only at most one SlotRange with NoneTag
	// and at most one SlotRange with Migrating or Importing Tag.
	Slots [2][]SlotRangeStore `json:"slots"`
	Nodes [4]*NodeStore       `json:"nodes"`
}

NodeChunkStore stores 4 nodes as a group

func (*NodeChunkStore) GetMasterNodes

func (chunk *NodeChunkStore) GetMasterNodes() [2]*NodeStore

GetMasterNodes gets the master nodes

func (*NodeChunkStore) SwitchMaster

func (chunk *NodeChunkStore) SwitchMaster(failedProxyAddress string) error

SwitchMaster takes over the master role

type NodeStore

type NodeStore struct {
	NodeAddress  string `json:"node_address"`
	ProxyAddress string `json:"proxy_address"`
}

NodeStore stores the metadata of node

type PeerProxy

type PeerProxy struct {
	ProxyAddress string      `json:"proxy_address"`
	ClusterName  string      `json:"cluster_name"`
	Slots        []SlotRange `json:"slots"`
}

PeerProxy is used for server proxy to do the redirection.

type ProxyStore

type ProxyStore struct {
	ProxyIndex    uint64   `json:"proxy_index"`
	ClusterName   string   `json:"cluster_name"`
	NodeAddresses []string `json:"node_addresses"`
}

ProxyStore stores the basic proxy metadata

func (*ProxyStore) Decode

func (proxy *ProxyStore) Decode(data []byte) error

Decode decodes json string

func (*ProxyStore) Encode

func (proxy *ProxyStore) Encode() ([]byte, error)

Encode encodes json string

type ReplMeta

type ReplMeta struct {
	Role  Role       `json:"role"`
	Peers []ReplPeer `json:"peers"`
}

ReplMeta stores the replication metadata

type ReplPeer

type ReplPeer struct {
	NodeAddress  string `json:"node_address"`
	ProxyAddress string `json:"proxy_address"`
}

ReplPeer stores the replication peer

type Role

type Role = string

Role could be 'master' or 'replica'.

type SlotRange

type SlotRange struct {
	Start uint64       `json:"start"`
	End   uint64       `json:"end"`
	Tag   SlotRangeTag `json:"tag"`
}

SlotRange is the slot range of redis cluster. Start and End will be the same the single slot.

type SlotRangeStore

type SlotRangeStore struct {
	Start uint64            `json:"start"`
	End   uint64            `json:"end"`
	Tag   SlotRangeTagStore `json:"tag"`
}

SlotRangeStore stores the slot range

type SlotRangeTag

type SlotRangeTag struct {
	TagType MigrationTagType
	Meta    *MigrationMeta
}

SlotRangeTag includes the migration type and migration metadata

func (*SlotRangeTag) MarshalJSON

func (slotRangeTag *SlotRangeTag) MarshalJSON() ([]byte, error)

MarshalJSON changes the json format of SlotRangeTag to the Rust Serde format.

func (*SlotRangeTag) UnmarshalJSON

func (slotRangeTag *SlotRangeTag) UnmarshalJSON(data []byte) error

UnmarshalJSON changes the json format of SlotRangeTag to the Rust Serde format.

type SlotRangeTagStore

type SlotRangeTagStore struct {
	TagType MigrationTagType    `json:"tag_type"`
	Meta    *MigrationMetaStore `json:"meta"`
}

SlotRangeTagStore stores the tag and migration meta

type TxnBroker

type TxnBroker struct {
	// contains filtered or unexported fields
}

TxnBroker implements some building block for transaction.

func NewTxnBroker

func NewTxnBroker(config *EtcdConfig, stm conc.STM) *TxnBroker

NewTxnBroker creates TxnBroker.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL