reactor

package
v0.0.0-...-052ef2a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 29, 2023 License: MIT Imports: 27 Imported by: 0

Documentation

Index

Constants

View Source
// Default sizes for the invoke, wake and spawn queues. All three evaluate
// to 1024.
// NOTE(review): presumably applied when Config.InvokeQSize, Config.WakeQSize
// or Config.SpawnQSize is left at zero — confirm against NewReactor.
const (
	DefaultInvokeQueueSize = 1024 * 1
	DefaultWakeQueueSize   = 1024 * 1
	DefaultSpawnQueueSize  = 1024 * 1
)
View Source
const (
	// DefaultEnqueueTimeout evaluates to 10 seconds.
	// NOTE(review): presumably the timeout used for blocking enqueues when the
	// caller supplies none — confirm against the call sites.
	DefaultEnqueueTimeout = time.Second * 10
)

Variables

View Source
// Sentinel errors exported by the package.
var (
	// ErrQueueFull carries the message "queue full".
	// NOTE(review): presumably returned when a queue cannot accept another
	// entry — confirm.
	ErrQueueFull = errors.New("queue full")
	// ErrStop carries the message "stop".
	// NOTE(review): presumably signals that a task should be stopped — confirm.
	ErrStop      = errors.New("stop")
)
View Source
// Predefined Cadence values. Every Durations list is in ascending order.
// For all values except Minutes and Hours, the first (smallest) entry matches
// the unit and magnitude in the variable's name.
var (
	// Micros50 ranges from 50µs to 1ms.
	Micros50 = Cadence{Durations: []time.Duration{
		time.Microsecond * 50,
		time.Microsecond * 100,
		time.Microsecond * 150,
		time.Microsecond * 200,
		time.Microsecond * 250,
		time.Microsecond * 350,
		time.Microsecond * 500,
		time.Microsecond * 750,
		time.Microsecond * 900,
		time.Microsecond * 1000,
	}}
	// Micros250 ranges from 250µs to 1ms in 250µs steps.
	Micros250 = Cadence{Durations: []time.Duration{
		time.Microsecond * 250,
		time.Microsecond * 500,
		time.Microsecond * 750,
		time.Microsecond * 1000,
	}}
	// Millis5 ranges from 5ms to 500ms.
	Millis5 = Cadence{Durations: []time.Duration{
		time.Millisecond * 5,
		time.Millisecond * 10,
		time.Millisecond * 20,
		time.Millisecond * 30,
		time.Millisecond * 50,
		time.Millisecond * 100,
		time.Millisecond * 200,
		time.Millisecond * 250,
		time.Millisecond * 500,
	}}
	// Millis10 ranges from 10ms to 500ms.
	Millis10 = Cadence{Durations: []time.Duration{
		time.Millisecond * 10,
		time.Millisecond * 20,
		time.Millisecond * 30,
		time.Millisecond * 50,
		time.Millisecond * 100,
		time.Millisecond * 200,
		time.Millisecond * 250,
		time.Millisecond * 500,
	}}
	// Millis20 ranges from 20ms to 500ms.
	Millis20 = Cadence{Durations: []time.Duration{
		time.Millisecond * 20,
		time.Millisecond * 40,
		time.Millisecond * 60,
		time.Millisecond * 100,
		time.Millisecond * 200,
		time.Millisecond * 500,
	}}
	// Millis25 ranges from 25ms to 750ms.
	Millis25 = Cadence{Durations: []time.Duration{
		time.Millisecond * 25,
		time.Millisecond * 50,
		time.Millisecond * 75,
		time.Millisecond * 100,
		time.Millisecond * 150,
		time.Millisecond * 250,
		time.Millisecond * 500,
		time.Millisecond * 750,
	}}
	// Millis50 ranges from 50ms to 750ms.
	Millis50 = Cadence{Durations: []time.Duration{
		time.Millisecond * 50,
		time.Millisecond * 100,
		time.Millisecond * 200,
		time.Millisecond * 250,
		time.Millisecond * 500,
		time.Millisecond * 750,
	}}
	// Millis100 ranges from 100ms to 500ms.
	Millis100 = Cadence{Durations: []time.Duration{
		time.Millisecond * 100,
		time.Millisecond * 200,
		time.Millisecond * 500,
	}}
	// Millis250 ranges from 250ms to 750ms in 250ms steps.
	Millis250 = Cadence{Durations: []time.Duration{
		time.Millisecond * 250,
		time.Millisecond * 500,
		time.Millisecond * 750,
	}}
	// Millis500 holds the single duration 500ms.
	Millis500 = Cadence{Durations: []time.Duration{
		time.Millisecond * 500,
	}}
	// Seconds ranges from 1s to 45s.
	Seconds = Cadence{Durations: []time.Duration{
		time.Second,
		time.Second * 2,
		time.Second * 3,
		time.Second * 5,
		time.Second * 10,
		time.Second * 15,
		time.Second * 20,
		time.Second * 30,
		time.Second * 45,
	}}
	// Minutes starts at 5s, then runs from 1 minute (written as 60 seconds)
	// to 30 minutes.
	// NOTE(review): the leading 5s entry is not minute-scale; presumably it is
	// the tick for this cadence — confirm it is intentional.
	Minutes = Cadence{Durations: []time.Duration{
		time.Second * 5,
		time.Second * 60,
		time.Minute * 2,
		time.Minute * 3,
		time.Minute * 5,
		time.Minute * 10,
		time.Minute * 15,
		time.Minute * 20,
		time.Minute * 30,
	}}
	// Hours starts at 5 minutes, then runs from 1 hour to 24 hours.
	// NOTE(review): the leading 5-minute entry is not hour-scale; presumably it
	// is the tick for this cadence — confirm it is intentional.
	Hours = Cadence{Durations: []time.Duration{
		time.Minute * 5,
		time.Hour,
		time.Hour * 2,
		time.Hour * 4,
		time.Hour * 6,
		time.Hour * 12,
		time.Hour * 24,
	}}
)

Functions

func EnqueueBlocking

func EnqueueBlocking(task func()) bool

func Init

func Init(
	numLoops int,
	tick Cadence,
	queueSize int,
	blockingQueueSize int,
)

func NumReactors

func NumReactors() int

func PollFnPointer

func PollFnPointer(poll func(event Context) error) unsafe.Pointer

func PollToPollFnPointer

func PollToPollFnPointer(future Future) unsafe.Pointer

Types

type BlockingPool

type BlockingPool struct {
	// contains filtered or unexported fields
}

BlockingPool executes tasks that may block but should complete quickly (in under one second). These tasks are forbidden to sleep; use a worker for tasks that need to sleep. The pool has a fixed number of worker goroutines, and tasks are spread among them in round-robin fashion.

func NewBlockingPool

func NewBlockingPool(numWorkers, queueSize int) *BlockingPool

func (*BlockingPool) Checkpoint

func (b *BlockingPool) Checkpoint()

func (*BlockingPool) Close

func (b *BlockingPool) Close() error

func (*BlockingPool) Enqueue

func (b *BlockingPool) Enqueue(fn func()) bool

func (*BlockingPool) EnqueueTimeout

func (b *BlockingPool) EnqueueTimeout(fn func(), timeout time.Duration) bool

type Cadence

// Cadence is a list of durations. It is the argument type of NewWheel and of
// the tick parameter of Init.
type Cadence struct {
	// Durations holds the durations that make up the cadence.
	// NOTE(review): Tick presumably returns the first entry — confirm.
	Durations []time.Duration
}

func (*Cadence) Tick

func (c *Cadence) Tick() time.Duration

type CloseEvent

// CloseEvent is the argument passed to PollClose.PollClose.
type CloseEvent struct {
	// Task is the Task the event refers to.
	Task   *Task
	// Time is an int64 timestamp.
	// NOTE(review): presumably the reactor time at close, in the same units as
	// Context.Time — confirm.
	Time   int64
	// Reason carries an arbitrary value.
	// NOTE(review): presumably describes why the Task closed — confirm.
	Reason any
}

type Config

// Config is the configuration accepted by NewReactor. It carries a name,
// three Wheel levels, three queue sizes and an OS-thread-locking flag.
// NOTE(review): presumably InvokeQSize, WakeQSize and SpawnQSize fall back to
// DefaultInvokeQueueSize, DefaultWakeQueueSize and DefaultSpawnQueueSize when
// zero, and LockOSThread pins the reactor goroutine to an OS thread — confirm
// against NewReactor and Start.
type Config struct {
	Name         string
	Level1Wheel  Wheel
	Level2Wheel  Wheel
	Level3Wheel  Wheel
	InvokeQSize  int
	WakeQSize    int
	SpawnQSize   int
	LockOSThread bool
}

type Conn

type Conn struct {
	// contains filtered or unexported fields
}

type Context

type Context struct {
	// Task is the associated Task object.
	Task *Task
	// Time is a possible low frequency time from the reactor.
	// Each Reactor loop pass captures a high frequency nano-time which
	// is what this field is set to. High frequency time is relatively
	// expensive 10-30ns and a sudden burst of task polls or function invokes
	// can add 10-30ns per call which adds up.
	//
	// Generally, this has a precision in the nanosecond to microsecond level
	// and possibly single-digit millisecond to tick duration. Values beyond this
	// means the Reactor is overloaded.
	Time int64
	// Interval is the current interval of the Task or 0 if no interval is set.
	// If the Interval value changes, then the Reactor will subscribe or resubscribe
	// or unsubscribe as necessary.
	Interval time.Duration
	// After represents the amount of time to wait before waking. This is a one-time
	// wake and generally used for cooperative scheduling (sharing CPU between all Tasks).
	After time.Duration
	// Reason why Poll was called
	Reason PollReason
}

Context provides the low-level management of a Task with its Reactor.

func (*Context) Reactor

func (p *Context) Reactor() *Reactor

Reactor returns the Reactor the Task belongs to.

func (*Context) SetInterval

func (p *Context) SetInterval(duration time.Duration)

SetInterval sets the interval for the task

func (*Context) Stop

func (p *Context) Stop()

Stop marks the task to be stopped and deleted

func (*Context) WakeAfter

func (p *Context) WakeAfter(duration time.Duration)

WakeAfter wakes this task again after the specified time.Duration

func (*Context) WakeOnNextTick

func (p *Context) WakeOnNextTick()

WakeOnNextTick wakes this task again on the next tick.

type Func

type Func struct {
}

type FuncMap

type FuncMap struct{}

type FuncSlot

type FuncSlot struct {
	Value func()
	// contains filtered or unexported fields
}

func (*FuncSlot) LastWake

func (fs *FuncSlot) LastWake() int64

func (*FuncSlot) Remove

func (fs *FuncSlot) Remove() bool

type FuncStats

type FuncStats struct {
	// contains filtered or unexported fields
}

type FuncSwapSlice

type FuncSwapSlice struct {
	// contains filtered or unexported fields
}

func (*FuncSwapSlice) Add

func (s *FuncSwapSlice) Add(value *FuncSlot)

func (*FuncSwapSlice) Get

func (s *FuncSwapSlice) Get(index int) (value *FuncSlot, ok bool)

func (*FuncSwapSlice) Iterate

func (s *FuncSwapSlice) Iterate(fn func(slot *FuncSlot) bool)

func (*FuncSwapSlice) LastWake

func (s *FuncSwapSlice) LastWake() int64

func (*FuncSwapSlice) Len

func (s *FuncSwapSlice) Len() int

func (*FuncSwapSlice) Remove

func (s *FuncSwapSlice) Remove(value *FuncSlot) bool

func (*FuncSwapSlice) Unsafe

func (s *FuncSwapSlice) Unsafe() []*FuncSlot

type Future

// Future is the unit of work accepted by Reactor.Spawn and
// Reactor.SpawnInterval.
type Future interface {
	// Poll receives the Context for this call and returns an error.
	// NOTE(review): how the Reactor reacts to a non-nil error is not visible
	// here — confirm.
	Poll(ctx Context) error
}

type FutureTask

// FutureTask is a Future that also exposes and accepts its associated Task.
// It is the type taken by TaskSet.Add and the TaskSet Spawn methods.
type FutureTask interface {
	Future

	// Task returns the associated *Task.
	Task() *Task

	// SetTask sets the associated *Task.
	SetTask(task *Task)
}

type ObjectMap

type ObjectMap struct{}

type PollClose

// PollClose declares a single method that receives a CloseEvent.
// NOTE(review): presumably implemented by a Future that wants to be notified
// when its Task closes — confirm.
type PollClose interface {
	// PollClose receives the CloseEvent and returns an error.
	PollClose(ev CloseEvent) error
}

type PollReason

type PollReason uint8
const (
	ReasonStart          PollReason = 0 // ReasonStart first time Poll is invoked after creating Task
	ReasonWake           PollReason = 1 // ReasonWake the Task is awoken. Namaste
	ReasonInterval       PollReason = 2 // ReasonInterval the Task's interval has elapsed.
	ReasonIntervalBehind PollReason = 3 // ReasonIntervalBehind the Task has missed interval wakes due to overloaded Reactor
	ReasonPing           PollReason = 4 // ReasonPing
	ReasonClose          PollReason = 5 // ReasonClose the Task will immediately close on return
)

type Reactor

type Reactor struct {
	Stats
	// contains filtered or unexported fields
}

Reactor runs all tasks on a single goroutine. It has an optimized timing mechanism with a fixed tick duration (tickDur) and a fixed interval duration. The interval is broken down into slots for better resource allocation. For example, a tick duration of 4ms with 5 slots gives an interval duration of 20ms, with each 4ms tick handling ~20% of the load. The timing is constantly adjusted to keep the tick duration accurate relative to the start time, compensating for CPU time. In addition, there is a lock-free MPMC queue that accepts invokes to run immediately without having to wait for a tick.

func NewReactor

func NewReactor(config Config) (*Reactor, error)

func NextReactor

func NextReactor() *Reactor

func (*Reactor) CheckGID

func (r *Reactor) CheckGID() bool

func (*Reactor) Duration

func (r *Reactor) Duration(ticks int64) time.Duration

func (*Reactor) ID

func (r *Reactor) ID() int

func (*Reactor) Invoke

func (r *Reactor) Invoke(fn func()) bool

func (*Reactor) InvokeBlocking

func (r *Reactor) InvokeBlocking(fn func()) bool

func (*Reactor) InvokeRef

func (r *Reactor) InvokeRef(fn *func()) bool

func (*Reactor) Now

func (r *Reactor) Now() int64

func (*Reactor) Print

func (r *Reactor) Print()

func (*Reactor) SnapshotStats

func (r *Reactor) SnapshotStats() Stats

func (*Reactor) Spawn

func (r *Reactor) Spawn(future Future) (*Task, error)

func (*Reactor) SpawnInterval

func (r *Reactor) SpawnInterval(future Future, interval time.Duration) (*Task, error)

func (*Reactor) SpawnWorkerFn

func (r *Reactor) SpawnWorkerFn(fn func()) error

func (*Reactor) Start

func (r *Reactor) Start()

func (*Reactor) Ticks

func (r *Reactor) Ticks(duration time.Duration) int64

func (*Reactor) Wake

func (r *Reactor) Wake(task *Task) error

func (*Reactor) WakeAfter

func (r *Reactor) WakeAfter(task *Task, after time.Duration) error

type Runnable

type Runnable interface{}

type Stats

type Stats struct {
	// contains filtered or unexported fields
}

type Task

type Task struct {
	// contains filtered or unexported fields
}

func (*Task) CheckGID

func (t *Task) CheckGID() bool

func (*Task) ID

func (t *Task) ID() int64

func (*Task) Interval

func (t *Task) Interval() time.Duration

func (*Task) LastPoll

func (t *Task) LastPoll() int64

func (*Task) Poll

func (t *Task) Poll() Future

func (*Task) Polls

func (t *Task) Polls() int64

func (*Task) Reactor

func (t *Task) Reactor() *Reactor

func (*Task) SetStop

func (t *Task) SetStop(stop bool)

func (*Task) Started

func (t *Task) Started() int64

func (*Task) Stop

func (t *Task) Stop() bool

func (*Task) Wake

func (t *Task) Wake() error

func (*Task) WakeAfter

func (t *Task) WakeAfter(duration time.Duration) error

func (*Task) Wakes

func (t *Task) Wakes() int64

type TaskProvider

type TaskProvider struct {
	// contains filtered or unexported fields
}

func (*TaskProvider) CheckGID

func (tp *TaskProvider) CheckGID() bool

func (*TaskProvider) Reactor

func (tp *TaskProvider) Reactor() *Reactor

func (*TaskProvider) SetTask

func (tp *TaskProvider) SetTask(task *Task)

func (*TaskProvider) Task

func (tp *TaskProvider) Task() *Task

func (*TaskProvider) Wake

func (tp *TaskProvider) Wake() error

type TaskSet

type TaskSet struct {
	// contains filtered or unexported fields
}

func (*TaskSet) Add

func (tl *TaskSet) Add(value FutureTask) (*TaskSlot, error)

func (*TaskSet) AddFunc

func (tl *TaskSet) AddFunc(r *Reactor, value func()) (*FuncSlot, error)

func (*TaskSet) Close

func (tl *TaskSet) Close() error

func (*TaskSet) IsEmpty

func (tl *TaskSet) IsEmpty() bool

func (*TaskSet) LastSoftWakeLatency

func (tl *TaskSet) LastSoftWakeLatency() int64

func (*TaskSet) LastWakeLatency

func (tl *TaskSet) LastWakeLatency() int64

func (*TaskSet) NumEntries

func (tl *TaskSet) NumEntries() int

func (*TaskSet) NumReactors

func (tl *TaskSet) NumReactors() int

func (*TaskSet) Release

func (tl *TaskSet) Release() bool

func (*TaskSet) Spawn

func (tl *TaskSet) Spawn(future FutureTask) (*Task, error)

func (*TaskSet) SpawnInterval

func (tl *TaskSet) SpawnInterval(
	future FutureTask,
	interval time.Duration,
) (*Task, error)

func (*TaskSet) SpawnIntervalOn

func (tl *TaskSet) SpawnIntervalOn(
	reactor *Reactor,
	future FutureTask,
	interval time.Duration,
) (*Task, error)

func (*TaskSet) SpawnOn

func (tl *TaskSet) SpawnOn(reactor *Reactor, future FutureTask) (*Task, error)

func (*TaskSet) Stop

func (tl *TaskSet) Stop() int64

func (*TaskSet) Wake

func (tl *TaskSet) Wake() error

type TaskSlot

type TaskSlot struct {
	// contains filtered or unexported fields
}

func (*TaskSlot) LastWake

func (ts *TaskSlot) LastWake() int64

func (*TaskSlot) Remove

func (ts *TaskSlot) Remove() bool

type TaskSwapSlice

type TaskSwapSlice struct {
	// contains filtered or unexported fields
}

func (*TaskSwapSlice) Add

func (s *TaskSwapSlice) Add(value *TaskSlot)

func (*TaskSwapSlice) Get

func (s *TaskSwapSlice) Get(index int) (value *TaskSlot, ok bool)

func (*TaskSwapSlice) Iterate

func (s *TaskSwapSlice) Iterate(fn func(slot *TaskSlot) bool) int64

func (*TaskSwapSlice) LastWake

func (s *TaskSwapSlice) LastWake() int64

func (*TaskSwapSlice) Len

func (s *TaskSwapSlice) Len() int

func (*TaskSwapSlice) Remove

func (s *TaskSwapSlice) Remove(value *TaskSlot) bool

func (*TaskSwapSlice) Unsafe

func (s *TaskSwapSlice) Unsafe() []*TaskSlot

type Tick

type Tick struct {
	Time      int64
	Tick      int64
	Dur       time.Duration
	Precision time.Duration
}

type TickListener

type TickListener struct {
	// contains filtered or unexported fields
}

func (*TickListener) Chan

func (tl *TickListener) Chan() <-chan int64

func (*TickListener) Close

func (tl *TickListener) Close() error

type Ticker

type Ticker struct {
	// contains filtered or unexported fields
}

func StartTicker

func StartTicker(duration time.Duration) *Ticker

func (*Ticker) Close

func (t *Ticker) Close() error

func (*Ticker) Register

func (t *Ticker) Register(duration time.Duration, owner interface{}, ch chan int64) (*TickListener, error)

type WakeList

type WakeList struct {
	// contains filtered or unexported fields
}

func (*WakeList) Close

func (w *WakeList) Close() error

func (*WakeList) Len

func (w *WakeList) Len() int

func (*WakeList) Reactor

func (w *WakeList) Reactor() *Reactor

type WakeLists

type WakeLists struct {
	// contains filtered or unexported fields
}

func (*WakeLists) Ensure

func (w *WakeLists) Ensure(reactorID int) bool

func (*WakeLists) Get

func (w *WakeLists) Get(reactorID int) (*WakeList, bool)

func (*WakeLists) GetOrCreate

func (w *WakeLists) GetOrCreate(reactorID int, tl *TaskSet) (*WakeList, bool)

func (*WakeLists) Len

func (w *WakeLists) Len() int

func (*WakeLists) Put

func (w *WakeLists) Put(reactorID int, l *WakeList)

type Waker

// Waker is anything that can be woken. *Task, *TaskProvider and *TaskSet each
// declare a Wake() error method with this signature.
type Waker interface {
	// Wake requests a wake-up and returns an error.
	Wake() error
}

type Wheel

type Wheel struct {
	// contains filtered or unexported fields
}

func NewWheel

func NewWheel(cadence Cadence) Wheel

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL