Documentation ¶
Index ¶
- Variables
- func CreateKafkaPublisher(kafkaDSN string) (sarama.SyncProducer, error)
- func CreateKafkaSubscriber(kafkaDSN string, group string) (sarama.ConsumerGroup, error)
- type GroupConsumer
- func (consumer *GroupConsumer) Cleanup(session sarama.ConsumerGroupSession) error
- func (consumer *GroupConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
- func (consumer *GroupConsumer) Output() <-chan GroupTask
- func (consumer *GroupConsumer) Setup(session sarama.ConsumerGroupSession) error
- func (consumer *GroupConsumer) StartConsume(gctx context.Context, allTopic string, consumerCount int) error
- func (consumer *GroupConsumer) StopConsume() error
- type GroupTask
- type Message
- type NewGroupTaskFunc
- type XDGSCRAMClient
Constants ¶
This section is empty.
Variables ¶
View Source
var (
SHA256 scram.HashGeneratorFcn = sha256.New
)
Functions ¶
func CreateKafkaPublisher ¶
func CreateKafkaPublisher(kafkaDSN string) (sarama.SyncProducer, error)
func CreateKafkaSubscriber ¶
func CreateKafkaSubscriber(kafkaDSN string, group string) (sarama.ConsumerGroup, error)
Types ¶
type GroupConsumer ¶
type GroupConsumer struct { Ready chan bool // contains filtered or unexported fields }
func NewGroupConsumer ¶
func NewGroupConsumer(client sarama.ConsumerGroup, newFunc NewGroupTaskFunc) *GroupConsumer
func (*GroupConsumer) Cleanup ¶
func (consumer *GroupConsumer) Cleanup(session sarama.ConsumerGroupSession) error
Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (*GroupConsumer) ConsumeClaim ¶
func (consumer *GroupConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (*GroupConsumer) Output ¶
func (consumer *GroupConsumer) Output() <-chan GroupTask
func (*GroupConsumer) Setup ¶
func (consumer *GroupConsumer) Setup(session sarama.ConsumerGroupSession) error
Setup is run at the beginning of a new session, before ConsumeClaim
func (*GroupConsumer) StartConsume ¶
func (*GroupConsumer) StopConsume ¶
func (consumer *GroupConsumer) StopConsume() error
type Message ¶
type Message struct { Lag int64 *sarama.ConsumerMessage }
type NewGroupTaskFunc ¶
type XDGSCRAMClient ¶
type XDGSCRAMClient struct { *scram.Client *scram.ClientConversation scram.HashGeneratorFcn }
func (*XDGSCRAMClient) Begin ¶
func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error)
func (*XDGSCRAMClient) Done ¶
func (x *XDGSCRAMClient) Done() bool
Click to show internal directories.
Click to hide internal directories.