forestbus: github.com/owlfish/forestbus

package forestbus

import "github.com/owlfish/forestbus"

The forestbus package contains client functionality for communicating with a Forest Bus cluster.

The client handles establishing the connectivity to the individual nodes and finding the current location of leaders for given topics.

Package Files

Client.go MessageBatcher.go

Constants

const UNLIMITED_RETRIES = -1

UNLIMITED_RETRIES can be passed to MessageBatcherRetries to enable unlimited retries for transient errors.

Variables

var ErrClusterIdMismatch = errors.New("The nodes cluster ID does not match that given by the client.")

ErrClusterIdMismatch is a serious error where the nodes given do not identify with the same clusterID used to set up the Client object.

var ErrMessageBatcherBufferFull = errors.New("The message batcher buffer is full.")

ErrMessageBatcherBufferFull is returned when the message batcher buffer is full and the caller has set blockIfFull to false.

var ErrMessageBatcherClosed = errors.New("The message batcher was closed before the message could be sent.")

ErrMessageBatcherClosed is returned when the message batcher has been closed and the message not sent.

var ErrNoNodesAvailable = errors.New("Unable to connect to any of the given nodes.")

ErrNoNodesAvailable is a potentially transitory error that occurs when the client is unable to connect successfully to any node in the cluster.

var ErrTopicNotFound = errors.New("Topic not found on the node.")

ErrTopicNotFound is returned when the given topic could not be found on the node.

func ParseConnectionString

func ParseConnectionString(constr string) (clusterID string, topic string, nodes []string)

ParseConnectionString takes a connection string and returns its constituent parts.

Connection string format is: clusterID#topic@nodelist

clusterID is the optional ID of the cluster
#topic is the optional name of the topic
@nodelist is a comma separated list of one or more nodes

The return result is the cluster ID, topic and a slice of node connection strings.
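
To make the format concrete, the sketch below splits a connection string using only the standard library. splitConnString is a hypothetical helper written for illustration, not the package's implementation, and its handling of edge cases (for example a string with no '@') is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// splitConnString is a hypothetical stand-in that illustrates the
// clusterID#topic@nodelist format. It is not the forestbus implementation.
func splitConnString(constr string) (clusterID, topic string, nodes []string) {
	rest := constr
	// Everything after the first '@' is treated as the comma separated node list.
	if at := strings.Index(rest, "@"); at >= 0 {
		for _, n := range strings.Split(rest[at+1:], ",") {
			if n = strings.TrimSpace(n); n != "" {
				nodes = append(nodes, n)
			}
		}
		rest = rest[:at]
	}
	// An optional '#' separates the cluster ID from the topic.
	if hash := strings.Index(rest, "#"); hash >= 0 {
		topic = rest[hash+1:]
		rest = rest[:hash]
	}
	clusterID = rest
	return clusterID, topic, nodes
}

func main() {
	id, topic, nodes := splitConnString("testcluster#test-topic@localhost:3000,localhost:3001")
	fmt.Println(id, topic, nodes)
}
```

In real code, call forestbus.ParseConnectionString directly; the sketch only shows which part of the string ends up in which return value.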

type Client

type Client struct {
    // contains filtered or unexported fields
}

The Client object is a multi-goroutine safe object for communicating with the cluster of nodes that form the Forest Bus.

Each Client establishes an underlying rpc.Client connection to the nodes as required. As such there may be some contention on the connection between parallel calls.

func NewClient

func NewClient(clusterID string, nodes []string, config ...ClientConfiguration) *Client

NewClient returns a Client object for connecting to a Forest Bus cluster.

The clusterID must match the clusterID that each of the given nodes was started with. The nodes list should contain every node in the cluster for full failover. At least one node must be provided.

func (*Client) Close

func (client *Client) Close()

Close closes down all RPC connections that are currently in use. Further calls to the Client will reopen connections as required.

func (*Client) GetMessages

func (client *Client) GetMessages(topic string, index int64, quantity int, wait bool) (msgs [][]byte, nextID int64, err error)

GetMessages returns a list of messages from the cluster.

If the cluster has been completely shut down and restarted (rather than a rolling restart of individual nodes) then the commit index may be zero, in which case zero messages will be returned and the nextID will be set to zero. Zero messages will also be returned if the topic contains no messages. Once a message has been sent to the cluster in this topic the commit index will be recalculated and GetMessages will return as normal.

GetMessages will usually return more or fewer messages than the quantity requested. This ensures efficient message retrieval from the node as messages are aligned to offset and cache boundaries. If any messages are available at the requested index then at least one message will be returned.

The first message in the msgs slice has an ID of index. The last message in the slice has an ID of nextID - 1.

If the index requested is no longer available on this node (i.e. clean-up has removed old data) then zero messages will be returned and the nextID will be the index of the first available message.

If the messages returned bring the client up to the end of the available messages, the nextID will contain the index of what will become the next message when it has been sent. By setting wait to true and passing in the index returned by nextID, GetMessages will block until at least one new message is available, before returning that message or messages.

Code:

client := NewClient("test-cluster", []string{"localhost:3000", "localhost:3001", "localhost:3002"})

msgs, nextID, err := client.GetMessages("test-topic", 1, 100, true)

if err != nil {
    fmt.Printf("Error getting messages: %v\n", err)
    return
}

if len(msgs) == 0 && nextID == 0 {
    fmt.Printf("Commit index is zero due to a full cluster restart.\n")
    return
}

fmt.Printf("Received %v messages with the next index being %v\n", len(msgs), nextID)
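
The nextID rules described above lend themselves to a simple read loop. The following is a minimal sketch under stated assumptions: messageSource is a hypothetical interface that copies the GetMessages signature, fakeSource is an in-memory stand-in for a cluster so the example runs on its own, and readAvailable and the quantity of 2 are illustrative choices rather than part of the package.

```go
package main

import "fmt"

// messageSource copies the GetMessages signature documented above, so a
// *forestbus.Client should be usable wherever this interface is expected.
type messageSource interface {
	GetMessages(topic string, index int64, quantity int, wait bool) (msgs [][]byte, nextID int64, err error)
}

// fakeSource is a hypothetical in-memory stand-in for a cluster.
// The message at position i in log has index first+i.
type fakeSource struct {
	first int64
	log   [][]byte
}

func (f *fakeSource) GetMessages(topic string, index int64, quantity int, wait bool) ([][]byte, int64, error) {
	end := f.first + int64(len(f.log))
	if index < f.first {
		// The requested index has been cleaned up: return no messages
		// and point the caller at the first available index.
		return nil, f.first, nil
	}
	if index >= end {
		// Caller is at the end of the log. The fake never blocks.
		return nil, end, nil
	}
	stop := index + int64(quantity)
	if stop > end {
		stop = end
	}
	return f.log[index-f.first : stop-f.first], stop, nil
}

// readAvailable reads from start until the source reports no further messages
// and returns the messages together with the index to resume from.
func readAvailable(src messageSource, topic string, start int64) ([][]byte, int64, error) {
	var all [][]byte
	next := start
	for {
		msgs, nextID, err := src.GetMessages(topic, next, 2, false)
		if err != nil {
			return all, next, err
		}
		if len(msgs) == 0 && (nextID == next || nextID == 0) {
			// Either caught up, or the commit index is zero.
			return all, next, nil
		}
		// An empty result with a larger nextID means old data was removed,
		// so the loop simply continues from the first available index.
		all = append(all, msgs...)
		next = nextID
	}
}

func main() {
	src := &fakeSource{first: 3, log: [][]byte{[]byte("c"), []byte("d"), []byte("e")}}
	all, next, err := readAvailable(src, "test-topic", 1)
	if err != nil {
		fmt.Printf("Error getting messages: %v\n", err)
		return
	}
	fmt.Printf("Read %d messages, resume from index %d\n", len(all), next)
}
```

A long-running consumer would typically call GetMessages with wait set to true once it has caught up, rather than returning as this sketch does.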

func (*Client) GetTopicMaxAvailableIndex

func (client *Client) GetTopicMaxAvailableIndex(topic string) (maxAvailableIndex int64, err error)

GetTopicMaxAvailableIndex returns the maximum available index from the currently connected node for the given topic.

If the cluster has been completely shut down and restarted (rather than a rolling restart of individual nodes) then the commit index may be zero, in which case the maxAvailableIndex will be zero. Once a message has been sent to the cluster in this topic the commit index will be recalculated and the maximum available index will be returned as normal.

func (*Client) SendMessages

func (client *Client) SendMessages(topic string, messages [][]byte, waitForCommit bool) (indexes []int64, err error)

SendMessages sends a batch of messages to the Forest Bus cluster.

Messages are a slice of slices of bytes. Sending many messages (hundreds) at once gives better throughput than sending individual messages. To easily batch messages together in this way please see the MessageBatcher documentation.

If waitForCommit is false then SendMessages will return as soon as the messages have been saved on the leader node for this topic. If waitForCommit is true then SendMessages will only return once the messages have been replicated to a majority of the nodes in the cluster and are therefore committed.

Code:

client := NewClient("test-cluster", []string{"localhost:3000", "localhost:3001", "localhost:3002"})

msgs := make([][]byte, 0)

msgs = append(msgs, []byte("Message 1"))
msgs = append(msgs, []byte("Message 2"))

// Send the messages and wait for commit
ids, err := client.SendMessages("test-topic", msgs, true)

if err != nil {
    fmt.Printf("Error sending the messages: %v\n", err)
    return
}

if len(ids) == 2 {
    fmt.Printf("Message one at index %v, two at index %v\n", ids[0], ids[1])
}
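
When a caller already holds a large slice of messages, one way to follow the advice about sending hundreds at a time is to split the slice into fixed-size batches and pass each batch to SendMessages. The sketch below shows only the splitting step. chunkMessages is a hypothetical helper and the batch size of 200 is an arbitrary illustration, not a value recommended by the package.

```go
package main

import "fmt"

// chunkMessages splits msgs into consecutive batches of at most size messages.
// It is a hypothetical helper and not part of the forestbus package.
func chunkMessages(msgs [][]byte, size int) [][][]byte {
	if size < 1 {
		size = 1
	}
	var batches [][][]byte
	for len(msgs) > size {
		batches = append(batches, msgs[:size])
		msgs = msgs[size:]
	}
	if len(msgs) > 0 {
		batches = append(batches, msgs)
	}
	return batches
}

func main() {
	msgs := make([][]byte, 0, 450)
	for i := 0; i < 450; i++ {
		msgs = append(msgs, []byte(fmt.Sprintf("Message %d", i)))
	}
	// Each batch could be passed to client.SendMessages in turn.
	for i, batch := range chunkMessages(msgs, 200) {
		fmt.Printf("batch %d holds %d messages\n", i, len(batch))
	}
}
```

The batches share the backing array of the original slice, so no message data is copied.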

type ClientConfiguration

type ClientConfiguration func(*Client)

ClientConfiguration functions are able to configure options on a Client.

The ClientConfiguration type is exposed externally to facilitate building a list of options programmatically, e.g.:

options := make([]forestbus.ClientConfiguration, 0)
options = append(options, forestbus.ClientEnableDebug())
options = append(options, forestbus.ShuffleNodes())
client := forestbus.NewClient("testcluster", []string{"localhost:3000", "localhost:3001"}, options...)
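
Configuration values of type func(*Client) follow what is commonly called the functional options pattern. The self-contained sketch below shows how such options are typically applied inside a constructor. miniClient, miniOption, withDebug, withShuffle and newMiniClient are hypothetical names used for illustration; the actual fields and internals of forestbus.Client are unexported and may differ.

```go
package main

import "fmt"

// miniClient is a hypothetical stand-in for a client with two settings.
type miniClient struct {
	nodes   []string
	debug   bool
	shuffle bool
}

// miniOption mirrors the shape of ClientConfiguration: a function that
// mutates the object being constructed.
type miniOption func(*miniClient)

// withDebug returns an option that switches debug logging on.
func withDebug() miniOption {
	return func(c *miniClient) { c.debug = true }
}

// withShuffle returns an option that requests a randomised node order.
func withShuffle() miniOption {
	return func(c *miniClient) { c.shuffle = true }
}

// newMiniClient builds the client with defaults, then applies each option in order.
func newMiniClient(nodes []string, opts ...miniOption) *miniClient {
	c := &miniClient{nodes: nodes}
	for _, opt := range opts {
		opt(c)
	}
	return c
}

func main() {
	// Options can be collected programmatically before the constructor call.
	var opts []miniOption
	opts = append(opts, withDebug())
	c := newMiniClient([]string{"localhost:3000"}, opts...)
	fmt.Println(c.debug, c.shuffle)
}
```

Because the options are ordinary values, a caller can build the list conditionally (for example from command line flags) and pass it with the variadic spread syntax.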

func ClientEnableDebug

func ClientEnableDebug() ClientConfiguration

ClientEnableDebug returns a ClientConfiguration that can be passed to NewClient to enable debug logging in the library.

For example:

client := forestbus.NewClient("testcluster", []string{"localhost:3000"}, forestbus.ClientEnableDebug())

func ShuffleNodes

func ShuffleNodes() ClientConfiguration

ShuffleNodes randomises the order in which connection attempts are made to the nodes that have been given. Use of this configuration option helps spread the load of different clients that are doing GetMessages calls across the cluster.

type MessageBatcher

type MessageBatcher struct {
    // contains filtered or unexported fields
}

A MessageBatcher is created using NewMessageBatcher and provides a simple way to batch messages for sending efficiently using an existing Client.

func NewMessageBatcher

func NewMessageBatcher(client *Client, topic string, config ...MessageBatcherConfiguration) *MessageBatcher

NewMessageBatcher returns a new MessageBatcher using the underlying client and for the topic specified.

Optional configuration parameters can be passed to set the commit policy and buffer size.

func (*MessageBatcher) AsyncSendMessage

func (batcher *MessageBatcher) AsyncSendMessage(message []byte, reference interface{}, replyChannel chan *SendResult, blockIfFull bool) (err error)

AsyncSendMessage queues the given message and sends it, along with any other queued messages, using the configuration given in NewMessageBatcher.

The caller MUST ensure that there is sufficient capacity on the replyChannel to avoid contention in the MessageBatcher. At a minimum, give replyChannel a capacity of 200 or the size passed to MessageBatcherSetBufferSize, whichever is greater.

If the caller does not need to know the result of sending the message ("fire and forget"), then pass a nil replyChannel to the call.

If blockIfFull is true and the MessageBatcher buffer is full, AsyncSendMessage will block until there is space to queue the message. If blockIfFull is false and the buffer is full, the error ErrMessageBatcherBufferFull will be returned and no message will be sent on the replyChannel.

The given reference will be available in the SendResult.GetReference() call, which makes it easier to tie the result of the asynchronous call to objects that may be held. If this isn't required, pass nil for this value.

SendMessage and AsyncSendMessage can be safely used simultaneously on the same MessageBatcher.

Code:

client := NewClient("test-cluster", []string{"localhost:3000", "localhost:3001", "localhost:3002"})

batcher := NewMessageBatcher(client, "test-topic")

responseChannel := make(chan *SendResult, 500)

go func() {
    for i := 0; i < 1000; i++ {
        // Need to use a local copy of i so we can pass its address as the reference
        localID := i
        // Send messages, blocking if the buffer is full.
        // There is no need to check the error response in this instance.
        batcher.AsyncSendMessage([]byte("Message"), &localID, responseChannel, true)
    }
}()

for responseCount := 0; responseCount < 1000; responseCount++ {
    result := <-responseChannel
    if result.GetError() != nil {
        fmt.Printf("Message %v encountered error: %v\n", *result.GetReference().(*int), result.GetError())
    } else {
        fmt.Printf("Message %v given index %v\n", *result.GetReference().(*int), result.GetIndex())
    }
}
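
The blockIfFull behaviour described above can be pictured with a buffered channel acting as the queue. This is a minimal sketch of that idea, not the MessageBatcher's actual internals: enqueue and errBufferFull are hypothetical names, with errBufferFull standing in for ErrMessageBatcherBufferFull.

```go
package main

import (
	"errors"
	"fmt"
)

// errBufferFull is a hypothetical stand-in for ErrMessageBatcherBufferFull.
var errBufferFull = errors.New("buffer full")

// enqueue sketches the documented blockIfFull semantics, using a buffered
// channel as the buffer. How the real MessageBatcher queues is not shown here.
func enqueue(buf chan []byte, msg []byte, blockIfFull bool) error {
	if blockIfFull {
		// Blocks until there is space in the buffer.
		buf <- msg
		return nil
	}
	select {
	case buf <- msg:
		return nil
	default:
		// Buffer is full and the caller asked not to wait.
		return errBufferFull
	}
}

func main() {
	buf := make(chan []byte, 1)
	fmt.Println(enqueue(buf, []byte("first"), false))
	fmt.Println(enqueue(buf, []byte("second"), false))
}
```

With a capacity of one, the first call succeeds and the second returns the buffer-full error instead of blocking.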

func (*MessageBatcher) Close

func (batcher *MessageBatcher) Close()

Close the MessageBatcher. Pending messages will still be sent if this can complete within 2s, otherwise they will error with ErrMessageBatcherClosed.

If follower nodes are significantly behind, or unreachable from the leader node, then some messages may still be waiting in-flight when Close returns. These will return with suitable errors when the underlying Client.Close() call is made.

Once the MessageBatcher is closed it cannot be reused. MessageBatcher.Close is multi-goroutine safe and can be called multiple times.

func (*MessageBatcher) SendMessage

func (batcher *MessageBatcher) SendMessage(message []byte, blockIfFull bool) (index int64, err error)

SendMessage creates batches of messages and submits them to the Forest Bus cluster using the configuration given in NewMessageBatcher.

If there are no messages pending, the message is sent straight away. If a client.SendMessages call is in progress then the message will be batched with any others waiting to be sent, and the batch is sent as soon as the previous call completes. If blockIfFull is true and the MessageBatcher buffer is full, SendMessage will block until there is space to queue the message. If blockIfFull is false and the buffer is full, index will be returned as zero and the error ErrMessageBatcherBufferFull will be returned.

SendMessage is synchronous and best used when multiple goroutines are generating messages that need to be sent to the same topic.

SendMessage and AsyncSendMessage can be safely used simultaneously on the same MessageBatcher.

Code:

client := NewClient("test-cluster", []string{"localhost:3000", "localhost:3001", "localhost:3002"})

batcher := NewMessageBatcher(client, "test-topic")

wg := sync.WaitGroup{}

// Parallel goroutines can use the batcher, resulting in individual messages being batched together
for i := 0; i < 20; i++ {
    wg.Add(1)
    go func() {
        // Send the message - if the internal buffer is full wait until it can be added.
        index, err := batcher.SendMessage([]byte("Message"), true)
        if err != nil {
            fmt.Printf("Error sending message: %v\n", err)
        } else {
            fmt.Printf("Message index %v returned\n", index)
        }
        wg.Done()
    }()
}

wg.Wait()

type MessageBatcherConfiguration

type MessageBatcherConfiguration func(*MessageBatcher)

MessageBatcherConfiguration functions are able to configure options on a MessageBatcher.

The MessageBatcherConfiguration type is exposed externally to facilitate building a list of options programmatically, e.g.:

options := make([]forestbus.MessageBatcherConfiguration, 0)
options = append(options, forestbus.MessageBatcherDoNotWaitForCommit())
options = append(options, forestbus.MessageBatcherSetBufferSize(1000))
batcher := forestbus.NewMessageBatcher(client, "test-topic", options...)

func MessageBatcherDoNotWaitForCommit

func MessageBatcherDoNotWaitForCommit() MessageBatcherConfiguration

MessageBatcherDoNotWaitForCommit configures the MessageBatcher to not wait on commit when sending messages to the cluster.

func MessageBatcherEnableDebug

func MessageBatcherEnableDebug() MessageBatcherConfiguration

MessageBatcherEnableDebug returns a MessageBatcherConfiguration that can be passed to NewMessageBatcher to enable debug logging in the library.

func MessageBatcherRetries

func MessageBatcherRetries(maxRetries int) MessageBatcherConfiguration

MessageBatcherRetries returns a MessageBatcherConfiguration that can be passed to NewMessageBatcher to enable automatic retries for transient (e.g. ErrNoNodesAvailable) errors.

Retries start at a 500ms interval and back off in 500ms increments until the interval reaches 5s. If the MessageBatcher is closed while retrying, the error ErrNoNodesAvailable will be passed back to clients calling SendMessage and on the replyChannel of AsyncSendMessage.

The default without this configuration option is 0 (no retries). Pass forestbus.UNLIMITED_RETRIES to retry without limit.
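
The retry schedule can be worked through with a few lines of arithmetic. The sketch below assumes one reading of the description above: the first retry waits 500ms, each later retry waits 500ms longer than the previous one, and the wait is capped at 5s. retryDelay is a hypothetical function, not the library's code.

```go
package main

import (
	"fmt"
	"time"
)

// retryDelay returns the wait before the given retry attempt (1-based),
// assuming a linear 500ms back-off capped at 5s. This is an illustration of
// the documented schedule, not the forestbus implementation.
func retryDelay(attempt int) time.Duration {
	const step = 500 * time.Millisecond
	const maxDelay = 5 * time.Second
	if attempt < 1 {
		attempt = 1
	}
	// From the tenth attempt onwards the cap applies.
	if attempt >= int(maxDelay/step) {
		return maxDelay
	}
	return time.Duration(attempt) * step
}

func main() {
	for attempt := 1; attempt <= 12; attempt++ {
		fmt.Printf("retry %d after %v\n", attempt, retryDelay(attempt))
	}
}
```

Under this reading the cap is reached on the tenth retry, and every retry after that waits the full 5s.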

func MessageBatcherSetBufferSize

func MessageBatcherSetBufferSize(size int) MessageBatcherConfiguration

MessageBatcherSetBufferSize allows the default buffer size of 200 to be changed.

type SendResult

type SendResult struct {
    // contains filtered or unexported fields
}

SendResult encapsulates the results from an AsyncSendMessage call.

func (*SendResult) GetError

func (res *SendResult) GetError() error

GetError returns the error encountered sending the message, or nil if it was successfully sent.

func (*SendResult) GetIndex

func (res *SendResult) GetIndex() int64

GetIndex returns the index of the sent message. It will have a value of zero if an error occurred during the send.

func (*SendResult) GetMessage

func (res *SendResult) GetMessage() []byte

GetMessage returns the original message passed to the AsyncSendMessage call.

func (*SendResult) GetReference

func (res *SendResult) GetReference() interface{}

GetReference returns the reference pointer provided in the AsyncSendMessage call.

Directories

Path    Synopsis
rapi    The rapi package contains the Go RPC method arguments and return values used by both the Forest Bus Server and Client implementations.

Package forestbus imports 10 packages and is imported by 5 packages. Updated 2016-07-18.