message

package
v0.1.12
Published: Mar 23, 2023 | License: Apache-2.0 | Imports: 13 | Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func InitKafkaSender

func InitKafkaSender(config config.InternalDataConfig)

InitKafkaSender constructs the singleton Sender instance, using Kafka as the transport.

func ShutdownKafkaSender

func ShutdownKafkaSender()

ShutdownKafkaSender stops the Kafka producer and unsets the singleton.

Types

type KafkaMessageReceiver

type KafkaMessageReceiver struct {
	// contains filtered or unexported fields
}

KafkaMessageReceiver consumes messages from the configured 'messagetopic'.

func (*KafkaMessageReceiver) Initialized

func (r *KafkaMessageReceiver) Initialized() bool

Initialized reports whether the receiver has completed its initial read of all pending (non-acknowledged) messages from the transport.

func (*KafkaMessageReceiver) SetNotificationFunc

func (r *KafkaMessageReceiver) SetNotificationFunc(notifier NotificationFunc)

SetNotificationFunc assigns a function to be called for each message that is delivered on the topic.

func (*KafkaMessageReceiver) Shutdown

func (r *KafkaMessageReceiver) Shutdown()

Shutdown stops the message consumer.

func (*KafkaMessageReceiver) Start

func (r *KafkaMessageReceiver) Start()

Start instructs the receiver to begin accepting messages.

type KafkaMessageSender

type KafkaMessageSender struct {
	// contains filtered or unexported fields
}

KafkaMessageSender is a message Sender for Kafka.

func (KafkaMessageSender) Ack

func (s KafkaMessageSender) Ack(msg Message) error

Ack acknowledges that the message has been received and fully processed, so that it is not delivered again. Because Kafka message delivery is inherently racy, this cannot be guaranteed: a message may still be redelivered after it has been acknowledged, so all message processing within a node must be idempotent.
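
The idempotency requirement above can be met in several ways. The following is a minimal sketch of one of them, not the package's own mechanism: it assumes that a message Key identifies exactly one unit of work, so a redelivered message can be recognised by its key. The dedupingHandler type and its methods are hypothetical, and Message is redeclared locally so the example compiles on its own.

```go
package main

import (
	"fmt"
	"sync"
)

// Message mirrors the documented struct; it is redeclared here so the
// sketch is self-contained.
type Message struct {
	MessageType string `json:"messagetype"`
	Key         string `json:"key"`
	Payload     []byte `json:"payload"`
}

// dedupingHandler is a hypothetical helper (not part of the package).
// It runs apply at most once per message key.
type dedupingHandler struct {
	mu    sync.Mutex
	seen  map[string]bool
	apply func(Message) error
}

func newDedupingHandler(apply func(Message) error) *dedupingHandler {
	return &dedupingHandler{seen: make(map[string]bool), apply: apply}
}

// Handle reports whether the message was applied. A duplicate key is
// skipped and reported as (false, nil).
func (h *dedupingHandler) Handle(msg Message) (bool, error) {
	h.mu.Lock()
	defer h.mu.Unlock()
	if h.seen[msg.Key] {
		return false, nil
	}
	if err := h.apply(msg); err != nil {
		// The key is not marked as seen, so a redelivery can retry.
		return false, err
	}
	h.seen[msg.Key] = true
	return true, nil
}

func main() {
	applied := 0
	h := newDedupingHandler(func(Message) error { applied++; return nil })

	msg := Message{MessageType: "UpdateRateData", Key: "rates-2023-03"}
	h.Handle(msg)
	h.Handle(msg) // simulated redelivery after an Ack that did not take effect

	fmt.Println("applied:", applied) // applied: 1
}
```

The seen set here lives only in memory; a node that must stay idempotent across restarts would need to persist it or derive idempotency from the operation itself.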

func (KafkaMessageSender) Send

func (s KafkaMessageSender) Send(msg Message) error

Send sends the message on the configured Kafka topic.

func (KafkaMessageSender) Shutdown

func (s KafkaMessageSender) Shutdown()

Shutdown stops the underlying Kafka producer.

type Message

type Message struct {
	MessageType string `json:"messagetype"` // e.g. RequestConsumerRecovery or UpdateRateData or CancelConsumerRecovery or RestartSource (for ES recovery?)
	Key         string `json:"key"`         // key for the Kafka message carrying this message; this can be either a unique business key or a synthetic unique key
	Payload     []byte `json:"payload"`     // the payload (optional) delivered to the target along with the message itself
}

Message is a single message to be delivered to a source or node.
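
The struct tags on Message suggest a JSON encoding. As a hedged illustration, the sketch below shows what Go's standard encoding/json produces for such a struct; whether the package actually puts this exact form on the Kafka topic is an assumption. The encode and decode helpers are hypothetical, and Message is redeclared locally. Note that encoding/json renders a []byte field as a base64 string.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Message mirrors the documented struct, including its JSON tags.
type Message struct {
	MessageType string `json:"messagetype"`
	Key         string `json:"key"`
	Payload     []byte `json:"payload"`
}

// encode is a hypothetical helper that renders a Message as JSON text.
func encode(msg Message) (string, error) {
	b, err := json.Marshal(msg)
	return string(b), err
}

// decode is the hypothetical inverse of encode.
func decode(s string) (Message, error) {
	var m Message
	err := json.Unmarshal([]byte(s), &m)
	return m, err
}

func main() {
	out, err := encode(Message{
		MessageType: "UpdateRateData",
		Key:         "k1",
		Payload:     []byte("hi"),
	})
	if err != nil {
		panic(err)
	}
	// The payload bytes "hi" appear base64-encoded as "aGk=".
	fmt.Println(out) // {"messagetype":"UpdateRateData","key":"k1","payload":"aGk="}

	back, err := decode(out)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(back.Payload)) // hi
}
```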

type NotificationFunc

type NotificationFunc func(msg Message) []error

NotificationFunc is the function type used to deliver a new message to all sources / nodes that accept it.
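
A NotificationFunc returns a slice of errors, which fits a fan-out where each accepting target may fail independently. The following is a sketch under that assumption only; the target type and the fanOut constructor are hypothetical and say nothing about how the package itself builds its notifier.

```go
package main

import (
	"errors"
	"fmt"
)

// Message and NotificationFunc mirror the documented declarations.
type Message struct {
	MessageType string `json:"messagetype"`
	Key         string `json:"key"`
	Payload     []byte `json:"payload"`
}

type NotificationFunc func(msg Message) []error

// errUnavailable is a sample failure used by the demonstration below.
var errUnavailable = errors.New("target unavailable")

// target is a hypothetical source or node that may accept a message.
type target struct {
	accepts func(Message) bool
	handle  func(Message) error
}

// fanOut builds a NotificationFunc that offers each message to every
// target and collects one error per failing target.
func fanOut(targets []target) NotificationFunc {
	return func(msg Message) []error {
		var errs []error
		for _, t := range targets {
			if !t.accepts(msg) {
				continue
			}
			if err := t.handle(msg); err != nil {
				errs = append(errs, err)
			}
		}
		return errs
	}
}

func main() {
	onlyRecovery := func(m Message) bool { return m.MessageType == "RequestConsumerRecovery" }
	notify := fanOut([]target{
		{accepts: onlyRecovery, handle: func(Message) error { return nil }},
		{accepts: onlyRecovery, handle: func(Message) error { return errUnavailable }},
	})

	errs := notify(Message{MessageType: "RequestConsumerRecovery", Key: "c-1"})
	fmt.Println("failures:", len(errs)) // failures: 1
}
```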

type Receiver

type Receiver interface {
	Start()
	Initialized() bool
	SetNotificationFunc(notifier NotificationFunc)
	Shutdown()
}

Receiver receives messages using the configured transport

func NewKafkaReceiver

func NewKafkaReceiver(config *config.InternalDataConfig) (Receiver, error)

NewKafkaReceiver creates a new Receiver instance that uses the Kafka transport.
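
The Receiver methods imply a lifecycle: register a notifier, start, wait until Initialized reports true, and eventually shut down. The sketch below walks through that order against an in-memory stand-in, because the real receiver needs a Kafka broker and a config.InternalDataConfig. Everything here other than the Receiver, NotificationFunc and Message declarations is hypothetical (stubReceiver, waitInitialized, runLifecycle), and the assumption that the notifier should be set before Start is not confirmed by the documentation.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type Message struct {
	MessageType string `json:"messagetype"`
	Key         string `json:"key"`
	Payload     []byte `json:"payload"`
}

type NotificationFunc func(msg Message) []error

type Receiver interface {
	Start()
	Initialized() bool
	SetNotificationFunc(notifier NotificationFunc)
	Shutdown()
}

// stubReceiver is an in-memory stand-in: Start replays a fixed backlog
// in the background and then marks the receiver as initialized.
type stubReceiver struct {
	mu          sync.Mutex
	wg          sync.WaitGroup
	backlog     []Message
	notifier    NotificationFunc
	initialized bool
}

func (r *stubReceiver) SetNotificationFunc(n NotificationFunc) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.notifier = n
}

func (r *stubReceiver) Initialized() bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.initialized
}

func (r *stubReceiver) Start() {
	r.mu.Lock()
	notifier, backlog := r.notifier, r.backlog
	r.mu.Unlock()
	r.wg.Add(1)
	go func() {
		defer r.wg.Done()
		for _, m := range backlog {
			if notifier != nil {
				notifier(m)
			}
		}
		r.mu.Lock()
		r.initialized = true
		r.mu.Unlock()
	}()
}

// Shutdown waits for the replay goroutine, if any, to finish.
func (r *stubReceiver) Shutdown() { r.wg.Wait() }

// waitInitialized polls Initialized until it is true or the timeout expires.
func waitInitialized(r Receiver, timeout time.Duration) bool {
	deadline := time.Now().Add(timeout)
	for !r.Initialized() {
		if time.Now().After(deadline) {
			return false
		}
		time.Sleep(5 * time.Millisecond)
	}
	return true
}

// runLifecycle returns the number of messages seen during the initial
// read, or -1 on timeout. Reading seen without extra locking is safe
// for stubReceiver because its mutex orders the replay before
// Initialized returns true.
func runLifecycle(r Receiver) int {
	seen := 0
	r.SetNotificationFunc(func(Message) []error { seen++; return nil })
	r.Start()
	defer r.Shutdown()
	if !waitInitialized(r, time.Second) {
		return -1
	}
	return seen
}

func main() {
	r := &stubReceiver{backlog: []Message{{Key: "a"}, {Key: "b"}}}
	fmt.Println("messages seen:", runLifecycle(r)) // messages seen: 2
}
```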

type Sender

type Sender interface {
	Send(msg Message) error
	Ack(msg Message) error
	Shutdown()
}

Sender sends messages using the configured transport
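
Because Sender is an interface, code that depends on it can be exercised without a Kafka broker. The following is a sketch of an in-memory fake, assuming it is acceptable to model Send as "remember the latest message per key" and Ack as "forget that key"; how KafkaMessageSender actually implements these calls is not stated here. The memorySender type, errClosed and pendingCount are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

type Message struct {
	MessageType string `json:"messagetype"`
	Key         string `json:"key"`
	Payload     []byte `json:"payload"`
}

// Sender mirrors the documented interface.
type Sender interface {
	Send(msg Message) error
	Ack(msg Message) error
	Shutdown()
}

// errClosed is returned by the fake once Shutdown has been called.
var errClosed = errors.New("sender is shut down")

// memorySender is a hypothetical test double that keeps unacknowledged
// messages in a map keyed by Message.Key.
type memorySender struct {
	mu      sync.Mutex
	pending map[string]Message
	closed  bool
}

// Compile-time check that the fake satisfies the interface.
var _ Sender = (*memorySender)(nil)

func newMemorySender() *memorySender {
	return &memorySender{pending: make(map[string]Message)}
}

func (s *memorySender) Send(msg Message) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.closed {
		return errClosed
	}
	s.pending[msg.Key] = msg // a later message with the same key replaces the earlier one
	return nil
}

func (s *memorySender) Ack(msg Message) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.closed {
		return errClosed
	}
	delete(s.pending, msg.Key)
	return nil
}

func (s *memorySender) Shutdown() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.closed = true
}

// pendingCount reports how many keys are still unacknowledged.
func (s *memorySender) pendingCount() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return len(s.pending)
}

func main() {
	s := newMemorySender()
	a := Message{MessageType: "UpdateRateData", Key: "a"}
	b := Message{MessageType: "RestartSource", Key: "b"}
	s.Send(a)
	s.Send(b)
	s.Ack(a)
	fmt.Println("pending:", s.pendingCount()) // pending: 1

	s.Shutdown()
	fmt.Println(s.Send(a)) // sender is shut down
}
```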

func GetSender

func GetSender() Sender

GetSender returns the singleton sender

func NewKafkaMessageSender

func NewKafkaMessageSender(config *config.InternalDataConfig) Sender

NewKafkaMessageSender creates and configures a Sender that uses a compacted Kafka topic as its transport.
