optimizer

package module
v1.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 8, 2022 License: MIT Imports: 6 Imported by: 0

README

Optimizer

WorkerQueue

工作队列, 可以不断往里面添加不同种类的任务, 一旦有资源空闲就去执行

package main

import (
	"context"
	"fmt"
	"github.com/lxzan/optimizer"
	"time"
)

// Add sums the integers in args and prints the arguments together with the
// total in the form "args=<args>, ans=<sum>".
//
// args must hold a []int. Any other dynamic type (including a nil interface)
// is reported through the returned error rather than by panicking on an
// unchecked type assertion.
func Add(args interface{}) error {
	arr, ok := args.([]int)
	if !ok {
		return fmt.Errorf("add: expected []int, got %T", args)
	}
	ans := 0
	for _, item := range arr {
		ans += item
	}
	fmt.Printf("args=%v, ans=%d\n", args, ans)
	return nil
}

// Mul multiplies the integers in args and prints the arguments together with
// the product in the form "args=<args>, ans=<product>". An empty slice yields
// a product of 1.
//
// args must hold a []int. Any other dynamic type (including a nil interface)
// is reported through the returned error rather than by panicking on an
// unchecked type assertion.
func Mul(args interface{}) error {
	arr, ok := args.([]int)
	if !ok {
		return fmt.Errorf("mul: expected []int, got %T", args)
	}
	ans := 1
	for _, item := range arr {
		ans *= item
	}
	fmt.Printf("args=%v, ans=%d\n", args, ans)
	return nil
}

// main queues four jobs (Add and Mul over two integer slices) on a worker
// queue created with a concurrency of 8, then stops the queue and waits for
// the jobs to finish.
func main() {
	args1 := []int{1, 3}
	args2 := []int{1, 3, 5}
	w := optimizer.NewWorkerQueue(context.Background(), 8)
	w.Push(
		optimizer.Job{Args: args1, Do: Add},
		optimizer.Job{Args: args1, Do: Mul},
		optimizer.Job{Args: args2, Do: Add},
		optimizer.Job{Args: args2, Do: Mul},
	)
	// In this version StopAndWait accepts a single timeout argument
	// (func (c *WorkerQueue) StopAndWait(timeout time.Duration)).
	w.StopAndWait(30 * time.Second)
}
args=[1 3], ans=4
args=[1 3 5], ans=15
args=[1 3], ans=3
args=[1 3 5], ans=9
WorkerGroup

工作组, 添加一组任务, 等待任务完全被执行

package main

import (
	"context"
	"fmt"
	"github.com/lxzan/optimizer"
	"sync/atomic"
)

// main pushes the integers 1 through 100 into a worker group created with a
// concurrency of 8, adds each one to a shared total from the OnMessage
// handler, waits for the group to finish, and prints the total.
func main() {
	var total int64
	group := optimizer.NewWorkerGroup(context.Background(), 8)
	for n := int64(1); n <= 100; n++ {
		group.Push(n)
	}
	// The handler may run concurrently, so the total is updated atomically.
	group.OnMessage = func(item interface{}) error {
		atomic.AddInt64(&total, item.(int64))
		return nil
	}
	group.StartAndWait()
	fmt.Printf("sum=%d\n", total)
}
sum=5050

Documentation

Index

Constants

This section is empty.

Variables

View Source
// DefaultWorker is a package-level WorkerQueue built with context.Background()
// and a concurrency of 16.
var DefaultWorker = NewWorkerQueue(context.Background(), 16)

Functions

This section is empty.

Types

type Job added in v1.1.0

// Job is a unit of work accepted by WorkerQueue.Push.
type Job struct {
	// Args is the value handed to Do when the job runs.
	Args interface{}
	// Do is the function executed for the job; it receives Args and reports
	// failure through its error result.
	Do   func(args interface{}) error
}

type Queue

// Queue is a container created by NewQueue. It embeds sync.Mutex, so Lock and
// Unlock are promoted onto the Queue itself; its remaining fields are
// unexported.
type Queue struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func NewQueue

func NewQueue() *Queue

func (*Queue) All added in v1.1.0

func (c *Queue) All() []interface{}

All 返回所有数据并清空队列

func (*Queue) Front

func (c *Queue) Front() interface{}

func (*Queue) Len

func (c *Queue) Len() int

func (*Queue) Push

func (c *Queue) Push(eles ...interface{})

type WorkerGroup added in v1.1.0

// WorkerGroup collects a batch of tasks via Push and processes them when
// StartAndWait is called. It embeds sync.Mutex; its remaining state is
// unexported.
type WorkerGroup struct {
	sync.Mutex

	// OnMessage is the handler applied to each pushed element; the element is
	// passed as options.
	// NOTE(review): a non-nil return is presumably what Err later reports —
	// confirm against the implementation.
	OnMessage func(options interface{}) error
	// contains filtered or unexported fields
}

func NewWorkerGroup added in v1.1.0

func NewWorkerGroup(ctx context.Context, concurrency int64) *WorkerGroup

NewWorkerGroup 新建一个任务集。concurrency: 最大并发协程数量。

func (*WorkerGroup) Err added in v1.1.0

func (c *WorkerGroup) Err() error

Err 获取错误返回

func (*WorkerGroup) Len added in v1.1.0

func (c *WorkerGroup) Len() int

Len 获取队列中剩余任务数量

func (*WorkerGroup) Push added in v1.1.0

func (c *WorkerGroup) Push(eles ...interface{})

Push 往任务队列中追加任务

func (*WorkerGroup) StartAndWait added in v1.1.0

func (c *WorkerGroup) StartAndWait()

StartAndWait 启动并等待所有任务执行完成

type WorkerQueue added in v1.1.0

// WorkerQueue is a job queue created by NewWorkerQueue; jobs are added with
// Push and the queue is shut down with StopAndWait.
type WorkerQueue struct {
	// OnError is a callback that receives an error value.
	// NOTE(review): presumably invoked when a Job's Do returns a non-nil
	// error — confirm against the implementation.
	OnError func(err error)
	// contains filtered or unexported fields
}

func NewWorkerQueue added in v1.1.0

func NewWorkerQueue(ctx context.Context, concurrency int64) *WorkerQueue

NewWorkerQueue 创建一个工作队列。concurrency: 最大并发协程数量。

func (*WorkerQueue) Push added in v1.1.0

func (c *WorkerQueue) Push(jobs ...Job)

Push 追加任务, 有资源空闲的话会立即执行

func (*WorkerQueue) StopAndWait added in v1.1.0

func (c *WorkerQueue) StopAndWait(timeout time.Duration)

StopAndWait 优雅退出。timeout: 超时时间。

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL