curator

package module
v0.0.0-...-366ffb7

This package is not in the latest version of its module.
Published: Jun 10, 2015 License: MIT Imports: 22 Imported by: 23

README

What is Curator?

Curator n ˈkyoor͝ˌātər: a keeper or custodian of a museum or other collection - A ZooKeeper Keeper.

What is curator.go?

Curator.go is a Go port of Curator, built on go-zookeeper.

Getting Started

Learn ZooKeeper

Curator.go users are assumed to know ZooKeeper. A good place to start is the ZooKeeper Getting Started Guide: http://zookeeper.apache.org/doc/trunk/zookeeperStarted.html

Install Curator.go

$ go get github.com/flier/curator.go

Using Curator

Curator.go is available from github.com. You can easily import it into your code:

import (
	"github.com/flier/curator.go"
)

Getting a Connection

Curator uses Fluent Style. If you haven't used this before, it might seem odd so it's suggested that you familiarize yourself with the style.

Curator connection instances (CuratorFramework) are allocated from the CuratorFrameworkBuilder. You only need one CuratorFramework object for each ZooKeeper cluster you are connecting to:

curator.NewClient(connString, retryPolicy)

This will create a connection to a ZooKeeper cluster using default values. The only thing that you need to specify is the retry policy. For most cases, you should use:

retryPolicy := curator.NewExponentialBackoffRetry(time.Second, 3, 15*time.Second)

client := curator.NewClient(connString, retryPolicy)

client.Start()
defer client.Close()	

The client must be started (and closed when no longer needed).

Calling ZooKeeper Directly

Once you have a CuratorFramework instance, you can make direct calls to ZooKeeper in a similar way to using the raw ZooKeeper object provided in the ZooKeeper distribution. E.g.:

client.Create().ForPathWithData(path, payload)

The benefit here is that Curator manages the ZooKeeper connection and will retry operations if there are connection problems.

Recipes

Distributed Lock

lock := curator.NewInterProcessMutex(client, lockPath)

if lock.Acquire(maxWait, waitUnit) {
    defer lock.Release()

    // do some work inside of the critical section here
}

Leader Election

listener := curator.NewLeaderSelectorListener(func(client curator.CuratorFramework) error {
    // this callback will get called when you are the leader
    // do whatever leader work you need to and only exit
    // this method when you want to relinquish leadership
    return nil
})

selector := curator.NewLeaderSelector(client, path, listener)
selector.AutoRequeue()  // not required, but this is behavior that you will probably expect
selector.Start()

Examples

This module contains example usages of various Curator features. Each directory in the module is a separate example.

  • leader Example leader selector code
  • cache Example PathChildrenCache usage
  • locking Example of using InterProcessMutex
  • discovery Example usage of the Curator's ServiceDiscovery
  • framework A few examples of how to use the CuratorFramework class

See the examples source repo for each example.

Components

  • Recipes Implementations of some of the common ZooKeeper "recipes". The implementations are built on top of the Curator Framework.
  • Framework The Curator Framework is a high-level API that greatly simplifies using ZooKeeper. It adds many features that build on ZooKeeper and handles the complexity of managing connections to the ZooKeeper cluster and retrying operations.
  • Utilities Various utilities that are useful when using ZooKeeper.
  • Client A replacement for the bundled ZooKeeper class that takes care of some low-level housekeeping and provides some useful utilities.
  • Errors How Curator deals with errors, connection issues, recoverable exceptions, etc.

Documentation

Overview

Curator.go is a Go port of Curator that makes it easy to access ZooKeeper.

Generic API

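Curator's builders are composed from the generic interfaces below. T stands for the concrete builder or result type; the [T] form is notation only, because each concrete builder (such as CreateBuilder) declares these methods directly with its own types.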

type Pathable[T] interface {
    // Commit the currently building operation using the given path
    ForPath(path string) (T, error)
}

type PathAndBytesable[T] interface {
    Pathable[T]

    // Commit the currently building operation using the given path and data
    ForPathWithData(path string, payload []byte) (T, error)
}

type Compressible[T] interface {
    // Cause the data to be compressed using the configured compression provider
    Compressed() T
}

type Decompressible[T] interface {
    // Cause the data to be de-compressed using the configured compression provider
    Decompressed() T
}

type CreateModable[T] interface {
    // Set a create mode - the default is CreateMode.PERSISTENT
    WithMode(mode CreateMode) T
}

type ACLable[T] interface {
    // Set an ACL list
    WithACL(acl ...zk.ACL) T
}

type Versionable[T] interface {
    // Use the given version (the default is -1)
    WithVersion(version int32) T
}

type Statable[T] interface {
    // Have the operation fill the provided stat object
    StoringStatIn(*zk.Stat) T
}

type ParentsCreatable[T] interface {
    // Causes any parent nodes to get created if they haven't already been
    CreatingParentsIfNeeded() T
}

type ChildrenDeletable[T] interface {
    // Will also delete children if they exist.
    DeletingChildrenIfNeeded() T
}

type Watchable[T] interface {
    // Have the operation set a watch
    Watched() T

    // Set a watcher for the operation
    UsingWatcher(watcher Watcher) T
}

type Backgroundable[T] interface {
    // Perform the action in the background
    InBackground() T

    // Perform the action in the background
    InBackgroundWithContext(context interface{}) T

    // Perform the action in the background
    InBackgroundWithCallback(callback BackgroundCallback) T

    // Perform the action in the background
    InBackgroundWithCallbackAndContext(callback BackgroundCallback, context interface{}) T
}
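The notation above can be written directly with type parameters in Go 1.18 or later. The following is a minimal sketch of the idea, not the package's implementation: memStore and createBuilder are hypothetical names, the mode is a plain int32, and the store is an in-memory map standing in for ZooKeeper.

```go
package main

import (
	"errors"
	"fmt"
)

// Pathable and CreateModable mirror the notation above with real type parameters.
type Pathable[T any] interface {
	ForPath(path string) (T, error)
}

type CreateModable[T any] interface {
	WithMode(mode int32) T
}

// memStore is a hypothetical in-memory stand-in for a ZooKeeper connection.
type memStore map[string]int32

// createBuilder is a hypothetical builder. Each option returns the builder
// itself so that calls can be chained in the fluent style.
type createBuilder struct {
	store memStore
	mode  int32
}

func (b *createBuilder) WithMode(mode int32) *createBuilder {
	b.mode = mode
	return b
}

// ForPath commits the operation and fails if the node already exists.
func (b *createBuilder) ForPath(path string) (string, error) {
	if _, ok := b.store[path]; ok {
		return "", errors.New("node exists")
	}
	b.store[path] = b.mode
	return path, nil
}

// Compile-time checks that the builder satisfies both generic interfaces.
var (
	_ Pathable[string]              = (*createBuilder)(nil)
	_ CreateModable[*createBuilder] = (*createBuilder)(nil)
)

func main() {
	store := memStore{}
	path, err := (&createBuilder{store: store}).WithMode(1).ForPath("/demo")
	fmt.Println(path, err)
}
```

Here T is string for the terminal ForPath call and *createBuilder for the chained option, which is the same split the concrete builder interfaces in this package follow.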

Index

Constants

const (
	PERSISTENT            CreateMode = 0
	PERSISTENT_SEQUENTIAL            = zk.FlagSequence
	EPHEMERAL                        = zk.FlagEphemeral
	EPHEMERAL_SEQUENTIAL             = zk.FlagEphemeral + zk.FlagSequence
)
const (
	DEFAULT_SESSION_TIMEOUT    = 60 * time.Second
	DEFAULT_CONNECTION_TIMEOUT = 15 * time.Second
	DEFAULT_CLOSE_WAIT         = 1 * time.Second
)
const (
	MAX_RETRIES_LIMIT               = 29
	DEFAULT_MAX_SLEEP time.Duration = time.Duration(math.MaxInt32 * int64(time.Second))
)
const AnyVersion int32 = -1
const MAX_BACKGROUND_ERRORS = 10
const (
	PATH_SEPARATOR = "/"
)
const STATE_QUEUE_SIZE = 25

Variables

var (
	OPEN_ACL_UNSAFE = zk.WorldACL(zk.PermAll)
	CREATOR_ALL_ACL = zk.AuthACL(zk.PermAll)
	READ_ACL_UNSAFE = zk.WorldACL(zk.PermRead)
)
var (
	ErrConnectionClosed        = zk.ErrConnectionClosed
	ErrUnknown                 = zk.ErrUnknown
	ErrAPIError                = zk.ErrAPIError
	ErrNoNode                  = zk.ErrNoNode
	ErrNoAuth                  = zk.ErrNoAuth
	ErrBadVersion              = zk.ErrBadVersion
	ErrNoChildrenForEphemerals = zk.ErrNoChildrenForEphemerals
	ErrNodeExists              = zk.ErrNodeExists
	ErrNotEmpty                = zk.ErrNotEmpty
	ErrSessionExpired          = zk.ErrSessionExpired
	ErrInvalidACL              = zk.ErrInvalidACL
	ErrAuthFailed              = zk.ErrAuthFailed
	ErrClosing                 = zk.ErrClosing
	ErrNothing                 = zk.ErrNothing
	ErrSessionMoved            = zk.ErrSessionMoved
)
var (
	EventNodeCreated         = zk.EventNodeCreated
	EventNodeDeleted         = zk.EventNodeDeleted
	EventNodeDataChanged     = zk.EventNodeDataChanged
	EventNodeChildrenChanged = zk.EventNodeChildrenChanged
)
var (
	ErrConnectionLoss = errors.New("connection loss")
	ErrTimeout        = errors.New("timeout")
)
var (
	CompressionProviders = map[string]CompressionProvider{
		"gzip": NewGzipCompressionProvider(),
		"lz4":  NewLZ4CompressionProvider(),
	}
)
var CuratorEventTypeNames = []string{"CREATE", "DELETE", "EXISTS", "GET_DATA", "SET_DATA", "CHILDREN", "SYNC", "GET_ACL", "SET_ACL", "WATCHED", "CLOSING"}

Functions

func CloseQuietly

func CloseQuietly(closeable Closeable) (err error)

func DeleteChildren

func DeleteChildren(conn ZookeeperConnection, path string, deleteSelf bool) error

Recursively deletes children of a node.

func FixForNamespace

func FixForNamespace(namespace, path string, isSequential bool) (string, error)

Apply the namespace to the given path

func GetNodeFromPath

func GetNodeFromPath(path string) string

Given a full path, return the node name, e.g. "/one/two/three" returns "three".

func JoinPath

func JoinPath(parent string, children ...string) string

Join a parent path and one or more child nodes into a single path.
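The behavior described for GetNodeFromPath and JoinPath can be sketched with the standard library alone. This is an illustration under assumptions, not the package's code: nodeFromPath and joinPath are hypothetical names, and collapsing duplicate slashes between segments is a design choice of this sketch.

```go
package main

import (
	"fmt"
	"strings"
)

// nodeFromPath returns the last segment of a slash-separated path.
func nodeFromPath(path string) string {
	if i := strings.LastIndex(path, "/"); i >= 0 {
		return path[i+1:]
	}
	return path
}

// joinPath joins parent and children with single slashes,
// ignoring empty segments and redundant separators.
func joinPath(parent string, children ...string) string {
	parts := []string{}
	for _, p := range append([]string{parent}, children...) {
		if t := strings.Trim(p, "/"); t != "" {
			parts = append(parts, t)
		}
	}
	return "/" + strings.Join(parts, "/")
}

func main() {
	fmt.Println(nodeFromPath("/one/two/three"))    // three
	fmt.Println(joinPath("/one", "two", "/three")) // /one/two/three
}
```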

func MakeDirs

func MakeDirs(conn ZookeeperConnection, path string, makeLastNode bool, aclProvider ACLProvider) error

Make sure all the nodes in the path are created

func NewCuratorZookeeperClient

func NewCuratorZookeeperClient(zookeeperDialer ZookeeperDialer, ensembleProvider EnsembleProvider, sessionTimeout, connectionTimeout time.Duration,
	watcher Watcher, retryPolicy RetryPolicy, canReadOnly bool, authInfos []AuthInfo) *curatorZookeeperClient

func NewEnsurePath

func NewEnsurePath(path string) *ensurePath

func NewEnsurePathWithAcl

func NewEnsurePathWithAcl(path string, aclProvider ACLProvider) *ensurePath

func NewEnsurePathWithAclAndHelper

func NewEnsurePathWithAclAndHelper(path string, aclProvider ACLProvider, helper EnsurePathHelper) *ensurePath

func ValidatePath

func ValidatePath(path string) error

Validate the provided znode path string
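As a rough illustration of what validating a znode path involves, the sketch below checks a subset of ZooKeeper's path rules: an absolute path, no trailing slash, no empty node names, no relative segments, and no null character. The function name validatePath and the error messages are hypothetical, and the real ValidatePath may check more than this.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// validatePath checks a subset of ZooKeeper's znode path rules.
func validatePath(path string) error {
	switch {
	case path == "":
		return errors.New("path must not be empty")
	case path[0] != '/':
		return errors.New("path must start with /")
	case path == "/":
		return nil // the root is always valid
	case strings.HasSuffix(path, "/"):
		return errors.New("path must not end with /")
	}
	for _, seg := range strings.Split(path[1:], "/") {
		switch seg {
		case "":
			return errors.New("empty node name in path")
		case ".", "..":
			return errors.New("relative paths are not allowed")
		}
		if strings.ContainsRune(seg, 0) {
			return errors.New("null character is not allowed")
		}
	}
	return nil
}

func main() {
	for _, p := range []string{"/a/b", "a/b", "/a//b", "/a/../b", "/a/"} {
		fmt.Printf("%q -> %v\n", p, validatePath(p))
	}
}
```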

Types

type ACLProvider

type ACLProvider interface {
	// Return the ACL list to use by default
	GetDefaultAcl() []zk.ACL

	// Return the ACL list to use for the given path
	GetAclForPath(path string) []zk.ACL
}

func NewDefaultACLProvider

func NewDefaultACLProvider() ACLProvider

type AtomicBool

type AtomicBool int32
const (
	FALSE AtomicBool = iota
	TRUE
)

func NewAtomicBool

func NewAtomicBool(b bool) AtomicBool

func (*AtomicBool) CompareAndSwap

func (b *AtomicBool) CompareAndSwap(oldValue, newValue bool) bool

func (*AtomicBool) Load

func (b *AtomicBool) Load() bool

func (*AtomicBool) Set

func (b *AtomicBool) Set(v bool)

func (*AtomicBool) Swap

func (b *AtomicBool) Swap(v bool) bool
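An int32-backed boolean like this is typically implemented on top of sync/atomic. The sketch below shows one way to do that; atomicBool and toInt32 are hypothetical names, and the package's own implementation may differ in detail.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// atomicBool stores false as 0 and true as 1.
type atomicBool int32

func toInt32(v bool) int32 {
	if v {
		return 1
	}
	return 0
}

func (b *atomicBool) Load() bool { return atomic.LoadInt32((*int32)(b)) != 0 }

func (b *atomicBool) Set(v bool) { atomic.StoreInt32((*int32)(b), toInt32(v)) }

// Swap stores v and returns the previous value.
func (b *atomicBool) Swap(v bool) bool {
	return atomic.SwapInt32((*int32)(b), toInt32(v)) != 0
}

// CompareAndSwap stores newValue only if the current value equals oldValue.
func (b *atomicBool) CompareAndSwap(oldValue, newValue bool) bool {
	return atomic.CompareAndSwapInt32((*int32)(b), toInt32(oldValue), toInt32(newValue))
}

func main() {
	var started atomicBool
	fmt.Println(started.CompareAndSwap(false, true)) // true: the first caller wins
	fmt.Println(started.CompareAndSwap(false, true)) // false: later callers lose
}
```

CompareAndSwap is the useful primitive for guarding one-time transitions such as starting or closing a client.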

type AuthInfo

type AuthInfo struct {
	Scheme string
	Auth   []byte
}

type BackgroundCallback

type BackgroundCallback func(client CuratorFramework, event CuratorEvent) error

Called when the async background operation completes

type CheckExistsBuilder

type CheckExistsBuilder interface {
	// Pathable[T]
	//
	// Commit the currently building operation using the given path
	ForPath(path string) (*zk.Stat, error)

	// Watchable[T]
	//
	// Have the operation set a watch
	Watched() CheckExistsBuilder

	// Set a watcher for the operation
	UsingWatcher(watcher Watcher) CheckExistsBuilder

	// Backgroundable[T]
	//
	// Perform the action in the background
	InBackground() CheckExistsBuilder

	// Perform the action in the background
	InBackgroundWithContext(context interface{}) CheckExistsBuilder

	// Perform the action in the background
	InBackgroundWithCallback(callback BackgroundCallback) CheckExistsBuilder

	// Perform the action in the background
	InBackgroundWithCallbackAndContext(callback BackgroundCallback, context interface{}) CheckExistsBuilder
}

type Closeable

type Closeable interface {
	// Closes this and releases any system resources associated with it.
	Close() error
}

A Closeable is a source or destination of data that can be closed.

type CompressionProvider

type CompressionProvider interface {
	Compress(path string, data []byte) ([]byte, error)

	Decompress(path string, compressedData []byte) ([]byte, error)
}
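A provider satisfying this interface can be built from compress/gzip in the standard library. The following is a minimal sketch; gzipProvider is a hypothetical type, and it ignores the path argument, which a real provider could use to choose a strategy per node.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

type gzipProvider struct{}

// Compress gzips data; the writer must be closed to flush the trailer.
func (gzipProvider) Compress(path string, data []byte) ([]byte, error) {
	var buf bytes.Buffer
	w := gzip.NewWriter(&buf)
	if _, err := w.Write(data); err != nil {
		return nil, err
	}
	if err := w.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// Decompress reverses Compress and fails on data that is not gzip.
func (gzipProvider) Decompress(path string, compressedData []byte) ([]byte, error) {
	r, err := gzip.NewReader(bytes.NewReader(compressedData))
	if err != nil {
		return nil, err
	}
	defer r.Close()
	return io.ReadAll(r)
}

func main() {
	p := gzipProvider{}
	packed, _ := p.Compress("/demo", []byte("hello hello hello"))
	plain, err := p.Decompress("/demo", packed)
	fmt.Println(string(plain), err)
}
```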

type ConnectionState

type ConnectionState int32
const (
	UNKNOWN     ConnectionState = iota
	CONNECTED                   // Sent for the first successful connection to the server.
	SUSPENDED                   // There has been a loss of connection. Leaders, locks, etc. should suspend until the connection is re-established.
	RECONNECTED                 // A suspended, lost, or read-only connection has been re-established
	LOST                        // The connection is confirmed to be lost. Close any locks, leaders, etc.
	READ_ONLY                   // The connection has gone into read-only mode.
)

func (ConnectionState) Connected

func (s ConnectionState) Connected() bool

func (ConnectionState) String

func (s ConnectionState) String() string
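The two methods can be pictured with a small local copy of the enum. This is a sketch under assumptions: the lowercase names are hypothetical, and treating READ_ONLY as a connected state is an assumption of this example rather than something this page states.

```go
package main

import "fmt"

type connectionState int32

const (
	unknown connectionState = iota
	connected
	suspended
	reconnected
	lost
	readOnly
)

var stateNames = []string{"UNKNOWN", "CONNECTED", "SUSPENDED", "RECONNECTED", "LOST", "READ_ONLY"}

// Connected reports whether the state represents a usable connection.
// Counting readOnly as connected is an assumption of this sketch.
func (s connectionState) Connected() bool {
	return s == connected || s == reconnected || s == readOnly
}

// String returns the symbolic name, or a numeric fallback for unknown values.
func (s connectionState) String() string {
	if s < 0 || int(s) >= len(stateNames) {
		return fmt.Sprintf("connectionState(%d)", int32(s))
	}
	return stateNames[s]
}

func main() {
	for s := unknown; s <= readOnly; s++ {
		fmt.Printf("%v connected=%t\n", s, s.Connected())
	}
}
```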

type ConnectionStateListenable

type ConnectionStateListenable interface {
	Listenable /* [T] */

	AddListener(listener ConnectionStateListener)

	RemoveListener(listener ConnectionStateListener)
}

type ConnectionStateListener

type ConnectionStateListener interface {
	// Called when there is a state change in the connection
	StateChanged(client CuratorFramework, newState ConnectionState)
}

func NewConnectionStateListener

func NewConnectionStateListener(callback connectionStateListenerCallback) ConnectionStateListener
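A constructor that turns a callback into a listener is usually a thin function adapter, in the same spirit as http.HandlerFunc. The sketch below shows the pattern with simplified, hypothetical types: stateListener drops the client argument, and newStateListener stands in for the constructor above.

```go
package main

import "fmt"

type connectionState int32

// stateListener is a simplified listener interface (no client argument).
type stateListener interface {
	StateChanged(newState connectionState)
}

// stateListenerFunc adapts a plain function to the stateListener interface.
type stateListenerFunc func(newState connectionState)

func (f stateListenerFunc) StateChanged(newState connectionState) { f(newState) }

// newStateListener wraps callback so it can be registered as a listener.
func newStateListener(callback func(connectionState)) stateListener {
	return stateListenerFunc(callback)
}

func main() {
	l := newStateListener(func(s connectionState) { fmt.Println("state:", s) })
	l.StateChanged(1)
}
```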

type CreateBuilder

type CreateBuilder interface {
	// PathAndBytesable[T]
	//
	// Commit the currently building operation using the given path
	ForPath(path string) (string, error)

	// Commit the currently building operation using the given path and data
	ForPathWithData(path string, payload []byte) (string, error)

	// ParentsCreatable[T]
	//
	// Causes any parent nodes to get created if they haven't already been
	CreatingParentsIfNeeded() CreateBuilder

	// CreateModable[T]
	//
	// Set a create mode - the default is CreateMode.PERSISTENT
	WithMode(mode CreateMode) CreateBuilder

	// ACLable[T]
	//
	// Set an ACL list
	WithACL(acls ...zk.ACL) CreateBuilder

	// Compressible[T]
	//
	// Cause the data to be compressed using the configured compression provider
	Compressed() CreateBuilder

	// Backgroundable[T]
	//
	// Perform the action in the background
	InBackground() CreateBuilder

	// Perform the action in the background
	InBackgroundWithContext(context interface{}) CreateBuilder

	// Perform the action in the background
	InBackgroundWithCallback(callback BackgroundCallback) CreateBuilder

	// Perform the action in the background
	InBackgroundWithCallbackAndContext(callback BackgroundCallback, context interface{}) CreateBuilder
}

type CreateMode

type CreateMode int32

func (CreateMode) IsEphemeral

func (m CreateMode) IsEphemeral() bool

func (CreateMode) IsSequential

func (m CreateMode) IsSequential() bool
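Because the create modes are combinations of two flags, both predicates reduce to bit tests. The sketch below uses local constants; the values 1 (ephemeral) and 2 (sequence) are assumptions that follow ZooKeeper's create-mode numbering, and all lowercase names are hypothetical.

```go
package main

import "fmt"

// Assumed flag values, following ZooKeeper's create-mode numbering.
const (
	flagEphemeral = 1
	flagSequence  = 2
)

type createMode int32

const (
	persistent           createMode = 0
	persistentSequential createMode = flagSequence
	ephemeral            createMode = flagEphemeral
	ephemeralSequential  createMode = flagEphemeral + flagSequence
)

// IsEphemeral reports whether the ephemeral bit is set.
func (m createMode) IsEphemeral() bool { return m&flagEphemeral != 0 }

// IsSequential reports whether the sequence bit is set.
func (m createMode) IsSequential() bool { return m&flagSequence != 0 }

func main() {
	for _, m := range []createMode{persistent, persistentSequential, ephemeral, ephemeralSequential} {
		fmt.Println(m, m.IsEphemeral(), m.IsSequential())
	}
}
```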

type CuratorEvent

type CuratorEvent interface {
	// check here first - this value determines the type of event and which methods will have valid values
	Type() CuratorEventType

	// "rc" from async callbacks
	Err() error

	// the path
	Path() string

	// the context object passed to Backgroundable.InBackgroundWithContext() or InBackgroundWithCallbackAndContext()
	Context() interface{}

	// any stat
	Stat() *zk.Stat

	// any data
	Data() []byte

	// any name
	Name() string

	// any children
	Children() []string

	// any ACL list, or nil
	ACLs() []zk.ACL

	WatchedEvent() *zk.Event
}

A superset of all the various ZooKeeper events/background methods.

type CuratorEventType

type CuratorEventType int
const (
	CREATE   CuratorEventType = iota // CuratorFramework.Create() -> Err(), Path(), Data()
	DELETE                           // CuratorFramework.Delete() -> Err(), Path()
	EXISTS                           // CuratorFramework.CheckExists() -> Err(), Path(), Stat()
	GET_DATA                         // CuratorFramework.GetData() -> Err(), Path(), Stat(), Data()
	SET_DATA                         // CuratorFramework.SetData() -> Err(), Path(), Stat()
	CHILDREN                         // CuratorFramework.GetChildren() -> Err(), Path(), Stat(), Children()
	SYNC                             // CuratorFramework.Sync() -> Err(), Path()
	GET_ACL                          // CuratorFramework.GetACL() -> Err(), Path()
	SET_ACL                          // CuratorFramework.SetACL() -> Err(), Path()
	WATCHED                          // Watchable.UsingWatcher() -> WatchedEvent()
	CLOSING                          // Event sent when client is being closed
)

func (CuratorEventType) String

func (t CuratorEventType) String() string

type CuratorFramework

type CuratorFramework interface {
	// Start the client.
	// Most mutator methods will not work until the client is started
	Start() error

	// Stop the client
	Close() error

	// Returns the state of this instance
	State() State

	// Return true if the client is started, not closed, etc.
	Started() bool

	// Start a create builder
	Create() CreateBuilder

	// Start a delete builder
	Delete() DeleteBuilder

	// Start an exists builder
	CheckExists() CheckExistsBuilder

	// Start a get data builder
	GetData() GetDataBuilder

	// Start a set data builder
	SetData() SetDataBuilder

	// Start a get children builder
	GetChildren() GetChildrenBuilder

	// Start a get ACL builder
	GetACL() GetACLBuilder

	// Start a set ACL builder
	SetACL() SetACLBuilder

	// Start a transaction builder
	InTransaction() Transaction

	// Perform a sync on the given path - syncs are always in the background
	DoSync(path string, backgroundContextObject interface{})

	// Start a sync builder. Note: sync is ALWAYS performed in the background, even if you don't call one of the InBackground...() methods
	Sync() SyncBuilder

	// Returns the listenable interface for the Connect State
	ConnectionStateListenable() ConnectionStateListenable

	// Returns the listenable interface for events
	CuratorListenable() CuratorListenable

	// Returns the listenable interface for unhandled errors
	UnhandledErrorListenable() UnhandledErrorListenable

	// Returns a facade of the current instance that does _not_ automatically pre-pend the namespace to all paths
	NonNamespaceView() CuratorFramework

	// Returns a facade of the current instance that uses the specified namespace
	// or no namespace if newNamespace is empty.
	UsingNamespace(newNamespace string) CuratorFramework

	// Return the current namespace or "" if none
	Namespace() string

	// Return the managed zookeeper client
	ZookeeperClient() CuratorZookeeperClient

	// Allocates an ensure path instance that is namespace aware
	NewNamespaceAwareEnsurePath(path string) EnsurePath

	// Block until a connection to ZooKeeper is available.
	BlockUntilConnected() error

	// Block until a connection to ZooKeeper is available or the maxWaitTime has been exceeded
	BlockUntilConnectedTimeout(maxWaitTime time.Duration) error
}

Zookeeper framework-style client

func NewClient

func NewClient(connString string, retryPolicy RetryPolicy) CuratorFramework

Create a new client with default session timeout and default connection timeout

func NewClientTimeout

func NewClientTimeout(connString string, sessionTimeout, connectionTimeout time.Duration, retryPolicy RetryPolicy) CuratorFramework

Create a new client

type CuratorFrameworkBuilder

type CuratorFrameworkBuilder struct {
	AuthInfos           []AuthInfo          // the connection authorization
	ZookeeperDialer     ZookeeperDialer     // the zookeeper dialer to use
	EnsembleProvider    EnsembleProvider    // the list ensemble provider.
	DefaultData         []byte              // the data to use when PathAndBytesable.ForPath(String) is used.
	Namespace           string              // as ZooKeeper is a shared space, users of a given cluster should stay within a pre-defined namespace
	SessionTimeout      time.Duration       // the session timeout
	ConnectionTimeout   time.Duration       // the connection timeout
	MaxCloseWait        time.Duration       // the time to wait during close for background tasks to finish
	RetryPolicy         RetryPolicy         // the retry policy to use
	CompressionProvider CompressionProvider // the compression provider
	AclProvider         ACLProvider         // the provider for ACLs
	CanBeReadOnly       bool                // allow ZooKeeper client to enter read only mode in case of a network partition.
}

func (*CuratorFrameworkBuilder) Authorization

func (b *CuratorFrameworkBuilder) Authorization(scheme string, auth []byte) *CuratorFrameworkBuilder

Add connection authorization

func (*CuratorFrameworkBuilder) Build

Apply the current values and build a new CuratorFramework

func (*CuratorFrameworkBuilder) Compression

Add compression provider

func (*CuratorFrameworkBuilder) ConnectString

func (b *CuratorFrameworkBuilder) ConnectString(connectString string) *CuratorFrameworkBuilder

Set the list of servers to connect to.

type CuratorListenable

type CuratorListenable interface {
	Listenable /* [T] */

	AddListener(listener CuratorListener)

	RemoveListener(listener CuratorListener)
}

type CuratorListener

type CuratorListener interface {
	// Called when a background task has completed or a watch has triggered
	EventReceived(client CuratorFramework, event CuratorEvent) error
}

Receives notifications about errors and background events

func NewCuratorListener

func NewCuratorListener(callback curatorListenerCallback) CuratorListener

type CuratorZookeeperClient

type CuratorZookeeperClient interface {
	// Return the managed ZK connection.
	Conn() (ZookeeperConnection, error)

	// Return the current retry policy
	RetryPolicy() RetryPolicy

	// Return a new retry loop. All operations should be performed in a retry loop
	NewRetryLoop() RetryLoop

	// Returns true if the client is currently connected
	Connected() bool

	// This method blocks until the connection to ZK succeeds.
	BlockUntilConnectedOrTimedOut() error

	// Must be called after created
	Start() error

	// Close the client
	Close() error

	// Start a new tracer
	StartTracer(name string) Tracer
}

A wrapper around Zookeeper that takes care of some low-level housekeeping

type DefaultZookeeperDialer

type DefaultZookeeperDialer struct {
	Dialer zk.Dialer
}

func (*DefaultZookeeperDialer) Dial

func (d *DefaultZookeeperDialer) Dial(connString string, sessionTimeout time.Duration, canBeReadOnly bool) (ZookeeperConnection, <-chan zk.Event, error)

type DeleteBuilder

type DeleteBuilder interface {
	// Pathable[T]
	//
	// Commit the currently building operation using the given path
	ForPath(path string) error

	// ChildrenDeletable[T]
	//
	// Will also delete children if they exist.
	DeletingChildrenIfNeeded() DeleteBuilder

	// Versionable[T]
	//
	// Use the given version (the default is -1)
	WithVersion(version int32) DeleteBuilder

	// Backgroundable[T]
	//
	// Perform the action in the background
	InBackground() DeleteBuilder

	// Perform the action in the background
	InBackgroundWithContext(context interface{}) DeleteBuilder

	// Perform the action in the background
	InBackgroundWithCallback(callback BackgroundCallback) DeleteBuilder

	// Perform the action in the background
	InBackgroundWithCallbackAndContext(callback BackgroundCallback, context interface{}) DeleteBuilder
}

type EnsembleProvider

type EnsembleProvider interface {
	// Curator will call this method when CuratorZookeeperClient.Start() is called
	Start() error

	// Curator will call this method when CuratorZookeeperClient.Close() is called
	Close() error

	// Return the current connection string to use
	ConnectionString() string
}

Abstraction that provides the ZooKeeper connection string

type EnsurePath

type EnsurePath interface {
	// First time, synchronizes and makes sure all nodes in the path are created.
	// Subsequent calls with this instance are NOPs.
	Ensure(client CuratorZookeeperClient) error

	// Returns a view of this EnsurePath instance that does not make the last node.
	ExcludingLast() EnsurePath
}

type EnsurePathHelper

type EnsurePathHelper interface {
	Ensure(client CuratorZookeeperClient, path string, makeLastNode bool) error
}

type ExponentialBackoffRetry

type ExponentialBackoffRetry struct {
	SleepingRetry
}

Retry policy that retries a set number of times with increasing sleep time between retries

func NewExponentialBackoffRetry

func NewExponentialBackoffRetry(baseSleepTime time.Duration, maxRetries int, maxSleep time.Duration) *ExponentialBackoffRetry
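To make "increasing sleep time between retries" concrete, the sketch below computes a plain doubling delay capped at a maximum. It is an illustration only: backoff is a hypothetical helper, and the exact formula used by ExponentialBackoffRetry (for example, whether it adds random jitter) is not stated on this page.

```go
package main

import (
	"fmt"
	"time"
)

// backoff returns the delay before retry number retryCount (0-based),
// doubling from base and never exceeding maxSleep.
func backoff(base time.Duration, retryCount int, maxSleep time.Duration) time.Duration {
	d := base
	// Stop doubling once the cap is reached so the value cannot overflow.
	for i := 0; i < retryCount && d < maxSleep; i++ {
		d *= 2
	}
	if d > maxSleep {
		d = maxSleep
	}
	return d
}

func main() {
	for n := 0; n < 6; n++ {
		fmt.Println(n, backoff(time.Second, n, 15*time.Second))
	}
}
```

With a base of one second and a cap of 15 seconds, the delays grow 1s, 2s, 4s, 8s and then stay at 15s.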

type FixedEnsembleProvider

type FixedEnsembleProvider struct {
	// contains filtered or unexported fields
}

Standard ensemble provider that wraps a fixed connection string

func NewFixedEnsembleProvider

func NewFixedEnsembleProvider(connectString string) *FixedEnsembleProvider

func (*FixedEnsembleProvider) Close

func (p *FixedEnsembleProvider) Close() error

func (*FixedEnsembleProvider) ConnectionString

func (p *FixedEnsembleProvider) ConnectionString() string

func (*FixedEnsembleProvider) Start

func (p *FixedEnsembleProvider) Start() error

type GetACLBuilder

type GetACLBuilder interface {
	// Pathable[T]
	//
	// Commit the currently building operation using the given path
	ForPath(path string) ([]zk.ACL, error)

	// Statable[T]
	//
	// Have the operation fill the provided stat object
	StoringStatIn(stat *zk.Stat) GetACLBuilder

	// Backgroundable[T]
	//
	// Perform the action in the background
	InBackground() GetACLBuilder

	// Perform the action in the background
	InBackgroundWithContext(context interface{}) GetACLBuilder

	// Perform the action in the background
	InBackgroundWithCallback(callback BackgroundCallback) GetACLBuilder

	// Perform the action in the background
	InBackgroundWithCallbackAndContext(callback BackgroundCallback, context interface{}) GetACLBuilder
}

type GetChildrenBuilder

type GetChildrenBuilder interface {
	// Pathable[T]
	//
	// Commit the currently building operation using the given path
	ForPath(path string) ([]string, error)

	// Statable[T]
	//
	// Have the operation fill the provided stat object
	StoringStatIn(stat *zk.Stat) GetChildrenBuilder

	// Watchable[T]
	//
	// Have the operation set a watch
	Watched() GetChildrenBuilder

	// Set a watcher for the operation
	UsingWatcher(watcher Watcher) GetChildrenBuilder

	// Backgroundable[T]
	//
	// Perform the action in the background
	InBackground() GetChildrenBuilder

	// Perform the action in the background
	InBackgroundWithContext(context interface{}) GetChildrenBuilder

	// Perform the action in the background
	InBackgroundWithCallback(callback BackgroundCallback) GetChildrenBuilder

	// Perform the action in the background
	InBackgroundWithCallbackAndContext(callback BackgroundCallback, context interface{}) GetChildrenBuilder
}

type GetDataBuilder

type GetDataBuilder interface {
	// Pathable[T]
	//
	// Commit the currently building operation using the given path
	ForPath(path string) ([]byte, error)

	// Decompressible[T]
	//
	// Cause the data to be de-compressed using the configured compression provider
	Decompressed() GetDataBuilder

	// Statable[T]
	//
	// Have the operation fill the provided stat object
	StoringStatIn(stat *zk.Stat) GetDataBuilder

	// Watchable[T]
	//
	// Have the operation set a watch
	Watched() GetDataBuilder

	// Set a watcher for the operation
	UsingWatcher(watcher Watcher) GetDataBuilder

	// Backgroundable[T]
	//
	// Perform the action in the background
	InBackground() GetDataBuilder

	// Perform the action in the background
	InBackgroundWithContext(context interface{}) GetDataBuilder

	// Perform the action in the background
	InBackgroundWithCallback(callback BackgroundCallback) GetDataBuilder

	// Perform the action in the background
	InBackgroundWithCallbackAndContext(callback BackgroundCallback, context interface{}) GetDataBuilder
}

type GzipCompressionProvider

type GzipCompressionProvider struct {
	// contains filtered or unexported fields
}

func NewGzipCompressionProvider

func NewGzipCompressionProvider() *GzipCompressionProvider

func NewGzipCompressionProviderWithLevel

func NewGzipCompressionProviderWithLevel(level int) *GzipCompressionProvider

func (*GzipCompressionProvider) Compress

func (c *GzipCompressionProvider) Compress(path string, data []byte) ([]byte, error)

func (*GzipCompressionProvider) Decompress

func (c *GzipCompressionProvider) Decompress(path string, compressedData []byte) ([]byte, error)

type LZ4CompressionProvider

type LZ4CompressionProvider struct{}

func NewLZ4CompressionProvider

func NewLZ4CompressionProvider() *LZ4CompressionProvider

func (*LZ4CompressionProvider) Compress

func (c *LZ4CompressionProvider) Compress(path string, data []byte) ([]byte, error)

func (*LZ4CompressionProvider) Decompress

func (c *LZ4CompressionProvider) Decompress(path string, compressedData []byte) ([]byte, error)

type Listenable

type Listenable interface {
	Len() int

	Clear()

	ForEach(callback func(interface{}))
}

Abstracts a listenable object

type ListenerContainer

type ListenerContainer struct {
	// contains filtered or unexported fields
}

func (*ListenerContainer) Add

func (c *ListenerContainer) Add(listener interface{})

func (*ListenerContainer) Clear

func (c *ListenerContainer) Clear()

func (*ListenerContainer) ForEach

func (c *ListenerContainer) ForEach(callback func(interface{}))

func (*ListenerContainer) Len

func (c *ListenerContainer) Len() int

func (*ListenerContainer) Remove

func (c *ListenerContainer) Remove(listener interface{})

type OperationType

type OperationType int

Transaction operation types

const (
	OP_CREATE OperationType = iota
	OP_DELETE
	OP_SET_DATA
	OP_CHECK
)

type PathAndNode

type PathAndNode struct {
	Path, Node string
}

func SplitPath

func SplitPath(path string) (*PathAndNode, error)

Given a full path, return the individual parts, without slashes.
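One plausible reading of this function is that it separates the parent path from the final node name. The sketch below implements that reading; splitPath and pathAndNode are hypothetical names, and the handling of children of the root is an assumption rather than documented behavior.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

type pathAndNode struct {
	Path, Node string
}

// splitPath separates the parent path from the final node name.
func splitPath(path string) (*pathAndNode, error) {
	if !strings.HasPrefix(path, "/") {
		return nil, errors.New("path must start with /")
	}
	i := strings.LastIndex(path, "/")
	if i == 0 {
		// A direct child of the root (or the root itself).
		return &pathAndNode{Path: "/", Node: path[1:]}, nil
	}
	return &pathAndNode{Path: path[:i], Node: path[i+1:]}, nil
}

func main() {
	pn, err := splitPath("/one/two/three")
	fmt.Println(pn.Path, pn.Node, err)
}
```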

type RetryLoop

type RetryLoop interface {
	// creates a retry loop calling the given proc and retrying if needed
	CallWithRetry(proc func() (interface{}, error)) (interface{}, error)
}

Mechanism to perform an operation on Zookeeper that is safe against disconnections and "recoverable" errors.
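The idea of a retry loop can be sketched without ZooKeeper: call the operation, and if it fails with a recoverable error, sleep and try again up to a limit. In this sketch retryLoop, errConnectionLoss, and the fixed sleep are hypothetical; the real loop delegates the retry decision to the configured RetryPolicy.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var errConnectionLoss = errors.New("connection loss")

// retryLoop retries proc while the error is considered recoverable.
type retryLoop struct {
	maxRetries int
	sleep      time.Duration
}

func (l retryLoop) CallWithRetry(proc func() (interface{}, error)) (interface{}, error) {
	for retry := 0; ; retry++ {
		result, err := proc()
		// Only connection loss is treated as recoverable in this sketch.
		if err == nil || !errors.Is(err, errConnectionLoss) || retry >= l.maxRetries {
			return result, err
		}
		time.Sleep(l.sleep)
	}
}

func main() {
	attempts := 0
	loop := retryLoop{maxRetries: 3, sleep: time.Millisecond}
	v, err := loop.CallWithRetry(func() (interface{}, error) {
		attempts++
		if attempts < 3 {
			return nil, errConnectionLoss
		}
		return "ok", nil
	})
	fmt.Println(v, err, attempts)
}
```

Errors that are not recoverable are returned immediately, so callers still see failures such as a missing node without waiting through the retries.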

type RetryNTimes

type RetryNTimes struct {
	SleepingRetry
}

Retry policy that retries a max number of times

func NewRetryNTimes

func NewRetryNTimes(n int, sleepBetweenRetries time.Duration) *RetryNTimes

type RetryOneTime

type RetryOneTime struct {
	RetryNTimes
}

A retry policy that retries only once

func NewRetryOneTime

func NewRetryOneTime(sleepBetweenRetry time.Duration) *RetryOneTime

type RetryPolicy

type RetryPolicy interface {
	// Called when an operation has failed for some reason.
	// This method should return true to make another attempt.
	AllowRetry(retryCount int, elapsedTime time.Duration, sleeper RetrySleeper) bool
}

Abstracts the policy to use when retrying connections
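
A custom policy only has to implement AllowRetry. The sketch below re-declares RetryPolicy and RetrySleeper locally so that it compiles on its own, then defines a hypothetical cappedRetry policy and a recordingSleeper that records pauses instead of sleeping. It assumes retryCount is the number of retries already made, which is not stated in this reference.

```go
package main

import (
	"fmt"
	"time"
)

// Local copies of the interfaces from this reference, so the sketch is
// self-contained. Real code would use the package's own types.
type RetrySleeper interface {
	SleepFor(d time.Duration) error
}

type RetryPolicy interface {
	AllowRetry(retryCount int, elapsedTime time.Duration, sleeper RetrySleeper) bool
}

// cappedRetry is a hypothetical policy: it allows a retry while both the
// retry count and the elapsed time are under their limits.
type cappedRetry struct {
	maxRetries int
	maxElapsed time.Duration
	pause      time.Duration
}

func (p cappedRetry) AllowRetry(retryCount int, elapsedTime time.Duration, sleeper RetrySleeper) bool {
	if retryCount >= p.maxRetries || elapsedTime >= p.maxElapsed {
		return false
	}
	// A sleeper error (for example an interrupted sleep) ends the retries.
	return sleeper.SleepFor(p.pause) == nil
}

// recordingSleeper records requested pauses instead of sleeping, which
// keeps tests fast.
type recordingSleeper struct{ slept []time.Duration }

func (s *recordingSleeper) SleepFor(d time.Duration) error {
	s.slept = append(s.slept, d)
	return nil
}

// countAllowed reports how many consecutive retries the policy grants.
func countAllowed(maxRetries int) int {
	var policy RetryPolicy = cappedRetry{
		maxRetries: maxRetries,
		maxElapsed: time.Minute,
		pause:      10 * time.Millisecond,
	}
	sleeper := &recordingSleeper{}
	n := 0
	for policy.AllowRetry(n, 0, sleeper) {
		n++
	}
	return n
}

func main() {
	fmt.Println(countAllowed(3))
}
```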

type RetrySleeper

type RetrySleeper interface {
	// Sleep for the given time
	SleepFor(time time.Duration) error
}

Abstraction for retry policies to sleep

var DefaultRetrySleeper RetrySleeper = &defaultRetrySleeper{}

type RetryUntilElapsed

type RetryUntilElapsed struct {
	SleepingRetry
	// contains filtered or unexported fields
}

A retry policy that retries until a given amount of time elapses

func NewRetryUntilElapsed

func NewRetryUntilElapsed(maxElapsedTime, sleepBetweenRetries time.Duration) *RetryUntilElapsed

func (*RetryUntilElapsed) AllowRetry

func (r *RetryUntilElapsed) AllowRetry(retryCount int, elapsedTime time.Duration, sleeper RetrySleeper) bool

type SetACLBuilder

type SetACLBuilder interface {
	// Pathable[T]
	//
	// Commit the currently building operation using the given path
	ForPath(path string) (*zk.Stat, error)

	// ACLable[T]
	//
	// Set an ACL list
	WithACL(acls ...zk.ACL) SetACLBuilder

	// Versionable[T]
	//
	// Use the given version (the default is -1)
	WithVersion(version int32) SetACLBuilder

	// Backgroundable[T]
	//
	// Perform the action in the background
	InBackground() SetACLBuilder

	// Perform the action in the background
	InBackgroundWithContext(context interface{}) SetACLBuilder

	// Perform the action in the background
	InBackgroundWithCallback(callback BackgroundCallback) SetACLBuilder

	// Perform the action in the background
	InBackgroundWithCallbackAndContext(callback BackgroundCallback, context interface{}) SetACLBuilder
}

type SetDataBuilder

type SetDataBuilder interface {
	// PathAndBytesable[T]
	//
	// Commit the currently building operation using the given path
	ForPath(path string) (*zk.Stat, error)

	// Commit the currently building operation using the given path and data
	ForPathWithData(path string, payload []byte) (*zk.Stat, error)

	// Versionable[T]
	//
	// Use the given version (the default is -1)
	WithVersion(version int32) SetDataBuilder

	// Compressible[T]
	//
	// Cause the data to be compressed using the configured compression provider
	Compressed() SetDataBuilder

	// Backgroundable[T]
	//
	// Perform the action in the background
	InBackground() SetDataBuilder

	// Perform the action in the background
	InBackgroundWithContext(context interface{}) SetDataBuilder

	// Perform the action in the background
	InBackgroundWithCallback(callback BackgroundCallback) SetDataBuilder

	// Perform the action in the background
	InBackgroundWithCallbackAndContext(callback BackgroundCallback, context interface{}) SetDataBuilder
}

type SleepingRetry

type SleepingRetry struct {
	RetryPolicy

	N int
	// contains filtered or unexported fields
}

func (*SleepingRetry) AllowRetry

func (r *SleepingRetry) AllowRetry(retryCount int, elapsedTime time.Duration, sleeper RetrySleeper) bool

type State

type State int32

const (
	LATENT  State = iota // Start() has not yet been called
	STARTED              // Start() has been called
	STOPPED              // Close() has been called
)

func (*State) Change

func (s *State) Change(oldState, newState State) bool

func (State) Check

func (s State) Check(state State, msg string)

func (*State) Value

func (s *State) Value() State
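
The Change(oldState, newState) bool signature suggests an atomic compare-and-swap, although this reference does not say so. Under that assumption, the sketch below shows how such a state value can guard a Start/Close lifecycle. The state and service types are local, hypothetical stand-ins, not the package's State.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// state is a local stand-in for State; the names below mirror LATENT,
// STARTED and STOPPED.
type state int32

const (
	latent state = iota
	started
	stopped
)

// Change moves from oldState to newState atomically and reports whether
// the swap happened. Treating Change as compare-and-swap is an assumption.
func (s *state) Change(oldState, newState state) bool {
	return atomic.CompareAndSwapInt32((*int32)(s), int32(oldState), int32(newState))
}

// Value reads the current state atomically.
func (s *state) Value() state {
	return state(atomic.LoadInt32((*int32)(s)))
}

// service is a hypothetical component guarded by the state value.
type service struct{ st state }

func (svc *service) Start() error {
	if !svc.st.Change(latent, started) {
		return errors.New("service cannot be started more than once")
	}
	return nil
}

func (svc *service) Close() error {
	if !svc.st.Change(started, stopped) {
		return errors.New("service is not running")
	}
	return nil
}

func main() {
	var svc service
	fmt.Println(svc.Start(), svc.Start(), svc.Close(), svc.st.Value() == stopped)
}
```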

type SyncBuilder

type SyncBuilder interface {
	// Pathable[T]
	//
	// Commit the currently building operation using the given path
	ForPath(path string) (string, error)

	// Backgroundable[T]
	//
	// Perform the action in the background
	InBackground() SyncBuilder

	// Perform the action in the background
	InBackgroundWithContext(context interface{}) SyncBuilder

	// Perform the action in the background
	InBackgroundWithCallback(callback BackgroundCallback) SyncBuilder

	// Perform the action in the background
	InBackgroundWithCallbackAndContext(callback BackgroundCallback, context interface{}) SyncBuilder
}

type Tracer

type Tracer interface {
	Commit()
}

type TracerDriver

type TracerDriver interface {
	// Record the given trace event
	AddTime(name string, d time.Duration)

	// Add to a named counter
	AddCount(name string, increment int)
}

Mechanism for timing methods and recording counters
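
A TracerDriver can be as small as two maps. The sketch below re-declares the interface locally and provides a hypothetical in-memory driver, memoryTracer, plus a timed helper that reports the duration of a function call. How the package itself wires a driver into its operations is not shown in this reference, so none of that is assumed here.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// TracerDriver is re-declared locally so the sketch compiles on its own.
type TracerDriver interface {
	AddTime(name string, d time.Duration)
	AddCount(name string, increment int)
}

// memoryTracer is a hypothetical driver that accumulates values in memory.
type memoryTracer struct {
	mu     sync.Mutex
	times  map[string]time.Duration
	counts map[string]int
}

// Compile-time check that memoryTracer satisfies the interface.
var _ TracerDriver = (*memoryTracer)(nil)

func newMemoryTracer() *memoryTracer {
	return &memoryTracer{
		times:  make(map[string]time.Duration),
		counts: make(map[string]int),
	}
}

func (m *memoryTracer) AddTime(name string, d time.Duration) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.times[name] += d
}

func (m *memoryTracer) AddCount(name string, increment int) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.counts[name] += increment
}

// Count returns the accumulated counter for name.
func (m *memoryTracer) Count(name string) int {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.counts[name]
}

// timed runs fn, then reports its duration and one invocation to driver.
func timed(driver TracerDriver, name string, fn func()) {
	begin := time.Now()
	fn()
	driver.AddTime(name, time.Since(begin))
	driver.AddCount(name, 1)
}

func main() {
	tracer := newMemoryTracer()
	timed(tracer, "create", func() { time.Sleep(time.Millisecond) })
	timed(tracer, "create", func() {})
	fmt.Println(tracer.Count("create"))
}
```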

type Transaction

type Transaction interface {
	// Start a create builder in the transaction
	Create() TransactionCreateBuilder

	// Start a delete builder in the transaction
	Delete() TransactionDeleteBuilder

	// Start a set data builder in the transaction
	SetData() TransactionSetDataBuilder

	// Start a check builder in the transaction
	Check() TransactionCheckBuilder
}

Transactional/atomic operations.

The general form for this interface is:

	client.InTransaction().operation().arguments().ForPath(...).
		And().more-operations.
		And().Commit()

Here's an example that creates two nodes in a transaction:

	client.InTransaction().
		Create().ForPathWithData("/path-one", pathOneData).
		And().Create().ForPathWithData("/path-two", pathTwoData).
		And().Commit()

Important: the operations are not submitted until TransactionFinal.Commit() is called.

type TransactionBridge

type TransactionBridge interface {
	TransactionFinal

	And() TransactionFinal
}

Syntactic sugar to make the fluent interface more readable

type TransactionCheckBuilder

type TransactionCheckBuilder interface {
	// Pathable[T]
	//
	// Commit the currently building operation using the given path
	ForPath(path string) TransactionBridge

	// Versionable[T]
	//
	// Use the given version (the default is -1)
	WithVersion(version int32) TransactionCheckBuilder
}

type TransactionCreateBuilder

type TransactionCreateBuilder interface {
	// PathAndBytesable[T]
	//
	// Commit the currently building operation using the given path
	ForPath(path string) TransactionBridge

	// Commit the currently building operation using the given path and data
	ForPathWithData(path string, payload []byte) TransactionBridge

	// CreateModable[T]
	//
	// Set a create mode - the default is CreateMode.PERSISTENT
	WithMode(mode CreateMode) TransactionCreateBuilder

	// ACLable[T]
	//
	// Set an ACL list
	WithACL(acls ...zk.ACL) TransactionCreateBuilder

	// Compressible[T]
	//
	// Cause the data to be compressed using the configured compression provider
	Compressed() TransactionCreateBuilder
}

type TransactionDeleteBuilder

type TransactionDeleteBuilder interface {
	// Pathable[T]
	//
	// Commit the currently building operation using the given path
	ForPath(path string) TransactionBridge

	// Versionable[T]
	//
	// Use the given version (the default is -1)
	WithVersion(version int32) TransactionDeleteBuilder
}

type TransactionFinal

type TransactionFinal interface {
	Transaction

	// Commit all added operations as an atomic unit and return results for the operations.
	// One result is returned for each operation added.
	// Further, the ordering of the results matches the ordering that the operations were added.
	Commit() ([]TransactionResult, error)
}

Adds commit to the transaction interface

type TransactionResult

type TransactionResult struct {
	Type       OperationType
	ForPath    string
	ResultPath string
	ResultStat *zk.Stat
}

Holds the result of one transactional operation
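
Commit returns one TransactionResult per operation, in the order the operations were added. The sketch below shows one way to render such results for logging. OperationType is re-declared locally, the result struct mirrors TransactionResult without ResultStat (which would need the zk package), and opName and describe are hypothetical helpers. The sample values are illustrative, not output from a real transaction.

```go
package main

import "fmt"

// OperationType and its constants are re-declared locally so the sketch
// compiles on its own; they mirror the declarations in this reference.
type OperationType int

const (
	OP_CREATE OperationType = iota
	OP_DELETE
	OP_SET_DATA
	OP_CHECK
)

// opName is a hypothetical helper that names an operation type.
func opName(t OperationType) string {
	switch t {
	case OP_CREATE:
		return "create"
	case OP_DELETE:
		return "delete"
	case OP_SET_DATA:
		return "set-data"
	case OP_CHECK:
		return "check"
	default:
		return fmt.Sprintf("unknown(%d)", int(t))
	}
}

// result mirrors TransactionResult minus the ResultStat field.
type result struct {
	Type       OperationType
	ForPath    string
	ResultPath string
}

// describe renders one line per result, keeping the original order.
func describe(results []result) []string {
	lines := make([]string, 0, len(results))
	for i, r := range results {
		lines = append(lines, fmt.Sprintf("#%d %s %s -> %q", i, opName(r.Type), r.ForPath, r.ResultPath))
	}
	return lines
}

// sample feeds describe two made-up results.
func sample() []string {
	return describe([]result{
		{Type: OP_CREATE, ForPath: "/path-one", ResultPath: "/path-one"},
		{Type: OP_SET_DATA, ForPath: "/path-two"},
	})
}

func main() {
	for _, line := range sample() {
		fmt.Println(line)
	}
}
```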

type TransactionSetDataBuilder

type TransactionSetDataBuilder interface {
	// PathAndBytesable[T]
	//
	// Commit the currently building operation using the given path
	ForPath(path string) TransactionBridge

	// Commit the currently building operation using the given path and data
	ForPathWithData(path string, payload []byte) TransactionBridge

	// Versionable[T]
	//
	// Use the given version (the default is -1)
	WithVersion(version int32) TransactionSetDataBuilder

	// Compressible[T]
	//
	// Cause the data to be compressed using the configured compression provider
	Compressed() TransactionSetDataBuilder
}

type UnhandledErrorListenable

type UnhandledErrorListenable interface {
	Listenable /* [T] */

	AddListener(listener UnhandledErrorListener)

	RemoveListener(listener UnhandledErrorListener)
}

type UnhandledErrorListener

type UnhandledErrorListener interface {
	// Called when an exception is caught in a background thread, handler, etc.
	UnhandledError(err error)
}

func NewUnhandledErrorListener

func NewUnhandledErrorListener(callback unhandledErrorListenerCallback) UnhandledErrorListener

type Watcher

type Watcher interface {
	// contains filtered or unexported methods
}

func NewWatcher

func NewWatcher(fn func(event *zk.Event)) Watcher

type Watchers

type Watchers struct {
	// contains filtered or unexported fields
}

func NewWatchers

func NewWatchers(watchers ...Watcher) *Watchers

func (*Watchers) Add

func (w *Watchers) Add(watcher Watcher) Watcher

func (*Watchers) Fire

func (w *Watchers) Fire(event *zk.Event)

func (*Watchers) Len

func (w *Watchers) Len() int

func (*Watchers) Remove

func (w *Watchers) Remove(watcher Watcher) Watcher

func (*Watchers) Watch

func (w *Watchers) Watch(events <-chan zk.Event)

type ZookeeperConnection

type ZookeeperConnection interface {
	// Add the specified scheme:auth information to this connection.
	AddAuth(scheme string, auth []byte) error

	// Close this connection
	Close()

	// Create a node with the given path.
	//
	// The node data will be the given data, and node acl will be the given acl.
	Create(path string, data []byte, flags int32, acl []zk.ACL) (string, error)

	// Return the stat of the node of the given path. Return nil if no such node exists.
	Exists(path string) (bool, *zk.Stat, error)

	ExistsW(path string) (bool, *zk.Stat, <-chan zk.Event, error)

	// Delete the node with the given path.
	//
	// The call will succeed if such a node exists,
	// and the given version matches the node's version
	// (if the given version is -1, it matches any node's versions).
	Delete(path string, version int32) error

	// Return the data and the stat of the node of the given path.
	Get(path string) ([]byte, *zk.Stat, error)

	GetW(path string) ([]byte, *zk.Stat, <-chan zk.Event, error)

	// Set the data for the node of the given path
	// if such a node exists and the given version matches the version of the node.
	// Return the stat of the node.
	Set(path string, data []byte, version int32) (*zk.Stat, error)

	// Return the list of the children of the node of the given path.
	Children(path string) ([]string, *zk.Stat, error)

	ChildrenW(path string) ([]string, *zk.Stat, <-chan zk.Event, error)

	// Return the ACL and stat of the node of the given path.
	GetACL(path string) ([]zk.ACL, *zk.Stat, error)

	// Set the ACL for the node of the given path
	// if such a node exists and the given version matches the version of the node.
	// Return the stat of the node.
	SetACL(path string, acl []zk.ACL, version int32) (*zk.Stat, error)

	// Executes multiple ZooKeeper operations or none of them.
	Multi(ops ...interface{}) ([]zk.MultiResponse, error)

	// Flushes channel between process and leader.
	Sync(path string) (string, error)
}

type ZookeeperDialFunc

type ZookeeperDialFunc func(connString string, sessionTimeout time.Duration, canBeReadOnly bool) (ZookeeperConnection, <-chan zk.Event, error)

type ZookeeperDialer

type ZookeeperDialer interface {
	Dial(connString string, sessionTimeout time.Duration, canBeReadOnly bool) (ZookeeperConnection, <-chan zk.Event, error)
}

Allocate a new ZooKeeper connection

func NewZookeeperDialer

func NewZookeeperDialer(dial ZookeeperDialFunc) ZookeeperDialer
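
Pairing a function type with a one-method interface is a common Go idiom: the function type gets a method that calls itself, much like http.HandlerFunc. The sketch below illustrates that idiom with simplified, hypothetical types (connection, dialer, dialFunc) whose Dial signature is shorter than the real one. Whether NewZookeeperDialer is implemented this way is an assumption. A stub dialer of this kind can be handy in tests that should not reach a real ZooKeeper ensemble.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// connection is a hypothetical, reduced stand-in for ZookeeperConnection.
type connection interface {
	Close()
}

// dialer is a simplified stand-in for ZookeeperDialer. The real Dial also
// takes canBeReadOnly and returns an event channel.
type dialer interface {
	Dial(connString string, sessionTimeout time.Duration) (connection, error)
}

// dialFunc adapts a plain function to the dialer interface.
type dialFunc func(connString string, sessionTimeout time.Duration) (connection, error)

func (f dialFunc) Dial(connString string, sessionTimeout time.Duration) (connection, error) {
	return f(connString, sessionTimeout)
}

// fakeConn is a stub connection that only remembers that it was closed.
type fakeConn struct{ closed bool }

func (c *fakeConn) Close() { c.closed = true }

// connect works with any dialer, real or stubbed.
func connect(d dialer, connString string) (connection, error) {
	return d.Dial(connString, 5*time.Second)
}

// dialFake dials through a stub and reports whether dialing succeeded.
func dialFake(connString string) error {
	stub := dialFunc(func(cs string, _ time.Duration) (connection, error) {
		if cs == "" {
			return nil, errors.New("empty connection string")
		}
		return &fakeConn{}, nil
	})
	c, err := connect(stub, connString)
	if err != nil {
		return err
	}
	c.Close()
	return nil
}

func main() {
	fmt.Println(dialFake("localhost:2181"))
	fmt.Println(dialFake(""))
}
```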

Directories

Path Synopsis
examples
