kafka

package module
v0.0.0-...-7fff535 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 14, 2013 License: Apache-2.0 Imports: 12 Imported by: 1

README

Kafka.go - Publisher & Consumer for Kafka in Go

Kafka is a distributed publish-subscribe messaging system: (http://kafka.apache.org)

Go language: (http://golang.org/)

Changes

4/13 - Merged back from the apache repository & outstanding patches from jira applied

Get up and running

Install go (version 1):
For more info see: http://weekly.golang.org/doc/install.html#install

Make sure to set your GOROOT properly (http://golang.org/doc/install.html#environment). Also set your GOPATH appropriately: http://weekly.golang.org/doc/code.html#tmp_13

Build from source: go install kafka
Make the tools (publisher & consumer)
go install consumer
go install publisher
go install offsets
Start zookeeper, Kafka server
For more info on Kafka, see: http://incubator.apache.org/kafka/quickstart.html

Tools

Start a consumer:


   ./tools/consumer/consumer -topic test -consumeforever
  Consuming Messages :
  From: localhost:9092, topic: test, partition: 0
   ---------------------- 

Now the consumer will just poll until a message is received.

Publish a message:


  ./tools/publisher/publisher -topic test -message "Hello World"

The consumer should output the message.

API Usage

Publishing


broker := kafka.NewBrokerPublisher("localhost:9092", "mytesttopic", 0)
broker.Publish(kafka.NewMessage([]byte("testing 1 2 3")))

Publishing Compressed Messages


broker := kafka.NewBrokerPublisher("localhost:9092", "mytesttopic", 0)
broker.Publish(kafka.NewCompressedMessage([]byte("testing 1 2 3")))

Consumer

broker := kafka.NewBrokerConsumer("localhost:9092", "mytesttopic", 0, 0, 1048576)
broker.Consume(func(msg *kafka.Message) { msg.Print() })

Or the consumer can use a channel based approach:


broker := kafka.NewBrokerConsumer("localhost:9092", "mytesttopic", 0, 0, 1048576)
msgChan := make(chan *kafka.Message)
quitChan := make(chan bool)
go broker.ConsumeOnChannel(msgChan, 10, quitChan)

Consuming Offsets

broker := kafka.NewBrokerOffsetConsumer("localhost:9092", "mytesttopic", 0)
offsets, err := broker.GetOffsets(-1, 1)
Contact

jeffreydamick (at) gmail (dot) com

http://twitter.com/jeffreydamick

Big thank you to NeuStar for sponsoring this work.

Documentation

Index

Constants

View Source
const (
	// Compression Support uses '1' - https://cwiki.apache.org/confluence/display/KAFKA/Compression
	MAGIC_DEFAULT = 1
	// magic + compression + chksum
	NO_LEN_HEADER_SIZE = 1 + 1 + 4
)
View Source
const (
	NO_COMPRESSION_ID   = 0
	GZIP_COMPRESSION_ID = 1
)
View Source
const (
	REQUEST_PRODUCE      RequestType = 0
	REQUEST_FETCH                    = 1
	REQUEST_MULTIFETCH               = 2
	REQUEST_MULTIPRODUCE             = 3
	REQUEST_OFFSETS                  = 4
)

Request Types

View Source
const (
	CONNECTION_RETRY_WAIT_IN_SECONDS = 10
)
View Source
const (
	NETWORK = "tcp"
)

Variables

View Source
var DefaultCodecsMap = codecsMap(DefaultCodecs)

Functions

This section is empty.

Types

type Broker

type Broker struct {
	// contains filtered or unexported fields
}

func (*Broker) EncodeConsumeRequest

func (b *Broker) EncodeConsumeRequest(offset uint64, maxSize uint32) []byte

<Request Header><OFFSET: uint64><MAX SIZE: uint32>

func (*Broker) EncodeOffsetRequest

func (b *Broker) EncodeOffsetRequest(time int64, maxNumOffsets uint32) []byte

<Request Header><TIME: uint64><MAX NUMBER of OFFSETS: uint32>

func (*Broker) EncodePublishRequest

func (b *Broker) EncodePublishRequest(messages ...*Message) []byte

<Request Header><MESSAGE SET SIZE: uint32><MESSAGE SETS>

func (*Broker) EncodeRequestHeader

func (b *Broker) EncodeRequestHeader(requestType RequestType) *bytes.Buffer

Request Header: <REQUEST_SIZE: uint32><REQUEST_TYPE: uint16><TOPIC SIZE: uint16><TOPIC: bytes><PARTITION: uint32>

type BrokerConsumer

type BrokerConsumer struct {
	// contains filtered or unexported fields
}

func NewBrokerConsumer

func NewBrokerConsumer(hostname string, topic string, partition int, offset uint64, maxSize uint32) *BrokerConsumer

Create a new broker consumer. hostname - host and optionally port, delimited by ':'; topic - topic to consume; partition - partition to consume from; offset - offset to start consuming from; maxSize - maximum size (in bytes) of the message to consume (this should be at least as big as the biggest message to be published).

func NewBrokerOffsetConsumer

func NewBrokerOffsetConsumer(hostname string, topic string, partition int) *BrokerConsumer

Simplified consumer that defaults the offset and maxSize to 0. hostname - host and optionally port, delimited by ':'; topic - topic to consume; partition - partition to consume from.

func (*BrokerConsumer) AddCodecs

func (consumer *BrokerConsumer) AddCodecs(payloadCodecs []PayloadCodec)

Add custom payload codecs for consumer decoding. payloadCodecs - an array of PayloadCodec implementations.

func (*BrokerConsumer) Consume

func (consumer *BrokerConsumer) Consume(handlerFunc MessageHandlerFunc) (int, error)

func (*BrokerConsumer) ConsumeOnChannel

func (consumer *BrokerConsumer) ConsumeOnChannel(msgChan chan *Message, pollTimeoutMs int64, quit chan bool) (int, error)

func (*BrokerConsumer) ConsumeUntilQuit

func (consumer *BrokerConsumer) ConsumeUntilQuit(pollTimeoutMs int64, quit chan os.Signal, msgHandler func(*Message)) (int64, int64, error)

Keeps consuming forward until quit, outputting errors, but not dying on them.

func (*BrokerConsumer) GetOffsets

func (consumer *BrokerConsumer) GetOffsets(time int64, maxNumOffsets uint32) ([]uint64, error)

Get a list of valid offsets (up to maxNumOffsets) before the given time, where time is in milliseconds (-1 for the latest offset available, -2 for the smallest offset available). The result is a list of offsets, in descending order.

type BrokerPublisher

type BrokerPublisher struct {
	// contains filtered or unexported fields
}

func NewBrokerPublisher

func NewBrokerPublisher(hostname string, topic string, partition int) *BrokerPublisher

func (*BrokerPublisher) BatchPublish

func (b *BrokerPublisher) BatchPublish(messages ...*Message) (int, error)

func (*BrokerPublisher) Publish

func (b *BrokerPublisher) Publish(message *Message) (int, error)

type GzipPayloadCodec

type GzipPayloadCodec struct {
}

func (*GzipPayloadCodec) Decode

func (codec *GzipPayloadCodec) Decode(data []byte) []byte

func (*GzipPayloadCodec) Encode

func (codec *GzipPayloadCodec) Encode(data []byte) []byte

func (*GzipPayloadCodec) Id

func (codec *GzipPayloadCodec) Id() byte

type Message

type Message struct {
	// contains filtered or unexported fields
}

func Decode

func Decode(packet []byte, payloadCodecsMap map[byte]PayloadCodec) (uint32, []Message)

func DecodeWithDefaultCodecs

func DecodeWithDefaultCodecs(packet []byte) (uint32, []Message)

func NewCompressedMessage

func NewCompressedMessage(payload []byte) *Message

Create a Message using the default compression method (gzip)

func NewCompressedMessages

func NewCompressedMessages(messages ...*Message) *Message

func NewMessage

func NewMessage(payload []byte) *Message

Default is to create a message with no compression.

func NewMessageWithCodec

func NewMessageWithCodec(payload []byte, codec PayloadCodec) *Message

func (*Message) Encode

func (m *Message) Encode() []byte

MESSAGE SET: <MESSAGE LENGTH: uint32><MAGIC: 1 byte><COMPRESSION: 1 byte><CHECKSUM: uint32><MESSAGE PAYLOAD: bytes>

func (*Message) Offset

func (m *Message) Offset() uint64

func (*Message) Payload

func (m *Message) Payload() []byte

func (*Message) PayloadString

func (m *Message) PayloadString() string

func (*Message) Print

func (msg *Message) Print()

type MessageHandlerFunc

type MessageHandlerFunc func(msg *Message)

type NoCompressionPayloadCodec

type NoCompressionPayloadCodec struct {
}

func (*NoCompressionPayloadCodec) Decode

func (codec *NoCompressionPayloadCodec) Decode(data []byte) []byte

func (*NoCompressionPayloadCodec) Encode

func (codec *NoCompressionPayloadCodec) Encode(data []byte) []byte

func (*NoCompressionPayloadCodec) Id

func (codec *NoCompressionPayloadCodec) Id() byte

type PayloadCodec

type PayloadCodec interface {

	// the 1 byte id of the codec
	Id() byte

	// encoder interface for compression implementation
	Encode(data []byte) []byte

	// decoder interface for decompression implementation
	Decode(data []byte) []byte
}

type RequestType

type RequestType uint16

type Timing

type Timing struct {
	// contains filtered or unexported fields
}

func StartTiming

func StartTiming(label string) *Timing

func (*Timing) Print

func (t *Timing) Print()

func (*Timing) Stop

func (t *Timing) Stop()

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL