zzk

package
v0.0.0-...-1c5d739 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 14, 2023 License: Apache-2.0 Imports: 8 Imported by: 22

Documentation

Index

Constants

View Source
const (
	DefaultConnectionTimeout = time.Minute
	MaxDelay                 = 15 * time.Second
)
View Source
const DefaultRetryTime = time.Minute

DefaultRetryTime is the time to retry a failed local operation

Variables

View Source
var (
	ErrTimeout  = errors.New("connection timeout")
	ErrShutdown = errors.New("listener shutdown")
	ErrBadConn  = errors.New("bad connection")
)

Errors

View Source
var ErrInvalidType = errors.New("invalid type")

ErrInvalidType is the error for invalid zk data types

View Source
var (
	ErrNotInitialized = errors.New("client not initialized")
)

Functions

func Connect

func Connect(path string, getConnection GetConnection) <-chan client.Connection

Connect generates a client connection asynchronously

func GeneratePoolPath

func GeneratePoolPath(poolID string) string

GeneratePoolPath generates the path for a pool-based connection

func GetHostID

func GetHostID(leader client.Leader) (string, error)

GetHostID finds the host ID of a leader node

func GetLocalConnection

func GetLocalConnection(path string) (client.Connection, error)

GetLocalConnection acquires a connection from the local zookeeper client

func GetRemoteConnection

func GetRemoteConnection(path string) (client.Connection, error)

GetRemoteConnection acquires a connection from the remote zookeeper client

func InitializeLocalClient

func InitializeLocalClient(client *client.Client)

InitializeLocalClient initializes the local zookeeper client

func InitializeRemoteClient

func InitializeRemoteClient(client *client.Client)

InitializeRemoteClient initializes the remote zookeeper client

func Listen

func Listen(shutdown <-chan interface{}, ready chan<- error, conn client.Connection, l Listener)

Listen initializes a listener for a particular zookeeper node.

shutdown: signal to shut down the listener

ready: signal to indicate that the listener has started watching its child nodes (must set buffer size >= 1)

l: object that manages the zk interface for a specific path

func Listen2

func Listen2(shutdown <-chan interface{}, conn client.Connection, s Spawner)

Listen2 manages spawning threads to handle nodes created under the parent path.

func Manage

func Manage(shutdown <-chan interface{}, root string, l Listener2)

Manage continuously restarts the listener until it shuts down

func MonitorRealm

func MonitorRealm(shutdown <-chan interface{}, conn client.Connection, path string) <-chan string

func PathExists

func PathExists(conn client.Connection, p string) (bool, error)

PathExists verifies whether a path exists and does not return an error if the path does not exist

func Ready

func Ready(shutdown <-chan interface{}, conn client.Connection, p string) error

Ready waits for a node to be available for watching

func ShutdownConnections

func ShutdownConnections()

ShutdownConnections closes all local and remote zookeeper connections

func Start

func Start(shutdown <-chan interface{}, conn client.Connection, master Listener, listeners ...Listener)

Start starts a group of listeners that are governed by a master listener. When the master exits, it shuts down all of the child listeners and waits for all of the subprocesses to exit

func Sync

func Sync(conn client.Connection, data []Node, zkpath string) error

Sync synchronizes zookeeper data with what is in elastic or any other storage facility

Types

type GetConnection

type GetConnection func(string) (client.Connection, error)

GetConnection describes a generic function for acquiring a connection object

type HostLeader

type HostLeader struct {
	HostID string
	Realm  string
	// contains filtered or unexported fields
}

HostLeader is the node to store leader information for a host

func (*HostLeader) SetVersion

func (node *HostLeader) SetVersion(version interface{})

SetVersion implements client.Node

func (*HostLeader) Version

func (node *HostLeader) Version() interface{}

Version implements client.Node

type LeaderListener

type LeaderListener struct {
	// contains filtered or unexported fields
}

LeaderListener is a generic watcher and broadcaster of leader types

func NewLeaderListener

func NewLeaderListener(path string) *LeaderListener

NewLeaderListener instantiates a listener to watch the leader election at a given path.

func (*LeaderListener) Run

func (l *LeaderListener) Run(cancel <-chan interface{}, conn client.Connection)

Run manages the event loop for this listener

func (*LeaderListener) Wait

func (l *LeaderListener) Wait() <-chan struct{}

Wait enqueues a watcher that will be updated when a new leader is elected

type Listener

type Listener interface {
	// SetConnection sets the connection object
	SetConnection(conn client.Connection)
	// GetPath concatenates the base path with whatever child nodes are specified
	GetPath(nodes ...string) string
	// Ready verifies that the listener can start listening
	Ready() error
	// Done performs any cleanup when the listener exits
	Done()
	// Spawn is the action to be performed when a child node is found on the parent
	Spawn(<-chan interface{}, string)
	// PostProcess performs additional action based on the nodes that are in processing
	PostProcess(p map[string]struct{})
}

Listener is the zookeeper node listener type

type Listener2

type Listener2 interface {

	// Listen is the method to call to start the listener
	Listen(cancel <-chan interface{}, conn client.Connection)

	// Exited does additional cleanup once shutdown is called
	Exited()
}

Listener2 is for monitoring the listener and its connection to zookeeper

type NewListener

type NewListener func(string) Listener

NewListener instantiates a new listener object

type Node

type Node interface {
	client.Node
	// GetID relates to the child node mapping in zookeeper
	GetID() string
	// Create creates the object in zookeeper
	Create(conn client.Connection) error
	// Update updates the object in zookeeper
	Update(conn client.Connection) error
}

Node manages zookeeper actions

type Spawner

type Spawner interface {

	// SetConn sets the zookeeper connection
	SetConn(conn client.Connection)

	// Path returns the parent path of the zookeeper node whose children are
	// the target of spawn
	Path() string

	// Pre performs a synchronous action to occur before spawn
	Pre()

	// Spawn is intended to manage individual nodes that exist from Path()
	Spawn(cancel <-chan struct{}, n string)

	// Post presents the complete list of nodes that are children of Path() for
	// further processing and synchronization
	Post(p map[string]struct{})
}

Spawner manages the spawning of individual goroutines for managing nodes under a particular zookeeper path

type SyncHandler

type SyncHandler interface {
	// GetPath gets the path to the node
	GetPath(...string) string
	// Ready implements Listener
	Ready() error
	// Done implements Listener
	Done()
	// GetConnection acquires a path-based connection
	GetConnection(string) (client.Connection, error)
	// Allocate initializes a new Node object
	Allocate() Node
	// GetAll gets all local data
	GetAll() ([]Node, error)
	// AddUpdate performs a local update
	AddUpdate(string, Node) (string, error)
	// Delete deletes a Node locally
	Delete(string) error
}

SyncHandler is the handler for the Synchronizer

type Synchronizer

type Synchronizer struct {
	SyncHandler
	// contains filtered or unexported fields
}

Synchronizer is the remote synchronizer object

func NewSynchronizer

func NewSynchronizer(handler SyncHandler) *Synchronizer

NewSynchronizer instantiates a new synchronizer

func (*Synchronizer) AddListener

func (l *Synchronizer) AddListener(f NewListener)

AddListener creates new Listener objects based on the Synchronizer's child nodes

func (*Synchronizer) PostProcess

func (l *Synchronizer) PostProcess(processing map[string]struct{})

PostProcess deletes any orphaned data that exists locally

func (*Synchronizer) SetConnection

func (l *Synchronizer) SetConnection(conn client.Connection)

SetConnection implements Listener

func (*Synchronizer) Spawn

func (l *Synchronizer) Spawn(shutdown <-chan interface{}, nodeID string)

Spawn starts the remote Synchronizer based on nodeID

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL