transport

package
v1.1.1 Latest
Warning

This package is not in the latest version of its module.

Published: Feb 14, 2024 License: MIT Imports: 6 Imported by: 5

Documentation

Overview

Package transport provides a Kafka transport abstraction.

Constants

This section is empty.

Variables

This section is empty.

Functions

func EncodeJSONRequest added in v0.2.0

func EncodeJSONRequest(_ context.Context, msg *kafka.Message, request interface{}) error

EncodeJSONRequest is an EncodeRequestFunc that serializes the request as a JSON object to the Message value. Many services can use it as a sensible default.

Types

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer wraps an endpoint and implements kafka.Handler.

func NewConsumer

func NewConsumer(
	e endpoint.Endpoint,
	dec DecodeRequestFunc,
	opts ...ConsumerOption,
) *Consumer

NewConsumer constructs a new consumer, which implements kafka.Handler and wraps the provided endpoint.

func (Consumer) Handle

func (c Consumer) Handle(ctx context.Context, msg *kafka.Message) (err error)

Handle implements kafka.Handler.

type ConsumerFinalizerFunc

type ConsumerFinalizerFunc func(ctx context.Context, msg *kafka.Message, err error)

ConsumerFinalizerFunc can be used to perform work at the end of message processing, after the response has been constructed. The principal intended use is for request logging.

type ConsumerOption

type ConsumerOption func(consumer *Consumer)

ConsumerOption sets an optional parameter for a Consumer.

func ConsumerAfter

func ConsumerAfter(after ...ConsumerResponseFunc) ConsumerOption

ConsumerAfter functions are executed on the consumer reply after the endpoint is invoked, but before anything is published to the reply.

func ConsumerBefore

func ConsumerBefore(before ...RequestFunc) ConsumerOption

ConsumerBefore functions are executed on the consumer message object before the request is decoded.

func ConsumerErrorHandler

func ConsumerErrorHandler(errorHandler transport.ErrorHandler) ConsumerOption

ConsumerErrorHandler is used to handle non-terminal errors. By default, non-terminal errors are ignored. This is intended as a diagnostic measure.

func ConsumerFinalizer added in v0.2.0

func ConsumerFinalizer(f ...ConsumerFinalizerFunc) ConsumerOption

ConsumerFinalizer registers one or more ConsumerFinalizerFuncs to be executed after each message is processed. By default, no finalizer is registered.

type ConsumerResponseFunc

type ConsumerResponseFunc func(ctx context.Context, response interface{}) context.Context

ConsumerResponseFunc may take information from a request context and use it to manipulate a Producer. ConsumerResponseFuncs are only executed in consumers, after invoking the endpoint but prior to publishing a reply.

type DecodeRequestFunc

type DecodeRequestFunc func(ctx context.Context, msg *kafka.Message) (request interface{}, err error)

DecodeRequestFunc extracts a user-domain request object from a Kafka message. It is designed to be used in Kafka Consumers.

type EncodeRequestFunc added in v0.2.0

type EncodeRequestFunc func(context.Context, *kafka.Message, interface{}) error

EncodeRequestFunc encodes the passed request object into a Kafka message object. It is designed to be used in Kafka Producers.

type EncodeResponseFunc added in v0.2.0

type EncodeResponseFunc func(context.Context, interface{}) error

EncodeResponseFunc encodes the passed response object into a Kafka message object. It is designed to be used in Kafka Consumers.

type Producer added in v0.2.0

type Producer struct {
	// contains filtered or unexported fields
}

Producer wraps a single Kafka topic for producing messages and implements endpoint.Endpoint.

func NewProducer added in v0.2.0

func NewProducer(
	handler kafka.Handler,
	topic string,
	enc EncodeRequestFunc,
	options ...ProducerOption,
) *Producer

NewProducer constructs a new producer for a single Kafka topic.

func (Producer) Endpoint added in v0.2.0

func (p Producer) Endpoint() endpoint.Endpoint

Endpoint returns a usable endpoint that produces a message when invoked.

type ProducerFinalizerFunc added in v0.2.0

type ProducerFinalizerFunc func(ctx context.Context, err error)

ProducerFinalizerFunc can be used to perform work at the end of producing a Kafka message, after the response is returned. The principal intended use is for error logging.

type ProducerOption added in v0.2.0

type ProducerOption func(producer *Producer)

ProducerOption sets an optional parameter for a Producer.

func ProducerAfter added in v0.2.0

func ProducerAfter(after ...ProducerResponseFunc) ProducerOption

ProducerAfter adds one or more ProducerResponseFuncs, which are applied to the context after successful message producing. This is useful for context-manipulation operations.

func ProducerBefore added in v0.2.0

func ProducerBefore(before ...RequestFunc) ProducerOption

ProducerBefore sets the RequestFuncs that are applied to the outgoing producer request before it's invoked.

func ProducerFinalizer added in v0.2.0

func ProducerFinalizer(f ...ProducerFinalizerFunc) ProducerOption

ProducerFinalizer adds one or more ProducerFinalizerFuncs to be executed at the end of producing a Kafka message. Finalizers are executed in the order in which they were added. By default, no finalizer is registered.

func ProducerResponse added in v0.2.1

func ProducerResponse(response interface{}) ProducerOption

ProducerResponse sets the successful response value for a Producer.

type ProducerResponseFunc added in v0.2.0

type ProducerResponseFunc func(ctx context.Context) context.Context

ProducerResponseFunc may take information from a request context. ProducerResponseFuncs are only executed in producers, after a request has been produced.

type RequestFunc added in v0.2.0

type RequestFunc func(ctx context.Context, msg *kafka.Message) context.Context

RequestFunc may take information from a Kafka message and put it into a request context. In Consumers, RequestFuncs are executed prior to invoking the endpoint.

type Router added in v0.1.4

type Router map[string][]kafka.Handler

Router maps each topic to its []kafka.Handler and implements kafka.Handler by routing every message to the handlers registered for its topic.

func (Router) AddHandler added in v0.1.4

func (r Router) AddHandler(topic string, handler kafka.Handler) Router

AddHandler appends a kafka.Handler for the specified topic.

func (Router) Handle added in v0.1.4

func (r Router) Handle(ctx context.Context, msg *kafka.Message) error

Handle implements kafka.Handler.
