liftbridge

package module
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 28, 2019 License: Apache-2.0 Imports: 12 Imported by: 0

README

go-liftbridge CircleCI GoDoc

Go client for Liftbridge, a system that provides lightweight, fault-tolerant message streams for NATS.

Liftbridge provides the following high-level features:

  • Log-based API for NATS
  • Replicated for fault-tolerance
  • Horizontally scalable
  • Wildcard subscription support
  • At-least-once delivery support and message replay
  • Message key-value support
  • Log compaction by key

Installation

$ go get github.com/liftbridge-io/go-liftbridge

Basic Usage

package main

import (
	"fmt"

	lift "github.com/liftbridge-io/go-liftbridge"
	"github.com/liftbridge-io/go-liftbridge/liftbridge-grpc"
	"golang.org/x/net/context"
)

func main() {
	// Create Liftbridge client.
	addrs := []string{"localhost:9292", "localhost:9293", "localhost:9294"}
	client, err := lift.Connect(addrs)
	if err != nil {
		panic(err)
	}
	defer client.Close()

	// Create a stream attached to the NATS subject "foo".
    	var (
        	subject = "foo"
        	name    = "foo-stream"
    	)
	if err := client.CreateStream(context.Background(), subject, name); err != nil {
		if err != lift.ErrStreamExists {
			panic(err)
		}
	}

	// Subscribe to the stream starting from the beginning.
	ctx := context.Background()
	if err := client.Subscribe(ctx, subject, name, func(msg *proto.Message, err error) {
		if err != nil {
			panic(err)
		}
		fmt.Println(msg.Offset, string(msg.Value))
	}, lift.StartAtEarliestReceived()); err != nil {
		panic(err)
	}

	<-ctx.Done()
}
Create Stream

Streams are a durable message log attached to a NATS subject. They record messages published to the subject for consumption.

Streams have a few key properties: a subject, which is the corresponding NATS subject, a name, which is a human-readable identifier for the stream, and a replication factor, which is the number of nodes the stream should be replicated to for redundancy. Optionally, there is a group which is the name of a load-balance group for the stream to join. When there are multiple streams in the same group, messages will be balanced among them.

// Create a stream attached to the NATS subject "foo.*" that is replicated to
// all the brokers in the cluster. ErrStreamExists is returned if a stream with
// the given name already exists for the subject.
client.CreateStream(context.Background(), "foo.*", "my-stream", lift.MaxReplication())
Subscription Start/Replay Options

Subscriptions are how Liftbridge streams are consumed. Clients can choose where to start consuming messages from in a stream. This is controlled using options passed to Subscribe.

// Subscribe starting with new messages only.
client.Subscribe(ctx, stream.Subject, stream.Name, func(msg *proto.Message, err error) {
    fmt.Println(msg.Offset, string(msg.Value))
})

// Subscribe starting with the most recently published value.
client.Subscribe(ctx, stream.Subject, stream.Name, func(msg *proto.Message, err error) {
    fmt.Println(msg.Offset, string(msg.Value))
}, lift.StartAtLatestReceived())

// Subscribe starting with the oldest published value.
client.Subscribe(ctx, stream.Subject, stream.Name, func(msg *proto.Message, err error) {
    fmt.Println(msg.Offset, string(msg.Value))
}, lift.StartAtEarliestReceived())

// Subscribe starting at a specific offset.
client.Subscribe(ctx, stream.Subject, stream.Name, func(msg *proto.Message, err error) {
    fmt.Println(msg.Offset, string(msg.Value))
}, lift.StartAtOffset(42))

// Subscribe starting at a specific time.
client.Subscribe(ctx, stream.Subject, stream.Name, func(msg *proto.Message, err error) {
    fmt.Println(msg.Offset, string(msg.Value))
}, lift.StartAtTime(time.Now()))

// Subscribe starting at a specific amount of time in the past.
client.Subscribe(ctx, stream.Subject, stream.Name, func(msg *proto.Message, err error) {
    fmt.Println(msg.Offset, string(msg.Value))
}, lift.StartAtTimeDelta(time.Minute))
Publishing

Since Liftbridge is simply an extension of NATS, a NATS client is used to publish messages. This means existing NATS publishers do not need any changes for messages to be consumed in Liftbridge.

package main

import "github.com/nats-io/go-nats"

func main() {
    // Connect to NATS.
    nc, _ := nats.Connect(nats.DefaultURL)

    // Publish a message.
    nc.Publish("foo.bar", []byte("Hello, world!")) 
    nc.Flush()
}

Liftbridge allows publishers to add metadata to messages, including a key, ack inbox, correlation ID, and ack policy. The message key can be used for stream compaction in Liftbridge. Acks are used to guarantee Liftbridge has recorded a message to ensure at-least-once delivery. The ack inbox determines a NATS subject to publish an acknowledgement to once Liftbridge has committed the message. The correlation id is used to correlate an ack back to the original message. The ack policy determines when Liftbridge acknowledges the message: when the stream leader has stored the message, when all replicas have stored it, or no ack at all.

This additional metadata is sent using a message envelope which is a protobuf. This client library provides APIs to make it easy to create envelopes and deal with acks.

var (
    ackInbox = "foo.acks"
    cid      = "some-random-id"
)

// Create a message envelope to publish.
msg := lift.NewMessage([]byte("Hello, world!"),
    lift.Key([]byte("foo")), // Key to set on the message
    lift.AckInbox(ackInbox), // Send ack to this NATS subject
    lift.AckPolicyAll(),     // Send ack once message is fully replicated
    lift.CorrelationID(cid), // Set the ID which will be sent on the ack
)

// Setup a NATS subscription for acks.
sub, _ := nc.SubscribeSync(ackInbox)

// Publish the message.
nc.Publish("foo.bar", msg)

// Wait for ack from Liftbridge.
resp, _ := sub.NextMsg(5*time.Second)
ack, _ := lift.UnmarshalAck(resp.Data)
if ack.CorrelationId == cid {
    fmt.Println("message acked!")
}

Documentation

Overview

Package liftbridge implements a client for the Liftbridge messaging system. Liftbridge provides lightweight, fault-tolerant message streams by implementing a durable stream augmentation NATS. In particular, it offers a publish-subscribe log API that is highly available and horizontally scalable.

This package provides APIs for creating and consuming Liftbridge streams and some utility APIs for using Liftbridge in combination with NATS. Publishing messages to Liftbridge is handled by a NATS client since Liftbridge is simply an extension of NATS.

Index

Examples

Constants

View Source
const MaxReplicationFactor int32 = -1

MaxReplicationFactor can be used to tell the server to set the replication factor equal to the current number of servers in the cluster when creating a stream.

Variables

View Source
var (
	// ErrStreamExists is returned by CreateStream if the specified stream
	// already exists in the Liftbridge cluster.
	ErrStreamExists = errors.New("stream already exists")

	// ErrNoSuchStream is returned by Subscribe if the specified stream does
	// not exist in the Liftbridge cluster.
	ErrNoSuchStream = errors.New("stream does not exist")
)

Functions

func NewMessage

func NewMessage(value []byte, options ...MessageOption) []byte

NewMessage returns a serialized message for the given payload and options.

Example
// Create NATS connection.
conn, err := nats.GetDefaultOptions().Connect()
if err != nil {
	panic(err)
}
defer conn.Flush()
defer conn.Close()

// Publish simple message.
msg := NewMessage([]byte("value"))
if err := conn.Publish("foo", msg); err != nil {
	panic(err)
}

// Publish message with options.
msg = NewMessage([]byte("value"),
	Key([]byte("key")),
	AckPolicyAll(),
	AckInbox("ack"),
	CorrelationID("123"),
)
if err := conn.Publish("foo", msg); err != nil {
	panic(err)
}
Output:

func UnmarshalAck

func UnmarshalAck(data []byte) (*proto.Ack, error)

UnmarshalAck deserializes an Ack from the given byte slice. It returns an error if the given data is not actually an Ack.

Example
// Create NATS connection.
conn, err := nats.GetDefaultOptions().Connect()
if err != nil {
	panic(err)
}
defer conn.Close()

// Setup ack inbox.
ackInbox := "acks"
acked := make(chan struct{})
_, err = conn.Subscribe(ackInbox, func(m *nats.Msg) {
	ack, err := UnmarshalAck(m.Data)
	if err != nil {
		panic(err)
	}
	fmt.Println("ack:", ack.StreamSubject, ack.StreamName, ack.Offset, ack.MsgSubject)
	close(acked)
})
if err != nil {
	panic(err)
}

// Publish message.
msg := NewMessage([]byte("value"), Key([]byte("key")), AckInbox(ackInbox))
if err := conn.Publish("foo", msg); err != nil {
	panic(err)
}

<-acked
Output:

func UnmarshalMessage

func UnmarshalMessage(data []byte) (*proto.Message, bool)

UnmarshalMessage deserializes a message from the given byte slice. It returns a bool indicating if the given data was actually a Message or not.

Types

type Client

type Client interface {
	// Close the client connection.
	Close() error

	// CreateStream creates a new stream attached to a NATS subject. Subject is
	// the NATS subject the stream is attached to, and name is the stream
	// identifier, unique per subject. It returns ErrStreamExists if a stream
	// with the given subject and name already exists.
	CreateStream(ctx context.Context, subject, name string, opts ...StreamOption) error

	// Subscribe creates an ephemeral subscription for the given stream. It
	// begins receiving messages starting at the configured position and waits
	// for new messages when it reaches the end of the stream. The default
	// start position is the end of the stream. It returns an ErrNoSuchStream
	// if the given stream does not exist. Use a cancelable Context to close a
	// subscription.
	Subscribe(ctx context.Context, subject, name string, handler Handler, opts ...SubscriptionOption) error

	// Publish publishes a new message to the NATS subject. If the AckPolicy is
	// not NONE and a deadline is provided, this will synchronously block until
	// the first ack is received. If the ack is not received in time, a
	// DeadlineExceeded status code is returned. If an AckPolicy and deadline
	// are configured, this returns the first Ack on success, otherwise it
	// returns nil.
	Publish(ctx context.Context, subject string, value []byte, opts ...MessageOption) (*proto.Ack, error)
}

Client is the main API used to communicate with a Liftbridge cluster. Call Connect to get a Client instance.

Example (CreateStream)
// Connect to Liftbridge.
addr := "localhost:9292"
client, err := Connect([]string{addr})
if err != nil {
	panic(err)
}
defer client.Close()
if err := client.CreateStream(context.Background(), "foo", "foo-stream"); err != nil {
	panic(err)
}
Output:

Example (Subscribe)
// Connect to Liftbridge.
addr := "localhost:9292"
client, err := Connect([]string{addr})
if err != nil {
	panic(err)
}
defer client.Close()

// Subscribe to stream.
ctx := context.Background()
if err := client.Subscribe(ctx, "bar", "bar-stream", func(msg *proto.Message, err error) {
	if err != nil {
		panic(err)
	}
	fmt.Println(msg.Offset, string(msg.Value))
}); err != nil {
	panic(err)
}

<-ctx.Done()
Output:

func Connect

func Connect(addrs []string, options ...ClientOption) (Client, error)

Connect creates a Client connection for the given Liftbridge cluster. Multiple addresses can be provided. Connect will use whichever it connects successfully to first in random order. The Client will use the pool of addresses for failover purposes. Note that only one seed address needs to be provided as the Client will discover the other brokers when fetching metadata for the cluster.

Example
addr := "localhost:9292"
client, err := Connect([]string{addr})
if err != nil {
	panic(err)
}
defer client.Close()
Output:

type ClientOption

type ClientOption func(*ClientOptions) error

ClientOption is a function on the ClientOptions for a connection. These are used to configure particular client options.

func KeepAliveTime

func KeepAliveTime(keepAlive time.Duration) ClientOption

KeepAliveTime is a ClientOption to set the amount of time a pooled connection can be idle before it is closed and removed from the pool. The default is 30 seconds.

func MaxConnsPerBroker

func MaxConnsPerBroker(max int) ClientOption

MaxConnsPerBroker is a ClientOption to set the maximum number of connections to pool for a given broker in the cluster. The default is 2.

func ResubscribeWaitTime

func ResubscribeWaitTime(wait time.Duration) ClientOption

ResubscribeWaitTime is a ClientOption to set the amount of time to attempt to re-establish a stream subscription after being disconnected. For example, if the server serving a subscription dies and the stream is replicated, the client will attempt to re-establish the subscription once the stream leader has failed over. This failover can take several moments, so this option gives the client time to retry. The default is 30 seconds.

func TLSCert

func TLSCert(cert string) ClientOption

TLSCert is a ClientOption to set the TLS certificate for the client.

type ClientOptions

type ClientOptions struct {
	// Brokers it the set of hosts the client will use when attempting to
	// connect.
	Brokers []string

	// MaxConnsPerBroker is the maximum number of connections to pool for a
	// given broker in the cluster. The default is 2.
	MaxConnsPerBroker int

	// KeepAliveTime is the amount of time a pooled connection can be idle
	// before it is closed and removed from the pool. The default is 30
	// seconds.
	KeepAliveTime time.Duration

	// TLSCert is the TLS certificate file to use. The client does not use a
	// TLS connection if this is not set.
	TLSCert string

	// ResubscribeWaitTime is the amount of time to attempt to re-establish a
	// stream subscription after being disconnected. For example, if the server
	// serving a subscription dies and the stream is replicated, the client
	// will attempt to re-establish the subscription once the stream leader has
	// failed over. This failover can take several moments, so this option
	// gives the client time to retry. The default is 30 seconds.
	ResubscribeWaitTime time.Duration
}

ClientOptions are used to control the Client configuration.

func DefaultClientOptions

func DefaultClientOptions() ClientOptions

DefaultClientOptions returns the default configuration options for the client.

func (ClientOptions) Connect

func (o ClientOptions) Connect() (Client, error)

Connect will attempt to connect to a Liftbridge server with multiple options.

type Handler

type Handler func(msg *proto.Message, err error)

Handler is the callback invoked by Subscribe when a message is received on the specified stream. If err is not nil, the subscription will be terminated and no more messages will be received.

type MessageOption

type MessageOption func(*MessageOptions)

MessageOption is a function on the MessageOptions for a Message. These are used to configure particular optional Message fields.

func AckInbox

func AckInbox(ackInbox string) MessageOption

AckInbox is a MessageOption to set the NATS subject Liftbridge should publish the Message ack to. If it's not set, Liftbridge will not send an ack.

func AckPolicy

func AckPolicy(ackPolicy proto.AckPolicy) MessageOption

AckPolicy is a MessageOption that controls the behavior of Message acks sent by the server. By default, Liftbridge will send an ack when the stream leader has written the Message to its write-ahead log.

func AckPolicyAll

func AckPolicyAll() MessageOption

AckPolicyAll is a MessageOption that sets the AckPolicy of the Message to ALL. This means the Message ack will be sent when the message has been written to all replicas.

func AckPolicyLeader

func AckPolicyLeader() MessageOption

AckPolicyLeader is a MessageOption that sets the AckPolicy of the Message to LEADER. This means the Message ack will be sent when the stream leader has written it to its write-ahead log.

func AckPolicyNone

func AckPolicyNone() MessageOption

AckPolicyNone is a MessageOption that sets the AckPolicy of the Message to NONE. This means no ack will be sent.

func CorrelationID

func CorrelationID(correlationID string) MessageOption

CorrelationID is a MessageOption to set the identifier used to correlate an ack with the published Message. If it's not set, the ack will not have a correlation id.

func Key

func Key(key []byte) MessageOption

Key is a MessageOption to set the key on a Message. If Liftbridge has stream compaction enabled, the stream will retain only the last value for each key.

type MessageOptions

type MessageOptions struct {
	// Key to set on the Message. If Liftbridge has stream compaction enabled,
	// the stream will retain only the last value for each key.
	Key []byte

	// AckInbox sets the NATS subject Liftbridge should publish the Message ack
	// to. If it's not set, Liftbridge will not send an ack.
	AckInbox string

	// CorrelationID sets the identifier used to correlate an ack with the
	// published Message. If it's not set, the ack will not have a correlation
	// id.
	CorrelationID string

	// AckPolicy controls the behavior of Message acks sent by the server. By
	// default, Liftbridge will send an ack when the stream leader has written
	// the Message to its write-ahead log.
	AckPolicy proto.AckPolicy
}

MessageOptions are used to configure optional settings for a Message.

type StreamOption

type StreamOption func(*StreamOptions) error

StreamOption is a function on the StreamOptions for a stream. These are used to configure particular stream options.

func Group

func Group(group string) StreamOption

Group is a StreamOption to set the load-balance group for a stream. When there are multiple streams in the same group, messages will be balanced among them.

func MaxReplication

func MaxReplication() StreamOption

MaxReplication is a StreamOption to set the stream replication factor equal to the current number of servers in the cluster.

func ReplicationFactor

func ReplicationFactor(replicationFactor int32) StreamOption

ReplicationFactor is a StreamOption to set the replication factor for a stream. The replication factor controls the number of servers to replicate a stream to. E.g. a value of 1 would mean only 1 server would have the data, and a value of 3 would be 3 servers would have it. If this is not set, it defaults to 1. A value of -1 will signal to the server to set the replication factor equal to the current number of servers in the cluster.

type StreamOptions

type StreamOptions struct {
	// Group is the name of a load-balance group. When there are multiple
	// streams in the same group, messages will be balanced among them.
	Group string

	// ReplicationFactor controls the number of servers to replicate a stream
	// to. E.g. a value of 1 would mean only 1 server would have the data, and
	// a value of 3 would be 3 servers would have it. If this is not set, it
	// defaults to 1. A value of -1 will signal to the server to set the
	// replication factor equal to the current number of servers in the
	// cluster.
	ReplicationFactor int32
}

StreamOptions are used to configure new streams.

type SubscriptionOption

type SubscriptionOption func(*SubscriptionOptions) error

SubscriptionOption is a function on the SubscriptionOptions for a subscription. These are used to configure particular subscription options.

func StartAt

func StartAt(start proto.StartPosition) SubscriptionOption

StartAt sets the desired start position for the stream.

func StartAtEarliestReceived

func StartAtEarliestReceived() SubscriptionOption

StartAtEarliestReceived sets the subscription start position to the earliest message received in the stream.

func StartAtLatestReceived

func StartAtLatestReceived() SubscriptionOption

StartAtLatestReceived sets the subscription start position to the last message received in the stream.

func StartAtOffset

func StartAtOffset(offset int64) SubscriptionOption

StartAtOffset sets the desired start offset to begin consuming from in the stream.

func StartAtTime

func StartAtTime(start time.Time) SubscriptionOption

StartAtTime sets the desired timestamp to begin consuming from in the stream.

func StartAtTimeDelta

func StartAtTimeDelta(ago time.Duration) SubscriptionOption

StartAtTimeDelta sets the desired timestamp to begin consuming from in the stream using a time delta in the past.

type SubscriptionOptions

type SubscriptionOptions struct {
	// StartPosition controls where to begin consuming from in the stream.
	StartPosition proto.StartPosition

	// StartOffset sets the stream offset to begin consuming from.
	StartOffset int64

	// StartTimestamp sets the stream start position to the given timestamp.
	StartTimestamp time.Time
}

SubscriptionOptions are used to control a subscription's behavior.

Directories

Path Synopsis
example
Package proto is a generated protocol buffer package.
Package proto is a generated protocol buffer package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL