Documentation ¶
Index ¶
- Constants
- Variables
- func IsVersionGreaterOrEqual(version, target string) bool
- func SetLevelInfo(value int8)
- type AddressResolver
- type AutoCommitStrategy
- type BindingsOptions
- func (t *BindingsOptions) SetBalancedLeaderLocator() *BindingsOptions
- func (t *BindingsOptions) SetClientLocalLocator() *BindingsOptions
- func (t *BindingsOptions) SetMaxAge(maxAge time.Duration) *BindingsOptions
- func (t *BindingsOptions) SetMaxLengthBytes(maxLengthBytes *ByteCapacity) *BindingsOptions
- func (t *BindingsOptions) SetMaxSegmentSizeBytes(maxSegmentSizeBytes *ByteCapacity) *BindingsOptions
- type Broker
- type Brokers
- type ByteCapacity
- func (byteCapacity ByteCapacity) B(value int64) *ByteCapacity
- func (byteCapacity ByteCapacity) From(value string) *ByteCapacity
- func (byteCapacity ByteCapacity) GB(value int64) *ByteCapacity
- func (byteCapacity ByteCapacity) KB(value int64) *ByteCapacity
- func (byteCapacity ByteCapacity) MB(value int64) *ByteCapacity
- func (byteCapacity ByteCapacity) TB(value int64) *ByteCapacity
- type CPartitionClose
- type CPartitionContext
- type ChannelClose
- type ChannelPublishConfirm
- type Client
- func (c *Client) BrokerForConsumer(stream string) (*Broker, error)
- func (c *Client) BrokerLeader(stream string) (*Broker, error)
- func (c *Client) Close() error
- func (c *Client) DeclarePublisher(streamName string, options *ProducerOptions) (*Producer, error)
- func (c *Client) DeclareStream(streamName string, options *StreamOptions) error
- func (c *Client) DeclareSubscriber(streamName string, messagesHandler MessagesHandler, options *ConsumerOptions) (*Consumer, error)
- func (c *Client) DeclareSuperStream(superStream string, options SuperStreamOptions) error
- func (c *Client) DeleteStream(streamName string) error
- func (c *Client) DeleteSuperStream(superStream string) error
- func (c *Client) QueryPartitions(superStream string) ([]string, error)
- func (c *Client) StreamExists(stream string) bool
- func (c *Client) StreamStats(streamName string) (*StreamStats, error)
- type ClientProperties
- type Code
- type Compression
- type ConfirmationStatus
- func (cs *ConfirmationStatus) GetError() error
- func (cs *ConfirmationStatus) GetErrorCode() uint16
- func (cs *ConfirmationStatus) GetMessage() message.StreamMessage
- func (cs *ConfirmationStatus) GetProducerID() uint8
- func (cs *ConfirmationStatus) GetPublishingId() int64
- func (cs *ConfirmationStatus) IsConfirmed() bool
- func (cs *ConfirmationStatus) LinkedMessages() []*ConfirmationStatus
- type ConnectionProperties
- type Consumer
- func (consumer *Consumer) Close() error
- func (consumer *Consumer) GetCloseHandler() chan Event
- func (consumer *Consumer) GetLastStoredOffset() int64
- func (consumer *Consumer) GetName() string
- func (consumer *Consumer) GetOffset() int64
- func (consumer *Consumer) GetStreamName() string
- func (consumer *Consumer) NotifyClose() ChannelClose
- func (consumer *Consumer) QueryOffset() (int64, error)
- func (consumer *Consumer) StoreCustomOffset(offset int64) error
- func (consumer *Consumer) StoreOffset() error
- type ConsumerContext
- type ConsumerFilter
- type ConsumerOptions
- func (c *ConsumerOptions) IsFilterEnabled() bool
- func (c *ConsumerOptions) IsSingleActiveConsumerEnabled() bool
- func (c *ConsumerOptions) SetAutoCommit(autoCommitStrategy *AutoCommitStrategy) *ConsumerOptions
- func (c *ConsumerOptions) SetCRCCheck(CRCCheck bool) *ConsumerOptions
- func (c *ConsumerOptions) SetClientProvidedName(clientProvidedName string) *ConsumerOptions
- func (c *ConsumerOptions) SetConsumerName(consumerName string) *ConsumerOptions
- func (c *ConsumerOptions) SetFilter(filter *ConsumerFilter) *ConsumerOptions
- func (c *ConsumerOptions) SetInitialCredits(initialCredits int16) *ConsumerOptions
- func (c *ConsumerOptions) SetManualCommit() *ConsumerOptions
- func (c *ConsumerOptions) SetOffset(offset OffsetSpecification) *ConsumerOptions
- func (c *ConsumerOptions) SetSingleActiveConsumer(singleActiveConsumer *SingleActiveConsumer) *ConsumerOptions
- type ConsumerUpdate
- type Coordinator
- func (coordinator *Coordinator) ConsumersCount() int
- func (coordinator *Coordinator) GetConsumerById(id interface{}) (*Consumer, error)
- func (coordinator *Coordinator) GetProducerById(id interface{}) (*Producer, error)
- func (coordinator *Coordinator) GetResponseById(id uint32) (*Response, error)
- func (coordinator *Coordinator) GetResponseByName(id string) (*Response, error)
- func (coordinator *Coordinator) NewConsumer(messagesHandler MessagesHandler, parameters *ConsumerOptions) *Consumer
- func (coordinator *Coordinator) NewProducer(parameters *ProducerOptions) (*Producer, error)
- func (coordinator *Coordinator) NewResponse(commandId uint16, info ...string) *Response
- func (coordinator *Coordinator) NewResponseWitName(id string) *Response
- func (coordinator *Coordinator) Producers() map[interface{}]interface{}
- func (coordinator *Coordinator) ProducersCount() int
- func (coordinator *Coordinator) RemoveConsumerById(id interface{}, reason Event) error
- func (coordinator *Coordinator) RemoveProducerById(id uint8, reason Event) error
- func (coordinator *Coordinator) RemoveResponseById(id interface{}) error
- func (coordinator *Coordinator) RemoveResponseByName(id string) error
- type Environment
- func (env *Environment) Close() error
- func (env *Environment) DeclareStream(streamName string, options *StreamOptions) error
- func (env *Environment) DeclareSuperStream(superStreamName string, options SuperStreamOptions) error
- func (env *Environment) DeleteStream(streamName string) error
- func (env *Environment) DeleteSuperStream(superStreamName string) error
- func (env *Environment) IsClosed() bool
- func (env *Environment) NewConsumer(streamName string, messagesHandler MessagesHandler, options *ConsumerOptions) (*Consumer, error)
- func (env *Environment) NewProducer(streamName string, producerOptions *ProducerOptions) (*Producer, error)
- func (env *Environment) NewSuperStreamConsumer(superStream string, messagesHandler MessagesHandler, ...) (*SuperStreamConsumer, error)
- func (env *Environment) NewSuperStreamProducer(superStream string, superStreamProducerOptions *SuperStreamProducerOptions) (*SuperStreamProducer, error)
- func (env *Environment) QueryOffset(consumerName string, streamName string) (int64, error)
- func (env *Environment) QueryPartitions(superStreamName string) ([]string, error)
- func (env *Environment) QueryRoute(superStream string, routingKey string) ([]string, error)
- func (env *Environment) QuerySequence(publisherReference string, streamName string) (int64, error)
- func (env *Environment) StreamExists(streamName string) (bool, error)
- func (env *Environment) StreamMetaData(streamName string) (*StreamMetadata, error)
- func (env *Environment) StreamStats(streamName string) (*StreamStats, error)
- type EnvironmentOptions
- func (envOptions *EnvironmentOptions) IsTLS(val bool) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetAddressResolver(addressResolver AddressResolver) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetHost(host string) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetMaxConsumersPerClient(maxConsumersPerClient int) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetMaxProducersPerClient(maxProducersPerClient int) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetNoDelay(noDelay bool) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetPassword(password string) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetPort(port int) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetReadBuffer(readBuffer int) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetRequestedHeartbeat(requestedHeartbeat time.Duration) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetRequestedMaxFrameSize(requestedMaxFrameSize int) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetSaslConfiguration(value string) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetTLSConfig(config *tls.Config) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetUri(uri string) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetUris(uris []string) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetUser(user string) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetVHost(vhost string) *EnvironmentOptions
- func (envOptions *EnvironmentOptions) SetWriteBuffer(writeBuffer int) *EnvironmentOptions
- type Event
- type FilterValue
- type HashRoutingStrategy
- type HeartBeat
- type KeyRoutingStrategy
- type MessagesHandler
- type OffsetSpecification
- func (o OffsetSpecification) First() OffsetSpecification
- func (o OffsetSpecification) Last() OffsetSpecification
- func (o OffsetSpecification) LastConsumed() OffsetSpecification
- func (o OffsetSpecification) Next() OffsetSpecification
- func (o OffsetSpecification) Offset(offset int64) OffsetSpecification
- func (o OffsetSpecification) String() string
- func (o OffsetSpecification) Timestamp(offset int64) OffsetSpecification
- type PPartitionClose
- type PPartitionContext
- type PartitionPublishConfirm
- type PartitionsOptions
- func (t *PartitionsOptions) SetBalancedLeaderLocator() *PartitionsOptions
- func (t *PartitionsOptions) SetClientLocalLocator() *PartitionsOptions
- func (t *PartitionsOptions) SetMaxAge(maxAge time.Duration) *PartitionsOptions
- func (t *PartitionsOptions) SetMaxLengthBytes(maxLengthBytes *ByteCapacity) *PartitionsOptions
- func (t *PartitionsOptions) SetMaxSegmentSizeBytes(maxSegmentSizeBytes *ByteCapacity) *PartitionsOptions
- type PostFilter
- type Producer
- func (producer *Producer) BatchSend(batchMessages []message.StreamMessage) error
- func (producer *Producer) Close() error
- func (producer *Producer) GetBroker() *Broker
- func (producer *Producer) GetID() uint8
- func (producer *Producer) GetLastPublishingId() (int64, error)
- func (producer *Producer) GetName() string
- func (producer *Producer) GetOptions() *ProducerOptions
- func (producer *Producer) GetStreamName() string
- func (producer *Producer) GetUnConfirmed() map[int64]*ConfirmationStatus
- func (producer *Producer) NotifyClose() ChannelClose
- func (producer *Producer) NotifyPublishConfirmation() ChannelPublishConfirm
- func (producer *Producer) Send(streamMessage message.StreamMessage) error
- type ProducerFilter
- type ProducerOptions
- func (po *ProducerOptions) IsFilterEnabled() bool
- func (po *ProducerOptions) SetBatchPublishingDelay(size int) *ProducerOptions
- func (po *ProducerOptions) SetBatchSize(size int) *ProducerOptions
- func (po *ProducerOptions) SetClientProvidedName(name string) *ProducerOptions
- func (po *ProducerOptions) SetCompression(compression Compression) *ProducerOptions
- func (po *ProducerOptions) SetConfirmationTimeOut(duration time.Duration) *ProducerOptions
- func (po *ProducerOptions) SetFilter(filter *ProducerFilter) *ProducerOptions
- func (po *ProducerOptions) SetProducerName(name string) *ProducerOptions
- func (po *ProducerOptions) SetQueueSize(size int) *ProducerOptions
- func (po *ProducerOptions) SetSubEntrySize(size int) *ProducerOptions
- type PublishFilter
- type ReaderProtocol
- type Response
- type RoutingStrategy
- type SaslConfiguration
- type SingleActiveConsumer
- type StreamMetadata
- type StreamOptions
- type StreamStats
- type StreamsMetadata
- type SuperStreamConsumer
- type SuperStreamConsumerOptions
- func (s *SuperStreamConsumerOptions) SetClientProvidedName(clientProvidedName string) *SuperStreamConsumerOptions
- func (s *SuperStreamConsumerOptions) SetConsumerName(consumerName string) *SuperStreamConsumerOptions
- func (s *SuperStreamConsumerOptions) SetFilter(filter *ConsumerFilter) *SuperStreamConsumerOptions
- func (s *SuperStreamConsumerOptions) SetOffset(offset OffsetSpecification) *SuperStreamConsumerOptions
- func (s *SuperStreamConsumerOptions) SetSingleActiveConsumer(singleActiveConsumer *SingleActiveConsumer) *SuperStreamConsumerOptions
- type SuperStreamOptions
- type SuperStreamProducer
- func (s *SuperStreamProducer) Close() error
- func (s *SuperStreamProducer) ConnectPartition(partition string) error
- func (s *SuperStreamProducer) GetPartitions() []string
- func (s *SuperStreamProducer) NotifyPartitionClose(size int) chan PPartitionClose
- func (s *SuperStreamProducer) NotifyPublishConfirmation(size int) chan PartitionPublishConfirm
- func (s *SuperStreamProducer) Send(message message.StreamMessage) error
- type SuperStreamProducerOptions
- type TCPParameters
- type TuneState
- type Version
Constants ¶
const ( ClientVersion = "1.4.2" CommandDeletePublisher = 6 CommandQueryOffset = 11 CommandUnsubscribe = 12 CommandMetadataUpdate = 16 CommandClose = 22 // LocalhostUriConnection = "rabbitmq-stream://guest:guest@localhost:5552/%2f" SocketClosed = "socket client closed" MetaDataUpdate = "metadata Data update" LeaderLocatorBalanced = "balanced" LeaderLocatorClientLocal = "client-local" StreamTcpPort = "5552" )
const ( SaslConfigurationPlain = "PLAIN" SaslConfigurationExternal = "EXTERNAL" )
const ( UnitMb = "mb" UnitKb = "kb" UnitGb = "gb" UnitTb = "tb" )
const SEED = 104729
Variables ¶
var AlreadyClosed = errors.New("Already Closed")
var AuthenticationFailure = errors.New("Authentication Failure")
var AuthenticationFailureLoopbackError = errors.New("Authentication Failure Loopback Error")
var CodeAccessRefused = errors.New("Resources Access Refused")
var ConfirmationTimoutError = errors.New("Confirmation Timeout Error")
var ConnectionClosed = errors.New("Can't Send the message, connection closed")
var ErrEnvironmentNotDefined = errors.New("Environment not defined")
var ErrMessageRouteNotFound = errors.New("Message Route not found for the message key")
var ErrProducerNotFound = errors.New("Producer not found in the SuperStream Producer")
var ErrSuperStreamConsumerOptionsNotDefined = errors.New("SuperStreamConsumerOptions not defined.")
var ErrSuperStreamProducerOptionsNotDefined = errors.New("SuperStreamProducerOptions not defined. The SuperStreamProducerOptions is mandatory with the RoutingStrategy")
var FilterNotSupported = errors.New("Filtering is not supported by the broker " +
"(requires RabbitMQ 3.13+ and stream_filtering feature flag activated)")
var FrameTooLarge = errors.New("Frame Too Large, the buffer is too big")
var InternalError = errors.New("Internal Error")
var LeaderNotReady = errors.New("Leader not Ready yet")
var OffsetNotFoundError = errors.New("Offset not found")
var PreconditionFailed = errors.New("Precondition Failed")
var PublisherDoesNotExist = errors.New("Publisher Does Not Exist")
var SingleActiveConsumerNotSupported = errors.New("Single Active Consumer is not supported by the broker " +
"(requires RabbitMQ 3.11+ and stream_single_active_consumer feature flag activated)")
var StreamAlreadyExists = errors.New("Stream Already Exists")
var StreamDoesNotExist = errors.New("Stream Does Not Exist")
var StreamNotAvailable = errors.New("Stream Not Available")
var SubscriptionIdDoesNotExist = errors.New("Subscription Id Does Not Exist")
var UnknownFrame = errors.New("Unknown Frame")
var VirtualHostAccessFailure = errors.New("Virtual Host Access Failure")
Functions ¶
func IsVersionGreaterOrEqual ¶ added in v1.3.3
func SetLevelInfo ¶
func SetLevelInfo(value int8)
Types ¶
type AddressResolver ¶
type AutoCommitStrategy ¶
type AutoCommitStrategy struct {
// contains filtered or unexported fields
}
func NewAutoCommitStrategy ¶
func NewAutoCommitStrategy() *AutoCommitStrategy
func (*AutoCommitStrategy) SetCountBeforeStorage ¶
func (ac *AutoCommitStrategy) SetCountBeforeStorage(messageCountBeforeStorage int) *AutoCommitStrategy
func (*AutoCommitStrategy) SetFlushInterval ¶
func (ac *AutoCommitStrategy) SetFlushInterval(flushInterval time.Duration) *AutoCommitStrategy
type BindingsOptions ¶ added in v1.4.0
type BindingsOptions struct { Bindings []string MaxAge time.Duration MaxLengthBytes *ByteCapacity MaxSegmentSizeBytes *ByteCapacity LeaderLocator string // contains filtered or unexported fields }
func NewBindingsOptions ¶ added in v1.4.0
func NewBindingsOptions(bindings []string) *BindingsOptions
func (*BindingsOptions) SetBalancedLeaderLocator ¶ added in v1.4.0
func (t *BindingsOptions) SetBalancedLeaderLocator() *BindingsOptions
func (*BindingsOptions) SetClientLocalLocator ¶ added in v1.4.0
func (t *BindingsOptions) SetClientLocalLocator() *BindingsOptions
func (*BindingsOptions) SetMaxAge ¶ added in v1.4.0
func (t *BindingsOptions) SetMaxAge(maxAge time.Duration) *BindingsOptions
func (*BindingsOptions) SetMaxLengthBytes ¶ added in v1.4.0
func (t *BindingsOptions) SetMaxLengthBytes(maxLengthBytes *ByteCapacity) *BindingsOptions
func (*BindingsOptions) SetMaxSegmentSizeBytes ¶ added in v1.4.0
func (t *BindingsOptions) SetMaxSegmentSizeBytes(maxSegmentSizeBytes *ByteCapacity) *BindingsOptions
type Broker ¶
type Brokers ¶
type Brokers struct {
// contains filtered or unexported fields
}
type ByteCapacity ¶
type ByteCapacity struct {
// contains filtered or unexported fields
}
func (ByteCapacity) B ¶
func (byteCapacity ByteCapacity) B(value int64) *ByteCapacity
func (ByteCapacity) From ¶
func (byteCapacity ByteCapacity) From(value string) *ByteCapacity
func (ByteCapacity) GB ¶
func (byteCapacity ByteCapacity) GB(value int64) *ByteCapacity
func (ByteCapacity) KB ¶
func (byteCapacity ByteCapacity) KB(value int64) *ByteCapacity
func (ByteCapacity) MB ¶
func (byteCapacity ByteCapacity) MB(value int64) *ByteCapacity
func (ByteCapacity) TB ¶
func (byteCapacity ByteCapacity) TB(value int64) *ByteCapacity
type CPartitionClose ¶ added in v1.4.0
type CPartitionClose struct { Partition string Event Event Context CPartitionContext }
CPartitionClose is a struct that is used to notify the user when a partition from a consumer is closed. The user can use NotifyPartitionClose to get the channel.
type CPartitionContext ¶ added in v1.4.0
type CPartitionContext interface {
ConnectPartition(partition string, offset OffsetSpecification) error
}
CPartitionContext is an interface that is used to expose partition information and methods to the user. The user can use the CPartitionContext to reconnect a partition to the SuperStreamConsumer, specifying the offset to start from.
type ChannelClose ¶
type ChannelClose = <-chan Event
type ChannelPublishConfirm ¶
type ChannelPublishConfirm chan []*ConfirmationStatus
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
func (*Client) BrokerForConsumer ¶
func (*Client) DeclarePublisher ¶
func (c *Client) DeclarePublisher(streamName string, options *ProducerOptions) (*Producer, error)
func (*Client) DeclareStream ¶
func (c *Client) DeclareStream(streamName string, options *StreamOptions) error
func (*Client) DeclareSubscriber ¶
func (c *Client) DeclareSubscriber(streamName string, messagesHandler MessagesHandler, options *ConsumerOptions) (*Consumer, error)
func (*Client) DeclareSuperStream ¶ added in v1.4.0
func (c *Client) DeclareSuperStream(superStream string, options SuperStreamOptions) error
func (*Client) DeleteStream ¶
func (*Client) DeleteSuperStream ¶ added in v1.4.0
func (*Client) QueryPartitions ¶ added in v1.4.0
func (*Client) StreamExists ¶
func (*Client) StreamStats ¶ added in v1.1.0
func (c *Client) StreamStats(streamName string) (*StreamStats, error)
type ClientProperties ¶
type ClientProperties struct {
// contains filtered or unexported fields
}
type Compression ¶
type Compression struct {
// contains filtered or unexported fields
}
func (Compression) Gzip ¶
func (compression Compression) Gzip() Compression
func (Compression) Lz4 ¶
func (compression Compression) Lz4() Compression
func (Compression) None ¶
func (compression Compression) None() Compression
func (Compression) Snappy ¶
func (compression Compression) Snappy() Compression
func (Compression) String ¶
func (compression Compression) String() string
func (Compression) Zstd ¶
func (compression Compression) Zstd() Compression
type ConfirmationStatus ¶
type ConfirmationStatus struct {
// contains filtered or unexported fields
}
func (*ConfirmationStatus) GetError ¶
func (cs *ConfirmationStatus) GetError() error
func (*ConfirmationStatus) GetErrorCode ¶
func (cs *ConfirmationStatus) GetErrorCode() uint16
func (*ConfirmationStatus) GetMessage ¶
func (cs *ConfirmationStatus) GetMessage() message.StreamMessage
func (*ConfirmationStatus) GetProducerID ¶
func (cs *ConfirmationStatus) GetProducerID() uint8
func (*ConfirmationStatus) GetPublishingId ¶
func (cs *ConfirmationStatus) GetPublishingId() int64
func (*ConfirmationStatus) IsConfirmed ¶
func (cs *ConfirmationStatus) IsConfirmed() bool
func (*ConfirmationStatus) LinkedMessages ¶
func (cs *ConfirmationStatus) LinkedMessages() []*ConfirmationStatus
type ConnectionProperties ¶
type ConnectionProperties struct {
// contains filtered or unexported fields
}
type Consumer ¶
type Consumer struct { ID uint8 MessagesHandler MessagesHandler // contains filtered or unexported fields }
func (*Consumer) GetCloseHandler ¶ added in v1.1.2
func (*Consumer) GetLastStoredOffset ¶
func (*Consumer) GetStreamName ¶
func (*Consumer) NotifyClose ¶
func (consumer *Consumer) NotifyClose() ChannelClose
func (*Consumer) QueryOffset ¶
func (*Consumer) StoreCustomOffset ¶
func (*Consumer) StoreOffset ¶
type ConsumerContext ¶
type ConsumerContext struct { Consumer *Consumer // contains filtered or unexported fields }
func (ConsumerContext) GetEntriesCount ¶ added in v1.3.2
func (cc ConsumerContext) GetEntriesCount() uint16
type ConsumerFilter ¶ added in v1.3.3
type ConsumerFilter struct { Values []string MatchUnfiltered bool PostFilter PostFilter }
func NewConsumerFilter ¶ added in v1.3.3
func NewConsumerFilter(values []string, matchUnfiltered bool, postFilter PostFilter) *ConsumerFilter
type ConsumerOptions ¶
type ConsumerOptions struct { ConsumerName string Offset OffsetSpecification CRCCheck bool ClientProvidedName string Filter *ConsumerFilter SingleActiveConsumer *SingleActiveConsumer // contains filtered or unexported fields }
func NewConsumerOptions ¶
func NewConsumerOptions() *ConsumerOptions
func (*ConsumerOptions) IsFilterEnabled ¶ added in v1.3.3
func (c *ConsumerOptions) IsFilterEnabled() bool
func (*ConsumerOptions) IsSingleActiveConsumerEnabled ¶ added in v1.4.0
func (c *ConsumerOptions) IsSingleActiveConsumerEnabled() bool
func (*ConsumerOptions) SetAutoCommit ¶
func (c *ConsumerOptions) SetAutoCommit(autoCommitStrategy *AutoCommitStrategy) *ConsumerOptions
func (*ConsumerOptions) SetCRCCheck ¶
func (c *ConsumerOptions) SetCRCCheck(CRCCheck bool) *ConsumerOptions
func (*ConsumerOptions) SetClientProvidedName ¶ added in v1.3.1
func (c *ConsumerOptions) SetClientProvidedName(clientProvidedName string) *ConsumerOptions
func (*ConsumerOptions) SetConsumerName ¶
func (c *ConsumerOptions) SetConsumerName(consumerName string) *ConsumerOptions
func (*ConsumerOptions) SetFilter ¶ added in v1.3.3
func (c *ConsumerOptions) SetFilter(filter *ConsumerFilter) *ConsumerOptions
func (*ConsumerOptions) SetInitialCredits ¶ added in v1.0.2
func (c *ConsumerOptions) SetInitialCredits(initialCredits int16) *ConsumerOptions
func (*ConsumerOptions) SetManualCommit ¶
func (c *ConsumerOptions) SetManualCommit() *ConsumerOptions
func (*ConsumerOptions) SetOffset ¶
func (c *ConsumerOptions) SetOffset(offset OffsetSpecification) *ConsumerOptions
func (*ConsumerOptions) SetSingleActiveConsumer ¶ added in v1.4.0
func (c *ConsumerOptions) SetSingleActiveConsumer(singleActiveConsumer *SingleActiveConsumer) *ConsumerOptions
type ConsumerUpdate ¶ added in v1.4.0
type ConsumerUpdate func(streamName string, isActive bool) OffsetSpecification
type Coordinator ¶
type Coordinator struct {
// contains filtered or unexported fields
}
func NewCoordinator ¶
func NewCoordinator() *Coordinator
func (*Coordinator) ConsumersCount ¶
func (coordinator *Coordinator) ConsumersCount() int
func (*Coordinator) GetConsumerById ¶
func (coordinator *Coordinator) GetConsumerById(id interface{}) (*Consumer, error)
func (*Coordinator) GetProducerById ¶
func (coordinator *Coordinator) GetProducerById(id interface{}) (*Producer, error)
func (*Coordinator) GetResponseById ¶
func (coordinator *Coordinator) GetResponseById(id uint32) (*Response, error)
func (*Coordinator) GetResponseByName ¶
func (coordinator *Coordinator) GetResponseByName(id string) (*Response, error)
func (*Coordinator) NewConsumer ¶
func (coordinator *Coordinator) NewConsumer(messagesHandler MessagesHandler, parameters *ConsumerOptions) *Consumer
Consumer functions
func (*Coordinator) NewProducer ¶
func (coordinator *Coordinator) NewProducer( parameters *ProducerOptions) (*Producer, error)
producersEnvironment
func (*Coordinator) NewResponse ¶
func (coordinator *Coordinator) NewResponse(commandId uint16, info ...string) *Response
func (*Coordinator) NewResponseWitName ¶
func (coordinator *Coordinator) NewResponseWitName(id string) *Response
func (*Coordinator) Producers ¶
func (coordinator *Coordinator) Producers() map[interface{}]interface{}
func (*Coordinator) ProducersCount ¶
func (coordinator *Coordinator) ProducersCount() int
func (*Coordinator) RemoveConsumerById ¶
func (coordinator *Coordinator) RemoveConsumerById(id interface{}, reason Event) error
func (*Coordinator) RemoveProducerById ¶
func (coordinator *Coordinator) RemoveProducerById(id uint8, reason Event) error
func (*Coordinator) RemoveResponseById ¶
func (coordinator *Coordinator) RemoveResponseById(id interface{}) error
func (*Coordinator) RemoveResponseByName ¶
func (coordinator *Coordinator) RemoveResponseByName(id string) error
type Environment ¶
type Environment struct {
// contains filtered or unexported fields
}
func NewEnvironment ¶
func NewEnvironment(options *EnvironmentOptions) (*Environment, error)
func (*Environment) Close ¶
func (env *Environment) Close() error
func (*Environment) DeclareStream ¶
func (env *Environment) DeclareStream(streamName string, options *StreamOptions) error
func (*Environment) DeclareSuperStream ¶ added in v1.4.0
func (env *Environment) DeclareSuperStream(superStreamName string, options SuperStreamOptions) error
func (*Environment) DeleteStream ¶
func (env *Environment) DeleteStream(streamName string) error
func (*Environment) DeleteSuperStream ¶ added in v1.4.0
func (env *Environment) DeleteSuperStream(superStreamName string) error
func (*Environment) IsClosed ¶
func (env *Environment) IsClosed() bool
func (*Environment) NewConsumer ¶
func (env *Environment) NewConsumer(streamName string, messagesHandler MessagesHandler, options *ConsumerOptions) (*Consumer, error)
func (*Environment) NewProducer ¶
func (env *Environment) NewProducer(streamName string, producerOptions *ProducerOptions) (*Producer, error)
func (*Environment) NewSuperStreamConsumer ¶ added in v1.4.0
func (env *Environment) NewSuperStreamConsumer(superStream string, messagesHandler MessagesHandler, options *SuperStreamConsumerOptions) (*SuperStreamConsumer, error)
func (*Environment) NewSuperStreamProducer ¶ added in v1.4.0
func (env *Environment) NewSuperStreamProducer(superStream string, superStreamProducerOptions *SuperStreamProducerOptions) (*SuperStreamProducer, error)
func (*Environment) QueryOffset ¶
func (env *Environment) QueryOffset(consumerName string, streamName string) (int64, error)
func (*Environment) QueryPartitions ¶ added in v1.4.0
func (env *Environment) QueryPartitions(superStreamName string) ([]string, error)
func (*Environment) QueryRoute ¶ added in v1.4.0
func (env *Environment) QueryRoute(superStream string, routingKey string) ([]string, error)
func (*Environment) QuerySequence ¶
func (env *Environment) QuerySequence(publisherReference string, streamName string) (int64, error)
QuerySequence gets the last id stored for a producer. See also producer.GetLastPublishingId(), which is the easier way to get the last id.
func (*Environment) StreamExists ¶
func (env *Environment) StreamExists(streamName string) (bool, error)
func (*Environment) StreamMetaData ¶
func (env *Environment) StreamMetaData(streamName string) (*StreamMetadata, error)
func (*Environment) StreamStats ¶ added in v1.1.0
func (env *Environment) StreamStats(streamName string) (*StreamStats, error)
type EnvironmentOptions ¶
type EnvironmentOptions struct { ConnectionParameters []*Broker TCPParameters *TCPParameters SaslConfiguration *SaslConfiguration MaxProducersPerClient int MaxConsumersPerClient int AddressResolver *AddressResolver }
func NewEnvironmentOptions ¶
func NewEnvironmentOptions() *EnvironmentOptions
func (*EnvironmentOptions) IsTLS ¶
func (envOptions *EnvironmentOptions) IsTLS(val bool) *EnvironmentOptions
func (*EnvironmentOptions) SetAddressResolver ¶
func (envOptions *EnvironmentOptions) SetAddressResolver(addressResolver AddressResolver) *EnvironmentOptions
func (*EnvironmentOptions) SetHost ¶
func (envOptions *EnvironmentOptions) SetHost(host string) *EnvironmentOptions
func (*EnvironmentOptions) SetMaxConsumersPerClient ¶
func (envOptions *EnvironmentOptions) SetMaxConsumersPerClient(maxConsumersPerClient int) *EnvironmentOptions
func (*EnvironmentOptions) SetMaxProducersPerClient ¶
func (envOptions *EnvironmentOptions) SetMaxProducersPerClient(maxProducersPerClient int) *EnvironmentOptions
func (*EnvironmentOptions) SetNoDelay ¶
func (envOptions *EnvironmentOptions) SetNoDelay(noDelay bool) *EnvironmentOptions
func (*EnvironmentOptions) SetPassword ¶
func (envOptions *EnvironmentOptions) SetPassword(password string) *EnvironmentOptions
func (*EnvironmentOptions) SetPort ¶
func (envOptions *EnvironmentOptions) SetPort(port int) *EnvironmentOptions
func (*EnvironmentOptions) SetReadBuffer ¶
func (envOptions *EnvironmentOptions) SetReadBuffer(readBuffer int) *EnvironmentOptions
func (*EnvironmentOptions) SetRequestedHeartbeat ¶
func (envOptions *EnvironmentOptions) SetRequestedHeartbeat(requestedHeartbeat time.Duration) *EnvironmentOptions
func (*EnvironmentOptions) SetRequestedMaxFrameSize ¶
func (envOptions *EnvironmentOptions) SetRequestedMaxFrameSize(requestedMaxFrameSize int) *EnvironmentOptions
func (*EnvironmentOptions) SetSaslConfiguration ¶ added in v1.2.0
func (envOptions *EnvironmentOptions) SetSaslConfiguration(value string) *EnvironmentOptions
func (*EnvironmentOptions) SetTLSConfig ¶
func (envOptions *EnvironmentOptions) SetTLSConfig(config *tls.Config) *EnvironmentOptions
func (*EnvironmentOptions) SetUri ¶
func (envOptions *EnvironmentOptions) SetUri(uri string) *EnvironmentOptions
func (*EnvironmentOptions) SetUris ¶
func (envOptions *EnvironmentOptions) SetUris(uris []string) *EnvironmentOptions
func (*EnvironmentOptions) SetUser ¶
func (envOptions *EnvironmentOptions) SetUser(user string) *EnvironmentOptions
func (*EnvironmentOptions) SetVHost ¶
func (envOptions *EnvironmentOptions) SetVHost(vhost string) *EnvironmentOptions
func (*EnvironmentOptions) SetWriteBuffer ¶
func (envOptions *EnvironmentOptions) SetWriteBuffer(writeBuffer int) *EnvironmentOptions
type FilterValue ¶ added in v1.3.3
type FilterValue func(message message.StreamMessage) string
type HashRoutingStrategy ¶ added in v1.4.0
type HashRoutingStrategy struct {
RoutingKeyExtractor func(message message.StreamMessage) string
}
func NewHashRoutingStrategy ¶ added in v1.4.0
func NewHashRoutingStrategy(routingKeyExtractor func(message message.StreamMessage) string) *HashRoutingStrategy
func (*HashRoutingStrategy) Route ¶ added in v1.4.0
func (h *HashRoutingStrategy) Route(message message.StreamMessage, partitions []string) ([]string, error)
func (*HashRoutingStrategy) SetRouteParameters ¶ added in v1.4.0
type KeyRoutingStrategy ¶ added in v1.4.0
type KeyRoutingStrategy struct { // provided by the user to define the key based on a message RoutingKeyExtractor func(message message.StreamMessage) string // contains filtered or unexported fields }
KeyRoutingStrategy is a routing strategy that uses the key of the message
func NewKeyRoutingStrategy ¶ added in v1.4.0
func NewKeyRoutingStrategy( routingKeyExtractor func(message message.StreamMessage) string) *KeyRoutingStrategy
func (*KeyRoutingStrategy) Route ¶ added in v1.4.0
func (k *KeyRoutingStrategy) Route(message message.StreamMessage, partitions []string) ([]string, error)
func (*KeyRoutingStrategy) SetRouteParameters ¶ added in v1.4.0
type MessagesHandler ¶
type MessagesHandler func(consumerContext ConsumerContext, message *amqp.Message)
type OffsetSpecification ¶
type OffsetSpecification struct {
// contains filtered or unexported fields
}
func (OffsetSpecification) First ¶
func (o OffsetSpecification) First() OffsetSpecification
func (OffsetSpecification) Last ¶
func (o OffsetSpecification) Last() OffsetSpecification
func (OffsetSpecification) LastConsumed ¶
func (o OffsetSpecification) LastConsumed() OffsetSpecification
func (OffsetSpecification) Next ¶
func (o OffsetSpecification) Next() OffsetSpecification
func (OffsetSpecification) Offset ¶
func (o OffsetSpecification) Offset(offset int64) OffsetSpecification
func (OffsetSpecification) String ¶
func (o OffsetSpecification) String() string
func (OffsetSpecification) Timestamp ¶
func (o OffsetSpecification) Timestamp(offset int64) OffsetSpecification
type PPartitionClose ¶ added in v1.4.0
type PPartitionClose struct { Partition string Event Event Context PPartitionContext }
PPartitionClose is a struct that is used to notify the user when a partition from a producer is closed. The user can use NotifyPartitionClose to get the channel.
type PPartitionContext ¶ added in v1.4.0
PPartitionContext is an interface that is used to expose partition information and methods to the user. The user can use the PPartitionContext to reconnect a partition to the SuperStreamProducer
type PartitionPublishConfirm ¶ added in v1.4.0
type PartitionPublishConfirm struct { Partition string ConfirmationStatus []*ConfirmationStatus }
PartitionPublishConfirm is a struct that is used to notify the user, per partition, when a message is confirmed or not. The user can use NotifyPublishConfirmation to get the channel.
type PartitionsOptions ¶ added in v1.4.0
type PartitionsOptions struct { Partitions int MaxAge time.Duration MaxLengthBytes *ByteCapacity MaxSegmentSizeBytes *ByteCapacity LeaderLocator string // contains filtered or unexported fields }
func NewPartitionsOptions ¶ added in v1.4.0
func NewPartitionsOptions(partitions int) *PartitionsOptions
func (*PartitionsOptions) SetBalancedLeaderLocator ¶ added in v1.4.0
func (t *PartitionsOptions) SetBalancedLeaderLocator() *PartitionsOptions
func (*PartitionsOptions) SetClientLocalLocator ¶ added in v1.4.0
func (t *PartitionsOptions) SetClientLocalLocator() *PartitionsOptions
func (*PartitionsOptions) SetMaxAge ¶ added in v1.4.0
func (t *PartitionsOptions) SetMaxAge(maxAge time.Duration) *PartitionsOptions
func (*PartitionsOptions) SetMaxLengthBytes ¶ added in v1.4.0
func (t *PartitionsOptions) SetMaxLengthBytes(maxLengthBytes *ByteCapacity) *PartitionsOptions
func (*PartitionsOptions) SetMaxSegmentSizeBytes ¶ added in v1.4.0
func (t *PartitionsOptions) SetMaxSegmentSizeBytes(maxSegmentSizeBytes *ByteCapacity) *PartitionsOptions
type PostFilter ¶ added in v1.3.3
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
func (*Producer) BatchSend ¶
func (producer *Producer) BatchSend(batchMessages []message.StreamMessage) error
func (*Producer) GetLastPublishingId ¶
func (*Producer) GetOptions ¶
func (producer *Producer) GetOptions() *ProducerOptions
func (*Producer) GetStreamName ¶
func (*Producer) GetUnConfirmed ¶
func (producer *Producer) GetUnConfirmed() map[int64]*ConfirmationStatus
func (*Producer) NotifyClose ¶
func (producer *Producer) NotifyClose() ChannelClose
func (*Producer) NotifyPublishConfirmation ¶
func (producer *Producer) NotifyPublishConfirmation() ChannelPublishConfirm
type ProducerFilter ¶ added in v1.3.3
type ProducerFilter struct {
FilterValue FilterValue
}
func NewProducerFilter ¶ added in v1.3.3
func NewProducerFilter(filterValue FilterValue) *ProducerFilter
type ProducerOptions ¶
type ProducerOptions struct {
	Name                 string          // Producer name, it is useful to handle deduplication messages
	QueueSize            int             // Internal queue to handle back-pressure, low value reduces the back-pressure on the server
	BatchSize            int             // It is the batch-unCompressedSize aggregation, low value reduce the latency, high value increase the throughput
	BatchPublishingDelay int             // Period to Send a batch of messages.
	SubEntrySize         int             // Size of sub Entry, to aggregate more subEntry using one publishing id
	Compression          Compression     // Compression type, it is valid only if SubEntrySize > 1
	ConfirmationTimeOut  time.Duration   // Time to wait for the confirmation
	ClientProvidedName   string          // Client provider name that will be shown in the management UI
	Filter               *ProducerFilter // Enable the filter feature, by default is disabled. Pointer nil
	// contains filtered or unexported fields
}
func NewProducerOptions ¶
func NewProducerOptions() *ProducerOptions
func (*ProducerOptions) IsFilterEnabled ¶ added in v1.3.3
func (po *ProducerOptions) IsFilterEnabled() bool
func (*ProducerOptions) SetBatchPublishingDelay ¶
func (po *ProducerOptions) SetBatchPublishingDelay(size int) *ProducerOptions
func (*ProducerOptions) SetBatchSize ¶
func (po *ProducerOptions) SetBatchSize(size int) *ProducerOptions
func (*ProducerOptions) SetClientProvidedName ¶ added in v1.3.1
func (po *ProducerOptions) SetClientProvidedName(name string) *ProducerOptions
func (*ProducerOptions) SetCompression ¶
func (po *ProducerOptions) SetCompression(compression Compression) *ProducerOptions
func (*ProducerOptions) SetConfirmationTimeOut ¶ added in v1.3.1
func (po *ProducerOptions) SetConfirmationTimeOut(duration time.Duration) *ProducerOptions
func (*ProducerOptions) SetFilter ¶ added in v1.3.3
func (po *ProducerOptions) SetFilter(filter *ProducerFilter) *ProducerOptions
func (*ProducerOptions) SetProducerName ¶
func (po *ProducerOptions) SetProducerName(name string) *ProducerOptions
func (*ProducerOptions) SetQueueSize ¶
func (po *ProducerOptions) SetQueueSize(size int) *ProducerOptions
func (*ProducerOptions) SetSubEntrySize ¶
func (po *ProducerOptions) SetSubEntrySize(size int) *ProducerOptions
type PublishFilter ¶ added in v1.3.3
type PublishFilter struct { }
func (PublishFilter) GetCommandKey ¶ added in v1.3.3
func (p PublishFilter) GetCommandKey() uint16
func (PublishFilter) GetMaxVersion ¶ added in v1.3.3
func (p PublishFilter) GetMaxVersion() uint16
func (PublishFilter) GetMinVersion ¶ added in v1.3.3
func (p PublishFilter) GetMinVersion() uint16
type ReaderProtocol ¶
type RoutingStrategy ¶ added in v1.4.0
type RoutingStrategy interface {
	// Route: based on the message and the partitions, the routing strategy returns the partitions where the message should be sent.
	// It could be zero, one or more partitions.
	Route(message message.StreamMessage, partitions []string) ([]string, error)
	// SetRouteParameters is useful for the routing key strategies to set the query route function
	// or in general to set the parameters needed by the routing strategy.
	SetRouteParameters(superStream string, queryRoute func(superStream string, routingKey string) ([]string, error))
}
type SaslConfiguration ¶ added in v1.2.0
type SaslConfiguration struct {
Mechanism string
}
type SingleActiveConsumer ¶ added in v1.4.0
type SingleActiveConsumer struct {
	Enabled bool
	// ConsumerUpdate is the function that will be called when the consumer is promoted,
	// that is, when the consumer is active. The function will receive a boolean that is true
	// when the consumer is active; the user can decide to return a new offset to start from.
	ConsumerUpdate ConsumerUpdate
	// contains filtered or unexported fields
}
func NewSingleActiveConsumer ¶ added in v1.4.0
func NewSingleActiveConsumer(ConsumerUpdate ConsumerUpdate) *SingleActiveConsumer
func (*SingleActiveConsumer) SetEnabled ¶ added in v1.4.0
func (s *SingleActiveConsumer) SetEnabled(enabled bool) *SingleActiveConsumer
type StreamMetadata ¶
type StreamMetadata struct { Leader *Broker Replicas []*Broker // contains filtered or unexported fields }
func (StreamMetadata) New ¶
func (StreamMetadata) New(stream string, responseCode uint16, leader *Broker, replicas []*Broker) *StreamMetadata
func (StreamMetadata) String ¶
func (sm StreamMetadata) String() string
type StreamOptions ¶
type StreamOptions struct { MaxAge time.Duration MaxLengthBytes *ByteCapacity MaxSegmentSizeBytes *ByteCapacity }
func NewStreamOptions ¶
func NewStreamOptions() *StreamOptions
func (*StreamOptions) SetMaxAge ¶
func (s *StreamOptions) SetMaxAge(maxAge time.Duration) *StreamOptions
func (*StreamOptions) SetMaxLengthBytes ¶
func (s *StreamOptions) SetMaxLengthBytes(maxLength *ByteCapacity) *StreamOptions
func (*StreamOptions) SetMaxSegmentSizeBytes ¶
func (s *StreamOptions) SetMaxSegmentSizeBytes(segmentSize *ByteCapacity) *StreamOptions
type StreamStats ¶ added in v1.1.0
type StreamStats struct {
// contains filtered or unexported fields
}
func (*StreamStats) CommittedChunkId ¶ added in v1.1.0
func (s *StreamStats) CommittedChunkId() (int64, error)
CommittedChunkId - The ID (offset) of the committed chunk (block of messages) in the stream.
It is the offset of the first message in the last chunk confirmed by a quorum of the stream cluster members (leader and replicas). The committed chunk ID is a good indication of what the last offset of a stream can be at a given time. The value can be stale as soon as the application reads it, though, as the committed chunk ID of a stream that is being published to changes all the time. Returns the committed offset in this stream, or an error if there is no committed chunk yet.
func (*StreamStats) FirstOffset ¶ added in v1.1.0
func (s *StreamStats) FirstOffset() (int64, error)
FirstOffset - The first offset in the stream. Returns the first offset in the stream, or an error if there is no first offset yet.
func (*StreamStats) LastOffset ¶ added in v1.1.0
func (s *StreamStats) LastOffset() (int64, error)
LastOffset - The last offset in the stream. Returns the last offset in the stream, or an error if there is no last offset yet.
type StreamsMetadata ¶
type StreamsMetadata struct {
// contains filtered or unexported fields
}
func (*StreamsMetadata) Add ¶
func (smd *StreamsMetadata) Add(stream string, responseCode uint16, leader *Broker, replicas []*Broker) *StreamMetadata
func (*StreamsMetadata) Get ¶
func (smd *StreamsMetadata) Get(stream string) *StreamMetadata
func (StreamsMetadata) New ¶
func (StreamsMetadata) New() *StreamsMetadata
type SuperStreamConsumer ¶ added in v1.4.0
type SuperStreamConsumer struct { SuperStream string SuperStreamConsumerOptions *SuperStreamConsumerOptions MessagesHandler MessagesHandler // contains filtered or unexported fields }
func (*SuperStreamConsumer) Close ¶ added in v1.4.0
func (s *SuperStreamConsumer) Close() error
func (*SuperStreamConsumer) ConnectPartition ¶ added in v1.4.0
func (s *SuperStreamConsumer) ConnectPartition(partition string, offset OffsetSpecification) error
func (*SuperStreamConsumer) NotifyPartitionClose ¶ added in v1.4.0
func (s *SuperStreamConsumer) NotifyPartitionClose(size int) chan CPartitionClose
NotifyPartitionClose returns a channel that will be notified when a partition is closed. The Event gives the reason for the close. size is the size of the channel.
type SuperStreamConsumerOptions ¶ added in v1.4.0
type SuperStreamConsumerOptions struct { ClientProvidedName string Offset OffsetSpecification Filter *ConsumerFilter SingleActiveConsumer *SingleActiveConsumer ConsumerName string }
func NewSuperStreamConsumerOptions ¶ added in v1.4.0
func NewSuperStreamConsumerOptions() *SuperStreamConsumerOptions
func (*SuperStreamConsumerOptions) SetClientProvidedName ¶ added in v1.4.0
func (s *SuperStreamConsumerOptions) SetClientProvidedName(clientProvidedName string) *SuperStreamConsumerOptions
func (*SuperStreamConsumerOptions) SetConsumerName ¶ added in v1.4.0
func (s *SuperStreamConsumerOptions) SetConsumerName(consumerName string) *SuperStreamConsumerOptions
func (*SuperStreamConsumerOptions) SetFilter ¶ added in v1.4.2
func (s *SuperStreamConsumerOptions) SetFilter(filter *ConsumerFilter) *SuperStreamConsumerOptions
func (*SuperStreamConsumerOptions) SetOffset ¶ added in v1.4.0
func (s *SuperStreamConsumerOptions) SetOffset(offset OffsetSpecification) *SuperStreamConsumerOptions
func (*SuperStreamConsumerOptions) SetSingleActiveConsumer ¶ added in v1.4.0
func (s *SuperStreamConsumerOptions) SetSingleActiveConsumer(singleActiveConsumer *SingleActiveConsumer) *SuperStreamConsumerOptions
type SuperStreamOptions ¶ added in v1.4.0
type SuperStreamOptions interface {
// contains filtered or unexported methods
}
type SuperStreamProducer ¶ added in v1.4.0
type SuperStreamProducer struct {
	// public
	SuperStream                string
	SuperStreamProducerOptions *SuperStreamProducerOptions
	// contains filtered or unexported fields
}
func (*SuperStreamProducer) Close ¶ added in v1.4.0
func (s *SuperStreamProducer) Close() error
func (*SuperStreamProducer) ConnectPartition ¶ added in v1.4.0
func (s *SuperStreamProducer) ConnectPartition(partition string) error
ConnectPartition connects a partition to the SuperStreamProducer; it is part of the PPartitionContext interface. The super stream producer is a producer that can send messages to multiple partitions that are hidden from the user. With ConnectPartition the user can reconnect a partition to the SuperStreamProducer; this should be used only in case of disconnection.
func (*SuperStreamProducer) GetPartitions ¶ added in v1.4.0
func (s *SuperStreamProducer) GetPartitions() []string
func (*SuperStreamProducer) NotifyPartitionClose ¶ added in v1.4.0
func (s *SuperStreamProducer) NotifyPartitionClose(size int) chan PPartitionClose
NotifyPartitionClose returns a channel that will be notified when a partition is closed. The Event gives the reason for the close. size is the size of the channel.
func (*SuperStreamProducer) NotifyPublishConfirmation ¶ added in v1.4.0
func (s *SuperStreamProducer) NotifyPublishConfirmation(size int) chan PartitionPublishConfirm
NotifyPublishConfirmation returns a channel that will be notified, per partition, when a message is confirmed or not. size is the size of the channel.
func (*SuperStreamProducer) Send ¶ added in v1.4.0
func (s *SuperStreamProducer) Send(message message.StreamMessage) error
Send sends a message to the partitions based on the routing strategy
type SuperStreamProducerOptions ¶ added in v1.4.0
type SuperStreamProducerOptions struct {
	RoutingStrategy    RoutingStrategy
	ClientProvidedName string
	Filter             *ProducerFilter // Enable the filter feature, by default is disabled. Pointer nil
}
func NewSuperStreamProducerOptions ¶ added in v1.4.0
func NewSuperStreamProducerOptions(routingStrategy RoutingStrategy) *SuperStreamProducerOptions
NewSuperStreamProducerOptions creates a new SuperStreamProducerOptions. The RoutingStrategy is mandatory.
func (SuperStreamProducerOptions) SetClientProvidedName ¶ added in v1.4.0
func (o SuperStreamProducerOptions) SetClientProvidedName(clientProvidedName string) *SuperStreamProducerOptions
func (SuperStreamProducerOptions) SetFilter ¶ added in v1.4.2
func (o SuperStreamProducerOptions) SetFilter(filter *ProducerFilter) *SuperStreamProducerOptions
type TCPParameters ¶
Source Files ¶
- aggregation.go
- available_features.go
- brokers.go
- buffer_reader.go
- buffer_writer.go
- client.go
- constants.go
- consumer.go
- converters.go
- coordinator.go
- environment.go
- exchange_commands.go
- listeners.go
- producer.go
- server_frame.go
- socket.go
- stream_options.go
- stream_stats.go
- super_stream.go
- super_stream_consumer.go
- super_stream_producer.go
- utils.go