workers

package
v1.12.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 3, 2020 License: MIT Imports: 16 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrScheduleQueueIsEmpty occurs when there is no schedule with
	// the `next_execution_time` in the past
	ErrScheduleQueueIsEmpty = errors.New("nothing to schedule")
)
View Source
var TaskQueueMetrics = struct {
	Labels             []string
	TaskDuration       *prometheus.HistogramVec
	TaskWaiting        *prometheus.HistogramVec
	DequeueDuration    prometheus.Histogram
	WorkerGauge        prometheus.Gauge
	WorkerWaiting      prometheus.Counter
	WorkerWorking      *prometheus.CounterVec
	WorkerWorkingGauge *prometheus.GaugeVec
	WorkerTask         *prometheus.CounterVec
	WorkerErrors       *prometheus.CounterVec
	QueueErrors        prometheus.Counter
}{
	Labels: queueMetricLabels,
	TaskDuration: promauto.NewHistogramVec(
		prometheus.HistogramOpts{
			Namespace: "hub",
			Subsystem: "worker",
			Name:      "duration_s",
			Help:      "duration of a task in seconds by queue",
			Buckets:   durationSBuckets,
		},
		taskMetricLabels,
	),
	TaskWaiting: promauto.NewHistogramVec(
		prometheus.HistogramOpts{
			Namespace: "hub",
			Subsystem: "queue",
			Name:      "waiting_duration_s",
			Help:      "duration of a task waiting to start",
			Buckets:   durationSBuckets,
		},
		taskMetricLabels,
	),
	DequeueDuration: promauto.NewHistogram(
		prometheus.HistogramOpts{
			Namespace: "hub",
			Subsystem: "worker",
			Name:      "wait_duration_s",
			Help:      "duration in seconds a worker spent waiting",
			Buckets:   durationSBuckets,
		},
	),
	WorkerGauge: promauto.NewGauge(prometheus.GaugeOpts{
		Namespace: "hub",
		Subsystem: "worker",
		Name:      "count",
		Help:      "count of initialized workers",
	}),
	WorkerWaiting: promauto.NewCounter(prometheus.CounterOpts{
		Namespace: "hub",
		Subsystem: "worker",
		Name:      "waiting_count",
		Help:      "count of workers waiting for a task",
	}),
	WorkerWorking: promauto.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: "hub",
			Subsystem: "worker",
			Name:      "working_total",
			Help:      "count of workers working on a task",
		},
		taskMetricLabels,
	),
	WorkerWorkingGauge: promauto.NewGaugeVec(
		prometheus.GaugeOpts{
			Namespace: "hub",
			Subsystem: "worker",
			Name:      "working",
			Help:      "count of working workers",
		},
		taskMetricLabels,
	),
	WorkerTask: promauto.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: "hub",
			Subsystem: "worker",
			Name:      "task_total",
			Help:      "count of tasks seen by the worker",
		},
		taskMetricLabels,
	),
	WorkerErrors: promauto.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: "hub",
			Subsystem: "worker",
			Name:      "error_total",
			Help:      "count of errors seen by the worker",
		},
		taskMetricLabels,
	),
	QueueErrors: promauto.NewCounter(prometheus.CounterOpts{
		Namespace: "hub",
		Subsystem: "worker",
		Name:      "queue_error_total",
		Help:      "count of dequeue errors seen by the worker",
	}),
}

TaskQueueMetrics provides access to the prometheus metric objects for the task queue

View Source
var TaskSchedulingMetrics = struct {
	Labels            []string
	IterationDuration prometheus.Histogram
	WorkerGauge,
	WorkerWorkingGauge prometheus.Gauge
	WorkerWaiting,
	WorkerWorking,
	WorkerErrors prometheus.Counter
	WorkerTaskScheduled *prometheus.CounterVec
}{
	Labels: queueMetricLabels,
	IterationDuration: promauto.NewHistogram(prometheus.HistogramOpts{
		Namespace: "hub",
		Subsystem: "scheduling",
		Name:      "iteration_duration_ms",
		Help:      "duration of the scheduling iteration in ms",
		Buckets:   durationMsBuckets,
	}),
	WorkerGauge: promauto.NewGauge(prometheus.GaugeOpts{
		Namespace: "hub",
		Subsystem: "scheduling",
		Name:      "count",
		Help:      "count of initialized task scheduling workers",
	}),
	WorkerWorkingGauge: promauto.NewGauge(prometheus.GaugeOpts{
		Namespace: "hub",
		Subsystem: "scheduling",
		Name:      "working_count",
		Help:      "count of working task scheduling workers",
	}),
	WorkerWaiting: promauto.NewCounter(prometheus.CounterOpts{
		Namespace: "hub",
		Subsystem: "scheduling",
		Name:      "waiting_total",
		Help:      "count of workers waiting for a task to schedule",
	}),
	WorkerWorking: promauto.NewCounter(prometheus.CounterOpts{
		Namespace: "hub",
		Subsystem: "scheduling",
		Name:      "working_total",
		Help:      "count of workers working on a task scheduling",
	}),
	WorkerErrors: promauto.NewCounter(prometheus.CounterOpts{
		Namespace: "hub",
		Subsystem: "scheduling",
		Name:      "error_total",
		Help:      "count of task scheduling errors seen by the worker",
	}),
	WorkerTaskScheduled: promauto.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: "hub",
			Subsystem: "scheduling",
			Name:      "task_total",
			Help:      "count of tasks scheduled by the worker",
		},
		taskMetricLabels,
	),
}

TaskSchedulingMetrics provides access to the prometheus metric objects for task scheduling

Functions

This section is empty.

Types

type TaskHandler

type TaskHandler interface {
	// Process implements the specific Task parsing logic
	Process(ctx context.Context, task queue.Task, heartbeats chan<- queue.Progress) (err error)
}

TaskHandler is a type alias for a method that parses a task and returns any processing errors

type TaskHandlerFunc

type TaskHandlerFunc func(context.Context, queue.Task, chan<- queue.Progress) error

TaskHandlerFunc is an adapter that allows the use of a normal function as a TaskHandler

func (TaskHandlerFunc) Process

func (f TaskHandlerFunc) Process(ctx context.Context, task queue.Task, heartbeats chan<- queue.Progress) error

Process implements the specific Task parsing logic

type Worker

type Worker interface {
	// Work is responsible for getting and processing tasks
	// It should run continuously or until the context is cancelled
	Work(context.Context) error
}

Worker provides methods to do some kind of work

func NewScheduleWorker

func NewScheduleWorker(db *sql.DB, queue queue.Queuer, interval time.Duration) Worker

NewScheduleWorker creates a new task scheduling worker

func NewTaskWorker

func NewTaskWorker(dequeuer queue.Dequeuer, handler TaskHandler) Worker

NewTaskWorker creates a new Task Worker instance

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL