rmq

package module
v1.1.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 22, 2023 License: MIT Imports: 17 Imported by: 0

README

Rmq

概述

一个用 Go 编写、基于 Redis 的消息队列

Import

go get github.com/lpong/rmq

https://github.com/lpong/rmq 欢迎提交 Issues 和 Pull Requests

创建Task

// Test1 is the simplest kind of task: a single function that receives the
// context and the message and returns a result string and an error.
// This example does nothing and returns the zero values.
func Test1(ctx context.Context, msg *rmq.Message) (result string, err error) {
    return "", nil
}

// TestTask is an example task type that implements rmq.Task.
// Its exported fields are the task payload and carry JSON tags.
type TestTask struct {
    Name string `json:"name"`
    Val  int    `json:"val"`
}
// TaskName returns the fixed task name "TestTask".
func (t *TestTask) TaskName() string {
    return "TestTask"
}

// Run is the task body. This example does no work and always returns
// the result "ok" with a nil error.
func (t *TestTask) Run(ctx context.Context) (result string, err error) {
    return "ok", nil
}

// 自带的Task
rmq.CommandTask: 执行一个系统命令
rmq.HttpTask: 执行一个http请求

// TaskScanner lets a task define how its data is parsed from raw bytes.
// json.Unmarshal() is used by default.
type TaskScanner interface {
    Scan(src []byte) error
}

// TaskValuer lets a task define how its data is serialized to bytes.
// json.Marshal() is used by default.
type TaskValuer interface {
    Value() ([]byte, error)
}

// OnLoad declares the method that is executed when the task is loaded.
// Load receives the context and the message and may return an error.
type OnLoad interface {
    Load(ctx context.Context, msg *Message) error
}

// OnSuccess declares the method that is executed when the task succeeds.
type OnSuccess interface {
    OnSuccess(ctx context.Context)
}

// OnFail declares the method that is executed when the task fails.
type OnFail interface {
    OnFail(ctx context.Context)
}

// OnComplete declares the method that is called when execution completes.
type OnComplete interface {
    OnComplete(ctx context.Context)
}

创建消息

// Build a message with rmq.NewMsg() and attach a TestTask to it.
// SetTask returns the message together with an error.
msg, err := rmq.NewMsg().SetTask(&TestTask{
    Name: "name-1",
    Val:  1,
})

// Customize a message.
msg := rmq.NewMsg() // OR msg := rmq.NewBlankMsg()
msg.SetMeta(rmq.RetryMeta).SetDelay(3 * time.Minute).SetMaxRetry(1).SetTraceId("traceId").SetTimeout(30 * time.Second).SetExpire(30 * time.Second).SetExpiredAt(time.Now().Add(1 * time.Hour))

// The task this message should run: either a raw task name plus data ...
msg.SetRawTask("test1", map[string]any{"x": 1})
// ... or a typed task value.
msg.SetTask(&TestTask{
    Name: fmt.Sprintf("testTask-%d", i),
    Val:  i * i,
})

// Push takes a context as its first argument, followed by the messages.
queue.Push(ctx, msg)

创建队列

// Connect to Redis.
rdb := redis.NewClient(&redis.Options{
    Addr:     "localhost:6379",
    Password: "",
    DB:       0,
})

// Build the Redis broker and the queue with the default logger and broker
// config, then register the function task and the struct task.
log := rmq.DefaultLog
broker := rmq.NewRedisBroker(rdb, rmq.DefaultRedisBrokerConfig, log)
queue = rmq.NewRmq(broker, log)
queue.RegisterFunc("test1", Test1)
queue.Register(&TestTask{})

消费

// Start consuming messages.
queue.StartWorker(&rmq.WorkerConfig{
    WorkerNum:  2, // in multi-pod deployments 1 is recommended; mainly affects how fast messages are fetched
    Concurrent: 20, // number of tasks executed at the same time (goroutines running tasks concurrently)
})

Rmq Hook


// Optional push hook. It receives the messages about to be pushed and returns
// the messages plus an error.
// NOTE(review): the original comment said "returning false skips the push",
// but the hook has no boolean result — presumably a non-nil error is what
// prevents the push; confirm against the implementation.
queue.Hook.OnPush(func(ctx context.Context, msg ...*rmq.Message) ([]*rmq.Message, error) {
    return msg,nil
})

// Called when a task starts executing. Note: returning an error cancels the
// task's execution.
queue.Hook.OnRun(func(ctx context.Context, r *rmq.TaskRuntime) error {
    fmt.Println("任务开始:", runtime.NumGoroutine())
    return nil
})

// Called when a task has finished executing. This example prints the current
// goroutine count and the task runtime rendered by rmq.Json.
queue.Hook.OnComplete(func(ctx context.Context, r *rmq.TaskRuntime) error {
    fmt.Println("任务结束:", runtime.NumGoroutine())
    fmt.Println(rmq.Json(r))
    return nil
})

// Called at task start, when the task's Context is created. This example
// stores the message's trace id in the returned context under "x_trace_id".
queue.Hook.OnContext(func(ctx context.Context, r *rmq.TaskRuntime) context.Context {
    return context.WithValue(ctx, "x_trace_id", r.Msg.Meta.TraceId)
})

Task Hook

// OnSuccess is the task-level success hook; this example prints the task name.
func (t *TestTask) OnSuccess(ctx context.Context) {
    fmt.Println(t.Name, "Success hook")
}

// OnFail is the task-level failure hook; this example prints the task name.
func (t *TestTask) OnFail(ctx context.Context) {
    fmt.Println(t.Name, "Fail hook")
}

// OnComplete is the task-level completion hook; this example prints the task name.
func (t *TestTask) OnComplete(ctx context.Context) {
    fmt.Println(t.Name, "Complete hook")
}

Documentation

Index

Constants

This section is empty.

Variables

View Source
// DefaultLog is the package-provided default logger value, built once by newDefaultLog.
var DefaultLog = newDefaultLog()
View Source
// DefaultMeta is a Meta with no retry rule, zeroed retry counters, no delay,
// a Timeout of 30 (Meta documents Timeout in seconds) and an empty trace id.
var DefaultMeta = Meta{
	RetryRule: nil,
	Retry:     [2]int{0, 0},
	Delay:     0,
	Timeout:   30,
	TraceId:   "",
}
View Source
// DefaultRedisBrokerConfig is the stock RedisBrokerConfig: key "rmq:queue"
// with a 1-second wait, and delay key "rmq:queue:delay" with a 4-second wait.
var DefaultRedisBrokerConfig = RedisBrokerConfig{
	Key:               "rmq:queue",
	WaitDuration:      1 * time.Second,
	DelayKey:          "rmq:queue:delay",
	DelayWaitDuration: 4 * time.Second,
}
View Source
// DefaultRetryRule is the default list of retry delays. Values are in seconds
// (Meta.RetryRule is documented in seconds).
var DefaultRetryRule = []int{
	60,    // 1 minute
	120,   // 2 minutes
	300,   // 5 minutes
	900,   // 15 minutes
	3600,  // 1 hour
	10800, // 3 hours
}
View Source
// RetryMeta is a Meta preset with retries enabled: it uses DefaultRetryRule,
// sets Retry to {0, 7} (current execution count, total retries), has no delay
// and a Timeout of 30 (seconds).
// NOTE(review): DefaultRetryRule has 6 entries while the total here is 7 —
// confirm how the rule is indexed once the rule list is exhausted.
var RetryMeta = Meta{
	RetryRule: DefaultRetryRule,
	Retry:     [2]int{0, 7},
	Delay:     0,
	Timeout:   30,
	TraceId:   "",
}

Functions

func BrokerHookProtect

func BrokerHookProtect(ctx context.Context, f func(ctx context.Context) error) (err error)

func Json

func Json(v any) string

func Protect

func Protect(f func() error) (err error)

Types

type BatchError

type BatchError struct {
	// contains filtered or unexported fields
}

func (*BatchError) HasError

func (b *BatchError) HasError() bool

type Broker

// Broker is the storage backend used by the queue: it encodes and decodes
// messages to and from bytes, and pushes and pops them.
type Broker interface {
	// Encode serializes a message to bytes.
	Encode(*Message) ([]byte, error)
	// Decode parses bytes back into a message.
	Decode(bytes []byte) (msg *Message, err error)

	// Push writes one or more messages.
	Push(context.Context, ...*Message) error
	// Pop fetches a message.
	Pop(ctx context.Context) (msg *Message, err error)
}

type BrokerAfterExit

// BrokerAfterExit is an optional broker hook exposing AfterExit.
// NOTE(review): presumably invoked after the worker has exited — confirm.
type BrokerAfterExit interface {
	AfterExit(ctx context.Context) error
}

type BrokerAfterStart

// BrokerAfterStart is an optional broker hook exposing AfterStart.
// NOTE(review): presumably invoked after the worker has started — confirm.
type BrokerAfterStart interface {
	AfterStart(ctx context.Context) error
}

type BrokerBeforeExit

// BrokerBeforeExit is an optional broker hook exposing BeforeExit.
// NOTE(review): presumably invoked before the worker exits — confirm.
type BrokerBeforeExit interface {
	BeforeExit(ctx context.Context) error
}

type BrokerBeforeStart

// BrokerBeforeStart is an optional broker hook exposing BeforeStart.
// NOTE(review): presumably invoked before the worker starts — confirm.
type BrokerBeforeStart interface {
	BeforeStart(ctx context.Context) error
}

type Callback

// Callback is the function form of a task: it receives the context and the
// message and returns a result string and an error.
type Callback func(ctx context.Context, msg *Message) (string, error)

type CommandTask

// CommandTask is the built-in task that runs a system command.
// NOTE(review): presumably Shell is the interpreter and Command its
// arguments — confirm against Run.
type CommandTask struct {
	Shell   string
	Command []string
}

func NewCommandTask

func NewCommandTask(shell string, command ...string) *CommandTask

func (*CommandTask) Run

func (c *CommandTask) Run(ctx context.Context) (result string, err error)

func (*CommandTask) TaskName

func (c *CommandTask) TaskName() string

type Hook

type Hook struct {
	// contains filtered or unexported fields
}

func (*Hook) OnComplete

func (h *Hook) OnComplete(v func(ctx context.Context, runtime *TaskRuntime) error)

func (*Hook) OnContext

func (h *Hook) OnContext(v func(ctx context.Context, runtime *TaskRuntime) context.Context)

func (*Hook) OnPush

func (h *Hook) OnPush(v func(ctx context.Context, msg ...*Message) ([]*Message, error))

func (*Hook) OnRun

func (h *Hook) OnRun(v func(ctx context.Context, runtime *TaskRuntime) error)

type HttpTask

// HttpTask is the built-in task that performs an HTTP request.
type HttpTask struct {
	Url    string            `json:"url"`
	Method string            `json:"method"`
	Header map[string]string `json:"header,omitempty"`
	Body   json.RawMessage   `json:"body,omitempty"` // kept as raw JSON so the body stays readable once the message is JSON-serialized
	// contains filtered or unexported fields
}

func NewHttpTaskGet

func NewHttpTaskGet(format string, arg ...any) *HttpTask

func NewHttpTaskJsonPost

func NewHttpTaskJsonPost(url string, data any) *HttpTask

func NewHttpTaskPostPostForm

func NewHttpTaskPostPostForm(url string, data url.Values) *HttpTask

func (*HttpTask) Load

func (h *HttpTask) Load(ctx context.Context, msg *Message) (err error)

func (*HttpTask) Message

func (h *HttpTask) Message() (msg *Message, err error)

func (*HttpTask) Run

func (h *HttpTask) Run(ctx context.Context) (result string, err error)

func (*HttpTask) Scan

func (h *HttpTask) Scan(src []byte) (err error)

func (*HttpTask) SetBody

func (h *HttpTask) SetBody(data []byte) *HttpTask

func (*HttpTask) SetHeader

func (h *HttpTask) SetHeader(k, v string) *HttpTask

func (*HttpTask) SetHeaders

func (h *HttpTask) SetHeaders(headers map[string]string) *HttpTask

func (*HttpTask) SetMethod

func (h *HttpTask) SetMethod(method string) *HttpTask

func (*HttpTask) TaskName

func (h *HttpTask) TaskName() string

type Logger

// Logger is the logging interface accepted by the queue and the broker.
// Each method takes a printf-style format string and its arguments.
type Logger interface {
	Errorf(format string, args ...any)
	Warningf(format string, args ...any)
	Infof(format string, args ...any)
}

type Message

// Message is a single queue entry: an id, the task name, the raw JSON task
// data, scheduling timestamps and per-message Meta settings.
//
// NOTE(review): encoding/json's omitempty never omits a struct value, so the
// option on Meta has no effect.
type Message struct {
	Id        string          `json:"id"`
	Task      string          `json:"task"`
	Data      json.RawMessage `json:"data"`
	RunAt     Timestamp       `json:"run_at"`     // time the message should run
	ExpiredAt Timestamp       `json:"expired_at"` // expiry time
	CreatedAt Timestamp       `json:"created_at"` // creation time
	Meta      Meta            `json:"meta,omitempty"`
}

func NewBlankMsg

func NewBlankMsg() *Message

func NewMsg

func NewMsg() *Message

func (*Message) SetData

func (m *Message) SetData(data any) *Message

func (*Message) SetDelay

func (m *Message) SetDelay(delay time.Duration) *Message

func (*Message) SetExpire

func (m *Message) SetExpire(d time.Duration) *Message

func (*Message) SetExpiredAt

func (m *Message) SetExpiredAt(t time.Time) *Message

func (*Message) SetMaxRetry

func (m *Message) SetMaxRetry(retry int) *Message

func (*Message) SetMeta

func (m *Message) SetMeta(meta Meta) *Message

func (*Message) SetRawData

func (m *Message) SetRawData(data json.RawMessage) *Message

func (*Message) SetRawTask added in v1.1.3

func (m *Message) SetRawTask(name string, data any) (msg *Message, err error)

func (*Message) SetRetryRule

func (m *Message) SetRetryRule(rule []int) *Message

func (*Message) SetTask

func (m *Message) SetTask(task Task) (message *Message, err error)

func (*Message) SetTimeout

func (m *Message) SetTimeout(t time.Duration) *Message

func (*Message) SetTraceId

func (m *Message) SetTraceId(traceId string) *Message

func (*Message) String

func (m *Message) String() string

func (*Message) TryRetry

func (m *Message) TryRetry(delay time.Duration) *Message

type Messages

// Messages is a slice of message pointers.
type Messages []*Message

type Meta

// Meta carries per-message execution settings.
//
// NOTE(review): encoding/json's omitempty never omits a fixed-size array, so
// the option on Retry has no effect.
type Meta struct {
	RetryRule []int  `json:"retry_rule,omitempty"` // retry rule, in seconds
	Retry     [2]int `json:"retry,omitempty"`      // current execution count, total number of retries
	Delay     int    `json:"delay,omitempty"`      // delay time
	Timeout   int    `json:"timeout,omitempty"`    // timeout, in seconds
	TraceId   string `json:"trace_id,omitempty"`   // used to link up tracing
}

type OnComplete

// OnComplete declares the method that is called when execution completes.
type OnComplete interface {
	OnComplete(ctx context.Context)
}

OnComplete 执行完成时会调用

type OnFail

// OnFail declares the method that is executed when the task fails.
type OnFail interface {
	OnFail(ctx context.Context)
}

OnFail 执行失败时执行的方法

type OnLoad

// OnLoad declares the method that is executed when the task is loaded.
type OnLoad interface {
	Load(ctx context.Context, msg *Message) error
}

OnLoad 加载时执行的方法

type OnSuccess

// OnSuccess declares the method that is executed when the task succeeds.
type OnSuccess interface {
	OnSuccess(ctx context.Context)
}

OnSuccess 执行成功时执行的方法

type Process

// Process is the pluggable execution engine installed with Rmq.SetProcess.
// Exec receives the context and the task runtime and returns an error.
type Process interface {
	Exec(ctx context.Context, msg *TaskRuntime) (err error)
}

type Producer

// Producer declares a single Push method that takes only a context.
// NOTE(review): no implementation or caller is visible in this listing.
type Producer interface {
	Push(ctx context.Context)
}

type RedisBroker

type RedisBroker struct {
	// contains filtered or unexported fields
}

func NewRedisBroker

func NewRedisBroker(rd *redis.Client, c RedisBrokerConfig, log Logger) *RedisBroker

func (*RedisBroker) AfterExit

func (r *RedisBroker) AfterExit(ctx context.Context) error

func (*RedisBroker) AfterStart

func (r *RedisBroker) AfterStart(ctx context.Context) error

func (*RedisBroker) BeforeExit

func (r *RedisBroker) BeforeExit(ctx context.Context) error

func (*RedisBroker) BeforeStart

func (r *RedisBroker) BeforeStart(ctx context.Context) error

func (*RedisBroker) Decode

func (r *RedisBroker) Decode(bytes []byte) (msg *Message, err error)

func (*RedisBroker) Encode

func (r *RedisBroker) Encode(msg *Message) ([]byte, error)

func (*RedisBroker) Pop

func (r *RedisBroker) Pop(ctx context.Context) (msg *Message, err error)

Pop 获取message

func (*RedisBroker) Push

func (r *RedisBroker) Push(ctx context.Context, msg ...*Message) (err error)

Push 批量写入消息

type RedisBrokerConfig

// RedisBrokerConfig configures the Redis broker. Every field carries the same
// key under the json, toml and yaml struct tags.
type RedisBrokerConfig struct {
	// Key is the queue key (DefaultRedisBrokerConfig uses "rmq:queue").
	Key string `json:"key" toml:"key" yaml:"key"`
	// WaitDuration is the wait duration paired with Key.
	WaitDuration time.Duration `json:"wait_duration" toml:"wait_duration" yaml:"wait_duration"`

	// DelayKey is the delay-queue key (DefaultRedisBrokerConfig uses "rmq:queue:delay").
	DelayKey string `json:"delay_key" toml:"delay_key" yaml:"delay_key"`
	// DelayWaitDuration is the wait duration paired with DelayKey.
	// The tag previously repeated the json key and had no toml key.
	DelayWaitDuration time.Duration `json:"delay_wait_duration" toml:"delay_wait_duration" yaml:"delay_wait_duration"`
}

type Register

// Register embeds sync.RWMutex; its remaining fields are unexported.
// NOTE(review): presumably the task registry behind Register/RegisterFunc — confirm.
type Register struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func (*Register) AllTask

func (r *Register) AllTask() map[string]*TaskInfo

func (*Register) CreateTask

func (r *Register) CreateTask(name string) (task Task, v *TaskInfo, err error)

func (*Register) Register

func (r *Register) Register(v Task)

func (*Register) RegisterFunc

func (r *Register) RegisterFunc(name string, callback Callback)

type Rmq

// Rmq is the queue object created by NewRmq. Its only exported field is Hook.
type Rmq struct {

	// Hook holds the queue-level hooks.
	Hook *Hook
	// contains filtered or unexported fields
}

func NewRmq

func NewRmq(broker Broker, logger Logger) *Rmq

NewRmq 创建新队列

func (*Rmq) Exit

func (q *Rmq) Exit()

Exit 退出

func (*Rmq) Push

func (q *Rmq) Push(ctx context.Context, msg ...*Message) (err error)

Push 写入消息到队列

func (*Rmq) Register

func (q *Rmq) Register(task ...Task)

func (*Rmq) RegisterFunc

func (q *Rmq) RegisterFunc(name string, callback Callback)

func (*Rmq) SetBroker

func (q *Rmq) SetBroker(b Broker)

SetBroker 自定义 Broker

func (*Rmq) SetProcess

func (q *Rmq) SetProcess(p Process)

SetProcess 自定义处理引擎

func (*Rmq) StartWorker

func (q *Rmq) StartWorker(c *WorkerConfig)

StartWorker 线上是多容器的,不用多个协程并发跑,只要加 pod 就行。目前理论上存在丢失消息的可能,所以只能用于不重要的任务。

func (*Rmq) Tasks

func (q *Rmq) Tasks() map[string]*TaskInfo

func (*Rmq) TryRetry

func (q *Rmq) TryRetry(ctx context.Context, msg *Message) (err error)

TryRetry 尝试重试

func (*Rmq) TryRun

func (q *Rmq) TryRun(ctx context.Context, msg *Message)

TryRun 解析消息,执行

type Task

// Task is the interface for tasks that are instantiated automatically.
// A task reports its name and runs with a context, returning a result
// string and an error.
type Task interface {
	TaskName() string
	Run(ctx context.Context) (result string, err error)
}

Task 自动实例化的Task

type TaskInfo

// TaskInfo describes a task known to the registry (see Register.AllTask and
// Register.CreateTask, which return it).
// NOTE(review): presumably IsCallback selects between Callback (function
// tasks) and ReflectType (struct tasks) — confirm against CreateTask.
type TaskInfo struct {
	Name        string
	IsCallback  bool
	ReflectType reflect.Type
	Callback    Callback
}

type TaskName

// TaskName is the single-method interface for values that report a task name.
type TaskName interface {
	TaskName() string
}

type TaskRuntime

// TaskRuntime records one execution of a message: the message itself, start
// and end times, duration, errors and the result. It is the value passed to
// the OnRun, OnComplete and OnContext hooks.
type TaskRuntime struct {
	Msg       *Message      `json:"msg"`
	StartTime time.Time     `json:"start_time"`
	EndTime   time.Time     `json:"end_time"`
	Duration  time.Duration `json:"duration"`
	TaskError error         `json:"task_error,omitempty"` // error from executing the task
	Error     error         `json:"error,omitempty"`      // the final error
	Result    string        `json:"result,omitempty"`     // the result
}

func (*TaskRuntime) IsSuccess

func (a *TaskRuntime) IsSuccess() bool

type TaskScanner

// TaskScanner lets a task define how its data is parsed from raw bytes.
type TaskScanner interface {
	Scan(src []byte) error
}

type TaskValuer

// TaskValuer lets a task define how its data is serialized to bytes.
type TaskValuer interface {
	Value() ([]byte, error)
}

type Timestamp

// Timestamp is an int64-based time value used for the Message time fields.
// NOTE(review): the unit (seconds vs. milliseconds) is not visible here — confirm.
type Timestamp int64

func NewTimestamp

func NewTimestamp(t time.Time) Timestamp

func Now

func Now() Timestamp

func (Timestamp) Add

func (t Timestamp) Add(d time.Duration) Timestamp

func (Timestamp) Date

func (t Timestamp) Date() string

func (Timestamp) DateTime

func (t Timestamp) DateTime() string

func (Timestamp) Format

func (t Timestamp) Format(layout string) string

func (Timestamp) Time

func (t Timestamp) Time() time.Time

type WorkerConfig

// WorkerConfig holds the settings passed to Rmq.StartWorker.
type WorkerConfig struct {
	// Configuration values.
	// NOTE(review): per the README, WorkerNum mainly affects how fast messages
	// are fetched and Concurrent is the number of tasks run at the same time.
	WorkerNum  int
	Concurrent int
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL