agscheduler

package module
v0.6.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 14, 2024 License: MIT Imports: 20 Imported by: 0

README

AGScheduler

test codecov Go Report Card Go Reference GitHub release (with filter) GitHub go.mod Go version (subdirectory of monorepo) license

Advanced Golang Scheduler (AGScheduler) is a task scheduling library for Golang that supports multiple scheduling types, dynamically changing and persistent jobs, remote call, and cluster

English | 简体中文

Features

  • Supports three scheduling types
    • One-off execution
    • Interval execution
    • Cron-style scheduling
  • Supports multiple job store methods
  • Supports remote call
  • Supports cluster
    • Remote worker nodes
    • Scheduler high availability (Experimental)

Framework

Framework

Usage

package main

import (
	"context"
	"fmt"
	"log/slog"
	"time"

	"github.com/agscheduler/agscheduler"
	"github.com/agscheduler/agscheduler/stores"
)

func printMsg(ctx context.Context, j agscheduler.Job) {
	slog.Info(fmt.Sprintf("Run job `%s` %s\n\n", j.FullName(), j.Args))
}

func main() {
	agscheduler.RegisterFuncs(
		agscheduler.FuncPkg{Func: printMsg},
	)

	store := &stores.MemoryStore{}
	scheduler := &agscheduler.Scheduler{}
	scheduler.SetStore(store)

	job1 := agscheduler.Job{
		Name:     "Job1",
		Type:     agscheduler.TYPE_INTERVAL,
		Interval: "2s",
		Timezone: "UTC",
		Func:     printMsg,
		Args:     map[string]any{"arg1": "1", "arg2": "2", "arg3": "3"},
	}
	job1, _ = scheduler.AddJob(job1)
	slog.Info(fmt.Sprintf("%s.\n\n", job1))

	job2 := agscheduler.Job{
		Name:     "Job2",
		Type:     agscheduler.TYPE_CRON,
		CronExpr: "*/1 * * * *",
		Timezone: "Asia/Shanghai",
		FuncName: "main.printMsg",
		Args:     map[string]any{"arg4": "4", "arg5": "5", "arg6": "6", "arg7": "7"},
	}
	job2, _ = s.AddJob(job2)
	slog.Info(fmt.Sprintf("%s.\n\n", job2))

	job3 := agscheduler.Job{
		Name:     "Job3",
		Type:     agscheduler.TYPE_DATETIME,
		StartAt:  "2023-09-22 07:30:08",
		Timezone: "America/New_York",
		Func:     printMsg,
		Args:     map[string]any{"arg8": "8", "arg9": "9"},
	}
	job3, _ = s.AddJob(job3)
	slog.Info(fmt.Sprintf("%s.\n\n", job3))

	jobs, _ := s.GetAllJobs()
	slog.Info(fmt.Sprintf("Scheduler get all jobs %s.\n\n", jobs))

	scheduler.Start()

	select {}
}

Register Funcs

Since golang can't serialize functions, you need to register them with RegisterFuncs before scheduler.Start()

gRPC

// Server
grservice := services.GRPCService{
	Scheduler: scheduler,
	Address:   "127.0.0.1:36360",
}
grservice.Start()

// Client
conn, _ := grpc.Dial("127.0.0.1:36360", grpc.WithTransportCredentials(insecure.NewCredentials()))
client := pb.NewSchedulerClient(conn)
client.AddJob(ctx, job)

HTTP

// Server
hservice := services.HTTPService{
	Scheduler: scheduler,
	Address:   "127.0.0.1:36370",
}
hservice.Start()

// Client
mJob := map[string]any{...}
bJob, _ := json.Marshal(mJob)
resp, _ := http.Post("http://127.0.0.1:36370/scheduler/job", "application/json", bytes.NewReader(bJob))

Cluster

// Main Node
cnMain := &agscheduler.ClusterNode{
	Endpoint:     "127.0.0.1:36380",
	EndpointGRPC: "127.0.0.1:36360",
	EndpointHTTP: "127.0.0.1:36370",
	Queue:        "default",
}
schedulerMain.SetStore(storeMain)
schedulerMain.SetClusterNode(ctx, cnMain)
cserviceMain := &services.ClusterService{Cn: cnMain}
cserviceMain.Start()

// Worker Node
cnNode := &agscheduler.ClusterNode{
	EndpointMain: "127.0.0.1:36380",
	Endpoint:     "127.0.0.1:36381",
	EndpointGRPC: "127.0.0.1:36361",
	EndpointHTTP: "127.0.0.1:36371",
	Queue:        "worker",
}
schedulerNode.SetStore(storeNode)
schedulerNode.SetClusterNode(ctx, cnNode)
cserviceNode := &services.ClusterService{Cn: cnNode}
cserviceNode.Start()

Cluster HA (High Availability, Experimental)


// HA requires the following conditions to be met:
//
// 1. The number of HA nodes in the cluster must be odd
// 2. All HA nodes need to connect to the same Store (excluding `MemoryStore`)
// 3. The `Mode` of the `ClusterNode` needs to be set to `HA`
// 4. The main HA node must be started first

// Main HA Node
cnMain := &agscheduler.ClusterNode{..., Mode: "HA"}

// HA Node
cnNode1 := &agscheduler.ClusterNode{..., Mode: "HA"}
cnNode2 := &agscheduler.ClusterNode{..., Mode: "HA"}

// Worker Node
cnNode3 := &agscheduler.ClusterNode{...}

Base API

gRPC Function HTTP Method HTTP Path
GetInfo GET /info
GetFuncs GET /funcs

Scheduler API

gRPC Function HTTP Method HTTP Path
AddJob POST /scheduler/job
GetJob GET /scheduler/job/:id
GetAllJobs GET /scheduler/jobs
UpdateJob PUT /scheduler/job
DeleteJob DELETE /scheduler/job/:id
DeleteAllJobs DELETE /scheduler/jobs
PauseJob POST /scheduler/job/:id/pause
ResumeJob POST /scheduler/job/:id/resume
RunJob POST /scheduler/job/run
ScheduleJob POST /scheduler/job/schedule
Start POST /scheduler/start
Stop POST /scheduler/stop

Cluster API

gRPC Function HTTP Method HTTP Path
GetNodes GET /cluster/nodes

Examples

Complete example

Development

# Clone code
git clone git@github.com:agscheduler/agscheduler.git

# Working directory
cd agscheduler

# Install dependencies
make install

# Up CI services
make up-ci-services

# Run check
make check-all

Thanks

APScheduler

simple-raft

Documentation

Index

Constants

View Source
const (
	TYPE_DATETIME = "datetime"
	TYPE_INTERVAL = "interval"
	TYPE_CRON     = "cron"
)

constant indicating a job's type

View Source
const (
	STATUS_RUNNING = "running"
	STATUS_PAUSED  = "paused"
)

constant indicating a job's status

View Source
const Version = "0.6.1"

Variables

View Source
var FuncMap = make(map[string]FuncPkg)

Record the actual path of function and the corresponding function. Since golang can't serialize functions, need to register them with `RegisterFuncs` before using it.

View Source
var GetClusterNode = (*Scheduler).getClusterNode
View Source
var GetStore = (*Scheduler).getStore

Functions

func CalcNextRunTime

func CalcNextRunTime(j Job) (time.Time, error)

Calculate the next run time, different job type will be calculated in different ways, when the job is paused, will return `9999-09-09 09:09:09`.

func FuncMapReadable

func FuncMapReadable() []map[string]string

func GetNextRunTimeMax

func GetNextRunTimeMax() (time.Time, error)

func JobToPbJobPtr

func JobToPbJobPtr(j Job) (*pb.Job, error)

Used to gRPC Protobuf

func JobsToPbJobsPtr

func JobsToPbJobsPtr(js []Job) (*pb.Jobs, error)

Used to gRPC Protobuf

func RegisterFuncs

func RegisterFuncs(fps ...FuncPkg)

func StateDump

func StateDump(j Job) ([]byte, error)

Serialize Job and convert to Bytes

Types

type ClusterNode

type ClusterNode struct {
	// Main node RPC listening address.
	// If you are the main, `EndpointMain` is the same as `Endpoint`.
	// Default: `127.0.0.1:36380`
	EndpointMain string
	// The unique identifier of this node.
	// RPC listening address.
	// Used to expose the cluster's internal API.
	// Default: `127.0.0.1:36380`
	Endpoint string
	// gRPC listening address.
	// Used to expose the external API.
	// Default: `127.0.0.1:36360`
	EndpointGRPC string
	// HTTP listening address.
	// Used to expose the external API.
	// Default: `127.0.0.1:36370`
	EndpointHTTP string
	// Useful when a job specifies a queue.
	// A queue can correspond to multiple nodes.
	// Default: `default`
	Queue string
	// Node mode, for Scheduler high availability.
	// If the value is `HA`, the node will join the raft group.
	// Default: “, Options `HA`
	Mode string

	// Bind to each other and the Scheduler.
	Scheduler *Scheduler

	// For Scheduler high availability.
	// Bind to each other and the Raft.
	Raft *Raft
	// Used to mark the status of Cluster Scheduler operation.
	SchedulerCanStart bool
	// contains filtered or unexported fields
}

Each node provides `Cluster RPC`, `gRPC`, `HTTP` services, but only the main node starts the scheduler, the other worker nodes register with the main node and then run jobs from the main node via the RPC's `RunJob` API.

func (*ClusterNode) GetEndpointMain

func (cn *ClusterNode) GetEndpointMain() string

func (*ClusterNode) HANodeMap

func (cn *ClusterNode) HANodeMap() TypeNodeMap

func (*ClusterNode) IsMainNode

func (cn *ClusterNode) IsMainNode() bool

func (*ClusterNode) MainNode

func (cn *ClusterNode) MainNode() map[string]any

func (*ClusterNode) NodeMapCopy

func (cn *ClusterNode) NodeMapCopy() TypeNodeMap

func (*ClusterNode) NodeMapToPbNodesPtr

func (cn *ClusterNode) NodeMapToPbNodesPtr() *pb.Nodes

Used to gRPC Protobuf

func (*ClusterNode) RPCHeartbeat

func (cn *ClusterNode) RPCHeartbeat(args *Node, reply *Node)

RPC API

func (*ClusterNode) RPCRegister

func (cn *ClusterNode) RPCRegister(args *Node, reply *Node)

RPC API

func (*ClusterNode) RegisterNodeRemote

func (cn *ClusterNode) RegisterNodeRemote(ctx context.Context) error

Used for worker node

After initialization, node need to register with the main node and synchronize cluster node information.

func (*ClusterNode) SetEndpointMain

func (cn *ClusterNode) SetEndpointMain(endpoint string)

type FuncPkg

type FuncPkg struct {
	Func func(context.Context, Job)
	// About this function.
	Info string
}

type FuncUnregisteredError

type FuncUnregisteredError string

func (FuncUnregisteredError) Error

func (e FuncUnregisteredError) Error() string

type HeartbeatArgs

type HeartbeatArgs struct {
	Term           int
	LeaderEndpoint string

	SchedulerCanStart bool
}

type HeartbeatReply

type HeartbeatReply struct {
	Term int
}

type Job

type Job struct {
	// The unique identifier of this job, automatically generated.
	// It should not be set manually.
	Id string `json:"id"`
	// User defined.
	Name string `json:"name"`
	// Optional: `TYPE_DATETIME` | `TYPE_INTERVAL` | `TYPE_CRON`
	Type string `json:"type"`
	// It can be used when Type is `TYPE_DATETIME`.
	StartAt string `json:"start_at"`
	// This field is useless.
	EndAt string `json:"end_at"`
	// It can be used when Type is `TYPE_INTERVAL`.
	Interval string `json:"interval"`
	// It can be used when Type is `TYPE_CRON`.
	CronExpr string `json:"cron_expr"`
	// Refer to `time.LoadLocation`.
	// Default: `UTC`
	Timezone string `json:"timezone"`
	// The job actually runs the function,
	// and you need to register it through 'RegisterFuncs' before using it.
	// Since it cannot be stored by serialization,
	// when using gRPC or HTTP calls, you should use `FuncName`.
	Func func(context.Context, Job) `json:"-"`
	// The actual path of `Func`.
	// This field has a higher priority than `Func`
	FuncName string `json:"func_name"`
	// Arguments for `Func`.
	Args map[string]any `json:"args"`
	// The running timeout of `Func`.
	// Default: `1h`
	Timeout string `json:"timeout"`
	// Used in cluster mode, if empty, randomly pick a node to run `Func`.
	Queues []string `json:"queues"`

	// Automatic update, not manual setting.
	LastRunTime time.Time `json:"last_run_time"`
	// Automatic update, not manual setting.
	// When the job is paused, this field is set to `9999-09-09 09:09:09`.
	NextRunTime time.Time `json:"next_run_time"`
	// Optional: `STATUS_RUNNING` | `STATUS_PAUSED`
	// It should not be set manually.
	Status string `json:"status"`
}

Carry the information of the scheduled job

func PbJobPtrToJob

func PbJobPtrToJob(pbJob *pb.Job) Job

Used to gRPC Protobuf

func PbJobsPtrToJobs

func PbJobsPtrToJobs(pbJs *pb.Jobs) []Job

Used to gRPC Protobuf

func StateLoad

func StateLoad(state []byte) (Job, error)

Deserialize Bytes and convert to Job

func (*Job) FullName

func (j *Job) FullName() string

func (*Job) LastRunTimeWithTimezone

func (j *Job) LastRunTimeWithTimezone() time.Time

func (*Job) NextRunTimeWithTimezone

func (j *Job) NextRunTimeWithTimezone() time.Time

func (Job) String

func (j Job) String() string

type JobNotFoundError

type JobNotFoundError string

func (JobNotFoundError) Error

func (e JobNotFoundError) Error() string

type JobSlice

type JobSlice []Job

`sort.Interface`, sorted by 'NextRunTime', ascend.

func (JobSlice) Len

func (js JobSlice) Len() int

func (JobSlice) Less

func (js JobSlice) Less(i, j int) bool

func (JobSlice) Swap

func (js JobSlice) Swap(i, j int)

type JobTimeoutError

type JobTimeoutError struct {
	FullName string
	Timeout  string
	Err      error
}

func (*JobTimeoutError) Error

func (e *JobTimeoutError) Error() string

type Node

type Node struct {
	EndpointMain string
	Endpoint     string
	EndpointGRPC string
	EndpointHTTP string
	Queue        string
	Mode         string

	NodeMap TypeNodeMap
}

type Raft

type Raft struct {
	// contains filtered or unexported fields
}

func (*Raft) RPCHeartbeat

func (rf *Raft) RPCHeartbeat(args HeartbeatArgs, reply *HeartbeatReply) error

func (*Raft) RPCRequestVote

func (rf *Raft) RPCRequestVote(args VoteArgs, reply *VoteReply) error

type Role

type Role int
const (
	Follower Role = iota + 1
	Candidate
	Leader
)

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

In standalone mode, the scheduler only needs to run jobs on a regular basis. In cluster mode, the scheduler also needs to be responsible for allocating jobs to cluster nodes.

func (*Scheduler) AddJob

func (s *Scheduler) AddJob(j Job) (Job, error)

func (*Scheduler) DeleteAllJobs

func (s *Scheduler) DeleteAllJobs() error

func (*Scheduler) DeleteJob

func (s *Scheduler) DeleteJob(id string) error

func (*Scheduler) GetAllJobs

func (s *Scheduler) GetAllJobs() ([]Job, error)

func (*Scheduler) GetJob

func (s *Scheduler) GetJob(id string) (Job, error)

func (*Scheduler) Info

func (s *Scheduler) Info() map[string]any

func (*Scheduler) IsClusterMode

func (s *Scheduler) IsClusterMode() bool

func (*Scheduler) IsRunning

func (s *Scheduler) IsRunning() bool

func (*Scheduler) PauseJob

func (s *Scheduler) PauseJob(id string) (Job, error)

func (*Scheduler) ResumeJob

func (s *Scheduler) ResumeJob(id string) (Job, error)

func (*Scheduler) RunJob

func (s *Scheduler) RunJob(j Job) error

func (*Scheduler) ScheduleJob

func (s *Scheduler) ScheduleJob(j Job) error

Used in cluster mode. Select a worker node

func (*Scheduler) SetClusterNode

func (s *Scheduler) SetClusterNode(ctx context.Context, cn *ClusterNode) error

Bind the cluster node

func (*Scheduler) SetStore

func (s *Scheduler) SetStore(sto Store) error

Bind the store

func (*Scheduler) Start

func (s *Scheduler) Start()

In addition to being called manually, it is also called after `AddJob`.

func (*Scheduler) Stop

func (s *Scheduler) Stop()

In addition to being called manually, there is no job in store that will also be called.

func (*Scheduler) UpdateJob

func (s *Scheduler) UpdateJob(j Job) (Job, error)

type Store

type Store interface {
	// Initialization functions for each store,
	// called when the scheduler run `SetStore`.
	Init() error

	// Add job to this store.
	AddJob(j Job) error

	// Get the job from this store.
	//  @return error `JobNotFoundError` if there are no job.
	GetJob(id string) (Job, error)

	// Get all jobs from this store.
	GetAllJobs() ([]Job, error)

	// Update job in store with a newer version.
	UpdateJob(j Job) error

	// Delete the job from this store.
	DeleteJob(id string) error

	// Delete all jobs from this store.
	DeleteAllJobs() error

	// Get the earliest next run time of all the jobs stored in this store,
	// or `time.Time{}` if there are no job.
	// Used to set the wakeup interval for the scheduler.
	GetNextRunTime() (time.Time, error)

	// Clear all resources bound to this store.
	Clear() error
}

Defines the interface that each store must implement.

type TypeNodeMap

type TypeNodeMap map[string]map[string]any

type VoteArgs

type VoteArgs struct {
	Term              int
	CandidateEndpoint string
}

type VoteReply

type VoteReply struct {
	Term        int
	VoteGranted bool
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL