kafka

package
v0.0.0-...-d6b1de3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 25, 2012 License: Apache-2.0, BSD-3-Clause, Apache-2.0 Imports: 11 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// Compression Support uses '1' - https://cwiki.apache.org/confluence/display/KAFKA/Compression
	MAGIC_DEFAULT = 1
	// magic + compression + chksum
	NO_LEN_HEADER_SIZE = 1 + 1 + 4
)
View Source
const (
	NO_COMPRESSION_ID   = 0
	GZIP_COMPRESSION_ID = 1
)
View Source
const (
	REQUEST_PRODUCE      RequestType = 0
	REQUEST_FETCH                    = 1
	REQUEST_MULTIFETCH               = 2
	REQUEST_MULTIPRODUCE             = 3
	REQUEST_OFFSETS                  = 4
)

Request Types

View Source
const (
	NETWORK = "tcp"
)

Variables

View Source
var DefaultCodecsMap = codecsMap(DefaultCodecs)

Functions

This section is empty.

Types

type Broker

type Broker struct {
	// contains filtered or unexported fields
}

func (*Broker) EncodeConsumeRequest

func (b *Broker) EncodeConsumeRequest(offset uint64, maxSize uint32) []byte

<Request Header><OFFSET: uint64><MAX SIZE: uint32>

func (*Broker) EncodeOffsetRequest

func (b *Broker) EncodeOffsetRequest(time int64, maxNumOffsets uint32) []byte

<Request Header><TIME: uint64><MAX NUMBER of OFFSETS: uint32>

func (*Broker) EncodePublishRequest

func (b *Broker) EncodePublishRequest(messages ...*Message) []byte

<Request Header><MESSAGE SET SIZE: uint32><MESSAGE SETS>

func (*Broker) EncodeRequestHeader

func (b *Broker) EncodeRequestHeader(requestType RequestType) *bytes.Buffer

Request Header: <REQUEST_SIZE: uint32><REQUEST_TYPE: uint16><TOPIC SIZE: uint16><TOPIC: bytes><PARTITION: uint32>

type BrokerConsumer

type BrokerConsumer struct {
	// contains filtered or unexported fields
}

func NewBrokerConsumer

func NewBrokerConsumer(hostname string, topic string, partition int, offset uint64, maxSize uint32) *BrokerConsumer

Create a new broker consumer hostname - host and optionally port, delimited by ':' topic to consume partition to consume from offset to start consuming from maxSize (in bytes) of the message to consume (this should be at least as big as the biggest message to be published)

func NewBrokerOffsetConsumer

func NewBrokerOffsetConsumer(hostname string, topic string, partition int) *BrokerConsumer

Simplified consumer that defaults the offset and maxSize to 0. hostname - host and optionally port, delimited by ':' topic to consume partition to consume from

func (*BrokerConsumer) AddCodecs

func (consumer *BrokerConsumer) AddCodecs(payloadCodecs []PayloadCodec)

Add Custom Payload Codecs for Consumer Decoding payloadCodecs - an array of PayloadCodec implementations

func (*BrokerConsumer) Consume

func (consumer *BrokerConsumer) Consume(handlerFunc MessageHandlerFunc) (int, os.Error)

func (*BrokerConsumer) ConsumeOnChannel

func (consumer *BrokerConsumer) ConsumeOnChannel(msgChan chan *Message, pollTimeoutMs int64, quit chan bool) (int, os.Error)

func (*BrokerConsumer) GetOffsets

func (consumer *BrokerConsumer) GetOffsets(time int64, maxNumOffsets uint32) ([]uint64, os.Error)

Get a list of valid offsets (up to maxNumOffsets) before the given time, where time is in milliseconds (-1, from the latest offset available, -2 from the smallest offset available) The result is a list of offsets, in descending order.

type BrokerPublisher

type BrokerPublisher struct {
	// contains filtered or unexported fields
}

func NewBrokerPublisher

func NewBrokerPublisher(hostname string, topic string, partition int) *BrokerPublisher

func (*BrokerPublisher) BatchPublish

func (b *BrokerPublisher) BatchPublish(messages ...*Message) (int, os.Error)

func (*BrokerPublisher) Publish

func (b *BrokerPublisher) Publish(message *Message) (int, os.Error)

type GzipPayloadCodec

type GzipPayloadCodec struct {
}

func (*GzipPayloadCodec) Decode

func (codec *GzipPayloadCodec) Decode(data []byte) []byte

func (*GzipPayloadCodec) Encode

func (codec *GzipPayloadCodec) Encode(data []byte) []byte

func (*GzipPayloadCodec) Id

func (codec *GzipPayloadCodec) Id() byte

type Message

type Message struct {
	// contains filtered or unexported fields
}

func Decode

func Decode(packet []byte, payloadCodecsMap map[byte]PayloadCodec) (uint32, []Message)

func DecodeWithDefaultCodecs

func DecodeWithDefaultCodecs(packet []byte) (uint32, []Message)

func NewCompressedMessage

func NewCompressedMessage(payload []byte) *Message

Create a Message using the default compression method (gzip)

func NewCompressedMessages

func NewCompressedMessages(messages ...*Message) *Message

func NewMessage

func NewMessage(payload []byte) *Message

Default is is create a message with no compression

func NewMessageWithCodec

func NewMessageWithCodec(payload []byte, codec PayloadCodec) *Message

func (*Message) Encode

func (m *Message) Encode() []byte

MESSAGE SET: <MESSAGE LENGTH: uint32><MAGIC: 1 byte><COMPRESSION: 1 byte><CHECKSUM: uint32><MESSAGE PAYLOAD: bytes>

func (*Message) Offset

func (m *Message) Offset() uint64

func (*Message) Payload

func (m *Message) Payload() []byte

func (*Message) PayloadString

func (m *Message) PayloadString() string

func (*Message) Print

func (msg *Message) Print()

type MessageHandlerFunc

type MessageHandlerFunc func(msg *Message)

type NoCompressionPayloadCodec

type NoCompressionPayloadCodec struct {
}

func (*NoCompressionPayloadCodec) Decode

func (codec *NoCompressionPayloadCodec) Decode(data []byte) []byte

func (*NoCompressionPayloadCodec) Encode

func (codec *NoCompressionPayloadCodec) Encode(data []byte) []byte

func (*NoCompressionPayloadCodec) Id

func (codec *NoCompressionPayloadCodec) Id() byte

type PayloadCodec

type PayloadCodec interface {

	// the 1 byte id of the codec
	Id() byte

	// encoder interface for compression implementation
	Encode(data []byte) []byte

	// decoder interface for decompression implementation
	Decode(data []byte) []byte
}

type RequestType

type RequestType uint16

type Timing

type Timing struct {
	// contains filtered or unexported fields
}

func StartTiming

func StartTiming(label string) *Timing

func (*Timing) Print

func (t *Timing) Print()

func (*Timing) Stop

func (t *Timing) Stop()

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL