Documentation ¶
Overview ¶
Package jsm provides client helpers for managing and interacting with NATS JetStream
Index ¶
- Variables
- func APISubject(subject string, prefix string, domain string) string
- func EventSubject(subject string, prefix string) string
- func IsErrorResponse(m *nats.Msg) bool
- func IsInternalStream(s string) bool
- func IsKVBucketStream(s string) bool
- func IsMQTTStateStream(s string) bool
- func IsNatsError(err error, code uint16) bool
- func IsOKResponse(m *nats.Msg) bool
- func IsObjectBucketStream(s string) bool
- func IsValidName(n string) bool
- func LinearBackoffPeriods(steps uint, min time.Duration, max time.Duration) ([]time.Duration, error)
- func NewConsumerConfiguration(dflt api.ConsumerConfig, opts ...ConsumerOption) (*api.ConsumerConfig, error)
- func NewStreamConfiguration(template api.StreamConfig, opts ...StreamOption) (*api.StreamConfig, error)
- func NextSubject(stream string, consumer string) (string, error)
- func ParseErrorResponse(m *nats.Msg) error
- func ParseEvent(e []byte) (schema string, event any, err error)
- func ParsePubAck(m *nats.Msg) (*api.PubAck, error)
- func SubjectIsSubsetMatch(subject, test string) bool
- type Consumer
- func (c *Consumer) AckPolicy() api.AckPolicy
- func (c *Consumer) AckSampleSubject() string
- func (c *Consumer) AckWait() time.Duration
- func (c *Consumer) AcknowledgedFloor() (api.SequenceInfo, error)
- func (c *Consumer) AdvisorySubject() string
- func (c *Consumer) Backoff() []time.Duration
- func (c *Consumer) Configuration() (config api.ConsumerConfig)
- func (c *Consumer) Delete() (err error)
- func (c *Consumer) DeliverGroup() string
- func (c *Consumer) DeliverPolicy() api.DeliverPolicy
- func (c *Consumer) DeliveredState() (api.SequenceInfo, error)
- func (c *Consumer) DeliverySubject() string
- func (c *Consumer) Description() string
- func (c *Consumer) DurableName() string
- func (c *Consumer) FilterSubject() string
- func (c *Consumer) FilterSubjects() []string
- func (c *Consumer) FlowControl() bool
- func (c *Consumer) Heartbeat() time.Duration
- func (c *Consumer) InactiveThreshold() time.Duration
- func (c *Consumer) IsDurable() bool
- func (c *Consumer) IsEphemeral() bool
- func (c *Consumer) IsHeadersOnly() bool
- func (c *Consumer) IsPullMode() bool
- func (c *Consumer) IsPushMode() bool
- func (c *Consumer) IsSampled() bool
- func (c *Consumer) LatestState() (api.ConsumerInfo, error)
- func (c *Consumer) LeaderStepDown() error
- func (c *Consumer) MaxAckPending() int
- func (c *Consumer) MaxDeliver() int
- func (c *Consumer) MaxRequestBatch() int
- func (c *Consumer) MaxRequestExpires() time.Duration
- func (c *Consumer) MaxRequestMaxBytes() int
- func (c *Consumer) MaxWaiting() int
- func (c *Consumer) MemoryStorage() bool
- func (c *Consumer) Metadata() map[string]string
- func (c *Consumer) MetricSubject() string
- func (c *Consumer) Name() string
- func (c *Consumer) NextMsg() (*nats.Msg, error)
- func (c *Consumer) NextMsgContext(ctx context.Context) (*nats.Msg, error)
- func (c *Consumer) NextMsgRequest(inbox string, req *api.JSApiConsumerGetNextRequest) error
- func (c *Consumer) NextSubject() string
- func (c *Consumer) PendingAcknowledgement() (int, error)
- func (c *Consumer) PendingMessages() (uint64, error)
- func (c *Consumer) RateLimit() uint64
- func (c *Consumer) RedeliveryCount() (int, error)
- func (c *Consumer) ReplayPolicy() api.ReplayPolicy
- func (c *Consumer) Replicas() int
- func (c *Consumer) Reset() error
- func (c *Consumer) SampleFrequency() string
- func (c *Consumer) StartSequence() uint64
- func (c *Consumer) StartTime() time.Time
- func (c *Consumer) State() (api.ConsumerInfo, error)
- func (c *Consumer) StreamName() string
- func (c *Consumer) UpdateConfiguration(opts ...ConsumerOption) error
- func (c *Consumer) WaitingClientPulls() (int, error)
- type ConsumerOption
- func AckWait(t time.Duration) ConsumerOption
- func AcknowledgeAll() ConsumerOption
- func AcknowledgeExplicit() ConsumerOption
- func AcknowledgeNone() ConsumerOption
- func BackoffIntervals(i ...time.Duration) ConsumerOption
- func BackoffPolicy(policy []time.Duration) ConsumerOption
- func ConsumerDescription(d string) ConsumerOption
- func ConsumerMetadata(meta map[string]string) ConsumerOption
- func ConsumerName(s string) ConsumerOption
- func ConsumerOverrideMemoryStorage() ConsumerOption
- func ConsumerOverrideReplicas(r int) ConsumerOption
- func DeliverAllAvailable() ConsumerOption
- func DeliverGroup(g string) ConsumerOption
- func DeliverHeadersOnly() ConsumerOption
- func DeliverLastPerSubject() ConsumerOption
- func DeliverySubject(s string) ConsumerOption
- func DurableName(s string) ConsumerOption
- func FilterStreamBySubject(s ...string) ConsumerOption
- func IdleHeartbeat(hb time.Duration) ConsumerOption
- func InactiveThreshold(t time.Duration) ConsumerOption
- func LinearBackoffPolicy(steps uint, min time.Duration, max time.Duration) ConsumerOption
- func MaxAckPending(pending uint) ConsumerOption
- func MaxDeliveryAttempts(n int) ConsumerOption
- func MaxRequestBatch(max uint) ConsumerOption
- func MaxRequestExpires(max time.Duration) ConsumerOption
- func MaxRequestMaxBytes(max int) ConsumerOption
- func MaxWaiting(pulls uint) ConsumerOption
- func PushFlowControl() ConsumerOption
- func RateLimitBitsPerSecond(bps uint64) ConsumerOption
- func ReplayAsReceived() ConsumerOption
- func ReplayInstantly() ConsumerOption
- func SamplePercent(i int) ConsumerOption
- func StartAtSequence(s uint64) ConsumerOption
- func StartAtTime(t time.Time) ConsumerOption
- func StartAtTimeDelta(d time.Duration) ConsumerOption
- func StartWithLastReceived() ConsumerOption
- func StartWithNextReceived() ConsumerOption
- type Manager
- func (m *Manager) ConsumerNames(stream string) (names []string, err error)
- func (m *Manager) Consumers(stream string) (consumers []*Consumer, missing []string, err error)
- func (m *Manager) DeleteConsumer(stream string, consumer string) error
- func (m *Manager) DeleteStream(stream string) error
- func (m *Manager) DeleteStreamMessage(stream string, seq uint64, noErase bool) error
- func (m *Manager) EachStream(filter *StreamNamesFilter, cb func(*Stream)) (missing []string, err error)
- func (m *Manager) IsJetStreamEnabled() bool
- func (m *Manager) IsKnownConsumer(stream string, consumer string) (bool, error)
- func (m *Manager) IsKnownStream(stream string) (bool, error)
- func (m *Manager) IsStreamMaxBytesRequired() (bool, error)
- func (m *Manager) JetStreamAccountInfo() (info *api.JetStreamAccountStats, err error)
- func (m *Manager) LoadConsumer(stream string, name string) (consumer *Consumer, err error)
- func (m *Manager) LoadOrNewConsumer(stream string, name string, opts ...ConsumerOption) (consumer *Consumer, err error)
- func (m *Manager) LoadOrNewConsumerFromDefault(stream string, name string, template api.ConsumerConfig, ...) (consumer *Consumer, err error)
- func (m *Manager) LoadOrNewStream(name string, opts ...StreamOption) (stream *Stream, err error)
- func (m *Manager) LoadOrNewStreamFromDefault(name string, dflt api.StreamConfig, opts ...StreamOption) (stream *Stream, err error)
- func (m *Manager) LoadStream(name string) (stream *Stream, err error)
- func (m *Manager) MetaLeaderStandDown(placement *api.Placement) error
- func (m *Manager) MetaPeerRemove(name string, id string) error
- func (m *Manager) MetaPurgeAccount(account string) error
- func (m *Manager) NatsConn() *nats.Conn
- func (m *Manager) NewConsumer(stream string, opts ...ConsumerOption) (consumer *Consumer, err error)
- func (m *Manager) NewConsumerFromDefault(stream string, dflt api.ConsumerConfig, opts ...ConsumerOption) (consumer *Consumer, err error)
- func (m *Manager) NewStream(name string, opts ...StreamOption) (stream *Stream, err error)
- func (m *Manager) NewStreamConfiguration(template api.StreamConfig, opts ...StreamOption) (*api.StreamConfig, error)
- func (m *Manager) NewStreamFromDefault(name string, dflt api.StreamConfig, opts ...StreamOption) (stream *Stream, err error)
- func (m *Manager) NextMsg(stream string, consumer string) (*nats.Msg, error)
- func (m *Manager) NextMsgContext(ctx context.Context, stream string, consumer string) (*nats.Msg, error)
- func (m *Manager) NextMsgRequest(stream string, consumer string, inbox string, ...) error
- func (m *Manager) NextSubject(stream string, consumer string) (string, error)
- func (m *Manager) QueryStreams(opts ...StreamQueryOpt) ([]*Stream, error)
- func (m *Manager) ReadLastMessageForSubject(stream string, sub string) (msg *api.StoredMsg, err error)
- func (m *Manager) RestoreSnapshotFromDirectory(ctx context.Context, stream string, dir string, opts ...SnapshotOption) (RestoreProgress, *api.StreamState, error)
- func (m *Manager) StreamContainedSubjects(stream string, filter ...string) (map[string]uint64, error)
- func (m *Manager) StreamNames(filter *StreamNamesFilter) (names []string, err error)
- func (m *Manager) StreamTemplateNames() (templates []string, err error)
- func (m *Manager) Streams(filter *StreamNamesFilter) ([]*Stream, []string, error)
- type MsgInfo
- type Option
- type PagerOption
- type RestoreProgress
- type SnapshotOption
- func RestoreConfiguration(cfg api.StreamConfig) SnapshotOption
- func RestoreNotify(cb func(RestoreProgress)) SnapshotOption
- func SnapshotConsumers() SnapshotOption
- func SnapshotDebug() SnapshotOption
- func SnapshotHealthCheck() SnapshotOption
- func SnapshotNotify(cb func(SnapshotProgress)) SnapshotOption
- type SnapshotProgress
- type Stream
- func (s *Stream) AdvisorySubject() string
- func (s *Stream) Compression() api.Compression
- func (s *Stream) Configuration() api.StreamConfig
- func (s *Stream) ConsumerLimits() api.StreamConsumerLimits
- func (s *Stream) ConsumerNames() (names []string, err error)
- func (s *Stream) ContainedSubjects(filter ...string) (map[string]uint64, error)
- func (s *Stream) Delete() error
- func (s *Stream) DeleteAllowed() bool
- func (s *Stream) DeleteMessage(seq uint64) (err error)
- func (s *Stream) Description() string
- func (s *Stream) DetectGaps(ctx context.Context, progress func(seq uint64, pending uint64), ...) error
- func (s *Stream) DirectAllowed() bool
- func (s *Stream) DiscardNewPerSubject() bool
- func (s *Stream) DiscardPolicy() api.DiscardPolicy
- func (s *Stream) DuplicateWindow() time.Duration
- func (s *Stream) EachConsumer(cb func(consumer *Consumer)) (missing []string, err error)
- func (s *Stream) FastDeleteMessage(seq uint64) error
- func (s *Stream) FirstSequence() uint64
- func (s *Stream) Information(req ...api.JSApiStreamInfoRequest) (info *api.StreamInfo, err error)
- func (s *Stream) IsCompressed() bool
- func (s *Stream) IsInternal() bool
- func (s *Stream) IsKVBucket() bool
- func (s *Stream) IsMQTTState() bool
- func (s *Stream) IsMirror() bool
- func (s *Stream) IsObjectBucket() bool
- func (s *Stream) IsRepublishing() bool
- func (s *Stream) IsSourced() bool
- func (s *Stream) IsTemplateManaged() bool
- func (s *Stream) LatestInformation() (info *api.StreamInfo, err error)
- func (s *Stream) LatestState() (state api.StreamState, err error)
- func (s *Stream) LeaderStepDown() error
- func (s *Stream) LoadConsumer(name string) (*Consumer, error)
- func (s *Stream) LoadOrNewConsumer(name string, opts ...ConsumerOption) (consumer *Consumer, err error)
- func (s *Stream) LoadOrNewConsumerFromDefault(name string, deflt api.ConsumerConfig, opts ...ConsumerOption) (consumer *Consumer, err error)
- func (s *Stream) MaxAge() time.Duration
- func (s *Stream) MaxBytes() int64
- func (s *Stream) MaxConsumers() int
- func (s *Stream) MaxMsgSize() int32
- func (s *Stream) MaxMsgs() int64
- func (s *Stream) MaxMsgsPerSubject() int64
- func (s *Stream) Metadata() map[string]string
- func (s *Stream) MetricSubject() string
- func (s *Stream) Mirror() *api.StreamSource
- func (s *Stream) MirrorDirectAllowed() bool
- func (s *Stream) Name() string
- func (s *Stream) NewConsumer(opts ...ConsumerOption) (consumer *Consumer, err error)
- func (s *Stream) NewConsumerFromDefault(dflt api.ConsumerConfig, opts ...ConsumerOption) (consumer *Consumer, err error)
- func (s *Stream) NoAck() bool
- func (s *Stream) PageContents(opts ...PagerOption) (*StreamPager, error)
- func (s *Stream) Purge(opts ...*api.JSApiStreamPurgeRequest) error
- func (s *Stream) PurgeAllowed() bool
- func (s *Stream) ReadLastMessageForSubject(subj string) (*api.StoredMsg, error)
- func (s *Stream) ReadMessage(seq uint64) (msg *api.StoredMsg, err error)
- func (s *Stream) RemoveRAFTPeer(peer string) error
- func (s *Stream) Replicas() int
- func (s *Stream) Republish() *api.RePublish
- func (s *Stream) Reset() error
- func (s *Stream) Retention() api.RetentionPolicy
- func (s *Stream) RollupAllowed() bool
- func (s *Stream) Seal() error
- func (s *Stream) Sealed() bool
- func (s *Stream) SnapshotToDirectory(ctx context.Context, dir string, opts ...SnapshotOption) (SnapshotProgress, error)
- func (s *Stream) Sources() []*api.StreamSource
- func (s *Stream) State(req ...api.JSApiStreamInfoRequest) (stats api.StreamState, err error)
- func (s *Stream) Storage() api.StorageType
- func (s *Stream) Subjects() []string
- func (s *Stream) Template() string
- func (s *Stream) UpdateConfiguration(cfg api.StreamConfig, opts ...StreamOption) error
- type StreamNamesFilter
- type StreamOption
- func AllowDirect() StreamOption
- func AllowRollup() StreamOption
- func AppendSource(source *api.StreamSource) StreamOption
- func Compression(alg api.Compression) StreamOption
- func ConsumerLimits(limits api.StreamConsumerLimits) StreamOption
- func DenyDelete() StreamOption
- func DenyPurge() StreamOption
- func DiscardNew() StreamOption
- func DiscardNewPerSubject() StreamOption
- func DiscardOld() StreamOption
- func DuplicateWindow(d time.Duration) StreamOption
- func FileStorage() StreamOption
- func FirstSequence(seq uint64) StreamOption
- func InterestRetention() StreamOption
- func LimitsRetention() StreamOption
- func MaxAge(m time.Duration) StreamOption
- func MaxBytes(m int64) StreamOption
- func MaxConsumers(m int) StreamOption
- func MaxMessageSize(m int32) StreamOption
- func MaxMessages(m int64) StreamOption
- func MaxMessagesPerSubject(m int64) StreamOption
- func MemoryStorage() StreamOption
- func Mirror(stream *api.StreamSource) StreamOption
- func MirrorDirect() StreamOption
- func NoAck() StreamOption
- func NoAllowDirect() StreamOption
- func NoMirrorDirect() StreamOption
- func PlacementCluster(cluster string) StreamOption
- func PlacementTags(tags ...string) StreamOption
- func Replicas(r int) StreamOption
- func Republish(m *api.RePublish) StreamOption
- func Sources(streams ...*api.StreamSource) StreamOption
- func StreamDescription(d string) StreamOption
- func StreamMetadata(meta map[string]string) StreamOption
- func Subjects(s ...string) StreamOption
- func WorkQueueRetention() StreamOption
- type StreamPager
- type StreamQueryOpt
- func StreamQueryClusterName(c string) StreamQueryOpt
- func StreamQueryExpression(e string) StreamQueryOpt
- func StreamQueryFewerConsumersThan(c uint) StreamQueryOpt
- func StreamQueryIdleLongerThan(p time.Duration) StreamQueryOpt
- func StreamQueryInvert() StreamQueryOpt
- func StreamQueryIsMirror() StreamQueryOpt
- func StreamQueryIsSourced() StreamQueryOpt
- func StreamQueryOlderThan(p time.Duration) StreamQueryOpt
- func StreamQueryReplicas(r uint) StreamQueryOpt
- func StreamQueryServerName(s string) StreamQueryOpt
- func StreamQuerySubjectWildcard(s string) StreamQueryOpt
- func StreamQueryWithoutMessages() StreamQueryOpt
Constants ¶
This section is empty.
Variables ¶
var DefaultConsumer = api.ConsumerConfig{ DeliverPolicy: api.DeliverAll, AckPolicy: api.AckExplicit, AckWait: 30 * time.Second, ReplayPolicy: api.ReplayInstant, }
DefaultConsumer is the configuration that will be used to create new Consumers in NewConsumer
var DefaultStream = api.StreamConfig{ Retention: api.LimitsPolicy, Discard: api.DiscardOld, MaxConsumers: -1, MaxMsgs: -1, MaxMsgsPer: -1, MaxBytes: -1, MaxAge: 24 * 365 * time.Hour, MaxMsgSize: -1, Replicas: 1, NoAck: false, }
DefaultStream is a template configuration with LimitsPolicy retention and 1 year maximum age. No storage type or subjects are set
var DefaultStreamConfiguration = DefaultStream
DefaultStreamConfiguration is the configuration that will be used to create new Streams in NewStream
var DefaultWorkQueue = api.StreamConfig{ Retention: api.WorkQueuePolicy, Discard: api.DiscardOld, MaxConsumers: -1, MaxMsgs: -1, MaxMsgsPer: -1, MaxBytes: -1, MaxAge: 24 * 365 * time.Hour, MaxMsgSize: -1, Replicas: api.StreamDefaultReplicas, NoAck: false, }
DefaultWorkQueue is a template configuration with WorkQueuePolicy retention and 1 year maximum age. No storage type or subjects are set
var ErrAckStreamIngestsAll = fmt.Errorf("configuration validation failed: streams with no_ack false may not have '>' or '*' as subjects")
var ErrMemoryStreamNotSupported = errors.New("memory streams do not support snapshots")
ErrMemoryStreamNotSupported is an error indicating a memory stream was being snapshotted which is not supported
var SampledDefaultConsumer = api.ConsumerConfig{ DeliverPolicy: api.DeliverAll, AckPolicy: api.AckExplicit, AckWait: 30 * time.Second, ReplayPolicy: api.ReplayInstant, SampleFrequency: "100%", }
SampledDefaultConsumer is a Consumer configuration template matching DefaultConsumer with SampleFrequency set to 100%
Functions ¶
func APISubject ¶ added in v0.0.21
APISubject returns API subject with prefix applied
func EventSubject ¶ added in v0.0.21
EventSubject returns Event subject with prefix applied
func IsErrorResponse ¶
func IsErrorResponse(m *nats.Msg) bool
IsErrorResponse checks if the message holds a standard JetStream error
func IsInternalStream ¶ added in v0.0.27
IsInternalStream indicates if a stream is considered 'internal' by the NATS team, that is, it's a backing stream for KV, Object or MQTT state
func IsKVBucketStream ¶ added in v0.0.27
IsKVBucketStream determines if a stream is a KV bucket
func IsMQTTStateStream ¶ added in v0.0.27
IsMQTTStateStream determines if a stream holds internal MQTT state
func IsNatsError ¶ added in v0.0.25
IsNatsError checks if err is an ApiErr matching code
func IsOKResponse ¶
func IsOKResponse(m *nats.Msg) bool
IsOKResponse checks if the message holds a standard JetStream OK response
func IsObjectBucketStream ¶ added in v0.0.27
IsObjectBucketStream determines if a stream is an Object bucket
func IsValidName ¶ added in v0.0.18
IsValidName verifies if n is a valid stream, template or consumer name
func LinearBackoffPeriods ¶ added in v0.0.29
func LinearBackoffPeriods(steps uint, min time.Duration, max time.Duration) ([]time.Duration, error)
LinearBackoffPeriods creates a backoff policy without any jitter suitable for use in a consumer backoff policy
The periods start from min and increase linearly until ~max
func NewConsumerConfiguration ¶
func NewConsumerConfiguration(dflt api.ConsumerConfig, opts ...ConsumerOption) (*api.ConsumerConfig, error)
NewConsumerConfiguration generates a new configuration based on template modified by opts
func NewStreamConfiguration ¶
func NewStreamConfiguration(template api.StreamConfig, opts ...StreamOption) (*api.StreamConfig, error)
NewStreamConfiguration generates a new configuration based on template modified by opts
func NextSubject ¶
NextSubject returns the subject used to retrieve the next message for pull-based Consumers, empty when not a pull-based consumer
func ParseErrorResponse ¶
func ParseErrorResponse(m *nats.Msg) error
ParseErrorResponse parses the JetStream response, if it's an error returns an error instance holding the message else nil
func ParseEvent ¶
ParseEvent parses event e and returns event as for example *api.ConsumerAckMetric, all unknown event schemas will be of type *UnknownMessage
func ParsePubAck ¶ added in v0.0.25
ParsePubAck parses a stream publish response and returns an error if the publish failed or parsing failed
func SubjectIsSubsetMatch ¶ added in v0.0.33
SubjectIsSubsetMatch tests if a subject matches a standard nats wildcard
Types ¶
type Consumer ¶
Consumer represents a JetStream consumer
func (*Consumer) AckSampleSubject ¶
AckSampleSubject is the subject used to publish ack samples to
func (*Consumer) AcknowledgedFloor ¶
func (c *Consumer) AcknowledgedFloor() (api.SequenceInfo, error)
AcknowledgedFloor reports the highest contiguous message sequences that were acknowledged
func (*Consumer) AdvisorySubject ¶
AdvisorySubject is a wildcard subscription subject that subscribes to all advisories for this consumer
func (*Consumer) Configuration ¶
func (c *Consumer) Configuration() (config api.ConsumerConfig)
Configuration is the Consumer configuration
func (*Consumer) Delete ¶
Delete deletes the Consumer, after this the Consumer object should be disposed
func (*Consumer) DeliverGroup ¶ added in v0.0.26
func (*Consumer) DeliverPolicy ¶
func (c *Consumer) DeliverPolicy() api.DeliverPolicy
func (*Consumer) DeliveredState ¶
func (c *Consumer) DeliveredState() (api.SequenceInfo, error)
DeliveredState reports the message sequences that were successfully delivered
func (*Consumer) DeliverySubject ¶
func (*Consumer) Description ¶ added in v0.0.26
func (*Consumer) DurableName ¶
func (*Consumer) FilterSubject ¶
func (*Consumer) FilterSubjects ¶ added in v0.1.0
func (*Consumer) FlowControl ¶ added in v0.0.21
func (*Consumer) InactiveThreshold ¶ added in v0.0.29
func (*Consumer) IsEphemeral ¶
func (*Consumer) IsHeadersOnly ¶ added in v0.0.27
func (*Consumer) IsPullMode ¶
func (*Consumer) IsPushMode ¶
func (*Consumer) LatestState ¶ added in v0.0.23
func (c *Consumer) LatestState() (api.ConsumerInfo, error)
LatestState returns the most recently loaded state
func (*Consumer) LeaderStepDown ¶ added in v0.0.21
LeaderStepDown requests the current RAFT group leader in a clustered JetStream to stand down forcing a new election
func (*Consumer) MaxAckPending ¶ added in v0.0.20
func (*Consumer) MaxDeliver ¶
func (*Consumer) MaxRequestBatch ¶ added in v0.0.29
func (*Consumer) MaxRequestExpires ¶ added in v0.0.29
func (*Consumer) MaxRequestMaxBytes ¶ added in v0.0.33
func (*Consumer) MaxWaiting ¶ added in v0.0.24
func (*Consumer) MemoryStorage ¶ added in v0.0.33
func (*Consumer) MetricSubject ¶
MetricSubject is a wildcard subscription subject that subscribes to all metrics for this consumer
func (*Consumer) NextMsg ¶
NextMsg retrieves the next message, waiting up to manager timeout for a response
func (*Consumer) NextMsgContext ¶ added in v0.0.19
NextMsgContext retrieves the next message, interrupted by the cancel context ctx
func (*Consumer) NextMsgRequest ¶ added in v0.0.20
func (c *Consumer) NextMsgRequest(inbox string, req *api.JSApiConsumerGetNextRequest) error
NextMsgRequest creates a request for a batch of messages, data or control flow messages will be sent to inbox
func (*Consumer) NextSubject ¶
NextSubject returns the subject used to retrieve the next message for pull-based Consumers, empty when not a pull-based consumer
func (*Consumer) PendingAcknowledgement ¶ added in v0.0.20
PendingAcknowledgement reports the number of messages sent but not yet acknowledged
func (*Consumer) PendingMessages ¶ added in v0.0.20
PendingMessages is the number of unprocessed messages for this consumer
func (*Consumer) RedeliveryCount ¶
RedeliveryCount reports the number of redelivers that were done
func (*Consumer) ReplayPolicy ¶
func (c *Consumer) ReplayPolicy() api.ReplayPolicy
func (*Consumer) SampleFrequency ¶
func (*Consumer) StartSequence ¶
func (*Consumer) State ¶
func (c *Consumer) State() (api.ConsumerInfo, error)
State loads a snapshot of consumer state including delivery counts, retries and more
func (*Consumer) StreamName ¶
func (*Consumer) UpdateConfiguration ¶ added in v0.0.27
func (c *Consumer) UpdateConfiguration(opts ...ConsumerOption) error
UpdateConfiguration updates the consumer configuration. At present the description, ack wait, max deliver, sample frequency, max ack pending, max waiting and headers only settings can be changed
func (*Consumer) WaitingClientPulls ¶ added in v0.0.20
WaitingClientPulls is the number of clients that have outstanding pull requests against this consumer
type ConsumerOption ¶
type ConsumerOption func(o *api.ConsumerConfig) error
ConsumerOption configures consumers
func AckWait ¶
func AckWait(t time.Duration) ConsumerOption
AckWait sets the time a delivered message might remain unacknowledged before redelivery is attempted
func AcknowledgeAll ¶
func AcknowledgeAll() ConsumerOption
AcknowledgeAll enables an acknowledgement mode where acknowledging message 100 will also ack the preceding messages
func AcknowledgeExplicit ¶
func AcknowledgeExplicit() ConsumerOption
AcknowledgeExplicit requires that every message received be acknowledged
func AcknowledgeNone ¶
func AcknowledgeNone() ConsumerOption
AcknowledgeNone disables message acknowledgement
func BackoffIntervals ¶ added in v0.0.29
func BackoffIntervals(i ...time.Duration) ConsumerOption
BackoffIntervals sets a series of intervals by which retries will be attempted for this consumer
func BackoffPolicy ¶ added in v0.0.29
func BackoffPolicy(policy []time.Duration) ConsumerOption
BackoffPolicy sets a consumer policy
func ConsumerDescription ¶ added in v0.0.26
func ConsumerDescription(d string) ConsumerOption
ConsumerDescription is a textual description of this consumer to provide additional context
func ConsumerMetadata ¶ added in v0.1.0
func ConsumerMetadata(meta map[string]string) ConsumerOption
func ConsumerName ¶ added in v0.1.0
func ConsumerName(s string) ConsumerOption
ConsumerName sets a name for the consumer, when creating a durable consumer use DurableName, using ConsumerName allows for creating named ephemeral consumers, else a random name will be generated
func ConsumerOverrideMemoryStorage ¶ added in v0.0.33
func ConsumerOverrideMemoryStorage() ConsumerOption
func ConsumerOverrideReplicas ¶ added in v0.0.33
func ConsumerOverrideReplicas(r int) ConsumerOption
ConsumerOverrideReplicas overrides the replica count inherited from the Stream with this value
func DeliverAllAvailable ¶
func DeliverAllAvailable() ConsumerOption
DeliverAllAvailable delivers messages starting with the first available in the stream
func DeliverGroup ¶ added in v0.0.26
func DeliverGroup(g string) ConsumerOption
DeliverGroup when set will only deliver messages to subscriptions matching that group
func DeliverHeadersOnly ¶ added in v0.0.27
func DeliverHeadersOnly() ConsumerOption
DeliverHeadersOnly configures the consumer to only deliver existing headers and the `Nats-Msg-Size` header, no bodies
func DeliverLastPerSubject ¶ added in v0.0.26
func DeliverLastPerSubject() ConsumerOption
DeliverLastPerSubject delivers the last message for each subject in a wildcard stream based on the filter subjects of the consumer
func DeliverySubject ¶
func DeliverySubject(s string) ConsumerOption
DeliverySubject is the subject where a Push consumer will deliver its messages
func DurableName ¶
func DurableName(s string) ConsumerOption
DurableName is the name given to the consumer, when not set an ephemeral consumer is created
func FilterStreamBySubject ¶
func FilterStreamBySubject(s ...string) ConsumerOption
FilterStreamBySubject filters the messages in a wildcard stream to those matching a specific subject
func IdleHeartbeat ¶ added in v0.0.21
func IdleHeartbeat(hb time.Duration) ConsumerOption
IdleHeartbeat sets the time before an idle consumer will send an empty message with Status header 100 indicating the consumer is still alive
func InactiveThreshold ¶ added in v0.0.29
func InactiveThreshold(t time.Duration) ConsumerOption
InactiveThreshold is the idle time an ephemeral consumer allows before it is removed
func LinearBackoffPolicy ¶ added in v0.0.29
LinearBackoffPolicy creates a backoff policy with linearly increasing steps between min and max
func MaxAckPending ¶ added in v0.0.20
func MaxAckPending(pending uint) ConsumerOption
MaxAckPending maximum number of messages without acknowledgement that can be outstanding, once this limit is reached message delivery will be suspended
func MaxDeliveryAttempts ¶
func MaxDeliveryAttempts(n int) ConsumerOption
MaxDeliveryAttempts is the number of times a message will be attempted to be delivered
func MaxRequestBatch ¶ added in v0.0.29
func MaxRequestBatch(max uint) ConsumerOption
MaxRequestBatch is the largest batch that can be specified when doing pulls against the consumer
func MaxRequestExpires ¶ added in v0.0.29
func MaxRequestExpires(max time.Duration) ConsumerOption
MaxRequestExpires is the longest pull request expire the server will allow
func MaxRequestMaxBytes ¶ added in v0.0.33
func MaxRequestMaxBytes(max int) ConsumerOption
MaxRequestMaxBytes sets the limit of max bytes a consumer may request
func MaxWaiting ¶ added in v0.0.24
func MaxWaiting(pulls uint) ConsumerOption
MaxWaiting is the number of outstanding pulls that are allowed on any one consumer. Pulls that exceed this limit are discarded.
func PushFlowControl ¶ added in v0.0.21
func PushFlowControl() ConsumerOption
PushFlowControl enables flow control for push based consumers
func RateLimitBitsPerSecond ¶ added in v0.0.18
func RateLimitBitsPerSecond(bps uint64) ConsumerOption
RateLimitBitsPerSecond limits message delivery to a rate in bits per second
func ReplayAsReceived ¶
func ReplayAsReceived() ConsumerOption
ReplayAsReceived delivers messages at the rate they were received at
func ReplayInstantly ¶
func ReplayInstantly() ConsumerOption
ReplayInstantly delivers messages to the consumer as fast as possible
func SamplePercent ¶
func SamplePercent(i int) ConsumerOption
SamplePercent configures sampling of a subset of messages expressed as a percentage
func StartAtSequence ¶
func StartAtSequence(s uint64) ConsumerOption
StartAtSequence starts consuming messages at a specific sequence in the stream
func StartAtTime ¶
func StartAtTime(t time.Time) ConsumerOption
StartAtTime starts consuming messages at a specific point in time in the stream
func StartAtTimeDelta ¶
func StartAtTimeDelta(d time.Duration) ConsumerOption
StartAtTimeDelta starts delivering messages at a past point in time
func StartWithLastReceived ¶
func StartWithLastReceived() ConsumerOption
StartWithLastReceived starts delivery at the last message received in the stream
func StartWithNextReceived ¶
func StartWithNextReceived() ConsumerOption
StartWithNextReceived starts delivery at the next message received in the stream
type Manager ¶ added in v0.0.19
func (*Manager) ConsumerNames ¶ added in v0.0.19
ConsumerNames is a sorted list of all known consumers within a stream
func (*Manager) Consumers ¶ added in v0.0.19
Consumers is a sorted list of all known Consumers within a Stream and a list of any consumer names that were known but no details were found
func (*Manager) DeleteConsumer ¶ added in v0.0.34
DeleteConsumer removes a consumer without all the drama of loading it etc
func (*Manager) DeleteStream ¶ added in v0.0.34
DeleteStream removes a stream without all the drama of loading it etc
func (*Manager) DeleteStreamMessage ¶ added in v0.0.25
DeleteStreamMessage deletes a specific message from the Stream without erasing the data, see DeleteMessage() for a safe delete
func (*Manager) EachStream ¶ added in v0.0.19
func (m *Manager) EachStream(filter *StreamNamesFilter, cb func(*Stream)) (missing []string, err error)
EachStream iterates over all known Streams, does not handle any streams the cluster could not get data from but returns a list of those
func (*Manager) IsJetStreamEnabled ¶ added in v0.0.19
IsJetStreamEnabled determines if JetStream is enabled for the current account
func (*Manager) IsKnownConsumer ¶ added in v0.0.19
IsKnownConsumer determines if a Consumer is known for a specific Stream
func (*Manager) IsKnownStream ¶ added in v0.0.19
IsKnownStream determines if a Stream is known
func (*Manager) IsStreamMaxBytesRequired ¶ added in v0.0.31
IsStreamMaxBytesRequired determines if the JetStream account requires streams to set a byte limit
func (*Manager) JetStreamAccountInfo ¶ added in v0.0.19
func (m *Manager) JetStreamAccountInfo() (info *api.JetStreamAccountStats, err error)
JetStreamAccountInfo retrieves information about the current account limits and more
func (*Manager) LoadConsumer ¶ added in v0.0.19
LoadConsumer loads a consumer by name
func (*Manager) LoadOrNewConsumer ¶ added in v0.0.19
func (m *Manager) LoadOrNewConsumer(stream string, name string, opts ...ConsumerOption) (consumer *Consumer, err error)
LoadOrNewConsumer loads a consumer by name if known else creates a new one with these properties
func (*Manager) LoadOrNewConsumerFromDefault ¶ added in v0.0.19
func (m *Manager) LoadOrNewConsumerFromDefault(stream string, name string, template api.ConsumerConfig, opts ...ConsumerOption) (consumer *Consumer, err error)
LoadOrNewConsumerFromDefault loads a consumer by name if known else creates a new one with these properties based on template
func (*Manager) LoadOrNewStream ¶ added in v0.0.19
func (m *Manager) LoadOrNewStream(name string, opts ...StreamOption) (stream *Stream, err error)
LoadOrNewStream loads an existing stream or creates a new one matching opts
func (*Manager) LoadOrNewStreamFromDefault ¶ added in v0.0.19
func (m *Manager) LoadOrNewStreamFromDefault(name string, dflt api.StreamConfig, opts ...StreamOption) (stream *Stream, err error)
LoadOrNewStreamFromDefault loads an existing stream or creates a new one matching opts and template
func (*Manager) LoadStream ¶ added in v0.0.19
LoadStream loads a stream by name
func (*Manager) MetaLeaderStandDown ¶ added in v0.0.21
MetaLeaderStandDown requests the meta group leader to stand down, must be initiated by a system user
func (*Manager) MetaPeerRemove ¶ added in v0.0.21
MetaPeerRemove removes a peer from the JetStream meta cluster, evicting all streams, consumers etc. Use with extreme caution. If id is given it will be used by the server else name, it's generally best to remove by id
func (*Manager) MetaPurgeAccount ¶ added in v0.0.35
MetaPurgeAccount removes all data from an account, must be run in the system account
func (*Manager) NatsConn ¶ added in v0.0.25
func (m *Manager) NatsConn() *nats.Conn
NatsConn gives access to the underlying NATS Connection
func (*Manager) NewConsumer ¶ added in v0.0.19
func (m *Manager) NewConsumer(stream string, opts ...ConsumerOption) (consumer *Consumer, err error)
NewConsumer creates a consumer based on DefaultConsumer modified by opts
func (*Manager) NewConsumerFromDefault ¶ added in v0.0.19
func (m *Manager) NewConsumerFromDefault(stream string, dflt api.ConsumerConfig, opts ...ConsumerOption) (consumer *Consumer, err error)
NewConsumerFromDefault creates a new consumer based on a template config that gets modified by opts
func (*Manager) NewStream ¶ added in v0.0.19
func (m *Manager) NewStream(name string, opts ...StreamOption) (stream *Stream, err error)
NewStream creates a new stream using DefaultStream as a starting template allowing adjustments to be made using options
func (*Manager) NewStreamConfiguration ¶ added in v0.0.19
func (m *Manager) NewStreamConfiguration(template api.StreamConfig, opts ...StreamOption) (*api.StreamConfig, error)
NewStreamConfiguration generates a new configuration based on template modified by opts
func (*Manager) NewStreamFromDefault ¶ added in v0.0.19
func (m *Manager) NewStreamFromDefault(name string, dflt api.StreamConfig, opts ...StreamOption) (stream *Stream, err error)
NewStreamFromDefault creates a new stream based on a supplied template and options
func (*Manager) NextMsg ¶ added in v0.0.19
NextMsg requests the next message from the server with the manager timeout
func (*Manager) NextMsgContext ¶ added in v0.0.19
func (m *Manager) NextMsgContext(ctx context.Context, stream string, consumer string) (*nats.Msg, error)
NextMsgContext requests the next message from the server. This request will wait for as long as the context is active. If repeated pulls will be made it's better to use NextMsgRequest()
func (*Manager) NextMsgRequest ¶ added in v0.0.20
func (m *Manager) NextMsgRequest(stream string, consumer string, inbox string, req *api.JSApiConsumerGetNextRequest) error
NextMsgRequest creates a request for a batch of messages on a consumer, data or control flow messages will be sent to inbox
func (*Manager) NextSubject ¶ added in v0.0.21
NextSubject returns the subject used to retrieve the next message for pull-based Consumers, empty when not a pull-based consumer
func (*Manager) QueryStreams ¶ added in v0.0.29
func (m *Manager) QueryStreams(opts ...StreamQueryOpt) ([]*Stream, error)
QueryStreams filters the streams found in JetStream using various filter options
func (*Manager) ReadLastMessageForSubject ¶ added in v0.0.25
func (m *Manager) ReadLastMessageForSubject(stream string, sub string) (msg *api.StoredMsg, err error)
ReadLastMessageForSubject reads the last message stored in the stream for a specific subject
func (*Manager) RestoreSnapshotFromDirectory ¶ added in v0.0.21
func (m *Manager) RestoreSnapshotFromDirectory(ctx context.Context, stream string, dir string, opts ...SnapshotOption) (RestoreProgress, *api.StreamState, error)
func (*Manager) StreamContainedSubjects ¶ added in v0.0.34
func (m *Manager) StreamContainedSubjects(stream string, filter ...string) (map[string]uint64, error)
StreamContainedSubjects queries the stream for the subjects it holds with optional filter
func (*Manager) StreamNames ¶ added in v0.0.19
func (m *Manager) StreamNames(filter *StreamNamesFilter) (names []string, err error)
StreamNames is a sorted list of all known Streams filtered by filter
func (*Manager) StreamTemplateNames ¶ added in v0.0.19
StreamTemplateNames is a sorted list of all known StreamTemplates
type MsgInfo ¶
type MsgInfo struct {
// contains filtered or unexported fields
}
MsgInfo holds metadata about a message that was received from JetStream
func ParseJSMsgMetadata ¶
ParseJSMsgMetadata parse the reply subject metadata to determine message metadata
func ParseJSMsgMetadataReply ¶ added in v0.0.20
ParseJSMsgMetadataReply parses the reply subject of a JetStream originated message
func (*MsgInfo) ConsumerSequence ¶
ConsumerSequence is the sequence of this message in the consumer
func (*MsgInfo) Delivered ¶
Delivered is the number of times this message had delivery attempts including this one
func (*MsgInfo) Pending ¶ added in v0.0.20
Pending is the number of messages left to consume, -1 when the number is not reported
func (*MsgInfo) StreamSequence ¶
StreamSequence is the sequence of this message in the stream
type Option ¶ added in v0.0.19
type Option func(o *Manager)
Option is an option to configure the JetStream Manager
func WithAPIPrefix ¶ added in v0.0.21
WithAPIPrefix replaces API endpoints like $JS.API.STREAM.NAMES with prefix.STREAM.NAMES
func WithAPIValidation ¶
func WithAPIValidation(v api.StructValidator) Option
WithAPIValidation validates responses sent from the NATS server using a validator
func WithDomain ¶ added in v0.0.24
WithDomain sets a JetStream domain, incompatible with WithAPIPrefix()
func WithEventPrefix ¶ added in v0.0.21
WithEventPrefix replaces event subjects like $JS.EVENT.ADVISORY.API with prefix.ADVISORY
func WithTimeout ¶
WithTimeout sets a timeout for the requests
type PagerOption ¶ added in v0.0.19
type PagerOption func(p *StreamPager)
PagerOption configures the stream pager
func PagerFilterSubject ¶ added in v0.0.23
func PagerFilterSubject(s string) PagerOption
PagerFilterSubject sets a filter subject for the pager
func PagerSize ¶ added in v0.0.19
func PagerSize(sz int) PagerOption
PagerSize is the size of pages to walk
func PagerStartDelta ¶ added in v0.0.19
func PagerStartDelta(d time.Duration) PagerOption
PagerStartDelta sets a starting time delta for the pager
func PagerStartId ¶ added in v0.0.19
func PagerStartId(id int) PagerOption
PagerStartId sets a starting stream sequence for the pager
func PagerTimeout ¶ added in v0.0.19
func PagerTimeout(d time.Duration) PagerOption
PagerTimeout is the time to wait for messages before it's assumed the end of the stream was reached
type RestoreProgress ¶
type RestoreProgress interface { // StartTime is when the process started StartTime() time.Time // EndTime is when the process ended - zero when not completed EndTime() time.Time // ChunkSize is the size of the data packets sent over NATS ChunkSize() int // ChunksSent is the number of chunks of size ChunkSize that was sent ChunksSent() uint32 // ChunksToSend number of chunks of ChunkSize expected to be sent ChunksToSend() int // BytesSent is the number of bytes sent so far BytesSent() uint64 // BytesPerSecond is the number of bytes received in the last second, 0 during the first second BytesPerSecond() uint64 }
type SnapshotOption ¶
type SnapshotOption func(o *snapshotOptions)
func RestoreConfiguration ¶ added in v0.0.22
func RestoreConfiguration(cfg api.StreamConfig) SnapshotOption
RestoreConfiguration overrides the configuration used to restore
func RestoreNotify ¶
func RestoreNotify(cb func(RestoreProgress)) SnapshotOption
RestoreNotify notifies cb about progress of the restore operation
func SnapshotConsumers ¶
func SnapshotConsumers() SnapshotOption
SnapshotConsumers includes consumer configuration and state in backups
func SnapshotDebug ¶
func SnapshotDebug() SnapshotOption
SnapshotDebug enables logging using the standard go logging library
func SnapshotHealthCheck ¶
func SnapshotHealthCheck() SnapshotOption
SnapshotHealthCheck performs a health check prior to starting the snapshot
func SnapshotNotify ¶
func SnapshotNotify(cb func(SnapshotProgress)) SnapshotOption
SnapshotNotify notifies cb about progress of the snapshot operation
type SnapshotProgress ¶
type SnapshotProgress interface { // StartTime is when the process started StartTime() time.Time // EndTime is when the process ended - zero when not completed EndTime() time.Time // ChunkSize is the size of the data packets sent over NATS ChunkSize() int // ChunksReceived is how many chunks of ChunkSize were received ChunksReceived() uint32 // BytesExpected is how many Bytes we should be receiving BytesExpected() uint64 // BytesReceived is how many Bytes have been received BytesReceived() uint64 // UncompressedBytesReceived is the number of bytes received uncompressed UncompressedBytesReceived() uint64 // BytesPerSecond is the number of bytes received in the last second, 0 during the first second BytesPerSecond() uint64 // HealthCheck indicates if health checking was requested HealthCheck() bool // Finished will be true after all data have been written Finished() bool }
type Stream ¶
Stream represents a JetStream Stream
func (*Stream) AdvisorySubject ¶
AdvisorySubject is a wildcard subscription subject that subscribes to all advisories for this stream
func (*Stream) Compression ¶ added in v0.1.0
func (s *Stream) Compression() api.Compression
func (*Stream) Configuration ¶
func (s *Stream) Configuration() api.StreamConfig
func (*Stream) ConsumerLimits ¶ added in v0.1.1
func (s *Stream) ConsumerLimits() api.StreamConsumerLimits
func (*Stream) ConsumerNames ¶
ConsumerNames is a list of all known consumers for this Stream
func (*Stream) ContainedSubjects ¶ added in v0.0.34
ContainedSubjects queries the stream for the subjects it holds with optional filter
func (*Stream) DeleteAllowed ¶ added in v0.0.34
func (*Stream) DeleteMessage ¶
DeleteMessage deletes a specific message from the Stream by overwriting it with random data, see FastDeleteMessage() to remove the message without overwriting data
func (*Stream) Description ¶ added in v0.0.26
func (*Stream) DetectGaps ¶ added in v0.1.0
func (s *Stream) DetectGaps(ctx context.Context, progress func(seq uint64, pending uint64), gap func(first uint64, last uint64)) error
DetectGaps detects interior deletes in a stream, reports progress through the stream and each found gap.
It uses the extended stream info to get the sequences and uses that to detect gaps. The Deleted information in StreamInfo is capped at some amount, so if it determines there are more messages that are deleted in the stream it will then make a consumer and walk the remainder of the stream to detect gaps the hard way
func (*Stream) DirectAllowed ¶ added in v0.0.34
func (*Stream) DiscardNewPerSubject ¶ added in v0.0.35
func (*Stream) DiscardPolicy ¶ added in v0.0.35
func (s *Stream) DiscardPolicy() api.DiscardPolicy
func (*Stream) DuplicateWindow ¶ added in v0.0.18
func (*Stream) EachConsumer ¶
EachConsumer calls cb with each known consumer for this stream, error on any error to load consumers
func (*Stream) FastDeleteMessage ¶ added in v0.0.25
FastDeleteMessage deletes a specific message from the Stream without erasing the data, see DeleteMessage() for a safe delete
func (*Stream) FirstSequence ¶ added in v0.1.0
func (*Stream) Information ¶
func (s *Stream) Information(req ...api.JSApiStreamInfoRequest) (info *api.StreamInfo, err error)
Information loads the current stream information
func (*Stream) IsCompressed ¶ added in v0.1.0
IsCompressed determines if a stream is compressed
func (*Stream) IsInternal ¶ added in v0.0.27
IsInternal indicates if a stream is considered 'internal' by the NATS team, that is, it's a backing stream for KV, Object or MQTT state
func (*Stream) IsKVBucket ¶ added in v0.0.27
IsKVBucket determines if a stream is a KV bucket
func (*Stream) IsMQTTState ¶ added in v0.0.27
IsMQTTState determines if a stream holds internal MQTT state
func (*Stream) IsMirror ¶ added in v0.0.21
IsMirror determines if this stream is a mirror of another
func (*Stream) IsObjectBucket ¶ added in v0.0.27
IsObjectBucket determines if a stream is an Object bucket
func (*Stream) IsRepublishing ¶ added in v0.0.33
func (*Stream) IsSourced ¶ added in v0.0.21
IsSourced determines if this stream is sourcing data from another stream. Other streams could be synced to this stream and it would not be reported by this property
func (*Stream) IsTemplateManaged ¶
IsTemplateManaged determines if this stream is managed by a template
func (*Stream) LatestInformation ¶
func (s *Stream) LatestInformation() (info *api.StreamInfo, err error)
LatestInformation returns the most recently fetched stream information
func (*Stream) LatestState ¶
func (s *Stream) LatestState() (state api.StreamState, err error)
LatestState returns the most recently fetched stream state
func (*Stream) LeaderStepDown ¶ added in v0.0.21
LeaderStepDown requests the current RAFT group leader in a clustered JetStream to stand down forcing a new election
func (*Stream) LoadConsumer ¶
LoadConsumer loads a named consumer related to this Stream
func (*Stream) LoadOrNewConsumer ¶
func (s *Stream) LoadOrNewConsumer(name string, opts ...ConsumerOption) (consumer *Consumer, err error)
LoadOrNewConsumer loads or creates a consumer based on these options
func (*Stream) LoadOrNewConsumerFromDefault ¶
func (s *Stream) LoadOrNewConsumerFromDefault(name string, deflt api.ConsumerConfig, opts ...ConsumerOption) (consumer *Consumer, err error)
LoadOrNewConsumerFromDefault loads or creates a consumer based on these options that adjust supplied template
func (*Stream) MaxConsumers ¶
func (*Stream) MaxMsgSize ¶
func (*Stream) MaxMsgsPerSubject ¶ added in v0.0.24
func (*Stream) MetricSubject ¶
MetricSubject is a wildcard subscription subject that subscribes to all metrics for this stream
func (*Stream) Mirror ¶ added in v0.0.21
func (s *Stream) Mirror() *api.StreamSource
func (*Stream) MirrorDirectAllowed ¶ added in v0.0.34
func (*Stream) NewConsumer ¶
func (s *Stream) NewConsumer(opts ...ConsumerOption) (consumer *Consumer, err error)
NewConsumer creates a new consumer in this Stream based on DefaultConsumer
func (*Stream) NewConsumerFromDefault ¶
func (s *Stream) NewConsumerFromDefault(dflt api.ConsumerConfig, opts ...ConsumerOption) (consumer *Consumer, err error)
NewConsumerFromDefault creates a new consumer in this Stream based on a supplied template config
func (*Stream) PageContents ¶ added in v0.0.19
func (s *Stream) PageContents(opts ...PagerOption) (*StreamPager, error)
PageContents creates a StreamPager used to traverse the contents of the stream, Close() should be called to dispose of the background consumer and resources
func (*Stream) Purge ¶
func (s *Stream) Purge(opts ...*api.JSApiStreamPurgeRequest) error
Purge deletes messages from the Stream, an optional JSApiStreamPurgeRequest can be supplied to limit the purge to a subset of messages
func (*Stream) PurgeAllowed ¶ added in v0.0.27
func (*Stream) ReadLastMessageForSubject ¶ added in v0.0.25
ReadLastMessageForSubject reads the last message stored in the stream for a specific subject
func (*Stream) ReadMessage ¶
ReadMessage loads a message from the stream by its sequence number
func (*Stream) RemoveRAFTPeer ¶ added in v0.0.21
RemoveRAFTPeer removes a peer from the group indicating it will not return
func (*Stream) Retention ¶
func (s *Stream) Retention() api.RetentionPolicy
func (*Stream) RollupAllowed ¶ added in v0.0.27
func (*Stream) Seal ¶ added in v0.0.27
Seal updates a stream so that messages can not be added or removed using the API and limits will not be processed - messages will never age out. A sealed stream can not be unsealed.
func (*Stream) SnapshotToDirectory ¶ added in v0.0.21
func (s *Stream) SnapshotToDirectory(ctx context.Context, dir string, opts ...SnapshotOption) (SnapshotProgress, error)
SnapshotToDirectory creates a backup into an s2 compressed tar file
func (*Stream) Sources ¶ added in v0.0.21
func (s *Stream) Sources() []*api.StreamSource
func (*Stream) State ¶
func (s *Stream) State(req ...api.JSApiStreamInfoRequest) (stats api.StreamState, err error)
State retrieves the Stream State
func (*Stream) Storage ¶
func (s *Stream) Storage() api.StorageType
func (*Stream) UpdateConfiguration ¶
func (s *Stream) UpdateConfiguration(cfg api.StreamConfig, opts ...StreamOption) error
UpdateConfiguration updates the stream using cfg modified by opts, reloads configuration from the server post update
type StreamNamesFilter ¶ added in v0.0.20
type StreamNamesFilter struct { // Subject filter the names to those consuming messages matching this subject or wildcard Subject string `json:"subject,omitempty"` }
StreamNamesFilter limits the names being returned by the names API
type StreamOption ¶
type StreamOption func(o *api.StreamConfig) error
StreamOption configures a stream
func AllowDirect ¶ added in v0.0.34
func AllowDirect() StreamOption
func AllowRollup ¶ added in v0.0.27
func AllowRollup() StreamOption
func AppendSource ¶ added in v0.0.21
func AppendSource(source *api.StreamSource) StreamOption
func Compression ¶ added in v0.1.0
func Compression(alg api.Compression) StreamOption
func ConsumerLimits ¶ added in v0.1.1
func ConsumerLimits(limits api.StreamConsumerLimits) StreamOption
func DenyDelete ¶ added in v0.0.27
func DenyDelete() StreamOption
func DenyPurge ¶ added in v0.0.27
func DenyPurge() StreamOption
func DiscardNew ¶
func DiscardNew() StreamOption
func DiscardNewPerSubject ¶ added in v0.0.35
func DiscardNewPerSubject() StreamOption
func DiscardOld ¶
func DiscardOld() StreamOption
func DuplicateWindow ¶ added in v0.0.18
func DuplicateWindow(d time.Duration) StreamOption
func FileStorage ¶
func FileStorage() StreamOption
func FirstSequence ¶ added in v0.1.0
func FirstSequence(seq uint64) StreamOption
func InterestRetention ¶
func InterestRetention() StreamOption
func LimitsRetention ¶
func LimitsRetention() StreamOption
func MaxAge ¶
func MaxAge(m time.Duration) StreamOption
func MaxBytes ¶
func MaxBytes(m int64) StreamOption
func MaxConsumers ¶
func MaxConsumers(m int) StreamOption
func MaxMessageSize ¶
func MaxMessageSize(m int32) StreamOption
func MaxMessages ¶
func MaxMessages(m int64) StreamOption
func MaxMessagesPerSubject ¶ added in v0.0.24
func MaxMessagesPerSubject(m int64) StreamOption
func MemoryStorage ¶
func MemoryStorage() StreamOption
func Mirror ¶ added in v0.0.21
func Mirror(stream *api.StreamSource) StreamOption
func MirrorDirect ¶ added in v0.0.34
func MirrorDirect() StreamOption
func NoAck ¶
func NoAck() StreamOption
func NoAllowDirect ¶ added in v0.0.34
func NoAllowDirect() StreamOption
func NoMirrorDirect ¶ added in v0.0.34
func NoMirrorDirect() StreamOption
func PlacementCluster ¶ added in v0.0.21
func PlacementCluster(cluster string) StreamOption
func PlacementTags ¶ added in v0.0.21
func PlacementTags(tags ...string) StreamOption
func Replicas ¶
func Replicas(r int) StreamOption
func Republish ¶ added in v0.0.33
func Republish(m *api.RePublish) StreamOption
func Sources ¶ added in v0.0.21
func Sources(streams ...*api.StreamSource) StreamOption
func StreamDescription ¶ added in v0.0.26
func StreamDescription(d string) StreamOption
StreamDescription is a textual description of this stream to provide additional context
func StreamMetadata ¶ added in v0.1.0
func StreamMetadata(meta map[string]string) StreamOption
func Subjects ¶
func Subjects(s ...string) StreamOption
func WorkQueueRetention ¶
func WorkQueueRetention() StreamOption
type StreamPager ¶ added in v0.0.19
type StreamPager struct {
// contains filtered or unexported fields
}
func (*StreamPager) Close ¶ added in v0.0.19
func (p *StreamPager) Close() error
Close disposes of the resources used by the pager and should be called when done
func (*StreamPager) NextMsg ¶ added in v0.0.19
func (p *StreamPager) NextMsg(ctx context.Context) (msg *nats.Msg, last bool, err error)
NextMsg retrieves the next message from the pager interrupted by ctx.
last indicates if the message is the last in the current page, the next call to NextMsg will first request the next page, if the client is prompting users to continue to the next page it should be done when last is true
When the end of the stream is reached err will be non nil and last will be true, otherwise err being non nil while last is false indicates a failed state. End is indicated by no new messages arriving after ctx timeout or after the time set using PagerTimeout() is reached
type StreamQueryOpt ¶ added in v0.0.29
type StreamQueryOpt func(query *streamQuery) error
func StreamQueryClusterName ¶ added in v0.0.29
func StreamQueryClusterName(c string) StreamQueryOpt
StreamQueryClusterName limits results to servers within a cluster matched by a regular expression
func StreamQueryExpression ¶ added in v0.1.0
func StreamQueryExpression(e string) StreamQueryOpt
StreamQueryExpression filters the stream using the expr expression language
func StreamQueryFewerConsumersThan ¶ added in v0.0.29
func StreamQueryFewerConsumersThan(c uint) StreamQueryOpt
StreamQueryFewerConsumersThan limits results to streams with fewer than or equal to c consumers
func StreamQueryIdleLongerThan ¶ added in v0.0.29
func StreamQueryIdleLongerThan(p time.Duration) StreamQueryOpt
StreamQueryIdleLongerThan limits results to streams that have not received messages for a period longer than p
func StreamQueryInvert ¶ added in v0.0.29
func StreamQueryInvert() StreamQueryOpt
StreamQueryInvert inverts the logic of filters, older than becomes newer than and so forth
func StreamQueryIsMirror ¶ added in v0.1.0
func StreamQueryIsMirror() StreamQueryOpt
func StreamQueryIsSourced ¶ added in v0.1.0
func StreamQueryIsSourced() StreamQueryOpt
func StreamQueryOlderThan ¶ added in v0.0.29
func StreamQueryOlderThan(p time.Duration) StreamQueryOpt
StreamQueryOlderThan limits the results to streams older than p
func StreamQueryReplicas ¶ added in v0.1.0
func StreamQueryReplicas(r uint) StreamQueryOpt
StreamQueryReplicas finds streams with a certain number of replicas or less
func StreamQueryServerName ¶ added in v0.0.29
func StreamQueryServerName(s string) StreamQueryOpt
StreamQueryServerName limits results to servers matching a regular expression
func StreamQuerySubjectWildcard ¶ added in v0.0.33
func StreamQuerySubjectWildcard(s string) StreamQueryOpt
StreamQuerySubjectWildcard limits results to streams with subject interest matching a standard NATS wildcard
func StreamQueryWithoutMessages ¶ added in v0.0.29
func StreamQueryWithoutMessages() StreamQueryOpt
StreamQueryWithoutMessages limits results to streams with no messages
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
natscontext | Package natscontext provides a way for sets of configuration options to be stored in named files and later retrieved either by name or if no name is supplied by accessing a chosen default context. |