async

v0.0.0-...-85fbbdd
Published: Sep 29, 2022 License: Apache-2.0 Imports: 2 Imported by: 0

README

go-async

Status: EXPERIMENTAL

This library is primarily intended as a directed learning exercise and eventual collection of utilities and patterns for working asynchronously in Go.

Documentation

GoDoc

Full go doc style documentation for the project can be viewed online without installing this package by using the excellent GoDoc site here: http://godoc.org/github.com/41north/go-async

You can also view the documentation locally once the package is installed with the godoc tool by running godoc -http=":6060" and pointing your browser to http://localhost:6060/pkg/github.com/41north/go-async

Installation

$ go get -u github.com/41north/go-async

Add this import line to the file you're working in:

import "github.com/41north/go-async"

Quick Start

Future

A basic example:

// create a string future
f := NewFuture[string]()

// create a consumer channel
ch := f.Get()
go func() {
	fmt.Printf("Value: %s\n", <-ch)
}()

// set the value
f.Set("hello")

Counting Semaphore

A basic example:

// we create an input and output channel for work needing to be done
inCh := make(chan string, 128)
outCh := make(chan int, 128)

// we want a max of 10 in-flight processes
s := NewCountingSemaphore(10)

// we create more workers than tokens available
for i := 0; i < 100; i++ {
	go func() {
		for {
			// acquire a token, waiting until one is available
			s.Acquire(1)

			// consume from the input channel
			v, ok := <-inCh
			if !ok {
				// channel was closed: return the token before exiting
				s.Release(1)
				return
			}

			// do some work and produce an output value
			outCh <- len(v)

			// release the token; every exit path must release, ideally via defer
			s.Release(1)
		}
	}()
}

// generate some work and put it into the work queue
// ...
// ...

There are more examples available in the go doc.

License

go-async is licensed under the Apache 2.0 License.

Contact

To get in touch, email us at hello@41north.dev.

Documentation

Overview

Package async provides constructs for various asynchronous patterns.

Types

type CountingSemaphore

type CountingSemaphore interface {
	// Size returns the total number of tokens available within this CountingSemaphore.
	Size() int32

	// Acquire attempts to acquire an amount of tokens from the semaphore, waiting until it is successful.
	Acquire(count int32)

	// TryAcquire attempts to acquire an amount of tokens from the semaphore and returns whether
	// it was successful or not.
	TryAcquire(count int32) bool

	// Release attempts to return a certain amount of tokens to the semaphore, waiting until it is successful.
	Release(count int32)

	// TryRelease attempts to return a certain amount of tokens to the semaphore and returns whether
	// it was successful or not.
	TryRelease(count int32) bool
}

CountingSemaphore can be used to limit the number of in-flight processes or tasks.
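To make the interface contract concrete, here is a minimal sketch of one way such a semaphore could be built with a mutex and a condition variable. It is not the library's implementation: condSemaphore and newCondSemaphore are hypothetical names, and the choice to make Release wait when the semaphore is already full is an assumption read from the interface comments.

```go
package main

import (
	"fmt"
	"sync"
)

// condSemaphore is a hypothetical stand-in, not the go-async implementation.
type condSemaphore struct {
	mu    sync.Mutex
	cond  *sync.Cond
	size  int32 // total number of tokens
	avail int32 // tokens currently available
}

func newCondSemaphore(size int32) *condSemaphore {
	s := &condSemaphore{size: size, avail: size}
	s.cond = sync.NewCond(&s.mu)
	return s
}

func (s *condSemaphore) Size() int32 { return s.size }

// Acquire blocks until count tokens can be taken in a single step.
// A count larger than Size() would block forever in this sketch.
func (s *condSemaphore) Acquire(count int32) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for s.avail < count {
		s.cond.Wait()
	}
	s.avail -= count
	s.cond.Broadcast() // wake any releaser waiting for room
}

// TryAcquire takes count tokens only if they are available right now.
func (s *condSemaphore) TryAcquire(count int32) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.avail < count {
		return false
	}
	s.avail -= count
	s.cond.Broadcast()
	return true
}

// Release blocks while returning count tokens would exceed the size (assumed behavior).
func (s *condSemaphore) Release(count int32) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for s.avail+count > s.size {
		s.cond.Wait()
	}
	s.avail += count
	s.cond.Broadcast() // wake any acquirer waiting for tokens
}

// TryRelease returns count tokens only if that does not exceed the size.
func (s *condSemaphore) TryRelease(count int32) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.avail+count > s.size {
		return false
	}
	s.avail += count
	s.cond.Broadcast()
	return true
}

func main() {
	s := newCondSemaphore(2)
	fmt.Println(s.TryAcquire(2)) // true: both tokens taken
	fmt.Println(s.TryAcquire(1)) // false: none left
	fmt.Println(s.TryRelease(2)) // true: both tokens returned
	fmt.Println(s.TryRelease(1)) // false: would exceed the size
}
```

A condition variable keeps multi-token requests atomic: a caller either takes all count tokens or none, so two callers cannot deadlock while each holds part of what it needs.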

Example
// we create an input and output channel for work needing to be done
inCh := make(chan string, 128)
outCh := make(chan int, 128)

// we want a max of 10 in-flight processes
s := NewCountingSemaphore(10)

// we create more workers than tokens available
for i := 0; i < 100; i++ {
	go func() {
		for {
			// acquire a token, waiting until one is available
			s.Acquire(1)

			// consume from the input channel
			v, ok := <-inCh
			if !ok {
				// channel was closed: return the token before exiting
				s.Release(1)
				return
			}

			// do some work and produce an output value
			outCh <- len(v)

			// release the token; every exit path must release, ideally via defer
			s.Release(1)
		}
	}()
}

// generate some work and put it into the work queue
// ...
// ...

func NewCountingSemaphore

func NewCountingSemaphore(size int32) CountingSemaphore

NewCountingSemaphore creates a new semaphore with the specified number of available tokens.
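The worker example recommends releasing tokens with defer. One way to do that is to move the per-item work into its own function, so the deferred release runs on every return path. The sketch below assumes a stand-in token pool built on a buffered channel; chanTokens, process and sumLengths are hypothetical names, not part of this package, and the stand-in is only safe for single-token requests.

```go
package main

import (
	"fmt"
	"sync"
)

// chanTokens is a hypothetical stand-in for a counting semaphore.
// Its capacity is the token limit; only count == 1 is used in this sketch.
type chanTokens chan struct{}

func (c chanTokens) Acquire(count int32) {
	for i := int32(0); i < count; i++ {
		c <- struct{}{} // blocks while all tokens are in use
	}
}

func (c chanTokens) Release(count int32) {
	for i := int32(0); i < count; i++ {
		<-c
	}
}

// process handles one item while holding a token.
// The deferred Release runs on every return path, including panics.
func process(s chanTokens, v string) int {
	s.Acquire(1)
	defer s.Release(1)
	return len(v)
}

// sumLengths runs four workers, with at most limit items in flight at once.
func sumLengths(limit int, words []string) int {
	s := make(chanTokens, limit)
	inCh := make(chan string, len(words))
	outCh := make(chan int, len(words))
	for _, w := range words {
		inCh <- w
	}
	close(inCh)

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// the token is taken only after an item has been received,
			// so idle workers never hold tokens
			for v := range inCh {
				outCh <- process(s, v)
			}
		}()
	}
	wg.Wait()
	close(outCh)

	total := 0
	for n := range outCh {
		total += n
	}
	return total
}

func main() {
	fmt.Println(sumLengths(2, []string{"a", "bb", "ccc"})) // 6
}
```

Acquiring inside process rather than at the top of the worker loop also means a closed input channel ends the worker without any token to give back.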

type Future

type Future[T any] interface {
	// Get returns a response channel of size 1 for receiving the future value.
	// If the value has already been set it will already be available within the return channel.
	Get() <-chan T

	// Set sets the return value and notifies consumers. Consumers are notified once only,
	// with the return value indicating if Set was successful or not.
	Set(value T) bool
}

Future represents a value of type T that will be set at some time in the future.
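The Get and Set contract can be illustrated with a small stand-in implementation. This is a sketch under stated assumptions, not the library's code: chanFuture and newChanFuture are hypothetical names, and two behaviors are assumed rather than documented, namely that a second Set returns false and that each consumer channel is closed after its value is delivered.

```go
package main

import (
	"fmt"
	"sync"
)

// chanFuture is a hypothetical stand-in, not the go-async implementation.
type chanFuture[T any] struct {
	mu        sync.Mutex
	done      bool
	value     T
	consumers []chan T
}

func newChanFuture[T any]() *chanFuture[T] {
	return &chanFuture[T]{}
}

// Get returns a channel of size 1. If the value is already set,
// it is placed in the channel before the channel is returned.
func (f *chanFuture[T]) Get() <-chan T {
	ch := make(chan T, 1)
	f.mu.Lock()
	defer f.mu.Unlock()
	if f.done {
		ch <- f.value
		close(ch) // assumption: channels are closed after delivery
		return ch
	}
	f.consumers = append(f.consumers, ch)
	return ch
}

// Set stores the value and notifies every waiting consumer exactly once.
// Assumption: only the first call succeeds; later calls return false.
func (f *chanFuture[T]) Set(value T) bool {
	f.mu.Lock()
	defer f.mu.Unlock()
	if f.done {
		return false
	}
	f.done, f.value = true, value
	for _, ch := range f.consumers {
		ch <- value // never blocks: buffer of 1, written once
		close(ch)
	}
	f.consumers = nil
	return true
}

func main() {
	f := newChanFuture[string]()
	early := f.Get() // requested before the value exists

	fmt.Println(f.Set("hello")) // true
	fmt.Println(f.Set("again")) // false: already set
	fmt.Println(<-early)        // hello
	fmt.Println(<-f.Get())      // hello: late consumers see the value too
}
```

Because every consumer gets its own buffered channel, Set never blocks on a slow reader, and a channel requested after Set already holds the value.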

Example (Basic)
// create a string future
f := NewFuture[string]()

// create a consumer channel
ch := f.Get()
go func() {
	fmt.Printf("Value: %s\n", <-ch)
}()

// set the value
f.Set("hello")

Example (Multiple)
// create some futures
foo := NewFuture[string]()
bar := NewFuture[string]()

// compute in the background
go func() {
	foo.Set("foo")
}()

go func() {
	bar.Set("bar")
}()

// wait for their results
println(<-foo.Get())
println(<-bar.Get())

Example (Select)
// create some futures
foo := NewFuture[string]()
bar := NewFuture[string]()

// compute their values in the background
go func() {
	foo.Set("foo")
}()

go func() {
	bar.Set("bar")
}()

// create some consumer channels
fooCh := foo.Get()
barCh := bar.Get()

// wait with timeout

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

var result []string
finished := false

for {
	select {
	case <-ctx.Done():
		fmt.Println("timeout")
		finished = true
	case v, ok := <-fooCh:
		if ok {
			result = append(result, v)
		}
		finished = len(result) == 2
	case v, ok := <-barCh:
		if ok {
			result = append(result, v)
		}
		finished = len(result) == 2
	}

	if finished {
		// break out of the loop
		break
	}
}

// print all the results
fmt.Println(result)

func NewFuture

func NewFuture[T any]() Future[T]

NewFuture creates a new future of type T.

func NewFutureImmediate

func NewFutureImmediate[T any](value T) Future[T]

NewFutureImmediate creates a future of type T that has a value that is already set.

Example
f := NewFutureImmediate("hello")
println(<-f.Get())

type Result

type Result[T any] interface {
	// Unwrap deconstructs the contents of this Result into a tuple.
	Unwrap() (T, error)
}

Result is a simple wrapper for representing a value or an error.
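A value-or-error wrapper is mostly useful when a single channel has to carry both outcomes. The sketch below shows that pattern with a stand-in type; result, newResult and parseAsync are hypothetical names for illustration and do not describe the library's internals.

```go
package main

import (
	"fmt"
	"strconv"
)

// result is a hypothetical stand-in that pairs a value with an error.
type result[T any] struct {
	value T
	err   error
}

func newResult[T any](value T, err error) result[T] {
	return result[T]{value: value, err: err}
}

// Unwrap splits the result back into the usual (value, error) pair.
func (r result[T]) Unwrap() (T, error) {
	return r.value, r.err
}

// parseAsync parses s in the background and delivers either the
// number or the parse error over a single channel.
func parseAsync(s string) <-chan result[int] {
	ch := make(chan result[int], 1)
	go func() {
		n, err := strconv.Atoi(s)
		ch <- newResult(n, err)
	}()
	return ch
}

func main() {
	for _, s := range []string{"42", "oops"} {
		if n, err := (<-parseAsync(s)).Unwrap(); err != nil {
			fmt.Println("error:", err)
		} else {
			fmt.Println("value:", n)
		}
	}
}
```

Unwrap hands back the familiar two-value form, so the receiving side can keep the standard if err != nil check.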

func NewResult

func NewResult[T any](value T, err error) Result[T]

NewResult creates a result instance with a provided value and error. It's sometimes more convenient to instantiate like this when implementing library code.

Example
result := NewResult[string]("success", nil)
v, _ := result.Unwrap()
println(v)

func NewResultErr

func NewResultErr[T any](err error) Result[T]

NewResultErr creates a failed result.

Example
result := NewResultErr[string](errors.New("failure"))
_, err := result.Unwrap()
panic(err)

func NewResultValue

func NewResultValue[T any](value T) Result[T]

NewResultValue creates a successful result.
