Documentation ¶
Index ¶
- func LoadTLSConfig(cerfFilePath, keyFilePath, caFilePath string) (*tls.Config, error)
- func NewConsumerGroup(tlsConfig *tls.Config, clientID string, sessionTimeoutMS int, ...) (sarama.ConsumerGroup, error)
- func NewTLSSubscriberConfig(tlsCfg *tls.Config) *sarama.Config
- type Subscriber
- type Subscription
- func (s *Subscription) C() <-chan pubsub.Event[[]byte]
- func (s *Subscription) Cleanup(sarama.ConsumerGroupSession) error
- func (s *Subscription) Close() error
- func (s *Subscription) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
- func (s *Subscription) Setup(session sarama.ConsumerGroupSession) error
Functions ¶
func LoadTLSConfig ¶
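func LoadTLSConfig(cerfFilePath, keyFilePath, caFilePath string) (*tls.Config, error)
LoadTLSConfig loads the TLS config from the given certificate, key, and CA file paths.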
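The body of LoadTLSConfig is not shown on this page. As a rough illustration of what loading a client TLS config from three PEM files usually involves, here is a minimal sketch using only the standard library. The name loadTLSConfigSketch, the TLS 1.2 minimum, and the example file names are assumptions made for this example, not the package's actual behavior.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"os"
)

// loadTLSConfigSketch is a hypothetical stand-in for LoadTLSConfig.
// It builds a client TLS config from a certificate, a private key,
// and a CA bundle, all stored as PEM files.
func loadTLSConfigSketch(certFilePath, keyFilePath, caFilePath string) (*tls.Config, error) {
	// Parse the client certificate together with its private key.
	cert, err := tls.LoadX509KeyPair(certFilePath, keyFilePath)
	if err != nil {
		return nil, fmt.Errorf("load key pair: %w", err)
	}

	// Read the CA bundle used to verify the broker's certificate.
	caPEM, err := os.ReadFile(caFilePath)
	if err != nil {
		return nil, fmt.Errorf("read CA file: %w", err)
	}
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(caPEM) {
		return nil, fmt.Errorf("no certificates found in %s", caFilePath)
	}

	return &tls.Config{
		Certificates: []tls.Certificate{cert},
		RootCAs:      pool,
		MinVersion:   tls.VersionTLS12, // assumption: a conservative default
	}, nil
}

func main() {
	// The file names below are placeholders.
	if _, err := loadTLSConfigSketch("client.crt", "client.key", "ca.crt"); err != nil {
		fmt.Println("could not load TLS config:", err)
	}
}
```

The returned *tls.Config is the kind of value that NewConsumerGroup and NewTLSSubscriberConfig accept as their TLS argument.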
func NewConsumerGroup ¶
func NewConsumerGroup(
	tlsConfig *tls.Config,
	clientID string,
	sessionTimeoutMS int,
	heartbeatIntervalMS int,
	brokers []string,
	groupID string,
) (sarama.ConsumerGroup, error)
NewConsumerGroup creates a new Kafka consumer group for use by the subscriber. Constructing the group separately from the subscriber allows a Sarama mock to be injected in tests.
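The two timing parameters are plain millisecond integers, while Sarama's consumer group settings for session timeout and heartbeat interval are time.Duration values. The sketch below shows one way such inputs could be converted and sanity-checked before use. The helper name groupTimings is hypothetical, and the one-third rule is the usual Kafka guidance for heartbeat.interval.ms; whether NewConsumerGroup performs any such check is not stated here.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// groupTimings is a hypothetical helper. It converts millisecond inputs
// into durations and rejects a heartbeat interval larger than one third
// of the session timeout.
func groupTimings(sessionTimeoutMS, heartbeatIntervalMS int) (session, heartbeat time.Duration, err error) {
	if sessionTimeoutMS <= 0 || heartbeatIntervalMS <= 0 {
		return 0, 0, errors.New("timeouts must be positive")
	}
	session = time.Duration(sessionTimeoutMS) * time.Millisecond
	heartbeat = time.Duration(heartbeatIntervalMS) * time.Millisecond
	if heartbeat*3 > session {
		return 0, 0, fmt.Errorf(
			"heartbeat interval %v exceeds one third of session timeout %v",
			heartbeat, session,
		)
	}
	return session, heartbeat, nil
}

func main() {
	session, heartbeat, err := groupTimings(10000, 3000)
	if err != nil {
		fmt.Println("invalid timings:", err)
		return
	}
	fmt.Println("session timeout:", session, "heartbeat interval:", heartbeat)
}
```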
Types ¶
type Subscriber ¶
type Subscriber struct {
// contains filtered or unexported fields
}
Subscriber represents a Kafka subscriber.
func NewSubscriber ¶
func NewSubscriber(
	slogHandler slog.Handler,
	consumerGroup sarama.ConsumerGroup,
) (*Subscriber, error)
NewSubscriber creates a new Kafka subscriber.
func (Subscriber) Subscribe ¶
func (s Subscriber) Subscribe(channels ...string) (pubsub.Subscription[[]byte], error)
Subscribe creates a new subscription that runs in the background.
type Subscription ¶
type Subscription struct {
// contains filtered or unexported fields
}
Subscription represents a stream of events published to a Kafka topic.
func (*Subscription) C ¶
func (s *Subscription) C() <-chan pubsub.Event[[]byte]
C returns a receive-only Go channel of published events.
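A receive-only channel like the one returned by C is typically drained in a loop that also watches for a stop signal. The following is a minimal sketch of that pattern. The event type is a hypothetical stand-in for pubsub.Event[[]byte], whose fields are not shown on this page, and collect is an illustrative helper rather than part of this package.

```go
package main

import "fmt"

// event is a hypothetical stand-in for pubsub.Event[[]byte].
type event struct {
	Payload []byte
}

// collect reads from a receive-only channel until the channel is closed
// or done is closed, and returns the payloads it received as strings.
func collect(done <-chan struct{}, events <-chan event) []string {
	var got []string
	for {
		select {
		case <-done:
			return got // caller asked us to stop
		case ev, ok := <-events:
			if !ok {
				return got // channel closed by the sending side
			}
			got = append(got, string(ev.Payload))
		}
	}
}

func main() {
	// Simulate a subscription that delivers two events and then closes.
	ch := make(chan event, 2)
	ch <- event{Payload: []byte("hello")}
	ch <- event{Payload: []byte("world")}
	close(ch)

	done := make(chan struct{})
	fmt.Println(collect(done, ch))
}
```

With a real Subscription, the events argument would be the value returned by C(), and the caller would call Close once it no longer needs the stream.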
func (*Subscription) Cleanup ¶
func (s *Subscription) Cleanup(sarama.ConsumerGroupSession) error
Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited.
func (*Subscription) ConsumeClaim ¶
func (s *Subscription) ConsumeClaim(
	session sarama.ConsumerGroupSession,
	claim sarama.ConsumerGroupClaim,
) error
ConsumeClaim runs a consumer loop over the messages delivered by the claim's Messages() channel.
func (*Subscription) Setup ¶
func (s *Subscription) Setup(session sarama.ConsumerGroupSession) error
Setup is run at the beginning of a new session, before ConsumeClaim.
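Setup, ConsumeClaim, and Cleanup together form the session lifecycle described above: Setup runs first, each claim is consumed in its own goroutine, and Cleanup runs once every ConsumeClaim goroutine has exited. The sketch below simulates that ordering with the standard library only. The handler interface, recorder type, runSession, and newClaim are all hypothetical, with simplified signatures that do not use Sarama types.

```go
package main

import (
	"fmt"
	"sync"
)

// handler mirrors the shape of the three lifecycle hooks with
// simplified, hypothetical signatures.
type handler interface {
	Setup() error
	ConsumeClaim(claim <-chan string) error
	Cleanup() error
}

// recorder logs the order in which the hooks are called.
type recorder struct {
	mu    sync.Mutex
	calls []string
}

func (r *recorder) log(s string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.calls = append(r.calls, s)
}

func (r *recorder) Setup() error { r.log("setup"); return nil }

func (r *recorder) ConsumeClaim(claim <-chan string) error {
	for msg := range claim { // the loop ends when the claim is closed
		r.log("message:" + msg)
	}
	return nil
}

func (r *recorder) Cleanup() error { r.log("cleanup"); return nil }

// runSession simulates one session: Setup, then one ConsumeClaim
// goroutine per claim, then Cleanup after all of them have exited.
func runSession(h handler, claims []<-chan string) error {
	if err := h.Setup(); err != nil {
		return err
	}
	var wg sync.WaitGroup
	for _, c := range claims {
		wg.Add(1)
		go func(c <-chan string) {
			defer wg.Done()
			_ = h.ConsumeClaim(c)
		}(c)
	}
	wg.Wait()
	return h.Cleanup()
}

// newClaim builds a closed, pre-filled channel standing in for a claim.
func newClaim(msgs ...string) <-chan string {
	ch := make(chan string, len(msgs))
	for _, m := range msgs {
		ch <- m
	}
	close(ch)
	return ch
}

func main() {
	r := &recorder{}
	claims := []<-chan string{newClaim("a", "b"), newClaim("c")}
	if err := runSession(r, claims); err != nil {
		fmt.Println("session failed:", err)
		return
	}
	fmt.Println(r.calls)
}
```

The order of the message entries between setup and cleanup can vary from run to run, because the claims are consumed concurrently.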