grpool

package module
v1.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 15, 2023 License: MIT Imports: 9 Imported by: 0

README

grpool

Introduction

grpool is a groutine pool which can provide a fixed size of capacity, recycle the stale workers.

Installation

go get -u github.com/POABOB/grpool

How to use

If you want to process a massive number of jobs, you don't need to use the same number of goroutines. It's wasteful to use excessive memory, leading to high consumption.

A Simple Example with Default Pool
package main

import (
	"fmt"
	"strconv"
	"sync"
	"sync/atomic"

	"github.com/POABOB/grpool"
)

var wg sync.WaitGroup
var count int32 = 0

func main() {
	// Use default Pool Will use the math.MaxInt32 of capacity.
	pool := grpool.NewDefaultPool()
	// Release worker resource
	defer pool.Release()

	for i := 0; i < 10000; i++ {
		wg.Add(1)
		_ = pool.Schedule(func() {
			printFunc(i)
		})
	}

	wg.Wait()
    
	fmt.Printf("running goroutines: %d\n", pool.Running())
}

func printFunc(i int) {
	atomic.AddInt32(&count, 1)
	fmt.Println("Hello World! I am worker" + strconv.Itoa(i) + " from goroutine pool.")
	wg.Done()
}
Limit the goroutines and pre-alloc it
package main

import (
	"fmt"
	"strconv"
	"sync"
	"sync/atomic"

	"github.com/POABOB/grpool"
)

var count int32 = 0
var wg sync.WaitGroup

func main() {
	// init a capacity of 18 goroutine pool with preallocing the space.
	pool, _ := grpool.NewPool(18, grpool.WithPreAlloc(true))
	// Release worker resource
	defer pool.Release()

	for i := 0; i < 10; i++ {
		wg.Add(1)
		_ = pool.Schedule(func() {
			printFunc(i)
		})
	}

	wg.Wait()

	fmt.Printf("running goroutines: %d\n", pool.Running())
}

func printFunc(i int) {
	atomic.AddInt32(&count, 1)
	fmt.Println("Hello World! I am worker" + strconv.Itoa(i) + " from goroutine pool.")
	wg.Done()
}
Use non-blocking pool
pool, err := grpool.NewPool(1000, grpool.WithNonblocking(true))
Customize panic handler
func ph(v interface{}) {
    // dosomething...
	fmt.Printf("[panic occurred]: %v\n", v)
}
pool, err := grpool.NewPool(1000, grpool.WithPanicHandler(ph))
Customize the time interval of clear stale worker
pool, err := grpool.NewPool(1000, grpool.WithExpiryDuration(time.Second * 5))

License

MIT License

Documentation

Index

Constants

View Source
const (
	// 預設 Pool 最大容量
	DefaultPoolSize = math.MaxInt32

	// 預設每 1 秒清理一次 Pool
	DefaultCleanIntervalTime = time.Second
)
View Source
const (
	// Pool 開啟
	OPENED = 0

	// Pool 關閉
	CLOSED = 1
)

Pool 狀態

Variables

View Source
var (
	ErrLackPoolFunc        = errors.New("must provide func for pool")
	ErrInvalidPoolExpiry   = errors.New("invalid pool expiry")
	ErrPoolClosed          = errors.New("pool has been closed")
	ErrPoolOverload        = errors.New("too many goroutines blocked or Nonblocking is set")
	ErrInvalidPreAllocSize = errors.New("can not set up a negative capacity under PreAlloc mode")
	ErrTimeout             = errors.New("operation timed out")
)

定義各種錯誤

Functions

This section is empty.

Types

type Option

type Option func(opts *Options)

參數設定

func WithDisableClear

func WithDisableClear(disable bool) Option

是否要關閉 Clear

func WithExpiryDuration

func WithExpiryDuration(expiryDuration time.Duration) Option

設定過期時間

func WithNonblocking

func WithNonblocking(nonblocking bool) Option

若為 true,代表沒有有用的 Worker 時,會直接 ErrPoolOverload

func WithOptions

func WithOptions(options Options) Option

直接傳入 Options

func WithPanicHandler

func WithPanicHandler(panicHandler func(interface{})) Option

Panic 事件處理

func WithPreAlloc

func WithPreAlloc(preAlloc bool) Option

設定是否要提前創建空間

type Options

type Options struct {
	// 過期時間: 用於定時清理過期的 Worker (只要太久沒被使用的 Worker 就會被清理),預設為 1 秒
	ExpiryDuration time.Duration

	// 是否提前申請空間,大量執行需求中使用
	PreAlloc bool

	// Nonblocking 用來阻塞任務
	// 若設定為 true,就會返回 ErrPoolOverload 錯誤
	Nonblocking bool

	// 用來處理 worker panic 發生的事件
	PanicHandler func(interface{})

	// 若設定為 true,Worker 就不會被自動清除
	DisableClear bool
}

該設定會被用來引入至 Pool 中

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

func NewDefaultPool

func NewDefaultPool() (defaultPool *Pool)

初始化一個預設的 Pool

func NewPool

func NewPool(size int, options ...Option) (*Pool, error)

初始化

func (*Pool) Cap

func (p *Pool) Cap() int

獲取 Pool 容量

func (*Pool) ClearStaleWorkers

func (p *Pool) ClearStaleWorkers(ctx context.Context)

定時清理過期的 workers

func (*Pool) Free

func (p *Pool) Free() int

Free returns the number of available goroutines to work, -1 indicates this pool is unlimited.

func (*Pool) IsClosed

func (p *Pool) IsClosed() bool

判斷是否被關閉

func (*Pool) Reboot

func (p *Pool) Reboot()

重啟一個可以使用的 Pool

func (*Pool) Release

func (p *Pool) Release()

清除 Pool 裡面的 Worker

func (*Pool) Running

func (p *Pool) Running() int

獲取正在執行的 Worker 數量

func (*Pool) Schedule

func (p *Pool) Schedule(task func()) error

獲取 worker 執行任務

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL