go_worker

package module
v1.1.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 29, 2020 License: Apache-2.0 Imports: 18 Imported by: 0

README

codingXiang/go-worker

Introduction

go-worker is a distributed task system for Go that implements a master-worker architecture. The project offers the following features:

  • highly available architecture
  • task scheduling with cron expressions
  • dynamic task assignment
  • simple RESTful API
  • command-line interface

Architecture

Single Master

Multiple Master

install

go get github.com/codingXiang/go-worker

Usage

master
package main

import (
	"github.com/codingXiang/go-worker"
	"github.com/coreos/etcd/clientv3"
	"github.com/gocraft/work"
	"github.com/gomodule/redigo/redis"
	"time"
)

// main starts an example master node: it builds a Redis connection pool,
// creates a clustered master configured with etcd endpoints, registers a
// cron-scheduled "send_email" task and then starts that task.
func main() {
	// make a redis connection pool
	redisPool := &redis.Pool{
		MaxActive: 5,
		MaxIdle:   5,
		Wait:      true,
		Dial: func() (redis.Conn, error) {
			return redis.Dial("tcp", "127.0.0.1:6379", redis.DialPassword("a12345"))
		},
	}
	// create single master node
	//master := go_worker.NewMaster(redisPool, "demo", &go_worker.MasterOption{
	//	IsCluster: false,
	//})

	// create multiple master node with options
	master := go_worker.NewMaster(redisPool, "demo", &go_worker.MasterOption{
		IsCluster: true,
		ETCDConfig: clientv3.Config{
			Endpoints:   []string{"localhost:32773", "localhost:32769", "localhost:32772"},
			DialTimeout: 5 * time.Second,
		},
	})
	// initialize master settings
	master.Init()

	// add task with cron spec; AddTask takes a *TaskInfo and returns
	// (*TaskInfo, error), so the task is described with a struct rather than
	// positional arguments.
	// NOTE(review): TaskInfo also has Active and TimeZone fields — confirm
	// whether they must be set for the task to be scheduled.
	task, err := master.AddTask(&go_worker.TaskInfo{
		Spec:    "*/3 * * * * *",
		JobName: "send_email",
		Args:    work.Q{"address": "test@example.com", "subject": "hello world", "customer_id": 4},
	})
	if err != nil {
		panic(err)
	}

	// exec task by id
	// NOTE(review): assumes AddTask fills in ID on the returned TaskInfo — confirm.
	if err := master.ExecTask(task.ID); err != nil {
		panic(err)
	}

	// block forever so the master keeps running
	select {}
}

worker
package main

import (
    go_worker "github.com/codingXiang/go-worker"
    "github.com/gocraft/work"
    "github.com/gomodule/redigo/redis"
    "log"
)

// CustomJob is the example job type; main registers an instance of it with
// the worker under the name "send_email".
// NOTE(review): none of these fields are read by Do in this example —
// presumably they are placeholders for job configuration; confirm.
type CustomJob struct {
    customerID int64
    Pool       *work.WorkerPool
    Cloud      string `json:"cloud"`
    Area       string `json:"area"`
    Namespace  string `json:"namespace"`
    Spec       string `json:"spec"`
}

// Do is the `Do(job *work.Job) error` method that every custom job must
// implement. This example only logs the name of the received job and
// reports success.
func (c *CustomJob) Do(job *work.Job) error {
	name := job.Name
	log.Println(name)
	return nil
}


// main starts an example worker: it builds a Redis connection pool, creates
// a worker with concurrency 10 in the "demo" namespace, registers CustomJob
// under the name "send_email" and starts processing.
func main() {
	// dial opens one authenticated connection to the local Redis server.
	dial := func() (redis.Conn, error) {
		return redis.Dial("tcp", ":6379", redis.DialPassword("a12345"))
	}

	// connection pool shared by the worker
	pool := &redis.Pool{MaxActive: 5, MaxIdle: 5, Wait: true, Dial: dial}

	// worker instance
	w := go_worker.NewWorker(10, "demo", pool)

	// register the job with its name, instance and (nil) options
	job := &CustomJob{}
	w.AddJob("send_email", job, nil)

	// start worker, then block forever so the process keeps running
	w.Start()
	select {}
}


Dependencies

Documentation

Index

Constants

View Source
// String constants exported by the package.
// NOTE(review): their usage is not visible here — IDENTITY, NAMESPACE,
// JOB_NAME, STATUS and HISTORY look like record field names, UPDATE matches
// MongoDB's "$set" update operator, and the STATUS_* values look like task
// lifecycle states; confirm against the package source.
const (
	IDENTITY          = "identity"
	NAMESPACE         = "namespace"
	JOB_NAME          = "jobName"
	STATUS            = "status"
	UPDATE            = "$set"
	STATUS_SCHEDULING = "scheduling"
	STATUS_REMOVE     = "remove"
	STATUS_PENDING    = "pending"
	STATUS_RUNNING    = "running"
	STATUS_COMPLETE   = "complete"
	STATUS_FAILED     = "failed"
	HISTORY           = "history"
)
View Source
// MASTER, TASK and LOCK are short category names.
// NOTE(review): presumably path segments passed to BuildKeyPath when
// composing etcd keys — confirm against the package source.
const (
	MASTER = "master"
	TASK   = "task"
	LOCK   = "lock"
)
View Source
// Now is the literal string "now".
// NOTE(review): presumably a special schedule value meaning "run
// immediately" instead of a cron expression — confirm.
const (
	Now = "now"
)

Variables

This section is empty.

Functions

func BuildKeyPath

func BuildKeyPath(basePath string, paths ...string) string

BuildKeyPath 組合 etcd key 的路徑

Types

type ETCDAuth added in v0.0.26

// ETCDAuth groups a list of etcd endpoints with a username and password.
type ETCDAuth struct {
	Endpoints []string
	Username  string
	Password  string
}

type EnqueueEntity

// EnqueueEntity is the enqueue instance: it holds a job's ID, cron spec,
// cron entry ID, job name and arguments, together with the runtime handles
// it works with.
type EnqueueEntity struct {
	ID       string                 `json:"id"`
	// Location, Cron and Engine are excluded from JSON by their `json:"-"` tags.
	Location *time.Location         `json:"-"`
	Cron     *cronV3.Cron           `json:"-"`
	Engine   *work.Enqueuer         `json:"-"`
	// The remaining fields are serialized under capitalized keys, unlike ID.
	Spec     string                 `json:"Spec"`
	EntryID  cronV3.EntryID         `json:"EntryID"`
	JobName  string                 `json:"JobName"`
	Args     map[string]interface{} `json:"Args"`
}

EnqueueEntity 實例

func NewEnqueue

func NewEnqueue(Engine *work.Enqueuer, location *time.Location, Spec, JobName string, Args map[string]interface{}) *EnqueueEntity

NewEnqueue 建立一個新的 Enqueue instance

func (*EnqueueEntity) AddArgs

func (g *EnqueueEntity) AddArgs(key string, value interface{}) *EnqueueEntity

func (*EnqueueEntity) Do

func (g *EnqueueEntity) Do() (*work.Job, error)

func (*EnqueueEntity) GetArgs

func (g *EnqueueEntity) GetArgs() map[string]interface{}

func (*EnqueueEntity) GetEntryID

func (g *EnqueueEntity) GetEntryID() cronV3.EntryID

func (*EnqueueEntity) GetID

func (g *EnqueueEntity) GetID() string

func (*EnqueueEntity) GetInstance

func (g *EnqueueEntity) GetInstance() *EnqueueEntity

func (*EnqueueEntity) GetJobName

func (g *EnqueueEntity) GetJobName() string

func (*EnqueueEntity) GetSpec

func (g *EnqueueEntity) GetSpec() string

func (*EnqueueEntity) RemoveArgs

func (g *EnqueueEntity) RemoveArgs(key string) *EnqueueEntity

func (*EnqueueEntity) Run

func (g *EnqueueEntity) Run()

Run 排程專用

func (*EnqueueEntity) SetEntryID

func (g *EnqueueEntity) SetEntryID(id cronV3.EntryID) *EnqueueEntity

func (*EnqueueEntity) SetJobName

func (g *EnqueueEntity) SetJobName(JobName string) *EnqueueEntity

func (*EnqueueEntity) SetSpec

func (g *EnqueueEntity) SetSpec(Spec string) *EnqueueEntity

type Job

// Job is the interface a job handler must satisfy to be passed to
// Worker.AddJob.
type Job interface {
	// Do receives the *work.Job and returns an error.
	Do(job *work.Job) error
}

type Master

// Master is the interface returned by NewMaster and NewMasterCluster.
type Master interface {
	// Init initializes the master and returns a Master.
	Init() Master
	GetID() string
	// AddTask adds a task described by info.
	AddTask(info *TaskInfo) (*TaskInfo, error)
	GetEnqueues() map[string]*EnqueueEntity
	GetEnqueue(id string) (*EnqueueEntity, error)
	// GetWorkerHeartbeats returns the slice of worker heartbeats.
	GetWorkerHeartbeats() ([]*work.WorkerPoolHeartbeat, error)
	GetBusyWorkers() ([]*work.WorkerObservation, error)
	GetQueues() ([]*work.Queue, error)
	// ExecTask starts the schedule for the task with the given id.
	ExecTask(id string) error
	// NOTE(review): presumably onChange is invoked on task record changes and
	// onDelete when the record is removed — confirm against the implementation.
	WaitTask(id string, onChange func(data *mongo.RawData) (bool, error), onDelete func()) error
	// RemoveTask removes the schedule for the task with the given id.
	RemoveTask(id string) error
	RemoveTaskRecord(id string) error
}

func NewMaster

func NewMaster(pool *redis.Pool, namespace string, option *MasterOption) Master

NewMaster 建立 Master 實例

func NewMasterCluster

func NewMasterCluster(base *MasterEntity, option *MasterOption) Master

NewMasterCluster 建立集群版本 Master Instance

type MasterClusterEntity

// MasterClusterEntity embeds *MasterEntity; NewMasterCluster is documented
// as creating this cluster version of the master instance.
type MasterClusterEntity struct {
	*MasterEntity
	// contains filtered or unexported fields
}

func (*MasterClusterEntity) AddTask

func (g *MasterClusterEntity) AddTask(info *TaskInfo) (*TaskInfo, error)

AddTask 新增任務

func (*MasterClusterEntity) ExecTask added in v0.0.5

func (g *MasterClusterEntity) ExecTask(id string) error

func (*MasterClusterEntity) Init

func (g *MasterClusterEntity) Init() Master

Init 初始化

func (*MasterClusterEntity) RemoveTask

func (g *MasterClusterEntity) RemoveTask(id string) error

RemoveTask 移除任務

func (*MasterClusterEntity) RemoveTaskRecord added in v1.0.24

func (g *MasterClusterEntity) RemoveTaskRecord(id string) error

func (*MasterClusterEntity) WaitTask added in v1.0.32

func (g *MasterClusterEntity) WaitTask(id string, onChange func(data *mongo.RawData) (bool, error), onDelete func()) error

func (*MasterClusterEntity) WatchMaster

func (g *MasterClusterEntity) WatchMaster()

WatchMaster 集群監聽 Master

func (*MasterClusterEntity) WatchTask

func (g *MasterClusterEntity) WatchTask()

WatchTask 集群監聽任務

type MasterEntity

// MasterEntity is the master instance; it is the type embedded by
// MasterClusterEntity.
type MasterEntity struct {
	// contains filtered or unexported fields
}

Master 實例

func (*MasterEntity) AddTask

func (g *MasterEntity) AddTask(info *TaskInfo) (*TaskInfo, error)

AddTask 加入任務

func (*MasterEntity) ExecTask

func (g *MasterEntity) ExecTask(id string) error

啟動排程

func (*MasterEntity) GetBusyWorkers

func (g *MasterEntity) GetBusyWorkers() ([]*work.WorkerObservation, error)

func (*MasterEntity) GetEnqueue

func (g *MasterEntity) GetEnqueue(id string) (*EnqueueEntity, error)

func (*MasterEntity) GetEnqueues

func (g *MasterEntity) GetEnqueues() map[string]*EnqueueEntity

func (*MasterEntity) GetID

func (g *MasterEntity) GetID() string

func (*MasterEntity) GetQueues

func (g *MasterEntity) GetQueues() ([]*work.Queue, error)

func (*MasterEntity) GetWorkerHeartbeats

func (g *MasterEntity) GetWorkerHeartbeats() ([]*work.WorkerPoolHeartbeat, error)

GetWorkerHeartbeats 取得 worker heartbeats 陣列

func (*MasterEntity) Init

func (g *MasterEntity) Init() Master

func (*MasterEntity) RemoveTask

func (g *MasterEntity) RemoveTask(id string) error

移除排程

func (*MasterEntity) RemoveTaskRecord added in v1.0.24

func (g *MasterEntity) RemoveTaskRecord(id string) error

func (*MasterEntity) WaitTask added in v1.0.32

func (g *MasterEntity) WaitTask(id string, onChange func(data *mongo.RawData) (bool, error), onDelete func()) error

type MasterOption

// MasterOption carries the options accepted by NewMaster and
// NewMasterCluster.
type MasterOption struct {
	// IsCluster is false in the README's single-master example and true in
	// its multiple-master example.
	IsCluster   bool
	// ETCDConfig is the etcd client configuration, set in the multiple-master example.
	ETCDConfig  clientv3.Config
	// NOTE(review): presumably the MongoDB client used for task records — confirm.
	MongoClient *mongo.Client
	// NOTE(review): presumably the base path handed to BuildKeyPath — confirm.
	BasePath    string
}

type TaskInfo

// TaskInfo describes a task; Master.AddTask both accepts and returns a
// *TaskInfo.
type TaskInfo struct {
	MasterID         string `json:"masterId"`
	ID               string `json:"id"`
	TimeZone         string `json:"timezone"`
	Spec             string `json:"spec"`
	JobName          string `json:"jobName"`
	Active           bool   `json:"active"`
	// DisableTimeRange is an optional (pointer) start/end time window.
	// NOTE(review): presumably the task is not run inside this window — confirm.
	DisableTimeRange *struct {
		Start time.Time `json:"start"`
		End   time.Time `json:"end"`
	} `json:"disableTimeRange"`
	Args map[string]interface{} `json:"args"`
}

type Worker

// Worker is the interface returned by NewWorker. Every method returns a
// Worker.
type Worker interface {
	// AddJob defines a job on the worker from a name, a Job instance and
	// options; the README example passes nil options.
	AddJob(name string, job Job, option *work.JobOptions) Worker
	// Start starts the worker.
	Start() Worker
	Stop() Worker
}

func NewWorker

func NewWorker(concurrency uint, namespace string, pool *redis.Pool) Worker

type WorkerEntity

// WorkerEntity is the concrete type whose AddJob, Start and Stop methods
// match the Worker interface.
type WorkerEntity struct {
	// contains filtered or unexported fields
}

func (*WorkerEntity) AddJob

func (g *WorkerEntity) AddJob(name string, job Job, option *work.JobOptions) Worker

func (*WorkerEntity) Start

func (g *WorkerEntity) Start() Worker

func (*WorkerEntity) Stop

func (g *WorkerEntity) Stop() Worker

Directories

Path Synopsis
example
util

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL