threads

package module
v0.0.0-...-8388d70 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 12, 2023 License: Unlicense Imports: 7 Imported by: 0

README

Threads

Threads is an easy-to-use tool for managing groups of Goroutines in common use-cases.

Example usage:

g := threads.NewGroup(ctx)

g.Go(func(ctx context.Context) error {
	return DoSomeTask(ctx)
})

g.Go(threads.PeriodicWorker(10*time.Second, func(ctx context.Context) error {
	fmt.Println("every 10 seconds this message will show up")
	return nil
}))

err := g.Wait()

It also forwards panics from the Goroutines to the waiting Goroutine which can be useful in some situations:

defer func() {
	if r := recover(); r != nil {
		fmt.Println("panic forwarded to main:", r)
	}
}()

ctx := context.Background()

g := threads.NewGroup(ctx)

g.Go(func(ctx context.Context) error {
	panic("foo")
})

g.Go(func(ctx context.Context) error {
	<-ctx.Done()
	fmt.Println("context canceled because of the panic on the other Goroutine")
	return nil
})

// A panic on any of the Goroutines will cause g.Wait() to panic
// immediately without waiting for the remaining Goroutines, but they
// will still receive a cancel signal so they can make a graceful
// shutdown.
g.Wait()

fmt.Println("not going to print this because of the panic")

Note: The panic redirection feature only works if you also call g.Wait(), if a panic occurs before g.Wait() is called it will just go through as it would on a normal Goroutine.

Main Features:

This library allows you to:

  1. Create several Goroutines easily and wait for them to return
  2. If any of the Goroutines return an error this group cancels the context causing a graceful shutdown.
  3. The graceful shutdown mechanism also simplifies the error handling as you can just wait for the .Wait() function to return and handle the error at that point.
  4. If any of the Goroutines panics after .Wait() has been called the panic will be forwarded from the original goroutine to the waiting Goroutine causing the .Wait() function to panic. This is useful if you want to perform a graceful shutdown on the main goroutine for example.

Helper Functions

PeriodicWorker

The threads.PeriodicWorker is a useful helper function that allows you to create a worker that will run periodically until it either returns an error or the context is cancelled, e.g.:

g := threads.NewGroup(ctx)

// A worker that runs immediately at start and then once every second:
g.Go(threads.PeriodicWorker(1*time.Second, func(ctx context.Context) error {
	fmt.Println("one second has passed: %v", time.Now())
	return nil
}))

g.Wait()

This worker is particularly useful because if the context is cancelled it will perform a graceful shutdown, so you don't have to write this behavior youself.

If you want to write unit tests for this worker there is a way of mocking the time.After call done inside of it:

ctx, cancel := context.WithCancel(ctx)
defer cancel()

ctx = threads.ContextWithTimeAfterMock(ctx, func(triggerCh chan time.Time, waitCh chan time.Duration) {
	<-waitCh                 // Wait until time.After is called
	triggerCh <- time.Time{} // Makes <-time.After return
	<-waitCh                 // Waits again
	cancel()                 // Forces the worker to stop:
})

g := threads.NewGroup(ctx)

// A worker that runs immediately at start and
// then once again when triggerCh receives a message:
count := 0
g.Go(threads.PeriodicWorker(1*time.Hour, func(ctx context.Context) error {
	count++
	fmt.Printf("Run count: %v\n", count)
	return nil
}))

g.Wait()
Safe Functions

safe.Get and safe.Set can be used to perform thread safe gets and sets on any variable passing a mux as its first argument e.g.:

mux := &sync.Mutex{}
var shared int

// Thread safe operations:
v := safe.Get(mux, &shared)
safe.Set(mux, &shared, v+1)

The safe.Do function is a convenient way of running any code inside a mutex Lock/Unlock window:

mux := &sync.Mutex{}
var shared int

// Thread safe operation:
err := safe.Do(mux, func() error {
  shared = shared + 1
})

LICENSE

This project was created by Blackpoint Cyber to help the community, it uses a public domain license meaning you can copy and use any part of it without worrying about any restrictions.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrRestartGroup = fmt.Errorf("signal to restart the current threads.Group")
View Source
var ErrStartGracefulShutdown = fmt.Errorf("signal to stop the execution gracefully")

Functions

func AdjustInterval

func AdjustInterval(d time.Duration) error

func ContextWithTimeMock

func ContextWithTimeMock(ctx context.Context, t timeAfter) context.Context

func ForkAndWait

func ForkAndWait(ctx context.Context, fns ...Worker) error

func RetryWorkerIn

func RetryWorkerIn(d time.Duration) error

Types

type Group

type Group struct {
	// contains filtered or unexported fields
}

func NewGroup

func NewGroup(ctx context.Context) Group

func (*Group) Go

func (g *Group) Go(fn Worker)

func (*Group) SubGroup

func (g *Group) SubGroup(workers ...Worker)

func (Group) Wait

func (g Group) Wait() error

type Worker

type Worker func(ctx context.Context) error

func PeriodicWorker

func PeriodicWorker[T intervalType](
	baseIterationInterval T,
	doWork Worker,
) Worker

Directories

Path Synopsis
internal

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL