intermediate

package
v0.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 24, 2022 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	MaxRetries    = 2
	MinExpiryTime = 100 * time.Millisecond
)

Functions

This section is empty.

Types

type AggregationElements

type AggregationElements struct {
	NonStatsElements                   []string
	StatsElements                      []string
	AggregatedSourceStatsElements      []string
	AggregatedDestinationStatsElements []string
	AntreaFlowEndSecondsElements       []string
	ThroughputElements                 []string
	SourceThroughputElements           []string
	DestinationThroughputElements      []string
}

type AggregationFlowRecord

type AggregationFlowRecord struct {
	Record entities.Record
	// Flow record contains mapping to its reference in priority queue.
	PriorityQueueItem *ItemToExpire
	// ReadyToSend is an indicator that we received all required records for the
	// given flow, i.e., records from source and destination nodes for the case
	// inter-node flow and record from the node for the case of intra-node flow.
	ReadyToSend bool
	// contains filtered or unexported fields
}

type AggregationInput

type AggregationInput struct {
	MessageChan           chan *entities.Message
	WorkerNum             int
	CorrelateFields       []string
	AggregateElements     *AggregationElements
	ActiveExpiryTimeout   time.Duration
	InactiveExpiryTimeout time.Duration
}

type AggregationProcess

type AggregationProcess struct {
	// contains filtered or unexported fields
}

func InitAggregationProcess

func InitAggregationProcess(input AggregationInput) (*AggregationProcess, error)

InitAggregationProcess takes in message channel (e.g. from collector) as input channel, workerNum(number of workers to process message), and correlateFields(fields to be correlated and filled).

func (*AggregationProcess) AggregateMsgByFlowKey

func (a *AggregationProcess) AggregateMsgByFlowKey(message *entities.Message) error

AggregateMsgByFlowKey gets flow key from records in message and stores in cache

func (*AggregationProcess) AreCorrelatedFieldsFilled

func (a *AggregationProcess) AreCorrelatedFieldsFilled(record AggregationFlowRecord) bool

func (*AggregationProcess) AreExternalFieldsFilled

func (a *AggregationProcess) AreExternalFieldsFilled(record AggregationFlowRecord) bool

func (*AggregationProcess) ForAllExpiredFlowRecordsDo

func (a *AggregationProcess) ForAllExpiredFlowRecordsDo(callback FlowKeyRecordMapCallBack) error

func (*AggregationProcess) ForAllRecordsDo

func (a *AggregationProcess) ForAllRecordsDo(callback FlowKeyRecordMapCallBack) error

ForAllRecordsDo takes in callback function to process the operations to flowkey->records pairs in the map

func (*AggregationProcess) GetExpiryFromExpirePriorityQueue

func (a *AggregationProcess) GetExpiryFromExpirePriorityQueue() time.Duration

GetExpiryFromExpirePriorityQueue returns the earliest timestamp (active expiry or inactive expiry) from expire priority queue.

func (*AggregationProcess) GetNumFlows

func (a *AggregationProcess) GetNumFlows() int64

GetNumFlows returns total number of connections/flows stored in map

func (*AggregationProcess) GetRecords

func (a *AggregationProcess) GetRecords(flowKey *FlowKey) []map[string]interface{}

GetRecords returns map format flow records given a flow key. The key of the map is the element name and the value is the IE object. Returns partially matched flow records if the flow key is not complete. Returns all the flow records if the flow key is not provided.

func (*AggregationProcess) IsAggregatedRecordIPv4

func (a *AggregationProcess) IsAggregatedRecordIPv4(record AggregationFlowRecord) bool

func (*AggregationProcess) ResetStatAndThroughputElementsInRecord

func (a *AggregationProcess) ResetStatAndThroughputElementsInRecord(record entities.Record) error

ResetStatAndThroughputElementsInRecord is called by the user after the aggregation record is sent after its expiry either by active or inactive expiry interval. This should be called by user after acquiring the mutex in the Aggregation process.

func (*AggregationProcess) SetCorrelatedFieldsFilled

func (a *AggregationProcess) SetCorrelatedFieldsFilled(record *AggregationFlowRecord, isFilled bool)

func (*AggregationProcess) SetExternalFieldsFilled

func (a *AggregationProcess) SetExternalFieldsFilled(record *AggregationFlowRecord, isFilled bool)

func (*AggregationProcess) Start

func (a *AggregationProcess) Start()

func (*AggregationProcess) Stop

func (a *AggregationProcess) Stop()

type FlowKey

type FlowKey struct {
	SourceAddress      string
	DestinationAddress string
	Protocol           uint8
	SourcePort         uint16
	DestinationPort    uint16
}

type FlowKeyRecordMapCallBack

type FlowKeyRecordMapCallBack func(key FlowKey, record *AggregationFlowRecord) error

type ItemToExpire

type ItemToExpire struct {
	// contains filtered or unexported fields
}

type TimeToExpirePriorityQueue

type TimeToExpirePriorityQueue []*ItemToExpire

func (TimeToExpirePriorityQueue) Len

func (pq TimeToExpirePriorityQueue) Len() int

func (TimeToExpirePriorityQueue) Less

func (pq TimeToExpirePriorityQueue) Less(i, j int) bool

func (TimeToExpirePriorityQueue) Peek

Peek returns the item at the beginning of the queue, without removing the item or otherwise mutating the queue. It is safe to call directly.

func (*TimeToExpirePriorityQueue) Pop

func (pq *TimeToExpirePriorityQueue) Pop() interface{}

func (*TimeToExpirePriorityQueue) Push

func (pq *TimeToExpirePriorityQueue) Push(x interface{})

func (TimeToExpirePriorityQueue) Swap

func (pq TimeToExpirePriorityQueue) Swap(i, j int)

func (*TimeToExpirePriorityQueue) Update

func (pq *TimeToExpirePriorityQueue) Update(item *ItemToExpire, flowKey *FlowKey, flowRecord *AggregationFlowRecord, activeExpireTime time.Time, inactiveExpireTime time.Time)

update modifies the priority and flow record of an Item in the queue.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL