store

Version: v0.0.0-...-18a1b88

This package is not in the latest version of its module.

Published: Jun 8, 2015 License: Apache-2.0 Imports: 12 Imported by: 0

README

libkv

libkv provides a Go native library to store metadata.

The goal of libkv is to abstract common store operations across multiple Key/Value backends and to offer the same experience no matter which backend you use.

For example, you can use it to store your metadata or for service discovery to register machines and endpoints inside your cluster.

You can also easily implement a generic Leader Election on top of it (see the swarm/leadership package).

As of now, libkv offers support for Consul, Etcd and Zookeeper.

Example of usage

Create a new store and use Put/Get
package main

import (
	"time"

	log "github.com/Sirupsen/logrus"
	"github.com/docker/swarm/store"
)

func main() {
	client := "localhost:8500"

	// Initialize a new store with consul
	kv, err := store.NewStore(
		store.CONSUL, // or "consul"
		[]string{client},
		&store.Config{
			ConnectionTimeout: 10 * time.Second,
		},
	)
	if err != nil {
		// Without a store there is nothing left to do.
		log.Error("Cannot create store consul: ", err)
		return
	}
	defer kv.Close()

	key := "foo"
	err = kv.Put(key, []byte("bar"), nil)
	if err != nil {
		log.Error("Error trying to put value at key `", key, "`")
		return
	}

	pair, err := kv.Get(key)
	if err != nil {
		// pair is not usable when Get fails, so stop before reading it.
		log.Error("Error trying to access value at key `", key, "`")
		return
	}

	log.Info("value: ", string(pair.Value))
}

Details

You should expect the same experience for basic operations like Get/Put, etc.

However, calls like WatchTree are limited to the common denominator: you should only expect events when nodes are added or deleted. Etcd and Consul will likely return more events, which you should triage yourself.
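One way to triage WatchTree events is to compare each snapshot with the previous one and react only to keys that appeared or disappeared. The sketch below is an illustration, not part of the library: `diffKeys` is a hypothetical helper, and `KVPair` is redeclared locally (mirroring the package's type) so that the example compiles on its own.

```go
package main

import (
	"fmt"
	"sort"
)

// KVPair mirrors the shape of store.KVPair so the sketch is self-contained.
type KVPair struct {
	Key       string
	Value     []byte
	LastIndex uint64
}

// diffKeys is a hypothetical helper. It compares two WatchTree snapshots and
// reports which keys were added and which were removed. Keys present in both
// snapshots are ignored, even if their value changed.
func diffKeys(prev, cur []*KVPair) (added, removed []string) {
	old := make(map[string]bool, len(prev))
	for _, p := range prev {
		old[p.Key] = true
	}
	for _, c := range cur {
		if old[c.Key] {
			delete(old, c.Key) // still present: neither added nor removed
		} else {
			added = append(added, c.Key)
		}
	}
	for k := range old {
		removed = append(removed, k) // whatever is left was not in cur
	}
	sort.Strings(added)
	sort.Strings(removed)
	return added, removed
}

func main() {
	prev := []*KVPair{{Key: "nodes/a"}, {Key: "nodes/b"}}
	cur := []*KVPair{{Key: "nodes/b", Value: []byte("updated")}, {Key: "nodes/c"}}

	added, removed := diffKeys(prev, cur)
	fmt.Println("added:", added, "removed:", removed)
}
```

In a real consumer, `prev` would be the last slice received from the WatchTree channel and `cur` the newest one.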

Create a new storage backend

A new storage backend should implement the following calls:

type Store interface {
	Put(key string, value []byte, options *WriteOptions) error
	Get(key string) (*KVPair, error)
	Delete(key string) error
	Exists(key string) (bool, error)
	Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, error)
	WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*KVPair, error)
	NewLock(key string, options *LockOptions) (Locker, error)
	List(prefix string) ([]*KVPair, error)
	DeleteTree(prefix string) error
	AtomicPut(key string, value []byte, previous *KVPair, options *WriteOptions) (bool, *KVPair, error)
	AtomicDelete(key string, previous *KVPair) (bool, error)
	Close()
}

For Swarm, a K/V store implementation that only needs to be eligible as a discovery backend should offer at least Get, Put, WatchTree and List.

Put should support a TTL so that entries can be removed when a node fails.

You can take inspiration from the existing backends when creating a new one. This interface may change in order to improve the experience of using the library and of contributing a new backend.

Future

A few points on the ROADMAP:

  • Make the API nicer to use
  • Improve performance (remove extra Get operations)
  • Provide more options (consistency, for example)
  • Add more exhaustive tests
  • New backends?

Contributing

Want to hack on libkv? Docker's contribution guidelines apply.

Copyright and license

Code and documentation copyright 2015 Docker, Inc. Code released under the Apache 2.0 license. Docs released under Creative Commons.

Documentation


Constants

const (
	// MOCK backend
	MOCK Backend = "mock"
	// CONSUL backend
	CONSUL = "consul"
	// ETCD backend
	ETCD = "etcd"
	// ZK backend
	ZK = "zk"
)
const (
	// DefaultWatchWaitTime is how long we block for at a time to check if the
	// watched key has changed.  This affects the minimum time it takes to
	// cancel a watch.
	DefaultWatchWaitTime = 15 * time.Second
)

Variables

var (
	// ErrInvalidTTL is a specific error to consul
	ErrInvalidTTL = errors.New("Invalid TTL, please change the value to the miminum allowed ttl for the chosen store")
	// ErrNotSupported is exported
	ErrNotSupported = errors.New("Backend storage not supported yet, please choose another one")
	// ErrNotImplemented is exported
	ErrNotImplemented = errors.New("Call not implemented in current backend")
	// ErrNotReachable is exported
	ErrNotReachable = errors.New("Api not reachable")
	// ErrCannotLock is exported
	ErrCannotLock = errors.New("Error acquiring the lock")
	// ErrWatchDoesNotExist is exported
	ErrWatchDoesNotExist = errors.New("No watch found for specified key")
	// ErrKeyModified is exported
	ErrKeyModified = errors.New("Unable to complete atomic operation, key modified")
	// ErrKeyNotFound is exported
	ErrKeyNotFound = errors.New("Key not found in store")
	// ErrPreviousNotSpecified is exported
	ErrPreviousNotSpecified = errors.New("Previous K/V pair should be provided for the Atomic operation")
)
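Because these errors are exported package-level values, callers can compare against them to tell expected conditions apart from real failures. The sketch below assumes a backend reports a missing key by returning `ErrKeyNotFound` from Get; `getOrDefault`, `getter` and `mapStore` are hypothetical names, and the error and `KVPair` are redeclared locally so the example is self-contained.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-in for store.ErrKeyNotFound.
var ErrKeyNotFound = errors.New("Key not found in store")

// KVPair mirrors the shape of store.KVPair.
type KVPair struct {
	Key       string
	Value     []byte
	LastIndex uint64
}

// getter is the subset of the Store interface this sketch needs.
type getter interface {
	Get(key string) (*KVPair, error)
}

// getOrDefault is a hypothetical helper: a missing key yields def, while any
// other error is passed through to the caller.
func getOrDefault(s getter, key string, def []byte) ([]byte, error) {
	pair, err := s.Get(key)
	if errors.Is(err, ErrKeyNotFound) {
		return def, nil
	}
	if err != nil {
		return nil, err
	}
	return pair.Value, nil
}

// mapStore is a toy getter used only to exercise the helper.
type mapStore map[string][]byte

func (m mapStore) Get(key string) (*KVPair, error) {
	v, ok := m[key]
	if !ok {
		return nil, ErrKeyNotFound
	}
	return &KVPair{Key: key, Value: v}, nil
}

func main() {
	s := mapStore{"foo": []byte("bar")}

	v, _ := getOrDefault(s, "foo", []byte("fallback"))
	fmt.Println(string(v))

	v, _ = getOrDefault(s, "missing", []byte("fallback"))
	fmt.Println(string(v))
}
```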

Functions

This section is empty.

Types

type Backend

type Backend string

Backend represents a KV Store Backend

type Config

type Config struct {
	TLS               *tls.Config
	ConnectionTimeout time.Duration
	EphemeralTTL      time.Duration
}

Config contains the options for a storage client

type Consul

type Consul struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Consul embeds the client and watches

func (*Consul) AtomicDelete

func (s *Consul) AtomicDelete(key string, previous *KVPair) (bool, error)

AtomicDelete deletes the value at "key" if the key has not been modified in the meantime, and returns an error if it has been modified

func (*Consul) AtomicPut

func (s *Consul) AtomicPut(key string, value []byte, previous *KVPair, options *WriteOptions) (bool, *KVPair, error)

AtomicPut puts a value at "key" if the key has not been modified in the meantime, and returns an error if it has been modified

func (*Consul) Close

func (s *Consul) Close()

Close closes the client connection

func (*Consul) Delete

func (s *Consul) Delete(key string) error

Delete a value at "key"

func (*Consul) DeleteTree

func (s *Consul) DeleteTree(prefix string) error

DeleteTree deletes a range of keys based on prefix

func (*Consul) Exists

func (s *Consul) Exists(key string) (bool, error)

Exists checks that the key exists inside the store

func (*Consul) Get

func (s *Consul) Get(key string) (*KVPair, error)

Get the value at "key", returns the last modified index to use in conjunction with CAS calls

func (*Consul) List

func (s *Consul) List(prefix string) ([]*KVPair, error)

List the content of a given prefix

func (*Consul) NewLock

func (s *Consul) NewLock(key string, options *LockOptions) (Locker, error)

NewLock returns a handle to a lock struct which can be used to acquire and release the mutex.

func (*Consul) Put

func (s *Consul) Put(key string, value []byte, opts *WriteOptions) error

Put a value at "key"

func (*Consul) Watch

func (s *Consul) Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, error)

Watch changes on a key. Returns a channel that will receive changes or an error. Upon creating a watch, the current value will be sent to the channel. Providing a non-nil stopCh can be used to stop watching.
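The consuming side of a watch can be sketched as follows. Since a real backend is not available in a standalone example, `fakeWatch` is a hypothetical stand-in that follows the documented behavior (current value first, then changes). That it closes its output channel once `stopCh` is closed is an assumption of this sketch, as is stopping the watch by closing `stopCh`.

```go
package main

import "fmt"

// KVPair mirrors the shape of store.KVPair.
type KVPair struct {
	Key       string
	Value     []byte
	LastIndex uint64
}

// fakeWatch is a hypothetical stand-in for Store.Watch. It sends the current
// pair first, forwards each update, and closes the output once stopCh closes.
func fakeWatch(current *KVPair, updates <-chan *KVPair, stopCh <-chan struct{}) <-chan *KVPair {
	out := make(chan *KVPair)
	go func() {
		defer close(out)
		// send delivers one pair unless the watch is stopped first.
		send := func(p *KVPair) bool {
			select {
			case out <- p:
				return true
			case <-stopCh:
				return false
			}
		}
		if !send(current) {
			return
		}
		for {
			select {
			case p := <-updates:
				if !send(p) {
					return
				}
			case <-stopCh:
				return
			}
		}
	}()
	return out
}

// collect reads n events from a watch channel, then closes stopCh to cancel
// the watch.
func collect(events <-chan *KVPair, stopCh chan struct{}, n int) []string {
	var values []string
	for p := range events {
		values = append(values, string(p.Value))
		if len(values) == n {
			close(stopCh)
			break
		}
	}
	return values
}

func main() {
	updates := make(chan *KVPair, 2)
	updates <- &KVPair{Key: "foo", Value: []byte("v2")}
	updates <- &KVPair{Key: "foo", Value: []byte("v3")}

	stopCh := make(chan struct{})
	events := fakeWatch(&KVPair{Key: "foo", Value: []byte("v1")}, updates, stopCh)
	fmt.Println(collect(events, stopCh, 3))
}
```

With a real store, the call would be `kv.Watch(key, stopCh)` and the error return would need checking before ranging over the channel.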

func (*Consul) WatchTree

func (s *Consul) WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*KVPair, error)

WatchTree watches changes on a "directory". Returns a channel that will receive changes or an error. Upon creating a watch, the current value will be sent to the channel. Providing a non-nil stopCh can be used to stop watching.

type Etcd

type Etcd struct {
	// contains filtered or unexported fields
}

Etcd embeds the client

func (*Etcd) AtomicDelete

func (s *Etcd) AtomicDelete(key string, previous *KVPair) (bool, error)

AtomicDelete deletes the value at "key" if the key has not been modified in the meantime, and returns an error if it has been modified

func (*Etcd) AtomicPut

func (s *Etcd) AtomicPut(key string, value []byte, previous *KVPair, options *WriteOptions) (bool, *KVPair, error)

AtomicPut puts a value at "key" if the key has not been modified in the meantime, and returns an error if it has been modified

func (*Etcd) Close

func (s *Etcd) Close()

Close closes the client connection

func (*Etcd) Delete

func (s *Etcd) Delete(key string) error

Delete a value at "key"

func (*Etcd) DeleteTree

func (s *Etcd) DeleteTree(prefix string) error

DeleteTree deletes a range of keys based on prefix

func (*Etcd) Exists

func (s *Etcd) Exists(key string) (bool, error)

Exists checks if the key exists inside the store

func (*Etcd) Get

func (s *Etcd) Get(key string) (*KVPair, error)

Get the value at "key", returns the last modified index to use in conjunction with CAS calls

func (*Etcd) List

func (s *Etcd) List(prefix string) ([]*KVPair, error)

List the content of a given prefix

func (*Etcd) NewLock

func (s *Etcd) NewLock(key string, options *LockOptions) (Locker, error)

NewLock returns a handle to a lock struct which can be used to acquire and release the mutex.

func (*Etcd) Put

func (s *Etcd) Put(key string, value []byte, opts *WriteOptions) error

Put a value at "key"

func (*Etcd) Watch

func (s *Etcd) Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, error)

Watch changes on a key. Returns a channel that will receive changes or an error. Upon creating a watch, the current value will be sent to the channel. Providing a non-nil stopCh can be used to stop watching.

func (*Etcd) WatchTree

func (s *Etcd) WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*KVPair, error)

WatchTree watches changes on a "directory". Returns a channel that will receive changes or an error. Upon creating a watch, the current value will be sent to the channel. Providing a non-nil stopCh can be used to stop watching.

type Initialize

type Initialize func(addrs []string, options *Config) (Store, error)

Initialize creates a new Store object, initializing the client

type KVPair

type KVPair struct {
	Key       string
	Value     []byte
	LastIndex uint64
}

KVPair represents a {Key, Value, LastIndex} tuple
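LastIndex is what makes compare-and-swap possible: a caller reads a pair, computes a new value, and hands the pair it read back to AtomicPut as `previous`. The sketch below shows that read-modify-write loop against a hypothetical in-memory `casStore`. Its AtomicPut drops the options parameter for brevity, and its exact error behavior (comparing LastIndex and returning `ErrKeyModified` on mismatch) is an assumption of the sketch, not a description of any real backend.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
	"sync"
)

// Local stand-ins for the package's exported errors.
var (
	ErrKeyModified          = errors.New("Unable to complete atomic operation, key modified")
	ErrKeyNotFound          = errors.New("Key not found in store")
	ErrPreviousNotSpecified = errors.New("Previous K/V pair should be provided for the Atomic operation")
)

// KVPair mirrors the shape of store.KVPair.
type KVPair struct {
	Key       string
	Value     []byte
	LastIndex uint64
}

// casStore is a hypothetical in-memory store used to demonstrate CAS.
type casStore struct {
	mu    sync.Mutex
	index uint64
	data  map[string]KVPair
}

func newCASStore() *casStore { return &casStore{data: make(map[string]KVPair)} }

func (s *casStore) Put(key string, value []byte) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.index++
	s.data[key] = KVPair{Key: key, Value: value, LastIndex: s.index}
}

func (s *casStore) Get(key string) (*KVPair, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	p, ok := s.data[key]
	if !ok {
		return nil, ErrKeyNotFound
	}
	return &p, nil
}

// AtomicPut writes value only if the key still has previous.LastIndex.
func (s *casStore) AtomicPut(key string, value []byte, previous *KVPair) (bool, *KVPair, error) {
	if previous == nil {
		return false, nil, ErrPreviousNotSpecified
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	cur, ok := s.data[key]
	if !ok {
		return false, nil, ErrKeyNotFound
	}
	if cur.LastIndex != previous.LastIndex {
		return false, nil, ErrKeyModified
	}
	s.index++
	next := KVPair{Key: key, Value: value, LastIndex: s.index}
	s.data[key] = next
	return true, &next, nil
}

// increment retries the read-modify-write cycle until its write wins.
func increment(s *casStore, key string) (int, error) {
	for {
		pair, err := s.Get(key)
		if err != nil {
			return 0, err
		}
		n, err := strconv.Atoi(string(pair.Value))
		if err != nil {
			return 0, err
		}
		_, _, err = s.AtomicPut(key, []byte(strconv.Itoa(n+1)), pair)
		if err == nil {
			return n + 1, nil
		}
		if !errors.Is(err, ErrKeyModified) {
			return 0, err
		}
		// Another writer got there first: re-read and try again.
	}
}

func main() {
	s := newCASStore()
	s.Put("counter", []byte("0"))

	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			increment(s, "counter")
		}()
	}
	wg.Wait()

	pair, _ := s.Get("counter")
	fmt.Println("counter:", string(pair.Value))
}
```

Without the LastIndex check, two concurrent increments could both read the same value and one update would be lost.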

type LockOptions

type LockOptions struct {
	Value []byte        // Optional, value to associate with the lock
	TTL   time.Duration // Optional, expiration ttl associated with the lock
}

LockOptions contains optional request parameters

type Locker

type Locker interface {
	Lock() (<-chan struct{}, error)
	Unlock() error
}

Locker provides a locking mechanism on top of the store. Similar to `sync.Locker`, except that its methods may return errors.
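To make the shape of the interface concrete, here is a small in-process implementation and a helper that runs a function while holding the lock. This is only a sketch: `memLock` and `withLock` are hypothetical, and the meaning given to the channel returned by Lock (closed once the lock is no longer held) is an assumption, since the text here does not specify it.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Locker is copied from the package so the sketch is self-contained.
type Locker interface {
	Lock() (<-chan struct{}, error)
	Unlock() error
}

var errNotHeld = errors.New("lock is not held")

// memLock is a hypothetical in-process Locker.
type memLock struct {
	sem  chan struct{} // capacity 1: holding the token means holding the lock
	mu   sync.Mutex    // protects lost
	lost chan struct{} // closed when the current holder releases the lock
}

func newMemLock() *memLock { return &memLock{sem: make(chan struct{}, 1)} }

// Lock blocks until the lock is acquired. The returned channel is closed when
// the lock is released (assumed semantics).
func (l *memLock) Lock() (<-chan struct{}, error) {
	l.sem <- struct{}{} // blocks while another holder has the token
	l.mu.Lock()
	defer l.mu.Unlock()
	l.lost = make(chan struct{})
	return l.lost, nil
}

// Unlock releases the lock, or returns an error if it is not held.
func (l *memLock) Unlock() error {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.lost == nil {
		return errNotHeld
	}
	close(l.lost)
	l.lost = nil
	<-l.sem
	return nil
}

// withLock runs fn while holding l and reports any locking error.
func withLock(l Locker, fn func()) error {
	if _, err := l.Lock(); err != nil {
		return err
	}
	fn()
	return l.Unlock()
}

func main() {
	l := newMemLock()
	counter := 0

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if err := withLock(l, func() { counter++ }); err != nil {
				fmt.Println("lock error:", err)
			}
		}()
	}
	wg.Wait()
	fmt.Println("counter:", counter)
}
```

With a real store, the Locker would come from `NewLock(key, options)` and both Lock and Unlock could fail because of network errors, which is why the helper propagates them.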

type Mock

type Mock struct {
	mock.Mock

	// Endpoints passed to InitializeMock
	Endpoints []string
	// Options passed to InitializeMock
	Options *Config
}

Mock store. Mocks all Store functions using testify.Mock.

func (*Mock) AtomicDelete

func (s *Mock) AtomicDelete(key string, previous *KVPair) (bool, error)

AtomicDelete mock

func (*Mock) AtomicPut

func (s *Mock) AtomicPut(key string, value []byte, previous *KVPair, opts *WriteOptions) (bool, *KVPair, error)

AtomicPut mock

func (*Mock) Close

func (s *Mock) Close()

Close mock

func (*Mock) Delete

func (s *Mock) Delete(key string) error

Delete mock

func (*Mock) DeleteTree

func (s *Mock) DeleteTree(prefix string) error

DeleteTree mock

func (*Mock) Exists

func (s *Mock) Exists(key string) (bool, error)

Exists mock

func (*Mock) Get

func (s *Mock) Get(key string) (*KVPair, error)

Get mock

func (*Mock) List

func (s *Mock) List(prefix string) ([]*KVPair, error)

List mock

func (*Mock) NewLock

func (s *Mock) NewLock(key string, options *LockOptions) (Locker, error)

NewLock mock

func (*Mock) Put

func (s *Mock) Put(key string, value []byte, opts *WriteOptions) error

Put mock

func (*Mock) Watch

func (s *Mock) Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, error)

Watch mock

func (*Mock) WatchTree

func (s *Mock) WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*KVPair, error)

WatchTree mock

type MockLock

type MockLock struct {
	mock.Mock
}

MockLock mock implementation of Locker

func (*MockLock) Lock

func (l *MockLock) Lock() (<-chan struct{}, error)

Lock mock

func (*MockLock) Unlock

func (l *MockLock) Unlock() error

Unlock mock

type Store

type Store interface {
	// Put a value at the specified key
	Put(key string, value []byte, options *WriteOptions) error

	// Get a value given its key
	Get(key string) (*KVPair, error)

	// Delete the value at the specified key
	Delete(key string) error

	// Verify if a Key exists in the store
	Exists(key string) (bool, error)

	// Watch changes on a key.
	// Returns a channel that will receive changes or an error.
	// Upon creating a watch, the current value will be sent to the channel.
	// Providing a non-nil stopCh can be used to stop watching.
	Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, error)

	// WatchTree watches changes on a "directory"
	// Returns a channel that will receive changes or an error.
	// Upon creating a watch, the current value will be sent to the channel.
	// Providing a non-nil stopCh can be used to stop watching.
	WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*KVPair, error)

	// NewLock creates a lock for a given key.
	// The returned Locker is not held and must be acquired with `.Lock`.
	// value is optional.
	NewLock(key string, options *LockOptions) (Locker, error)

	// List the content of a given prefix
	List(prefix string) ([]*KVPair, error)

	// DeleteTree deletes a range of keys based on prefix
	DeleteTree(prefix string) error

	// Atomic operation on a single value
	AtomicPut(key string, value []byte, previous *KVPair, options *WriteOptions) (bool, *KVPair, error)

	// Atomic delete of a single value
	AtomicDelete(key string, previous *KVPair) (bool, error)

	// Close the store connection
	Close()
}

Store represents the backend K/V storage. Each store must support every call listed here; otherwise it cannot be used as a K/V backend for libkv.

func InitializeConsul

func InitializeConsul(endpoints []string, options *Config) (Store, error)

InitializeConsul creates a new Consul client given a list of endpoints and optional tls config

func InitializeEtcd

func InitializeEtcd(addrs []string, options *Config) (Store, error)

InitializeEtcd creates a new Etcd client given a list of endpoints and optional tls config

func InitializeMock

func InitializeMock(endpoints []string, options *Config) (Store, error)

InitializeMock creates a Mock store.

func InitializeZookeeper

func InitializeZookeeper(endpoints []string, options *Config) (Store, error)

InitializeZookeeper creates a new Zookeeper client given a list of endpoints and optional tls config

func NewStore

func NewStore(backend Backend, addrs []string, options *Config) (Store, error)

NewStore creates an instance of a store
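The page does not show how NewStore selects a backend. One plausible arrangement, sketched below purely as an assumption, is a map from Backend to an Initialize function, with `ErrNotSupported` returned for unknown names. The `initializers` map, `newStore`, and `noopStore` are hypothetical, and Store is trimmed to a single method so the example stays short.

```go
package main

import (
	"errors"
	"fmt"
)

// Trimmed local copies of the package types.
type Backend string

type Config struct{}

type Store interface {
	Close()
}

// Initialize has the same signature as the package's Initialize type.
type Initialize func(addrs []string, options *Config) (Store, error)

var ErrNotSupported = errors.New("Backend storage not supported yet, please choose another one")

// initializers is a hypothetical registry of backend constructors.
var initializers = map[Backend]Initialize{}

// newStore looks up the constructor registered for backend and calls it.
func newStore(backend Backend, addrs []string, options *Config) (Store, error) {
	initFn, ok := initializers[backend]
	if !ok {
		return nil, ErrNotSupported
	}
	return initFn(addrs, options)
}

// noopStore is a placeholder backend used only for the demonstration.
type noopStore struct{ addrs []string }

func (s *noopStore) Close() {}

func main() {
	initializers["demo"] = func(addrs []string, _ *Config) (Store, error) {
		return &noopStore{addrs: addrs}, nil
	}

	s, err := newStore("demo", []string{"localhost:1234"}, &Config{})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	defer s.Close()
	fmt.Println("store created")

	if _, err := newStore("unknown", nil, nil); errors.Is(err, ErrNotSupported) {
		fmt.Println("unknown backend rejected")
	}
}
```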

type WatchCallback

type WatchCallback func(entries ...*KVPair)

WatchCallback is used for watch methods on keys and is triggered on key change

type WriteOptions

type WriteOptions struct {
	Heartbeat time.Duration
	Ephemeral bool
}

WriteOptions contains optional request parameters

type Zookeeper

type Zookeeper struct {
	// contains filtered or unexported fields
}

Zookeeper embeds the zookeeper client

func (*Zookeeper) AtomicDelete

func (s *Zookeeper) AtomicDelete(key string, previous *KVPair) (bool, error)

AtomicDelete deletes the value at "key" if the key has not been modified in the meantime, and returns an error if it has been modified

func (*Zookeeper) AtomicPut

func (s *Zookeeper) AtomicPut(key string, value []byte, previous *KVPair, _ *WriteOptions) (bool, *KVPair, error)

AtomicPut puts a value at "key" if the key has not been modified in the meantime, and returns an error if it has been modified

func (*Zookeeper) Close

func (s *Zookeeper) Close()

Close closes the client connection

func (*Zookeeper) Delete

func (s *Zookeeper) Delete(key string) error

Delete a value at "key"

func (*Zookeeper) DeleteTree

func (s *Zookeeper) DeleteTree(prefix string) error

DeleteTree deletes a range of keys based on prefix

func (*Zookeeper) Exists

func (s *Zookeeper) Exists(key string) (bool, error)

Exists checks if the key exists inside the store

func (*Zookeeper) Get

func (s *Zookeeper) Get(key string) (*KVPair, error)

Get the value at "key", returns the last modified index to use in conjunction with CAS calls

func (*Zookeeper) List

func (s *Zookeeper) List(prefix string) ([]*KVPair, error)

List the content of a given prefix

func (*Zookeeper) NewLock

func (s *Zookeeper) NewLock(key string, options *LockOptions) (Locker, error)

NewLock returns a handle to a lock struct which can be used to acquire and release the mutex.

func (*Zookeeper) Put

func (s *Zookeeper) Put(key string, value []byte, opts *WriteOptions) error

Put a value at "key"

func (*Zookeeper) Watch

func (s *Zookeeper) Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, error)

Watch changes on a key. Returns a channel that will receive changes or an error. Upon creating a watch, the current value will be sent to the channel. Providing a non-nil stopCh can be used to stop watching.

func (*Zookeeper) WatchTree

func (s *Zookeeper) WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*KVPair, error)

WatchTree watches changes on a "directory". Returns a channel that will receive changes or an error. Upon creating a watch, the current value will be sent to the channel. Providing a non-nil stopCh can be used to stop watching.
