action

package
v0.0.0-...-d841f61 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 11, 2021 License: AGPL-3.0 Imports: 24 Imported by: 0

Documentation

Index

Constants

View Source
// Field-name constants. Their values match the "priority" and "_id" keys used
// in the bson tags of Scenario.
// NOTE(review): presumably used to build database queries/sorts — confirm
// against callers.
const (
	PriorityField = "priority"
	IdField       = "_id"
)
View Source
// Task status codes, numbered consecutively from zero. The values are spelled
// out explicitly so that each code is visible at a glance.
// NOTE(review): presumably these are the values carried by TaskResult.Status —
// confirm against the worker pool implementation.
const (
	TaskNew        = 0
	TaskNotMatched = 1
	TaskCancelled  = 2
	TaskRpcError   = 3
)
View Source
// AbandonedDuration is the untyped constant 60.
// NOTE(review): presumably the age, in seconds, after which a scenario
// execution is treated as abandoned (cf. ScenarioExecutionStorage.GetAbandoned);
// the unit is not visible here — confirm.
const AbandonedDuration = 60
View Source
// MaxRetries is the untyped constant 5.
// NOTE(review): presumably the upper bound for ScenarioExecution.Tries —
// confirm against the code that retries abandoned executions.
const MaxRetries = 5

Variables

This section is empty.

Functions

This section is empty.

Types

type Action

// Action is one entry of Scenario.Actions: an action type, optional
// parameters, alarm and entity pattern lists, and two boolean flags. Every
// field is serialized under the same key in both BSON and JSON; Parameters is
// omitted from the output when empty. The type also declares custom decoding
// via UnmarshalJSON and UnmarshalBSONValue.
// NOTE(review): the concrete type stored in Parameters presumably depends on
// Type — confirm in the unmarshalling code.
type Action struct {
	Type                     string                    `bson:"type" json:"type"`
	Parameters               interface{}               `bson:"parameters,omitempty" json:"parameters,omitempty"` // parameters for the action
	AlarmPatterns            pattern.AlarmPatternList  `bson:"alarm_patterns" json:"alarm_patterns"`
	EntityPatterns           pattern.EntityPatternList `bson:"entity_patterns" json:"entity_patterns"`
	DropScenarioIfNotMatched bool                      `bson:"drop_scenario_if_not_matched" json:"drop_scenario_if_not_matched"`
	EmitTrigger              bool                      `bson:"emit_trigger" json:"emit_trigger"`
}

Action represents a Canopsis action on alarms.

func (*Action) UnmarshalBSONValue

func (a *Action) UnmarshalBSONValue(_ bsontype.Type, b []byte) error

func (*Action) UnmarshalJSON

func (a *Action) UnmarshalJSON(b []byte) error

type Adapter

// Adapter fetches scenarios: all of them, a single one by id, or several by a
// list of ids. Each method returns an error alongside its result.
// NOTE(review): judging by the method names only enabled scenarios are
// returned — confirm in the implementation created by NewAdapter.
type Adapter interface {
	GetEnabled() ([]Scenario, error)
	GetEnabledById(id string) (Scenario, error)
	GetEnabledByIDs(ids []string) ([]Scenario, error)
}

func NewAdapter

func NewAdapter(dbClient mongo.DbClient) Adapter

type DelayedScenario

// DelayedScenario is the JSON-serializable record handled by
// DelayedScenarioStorage. It links a scenario id to an alarm id and carries an
// execution time, a paused flag and a remaining duration.
// NOTE(review): TimeLeft is presumably only meaningful while Paused is true —
// confirm in the manager implementation.
type DelayedScenario struct {
	ID            string        `json:"id"`
	ScenarioID    string        `json:"scenario_id"`
	AlarmID       string        `json:"alarm_id"`
	ExecutionTime types.CpsTime `json:"execution_time"`
	Paused        bool          `json:"paused"`
	TimeLeft      time.Duration `json:"time_left"`
}

type DelayedScenarioManager

// DelayedScenarioManager registers a delayed scenario for an alarm, pauses and
// resumes the delayed scenarios of an alarm, and exposes a Run method that
// returns a receive-only channel of DelayedScenarioTask.
// NOTE(review): presumably a task is sent on that channel when a scenario's
// delay has elapsed — confirm in the implementation.
type DelayedScenarioManager interface {
	AddDelayedScenario(context.Context, types.Alarm, Scenario) error
	PauseDelayedScenarios(context.Context, types.Alarm) error
	ResumeDelayedScenarios(context.Context, types.Alarm) error
	Run(context.Context) (<-chan DelayedScenarioTask, error)
}

func NewDelayedScenarioManager

func NewDelayedScenarioManager(
	adapter Adapter,
	alarmAdapter libalarm.Adapter,
	storage DelayedScenarioStorage,
	periodicalTimeout time.Duration,
	logger zerolog.Logger,
) DelayedScenarioManager

type DelayedScenarioStorage

// DelayedScenarioStorage is the persistence interface for DelayedScenario
// records: add, list all, get by id, delete by id and update. Add returns a
// string, Get returns a pointer, and Delete/Update return a bool next to the
// error.
// NOTE(review): the string is presumably the id assigned to the stored
// record, Get presumably returns nil when the id is unknown, and the bool
// presumably reports whether a record was affected — confirm in the Redis
// implementation.
type DelayedScenarioStorage interface {
	Add(ctx context.Context, scenario DelayedScenario) (string, error)
	GetAll(ctx context.Context) ([]DelayedScenario, error)
	Get(ctx context.Context, id string) (*DelayedScenario, error)
	Delete(ctx context.Context, id string) (bool, error)
	Update(ctx context.Context, scenario DelayedScenario) (bool, error)
}

func NewRedisDelayedScenarioStorage

func NewRedisDelayedScenarioStorage(
	key string,
	client redis.Cmdable,
	encoder encoding.Encoder,
	decoder encoding.Decoder,
) DelayedScenarioStorage

type DelayedScenarioTask

// DelayedScenarioTask pairs an alarm with a scenario. It is the element type
// of the channel returned by DelayedScenarioManager.Run.
type DelayedScenarioTask struct {
	Alarm    types.Alarm
	Scenario Scenario
}

type ExecuteScenariosTask

// ExecuteScenariosTask is the input message of TaskManager.Run (and the
// element type of the channel passed to NewService). It carries the triggers,
// the alarm and entity concerned, and optional ids of a delayed scenario or of
// an abandoned execution.
// NOTE(review): presumably at most one of Triggers, DelayedScenarioID and
// AbandonedExecutionID is set per task — confirm against the task manager.
type ExecuteScenariosTask struct {
	Triggers             []string
	DelayedScenarioID    string
	AbandonedExecutionID string
	Entity               types.Entity
	Alarm                types.Alarm
	AckResources         bool
}

type Execution

// Execution pairs an Action with a flag telling whether it has been executed.
// It is the element type of ScenarioExecution.ActionExecutions and is
// serialized to JSON under the short keys "a" and "e".
type Execution struct {
	Action   Action `json:"a"`
	Executed bool   `json:"e"`
}

type RpcResult

// RpcResult is the message type received by TaskManager.Run on its
// rpcResultChannel. It carries a correlation id, an optional alarm pointer,
// an alarm change type, header and response maps, and an error.
// NOTE(review): Alarm is presumably nil when Error is set — confirm against
// the RPC client code.
type RpcResult struct {
	CorrelationID   string
	Alarm           *types.Alarm
	AlarmChangeType types.AlarmChangeType
	Header          map[string]string
	Response        map[string]interface{}
	Error           error
}

type Scenario

// Scenario is a named, prioritized list of actions associated with a set of
// triggers. It is serialized to BSON and JSON under identical keys; ID,
// Created and Updated are omitted when empty. Delay is a pointer and may
// therefore be absent.
// NOTE(review): a non-nil Delay presumably makes the scenario a delayed one
// (see DelayedScenarioManager) — confirm in ScenarioStorage.
type Scenario struct {
	ID                   string                  `bson:"_id,omitempty" json:"_id,omitempty"`
	Name                 string                  `bson:"name" json:"name"`
	Author               string                  `bson:"author" json:"author"`
	Enabled              bool                    `bson:"enabled" json:"enabled"`
	DisableDuringPeriods []string                `bson:"disable_during_periods" json:"disable_during_periods"`
	Triggers             []string                `bson:"triggers" json:"triggers"`
	Actions              []Action                `bson:"actions" json:"actions"`
	Priority             int                     `bson:"priority" json:"priority"`
	Delay                *types.DurationWithUnit `bson:"delay" json:"delay"`
	Created              types.CpsTime           `bson:"created,omitempty" json:"created,omitempty"`
	Updated              types.CpsTime           `bson:"updated,omitempty" json:"updated,omitempty"`
}

func (Scenario) IsTriggered

func (s Scenario) IsTriggered(triggers []string) bool

type ScenarioExecution

// ScenarioExecution is the record handled by ScenarioExecutionStorage. ID,
// ScenarioID and AlarmID are excluded from the JSON encoding (tag "-"); the
// remaining fields use short JSON keys, and Header and Response are omitted
// when empty.
// NOTE(review): the excluded ids are presumably encoded in the storage key,
// and LastUpdate is presumably a Unix timestamp compared against
// AbandonedDuration — confirm in the Redis storage implementation.
type ScenarioExecution struct {
	ID               string                 `json:"-"`
	ScenarioID       string                 `json:"-"`
	AlarmID          string                 `json:"-"`
	Entity           types.Entity           `json:"e"`
	ActionExecutions []Execution            `json:"ae"`
	LastUpdate       int64                  `json:"u"`
	AckResources     bool                   `json:"ar"`
	Tries            int64                  `json:"t"`
	Header           map[string]string      `json:"h,omitempty"`
	Response         map[string]interface{} `json:"r,omitempty"`
}

type ScenarioExecutionStorage

// ScenarioExecutionStorage is the persistence interface for ScenarioExecution
// records: get by execution id, list abandoned executions, create, update,
// delete, and Inc, which takes an id, an int64 increment and a drop flag and
// returns an int64.
// NOTE(review): Create presumably returns the new execution id; the counter
// changed by Inc and the meaning of drop are not visible here — confirm in
// the Redis implementation.
type ScenarioExecutionStorage interface {
	Get(ctx context.Context, executionID string) (*ScenarioExecution, error)
	GetAbandoned(ctx context.Context) ([]ScenarioExecution, error)
	Create(ctx context.Context, execution ScenarioExecution) (string, error)
	Update(ctx context.Context, execution ScenarioExecution) error
	Del(ctx context.Context, executionID string) error
	Inc(ctx context.Context, id string, inc int64, drop bool) (int64, error)
}

func NewRedisScenarioExecutionStorage

func NewRedisScenarioExecutionStorage(
	key string,
	redisClient redis.Cmdable,
	encoder encoding.Encoder,
	decoder encoding.Decoder,
	logger zerolog.Logger,
) ScenarioExecutionStorage

type ScenarioResult

// ScenarioResult pairs an alarm with an error. It is the element type of the
// channel returned by TaskManager.Run and consumed by
// Service.ListenScenarioFinish.
type ScenarioResult struct {
	Alarm types.Alarm
	Err   error
}

type ScenarioStorage

// ScenarioStorage gives access to scenarios: it can reload them, select those
// triggered for an alarm, start delayed ones, and look one up by id.
type ScenarioStorage interface {
	// ReloadScenarios triggers a refresh of the scenarios cache from the DB.
	ReloadScenarios() error

	// GetTriggeredScenarios returns the scenarios which are triggered by triggers.
	GetTriggeredScenarios(
		triggers []string,
		alarm types.Alarm,
	) (triggered []Scenario, err error)

	// RunDelayedScenarios starts the delay timeout for scenarios which are triggered by triggers.
	RunDelayedScenarios(
		ctx context.Context,
		triggers []string,
		alarm types.Alarm,
		entity types.Entity,
	) error

	// GetScenario returns the scenario with the given id.
	// NOTE(review): presumably returns nil when the id is unknown — confirm.
	GetScenario(id string) *Scenario
}

ScenarioStorage is used to provide scenarios.

func NewScenarioStorage

func NewScenarioStorage(
	actionAdapter Adapter,
	delayedScenarioManager DelayedScenarioManager,
	logger zerolog.Logger,
) ScenarioStorage

type Service

// Service is the entry point for event-driven action processing: it handles
// incoming events, listens for finished scenarios, and processes abandoned
// executions.
type Service interface {
	// Process parses an event to see if an action is suitable.
	Process(ctx context.Context, event *types.Event) error

	// ListenScenarioFinish receives a message when all scenarios for an event are finished
	// and acknowledges the fifo.
	ListenScenarioFinish(ctx context.Context, channel <-chan ScenarioResult)

	// ProcessAbandonedExecutions checks the execution storage and processes executions which
	// weren't updated for a long time.
	ProcessAbandonedExecutions(ctx context.Context) error
}

Service allows you to manipulate actions.

func NewService

func NewService(
	alarmAdapter libalarm.Adapter,
	scenarioInputChan chan<- ExecuteScenariosTask,
	delayedScenarioManager DelayedScenarioManager,
	storage ScenarioExecutionStorage,
	encoder encoding.Encoder,
	fifoChan libamqp.Channel,
	fifoExchange string,
	fifoQueue string,
	activationService libalarm.ActivationService,
	logger zerolog.Logger,
) Service

NewService creates a Service from the given dependencies.

type Task

// Task is the unit of work consumed by WorkerPool.RunWorkers: one Action to
// apply to an alarm/entity pair, together with the execution and scenario ids
// it belongs to, a step number, and header/response maps.
// NOTE(review): Step is presumably the index of Action within the scenario's
// action list, and Header/Response presumably carry data from a previous
// step — confirm in the task manager.
type Task struct {
	Source       string
	Action       Action
	Alarm        types.Alarm
	Entity       types.Entity
	Step         int
	ExecutionID  string
	ScenarioID   string
	AckResources bool
	Header       map[string]string
	Response     map[string]interface{}
}

type TaskManager

// TaskManager exposes a single Run method. Run takes a context, a receive-only
// channel of RpcResult and a receive-only channel of ExecuteScenariosTask, and
// returns a receive-only channel of ScenarioResult or an error.
type TaskManager interface {
	Run(ctx context.Context, rpcResultChannel <-chan RpcResult,
		inputChannel <-chan ExecuteScenariosTask) (<-chan ScenarioResult, error)
}

TaskManager is used to execute scenarios.

func NewTaskManager

func NewTaskManager(
	workerPool WorkerPool,
	executionStorage ScenarioExecutionStorage,
	scenarioStorage ScenarioStorage,
	logger zerolog.Logger,
) TaskManager

type TaskResult

// TaskResult is the element type of the channel returned by
// WorkerPool.RunWorkers. It repeats the Source, Step and ExecutionID of a Task
// and adds an alarm, an alarm change type, an integer status, header/response
// maps and an error.
// NOTE(review): Status is presumably one of TaskNew, TaskNotMatched,
// TaskCancelled or TaskRpcError — confirm in the worker implementation.
type TaskResult struct {
	Source          string
	Alarm           types.Alarm
	Step            int
	ExecutionID     string
	AlarmChangeType types.AlarmChangeType
	Status          int
	Header          map[string]string
	Response        map[string]interface{}
	Err             error
}

type WorkerPool

// WorkerPool exposes a single RunWorkers method, which takes a context and a
// receive-only channel of Task and returns a receive-only channel of
// TaskResult or an error.
type WorkerPool interface {
	RunWorkers(ctx context.Context, taskChannel <-chan Task) (<-chan TaskResult, error)
}

func NewWorkerPool

func NewWorkerPool(
	size int,
	axeRpcClient engine.RPCClient,
	webhookRpcClient engine.RPCClient,
	alarmAdapter alarm.Adapter,
	encoder encoding.Encoder,
	logger zerolog.Logger,
) WorkerPool

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL