kafka

package
v0.0.0-...-7429660 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 29, 2023 License: Apache-2.0, MIT Imports: 24 Imported by: 0

README

kafka

Kafka libraries, tools and example applications built on top of the sarama package.

Libraries
  • consumergroup: Distributed Kafka consumer, backed by Zookeeper, supporting load balancing and offset persistence, as defined by the Kafka documentation. API documentation can be found on godoc.org
Tools

The following tools can be useful for discovery, testing, and benchmarking. They also serve as examples of how to use Sarama.

  • tools/stressproducer: A tool to stress test the producer to measure throughput and latency.
  • tools/consoleconsumer: A tool to consume a kafka topic and write messages to STDOUT.
Examples
  • examples/consumergroup: An example consumer application that uses the consumergroup library mentioned above.

Documentation

Index

Constants

View Source
// Integer status codes declared by this package: KafkaSuccess is 0 and the
// failure codes occupy 200-207.
// NOTE(review): the original trailing comments quoted codes 100 and 101 for
// KafkaSendInnerError and KafkaSendNotInit, but the declared values are 200
// and 201; the comments below follow the declared values.
const (
	KafkaSuccess        int = 0   // success
	KafkaSendInnerError int = 200 // internal error
	KafkaSendNotInit    int = 201 // not initialized
	KafkaSendError      int = 202

	KafkaConsumeError      int = 203
	KafkaConsumeInitError  int = 204
	KafkaProducerInitError int = 205

	KafkaGetConsumeClientError  int = 206
	KafkaGetProducerClientError int = 207
)
View Source
// String constants declared by the kafka client package.
// NOTE(review): presumably prefixes and operation labels for metrics or
// logging — their usage is not visible here; confirm against callers.
// NOTE(review): C_KAFKA_PRE is "ckfklient" while P_KAFKA_PRE is "kfkclient";
// the former may be a misspelling of "ckfkclient". Confirm that nothing
// depends on the current value before changing it.
const (
	P_KAFKA_PRE               string = "kfkclient"
	C_KAFKA_PRE               string = "ckfklient"
	KAFKA_INIT                string = "Init"
	KAFKA_GET_PRODUCER_CLIENT string = "PCGet"
	KAFKA_GET_CONSUME_CLIENT  string = "CCGet"
)

Variables

View Source
// Package-level error values created with errors.New.
// NOTE(review): presumably KAFKA_CLIENT_NOT_INIT is returned when a client is
// used before it has been initialized and KAFKA_PARAMS_ERROR when arguments
// are invalid — the call sites are not visible here; confirm.
var (
	KAFKA_CLIENT_NOT_INIT = errors.New("kafka client not init")
	KAFKA_PARAMS_ERROR    = errors.New("kafka params error")
)
View Source
// String names for three required-acks levels. They are declared with var
// rather than const, so other code can reassign them.
// NOTE(review): presumably these are the accepted values of
// KafkaProductConfig.RequiredAcks — confirm against the producer setup code.
var (
	REQUIRED_ACK_NO_RESPONSE    string = "NoResponse"
	REQUIRED_ACK_WAIT_FOR_LOCAL string = "WaitForLocal"
	REQUIRED_ACK_WAIT_FOR_ALL   string = "WaitForAll"
)
View Source
var (
	// Logger is the kafka client logger. It is initialized to write to
	// ioutil.Discard with the prefix "[Sarama] " and the LstdFlags flag set,
	// so nothing is emitted unless Logger is replaced with one that writes
	// to a real destination.
	// NOTE(review): stdlog is presumably an import alias for the standard
	// "log" package — confirm in the import block.
	Logger = stdlog.New(ioutil.Discard, "[Sarama] ", stdlog.LstdFlags)
)

Functions

func NSKey

func NSKey(ctx context.Context) (string, bool)

func WithNSKey

func WithNSKey(ctx context.Context, ns string) context.Context

Types

type ConsumeCallback

// ConsumeCallback is a single-method interface whose Process method receives
// a byte slice and returns nothing, so an implementation has no way to report
// a failure through the return value.
// NOTE(review): values is presumably the payload of a consumed message —
// the caller is not visible here; confirm.
type ConsumeCallback interface {
	Process(values []byte)
}

type ConsumerMessage

// ConsumerMessage carries a message's key and value together with the topic,
// partition and offset it is associated with, two timestamps, a message ID
// and a creation time. The type also has unexported fields that are not
// shown in this listing.
// NOTE(review): how MessageID and CreateAt are populated is not visible
// here — confirm before relying on them.
type ConsumerMessage struct {
	Key, Value     []byte
	Topic          string
	Partition      int32
	Offset         int64
	Timestamp      time.Time // only set if kafka is version 0.10+, inner message timestamp
	BlockTimestamp time.Time // only set if kafka is version 0.10+, outer (compressed) block timestamp
	MessageID      string
	CreateAt       time.Time
	// contains filtered or unexported fields
}

func (*ConsumerMessage) Context

func (m *ConsumerMessage) Context() context.Context

type KafkaClient

type KafkaClient struct {
	// contains filtered or unexported fields
}

func NewKafkaClient

func NewKafkaClient(conf KafkaProductConfig) (*KafkaClient, error)

func NewMockAsyncProducerClient

func NewMockAsyncProducerClient() (asyncClient *KafkaClient, asyncMock *mocks.AsyncProducer, err error)

NewMockAsyncProducerClient returns a standard async client, the async mock producer backing it, and an error. For examples of using the mock, see http://github.com/Shopify/sarama/mocks

func (*KafkaClient) Close

func (kc *KafkaClient) Close() error

func (*KafkaClient) Errors

func (kc *KafkaClient) Errors() <-chan *ProducerError

func (*KafkaClient) Send

func (ksc *KafkaClient) Send(ctx context.Context, message *ProducerMessage) (int32, int64, error)

func (*KafkaClient) SendKeyMsg deprecated

func (kc *KafkaClient) SendKeyMsg(topic string, key string, msg []byte) error

Deprecated: SendKeyMsg should no longer be used. Use Send instead.

func (*KafkaClient) SendMsg deprecated

func (kc *KafkaClient) SendMsg(topic string, msg []byte) error

Deprecated: SendMsg should no longer be used. Use Send instead.

func (*KafkaClient) Success

func (kc *KafkaClient) Success() <-chan *ProducerMessage

type KafkaConsumeClient

type KafkaConsumeClient struct {
	// contains filtered or unexported fields
}

func NewKafkaConsumeClient

func NewKafkaConsumeClient(conf KafkaConsumeConfig) (*KafkaConsumeClient, error)

func (*KafkaConsumeClient) Close

func (kcc *KafkaConsumeClient) Close() error

func (*KafkaConsumeClient) CommitUpto

func (kcc *KafkaConsumeClient) CommitUpto(message *ConsumerMessage)

func (*KafkaConsumeClient) Errors

func (kcc *KafkaConsumeClient) Errors() <-chan error

func (*KafkaConsumeClient) GetMessages

func (kcc *KafkaConsumeClient) GetMessages() <-chan *ConsumerMessage

func (*KafkaConsumeClient) Messages

func (kcc *KafkaConsumeClient) Messages(closeChan chan bool, maxQueueSize int) chan []byte

type KafkaConsumeConfig

// KafkaConsumeConfig is the configuration accepted by NewKafkaConsumeClient.
// Every field carries a toml tag naming its configuration key; note that
// Zookeeperhost is keyed "zkpoints" rather than by its field name.
// NOTE(review): the units of ProcessTimeout and CommitInterval, and the
// meaning of the Initoffset values, are not visible here — confirm against
// the client implementation.
type KafkaConsumeConfig struct {
	ConsumeFrom    string `toml:"consume_from"`
	Zookeeperhost  string `toml:"zkpoints"`
	Topic          string `toml:"topic"`
	Group          string `toml:"group"`
	Initoffset     int    `toml:"initoffset"`
	ProcessTimeout int    `toml:"process_timeout"`
	CommitInterval int    `toml:"commit_interval"`
	GetError       bool   `toml:"get_error"`
	TraceEnable    bool   `toml:"trace_enable"`
	ConsumeAll     bool   `toml:"consume_all"`
}

type KafkaProductConfig

// KafkaProductConfig is the configuration accepted by NewKafkaClient and
// NewSyncProducterClient. Most fields carry a toml tag naming their
// configuration key; Printf and UseSync have no tag. The RequiredAcks key is
// spelled "RequiredAcks", unlike the snake_case keys of the other fields.
// NOTE(review): Broken (key "kafka_broken") presumably holds the broker
// address list, and RequiredAcks presumably takes one of the
// REQUIRED_ACK_* strings — neither is visible here; confirm. The unit of
// RequestTimeout is likewise not shown.
type KafkaProductConfig struct {
	ProducerTo     string `toml:"producer_to"`
	Broken         string `toml:"kafka_broken"`
	RetryMax       int    `toml:"retrymax"`
	RequiredAcks   string `toml:"RequiredAcks"`
	GetError       bool   `toml:"get_error"`
	GetSuccess     bool   `toml:"get_success"`
	RequestTimeout int    `toml:"request_timeout"`
	Printf         bool
	UseSync        bool
}

type KafkaSyncClient

type KafkaSyncClient struct {
	// contains filtered or unexported fields
}

func NewMockSyncProducerClient

func NewMockSyncProducerClient() (syncClient *KafkaSyncClient, syncMock *mocks.SyncProducer, err error)

NewMockSyncProducerClient returns a standard sync client, the sync mock producer backing it, and an error. For examples of using the mock, see http://github.com/Shopify/sarama/mocks

func NewSyncProducterClient

func NewSyncProducterClient(conf KafkaProductConfig) (*KafkaSyncClient, error)

func (*KafkaSyncClient) Close

func (kc *KafkaSyncClient) Close() error

func (*KafkaSyncClient) Send

func (ksc *KafkaSyncClient) Send(ctx context.Context, message *ProducerMessage) (int32, int64, error)

Send sends the message to the Kafka cluster. ctx is the HTTP/RPC request context. For background on record headers, see https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers

func (*KafkaSyncClient) SendSyncMsg deprecated

func (ksc *KafkaSyncClient) SendSyncMsg(topic, key string, msg []byte) (int32, int64, error)

Deprecated: SendSyncMsg should no longer be used. Use Send instead.

type ProducerError

// ProducerError is the type of error generated when the producer fails to
// deliver a message. Msg is the original ProducerMessage and Err is the
// actual error value.
type ProducerError struct {
	Msg *ProducerMessage
	Err error
}

ProducerError is the type of error generated when the producer fails to deliver a message. It contains the original ProducerMessage as well as the actual error value.

type ProducerMessage

// ProducerMessage is the collection of elements passed to the Producer in
// order to send a message.
type ProducerMessage struct {
	Topic string // The Kafka topic for this message.
	// The partitioning key for this message, as a plain string (this type
	// does not use an Encoder for the key).
	Key string
	// The actual message to store in Kafka, as raw bytes (this type does not
	// use an Encoder for the value).
	Value []byte

	// This field is used to hold arbitrary data you wish to include so it
	// will be available when receiving on the Successes and Errors channels.
	// Sarama completely ignores this field and is only to be used for
	// pass-through data.
	Metadata interface{}

	// Offset is the offset of the message stored on the broker. This is only
	// guaranteed to be defined if the message was successfully delivered and
	// RequiredAcks is not NoResponse.
	Offset int64
	// Partition is the partition that the message was sent to. This is only
	// guaranteed to be defined if the message was successfully delivered.
	Partition int32
	// Timestamp is the timestamp assigned to the message by the broker. This
	// is only guaranteed to be defined if the message was successfully
	// delivered, RequiredAcks is not NoResponse, and the Kafka broker is at
	// least version 0.10.0.
	Timestamp time.Time

	// MessageID is a string identifier carried with the message.
	// NOTE(review): how it is generated and whether it is transmitted to the
	// broker is not visible here — confirm against the Send implementation.
	MessageID string
}

ProducerMessage is the collection of elements passed to the Producer in order to send a message.

type RecordHeader

// RecordHeader is a key/value pair, both held as byte slices.
// NOTE(review): presumably models a Kafka record header as introduced by
// KIP-82 — its use is not visible here; confirm.
type RecordHeader struct {
	Key   []byte
	Value []byte
}

type SendResponse

// SendResponse groups a partition, an offset and an error, the same three
// values that the Send methods of this package return.
// NOTE(review): where SendResponse is produced or consumed is not visible
// here — confirm before relying on Partition/Offset when Err is non-nil.
type SendResponse struct {
	Partition int32
	Offset    int64
	Err       error
}

type TestReporter

type TestReporter struct {
	// contains filtered or unexported fields
}

TestReporter records producer/consumer errors

func NewTestReporter

func NewTestReporter() *TestReporter

func (*TestReporter) Errorf

func (tr *TestReporter) Errorf(format string, args ...interface{})

Directories

Path Synopsis
examples
tools

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL