gkc

package module
v0.0.0-...-bd57e7a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 15, 2019 License: MIT Imports: 10 Imported by: 0

README

gkc

Wrapper over confluent go library for easy consumption

Features

  • Create a simple kafka consumer, with options provided
  • Hook interface to implement on success/error of a message
  • Commit of messages to kafka handled based on time interval

Installing

go get -u github.com/arriqaaq/gkc

Example

Here's a full example of a gkc that consumers from kafka:

You can run this example from a terminal:

go run example/main.go
package main

import (
	"github.com/arriqaaq/gkc"
	"log"
	"os"
	"os/signal"
	"syscall"
)

func main() {
	config := &gkc.ConsumerConfig{
		GroupName:     "kylian",
		Topics:        []string{"topic"},
		Broker:        "localhost:9092,localhost:9092,localhost:9092,localhost:9092,localhost:9092",
		MessageHook:   gkc.NewHookFunc(func(msg *gkc.Message) error { return nil }),
		ErrorHook:     gkc.NewHookFunc(func(msg *gkc.Message) error { return nil }),
		Address:       "0.0.0.0:8101",
		ExposeMetrics: true,
	}

	consumer, err := gkc.NewConsumer(config)
	if err != nil {
		log.Fatalln(err)
	}
	consumer.DisableLog()
	consumer.Start()

	// Boring stuff
	signalCh := make(chan os.Signal, 1)
	signal.Notify(signalCh, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGKILL, os.Interrupt)
	go func() {
		for item := range consumer.Messages() {
			log.Println(item)
		}
	}()
	<-signalCh
	consumer.Stop()
}

TODO

  • Add more test cases
  • Add zap logger for debug purpose

Contact

Farhan @arriqaaq

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	DefaultOptions         = Options{}
	DefaultMetricNamespace = "metrics"
	DefaultMetricSubsystem = "gkc"
	DefaultChannelSize     = 2000
)

Functions

func DefaultConfluentConfig

func DefaultConfluentConfig(config *ConsumerConfig) *kafka.ConfigMap

Types

type Consumer

type Consumer interface {

	// Name returns the name of this consumer group.
	Name() string

	// Topics returns the names of the topics being consumed.
	Topics() []string

	// Start starts the consumer
	Start() error
	// Stop stops the consumer
	Stop() error
	// Closed returns a channel which will be closed after this consumer is completely shutdown
	Closed() <-chan struct{}
	// Messages return the message channel for this consumer
	// Messages() <-chan Message
	Messages() <-chan *Message

	DisableLog()
}

func NewConsumer

func NewConsumer(config *ConsumerConfig) (Consumer, error)

type ConsumerConfig

type ConsumerConfig struct {
	// GroupName identifies your consumer group. Unless your application creates
	// multiple consumer groups (in which case it's suggested to have application name as
	// prefix of the group name), this should match your application name.
	GroupName string

	// Topic is the name of topic to consume from.
	Topics []string

	// Broker is the list of brokers in the kafka cluster to consume from.
	Broker string

	// Defines the logic after processing the kafka message
	MessageHook Hook

	// Defines the logic after processing the failed kafka message from DLQ
	ErrorHook Hook

	// Enable prometheus metrics
	ExposeMetrics bool

	// Prometheus address to export metrics on
	Address string
}

type Counter

type Counter interface {
	UpdateTotal(float64)
	UpdateSuccess(float64)
	UpdateLatency(time.Duration)
}

func NewCounter

func NewCounter(namespace, subsystem string) Counter

type Hook

type Hook interface {
	Execute(*Message) error
}

func NewHookFunc

func NewHookFunc(h HookFunc) Hook

type HookFunc

type HookFunc func(*Message) error

func (HookFunc) Execute

func (h HookFunc) Execute(m *Message) error

type Message

type Message struct {
	*kafka.Message
}

func (*Message) Value

func (m *Message) Value() string

type Options

type Options struct {
	RcvBufferSize        int // aggregate message buffer size
	Concurrency          int // number of goroutines that will concurrently process messages
	OffsetCommitInterval time.Duration
	ConsumerGroupNumber  int // No of goroutines to run to consume from n different partitions
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL