gxsync

package
v1.14.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 21, 2023 License: Apache-2.0 Imports: 13 Imported by: 37

README

Sync

WorkerPool

WorkerPool is an interface which defined the behaviors of worker pool.

baseWorkerPool

baseWorkerPool is a versatile goroutine pool with multiple queues. You may refer to base_worker_pool.go for the architecture. baseWorkerPool provides some basic functions, like initialization, shutdown, etc. Developers are allowed to determine how to dispatch tasks to each queue, for example, dispatch the tasks to each queue fairly using Round Robin.

Worker LifeCycle

p.dispatch(numWorkers, wg) creates workers and dispatches them to each queue equally. The method receives an instance of WaitGroup in order to block newBaseWorkerPool method until all workers are available.

At the end, the worker will be killed if task queue they monitored is closed.

Why multi-queue structure?

In general, the worker pool has a queue only. baseWokerPool of gost, however, has multiple queues. Why? We found that multi-queue structure could improve performance greatly, especially in the case of using large number of goroutines.

There are some of benchmark results for your reference. The environment settings are:

  • MacBook Air 2020 with M1 Chip
  • Memory: 16GB
  • Golang: go1.16.6 darwin/arm64
  • @workers: 700
  • Tasks: CPUTask, IOTask and RandomTask

TaskPool(baseWorkerPool based)

BenchmarkTaskPool_CPUTask/AddTaskBalance-8        	  138986	     14779 ns/op
BenchmarkTaskPool_IOTask/AddTaskBalance-8  	          2872380	     440.2 ns/op
BenchmarkTaskPool_RandomTask/AddTaskBalance-8  	      2365293	     530.0 ns/op

WorkerPool

BenchmarkWorkerPool_CPUTask/Submit-8         	   70400	     17939 ns/op
BenchmarkWorkerPool_IOTask/Submit-8         	   1000000	   1011 ns/op
BenchmarkWorkerPool_RandomTask/Submit-8          1858268	   645.5 ns/op

WorkerPool with single queue

BenchmarkConnectionPool/CPUTask-8           	 1844893	     16738 ns/op
BenchmarkConnectionPool/IOTask-8            	 1000000	     16047 ns/op
BenchmarkConnectionPool/RandomTask-8        	 1000000	      1143 ns/op

ConnectionPool

ConnectionPool is a pool designed for managing connection based on baseWorkerPool. The pool will reject new tasks insertion if reaches the limitation.

When a new task is arriving, the task will be put into a queue using Round-Robin algorithm at the first time. If failed, the task will be put to a random queue within len(p.taskQueues)/2 times. If all attempts are failed, it means the pool reaches the limitation, and the task will be rejected eventually.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	PoolBusyErr = perrors.New("pool is busy")
)

Functions

This section is empty.

Types

type ConnectionPool added in v1.11.16

type ConnectionPool struct {
	// contains filtered or unexported fields
}

func (ConnectionPool) Close added in v1.11.16

func (p ConnectionPool) Close()

func (ConnectionPool) IsClosed added in v1.11.16

func (p ConnectionPool) IsClosed() bool

func (ConnectionPool) NumWorkers added in v1.11.16

func (p ConnectionPool) NumWorkers() int32

func (*ConnectionPool) Submit added in v1.11.16

func (p *ConnectionPool) Submit(t task) error

func (*ConnectionPool) SubmitSync added in v1.11.16

func (p *ConnectionPool) SubmitSync(t task) error

type GenericTaskPool added in v1.9.6

type GenericTaskPool interface {
	// AddTask wait idle worker add task
	AddTask(t task) bool
	// AddTaskAlways add task to queues or do it immediately
	AddTaskAlways(t task)
	// AddTaskBalance add task to idle queue
	AddTaskBalance(t task)
	// Close use to close the task pool
	Close()
	// IsClosed use to check pool status.
	IsClosed() bool
}

GenericTaskPool represents an generic task pool.

func NewTaskPool

func NewTaskPool(opts ...TaskPoolOption) GenericTaskPool

NewTaskPool build a task pool

func NewTaskPoolSimple added in v1.9.6

func NewTaskPoolSimple(size int) GenericTaskPool

NewTaskPoolSimple build a simple task pool

type TaskPool

type TaskPool struct {
	TaskPoolOptions
	// contains filtered or unexported fields
}

/////////////////////////////////////// Task Pool /////////////////////////////////////// task pool: manage task ts

func (*TaskPool) AddTask

func (p *TaskPool) AddTask(t task) (ok bool)

return false when the pool is stop

func (*TaskPool) AddTaskAlways added in v1.7.0

func (p *TaskPool) AddTaskAlways(t task)

func (*TaskPool) AddTaskBalance added in v1.7.0

func (p *TaskPool) AddTaskBalance(t task)

do it immediately when no idle queue

func (*TaskPool) Close

func (p *TaskPool) Close()

func (*TaskPool) IsClosed

func (p *TaskPool) IsClosed() bool

check whether the session has been closed.

type TaskPoolOption

type TaskPoolOption func(*TaskPoolOptions)

func WithTaskPoolTaskPoolSize

func WithTaskPoolTaskPoolSize(size int) TaskPoolOption

WithTaskPoolTaskPoolSize set @size of the task queue pool size

func WithTaskPoolTaskQueueLength

func WithTaskPoolTaskQueueLength(length int) TaskPoolOption

WithTaskPoolTaskQueueLength set @length of the task queue length

func WithTaskPoolTaskQueueNumber

func WithTaskPoolTaskQueueNumber(number int) TaskPoolOption

WithTaskPoolTaskQueueNumber set @number of the task queue number

type TaskPoolOptions

type TaskPoolOptions struct {
	// contains filtered or unexported fields
}

TaskPoolOptions is optional settings for task pool

type WorkerPool added in v1.11.16

type WorkerPool interface {
	// Submit adds a task to queue asynchronously.
	Submit(task) error
	// SubmitSync adds a task to queue synchronously.
	SubmitSync(task) error
	// Close closes the worker pool
	Close()
	// IsClosed returns close status of the worker pool
	IsClosed() bool
	// NumWorkers returns the number of workers
	NumWorkers() int32
}

func NewConnectionPool added in v1.11.16

func NewConnectionPool(config WorkerPoolConfig) WorkerPool

type WorkerPoolConfig added in v1.11.16

type WorkerPoolConfig struct {
	NumWorkers int
	NumQueues  int
	QueueSize  int
	Logger     gxlog.Logger
	Enable     bool
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL