concurrency

package module
v0.0.0-...-e1186ab

This package is not in the latest version of its module.
Published: Jul 10, 2020 License: MIT Imports: 4 Imported by: 0

README


concurrency-go

Helper library for effective concurrency in Go

Install

go get -u github.com/Villenny/concurrency-go

Notable members:

ParallelForLimit(), AtomicInt64, Pool

Using ParallelForLimit:

  • Quickly use all the cores to process an array of work.
	import "github.com/villenny/concurrency-go"

	results := make([]MyStruct, len(input))

	ParallelForLimit(runtime.NumCPU(), len(input), func(n int) {
		in := input[n]
		results[n] = MyWorkFn(in)
	})

Using AtomicInt64:

  • Supports JSON marshal/unmarshal implicitly, just like a regular int64
  • Automatically eliminates false sharing (see the padding sketch after this example)
	import (
		"encoding/json"

		"github.com/villenny/concurrency-go"
	)

	allocCount := concurrency.NewAtomicInt64()
	allocCount.Add(1)
	bytes, _ := json.Marshal(&allocCount)
	_ = json.Unmarshal(bytes, &allocCount)
	itsOne := allocCount.Get() // reads back 1
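
False sharing between adjacent counters is typically eliminated by padding each counter onto its own cache line, so two goroutines updating neighbouring counters never invalidate each other's line. Below is a minimal sketch of that idea, assuming 64-byte cache lines and using a hypothetical paddedInt64 type; it illustrates the technique only and is not necessarily the library's actual layout:

import (
	"sync"
	"sync/atomic"
)

// paddedInt64 keeps its value alone on a cache line: the surrounding padding
// ensures no neighbouring field can share the 64-byte line with v.
type paddedInt64 struct {
	_ [64]byte
	v int64
	_ [56]byte
}

func incrementConcurrently() [2]int64 {
	var counters [2]paddedInt64 // adjacent in memory, but never on the same cache line
	var wg sync.WaitGroup
	for i := range counters {
		wg.Add(1)
		go func(c *paddedInt64) {
			defer wg.Done()
			for n := 0; n < 500; n++ {
				atomic.AddInt64(&c.v, 1)
			}
		}(&counters[i])
	}
	wg.Wait()
	return [2]int64{counters[0].v, counters[1].v}
}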

Using the Pool:

  • Simplifies use by handling reset on put implicitly
import (
	"github.com/villenny/concurrency-go"
	"github.com/cespare/xxhash/v2"
)

var pool = concurrency.NewPool(&concurrency.PoolInfo{
	New:   func() interface{} { return xxhash.New() },
	Reset: func(t interface{}) { t.(*xxhash.Digest).Reset() },
})

func HashString(s string) uint64 {
	digest := pool.Get().(*xxhash.Digest)
	_, _ = digest.WriteString(s) // always returns len(s), nil
	hash := digest.Sum64()
	pool.Put(digest)
	return hash
}

Benchmark

Assuming you never call go fn() inside your work function, ParallelForLimit() is tough to beat for batch processing, which is its intended use case. In the SHA256 benchmark it gets close to 4x the throughput of the serial version on my 4-core processor with hyperthreading.

Unfortunately, without the ability to do something along the lines of go thiscore fn(), there's no way to do this optimally if your work function makes asynchronous calls.

And while it's easy to use, you can beat it by a wide margin by feeding your input into one disruptor per long-lived goroutine, so each worker has its own independent, in-order circular buffer with zero contention. See https://github.com/serialx/go-disruptor. A rough sketch of that shape follows.
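
As an illustration only, here is a sketch of that per-worker shape using one buffered channel per long-lived worker as a stand-in for a real disruptor ring buffer; the queue size and the shardedWork name are placeholders, not part of this library or go-disruptor:

import "sync"

// shardedWork feeds the input round-robin into one queue per long-lived
// worker; each worker drains only its own queue, so workers never contend
// with each other for work items.
func shardedWork(workerCount int, input []int, work func(int)) {
	queues := make([]chan int, workerCount)
	var wg sync.WaitGroup
	for w := range queues {
		queues[w] = make(chan int, 1024) // stand-in for a per-worker circular buffer
		wg.Add(1)
		go func(q chan int) {
			defer wg.Done()
			for v := range q {
				work(v)
			}
		}(queues[w])
	}
	for i, v := range input {
		queues[i%workerCount] <- v // shard the input across the workers
	}
	for _, q := range queues {
		close(q)
	}
	wg.Wait()
}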

Running tool: C:\Go\bin\go.exe test -benchmem -run=^$ github.com/villenny/concurrency-go -bench .

goos: windows
goarch: amd64
pkg: github.com/villenny/concurrency-go
BenchmarkFor_InlineFor_SQRT-8                   	231453978	         5.42 ns/op	       0 B/op	       0 allocs/op
BenchmarkFor_SerialFor_SQRT-8                   	327313581	         4.06 ns/op	       0 B/op	       0 allocs/op
BenchmarkFor_InlineFor_SQRT2-8                  	353307472	         3.54 ns/op	       0 B/op	       0 allocs/op
BenchmarkFor_ParallelForLimit_SQRT_1-8          	326426421	         3.51 ns/op	       0 B/op	       0 allocs/op
BenchmarkFor_ParallelForLimit_SQRT_2-8          	556136150	         2.87 ns/op	       0 B/op	       0 allocs/op
BenchmarkFor_ParallelForLimit_SQRT_4-8          	704042665	        10.3 ns/op	       0 B/op	       0 allocs/op
BenchmarkFor_ParallelForLimit_SQRT_8-8          	751173544	        11.4 ns/op	       0 B/op	       0 allocs/op
BenchmarkFor_ParallelForLimit_SQRT_16-8         	200899519	         7.09 ns/op	       0 B/op	       0 allocs/op
BenchmarkFor_ParallelFor_SQRT-8                 	 5315282	       230 ns/op	       0 B/op	       0 allocs/op
BenchmarkFor_SerialFor_SHA256-8                 	 1696674	       702 ns/op	      32 B/op	       1 allocs/op
BenchmarkFor_ParallelForLimit_SHA256-8          	 5976416	       205 ns/op	      32 B/op	       1 allocs/op
BenchmarkInt64_Add-8                            	188578995	         6.10 ns/op	       0 B/op	       0 allocs/op
BenchmarkInt64_Get-8                            	552139134	         2.18 ns/op	       8 B/op	       0 allocs/op
BenchmarkSafeInt64_Add-8                        	 7507633	       160 ns/op	       0 B/op	       0 allocs/op
BenchmarkSafeInt64_Get-8                        	28602208	        35.8 ns/op	       0 B/op	       0 allocs/op
BenchmarkAtomicInt64_Add-8                      	75078831	        16.0 ns/op	       0 B/op	       0 allocs/op
BenchmarkAtomicInt64_Get-8                      	1000000000	         1.53 ns/op	       0 B/op	       0 allocs/op
Benchmark_Inc500x2_SafeInt64-8                  	   10000	    114481 ns/op	     491 B/op	       1 allocs/op
Benchmark_Get500x2_Int64-8                      	 3048847	       391 ns/op	      31 B/op	       0 allocs/op
Benchmark_Get500x2_SafeInt64-8                  	   58312	     23007 ns/op	       0 B/op	       0 allocs/op
Benchmark_Inc500x2_AtomicInt64-8                	  138075	      8581 ns/op	       0 B/op	       0 allocs/op
Benchmark_Get500x2_AtomicInt64-8                	 1568206	       766 ns/op	      50 B/op	       0 allocs/op
Benchmark_Inc500x2_RawAtomic_falseSharing-8     	   55357	     21456 ns/op	       0 B/op	       0 allocs/op
Benchmark_Inc500x2_RawAtomic_noFalseSharing-8   	  138074	      8711 ns/op	       0 B/op	       0 allocs/op
Benchmark_Inc500x2_Int64_noFalseSharing-8       	  266944	      4558 ns/op	       0 B/op	       0 allocs/op
PASS
ok  	github.com/villenny/concurrency-go	89.611s

Contact

Ryan Haksi [ryan.haksi@gmail.com]

License

Available under the MIT License.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ParallelFor

func ParallelFor(iMax int, fn func(i int))

ParallelFor: generally speaking this performs poorly except in trivial cases; it launches a separate go func for each item, then blocks on a wait group until everything is done.
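
A minimal sketch of the behaviour just described (an approximation for illustration, not necessarily the package's exact source):

import "sync"

// One goroutine per index, joined on a WaitGroup. Cheap to write, but the
// per-goroutine overhead dominates unless fn does substantial work.
func parallelForSketch(iMax int, fn func(i int)) {
	var wg sync.WaitGroup
	wg.Add(iMax)
	for i := 0; i < iMax; i++ {
		go func(i int) {
			defer wg.Done()
			fn(i)
		}(i)
	}
	wg.Wait()
}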

func ParallelForLimit

func ParallelForLimit(workerCount int, count int, fn func(i int))

ParallelForLimit: implements a sequential scan over an input array with multiple worker goroutines, each of which writes uncontended changes to its own chunk of the output array. If a worker finishes its chunk, it steals work from the other workers (which does introduce contention). Each worker has its own atomic counter to track its position in the input. A simplified sketch of this strategy follows.
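
The sketch below illustrates that chunk-per-worker strategy with atomic cursors and work stealing; it is an illustration of the idea only, not the package's actual source:

import (
	"sync"
	"sync/atomic"
)

func parallelForLimitSketch(workerCount, count int, fn func(i int)) {
	// One contiguous chunk of the index space per worker, with an atomic
	// cursor tracking the next unprocessed index in that chunk.
	type chunk struct {
		next int64
		end  int64
	}
	chunks := make([]chunk, workerCount)
	per := (count + workerCount - 1) / workerCount
	for w := range chunks {
		start, end := w*per, (w+1)*per
		if start > count {
			start = count
		}
		if end > count {
			end = count
		}
		chunks[w] = chunk{next: int64(start), end: int64(end)}
	}

	var wg sync.WaitGroup
	wg.Add(workerCount)
	for w := 0; w < workerCount; w++ {
		go func(w int) {
			defer wg.Done()
			// Drain our own chunk first, then steal from the other workers'
			// chunks; atomic.AddInt64 hands out each index exactly once.
			for off := 0; off < workerCount; off++ {
				c := &chunks[(w+off)%workerCount]
				for {
					i := atomic.AddInt64(&c.next, 1) - 1
					if i >= c.end {
						break
					}
					fn(int(i))
				}
			}
		}(w)
	}
	wg.Wait()
}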

func SerialFor

func SerialFor(iMax int, fn func(i int))

Types

type AtomicInt64

type AtomicInt64 struct {
	// contains filtered or unexported fields
}

An AtomicInt64 is an int64 to be accessed atomically.

func NewAtomicInt64

func NewAtomicInt64() *AtomicInt64

func (*AtomicInt64) Add

func (i *AtomicInt64) Add(n int64) int64

func (*AtomicInt64) Get

func (i *AtomicInt64) Get() int64

func (*AtomicInt64) MarshalJSON

func (i *AtomicInt64) MarshalJSON() ([]byte, error)

func (*AtomicInt64) Set

func (i *AtomicInt64) Set(v int64)

func (*AtomicInt64) String

func (i *AtomicInt64) String() string

func (*AtomicInt64) UnmarshalJSON

func (i *AtomicInt64) UnmarshalJSON(data []byte) error

type Pool

type Pool struct {
	AllocationCount *AtomicInt64
	// contains filtered or unexported fields
}

func NewPool

func NewPool(p *PoolInfo) *Pool

func (*Pool) Get

func (p *Pool) Get() interface{}

func (*Pool) Put

func (p *Pool) Put(o interface{})

type PoolInfo

type PoolInfo struct {
	New   func() interface{}
	Reset func(object interface{})
}

type SafeInt64

type SafeInt64 struct {
	// contains filtered or unexported fields
}

A SafeInt64 is an int64 to be accessed safely with a lock. You should prefer AtomicInt64 instead.

func NewSafeInt64

func NewSafeInt64() *SafeInt64

func (*SafeInt64) Add

func (s *SafeInt64) Add(v int64) int64

func (*SafeInt64) Get

func (s *SafeInt64) Get() int64

func (*SafeInt64) Set

func (s *SafeInt64) Set(v int64)

func (*SafeInt64) String

func (i *SafeInt64) String() string
