Documentation ¶
Index ¶
- func DefaultConfig() *sarama.Config
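- type Publisher
- func NewPublisher(addr []string, conf *sarama.Config, opts ...PublisherOption) *Publisher
- type PublisherOption
- func WithPublisherCodec(codec event.Encoding) PublisherOption
- type Subscriber
- func NewSubscriber(addr []string, conf *sarama.Config, opts ...SubscriberOption) *Subscriber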
- func (s *Subscriber) Cleanup(sess sarama.ConsumerGroupSession) error
- func (s *Subscriber) Close() error
- func (s *Subscriber) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
- func (s *Subscriber) Listen(ctx context.Context) (err error)
- func (s *Subscriber) Setup(sess sarama.ConsumerGroupSession) error
- func (s *Subscriber) Subscribe(streamName string, h ...stream.EventHandler)
- type SubscriberOption
- func WithSubscriberBeforeFunc(fn func(*sarama.ConsumerMessage) (bool, error)) SubscriberOption
- func WithSubscriberCleanupFunc(fn func(sarama.ConsumerGroupSession) error) SubscriberOption
- func WithSubscriberCodec(codec event.Encoding) SubscriberOption
- func WithSubscriberContextFunc(fn func(context.Context) context.Context) SubscriberOption
- func WithSubscriberDeduplicator(d eventbus.Deduplicator) SubscriberOption
- func WithSubscriberErrorHandler(fn func(*event.Event, error)) SubscriberOption
- func WithSubscriberExitFunc(fn func()) SubscriberOption
- func WithSubscriberGroupName(groupName string) SubscriberOption
- func WithSubscriberSetupFunc(fn func(sarama.ConsumerGroupSession) error) SubscriberOption
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func DefaultConfig ¶
func DefaultConfig() *sarama.Config
Types ¶
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
func NewPublisher ¶
func NewPublisher(addr []string, conf *sarama.Config, opts ...PublisherOption) *Publisher
type PublisherOption ¶
type PublisherOption func(*Publisher)
func WithPublisherCodec ¶
func WithPublisherCodec(codec event.Encoding) PublisherOption
type Subscriber ¶
type Subscriber struct {
// contains filtered or unexported fields
}
func NewSubscriber ¶
func NewSubscriber(addr []string, conf *sarama.Config, opts ...SubscriberOption) *Subscriber
func (*Subscriber) Cleanup ¶
func (s *Subscriber) Cleanup(sess sarama.ConsumerGroupSession) error
func (*Subscriber) Close ¶
func (s *Subscriber) Close() error
func (*Subscriber) ConsumeClaim ¶
func (s *Subscriber) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
func (*Subscriber) Listen ¶
func (s *Subscriber) Listen(ctx context.Context) (err error)
func (*Subscriber) Setup ¶
func (s *Subscriber) Setup(sess sarama.ConsumerGroupSession) error
func (*Subscriber) Subscribe ¶
func (s *Subscriber) Subscribe(streamName string, h ...stream.EventHandler)
type SubscriberOption ¶
type SubscriberOption func(*Subscriber)
func WithSubscriberBeforeFunc ¶
func WithSubscriberBeforeFunc(fn func(*sarama.ConsumerMessage) (bool, error)) SubscriberOption
func WithSubscriberCleanupFunc ¶
func WithSubscriberCleanupFunc(fn func(sarama.ConsumerGroupSession) error) SubscriberOption
func WithSubscriberCodec ¶
func WithSubscriberCodec(codec event.Encoding) SubscriberOption
func WithSubscriberContextFunc ¶
func WithSubscriberContextFunc(fn func(context.Context) context.Context) SubscriberOption
func WithSubscriberDeduplicator ¶
func WithSubscriberDeduplicator(d eventbus.Deduplicator) SubscriberOption
func WithSubscriberErrorHandler ¶
func WithSubscriberErrorHandler(fn func(*event.Event, error)) SubscriberOption
func WithSubscriberExitFunc ¶
func WithSubscriberExitFunc(fn func()) SubscriberOption
func WithSubscriberGroupName ¶
func WithSubscriberGroupName(groupName string) SubscriberOption
func WithSubscriberSetupFunc ¶
func WithSubscriberSetupFunc(fn func(sarama.ConsumerGroupSession) error) SubscriberOption
Click to show internal directories.
Click to hide internal directories.