kafka

package
v1.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 5, 2023 License: Apache-2.0 Imports: 31 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Package-level identity values for this plugin.
// NOTE(review): presumably these are what NewPlugin hands to
// NewPluginVersion(name, version, commitID) — confirm against the
// implementation.
var (
	// Name is the plugin name.
	Name     = "kafka"
	// Version is the plugin version string. "v0.0.0" looks like a placeholder
	// that is overridden elsewhere (e.g. at build time) — TODO confirm.
	Version  = "v0.0.0"
	// CommitID is the source commit identifier; empty by default.
	CommitID = ""
)
View Source
// DefaultReceiverConfig is the baseline ReceiverConfig for this package.
// Fields not listed here (ConsumeByPartitions, TLSEnable) keep their zero
// value, false. All pointer-typed fields are given non-nil values.
// NOTE(review): presumably ReceiverConfig.WithDefaults copies these values
// into unset fields — confirm against the implementation.
var DefaultReceiverConfig = ReceiverConfig{
	Brokers:            "localhost:9092",
	Topic:              "quickstart-events",
	// Connection credentials and TLS material are empty by default.
	GroupId:            "",
	Username:           "",
	Password:           "",
	CACert:             "",
	AccessCert:         "",
	AccessKey:          "",
	Version:            "",
	// Pointer fields are populated through the pointer helpers so that the
	// defaults are explicit values rather than nil.
	CommitInterval:     pointer.Int(1),
	ChannelBufferSize:  pointer.Int(0),
	TracePayloadOnNack: pointer.Bool(false),
}
View Source
// DefaultSenderConfig is the baseline SenderConfig for this package.
// Fields not listed here (TLSEnable, CompressionMethod, CompressionLevel)
// keep their zero value; in particular CompressionLevel stays nil.
// NOTE(review): presumably SenderConfig.WithDefaults copies these values
// into unset fields — confirm against the implementation.
var DefaultSenderConfig = SenderConfig{
	Brokers:             "localhost:9092",
	Topic:               "quickstart-events",
	// NOTE(review): -1 presumably means "no fixed partition" (let the
	// partitioner choose) — confirm against the sender implementation.
	Partition:           pointer.Int(-1),
	ChannelBufferSize:   pointer.Int(0),
	// Connection credentials and TLS material are empty by default.
	Username:            "",
	Password:            "",
	CACert:              "",
	AccessCert:          "",
	AccessKey:           "",
	Version:             "",
	SenderPoolSize:      pointer.Int(1),
	PartitionPath:       "",
	// An empty, non-nil slice: no dynamic metric labels by default.
	DynamicMetricLabels: make([]DynamicMetricLabel, 0),
}

Functions

func NewManualHashPartitioner

func NewManualHashPartitioner(topic string) sarama.Partitioner

func NewPlugin

func NewPlugin() (*pkgplugin.Plugin, error)

func NewPluginVersion

func NewPluginVersion(name string, version string, commitID string) (*pkgplugin.Plugin, error)

func NewReceiver

func NewReceiver(tid tenant.Id, plugin string, name string, config interface{}, secrets secret.Vault, tableSyncer syncer.DeltaSyncer) (receiver.Receiver, error)

func NewSender

func NewSender(tid tenant.Id, plugin string, name string, config interface{}, secrets secret.Vault, tableSyncer syncer.DeltaSyncer) (sender.Sender, error)

Types

type DynamicMetricLabel added in v0.4.0

// DynamicMetricLabel pairs a metric label name with a path. It is the element
// type of SenderConfig.DynamicMetricLabels. Both fields are omitted from JSON
// when empty.
type DynamicMetricLabel struct {
	// Label is the label name.
	Label string `json:"label,omitempty"`
	// Path — presumably a path into the event from which the label's value is
	// read; verify against the sender implementation.
	Path  string `json:"path,omitempty"`
}

type DynamicMetricValue added in v0.4.0

// DynamicMetricValue pairs a metric label name with a concrete string value.
// Both fields are omitted from JSON when empty.
// NOTE(review): presumably the resolved counterpart of DynamicMetricLabel
// (Path evaluated into Value) — confirm against the sender implementation.
type DynamicMetricValue struct {
	// Label is the label name.
	Label string `json:"label,omitempty"`
	// Value is the label's value.
	Value string `json:"value,omitempty"`
}

type ManualHashPartitioner

// ManualHashPartitioner wraps an embedded sarama.Partitioner. The type declares
// its own Partition method, which takes precedence over the Partition method
// promoted from the embedded value; the remaining sarama.Partitioner methods
// are promoted from the embedded value.
type ManualHashPartitioner struct {
	sarama.Partitioner
}

func (*ManualHashPartitioner) Partition

func (mp *ManualHashPartitioner) Partition(message *sarama.ProducerMessage, numPartitions int32) (int32, error)

type Producer

// Producer exposes Close, Partitions and SendMessage; all of its state is
// unexported. Instances are returned by Sender.NewProducer.
type Producer struct {
	// contains filtered or unexported fields
}

func (*Producer) Close

func (p *Producer) Close(ctx context.Context)

func (*Producer) Partitions

func (p *Producer) Partitions(topic string) ([]int32, error)

func (*Producer) SendMessage

func (p *Producer) SendMessage(ctx context.Context, topic string, partition int, headers map[string]string, bs []byte, e event.Event) error

type Receiver

// Receiver is the Kafka receiver type of this package. It defines Setup,
// Cleanup and ConsumeClaim methods taking sarama consumer-group types.
// NOTE(review): that method set matches the shape of
// sarama.ConsumerGroupHandler — confirm that is how it is registered.
//
// Because the struct embeds sync.Mutex, a Receiver must not be copied after
// first use; all of its methods use pointer receivers.
type Receiver struct {
	// Embedded mutex: Lock/Unlock are promoted onto Receiver.
	sync.Mutex

	// Embedded session: its methods are promoted onto Receiver.
	// NOTE(review): presumably set to the active session in Setup — confirm.
	sarama.ConsumerGroupSession
	// contains filtered or unexported fields
}

func (*Receiver) Cleanup

func (r *Receiver) Cleanup(session sarama.ConsumerGroupSession) error

Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited.

func (*Receiver) Close

func (r *Receiver) Close()

func (*Receiver) Config added in v0.2.1

func (r *Receiver) Config() interface{}

func (*Receiver) ConsumeClaim

func (r *Receiver) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().

func (*Receiver) EventErrorCount added in v1.1.2

func (r *Receiver) EventErrorCount() int

func (*Receiver) EventErrorVelocity added in v1.1.2

func (r *Receiver) EventErrorVelocity() int

func (*Receiver) EventSuccessCount added in v1.1.2

func (r *Receiver) EventSuccessCount() int

func (*Receiver) EventSuccessVelocity added in v1.1.2

func (r *Receiver) EventSuccessVelocity() int

func (*Receiver) EventTs added in v1.1.2

func (r *Receiver) EventTs() int64

func (*Receiver) Hash added in v1.1.2

func (r *Receiver) Hash() string

func (*Receiver) LogSuccess added in v1.1.2

func (r *Receiver) LogSuccess()

func (*Receiver) Name added in v0.2.1

func (r *Receiver) Name() string

func (*Receiver) Plugin added in v0.2.1

func (r *Receiver) Plugin() string

func (*Receiver) Receive

func (r *Receiver) Receive(next receiver.NextFn) error

func (*Receiver) Setup

func (r *Receiver) Setup(session sarama.ConsumerGroupSession) error

Setup is run at the beginning of a new session, before ConsumeClaim.

func (*Receiver) Start

func (r *Receiver) Start(handler func(*sarama.ConsumerMessage) bool)

func (*Receiver) StopReceiving

func (r *Receiver) StopReceiving(ctx context.Context) error

func (*Receiver) Tenant added in v0.3.0

func (r *Receiver) Tenant() tenant.Id

func (*Receiver) Trigger

func (r *Receiver) Trigger(e event.Event)

type ReceiverConfig

// ReceiverConfig holds the settings for a Kafka receiver. Every field is
// tagged `omitempty`, so zero values are dropped from the JSON form.
// The pointer-typed fields (CommitInterval, ChannelBufferSize,
// TracePayloadOnNack) may be nil, which marks them as unset; WithDefaults
// fills in unset values. Defaults are listed in DefaultReceiverConfig.
type ReceiverConfig struct {
	// Brokers is the broker address string (default "localhost:9092").
	// NOTE(review): presumably accepts a comma-separated list — confirm.
	Brokers             string `json:"brokers,omitempty"`
	// Topic is the topic name (default "quickstart-events").
	Topic               string `json:"topic,omitempty"`
	// GroupId is the consumer group identifier (default empty).
	GroupId             string `json:"groupId,omitempty"`
	// Username and Password are credentials (default empty).
	Username            string `json:"username,omitempty"` // yaml
	Password            string `json:"password,omitempty"`
	// CACert, AccessCert and AccessKey carry TLS material (default empty).
	// NOTE(review): whether these are PEM contents or file paths is not
	// visible here — confirm.
	CACert              string `json:"caCert,omitempty"`
	AccessCert          string `json:"accessCert,omitempty"`
	AccessKey           string `json:"accessKey,omitempty"`
	// Version — presumably the Kafka protocol version string; verify.
	Version             string `json:"version,omitempty"`
	// CommitInterval defaults to 1; the unit is not visible here — TODO confirm.
	CommitInterval      *int   `json:"commitInterval,omitempty"`
	// ChannelBufferSize defaults to 0.
	ChannelBufferSize   *int   `json:"channelBufferSize,omitempty"`
	// ConsumeByPartitions and TLSEnable are plain bools, so false and "unset"
	// are indistinguishable for them.
	ConsumeByPartitions bool   `json:"consumeByPartitions,omitempty"`
	TLSEnable           bool   `json:"tlsEnable,omitempty"`
	// TracePayloadOnNack defaults to false.
	TracePayloadOnNack  *bool  `json:"tracePayloadOnNack,omitempty"`
}

func (*ReceiverConfig) Validate

func (rc *ReceiverConfig) Validate() error

Validate returns an error upon validation failure

func (*ReceiverConfig) WithDefaults

func (rc *ReceiverConfig) WithDefaults() ReceiverConfig

WithDefaults returns a new config object that has all of the unset (nil) values filled in.

type Sender

// Sender is the Kafka sender type of this package; instances are configured
// through SenderConfig.
//
// Because the struct embeds sync.Mutex, a Sender must not be copied after
// first use; all of its methods use pointer receivers.
type Sender struct {
	// Embedded mutex: Lock/Unlock are promoted onto Sender.
	sync.Mutex
	// contains filtered or unexported fields
}

func (*Sender) Config added in v0.2.1

func (s *Sender) Config() interface{}

func (*Sender) Count

func (s *Sender) Count() int

func (*Sender) EventErrorCount added in v1.1.2

func (s *Sender) EventErrorCount() int

func (*Sender) EventErrorVelocity added in v1.1.2

func (s *Sender) EventErrorVelocity() int

func (*Sender) EventSuccessCount added in v1.1.2

func (s *Sender) EventSuccessCount() int

func (*Sender) EventSuccessVelocity added in v1.1.2

func (s *Sender) EventSuccessVelocity() int

func (*Sender) EventTs added in v1.1.2

func (s *Sender) EventTs() int64

func (*Sender) Hash added in v1.1.2

func (s *Sender) Hash() string

func (*Sender) Name added in v0.2.1

func (s *Sender) Name() string

func (*Sender) NewProducer

func (s *Sender) NewProducer(count int) (*Producer, error)

func (*Sender) NewSyncProducers

func (s *Sender) NewSyncProducers(count int) ([]sarama.SyncProducer, sarama.Client, error)

func (*Sender) Plugin added in v0.2.1

func (s *Sender) Plugin() string

func (*Sender) Send

func (s *Sender) Send(e event.Event)

func (*Sender) StopSending

func (s *Sender) StopSending(ctx context.Context)

func (*Sender) Tenant added in v0.3.0

func (s *Sender) Tenant() tenant.Id

func (*Sender) Unwrap

func (s *Sender) Unwrap() sender.Sender

type SenderConfig

// SenderConfig holds the settings for a Kafka sender. Every field is tagged
// `omitempty`, so zero values are dropped from the JSON form. The
// pointer-typed fields (Partition, ChannelBufferSize, SenderPoolSize,
// CompressionLevel) may be nil. Defaults are listed in DefaultSenderConfig.
type SenderConfig struct {
	// Brokers is the broker address string (default "localhost:9092").
	// NOTE(review): presumably accepts a comma-separated list — confirm.
	Brokers             string               `json:"brokers,omitempty"`
	// Topic is the topic name (default "quickstart-events").
	Topic               string               `json:"topic,omitempty"`
	// Partition is the hard-coded partition id (default -1).
	Partition           *int                 `json:"partition,omitempty"`
	PartitionPath       string               `json:"partitionPath,omitempty"` // if path is set, look up partition from event rather than using the hard coded partition id
	// Username and Password are credentials (default empty).
	Username            string               `json:"username,omitempty"`
	Password            string               `json:"password,omitempty"`
	// CACert, AccessCert and AccessKey carry TLS material (default empty).
	// NOTE(review): whether these are PEM contents or file paths is not
	// visible here — confirm.
	CACert              string               `json:"caCert,omitempty"`
	AccessCert          string               `json:"accessCert,omitempty"`
	AccessKey           string               `json:"accessKey,omitempty"`
	// Version — presumably the Kafka protocol version string; verify.
	Version             string               `json:"version,omitempty"`
	// ChannelBufferSize defaults to 0.
	ChannelBufferSize   *int                 `json:"channelBufferSize,omitempty"`
	// TLSEnable is a plain bool, so false and "unset" are indistinguishable.
	TLSEnable           bool                 `json:"tlsEnable,omitempty"`
	// SenderPoolSize defaults to 1.
	SenderPoolSize      *int                 `json:"senderPoolSize,omitempty"`
	// NOTE: the JSON key is the singular "dynamicMetricLabel" although the
	// field name is plural. The key is part of the wire format; do not rename
	// it without a migration.
	DynamicMetricLabels []DynamicMetricLabel `json:"dynamicMetricLabel,omitempty"`
	// CompressionMethod and CompressionLevel have no entry in
	// DefaultSenderConfig. Accepted values are not visible here — TODO confirm.
	CompressionMethod   string               `json:"compressionMethod,omitempty"`
	CompressionLevel    *int                 `json:"compressionLevel,omitempty"`
}

SenderConfig can be passed into NewSender() in order to configure the behavior of the sender.

func (*SenderConfig) Validate

func (sc *SenderConfig) Validate() error

Validate returns an error upon validation failure.

func (SenderConfig) WithDefaults

func (sc SenderConfig) WithDefaults() SenderConfig

WithDefaults returns a SenderConfig with default values applied.

type SenderMetrics added in v0.4.0

// SenderMetrics has only unexported state and no exported methods in this
// listing.
// NOTE(review): presumably groups the sender's metric instruments — confirm.
type SenderMetrics struct {
	// contains filtered or unexported fields
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL