xmachinery

package module
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 23, 2021 License: Apache-2.0 Imports: 14 Imported by: 0

README

xmachinery

介绍

xmachinery 是github.com/RichardKnop/machinery 的扩展包,添加定时任务管理API,扩展支持广播任务

核心代码

定时任务管理
func (server *XServer) registerScheduledTask(task ScheduledTask) error {
	//检查cron表达式
	schedule, err := secondsParser.Parse(task.Spec)
	if err != nil {
		return err
	}
	//移除旧任务
	server.removeScheduledTask(task.Id)
	//包装任务函数
	f := func() {
		//抢占任务锁
		err := server.machineryLock.LockWithRetries(utils.GetLockName(task.TaskCode, task.Spec), schedule.Next(time.Now()).UnixNano()-1)
		if err != nil {
			return
		}
		//创建任务签名
		signature := task.Signature()
		//发送任务
		_, err = server.SendTask(signature)
		if err != nil {
			log.ERROR.Printf("scheduled task failed. task id is: %s. task name is: %s. error is %s", task.Id, task.TaskCode, err.Error())
		}
	}
	//添加定时任务
	entryId, err := server.scheduler.AddFunc(task.Spec, f)
	newTask := NewScheduledTask(task.Id, task.TaskCode, task.Spec, task.TaskQueue, task.Args...)
	newTask.entryId = entryId
	//存储定时任务信息
	server.registeredScheduledTasks[task.Id] = newTask
	return err
}
广播任务broker
func (b *BrokerBroadcast) nextBroadCastTask(queue string) (result []byte, err error) {
	//默认拉取消息间隔为1000ms
	pollPeriodMilliseconds := 1000
	if b.GetConfig().Redis != nil {
		configuredPollPeriod := b.GetConfig().Redis.NormalTasksPollPeriod
		if configuredPollPeriod > 0 {
			pollPeriodMilliseconds = configuredPollPeriod
		}
	}
	pollPeriod := time.Duration(pollPeriodMilliseconds) * time.Millisecond

	if b.lastBroadcastMsgId == "" {
		//若客户端记录的最新消息id为空,则从流中最新的消息id
		msgs, err := b.rclient.XRevRangeN(context.Background(), queue, "+", "-", 1).Result()
		if err != nil {
			return []byte{}, err
		}
		if len(msgs) == 0 {
			b.lastBroadcastMsgId = "0"
			return []byte{}, redis.Nil
		}
		b.lastBroadcastMsgId = msgs[0].ID
	}

	//消费广播消息,一次一条
	streams, err := b.rclient.XRead(context.Background(), &redis.XReadArgs{
		Streams: []string{queue, b.lastBroadcastMsgId},
		Count:   1,
		Block:   pollPeriod,
	}).Result()
	if err != nil {
		return []byte{}, err
	}

	if len(streams) == 0 || len(streams[0].Messages) == 0 {
		return []byte{}, redis.Nil
	}
	msg := streams[0].Messages[0]
	b.lastBroadcastMsgId = msg.ID
	signatureV := msg.Values[defaultStreamBroadcastMsgKey]
	signatureStr, ok := signatureV.(string)
	if ok {
		return []byte(signatureStr), nil
	}

	return []byte{}, fmt.Errorf("not support msg type")
}
func (b *BrokerBroadcast) StartConsuming(consumerTag string, concurrency int, taskProcessor iface.TaskProcessor) (bool, error) {
	...

	// 监听广播任务的协程
	// 任务到达时直接投递给消费者
	b.broadcastWG.Add(1)
	go func() {
		defer b.broadcastWG.Done()

		for {
			select {
			// 监听消费者是否结束
			case <-b.GetStopChan():
				return
			default:
				task, _ := b.nextBroadCastTask(b.redisBroadcastTasksKey)
				if len(task) > 0 {
					deliveries <- task
				}
			}
		}
	}()

	...
}

示例

定时任务管理api
package main

import (
	"fmt"
	"gitee.com/sqxwww/xmachinery"
	"github.com/RichardKnop/machinery/v2"
	redisbackend "github.com/RichardKnop/machinery/v2/backends/redis"
	redisbroker "github.com/RichardKnop/machinery/v2/brokers/redis"
	"github.com/RichardKnop/machinery/v2/config"
	eagerlock "github.com/RichardKnop/machinery/v2/locks/eager"
	"github.com/RichardKnop/machinery/v2/tasks"
)

var server *xmachinery.XServer

func init() {
	server, _ = startServer()
}

func main() {
	server.RegisterScheduledTask(&xmachinery.ScheduledTask{
		Id:       "countDoneScheduler",
		TaskCode: "countDown",
		Spec:     "0/2 * * * * ?",
		Args:     []tasks.Arg{{Type: "int", Value: 5}},
	})
	worker()
}

func countDown(count int) error {
	if count <= 0 {
		fmt.Println("removing countDoneScheduler")
		//移除定时任务
		server.RemoveScheduledTask("countDoneScheduler")
		return nil
	}
	fmt.Println("current count is ", count)
	count--
	//替换定时任务
	server.RegisterScheduledTask(&xmachinery.ScheduledTask{
		Id:       "countDoneScheduler",
		TaskCode: "countDown",
		Spec:     "0/2 * * * * ?",
		Args:     []tasks.Arg{{Type: "int", Value: count}},
	})
	return nil
}

func startServer() (*xmachinery.XServer, error) {
	cnf := &config.Config{
		DefaultQueue:    "machinery_tasks",
		ResultsExpireIn: 3600,
		Redis: &config.RedisConfig{
			MaxIdle:                3,
			IdleTimeout:            240,
			ReadTimeout:            15,
			WriteTimeout:           15,
			ConnectTimeout:         15,
			NormalTasksPollPeriod:  1000,
			DelayedTasksPollPeriod: 500,
		},
	}

	broker := redisbroker.New(cnf, "localhost:6379", "", "", 0)
	backend := redisbackend.New(cnf, "localhost:6379", "", "", 0)
	lock := eagerlock.New()
	tmp := machinery.NewServer(cnf, broker, backend, lock)
	server := xmachinery.NewServer(tmp)

	tasks := map[string]interface{}{
		"countDown": countDown,
	}

	return server, server.RegisterTasks(tasks)
}

func worker() error {
	consumerTag := "machinery_worker"
	worker := server.NewWorker(consumerTag, 0)
	return worker.Launch()
}

广播任务broker
package main

import (
	"fmt"
	broadcastbroker "gitee.com/sqxwww/xmachinery/brokers/redis"
	"github.com/RichardKnop/machinery/v2"
	redisbackend "github.com/RichardKnop/machinery/v2/backends/redis"
	"github.com/RichardKnop/machinery/v2/config"
	eagerlock "github.com/RichardKnop/machinery/v2/locks/eager"
	"github.com/RichardKnop/machinery/v2/tasks"
	"sync"
	"time"
)

var server *machinery.Server

func init() {
	server, _ = startServer()
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			worker()
		}()
	}
	time.Sleep(time.Second)
	singature, _ := tasks.NewSignature("hello", nil)
	//设置广播任务头
	singature.Headers = map[string]interface{}{"broadcastTask": ""}
	server.SendTask(singature)
	wg.Wait()
}

func startServer() (*machinery.Server, error) {
	cnf := &config.Config{
		DefaultQueue:    "machinery_tasks",
		ResultsExpireIn: 3600,
		Redis: &config.RedisConfig{
			MaxIdle:                3,
			IdleTimeout:            240,
			ReadTimeout:            15,
			WriteTimeout:           15,
			ConnectTimeout:         15,
			NormalTasksPollPeriod:  1000,
			DelayedTasksPollPeriod: 500,
		},
	}
	//使用支持广播的broker
	broker := broadcastbroker.New(cnf, []string{"localhost:6379"}, 0)
	backend := redisbackend.New(cnf, "localhost:6379", "", "", 0)
	lock := eagerlock.New()
	server := machinery.NewServer(cnf, broker, backend, lock)

	tasks := map[string]interface{}{
		"hello": func() error {
			fmt.Println("hello broadcast broker")
			return nil
		},
	}

	return server, server.RegisterTasks(tasks)
}

func worker() error {
	consumerTag := "machinery_worker"
	worker := server.NewWorker(consumerTag, 0)
	errorsChan := make(chan error)

	worker.LaunchAsync(errorsChan)

	return <-errorsChan
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ScheduledTask

type ScheduledTask struct {
	Id        string                 //定时任务id
	TaskCode  string                 //machinery任务编码
	Spec      string                 //cron表达式
	TaskQueue string                 //任务队列(空为默认)
	Headers   map[string]interface{} //任务签名头
	Args      []tasks.Arg            //任务参数
	// contains filtered or unexported fields
}

func NewScheduledTask

func NewScheduledTask(id, taskCode, spec, taskQueue string, args ...tasks.Arg) *ScheduledTask

func (*ScheduledTask) Equal

func (task *ScheduledTask) Equal(other *ScheduledTask) bool

func (*ScheduledTask) Signature

func (task *ScheduledTask) Signature() *tasks.Signature

type ServerRef

type ServerRef struct {
	// contains filtered or unexported fields
}

type XServer

type XServer struct {
	sync.Mutex
	*machinery.Server
	// contains filtered or unexported fields
}

func NewServer

func NewServer(server *machinery.Server) *XServer

通过原生server构建xserver

func (*XServer) RegisterScheduledTask

func (server *XServer) RegisterScheduledTask(task *ScheduledTask) error

注册定时任务

func (*XServer) RegisterScheduledTasks

func (server *XServer) RegisterScheduledTasks(tasks []*ScheduledTask) error

批量注册定时任务

func (*XServer) ReloadScheduledTasks

func (server *XServer) ReloadScheduledTasks(tasks []*ScheduledTask) error

重载所有定时任务

func (*XServer) RemoveScheduledTask

func (server *XServer) RemoveScheduledTask(id string)

移除定时任务

Directories

Path Synopsis
brokers
redis
* 用于关联github.com/RichardKnop/machinery/v2/brokers/redis包中的函数和方法
* 用于关联github.com/RichardKnop/machinery/v2/brokers/redis包中的函数和方法
examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL