engine

package
v0.0.0-...-9b5cd94 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 5, 2024 License: AGPL-3.0 Imports: 22 Imported by: 0

Documentation

Overview

Package engine contains the implementation of the Canopsis engine.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func IsConnectionError

func IsConnectionError(err error) bool

IsConnectionError is used to check whether the engine should stop or continue working after the given error.

Types

type Consumer

type Consumer interface {
	// Consume is the consumer's only method. It takes a context and returns
	// an error; per the package documentation, the engine is stopped when
	// the returned error is non-nil.
	Consume(context.Context) error
}

The Consumer interface is used to implement an AMQP consumer of the engine. If Consume returns an error, the engine is stopped.

func NewConcurrentConsumer

func NewConcurrentConsumer(
	name, queue string,
	consumePrefetchCount, consumePrefetchSize int,
	purgeQueue bool,
	nextExchange, nextQueue, fifoExchange, fifoQueue string,
	workers int,
	connection libamqp.Connection,
	processor MessageProcessor,
	logger zerolog.Logger,
) Consumer

func NewDefaultConsumer

func NewDefaultConsumer(
	name, queue string,
	consumePrefetchCount, consumePrefetchSize int,
	purgeQueue bool,
	nextExchange, nextQueue, fifoExchange, fifoQueue string,
	connection libamqp.Connection,
	processor MessageProcessor,
	logger zerolog.Logger,
) Consumer

NewDefaultConsumer creates consumer.

func NewRPCServer

func NewRPCServer(
	name, queue string,
	consumePrefetchCount, consumePrefetchSize int,
	connection libamqp.Connection,
	processor MessageProcessor,
	logger zerolog.Logger,
) Consumer

NewRPCServer creates an RPC server, which is returned as a Consumer.

type Engine

type Engine interface {
	// AddConsumer adds an AMQP consumer to the engine.
	AddConsumer(Consumer)
	// AddPeriodicalWorker adds a named periodical worker to the engine.
	AddPeriodicalWorker(name string, worker PeriodicalWorker)
	// AddRoutine adds a long-running goroutine to the engine.
	AddRoutine(Routine)
	// AddDeferFunc registers deferFunc with the engine.
	// NOTE(review): presumably deferFunc is invoked when the engine stops,
	// like the deferFunc argument of New — confirm against the implementation.
	AddDeferFunc(deferFunc func(ctx context.Context))
	// Run starts goroutines for all consumers and periodical workers.
	// The engine stops if one of the consumers or periodical workers returns an error.
	Run(context.Context) error
}

Engine interface is used to implement canopsis engine.

func New

func New(
	init func(ctx context.Context) error,
	deferFunc func(ctx context.Context),
	logger zerolog.Logger,
) Engine

type InstanceRunInfo

type InstanceRunInfo struct {
	// ID is serialized under the JSON key "_id".
	ID               string           `json:"_id"`
	// Name — NOTE(review): presumably the engine name; confirm.
	Name             string           `json:"name"`
	// ConsumeQueue and PublishQueue — NOTE(review): presumably the names of
	// the AMQP queues the instance consumes from and publishes to; confirm.
	ConsumeQueue     string           `json:"consume_queue"`
	PublishQueue     string           `json:"publish_queue"`
	// RpcConsumeQueues and RpcPublishQueues — NOTE(review): presumably the
	// RPC counterparts of the two queue names above; confirm.
	RpcConsumeQueues []string         `json:"rpc_consume_queues"`
	RpcPublishQueues []string         `json:"rpc_publish_queues"`
	// QueueLength — NOTE(review): presumably the number of messages waiting
	// in ConsumeQueue; confirm.
	QueueLength      int              `json:"queue_length"`
	// Time — NOTE(review): presumably when this info was recorded; confirm.
	Time             datetime.CpsTime `json:"time"`
}

InstanceRunInfo is the run information of a single engine instance.

func NewInstanceRunInfo

func NewInstanceRunInfo(name, consumeQueue, publishQueue string, rpcQueues ...[]string) InstanceRunInfo

type MessageProcessor

type MessageProcessor interface {
	// Process handles the AMQP delivery d and returns newMessage and err.
	// Per the package documentation, the engine is stopped when err is non-nil.
	// NOTE(review): newMessage is presumably the body forwarded to the next
	// queue — confirm against the consumer implementations.
	Process(ctx context.Context, d amqp.Delivery) (newMessage []byte, err error)
}

The MessageProcessor interface is used to implement the AMQP message processor of a consumer. If Process returns an error, the engine is stopped.

type PeriodicalWorker

type PeriodicalWorker interface {
	// GetInterval returns a time.Duration.
	// NOTE(review): presumably the period between two Work calls — confirm.
	GetInterval() time.Duration
	// Work takes a context and has no return value, so it cannot report an
	// error to its caller through its result.
	Work(context.Context)
}

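The PeriodicalWorker interface is used to implement an engine periodical worker. Note that Work has no return value in this version of the interface, so the earlier statement that the engine is stopped when Work returns an error does not match the signature.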

func NewLoadConfigPeriodicalWorker

func NewLoadConfigPeriodicalWorker(
	periodicalInterval time.Duration,
	adapter config.Adapter,
	logger zerolog.Logger,
	updaters ...config.Updater,
) PeriodicalWorker

func NewLoadUserInterfaceConfigPeriodicalWorker

func NewLoadUserInterfaceConfigPeriodicalWorker(
	periodicalInterval time.Duration,
	adapter config.UserInterfaceAdapter,
	logger zerolog.Logger,
	updater *config.BaseUserInterfaceConfigProvider,
) PeriodicalWorker

func NewLockedPeriodicalWorker

func NewLockedPeriodicalWorker(
	lockClient redis.LockClient,
	lockKey string,
	worker PeriodicalWorker,
	logger zerolog.Logger,
) PeriodicalWorker

func NewRunInfoMetricsPeriodicalWorker

func NewRunInfoMetricsPeriodicalWorker(
	periodicalInterval time.Duration,
	manager RunInfoManager,
	info InstanceRunInfo,
	channel amqp.Channel,
	techMetricsSender techmetrics.Sender,
	techMetric string,
	logger zerolog.Logger,
) PeriodicalWorker

func NewRunInfoPeriodicalWorker

func NewRunInfoPeriodicalWorker(
	periodicalInterval time.Duration,
	manager RunInfoManager,
	info InstanceRunInfo,
	channel amqp.Channel,
	logger zerolog.Logger,
) PeriodicalWorker

type RPCClient

type RPCClient interface {
	// Consumer is embedded; it receives RPC responses from the AMQP queue.
	Consumer
	// Call takes the RPC request m and publishes it to the AMQP queue.
	// It returns an error.
	Call(ctx context.Context, m RPCMessage) error
}

RPCClient interface is used to implement AMQP RPC client.

func NewRPCClient

func NewRPCClient(
	name, serverQueueName, clientQueueName string,
	consumePrefetchCount, consumePrefetchSize int,
	processor RPCMessageProcessor,
	amqpChannel libamqp.Channel,
	logger zerolog.Logger,
) RPCClient

NewRPCClient creates new AMQP RPC client.

type RPCMessage

type RPCMessage struct {
	// CorrelationID — NOTE(review): presumably used to match an RPC response
	// to its request; confirm against the RPC client and server.
	CorrelationID string
	// Body holds the message content as raw bytes.
	Body          []byte
}

RPCMessage is AMQP RPC request or response.

type RPCMessageProcessor

type RPCMessageProcessor interface {
	// Process handles the RPC message msg and returns an error. Per the
	// package documentation, the engine is stopped when the error is non-nil.
	Process(ctx context.Context, msg RPCMessage) error
}

The RPCMessageProcessor interface is used to implement the AMQP RPC response processor of a consumer. If Process returns an error, the engine is stopped.

type Routine

// Routine is a function type (not an interface): it takes a context and
// returns an error. Per the package documentation it implements a
// long-running goroutine of the engine, and the engine is stopped when it
// returns an error.
type Routine func(context.Context) error

Routine is a function type used to implement a long-running goroutine of the engine. If a Routine returns an error, the engine is stopped.

type RunInfo

// RunInfo is the element type of the slice returned by
// RunInfoManager.GetEngines.
// NOTE(review): it mirrors most fields of InstanceRunInfo and adds Instances
// and HasDiffConfig, so it presumably aggregates the run information of all
// instances of one engine — confirm against the manager implementation.
type RunInfo struct {
	Name             string
	ConsumeQueue     string
	PublishQueue     string
	RpcConsumeQueues []string
	RpcPublishQueues []string
	// Instances — NOTE(review): presumably the number of running instances; confirm.
	Instances        int
	QueueLength      int
	Time             datetime.CpsTime
	// HasDiffConfig — NOTE(review): presumably set when instances report
	// differing configuration; confirm.
	HasDiffConfig    bool
}

type RunInfoManager

type RunInfoManager interface {
	// SaveInstance takes the run info of one instance and an expiration
	// duration, and returns an error.
	// NOTE(review): presumably the saved record expires after expiration — confirm.
	SaveInstance(ctx context.Context, info InstanceRunInfo, expiration time.Duration) error
	// GetEngines returns a slice of RunInfo, or an error.
	GetEngines(ctx context.Context) ([]RunInfo, error)
}

RunInfoManager interface is used to implement engine run info storage.

func NewRunInfoManager

func NewRunInfoManager(client redis.Cmdable, key ...string) RunInfoManager

NewRunInfoManager creates new run info manager.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL