taskpool

package module
v1.2.11 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 13, 2024 License: MIT Imports: 10 Imported by: 1

README

1. 介绍

  • 支持预分配协程和用多少申请多少
  • 分配的协程都有一个生命周期,生命周期到了会被自动回收
  • 协程池最少有一个哨兵协程,最多有 maxWork + 1(哨兵)
  • 如果要关闭协程池需要手动释放, 异常情况时会自动释放

2. 使用

go get -u gitee.com/xuesongtao/taskpool
    pushPool := lib.NewTaskPool("poolName", 10, lib.WithProGoWorker())
    defer pushPool.SafeClose() // 局部使用需要使用这个进行 Close()

    everyTaskHandleSum := 500
    l := 50000
    pushIds := []string{xxx}
    totalPage := math.Ceil(float64(l) / float64(everyTaskHandleSum))
    for page := 1; page <= int(totalPage); page++ {
        // 根据切片分页
        startIndex, endIndex := lib.GetPapeSliceIndex(int32(page), int32(everyTaskHandleSum), l)
        tmpArr := pushIds[startIndex:endIndex]
        i.log.Infof("startIndex: %d, endIndex: %d, total: %d", startIndex, endIndex, l)

        // 阻塞式
        pushPool.Submit(func() {
            i.PushBatchMsg(tmpArr, info)
        })
    }
  • 其他: 在使用时需要注意闭包引用问题, 在 _example 下添加使用示例

最后

  • 欢迎大佬们指正, 希望大佬给 star

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Logger added in v1.2.0

type Logger interface {
	Info(v ...interface{})
	Infof(format string, v ...interface{})
	Error(v ...interface{})
	Errorf(format string, v ...interface{})
}

type TaskPool

type TaskPool struct {
	// contains filtered or unexported fields
}

TaskPool 任务池

func NewTaskPool

func NewTaskPool(poolName string, capacity int, opts ...TaskPoolOption) *TaskPool

NewTaskPool 通过此方法内部创建 ctx, 需要通过 Close() 来关闭协程池, 防止协程泄露

func (*TaskPool) Blocking

func (t *TaskPool) Blocking() int32

Blocking 获取阻塞的 worker 数量

func (*TaskPool) Close

func (t *TaskPool) Close()

Close 关闭协程池,

注意:

  1. 每次调用完一定要释放
  2. 局部使用推荐使用 SafeClose, 防止任务未执行完就退出

func (*TaskPool) FreeWorkerQueueLen

func (t *TaskPool) FreeWorkerQueueLen() int

FreeWorkerQueueLen 空闲队列池里的长度

func (*TaskPool) Running

func (t *TaskPool) Running() int32

Running 获取运行 worker 数量

func (*TaskPool) SafeClose added in v1.2.0

func (t *TaskPool) SafeClose(timeout ...time.Duration)

SafeClose 安全的关闭, 这样可以保证未处理的任务都执行完 注: 只能阻塞同步提交的任务

func (*TaskPool) Submit

func (t *TaskPool) Submit(task taskFunc, async ...bool)

Submit 对外通过此方法向协程池添加任务 使用:

  1. 如果任务为 func() 的话可以直接传入,
  2. 如果带参的 func 需要包裹下, 如: test(1, 2, 3) => func() {test(1, 2, 3)}

注: 调用 SafeClose(局部调用)的场景, 使用异步提交的时候会失败

type TaskPoolOption

type TaskPoolOption func(p *TaskPool)

func WithCtx added in v1.2.10

func WithCtx(ctx context.Context) TaskPoolOption

WithCtx 外部设置 content.Context

func WithPolTime

func WithPolTime(t time.Duration) TaskPoolOption

WithPolTime 设置 taskPool 中哨兵轮询地时间

func WithPoolLogger

func WithPoolLogger(logger Logger) TaskPoolOption

WithPoolLogger 设置日志 log

func WithPoolPrint added in v1.2.6

func WithPoolPrint(print bool) TaskPoolOption

WithPoolPrint 设置是否打印 log

func WithProGoWorker

func WithProGoWorker() TaskPoolOption

WithProGoWorker 预分配协程

func WithWorkerMaxLifeCycle

func WithWorkerMaxLifeCycle(timeForSec sec) TaskPoolOption

WithWorkerMaxLifeCycle 设置 taskPool 中空闲的 worker 存活的时间

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL