producer

package
v1.0.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 28, 2023 | License: MIT | Imports: 19 | Imported by: 0

Documentation

Index

Constants

View Source
const (
	// ReplicationConfigDir is the fixed path "/proxy/replication".
	// NOTE(review): presumably the directory/key prefix under which the
	// replication configuration is stored — confirm against its users.
	ReplicationConfigDir = "/proxy/replication"
)

Variables

This section is empty.

Functions

func GetFnvHash

func GetFnvHash(key string) int32

func NewBatchReplicator

func NewBatchReplicator(r *replicator, batchSize int) *batchReplicator

func NewReplicator

func NewReplicator(conf *config.ReplicationConfig, kafka *config.KafkaConfig) (*replicator, error)

Types

type MetaMessageType

// MetaMessageType is a byte-sized enumeration used as the Type field of
// ReplicationMetaMessage.
type MetaMessageType byte
// Enumerated MetaMessageType values, assigned sequentially by iota.
const (
	// MetaStart has the value 0.
	// NOTE(review): presumably marks the START meta message that opens a
	// replicated batch — confirm against the replicator implementation.
	MetaStart MetaMessageType = iota
	// MetaStop has the value 1.
	// NOTE(review): presumably marks the STOP meta message that closes a
	// replicated batch — confirm against the replicator implementation.
	MetaStop
)

type ProduceResponse

// ProduceResponse is the result type returned by
// Producer.ProduceMessageWithReplication and
// Producer.ProduceMessageWithoutReplication.
// It is serialized to JSON with the keys "partition" and "offset".
type ProduceResponse struct {
	// Partition is emitted under the JSON key "partition".
	// NOTE(review): presumably the partition the message was written to — confirm.
	Partition int32 `json:"partition"`
	// Offset is emitted under the JSON key "offset".
	// NOTE(review): presumably the offset assigned to the message within
	// that partition — confirm.
	Offset    int64 `json:"offset"`
}

type Producer

// Producer is an opaque handle constructed with NewProducer. All of its state
// is unexported; it is used through its methods (Close, GetNumPartitions,
// SelectPartition, ProduceMessageWithReplication and
// ProduceMessageWithoutReplication).
type Producer struct {
	// contains filtered or unexported fields
}

func NewProducer

func NewProducer(conf *config.Config) (*Producer, error)

func (*Producer) Close

func (p *Producer) Close()

func (*Producer) GetNumPartitions

func (p *Producer) GetNumPartitions(topic string) (int32, error)

func (*Producer) ProduceMessageWithReplication

func (p *Producer) ProduceMessageWithReplication(topic, key, value string, partition int32, partitionMethod string) (*ProduceResponse, error)

func (*Producer) ProduceMessageWithoutReplication

func (p *Producer) ProduceMessageWithoutReplication(topic, key, value string, partition int32) (*ProduceResponse, error)

func (*Producer) SelectPartition

func (p *Producer) SelectPartition(topic, key, partitionMethod string) (partition int32, err error)

type ReplicatedMessage

// ReplicatedMessage is the unit passed to Replicator.Replicate. It groups
// replication bookkeeping (batch, meta message, datacenters, message ID) with
// the payload of the message itself (topic, key, value, partition).
type ReplicatedMessage struct {
	BatchId       string
	// MetaMessage optionally carries batch metadata (see ReplicationMetaMessage).
	// NOTE(review): presumably nil for ordinary data messages — confirm.
	MetaMessage   *ReplicationMetaMessage
	SrcDatacenter string
	DstDatacenter string // Used by airbus
	MsgID         string

	Topic           string
	// Key and Value are []byte rather than string on purpose: a string must be
	// valid UTF-8 when marshaled, whereas []byte can carry arbitrary binary data.
	Key             []byte
	Value           []byte
	Partition       int32
	PartitionMethod string
}

type ReplicationMetaMessage

// ReplicationMetaMessage describes a batch of replicated messages. It is
// attached to a ReplicatedMessage through its MetaMessage field.
type ReplicationMetaMessage struct {
	// CheckSum is the MD5 checksum of all messages between the meta START
	// message and the meta STOP message.
	CheckSum string
	// Total is the batch size.
	Total    int
	// Type is the kind of meta message (MetaStart or MetaStop).
	Type     MetaMessageType
}

type Replicator

// Replicator is the interface through which messages are replicated.
// NOTE(review): the unexported replicator and batchReplicator types returned by
// NewReplicator and NewBatchReplicator presumably implement it — confirm.
type Replicator interface {
	// Setup hands the Producer to the replicator.
	// NOTE(review): presumably must be called before Replicate — confirm.
	Setup(producer *Producer)
	// Close shuts the replicator down; it reports no error.
	Close()
	// Replicate submits msg for replication and returns an error on failure.
	Replicate(msg *ReplicatedMessage) error
	// Allow reports a yes/no decision for the given topic.
	// NOTE(review): presumably whether the topic is configured for
	// replication — confirm against the implementations.
	Allow(topic string) bool
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL