mq

package module
v0.0.0-...-101f611 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 25, 2020 License: MIT Imports: 15 Imported by: 0

README

mq

介绍

{以下是码云平台说明,您可以替换此简介 码云是 OSCHINA 推出的基于 Git 的代码托管平台(同时支持 SVN)。专为开发者提供稳定、高效、安全的云端软件开发协作平台 无论是个人、团队、或是企业,都能够用码云实现代码托管、项目管理、协作开发。企业项目请看 https://gitee.com/enterprises}

软件架构

软件架构说明

安装教程
  1. xxxx
  2. xxxx
  3. xxxx
使用说明
  1. xxxx
  2. xxxx
  3. xxxx
参与贡献
  1. Fork 本仓库
  2. 新建 Feat_xxx 分支
  3. 提交代码
  4. 新建 Pull Request
码云特技
  1. 使用 Readme_XXX.md 来支持不同的语言,例如 Readme_en.md, Readme_zh.md
  2. 码云官方博客 blog.gitee.com
  3. 你可以 https://gitee.com/explore 这个地址来了解码云上的优秀开源项目
  4. GVP 全称是码云最有价值开源项目,是码云综合评定出的优秀开源项目
  5. 码云官方提供的使用手册 https://gitee.com/help
  6. 码云封面人物是一档用来展示码云会员风采的栏目 https://gitee.com/gitee-stars/

Documentation

Index

Constants

View Source
const (
	// JobStatusDefault 默认状态
	JobStatusDefault = iota
	// JobStatusDelay delay:不可执行状态,等待时钟周期
	JobStatusDelay
	// JobStatusReady ready:可执行状态,等待消费
	JobStatusReady
	// JobStatusReserved reserved: 已被消费者读取,但还未得到消费者的响应(delete、finish)
	JobStatusReserved
)
View Source
const (
	// JobPoolKey jobpool
	JobPoolKey = "mq:jobpool"
	// BucketKey bucket
	BucketKey = "mq:bucket"
	// ReadyQueueKey readyqueue
	ReadyQueueKey = "mq:readyqueue"
	// ReadyQueueCacheKey rqcachekey
	ReadyQueueCacheKey = "mq:readyqueuecachekey"
)

Variables

View Source
var (
	// ErrJobIDEmpty job.id is empty
	ErrJobIDEmpty = errors.New("job.id is empty")
	// ErrJobTopicEmpty job.topic is empty
	ErrJobTopicEmpty = errors.New("job.topic is empty")
)

Functions

func Ack

func Ack(jobID string) (bool, error)

Ack 根据id消费队列

func AddToBucket

func AddToBucket(b *Bucket, card *JobCard) error

AddToBucket 添加到bucket 有序集合score = 延迟秒数 + 当前时间戳, member = jobID 并且设置job.status = JobStatusDelay TTR>0时,有可能job先被删除后再添加到bucket,所以添加到bucket前需要检测job是否存在

func AddToJobPool

func AddToJobPool(j *Job) error

AddToJobPool 消息入队列

func AddToReadyQueue

func AddToReadyQueue(jobID string) error

AddToReadyQueue 消息入队列

func Encode

func Encode(j *Job) (string, error)

Encode job转成json字符串

func FormatTime

func FormatTime(t time.Time) string

FormatTime time.Time to "2019-06-07 12:00:00"

func GetBucketJobNum

func GetBucketJobNum(b *Bucket) int

GetBucketJobNum GetBucketJobNum

func GetBucketKeyByID

func GetBucketKeyByID(id string) string

GetBucketKeyByID GetBucketKeyByID

func GetJobConsumeNum

func GetJobConsumeNum(jobID string) (int, error)

GetJobConsumeNum 获取消息被消费次数

func GetJobDetailByID

func GetJobDetailByID(jobID string) (map[string]string, error)

GetJobDetailByID 获取消息的detail

func GetJobKeyByID

func GetJobKeyByID(id string) string

GetJobKeyByID GetJobKeyByID

func GetJobQueueByTopic

func GetJobQueueByTopic(topic string) string

GetJobQueueByTopic GetJobQueueByTopic

func GetJobStatus

func GetJobStatus(jobID string) (int, error)

GetJobStatus 获取消息的状态

func GetTopicByJobID

func GetTopicByJobID(jobID string) (string, error)

GetTopicByJobID 获取消息的topic

func IncrJobConsumeNum

func IncrJobConsumeNum(jobID string) (bool, error)

IncrJobConsumeNum 消息被消费的次数+1

func Pop

func Pop(topics ...string) (map[string]string, error)

Pop 根据topic消费队列

func Push

func Push(j string) error

Push json字符串类型消息 入队列

func RemoveFromBucket

func RemoveFromBucket() error

RemoveFromBucket 移除bucket

func RetrivalTimeoutJobs

func RetrivalTimeoutJobs(b *Bucket) (jobIDs []string, nextTime int, err error)

RetrivalTimeoutJobs 从指定bucket检索到期的job nextTime参数如下:

	-1 当前bucket已经没有jobs
 >0 当前bucket下个job到期时间

可能整个事务需要保证原子一致性

func SecToTimeString

func SecToTimeString(t interface{}) string

SecToTimeString 172992 to "48h3m12s"

func SetJobStatus

func SetJobStatus(jobID string, status int) error

SetJobStatus 设置消息的状态

func UnixToFormatTime

func UnixToFormatTime(t interface{}) string

UnixToFormatTime unix to "2019-06-07 12:00:00"

Types

type Bucket

type Bucket struct {
	sync.Mutex
	ID       string
	JobNum   int
	NextTime time.Time
	// contains filtered or unexported fields
}

Bucket Bucket

func (*Bucket) Key

func (b *Bucket) Key() string

Key Key

type ByID

type ByID []*Bucket

ByID ByID

func (ByID) Len

func (b ByID) Len() int

func (ByID) Less

func (b ByID) Less(i, j int) bool

func (ByID) Swap

func (b ByID) Swap(i, j int)

type ByNum

type ByNum []*Bucket

ByNum ByNum

func (ByNum) Len

func (b ByNum) Len() int

func (ByNum) Less

func (b ByNum) Less(i, j int) bool

func (ByNum) Swap

func (b ByNum) Swap(i, j int)

type Dispatcher

type Dispatcher struct {
	TTRBuckets []*Bucket
	// contains filtered or unexported fields
}

Dispatcher 调度器 添加Job到Job Pool 调度Job分配到bucket 管理bucket

func NewDispatcher

func NewDispatcher() *Dispatcher

NewDispatcher 新建调度器

func (*Dispatcher) AddToJobPool

func (d *Dispatcher) AddToJobPool(j *Job) error

AddToJobPool 添加任务到对象池

func (*Dispatcher) GetBuckets

func (d *Dispatcher) GetBuckets() []*Bucket

GetBuckets GetBuckets

func (*Dispatcher) Run

func (d *Dispatcher) Run(ctx context.Context)

Run 启用调度器 job调度器,负责bucket分配

type IServer

type IServer interface {
	Run()
}

IServer Server接口

func NewServ

func NewServ() IServer

NewServ 新建Server

type Job

type Job struct {
	ID         string `redis:"id"`
	Topic      string `redis:"topic"`
	Delay      int    `redis:"delay"`
	TTR        int    `redis:"TTR"` // time-to-run
	Body       string `redis:"body"`
	Status     int    `redis:"status"`
	ConsumeNum int    `redis:"consume_num"`
}

Job 进入队列的消息结构

func Decode

func Decode(j string) (*Job, error)

Decode json字符串转成job对象

func GetJobStuctByID

func GetJobStuctByID(jobID string) (*Job, error)

GetJobStuctByID 获取消息

func (*Job) Card

func (j *Job) Card() *JobCard

Card Card

func (*Job) CheckJobData

func (j *Job) CheckJobData() error

CheckJobData 检测job的数据结构

func (*Job) Key

func (j *Job) Key() string

Key 获取job的id

func (*Job) String

func (j *Job) String() string

type JobCard

type JobCard struct {
	// contains filtered or unexported fields
}

JobCard JobCard

type MQ

type MQ struct {
	// contains filtered or unexported fields
}

MQ 消息队列

func New

func New() *MQ

New 新建mq

func (*MQ) Run

func (mq *MQ) Run(host, port, password string, maxIDle, maxActive int)

Run 启动mq

type RPCServer

type RPCServer struct {
}

RPCServer RPCServer

func (*RPCServer) Run

func (s *RPCServer) Run()

Run 启动RPCServer

type RedisDB

type RedisDB struct {
	Pool *redis.Pool
}

RedisDB redis连接池

var Redis *RedisDB

Redis 存放redis连接池的变量

func (*RedisDB) Bool

func (db *RedisDB) Bool(command string, args ...interface{}) (bool, error)

Bool 执行redis操作

func (*RedisDB) Do

func (db *RedisDB) Do(command string, args ...interface{}) (interface{}, error)

Do 执行redis操作

func (*RedisDB) InitPool

func (db *RedisDB) InitPool(host, port, password string, maxIDle, maxActive int)

InitPool 新建连接池

func (*RedisDB) Int

func (db *RedisDB) Int(command string, args ...interface{}) (int, error)

Int 执行redis操作

func (*RedisDB) Ints

func (db *RedisDB) Ints(command string, args ...interface{}) ([]int, error)

Ints 执行redis操作

func (*RedisDB) String

func (db *RedisDB) String(command string, args ...interface{}) (string, error)

String 执行redis操作

func (*RedisDB) StringMap

func (db *RedisDB) StringMap(command string, args ...interface{}) (map[string]string, error)

StringMap 执行redis操作

func (*RedisDB) Strings

func (db *RedisDB) Strings(command string, args ...interface{}) ([]string, error)

Strings 执行redis操作

type Service

type Service struct {
}

Service 服务

func (*Service) Ack

func (s *Service) Ack(id string, reply *bool) (err error)

Ack 根据id消费队列

func (*Service) Pop

func (s *Service) Pop(topic []string, reply *map[string]string) (err error)

Pop 根据topic消费队列

func (*Service) Push

func (s *Service) Push(j map[string]string, reply *string) error

Push 消息入队列

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL