dcron

package module
v0.5.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 13, 2024 License: MIT Imports: 13 Imported by: 8

README

Dcron

Language codecov Tests Go Report Card

简体中文 | English

a lightweight distributed job scheduler library based on redis or etcd

Theory

Use redis or etcd to sync the services list and the state of services. Use consistent-hash to select the node which can execute the task.

Why not use distributed-lock?

If use distributed-lock to implement it. I will depends on the system-time of each node. There are some problems when the system-time is not synchronous:

  1. If the task executing time is shorter than the system time, the task will be excuted again. (some node unlock after execution, but the lock will be locked by the other node which reach the execution time)

  2. Whatever there is only a little miss in system time, the most fast node will catch the lock in the first time. It will cause a thing that all the task will be executed only by the most fast node.

Features
  • Robustness: The node assignment of tasks does not depend on the system time, so the system time error between nodes can also ensure uniform distribution and single node execution.
  • Load Balance: Distribute tasks evenly according to task and node.
  • Seamless capacity extention :If the load of the task node is too large, some tasks will be automatically migrated to the new service after the new server is started directly to achieve seamless capacity extention.
  • Failover: If a single node fails, the task will be automatically transferred to other normal nodes after 10 seconds.
  • Unique: The same task in the same service will only start a single running instance, and will not be executed repeatedly.
  • Customized storage: add the node storage mode by implementing the driver interface.
  • Automatic recovery: if the process restart, the jobs which have been store will be recovered into memory.
Get Started
  1. Create redisDriver instance, set the ServiceName and initialize dcron. The ServiceName will defined the same task unit.
redisCli := redis.NewClient(&redis.Options{
  Addr: DefaultRedisAddr,
})
drv := driver.NewRedisDriver(redisCli)
dcron := NewDcron("server1", drv)
  1. Use cron-language to add task, you should set the TaskName, the TaskName is the primary-key of each task.
dcron.AddFunc("test1","*/3 * * * *",func(){
  fmt.Println("execute test1 task",time.Now().Format("15:04:05"))
})
  1. Begin the task
// you can use Start() or Run() to start the dcron.
// unblocking start.
dcron.Start()

// blocking start.
dcron.Run()
Example
More configurations.

Dcron is based on https://github.com/robfig/cron, use NewDcron to initialize Dcron, the arg after the second argv will be passed to cron

For example, if you want to set the cron eval in second-level, you can use like that:

dcron := NewDcron("server1", drv,cron.WithSeconds())

Otherwise, you can sue NewDcronWithOption to initialize, to set the logger or others. Optional configuration can be referred to: https://github.com/libi/dcron/blob/master/option.go

ServiceName

The ServiceName is used to define the same set of tasks, which can be understood as the boundary of task allocation and scheduling.

Multiple nodes using the same service name will be considered as the same task group. Tasks in the same task group will be evenly distributed to each node in the group and will not be executed repeatedly.

Star history

Star History Chart

Documentation

Index

Constants

View Source
const (
	NodePoolStateSteady  = "NodePoolStateSteady"
	NodePoolStateUpgrade = "NodePoolStateUpgrade"
)

Variables

View Source
var (
	ErrJobExist     = errors.New("jobName already exist")
	ErrJobNotExist  = errors.New("jobName not exist")
	ErrJobWrongNode = errors.New("job is not running in this node")
)
View Source
var (
	ErrNodePoolIsUpgrading = errors.New("nodePool is upgrading")
)

Functions

This section is empty.

Types

type Dcron

type Dcron struct {
	ServerName string

	RecoverFunc RecoverFuncType
	// contains filtered or unexported fields
}

Dcron is main struct

func NewDcron

func NewDcron(serverName string, driver driver.DriverV2, cronOpts ...cron.Option) *Dcron

NewDcron create a Dcron

func NewDcronWithOption

func NewDcronWithOption(serverName string, driver driver.DriverV2, dcronOpts ...Option) *Dcron

NewDcronWithOption create a Dcron with Dcron Option

func (*Dcron) AddFunc

func (d *Dcron) AddFunc(jobName, cronStr string, cmd func()) (err error)

AddFunc add a cron func

func (*Dcron) AddJob

func (d *Dcron) AddJob(jobName, cronStr string, job Job) (err error)

AddJob add a job

func (*Dcron) GetJob added in v0.5.5

func (d *Dcron) GetJob(jobName string, thisNodeOnly bool) (*JobWarpper, error)

Get job by jobName if this jobName not exist, will return error.

if `thisNodeOnly` is true
	if this job is not available in this node, will return error.
otherwise return the struct of JobWarpper whose name is jobName.

func (*Dcron) GetJobs added in v0.5.5

func (d *Dcron) GetJobs(thisNodeOnly bool) []*JobWarpper

Get job list.

if `thisNodeOnly` is true
	return all jobs available in this node.
otherwise return all jobs added to dcron.

we never return nil. If there is no job. this func will return an empty slice.

func (*Dcron) GetLogger

func (d *Dcron) GetLogger() dlog.Logger

GetLogger get dcron logger

func (*Dcron) NodeID added in v0.5.1

func (d *Dcron) NodeID() string

func (*Dcron) Remove

func (d *Dcron) Remove(jobName string)

Remove Job by jobName

func (*Dcron) Run

func (d *Dcron) Run()

Run Job

func (*Dcron) SetLogger

func (d *Dcron) SetLogger(logger dlog.Logger)

SetLogger set dcron logger

func (*Dcron) Start

func (d *Dcron) Start()

Start job

func (*Dcron) Stop

func (d *Dcron) Stop()

Stop job

type INodePool added in v0.5.0

type INodePool interface {
	Start(ctx context.Context) error
	CheckJobAvailable(jobName string) (bool, error)
	Stop(ctx context.Context) error

	GetNodeID() string
	GetLastNodesUpdateTime() time.Time
}

func NewNodePool added in v0.5.0

func NewNodePool(
	serviceName string,
	drv driver.DriverV2,
	updateDuration time.Duration,
	hashReplicas int,
	logger dlog.Logger,
) INodePool

type IRecentJobPacker added in v0.5.1

type IRecentJobPacker interface {
	// goroutine safety.
	// Add a job to packer
	// will save recent jobs (like 2 * heartbeat duration)
	AddJob(jobName string, t time.Time) error

	// goroutine safety.
	// Pop out all jobs in packer
	PopAllJobs() (jobNames []string)
}

IRecentJobPacker this is an interface which be used to pack the jobs running in the cluster state is `unstable`. like some nodes broken or new nodes were add.

func NewRecentJobPacker added in v0.5.1

func NewRecentJobPacker(timeWindow time.Duration) IRecentJobPacker

type Job

type Job interface {
	Run()
}

Job Interface

type JobWarpper

type JobWarpper struct {
	ID      cron.EntryID
	Dcron   *Dcron
	Name    string
	CronStr string
	Job     Job
}

JobWarpper is a job warpper

func (JobWarpper) Execute added in v0.5.1

func (job JobWarpper) Execute()

func (JobWarpper) Run

func (job JobWarpper) Run()

Run is run job

type JobWithTime added in v0.5.1

type JobWithTime struct {
	JobName     string
	RunningTime time.Time
}

type JobWithTimeHeap added in v0.5.1

type JobWithTimeHeap []JobWithTime

func (*JobWithTimeHeap) Index added in v0.5.1

func (jobHeap *JobWithTimeHeap) Index(i int) interface{}

func (*JobWithTimeHeap) Len added in v0.5.1

func (jobHeap *JobWithTimeHeap) Len() int

func (*JobWithTimeHeap) Less added in v0.5.1

func (jobHeap *JobWithTimeHeap) Less(i, j int) bool

func (*JobWithTimeHeap) Pop added in v0.5.1

func (jobHeap *JobWithTimeHeap) Pop() (ret interface{})

func (*JobWithTimeHeap) Push added in v0.5.1

func (jobHeap *JobWithTimeHeap) Push(x interface{})

func (*JobWithTimeHeap) Swap added in v0.5.1

func (jobHeap *JobWithTimeHeap) Swap(i, j int)

type NodePool

type NodePool struct {
	// contains filtered or unexported fields
}

NodePool For cluster steable. NodePool has 2 states:

  1. Steady If this nodePoolLists is the same as the last update, we will mark this node's state to Steady. In this state, this node can run jobs.
  2. Upgrade If this nodePoolLists is different to the last update, we will mark this node's state to Upgrade. In this state, this node can not run jobs.

func (*NodePool) CheckJobAvailable added in v0.5.0

func (np *NodePool) CheckJobAvailable(jobName string) (bool, error)

Check if this job can be run in this node.

func (*NodePool) GetLastNodesUpdateTime added in v0.5.1

func (np *NodePool) GetLastNodesUpdateTime() time.Time

func (*NodePool) GetNodeID added in v0.5.0

func (np *NodePool) GetNodeID() string

func (*NodePool) Start added in v0.5.0

func (np *NodePool) Start(ctx context.Context) (err error)

func (*NodePool) Stop added in v0.5.0

func (np *NodePool) Stop(ctx context.Context) error

type Option

type Option func(*Dcron)

Option is Dcron Option

func CronOptionChain

func CronOptionChain(wrappers ...cron.JobWrapper) Option

CronOptionChain is Warp cron with chain

func CronOptionLocation

func CronOptionLocation(loc *time.Location) Option

CronOptionLocation is warp cron with location

func CronOptionParser

func CronOptionParser(p cron.ScheduleParser) Option

CronOptionParser is warp cron with schedules.

func CronOptionSeconds

func CronOptionSeconds() Option

CronOptionSeconds is warp cron with seconds

func RunningLocally added in v0.5.5

func RunningLocally() Option

func WithClusterStable added in v0.5.1

func WithClusterStable(timeWindow time.Duration) Option

You can use this option to start the recent jobs rerun after the cluster upgrading.

func WithHashReplicas

func WithHashReplicas(d int) Option

WithHashReplicas set hashReplicas

func WithLogger

func WithLogger(logger dlog.Logger) Option

WithLogger both set dcron and cron logger.

func WithNodeUpdateDuration

func WithNodeUpdateDuration(d time.Duration) Option

WithNodeUpdateDuration set node update duration

func WithRecoverFunc added in v0.5.0

func WithRecoverFunc(recoverFunc RecoverFuncType) Option

You can defined yourself recover function to make the job will be added to your dcron when the process restart

type RecentJobPacker added in v0.5.1

type RecentJobPacker struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func (*RecentJobPacker) AddJob added in v0.5.1

func (rjp *RecentJobPacker) AddJob(jobName string, t time.Time) (err error)

func (*RecentJobPacker) PopAllJobs added in v0.5.1

func (rjp *RecentJobPacker) PopAllJobs() (jobNames []string)

type RecoverFuncType added in v0.5.0

type RecoverFuncType func(d *Dcron)

type StableJob added in v0.5.0

type StableJob interface {
	Job
	GetCron() string
	Serialize() ([]byte, error)
	UnSerialize([]byte) error
}

This type of Job will be recovered in a node of service restarting.

Directories

Path Synopsis
Package cron implements a cron spec parser and job runner.
Package cron implements a cron spec parser and job runner.
examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL