chanque

package module
v1.0.22 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 26, 2023 License: MIT Imports: 17 Imported by: 2

README

chanque

MIT License GoDoc Go Report Card Releases

chanque provides simple framework for asynchronous programming and goroutine management and safe use of channel.

Installation

$ go get github.com/octu0/chanque

Usage

Queue

Queue implementation.
It provides blocking and non-blocking methods, as well as panic handling of channels.

func main() {
	que1 := chanque.NewQueue(10)
	defer que1.Close()

	go func() {
		for {
			val := que1.Dequeue()
			fmt.Println(val.(string))
		}
	}()
	if ok := que1.Enqueue("hello"); ok {
		fmt.Println("enqueue")
	}

	que2 := chanque.NewQueue(10,
		QueuePanicHandler(func(pt chanque.PanicType, rcv interface{}) {
			fmt.Println("panic occurred", rcv.(error))
		}),
	)
	defer que2.Close()
	if ok := que2.EnqueueNB("world w/ non-blocking enqueue"); ok {
		fmt.Println("enqueue")
	}
}
Executor

WorkerPool implementation,
which limits the number of concurrent executions of goroutines and creates goroutines as needed,
and can also be used as goroutine resource management.

func main() {
	// minWorker 1 maxWorker 2
	exec := chanque.NewExecutor(1, 2)
	defer exec.Release()

	exec.Submit(func() {
		fmt.Println("job1")
		time.Sleep(1 * time.Second)
	})
	exec.Submit(func() {
		fmt.Println("job2")
		time.Sleep(1 * time.Second)
	})

	// Blocking because it became the maximum number of workers,
	// executing when there are no more workers running
	exec.Submit(func() {
		fmt.Println("job3")
	})

	// Generate goroutines on demand up to the maximum number of workers.
	// Submit does not block up to the size of MaxCapacity
	// Workers that are not running are recycled to minWorker at the time of ReduceInterval.
	exec2 := chanque.NewExecutor(10, 50,
		chanque.ExecutorMaxCapacicy(1000),
		chanque.ExecutorReducderInterval(60*time.Second),
	)
	defer exec2.Release()

	for i := 0; i < 100; i += 1 {
		exec2.Submit(func(i id) func() {
			return func() {
				fmt.Println("heavy process", id)
				time.Sleep(100 * time.Millisecond)
				fmt.Println("done process", id)
			}
		}(i))
	}

	// On-demand tune min/max worker size
	exec.TuneMaxWorker(10)
	exec.TuneMinWorker(5)
}
Worker

Worker implementation for asynchronous execution, register WorkerHandler and execute it with Enqueue parameter.
Enqueue of parameter is blocked while WorkerHandler is running.
There is also a BufferWorker implementation that non-blocking enqueue during asynchronous execution.

func main() {
	handler := func(param interface{}) {
		if s, ok := param.(string); ok {
			fmt.Println(s)
		}
		time.Sleep(1 * time.Second)
	}

	// DefaultWorker executes in order, waiting for the previous one
	w1 := chanque.NewDefaultWorker(handler)
	defer w1.Shutdown()

	go func() {
		w1.Enqueue("hello")
		w1.Enqueue("world") // blocking during 1 sec
	}()

	w2 := chanque.NewBufferWorker(handler)
	defer w2.Shutdown()

	go func() {
		w2.Enqueue("hello")
		w2.Enqueue("world") // non-blocking
	}()

	// BufferWorker provides helpers for performing sequential operations
	// by using PreHook and PostHook to perform the operations collectively.
	pre := func() {
		db.Begin()
	}
	post := func() {
		db.Commit()
	}
	hnd := func(param interface{}) {
		db.Insert(param.(string))
	}
	w3 := chanque.NewBufferWorker(hnd,
		WorkerPreHook(pre),
		WorkerPostHook(post),
	)
	for i := 0; i < 100; i += 1 {
		w3.Enqueue(strconv.Itoa(i))
	}
	w3.ShutdownAndWait()
}
Parallel

Parallel provides for executing in parallel and acquiring the execution result.
extended implementation of Worker.

func main() {
	executor := chanque.NewExecutor(10, 100)
	defer executor.Release()

	para := chanque.NewParallel(
		executor,
		chanque.Parallelism(2),
	)
	para.Queue(func() (interface{}, error) {
		return "#1 result", nil
	})
	para.Queue(func() (interface{}, error) {
		return "#2 result", nil
	})
	para.Queue(func() (interface{}, error) {
		return nil, errors.New("#3 error")
	})

	future := para.Submit()
	for _, r := range future.Result() {
		if r.Value() != nil {
			println("result:", r.Value().(string))
		}
		if r.Err() != nil {
			println("error:", r.Err().Error())
		}
	}
}
Retry

Retry provides function retry based on the exponential backoff algorithm.

func main() {
	retry := chanque.NewRetry(
		chanque.RetryMax(10),
		chanque.RetryBackoffIntervalMin(100*time.Millisecond),
		chanque.RetryBackoffIntervalMax(30*time.Second),
	)
	future := retry.Retry(func(ctx context.Context) (interface{}, error) {
		req, _ := http.NewRequest("GET", url, nil)
		client := &http.Client{}
		resp, err := client.Do(req)
		if err != nil {
			return nil, err
		}
		...
		return ioutil.ReadAll(resp.Body), nil
	})
	r := future.Result()
	if err := r.Err(); err != nil {
		panic(err.Error())
	}
	fmt.Printf("GET resp = %s", r.Value().([]byte))
}
Wait

Wait provides wait handling like sync.WaitGroup and context.Done.
Provides implementations for patterns that run concurrently, wait for multiple processes, wait for responses, and many other use cases.

func one() {
	w := WaitOne()
	defer w.Cancel()

	go func(w *Wait) {
		defer w.Done()

		fmt.Println("heavy process")
	}(w)

	w.Wait()
}

func any() {
	w := WaitN(10)
	defer w.Cancel()

	for i := 0; i < 10; i += 1 {
		go func(w *Wait) {
			defer w.Done()

			fmt.Println("N proc")
		}(w)
	}
	w.Wait()
}

func sequencial() {
	w1, := WaitOne()
	defer w1.Cancel()
	go Preprocess(w1)

	w2 := WaitOne()
	defer w2.Cancel()
	go Preprocess(w2)

	ws := WaitSeq(w1, w2)
	defer ws.Cancel()

	// Wait for A.Done() -> B.Done() -> ... N.Done() ordered
	ws.Wait()
}

func rendezvous() {
	wr := WaitRendez(2)
	defer wr.Cancel()

	go func() {
		if err := wr.Wait(); err != nil {
			fmt.Println("timeout or cancel")
			return
		}
		fmt.Println("run sync")
	}()
	go func() {
		if err := wr.Wait(); err != nil {
			fmt.Println("timeout or cancel")
			return
		}
		fmt.Println("run sync")
	}()
}

func req() {
	wreq := WaitReq()
	defer wreq.Cancel()

	go func() {
		if err := wreq.Req("hello world"); err != nil {
			fmt.Println("timeout or cancel")
		}
		fmt.Println("send req")
	}()

	v, err := wreq.Wait()
	if err != nil {
		fmt.Println("timeout or cancel")
	}
	fmt.Println(v.(string)) // => "hello world"
}

func reqreply() {
	wrr := WaitReqReply()
	go func() {
		v, err := wrr.Req("hello")
		if err != nil {
			fmt.Println("timeout or cancel")
		}
		fmt.Println(v.(string)) // => "hello world2"
	}()
	go func() {
		err := wrr.Reply(func(v interface{}) (interface{}, err) {
			s := v.(string)
			return s + " world2", nil
		})
		if err != nil {
			fmt.Println("timeout or cancel")
		}
	}()
}
Loop

Loop provides safe termination of an infinite loop by goroutine.
You can use callbacks with Queue and time.Ticker.

func newloop() {
	e := NewExecutor(1, 10)

	queue := NewQueue(0)

	loop := NewLoop(e)
	loop.SetDequeue(func(val interface{}, ok bool) chanque.LoopNext {
		if ok != true {
			// queue closed
			return chanque.LoopNextBreak
		}
		println("queue=", val.(string))
		return chanque.LoopNextContinue
	}, queue)

	loop.ExecuteTimeout(10 * time.Second)

	go func() {
		queue.Enqueue("hello1")
		queue.Enqueue("hello2")
		time.Sleep(1 * time.Second)
		queue.EnqueueNB("world") // Enqueue / EnqueueNB / EnqueueRetry
	}()
	go func() {
		time.Sleep(1 * time.Second)
		loop.Stop() // done for loop
	}()
}
Pipeline

Pipeline provides sequential asynchronous input and output. Execute func combination asynchronously

func main() {
	calcFn := func(parameter interface{}) (interface{}, error) {
		// heavy process
		time.Sleep(1 * time.Second)

		if val, ok := parameter.(int); ok {
			return val * 2, nil
		}
		return -1, fmt.Errorf("invalid parameter")
	}
	outFn := func(result interface{}, err error) {
		if err != nil {
			fmt.Fatal(err)
			return
		}

		fmt.Println("value =", parameter.(int))
	}

	pipe := chanque.NewPipeline(calcFn, outFn)
	pipe.Enqueue(10)
	pipe.Enqueue(20)
	pipe.Enqueue(30)
	pipe.ShutdownAndWait()
}

Documentation

https://godoc.org/github.com/octu0/chanque

Benchmark

go func() vs Executor

$ go test -v -run=BenchmarkExecutor -bench=BenchmarkExecutor -benchmem  ./
goos: darwin
goarch: amd64
pkg: github.com/octu0/chanque
BenchmarkExecutor/goroutine-8         	 1000000	      2306 ns/op	     544 B/op	       2 allocs/op
BenchmarkExecutor/executor/100-1000-8 	  952410	      1252 ns/op	      16 B/op	       1 allocs/op
BenchmarkExecutor/executor/1000-5000-8    795402	      1327 ns/op	      18 B/op	       1 allocs/op
--- BENCH: BenchmarkExecutor
    executor_test.go:19: goroutine           	TotalAlloc=546437344	StackInUse=1996357632
    executor_test.go:19: executor/100-1000   	TotalAlloc=25966144	StackInUse=-1993277440
    executor_test.go:19: executor/1000-5000  	TotalAlloc=16092752	StackInUse=7012352
PASS
ok  	github.com/octu0/chanque	6.935s

License

MIT, see LICENSE file for details.

Documentation

Index

Constants

View Source
const (
	Version string = "1.0.22"
)

Variables

View Source
var (
	ErrRetryNotComplete    = errors.New("RetryFuture retry not complete")
	ErrRetryContextTimeout = errors.New("Retry context timeout or canceled")
)
View Source
var (
	ErrPipeClosed = errors.New("pipe queue closed")
)
View Source
var (
	RetryRand *rand.Rand = rand.New(rand.NewSource(time.Now().UnixNano()))
)

Functions

This section is empty.

Types

type Backoff added in v1.0.11

type Backoff struct {
	// contains filtered or unexported fields
}

func NewBackoff added in v1.0.11

func NewBackoff(min, max time.Duration) *Backoff

func NewBackoffNoJitter added in v1.0.11

func NewBackoffNoJitter(min, max time.Duration) *Backoff

func (*Backoff) Next added in v1.0.11

func (b *Backoff) Next() time.Duration

func (*Backoff) Reset added in v1.0.11

func (b *Backoff) Reset()

type Executor

type Executor struct {
	// contains filtered or unexported fields
}

func NewExecutor

func NewExecutor(minWorker, maxWorker int, funcs ...ExecutorOptionFunc) *Executor

func (*Executor) CurrentStacktrace added in v1.0.20

func (e *Executor) CurrentStacktrace() []byte

func (*Executor) ForceStop

func (e *Executor) ForceStop()

func (*Executor) MaxWorker added in v1.0.6

func (e *Executor) MaxWorker() int

func (*Executor) MinWorker added in v1.0.6

func (e *Executor) MinWorker() int

func (*Executor) Release

func (e *Executor) Release()

release goroutines

func (*Executor) ReleaseAndWait

func (e *Executor) ReleaseAndWait()

func (*Executor) Running

func (e *Executor) Running() int32

return num of running workers

func (*Executor) SubExecutor added in v1.0.1

func (e *Executor) SubExecutor() *SubExecutor

func (*Executor) Submit

func (e *Executor) Submit(fn Job)

enqueue job

func (*Executor) TuneMaxWorker added in v1.0.8

func (e *Executor) TuneMaxWorker(nextMaxWorker int)

func (*Executor) TuneMinWorker added in v1.0.8

func (e *Executor) TuneMinWorker(nextMinWorker int)

func (*Executor) Workers

func (e *Executor) Workers() int32

return num of goroutines

type ExecutorOptionFunc

type ExecutorOptionFunc func(*optExecutor)

func ExecutorCollectStacktrace added in v1.0.20

func ExecutorCollectStacktrace(enable bool) ExecutorOptionFunc

func ExecutorContext

func ExecutorContext(ctx context.Context) ExecutorOptionFunc

func ExecutorMaxCapacity

func ExecutorMaxCapacity(capacity int) ExecutorOptionFunc

func ExecutorPanicHandler

func ExecutorPanicHandler(handler PanicHandler) ExecutorOptionFunc

func ExecutorReducderInterval

func ExecutorReducderInterval(interval time.Duration) ExecutorOptionFunc

type Job

type Job func()

type Loop added in v1.0.7

type Loop struct {
	// contains filtered or unexported fields
}

func NewLoop added in v1.0.7

func NewLoop(e *Executor, funcs ...LoopOptionFunc) *Loop

func (*Loop) Execute added in v1.0.7

func (lo *Loop) Execute()

func (*Loop) ExecuteTimeout added in v1.0.7

func (lo *Loop) ExecuteTimeout(timeout time.Duration)

func (*Loop) SetDefault added in v1.0.7

func (lo *Loop) SetDefault(h LoopDefaultHandler)

func (*Loop) SetDequeue added in v1.0.7

func (lo *Loop) SetDequeue(h LoopDequeueHandler, q *Queue)

func (*Loop) SetTicker added in v1.0.7

func (lo *Loop) SetTicker(h LoopTickerHandler, dur time.Duration)

func (*Loop) Stop added in v1.0.7

func (lo *Loop) Stop()

func (*Loop) StopAndWait added in v1.0.7

func (lo *Loop) StopAndWait()

type LoopDefaultHandler added in v1.0.7

type LoopDefaultHandler func() LoopNext

type LoopDequeueHandler added in v1.0.7

type LoopDequeueHandler func(val interface{}, ok bool) LoopNext

type LoopNext added in v1.0.7

type LoopNext uint8
const (
	LoopNextContinue LoopNext = iota + 1
	LoopNextBreak
)

type LoopOptionFunc added in v1.0.7

type LoopOptionFunc func(*optLoop)

func LoopContext added in v1.0.7

func LoopContext(ctx context.Context) LoopOptionFunc

type LoopTickerHandler added in v1.0.7

type LoopTickerHandler func() LoopNext

type PanicHandler

type PanicHandler func(PanicType, interface{})

type PanicType

type PanicType uint8
const (
	PanicTypeEnqueue PanicType = iota + 1
	PanicTypeDequeue
	PanicTypeClose
)

func (PanicType) String

func (p PanicType) String() string

type Parallel added in v1.0.10

type Parallel struct {
	// contains filtered or unexported fields
}

func NewParallel added in v1.0.10

func NewParallel(e *Executor, funcs ...ParallelOptionFunc) *Parallel

func (*Parallel) Queue added in v1.0.10

func (p *Parallel) Queue(fn ParallelJob)

func (*Parallel) Submit added in v1.0.10

func (p *Parallel) Submit() *ParallelFuture

type ParallelFuture added in v1.0.10

type ParallelFuture struct {
	// contains filtered or unexported fields
}

func (*ParallelFuture) Result added in v1.0.10

func (f *ParallelFuture) Result() []ValueError

type ParallelFutureResult added in v1.0.10

type ParallelFutureResult struct {
	// contains filtered or unexported fields
}

func (*ParallelFutureResult) Results added in v1.0.10

func (r *ParallelFutureResult) Results() []ValueError

type ParallelJob added in v1.0.10

type ParallelJob func() (result interface{}, err error)

type ParallelOptionFunc added in v1.0.10

type ParallelOptionFunc func(*optParallel)

func ParallelContext added in v1.0.10

func ParallelContext(ctx context.Context) ParallelOptionFunc

func Parallelism added in v1.0.10

func Parallelism(p int) ParallelOptionFunc

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

func NewPipeline

func NewPipeline(inFunc PipelineInputFunc, outFunc PipelineOutputFunc, opts ...PipelineOptionFunc) *Pipeline

func (*Pipeline) CloseEnqueue

func (p *Pipeline) CloseEnqueue() bool

func (*Pipeline) Enqueue

func (p *Pipeline) Enqueue(parameter interface{}) bool

func (*Pipeline) Shutdown

func (p *Pipeline) Shutdown()

func (*Pipeline) ShutdownAndWait

func (p *Pipeline) ShutdownAndWait()

type PipelineInputFunc

type PipelineInputFunc func(parameter interface{}) (result interface{}, err error)

type PipelineOptionFunc

type PipelineOptionFunc func(*optPipeline)

func PipelineContext added in v1.0.13

func PipelineContext(ctx context.Context) PipelineOptionFunc

func PipelineExecutor added in v1.0.6

func PipelineExecutor(executor *Executor) PipelineOptionFunc

func PipelinePanicHandler

func PipelinePanicHandler(handler PanicHandler) PipelineOptionFunc

type PipelineOutputFunc

type PipelineOutputFunc func(result interface{}, err error)

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

func NewQueue

func NewQueue(c int, funcs ...QueueOptionFunc) *Queue

func (*Queue) Cap added in v1.0.6

func (q *Queue) Cap() int

func (*Queue) Chan

func (q *Queue) Chan() <-chan interface{}

func (*Queue) Close

func (q *Queue) Close() (closed bool)

func (*Queue) Closed added in v1.0.21

func (q *Queue) Closed() bool

func (*Queue) Dequeue

func (q *Queue) Dequeue() (val interface{}, found bool)

blocking dequeue

func (*Queue) DequeueNB

func (q *Queue) DequeueNB() (val interface{}, found bool)

non-blocking dequeue

func (*Queue) DequeueRetry

func (q *Queue) DequeueRetry(retryInterval time.Duration, retryLimit int) (val interface{}, found bool)

retry w/ dequeue until channel can be read(waiting for channel to write)

func (*Queue) Enqueue

func (q *Queue) Enqueue(val interface{}) (write bool)

blocking enqueue

func (*Queue) EnqueueNB

func (q *Queue) EnqueueNB(val interface{}) (write bool)

non-blocking enqueue

func (*Queue) EnqueueRetry

func (q *Queue) EnqueueRetry(val interface{}, retryInterval time.Duration, retryLimit int) (write bool)

retry w/ enqueue until channel can be written(waiting for channel to read)

func (*Queue) Len added in v1.0.6

func (q *Queue) Len() int

type QueueOptionFunc

type QueueOptionFunc func(*optQueue)

func QueuePanicHandler

func QueuePanicHandler(handler PanicHandler) QueueOptionFunc

type Retry added in v1.0.11

type Retry struct {
	// contains filtered or unexported fields
}

func NewRetry added in v1.0.11

func NewRetry(executor *Executor, funcs ...RetryOptionFunc) *Retry

func NewRetryWithBackoff added in v1.0.11

func NewRetryWithBackoff(executor *Executor, backoff *Backoff, funcs ...RetryOptionFunc) *Retry

func (*Retry) Retry added in v1.0.11

func (r *Retry) Retry(fn RetryFunc) *RetryFuture

func (*Retry) RetryWithErrorHandler added in v1.0.11

func (r *Retry) RetryWithErrorHandler(fn RetryFunc, eh RetryErrorHandler) *RetryFuture

type RetryErrorHandler added in v1.0.11

type RetryErrorHandler func(err error, b *Backoff) RetryNext

type RetryFunc added in v1.0.11

type RetryFunc func(context.Context) (interface{}, error)

type RetryFuture added in v1.0.11

type RetryFuture struct {
	// contains filtered or unexported fields
}

func (*RetryFuture) Result added in v1.0.11

func (f *RetryFuture) Result() ValueError

type RetryNext added in v1.0.11

type RetryNext uint8
const (
	RetryNextContinue RetryNext = iota + 1
	RetryNextBreak
)

type RetryOptionFunc added in v1.0.11

type RetryOptionFunc func(*optRetry)

func RetryBackoffIntervalMax added in v1.0.11

func RetryBackoffIntervalMax(dur time.Duration) RetryOptionFunc

func RetryBackoffIntervalMin added in v1.0.11

func RetryBackoffIntervalMin(dur time.Duration) RetryOptionFunc

func RetryBackoffUseJitter added in v1.0.11

func RetryBackoffUseJitter(useJitter bool) RetryOptionFunc

func RetryContext added in v1.0.11

func RetryContext(ctx context.Context) RetryOptionFunc

func RetryMax added in v1.0.11

func RetryMax(max int) RetryOptionFunc

type SubExecutor added in v1.0.1

type SubExecutor struct {
	// contains filtered or unexported fields
}

func (*SubExecutor) Submit added in v1.0.1

func (s *SubExecutor) Submit(fn Job)

func (*SubExecutor) Wait added in v1.0.1

func (s *SubExecutor) Wait()

type ValueError added in v1.0.10

type ValueError interface {
	Value() interface{}
	Err() error
}

type Wait added in v1.0.16

type Wait struct {
	// contains filtered or unexported fields
}

func WaitN added in v1.0.16

func WaitN(n int) *Wait

func WaitOne added in v1.0.16

func WaitOne() *Wait

func WaitTimeout added in v1.0.16

func WaitTimeout(dur time.Duration, n int) *Wait

func WaitTwo added in v1.0.16

func WaitTwo() *Wait

func (*Wait) Cancel added in v1.0.16

func (w *Wait) Cancel()

func (*Wait) Done added in v1.0.16

func (w *Wait) Done()

func (*Wait) Wait added in v1.0.16

func (w *Wait) Wait() error

type WaitRendezvous added in v1.0.16

type WaitRendezvous struct {
	// contains filtered or unexported fields
}

func WaitRendez added in v1.0.16

func WaitRendez(n int) *WaitRendezvous

func WaitRendezTimeout added in v1.0.16

func WaitRendezTimeout(dur time.Duration, n int) *WaitRendezvous

func (*WaitRendezvous) Cancel added in v1.0.16

func (r *WaitRendezvous) Cancel()

func (*WaitRendezvous) Wait added in v1.0.16

func (r *WaitRendezvous) Wait() error

type WaitReplyFunc added in v1.0.16

type WaitReplyFunc func(interface{}) (interface{}, error)

type WaitRequest added in v1.0.16

type WaitRequest struct {
	// contains filtered or unexported fields
}

func WaitReq added in v1.0.16

func WaitReq() *WaitRequest

func WaitReqTimeout added in v1.0.16

func WaitReqTimeout(dur time.Duration) *WaitRequest

func (*WaitRequest) Cancel added in v1.0.16

func (r *WaitRequest) Cancel()

func (*WaitRequest) Req added in v1.0.16

func (r *WaitRequest) Req(v interface{}) error

func (*WaitRequest) Wait added in v1.0.16

func (r *WaitRequest) Wait() (interface{}, error)

type WaitRequestReply added in v1.0.16

type WaitRequestReply struct {
	// contains filtered or unexported fields
}

func WaitReqReply added in v1.0.16

func WaitReqReply() *WaitRequestReply

func WaitReqReplyTimeout added in v1.0.16

func WaitReqReplyTimeout(dur time.Duration) *WaitRequestReply

func (*WaitRequestReply) Cancel added in v1.0.16

func (r *WaitRequestReply) Cancel()

func (*WaitRequestReply) Reply added in v1.0.16

func (r *WaitRequestReply) Reply(fn WaitReplyFunc) error

func (*WaitRequestReply) Req added in v1.0.16

func (r *WaitRequestReply) Req(v interface{}) (interface{}, error)

type WaitSequence added in v1.0.16

type WaitSequence struct {
	// contains filtered or unexported fields
}

func WaitSeq added in v1.0.16

func WaitSeq(wn ...*Wait) *WaitSequence

func WaitSeqTimeout added in v1.0.16

func WaitSeqTimeout(dur time.Duration, wn ...*Wait) *WaitSequence

func (*WaitSequence) Cancel added in v1.0.16

func (w *WaitSequence) Cancel()

func (*WaitSequence) Wait added in v1.0.16

func (w *WaitSequence) Wait() error

type Worker

type Worker interface {
	Enqueue(interface{}) bool
	CloseEnqueue() bool
	Shutdown()
	ShutdownAndWait()
	ForceStop()
}

func NewBufferWorker

func NewBufferWorker(handler WorkerHandler, funcs ...WorkerOptionFunc) Worker

func NewDefaultWorker

func NewDefaultWorker(handler WorkerHandler, funcs ...WorkerOptionFunc) Worker

type WorkerAbortQueueHandlerFunc added in v1.0.13

type WorkerAbortQueueHandlerFunc func(paramter interface{})

type WorkerHandler

type WorkerHandler func(parameter interface{})

type WorkerHook

type WorkerHook func()

type WorkerOptionFunc

type WorkerOptionFunc func(*optWorker)

func WorkerAbortQueueHandler added in v1.0.13

func WorkerAbortQueueHandler(handler WorkerAbortQueueHandlerFunc) WorkerOptionFunc

func WorkerAutoShutdown added in v1.0.21

func WorkerAutoShutdown(enable bool) WorkerOptionFunc

func WorkerCapacity added in v1.0.15

func WorkerCapacity(capacity int) WorkerOptionFunc

func WorkerContext

func WorkerContext(ctx context.Context) WorkerOptionFunc

func WorkerExecutor added in v1.0.4

func WorkerExecutor(executor *Executor) WorkerOptionFunc

func WorkerMaxDequeueSize added in v1.0.17

func WorkerMaxDequeueSize(size int) WorkerOptionFunc

func WorkerPanicHandler

func WorkerPanicHandler(handler PanicHandler) WorkerOptionFunc

func WorkerPostHook

func WorkerPostHook(hook WorkerHook) WorkerOptionFunc

func WorkerPreHook

func WorkerPreHook(hook WorkerHook) WorkerOptionFunc

func WorkerTimeout added in v1.0.21

func WorkerTimeout(timeout time.Duration) WorkerOptionFunc

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL