kafka

package
v1.12.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 1, 2024 License: GPL-3.0 Imports: 23 Imported by: 0

Documentation

Index

Constants

View Source
const (
	EventBlockGroup = "blockgroup"
	EventTraceGroup = "tracegroup"
)
View Source
const (
	DefaultReplicas             = 1
	DefaultPartitions           = 1
	DefaultTopicEnvironmentName = "local"
	DefaultTopicResourceName    = "en-0"
	DefaultMaxMessageBytes      = 1000000
	DefaultRequiredAcks         = 1
	DefaultSegmentSizeBytes     = 1000000 // 1 MB
	DefaultMaxMessageNumber     = 100     // max number of messages in buffer
	DefaultKafkaMessageVersion  = MsgVersion1_0
	DefaultProducerIdPrefix     = "producer-"
	DefaultExpirationTime       = time.Duration(0)
)
View Source
const (
	// item indices of message header
	MsgHeaderTotalSegments = iota
	MsgHeaderSegmentIdx
	MsgHeaderVersion
	MsgHeaderProducerId
	MsgHeaderLength
)
View Source
const (
	KeyTotalSegments = "totalSegments"
	KeySegmentIdx    = "segmentIdx"
	KeyVersion       = "version"
	KeyProducerId    = "producerId"
)
View Source
const LegacyMsgHeaderLength = 2
View Source
const (
	MsgVersion1_0 = "1.0"
)

Variables

View Source
var (
	DefaultSetup   = func(s sarama.ConsumerGroupSession) error { return nil }
	DefaultCleanup = func(s sarama.ConsumerGroupSession) error { return nil }
)
View Source
var Logger sarama.StdLogger = log.New(os.Stdout, "[Chaindatafetcher] ", log.LstdFlags)

Logger is the sarama.StdLogger instance to which chaindatafetcher writes SDK-level information. By default it prints all log messages to standard output, but you can set it to redirect them wherever you want.

Functions

func GetDefaultProducerId added in v1.6.3

func GetDefaultProducerId() string

func NewRepository

func NewRepository(config *KafkaConfig) (*repository, error)

Types

type CheckpointDB

type CheckpointDB struct {
	// contains filtered or unexported fields
}

func NewCheckpointDB

func NewCheckpointDB() *CheckpointDB

func (*CheckpointDB) ReadCheckpoint

func (db *CheckpointDB) ReadCheckpoint() (int64, error)

func (*CheckpointDB) SetComponent

func (db *CheckpointDB) SetComponent(component interface{})

func (*CheckpointDB) WriteCheckpoint

func (db *CheckpointDB) WriteCheckpoint(checkpoint int64) error

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer is a reference structure for subscribing to the block or trace groups produced by an EN.

func NewConsumer

func NewConsumer(config *KafkaConfig, groupId string) (*Consumer, error)

func (*Consumer) AddTopicAndHandler

func (c *Consumer) AddTopicAndHandler(event string, handler TopicHandler) error

AddTopicAndHandler adds a topic associated with the given event, together with its handler function, to consume the messages published to that topic.

func (*Consumer) Cleanup

func (c *Consumer) Cleanup(s sarama.ConsumerGroupSession) error

Cleanup is called at the end of a session, once all ConsumeClaim goroutines have exited but before the offsets are committed for the very last time.

func (*Consumer) Close

func (c *Consumer) Close() error

Close stops the ConsumerGroup and detaches any running sessions. It is required to call this function before the object passes out of scope, as it will otherwise leak memory.

func (*Consumer) ConsumeClaim

ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages(). Once the Messages() channel is closed, the Handler must finish its processing loop and exit.

func (*Consumer) Errors

func (c *Consumer) Errors() <-chan error

func (*Consumer) Setup

Setup is called at the beginning of a new session, before ConsumeClaim.

func (*Consumer) Subscribe

func (c *Consumer) Subscribe(ctx context.Context) error

Subscribe subscribes to the registered topics with their handlers until the consumer is closed.

type ConsumerGroupSession

type ConsumerGroupSession interface {
	MarkOffset(topic string, partition int32, offset int64, metadata string)
	MarkMessage(msg *sarama.ConsumerMessage, metadata string)
}

ConsumerGroupSession is for mocking sarama.ConsumerGroupSession for better testing.

type IKey

type IKey interface {
	Key() string
}

type Kafka

type Kafka struct {
	// contains filtered or unexported fields
}

Kafka connects to the brokers in an existing kafka cluster.

func NewKafka

func NewKafka(conf *KafkaConfig) (*Kafka, error)

func (*Kafka) Close

func (k *Kafka) Close()

func (*Kafka) CreateTopic

func (k *Kafka) CreateTopic(topic string) error

func (*Kafka) DeleteTopic

func (k *Kafka) DeleteTopic(topic string) error

func (*Kafka) ListTopics

func (k *Kafka) ListTopics() (map[string]sarama.TopicDetail, error)

func (*Kafka) Publish

func (k *Kafka) Publish(topic string, data interface{}) error

type KafkaConfig

type KafkaConfig struct {
	SaramaConfig         *sarama.Config `json:"-"` // kafka client configurations.
	MsgVersion           string         // MsgVersion is the version of Kafka message.
	ProducerId           string         // ProducerId is for the identification of the message publisher.
	Brokers              []string       // Brokers is a list of broker URLs.
	TopicEnvironmentName string
	TopicResourceName    string
	Partitions           int32 // Partitions is the number of partitions of a topic.
	Replicas             int16 // Replicas is a replication factor of kafka settings. This is the number of the replicated partitions in the kafka cluster.
	SegmentSizeBytes     int   // SegmentSizeBytes is the size of kafka message segment
	// (number of partitions) * (average size of segments) * buffer size should not be greater than memory size.
	// default max number of messages is 100
	MaxMessageNumber int // MaxMessageNumber is the maximum number of consumer messages.

	ExpirationTime time.Duration
	ErrCallback    func(string) error
	Setup          func(s sarama.ConsumerGroupSession) error
	Cleanup        func(s sarama.ConsumerGroupSession) error
}

func GetDefaultKafkaConfig

func GetDefaultKafkaConfig() *KafkaConfig

func (*KafkaConfig) GetTopicName

func (c *KafkaConfig) GetTopicName(event string) string

func (*KafkaConfig) String

func (c *KafkaConfig) String() string

type Segment

type Segment struct {
	// contains filtered or unexported fields
}

Segment represents a message segment with the parsed headers.

func (*Segment) String

func (s *Segment) String() string

type TopicHandler

type TopicHandler func(message *sarama.ConsumerMessage) error

TopicHandler is a handler function for consuming published messages.

Directories

Path Synopsis
Package mocks is a generated GoMock package.
Package mocks is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL