agscheduler

package module
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 21, 2023 License: MIT Imports: 27 Imported by: 0

README

AGScheduler

test codecov Go Report Card Go Reference GitHub release (with filter) GitHub go.mod Go version (subdirectory of monorepo) license

Advanced Golang Scheduler (AGScheduler) is a task scheduling library for Golang that supports multiple scheduling types, dynamically changing and persistent jobs, remote calls, and clustering.

English | 简体中文

Features

  • Supports three scheduling types
    • One-off execution
    • Interval execution
    • Cron-style scheduling
  • Supports multiple job store methods
  • Supports remote call
    • gRPC
    • HTTP APIs
  • Supports cluster
    • Remote worker nodes

Framework

Framework

Usage

package main

import (
	"context"
	"fmt"
	"log/slog"
	"time"

	"github.com/kwkwc/agscheduler"
	"github.com/kwkwc/agscheduler/stores"
)

// printMsg is the demo job function: it logs the job's full name together
// with the job's arguments.
func printMsg(ctx context.Context, j agscheduler.Job) {
	msg := fmt.Sprintf("Run job `%s` %s\n\n", j.FullName(), j.Args)
	slog.Info(msg)
}

// main binds an in-memory store to a scheduler, adds one job of each
// scheduling type (interval, cron, datetime), logs all stored jobs, and then
// starts the scheduler.
//
// NOTE: returned errors are discarded throughout this example; production
// code should check them.
func main() {
	// Register the job function before the scheduler is started.
	agscheduler.RegisterFuncs(printMsg)

	store := &stores.MemoryStore{}
	scheduler := &agscheduler.Scheduler{}
	scheduler.SetStore(store)

	// Interval job: references the function directly through Func.
	job1 := agscheduler.Job{
		Name:     "Job1",
		Type:     agscheduler.TYPE_INTERVAL,
		Interval: "2s",
		Timezone: "UTC",
		Func:     printMsg,
		Args:     map[string]any{"arg1": "1", "arg2": "2", "arg3": "3"},
	}
	job1, _ = scheduler.AddJob(job1)
	slog.Info(fmt.Sprintf("%s.\n\n", job1))

	// Cron job: references the function by name through FuncName.
	job2 := agscheduler.Job{
		Name:     "Job2",
		Type:     agscheduler.TYPE_CRON,
		CronExpr: "*/1 * * * *",
		Timezone: "Asia/Shanghai",
		FuncName: "main.printMsg",
		Args:     map[string]any{"arg4": "4", "arg5": "5", "arg6": "6", "arg7": "7"},
	}
	job2, _ = scheduler.AddJob(job2)
	slog.Info(fmt.Sprintf("%s.\n\n", job2))

	// Datetime job: uses StartAt as its scheduled time.
	job3 := agscheduler.Job{
		Name:     "Job3",
		Type:     agscheduler.TYPE_DATETIME,
		StartAt:  "2023-09-22 07:30:08",
		Timezone: "America/New_York",
		Func:     printMsg,
		Args:     map[string]any{"arg8": "8", "arg9": "9"},
	}
	job3, _ = scheduler.AddJob(job3)
	slog.Info(fmt.Sprintf("%s.\n\n", job3))

	jobs, _ := scheduler.GetAllJobs()
	slog.Info(fmt.Sprintf("Scheduler get all jobs %s.\n\n", jobs))

	scheduler.Start()

	// Block forever so the process stays alive after Start returns.
	select {}
}

Register Funcs

Since Go can't serialize functions, you need to register them with `RegisterFuncs` before calling `scheduler.Start()`.

gRPC

// Server
srservice := services.SchedulerRPCService{
	Scheduler: scheduler,
	Address:   "127.0.0.1:36360",
}
srservice.Start()

// Client
conn, _ := grpc.Dial("127.0.0.1:36360", grpc.WithTransportCredentials(insecure.NewCredentials()))
client := pb.NewSchedulerClient(conn)
client.AddJob(ctx, job)

HTTP APIs

// Server
shservice := services.SchedulerHTTPService{
	Scheduler: scheduler,
	Address:   "127.0.0.1:36370",
}
shservice.Start()

// Client
mJob := map[string]any{...}
// Encode the job map as the JSON request body.
bJob, _ := json.Marshal(mJob)
resp, _ := http.Post("http://127.0.0.1:36370/scheduler/job", "application/json", bytes.NewReader(bJob))

Cluster

// Main Node
// MainEndpoint is not set for the main node in this example.
cnMain := &agscheduler.ClusterNode{
	Endpoint:          "127.0.0.1:36380",
	EndpointHTTP:      "127.0.0.1:36390",
	SchedulerEndpoint: "127.0.0.1:36360",
	Queue:             "default",
}
schedulerMain.SetClusterNode(ctx, cnMain)
cserviceMain := &services.ClusterService{Cn: cnMain}
cserviceMain.Start()

// Node
cn := &agscheduler.ClusterNode{
	MainEndpoint:      "127.0.0.1:36380",
	Endpoint:          "127.0.0.1:36381",
	EndpointHTTP:      "127.0.0.1:36391",
	SchedulerEndpoint: "127.0.0.1:36361",
	Queue:             "node",
}
scheduler.SetClusterNode(ctx, cn)
cservice := &services.ClusterService{Cn: cn}
cservice.Start()

Scheduler API

gRPC Function HTTP Method HTTP Endpoint
AddJob POST /scheduler/job
GetJob GET /scheduler/job/:id
GetAllJobs GET /scheduler/jobs
UpdateJob PUT /scheduler/job
DeleteJob DELETE /scheduler/job/:id
DeleteAllJobs DELETE /scheduler/jobs
PauseJob POST /scheduler/job/:id/pause
ResumeJob POST /scheduler/job/:id/resume
RunJob POST /scheduler/job/run
Start POST /scheduler/start
Stop POST /scheduler/stop

Cluster API

RPC Function HTTP Method HTTP Endpoint
Nodes GET /cluster/nodes

Examples

Complete example

Thanks

APScheduler

Documentation

Index

Constants

View Source
// Job types, used as the value of a job's `Type` field.
const (
	// TYPE_DATETIME is the job type for which the `StartAt` field can be used.
	TYPE_DATETIME = "datetime"
	// TYPE_INTERVAL is the job type for which the `Interval` field can be used.
	TYPE_INTERVAL = "interval"
	// TYPE_CRON is the job type for which the `CronExpr` field can be used.
	TYPE_CRON     = "cron"
)

Constants indicating a job's type.

View Source
// Job statuses, used as the value of a job's `Status` field.
const (
	// STATUS_RUNNING is the status value "running".
	STATUS_RUNNING = "running"
	// STATUS_PAUSED is the status value "paused".
	STATUS_PAUSED  = "paused"
)

Constants indicating a job's status.

View Source
// Version is the version string of this package.
const Version = "0.2.4"

Variables

View Source
// GetClusterNode is a method expression that exposes the unexported
// (*Scheduler).getClusterNode method; it takes the *Scheduler as its first argument.
var GetClusterNode = (*Scheduler).getClusterNode
View Source
// GetStore is a method expression that exposes the unexported
// (*Scheduler).getStore method; it takes the *Scheduler as its first argument.
var GetStore = (*Scheduler).getStore

Functions

func CalcNextRunTime

func CalcNextRunTime(j Job) (time.Time, error)

Calculates the next run time. Each job type is calculated in a different way; when the job is paused, it returns `9999-09-09 09:09:09`.

func JobToPbJobPtr

func JobToPbJobPtr(j Job) *pb.Job

Used for gRPC Protobuf conversion.

func JobsToPbJobsPtr

func JobsToPbJobsPtr(js []Job) *pb.Jobs

Used for gRPC Protobuf conversion.

func RegisterFuncs

func RegisterFuncs(fs ...func(context.Context, Job))

func StateDump

func StateDump(j Job) ([]byte, error)

Serialize Job and convert to Bytes

Types

type ClusterNode

// ClusterNode describes one node of a cluster: its own listening addresses,
// the main node's address, the queue it serves, and the scheduler it is
// bound to.
type ClusterNode struct {
	// The unique identifier of this node, automatically generated.
	// It should not be set manually.
	Id string
	// Main node RPC listening address.
	// If you are the main, `MainEndpoint` is the same as `Endpoint`.
	// Default: `127.0.0.1:36380`
	MainEndpoint string
	// RPC listening address.
	// Used to expose the cluster's internal API.
	// Default: `127.0.0.1:36380`
	Endpoint string
	// HTTP listening address.
	// Used to expose the cluster's external API.
	// Default: `127.0.0.1:36390`
	EndpointHTTP string
	// Scheduler gRPC listening address.
	// Used to expose the scheduler's external API.
	// Default: `127.0.0.1:36360`
	SchedulerEndpoint string
	// Useful when a job specifies a queue.
	// A queue can correspond to multiple nodes.
	// Default: `default`
	Queue string

	// Bind to each other and the scheduler.
	Scheduler *Scheduler
	// contains filtered or unexported fields
}

Each node provides `RPC`, `HTTP`, and `Scheduler gRPC` services, but only the main node starts the scheduler. The other worker nodes register with the main node and then run jobs from the main node via the Scheduler's `RunJob` API.

func (*ClusterNode) NodeMap

func (cn *ClusterNode) NodeMap() map[string]map[string]map[string]any

func (*ClusterNode) RPCPing

func (cn *ClusterNode) RPCPing(args *Node, reply *Node)

RPC API

func (*ClusterNode) RPCRegister

func (cn *ClusterNode) RPCRegister(args *Node, reply *Node)

RPC API

func (*ClusterNode) RegisterNodeRemote

func (cn *ClusterNode) RegisterNodeRemote(ctx context.Context) error

Used for worker node

After initialization, a node needs to register with the main node and synchronize cluster node information.

type EmailConfig

// EmailConfig groups e-mail related settings; Scheduler holds a pointer to
// one in its EmailConfig field.
// NOTE(review): when and how e-mails are sent is not visible here — confirm
// against the scheduler implementation.
type EmailConfig struct {
	// Presumably the SMTP server host — TODO confirm.
	SMTPServer string
	// Presumably the SMTP server port — TODO confirm.
	Port       int
	// Presumably the SMTP login credentials (with Password) — TODO confirm.
	Username   string
	Password   string
	// Presumably the sender address — TODO confirm.
	Sender     string
	// Presumably the recipient addresses — TODO confirm.
	Recipients []string
}

type FuncUnregisteredError

// FuncUnregisteredError is a string-based error type (it has an Error method).
// NOTE(review): presumably reported when a job's function has not been
// registered through RegisterFuncs — confirm against the implementation.
type FuncUnregisteredError string

func (FuncUnregisteredError) Error

func (e FuncUnregisteredError) Error() string

type HTTPCallbackConfig

// HTTPCallbackConfig groups HTTP callback settings; Scheduler holds a pointer
// to one in its HTTPCallbackConfig field.
type HTTPCallbackConfig struct {
	URL         string   // URL of the WeCom (WeChat Work) bot
	MessageType string   // Message type, e.g. "text"
	MentionList []string // List of people to @-mention, stored as WeCom IDs
}

type Job

// Job carries the information of a scheduled job.
type Job struct {
	// The unique identifier of this job, automatically generated.
	// It should not be set manually.
	Id string `json:"id"`
	// User defined.
	Name string `json:"name"`
	// Optional: `TYPE_DATETIME` | `TYPE_INTERVAL` | `TYPE_CRON`
	Type string `json:"type"`
	// It can be used when Type is `TYPE_DATETIME`.
	StartAt string `json:"start_at"`
	// This field is useless.
	EndAt string `json:"end_at"`
	// It can be used when Type is `TYPE_INTERVAL`.
	Interval string `json:"interval"`
	// It can be used when Type is `TYPE_CRON`.
	CronExpr string `json:"cron_expr"`
	// Refer to `time.LoadLocation`.
	// Default: `UTC`
	Timezone string `json:"timezone"`
	// The function that the job actually runs;
	// you need to register it through `RegisterFuncs` before using it.
	// Since it cannot be stored by serialization,
	// when using RPC or HTTP calls, you should use `FuncName`.
	Func func(context.Context, Job) `json:"-"`
	// The actual path of `Func`.
	// This field has a higher priority than `Func`.
	FuncName string `json:"func_name"`
	// Arguments for `Func`.
	Args map[string]any `json:"args"`
	// The running timeout of `Func`.
	// Default: `1h`
	Timeout string `json:"timeout"`
	// Used in cluster mode, if empty, randomly pick a node to run `Func`.
	Queues []string `json:"queues"`

	// Automatically updated; do not set manually.
	LastRunTime time.Time `json:"last_run_time"`
	// Automatically updated; do not set manually.
	// When the job is paused, this field is set to `9999-09-09 09:09:09`.
	NextRunTime time.Time `json:"next_run_time"`
	// Optional: `STATUS_RUNNING` | `STATUS_PAUSED`
	// It should not be set manually.
	Status string `json:"status"`
}

Carry the information of the scheduled job

func PbJobPtrToJob

func PbJobPtrToJob(pbJob *pb.Job) Job

Used for gRPC Protobuf conversion.

func PbJobsPtrToJobs

func PbJobsPtrToJobs(pbJs *pb.Jobs) []Job

Used for gRPC Protobuf conversion.

func StateLoad

func StateLoad(state []byte) (Job, error)

Deserialize Bytes and convert to Job

func (*Job) FullName

func (j *Job) FullName() string

func (*Job) LastRunTimeWithTimezone

func (j *Job) LastRunTimeWithTimezone() time.Time

func (*Job) NextRunTimeWithTimezone

func (j *Job) NextRunTimeWithTimezone() time.Time

func (Job) String

func (j Job) String() string

type JobNotFoundError

// JobNotFoundError is a string-based error type (it has an Error method).
// The Store interface documents it as the error returned by GetJob when
// the requested job does not exist.
type JobNotFoundError string

func (JobNotFoundError) Error

func (e JobNotFoundError) Error() string

type JobSlice

// JobSlice is a slice of Job that provides the Len, Less and Swap methods
// of `sort.Interface`; it is sorted by `NextRunTime` in ascending order.
type JobSlice []Job

Implements `sort.Interface`; sorted by `NextRunTime` in ascending order.

func (JobSlice) Len

func (js JobSlice) Len() int

func (JobSlice) Less

func (js JobSlice) Less(i, j int) bool

func (JobSlice) Swap

func (js JobSlice) Swap(i, j int)

type JobTimeoutError

// JobTimeoutError is a struct-based error type; its Error method has a
// pointer receiver, so it is used as *JobTimeoutError.
// NOTE(review): presumably reported when a job exceeds its `Timeout` — confirm.
type JobTimeoutError struct {
	// Presumably the job's full name (see Job.FullName) — TODO confirm.
	FullName string
	// Presumably the job's configured timeout — TODO confirm.
	Timeout  string
	// Presumably the underlying error — TODO confirm.
	Err      error
}

func (*JobTimeoutError) Error

func (e *JobTimeoutError) Error() string

type Node

// Node is the argument and reply type of the ClusterNode RPC methods
// (RPCPing, RPCRegister). Its string fields have the same names as the
// exported address and queue fields of ClusterNode.
type Node struct {
	Id                string
	MainEndpoint      string
	Endpoint          string
	EndpointHTTP      string
	SchedulerEndpoint string
	Queue             string
	// Same type as the value returned by ClusterNode.NodeMap.
	NodeMap           map[string]map[string]map[string]any
}

type Scheduler

// Scheduler runs jobs on a regular basis in standalone mode; in cluster mode
// it is also responsible for allocating jobs to cluster nodes.
type Scheduler struct {
	// Optional e-mail settings.
	// NOTE(review): how they are used is not visible here — confirm.
	EmailConfig        *EmailConfig
	// Optional HTTP callback settings.
	// NOTE(review): how they are used is not visible here — confirm.
	HTTPCallbackConfig *HTTPCallbackConfig
	// contains filtered or unexported fields
}

In standalone mode, the scheduler only needs to run jobs on a regular basis. In cluster mode, the scheduler also needs to be responsible for allocating jobs to cluster nodes.

func (*Scheduler) AddJob

func (s *Scheduler) AddJob(j Job) (Job, error)

func (*Scheduler) DeleteAllJobs

func (s *Scheduler) DeleteAllJobs() error

func (*Scheduler) DeleteJob

func (s *Scheduler) DeleteJob(id string) error

func (*Scheduler) GetAllJobs

func (s *Scheduler) GetAllJobs() ([]Job, error)

func (*Scheduler) GetJob

func (s *Scheduler) GetJob(id string) (Job, error)

func (*Scheduler) PauseJob

func (s *Scheduler) PauseJob(id string) (Job, error)

func (*Scheduler) ResumeJob

func (s *Scheduler) ResumeJob(id string) (Job, error)

func (*Scheduler) RunJob

func (s *Scheduler) RunJob(j Job) error

func (*Scheduler) ScheduleJob

func (s *Scheduler) ScheduleJob(j Job) error

Used in cluster mode. Select a worker node

func (*Scheduler) SetClusterNode

func (s *Scheduler) SetClusterNode(ctx context.Context, cn *ClusterNode) error

Bind the cluster node

func (*Scheduler) SetStore

func (s *Scheduler) SetStore(sto Store) error

Bind the store

func (*Scheduler) Start

func (s *Scheduler) Start()

In addition to being called manually, it is also called after `AddJob`.

func (*Scheduler) Stop

func (s *Scheduler) Stop()

In addition to being called manually, it is also called when there is no job in the store.

func (*Scheduler) UpdateJob

func (s *Scheduler) UpdateJob(j Job) (Job, error)

type Store

// Store defines the interface that each job store must implement.
type Store interface {
	// Initialization function for each store,
	// called when the scheduler runs `SetStore`.
	Init() error

	// Add job to this store.
	AddJob(j Job) error

	// Get the job from this store.
	//  @return error `JobNotFoundError` if there is no such job.
	GetJob(id string) (Job, error)

	// Get all jobs from this store.
	GetAllJobs() ([]Job, error)

	// Update job in store with a newer version.
	UpdateJob(j Job) error

	// Delete the job from this store.
	DeleteJob(id string) error

	// Delete all jobs from this store.
	DeleteAllJobs() error

	// Get the earliest next run time of all the jobs stored in this store,
	// or `time.Time{}` if there are no jobs.
	// Used to set the wakeup interval for the scheduler.
	GetNextRunTime() (time.Time, error)

	// Clear all resources bound to this store.
	Clear() error
}

Defines the interface that each store must implement.

Directories

Path Synopsis
rpc

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL