liftbridge

v2.3.0
This package is not in the latest version of its module.
Published: Mar 11, 2022 License: Apache-2.0 Imports: 22 Imported by: 8

Documentation

Overview

Package liftbridge implements a client for the Liftbridge messaging system. Liftbridge provides lightweight, fault-tolerant message streams by implementing a durable stream augmentation for NATS. In particular, it offers a publish-subscribe log API that is highly available and horizontally scalable.

This package provides APIs for creating and consuming Liftbridge streams and some utility APIs for using Liftbridge in combination with NATS.

Constants

const MaxReplicationFactor int32 = -1

MaxReplicationFactor can be used to tell the server to set the replication factor equal to the current number of servers in the cluster when creating a stream.
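
The sentinel behaviour described above can be modelled in a few lines. This is only a sketch of the idea, not the server's code: `resolveReplicationFactor` and its `clusterSize` parameter are hypothetical names introduced here for illustration.

```go
package main

import "fmt"

// maxReplicationFactor mirrors the MaxReplicationFactor sentinel shown above.
const maxReplicationFactor int32 = -1

// resolveReplicationFactor is a hypothetical helper: it returns the cluster
// size when the sentinel is requested and the explicit value otherwise.
func resolveReplicationFactor(requested, clusterSize int32) int32 {
	if requested == maxReplicationFactor {
		return clusterSize
	}
	return requested
}

func main() {
	fmt.Println(resolveReplicationFactor(maxReplicationFactor, 5)) // sentinel: use the cluster size
	fmt.Println(resolveReplicationFactor(3, 5))                    // explicit value is kept
}
```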

Variables

var (
	// ErrStreamExists is returned by CreateStream if the specified stream
	// already exists in the Liftbridge cluster.
	ErrStreamExists = errors.New("stream already exists")

	// ErrNoSuchStream is returned by DeleteStream if the specified stream does
	// not exist in the Liftbridge cluster.
	ErrNoSuchStream = errors.New("stream does not exist")

	// ErrNoSuchPartition is returned by Subscribe or Publish if the specified
	// stream partition does not exist in the Liftbridge cluster.
	ErrNoSuchPartition = errors.New("stream partition does not exist")

	// ErrStreamDeleted is sent to subscribers when the stream they are
	// subscribed to has been deleted.
	ErrStreamDeleted = errors.New("stream has been deleted")

	// ErrPartitionPaused is sent to subscribers when the stream partition they
	// are subscribed to has been paused.
	ErrPartitionPaused = errors.New("stream partition has been paused")

	// ErrAckTimeout indicates a publish ack was not received in time.
	ErrAckTimeout = errors.New("publish ack timeout")

	// ErrReadonlyPartition is returned when all messages have been read from a
	// read only stream, or when the subscribed to stop position has been
	// reached. It is also returned when attempting to publish to a readonly
	// partition.
	ErrReadonlyPartition = errors.New("readonly partition")
)

Functions

func NewMessage

func NewMessage(value []byte, options ...MessageOption) []byte

NewMessage returns a serialized message for the given payload and options.

Example
// Create NATS connection.
conn, err := nats.GetDefaultOptions().Connect()
if err != nil {
	panic(err)
}
// Deferred calls run in LIFO order, so Flush runs before Close.
defer conn.Close()
defer conn.Flush()

// Publish simple message.
msg := NewMessage([]byte("value"))
if err := conn.Publish("foo", msg); err != nil {
	panic(err)
}

// Publish message with options.
msg = NewMessage([]byte("value"),
	Key([]byte("key")),
	AckPolicyAll(),
	AckInbox("ack"),
	CorrelationID("123"),
)
if err := conn.Publish("foo", msg); err != nil {
	panic(err)
}

Types

type Ack

type Ack struct {
	// contains filtered or unexported fields
}

Ack represents an acknowledgement that a message was committed to a stream partition.

func UnmarshalAck

func UnmarshalAck(data []byte) (*Ack, error)

UnmarshalAck deserializes an Ack from the given byte slice. It returns an error if the given data is not actually an Ack.

Example
// Create NATS connection.
conn, err := nats.GetDefaultOptions().Connect()
if err != nil {
	panic(err)
}
defer conn.Close()

// Setup ack inbox.
ackInbox := "acks"
acked := make(chan struct{})
_, err = conn.Subscribe(ackInbox, func(m *nats.Msg) {
	ack, err := UnmarshalAck(m.Data)
	if err != nil {
		panic(err)
	}
	fmt.Println("ack:", ack.Stream(), ack.Offset(), ack.MessageSubject())
	close(acked)
})
if err != nil {
	panic(err)
}

// Publish message.
msg := NewMessage([]byte("value"), Key([]byte("key")), AckInbox(ackInbox))
if err := conn.Publish("foo", msg); err != nil {
	panic(err)
}

<-acked

func (*Ack) AckInbox

func (a *Ack) AckInbox() string

AckInbox is the NATS subject the ack was published to.

func (*Ack) AckPolicy

func (a *Ack) AckPolicy() AckPolicy

AckPolicy sent on the message.

func (*Ack) CommitTimestamp added in v2.1.0

func (a *Ack) CommitTimestamp() time.Time

CommitTimestamp is the timestamp the message was committed.

func (*Ack) CorrelationID

func (a *Ack) CorrelationID() string

CorrelationID is the user-supplied value from the message.

func (*Ack) MessageSubject

func (a *Ack) MessageSubject() string

MessageSubject is the NATS subject the message was received on.

func (*Ack) Offset

func (a *Ack) Offset() int64

Offset is the partition offset the message was committed to.

func (*Ack) PartitionSubject

func (a *Ack) PartitionSubject() string

PartitionSubject is the NATS subject the partition is attached to.

func (*Ack) ReceptionTimestamp added in v2.1.0

func (a *Ack) ReceptionTimestamp() time.Time

ReceptionTimestamp is the timestamp the message was received by the server.

func (*Ack) Stream

func (a *Ack) Stream() string

Stream the Message was received on.

type AckHandler

type AckHandler func(ack *Ack, err error)

AckHandler is used to handle the results of asynchronous publishes to a stream. If the AckPolicy on the published message is not NONE, the handler will receive the ack once it's received from the cluster or an error if the message was not received successfully.

type AckPolicy

type AckPolicy int32

AckPolicy controls the behavior of message acknowledgements.

type AutoOffset added in v2.3.0

type AutoOffset int

AutoOffset determines behavior for where a consumer should start consuming a stream partition when the consumer group does not have a committed offset, e.g. because the group was just created.

type BrokerInfo

type BrokerInfo struct {
	// contains filtered or unexported fields
}

BrokerInfo contains information for a Liftbridge cluster node.

func (*BrokerInfo) Addr

func (b *BrokerInfo) Addr() string

Addr returns <host>:<port> for the broker server.
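
For reference, a `<host>:<port>` string is usually built in Go with `net.JoinHostPort`. The helper below is a hypothetical sketch of that formatting; whether the package brackets IPv6 hosts the same way is not stated in the documentation above.

```go
package main

import (
	"fmt"
	"net"
	"strconv"
)

// addr is a hypothetical helper that joins a host and an int32 port into
// the "<host>:<port>" form.
func addr(host string, port int32) string {
	return net.JoinHostPort(host, strconv.Itoa(int(port)))
}

func main() {
	fmt.Println(addr("localhost", 9292)) // localhost:9292
	fmt.Println(addr("::1", 9292))       // [::1]:9292 (IPv6 hosts are bracketed)
}
```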

func (*BrokerInfo) Host

func (b *BrokerInfo) Host() string

Host of the broker server.

func (*BrokerInfo) ID

func (b *BrokerInfo) ID() string

ID of the broker.

func (*BrokerInfo) LeaderCount added in v2.2.0

func (b *BrokerInfo) LeaderCount() int32

LeaderCount is the number of partition leaders on this broker.

func (*BrokerInfo) PartitionCount added in v2.2.0

func (b *BrokerInfo) PartitionCount() int32

Total number of partitions on this broker.

func (*BrokerInfo) Port

func (b *BrokerInfo) Port() int32

Port of the broker server.

type Client

type Client interface {
	// Close the client connection.
	Close() error

	// CreateStream creates a new stream attached to a NATS subject. Subject is
	// the NATS subject the stream is attached to, and name is the stream
	// identifier, unique per subject. It returns ErrStreamExists if a stream
	// with the given subject and name already exists.
	CreateStream(ctx context.Context, subject, name string, opts ...StreamOption) error

	// DeleteStream deletes a stream and all of its partitions. Name is the
	// stream identifier, globally unique.
	DeleteStream(ctx context.Context, name string) error

	// PauseStream pauses a stream and some or all of its partitions. Name is
	// the stream identifier, globally unique. It returns an ErrNoSuchPartition
	// if the given stream or partition does not exist. By default, this will
	// pause all partitions. A partition is resumed when it is published to via
	// the Liftbridge Publish API or ResumeAll is enabled and another partition
	// in the stream is published to.
	PauseStream(ctx context.Context, name string, opts ...PauseOption) error

	// SetStreamReadonly sets the readonly flag on a stream and some or all of
	// its partitions. Name is the stream identifier, globally unique. It
	// returns an ErrNoSuchPartition if the given stream or partition does not
	// exist. By default, this will set the readonly flag on all partitions.
	// Subscribers to a readonly partition will see their subscription ended
	// with an ErrReadonlyPartition error once all messages currently in the
	// partition have been read.
	SetStreamReadonly(ctx context.Context, name string, opts ...ReadonlyOption) error

	// Subscribe creates an ephemeral subscription for the given stream. It
	// begins receiving messages starting at the configured position and waits
	// for new messages when it reaches the end of the stream. The default
	// start position is the end of the stream.
	// ErrNoSuchPartition is returned if the given stream or partition does not
	// exist.
	// ErrReadonlyPartition is returned to subscribers when all messages have been
	// read from a read only stream, or when the configured stop position is
	// reached.
	// Use a cancelable Context to close a subscription.
	Subscribe(ctx context.Context, stream string, handler Handler, opts ...SubscriptionOption) error

	// Publish publishes a new message to the Liftbridge stream. The partition
	// that gets published to is determined by the provided partition or
	// Partitioner passed through MessageOptions, if any. If a partition or
	// Partitioner is not provided, this defaults to the base partition. This
	// partition determines the underlying NATS subject that gets published to.
	// To publish directly to a specific NATS subject, use the low-level
	// PublishToSubject API.
	//
	// If the AckPolicy is not NONE, this will synchronously block until the
	// ack is received. If the ack is not received in time, ErrAckTimeout is
	// returned. If AckPolicy is NONE, this returns nil on success.
	Publish(ctx context.Context, stream string, value []byte, opts ...MessageOption) (*Ack, error)

	// PublishAsync publishes a new message to the Liftbridge stream and
	// asynchronously processes the ack or error for the message.
	PublishAsync(ctx context.Context, stream string, value []byte, ackHandler AckHandler, opts ...MessageOption) error

	// PublishToSubject publishes a new message to the NATS subject. Note that
	// because this publishes directly to a subject, there may be multiple (or
	// no) streams that receive the message. As a result, MessageOptions
	// related to partitioning will be ignored. To publish at the
	// stream/partition level, use the high-level Publish API.
	//
	// If the AckPolicy is not NONE and a deadline is provided, this will
	// synchronously block until the first ack is received. If an ack is not
	// received in time, ErrAckTimeout is returned. If an AckPolicy and
	// deadline are configured, this returns the first Ack on success,
	// otherwise it returns nil.
	PublishToSubject(ctx context.Context, subject string, value []byte, opts ...MessageOption) (*Ack, error)

	// FetchMetadata returns cluster metadata including broker and stream
	// information.
	FetchMetadata(ctx context.Context, opts ...MetadataOption) (*Metadata, error)

	// SetCursor persists a cursor position for a particular stream partition.
	// This can be used to checkpoint a consumer's position in a stream to
	// resume processing later.
	SetCursor(ctx context.Context, id, stream string, partition int32, offset int64) error

	// FetchCursor retrieves a cursor position for a particular stream
	// partition. It returns -1 if the cursor does not exist.
	FetchCursor(ctx context.Context, id, stream string, partition int32) (int64, error)

	// FetchPartitionMetadata retrieves the metadata of a particular partition
	FetchPartitionMetadata(ctx context.Context, stream string, partition int32) (*PartitionInfo, error)

	// CreateConsumer creates a consumer that is part of a consumer group which
	// consumes a set of streams. Liftbridge handles assigning partitions to
	// the group members and tracking the group's position in the streams.
	CreateConsumer(groupID string, opts ...ConsumerOption) (*Consumer, error)
}

Client is the main API used to communicate with a Liftbridge cluster. Call Connect to get a Client instance.

Example (CreateStream)
// Connect to Liftbridge.
addr := "localhost:9292"
client, err := Connect([]string{addr})
if err != nil {
	panic(err)
}
defer client.Close()

// Create stream with a single partition.
if err := client.CreateStream(context.Background(), "foo", "foo-stream"); err != nil {
	panic(err)
}

// Create stream with three partitions.
if err := client.CreateStream(context.Background(), "bar", "bar-stream", Partitions(3)); err != nil {
	panic(err)
}

Example (Publish)
// Connect to Liftbridge.
addr := "localhost:9292"
client, err := Connect([]string{addr})
if err != nil {
	panic(err)
}
defer client.Close()

// Publish message to base stream partition.
if _, err := client.Publish(context.Background(), "foo-stream", []byte("hello")); err != nil {
	panic(err)
}

// Publish message to stream partition based on key.
if _, err := client.Publish(context.Background(), "bar-stream", []byte("hello"),
	Key([]byte("key")), PartitionByKey(),
); err != nil {
	panic(err)
}

Example (PublishToSubject)
// Connect to Liftbridge.
addr := "localhost:9292"
client, err := Connect([]string{addr})
if err != nil {
	panic(err)
}
defer client.Close()

// Publish message directly to NATS subject.
if _, err := client.PublishToSubject(context.Background(), "foo.bar", []byte("hello")); err != nil {
	panic(err)
}

Example (Subscribe)
// Connect to Liftbridge.
addr := "localhost:9292"
client, err := Connect([]string{addr})
if err != nil {
	panic(err)
}
defer client.Close()

// Subscribe to base stream partition.
if err := client.Subscribe(context.Background(), "foo-stream", func(msg *Message, err error) {
	if err != nil {
		panic(err)
	}
	fmt.Println(msg.Offset(), string(msg.Value()))
}); err != nil {
	panic(err)
}

// Subscribe to a specific stream partition.
ctx := context.Background()
if err := client.Subscribe(ctx, "bar-stream", func(msg *Message, err error) {
	if err != nil {
		panic(err)
	}
	fmt.Println(msg.Offset(), string(msg.Value()))
}, Partition(1)); err != nil {
	panic(err)
}

<-ctx.Done()

func Connect

func Connect(addrs []string, options ...ClientOption) (Client, error)

Connect creates a Client connection for the given Liftbridge cluster. Multiple addresses can be provided. Connect tries them in random order and uses the first one it connects to successfully. The Client will use the pool of addresses for failover purposes. Note that only one seed address needs to be provided, as the Client will discover the other brokers when fetching metadata for the cluster.

Example
addr := "localhost:9292"
client, err := Connect([]string{addr})
if err != nil {
	panic(err)
}
defer client.Close()

func ConnectCtx added in v2.1.0

func ConnectCtx(ctx context.Context, addrs []string, options ...ClientOption) (Client, error)

ConnectCtx creates a Client connection for the given Liftbridge cluster. Multiple addresses can be provided. ConnectCtx tries them in random order and uses the first one it connects to successfully. The Client will use the pool of addresses for failover purposes. Note that only one seed address needs to be provided, as the Client will discover the other brokers when fetching metadata for the cluster. The connection will be blocking if a deadline has been provided via the context.

type ClientOption

type ClientOption func(*ClientOptions) error

ClientOption is a function on the ClientOptions for a connection. These are used to configure particular client options.

func AckWaitTime

func AckWaitTime(wait time.Duration) ClientOption

AckWaitTime is a ClientOption to set the default amount of time to wait for an ack to be received for a published message before ErrAckTimeout is returned. This can be overridden on individual requests by setting a timeout on the Context. This defaults to 5 seconds if not set.

func KeepAliveTime deprecated

func KeepAliveTime(keepAlive time.Duration) ClientOption

KeepAliveTime is a ClientOption to set the amount of time a pooled connection can be idle before it is closed and removed from the pool. The default is 30 seconds.

Deprecated: the client now maintains one connection per broker.

func MaxConnsPerBroker deprecated

func MaxConnsPerBroker(max int) ClientOption

MaxConnsPerBroker is a ClientOption to set the maximum number of connections to pool for a given broker in the cluster. The default is 2.

Deprecated: the client now maintains one connection per broker.

func ReadBufferSize added in v2.1.0

func ReadBufferSize(readBufferSize int) ClientOption

ReadBufferSize is a ClientOption to set the read buffer size configuration for the client.

func ResubscribeWaitTime

func ResubscribeWaitTime(wait time.Duration) ClientOption

ResubscribeWaitTime is a ClientOption to set the amount of time to attempt to re-establish a stream subscription after being disconnected. For example, if the server serving a subscription dies and the stream is replicated, the client will attempt to re-establish the subscription once the stream leader has failed over. This failover can take several moments, so this option gives the client time to retry. The default is 30 seconds.

func SetConnectToLowLatencyServer added in v2.2.0

func SetConnectToLowLatencyServer() ClientOption

SetConnectToLowLatencyServer is a ClientOption to set the client to connect to the server with the lowest latency.

func SetConnectToLowWorkloadServer added in v2.2.0

func SetConnectToLowWorkloadServer() ClientOption

SetConnectToLowWorkloadServer is a ClientOption to set the client to connect to the server with the lowest workload.

func TLSCert

func TLSCert(cert string) ClientOption

TLSCert is a ClientOption to set the TLS certificate for the client.

func TLSConfig

func TLSConfig(config *tls.Config) ClientOption

TLSConfig is a ClientOption to set the TLS configuration for the client. Overrides TLSCert.

func WriteBufferSize added in v2.1.0

func WriteBufferSize(writeBufferSize int) ClientOption

WriteBufferSize is a ClientOption to set the write buffer size configuration for the client.

type ClientOptions

type ClientOptions struct {
	// Brokers is the set of hosts the client will use when attempting to
	// connect.
	Brokers []string

	// Deprecated: MaxConnsPerBroker is no longer used since the client now
	// maintains one connection per broker.
	MaxConnsPerBroker int

	// Deprecated: KeepAliveTime is no longer used since the client now
	// maintains one connection per broker.
	KeepAliveTime time.Duration

	// TLSCert is the TLS certificate file to use. The client does not use a
	// TLS connection if this is not set.
	TLSCert string

	// TLSConfig is the TLS configuration to use. The client does not use a
	// TLS connection if this is not set. Overrides TLSCert if set.
	TLSConfig *tls.Config

	// ResubscribeWaitTime is the amount of time to attempt to re-establish a
	// stream subscription after being disconnected. For example, if the server
	// serving a subscription dies and the stream is replicated, the client
	// will attempt to re-establish the subscription once the stream leader has
	// failed over. This failover can take several moments, so this option
	// gives the client time to retry. The default is 30 seconds.
	ResubscribeWaitTime time.Duration

	// AckWaitTime is the default amount of time to wait for an ack to be
	// received for a published message before ErrAckTimeout is returned. This
	// can be overridden on individual requests by setting a timeout on the
	// Context. This defaults to 5 seconds if not set.
	AckWaitTime time.Duration

	// ReadBufferSize configures the size of the read buffer for connections.
	// -1 will use the GRPC defaults. See
	// https://godoc.org/google.golang.org/grpc#WithReadBufferSize.
	ReadBufferSize int

	// WriteBufferSize configures the size of the write buffer for connections.
	// -1 will use the GRPC defaults. See
	// https://godoc.org/google.golang.org/grpc#WithWriteBufferSize.
	WriteBufferSize int

	// ServerSelection selects the server to connect to based on specific
	// criteria.
	ServerSelection SelectionCriteria
}

ClientOptions are used to control the Client configuration.

func DefaultClientOptions

func DefaultClientOptions() ClientOptions

DefaultClientOptions returns the default configuration options for the client.

func (ClientOptions) Connect

func (o ClientOptions) Connect() (Client, error)

Connect will attempt to connect to a Liftbridge server with multiple options.

func (ClientOptions) ConnectCtx added in v2.1.0

func (o ClientOptions) ConnectCtx(ctx context.Context) (Client, error)

ConnectCtx will attempt to connect to a Liftbridge server with multiple options.

type Consumer added in v2.3.0

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer is a member of a consumer group. Consumer groups provide an API to better facilitate the consumption of Liftbridge streams. This encompasses several different but related goals:

1) Provide a mechanism for clients to track their position in a stream automatically, i.e. "durable" consumers.
2) Provide a mechanism for distributed, fault-tolerant stream consumption.
3) Provide a mechanism for coordinating and balancing stream consumption by managing partition assignments for consumers.
4) Provide a mechanism for consuming multiple streams in aggregate.

func (*Consumer) Checkpoint added in v2.3.0

func (c *Consumer) Checkpoint(ctx context.Context) error

Checkpoint commits the consumer's current offset positions for the stream partitions it is subscribed to. The offsets that are committed will be used after each group rebalance or on consumer startup to allow consumers to pick up where they left off. This is intended to be used if auto checkpointing is disabled and the consumer needs more fine-grained control over when offsets are committed, e.g. to avoid redelivery of processed messages.

func (*Consumer) Close added in v2.3.0

func (c *Consumer) Close() error

Close closes the consumer and removes it from the consumer group.

func (*Consumer) Subscribe added in v2.3.0

func (c *Consumer) Subscribe(ctx context.Context, streams []string, handler Handler) error

Subscribe begins consuming from assigned partitions. If no partitions are assigned to this consumer, this will wait for partitions to be assigned.

type ConsumerError added in v2.3.0

type ConsumerError struct {
	// contains filtered or unexported fields
}

ConsumerError is an error that occurs asynchronously on a consumer.

func (*ConsumerError) Error added in v2.3.0

func (c *ConsumerError) Error() string

Error returns the error string.

type ConsumerGroupInfo added in v2.3.0

type ConsumerGroupInfo struct {
	// contains filtered or unexported fields
}

ConsumerGroupInfo contains information for a consumer group.

func (*ConsumerGroupInfo) Coordinator added in v2.3.0

func (c *ConsumerGroupInfo) Coordinator() string

Coordinator of the consumer group.

func (*ConsumerGroupInfo) Epoch added in v2.3.0

func (c *ConsumerGroupInfo) Epoch() uint64

Epoch of the consumer group.

func (*ConsumerGroupInfo) ID added in v2.3.0

func (c *ConsumerGroupInfo) ID() string

ID of the consumer group.

type ConsumerOption added in v2.3.0

type ConsumerOption func(*ConsumerOptions) error

ConsumerOption is a function on the ConsumerOptions for a consumer. These are used to configure particular consumer options.

func AutoCheckpoint added in v2.3.0

func AutoCheckpoint(interval time.Duration) ConsumerOption

AutoCheckpoint determines the frequency at which the consumer's positions are committed to Liftbridge. A value of 0 disables auto checkpointing. Defaults to 5 seconds if not set.

func AutoOffsetEarliest added in v2.3.0

func AutoOffsetEarliest() ConsumerOption

AutoOffsetEarliest sets the default start position to the earliest message received in the partition.

func AutoOffsetLatest added in v2.3.0

func AutoOffsetLatest() ConsumerOption

AutoOffsetLatest sets the default start position to the last message received in the partition.

func AutoOffsetNone added in v2.3.0

func AutoOffsetNone() ConsumerOption

AutoOffsetNone will cause an error to be sent on the Handler if no previous offset is found for the consumer group.

func ConsumerID added in v2.3.0

func ConsumerID(id string) ConsumerOption

ConsumerID uniquely identifies a logical consumer. This ID should not be reused across consumers. If a ConsumerID is not supplied, a random one will be generated.

func FetchAssignmentsInterval added in v2.3.0

func FetchAssignmentsInterval(f func(time.Duration) time.Duration) ConsumerOption

FetchAssignmentsInterval is a function which returns the frequency to fetch partition assignments from the consumer group coordinator. This also acts as a health check to keep the consumer active in the group. Increasing this too much may cause the group coordinator to think the consumer has failed and remove it from the group. The function argument is the timeout duration configured on the server. If not set, this will default to 0.4 * timeout.

type ConsumerOptions added in v2.3.0

type ConsumerOptions struct {
	// ConsumerID uniquely identifies a logical consumer. This ID should not be
	// reused across consumers. If a ConsumerID is not supplied, a random one
	// will be generated.
	ConsumerID string

	// AutoCheckpointInterval determines the frequency the consumer's positions
	// are committed to Liftbridge. A value of 0 disables auto checkpointing.
	// The default value is 5 seconds if not set.
	AutoCheckpointInterval time.Duration

	// AutoOffset determines the behavior for where a consumer should
	// start consuming a stream partition when the consumer group does not have
	// a committed offset, e.g. because the group was just created. If not set,
	// defaults to AutoOffsetNewOnly.
	AutoOffset AutoOffset

	// FetchAssignmentsInterval is a function which returns the frequency to
	// fetch partition assignments from the consumer group coordinator. This
	// also acts as a health check to keep the consumer active in the group.
	// Increasing this too much may cause the group coordinator to think the
	// consumer has failed and remove it from the group. The function argument
	// is the timeout duration configured on the server. If not set, this will
	// default to 0.4 * timeout.
	FetchAssignmentsInterval func(timeout time.Duration) time.Duration
}

ConsumerOptions are used to configure new consumers.

type Handler

type Handler func(msg *Message, err error)

Handler is the callback invoked by Subscribe when a message is received on the specified stream. If err is not nil, the subscription will be terminated and no more messages will be received.

type Message

type Message struct {
	// contains filtered or unexported fields
}

Message received from a Liftbridge stream.

func UnmarshalMessage

func UnmarshalMessage(data []byte) (*Message, error)

UnmarshalMessage deserializes a message from the given byte slice. It returns an error if the given data is not actually a Message.

func (*Message) Headers

func (m *Message) Headers() map[string][]byte

Headers is a set of key-value pairs.

func (*Message) Key

func (m *Message) Key() []byte

Key is an optional label set on a Message, useful for partitioning and stream compaction.

func (*Message) Offset

func (m *Message) Offset() int64

Offset is a monotonic message sequence in the stream partition.

func (*Message) Partition

func (m *Message) Partition() int32

Partition the Message was received on.

func (*Message) ReplySubject

func (m *Message) ReplySubject() string

ReplySubject is the NATS reply subject on the Message, if any.

func (*Message) Stream

func (m *Message) Stream() string

Stream the Message was received on.

func (*Message) Subject

func (m *Message) Subject() string

Subject is the NATS subject the Message was received on.

func (*Message) Timestamp

func (m *Message) Timestamp() time.Time

Timestamp is the time the Message was received by the server.

func (*Message) Value

func (m *Message) Value() []byte

Value is the Message payload.

type MessageOption

type MessageOption func(*MessageOptions)

MessageOption is a function on the MessageOptions for a Message. These are used to configure particular optional Message fields.

func AckInbox

func AckInbox(ackInbox string) MessageOption

AckInbox is a MessageOption to set the NATS subject Liftbridge should publish the Message ack to. If this is not set, the server will generate a random inbox.

func AckPolicyAll

func AckPolicyAll() MessageOption

AckPolicyAll is a MessageOption that sets the AckPolicy of the Message to ALL. This means the Message ack will be sent when the message has been written to all replicas.

func AckPolicyLeader

func AckPolicyLeader() MessageOption

AckPolicyLeader is a MessageOption that sets the AckPolicy of the Message to LEADER. This means the Message ack will be sent when the stream leader has written it to its write-ahead log.

func AckPolicyNone

func AckPolicyNone() MessageOption

AckPolicyNone is a MessageOption that sets the AckPolicy of the Message to NONE. This means no ack will be sent.

func CorrelationID

func CorrelationID(correlationID string) MessageOption

CorrelationID is a MessageOption to set the identifier used to correlate an ack with the published Message. If this is not set, the ack will not have a correlation id.

func ExpectedOffset added in v2.1.0

func ExpectedOffset(expectedOffset int64) MessageOption

ExpectedOffset is a MessageOption that sets the offset the message is expected to have after it is published. This is required for optimistic concurrency control.

func Header

func Header(name string, value []byte) MessageOption

Header is a MessageOption that adds a single header to the Message. This may overwrite previously set headers.

func Headers

func Headers(headers map[string][]byte) MessageOption

Headers is a MessageOption that adds a set of headers to the Message. This may overwrite previously set headers.

func Key

func Key(key []byte) MessageOption

Key is a MessageOption to set the key on a Message. If Liftbridge has stream compaction enabled, the stream will retain only the last value for each key.
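
The effect of key-based compaction, retaining only the last value for each key, can be sketched on a plain slice. `record` and `compact` are hypothetical names; details such as when the server runs compaction or how it treats messages without a key are not modelled here.

```go
package main

import "fmt"

// record is a hypothetical key/value pair standing in for a stream message.
type record struct {
	Key   string
	Value string
}

// compact keeps only the last record for each key and preserves the
// relative order of the records that survive.
func compact(log []record) []record {
	last := make(map[string]int, len(log))
	for i, r := range log {
		last[r.Key] = i
	}
	out := make([]record, 0, len(last))
	for i, r := range log {
		if last[r.Key] == i {
			out = append(out, r)
		}
	}
	return out
}

func main() {
	log := []record{{"a", "1"}, {"b", "2"}, {"a", "3"}}
	fmt.Println(compact(log)) // [{b 2} {a 3}]
}
```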

func PartitionBy

func PartitionBy(partitioner Partitioner) MessageOption

PartitionBy is a MessageOption that specifies a Partitioner used to map Messages to stream partitions.

func PartitionByKey

func PartitionByKey() MessageOption

PartitionByKey is a MessageOption that maps Messages to stream partitions based on a hash of the Message key. This computes the partition number for a given message by hashing the key and modding by the number of partitions for the first stream found with the subject of the published message. This does not work with streams containing wildcards in their subjects, e.g. "foo.*", since this matches on the subject literal of the published message. This also has undefined behavior if there are multiple streams for the given subject.
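
Hashing a key and taking it modulo the partition count looks roughly like the sketch below. The hash function (FNV-1a from the standard library) is an assumption chosen for illustration; the documentation above does not say which hash the package uses, so the partition numbers produced here will not necessarily match the client's. `partitionForKey` is a hypothetical name.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionForKey maps a key to a partition in [0, partitions) by hashing
// the key and taking the result modulo the number of partitions.
func partitionForKey(key []byte, partitions int32) int32 {
	if partitions <= 0 {
		return 0
	}
	h := fnv.New32a()
	h.Write(key) // Write on a hash.Hash never returns an error
	return int32(h.Sum32() % uint32(partitions))
}

func main() {
	// The same key always maps to the same partition.
	fmt.Println(partitionForKey([]byte("key"), 3) == partitionForKey([]byte("key"), 3))
	fmt.Println(partitionForKey([]byte("other"), 3))
}
```

Because the mapping depends on the partition count, changing the number of partitions generally moves keys to different partitions.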

func PartitionByRoundRobin

func PartitionByRoundRobin() MessageOption

PartitionByRoundRobin is a MessageOption that maps Messages to stream partitions in a round-robin fashion. This computes the partition number for a given message by atomically incrementing a counter for the message subject and modding by the number of partitions for the first stream found with the subject. This does not work with streams containing wildcards in their subjects, e.g. "foo.*", since this matches on the subject literal of the published message. This also has undefined behavior if there are multiple streams for the given subject.

func ToPartition

func ToPartition(partition int32) MessageOption

ToPartition is a MessageOption that specifies the stream partition to publish the Message to. If this is set, any Partitioner will not be used.

type MessageOptions

type MessageOptions struct {
	// Key to set on the Message. If Liftbridge has stream compaction enabled,
	// the stream will retain only the last value for each key.
	Key []byte

	// AckInbox sets the NATS subject Liftbridge should publish the Message ack
	// to. If this is not set, the server will generate a random inbox.
	AckInbox string

	// CorrelationID sets the identifier used to correlate an ack with the
	// published Message. If this is not set, the ack will not have a
	// correlation id.
	CorrelationID string

	// AckPolicy controls the behavior of Message acks sent by the server. By
	// default, Liftbridge will send an ack when the stream leader has written
	// the Message to its write-ahead log.
	AckPolicy AckPolicy

	// Headers are key-value pairs to set on the Message.
	Headers map[string][]byte

	// Partitioner specifies the strategy for mapping a Message to a stream
	// partition.
	Partitioner Partitioner

	// Partition specifies the stream partition to publish the Message to. If
	// this is set, any Partitioner will not be used. This is a pointer to
	// allow distinguishing between unset and 0.
	Partition *int32

	// ExpectedOffset sets the expected offset of the message after
	// publishing. This is required when optimistic concurrency control is
	// activated. By default, this value should be set to -1 to indicate the
	// next offset.
	ExpectedOffset int64
}

MessageOptions are used to configure optional settings for a Message.
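The precedence between Partition and Partitioner, and the reason Partition is a pointer, can be illustrated with a small stand-alone sketch. All names here (options, toPartition, partitionBy, resolve) are hypothetical stand-ins, and the validation in toPartition is a choice made for this sketch rather than documented behavior.

```go
package main

import "fmt"

// options is a stand-in for the two MessageOptions fields discussed above.
type options struct {
	// Partition is nil when no explicit partition was requested.
	Partition *int32
	// Partitioner is nil when no partitioning strategy was requested.
	Partitioner func(key []byte) int32
}

type option func(*options) error

// toPartition records an explicit partition. Storing a pointer lets
// partition 0 be told apart from "not set".
func toPartition(p int32) option {
	return func(o *options) error {
		if p < 0 {
			return fmt.Errorf("invalid partition: %d", p)
		}
		o.Partition = &p
		return nil
	}
}

func partitionBy(fn func(key []byte) int32) option {
	return func(o *options) error {
		o.Partitioner = fn
		return nil
	}
}

// resolve applies the options and picks the partition: an explicit
// partition wins, then the partitioner, then partition 0.
func resolve(key []byte, opts ...option) (int32, error) {
	var o options
	for _, opt := range opts {
		if err := opt(&o); err != nil {
			return 0, err
		}
	}
	switch {
	case o.Partition != nil:
		return *o.Partition, nil
	case o.Partitioner != nil:
		return o.Partitioner(key), nil
	default:
		return 0, nil
	}
}

func main() {
	always2 := func([]byte) int32 { return 2 }
	p, _ := resolve(nil, partitionBy(always2), toPartition(0))
	fmt.Println("explicit partition wins:", p)
}
```

With a plain int32 field, toPartition(0) would be indistinguishable from leaving the field unset, and the partitioner would be consulted when it should not be.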

type Metadata

type Metadata struct {
	// contains filtered or unexported fields
}

Metadata contains an immutable snapshot of information for a cluster and subset of streams.

func (*Metadata) Addrs

func (m *Metadata) Addrs() []string

Addrs returns the list of known broker addresses.

func (*Metadata) Broker added in v2.3.0

func (m *Metadata) Broker(id string) *BrokerInfo

Broker returns the broker for the given id or nil if it doesn't exist.

func (*Metadata) Brokers

func (m *Metadata) Brokers() []*BrokerInfo

Brokers returns a list of the cluster nodes.

func (*Metadata) GetConsumerGroup added in v2.3.0

func (m *Metadata) GetConsumerGroup(id string) *ConsumerGroupInfo

GetConsumerGroup returns the consumer group for the given id or nil if it doesn't exist.

func (*Metadata) GetStream

func (m *Metadata) GetStream(name string) *StreamInfo

GetStream returns the given stream or nil if unknown.

func (*Metadata) LastUpdated

func (m *Metadata) LastUpdated() time.Time

LastUpdated returns the time when this metadata was last updated from the server.

func (*Metadata) PartitionCountForStream

func (m *Metadata) PartitionCountForStream(stream string) int32

PartitionCountForStream returns the number of partitions for the given stream.

func (*Metadata) Streams added in v2.1.0

func (m *Metadata) Streams() []*StreamInfo

Streams returns the list of known streams.

type MetadataOption added in v2.3.0

type MetadataOption func(*MetadataOptions) error

MetadataOption is a function on the MetadataOptions for FetchMetadata. These are used to configure FetchMetadata requests.

func ConsumerGroups added in v2.3.0

func ConsumerGroups(groupIDs []string) MetadataOption

ConsumerGroups is a MetadataOption which sets the IDs of the consumer groups to fetch metadata for. If this is not set, no consumer group metadata will be fetched.

func Streams added in v2.3.0

func Streams(names []string) MetadataOption

Streams is a MetadataOption which sets the names of the streams to fetch metadata for. If this is not set, metadata will be fetched for all streams.

type MetadataOptions added in v2.3.0

type MetadataOptions struct {
	// ConsumerGroups determines the consumer groups to fetch metadata for. If
	// not set, no consumer group metadata will be fetched.
	ConsumerGroups []string

	// Streams determines the streams to fetch metadata for. If not set,
	// metadata for all streams will be fetched.
	Streams []string
	// contains filtered or unexported fields
}

MetadataOptions are used to control FetchMetadata behavior.

type PartitionEventTimestamps added in v2.1.0

type PartitionEventTimestamps struct {
	// contains filtered or unexported fields
}

PartitionEventTimestamps contains the first and latest times when a partition event has occurred.

func (PartitionEventTimestamps) FirstTime added in v2.1.0

func (e PartitionEventTimestamps) FirstTime() time.Time

FirstTime returns the time when the first event occurred.

func (PartitionEventTimestamps) LatestTime added in v2.1.0

func (e PartitionEventTimestamps) LatestTime() time.Time

LatestTime returns the time when the latest event occurred.

type PartitionInfo

type PartitionInfo struct {
	// contains filtered or unexported fields
}

PartitionInfo contains information for a Liftbridge stream partition.

func (*PartitionInfo) HighWatermark added in v2.1.0

func (p *PartitionInfo) HighWatermark() int64

HighWatermark returns the high watermark of the partition leader.

func (*PartitionInfo) ID

func (p *PartitionInfo) ID() int32

ID returns the ID of the partition.

func (*PartitionInfo) ISR

func (p *PartitionInfo) ISR() []*BrokerInfo

ISR returns the list of replicas currently in the in-sync replica set.

func (*PartitionInfo) Leader

func (p *PartitionInfo) Leader() *BrokerInfo

Leader returns the broker acting as leader for this partition or nil if there is no leader.

func (*PartitionInfo) MessagesReceivedTimestamps added in v2.1.0

func (p *PartitionInfo) MessagesReceivedTimestamps() PartitionEventTimestamps

MessagesReceivedTimestamps returns the first and latest times a message was received on this partition.

func (*PartitionInfo) NewestOffset added in v2.1.0

func (p *PartitionInfo) NewestOffset() int64

NewestOffset returns the newest offset of the partition leader.

func (*PartitionInfo) PauseTimestamps added in v2.1.0

func (p *PartitionInfo) PauseTimestamps() PartitionEventTimestamps

PauseTimestamps returns the first and latest time this partition was paused or resumed.

func (*PartitionInfo) Paused added in v2.1.0

func (p *PartitionInfo) Paused() bool

Paused returns true if this partition is paused.

func (*PartitionInfo) Readonly added in v2.1.0

func (p *PartitionInfo) Readonly() bool

Readonly returns true if this partition is read-only.

func (*PartitionInfo) ReadonlyTimestamps added in v2.1.0

func (p *PartitionInfo) ReadonlyTimestamps() PartitionEventTimestamps

ReadonlyTimestamps returns the first and latest time this partition had its read-only status changed.

func (*PartitionInfo) Replicas

func (p *PartitionInfo) Replicas() []*BrokerInfo

Replicas returns the list of brokers replicating the partition.

type Partitioner

type Partitioner interface {
	// Partition computes the partition number for a given message.
	Partition(stream string, key, value []byte, metadata *Metadata) int32
}

Partitioner is used to map a message to a stream partition.
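As an illustration of a custom strategy, the sketch below routes messages by a tenant prefix in the key. To stay self-contained it declares local stand-ins (metadata and partitioner) that mirror the shape of Metadata and Partitioner above; the tenantPartitioner type, the "tenant:id" key layout, and the FNV-1a hash are all assumptions made for the example.

```go
package main

import (
	"bytes"
	"fmt"
	"hash/fnv"
)

// metadata is a local stand-in exposing only the partition count lookup.
type metadata struct {
	partitions map[string]int32
}

func (m *metadata) PartitionCountForStream(stream string) int32 {
	return m.partitions[stream]
}

// partitioner mirrors the shape of the Partitioner interface above.
type partitioner interface {
	Partition(stream string, key, value []byte, md *metadata) int32
}

// tenantPartitioner hashes the part of the key before the first ':' so that
// all messages for one tenant land on the same partition.
type tenantPartitioner struct{}

func (tenantPartitioner) Partition(stream string, key, value []byte, md *metadata) int32 {
	n := md.PartitionCountForStream(stream)
	if n <= 0 {
		return 0
	}
	tenant := key
	if i := bytes.IndexByte(key, ':'); i >= 0 {
		tenant = key[:i]
	}
	h := fnv.New32a()
	h.Write(tenant)
	return int32(h.Sum32() % uint32(n))
}

func main() {
	md := &metadata{partitions: map[string]int32{"orders": 4}}
	var p partitioner = tenantPartitioner{}
	for _, key := range []string{"acme:1", "acme:2", "globex:1"} {
		fmt.Printf("%s -> %d\n", key, p.Partition("orders", []byte(key), nil, md))
	}
}
```

A type with the same method set against the real Metadata type would presumably be passed to PartitionBy, though that wiring is not shown here.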

type PauseOption

type PauseOption func(*PauseOptions) error

PauseOption is a function on the PauseOptions for a pause call. These are used to configure particular pausing options.

func PausePartitions

func PausePartitions(partitions ...int32) PauseOption

PausePartitions sets the list of partitions to pause, or all of them if nil/empty.

func ResumeAll

func ResumeAll() PauseOption

ResumeAll will resume all partitions in the stream if one of them is published to instead of resuming only that partition.

type PauseOptions

type PauseOptions struct {
	// Partitions sets the list of partitions to pause or all of them if
	// nil/empty.
	Partitions []int32

	// ResumeAll will resume all partitions in the stream if one of them is
	// published to instead of resuming only that partition.
	ResumeAll bool
}

PauseOptions are used to set up stream pausing.

type ReadonlyOption added in v2.1.0

type ReadonlyOption func(*ReadonlyOptions) error

ReadonlyOption is a function on the ReadonlyOptions for a set readonly call. These are used to configure particular set readonly options.

func Readonly added in v2.1.0

func Readonly(readonly bool) ReadonlyOption

Readonly defines if the partitions should be set to readonly or to readwrite.

func ReadonlyPartitions added in v2.1.0

func ReadonlyPartitions(partitions ...int32) ReadonlyOption

ReadonlyPartitions sets the list of partitions on which to set the readonly flag, or all of them if nil/empty.

type ReadonlyOptions added in v2.1.0

type ReadonlyOptions struct {
	// Partitions sets the list of partitions on which to set the readonly flag
	// or all of them if nil/empty.
	Partitions []int32

	// Readwrite defines if the partitions should be set to readonly (false) or
	// to readwrite (true). This field is called readwrite and not readonly so
	// that the default value corresponds to "enable readonly".
	Readwrite bool
}

ReadonlyOptions are used to set up stream readonly operations.
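The inverted Readwrite field is easy to misread, so here is a small sketch of how the zero value ends up meaning "enable readonly". The lowercase types and functions are local stand-ins written for this example, modeled on the signatures above rather than copied from the library.

```go
package main

import "fmt"

// readonlyOptions mirrors the two fields of ReadonlyOptions.
type readonlyOptions struct {
	Partitions []int32
	Readwrite  bool
}

type readonlyOption func(*readonlyOptions) error

// readonly stores the inverse of its argument, so an untouched options
// struct (Readwrite == false) means "set the partitions to readonly".
func readonly(ro bool) readonlyOption {
	return func(o *readonlyOptions) error {
		o.Readwrite = !ro
		return nil
	}
}

func readonlyPartitions(partitions ...int32) readonlyOption {
	return func(o *readonlyOptions) error {
		o.Partitions = partitions
		return nil
	}
}

// apply runs every option against a zero-valued options struct.
func apply(opts ...readonlyOption) (readonlyOptions, error) {
	var o readonlyOptions
	for _, opt := range opts {
		if err := opt(&o); err != nil {
			return o, err
		}
	}
	return o, nil
}

func main() {
	def, _ := apply()
	rw, _ := apply(readonly(false), readonlyPartitions(0, 2))
	fmt.Printf("default:   readwrite=%v partitions=%v\n", def.Readwrite, def.Partitions)
	fmt.Printf("readwrite: readwrite=%v partitions=%v\n", rw.Readwrite, rw.Partitions)
}
```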

type SelectionCriteria added in v2.2.0

type SelectionCriteria int

const (
	Random SelectionCriteria = iota
	Latency
	Workload
)

type StartPosition

type StartPosition int32

StartPosition controls where to begin consuming in a stream.

type StopPosition added in v2.1.0

type StopPosition int32

StopPosition controls where to stop consuming in a stream.

type StreamInfo

type StreamInfo struct {
	// contains filtered or unexported fields
}

StreamInfo contains information for a Liftbridge stream.

func (*StreamInfo) CreationTime added in v2.1.0

func (s *StreamInfo) CreationTime() time.Time

CreationTime returns the time when the stream was created.

func (*StreamInfo) GetPartition

func (s *StreamInfo) GetPartition(id int32) *PartitionInfo

GetPartition returns the partition info for the given partition id or nil if no such partition exists.

func (*StreamInfo) Name added in v2.1.0

func (s *StreamInfo) Name() string

Name returns the stream's name.

func (*StreamInfo) Partitions

func (s *StreamInfo) Partitions() map[int32]*PartitionInfo

Partitions returns a map containing partition IDs and partitions for the stream.

func (*StreamInfo) Subject added in v2.1.0

func (s *StreamInfo) Subject() string

Subject returns the stream's subject.

type StreamOption

type StreamOption func(*StreamOptions) error

StreamOption is a function on the StreamOptions for a stream. These are used to configure particular stream options.

func AutoPauseDisableIfSubscribers added in v2.1.0

func AutoPauseDisableIfSubscribers(val bool) StreamOption

AutoPauseDisableIfSubscribers sets the value of auto.pause.disable.if.subscribers. This controls whether automatic partition pausing should be disabled when there are subscribers. If this is not set, it uses the server default value.

func AutoPauseTime added in v2.1.0

func AutoPauseTime(val time.Duration) StreamOption

AutoPauseTime sets the value of auto.pause.time. This controls the amount of time a stream partition can go idle, i.e. not receive a message, before it is automatically paused. If this is not set, it uses the server default value.

func CleanerInterval

func CleanerInterval(val time.Duration) StreamOption

CleanerInterval sets the value of the cleaner.interval configuration for the stream. This controls the frequency to check if a new stream log segment file should be rolled and whether any segments are eligible for deletion based on the retention policy or compaction if enabled. If this is not set, it uses the server default value.

func CompactEnabled

func CompactEnabled(val bool) StreamOption

CompactEnabled sets the value of the compact.enabled configuration for the stream. This controls the activation of stream log compaction. If this is not set, it uses the server default value.

func CompactMaxGoroutines

func CompactMaxGoroutines(val int32) StreamOption

CompactMaxGoroutines sets the value of the compact.max.goroutines configuration for the stream. This controls the maximum number of concurrent goroutines to use for compaction on a stream log (only applicable if compact.enabled is true). If this is not set, it uses the server default value.

func Encryption added in v2.2.0

func Encryption(val bool) StreamOption

Encryption sets the value of Encryption, which enables encryption of data at rest on the server side for the given partition.

func Group

func Group(group string) StreamOption

Group is a StreamOption to set the load-balance group for a stream. When there are multiple streams in the same group, messages will be balanced among them.

func MaxReplication

func MaxReplication() StreamOption

MaxReplication is a StreamOption to set the stream replication factor equal to the current number of servers in the cluster.

func MinISR added in v2.1.0

func MinISR(minISR int) StreamOption

MinISR overrides clustering.min.insync.replicas for the given stream. This controls the minimum number of replicas that must acknowledge a stream write before it can be committed. If this is not set, it uses the server default value.

func OptimisticConcurrencyControl added in v2.1.0

func OptimisticConcurrencyControl(val bool) StreamOption

OptimisticConcurrencyControl sets the value of OptimisticConcurrencyControl, which enables optimistic concurrency control for messages published to the stream.

func Partitions

func Partitions(partitions int32) StreamOption

Partitions is a StreamOption to set the number of partitions for a stream. Partitions are ordered, replicated, and durably stored on disk and serve as the unit of storage and parallelism for a stream. A partitioned stream for NATS subject "foo.bar" with three partitions internally maps to the NATS subjects "foo.bar", "foo.bar.1", and "foo.bar.2". A single partition would map to "foo.bar" to match behavior of an "un-partitioned" stream. If this is not set, it defaults to 1.
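The subject mapping described above can be written out as a short sketch. The helper names partitionSubject and partitionSubjects are hypothetical; only the naming rule itself comes from the text.

```go
package main

import "fmt"

// partitionSubject returns the NATS subject for one partition of a stream.
// Partition 0 uses the bare subject; others append ".<partition>".
func partitionSubject(subject string, partition int32) string {
	if partition == 0 {
		return subject
	}
	return fmt.Sprintf("%s.%d", subject, partition)
}

// partitionSubjects lists the subjects for every partition of a stream.
// A count below 1 is treated as a single partition.
func partitionSubjects(subject string, partitions int32) []string {
	if partitions < 1 {
		partitions = 1
	}
	subjects := make([]string, 0, partitions)
	for p := int32(0); p < partitions; p++ {
		subjects = append(subjects, partitionSubject(subject, p))
	}
	return subjects
}

func main() {
	for _, s := range partitionSubjects("foo.bar", 3) {
		fmt.Println(s)
	}
}
```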

func ReplicationFactor

func ReplicationFactor(replicationFactor int32) StreamOption

ReplicationFactor is a StreamOption to set the replication factor for a stream. The replication factor controls the number of servers to replicate a stream to. E.g. a value of 1 would mean only 1 server would have the data, and a value of 3 would mean 3 servers would have it. If this is not set, it defaults to 1. A value of -1 will signal to the server to set the replication factor equal to the current number of servers in the cluster.
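The rules above (default of 1, -1 meaning "all servers") can be summarized in a small sketch. In practice the -1 case is resolved by the server; the function below only models the outcome. The name effectiveReplicationFactor, the treatment of 0 as "not set", and the rejection of other negative values are assumptions made for this example.

```go
package main

import "fmt"

// maxReplicationFactor mirrors the MaxReplicationFactor constant (-1).
const maxReplicationFactor int32 = -1

// effectiveReplicationFactor models how a requested replication factor is
// interpreted for a cluster of the given size.
func effectiveReplicationFactor(requested, clusterSize int32) (int32, error) {
	switch {
	case requested == maxReplicationFactor:
		// Replicate to every server currently in the cluster.
		return clusterSize, nil
	case requested == 0:
		// Treated as "not set" in this sketch: default to 1.
		return 1, nil
	case requested < 0:
		return 0, fmt.Errorf("invalid replication factor: %d", requested)
	default:
		return requested, nil
	}
}

func main() {
	for _, rf := range []int32{0, 3, maxReplicationFactor} {
		n, err := effectiveReplicationFactor(rf, 5)
		fmt.Printf("requested=%d effective=%d err=%v\n", rf, n, err)
	}
}
```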

func RetentionMaxAge

func RetentionMaxAge(val time.Duration) StreamOption

RetentionMaxAge sets the value of the retention.max.age configuration for the stream. This controls the TTL for stream log segment files, after which they are deleted. A value of 0 indicates no TTL. If this is not set, it uses the server default value.

func RetentionMaxBytes

func RetentionMaxBytes(val int64) StreamOption

RetentionMaxBytes sets the value of the retention.max.bytes configuration for the stream. This controls the maximum size a stream's log can grow to, in bytes, before we will discard old log segments to free up space. A value of 0 indicates no limit. If this is not set, it uses the server default value.

func RetentionMaxMessages

func RetentionMaxMessages(val int64) StreamOption

RetentionMaxMessages sets the value of the retention.max.messages configuration for the stream. This controls the maximum size a stream's log can grow to, in number of messages, before we will discard old log segments to free up space. A value of 0 indicates no limit. If this is not set, it uses the server default value.

func SegmentMaxAge

func SegmentMaxAge(val time.Duration) StreamOption

SegmentMaxAge sets the value of the segment.max.age configuration for the stream. This controls the maximum time before a new stream log segment is rolled out. A value of 0 means new segments will only be rolled when segment.max.bytes is reached. Retention is always done a file at a time, so a larger value means fewer files but less granular control over retention. If this is not set, it uses the server default value.

func SegmentMaxBytes

func SegmentMaxBytes(val int64) StreamOption

SegmentMaxBytes sets the value of the segment.max.bytes configuration for the stream. This controls the maximum size of a single stream log segment file in bytes. Retention is always done a file at a time, so a larger segment size means fewer files but less granular control over retention. If this is not set, it uses the server default value.

type StreamOptions

type StreamOptions struct {
	// Group is the name of a load-balance group. When there are multiple
	// streams in the same group, messages will be balanced among them.
	Group string

	// ReplicationFactor controls the number of servers to replicate a stream
	// to. E.g. a value of 1 would mean only 1 server would have the data, and
	// a value of 3 would mean 3 servers would have it. If this is not set, it
	// defaults to 1. A value of -1 will signal to the server to set the
	// replication factor equal to the current number of servers in the
	// cluster.
	ReplicationFactor int32

	// Partitions determines how many partitions to create for a stream. If 0,
	// this will behave as a stream with a single partition. If this is not
	// set, it defaults to 1.
	Partitions int32

	// The maximum size a stream's log can grow to, in bytes, before we will
	// discard old log segments to free up space. A value of 0 indicates no
	// limit. If this is not set, it uses the server default value.
	RetentionMaxBytes *int64

	// The maximum size a stream's log can grow to, in number of messages,
	// before we will discard old log segments to free up space. A value of 0
	// indicates no limit. If this is not set, it uses the server default
	// value.
	RetentionMaxMessages *int64

	// The TTL for stream log segment files, after which they are deleted. A
	// value of 0 indicates no TTL. If this is not set, it uses the server
	// default value.
	RetentionMaxAge *time.Duration

	// The frequency to check if a new stream log segment file should be rolled
	// and whether any segments are eligible for deletion based on the
	// retention policy or compaction if enabled. If this is not set, it uses
	// the server default value.
	CleanerInterval *time.Duration

	// The maximum size of a single stream log segment file in bytes. Retention
	// is always done a file at a time, so a larger segment size means fewer
	// files but less granular control over retention. If this is not set, it
	// uses the server default value.
	SegmentMaxBytes *int64

	// The maximum time before a new stream log segment is rolled out. A value
	// of 0 means new segments will only be rolled when segment.max.bytes is
	// reached. Retention is always done a file at a time, so a larger value
	// means fewer files but less granular control over retention. If this is
	// not set, it uses the server default value.
	SegmentMaxAge *time.Duration

	// The maximum number of concurrent goroutines to use for compaction on a
	// stream log (only applicable if compact.enabled is true). If this is not
	// set, it uses the server default value.
	CompactMaxGoroutines *int32

	// CompactEnabled controls the activation of stream log compaction. If this
	// is not set, it uses the server default value.
	CompactEnabled *bool

	// The amount of time a stream partition can go idle before it is
	// automatically paused. If this is not set, it uses the server default
	// value.
	AutoPauseTime *time.Duration

	// Disables automatic partition pausing when there are subscribers. If this
	// is not set, it uses the server default value.
	AutoPauseDisableIfSubscribers *bool

	// The minimum number of replicas that must acknowledge a stream write
	// before it can be committed. If this is not set, it uses the server
	// default value.
	MinISR *int

	// OptimisticConcurrencyControl controls the activation of optimistic
	// concurrency control for the stream.
	OptimisticConcurrencyControl *bool

	// Encryption controls the activation of encryption of data at rest on the
	// server side.
	Encryption *bool
}

StreamOptions are used to configure new streams.
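Many StreamOptions fields are pointers so that "not set" (use the server default) can be distinguished from an explicit zero such as "no limit". The sketch below shows that distinction with local stand-in types. The default values are hypothetical and chosen only for the example, and the real resolution happens on the server rather than in client code.

```go
package main

import "fmt"

// Hypothetical server defaults, chosen only for this sketch.
const defaultRetentionMaxBytes int64 = 1 << 30
const defaultCompactEnabled = true

// streamOptions mirrors two of the pointer-typed fields of StreamOptions.
type streamOptions struct {
	RetentionMaxBytes *int64
	CompactEnabled    *bool
}

type streamOption func(*streamOptions) error

func retentionMaxBytes(val int64) streamOption {
	return func(o *streamOptions) error {
		o.RetentionMaxBytes = &val
		return nil
	}
}

func compactEnabled(val bool) streamOption {
	return func(o *streamOptions) error {
		o.CompactEnabled = &val
		return nil
	}
}

// effectiveRetentionMaxBytes falls back to the default only when the field
// was never set; an explicit 0 ("no limit") is preserved.
func effectiveRetentionMaxBytes(o streamOptions) int64 {
	if o.RetentionMaxBytes == nil {
		return defaultRetentionMaxBytes
	}
	return *o.RetentionMaxBytes
}

// effectiveCompactEnabled preserves an explicit false in the same way.
func effectiveCompactEnabled(o streamOptions) bool {
	if o.CompactEnabled == nil {
		return defaultCompactEnabled
	}
	return *o.CompactEnabled
}

func main() {
	var unset, explicit streamOptions
	_ = retentionMaxBytes(0)(&explicit)
	_ = compactEnabled(false)(&explicit)
	fmt.Println("unset:   ", effectiveRetentionMaxBytes(unset), effectiveCompactEnabled(unset))
	fmt.Println("explicit:", effectiveRetentionMaxBytes(explicit), effectiveCompactEnabled(explicit))
}
```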

type SubscriptionOption

type SubscriptionOption func(*SubscriptionOptions) error

SubscriptionOption is a function on the SubscriptionOptions for a subscription. These are used to configure particular subscription options.

func Partition

func Partition(partition int32) SubscriptionOption

Partition specifies the stream partition to consume. If not set, this defaults to 0.

func ReadISRReplica

func ReadISRReplica() SubscriptionOption

ReadISRReplica sets the read replica option. If true, the client will request a subscription from a random ISR replica instead of subscribing explicitly to the partition's leader. Because the replica is chosen at random, it may well be the partition's leader itself.

func Resume added in v2.1.0

func Resume() SubscriptionOption

Resume controls if a paused partition can be resumed before subscription. If true, subscribing to a paused partition will resume it before subscribing to it instead of failing.

func StartAtEarliestReceived

func StartAtEarliestReceived() SubscriptionOption

StartAtEarliestReceived sets the subscription start position to the earliest message received in the stream.

func StartAtLatestReceived

func StartAtLatestReceived() SubscriptionOption

StartAtLatestReceived sets the subscription start position to the last message received in the stream.

func StartAtNewOnly added in v2.3.0

func StartAtNewOnly() SubscriptionOption

StartAtNewOnly sets the subscription start position to receive only new messages received in the stream.

func StartAtOffset

func StartAtOffset(offset int64) SubscriptionOption

StartAtOffset sets the desired start offset to begin consuming from in the stream.

func StartAtTime

func StartAtTime(start time.Time) SubscriptionOption

StartAtTime sets the desired timestamp to begin consuming from in the stream.

func StartAtTimeDelta

func StartAtTimeDelta(ago time.Duration) SubscriptionOption

StartAtTimeDelta sets the desired timestamp to begin consuming from in the stream using a time delta in the past.

func StopAtLatestReceived added in v2.1.0

func StopAtLatestReceived() SubscriptionOption

StopAtLatestReceived sets the subscription stop position to the last message received in the stream.

func StopAtOffset added in v2.1.0

func StopAtOffset(offset int64) SubscriptionOption

StopAtOffset sets the desired stop offset to stop consuming at in the stream.

func StopAtTime added in v2.1.0

func StopAtTime(stop time.Time) SubscriptionOption

StopAtTime sets the desired timestamp to stop consuming at in the stream.

type SubscriptionOptions

type SubscriptionOptions struct {
	// StartPosition controls where to begin consuming from in the stream.
	StartPosition StartPosition

	// StartOffset sets the stream offset to begin consuming from.
	StartOffset int64

	// StartTimestamp sets the stream start position to the given timestamp.
	StartTimestamp time.Time

	// StopPosition controls where to stop consuming in the stream.
	StopPosition StopPosition

	// StopOffset sets the stream offset to stop consuming at.
	StopOffset int64

	// StopTimestamp sets the stream stop position to the given timestamp.
	StopTimestamp time.Time

	// Partition sets the stream partition to consume.
	Partition int32

	// ReadISRReplica enables the client to subscribe to a random ISR replica.
	ReadISRReplica bool

	// Resume controls if a paused partition can be resumed before
	// subscription.
	Resume bool
	// contains filtered or unexported fields
}

SubscriptionOptions are used to control a subscription's behavior.
