jsm

package module
v0.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 13, 2024 License: Apache-2.0 Imports: 23 Imported by: 51

README

Overview

This is a Go based library to manage and interact with JetStream.

This package is the underlying library for the nats CLI, our Terraform provider, GitHub Actions and Kubernetes CRDs. It's essentially a direct wrapping of the JetStream API with few userfriendly features and requires deep technical knowledge of the JetStream internals.

For typical end users we suggest the nats.go package.

Initialization

This package is modeled as a Manager instance that receives a NATS Connection and sets default timeouts and validation for all interaction with JetStream.

Multiple Managers can be used in your application each with own timeouts and connection.

mgr, _ := jsm.New(nc, jsm.WithTimeout(10*time.Second))

This creates a Manager with a 10 second timeout when accessing the JetStream API. All examples below assume a manager was created as above.

Schema Registry

All the JetStream API messages and some events and advisories produced by the NATS Server have JSON Schemas associated with them, the api package has a Schema Registry and helpers to discover and interact with these.

The Schema Registry can be accessed on the cli in the nats schemas command where you can list, search and view schemas and validate data based on schemas.

Example Message

To retrieve the Stream State for a specific Stream one accesses the $JS.API.STREAM.INFO.<stream> API, this will respond with data like below:

{
  "type": "io.nats.jetstream.api.v1.stream_info_response",
  "config": {
    "name": "TESTING",
    "subjects": [
      "js.in.testing"
    ],
    "retention": "limits",
    "max_consumers": -1,
    "max_msgs": -1,
    "max_bytes": -1,
    "discard": "old",
    "max_age": 0,
    "max_msg_size": -1,
    "storage": "file",
    "num_replicas": 1,
    "duplicate_window": 120000000000
  },
  "created": "2020-10-09T12:40:07.648216464Z",
  "state": {
    "messages": 1,
    "bytes": 81,
    "first_seq": 1017,
    "first_ts": "2020-10-09T19:43:40.867729419Z",
    "last_seq": 1017,
    "last_ts": "2020-10-09T19:43:40.867729419Z",
    "consumer_count": 1
  }
}

Here the type of the message is io.nats.jetstream.api.v1.stream_info_response, the API package can help parse this into the correct format.

Message Schemas

Given a message kind one can retrieve the full JSON Schema as bytes:

schema, _ := api.Schema("io.nats.jetstream.api.v1.stream_info_response")

Once can also retrieve it based on a specific message content:

schemaType, _ := api.SchemaTypeForMessage(m.Data)
schema, _ := api.Schema(schemaType)

Several other Schema related helpers exist to search Schemas, fine URLs and more. See the api Reference.

Parsing Message Content

JetStream will produce metrics about message Acknowledgments, API audits and more, here we subscribe to the metric subject and print a specific received message type.

nc.Subscribe("$JS.EVENT.ADVISORY.>", func(m *nats.Msg){
    kind, msg, _ := api.ParseMessage(m.Data)
    log.Printf("Received message of type %s", kind) // io.nats.jetstream.advisory.v1.api_audit

    switch e := event.(type){
    case advisory.JetStreamAPIAuditV1:
        fmt.Printf("Audit event on subject %s from %s\n", e.Subject, e.Client.Name)
    }
})

Above we gain full access to all contents of the message in it's native format, but we need to know in advance what we will get, we can render the messages as text in a generic way though:

nc.Subscribe("$JS.EVENT.ADVISORY.>", func(m *nats.Msg){
    kind, msg, _ := api.ParseMessage(m.Data)

    if kind == "io.nats.unknown_message" {
        return // a message without metadata or of a unknown format was received
    }

    ne, ok := event.(api.Event)
    if !ok {
        return fmt.Errorf("event %q does not implement the Event interface", kind)
    }

    err = api.RenderEvent(os.Stdout, ne, api.TextCompactFormat)
    if err != nil {
        return fmt.Errorf("display failed: %s", err)
    }
})

This will produce output like:

11:25:49 [JS API] $JS.API.STREAM.INFO.TESTING $G
11:25:52 [JS API] $JS.API.STREAM.NAMES $G
11:25:52 [JS API] $JS.API.STREAM.NAMES $G
11:25:53 [JS API] $JS.API.STREAM.INFO.TESTING $G

The api.TextCompactFormat is one of a few we support, also api.TextExtendedFormat for a full multi line format, api.ApplicationCloudEventV1Format for CloudEvents v1 format and api.ApplicationJSONFormat for JSON.

API Validation

The data structures sent to JetStream can be validated before submission to NATS which can speed up user feedback and provide better errors.

type SchemaValidator struct{}

func (v SchemaValidator) ValidateStruct(data any, schemaType string) (ok bool, errs []string) {
	s, err := api.Schema(schemaType)
	if err != nil {
		return false, []string{"unknown schema type %s", schemaType}
	}

	ls := gojsonschema.NewBytesLoader(s)
	ld := gojsonschema.NewGoLoader(data)
	result, err := gojsonschema.Validate(ls, ld)
	if err != nil {
		return false, []string{fmt.Sprintf("validation failed: %s", err)}
	}

	if result.Valid() {
		return true, nil
	}

	errors := make([]string, len(result.Errors()))
	for i, verr := range result.Errors() {
		errors[i] = verr.String()
	}

	return false, errors
}

This is a api.StructValidator implementation that uses JSON Schema to do deep validation of the structures sent to JetStream.

This can be used by the Manager to validate all API access.

mgr, _ := jsm.New(nc, jsm.WithAPIValidation(new(SchemaValidator)))

Documentation

Overview

Package jsm provides client helpers for managing and interacting with NATS JetStream

Index

Constants

This section is empty.

Variables

View Source
var DefaultConsumer = api.ConsumerConfig{
	DeliverPolicy: api.DeliverAll,
	AckPolicy:     api.AckExplicit,
	AckWait:       30 * time.Second,
	ReplayPolicy:  api.ReplayInstant,
}

DefaultConsumer is the configuration that will be used to create new Consumers in NewConsumer

View Source
var DefaultStream = api.StreamConfig{
	Retention:    api.LimitsPolicy,
	Discard:      api.DiscardOld,
	MaxConsumers: -1,
	MaxMsgs:      -1,
	MaxMsgsPer:   -1,
	MaxBytes:     -1,
	MaxAge:       24 * 365 * time.Hour,
	MaxMsgSize:   -1,
	Replicas:     1,
	NoAck:        false,
}

DefaultStream is a template configuration with StreamPolicy retention and 1 years maximum age. No storage type or subjects are set

View Source
var DefaultStreamConfiguration = DefaultStream

DefaultStreamConfiguration is the configuration that will be used to create new Streams in NewStream

View Source
var DefaultWorkQueue = api.StreamConfig{
	Retention:    api.WorkQueuePolicy,
	Discard:      api.DiscardOld,
	MaxConsumers: -1,
	MaxMsgs:      -1,
	MaxMsgsPer:   -1,
	MaxBytes:     -1,
	MaxAge:       24 * 365 * time.Hour,
	MaxMsgSize:   -1,
	Replicas:     api.StreamDefaultReplicas,
	NoAck:        false,
}

DefaultWorkQueue is a template configuration with WorkQueuePolicy retention and 1 years maximum age. No storage type or subjects are set

View Source
var ErrAckStreamIngestsAll = fmt.Errorf("configuration validation failed: streams with no_ack false may not have '>' or '*' as subjects")
View Source
var ErrMemoryStreamNotSupported = errors.New("memory streams do not support snapshots")

ErrMemoryStreamNotSupported is an error indicating a memory stream was being snapshotted which is not supported

View Source
var SampledDefaultConsumer = api.ConsumerConfig{
	DeliverPolicy:   api.DeliverAll,
	AckPolicy:       api.AckExplicit,
	AckWait:         30 * time.Second,
	ReplayPolicy:    api.ReplayInstant,
	SampleFrequency: "100%",
}

SampledDefaultConsumer is the configuration that will be used to create new Consumers in NewConsumer

Functions

func APISubject added in v0.0.21

func APISubject(subject string, prefix string, domain string) string

APISubject returns API subject with prefix applied

func EventSubject added in v0.0.21

func EventSubject(subject string, prefix string) string

EventSubject returns Event subject with prefix applied

func IsErrorResponse

func IsErrorResponse(m *nats.Msg) bool

IsErrorResponse checks if the message holds a standard JetStream error

func IsInternalStream added in v0.0.27

func IsInternalStream(s string) bool

IsInternalStream indicates if a stream is considered 'internal' by the NATS team, that is, it's a backing stream for KV, Object or MQTT state

func IsKVBucketStream added in v0.0.27

func IsKVBucketStream(s string) bool

IsKVBucketStream determines if a stream is a KV bucket

func IsMQTTStateStream added in v0.0.27

func IsMQTTStateStream(s string) bool

IsMQTTStateStream determines if a stream holds internal MQTT state

func IsNatsError added in v0.0.25

func IsNatsError(err error, code uint16) bool

IsNatsError checks if err is a ApiErr matching code

func IsOKResponse

func IsOKResponse(m *nats.Msg) bool

IsOKResponse checks if the message holds a standard JetStream error

func IsObjectBucketStream added in v0.0.27

func IsObjectBucketStream(s string) bool

IsObjectBucketStream determines if a stream is a Object bucket

func IsValidName added in v0.0.18

func IsValidName(n string) bool

IsValidName verifies if n is a valid stream, template or consumer name

func LinearBackoffPeriods added in v0.0.29

func LinearBackoffPeriods(steps uint, min time.Duration, max time.Duration) ([]time.Duration, error)

LinearBackoffPeriods creates a backoff policy without any jitter suitable for use in a consumer backoff policy

The periods start from min and increase linearly until ~max

func NewConsumerConfiguration

func NewConsumerConfiguration(dflt api.ConsumerConfig, opts ...ConsumerOption) (*api.ConsumerConfig, error)

NewConsumerConfiguration generates a new configuration based on template modified by opts

func NewStreamConfiguration

func NewStreamConfiguration(template api.StreamConfig, opts ...StreamOption) (*api.StreamConfig, error)

NewStreamConfiguration generates a new configuration based on template modified by opts

func NextSubject

func NextSubject(stream string, consumer string) (string, error)

NextSubject returns the subject used to retrieve the next message for pull-based Consumers, empty when not a pull-base consumer

func ParseErrorResponse

func ParseErrorResponse(m *nats.Msg) error

ParseErrorResponse parses the JetStream response, if it's an error returns an error instance holding the message else nil

func ParseEvent

func ParseEvent(e []byte) (schema string, event any, err error)

ParseEvent parses event e and returns event as for example *api.ConsumerAckMetric, all unknown event schemas will be of type *UnknownMessage

func ParsePubAck added in v0.0.25

func ParsePubAck(m *nats.Msg) (*api.PubAck, error)

ParsePubAck parses a stream publish response and returns an error if the publish failed or parsing failed

func SubjectIsSubsetMatch added in v0.0.33

func SubjectIsSubsetMatch(subject, test string) bool

SubjectIsSubsetMatch tests if a subject matches a standard nats wildcard

Types

type Consumer

type Consumer struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Consumer represents a JetStream consumer

func (*Consumer) AckPolicy

func (c *Consumer) AckPolicy() api.AckPolicy

func (*Consumer) AckSampleSubject

func (c *Consumer) AckSampleSubject() string

AckSampleSubject is the subject used to publish ack samples to

func (*Consumer) AckWait

func (c *Consumer) AckWait() time.Duration

func (*Consumer) AcknowledgedFloor

func (c *Consumer) AcknowledgedFloor() (api.SequenceInfo, error)

AcknowledgedFloor reports the highest contiguous message sequences that were acknowledged

func (*Consumer) AdvisorySubject

func (c *Consumer) AdvisorySubject() string

AdvisorySubject is a wildcard subscription subject that subscribes to all advisories for this consumer

func (*Consumer) Backoff added in v0.0.29

func (c *Consumer) Backoff() []time.Duration

func (*Consumer) Configuration

func (c *Consumer) Configuration() (config api.ConsumerConfig)

Configuration is the Consumer configuration

func (*Consumer) Delete

func (c *Consumer) Delete() (err error)

Delete deletes the Consumer, after this the Consumer object should be disposed

func (*Consumer) DeliverGroup added in v0.0.26

func (c *Consumer) DeliverGroup() string

func (*Consumer) DeliverPolicy

func (c *Consumer) DeliverPolicy() api.DeliverPolicy

func (*Consumer) DeliveredState

func (c *Consumer) DeliveredState() (api.SequenceInfo, error)

DeliveredState reports the messages sequences that were successfully delivered

func (*Consumer) DeliverySubject

func (c *Consumer) DeliverySubject() string

func (*Consumer) Description added in v0.0.26

func (c *Consumer) Description() string

func (*Consumer) DurableName

func (c *Consumer) DurableName() string

func (*Consumer) FilterSubject

func (c *Consumer) FilterSubject() string

func (*Consumer) FilterSubjects added in v0.1.0

func (c *Consumer) FilterSubjects() []string

func (*Consumer) FlowControl added in v0.0.21

func (c *Consumer) FlowControl() bool

func (*Consumer) Heartbeat added in v0.0.21

func (c *Consumer) Heartbeat() time.Duration

func (*Consumer) InactiveThreshold added in v0.0.29

func (c *Consumer) InactiveThreshold() time.Duration

func (*Consumer) IsDurable

func (c *Consumer) IsDurable() bool

func (*Consumer) IsEphemeral

func (c *Consumer) IsEphemeral() bool

func (*Consumer) IsHeadersOnly added in v0.0.27

func (c *Consumer) IsHeadersOnly() bool

func (*Consumer) IsPullMode

func (c *Consumer) IsPullMode() bool

func (*Consumer) IsPushMode

func (c *Consumer) IsPushMode() bool

func (*Consumer) IsSampled

func (c *Consumer) IsSampled() bool

func (*Consumer) LatestState added in v0.0.23

func (c *Consumer) LatestState() (api.ConsumerInfo, error)

LatestState returns the most recently loaded state

func (*Consumer) LeaderStepDown added in v0.0.21

func (c *Consumer) LeaderStepDown() error

LeaderStepDown requests the current RAFT group leader in a clustered JetStream to stand down forcing a new election

func (*Consumer) MaxAckPending added in v0.0.20

func (c *Consumer) MaxAckPending() int

func (*Consumer) MaxDeliver

func (c *Consumer) MaxDeliver() int

func (*Consumer) MaxRequestBatch added in v0.0.29

func (c *Consumer) MaxRequestBatch() int

func (*Consumer) MaxRequestExpires added in v0.0.29

func (c *Consumer) MaxRequestExpires() time.Duration

func (*Consumer) MaxRequestMaxBytes added in v0.0.33

func (c *Consumer) MaxRequestMaxBytes() int

func (*Consumer) MaxWaiting added in v0.0.24

func (c *Consumer) MaxWaiting() int

func (*Consumer) MemoryStorage added in v0.0.33

func (c *Consumer) MemoryStorage() bool

func (*Consumer) Metadata added in v0.1.0

func (c *Consumer) Metadata() map[string]string

func (*Consumer) MetricSubject

func (c *Consumer) MetricSubject() string

MetricSubject is a wildcard subscription subject that subscribes to all metrics for this consumer

func (*Consumer) Name

func (c *Consumer) Name() string

func (*Consumer) NextMsg

func (c *Consumer) NextMsg() (*nats.Msg, error)

NextMsg retrieves the next message, waiting up to manager timeout for a response

func (*Consumer) NextMsgContext added in v0.0.19

func (c *Consumer) NextMsgContext(ctx context.Context) (*nats.Msg, error)

NextMsgContext retrieves the next message, interrupted by the cancel context ctx

func (*Consumer) NextMsgRequest added in v0.0.20

func (c *Consumer) NextMsgRequest(inbox string, req *api.JSApiConsumerGetNextRequest) error

NextMsgRequest creates a request for a batch of messages, data or control flow messages will be sent to inbox

func (*Consumer) NextSubject

func (c *Consumer) NextSubject() string

NextSubject returns the subject used to retrieve the next message for pull-based Consumers, empty when not a pull-base consumer

func (*Consumer) PendingAcknowledgement added in v0.0.20

func (c *Consumer) PendingAcknowledgement() (int, error)

PendingAcknowledgement reports the number of messages sent but not yet acknowledged

func (*Consumer) PendingMessages added in v0.0.20

func (c *Consumer) PendingMessages() (uint64, error)

PendingMessages is the number of unprocessed messages for this consumer

func (*Consumer) RateLimit added in v0.0.18

func (c *Consumer) RateLimit() uint64

func (*Consumer) RedeliveryCount

func (c *Consumer) RedeliveryCount() (int, error)

RedeliveryCount reports the number of redelivers that were done

func (*Consumer) ReplayPolicy

func (c *Consumer) ReplayPolicy() api.ReplayPolicy

func (*Consumer) Replicas added in v0.0.33

func (c *Consumer) Replicas() int

func (*Consumer) Reset

func (c *Consumer) Reset() error

Reset reloads the Consumer configuration from the JetStream server

func (*Consumer) SampleFrequency

func (c *Consumer) SampleFrequency() string

func (*Consumer) StartSequence

func (c *Consumer) StartSequence() uint64

func (*Consumer) StartTime

func (c *Consumer) StartTime() time.Time

func (*Consumer) State

func (c *Consumer) State() (api.ConsumerInfo, error)

State loads a snapshot of consumer state including delivery counts, retries and more

func (*Consumer) StreamName

func (c *Consumer) StreamName() string

func (*Consumer) UpdateConfiguration added in v0.0.27

func (c *Consumer) UpdateConfiguration(opts ...ConsumerOption) error

UpdateConfiguration updates the consumer configuration At present the description, ack wait, max deliver, sample frequency, max ack pending, max waiting and header only settings can be changed

func (*Consumer) WaitingClientPulls added in v0.0.20

func (c *Consumer) WaitingClientPulls() (int, error)

WaitingClientPulls is the number of clients that have outstanding pull requests against this consumer

type ConsumerOption

type ConsumerOption func(o *api.ConsumerConfig) error

ConsumerOption configures consumers

func AckWait

func AckWait(t time.Duration) ConsumerOption

AckWait sets the time a delivered message might remain unacknowledged before redelivery is attempted

func AcknowledgeAll

func AcknowledgeAll() ConsumerOption

AcknowledgeAll enables an acknowledgement mode where acknowledging message 100 will also ack the preceding messages

func AcknowledgeExplicit

func AcknowledgeExplicit() ConsumerOption

AcknowledgeExplicit requires that every message received be acknowledged

func AcknowledgeNone

func AcknowledgeNone() ConsumerOption

AcknowledgeNone disables message acknowledgement

func BackoffIntervals added in v0.0.29

func BackoffIntervals(i ...time.Duration) ConsumerOption

BackoffIntervals sets a series of intervals by which retries will be attempted for this consumr

func BackoffPolicy added in v0.0.29

func BackoffPolicy(policy []time.Duration) ConsumerOption

BackoffPolicy sets a consumer policy

func ConsumerDescription added in v0.0.26

func ConsumerDescription(d string) ConsumerOption

ConsumerDescription is a textual description of this consumer to provide additional context

func ConsumerMetadata added in v0.1.0

func ConsumerMetadata(meta map[string]string) ConsumerOption

func ConsumerName added in v0.1.0

func ConsumerName(s string) ConsumerOption

ConsumerName sets a name for the consumer, when creating a durable consumer use DurableName, using ConsumerName allows for creating named ephemeral consumers, else a random name will be generated

func ConsumerOverrideMemoryStorage added in v0.0.33

func ConsumerOverrideMemoryStorage() ConsumerOption

func ConsumerOverrideReplicas added in v0.0.33

func ConsumerOverrideReplicas(r int) ConsumerOption

ConsumerOverrideReplicas override the replica count inherited from the Stream with this value

func DeliverAllAvailable

func DeliverAllAvailable() ConsumerOption

DeliverAllAvailable delivers messages starting with the first available in the stream

func DeliverGroup added in v0.0.26

func DeliverGroup(g string) ConsumerOption

DeliverGroup when set will only deliver messages to subscriptions matching that group

func DeliverHeadersOnly added in v0.0.27

func DeliverHeadersOnly() ConsumerOption

DeliverHeadersOnly configures the consumer to only deliver existing header and the `Nats-Msg-Size` header, no bodies

func DeliverLastPerSubject added in v0.0.26

func DeliverLastPerSubject() ConsumerOption

DeliverLastPerSubject delivers the last message for each subject in a wildcard stream based on the filter subjects of the consumer

func DeliverySubject

func DeliverySubject(s string) ConsumerOption

DeliverySubject is the subject where a Push consumer will deliver its messages

func DurableName

func DurableName(s string) ConsumerOption

DurableName is the name given to the consumer, when not set an ephemeral consumer is created

func FilterStreamBySubject

func FilterStreamBySubject(s ...string) ConsumerOption

FilterStreamBySubject filters the messages in a wildcard stream to those matching a specific subject

func IdleHeartbeat added in v0.0.21

func IdleHeartbeat(hb time.Duration) ConsumerOption

IdleHeartbeat sets the time before an idle consumer will send a empty message with Status header 100 indicating the consumer is still alive

func InactiveThreshold added in v0.0.29

func InactiveThreshold(t time.Duration) ConsumerOption

InactiveThreshold is the idle time an ephemeral consumer allows before it is removed

func LinearBackoffPolicy added in v0.0.29

func LinearBackoffPolicy(steps uint, min time.Duration, max time.Duration) ConsumerOption

LinearBackoffPolicy creates a backoff policy with linearly increasing steps between min and max

func MaxAckPending added in v0.0.20

func MaxAckPending(pending uint) ConsumerOption

MaxAckPending maximum number of messages without acknowledgement that can be outstanding, once this limit is reached message delivery will be suspended

func MaxDeliveryAttempts

func MaxDeliveryAttempts(n int) ConsumerOption

MaxDeliveryAttempts is the number of times a message will be attempted to be delivered

func MaxRequestBatch added in v0.0.29

func MaxRequestBatch(max uint) ConsumerOption

MaxRequestBatch is the largest batch that can be specified when doing pulls against the consumer

func MaxRequestExpires added in v0.0.29

func MaxRequestExpires(max time.Duration) ConsumerOption

MaxRequestExpires is the longest pull request expire the server will allow

func MaxRequestMaxBytes added in v0.0.33

func MaxRequestMaxBytes(max int) ConsumerOption

MaxRequestMaxBytes sets the limit of max bytes a consumer my request

func MaxWaiting added in v0.0.24

func MaxWaiting(pulls uint) ConsumerOption

MaxWaiting is the number of outstanding pulls that are allowed on any one consumer. Pulls made that exceeds this limit are discarded.

func PushFlowControl added in v0.0.21

func PushFlowControl() ConsumerOption

PushFlowControl enables flow control for push based consumers

func RateLimitBitsPerSecond added in v0.0.18

func RateLimitBitsPerSecond(bps uint64) ConsumerOption

RateLimitBitsPerSecond limits message delivery to a rate in bits per second

func ReplayAsReceived

func ReplayAsReceived() ConsumerOption

ReplayAsReceived delivers messages at the rate they were received at

func ReplayInstantly

func ReplayInstantly() ConsumerOption

ReplayInstantly delivers messages to the consumer as fast as possible

func SamplePercent

func SamplePercent(i int) ConsumerOption

SamplePercent configures sampling of a subset of messages expressed as a percentage

func StartAtSequence

func StartAtSequence(s uint64) ConsumerOption

StartAtSequence starts consuming messages at a specific sequence in the stream

func StartAtTime

func StartAtTime(t time.Time) ConsumerOption

StartAtTime starts consuming messages at a specific point in time in the stream

func StartAtTimeDelta

func StartAtTimeDelta(d time.Duration) ConsumerOption

StartAtTimeDelta starts delivering messages at a past point in time

func StartWithLastReceived

func StartWithLastReceived() ConsumerOption

StartWithLastReceived starts delivery at the last messages received in the stream

func StartWithNextReceived

func StartWithNextReceived() ConsumerOption

StartWithNextReceived starts delivery at the next messages received in the stream

type Manager added in v0.0.19

type Manager struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func New added in v0.0.19

func New(nc *nats.Conn, opts ...Option) (*Manager, error)

func (*Manager) ConsumerNames added in v0.0.19

func (m *Manager) ConsumerNames(stream string) (names []string, err error)

ConsumerNames is a sorted list of all known consumers within a stream

func (*Manager) Consumers added in v0.0.19

func (m *Manager) Consumers(stream string) (consumers []*Consumer, missing []string, err error)

Consumers is a sorted list of all known Consumers within a Stream and a list of any consumer names that were known but no details were found

func (*Manager) DeleteConsumer added in v0.0.34

func (m *Manager) DeleteConsumer(stream string, consumer string) error

DeleteConsumer removes a consumer without all the drama of loading it etc

func (*Manager) DeleteStream added in v0.0.34

func (m *Manager) DeleteStream(stream string) error

DeleteStream removes a stream without all the drama of loading it etc

func (*Manager) DeleteStreamMessage added in v0.0.25

func (m *Manager) DeleteStreamMessage(stream string, seq uint64, noErase bool) error

DeleteStreamMessage deletes a specific message from the Stream without erasing the data, see DeleteMessage() for a safe delete

func (*Manager) EachStream added in v0.0.19

func (m *Manager) EachStream(filter *StreamNamesFilter, cb func(*Stream)) (missing []string, err error)

EachStream iterates over all known Streams, does not handle any streams the cluster could not get data from but returns a list of those

func (*Manager) IsJetStreamEnabled added in v0.0.19

func (m *Manager) IsJetStreamEnabled() bool

IsJetStreamEnabled determines if JetStream is enabled for the current account

func (*Manager) IsKnownConsumer added in v0.0.19

func (m *Manager) IsKnownConsumer(stream string, consumer string) (bool, error)

IsKnownConsumer determines if a Consumer is known for a specific Stream

func (*Manager) IsKnownStream added in v0.0.19

func (m *Manager) IsKnownStream(stream string) (bool, error)

IsKnownStream determines if a Stream is known

func (*Manager) IsStreamMaxBytesRequired added in v0.0.31

func (m *Manager) IsStreamMaxBytesRequired() (bool, error)

IsStreamMaxBytesRequired determines if the JetStream account requires streams to set a byte limit

func (*Manager) JetStreamAccountInfo added in v0.0.19

func (m *Manager) JetStreamAccountInfo() (info *api.JetStreamAccountStats, err error)

JetStreamAccountInfo retrieves information about the current account limits and more

func (*Manager) LoadConsumer added in v0.0.19

func (m *Manager) LoadConsumer(stream string, name string) (consumer *Consumer, err error)

LoadConsumer loads a consumer by name

func (*Manager) LoadOrNewConsumer added in v0.0.19

func (m *Manager) LoadOrNewConsumer(stream string, name string, opts ...ConsumerOption) (consumer *Consumer, err error)

LoadOrNewConsumer loads a consumer by name if known else creates a new one with these properties

func (*Manager) LoadOrNewConsumerFromDefault added in v0.0.19

func (m *Manager) LoadOrNewConsumerFromDefault(stream string, name string, template api.ConsumerConfig, opts ...ConsumerOption) (consumer *Consumer, err error)

LoadOrNewConsumerFromDefault loads a consumer by name if known else creates a new one with these properties based on template

func (*Manager) LoadOrNewStream added in v0.0.19

func (m *Manager) LoadOrNewStream(name string, opts ...StreamOption) (stream *Stream, err error)

LoadOrNewStream loads an existing stream or creates a new one matching opts

func (*Manager) LoadOrNewStreamFromDefault added in v0.0.19

func (m *Manager) LoadOrNewStreamFromDefault(name string, dflt api.StreamConfig, opts ...StreamOption) (stream *Stream, err error)

LoadOrNewStreamFromDefault loads an existing stream or creates a new one matching opts and template

func (*Manager) LoadStream added in v0.0.19

func (m *Manager) LoadStream(name string) (stream *Stream, err error)

LoadStream loads a stream by name

func (*Manager) MetaLeaderStandDown added in v0.0.21

func (m *Manager) MetaLeaderStandDown(placement *api.Placement) error

MetaLeaderStandDown requests the meta group leader to stand down, must be initiated by a system user

func (*Manager) MetaPeerRemove added in v0.0.21

func (m *Manager) MetaPeerRemove(name string, id string) error

MetaPeerRemove removes a peer from the JetStream meta cluster, evicting all streams, consumer etc. Use with extreme caution. If id is given it will be used by the server else name, it's generally best to remove by id

func (*Manager) MetaPurgeAccount added in v0.0.35

func (m *Manager) MetaPurgeAccount(account string) error

MetaPurgeAccount removes all data from an account, must be run in the system account

func (*Manager) NatsConn added in v0.0.25

func (m *Manager) NatsConn() *nats.Conn

NatsConn gives access to the underlying NATS Connection

func (*Manager) NewConsumer added in v0.0.19

func (m *Manager) NewConsumer(stream string, opts ...ConsumerOption) (consumer *Consumer, err error)

NewConsumer creates a consumer based on DefaultConsumer modified by opts

func (*Manager) NewConsumerFromDefault added in v0.0.19

func (m *Manager) NewConsumerFromDefault(stream string, dflt api.ConsumerConfig, opts ...ConsumerOption) (consumer *Consumer, err error)

NewConsumerFromDefault creates a new consumer based on a template config that gets modified by opts

func (*Manager) NewStream added in v0.0.19

func (m *Manager) NewStream(name string, opts ...StreamOption) (stream *Stream, err error)

NewStream creates a new stream using DefaultStream as a starting template allowing adjustments to be made using options

func (*Manager) NewStreamConfiguration added in v0.0.19

func (m *Manager) NewStreamConfiguration(template api.StreamConfig, opts ...StreamOption) (*api.StreamConfig, error)

NewStreamConfiguration generates a new configuration based on template modified by opts

func (*Manager) NewStreamFromDefault added in v0.0.19

func (m *Manager) NewStreamFromDefault(name string, dflt api.StreamConfig, opts ...StreamOption) (stream *Stream, err error)

NewStreamFromDefault creates a new stream based on a supplied template and options

func (*Manager) NextMsg added in v0.0.19

func (m *Manager) NextMsg(stream string, consumer string) (*nats.Msg, error)

NextMsg requests the next message from the server with the manager timeout

func (*Manager) NextMsgContext added in v0.0.19

func (m *Manager) NextMsgContext(ctx context.Context, stream string, consumer string) (*nats.Msg, error)

NextMsgContext requests the next message from the server. This request will wait for as long as the context is active. If repeated pulls will be made it's better to use NextMsgRequest()

func (*Manager) NextMsgRequest added in v0.0.20

func (m *Manager) NextMsgRequest(stream string, consumer string, inbox string, req *api.JSApiConsumerGetNextRequest) error

NextMsgRequest creates a request for a batch of messages on a consumer, data or control flow messages will be sent to inbox

func (*Manager) NextSubject added in v0.0.21

func (m *Manager) NextSubject(stream string, consumer string) (string, error)

NextSubject returns the subject used to retrieve the next message for pull-based Consumers, empty when not a pull-base consumer

func (*Manager) QueryStreams added in v0.0.29

func (m *Manager) QueryStreams(opts ...StreamQueryOpt) ([]*Stream, error)

QueryStreams filters the streams found in JetStream using various filter options

func (*Manager) ReadLastMessageForSubject added in v0.0.25

func (m *Manager) ReadLastMessageForSubject(stream string, sub string) (msg *api.StoredMsg, err error)

ReadLastMessageForSubject reads the last message stored in the stream for a specific subject

func (*Manager) RestoreSnapshotFromDirectory added in v0.0.21

func (m *Manager) RestoreSnapshotFromDirectory(ctx context.Context, stream string, dir string, opts ...SnapshotOption) (RestoreProgress, *api.StreamState, error)

func (*Manager) StreamContainedSubjects added in v0.0.34

func (m *Manager) StreamContainedSubjects(stream string, filter ...string) (map[string]uint64, error)

StreamContainedSubjects queries the stream for the subjects it holds with optional filter

func (*Manager) StreamNames added in v0.0.19

func (m *Manager) StreamNames(filter *StreamNamesFilter) (names []string, err error)

StreamNames is a sorted list of all known Streams filtered by filter

func (*Manager) StreamTemplateNames added in v0.0.19

func (m *Manager) StreamTemplateNames() (templates []string, err error)

StreamTemplateNames is a sorted list of all known StreamTemplates

func (*Manager) Streams added in v0.0.19

func (m *Manager) Streams(filter *StreamNamesFilter) ([]*Stream, []string, error)

Streams is a sorted list of all known Streams and a list of any stream names that were known but no details were found

type MsgInfo

type MsgInfo struct {
	// contains filtered or unexported fields
}

MsgInfo holds metadata about a message that was received from JetStream

func ParseJSMsgMetadata

func ParseJSMsgMetadata(m *nats.Msg) (info *MsgInfo, err error)

ParseJSMsgMetadata parse the reply subject metadata to determine message metadata

func ParseJSMsgMetadataReply added in v0.0.20

func ParseJSMsgMetadataReply(reply string) (info *MsgInfo, err error)

ParseJSMsgMetadataReply parses the reply subject of a JetStream originated message

func (*MsgInfo) Consumer

func (i *MsgInfo) Consumer() string

Consumer is the name of the consumer that produced this message

func (*MsgInfo) ConsumerSequence

func (i *MsgInfo) ConsumerSequence() uint64

ConsumerSequence is the sequence of this message in the consumer

func (*MsgInfo) Delivered

func (i *MsgInfo) Delivered() int

Delivered is the number of times this message had delivery attempts including this one

func (*MsgInfo) Domain added in v0.0.26

func (i *MsgInfo) Domain() string

Domain is the domain the message came from, can be empty

func (*MsgInfo) Pending added in v0.0.20

func (i *MsgInfo) Pending() uint64

Pending is the number of messages left to consume, -1 when the number is not reported

func (*MsgInfo) Stream

func (i *MsgInfo) Stream() string

Stream is the stream this message is stored in

func (*MsgInfo) StreamSequence

func (i *MsgInfo) StreamSequence() uint64

StreamSequence is the sequence of this message in the stream

func (*MsgInfo) TimeStamp

func (i *MsgInfo) TimeStamp() time.Time

TimeStamp is the time the message was received by JetStream

type Option added in v0.0.19

type Option func(o *Manager)

Option is a option to configure the JetStream Manager

func WithAPIPrefix added in v0.0.21

func WithAPIPrefix(s string) Option

WithAPIPrefix replace API endpoints like $JS.API.STREAM.NAMES with prefix.STREAM.NAMES

func WithAPIValidation

func WithAPIValidation(v api.StructValidator) Option

WithAPIValidation validates responses sent from the NATS server using a validator

func WithDomain added in v0.0.24

func WithDomain(d string) Option

WithDomain sets a JetStream domain, incompatible with WithApiPrefix()

func WithEventPrefix added in v0.0.21

func WithEventPrefix(s string) Option

WithEventPrefix replace event subjects like $JS.EVENT.ADVISORY.API with prefix.ADVISORY

func WithTimeout

func WithTimeout(t time.Duration) Option

WithTimeout sets a timeout for the requests

func WithTrace

func WithTrace() Option

WithTrace enables logging of JSON API requests and responses

type PagerOption added in v0.0.19

type PagerOption func(p *StreamPager)

PagerOption configures the stream pager

func PagerFilterSubject added in v0.0.23

func PagerFilterSubject(s string) PagerOption

PagerFilterSubject sets a filter subject for the pager

func PagerSize added in v0.0.19

func PagerSize(sz int) PagerOption

PagerSize is the size of pages to walk

func PagerStartDelta added in v0.0.19

func PagerStartDelta(d time.Duration) PagerOption

PagerStartDelta sets a starting time delta for the pager

func PagerStartId added in v0.0.19

func PagerStartId(id int) PagerOption

PagerStartId sets a starting stream sequence for the pager

func PagerTimeout added in v0.0.19

func PagerTimeout(d time.Duration) PagerOption

PagerTimeout is the time to wait for messages before it's assumed the end of the stream was reached

type RestoreProgress

type RestoreProgress interface {
	// StartTime is when the process started
	StartTime() time.Time
	// EndTime is when the process ended - zero when not completed
	EndTime() time.Time
	// ChunkSize is the size of the data packets sent over NATS
	ChunkSize() int
	// ChunksSent is the number of chunks of size ChunkSize that was sent
	ChunksSent() uint32
	// ChunksToSend number of chunks of ChunkSize expected to be sent
	ChunksToSend() int
	// BytesSent is the number of bytes sent so far
	BytesSent() uint64
	// BytesPerSecond is the number of bytes received in the last second, 0 during the first second
	BytesPerSecond() uint64
}

type SnapshotOption

type SnapshotOption func(o *snapshotOptions)

func RestoreConfiguration added in v0.0.22

func RestoreConfiguration(cfg api.StreamConfig) SnapshotOption

RestoreConfiguration overrides the configuration used to restore

func RestoreNotify

func RestoreNotify(cb func(RestoreProgress)) SnapshotOption

RestoreNotify notifies cb about progress of the restore operation

func SnapshotConsumers

func SnapshotConsumers() SnapshotOption

SnapshotConsumers includes consumer configuration and state in backups

func SnapshotDebug

func SnapshotDebug() SnapshotOption

SnapshotDebug enables logging using the standard go logging library

func SnapshotHealthCheck

func SnapshotHealthCheck() SnapshotOption

SnapshotHealthCheck performs a health check prior to starting the snapshot

func SnapshotNotify

func SnapshotNotify(cb func(SnapshotProgress)) SnapshotOption

SnapshotNotify notifies cb about progress of the snapshot operation

type SnapshotProgress

type SnapshotProgress interface {
	// StartTime is when the process started
	StartTime() time.Time
	// EndTime is when the process ended - zero when not completed
	EndTime() time.Time
	// ChunkSize is the size of the data packets sent over NATS
	ChunkSize() int
	// ChunksReceived is how many chunks of ChunkSize were received
	ChunksReceived() uint32
	// BytesExpected is how many Bytes we should be receiving
	BytesExpected() uint64
	// BytesReceived is how many Bytes have been received
	BytesReceived() uint64
	// UncompressedBytesReceived is the number of bytes received uncompressed
	UncompressedBytesReceived() uint64
	// BytesPerSecond is the number of bytes received in the last second, 0 during the first second
	BytesPerSecond() uint64
	// HealthCheck indicates if health checking was requested
	HealthCheck() bool
	// Finished will be true after all data have been written
	Finished() bool
}

type Stream

type Stream struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Stream represents a JetStream Stream

func (*Stream) AdvisorySubject

func (s *Stream) AdvisorySubject() string

AdvisorySubject is a wildcard subscription subject that subscribes to all advisories for this stream

func (*Stream) Compression added in v0.1.0

func (s *Stream) Compression() api.Compression

func (*Stream) Configuration

func (s *Stream) Configuration() api.StreamConfig

func (*Stream) ConsumerLimits added in v0.1.1

func (s *Stream) ConsumerLimits() api.StreamConsumerLimits

func (*Stream) ConsumerNames

func (s *Stream) ConsumerNames() (names []string, err error)

ConsumerNames is a list of all known consumers for this Stream

func (*Stream) ContainedSubjects added in v0.0.34

func (s *Stream) ContainedSubjects(filter ...string) (map[string]uint64, error)

ContainedSubjects queries the stream for the subjects it holds with optional filter

func (*Stream) Delete

func (s *Stream) Delete() error

Delete deletes the Stream, after this the Stream object should be disposed

func (*Stream) DeleteAllowed added in v0.0.34

func (s *Stream) DeleteAllowed() bool

func (*Stream) DeleteMessage

func (s *Stream) DeleteMessage(seq uint64) (err error)

DeleteMessage deletes a specific message from the Stream by overwriting it with random data, see FastDeleteMessage() to remove the message without over writing data

func (*Stream) Description added in v0.0.26

func (s *Stream) Description() string

func (*Stream) DetectGaps added in v0.1.0

func (s *Stream) DetectGaps(ctx context.Context, progress func(seq uint64, pending uint64), gap func(first uint64, last uint64)) error

DetectGaps detects interior deletes in a stream, reports progress through the stream and each found gap.

It uses the extended stream info to get the sequences and use that to detect gaps. The Deleted information in StreamInfo is capped at some amount so if it determines there are more messages that are deleted in the stream it will then make a consumer and walk the remainder of the stream to detect gaps the hard way

func (*Stream) DirectAllowed added in v0.0.34

func (s *Stream) DirectAllowed() bool

func (*Stream) DiscardNewPerSubject added in v0.0.35

func (s *Stream) DiscardNewPerSubject() bool

func (*Stream) DiscardPolicy added in v0.0.35

func (s *Stream) DiscardPolicy() api.DiscardPolicy

func (*Stream) DuplicateWindow added in v0.0.18

func (s *Stream) DuplicateWindow() time.Duration

func (*Stream) EachConsumer

func (s *Stream) EachConsumer(cb func(consumer *Consumer)) (missing []string, err error)

EachConsumer calls cb with each known consumer for this stream, error on any error to load consumers

func (*Stream) FastDeleteMessage added in v0.0.25

func (s *Stream) FastDeleteMessage(seq uint64) error

FastDeleteMessage deletes a specific message from the Stream without erasing the data, see DeleteMessage() for a safe delete

func (*Stream) FirstSequence added in v0.1.0

func (s *Stream) FirstSequence() uint64

func (*Stream) Information

func (s *Stream) Information(req ...api.JSApiStreamInfoRequest) (info *api.StreamInfo, err error)

Information loads the current stream information

func (*Stream) IsCompressed added in v0.1.0

func (s *Stream) IsCompressed() bool

IsCompressed determines if a stream is compressed

func (*Stream) IsInternal added in v0.0.27

func (s *Stream) IsInternal() bool

IsInternal indicates if a stream is considered 'internal' by the NATS team, that is, it's a backing stream for KV, Object or MQTT state

func (*Stream) IsKVBucket added in v0.0.27

func (s *Stream) IsKVBucket() bool

IsKVBucket determines if a stream is a KV bucket

func (*Stream) IsMQTTState added in v0.0.27

func (s *Stream) IsMQTTState() bool

IsMQTTState determines if a stream holds internal MQTT state

func (*Stream) IsMirror added in v0.0.21

func (s *Stream) IsMirror() bool

IsMirror determines if this stream is a mirror of another

func (*Stream) IsObjectBucket added in v0.0.27

func (s *Stream) IsObjectBucket() bool

IsObjectBucket determines if a stream is a Object bucket

func (*Stream) IsRepublishing added in v0.0.33

func (s *Stream) IsRepublishing() bool

func (*Stream) IsSourced added in v0.0.21

func (s *Stream) IsSourced() bool

IsSourced determines if this stream is sourcing data from another stream. Other streams could be synced to this stream and it would not be reported by this property

func (*Stream) IsTemplateManaged

func (s *Stream) IsTemplateManaged() bool

IsTemplateManaged determines if this stream is managed by a template

func (*Stream) LatestInformation

func (s *Stream) LatestInformation() (info *api.StreamInfo, err error)

LatestInformation returns the most recently fetched stream information

func (*Stream) LatestState

func (s *Stream) LatestState() (state api.StreamState, err error)

LatestState returns the most recently fetched stream state

func (*Stream) LeaderStepDown added in v0.0.21

func (s *Stream) LeaderStepDown() error

LeaderStepDown requests the current RAFT group leader in a clustered JetStream to stand down forcing a new election

func (*Stream) LoadConsumer

func (s *Stream) LoadConsumer(name string) (*Consumer, error)

LoadConsumer loads a named consumer related to this Stream

func (*Stream) LoadOrNewConsumer

func (s *Stream) LoadOrNewConsumer(name string, opts ...ConsumerOption) (consumer *Consumer, err error)

LoadOrNewConsumer loads or creates a consumer based on these options

func (*Stream) LoadOrNewConsumerFromDefault

func (s *Stream) LoadOrNewConsumerFromDefault(name string, deflt api.ConsumerConfig, opts ...ConsumerOption) (consumer *Consumer, err error)

LoadOrNewConsumerFromDefault loads or creates a consumer based on these options that adjust supplied template

func (*Stream) MaxAge

func (s *Stream) MaxAge() time.Duration

func (*Stream) MaxBytes

func (s *Stream) MaxBytes() int64

func (*Stream) MaxConsumers

func (s *Stream) MaxConsumers() int

func (*Stream) MaxMsgSize

func (s *Stream) MaxMsgSize() int32

func (*Stream) MaxMsgs

func (s *Stream) MaxMsgs() int64

func (*Stream) MaxMsgsPerSubject added in v0.0.24

func (s *Stream) MaxMsgsPerSubject() int64

func (*Stream) Metadata added in v0.1.0

func (s *Stream) Metadata() map[string]string

func (*Stream) MetricSubject

func (s *Stream) MetricSubject() string

MetricSubject is a wildcard subscription subject that subscribes to all advisories for this stream

func (*Stream) Mirror added in v0.0.21

func (s *Stream) Mirror() *api.StreamSource

func (*Stream) MirrorDirectAllowed added in v0.0.34

func (s *Stream) MirrorDirectAllowed() bool

func (*Stream) Name

func (s *Stream) Name() string

func (*Stream) NewConsumer

func (s *Stream) NewConsumer(opts ...ConsumerOption) (consumer *Consumer, err error)

NewConsumer creates a new consumer in this Stream based on DefaultConsumer

func (*Stream) NewConsumerFromDefault

func (s *Stream) NewConsumerFromDefault(dflt api.ConsumerConfig, opts ...ConsumerOption) (consumer *Consumer, err error)

NewConsumerFromDefault creates a new consumer in this Stream based on a supplied template config

func (*Stream) NoAck

func (s *Stream) NoAck() bool

func (*Stream) PageContents added in v0.0.19

func (s *Stream) PageContents(opts ...PagerOption) (*StreamPager, error)

PageContents creates a StreamPager used to traverse the contents of the stream, Close() should be called to dispose of the background consumer and resources

func (*Stream) Purge

func (s *Stream) Purge(opts ...*api.JSApiStreamPurgeRequest) error

Purge deletes messages from the Stream, an optional JSApiStreamPurgeRequest can be supplied to limit the purge to a subset of messages

func (*Stream) PurgeAllowed added in v0.0.27

func (s *Stream) PurgeAllowed() bool

func (*Stream) ReadLastMessageForSubject added in v0.0.25

func (s *Stream) ReadLastMessageForSubject(subj string) (*api.StoredMsg, error)

ReadLastMessageForSubject reads the last message stored in the stream for a specific subject

func (*Stream) ReadMessage

func (s *Stream) ReadMessage(seq uint64) (msg *api.StoredMsg, err error)

ReadMessage loads a message from the stream by its sequence number

func (*Stream) RemoveRAFTPeer added in v0.0.21

func (s *Stream) RemoveRAFTPeer(peer string) error

RemoveRAFTPeer removes a peer from the group indicating it will not return

func (*Stream) Replicas

func (s *Stream) Replicas() int

func (*Stream) Republish added in v0.0.33

func (s *Stream) Republish() *api.RePublish

func (*Stream) Reset

func (s *Stream) Reset() error

Reset reloads the Stream configuration from the JetStream server

func (*Stream) Retention

func (s *Stream) Retention() api.RetentionPolicy

func (*Stream) RollupAllowed added in v0.0.27

func (s *Stream) RollupAllowed() bool

func (*Stream) Seal added in v0.0.27

func (s *Stream) Seal() error

Seal updates a stream so that messages can not be added or removed using the API and limits will not be processed - messages will never age out. A sealed stream can not be unsealed.

func (*Stream) Sealed added in v0.0.27

func (s *Stream) Sealed() bool

func (*Stream) SnapshotToDirectory added in v0.0.21

func (s *Stream) SnapshotToDirectory(ctx context.Context, dir string, opts ...SnapshotOption) (SnapshotProgress, error)

SnapshotToDirectory creates a backup into s2 compressed tar file

func (*Stream) Sources added in v0.0.21

func (s *Stream) Sources() []*api.StreamSource

func (*Stream) State

func (s *Stream) State(req ...api.JSApiStreamInfoRequest) (stats api.StreamState, err error)

State retrieves the Stream State

func (*Stream) Storage

func (s *Stream) Storage() api.StorageType

func (*Stream) Subjects

func (s *Stream) Subjects() []string

func (*Stream) Template

func (s *Stream) Template() string

func (*Stream) UpdateConfiguration

func (s *Stream) UpdateConfiguration(cfg api.StreamConfig, opts ...StreamOption) error

UpdateConfiguration updates the stream using cfg modified by opts, reloads configuration from the server post update

type StreamNamesFilter added in v0.0.20

type StreamNamesFilter struct {
	// Subject filter the names to those consuming messages matching this subject or wildcard
	Subject string `json:"subject,omitempty"`
}

StreamNamesFilter limits the names being returned by the names API

type StreamOption

type StreamOption func(o *api.StreamConfig) error

StreamOption configures a stream

func AllowDirect added in v0.0.34

func AllowDirect() StreamOption

func AllowRollup added in v0.0.27

func AllowRollup() StreamOption

func AppendSource added in v0.0.21

func AppendSource(source *api.StreamSource) StreamOption

func Compression added in v0.1.0

func Compression(alg api.Compression) StreamOption

func ConsumerLimits added in v0.1.1

func ConsumerLimits(limits api.StreamConsumerLimits) StreamOption

func DenyDelete added in v0.0.27

func DenyDelete() StreamOption

func DenyPurge added in v0.0.27

func DenyPurge() StreamOption

func DiscardNew

func DiscardNew() StreamOption

func DiscardNewPerSubject added in v0.0.35

func DiscardNewPerSubject() StreamOption

func DiscardOld

func DiscardOld() StreamOption

func DuplicateWindow added in v0.0.18

func DuplicateWindow(d time.Duration) StreamOption

func FileStorage

func FileStorage() StreamOption

func FirstSequence added in v0.1.0

func FirstSequence(seq uint64) StreamOption

func InterestRetention

func InterestRetention() StreamOption

func LimitsRetention

func LimitsRetention() StreamOption

func MaxAge

func MaxAge(m time.Duration) StreamOption

func MaxBytes

func MaxBytes(m int64) StreamOption

func MaxConsumers

func MaxConsumers(m int) StreamOption

func MaxMessageSize

func MaxMessageSize(m int32) StreamOption

func MaxMessages

func MaxMessages(m int64) StreamOption

func MaxMessagesPerSubject added in v0.0.24

func MaxMessagesPerSubject(m int64) StreamOption

func MemoryStorage

func MemoryStorage() StreamOption

func Mirror added in v0.0.21

func Mirror(stream *api.StreamSource) StreamOption

func MirrorDirect added in v0.0.34

func MirrorDirect() StreamOption

func NoAck

func NoAck() StreamOption

func NoAllowDirect added in v0.0.34

func NoAllowDirect() StreamOption

func NoMirrorDirect added in v0.0.34

func NoMirrorDirect() StreamOption

func PlacementCluster added in v0.0.21

func PlacementCluster(cluster string) StreamOption

func PlacementTags added in v0.0.21

func PlacementTags(tags ...string) StreamOption

func Replicas

func Replicas(r int) StreamOption

func Republish added in v0.0.33

func Republish(m *api.RePublish) StreamOption

func Sources added in v0.0.21

func Sources(streams ...*api.StreamSource) StreamOption

func StreamDescription added in v0.0.26

func StreamDescription(d string) StreamOption

StreamDescription is a textual description of this stream to provide additional context

func StreamMetadata added in v0.1.0

func StreamMetadata(meta map[string]string) StreamOption

func Subjects

func Subjects(s ...string) StreamOption

func WorkQueueRetention

func WorkQueueRetention() StreamOption

type StreamPager added in v0.0.19

type StreamPager struct {
	// contains filtered or unexported fields
}

func (*StreamPager) Close added in v0.0.19

func (p *StreamPager) Close() error

Close dispose of the resources used by the pager and should be called when done

func (*StreamPager) NextMsg added in v0.0.19

func (p *StreamPager) NextMsg(ctx context.Context) (msg *nats.Msg, last bool, err error)

NextMsg retrieves the next message from the pager interrupted by ctx.

last indicates if the message is the last in the current page, the next call to NextMsg will first request the next page, if the client is prompting users to continue to the next page it should be done when last is true

When the end of the stream is reached err will be non nil and last will be true otherwise err being non nil while last is false indicate a failed state. End is indicated by no new messages arriving after ctx timeout or the time set using PagerTimes() is reached

type StreamQueryOpt added in v0.0.29

type StreamQueryOpt func(query *streamQuery) error

func StreamQueryClusterName added in v0.0.29

func StreamQueryClusterName(c string) StreamQueryOpt

StreamQueryClusterName limits results to servers within a cluster matched by a regular expression

func StreamQueryExpression added in v0.1.0

func StreamQueryExpression(e string) StreamQueryOpt

StreamQueryExpression filters the stream using the expr expression language

func StreamQueryFewerConsumersThan added in v0.0.29

func StreamQueryFewerConsumersThan(c uint) StreamQueryOpt

StreamQueryFewerConsumersThan limits results to streams with fewer than or equal consumers than c

func StreamQueryIdleLongerThan added in v0.0.29

func StreamQueryIdleLongerThan(p time.Duration) StreamQueryOpt

StreamQueryIdleLongerThan limits results to streams that has not received messages for a period longer than p

func StreamQueryInvert added in v0.0.29

func StreamQueryInvert() StreamQueryOpt

StreamQueryInvert inverts the logic of filters, older than becomes newer than and so forth

func StreamQueryIsMirror added in v0.1.0

func StreamQueryIsMirror() StreamQueryOpt

func StreamQueryIsSourced added in v0.1.0

func StreamQueryIsSourced() StreamQueryOpt

func StreamQueryOlderThan added in v0.0.29

func StreamQueryOlderThan(p time.Duration) StreamQueryOpt

StreamQueryOlderThan limits the results to streams older than p

func StreamQueryReplicas added in v0.1.0

func StreamQueryReplicas(r uint) StreamQueryOpt

StreamQueryReplicas finds streams with a certain number of replicas or less

func StreamQueryServerName added in v0.0.29

func StreamQueryServerName(s string) StreamQueryOpt

StreamQueryServerName limits results to servers matching a regular expression

func StreamQuerySubjectWildcard added in v0.0.33

func StreamQuerySubjectWildcard(s string) StreamQueryOpt

StreamQuerySubjectWildcard limits results to streams with subject interest matching standard a nats wildcard

func StreamQueryWithoutMessages added in v0.0.29

func StreamQueryWithoutMessages() StreamQueryOpt

StreamQueryWithoutMessages limits results to streams with no messages

Directories

Path Synopsis
api
Package natscontext provides a way for sets of configuration options to be stored in named files and later retrieved either by name or if no name is supplied by access a chosen default context.
Package natscontext provides a way for sets of configuration options to be stored in named files and later retrieved either by name or if no name is supplied by access a chosen default context.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL