sync

package
v0.0.0-...-460e94b Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 19, 2020 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Overview

Package sync implements primitives for synchronization between processes.

Index

Examples

Constants

View Source
const (

	// FUTEX_PRIVATE_FLAG is used to optimize futex usage for process-private futexes.
	FUTEX_PRIVATE_FLAG = 128
	// FUTEX_CLOCK_REALTIME is used to tell the kernel, that is must treat timeouts for
	// FUTEX_WAIT_BITSET, FUTEX_WAIT_REQUEUE_PI, and FUTEX_WAIT as an absolute time based on CLOCK_REALTIME
	FUTEX_CLOCK_REALTIME = 256
)
View Source
const (
	// CSemMaxVal is the maximum semaphore value,
	// which is guaranteed to be supported on all platforms.
	CSemMaxVal = 32767
)

Variables

View Source
var (
	// MaxCondWaiters is the maximum length of the waiting queue for this type of a cond.
	// This limit is actual for waitlist-based condvars, currently on windows and darwin.
	// If this limit is exceeded, Wait/WaitTimeout will panic with ErrTooManyWaiters.
	MaxCondWaiters = 128
	// ErrTooManyWaiters is an error, that indicates, that the waiting queue is full.
	ErrTooManyWaiters = errors.New("waiters limit has been reached")
)

Functions

func DestroyCond

func DestroyCond(name string) error

DestroyCond permanently removes condvar with the given name.

func DestroyEvent

func DestroyEvent(name string) error

DestroyEvent permanently destroys an event with the given name.

func DestroyFutexMutex

func DestroyFutexMutex(name string) error

DestroyFutexMutex permanently removes mutex with the given name.

func DestroyMutex

func DestroyMutex(name string) error

DestroyMutex permanently removes mutex with the given name.

func DestroyRWMutex

func DestroyRWMutex(name string) error

DestroyRWMutex permanently removes mutex with the given name.

func DestroySemaMutex

func DestroySemaMutex(name string) error

DestroySemaMutex permanently removes mutex with the given name.

func DestroySemaphore

func DestroySemaphore(name string) error

DestroySemaphore removes the semaphore permanently.

func DestroySpinMutex

func DestroySpinMutex(name string) error

DestroySpinMutex removes a mutex object with the given name

func FutexWait

func FutexWait(addr unsafe.Pointer, value int32, timeout time.Duration, flags int32) error

FutexWait checks if the the value equals futex's value. If it doesn't, Wait returns EWOULDBLOCK. Otherwise, it waits for the Wake call on the futex for not longer, than timeout.

func FutexWake

func FutexWake(addr unsafe.Pointer, count int32, flags int32) (int, error)

FutexWake wakes count threads waiting on the futex. Returns number of woken threads.

Types

type Cond

type Cond cond

Cond is a named interprocess condition variable.

Example
DestroyMutex("mut")
mut, err := NewMutex("mut", os.O_CREATE|os.O_EXCL, 0666)
if err != nil {
	panic("new")
}
defer mut.Close()
DestroyCond("cond")
cond, err := NewCond("cond", os.O_CREATE|os.O_EXCL, 0666, mut)
if err != nil {
	panic("new")
}
defer cond.Close()
var sharedValue int
go func() {
	mut.Lock()
	defer mut.Unlock()
	sharedValue = 1
	cond.Signal()
}()
mut.Lock()
defer mut.Unlock()
if sharedValue == 0 {
	cond.Wait()
	if sharedValue == 0 {
		panic("bad value")
	}
}
Output:

func NewCond

func NewCond(name string, flag int, perm os.FileMode, l IPCLocker) (*Cond, error)

NewCond returns new interprocess condvar.

name - unique condvar name.
flag - a combination of open flags from 'os' package.
perm - object's permission bits.
l - a locker, associated with the shared resource.

func (*Cond) Broadcast

func (c *Cond) Broadcast()

Broadcast wakes all waiters.

func (*Cond) Close

func (c *Cond) Close() error

Close releases resources of the cond's shared state.

func (*Cond) Destroy

func (c *Cond) Destroy() error

Destroy permanently removes condvar.

func (*Cond) Signal

func (c *Cond) Signal()

Signal wakes one waiter.

func (*Cond) Wait

func (c *Cond) Wait()

Wait waits for the condvar to be signaled.

func (*Cond) WaitTimeout

func (c *Cond) WaitTimeout(timeout time.Duration) bool

WaitTimeout waits for the condvar to be signaled for not longer, than timeout.

type Event

type Event event

Event is a synchronization primitive used for notification. If it is signaled by a call to Set(), it'll stay in this state, unless someone calls Wait(). After it the event is reset into non-signaled state.

Example
event, err := NewEvent("event", os.O_CREATE|os.O_EXCL, 0666, false)
if err != nil {
	return
}
go func() {
	event.Set()
}()
if event.WaitTimeout(time.Millisecond * 250) {
	// event has been set
} else {
	// timeout elapsed
}
event.Destroy()
Output:

func NewEvent

func NewEvent(name string, flag int, perm os.FileMode, initial bool) (*Event, error)

NewEvent creates a new interprocess event. It uses the default implementation on the current platform.

name - object name.
flag - flag is a combination of open flags from 'os' package.
perm - object's permission bits.
initial - if true, the event will be set after creation.

func (*Event) Close

func (e *Event) Close() error

Close closes the event.

func (*Event) Destroy

func (e *Event) Destroy() error

Destroy permanently destroys the event.

func (*Event) Set

func (e *Event) Set()

Set sets the specified event object to the signaled state.

func (*Event) Wait

func (e *Event) Wait()

Wait waits for the event to be signaled.

func (*Event) WaitTimeout

func (e *Event) WaitTimeout(timeout time.Duration) bool

WaitTimeout waits until the event is signaled or the timeout elapses.

type FutexMutex

type FutexMutex struct {
	// contains filtered or unexported fields
}

FutexMutex is a mutex based on linux/freebsd futex object.

func NewFutexMutex

func NewFutexMutex(name string, flag int, perm os.FileMode) (*FutexMutex, error)

NewFutexMutex creates a new futex-based mutex. This implementation is based on a paper 'Futexes Are Tricky' by Ulrich Drepper, this document can be found in 'docs' folder.

name - object name.
flag - flag is a combination of open flags from 'os' package.
perm - object's permission bits.

func (*FutexMutex) Close

func (f *FutexMutex) Close() error

Close indicates, that the object is no longer in use, and that the underlying resources can be freed.

func (*FutexMutex) Destroy

func (f *FutexMutex) Destroy() error

Destroy removes the mutex object.

func (*FutexMutex) Lock

func (f *FutexMutex) Lock()

Lock locks the mutex. It panics on an error.

func (*FutexMutex) LockTimeout

func (f *FutexMutex) LockTimeout(timeout time.Duration) bool

LockTimeout tries to lock the locker, waiting for not more, than timeout.

func (*FutexMutex) TryLock

func (f *FutexMutex) TryLock() bool

TryLock makes one attempt to lock the mutex. It return true on succeess and false otherwise.

func (*FutexMutex) Unlock

func (f *FutexMutex) Unlock()

Unlock releases the mutex. It panics on an error, or if the mutex is not locked.

type IPCLocker

type IPCLocker interface {
	sync.Locker
	io.Closer
}

IPCLocker is a minimal interface, which must be satisfied by any synchronization primitive on any platform.

Example
DestroyMutex("mut")
mut, err := NewMutex("mut", os.O_CREATE|os.O_EXCL, 0666)
if err != nil {
	panic("new")
}
defer mut.Close()
var sharedValue uint64
var wg sync.WaitGroup
wg.Add(8)
for i := 0; i < 8; i++ {
	go func() {
		defer wg.Done()
		mut, err := NewMutex("mut", 0, 0)
		if err != nil {
			panic("new")
		}
		defer mut.Close()
		for i := 0; i < 1000000; i++ {
			mut.Lock()
			sharedValue++
			mut.Unlock()
		}
	}()
}
wg.Wait()
if sharedValue != 8*1000000 {
	panic("invalid value ")
}
Output:

type RWMutex

type RWMutex struct {
	// contains filtered or unexported fields
}

RWMutex is a mutex, that can be held by any number of readers or one writer.

Example
const (
	writers = 4
	readers = 10
)
DestroyRWMutex("rw")
m, err := NewRWMutex("rw", os.O_CREATE|os.O_EXCL, 0666)
if err != nil {
	panic(err)
}
// we create a shared array of consistently increasing ints for reading and wriring.
sharedData := make([]int, 128)
for i := range sharedData {
	sharedData[i] = i
}
var wg sync.WaitGroup
wg.Add(writers + readers)
// writers will update the data.
for i := 0; i < writers; i++ {
	go func() {
		defer wg.Done()
		start := rand.Intn(1024)
		m.Lock()
		for i := range sharedData {
			sharedData[i] = i + start
		}
		m.Unlock()
	}()
}
// readers will check the data.
for i := 0; i < readers; i++ {
	go func() {
		defer wg.Done()
		m.RLock()
		for i := 1; i < len(sharedData); i++ {
			if sharedData[i] != sharedData[i-1]+1 {
				panic("bad data")
			}
		}
		m.RUnlock()
	}()
}
wg.Wait()
fmt.Println("done")
Output:

done

func NewRWMutex

func NewRWMutex(name string, flag int, perm os.FileMode) (*RWMutex, error)

NewRWMutex returns new RWMutex

name - object name.
flag - flag is a combination of open flags from 'os' package.
perm - object's permission bits.

func (*RWMutex) Close

func (rw *RWMutex) Close() error

Close closes shared state of the mutex.

func (*RWMutex) Destroy

func (rw *RWMutex) Destroy() error

Destroy closes the mutex and removes it permanently.

func (*RWMutex) Lock

func (rw *RWMutex) Lock()

Lock locks the mutex exclusively. It panics on an error.

func (*RWMutex) RLock

func (rw *RWMutex) RLock()

RLock locks the mutex for reading. It panics on an error.

func (*RWMutex) RLocker

func (rw *RWMutex) RLocker() IPCLocker

RLocker returns a Locker interface that implements the Lock and Unlock methods by calling rw.RLock and rw.RUnlock.

func (*RWMutex) RUnlock

func (rw *RWMutex) RUnlock()

RUnlock desceases the number of mutex's readers. If it becomes 0, writers (if any) can proceed. It panics on an error, or if the mutex is not locked.

func (*RWMutex) Unlock

func (rw *RWMutex) Unlock()

Unlock releases the mutex. It panics on an error, or if the mutex is not locked.

type SemaMutex

type SemaMutex struct {
	// contains filtered or unexported fields
}

SemaMutex is a semaphore-based mutex for unix.

func NewSemaMutex

func NewSemaMutex(name string, flag int, perm os.FileMode) (*SemaMutex, error)

NewSemaMutex creates a new semaphore-based mutex.

name - object name.
flag - flag is a combination of open flags from 'os' package.
perm - object's permission bits.

func (*SemaMutex) Close

func (m *SemaMutex) Close() error

Close closes shared state of the mutex.

func (*SemaMutex) Destroy

func (m *SemaMutex) Destroy() error

Destroy closes the mutex and removes it permanently.

func (*SemaMutex) Lock

func (m *SemaMutex) Lock()

Lock locks the mutex. It panics on an error.

func (*SemaMutex) LockTimeout

func (m *SemaMutex) LockTimeout(timeout time.Duration) bool

LockTimeout tries to lock the locker, waiting for not more, than timeout.

func (*SemaMutex) TryLock

func (m *SemaMutex) TryLock() bool

TryLock makes one attempt to lock the mutex. It returns true on succeess and false otherwise.

func (*SemaMutex) Unlock

func (m *SemaMutex) Unlock()

Unlock releases the mutex. It panics on an error, or if the mutex is not locked.

type Semaphore

type Semaphore semaphore

Semaphore is a synchronization object with a resource counter, which can be used to control access to a shared resource. It provides access to actual OS semaphore primitive via:

CreateSemaprore on windows
semget on unix
Example
// create new semaphore with initial count set to 3.
DestroySemaphore("sema")
sema, err := NewSemaphore("sema", os.O_CREATE|os.O_EXCL, 0666, 3)
if err != nil {
	panic(err)
}
defer sema.Close()
// in the following cycle we consume three units of the resource and won't block.
for i := 0; i < 3; i++ {
	sema.Wait()
	fmt.Println("got one resource unit")
}
// the following two goroutines won't continue until we call Signal().
var wg sync.WaitGroup
wg.Add(2)
for i := 0; i < 2; i++ {
	go func() {
		defer wg.Done()
		// open existing semaphore
		sema, err := NewSemaphore("sema", 0, 0666, 0)
		if err != nil {
			panic(err)
		}
		defer sema.Close()
		sema.Wait()
		fmt.Println("got one resource unit after waiting")
	}()
}
// wake up goroutines
fmt.Println("waking up...")
sema.Signal(2)
wg.Wait()
fmt.Println("done")
Output:

got one resource unit
got one resource unit
got one resource unit
waking up...
got one resource unit after waiting
got one resource unit after waiting
done

func NewSemaphore

func NewSemaphore(name string, flag int, perm os.FileMode, initial int) (*Semaphore, error)

NewSemaphore creates new semaphore with the given name.

name - object name.
flag - flag is a combination of open flags from 'os' package.
perm - object's permission bits.
initial - this value will be added to the semaphore's value, if it was created.

func (*Semaphore) Close

func (s *Semaphore) Close() error

Close closes the semaphore.

func (*Semaphore) Signal

func (s *Semaphore) Signal(count int)

Signal increments the value of semaphore variable by 1, waking waiting process (if any).

func (*Semaphore) Wait

func (s *Semaphore) Wait()

Wait decrements the value of semaphore variable by -1, and blocks if the value becomes negative.

func (*Semaphore) WaitTimeout

func (s *Semaphore) WaitTimeout(timeout time.Duration) bool

WaitTimeout decrements the value of semaphore variable by 1. If the value becomes negative, it waites for not longer than timeout. On darwin and freebsd this func has some side effects, see sema_timed_bsd.go for details.

type SpinMutex

type SpinMutex struct {
	// contains filtered or unexported fields
}

SpinMutex is a synchronization object which performs busy wait loop.

func NewSpinMutex

func NewSpinMutex(name string, flag int, perm os.FileMode) (*SpinMutex, error)

NewSpinMutex creates a new spin mutex.

name - object name.
flag - flag is a combination of open flags from 'os' package.
perm - object's permission bits.

func (*SpinMutex) Close

func (spin *SpinMutex) Close() error

Close indicates, that the object is no longer in use, and that the underlying resources can be freed.

func (*SpinMutex) Destroy

func (spin *SpinMutex) Destroy() error

Destroy removes the mutex object.

func (*SpinMutex) Lock

func (spin *SpinMutex) Lock()

Lock locks the mutex waiting in a busy loop if needed.

func (*SpinMutex) LockTimeout

func (spin *SpinMutex) LockTimeout(timeout time.Duration) bool

LockTimeout locks the mutex waiting in a busy loop for not longer, than timeout.

func (*SpinMutex) TryLock

func (spin *SpinMutex) TryLock() bool

TryLock makes one attempt to lock the mutex. It return true on succeess and false otherwise.

func (*SpinMutex) Unlock

func (spin *SpinMutex) Unlock()

Unlock releases the mutex. It panics, if the mutex is not locked.

type TimedIPCLocker

type TimedIPCLocker interface {
	IPCLocker
	// LockTimeout tries to lock the locker, waiting for not more, than timeout
	LockTimeout(timeout time.Duration) bool
}

TimedIPCLocker is a locker, whose lock operation can be limited with duration.

Example
DestroyMutex("mut")
mut, err := NewMutex("mut", os.O_CREATE|os.O_EXCL, 0666)
if err != nil {
	panic("new")
}
defer mut.Close()
tmut, ok := mut.(TimedIPCLocker)
if !ok {
	panic("not a timed locker")
}
var sharedValue int
rand.Seed(time.Now().Unix())
go func() {
	mut, err := NewMutex("mut", 0, 0)
	if err != nil {
		panic("new")
	}
	defer mut.Close()
	mut.Lock()
	// change value after [0..500] ms delay.
	time.Sleep(time.Duration(rand.Int()%6) * time.Millisecond * 100)
	sharedValue = 1
	mut.Unlock()
}()
// give another goroutine some time to lock the mutex.
time.Sleep(10 * time.Millisecond)
if tmut.LockTimeout(250 * time.Millisecond) {
	if sharedValue != 1 {
		panic("bad value")
	}
	tmut.Unlock()
} else {
} // timeout elapsed
Output:

func NewMutex

func NewMutex(name string, flag int, perm os.FileMode) (TimedIPCLocker, error)

NewMutex creates a new interprocess mutex. It uses the default implementation on the current platform.

name - object name.
flag - flag is a combination of open flags from 'os' package.
perm - object's permission bits.

Directories

Path Synopsis
internal

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL