dataplane

package
v0.4.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 8, 2022 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AckIndication

// AckIndication is the ACK of a NATS JetStream message. It names the stream
// and consumer involved and carries the message's sequence numbers.
type AckIndication struct {
	// Stream is the name of the stream. It is serialized under the JSON key
	// "stream"; the validate tag marks it required and constrains it to
	// `alphanum|uuid` (presumably go-playground/validator syntax, i.e. an
	// alphanumeric string or a UUID — confirm).
	Stream string `json:"stream" validate:"required,alphanum|uuid"`
	// Consumer is the name of the consumer. It is serialized under the JSON
	// key "consumer" and carries the same validate constraints as Stream.
	Consumer string `json:"consumer" validate:"required,alphanum|uuid"`
	// SeqNum holds the sequence numbers of the JetStream message, serialized
	// under the JSON key "seq_num".
	// NOTE(review): `dive` is normally used to descend into slices, arrays and
	// maps; confirm it has the intended effect on a struct-typed field.
	SeqNum AckSeqNum `json:"seq_num" validate:"required,dive"`
}

AckIndication is the ACK of a NATS JetStream message; it contains the message's key parameters.

func (AckIndication) String

func (m AckIndication) String() string

String returns the string representation of an AckIndication.

type AckSeqNum

// AckSeqNum holds the sequence numbers of a NATS JetStream message: one
// relative to the stream and one relative to the consumer.
type AckSeqNum struct {
	// Stream is the JetStream message sequence number for this stream,
	// serialized under the JSON key "stream".
	// NOTE(review): `gte=0` is always satisfied by a uint64, and `required`
	// presumably rejects the zero value, so a sequence number of 0 would fail
	// validation — confirm that is intended.
	Stream uint64 `json:"stream" validate:"required,gte=0"`
	// Consumer is the JetStream message sequence number for this consumer,
	// serialized under the JSON key "consumer". The same validate-tag remarks
	// as for Stream apply.
	Consumer uint64 `json:"consumer" validate:"required,gte=0"`
}

AckSeqNum holds the sequence numbers of a NATS JetStream message.

type AlertOnErrorCB

// AlertOnErrorCB is a callback used to expose an internal error to an outer
// context for handling. The error is passed as err; the callback returns
// nothing.
type AlertOnErrorCB func(err error)

AlertOnErrorCB is a callback used to expose an internal error to an outer context for handling.

type ForwardMessageHandlerCB

// ForwardMessageHandlerCB is a callback used to forward a new message to the
// next pipeline stage. It receives the call context and the NATS message, and
// returns an error to report failure.
type ForwardMessageHandlerCB func(ctxt context.Context, msg *nats.Msg) error

ForwardMessageHandlerCB is a callback used to forward new messages to the next pipeline stage.

type JetStreamACKBroadcaster

// JetStreamACKBroadcaster broadcasts JetStream message ACKs through NATS
// subjects.
type JetStreamACKBroadcaster interface {
	// BroadcastACK broadcasts the JetStream message ACK described by ack.
	// ctxt is the call context; the returned error reports failure.
	BroadcastACK(ctxt context.Context, ack AckIndication) error
}

JetStreamACKBroadcaster broadcasts JetStream message ACKs through NATS subjects.

func GetJetStreamACKBroadcaster

func GetJetStreamACKBroadcaster(
	natsClient *core.NatsClient, instance string,
) (JetStreamACKBroadcaster, error)

GetJetStreamACKBroadcaster returns a JetStreamACKBroadcaster.

type JetStreamACKReceiver

// JetStreamACKReceiver processes JetStream message ACKs being broadcast
// through NATS subjects.
type JetStreamACKReceiver interface {
	// SubscribeForACKs starts receiving JetStream message ACKs and hands them
	// to handler. The returned error reports failure.
	// NOTE(review): wg presumably tracks goroutines started by the
	// subscription so the caller can wait for them — confirm against the
	// implementation.
	SubscribeForACKs(wg *sync.WaitGroup, handler JetStreamAckHandler) error
}

JetStreamACKReceiver processes JetStream message ACKs being broadcast through NATS subjects.

type JetStreamAckHandler

// JetStreamAckHandler is the function signature for a callback that processes
// a JetStream ACK. It receives a context and the AckIndication and returns
// nothing.
type JetStreamAckHandler func(context.Context, AckIndication)

JetStreamAckHandler is the function signature for a callback that processes a JetStream ACK.

type JetStreamInflightMsgProcessor

// JetStreamInflightMsgProcessor processes inflight JetStream messages that are
// awaiting ACK.
type JetStreamInflightMsgProcessor interface {
	// RecordInflightMessage records msg as a new inflight JetStream message
	// awaiting ACK. The returned error reports failure.
	// NOTE(review): blocking presumably selects whether the call waits for the
	// request to be accepted or processed — confirm against the implementation.
	RecordInflightMessage(callCtxt context.Context, msg *nats.Msg, blocking bool) error
	// HandlerMsgACK processes the new message ACK described by ack. The
	// returned error reports failure.
	// NOTE(review): blocking presumably has the same meaning as in
	// RecordInflightMessage — confirm.
	HandlerMsgACK(callCtxt context.Context, ack AckIndication, blocking bool) error
}

JetStreamInflightMsgProcessor processes inflight JetStream messages awaiting ACK

type JetStreamPublisher

// JetStreamPublisher publishes new messages into JetStream.
type JetStreamPublisher interface {
	// Publish publishes msg as a new message into JetStream on subject.
	// ctxt is the call context; the returned error reports failure.
	Publish(ctxt context.Context, subject string, msg []byte) error
}

JetStreamPublisher publishes new messages into JetStream

func GetJetStreamPublisher

func GetJetStreamPublisher(
	natsClient *core.NatsClient, instance string,
) (JetStreamPublisher, error)

GetJetStreamPublisher returns a new JetStreamPublisher.

type JetStreamPushSubscriber

// JetStreamPushSubscriber reads directly from JetStream with a push consumer.
type JetStreamPushSubscriber interface {
	// StartReading begins reading data from JetStream. The returned error
	// reports failure.
	//
	// forwardCB is the callback for forwarding messages to the next pipeline
	// stage, and errorCB is the callback for exposing internal errors to the
	// caller.
	// NOTE(review): wg presumably tracks goroutines started by the reader so
	// the caller can wait for them — confirm against the implementation.
	StartReading(
		forwardCB ForwardMessageHandlerCB,
		errorCB AlertOnErrorCB,
		wg *sync.WaitGroup,
	) error
}

JetStreamPushSubscriber reads directly from JetStream using a push consumer.

type MessageDispatcher

// MessageDispatcher processes a consumer subscription request from a client
// and dispatches messages to that client.
type MessageDispatcher interface {
	// Start starts operations. msgOutput is the callback used to forward
	// messages onward, and errorCB is the callback used to expose internal
	// errors to the caller. The returned error reports failure.
	Start(msgOutput ForwardMessageHandlerCB, errorCB AlertOnErrorCB) error
}

MessageDispatcher processes a consumer subscription request from a client and dispatches messages to that client.

func GetPushMessageDispatcher

func GetPushMessageDispatcher(
	ctxt context.Context,
	natsClient *core.NatsClient,
	stream, subject, consumer string,
	deliveryGroup *string,
	maxInflightMsgs int,
	wg *sync.WaitGroup,
) (MessageDispatcher, error)

GetPushMessageDispatcher returns a new push MessageDispatcher.

type MsgToDeliver

// MsgToDeliver is a structure representing a message to send out to a
// subscribing client.
type MsgToDeliver struct {
	// Stream is the name of the stream. It is serialized under the JSON key
	// "stream"; the validate tag marks it required and constrains it to
	// `alphanum|uuid` (presumably go-playground/validator syntax, i.e. an
	// alphanumeric string or a UUID — confirm).
	Stream string `json:"stream" validate:"required,alphanum|uuid"`
	// Subject is the name of the subject / subject filter. It is serialized
	// under the JSON key "subject" and is marked required.
	Subject string `json:"subject" validate:"required"`
	// Consumer is the name of the consumer. It is serialized under the JSON
	// key "consumer" and carries the same validate constraints as Stream.
	Consumer string `json:"consumer" validate:"required,alphanum|uuid"`
	// Sequence holds the sequence numbers for this JetStream message,
	// serialized under the JSON key "sequence".
	// NOTE(review): `dive` is normally used to descend into slices, arrays and
	// maps; confirm it has the intended effect on a struct-typed field.
	Sequence MsgToDeliverSeq `json:"sequence" validate:"required,dive"`
	// Message is the message body. It is serialized under the JSON key
	// "b64_msg"; encoding/json encodes a []byte as a base64 string, which is
	// what the swaggertype/format annotations describe.
	Message []byte `json:"b64_msg" validate:"required" swaggertype:"string" format:"base64" example:"SGVsbG8gV29ybGQK"`
}

MsgToDeliver is a structure representing a message to send out to a subscribing client.

func ConvertJSMessageDeliver

func ConvertJSMessageDeliver(subject string, msg *nats.Msg) (MsgToDeliver, error)

ConvertJSMessageDeliver converts a JetStream message for delivery.

func (MsgToDeliver) String

func (m MsgToDeliver) String() string

String returns the string representation of a MsgToDeliver.

type MsgToDeliverSeq

// MsgToDeliverSeq holds the sequence numbers for a JetStream message: one
// within the stream and one for the consumer.
type MsgToDeliverSeq struct {
	// Stream is the message sequence number within the stream, serialized
	// under the JSON key "stream".
	// NOTE(review): `gte=0` is always satisfied by a uint64, and `required`
	// presumably rejects the zero value, so a sequence number of 0 would fail
	// validation — confirm that is intended.
	Stream uint64 `json:"stream" validate:"required,gte=0"`
	// Consumer is the message sequence number for this consumer, serialized
	// under the JSON key "consumer". The same validate-tag remarks as for
	// Stream apply.
	Consumer uint64 `json:"consumer" validate:"required,gte=0"`
}

MsgToDeliverSeq holds the sequence numbers for a JetStream message.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL