machinery

Version: v1.2.6
Published: Mar 9, 2024 License: MIT Imports: 38 Imported by: 0

README

Machinery

Machinery is a third-party, open-source asynchronous task queue based on distributed message passing, similar to Python's Celery.

Features

  • Task retry mechanism
  • Delayed task support
  • Task callback mechanism
  • Task result recording
  • Workflow patterns: Chain, Group, Chord
  • Multiple brokers: Redis, AMQP, AWS SQS
  • Multiple backends: Redis, Memcache, AMQP, MongoDB

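Architecture

A task queue is, in short, a scaled-up producer-consumer model: user requests generate tasks, producers keep inserting those tasks into the queue, and the queue's handler programs act as consumers that keep consuming them.

  • Server: the main business entry point. All task orchestration can be done through the methods the server exposes; for simple use cases, understanding it is enough.
  • Broker: storage-layer interface. It puts tasks into the queue and takes them out, and it controls task concurrency; delays are also handled at this layer.
  • Backend: storage-layer interface, mainly used to update and retrieve task execution results and states.
  • Worker: processing-layer structure that drives the Server, Broker, and Backend to fetch tasks, execute them, and handle their execution state and results.
  • Task: processing layer comprising the Task, Signature, Group, Chain, and Chord structures, which mainly implement the task orchestration logic.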
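The producer-consumer model described above can be sketched with the standard library alone. This is a minimal illustration, not the package's implementation: the `task` and `resultStore` types and the `process` function are hypothetical names, a channel stands in for the broker's queue, and a mutex-guarded map stands in for the backend.

```go
package main

import (
	"fmt"
	"sync"
)

// task is a hypothetical unit of work; it is not a type from this package.
type task struct {
	ID  int
	Arg int
}

// resultStore plays the role of a backend: it records task results.
type resultStore struct {
	mu      sync.Mutex
	results map[int]int
}

func (s *resultStore) save(id, value int) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.results[id] = value
}

// process starts the given number of consumers, which drain the queue,
// square each argument, and store the result. It returns once every task
// has been handled.
func process(tasks []task, workers int) map[int]int {
	queue := make(chan task) // stands in for the broker's queue
	store := &resultStore{results: make(map[int]int)}

	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for t := range queue {
				store.save(t.ID, t.Arg*t.Arg)
			}
		}()
	}

	// Producer: enqueue every task, then close the queue to stop the workers.
	for _, t := range tasks {
		queue <- t
	}
	close(queue)
	wg.Wait()
	return store.results
}

func main() {
	res := process([]task{{ID: 1, Arg: 2}, {ID: 2, Arg: 3}, {ID: 3, Arg: 4}}, 2)
	fmt.Println(res[1], res[2], res[3]) // 4 9 16
}
```

In the real system the queue and the result store live in external services such as Redis, so producers and workers can run in separate processes.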

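Task orchestration

Machinery provides three ways to orchestrate tasks:

  • Groups: runs a set of tasks asynchronously; the tasks do not affect one another.
  • Chords: first runs a group of tasks; once all of them have finished, the final callback is invoked.
  • Chains: runs a set of tasks synchronously and in order; the output of one task can serve as the input of the next.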
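The three orchestration modes can be illustrated in-process with goroutines. The sketch below only models the semantics; `step`, `runGroup`, `runChord`, and `runChain` are hypothetical names, and nothing here goes through a broker.

```go
package main

import (
	"fmt"
	"sync"
)

// step is a hypothetical task: it takes an int and returns an int.
type step func(int) int

// runGroup runs every task concurrently with the same input and collects
// the results in task order. The tasks do not depend on each other.
func runGroup(in int, tasks ...step) []int {
	out := make([]int, len(tasks))
	var wg sync.WaitGroup
	for i, t := range tasks {
		wg.Add(1)
		go func(i int, t step) {
			defer wg.Done()
			out[i] = t(in)
		}(i, t)
	}
	wg.Wait()
	return out
}

// runChord runs a group, waits for all of it to finish, then calls the
// callback with the collected results.
func runChord(in int, callback func([]int) int, tasks ...step) int {
	return callback(runGroup(in, tasks...))
}

// runChain runs tasks one after another, feeding each output to the next.
func runChain(in int, tasks ...step) int {
	for _, t := range tasks {
		in = t(in)
	}
	return in
}

func main() {
	double := func(x int) int { return x * 2 }
	addOne := func(x int) int { return x + 1 }
	sum := func(xs []int) int {
		total := 0
		for _, x := range xs {
			total += x
		}
		return total
	}

	fmt.Println(runGroup(3, double, addOne))      // [6 4]
	fmt.Println(runChord(3, sum, double, addOne)) // 10
	fmt.Println(runChain(3, double, addOne))      // 7
}
```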

Setting up a development environment with Docker

Redis
docker pull bitnami/redis:latest
docker pull bitnami/redis-exporter:latest

docker run -itd \
    --name redis-test \
    -p 6379:6379 \
    -e ALLOW_EMPTY_PASSWORD=yes \
    bitnami/redis:latest

Documentation

Constants

const (
	SignatureNameKey              = attribute.Key("signature.name")
	SignatureUUIDKey              = attribute.Key("signature.uuid")
	SignatureGroupUUIDKey         = attribute.Key("signature.group.uuid")
	SignatureChordCallbackUUIDKey = attribute.Key("signature.chord.callback.uuid")
	SignatureChordCallbackNameKey = attribute.Key("signature.chord.callback.name")
	ChainTasksLengthKey           = attribute.Key("chain.tasks.length")
	GroupUUIDKey                  = attribute.Key("group.uuid")
	GroupTasksLengthKey           = attribute.Key("group.tasks.length")
	GroupConcurrencyKey           = attribute.Key("group.concurrency")
	GroupTasksKey                 = attribute.Key("group.tasks")
	ChordCallbackUUIDKey          = attribute.Key("chord.callback.uuid")
)

Variables

This section is empty.

Functions

func LogDebug

func LogDebug(args ...interface{})

func LogDebugf

func LogDebugf(format string, args ...interface{})

func LogError

func LogError(args ...interface{})

func LogErrorf

func LogErrorf(format string, args ...interface{})

func LogFatal

func LogFatal(args ...interface{})

func LogFatalf

func LogFatalf(format string, args ...interface{})

func LogInfo

func LogInfo(args ...interface{})

func LogInfof

func LogInfof(format string, args ...interface{})

func LogWarn

func LogWarn(args ...interface{})

func LogWarnf

func LogWarnf(format string, args ...interface{})

Types

type BackendType

type BackendType int
const (
	BackendTypeRedis    BackendType = iota // Redis
	BackendTypeAmqp                        // AMQP
	BackendTypeMemcache                    // Memcache
	BackendTypeMongoDB                     // MongoDB
	BackendTypeDynamoDB                    // Amazon DynamoDB
)

type BrokerType

type BrokerType int
const (
	BrokerTypeRedis     BrokerType = iota // Redis
	BrokerTypeAmqp                        // AMQP
	BrokerTypeGcpPubSub                   // GCP Pub/Sub
	BrokerTypeSQS                         // AWS SQS
)

type LockType

type LockType int
const (
	LockTypeRedis LockType = iota
)

type MessageCarrier

type MessageCarrier struct {
	// contains filtered or unexported fields
}

func NewMessageCarrier

func NewMessageCarrier(msg *tasks.Headers) MessageCarrier

func (MessageCarrier) Get

func (c MessageCarrier) Get(key string) string

func (MessageCarrier) Keys

func (c MessageCarrier) Keys() []string

func (MessageCarrier) Set

func (c MessageCarrier) Set(key, val string)
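The `Get`/`Set`/`Keys` method set has the same shape as OpenTelemetry's `propagation.TextMapCarrier` interface, so the type presumably carries trace context inside task headers; that reading is an inference from the signatures, not something the page states. A minimal map-backed sketch of such a carrier, using the hypothetical names `headers` and `carrier` and assuming headers behave like a `map[string]interface{}`:

```go
package main

import "fmt"

// headers is a stand-in for the task headers; assumed here to be a plain map.
type headers map[string]interface{}

// carrier adapts headers to the Get/Set/Keys shape.
type carrier struct {
	h headers
}

// Get returns the value stored under key, or "" if it is missing or not a string.
func (c carrier) Get(key string) string {
	if v, ok := c.h[key].(string); ok {
		return v
	}
	return ""
}

// Set stores val under key, overwriting any previous value.
func (c carrier) Set(key, val string) {
	c.h[key] = val
}

// Keys lists every key currently stored, in no particular order.
func (c carrier) Keys() []string {
	keys := make([]string, 0, len(c.h))
	for k := range c.h {
		keys = append(keys, k)
	}
	return keys
}

func main() {
	c := carrier{h: headers{}}
	c.Set("traceparent", "00-abc-def-01")
	fmt.Println(c.Get("traceparent"), len(c.Keys())) // 00-abc-def-01 1
}
```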

type Server

type Server struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewServer

func NewServer(opts ...ServerOption) *Server

func (*Server) Endpoint

func (s *Server) Endpoint() (*url.URL, error)

func (*Server) HandleFunc

func (s *Server) HandleFunc(name string, handler interface{}) error

func (*Server) Name

func (s *Server) Name() string

func (*Server) NewChain

func (s *Server) NewChain(chainTasks ...TasksOption) error

NewChain runs a set of tasks synchronously and in order; the output of one task can serve as the input of the next.

func (*Server) NewChord

func (s *Server) NewChord(chordTasks ...TasksOption) error

NewChord first runs a group of tasks; once all of them have finished, the final callback is invoked.

func (*Server) NewGroup

func (s *Server) NewGroup(groupTasks ...TasksOption) error

NewGroup runs a set of tasks asynchronously; the tasks do not affect one another.

func (*Server) NewPeriodicChain

func (s *Server) NewPeriodicChain(cronSpec string, chainTasks ...TasksOption) error

func (*Server) NewPeriodicChord

func (s *Server) NewPeriodicChord(cronSpec string, chordTasks ...TasksOption) error

func (*Server) NewPeriodicGroup

func (s *Server) NewPeriodicGroup(cronSpec string, groupTasks ...TasksOption) error

func (*Server) NewPeriodicTask

func (s *Server) NewPeriodicTask(cronSpec, typeName string, opts ...TaskOption) error

NewPeriodicTask schedules a periodic task. Second-level schedules are not supported; the finest granularity is one minute.
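The note above limits schedules to minute granularity. Assuming `cronSpec` follows the conventional five-field crontab layout (minute, hour, day of month, month, day of week), which the page does not state explicitly, a quick pre-check could look like the sketch below. `looksLikeMinuteSpec` is a hypothetical helper, not part of the package, and it checks only the field count, not the field values.

```go
package main

import (
	"fmt"
	"strings"
)

// looksLikeMinuteSpec reports whether spec has exactly five
// whitespace-separated fields, as in "minute hour dom month dow".
// A six-field spec with a leading seconds field is rejected.
// Field values themselves are not validated.
func looksLikeMinuteSpec(spec string) bool {
	return len(strings.Fields(spec)) == 5
}

func main() {
	fmt.Println(looksLikeMinuteSpec("*/5 * * * *"))    // true: every five minutes
	fmt.Println(looksLikeMinuteSpec("*/10 * * * * *")) // false: six fields
}
```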

func (*Server) NewTask

func (s *Server) NewTask(typeName string, opts ...TaskOption) error

NewTask enqueues a new task.

func (*Server) Start

func (s *Server) Start(ctx context.Context) error

func (*Server) Stop

func (s *Server) Stop(_ context.Context) error

type ServerOption

type ServerOption func(o *Server)
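`ServerOption` is a function that mutates a `*Server`, which is the functional options pattern: the constructor takes any number of options and applies them in order. The sketch below shows the pattern on a hypothetical `server` type with hypothetical options; the field names and defaults are illustrative assumptions, not the package's real internals.

```go
package main

import "fmt"

// server is a hypothetical stand-in for a configurable server.
type server struct {
	queue       string
	concurrency int
}

// option mutates a server during construction.
type option func(*server)

// withQueue sets the queue name.
func withQueue(name string) option {
	return func(s *server) { s.queue = name }
}

// withConcurrency sets the number of concurrent workers.
func withConcurrency(n int) option {
	return func(s *server) { s.concurrency = n }
}

// newServer starts from defaults and applies each option in order,
// so later options override earlier ones.
func newServer(opts ...option) *server {
	s := &server{queue: "default", concurrency: 1}
	for _, opt := range opts {
		opt(s)
	}
	return s
}

func main() {
	s := newServer(withQueue("emails"), withConcurrency(4))
	fmt.Println(s.queue, s.concurrency) // emails 4
}
```

The benefit of this design is that new settings can be added without changing the constructor's signature, and callers only mention the settings they care about.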

func WithAMQPConfig

func WithAMQPConfig(cfg *config.AMQPConfig) ServerOption

func WithBrokerAddress

func WithBrokerAddress(addr string, db int, brokerType BrokerType) ServerOption

func WithConsumerOption

func WithConsumerOption(consumerTag string, concurrency int, queue string) ServerOption

func WithDefaultQueue

func WithDefaultQueue(name string) ServerOption

func WithDynamoDBConfig

func WithDynamoDBConfig(cfg *config.DynamoDBConfig) ServerOption

func WithEnableKeepAlive

func WithEnableKeepAlive(enable bool) ServerOption

WithEnableKeepAlive enables or disables keep-alive.

func WithEnvironmentConfig

func WithEnvironmentConfig() ServerOption

WithEnvironmentConfig reads the configuration from environment variables.

func WithGCPPubSubConfig

func WithGCPPubSubConfig(cfg *config.GCPPubSubConfig) ServerOption

func WithLockAddress

func WithLockAddress(addr string, db, retries int, lockType LockType) ServerOption

func WithMongoDBConfig

func WithMongoDBConfig(cfg *config.MongoDBConfig) ServerOption

func WithNoUnixSignals

func WithNoUnixSignals(noUnixSignals bool) ServerOption

func WithRedisConfig

func WithRedisConfig(cfg *config.RedisConfig) ServerOption

func WithResultBackendAddress

func WithResultBackendAddress(addr string, db int, backendType BackendType) ServerOption

func WithResultsExpireIn

func WithResultsExpireIn(expire int) ServerOption

func WithSQSConfig

func WithSQSConfig(cfg *config.SQSConfig) ServerOption

func WithTLSConfig

func WithTLSConfig(c *tls.Config) ServerOption

func WithYamlConfig

func WithYamlConfig(cnfPath string, keepReloading bool) ServerOption

WithYamlConfig reads the configuration from a YAML file.

type TaskOption

type TaskOption func(o *tasks.Signature)

func WithArgument

func WithArgument(typeName string, value interface{}) TaskOption

func WithDelayTime

func WithDelayTime(delayTime time.Time) TaskOption

func WithHeader

func WithHeader(key string, value interface{}) TaskOption

func WithHeaders

func WithHeaders(headers tasks.Headers) TaskOption

func WithPriority

func WithPriority(priority uint8) TaskOption

func WithRetryCount

func WithRetryCount(count int) TaskOption

func WithRetryTimeout

func WithRetryTimeout(timeout int) TaskOption

func WithRoutingKey

func WithRoutingKey(key string) TaskOption

type TasksOption

type TasksOption func(o *[]*tasks.Signature)

func WithTask

func WithTask(typeName string, opts ...TaskOption) TasksOption
