Documentation ¶
Index ¶
- type Client
- func (c *Client) AbortTask(taskID string, exTime int) error
- func (c *Client) Clone() *Client
- func (c *Client) GetAsyncResult(taskId string, timeout time.Duration, sleepTime time.Duration) (message.Result, error)
- func (c *Client) GetResult(taskId string, timeout time.Duration, sleepTime time.Duration) (message.Result, error)
- func (c *Client) GetStatus(taskId string, timeout time.Duration, sleepTime time.Duration) (int, error)
- func (c *Client) Send(groupName string, workerName string, args ...interface{}) (string, error)
- func (c *Client) SetTaskCtl(name int, value interface{}) *Client
- func (c *Client) Workflow() *ClientWithWorkflow
- type ClientWithWorkflow
- type DelayServer
- func (s *DelayServer) GetDelayMsgGoroutine()
- func (s *DelayServer) GetDelayMsgGoroutine_UpdateQueue(msg message.Message)
- func (s *DelayServer) GetReadyMsgGoroutine()
- func (s *DelayServer) GetReadyMsgGoroutine_Send(msg message.Message) (err error)
- func (s *DelayServer) IsRunning() bool
- func (s *DelayServer) IsStop() bool
- func (s *DelayServer) LSendQueue()
- func (s *DelayServer) Run()
- func (s *DelayServer) SendReadyMsgGoroutine()
- func (s *DelayServer) SendReadyMsgGoroutine_Send(msg message.Message) (err error)
- func (s *DelayServer) SetRunning()
- func (s *DelayServer) SetStop()
- func (s *DelayServer) Shutdown(ctx context.Context) error
- type FuncWorker
- type InlineServer
- func (t *InlineServer) Add(workerName string, w interface{}, callbackFunc ...interface{})
- func (t *InlineServer) GetNextMessageGoroutine()
- func (t *InlineServer) IsRunning() bool
- func (t *InlineServer) IsStop() bool
- func (t *InlineServer) MakeWorkerReady()
- func (t *InlineServer) Run(numWorkers int)
- func (t *InlineServer) SetRunning()
- func (t *InlineServer) SetStop()
- func (t *InlineServer) Shutdown(ctx context.Context) error
- func (t *InlineServer) WorkerGoroutine()
- type Server
- type ServerUtils
- func (b *ServerUtils) AbortTask(id string, expireTime int) error
- func (b *ServerUtils) BackendActivate()
- func (b *ServerUtils) BrokerActivate()
- func (b *ServerUtils) GetBackendPoolSize() int
- func (b *ServerUtils) GetBrokerPoolSize() int
- func (b ServerUtils) GetDelayGroupName(groupName string) string
- func (b ServerUtils) GetQueueName(groupName string) string
- func (b *ServerUtils) GetResult(id string) (message.Result, error)
- func (b *ServerUtils) IsAbort(id string) (bool, error)
- func (b *ServerUtils) LSendMsg(groupName string, msg message.Message) error
- func (b *ServerUtils) Next(groupName string) (message.Message, error)
- func (b *ServerUtils) Send(groupName string, workerName string, msgArgs message.MessageArgs, ...) (string, error)
- func (b *ServerUtils) SendMsg(groupName string, msg message.Message) error
- func (b *ServerUtils) SetBackendPoolSize(num int)
- func (b *ServerUtils) SetBrokerPoolSize(num int)
- func (b *ServerUtils) SetResult(result message.Result) error
- type SortQueue
- type TaskCtl
- func (t *TaskCtl) Abort(msg string)
- func (t TaskCtl) CanRetry() bool
- func (t TaskCtl) GetError() error
- func (t *TaskCtl) GetRetryCount() int
- func (t *TaskCtl) GetRunTime() time.Time
- func (t *TaskCtl) GetTaskId() string
- func (t *TaskCtl) IsAbort() (bool, error)
- func (t *TaskCtl) IsExpired() bool
- func (t *TaskCtl) IsZeroRunTime() bool
- func (t *TaskCtl) Retry(err error)
- func (t *TaskCtl) SetError(err error)
- func (t *TaskCtl) SetExpireTime(_t time.Time)
- func (t *TaskCtl) SetRetryCount(c int)
- func (t *TaskCtl) SetRunTime(_t time.Time)
- func (t *TaskCtl) SetServerUtil(su *ServerUtils)
- type TaskCtlWorkflowArgs
- type TaskMessage
- type WorkerInterface
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
func (*Client) GetAsyncResult ¶
func (c *Client) GetAsyncResult(taskId string, timeout time.Duration, sleepTime time.Duration) (message.Result, error)
GetAsyncResult returns the result whether or not the task has finished (a result is available as soon as the task starts running). 无论任务是否结束都返回结果(此结果只要任务开始运行就有)
func (*Client) GetResult ¶
func (c *Client) GetResult(taskId string, timeout time.Duration, sleepTime time.Duration) (message.Result, error)
GetResult returns the result only once the task is over (a failed task also counts as over). 只有任务结束才返回结果(任务失败也算结束)
- taskId:
- timeout:
- sleepTime:
func (*Client) GetStatus ¶
func (c *Client) GetStatus(taskId string, timeout time.Duration, sleepTime time.Duration) (int, error)
GetStatus
- taskId:
- timeout:
- sleepTime:
func (*Client) SetTaskCtl ¶
type ClientWithWorkflow ¶
type ClientWithWorkflow struct {
	WorkflowArgs message.MessageWorkflowArgs
	// contains filtered or unexported fields
}
func (*ClientWithWorkflow) Done ¶
func (c *ClientWithWorkflow) Done() (string, error)
Done sends the workflow (SendWorkflow). Returns: taskId, err
func (*ClientWithWorkflow) Send ¶
func (c *ClientWithWorkflow) Send(groupName string, workerName string, args ...interface{}) *ClientWithWorkflow
Send
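- args: 只有第一个任务才能填!!!后续任务的参数固定为第一个任务的返回值 (only the first task may be given args; the arguments of every later task are fixed to the first task's return value)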
func (*ClientWithWorkflow) SetTaskCtl ¶
func (c *ClientWithWorkflow) SetTaskCtl(name int, value interface{}) *ClientWithWorkflow
type DelayServer ¶
type DelayServer struct {
	sync.Map
	ServerUtils
	// contains filtered or unexported fields
}
func NewDelayServer ¶
func (*DelayServer) GetDelayMsgGoroutine_UpdateQueue ¶
func (s *DelayServer) GetDelayMsgGoroutine_UpdateQueue(msg message.Message)
func (*DelayServer) GetReadyMsgGoroutine ¶
func (s *DelayServer) GetReadyMsgGoroutine()
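从本地队列中获取到处理时间的任务,发送到readyMsgChan (takes tasks whose processing time has arrived from the local queue and sends them to readyMsgChan)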
func (*DelayServer) GetReadyMsgGoroutine_Send ¶
func (s *DelayServer) GetReadyMsgGoroutine_Send(msg message.Message) (err error)
func (*DelayServer) IsRunning ¶
func (s *DelayServer) IsRunning() bool
func (*DelayServer) IsStop ¶
func (s *DelayServer) IsStop() bool
func (*DelayServer) LSendQueue ¶
func (s *DelayServer) LSendQueue()
func (*DelayServer) Run ¶
func (s *DelayServer) Run()
func (*DelayServer) SendReadyMsgGoroutine ¶
func (s *DelayServer) SendReadyMsgGoroutine()
从readyMsgChan中读取任务,传给inlineServer的Chan处理 (reads tasks from readyMsgChan and hands them to the inlineServer's channel for processing)
func (*DelayServer) SendReadyMsgGoroutine_Send ¶
func (s *DelayServer) SendReadyMsgGoroutine_Send(msg message.Message) (err error)
func (*DelayServer) SetRunning ¶
func (s *DelayServer) SetRunning()
func (*DelayServer) SetStop ¶
func (s *DelayServer) SetStop()
type FuncWorker ¶
type FuncWorker struct {
	Func         interface{} // 执行的函数 (the function to execute)
	CallbackFunc interface{} // 回调函数 (the callback function)
	Name         string
	Logger       log.LoggerInterface
}
func (*FuncWorker) WorkerName ¶
func (f *FuncWorker) WorkerName() string
type InlineServer ¶
type InlineServer struct {
	sync.Map
	ServerUtils
	// contains filtered or unexported fields
}
func NewInlineServer ¶
func NewInlineServer(groupName string, c config.Config) InlineServer
func (*InlineServer) Add ¶
func (t *InlineServer) Add(workerName string, w interface{}, callbackFunc ...interface{})
Add adds a worker to the group.
- w: worker func
- callbackFunc: callback func
func (*InlineServer) GetNextMessageGoroutine ¶
func (t *InlineServer) GetNextMessageGoroutine()
GetNextMessageGoroutine gets the next message if a worker is ready.
func (*InlineServer) IsRunning ¶
func (t *InlineServer) IsRunning() bool
func (*InlineServer) IsStop ¶
func (t *InlineServer) IsStop() bool
func (*InlineServer) MakeWorkerReady ¶
func (t *InlineServer) MakeWorkerReady()
func (*InlineServer) Run ¶
func (t *InlineServer) Run(numWorkers int)
func (*InlineServer) SetRunning ¶
func (t *InlineServer) SetRunning()
func (*InlineServer) SetStop ¶
func (t *InlineServer) SetStop()
func (*InlineServer) WorkerGoroutine ¶
func (t *InlineServer) WorkerGoroutine()
WorkerGoroutine starts a worker and runs it.
type Server ¶
type Server struct {
	ServerMap      map[string]*InlineServer // groupName:server
	DelayServerMap map[string]*DelayServer  // groupName:server
	// contains filtered or unexported fields
}
func (*Server) Add ¶
func (t *Server) Add(groupName string, workerName string, w interface{}, callbackFunc ...interface{})
Add adds a worker to the group.
- w: worker func
- callbackFunc: callback func
type ServerUtils ¶
type ServerUtils struct {
// contains filtered or unexported fields
}
ServerUtils 用于把 delayServer,inlineServer,client 用到的方法抽离出来 (ServerUtils factors out the methods used by delayServer, inlineServer and client)
func (*ServerUtils) AbortTask ¶
func (b *ServerUtils) AbortTask(id string, expireTime int) error
AbortTask - expireTime : 过期时间,秒 (expiration time, in seconds)
func (*ServerUtils) BackendActivate ¶
func (b *ServerUtils) BackendActivate()
func (*ServerUtils) BrokerActivate ¶
func (b *ServerUtils) BrokerActivate()
func (*ServerUtils) GetBackendPoolSize ¶
func (b *ServerUtils) GetBackendPoolSize() int
func (*ServerUtils) GetBrokerPoolSize ¶
func (b *ServerUtils) GetBrokerPoolSize() int
func (ServerUtils) GetDelayGroupName ¶
func (b ServerUtils) GetDelayGroupName(groupName string) string
func (ServerUtils) GetQueueName ¶
func (b ServerUtils) GetQueueName(groupName string) string
func (*ServerUtils) LSendMsg ¶
func (b *ServerUtils) LSendMsg(groupName string, msg message.Message) error
func (*ServerUtils) Send ¶
func (b *ServerUtils) Send(groupName string, workerName string, msgArgs message.MessageArgs, args ...interface{}) (string, error)
Send msg to Queue t.Send("groupName", "workerName" , 1,"hi",1.2)
func (*ServerUtils) SendMsg ¶
func (b *ServerUtils) SendMsg(groupName string, msg message.Message) error
func (*ServerUtils) SetBackendPoolSize ¶
func (b *ServerUtils) SetBackendPoolSize(num int)
func (*ServerUtils) SetBrokerPoolSize ¶
func (b *ServerUtils) SetBrokerPoolSize(num int)
type SortQueue ¶
type SortQueue struct {
	sync.Mutex
	Queue  []message.Message
	MaxLen int
	// contains filtered or unexported fields
}
func NewSortQueue ¶
type TaskCtl ¶
func NewTaskCtl ¶
func (*TaskCtl) GetRetryCount ¶
func (*TaskCtl) GetRunTime ¶
func (*TaskCtl) IsZeroRunTime ¶
func (*TaskCtl) SetExpireTime ¶
func (*TaskCtl) SetRetryCount ¶
func (*TaskCtl) SetRunTime ¶
func (*TaskCtl) SetServerUtil ¶
func (t *TaskCtl) SetServerUtil(su *ServerUtils)