producer

package
v0.7.0
Warning

This package is not in the latest version of its module.

Published: Jul 19, 2023 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Constants

const (
	MaxMessageKBLimit    = 1024
	DefaultMessageKBSize = 960
)
const (
	MaxMessageSizeLimit = 1024
)
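
The page does not document how these limits are applied. As a rough sketch only, assuming they bound a configured message size expressed in kilobytes, a validation helper might look like the following. The helper effectiveLimitBytes, the constant bytesPerKB, and the error value are hypothetical and not part of this package.

```go
package main

import (
	"errors"
	"fmt"
)

// Values copied from the package constants documented above.
const (
	MaxMessageKBLimit    = 1024
	DefaultMessageKBSize = 960
)

// bytesPerKB is an assumption: the package does not state its KB-to-byte factor.
const bytesPerKB = 1024

// errLimitTooLarge is a hypothetical error, not exported by the package.
var errLimitTooLarge = errors.New("configured message size exceeds MaxMessageKBLimit")

// effectiveLimitBytes is a hypothetical helper. It falls back to the default
// when no size is configured and rejects sizes above the maximum.
func effectiveLimitBytes(configuredKB int) (int, error) {
	if configuredKB <= 0 {
		configuredKB = DefaultMessageKBSize
	}
	if configuredKB > MaxMessageKBLimit {
		return 0, errLimitTooLarge
	}
	return configuredKB * bytesPerKB, nil
}

func main() {
	for _, kb := range []int{0, 512, 2048} {
		limit, err := effectiveLimitBytes(kb)
		fmt.Println(kb, limit, err)
	}
}
```

Keeping the default below the maximum leaves headroom for keys and headers, which is one plausible reason for the 960/1024 split; the source does not confirm this.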

Variables

This section is empty.

Functions

func InvokeCallback

func InvokeCallback(eventSubscriptionMap map[string]map[EventType]EventCallback,
	messageID string, eventType EventType,
)

InvokeCallback invokes the relevant callback, if any, in the given event-subscription map.
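
To illustrate the lookup this signature implies, here is a minimal sketch that re-declares the two types locally so it runs on its own. The function invokeCallbackSketch is hypothetical; the package's actual implementation may differ, for example in how it treats missing entries.

```go
package main

import "fmt"

// Local copies of the documented types so the sketch is self-contained.
type EventType string
type EventCallback func()

const (
	DeliveryAttempt EventType = "attempt"
	DeliverySuccess EventType = "success"
)

// invokeCallbackSketch is a hypothetical stand-in for InvokeCallback.
// It looks up the callbacks registered for messageID and, if one exists
// for eventType, calls it. Unknown IDs and event types are ignored.
func invokeCallbackSketch(subs map[string]map[EventType]EventCallback,
	messageID string, eventType EventType,
) {
	callbacks, found := subs[messageID]
	if !found {
		return
	}
	if cb, ok := callbacks[eventType]; ok && cb != nil {
		cb()
	}
}

func main() {
	calls := 0
	subs := map[string]map[EventType]EventCallback{
		"msg-1": {DeliverySuccess: func() { calls++ }},
	}
	invokeCallbackSketch(subs, "msg-1", DeliverySuccess) // registered: runs
	invokeCallbackSketch(subs, "msg-1", DeliveryAttempt) // no callback: ignored
	invokeCallbackSketch(subs, "unknown", DeliverySuccess)
	fmt.Println("callbacks invoked:", calls)
}
```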

func NewGenericProducer

func NewGenericProducer(transportConfig *transport.TransportConfig) (transport.Producer, error)

Types

type EventCallback

type EventCallback func()

EventCallback is the type for subscription callbacks.

type EventType

type EventType string

EventType is the type of transport events that may occur.

const (
	// DeliveryAttempt event occurs when a transport-delivery operation is attempted (sent to servers).
	DeliveryAttempt EventType = "attempt"
	// DeliverySuccess event occurs when an attempted transport-delivery operation is successful (ack from servers).
	DeliverySuccess EventType = "success"
	// DeliveryFailure event occurs when an attempted transport-delivery operation fails.
	DeliveryFailure EventType = "failure"
)

type GenericProducer

type GenericProducer struct {
	// contains filtered or unexported fields
}

func (*GenericProducer) Send

type KafkaProducer

type KafkaProducer struct {
	// contains filtered or unexported fields
}

KafkaProducer abstracts the generic usage of the hub-of-hubs/pkg/kafka Kafka producer.

func NewKafkaProducer

func NewKafkaProducer(compressor compressor.Compressor, kafkaConfig *transport.KafkaConfig, log logr.Logger,
) (*KafkaProducer, error)

NewKafkaProducer returns a new instance of KafkaProducer.

func (*KafkaProducer) Close

func (p *KafkaProducer) Close()

Close closes the KafkaProducer.

func (*KafkaProducer) Producer

func (p *KafkaProducer) Producer() *kafka.Producer

Producer returns the wrapped Confluent kafka.Producer.

func (*KafkaProducer) Send

func (p *KafkaProducer) Send(ctx context.Context, msg *transport.Message) error

Send implements the transport producer interface for compatibility with CloudEvents.

func (*KafkaProducer) SendAsync

func (p *KafkaProducer) SendAsync(msg *transport.Message)

SendAsync sends a message to the transport asynchronously.

func (*KafkaProducer) Start

func (p *KafkaProducer) Start(ctx context.Context) error

Start starts the Kafka producer.

func (*KafkaProducer) Subscribe

func (p *KafkaProducer) Subscribe(messageID string, callbacks map[EventType]EventCallback)

Subscribe registers callbacks to be invoked when a given event occurs for the message with the given ID.

func (*KafkaProducer) SupportsDeltaBundles

func (p *KafkaProducer) SupportsDeltaBundles() bool

SupportsDeltaBundles returns true, because Kafka supports delta bundles.

type KafkaProducerConfig

type KafkaProducerConfig struct {
	ProducerID    string
	ProducerTopic string
}

type MessageBuilder

type MessageBuilder struct {
	// contains filtered or unexported fields
}

MessageBuilder uses the builder pattern to construct a Kafka message.

func NewMessageBuilder

func NewMessageBuilder(key string, topic *string, partitionID int32, headers []kafka.Header,
	payload []byte,
) *MessageBuilder

NewMessageBuilder creates a new instance of MessageBuilder.

func (*MessageBuilder) Build

func (builder *MessageBuilder) Build() *kafka.Message

Build returns the internal kafka message.

func (*MessageBuilder) Header

func (builder *MessageBuilder) Header(header kafka.Header) *MessageBuilder

Header adds a header to the message headers.
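
The builder pattern described above can be sketched as follows. To keep the example runnable without the Confluent client, the types header, topicPartition, and message are local stand-ins that assume the field layout of kafka.Header, kafka.TopicPartition, and kafka.Message. The lower-case builder is likewise hypothetical and only mirrors the documented signatures.

```go
package main

import "fmt"

// Stand-ins for the Confluent kafka types (assumed field layout).
type header struct {
	Key   string
	Value []byte
}

type topicPartition struct {
	Topic     *string
	Partition int32
}

type message struct {
	TopicPartition topicPartition
	Key            []byte
	Value          []byte
	Headers        []header
}

// messageBuilder is a hypothetical mirror of MessageBuilder.
type messageBuilder struct {
	msg *message
}

// newMessageBuilder takes the same parameters as the documented constructor.
func newMessageBuilder(key string, topic *string, partitionID int32,
	headers []header, payload []byte,
) *messageBuilder {
	return &messageBuilder{msg: &message{
		TopicPartition: topicPartition{Topic: topic, Partition: partitionID},
		Key:            []byte(key),
		Value:          payload,
		Headers:        headers,
	}}
}

// Header appends one header and returns the builder so calls can be chained.
func (b *messageBuilder) Header(h header) *messageBuilder {
	b.msg.Headers = append(b.msg.Headers, h)
	return b
}

// Build returns the message assembled so far.
func (b *messageBuilder) Build() *message {
	return b.msg
}

func main() {
	topic := "status" // hypothetical topic name
	msg := newMessageBuilder("cluster-1", &topic, 0, nil, []byte("payload")).
		Header(header{Key: "size", Value: []byte("7")}).
		Header(header{Key: "offset", Value: []byte("0")}).
		Build()
	fmt.Println(string(msg.Key), *msg.TopicPartition.Topic, len(msg.Headers))
}
```

Returning the builder from Header is what makes the chained calls possible; Build simply hands back the message being assembled.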

type Producer

type Producer interface {
	// SendAsync sends a message to the transport component asynchronously.
	SendAsync(message *transport.Message)
	// Subscribe registers callbacks to be invoked when a given event occurs for the message with the given ID.
	Subscribe(messageID string, callbacks map[EventType]EventCallback)
	// Start starts the transport.
	Start(ctx context.Context) error
	// SupportsDeltaBundles returns true if the transport layer supports delta bundles, otherwise false.
	SupportsDeltaBundles() bool
}
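
For unit tests that should not reach a broker, one option is an in-memory type that satisfies this interface. The sketch below is an assumption-heavy illustration: Message is a hypothetical stand-in for transport.Message (whose fields are not shown on this page), and fakeProducer is not part of the package. It is not safe for concurrent use.

```go
package main

import (
	"context"
	"fmt"
)

type EventType string
type EventCallback func()

const (
	DeliveryAttempt EventType = "attempt"
	DeliverySuccess EventType = "success"
)

// Message is a hypothetical stand-in for transport.Message.
type Message struct {
	ID      string
	Payload []byte
}

// Producer mirrors the documented interface, using the stand-in Message.
type Producer interface {
	SendAsync(message *Message)
	Subscribe(messageID string, callbacks map[EventType]EventCallback)
	Start(ctx context.Context) error
	SupportsDeltaBundles() bool
}

// fakeProducer records messages in memory and reports every send as successful.
type fakeProducer struct {
	sent []*Message
	subs map[string]map[EventType]EventCallback
}

// Compile-time check that fakeProducer satisfies Producer.
var _ Producer = (*fakeProducer)(nil)

func newFakeProducer() *fakeProducer {
	return &fakeProducer{subs: map[string]map[EventType]EventCallback{}}
}

// SendAsync stores the message, then fires the attempt and success callbacks.
func (f *fakeProducer) SendAsync(m *Message) {
	f.sent = append(f.sent, m)
	for _, ev := range []EventType{DeliveryAttempt, DeliverySuccess} {
		if cb := f.subs[m.ID][ev]; cb != nil {
			cb()
		}
	}
}

func (f *fakeProducer) Subscribe(messageID string, callbacks map[EventType]EventCallback) {
	f.subs[messageID] = callbacks
}

// Start has nothing to start; it only reports a cancelled context.
func (f *fakeProducer) Start(ctx context.Context) error { return ctx.Err() }

func (f *fakeProducer) SupportsDeltaBundles() bool { return false }

func main() {
	p := newFakeProducer()
	if err := p.Start(context.Background()); err != nil {
		panic(err)
	}
	var events []EventType
	p.Subscribe("msg-1", map[EventType]EventCallback{
		DeliveryAttempt: func() { events = append(events, DeliveryAttempt) },
		DeliverySuccess: func() { events = append(events, DeliverySuccess) },
	})
	p.SendAsync(&Message{ID: "msg-1", Payload: []byte("hello")})
	fmt.Println(len(p.sent), events)
}
```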
