kafka

package
v0.0.0-...-b5d3b42
Published: Nov 4, 2020 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Index

Constants

const (
	KafkaSuccess        int = 0   // success
	KafkaSendInnerError int = 200 // internal send error
	KafkaSendNotInit    int = 201 // client not initialized
	KafkaSendError      int = 202

	KafkaConsumeError      int = 203
	KafkaConsumeInitError  int = 204
	KafkaProducerInitError int = 205

	KafkaGetConsumeClientError  int = 206
	KafkaGetProducerClientError int = 207
)
const (
	P_KAFKA_PRE               string = "kfkclient"
	C_KAFKA_PRE               string = "ckfklient"
	KAFKA_INIT                string = "Init"
	KAFKA_GET_PRODUCER_CLIENT string = "PCGet"
	KAFKA_GET_CONSUME_CLIENT  string = "CCGet"
)

Variables

var (
	KAFKA_CLIENT_NOT_INIT = errors.New("kafka client not init")
	KAFKA_PARAMS_ERROR    = errors.New("kafka params error")
)
var (
	REQUIRED_ACK_NO_RESPONSE    string = "NoResponse"
	REQUIRED_ACK_WAIT_FOR_LOCAL string = "WaitForLocal"
	REQUIRED_ACK_WAIT_FOR_ALL   string = "WaitForAll"
)
var (
	// Logger is kafka client logger
	Logger = stdlog.New(ioutil.Discard, "[Sarama] ", stdlog.LstdFlags)
)

Functions

This section is empty.

Types

type ConsumeCallback

type ConsumeCallback interface {
	Process(values []byte)
}
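Any type with a Process(values []byte) method satisfies this interface. A minimal sketch, where printHandler is a hypothetical implementation and the import path is assumed:

package main

import (
	"fmt"

	"example.com/yourmodule/kafka" // hypothetical import path
)

// printHandler is a hypothetical ConsumeCallback that prints each payload.
type printHandler struct{}

func (printHandler) Process(values []byte) {
	fmt.Printf("got message: %s\n", values)
}

// Compile-time check that printHandler satisfies the interface.
var _ kafka.ConsumeCallback = printHandler{}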

type ConsumerMessage

type ConsumerMessage struct {
	Key, Value     []byte
	Topic          string
	Partition      int32
	Offset         int64
	Timestamp      time.Time // only set if kafka is version 0.10+, inner message timestamp
	BlockTimestamp time.Time // only set if kafka is version 0.10+, outer (compressed) block timestamp
	MessageID      string
	CreateAt       time.Time
	// contains filtered or unexported fields
}

func (*ConsumerMessage) Context

func (m *ConsumerMessage) Context() context.Context

type KafkaClient

type KafkaClient struct {
	// contains filtered or unexported fields
}

func NewKafkaClient

func NewKafkaClient(conf KafkaProductConfig) (*KafkaClient, error)

func (*KafkaClient) Close

func (kc *KafkaClient) Close() error

func (*KafkaClient) Errors

func (kc *KafkaClient) Errors() <-chan *ProducerError

func (*KafkaClient) Send

func (ksc *KafkaClient) Send(ctx context.Context, message *ProducerMessage) (int32, int64, error)

func (*KafkaClient) SendKeyMsg

func (kc *KafkaClient) SendKeyMsg(topic string, key string, msg []byte) error

func (*KafkaClient) SendMsg

func (kc *KafkaClient) SendMsg(topic string, msg []byte) error

func (*KafkaClient) Success

func (kc *KafkaClient) Success() <-chan *ProducerMessage
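Taken together, a sketch of the async flow: SendMsg and SendKeyMsg enqueue messages, while Errors and Success report delivery outcomes (presumably only when GetError/GetSuccess are enabled in KafkaProductConfig). Topic names and payloads are illustrative:

package main

import (
	"fmt"

	"example.com/yourmodule/kafka" // hypothetical import path
)

func produce(kc *kafka.KafkaClient) error {
	// Enqueue an unkeyed and a keyed message; both calls are asynchronous.
	if err := kc.SendMsg("events", []byte(`{"n":1}`)); err != nil {
		return err
	}
	if err := kc.SendKeyMsg("events", "user-42", []byte(`{"n":2}`)); err != nil {
		return err
	}

	// Wait for one delivery report from either channel.
	select {
	case msg := <-kc.Success():
		fmt.Printf("delivered: partition=%d offset=%d\n", msg.Partition, msg.Offset)
	case perr := <-kc.Errors():
		fmt.Printf("failed: %v\n", perr.Err)
	}
	return kc.Close()
}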

type KafkaConsumeClient

type KafkaConsumeClient struct {
	// contains filtered or unexported fields
}

func NewKafkaConsumeClient

func NewKafkaConsumeClient(conf KafkaConsumeConfig) (*KafkaConsumeClient, error)

func (*KafkaConsumeClient) Close

func (kcc *KafkaConsumeClient) Close() error

func (*KafkaConsumeClient) CommitUpto

func (kcc *KafkaConsumeClient) CommitUpto(message *ConsumerMessage)

func (*KafkaConsumeClient) Errors

func (kcc *KafkaConsumeClient) Errors() <-chan error

func (*KafkaConsumeClient) GetMessages

func (kcc *KafkaConsumeClient) GetMessages() <-chan *ConsumerMessage

func (*KafkaConsumeClient) Messages

func (kcc *KafkaConsumeClient) Messages(closeChan chan bool, maxQueueSize int) chan []byte
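A sketch of a typical consume loop: read from GetMessages, process, then acknowledge with CommitUpto so the offset can be committed. The Errors channel presumably only carries values when GetError is set in KafkaConsumeConfig (client construction is sketched after that type below):

package main

import (
	"fmt"

	"example.com/yourmodule/kafka" // hypothetical import path
)

func consume(kcc *kafka.KafkaConsumeClient) {
	for {
		select {
		case msg := <-kcc.GetMessages():
			fmt.Printf("topic=%s partition=%d offset=%d value=%s\n",
				msg.Topic, msg.Partition, msg.Offset, msg.Value)
			// Mark this message processed so its offset can be committed.
			kcc.CommitUpto(msg)
		case err := <-kcc.Errors():
			fmt.Println("consume error:", err)
		}
	}
}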

type KafkaConsumeConfig

type KafkaConsumeConfig struct {
	ConsumeFrom    string `toml:"consume_from"`
	Zookeeperhost  string `toml:"zkpoints"`
	Topic          string `toml:"topic"`
	Group          string `toml:"group"`
	Initoffset     int    `toml:"initoffset"`
	ProcessTimeout int    `toml:"process_timeout"`
	CommitInterval int    `toml:"commit_interval"`
	GetError       bool   `toml:"get_error"`
	TraceEnable    bool   `toml:"trace_enable"`
}
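The toml tags suggest this struct is normally decoded from a config file. A hand-built sketch with illustrative values; the units of the timeout/interval fields and the semantics of Initoffset are assumptions:

package main

import "example.com/yourmodule/kafka" // hypothetical import path

func newConsumer() (*kafka.KafkaConsumeClient, error) {
	conf := kafka.KafkaConsumeConfig{
		ConsumeFrom:    "kafka",             // illustrative
		Zookeeperhost:  "zk1:2181,zk2:2181", // ZooKeeper connection string
		Topic:          "events",
		Group:          "example-group",
		Initoffset:     -1,   // illustrative; semantics depend on the implementation
		ProcessTimeout: 30,   // assumed seconds
		CommitInterval: 10,   // assumed seconds
		GetError:       true, // expose errors on the Errors() channel
		TraceEnable:    false,
	}
	return kafka.NewKafkaConsumeClient(conf)
}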

type KafkaProductConfig

type KafkaProductConfig struct {
	ProducerTo     string `toml:"producer_to"`
	Broken         string `toml:"kafka_broken"`
	RetryMax       int    `toml:"retrymax"`
	RequiredAcks   string `toml:"RequiredAcks"`
	GetError       bool   `toml:"get_error"`
	GetSuccess     bool   `toml:"get_success"`
	RequestTimeout int    `toml:"request_timeout"`
	Printf         bool
	UseSync        bool
}
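A hand-built sketch with illustrative values. RequiredAcks takes one of the REQUIRED_ACK_* strings declared above; Broken, judging by its kafka_broken tag, is assumed to hold the broker address list:

package main

import "example.com/yourmodule/kafka" // hypothetical import path

func newProducer() (*kafka.KafkaClient, error) {
	conf := kafka.KafkaProductConfig{
		ProducerTo:     "kafka",                     // illustrative
		Broken:         "broker1:9092,broker2:9092", // assumed broker address list
		RetryMax:       3,
		RequiredAcks:   kafka.REQUIRED_ACK_WAIT_FOR_ALL, // "WaitForAll"
		GetError:       true,
		GetSuccess:     true,
		RequestTimeout: 10, // assumed seconds
		UseSync:        false,
	}
	return kafka.NewKafkaClient(conf)
}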

type KafkaSyncClient

type KafkaSyncClient struct {
	// contains filtered or unexported fields
}

func NewSyncProducterClient

func NewSyncProducterClient(conf KafkaProductConfig) (*KafkaSyncClient, error)

func (*KafkaSyncClient) Close

func (kc *KafkaSyncClient) Close() error

func (*KafkaSyncClient) Send

func (ksc *KafkaSyncClient) Send(ctx context.Context, message *ProducerMessage) (int32, int64, error)

Send delivers the message to the Kafka cluster; ctx carries http/rpc context headers. See https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers

func (*KafkaSyncClient) SendSyncMsg

func (ksc *KafkaSyncClient) SendSyncMsg(topic, key string, msg []byte) (int32, int64, error)
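A sketch of the synchronous path end to end; broker address, topic, and key are illustrative:

package main

import (
	"fmt"

	"example.com/yourmodule/kafka" // hypothetical import path
)

func sendOnce() error {
	client, err := kafka.NewSyncProducterClient(kafka.KafkaProductConfig{
		Broken:       "localhost:9092", // assumed broker address
		RetryMax:     3,
		RequiredAcks: kafka.REQUIRED_ACK_WAIT_FOR_LOCAL,
		UseSync:      true,
	})
	if err != nil {
		return err
	}
	defer client.Close()

	partition, offset, err := client.SendSyncMsg("events", "user-42", []byte("payload"))
	if err != nil {
		return err
	}
	fmt.Printf("stored at partition %d, offset %d\n", partition, offset)
	return nil
}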

type ProducerError

type ProducerError struct {
	Msg *ProducerMessage
	Err error
}

ProducerError is the type of error generated when the producer fails to deliver a message. It contains the original ProducerMessage as well as the actual error value.

type ProducerMessage

type ProducerMessage struct {
	Topic string // The Kafka topic for this message.
	// The partitioning key for this message.
	Key string
	// The actual message to store in Kafka.
	Value []byte

	// This field is used to hold arbitrary data you wish to include so it
	// will be available when receiving on the Successes and Errors channels.
	// Sarama completely ignores this field; it is used only for
	// pass-through data.
	Metadata interface{}

	// Offset is the offset of the message stored on the broker. This is only
	// guaranteed to be defined if the message was successfully delivered and
	// RequiredAcks is not NoResponse.
	Offset int64
	// Partition is the partition that the message was sent to. This is only
	// guaranteed to be defined if the message was successfully delivered.
	Partition int32
	// Timestamp is the timestamp assigned to the message by the broker. This
	// is only guaranteed to be defined if the message was successfully
	// delivered, RequiredAcks is not NoResponse, and the Kafka broker is at
	// least version 0.10.0.
	Timestamp time.Time

	// MessageID identifies this message.
	MessageID string
}

ProducerMessage is the collection of elements passed to the Producer in order to send a message.
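A sketch of building a message by hand and sending it through KafkaSyncClient.Send; after a successful send, the broker-assigned partition and offset are also returned directly. Values are illustrative:

package main

import (
	"context"
	"fmt"

	"example.com/yourmodule/kafka" // hypothetical import path
)

func sendMessage(ctx context.Context, client *kafka.KafkaSyncClient) error {
	msg := &kafka.ProducerMessage{
		Topic:    "events",
		Key:      "user-42",
		Value:    []byte(`{"hello":"world"}`),
		Metadata: "request-123", // pass-through only; ignored by Sarama
	}
	partition, offset, err := client.Send(ctx, msg)
	if err != nil {
		return err
	}
	fmt.Printf("partition=%d offset=%d\n", partition, offset)
	return nil
}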

type RecordHeader

type RecordHeader struct {
	Key   []byte
	Value []byte
}

type SendResponse

type SendResponse struct {
	Partition int32
	Offset    int64
	Err       error
}
