goworker

package module
v1.1.1 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 29, 2022 License: Apache-2.0 Imports: 20 Imported by: 0

README

worker

worker是一个后台常驻服务库,就像创建一个操作系统服务。

适用场景:

  • 长时间后台处理任务
  • mq数据消费服务
  • 定时计划
  • 异步任务处理

待支持

  • 脚本任务(linux脚本)
  • 失效转移(当某个实例在执行任务中宕机时,将重新由其它实例执行)
  • 支持后台动态添加任务,支持Cron任务暂停,支持手动停止正在执行的任务(有条件)
  • 任务依赖,多任务关联执行
  • 监控

支持

  • 服务集群,任务分片
  • cron
  • 出错重试
  • 自定义重试策略
  • 自定义logger配置

Example

Getting Started

  1. 定义worker类,实现接口:Handler(在当前包的API中对应 BaseHandler)
// Handler is the interface a worker type implements.
// Its lifecycle methods run in the order Before -> Do -> After.
type Handler interface {
    // Before is the first step of the execution cycle.
    Before(ctx context.Context) error
    // Do is the second step; it runs after Before.
    Do(ctx context.Context) error
    // After is the final step; it runs after Do.
    After(ctx context.Context) error
    // ErrorMessage spans the handler's whole execution cycle and receives
    // the errors that occur while it is running.
    ErrorMessage(ctx context.Context, err error)
}

执行顺序:
Before -> Do -> After
ErrorMessage贯穿handler的执行全周期,负责接收执行期间出现的error

  2. 开始使用(示例)
package main

import (
	"context"
	"fmt"
	"github.com/lufred/goworker"
	"github.com/lufred/goworker/config"
)

// main wires a MyWorker into a goworker.Worker built from the default
// configuration, starts it, and then waits on it. Any error returned by
// Run or Wait is printed to standard output.
func main() {
	worker := goworker.NewWorker(
		context.Background(),
		config.DefaultWorkerConfig(),
		&MyWorker{},
	)

	// A failed start is reported and ends the program before waiting.
	runErr := worker.Run()
	if runErr != nil {
		fmt.Println(runErr)
		return
	}

	waitErr := worker.Wait()
	if waitErr != nil {
		fmt.Println(waitErr.Error())
	}
}

// MyWorker is a minimal example handler: every lifecycle method only prints
// a progress message to standard output.
type MyWorker struct{}

// Before prints the initialization message and always returns nil.
func (w *MyWorker) Before(ctx context.Context) error {
	fmt.Println("任务初始化")
	return nil
}

// Do prints the "task running" message and always returns nil.
func (w *MyWorker) Do(ctx context.Context) error {
	fmt.Println("任务执行中:helloWorld")
	return nil
}

// After prints the completion message and always returns nil.
func (w *MyWorker) After(ctx context.Context) error {
	fmt.Println("任务处理完成")
	return nil
}

// ErrorMessage prints err on its own line, prefixed with "err:" and
// formatted with the %+v verb.
func (w *MyWorker) ErrorMessage(ctx context.Context, err error) {
	// Format and write in a single call rather than wrapping Sprintf in
	// Println; the explicit "\n" keeps the line terminator.
	fmt.Printf("err:%+v\n", err)
}

Documentation

Index

Constants

View Source
const (
	HandleRunLabelRun   = true
	HandleRunLabelUnRun = false
)
View Source
const NoCronBackoff = time.Duration(-1)

NoCronBackoff 不存在cron重试

View Source
const (
	NoCronExpress = ""
)
View Source
const RetryDone = time.Duration(-1)

Variables

This section is empty.

Functions

func FanIn

func FanIn(ctx context.Context, channels ...<-chan error) <-chan error

func GetBackoffForNextSchedule

func GetBackoffForNextSchedule(cronSchedule string) (next time.Duration)

GetBackoffForNextSchedule 获取下一次任务执行时间

func ResolveClusterParameter added in v1.1.0

func ResolveClusterParameter(params string) map[int]string

func SetDiscovery added in v1.1.0

func SetDiscovery(protocol string, creator func(*config.WorkerDiscoveryConfig) (WorkerDiscovery, error))

func ValidateHandler

func ValidateHandler(handler interface{}) error

func ValidateSchedule

func ValidateSchedule(cronSchedule string) error

ValidateSchedule 校验cron表达式格式是否准确

Types

type BaseHandler added in v1.1.0

type BaseHandler interface {
	Before(ctx context.Context) error
	Do(ctx context.Context) error
	After(ctx context.Context) error
	ErrorMessage(ctx context.Context, err error)
}

BaseHandler 任务处理程序接口

type DefaultRetryPolicy

type DefaultRetryPolicy struct {
	// contains filtered or unexported fields
}

func GetDefaultRetryPolicy

func GetDefaultRetryPolicy() *DefaultRetryPolicy

func (*DefaultRetryPolicy) ComputeNextDelay

func (d *DefaultRetryPolicy) ComputeNextDelay(numAttempts int) time.Duration

func (*DefaultRetryPolicy) SetBackoffCoefficient

func (d *DefaultRetryPolicy) SetBackoffCoefficient(b float64) *DefaultRetryPolicy

func (*DefaultRetryPolicy) SetInitialInterval

func (d *DefaultRetryPolicy) SetInitialInterval(t time.Duration) *DefaultRetryPolicy

func (*DefaultRetryPolicy) SetMaximumAttempts

func (d *DefaultRetryPolicy) SetMaximumAttempts(ma int) *DefaultRetryPolicy

func (*DefaultRetryPolicy) SetMaximumInterval

func (d *DefaultRetryPolicy) SetMaximumInterval(t time.Duration) *DefaultRetryPolicy

type DefaultWorkerInstance added in v1.1.0

// DefaultWorkerInstance holds the data describing one worker instance.
// Every field carries a JSON tag that gives its serialized key.
type DefaultWorkerInstance struct {
	Id           string       `json:"id"`
	ServiceName  string       `json:"serviceName"`
	CreatedAt    int64        `json:"createdAt"`
	ShardIndex   int          `json:"shardIndex"`
	HealthStatus HealthStatus `json:"healthStatus"`
	Expiration   int64        `json:"expiration"` // Seconds; service expiration time. Currently only used in redis cluster mode.
}

func (*DefaultWorkerInstance) GetCreatedAt added in v1.1.0

func (d *DefaultWorkerInstance) GetCreatedAt() int64

func (*DefaultWorkerInstance) GetExpiration added in v1.1.0

func (d *DefaultWorkerInstance) GetExpiration() int64

func (*DefaultWorkerInstance) GetHealthStatus added in v1.1.0

func (d *DefaultWorkerInstance) GetHealthStatus() HealthStatus

func (*DefaultWorkerInstance) GetId added in v1.1.0

func (d *DefaultWorkerInstance) GetId() string

func (*DefaultWorkerInstance) GetServiceName added in v1.1.0

func (d *DefaultWorkerInstance) GetServiceName() string

func (*DefaultWorkerInstance) GetShardIndex added in v1.1.0

func (d *DefaultWorkerInstance) GetShardIndex() int

func (*DefaultWorkerInstance) UpdateExpiration added in v1.1.0

func (d *DefaultWorkerInstance) UpdateExpiration(exp int64)

func (*DefaultWorkerInstance) UpdateHealthStatus added in v1.1.0

func (d *DefaultWorkerInstance) UpdateHealthStatus(hs HealthStatus)

func (*DefaultWorkerInstance) UpdateShardIndex added in v1.1.0

func (d *DefaultWorkerInstance) UpdateShardIndex(i int)

type DefaultWorkerInstanceSlice added in v1.1.0

type DefaultWorkerInstanceSlice []DefaultWorkerInstance

func (DefaultWorkerInstanceSlice) Len added in v1.1.0

func (DefaultWorkerInstanceSlice) Less added in v1.1.0

func (s DefaultWorkerInstanceSlice) Less(i, j int) bool

func (DefaultWorkerInstanceSlice) Swap added in v1.1.0

func (s DefaultWorkerInstanceSlice) Swap(i, j int)

type HealthStatus added in v1.1.0

type HealthStatus int
const (
	Green  HealthStatus = 0
	Yellow HealthStatus = 1
	Red    HealthStatus = 2
)

type Resolver added in v1.1.0

type Resolver struct {
	// contains filtered or unexported fields
}

func NewResolver added in v1.1.0

func NewResolver(worker *Worker, discovery WorkerDiscovery, shardingTotalCount int) *Resolver

func (*Resolver) Build added in v1.1.0

func (r *Resolver) Build() (err error)

func (*Resolver) Context added in v1.1.0

func (r *Resolver) Context() context.Context

func (*Resolver) Instance added in v1.1.0

func (r *Resolver) Instance() DefaultWorkerInstance

func (*Resolver) KeepaliveInterval added in v1.1.0

func (r *Resolver) KeepaliveInterval() time.Duration

func (*Resolver) UpdateShardIndex added in v1.1.0

func (r *Resolver) UpdateShardIndex(i int)

func (*Resolver) UpdateState added in v1.1.0

func (r *Resolver) UpdateState(s State)

func (*Resolver) WorkerRegister added in v1.1.0

func (r *Resolver) WorkerRegister() (err error)

func (*Resolver) WorkerUnregister added in v1.1.0

func (r *Resolver) WorkerUnregister()

type Retrier

type Retrier interface {
	NextBackOff() time.Duration
	TryAgain() bool
	Reset()
}

type RetryPolicy

type RetryPolicy interface {
	ComputeNextDelay(numAttempts int) time.Duration
}

type State added in v1.1.0

type State struct {
	Instances []DefaultWorkerInstance
}

type Stater

type Stater interface {
	Handle()
	NextStage()
}

type WatchHandler added in v1.1.0

type WatchHandler interface {
	BaseHandler
	Watch(info WatchInfo)
}

type WatchInfo added in v1.1.0

type WatchInfo struct {
	CurrentInstance  DefaultWorkerInstance
	Param            string
	ClusterInstances []DefaultWorkerInstance
}

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker 任务核心类

func NewWorker

func NewWorker(ctx context.Context, cfg config.WorkerConfig, hd BaseHandler, opt ...WorkerOption) *Worker

NewWorker 获取任务类

func (*Worker) Cancel

func (w *Worker) Cancel()

Cancel 取消任务

func (*Worker) ChangeState

func (w *Worker) ChangeState(state Stater)

func (*Worker) GetInstance added in v1.1.0

func (w *Worker) GetInstance() DefaultWorkerInstance

GetInstance 获取当前worker的实例信息

func (*Worker) Run

func (w *Worker) Run() error

Run 启动服务

func (*Worker) UpdateInstance added in v1.1.0

func (w *Worker) UpdateInstance(ins DefaultWorkerInstance)

UpdateInstance 更新worker的实例信息

func (*Worker) UpdateShardIndex added in v1.1.0

func (w *Worker) UpdateShardIndex(index int)

UpdateShardIndex 更新worker实例的分片索引

func (*Worker) Validate

func (w *Worker) Validate() error

func (*Worker) Wait

func (w *Worker) Wait() error

Wait 阻塞当前调用,直到发生以下情况之一: 1.上下文超时或被取消 2.任务执行完成

type WorkerConn added in v1.1.0

type WorkerConn interface {
	UpdateState(state State)

	UpdateShardIndex(index int)

	Context() context.Context

	KeepaliveInterval() time.Duration

	Instance() DefaultWorkerInstance
}

type WorkerDiscovery added in v1.1.0

type WorkerDiscovery interface {
	Register(instance DefaultWorkerInstance, w WorkerConn, shardingTotalCount int) error

	Reset(instance DefaultWorkerInstance, w WorkerConn, shardingTotalCount int) error

	Unregister(instance DefaultWorkerInstance) error

	Build(w WorkerConn) error

	Keepalive(w WorkerConn) (HealthStatus, error)
}

func GetDiscovery added in v1.1.0

func GetDiscovery(wdc *config.WorkerDiscoveryConfig) (WorkerDiscovery, error)

type WorkerModel added in v1.1.0

type WorkerModel int
const (
	Local   WorkerModel = 1
	Cluster WorkerModel = 2
)

type WorkerOption

type WorkerOption interface {
	// contains filtered or unexported methods
}

func WithCronSchedule

func WithCronSchedule(cron string) WorkerOption

WithCronSchedule cron表达式

func WithModel added in v1.1.0

func WithModel(model WorkerModel) WorkerOption

WithModel worker执行模式

func WithParams added in v1.1.0

func WithParams(params string) WorkerOption

WithParams handle执行时可传递的参数

func WithRetryPolicy

func WithRetryPolicy(rp RetryPolicy) WorkerOption

WithRetryPolicy 自定义重试规则

func WithServiceName added in v1.1.0

func WithServiceName(sn string) WorkerOption

WithServiceName worker服务名称

func WithShardingTotalCount added in v1.1.0

func WithShardingTotalCount(count int) WorkerOption

WithShardingTotalCount worker分片数

type WorkerState

// WorkerState identifies the stage a worker is in; the stages are the
// constants declared below, numbered from zero via iota.
type WorkerState int
const (
	// Initialized means worker initialization has completed.
	Initialized WorkerState = iota
	// Pending means the worker has been created but has not yet started processing the handler.
	Pending
	// Running means the worker is bound to a process and is currently processing.
	Running
	// Shutdown means the worker has ended.
	Shutdown
)

type WorkerStateInitialized

type WorkerStateInitialized struct {
	// contains filtered or unexported fields
}

func (*WorkerStateInitialized) Handle

func (s *WorkerStateInitialized) Handle()

func (*WorkerStateInitialized) NextStage

func (s *WorkerStateInitialized) NextStage()

type WorkerStatePending

type WorkerStatePending struct {
	// contains filtered or unexported fields
}

func (*WorkerStatePending) Handle

func (s *WorkerStatePending) Handle()

func (*WorkerStatePending) NextStage

func (s *WorkerStatePending) NextStage()

type WorkerStateRunning

type WorkerStateRunning struct {
	// contains filtered or unexported fields
}

func (*WorkerStateRunning) Handle

func (s *WorkerStateRunning) Handle()

func (*WorkerStateRunning) NextStage

func (s *WorkerStateRunning) NextStage()

type WorkerStateShutdown

type WorkerStateShutdown struct {
	// contains filtered or unexported fields
}

func (*WorkerStateShutdown) Handle

func (s *WorkerStateShutdown) Handle()

func (*WorkerStateShutdown) NextStage

func (s *WorkerStateShutdown) NextStage()

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL