dcron

v1.1.1
Published: Aug 9, 2023 License: MIT Imports: 11 Imported by: 0

README

Dcron

A lightweight distributed job scheduler library based on Redis or etcd.

Theory

Dcron uses Redis or etcd to synchronize the list of service nodes and their state. It uses consistent hashing to select the node that executes each task.
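The node-selection idea can be sketched with a small consistent-hash ring. This is only an illustration under stated assumptions: the `ring` type, the `newRing`, `pick`, and `canRun` helpers, the CRC32 hash, and the node names are all hypothetical and are not taken from dcron's source.

```go
package main

import (
	"fmt"
	"hash/crc32"
	"sort"
	"strconv"
)

// ring is a minimal consistent-hash ring (hypothetical, not dcron's internal type).
type ring struct {
	keys  []uint32          // sorted hash points on the ring
	owner map[uint32]string // hash point -> node ID
}

// newRing places `replicas` virtual points on the ring for every node.
func newRing(replicas int, nodes ...string) *ring {
	r := &ring{owner: make(map[uint32]string)}
	for _, n := range nodes {
		for i := 0; i < replicas; i++ {
			h := crc32.ChecksumIEEE([]byte(strconv.Itoa(i) + n))
			r.keys = append(r.keys, h)
			r.owner[h] = n
		}
	}
	sort.Slice(r.keys, func(i, j int) bool { return r.keys[i] < r.keys[j] })
	return r
}

// pick returns the node owning jobName: the first ring point at or after
// the job's hash, wrapping around to the start of the ring.
func (r *ring) pick(jobName string) string {
	if len(r.keys) == 0 {
		return ""
	}
	h := crc32.ChecksumIEEE([]byte(jobName))
	i := sort.Search(len(r.keys), func(i int) bool { return r.keys[i] >= h })
	if i == len(r.keys) {
		i = 0
	}
	return r.owner[r.keys[i]]
}

// canRun reports whether the node `self` is the owner of jobName.
func canRun(r *ring, self, jobName string) bool {
	return r.pick(jobName) == self
}

func main() {
	r := newRing(50, "node-a", "node-b", "node-c")
	for _, job := range []string{"test1", "test2", "test3"} {
		fmt.Println(job, "->", r.pick(job))
	}
}
```

Every node that sees the same node list computes the same owner for a job, so no clock comparison is needed to decide who runs it.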

Why not use a distributed lock?

If a distributed lock were used instead, the result would depend on the system time of each node. Problems arise when the system times are not synchronized:

  1. If the task's execution time is shorter than the clock difference between nodes, the task will be executed again. (One node unlocks after execution, and the lock is then taken by another node that reaches the execution time later.)

  2. Even if the clocks differ only slightly, the fastest node will always acquire the lock first. As a result, all tasks will be executed only by the fastest node.
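Both problems can be reproduced with a small deterministic simulation. The sketch below is a toy model, not dcron code: the `node` type, the `runWithLock` function, and the millisecond values are hypothetical, and the lock is assumed to be released as soon as the task finishes.

```go
package main

import (
	"fmt"
	"math"
	"sort"
)

// node models a machine whose local clock runs `offset` ms ahead of true time.
type node struct {
	name   string
	offset int
}

// runWithLock simulates one scheduled tick guarded by a lock that is released
// when the task finishes. It returns the nodes that executed the task.
func runWithLock(nodes []node, scheduledAt, taskDuration int) []string {
	sorted := append([]node(nil), nodes...)
	// A node whose clock is further ahead reaches the scheduled time earlier.
	sort.Slice(sorted, func(i, j int) bool { return sorted[i].offset > sorted[j].offset })

	lockedUntil := math.MinInt // true time at which the lock becomes free
	var ran []string
	for _, n := range sorted {
		fireAt := scheduledAt - n.offset // true time at which this node fires
		if fireAt >= lockedUntil {       // lock is free, so this node runs the task
			lockedUntil = fireAt + taskDuration
			ran = append(ran, n.name)
		}
	}
	return ran
}

func main() {
	nodes := []node{{"node-a", 0}, {"node-b", 2000}}
	// Task (500 ms) is shorter than the 2000 ms skew: both nodes execute it.
	fmt.Println(runWithLock(nodes, 10000, 500))
	// Task (5000 ms) outlasts the skew: only the fastest node ever executes it.
	fmt.Println(runWithLock(nodes, 10000, 5000))
}
```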

Features
  • Robustness: Task assignment does not depend on the system time, so uniform distribution and single-node execution are ensured even when system clocks differ between nodes.
  • Load Balance: Tasks are distributed evenly according to the number of tasks and nodes.
  • Seamless capacity extension: If the load on the task nodes is too high, starting a new server automatically migrates some tasks to it, achieving seamless capacity extension.
  • Failover: If a single node fails, its tasks are automatically transferred to other healthy nodes after 10 seconds.
  • Unique: The same task in the same service starts only a single running instance and is not executed repeatedly.
  • Customized storage: Add a node storage mode by implementing the driver interface.
  • Automatic recovery: If the process restarts, the jobs that have been stored are recovered into memory.
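One way to picture the failover behaviour is a heartbeat registry in which a node drops out once its last heartbeat is older than a time-to-live. This is a sketch under assumptions: the source only states that tasks move after 10 seconds, so modelling that figure as a heartbeat TTL is a guess, and `registry`, `newRegistry`, `heartbeat`, `alive`, and `at` are hypothetical names.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// registry tracks the last heartbeat seen from each node (hypothetical type).
type registry struct {
	ttl      time.Duration
	lastSeen map[string]time.Time
}

// newRegistry builds a registry whose entries expire after ttlSeconds.
func newRegistry(ttlSeconds int) *registry {
	return &registry{
		ttl:      time.Duration(ttlSeconds) * time.Second,
		lastSeen: make(map[string]time.Time),
	}
}

// heartbeat records that node was alive at time now.
func (r *registry) heartbeat(node string, now time.Time) {
	r.lastSeen[node] = now
}

// alive returns, in sorted order, the nodes whose last heartbeat is within ttl of now.
func (r *registry) alive(now time.Time) []string {
	var out []string
	for n, t := range r.lastSeen {
		if now.Sub(t) <= r.ttl {
			out = append(out, n)
		}
	}
	sort.Strings(out)
	return out
}

// at builds a timestamp n seconds after a fixed epoch, to keep the demo deterministic.
func at(n int) time.Time { return time.Unix(int64(n), 0) }

func main() {
	r := newRegistry(10) // assumed TTL matching the 10-second figure above
	r.heartbeat("node-a", at(0))
	r.heartbeat("node-b", at(0))
	fmt.Println(r.alive(at(5))) // both nodes are still within the TTL

	r.heartbeat("node-a", at(8)) // node-b has stopped sending heartbeats
	fmt.Println(r.alive(at(12))) // only node-a is left to receive tasks
}
```

Feeding the surviving node list into the node-selection step would then reassign the failed node's tasks.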
Get Started
  1. Create a redisDriver instance, set the ServiceName, and initialize dcron. The ServiceName defines the group of nodes that share the same tasks.
redisCli := redis.NewClient(&redis.Options{
  Addr: DefaultRedisAddr,
})
drv := driver.NewRedisDriver(redisCli)
dcron := NewDcron("server1", drv)
  2. Add a task using a cron expression. You must set the TaskName, which is the primary key of each task.
dcron.AddFunc("test1", "*/3 * * * *", func() {
  fmt.Println("execute test1 task", time.Now().Format("15:04:05"))
})
  3. Start the task.
// You can use either Start() or Run() to start dcron.
// Non-blocking start.
dcron.Start()

// Blocking start.
dcron.Run()
Example

examples

More configurations.

Dcron is based on https://github.com/robfig/cron. Use NewDcron to initialize Dcron; any arguments after the second are passed to cron.

For example, to evaluate cron expressions at second-level precision:

dcron := NewDcron("server1", drv, cron.WithSeconds())

Alternatively, you can use NewDcronWithOption to initialize Dcron and set the logger or other options. The available options are listed at https://github.com/BugKillerPro/dcron/blob/master/option.go

ServiceName

The ServiceName defines a group of tasks and can be understood as the boundary of task allocation and scheduling.

Multiple nodes that use the same service name are treated as one task group. Tasks in the same task group are evenly distributed across the nodes in the group and are not executed repeatedly.

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Dcron

type Dcron struct {
	ServerName string

	RecoverFunc RecoverFuncType
	// contains filtered or unexported fields
}

Dcron is the main struct.

func NewDcron

func NewDcron(serverName string, driver driver.DriverV2, cronOpts ...cron.Option) *Dcron

NewDcron creates a Dcron.

func NewDcronWithOption

func NewDcronWithOption(serverName string, driver driver.DriverV2, dcronOpts ...Option) *Dcron

NewDcronWithOption creates a Dcron with Dcron options.

func (*Dcron) AddFunc

func (d *Dcron) AddFunc(jobName, cronStr string, cmd func()) (err error)

AddFunc adds a cron function.

func (*Dcron) AddJob

func (d *Dcron) AddJob(jobName, cronStr string, job Job) (err error)

AddJob adds a job.

func (*Dcron) GetLogger

func (d *Dcron) GetLogger() dlog.Logger

GetLogger returns the dcron logger.

func (*Dcron) Remove

func (d *Dcron) Remove(jobName string)

Remove removes a job by name.

func (*Dcron) Run

func (d *Dcron) Run()

Run starts dcron and blocks.

func (*Dcron) SetLogger

func (d *Dcron) SetLogger(logger dlog.Logger)

SetLogger sets the dcron logger.

func (*Dcron) Start

func (d *Dcron) Start()

Start starts dcron without blocking.

func (*Dcron) Stop

func (d *Dcron) Stop()

Stop stops dcron.

type INodePool

type INodePool interface {
	Start(ctx context.Context) error
	CheckJobAvailable(jobName string) bool
	Stop(ctx context.Context) error

	GetNodeID() string
}

func NewNodePool

func NewNodePool(serviceName string, drv driver.DriverV2, updateDuration time.Duration, hashReplicas int, logger dlog.Logger) INodePool

type Job

type Job interface {
	Run()
}

Job Interface
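Any type with a `Run()` method satisfies this interface. The sketch below redeclares `Job` locally so that it compiles on its own; `cleanupJob` and its `runs` counter are hypothetical. With the real package, such a value would be passed as the third argument of `AddJob(jobName, cronStr, job)`.

```go
package main

import "fmt"

// Job mirrors the interface listed above; it is redeclared here only so the
// sketch compiles without importing the package.
type Job interface {
	Run()
}

// cleanupJob is a hypothetical job that counts how often it has run.
type cleanupJob struct {
	runs int
}

// Run performs the work; here it only increments a counter and logs.
func (j *cleanupJob) Run() {
	j.runs++
	fmt.Println("cleanup run", j.runs)
}

func main() {
	var job Job = &cleanupJob{}
	job.Run()
}
```

A pointer receiver is used so that state changed inside `Run` is kept between invocations.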

type JobWarpper

type JobWarpper struct {
	ID      cron.EntryID
	Dcron   *Dcron
	Name    string
	CronStr string
	Func    func()
	Job     Job
}

JobWarpper is a job wrapper.

func (JobWarpper) Run

func (job JobWarpper) Run()

Run runs the job.

type NodePool

type NodePool struct {
	// contains filtered or unexported fields
}

NodePool is a node pool

func (*NodePool) CheckJobAvailable

func (np *NodePool) CheckJobAvailable(jobName string) bool

CheckJobAvailable reports whether this job can run on this node.

func (*NodePool) GetNodeID

func (np *NodePool) GetNodeID() string

func (*NodePool) Start

func (np *NodePool) Start(ctx context.Context) (err error)

func (*NodePool) Stop

func (np *NodePool) Stop(ctx context.Context) error

type Option

type Option func(*Dcron)

Option is a Dcron option.
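`Option` follows the functional-options pattern: each option is a function that mutates the struct being built. The sketch below shows the pattern with a hypothetical stand-in type; `scheduler`, `option`, `newScheduler`, the lower-case option helpers, and the default values are all illustrative and are not dcron's own fields or defaults.

```go
package main

import (
	"fmt"
	"time"
)

// scheduler is a hypothetical stand-in for Dcron with two configurable fields.
type scheduler struct {
	hashReplicas       int
	nodeUpdateDuration time.Duration
}

// option has the same shape as Option above: a function that mutates the target.
type option func(*scheduler)

// withHashReplicas returns an option that sets hashReplicas.
func withHashReplicas(n int) option {
	return func(s *scheduler) { s.hashReplicas = n }
}

// withNodeUpdateDuration returns an option that sets nodeUpdateDuration.
func withNodeUpdateDuration(d time.Duration) option {
	return func(s *scheduler) { s.nodeUpdateDuration = d }
}

// newScheduler starts from illustrative defaults and applies the options in order.
func newScheduler(opts ...option) *scheduler {
	s := &scheduler{hashReplicas: 50, nodeUpdateDuration: time.Second}
	for _, apply := range opts {
		apply(s)
	}
	return s
}

func main() {
	s := newScheduler(withHashReplicas(10), withNodeUpdateDuration(3*time.Second))
	fmt.Println(s.hashReplicas, s.nodeUpdateDuration)
}
```

Options not passed leave the defaults untouched, which is why the constructor can grow new settings without changing its signature.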

func CronOptionChain

func CronOptionChain(wrappers ...cron.JobWrapper) Option

CronOptionChain wraps cron's chain option.

func CronOptionLocation

func CronOptionLocation(loc *time.Location) Option

CronOptionLocation wraps cron's location option.

func CronOptionParser

func CronOptionParser(p cron.ScheduleParser) Option

CronOptionParser wraps cron's schedule parser option.

func CronOptionSeconds

func CronOptionSeconds() Option

CronOptionSeconds wraps cron's seconds option.

func WithHashReplicas

func WithHashReplicas(d int) Option

WithHashReplicas sets hashReplicas.

func WithLogger

func WithLogger(logger dlog.Logger) Option

WithLogger sets the logger for both dcron and cron.

func WithNodeUpdateDuration

func WithNodeUpdateDuration(d time.Duration) Option

WithNodeUpdateDuration sets the node update duration.

func WithPrintLogInfo

func WithPrintLogInfo() Option

WithPrintLogInfo sets the log level to info.

func WithRecoverFunc

func WithRecoverFunc(recoverFunc RecoverFuncType) Option

WithRecoverFunc sets a custom recover function, which lets you re-add jobs to your dcron when the process restarts.

type RecoverFuncType

type RecoverFuncType func(d *Dcron)

type StableJob

type StableJob interface {
	Job
	GetCron() string
	Serialize() ([]byte, error)
	UnSerialize([]byte) error
}

A StableJob is a Job that is recovered when a node of the service restarts.
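A StableJob has to be able to round-trip through bytes. The sketch below redeclares the two interfaces locally and uses JSON for serialization; `reportJob`, its fields, and the choice of JSON are assumptions for illustration. How dcron stores and restores the bytes is not shown here.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Job and StableJob mirror the interfaces listed above; they are redeclared
// here only so the sketch compiles without importing the package.
type Job interface {
	Run()
}

type StableJob interface {
	Job
	GetCron() string
	Serialize() ([]byte, error)
	UnSerialize([]byte) error
}

// reportJob is a hypothetical job whose whole state fits in two string fields.
type reportJob struct {
	Cron      string `json:"cron"`
	Recipient string `json:"recipient"`
}

func (j *reportJob) Run()                       { fmt.Println("sending report to", j.Recipient) }
func (j *reportJob) GetCron() string            { return j.Cron }
func (j *reportJob) Serialize() ([]byte, error) { return json.Marshal(j) }
func (j *reportJob) UnSerialize(b []byte) error { return json.Unmarshal(b, j) }

// Compile-time check that *reportJob satisfies StableJob.
var _ StableJob = (*reportJob)(nil)

func main() {
	orig := &reportJob{Cron: "*/3 * * * *", Recipient: "ops"}
	data, err := orig.Serialize()
	if err != nil {
		panic(err)
	}

	// Simulate a restart: rebuild the job from the stored bytes alone.
	restored := &reportJob{}
	if err := restored.UnSerialize(data); err != nil {
		panic(err)
	}
	restored.Run()
}
```

Because `UnSerialize` fills an empty value, every field the job needs at run time must be part of the serialized form.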

Directories

Path Synopsis
examples
