asyncigo

package module
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 26, 2024 License: MIT Imports: 16 Imported by: 0

Documentation

Overview

Example
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/arvidfm/asyncigo"
)

func main() {
	_ = asyncigo.NewEventLoop().Run(context.Background(), func(ctx context.Context) error {
		task := asyncigo.SpawnTask(ctx, func(ctx context.Context) (int, error) {
			for i := range 3 {
				fmt.Printf("in subtask: %d\n", i)
				_ = asyncigo.Sleep(ctx, time.Second)
			}
			return 42, nil
		})

		for j := range 3 {
			fmt.Printf("in main task: %d\n", j)
			_ = asyncigo.Sleep(ctx, time.Second)
		}

		result, _ := task.Await(ctx)
		fmt.Printf("task result: %d\n", result)
		return nil
	})
}
Output:

in main task: 0
in subtask: 0
in main task: 1
in subtask: 1
in main task: 2
in subtask: 2
task result: 42
Example (Polyglot)

As Go's implementation of coroutines doesn't require us to use an "await" keyword every time we call one, it's easy to write "polyglottal" functions that work both synchronously and asynchronously, making the "coloured functions" problem less of an issue.

This example shows how one might write a sleep function that blocks if used outside an event loop, but not if an event loop is available.

package main

import (
	"context"
	"fmt"
	"math"
	"time"

	"github.com/arvidfm/asyncigo"
)

func main() {
	ctx := context.Background()
	_ = sleepPolyglot(ctx, time.Second*2)

	_ = asyncigo.NewEventLoop().Run(ctx, func(ctx context.Context) error {
		start := time.Now()

		numTasks := 100
		tasks := asyncigo.Map(asyncigo.Range(numTasks), func(int) asyncigo.Futurer {
			return asyncigo.SpawnTask(ctx, func(ctx context.Context) (any, error) {
				_ = sleepPolyglot(ctx, time.Second*2)
				return nil, nil
			})
		}).Collect()
		_ = asyncigo.Wait(ctx, asyncigo.WaitAll, tasks...)

		elapsed := time.Since(start)
		fmt.Printf("%d tasks took %d seconds to finish\n", len(tasks), int(math.Round(elapsed.Seconds())))

		return nil
	})
}

func sleepPolyglot(ctx context.Context, duration time.Duration) error {
	if _, ok := asyncigo.RunningLoopMaybe(ctx); ok {
		return asyncigo.Sleep(ctx, duration)
	}
	time.Sleep(duration)
	return nil
}
Output:

100 tasks took 2 seconds to finish

Index

Examples

Constants

This section is empty.

Variables

View Source
var (
	// ErrNotImplemented is returned by a Poller implementation for functions it does not support.
	ErrNotImplemented = errors.New("this function is not supported by this implementation")
)
View Source
var (
	ErrNotReady = errors.New("future is still pending")
)

Functions

func GetFirstResult

func GetFirstResult[T any](ctx context.Context, coros ...Coroutine2[T]) (T, error)

GetFirstResult returns the result of the first successful coroutine. Once a coroutine succeeds, all unfinished tasks will be cancelled. If no coroutine succeeds, the last error is returned.

func Sleep

func Sleep(ctx context.Context, duration time.Duration) error

Sleep suspends the current coroutine for the given duration.

func Wait

func Wait(ctx context.Context, mode WaitMode, futs ...Futurer) error

Wait will wait for any or all of the given Futures to complete depending on the WaitMode passed. If any of the futures fail, the most recent error will be returned. Wait will not cancel any futures.

Example

This example creates one future and two tasks and waits for them all to finish. Combining Wait with asyncigo.Awaitable.WriteResultTo allows for very succinct code.

In this case, the future and one of the tasks succeed, while the second task fails with an error. We can see how the error from the failing task is propagated by Wait, while the results are written to the specified locations.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/arvidfm/asyncigo"
)

func main() {
	_ = asyncigo.NewEventLoop().Run(context.Background(), func(ctx context.Context) error {
		fut1 := asyncigo.NewFuture[string]()
		task1 := asyncigo.SpawnTask(ctx, func(ctx context.Context) (int, error) {
			_ = asyncigo.Sleep(ctx, time.Second)

			fut1.SetResult("test", nil)

			return 20, nil
		})
		task2 := asyncigo.SpawnTask(ctx, func(ctx context.Context) (float64, error) {
			_ = asyncigo.Sleep(ctx, time.Second)

			return 25.5, errors.New("oops")
		})

		var result1 string
		var result2 int
		var result3 float64
		err := asyncigo.Wait(
			ctx,
			asyncigo.WaitAll,
			fut1.WriteResultTo(&result1),
			task1.WriteResultTo(&result2),
			task2.WriteResultTo(&result3),
		)

		fmt.Println("results:", result1, result2, result3)
		fmt.Println("error:", err)
		return nil
	})
}
Output:

results: test 20 25.5
error: oops

func Zip

func Zip[T, U any, TI Iterable[T], UI Iterable[U]](it1 TI, it2 UI) iter.Seq2[T, U]

Zip returns an iterator which yields each pair of items from the given iterators in turn. If the iterators are of different length, Zip will stop once the end of the shortest iterator has been reached.

Example
package main

import (
	"fmt"

	"github.com/arvidfm/asyncigo"
)

func main() {
	it := asyncigo.Zip(
		asyncigo.AsIterator([]int{1, 2, 3, 4}),
		asyncigo.AsIterator([]string{"a", "b", "c", "d", "e"}),
	)

	for a, b := range it {
		fmt.Printf("%d: %s\n", a, b)
	}
}
Output:

1: a
2: b
3: c
4: d

func ZipLongest

func ZipLongest[T, U any, TI Iterable[T], UI Iterable[U]](it1 TI, it2 UI) iter.Seq2[T, U]

ZipLongest returns an iterator which yields each pair of items from the given iterators in turn. If the iterators are of different length, ZipLongest will continue until the end of the longest iterator, yielding the empty value in place of the shorter iterable.

Example
package main

import (
	"fmt"

	"github.com/arvidfm/asyncigo"
)

func main() {
	it := asyncigo.ZipLongest(
		asyncigo.AsIterator([]int{1, 2, 3, 4}),
		asyncigo.AsIterator([]string{"a", "b", "c", "d", "e"}),
	)

	for a, b := range it {
		fmt.Printf("%d: %s\n", a, b)
	}
}
Output:

1: a
2: b
3: c
4: d
0: e

Types

type AsyncIterable

type AsyncIterable[T any] iter.Seq2[T, error]

AsyncIterable is a helper type for iterating over asynchronous streams that may error.

func AsyncIter

func AsyncIter[T any](f func(yield func(T) error) error) AsyncIterable[T]

AsyncIter is a helper function for constructing an AsyncIterable.

Example (Basic)

This is a basic example showing how results and errors are yielded when iterating over an AsyncIterable.

package main

import (
	"errors"
	"fmt"

	"github.com/arvidfm/asyncigo"
)

func main() {
	it := asyncigo.AsyncIter(func(yield func(int) error) error {
		for i := range 5 {
			if err := yield(i); err != nil {
				return err
			}
		}

		return errors.New("oops")
	})

	for i, err := range it {
		fmt.Printf("%d - %v\n", i, err)
	}
}
Output:

0 - <nil>
1 - <nil>
2 - <nil>
3 - <nil>
4 - <nil>
0 - oops
Example (Dial)

This shows a more advanced usage of AsyncIter, combining yields and awaits. It reads lines of data from an asynchronous stream and yields the length of each line.

package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"net"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"syscall"
	"time"

	"github.com/arvidfm/asyncigo"
)

func main() {
	defer serveFile("stream_example2.txt", "6172")()

	if err := asyncigo.NewEventLoop().Run(context.Background(), func(ctx context.Context) error {
		it := asyncigo.AsyncIter(func(yield func(int) error) error {
			stream, err := asyncigo.RunningLoop(ctx).Dial(ctx, "tcp", "localhost:6172")
			if err != nil {
				return err
			}

			for {
				line, err := stream.ReadLine(ctx)
				if errors.Is(err, io.EOF) {
					return nil
				} else if err != nil {
					return err
				}

				unicode := []rune(strings.TrimSpace(string(line)))
				_ = yield(len(unicode))
			}
		})

		for lineLength, err := range it {
			if err != nil {
				return err
			}

			fmt.Println(lineLength)
		}
		return nil
	}); err != nil {
		panic(err)
	}
}

func serveFile(path, port string) (close func()) {
	data, err := os.ReadFile(filepath.Join("tests", path))
	if err != nil {
		panic(err)
	}

	address := net.JoinHostPort("localhost", port)
	var waiter sync.WaitGroup
	var l net.Listener
	waiter.Add(1)
	go func() {
		defer waiter.Done()

		var err error
		l, err = net.Listen("tcp", address)
		if err != nil {
			panic(err)
		}

		for {
			conn, err := l.Accept()
			if err != nil {
				return
			}

			if _, err := conn.Write(data); err != nil {
				panic(err)
			}
			_ = conn.Close()
		}
	}()

	for {
		conn, err := net.Dial("tcp", address)
		if errors.Is(err, syscall.ECONNREFUSED) {
			time.Sleep(time.Millisecond * 10)
		} else if err != nil {
			panic(err)
		} else {
			_ = conn.Close()
			break
		}
	}

	return func() {
		_ = l.Close()
		waiter.Wait()
	}
}
Output:

6
12
18
30

func (AsyncIterable[T]) ForEach

func (ai AsyncIterable[T]) ForEach(f func(T) error) error

ForEach calls the given function for each value yielded by this AsyncIterable until the iterator finishes or returns an error. ForEach returns an error if either the iterator or the callback function returns an error.

func (AsyncIterable[T]) UntilErr

func (ai AsyncIterable[T]) UntilErr(err *error) Iterator[T]

UntilErr returns a single-values iterator that yields each non-error value yielded by this AsyncIterable until the iterator finishes or returns an error. If the AsyncIterable returns an error, the error will be written to the variable referenced by the given error pointer.

Example

Basic usage of UntilErr.

package main

import (
	"errors"
	"fmt"

	"github.com/arvidfm/asyncigo"
)

func main() {
	it := asyncigo.AsyncIter(func(yield func(int) error) error {
		for i := range 5 {
			if err := yield(i); err != nil {
				return err
			}
		}

		return errors.New("oops")
	})

	var err error
	for i := range it.UntilErr(&err) {
		fmt.Println(i)
	}
	fmt.Println(err)
}
Output:

0
1
2
3
4
oops
Example (AsyncStreams_A)

UntilErr is particularly useful for ranging over network streams.

package main

import (
	"context"
	"errors"
	"fmt"
	"net"
	"os"
	"path/filepath"
	"sync"
	"syscall"
	"time"

	"github.com/arvidfm/asyncigo"
)

func main() {
	defer serveFile("stream_example1.txt", "6172")()

	if err := asyncigo.NewEventLoop().Run(context.Background(), func(ctx context.Context) error {
		stream, _ := asyncigo.RunningLoop(ctx).Dial(ctx, "tcp", "localhost:6172")

		var err error
		for line := range stream.Lines(ctx).UntilErr(&err) {
			fmt.Printf("got line: %s", line)
		}

		return err
	}); err != nil {
		panic(err)
	}
}

func serveFile(path, port string) (close func()) {
	data, err := os.ReadFile(filepath.Join("tests", path))
	if err != nil {
		panic(err)
	}

	address := net.JoinHostPort("localhost", port)
	var waiter sync.WaitGroup
	var l net.Listener
	waiter.Add(1)
	go func() {
		defer waiter.Done()

		var err error
		l, err = net.Listen("tcp", address)
		if err != nil {
			panic(err)
		}

		for {
			conn, err := l.Accept()
			if err != nil {
				return
			}

			if _, err := conn.Write(data); err != nil {
				panic(err)
			}
			_ = conn.Close()
		}
	}()

	for {
		conn, err := net.Dial("tcp", address)
		if errors.Is(err, syscall.ECONNREFUSED) {
			time.Sleep(time.Millisecond * 10)
		} else if err != nil {
			panic(err)
		} else {
			_ = conn.Close()
			break
		}
	}

	return func() {
		_ = l.Close()
		waiter.Wait()
	}
}
Output:

got line: Lorem ipsum dolor sit amet.
got line: Donec non velit consequat.
got line: Donec interdum in nulla ac scelerisque.
got line: Duis commodo, neque ac luctus eleifend.
got line: Fusce lacinia id quam ac porttitor.
Example (AsyncStreams_B)

Since UntilErr returns a standard single-valued iterator, you can easily manipulate the iterator using functions like asyncigo.Map and asyncigo.Enumerate. This example shows how you could combine multiple asynchronous iterators using asyncigo.Chain.

package main

import (
	"context"
	"errors"
	"fmt"
	"net"
	"os"
	"path/filepath"
	"sync"
	"syscall"
	"time"

	"github.com/arvidfm/asyncigo"
)

func main() {
	defer serveFile("stream_example1.txt", "6172")()
	defer serveFile("stream_example2.txt", "6173")()

	if err := asyncigo.NewEventLoop().Run(context.Background(), func(ctx context.Context) error {
		loop := asyncigo.RunningLoop(ctx)

		var err error
		for line := range asyncigo.Chain(
			loop.DialLines(ctx, "tcp", "localhost:6172").UntilErr(&err),
			loop.DialLines(ctx, "tcp", "localhost:6173").UntilErr(&err),
		) {
			fmt.Printf("got line: %s", line)
		}

		return err
	}); err != nil {
		panic(err)
	}
}

func serveFile(path, port string) (close func()) {
	data, err := os.ReadFile(filepath.Join("tests", path))
	if err != nil {
		panic(err)
	}

	address := net.JoinHostPort("localhost", port)
	var waiter sync.WaitGroup
	var l net.Listener
	waiter.Add(1)
	go func() {
		defer waiter.Done()

		var err error
		l, err = net.Listen("tcp", address)
		if err != nil {
			panic(err)
		}

		for {
			conn, err := l.Accept()
			if err != nil {
				return
			}

			if _, err := conn.Write(data); err != nil {
				panic(err)
			}
			_ = conn.Close()
		}
	}()

	for {
		conn, err := net.Dial("tcp", address)
		if errors.Is(err, syscall.ECONNREFUSED) {
			time.Sleep(time.Millisecond * 10)
		} else if err != nil {
			panic(err)
		} else {
			_ = conn.Close()
			break
		}
	}

	return func() {
		_ = l.Close()
		waiter.Wait()
	}
}
Output:

got line: Lorem ipsum dolor sit amet.
got line: Donec non velit consequat.
got line: Donec interdum in nulla ac scelerisque.
got line: Duis commodo, neque ac luctus eleifend.
got line: Fusce lacinia id quam ac porttitor.
got line: 生麦生米生卵
got line: すもももももももものうち
got line: 東京特許許可局長今日急遽休暇許可却下
got line: 斜め77度の並びで泣く泣く嘶くナナハン7台難なく並べて長眺め

func (AsyncIterable[T]) YieldTo

func (ai AsyncIterable[T]) YieldTo(yield func(T) error) error

YieldTo will yield all the values from this AsyncIterable using the provided yield function for easier chaining of iterables.

type AsyncReadWriteCloser

type AsyncReadWriteCloser interface {
	io.ReadWriteCloser
	// WaitForReady suspends the calling coroutine until an I/O event occurs for this file handle.
	WaitForReady(ctx context.Context) error
}

AsyncReadWriteCloser represents a non-blocking file handle.

If an I/O operation is attempted when the underlying stream is not ready, e.g. because data is not yet available, a syscall.EAGAIN error will be returned.

type AsyncStream

type AsyncStream struct {
	// contains filtered or unexported fields
}

AsyncStream is a byte stream that can be read from and written to asynchronously.

func NewAsyncStream

func NewAsyncStream(file AsyncReadWriteCloser) *AsyncStream

NewAsyncStream constructs a new AsyncStream.

func (*AsyncStream) Chunks

func (a *AsyncStream) Chunks(ctx context.Context, chunkSize int) AsyncIterable[[]byte]

Chunks returns an AsyncIterable that iterates over the stream in fixed-size chunks of data.

func (*AsyncStream) Close

func (a *AsyncStream) Close() error

Close closes the stream.

func (*AsyncStream) Lines

func (a *AsyncStream) Lines(ctx context.Context) AsyncIterable[[]byte]

Lines returns an AsyncIterable that iterates over all lines in the stream. The newline character will be included with each line.

func (*AsyncStream) ReadAll

func (a *AsyncStream) ReadAll(ctx context.Context) ([]byte, error)

ReadAll reads until the end of the stream and returns all read data.

func (*AsyncStream) ReadChunk

func (a *AsyncStream) ReadChunk(ctx context.Context, chunkSize int) ([]byte, error)

ReadChunk reads a single fixed-size chunk of data from the stream.

func (*AsyncStream) ReadLine

func (a *AsyncStream) ReadLine(ctx context.Context) ([]byte, error)

ReadLine returns all data until a newline is encountered, including the newline.

func (*AsyncStream) ReadUntil

func (a *AsyncStream) ReadUntil(ctx context.Context, character byte) ([]byte, error)

ReadUntil returns all data until the given character is encountered, including the character.

func (*AsyncStream) Stream

func (a *AsyncStream) Stream(ctx context.Context, bufSize int) AsyncIterable[[]byte]

Stream returns an AsyncIterable that yields the next chunk of data as soon as it is available. The chunks will be no larger than the given buffer size.

func (*AsyncStream) Write

func (a *AsyncStream) Write(ctx context.Context, data []byte) Awaitable[int]

Write writes the given data to the stream. The returned Awaitable can be awaited to be sure that all data has been written before continuing.

type Awaitable

type Awaitable[T any] interface {
	Futurer
	// Await suspends the current task until this [Awaitable]
	// has completed and returns its result once completed.
	//
	// If the calling [Task] or the given [context.Context]
	// is cancelled before Await completes, an error will be returned
	// and the Awaitable will be cancelled as well.
	// See the [Awaitable.Shield] method if you do not want the Awaitable to be cancelled.
	Await(ctx context.Context) (T, error)
	// MustAwait is the same as [Awaitable.Await], but it panics if Await
	// returns an error.
	MustAwait(ctx context.Context) T
	// Shield returns a new [Future] which completes once this Awaitable completes,
	// but which will not cancel this Awaitable if cancelled.
	// Allows for awaiting an Awaitable from a [Task] without cancelling
	// the Awaitable if the Task is cancelled.
	Shield() *Future[T]
	// AddResultCallback registers a type-aware callback to run once this Awaitable
	// completes or is cancelled. If called when the Awaitable has already completed,
	// the callback will be run immediately.
	AddResultCallback(callback func(result T, err error)) Awaitable[T]
	// WriteResultTo registers a pointer to write the result
	// of this Awaitable to if it completes with no error.
	//
	// This method allows for particularly ergonomic use
	// of functions like [Wait].
	WriteResultTo(dst *T) Awaitable[T]
	// Future returns the underlying [Future] holding the result
	// of this Awaitable. Returns itself if the Awaitable is a Future.
	Future() *Future[T]
	// Result returns the result of this Awaitable.
	// If this Awaitable has not yet completed, [ErrNotReady] will be returned.
	Result() (T, error)
}

Awaitable is a type that holds the result of an operation that may complete at a later point in time, and which can be awaited to suspend the current coroutine until the operation has completed.

This interface enables polymorphism for Future and Task objects.

type Callback

type Callback struct {
	// contains filtered or unexported fields
}

Callback is a handle to a callback scheduled to be run by an EventLoop.

func NewCallback

func NewCallback(duration time.Duration, callback func()) *Callback

NewCallback creates a new handle to a callback specified to run after the given amount of time.

Calling this function will not actually schedule the callback to be run. Use EventLoop.ScheduleCallback instead.

func (*Callback) Cancel

func (c *Callback) Cancel() bool

Cancel removes this callback from its callback queue, preventing it from being run. If this callback is not currently scheduled, this method is a no-op and will return false.

type Coroutine1

type Coroutine1 func(ctx context.Context) error

Coroutine1 is a coroutine that can return an error.

func (Coroutine1) SpawnTask

func (c Coroutine1) SpawnTask(ctx context.Context) *Task[any]

SpawnTask is a convenience function for starting this coroutine as a background task.

type Coroutine2

type Coroutine2[R any] func(ctx context.Context) (R, error)

Coroutine2 is a coroutine that can return a result or an error.

func (Coroutine2[R]) SpawnTask

func (c Coroutine2[R]) SpawnTask(ctx context.Context) *Task[R]

SpawnTask is a convenience function for starting this coroutine as a background task.

type EpollAsyncFile

type EpollAsyncFile struct {
	// contains filtered or unexported fields
}

EpollAsyncFile is an implementation of AsyncReadWriteCloser for EpollPoller.

func NewEpollAsyncFile

func NewEpollAsyncFile(poller *EpollPoller, f Fder) *EpollAsyncFile

NewEpollAsyncFile wraps the given file handle using an EpollAsyncFile.

func (*EpollAsyncFile) Close

func (eaf *EpollAsyncFile) Close() error

Close implements io.Closer.

func (*EpollAsyncFile) Fd

func (eaf *EpollAsyncFile) Fd() uintptr

Fd implements Fder.

func (*EpollAsyncFile) Read

func (eaf *EpollAsyncFile) Read(p []byte) (n int, err error)

Read implements io.Reader.

func (*EpollAsyncFile) WaitForReady

func (eaf *EpollAsyncFile) WaitForReady(ctx context.Context) error

WaitForReady implements AsyncReadWriteCloser.

func (*EpollAsyncFile) Write

func (eaf *EpollAsyncFile) Write(p []byte) (n int, err error)

Write implements io.Writer.

type EpollPoller

type EpollPoller struct {
	// contains filtered or unexported fields
}

EpollPoller is an epoll-backed Poller implementation.

func (*EpollPoller) Close

func (e *EpollPoller) Close() error

Close implements Poller.

func (*EpollPoller) Dial

func (e *EpollPoller) Dial(ctx context.Context, network, address string) (conn AsyncReadWriteCloser, err error)

Dial implements Poller.

func (*EpollPoller) Open

func (e *EpollPoller) Open(fd uintptr) (file AsyncReadWriteCloser, err error)

Open wraps the given file descriptor and subscribes to its events.

func (*EpollPoller) Pipe

func (e *EpollPoller) Pipe() (r, w AsyncReadWriteCloser, err error)

Pipe implements Poller.

func (*EpollPoller) Subscribe

func (e *EpollPoller) Subscribe(target *EpollAsyncFile) error

Subscribe instructs the poller to start listening for events for the given file handle.

func (*EpollPoller) Unsubscribe

func (e *EpollPoller) Unsubscribe(target *EpollAsyncFile) error

Unsubscribe instructs the poller to stop listening for events for the given file handle.

func (*EpollPoller) Wait

func (e *EpollPoller) Wait(timeout time.Duration) error

Wait implements Poller.

func (*EpollPoller) WakeupThreadsafe

func (e *EpollPoller) WakeupThreadsafe() error

WakeupThreadsafe implements Poller.

type EpollSocket

type EpollSocket struct {
	// contains filtered or unexported fields
}

EpollSocket is a wrapper for a low-level socket file descriptor.

func NewSocket

func NewSocket(fd int) *EpollSocket

NewSocket wraps the given file descriptor using an EpollSocket.

func (*EpollSocket) Close

func (s *EpollSocket) Close() error

Close implements io.Closer.

func (*EpollSocket) Fd

func (s *EpollSocket) Fd() uintptr

Fd implements Fder.

func (*EpollSocket) Read

func (s *EpollSocket) Read(p []byte) (n int, err error)

Read implements io.Reader.

func (*EpollSocket) Write

func (s *EpollSocket) Write(p []byte) (n int, err error)

Write implements io.Writer.

type EventLoop

type EventLoop struct {
	// contains filtered or unexported fields
}

EventLoop implements the core mechanism for processing callbacks and I/O events.

func NewEventLoop

func NewEventLoop() *EventLoop

NewEventLoop constructs a new EventLoop.

func RunningLoop

func RunningLoop(ctx context.Context) *EventLoop

RunningLoop returns the EventLoop running in the current context. If no EventLoop is running, this function will panic. This function should not generally be called from a manually launched goroutine.

func RunningLoopMaybe

func RunningLoopMaybe(ctx context.Context) (loop *EventLoop, ok bool)

RunningLoopMaybe returns the EventLoop running in the current context, or nil with ok == false if no loop is running.

func (*EventLoop) Dial

func (e *EventLoop) Dial(ctx context.Context, network, address string) (*AsyncStream, error)

Dial opens a new network connection.

func (*EventLoop) DialLines

func (e *EventLoop) DialLines(ctx context.Context, network, address string) AsyncIterable[[]byte]

DialLines is a convenience method that calls EventLoop.Dial followed by AsyncStream.Lines. The connection attempt will be deferred until the AsyncIterable is ranged over. If the connection fails, the connection error will be returned immediately on the first iteration.

func (*EventLoop) Pipe

func (e *EventLoop) Pipe() (r, w *AsyncStream, err error)

Pipe creates two streams, where writing to w will make the written data available from r.

func (*EventLoop) Run

func (e *EventLoop) Run(ctx context.Context, main Coroutine1) error

Run starts the event loop with the given coroutine as the main task. The loop will exit once the main task has exited and there are no pending callbacks.

func (*EventLoop) RunCallback

func (e *EventLoop) RunCallback(callback func())

RunCallback schedules a callback for immediate execution by the event loop. Not threadsafe; use EventLoop.RunCallbackThreadsafe to schedule callbacks from other threads.

func (*EventLoop) RunCallbackThreadsafe

func (e *EventLoop) RunCallbackThreadsafe(ctx context.Context, callback func())

RunCallbackThreadsafe schedules a callback for immediate execution on the event loop's thread.

func (*EventLoop) ScheduleCallback

func (e *EventLoop) ScheduleCallback(delay time.Duration, callback func()) *Callback

ScheduleCallback schedules a callback to be executed after the given duration.

func (*EventLoop) WaitForCallbacks

func (e *EventLoop) WaitForCallbacks() *Future[any]

WaitForCallbacks returns a Future that will complete once there are no pending callback functions.

func (*EventLoop) Yield

func (e *EventLoop) Yield(ctx context.Context, fut Futurer) error

Yield yields control to the event loop for one tick, allowing pending callbacks and I/O to be processed.

type Fder

type Fder interface {
	io.ReadWriteCloser
	// Fd returns the file descriptor of this handle.
	Fd() uintptr
}

Fder represents a file handle that has an associated file descriptor.

type Future

type Future[ResType any] struct {
	// contains filtered or unexported fields
}

Future is a value container representing the result of a pending operation. It will run any callbacks registered using [Futurer.AddDoneCallback] or [Awaitable.AddResultCallback] once populated with a result using either Future.SetResult or [Futurer.Cancel].

func Go

func Go[T any](ctx context.Context, f func(ctx context.Context) (T, error)) *Future[T]

Go launches the given function in a goroutine and returns a Future that will complete when the goroutine finishes.

func NewFuture

func NewFuture[ResType any]() *Future[ResType]

NewFuture returns a new Future instance ready to be awaited or populated with a result.

func (*Future[ResType]) AddDoneCallback

func (f *Future[ResType]) AddDoneCallback(callback func(error)) Futurer

AddDoneCallback implements Futurer.

func (*Future[ResType]) AddResultCallback

func (f *Future[ResType]) AddResultCallback(callback func(ResType, error)) Awaitable[ResType]

AddResultCallback implements Awaitable.

func (*Future[ResType]) Await

func (f *Future[ResType]) Await(ctx context.Context) (ResType, error)

Await implements Awaitable.

func (*Future[ResType]) Cancel

func (f *Future[ResType]) Cancel(err error)

Cancel implements Futurer.

func (*Future[ResType]) Err

func (f *Future[ResType]) Err() error

Err implements Futurer.

func (*Future[ResType]) Future

func (f *Future[ResType]) Future() *Future[ResType]

Future implements Awaitable.

func (*Future[ResType]) HasResult

func (f *Future[ResType]) HasResult() bool

HasResult implements Futurer.

func (*Future[ResType]) MustAwait

func (f *Future[ResType]) MustAwait(ctx context.Context) ResType

MustAwait implements Awaitable.

func (*Future[ResType]) Result

func (f *Future[ResType]) Result() (ResType, error)

Result implements Awaitable.

func (*Future[ResType]) SetResult

func (f *Future[ResType]) SetResult(result ResType, err error)

SetResult populates this Future with a result. This will mark the Future as completed, and the provided result will be propagated to any registered callbacks and returned from any future calls to [Awaitable.Result].

func (*Future[ResType]) Shield

func (f *Future[ResType]) Shield() *Future[ResType]

Shield implements Awaitable.

Example
package main

import (
	"context"
	"fmt"

	"github.com/arvidfm/asyncigo"
)

func main() {
	_ = asyncigo.NewEventLoop().Run(context.Background(), func(ctx context.Context) error {
		shielded := asyncigo.NewFuture[any]()
		task1 := asyncigo.SpawnTask(ctx, func(ctx context.Context) (any, error) {
			fmt.Println("waiting for shielded...")
			return shielded.Shield().Await(ctx)
		})

		unshielded := asyncigo.NewFuture[any]()
		task2 := asyncigo.SpawnTask(ctx, func(ctx context.Context) (any, error) {
			fmt.Println("waiting for unshielded...")
			return unshielded.Await(ctx)
		})

		// yield to the event loop for one tick to initialise the tasks
		_ = asyncigo.RunningLoop(ctx).Yield(ctx, nil)

		task1.Cancel(nil)
		task2.Cancel(nil)

		fmt.Println("task1:", task1.Err())
		fmt.Println("task2:", task2.Err())
		fmt.Println("shielded:", shielded.Err())
		fmt.Println("unshielded:", unshielded.Err())
		return nil
	})
}
Output:

waiting for shielded...
waiting for unshielded...
task1: context canceled
task2: context canceled
shielded: <nil>
unshielded: context canceled

func (*Future[ResType]) WriteResultTo

func (f *Future[ResType]) WriteResultTo(dest *ResType) Awaitable[ResType]

WriteResultTo implements Awaitable.

type Futurer

type Futurer interface {
	// HasResult reports whether this Futurer has completed.
	// This is true if the Futurer has a result, or if it has been cancelled.
	HasResult() bool
	// Err returns a non-nil error if it has been cancelled
	// or completed with an error.
	Err() error
	// AddDoneCallback registers a type-unaware callback to run once this Futurer
	// completes or is cancelled. If called when the Futurer has already completed,
	// the callback will be run immediately.
	AddDoneCallback(callback func(error)) Futurer
	// Cancel cancels this Futurer. If err is nil, the Futurer
	// will be canceled with [context.Canceled]. If the Futurer
	// has already completed, this has no effect.
	Cancel(err error)
}

Futurer is an untyped view of an Awaitable, useful for storing heterogeneous Awaitable instances in a container.

type Iterable

type Iterable[V any] interface {
	~func(func(V) bool)
}

Iterable represents any function that can be ranged over and which yields one value.

type Iterable2

type Iterable2[K, V any] interface {
	~func(func(K, V) bool)
}

Iterable2 represents any function that can be ranged over and which yields two values.

type Iterator

type Iterator[V any] iter.Seq[V]

Iterator is a function that can be ranged over, yielding one value each iteration.

func AsIterator

func AsIterator[V any, VS []V](slice VS) Iterator[V]

AsIterator returns an Iterator which yields the values in the slice in turn.

func Chain

func Chain[T any, TS Iterable[T]](its ...TS) Iterator[T]

Chain yields each value from each of the given iterators as a single flat stream.

Example
package main

import (
	"fmt"

	"github.com/arvidfm/asyncigo"
)

func main() {
	it := asyncigo.Chain(
		asyncigo.AsIterator([]int{1, 2, 3, 4}),
		asyncigo.AsIterator([]int{5, 6, 7}),
		asyncigo.AsIterator([]int{8, 9, 10, 11, 12}),
	)

	fmt.Println(it.Collect())
}
Output:

[1 2 3 4 5 6 7 8 9 10 11 12]

func Count

func Count[T constraints.Integer](start T) Iterator[T]

Count returns an infinite iterator which yields every integer starting at start.

func Filter

func Filter[T, TS Iterable[T]](it TS, f func(T) bool) Iterator[T]

Filter returns an iterator which yields the values from the given iterator in turn, skipping any values for which the given filtering function return false.

func FlatMap

func FlatMap[T, U any, TS Iterable[T], US Iterable[U]](it TS, f func(T) US) Iterator[U]

FlatMap returns an iterator which yields the values from the iterators returned by the mapping function as a single flat stream of values.

Example
package main

import (
	"fmt"

	"github.com/arvidfm/asyncigo"
)

func main() {
	it := asyncigo.FlatMap(
		asyncigo.Range(5),
		func(v int) asyncigo.Iterator[int] {
			return asyncigo.Range(v + 1)
		},
	)

	fmt.Println(it.Collect())
}
Output:

[0 0 1 0 1 2 0 1 2 3 0 1 2 3 4]

func Flatten

func Flatten[T any, TS Iterable[T], TSS Iterable[TS]](its TSS) Iterator[T]

Flatten yields each value from each of the nested iterators yielded by the given iterator as a single flat stream.

Example
package main

import (
	"fmt"

	"github.com/arvidfm/asyncigo"
)

func main() {
	it := asyncigo.Flatten(func(yield func(iterator asyncigo.Iterator[int]) bool) {
		_ = yield(asyncigo.AsIterator([]int{1, 2, 3, 4})) &&
			yield(asyncigo.AsIterator([]int{5, 6, 7})) &&
			yield(asyncigo.AsIterator([]int{8, 9, 10, 11, 12}))
	})

	fmt.Println(it.Collect())
}
Output:

[1 2 3 4 5 6 7 8 9 10 11 12]

func Iter

func Iter[V any, VI Iterable[V]](it VI) Iterator[V]

func Map

func Map[T, U any, TS Iterable[T]](it TS, f func(T) U) Iterator[U]

Map returns an iterator which yields the result of passing each value from the given iterator through the provided mapping function in turn.

func Range

func Range[T constraints.Integer](count T) Iterator[T]

Range returns an iterator which yields every integer from 0 up to, but not including, count.

Example
package main

import (
	"fmt"

	"github.com/arvidfm/asyncigo"
)

func main() {
	it := asyncigo.Range(5)

	for i := range it {
		fmt.Println(i)
	}
}
Output:

0
1
2
3
4

func Uniq

func Uniq[V comparable, VS Iterable[V]](it VS) Iterator[V]

Uniq returns an iterator which yields the values from the given iterator in turn, skipping any already encountered values.

Example
package main

import (
	"fmt"

	"github.com/arvidfm/asyncigo"
)

func main() {
	it := asyncigo.Uniq(
		asyncigo.AsIterator([]int{1, 3, 5, 5, 8, 5, 2, 3, 9, 7, 3, 3, 4, 1}),
	)

	fmt.Println(it.Collect())
}
Output:

[1 3 5 8 2 9 7 4]

func (Iterator[V]) Collect

func (i Iterator[V]) Collect() []V

Collect consumes the Iterator and returns the yielded values as a slice.

Example
package main

import (
	"fmt"

	"github.com/arvidfm/asyncigo"
)

func main() {
	it := asyncigo.Iter(func(yield func(int) bool) {
		a, b := 0, 1
		for a < 20 {
			if !yield(a) {
				return
			}
			a, b = a+b, a
		}
	})

	fmt.Println(it.Collect())
}
Output:

[0 1 1 2 3 5 8 13]

type MapIterator

type MapIterator[K comparable, V any] iter.Seq2[K, V]

MapIterator is a function that can be ranged over, yielding two values each iteration.

func AsIterator2

func AsIterator2[K comparable, V any](m map[K]V) MapIterator[K, V]

AsIterator2 returns a MapIterator which yields each key-value pair in the map.

func Enumerate

func Enumerate[T constraints.Integer, U any, UI Iterable[U]](start T, it UI) MapIterator[T, U]

Enumerate returns an iterator which yields each value from the given iterator along with its index.

Example
package main

import (
	"fmt"

	"github.com/arvidfm/asyncigo"
)

func main() {
	it := asyncigo.Enumerate(5, asyncigo.AsIterator([]string{"a", "b", "c", "d"}))

	for i, v := range it {
		fmt.Printf("%d: %s\n", i, v)
	}
}
Output:

5: a
6: b
7: c
8: d

func MapIter

func MapIter[K comparable, V any, I Iterable2[K, V]](it I) MapIterator[K, V]

func (MapIterator[K, V]) Collect

func (mi MapIterator[K, V]) Collect() map[K]V

Collect consumes the MapIterator and returns the yielded values as a map.

Example
package main

import (
	"fmt"

	"github.com/arvidfm/asyncigo"
)

func main() {
	it := asyncigo.MapIter(func(yield func(int, string) bool) {
		_ = yield(5, "a") &&
			yield(10, "b") &&
			yield(15, "c")
	})

	m := it.Collect()
	for k := range m {
		fmt.Printf("%d: %s\n", k, m[k])
	}
}
Output:

5: a
10: b
15: c

type Mutex

type Mutex struct {
	// contains filtered or unexported fields
}

Mutex provides a simple asynchronous locking mechanism for coroutines. Mutex is not threadsafe.

func (*Mutex) Lock

func (m *Mutex) Lock(ctx context.Context) error

Lock locks the Mutex. If the Mutex is already locked, the calling coroutine will be suspended until unlocked.

func (*Mutex) Unlock

func (m *Mutex) Unlock()

Unlock unlocks the Mutex.

type Poller

type Poller interface {
	// Close closes this Poller.
	Close() error
	// Wait waits for one or more I/O events. If an I/O event occurs, Wait
	// should wake up any coroutines waiting on [AsyncReadWriteCloser.WaitForReady]
	// for the corresponding file handle.
	Wait(timeout time.Duration) error
	// WakeupThreadsafe instructs the Poller to stop waiting and return control to the event loop.
	WakeupThreadsafe() error
	// Pipe constructs a pair of asynchronous file handles where writing to w
	// causes the same data to be read from r.
	Pipe() (r, w AsyncReadWriteCloser, err error)
	// Dial opens a non-blocking network connection.
	Dial(ctx context.Context, network, address string) (AsyncReadWriteCloser, error)
}

Poller represents a type that can wait for multiple I/O events simultaneously.

func NewPoller

func NewPoller() (Poller, error)

NewPoller constructs a new EpollPoller. Will fail if an epoll handle could not be created.

type Queue

type Queue[T any] struct {
	// contains filtered or unexported fields
}

Queue provides a basic asynchronous queue. Queue is not threadsafe.

func (*Queue[T]) Get

func (q *Queue[T]) Get() *Future[T]

Get pops the first item from the Queue. The returned Future will resolve to the popped item once data is available.

func (*Queue[T]) Push

func (q *Queue[T]) Push(item T)

Push adds an item to the Queue.

type Task

type Task[RetType any] struct {
	// contains filtered or unexported fields
}

Task is responsible for driving a coroutine, intercepting any Awaitable instances awaited from the coroutine and advancing the coroutine once the pending Awaitable completes.

func SpawnTask

func SpawnTask[RetType any](ctx context.Context, coro Coroutine2[RetType]) *Task[RetType]

SpawnTask starts the given coroutine as a background task.

Example
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/arvidfm/asyncigo"
)

func main() {
	_ = asyncigo.NewEventLoop().Run(context.Background(), func(ctx context.Context) error {
		var counter int
		var tasks []asyncigo.Futurer
		for range 100000 {
			tasks = append(tasks, asyncigo.SpawnTask(ctx, func(ctx context.Context) (any, error) {
				for range 10 {
					counter++
					_ = asyncigo.Sleep(ctx, time.Millisecond*500)
				}
				return nil, nil
			}))
		}

		_ = asyncigo.Wait(ctx, asyncigo.WaitAll, tasks...)

		fmt.Println(counter)
		return nil
	})
}
Output:

1000000

func (*Task[_]) AddDoneCallback

func (t *Task[_]) AddDoneCallback(callback func(error)) Futurer

AddDoneCallback implements Futurer.

func (*Task[RetType]) AddResultCallback

func (t *Task[RetType]) AddResultCallback(callback func(result RetType, err error)) Awaitable[RetType]

AddResultCallback implements Awaitable.

func (*Task[RetType]) Await

func (t *Task[RetType]) Await(ctx context.Context) (RetType, error)

Await implements Awaitable.

func (*Task[_]) Cancel

func (t *Task[_]) Cancel(err error)

Cancel implements Futurer.

Example

When a task has been cancelled, it will continue running, but any calls to [Awaitable.Await] will immediately return context.Canceled. It's the responsibility of the task to stop early when cancelled.

package main

import (
	"context"
	"fmt"

	"github.com/arvidfm/asyncigo"
)

func main() {
	_ = asyncigo.NewEventLoop().Run(context.Background(), func(ctx context.Context) error {
		futs := make([]asyncigo.Future[int], 10)
		task := asyncigo.SpawnTask(ctx, func(ctx context.Context) (int, error) {
			for i := range futs {
				result, err := futs[i].Await(ctx)
				fmt.Printf("%d: (%v, %v)\n", i, result, err)
			}
			return 0, nil
		})

		loop := asyncigo.RunningLoop(ctx)
		for i := range futs {
			if i == 5 {
				task.Cancel(nil)
			}

			_ = loop.Yield(ctx, nil)
			futs[i].SetResult(i, nil)
		}

		result, err := task.Await(ctx)
		fmt.Printf("task result: (%v, %v)", result, err)
		return nil
	})
}
Output:

0: (0, <nil>)
1: (1, <nil>)
2: (2, <nil>)
3: (3, <nil>)
4: (4, <nil>)
5: (0, context canceled)
6: (0, context canceled)
7: (0, context canceled)
8: (0, context canceled)
9: (0, context canceled)
task result: (0, context canceled)

func (*Task[_]) Err

func (t *Task[_]) Err() error

Err implements Futurer.

func (*Task[RetType]) Future

func (t *Task[RetType]) Future() *Future[RetType]

Future implements Awaitable.

func (*Task[_]) HasResult

func (t *Task[_]) HasResult() bool

HasResult implements Futurer.

func (*Task[RetType]) MustAwait

func (t *Task[RetType]) MustAwait(ctx context.Context) RetType

MustAwait implements Awaitable.

func (*Task[RetType]) Result

func (t *Task[RetType]) Result() (RetType, error)

Result implements Awaitable.

func (*Task[RetType]) Shield

func (t *Task[RetType]) Shield() *Future[RetType]

Shield implements Awaitable.

func (*Task[_]) Stop

func (t *Task[_]) Stop()

Stop aborts the coroutine, preventing any further awaits. You should generally use [Futurer.Cancel] instead.

func (*Task[RetType]) WriteResultTo

func (t *Task[RetType]) WriteResultTo(dst *RetType) Awaitable[RetType]

WriteResultTo implements Awaitable.

type WaitMode

type WaitMode int

WaitMode modifies the behaviour of Wait.

const (
	WaitFirstResult WaitMode = iota // wait until any future has a result or an error
	WaitFirstError                  // wait until any future has an error or until all futures have completed
	WaitAll                         // wait until all futures have completed or errored
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL