dalga

package module
v4.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 20, 2021 License: MIT Imports: 21 Imported by: 0

README

Dalga

Dalga is a job scheduler. It's like cron-as-a-service.

  • Can schedule periodic or one-off jobs.
  • Stores jobs in a MySQL table with location info.
  • Has an HTTP interface for scheduling and cancelling jobs.
  • Makes a POST request to the endpoint defined in config on the job's execution time.
  • Retries failed jobs with constant or exponential backoff.
  • Multiple instances can be run for high availability and scaling out.

Install

Use pre-built Docker image:

$ docker run -e DALGA_MYSQL_HOST=mysql.example.com cenkalti/dalga

or download the latest binary from releases page.

Usage

See example config file for configuration options. TOML and YAML file formats are supported. Configuration values can also be set via environment variables with DALGA_ prefix.

First, you must create the table for storing jobs:

$ dalga -config dalga.toml -create-tables

Then, run the server:

$ dalga -config dalga.toml

Schedule a new job to run every 60 seconds:

$ curl -i -X PUT 'http://127.0.0.1:34006/jobs/check_feed/1234?interval=60'
HTTP/1.1 201 Created
Content-Type: application/json; charset=utf-8
Date: Tue, 11 Nov 2014 22:10:40 GMT
Content-Length: 83

{"path":"check_feed","body":"1234","interval":60,"next_run":"2014-11-11T22:11:40Z"}

PUT always returns 201. If there is an existing job with path and body, it will be rescheduled.

There are 4 options that you can pass to Schedule but not every combination is valid:

Param Description Type Example
interval Run job at intervals Integer or ISO 8601 interval 60 or PT60S
first-run Do not run job until this time RFC3339 Timestamp 1985-04-12T23:20:50.52Z
one-off Run job only once Boolean true, false, 1, 0
immediate Run job immediately as it is scheduled Boolean true, false, 1, 0

60 seconds later, Dalga makes a POST to your endpoint defined in config:

Path: <config.baseurl>/<job.path>
Body: <job.body>

The endpoint must return 200 if the job is successful.

The endpoint may return 204 if job is invalid. In this case Dalga will remove the job from the table.

Anything other than 200 or 204 makes Dalga to retry the job indefinitely with an exponential backoff.

Get the status of a job:

$ curl -i -X GET 'http://127.0.0.1:34006/jobs/check_feed/1234'
HTTP/1.1 200 OK
Content-Type: application/json; charset=utf-8
Date: Tue, 11 Nov 2014 22:12:21 GMT
Content-Length: 83

{"path":"check_feed","body":"1234","interval":60,"next_run":"2014-11-11T22:12:41Z"}

GET may return 404 if job is not found.

Cancel previously scheduled job:

$ curl -i -X DELETE 'http://127.0.0.1:34006/jobs/check_feed/1234'
HTTP/1.1 204 No Content
Date: Tue, 11 Nov 2014 22:13:35 GMT

Documentation

Index

Constants

This section is empty.

Variables

View Source
var DefaultConfig = Config{
	Jobs: jobsConfig{
		RetryInterval:    time.Minute,
		RetryMultiplier:  1,
		RetryMaxInterval: time.Minute,
		ScanFrequency:    time.Second,
		MaxRunning:       100,
	},
	MySQL: mysqlConfig{
		Host:                          "127.0.0.1",
		Port:                          3306,
		DB:                            "test",
		Table:                         "dalga",
		User:                          "root",
		Password:                      "",
		MaxOpenConns:                  50,
		SkipLocked:                    true,
		TransactionIsolationParamName: "transaction_isolation",
		DialTimeout:                   30 * time.Second,
		ReadTimeout:                   30 * time.Second,
		WriteTimeout:                  30 * time.Second,
	},
	Listen: listenConfig{
		Host:            "127.0.0.1",
		Port:            34006,
		ShutdownTimeout: 10 * time.Second,
		IdleTimeout:     60 * time.Second,
		ReadTimeout:     10 * time.Second,
		WriteTimeout:    10 * time.Second,
	},
	Endpoint: endpointConfig{
		BaseURL: "http://127.0.0.1:5000/",
		Timeout: 10 * time.Second,
	},
}

DefaultConfig contains sensible defaults for Dalga instance. For a simple deployment, you only need to override MySQL options.

View Source
var ErrNotExist = table.ErrNotExist

ErrNotExist is returned when requested job does not exist.

Functions

This section is empty.

Types

type Client

type Client struct {
	BaseURL string
	// contains filtered or unexported fields
}

Client is used to interact with a Dalga cluster using REST.

func NewClient

func NewClient(baseURL string, opts ...ClientOpt) *Client

NewClient creates a REST Client for a Dalga cluster.

func (*Client) Cancel

func (clnt *Client) Cancel(ctx context.Context, path, body string) error

Cancel deletes the job with path and body.

func (*Client) Disable

func (clnt *Client) Disable(ctx context.Context, path, body string) (*Job, error)

Disable stops the job with path and body from running at its scheduled times.

func (*Client) Enable

func (clnt *Client) Enable(ctx context.Context, path, body string) (*Job, error)

Enable allows the job with path and body to continue running at its scheduled times.

If the next scheduled run is still in the future, the job will execute at that point. If the scheduled run is now in the past, the behavior depends upon the value of the FixedIntervals setting:

If FixedIntervals is false, the job will run immediately.

If FixedIntervals is true, the job will reschedule to the next appropriate point in the future based on its interval setting, effectively skipping the scheduled runs that were missed while the job was disabled.

func (*Client) Get

func (clnt *Client) Get(ctx context.Context, path, body string) (*Job, error)

Get retrieves the job with path and body.

func (*Client) List added in v4.0.1

func (clnt *Client) List(ctx context.Context, path, sortBy string, reverse bool, limit int64) (jobs []Job, cursor string, err error)

func (*Client) ListContinue added in v4.0.1

func (clnt *Client) ListContinue(ctx context.Context, cursor string) (jobs []Job, nextCursor string, err error)

func (*Client) Schedule

func (clnt *Client) Schedule(ctx context.Context, path, body string, opts ...ScheduleOpt) (*Job, error)

Schedule creates a new job with path and body, and the provided options.

func (*Client) Status

func (clnt *Client) Status(ctx context.Context) (*Status, error)

Status returns general information about the Dalga cluster.

type ClientOpt

type ClientOpt func(c *Client)

ClientOpt is an option that can be provided to a Dalga client.

func WithClient

func WithClient(clnt *http.Client) ClientOpt

WithClient provides a specific HTTP client.

type Config

type Config struct {
	Jobs     jobsConfig
	MySQL    mysqlConfig
	Listen   listenConfig
	Endpoint endpointConfig
}

Config values for Dalga instance.

type Dalga

type Dalga struct {
	Jobs *jobmanager.JobManager
	// contains filtered or unexported fields
}

Dalga is a job scheduler.

func New

func New(config Config) (*Dalga, error)

New returns a new Dalga instance. Close must be called when disposing the object.

func (*Dalga) Close

func (d *Dalga) Close()

Close database connections and HTTP listener.

func (*Dalga) CreateTable

func (d *Dalga) CreateTable() error

CreateTable creates the table for storing jobs on database.

func (*Dalga) NotifyDone

func (d *Dalga) NotifyDone() chan struct{}

NotifyDone returns a channel that will be closed when Run method returns.

func (*Dalga) Run

func (d *Dalga) Run(ctx context.Context)

Run Dalga. This function is blocking.

func (*Dalga) UseClock

func (d *Dalga) UseClock(now time.Time) *clock.Clock

UseClock overrides Dalga's datetime to help test schedules, retry behavior, etc. Use the returned "clock" to manually advance time and trigger jobs as desired.

type Job

type Job = table.JobJSON

Job is the external representation of a job in Dalga.

type ScheduleOpt

type ScheduleOpt func(o *jobmanager.ScheduleOptions)

ScheduleOpt is an option that can be provided to the Schedule method.

func MustWithIntervalString

func MustWithIntervalString(s string) ScheduleOpt

MustWithIntervalString is identical to WithInterval, except that it performs a parsing step. It panics if s is not a valid ISO8601 duration.

func MustWithLocationName

func MustWithLocationName(n string) ScheduleOpt

MustWithLocationName is identical to WithLocation, except that it performs a parsing step. It panics if n is not a valid *time.Location name.

func WithFirstRun

func WithFirstRun(t time.Time) ScheduleOpt

WithFirstRun specifies the job's first scheduled execution time. It's incompatible with the WithImmediate option.

The timezone of t is used when computing the first execution's instant in time, but subsequent intervals are computed within the timezone specified by the job's location.

If neither WithFirstRun or WithImmediate are used, the job's initial run will occur after one interval has elapsed.

func WithImmediate

func WithImmediate() ScheduleOpt

WithImmediate specifies that the job should run immediately. It's incompatible with the WithFirstRun option.

If neither WithFirstRun or WithImmediate are used, the job's initial run will occur after one interval has elapsed.

func WithInterval

func WithInterval(d duration.Duration) ScheduleOpt

WithInterval specifies that a job should recur, with frequency given as an ISO8601 duration as an interval: https://en.wikipedia.org/wiki/ISO_8601#Time_intervals

This option is incompatible with the WithOneOff option.

func WithLocation

func WithLocation(l *time.Location) ScheduleOpt

WithLocation specifies what location a job's schedule should be relative to. This is solely relevant for calculating intervals using an ISO8601 duration, since "P1D" can mean 23 or 25 hours of real time if the job's location is undergoing a daylight savings shift within that period.

Note that Dalga will not double-execute a job if it's scheduled at a time that repeats itself during a daylight savings shift, since it doesn't use wall clock time.

If this option is omitted, the job will default to UTC as a location.

func WithOneOff

func WithOneOff() ScheduleOpt

WithOneOff specifies that the job should run once and then delete itself. It's incompatible with the WithInterval option.

type Status

type Status = server.Status

Status contains general information about a Dalga cluster.

Directories

Path Synopsis
cmd
internal
log

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL