tgres: github.com/tgres/tgres/cluster

package cluster

import "github.com/tgres/tgres/cluster"

Package cluster is a simplistic clustering implementation built on top of https://godoc.org/github.com/hashicorp/memberlist.

The assumption behind this package is that you have identical nodes, each responsible for a certain part of the data. Each part is a datum identified by an integer id, and any node forwards requests to the node designated for that datum. The designation is determined by a simple mod operation of the datum id against the number of nodes, so the distribution of ids determines how evenly data is spread across nodes. There is no leader.
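To make the effect of id distribution concrete, here is a minimal sketch of the mod rule described above. The function name nodeIndex is hypothetical and not part of this package, and the sketch assumes non-negative ids; how an index maps to an actual node is not shown.

```go
package main

import "fmt"

// nodeIndex returns the index of the node designated for a datum id,
// assuming the simple "id mod number of nodes" rule described above.
// Ids are assumed to be non-negative. (Hypothetical helper.)
func nodeIndex(id int64, numNodes int) int {
	return int(id % int64(numNodes))
}

func main() {
	// Sequential ids spread evenly over 3 nodes.
	even := make([]int, 3)
	for id := int64(0); id < 9; id++ {
		even[nodeIndex(id, 3)]++
	}
	fmt.Println(even) // [3 3 3]

	// Ids that are all multiples of 3 land on a single node.
	skewed := make([]int, 3)
	for id := int64(0); id < 27; id += 3 {
		skewed[nodeIndex(id, 3)]++
	}
	fmt.Println(skewed) // [9 0 0]
}
```

If ids share a common factor with the number of nodes, as in the second loop, some nodes receive no data at all.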

If a node must terminate, it is given an opportunity to save the data it is responsible for, then signal the nodes now responsible that they can take over the processing.

Any cluster change triggers a "transition". During a transition each datum is "relinquished", and once it has been relinquished the next responsible node is notified. All of this is managed by Cluster; all that is required from the application is to ensure that each datum implements the DistDatum interface.

Package Files

cluster.go

type Cluster

type Cluster struct {
    *memberlist.Memberlist
    sync.RWMutex
    // contains filtered or unexported fields
}

Cluster is based on Memberlist and adds some functionality on top of it such as the notion of a node being "ready".

This example joins a single-node cluster and shows how to watch for cluster changes and trigger transitions.

Code:

c, err := NewCluster()
if err != nil {
    fmt.Printf("Error creating cluster: %v\n", err)
    return // c is not usable if creation failed
}

if err = c.Join([]string{}); err != nil {
    fmt.Printf("Error joining cluster: %v\n", err)
    return
}

clusterChgCh := c.NotifyClusterChanges()

var wg sync.WaitGroup
wg.Add(1)
go func() {
    defer wg.Done()
    for {
        _, ok := <-clusterChgCh
        if !ok {
            return
        }

        fmt.Printf("A cluster change occurred, running a transition.\n")
        if err := c.Transition(1 * time.Second); err != nil {
            fmt.Printf("Transition error: %v\n", err)
        }
    }
}()

// Leave the cluster (this triggers a cluster change event)
c.Leave(1 * time.Second)

// This will cause the goroutine to exit
close(clusterChgCh)
wg.Wait()

Output:

A cluster change occurred, running a transition.

func NewCluster

func NewCluster() (*Cluster, error)

NewCluster creates a new Cluster with reasonable defaults.

func NewClusterBind

func NewClusterBind(baddr string, bport int, aaddr string, aport int, rpcport int, name string) (*Cluster, error)

NewClusterBind creates a new Cluster, allowing you to specify the address/port to bind to, the address/port to advertise to the other nodes (use zero values for the defaults), and the hostname. (This is useful if your app is running in a Docker container, where it is impossible to figure out the outside IP addresses and the hostname can be the same on more than one node.)

func (*Cluster) Copies

func (c *Cluster) Copies(n ...int) int

Copies sets the number of copies of DistDatums that the Cluster will keep. The default is 1. It can only be set while the cluster is empty.

func (*Cluster) GetBroadcasts

func (c *Cluster) GetBroadcasts(overhead, limit int) [][]byte

func (*Cluster) Join

func (c *Cluster) Join(existing []string) error

Join joins a cluster given at least one node address/port. NB: You can always join yourself if this is a cluster of one node.

func (*Cluster) List

func (c *Cluster) List() map[string]*ddEntry

func (*Cluster) LoadDistData

func (c *Cluster) LoadDistData(f func() ([]DistDatum, error)) error

LoadDistData triggers a load of DistDatums. Its argument is a function which performs the actual load and returns the list, while also providing the data to the application in whatever way the user side needs. This action has to be triggered from the user side. You should call LoadDistData before marking your node as ready.
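As an illustration of the kind of function LoadDistData accepts, the sketch below builds a loader closure with the signature func() ([]DistDatum, error). The DistDatum interface is copied locally so the sketch compiles on its own; the series type, the makeLoader helper, and the in-memory rows standing in for a database are all hypothetical.

```go
package main

import "fmt"

// DistDatum mirrors the interface documented in this package so that
// the sketch is self-contained.
type DistDatum interface {
	Id() int64
	Type() string
	Relinquish() error
	Acquire() error
	GetName() string
}

// series is a hypothetical application datum.
type series struct {
	id   int64
	name string
}

func (s *series) Id() int64         { return s.id }
func (s *series) Type() string      { return "series" }
func (s *series) Relinquish() error { return nil }
func (s *series) Acquire() error    { return nil }
func (s *series) GetName() string   { return s.name }

// makeLoader returns a loader that fills the application's own index
// and also returns the loaded data as a []DistDatum.
func makeLoader(index map[int64]*series) func() ([]DistDatum, error) {
	return func() ([]DistDatum, error) {
		// Stand-in for reading rows from persistent storage.
		rows := []*series{{1, "cpu"}, {2, "mem"}}
		dds := make([]DistDatum, 0, len(rows))
		for _, s := range rows {
			index[s.id] = s      // make the data available to the application
			dds = append(dds, s) // and include it in the returned list
		}
		return dds, nil
	}
}

func main() {
	index := make(map[int64]*series)
	dds, err := makeLoader(index)()
	if err != nil {
		fmt.Println("load failed:", err)
		return
	}
	fmt.Println(len(dds), len(index)) // 2 2
}
```

With a real Cluster, the closure returned by makeLoader would be the argument passed to LoadDistData.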

func (*Cluster) LocalNode

func (c *Cluster) LocalNode() *Node

LocalNode returns a pointer to the local node.

func (*Cluster) LocalState

func (c *Cluster) LocalState(join bool) []byte

func (*Cluster) Members

func (c *Cluster) Members() []*Node

Members lists cluster members (ready or not).

func (*Cluster) MergeRemoteState

func (c *Cluster) MergeRemoteState(buf []byte, join bool)

func (*Cluster) NodeMeta

func (c *Cluster) NodeMeta(limit int) []byte

func (*Cluster) NodesForDistDatum

func (c *Cluster) NodesForDistDatum(dd DistDatum) []*Node

NodesForDistDatum returns the nodes responsible for this DistDatum. The first node is the one responsible for Relinquish(); what the rest do is up to the user to decide. The nodes are cached, so the call doesn't compute anything. The idea is that NodesForDistDatum() should be fast enough to call very often, e.g. for every incoming data point.

func (*Cluster) NotifyClusterChanges

func (c *Cluster) NotifyClusterChanges() chan bool

NotifyClusterChanges returns a bool channel which will be sent true any time a cluster change happens (nodes join or leave, or node metadata changes).

func (*Cluster) NotifyJoin

func (c *Cluster) NotifyJoin(n *memberlist.Node)

func (*Cluster) NotifyLeave

func (c *Cluster) NotifyLeave(n *memberlist.Node)

func (*Cluster) NotifyMsg

func (c *Cluster) NotifyMsg(b []byte)

func (*Cluster) NotifyUpdate

func (c *Cluster) NotifyUpdate(n *memberlist.Node)

func (*Cluster) Ready

func (c *Cluster) Ready(status bool) error

Ready sets the Node status in the metadata and broadcasts a change notification to the cluster.

func (*Cluster) RegisterMsgType

func (c *Cluster) RegisterMsgType() (snd, rcv chan *Msg)

RegisterMsgType makes sending messages across nodes simpler. It returns two channels, one to send and the other to receive a *Msg structure. All nodes of the cluster must call RegisterMsgType in exactly the same order, because the order determines the internal message id and the channel to which a message will be passed. The message is sent to the destination specified in Msg.Dst. Messages are compressed using flate.
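The ordering requirement can be illustrated with a small sketch. The registry type below is a hypothetical stand-in, not this package's implementation; it only assumes that ids are handed out sequentially in call order, which is what the description above implies.

```go
package main

import "fmt"

// registry is a hypothetical stand-in for one node's table of message types.
type registry struct {
	names []string
}

// register assigns the next sequential id, so the id depends only on
// the order in which register is called.
func (r *registry) register(name string) int {
	r.names = append(r.names, name)
	return len(r.names) - 1
}

func main() {
	// Nodes A and B register in the same order, so their ids agree.
	var a, b registry
	aStats := a.register("stats")
	aAlerts := a.register("alerts")
	bStats := b.register("stats")
	bAlerts := b.register("alerts")
	fmt.Println(aStats == bStats, aAlerts == bAlerts) // true true

	// Node C swaps the order: its "stats" id now equals A's "alerts" id,
	// so a stats message from C would reach A's alerts channel.
	var c registry
	cAlerts := c.register("alerts")
	cStats := c.register("stats")
	fmt.Println(cStats == aAlerts, cAlerts == aStats) // true true
}
```

A simple way to keep the order identical is to make all RegisterMsgType calls from a single initialization function that every node runs.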

func (*Cluster) SetMetaData

func (c *Cluster) SetMetaData(b []byte) error

SetMetaData sets the metadata and broadcasts an UpdateNode message to the cluster.

func (*Cluster) Shutdown

func (c *Cluster) Shutdown() error

func (*Cluster) SortedNodes

func (c *Cluster) SortedNodes() ([]*Node, error)

SortedNodes returns nodes ordered by process start time.

func (*Cluster) Transition

func (c *Cluster) Transition(timeout time.Duration) error

Transition() performs the transition on cluster changes. Transitions should be triggered by user-land after receiving a cluster change event from a channel returned by NotifyClusterChanges(). The transition will call Relinquish() on all DistDatums that are transferring to other nodes, and wait for confirmation of Relinquish() from other nodes for DistDatums transferring to this node. Generally a node should buffer all the data it receives during a transition.
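One way an application might buffer incoming data while a transition is in progress is sketched below. The ingest type and its methods are hypothetical and independent of Cluster; in a real application begin and end would bracket the call to Transition().

```go
package main

import (
	"fmt"
	"sync"
)

// ingest is a hypothetical application-side receiver that holds data
// back while a transition is in progress.
type ingest struct {
	mu            sync.Mutex
	transitioning bool
	pending       []int // received during a transition
	processed     []int // handled normally
}

// begin marks the start of a transition.
func (g *ingest) begin() {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.transitioning = true
}

// add processes v immediately, or buffers it during a transition.
func (g *ingest) add(v int) {
	g.mu.Lock()
	defer g.mu.Unlock()
	if g.transitioning {
		g.pending = append(g.pending, v)
		return
	}
	g.processed = append(g.processed, v)
}

// end marks the end of a transition and flushes the buffer in order.
func (g *ingest) end() {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.transitioning = false
	g.processed = append(g.processed, g.pending...)
	g.pending = nil
}

func main() {
	g := &ingest{}
	g.add(1)
	g.begin() // a cluster change arrived; Transition() would run here
	g.add(2)
	g.add(3)
	fmt.Println(len(g.pending)) // 2
	g.end()
	fmt.Println(g.processed) // [1 2 3]
}
```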

type ClusterRPC

type ClusterRPC struct {
    // contains filtered or unexported fields
}

func (*ClusterRPC) Message

func (rpc *ClusterRPC) Message(msg Msg, reply *Msg) error

type DistDatum

type DistDatum interface {
    // Id returns an integer that uniquely identifies this datum for
    // this type. Datum -> node designation is determined by id %
    // numNodes, which means id distribution matters.
    Id() int64

    // Type returns a string that identifies the type. The value
    // doesn't matter, so long as the type:id combination uniquely
    // identifies this DistDatum. (A good practice is to just use the
    // type name as a string).
    Type() string

    // Relinquish is a chance to persist the data before the datum
    // can be assigned to another node. On a cluster change that
    // requires a reassignment, the receiving node will wait for the
    // Relinquish operation to complete (up to a configurable
    // timeout).
    Relinquish() error

    // Acquire is a chance to do something just before we can start
    // processing data for this DistDatum (which normally would just
    // be Relinquished by another node).
    Acquire() error

    // This is only used for logging/debugging. It should return some
    // kind of a meaningful symbolic name for this datum, if any.
    GetName() string
}

DistDatum is an interface for a piece of data distributed across the cluster. More precisely, each DistDatum belongs to a node, and all nodes forward requests for it to that node.
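A minimal sketch of a type satisfying this interface follows. The interface is copied locally so the sketch compiles without importing the package; the counter type and its saved field, which stands in for persistent storage, are hypothetical.

```go
package main

import "fmt"

// DistDatum mirrors the interface documented above so that this sketch
// is self-contained.
type DistDatum interface {
	Id() int64
	Type() string
	Relinquish() error
	Acquire() error
	GetName() string
}

// counter is a hypothetical datum: a named running total.
type counter struct {
	id     int64
	name   string
	total  int64
	saved  int64 // stands in for persistent storage
	active bool
}

func (c *counter) Id() int64       { return c.id }
func (c *counter) Type() string    { return "counter" } // type name as a string
func (c *counter) GetName() string { return c.name }

// Relinquish persists the state before another node takes over.
func (c *counter) Relinquish() error {
	c.saved = c.total
	c.active = false
	return nil
}

// Acquire restores the state just before processing starts on this node.
func (c *counter) Acquire() error {
	c.total = c.saved
	c.active = true
	return nil
}

func main() {
	var dd DistDatum = &counter{id: 42, name: "requests", total: 7}
	if err := dd.Relinquish(); err != nil {
		fmt.Println("relinquish:", err)
		return
	}
	fmt.Printf("%s:%d (%s) relinquished\n", dd.Type(), dd.Id(), dd.GetName())
	// Prints: counter:42 (requests) relinquished
}
```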

type Msg

type Msg struct {
    Id       int
    Dst, Src *Node
    Body     []byte
}

Msg is the structure that should be passed to channels returned by c.RegisterMsgType().

func NewMsg

func NewMsg(dest *Node, payload interface{}) (*Msg, error)

NewMsg creates a Msg from a payload, which must be gob-encodable.
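To show what "gob-encodable" means in practice, here is a sketch of a round trip through encoding/gob. The point type and the encode and decode helpers are hypothetical; they illustrate the kind of payload NewMsg can accept, not how NewMsg or Decode are implemented.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// point is a hypothetical payload. gob only transmits exported fields.
type point struct {
	Series string
	Value  float64
}

// encode gob-encodes v into a byte slice.
func encode(v interface{}) ([]byte, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(v); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// decode gob-decodes body into dst, which must be a pointer.
func decode(body []byte, dst interface{}) error {
	return gob.NewDecoder(bytes.NewReader(body)).Decode(dst)
}

func main() {
	body, err := encode(point{Series: "cpu", Value: 0.5})
	if err != nil {
		fmt.Println("encode:", err)
		return
	}
	var p point
	if err := decode(body, &p); err != nil {
		fmt.Println("decode:", err)
		return
	}
	fmt.Printf("%+v\n", p) // {Series:cpu Value:0.5}
}
```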

func (*Msg) Decode

func (m *Msg) Decode(dst interface{}) error

implement gob.GobDecoder interface.

type Node

type Node struct {
    *memberlist.Node
    // contains filtered or unexported fields
}

func (*Node) Meta

func (n *Node) Meta() ([]byte, error)

Meta() returns the user part of the node metadata. (Cluster uses the leading bytes to store internal state such as the ready status of a node; the user part follows.)
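The idea of an internal prefix followed by a user part can be sketched as follows. The one-byte prefix, the pack and userPart helpers, and the encoding of the ready flag are all assumptions made for illustration; the actual layout used by Cluster is not documented here.

```go
package main

import (
	"errors"
	"fmt"
)

// prefixLen is an assumed size for the internal prefix. The real
// layout used by Cluster may differ.
const prefixLen = 1

// pack builds metadata as an internal prefix (here just a ready flag)
// followed by the user bytes. (Hypothetical helper.)
func pack(ready bool, user []byte) []byte {
	b := make([]byte, prefixLen, prefixLen+len(user))
	if ready {
		b[0] = 1
	}
	return append(b, user...)
}

// userPart strips the internal prefix and returns the user bytes.
func userPart(meta []byte) ([]byte, error) {
	if len(meta) < prefixLen {
		return nil, errors.New("metadata shorter than internal prefix")
	}
	return meta[prefixLen:], nil
}

func main() {
	meta := pack(true, []byte("rack=7"))
	user, err := userPart(meta)
	if err != nil {
		fmt.Println("bad metadata:", err)
		return
	}
	fmt.Printf("ready=%v user=%q\n", meta[0] == 1, user) // ready=true user="rack=7"
}
```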

func (*Node) Name

func (n *Node) Name() string

Name returns the node name or "<nil>" if the pointer is nil.

func (*Node) Ready

func (n *Node) Ready() bool

Ready returns the status of a node.

func (*Node) SanitizedAddr

func (n *Node) SanitizedAddr() string

Package cluster imports 14 packages and is imported by 2 packages. Updated 2017-02-23.