gkafka

Version: v1.0.0
Published: Feb 15, 2019 License: MIT Imports: 6 Imported by: 1

README

gkafka

Package gkafka provides producer and consumer clients for Kafka servers.

Documentation

Overview

Package gkafka provides producer and consumer clients for Kafka servers.

Example (Consumer)

Create a consumer.

package main

import (
	"fmt"
	"github.com/gogf/gkafka"
)

func main() {
	group := "test-group"
	topic := "test"
	kafkaConfig := gkafka.NewConfig()
	kafkaConfig.Servers = "localhost:9092"
	kafkaConfig.AutoMarkOffset = false
	kafkaConfig.Topics = topic
	kafkaConfig.GroupId = group

	client := gkafka.NewClient(kafkaConfig)
	defer client.Close()

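	// Mark the offset at which the consumer starts reading (partition 0, offset 6).
	if err := client.MarkOffset(topic, 0, 6); err != nil {
		fmt.Println(err)
		return
	}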
	for {
		msg, err := client.Receive()
		if err != nil {
			fmt.Println(err)
			break
		}
		fmt.Println("consume:", msg.Partition, msg.Offset, string(msg.Value))
		// AutoMarkOffset is disabled above, so mark the message as consumed explicitly.
		msg.MarkOffset()
	}
}

Example (Producer)

Create a producer.

package main

import (
	"fmt"
	"time"

	"github.com/gogf/gkafka"
)

func main() {
	topic := "test"
	kafkaConfig := gkafka.NewConfig()
	kafkaConfig.Servers = "localhost:9092"
	kafkaConfig.AutoMarkOffset = false
	kafkaConfig.Topics = topic

	client := gkafka.NewClient(kafkaConfig)
	defer client.Close()
	for {
		s := time.Now().String()
		fmt.Println("produce:", s)
		if err := client.SyncSend(&gkafka.Message{Value: []byte(s)}); err != nil {
			fmt.Println(err)
		}
		time.Sleep(time.Second)
	}
}

Example (Topics)

Fetch all topics from the server.

package main

import (
	"fmt"
	"github.com/gogf/gkafka"
)

func main() {
	config := gkafka.NewConfig()
	config.Servers = "localhost:9092"

	client := gkafka.NewClient(config)
	defer client.Close()

	fmt.Println(client.Topics())
}

Types

type Client

type Client struct {
	Config *Config
	// contains filtered or unexported fields
}

Client is a Kafka client that can act as a consumer, a synchronous producer, or an asynchronous producer.

func NewClient

func NewClient(config *Config) *Client

NewClient creates a Kafka client from the given configuration.

func (*Client) AsyncSend

func (client *Client) AsyncSend(message *Message) error

AsyncSend sends a message to Kafka asynchronously. It is safe for concurrent use.

func (*Client) Close

func (client *Client) Close()

Close closes the client.

func (*Client) MarkOffset

func (client *Client) MarkOffset(topic string, partition int, offset int, metadata ...string) error

MarkOffset marks the offset in the given topic partition at which the consumer starts reading.

func (*Client) Receive

func (client *Client) Receive() (*Message, error)

Receive blocks until a message arrives from one of the topics specified in the config. gkafka handles offset tracking automatically.
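
The blocking behavior of Receive can be pictured with a plain channel. The sketch below does not use gkafka and does not show how gkafka is implemented: `message`, `stream`, `receive`, and `errClosed` are hypothetical names introduced only to illustrate a call that waits for the next message and reports an error once the source is closed.

```go
package main

import (
	"errors"
	"fmt"
)

// message is a hypothetical stand-in for gkafka.Message.
type message struct {
	Partition int
	Offset    int
	Value     []byte
}

// errClosed is a hypothetical error returned once the stream is closed and drained.
var errClosed = errors.New("consumer closed")

// stream is a hypothetical message source backed by a channel.
type stream struct {
	ch chan *message
}

// receive blocks until a message is available or the stream is closed.
func (s *stream) receive() (*message, error) {
	msg, ok := <-s.ch
	if !ok {
		return nil, errClosed
	}
	return msg, nil
}

func main() {
	s := &stream{ch: make(chan *message, 2)}
	s.ch <- &message{Partition: 0, Offset: 6, Value: []byte("a")}
	s.ch <- &message{Partition: 0, Offset: 7, Value: []byte("b")}
	close(s.ch)

	// Same loop shape as the consumer example: stop on the first error.
	for {
		msg, err := s.receive()
		if err != nil {
			fmt.Println("stop:", err)
			return
		}
		fmt.Println("consume:", msg.Partition, msg.Offset, string(msg.Value))
	}
}
```

Because the call blocks, a consumer loop like this is usually run in its own goroutine when the program has other work to do.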

func (*Client) SyncSend

func (client *Client) SyncSend(message *Message) error

SyncSend sends a message to Kafka synchronously.
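
The difference between SyncSend and AsyncSend can be sketched without a Kafka server. The code below is a minimal illustration under stated assumptions, not gkafka's implementation: `fakeBroker`, `syncSend`, and `asyncSender` are hypothetical names. The synchronous path returns only after the value is stored; the asynchronous path returns as soon as the value is queued, and a background goroutine does the write.

```go
package main

import (
	"fmt"
	"sync"
)

// fakeBroker is a hypothetical in-memory stand-in for a Kafka broker.
type fakeBroker struct {
	mu  sync.Mutex
	log [][]byte
}

// append stores a value and returns its offset in the log.
func (b *fakeBroker) append(v []byte) int {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.log = append(b.log, v)
	return len(b.log) - 1
}

// syncSend returns only after the broker has stored the value.
func syncSend(b *fakeBroker, v []byte) int {
	return b.append(v)
}

// asyncSender queues values and writes them from a background goroutine.
type asyncSender struct {
	in   chan []byte
	done chan struct{}
}

func newAsyncSender(b *fakeBroker, buffer int) *asyncSender {
	s := &asyncSender{in: make(chan []byte, buffer), done: make(chan struct{})}
	go func() {
		defer close(s.done)
		for v := range s.in {
			b.append(v)
		}
	}()
	return s
}

// send returns as soon as the value is queued, before it is stored.
func (s *asyncSender) send(v []byte) { s.in <- v }

// close stops accepting values and waits for the queue to drain.
func (s *asyncSender) close() {
	close(s.in)
	<-s.done
}

func main() {
	b := &fakeBroker{}
	fmt.Println("sync offset:", syncSend(b, []byte("first")))

	s := newAsyncSender(b, 8)
	s.send([]byte("second"))
	s.send([]byte("third"))
	s.close()
	fmt.Println("stored:", len(b.log))
}
```

In this sketch the caller of `send` learns nothing about the outcome of the write, which is the usual trade-off of an asynchronous producer: higher throughput in exchange for delayed error reporting.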

func (*Client) Topics

func (client *Client) Topics() ([]string, error)

Topics returns all topics from the Kafka server. It creates a separate consumer client to fetch the topics and destroys that client once they have been fetched.

type Config

type Config struct {
	sarama.Config
	GroupId        string // group id for consumer.
	Servers        string // server list, multiple servers joined by ','.
	Topics         string // topic list, multiple topics joined by ','.
	AutoMarkOffset bool   // automatically mark a message as read after it is consumed from the server.
}

Config is the Kafka client configuration, based on sarama.Config.
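
Two details of this struct are easy to miss: fields of the embedded config are promoted to the outer type, and Servers and Topics are single strings holding comma-joined lists. The sketch below illustrates both with standard-library code only. `baseConfig`, `config`, and `splitList` are hypothetical names, and trimming spaces or dropping empty items is an assumption of this sketch, not documented gkafka behavior.

```go
package main

import (
	"fmt"
	"strings"
)

// baseConfig is a hypothetical stand-in for sarama.Config.
type baseConfig struct {
	ClientID string
}

// config mirrors the shape of gkafka.Config: an embedded base plus list fields.
type config struct {
	baseConfig
	Servers string // comma-joined server list
	Topics  string // comma-joined topic list
}

// splitList splits a comma-joined list, trimming spaces and dropping empty items.
func splitList(s string) []string {
	var out []string
	for _, item := range strings.Split(s, ",") {
		if item = strings.TrimSpace(item); item != "" {
			out = append(out, item)
		}
	}
	return out
}

func main() {
	c := config{Servers: "localhost:9092, localhost:9093", Topics: "test,orders"}
	c.ClientID = "demo" // field promoted from the embedded struct

	fmt.Println(c.ClientID, splitList(c.Servers), splitList(c.Topics))
}
```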

func NewConfig

func NewConfig() *Config

NewConfig creates a default client configuration object.

type Message

type Message struct {
	Value     []byte
	Key       []byte
	Topic     string
	Partition int
	Offset    int
	// contains filtered or unexported fields
}

Message is a Kafka message.

func (*Message) MarkOffset

func (msg *Message) MarkOffset()

MarkOffset marks the current message as consumed.

Directories

Path Synopsis
third
github.com/Shopify/sarama
Package sarama is a pure Go client library for dealing with Apache Kafka (versions 0.8 and later).
github.com/Shopify/sarama/mocks
Package mocks provides mocks that can be used for testing applications that use Sarama.
github.com/davecgh/go-spew/spew
Package spew implements a deep pretty printer for Go data structures to aid in debugging.
github.com/eapache/go-resiliency/batcher
Package batcher implements the batching resiliency pattern for Go.
github.com/eapache/go-resiliency/breaker
Package breaker implements the circuit-breaker resiliency pattern for Go.
github.com/eapache/go-resiliency/deadline
Package deadline implements the deadline (also known as "timeout") resiliency pattern for Go.
github.com/eapache/go-resiliency/retrier
Package retrier implements the "retriable" resiliency pattern for Go.
github.com/eapache/go-resiliency/semaphore
Package semaphore implements the semaphore resiliency pattern for Go.
github.com/eapache/queue
Package queue provides a fast, ring-buffer queue based on the version suggested by Dariusz Górecki.
github.com/golang/snappy
Package snappy implements the Snappy compression format.
github.com/johngcn/sarama-cluster
Package cluster provides cluster extensions for Sarama, enabling users to consume topics across from multiple, balanced nodes.
github.com/pierrec/lz4
Package lz4 implements reading and writing lz4 compressed data (a frame), as specified in http://fastcompression.blogspot.fr/2013/04/lz4-streaming-format-final.html.
github.com/pierrec/lz4/internal/xxh32
Package xxh32 implements the very fast XXH hashing algorithm (32 bits version).
github.com/rcrowley/go-metrics
Go port of Coda Hale's Metrics library <https://github.com/rcrowley/go-metrics> Coda Hale's original work: <https://github.com/codahale/metrics>
github.com/rcrowley/go-metrics/exp
Hook go-metrics into expvar on any /debug/metrics request, load all vars from the registry into expvar, and execute regular expvar handler
github.com/rcrowley/go-metrics/stathat
Metrics output to StatHat.
golang.org/x/sys/cpu
Package cpu implements processor feature detection for various CPU architectures.
golang.org/x/sys/unix
Package unix contains an interface to the low-level operating system primitives.
golang.org/x/sys/windows/svc
Package svc provides everything required to build Windows service.
golang.org/x/sys/windows/svc/debug
Package debug provides facilities to execute svc.Handler on console.
golang.org/x/sys/windows/svc/eventlog
Package eventlog implements access to Windows event log.
golang.org/x/sys/windows/svc/example
Example service program that beeps.
golang.org/x/sys/windows/svc/mgr
Package mgr can be used to manage Windows service programs.
