consumer

package
v0.4.4
Warning

This package is not in the latest version of its module.

Published: Aug 16, 2016 License: Apache-2.0 Imports: 31 Imported by: 0

Documentation


Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Console

type Console struct {
	core.ConsumerBase
	// contains filtered or unexported fields
}

Console consumer plugin. This consumer reads from stdin. A message is generated after each newline character. When attached to a fuse, this consumer stops accepting messages if that fuse is burned.

Configuration example:

    - "consumer.Console":
        ExitOnEOF: false

ExitOnEOF can be set to true to trigger an exit signal if stdin is closed (e.g. when a pipe is closed). By default this is set to false.

func (*Console) Configure

func (cons *Console) Configure(conf core.PluginConfig) error

Configure initializes this consumer with values from a plugin config.

func (*Console) Consume

func (cons *Console) Consume(workers *sync.WaitGroup)

Consume listens to stdin.

type File

type File struct {
	core.ConsumerBase
	// contains filtered or unexported fields
}

File consumer plugin. The file consumer reads from files, looking for a delimiter that marks the end of a message. If the file is part of, e.g., a log rotation, the file consumer can be pointed at a symbolic link to the latest file and (optionally) be told to reopen the file by sending a SIGHUP. A symlink to a file is automatically reopened if the underlying file changes. When attached to a fuse, this consumer stops accepting messages if that fuse is burned.

Configuration example:

    - "consumer.File":
        File: "/var/run/system.log"
        DefaultOffset: "Newest"
        OffsetFile: ""
        Delimiter: "\n"

File is a mandatory setting and contains the file to read. The file will be read from beginning to end and the reader will stay attached until the consumer is stopped, so data appended to the attached file is picked up automatically.

DefaultOffset defines where to start reading the file. Valid values are "oldest" and "newest". If OffsetFile is defined the DefaultOffset setting will be ignored unless the file does not exist. By default this is set to "newest".

OffsetFile defines the path to a file that stores the current offset inside the given file. If the consumer is restarted that offset is used to continue reading. By default this is set to "" which disables the offset file.

Delimiter defines the end of a message inside the file. By default this is set to "\n".

func (*File) Configure

func (cons *File) Configure(conf core.PluginConfig) error

Configure initializes this consumer with values from a plugin config.

func (*File) Consume

func (cons *File) Consume(workers *sync.WaitGroup)

Consume reads from the configured file.

type Http

type Http struct {
	core.ConsumerBase
	// contains filtered or unexported fields
}

Http consumer plugin. This consumer opens an HTTP 1.1 server and processes the contents of any incoming HTTP request. When attached to a fuse, this consumer returns error 503 if that fuse is burned.

Configuration example:

    - "consumer.Http":
        Address: ":80"
        ReadTimeoutSec: 3
        WithHeaders: true
        Certificate: ""
        PrivateKey: ""

Address stores the host and port to bind to. This can be any IP address or DNS name and port, like "localhost:5880". By default this is set to ":80".

ReadTimeoutSec specifies the maximum duration in seconds before timing out the HTTP read request. By default this is set to 3 seconds.

WithHeaders can be set to false to only read the HTTP body instead of passing the whole HTTP message. By default this setting is set to true.

Certificate defines a path to a root certificate file to make this consumer handle HTTPS connections. Left empty by default (disabled). If a Certificate is given, a PrivateKey must be given, too.

PrivateKey defines a path to the private key used for HTTPS connections. Left empty by default (disabled). If a Certificate is given, a PrivateKey must be given, too.

func (*Http) Configure

func (cons *Http) Configure(conf core.PluginConfig) error

Configure initializes this consumer with values from a plugin config.

func (Http) Consume

func (cons Http) Consume(workers *sync.WaitGroup)

Consume opens a new HTTP server listening on the specified IP address and port (Address).

type Kafka

type Kafka struct {
	core.ConsumerBase

	MaxPartitionID int32
	// contains filtered or unexported fields
}

Kafka consumer plugin. This consumer reads data from a given Kafka topic. It is based on the sarama library, so most settings map to the settings of that library. When attached to a fuse, this consumer stops processing messages if that fuse is burned.

Configuration example:

    - "consumer.Kafka":
        Topic: "default"
        ClientId: "gollum"
        Version: "0.8.2"
        DefaultOffset: "newest"
        OffsetFile: ""
        FolderPermissions: "0755"
        Ordered: true
        MaxOpenRequests: 5
        ServerTimeoutSec: 30
        MaxFetchSizeByte: 0
        MinFetchSizeByte: 1
        FetchTimeoutMs: 250
        MessageBufferCount: 256
        PresistTimoutMs: 5000
        ElectRetries: 3
        ElectTimeoutMs: 250
        MetadataRefreshMs: 10000
        PrependKey: false
        KeySeparator: ":"
        Servers:
          - "localhost:9092"

Topic defines the kafka topic to read from. By default this is set to "default".

ClientId sets the client id of this consumer. By default this is "gollum".

Version defines the kafka protocol version to use. Common values are 0.8.2, 0.9.0 or 0.10.0. Values of the form "A.B" are allowed as well as "A.B.C" and "A.B.C.D". Defaults to "0.8.2". If the version given is not known, the closest possible version is chosen.

DefaultOffset defines where to start reading the topic. Valid values are "oldest" and "newest". If OffsetFile is defined the DefaultOffset setting will be ignored unless the file does not exist. By default this is set to "newest".

OffsetFile defines the path to a file that stores the current offset inside a given partition. If the consumer is restarted that offset is used to continue reading. By default this is set to "" which disables the offset file.

FolderPermissions is used to create the offset file path if necessary. Set to 0755 by default.

Ordered can be set to enforce partitions to be read one-by-one in a round robin fashion instead of reading in parallel from all partitions. Set to false by default.

PrependKey can be enabled to prefix the read message with the key from the Kafka message. A separator will be appended to the key. See KeySeparator. By default this option is set to false.

KeySeparator defines the separator that is appended to the kafka message key if PrependKey is set to true. Set to ":" by default.
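As a small illustration of how PrependKey and KeySeparator combine, the following sketch builds the resulting payload. The function name `prependKey` is hypothetical and the code only mirrors the description above; it does not use the sarama API.

```go
package main

import "fmt"

// prependKey returns key + separator + value as a new byte slice,
// mirroring what PrependKey: true is described to do.
func prependKey(key []byte, separator string, value []byte) []byte {
	out := make([]byte, 0, len(key)+len(separator)+len(value))
	out = append(out, key...)
	out = append(out, separator...)
	return append(out, value...)
}

func main() {
	msg := prependKey([]byte("user42"), ":", []byte("payload"))
	fmt.Printf("%s\n", msg) // prints "user42:payload"
}
```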

MaxOpenRequests defines the number of simultaneous connections that are allowed. By default this is set to 5.

ServerTimeoutSec defines the time after which a connection is set to timed out. By default this is set to 30 seconds.

MaxFetchSizeByte sets the maximum size of a message to fetch. Larger messages will be ignored. By default this is set to 0 (fetch all messages).

MinFetchSizeByte defines the minimum amount of data to fetch from Kafka per request. If less data is available, the broker will wait. By default this is set to 1.

FetchTimeoutMs defines the time in milliseconds the broker will wait for MinFetchSizeByte to be reached before processing data anyway. By default this is set to 250ms.

MessageBufferCount sets the internal channel size for the kafka client. By default this is set to 8192.

PresistTimoutMs defines the time in milliseconds between writes to OffsetFile. By default this is set to 5000. Shorter durations reduce the number of duplicate messages after a failure but increase I/O.

ElectRetries defines how many times to retry during a leader election. By default this is set to 3.

ElectTimeoutMs defines the number of milliseconds to wait for the cluster to elect a new leader. Defaults to 250.

MetadataRefreshMs sets the interval in milliseconds for fetching cluster metadata. By default this is set to 10000. This corresponds to the JVM setting `topic.metadata.refresh.interval.ms`.

Servers contains the list of all kafka servers to connect to. By default this is set to contain only "localhost:9092".

func (*Kafka) Configure

func (cons *Kafka) Configure(conf core.PluginConfig) error

Configure initializes this consumer with values from a plugin config.

func (*Kafka) Consume

func (cons *Kafka) Consume(workers *sync.WaitGroup)

Consume starts a kafka consumer per partition for this topic

type Kinesis added in v0.4.2

type Kinesis struct {
	core.ConsumerBase
	// contains filtered or unexported fields
}

Kinesis consumer plugin. This consumer reads messages from an AWS Kinesis stream. When attached to a fuse, this consumer stops processing messages if that fuse is burned.

Configuration example:

    - "consumer.Kinesis":
        KinesisStream: "default"
        Region: "eu-west-1"
        Endpoint: "kinesis.eu-west-1.amazonaws.com"
        DefaultOffset: "Newest"
        OffsetFile: ""
        RecordsPerQuery: 100
        RecordMessageDelimiter: ""
        QuerySleepTimeMs: 1000
        RetrySleepTimeSec: 4
        CredentialType: "none"
        CredentialId: ""
        CredentialToken: ""
        CredentialSecret: ""
        CredentialFile: ""
        CredentialProfile: ""

KinesisStream defines the stream to read from. By default this is set to "default".

Region defines the Amazon region of your Kinesis stream. By default this is set to "eu-west-1".

Endpoint defines the Amazon endpoint for your Kinesis stream. By default this is set to "kinesis.eu-west-1.amazonaws.com".

CredentialType defines the credentials that are to be used when connecting to Kinesis. This can be one of the following: environment, static, shared, none. Static enables the parameters CredentialId, CredentialToken and CredentialSecret; shared enables the parameters CredentialFile and CredentialProfile. None will not use any credentials, and environment will pull the credentials from environment settings. By default this is set to none.

DefaultOffset defines the message index to start reading from. Valid values are either "Newest", "Oldest", or a number. The default value is "Newest".

OffsetFile defines a file to store the current offset per shard. By default this is set to "", i.e. it is disabled. If a file is set and found, consuming will start after the stored offset.

RecordsPerQuery defines the number of records to pull per query. By default this is set to 100.

RecordMessageDelimiter defines the string to delimit messages within a record. By default this is set to "", i.e. it is disabled.
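A minimal sketch of what RecordMessageDelimiter implies for a single record payload is shown below. The helper `splitRecord` is a hypothetical name, and dropping empty fragments (for example after a trailing delimiter) is an assumption of this sketch rather than documented behavior.

```go
package main

import (
	"bytes"
	"fmt"
)

// splitRecord splits one record payload into messages.
// An empty delimiter disables splitting: the record is one message.
func splitRecord(record []byte, delimiter string) [][]byte {
	if delimiter == "" {
		return [][]byte{record}
	}
	var messages [][]byte
	for _, part := range bytes.Split(record, []byte(delimiter)) {
		if len(part) > 0 { // skip empty fragments (assumption)
			messages = append(messages, part)
		}
	}
	return messages
}

func main() {
	for _, msg := range splitRecord([]byte("a\nb\nc\n"), "\n") {
		fmt.Printf("%q\n", msg)
	}
}
```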

QuerySleepTimeMs defines the number of milliseconds to sleep before trying to pull new records from a shard that did not return any records. By default this is set to 1000.

RetrySleepTimeSec defines the number of seconds to wait after trying to reconnect to a shard. By default this is set to 4.

func (*Kinesis) Configure added in v0.4.2

func (cons *Kinesis) Configure(conf core.PluginConfig) error

Configure initializes this consumer with values from a plugin config.

func (*Kinesis) Consume added in v0.4.2

func (cons *Kinesis) Consume(workers *sync.WaitGroup)

Consume reads messages from the configured Kinesis stream.

type Profiler

type Profiler struct {
	core.ConsumerBase
	// contains filtered or unexported fields
}

Profiler consumer plugin. The profiler plugin generates Runs x Batches messages and sends them to the configured streams as fast as possible. This consumer can be used to profile producers and/or configurations. When attached to a fuse, this consumer stops processing messages if that fuse is burned.

Configuration example:

    - "consumer.Profiler":
        Runs: 10000
        Batches: 10
        TemplateCount: 10
        Characters: "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ01234567890"
        Message: "%256s"
        DelayMs: 0
        KeepRunning: false

Runs defines the number of messages per batch. By default this is set to 10000.

Batches defines the number of measurement runs to do. By default this is set to 10.

TemplateCount defines the number of message templates to be generated. A random message template will be chosen when a message is sent. Templates are generated in advance. By default this is set to 10.

Characters defines the characters to be used in generated strings. By default these are "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ01234567890 ".

Message defines a Go format string used to generate the message payloads. The length of the generated values is derived from the format size parameter, i.e. "%200d" will generate a number between 0 and 200, "%10s" will generate a string with 10 characters, etc. By default this is set to "%256s".

DelayMs defines the number of milliseconds of sleep between messages. By default this is set to 0.

KeepRunning can be set to true to disable automatic shutdown of gollum after profiling is done. This can be used to e.g. read metrics after a profile run. By default this is set to false.

func (*Profiler) Configure

func (cons *Profiler) Configure(conf core.PluginConfig) error

Configure initializes this consumer with values from a plugin config.

func (*Profiler) Consume

func (cons *Profiler) Consume(workers *sync.WaitGroup)

Consume starts a profile run and exits gollum when done

type Proxy

type Proxy struct {
	core.ConsumerBase
	// contains filtered or unexported fields
}

Proxy consumer plugin. The proxy consumer reads messages directly as-is from a given socket. Messages are extracted by standard message size algorithms (see Partitioner). This consumer can be used with any compatible proxy producer to establish a two-way communication. When attached to a fuse, this consumer stops accepting new connections and closes all existing connections if that fuse is burned.

Configuration example:

    - "consumer.Proxy":
        Address: ":5880"
        Partitioner: "delimiter"
        Delimiter: "\n"
        Offset: 0
        Size: 1

Address defines the protocol, host and port or socket to bind to. This can either be any ip address and port like "localhost:5880" or a file like "unix:///var/gollum.socket". By default this is set to ":5880". UDP is not supported.

Partitioner defines the algorithm used to read messages from the stream. The messages will be sent as a whole, no cropping or removal will take place. By default this is set to "delimiter".

  • "delimiter" separates messages by looking for a delimiter string. The delimiter is included in the message to its left.
  • "ascii" reads an ASCII number at a given offset until a given delimiter is found. Everything to the right of and including the delimiter is removed from the message.
  • "binary" reads a binary number at a given offset and size.
  • "binary_le" is an alias for "binary".
  • "binary_be" is the same as "binary" but uses big endian encoding.
  • "fixed" assumes fixed size messages.

Delimiter defines the delimiter used by the text ("ascii") and delimiter partitioners. By default this is set to "\n".

Offset defines the offset used by the binary and text ("ascii") partitioners. By default this is set to 0. This setting is ignored by the fixed partitioner.

Size defines the size in bytes used by the binary or fixed partitioner. For binary this can be set to 1, 2, 4 or 8. By default 4 is chosen. For fixed this defines the size of a message. By default 1 is chosen.

func (*Proxy) Configure

func (cons *Proxy) Configure(conf core.PluginConfig) error

Configure initializes this consumer with values from a plugin config.

func (*Proxy) Consume

func (cons *Proxy) Consume(workers *sync.WaitGroup)

Consume listens to a given socket.

type Socket

type Socket struct {
	core.ConsumerBase
	// contains filtered or unexported fields
}

Socket consumer plugin. The socket consumer reads messages directly as-is from a given socket. Messages are separated from the stream by using a specific partitioner method. When attached to a fuse, this consumer stops accepting new connections (closing the socket) and closes all existing connections if that fuse is burned.

Configuration example:

    - "consumer.Socket":
        Address: ":5880"
        Permissions: "0770"
        Acknowledge: ""
        Partitioner: "delimiter"
        Delimiter: "\n"
        Offset: 0
        Size: 1
        ReconnectAfterSec: 2
        AckTimoutSec: 2
        ReadTimeoutSec: 5

Address defines the protocol, host and port or socket to bind to. This can either be any ip address and port like "localhost:5880" or a file like "unix:///var/gollum.socket". By default this is set to ":5880".

Permissions sets the file permissions for "unix://" based connections as a four-digit octal number string. By default this is set to "0770".

Acknowledge can be set to a non-empty value to inform the writer on success or error. On success the given string is sent. Any error will close the connection. This setting is disabled by default, i.e. set to "". If Acknowledge is enabled and an IP address is given to Address, TCP is used to open the connection, otherwise UDP is used. If an error occurs during write, "NOT <Acknowledge>" is returned.

Partitioner defines the algorithm used to read messages from the stream. By default this is set to "delimiter".

  • "delimiter" separates messages by looking for a delimiter string. The delimiter is removed from the message.
  • "ascii" reads an ASCII number at a given offset until a given delimiter is found. Everything to the right of and including the delimiter is removed from the message.
  • "binary" reads a binary number at a given offset and size.
  • "binary_le" is an alias for "binary".
  • "binary_be" is the same as "binary" but uses big endian encoding.
  • "fixed" assumes fixed size messages.

Delimiter defines the delimiter used by the text ("ascii") and delimiter partitioners. By default this is set to "\n".

Offset defines the offset used by the binary and text ("ascii") partitioners. By default this is set to 0. This setting is ignored by the fixed partitioner.

Size defines the size in bytes used by the binary or fixed partitioner. For binary this can be set to 1, 2, 4 or 8. By default 4 is chosen. For fixed this defines the size of a message. By default 1 is chosen.

ReconnectAfterSec defines the number of seconds to wait before trying to reopen a connection. By default this is set to 2.

AckTimoutSec defines the number of seconds to wait for an acknowledge to succeed. Set to 2 by default.

ReadTimeoutSec defines the number of seconds to wait for data to be received. Set to 5 by default.

RemoveOldSocket toggles removing existing files with the same name as the socket (unix://<path>) prior to connecting. Enabled by default.

func (*Socket) Configure

func (cons *Socket) Configure(conf core.PluginConfig) error

Configure initializes this consumer with values from a plugin config.

func (*Socket) Consume

func (cons *Socket) Consume(workers *sync.WaitGroup)

Consume listens to a given socket.

type Syslogd

type Syslogd struct {
	core.ConsumerBase
	// contains filtered or unexported fields
}

Syslogd consumer plugin. The syslogd consumer accepts messages from a syslogd compatible socket. When attached to a fuse, this consumer stops the syslogd service if that fuse is burned.

Configuration example:

    - "consumer.Syslogd":
        Address: "udp://0.0.0.0:514"
        Format: "RFC6587"

Address defines the protocol, host and port or socket to bind to. This can either be any ip address and port like "localhost:5880" or a file like "unix:///var/gollum.socket". By default this is set to "udp://0.0.0.0:514". The protocol can be defined along with the address, e.g. "tcp://..." but this may be ignored if a certain protocol format does not support the desired transport protocol.
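The Address format used here (and by the Proxy and Socket consumers) can be split into protocol and endpoint with a few lines of Go. This is a sketch under assumptions: `splitAddress` is a hypothetical helper, and the protocol used when none is given is passed in by the caller because the default differs per consumer.

```go
package main

import (
	"fmt"
	"strings"
)

// splitAddress separates "protocol://endpoint" into its two parts.
// Addresses without a protocol use defaultProtocol.
func splitAddress(address, defaultProtocol string) (protocol, endpoint string) {
	if parts := strings.SplitN(address, "://", 2); len(parts) == 2 {
		return strings.ToLower(parts[0]), parts[1]
	}
	return defaultProtocol, address
}

func main() {
	for _, addr := range []string{
		"udp://0.0.0.0:514",
		"unix:///var/gollum.socket",
		":5880",
	} {
		protocol, endpoint := splitAddress(addr, "tcp")
		fmt.Printf("%-28s protocol=%s endpoint=%s\n", addr, protocol, endpoint)
	}
}
```

The two parts match the arguments expected by `net.Listen` (for "tcp" and "unix") and `net.ListenPacket` (for "udp").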

Format defines the syslog standard to expect for message encoding. Three standards are currently supported. By default this is set to "RFC6587".

func (*Syslogd) Configure

func (cons *Syslogd) Configure(conf core.PluginConfig) error

Configure initializes this consumer with values from a plugin config.

func (*Syslogd) Consume

func (cons *Syslogd) Consume(workers *sync.WaitGroup)

Consume opens a new syslog socket. Messages are expected to be separated by \n.

func (*Syslogd) Handle

func (cons *Syslogd) Handle(parts format.LogParts, code int64, err error)

Handle implements the syslog handle interface
