kafkaUtil

package module
v0.1.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 12, 2020 License: MIT Imports: 13 Imported by: 0

README

kafkaUtil

a util for Quick to use sarama kafka client

Usage

AsyncProducer

producer, err := NewProducer(
    []string{"172.16.9.207:9092"},
    MarshalMsgFunc(codec.JSONMarshal),
    Async())
if err != nil {
    log.Fatalf("kafka error: [%s]", err)
}

go func() {
    for {
        s := <-producer.AsyncSuccesses()
        fmt.Printf("success: %v \n", s)
    }
}()

go func() {
    for {
        e := <-producer.AsyncErrors()
        fmt.Printf("error: %v \n", e)
    }
}()

for {
    t := <-time.After(200 * time.Millisecond)
    person := &example.Person{
        Name: fmt.Sprintf("%d", t.UnixNano()),
        Age:  uint8(t.Nanosecond()),
    }
    producer.AsyncSendMessages("person", ProducerMsg{
        Headers: map[string]string{"traceId": "123456"},
        Body:    person,
    })
}

SyncProducer

producer, err := NewProducer(
    []string{"172.16.9.207:9092"},
    MarshalMsgFunc(codec.JSONMarshal),
    Sync())
if err != nil {
    log.Fatalf("kafka error: [%s]", err)
}

for {
    t := <-time.After(200 * time.Millisecond)
    person := &example.Person{
        Name: fmt.Sprintf("%d", t.UnixNano()),
        Age:  uint8(t.Nanosecond()),
    }
    send, offset, err := producer.SyncSendMessages("person", ProducerMsg{
        Headers: map[string]string{"traceId": "123456"},
        Body:    person,
    })
    fmt.Println(send, offset, err)
}

Consumer

consumer, err := NewConsumer([]string{"172.16.9.207:9092"}, Newest())
if err != nil {
    log.Fatalf("kafka error: [%s]", err)
}
defer consumer.Close()

for i:=0;i<10;i++ {
    ctx, _ := context.WithTimeout(context.Background(), time.Second*20)
    publications := consumer.GroupConsume(ctx, []string{"person"})
    go func() {
        for e := range consumer.Errors() {
            t.Log(e)
        }
    }()
    for p := range publications {
        t.Log(p.Msg.Offset)
        p.Session.MarkMessage(p.Msg, "")
    }
    t.Log("=======================================")
}
t.Log("finished")

Documentation

Index

Constants

View Source
const (
	// NoResponse doesn't send any response, the TCP ACK is all you get.
	RequiredAcksNoResponse int16 = 0
	// WaitForLocal waits for only the local commit to succeed before responding.
	RequiredAcksWaitForLocal int16 = 1
	// WaitForAll waits for all in-sync replicas to commit before responding.
	// The minimum number of in-sync replicas is configured on the broker via
	// the `min.insync.replicas` configuration key.
	RequiredAcksWaitForAll int16 = -1
)
View Source
const (
	//CompressionNone no compression
	CompressionNone int8 = iota
	//CompressionGZIP compression using GZIP
	CompressionGZIP
	//CompressionSnappy compression using snappy
	CompressionSnappy
	//CompressionLZ4 compression using LZ4
	CompressionLZ4
	//CompressionZSTD compression using ZSTD
	CompressionZSTD
)

Variables

View Source
var (
	ConsumerReceiveMultipleCalledError = errors.New("The receive method can only be called once, and if you want to call it more than once, create multiple consumers")
	ConsumerCloseMultipleCalledError   = errors.New("The close method can only be called once, and if you want to call it more than once, create multiple consumers")
	ProducerClosedError                = errors.New("Producer had closed")

	SyncError  = errors.New("this is a sync method, but Producer is async")
	AsyncError = errors.New("this is an async method, but Producer is sync")

	NotConsumerGroupError = errors.New("this is not Consumer group")

	ConsumeCanceledError = errors.New("consume was canceled")
)

Functions

This section is empty.

Types

type Consumer added in v0.0.7

type Consumer struct {
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(addresses []string, opts ...ConsumerOption) (*Consumer, error)

func (*Consumer) Close added in v0.0.7

func (c *Consumer) Close() (err error)

func (*Consumer) Errors added in v0.0.7

func (c *Consumer) Errors() <-chan error

func (*Consumer) GroupConsume added in v0.0.7

func (c *Consumer) GroupConsume(ctx context.Context, topics []string, handlerFunc PublicationHandlerFunc) error

type ConsumerOption added in v0.0.7

type ConsumerOption func(*consumerOptions)

func BalanceStrategyRange added in v0.0.6

func BalanceStrategyRange() ConsumerOption

func BalanceStrategyRoundRobin added in v0.0.6

func BalanceStrategyRoundRobin() ConsumerOption

func BalanceStrategySticky added in v0.0.6

func BalanceStrategySticky() ConsumerOption

func ChannelBufferSize added in v0.1.0

func ChannelBufferSize(size int) ConsumerOption

func Group added in v0.0.6

func Group(groupID string) ConsumerOption

func Newest added in v0.0.6

func Newest() ConsumerOption

func Oldest added in v0.0.6

func Oldest() ConsumerOption

type Producer added in v0.0.2

type Producer struct {
	// contains filtered or unexported fields
}

func NewProducer

func NewProducer(addresses []string, opts ...ProducerOption) (*Producer, error)

func (*Producer) AsyncErrors added in v0.0.2

func (p *Producer) AsyncErrors() <-chan *sarama.ProducerError

func (*Producer) AsyncSendMessages added in v0.0.3

func (p *Producer) AsyncSendMessages(topic string, msgs ...ProducerMsg) error

func (*Producer) AsyncSendMsgWithHeaders added in v0.0.3

func (p *Producer) AsyncSendMsgWithHeaders(topic string, msg interface{}, headers map[string]string)

func (*Producer) AsyncSuccesses added in v0.0.2

func (p *Producer) AsyncSuccesses() <-chan *sarama.ProducerMessage

func (*Producer) Close added in v0.0.2

func (p *Producer) Close() (err error)

func (*Producer) SyncSendMessages added in v0.0.3

func (p *Producer) SyncSendMessages(topic string, msgs ...ProducerMsg) (partition int32, offset int64, err error)

func (*Producer) SyncSendMsgWithHeaders added in v0.0.3

func (p *Producer) SyncSendMsgWithHeaders(topic string, msg interface{}, headers map[string]string) (partition int32, offset int64, err error)

type ProducerMsg

type ProducerMsg struct {
	Headers  map[string]string
	Body     interface{}
	Key      string
	Metadata interface{}
	Marshal  codec.Marshal
}

type ProducerOption added in v0.0.2

type ProducerOption func(*producerOptions)

func Async

func Async() ProducerOption

func AsyncMonitor added in v0.1.3

func AsyncMonitor(success, failed bool) ProducerOption

func Compression added in v0.1.3

func Compression(compression int8) ProducerOption

func MarshalMsgFunc

func MarshalMsgFunc(marshal codec.Marshal) ProducerOption

func RequiredAcks added in v0.1.3

func RequiredAcks(requiredAcks int16) ProducerOption

func Sync

func Sync() ProducerOption

func TLSConfig added in v0.1.3

func TLSConfig(certFile, keyFile, caFile string, verifySsl bool) ProducerOption

type Publication

type Publication struct {
	Msg     *sarama.ConsumerMessage
	Session sarama.ConsumerGroupSession
}

type PublicationHandlerFunc added in v0.1.3

type PublicationHandlerFunc func(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL