gueron

package module
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 23, 2022 License: MIT Imports: 17 Imported by: 0

README

gueron

GoDev Coverage Status ReportCard License

Gueron is Golang cron implemented on top of github.com/vgarvardt/gue. It uses github.com/robfig/cron/v3 to calculate execution time for the jobs and schedules them using gue Jobs that are being handled by the gue workers. Scheduler controls that the jobs will be scheduled only once, even if it runs on several instances.

It is up to the user to control the workers pool size, so take into consideration peak number of jobs that is going to be scheduled if it is critical to handle jobs ASAP and avoid delayed execution.

Install

go get -u github.com/vgarvardt/gueron

Additionally, you need to apply DB migration (includes gue migration as well).

Scheduler format

Scheduler uses github.com/robfig/cron/v3 under the hood and is set up to work with the crontab format

# ┌───────────── minute (0 - 59)
# │ ┌───────────── hour (0 - 23)
# │ │ ┌───────────── day of the month (1 - 31)
# │ │ │ ┌───────────── month (1 - 12)
# │ │ │ │ ┌───────────── day of the week (0 - 6) (Sunday to Saturday;
# │ │ │ │ │                                   7 is also Sunday on some systems)
# │ │ │ │ │
# │ │ │ │ │
# * * * * *

and with the nonstandard predefined scheduling definitions:

  • @yearly (or @annually) => 0 0 1 1 *
  • @monthly => 0 0 1 * *
  • @weekly => 0 0 * * 0
  • @daily (or @midnight) => 0 0 * * *
  • @hourly => 0 * * * *
  • @every [interval] where [interval] is the duration string that can be parsed by time.ParseDuration()

Usage Example

package main

import (
  "context"
  "log"
  "os"
  "os/signal"
  "syscall"

  "github.com/jackc/pgx/v5/pgxpool"
  "github.com/vgarvardt/gue/v4"
  "github.com/vgarvardt/gue/v4/adapter/pgxv5"
)

func main() {
  pgxCfg, err := pgxpool.ParseConfig(os.Getenv("DATABASE_URL"))
  if err != nil {
    log.Fatal(err)
  }

  pgxPool, err := pgxpool.NewConfig(context.Background(), pgxCfg)
  if err != nil {
    log.Fatal(err)
  }
  defer pgxPool.Close()

  poolAdapter := pgxv5.NewConnPool(pgxPool)

  s, err := gueron.NewScheduler(poolAdapter)
  if err != nil {
    log.Fatal(err)
  }

  wm := gue.WorkMap{}

  s.MustAdd("@every 15m", "log-foo-bar", nil)
  wm["log-foo-bar"] = func(ctx context.Context, j *gue.Job) error {
    log.Printf("Working scheduled job: %d\n", j.ID)
    return nil
  }

  ctx, cancel := context.WithCancel(context.Background())
  go func() {
    if err := s.Run(ctx, wm, 4); err != nil {
      log.Fatal(err)
    }
  }()

  sigChan := make(chan os.Signal, 1)
  signal.Notify(sigChan, os.Interrupt,
    syscall.SIGHUP,
    syscall.SIGINT,
    syscall.SIGTERM,
    syscall.SIGQUIT,
  )

  // wait for exit signal, e.g. the one sent by Ctrl-C
  sig := <-sigChan
  cancel()
  log.Printf("Got exit signal [%s], exiting...\n", sig.String())
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler responsible for collecting period tasks and generating gue.Job list for defined period of time.

func NewScheduler

func NewScheduler(pool adapter.ConnPool, opts ...SchedulerOption) (*Scheduler, error)

NewScheduler builds new Scheduler instance. Note that internally Scheduler uses gue.Client with the backoff set to gue.BackoffNever, so any errored job will be discarded immediately - this is how original cron works.

func (*Scheduler) Add

func (s *Scheduler) Add(spec, jobType string, args []byte) (*Scheduler, error)

Add adds new periodic task information to the Scheduler. Parameters are:

  • spec is the cron specification parsable by the github.com/robfig/cron/v3
  • jobType gue.Job Type value, make sure that gue.WorkerPool that will be handling jobs is aware of all the values
  • args is the gue.Job Args, can be used to pass some static parameters to the scheduled job, e.g. when the same job type is used in several crons and handler has some branching logic based on the arguments. Make sure this value is valid JSON as this is gue DB constraint

func (*Scheduler) MustAdd

func (s *Scheduler) MustAdd(spec, jobType string, args []byte) *Scheduler

MustAdd is the same as Scheduler.Add but instead of returning an error it panics.

func (*Scheduler) Run

func (s *Scheduler) Run(ctx context.Context, wm gue.WorkMap, poolSize int, options ...gue.WorkerPoolOption) error

Run initializes cron jobs and gue.WorkerPool that handles them. Run blocks until all workers exit. Use context cancellation for shutdown. WorkerMap parameter must have all the handlers that are going to handle cron jobs. Note that some gue.WorkerPoolOption will be overridden by Scheduler, they are:

  • gue.WithPoolQueue - Scheduler queue will be set, use WithQueueName if you need to customise it
  • gue.WithPoolID - "gueron-<random-id>/pool" will be used
  • gue.WithPoolLogger - Scheduler logger will be set, use WithLogger if you need to customise it

type SchedulerOption

type SchedulerOption func(s *Scheduler)

SchedulerOption is the Scheduler builder options

func WithHorizon

func WithHorizon(d time.Duration) SchedulerOption

WithHorizon sets the scheduler cron jobs scheduling horizon. The more often the app is being redeployed/restarted the shorter the schedule horizon should be as rescheduling causes stop-the-world situation, so the fewer jobs to schedule or the shorter the horizon - the quicker the crons are ready to perform the duties.

func WithLogger

func WithLogger(logger adapter.Logger) SchedulerOption

WithLogger sets logger that will be used both for scheduler and gue.Client log

func WithMeter

func WithMeter(meter metric.Meter) SchedulerOption

WithMeter sets metric.Meter to the underlying gue.Client.

func WithQueueName

func WithQueueName(qName string) SchedulerOption

WithQueueName sets custom scheduler queue name

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL