zksync

package module
v0.0.0-...-8c12d02 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 17, 2015 License: Apache-2.0 Imports: 12 Imported by: 0

README

zksync

GoDoc Build Status

zksync provides a go implementation the synchronization primitives that you'll find in the ZooKeeper documentation: locks and barriers. These can be used to coordinate computation across multiple processes.

RWMutex

RWMutex provides a read-write lock. Readers can share concurrent access as long as a writer hasn't claimed the lock. Writers must be exclusive, and will block on any other lock holder.

Lock acquisition is handled on a first-come, first-serve basis, with ordering determined by the ZooKeeper server.

Locks are stored as ephemeral znodes, so if a client disconnects from ZooKeeper unexpectedly, the lock will be released within the session timeout that the client established when they connected.

Barrier

Barrier provides a shared barrier for multiple clients. When the barrier is set, any call to Barrier.Wait will block any client of the barrier. A call to Barrier.Unset will unblock all clients.

Setting the barrier is an inherently racy process; clients can optimistically call Barrier.Set and handle zk.ErrNodeExists if the barrier is already in place.

Barriers are not ephemeral, since a disconnect from the barrier-setter should not obviously cause all other clients to proceed. This means that any client has permission to remove the barrier.

Double Barriers

Double Barriers allow clients to synchronize start and end times of a computation. All participants in the double barrier should agree on the number of participants ahead of time. Then, they each call DoubleBarrier.Enter() and block until all participants have called that function and written their data in ZooKeeper.

When a client finishes its computation, it calls DoubleBarrier.Exit(), which will block until all clients have either called DoubleBarrier.Exit() or disconnected from ZooKeeper.

All clients must agree on the number of participants throughout the entry process. If the group is expecting 5 participants, and then only 4 successfully call Enter, then those 4 will all block forever.

Development

Run tests with GOMAXPROCS=4 godep go test -timeout 10s ./... (or some other value of GOMAXPROCS, of course - the point is, you want parallelism).

Tests require a working ZooKeeper cluster. You can set this in either of two ways:

  1. Use Vagrant. vagrant up will launch a VM with a 5-node ZooKeeper ensemble listening on 192.168.100.67, ports 2181 through 2185. This is the assumed default in tests.
  2. Set the environment variable ZOOKEEPER_PEERS to a comma-separated list of ZooKeeper hostports, like so:
    $ ZOOKEEPER_PEERS=localhost:2181,localhost:2182 GOMAXPROCS=4 godep go test -timeout 10s ./...
    

The vagrant ZooKeeper cluster uses Toxiproxy to simulate network failures. By default, tests will attempt to connect to 192.168.100.67:8474 to talk to Toxiproxy. You can override this by setting a TOXIPROXY environment variable to your preferred hostport. If unable to connect to Toxiproxy, then tests which rely upon it will be skipped.

Either way, the tests will all be run under a namespace, /zksync-test, and will delete everything under that node after each test run.

Documentation

Overview

package zksync provides synchronization primitives backed by ZooKeeper.

Index

Constants

View Source
const (
	Fatal severity = iota
	Error
	Warning
	Info
	Debug
	Trace
)

Variables

View Source
var (
	ErrMalformedLock = errors.New("not a valid lock")
	ErrNoLockPresent = errors.New("not currently holding a lock")
)

Functions

func SetLogger

func SetLogger(l Logger)

func SetVerbosity

func SetVerbosity(v severity)

Types

type Barrier

type Barrier struct {
	// contains filtered or unexported fields
}

Bariers block processing on until a condition is met, at which time all processes blocked on the barrier are allowed to proceed.

Barriers are implemented by setting a znode in ZooKeeper. If the znode exists, the barrier is in place.

func NewBarrier

func NewBarrier(conn *zk.Conn, path string, acl []zk.ACL) *Barrier

NewBarrier instantiates a Barrier struct. It doesn't actually create anything in ZooKeeper or communicate with ZooKeeper in any way. If the path doesn't exist when `Barrier.Set()` is called, then it will be created, along with any parent nodes, using the provided ACL.

func (*Barrier) Set

func (b *Barrier) Set() error

Set places the barrier, blocking any callers of b.Wait(). Returns an error if the barrier exists; callers can handle the zk.ErrNodeExists themselves.

func (*Barrier) Unset

func (b *Barrier) Unset() error

Unset removes the barrier. Returns an error if the barrier does not exist; callers can handle the zk.ErrNoNode themselves.

func (*Barrier) Wait

func (b *Barrier) Wait() error

Wait blocks until the barrier is removed.

type DoubleBarrier

type DoubleBarrier struct {
	// contains filtered or unexported fields
}

Double barriers enable clients to synchronize the beginning and the end of a computation. When enough processes have joined the barrier, processes start their computation and leave the barrier once they have finished.

Double barrier clients register with an ID string which is used to know which clients have not started or finished a computation.

Double barrier clients need to know how many client processes are participating in order to know when all clients are ready.

func NewDoubleBarrier

func NewDoubleBarrier(conn *zk.Conn, path string, id string, n int, acl []zk.ACL) *DoubleBarrier

NewDoubleBarrier creates a DoubleBarrier using the provided connection to ZooKeeper. The barrier is registered under the given path, and the client will identify itself with the given ID. When Enter or Exit are called, it will block until n clients have similarly entered or exited. The acl is used when creating any znodes.

func (*DoubleBarrier) CancelEnter

func (db *DoubleBarrier) CancelEnter()

CancelEnter aborts an Enter call and cleans up as it aborts. This can be used in conjunction with a timeout to exit early from a Double Barrier.

func (*DoubleBarrier) Enter

func (db *DoubleBarrier) Enter() (err error)

Enter joins the computation. It registers this client at the znode, and then blocks until all n clients have registered. If the path does not exist, then it is created, along with any of its parents if they don't exist.

func (*DoubleBarrier) Exit

func (db *DoubleBarrier) Exit() error

Exit reports this client as done with the computation. It deregisters this node from ZooKeeper, then blocks until all nodes have deregistered.

type Logger

type Logger interface {
	Printf(string, ...interface{})
	Fatalf(string, ...interface{})
}

type RWMutex

type RWMutex struct {
	// contains filtered or unexported fields
}

RWMutex provides a read-write lock backed by ZooKeeper. Multiple clients can use the lock as long as they use the same path on the same ZooKeeper ensemble.

Access is provided on a first-come, first-serve basis. Readers are granted the lock if there are no current writers (so reads can happen concurrently). Writers are only granted the lock exclusively.

It is important to release the lock with `Unlock`, of course.

In case of an unexpected disconnection from ZooKeeper, any locks will be released because the Session Timeout will expire, but its the caller's responsibilty to halt any computation in this case. This can be done by listening to the Event channel provided by zk.Connect (https://godoc.org/github.com/samuel/go-zookeeper/zk#Connect).

RWMutexes are not safe for shared local use across goroutines.

func NewRWMutex

func NewRWMutex(conn *zk.Conn, path string, acl []zk.ACL) *RWMutex

NewRWMutex creates a new RWMutex object. It doesn't actually perform any locking or communicate with ZooKeeper in any way. If the path does not exist, it will be created when RLock or WLock are called, as will any of its parents, using the provided ACL.

func (*RWMutex) RLock

func (m *RWMutex) RLock() error

RLock acquires a read lock on a znode. This will block if there are any write locks already on that znode until the write locks are released.

func (*RWMutex) Unlock

func (m *RWMutex) Unlock() error

Unlock releases the lock. Returns an error if not currently holding the lock.

func (*RWMutex) WLock

func (m *RWMutex) WLock() error

WLock acquires a write lock on a znode. This will block if there are any read or write locks already on that znode until those locks are released.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL