execution

package
v0.0.0-...-a12d374 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 25, 2020 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// Defined values of Status (the state of a job). StatusUnknown is 0 and is
	// therefore also the zero value of Status.
	StatusUnknown   Status = 0
	StatusQueued    Status = 1
	StatusRunning   Status = 2
	StatusCompleted Status = 3
	StatusCancelled Status = 4
	StatusErrored   Status = 5

	// Defined values of Cause (what caused a job). CauseUnknown is 0 and is
	// therefore also the zero value of Cause.
	CauseUnknown  Cause = 0
	CauseManual   Cause = 1
	CauseSchedule Cause = 2
	CauseHook     Cause = 3
)

Variables

View Source
// ErrWorkerAlreadyRunning is a package-level sentinel error with the message
// "worker already running".
// NOTE(review): presumably returned when a worker is asked to process while it
// is already running — confirm against the worker implementation.
var ErrWorkerAlreadyRunning = errors.New("worker already running")

Functions

This section is empty.

Types

type Cause

// Cause is a uint-backed enumeration; its defined values are the Cause*
// constants (CauseUnknown, CauseManual, CauseSchedule, CauseHook).
type Cause uint

Cause identifies what caused a job.

func NewCause

func NewCause(input string) Cause

func (Cause) String

func (t Cause) String() string

type DbContext

// DbContext is the interface through which a scripts.Repository is obtained
// for a given project.
type DbContext interface {
	// GetScriptsRepository returns the scripts.Repository for projectID, or an
	// error.
	GetScriptsRepository(projectID string) (scripts.Repository, error)
}

type Event

// Event describes a trigger for a script execution: which project and script
// are involved, and what caused it.
type Event struct {
	// ProjectID is the project identifier.
	ProjectID string
	// ScriptID is the script identifier.
	ScriptID  string
	// CausedBy is the Cause of the event.
	CausedBy  Cause
}

Event represents an event that triggered a script execution

type FQLWorker

type FQLWorker struct {
	// contains filtered or unexported fields
}

func (*FQLWorker) Interrupt

func (w *FQLWorker) Interrupt()

func (*FQLWorker) IsRunning

func (w *FQLWorker) IsRunning() bool

func (*FQLWorker) Process

func (w *FQLWorker) Process() ([]byte, error)

type InMemoryQueue

type InMemoryQueue struct {
	// contains filtered or unexported fields
}

func NewInMemoryQueue

func NewInMemoryQueue(size uint64) (*InMemoryQueue, error)

func (*InMemoryQueue) Dequeue

func (q *InMemoryQueue) Dequeue(_ context.Context) (<-chan Job, error)

func (*InMemoryQueue) Enqueue

func (q *InMemoryQueue) Enqueue(ctx context.Context, job Job) error

type Job

// Job represents a running script: it pairs a job ID with the project ID, the
// Cause of the run and the script entity itself.
type Job struct {
	// ID is the job identifier.
	ID        string
	// ProjectID is the project identifier.
	ProjectID string
	// CausedBy is the Cause of the job.
	CausedBy  Cause
	// Script is the script entity associated with the job.
	Script    scripts.ScriptEntity
}

Job represents a running script

type LogWriter

// LogWriter receives log data associated with a Job.
type LogWriter interface {
	// Write takes the job and a chunk of data and returns a count and an error.
	// NOTE(review): the (n, err) result mirrors io.Writer — presumably n is the
	// number of bytes written; confirm with implementations.
	Write(job Job, data []byte) (n int, err error)
}

type Logger

type Logger struct {
	// contains filtered or unexported fields
}

func NewLogger

func NewLogger(writer LogWriter, job Job) *Logger

func (*Logger) Write

func (l *Logger) Write(data []byte) (int, error)

type OutputWriter

// OutputWriter receives output data associated with a Job.
type OutputWriter interface {
	// Write takes the job and its data and returns an error; unlike
	// LogWriter.Write it reports no byte count.
	Write(job Job, data []byte) error
}

type Queue

// Queue is the job queue abstraction: jobs are added with Enqueue and obtained
// through the receive-only channel returned by Dequeue.
type Queue interface {
	// Enqueue submits job to the queue and returns an error on failure.
	Enqueue(ctx context.Context, job Job) error
	// Dequeue returns a receive-only channel of jobs, or an error.
	Dequeue(ctx context.Context) (<-chan Job, error)
}

type Result

// Result embeds a State and adds a raw byte payload.
type Result struct {
	// State is embedded, so its fields (Job, Timestamp, Status, Error) are
	// promoted onto Result.
	State
	// Data is a raw byte payload.
	// NOTE(review): presumably the bytes returned by Worker.Process — confirm.
	Data []byte
}

type Service

type Service struct {
	// contains filtered or unexported fields
}

func NewService

func NewService(
	settings Settings,
	logger *zerolog.Logger,
	db DbContext,
	compiler *compiler.Compiler,
	queue Queue,
	state StateWriter,
	logs LogWriter,
	output OutputWriter,
) (*Service, error)

func (*Service) Cancel

func (service *Service) Cancel(_ context.Context, projectID string, jobID string) error

func (*Service) Start

func (service *Service) Start(ctx context.Context, event Event) (string, error)

type Settings

// Settings holds the configuration passed to NewService.
type Settings struct {
	// PoolSize is an unsigned size value.
	// NOTE(review): presumably the worker pool size handed to NewWorkerPool —
	// confirm against NewService.
	PoolSize uint64
}

func NewDefaultSettings

func NewDefaultSettings() Settings

type State

// State records a Job together with a Status, a timestamp and an error value.
type State struct {
	// Job is the job this state refers to.
	Job       Job
	// Timestamp is the time associated with this state.
	// NOTE(review): presumably when the status was recorded — confirm.
	Timestamp time.Time
	// Status is the job's Status value.
	Status    Status
	// Error is an error associated with this state.
	// NOTE(review): presumably nil unless the job failed — confirm.
	Error     error
}

type StateWriter

// StateWriter receives State values.
type StateWriter interface {
	// Write takes a State and returns an error on failure.
	Write(state State) error
}

type Status

// Status is a uint-backed enumeration; its defined values are the Status*
// constants (StatusUnknown through StatusErrored).
type Status uint

Status represents the state of a job.

func NewStatus

func NewStatus(input string) Status

func (Status) String

func (t Status) String() string

type Worker

// Worker is the interface returned by NewFQLWorker and by a WorkerFactory;
// *FQLWorker declares all three of its methods.
type Worker interface {
	// IsRunning reports a boolean running flag.
	IsRunning() bool
	// Process returns a byte slice and an error.
	// NOTE(review): presumably runs the job's script and returns its output —
	// confirm against FQLWorker.Process.
	Process() ([]byte, error)
	// Interrupt takes no arguments and returns nothing.
	// NOTE(review): presumably stops an in-progress Process call — confirm.
	Interrupt()
}

func NewFQLWorker

func NewFQLWorker(compiler *compiler.Compiler, log io.Writer, job Job) Worker

type WorkerFactory

// WorkerFactory is a function that returns a Worker for the given Job.
// NewWorkerPool accepts one as its factory argument.
type WorkerFactory func(job Job) Worker

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

func NewWorkerPool

func NewWorkerPool(
	size uint64,
	logger *zerolog.Logger,
	status StateWriter,
	factory WorkerFactory,
) (*WorkerPool, error)

func (*WorkerPool) Cancel

func (wp *WorkerPool) Cancel(projectID, jobID string) error

func (*WorkerPool) Consume

func (wp *WorkerPool) Consume(ctx context.Context, q Queue) (<-chan Result, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL