Documentation ¶
Overview ¶
Package gpool easily manages a resizable pool of context-aware goroutines to bound concurrency. A job is enqueued to the pool, and only N jobs can be processing concurrently.
A job is simply a func(){}. When you Enqueue(ctx, func(){}) a job, the call returns once the job has started processing. If the pool is full, the call blocks until:

1. the pool has room for the job,
2. the job's context is canceled, or
3. the pool is stopped.

A pool is either started or stopped; it will not accept any job unless pool.Start() is called. Stopping the pool with pool.Stop() waits for all processing jobs to finish before returning, and also unblocks any blocked enqueues (which return ErrPoolStopped).

The pool can be resized using Resize(), which resizes the pool in a concurrency-safe way. Resize can enlarge the pool, and any blocked enqueue will unblock after the pool is resized; shrinking the pool does not affect jobs that are already processing.

Enqueuing a job returns nil once the job starts, ErrPoolStopped if the pool is stopped, or ctx.Err() if the job's context is canceled while blocked waiting for the pool.

Start, Stop, and Resize(N) are all concurrency-safe and can be called from multiple goroutines; subsequent calls to Start or Stop have no effect unless they are called alternately.
Example (One) ¶
Example 1 - Simple Job Enqueue
package main

import (
	"context"
	"log"
	"time"

	"github.com/sherifabdlnaby/gpool"
)

func main() {
	concurrency := 2

	// Create and start pool.
	pool := gpool.NewPool(concurrency)
	defer pool.Stop()

	// Create JOB
	resultChan1 := make(chan int)
	ctx := context.Background()
	job := func() {
		time.Sleep(2000 * time.Millisecond)
		resultChan1 <- 1337
	}

	// Enqueue Job
	err1 := pool.Enqueue(ctx, job)
	if err1 != nil {
		log.Printf("Job was not enqueued. Error: [%s]", err1.Error())
		return
	}

	log.Printf("Job Enqueued and started processing")
	log.Printf("Job Done, Received: %v", <-resultChan1)
}
Output:
Example (Three) ¶
Example 3 - Enqueue 10 Jobs and Stop pool mid-processing.
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/sherifabdlnaby/gpool"
)

func main() {
	// Create and start pool.
	pool := gpool.NewPool(2)
	defer pool.Stop()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	go func() {
		for i := 0; i < 10; i++ {
			// Small interval for more readable output
			time.Sleep(500 * time.Millisecond)
			go func(i int) {
				x := make(chan int, 1)
				log.Printf("Job [%v] Enqueueing", i)
				err := pool.Enqueue(ctx, func() {
					time.Sleep(2000 * time.Millisecond)
					x <- i
				})
				if err != nil {
					log.Printf("Job [%v] was not enqueued. [%s]", i, err.Error())
					return
				}
				log.Printf("Job [%v] Enqueued", i)
				log.Printf("Job [%v] Received, Result: [%v]", i, <-x)
			}(i)
		}
	}()

	// Uncomment to demonstrate ctx cancel of jobs.
	//time.Sleep(100 * time.Millisecond)
	//cancel()

	time.Sleep(5000 * time.Millisecond)

	fmt.Println("Stopping...")
	pool.Stop()
	fmt.Println("Stopped")
	fmt.Println("Sleeping for a couple of seconds so canceled jobs have a chance to print their status")
	time.Sleep(4000 * time.Millisecond)
}
Output:
Example (Two) ¶
Example 2 - Enqueue A Job with Timeout
package main

import (
	"context"
	"log"
	"time"

	"github.com/sherifabdlnaby/gpool"
)

func main() {
	concurrency := 2

	// Create and start pool.
	pool := gpool.NewPool(concurrency)
	defer pool.Stop()

	// Create JOB
	resultChan := make(chan int)
	ctx := context.Background()
	job := func() {
		resultChan <- 1337
	}

	// Enqueue 2 Jobs to fill pool (Will not finish unless we pull result from resultChan)
	_ = pool.Enqueue(ctx, job)
	_ = pool.Enqueue(ctx, job)

	ctxWithTimeout, cancel := context.WithTimeout(ctx, 1000*time.Millisecond)
	defer cancel()

	// Will block for 1 second only because of Timeout
	err1 := pool.Enqueue(ctxWithTimeout, job)
	if err1 != nil {
		log.Printf("Job was not enqueued. Error: [%s]", err1.Error())
	}

	log.Printf("Job 1 Done, Received: %v", <-resultChan)
	log.Printf("Job 2 Done, Received: %v", <-resultChan)
}
Output:
Index ¶
- Variables
- type Pool
- func (w *Pool) Enqueue(ctx context.Context, job func()) error
- func (w *Pool) EnqueueAndWait(ctx context.Context, job func()) error
- func (w *Pool) GetCurrent() int
- func (w *Pool) GetSize() int
- func (w *Pool) GetWaiting() int
- func (w *Pool) Resize(newSize int)
- func (w *Pool) Start()
- func (w *Pool) Stop()
- func (w *Pool) TryEnqueue(job func()) bool
- func (w *Pool) TryEnqueueAndWait(job func()) bool
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrPoolInvalidSize returned if the size of the pool is < 1.
	ErrPoolInvalidSize = errors.New("pool size is invalid, pool size must be >= 0")

	// ErrPoolStopped returned if the pool has not started yet, or was stopped.
	ErrPoolStopped = errors.New("pool is stopped")
)
Functions ¶
This section is empty.
Types ¶
type Pool ¶
type Pool struct {
// contains filtered or unexported fields
}
Pool is an implementation of gpool.Pool interface to bound concurrency using a Semaphore.
func NewPool ¶ added in v1.0.0
NewPool returns a pool that uses a semaphore implementation. Returns ErrPoolInvalidSize if size is < 1.
func (*Pool) Enqueue ¶
Enqueue processes job `func(){...}` and returns ONCE the func has started executing (not after it ends/returns).

If the pool is full, `Enqueue()` will block until either:

1- A worker/slot in the pool is done and ready to take the job.
2- The job's context is canceled.
3- The pool is closed by `pool.Stop()`.

@Returns nil once the job has started executing.
@Returns ErrPoolStopped if the pool is not running.
@Returns ctx.Err() if the job's context was canceled before the job could be processed by the pool.
func (*Pool) EnqueueAndWait ¶ added in v1.0.0
EnqueueAndWait processes job `func(){...}` and returns ONCE the func has returned.

If the pool is full, `EnqueueAndWait()` will block until either:

1- A worker/slot in the pool is done and ready to take the job.
2- The job's context is canceled.
3- The pool is closed by `pool.Stop()`.

@Returns nil once the job has executed and returned.
@Returns ErrPoolStopped if the pool is not running.
@Returns ctx.Err() if the job's context was canceled before the job could be processed by the pool.
func (*Pool) GetCurrent ¶ added in v1.0.0
GetCurrent returns the number of jobs currently being processed by the pool.
func (*Pool) GetWaiting ¶ added in v1.0.0
GetWaiting returns the number of jobs currently waiting (blocked) to enter the pool.
func (*Pool) Resize ¶ added in v0.2.0
Resize resizes the pool in a concurrency-safe way.

`Resize` can enlarge the pool, and any blocked enqueue will unblock after the pool is resized; when shrinking the pool, `Resize` will not affect any job that is already processing.
func (*Pool) Start ¶
func (w *Pool) Start()
Start the pool; until it is started, the pool will not accept any job.

Subsequent calls to Start have no effect unless Stop() is called.
func (*Pool) Stop ¶
func (w *Pool) Stop()
Stop the Pool.
1- All blocked/waiting enqueues return immediately (with ErrPoolStopped).
2- All jobs currently processing will finish successfully.
3- Stop() will block until all running jobs are done.

Subsequent calls to Stop() have no effect unless Start() is called.
func (*Pool) TryEnqueue ¶
TryEnqueue will not block if the pool is full; it returns true once the job has started processing, or false if the pool is stopped or full.
func (*Pool) TryEnqueueAndWait ¶ added in v1.0.0
TryEnqueueAndWait will not block if the pool is full; it returns true once the job has finished processing, or false if the pool is stopped or full.