pool

package module
v0.0.0-...-9365e71 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 31, 2022 License: MIT Imports: 4 Imported by: 0

README

worker-pools

Go Report Card

Go package for managing a set of lazily constructed, self-expiring, concurrency-limited worker pools.

maxConcurrentWorkloads := 500
stalePoolExpiration := 10*time.Minute
maxPoolLifetime := 4*time.Hour
poolManager := pool.NewWorkerPoolManager(
  maxConcurrentWorkloads, stalePoolExpiration, maxPoolLifetime,
)

pool, doneUsing := poolManager.GetPool("pool 1")
pool.Submit(func() {
  // Do anything here. Only maxConcurrentWorkloads will be allowed to execute concurrently per pool.
  // This is useful for limiting concurrent usage of external resources.
})
close(doneUsing)

Each pool instance is constructed when it is required and cached for stalePoolExpiration each time it is used, up to a maximum of maxPoolLifetime if the pool is receiving constant usage. Multiple goroutines may safely reserve and use pools concurrently. The pool will spin up worker routines lazily as they're required, allowing for large levels of concurrency and a high cardinality of pools in the manager.

If you want to attach shared data or behavior to each pool instance:

type myPooledData struct {
	pool.WorkerPool

	// You can put shared data of any type here
	myData string
}
func (p *myPooledData) Dispose() {
	p.WorkerPool.Dispose()
	// Release shared data here.
}

poolManager := pool.NewWorkerPoolManager(500, 10*time.Minute, 4*time.Hour)

var poolFactory pool.Factory = func(maxSize int) (pool.WorkerPool, error) {
	workerPool, _ := pool.NewWorkerPool(maxSize)

	// Build shared resources here, return errors, etc.

	return &myPooledData{
		WorkerPool: workerPool,
		myData: "my shared data"
	}, nil
}

pool, doneUsing, err := s.ClientBundleManager.GetPoolWithFactory("pool 1", sendSize, bundleFactory)
fmt.Println(pool.(*myPooledData).myData)
// Any error returned in the Factory function will bubble up here
if err != nil {
  // Handle
}
pool.Submit(func() {
  // Do anything here. Only maxConcurrentWorkloads will be allowed to execute concurrently per pool.
})
close(doneUsing)

See GoDoc for more details.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BaseWorkerPool

type BaseWorkerPool struct {
	// contains filtered or unexported fields
}

BaseWorkerPool is the base implementation of WorkerPool

func (*BaseWorkerPool) Dispose

func (p *BaseWorkerPool) Dispose()

Dispose the pool, closing down the workers and releasing any shared resources.

func (*BaseWorkerPool) Submit

func (p *BaseWorkerPool) Submit(w Work)

Submit an item of Work to be executed.

When all workers are busy, and an additional workerPoolMaxSize of pending work beyond that is also already enqueued, this method will block until workers become available.

type Factory

type Factory func(maxSize int) (WorkerPool, error)

Factory builds a new WorkerPool

type Work

type Work func()

Work - a unit of work

type WorkerPool

type WorkerPool interface {
	Submit(w Work)
	Dispose()
	// contains filtered or unexported methods
}

WorkerPool is a fixed-size pool of workers.

func NewWorkerPool

func NewWorkerPool(maxSize int) (WorkerPool, error)

NewWorkerPool builds a new BaseWorkerPool and return it as a WorkerPool. This is the default pool factory.

type WorkerPoolManager

type WorkerPoolManager struct {
	// contains filtered or unexported fields
}

WorkerPoolManager - Self-expiring, lazily constructed map of fixed-size worker pools safe for concurrent use

func NewWorkerPoolManager

func NewWorkerPoolManager(
	poolSize int, stalePoolExpiration time.Duration, maxPoolLifetime time.Duration,
) *WorkerPoolManager

NewWorkerPoolManager factory constructor

* poolSize - The max number of workers for each key * stalePoolExpiration - how long to cache unused pools for * maxPoolLifetime - max time to allow pools to live

func (*WorkerPoolManager) Dispose

func (m *WorkerPoolManager) Dispose()

Dispose clears the underlying cache and stops launched goroutines

func (*WorkerPoolManager) GetPool

func (m *WorkerPoolManager) GetPool(key string, sendSize int) (WorkerPool, chan<- bool)

GetPool returns the WorkerPool for this key, building a BaseWorkerPool and caching it if necessary. Spawns sendSize workers, up to a max of the manager's poolSize.

This returns the pool in an "unexpirable" state - the caller should signal the returned done channel when it no longer requires the returned bundle.

func (*WorkerPoolManager) GetPoolWithFactory

func (m *WorkerPoolManager) GetPoolWithFactory(
	key string, sendSize int, factory Factory,
) (WorkerPool, chan<- bool, error)

GetPoolWithFactory returns the WorkerPool for this key, allowing you to specify a custom pool.Factory if you want to build a custom WorkerPool implementation which embeds a BaseWorkerPool and attaches supplimentary shared data for the pool.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL