consumer

package
v0.5.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 17, 2019 License: Apache-2.0 Imports: 36 Imported by: 6

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AwsKinesis added in v0.5.0

type AwsKinesis struct {
	core.SimpleConsumer `gollumdoc:"embed_type"`

	// AwsMultiClient is public to make AwsMultiClient.Configure() callable
	AwsMultiClient components.AwsMultiClient `gollumdoc:"embed_type"`
	// contains filtered or unexported fields
}

AwsKinesis consumer

This consumer reads a message from an AWS Kinesis router.

Parameters

- KinesisStream: This value defines the stream to read from. By default this parameter is set to "default".

- OffsetFile: This value defines a file to store the current offset per shard. To disable this parameter, set it to "". If the parameter is set and the file is found, consuming will start after the offset stored in the file. By default this parameter is set to "".

- RecordsPerQuery: This value defines the number of records to pull per query. By default this parameter is set to "100".

- RecordMessageDelimiter: This value defines the string to delimit messages within a record. To disable this parameter, set it to "". By default this parameter is set to "".

- QuerySleepTimeMs: This value defines the number of milliseconds to sleep before trying to pull new records from a shard that did not return any records. By default this parameter is set to "1000".

- RetrySleepTimeSec: This value defines the number of seconds to wait after trying to reconnect to a shard. By default this parameter is set to "4".

- CheckNewShardsSec: This value sets a timer to update shards in Kinesis. You can set this parameter to "0" for disabling. By default this parameter is set to "0".

- DefaultOffset: This value defines the message index to start reading from. Valid values are either "newest", "oldest", or a number. By default this parameter is set to "newest".

Examples

This example consumes a kinesis stream "myStream" and create messages:

KinesisIn:
  Type: consumer.AwsKinesis
  Credential:
    Type: shared
    File: /Users/<USERNAME>/.aws/credentials
    Profile: default
  Region: "eu-west-1"
  KinesisStream: myStream

func (*AwsKinesis) Configure added in v0.5.0

func (cons *AwsKinesis) Configure(conf core.PluginConfigReader)

Configure initializes this consumer with values from a plugin config.

func (*AwsKinesis) Consume added in v0.5.0

func (cons *AwsKinesis) Consume(workers *sync.WaitGroup)

Consume listens to stdin.

type Console

type Console struct {
	core.SimpleConsumer `gollumdoc:"embed_type"`
	// contains filtered or unexported fields
}

Console consumer

This consumer reads from stdin or a named pipe. A message is generated after each newline character.

Metadata

*NOTE: The metadata will only set if the parameter `SetMetadata` is active.*

- pipe: Name of the pipe the message was received on (set)

Parameters

- Pipe: Defines the pipe to read from. This can be "stdin" or the path to a named pipe. If the named pipe doesn't exist, it will be created. By default this paramater is set to "stdin".

- Permissions: Defines the UNIX filesystem permissions used when creating the named pipe as an octal number. By default this paramater is set to "0664".

- ExitOnEOF: If set to true, the plusing triggers an exit signal if the pipe is closed, i.e. when EOF is detected. By default this paramater is set to "true".

- SetMetadata: When this value is set to "true", the fields mentioned in the metadata section will be added to each message. Adding metadata will have a performance impact on systems with high throughput. By default this parameter is set to "false".

Examples

This config reads data from stdin e.g. when starting gollum via unix pipe.

ConsoleIn:
  Type: consumer.Console
  Streams: console
  Pipe: stdin

func (*Console) Configure

func (cons *Console) Configure(conf core.PluginConfigReader)

Configure initializes this consumer with values from a plugin config.

func (*Console) Consume

func (cons *Console) Consume(workers *sync.WaitGroup)

Consume listens to stdin.

func (*Console) Enqueue added in v0.5.0

func (cons *Console) Enqueue(data []byte)

Enqueue creates a new message

type File

type File struct {
	core.SimpleConsumer `gollumdoc:"embed_type"`
	// contains filtered or unexported fields
}

File consumer plugin

The File consumer reads messages from a file, looking for a customizable delimiter sequence that marks the end of a message. If the file is part of e.g. a log rotation, the consumer can be set to read from a symbolic link pointing to the current file and (optionally) be told to reopen the file by sending a SIGHUP. A symlink to a file will automatically be reopened if the underlying file is changed.

Metadata

*NOTE: The metadata will only set if the parameter `SetMetadata` is active.*

- file: The file name of the consumed file (set)

- dir: The directory of the consumed file (set)

Parameters

- File: This value is a mandatory setting and contains the name of the file to read. The file will be read from beginning to end and the reader will stay attached until the consumer is stopped, so appends to the file will be recognized automatically.

- OffsetFile: This value defines the path to a file that stores the current offset inside the source file. If the consumer is restarted, that offset is used to continue reading from the previous position. To disable this setting, set it to "". By default this parameter is set to "".

- Delimiter: This value defines the delimiter sequence to expect at the end of each message in the file. By default this parameter is set to "\n".

- ObserveMode: This value select how the source file is observed. Available values are `poll` and `watch`. NOTE: The watch implementation uses the fsnotify/fsnotify(https://github.com/fsnotify/fsnotify) package. If your source file is rotated (moved or removed), please verify that your file system and distribution support the `RENAME` and `REMOVE` events; the consumer's stability depends on them. By default this parameter is set to `poll`.

- DefaultOffset: This value defines the default offset from which to start reading within the file. Valid values are "oldest" and "newest". If OffsetFile is defined and the file exists, the DefaultOffset parameter is ignored. By default this parameter is set to "newest".

- PollingDelay: This value defines the duration the consumer waits between checking the source file for new content after hitting the end of file (EOF). The value is in milliseconds (ms). NOTE: This settings only takes effect if the consumer is running in `poll` mode! By default this parameter is set to "100".

- SetMetadata: When this value is set to "true", the fields mentioned in the metadata section will be added to each message. Adding metadata will have a performance impact on systems with high throughput. By default this parameter is set to "false".

Examples

This example will read the `/var/log/system.log` file and create a message for each new entry.

FileIn:
  Type: consumer.File
  File: /var/log/system.log
  DefaultOffset: newest
  OffsetFile: ""
  Delimiter: "\n"
  ObserveMode: poll
  PollingDelay: 100

func (*File) Configure

func (cons *File) Configure(conf core.PluginConfigReader)

Configure initializes this consumer with values from a plugin config.

func (*File) Consume

func (cons *File) Consume(workers *sync.WaitGroup)

Consume listens to stdin.

func (*File) Enqueue added in v0.5.0

func (cons *File) Enqueue(data []byte)

Enqueue creates a new message

type HTTP added in v0.5.0

type HTTP struct {
	core.SimpleConsumer `gollumdoc:"embed_type"`
	// contains filtered or unexported fields
}

HTTP consumer plugin

This consumer opens up an HTTP 1.1 server and processes the contents of any incoming HTTP request.

Parameters

- Address: Defines the TCP port and optional IP address to listen on. Sets http.Server.Addr; for defails, see its Go documentation.

Syntax: [hostname|address]:<port>

- ReadTimeoutSec: Defines the maximum duration in seconds before timing out the HTTP read request. Sets http.Server.ReadTimeout; for details, see its Go documentation.

- WithHeaders: If true, relays the complete HTTP request to the generated Gollum message. If false, relays only the HTTP request body and ignores headers.

- Htpasswd: Path to an htpasswd-formatted password file. If defined, turns on HTTP Basic Authentication in the server.

- BasicRealm: Defines the Authentication Realm for HTTP Basic Authentication. Meaningful only in conjunction with Htpasswd.

- Certificate: Path to an X509 formatted certificate file. If defined, turns on SSL/TLS support in the HTTP server. Requires PrivateKey to be set.

- PrivateKey: Path to an X509 formatted private key file. Meaningful only in conjunction with Certificate.

Examples

This example listens on port 9090 and writes to the stream "http_in_00".

"HttpIn00":
  Type: "consumer.HTTP"
  Streams: "http_in_00"
  Address: "localhost:9090"
  WithHeaders: false

func (*HTTP) Configure added in v0.5.0

func (cons *HTTP) Configure(conf core.PluginConfigReader)

Configure initializes this consumer with values from a plugin config.

func (HTTP) Consume added in v0.5.0

func (cons HTTP) Consume(workers *sync.WaitGroup)

Consume opens a new http server listen on specified ip and port (address)

type Kafka

type Kafka struct {
	core.SimpleConsumer `gollumdoc:"embed_type"`

	MaxPartitionID int32
	// contains filtered or unexported fields
}

Kafka consumer

This consumer reads data from a kafka topic. It is based on the sarama library; most settings are mapped to the settings from this library.

Metadata

*NOTE: The metadata will only set if the parameter `SetMetadata` is active.*

- topic: Contains the name of the kafka topic

- key: Contains the key of the kafka message

Parameters

- Servers: Defines the list of all kafka brokers to initially connect to when querying topic metadata. This list requires at least one borker to work and ideally contains all the brokers in the cluster. By default this parameter is set to ["localhost:9092"].

- Topic: Defines the kafka topic to read from. By default this parameter is set to "default".

- ClientId: Sets the client id used in requests by this consumer. By default this parameter is set to "gollum".

- GroupId: Sets the consumer group of this consumer. If empty, consumer groups are not used. This setting requires Kafka version >= 0.9. By default this parameter is set to "".

- Version: Defines the kafka protocol version to use. Common values are 0.8.2, 0.9.0 or 0.10.0. Values of the form "A.B" are allowed as well as "A.B.C" and "A.B.C.D". If the version given is not known, the closest possible version is chosen. If GroupId is set to a value < "0.9", "0.9.0.1" will be used. By default this parameter is set to "0.8.2".

- SetMetadata: When this value is set to "true", the fields mentioned in the metadata section will be added to each message. Adding metadata will have a performance impact on systems with high throughput. By default this parameter is set to "false".

- DefaultOffset: Defines the initial offest when starting to read the topic. Valid values are "oldest" and "newest". If OffsetFile is defined and the file exists, the DefaultOffset parameter is ignored. If GroupId is defined, this setting will only be used for the first request. By default this parameter is set to "newest".

- OffsetFile: Defines the path to a file that holds the current offset of a given partition. If the consumer is restarted, reading continues from that offset. To disable this setting, set it to "". Please note that offsets stored in the file might be outdated. In that case DefaultOffset "oldest" will be used. By default this parameter is set to "".

- FolderPermissions: Used to create the path to the offset file if necessary. By default this parameter is set to "0755".

- Ordered: Forces partitions to be read one-by-one in a round robin fashion instead of reading them all in parallel. Please note that this may restore the original ordering but does not necessarily do so. The term "ordered" refers to an ordered reading of all partitions, as opposed to reading them randomly. By default this parameter is set to false.

- MaxOpenRequests: Defines the number of simultaneous connections to a broker at a time. By default this parameter is set to 5.

- ServerTimeoutSec: Defines the time after which a connection will time out. By default this parameter is set to 30.

- MaxFetchSizeByte: Sets the maximum size of a message to fetch. Larger messages will be ignored. When set to 0 size of the messages is ignored. By default this parameter is set to 0.

- MinFetchSizeByte: Defines the minimum amout of data to fetch from Kafka per request. If less data is available the broker will wait. By default this parameter is set to 1.

- DefaultFetchSizeByte: Defines the average amout of data to fetch per request. This value must be greater than 0. By default this parameter is set to 32768.

- FetchTimeoutMs: Defines the time in milliseconds to wait on reaching MinFetchSizeByte before fetching new data regardless of size. By default this parameter is set to 250.

- MessageBufferCount: Sets the internal channel size for the kafka client. By default this parameter is set to 8192.

- PresistTimoutMs: Defines the interval in milliseconds in which data is written to the OffsetFile. A short duration reduces the amount of duplicate messages after a crash but increases I/O. When using GroupId this setting controls the pause time after receiving errors. By default this parameter is set to 5000.

- ElectRetries: Defines how many times to retry fetching the new master partition during a leader election. By default this parameter is set to 3.

- ElectTimeoutMs: Defines the number of milliseconds to wait for the cluster to elect a new leader. By default this parameter is set to 250.

- MetadataRefreshMs: Defines the interval in milliseconds used for fetching kafka metadata from the cluster (e.g. number of partitons). By default this parameter is set to 10000.

- TlsEnable: Defines whether to use TLS based authentication when communicating with brokers. By default this parameter is set to false.

- TlsKeyLocation: Defines the path to the client's PEM-formatted private key used for TLS based authentication. By default this parameter is set to "".

- TlsCertificateLocation: Defines the path to the client's PEM-formatted public key used for TLS based authentication. By default this parameter is set to "".

- TlsCaLocation: Defines the path to the CA certificate(s) for verifying a broker's key when using TLS based authentication. By default this parameter is set to "".

- TlsServerName: Defines the expected hostname used by hostname verification when using TlsInsecureSkipVerify. By default this parameter is set to "".

- TlsInsecureSkipVerify: Enables verification of the server's certificate chain and host name. By default this parameter is set to false.

- SaslEnable:Defines whether to use SASL based authentication when communicating with brokers. By default this parameter is set to false.

- SaslUsername: Defines the username for SASL/PLAIN authentication. By default this parameter is set to "gollum".

- SaslPassword: Defines the password for SASL/PLAIN authentication. By default this parameter is set to "".

Examples

This config reads the topic "logs" from a cluster with 4 brokers.

kafkaIn:
  Type: consumer.Kafka
  Streams: logs
  Topic: logs
  ClientId: "gollum log reader"
  DefaultOffset: newest
  OffsetFile: /var/gollum/logs.offset
  Servers:
    - "kafka0:9092"
    - "kafka1:9092"
    - "kafka2:9092"
    - "kafka3:9092"

func (*Kafka) Configure

func (cons *Kafka) Configure(conf core.PluginConfigReader)

Configure initializes this consumer with values from a plugin config.

func (*Kafka) Consume

func (cons *Kafka) Consume(workers *sync.WaitGroup)

Consume starts a kafka consumer per partition for this topic

type Profiler

type Profiler struct {
	core.SimpleConsumer `gollumdoc:"embed_type"`
	// contains filtered or unexported fields
}

Profiler consumer plugin

The "Profiler" consumer plugin autogenerates messages in user-defined quantity, size and density. It can be used to profile producers and configurations and to provide a message source for testing.

Before startup, [TemplateCount] template payloads are generated based on the format specifier [Message], using characters from [Characters]. The length of each template is determined by format size specifier(s) in [Message].

During execution, [Batches] batches of [Runs] messages are generated, with a [DelayMs] ms delay between each message. Each message's payload is randomly selected from the set of template payloads above.

Parameters

- Runs: Defines the number of messages per batch.

- Batches: Defines the number of batches to generate.

- TemplateCount: Defines the number of message templates to generate. Templates are generated in advance and a random message template is chosen from this set every time a message is sent.

- Characters: Defines the set of characters use when generated templates.

- Message: Defines a go format string to use for generating the message templaets. The length of the values generated will be deduced from the format size parameter - "%200d" will generate a digit between 0 and 200, "%10s" will generate a string with 10 characters, etc.

- DelayMs: Defines the number of milliseconds to sleep between messages.

- KeepRunning: If set to true, shuts down Gollum after Batches * Runs messages have been generated. This can be used to e.g. read metrics after a profile run.

Examples

Generate a short message every 0.5s, useful for testing and debugging

JunkGenerator:
  Type: "consumer.Profiler"
  Message: "%20s"
  Streams: "junkstream"
  Characters: "abcdefghijklmZ"
  KeepRunning: true
  Runs: 10000
  Batches: 3000000
  DelayMs: 500

func (*Profiler) Configure

func (cons *Profiler) Configure(conf core.PluginConfigReader)

Configure initializes this consumer with values from a plugin config.

func (*Profiler) Consume

func (cons *Profiler) Consume(workers *sync.WaitGroup)

Consume starts a profile run and exits gollum when done

type Proxy

type Proxy struct {
	core.SimpleConsumer `gollumdoc:"embed_type"`
	// contains filtered or unexported fields
}

Proxy consumer

This consumer reads messages from a given socket like consumer.Socket but allows reverse communication, too. Producers which require this kind of communication can access message.GetSource to write data back to the client sending the message. See producer.Proxy as an example target producer.

Parameters

- Address: Defines the protocol, host and port or the unix domain socket to listen to. This can either be any ip address and port like "localhost:5880" or a file like "unix:///var/gollum.socket". Only unix and tcp protocols are supported. By default this parameter is set to ":5880".

- Partitioner: Defines the algorithm used to read messages from the router. The messages will be sent as a whole, no cropping or removal will take place. By default this parameter is set to "delimiter".

  • delimiter: Separates messages by looking for a delimiter string. The delimiter is removed from the message.

  • ascii: Reads an ASCII number at a given offset until a given delimiter is found. Everything to the left of and including the delimiter is removed from the message.

  • binary: reads a binary number at a given offset and size. The number is removed from the message.

  • binary_le: is an alias for "binary".

  • binary_be: acts like "binary"_le but uses big endian encoding.

  • fixed: assumes fixed size messages.

- Delimiter: Defines the delimiter string used to separate messages if partitioner is set to "delimiter" or the string used to separate the message length if partitioner is set to "ascii". By default this parameter is set to "\n".

- Offset: Defines an offset in bytes used to read the length provided for partitioner "binary" and "ascii". By default this parameter is set to 0.

- Size: Defines the size of the length prefix used by partitioner "binary" or the message total size when using partitioner "fixed". When using partitioner "binary" this parameter can be set to 1,2,4 or 8 when using uint8,uint16,uint32 or uint64 length prefixes. By default this parameter is set to 4.

Examples

This example will accepts 64bit length encoded data on TCP port 5880.

proxyReceive:
  Type: consumer.Proxy
  Streams: proxyData
  Address: ":5880"
  Partitioner: binary
  Size: 8

func (*Proxy) Configure

func (cons *Proxy) Configure(conf core.PluginConfigReader)

Configure initializes this consumer with values from a plugin config.

func (*Proxy) Consume

func (cons *Proxy) Consume(workers *sync.WaitGroup)

Consume listens to a given socket.

type Socket

type Socket struct {
	sync.Mutex
	core.SimpleConsumer `gollumdoc:"embed_type"`
	// contains filtered or unexported fields
}

Socket consumer plugin

The socket consumer reads messages as-is from a given network or filesystem socket. Messages are separated from the stream by using a specific partitioner method.

Parameters

- Address: This value defines the protocol, host and port or socket to bind to. This can either be any ip address and port like "localhost:5880" or a file like "unix:///var/gollum.socket". Valid protocols can be derived from the golang net package documentation. Common values are "udp", "tcp" and "unix". By default this parameter is set to "tcp://0.0.0.0:5880".

- Permissions: This value sets the filesystem permissions for UNIX domain sockets as a four-digit octal number. By default this parameter is set to "0770".

- Acknowledge: This value can be set to a non-empty value to inform the writer that data has been accepted. On success, the given string is sent. Any error will close the connection. Acknowledge does not work with UDP based sockets. By default this parameter is set to "".

- Partitioner: This value defines the algorithm used to read messages from the router. By default this is set to "delimiter". The following options are available:

  • "delimiter": Separates messages by looking for a delimiter string. The delimiter is removed from the message.
  • "ascii": Reads an ASCII number at a given offset until a given delimiter is found. Everything to the right of and including the delimiter is removed from the message.
  • "binary": Reads a binary number at a given offset and size.
  • "binary_le": An alias for "binary".
  • "binary_be": The same as "binary" but uses big endian encoding.
  • "fixed": Assumes fixed size messages.

- Delimiter: This value defines the delimiter used by the text and delimiter partitioners. By default this parameter is set to "\n".

- Offset: This value defines the offset used by the binary and text partitioners. This setting is ignored by the fixed partitioner. By default this parameter is set to "0".

- Size: This value defines the size in bytes used by the binary and fixed partitioners. For binary, this can be set to 1,2,4 or 8. The default value is 4. For fixed , this defines the size of a message. By default this parameter is set to "1".

- ReconnectAfterSec: This value defines the number of seconds to wait before a connection is retried. By default this parameter is set to "2".

- AckTimeoutSec: This value defines the number of seconds to wait for acknowledges to succeed. By default this parameter is set to "1".

- ReadTimeoutSec: This value defines the number of seconds to wait for data to be received. This setting affects the maximum shutdown duration of this consumer. By default this parameter is set to "2".

- RemoveOldSocket: If set to true, any existing file with the same name as the socket (unix://<path>) is removed prior to connecting. By default this parameter is set to "true".

Examples

This example open a socket and expect messages with a fixed length of 256 bytes:

socketIn:
  Type: consumer.Socket
  Address: unix:///var/gollum.socket
  Partitioner: fixed
  Size: 256

func (*Socket) Configure

func (cons *Socket) Configure(conf core.PluginConfigReader)

Configure initializes this consumer with values from a plugin config.

func (*Socket) Consume

func (cons *Socket) Consume(workers *sync.WaitGroup)

Consume listens to a given socket.

type Syslogd

type Syslogd struct {
	core.SimpleConsumer `gollumdoc:"embed_type"`
	// contains filtered or unexported fields
}

Syslogd consumer plugin

The syslogd consumer creates a syslogd-compatible log server and receives messages on a TCP or UDP port or a UNIX filesystem socket.

Parameters

- Address: Defines the IP address or UNIX socket to listen to. This can take one of the four forms below, to listen on a TCP, UDP or UNIX domain socket. However, see the "Format" option for details on transport support by different formats. * [hostname|ip]:<tcp-port> * tcp://<hostname|ip>:<tcp-port> * udp://<hostname|ip>:<udp-port> * unix://<filesystem-path> By default this parameter is set to "udp://0.0.0.0:514"

- Format: Defines which syslog standard the server will support. Three standards, listed below, are currently available. All standards support listening to UDP and UNIX domain sockets. RFC6587 additionally supports TCP sockets. Default: "RFC6587". * RFC3164 (https://tools.ietf.org/html/rfc3164) - unix, udp * RFC5424 (https://tools.ietf.org/html/rfc5424) - unix, udp * RFC6587 (https://tools.ietf.org/html/rfc6587) - unix, upd, tcp By default this parameter is set to "RFC6587".

- SetMetadata: When set to true, syslog based metadata will be attached to the message. The metadata fields added depend on the protocol version used. RFC3164 supports: tag, timestamp, hostname, priority, facility, severity. RFC5424 and RFC6587 support: app_name, version, proc_id , msg_id, timestamp, hostname, priority, facility, severity. By default this parameter is set to "false".

- TimestampFormat: When using SetMetadata this string denotes the go time format used to convert syslog timestamps into strings. By default this parameter is set to "2006-01-02T15:04:05.000 MST".

Examples

Replace the system's standard syslogd with Gollum

SyslogdSocketConsumer:
  Type: consumer.Syslogd
  Streams: "system_syslog"
  Address: "unix:///dev/log"
  Format: "RFC3164"

Listen on a TCP socket

SyslogdTCPSocketConsumer:
  Type: consumer.Syslogd
  Streams: "tcp_syslog"
  Address: "tcp://0.0.0.0:5599"
  Format: "RFC6587"

func (*Syslogd) Configure

func (cons *Syslogd) Configure(conf core.PluginConfigReader)

Configure initializes this consumer with values from a plugin config.

func (*Syslogd) Consume

func (cons *Syslogd) Consume(workers *sync.WaitGroup)

Consume opens a new syslog socket. Messages are expected to be separated by \n.

func (*Syslogd) Handle

func (cons *Syslogd) Handle(parts format.LogParts, code int64, err error)

Handle implements the syslog handle interface

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL