syncsafe

v1.1.0 Latest
Published: Dec 1, 2022 License: MIT Imports: 7 Imported by: 0

README

sync.safe 🛟

Introduction

The syncsafe package provides synchronization mechanisms similar to the native sync package, but in a more defensive way.

  • WaitGroup lets you wait with a context, which addresses the risk of hanging indefinitely when jobs get stuck for whatever reason.
  • TaggedWaitGroup gives more insight into pending counters by tagging every Add operation.

Usage

Installation
go get github.com/go-ext/syncsafe
WaitGroup examples
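// Give up waiting after 1 second.
ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
defer cancel()
wg := NewWaitGroup()
for i := 0; i < 3; i++ {
    wg.Add(1)
    go func(i int) {
        defer wg.Done()
        // Jobs sleep for 0, 1 and 2 seconds; the last one outlives the timeout.
        time.Sleep(time.Second * time.Duration(i))
    }(i)
}
// WaitContext returns a non-nil error if ctx is done before the counter reaches zero.
if err := wg.WaitContext(ctx); err != nil {
    log.Fatal(err, ": ", err.StackTrace())
}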
TaggedWaitGroup examples
wg := NewTaggedWaitGroup()
doneCalcJob := wg.Add("calculate-job", 1)
doneSendJob := wg.Add("send-job", 1)
go func() {
    // After a while
    doneCalcJob()
    fmt.Println(wg.Counters()) // Will print map[send-job:1]
    doneSendJob()
}()
wg.Wait()

Documentation

Overview

Package syncsafe provides synchronization mechanisms similar to the native sync package, but in a more defensive way.

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type DoneFn added in v1.1.0

type DoneFn func()

DoneFn is the type of the done function returned by TaggedWaitGroup.Add. Calling it decreases the counter that the corresponding Add call increased.

type StackError

type StackError interface {
	error
	StackTrace() string
}

StackError describes an error that also provides a stack trace.
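To make the interface concrete, here is a minimal sketch of a type that satisfies it. The names tracedError and withStack are hypothetical and are not part of syncsafe; the sketch only assumes that a stack trace can be captured with runtime/debug.Stack from the standard library.

```go
package main

import (
	"errors"
	"fmt"
	"runtime/debug"
)

// StackError mirrors the interface shown in the documentation above.
type StackError interface {
	error
	StackTrace() string
}

// tracedError is a hypothetical implementation for illustration only;
// it is not the type syncsafe uses internally.
type tracedError struct {
	err   error
	stack string
}

func (e *tracedError) Error() string      { return e.err.Error() }
func (e *tracedError) StackTrace() string { return e.stack }

// withStack builds an error from msg and records the stack of the
// goroutine that called it.
func withStack(msg string) StackError {
	return &tracedError{err: errors.New(msg), stack: string(debug.Stack())}
}

func main() {
	err := withStack("wait cancelled")
	fmt.Println(err)              // the error message
	fmt.Println(err.StackTrace()) // the captured stack trace
}
```

Callers can treat such a value as a plain error and only reach for StackTrace when they need more detail for logging.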

type TaggedWaitGroup added in v1.1.0

type TaggedWaitGroup struct {
	// contains filtered or unexported fields
}

TaggedWaitGroup waits for all asynchronous routines to finish, just as sync.WaitGroup does, but offers more controllable ways of waiting that avoid blocking forever under unexpected circumstances. It also tags every Add operation, so pending counters can be inspected by tag at any time. Because of the tagging, this wait group has no top-level Done method; instead, each Add call returns a done function for its tag. The done function behaves like sync.WaitGroup's Done, decreasing that tag's counter by 1. A TaggedWaitGroup instance is single-use, which avoids the confusion that re-use could cause.
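The tagging idea described above can be sketched with a mutex-protected map from tag to counter. This is an illustration under stated assumptions, not syncsafe's implementation: taggedCounters, add and snapshot are hypothetical names, and dropping a tag once its counter reaches zero is an assumption made to match the map[send-job:1] output quoted in the README example.

```go
package main

import (
	"fmt"
	"sync"
)

// taggedCounters is a hypothetical type used only for illustration.
type taggedCounters struct {
	mu     sync.Mutex
	counts map[string]int64
}

func newTaggedCounters() *taggedCounters {
	return &taggedCounters{counts: make(map[string]int64)}
}

// add increases the counter for tag by delta and returns a function
// that decreases that same counter by 1.
func (c *taggedCounters) add(tag string, delta int64) func() {
	c.mu.Lock()
	c.counts[tag] += delta
	c.mu.Unlock()
	return func() {
		c.mu.Lock()
		defer c.mu.Unlock()
		c.counts[tag]--
		if c.counts[tag] == 0 {
			delete(c.counts, tag) // assumption: finished tags are not reported
		}
	}
}

// snapshot returns a copy of the pending counters, so callers cannot
// mutate the internal map.
func (c *taggedCounters) snapshot() map[string]int64 {
	c.mu.Lock()
	defer c.mu.Unlock()
	out := make(map[string]int64, len(c.counts))
	for tag, n := range c.counts {
		out[tag] = n
	}
	return out
}

func main() {
	c := newTaggedCounters()
	doneCalc := c.add("calculate-job", 1)
	c.add("send-job", 1)
	doneCalc()
	fmt.Println(c.snapshot()) // map[send-job:1]
}
```

Returning a closure from add ties each decrement to the tag that was incremented, which is why no separate Done(tag) method is needed in this design.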

func NewTaggedWaitGroup added in v1.1.0

func NewTaggedWaitGroup() *TaggedWaitGroup

NewTaggedWaitGroup returns a new instance of TaggedWaitGroup.

func (*TaggedWaitGroup) Add added in v1.1.0

func (g *TaggedWaitGroup) Add(tag string, delta int64) DoneFn

Add increases the wait group counter for the provided tag by delta (1 in all examples here) and returns a DoneFn that decreases that tag's counter.

Example
wg := NewTaggedWaitGroup()
doneCalcJob := wg.Add("calculate-job", 1)
doneSendJob := wg.Add("send-job", 1)
go func() {
	// After a while
	doneCalcJob()
	doneSendJob()
}()
wg.Wait()

func (*TaggedWaitGroup) Counters added in v1.1.0

func (g *TaggedWaitGroup) Counters() map[string]int64

Counters returns the current state of the pending counters, keyed by tag.

Example
wg := NewTaggedWaitGroup()
_ = wg.Add("calculate-job", 1)
doneSendJob := wg.Add("send-job", 1)
fmt.Println("Before done", wg.Counters()) // Will print map[calculate-job:1 send-job:1]

// After a while
doneSendJob()
fmt.Println("After done", wg.Counters()) // Will print map[calculate-job:1]

func (*TaggedWaitGroup) Wait added in v1.1.0

func (g *TaggedWaitGroup) Wait()

Wait blocks the calling routine until all tagged counters are zero.

Example
wg := NewTaggedWaitGroup()
for i := 0; i < 3; i++ {
	done := wg.Add(fmt.Sprintf("some-job-%d", i), 1)
	go func() {
		defer done()
		// Some work
		time.Sleep(time.Millisecond * 100)
	}()
}
wg.Wait()

func (*TaggedWaitGroup) WaitChan added in v1.1.0

func (g *TaggedWaitGroup) WaitChan() <-chan struct{}

WaitChan returns a channel that can be used to implement custom wait handling. The channel is closed once all wait group counters are zero.

Example
// Set timeout to 1 second
ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
defer cancel()
wg := NewTaggedWaitGroup()
for i := 0; i < 3; i++ {
	done := wg.Add(fmt.Sprintf("some-job-%d", i), 1)
	go func(i int) {
		defer done()
		// Some work will take longer than timeout
		time.Sleep(time.Second * time.Duration(i))
	}(i)
}
select {
case <-wg.WaitChan():
case <-ctx.Done():
	log.Fatal("context cancelled before wait group done")
}
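Beyond timeouts, a wait channel also allows other custom handling, such as reporting progress while waiting. The sketch below uses only the standard library: closeAfter is a hypothetical stand-in for WaitChan (it simply closes a channel after a delay), and waitWithProgress and demoTicks are illustrative names rather than syncsafe functions.

```go
package main

import (
	"fmt"
	"time"
)

// closeAfter is a hypothetical stand-in for WaitChan: it returns a channel
// that is closed after d, as if the last job finished at that moment.
func closeAfter(d time.Duration) <-chan struct{} {
	ch := make(chan struct{})
	go func() {
		time.Sleep(d)
		close(ch)
	}()
	return ch
}

// waitWithProgress blocks until done is closed and returns how many
// progress ticks fired in the meantime.
func waitWithProgress(done <-chan struct{}, every time.Duration) int {
	ticker := time.NewTicker(every)
	defer ticker.Stop()
	ticks := 0
	for {
		select {
		case <-done:
			return ticks
		case <-ticker.C:
			ticks++ // a real program might log the pending counters here
		}
	}
}

// demoTicks waits for a job lasting jobMs milliseconds, ticking every tickMs.
func demoTicks(jobMs, tickMs int) int {
	done := closeAfter(time.Duration(jobMs) * time.Millisecond)
	return waitWithProgress(done, time.Duration(tickMs)*time.Millisecond)
}

func main() {
	fmt.Println("progress ticks while waiting:", demoTicks(200, 20))
}
```

With a TaggedWaitGroup, the tick branch would be a natural place to print Counters() and see which tags are still pending.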

func (*TaggedWaitGroup) WaitContext added in v1.1.0

func (g *TaggedWaitGroup) WaitContext(ctx context.Context) StackError

WaitContext blocks the routine until all counters are zero or ctx is done, whichever comes first. An appropriate error is returned if ctx is done before the counters reach zero.

Example
// Set timeout to 1 second
ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
defer cancel()
wg := NewTaggedWaitGroup()
for i := 0; i < 3; i++ {
	done := wg.Add(fmt.Sprintf("some-job-%d", i), 1)
	go func(i int) {
		defer done()
		// Some work will take longer than timeout
		time.Sleep(time.Second * time.Duration(i))
	}(i)
}
if err := wg.WaitContext(ctx); err != nil {
	log.Fatal(err, ": ", err.StackTrace())
}

type WaitGroup

type WaitGroup struct {
	// contains filtered or unexported fields
}

WaitGroup waits for all asynchronous routines to finish, just as sync.WaitGroup does, but offers more controllable ways of waiting that avoid blocking forever under unexpected circumstances. A WaitGroup instance is single-use, which avoids the confusion that re-use could cause.

func NewWaitGroup

func NewWaitGroup() *WaitGroup

NewWaitGroup returns a new instance of WaitGroup

func (*WaitGroup) Add

func (g *WaitGroup) Add(delta int64)

Add adds delta to the wait group counter. delta may be negative, which implements Done behavior for values greater than 1, but Add panics if the counter would become negative.
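The negative-delta rule can be illustrated with a small atomic counter. This is a sketch of the described behavior only: counter, add and tryAdd are hypothetical names, and the panic message is made up rather than taken from syncsafe.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// counter is a hypothetical stand-in for a wait group's internal counter.
type counter struct{ n int64 }

// add applies delta atomically and panics if the result is negative.
func (c *counter) add(delta int64) int64 {
	v := atomic.AddInt64(&c.n, delta)
	if v < 0 {
		panic("negative wait group counter") // illustrative message
	}
	return v
}

// tryAdd calls add and reports whether it panicked.
func tryAdd(c *counter, delta int64) (panicked bool) {
	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()
	c.add(delta)
	return false
}

func main() {
	c := &counter{}
	c.add(3)
	fmt.Println(tryAdd(c, -2)) // false: the counter drops from 3 to 1
	fmt.Println(tryAdd(c, -2)) // true: the counter would drop to -1
}
```

Calling add with -2 here plays the role of two Done calls at once, which is the "done behavior with value greater than 1" that the description mentions.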

func (*WaitGroup) Done

func (g *WaitGroup) Done()

Done decreases the wait group counter by 1.

func (*WaitGroup) Wait

func (g *WaitGroup) Wait()

Wait blocks the routine until the counter is zero, exactly as the native sync.WaitGroup does.

Example
wg := NewWaitGroup()
for i := 0; i < 3; i++ {
	wg.Add(1)
	go func() {
		defer wg.Done()
		time.Sleep(time.Millisecond * 100)
	}()
}
wg.Wait()

func (*WaitGroup) WaitChan

func (g *WaitGroup) WaitChan() <-chan struct{}

WaitChan returns a channel that can be used to implement custom wait handling. The channel is closed once the wait group counter is zero.

Example
ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
defer cancel()
wg := NewWaitGroup()
for i := 0; i < 3; i++ {
	wg.Add(1)
	go func(i int) {
		defer wg.Done()
		time.Sleep(time.Second * 2)
	}(i)
}
select {
case <-wg.WaitChan():
case <-ctx.Done():
	log.Fatal("context cancelled before wait group done")
}

func (*WaitGroup) WaitContext

func (g *WaitGroup) WaitContext(ctx context.Context) StackError

WaitContext blocks the routine until the counter is zero or ctx is done, whichever comes first. An appropriate error is returned if ctx is done before the counter reaches zero.

Example
ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
defer cancel()
wg := NewWaitGroup()
for i := 0; i < 3; i++ {
	wg.Add(1)
	go func(i int) {
		defer wg.Done()
		time.Sleep(time.Second * time.Duration(i))
	}(i)
}
if err := wg.WaitContext(ctx); err != nil {
	log.Fatal(err, ": ", err.StackTrace())
}
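For readers curious how context-aware waiting can be layered on the standard library, here is a minimal sketch built on sync.WaitGroup. It is not syncsafe's implementation: waitContext and runWithTimeout are hypothetical helpers, and the sketch returns a plain error rather than a StackError.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// waitContext is a hypothetical helper: it waits for wg, but gives up
// with an error as soon as ctx is done.
func waitContext(ctx context.Context, wg *sync.WaitGroup) error {
	done := make(chan struct{})
	go func() {
		defer close(done)
		wg.Wait()
	}()
	select {
	case <-done:
		return nil
	case <-ctx.Done():
		return fmt.Errorf("wait interrupted: %w", ctx.Err())
	}
}

// runWithTimeout starts one job lasting jobMs milliseconds and waits for
// it for at most timeoutMs milliseconds.
func runWithTimeout(jobMs, timeoutMs int) error {
	timeout := time.Duration(timeoutMs) * time.Millisecond
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		time.Sleep(time.Duration(jobMs) * time.Millisecond)
	}()
	return waitContext(ctx, &wg)
}

func main() {
	fmt.Println(runWithTimeout(10, 1000)) // <nil>
	fmt.Println(runWithTimeout(500, 20))  // wait interrupted: context deadline exceeded
}
```

One trade-off of this approach is that the helper goroutine stays blocked in wg.Wait until the jobs actually finish, so a timeout stops the caller from hanging but does not stop the stuck work itself.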
