worker

v1.1.0
Published: Mar 14, 2024 License: MIT Imports: 12 Imported by: 1

README

Worker Module

Worker module based on sync.

Installation

go get github.com/ankorstore/yokai/worker

Documentation

This module provides a WorkerPool that registers your Worker implementations, starts them, and stops them.

The WorkerPool can be configured to:

  • defer the start of all workers by a threshold in seconds: 0 by default (start immediately)
  • attempt a maximum number of runs in case of failure: 1 by default (no restarts)

The Worker executions:

  • have a unique identifier
  • have automatic panic recovery
  • are automatically logged
  • automatically generate metrics
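
As an illustration of the first two properties, here is a minimal sketch of an execution identifier and of panic recovery. Both helpers (newExecutionID, safeRun) are hypothetical; the module relies on a UUID generator for identifiers, and its recovery logic may differ.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// newExecutionID returns a random hex identifier. This stand-in only keeps
// the sketch dependency-free; it is not the module's UUID generator.
func newExecutionID() string {
	b := make([]byte, 8)
	if _, err := rand.Read(b); err != nil {
		panic(err)
	}

	return hex.EncodeToString(b)
}

// safeRun calls run and converts a panic into an error, so that one failing
// worker cannot crash the whole process.
func safeRun(run func() error) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("recovered from panic: %v", r)
		}
	}()

	return run()
}

func main() {
	id := newExecutionID()
	err := safeRun(func() error { panic("boom") })

	fmt.Printf("execution %s: %v\n", id, err)
}
```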
Workers

This module provides a Worker interface that you implement to provide your own workers, for example:

package workers

import (
	"context"

	"github.com/ankorstore/yokai/worker"
)

// classic worker
type ClassicWorker struct{}

func NewClassicWorker() *ClassicWorker {
	return &ClassicWorker{}
}

func (w *ClassicWorker) Name() string {
	return "classic-worker"
}

func (w *ClassicWorker) Run(ctx context.Context) error {
	worker.CtxLogger(ctx).Info().Msg("run")

	return nil
}

// cancellable worker
type CancellableWorker struct{}

func NewCancellableWorker() *CancellableWorker {
	return &CancellableWorker{}
}

func (w *CancellableWorker) Name() string {
	return "cancellable-worker"
}

func (w *CancellableWorker) Run(ctx context.Context) error {
	logger := worker.CtxLogger(ctx)

	for {
		select {
		// when the WorkerPool stops, the ctx cancellation is forwarded to the workers
		case <-ctx.Done():
			logger.Info().Msg("cancel")

			return w.cancel()
		default:
			logger.Info().Msg("run")

			return w.run(ctx)
		}
	}
}

func (w *CancellableWorker) run(ctx context.Context) error {
	// your worker logic

	return nil
}

func (w *CancellableWorker) cancel() error {
	// your worker cancel logic, for example graceful shutdown

	return nil
}

Notes:

  • to perform more complex tasks, you can inject dependencies into your worker implementations (e.g. database, cache)
  • it is recommended to design your workers with a single responsibility
WorkerPool

You can create a WorkerPool instance with the DefaultWorkerPoolFactory, register your Worker implementations, and start them:

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/ankorstore/yokai/worker"
	"path/to/workers"
)

func main() {
	// create the pool
	pool, err := worker.NewDefaultWorkerPoolFactory().Create(
		worker.WithGlobalDeferredStartThreshold(1),                                             // defers the start of all workers by 1 second
		worker.WithGlobalMaxExecutionsAttempts(2),                                              // runs failing workers 2 times at most
		worker.WithWorker(workers.NewClassicWorker(), worker.WithDeferredStartThreshold(3)),    // registers the ClassicWorker, with a deferred start of 3 seconds
		worker.WithWorker(workers.NewCancellableWorker(), worker.WithMaxExecutionsAttempts(4)), // registers the CancellableWorker, with 4 runs at most
	)
	if err != nil {
		log.Fatal(err)
	}

	// start the pool
	if err := pool.Start(context.Background()); err != nil {
		log.Fatal(err)
	}

	// get all workers execution reports, in real time
	executions := pool.Executions()
	fmt.Println("tracked executions:", len(executions))

	// stop the pool (will forward context cancellation to each worker)
	if err := pool.Stop(); err != nil {
		log.Fatal(err)
	}

	// get a specific worker execution report, after pool stop
	execution, err := pool.Execution("cancellable-worker")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(execution.Name(), execution.Status())
}
Logging

You can use the CtxLogger() function to retrieve the contextual log.Logger from your workers, and emit correlated logs.

Worker executions are logged, with the following fields automatically added to each log record:

  • worker: worker name
  • workerExecutionID: worker execution id
package main

import (
	"context"

	"github.com/ankorstore/yokai/worker"
)

type LoggingWorker struct{}

func NewLoggingWorker() *LoggingWorker {
	return &LoggingWorker{}
}

func (w *LoggingWorker) Name() string {
	return "logging-worker"
}

func (w *LoggingWorker) Run(ctx context.Context) error {
	// log the current worker name and execution id
	worker.CtxLogger(ctx).Info().Msgf(
		"execution %s for worker %s",
		worker.CtxWorkerExecutionId(ctx), // contextual worker execution id
		worker.CtxWorkerName(ctx),        // contextual worker name
	)

	return nil
}

func main() {
	// create the pool
	pool, _ := worker.NewDefaultWorkerPoolFactory().Create(
		worker.WithWorker(NewLoggingWorker()), // registers the LoggingWorker
	)

	// start the pool
	pool.Start(context.Background())
}
Tracing

You can use the CtxTracer() function to retrieve the contextual tracer from your workers, and emit correlated spans: these spans carry the Worker and WorkerExecutionID attributes, set to the worker name and execution id respectively.

This module provides the AnnotateTracerProvider function, which extends a TracerProvider so that the current worker information is automatically added to the spans emitted during a worker execution:

package main

import (
	"context"

	"github.com/ankorstore/yokai/worker"
	"go.opentelemetry.io/otel"
)

// tracing worker
type TracingWorker struct{}

func NewTracingWorker() *TracingWorker {
	return &TracingWorker{}
}

func (w *TracingWorker) Name() string {
	return "tracing-worker"
}

func (w *TracingWorker) Run(ctx context.Context) error {
	// emit some trace span
	_, span := worker.CtxTracer(ctx).Start(ctx, "some span")
	defer span.End()

	return nil
}

func main() {
	// global tracer provider
	tp := otel.GetTracerProvider()

	// annotate the tracer provider
	worker.AnnotateTracerProvider(tp)

	// create the pool
	pool, _ := worker.NewDefaultWorkerPoolFactory().Create(
		worker.WithWorker(NewTracingWorker()),
	)

	// start the pool
	pool.Start(context.Background())
}
Metrics

The WorkerPool automatically generates metrics about:

  • started workers
  • restarted workers
  • workers stopped with success
  • workers stopped with error

To enable those metrics in a registry, simply call Register on the WorkerMetrics of the WorkerPool:

package main

import (
	"context"

	"github.com/ankorstore/yokai/worker"
	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// metrics registry
	registry := prometheus.NewRegistry()

	// create the pool
	pool, _ := worker.NewDefaultWorkerPoolFactory().Create()

	// register the pool metrics
	pool.Metrics().Register(registry)

	// start the pool
	pool.Start(context.Background())
}

Documentation

Constants

const (
	ExecutionStarted   = "started"
	ExecutionRestarted = "restarted"
	ExecutionSuccess   = "success"
	ExecutionError     = "error"
)

const (
	DefaultDeferredStartThreshold = 0
	DefaultMaxExecutionsAttempts  = 1
	DefaultMetricsNamespace       = ""
	DefaultMetricsSubsystem       = ""
)

const (
	LogRecordFieldWorkerName            = "worker"
	LogRecordFieldWorkerExecutionId     = "workerExecutionID"
	TraceSpanAttributeWorkerName        = "Worker"
	TraceSpanAttributeWorkerExecutionId = "WorkerExecutionID"
)

const TracerName = "worker"

TracerName is the workers tracer name.

Variables

This section is empty.

Functions

func AnnotateTracerProvider

func AnnotateTracerProvider(base oteltrace.TracerProvider) oteltrace.TracerProvider

AnnotateTracerProvider extends the spans of a provided oteltrace.TracerProvider with worker execution attributes.

func CtxLogger

func CtxLogger(ctx context.Context) *log.Logger

CtxLogger returns the contextual log.Logger.

func CtxTracer

func CtxTracer(ctx context.Context) oteltrace.Tracer

CtxTracer returns the contextual oteltrace.Tracer.

func CtxWorkerExecutionId

func CtxWorkerExecutionId(ctx context.Context) string

CtxWorkerExecutionId returns the contextual Worker execution id.

func CtxWorkerName

func CtxWorkerName(ctx context.Context) string

CtxWorkerName returns the contextual Worker name.

func Sanitize

func Sanitize(str string) string

Sanitize transforms a given string to not contain spaces or dashes, and to be in lower case.
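
The described behavior can be approximated with the standard library. In this sketch, sanitize is a local stand-in, and replacing spaces and dashes with underscores is an assumption: the description above only states that they are removed from the result.

```go
package main

import (
	"fmt"
	"strings"
)

// sanitize is a stand-in for the documented Sanitize behavior. Replacing
// spaces and dashes with underscores is an assumption of this sketch.
func sanitize(str string) string {
	replacer := strings.NewReplacer(" ", "_", "-", "_")

	return strings.ToLower(replacer.Replace(str))
}

func main() {
	fmt.Println(sanitize("My Cancellable-Worker")) // my_cancellable_worker
}
```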

Types

type CtxWorkerExecutionIdKey

type CtxWorkerExecutionIdKey struct{}

CtxWorkerExecutionIdKey is a contextual struct key for the current worker execution id.

type CtxWorkerNameKey

type CtxWorkerNameKey struct{}

CtxWorkerNameKey is a contextual struct key for the current worker name.

type DefaultWorkerPoolFactory

type DefaultWorkerPoolFactory struct{}

DefaultWorkerPoolFactory is the default WorkerPoolFactory implementation.

func (*DefaultWorkerPoolFactory) Create

func (f *DefaultWorkerPoolFactory) Create(options ...WorkerPoolOption) (*WorkerPool, error)

Create returns a new WorkerPool, and accepts a list of WorkerPoolOption. For example:

var pool, _ = worker.NewDefaultWorkerPoolFactory().Create()

is equivalent to:

var pool, _ = worker.NewDefaultWorkerPoolFactory().Create(
	worker.WithGenerator(uuid.NewDefaultUuidGenerator()), // generator
	worker.WithMetrics(worker.NewWorkerMetrics("", "")),  // metrics
	worker.WithGlobalMaxExecutionsAttempts(1),            // no retries
	worker.WithGlobalDeferredStartThreshold(0),           // no deferred start
)

type ExecutionOptions

type ExecutionOptions struct {
	DeferredStartThreshold float64
	MaxExecutionsAttempts  int
}

ExecutionOptions are options for the Worker executions.

func DefaultWorkerExecutionOptions

func DefaultWorkerExecutionOptions() ExecutionOptions

DefaultWorkerExecutionOptions returns the default options for the Worker executions.

type PoolOptions

type PoolOptions struct {
	GlobalDeferredStartThreshold float64
	GlobalMaxExecutionsAttempts  int
	Metrics                      *WorkerMetrics
	Generator                    uuid.UuidGenerator
	Registrations                map[string]*WorkerRegistration
}

PoolOptions are options for the WorkerPoolFactory implementations.

func DefaultWorkerPoolOptions

func DefaultWorkerPoolOptions() PoolOptions

DefaultWorkerPoolOptions returns the default options used in the DefaultWorkerPoolFactory.

type TracerProviderWorkerAnnotator

type TracerProviderWorkerAnnotator struct{}

TracerProviderWorkerAnnotator is the oteltrace.TracerProvider workers annotator, implementing otelsdktrace.SpanProcessor.

func NewTracerProviderWorkerAnnotator

func NewTracerProviderWorkerAnnotator() *TracerProviderWorkerAnnotator

NewTracerProviderWorkerAnnotator returns a new TracerProviderWorkerAnnotator.

func (*TracerProviderWorkerAnnotator) ForceFlush

ForceFlush is just for otelsdktrace.SpanProcessor compliance.

func (*TracerProviderWorkerAnnotator) OnEnd

OnEnd is just for otelsdktrace.SpanProcessor compliance.

func (*TracerProviderWorkerAnnotator) OnStart

OnStart adds worker execution attributes to a given otelsdktrace.ReadWriteSpan.

func (*TracerProviderWorkerAnnotator) Shutdown

Shutdown is just for otelsdktrace.SpanProcessor compliance.

type Worker

type Worker interface {
	Name() string
	Run(ctx context.Context) error
}

Worker is the interface to implement to provide workers.

type WorkerExecution

type WorkerExecution struct {
	// contains filtered or unexported fields
}

WorkerExecution represents a Worker execution within the WorkerPool.

func NewWorkerExecution

func NewWorkerExecution(id string, name string, options ExecutionOptions) *WorkerExecution

NewWorkerExecution returns a new WorkerExecution.

func (*WorkerExecution) AddEvent

func (e *WorkerExecution) AddEvent(message string) *WorkerExecution

AddEvent adds a WorkerExecutionEvent to the WorkerExecution.

func (*WorkerExecution) CurrentExecutionAttempt

func (e *WorkerExecution) CurrentExecutionAttempt() int

CurrentExecutionAttempt returns the WorkerExecution current execution attempt.

func (*WorkerExecution) DeferredStartThreshold

func (e *WorkerExecution) DeferredStartThreshold() float64

DeferredStartThreshold returns the WorkerExecution max deferred start threshold, in seconds.

func (*WorkerExecution) Events

func (e *WorkerExecution) Events() []*WorkerExecutionEvent

Events returns the WorkerExecution list of WorkerExecutionEvent.

func (*WorkerExecution) HasEvent

func (e *WorkerExecution) HasEvent(message string) bool

HasEvent returns true if a WorkerExecutionEvent was found for a given message.

func (*WorkerExecution) Id

func (e *WorkerExecution) Id() string

Id returns the WorkerExecution id.

func (*WorkerExecution) MaxExecutionsAttempts

func (e *WorkerExecution) MaxExecutionsAttempts() int

MaxExecutionsAttempts returns the WorkerExecution max execution attempts.

func (*WorkerExecution) Name

func (e *WorkerExecution) Name() string

Name returns the WorkerExecution name.

func (*WorkerExecution) SetCurrentExecutionAttempt

func (e *WorkerExecution) SetCurrentExecutionAttempt(current int) *WorkerExecution

SetCurrentExecutionAttempt sets the WorkerExecution current execution attempt.

func (*WorkerExecution) SetDeferredStartThreshold

func (e *WorkerExecution) SetDeferredStartThreshold(threshold float64) *WorkerExecution

SetDeferredStartThreshold sets the WorkerExecution max deferred start threshold, in seconds.

func (*WorkerExecution) SetId

func (e *WorkerExecution) SetId(id string) *WorkerExecution

SetId sets the WorkerExecution id.

func (*WorkerExecution) SetMaxExecutionsAttempts

func (e *WorkerExecution) SetMaxExecutionsAttempts(max int) *WorkerExecution

SetMaxExecutionsAttempts sets the WorkerExecution max execution attempts.

func (*WorkerExecution) SetName

func (e *WorkerExecution) SetName(name string) *WorkerExecution

SetName sets the WorkerExecution name.

func (*WorkerExecution) SetStatus

func (e *WorkerExecution) SetStatus(status WorkerStatus) *WorkerExecution

SetStatus sets the WorkerExecution status.

func (*WorkerExecution) Status

func (e *WorkerExecution) Status() WorkerStatus

Status returns the WorkerExecution status.

type WorkerExecutionEvent

type WorkerExecutionEvent struct {
	// contains filtered or unexported fields
}

WorkerExecutionEvent is an event happening during a Worker execution.

func NewWorkerExecutionEvent

func NewWorkerExecutionEvent(executionId string, message string, timestamp time.Time) *WorkerExecutionEvent

NewWorkerExecutionEvent returns a new WorkerExecutionEvent.

func (*WorkerExecutionEvent) ExecutionId

func (e *WorkerExecutionEvent) ExecutionId() string

ExecutionId returns the worker execution id.

func (*WorkerExecutionEvent) Message

func (e *WorkerExecutionEvent) Message() string

Message returns the worker execution message.

func (*WorkerExecutionEvent) String

func (e *WorkerExecutionEvent) String() string

String returns a string representation of the WorkerExecutionEvent.

func (*WorkerExecutionEvent) Timestamp

func (e *WorkerExecutionEvent) Timestamp() time.Time

Timestamp returns the worker execution timestamp.

type WorkerExecutionOption

type WorkerExecutionOption func(o *ExecutionOptions)

WorkerExecutionOption are functional options for the Worker executions.

func WithDeferredStartThreshold

func WithDeferredStartThreshold(t float64) WorkerExecutionOption

WithDeferredStartThreshold is used to specify the worker deferred start threshold, in seconds.

func WithMaxExecutionsAttempts

func WithMaxExecutionsAttempts(l int) WorkerExecutionOption

WithMaxExecutionsAttempts is used to specify the worker max execution attempts.

type WorkerMetrics

type WorkerMetrics struct {
	// contains filtered or unexported fields
}

WorkerMetrics allows the WorkerPool to send worker metrics to a prometheus.Registry.

func NewWorkerMetrics

func NewWorkerMetrics(namespace string, subsystem string) *WorkerMetrics

NewWorkerMetrics returns a new WorkerMetrics, and accepts metrics namespace and subsystem.

func (*WorkerMetrics) IncrementWorkerExecutionError

func (m *WorkerMetrics) IncrementWorkerExecutionError(workerName string) *WorkerMetrics

IncrementWorkerExecutionError increments the failing workers counter for a given worker name.

func (*WorkerMetrics) IncrementWorkerExecutionRestart

func (m *WorkerMetrics) IncrementWorkerExecutionRestart(workerName string) *WorkerMetrics

IncrementWorkerExecutionRestart increments the restarted workers counter for a given worker name.

func (*WorkerMetrics) IncrementWorkerExecutionStart

func (m *WorkerMetrics) IncrementWorkerExecutionStart(workerName string) *WorkerMetrics

IncrementWorkerExecutionStart increments the started workers counter for a given worker name.

func (*WorkerMetrics) IncrementWorkerExecutionSuccess

func (m *WorkerMetrics) IncrementWorkerExecutionSuccess(workerName string) *WorkerMetrics

IncrementWorkerExecutionSuccess increments the successful workers counter for a given worker name.

func (*WorkerMetrics) Register

func (m *WorkerMetrics) Register(registry *prometheus.Registry) error

Register registers the WorkerMetrics against a prometheus.Registry.

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

WorkerPool is the Worker pool.

func NewWorkerPool

func NewWorkerPool(options ...WorkerPoolOption) *WorkerPool

NewWorkerPool returns a new WorkerPool, with optional WorkerPoolOption.

func (*WorkerPool) Execution

func (p *WorkerPool) Execution(name string) (*WorkerExecution, error)

Execution returns the WorkerExecution from the WorkerPool for a given worker name.

func (*WorkerPool) Executions

func (p *WorkerPool) Executions() map[string]*WorkerExecution

Executions returns the WorkerPool list of WorkerExecution.

func (*WorkerPool) Metrics

func (p *WorkerPool) Metrics() *WorkerMetrics

Metrics returns the WorkerPool internal WorkerMetrics.

func (*WorkerPool) Options

func (p *WorkerPool) Options() PoolOptions

Options returns the list of PoolOptions of the WorkerPool.

func (*WorkerPool) Register

func (p *WorkerPool) Register(registrations ...*WorkerRegistration) *WorkerPool

Register registers a new WorkerRegistration onto the WorkerPool.

func (*WorkerPool) Registration

func (p *WorkerPool) Registration(name string) (*WorkerRegistration, error)

Registration returns the WorkerRegistration from the WorkerPool for a given worker name.

func (*WorkerPool) Registrations

func (p *WorkerPool) Registrations() map[string]*WorkerRegistration

Registrations returns the WorkerPool list of WorkerRegistration.

func (*WorkerPool) Start

func (p *WorkerPool) Start(ctx context.Context) error

Start starts all Worker registered in the WorkerPool.

func (*WorkerPool) Stop

func (p *WorkerPool) Stop() error

Stop gracefully stops all Worker registered in the WorkerPool.

type WorkerPoolFactory

type WorkerPoolFactory interface {
	Create(options ...WorkerPoolOption) (*WorkerPool, error)
}

WorkerPoolFactory is the interface for WorkerPool factories.

func NewDefaultWorkerPoolFactory

func NewDefaultWorkerPoolFactory() WorkerPoolFactory

NewDefaultWorkerPoolFactory returns a DefaultWorkerPoolFactory, implementing WorkerPoolFactory.

type WorkerPoolOption

type WorkerPoolOption func(o *PoolOptions)

WorkerPoolOption are functional options for the WorkerPoolFactory implementations.
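
The functional options pattern behind this type can be sketched with reduced local types. Here poolOptions, poolOption and resolve are stand-ins written for this example; only the default values (0 and 1) come from the constants documented above.

```go
package main

import "fmt"

// poolOptions is a reduced stand-in for the documented PoolOptions struct.
type poolOptions struct {
	GlobalDeferredStartThreshold float64
	GlobalMaxExecutionsAttempts  int
}

// poolOption mutates poolOptions, like WorkerPoolOption does for PoolOptions.
type poolOption func(o *poolOptions)

func withGlobalDeferredStartThreshold(threshold float64) poolOption {
	return func(o *poolOptions) { o.GlobalDeferredStartThreshold = threshold }
}

func withGlobalMaxExecutionsAttempts(attempts int) poolOption {
	return func(o *poolOptions) { o.GlobalMaxExecutionsAttempts = attempts }
}

// resolve starts from the documented defaults (0 and 1) and applies the
// given options in order, so later options win over earlier ones.
func resolve(options ...poolOption) poolOptions {
	resolved := poolOptions{GlobalDeferredStartThreshold: 0, GlobalMaxExecutionsAttempts: 1}
	for _, apply := range options {
		apply(&resolved)
	}

	return resolved
}

func main() {
	fmt.Printf("%+v\n", resolve())
	fmt.Printf("%+v\n", resolve(withGlobalDeferredStartThreshold(1.5), withGlobalMaxExecutionsAttempts(3)))
}
```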

func WithGenerator

func WithGenerator(generator uuid.UuidGenerator) WorkerPoolOption

WithGenerator is used to specify the uuid.UuidGenerator to use by the WorkerPool.

func WithGlobalDeferredStartThreshold

func WithGlobalDeferredStartThreshold(threshold float64) WorkerPoolOption

WithGlobalDeferredStartThreshold is used to specify the global workers deferred start threshold, in seconds.

func WithGlobalMaxExecutionsAttempts

func WithGlobalMaxExecutionsAttempts(max int) WorkerPoolOption

WithGlobalMaxExecutionsAttempts is used to specify the global workers max execution attempts.

func WithMetrics

func WithMetrics(metrics *WorkerMetrics) WorkerPoolOption

WithMetrics is used to specify the WorkerMetrics to use by the WorkerPool.

func WithWorker

func WithWorker(worker Worker, options ...WorkerExecutionOption) WorkerPoolOption

WithWorker is used to register a Worker in the WorkerPool, with an optional list of WorkerExecutionOption.

type WorkerRegistration

type WorkerRegistration struct {
	// contains filtered or unexported fields
}

WorkerRegistration is a Worker registration, with optional WorkerExecutionOption.

func NewWorkerRegistration

func NewWorkerRegistration(worker Worker, options ...WorkerExecutionOption) *WorkerRegistration

NewWorkerRegistration returns a new WorkerRegistration for a given Worker and an optional list of WorkerExecutionOption.

func (*WorkerRegistration) Options

Options returns the list of WorkerExecutionOption of the WorkerRegistration.

func (*WorkerRegistration) Worker

func (r *WorkerRegistration) Worker() Worker

Worker returns the Worker of the WorkerRegistration.

type WorkerStatus

type WorkerStatus int

WorkerStatus is an enum for the possible statuses of a worker.

const (
	Unknown WorkerStatus = iota
	Deferred
	Running
	Success
	Error
)

func (WorkerStatus) String

func (s WorkerStatus) String() string

String returns a string representation of the WorkerStatus.
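
An iota-based enum with a String method usually looks like the sketch below. The status type mirrors the order of the constants above, but the returned labels are assumptions: the exact strings produced by WorkerStatus are not stated on this page.

```go
package main

import "fmt"

// status is a local stand-in for WorkerStatus.
type status int

const (
	unknown status = iota
	deferred
	running
	success
	failed
)

// String returns a label for the status. The labels are assumptions of this
// sketch; any value outside the known range falls back to "unknown".
func (s status) String() string {
	switch s {
	case deferred:
		return "deferred"
	case running:
		return "running"
	case success:
		return "success"
	case failed:
		return "error"
	default:
		return "unknown"
	}
}

func main() {
	// fmt uses the String method because status implements fmt.Stringer
	fmt.Println(running, status(42)) // running unknown
}
```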
