Documentation ¶
Overview ¶
Package engine contains the implementation of the Canopsis engine.
Index ¶
- func IsConnectionError(err error) bool
- type Consumer
- func NewConcurrentConsumer(name, queue string, consumePrefetchCount, consumePrefetchSize int, ...) Consumer
- func NewDefaultConsumer(name, queue string, consumePrefetchCount, consumePrefetchSize int, ...) Consumer
- func NewRPCServer(name, queue string, consumePrefetchCount, consumePrefetchSize int, ...) Consumer
- type Engine
- type InstanceRunInfo
- type MessageProcessor
- type PeriodicalWorker
- func NewLoadConfigPeriodicalWorker(periodicalInterval time.Duration, adapter config.Adapter, ...) PeriodicalWorker
- func NewLoadUserInterfaceConfigPeriodicalWorker(periodicalInterval time.Duration, adapter config.UserInterfaceAdapter, ...) PeriodicalWorker
- func NewLockedPeriodicalWorker(lockClient redis.LockClient, lockKey string, worker PeriodicalWorker, ...) PeriodicalWorker
- func NewRunInfoMetricsPeriodicalWorker(periodicalInterval time.Duration, manager RunInfoManager, info InstanceRunInfo, ...) PeriodicalWorker
- func NewRunInfoPeriodicalWorker(periodicalInterval time.Duration, manager RunInfoManager, info InstanceRunInfo, ...) PeriodicalWorker
- type RPCClient
- type RPCMessage
- type RPCMessageProcessor
- type Routine
- type RunInfo
- type RunInfoManager
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func IsConnectionError ¶
IsConnectionError is used to check whether to stop the engine or continue working.
Types ¶
type Consumer ¶
Consumer interface is used to implement an AMQP consumer of the engine. If Consume returns an error, the engine will be stopped.
func NewConcurrentConsumer ¶
func NewDefaultConsumer ¶
func NewDefaultConsumer( name, queue string, consumePrefetchCount, consumePrefetchSize int, purgeQueue bool, nextExchange, nextQueue, fifoExchange, fifoQueue string, connection libamqp.Connection, processor MessageProcessor, logger zerolog.Logger, ) Consumer
NewDefaultConsumer creates a consumer.
func NewRPCServer ¶
func NewRPCServer( name, queue string, consumePrefetchCount, consumePrefetchSize int, connection libamqp.Connection, processor MessageProcessor, logger zerolog.Logger, ) Consumer
NewRPCServer creates a consumer.
type Engine ¶
type Engine interface { // AddConsumer adds AMQP consumer to engine. AddConsumer(Consumer) // AddPeriodicalWorker adds periodical worker to engine. AddPeriodicalWorker(name string, worker PeriodicalWorker) // AddRoutine adds a long-running goroutine to engine. AddRoutine(Routine) AddDeferFunc(deferFunc func(ctx context.Context)) // Run starts goroutines for all consumers and periodical workers. // Engine stops if one of consumer or periodical worker return error. Run(context.Context) error }
Engine interface is used to implement canopsis engine.
type InstanceRunInfo ¶
type InstanceRunInfo struct { ID string `json:"_id"` Name string `json:"name"` ConsumeQueue string `json:"consume_queue"` PublishQueue string `json:"publish_queue"` RpcConsumeQueues []string `json:"rpc_consume_queues"` RpcPublishQueues []string `json:"rpc_publish_queues"` QueueLength int `json:"queue_length"` Time datetime.CpsTime `json:"time"` }
InstanceRunInfo is the run information of an engine instance.
func NewInstanceRunInfo ¶
func NewInstanceRunInfo(name, consumeQueue, publishQueue string, rpcQueues ...[]string) InstanceRunInfo
type MessageProcessor ¶
type MessageProcessor interface {
Process(ctx context.Context, d amqp.Delivery) (newMessage []byte, err error)
}
MessageProcessor interface is used to implement an AMQP message processor of a consumer. If Process returns an error, the engine will be stopped.
type PeriodicalWorker ¶
PeriodicalWorker interface is used to implement an engine periodical worker. If Work returns an error, the engine will be stopped.
func NewLoadUserInterfaceConfigPeriodicalWorker ¶
func NewLoadUserInterfaceConfigPeriodicalWorker( periodicalInterval time.Duration, adapter config.UserInterfaceAdapter, logger zerolog.Logger, updater *config.BaseUserInterfaceConfigProvider, ) PeriodicalWorker
func NewLockedPeriodicalWorker ¶
func NewLockedPeriodicalWorker( lockClient redis.LockClient, lockKey string, worker PeriodicalWorker, logger zerolog.Logger, ) PeriodicalWorker
func NewRunInfoMetricsPeriodicalWorker ¶
func NewRunInfoMetricsPeriodicalWorker( periodicalInterval time.Duration, manager RunInfoManager, info InstanceRunInfo, channel amqp.Channel, techMetricsSender techmetrics.Sender, techMetric string, logger zerolog.Logger, ) PeriodicalWorker
func NewRunInfoPeriodicalWorker ¶
func NewRunInfoPeriodicalWorker( periodicalInterval time.Duration, manager RunInfoManager, info InstanceRunInfo, channel amqp.Channel, logger zerolog.Logger, ) PeriodicalWorker
type RPCClient ¶
type RPCClient interface { // Consumer receives RPC responses from AMQP queue. Consumer // Call receives RPC request and publishes it to AMQP queue. Call(ctx context.Context, m RPCMessage) error }
RPCClient interface is used to implement AMQP RPC client.
func NewRPCClient ¶
func NewRPCClient( name, serverQueueName, clientQueueName string, consumePrefetchCount, consumePrefetchSize int, processor RPCMessageProcessor, amqpChannel libamqp.Channel, logger zerolog.Logger, ) RPCClient
NewRPCClient creates a new AMQP RPC client.
type RPCMessage ¶
RPCMessage is AMQP RPC request or response.
type RPCMessageProcessor ¶
type RPCMessageProcessor interface {
Process(ctx context.Context, msg RPCMessage) error
}
RPCMessageProcessor interface is used to implement an AMQP RPC response processor of a consumer. If Process returns an error, the engine will be stopped.
type Routine ¶
Routine interface is used to implement a long-running goroutine of the engine. If Routine returns an error, the engine will be stopped.
type RunInfoManager ¶
type RunInfoManager interface { SaveInstance(ctx context.Context, info InstanceRunInfo, expiration time.Duration) error GetEngines(ctx context.Context) ([]RunInfo, error) }
RunInfoManager interface is used to implement engine run info storage.
func NewRunInfoManager ¶
func NewRunInfoManager(client redis.Cmdable, key ...string) RunInfoManager
NewRunInfoManager creates a new run info manager.