Documentation ¶
Overview ¶
Package liftbridge implements a client for the Liftbridge messaging system. Liftbridge provides lightweight, fault-tolerant message streams by implementing a durable stream augmentation NATS. In particular, it offers a publish-subscribe log API that is highly available and horizontally scalable.
This package provides APIs for creating and consuming Liftbridge streams and some utility APIs for using Liftbridge in combination with NATS. Publishing messages to Liftbridge is handled by a NATS client since Liftbridge is simply an extension of NATS.
Index ¶
- Constants
- Variables
- func NewMessage(value []byte, options ...MessageOption) []byte
- func UnmarshalAck(data []byte) (*proto.Ack, error)
- func UnmarshalMessage(data []byte) (*proto.Message, bool)
- type Client
- type ClientOption
- type ClientOptions
- type Handler
- type MessageOption
- type MessageOptions
- type StreamOption
- type StreamOptions
- type SubscriptionOption
- func StartAt(start proto.StartPosition) SubscriptionOption
- func StartAtEarliestReceived() SubscriptionOption
- func StartAtLatestReceived() SubscriptionOption
- func StartAtOffset(offset int64) SubscriptionOption
- func StartAtTime(start time.Time) SubscriptionOption
- func StartAtTimeDelta(ago time.Duration) SubscriptionOption
- type SubscriptionOptions
Examples ¶
Constants ¶
const MaxReplicationFactor int32 = -1
MaxReplicationFactor can be used to tell the server to set the replication factor equal to the current number of servers in the cluster when creating a stream.
Variables ¶
var ( // ErrStreamExists is returned by CreateStream if the specified stream // already exists in the Liftbridge cluster. ErrStreamExists = errors.New("stream already exists") // ErrNoSuchStream is returned by Subscribe if the specified stream does // not exist in the Liftbridge cluster. ErrNoSuchStream = errors.New("stream does not exist") )
Functions ¶
func NewMessage ¶
func NewMessage(value []byte, options ...MessageOption) []byte
NewMessage returns a serialized message for the given payload and options.
Example ¶
// Create NATS connection. conn, err := nats.GetDefaultOptions().Connect() if err != nil { panic(err) } defer conn.Flush() defer conn.Close() // Publish simple message. msg := NewMessage([]byte("value")) if err := conn.Publish("foo", msg); err != nil { panic(err) } // Publish message with options. msg = NewMessage([]byte("value"), Key([]byte("key")), AckPolicyAll(), AckInbox("ack"), CorrelationID("123"), ) if err := conn.Publish("foo", msg); err != nil { panic(err) }
Output:
func UnmarshalAck ¶
UnmarshalAck deserializes an Ack from the given byte slice. It returns an error if the given data is not actually an Ack.
Example ¶
// Create NATS connection. conn, err := nats.GetDefaultOptions().Connect() if err != nil { panic(err) } defer conn.Close() // Setup ack inbox. ackInbox := "acks" acked := make(chan struct{}) _, err = conn.Subscribe(ackInbox, func(m *nats.Msg) { ack, err := UnmarshalAck(m.Data) if err != nil { panic(err) } fmt.Println("ack:", ack.StreamSubject, ack.StreamName, ack.Offset, ack.MsgSubject) close(acked) }) if err != nil { panic(err) } // Publish message. msg := NewMessage([]byte("value"), Key([]byte("key")), AckInbox(ackInbox)) if err := conn.Publish("foo", msg); err != nil { panic(err) } <-acked
Output:
func UnmarshalMessage ¶
UnmarshalMessage deserializes a message from the given byte slice. It returns a bool indicating if the given data was actually a Message or not.
Types ¶
type Client ¶
type Client interface { // Close the client connection. Close() error // CreateStream creates a new stream attached to a NATS subject. Subject is // the NATS subject the stream is attached to, and name is the stream // identifier, unique per subject. It returns ErrStreamExists if a stream // with the given subject and name already exists. CreateStream(ctx context.Context, subject, name string, opts ...StreamOption) error // Subscribe creates an ephemeral subscription for the given stream. It // begins receiving messages starting at the configured position and waits // for new messages when it reaches the end of the stream. The default // start position is the end of the stream. It returns an ErrNoSuchStream // if the given stream does not exist. Use a cancelable Context to close a // subscription. Subscribe(ctx context.Context, subject, name string, handler Handler, opts ...SubscriptionOption) error // Publish publishes a new message to the NATS subject. If the AckPolicy is // not NONE and a deadline is provided, this will synchronously block until // the first ack is received. If the ack is not received in time, a // DeadlineExceeded status code is returned. If an AckPolicy and deadline // are configured, this returns the first Ack on success, otherwise it // returns nil. Publish(ctx context.Context, subject string, value []byte, opts ...MessageOption) (*proto.Ack, error) }
Client is the main API used to communicate with a Liftbridge cluster. Call Connect to get a Client instance.
Example (CreateStream) ¶
// Connect to Liftbridge. addr := "localhost:9292" client, err := Connect([]string{addr}) if err != nil { panic(err) } defer client.Close() if err := client.CreateStream(context.Background(), "foo", "foo-stream"); err != nil { panic(err) }
Output:
Example (Subscribe) ¶
// Connect to Liftbridge. addr := "localhost:9292" client, err := Connect([]string{addr}) if err != nil { panic(err) } defer client.Close() // Subscribe to stream. ctx := context.Background() if err := client.Subscribe(ctx, "bar", "bar-stream", func(msg *proto.Message, err error) { if err != nil { panic(err) } fmt.Println(msg.Offset, string(msg.Value)) }); err != nil { panic(err) } <-ctx.Done()
Output:
func Connect ¶
func Connect(addrs []string, options ...ClientOption) (Client, error)
Connect creates a Client connection for the given Liftbridge cluster. Multiple addresses can be provided. Connect will use whichever it connects successfully to first in random order. The Client will use the pool of addresses for failover purposes. Note that only one seed address needs to be provided as the Client will discover the other brokers when fetching metadata for the cluster.
Example ¶
addr := "localhost:9292" client, err := Connect([]string{addr}) if err != nil { panic(err) } defer client.Close()
Output:
type ClientOption ¶
type ClientOption func(*ClientOptions) error
ClientOption is a function on the ClientOptions for a connection. These are used to configure particular client options.
func KeepAliveTime ¶
func KeepAliveTime(keepAlive time.Duration) ClientOption
KeepAliveTime is a ClientOption to set the amount of time a pooled connection can be idle before it is closed and removed from the pool. The default is 30 seconds.
func MaxConnsPerBroker ¶
func MaxConnsPerBroker(max int) ClientOption
MaxConnsPerBroker is a ClientOption to set the maximum number of connections to pool for a given broker in the cluster. The default is 2.
func ResubscribeWaitTime ¶
func ResubscribeWaitTime(wait time.Duration) ClientOption
ResubscribeWaitTime is a ClientOption to set the amount of time to attempt to re-establish a stream subscription after being disconnected. For example, if the server serving a subscription dies and the stream is replicated, the client will attempt to re-establish the subscription once the stream leader has failed over. This failover can take several moments, so this option gives the client time to retry. The default is 30 seconds.
func TLSCert ¶
func TLSCert(cert string) ClientOption
TLSCert is a ClientOption to set the TLS certificate for the client.
type ClientOptions ¶
type ClientOptions struct { // Brokers it the set of hosts the client will use when attempting to // connect. Brokers []string // MaxConnsPerBroker is the maximum number of connections to pool for a // given broker in the cluster. The default is 2. MaxConnsPerBroker int // KeepAliveTime is the amount of time a pooled connection can be idle // before it is closed and removed from the pool. The default is 30 // seconds. KeepAliveTime time.Duration // TLSCert is the TLS certificate file to use. The client does not use a // TLS connection if this is not set. TLSCert string // ResubscribeWaitTime is the amount of time to attempt to re-establish a // stream subscription after being disconnected. For example, if the server // serving a subscription dies and the stream is replicated, the client // will attempt to re-establish the subscription once the stream leader has // failed over. This failover can take several moments, so this option // gives the client time to retry. The default is 30 seconds. ResubscribeWaitTime time.Duration }
ClientOptions are used to control the Client configuration.
func DefaultClientOptions ¶
func DefaultClientOptions() ClientOptions
DefaultClientOptions returns the default configuration options for the client.
func (ClientOptions) Connect ¶
func (o ClientOptions) Connect() (Client, error)
Connect will attempt to connect to a Liftbridge server with multiple options.
type Handler ¶
type Handler func(msg *proto.Message, err error)
Handler is the callback invoked by Subscribe when a message is received on the specified stream. If err is not nil, the subscription will be terminated and no more messages will be received.
type MessageOption ¶
type MessageOption func(*MessageOptions)
MessageOption is a function on the MessageOptions for a Message. These are used to configure particular optional Message fields.
func AckInbox ¶
func AckInbox(ackInbox string) MessageOption
AckInbox is a MessageOption to set the NATS subject Liftbridge should publish the Message ack to. If it's not set, Liftbridge will not send an ack.
func AckPolicy ¶
func AckPolicy(ackPolicy proto.AckPolicy) MessageOption
AckPolicy is a MessageOption that controls the behavior of Message acks sent by the server. By default, Liftbridge will send an ack when the stream leader has written the Message to its write-ahead log.
func AckPolicyAll ¶
func AckPolicyAll() MessageOption
AckPolicyAll is a MessageOption that sets the AckPolicy of the Message to ALL. This means the Message ack will be sent when the message has been written to all replicas.
func AckPolicyLeader ¶
func AckPolicyLeader() MessageOption
AckPolicyLeader is a MessageOption that sets the AckPolicy of the Message to LEADER. This means the Message ack will be sent when the stream leader has written it to its write-ahead log.
func AckPolicyNone ¶
func AckPolicyNone() MessageOption
AckPolicyNone is a MessageOption that sets the AckPolicy of the Message to NONE. This means no ack will be sent.
func CorrelationID ¶
func CorrelationID(correlationID string) MessageOption
CorrelationID is a MessageOption to set the identifier used to correlate an ack with the published Message. If it's not set, the ack will not have a correlation id.
func Key ¶
func Key(key []byte) MessageOption
Key is a MessageOption to set the key on a Message. If Liftbridge has stream compaction enabled, the stream will retain only the last value for each key.
type MessageOptions ¶
type MessageOptions struct { // Key to set on the Message. If Liftbridge has stream compaction enabled, // the stream will retain only the last value for each key. Key []byte // AckInbox sets the NATS subject Liftbridge should publish the Message ack // to. If it's not set, Liftbridge will not send an ack. AckInbox string // CorrelationID sets the identifier used to correlate an ack with the // published Message. If it's not set, the ack will not have a correlation // id. CorrelationID string // AckPolicy controls the behavior of Message acks sent by the server. By // default, Liftbridge will send an ack when the stream leader has written // the Message to its write-ahead log. AckPolicy proto.AckPolicy }
MessageOptions are used to configure optional settings for a Message.
type StreamOption ¶
type StreamOption func(*StreamOptions) error
StreamOption is a function on the StreamOptions for a stream. These are used to configure particular stream options.
func Group ¶
func Group(group string) StreamOption
Group is a StreamOption to set the load-balance group for a stream. When there are multiple streams in the same group, messages will be balanced among them.
func MaxReplication ¶
func MaxReplication() StreamOption
MaxReplication is a StreamOption to set the stream replication factor equal to the current number of servers in the cluster.
func ReplicationFactor ¶
func ReplicationFactor(replicationFactor int32) StreamOption
ReplicationFactor is a StreamOption to set the replication factor for a stream. The replication factor controls the number of servers to replicate a stream to. E.g. a value of 1 would mean only 1 server would have the data, and a value of 3 would be 3 servers would have it. If this is not set, it defaults to 1. A value of -1 will signal to the server to set the replication factor equal to the current number of servers in the cluster.
type StreamOptions ¶
type StreamOptions struct { // Group is the name of a load-balance group. When there are multiple // streams in the same group, messages will be balanced among them. Group string // ReplicationFactor controls the number of servers to replicate a stream // to. E.g. a value of 1 would mean only 1 server would have the data, and // a value of 3 would be 3 servers would have it. If this is not set, it // defaults to 1. A value of -1 will signal to the server to set the // replication factor equal to the current number of servers in the // cluster. ReplicationFactor int32 }
StreamOptions are used to configure new streams.
type SubscriptionOption ¶
type SubscriptionOption func(*SubscriptionOptions) error
SubscriptionOption is a function on the SubscriptionOptions for a subscription. These are used to configure particular subscription options.
func StartAt ¶
func StartAt(start proto.StartPosition) SubscriptionOption
StartAt sets the desired start position for the stream.
func StartAtEarliestReceived ¶
func StartAtEarliestReceived() SubscriptionOption
StartAtEarliestReceived sets the subscription start position to the earliest message received in the stream.
func StartAtLatestReceived ¶
func StartAtLatestReceived() SubscriptionOption
StartAtLatestReceived sets the subscription start position to the last message received in the stream.
func StartAtOffset ¶
func StartAtOffset(offset int64) SubscriptionOption
StartAtOffset sets the desired start offset to begin consuming from in the stream.
func StartAtTime ¶
func StartAtTime(start time.Time) SubscriptionOption
StartAtTime sets the desired timestamp to begin consuming from in the stream.
func StartAtTimeDelta ¶
func StartAtTimeDelta(ago time.Duration) SubscriptionOption
StartAtTimeDelta sets the desired timestamp to begin consuming from in the stream using a time delta in the past.
type SubscriptionOptions ¶
type SubscriptionOptions struct { // StartPosition controls where to begin consuming from in the stream. StartPosition proto.StartPosition // StartOffset sets the stream offset to begin consuming from. StartOffset int64 // StartTimestamp sets the stream start position to the given timestamp. StartTimestamp time.Time }
SubscriptionOptions are used to control a subscription's behavior.
Directories ¶
Path | Synopsis |
---|---|
example
|
|
Package proto is a generated protocol buffer package.
|
Package proto is a generated protocol buffer package. |