koncurrent

package module
v3.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 9, 2021 License: MIT Imports: 4 Imported by: 0

README

Introduction

A Go lib for easier concurrency control. Inspired by ReactiveX and javascript Promise.

The v3 library dramatically improve performance than v2 and v1 library by reducing the heap memory allocation.

Benchmark result

Benchmark test on an AMD 2700X Ubuntu 20.04.3 LTS machine Go 1.16.8

v1
BenchmarkExecuteSerial_Immediate-16      	 1259301	       926.5 ns/op	     304 B/op	      14 allocs/op
BenchmarkExecuteSerial_Async-16          	  317821	      4144 ns/op	     616 B/op	      20 allocs/op
BenchmarkExecuteSerial_Pool-16           	  312390	      3894 ns/op	     616 B/op	      20 allocs/op
BenchmarkExecuteParallel_Immediate-16    	  897908	      1252 ns/op	     416 B/op	      17 allocs/op
BenchmarkExecuteParallel_Async-16        	  252895	      5007 ns/op	     728 B/op	      23 allocs/op
BenchmarkExecuteParallel_Pool-16         	  254157	      4411 ns/op	     728 B/op	      23 allocs/op
original v2
BenchmarkExecuteSerial_Immediate-16      	 1388926	       831.9 ns/op	     288 B/op	      11 allocs/op
BenchmarkExecuteSerial_Async-16          	  305587	      3940 ns/op	     600 B/op	      17 allocs/op
BenchmarkExecuteSerial_Pool-16           	  309193	      3626 ns/op	     600 B/op	      17 allocs/op
BenchmarkExecuteParallel_Immediate-16    	  923436	      1159 ns/op	     400 B/op	      14 allocs/op
BenchmarkExecuteParallel_Async-16        	  287938	      4582 ns/op	     712 B/op	      20 allocs/op
BenchmarkExecuteParallel_Pool-16         	  290116	      4016 ns/op	     712 B/op	      20 allocs/op
v3
BenchmarkExecuteSerial_Immediate-16              4078363               286.0 ns/op           168 B/op          3 allocs/op
BenchmarkExecuteSerial_Async-16                   432781              2540 ns/op             168 B/op          3 allocs/op
BenchmarkExecuteSerial_Pool-16                    452028              2473 ns/op             168 B/op          3 allocs/op
BenchmarkExecuteParallel_Immediate-16            1586167               762.7 ns/op           248 B/op          4 allocs/op
BenchmarkExecuteParallel_Async-16                 384945              3181 ns/op             248 B/op          4 allocs/op
BenchmarkExecuteParallel_Pool-16                  425948              2701 ns/op             248 B/op          4 allocs/op

Usage

Simple execution example
    var time1, time2 time.Time
    var t1 koncurrent.TaskFunc = func(ctx context.Context) error {
        time.Sleep(100 * time.Millisecond)
        time1 = time.Now()
        return nil
    }
    var t2 koncurrent.TaskFunc = func(ctx context.Context) error {
        time.Sleep(100 * time.Millisecond)
        time2 = time.Now()
        return nil
    }
    errIter, err := koncurrent.ExecuteParallel(t1.Async(), t2.Immediate()).Await(context.Background())
    fmt.Println(errIter)
    fmt.Println(err)
Cascaded execution example
    var time1, time2, time3, time4 time.Time
    var t1 koncurrent.TaskFunc = func(ctx context.Context) error {
        time.Sleep(100 * time.Millisecond)
        time1 = time.Now()
        return nil
    }
    var t2 koncurrent.TaskFunc = func(ctx context.Context) error {
        time.Sleep(100 * time.Millisecond)
        time2 = time.Now()
        return nil
    }
    var t3 koncurrent.TaskFunc = func(ctx context.Context) error {
        time.Sleep(100 * time.Millisecond)
        tim3 = time.Now()
        return nil
    }
    var t4 koncurrent.TaskFunc = func(ctx context.Context) error {
        time.Sleep(100 * time.Millisecond)
        time4 = time.Now()
        return errors.New("task 4 error occur")
    }
    pe := koncurrent.NewPoolExecutor(20, 20)
    for i := range executors {
        errIter, err := koncurrent.ExecuteSerial(t1.Pool(pe), t2.Async()).
            ExecuteParallel(t3.Pool(pe), t4.Pool(pe)).
            Await(context.Background())
        fmt.Println(errIter)
        fmt.Println(err)
    }
Check more example in execution_test.go

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AsyncExecutor added in v3.1.0

type AsyncExecutor struct {
}

func (AsyncExecutor) Execute added in v3.1.0

func (p AsyncExecutor) Execute(ctx context.Context, taskFunc TaskFunc, taskId int, resultChn chan TaskResult, opt TaskExecutionOptions)

type CaseExecution

type CaseExecution struct {
	Execution Execution
	Case      func() bool
}

type Execution

type Execution struct {
	// contains filtered or unexported fields
}

func ExecuteParallel

func ExecuteParallel(tasks ...TaskExecution) Execution

func ExecuteSerial

func ExecuteSerial(tasks ...TaskExecution) Execution

func Switch

func Switch(defaultExec Execution, cases ...CaseExecution) Execution

func (Execution) Async

func (e Execution) Async(ctx context.Context, callback func(ExecutionResults, error))

func (Execution) Await

func (Execution) ExecuteParallel

func (e Execution) ExecuteParallel(tasks ...TaskExecution) Execution

func (Execution) ExecuteSerial

func (e Execution) ExecuteSerial(tasks ...TaskExecution) Execution

func (Execution) Switch

func (e Execution) Switch(defaultExec Execution, cases ...CaseExecution) Execution

type ExecutionResults

type ExecutionResults [][]error

func (ExecutionResults) FlattenErrors added in v3.1.0

func (er ExecutionResults) FlattenErrors() []error

type ImmediateExecutor added in v3.1.0

type ImmediateExecutor struct {
}

func (ImmediateExecutor) Execute added in v3.1.0

func (p ImmediateExecutor) Execute(ctx context.Context, taskFunc TaskFunc, taskId int, resultChn chan TaskResult, opt TaskExecutionOptions)

type PanicError added in v3.1.0

type PanicError struct {
	Stack []byte
}

func (PanicError) Error added in v3.1.0

func (e PanicError) Error() string

type PoolExecutor

type PoolExecutor struct {
	// contains filtered or unexported fields
}

func NewPoolExecutor

func NewPoolExecutor(poolSize int, queueSize int) PoolExecutor

func (PoolExecutor) Execute

func (p PoolExecutor) Execute(ctx context.Context, task TaskFunc, taskId int, resultChn chan TaskResult, opt TaskExecutionOptions)

type TaskExecution

type TaskExecution struct {
	// contains filtered or unexported fields
}

func (TaskExecution) Execution

func (t TaskExecution) Execution() Execution

func (TaskExecution) Recover added in v3.1.0

func (t TaskExecution) Recover() TaskExecution

func (TaskExecution) Tracing added in v3.1.0

func (t TaskExecution) Tracing(spanName string) TaskExecution

type TaskExecutionOptions added in v3.1.0

type TaskExecutionOptions struct {
	// contains filtered or unexported fields
}

type TaskExecutor added in v3.1.0

type TaskExecutor interface {
	Execute(ctx context.Context, taskFunc TaskFunc, taskId int, resultChn chan TaskResult, taskExecutionOpts TaskExecutionOptions)
}

type TaskFunc

type TaskFunc func(ctx context.Context) error

func (TaskFunc) Async

func (t TaskFunc) Async() TaskExecution

func (TaskFunc) Immediate

func (t TaskFunc) Immediate() TaskExecution

func (TaskFunc) Pool

func (t TaskFunc) Pool(executor PoolExecutor) TaskExecution

type TaskResult

type TaskResult struct {
	// contains filtered or unexported fields
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL