Documentation ¶
Overview ¶
Package gkafka provides producer and consumer clients for Kafka servers.
Example (Consumer) ¶
Create a consumer.
    package main

    import (
    	"fmt"

    	"github.com/gogf/gkafka"
    )

    func main() {
    	group := "test-group"
    	topic := "test"

    	kafkaConfig := gkafka.NewConfig()
    	kafkaConfig.Servers = "localhost:9092"
    	kafkaConfig.AutoMarkOffset = false
    	kafkaConfig.Topics = topic
    	kafkaConfig.GroupId = group

    	client := gkafka.NewClient(kafkaConfig)
    	defer client.Close()

    	// Mark the offset position from which to start reading
    	// (topic "test", partition 0, offset 6).
    	client.MarkOffset(topic, 0, 6)

    	for {
    		msg, err := client.Receive()
    		if err != nil {
    			fmt.Println(err)
    			break
    		}
    		fmt.Println("consume:", msg.Partition, msg.Offset, string(msg.Value))
    		// AutoMarkOffset is disabled, so mark the message as read manually.
    		msg.MarkOffset()
    	}
    }
Example (Producer) ¶
Create a producer.
    package main

    import (
    	"fmt"
    	"time"

    	"github.com/gogf/gkafka"
    )

    func main() {
    	topic := "test"

    	kafkaConfig := gkafka.NewConfig()
    	kafkaConfig.Servers = "localhost:9092"
    	kafkaConfig.AutoMarkOffset = false
    	kafkaConfig.Topics = topic

    	client := gkafka.NewClient(kafkaConfig)
    	defer client.Close()

    	// Send the current time once per second.
    	for {
    		s := time.Now().String()
    		fmt.Println("produce:", s)
    		if err := client.SyncSend(&gkafka.Message{Value: []byte(s)}); err != nil {
    			fmt.Println(err)
    		}
    		time.Sleep(time.Second)
    	}
    }
Example (Topics) ¶
Fetch all topics from the server.
    package main

    import (
    	"fmt"

    	"github.com/gogf/gkafka"
    )

    func main() {
    	config := gkafka.NewConfig()
    	config.Servers = "localhost:9092"

    	client := gkafka.NewClient(config)
    	defer client.Close()

    	// Topics returns the topic list and an error; print both.
    	fmt.Println(client.Topics())
    }
Index ¶
- type Client
- func (client *Client) AsyncSend(message *Message) error
- func (client *Client) Close()
- func (client *Client) MarkOffset(topic string, partition int, offset int, metadata ...string) error
- func (client *Client) Receive() (*Message, error)
- func (client *Client) SyncSend(message *Message) error
- func (client *Client) Topics() ([]string, error)
- type Config
- type Message
Types ¶
type Client ¶
    type Client struct {
    	Config *Config
    	// contains filtered or unexported fields
    }
Client is a Kafka client that covers the consumer, synchronous producer, and asynchronous producer roles.
func (*Client) AsyncSend ¶
AsyncSend sends a message to Kafka asynchronously. It is safe for concurrent use.
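Because AsyncSend is documented as safe for concurrent use, several goroutines can share one client. The following is a minimal sketch of that pattern, not part of gkafka: `asyncSender`, `sendAll`, `fakeMessage`, and `fakeClient` are hypothetical names introduced here so the example runs without a broker. Assuming the signature listed in the index, `*gkafka.Client` should satisfy `asyncSender[gkafka.Message]`.

```go
package main

import (
	"fmt"
	"sync"
)

// asyncSender is a hypothetical interface written for this sketch. It mirrors
// the AsyncSend signature listed in the index.
type asyncSender[M any] interface {
	AsyncSend(message *M) error
}

// sendAll calls AsyncSend for every message from its own goroutine, waits for
// all calls to return, and reports how many of them returned an error.
func sendAll[M any](s asyncSender[M], messages []*M) int {
	var (
		wg     sync.WaitGroup
		mu     sync.Mutex
		failed int
	)
	for _, m := range messages {
		wg.Add(1)
		go func(m *M) {
			defer wg.Done()
			if err := s.AsyncSend(m); err != nil {
				mu.Lock()
				failed++
				mu.Unlock()
			}
		}(m)
	}
	wg.Wait()
	return failed
}

// fakeMessage and fakeClient are stand-ins (assumptions) for gkafka.Message
// and *gkafka.Client.
type fakeMessage struct{ Value []byte }

type fakeClient struct {
	mu   sync.Mutex
	sent int
}

// AsyncSend only counts calls; a real client would hand the message to Kafka.
func (c *fakeClient) AsyncSend(message *fakeMessage) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.sent++
	return nil
}

func main() {
	client := &fakeClient{}
	messages := []*fakeMessage{
		{Value: []byte("a")},
		{Value: []byte("b")},
		{Value: []byte("c")},
	}
	failed := sendAll[fakeMessage](client, messages)
	fmt.Println("sent:", client.sent, "failed:", failed) // sent: 3 failed: 0
}
```

The type parameter is given explicitly at the call site so the sketch does not depend on how a particular Go release infers it.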
func (*Client) Receive ¶
Receive blocks until a message is available from the topics specified in the config. gkafka handles offset tracking automatically.
type Config ¶
    type Config struct {
    	sarama.Config
    	GroupId        string // Group ID for the consumer.
    	Servers        string // Server list; multiple servers are joined by ','.
    	Topics         string // Topic list; multiple topics are joined by ','.
    	AutoMarkOffset bool   // Automatically mark a message as read after it is consumed from the server.
    }
Config is the Kafka client configuration, based on sarama.Config.
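Servers and Topics are plain strings in which multiple entries are joined by ','. When the entries start out as a slice, a small helper can build those strings. This is a sketch only: `joinList` is a hypothetical helper, the host and topic names are made up, and trimming whitespace is a precaution taken here because this page does not say whether gkafka tolerates spaces around the commas.

```go
package main

import (
	"fmt"
	"strings"
)

// joinList trims whitespace from each entry, drops empty entries, and joins
// the rest with ',' (the separator documented for Servers and Topics).
func joinList(items []string) string {
	cleaned := make([]string, 0, len(items))
	for _, item := range items {
		if item = strings.TrimSpace(item); item != "" {
			cleaned = append(cleaned, item)
		}
	}
	return strings.Join(cleaned, ",")
}

func main() {
	// Hypothetical broker addresses and topic names.
	servers := joinList([]string{"kafka-1:9092", " kafka-2:9092 ", ""})
	topics := joinList([]string{"orders", "payments"})

	fmt.Println(servers) // kafka-1:9092,kafka-2:9092
	fmt.Println(topics)  // orders,payments
}
```

The resulting strings would then be assigned to `config.Servers` and `config.Topics` on a value returned by `gkafka.NewConfig()`.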
Directories ¶
Path | Synopsis |
---|---|
third | |
github.com/Shopify/sarama | Package sarama is a pure Go client library for dealing with Apache Kafka (versions 0.8 and later). |
github.com/Shopify/sarama/mocks | Package mocks provides mocks that can be used for testing applications that use Sarama. |
github.com/davecgh/go-spew/spew | Package spew implements a deep pretty printer for Go data structures to aid in debugging. |
github.com/eapache/go-resiliency/batcher | Package batcher implements the batching resiliency pattern for Go. |
github.com/eapache/go-resiliency/breaker | Package breaker implements the circuit-breaker resiliency pattern for Go. |
github.com/eapache/go-resiliency/deadline | Package deadline implements the deadline (also known as "timeout") resiliency pattern for Go. |
github.com/eapache/go-resiliency/retrier | Package retrier implements the "retriable" resiliency pattern for Go. |
github.com/eapache/go-resiliency/semaphore | Package semaphore implements the semaphore resiliency pattern for Go. |
github.com/eapache/queue | Package queue provides a fast, ring-buffer queue based on the version suggested by Dariusz Górecki. |
github.com/golang/snappy | Package snappy implements the Snappy compression format. |
github.com/johngcn/sarama-cluster | Package cluster provides cluster extensions for Sarama, enabling users to consume topics across from multiple, balanced nodes. |
github.com/pierrec/lz4 | Package lz4 implements reading and writing lz4 compressed data (a frame), as specified in http://fastcompression.blogspot.fr/2013/04/lz4-streaming-format-final.html. |
github.com/pierrec/lz4/internal/xxh32 | Package xxh32 implements the very fast XXH hashing algorithm (32 bits version). |
github.com/rcrowley/go-metrics | Go port of Coda Hale's Metrics library <https://github.com/rcrowley/go-metrics> Coda Hale's original work: <https://github.com/codahale/metrics> |
github.com/rcrowley/go-metrics/exp | Hook go-metrics into expvar on any /debug/metrics request, load all vars from the registry into expvar, and execute regular expvar handler |
github.com/rcrowley/go-metrics/stathat | Metrics output to StatHat. |
golang.org/x/sys/cpu | Package cpu implements processor feature detection for various CPU architectures. |
golang.org/x/sys/unix | Package unix contains an interface to the low-level operating system primitives. |
golang.org/x/sys/windows/svc | Package svc provides everything required to build Windows service. |
golang.org/x/sys/windows/svc/debug | Package debug provides facilities to execute svc.Handler on console. |
golang.org/x/sys/windows/svc/eventlog | Package eventlog implements access to Windows event log. |
golang.org/x/sys/windows/svc/example | Example service program that beeps. |
golang.org/x/sys/windows/svc/mgr | Package mgr can be used to manage Windows service programs. |