kafka

package
v1.31.0
Warning

This package is not in the latest version of its module.
Published: Apr 24, 2024 License: MIT Imports: 17 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func AsyncDeliverer

func AsyncDeliverer(ctx context.Context, pub Publisher, msg *kgo.Record) (*kgo.Record, error)

AsyncDeliverer delivers the supplied message and returns a nil response.

When using this deliverer please ensure that the supplied DecodeResponseFunc and PublisherResponseFunc are able to handle nil-type responses.

AsyncDeliverer produces the message with a detached context. The actual produce call runs asynchronously in another goroutine, and by that time the original context might already be canceled, which would cause the producer to fail. The detached context keeps the values attached to the original context, but its deadline and cancellation are reset. To supply your own context to an asynchronous deliverer, use AsyncDelivererCtx instead.
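The detaching behaviour described above can be sketched with the standard library alone. This is a minimal illustration, not the package's implementation: detachedContext, detach, ctxKey and detachExample are hypothetical names introduced here.

```go
package main

import (
	"context"
	"time"
)

// ctxKey is a hypothetical context key type used only in this sketch.
type ctxKey string

// detachedContext keeps the parent's values but never reports a deadline
// or a cancellation.
type detachedContext struct {
	parent context.Context
}

func (detachedContext) Deadline() (time.Time, bool) { return time.Time{}, false }
func (detachedContext) Done() <-chan struct{}       { return nil }
func (detachedContext) Err() error                  { return nil }

// Value delegates to the parent so request-scoped values stay visible.
func (d detachedContext) Value(key interface{}) interface{} {
	return d.parent.Value(key)
}

// detach returns a context that outlives cancellation of ctx.
func detach(ctx context.Context) context.Context {
	return detachedContext{parent: ctx}
}

// detachExample cancels a parent context and reports what the detached
// copy still sees.
func detachExample() (parentCanceled, detachedCanceled bool, requestID interface{}) {
	parent := context.WithValue(context.Background(), ctxKey("request-id"), "abc")
	parent, cancel := context.WithCancel(parent)
	d := detach(parent)
	cancel()
	return parent.Err() != nil, d.Err() != nil, d.Value(ctxKey("request-id"))
}
```

On Go 1.21 and later, context.WithoutCancel from the standard library provides the same semantics without a custom type.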

func AsyncDelivererCtx

func AsyncDelivererCtx(ctx context.Context, pub Publisher, msg *kgo.Record) (*kgo.Record, error)

AsyncDelivererCtx delivers the supplied message and returns a nil response.

When using this deliverer please ensure that the supplied DecodeResponseFunc and PublisherResponseFunc are able to handle nil-type responses.

func DefaultErrorEncoder

func DefaultErrorEncoder(ctx context.Context,
	err error, msg *kgo.Record, h Handler)

DefaultErrorEncoder simply ignores the message.

func EncodeJSONRequest

func EncodeJSONRequest(_ context.Context, msg *kgo.Record, request interface{}) error

EncodeJSONRequest is an EncodeRequestFunc that serializes the request as a JSON object to the Message value. Many services can use it as a sensible default.
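As a rough sketch of what an encoder of this shape does, the snippet below marshals a request to JSON and stores it in the record value. Record is a local stand-in for kgo.Record so the example needs only the standard library; orderCreated, encodeJSON and encodeExample are hypothetical names, and the real EncodeJSONRequest may differ in detail.

```go
package main

import (
	"context"
	"encoding/json"
)

// Record is a stand-in for kgo.Record with only the fields this sketch needs.
type Record struct {
	Topic string
	Value []byte
}

// orderCreated is a hypothetical request type.
type orderCreated struct {
	ID    string `json:"id"`
	Total int    `json:"total"`
}

// encodeJSON has the EncodeRequestFunc shape: it serializes the request
// as JSON into the message value.
func encodeJSON(_ context.Context, msg *Record, request interface{}) error {
	b, err := json.Marshal(request)
	if err != nil {
		return err
	}
	msg.Value = b
	return nil
}

// encodeExample returns the payload produced for a sample request.
func encodeExample() (string, error) {
	msg := &Record{Topic: "orders"}
	err := encodeJSON(context.Background(), msg, orderCreated{ID: "o-1", Total: 42})
	return string(msg.Value), err
}
```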

func SyncDeliverer

func SyncDeliverer(ctx context.Context, pub Publisher, msg *kgo.Record) (*kgo.Record, error)

SyncDeliverer is a deliverer that publishes the specified message and returns the first object. If the context times out while waiting for a reply, an error will be returned.

Types

type DecodeRequestFunc

type DecodeRequestFunc func(ctx context.Context, msg *kgo.Record) (request interface{}, err error)

DecodeRequestFunc extracts a user-domain request object from a Kafka message. It is designed to be used in Kafka Subscribers.

type DecodeResponseFunc

type DecodeResponseFunc func(context.Context, *kgo.Record) (response interface{}, err error)

DecodeResponseFunc extracts a user-domain response object from a Kafka response object. It is designed to be used in Kafka Publishers, for publisher-side endpoints. A straightforward DecodeResponseFunc might JSON-decode the response payload into a concrete response type.
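A JSON-decoding function of this shape might look like the sketch below. It also tolerates a nil record, which matters when an asynchronous deliverer returns a nil response. Record is a local stand-in for kgo.Record, and ack, decodeAck and decodeExample are hypothetical names.

```go
package main

import (
	"context"
	"encoding/json"
)

// Record is a stand-in for kgo.Record with only the field this sketch needs.
type Record struct {
	Value []byte
}

// ack is a hypothetical concrete response type.
type ack struct {
	Status string `json:"status"`
}

// decodeAck has the DecodeResponseFunc shape. A nil record is treated as
// "no response" instead of being dereferenced.
func decodeAck(_ context.Context, msg *Record) (interface{}, error) {
	if msg == nil {
		return nil, nil
	}
	var a ack
	if err := json.Unmarshal(msg.Value, &a); err != nil {
		return nil, err
	}
	return a, nil
}

// decodeExample decodes one populated record and one nil record.
func decodeExample() (status string, nilResponse interface{}, err error) {
	resp, err := decodeAck(context.Background(), &Record{Value: []byte(`{"status":"ok"}`)})
	if err != nil {
		return "", nil, err
	}
	nilResponse, err = decodeAck(context.Background(), nil)
	return resp.(ack).Status, nilResponse, err
}
```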

type Deliverer

type Deliverer func(
	context.Context,
	Publisher,
	*kgo.Record,
) (*kgo.Record, error)

Deliverer is invoked by the Publisher to publish the specified Message, and to retrieve the appropriate response Event object.

type EncodeRequestFunc

type EncodeRequestFunc func(context.Context, *kgo.Record, interface{}) error

EncodeRequestFunc encodes the passed request object into a Kafka message object. It is designed to be used in Kafka Publishers.

type EncodeResponseFunc

type EncodeResponseFunc func(context.Context, *kgo.Record, interface{}) error

EncodeResponseFunc encodes the passed response object into a Kafka message object. It is designed to be used in Kafka Subscribers.

type ErrorEncoder

type ErrorEncoder func(ctx context.Context,
	err error, msg *kgo.Record, h Handler)

ErrorEncoder is responsible for encoding an error to the subscriber reply. Users are encouraged to use custom ErrorEncoders to encode errors to their replies, and will likely want to pass and check for their own error types.
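One way a custom encoder could check for its own error types is sketched below. Everything here is illustrative: Record and Handler are reduced local stand-ins for kgo.Record and this package's Handler, and validationError, errorReply, the ".errors" topic suffix and captureHandler are hypothetical.

```go
package main

import (
	"context"
	"encoding/json"
	"errors"
)

// Record is a stand-in for kgo.Record with only the fields this sketch needs.
type Record struct {
	Topic string
	Value []byte
}

// Handler is a reduced stand-in for the package's Handler interface.
type Handler interface {
	Produce(ctx context.Context, rec *Record, fn func(*Record, error))
}

// validationError is a hypothetical domain error.
type validationError struct{ Field string }

func (e validationError) Error() string { return "invalid field: " + e.Field }

// errorReply is a hypothetical reply payload.
type errorReply struct {
	Code    string `json:"code"`
	Message string `json:"message"`
}

// jsonErrorEncoder has the ErrorEncoder shape. It maps the error to a code
// and produces a JSON reply to a hypothetical "<topic>.errors" topic.
func jsonErrorEncoder(ctx context.Context, err error, msg *Record, h Handler) {
	reply := errorReply{Code: "internal", Message: err.Error()}
	var ve validationError
	if errors.As(err, &ve) {
		reply.Code = "validation"
	}
	payload, marshalErr := json.Marshal(reply)
	if marshalErr != nil {
		return // nothing sensible to publish
	}
	h.Produce(ctx, &Record{Topic: msg.Topic + ".errors", Value: payload}, nil)
}

// captureHandler remembers the last produced record for inspection.
type captureHandler struct{ last *Record }

func (c *captureHandler) Produce(_ context.Context, rec *Record, fn func(*Record, error)) {
	c.last = rec
	if fn != nil {
		fn(rec, nil)
	}
}

// errorEncoderExample encodes a validation error and returns what was produced.
func errorEncoderExample() (topic, payload string) {
	h := &captureHandler{}
	jsonErrorEncoder(context.Background(), validationError{Field: "id"}, &Record{Topic: "orders"}, h)
	return h.last.Topic, string(h.last.Value)
}
```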

type Handler

type Handler interface {
	Produce(ctx context.Context, rec *kgo.Record, fn func(record *kgo.Record, err error))
	ProduceSync(ctx context.Context, rs ...*kgo.Record) kgo.ProduceResults
}

Handler is a handler interface to make testing possible. It is highly recommended to use *kafka.Producer as the interface implementation.
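Because Handler is an interface, tests can substitute an in-memory fake. The sketch below assumes local stand-ins (Record, ProduceResult, ProduceResults) that mirror the kgo types of the same names, so it compiles with the standard library only; fakeHandler and fakeHandlerExample are hypothetical names.

```go
package main

import (
	"context"
	"sync"
)

// Record, ProduceResult and ProduceResults are stand-ins for the kgo types.
type Record struct {
	Topic string
	Value []byte
}

type ProduceResult struct {
	Record *Record
	Err    error
}

type ProduceResults []ProduceResult

// Handler mirrors the interface documented above, using the stand-in types.
type Handler interface {
	Produce(ctx context.Context, rec *Record, fn func(record *Record, err error))
	ProduceSync(ctx context.Context, rs ...*Record) ProduceResults
}

// fakeHandler is an in-memory Handler that records what was produced.
type fakeHandler struct {
	mu       sync.Mutex
	produced []*Record
	err      error // error reported for every record; nil means success
}

func (f *fakeHandler) Produce(_ context.Context, rec *Record, fn func(*Record, error)) {
	f.mu.Lock()
	f.produced = append(f.produced, rec)
	f.mu.Unlock()
	if fn != nil {
		fn(rec, f.err)
	}
}

func (f *fakeHandler) ProduceSync(ctx context.Context, rs ...*Record) ProduceResults {
	results := make(ProduceResults, 0, len(rs))
	for _, r := range rs {
		f.Produce(ctx, r, func(rec *Record, err error) {
			results = append(results, ProduceResult{Record: rec, Err: err})
		})
	}
	return results
}

// Compile-time check that the fake satisfies the interface.
var _ Handler = (*fakeHandler)(nil)

// fakeHandlerExample produces two records and reports what the fake saw.
func fakeHandlerExample() (results int, topics []string) {
	h := &fakeHandler{}
	res := h.ProduceSync(context.Background(), &Record{Topic: "a"}, &Record{Topic: "b"})
	for _, r := range h.produced {
		topics = append(topics, r.Topic)
	}
	return len(res), topics
}
```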

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher wraps a single Kafka topic for message publishing and implements endpoint.Endpoint.

func NewPublisher

func NewPublisher(
	handler Handler,
	topic string,
	enc EncodeRequestFunc,
	dec DecodeResponseFunc,
	options ...PublisherOption,
) *Publisher

NewPublisher constructs a new publisher for a single Kafka topic, which implements endpoint.Endpoint.

func (Publisher) Endpoint

func (p Publisher) Endpoint() endpoint.Endpoint

Endpoint returns a usable endpoint that invokes message publishing.

type PublisherOption

type PublisherOption func(publisher *Publisher)

PublisherOption sets an optional parameter for publishers.
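PublisherOption follows the functional options pattern. The sketch below shows the pattern in isolation. The publisher struct, its fields, the ten-second default and the withTimeout helper are hypothetical stand-ins, since the real Publisher fields are unexported and not documented here.

```go
package main

import "time"

// publisher is a stand-in for the package's Publisher; the field names
// and the default timeout are assumptions made for this sketch.
type publisher struct {
	topic   string
	timeout time.Duration
}

// option has the same shape as PublisherOption.
type option func(*publisher)

// withTimeout plays the role of PublisherTimeout.
func withTimeout(d time.Duration) option {
	return func(p *publisher) { p.timeout = d }
}

// newPublisher applies the options in order on top of the defaults.
func newPublisher(topic string, opts ...option) *publisher {
	p := &publisher{topic: topic, timeout: 10 * time.Second}
	for _, opt := range opts {
		opt(p)
	}
	return p
}

// optionsExample returns the timeout in seconds without and with an option.
func optionsExample() (defaultSeconds, customSeconds float64) {
	def := newPublisher("orders")
	custom := newPublisher("orders", withTimeout(2*time.Second))
	return def.timeout.Seconds(), custom.timeout.Seconds()
}
```

Options are applied in the order given, so a later option overrides an earlier one that sets the same field.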

func PublisherAfter

func PublisherAfter(after ...PublisherResponseFunc) PublisherOption

PublisherAfter adds one or more PublisherResponseFuncs, which are applied to the context after successful message publishing. This is useful for context-manipulation operations.

func PublisherBefore

func PublisherBefore(before ...RequestFunc) PublisherOption

PublisherBefore sets the RequestFuncs that are applied to the outgoing publisher request before it's invoked.

func PublisherDeliverer

func PublisherDeliverer(deliverer Deliverer) PublisherOption

PublisherDeliverer sets the deliverer function that the Publisher invokes.

func PublisherTimeout

func PublisherTimeout(timeout time.Duration) PublisherOption

PublisherTimeout sets the available timeout for a Kafka request.

type PublisherResponseFunc

type PublisherResponseFunc func(ctx context.Context, msg *kgo.Record) context.Context

PublisherResponseFunc may take information from a request context. PublisherResponseFuncs are only executed in producers, after a request has been produced.

type RequestFunc

type RequestFunc func(ctx context.Context, msg *kgo.Record) context.Context

RequestFunc may take information from a Kafka message and put it into a request context. In Subscribers, RequestFuncs are executed prior to invoking the endpoint.
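For illustration, a function of this shape could copy a record header into the context. Record and RecordHeader below are local stand-ins for the kgo types of the same names, and the "x-tenant" header, tenantKey, tenantFromHeader and requestFuncExample are hypothetical.

```go
package main

import "context"

// RecordHeader and Record are stand-ins for the kgo types.
type RecordHeader struct {
	Key   string
	Value []byte
}

type Record struct {
	Headers []RecordHeader
}

// tenantKey is a hypothetical, unexported context key.
type tenantKey struct{}

// tenantFromHeader has the RequestFunc shape: it reads the hypothetical
// "x-tenant" header and stores its value in the returned context.
func tenantFromHeader(ctx context.Context, msg *Record) context.Context {
	for _, h := range msg.Headers {
		if h.Key == "x-tenant" {
			return context.WithValue(ctx, tenantKey{}, string(h.Value))
		}
	}
	return ctx
}

// requestFuncExample runs the function on a sample record and reads the
// value back from the context.
func requestFuncExample() (tenant string, found bool) {
	msg := &Record{Headers: []RecordHeader{{Key: "x-tenant", Value: []byte("acme")}}}
	ctx := tenantFromHeader(context.Background(), msg)
	tenant, found = ctx.Value(tenantKey{}).(string)
	return tenant, found
}
```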

func SetLogger

func SetLogger(l sdklogger.Logger) RequestFunc

SetLogger returns a RequestFunc that sets the SDK Logger in the request context. It also tries to set up context values as logger fields.

func SetMetrics

func SetMetrics(m sdkmetrics.Metrics) RequestFunc

SetMetrics returns a RequestFunc that sets the Metrics client in the request context.

func SetRequestID

func SetRequestID() RequestFunc

SetRequestID returns a RequestFunc that sets the RequestID in the request context if it was not previously set.

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

Subscriber wraps an endpoint and provides a handler for Kafka messages.

func NewInstrumentedSubscriber

func NewInstrumentedSubscriber(e endpoint.Endpoint, dec DecodeRequestFunc, opts ...SubscriberOption) *Subscriber

NewInstrumentedSubscriber constructs a new subscriber, which provides a handler for Kafka messages. It also instruments the subscriber with Datadog tracing.

func NewSubscriber

func NewSubscriber(
	e endpoint.Endpoint,
	dec DecodeRequestFunc,
	opts ...SubscriberOption,
) *Subscriber

NewSubscriber constructs a new subscriber, which provides a handler for Kafka messages.

func (Subscriber) ServeMsg

func (s Subscriber) ServeMsg(h Handler) sdkkafka.MsgHandler

ServeMsg returns a kafka.MsgHandler.

type SubscriberFinalizerFunc

type SubscriberFinalizerFunc func(ctx context.Context, msg *kgo.Record)

SubscriberFinalizerFunc can be used to perform work at the end of message processing, after the response has been constructed. The principal intended use is for request logging.

type SubscriberOption

type SubscriberOption func(consumer *Subscriber)

SubscriberOption sets an optional parameter for subscribers.

func SubscriberAfter

func SubscriberAfter(after ...SubscriberResponseFunc) SubscriberOption

SubscriberAfter functions are executed on the subscriber reply after the endpoint is invoked, but before anything is published to the reply.

func SubscriberBefore

func SubscriberBefore(before ...RequestFunc) SubscriberOption

SubscriberBefore functions are executed on the subscriber message object before the request is decoded.

func SubscriberErrorEncoder

func SubscriberErrorEncoder(ee ErrorEncoder) SubscriberOption

SubscriberErrorEncoder is used to encode errors to the subscriber reply whenever they're encountered in the processing of a request. Clients can use this to provide custom error formatting. By default, errors will be published with the DefaultErrorEncoder.

func SubscriberErrorHandler

func SubscriberErrorHandler(errorHandler transport.ErrorHandler) SubscriberOption

SubscriberErrorHandler is used to handle non-terminal errors. By default, non-terminal errors are ignored. This is intended as a diagnostic measure.

func SubscriberFinalizer

func SubscriberFinalizer(f ...SubscriberFinalizerFunc) SubscriberOption

SubscriberFinalizer is executed at the end of processing every message. By default, no finalizer is registered.

type SubscriberResponseFunc

type SubscriberResponseFunc func(ctx context.Context, response interface{}) context.Context

SubscriberResponseFunc may take information from a request context and use it to manipulate a Publisher. SubscriberResponseFuncs are only executed in consumers, after invoking the endpoint but prior to publishing a reply.
