Documentation ¶
Index ¶
- Constants
- func AssertEqual(t *testing.T, expected, actual interface{})
- func AssertEqualInt64(t *testing.T, expected, actual int64)
- func CfgAuthOpts(cfg *viper.Viper) *shttp.AuthenticationOpts
- func ConfigFromJSON(t *testing.T, content []byte) *viper.Viper
- func Main(defaultCfgFile string)
- func NewAccountNone(cfg *viper.Viper) (interface{}, error)
- func NewClassifySubnet(cfg *viper.Viper) (interface{}, error)
- func NewClassifySubnetFromList(clusterNetMasks []string) (interface{}, error)
- func NewCompressGzip(cfg *viper.Viper) (interface{}, error)
- func NewCompressNone(cfg *viper.Viper) (interface{}, error)
- func NewEncodeCSV(cfg *viper.Viper) (interface{}, error)
- func NewEncodeJSON(cfg *viper.Viper) (interface{}, error)
- func NewFilterSubnet(cfg *viper.Viper) (interface{}, error)
- func NewMangleNone(cfg *viper.Viper) (interface{}, error)
- func NewStoreBuffered(cfg *viper.Viper) (interface{}, error)
- func NewStoreDirect(cfg *viper.Viper) (interface{}, error)
- func NewStoreHeaderNone(cfg *viper.Viper) (interface{}, error)
- func NewSubscriber(pipeline *Pipeline, cfg *viper.Viper) (*websocket.StructSpeaker, error)
- func NewTransformNone(cfg *viper.Viper) (interface{}, error)
- func NewWriteS3(cfg *viper.Viper) (interface{}, error)
- func NewWriteStdout(cfg *viper.Viper) (interface{}, error)
- func SubscriberRun(s *websocket.StructSpeaker)
- type Accounter
- type Classifier
- type Compressor
- type EncodeCSV
- type EncodeJSON
- type Encoder
- type Filterer
- type Handler
- type HandlersMap
- type Mangler
- type Pipeline
- type StoreHeader
- type Storer
- type Tag
- type Transformer
- type Writer
Constants ¶
const CfgRoot = "pipeline."
CfgRoot is the configuration root path.
Variables ¶
This section is empty.
Functions ¶
func AssertEqual ¶
func AssertEqualInt64 ¶
func CfgAuthOpts ¶
func CfgAuthOpts(cfg *viper.Viper) *shttp.AuthenticationOpts
CfgAuthOpts creates the auth options from configuration
func NewAccountNone ¶
NewAccountNone creates a new accounter
func NewClassifySubnet ¶
NewClassifySubnet returns a new classifier based on cluster net masks from the global configuration
func NewClassifySubnetFromList ¶
NewClassifySubnetFromList returns a new classifier based on the given cluster net masks
func NewCompressGzip ¶
NewCompressGzip creates a gzip compressor object
func NewCompressNone ¶
NewCompressNone creates a compressor object
func NewEncodeCSV ¶
NewEncodeCSV creates an encode object
func NewEncodeJSON ¶
NewEncodeJSON creates an encode object
func NewFilterSubnet ¶
NewFilterSubnet returns a new filter based on config
func NewMangleNone ¶
NewMangleNone creates a new mangler
func NewStoreBuffered ¶
NewStoreBuffered returns a new storage interface for storing flows to object store
func NewStoreDirect ¶
NewStoreDirect returns a new storage interface for storing flows to object store
func NewStoreHeaderNone ¶
NewStoreHeaderNone creates a new store header
func NewSubscriber ¶
NewSubscriber returns a new flow subscriber writing to object store
func NewTransformNone ¶
NewTransformNone creates a new transformer
func NewWriteS3 ¶
NewWriteS3 creates a new S3-compatible object storage client
func NewWriteStdout ¶
NewWriteStdout returns a new writer that writes flows to standard output
func SubscriberRun ¶
func SubscriberRun(s *websocket.StructSpeaker)
SubscriberRun runs the subscriber under main
Types ¶
type Accounter ¶
type Accounter interface { Reset() Add(bytes int64) }
Accounter helps in tracking how many bytes were exported
type Classifier ¶
Classifier exposes the interface for tag based classification
type Compressor ¶
Compressor exposes the interface for compressing encoded flows
type EncodeJSON ¶
type EncodeJSON struct {
// contains filtered or unexported fields
}
EncodeJSON encoder encodes flows as a JSON array
func (*EncodeJSON) Encode ¶
func (e *EncodeJSON) Encode(in interface{}) ([]byte, error)
Encode implements Encoder interface
type HandlersMap ¶
HandlersMap is a map of handlers
var ( ManglerHandlers HandlersMap TransformerHandlers HandlersMap ClassifierHandlers HandlersMap FiltererHandlers HandlersMap StoreHeaderHandlers HandlersMap EncoderHandlers HandlersMap CompressorHandlers HandlersMap StorerHandlers HandlersMap AccounterHandlers HandlersMap WriterHandlers HandlersMap )
Global set of handlers
type Mangler ¶
type Mangler interface {
Mangle(in []interface{}) []interface{}
}
Mangler allows changing or enriching an entire batch of flows
type Pipeline ¶
type Pipeline struct { sync.Mutex Mangler Mangler Transformer Transformer Classifier Classifier Filterer Filterer StoreHeader StoreHeader Encoder Encoder Compressor Compressor Storer Storer Accounter Accounter Writer Writer }
Pipeline manager
func NewPipeline ¶
NewPipeline defines the pipeline elements
func (*Pipeline) OnStructMessage ¶
func (p *Pipeline) OnStructMessage(c ws.Speaker, msg *ws.StructMessage)
OnStructMessage is triggered when WS server sends us a message.
type StoreHeader ¶
type StoreHeader interface {
AddStoreHeader(flows []interface{}, startTime time.Time, endTime time.Time) interface{}
}
StoreHeader produces the header fields in the flow collection object
type Transformer ¶
Transformer allows generic transformations of a flow