event

v0.6.0
Published: Apr 15, 2018 License: MIT Imports: 14 Imported by: 0

README

wppurking/event

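wppurking/event is an event-message processing framework backed by RabbitMQ. It covers part of what a background-job system does, but its core goal is to use RabbitMQ's own features as much as possible to process JSON-formatted messages. The code is forked from gocraft/work and is designed to interoperate with Hutch and Hutch-schedule in the Ruby ecosystem.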

Tips

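Features a background-job system needs
  1. A backend message format, with distributed task processing through a stable backend middleware
  2. Delayed jobs
  3. Unique jobs
  4. Cron jobs
  5. Global concurrency control, plus per-job concurrency control beneath the global limit
  6. Retries for failed jobs
  7. After too many retries, the ability to inspect dead jobs or manually re-enqueue them
  8. A UI for inspecting internal execution state
  9. Job priorities
Features covered by Event
  1. Distributed task processing through RabbitMQ with a purely custom JSON-format message
  2. Delayed event-message processing, handled by RabbitMQ (fixed delay levels)
  3. Global concurrency control, plus per-Consumer concurrency control beneath the global limit
  4. Retries when an event message fails to process; RabbitMQ keeps track of the retry information
  5. Event messages that fail too many times go to a Dead queue (not RabbitMQ's DLX, just a plain Dead queue)
  6. Task processing status and throughput visible in RabbitMQ's UI panel
  7. Compatible with Hutch-schedule by default, so Ruby and Go services can pass messages to each other and each process them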
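Existing open-source background-job projects

The following background-job and message-processing frameworks are listed for comparison; pick one according to your needs.

| Product | Backend | Description |
|---|---|---|
| gocraft/work | Redis | Runs in Go; standalone and full-featured |
| go-workers | Redis | Runs in Go; compatible with the Sidekiq message format |
| iamduo/workq | Own server | Runs in Go; standalone background-job system |
| gocelery | Redis, RabbitMQ | Runs in Go; compatible with the Celery message format |
| hutch | RabbitMQ | Runs in Ruby; plain JSON messages |
| wppurking/event | RabbitMQ | Runs in Go; plain JSON messages |
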
How to use

For a complete usage example, see the eventfakedata example.

Special Features

Contexts

Just like in gocraft/web, gocraft/work lets you use your own contexts. Your context can be empty or it can have various fields in it. The fields can be whatever you want - it's your type! When a new job is processed by a worker, we'll allocate an instance of this struct and pass it to your middleware and handlers. This allows you to pass information from one middleware function to the next, and onto your handlers.

Custom contexts aren't really needed for trivial example applications, but are very important for production apps. For instance, one field in your context can be your tagged logger. Your tagged logger augments your log statements with a job-id. This lets you filter your logs by that job-id.

Scheduled Jobs

You can schedule jobs to be executed in the future. To do so, create a Publisher and call its PublishIn method. The delay is handled by RabbitMQ using fixed delay levels (see DelaySecondsLevel):

publisher := event.NewPublisher("my_app_namespace", cony.URL(*rabbitMqURL))
secondsInTheFuture := int64(300) // PublishIn takes the delay as an int64
_, err := publisher.PublishIn("app.prod.send_welcome_email", secondsInTheFuture, event.Q{"address": "test@example.com"})
if err != nil {
	log.Fatal(err)
}
Unique Jobs

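This package is designed as an event-message processing framework rather than a background-job system, and RabbitMQ cannot guarantee message uniqueness in a clustered environment. Therefore:

  1. If you need unique messages, handle that on the application side, for example by recording state in a DB.
  2. Make event-message handling idempotent, so that processing the same message several times yields the same result.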
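Point 2 can be sketched as follows. The sketch assumes each message carries a hypothetical unique `ID` and uses an in-memory map as a stand-in for the DB table from point 1. A redelivered message is recognized and skipped, so handling it twice has the same effect as handling it once.

```go
package main

import (
	"fmt"
	"sync"
)

// Message is a stand-in; ID is a hypothetical unique message id.
type Message struct {
	ID     string
	Amount int
}

// Store stands in for a DB table that records processed message ids.
type Store struct {
	mu        sync.Mutex
	processed map[string]bool
	Balance   int
}

func NewStore() *Store { return &Store{processed: map[string]bool{}} }

// Apply credits the amount once per message id; duplicates are no-ops.
// With a real DB, the check and the write belong in one transaction.
func (s *Store) Apply(m *Message) (applied bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.processed[m.ID] {
		return false
	}
	s.processed[m.ID] = true
	s.Balance += m.Amount
	return true
}

func main() {
	s := NewStore()
	m := &Message{ID: "evt-1", Amount: 10}
	s.Apply(m)
	s.Apply(m) // redelivery: ignored
	fmt.Println(s.Balance) // 10
}
```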

Design and concepts

Enqueueing jobs
  • When jobs are enqueued, they're serialized with JSON and added to a simple Redis list with LPUSH.
  • Jobs are added to a list with the same name as the job. Each job name gets its own queue. Whereas with other job systems you have to design which jobs go on which queues, there's no need for that here.
Processing a job
  • To process a job, a worker will execute a Lua script to atomically move a job from its queue to an in-progress queue.
    • A job is dequeued and moved to in-progress if the job queue is not paused and the number of active jobs does not exceed the concurrency limit for the job type.
  • The worker will then run the job and increment the job lock. The job will either finish successfully or result in an error or panic.
    • If the process completely crashes, the reaper will eventually find it in its in-progress queue and requeue it.
  • If the job is successful, we'll simply remove the job from the in-progress queue.
  • If the job returns an error or panic, we'll see how many retries a job has left. If it doesn't have any, we'll move it to the dead queue. If it has retries left, we'll consume a retry and add the job to the retry queue.
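The outcome rules above can be written as a small decision function. This is a sketch, not the package's code. It assumes `fails` counts failures including the current one, and it maps the rules onto the `MaxFails` and `SkipDead` fields of `ConsumerOptions`. A panicking handler is recovered and treated as an error.

```go
package main

import "fmt"

// Action is what happens to a message after its handler returns.
type Action int

const (
	Ack   Action = iota // success: remove the message
	Retry               // failure with retries left: schedule a retry
	Dead                // retries exhausted: move to the dead queue
	Drop                // retries exhausted and SkipDead set: discard
)

func (a Action) String() string {
	return [...]string{"ack", "retry", "dead", "drop"}[a]
}

// run calls h and converts a panic into an error.
func run(h func() error) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("panic: %v", r)
		}
	}()
	return h()
}

// decide applies the rules; fails includes the current failure (assumption).
func decide(handlerErr error, fails, maxFails uint, skipDead bool) Action {
	switch {
	case handlerErr == nil:
		return Ack
	case fails < maxFails:
		return Retry
	case skipDead:
		return Drop
	default:
		return Dead
	}
}

func main() {
	err := run(func() error { panic("boom") })
	fmt.Println(err, decide(err, 1, 4, false)) // panic: boom retry
	fmt.Println(decide(err, 4, 4, false))      // dead
	fmt.Println(decide(err, 4, 4, true))       // drop
}
```

With `maxFails` set to 1, the first failure goes straight to `Dead`. This matches the `MaxFails` comment in `ConsumerOptions`.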
Workers and WorkerPools
  • WorkerPools provide the public API of gocraft/work.
    • You can attach jobs and middleware to them.
    • You can start and stop them.
    • Based on their concurrency setting, they'll spin up N worker goroutines.
  • Each worker is run in a goroutine. It will get a job from redis, run it, get the next job, etc.
    • Each worker is independent. They are not dispatched work -- they get their own work.
Retry job, scheduled jobs, and the requeuer
  • In addition to the normal list-based queues that normal jobs live in, there are two other types of queues: the retry queue and the scheduled job queue.
  • Both of these are implemented as Redis z-sets. The score is the unix timestamp when the job should be run. The value is the bytes of the job.
  • The requeuer will occasionally look for jobs in these queues that should be run now. If they should be, they'll be atomically moved to the normal list-based queue and eventually processed.
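Dead jobs
  • When a Message has been retried more than a certain number of times, it is delivered to the Dead Queue.
  • The Dead Queue is a RabbitMQ queue with both a maximum length and a TTL; reaching either limit causes messages to be cleared from it.
  • To inspect or re-publish dead messages, use the RabbitMQ management UI.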

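Consumer-level concurrency control

  • You can control concurrency with ConsumerOptions{MaxConcurrency: <num>}; also keep RabbitMQ's Prefetch in mind.
  • Unlike the WorkerPool's concurrency, this limits how many goroutines may process this consumer's messages at the same time (never more than the WorkerPool's total).
  • The state used to enforce this limit is recorded per Consumer in the corresponding consumerType.run.
  • The default MaxConcurrency is 0, meaning the consumer has no limit.
  • Note: if you want a consumer to be "single-threaded", set MaxConcurrency to 1:
      workerPool.ConsumerWithOptions(routingKey, ConsumerOptions{Prefetch: 30, MaxConcurrency: 1}, (*Context).WorkFxn)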
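A per-consumer cap beneath a global pool can be sketched with a buffered-channel semaphore. This is a deliberately different mechanism from the package's `consumerType.run` counter, chosen for brevity. `limiter` and `observe` are hypothetical helpers. A limit of 0 means unlimited, matching the default described above.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// limiter caps in-flight handlers for one consumer; max 0 means unlimited.
type limiter struct{ sem chan struct{} }

func newLimiter(max uint) *limiter {
	if max == 0 {
		return &limiter{}
	}
	return &limiter{sem: make(chan struct{}, max)}
}

func (l *limiter) run(fn func()) {
	if l.sem != nil {
		l.sem <- struct{}{}        // acquire a slot; blocks while max are in flight
		defer func() { <-l.sem }() // release it when fn returns
	}
	fn()
}

// observe runs jobs on poolSize goroutines through one limiter and reports
// the peak number of handlers that were running at the same time.
func observe(poolSize, jobs int, max uint) int64 {
	l := newLimiter(max)
	var cur, peak int64
	var wg sync.WaitGroup
	work := make(chan int)
	for i := 0; i < poolSize; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for range work {
				l.run(func() {
					n := atomic.AddInt64(&cur, 1)
					for { // record a new peak without a lock
						p := atomic.LoadInt64(&peak)
						if n <= p || atomic.CompareAndSwapInt64(&peak, p, n) {
							break
						}
					}
					atomic.AddInt64(&cur, -1)
				})
			}
		}()
	}
	for i := 0; i < jobs; i++ {
		work <- i
	}
	close(work)
	wg.Wait()
	return peak
}

func main() {
	fmt.Println(observe(10, 1000, 1) == 1) // true: a "single-threaded" consumer
}
```

With RabbitMQ, Prefetch separately bounds how many unacknowledged messages the broker delivers. A Prefetch much larger than MaxConcurrency leaves messages waiting in the client instead of on the broker.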

Benchmarks

The benches folder contains various benchmark code. In each case, we enqueue 100k jobs across 5 queues. The jobs are almost no-op jobs: they simply increment an atomic counter. We then measure the rate of change of the counter to obtain our measurement.

| Library | Speed |
|---|---|
| gocraft/work/wppurking/event | 20944 jobs/s |
| jrallison/go-workers | 19945 jobs/s |
| benmanns/goworker | 10328.5 jobs/s |
| albrow/jobs | 40 jobs/s |

Authors

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func DelaySecondsLevel added in v0.6.0

func DelaySecondsLevel(seconds int) int

DelaySecondsLevel returns the concrete delay level for the given number of seconds. The available levels are 5s, 10s, 20s, 30s, 60s, 120s, 180s, 240s, 300s, 360s, 420s, 480s, 540s, 600s, 1200s, 1800s, 2400s, 3600s, 7200s and 10800s.

Types

type BackoffCalculator added in v0.6.0

type BackoffCalculator func(msg *Message) int64

You may provide your own backoff function for retrying failed jobs or use the builtin one. Returns the number of seconds to wait until the next attempt.

The builtin backoff calculator provides an exponentially increasing wait function.
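A custom calculator only has to match the `func(msg *Message) int64` shape. The sketch redeclares a stand-in `Message` that exposes `Fails()` so it compiles on its own. `exponentialBackoff` is a hypothetical helper: it doubles a base delay per failure and caps the result. It is not the package's builtin algorithm, and this section does not document how the returned seconds interact with the fixed delay levels.

```go
package main

import "fmt"

// Message is a stand-in exposing Fails() like event.Message.
type Message struct{ fails int64 }

func (m *Message) Fails() int64 { return m.fails }

// BackoffCalculator has the same shape as the package's type.
type BackoffCalculator func(msg *Message) int64

// exponentialBackoff returns base * 2^fails seconds, capped at max.
func exponentialBackoff(base, max int64) BackoffCalculator {
	return func(msg *Message) int64 {
		d := base
		for i := int64(0); i < msg.Fails(); i++ {
			if d >= max {
				break // stop doubling once the cap is reached
			}
			d *= 2
		}
		if d > max {
			d = max
		}
		return d
	}
}

func main() {
	b := exponentialBackoff(10, 600)
	var out []int64
	for f := int64(0); f <= 7; f++ {
		out = append(out, b(&Message{fails: f}))
	}
	fmt.Println(out) // [10 20 40 80 160 320 600 600]
}
```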

type ConsumerOptions added in v0.6.0

type ConsumerOptions struct {
	QueueName      string            // If set, consume messages from this specific queue
	Prefetch       int               // Prefetch count for the queue
	Priority       uint              // Priority from 1 to 10000
	MaxFails       uint              // 1: send straight to dead (unless SkipDead)
	SkipDead       bool              // If true, don't send failed jobs to the dead queue when retries are exhausted.
	MaxConcurrency uint              // Max number of jobs to keep in flight (default is 0, meaning no max)
	Backoff        BackoffCalculator // If not set, uses the default backoff algorithm
}

ConsumerOptions can be passed to ConsumerWithOptions.

type GenericHandler

type GenericHandler func(*Message) error

GenericHandler is a job handler without any custom context.

type GenericMiddlewareHandler

type GenericMiddlewareHandler func(*Message, NextMiddlewareFunc) error

GenericMiddlewareHandler is a middleware without any custom context.

type Message added in v0.6.0

type Message struct {
	*amqp.Delivery `json:"-"`

	// Inputs when making a new job
	Name string `json:"-"`
	// contains filtered or unexported fields
}

Message represents a job.

func (*Message) Ack added in v0.6.0

func (j *Message) Ack() bool

func (*Message) Fails added in v0.6.0

func (j *Message) Fails() int64

Fails returns the retry count recorded in the x-dead header.

func (*Message) Nack added in v0.6.0

func (j *Message) Nack() bool

func (*Message) Reject added in v0.6.0

func (j *Message) Reject() bool

type NextMiddlewareFunc

type NextMiddlewareFunc func() error

NextMiddlewareFunc is a function type (whose instances are named 'next') that you call to advance to the next middleware.

type Publisher added in v0.6.0

type Publisher struct {
	Namespace string // eg, "myapp-work"
	// contains filtered or unexported fields
}

Publisher can enqueue jobs.

func NewPublisher added in v0.6.0

func NewPublisher(namespace string, opts ...cony.ClientOpt) *Publisher

func NewPublisherWithExchange added in v0.6.0

func NewPublisherWithExchange(namespace, exchangeName string, opts ...cony.ClientOpt) *Publisher

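NewPublisher creates a new Publisher for the given namespace; opts configure the underlying cony client (for example cony.URL). NewPublisherWithExchange does the same but additionally takes an exchange name.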

func (*Publisher) Publish added in v0.6.0

func (e *Publisher) Publish(routingKey string, msg interface{}) (*Message, error)

Publish publishes msg with the given routing key. msg can be nil if no arguments are needed. Example: e.Publish("send_email", event.Q{"addr": "test@example.com"})

func (*Publisher) PublishIn added in v0.6.0

func (e *Publisher) PublishIn(routingKey string, secondsFromNow int64, msg map[string]interface{}) (*Message, error)

PublishIn enqueues a job in the scheduled job queue for execution in secondsFromNow seconds.

func (*Publisher) PublishInMessage added in v0.6.0

func (e *Publisher) PublishInMessage(job *Message, secondsFromNow int64) error

PublishInMessage publishes a delayed job.

func (*Publisher) PublishMessage added in v0.6.0

func (e *Publisher) PublishMessage(msg *Message) error

PublishMessage publishes a job.

type Q

type Q map[string]interface{}

Q is a shortcut for specifying message arguments when publishing. Example: e.Publish("send_email", event.Q{"addr": "test@example.com", "track": true})

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

WorkerPool represents a pool of workers and forms the primary public API of this package, as it does in gocraft/work. You can attach jobs and middleware to it and start and stop it. Based on its concurrency setting, it spins up N worker goroutines.

func NewWorkerPool

func NewWorkerPool(ctx interface{}, concurrency uint, namespace string, puber *Publisher, opts ...cony.ClientOpt) *WorkerPool

func NewWorkerPoolWithExchangeName added in v0.6.0

func NewWorkerPoolWithExchangeName(ctx interface{}, concurrency uint, namespace, exchangeName string, puber *Publisher, opts ...cony.ClientOpt) *WorkerPool

NewWorkerPool creates a new worker pool. ctx should be a struct literal whose type will be used for middleware and handlers. concurrency specifies how many workers to spin up; each worker can process jobs concurrently. The consumer client and the publisher (puber) are expected to use two separate connections, to avoid contention under concurrency.

func (*WorkerPool) Consumer added in v0.6.0

func (wp *WorkerPool) Consumer(name string, fn interface{}) *WorkerPool

Consumer registers the job name to the specified handler fn. For instance, when workers pull jobs from the name queue, they'll be processed by the specified handler function. fn can take one of these forms: (*ContextType).func(*Message) error, where ContextType matches the type of ctx specified when creating a pool; or func(*Message) error, for the generic handler format. The routing key is used as the consumerType key.

func (*WorkerPool) ConsumerWithOptions added in v0.6.0

func (wp *WorkerPool) ConsumerWithOptions(routingKey string, consumerOpts ConsumerOptions, fn interface{}) *WorkerPool

ConsumerWithOptions adds a handler for routingKey jobs as Consumer does, but lets you specify additional options such as a job's priority, retry count, and whether to send dead jobs to the dead job queue or trash them. The routing key is case-insensitive. TODO: allow consumers to be added dynamically, so that new consumers can be registered at runtime.

func (*WorkerPool) Drain

func (wp *WorkerPool) Drain()

Drain drains all jobs in the queue before returning. Note that if jobs are added faster than they can be processed, this function won't return. TODO: this method is not yet correct and needs adjustment.

func (*WorkerPool) Middleware

func (wp *WorkerPool) Middleware(fn interface{}) *WorkerPool

Middleware appends the specified function to the middleware chain. The fn can take one of these forms: (*ContextType).func(*Message, NextMiddlewareFunc) error, (ContextType matches the type of ctx specified when creating a pool) func(*Message, NextMiddlewareFunc) error, for the generic middleware format.

func (*WorkerPool) Start

func (wp *WorkerPool) Start()

Start starts the workers and associated processes.

func (*WorkerPool) Stop

func (wp *WorkerPool) Stop()

Stop stops the workers and associated processes.

Directories

Path Synopsis
benches
cmd
