process

package
v0.9.15 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 3, 2021 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Overview

Package process is for background processes and listed at the ../processes endpoint.

Index

Constants

View Source
const (

	// Continue indicates the error was within max error tolerance.
	Continue ErrorAction = "Continue"
	// Abort indicates this error exceeds max error tolerance.
	Abort ErrorAction = "Abort"

	// Completed indicates that execution has terminated normally due to completion of work.
	Completed Progress = "Completed"
	// Updated indicates that the state was updated in storage.
	Updated Progress = "Updated"
	// Merged indicates that the state was merged, then updated in storage.
	Merged Progress = "Merged"
	// Aborted indicates that errors caused execution to prematurely stop (incomplete).
	Aborted Progress = "Aborted"
	// Conflict indicates that the state ownership was taken over by another instance.
	// Unlike Aborted, the Conflict level indicates that any further writes of state
	// to storage should not be attempted.
	Conflict Progress = "Conflict"
	// None indicates that there was no storage update at this time.
	None Progress = "None"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type ErrorAction

type ErrorAction string

ErrorAction indicates how an AddError or AddWorkError should be handled.

type Process

type Process struct {
	// contains filtered or unexported fields
}

Process is a background process that performs work at a scheduled frequency.

func NewProcess

func NewProcess(name string, worker Worker, store storage.Store, scheduleFrequency time.Duration, defaultSettings *pb.Process_Params) *Process

NewProcess creates a new process to perform work of a given name. It will trigger every "scheduleFrequency" and workers will report back status updates to the storage layer every "progressFrequency".

  • If the process is not found in the storage layer, it will initialize with "defaultSettings".
  • scheduleFrequency may be adjusted to meet schedule frequency constraints.

func (*Process) AddError

func (p *Process) AddError(err error, workStatus *pb.Process_Status, state *pb.Process) ErrorAction

AddError will add error state to a given status block. Set "workStatus" to nil if it is not specific.

func (*Process) AddStats

func (p *Process) AddStats(count float64, name string, state *pb.Process)

AddStats will increment metrics of a given name within the process status.

func (*Process) AddWorkError

func (p *Process) AddWorkError(err error, workName string, state *pb.Process) ErrorAction

AddWorkError will add error state to a given work item status block as well as the process status block.

func (*Process) AddWorkStats

func (p *Process) AddWorkStats(count float64, stat, workName string, state *pb.Process)

AddWorkStats will increment metrics of a given name within the work item and process status.

func (*Process) DefaultSettings

func (p *Process) DefaultSettings() *pb.Process_Params

DefaultSettings returns the default settings.

func (*Process) Progress

func (p *Process) Progress(state *pb.Process) (Progress, error)

Progress is called by workers every 1 or more units of work and may update the underlying state. Returns true if an update occured. Important note: take caution as maps may have been merged with data from storage layer. If so, Merged progress will be returned.

func (*Process) RegisterWork

func (p *Process) RegisterWork(workName string, workParams *pb.Process_Params, tx storage.Tx) (_ *pb.Process_Work, ferr error)

RegisterWork adds a work item to the state for workers to process.

func (*Process) Run

func (p *Process) Run(ctx context.Context)

Run schedules a background process. Typically this will be on its own go routine.

func (*Process) ScheduleFrequency

func (p *Process) ScheduleFrequency() time.Duration

ScheduleFrequency returns schedule frequency.

func (*Process) UnregisterWork

func (p *Process) UnregisterWork(workName string, tx storage.Tx) (ferr error)

UnregisterWork (eventually) removes a work item from the active state, and allows cleanup work to be performed.

func (*Process) UpdateFlowControl

func (p *Process) UpdateFlowControl(initialWaitDuration, minScheduleFrequency, progressFrequency time.Duration) error

UpdateFlowControl alters settings for how flow of processing is managed. These are advanced settings and should be carefully managed when used outside of tests. These should be based on the size of the processing work between updates and the expected total time for each run with sufficient tolerance for errors and retries to minimize collisions with 2+ workers grabbing control of the state.

func (*Process) UpdateSettings

func (p *Process) UpdateSettings(scheduleFrequency time.Duration, settings *pb.Process_Params, tx storage.Tx) (ferr error)

UpdateSettings alters resource management settings.

type Progress

type Progress string

Progress indicates how an update was handled.

type Worker

type Worker interface {
	// ProcessActiveWork has a worker perform the work needed to process an active work item.
	ProcessActiveWork(ctx context.Context, state *pb.Process, workName string, work *pb.Process_Work, process *Process) error
	// CleanupWork has a worker perform the work needed to clean up a work item that was active previously.
	CleanupWork(ctx context.Context, state *pb.Process, workName string, process *Process) error
	// Wait indicates that the worker should wait for the next active cycle to begin. Return false to exit worker.
	Wait(ctx context.Context, duration time.Duration) bool
}

Worker represents a process that perform work on the set of work items provided.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL