jetstream

package
v0.0.0-...-8421924
Published: Aug 23, 2023 License: Apache-2.0 Imports: 19 Imported by: 0

README

JetStream Simplified Client

This doc covers the basic usage of the jetstream package in nats.go client.

Overview

The jetstream package is a new client API for interacting with NATS JetStream. It aims to replace the JetStream client implementation in the nats package. Its main goal is to provide a simple and clear way to interact with the JetStream API. Key differences between the jetstream and nats packages include:

  • Using smaller, simpler interfaces to manage streams and consumers
  • Using a more granular and predictable approach to consuming messages from a stream, instead of relying on the often complicated and unpredictable Subscribe() method (and all of its flavors)
  • Allowing the usage of pull consumers to continuously receive incoming messages (including ordered consumer functionality)
  • Separating JetStream context from core NATS

jetstream package provides several ways of interacting with the API:

  • JetStream - top-level interface, used to create and manage streams and consumers and to publish messages
  • Stream - used to manage consumers for a specific stream, as well as performing stream-specific operations (purging, fetching and deleting messages by sequence number, fetching stream info)
  • Consumer - used to get information about a consumer as well as consuming messages
  • Msg - used for message-specific operations - reading data, headers and metadata, as well as performing various types of acknowledgements

NOTE: jetstream requires nats-server >= 2.9.0 to work correctly.

WARNING: The new API is currently provided as a preview, and will deprecate previous JetStream subscribe APIs. It is encouraged to start experimenting with the new APIs as soon as possible.

Basic usage

package main

import (
    "context"
    "fmt"
    "strconv"
    "time"

    "github.com/ybm2dyd/nats.go"
    "github.com/ybm2dyd/nats.go/jetstream"
)

func main() {
    // In the `jetstream` package, almost all API calls rely on `context.Context` for timeout/cancellation handling
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()
    nc, _ := nats.Connect(nats.DefaultURL)

    // Create a JetStream management interface
    js, _ := jetstream.New(nc)

    // Create a stream
    s, _ := js.CreateStream(ctx, jetstream.StreamConfig{
        Name:     "ORDERS",
        Subjects: []string{"ORDERS.*"},
    })

    // Publish some messages
    for i := 0; i < 100; i++ {
        js.Publish(ctx, "ORDERS.new", []byte("hello message "+strconv.Itoa(i)))
        fmt.Printf("Published hello message %d\n", i)
    }

    // Create durable consumer
    c, _ := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
        Durable:   "CONS",
        AckPolicy: jetstream.AckExplicitPolicy,
    })

    // Get 10 messages from the consumer
    messageCounter := 0
    msgs, _ := c.Fetch(10)
    for msg := range msgs.Messages() {
        msg.Ack()
        fmt.Printf("Received a JetStream message via fetch: %s\n", string(msg.Data()))
        messageCounter++
    }
    fmt.Printf("received %d messages\n", messageCounter)
    if msgs.Error() != nil {
        fmt.Println("Error during Fetch(): ", msgs.Error())
    }

    // Receive messages continuously in a callback
    cons, _ := c.Consume(func(msg jetstream.Msg) {
        msg.Ack()
        fmt.Printf("Received a JetStream message via callback: %s\n", string(msg.Data()))
        messageCounter++
    })
    defer cons.Stop()

    // Iterate over messages continuously
    it, _ := c.Messages()
    for i := 0; i < 10; i++ {
        msg, _ := it.Next()
        msg.Ack()
        fmt.Printf("Received a JetStream message via iterator: %s\n", string(msg.Data()))
        messageCounter++
    }
    it.Stop()

    // block until all 100 published messages have been processed
    for messageCounter < 100 {
        time.Sleep(10 * time.Millisecond)
    }
}

Streams

jetstream provides methods to manage and list streams, as well as perform stream-specific operations (purging, fetching/deleting messages by sequence number).

Stream management (CRUD)

js, _ := jetstream.New(nc)

// create a stream (this is an idempotent operation)
s, _ := js.CreateStream(ctx, jetstream.StreamConfig{
    Name:     "ORDERS",
    Subjects: []string{"ORDERS.*"},
})

// update a stream
s, _ = js.UpdateStream(ctx, jetstream.StreamConfig{
    Name:        "ORDERS",
    Subjects:    []string{"ORDERS.*"},
    Description: "updated stream",
})

// get stream handle
s, _ = js.Stream(ctx, "ORDERS")

// delete a stream
js.DeleteStream(ctx, "ORDERS")

Listing streams and stream names

// list streams
streams := js.ListStreams(ctx)
for s := range streams.Info() {
    fmt.Println(s.Config.Name)
}
if streams.Err() != nil {
    fmt.Println("Unexpected error occurred")
}

// list stream names
names := js.StreamNames(ctx)
for name := range names.Name() {
    fmt.Println(name)
}
if names.Err() != nil {
    fmt.Println("Unexpected error occurred")
}

Stream-specific operations

Using Stream interface, it is also possible to:

  • Purge a stream
// remove all messages from a stream
_ = s.Purge(ctx)

// remove all messages from a stream that are stored on a specific subject
_ = s.Purge(ctx, jetstream.WithPurgeSubject("ORDERS.new"))

// remove all messages up to specified sequence number
_ = s.Purge(ctx, jetstream.WithPurgeSequence(100))

// remove messages, but keep 10 newest
_ = s.Purge(ctx, jetstream.WithPurgeKeep(10))
  • Get and delete messages from a stream
// get message from stream with sequence number == 100
msg, _ := s.GetMsg(ctx, 100)

// get last message from "ORDERS.new" subject
msg, _ = s.GetLastMsgForSubject(ctx, "ORDERS.new")

// delete a message with sequence number == 100
_ = s.DeleteMsg(ctx, 100)
  • Get information about a stream
// Fetches latest stream info from server
info, _ := s.Info(ctx)
fmt.Println(info.Config.Name)

// Returns the most recently fetched StreamInfo, without making an API call to the server
cachedInfo := s.CachedInfo()
fmt.Println(cachedInfo.Config.Name)

Consumers

Only pull consumers are supported in the jetstream package. However, unlike the JetStream API in the nats package, pull consumers allow for continuous message retrieval (similarly to how nats.Subscribe() works). Because of that, push consumers can easily be replaced by pull consumers for most use cases.

Consumers management

CRUD operations on consumers can be achieved on 2 levels:

  • on JetStream interface
js, _ := jetstream.New(nc)

// create a consumer (this is an idempotent operation)
cons, _ := js.CreateOrUpdateConsumer(ctx, "ORDERS", jetstream.ConsumerConfig{
    Durable: "foo",
    AckPolicy: jetstream.AckExplicitPolicy,
})

// create an ephemeral pull consumer by not providing `Durable`
ephemeral, _ := js.CreateOrUpdateConsumer(ctx, "ORDERS", jetstream.ConsumerConfig{
    AckPolicy: jetstream.AckExplicitPolicy,
})

// get consumer handle
cons, _ = js.Consumer(ctx, "ORDERS", "foo")

// delete a consumer
js.DeleteConsumer(ctx, "ORDERS", "foo")
  • on Stream interface
// Create a JetStream management interface
js, _ := jetstream.New(nc)

// get stream handle
stream, _ := js.Stream(ctx, "ORDERS")

// create consumer
cons, _ := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
    Durable:   "foo",
    AckPolicy: jetstream.AckExplicitPolicy,
})

// get consumer handle
cons, _ = stream.Consumer(ctx, "foo")

// delete a consumer
stream.DeleteConsumer(ctx, "foo")

Consumer interface, returned when creating/fetching consumers, allows fetching ConsumerInfo:

// Fetches latest consumer info from server
info, _ := cons.Info(ctx)
fmt.Println(info.Config.Durable)

// Returns the most recently fetched ConsumerInfo, without making an API call to the server
cachedInfo := cons.CachedInfo()
fmt.Println(cachedInfo.Config.Durable)

Listing consumers and consumer names

// list consumers
consumers := s.ListConsumers(ctx)
for cons := range consumers.Info() {
    fmt.Println(cons.Name)
}
if consumers.Err() != nil {
    fmt.Println("Unexpected error occurred")
}

// list consumer names
names := s.ConsumerNames(ctx)
for name := range names.Name() {
    fmt.Println(name)
}
if names.Err() != nil {
    fmt.Println("Unexpected error occurred")
}

Ordered consumers

In addition to basic named/ephemeral consumers, jetstream supports ordered consumer functionality. An ordered consumer strictly processes messages in the order in which they were stored on the stream, providing consistent and deterministic message ordering. It is also resilient to consumer deletion.

Ordered consumers present the same set of message consumption methods as standard pull consumers.

js, _ := jetstream.New(nc)

// create a consumer (this is an idempotent operation)
cons, _ := js.OrderedConsumer(ctx, "ORDERS", jetstream.OrderedConsumerConfig{
    // Filter results from "ORDERS" stream by specific subject
    FilterSubjects: []string{"ORDERS.A"},
})

Receiving messages from the consumer

The Consumer interface allows fetching messages on demand, with a pre-defined batch size or bytes limit, or continuous push-like receiving of messages.

Single fetch

This pattern allows fetching a defined number of messages in a single RPC.

  • Using Fetch or FetchBytes, consumer will return up to the provided number of messages/bytes. By default, Fetch() will wait 30 seconds before timing out (this behavior can be configured using FetchMaxWait() option):
// receive up to 10 messages from the stream
msgs, _ := c.Fetch(10)
for msg := range msgs.Messages() {
    fmt.Printf("Received a JetStream message: %s\n", string(msg.Data()))
}
if msgs.Error() != nil {
    // handle error
}

// receive up to 1024 B of data
msgs, _ = c.FetchBytes(1024)
for msg := range msgs.Messages() {
    fmt.Printf("Received a JetStream message: %s\n", string(msg.Data()))
}
if msgs.Error() != nil {
    // handle error
}

Similarly, FetchNoWait() can be used in order to only return messages from the stream available at the time of sending request:

// FetchNoWait will not wait for new messages if the whole batch is not available at the time of sending request.
msgs, _ := c.FetchNoWait(10)
for msg := range msgs.Messages() {
    fmt.Printf("Received a JetStream message: %s\n", string(msg.Data()))
}
if msgs.Error() != nil {
    // handle error
}

Warning: Both Fetch() and FetchNoWait() have worse performance when used to continuously retrieve messages in comparison to Messages() or Consume() methods, as they do not perform any optimizations (pre-buffering) and new subscription is created for each execution.
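
The difference between waiting for a batch and returning only what is already available can be illustrated without a server. The following is a minimal sketch using only the standard library: fetch, fetchNoWait and queued are hypothetical helpers that model the two behaviours over a plain channel, and they are not how the jetstream package implements Fetch() or FetchNoWait().

```go
package main

import (
	"fmt"
	"time"
)

// queued returns a channel preloaded with msgs, standing in for messages
// that are already available when the request is sent.
func queued(msgs ...string) chan string {
	ch := make(chan string, len(msgs))
	for _, m := range msgs {
		ch <- m
	}
	return ch
}

// fetch collects up to batch messages, waiting at most maxWait for the
// batch to fill up.
func fetch(src <-chan string, batch int, maxWait time.Duration) []string {
	var out []string
	timer := time.NewTimer(maxWait)
	defer timer.Stop()
	for len(out) < batch {
		select {
		case m, ok := <-src:
			if !ok {
				return out
			}
			out = append(out, m)
		case <-timer.C:
			return out
		}
	}
	return out
}

// fetchNoWait collects up to batch messages but returns as soon as nothing
// more is immediately available.
func fetchNoWait(src <-chan string, batch int) []string {
	var out []string
	for len(out) < batch {
		select {
		case m, ok := <-src:
			if !ok {
				return out
			}
			out = append(out, m)
		default:
			return out
		}
	}
	return out
}

func main() {
	src := queued("a", "b", "c")
	fmt.Println(fetch(src, 2, 50*time.Millisecond))
	fmt.Println(fetchNoWait(src, 10))
}
```

In this model, fetch with a batch larger than the number of queued messages blocks until maxWait elapses, while fetchNoWait returns the partial batch immediately.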

Continuous polling

There are 2 ways to achieve push-like behavior using pull consumers in jetstream package. Both Messages() and Consume() methods perform similar optimizations and for most cases can be used interchangeably.

There is an advantage of using Messages() instead of Consume() for work-queue scenarios, where messages should be fetched one by one, as it allows for finer control over fetching single messages on demand.

Subject filtering is achieved by configuring a consumer with a FilterSubject value.

Using Consume() to receive messages in a callback

cons, _ := js.CreateOrUpdateConsumer(ctx, "ORDERS", jetstream.ConsumerConfig{
    AckPolicy: jetstream.AckExplicitPolicy,
    // receive messages from ORDERS.A subject only
    FilterSubject: "ORDERS.A",
})

consContext, _ := cons.Consume(func(msg jetstream.Msg) {
    fmt.Printf("Received a JetStream message: %s\n", string(msg.Data()))
    msg.Ack()
})
defer consContext.Stop()

Similarly to Messages(), Consume() can be supplied with options to modify the behavior of a single pull request:

  • PullMaxMessages(int) - up to provided number of messages will be buffered
  • PullMaxBytes(int) - up to provided number of bytes will be buffered. This setting and PullMaxMessages are mutually exclusive
  • PullExpiry(time.Duration) - timeout on a single pull request to the server
  • PullThresholdMessages(int) - amount of messages which triggers refilling the buffer
  • PullThresholdBytes(int) - amount of bytes which triggers refilling the buffer
  • PullHeartbeat(time.Duration) - idle heartbeat duration for a single pull request. An error will be triggered if at least 2 heartbeats are missed
  • WithConsumeErrHandler(func (ConsumeContext, error)) - when used, sets a custom error handler on Consume(), allowing e.g. tracking missing heartbeats.

NOTE: Stop() should always be called on ConsumeContext to avoid leaking goroutines.
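
To make the interplay between a buffer limit and a refill threshold more concrete, here is a rough, library-independent model. The function simulatePulls and its parameters are hypothetical, and the real client may size and time its pull requests differently. The sketch assumes the client tops the buffer back up to the maximum whenever the number of outstanding messages drops to the threshold.

```go
package main

import "fmt"

// simulatePulls models a client that keeps at most maxMessages outstanding
// and issues a new pull request whenever the outstanding count drops to
// threshold or below. It returns the size of every pull request issued while
// consuming total messages. It assumes 0 <= threshold < maxMessages.
func simulatePulls(total, maxMessages, threshold int) []int {
	var pulls []int
	pending := 0   // requested from the server, not yet handed to the handler
	requested := 0 // messages requested so far
	for consumed := 0; consumed < total; {
		if pending <= threshold && requested < total {
			batch := maxMessages - pending
			if remaining := total - requested; batch > remaining {
				batch = remaining
			}
			pulls = append(pulls, batch)
			pending += batch
			requested += batch
		}
		// hand one buffered message to the handler
		pending--
		consumed++
	}
	return pulls
}

func main() {
	// 10 messages, buffer of 4, refill when 2 or fewer are outstanding
	fmt.Println(simulatePulls(10, 4, 2))
}
```

A lower threshold means fewer, larger pull requests, at the cost of the buffer running closer to empty before it is refilled.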

Using Messages() to iterate over incoming messages

iter, _ := cons.Messages()
for {
    msg, err := iter.Next()
    // Next can return an error, e.g. when the iterator is closed or no heartbeats were received
    if err != nil {
        // handle error; msg is not usable here, so stop iterating
        break
    }
    fmt.Printf("Received a JetStream message: %s\n", string(msg.Data()))
    msg.Ack()
}
iter.Stop()

Messages() can also be configured to only store up to a defined number of messages or bytes in the buffer.

// a maximum of 10 messages will be stored in memory
iter, _ := cons.Messages(jetstream.PullMaxMessages(10))

// alternatively, cap the buffer at 1024 bytes (PullMaxMessages and PullMaxBytes are mutually exclusive)
iter, _ = cons.Messages(jetstream.PullMaxBytes(1024))

Messages() exposes the following options:

  • PullMaxMessages(int) - up to provided number of messages will be buffered
  • PullMaxBytes(int) - up to provided number of bytes will be buffered. This setting and PullMaxMessages are mutually exclusive
  • PullExpiry(time.Duration) - timeout on a single pull request to the server
  • PullThresholdMessages(int) - amount of messages which triggers refilling the buffer
  • PullThresholdBytes(int) - amount of bytes which triggers refilling the buffer
  • PullHeartbeat(time.Duration) - idle heartbeat duration for a single pull request. An error will be triggered if at least 2 heartbeats are missed (unless WithMessagesErrOnMissingHeartbeat(false) is used)
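
Options of this kind are commonly built with the functional options pattern, which also gives a natural place to reject conflicting settings. The sketch below uses only the standard library. All names (pullConfig, pullOpt, maxMessages, maxBytes, expiry, newPullConfig, errInvalidOption) are hypothetical, and the package's actual validation may differ. The defaults mirror the DefaultMaxMessages and DefaultExpires constants listed in the documentation.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errInvalidOption is a hypothetical sentinel for conflicting or bad options.
var errInvalidOption = errors.New("invalid option")

type pullConfig struct {
	maxMessages int
	maxBytes    int
	expiry      time.Duration
}

// pullOpt mutates the config and may reject an invalid value.
type pullOpt func(*pullConfig) error

func maxMessages(n int) pullOpt {
	return func(c *pullConfig) error {
		if n <= 0 {
			return fmt.Errorf("%w: max messages must be positive", errInvalidOption)
		}
		c.maxMessages = n
		return nil
	}
}

func maxBytes(n int) pullOpt {
	return func(c *pullConfig) error {
		if n <= 0 {
			return fmt.Errorf("%w: max bytes must be positive", errInvalidOption)
		}
		c.maxBytes = n
		return nil
	}
}

func expiry(d time.Duration) pullOpt {
	return func(c *pullConfig) error {
		c.expiry = d
		return nil
	}
}

// newPullConfig applies the options, then checks cross-option constraints.
func newPullConfig(opts ...pullOpt) (*pullConfig, error) {
	cfg := &pullConfig{expiry: 30 * time.Second}
	for _, opt := range opts {
		if err := opt(cfg); err != nil {
			return nil, err
		}
	}
	if cfg.maxMessages > 0 && cfg.maxBytes > 0 {
		return nil, fmt.Errorf("%w: max messages and max bytes are mutually exclusive", errInvalidOption)
	}
	if cfg.maxMessages == 0 && cfg.maxBytes == 0 {
		cfg.maxMessages = 500
	}
	return cfg, nil
}

// isInvalid reports whether err wraps errInvalidOption.
func isInvalid(err error) bool { return errors.Is(err, errInvalidOption) }

func main() {
	cfg, err := newPullConfig(maxMessages(10), expiry(5*time.Second))
	fmt.Println(cfg.maxMessages, cfg.expiry, err)

	_, err = newPullConfig(maxMessages(10), maxBytes(1024))
	fmt.Println(err)
}
```

Validating the combination after all options have been applied keeps each option independent of the order in which it is passed.
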

Using Messages() to fetch single messages one by one

When implementing a work queue, it is possible to use Messages() to fetch messages from the server one by one, without optimizations and pre-buffering (to avoid redeliveries when processing messages at a slow rate).

// PullMaxMessages determines how many messages will be sent to the client in a single pull request
iter, _ := cons.Messages(jetstream.PullMaxMessages(1))
numWorkers := 5
sem := make(chan struct{}, numWorkers)
for {
    sem <- struct{}{}
    go func() {
        defer func() {
            <-sem
        }()
        msg, err := iter.Next()
        if err != nil {
            // handle err; msg is not usable here
            return
        }
        fmt.Printf("Processing msg: %s\n", string(msg.Data()))
        doWork()
        msg.Ack()
    }()
}
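
The channel-as-semaphore technique used above can be tried in isolation. The following is a minimal sketch with only the standard library, where process is a hypothetical helper and a slice of strings stands in for the message iterator. Unlike the snippet above, it takes each message before starting the goroutine, so that the loop terminates once the input is exhausted.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// process runs work for every message with at most numWorkers goroutines
// active at once. It returns the highest concurrency observed and the number
// of messages processed.
func process(msgs []string, numWorkers int, work func(string)) (peak, done int32) {
	sem := make(chan struct{}, numWorkers)
	var wg sync.WaitGroup
	var active int32

	for _, m := range msgs {
		sem <- struct{}{} // blocks while numWorkers goroutines are busy
		wg.Add(1)
		go func(m string) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot when finished

			n := atomic.AddInt32(&active, 1)
			// record the highest number of concurrently active workers
			for {
				p := atomic.LoadInt32(&peak)
				if n <= p || atomic.CompareAndSwapInt32(&peak, p, n) {
					break
				}
			}
			work(m)
			atomic.AddInt32(&done, 1)
			atomic.AddInt32(&active, -1)
		}(m)
	}
	wg.Wait()
	return atomic.LoadInt32(&peak), atomic.LoadInt32(&done)
}

func main() {
	msgs := make([]string, 20)
	for i := range msgs {
		msgs[i] = fmt.Sprintf("order-%d", i)
	}
	peak, done := process(msgs, 5, func(string) { time.Sleep(5 * time.Millisecond) })
	fmt.Println("peak concurrency:", peak, "processed:", done)
}
```

Because a slot is acquired before the goroutine starts and released only after the work completes, the number of in-flight messages never exceeds numWorkers.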

Publishing on stream

JetStream interface allows publishing messages on stream in 2 ways:

Synchronous publish

js, _ := jetstream.New(nc)

// Publish message on subject ORDERS.new
// Given subject has to belong to a stream
ack, err := js.PublishMsg(ctx, &nats.Msg{
    Data:    []byte("hello"),
    Subject: "ORDERS.new",
})
fmt.Printf("Published msg with sequence number %d on stream %q", ack.Sequence, ack.Stream)

// A helper method accepting subject and data as parameters
ack, err = js.Publish(ctx, "ORDERS.new", []byte("hello"))

Both Publish() and PublishMsg() can be supplied with options allowing setting various headers. Additionally, for PublishMsg() headers can be set directly on nats.Msg.

// All 3 implementations work identically
ack, err := js.PublishMsg(ctx, &nats.Msg{
    Data:    []byte("hello"),
    Subject: "ORDERS.new",
    Header: nats.Header{
        "Nats-Msg-Id": []string{"id"},
    },
})

ack, err = js.PublishMsg(ctx, &nats.Msg{
    Data:    []byte("hello"),
    Subject: "ORDERS.new",
}, jetstream.WithMsgID("id"))

ack, err = js.Publish(ctx, "ORDERS.new", []byte("hello"), jetstream.WithMsgID("id"))

Async publish

js, _ := jetstream.New(nc)

// publish message and do not wait for ack
ackF, err := js.PublishMsgAsync(&nats.Msg{
    Data:    []byte("hello"),
    Subject: "ORDERS.new",
})

// block and wait for ack
select {
case ack := <-ackF.Ok():
    fmt.Printf("Published msg with sequence number %d on stream %q", ack.Sequence, ack.Stream)
case err := <-ackF.Err():
    fmt.Println(err)
}

// similarly to synchronous publish, there is a helper method accepting subject and data
ackF, err = js.PublishAsync("ORDERS.new", []byte("hello"))

Just as for synchronous publish, PublishAsync() and PublishMsgAsync() accept options for setting headers.
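
The select on Ok() and Err() shown above blocks indefinitely if neither channel ever delivers. A possible way to bound the wait is sketched below with only the standard library. The types pubAck and ackFuture and the functions publishAsync and wait are hypothetical stand-ins that imitate the shape of the future returned by the async publish calls. They are not the package's types.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// pubAck is a simplified stand-in for a publish acknowledgement.
type pubAck struct {
	Stream   string
	Sequence uint64
}

// ackFuture is a hypothetical future: exactly one of its two channels
// eventually receives a value.
type ackFuture struct {
	ok  chan *pubAck
	err chan error
}

func (f *ackFuture) Ok() <-chan *pubAck { return f.ok }
func (f *ackFuture) Err() <-chan error  { return f.err }

// publishAsync returns immediately; the outcome is delivered later.
func publishAsync(subject string, seq uint64, fail bool) *ackFuture {
	f := &ackFuture{ok: make(chan *pubAck, 1), err: make(chan error, 1)}
	go func() {
		time.Sleep(time.Millisecond) // simulated round trip
		if fail {
			f.err <- fmt.Errorf("no response from stream for subject %q", subject)
			return
		}
		f.ok <- &pubAck{Stream: "ORDERS", Sequence: seq}
	}()
	return f
}

// wait blocks until the future resolves or the timeout elapses.
func wait(f *ackFuture, timeout time.Duration) (*pubAck, error) {
	select {
	case ack := <-f.Ok():
		return ack, nil
	case err := <-f.Err():
		return nil, err
	case <-time.After(timeout):
		return nil, errors.New("timed out waiting for ack")
	}
}

func main() {
	ack, err := wait(publishAsync("ORDERS.new", 1, false), time.Second)
	fmt.Println(ack.Stream, ack.Sequence, err)

	_, err = wait(publishAsync("ORDERS.new", 2, true), time.Second)
	fmt.Println(err)
}
```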

Examples

You can find more examples of jetstream usage here.

Documentation

Constants

const (
	MsgIDHeader               = "Nats-Msg-Id"
	ExpectedStreamHeader      = "Nats-Expected-Stream"
	ExpectedLastSeqHeader     = "Nats-Expected-Last-Sequence"
	ExpectedLastSubjSeqHeader = "Nats-Expected-Last-Subject-Sequence"
	ExpectedLastMsgIDHeader   = "Nats-Expected-Last-Msg-Id"
	MsgRollup                 = "Nats-Rollup"
)
const (
	StreamHeader       = "Nats-Stream"
	SequenceHeader     = "Nats-Sequence"
	TimeStampHeaer     = "Nats-Time-Stamp"
	SubjectHeader      = "Nats-Subject"
	LastSequenceHeader = "Nats-Last-Sequence"
)

Headers for republished messages and direct gets.

const (
	MsgRollupSubject = "sub"
	MsgRollupAll     = "all"
)

Rollups can apply to a single subject only or to all messages.

const (
	// Default time wait between retries on Publish if err is NoResponders.
	DefaultPubRetryWait = 250 * time.Millisecond

	// Default number of retries
	DefaultPubRetryAttempts = 2
)
const (
	DefaultMaxMessages = 500
	DefaultExpires     = 30 * time.Second
	DefaultHeartbeat   = 5 * time.Second
)
const (
	// DefaultAPIPrefix is the default prefix for the JetStream API.
	DefaultAPIPrefix = "$JS.API."
)

Request API subjects for JetStream.

Variables

var (

	// ErrJetStreamNotEnabled is an error returned when JetStream is not enabled.
	ErrJetStreamNotEnabled JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeJetStreamNotEnabled, Description: "jetstream not enabled", Code: 503}}

	// ErrJetStreamNotEnabledForAccount is an error returned when JetStream is not enabled for an account.
	ErrJetStreamNotEnabledForAccount JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeJetStreamNotEnabledForAccount, Description: "jetstream not enabled for account", Code: 503}}

	// ErrStreamNotFound is an error returned when stream with given name does not exist.
	ErrStreamNotFound JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeStreamNotFound, Description: "stream not found", Code: 404}}

	// ErrStreamNameAlreadyInUse is returned when a stream with given name already exists and has a different configuration.
	ErrStreamNameAlreadyInUse JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeStreamNameInUse, Description: "stream name already in use", Code: 400}}

	// ErrConsumerNotFound is an error returned when consumer with given name does not exist.
	ErrConsumerNotFound JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeConsumerNotFound, Description: "consumer not found", Code: 404}}

	// ErrMsgNotFound is returned when message with provided sequence number does not exist.
	ErrMsgNotFound JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeMessageNotFound, Description: "message not found", Code: 404}}

	// ErrBadRequest is returned when invalid request is sent to JetStream API.
	ErrBadRequest JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeBadRequest, Description: "bad request", Code: 400}}

	// ErrConsumerCreate is returned when nats-server reports error when creating consumer (e.g. illegal update).
	ErrConsumerCreate JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeConsumerCreate, Description: "could not create consumer", Code: 500}}

	// ErrConsumerNameAlreadyInUse is returned when a consumer with given name already exists.
	ErrConsumerNameAlreadyInUse JetStreamError = &jsError{message: "consumer name already in use"}

	// ErrInvalidJSAck is returned when JetStream ack from message publish is invalid.
	ErrInvalidJSAck JetStreamError = &jsError{message: "invalid jetstream publish response"}

	// ErrStreamNameRequired is returned when the provided stream name is empty.
	ErrStreamNameRequired JetStreamError = &jsError{message: "stream name is required"}

	// ErrMsgAlreadyAckd is returned when attempting to acknowledge message more than once.
	ErrMsgAlreadyAckd JetStreamError = &jsError{message: "message was already acknowledged"}

	// ErrNoStreamResponse is returned when there is no response from stream (e.g. no responders error).
	ErrNoStreamResponse JetStreamError = &jsError{message: "no response from stream"}

	// ErrNotJSMessage is returned when attempting to get metadata from non JetStream message.
	ErrNotJSMessage JetStreamError = &jsError{message: "not a jetstream message"}

	// ErrInvalidStreamName is returned when the provided stream name is invalid (contains '.').
	ErrInvalidStreamName JetStreamError = &jsError{message: "invalid stream name"}

	// ErrInvalidSubject is returned when the provided subject name is invalid.
	ErrInvalidSubject JetStreamError = &jsError{message: "invalid subject name"}

	// ErrInvalidConsumerName is returned when the provided consumer name is invalid (contains '.').
	ErrInvalidConsumerName JetStreamError = &jsError{message: "invalid consumer name"}

	// ErrNoMessages is returned when no messages are currently available for a consumer.
	ErrNoMessages = &jsError{message: "no messages"}

	// ErrMaxBytesExceeded is returned when a message would exceed MaxBytes set on a pull request.
	ErrMaxBytesExceeded = &jsError{message: "message size exceeds max bytes"}

	// ErrConsumerDeleted is returned when attempting to send pull request to a consumer which does not exist.
	ErrConsumerDeleted JetStreamError = &jsError{message: "consumer deleted"}

	// ErrConsumerLeadershipChanged is returned when pending requests are no longer valid after leadership has changed.
	ErrConsumerLeadershipChanged JetStreamError = &jsError{message: "leadership change"}

	// ErrHandlerRequired is returned when no handler func is provided in Consume().
	ErrHandlerRequired = &jsError{message: "handler cannot be empty"}

	// ErrEndOfData is returned when iterating over paged API from JetStream reaches end of data.
	ErrEndOfData = errors.New("nats: end of data reached")

	// ErrNoHeartbeat is received when no message is received in IdleHeartbeat time (if set).
	ErrNoHeartbeat = &jsError{message: "no heartbeat received"}

	// ErrConsumerHasActiveSubscription is returned when a consumer is already subscribed to a stream.
	ErrConsumerHasActiveSubscription = &jsError{message: "consumer has active subscription"}

	// ErrMsgNotBound is returned when given message is not bound to any subscription.
	ErrMsgNotBound = &jsError{message: "message is not bound to subscription/connection"}

	// ErrMsgNoReply is returned when attempting to reply to a message without a reply subject.
	ErrMsgNoReply = &jsError{message: "message does not have a reply"}

	// ErrMsgDeleteUnsuccessful is returned when an attempt to delete a message is unsuccessful.
	ErrMsgDeleteUnsuccessful = &jsError{message: "message deletion unsuccessful"}

	// ErrAsyncPublishReplySubjectSet is returned when reply subject is set on async message publish.
	ErrAsyncPublishReplySubjectSet = &jsError{message: "reply subject should be empty"}

	// ErrTooManyStalledMsgs is returned when too many outstanding async messages are waiting for ack.
	ErrTooManyStalledMsgs = &jsError{message: "stalled with too many outstanding async published messages"}

	// ErrInvalidOption is returned when there is a collision between options.
	ErrInvalidOption = &jsError{message: "invalid jetstream option"}

	// ErrMsgIteratorClosed is returned when attempting to get message from a closed iterator.
	ErrMsgIteratorClosed = &jsError{message: "messages iterator closed"}

	// ErrOrderedConsumerReset is returned when resetting ordered consumer fails due to too many attempts.
	ErrOrderedConsumerReset = &jsError{message: "recreating ordered consumer"}

	// ErrOrderConsumerUsedAsFetch is returned when ordered consumer was already used to process
	// messages using Fetch (or FetchBytes).
	ErrOrderConsumerUsedAsFetch = &jsError{message: "ordered consumer initialized as fetch"}

	// ErrOrderConsumerUsedAsConsume is returned when ordered consumer was already used to process
	// messages using Consume or Messages.
	ErrOrderConsumerUsedAsConsume = &jsError{message: "ordered consumer initialized as consume"}

	// ErrOrderedConsumerConcurrentRequests is returned when attempting to run concurrent operations
	// on ordered consumers.
	ErrOrderedConsumerConcurrentRequests = &jsError{message: "cannot run concurrent processing using ordered consumer"}

	// ErrOrderedConsumerNotCreated is returned when trying to get consumer info of an
	// ordered consumer which was not yet created.
	ErrOrderedConsumerNotCreated = &jsError{message: "consumer instance not yet created"}
)

Functions

This section is empty.

Types

type APIError

type APIError struct {
	Code        int       `json:"code"`
	ErrorCode   ErrorCode `json:"err_code"`
	Description string    `json:"description,omitempty"`
}

APIError is included in all API responses if there was an error.

func (*APIError) APIError

func (e *APIError) APIError() *APIError

APIError implements the JetStreamError interface.

func (*APIError) Error

func (e *APIError) Error() string

Error prints the JetStream API error code and description

func (*APIError) Is

func (e *APIError) Is(err error) bool

Is matches against an APIError.
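
An Is method lets callers use errors.Is against a sentinel even when the server error arrives as a different, possibly wrapped, value. The following is a minimal standard-library sketch: apiError is a simplified, hypothetical mirror of APIError, the error code value is illustrative, and matching on ErrorCode is an assumption of this sketch rather than a statement about the package's implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// apiError is a simplified, hypothetical mirror of APIError.
type apiError struct {
	Code        int
	ErrorCode   int
	Description string
}

func (e *apiError) Error() string {
	return fmt.Sprintf("api error: code=%d err_code=%d description=%s", e.Code, e.ErrorCode, e.Description)
}

// Is reports a match when target is also an *apiError carrying the same
// ErrorCode (an assumption made for this sketch).
func (e *apiError) Is(target error) bool {
	t, ok := target.(*apiError)
	if !ok {
		return false
	}
	return e.ErrorCode == t.ErrorCode
}

// errStreamNotFound plays the role of a package-level sentinel.
// The ErrorCode value is illustrative.
var errStreamNotFound = &apiError{Code: 404, ErrorCode: 10059, Description: "stream not found"}

// lookup simulates a call that fails with a freshly decoded, wrapped error.
func lookup(name string) error {
	srvErr := &apiError{Code: 404, ErrorCode: 10059, Description: "stream not found"}
	return fmt.Errorf("looking up stream %q: %w", name, srvErr)
}

// isNotFound shows the check as it would appear at a call site.
func isNotFound(err error) bool { return errors.Is(err, errStreamNotFound) }

func main() {
	err := lookup("ORDERS")
	fmt.Println(err)
	fmt.Println("not found:", isNotFound(err))
}
```

Here the decoded error and the sentinel are distinct pointers, so a plain == comparison would fail; errors.Is succeeds because it consults the Is method while walking the wrap chain.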

type APIStats

type APIStats struct {
	Total  uint64 `json:"total"`
	Errors uint64 `json:"errors"`
}

APIStats reports on API calls to JetStream for this account.

type AccountInfo

type AccountInfo struct {
	Memory    uint64        `json:"memory"`
	Store     uint64        `json:"storage"`
	Streams   int           `json:"streams"`
	Consumers int           `json:"consumers"`
	Domain    string        `json:"domain"`
	API       APIStats      `json:"api"`
	Limits    AccountLimits `json:"limits"`
}

AccountInfo contains info about the JetStream usage from the current account.

type AccountLimits

type AccountLimits struct {
	MaxMemory    int64 `json:"max_memory"`
	MaxStore     int64 `json:"max_storage"`
	MaxStreams   int   `json:"max_streams"`
	MaxConsumers int   `json:"max_consumers"`
}

AccountLimits includes the JetStream limits of the current account.

type AckPolicy

type AckPolicy int

AckPolicy determines how the consumer should acknowledge delivered messages.

const (
	// AckExplicitPolicy requires ack or nack for all messages.
	AckExplicitPolicy AckPolicy = iota

	// AckAllPolicy when acking a sequence number, this implicitly acks all
	// sequences below this one as well.
	AckAllPolicy

	// AckNonePolicy requires no acks for delivered messages.
	AckNonePolicy
)

func (AckPolicy) MarshalJSON

func (p AckPolicy) MarshalJSON() ([]byte, error)

func (AckPolicy) String

func (p AckPolicy) String() string

func (*AckPolicy) UnmarshalJSON

func (p *AckPolicy) UnmarshalJSON(data []byte) error
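
Integer-backed policy types usually implement MarshalJSON and UnmarshalJSON so that they travel as readable strings rather than numbers. A minimal standard-library sketch of that technique follows. The type ackPolicy, the struct consumerConfig, the helper roundTrip and the string values "explicit", "all" and "none" are assumptions made for illustration, not a copy of the package's code.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ackPolicy is a hypothetical mirror of an int-backed policy type.
type ackPolicy int

const (
	ackExplicit ackPolicy = iota
	ackAll
	ackNone
)

// String returns the assumed wire name of the policy.
func (p ackPolicy) String() string {
	switch p {
	case ackExplicit:
		return "explicit"
	case ackAll:
		return "all"
	case ackNone:
		return "none"
	}
	return "unknown"
}

// MarshalJSON encodes the policy as a JSON string.
func (p ackPolicy) MarshalJSON() ([]byte, error) {
	if p < ackExplicit || p > ackNone {
		return nil, fmt.Errorf("cannot marshal ack policy %d", int(p))
	}
	return json.Marshal(p.String())
}

// UnmarshalJSON decodes the policy from a JSON string.
func (p *ackPolicy) UnmarshalJSON(data []byte) error {
	var s string
	if err := json.Unmarshal(data, &s); err != nil {
		return err
	}
	switch s {
	case "explicit":
		*p = ackExplicit
	case "all":
		*p = ackAll
	case "none":
		*p = ackNone
	default:
		return fmt.Errorf("cannot unmarshal ack policy %q", s)
	}
	return nil
}

// consumerConfig is a cut-down config carrying the policy.
type consumerConfig struct {
	Durable   string    `json:"durable_name,omitempty"`
	AckPolicy ackPolicy `json:"ack_policy"`
}

// roundTrip encodes cfg to JSON and decodes it again.
func roundTrip(cfg consumerConfig) (string, consumerConfig, error) {
	b, err := json.Marshal(cfg)
	if err != nil {
		return "", consumerConfig{}, err
	}
	var out consumerConfig
	err = json.Unmarshal(b, &out)
	return string(b), out, err
}

func main() {
	enc, dec, err := roundTrip(consumerConfig{Durable: "CONS", AckPolicy: ackAll})
	fmt.Println(enc, dec.AckPolicy, err)
}
```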

type ClientTrace

type ClientTrace struct {
	RequestSent      func(subj string, payload []byte)
	ResponseReceived func(subj string, payload []byte, hdr nats.Header)
}

ClientTrace can be used to trace API interactions for the JetStream Context.

type ClusterInfo

type ClusterInfo struct {
	Name     string      `json:"name,omitempty"`
	Leader   string      `json:"leader,omitempty"`
	Replicas []*PeerInfo `json:"replicas,omitempty"`
}

ClusterInfo shows information about the underlying set of servers that make up the stream or consumer.

type ConsumeContext

type ConsumeContext interface {
	Stop()
}

type ConsumeErrHandlerFunc

type ConsumeErrHandlerFunc func(consumeCtx ConsumeContext, err error)

type Consumer

type Consumer interface {
	// Fetch is used to retrieve up to a provided number of messages from a stream.
	// This method will always send a single request and wait until either all messages are retrieved
	// or request times out.
	Fetch(batch int, opts ...FetchOpt) (MessageBatch, error)
	// FetchBytes is used to retrieve up to a provided number of bytes from the stream.
	// This method will always send a single request and wait until provided number of bytes is
	// exceeded or request times out.
	FetchBytes(maxBytes int, opts ...FetchOpt) (MessageBatch, error)
	// FetchNoWait is used to retrieve up to a provided number of messages from a stream.
	// This method will always send a single request and immediately return up to a provided number of messages.
	FetchNoWait(batch int) (MessageBatch, error)
	// Consume can be used to continuously receive messages and handle them with the provided callback function
	Consume(handler MessageHandler, opts ...PullConsumeOpt) (ConsumeContext, error)
	// Messages returns [MessagesContext], allowing continuously iterating over messages on a stream.
	Messages(opts ...PullMessagesOpt) (MessagesContext, error)
	// Next is used to retrieve the next message from the stream.
	// This method will block until the message is retrieved or timeout is reached.
	Next(opts ...FetchOpt) (Msg, error)

	// Info returns Consumer details
	Info(context.Context) (*ConsumerInfo, error)
	// CachedInfo returns [*ConsumerInfo] cached on a consumer struct
	CachedInfo() *ConsumerInfo
}

Consumer contains methods for fetching/processing messages from a stream, as well as fetching consumer info

type ConsumerConfig

type ConsumerConfig struct {
	Name               string          `json:"name,omitempty"`
	Durable            string          `json:"durable_name,omitempty"`
	Description        string          `json:"description,omitempty"`
	DeliverPolicy      DeliverPolicy   `json:"deliver_policy"`
	OptStartSeq        uint64          `json:"opt_start_seq,omitempty"`
	OptStartTime       *time.Time      `json:"opt_start_time,omitempty"`
	AckPolicy          AckPolicy       `json:"ack_policy"`
	AckWait            time.Duration   `json:"ack_wait,omitempty"`
	MaxDeliver         int             `json:"max_deliver,omitempty"`
	BackOff            []time.Duration `json:"backoff,omitempty"`
	FilterSubject      string          `json:"filter_subject,omitempty"`
	ReplayPolicy       ReplayPolicy    `json:"replay_policy"`
	RateLimit          uint64          `json:"rate_limit_bps,omitempty"` // Bits per sec
	SampleFrequency    string          `json:"sample_freq,omitempty"`
	MaxWaiting         int             `json:"max_waiting,omitempty"`
	MaxAckPending      int             `json:"max_ack_pending,omitempty"`
	HeadersOnly        bool            `json:"headers_only,omitempty"`
	MaxRequestBatch    int             `json:"max_batch,omitempty"`
	MaxRequestExpires  time.Duration   `json:"max_expires,omitempty"`
	MaxRequestMaxBytes int             `json:"max_bytes,omitempty"`

	// Inactivity threshold.
	InactiveThreshold time.Duration `json:"inactive_threshold,omitempty"`

	// Generally inherited by parent stream and other markers, now can be configured directly.
	Replicas int `json:"num_replicas"`
	// Force memory storage.
	MemoryStorage bool `json:"mem_storage,omitempty"`

	// NOTE: FilterSubjects requires nats-server v2.10.0+
	FilterSubjects []string `json:"filter_subjects,omitempty"`
}

ConsumerConfig is the configuration of a JetStream consumer.

type ConsumerInfo

type ConsumerInfo struct {
	Stream         string         `json:"stream_name"`
	Name           string         `json:"name"`
	Created        time.Time      `json:"created"`
	Config         ConsumerConfig `json:"config"`
	Delivered      SequenceInfo   `json:"delivered"`
	AckFloor       SequenceInfo   `json:"ack_floor"`
	NumAckPending  int            `json:"num_ack_pending"`
	NumRedelivered int            `json:"num_redelivered"`
	NumWaiting     int            `json:"num_waiting"`
	NumPending     uint64         `json:"num_pending"`
	Cluster        *ClusterInfo   `json:"cluster,omitempty"`
	PushBound      bool           `json:"push_bound,omitempty"`
}

ConsumerInfo is the info from a JetStream consumer.

type ConsumerInfoLister

type ConsumerInfoLister interface {
	Info() <-chan *ConsumerInfo
	Err() error
}

type ConsumerNameLister

type ConsumerNameLister interface {
	Name() <-chan string
	Err() error
}

type DeliverPolicy

type DeliverPolicy int

const (
	// DeliverAllPolicy starts delivering messages from the very beginning of a
	// stream. This is the default.
	DeliverAllPolicy DeliverPolicy = iota

	// DeliverLastPolicy will start the consumer with the last sequence
	// received.
	DeliverLastPolicy

	// DeliverNewPolicy will only deliver new messages that are sent after the
	// consumer is created.
	DeliverNewPolicy

	// DeliverByStartSequencePolicy will deliver messages starting from a given
	// sequence.
	DeliverByStartSequencePolicy

	// DeliverByStartTimePolicy will deliver messages starting from a given
	// time.
	DeliverByStartTimePolicy

	// DeliverLastPerSubjectPolicy will start the consumer with the last message
	// for all subjects received.
	DeliverLastPerSubjectPolicy
)

func (DeliverPolicy) MarshalJSON

func (p DeliverPolicy) MarshalJSON() ([]byte, error)

func (DeliverPolicy) String

func (p DeliverPolicy) String() string

func (*DeliverPolicy) UnmarshalJSON

func (p *DeliverPolicy) UnmarshalJSON(data []byte) error
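
The String, MarshalJSON and UnmarshalJSON methods let an integer policy travel as a string in JSON. The following is a minimal sketch of that pattern, not the package's implementation: `policy` is a local stand-in type, and the wire names ("all", "last", "new") are assumptions made for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// policy is a hypothetical stand-in for an iota-based enum such as DeliverPolicy.
type policy int

const (
	policyAll policy = iota
	policyLast
	policyNew
)

// names maps each value to an assumed wire name.
var names = map[policy]string{
	policyAll:  "all",
	policyLast: "last",
	policyNew:  "new",
}

// String returns the wire name, or "unknown" for values outside the enum.
func (p policy) String() string {
	if s, ok := names[p]; ok {
		return s
	}
	return "unknown"
}

// MarshalJSON encodes the policy as a JSON string.
func (p policy) MarshalJSON() ([]byte, error) {
	if _, ok := names[p]; !ok {
		return nil, fmt.Errorf("cannot marshal policy %d", int(p))
	}
	return json.Marshal(p.String())
}

// UnmarshalJSON decodes a JSON string back into a policy value.
func (p *policy) UnmarshalJSON(data []byte) error {
	var s string
	if err := json.Unmarshal(data, &s); err != nil {
		return err
	}
	for v, name := range names {
		if name == s {
			*p = v
			return nil
		}
	}
	return fmt.Errorf("cannot unmarshal policy %q", s)
}

// roundTrip encodes a policy to JSON and decodes it back.
func roundTrip(p policy) (string, policy, error) {
	b, err := json.Marshal(p)
	if err != nil {
		return "", 0, err
	}
	var out policy
	if err := json.Unmarshal(b, &out); err != nil {
		return "", 0, err
	}
	return string(b), out, nil
}

func main() {
	enc, dec, err := roundTrip(policyNew)
	fmt.Println(enc, dec, err)
}
```

Rejecting unknown values in both directions keeps a misspelled policy from silently falling back to the zero value, which is the default policy.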

type DiscardPolicy

type DiscardPolicy int

DiscardPolicy determines how to proceed when limits of messages or bytes are reached.

const (
	// DiscardOld will remove older messages to return to the limits. This is
	// the default.
	DiscardOld DiscardPolicy = iota
	// DiscardNew will fail to store new messages.
	DiscardNew
)

func (DiscardPolicy) MarshalJSON

func (dp DiscardPolicy) MarshalJSON() ([]byte, error)

func (DiscardPolicy) String

func (dp DiscardPolicy) String() string

func (*DiscardPolicy) UnmarshalJSON

func (dp *DiscardPolicy) UnmarshalJSON(data []byte) error

type ErrorCode

type ErrorCode uint16

ErrorCode represents error_code returned in response from JetStream API

const (
	JSErrCodeJetStreamNotEnabledForAccount ErrorCode = 10039
	JSErrCodeJetStreamNotEnabled           ErrorCode = 10076

	JSErrCodeStreamNotFound  ErrorCode = 10059
	JSErrCodeStreamNameInUse ErrorCode = 10058

	JSErrCodeConsumerCreate        ErrorCode = 10012
	JSErrCodeConsumerNotFound      ErrorCode = 10014
	JSErrCodeConsumerNameExists    ErrorCode = 10013
	JSErrCodeConsumerAlreadyExists ErrorCode = 10105

	JSErrCodeMessageNotFound ErrorCode = 10037

	JSErrCodeBadRequest ErrorCode = 10003
)

type ExternalStream

type ExternalStream struct {
	APIPrefix     string `json:"api"`
	DeliverPrefix string `json:"deliver"`
}

ExternalStream allows you to qualify access to a stream source in another account.

type FetchOpt

type FetchOpt func(*pullRequest) error

func FetchMaxWait

func FetchMaxWait(timeout time.Duration) FetchOpt

FetchMaxWait sets custom timeout for fetching predefined batch of messages
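
FetchOpt follows the functional options pattern: each option is a function that mutates an unexported request and may return an error. The sketch below shows the general shape using local stand-ins (`request`, `opt`, `maxWait`, `newRequest` are all hypothetical); the 30 second default and the rejection of non-positive timeouts are assumptions, not documented behavior.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// request is a hypothetical stand-in for the unexported pullRequest type.
type request struct {
	batch   int
	maxWait time.Duration
}

// opt mirrors the shape of FetchOpt: a function that mutates the request.
type opt func(*request) error

// maxWait is a stand-in for FetchMaxWait.
// Rejecting non-positive timeouts is an assumption made for this sketch.
func maxWait(d time.Duration) opt {
	return func(r *request) error {
		if d <= 0 {
			return errors.New("timeout must be positive")
		}
		r.maxWait = d
		return nil
	}
}

// newRequest applies the options on top of defaults.
// The 30s default is illustrative only.
func newRequest(batch int, opts ...opt) (*request, error) {
	r := &request{batch: batch, maxWait: 30 * time.Second}
	for _, o := range opts {
		if err := o(r); err != nil {
			return nil, err
		}
	}
	return r, nil
}

func main() {
	r, err := newRequest(10, maxWait(2*time.Second))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(r.batch, r.maxWait)
}
```

Because options return an error, invalid values are reported when the request is built rather than when it is sent.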

type GetMsgOpt

type GetMsgOpt func(*apiMsgGetRequest) error

func WithGetMsgSubject

func WithGetMsgSubject(subject string) GetMsgOpt

WithGetMsgSubject sets the stream subject from which the message should be retrieved. The server returns the first message with a sequence number greater than or equal to the input seq that has the specified subject.
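
The lookup rule described for WithGetMsgSubject can be illustrated on an in-memory slice. This is only a model of the semantics: `stored` and `firstFrom` are hypothetical names, and exact subject matching (no wildcards) is a simplification.

```go
package main

import "fmt"

// stored is a hypothetical in-memory stand-in for a stream message.
type stored struct {
	Seq     uint64
	Subject string
}

// firstFrom returns the first message with Seq >= seq on the given subject.
// msgs must be ordered by Seq. Only exact subject matches are considered,
// which is a simplification made for this sketch.
func firstFrom(msgs []stored, seq uint64, subject string) (stored, bool) {
	for _, m := range msgs {
		if m.Seq >= seq && m.Subject == subject {
			return m, true
		}
	}
	return stored{}, false
}

func main() {
	msgs := []stored{
		{Seq: 1, Subject: "ORDERS.new"},
		{Seq: 2, Subject: "ORDERS.paid"},
		{Seq: 3, Subject: "ORDERS.new"},
	}
	m, ok := firstFrom(msgs, 2, "ORDERS.new")
	fmt.Println(m.Seq, ok)
}
```

In this model, asking for sequence 2 on "ORDERS.new" skips the message at sequence 2 because its subject differs, and returns the one at sequence 3.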

type JetStream

type JetStream interface {
	// Returns *AccountInfo, containing details about the account associated with this JetStream connection
	AccountInfo(ctx context.Context) (*AccountInfo, error)

	StreamConsumerManager
	StreamManager
	Publisher
}

JetStream contains CRUD methods to operate on a stream. Create, update and get operations return the 'Stream' interface, allowing operations on consumers.

CreateOrUpdateConsumer, Consumer and DeleteConsumer are helper methods used to create/fetch/remove a consumer without fetching the stream (bypassing the stream API).

Client returns a JetStreamClient, used to publish messages on a stream or fetch messages by sequence number.

func New

func New(nc *nats.Conn, opts ...JetStreamOpt) (JetStream, error)

New returns a new JetStream instance.

Available options:

  • WithClientTrace - enables request/response tracing.
  • WithPublishAsyncErrHandler - sets error handler for async message publish.
  • WithPublishAsyncMaxPending - sets the maximum outstanding async publishes that can be inflight at one time.
  • [WithDirectGet] - specifies whether client should use direct get requests.

func NewWithAPIPrefix

func NewWithAPIPrefix(nc *nats.Conn, apiPrefix string, opts ...JetStreamOpt) (JetStream, error)

NewWithAPIPrefix returns a new JetStream instance and sets the API prefix to be used in requests to the JetStream API.

Available options:

  • WithClientTrace - enables request/response tracing.
  • WithPublishAsyncErrHandler - sets error handler for async message publish.
  • WithPublishAsyncMaxPending - sets the maximum outstanding async publishes that can be inflight at one time.
  • [WithDirectGet] - specifies whether client should use direct get requests.

func NewWithDomain

func NewWithDomain(nc *nats.Conn, domain string, opts ...JetStreamOpt) (JetStream, error)

NewWithDomain returns a new JetStream instance and sets the domain name token used when sending JetStream requests.

Available options:

  • WithClientTrace - enables request/response tracing.
  • WithPublishAsyncErrHandler - sets error handler for async message publish.
  • WithPublishAsyncMaxPending - sets the maximum outstanding async publishes that can be inflight at one time.
  • [WithDirectGet] - specifies whether client should use direct get requests.

type JetStreamError

type JetStreamError interface {
	APIError() *APIError
	error
}

JetStreamError is an error result that happens when using JetStream. In case of a client-side error, APIError returns nil.
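
Since APIError returns nil for client-side errors, callers need a nil check before reading the error code. A minimal sketch of that check with `errors.As`, using local stand-ins (`apiError`, `jsError`, `wrapped` are hypothetical and only mirror the shapes described here):

```go
package main

import (
	"errors"
	"fmt"
)

// apiError and jsError are hypothetical stand-ins for APIError and JetStreamError.
type apiError struct {
	ErrorCode   uint16
	Description string
}

type jsError interface {
	APIError() *apiError
	error
}

// wrapped implements jsError; api stays nil for client-side errors.
type wrapped struct {
	msg string
	api *apiError
}

func (w *wrapped) Error() string       { return w.msg }
func (w *wrapped) APIError() *apiError { return w.api }

// codeStreamNotFound carries the value listed for JSErrCodeStreamNotFound.
const codeStreamNotFound uint16 = 10059

// isStreamNotFound reports whether err carries the stream-not-found API code.
func isStreamNotFound(err error) bool {
	var je jsError
	if !errors.As(err, &je) {
		return false
	}
	api := je.APIError()
	return api != nil && api.ErrorCode == codeStreamNotFound
}

func main() {
	serverErr := fmt.Errorf("lookup: %w", &wrapped{
		msg: "stream not found",
		api: &apiError{ErrorCode: codeStreamNotFound, Description: "stream not found"},
	})
	clientErr := &wrapped{msg: "invalid stream name"}
	fmt.Println(isStreamNotFound(serverErr), isStreamNotFound(clientErr))
}
```

Using `errors.As` rather than a type assertion keeps the check working when the error has been wrapped with `%w`.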

type JetStreamOpt

type JetStreamOpt func(*jsOpts) error

func WithClientTrace

func WithClientTrace(ct *ClientTrace) JetStreamOpt

WithClientTrace enables tracing of request/response API calls. ClientTrace is used to provide handlers for each event.

func WithPublishAsyncErrHandler

func WithPublishAsyncErrHandler(cb MsgErrHandler) JetStreamOpt

WithPublishAsyncErrHandler sets error handler for async message publish

func WithPublishAsyncMaxPending

func WithPublishAsyncMaxPending(max int) JetStreamOpt

WithPublishAsyncMaxPending sets the maximum outstanding async publishes that can be inflight at one time.

type MessageBatch

type MessageBatch interface {
	Messages() <-chan Msg
	Error() error
}
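
MessageBatch pairs a message channel with an Error method. One way to consume such a pair is to drain the channel first and check the error afterwards. The sketch below assumes the channel is closed once the batch is complete; `batch`, `newBatch` and `drain` are hypothetical stand-ins that carry plain strings instead of Msg values.

```go
package main

import (
	"errors"
	"fmt"
)

// batch is a hypothetical stand-in for MessageBatch that carries plain strings.
type batch struct {
	msgs chan string
	err  error
}

func (b *batch) Messages() <-chan string { return b.msgs }
func (b *batch) Error() error            { return b.err }

// newBatch preloads a closed channel, simulating a finished fetch.
func newBatch(err error, payloads ...string) *batch {
	ch := make(chan string, len(payloads))
	for _, p := range payloads {
		ch <- p
	}
	close(ch)
	return &batch{msgs: ch, err: err}
}

// drain reads until the channel closes, then reports the batch error.
func drain(b *batch) ([]string, error) {
	var got []string
	for m := range b.Messages() {
		got = append(got, m)
	}
	return got, b.Error()
}

func main() {
	got, err := drain(newBatch(errors.New("timeout"), "a", "b"))
	fmt.Println(got, err)
}
```

A batch can deliver some messages and still end with an error, so both return values are worth inspecting.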

type MessageHandler

type MessageHandler func(msg Msg)

MessageHandler is a handler function used as callback in [Consume]

type MessagesContext

type MessagesContext interface {
	// Next retrieves the next message on a stream. It will block until the next message is available.
	Next() (Msg, error)
	// Stop closes the iterator and cancels subscription.
	Stop()
}

MessagesContext supports iterating over messages on a stream.
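
A typical iteration loop calls Next until enough messages have been read or an error occurs, and always calls Stop at the end. The sketch below models this with a hypothetical `iterator` interface over plain strings; unlike the real Next, the stand-in `sliceIter` returns an error instead of blocking when no messages remain.

```go
package main

import (
	"errors"
	"fmt"
)

// iterator is a hypothetical stand-in for MessagesContext over plain strings.
type iterator interface {
	Next() (string, error)
	Stop()
}

var errClosed = errors.New("iterator closed")

// sliceIter serves messages from a slice. The real Next blocks when no
// message is available; this stand-in returns errClosed instead.
type sliceIter struct {
	msgs    []string
	stopped bool
}

func (s *sliceIter) Next() (string, error) {
	if s.stopped || len(s.msgs) == 0 {
		return "", errClosed
	}
	m := s.msgs[0]
	s.msgs = s.msgs[1:]
	return m, nil
}

func (s *sliceIter) Stop() { s.stopped = true }

// take reads up to n messages and always stops the iterator afterwards.
func take(it iterator, n int) ([]string, error) {
	defer it.Stop()
	var out []string
	for len(out) < n {
		m, err := it.Next()
		if err != nil {
			return out, err
		}
		out = append(out, m)
	}
	return out, nil
}

func main() {
	got, err := take(&sliceIter{msgs: []string{"a", "b", "c"}}, 2)
	fmt.Println(got, err)
}
```

Deferring Stop ensures the iterator is released on every return path, including early exits on error.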

type Msg

type Msg interface {
	// Metadata returns [MsgMetadata] for a JetStream message
	Metadata() (*MsgMetadata, error)
	// Data returns the message body
	Data() []byte
	// Headers returns a map of headers for a message
	Headers() nats.Header
	// Subject returns a subject on which a message is published
	Subject() string
	// Reply returns a reply subject for a message
	Reply() string

	// Ack acknowledges a message
	// This tells the server that the message was successfully processed and it can move on to the next message
	Ack() error
	// DoubleAck acknowledges a message and waits for ack from server
	DoubleAck(context.Context) error
	// Nak negatively acknowledges a message
	// This tells the server to redeliver the message
	Nak() error
	// NakWithDelay negatively acknowledges a message
	// This tells the server to redeliver the message
	// after the given `delay` duration
	NakWithDelay(delay time.Duration) error
	// InProgress tells the server that this message is being worked on
	// It resets the redelivery timer on the server
	InProgress() error
	// Term tells the server to not redeliver this message, regardless of the value of nats.MaxDeliver
	Term() error
}

Msg contains methods to operate on a JetStream message. Metadata, Data, Headers, Subject and Reply can be used to retrieve the specific parts of the underlying message. Ack, DoubleAck, Nak, NakWithDelay, InProgress and Term are various flavors of ack requests.
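
The acknowledgement flavors map naturally onto processing outcomes. The sketch below shows one possible policy (success acks, a permanent failure terminates, anything else is redelivered); it is an illustration rather than a recommended rule, and `acker`, `settle`, `fakeMsg` and the two sentinel errors are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// acker is a hypothetical subset of the Msg interface used for acknowledgements.
type acker interface {
	Data() []byte
	Ack() error
	Nak() error
	Term() error
}

var (
	errPermanent = errors.New("permanent failure")
	errTransient = errors.New("transient failure")
)

// settle acknowledges a message according to the processing result:
// success -> Ack, permanent failure -> Term, anything else -> Nak (redeliver).
func settle(m acker, process func([]byte) error) error {
	err := process(m.Data())
	switch {
	case err == nil:
		return m.Ack()
	case errors.Is(err, errPermanent):
		return m.Term()
	default:
		return m.Nak()
	}
}

// fakeMsg records which acknowledgement was sent.
type fakeMsg struct {
	data []byte
	last string
}

func (f *fakeMsg) Data() []byte { return f.data }
func (f *fakeMsg) Ack() error   { f.last = "ack"; return nil }
func (f *fakeMsg) Nak() error   { f.last = "nak"; return nil }
func (f *fakeMsg) Term() error  { f.last = "term"; return nil }

func main() {
	m := &fakeMsg{data: []byte("order-1")}
	_ = settle(m, func([]byte) error { return errTransient })
	fmt.Println(m.last)
}
```

Terminating on permanent failures avoids redelivering a message that can never be processed.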

type MsgErrHandler

type MsgErrHandler func(JetStream, *nats.Msg, error)

MsgErrHandler is used to process asynchronous errors from JetStream PublishAsync. It will return the original message sent to the server for possible retransmitting and the error encountered.

type MsgMetadata

type MsgMetadata struct {
	Sequence     SequencePair
	NumDelivered uint64
	NumPending   uint64
	Timestamp    time.Time
	Stream       string
	Consumer     string
	Domain       string
}

MsgMetadata is the JetStream metadata associated with received messages.

type OrderedConsumerConfig

type OrderedConsumerConfig struct {
	FilterSubjects    []string      `json:"filter_subjects,omitempty"`
	DeliverPolicy     DeliverPolicy `json:"deliver_policy"`
	OptStartSeq       uint64        `json:"opt_start_seq,omitempty"`
	OptStartTime      *time.Time    `json:"opt_start_time,omitempty"`
	ReplayPolicy      ReplayPolicy  `json:"replay_policy"`
	InactiveThreshold time.Duration `json:"inactive_threshold,omitempty"`
	HeadersOnly       bool          `json:"headers_only,omitempty"`

	// Maximum number of attempts for the consumer to be recreated
	// Defaults to unlimited
	MaxResetAttempts int
}

type PeerInfo

type PeerInfo struct {
	Name    string        `json:"name"`
	Current bool          `json:"current"`
	Offline bool          `json:"offline,omitempty"`
	Active  time.Duration `json:"active"`
	Lag     uint64        `json:"lag,omitempty"`
}

PeerInfo shows information about all the peers in the cluster that are supporting the stream or consumer.

type Placement

type Placement struct {
	Cluster string   `json:"cluster"`
	Tags    []string `json:"tags,omitempty"`
}

Placement is used to guide placement of streams in clustered JetStream.

type PubAck

type PubAck struct {
	Stream    string `json:"stream"`
	Sequence  uint64 `json:"seq"`
	Duplicate bool   `json:"duplicate,omitempty"`
	Domain    string `json:"domain,omitempty"`
}

PubAck is an ack received after successfully publishing a message.

type PubAckFuture

type PubAckFuture interface {
	// Ok returns a receive only channel that can be used to get a PubAck.
	Ok() <-chan *PubAck

	// Err returns a receive only channel that can be used to get the error from an async publish.
	Err() <-chan error

	// Msg returns the message that was sent to the server.
	Msg() *nats.Msg
}

PubAckFuture is a future for a PubAck.
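
A future exposing Ok and Err channels is usually resolved with a `select`. The sketch below adds a timeout branch so a caller never waits indefinitely; `pubAck`, `future`, `await` and `errAckTimeout` are hypothetical stand-ins, and the stream name "ORDERS" is an example value.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// pubAck and future are hypothetical stand-ins for PubAck and PubAckFuture.
type pubAck struct {
	Stream   string
	Sequence uint64
}

type future struct {
	ok  chan *pubAck
	err chan error
}

func (f *future) Ok() <-chan *pubAck { return f.ok }
func (f *future) Err() <-chan error  { return f.err }

var errAckTimeout = errors.New("timed out waiting for ack")

// await blocks until the future resolves or the timeout elapses.
func await(f *future, timeout time.Duration) (*pubAck, error) {
	select {
	case ack := <-f.Ok():
		return ack, nil
	case err := <-f.Err():
		return nil, err
	case <-time.After(timeout):
		return nil, errAckTimeout
	}
}

func main() {
	f := &future{ok: make(chan *pubAck, 1), err: make(chan error, 1)}
	f.ok <- &pubAck{Stream: "ORDERS", Sequence: 7}
	ack, err := await(f, time.Second)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(ack.Stream, ack.Sequence)
}
```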

type PublishOpt

type PublishOpt func(*pubOpts) error

func WithExpectLastMsgID

func WithExpectLastMsgID(id string) PublishOpt

WithExpectLastMsgID sets the expected last msgId in the response from the publish.

func WithExpectLastSequence

func WithExpectLastSequence(seq uint64) PublishOpt

WithExpectLastSequence sets the expected sequence in the response from the publish.

func WithExpectLastSequencePerSubject

func WithExpectLastSequencePerSubject(seq uint64) PublishOpt

WithExpectLastSequencePerSubject sets the expected sequence per subject in the response from the publish.

func WithExpectStream

func WithExpectStream(stream string) PublishOpt

WithExpectStream sets the stream expected to respond to the publish.

func WithMsgID

func WithMsgID(id string) PublishOpt

WithMsgID sets the message ID used for deduplication.

func WithRetryAttempts

func WithRetryAttempts(num int) PublishOpt

WithRetryAttempts sets the number of retry attempts when ErrNoResponders is encountered.

func WithRetryWait

func WithRetryWait(dur time.Duration) PublishOpt

WithRetryWait sets the retry wait time when ErrNoResponders is encountered.

func WithStallWait

func WithStallWait(ttl time.Duration) PublishOpt

WithStallWait sets the max wait when the producer stalls while producing messages.

type Publisher

type Publisher interface {
	// Publish performs a synchronous publish to a stream and waits for ack from server
	// It accepts subject name (which must be bound to a stream) and message data
	Publish(ctx context.Context, subject string, payload []byte, opts ...PublishOpt) (*PubAck, error)
	// PublishMsg performs a synchronous publish to a stream and waits for ack from server
	// It accepts a nats.Msg, whose subject must be bound to a stream
	PublishMsg(ctx context.Context, msg *nats.Msg, opts ...PublishOpt) (*PubAck, error)
	// PublishAsync performs an asynchronous publish to a stream and returns [PubAckFuture] interface
	// It accepts subject name (which must be bound to a stream) and message data
	PublishAsync(subject string, payload []byte, opts ...PublishOpt) (PubAckFuture, error)
	// PublishMsgAsync performs an asynchronous publish to a stream and returns [PubAckFuture] interface
	// It accepts a nats.Msg, whose subject must be bound to a stream
	PublishMsgAsync(msg *nats.Msg, opts ...PublishOpt) (PubAckFuture, error)
	// PublishAsyncPending returns the number of async publishes outstanding for this context
	PublishAsyncPending() int
	// PublishAsyncComplete returns a channel that will be closed when all outstanding messages are ack'd
	PublishAsyncComplete() <-chan struct{}
}

type PullConsumeOpt

type PullConsumeOpt interface {
	// contains filtered or unexported methods
}

PullConsumeOpt represents additional options used in [Consume] for pull consumers.

func ConsumeErrHandler

func ConsumeErrHandler(cb ConsumeErrHandlerFunc) PullConsumeOpt

ConsumeErrHandler sets a custom error handler, invoked when an error is encountered while consuming messages. It will be invoked for both terminal (Consumer Deleted, invalid request body) and non-terminal (e.g. missing heartbeats) errors.

type PullExpiry

type PullExpiry time.Duration

PullExpiry sets timeout on a single batch request, waiting until at least one message is available

type PullHeartbeat

type PullHeartbeat time.Duration

PullHeartbeat sets the idle heartbeat duration for a pull subscription. If a client does not receive a heartbeat message from a stream for more than the idle heartbeat setting, the subscription will be removed and an error will be passed to the message handler.

type PullMaxBytes

type PullMaxBytes int

PullMaxBytes sets max_bytes limit on a fetch request

type PullMaxMessages

type PullMaxMessages int

PullMaxMessages limits the number of messages to be fetched from the stream in one request. If not provided, a default of 100 messages will be used.

type PullMessagesOpt

type PullMessagesOpt interface {
	// contains filtered or unexported methods
}

PullMessagesOpt represents additional options used in [Messages] for pull consumers.

func WithMessagesErrOnMissingHeartbeat

func WithMessagesErrOnMissingHeartbeat(hbErr bool) PullMessagesOpt

WithMessagesErrOnMissingHeartbeat sets whether a missing heartbeat error should be reported when calling Next (Default: true).

type PullThresholdBytes

type PullThresholdBytes int

PullThresholdBytes sets the byte count on which Consume will trigger new pull request to the server. Defaults to 50% of MaxBytes (if set).

type PullThresholdMessages

type PullThresholdMessages int

PullThresholdMessages sets the message count on which Consume will trigger new pull request to the server. Defaults to 50% of MaxMessages.

type RawStreamMsg

type RawStreamMsg struct {
	Subject  string
	Sequence uint64
	Header   nats.Header
	Data     []byte
	Time     time.Time
}

type RePublish

type RePublish struct {
	Source      string `json:"src,omitempty"`
	Destination string `json:"dest"`
	HeadersOnly bool   `json:"headers_only,omitempty"`
}

RePublish is for republishing messages once committed to a stream. The original subject is remapped from the subject pattern to the destination pattern.

type ReplayPolicy

type ReplayPolicy int

ReplayPolicy determines how the consumer should replay messages it already has queued in the stream.

const (
	// ReplayInstantPolicy will replay messages as fast as possible.
	ReplayInstantPolicy ReplayPolicy = iota

	// ReplayOriginalPolicy will maintain the same timing as the messages were received.
	ReplayOriginalPolicy
)

func (ReplayPolicy) MarshalJSON

func (p ReplayPolicy) MarshalJSON() ([]byte, error)

func (ReplayPolicy) String

func (p ReplayPolicy) String() string

func (*ReplayPolicy) UnmarshalJSON

func (p *ReplayPolicy) UnmarshalJSON(data []byte) error

type RetentionPolicy

type RetentionPolicy int

RetentionPolicy determines how messages in a set are retained.

const (
	// LimitsPolicy (default) means that messages are retained until any given limit is reached.
	// This could be one of MaxMsgs, MaxBytes, or MaxAge.
	LimitsPolicy RetentionPolicy = iota
	// InterestPolicy specifies that when all known observables have acknowledged a message it can be removed.
	InterestPolicy
	// WorkQueuePolicy specifies that when the first worker or subscriber acknowledges the message it can be removed.
	WorkQueuePolicy
)

func (RetentionPolicy) MarshalJSON

func (rp RetentionPolicy) MarshalJSON() ([]byte, error)

func (RetentionPolicy) String

func (rp RetentionPolicy) String() string

func (*RetentionPolicy) UnmarshalJSON

func (rp *RetentionPolicy) UnmarshalJSON(data []byte) error

type SequenceInfo

type SequenceInfo struct {
	Consumer uint64     `json:"consumer_seq"`
	Stream   uint64     `json:"stream_seq"`
	Last     *time.Time `json:"last_active,omitempty"`
}

SequenceInfo has both the consumer and the stream sequence and last activity.

type SequencePair

type SequencePair struct {
	Consumer uint64 `json:"consumer_seq"`
	Stream   uint64 `json:"stream_seq"`
}

SequencePair includes the consumer and stream sequence info from a JetStream consumer.

type StorageType

type StorageType int

StorageType determines how messages are stored for retention.

const (
	// FileStorage specifies on disk storage. It's the default.
	FileStorage StorageType = iota
	// MemoryStorage specifies in memory only.
	MemoryStorage
)

func (StorageType) MarshalJSON

func (st StorageType) MarshalJSON() ([]byte, error)

func (StorageType) String

func (st StorageType) String() string

func (*StorageType) UnmarshalJSON

func (st *StorageType) UnmarshalJSON(data []byte) error

type Stream

type Stream interface {

	// Info returns stream details
	Info(ctx context.Context, opts ...StreamInfoOpt) (*StreamInfo, error)
	// CachedInfo returns *StreamInfo cached on a stream struct
	CachedInfo() *StreamInfo

	// Purge removes messages from a stream
	Purge(ctx context.Context, opts ...StreamPurgeOpt) error

	// GetMsg retrieves a raw stream message stored in JetStream by sequence number
	GetMsg(ctx context.Context, seq uint64, opts ...GetMsgOpt) (*RawStreamMsg, error)
	// GetLastMsgForSubject retrieves the last raw stream message stored in JetStream by subject
	GetLastMsgForSubject(ctx context.Context, subject string) (*RawStreamMsg, error)
	// DeleteMsg deletes a message from a stream.
	// The message is marked as erased, but not overwritten
	DeleteMsg(ctx context.Context, seq uint64) error
	// SecureDeleteMsg deletes a message from a stream. The deleted message is overwritten with random data
	// As a result, this operation is slower than DeleteMsg()
	SecureDeleteMsg(ctx context.Context, seq uint64) error
	// contains filtered or unexported methods
}

Stream contains CRUD methods on a consumer, as well as operations on an existing stream

type StreamConfig

type StreamConfig struct {
	Name                 string          `json:"name"`
	Description          string          `json:"description,omitempty"`
	Subjects             []string        `json:"subjects,omitempty"`
	Retention            RetentionPolicy `json:"retention"`
	MaxConsumers         int             `json:"max_consumers"`
	MaxMsgs              int64           `json:"max_msgs"`
	MaxBytes             int64           `json:"max_bytes"`
	Discard              DiscardPolicy   `json:"discard"`
	DiscardNewPerSubject bool            `json:"discard_new_per_subject,omitempty"`
	MaxAge               time.Duration   `json:"max_age"`
	MaxMsgsPerSubject    int64           `json:"max_msgs_per_subject"`
	MaxMsgSize           int32           `json:"max_msg_size,omitempty"`
	Storage              StorageType     `json:"storage"`
	Replicas             int             `json:"num_replicas"`
	NoAck                bool            `json:"no_ack,omitempty"`
	Template             string          `json:"template_owner,omitempty"`
	Duplicates           time.Duration   `json:"duplicate_window,omitempty"`
	Placement            *Placement      `json:"placement,omitempty"`
	Mirror               *StreamSource   `json:"mirror,omitempty"`
	Sources              []*StreamSource `json:"sources,omitempty"`
	Sealed               bool            `json:"sealed,omitempty"`
	DenyDelete           bool            `json:"deny_delete,omitempty"`
	DenyPurge            bool            `json:"deny_purge,omitempty"`
	AllowRollup          bool            `json:"allow_rollup_hdrs,omitempty"`

	// Allow republish of the message after being sequenced and stored.
	RePublish *RePublish `json:"republish,omitempty"`

	// Allow higher performance, direct access to get individual messages. E.g. KeyValue
	AllowDirect bool `json:"allow_direct"`
	// Allow higher performance and unified direct access for mirrors as well.
	MirrorDirect bool `json:"mirror_direct"`
}

type StreamConsumerManager

type StreamConsumerManager interface {
	// CreateOrUpdateConsumer creates a consumer on a given stream with given config.
	// If consumer already exists, it will be updated (if possible).
	// Consumer interface is returned, serving as a hook to operate on a consumer (e.g. fetch messages)
	CreateOrUpdateConsumer(ctx context.Context, stream string, cfg ConsumerConfig) (Consumer, error)
	// OrderedConsumer returns an OrderedConsumer instance.
	// OrderedConsumer allows fetching messages from a stream (just like standard consumer),
	// for in order delivery of messages. Underlying consumer is re-created when necessary,
	// without additional client code.
	OrderedConsumer(ctx context.Context, stream string, cfg OrderedConsumerConfig) (Consumer, error)
	// Consumer returns a hook to an existing consumer, allowing processing of messages
	Consumer(ctx context.Context, stream string, consumer string) (Consumer, error)
	// DeleteConsumer removes a consumer with given name from a stream
	DeleteConsumer(ctx context.Context, stream string, consumer string) error
}

type StreamInfo

type StreamInfo struct {
	Config  StreamConfig        `json:"config"`
	Created time.Time           `json:"created"`
	State   StreamState         `json:"state"`
	Cluster *ClusterInfo        `json:"cluster,omitempty"`
	Mirror  *StreamSourceInfo   `json:"mirror,omitempty"`
	Sources []*StreamSourceInfo `json:"sources,omitempty"`
}

StreamInfo shows config and current state for this stream.

type StreamInfoLister

type StreamInfoLister interface {
	Info() <-chan *StreamInfo
	Err() error
}

type StreamInfoOpt

type StreamInfoOpt func(*streamInfoRequest) error

func WithDeletedDetails

func WithDeletedDetails(deletedDetails bool) StreamInfoOpt

WithDeletedDetails can be used to display the information about messages deleted from a stream on a stream info request

func WithSubjectFilter

func WithSubjectFilter(subject string) StreamInfoOpt

WithSubjectFilter can be used to display the information about messages stored on given subjects

type StreamListOpt

type StreamListOpt func(*streamsRequest) error

func WithStreamListSubject

func WithStreamListSubject(subject string) StreamListOpt

WithStreamListSubject can be used to filter results of ListStreams and StreamNames requests to only streams that have given subject in their configuration

type StreamManager

type StreamManager interface {
	// CreateStream creates a new stream with given config and returns a hook to operate on it
	CreateStream(ctx context.Context, cfg StreamConfig) (Stream, error)
	// UpdateStream updates an existing stream
	UpdateStream(ctx context.Context, cfg StreamConfig) (Stream, error)
	// Stream returns a [Stream] hook for a given stream name
	Stream(ctx context.Context, stream string) (Stream, error)
	// StreamNameBySubject returns the name of the stream listening on a given subject
	StreamNameBySubject(ctx context.Context, subject string) (string, error)
	// DeleteStream removes a stream with given name
	DeleteStream(ctx context.Context, stream string) error
	// ListStreams returns StreamInfoLister enabling iterating over a channel of stream infos
	ListStreams(context.Context, ...StreamListOpt) StreamInfoLister
	// StreamNames returns a StreamNameLister enabling iterating over a channel of stream names
	StreamNames(context.Context, ...StreamListOpt) StreamNameLister
}

type StreamNameLister

type StreamNameLister interface {
	Name() <-chan string
	Err() error
}

type StreamPurgeOpt

type StreamPurgeOpt func(*StreamPurgeRequest) error

func WithPurgeKeep

func WithPurgeKeep(keep uint64) StreamPurgeOpt

WithPurgeKeep sets the number of messages to be kept in the stream after purge. Can be combined with the WithPurgeSubject option, but not with WithPurgeSequence.

func WithPurgeSequence

func WithPurgeSequence(sequence uint64) StreamPurgeOpt

WithPurgeSequence sets a specific sequence number up to which (but not including) messages will be purged from a stream. Can be combined with the WithPurgeSubject option, but not with WithPurgeKeep.

func WithPurgeSubject

func WithPurgeSubject(subject string) StreamPurgeOpt

WithPurgeSubject sets a specific subject for which messages on a stream will be purged

type StreamPurgeRequest

type StreamPurgeRequest struct {
	// Purge up to but not including sequence.
	Sequence uint64 `json:"seq,omitempty"`
	// Subject to match against messages for the purge command.
	Subject string `json:"filter,omitempty"`
	// Number of messages to keep.
	Keep uint64 `json:"keep,omitempty"`
}
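
The JSON tags on StreamPurgeRequest mean that unset fields are omitted from the request body. The sketch below copies the struct's fields and tags into a local type (`purgeRequest`) and adds a hypothetical `encode` helper that rejects the keep/sequence combination described for the purge options. Where the real client performs that check is not shown here, and "ORDERS.new" is an example subject.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// purgeRequest copies the fields and JSON tags of StreamPurgeRequest.
type purgeRequest struct {
	Sequence uint64 `json:"seq,omitempty"`
	Subject  string `json:"filter,omitempty"`
	Keep     uint64 `json:"keep,omitempty"`
}

// encode rejects the invalid keep+sequence combination before marshalling.
// This validation is part of the sketch, not a confirmed client behavior.
func encode(r purgeRequest) (string, error) {
	if r.Keep != 0 && r.Sequence != 0 {
		return "", errors.New("keep and sequence cannot be combined")
	}
	b, err := json.Marshal(r)
	return string(b), err
}

func main() {
	fmt.Println(encode(purgeRequest{Subject: "ORDERS.new", Keep: 10}))
	fmt.Println(encode(purgeRequest{Sequence: 100, Keep: 10}))
}
```

An empty request marshals to `{}`, which corresponds to purging without any filter.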

type StreamSource

type StreamSource struct {
	Name          string          `json:"name"`
	OptStartSeq   uint64          `json:"opt_start_seq,omitempty"`
	OptStartTime  *time.Time      `json:"opt_start_time,omitempty"`
	FilterSubject string          `json:"filter_subject,omitempty"`
	External      *ExternalStream `json:"external,omitempty"`
	Domain        string          `json:"-"`
}

StreamSource dictates how streams can source from other streams.

type StreamSourceInfo

type StreamSourceInfo struct {
	Name   string        `json:"name"`
	Lag    uint64        `json:"lag"`
	Active time.Duration `json:"active"`
}

StreamSourceInfo shows information about an upstream stream source.

type StreamState

type StreamState struct {
	Msgs        uint64            `json:"messages"`
	Bytes       uint64            `json:"bytes"`
	FirstSeq    uint64            `json:"first_seq"`
	FirstTime   time.Time         `json:"first_ts"`
	LastSeq     uint64            `json:"last_seq"`
	LastTime    time.Time         `json:"last_ts"`
	Consumers   int               `json:"consumer_count"`
	Deleted     []uint64          `json:"deleted"`
	NumDeleted  int               `json:"num_deleted"`
	NumSubjects uint64            `json:"num_subjects"`
	Subjects    map[string]uint64 `json:"subjects"`
}

StreamState is information about the given stream.
