Documentation ¶
Index ¶
- Constants
- Variables
- func EnqueueBlocking(task func()) bool
- func Init(numLoops int, tick Cadence, queueSize int, blockingQueueSize int)
- func NumReactors() int
- func PollFnPointer(poll func(event Context) error) unsafe.Pointer
- func PollToPollFnPointer(future Future) unsafe.Pointer
- type BlockingPool
- type Cadence
- type CloseEvent
- type Config
- type Conn
- type Context
- type Func
- type FuncMap
- type FuncSlot
- type FuncStats
- type FuncSwapSlice
- func (s *FuncSwapSlice) Add(value *FuncSlot)
- func (s *FuncSwapSlice) Get(index int) (value *FuncSlot, ok bool)
- func (s *FuncSwapSlice) Iterate(fn func(slot *FuncSlot) bool)
- func (s *FuncSwapSlice) LastWake() int64
- func (s *FuncSwapSlice) Len() int
- func (s *FuncSwapSlice) Remove(value *FuncSlot) bool
- func (s *FuncSwapSlice) Unsafe() []*FuncSlot
- type Future
- type FutureTask
- type ObjectMap
- type PollClose
- type PollReason
- type Reactor
- func (r *Reactor) CheckGID() bool
- func (r *Reactor) Duration(ticks int64) time.Duration
- func (r *Reactor) ID() int
- func (r *Reactor) Invoke(fn func()) bool
- func (r *Reactor) InvokeBlocking(fn func()) bool
- func (r *Reactor) InvokeRef(fn *func()) bool
- func (r *Reactor) Now() int64
- func (r *Reactor) Print()
- func (r *Reactor) SnapshotStats() Stats
- func (r *Reactor) Spawn(future Future) (*Task, error)
- func (r *Reactor) SpawnInterval(future Future, interval time.Duration) (*Task, error)
- func (r *Reactor) SpawnWorkerFn(fn func()) error
- func (r *Reactor) Start()
- func (r *Reactor) Ticks(duration time.Duration) int64
- func (r *Reactor) Wake(task *Task) error
- func (r *Reactor) WakeAfter(task *Task, after time.Duration) error
- type Runnable
- type Stats
- type Task
- func (t *Task) CheckGID() bool
- func (t *Task) ID() int64
- func (t *Task) Interval() time.Duration
- func (t *Task) LastPoll() int64
- func (t *Task) Poll() Future
- func (t *Task) Polls() int64
- func (t *Task) Reactor() *Reactor
- func (t *Task) SetStop(stop bool)
- func (t *Task) Started() int64
- func (t *Task) Stop() bool
- func (t *Task) Wake() error
- func (t *Task) WakeAfter(duration time.Duration) error
- func (t *Task) Wakes() int64
- type TaskProvider
- type TaskSet
- func (tl *TaskSet) Add(value FutureTask) (*TaskSlot, error)
- func (tl *TaskSet) AddFunc(r *Reactor, value func()) (*FuncSlot, error)
- func (tl *TaskSet) Close() error
- func (tl *TaskSet) IsEmpty() bool
- func (tl *TaskSet) LastSoftWakeLatency() int64
- func (tl *TaskSet) LastWakeLatency() int64
- func (tl *TaskSet) NumEntries() int
- func (tl *TaskSet) NumReactors() int
- func (tl *TaskSet) Release() bool
- func (tl *TaskSet) Spawn(future FutureTask) (*Task, error)
- func (tl *TaskSet) SpawnInterval(future FutureTask, interval time.Duration) (*Task, error)
- func (tl *TaskSet) SpawnIntervalOn(reactor *Reactor, future FutureTask, interval time.Duration) (*Task, error)
- func (tl *TaskSet) SpawnOn(reactor *Reactor, future FutureTask) (*Task, error)
- func (tl *TaskSet) Stop() int64
- func (tl *TaskSet) Wake() error
- type TaskSlot
- type TaskSwapSlice
- func (s *TaskSwapSlice) Add(value *TaskSlot)
- func (s *TaskSwapSlice) Get(index int) (value *TaskSlot, ok bool)
- func (s *TaskSwapSlice) Iterate(fn func(slot *TaskSlot) bool) int64
- func (s *TaskSwapSlice) LastWake() int64
- func (s *TaskSwapSlice) Len() int
- func (s *TaskSwapSlice) Remove(value *TaskSlot) bool
- func (s *TaskSwapSlice) Unsafe() []*TaskSlot
- type Tick
- type TickListener
- type Ticker
- type WakeList
- type WakeLists
- type Waker
- type Wheel
Constants ¶
const ( DefaultInvokeQueueSize = 1024 * 1 DefaultWakeQueueSize = 1024 * 1 DefaultSpawnQueueSize = 1024 * 1 )
const (
DefaultEnqueueTimeout = time.Second * 10
)
Variables ¶
var ( ErrQueueFull = errors.New("queue full") ErrStop = errors.New("stop") )
var ( Micros50 = Cadence{Durations: []time.Duration{ time.Microsecond * 50, time.Microsecond * 100, time.Microsecond * 150, time.Microsecond * 200, time.Microsecond * 250, time.Microsecond * 350, time.Microsecond * 500, time.Microsecond * 750, time.Microsecond * 900, time.Microsecond * 1000, }} Micros250 = Cadence{Durations: []time.Duration{ time.Microsecond * 250, time.Microsecond * 500, time.Microsecond * 750, time.Microsecond * 1000, }} Millis5 = Cadence{Durations: []time.Duration{ time.Millisecond * 5, time.Millisecond * 10, time.Millisecond * 20, time.Millisecond * 30, time.Millisecond * 50, time.Millisecond * 100, time.Millisecond * 200, time.Millisecond * 250, time.Millisecond * 500, }} Millis10 = Cadence{Durations: []time.Duration{ time.Millisecond * 10, time.Millisecond * 20, time.Millisecond * 30, time.Millisecond * 50, time.Millisecond * 100, time.Millisecond * 200, time.Millisecond * 250, time.Millisecond * 500, }} Millis20 = Cadence{Durations: []time.Duration{ time.Millisecond * 20, time.Millisecond * 40, time.Millisecond * 60, time.Millisecond * 100, time.Millisecond * 200, time.Millisecond * 500, }} Millis25 = Cadence{Durations: []time.Duration{ time.Millisecond * 25, time.Millisecond * 50, time.Millisecond * 75, time.Millisecond * 100, time.Millisecond * 150, time.Millisecond * 250, time.Millisecond * 500, time.Millisecond * 750, }} Millis50 = Cadence{Durations: []time.Duration{ time.Millisecond * 50, time.Millisecond * 100, time.Millisecond * 200, time.Millisecond * 250, time.Millisecond * 500, time.Millisecond * 750, }} Millis100 = Cadence{Durations: []time.Duration{ time.Millisecond * 100, time.Millisecond * 200, time.Millisecond * 500, }} Millis250 = Cadence{Durations: []time.Duration{ time.Millisecond * 250, time.Millisecond * 500, time.Millisecond * 750, }} Millis500 = Cadence{Durations: []time.Duration{ time.Millisecond * 500, }} Seconds = Cadence{Durations: []time.Duration{ time.Second, time.Second * 2, time.Second * 3, time.Second * 5, 
time.Second * 10, time.Second * 15, time.Second * 20, time.Second * 30, time.Second * 45, }} Minutes = Cadence{Durations: []time.Duration{ time.Second * 5, time.Second * 60, time.Minute * 2, time.Minute * 3, time.Minute * 5, time.Minute * 10, time.Minute * 15, time.Minute * 20, time.Minute * 30, }} Hours = Cadence{Durations: []time.Duration{ time.Minute * 5, time.Hour, time.Hour * 2, time.Hour * 4, time.Hour * 6, time.Hour * 12, time.Hour * 24, }} )
Functions ¶
func EnqueueBlocking ¶
func EnqueueBlocking(task func()) bool
func NumReactors ¶
func NumReactors() int
func PollToPollFnPointer ¶
Types ¶
type BlockingPool ¶
type BlockingPool struct {
// contains filtered or unexported fields
}
BlockingPool executes tasks that may block but should complete rather quickly (<1s). These tasks are forbidden to sleep; use a worker for tasks that need to sleep. This pool has a fixed number of worker goroutines and tasks are spread among them in round-robin fashion.
func NewBlockingPool ¶
func NewBlockingPool(numWorkers, queueSize int) *BlockingPool
func (*BlockingPool) Checkpoint ¶
func (b *BlockingPool) Checkpoint()
func (*BlockingPool) Close ¶
func (b *BlockingPool) Close() error
func (*BlockingPool) Enqueue ¶
func (b *BlockingPool) Enqueue(fn func()) bool
func (*BlockingPool) EnqueueTimeout ¶
func (b *BlockingPool) EnqueueTimeout(fn func(), timeout time.Duration) bool
type CloseEvent ¶
type Context ¶
type Context struct { // Task is the associated Task object. Task *Task // Time is a possible low frequency time from the reactor. // Each Reactor loop pass captures a high frequency nano-time which // is what this field is set to. High frequency time is relatively // expensive 10-30ns and a sudden burst of task polls or function invokes // can add 10-30ns per call which adds up. // // Generally, this has a precision in the nanosecond to microsecond level // and possibly single-digit millisecond to tick duration. Values beyond this // means the Reactor is overloaded. Time int64 // Interval is the current interval of the Task or 0 if no interval is set. // If the Interval value changes, then the Reactor will subscribe or resubscribe // or unsubscribe as necessary. Interval time.Duration // After represents the amount of time to wait before waking. This is a one-time // wake and generally used for cooperative scheduling (sharing CPU between all Tasks). After time.Duration // Reason why Poll was called Reason PollReason }
Context provides the low-level management of a Task with its Reactor.
func (*Context) SetInterval ¶
SetInterval sets the interval for the task.
func (*Context) WakeOnNextTick ¶
func (p *Context) WakeOnNextTick()
WakeOnNextTick wakes this task again on the next tick.
type FuncSwapSlice ¶
type FuncSwapSlice struct {
// contains filtered or unexported fields
}
func (*FuncSwapSlice) Add ¶
func (s *FuncSwapSlice) Add(value *FuncSlot)
func (*FuncSwapSlice) Iterate ¶
func (s *FuncSwapSlice) Iterate(fn func(slot *FuncSlot) bool)
func (*FuncSwapSlice) LastWake ¶
func (s *FuncSwapSlice) LastWake() int64
func (*FuncSwapSlice) Len ¶
func (s *FuncSwapSlice) Len() int
func (*FuncSwapSlice) Remove ¶
func (s *FuncSwapSlice) Remove(value *FuncSlot) bool
func (*FuncSwapSlice) Unsafe ¶
func (s *FuncSwapSlice) Unsafe() []*FuncSlot
type FutureTask ¶
type PollClose ¶
type PollClose interface {
PollClose(ev CloseEvent) error
}
type PollReason ¶
type PollReason uint8
const ( ReasonStart PollReason = 0 // ReasonStart first time Poll is invoked after creating Task ReasonWake PollReason = 1 // ReasonWake the Task is awoken. Namaste ReasonInterval PollReason = 2 // ReasonInterval the Task's interval has elapsed. ReasonIntervalBehind PollReason = 3 // ReasonIntervalBehind the Task has missed interval wakes due to overloaded Reactor ReasonPing PollReason = 4 // ReasonPing ReasonClose PollReason = 5 // ReasonClose the Task will immediately close on return )
type Reactor ¶
type Reactor struct { Stats // contains filtered or unexported fields }
Reactor runs all tasks on a single goroutine. It has an optimized timing mechanism with a fixed tick duration (tickDur) and a fixed interval duration. The interval is broken down into slots for better resource allocation. For example, a tickDur of 4ms with 5 slots gives an interval duration of 20ms, with each 4ms slot handling ~20% of the load. The timing is constantly adjusted for CPU time so that the tickDur remains accurate from the start. In addition, there is a lock-free MPMC queue that accepts invokes to run immediately without having to wait for a Tick.
func NewReactor ¶
func NextReactor ¶
func NextReactor() *Reactor
func (*Reactor) InvokeBlocking ¶
func (*Reactor) SnapshotStats ¶
func (*Reactor) SpawnInterval ¶
func (*Reactor) SpawnWorkerFn ¶
type TaskProvider ¶
type TaskProvider struct {
// contains filtered or unexported fields
}
func (*TaskProvider) CheckGID ¶
func (tp *TaskProvider) CheckGID() bool
func (*TaskProvider) Reactor ¶
func (tp *TaskProvider) Reactor() *Reactor
func (*TaskProvider) SetTask ¶
func (tp *TaskProvider) SetTask(task *Task)
func (*TaskProvider) Task ¶
func (tp *TaskProvider) Task() *Task
func (*TaskProvider) Wake ¶
func (tp *TaskProvider) Wake() error
type TaskSet ¶
type TaskSet struct {
// contains filtered or unexported fields
}
func (*TaskSet) LastSoftWakeLatency ¶
func (*TaskSet) LastWakeLatency ¶
func (*TaskSet) NumEntries ¶
func (*TaskSet) NumReactors ¶
func (*TaskSet) SpawnInterval ¶
func (*TaskSet) SpawnIntervalOn ¶
type TaskSwapSlice ¶
type TaskSwapSlice struct {
// contains filtered or unexported fields
}
func (*TaskSwapSlice) Add ¶
func (s *TaskSwapSlice) Add(value *TaskSlot)
func (*TaskSwapSlice) LastWake ¶
func (s *TaskSwapSlice) LastWake() int64
func (*TaskSwapSlice) Len ¶
func (s *TaskSwapSlice) Len() int
func (*TaskSwapSlice) Remove ¶
func (s *TaskSwapSlice) Remove(value *TaskSlot) bool
func (*TaskSwapSlice) Unsafe ¶
func (s *TaskSwapSlice) Unsafe() []*TaskSlot
type TickListener ¶
type TickListener struct {
// contains filtered or unexported fields
}
func (*TickListener) Chan ¶
func (tl *TickListener) Chan() <-chan int64
func (*TickListener) Close ¶
func (tl *TickListener) Close() error
type WakeLists ¶
type WakeLists struct {
// contains filtered or unexported fields
}