stream

package
v2.0.0-...-90a9bfb
Warning

This package is not in the latest version of its module.

Published: Oct 10, 2023 License: MIT Imports: 12 Imported by: 0

Documentation

Overview

Package stream docs are in need of some ❤️. TODO: write package docs. See https://github.com/rabbitmq/amqp091-go/blob/main/doc.go for inspiration.

Constants

const (
	DefaultUri                              = "rabbitmq-stream://guest:guest@localhost:5552/%2f"
	DefaultHost                             = "localhost"
	DefaultPort                             = 5552
	DefaultUsername                         = "guest"
	DefaultPassword                         = "guest"
	DefaultVirtualHost                      = "/"
	DefaultMaxProducersByConnection         = 255
	DefaultMaxTrackingConsumersByConnection = 50
	DefaultMaxConsumersByConnection         = 255
	DefaultLazyInitialization               = false
	DefaultId                               = "rabbitmq-stream"
)

Default values used in EnvironmentConfiguration in place of zero values.

const (
	Kilobyte = 1_000
	Megabyte = Kilobyte * 1_000
	Gigabyte = Megabyte * 1_000
	Terabyte = Gigabyte * 1_000
)
const (
	// DefaultTimeout in all network calls.
	DefaultTimeout = time.Second * 30
)
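
The unit constants above are decimal (powers of 1,000), not binary (powers of 1,024). The following sketch copies the declarations into a standalone program to make the difference concrete; the helper decimalVsBinaryGap is hypothetical and exists only for illustration.

```go
package main

import "fmt"

// ByteCapacity and the unit constants mirror the declarations documented above.
type ByteCapacity uint64

const (
	Kilobyte = 1_000
	Megabyte = Kilobyte * 1_000
	Gigabyte = Megabyte * 1_000
	Terabyte = Gigabyte * 1_000
)

// decimalVsBinaryGap (hypothetical helper) reports how many bytes smaller
// n decimal gigabytes are than n binary gibibytes (2^30 bytes each).
func decimalVsBinaryGap(n uint64) uint64 {
	const gibibyte = 1 << 30
	return n*gibibyte - n*Gigabyte
}

func main() {
	var limit ByteCapacity = 5 * Gigabyte
	fmt.Println(uint64(limit))         // 5000000000
	fmt.Println(decimalVsBinaryGap(5)) // 368709120
}
```

A limit expressed as 5 * Gigabyte is therefore roughly 7% smaller than 5 GiB, which may matter when sizing disks.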

Variables

var (
	ErrNoLocators           = fmt.Errorf("no locators configured")
	ErrUnsupportedOperation = fmt.Errorf("unsupported operation")
)
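
Sentinel errors like these are usually matched with errors.Is rather than by comparing strings. The sketch below redeclares ErrNoLocators locally and assumes the error may arrive wrapped; whether this package wraps its sentinels is not stated above, and openEnvironment and isNoLocators are hypothetical helpers.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrNoLocators mirrors the sentinel documented above.
var ErrNoLocators = fmt.Errorf("no locators configured")

// openEnvironment is a hypothetical stand-in for a call that can fail
// with a wrapped sentinel error.
func openEnvironment(locators []string) error {
	if len(locators) == 0 {
		return fmt.Errorf("open environment: %w", ErrNoLocators)
	}
	return nil
}

// isNoLocators reports whether err is, or wraps, ErrNoLocators.
func isNoLocators(err error) bool {
	return errors.Is(err, ErrNoLocators)
}

func main() {
	err := openEnvironment(nil)
	if isNoLocators(err) {
		fmt.Println("configuration problem:", err)
	}
}
```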

Functions

func NewHeartBeater

func NewHeartBeater(duration time.Duration, client raw.Clienter, logger *slog.Logger) *heartBeater

Types

type ByteCapacity

type ByteCapacity uint64

func (ByteCapacity) String

func (b ByteCapacity) String() string

type CreateStreamOptions

type CreateStreamOptions struct {
	MaxAge         time.Duration
	MaxLength      ByteCapacity
	MaxSegmentSize ByteCapacity
}
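
As a rough illustration of how these fields combine, the sketch below mirrors the types in a standalone program and builds one set of options. The preset name dailyRetention and the chosen values are assumptions for the example, and the field names suggest (but the documentation above does not confirm) that they map to RabbitMQ's stream retention settings.

```go
package main

import (
	"fmt"
	"time"
)

// Local mirrors of the documented declarations, so the example compiles alone.
type ByteCapacity uint64

const (
	Megabyte = 1_000 * 1_000
	Gigabyte = Megabyte * 1_000
)

type CreateStreamOptions struct {
	MaxAge         time.Duration
	MaxLength      ByteCapacity
	MaxSegmentSize ByteCapacity
}

// dailyRetention is a hypothetical preset: one day of age, 2 GB of total
// length, and 500 MB per segment.
func dailyRetention() CreateStreamOptions {
	return CreateStreamOptions{
		MaxAge:         24 * time.Hour,
		MaxLength:      2 * Gigabyte,
		MaxSegmentSize: 500 * Megabyte,
	}
}

func main() {
	opts := dailyRetention()
	fmt.Println(opts.MaxAge, uint64(opts.MaxLength), uint64(opts.MaxSegmentSize))
}
```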

type DoneChan

type DoneChan struct {
	C chan struct{}
	// contains filtered or unexported fields
}

func NewDoneChan

func NewDoneChan() *DoneChan

func (*DoneChan) GracefulClose

func (dc *DoneChan) GracefulClose()

GracefulClose closes the DoneChan only if the Done chan is not already closed.
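
Closing an already-closed channel panics in Go, which is why a guarded close is useful. One common way to get that guarantee is sync.Once, sketched below. This is not the package's implementation (its unexported fields are not shown above); doneChan and its methods are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// doneChan is a hypothetical close-once wrapper around a signal channel.
type doneChan struct {
	C    chan struct{}
	once sync.Once
}

func newDoneChan() *doneChan {
	return &doneChan{C: make(chan struct{})}
}

// gracefulClose closes C at most once, so repeated calls never panic.
func (dc *doneChan) gracefulClose() {
	dc.once.Do(func() { close(dc.C) })
}

// isClosed reports whether C has been closed, without blocking.
func (dc *doneChan) isClosed() bool {
	select {
	case <-dc.C:
		return true
	default:
		return false
	}
}

func main() {
	dc := newDoneChan()
	dc.gracefulClose()
	dc.gracefulClose() // second call is a no-op
	fmt.Println(dc.isClosed()) // true
}
```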

type Environment

type Environment struct {
	// contains filtered or unexported fields
}

func NewEnvironment

func NewEnvironment(ctx context.Context, configuration EnvironmentConfiguration) (*Environment, error)

func (*Environment) Close

func (e *Environment) Close(ctx context.Context)

Close closes all connections to the RabbitMQ server gracefully. A graceful disconnection sends a close request to RabbitMQ and awaits a confirmation response. If an error occurs while closing a connection, it is logged to the logger extracted from the context.

func (*Environment) CreateStream

func (e *Environment) CreateStream(ctx context.Context, name string, opts CreateStreamOptions) error

CreateStream creates a stream with the given name and options.

func (*Environment) DeleteStream

func (e *Environment) DeleteStream(ctx context.Context, name string) error

DeleteStream deletes the stream with the given name. It returns an error if the stream does not exist, or if any unknown error occurs. The context may carry a slog.Logger to log operations and intermediate errors, if any.

See also: raw.NewContextWithLogger
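
Carrying a logger in a context generally follows the pattern sketched below, using only the standard library. The key type and the helpers withLogger, loggerFrom, deleteStream and captureLog are hypothetical; they illustrate the idea and are not the raw package's implementation.

```go
package main

import (
	"bytes"
	"context"
	"fmt"
	"log/slog"
)

// loggerKey is an unexported key type, so other packages cannot collide with it.
type loggerKey struct{}

// withLogger returns a child context that carries l.
func withLogger(ctx context.Context, l *slog.Logger) context.Context {
	return context.WithValue(ctx, loggerKey{}, l)
}

// loggerFrom extracts the logger, falling back to slog.Default.
func loggerFrom(ctx context.Context) *slog.Logger {
	if l, ok := ctx.Value(loggerKey{}).(*slog.Logger); ok && l != nil {
		return l
	}
	return slog.Default()
}

// deleteStream is a hypothetical operation that logs via the context's logger.
func deleteStream(ctx context.Context, name string) {
	loggerFrom(ctx).Info("deleting stream", "stream", name)
}

// captureLog runs deleteStream with a buffer-backed logger and returns the output.
func captureLog(name string) string {
	var buf bytes.Buffer
	l := slog.New(slog.NewTextHandler(&buf, nil))
	deleteStream(withLogger(context.Background(), l), name)
	return buf.String()
}

func main() {
	fmt.Print(captureLog("orders"))
}
```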

func (*Environment) QueryOffset

func (e *Environment) QueryOffset(ctx context.Context, consumer, stream string) (uint64, error)

QueryOffset retrieves the last consumer offset stored for a given consumer name and stream name.

func (*Environment) QueryPartitions

func (e *Environment) QueryPartitions(ctx context.Context, superstream string) ([]string, error)

QueryPartitions returns the list of partition streams for a given superstream name.

func (*Environment) QuerySequence

func (e *Environment) QuerySequence(ctx context.Context, reference, stream string) (uint64, error)

QuerySequence retrieves the last publishingID for a given producer (reference) and stream name.

func (*Environment) QueryStreamStats

func (e *Environment) QueryStreamStats(ctx context.Context, name string) (Stats, error)

QueryStreamStats queries the server for Stats from a given stream name. The stats available are 'first offset id' and 'committed chunk id'.

This command is available in RabbitMQ 3.11+.

type EnvironmentConfiguration

type EnvironmentConfiguration struct {
	// The URIs of the nodes to try to connect to (cluster). This takes precedence
	// over Uri and Host + Port.
	//
	// If Uris, Uri, Host and Port are zero-values, it will
	// default to "rabbitmq-stream://guest:guest@localhost:5552/%2f"
	Uris []string
	// The URI of the node to connect to (single node). This takes precedence over Host + Port.
	//
	// If Uris, Uri, Host and Port are zero-values, it will
	// default to "rabbitmq-stream://guest:guest@localhost:5552/%2f"
	Uri string
	// Host to connect to. Uris and Uri take precedence over this. Leave Uris and Uri unset
	// to use this and Port for connection.
	//
	// If Uris, Uri, Host and Port are zero-values, it will
	// default to "localhost"
	Host string
	// Port to use. Uris and Uri take precedence over this. Leave Uris and Uri unset
	// to use this and Host for connection.
	//
	// If Uris, Uri, Host and Port are zero-values, it will
	// default to 5552
	Port int
	// Username to use to connect.
	//
	// Default: "guest"
	Username string
	// Password to use to connect
	//
	// Default: "guest"
	Password string
	// Virtual host to connect to
	//
	// Default: "/"
	VirtualHost string
	// The maximum number of `Producer` instances a single connection can maintain
	// before a new connection is opened. The value must be between 1 and 255.
	//
	// Default: 255
	MaxProducersByConnection int
	// The maximum number of `Consumer` instances that store their offset a single
	// connection can maintain before a new connection is opened. The value must be
	// between 1 and 255.
	//
	// Default: 50
	MaxTrackingConsumersByConnection int
	// The maximum number of `Consumer` instances a single connection can maintain
	// before a new connection is opened. The value must be between 1 and 255.
	//
	// Default: 255
	MaxConsumersByConnection int
	// Whether to delay opening connections until necessary.
	//
	// Default: false
	LazyInitialization bool
	// Informational ID for the environment instance. Used as a prefix for connection
	// names
	//
	// Default: "rabbitmq-stream"
	Id string
}
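
The precedence rules in the field comments (Uris, then Uri, then Host + Port, then the default URI) can be sketched as a small standalone function. The config type mirrors only the connection fields, and resolve is a hypothetical helper, not part of this package. Filling in a missing Host or Port individually when only one of them is set is an assumption of this sketch.

```go
package main

import "fmt"

const (
	defaultURI  = "rabbitmq-stream://guest:guest@localhost:5552/%2f"
	defaultHost = "localhost"
	defaultPort = 5552
)

// config mirrors only the connection fields of EnvironmentConfiguration.
type config struct {
	Uris []string
	Uri  string
	Host string
	Port int
}

// resolve (hypothetical) returns the addresses that would be tried, following
// the documented precedence: Uris, then Uri, then Host + Port, then the default.
func resolve(c config) []string {
	switch {
	case len(c.Uris) > 0:
		return c.Uris
	case c.Uri != "":
		return []string{c.Uri}
	case c.Host != "" || c.Port != 0:
		host, port := c.Host, c.Port
		if host == "" {
			host = defaultHost // assumption: default applied per field
		}
		if port == 0 {
			port = defaultPort // assumption: default applied per field
		}
		return []string{fmt.Sprintf("%s:%d", host, port)}
	default:
		return []string{defaultURI}
	}
}

func main() {
	fmt.Println(resolve(config{}))
	fmt.Println(resolve(config{Host: "broker-1"}))
	fmt.Println(resolve(config{Uri: "rabbitmq-stream://broker-2:5552/%2f", Host: "ignored"}))
}
```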

type EnvironmentConfigurationOption

type EnvironmentConfigurationOption func(*EnvironmentConfiguration)

func WithId

WithId configures the informational ID of the environment instance. The ID is used as a prefix for connection names.

func WithLazyInitialization

func WithLazyInitialization(lazy bool) EnvironmentConfigurationOption

WithLazyInitialization configures the environment to use lazy initialization. With lazy initialization enabled, it will delay the connection opening until necessary.

func WithMaxConsumersByConnection

func WithMaxConsumersByConnection(n int) EnvironmentConfigurationOption

WithMaxConsumersByConnection configures the environment with N maximum number of consumers per connection. Additional connections will be created when this number is reached.

The valid range is 1 <= N <= 255. If N < 1, max consumers per connection is set to 1. If N > 255, it is set to 255. Otherwise it is set to N.

func WithMaxProducersByConnection

func WithMaxProducersByConnection(n int) EnvironmentConfigurationOption

WithMaxProducersByConnection configures the environment with N maximum number of producers per connection. Additional connections will be created when this number is reached.

The valid range is 1 <= N <= 255. If N < 1, max producers per connection is set to 1. If N > 255, it is set to 255. Otherwise it is set to N.

func WithMaxTrackingConsumersByConnection

func WithMaxTrackingConsumersByConnection(n int) EnvironmentConfigurationOption

WithMaxTrackingConsumersByConnection configures the environment with N maximum number of tracking consumers per connection. Additional connections will be created when this number is reached.

Consumers that don't use automatic offset tracking strategy do not count towards this limit.

The valid range is 1 <= N <= 255. If N < 1, max tracking consumers per connection is set to 1. If N > 255, it is set to 255. Otherwise it is set to N.
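
The clamping rule shared by the three WithMax... options fits naturally into the functional-options pattern. The sketch below is standalone: config, option, clamp, withMaxProducers, withMaxConsumers and newConfig are hypothetical names, and how the real package applies its options to an EnvironmentConfiguration is not shown above.

```go
package main

import "fmt"

// config mirrors two of the documented limits.
type config struct {
	MaxProducersByConnection int
	MaxConsumersByConnection int
}

// option mutates a config, in the style of EnvironmentConfigurationOption.
type option func(*config)

// clamp restricts n to the documented range 1..255.
func clamp(n int) int {
	if n < 1 {
		return 1
	}
	if n > 255 {
		return 255
	}
	return n
}

func withMaxProducers(n int) option {
	return func(c *config) { c.MaxProducersByConnection = clamp(n) }
}

func withMaxConsumers(n int) option {
	return func(c *config) { c.MaxConsumersByConnection = clamp(n) }
}

// newConfig (hypothetical) starts from the documented defaults and applies
// each option in order.
func newConfig(opts ...option) config {
	c := config{MaxProducersByConnection: 255, MaxConsumersByConnection: 255}
	for _, o := range opts {
		o(&c)
	}
	return c
}

func main() {
	c := newConfig(withMaxProducers(0), withMaxConsumers(1000))
	fmt.Println(c.MaxProducersByConnection, c.MaxConsumersByConnection) // 1 255
}
```

Clamping inside the option keeps out-of-range input from ever reaching the configuration, at the cost of silently adjusting the caller's value.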

func WithUri

WithUri configures the environment with the attributes from the URI. URI must conform to the general form:

[scheme:][//[userinfo@]host][/]path

Whilst it is possible to write a URI as "/some-vhost", it is advisable to write the full URI to avoid ambiguities during parsing.
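
The ambiguity mentioned above can be seen by parsing both forms with the standard net/url package. This is a sketch of the general idea, not the package's parser; parts and parseStreamURI are hypothetical names.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// parts holds the pieces a client would need from a stream URI.
type parts struct {
	Host, Port, User, Pass, Vhost string
}

// parseStreamURI (hypothetical) splits a URI into its components with net/url.
func parseStreamURI(raw string) (parts, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return parts{}, err
	}
	p := parts{Host: u.Hostname(), Port: u.Port()}
	if u.User != nil {
		p.User = u.User.Username()
		p.Pass, _ = u.User.Password()
	}
	// EscapedPath keeps "%2f" intact, so the leading separator can be removed
	// before the virtual host name is decoded.
	vhost, err := url.PathUnescape(strings.TrimPrefix(u.EscapedPath(), "/"))
	if err != nil {
		return parts{}, err
	}
	p.Vhost = vhost
	return p, nil
}

func main() {
	full, _ := parseStreamURI("rabbitmq-stream://guest:guest@localhost:5552/%2f")
	fmt.Printf("host=%q port=%q vhost=%q\n", full.Host, full.Port, full.Vhost)

	short, _ := parseStreamURI("/some-vhost")
	fmt.Printf("host=%q port=%q vhost=%q\n", short.Host, short.Port, short.Vhost)
}
```

With the short form, host, port and credentials all come back empty, so the caller has to decide what to fill in; the full form leaves nothing to guess.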

func WithUris

func WithUris(uris ...string) EnvironmentConfigurationOption

WithUris configures the environment with the attributes from the first URI, and keeps the other URIs. Having multiple URIs is useful in clusters, so that different URIs can be tried if one RabbitMQ server becomes unavailable.

Each URI must conform to the general form:

[scheme:][//[userinfo@]host][/]path

Whilst it is possible to write a URI as "/some-vhost", it is advisable to write the full URI to avoid ambiguities during parsing.
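
The failover idea behind multiple URIs, trying each in turn until one responds, can be sketched as follows. The function connectFirst and the dial callback are hypothetical stand-ins for opening a real connection; the order and retry policy the package actually uses are not described above.

```go
package main

import (
	"errors"
	"fmt"
)

var errNoLocators = errors.New("no locators configured")

// connectFirst (hypothetical) tries each URI in order and returns the first
// one for which dial succeeds. If all fail, the individual errors are joined.
func connectFirst(uris []string, dial func(uri string) error) (string, error) {
	if len(uris) == 0 {
		return "", errNoLocators
	}
	var errs []error
	for _, uri := range uris {
		if err := dial(uri); err != nil {
			errs = append(errs, fmt.Errorf("%s: %w", uri, err))
			continue
		}
		return uri, nil
	}
	return "", errors.Join(errs...)
}

// dialExcept returns a fake dial function that fails for the listed URIs.
func dialExcept(down ...string) func(string) error {
	unavailable := make(map[string]bool, len(down))
	for _, d := range down {
		unavailable[d] = true
	}
	return func(uri string) error {
		if unavailable[uri] {
			return errors.New("connection refused")
		}
		return nil
	}
}

func main() {
	uris := []string{"rabbitmq-stream://node-1:5552/%2f", "rabbitmq-stream://node-2:5552/%2f"}
	chosen, err := connectFirst(uris, dialExcept(uris[0]))
	fmt.Println(chosen, err)
}
```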

type Stats

type Stats struct {
	// contains filtered or unexported fields
}

func (Stats) CommittedChunkId

func (s Stats) CommittedChunkId() int64

func (Stats) FirstOffset

func (s Stats) FirstOffset() int64

type StreamOptions deprecated

type StreamOptions = CreateStreamOptions

StreamOptions is an alias for backwards compatibility with v1 of this client.

Deprecated: use CreateStreamOptions. This alias is kept for backwards compatibility.
