kvdb

package module
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 16, 2019 License: Apache-2.0 Imports: 6 Imported by: 0

README

kvdb

GoDoc Travis branch Go Report Card Code Coverage

Key Value Store abstraction library.

The kvdb library abstracts the caller from the specific key-value database implementation. The main goal of the kvdb library is to provide simple APIs to deal with only keys and values, and abstract away the intricate details of a specific key value stores. It also provides support for complex APIs like Snapshot, Watch and Lock which are built using the basic APIs.

Supported key value stores
  • Etcd v2
  • Etcd v3
  • Consul
  • In-memory store (local to the node)
  • Bolt DB (local to the node)
  • Zookeeper
Usage

The kvdb library is easy to use and requires you to create a new instance of the Kvdb object

package main

import (
  "github.com/portworx/kvdb"
  "github.com/portworx/kvdb/etcd/v3"
  "github.com/libopenstorage/openstorage/pkg/dbg"
)

func getKvdb(
  kvdbName string, // Use one of the kv store implementation names
  basePath string, // The path under which all the keys will be created by this kv instance
  discoveryEndpoints []string,  // A list of kv store endpoints
  options map[string]string, // Options that need to be passed to the kv store
  panicHandler kvdb.FatalErrorCB, // A callback function to execute when the library needs to panic
) (kvdb.Kvdb, error) {

	kv, err := kvdb.New(
		kvdbName,
		basePath,
		discoveryEndpoints,
		options,
		panicHandler,
	)
  return kv, err

}

type A struct {
   a1 string
   a2 int
}

func main() {

  // An example kvdb using etcd v3 as a key value store
  kv, err := getKvdb(
    v3.Name,
    "root/",
    []{"127.0.0.1:2379"},
    nil,
    dbg.Panicf,
  )
  if err != nil {
    fmt.Println("Failed to create a kvdb instance: ", err)
    return
  }

  // Put a key value pair foo=bar
  a := &A{"bar", 1}
  _, err = kv.Put("foo", &a, 0)
  if err != nil {
    fmt.Println("Failed to put a key in kvdb: ", err)
    return
  }

  // Get a key
  value := A{}
  _, err = kv.GetVal("foo", &value)
  if err != nil {
    fmt.Println("Failed to get a key from kvdb: ", err)
    return
  }
}

Contributing

We are always looking for contributions from the open source community. Send out a PR and we will review it.

License

kvdb library is licensed under the Apache License 2.0

Documentation

Index

Constants

View Source
const (
	// ReadPermission for read only access
	ReadPermission = iota
	// WritePermission for write only access
	WritePermission
	// ReadWritePermission for read-write access
	ReadWritePermission
)
View Source
const (
	// UsernameKey for an authenticated kvdb endpoint
	UsernameKey = "Username"
	// PasswordKey for an authenticated kvdb endpoint
	PasswordKey = "Password"
	// CAFileKey is the CA file path for an authenticated kvdb endpoint
	CAFileKey = "CAFile"
	// CertFileKey is the certificate file path for an authenticated kvdb endpoint
	CertFileKey = "CertFile"
	// CertKeyFileKey is the key to the certificate
	CertKeyFileKey = "CertKeyFile"
	// TrustedCAFileKey is the key for the trusted CA.
	TrustedCAFileKey = "TrustedCAFile"
	// ClientCertAuthKey is the boolean value indicating client authenticated certificate.
	ClientCertAuthKey = "ClientCertAuth"
	// RetryCountKey is the integer value indicating the retry count of etcd operations
	RetryCountKey = "RetryCount"
	// ACLTokenKey is the token value for ACL based KV stores
	ACLTokenKey = "ACLToken"
	// CAAuthAddress is the address of CA signing authority (required in consul TLS config)
	CAAuthAddress = "CAAuthAddress"
	// InsecureSkipVerify has a value true or false (required in consul TLS config)
	InsecureSkipVerify = "InsecureSkipVerify"
	// TransportScheme points to http transport being either http or https.
	TransportScheme = "TransportScheme"

	// MaxCallSendMsgSize is the client-side request send limit in bytes.
	// If 0, it defaults to 2.0 MiB (2 * 1024 * 1024).
	// Make sure that "MaxCallSendMsgSize" < server-side default send/recv limit.
	// ("--max-request-bytes" flag to etcd or "embed.Config.MaxRequestBytes").
	MaxCallSendMsgSize = "MaxCallSendMsgSize"

	// MaxCallRecvMsgSize is the client-side response receive limit.
	// If 0, it defaults to "math.MaxInt32", because range response can
	// easily exceed request send limits.
	// Make sure that "MaxCallRecvMsgSize" >= server-side default send/recv limit.
	// ("--max-request-bytes" flag to etcd or "embed.Config.MaxRequestBytes").
	MaxCallRecvMsgSize = "MaxCallRecvMsgSize"
)
View Source
const (
	// ConsulVersion1 key
	ConsulVersion1 = "consulv1"
	// EtcdBaseVersion key
	EtcdBaseVersion = "etcd"
	// EtcdVersion3 key
	EtcdVersion3 = "etcdv3"
	// MemVersion1 key
	MemVersion1 = "memv1"
	// BoltVersion1 key
	BoltVersion1 = "boltv1"
	// ZookeeperVersion1 key
	ZookeeperVersion1 = "zookeeperv1"
)

List of kvdb endpoints supported versions

View Source
const (
	// DefaultLockTryDuration is the maximum time spent trying to acquire lock
	DefaultLockTryDuration = 300 * time.Second
	// DefaultSeparator separate key components
	DefaultSeparator = "/"
)
View Source
const (
	// PeerPort is the port on which peers identify themselves
	PeerPort = "2380"
	// ClientPort is the port on which clients send requests to kvdb.
	ClientPort = "2379"
)

List of kvdb controller ports

View Source
const (
	// KVCapabilityOrderedUpdates support requires watch to send an watch update
	// for every put - instead of coalescing multiple puts in one update.
	KVCapabilityOrderedUpdates = 1 << iota
)

Variables

View Source
var (
	// ErrNotSupported implemenation of a specific function is not supported.
	ErrNotSupported = errors.New("implementation not supported")
	// ErrWatchStopped is raised when user stops watch.
	ErrWatchStopped = errors.New("Watch Stopped")
	// ErrNotFound raised if Key is not found
	ErrNotFound = errors.New("Key not found")
	// ErrExist raised if key already exists
	ErrExist = errors.New("Key already exists")
	// ErrUnmarshal raised if Get fails to unmarshal value.
	ErrUnmarshal = errors.New("Failed to unmarshal value")
	// ErrIllegal raised if object is not valid.
	ErrIllegal = errors.New("Illegal operation")
	// ErrValueMismatch raised if existing KVDB value mismatches with user provided value
	ErrValueMismatch = errors.New("Value mismatch")
	// ErrEmptyValue raised if the value is empty
	ErrEmptyValue = errors.New("Value cannot be empty")
	// ErrModified raised during an atomic operation if the index does not match the one in the store
	ErrModified = errors.New("Key Index mismatch")
	// ErrSetTTLFailed raised if unable to set ttl value for a key create/put/update action
	ErrSetTTLFailed = errors.New("Unable to set ttl value")
	// ErrTTLNotSupported if kvdb implementation doesn't support TTL
	ErrTTLNotSupported = errors.New("TTL value not supported")
	// ErrInvalidLock Lock and unlock operations don't match.
	ErrInvalidLock = errors.New("Invalid lock/unlock operation")
	// ErrNoPassword provided
	ErrNoPassword = errors.New("Username provided without any password")
	// ErrAuthNotSupported authentication not supported for this kvdb implementation
	ErrAuthNotSupported = errors.New("Kvdb authentication not supported")
	// ErrNoCertificate no certificate provided for authentication
	ErrNoCertificate = errors.New("Certificate File Path not provided")
	// ErrUnknownPermission raised if unknown permission type
	ErrUnknownPermission = errors.New("Unknown Permission Type")
	// ErrMemberDoesNotExist returned when an operation fails for a member
	// which does not exist
	ErrMemberDoesNotExist = errors.New("Kvdb member does not exist")
)
View Source
var (
	// ControllerNotSupported is a null controller implementation. This can be used
	// kvdb implementors that do no want to implement the controller interface
	ControllerNotSupported = &controllerNotSupported{}
)

Functions

func Register

func Register(name string, dsInit DatastoreInit, dsVersion DatastoreVersion) error

Register adds specified datastore backend to the list of options.

func SetInstance

func SetInstance(kvdb Kvdb) error

SetInstance sets the singleton instance.

func Version

func Version(name string, url string, kvdbOptions map[string]string) (string, error)

Version returns the supported version for the provided kvdb endpoint.

Types

type Controller

type Controller interface {
	// AddMember adds a new member to an existing kvdb cluster. Add should be
	// called on a kvdb node where kvdb is already running. It should be
	// followed by a Setup call on the actual node
	// Returns: map of nodeID to peerUrls of all members in the initial cluster or error
	AddMember(nodeIP, nodePeerPort, nodeName string) (map[string][]string, error)

	// RemoveMember removes a member from an existing kvdb cluster
	// Returns: error if it fails to remove a member
	RemoveMember(nodeName, nodeIP string) error

	// UpdateMember updates the IP for the given node in an existing kvdb cluster
	// Returns: map of nodeID to peerUrls of all members from the existing cluster
	UpdateMember(nodeIP, nodePeerPort, nodeName string) (map[string][]string, error)

	// ListMembers enumerates the members of the kvdb cluster
	// Returns: the nodeID  to memberInfo mappings of all the members
	ListMembers() (map[string]*MemberInfo, error)

	// SetEndpoints set the kvdb endpoints for the client
	SetEndpoints(endpoints []string) error

	// GetEndpoints returns the kvdb endpoints for the client
	GetEndpoints() []string

	// Defragment defrags the underlying database for the given endpoint
	// with a timeout specified in seconds
	Defragment(endpoint string, timeout int) error
}

Controller interface provides APIs to manage Kvdb Cluster and Kvdb Clients.

type CopySelect

type CopySelect func(val interface{}) interface{}

CopySelect function is a callback function provided to EnumerateWithSelect API This fn should perform a deep copy of the input interface and return the copy

type DatastoreInit

type DatastoreInit func(domain string, machines []string, options map[string]string,
	cb FatalErrorCB) (Kvdb, error)

DatastoreInit is called to activate a backend KV store.

type DatastoreVersion

type DatastoreVersion func(url string, kvdbOptions map[string]string) (string, error)

DatastoreVersion is called to get the version of a backend KV store

type EnumerateSelect

type EnumerateSelect func(val interface{}) bool

EnumerateSelect function is a callback function provided to EnumerateWithSelect API This fn is executed over all the keys and only those values are returned by Enumerate for which this function return true.

type FatalErrorCB

type FatalErrorCB func(format string, args ...interface{})

FatalErrorCB callback is invoked incase of fatal errors

type KVAction

type KVAction int

KVAction specifies the action on a KV pair. This is useful to make decisions from the results of a Watch.

const (
	// KVSet signifies the KV was modified.
	KVSet KVAction = 1 << iota
	// KVCreate set if the KV pair was created.
	KVCreate
	// KVGet set when the key is fetched from the KV store
	KVGet
	// KVDelete set when the key is deleted from the KV store
	KVDelete
	// KVExpire set when the key expires
	KVExpire
	// KVUknown operation on KV pair
	KVUknown
)

type KVFlags

type KVFlags uint64

KVFlags options for operations on KVDB

const (
	// KVPrevExists flag to check key already exists
	KVPrevExists KVFlags = 1 << iota
	// KVCreatedIndex flag compares with passed in index (possibly in KVPair)
	KVCreatedIndex
	// KVModifiedIndex flag compares with passed in index (possibly in KVPair)
	KVModifiedIndex
	// KVTTL uses TTL val from KVPair.
	KVTTL
)

type KVPair

type KVPair struct {
	// Key for this kv pair.
	Key string
	// Value for this kv pair
	Value []byte
	// Action the last action on this KVPair.
	Action KVAction
	// TTL value after which this key will expire from KVDB
	TTL int64
	// KVDBIndex A Monotonically index updated at each modification operation.
	KVDBIndex uint64
	// CreatedIndex for this kv pair
	CreatedIndex uint64
	// ModifiedIndex for this kv pair
	ModifiedIndex uint64
	// Lock is a generic interface to represent a lock held on a key.
	Lock interface{}
}

KVPair represents the results of an operation on KVDB.

type KVPairs

type KVPairs []*KVPair

KVPairs list of KVPairs

type Kvdb

type Kvdb interface {
	Controller
	// String representation of backend datastore.
	String() string
	// Capbilities - see KVCapabilityXXX
	Capabilities() int
	// Get returns KVPair that maps to specified key or ErrNotFound.
	Get(key string) (*KVPair, error)
	// Get returns KVPair that maps to specified key or ErrNotFound. If found
	// value contains the unmarshalled result or error is ErrUnmarshal
	GetVal(key string, value interface{}) (*KVPair, error)
	// GetWithCopy returns a copy of the value as an interface for the specified key
	GetWithCopy(key string, copySelect CopySelect) (interface{}, error)
	// Put inserts value at key in kvdb. If value is a runtime.Object, it is
	// marshalled. If Value is []byte it is set directly. If Value is a string,
	// its byte representation is stored.
	Put(key string, value interface{}, ttl uint64) (*KVPair, error)
	// Create is the same as Put except that ErrExist is returned if the key exists.
	Create(key string, value interface{}, ttl uint64) (*KVPair, error)
	// Update is the same as Put except that ErrNotFound is returned if the key
	// does not exist.
	Update(key string, value interface{}, ttl uint64) (*KVPair, error)
	// Enumerate returns a list of KVPair for all keys that share the specified prefix.
	Enumerate(prefix string) (KVPairs, error)
	// EnumerateWithSelect returns a copy of all values under the prefix that satisfy the select
	// function in the provided output array of interfaces
	EnumerateWithSelect(prefix string, enumerateSelect EnumerateSelect, copySelect CopySelect) ([]interface{}, error)
	// Delete deletes the KVPair specified by the key. ErrNotFound is returned
	// if the key is not found. The old KVPair is returned if successful.
	Delete(key string) (*KVPair, error)
	// DeleteTree same as Delete execpt that all keys sharing the prefix are
	// deleted.
	DeleteTree(prefix string) error
	// Keys returns an array of keys that share specified prefix (ie. "1st level directory").
	// sep parameter defines a key-separator, and if not provided the "/" is assumed.
	Keys(prefix, sep string) ([]string, error)
	// CompareAndSet updates value at kvp.Key if the previous resident
	// satisfies conditions set in flags and optional prevValue.
	CompareAndSet(kvp *KVPair, flags KVFlags, prevValue []byte) (*KVPair, error)
	// CompareAndDelete deletes value at kvp.Key if the previous resident matches
	// satisfies conditions set in flags.
	CompareAndDelete(kvp *KVPair, flags KVFlags) (*KVPair, error)
	// WatchKey calls watchCB everytime a value at key is updated. waitIndex
	// is the oldest ModifiedIndex of a KVPair for which updates are requestd.
	WatchKey(key string, waitIndex uint64, opaque interface{}, watchCB WatchCB) error
	// WatchTree is the same as WatchKey except that watchCB is triggered
	// for updates on all keys that share the prefix.
	WatchTree(prefix string, waitIndex uint64, opaque interface{}, watchCB WatchCB) error
	// Snapshot returns a kvdb snapshot of the provided list of prefixes and the last updated index.
	// If no prefixes are provided, then the whole kvdb tree is snapshotted and could be potentially an expensive operation
	// If consistent is true, then snapshot is going to return all the updates happening during the snapshot operation and the last
	// updated index from the snapshot
	Snapshot(prefixes []string, consistent bool) (Kvdb, uint64, error)
	// SnapPut records the key value pair including the index.
	SnapPut(kvp *KVPair) (*KVPair, error)
	// Lock specfied key and associate a lockerID with it, probably to identify
	// who acquired the lock. The KVPair returned should be used to unlock.
	LockWithID(key string, lockerID string) (*KVPair, error)
	// Lock specfied key. The KVPair returned should be used to unlock.
	Lock(key string) (*KVPair, error)
	// Lock with specified key and associate a lockerID with it.
	// lockTryDuration is the maximum time that can be spent trying to acquire
	// lock, else return error.
	// lockHoldDuration is the maximum time the lock can be held, after which
	// FatalCb is invoked.
	// The KVPair returned should be used to unlock if successful.
	LockWithTimeout(key string, lockerID string, lockTryDuration time.Duration,
		lockHoldDuration time.Duration) (*KVPair, error)
	// Unlock kvp previously acquired through a call to lock.
	Unlock(kvp *KVPair) error
	// TxNew returns a new Tx coordinator object or ErrNotSupported
	TxNew() (Tx, error)
	// AddUser adds a new user to kvdb
	AddUser(username string, password string) error
	// RemoveUser removes a user from kvdb
	RemoveUser(username string) error
	// GrantUserAccess grants user access to a subtree/prefix based on the permission
	GrantUserAccess(username string, permType PermissionType, subtree string) error
	// RevokeUsersAccess revokes user's access to a subtree/prefix based on the permission
	RevokeUsersAccess(username string, permType PermissionType, subtree string) error
	// SetFatalCb sets the function to be called in case of fatal errors
	SetFatalCb(f FatalErrorCB)
	// SetLockTimeout sets maximum time a lock may be held
	SetLockTimeout(timeout time.Duration)
	// GetLockTimeout gets the currently set lock timeout
	GetLockTimeout() time.Duration
	// Serialize serializes all the keys under the domain and returns a byte array
	Serialize() ([]byte, error)
	// Deserialize deserializes the given byte array into a list of kv pairs
	Deserialize([]byte) (KVPairs, error)
}

Kvdb interface implemented by backing datastores.

func Instance

func Instance() Kvdb

Instance returns instance set via SetInstance, nil if none was set.

func New

func New(
	name string,
	domain string,
	machines []string,
	options map[string]string,
	errorCB FatalErrorCB,
) (Kvdb, error)

New return a new instance of KVDB as specified by datastore name. If domain is set all requests to KVDB are prefixed by domain. options is interpreted by backend KVDB.

type MemberInfo

type MemberInfo struct {
	PeerUrls   []string
	ClientUrls []string
	Leader     bool
	DbSize     int64
	IsHealthy  bool
	ID         string
}

MemberInfo represents a member of the kvdb cluster

type PermissionType

type PermissionType int

PermissionType for user access

type ReplayCb

type ReplayCb struct {
	// Prefix is the watch key/tree prefix
	Prefix string
	// WaitIndex is the index after which updates must be returned
	WaitIndex uint64
	// Opaque is a hint returned by the caller
	Opaque interface{}
	// WatchCB is the watch callback
	WatchCB WatchCB
}

ReplayCb provides info required for replay

type Tx

type Tx interface {
	// Put specified key value pair in TX.
	Put(key string, value interface{}, ttl uint64) (*KVPair, error)
	// Get returns KVPair in this TXs view. If not found, returns value from
	// backing KVDB.
	Get(key string) (*KVPair, error)
	// Get same as get except that value has the unmarshalled value.
	GetVal(key string, value interface{}) (*KVPair, error)
	// Prepare returns an error it transaction cannot be logged.
	Prepare() error
	// Commit propagates updates to the KVDB. No operations on this Tx are
	// allowed after commit.
	Commit() error
	// Abort aborts this transaction.  No operations on this Tx are allowed
	// afer commit.
	Abort() error
}

Tx Interface to transactionally apply updates to a set of keys.

type UpdatesCollector

type UpdatesCollector interface {
	// Stop collecting updates
	Stop()
	// ReplayUpdates replays the collected updates.
	// Returns the version until the replay's were done
	// and any errors it encountered.
	ReplayUpdates(updateCb []ReplayCb) (uint64, error)
}

UpdatesCollector collects updates from kvdb.

func NewUpdatesCollector

func NewUpdatesCollector(
	db Kvdb,
	prefix string,
	startIndex uint64,
) (UpdatesCollector, error)

NewUpdatesCollector creates new Kvdb collector that collects updates starting at startIndex + 1 index.

type WatchCB

type WatchCB func(prefix string, opaque interface{}, kvp *KVPair, err error) error

WatchCB is called when a watched key or tree is modified. If the callback returns an error, then watch stops and the cb is called one last time with ErrWatchStopped.

Directories

Path Synopsis
Package consul implements the KVDB interface based on consul.
Package consul implements the KVDB interface based on consul.
etcd
v2
v3

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL