fxcron

package module
v1.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 15, 2024 License: MIT Imports: 16 Imported by: 0

README

Fx Cron Module

ci go report codecov Deps PkgGoDev

Fx module for gocron.

Installation

go get github.com/ankorstore/yokai/fxcron

Features

This module provides the possibility to run internal cron jobs in your application with:

  • automatic panic recovery
  • configurable cron jobs scheduling and execution options
  • configurable logging, tracing and metrics for cron jobs executions

Documentation

Dependencies

This module is intended to be used alongside:

Loading

To load the module in your Fx application:

package main

import (
	"github.com/ankorstore/yokai/fxconfig"
	"github.com/ankorstore/yokai/fxcron"
	"github.com/ankorstore/yokai/fxgenerate"
	"github.com/ankorstore/yokai/fxlog"
	"github.com/ankorstore/yokai/fxmetrics"
	"github.com/ankorstore/yokai/fxtrace"
	"github.com/go-co-op/gocron/v2"
	"go.uber.org/fx"
)

func main() {
	fx.New(
		fxconfig.FxConfigModule,                      // load the module dependencies
		fxlog.FxLogModule,
		fxtrace.FxTraceModule,
		fxtrace.FxTraceModule,
		fxmetrics.FxMetricsModule,
		fxgenerate.FxGenerateModule,
		fxcron.FxCronModule,                          // load the module
		fx.Invoke(func(scheduler gocron.Scheduler) {
			scheduler.Start()                         // start the cron jobs scheduler
		}),
	).Run()
}
Configuration

Configuration reference:

# ./configs/config.yaml
app:
  name: app
  env: dev
  version: 0.1.0
  debug: true
modules:
  log:
    level: info
    output: stdout
  trace:
    processor:
      type: stdout
  cron:
    scheduler:
      seconds: true                   # to allow seconds based cron jobs expressions (impact all jobs), disabled by default
      concurrency:
        limit:
          enabled: true               # to limit concurrent cron jobs executions, disabled by default
          max: 3                      # concurrency limit
          mode: wait                  # "wait" or "reschedule"
      stop:
        timeout: 5s                   # scheduler shutdown timeout for graceful cron jobs termination, 10 seconds by default
    jobs:                             # common cron jobs options
      execution:
        start:
          immediately: true           # to start cron jobs executions immediately (by default)
          at: "2023-01-01T14:00:00Z"  # or a given date time (RFC3339)
        limit:
          enabled: true               # to limit the number of per cron jobs executions, disabled by default
          max: 3                      # executions limit
      singleton:
        enabled: true                 # to execute the cron jobs in singleton mode, disabled by default
        mode: wait                    # "wait" or "reschedule"
    log:
      enabled: true                   # to log cron jobs executions, disabled by default (errors will always be logged).
      exclude:                        # to exclude by name cron jobs from logging
        - foo
        - bar
    metrics:
      collect:
        enabled: true                 # to collect cron jobs executions metrics (executions count and duration), disabled by default
        namespace: foo                # cron jobs metrics namespace (empty by default)
        subsystem: bar                # cron jobs metrics subsystem (empty by default)
      buckets: 1, 1.5, 10, 15, 100    # to define custom cron jobs executions durations metric buckets (in seconds)
    trace:
      enabled: true                   # to trace cron jobs executions, disabled by default
      exclude:                        # to exclude by name cron jobs from tracing
        - foo
        - bar

Notes:

  • the cron jobs executions logging will be based on the fxlog module configuration
  • the cron jobs executions tracing will be based on the fxtrace module configuration

Check the configuration files documentation for more details.

Cron jobs
Definition

This module provides a simple CronJob interface to implement for your cron jobs:

package cron

import (
	"context"

	"github.com/ankorstore/yokai/fxcron"
	"path/to/service"
)

type SomeCron struct {
	service *service.SomeService
}

func NewSomeCron(service *service.SomeService) *SomeCron {
	return &SomeCron{
		service: service,
	}
}

func (c *SomeCron) Name() string {
	return "some cron job"
}

func (c *SomeCron) Run(ctx context.Context) error {
	// contextual job name and execution id
	name, id := fxcron.CtxCronJobName(ctx), fxcron.CtxCronJobExecutionId(ctx)

	// contextual tracing
	ctx, span := fxcron.CtxTracer(ctx).Start(ctx, "some span")
	defer span.End()

	// contextual logging
	fxcron.CtxLogger(ctx).Info().Msg("some log")

	// invoke autowired dependency
	err := c.service.DoSomething(ctx, name, id)

	// returned errors will automatically be logged
	return err
}

Notes:

  • your cron job dependencies will be autowired
  • you can access from the provided context:
    • the cron job name with CtxCronJobName()
    • the cron job execution id with CtxCronJobExecutionId()
    • the tracer with CtxTracer(), which will automatically add to your spans the CronJob name and CronJobExecutionID attributes
    • the logger with CtxLogger(), which will automatically add to your log records the cronJob name and cronJobExecutionID fields
Registration

Once ready, you can register and schedule your cron job with AsCronJob():

package main

import (
	"github.com/ankorstore/yokai/fxconfig"
	"github.com/ankorstore/yokai/fxcron"
	"github.com/ankorstore/yokai/fxgenerate"
	"github.com/ankorstore/yokai/fxlog"
	"github.com/ankorstore/yokai/fxmetrics"
	"github.com/ankorstore/yokai/fxtrace"
	"github.com/go-co-op/gocron/v2"
	"go.uber.org/fx"
	"path/to/cron"
)

func main() {
	fx.New(
		fxconfig.FxConfigModule,      // load the module dependencies
		fxlog.FxLogModule,
		fxtrace.FxTraceModule,
		fxmetrics.FxMetricsModule,
		fxgenerate.FxGenerateModule,
		fxcron.FxCronModule,          // load the module
		fx.Options(
			// register, autowire and schedule SomeCron to run every 2 minutes
			fxcron.AsCronJob(cron.NewSomeCron, `*/2 * * * *`),
		),
	).Run()
}

You can override, per job, the common job execution options by providing your own list of gocron.JobOption, for example:

fxcron.AsCronJob(cron.NewSomeCron, `*/2 * * * *`, gocron.WithLimitedRuns(10)),

If you need cron jobs to be scheduled on the seconds level, configure the scheduler with modules.cron.scheduler.seconds=true.

It will add seconds field to the beginning of the scheduling expression, for example to run every 5 seconds:

fxcron.AsCronJob(cron.NewSomeCron, `*/5 * * * * *`),

Note: you can use https://crontab.guru to help you with your scheduling definitions.

Override

By default, the gocron.Scheduler is created by the DefaultCronSchedulerFactory.

If needed, you can provide your own factory and override the module:

package main

import (
	"github.com/ankorstore/yokai/fxconfig"
	"github.com/ankorstore/yokai/fxcron"
	"github.com/ankorstore/yokai/fxgenerate"
	"github.com/ankorstore/yokai/fxlog"
	"github.com/ankorstore/yokai/fxmetrics"
	"github.com/ankorstore/yokai/fxtrace"
	"github.com/go-co-op/gocron/v2"
	"go.uber.org/fx"
)

type CustomCronSchedulerFactory struct{}

func NewCustomCronSchedulerFactory() fxcron.CronSchedulerFactory {
	return &CustomCronSchedulerFactory{}
}

func (f *CustomCronSchedulerFactory) Create(options ...gocron.SchedulerOption) (gocron.Scheduler, error) {
	return gocron.NewScheduler(options...)
}

func main() {
	fx.New(
		fxconfig.FxConfigModule, // load the module dependencies
		fxlog.FxLogModule,
		fxtrace.FxTraceModule,
		fxmetrics.FxMetricsModule,
		fxgenerate.FxGenerateModule,
		fxcron.FxCronModule,                         // load the module
		fx.Decorate(NewCustomCronSchedulerFactory),  // override the module with a custom factory
		fx.Invoke(func(scheduler gocron.Scheduler) { // invoke the cron scheduler
			// ...
		}),
	).Run()
}

Documentation

Index

Constants

View Source
const (
	EXECUTION_SUCCESS = "success"
	EXECUTION_ERROR   = "error"
)
View Source
const (
	ModuleName                           = "cron"
	LogRecordFieldCronJobName            = "cronJob"
	LogRecordFieldCronJobExecutionId     = "cronJobExecutionID"
	TraceSpanAttributeCronJobName        = "CronJob"
	TraceSpanAttributeCronJobExecutionId = "CronJobExecutionID"
)
View Source
const NON_AVAILABLE = "n/a"

Variables

View Source
var FxCronModule = fx.Module(
	ModuleName,
	fx.Provide(
		NewDefaultCronSchedulerFactory,
		NewFxCronJobRegistry,
		NewFxCron,
		fx.Annotate(
			NewFxCronModuleInfo,
			fx.As(new(interface{})),
			fx.ResultTags(`group:"core-module-infos"`),
		),
	),
)

FxCronModule is the Fx cron module.

Functions

func AnnotateTracerProvider

func AnnotateTracerProvider(base oteltrace.TracerProvider) oteltrace.TracerProvider

AnnotateTracerProvider extends a provided oteltrace.TracerProvider spans with cron jobs execution attributes.

func AsCronJob

func AsCronJob(j any, expression string, options ...gocron.JobOption) fx.Option

AsCronJob registers a cron job into Fx, with an optional list of gocron.JobOption.

func Contains

func Contains(list []string, item string) bool

Contains returns true if a provided string is found in a list of strings.

func CtxCronJobExecutionId

func CtxCronJobExecutionId(ctx context.Context) string

CtxCronJobExecutionId returns the contextual cron job execution id.

func CtxCronJobName

func CtxCronJobName(ctx context.Context) string

CtxCronJobName returns the contextual cron job name.

func CtxLogger

func CtxLogger(ctx context.Context) *log.Logger

CtxLogger returns the contextual logger.

func CtxTracer

func CtxTracer(ctx context.Context) oteltrace.Tracer

CtxTracer returns the contextual tracer.

func GetReturnType

func GetReturnType(target any) string

GetReturnType returns the return type of a target.

func GetType

func GetType(target any) string

GetType returns the type of a target.

func NewFxCron

func NewFxCron(p FxCronParam) (gocron.Scheduler, error)

NewFxCron returns a new gocron.Scheduler.

func Sanitize

func Sanitize(str string) string

Sanitize transforms a given string to not contain spaces or dashes, and to be in lower case.

func Split added in v1.1.0

func Split(str string) []string

Split trims and splits a provided string by comma.

Types

type CronJob

type CronJob interface {
	Name() string
	Run(ctx context.Context) error
}

CronJob is the interface for cron jobs.

type CronJobDefinition

type CronJobDefinition interface {
	ReturnType() string
	Expression() string
	Options() []gocron.JobOption
}

CronJobDefinition is the interface for cron job definitions.

func NewCronJobDefinition

func NewCronJobDefinition(returnType string, expression string, options ...gocron.JobOption) CronJobDefinition

NewCronJobDefinition returns a new CronJobDefinition.

type CronJobMetrics

type CronJobMetrics struct {
	// contains filtered or unexported fields
}

CronJobMetrics is the metrics handler for the cron jobs.

func NewCronJobMetrics

func NewCronJobMetrics(namespace string, subsystem string) *CronJobMetrics

NewCronJobMetrics returns a new CronJobMetrics instance for provided metrics namespace and subsystem.

func NewCronJobMetricsWithBuckets

func NewCronJobMetricsWithBuckets(namespace string, subsystem string, buckets []float64) *CronJobMetrics

NewCronJobMetricsWithBuckets returns a new CronJobMetrics instance for provided metrics namespace, subsystem and buckets.

func (*CronJobMetrics) IncrementCronJobExecutionError

func (m *CronJobMetrics) IncrementCronJobExecutionError(jobName string) *CronJobMetrics

IncrementCronJobExecutionError increments the number of execution errors for a given cron job.

func (*CronJobMetrics) IncrementCronJobExecutionSuccess

func (m *CronJobMetrics) IncrementCronJobExecutionSuccess(jobName string) *CronJobMetrics

IncrementCronJobExecutionSuccess increments the number of execution successes for a given cron job.

func (*CronJobMetrics) ObserveCronJobExecutionDuration

func (m *CronJobMetrics) ObserveCronJobExecutionDuration(jobName string, jobDuration float64) *CronJobMetrics

ObserveCronJobExecutionDuration observes the duration of a cron job execution.

func (*CronJobMetrics) Register

func (m *CronJobMetrics) Register(registry *prometheus.Registry) error

Register allows the CronJobMetrics to register against a provided prometheus.Registry.

type CronJobRegistry

type CronJobRegistry struct {
	// contains filtered or unexported fields
}

CronJobRegistry is the registry collecting cron jobs and their definitions.

func NewFxCronJobRegistry

func NewFxCronJobRegistry(p FxCronJobRegistryParam) *CronJobRegistry

NewFxCronJobRegistry returns as new CronJobRegistry.

func (*CronJobRegistry) ResolveCronJobs

func (r *CronJobRegistry) ResolveCronJobs() ([]*ResolvedCronJob, error)

ResolveCronJobs resolves a list of ResolvedCronJob from their definitions.

type CronSchedulerFactory

type CronSchedulerFactory interface {
	Create(options ...gocron.SchedulerOption) (gocron.Scheduler, error)
}

CronSchedulerFactory is the interface for gocron.Scheduler factories.

func NewDefaultCronSchedulerFactory

func NewDefaultCronSchedulerFactory() CronSchedulerFactory

NewDefaultCronSchedulerFactory returns a DefaultCronSchedulerFactory, implementing CronSchedulerFactory.

type CtxCronJobExecutionIdKey

type CtxCronJobExecutionIdKey struct{}

CtxCronJobExecutionIdKey is a contextual struct key.

type CtxCronJobNameKey

type CtxCronJobNameKey struct{}

CtxCronJobNameKey is a contextual struct key.

type DefaultCronSchedulerFactory

type DefaultCronSchedulerFactory struct{}

DefaultCronSchedulerFactory is the default CronSchedulerFactory implementation.

func (*DefaultCronSchedulerFactory) Create

func (f *DefaultCronSchedulerFactory) Create(options ...gocron.SchedulerOption) (gocron.Scheduler, error)

Create returns a new gocron.Scheduler instance for an optional list of gocron.SchedulerOption.

type FxCronJobRegistryParam

type FxCronJobRegistryParam struct {
	fx.In
	CronJobs            []CronJob           `group:"cron-jobs"`
	CronJobsDefinitions []CronJobDefinition `group:"cron-jobs-definitions"`
}

FxCronJobRegistryParam allows injection of the required dependencies in NewFxCronJobRegistry.

type FxCronModuleInfo

type FxCronModuleInfo struct {
	// contains filtered or unexported fields
}

FxCronModuleInfo is a module info collector for fxcore.

func NewFxCronModuleInfo

func NewFxCronModuleInfo(scheduler gocron.Scheduler, registry *CronJobRegistry) *FxCronModuleInfo

NewFxCronModuleInfo returns a new FxCronModuleInfo.

func (*FxCronModuleInfo) Data

func (i *FxCronModuleInfo) Data() map[string]interface{}

Data return the data of the module info.

func (*FxCronModuleInfo) Name

func (i *FxCronModuleInfo) Name() string

Name return the name of the module info.

type FxCronParam

type FxCronParam struct {
	fx.In
	LifeCycle       fx.Lifecycle
	Generator       uuid.UuidGenerator
	TracerProvider  oteltrace.TracerProvider
	Factory         CronSchedulerFactory
	Config          *config.Config
	Registry        *CronJobRegistry
	Logger          *log.Logger
	MetricsRegistry *prometheus.Registry
}

FxCronParam allows injection of the required dependencies in NewFxCron.

type ResolvedCronJob

type ResolvedCronJob struct {
	// contains filtered or unexported fields
}

ResolvedCronJob represents a resolved cron job, with its expression and execution options.

func NewResolvedCronJob

func NewResolvedCronJob(implementation CronJob, expression string, options ...gocron.JobOption) *ResolvedCronJob

NewResolvedCronJob returns a new ResolvedCronJob instance.

func (*ResolvedCronJob) Expression

func (r *ResolvedCronJob) Expression() string

Expression returns the ResolvedCronJob cron job expression.

func (*ResolvedCronJob) Implementation

func (r *ResolvedCronJob) Implementation() CronJob

Implementation returns the ResolvedCronJob cron job implementation.

func (*ResolvedCronJob) Options

func (r *ResolvedCronJob) Options() []gocron.JobOption

Options returns the ResolvedCronJob cron job execution options.

type TracerProviderCronJobAnnotator

type TracerProviderCronJobAnnotator struct{}

TracerProviderCronJobAnnotator is the oteltrace.TracerProvider cron jobs annotator, implementing otelsdktrace.SpanProcessor.

func NewTracerProviderCronJobAnnotator

func NewTracerProviderCronJobAnnotator() *TracerProviderCronJobAnnotator

NewTracerProviderCronJobAnnotator returns a new [TracerProviderWorkerAnnotator].

func (*TracerProviderCronJobAnnotator) ForceFlush

ForceFlush is just for otelsdktrace.SpanProcessor compliance.

func (*TracerProviderCronJobAnnotator) OnEnd

OnEnd is just for otelsdktrace.SpanProcessor compliance.

func (*TracerProviderCronJobAnnotator) OnStart

OnStart adds cron job execution attributes to a given otelsdktrace.ReadWriteSpan.

func (*TracerProviderCronJobAnnotator) Shutdown

Shutdown is just for otelsdktrace.SpanProcessor compliance.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL