Documentation ¶
Overview ¶
Package kafka provides kafka support for substrate
Usage ¶
This package support two methods of use. The first is to directly use this package. See the function documentation for more details.
The second method is to use the suburl package. See https://godoc.org/github.com/charlie/substrate/suburl for more information.
Using suburl ¶
The url structure is kafka://host:port/topic/
The following url parameters are available:
broker - Specifies additional broker addresses in the form host%3Aport (where %3A is a url encoded ':')
Additionally, for sources, the following url parameters are available
offset - The initial offset. Valid values are `newest` and `oldest`. consumer-group - The consumer group id metadata-refresh - How frequently to refresh the cluster metadata. E.g., '10s' '2m'
Index ¶
Constants ¶
View Source
const ( // OffsetOldest indicates the oldest appropriate message available on the broker. OffsetOldest int64 = sarama.OffsetOldest // OffsetNewest indicates the next appropriate message available on the broker. OffsetNewest int64 = sarama.OffsetNewest )
Variables ¶
This section is empty.
Functions ¶
func NewAsyncMessageSink ¶
func NewAsyncMessageSink(config AsyncMessageSinkConfig) (substrate.AsyncMessageSink, error)
func NewAsyncMessageSource ¶
func NewAsyncMessageSource(c AsyncMessageSourceConfig) (substrate.AsyncMessageSource, error)
Types ¶
type AsyncMessageSinkConfig ¶
type AsyncMessageSourceConfig ¶
type AsyncMessageSourceConfig struct { ConsumerGroup string Topic string Brokers []string Offset int64 MetadataRefreshFrequency time.Duration OffsetsRetention time.Duration SessionTimeout time.Duration Version string Debug bool }
AsyncMessageSource represents a kafka message source and implements the substrate.AsyncMessageSource interface.
type MessageWithMetadata ¶
type MessageWithMetadata interface {
GetMetadata() Metadata
}
MessageWithMetadata
Click to show internal directories.
Click to hide internal directories.