Documentation ¶
Index ¶
- Constants
- func AnnotateTracerProvider(base oteltrace.TracerProvider) oteltrace.TracerProvider
- func CtxLogger(ctx context.Context) *log.Logger
- func CtxTracer(ctx context.Context) oteltrace.Tracer
- func CtxWorkerExecutionId(ctx context.Context) string
- func CtxWorkerName(ctx context.Context) string
- func Sanitize(str string) string
- type CtxWorkerExecutionIdKey
- type CtxWorkerNameKey
- type DefaultWorkerPoolFactory
- type ExecutionOptions
- type PoolOptions
- type TracerProviderWorkerAnnotator
- func (a *TracerProviderWorkerAnnotator) ForceFlush(context.Context) error
- func (a *TracerProviderWorkerAnnotator) OnEnd(otelsdktrace.ReadOnlySpan)
- func (a *TracerProviderWorkerAnnotator) OnStart(ctx context.Context, s otelsdktrace.ReadWriteSpan)
- func (a *TracerProviderWorkerAnnotator) Shutdown(context.Context) error
- type Worker
- type WorkerExecution
- func (e *WorkerExecution) AddEvent(message string) *WorkerExecution
- func (e *WorkerExecution) CurrentExecutionAttempt() int
- func (e *WorkerExecution) DeferredStartThreshold() float64
- func (e *WorkerExecution) Events() []*WorkerExecutionEvent
- func (e *WorkerExecution) HasEvent(message string) bool
- func (e *WorkerExecution) Id() string
- func (e *WorkerExecution) MaxExecutionsAttempts() int
- func (e *WorkerExecution) Name() string
- func (e *WorkerExecution) SetCurrentExecutionAttempt(current int) *WorkerExecution
- func (e *WorkerExecution) SetDeferredStartThreshold(threshold float64) *WorkerExecution
- func (e *WorkerExecution) SetId(id string) *WorkerExecution
- func (e *WorkerExecution) SetMaxExecutionsAttempts(max int) *WorkerExecution
- func (e *WorkerExecution) SetName(name string) *WorkerExecution
- func (e *WorkerExecution) SetStatus(status WorkerStatus) *WorkerExecution
- func (e *WorkerExecution) Status() WorkerStatus
- type WorkerExecutionEvent
- type WorkerExecutionOption
- type WorkerMetrics
- func (m *WorkerMetrics) IncrementWorkerExecutionError(workerName string) *WorkerMetrics
- func (m *WorkerMetrics) IncrementWorkerExecutionRestart(workerName string) *WorkerMetrics
- func (m *WorkerMetrics) IncrementWorkerExecutionStart(workerName string) *WorkerMetrics
- func (m *WorkerMetrics) IncrementWorkerExecutionSuccess(workerName string) *WorkerMetrics
- func (m *WorkerMetrics) Register(registry *prometheus.Registry) error
- type WorkerPool
- func (p *WorkerPool) Execution(name string) (*WorkerExecution, error)
- func (p *WorkerPool) Executions() map[string]*WorkerExecution
- func (p *WorkerPool) Metrics() *WorkerMetrics
- func (p *WorkerPool) Options() PoolOptions
- func (p *WorkerPool) Register(registrations ...*WorkerRegistration) *WorkerPool
- func (p *WorkerPool) Registration(name string) (*WorkerRegistration, error)
- func (p *WorkerPool) Registrations() map[string]*WorkerRegistration
- func (p *WorkerPool) Start(ctx context.Context) error
- func (p *WorkerPool) Stop() error
- type WorkerPoolFactory
- type WorkerPoolOption
- func WithGenerator(generator uuid.UuidGenerator) WorkerPoolOption
- func WithGlobalDeferredStartThreshold(threshold float64) WorkerPoolOption
- func WithGlobalMaxExecutionsAttempts(max int) WorkerPoolOption
- func WithMetrics(metrics *WorkerMetrics) WorkerPoolOption
- func WithWorker(worker Worker, options ...WorkerExecutionOption) WorkerPoolOption
- type WorkerRegistration
- type WorkerStatus
Constants ¶
const (
	ExecutionStarted   = "started"
	ExecutionRestarted = "restarted"
	ExecutionSuccess   = "success"
	ExecutionError     = "error"
)
const (
	DefaultDeferredStartThreshold = 0
	DefaultMaxExecutionsAttempts  = 1
	DefaultMetricsNamespace       = ""
	DefaultMetricsSubsystem       = ""
)
const (
	LogRecordFieldWorkerName            = "worker"
	LogRecordFieldWorkerExecutionId     = "workerExecutionID"
	TraceSpanAttributeWorkerName        = "Worker"
	TraceSpanAttributeWorkerExecutionId = "WorkerExecutionID"
)
const TracerName = "worker"
TracerName is the workers tracer name.
Variables ¶
This section is empty.
Functions ¶
func AnnotateTracerProvider ¶
func AnnotateTracerProvider(base oteltrace.TracerProvider) oteltrace.TracerProvider
AnnotateTracerProvider extends the spans of a provided oteltrace.TracerProvider with worker execution attributes.
func CtxLogger ¶
CtxLogger returns the contextual log.Logger.
func CtxTracer ¶
CtxTracer returns the contextual oteltrace.Tracer.
func CtxWorkerExecutionId ¶
CtxWorkerExecutionId returns the contextual Worker execution id.
func CtxWorkerName ¶
CtxWorkerName returns the contextual Worker name.
Types ¶
type CtxWorkerExecutionIdKey ¶
type CtxWorkerExecutionIdKey struct{}
CtxWorkerExecutionIdKey is a contextual struct key for the current worker execution id.
type CtxWorkerNameKey ¶
type CtxWorkerNameKey struct{}
CtxWorkerNameKey is a contextual struct key for the current worker name.
type DefaultWorkerPoolFactory ¶
type DefaultWorkerPoolFactory struct{}
DefaultWorkerPoolFactory is the default WorkerPoolFactory implementation.
func (*DefaultWorkerPoolFactory) Create ¶
func (f *DefaultWorkerPoolFactory) Create(options ...WorkerPoolOption) (*WorkerPool, error)
Create returns a new WorkerPool, and accepts a list of WorkerPoolOption. For example:
var pool, _ = worker.NewDefaultWorkerPoolFactory().Create()
is equivalent to:
var pool, _ = worker.NewDefaultWorkerPoolFactory().Create(
	worker.WithGenerator(uuid.NewDefaultUuidGenerator()), // generator
	worker.WithMetrics(worker.NewWorkerMetrics("", "")),  // metrics
	worker.WithGlobalMaxExecutionsAttempts(1),            // no retries
	worker.WithGlobalDeferredStartThreshold(0),           // no deferred start
)
type ExecutionOptions ¶
ExecutionOptions are options for the Worker executions.
func DefaultWorkerExecutionOptions ¶
func DefaultWorkerExecutionOptions() ExecutionOptions
DefaultWorkerExecutionOptions are the default options for the Worker executions.
type PoolOptions ¶
type PoolOptions struct {
	GlobalDeferredStartThreshold float64
	GlobalMaxExecutionsAttempts  int
	Metrics                      *WorkerMetrics
	Generator                    uuid.UuidGenerator
	Registrations                map[string]*WorkerRegistration
}
PoolOptions are options for the WorkerPoolFactory implementations.
func DefaultWorkerPoolOptions ¶
func DefaultWorkerPoolOptions() PoolOptions
DefaultWorkerPoolOptions are the default options used in the DefaultWorkerPoolFactory.
type TracerProviderWorkerAnnotator ¶
type TracerProviderWorkerAnnotator struct{}
TracerProviderWorkerAnnotator is the oteltrace.TracerProvider workers annotator, implementing otelsdktrace.SpanProcessor.
func NewTracerProviderWorkerAnnotator ¶
func NewTracerProviderWorkerAnnotator() *TracerProviderWorkerAnnotator
NewTracerProviderWorkerAnnotator returns a new TracerProviderWorkerAnnotator.
func (*TracerProviderWorkerAnnotator) ForceFlush ¶
func (a *TracerProviderWorkerAnnotator) ForceFlush(context.Context) error
ForceFlush is just for otelsdktrace.SpanProcessor compliance.
func (*TracerProviderWorkerAnnotator) OnEnd ¶
func (a *TracerProviderWorkerAnnotator) OnEnd(otelsdktrace.ReadOnlySpan)
OnEnd is just for otelsdktrace.SpanProcessor compliance.
func (*TracerProviderWorkerAnnotator) OnStart ¶
func (a *TracerProviderWorkerAnnotator) OnStart(ctx context.Context, s otelsdktrace.ReadWriteSpan)
OnStart adds worker execution attributes to a given otelsdktrace.ReadWriteSpan.
func (*TracerProviderWorkerAnnotator) Shutdown ¶
func (a *TracerProviderWorkerAnnotator) Shutdown(context.Context) error
Shutdown is just for otelsdktrace.SpanProcessor compliance.
type WorkerExecution ¶
type WorkerExecution struct {
// contains filtered or unexported fields
}
WorkerExecution represents a Worker execution within the WorkerPool.
func NewWorkerExecution ¶
func NewWorkerExecution(id string, name string, options ExecutionOptions) *WorkerExecution
NewWorkerExecution returns a new WorkerExecution.
func (*WorkerExecution) AddEvent ¶
func (e *WorkerExecution) AddEvent(message string) *WorkerExecution
AddEvent adds a WorkerExecutionEvent to the WorkerExecution.
func (*WorkerExecution) CurrentExecutionAttempt ¶
func (e *WorkerExecution) CurrentExecutionAttempt() int
CurrentExecutionAttempt returns the WorkerExecution current execution attempt.
func (*WorkerExecution) DeferredStartThreshold ¶
func (e *WorkerExecution) DeferredStartThreshold() float64
DeferredStartThreshold returns the WorkerExecution max deferred start threshold, in seconds.
func (*WorkerExecution) Events ¶
func (e *WorkerExecution) Events() []*WorkerExecutionEvent
Events returns the WorkerExecution list of WorkerExecutionEvent.
func (*WorkerExecution) HasEvent ¶
func (e *WorkerExecution) HasEvent(message string) bool
HasEvent returns true if a WorkerExecutionEvent was found for a given message.
func (*WorkerExecution) Id ¶
func (e *WorkerExecution) Id() string
Id returns the WorkerExecution id.
func (*WorkerExecution) MaxExecutionsAttempts ¶
func (e *WorkerExecution) MaxExecutionsAttempts() int
MaxExecutionsAttempts returns the WorkerExecution max execution attempts.
func (*WorkerExecution) Name ¶
func (e *WorkerExecution) Name() string
Name returns the WorkerExecution name.
func (*WorkerExecution) SetCurrentExecutionAttempt ¶
func (e *WorkerExecution) SetCurrentExecutionAttempt(current int) *WorkerExecution
SetCurrentExecutionAttempt sets the WorkerExecution current execution attempt.
func (*WorkerExecution) SetDeferredStartThreshold ¶
func (e *WorkerExecution) SetDeferredStartThreshold(threshold float64) *WorkerExecution
SetDeferredStartThreshold sets the WorkerExecution max deferred start threshold, in seconds.
func (*WorkerExecution) SetId ¶
func (e *WorkerExecution) SetId(id string) *WorkerExecution
SetId sets the WorkerExecution id.
func (*WorkerExecution) SetMaxExecutionsAttempts ¶
func (e *WorkerExecution) SetMaxExecutionsAttempts(max int) *WorkerExecution
SetMaxExecutionsAttempts sets the WorkerExecution max execution attempts.
func (*WorkerExecution) SetName ¶
func (e *WorkerExecution) SetName(name string) *WorkerExecution
SetName sets the WorkerExecution name.
func (*WorkerExecution) SetStatus ¶
func (e *WorkerExecution) SetStatus(status WorkerStatus) *WorkerExecution
SetStatus sets the WorkerExecution status.
func (*WorkerExecution) Status ¶
func (e *WorkerExecution) Status() WorkerStatus
Status returns the WorkerExecution status.
type WorkerExecutionEvent ¶
type WorkerExecutionEvent struct {
// contains filtered or unexported fields
}
WorkerExecutionEvent is an event happening during a Worker execution.
func NewWorkerExecutionEvent ¶
func NewWorkerExecutionEvent(executionId string, message string, timestamp time.Time) *WorkerExecutionEvent
NewWorkerExecutionEvent returns a new WorkerExecutionEvent.
func (*WorkerExecutionEvent) ExecutionId ¶
func (e *WorkerExecutionEvent) ExecutionId() string
ExecutionId returns the worker execution id.
func (*WorkerExecutionEvent) Message ¶
func (e *WorkerExecutionEvent) Message() string
Message returns the worker execution message.
func (*WorkerExecutionEvent) String ¶
func (e *WorkerExecutionEvent) String() string
String returns a string representation of the WorkerExecutionEvent.
func (*WorkerExecutionEvent) Timestamp ¶
func (e *WorkerExecutionEvent) Timestamp() time.Time
Timestamp returns the worker execution timestamp.
type WorkerExecutionOption ¶
type WorkerExecutionOption func(o *ExecutionOptions)
WorkerExecutionOption are functional options for the Worker executions.
func WithDeferredStartThreshold ¶
func WithDeferredStartThreshold(t float64) WorkerExecutionOption
WithDeferredStartThreshold is used to specify the worker deferred start threshold, in seconds.
func WithMaxExecutionsAttempts ¶
func WithMaxExecutionsAttempts(l int) WorkerExecutionOption
WithMaxExecutionsAttempts is used to specify the worker max execution attempts.
type WorkerMetrics ¶
type WorkerMetrics struct {
// contains filtered or unexported fields
}
WorkerMetrics allows the WorkerPool to send worker metrics to a prometheus.Registry.
func NewWorkerMetrics ¶
func NewWorkerMetrics(namespace string, subsystem string) *WorkerMetrics
NewWorkerMetrics returns a new WorkerMetrics, and accepts metrics namespace and subsystem.
func (*WorkerMetrics) IncrementWorkerExecutionError ¶
func (m *WorkerMetrics) IncrementWorkerExecutionError(workerName string) *WorkerMetrics
IncrementWorkerExecutionError increments the failing workers counter for a given worker name.
func (*WorkerMetrics) IncrementWorkerExecutionRestart ¶
func (m *WorkerMetrics) IncrementWorkerExecutionRestart(workerName string) *WorkerMetrics
IncrementWorkerExecutionRestart increments the restarted workers counter for a given worker name.
func (*WorkerMetrics) IncrementWorkerExecutionStart ¶
func (m *WorkerMetrics) IncrementWorkerExecutionStart(workerName string) *WorkerMetrics
IncrementWorkerExecutionStart increments the started workers counter for a given worker name.
func (*WorkerMetrics) IncrementWorkerExecutionSuccess ¶
func (m *WorkerMetrics) IncrementWorkerExecutionSuccess(workerName string) *WorkerMetrics
IncrementWorkerExecutionSuccess increments the successful workers counter for a given worker name.
func (*WorkerMetrics) Register ¶
func (m *WorkerMetrics) Register(registry *prometheus.Registry) error
Register registers the WorkerMetrics against a prometheus.Registry.
type WorkerPool ¶
type WorkerPool struct {
// contains filtered or unexported fields
}
WorkerPool is the Worker pool.
func NewWorkerPool ¶
func NewWorkerPool(options ...WorkerPoolOption) *WorkerPool
NewWorkerPool returns a new WorkerPool, with optional WorkerPoolOption.
func (*WorkerPool) Execution ¶
func (p *WorkerPool) Execution(name string) (*WorkerExecution, error)
Execution returns the WorkerExecution from the WorkerPool for a given worker name.
func (*WorkerPool) Executions ¶
func (p *WorkerPool) Executions() map[string]*WorkerExecution
Executions returns the map of WorkerExecution held by the WorkerPool.
func (*WorkerPool) Metrics ¶
func (p *WorkerPool) Metrics() *WorkerMetrics
Metrics returns the WorkerPool internal WorkerMetrics.
func (*WorkerPool) Options ¶
func (p *WorkerPool) Options() PoolOptions
Options returns the PoolOptions of the WorkerPool.
func (*WorkerPool) Register ¶
func (p *WorkerPool) Register(registrations ...*WorkerRegistration) *WorkerPool
Register registers one or more WorkerRegistration onto the WorkerPool.
func (*WorkerPool) Registration ¶
func (p *WorkerPool) Registration(name string) (*WorkerRegistration, error)
Registration returns the WorkerRegistration from the WorkerPool for a given worker name.
func (*WorkerPool) Registrations ¶
func (p *WorkerPool) Registrations() map[string]*WorkerRegistration
Registrations returns the map of WorkerRegistration held by the WorkerPool.
func (*WorkerPool) Start ¶
func (p *WorkerPool) Start(ctx context.Context) error
Start starts all Workers registered in the WorkerPool.
func (*WorkerPool) Stop ¶
func (p *WorkerPool) Stop() error
Stop gracefully stops all Workers registered in the WorkerPool.
type WorkerPoolFactory ¶
type WorkerPoolFactory interface {
Create(options ...WorkerPoolOption) (*WorkerPool, error)
}
WorkerPoolFactory is the interface for WorkerPool factories.
func NewDefaultWorkerPoolFactory ¶
func NewDefaultWorkerPoolFactory() WorkerPoolFactory
NewDefaultWorkerPoolFactory returns a DefaultWorkerPoolFactory, implementing WorkerPoolFactory.
type WorkerPoolOption ¶
type WorkerPoolOption func(o *PoolOptions)
WorkerPoolOption are functional options for the WorkerPoolFactory implementations.
func WithGenerator ¶
func WithGenerator(generator uuid.UuidGenerator) WorkerPoolOption
WithGenerator is used to specify the uuid.UuidGenerator to be used by the WorkerPool.
func WithGlobalDeferredStartThreshold ¶
func WithGlobalDeferredStartThreshold(threshold float64) WorkerPoolOption
WithGlobalDeferredStartThreshold is used to specify the global workers deferred start threshold, in seconds.
func WithGlobalMaxExecutionsAttempts ¶
func WithGlobalMaxExecutionsAttempts(max int) WorkerPoolOption
WithGlobalMaxExecutionsAttempts is used to specify the global workers max execution attempts.
func WithMetrics ¶
func WithMetrics(metrics *WorkerMetrics) WorkerPoolOption
WithMetrics is used to specify the WorkerMetrics to be used by the WorkerPool.
func WithWorker ¶
func WithWorker(worker Worker, options ...WorkerExecutionOption) WorkerPoolOption
WithWorker is used to register a Worker in the WorkerPool, with an optional list of WorkerExecutionOption.
type WorkerRegistration ¶
type WorkerRegistration struct {
// contains filtered or unexported fields
}
WorkerRegistration is a Worker registration, with optional WorkerExecutionOption.
func NewWorkerRegistration ¶
func NewWorkerRegistration(worker Worker, options ...WorkerExecutionOption) *WorkerRegistration
NewWorkerRegistration returns a new WorkerRegistration for a given Worker and an optional list of WorkerExecutionOption.
func (*WorkerRegistration) Options ¶
func (r *WorkerRegistration) Options() []WorkerExecutionOption
Options returns the list of WorkerExecutionOption of the WorkerRegistration.
func (*WorkerRegistration) Worker ¶
func (r *WorkerRegistration) Worker() Worker
Worker returns the Worker of the WorkerRegistration.
type WorkerStatus ¶
type WorkerStatus int
WorkerStatus is an enum for the possible statuses of a worker.
const (
	Unknown WorkerStatus = iota
	Deferred
	Running
	Success
	Error
)
func (WorkerStatus) String ¶
func (s WorkerStatus) String() string
String returns a string representation of the WorkerStatus.