Documentation ¶
Functions ¶
func SubscribeNonBlocking ¶
func SubscribeNonBlocking(ctx context.Context, options SubscribeOptions, subscriber Subscriber, wg *sync.WaitGroup, consumerCount int) chan error
SubscribeNonBlocking starts consumerCount consumers with the same SubscribeOptions without blocking the caller. Read from the returned error channel (errChan): if nothing is reading from it, this function will block and you will not be notified if any consumers failed to start.
Types ¶
type Message ¶
type Message interface {
	// Type returns the type of the message.
	Type() string
	// Payload returns the message payload.
	Payload() ([]byte, error)
	// WithPayload validates and sets the given payload on the message.
	WithPayload(payload []byte) error
}
Message is a message that can be published and consumed.
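As an illustration of the interface, here is a minimal sketch of one possible Message implementation. The UserCreated type, its fields, the "user.created" type string, and the choice of JSON encoding are all assumptions made for this example and are not part of the package. The Message interface is redeclared locally only so the sketch compiles on its own.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Message mirrors the interface documented above so this sketch is self-contained.
type Message interface {
	Type() string
	Payload() ([]byte, error)
	WithPayload(payload []byte) error
}

// UserCreated is a hypothetical message; the name and fields are illustrative only.
type UserCreated struct {
	ID    string `json:"id"`
	Email string `json:"email"`
}

// Type returns a fixed, assumed type string for this message.
func (m *UserCreated) Type() string { return "user.created" }

// Payload encodes the message as JSON (an assumed wire format).
func (m *UserCreated) Payload() ([]byte, error) { return json.Marshal(m) }

// WithPayload decodes and validates the payload before replacing the
// receiver, so a rejected payload leaves the message unchanged.
func (m *UserCreated) WithPayload(payload []byte) error {
	var decoded UserCreated
	if err := json.Unmarshal(payload, &decoded); err != nil {
		return fmt.Errorf("decode payload: %w", err)
	}
	if decoded.ID == "" {
		return errors.New("payload is missing id")
	}
	*m = decoded
	return nil
}

// Compile-time check that *UserCreated satisfies Message.
var _ Message = (*UserCreated)(nil)

func main() {
	var m Message = &UserCreated{}
	if err := m.WithPayload([]byte(`{"id":"42","email":"a@example.com"}`)); err != nil {
		fmt.Println("invalid payload:", err)
		return
	}
	out, err := m.Payload()
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(m.Type(), string(out))
}
```

Validating into a temporary value before assigning keeps WithPayload from leaving the message half-updated when validation fails.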
type ObservedPublisher ¶
type ObservedPublisher struct {
// contains filtered or unexported fields
}
ObservedPublisher can be used to test the way an application publishes messages.
func NewObservedPublisher ¶
func NewObservedPublisher() *ObservedPublisher
NewObservedPublisher returns a new publisher that can be used when testing.
func (*ObservedPublisher) Clear ¶
func (p *ObservedPublisher) Clear() error
Clear clears out any stored messages.
func (*ObservedPublisher) Close ¶
func (p *ObservedPublisher) Close() error
Close performs no action.
func (*ObservedPublisher) Messages ¶
func (p *ObservedPublisher) Messages() []Message
Messages returns all stored messages.
func (*ObservedPublisher) Publish ¶
func (p *ObservedPublisher) Publish(m Message) error
Publish stores the given message internally so it can be retrieved later on.
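The testing pattern this type supports can be sketched as follows. Because the package import path is not shown here, the sketch uses a local stand-in, recordingPublisher, that only mirrors the documented method set (Publish, Messages, Clear, Close); it is not the package's implementation, and a real test would presumably call NewObservedPublisher() instead. rawMessage and notifySignup are hypothetical names introduced for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// Message and Publisher mirror the interfaces documented in this package.
type Message interface {
	Type() string
	Payload() ([]byte, error)
	WithPayload(payload []byte) error
}

type Publisher interface {
	Publish(m Message) error
	Close() error
}

// recordingPublisher is a local stand-in with the same method set as
// ObservedPublisher; it is NOT the package's implementation.
type recordingPublisher struct{ messages []Message }

func (p *recordingPublisher) Publish(m Message) error {
	p.messages = append(p.messages, m)
	return nil
}
func (p *recordingPublisher) Messages() []Message { return p.messages }
func (p *recordingPublisher) Clear() error        { p.messages = nil; return nil }
func (p *recordingPublisher) Close() error        { return nil }

// rawMessage is a hypothetical Message that carries an opaque payload.
type rawMessage struct {
	kind string
	body []byte
}

func (m *rawMessage) Type() string             { return m.kind }
func (m *rawMessage) Payload() ([]byte, error) { return m.body, nil }
func (m *rawMessage) WithPayload(p []byte) error {
	if len(p) == 0 {
		return errors.New("empty payload")
	}
	m.body = p
	return nil
}

// notifySignup is hypothetical application code under test. It depends only
// on the Publisher interface, so a test can pass in an observing publisher.
func notifySignup(p Publisher, email string) error {
	m := &rawMessage{kind: "user.signup"}
	if err := m.WithPayload([]byte(email)); err != nil {
		return err
	}
	return p.Publish(m)
}

func main() {
	pub := &recordingPublisher{}
	if err := notifySignup(pub, "a@example.com"); err != nil {
		fmt.Println("publish failed:", err)
		return
	}
	// Inspect what the application published.
	for _, m := range pub.Messages() {
		body, _ := m.Payload()
		fmt.Println(m.Type(), string(body))
	}
}
```

Calling Clear between test cases keeps messages from one case out of the assertions of the next.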
type Publisher ¶
type Publisher interface {
	// Publish publishes the given message.
	// If an error is returned, the message has not been published.
	Publish(m Message) error
	// Close closes any open connections.
	Close() error
}
Publisher can be used to publish messages.
func NewKafkaPublisher ¶
NewKafkaPublisher returns a Publisher that will publish messages to Kafka.
type SubscribeOptions ¶
type SubscribeOptions struct {
	// ConsumeFn is the function to handle the consumed messages.
	ConsumeFn ConsumeFn
	// A message will only be consumed once per group.
	Group string
	// Types is the set of message types to subscribe the ConsumeFn to.
	Types []string
	// IgnoreErrors defines whether or not errors returned from ConsumeFn will be written to Errors.
	// If this is false, a value must be provided for Errors.
	IgnoreErrors bool
	// Errors will receive any errors returned from ConsumeFn, if IgnoreErrors is false.
	Errors chan<- error
}
SubscribeOptions contains a set of options used when subscribing to messages.
func (*SubscribeOptions) Validate ¶
func (x *SubscribeOptions) Validate() error
Validate makes sure we have a set of valid options and applies defaults.
type Subscriber ¶
type Subscriber interface {
	// Subscribe starts a consumer with the given context.
	//
	// If an error is returned then the consumer has not been started, otherwise you should listen
	// on the errChan and handle any consumer errors.
	//
	// The consumer will be stopped when the given context is cancelled.
	Subscribe(ctx context.Context, options SubscribeOptions) error
}
Subscriber defines an interface that can be used to consume messages.
func NewKafkaSubscriber ¶
func NewKafkaSubscriber(brokers []string) Subscriber
NewKafkaSubscriber returns a Subscriber that will consume from Kafka.