kafkashopifysarama

package
v0.0.11
Published: Nov 27, 2023 License: MIT Imports: 11 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func LoadTLSConfig

func LoadTLSConfig(cerfFilePath, keyFilePath, caFilePath string) (*tls.Config, error)

LoadTLSConfig loads a TLS config from the given certificate, key, and CA file paths.
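
The package source is not shown here, so the following is only a minimal sketch of how a function with this signature could be written with the standard library. The name loadTLSConfigSketch, the error messages, and the choice of MinVersion are assumptions made for illustration, not the package's actual behavior.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
	"os"
)

// loadTLSConfigSketch is a hypothetical stand-in for LoadTLSConfig.
// It is not the package's code, only one plausible way to build the config.
func loadTLSConfigSketch(certFilePath, keyFilePath, caFilePath string) (*tls.Config, error) {
	// Load the PEM-encoded client certificate and its private key.
	cert, err := tls.LoadX509KeyPair(certFilePath, keyFilePath)
	if err != nil {
		return nil, fmt.Errorf("load key pair: %w", err)
	}

	// Read the PEM-encoded CA bundle used to verify the brokers.
	caPEM, err := os.ReadFile(caFilePath)
	if err != nil {
		return nil, fmt.Errorf("read CA file: %w", err)
	}

	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(caPEM) {
		return nil, errors.New("no valid certificates found in CA file")
	}

	return &tls.Config{
		Certificates: []tls.Certificate{cert},
		RootCAs:      pool,
		MinVersion:   tls.VersionTLS12, // assumption: a conservative default
	}, nil
}

func main() {
	// The file names below are placeholders.
	cfg, err := loadTLSConfigSketch("client.crt", "client.key", "ca.crt")
	if err != nil {
		fmt.Println("could not build TLS config:", err)
		return
	}
	fmt.Println("TLS config ready, client certificates:", len(cfg.Certificates))
}
```

With the real package, the returned *tls.Config would then be passed to NewConsumerGroup or NewTLSSubscriberConfig.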

func NewConsumerGroup

func NewConsumerGroup(
	tlsConfig *tls.Config,
	clientID string,
	sessionTimeoutMS int,
	heartbeatIntervalMS int,
	brokers []string,
	groupID string,
) (sarama.ConsumerGroup, error)

NewConsumerGroup creates a new Kafka consumer group to be used by the subscriber. Constructing it separately lets tests inject a Sarama mock into the subscriber instead.
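
Both timeouts are passed as plain integers in milliseconds. The sketch below shows one way a caller might convert and sanity-check them before calling NewConsumerGroup. The helper consumerTimeouts is hypothetical, and the one-third rule is the general Kafka guidance for heartbeat.interval.ms relative to session.timeout.ms; whether this package enforces anything similar is not documented here.

```go
package main

import (
	"fmt"
	"time"
)

// consumerTimeouts is a hypothetical helper, not part of the package.
// It converts millisecond values to time.Duration and rejects combinations
// where the heartbeat interval exceeds one third of the session timeout.
func consumerTimeouts(sessionTimeoutMS, heartbeatIntervalMS int) (session, heartbeat time.Duration, err error) {
	if sessionTimeoutMS <= 0 || heartbeatIntervalMS <= 0 {
		return 0, 0, fmt.Errorf("timeouts must be positive, got session=%d heartbeat=%d",
			sessionTimeoutMS, heartbeatIntervalMS)
	}

	session = time.Duration(sessionTimeoutMS) * time.Millisecond
	heartbeat = time.Duration(heartbeatIntervalMS) * time.Millisecond

	if heartbeat > session/3 {
		return 0, 0, fmt.Errorf("heartbeat %v exceeds one third of session timeout %v",
			heartbeat, session)
	}
	return session, heartbeat, nil
}

func main() {
	session, heartbeat, err := consumerTimeouts(30000, 3000)
	if err != nil {
		fmt.Println("invalid timeouts:", err)
		return
	}
	fmt.Println("session timeout:", session, "heartbeat interval:", heartbeat)
}
```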

func NewTLSSubscriberConfig

func NewTLSSubscriberConfig(tlsCfg *tls.Config) *sarama.Config

NewTLSSubscriberConfig creates a new Kafka subscriber config with TLS authentication.

Types

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

Subscriber represents a Kafka subscriber.

func NewSubscriber

func NewSubscriber(
	slogHandler slog.Handler,
	consumerGroup sarama.ConsumerGroup,
) (*Subscriber, error)

NewSubscriber creates a new Kafka subscriber.

func (Subscriber) Subscribe

func (s Subscriber) Subscribe(channels ...string) (pubsub.Subscription[[]byte], error)

Subscribe creates a new subscription that runs in the background.

type Subscription

type Subscription struct {
	// contains filtered or unexported fields
}

Subscription represents a stream of events published to a Kafka topic.

func (*Subscription) C

func (s *Subscription) C() <-chan pubsub.Event[[]byte]

C returns a receive-only Go channel of published events.
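
A receive-only channel like this is typically consumed with a range loop. The sketch below uses stand-in types so that it runs without the package: event and fakeSubscription are hypothetical, the real element type is pubsub.Event[[]byte] (whose fields are not shown on this page), and it is an assumption that closing the subscription also closes the channel.

```go
package main

import "fmt"

// event is a hypothetical stand-in for pubsub.Event[[]byte].
type event struct {
	Payload []byte
}

// fakeSubscription is a hypothetical type exposing C and Close
// with the same shape as Subscription.
type fakeSubscription struct {
	events chan event
}

// C returns the receive-only view of the event channel.
func (s *fakeSubscription) C() <-chan event { return s.events }

// Close closes the channel so that range loops over C terminate.
// Assumption: the real Close behaves similarly.
func (s *fakeSubscription) Close() error {
	close(s.events)
	return nil
}

// drain collects payloads until the channel is closed.
func drain(c <-chan event) []string {
	var got []string
	for ev := range c {
		got = append(got, string(ev.Payload))
	}
	return got
}

func main() {
	sub := &fakeSubscription{events: make(chan event, 2)}
	sub.events <- event{Payload: []byte("order-created")}
	sub.events <- event{Payload: []byte("order-paid")}

	if err := sub.Close(); err != nil {
		fmt.Println("close failed:", err)
		return
	}
	fmt.Println(drain(sub.C()))
}
```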

func (*Subscription) Cleanup

func (s *Subscription) Cleanup(sarama.ConsumerGroupSession) error

Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited.

func (*Subscription) Close

func (s *Subscription) Close() error

Close closes the subscription.

func (*Subscription) ConsumeClaim

func (s *Subscription) ConsumeClaim(
	session sarama.ConsumerGroupSession,
	claim sarama.ConsumerGroupClaim,
) error

ConsumeClaim starts a consumer loop over the messages delivered by the ConsumerGroupClaim's Messages() channel.
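
The usual shape of such a loop can be sketched with standard-library stand-ins. Here messages plays the role of claim.Messages(), done plays the role of session.Context().Done(), and out stands for the channel returned by C. The function consumeLoop is hypothetical and is not the package's implementation; in particular, it omits marking messages as consumed on the session.

```go
package main

import "fmt"

// consumeLoop is a hypothetical stand-in for the body of ConsumeClaim.
// It forwards messages until the message channel is closed (as Sarama does
// on a rebalance) or until done is closed (the session has ended).
func consumeLoop(done <-chan struct{}, messages <-chan []byte, out chan<- []byte) error {
	for {
		select {
		case msg, ok := <-messages:
			if !ok {
				return nil // message channel closed, stop consuming
			}
			// Forward the message, but do not block forever if the
			// session ends while the receiver is slow.
			select {
			case out <- msg:
			case <-done:
				return nil
			}
		case <-done:
			return nil
		}
	}
}

func main() {
	done := make(chan struct{})
	messages := make(chan []byte, 2)
	out := make(chan []byte, 2)

	messages <- []byte("a")
	messages <- []byte("b")
	close(messages)

	if err := consumeLoop(done, messages, out); err != nil {
		fmt.Println("consume error:", err)
		return
	}
	close(out)
	for msg := range out {
		fmt.Println(string(msg))
	}
}
```

Returning promptly when either channel signals completion matters because Sarama expects all ConsumeClaim goroutines to exit before Cleanup runs and a rebalance can complete.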

func (*Subscription) Setup

func (s *Subscription) Setup(session sarama.ConsumerGroupSession) error

Setup is run at the beginning of a new session, before ConsumeClaim.
