Documentation ¶
Overview ¶
Package producer provides types for producing messages to Kafka. Felice provides a generic way of defining messages that is not tied to the way a message is sent to Kafka. In Felice, a message contains a body and may contain a key and headers. How this information is sent to Kafka is determined by the MessageConverter in use. One MessageConverter might send headers using the Kafka headers feature and encode the body as JSON, whilst another might wrap the headers and body in an Avro message and send everything as the Kafka value. This decouples business logic from the conventions used to format messages, so the format can be changed without changing much code.
Producers require a valid configuration to run properly. The Config type lets you define the client ID and converter, and also customize Sarama's behaviour.
Default configuration:
* Uses the murmur2 partitioner to be compatible with the JVM ecosystem, especially KStreams
* Retries at most 3 attempts
* Waits for all in-sync replicas to acknowledge the message
Example ¶
package main

import (
	"context"

	"github.com/heetch/felice/v2/codec"
	"github.com/heetch/felice/v2/producer"
)

var endpoints []string

func main() {
	config := producer.NewConfig("some-id", producer.MessageConverterV1())
	p, err := producer.New(config, endpoints...)
	if err != nil {
		panic(err)
	}
	defer p.Close()

	err = p.SendMessage(context.Background(), &producer.Message{
		Topic: "some topic",
		Key:   codec.StringEncoder("some key"),
		Body:  "some body",
	})
	if err != nil {
		panic(err)
	}
}
Output:
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func MurmurHasher ¶ added in v2.2.0
func MurmurHasher() hash.Hash32
MurmurHasher creates a murmur2 hasher implementing the hash.Hash32 interface. The implementation is not complete and does not support streaming; it implements the interface only to comply with the sarama.NewCustomHashPartitioner signature. Sarama only calls the Write method once, when writing the keys and values of the message, so streaming support is not necessary.
func NewJVMCompatiblePartitioner ¶ added in v2.2.0
func NewJVMCompatiblePartitioner(topic string) sarama.Partitioner
NewJVMCompatiblePartitioner creates a Sarama partitioner that uses the same hashing algorithm as JVM Kafka clients.
Types ¶
type Config ¶
type Config struct {
	sarama.Config

	// Converter used to translate Felice messages to Sarama ones.
	Converter MessageConverter
}
Config is used to configure the Producer.
func NewConfig ¶
func NewConfig(clientID string, converter MessageConverter) Config
NewConfig creates a config with sane defaults. The clientID parameter is copied directly to sarama.Config.ClientID.
type Message ¶
type Message struct {
	// The Kafka topic this Message applies to.
	Topic string

	// If specified, messages with the same key will be sent to the same Kafka partition.
	Key sarama.Encoder

	// Body of the Kafka message.
	Body interface{}

	// The time at which this Message was produced.
	ProducedAt time.Time

	// Partition where this publication was stored.
	Partition int32

	// Offset where this publication was stored.
	Offset int64

	// Headers of the message.
	Headers map[string]string

	// Unique ID of the message. Defaults to a UUID.
	ID string
}
Message represents a message to be sent via Kafka. Before sending it, the producer will transform this structure into a sarama.ProducerMessage using the registered Converter.
func NewMessage ¶
NewMessage creates a configured message with a generated unique ID.
type MessageConverter ¶
type MessageConverter interface {
ToKafka(context.Context, *Message) (*sarama.ProducerMessage, error)
}
A MessageConverter transforms a Message into a sarama.ProducerMessage. The role of the converter is to decouple the conventions defined by users from the producer. Each converter defines a set of conventions regarding how the message is formatted in Kafka. A converter can add metadata, use an envelope to store all the information in the body, or even use Kafka headers.
func MessageConverterV1 ¶
func MessageConverterV1() MessageConverter
MessageConverterV1 is the first version of the default converter. The headers are sent using Kafka headers and the body is encoded as JSON. Message-Id and Produced-At headers are added automatically, containing respectively the message ID (if not empty) and the current time in UTC.
type Option ¶
type Option func(*Message)
Option is a function type that receives a pointer to a Message and modifies it in place. Options are intended to customize a message before sending it. You can do this either by passing them as parameters to functions that accept options, such as Send, or by calling them directly on a Message.
func Float64Key ¶
Float64Key is an Option that specifies a key for the message as a float64.
func Header ¶
Header is an Option that adds a custom header to the message. You may pass as many Header options as you wish. If multiple Header options are defined for the same key, the value of the last one passed is the value that appears on the Message.
type Producer ¶
type Producer struct {
	sarama.SyncProducer
	// contains filtered or unexported fields
}
Producer sends messages to Kafka. It embeds the sarama.SyncProducer type and shadows the SendMessage method to use the Message type.
func New ¶
New creates a Producer. The Producer is synchronous, meaning it waits for all in-sync replicas to acknowledge the message.
func NewFrom ¶
func NewFrom(producer sarama.SyncProducer, config Config) (*Producer, error)
NewFrom creates a producer using the given SyncProducer. This is useful when you want to create multiple producers with different configurations that share the same underlying connection.
Example ¶
package main

import (
	"context"

	"github.com/Shopify/sarama"
	"github.com/heetch/felice/v2/codec"
	"github.com/heetch/felice/v2/producer"
)

var endpoints []string

type customConverter struct{}

func (customConverter) ToKafka(context.Context, *producer.Message) (*sarama.ProducerMessage, error) {
	return nil, nil
}

func main() {
	config := producer.NewConfig("some-id", producer.MessageConverterV1())
	p1, err := producer.New(config, endpoints...)
	if err != nil {
		panic(err)
	}
	defer p1.Close()

	config = producer.NewConfig("some-id", new(customConverter))
	p2, err := producer.NewFrom(p1.SyncProducer, config)
	if err != nil {
		panic(err)
	}

	err = p2.SendMessage(context.Background(), &producer.Message{
		Topic: "some topic",
		Key:   codec.StringEncoder("some key"),
		Body:  "some body",
	})
	if err != nil {
		panic(err)
	}
}
Output:
func (*Producer) Send ¶
func (p *Producer) Send(ctx context.Context, topic string, body interface{}, opts ...Option) (*Message, error)
Send creates and sends a message to Kafka synchronously. It returns the Message sent to the brokers.
Example ¶
package main

import (
	"context"

	"github.com/heetch/felice/v2/producer"
)

var endpoints []string

func main() {
	config := producer.NewConfig("some-id", producer.MessageConverterV1())
	p, err := producer.New(config, endpoints...)
	if err != nil {
		panic(err)
	}
	defer p.Close()

	_, err = p.Send(context.Background(), "some topic", "some body", producer.StrKey("some key"))
	if err != nil {
		panic(err)
	}
}
Output:
func (*Producer) SendMessage ¶
SendMessage sends the given message to Kafka synchronously.
type SendMessagesError ¶
SendMessagesError describes why one message failed to be sent.
type SendMessagesErrors ¶
type SendMessagesErrors []*SendMessagesError
SendMessagesErrors is the error type returned if SendMessages fails to send to Kafka.
func (SendMessagesErrors) Error ¶
func (e SendMessagesErrors) Error() string