stream_aggregator


README

heka-stream-aggregator

Stream aggregation plugin for Mozilla Heka

StreamAggregatorFilter

This filter aggregates multiple payloads into a single message. Each payload is first run through the configured encoder, and the encoded payloads are separated by the delimiter config value.

Config:

  • stream_aggregator_tag: Optional tag for identifying the new pack downstream, since any Fields previously held on the original messages are lost. This setting creates a new Heka Field named "StreamAggregatorTag" whose value is set to this option. Defaults to "aggregated".

  • flush_interval: Interval, in milliseconds, at which accumulated payloads are flushed. Defaults to 1000 (one second). (A sketch of the flush logic follows the first example below.)

  • flush_bytes: Number of accumulated payload bytes that triggers a flush. (The config struct below also exposes flush_count, which flushes after a number of messages and defaults to 10.)

  • encoder: Runs each payload through the specified encoder prior to aggregation. (Required.)

Example:

[filter_stream_aggregator]
type = "StreamAggregatorFilter"
message_matcher = "Fields[decoded] == 'True'"
stream_aggregator_tag = "aggregated"
flush_interval = 30000
flush_bytes = 1000000
encoder = "encoder_json"
delimiter = "\n" # Default
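To make the interplay of flush_interval and flush_bytes concrete, here is a minimal, hypothetical Go sketch of the flow described above. The names (aggregator, add, flush) are illustrative, not taken from the plugin source: encoded payloads accumulate in a buffer with the delimiter appended, and the buffer is emitted as one message when it crosses the byte threshold or when the interval timer fires.

package main

import (
	"bytes"
	"fmt"
	"time"
)

// aggregator is a hypothetical stand-in for the filter's internal buffer.
type aggregator struct {
	buf        bytes.Buffer
	delimiter  string
	flushBytes int
}

// add appends one already-encoded payload plus the delimiter and reports
// whether the byte threshold (flush_bytes) has been crossed.
func (a *aggregator) add(payload []byte) bool {
	a.buf.Write(payload)
	a.buf.WriteString(a.delimiter)
	return a.buf.Len() >= a.flushBytes
}

// flush emits the accumulated stream as a single message and resets the buffer.
func (a *aggregator) flush() {
	if a.buf.Len() == 0 {
		return
	}
	fmt.Printf("inject aggregated message (%d bytes)\n", a.buf.Len())
	a.buf.Reset()
}

func main() {
	a := &aggregator{delimiter: "\n", flushBytes: 1000000} // flush_bytes
	ticker := time.NewTicker(30 * time.Second)             // flush_interval = 30000 ms
	defer ticker.Stop()

	payloads := make(chan []byte, 2)
	payloads <- []byte(`{"msg":"a"}`)
	payloads <- []byte(`{"msg":"b"}`)
	close(payloads)

	for {
		select {
		case p, ok := <-payloads:
			if !ok { // input done: flush whatever is left and stop
				a.flush()
				return
			}
			if a.add(p) { // byte threshold crossed
				a.flush()
			}
		case <-ticker.C: // flush_interval elapsed
			a.flush()
		}
	}
}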

Example 2, for bulk Elasticsearch (ES) inserts:

[filter_stream_aggregator]
type = "StreamAggregatorFilter"
message_matcher = "Fields[decoded] == 'True'"
stream_aggregator_tag = "aggregated"
flush_interval = 30000
flush_bytes = 1000000
encoder = "ESLogstashV0Encoder"
delimiter = ""

[ESLogstashV0Encoder]
index = "logstash-%{program}-%{2006.01.02}"
type_name = "%{program}"
es_index_from_timestamp = true
id = "%{id}"

HttpOutput for shipping the aggregated bulk payload to Elasticsearch:

[HttpOutput]
message_matcher = "Fields[StreamAggregatorTag] == 'aggregated'"
address = "http://es01.foo.bar:9200/_bulk"
encoder = "encoder_payload"

StreamSplitterDecoder

Used inside a MultiDecoder to split a payload generated by a StreamAggregatorFilter back into individual messages.

Example:

[multi_decoder]
type = "MultiDecoder"
order = ['zlib_decoder', 'split_decoder', 'json_decoder']

[multi_decoder.subs.zlib_decoder]
type = "ZlibDecoder"

[multi_decoder.subs.split_decoder]
type = "StreamSplitterDecoder"
delimiter = "\n"

[multi_decoder.subs.json_decoder]
type = "SandboxDecoder"
script_type = "lua"
filename = "/usr/share/heka/lua_decoders/json_decoder.lua"
preserve_data = true
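For reference, the splitting step amounts to something like the following minimal sketch. The splitStream name is hypothetical, and the real decoder works on PipelinePacks rather than strings: the aggregated payload is cut on the delimiter, and each segment becomes its own pack downstream.

package main

import (
	"fmt"
	"strings"
)

// splitStream cuts the aggregated payload on the delimiter and drops the
// empty trailing segment left by a trailing delimiter.
func splitStream(payload, delimiter string) []string {
	var out []string
	for _, s := range strings.Split(payload, delimiter) {
		if s != "" {
			out = append(out, s)
		}
	}
	return out
}

func main() {
	aggregated := "{\"a\":1}\n{\"b\":2}\n" // what a StreamAggregatorFilter emits
	for i, segment := range splitStream(aggregated, "\n") {
		fmt.Printf("pack %d: %s\n", i, segment) // each segment becomes its own pack
	}
}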

To Build

See the Heka documentation on Building hekad with External Plugins for how to compile external plugins into hekad.

Edit the cmake/plugin_loader.cmake file and add:

add_external_plugin(git https://github.com/michaelgibson/heka-stream-aggregator master)

Build Heka: . ./build.sh

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type MsgPack

type MsgPack struct {
	// contains filtered or unexported fields
}

type StreamAggregatorBatch

type StreamAggregatorBatch struct {
	// contains filtered or unexported fields
}

type StreamAggregatorFilter

type StreamAggregatorFilter struct {
	*StreamAggregatorFilterConfig
	// contains filtered or unexported fields
}

func (*StreamAggregatorFilter) CleanUp

func (f *StreamAggregatorFilter) CleanUp()

func (*StreamAggregatorFilter) ConfigStruct

func (f *StreamAggregatorFilter) ConfigStruct() interface{}

func (*StreamAggregatorFilter) Init

func (f *StreamAggregatorFilter) Init(config interface{}) (err error)

func (*StreamAggregatorFilter) Prepare

func (f *StreamAggregatorFilter) Prepare(fr FilterRunner, h PluginHelper) error

func (*StreamAggregatorFilter) ProcessMessage

func (f *StreamAggregatorFilter) ProcessMessage(pack *PipelinePack) error

type StreamAggregatorFilterConfig

type StreamAggregatorFilterConfig struct {
	// Delimiter appended to each encoded payload so the stream can be
	// split apart again when decoding later. Defaults to "\n".
	Delimiter     string `toml:"delimiter"`
	FlushInterval uint32 `toml:"flush_interval"`
	// Number of accumulated bytes that triggers a flush.
	FlushBytes int `toml:"flush_bytes"`
	// Number of messages that triggers a flush (defaults to 10).
	FlushCount          int    `toml:"flush_count"`
	StreamAggregatorTag string `toml:"stream_aggregator_tag"`
	EncoderName         string `toml:"encoder"`
	UseBuffering        bool   `toml:"use_buffering"`
}
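Given the defaults documented in the README, ConfigStruct plausibly fills this struct in along the following lines. This is a hedged sketch of the documented defaults, not the actual source; flush_bytes is omitted because its default is ambiguous in the docs.

// Hypothetical sketch only: field values reflect the documented defaults.
func (f *StreamAggregatorFilter) ConfigStruct() interface{} {
	return &StreamAggregatorFilterConfig{
		Delimiter:           "\n",         // README: defaults to "\n"
		FlushInterval:       1000,         // README: 1000 ms (one second)
		FlushCount:          10,           // struct comment: defaults to 10
		StreamAggregatorTag: "aggregated", // README: defaults to "aggregated"
	}
}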

type StreamSplitterDecoder

type StreamSplitterDecoder struct {
	*StreamSplitterDecoderConfig
	// contains filtered or unexported fields
}

func (*StreamSplitterDecoder) ConfigStruct

func (ld *StreamSplitterDecoder) ConfigStruct() interface{}

func (*StreamSplitterDecoder) Decode

func (ld *StreamSplitterDecoder) Decode(pack *PipelinePack) (packs []*PipelinePack, err error)

Splits the message payload on the configured delimiter and returns a new pack for each resulting segment.

func (*StreamSplitterDecoder) Init

func (ld *StreamSplitterDecoder) Init(config interface{}) (err error)

func (*StreamSplitterDecoder) SetDecoderRunner

func (ld *StreamSplitterDecoder) SetDecoderRunner(dr DecoderRunner)

Heka will call this to give us access to the runner.

type StreamSplitterDecoderConfig

type StreamSplitterDecoderConfig struct {
	// Delimiter to split the aggregated payload on. Should match the
	// delimiter used by the StreamAggregatorFilter that produced the stream.
	Delimiter string `toml:"delimiter"`
}
