producer

package
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 17, 2019 License: MIT Imports: 7 Imported by: 0

Documentation

Overview

Package producer provides types for producing messages to Kafka. Felice provides a generic way of defining messages that is not tied to the way this message is sent to Kafka. In Felice, a message contains a body and may contain a key and headers. The way this information is sent to Kafka is a function of the MessageConverter used. Some MessageConverter could decide to send headers using the Kafka headers feature and encode the body using JSON, whilst another might want to wrap the headers and body, into an Avro message and send everything as the Kafka value. This allows decoupling of business logic from the convention used to format messages so it is easy to change the format without changing too much code.

Producers require a valid configuration to be able to run properly. The Config type allows to define the client id and converter by also to customize Sarama's behaviour.

Example
package main

import (
	"context"

	"github.com/heetch/felice/codec"
	"github.com/heetch/felice/producer"
)

var endpoints []string

func main() {
	config := producer.NewConfig("some-id", producer.MessageConverterV1())

	p, err := producer.New(config, endpoints...)
	if err != nil {
		panic(err)
	}
	defer p.Close()

	err = p.SendMessage(context.Background(), &producer.Message{
		Topic: "some topic",
		Key:   codec.StringEncoder("some key"),
		Body:  "some body",
	})
	if err != nil {
		panic(err)
	}
}
Output:

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	sarama.Config

	// Converter used to translate Felice messages to Sarama ones.
	Converter MessageConverter
}

Config is used to configure the Producer.

func NewConfig

func NewConfig(clientID string, converter MessageConverter) Config

NewConfig creates a config with sane defaults. Parameter clientID is directly copied in Sarama.Config.ClientID.

type Message

type Message struct {
	// The Kafka topic this Message applies to.
	Topic string

	// If specified, messages with the same key will be sent to the same Kafka partition.
	Key sarama.Encoder

	// Body of the Kafka message.
	Body interface{}

	// The time at which this Message was produced.
	ProducedAt time.Time

	// Partition where this publication was stored.
	Partition int32

	// Offset where this publication was stored.
	Offset int64

	// Headers of the message.
	Headers map[string]string

	// Unique ID of the message. Defaults to an uuid.
	ID string
}

Message represents a message to be sent via Kafka. Before sending it, the producer will transform this structure into a sarama.ProducerMessage using the registered Converter.

func NewMessage

func NewMessage(topic string, body interface{}) *Message

NewMessage creates a configured message with a generated unique ID.

type MessageConverter

type MessageConverter interface {
	ToKafka(context.Context, *Message) (*sarama.ProducerMessage, error)
}

A MessageConverter transforms a Message into a sarama.ProducerMessage. The role of the converter is to decouple the conventions defined by users from the producer. Each converter defines a set of convention regarding how the message is formatted in Kafka. A converter can add metadata, use an enveloppe to store every information in the body or even use Kafka headers.

func MessageConverterV1

func MessageConverterV1() MessageConverter

MessageConverterV1 is the first version of the default converter. The headers are sent using Kafka headers and the body is encoded into JSON. A Message-Id and Produced-At headers are automatically added containing respectively the message ID it not empty and the current time in UTC format.

type Option

type Option func(*Message)

Option is a function type that receives a pointer to a Message and modifies it in place. Options are intended to customize a message before sending it. You can do this either by passing them as parameters to the New function, or by calling them directly against a Message.

func Float64Key

func Float64Key(key float64) Option

Float64Key is an Option that specifies a key for the message as a float.

func Header(k, v string) Option

Header is an Option that adds a custom header to the message. You may pass as many Header options to New as you wish. If multiple Header's are defined for the same key, the value of the last one past to New will be the value that appears on the Message.

func Int64Key

func Int64Key(key int) Option

Int64Key is an Option that specifies a key for the message as an integer.

func Key

func Key(key codec.Encoder) Option

Key is an Option that specifies a key for the message. You should only pass this once to the New function, but if you pass it multiple times, the value set by the final one you pass will be what is set on the Message when it is returned by New.

func StrKey

func StrKey(key string) Option

StrKey is an Option that specifies a key for the message as a string.

type Producer

type Producer struct {
	sarama.SyncProducer
	// contains filtered or unexported fields
}

Producer sends messages to Kafka. It embeds the sarama.SyncProducer type and shadows the SendMessage method to use the Message type.

func New

func New(config Config, addrs ...string) (*Producer, error)

New creates a Producer. This Producer is synchronous, this means that it will wait for all the replicas to acknowledge the message.

func NewFrom

func NewFrom(producer sarama.SyncProducer, config Config) (*Producer, error)

NewFrom creates a producer using the given SyncProducer. Useful when wanting to create multiple producers with different configurations but sharing the same underlying connection.

Example
package main

import (
	"context"

	"github.com/Shopify/sarama"
	"github.com/heetch/felice/codec"
	"github.com/heetch/felice/producer"
)

var endpoints []string

type customConverter struct{}

func (customConverter) ToKafka(context.Context, *producer.Message) (*sarama.ProducerMessage, error) {
	return nil, nil
}

func main() {
	config := producer.NewConfig("some-id", producer.MessageConverterV1())

	p1, err := producer.New(config, endpoints...)
	if err != nil {
		panic(err)
	}
	defer p1.Close()

	config = producer.NewConfig("some-id", new(customConverter))
	p2, err := producer.NewFrom(p1.SyncProducer, config)
	if err != nil {
		panic(err)
	}

	err = p2.SendMessage(context.Background(), &producer.Message{
		Topic: "some topic",
		Key:   codec.StringEncoder("some key"),
		Body:  "some body",
	})
	if err != nil {
		panic(err)
	}
}
Output:

func (*Producer) Send

func (p *Producer) Send(ctx context.Context, topic string, body interface{}, opts ...Option) (*Message, error)

Send creates and sends a message to Kafka synchronously. It returns the message.Message sent to the brokers.

Example
package main

import (
	"context"

	"github.com/heetch/felice/producer"
)

var endpoints []string

func main() {
	config := producer.NewConfig("some-id", producer.MessageConverterV1())

	p, err := producer.New(config, endpoints...)
	if err != nil {
		panic(err)
	}
	defer p.Close()

	_, err = p.Send(context.Background(), "some topic", "some body", producer.StrKey("some key"))
	if err != nil {
		panic(err)
	}
}
Output:

func (*Producer) SendMessage

func (p *Producer) SendMessage(ctx context.Context, msg *Message) error

SendMessage sends the given message to Kafka synchronously.

func (*Producer) SendMessages added in v0.4.0

func (p *Producer) SendMessages(ctx context.Context, msgs []*Message) error

SendMessages sends all the given messages in order. If it fails to send the messages to Kafka, it will return a SendMessagesErrors error describing which messages failed.

type SendMessagesError added in v0.4.0

type SendMessagesError struct {
	Msg *Message
	Err error
}

SendMessagesError describes why one message failed to be sent.

type SendMessagesErrors added in v0.4.0

type SendMessagesErrors []*SendMessagesError

SendMessagesErrors is the error type returned if SendMessages fails to send to Kafka.

func (SendMessagesErrors) Error added in v0.4.0

func (e SendMessagesErrors) Error() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL