core

package
v0.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 26, 2020 License: Apache-2.0 Imports: 29 Imported by: 0

Documentation

Index

Constants

View Source
const CfgRoot = "pipeline."

CfgRoot is the configuration root path.

Variables

This section is empty.

Functions

func AssertEqual

func AssertEqual(t *testing.T, expected, actual interface{})

func AssertEqualInt64

func AssertEqualInt64(t *testing.T, expected, actual int64)

func CfgAuthOpts

func CfgAuthOpts(cfg *viper.Viper) *shttp.AuthenticationOpts

CfgAuthOpts creates the auth options from configuration

func ConfigFromJSON

func ConfigFromJSON(t *testing.T, content []byte) *viper.Viper

func Main

func Main(defaultCfgFile string)

Main is the entry point for exporters

func NewAccountNone

func NewAccountNone(cfg *viper.Viper) (interface{}, error)

NewAccountNone creates a new accounter

func NewClassifySubnet

func NewClassifySubnet(cfg *viper.Viper) (interface{}, error)

NewClassifySubnet returns a new classifier based on cluster net masks from the global configuration

func NewClassifySubnetFromList

func NewClassifySubnetFromList(clusterNetMasks []string) (interface{}, error)

NewClassifySubnetFromList returns a new classifier based on the given cluster net masks

func NewCompressGzip

func NewCompressGzip(cfg *viper.Viper) (interface{}, error)

NewCompressGzip creates a gzip compressor object

func NewCompressNone

func NewCompressNone(cfg *viper.Viper) (interface{}, error)

NewCompressNone creates a compressor object

func NewEncodeCSV

func NewEncodeCSV(cfg *viper.Viper) (interface{}, error)

NewEncodeCSV creates an encode object

func NewEncodeJSON

func NewEncodeJSON(cfg *viper.Viper) (interface{}, error)

NewEncodeJSON creates an encode object

func NewFilterSubnet

func NewFilterSubnet(cfg *viper.Viper) (interface{}, error)

NewFilterSubnet returns a new filter based on config

func NewMangleNone

func NewMangleNone(cfg *viper.Viper) (interface{}, error)

NewMangleNone creates a new mangler

func NewStoreBuffered

func NewStoreBuffered(cfg *viper.Viper) (interface{}, error)

NewStoreBuffered returns a new storage interface for storing flows to object store

func NewStoreDirect

func NewStoreDirect(cfg *viper.Viper) (interface{}, error)

NewStoreDirect returns a new storage interface for storing flows to object store

func NewStoreHeaderNone

func NewStoreHeaderNone(cfg *viper.Viper) (interface{}, error)

NewStoreHeaderNone creates a new store header

func NewSubscriber

func NewSubscriber(pipeline *Pipeline, cfg *viper.Viper) (*websocket.StructSpeaker, error)

NewSubscriber returns a new flow subscriber writing to object store

func NewTransformNone

func NewTransformNone(cfg *viper.Viper) (interface{}, error)

NewTransformNone creates a new transformer

func NewWriteS3

func NewWriteS3(cfg *viper.Viper) (interface{}, error)

NewWriteS3 creates a new S3-compatible object storage client

func NewWriteStdout

func NewWriteStdout(cfg *viper.Viper) (interface{}, error)

NewWriteStdout returns a new writer for writing flows to stdout

func SubscriberRun

func SubscriberRun(s *websocket.StructSpeaker)

SubscriberRun runs the subscriber under main

Types

type Accounter

type Accounter interface {
	Reset()
	Add(bytes int64)
}

Accounter helps in tracking how many bytes were exported

type Classifier

type Classifier interface {
	GetFlowTag(fl *flow.Flow) Tag
}

Classifier exposes the interface for tag based classification

type Compressor

type Compressor interface {
	Compress(b []byte) (*bytes.Buffer, error)
}

Compressor exposes the interface for compressing encoded flows

type EncodeCSV

type EncodeCSV struct {
}

EncodeCSV encoder encodes flows as CSV rows

func (*EncodeCSV) Encode

func (e *EncodeCSV) Encode(in interface{}) ([]byte, error)

Encode implements Encoder interface

type EncodeJSON

type EncodeJSON struct {
	// contains filtered or unexported fields
}

EncodeJSON encoder encodes flows as a JSON array

func (*EncodeJSON) Encode

func (e *EncodeJSON) Encode(in interface{}) ([]byte, error)

Encode implements Encoder interface

type Encoder

type Encoder interface {
	Encode(in interface{}) ([]byte, error)
}

Encoder exposes the interface for encoding flows

type Filterer

type Filterer interface {
	IsExcluded(tag Tag) bool
}

Filterer exposes the interface for tag based filtering

type Handler

type Handler = func(cfg *viper.Viper) (interface{}, error)

Handler used for creating a phase handler from configuration

type HandlersMap

type HandlersMap map[string]Handler

HandlersMap is a map of handlers keyed by name

var (
	ManglerHandlers     HandlersMap
	TransformerHandlers HandlersMap
	ClassifierHandlers  HandlersMap
	FiltererHandlers    HandlersMap
	StoreHeaderHandlers HandlersMap
	EncoderHandlers     HandlersMap
	CompressorHandlers  HandlersMap
	StorerHandlers      HandlersMap
	AccounterHandlers   HandlersMap
	WriterHandlers      HandlersMap
)

Global set of handlers

func (HandlersMap) Init

func (m HandlersMap) Init(cfg *viper.Viper, phase string) (interface{}, error)

Init creates a resource from config

func (HandlersMap) Register

func (m HandlersMap) Register(name string, handler Handler, isDefault bool)

Register associates a handler with its label

type Mangler

type Mangler interface {
	Mangle(in []interface{}) []interface{}
}

Mangler allows changing/enriching an entire batch of flows

type Pipeline

type Pipeline struct {
	sync.Mutex

	Mangler     Mangler
	Transformer Transformer
	Classifier  Classifier
	Filterer    Filterer
	StoreHeader StoreHeader
	Encoder     Encoder
	Compressor  Compressor
	Storer      Storer
	Accounter   Accounter
	Writer      Writer
}

Pipeline manager

func NewPipeline

func NewPipeline(cfg *viper.Viper) (*Pipeline, error)

NewPipeline defines the pipeline elements

func (*Pipeline) OnStructMessage

func (p *Pipeline) OnStructMessage(c ws.Speaker, msg *ws.StructMessage)

OnStructMessage is triggered when WS server sends us a message.

type StoreHeader

type StoreHeader interface {
	AddStoreHeader(flows []interface{}, startTime time.Time, endTime time.Time) interface{}
}

StoreHeader produces the header fields in the flow collection object

type Storer

type Storer interface {
	StoreFlows(flows map[Tag][]interface{}) error
	SetPipeline(p *Pipeline)
}

Storer interface of a store object

type Tag

type Tag string

Tag represents the flow classification

type Transformer

type Transformer interface {
	Transform(f *flow.Flow) interface{}
}

Transformer allows generic transformations of a flow

type Writer

type Writer interface {
	Write(bucket, objectKey, data, contentType, contentEncoding string, metadata map[string]*string) error
}

Writer allows uploading objects to an object storage service

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL