xsync

package module
v0.0.0-...-69047f5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 10, 2020 License: MIT Imports: 9 Imported by: 0

README

xsync

GoDoc Travis

Priortizable and cancellable synchronization primitives in Go.

Usage

xsync.WorkerGroup

package main

import (
	"sync"

	"github.com/gobwas/xsync"
)

func main() {
	var wg xsync.WorkerGroup
	wg.Exec(
		xsync.Demand{
			Priority: xsync.BytePriority(42),
		},
		xsync.TaskFunc(func(ctx *xsync.WorkerContext) {

		}),
	})
	wg.Close()
}

xsync.Cond

package main

import (
	"sync"

	"github.com/gobwas/xsync"
)

func main() {
	var mu sync.Mutex
	cond := xsync.Cond{
		L: &mu,
	}
	mu.Lock()
	for !condition {
		cond.Wait(xsync.Demand{
			Priority: xsync.BytePriority(42),
		})
	}
	// action on condition.
	mu.Unlock()
}

examples

For more examples please take a look into examples folder.

Why?

The idea of this package appeared year ago or something when I was playing with high-performant concurrent programming. The main intention of the package is to provide an ability to select which kind of task should be performed when execution resources are limited.

Status

This package is one of those projects that might be developed but then not then released anywhen. Fortunately I found time and motivation to complete it and share with the community.

Current API is not fixed, thus there is no v1.0.0 tag yet. My main concerns are related to the Demand interface which I don't like much. Maybe I will change it over time.

Performance

Goroutine blocking mechanics are built over channels. Thus, it can't be more efficient than standard libarary's sync package. There are few benchmarks in tests, so feel free to run them.

Documentation

Overview

Package xsync provides priortizable and cancellable synchronization primitives such as condition variable.

Code generated by running "make generics". DO NOT EDIT.

Code generated by running "make generics". DO NOT EDIT.

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrCanceled = errors.New("xsync: canceled")
	ErrClosed   = errors.New("xsync: closed")
)

Errors returned by package structs.

Functions

func WithPriority

func WithPriority(ctx context.Context, p Priority) context.Context

WithPriority returns context with associated priority p.

Types

type BytePriority

type BytePriority byte

BytePriority is a priority specified by byte.

func (BytePriority) Compare

func (b BytePriority) Compare(x Priority) int

Compare implements Priority interface. If given priority is not BytePriority, it always returns 1.

type Cond

type Cond struct {
	L sync.Locker
	// contains filtered or unexported fields
}

Cond contains logic of checking condition and waiting for a condition change.

Waiting for condition change could be done with cancelation channel. This is the main intention of this cond existence.

func (*Cond) Broadcast

func (c *Cond) Broadcast()

Broadcast wakes all goroutines waiting on c.

func (*Cond) Signal

func (c *Cond) Signal()

Signal wakes n goroutines waiting on c, if there is any.

func (*Cond) Wait

func (c *Cond) Wait(d Demand) (err error)

Wait unlocks c.L and suspends execution of the calling goroutine.

Unlike sync.Cond Wait() can return before awoken by Signal() if and only if given cancelation channel become filled or closed. In that case returned err is ErrCanceled.

After later resume of execution, Wait() locks c.L before returning.

type Demand

type Demand struct {
	// Priority represends demand priority.
	Priority Priority

	// Cancel is a demand cancelation channel.
	Cancel <-chan struct{}
}

Demand represents a caller's demand on condition.

func ContextDemand

func ContextDemand(ctx context.Context) Demand

ContextDemand is a helper function which constructs Demand structure which Cancel field is set to ctx.Done(), and Priority is set to result of ContextPriority(ctx).

type LowestPriority

type LowestPriority int64

LowestPriority is a priority which must be treated as lower than any other priority.

Integer value it holds is used to compare instances which are both of LowestPriority type.

func (LowestPriority) Compare

func (p LowestPriority) Compare(x Priority) int

Compare implements Priority interface.

type Mutex

type Mutex struct {
	// contains filtered or unexported fields
}

A Mutex is a mutual exclusion lock with ability to be locked with priority and/or cancellation. The zero value for a Mutex is an unlocked mutex.

A Mutex must not be copied after first use.

func (*Mutex) Lock

func (m *Mutex) Lock(d Demand) error

Lock locks m.

Unlike sync.Mutex if lock is already in use Lock() can return before Unlock() if and only if given demand's cancellation channel became filled of closed. In that case returned err is ErrCanceled.

func (*Mutex) Unlock

func (m *Mutex) Unlock()

Unlock unlocks m. It panics if m is not locked on entry to Unlock().

type Priority

type Priority interface {
	// Compare compares itself with given priority.
	//
	// The result must be zero if both priorities are equal, less than zero if
	// given priority is higher, and greater than zero if given priority is
	// lower.
	//
	// Note that given argument might be nil.
	// When using with the WorkerGroup argument might also be of LowestPriority
	// type, which must be treated as always lower priority than application's.
	Compare(Priority) int
}

func ContextPriority

func ContextPriority(ctx context.Context) Priority

ContextPriority returns a priority associated with given context.

type Task

type Task interface {
	// Exec executes the task.
	// Given context holds the worker goroutine related info. Context might be
	// canceled when worker group is closing.
	Exec(*WorkerContext)
}

Task is the interface that holds task implementaion.

type TaskFunc

type TaskFunc func(*WorkerContext)

TaskFunc is an adapter to allow the use of ordinary functions as Task.

func (TaskFunc) Exec

func (f TaskFunc) Exec(ctx *WorkerContext)

Exec implements Task.

type WorkerContext

type WorkerContext struct {
	context.Context
	// contains filtered or unexported fields
}

WorkerContext represents worker goroutine context.

func (*WorkerContext) ID

func (c *WorkerContext) ID() uint32

ID returns worker identifier within its group. Returned id is always an integer that is less than worker group size.

NOTE: ID might be reused after worker exit as idle.

type WorkerGroup

type WorkerGroup struct {
	// QueueSize specifies the size of the internal tasks queue. Note that
	// workers fetch tasks from the queue in accordance with task priority (if
	// any was given).
	//
	// The greater queue size the more tasks with high priority will be
	// executed at first. The less queue size, the less difference in execution
	// order between tasks with different priorities.
	//
	// Note that FetchSize field also affects workers behaviour.
	QueueSize int

	// FetchSize specifies how many tasks will be pulled from the queue per
	// each scheduling cycle.
	//
	// The smaller FetchSize the higher starvation rate for the low priority
	// tasks. Contrariwise, when FetchSize is equal to QueueSize, then all
	// previously scheduled tasks will be fetched from the queue; that is,
	// queue will be drained.
	//
	// FetchSize must not be greater than QueueSize.
	FetchSize int

	// SizeLimit specifies the capacity of the worker group.
	// If SizeLimit is zero then worker group will contain one worker.
	SizeLimit int

	// IdleLimit specifies the maximum number of idle workers.
	// When set, IdleTimeout must also be set.
	// If IdleLimit is zero then no idle limit is used.
	IdleLimit int

	// IdleTimeout specifies the duration after which worker is considered
	// idle.
	IdleTimeout time.Duration

	// OnStart is an optional callback that will be called right after worker
	// goroutine has started.
	OnStart func(*WorkerContext)

	// OnComplete is an optional callback that will be called right before
	// worker goroutine complete.
	OnComplete func(*WorkerContext)
	// contains filtered or unexported fields
}

WorkerGroup contains options and logic of managing worker goroutines and sharing work between them.

func (*WorkerGroup) Barrier

func (w *WorkerGroup) Barrier(ctx context.Context) error

Barrier waits for completion of all currently running tasks.

That is, having two workers in the group and three tasks T1, T2 and T3 scheduled and T1 and T2 executing, call to Barrier() will block until T1 and T2 are done. It doesn't guarantee that T3 is done as well. To be sure that all tasks in the queue are done, use Flush().

func (*WorkerGroup) Close

func (w *WorkerGroup) Close()

Close terminates all spawned goroutines. It returns when all goroutines and all tasks scheduled before are done.

func (*WorkerGroup) Exec

func (w *WorkerGroup) Exec(d Demand, t Task) error

Exec makes t to be executed in one of the running workers.

func (*WorkerGroup) Flush

func (w *WorkerGroup) Flush(ctx context.Context) error

Flush waits for completion of all task successfully scheduled before.

Note that Flush() call leads to one-time full queue fetch inside group. That is, it affects the priority of execution if w.FetchSize was set (and acts like w.FetchSize was not set).

func (*WorkerGroup) Shutdown

func (w *WorkerGroup) Shutdown(ctx context.Context) error

Directories

Path Synopsis
examples
internal

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL