tbkafka

package
v0.0.0-...-1256235 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 12, 2022 License: MIT Imports: 8 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	OffsetNewest = sarama.OffsetNewest
	OffsetOldest = sarama.OffsetOldest
)

Functions

func TbMessage2OttoValue

func TbMessage2OttoValue(msg *CGMessage) (otto.Value, error)

Types

type CGMessage

type CGMessage struct {
	Topic     string
	Group     string
	Partition int32
	Offset    int64
	Payload   string
}

func CGMessage2TbCGMessage

func CGMessage2TbCGMessage(cgmsg *ConsumerGroupMessage) (*CGMessage, error)

type CompressionType

type CompressionType int8
const (
	CompressionNone   CompressionType = 0
	CompressionGZIP   CompressionType = 1
	CompressionSnappy CompressionType = 2
)

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(addrs []string, options ...ConsumerOption) (*Consumer, error)

func (*Consumer) Add

func (c *Consumer) Add(topic string, partitions []int, offset int64) error

func (*Consumer) Fini

func (c *Consumer) Fini()

func (*Consumer) Output

func (c *Consumer) Output() <-chan *ConsumerMessage

type ConsumerGroup

type ConsumerGroup struct {
	// contains filtered or unexported fields
}

func NewConsumerGroup

func NewConsumerGroup(addrs []string, options ...ConsumerGroupOption) (*ConsumerGroup, error)

func (*ConsumerGroup) Add

func (cg *ConsumerGroup) Add(topic string, group string,
	handlers ...func(*ConsumerGroupMessage)) error

func (*ConsumerGroup) Del

func (cg *ConsumerGroup) Del(topic string, group string) error

func (*ConsumerGroup) Fini

func (cg *ConsumerGroup) Fini()

func (*ConsumerGroup) Output

func (cg *ConsumerGroup) Output() <-chan *ConsumerGroupMessage

type ConsumerGroupMessage

type ConsumerGroupMessage struct {
	Topic         string
	ConsumerGroup string
	Partition     int32
	Offset        int64
	Payload       []byte
	Error         error
}

type ConsumerGroupOption

type ConsumerGroupOption func(*ConsumerGroup) error

func OptionConsumerGroupFailedCh

func OptionConsumerGroupFailedCh(ch chan<- *ConsumerGroupMessage) ConsumerGroupOption

func OptionConsumerGroupHeartbeatInterval

func OptionConsumerGroupHeartbeatInterval(d time.Duration) ConsumerGroupOption

func OptionConsumerGroupOffsetsInitial

func OptionConsumerGroupOffsetsInitial(initial int64) ConsumerGroupOption

func OptionConsumerGroupSessionTimeout

func OptionConsumerGroupSessionTimeout(d time.Duration) ConsumerGroupOption

func OptionConsumerGroupVersion

func OptionConsumerGroupVersion(version string) ConsumerGroupOption

0.10.2.1 2.8.0.0

type ConsumerMessage

type ConsumerMessage struct {
	Headers   []*sarama.RecordHeader // only set if kafka is version 0.11+
	Topic     string
	Partition int32
	Offset    int64
	Payload   []byte
	Error     error
}

type ConsumerOption

type ConsumerOption func(*Consumer) error

func OptionConsumerFailedCh

func OptionConsumerFailedCh(ch chan<- *ConsumerMessage) ConsumerOption

func OptionConsumerNetTLS

func OptionConsumerNetTLS(config *tls.Config) ConsumerOption

func OptionConsumerSasl

func OptionConsumerSasl(user, passwd string) ConsumerOption

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

* Producer结束应该调用Fini,这样会等待本模块所有可能block * 的函数结束才返回。 *

func NewProducer

func NewProducer(addrs []string, options ...ProducerOption) (*Producer, error)

func (*Producer) Fini

func (p *Producer) Fini()

func (*Producer) Input

func (p *Producer) Input() chan<- *ProducerMessage

type ProducerMessage

type ProducerMessage struct {
	//topic
	Topic string
	// The partitioning key for this message. Pre-existing Encoders include
	// StringEncoder and ByteEncoder.
	Key sarama.Encoder
	//Payload to produce.
	Payload []byte
	// The headers are key-value pairs that are transparently passed
	// by Kafka between producers and consumers.
	Headers []sarama.RecordHeader
	//Custom data which will be enqueued FailedQueue if failed,
	//or be enqueued SucceedQueue if succeed.
	Custom interface{}
	//Partition specified by user when OptionManualPartition
	//was set, or returned by broker.
	Partition int32
	//Offset returned by broker.
	Offset int64
	//Error will be set while showing up in FailedQueue.
	Error error
}

用户消息

func OttoValue2PMessage

func OttoValue2PMessage(msg otto.Value) (*ProducerMessage, error)
{
	"Topic": "foo",
	"Payload": "bar"
}

type ProducerOption

type ProducerOption func(*Producer) error

func OptionCompression

func OptionCompression(compressionType CompressionType) ProducerOption

func OptionFailedCh

func OptionFailedCh(ch chan<- *ProducerMessage) ProducerOption

func OptionFlushBytes

func OptionFlushBytes(size int) ProducerOption

func OptionFlushFrequency

func OptionFlushFrequency(frequency time.Duration) ProducerOption

func OptionFlushMaxMessages

func OptionFlushMaxMessages(count int) ProducerOption

func OptionFlushMessages

func OptionFlushMessages(count int) ProducerOption

func OptionManualPartition

func OptionManualPartition() ProducerOption

func OptionMaxMessageBytes

func OptionMaxMessageBytes(size int) ProducerOption

func OptionNetTLS

func OptionNetTLS(config *tls.Config) ProducerOption

func OptionProducerConcu

func OptionProducerConcu(queue uint64, concu uint32) ProducerOption

用户队列和分派并发

func OptionProducerUnshare

func OptionProducerUnshare() ProducerOption

每个topic使用独立的连接

func OptionRequiredAcks

func OptionRequiredAcks(requireType RequireType) ProducerOption

func OptionRetryBackoff

func OptionRetryBackoff(backoff time.Duration) ProducerOption

func OptionRetryMax

func OptionRetryMax(retry int) ProducerOption

func OptionSasl

func OptionSasl(user, passwd string) ProducerOption

func OptionSucceedCh

func OptionSucceedCh(ch chan<- *ProducerMessage) ProducerOption

如果库消息发送成功,则把该消息回写ch

func OptionTimeout

func OptionTimeout(timeout time.Duration) ProducerOption

func OptionTopic

func OptionTopic(topic string, queue uint64, concu uint32) ProducerOption

每个topic配置的读队列和并发

func OptionVersion

func OptionVersion(version sarama.KafkaVersion) ProducerOption

type RequireType

type RequireType int8
const (
	RequireNoResponse  RequireType = 0
	RequireOnlyLeader  RequireType = 1
	RequireAllReplicas RequireType = -1
)

type TbProducer

type TbProducer struct {
	// contains filtered or unexported fields
}

func NewTbProducer

func NewTbProducer() (*TbProducer, error)

func (*TbProducer) Produce

func (tbproducer *TbProducer) Produce(call otto.FunctionCall) otto.Value

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL