native

package
v0.4.5
Published: Apr 3, 2017 License: Apache-2.0 Imports: 19 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type KafkaProducer added in v0.4.3

type KafkaProducer struct {
	core.ProducerBase
	// contains filtered or unexported fields
}

KafkaProducer is a librdkafka-based producer plugin.

NOTICE: This producer is not included in standard builds. To enable it you need to trigger a custom build with native plugins enabled.

The kafka producer writes messages to a kafka cluster. This producer is backed by the native librdkafka (0.8.6) library, so most settings relate to that library. This producer does not implement a fuse breaker.

Configuration example

  • "native.KafkaProducer": ClientId: "weblog" RequiredAcks: 1 TimeoutMs: 1500 SendRetries: 0 Compression: "none" BatchSizeMaxKB: 1024 BatchMaxMessages: 100000 BatchMinMessages: 1000 BatchTimeoutMs: 1000 ServerTimeoutSec: 60 ServerMaxFails: 3 MetadataTimeoutMs: 1500 MetadataRefreshMs: 300000 SecurityProtocol: "plaintext" SslCipherSuites: "" SslKeyLocation: "" SslKeyPassword: "" SslCertificateLocation: "" SslCaLocation: "" SslCrlLocation: "" SaslMechanism: "" SaslUsername: "" SaslPassword: "" KeyFormatter: "" Servers:
  • "localhost:9092" Topic: "console" : "console"

SendRetries is mapped to message.send.max.retries. This defines the number of times librdkafka will try to re-send a message if it did not succeed. Set to 0 by default (don't retry).

Compression is mapped to compression.codec. Please note that "zip" has to be used instead of "gzip". Possible values are "none", "zip" and "snappy". By default this is set to "none".

TimeoutMs is mapped to request.timeout.ms. This defines the number of milliseconds to wait until a request is marked as failed. By default this is set to 1.5 seconds (1500 ms).

BatchSizeMaxKB is mapped to message.max.bytes (x1024). This defines the maximum message size in KB; the configured value is multiplied by 1024 to derive message.max.bytes, so the default of 1024 KB corresponds to 1 MB. Messages above this size are rejected.

BatchMaxMessages is mapped to queue.buffering.max.messages. This defines the maximum number of messages that can be pending at any given time. If this limit is hit, additional messages will be rejected. This value is set to 100,000 by default and should be adjusted according to your average message throughput.

BatchMinMessages is mapped to batch.num.messages. This defines the minimum number of messages required for a batch to be sent. This is set to 1000 by default and should be significantly lower than BatchMaxMessages to avoid messages being rejected.

BatchTimeoutMs is mapped to queue.buffering.max.ms. This defines the number of milliseconds to wait until a batch is flushed to kafka. Set to 1 second (1000 ms) by default.

ServerTimeoutSec is mapped to socket.timeout.ms. Defines the time in seconds after which a server is considered "not reachable". Set to 1 minute by default.

ServerMaxFails is mapped to socket.max.fails. Defines the number of failed requests after which a server is marked as "failing". Set to 3 by default.

MetadataTimeoutMs is mapped to metadata.request.timeout.ms. Number of milliseconds a metadata request may take before it is considered failed. Set to 1.5 seconds by default.

MetadataRefreshMs is mapped to topic.metadata.refresh.interval.ms. Interval in milliseconds for querying metadata. Set to 5 minutes by default.

SecurityProtocol is mapped to security.protocol. Protocol used to communicate with brokers. Set to plaintext by default.

SslCipherSuites is mapped to ssl.cipher.suites. Cipher suites to use when connecting via TLS/SSL. Not set by default.

SslKeyLocation is mapped to ssl.key.location. Path to the client's private key (PEM) used for authentication. Not set by default.

SslKeyPassword is mapped to ssl.key.password. Private key passphrase. Not set by default.

SslCertificateLocation is mapped to ssl.certificate.location. Path to client's public key (PEM) used for authentication. Not set by default.

SslCaLocation is mapped to ssl.ca.location. File or directory path to CA certificate(s) for verifying the broker's key. Not set by default.

SslCrlLocation is mapped to ssl.crl.location. Path to CRL for verifying broker's certificate validity. Not set by default.

SaslMechanism is mapped to sasl.mechanisms. SASL mechanism to use for authentication. Not set by default.

SaslUsername is mapped to sasl.username. SASL username for use with the PLAIN mechanism. Not set by default.

SaslPassword is mapped to sasl.password. SASL password for use with the PLAIN mechanism. Not set by default.
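
Putting the transport-security settings together, a hedged sketch of a TLS- and SASL-enabled configuration might look like this; the broker address, file paths and credentials below are placeholders, not values taken from this documentation:

  - "native.KafkaProducer":
      SecurityProtocol: "sasl_ssl"                        # TLS transport plus SASL authentication
      SslCaLocation: "/etc/ssl/certs/kafka-ca.pem"        # placeholder path to the CA certificate
      SslCertificateLocation: "/etc/ssl/certs/client.pem" # placeholder path to the client certificate
      SslKeyLocation: "/etc/ssl/private/client.key"       # placeholder path to the client key
      SaslMechanism: "PLAIN"
      SaslUsername: "producer-user"                       # placeholder credential
      SaslPassword: "secret"                              # placeholder credential
      Servers:
        - "broker1.example.com:9093"                      # placeholder broker address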

Servers defines the list of brokers to produce messages to.

Topic defines a stream to topic mapping. If a stream is not mapped, a topic with the same name as the stream is assumed.

KeyFormatter can define a formatter that extracts the key for a kafka message from the message payload. By default this is an empty string, which disables this feature. A good formatter for this is format.Identifier.

KeyFormatterFirst can be set to true to apply the key formatter to the unformatted message. By default this is set to false, so that the key formatter uses the message after Formatter has been applied. KeyFormatter never affects the payload of the message sent to kafka.

FilterAfterFormat behaves like Filter but allows filters to be executed after the formatter has run. By default no such filter is set.
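
To make the stream-to-topic mapping and the key formatter settings concrete, here is a minimal hedged sketch; the stream and topic names are made up for illustration:

  - "native.KafkaProducer":
      Servers:
        - "localhost:9092"
      Topic:
        "accesslog": "weblog-topic"      # stream "accesslog" is written to topic "weblog-topic"
        "errorlog": "errors"             # stream "errorlog" is written to topic "errors"
      KeyFormatter: "format.Identifier"  # derive the kafka message key from the payload
      KeyFormatterFirst: false           # extract the key after Formatter has been applied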

func (*KafkaProducer) Configure added in v0.4.3

func (prod *KafkaProducer) Configure(conf core.PluginConfig) error

Configure initializes this producer with values from a plugin config.

func (*KafkaProducer) OnMessageDelivered added in v0.4.3

func (prod *KafkaProducer) OnMessageDelivered(userdata []byte)

OnMessageDelivered gets called by librdkafka on message delivery success.

func (*KafkaProducer) OnMessageError added in v0.4.3

func (prod *KafkaProducer) OnMessageError(reason string, userdata []byte)

OnMessageError gets called by librdkafka on message delivery failure.

func (*KafkaProducer) Produce added in v0.4.3

func (prod *KafkaProducer) Produce(workers *sync.WaitGroup)

Produce starts the producer and writes incoming messages to the configured kafka cluster.

type PcapHTTPConsumer

type PcapHTTPConsumer struct {
	core.ConsumerBase
	// contains filtered or unexported fields
}

PcapHTTPConsumer is a libpcap-based consumer plugin.

NOTICE: This consumer is not included in standard builds. To enable it you need to trigger a custom build with native plugins enabled.

This plugin utilizes libpcap to listen for network traffic and reassemble HTTP requests from it. As it uses a CGO-based library it will break cross-platform builds (i.e. you will have to compile it on the correct platform).

Configuration example

  • "native.PcapHTTPConsumer": Enable: true Interface: eth0 Filter: "dst port 80 and dst host 127.0.0.1" Promiscuous: true TimeoutMs: 3000

Interface defines the network interface to listen on. By default this is set to eth0; get your specific value from ifconfig.

Filter defines a libpcap filter for the incoming packets. You can filter for specific ports, protocols, IPs, etc. The documentation can be found here: http://www.tcpdump.org/manpages/pcap-filter.7.txt (manpage). By default this is set to listen on port 80 for localhost packets.
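
For orientation, a few additional filter expressions in that syntax, given as a hedged sketch; the ports and addresses below are illustrative and not taken from this documentation:

  # capture HTTP traffic addressed to a specific host (illustrative address)
  Filter: "dst port 80 and dst host 10.0.0.5"

  # capture TCP traffic on an alternative HTTP port (illustrative port)
  Filter: "tcp dst port 8080"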

Promiscuous switches the network interface defined by Interface into promiscuous mode. This is required if you want to listen for all packets coming from the network, even those that were not meant for the IP address bound to the interface you listen on. Enabling this can increase your CPU load. This setting is enabled by default.

TimeoutMs defines a timeout after which a TCP session is considered to have dropped, i.e. the (remaining) packets will be discarded. Every incoming packet will restart the timer for the specific client session. By default this is set to 3000, i.e. 3 seconds.

func (*PcapHTTPConsumer) Configure

func (cons *PcapHTTPConsumer) Configure(conf core.PluginConfig) error

Configure initializes this consumer with values from a plugin config.

func (*PcapHTTPConsumer) Consume

func (cons *PcapHTTPConsumer) Consume(workers *sync.WaitGroup)

Consume enables libpcap monitoring as configured.

type SystemdConsumer added in v0.4.5

type SystemdConsumer struct {
	core.ConsumerBase
	// contains filtered or unexported fields
}

SystemdConsumer is a systemd journal consumer plugin.

NOTICE: This consumer is not included in standard builds. To enable it you need to trigger a custom build with native plugins enabled.

The systemd consumer allows reading from the systemd journal. When attached to a fuse, this consumer will stop reading messages in case that fuse is burned.

Configuration example

  • "native.Systemd": SystemdUnit: "sshd.service" DefaultOffset: "Newest" OffsetFile: ""

SystemdUnit defines which systemd unit's journal entries will be followed. This uses journal.add_match with _SYSTEMD_UNIT. By default this is set to "", which disables the filter.

DefaultOffset defines where to start reading the journal. Valid values are "oldest" and "newest". If OffsetFile is defined, the DefaultOffset setting will be ignored unless the file does not exist. By default this is set to "newest".

OffsetFile defines the path to a file that stores the current offset. If the consumer is restarted that offset is used to continue reading. By default this is set to "" which disables the offset file.
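
As a hedged sketch, a configuration that persists its read position across restarts might look like the following; the unit name and file path are placeholders, not values prescribed by this documentation:

  - "native.Systemd":
      SystemdUnit: "nginx.service"                  # placeholder unit to follow
      DefaultOffset: "Oldest"                       # only used while the offset file does not exist yet
      OffsetFile: "/var/run/gollum/systemd.offset"  # placeholder path storing the current offset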

func (*SystemdConsumer) Configure added in v0.4.5

func (cons *SystemdConsumer) Configure(conf core.PluginConfig) error

Configure initializes this consumer with values from a plugin config.

func (*SystemdConsumer) Consume added in v0.4.5

func (cons *SystemdConsumer) Consume(workers *sync.WaitGroup)

Consume enables systemd forwarding as configured.
