kingjin

package module
v1.0.2 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 1, 2021 License: MIT Imports: 6 Imported by: 0

README

kingjin

Production Ready License

Installation

go get -u -v gitee.com/oliverxu/kingjin

如何使用

配置启动
// QueueObj holds the server returned by core.NewServer.
var QueueObj iface.IServer

// Build the configuration: queue name and version, worker pool size,
// the three queue limits (worker / dead / DB tasks), the maximum retry
// count and the delay time.
config := &core.Config{
		Name:             "XXX work",
		Version:          "1.0.1",
		WorkerPoolSize:   8,
		MaxWorkerTaskLen: 2048,
		MaxDeadTaskLen:   2048,
		MaxDbTaskLen:     4096,
		MaxRetry:         3,
		DelayTime:        time.Second * 2,
}

// Create the server from the configuration and run it.
// NOTE(review): if Serve blocks, routers must be registered (AddRouter)
// before this call — confirm against the implementation.
QueueObj = core.NewServer(config)
QueueObj.Serve()

注册路由
// Register the router for the "importExcel" business type; queue.Excel is
// the caller's own router implementation.
QueueObj.AddRouter("importExcel", new(queue.Excel))

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewServer

func NewServer(config *Config) iface.IServer

Types

type Config

// Config holds the settings passed to NewServer and NewMsgHandle.
type Config struct {
	Name             string // queue name
	WorkerPoolSize   uint32 // number of workers in the business worker pool
	MaxWorkerTaskLen uint32 // size of the job queue (presumably the per-worker task queue capacity — confirm)
	MaxDeadTaskLen   uint32 // size of the failed-task queue
	MaxDbTaskLen     uint32 // maximum number of DB jobs
	MaxRetry         int64  // maximum number of retries
	Version          string // version name
	// NOTE(review): undocumented in the original; presumably the delay applied
	// before a failed message is returned to the queue — confirm.
	DelayTime        time.Duration
}

type Message

// Message is a message packet made up of a business type, a content string,
// a parameter map and a numeric message ID. Each field has a matching
// Get/Set accessor method on *Message.
type Message struct {
	BizType string
	Content string
	Param   map[string]interface{}
	MsgId   uint32
}

func NewMessage

func NewMessage(id uint32, bizType, Content string, param map[string]interface{}) *Message

创建一个Message消息包

func (*Message) GetBizType

func (m *Message) GetBizType() string

func (*Message) GetContent

func (m *Message) GetContent() string

func (*Message) GetMsgId

func (m *Message) GetMsgId() uint32

func (*Message) GetParam

func (m *Message) GetParam() map[string]interface{}

func (*Message) SetBizType

func (m *Message) SetBizType(bizType string)

func (*Message) SetContent

func (m *Message) SetContent(content string)

func (*Message) SetMsgId

func (m *Message) SetMsgId(msgId uint32)

func (*Message) SetParam

func (m *Message) SetParam(param map[string]interface{})

type MsgHandle

// MsgHandle holds the router table, the worker task queues, and the
// retry, dead-task and DB-task state used to process messages.
type MsgHandle struct {
	Apis           map[string]iface.IRouter // map holding the handler for each MsgId (original wording; AddRouter registers by bizType string)
	WorkerPoolSize uint32                   // number of workers in the business worker pool
	TaskQueue      []chan iface.IMessage    // message queues from which workers take tasks
	Exit           chan struct{}            // exit signal
	Status         bool                     // running state: true = started, false = stopped
	Lock           sync.RWMutex             // lock
	// NOTE(review): undocumented in the original; has the same type that Stat
	// returns, so presumably the counters Stat reports — confirm.
	Pc             []atomic2.Int64
	RetryKV        map[string]atomic2.Int64 // counts the number of retries per MsgId
	MaxRetry       int64                    // maximum number of retries
	DeadQueue      chan iface.IMessage      // queue for handling failed tasks
	// NOTE(review): undocumented in the original; presumably the queue consumed
	// by StartDbWork / DoDbHandler — confirm.
	DbQueue        chan iface.IMessage
	Delay          time.Duration // delay before a failed ACK is returned to the queue
	// NOTE(review): undocumented in the original; presumably the Config passed
	// to NewMsgHandle — confirm.
	Setting        *Config
}

func NewMsgHandle

func NewMsgHandle(config *Config) *MsgHandle

func (*MsgHandle) AddRouter

func (mh *MsgHandle) AddRouter(bizType string, router iface.IRouter)

为消息添加具体的处理逻辑

func (*MsgHandle) DoDbHandler

func (mh *MsgHandle) DoDbHandler(request iface.IMessage)

func (*MsgHandle) DoDeadHandler

func (mh *MsgHandle) DoDeadHandler(request iface.IMessage)

func (*MsgHandle) DoMsgHandler

func (mh *MsgHandle) DoMsgHandler(request iface.IMessage, workerID int)

马上以非阻塞方式处理消息

func (*MsgHandle) SendMsgToTaskQueue

func (mh *MsgHandle) SendMsgToTaskQueue(request iface.IMessage)

将消息交给TaskQueue,由worker进行处理

func (*MsgHandle) StartDbWork

func (mh *MsgHandle) StartDbWork()

func (*MsgHandle) StartDeadWork

func (mh *MsgHandle) StartDeadWork()

失败进入队列

func (*MsgHandle) StartOneWorker

func (mh *MsgHandle) StartOneWorker(workerID int, taskQueue chan iface.IMessage)

启动一个Worker工作流程

func (*MsgHandle) StartWorkerPool

func (mh *MsgHandle) StartWorkerPool()

启动worker工作池

func (*MsgHandle) Stat

func (mh *MsgHandle) Stat() []atomic2.Int64

func (*MsgHandle) Stop

func (mh *MsgHandle) Stop()

type Server

// Server is the queue server type.
// NOTE(review): presumably the concrete type behind the iface.IServer that
// NewServer returns — confirm.
type Server struct {
	// Name of the server.
	Name string
	// contains filtered or unexported fields
}

func (*Server) AddMessage

func (s *Server) AddMessage(msg iface.IMessage)

func (*Server) AddRouter

func (s *Server) AddRouter(bizType string, router iface.IRouter)

路由功能:给当前服务注册一个路由业务方法,供客户端链接处理使用

func (*Server) Serve

func (s *Server) Serve()

运行服务

func (*Server) Start

func (s *Server) Start()

开启网络服务

func (*Server) Stat

func (s *Server) Stat() []atomic2.Int64

func (*Server) Stop

func (s *Server) Stop()

停止服务

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL