connection

package
v0.0.0-...-7bbab05 Latest
Warning

This package is not in the latest version of its module.

Published: Mar 14, 2024 License: Apache-2.0 Imports: 13 Imported by: 1

Documentation

Constants

const CustomURLKey = "__KEY_NAME__"

CustomURLKey is the placeholder name that will be replaced by the Kafka key.
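
The substitution described above can be sketched as follows. This is a minimal illustration, not the package's own code: `expandURL` is a hypothetical helper, the endpoint is made up, and the sketch assumes a plain string replacement with no URL escaping of the key.

```go
package main

import (
	"fmt"
	"strings"
)

// CustomURLKey mirrors the constant documented above.
const CustomURLKey = "__KEY_NAME__"

// expandURL is a hypothetical helper (not part of the package). It swaps
// every occurrence of the placeholder in an endpoint template for the
// Kafka message key. The key is inserted as-is, without escaping.
func expandURL(endpoint string, key []byte) string {
	return strings.ReplaceAll(endpoint, CustomURLKey, string(key))
}

func main() {
	// The endpoint below is an illustrative value only.
	fmt.Println(expandURL("http://localhost:8080/events/"+CustomURLKey, []byte("order-42")))
	// prints "http://localhost:8080/events/order-42"
}
```

An endpoint without the placeholder passes through unchanged, so the same helper works for both keyed and fixed URLs.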

Variables

This section is empty.

Functions

func GetKafkaMsgHasher

func GetKafkaMsgHasher() core.Hasher

GetKafkaMsgHasher is a global function that returns an instance of KafkaMsgHasher.

Types

type KafkaFoxtrotConn

type KafkaFoxtrotConn struct {
	EnableDebugLog bool
	Conf           interface{}
}

KafkaFoxtrotConn is a struct that abstracts this connection's Run.

func (*KafkaFoxtrotConn) Run

func (c *KafkaFoxtrotConn) Run()

Run starts this connection from source to sink.

type KafkaFoxtrotConnConfig

type KafkaFoxtrotConnConfig struct {
	KafkaHTTPConnConfig
}

KafkaFoxtrotConnConfig holds the config needed to connect KafkaSource to http_sink.

type KafkaFoxtrotMessage

type KafkaFoxtrotMessage struct {
	KafkaMessage
}

KafkaFoxtrotMessage is the data attribute that is passed from Source to Sink.

func (*KafkaFoxtrotMessage) BatchPayload

func (k *KafkaFoxtrotMessage) BatchPayload(msgs []interface{}, version int) []byte

BatchPayload implements HTTPMsg for HttpSink processing

func (*KafkaFoxtrotMessage) BatchURL

func (k *KafkaFoxtrotMessage) BatchURL(msgs []interface{}, endpoint string, version int) string

BatchURL implements HTTPMsg for HttpSink processing. This implementation passes the partition and offset as query parameters for debugging.

func (*KafkaFoxtrotMessage) GetDebugPath

func (k *KafkaFoxtrotMessage) GetDebugPath() string

func (*KafkaFoxtrotMessage) GetHeaders

func (k *KafkaFoxtrotMessage) GetHeaders(conf sink.HTTPSinkConf) map[string]string

GetHeaders implements HTTPMsg for HttpSink processing

func (*KafkaFoxtrotMessage) GetPayload

func (k *KafkaFoxtrotMessage) GetPayload() []byte

GetPayload implements HTTPMsg for HttpSink processing

func (*KafkaFoxtrotMessage) GetURL

func (k *KafkaFoxtrotMessage) GetURL(endpoint string) string

GetURL implements HTTPMsg for HttpSink processing. This implementation passes the partition and offset as query parameters for debugging.
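
As a rough illustration of attaching partition and offset to a URL for debugging, the sketch below builds such a URL with the standard library. `debugURL` is a hypothetical helper, and the query parameter names `partition` and `offset` are assumptions; the exact names and format the package uses are not stated here.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
)

// debugURL is a hypothetical helper (not part of the package). It appends
// the Kafka partition and offset to the endpoint as query parameters.
// The parameter names are assumed for illustration.
func debugURL(endpoint string, partition int32, offset int64) (string, error) {
	u, err := url.Parse(endpoint)
	if err != nil {
		return "", err
	}
	q := u.Query() // keeps any query parameters already on the endpoint
	q.Set("partition", strconv.FormatInt(int64(partition), 10))
	q.Set("offset", strconv.FormatInt(offset, 10))
	u.RawQuery = q.Encode() // Encode sorts parameters by key
	return u.String(), nil
}

func main() {
	s, err := debugURL("http://localhost:8080/ingest", 3, 1234)
	if err != nil {
		fmt.Println("bad endpoint:", err)
		return
	}
	fmt.Println(s) // prints "http://localhost:8080/ingest?offset=1234&partition=3"
}
```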

type KafkaHTTPConn

type KafkaHTTPConn struct {
	EnableDebugLog bool
	Conf           interface{}
	SidelineImpl   interface{}
}

KafkaHTTPConn is a struct that abstracts this connection's Run.

func (*KafkaHTTPConn) Run

func (c *KafkaHTTPConn) Run()

Run starts this connection from source to sink.

type KafkaHTTPConnConfig

type KafkaHTTPConnConfig struct {
	Dmux          core.DmuxConf                 `json:"dmux"`
	Source        source.KafkaConf              `json:"source"`
	Sink          sink.HTTPSinkConf             `json:"sink"`
	PendingAcks   int                           `json:"pending_acks"`
	OffsetMonitor offset_monitor.OffMonitorConf `json:"offset_monitor"`
}

KafkaHTTPConnConfig holds the config needed to connect KafkaSource to http_sink.
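
The struct tags above imply a JSON document with five top-level keys. The sketch below decodes such a document into a local stand-in type. `connConfig` and `parseConfig` are hypothetical names, the nested sections are kept as raw JSON because their fields are not listed on this page, and the sample values are invented.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// connConfig is a stand-in for KafkaHTTPConnConfig that reuses its JSON
// keys. Nested sections stay as raw JSON since their fields are not
// documented here.
type connConfig struct {
	Dmux          json.RawMessage `json:"dmux"`
	Source        json.RawMessage `json:"source"`
	Sink          json.RawMessage `json:"sink"`
	PendingAcks   int             `json:"pending_acks"`
	OffsetMonitor json.RawMessage `json:"offset_monitor"`
}

// parseConfig decodes a JSON config document into connConfig.
func parseConfig(raw []byte) (connConfig, error) {
	var c connConfig
	err := json.Unmarshal(raw, &c)
	return c, err
}

func main() {
	// Illustrative document: empty sections and an invented pending_acks value.
	raw := []byte(`{"dmux": {}, "source": {}, "sink": {}, "pending_acks": 10000, "offset_monitor": {}}`)
	c, err := parseConfig(raw)
	if err != nil {
		fmt.Println("invalid config:", err)
		return
	}
	fmt.Println("pending_acks:", c.PendingAcks)
}
```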

type KafkaMessage

type KafkaMessage struct {
	Msg       *sarama.ConsumerMessage
	Processed bool   // marker set once this message has been processed by the Sink
	Sidelined bool   // marker set if the message gets sidelined
	URL       string // cached so GetURLPath does not repeat the concatenation during logging
}

KafkaMessage is the data attribute that is passed from Source to Sink.

func (*KafkaMessage) BatchPayload

func (k *KafkaMessage) BatchPayload(msgs []interface{}, version int) []byte

BatchPayload implements HTTPMsg for HttpSink processing

func (*KafkaMessage) BatchURL

func (k *KafkaMessage) BatchURL(msgs []interface{}, endpoint string, version int) string

BatchURL implements HTTPMsg for HttpSink processing

func (*KafkaMessage) GetDebugPath

func (k *KafkaMessage) GetDebugPath() string

func (*KafkaMessage) GetHeaders

func (k *KafkaMessage) GetHeaders(conf sink.HTTPSinkConf) map[string]string

GetHeaders implements HTTPMsg for HttpSink processing

func (*KafkaMessage) GetPayload

func (k *KafkaMessage) GetPayload() []byte

GetPayload implements HTTPMsg for HttpSink processing

func (*KafkaMessage) GetRawMsg

func (k *KafkaMessage) GetRawMsg() *sarama.ConsumerMessage

GetRawMsg returns the sarama.ConsumerMessage underlying the KafkaMessage.

func (*KafkaMessage) GetURL

func (k *KafkaMessage) GetURL(endpoint string) string

GetURL implements HTTPMsg for HttpSink processing

func (*KafkaMessage) IsProcessed

func (k *KafkaMessage) IsProcessed() bool

IsProcessed returns true if the KafkaMessage has been marked done.

func (*KafkaMessage) MarkDone

func (k *KafkaMessage) MarkDone()

MarkDone marks the KafkaMessage as processed.

type KafkaMsgHasher

type KafkaMsgHasher struct{}

KafkaMsgHasher implements a hasher for KafkaMessage. It runs consistent hashing on the Key of the KafkaKeyedMessage.

func (*KafkaMsgHasher) ComputeHash

func (o *KafkaMsgHasher) ComputeHash(data interface{}) int

ComputeHash method for KafkaMessage
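
To show the general shape of a key-based hasher, here is a minimal sketch with the same `ComputeHash(data interface{}) int` signature. It is not the package's algorithm: it uses a plain FNV-1a hash of the key from the standard library, and `keyedMsg` and `keyHasher` are hypothetical stand-ins for the real types.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// keyedMsg is a hypothetical stand-in for a Kafka message with a key.
type keyedMsg struct {
	Key []byte
}

// keyHasher is a hypothetical hasher with the documented method shape.
type keyHasher struct{}

// ComputeHash returns a non-negative FNV-1a hash of the message key.
// Values that are not a keyedMsg hash to 0.
func (keyHasher) ComputeHash(data interface{}) int {
	m, ok := data.(keyedMsg)
	if !ok {
		return 0
	}
	h := fnv.New32a()
	h.Write(m.Key) // Write on a hash.Hash never returns an error
	// Mask the sign bit so the result is non-negative on 32-bit platforms too.
	return int(h.Sum32() & 0x7fffffff)
}

func main() {
	const workers = 4 // illustrative number of downstream channels
	h := keyHasher{}
	for _, k := range []string{"user-1", "user-2", "user-1"} {
		idx := h.ComputeHash(keyedMsg{Key: []byte(k)}) % workers
		fmt.Printf("key %q -> worker %d\n", k, idx)
	}
}
```

Because the hash depends only on the key, messages with the same key land on the same worker, which preserves per-key ordering.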

type KafkaOffsetHook

type KafkaOffsetHook struct {
	// contains filtered or unexported fields
}

KafkaOffsetHook implements the HTTPSinkHook and KafkaSourceHook interfaces to track Kafka offsets.

func GetKafkaHook

func GetKafkaHook(offsetTracker source.OffsetTracker, enableDebugLog bool) *KafkaOffsetHook

GetKafkaHook is a global function that returns an instance of KafkaOffsetHook.

func (*KafkaOffsetHook) PostHTTPCall

func (h *KafkaOffsetHook) PostHTTPCall(msg interface{}, success bool)

PostHTTPCall is invoked after HttpSink execution. This implementation calls the KafkaMessage MarkDone method on the data argument of Post, to mark this message as successfully processed.

func (*KafkaOffsetHook) Pre

func (h *KafkaOffsetHook) Pre(data source.KafkaMsg)

Pre is invoked before KafkaSource pushes a message to DMux. This implementation invokes the OffsetTracker TrackMe method here, to ensure the message to track is queued before its execution.

func (*KafkaOffsetHook) PreHTTPCall

func (h *KafkaOffsetHook) PreHTTPCall(msg interface{})

PreHTTPCall is invoked before HttpSink execution.
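
The hook flow described above (queue a message before dispatch, mark it done after the HTTP call) can be sketched with a small in-memory tracker. Everything here is a hypothetical stand-in: `msg` and `tracker` are not the package's types, the `TrackMe` signature is assumed, and `Committable` is an invented method. The sketch is single-goroutine and ignores locking.

```go
package main

import "fmt"

// msg is a hypothetical stand-in for KafkaMessage.
type msg struct {
	Offset    int64
	processed bool
}

func (m *msg) MarkDone()         { m.processed = true }
func (m *msg) IsProcessed() bool { return m.processed }

// tracker is a hypothetical offset tracker. Messages are queued in
// consumption order, and only the processed prefix can be committed.
type tracker struct {
	queue []*msg
}

// TrackMe queues a message before it is dispatched (the "Pre" step).
func (t *tracker) TrackMe(m *msg) { t.queue = append(t.queue, m) }

// Committable removes the leading run of processed messages and returns
// the offset of the last one. ok is false if the head is still pending.
func (t *tracker) Committable() (offset int64, ok bool) {
	for len(t.queue) > 0 && t.queue[0].IsProcessed() {
		offset, ok = t.queue[0].Offset, true
		t.queue = t.queue[1:]
	}
	return offset, ok
}

func main() {
	t := &tracker{}
	a, b, c := &msg{Offset: 10}, &msg{Offset: 11}, &msg{Offset: 12}
	for _, m := range []*msg{a, b, c} {
		t.TrackMe(m)
	}

	b.MarkDone() // finished out of order; offset 10 is still pending
	_, ok := t.Committable()
	fmt.Println(ok) // prints "false"

	a.MarkDone() // the "Post" step for offset 10
	off, ok := t.Committable()
	fmt.Println(off, ok) // prints "11 true"
}
```

Queuing in the Pre step matters because sinks may finish out of order: an offset is only safe to commit once every earlier message has also been marked done.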

type PulsarConn

type PulsarConn struct {
	EnableDebugLog bool
	Conf           interface{}
}

PulsarConn abstracts connection

func (*PulsarConn) Run

func (c *PulsarConn) Run()

Run starts connection from source to sink

type PulsarConnConfig

type PulsarConnConfig struct {
	Dmux        core.DmuxConf     `json:"dmux"`
	Source      source.PulsarConf `json:"source"`
	Sink        sink.HTTPSinkConf `json:"sink"`
	PendingAcks int               `json:"pending_acks"`
}

PulsarConnConfig holds config to connect pulsar source to http sink
