ring

v0.0.0-...-8b80c72
Published: Apr 12, 2016 License: BSD-3-Clause Imports: 25 Imported by: 0

README

Ring

Development Repository

Experimental: No stable version of this package yet exists; it is still in early development.

If you're not entirely sure what consistent hashing is, reading Basic Hash Ring might help.

Package ring contains tools for building and using a consistent hashing ring with replicas, automatic partitioning (ring ranges), and keeping replicas of the same partitions in as distinct tiered nodes as possible (tiers might be devices, servers, cabinets, rooms, data centers, geographical regions, etc.).

It also contains tools for using a ring as a messaging hub, easing communication between nodes in the ring.

Here's a quick example of building a ring and discovering what items are assigned to what nodes:

package main

import (
    "fmt"
    "hash/fnv"

    "github.com/gholt/ring"
)

func main() {
    // Note that we're ignoring errors for the purpose of a shorter example.
    // The 64 indicates how many bits can be used in a uint64 for node IDs;
    // 64 is fine unless you have a specific use case.
    builder := ring.NewBuilder(64)
    // (active, capacity, no tiers, no addresses, meta, no conf)
    builder.AddNode(true, 1, nil, nil, "NodeA", nil)
    builder.AddNode(true, 1, nil, nil, "NodeB", nil)
    builder.AddNode(true, 1, nil, nil, "NodeC", nil)
    // This rebalances if necessary and provides a usable Ring instance.
    ring := builder.Ring()
    // This value indicates how many bits are in use for determining ring
    // partitions.
    partitionBitCount := ring.PartitionBitCount()
    for _, item := range []string{"First", "Second", "Third"} {
        // We're using fnv hashing here, but you can use whatever you like.
        // We don't actually recommend fnv, but it's useful for this example.
        hasher := fnv.New64a()
        hasher.Write([]byte(item))
        partition := uint32(hasher.Sum64() >> (64 - partitionBitCount))
        // We can just grab the first node since this example just uses one
        // replica. See Builder.SetReplicaCount for more information.
        node := ring.ResponsibleNodes(partition)[0]
        fmt.Printf("%s is handled by %v\n", item, node.Meta())
    }
}

The output would be:

First is handled by NodeC
Second is handled by NodeB
Third is handled by NodeB

API Documentation
Basic Hash Ring
Partition Ring vs. Hash Ring

Other interesting ideas in this space:
Jump consistent hashing - dgryski implementation; see also dgryski shared key-value store
Multi-probe consistent hashing - dgryski implementation
GreenCHT replication scheme

This is the latest development area for the package.
Eventually a stable version of the package will be established but, for now, all things about this package are subject to change.

Copyright See AUTHORS. All rights reserved.
Use of this source code is governed by a BSD-style
license that can be found in the LICENSE file.

Documentation

Constants

const (
	// BUILDERVERSION is the builder file format version written to and checked
	// for in the builder file header. If the on disk format of the builder changes
	// this version should be incremented.
	BUILDERVERSION = "RINGBUILDERv0001"
)

const RINGVERSION = "RINGv00000000001"

RINGVERSION is the ring file format version written to and checked for in the ring file header. If the on disk format of the ring changes this version should be incremented.
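
Both header constants are 16 bytes long, so a loader can tell the two formats apart by reading a fixed-size header. The following is a minimal sketch of that idea, not the package's actual loading code: fileKind and kindOfBytes are hypothetical helpers, and the sketch assumes the header sits at the very start of an unwrapped stream (the real files may be compressed or wrapped in some other way).

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// Header values copied from the constants documented above.
const (
	builderVersion = "RINGBUILDERv0001"
	ringVersion    = "RINGv00000000001"
)

// fileKind reads the first 16 bytes from r and reports which format they
// name. Hypothetical helper: it assumes the header is the first thing in
// the stream, which may not hold for the real on-disk files.
func fileKind(r io.Reader) (string, error) {
	header := make([]byte, len(ringVersion))
	if _, err := io.ReadFull(r, header); err != nil {
		return "", err
	}
	switch string(header) {
	case ringVersion:
		return "ring", nil
	case builderVersion:
		return "builder", nil
	}
	return "", fmt.Errorf("unrecognized header %q", header)
}

// kindOfBytes is a convenience wrapper for in-memory data.
func kindOfBytes(data []byte) (string, error) {
	return fileKind(bytes.NewReader(data))
}

func main() {
	kind, err := kindOfBytes([]byte(ringVersion + "rest of file"))
	fmt.Println(kind, err) // prints "ring <nil>"
}
```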

Variables

var TCP_MSG_RING_VERSION = []byte("TCPMSGRINGv00001")

Functions

func CLI

func CLI(args []string, output io.Writer, ansiColor bool) error

CLI is the "ring" command line interface; it's included in the ring package itself so you can easily stub it to whatever executable name you might want, with code like:

package main

import (
    "fmt"
    "os"

    "github.com/gholt/ring"
    "github.com/gholt/brimtext"
)

func main() {
    if err := ring.CLI(os.Args, os.Stdout, false); err != nil {
        fmt.Fprintln(os.Stderr, brimtext.Sentence(err.Error()))
        os.Exit(1)
    }
}

The individual subcommands are also included in the ring package, prefixed with CLI, so you can build your command line interface with just the commands you want, or leverage them to build a web interface, etc. The ansiColor value indicates if you'd like ANSI color escape sequences embedded in the output.

func CLIAddOrSet

func CLIAddOrSet(b *Builder, args []string, n BuilderNode, output io.Writer) error

CLIAddOrSet adds a new node or updates an existing node; see the output of CLIHelp for detailed information.

Pass the builder to operate on; it is normally the one returned by RingOrBuilder.

func CLIConfig

func CLIConfig(r Ring, b *Builder, args []string, output io.Writer) (changed bool, err error)

CLIConfig displays or sets the top-level config in the ring or builder; see the output of CLIHelp for detailed information.

Provide either the ring or the builder, but not both; set the other to nil. Normally the results from RingOrBuilder.

func CLIConfigFile

func CLIConfigFile(r Ring, b *Builder, args []string, output io.Writer) (changed bool, err error)

CLIConfigFile displays or sets the top-level config in the ring or builder; see the output of CLIHelp for detailed information.

Provide either the ring or the builder, but not both; set the other to nil. Normally the results from RingOrBuilder.

func CLICreate

func CLICreate(filename string, args []string, output io.Writer) error

CLICreate creates a new builder; see the output of CLIHelp for detailed information.

func CLIHelp

func CLIHelp(args []string, output io.Writer, ansiColor bool, markdown bool) error

CLIHelp outputs the help text for the default CLI. The ansiColor value indicates if you'd like ANSI color escape sequences embedded in the output. The markdown value indicates if you'd like the raw Markdown help text instead of the processed text.

func CLIInfo

func CLIInfo(r Ring, b *Builder, output io.Writer) error

CLIInfo outputs information about the ring or builder such as node count, partition count, etc.

Provide either the ring or the builder, but not both; set the other to nil. Normally the results from RingOrBuilder.

func CLINode

func CLINode(r Ring, b *Builder, args []string, full bool, output io.Writer) (changed bool, err error)

CLINode outputs a list of nodes in the ring or builder, with optional filtering and also allows setting attributes on those nodes; see the output of CLIHelp for detailed information.

Provide either the ring or the builder, but not both; set the other to nil. Normally the results from RingOrBuilder.

func CLINodeReport

func CLINodeReport(n Node) string

func CLIPartition

func CLIPartition(r Ring, b *Builder, args []string, output io.Writer) error

CLIPartition outputs information about a partition's node assignments; see the output of CLIHelp for detailed information.

Provide either the ring or the builder, but not both; set the other to nil. Normally the results from RingOrBuilder.

func CLIPretendElapsed

func CLIPretendElapsed(b *Builder, args []string, output io.Writer) error

CLIPretendElapsed updates a builder, pretending some time has elapsed for testing purposes; see the output of CLIHelp for detailed information.

Pass the builder to operate on; it is normally the one returned by RingOrBuilder.

func CLIRemove

func CLIRemove(b *Builder, args []string, output io.Writer) error

CLIRemove removes a node from the builder; see the output of CLIHelp for detailed information.

Pass the builder to operate on; it is normally the one returned by RingOrBuilder.

func CLIRing

func CLIRing(b *Builder, filename string, output io.Writer) error

CLIRing writes a new ring file based on the builder; see the output of CLIHelp for detailed information.

Pass the builder to operate on; it is normally the one returned by RingOrBuilder.

func CLITier

func CLITier(r Ring, b *Builder, args []string, output io.Writer) error

CLITier outputs a list of tiers in the ring or builder; see the output of CLIHelp for detailed information.

Provide either the ring or the builder, but not both; set the other to nil. Normally the results from RingOrBuilder.

func PersistRingOrBuilder

func PersistRingOrBuilder(r Ring, b *Builder, filename string) error

PersistRingOrBuilder persists a given ring/builder to the provided filename.

func RingOrBuilder

func RingOrBuilder(fileName string) (Ring, *Builder, error)

RingOrBuilder attempts to determine whether a file is a Ring or Builder file and then loads it accordingly.

Types

type Builder

type Builder struct {
	// contains filtered or unexported fields
}

Builder is used to construct Rings over time. Rings are the immutable state of a Builder's assignments at a given point in time.

func LoadBuilder

func LoadBuilder(r io.Reader) (*Builder, error)

LoadBuilder creates a new Builder instance based on the persisted data from the Reader (presumably previously saved with the Persist method).

func NewBuilder

func NewBuilder(idBits int) *Builder

NewBuilder creates an empty Builder with all default settings.

idBits indicates how many bits (1-64) may be used for node IDs; it must be set at Builder creation and cannot change once created.

func (*Builder) AddNode

func (b *Builder) AddNode(active bool, capacity uint32, tiers []string, addresses []string, meta string, config []byte) (BuilderNode, error)

AddNode will add a new node to the builder for data assignment. Actual data assignment won't occur until the Ring method is called, so you can add multiple nodes or alter node values after creation if desired.

func (*Builder) Config

func (b *Builder) Config() []byte

Config is the raw encoded global configuration.

func (*Builder) IDBits

func (b *Builder) IDBits() int

IDBits is the number of bits in use for node IDs.

func (*Builder) MaxPartitionBitCount

func (b *Builder) MaxPartitionBitCount() uint16

MaxPartitionBitCount caps how large the ring can grow. The default is 23, which means 2**23 or 8,388,608 partitions; for a 3 replica ring that is about 100 MB of assignment data (each partition replica assignment is an int32, so 8,388,608 × 3 × 4 bytes).
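
The arithmetic behind that estimate can be checked with a few lines of Go. This is only a sketch: assignmentBytes is a hypothetical helper that counts the assignment table alone (one 4-byte entry per partition per replica) and ignores any other memory the ring uses.

```go
package main

import "fmt"

// assignmentBytes estimates the size of the replica-to-partition assignment
// table: one 4-byte (int32) entry per partition per replica.
func assignmentBytes(partitionBitCount uint16, replicaCount int) uint64 {
	partitions := uint64(1) << partitionBitCount
	return partitions * uint64(replicaCount) * 4
}

func main() {
	b := assignmentBytes(23, 3)
	// prints "100663296 bytes (96 MiB)"
	fmt.Printf("%d bytes (%.0f MiB)\n", b, float64(b)/(1<<20))
}
```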

func (*Builder) MoveWait

func (b *Builder) MoveWait() uint16

MoveWait is the number of minutes that should elapse before reassigning a replica of a partition again.

func (*Builder) Node

func (b *Builder) Node(nodeID uint64) BuilderNode

Node returns the node instance identified, if there is one.

func (*Builder) Nodes

func (b *Builder) Nodes() NodeSlice

Nodes returns a NodeSlice of the nodes the Builder references, but each Node in the slice can be type-asserted to a BuilderNode if needed.

func (*Builder) Persist

func (b *Builder) Persist(w io.Writer) error

Persist saves the Builder state to the given Writer for later reloading via the LoadBuilder function.

func (*Builder) PointsAllowed

func (b *Builder) PointsAllowed() byte

PointsAllowed is the number of percentage points, over or under, within which the ring will try to keep data assignments. The default is 1, meaning up to one percent more or less data.

func (*Builder) PretendElapsed

func (b *Builder) PretendElapsed(minutes uint16)

PretendElapsed shifts the last movement records by the number of minutes given. This can be useful in testing, as the ring algorithms will not reassign replicas for a partition more often than once per MoveWait in order to let reassignments take effect before moving the same data yet again.

func (*Builder) RemoveNode

func (b *Builder) RemoveNode(nodeID uint64)

RemoveNode will remove the node from the list of nodes for this builder/ring. Note that this can be relatively expensive: every node added after the removed node will have its internal index shifted down by one, all the replica-to-partition-to-node indexing will have to be updated, and any assignments to the removed node will have to be cleared. Normally it is better to just leave a "dead" node in place and simply set it as inactive.

func (*Builder) ReplicaCount

func (b *Builder) ReplicaCount() int

func (*Builder) Ring

func (b *Builder) Ring() Ring

Ring returns a Ring instance of the data defined by the builder. This will cause any pending rebalancing actions to be performed. The Ring returned will be immutable; to obtain updated ring data, Ring() must be called again.

func (*Builder) SetConfig

func (b *Builder) SetConfig(config []byte)

func (*Builder) SetMaxPartitionBitCount

func (b *Builder) SetMaxPartitionBitCount(count uint16)

func (*Builder) SetMoveWait

func (b *Builder) SetMoveWait(minutes uint16)

func (*Builder) SetPointsAllowed

func (b *Builder) SetPointsAllowed(points byte)

func (*Builder) SetReplicaCount

func (b *Builder) SetReplicaCount(count int)

func (*Builder) Tiers

func (b *Builder) Tiers() [][]string

Tiers returns the tier values in use at each level. Note that an empty string is always an available value at any level, although it is not returned from this method.

type BuilderNode

type BuilderNode interface {
	Node
	SetActive(value bool)
	SetCapacity(value uint32)
	SetTier(level int, value string)
	ReplaceTiers(tiers []string)
	SetAddress(index int, value string)
	ReplaceAddresses(addrs []string)
	SetMeta(value string)
	SetConfig(config []byte)
}

BuilderNode extends Node to allow for updating attributes. A Ring needs immutable nodes as the assignments are static at that point, but the Builder doesn't have that restriction.

type LogFunc

type LogFunc func(format string, v ...interface{})

type Msg

type Msg interface {
	// MsgType is the unique designator for the type of message content (such
	// as a pull replication request, a read request, etc.). Message types just
	// need to be unique values; usually picking 64 bits of a UUID is fine.
	MsgType() uint64
	// MsgLength returns the number of bytes for the content of the message
	// (the amount that will be written with a call to WriteContent).
	MsgLength() uint64
	// WriteContent will send the contents of the message to the given writer.
	//
	// Note that WriteContent may be called multiple times and may be called
	// concurrently.
	//
	// Also note that the content should be written as quickly as possible as
	// any delays may cause the message transmission to be aborted and dropped.
	// In other words, any significant processing to build the message content
	// should be done before the Msg is given to the MsgRing for delivery.
	WriteContent(io.Writer) (uint64, error)
	// Free will be called when the MsgRing no longer has any references to the
	// message and allows the message to free any resources it may have, or be
	// reused, etc.
	Free()
}

Msg is a single message to be sent to another node or nodes.
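
As an illustration of the contract above, here is a minimal sketch of a message type whose content is fully built before it is handed over, so WriteContent only copies bytes. The msg interface is a local copy of the documented one so the example compiles without the ring package; pingMsg, pingMsgType, and encode are hypothetical names chosen for this example.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// msg mirrors the Msg interface documented above.
type msg interface {
	MsgType() uint64
	MsgLength() uint64
	WriteContent(io.Writer) (uint64, error)
	Free()
}

// pingMsgType is an arbitrary value picked for this example.
const pingMsgType uint64 = 0x5a3c9e1f7b2d4c68

// pingMsg holds content that was built ahead of time.
type pingMsg struct {
	body []byte
}

func (m *pingMsg) MsgType() uint64   { return pingMsgType }
func (m *pingMsg) MsgLength() uint64 { return uint64(len(m.body)) }

// WriteContent only reads m.body, so repeated or concurrent calls are safe.
func (m *pingMsg) WriteContent(w io.Writer) (uint64, error) {
	n, err := w.Write(m.body)
	return uint64(n), err
}

// Free has nothing to release here; a pooled implementation might return
// its buffer to a pool at this point.
func (m *pingMsg) Free() {}

// encode is a demonstration helper that writes a message into a buffer.
func encode(m msg) (string, error) {
	var buf bytes.Buffer
	if _, err := m.WriteContent(&buf); err != nil {
		return "", err
	}
	return buf.String(), nil
}

func main() {
	var m msg = &pingMsg{body: []byte("ping")}
	s, err := encode(m)
	fmt.Println(m.MsgLength(), s, err) // prints "4 ping <nil>"
	m.Free()
}
```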

type MsgRing

type MsgRing interface {
	// Ring returns the ring information used to determine messaging endpoints;
	// note that this method may return nil if no ring information is yet
	// available.
	Ring() Ring
	// MaxMsgLength indicates the maximum number of bytes the content of a
	// message may contain to be handled by this MsgRing.
	MaxMsgLength() uint64
	// SetMsgHandler associates a message type with a handler; any incoming
	// messages with the type will be delivered to the handler. Message types
	// just need to be unique uint64 values; usually picking 64 bits of a UUID
	// is fine.
	SetMsgHandler(msgType uint64, handler MsgUnmarshaller)
	// MsgToNode queues the message for delivery to the indicated node; the
	// timeout should be considered for queueing, not for actual delivery.
	//
	// When the msg has actually been sent or has been discarded due to
	// delivery errors or delays, msg.Free() will be called.
	MsgToNode(msg Msg, nodeID uint64, timeout time.Duration)
	// MsgToOtherReplicas queues the message for delivery to all other
	// replicas of a partition; the timeout should be considered for queueing,
	// not for actual delivery.
	//
	// If the ring is not bound to a specific node (LocalNode() returns nil)
	// then the delivery attempts will be to all replicas.
	//
	// When the msg has actually been sent or has been discarded due to
	// delivery errors or delays, msg.Free() will be called.
	MsgToOtherReplicas(msg Msg, partition uint32, timeout time.Duration)
}

MsgRing will send and receive Msg instances to and from ring nodes. See TCPMsgRing for a concrete implementation.

The design is such that messages are not guaranteed delivery, or even transmission. Acknowledgement and retry logic is left to the producers and consumers of the messages themselves.

For example, if a connection to a node is too slow to keep up with the messages wanting to be delivered to it, many of the messages will simply be dropped.

For another example, if a message is ready to be delivered to a node that currently has no established connection, the connection process will be initiated (for future messages) but that current message will be dropped.

This design is because much ring-related messaging is based on passes over the entire ring with many messages being sent constantly and conditions can change rapidly, making messages less useful as they age. It is better for the distributed system overall if a single message is just dropped if it can't be sent immediately as a similar message will be generated and attempted later on in the next pass.

For messages that require guaranteed delivery, the sender's node ID can be embedded in the message and the receiver would send an acknowledgement message back upon receipt. The exact retry logic could vary greatly and depends on the sender's implementation (e.g. background passes recording acks or dedicated goroutines for each message).

In previous systems we've written, distributed algorithms could easily get hung up trying to communicate to one faulty node (for example) making the rest of the system suffer as well.
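
The drop-rather-than-block behavior described above can be sketched with a bounded channel and a queueing timeout. This is not the TCPMsgRing implementation; enqueue and sendAll are hypothetical helpers, and strings stand in for Msg values.

```go
package main

import (
	"fmt"
	"time"
)

// enqueue tries to place msg on a bounded queue and gives up once timeout
// elapses. The timeout covers queueing only, never delivery.
func enqueue(queue chan<- string, msg string, timeout time.Duration) bool {
	// Fast path: take a free slot immediately if one exists.
	select {
	case queue <- msg:
		return true
	default:
	}
	// Slow path: wait for a slot, but no longer than timeout.
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case queue <- msg:
		return true
	case <-timer.C:
		return false // dropped; a later pass is expected to send a similar message
	}
}

// sendAll queues count messages while nothing drains the queue and reports
// how many were queued and how many were dropped.
func sendAll(capacity, count int) (queued, dropped int) {
	queue := make(chan string, capacity)
	for i := 0; i < count; i++ {
		if enqueue(queue, fmt.Sprintf("msg-%d", i), 10*time.Millisecond) {
			queued++
		} else {
			dropped++
		}
	}
	return queued, dropped
}

func main() {
	queued, dropped := sendAll(2, 5)
	fmt.Println(queued, dropped) // prints "2 3"
}
```

A sender that needs stronger guarantees would layer acknowledgements and retries on top of this, as the text above suggests.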

type MsgUnmarshaller

type MsgUnmarshaller func(reader io.Reader, desiredBytesToRead uint64) (actualBytesRead uint64, err error)

MsgUnmarshaller will attempt to read desiredBytesToRead from the reader and will return the number of bytes actually read as well as any error that may have occurred. If error is nil then actualBytesRead must equal desiredBytesToRead.

Note that the message content should be read as quickly as possible as any delays may cause the message transmission to be aborted. In other words, any significant processing of the message should be done after the contents are read and this reader function returns.
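
A handler that follows this advice might read the bytes in one step and hand them to a queue for later processing. The sketch below is one way to do that under stated assumptions: msgUnmarshaller is a local copy of the documented type, while newHandler and readString are hypothetical helpers. Real code would probably also cap the allocation, for example against MaxMsgLength.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// msgUnmarshaller mirrors the MsgUnmarshaller type documented above.
type msgUnmarshaller func(reader io.Reader, desiredBytesToRead uint64) (actualBytesRead uint64, err error)

// newHandler returns a handler that reads the content quickly and passes it
// to out, leaving any heavy processing to whoever receives from out.
func newHandler(out chan<- []byte) msgUnmarshaller {
	return func(reader io.Reader, desiredBytesToRead uint64) (uint64, error) {
		buf := make([]byte, desiredBytesToRead)
		n, err := io.ReadFull(reader, buf)
		if err != nil {
			// Short read: report how many bytes were actually consumed.
			return uint64(n), err
		}
		select {
		case out <- buf:
		default: // queue full: drop rather than stall the reader
		}
		return uint64(n), nil
	}
}

// readString is a demonstration helper that runs the handler over a string.
func readString(content string, desired uint64) (uint64, string, error) {
	out := make(chan []byte, 1)
	n, err := newHandler(out)(strings.NewReader(content), desired)
	if err != nil {
		return n, "", err
	}
	return n, string(<-out), nil
}

func main() {
	n, s, err := readString("hello world", 5)
	fmt.Println(n, s, err) // prints "5 hello <nil>"
}
```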

type Node

type Node interface {
	// ID uniquely identifies this node; it will be non-zero as zero is used to
	// indicate "no node".
	ID() uint64
	// Active indicates whether the node should be in use or not. Nodes may be
	// deactivated for a while (during a maintenance, for example) and then
	// reactivated later. While deactivated, the builder will reassign all data
	// previously assigned to the node.
	Active() bool
	// Capacity indicates the amount of data that should be assigned to a node
	// relative to other nodes. It can be in any unit of designation as long as
	// all nodes use the same designation. Most commonly this is the number of
	// gigabytes the node can store, but could be based on CPU capacity or
	// another resource if that makes more sense to balance.
	Capacity() uint32
	// Tiers indicate the layout of the node with respect to other nodes. For
	// example, the lowest tier, tier 0, might be the server ip (where each
	// node represents a drive on that server). The next tier, 1, might then be
	// the power zone the server is in. The number of tiers is flexible, so
	// later an additional tier for geographic region could be added.
	Tiers() []string
	// Tier returns just the single tier value for the level.
	Tier(level int) string
	// Addresses give location information for the node; probably something
	// like ip:port. This is a list for those use cases where different
	// processes use different networks (such as replication using a
	// replication-only network).
	Addresses() []string
	// Address returns just the single address for the index.
	Address(index int) string
	// Meta is additional information for the node; not defined or used by the
	// builder or ring directly.
	Meta() string
	// Config contains the raw configuration bytes for this node.
	Config() []byte
}

Node represents an endpoint for ring data or other ring-based services.

type NodeSlice

type NodeSlice []Node

func (NodeSlice) Filter

func (ns NodeSlice) Filter(filters []string) (NodeSlice, error)

Filter will return a new NodeSlice with just the nodes that match the filters given; multiple filters are ANDed. The basic filter syntax is that "attribute=value" will filter to just nodes whose attribute exactly matches the value and "attribute~=value" will similarly filter but treat the value as a regular expression (per the http://golang.org/pkg/regexp/ implementation). The available attributes to filter on are:

id          A node's id (uint64 represented as %d).
active      Whether a node is active or not (use "true" or "false").
capacity    A node's capacity.
tier        Any tier of a node.
tierX       A node's specific tier level specified by X.
address     Any address of a node.
addressX    A node's specific address index specified by X.
meta        A node's meta attribute.

For example:

ring.Nodes().Filter([]string{"active=true", `address~=10\.1\.2\..*`})
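
To make the difference between the two operators concrete, here is a sketch of a matcher for the same syntax. It is not the package's implementation: parseFilter and matchAll are hypothetical, node attributes are modeled as a plain map, and the regular expression is applied unanchored via regexp's MatchString, which is an assumption about the matching mode.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// parseFilter splits "attribute=value" or "attribute~=value" into its parts.
func parseFilter(filter string) (attr, value string, isRegexp bool, err error) {
	i := strings.Index(filter, "=")
	if i < 1 {
		return "", "", false, fmt.Errorf("invalid filter %q", filter)
	}
	attr, value = filter[:i], filter[i+1:]
	if strings.HasSuffix(attr, "~") {
		attr, isRegexp = attr[:len(attr)-1], true
	}
	if attr == "" {
		return "", "", false, fmt.Errorf("invalid filter %q", filter)
	}
	return attr, value, isRegexp, nil
}

// matchAll reports whether attrs satisfies every filter (filters are ANDed).
func matchAll(attrs map[string]string, filters []string) (bool, error) {
	for _, f := range filters {
		attr, value, isRegexp, err := parseFilter(f)
		if err != nil {
			return false, err
		}
		actual := attrs[attr]
		if isRegexp {
			re, err := regexp.Compile(value)
			if err != nil {
				return false, err
			}
			if !re.MatchString(actual) {
				return false, nil
			}
		} else if actual != value {
			return false, nil
		}
	}
	return true, nil
}

func main() {
	node := map[string]string{"active": "true", "address": "10.1.2.7:8080"}
	ok, err := matchAll(node, []string{"active=true", `address~=10\.1\.2\..*`})
	fmt.Println(ok, err) // prints "true <nil>"
}
```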

type Ring

type Ring interface {
	// Version is the time.Now().UnixNano() of when the Ring data was
	// established.
	//
	// Version can indicate changes in ring data; for example, if a server is
	// currently working with one version of ring data and receives requests
	// that are based on a lesser version of ring data, it can ignore those
	// requests or send an "obsoleted" response or something along those lines.
	// Similarly, if the server receives requests for a greater version of ring
	// data, it can ignore those requests or try to obtain a newer ring
	// version.
	Version() int64
	// Config returns the raw encoded global configuration. This configuration
	// data isn't used by the ring itself, but can be useful in storing
	// configuration data for users of the ring.
	Config() []byte
	// Node returns the node instance identified, if there is one.
	Node(nodeID uint64) Node
	// Nodes returns a NodeSlice of the nodes the Ring references.
	Nodes() NodeSlice
	// NodeCount returns the number of nodes the Ring references.
	NodeCount() int
	// Tiers returns the tier values in use at each level. Note that an empty
	// string is always an available value at any level, although it is not
	// returned from this method.
	Tiers() [][]string
	// PartitionBitCount is the number of bits that can be used to determine a
	// partition number for the current data in the ring. For example, to
	// convert a uint64 hash value into a partition number you could use
	// hashValue >> (64 - ring.PartitionBitCount()). The PartitionBitCount also
	// indicates how many partitions the Ring has; for example, a value of 16
	// would indicate 2**16 or 65,536 partitions.
	PartitionBitCount() uint16
	// ReplicaCount specifies how many replicas the Ring has.
	ReplicaCount() int
	// LocalNode returns the node the ring is locally bound to, if any. This
	// local node binding is used by things such as MsgRing to know what items
	// are bound for the local instance or need to be sent to remote ones, etc.
	LocalNode() Node
	// SetLocalNode sets the node the ring is locally bound to, if any. This
	// local node binding is used by things such as MsgRing to know what items
	// are bound for the local instance or need to be sent to remote ones, etc.
	SetLocalNode(nodeID uint64)
	// Responsible will return true if LocalNode is set and one of the
	// partition's replicas is assigned to that local node.
	//
	// Note that the partition value is not bounds checked; an invalid
	// partition will cause a panic. See the documentation for the Ring
	// interface itself for further discussion.
	Responsible(partition uint32) bool
	// ResponsibleReplica will return the replica index >= 0 if LocalNode is
	// set and one of the partition's replicas is assigned to that local node;
	// it will return -1 if LocalNode is not responsible for the partition.
	//
	// Note that the partition value is not bounds checked; an invalid
	// partition will cause a panic. See the documentation for the Ring
	// interface itself for further discussion.
	ResponsibleReplica(partition uint32) int
	// ResponsibleNodes will return the list of nodes that are responsible for
	// the replicas of the partition.
	//
	// Note that the partition value is not bounds checked; an invalid
	// partition will cause a panic. See the documentation for the Ring
	// interface itself for further discussion.
	ResponsibleNodes(partition uint32) NodeSlice
	// Stats gives information about the ring and its health; the MaxUnder and
	// MaxOver values specifically indicate how balanced the ring is.
	Stats() *Stats
	// Persist saves the Ring state to the given Writer for later reloading via
	// the LoadRing method.
	Persist(w io.Writer) error
}

Ring is the immutable snapshot of data assignments to nodes.

The immutable characteristic is important, as it means code that uses a Ring can make calculations around its attributes and not worry about those attributes changing from call to call. For example, the PartitionBitCount can be used to generate a list of partitions that are then used with ResponsibleNodes without worrying the PartitionBitCount changed during the calculations. Changes to the Ring are done by the Builder, which will generate a new immutable Ring instance to be distributed to the users of the older Ring instance. There is one exception, however; the LocalNode attribute is mutable as its function is to represent the local user of that Ring instance.

Note that with several methods the partition value is not bounds checked; an invalid partition will cause a panic. This behavior is for speed reasons, as bounds checking every call would be wasteful in most use cases that already guarantee proper bounding. You could easily create a BoundedRing that wraps a Ring instance to provide such bounding if you really need it; but considering most uses of partitions involve bit shifting based on the PartitionBitCount, such bounding doesn't seem worth it.
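
For callers that do want bounds checking, the wrapper mentioned above could look roughly like this. It is a sketch under stated assumptions: partitionRing is a hypothetical two-method subset of the Ring interface with strings standing in for Node values, and fakeRing, boundedRing, newDemoRing, and partitionFor exist only for this example.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionRing is the subset of Ring this sketch needs.
type partitionRing interface {
	PartitionBitCount() uint16
	ResponsibleNodes(partition uint32) []string
}

// fakeRing is a stand-in with a fixed assignment table.
type fakeRing struct {
	bits  uint16
	nodes [][]string // indexed by partition
}

func (r *fakeRing) PartitionBitCount() uint16 { return r.bits }

// ResponsibleNodes panics when the partition is out of range, like the
// unchecked methods described above.
func (r *fakeRing) ResponsibleNodes(partition uint32) []string { return r.nodes[partition] }

// boundedRing wraps a ring and returns an error instead of panicking.
type boundedRing struct{ partitionRing }

func (b boundedRing) ResponsibleNodes(partition uint32) ([]string, error) {
	if count := uint64(1) << b.PartitionBitCount(); uint64(partition) >= count {
		return nil, fmt.Errorf("partition %d out of range [0, %d)", partition, count)
	}
	return b.partitionRing.ResponsibleNodes(partition), nil
}

// partitionFor keeps the top bits of a 64-bit hash, so the result is always
// in range for the given bit count (assumed to be between 1 and 32).
func partitionFor(key string, bits uint16) uint32 {
	h := fnv.New64a()
	h.Write([]byte(key))
	return uint32(h.Sum64() >> (64 - bits))
}

// newDemoRing builds a two-partition ring for the example.
func newDemoRing() boundedRing {
	return boundedRing{&fakeRing{bits: 1, nodes: [][]string{{"NodeA"}, {"NodeB"}}}}
}

func main() {
	ring := newDemoRing()
	p := partitionFor("First", ring.PartitionBitCount())
	nodes, err := ring.ResponsibleNodes(p)
	fmt.Println(p, nodes, err)
	_, err = ring.ResponsibleNodes(2)
	fmt.Println(err) // prints "partition 2 out of range [0, 2)"
}
```

Because partitionFor derives the partition by bit shifting, it cannot produce an out-of-range value, which is why the unchecked methods are usually sufficient.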

func LoadRing

func LoadRing(rd io.Reader) (Ring, error)

LoadRing creates a new Ring instance based on the persisted data from the Reader (presumably previously saved with the Ring.Persist method).

type Stats

type Stats struct {
	ReplicaCount      int
	ActiveNodeCount   int
	InactiveNodeCount int
	PartitionBitCount uint16
	PartitionCount    int
	ActiveCapacity    uint64
	InactiveCapacity  uint64
	// MaxUnderNodePercentage is the percentage a node is underweight, or has
	// less data assigned to it than its capacity would indicate it desires.
	MaxUnderNodePercentage float64
	MaxUnderNodeID         uint64
	// MaxOverNodePercentage is the percentage a node is overweight, or has
	// more data assigned to it than its capacity would indicate it desires.
	MaxOverNodePercentage float64
	MaxOverNodeID         uint64
}

Stats gives an overview of the state and health of a Ring. It is returned by the Ring.Stats() method.
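
One plausible reading of the over and under percentages is the distance between a node's actual assignment count and its capacity-weighted share. The formula below is an assumption made for illustration, not taken from the package source, and imbalance is a hypothetical helper; it also assumes non-zero capacities.

```go
package main

import "fmt"

// imbalance returns how far, in percent, a node's assignment count is from
// its capacity-weighted share. Positive means overweight, negative means
// underweight. Capacity and totalCapacity are assumed to be non-zero.
func imbalance(assigned, capacity, totalAssigned, totalCapacity uint64) float64 {
	desired := float64(totalAssigned) * float64(capacity) / float64(totalCapacity)
	return (float64(assigned) - desired) / desired * 100
}

func main() {
	// Three equal-capacity nodes share 300 assignments; this node holds 103.
	fmt.Printf("%.1f%%\n", imbalance(103, 1, 300, 3)) // prints "3.0%"
}
```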

type TCPMsgRing

type TCPMsgRing struct {
	// contains filtered or unexported fields
}

func NewTCPMsgRing

func NewTCPMsgRing(c *TCPMsgRingConfig) (*TCPMsgRing, error)

NewTCPMsgRing creates a new MsgRing that will use TCP to send and receive Msg instances.

func (*TCPMsgRing) Listen

func (t *TCPMsgRing) Listen()

Listen on the configured TCP port, accepting new connections and processing messages from those connections; this function will not return until t.Shutdown() is called.

func (*TCPMsgRing) MaxMsgLength

func (t *TCPMsgRing) MaxMsgLength() uint64

MaxMsgLength indicates the maximum number of bytes the content of a message may contain to be handled by this TCPMsgRing.

func (*TCPMsgRing) MsgHandler

func (t *TCPMsgRing) MsgHandler(msgType uint64) MsgUnmarshaller

MsgHandler returns the handler for the given message type, if there is any set.

func (*TCPMsgRing) MsgToNode

func (t *TCPMsgRing) MsgToNode(msg Msg, nodeID uint64, timeout time.Duration)

MsgToNode queues the message for delivery to the indicated node; the timeout should be considered for queueing, not for actual delivery.

When the msg has actually been sent or has been discarded due to delivery errors or delays, msg.Free() will be called.

func (*TCPMsgRing) MsgToOtherReplicas

func (t *TCPMsgRing) MsgToOtherReplicas(msg Msg, partition uint32, timeout time.Duration)

MsgToOtherReplicas queues the message for delivery to all other replicas of a partition; the timeout should be considered for queueing, not for actual delivery.

If the ring is not bound to a specific node (LocalNode() returns nil) then the delivery attempts will be to all replicas.

When the msg has actually been sent or has been discarded due to delivery errors or delays, msg.Free() will be called.

func (*TCPMsgRing) Ring

func (t *TCPMsgRing) Ring() Ring

Ring returns the ring information used to determine messaging endpoints; note that this method may return nil if no ring information is yet available.

func (*TCPMsgRing) SetChaosAddrDisconnect

func (t *TCPMsgRing) SetChaosAddrDisconnect(addr string, disconnect bool)

SetChaosAddrDisconnect will allow connections to and from addr but, after 10-70 seconds, it will abruptly close a connection.

func (*TCPMsgRing) SetChaosAddrOff

func (t *TCPMsgRing) SetChaosAddrOff(addr string, off bool)

SetChaosAddrOff will disable all outgoing connections to addr and immediately close any incoming connections from addr.

func (*TCPMsgRing) SetMsgHandler

func (t *TCPMsgRing) SetMsgHandler(msgType uint64, handler MsgUnmarshaller)

SetMsgHandler associates a message type with a handler; any incoming messages with the type will be delivered to the handler. Message types just need to be unique uint64 values; usually picking 64 bits of a UUID is fine.

func (*TCPMsgRing) SetRing

func (t *TCPMsgRing) SetRing(ring Ring)

SetRing sets the ring whose information is used to determine messaging endpoints.

func (*TCPMsgRing) Shutdown

func (t *TCPMsgRing) Shutdown()

Shutdown will signal the shutdown of all connections, listeners, etc. related to the TCPMsgRing; once shut down, you must create a new TCPMsgRing to restart operations.

func (*TCPMsgRing) Stats

func (t *TCPMsgRing) Stats(debug bool) *TCPMsgRingStats

Stats returns the current stat counters and resets those counters. In other words, if Stats().Dials gives the value 10 and no more dials occur before Stats() is called again, that second Stats().Dials will have the value 0.

If debug=true, additional information (left undocumented because it is greatly subject to change) may be given when calling TCPMsgRingStats.String().
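
The read-and-reset behavior can be illustrated with an atomic swap. This is a sketch of the semantics only, not the TCPMsgRing internals: counters, snapshot, and twoReadings are hypothetical, and a single counter stands in for the full set.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// counters is a stand-in for the internal stat counters.
type counters struct {
	dials int32
}

// snapshot is what a caller of stats receives.
type snapshot struct {
	Dials int32
}

// recordDial increments the counter; safe for concurrent use.
func (c *counters) recordDial() { atomic.AddInt32(&c.dials, 1) }

// stats returns the current value and resets the counter to zero in a
// single atomic step, so no increment is lost or counted twice.
func (c *counters) stats() snapshot {
	return snapshot{Dials: atomic.SwapInt32(&c.dials, 0)}
}

// twoReadings records the given number of dials and then reads the stats
// twice in a row.
func twoReadings(dials int) (first, second int32) {
	var c counters
	for i := 0; i < dials; i++ {
		c.recordDial()
	}
	first = c.stats().Dials
	second = c.stats().Dials
	return first, second
}

func main() {
	first, second := twoReadings(10)
	fmt.Println(first, second) // prints "10 0"
}
```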

type TCPMsgRingConfig

type TCPMsgRingConfig struct {
	// LogCritical sets the func to use for critical messages; these are
	// messages about issues that render the TCPMsgRing ring inoperative.
	// Defaults to logging to os.Stderr.
	LogCritical LogFunc
	// LogDebug sets the func to use for debug messages. Defaults to not
	// logging debug messages.
	LogDebug LogFunc
	// AddressIndex sets the index to use with Node.Address(index) to look up a
	// Node's TCP address.
	AddressIndex int
	// BufferedMessagesPerAddress indicates how many outgoing Msg instances can
	// be buffered before dropping additional ones. Defaults to 8.
	BufferedMessagesPerAddress int
	// ConnectTimeout indicates how many seconds before giving up on a TCP
	// connection establishment. Defaults to 60 seconds.
	ConnectTimeout int
	// ReconnectInterval indicates how many seconds to wait between connection
	// tries. Defaults to 10 seconds.
	ReconnectInterval int
	// ChunkSize indicates how many bytes to attempt to read at once with each
	// network read. Defaults to 16,384 bytes.
	ChunkSize int
	// WithinMessageTimeout indicates how many seconds before giving up on
	// reading data within a message. Defaults to 5 seconds.
	WithinMessageTimeout int
	// UseTLS enables use of TLS for server and client comms
	UseTLS         bool
	MutualTLS      bool
	SkipVerify     bool
	CustomCertPool bool
	CertFile       string
	KeyFile        string
	CAFile         string
}

TCPMsgRingConfig represents the set of values for configuring a TCPMsgRing. Note that changing the values (shallow changes) in this structure will have no effect on existing TCPMsgRings; but deep changes (such as reconfiguring an existing Logger) will.

type TCPMsgRingStats

type TCPMsgRingStats struct {
	Shutdown                  bool
	RingChanges               int32
	RingChangeCloses          int32
	MsgToNodes                int32
	MsgToNodeNoRings          int32
	MsgToNodeNoNodes          int32
	MsgToOtherReplicas        int32
	MsgToOtherReplicasNoRings int32
	ListenErrors              int32
	IncomingConnections       int32
	Dials                     int32
	DialErrors                int32
	OutgoingConnections       int32
	MsgChanCreations          int32
	MsgToAddrs                int32
	MsgToAddrQueues           int32
	MsgToAddrTimeoutDrops     int32
	MsgToAddrShutdownDrops    int32
	MsgReads                  int32
	MsgReadErrors             int32
	MsgWrites                 int32
	MsgWriteErrors            int32
}

func (*TCPMsgRingStats) String

func (s *TCPMsgRingStats) String() string
