krater

package module
v2.1.1+incompatible Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 2, 2016 License: MIT Imports: 10 Imported by: 7

README

krater

GoDoc

An io.Writer / io.ReaderFrom that produces to Kafka using sarama.

Includes an example command line tool that writes from stdin to Kafka. Documentation for the tool here.

Documentation

Overview

Package krater provides io.Writer and io.ReaderFrom implementations that produce messages to Kafka. Each Write() call will be written as a separate message.

AckingWriter

AckingWriter's Write and ReadFrom methods write messages to Kafka, blocking until a response is received from the broker. To allow for this, the sarama producer used to create a new AckingWriter must have Producer.Return.Successes = true and Producer.Return.Errors = true in their Config.

Example of AckingWriter usage (error checking and imports omitted for brevity):

pc := sarama.NewConfig()
// these must both be true or the writer will deadlock
pc.Producer.Return.Successes = true
pc.Producer.Return.Errors = true

kp, err := sarama.NewAsyncProducer(opts.Brokers, pc)

// writer for topic "example-topic", allowing at most 10 concurrent writes
aw := NewAckingWriter("example-topic", kp, 10)
aw.Write([]byte("ahoy thar")) // this will block until Kafka responds

UnsafeWriter

UnsafeWriter's Write and ReadFrom methods write messages to Kafka without waiting for responses from the broker. Both methods will block only if the Producer's Input() channel is full. Errors are ignored. The following example will use Kafka as the output of the standard logger package.

pc := sarama.NewConfig()
// these must both be false or the writer will deadlock
pc.Producer.Return.Successes = false
pc.Producer.Return.Errors = false

kp, err := sarama.NewAsyncProducer(opts.Brokers, pc)

uw := NewUnsafeWriter("example-unsafe", kp)
log.New(uw, "[AHOY] ", log.LstdFlags) // create new logger that writes to uw
log.Println("Well this is handy")

Index

Constants

This section is empty.

Variables

View Source
var LogOutput io.Writer = ioutil.Discard

LogOutput is the writer used by krater's loggers

View Source
var PanicHandler func(interface{})

PanicHandler is called for recovering from panics spawned internally to the library (and thus not recoverable by the caller's goroutine). Defaults to nil, which means panics are not recovered.

Functions

func LogTo

func LogTo(w io.Writer) func()

LogTo makes krater and sarama loggers output to the given writer by replacing sarama.Logger and LogOutput. It returns a function that sets LogOutput and sarama.Logger to whatever values they had before the call to LogTo.

As an example

defer LogTo(os.Stderr)()

would start logging to stderr immediately and defer restoring the loggers.

Types

type AckingWriter

type AckingWriter struct {
	// contains filtered or unexported fields
}

AckingWriter is an io.Writer that writes messages to Kafka. Parallel calls to Write() will cause messages to be queued by the producer, and each Write() call will block until a response is received from the broker.

The AsyncProducer passed to NewAckingWriter must have Config.Return.Successes == true and Config.Return.Errors == true

Close() must be called when the writer is no longer needed.

func NewAckingWriter

func NewAckingWriter(topic string, kp sarama.AsyncProducer, maxConcurrent int) *AckingWriter

NewAckingWriter returns an AckingWriter that uses kp to produce messages to Kafka topic 'topic', with a maximum of maxConcurrent concurrent writes.

kp MUST have been initialized with AckSuccesses = true or Write will block indefinitely.

func (*AckingWriter) Close

func (aw *AckingWriter) Close() error

Close closes the writer. If the writer has already been closed, Close will return syscall.EINVAL. Thread-safe.

func (*AckingWriter) Closed

func (aw *AckingWriter) Closed() bool

Closed returns true if the AckingWriter has been closed, false otherwise. Thread-safe.

func (*AckingWriter) ReadFrom

func (aw *AckingWriter) ReadFrom(r io.Reader) (n int64, err error)

ReadFrom reads all available bytes from r and writes them to Kafka in a single message. The returned int64 will always be the total length of bytes read from r or 0 if reading from r returned an error. Trying to Write to a closed writer will return syscall.EINVAL

Note that AckingWriter doesn't support "streaming", so r is read in full before it's sent.

Implements io.ReaderFrom.

func (*AckingWriter) SetKeyer

func (aw *AckingWriter) SetKeyer(fn KeyerFn)

SetKeyer sets the keyer function used to specify keys for messages. Defaults to having nil keys for all messages. SetKeyer is NOT thread safe, and it must not be used if any writes are underway.

func (*AckingWriter) SetLogger

func (aw *AckingWriter) SetLogger(l StdLogger)

SetLogger sets the logger used by this AckingWriter. Not thread-safe.

func (*AckingWriter) Write

func (aw *AckingWriter) Write(p []byte) (n int, err error)

Write will queue p as a single message, blocking until a response is received. n will always be len(p) if the message was sent successfully, 0 otherwise. The message's key is determined by the keyer function set with SetKeyer, and defaults to nil. Trying to Write to a closed writer will return syscall.EINVAL.

Thread-safe.

Implements io.Writer.

type GroupReader

type GroupReader struct {
	// contains filtered or unexported fields
}

func NewGroupReader

func NewGroupReader(group string, topics []string, zookeeper string, cgConf *kafkaconsumer.Config) (gr *GroupReader, err error)

func (*GroupReader) Close

func (gr *GroupReader) Close() (err error)

func (GroupReader) String

func (gr GroupReader) String() string

func (*GroupReader) WriteTo

func (gr *GroupReader) WriteTo(w io.Writer) (n int64, err error)

WriteTo joins the consumer group and starts consuming from its topics.

type KeyerFn

type KeyerFn func(*sarama.ProducerMessage) sarama.Encoder

type KeyerFn represents any function that can turn a message into a key for that particular message

type StdLogger

type StdLogger interface {
	Print(v ...interface{})
	Printf(format string, v ...interface{})
	Println(v ...interface{})
	Panic(v ...interface{})
	Panicf(format string, v ...interface{})
}

StdLogger is the interface for log.Logger-compatible loggers

type UnsafeWriter

type UnsafeWriter struct {
	// contains filtered or unexported fields
}

UnsafeWriter is an io.Writer that writes messages to Kafka, ignoring any error responses sent by the brokers. Parallel calls to Write / ReadFrom are safe.

The AsyncProducer passed to NewUnsafeWriter must have Config.Return.Successes == false and Config.Return.Errors == false

Close() must be called when the writer is no longer needed.

func NewUnsafeWriter

func NewUnsafeWriter(topic string, kp sarama.AsyncProducer) *UnsafeWriter

func (*UnsafeWriter) Close

func (uw *UnsafeWriter) Close() (err error)

Close closes the writer. If the writer has already been closed, Close will return syscall.EINVAL. Thread-safe.

func (*UnsafeWriter) Closed

func (uw *UnsafeWriter) Closed() bool

Closed returns true if the UnsafeWriter has been closed, false otherwise. Thread-safe.

func (*UnsafeWriter) ReadFrom

func (uw *UnsafeWriter) ReadFrom(r io.Reader) (int64, error)

ReadFrom reads all available bytes from r and writes them to Kafka without checking for broker error responses. The returned error will be either nil or anything returned when reading from r. The returned int64 will always be the total length of bytes read from r or 0 if reading from r returned an error. Trying to ReadFrom using a closed Writer will return syscall.EINVAL.

Note that UnsafeWriter doesn't support "streaming", so r is read in full before it's sent.

Implements io.ReaderFrom.

func (*UnsafeWriter) SetKeyer

func (uw *UnsafeWriter) SetKeyer(fn KeyerFn)

SetKeyer sets the keyer function used to specify keys for messages. Defaults to having nil keys for all messages. SetKeyer is NOT thread safe, and it must not be used if any writes are underway.

func (*UnsafeWriter) SetLogger

func (uw *UnsafeWriter) SetLogger(l StdLogger)

SetLogger sets the logger used by this UnsafeWriter. Not thread-safe.

func (*UnsafeWriter) Write

func (uw *UnsafeWriter) Write(p []byte) (n int, err error)

Write writes byte slices to Kafka without checking for error responses. n will always be len(p) and err will be nil. Trying to Write to a closed writer will return syscall.EINVAL. Thread-safe.

Write might block if the Input() channel of the underlying sarama.AsyncProducer is full.

Directories

Path Synopsis
cmd
to_kafka
to_kafka reads delimited data from stdin and writes it to Kafka.
to_kafka reads delimited data from stdin and writes it to Kafka.
package kafkaconsumer is a clone of github.com/wvanbergen/kafka/kafkaconsumer with sarama references replaced with gopkg.in links instead of "raw" Github
package kafkaconsumer is a clone of github.com/wvanbergen/kafka/kafkaconsumer with sarama references replaced with gopkg.in links instead of "raw" Github

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL