metamorphosis

package module
v0.0.0-...-20936e0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 27, 2017 License: MIT Imports: 9 Imported by: 2

README

metamorphosis

metamorphosis is a Go client for easily interacting with Kafka. It works best when used to handle a Kafka setup that's clustered with Zookeeper and whose messages are encoded with Avro.

Usage

import "github.com/FoxComm/metamorphosis"

Construct a new consumer by creating a Consumer that connects to Zookeeper and the Avro schema registry and provide offset reset strategy value ("largest" or "smallest").

For example:

zookeeper := "localhost:2181"
schemaRepo := "http://localhost:8081"
resetOffsetStrategy := "smallest" // or "largest"

consumer, err := metamorphosis.NewConsumer(zookeeper, schemaRepo, resetOffsetStrategy)

To handle messages, define a handler and run against a topic:

handler := func(message AvroMessage) error {
  bytes := message.Bytes()
  fmt.Println(string(bytes))
  return nil
}

consumer.RunTopic("my_topic", handler)

License

MIT

Documentation

Index

Constants

View Source
const (
	OffsetResetSmallest = go_kafka_client.SmallestOffset
	OffsetResetLargest  = go_kafka_client.LargestOffset
)

Variables

This section is empty.

Functions

This section is empty.

Types

type AvroMessage

type AvroMessage interface {
	// Bytes returns a byte array containing the parsed contents of the message.
	Bytes() []byte
}

AvroMessage represents a Kafka message that's been successfully decoded with Avro.

type Consumer

type Consumer interface {
	// SetClientID sets the identifier used to uniquely describe the consumer.
	SetClientID(clientID string)

	// SetGroupID sets the identifier used to uniquely describe the group to
	// which this consumer belongs.
	SetGroupID(groupID string)

	// RunTopic runs a message handler against a topic. The handler
	// gets called each time a new message is received.
	RunTopic(topic string, handler Handler)
}

Consumer represents the interface for consuming data from a Kafka topic.

func NewConsumer

func NewConsumer(zookeeper string, schemaRepo string, offsetStrategy string) (Consumer, error)

type Handler

type Handler func(message AvroMessage) error

Handler defines the function signature for handling the contents of a decoded Avro message.

type Producer

type Producer interface {
	Emit(topic string, value interface{}) error
}

Producer is the interface for emitting data to a Kafka topic.

func NewProducer

func NewProducer(broker, schemaRegistry string) (Producer, error)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL