dcron

package module
v0.6.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 26, 2023 License: MIT Imports: 11 Imported by: 0

README

Dcron

Language Tests Go Report Card

简体中文 | English

轻量分布式定时任务库

原理

使用 redis/etcd 同步服务节点列表及存活状态,在节点列表内使用一致性hash,选举可执行任务的节点。

为什么不直接用分布式锁实现?

通过各个节点在定时任务内抢锁方式实现,需要依赖各个节点系统时间完全一致,当系统时间有误差时可能会导致以下问题:

  1. 如果任务的执行时间小于系统时间差,任务仍然会被重复执行(某个节点定时执行完毕释放锁,又被另一个因为系统时间之后到达定时时间的节点取得锁)。
  2. 即使有极小的误差,因为某个节点的时间会比其他节点靠前,在抢锁时能第一时间取得锁,所以导致的结果是所有任务都只会被该节点执行,无法均匀分布到多节点。
特性
  • 鲁棒性: 任务的节点分配不依赖系统时间,所以各个节点间系统时间有误差也可以确保均匀分布及单节点执行。
  • 负载均衡:根据任务数据和节点数据均衡分发任务。
  • 无缝扩容:如果任务节点负载过大,直接启动新的服务器后部分任务会自动迁移至新服务实现无缝扩容。
  • 故障转移:单个节点故障,10s后会自动将任务自动转移至其他正常节点。
  • 任务唯一:同一个服务内同一个任务只会启动单个运行实例,不会重复执行。
  • 自定义存储:通过实现driver接口来增加节点数据存储方式。
  • 自动恢复:如果进程重启,则被持久化过的任务会被自动加载
  • 支持Namespace:节点通过namespace分组
快速开始

1.安装依赖

go get github.com/Casper-Mars/dcron

2.创建redisDriver实例,指定服务名并初始化dcron。服务名为执行相同任务的单元。

  drv, _ := redis.NewDriver(&redis.Options{
    Host: "127.0.0.1:6379"
  })
  dcron := NewDcron("server1", drv)

3.使用cron语法添加任务,需要指定任务名。任务名作为任务的唯一标识,必须保证唯一。

    dcron.AddFunc("test1","*/3 * * * *",func(){
		fmt.Println("执行 test1 任务",time.Now().Format("15:04:05"))
	})

4.开始任务。

// 启动任务可使用 Start() 或者 Run()
// 使用协程异步启动任务
dcron.Start()

// 使用当前协程同步启动任务,会阻塞当前协程后续逻辑执行
dcron.Run()
使用案例

examples

更多配置

Dcron 项目基于 https://github.com/robfig/cron , 使用 NewDcron 初始化 Dcron 时的第三个参数之后都会原样配置到 cron 。

例如需要配置秒级的 cron 表达式,可以使用

dcron := NewDcron("server1", drv,cron.WithSeconds())

另外还可以通过 NewDcronWithOption 方法初始化,可以配置日志输出等。 可选配置可以参考:https://github.com/libi/dcron/blob/master/option.go

服务名/serviceName

服务名是为了定义相同一组任务,可以理解为任务分配和调度的边界。

多个节点使用同一个服务名会被视为同一任务组,在同一个任务组内的任务会均匀分配至组内各个节点并确保不会重复执行

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Dcron

type Dcron struct {
	ServerName string

	RecoverFunc RecoverFuncType
	// contains filtered or unexported fields
}

Dcron is main struct

func NewDcron

func NewDcron(serverName string, driver driver.Driver, cronOpts ...cron.Option) *Dcron

NewDcron create a Dcron

func NewDcronWithOption

func NewDcronWithOption(serverName string, driver driver.Driver, dcronOpts ...Option) *Dcron

NewDcronWithOption create a Dcron with Dcron Option

func (*Dcron) AddFunc

func (d *Dcron) AddFunc(jobName, cronStr string, cmd func()) (err error)

AddFunc add a cron func

func (*Dcron) AddJob

func (d *Dcron) AddJob(jobName, cronStr string, job Job) (err error)

AddJob add a job

func (*Dcron) GetLogger

func (d *Dcron) GetLogger() dlog.Logger

GetLogger get dcron logger

func (*Dcron) Remove

func (d *Dcron) Remove(jobName string)

Remove Job

func (*Dcron) Run

func (d *Dcron) Run()

Run Job

func (*Dcron) SetLogger

func (d *Dcron) SetLogger(logger dlog.Logger)

SetLogger set dcron logger

func (*Dcron) Start

func (d *Dcron) Start()

Start job

func (*Dcron) Stop

func (d *Dcron) Stop()

Stop job

type Job

type Job interface {
	Run()
}

Job Interface

type JobWarpper

type JobWarpper struct {
	ID      cron.EntryID
	Dcron   *Dcron
	Name    string
	CronStr string
	Func    func()
	Job     Job
}

JobWarpper is a job warpper

func (JobWarpper) Run

func (job JobWarpper) Run()

Run is run job

type NodePool

type NodePool struct {
	Driver driver.Driver
	// contains filtered or unexported fields
}

NodePool is a node pool

func (*NodePool) PickNodeByJobName

func (np *NodePool) PickNodeByJobName(jobName string) string

PickNodeByJobName : 使用一致性hash算法根据任务名获取一个执行节点

func (*NodePool) StartPool

func (np *NodePool) StartPool() error

StartPool Start Service Watch Pool

type Option

type Option func(*Dcron)

Option is Dcron Option

func CronOptionChain

func CronOptionChain(wrappers ...cron.JobWrapper) Option

CronOptionChain is Warp cron with chain

func CronOptionLocation

func CronOptionLocation(loc *time.Location) Option

CronOptionLocation is warp cron with location

func CronOptionParser

func CronOptionParser(p cron.ScheduleParser) Option

CronOptionParser is warp cron with schedules.

func CronOptionSeconds

func CronOptionSeconds() Option

CronOptionSeconds is warp cron with seconds

func WithHashReplicas

func WithHashReplicas(d int) Option

WithHashReplicas set hashReplicas

func WithLogger

func WithLogger(logger dlog.Logger) Option

WithLogger both set dcron and cron logger.

func WithNamespace

func WithNamespace(namespace string) Option

func WithNodeFilter

func WithNodeFilter(filter node.Filter) Option

func WithNodeUpdateDuration

func WithNodeUpdateDuration(d time.Duration) Option

WithNodeUpdateDuration set node update duration

func WithPrintLogInfo

func WithPrintLogInfo() Option

PrintLogInfo set log info level

func WithRecoverFunc

func WithRecoverFunc(recoverFunc RecoverFuncType) Option

You can defined yourself recover function to make the job will be added to your dcron when the process restart

type RecoverFuncType

type RecoverFuncType func(d *Dcron)

type StableJob

type StableJob interface {
	Job
	GetCron() string
	Serialize() ([]byte, error)
	UnSerialize([]byte) error
}

This type of Job will be recovered in a node of service restarting.

Directories

Path Synopsis
examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL