curator

package module
v0.0.0-...-d41e26c Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 23, 2018 License: MIT Imports: 12 Imported by: 0

README

go-curator

GoDoc

A high-level ZooKeeper API client, inspired by Netflix/curator

Documentation

Index

Examples

Constants

View Source
const (
	ConnDisconnected = 0
	ConnConnected    = 1
)
View Source
const (
	DefaultSessionTimeout    = 60 * time.Second
	DefaultConnectionTimeout = 15 * time.Second
)

Variables

View Source
var DefaultZookeeperFactory = defaultZookeeperFactory{}
View Source
var (
	ErrClientClosed = errors.New("curator: ZookeeperClient had been closed")
)
View Source
var ErrConnectionLoss = errors.New("curator: connection loss")

Functions

func CallWithRetryLoop

func CallWithRetryLoop(client *ZookeeperClient, operate func() error) (err error)

func CreateAll

func CreateAll(client *ZookeeperClient, nodePath string, value []byte, flags int32, aclv []zk.ACL) (string, error)

func DeleteAll

func DeleteAll(client *ZookeeperClient, node string) error

func ShouldRetry

func ShouldRetry(err error) bool

Types

type ChildrenCache

type ChildrenCache struct {
	// contains filtered or unexported fields
}

func NewChildrenCache

func NewChildrenCache(client *ZookeeperClient, node string, callback OnChildrenCacheChange) *ChildrenCache

func (*ChildrenCache) Close

func (w *ChildrenCache) Close() error

func (*ChildrenCache) Get

func (w *ChildrenCache) Get(child string) (data []byte, stat *zk.Stat, ok bool)

func (*ChildrenCache) Start

func (w *ChildrenCache) Start() error

type ChildrenCacheEvent

type ChildrenCacheEvent struct {
	ChildNode string
	Data      []byte
	Stat      *zk.Stat
	Type      ChildrenCacheEventType
}

type ChildrenCacheEventType

type ChildrenCacheEventType int
const (
	ChildrenCacheAdd    ChildrenCacheEventType = 1
	ChildrenCacheUpdate ChildrenCacheEventType = 2
	ChildrenCacheDel    ChildrenCacheEventType = 3
)

func (ChildrenCacheEventType) String

func (c ChildrenCacheEventType) String() string

type Conn

type Conn interface {
	Close()

	Get(path string) (data []byte, stat *zk.Stat, err error)
	GetW(path string) (data []byte, stat *zk.Stat, watch <-chan zk.Event, err error)

	Children(path string) (children []string, stat *zk.Stat, err error)
	ChildrenW(path string) (children []string, stat *zk.Stat, watch <-chan zk.Event, err error)

	Exists(path string) (exist bool, stat *zk.Stat, err error)
	ExistsW(path string) (exist bool, stat *zk.Stat, watch <-chan zk.Event, err error)

	Create(path string, value []byte, flags int32, acl []zk.ACL) (pathCreated string, err error)
	CreateProtectedEphemeralSequential(path string, value []byte, aclv []zk.ACL) (pathCreated string, err error)

	Set(path string, value []byte, version int32) (stat *zk.Stat, err error)

	Delete(path string, version int32) (err error)

	GetACL(path string) ([]zk.ACL, *zk.Stat, error)
	SetACL(path string, acl []zk.ACL, version int32) (*zk.Stat, error)
}

type EnsembleProvider

type EnsembleProvider interface {
	Start() error
	Close() error
	GetConnectionString() string
}

type ExponentialBackoffRetry

type ExponentialBackoffRetry struct {
	N            int
	BaseDuration time.Duration
	MaxDuration  time.Duration
}

func NewExponentialBackoffRetry

func NewExponentialBackoffRetry(n int, baseDuration, maxDuration time.Duration) ExponentialBackoffRetry

func (ExponentialBackoffRetry) AllowRetry

func (r ExponentialBackoffRetry) AllowRetry(count int, elapsed time.Duration, sleeper RetrySleeper) bool

type FixedEnsembleProvider

type FixedEnsembleProvider string

func NewFixedEnsembleProvider

func NewFixedEnsembleProvider(connString string) FixedEnsembleProvider

func (FixedEnsembleProvider) Close

func (FixedEnsembleProvider) Close() error

func (FixedEnsembleProvider) GetConnectionString

func (f FixedEnsembleProvider) GetConnectionString() string

func (FixedEnsembleProvider) Start

func (FixedEnsembleProvider) Start() error

type LeaderSelector

type LeaderSelector struct {
	// contains filtered or unexported fields
}

func NewLeaderSelector

func NewLeaderSelector(client *ZookeeperClient, basePath string, listener LeaderSelectorListener, aclv []zk.ACL) *LeaderSelector

func (*LeaderSelector) Close

func (l *LeaderSelector) Close() error

func (*LeaderSelector) HasLeaderShip

func (l *LeaderSelector) HasLeaderShip() bool

func (*LeaderSelector) Start

func (l *LeaderSelector) Start() error

type LeaderSelectorListener

type LeaderSelectorListener interface {
	TakeLeaderShip(client *ZookeeperClient, cancel <-chan struct{}) error
}

type Locker

type Locker interface {
	Acquire() error
	Release() error
}

type Logger

type Logger interface {
	Infof(format string, v ...interface{})
	Infoln(v ...interface{})
	Errorf(format string, v ...interface{})
	Errorln(v ...interface{})
	Debugf(format string, v ...interface{})
	Debugln(v ...interface{})
	Warnf(format string, v ...interface{})
	Warnln(v ...interface{})
}
var Log Logger = &stdLogger{}

type Mutex

type Mutex struct {
	// contains filtered or unexported fields
}

func NewMutex

func NewMutex(client *ZookeeperClient, basePath string, aclv []zk.ACL) *Mutex

func (*Mutex) Acquire

func (m *Mutex) Acquire() error

func (*Mutex) Release

func (m *Mutex) Release() error

type OnChildrenCacheChange

type OnChildrenCacheChange func(event ChildrenCacheEvent)

type RetryForever

type RetryForever struct {
	Duration time.Duration
}

func NewRetryForever

func NewRetryForever(duration time.Duration) RetryForever

func (RetryForever) AllowRetry

func (r RetryForever) AllowRetry(count int, elapsed time.Duration, sleeper RetrySleeper) bool

type RetryNTimes

type RetryNTimes struct {
	N        int
	Duration time.Duration
}

func NewRetryNTimes

func NewRetryNTimes(n int, duration time.Duration) RetryNTimes

func (RetryNTimes) AllowRetry

func (r RetryNTimes) AllowRetry(count int, elapsed time.Duration, sleeper RetrySleeper) bool

type RetryPolicy

type RetryPolicy interface {
	AllowRetry(count int, elapsed time.Duration, sleeper RetrySleeper) bool
}

type RetryPolicyFunc

type RetryPolicyFunc func(count int, elapsed time.Duration, sleeper RetrySleeper) bool

func (RetryPolicyFunc) AllowRetry

func (r RetryPolicyFunc) AllowRetry(count int, elapsed time.Duration, sleeper RetrySleeper) bool

type RetrySleeper

type RetrySleeper interface {
	Sleep(duration time.Duration) error
}

func NopRetrySleeper

func NopRetrySleeper() RetrySleeper

type RetrySleeperFunc

type RetrySleeperFunc func(duration time.Duration) error

func (RetrySleeperFunc) Sleep

func (r RetrySleeperFunc) Sleep(duration time.Duration) error

type Watcher

type Watcher struct {
	// contains filtered or unexported fields
}

func NewWatcher

func NewWatcher(f func(zk.Event)) *Watcher

func (*Watcher) Fire

func (w *Watcher) Fire(event zk.Event)

type ZookeeperClient

type ZookeeperClient struct {
	// contains filtered or unexported fields
}

func NewZookeeperClient

func NewZookeeperClient(
	factory ZookeeperFactory,
	ensemble EnsembleProvider,
	sessionTimeout, connectTimeout time.Duration,
	retryPolicy RetryPolicy, canBeReadOnly bool) (*ZookeeperClient, error)
Example
servers := "192.168.27.216:2181,192.168.27.217:2181,192.168.27.218:2181"
ensemble := curator.NewFixedEnsembleProvider(servers)
sessionTimeout := 3 * time.Second
connectionTimeout := 1 * time.Second
retryPolicy := curator.NewRetryForever(500 * time.Millisecond)
client, err := curator.NewZookeeperClient(curator.DefaultZookeeperFactory, ensemble, sessionTimeout, connectionTimeout, retryPolicy, true)
if err != nil {
	log.Println("failed to curator.NewZookeeperClient, err:", err)
	return
}
if err := client.Start(); err != nil {
	log.Println("failed to client.Start, err:", err)
	return
}
defer client.Close()

children, stat, err := client.Children("/zookeeper")
if err != nil {
	log.Println("failed to client.Children, err:", err)
	return
}

log.Printf("stat:%+v", stat)

for _, child := range children {
	log.Println("child:", child)
}
Output:

func (*ZookeeperClient) BlockUntilConnectedOrTimeout

func (c *ZookeeperClient) BlockUntilConnectedOrTimeout()

func (*ZookeeperClient) Children

func (c *ZookeeperClient) Children(path string) (children []string, stat *zk.Stat, err error)

func (*ZookeeperClient) ChildrenW

func (c *ZookeeperClient) ChildrenW(path string) (children []string, stat *zk.Stat, watch <-chan zk.Event, err error)

func (*ZookeeperClient) Close

func (c *ZookeeperClient) Close() error

func (*ZookeeperClient) Create

func (c *ZookeeperClient) Create(path string, value []byte, flags int32, aclv []zk.ACL) (pathCreated string, err error)

func (*ZookeeperClient) CreateProtectedEphemeralSequential

func (c *ZookeeperClient) CreateProtectedEphemeralSequential(path string, value []byte, aclv []zk.ACL) (pathCreated string, err error)

func (*ZookeeperClient) Delete

func (c *ZookeeperClient) Delete(path string, version int32) (err error)

func (*ZookeeperClient) Exists

func (c *ZookeeperClient) Exists(path string) (exist bool, stat *zk.Stat, err error)

func (*ZookeeperClient) ExistsW

func (c *ZookeeperClient) ExistsW(path string) (exist bool, stat *zk.Stat, watch <-chan zk.Event, err error)

func (*ZookeeperClient) Get

func (c *ZookeeperClient) Get(path string) (data []byte, stat *zk.Stat, err error)

func (*ZookeeperClient) GetACL

func (c *ZookeeperClient) GetACL(path string) (acl []zk.ACL, stat *zk.Stat, err error)

func (*ZookeeperClient) GetConn

func (c *ZookeeperClient) GetConn() Conn

func (*ZookeeperClient) GetRetryPolicy

func (c *ZookeeperClient) GetRetryPolicy() RetryPolicy

func (*ZookeeperClient) GetW

func (c *ZookeeperClient) GetW(path string) (data []byte, stat *zk.Stat, watch <-chan zk.Event, err error)

func (ZookeeperClient) IsConnected

func (c ZookeeperClient) IsConnected() bool

func (*ZookeeperClient) Set

func (c *ZookeeperClient) Set(path string, value []byte, version int32) (stat *zk.Stat, err error)

func (*ZookeeperClient) SetACL

func (c *ZookeeperClient) SetACL(path string, acl []zk.ACL, version int32) (stat *zk.Stat, err error)

func (*ZookeeperClient) Start

func (c *ZookeeperClient) Start() error

type ZookeeperClientBuilder

type ZookeeperClientBuilder struct {
	// contains filtered or unexported fields
}
Example
servers := "192.168.27.216:2181,192.168.27.217:2181,192.168.27.218:2181"
client, err := curator.NewZookeeperClientBuidler().
	WithEnsembleProvider(curator.NewFixedEnsembleProvider(servers)).
	WithRetryPolicy(curator.NewRetryForever(100 * time.Millisecond)).
	Build()
if err != nil {
	log.Println("failed to curator.NewZookeeperClient, err:", err)
	return
}
if err := client.Start(); err != nil {
	log.Println("failed to client.Start, err:", err)
	return
}
defer client.Close()

children, stat, err := client.Children("/zookeeper")
if err != nil {
	log.Println("failed to client.Children, err:", err)
	return
}

log.Printf("stat:%+v", stat)

for _, child := range children {
	log.Println("child:", child)
}
Output:

func NewZookeeperClientBuidler

func NewZookeeperClientBuidler() *ZookeeperClientBuilder

func (*ZookeeperClientBuilder) Build

func (b *ZookeeperClientBuilder) Build() (*ZookeeperClient, error)

func (*ZookeeperClientBuilder) WithCanBeReadOnly

func (b *ZookeeperClientBuilder) WithCanBeReadOnly(canBeReadOnly bool) *ZookeeperClientBuilder

func (*ZookeeperClientBuilder) WithConnectionTimeout

func (b *ZookeeperClientBuilder) WithConnectionTimeout(timeout time.Duration) *ZookeeperClientBuilder

func (*ZookeeperClientBuilder) WithEnsembleProvider

func (b *ZookeeperClientBuilder) WithEnsembleProvider(ensemble EnsembleProvider) *ZookeeperClientBuilder

func (*ZookeeperClientBuilder) WithRetryPolicy

func (b *ZookeeperClientBuilder) WithRetryPolicy(policy RetryPolicy) *ZookeeperClientBuilder

func (*ZookeeperClientBuilder) WithSessionTimeout

func (b *ZookeeperClientBuilder) WithSessionTimeout(timeout time.Duration) *ZookeeperClientBuilder

func (*ZookeeperClientBuilder) WithZookeeperFactory

func (b *ZookeeperClientBuilder) WithZookeeperFactory(factory ZookeeperFactory) *ZookeeperClientBuilder

type ZookeeperFactory

type ZookeeperFactory interface {
	Create(connectString string, sessionTimeout, connectTimeout time.Duration, readOnly bool) (*zk.Conn, <-chan zk.Event, error)
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL