Documentation ¶
Index ¶
Constants ¶
const (
// AuthDSNTemplate template for auth DSN.
AuthDSNTemplate = "SASLType:login:password"
)
Variables ¶
This section is empty.
Functions ¶
Types ¶
type ConsumerGroup ¶
type ConsumerGroup struct {
// contains filtered or unexported fields
}
func NewConsumerGroup ¶
func NewConsumerGroup(brokers []string, group string, config *sarama.Config) (*ConsumerGroup, error)
NewConsumerGroup creates new ConsumerGroup.
func (*ConsumerGroup) Consume ¶
func (c *ConsumerGroup) Consume(ctx context.Context, topics []string, handler sarama.ConsumerGroupHandler) error
Consume starts non blocking consumer loop for ConsumerHandle on provided topics list. Returned channel will closed as soon as Setup step is happened is called for first handler call or if error happens first.
func (*ConsumerGroup) Errors ¶
func (c *ConsumerGroup) Errors() <-chan error
Errors returns errors channel.
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
Producer kafka producer.
func NewProducer ¶
NewProducer creates a new Producer using the given broker addresses and configuration.
func (*Producer) Close ¶
Close shuts down the producer and waits for any buffered messages to be flushed. You must call this function before a producer object passes out of scope, as it may otherwise leak memory. You must call this before calling Close on the underlying client.
func (*Producer) SendMessage ¶
SendMessage produces a given message, and returns only when it either has succeeded or failed to produce. It will return the partition and the offset of the produced message, or an error if the message failed to produce.
type RecordHeaders ¶
type RecordHeaders []sarama.RecordHeader
func NewRecordHeadersFromPointers ¶
func NewRecordHeadersFromPointers(headers []*sarama.RecordHeader) RecordHeaders
NewRecordHeadersFromPointers converts []*sarama.RecordHeader -> []sarama.RecordHeader.
func (RecordHeaders) String ¶
func (p RecordHeaders) String() string
type XDGSCRAMClient ¶
type XDGSCRAMClient struct { *scram.ClientConversation HashGeneratorFcn scram.HashGeneratorFcn }
func (*XDGSCRAMClient) Begin ¶
func (x *XDGSCRAMClient) Begin(user, password, authzID string) error
Begin prepares the client for the SCRAM exchange with the server with a user name and a password