pulsar

package
v0.0.0-...-7bbab05 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 14, 2024 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Index

Constants

View Source
const CustomURLKey = "__KEY_NAME__"

CustomURLKey place holder name, which will be replaced by kafka key

Variables

This section is empty.

Functions

func GetMessageHasher

func GetMessageHasher() core.Hasher

Types

type CursorHook

type CursorHook struct {
	// contains filtered or unexported fields
}

func GetPulsarHook

func GetPulsarHook(tracker PulsarCursorTracker, enableDebugLog bool) *CursorHook

func (*CursorHook) PostHTTPCall

func (h *CursorHook) PostHTTPCall(msg interface{}, success bool)

PostHTTPCall is invoked - after HttpSink execution. This implementation calls KafkaMessage MarkDone method on the data argument of Post, to mark this message and successfully processed.

func (*CursorHook) Pre

func (h *CursorHook) Pre(p MessageProcessor)

Pre is invoked - before pulsar source pushes message to DMux. This implementation invokes CursorTracker TrackMe method here, to track Message is queued before its execution

func (*CursorHook) PreHTTPCall

func (h *CursorHook) PreHTTPCall(msg interface{})

PreHTTPCall is invoked - before HttpSink execution.

type CursorTracker

type CursorTracker struct {
	// contains filtered or unexported fields
}

func (*CursorTracker) TrackMe

func (t *CursorTracker) TrackMe(msg MessageProcessor)

type Message

type Message struct {
	Msg       *pulsar.ConsumerMessage
	Processed bool
	Sidelined bool
}

Message is a container of message and it's state and implements HTTPMessage

func (*Message) BatchPayload

func (m *Message) BatchPayload(msgs []interface{}, version int) []byte

BatchPayload implements HTTPMsg interface

func (*Message) BatchURL

func (m *Message) BatchURL(msgs []interface{}, endpoint string, version int) string

BatchURL implements HTTPMsg interface

func (*Message) GetDebugPath

func (m *Message) GetDebugPath() string

GetDebugPath implements HTTPMsg interface

func (*Message) GetHeaders

func (m *Message) GetHeaders(conf sink.HTTPSinkConf) map[string]string

GetHeaders implements HTTPMsg interface

func (*Message) GetPayload

func (m *Message) GetPayload() []byte

func (*Message) GetRawMsg

func (m *Message) GetRawMsg() *pulsar.ConsumerMessage

func (*Message) GetURL

func (m *Message) GetURL(endpoint string) string

GetURL implements HTTPMsg interface

func (*Message) IsProcessed

func (m *Message) IsProcessed() bool

func (*Message) MarkDone

func (m *Message) MarkDone()

type MessageHasher

type MessageHasher struct {
}

func (*MessageHasher) ComputeHash

func (m *MessageHasher) ComputeHash(data interface{}) int

type MessageProcessor

type MessageProcessor interface {
	MarkDone()
	GetRawMsg() *pulsar.ConsumerMessage
	IsProcessed() bool
}

MessageProcessor is an interface to update Message

type PulsarConf

type PulsarConf struct {
	SubscriptionName string `json:"name"`
	Url              string `json:"url"`
	Topic            string `json:"topic"`
	ForceRestart     bool   `json:"force_restart"`
	ReadNewest       bool   `json:"read_newest"`
	SeekByTime       int64  `json:"seek_by_time"`
	AuthClientId     string `json:"client_id"`
	AuthClientSecret string `json:"auth_client_secret"`
	AuthIssuerURL    string `json:"auth_issuer_url"`
	AuthAudience     string `json:"auth_audience"`
	SubscriptionType string `json:"subscription_type"`
}

type PulsarCursorTracker

type PulsarCursorTracker interface {
	TrackMe(p MessageProcessor)
}

func GetCursorTracker

func GetCursorTracker(size int, source *PulsarSource) PulsarCursorTracker

type PulsarMessageFactoryImpl

type PulsarMessageFactoryImpl struct {
}

func (*PulsarMessageFactoryImpl) Create

type PulsarSource

type PulsarSource struct {
	// contains filtered or unexported fields
}

func GetPulsarSource

func GetPulsarSource(conf PulsarConf) *PulsarSource

func (*PulsarSource) Generate

func (p *PulsarSource) Generate(out chan<- interface{})

Generate is Source method implementation, which connects to Pulsar and pushes PulsarMessage into the channel

func (*PulsarSource) GetKey

func (p *PulsarSource) GetKey(msg interface{}) []byte

func (*PulsarSource) GetOffset

func (p *PulsarSource) GetOffset(msg interface{}) int64

func (*PulsarSource) GetPartition

func (p *PulsarSource) GetPartition(msg interface{}) int32

func (*PulsarSource) GetValue

func (p *PulsarSource) GetValue(msg interface{}) []byte

func (*PulsarSource) RegisterHook

func (p *PulsarSource) RegisterHook(hook SourceHook)

func (*PulsarSource) Stop

func (p *PulsarSource) Stop()

Stop method implements Source interface stop method, to Stop the KafkaConsumer

type SourceHook

type SourceHook interface {
	Pre(p MessageProcessor)
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL