dcron

v1.4.0
Published: Nov 10, 2023 License: MIT Imports: 13 Imported by: 0

README

dcron

A distributed cron framework.

Install

go get github.com/gochore/dcron

Quick Start

First, implement a distributed atomic operation that only requires support for one method: SetIfNotExists. You can implement it in any way you prefer, such as using Redis SetNX.

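import (
	"context"
	"time"

	"github.com/redis/go-redis/v9"
)

// RedisAtomic implements dcron.Atomic on top of Redis SETNX.
type RedisAtomic struct {
	client *redis.Client
}

// SetIfNotExists reports whether this call created the key.
// The key expires after one hour; a Redis error counts as "not set".
func (m *RedisAtomic) SetIfNotExists(ctx context.Context, key, value string) bool {
	ret := m.client.SetNX(ctx, key, value, time.Hour)
	return ret.Err() == nil && ret.Val()
}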

Now you can create a cron with that:

func main() {
	atomic := &RedisAtomic{
		client: redis.NewClient(&redis.Options{
			Addr: "localhost:6379",
		}),
	}
	cron := dcron.NewCron(dcron.WithKey("TestCron"), dcron.WithAtomic(atomic))
}

Then, create a job and add it to the cron.

	job1 := dcron.NewJob("Job1", "*/15 * * * * *", func(ctx context.Context) error {
		if task, ok := dcron.TaskFromContext(ctx); ok {
			log.Println("run:", task.Job.Spec(), task.Key)
		}
		// do something
		return nil
	})
	if err := cron.AddJobs(job1); err != nil {
		log.Fatal(err)
	}

Finally, start the cron:

	cron.Start()
	log.Println("cron started")
	time.Sleep(time.Minute)
	<-cron.Stop().Done()

If you start the program multiple times, you will notice that the cron will run the job once every 15 seconds on only one of the processes.

process 1: 2023/10/13 11:39:45 cron started
process 2: 2023/10/13 11:39:47 cron started
process 3: 2023/10/13 11:39:48 cron started

Job runs (each one logged by only one of the processes):
2023/10/13 11:40:00 run: */15 * * * * * dcron:TestCron.Job1@1697168400
2023/10/13 11:40:15 run: */15 * * * * * dcron:TestCron.Job1@1697168415
2023/10/13 11:40:30 run: */15 * * * * * dcron:TestCron.Job1@1697168430
2023/10/13 11:40:45 run: */15 * * * * * dcron:TestCron.Job1@1697168445
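
The "only one process runs the job" behaviour follows from SetIfNotExists: for each scheduled tick, every process tries to claim the same task key, and only the first caller gets true. Below is a minimal in-process sketch of that idea. It uses goroutines as stand-ins for processes and a mutex-guarded map as a stand-in for Redis; the memAtomic type and race helper are hypothetical, and the claiming logic is an assumption about how dcron uses Atomic, not a copy of its source.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// memAtomic is a hypothetical in-memory stand-in for a shared store such as Redis.
type memAtomic struct {
	mu   sync.Mutex
	keys map[string]string
}

// SetIfNotExists stores the key and returns true only for the first caller.
func (m *memAtomic) SetIfNotExists(_ context.Context, key, value string) bool {
	m.mu.Lock()
	defer m.mu.Unlock()
	if _, ok := m.keys[key]; ok {
		return false
	}
	m.keys[key] = value
	return true
}

// race lets n goroutines compete for the same task key and returns how many won.
func race(n int, key string) int {
	store := &memAtomic{keys: map[string]string{}}
	var wg sync.WaitGroup
	var mu sync.Mutex
	winners := 0
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			if store.SetIfNotExists(context.Background(), key, fmt.Sprintf("process-%d", id)) {
				mu.Lock()
				winners++
				mu.Unlock()
			}
		}(i + 1)
	}
	wg.Wait()
	return winners
}

func main() {
	fmt.Println("winners:", race(3, "dcron:TestCron.Job1@1697168400"))
}
```

Which goroutine wins is not deterministic, which mirrors the fact that any of the three processes may end up running a given tick.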

One more thing: since dcron.WithAtomic(atomic) is optional, dcron also works well as a local, single-process cron.

	cron := dcron.NewCron()
	job2 := dcron.NewJob("A local job", "*/15 * * * * *", func(ctx context.Context) error {
		// do something
		return nil
	})
	if err := cron.AddJobs(job2); err != nil {
		log.Fatal(err)
	}

Documentation

Types

type AfterFunc

type AfterFunc func(task Task)

AfterFunc represents a function that can be called after Run.

type Atomic added in v0.3.0

type Atomic interface {
	// SetIfNotExists stores the key/value and returns true if the key does not exist;
	// otherwise it does nothing and returns false.
	// Note that the key/value should be kept for at least one minute.
	// For example, `SetNX(key, value, time.Minute)` via Redis.
	SetIfNotExists(ctx context.Context, key, value string) bool
}

Atomic provides a distributed atomic operation for dcron. It can be implemented easily via Redis, SQL, and so on.
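
For tests or single-process experiments, the same method shape can be satisfied without any external store. The sketch below is an assumption-laden illustration, not part of dcron: ttlAtomic, newTTLAtomic and demo are hypothetical names, the clock is injected so that expiry can be simulated, and expired keys are never evicted. It keeps each key for one minute, in line with the note above.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// ttlAtomic is a hypothetical single-process stand-in with the same
// SetIfNotExists method shape as the Atomic interface.
type ttlAtomic struct {
	mu      sync.Mutex
	ttl     time.Duration
	now     func() time.Time     // injected clock, so tests can simulate time
	expires map[string]time.Time // key -> expiry instant
}

func newTTLAtomic(ttl time.Duration, now func() time.Time) *ttlAtomic {
	return &ttlAtomic{ttl: ttl, now: now, expires: map[string]time.Time{}}
}

// SetIfNotExists claims the key unless an unexpired claim already exists.
// The value is ignored in this sketch.
func (a *ttlAtomic) SetIfNotExists(_ context.Context, key, _ string) bool {
	a.mu.Lock()
	defer a.mu.Unlock()
	now := a.now()
	if exp, ok := a.expires[key]; ok && now.Before(exp) {
		return false
	}
	a.expires[key] = now.Add(a.ttl)
	return true
}

// demo claims the same key at three simulated instants and reports each result.
func demo() [3]bool {
	clock := time.Date(2023, 10, 13, 11, 40, 0, 0, time.UTC)
	a := newTTLAtomic(time.Minute, func() time.Time { return clock })
	ctx := context.Background()
	var out [3]bool
	out[0] = a.SetIfNotExists(ctx, "k", "v") // first claim
	clock = clock.Add(30 * time.Second)
	out[1] = a.SetIfNotExists(ctx, "k", "v") // 30s later, within the TTL
	clock = clock.Add(31 * time.Second)
	out[2] = a.SetIfNotExists(ctx, "k", "v") // 61s later, after the TTL
	return out
}

func main() {
	fmt.Println(demo())
}
```

Because the map lives in one process, this gives no cross-process exclusion; a shared store such as Redis or a SQL table is still needed for a distributed deployment.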

type BeforeFunc

type BeforeFunc func(task Task) (skip bool)

BeforeFunc represents a function that can be called before Run. If it returns true, the task is skipped.
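
One possible use of a BeforeFunc is to skip tasks that start too long after their planned time. The sketch below only illustrates the function shape: the Task type is a local stand-in carrying just the PlanAt field, and skipIfStale and staleAfter are hypothetical helpers, so none of this is dcron's own code.

```go
package main

import (
	"fmt"
	"time"
)

// Task is a local stand-in carrying only the PlanAt field of dcron.Task,
// so that this sketch compiles without the library.
type Task struct {
	PlanAt time.Time
}

// skipIfStale returns a function with the same shape as BeforeFunc.
// It asks for a skip when the task starts more than maxDelay after PlanAt.
func skipIfStale(maxDelay time.Duration, now func() time.Time) func(task Task) (skip bool) {
	return func(task Task) bool {
		return now().Sub(task.PlanAt) > maxDelay
	}
}

// staleAfter reports whether a task that starts delaySeconds late is skipped
// when the tolerance is five seconds.
func staleAfter(delaySeconds int) bool {
	planAt := time.Date(2023, 10, 13, 11, 40, 0, 0, time.UTC)
	now := func() time.Time { return planAt.Add(time.Duration(delaySeconds) * time.Second) }
	return skipIfStale(5*time.Second, now)(Task{PlanAt: planAt})
}

func main() {
	fmt.Println(staleAfter(1), staleAfter(10))
}
```

With the real library, a function like this would presumably be passed through WithBeforeFunc after adapting it to take a dcron.Task.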

type Cron

type Cron struct {
	// contains filtered or unexported fields
}

Cron keeps track of any number of jobs, invoking the associated func as specified.

func NewCron

func NewCron(options ...CronOption) *Cron

NewCron returns a cron with specified options.

func (*Cron) AddJobs added in v0.2.0

func (c *Cron) AddJobs(jobs ...Job) error

AddJobs helps to add multiple jobs.

func (*Cron) Hostname

func (c *Cron) Hostname() string

Hostname implements CronMeta.Hostname

func (*Cron) Jobs added in v0.5.0

func (c *Cron) Jobs() []JobMeta

Jobs implements CronMeta.Jobs

func (*Cron) Key

func (c *Cron) Key() string

Key implements CronMeta.Key

func (*Cron) Run

func (c *Cron) Run()

Run the cron scheduler, or no-op if already running.

func (*Cron) Start

func (c *Cron) Start()

Start the cron scheduler in its own goroutine, or no-op if already started.

func (*Cron) Statistics added in v0.5.0

func (c *Cron) Statistics() Statistics

Statistics implements CronMeta.Statistics

func (*Cron) Stop

func (c *Cron) Stop() context.Context

Stop stops the cron scheduler if it is running; otherwise it does nothing. A context is returned so the caller can wait for running jobs to complete.
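
Waiting on the returned context without a bound can block shutdown if a job hangs. A hedged sketch of waiting with a timeout follows. Here waitDone, fakeStop and drained are hypothetical helpers; fakeStop merely imitates a context that becomes done once running jobs finish.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// waitDone blocks until done is finished or the timeout elapses.
// It returns true if done finished first.
func waitDone(done context.Context, timeout time.Duration) bool {
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case <-done.Done():
		return true
	case <-timer.C:
		return false
	}
}

// fakeStop is a hypothetical stand-in for cron.Stop(): it returns a context
// that becomes done after d, as if running jobs needed that long to finish.
func fakeStop(d time.Duration) context.Context {
	ctx, cancel := context.WithCancel(context.Background())
	time.AfterFunc(d, cancel)
	return ctx
}

// drained reports whether jobs taking jobMillis finish within timeoutMillis.
func drained(jobMillis, timeoutMillis int) bool {
	return waitDone(
		fakeStop(time.Duration(jobMillis)*time.Millisecond),
		time.Duration(timeoutMillis)*time.Millisecond,
	)
}

func main() {
	fmt.Println(drained(10, 1000), drained(1000, 10))
}
```

In a real program the first argument would be the context returned by cron.Stop().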

type CronMeta

type CronMeta interface {
	// Key returns the unique key of the cron.
	Key() string
	// Hostname returns current hostname.
	Hostname() string
	// Statistics returns statistics info for all of the cron's jobs.
	Statistics() Statistics
	// Jobs returns all of the cron's jobs as JobMeta.
	Jobs() []JobMeta
}

CronMeta is a read-only wrapper for Cron.

type CronOption

type CronOption func(c *Cron)

CronOption represents a modification to the default behavior of a Cron.

func WithAtomic added in v0.3.0

func WithAtomic(atomic Atomic) CronOption

WithAtomic uses the provided Atomic.

func WithContext added in v1.4.0

func WithContext(ctx context.Context) CronOption

WithContext sets the root context of the cron instance. It will be used as the parent context of all tasks, and when the context is done, the cron will be stopped.
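
The parent/child relationship described here is standard context behaviour and can be seen with the standard library alone. In this sketch taskContextDoneAfterRootCancel is a hypothetical helper, and the per-task context is only assumed to be derived from the root in a similar way.

```go
package main

import (
	"context"
	"fmt"
)

// taskContextDoneAfterRootCancel derives a task context from a root context,
// cancels the root, and reports whether the task context was cancelled too.
func taskContextDoneAfterRootCancel() bool {
	root, cancelRoot := context.WithCancel(context.Background())
	taskCtx, cancelTask := context.WithCancel(root)
	defer cancelTask()

	if taskCtx.Err() != nil {
		return false // must not be cancelled before the root is
	}
	cancelRoot()
	<-taskCtx.Done() // cancellation propagates from parent to child
	return taskCtx.Err() == context.Canceled
}

func main() {
	fmt.Println(taskContextDoneAfterRootCancel())
}
```

A common choice for the root is a context from signal.NotifyContext, so that an interrupt signal stops the cron and cancels running tasks.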

func WithHostname

func WithHostname(hostname string) CronOption

WithHostname overrides the hostname of the cron instance.

func WithKey

func WithKey(key string) CronOption

WithKey overrides the key of the cron.

func WithLocation added in v0.5.0

func WithLocation(loc *time.Location) CronOption

WithLocation overrides the timezone of the cron instance.

type Group added in v1.2.0

type Group interface {
	// contains filtered or unexported methods
}

func NewGroup added in v1.2.0

func NewGroup(limit int) Group

type Job

type Job interface {
	// Key returns the unique key of the job.
	Key() string
	// Spec returns spec of the job, like "* * * * * *".
	Spec() string
	// Run is what the job does.
	Run(ctx context.Context) error
	// Options returns options of the job.
	Options() []JobOption
}

Job describes a type that can be added to a cron.
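
The Quick Start spec "*/15 * * * * *" has six fields, and its logs show runs at seconds 00, 15, 30 and 45, so the first field is seconds. As a rough illustration of that schedule only (this is not dcron's spec parser; nextQuarterMinute and nextFireString are hypothetical helpers), the next fire time can be computed like this:

```go
package main

import (
	"fmt"
	"time"
)

// nextQuarterMinute is a hypothetical helper, not part of dcron. It returns
// the first instant strictly after t whose second is a multiple of 15,
// which is when a "*/15 * * * * *" spec would fire next.
func nextQuarterMinute(t time.Time) time.Time {
	const step = 15 * time.Second
	return t.Truncate(step).Add(step)
}

// nextFireString formats the next fire time after h:m:s (UTC) as "15:04:05".
func nextFireString(h, m, s int) string {
	t := time.Date(2023, 10, 13, h, m, s, 0, time.UTC)
	return nextQuarterMinute(t).Format("15:04:05")
}

func main() {
	fmt.Println(nextFireString(11, 39, 48)) // started at :48, fires at the next :00
	fmt.Println(nextFireString(11, 40, 0))  // exactly on a tick, fires at :15
}
```

This matches the Quick Start output, where a process started at 11:39:48 sees the first run at 11:40:00.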

func NewJob

func NewJob(key, spec string, run RunFunc, options ...JobOption) Job

NewJob returns a new Job with specified options.

func NewJobWithAutoKey added in v0.4.0

func NewJobWithAutoKey(spec string, run RunFunc, options ...JobOption) Job

NewJobWithAutoKey returns a new Job with the "run" function's name as its key. Be careful: "run" should be a non-anonymous function, otherwise the returned Job will have an empty key and cannot be added to a Cron.
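
To see why anonymous functions are a poor source for a key, it helps to look at how Go reports function names at run time. The sketch below uses the standard runtime and reflect packages; funcName and shortName are hypothetical helpers, and how dcron itself derives the key is not shown in this documentation, so treat this as an illustration of the general technique only.

```go
package main

import (
	"fmt"
	"reflect"
	"runtime"
	"strings"
)

// cleanupSessions is a named function whose name could serve as a job key.
func cleanupSessions() {}

// funcName returns the runtime's fully qualified name of fn,
// which must be a function value.
func funcName(fn interface{}) string {
	return runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()
}

// shortName strips everything up to the last dot of the qualified name.
func shortName(fn interface{}) string {
	name := funcName(fn)
	return name[strings.LastIndex(name, ".")+1:]
}

func main() {
	fmt.Println(shortName(cleanupSessions))
	// An anonymous function only gets a compiler-generated name, which is
	// not stable or meaningful enough to identify a job.
	fmt.Println(funcName(func() {}))
}
```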

type JobMeta

type JobMeta interface {
	// Key returns the unique key of the job.
	Key() string
	// Spec returns the spec of the job.
	Spec() string
	// Statistics returns statistics info of the job.
	Statistics() Statistics
}

JobMeta is a read-only wrapper for innerJob.

type JobOption

type JobOption func(job *innerJob)

JobOption represents a modification to the default behavior of a Job.

func WithAfterFunc

func WithAfterFunc(after AfterFunc) JobOption

WithAfterFunc specifies what to do after Run.

func WithBeforeFunc

func WithBeforeFunc(before BeforeFunc) JobOption

WithBeforeFunc specifies what to do before Run.

func WithGroup added in v1.2.0

func WithGroup(group Group) JobOption

WithGroup adds the current job to the group.

func WithNoMutex added in v0.2.1

func WithNoMutex() JobOption

WithNoMutex means the job will run on multiple cron instances, even if the cron has an Atomic.

func WithRetryInterval

func WithRetryInterval(retryInterval RetryInterval) JobOption

WithRetryInterval specifies how long to wait before retrying after the run has failed `triedTimes` times.

func WithRetryTimes

func WithRetryTimes(retryTimes int) JobOption

WithRetryTimes specifies the maximum number of times to retry; retryTimes is set to 1 if it is less than 1.

type RetryInterval

type RetryInterval func(triedTimes int) time.Duration

RetryInterval returns how long to wait before retrying after the run has failed `triedTimes` times.
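
A typical RetryInterval is a capped exponential backoff. The sketch below mirrors the type locally so that it compiles without the library; exponentialBackoff and backoffSeconds are hypothetical helpers, and it assumes that triedTimes is 1 for the first retry.

```go
package main

import (
	"fmt"
	"time"
)

// RetryInterval mirrors the dcron type of the same name so that this sketch
// compiles without the library.
type RetryInterval func(triedTimes int) time.Duration

// exponentialBackoff returns base * 2^(triedTimes-1), capped at limit.
// Assumption: triedTimes is 1 for the first retry.
func exponentialBackoff(base, limit time.Duration) RetryInterval {
	return func(triedTimes int) time.Duration {
		d := base
		// Double once per previous attempt, stopping early once the cap is reached.
		for i := 1; i < triedTimes && d < limit; i++ {
			d *= 2
		}
		if d > limit {
			d = limit
		}
		return d
	}
}

// backoffSeconds returns the delay in whole seconds for a 1s base and 30s cap.
func backoffSeconds(triedTimes int) int {
	return int(exponentialBackoff(time.Second, 30*time.Second)(triedTimes) / time.Second)
}

func main() {
	for tried := 1; tried <= 6; tried++ {
		fmt.Println(tried, backoffSeconds(tried))
	}
}
```

With the real library, the result of a function like exponentialBackoff would presumably be passed to WithRetryInterval together with WithRetryTimes.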

type RunFunc

type RunFunc func(ctx context.Context) error

RunFunc represents a function that can be called by a cron.

type Statistics added in v0.5.0

type Statistics struct {
	TotalTask   int64 // Total count of tasks processed
	PassedTask  int64 // Number of tasks successfully executed
	FailedTask  int64 // Number of tasks that failed during execution due to errors
	SkippedTask int64 // Number of tasks skipped due to BeforeFunc returning true
	MissedTask  int64 // Number of tasks executed by other instances

	TotalRun   int64 // Total count of execution runs
	PassedRun  int64 // Number of successfully executed runs
	FailedRun  int64 // Number of runs that have failed due to errors
	RetriedRun int64 // Number of runs that encountered errors and were subsequently retried
}

Statistics records statistics info for a cron or a job.

func (Statistics) Add added in v1.0.0

func (s Statistics) Add(delta Statistics) Statistics

Add returns a new Statistics that is the sum of the two.
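
The description suggests a field-by-field sum that leaves the receiver untouched. The following sketch shows that reading with a local mirror of the struct; it is an assumption about the behaviour, not the library's implementation.

```go
package main

import "fmt"

// Statistics is a local mirror of the documented struct fields.
type Statistics struct {
	TotalTask, PassedTask, FailedTask, SkippedTask, MissedTask int64
	TotalRun, PassedRun, FailedRun, RetriedRun                 int64
}

// Add returns the field-wise sum. The value receiver means s is a copy,
// so neither operand is modified.
func (s Statistics) Add(delta Statistics) Statistics {
	s.TotalTask += delta.TotalTask
	s.PassedTask += delta.PassedTask
	s.FailedTask += delta.FailedTask
	s.SkippedTask += delta.SkippedTask
	s.MissedTask += delta.MissedTask
	s.TotalRun += delta.TotalRun
	s.PassedRun += delta.PassedRun
	s.FailedRun += delta.FailedRun
	s.RetriedRun += delta.RetriedRun
	return s
}

func main() {
	job1 := Statistics{TotalTask: 2, PassedTask: 1, FailedTask: 1, TotalRun: 3}
	job2 := Statistics{TotalTask: 3, PassedTask: 3, TotalRun: 3}
	fmt.Printf("%+v\n", job1.Add(job2))
}
```

This is how per-job statistics could be combined into a cron-wide total.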

type Task

type Task struct {
	Key        string
	Cron       CronMeta
	Job        JobMeta
	PlanAt     time.Time
	BeginAt    *time.Time
	EndAt      *time.Time
	Return     error
	Skipped    bool
	Missed     bool
	TriedTimes int
}

Task is an execution of a job.

func TaskFromContext added in v0.2.0

func TaskFromContext(ctx context.Context) (Task, bool)

TaskFromContext extracts a Task from a context. It is useful inside the Run function.
