Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
	Brokers  []string      // A list of host:port addresses used to establish the initial connection to the Kafka cluster
	Topic    string        // The Kafka topic to consume
	GroupID  string        // The name of the consumer group
	Logger   *log.Logger   // Logger used to log connection errors; defaults to log.New(os.Stderr, "", log.LstdFlags)
	MinBytes int32         // The minimum number of bytes to fetch in a request
	MaxBytes int32         // The maximum number of bytes to fetch in a request
	MaxWait  time.Duration // The maximum amount of time the broker will wait for MinBytes bytes to become available before it returns fewer than that anyway
}
Config is the configuration for creating a new consumer.
type Consumer ¶
type Consumer interface {
	// Start starts consuming messages from the Kafka topic.
	Start() error

	// Messages returns a channel that can be used to receive messages
	// from the Kafka topic.
	Messages() <-chan *sarama.ConsumerMessage

	// Close stops the consumer and releases any resources it holds.
	Close() error

	// Closed returns true if the consumer has been closed.
	Closed() bool
}
Consumer defines an interface for consuming messages from a Kafka topic.
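The lifecycle implied by these four methods (start, receive from the channel, close, check the closed state) can be sketched with an in-memory stand-in. Everything below is hypothetical and for illustration only: Message replaces *sarama.ConsumerMessage so the sketch needs no external dependency, and fakeConsumer, newFakeConsumer, and drain are not part of this package. The sketch also assumes that Close closes the Messages channel, which this page does not state.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Message is a local stand-in for *sarama.ConsumerMessage (assumption).
type Message struct {
	Topic string
	Value []byte
}

// fakeConsumer is a hypothetical in-memory type with the same method
// shape as the Consumer interface documented above.
type fakeConsumer struct {
	mu     sync.Mutex
	closed bool
	msgs   chan *Message
	feed   []string
}

// newFakeConsumer buffers the channel so Start never blocks.
func newFakeConsumer(values ...string) *fakeConsumer {
	return &fakeConsumer{msgs: make(chan *Message, len(values)), feed: values}
}

// Start publishes the preloaded values; it fails once the consumer is closed.
func (c *fakeConsumer) Start() error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.closed {
		return errors.New("consumer is closed")
	}
	for _, v := range c.feed {
		c.msgs <- &Message{Topic: "events", Value: []byte(v)}
	}
	c.feed = nil
	return nil
}

// Messages exposes the receive-only side of the channel.
func (c *fakeConsumer) Messages() <-chan *Message { return c.msgs }

// Close is idempotent and closes the channel (an assumption of this sketch).
func (c *fakeConsumer) Close() error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.closed {
		return nil
	}
	c.closed = true
	close(c.msgs)
	return nil
}

// Closed reports whether Close has been called.
func (c *fakeConsumer) Closed() bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.closed
}

// drain starts the consumer, closes it, and reads the buffered messages.
// Closing first lets the range loop terminate once the buffer is empty.
func drain(c *fakeConsumer) ([]string, error) {
	if err := c.Start(); err != nil {
		return nil, err
	}
	if err := c.Close(); err != nil {
		return nil, err
	}
	var out []string
	for m := range c.Messages() {
		out = append(out, string(m.Value))
	}
	return out, nil
}

func main() {
	got, err := drain(newFakeConsumer("a", "b"))
	fmt.Println(got, err)
}
```

With a real consumer the receive loop would more likely run in its own goroutine while the caller decides when to call Close; the sketch closes first only to keep the example deterministic.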
func NewConsumer ¶
NewConsumer creates a new Kafka consumer with the given configuration. It returns an error if the config is nil or if any required field is empty. The returned Consumer can then be used to start consuming messages.
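The validation described here (reject a nil config or one with empty required fields) can be sketched as follows. This page shows neither the NewConsumer signature nor which fields count as required, so the sketch makes assumptions: Config is a local copy of the documented struct, validateConfig is a hypothetical helper, and Brokers, Topic, and GroupID are treated as the required fields. The broker address, topic, and group name are placeholders.

```go
package main

import (
	"errors"
	"fmt"
	"log"
	"time"
)

// Config is a local copy of the struct documented above, so that the
// sketch compiles on its own.
type Config struct {
	Brokers  []string
	Topic    string
	GroupID  string
	Logger   *log.Logger
	MinBytes int32
	MaxBytes int32
	MaxWait  time.Duration
}

// validateConfig is a hypothetical helper. Treating Brokers, Topic, and
// GroupID as the required fields is an assumption, not documented behavior.
func validateConfig(cfg *Config) error {
	if cfg == nil {
		return errors.New("config is nil")
	}
	if len(cfg.Brokers) == 0 {
		return errors.New("config: Brokers is empty")
	}
	if cfg.Topic == "" {
		return errors.New("config: Topic is empty")
	}
	if cfg.GroupID == "" {
		return errors.New("config: GroupID is empty")
	}
	return nil
}

func main() {
	// Placeholder values; adjust to the actual cluster.
	cfg := &Config{
		Brokers:  []string{"localhost:9092"},
		Topic:    "events",
		GroupID:  "example-group",
		MinBytes: 1,
		MaxBytes: 1 << 20,
		MaxWait:  500 * time.Millisecond,
	}
	fmt.Println("complete config:", validateConfig(cfg))
	fmt.Println("missing brokers:", validateConfig(&Config{Topic: "events"}))
	fmt.Println("nil config:", validateConfig(nil))
}
```

Checking the config before opening any connection keeps configuration mistakes separate from network errors, which is presumably why the constructor reports them up front.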