dcron

package module
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 3, 2023 License: MIT Imports: 13 Imported by: 0

README

Dcron

a lightweight distributed job scheduler library based on redis

轻量分布式定时任务库

原理

使用 redis 同步服务节点列表及存活状态,在节点列表内使用一致性hash,选举可执行任务的节点。

为什么不直接用分布式锁实现?

通过各个节点在定时任务内抢锁方式实现,需要依赖各个节点系统时间完全一致,当系统时间有误差时可能会导致以下问题:

  1. 如果任务的执行时间小于系统时间差,任务仍然会被重复执行(某个节点定时执行完毕释放锁,又被另一个因为系统时间之后到达定时时间的节点取得锁)。
  2. 即使有极小的误差,因为某个节点的时间会比其他节点靠前,在抢锁时能第一时间取得锁,所以导致的结果是所有任务都只会被该节点执行,无法均匀分布到多节点。
特性
  • 鲁棒性: 任务的节点分配不依赖系统时间,所以各个节点间系统时间有误差也可以确保均匀分布及单节点执行。
  • 负载均衡:根据任务数据和节点数据均衡分发任务。
  • 无缝扩容:如果任务节点负载过大,直接启动新的服务器后部分任务会自动迁移至新服务实现无缝扩容。
  • 故障转移:单个节点故障,10s后会自动将任务自动转移至其他正常节点。
  • 任务唯一:同一个服务内同一个任务只会启动单个运行实例,不会重复执行。
  • 自定义存储:通过实现driver接口来增加节点数据存储方式。
  • 自动恢复:如果进程重启,则被持久化过的任务会被自动加载
快速开始

1.创建redisDriver实例,指定服务名并初始化dcron。服务名为执行相同任务的单元。

redisCli := redis.NewClient(&redis.Options{
  Addr: DefaultRedisAddr,
})
drv := driver.NewRedisDriver(redisCli)
dcron := NewDcron("server1", drv)

当然,如果你可以自己实现一个自定义的Driver也是可以的,只需要实现Driver接口即可。

2.使用cron语法添加任务,需要指定任务名。任务名作为任务的唯一标识,必须保证唯一。

dcron.AddFunc("test1","*/3 * * * *",func(){
  fmt.Println("执行 test1 任务",time.Now().Format("15:04:05"))
})

3.开始任务。

// 启动任务可使用 Start() 或者 Run()
// 使用协程异步启动任务
dcron.Start()

// 使用当前协程同步启动任务,会阻塞当前协程后续逻辑执行
dcron.Run()
更多配置

Dcron 项目基于 https://github.com/robfig/cron , 使用 NewDcron 初始化 Dcron 时的第三个参数之后都会原样配置到 cron 。

例如需要配置秒级的 cron 表达式,可以使用

dcron := NewDcron("server1", drv,cron.WithSeconds())

另外还可以通过 NewDcronWithOption 方法初始化,可以配置日志输出等。

服务名/serviceName

服务名是为了定义相同一组任务,可以理解为任务分配和调度的边界。

多个节点使用同一个服务名会被视为同一任务组,在同一个任务组内的任务会均匀分配至组内各个节点并确保不会重复执行

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Dcron

type Dcron struct {
	ServerName string

	RecoverFunc RecoverFuncType
	// contains filtered or unexported fields
}

Dcron is main struct

func NewDcron

func NewDcron(serverName string, driver driver.Driver, cronOpts ...cron.Option) *Dcron

NewDcron create a Dcron

func NewDcronWithOption

func NewDcronWithOption(serverName string, driver driver.Driver, dcronOpts ...Option) *Dcron

NewDcronWithOption create a Dcron with Dcron Option

func (*Dcron) AddFunc

func (d *Dcron) AddFunc(jobName, cronStr string, cmd func()) (err error)

AddFunc add a cron func

func (*Dcron) AddJob

func (d *Dcron) AddJob(jobName, cronStr string, job Job) (err error)

AddJob add a job

func (*Dcron) GetLogger

func (d *Dcron) GetLogger() Logger

GetLogger get dcron logger

func (*Dcron) Remove

func (d *Dcron) Remove(jobName string)

Remove Job

func (*Dcron) Run

func (d *Dcron) Run()

Run Job

func (*Dcron) SetLogger

func (d *Dcron) SetLogger(logger Logger)

SetLogger set dcron logger

func (*Dcron) Start

func (d *Dcron) Start()

Start job

func (*Dcron) Stop

func (d *Dcron) Stop()

Stop job

type Hash

type Hash func(data []byte) uint32

type INodePool

type INodePool interface {
	Start(ctx context.Context) error
	CheckJobAvailable(jobName string) bool
	Stop(ctx context.Context) error

	GetNodeID() string
}

func NewNodePool

func NewNodePool(serviceName string, drv driver.Driver, updateDuration time.Duration, hashReplicas int, logger Logger) INodePool

type Job

type Job interface {
	Run()
}

Job Interface

type JobWarpper

type JobWarpper struct {
	ID      cron.EntryID
	Dcron   *Dcron
	Name    string
	CronStr string
	Func    func()
	Job     Job
}

JobWarpper is a job warpper

func (JobWarpper) Run

func (job JobWarpper) Run()

Run is run job

type LogfLogger

type LogfLogger interface {
	Logf(string, ...interface{})
}

type Logger

type Logger interface {
	PrintfLogger
	Infof(string, ...interface{})
	Warnf(string, ...interface{})
	Errorf(string, ...interface{})
}

func NewLoggerForTest

func NewLoggerForTest(t *testing.T) Logger

type Map

type Map struct {
	// contains filtered or unexported fields
}

func New

func New(replicas int, fn Hash) *Map

func (*Map) Add

func (m *Map) Add(keys ...string)

Add some keys to the hash.

func (*Map) Get

func (m *Map) Get(key string) string

Get the closest item in the hash to the provided key.

func (*Map) IsEmpty

func (m *Map) IsEmpty() bool

IsEmpty returns true if there are no items available.

type NodePool

type NodePool struct {
	// contains filtered or unexported fields
}

NodePool is a node pool

func (*NodePool) CheckJobAvailable

func (np *NodePool) CheckJobAvailable(jobName string) bool

CheckJobAvailable Check if this job can be run in this node.

func (*NodePool) GetNodeID

func (np *NodePool) GetNodeID() string

func (*NodePool) Start

func (np *NodePool) Start(ctx context.Context) (err error)

func (*NodePool) Stop

func (np *NodePool) Stop(ctx context.Context) error

type Option

type Option func(*Dcron)

Option is Dcron Option

func CronOptionChain

func CronOptionChain(wrappers ...cron.JobWrapper) Option

CronOptionChain is Warp cron with chain

func CronOptionLocation

func CronOptionLocation(loc *time.Location) Option

CronOptionLocation is warp cron with location

func CronOptionParser

func CronOptionParser(p cron.ScheduleParser) Option

CronOptionParser is warp cron with schedules.

func CronOptionSeconds

func CronOptionSeconds() Option

CronOptionSeconds is warp cron with seconds

func WithHashReplicas

func WithHashReplicas(d int) Option

WithHashReplicas set hashReplicas

func WithLogger

func WithLogger(logger Logger) Option

WithLogger both set dcron and cron logger.

func WithNodeUpdateDuration

func WithNodeUpdateDuration(d time.Duration) Option

WithNodeUpdateDuration set node update duration

func WithPrintLogInfo

func WithPrintLogInfo() Option

WithPrintLogInfo set log info level

func WithRecoverFunc

func WithRecoverFunc(recoverFunc RecoverFuncType) Option

WithRecoverFunc You can defined yourself recover function to make the job will be added to your dcron when the process restart

type PrintfLogger

type PrintfLogger interface {
	Printf(string, ...interface{})
}

func NewPrintfLoggerFromLogfLogger

func NewPrintfLoggerFromLogfLogger(logger LogfLogger) PrintfLogger

type PrintfLoggerFromLogfLogger

type PrintfLoggerFromLogfLogger struct {
	Log LogfLogger
}

func (*PrintfLoggerFromLogfLogger) Printf

func (l *PrintfLoggerFromLogfLogger) Printf(fmt string, args ...interface{})

type RecoverFuncType

type RecoverFuncType func(d *Dcron)

type StableJob

type StableJob interface {
	Job
	GetCron() string
	Serialize() ([]byte, error)
	UnSerialize([]byte) error
}

StableJob This type of Job will be recovered in a node of service restarting.

type StdLogger

type StdLogger struct {
	Log PrintfLogger
}

func (*StdLogger) Errorf

func (l *StdLogger) Errorf(format string, args ...interface{})

func (*StdLogger) Infof

func (l *StdLogger) Infof(format string, args ...interface{})

func (*StdLogger) Printf

func (l *StdLogger) Printf(format string, args ...interface{})

func (*StdLogger) Warnf

func (l *StdLogger) Warnf(format string, args ...interface{})

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL