cKafka

package module
v0.4.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 22, 2024 License: MIT Imports: 15 Imported by: 1

Documentation

Index

Constants

View Source
// Consumer start-offset sentinels, re-exported from sarama so callers do not
// need to import sarama just to pick a starting position.
const (
	// CONSUMER_OFFSET_OLDEST is sarama.OffsetOldest: the oldest offset
	// available on the broker for a partition.
	CONSUMER_OFFSET_OLDEST = sarama.OffsetOldest
	// CONSUMER_OFFSET_NEWEST is sarama.OffsetNewest: the log head offset,
	// i.e. the offset that will be assigned to the next produced message.
	CONSUMER_OFFSET_NEWEST = sarama.OffsetNewest
)
View Source
// KafkaConfigName is the string "KafkaConf".
// NOTE(review): presumably the value returned by (*KafkaConf).ConfigName — confirm.
const KafkaConfigName = "KafkaConf"

Variables

View Source
// Component is a package-level pointer to a zero-value KafkaComponent.
var Component = &KafkaComponent{}

Functions

func DefaultConsumer

func DefaultConsumer() (instance *consumer, err error)

func DefaultProducer

func DefaultProducer() (instance *producer, err error)

func GetAsyncProducer

func GetAsyncProducer(name string) (instance *producer, err error)

func GetConsumer

func GetConsumer(name string, typ ConsumerType) (instance *consumer, err error)

func GetGroupConsumer

func GetGroupConsumer(name string) (instance *consumer, err error)

func GetPartitionConsumer

func GetPartitionConsumer(name string) (instance *consumer, err error)

func GetProducer

func GetProducer(name string, typ ProducerType) (instance *producer, err error)

GetProducer returns a producer (获取生产者).

func GetSyncProducer

func GetSyncProducer(name string) (instance *producer, err error)

func NewConsumer

func NewConsumer(option ConsumerOption) (*consumer, error)

func NewProducer

func NewProducer(option ProducerOption) (*producer, error)

Types

type ConsumerOption

// ConsumerOption is the option struct accepted by NewConsumer.
//
// NOTE(review): the semantics and units of Size, Wait and Multi are not
// visible from this declaration — document them at the point of use.
type ConsumerOption struct {
	// ID identifies the consumer. NOTE(review): how it maps onto the
	// (clientId, consumerId) pair returned by GetID is not shown here.
	ID         string
	// Type is one of the ConsumerType values.
	Type       ConsumerType
	// Config is the sarama configuration for the consumer.
	Config     *sarama.Config
	// Brokers is presumably the list of broker addresses — confirm format.
	Brokers    []string
	// Topic is presumably the topic to consume from — confirm.
	Topic      string
	// Group is presumably the consumer group name (group consumers) — confirm.
	Group      string
	// Partitions is presumably the partition set (partition consumers) — confirm.
	Partitions []int32
	// Offset is presumably the starting offset, e.g. one of the
	// CONSUMER_OFFSET_* constants — confirm.
	Offset     int64
	Size       int64
	Wait       int64
	Multi      bool
}

func (*ConsumerOption) GetID

func (i *ConsumerOption) GetID() (clientId, consumerId string)

type ConsumerType

// ConsumerType is a string-based kind selector for consumers; it is the
// type of ConsumerOption.Type and of the typ parameter of GetConsumer.
type ConsumerType string
const (
	// CONSUMER_TYPE_PARTITION is the "partition" consumer kind.
	CONSUMER_TYPE_PARTITION ConsumerType = "partition"
	// CONSUMER_TYPE_GROUP is the "group" consumer kind.
	CONSUMER_TYPE_GROUP     ConsumerType = "group"
)

type DefaultGroupHandler

// DefaultGroupHandler holds a channel of consumed sarama messages.
// NOTE(review): its Setup, Cleanup and ConsumeClaim methods suggest it is
// meant to satisfy sarama.ConsumerGroupHandler — confirm against the source.
type DefaultGroupHandler struct {
	// Messages carries *sarama.ConsumerMessage values. NOTE(review): buffering
	// and who closes the channel are not visible here.
	Messages chan *sarama.ConsumerMessage
}

func (*DefaultGroupHandler) Cleanup

func (*DefaultGroupHandler) ConsumeClaim

func (*DefaultGroupHandler) Setup

type KafkaComponent added in v0.3.2

// KafkaComponent is a field-less type carrying the Inject, InjectConf, Listen
// and Load methods; the package exposes one instance as Component.
type KafkaComponent struct{}

func (*KafkaComponent) Inject added in v0.3.3

func (i *KafkaComponent) Inject(instance any) bool

func (*KafkaComponent) InjectConf added in v0.4.0

func (i *KafkaComponent) InjectConf(config cComponents.ConfigInterface) bool

func (*KafkaComponent) Listen added in v0.4.0

func (i *KafkaComponent) Listen() []*cComponents.ConfigListener

func (*KafkaComponent) Load added in v0.3.3

func (i *KafkaComponent) Load()

type KafkaConf

// KafkaConf is the top-level Kafka configuration, decoded from JSON.
type KafkaConf struct {
	// Connections maps a string key to its connection settings (JSON key
	// "connections"). NOTE(review): the key is presumably the name passed to
	// GetProducer/GetConsumer — confirm.
	Connections map[string]*KafkaConf_Connection `json:"connections"`
	// HealthCheck is read from JSON key "health_check".
	// NOTE(review): meaning and unit are not visible here — TODO document.
	HealthCheck int64                            `json:"health_check"`
}

func (*KafkaConf) ConfigName

func (i *KafkaConf) ConfigName() string

type KafkaConf_Connection

// KafkaConf_Connection is one entry of KafkaConf.Connections: brokers, topic
// and optional net/producer/consumer sub-sections, decoded from JSON.
type KafkaConf_Connection struct {
	Brokers  []string                       `json:"brokers"`
	Topic    string                         `json:"topic"`
	// Net, Producer and Consumer are pointers, so each may be nil when the
	// corresponding JSON section is absent.
	Net      *KafkaConf_Connection_Net      `json:"net"`
	Producer *KafkaConf_Connection_Producer `json:"producer"`
	Consumer *KafkaConf_Connection_Consumer `json:"consumer"`
	// Version is a fixed array of four unsigned integers.
	// NOTE(review): presumably the Kafka version components — confirm.
	Version  [4]uint                        `json:"version"`
}

type KafkaConf_Connection_Consumer

// KafkaConf_Connection_Consumer is the "consumer" section of a connection.
// Offset, Partitions, Group, Size, Wait and Multi share names and types with
// the same-named ConsumerOption fields.
type KafkaConf_Connection_Consumer struct {
	// NOTE(review): "Hearbeat"/"hearbeat" looks like a misspelling of
	// "heartbeat"; renaming the field or the JSON key would break existing
	// callers and configuration files, so it is left as is. Unit not shown.
	Hearbeat   int64   `json:"hearbeat"`
	Offset     int64   `json:"offset"`
	Partitions []int32 `json:"partitions"`
	Group      string  `json:"group"`
	Size       int64   `json:"size"`
	Wait       int64   `json:"wait"`
	Multi      bool    `json:"multi"`
}

type KafkaConf_Connection_Net

// KafkaConf_Connection_Net is the "net" section of a connection.
// NOTE(review): the time unit of Keepalive and the *Timeout fields is not
// visible from this declaration — TODO document.
type KafkaConf_Connection_Net struct {
	MaxOpenRequests int                           `json:"max_open_requests"`
	Keepalive       int64                         `json:"keepalive"`
	DialTimeout     int64                         `json:"dial_timeout"`
	ReadTimeout     int64                         `json:"read_timeout"`
	WriteTimeout    int64                         `json:"write_timeout"`
	// TLS is a pointer and may be nil when the "tls" section is absent.
	TLS             *KafkaConf_Connection_Net_TLS `json:"tls"`
}

type KafkaConf_Connection_Net_TLS

// KafkaConf_Connection_Net_TLS is the "tls" sub-section of the net settings.
type KafkaConf_Connection_Net_TLS struct {
	Enable bool   `json:"enable"`
	// NOTE(review): Skip presumably disables certificate verification — confirm.
	Skip   bool   `json:"skip"`
	// NOTE(review): whether Cert and Key hold file paths or PEM content is not
	// visible here — confirm.
	Cert   string `json:"cert"`
	Key    string `json:"key"`
}

type KafkaConf_Connection_Producer

// KafkaConf_Connection_Producer is the "producer" section of a connection.
type KafkaConf_Connection_Producer struct {
	MaxMessageBytes int                 `json:"max_message_bytes"`
	// RequiredAck is a boolean. NOTE(review): which sarama acknowledgement
	// levels true/false map to is not visible here — confirm.
	RequiredAck     bool                `json:"required_ack"`
	// Partitioner is one of the ProducerPartitioner values.
	Partitioner     ProducerPartitioner `json:"partitioner"`
	// NOTE(review): Partition is presumably only meaningful with
	// PRODUCER_PARTITIONER_MANUAL — confirm.
	Partition       int32               `json:"partition"`
}

type KafkaOperatorInterface

// KafkaOperatorInterface declares the ID, Close and Delete operations.
// NOTE(review): presumably implemented by the package's producer and consumer
// types — confirm against the source.
type KafkaOperatorInterface interface {
	// ID returns a string identifier.
	ID() string
	// Close takes a *gin.Context. NOTE(review): how the context is used is not
	// visible here.
	Close(ctx *gin.Context)
	Delete()
}

type ProducerMessage

// ProducerMessage is a key/value pair plus sarama record headers.
type ProducerMessage struct {
	Key     string
	Value   string
	Headers []sarama.RecordHeader
}

type ProducerOption

// ProducerOption is the option struct accepted by NewProducer.
type ProducerOption struct {
	// ID identifies the producer. NOTE(review): how it maps onto the
	// (clientId, producerId) pair returned by GetID is not shown here.
	ID        string
	// Type is one of the ProducerType values.
	Type      ProducerType
	// Config is the sarama configuration for the producer.
	Config    *sarama.Config
	// Brokers is presumably the list of broker addresses — confirm format.
	Brokers   []string
	Topic     string
	// NOTE(review): Partition is presumably the target partition for manual
	// partitioning — confirm.
	Partition int32
}

func (*ProducerOption) GetID

func (i *ProducerOption) GetID() (clientId, producerId string)

type ProducerPartitioner

// ProducerPartitioner is a string-based partitioner selector; it is the type
// of KafkaConf_Connection_Producer.Partitioner.
type ProducerPartitioner string
const (
	// PRODUCER_PARTITIONER_HASH is the "hash" partitioner name.
	PRODUCER_PARTITIONER_HASH   ProducerPartitioner = "hash"
	// PRODUCER_PARTITIONER_RANDOM is the "random" partitioner name.
	PRODUCER_PARTITIONER_RANDOM ProducerPartitioner = "random"
	// PRODUCER_PARTITIONER_MANUAL is the "manual" partitioner name.
	PRODUCER_PARTITIONER_MANUAL ProducerPartitioner = "manual"
)

type ProducerType

// ProducerType is a string-based kind selector for producers; it is the type
// of ProducerOption.Type and of the typ parameter of GetProducer.
type ProducerType string
const (
	// PRODUCER_TYPE_SYNC is the "sync" producer kind.
	PRODUCER_TYPE_SYNC  ProducerType = "sync"
	// PRODUCER_TYPE_ASYNC is the "async" producer kind.
	PRODUCER_TYPE_ASYNC ProducerType = "async"
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL