dcron

v1.1.0
Published: Jan 24, 2024 License: MIT Imports: 13 Imported by: 0

README

Dcron


A lightweight distributed job scheduler library based on Redis or etcd.

Declarations

Why fork this repo? Because we use etcd v2 in our production environment. The original repository depends on etcd v3, which is incompatible, so I removed etcd support to keep the repo simple.

As we all know, the etcd v2 client module is no longer maintained, and it causes lots of annoying issues, e.g.:

module google.golang.org/grpc@latest found (v1.61.0), but does not contain package google.golang.org/grpc/naming  

github.com/coreos/etcd/pkg/srv: module github.com/coreos/etcd@latest found (v2.3.8+incompatible), but does not contain package github.com/coreos/etcd/pkg/srv

... but does not contain package google.golang.org/grpc/credentials/insecure

When some repos use a lower version of google.golang.org/grpc (like v1.26.0, which still contains the grpc/naming package) but another repo needs a higher version (v1.61.0), it is hard to put them together. Meanwhile, you need to replace bbolt and grpc to get things working. In the end I removed the etcd part from dcron and adapted it for our code (it is hard to migrate etcd v2 to v3 in a production environment).

By the way, the etcd package is a mess under Go modules and hinders the development of the Go community. The authors didn't care about packaging and refused to embrace Go modules. Disaster.

Theory

Use Redis or etcd to sync the service list and the state of each service. Use consistent hashing to select the node that executes each task.
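As a rough illustration of the node-selection idea, here is a minimal consistent-hash sketch. The `ring` type, `newRing`, `pick`, the CRC32 hash, and the replica count are all assumptions made for this example; they are not taken from dcron's source.

```go
package main

import (
	"fmt"
	"hash/crc32"
	"sort"
	"strconv"
)

// ring is a hypothetical consistent-hash ring; it is not dcron's internal type.
type ring struct {
	points []uint32          // sorted hash positions of virtual nodes
	owner  map[uint32]string // hash position -> node ID
}

// newRing places `replicas` virtual points per node on the ring.
// Nodes are sorted first so every caller builds an identical ring.
func newRing(nodes []string, replicas int) *ring {
	sorted := append([]string(nil), nodes...)
	sort.Strings(sorted)
	r := &ring{owner: make(map[uint32]string)}
	for _, n := range sorted {
		for i := 0; i < replicas; i++ {
			h := crc32.ChecksumIEEE([]byte(n + "#" + strconv.Itoa(i)))
			if _, taken := r.owner[h]; taken {
				continue // hash collision: keep the first owner
			}
			r.owner[h] = n
			r.points = append(r.points, h)
		}
	}
	sort.Slice(r.points, func(i, j int) bool { return r.points[i] < r.points[j] })
	return r
}

// pick returns the node owning the first point at or after the job's hash,
// wrapping around to the start of the ring.
func (r *ring) pick(job string) string {
	if len(r.points) == 0 {
		return ""
	}
	h := crc32.ChecksumIEEE([]byte(job))
	i := sort.Search(len(r.points), func(i int) bool { return r.points[i] >= h })
	if i == len(r.points) {
		i = 0
	}
	return r.owner[r.points[i]]
}

func main() {
	// Every node builds the same ring from the shared node list,
	// so every node computes the same owner for a given job.
	r := newRing([]string{"node-a", "node-b", "node-c"}, 50)
	for _, job := range []string{"test1", "test2", "test3"} {
		fmt.Println(job, "->", r.pick(job))
	}
}
```

Because the owner is derived only from the job name and the shared node list, no node needs to consult its own clock to decide whether it should run a job.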

Why not use distributed-lock?

If it were implemented with a distributed lock, it would depend on the system time of each node. Problems arise when the system times are not synchronized:

  1. If the task's execution time is shorter than the clock difference between nodes, the task will be executed again (one node unlocks after execution, and the lock is then acquired by another node that reaches the execution time later).

  2. Even if the clocks differ only slightly, the fastest node will always grab the lock first. As a result, all tasks will be executed only by the fastest node.

Features
  • Robustness: The assignment of tasks to nodes does not depend on the system time, so even with clock differences between nodes, tasks are still distributed uniformly and each is executed by a single node.
  • Load balancing: Tasks are distributed evenly according to task and node.
  • Seamless capacity extension: If the load on the task nodes is too high, some tasks are automatically migrated to a new node as soon as it starts, which achieves seamless capacity extension.
  • Failover: If a single node fails, its tasks are automatically transferred to other healthy nodes after 10 seconds.
  • Unique: The same task in the same service starts only a single running instance and is not executed repeatedly.
  • Customized storage: Add a node storage mode by implementing the driver interface.
  • Automatic recovery: If the process restarts, the jobs that have been stored are recovered into memory.
Get Started
  1. Create a redisDriver instance, set the ServiceName, and initialize dcron. The ServiceName defines the task unit shared by all nodes of the service.
redisCli := redis.NewClient(&redis.Options{
	Addr: DefaultRedisAddr,
})
drv := driver.NewRedisDriver(redisCli)
dcron := NewDcron("server1", drv)
  2. Use cron syntax to add a task. You must set the TaskName, which is the primary key of each task.
dcron.AddFunc("test1", "*/3 * * * *", func() {
	fmt.Println("execute test1 task", time.Now().Format("15:04:05"))
})
  3. Start the tasks.
// You can use either Start() or Run() to start dcron.
// Non-blocking start.
dcron.Start()

// Blocking start.
dcron.Run()
Example
More configurations.

Dcron is based on https://github.com/robfig/cron. Use NewDcron to initialize Dcron; the arguments after the second one are passed on to cron.

For example, to evaluate cron expressions with second-level precision:

dcron := NewDcron("server1", drv, cron.WithSeconds())

Alternatively, you can use NewDcronWithOption to initialize Dcron and set the logger or other options. For the available options, see: https://github.com/libi/dcron/blob/master/option.go

ServiceName

The ServiceName is used to define the same set of tasks, which can be understood as the boundary of task allocation and scheduling.

Multiple nodes using the same service name will be considered as the same task group. Tasks in the same task group will be evenly distributed to each node in the group and will not be executed repeatedly.

Documentation

Index

Constants

const (
	NodePoolStateSteady  = "NodePoolStateSteady"
	NodePoolStateUpgrade = "NodePoolStateUpgrade"
)

Variables

var (
	ErrNodePoolIsUpgrading = errors.New("nodePool is upgrading")
)

Functions

This section is empty.

Types

type Dcron

type Dcron struct {
	ServerName string

	RecoverFunc RecoverFuncType
	// contains filtered or unexported fields
}

Dcron is the main struct.

func NewDcron

func NewDcron(serverName string, driver driver.DriverV2, cronOpts ...cron.Option) *Dcron

NewDcron creates a Dcron.

func NewDcronWithOption

func NewDcronWithOption(serverName string, driver driver.DriverV2, dcronOpts ...Option) *Dcron

NewDcronWithOption creates a Dcron with Dcron options.

func (*Dcron) AddFunc

func (d *Dcron) AddFunc(jobName, cronStr string, cmd func()) (err error)

AddFunc adds a cron func.

func (*Dcron) AddJob

func (d *Dcron) AddJob(jobName, cronStr string, job Job) (err error)

AddJob adds a job.

func (*Dcron) GetLogger

func (d *Dcron) GetLogger() dlog.Logger

GetLogger returns the dcron logger.

func (*Dcron) NodeID

func (d *Dcron) NodeID() string

func (*Dcron) Remove

func (d *Dcron) Remove(jobName string)

Remove removes a job.

func (*Dcron) Run

func (d *Dcron) Run()

Run starts dcron and blocks.

func (*Dcron) SetLogger

func (d *Dcron) SetLogger(logger dlog.Logger)

SetLogger sets the dcron logger.

func (*Dcron) Start

func (d *Dcron) Start()

Start starts dcron without blocking.

func (*Dcron) Stop

func (d *Dcron) Stop()

Stop stops dcron.

type INodePool

type INodePool interface {
	Start(ctx context.Context) error
	CheckJobAvailable(jobName string) (bool, error)
	Stop(ctx context.Context) error

	GetNodeID() string
	GetLastNodesUpdateTime() time.Time
}

func NewNodePool

func NewNodePool(
	serviceName string,
	drv driver.DriverV2,
	updateDuration time.Duration,
	hashReplicas int,
	logger dlog.Logger,
) INodePool

type IRecentJobPacker

type IRecentJobPacker interface {
	// Goroutine-safe.
	// AddJob adds a job to the packer.
	// The packer keeps recent jobs (e.g. within 2 * heartbeat duration).
	AddJob(jobName string, t time.Time) error

	// Goroutine-safe.
	// PopAllJobs pops all jobs out of the packer.
	PopAllJobs() (jobNames []string)
}

IRecentJobPacker is an interface used to pack the jobs that run while the cluster state is `unstable`, e.g. when some nodes are broken or new nodes have been added.

func NewRecentJobPacker

func NewRecentJobPacker(timeWindow time.Duration) IRecentJobPacker

type Job

type Job interface {
	Run()
}

Job is the job interface.
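A minimal sketch of two ways to satisfy a single-method interface like this one. The `Job` declaration is repeated locally so the example compiles on its own; `counterJob` and `funcJob` are hypothetical names, not part of the package.

```go
package main

import "fmt"

// Job mirrors the single-method interface shown above.
type Job interface {
	Run()
}

// counterJob is a hypothetical job that counts how often it ran.
type counterJob struct {
	name string
	runs int
}

func (c *counterJob) Run() {
	c.runs++
	fmt.Println("running", c.name)
}

// funcJob adapts a plain function to the Job interface.
type funcJob func()

func (f funcJob) Run() { f() }

func main() {
	c := &counterJob{name: "test1"}
	jobs := []Job{c, funcJob(func() { fmt.Println("running inline job") })}
	for _, j := range jobs {
		j.Run()
	}
	fmt.Println("test1 runs:", c.runs)
}
```

A struct is the natural choice when the job carries state; the function adapter is convenient for one-off closures.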

type JobWarpper

type JobWarpper struct {
	ID      cron.EntryID
	Dcron   *Dcron
	Name    string
	CronStr string
	Job     Job
}

JobWarpper is a job wrapper.

func (JobWarpper) Execute

func (job JobWarpper) Execute()

func (JobWarpper) Run

func (job JobWarpper) Run()

Run runs the job.

type JobWithTime

type JobWithTime struct {
	JobName     string
	RunningTime time.Time
}

type JobWithTimeHeap

type JobWithTimeHeap []JobWithTime

func (*JobWithTimeHeap) Index

func (jobHeap *JobWithTimeHeap) Index(i int) interface{}

func (*JobWithTimeHeap) Len

func (jobHeap *JobWithTimeHeap) Len() int

func (*JobWithTimeHeap) Less

func (jobHeap *JobWithTimeHeap) Less(i, j int) bool

func (*JobWithTimeHeap) Pop

func (jobHeap *JobWithTimeHeap) Pop() (ret interface{})

func (*JobWithTimeHeap) Push

func (jobHeap *JobWithTimeHeap) Push(x interface{})

func (*JobWithTimeHeap) Swap

func (jobHeap *JobWithTimeHeap) Swap(i, j int)

type NodePool

type NodePool struct {
	// contains filtered or unexported fields
}

NodePool keeps the cluster stable. NodePool has 2 states:

  1. Steady: If the node pool list is the same as at the last update, this node's state is marked as Steady. In this state, the node can run jobs.
  2. Upgrade: If the node pool list differs from the last update, this node's state is marked as Upgrade. In this state, the node cannot run jobs.
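The two-state rule can be sketched as a small tracker that compares each freshly fetched node list with the previous one. `poolState`, `update`, and `canRun` are hypothetical names for this example; only the two state strings come from the constants listed earlier.

```go
package main

import (
	"fmt"
	"sort"
)

const (
	stateSteady  = "NodePoolStateSteady"
	stateUpgrade = "NodePoolStateUpgrade"
)

// poolState is a hypothetical tracker for the Steady/Upgrade rule.
type poolState struct {
	last  []string // sorted node list seen at the previous update
	state string
}

// sameNodes reports whether two sorted node lists are identical.
func sameNodes(a, b []string) bool {
	if len(a) != len(b) {
		return false
	}
	for i := range a {
		if a[i] != b[i] {
			return false
		}
	}
	return true
}

// update compares the new node list with the previous one and sets the state.
func (p *poolState) update(nodes []string) string {
	cur := append([]string(nil), nodes...)
	sort.Strings(cur)
	if sameNodes(cur, p.last) {
		p.state = stateSteady
	} else {
		p.state = stateUpgrade
	}
	p.last = cur
	return p.state
}

// canRun reports whether jobs may run in the current state.
func (p *poolState) canRun() bool { return p.state == stateSteady }

func main() {
	p := &poolState{}
	fmt.Println(p.update([]string{"a", "b"}), p.canRun())      // NodePoolStateUpgrade false
	fmt.Println(p.update([]string{"b", "a"}), p.canRun())      // NodePoolStateSteady true
	fmt.Println(p.update([]string{"a", "b", "c"}), p.canRun()) // NodePoolStateUpgrade false
	fmt.Println(p.update([]string{"a", "b", "c"}), p.canRun()) // NodePoolStateSteady true
}
```

In this sketch a node pauses for one update cycle whenever membership changes, so jobs only run once two consecutive views of the cluster agree.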

func (*NodePool) CheckJobAvailable

func (np *NodePool) CheckJobAvailable(jobName string) (bool, error)

CheckJobAvailable checks whether this job can run on this node.

func (*NodePool) GetLastNodesUpdateTime

func (np *NodePool) GetLastNodesUpdateTime() time.Time

func (*NodePool) GetNodeID

func (np *NodePool) GetNodeID() string

func (*NodePool) Start

func (np *NodePool) Start(ctx context.Context) (err error)

func (*NodePool) Stop

func (np *NodePool) Stop(ctx context.Context) error

type Option

type Option func(*Dcron)

Option is a Dcron option.

func CronOptionChain

func CronOptionChain(wrappers ...cron.JobWrapper) Option

CronOptionChain wraps cron with a job wrapper chain.

func CronOptionLocation

func CronOptionLocation(loc *time.Location) Option

CronOptionLocation wraps cron with a location.

func CronOptionParser

func CronOptionParser(p cron.ScheduleParser) Option

CronOptionParser wraps cron with a schedule parser.

func CronOptionSeconds

func CronOptionSeconds() Option

CronOptionSeconds wraps cron with seconds support.

func WithClusterStable

func WithClusterStable(timeWindow time.Duration) Option

Use this option to rerun recent jobs after the cluster finishes upgrading.

func WithHashReplicas

func WithHashReplicas(d int) Option

WithHashReplicas sets hashReplicas.

func WithLogger

func WithLogger(logger dlog.Logger) Option

WithLogger sets both the dcron and the cron logger.

func WithNodeUpdateDuration

func WithNodeUpdateDuration(d time.Duration) Option

WithNodeUpdateDuration sets the node update duration.

func WithRecoverFunc

func WithRecoverFunc(recoverFunc RecoverFuncType) Option

You can define your own recover function so that jobs are added back to your dcron when the process restarts.

type RecentJobPacker

type RecentJobPacker struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func (*RecentJobPacker) AddJob

func (rjp *RecentJobPacker) AddJob(jobName string, t time.Time) (err error)

func (*RecentJobPacker) PopAllJobs

func (rjp *RecentJobPacker) PopAllJobs() (jobNames []string)

type RecoverFuncType

type RecoverFuncType func(d *Dcron)

type StableJob

type StableJob interface {
	Job
	GetCron() string
	Serialize() ([]byte, error)
	UnSerialize([]byte) error
}

This type of Job is recovered when a node of the service restarts.
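As an illustration, a job can satisfy these four methods by serializing its own fields. `mailJob` and the use of JSON are assumptions for this sketch; the interface itself does not prescribe an encoding.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// mailJob is a hypothetical job whose state can survive a restart.
type mailJob struct {
	To   string `json:"to"`
	Cron string `json:"cron"`
}

func (m *mailJob) Run()            { fmt.Println("send mail to", m.To) }
func (m *mailJob) GetCron() string { return m.Cron }

// Serialize encodes the job's fields so they can be stored externally.
func (m *mailJob) Serialize() ([]byte, error) { return json.Marshal(m) }

// UnSerialize restores the job's fields from previously stored bytes.
func (m *mailJob) UnSerialize(b []byte) error { return json.Unmarshal(b, m) }

func main() {
	orig := &mailJob{To: "ops@example.com", Cron: "*/3 * * * *"}
	data, err := orig.Serialize()
	if err != nil {
		panic(err)
	}

	// After a restart, an empty job is rebuilt from the stored bytes.
	restored := &mailJob{}
	if err := restored.UnSerialize(data); err != nil {
		panic(err)
	}
	fmt.Println(restored.GetCron())
	restored.Run()
}
```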

Directories

Path Synopsis
Package cron implements a cron spec parser and job runner.
examples
