server

package
v3.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 1, 2024 License: GPL-3.0 Imports: 14 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

func NewClient

func NewClient(c config.Config) Client

func (*Client) AbortTask

func (c *Client) AbortTask(taskID string, exTime int) error

AbortTask

<exTime>: 过期时间,秒。<=0表示不过期

func (*Client) Clone

func (c *Client) Clone() *Client

func (*Client) GetAsyncResult

func (c *Client) GetAsyncResult(taskId string, timeout time.Duration, sleepTime time.Duration) (message.Result, error)

GetAsyncResult Return the result whether the task is finished or not 无论任务是否结束都返回结果(此结果只要任务开始运行就有)

func (*Client) GetResult

func (c *Client) GetResult(taskId string, timeout time.Duration, sleepTime time.Duration) (message.Result, error)

GetResult Only return the result when the task is over 只有任务结束才返回结果(任务失败也算结束)

  • taskId:
  • timeout:
  • sleepDuration:

func (*Client) GetStatus

func (c *Client) GetStatus(taskId string, timeout time.Duration, sleepTime time.Duration) (int, error)

GetStatus taskId: timeout: sleepDuration:

func (*Client) Send

func (c *Client) Send(groupName string, workerName string, args ...interface{}) (string, error)

Send return: taskId, err

func (*Client) SetTaskCtl

func (c *Client) SetTaskCtl(name int, value interface{}) *Client

func (*Client) Workflow

func (c *Client) Workflow() *ClientWithWorkflow

Workflow start a workflow

type ClientWithWorkflow

type ClientWithWorkflow struct {
	WorkflowArgs message.MessageWorkflowArgs
	// contains filtered or unexported fields
}

func (*ClientWithWorkflow) Done

func (c *ClientWithWorkflow) Done() (string, error)

Done SendWorkflow return: taskId, err

func (*ClientWithWorkflow) Send

func (c *ClientWithWorkflow) Send(groupName string, workerName string, args ...interface{}) *ClientWithWorkflow

Send

  • args : 只有第一个任务才能填!!!后续任务的参数固定为第一个任务的返回值

func (*ClientWithWorkflow) SetTaskCtl

func (c *ClientWithWorkflow) SetTaskCtl(name int, value interface{}) *ClientWithWorkflow

type DelayServer

type DelayServer struct {
	sync.Map
	ServerUtils
	// contains filtered or unexported fields
}

func NewDelayServer

func NewDelayServer(groupName string, c config.Config, msgChan chan message.Message) DelayServer

func (*DelayServer) GetDelayMsgGoroutine

func (s *DelayServer) GetDelayMsgGoroutine()

获取延时任务到本地队列

func (*DelayServer) GetDelayMsgGoroutine_UpdateQueue

func (s *DelayServer) GetDelayMsgGoroutine_UpdateQueue(msg message.Message)

func (*DelayServer) GetReadyMsgGoroutine

func (s *DelayServer) GetReadyMsgGoroutine()

从本地队列中获取到处理时间的任务,发送到readyMsgChan

func (*DelayServer) GetReadyMsgGoroutine_Send

func (s *DelayServer) GetReadyMsgGoroutine_Send(msg message.Message) (err error)

func (*DelayServer) IsRunning

func (s *DelayServer) IsRunning() bool

func (*DelayServer) IsStop

func (s *DelayServer) IsStop() bool

func (*DelayServer) LSendQueue

func (s *DelayServer) LSendQueue()

func (*DelayServer) Run

func (s *DelayServer) Run()

func (*DelayServer) SendReadyMsgGoroutine

func (s *DelayServer) SendReadyMsgGoroutine()

从readyMsgChan中读取任务,传给inlineServer的Chan处理

func (*DelayServer) SendReadyMsgGoroutine_Send

func (s *DelayServer) SendReadyMsgGoroutine_Send(msg message.Message) (err error)

func (*DelayServer) SetRunning

func (s *DelayServer) SetRunning()

func (*DelayServer) SetStop

func (s *DelayServer) SetStop()

func (*DelayServer) Shutdown

func (s *DelayServer) Shutdown(ctx context.Context) error

type FuncWorker

type FuncWorker struct {
	Func         interface{} // 执行的函数
	CallbackFunc interface{} // 回调函数
	Name         string
	Logger       log.LoggerInterface
}

func (*FuncWorker) After

func (f *FuncWorker) After(ctl *TaskCtl, funcArgs []string, result *message.Result) error

func (*FuncWorker) Run

func (f *FuncWorker) Run(ctl *TaskCtl, funcArgs []string, result *message.Result) error

func (*FuncWorker) WorkerName

func (f *FuncWorker) WorkerName() string

type InlineServer

type InlineServer struct {
	sync.Map
	ServerUtils
	// contains filtered or unexported fields
}

func NewInlineServer

func NewInlineServer(groupName string, c config.Config) InlineServer

func (*InlineServer) Add

func (t *InlineServer) Add(workerName string, w interface{}, callbackFunc ...interface{})

Add worker to group w : worker func callbackFunc : callbackFunc func

func (*InlineServer) GetNextMessageGoroutine

func (t *InlineServer) GetNextMessageGoroutine()

GetNextMessageGoroutine describe: get next message if worker is ready

func (*InlineServer) IsRunning

func (t *InlineServer) IsRunning() bool

func (*InlineServer) IsStop

func (t *InlineServer) IsStop() bool

func (*InlineServer) MakeWorkerReady

func (t *InlineServer) MakeWorkerReady()

func (*InlineServer) Run

func (t *InlineServer) Run(numWorkers int)

func (*InlineServer) SetRunning

func (t *InlineServer) SetRunning()

func (*InlineServer) SetStop

func (t *InlineServer) SetStop()

func (*InlineServer) Shutdown

func (t *InlineServer) Shutdown(ctx context.Context) error

func (*InlineServer) WorkerGoroutine

func (t *InlineServer) WorkerGoroutine()

WorkerGoroutine describe: start worker to run

type Server

type Server struct {
	ServerMap      map[string]*InlineServer // groupName:server
	DelayServerMap map[string]*DelayServer  // groupName:server
	// contains filtered or unexported fields
}

func NewServer

func NewServer(c config.Config) Server

func (*Server) Add

func (t *Server) Add(groupName string, workerName string, w interface{}, callbackFunc ...interface{})

Add worker to group w : worker func callbackFunc:callbackFunc

func (*Server) GetClient

func (t *Server) GetClient() Client

func (*Server) Run

func (t *Server) Run(groupName string, numWorkers int, enableDelayServer ...bool)

func (*Server) Shutdown

func (t *Server) Shutdown(ctx context.Context) error

type ServerUtils

type ServerUtils struct {
	// contains filtered or unexported fields
}

ServerUtils 用于把 delayServer,inlineServer,client 用到的方法抽离出来

func (*ServerUtils) AbortTask

func (b *ServerUtils) AbortTask(id string, expireTime int) error

AbortTask - exTime : 过期时间,秒

func (*ServerUtils) BackendActivate

func (b *ServerUtils) BackendActivate()

func (*ServerUtils) BrokerActivate

func (b *ServerUtils) BrokerActivate()

func (*ServerUtils) GetBackendPoolSize

func (b *ServerUtils) GetBackendPoolSize() int

func (*ServerUtils) GetBrokerPoolSize

func (b *ServerUtils) GetBrokerPoolSize() int

func (ServerUtils) GetDelayGroupName

func (b ServerUtils) GetDelayGroupName(groupName string) string

func (ServerUtils) GetQueueName

func (b ServerUtils) GetQueueName(groupName string) string

func (*ServerUtils) GetResult

func (b *ServerUtils) GetResult(id string) (message.Result, error)

func (*ServerUtils) IsAbort

func (b *ServerUtils) IsAbort(id string) (bool, error)

func (*ServerUtils) LSendMsg

func (b *ServerUtils) LSendMsg(groupName string, msg message.Message) error

func (*ServerUtils) Next

func (b *ServerUtils) Next(groupName string) (message.Message, error)

func (*ServerUtils) Send

func (b *ServerUtils) Send(groupName string, workerName string, msgArgs message.MessageArgs, args ...interface{}) (string, error)

Send msg to Queue t.Send("groupName", "workerName" , 1,"hi",1.2)

func (*ServerUtils) SendMsg

func (b *ServerUtils) SendMsg(groupName string, msg message.Message) error

func (*ServerUtils) SetBackendPoolSize

func (b *ServerUtils) SetBackendPoolSize(num int)

func (*ServerUtils) SetBrokerPoolSize

func (b *ServerUtils) SetBrokerPoolSize(num int)

func (*ServerUtils) SetResult

func (b *ServerUtils) SetResult(result message.Result) error

type SortQueue

type SortQueue struct {
	sync.Mutex

	Queue []message.Message

	MaxLen int
	// contains filtered or unexported fields
}

func NewSortQueue

func NewSortQueue(maxLen int) SortQueue

func (*SortQueue) Get

func (s *SortQueue) Get(i int) message.Message

func (*SortQueue) Insert

func (s *SortQueue) Insert(msg message.Message) *message.Message

func (*SortQueue) IsFull

func (s *SortQueue) IsFull() bool

func (*SortQueue) Pop

func (s *SortQueue) Pop() *message.Message

type TaskCtl

type TaskCtl struct {
	message.Message
	// contains filtered or unexported fields
}

func NewTaskCtl

func NewTaskCtl(msg message.Message) TaskCtl

func (*TaskCtl) Abort

func (t *TaskCtl) Abort(msg string)

func (TaskCtl) CanRetry

func (t TaskCtl) CanRetry() bool

func (TaskCtl) GetError

func (t TaskCtl) GetError() error

func (*TaskCtl) GetRetryCount

func (t *TaskCtl) GetRetryCount() int

func (*TaskCtl) GetRunTime

func (t *TaskCtl) GetRunTime() time.Time

func (*TaskCtl) GetTaskId

func (t *TaskCtl) GetTaskId() string

func (*TaskCtl) IsAbort

func (t *TaskCtl) IsAbort() (bool, error)

func (*TaskCtl) IsExpired

func (t *TaskCtl) IsExpired() bool

func (*TaskCtl) IsZeroRunTime

func (t *TaskCtl) IsZeroRunTime() bool

func (*TaskCtl) Retry

func (t *TaskCtl) Retry(err error)

func (*TaskCtl) SetError

func (t *TaskCtl) SetError(err error)

func (*TaskCtl) SetExpireTime

func (t *TaskCtl) SetExpireTime(_t time.Time)

func (*TaskCtl) SetRetryCount

func (t *TaskCtl) SetRetryCount(c int)

func (*TaskCtl) SetRunTime

func (t *TaskCtl) SetRunTime(_t time.Time)

func (*TaskCtl) SetServerUtil

func (t *TaskCtl) SetServerUtil(su *ServerUtils)

type TaskCtlWorkflowArgs

type TaskCtlWorkflowArgs struct {
	GroupName  string
	WorkerName string
	RetryCount int
	RunAfter   time.Duration
	ExpireTime time.Time
}

type TaskMessage

type TaskMessage struct {
	Id         string   `json:"id"`
	WorkerName string   `json:"worker_name"`
	FuncArgs   []string `json:"func_args"` //yjson string slice
	Ctl        TaskCtl  `json:"task_ctl"`
}

type WorkerInterface

type WorkerInterface interface {
	Run(ctl *TaskCtl, funcArgs []string, result *message.Result) error
	WorkerName() string
	After(ctl *TaskCtl, funcArgs []string, result *message.Result) error
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL