Documentation ¶
Constants ¶
This section is empty.
Variables ¶
var (
ErrSkipMarkOffset = errors.New("skip mark offset")
)
Functions ¶
This section is empty.
Types ¶
type Handler ¶
type Handler func(context.Context, *sarama.ConsumerMessage) error
Handler handles a single Kafka message.
To stop consuming, return any error other than ErrSkipMarkOffset. Returning ErrSkipMarkOffset does not stop consuming; it only skips marking the message's offset for commit.
type Option ¶
type Option func(*options)
func WithCleanupCallback ¶
func WithCleanupCallback(onCleanup func(sarama.ConsumerGroupSession) error) Option
func WithSetupCallback ¶
func WithSetupCallback(onSetup func(sarama.ConsumerGroupSession) error) Option
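Option has the shape of the common Go functional-options pattern: each WithXxx function returns a closure that mutates an unexported options value. The sketch below shows how such options are typically applied. The fields of options, the no-op defaults, and newOptions are assumptions, since the real struct is unexported. The session interface stands in for sarama.ConsumerGroupSession and carries only its MemberID method.

```go
package main

import "fmt"

// session is a stand-in for sarama.ConsumerGroupSession (assumption, keeps the sketch stdlib-only).
type session interface {
	MemberID() string
}

// options holds the callbacks; the field names are hypothetical.
type options struct {
	onSetup   func(session) error
	onCleanup func(session) error
}

// option mirrors the package's Option type.
type option func(*options)

// withSetupCallback mirrors WithSetupCallback for the stand-in types.
func withSetupCallback(onSetup func(session) error) option {
	return func(o *options) { o.onSetup = onSetup }
}

// withCleanupCallback mirrors WithCleanupCallback for the stand-in types.
func withCleanupCallback(onCleanup func(session) error) option {
	return func(o *options) { o.onCleanup = onCleanup }
}

// newOptions is a hypothetical helper: it starts from no-op callbacks
// and applies each option in order, so later options win.
func newOptions(opts ...option) *options {
	o := &options{
		onSetup:   func(session) error { return nil },
		onCleanup: func(session) error { return nil },
	}
	for _, apply := range opts {
		apply(o)
	}
	return o
}

// fakeSession is a minimal session used only to drive the example.
type fakeSession struct{ id string }

func (s fakeSession) MemberID() string { return s.id }

func main() {
	o := newOptions(
		withSetupCallback(func(s session) error {
			fmt.Println("setup for", s.MemberID())
			return nil
		}),
	)
	s := fakeSession{id: "member-1"}
	_ = o.onSetup(s)              // runs the custom setup callback
	fmt.Println(o.onCleanup(s)) // default no-op cleanup returns nil
}
```

With the real package the equivalent call would pass the options to NewKafkaWorker, for example NewKafkaWorker(cg, WithSetupCallback(...), WithCleanupCallback(...)).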
type Worker ¶
type Worker struct {
// contains filtered or unexported fields
}
func NewKafkaWorker ¶
func NewKafkaWorker(cg sarama.ConsumerGroup, opts ...Option) *Worker
func (*Worker) GracefulStop ¶
func (w *Worker) GracefulStop()
func (*Worker) RegisterHandler ¶