consumer

package
v0.3.2
Published: Jun 22, 2015 License: Apache-2.0 Imports: 25 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Console

type Console struct {
	core.ConsumerBase
	// contains filtered or unexported fields
}

Console consumer plugin

Configuration example

  - "consumer.Console":
    Enable: true
    ExitOnEOF: true

This consumer reads from stdin. A message is generated after each newline character.

ExitOnEOF can be set to true to trigger an exit signal if StdIn is closed (e.g. when a pipe is closed). This is set to false by default.

This consumer does not define any options besides the standard ones.

func (*Console) Configure

func (cons *Console) Configure(conf core.PluginConfig) error

Configure initializes this consumer with values from a plugin config.

func (*Console) Consume

func (cons *Console) Consume(workers *sync.WaitGroup)

Consume listens to stdin.

type File

type File struct {
	core.ConsumerBase
	// contains filtered or unexported fields
}

File consumer plugin

Configuration example

  - "consumer.File":
    Enable: true
    File: "test.txt"
    DefaultOffset: "Oldest"
    OffsetFile: "/tmp/test.progress"
    Delimiter: "\n"

The file consumer reads from a file and looks for a delimiter that marks the end of each message. If the file is part of, e.g., a log rotation, the file consumer can be pointed at a symbolic link to the latest file and be told to reopen the file by sending a SIGHUP.

File is a mandatory setting and contains the file to read. The file will be read from beginning to end and the reader will stay attached until the consumer is stopped. This means appends to the file will be recognized. Symlinks are always resolved, i.e. changing the symlink target will be ignored unless gollum is restarted or a log rotation is triggered.

DefaultOffset defines where to start reading the file. Valid values are "Oldest" and "Newest". If OffsetFile is defined, this setting is used only if the offset file does not exist. If OffsetFile is not defined, this setting is always used. By default this is set to "Newest".

OffsetFile defines the path to a file that stores the current offset inside the file. If the consumer is restarted that offset is used to continue reading.

Delimiter defines the end of a message inside the file. By default this is set to "\n".
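
The delimiter handling described above can be sketched as follows. This is a minimal illustration and not gollum's implementation; `splitMessages` is a hypothetical helper introduced here. It cuts a chunk of file data at each delimiter and hands back the unterminated tail, so the tail can be prepended to whatever is appended to the file later.

```go
package main

import (
	"fmt"
	"strings"
)

// splitMessages is a hypothetical helper (not part of gollum). It splits data
// at every occurrence of delim and returns the complete messages plus the
// trailing remainder that has not been terminated by a delimiter yet.
func splitMessages(data, delim string) (messages []string, rest string) {
	if delim == "" {
		return nil, data // nothing to split on, keep buffering
	}
	for {
		idx := strings.Index(data, delim)
		if idx < 0 {
			return messages, data
		}
		messages = append(messages, data[:idx])
		data = data[idx+len(delim):]
	}
}

func main() {
	// First read: the last line is incomplete because the writer is mid-append.
	msgs, rest := splitMessages("line one\nline two\nline th", "\n")
	fmt.Printf("messages=%q rest=%q\n", msgs, rest)

	// Second read: prepend the remainder to the newly appended bytes.
	msgs, rest = splitMessages(rest+"ree\n", "\n")
	fmt.Printf("messages=%q rest=%q\n", msgs, rest)
}
```

Keeping the remainder around is what lets a reader that stays attached to the file pick up appended data without emitting half a message.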

func (*File) Configure

func (cons *File) Configure(conf core.PluginConfig) error

Configure initializes this consumer with values from a plugin config.

func (*File) Consume

func (cons *File) Consume(workers *sync.WaitGroup)

Consume reads messages from the configured file.

type Http

type Http struct {
	core.ConsumerBase
	// contains filtered or unexported fields
}

Http consumer plugin

Configuration example

  - "consumer.Http":
    Enable: true
    Address: ":80"
    ReadTimeoutSec: 5
    WithHeaders: false

Address stores the identifier to bind to. This can be any IP address or DNS name and port, like "localhost:5880". By default this is set to ":80".

ReadTimeoutSec specifies the maximum duration in seconds before reading the request times out. By default this is set to 3 seconds.

WithHeaders can be set to false to only read the HTTP body instead of passing the whole HTTP message. By default this setting is set to true.
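
The three settings above map naturally onto Go's `net/http` package. The following is a sketch under stated assumptions, not the plugin's actual code: `messageFromRequest`, `messageFromRaw` and `newServer` are hypothetical names, and using `httputil.DumpRequest` to obtain "the whole HTTP message" is an assumption about what WithHeaders produces.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"net/http"
	"net/http/httputil"
	"strings"
	"time"
)

// messageFromRequest is a hypothetical helper. With withHeaders set it returns
// the request line, headers and body; otherwise it returns only the body.
func messageFromRequest(req *http.Request, withHeaders bool) ([]byte, error) {
	if withHeaders {
		return httputil.DumpRequest(req, true)
	}
	defer req.Body.Close()
	return io.ReadAll(req.Body)
}

// messageFromRaw parses a raw HTTP/1.x request string. It exists only so the
// sketch can be exercised without opening a port.
func messageFromRaw(raw string, withHeaders bool) (string, error) {
	req, err := http.ReadRequest(bufio.NewReader(strings.NewReader(raw)))
	if err != nil {
		return "", err
	}
	msg, err := messageFromRequest(req, withHeaders)
	return string(msg), err
}

// newServer shows where Address and ReadTimeoutSec would plug in.
func newServer(address string, readTimeoutSec int, withHeaders bool, emit func([]byte)) *http.Server {
	handler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
		msg, err := messageFromRequest(req, withHeaders)
		if err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		emit(msg)
	})
	return &http.Server{
		Addr:        address,
		Handler:     handler,
		ReadTimeout: time.Duration(readTimeoutSec) * time.Second,
	}
}

func main() {
	raw := "POST /log HTTP/1.1\r\nHost: localhost\r\nContent-Length: 5\r\n\r\nhello"
	body, _ := messageFromRaw(raw, false)
	full, _ := messageFromRaw(raw, true)
	fmt.Printf("body only: %q\nwith headers: %q\n", body, full)

	// The server is only constructed here; calling ListenAndServe would bind the port.
	srv := newServer(":80", 3, true, func(msg []byte) { fmt.Printf("%s\n", msg) })
	fmt.Println("would listen on", srv.Addr, "with read timeout", srv.ReadTimeout)
}
```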

func (*Http) Configure

func (cons *Http) Configure(conf core.PluginConfig) error

Configure initializes this consumer with values from a plugin config.

func (Http) Consume

func (cons Http) Consume(workers *sync.WaitGroup)

Consume opens a new HTTP server listening on the specified IP and port (address).

type Kafka

type Kafka struct {
	core.ConsumerBase

	MaxPartitionID int32
	// contains filtered or unexported fields
}

Kafka consumer plugin

Configuration example

  - "consumer.Kafka":
    Enable: true
    DefaultOffset: "Newest"
    OffsetFile: "/tmp/gollum_kafka.idx"
    ClientID: "logger"
    MaxOpenRequests: 6
    ServerTimeoutSec: 10
    MaxFetchSizeByte: 8192
    MinFetchSizeByte: 0
    FetchTimeoutMs: 500
    MessageBufferCount: 1024
    PresistTimoutMs: 1000
    ElectRetries: 5
    ElectTimeoutMs: 300
    MetadataRefreshMs: 3000
    Servers:
      - "192.168.222.30:9092"
      - "192.168.222.31:9092"

The kafka consumer reads from a given kafka topic. This consumer is based on the sarama library so most settings relate to the settings from this library.

DefaultOffset defines the message index to start reading from. Valid values are either "Newest", "Oldest", or a number. The default value is "Newest".

OffsetFile defines a path to a file containing the current index per topic partition. If a file is given, the index stored in this file will be used as the default offset for a stored partition. If the partition is not stored in this file, DefaultOffset is used.

ClientId sets the client id of this consumer. By default this is "gollum".

MaxOpenRequests defines the number of simultaneous connections allowed. By default this is set to 5.

ServerTimeoutSec defines the time after which a connection is set to timed out. By default this is set to 30 seconds.

MaxFetchSizeByte sets the maximum size of a message to fetch. Larger messages will be ignored. By default this is set to 0 (fetch all messages).

MinFetchSizeByte defines the minimum amount of data to fetch from Kafka per request. If less data is available the broker will wait. By default this is set to 1.

FetchTimeoutMs defines the time in milliseconds the broker will wait for MinFetchSizeByte to be reached before processing data anyway. By default this is set to 250ms.

MessageBufferCount sets the internal channel size for the kafka client. By default this is set to 256.

PresistTimoutMs defines the time in milliseconds between writes to OffsetFile. By default this is set to 5000. Shorter durations reduce the number of duplicate messages after a failure but increase I/O.

ElectRetries defines how many times to retry during a leader election. By default this is set to 3.

ElectTimeoutMs defines the number of milliseconds to wait for the cluster to elect a new leader. Defaults to 250.

MetadataRefreshMs sets the interval in milliseconds for fetching cluster metadata. By default this is set to 10000. This corresponds to the JVM setting `topic.metadata.refresh.interval.ms`.

Servers contains the list of all kafka servers to connect to. This setting is mandatory and thus has no defaults.
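
How DefaultOffset and OffsetFile interact can be sketched as below. This is an illustration under assumptions rather than the plugin's code: `parseDefaultOffset` and `startOffset` are hypothetical helpers, the case-insensitive matching is an assumption, and the two constants are redeclared with the values of `sarama.OffsetNewest` and `sarama.OffsetOldest` so the example compiles without the sarama dependency.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// These values mirror sarama.OffsetNewest and sarama.OffsetOldest.
const (
	offsetNewest int64 = -1
	offsetOldest int64 = -2
)

// parseDefaultOffset is a hypothetical helper that maps the DefaultOffset
// setting ("Newest", "Oldest", or a number) to a numeric offset.
// Matching the names case-insensitively is an assumption.
func parseDefaultOffset(value string) (int64, error) {
	switch strings.ToLower(value) {
	case "newest":
		return offsetNewest, nil
	case "oldest":
		return offsetOldest, nil
	}
	return strconv.ParseInt(value, 10, 64)
}

// startOffset prefers the offset stored for a partition (as it would be read
// from OffsetFile) and falls back to the default for unknown partitions.
func startOffset(stored map[int32]int64, partition int32, defaultOffset int64) int64 {
	if offset, ok := stored[partition]; ok {
		return offset
	}
	return defaultOffset
}

func main() {
	def, err := parseDefaultOffset("Newest")
	if err != nil {
		panic(err)
	}
	stored := map[int32]int64{0: 1234} // pretend this came from OffsetFile
	for _, partition := range []int32{0, 1} {
		fmt.Printf("partition %d starts at %d\n", partition, startOffset(stored, partition, def))
	}
}
```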

func (*Kafka) Configure

func (cons *Kafka) Configure(conf core.PluginConfig) error

Configure initializes this consumer with values from a plugin config.

func (*Kafka) Consume

func (cons *Kafka) Consume(workers *sync.WaitGroup)

Consume starts a kafka consumer per partition for this topic

type LoopBack

type LoopBack struct {
	core.ConsumerBase
	// contains filtered or unexported fields
}

LoopBack consumer plugin for processing retried messages

Configuration example

  - "consumer.LoopBack":
    Enable: true
    Channel: 8192
    Routes:
      "_DROPPED_": "myStream"
      "myStream":
        - "myOtherStream"
        - "myOtherStream2"

The loopback consumer defines the configuration for the internal loopback or dropped message stream. Messages that are dropped because of a channel timeout will arrive at the _DROPPED_ channel. This consumer ignores the "Stream" setting.

Channel defines the size of the retry queue. By default this is set to 8192.

Routes defines a 1:n stream remapping. Messages reaching the LoopBack consumer are reassigned to the given stream(s). If no Route is set the message will be sent to its original stream.
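
The 1:n remapping can be pictured as a lookup in a map from stream name to target streams. The sketch below is illustrative only; `reroute` is a hypothetical helper and streams are represented by plain strings rather than gollum's internal stream identifiers.

```go
package main

import "fmt"

// reroute is a hypothetical helper. It returns the streams a message should
// be reassigned to, or the original stream if no route is configured.
func reroute(routes map[string][]string, stream string) []string {
	if targets, ok := routes[stream]; ok && len(targets) > 0 {
		return targets
	}
	return []string{stream}
}

func main() {
	// Mirrors the Routes section of the configuration example.
	routes := map[string][]string{
		"_DROPPED_": {"myStream"},
		"myStream":  {"myOtherStream", "myOtherStream2"},
	}
	for _, stream := range []string{"_DROPPED_", "myStream", "unrouted"} {
		fmt.Printf("%s -> %v\n", stream, reroute(routes, stream))
	}
}
```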

func (*LoopBack) Configure

func (cons *LoopBack) Configure(conf core.PluginConfig) error

Configure initializes this consumer with values from a plugin config.

func (*LoopBack) Consume

func (cons *LoopBack) Consume(workers *sync.WaitGroup)

Consume is fetching and forwarding messages from the feedbackQueue

type Profiler

type Profiler struct {
	core.ConsumerBase
	// contains filtered or unexported fields
}

Profiler consumer plugin

Configuration example

  - "consumer.Profiler":
    Enable: true
    Runs: 100000
    Batches: 100
    TemplateCount: 20
    Characters: "abcdefghijklmnopqrstuvwxyz .,!;:-_"
    Message: "{name:\"%100s\", number: %2d, float: %4f}"

The profiler plugin generates Runs x Batches messages and sends them to the configured streams as fast as possible. This consumer can be used to profile producers and/or configurations.

Runs defines the number of messages per batch. By default this is set to 10000.

Batches defines the number of measurement runs to do. By default this is set to 10.

TemplateCount defines the number of message templates to be generated. A random message template will be chosen when a message is sent. Templates are generated in advance. By default this is set to 10.

Characters defines the characters to be used in generated strings. By default these are "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ01234567890 ".

Message defines a Go format string used to generate the message payloads. The length of the generated values is deduced from the format size parameter, i.e. "%200d" will generate a number between 0 and 200, "%10s" will generate a string with 10 characters, etc. By default this is set to "%256s".
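
A rough sketch of how templates could be derived from the Message format string is shown below. It is not the plugin's implementation: `fillTemplate` and `newTemplates` are hypothetical helpers, only the `%Ns` and `%Nd` verbs are handled (other verbs such as `%4f` are left untouched), and Characters is treated as a sequence of single-byte characters.

```go
package main

import (
	"fmt"
	"math/rand"
	"regexp"
	"strconv"
)

// sizedVerb matches format verbs that carry a size, such as %100s or %2d.
var sizedVerb = regexp.MustCompile(`%(\d+)([sd])`)

// fillTemplate is a hypothetical helper that replaces every sized verb with
// a random value whose length or range is taken from the size parameter.
func fillTemplate(format, characters string, rng *rand.Rand) string {
	return sizedVerb.ReplaceAllStringFunc(format, func(verb string) string {
		parts := sizedVerb.FindStringSubmatch(verb)
		size, err := strconv.Atoi(parts[1])
		if err != nil || characters == "" {
			return verb // leave the verb as-is if it cannot be expanded
		}
		if parts[2] == "d" {
			return strconv.Itoa(rng.Intn(size + 1)) // number between 0 and size
		}
		buf := make([]byte, size)
		for i := range buf {
			buf[i] = characters[rng.Intn(len(characters))]
		}
		return string(buf)
	})
}

// newTemplates generates count templates in advance, as TemplateCount describes.
func newTemplates(format, characters string, count int, seed int64) []string {
	rng := rand.New(rand.NewSource(seed))
	templates := make([]string, 0, count)
	for i := 0; i < count; i++ {
		templates = append(templates, fillTemplate(format, characters, rng))
	}
	return templates
}

func main() {
	runs, batches := 10000, 10 // the documented defaults
	fmt.Println("total messages:", runs*batches)

	for _, tpl := range newTemplates(`{name:"%8s", number: %2d}`, "abc", 3, 42) {
		fmt.Println(tpl)
	}
}
```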

func (*Profiler) Configure

func (cons *Profiler) Configure(conf core.PluginConfig) error

Configure initializes this consumer with values from a plugin config.

func (*Profiler) Consume

func (cons *Profiler) Consume(workers *sync.WaitGroup)

Consume starts a profile run and exits gollum when done

type Proxy

type Proxy struct {
	core.ConsumerBase
	// contains filtered or unexported fields
}

Proxy consumer plugin

Configuration example

  - "consumer.Proxy":
    Enable: true
    Address: "unix:///var/gollum.socket"
    Partitioner: "ascii"
    Delimiter: ":"
    Offset: 1

The proxy consumer reads messages directly as-is from a given socket. Messages are extracted by standard message size algorithms (see Partitioner). This consumer can be used with any compatible proxy producer to establish a two-way communication.

Address stores the identifier to bind to. This can either be any ip address and port like "localhost:5880" or a file like "unix:///var/gollum.socket". By default this is set to ":5880". UDP is not supported.

Partitioner defines the algorithm used to read messages from the stream. The messages will be sent as a whole, no cropping or removal will take place. By default this is set to "delimiter".

  • "delimiter" separates messages by looking for a delimiter string. The delimiter is included into the left hand message.
  • "ascii" reads an ASCII encoded number at a given offset until a given delimiter is found.
  • "binary" reads a binary number at a given offset and size
  • "binary_le" is an alias for "binary"
  • "binary_be" is the same as "binary" but uses big endian encoding
  • "fixed" assumes fixed size messages

Delimiter defines the delimiter used by the ascii and delimiter partitioners. By default this is set to "\n".

Offset defines the offset used by the binary and ascii partitioners. By default this is set to 0. This setting is ignored by the fixed partitioner.

Size defines the size in bytes used by the binary or fixed partitioner. For binary this can be set to 1, 2, 4 or 8. By default 4 is chosen. For fixed this defines the size of a message. By default 1 is chosen.
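
To make the "binary" family of partitioners more concrete, here is a minimal sketch of length-prefixed framing. It rests on an assumption the documentation does not state: the number read at Offset is taken to be the count of payload bytes that follow the length field. `nextFrame` is a hypothetical helper, not part of gollum. In line with the proxy consumer's behaviour of passing messages as a whole, the returned frame keeps its header.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// nextFrame is a hypothetical helper. It reads a length field of the given
// size (1, 2, 4 or 8 bytes) at offset and returns the complete frame,
// header included, plus the unread remainder.
// ASSUMPTION: the length counts the payload bytes after the length field.
func nextFrame(buf []byte, offset, size int, bigEndian bool) (frame, rest []byte, ok bool) {
	var order binary.ByteOrder = binary.LittleEndian // "binary" and "binary_le"
	if bigEndian {
		order = binary.BigEndian // "binary_be"
	}
	header := offset + size
	if offset < 0 || size <= 0 || len(buf) < header {
		return nil, buf, false // length field not complete yet
	}
	field := buf[offset:header]
	var length uint64
	switch size {
	case 1:
		length = uint64(field[0])
	case 2:
		length = uint64(order.Uint16(field))
	case 4:
		length = uint64(order.Uint32(field))
	case 8:
		length = order.Uint64(field)
	default:
		return nil, buf, false // unsupported Size
	}
	if length > uint64(len(buf)-header) {
		return nil, buf, false // payload not complete yet
	}
	end := header + int(length)
	return buf[:end], buf[end:], true
}

func main() {
	// Two frames: one leading byte (Offset 1), a 2-byte little-endian length, payload.
	stream := []byte{0xFF, 3, 0, 'a', 'b', 'c', 0xFF, 2, 0, 'h', 'i'}
	for {
		frame, rest, ok := nextFrame(stream, 1, 2, false)
		if !ok {
			break
		}
		fmt.Printf("frame=%v payload=%q\n", frame, frame[3:])
		stream = rest
	}
}
```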

func (*Proxy) Configure

func (cons *Proxy) Configure(conf core.PluginConfig) error

Configure initializes this consumer with values from a plugin config.

func (*Proxy) Consume

func (cons *Proxy) Consume(workers *sync.WaitGroup)

Consume listens to a given socket.

type Socket

type Socket struct {
	core.ConsumerBase
	// contains filtered or unexported fields
}

Socket consumer plugin

Configuration example

  - "consumer.Socket":
    Enable: true
    Address: "unix:///var/gollum.socket"
    Acknowledge: "ACK\n"
    Partitioner: "ascii"
    Delimiter: ":"
    Offset: 1

The socket consumer reads messages directly as-is from a given socket. Messages are separated from the stream by using a specific partitioner method.

Address stores the identifier to bind to. This can either be any ip address and port like "localhost:5880" or a file like "unix:///var/gollum.socket". By default this is set to ":5880".

Acknowledge can be set to a non-empty value to inform the writer on success or error. On success the given string is sent. Any error will close the connection. This setting is disabled by default, i.e. set to "". If Acknowledge is enabled and an IP address is given to Address, TCP is used to open the connection, otherwise UDP is used.

Partitioner defines the algorithm used to read messages from the stream. By default this is set to "delimiter".

  • "delimiter" separates messages by looking for a delimiter string. The delimiter is removed from the message.
  • "ascii" reads an ASCII encoded number at a given offset until a given delimiter is found. Everything to the left of and including the delimiter is removed from the message.
  • "binary" reads a binary number at a given offset and size
  • "binary_le" is an alias for "binary"
  • "binary_be" is the same as "binary" but uses big endian encoding
  • "fixed" assumes fixed size messages

Delimiter defines the delimiter used by the ascii and delimiter partitioners. By default this is set to "\n".

Offset defines the offset used by the binary and ascii partitioners. By default this is set to 0. This setting is ignored by the fixed partitioner.

Size defines the size in bytes used by the binary or fixed partitioner. For binary this can be set to 1, 2, 4 or 8. By default 4 is chosen. For fixed this defines the size of a message. By default 1 is chosen.
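
The "ascii" partitioner with the settings from the configuration example (Delimiter ":", Offset 1) could work roughly as sketched below. Two points are assumptions rather than documented behaviour: the ASCII number is interpreted as the payload length in bytes, and the payload starts right after the delimiter. `nextASCIIMessage` is a hypothetical helper; it strips the length prefix and the delimiter and returns only the payload.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// nextASCIIMessage is a hypothetical helper. It reads a decimal number that
// starts at offset and ends at delimiter, then returns that many bytes of
// payload plus the unread remainder.
// ASSUMPTION: the number is the payload length and the payload follows the delimiter.
func nextASCIIMessage(buf string, offset int, delimiter string) (msg, rest string, ok bool) {
	if offset < 0 || offset > len(buf) || delimiter == "" {
		return "", buf, false
	}
	end := strings.Index(buf[offset:], delimiter)
	if end < 0 {
		return "", buf, false // length prefix not complete yet
	}
	length, err := strconv.Atoi(buf[offset : offset+end])
	if err != nil || length < 0 {
		// A real implementation would report malformed input separately.
		return "", buf, false
	}
	start := offset + end + len(delimiter)
	if len(buf)-start < length {
		return "", buf, false // payload not complete yet
	}
	return buf[start : start+length], buf[start+length:], true
}

func main() {
	// One leading byte is skipped per message because Offset is 1.
	stream := "#5:hello#5:world"
	for {
		msg, rest, ok := nextASCIIMessage(stream, 1, ":")
		if !ok {
			break
		}
		fmt.Printf("message: %q\n", msg)
		stream = rest
	}
}
```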

func (*Socket) Configure

func (cons *Socket) Configure(conf core.PluginConfig) error

Configure initializes this consumer with values from a plugin config.

func (*Socket) Consume

func (cons *Socket) Consume(workers *sync.WaitGroup)

Consume listens to a given socket.

type Syslogd

type Syslogd struct {
	core.ConsumerBase
	// contains filtered or unexported fields
}

Syslogd consumer plugin

Configuration example

  - "consumer.Syslogd":
    Enable: true
    Address: "udp://0.0.0.0:5880"
    Format: "RFC3164"

The syslogd consumer accepts messages from a syslogd compatible socket.

Address stores the identifier to bind to. This can either be any ip address and port like "localhost:5880" or a file like "unix:///var/gollum.socket". By default this is set to "udp://0.0.0.0:514". The protocol can be defined along with the address, e.g. "tcp://..." but this may be ignored if a certain protocol format does not support the desired transport protocol.

Format defines the syslog standard to use. Three standards are currently supported:

By default this is set to "RFC6587".

func (*Syslogd) Configure

func (cons *Syslogd) Configure(conf core.PluginConfig) error

Configure initializes this consumer with values from a plugin config.

func (*Syslogd) Consume

func (cons *Syslogd) Consume(workers *sync.WaitGroup)

Consume opens a new syslog socket. Messages are expected to be separated by \n.

func (*Syslogd) Handle

func (cons *Syslogd) Handle(parts syslogparser.LogParts, code int64, err error)

Handle implements the syslog handle interface
