leaderless_key_value_store

package module
v0.0.0-...-ec27ee4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 26, 2021 License: Unlicense Imports: 21 Imported by: 0

README

Leaderless Key-Value Store

This repo contains an example leaderless key-value store based on:

Nodes are homogeneous. Every node acts as both storage and coordinator. This means that any node can accept external queries.

APIs are implemented with gRPC and Protobuf. Nodes store their data using BadgerDB.

Status

A 5-node cluster can sustain 1k requests per second (1 krps) of 10 KB writes with minimal errors. This was measured with 2 EPYC CPU cores per node. Performance appeared to be CPU-bound, so higher throughput should be easily obtainable.

This is not intended as a production or commercial project; it is purely a learning exercise. There are numerous FIXME comments throughout the codebase. Many suggest performance improvements, while others deal with rare error cases.

This does not implement data re-replication. Nodes that have been offline and missed some writes do not have that missing data synced to them. Similarly, new nodes do not receive existing data. This could be added using a Dynamo-style anti-entropy process.

Notes

Regenerate protobuf Go code:

protoc -I=. --go_out=. --go-grpc_out=. api/api.proto

Start a local 3-node test cluster (run each command in a separate terminal, since each `go run` process stays in the foreground):

go run ./cmd/main/main.go ./examples/3-local-nodes/node-1.yml ./examples/3-local-nodes/cluster.yml
go run ./cmd/main/main.go ./examples/3-local-nodes/node-2.yml ./examples/3-local-nodes/cluster.yml
go run ./cmd/main/main.go ./examples/3-local-nodes/node-3.yml ./examples/3-local-nodes/cluster.yml

Interact manually with the Cluster API:

grpcurl -plaintext -d '{"key": "b"}' -proto api/api.proto localhost:8001 api.Cluster/Get
grpcurl -plaintext -d '{"entry": {"key": "b", "value": "b-value"}}' -proto api/api.proto localhost:8001 api.Cluster/Set

Interact manually with the Node API:

grpcurl -plaintext -d '{}' -proto api/api.proto localhost:8001 api.Node/Info
grpcurl -plaintext -d '{}' -proto api/api.proto localhost:8001 api.Node/Health
grpcurl -plaintext -d '{"key": "a"}' -proto api/api.proto localhost:8001 api.Node/Get
grpcurl -plaintext -d '{"entry": {"key": "a", "value": "a-value"}}' -proto api/api.proto localhost:8001 api.Node/Set

Load test with ghz:

ghz --rps=250 --duration 15s --skipFirst 1000 --insecure --connections 5 --proto ./api/api.proto --call api.Cluster/Set -d '{"entry": {"key": "{{.UUID}}", "value": "{{randomString 1024}}"}}' --lb-strategy=round_robin dns:///rz.46b.it:80

Documentation

Index

Constants

View Source
const (
	MAX_MSG_SIZE = 64 << 20

	PING_RATE    = 5 * time.Second
	PING_TIMEOUT = 5 * time.Second
)

Variables

This section is empty.

Functions

func NewGrpcServer

func NewGrpcServer(extraServerOptions ...grpc.ServerOption) *grpc.Server

func PerformDnsServiceDiscovery

func PerformDnsServiceDiscovery(ctx context.Context, config *DnsServiceDiscoveryConfig, clusterDesc *ClusterDescription)

FIXME: Add TTL-style system to remove expired records

func SetupBadgerStorageMetrics

func SetupBadgerStorageMetrics()

func Write

func Write(entry Entry, cluster *ClusterDescription) error

Types

type ClockServer

type ClockServer struct {
	api.UnimplementedClockServer
	// contains filtered or unexported fields
}

Based on http://rystsov.info/2018/10/01/tso.html

func NewClockServer

func NewClockServer(epochPath string) (*ClockServer, error)

func (*ClockServer) Get

func (*ClockServer) Set

type ClusterDescription

type ClusterDescription struct {
	sync.Mutex

	ReplicationLevel      int
	RendezvousHashingSeed uint32
	StorageNodes          map[string]*StorageNodeDescription
	// FIXME: Move connmanager off this metadata struct and onto more of a
	// 'live' state struct.
	ConnManager *ConnManager
}

func NewClusterDescription

func NewClusterDescription(cfg *CoordinatorConfig, connManager *ConnManager) *ClusterDescription

func (*ClusterDescription) FindNodesForKey

func (c *ClusterDescription) FindNodesForKey(key string) []FoundNode

type ClusterServer

type ClusterServer struct {
	api.UnimplementedClusterServer
	// contains filtered or unexported fields
}

func NewClusterServer

func NewClusterServer(clusterDesc *ClusterDescription) *ClusterServer

func (*ClusterServer) Get

func (*ClusterServer) Info

func (*ClusterServer) Set

type ConnManager

type ConnManager struct {
	sync.Mutex
	PoolSize          int
	RemoveUnusedAfter time.Duration
	Pools             map[string]ConnPool
	LastUsed          map[string]time.Time
}

func NewConnManager

func NewConnManager(poolSize int, removeUnusedAfter time.Duration) *ConnManager

func (*ConnManager) Add

func (m *ConnManager) Add(ctx context.Context, address string, allowRemovalIfUnused bool) error

func (*ConnManager) Get

func (m *ConnManager) Get(address string) (conn grpc.ClientConnInterface, ok bool)

func (*ConnManager) Run

func (m *ConnManager) Run(ctx context.Context)

type ConnPool

type ConnPool interface {
	grpc.ClientConnInterface

	Conn() *grpc.ClientConn
	PoolSize() int
	Close() []error
}

type CoordinatorConfig

type CoordinatorConfig struct {
	RendezvousHashingSeed uint32 `yaml:"rendezvous_hashing_seed"`
	ReplicationLevel      int    `yaml:"replication_level"`

	BindAddress    string   `yaml:"bind_address"`
	StorageNodeIds []string `yaml:"storage_node_ids"`
	// Map from storage node IDs to "hostname:port"
	StaticServiceDiscovery map[string]string          `yaml:"static_service_discovery"`
	DnsServiceDiscovery    *DnsServiceDiscoveryConfig `yaml:"dns_service_discovery"`

	SizeOfConnectionPools            int           `yaml:"size_of_connection_pools"`
	RemoveUnusedConnectionPoolsAfter time.Duration `yaml:"remove_unused_connection_pools_after"`
}

func LoadCoordinatorConfig

func LoadCoordinatorConfig(path string) (*CoordinatorConfig, error)

type DnsServiceDiscoveryConfig

type DnsServiceDiscoveryConfig struct {
	StorageNodeDomain string        `yaml:"storage_node_domain"`
	StorageNodePort   int64         `yaml:"storage_node_port"`
	UpdateInterval    time.Duration `yaml:"update_interval"`
}

type Entry

type Entry struct {
	Key   string `yaml:"key"`
	Value string `yaml:"value"`
}

func Read

func Read(key string, cluster *ClusterDescription) (*Entry, error)

type FoundNode

type FoundNode struct {
	CombinedHash uint64
	Node         *StorageNodeDescription
}

type NodeServer

type NodeServer struct {
	api.UnimplementedNodeServer
	// contains filtered or unexported fields
}

func NewNodeServer

func NewNodeServer(nodeId string, storage *Storage) *NodeServer

func (*NodeServer) Get

func (*NodeServer) Health

func (*NodeServer) Info

func (s *NodeServer) Info(_ *api.InfoRequest, stream api.Node_InfoServer) error

func (*NodeServer) Set

type RoundRobinConnPool

type RoundRobinConnPool struct {
	Index       int32
	Size        int
	Connections []*grpc.ClientConn
}

func NewRoundRobinConnPool

func NewRoundRobinConnPool(ctx context.Context, address string, poolSize int) (*RoundRobinConnPool, error)

func (*RoundRobinConnPool) Close

func (r *RoundRobinConnPool) Close() []error

func (*RoundRobinConnPool) Conn

func (r *RoundRobinConnPool) Conn() *grpc.ClientConn

func (*RoundRobinConnPool) Invoke

func (p *RoundRobinConnPool) Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...grpc.CallOption) error

func (*RoundRobinConnPool) NewStream

func (p *RoundRobinConnPool) NewStream(ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption) (grpc.ClientStream, error)

func (*RoundRobinConnPool) PoolSize

func (r *RoundRobinConnPool) PoolSize() int

type Storage

type Storage struct {
	BadgerDb *badger.DB
}

func NewStorage

func NewStorage(dbPath string) (*Storage, error)

func (*Storage) Close

func (s *Storage) Close()

func (*Storage) Get

func (s *Storage) Get(key string) (*api.ClockedEntry, error)

func (*Storage) Keys

func (s *Storage) Keys() ([]string, error)

func (*Storage) Set

func (s *Storage) Set(value *api.ClockedEntry) error

type StorageNodeConfig

type StorageNodeConfig struct {
	Id                 string `yaml:"id"`
	BindAddress        string `yaml:"bind_address"`
	BindMetricsAddress string `yaml:"bind_metrics_address"`
	ClockEpochFilePath string `yaml:"clock_epoch_file_path"`
	BadgerDbFolder     string `yaml:"badger_db_folder"`
}

func LoadStorageNodeConfig

func LoadStorageNodeConfig(path string) (*StorageNodeConfig, error)

type StorageNodeDescription

type StorageNodeDescription struct {
	Id      string
	Hash    uint32
	Address *string
}

func NewStorageNodeDescription

func NewStorageNodeDescription(id string, rendezvousHashingSeed uint32) *StorageNodeDescription

func (*StorageNodeDescription) Found

func (s *StorageNodeDescription) Found() bool

Directories

Path Synopsis
cmd

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL