channels

package module
v0.0.0-...-165da1f
Published: Mar 1, 2023 License: MIT Imports: 8 Imported by: 0

Documentation

Overview

This package includes a fast, ring-buffer queue based on the version suggested by Dariusz Górecki. Using it instead of other, simpler queue implementations (slice+append or a linked list) provides substantial memory and time benefits and fewer GC pauses.

The queue implemented here is as fast as it is for an additional reason: it is *not* thread-safe.
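
As a quick orientation, here is a minimal sketch exercising the DefaultQueue type documented later on this page (Add, Peek, Poll, Length). It is not taken from the package's own tests; this and the later sketches are written as if they lived inside the package, since its import path is not shown here. Because the queue is not thread-safe, anything shared across goroutines needs external synchronization.

package channels

import "fmt"

// ExampleDefaultQueue_basic is a hypothetical example of basic FIFO use of the
// ring-buffer queue. It must only be used from a single goroutine (or behind a
// mutex), since the queue is not thread-safe.
func ExampleDefaultQueue_basic() {
	q := NewDefaultQueue[int]()

	q.Add(1)
	q.Add(2)
	q.Add(3)

	fmt.Println(q.Peek())   // 1: head of the queue, not removed
	fmt.Println(q.Poll())   // 1: head of the queue, removed
	fmt.Println(q.Length()) // 2: elements remaining
}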

Index

Constants

This section is empty.

Variables

var (
	// Eager maximizes responsiveness at the expense of higher resource usage,
	// which can reduce throughput under certain conditions.
	// This strategy is meant for worker pools that will operate at a small percentage of their capacity
	// most of the time and may occasionally receive bursts of tasks. It's the default strategy.
	Eager = func() ResizingStrategy { return RatedResizer(1) }
	// Balanced tries to find a balance between responsiveness and throughput.
	// It's suitable for general purpose worker pools or those
	// that will operate close to 50% of their capacity most of the time.
	Balanced = func() ResizingStrategy { return RatedResizer(maxProcs / 2) }
	// Lazy maximizes throughput at the expense of responsiveness.
	// This strategy is meant for worker pools that will operate close to their max. capacity most of the time.
	Lazy = func() ResizingStrategy { return RatedResizer(maxProcs) }
)

Preset pool resizing strategies
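
The presets above are factory functions returning a ResizingStrategy (documented below). This page does not show where the package consumes a strategy, so the sketch below only demonstrates selecting a preset and querying its Resize decision directly; chooseStrategy and the worker counts passed to Resize are invented for illustration.

package channels

import "fmt"

// chooseStrategy is a hypothetical sketch: pick a preset strategy and ask it
// whether the pool should grow, given some illustrative worker counts.
func chooseStrategy() {
	strategy := Balanced() // or Eager(), Lazy()

	running, min, max := 4, 1, 16
	if strategy.Resize(running, min, max) {
		fmt.Println("strategy suggests adding a worker")
	} else {
		fmt.Println("strategy suggests waiting")
	}
}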

var (
	// TODO: Need to figure out a way to handle channel-closed logic
	ErrPipeHasBeanClosed = errors.New("the pipe has already been closed")
)

Errors used throughout the API.
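
A hedged sketch of how such an error would typically be handled with errors.Is; handlePushError is a hypothetical helper that only stands in for whatever call can fail once the pipe is closed.

package channels

import (
	"errors"
	"log"
)

// handlePushError is a hypothetical helper illustrating how a caller might
// distinguish the closed-pipe error from other failures.
func handlePushError(err error) {
	switch {
	case err == nil:
		// nothing to do
	case errors.Is(err, ErrPipeHasBeanClosed):
		log.Println("pipe already closed; dropping task")
	default:
		log.Printf("unexpected error: %v", err)
	}
}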

Functions

func NewAutoScaleTaskManager

func NewAutoScaleTaskManager[I any](ctx *TaskManagerContext[I], threshold, minThreshold int) *autoScaleTaskManager[I]

func NewRestrictedTaskManager

func NewRestrictedTaskManager[I any](ctx *TaskManagerContext[I], size int) *restrictedTaskManager[I]

func NewmonoTaskManager

func NewmonoTaskManager[I any](ctx *TaskManagerContext[I]) *monoTaskManager[I]

func RegisterBuffer

func RegisterBuffer(prefix string, meter metric.Meter, buffer Buffer)

func RegisterTaskManager

func RegisterTaskManager[I any](prefix string, meter metric.Meter, taskManager TaskManager[I])
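
The metric.Meter parameter suggests OpenTelemetry instrumentation, but this page does not show which metric package is meant. The sketch below assumes it is go.opentelemetry.io/otel/metric.Meter; registerChannelMetrics, the meter name, and the prefix are all made up for illustration.

package channels

import "go.opentelemetry.io/otel"

// registerChannelMetrics is a hypothetical helper. It assumes the metric.Meter
// parameter is OpenTelemetry's metric.Meter, obtained here from the global
// provider, and registers a Native channel under an invented prefix.
func registerChannelMetrics(input Native[int]) {
	meter := otel.Meter("channels-example")
	RegisterBuffer("input_buffer", meter, input)
}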

Types

type BlackHole

type BlackHole[I any] struct {
	// contains filtered or unexported fields
}

BlackHole implements the InChannel interface and provides an analogue for the "Discard" variable in the ioutil package - it never blocks, and simply discards every value it reads. The number of items discarded in this way is counted and returned from Len.
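
A minimal sketch of the discard behaviour described above; exactly when Len reflects a sent value is an implementation detail this page does not specify.

package channels

import "fmt"

// ExampleBlackHole is a hypothetical example: values written to the hole's
// In() end are discarded, and Len reports how many have been discarded so far.
func ExampleBlackHole() {
	hole := NewBlackHole[string]()
	defer hole.Close()

	hole.In() <- "dropped"
	hole.In() <- "also dropped"

	// Len may lag slightly behind the sends if discarding happens in a
	// background goroutine.
	fmt.Println(hole.Len())
}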

func NewBlackHole

func NewBlackHole[I any]() *BlackHole[I]

func (*BlackHole[I]) Cap

func (ch *BlackHole[I]) Cap() int

func (*BlackHole[I]) Close

func (ch *BlackHole[I]) Close()

func (*BlackHole[I]) In

func (ch *BlackHole[I]) In() chan I

func (*BlackHole[I]) Len

func (ch *BlackHole[I]) Len() int

type Buffer

type Buffer interface {
	Len() int // The number of elements currently buffered.
	Cap() int // The maximum number of elements that can be buffered.
}

type Channel

type Channel[T any] interface {
	UnbufferedChannel[T]
	Buffer
}

type ConfigFn

type ConfigFn[I any] func(*config[I]) error

func WithAutoScaleTaskManager

func WithAutoScaleTaskManager[I any](minThreshold, threshold int) ConfigFn[I]

func WithChannel

func WithChannel[I, O any](channel Channel[I], prefix string, meter metric.Meter) ConfigFn[I]

func WithContext

func WithContext[I, O any](ctx context.Context) ConfigFn[I]

func WithMonoTaskManager

func WithMonoTaskManager[I any]() ConfigFn[I]

func WithRestrictedTaskManager

func WithRestrictedTaskManager[I any](size int) ConfigFn[I]

func WithUnbufferedChannel

func WithUnbufferedChannel[I, O any](channel UnbufferedChannel[I]) ConfigFn[I]

These channels will block on send (they are unbuffered).
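
A sketch of how these options might be combined when constructing a pipe. The exact interaction between options is not documented on this page, so the combination below is illustrative only; newConfiguredPipe is hypothetical, and the size of 8 (read here as "at most 8 concurrent workers") is an assumption.

package channels

import "context"

// newConfiguredPipe is a hypothetical constructor showing how ConfigFn options
// can be combined: a cancellable context plus a fixed-size task manager.
func newConfiguredPipe(ctx context.Context, op OperationWithResutl[int, string]) *Pipe[int, string] {
	return NewPipe(op,
		WithContext[int, string](ctx),
		WithRestrictedTaskManager[int](8),
	)
}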

type DefaultQueue

type DefaultQueue[T any] struct {
	// contains filtered or unexported fields
}

DefaultQueue represents a single instance of the ring-buffer queue data structure.

func NewDefaultQueue

func NewDefaultQueue[T any]() *DefaultQueue[T]

NewDefaultQueue constructs and returns a new DefaultQueue.

func NewDefaultQueueWithSize

func NewDefaultQueueWithSize[T any](size int) *DefaultQueue[T]

func (*DefaultQueue[T]) Add

func (q *DefaultQueue[T]) Add(elem T)

Add puts an element on the end of the queue.

func (*DefaultQueue[T]) Get

func (q *DefaultQueue[T]) Get(i int) T

Get returns the element at index i in the queue. If the index is invalid, the call will panic. This method accepts both positive and negative index values. Index 0 refers to the first element, and index -1 refers to the last.
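
For instance (a hypothetical helper, assuming the queue is non-empty):

package channels

// headAndTail is a hypothetical helper: for a non-empty queue, Get(0) returns
// the same element as Peek() and Get(-1) returns the most recently added one.
func headAndTail[T any](q *DefaultQueue[T]) (head, tail T) {
	return q.Get(0), q.Get(-1)
}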

func (*DefaultQueue[T]) Length

func (q *DefaultQueue[T]) Length() int

Length returns the number of elements currently stored in the queue.

func (*DefaultQueue[T]) Peek

func (q *DefaultQueue[T]) Peek() T

Peek returns the element at the head of the queue. This call panics if the queue is empty.

func (*DefaultQueue[T]) Poll

func (q *DefaultQueue[T]) Poll() T

Poll removes and returns the element from the front of the queue. If the queue is empty, the call will panic.

type InChannel

type InChannel[T any] interface {
	UnbufferedInChannel[T]
	Buffer
}

type Native

type Native[T any] chan T

Native implements the Channel interface by wrapping a native go channel.

func NewNative

func NewNative[T any](size int) Native[T]

NewNative makes a new Native channel with the given buffer size. It is just a convenience wrapper to avoid having to convert the result of make().
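
A minimal sketch of the wrapper in use: it behaves like an ordinary buffered Go channel while also satisfying the Channel (and Buffer) interfaces.

package channels

import "fmt"

// ExampleNative is a hypothetical example: a Native channel is written to via
// In(), read from via Out(), and reports its buffer state via Len and Cap.
func ExampleNative() {
	ch := NewNative[int](4)

	ch.In() <- 42
	fmt.Println(ch.Len(), ch.Cap()) // 1 4

	fmt.Println(<-ch.Out()) // 42

	ch.Close()
}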

func (Native[T]) Cap

func (ch Native[T]) Cap() int

func (Native[T]) Close

func (ch Native[T]) Close()

func (Native[T]) In

func (ch Native[T]) In() chan T

func (Native[T]) Len

func (ch Native[T]) Len() int

func (Native[T]) Out

func (ch Native[T]) Out() <-chan T

type Operation

type Operation[I any] func(context.Context, I) error

type OperationWithResutl

type OperationWithResutl[I, O any] func(context.Context, I, chan<- O) error
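
A sketch of what concrete task functions of these two shapes might look like; logLength and itoa are hypothetical, and their bodies are invented purely for illustration.

package channels

import (
	"context"
	"strconv"
)

// logLength is a hypothetical Operation[string]; it consumes a task and
// returns an error only if the context is cancelled.
var logLength Operation[string] = func(ctx context.Context, task string) error {
	_ = len(task) // stand-in for real work on the task
	return ctx.Err()
}

// itoa is a hypothetical OperationWithResutl[int, string]; it sends its result
// on the channel supplied by the pipe.
var itoa OperationWithResutl[int, string] = func(ctx context.Context, task int, results chan<- string) error {
	select {
	case results <- strconv.Itoa(task):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}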

type OutChannel

type OutChannel[T any] interface {
	UnbufferedOutChannel[T]
	Buffer
}

type Pipe

type Pipe[I, O any] struct {
	Close   context.CancelFunc
	Returns chan<- O
	UnbufferedInChannel[I]
	TaskManager[I]
	// contains filtered or unexported fields
}

func NewPipe

func NewPipe[I, O any](operation OperationWithResutl[I, O], configs ...ConfigFn[I]) *Pipe[I, O]

func (*Pipe[I, O]) InTo

func (p *Pipe[I, O]) InTo(other UnbufferedInChannel[O]) UnbufferedInChannel[O]

func (*Pipe[I, O]) Push

func (p *Pipe[I, O]) Push(task I)
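
Putting the pieces together, the sketch below builds a pipe from a result-producing operation, forwards its output into a Native channel via InTo, pushes a few tasks, and prints whatever arrives. It is a hedged sketch: runPipeSketch is hypothetical, the page does not document how the pipe's results are delivered, so the InTo wiring and the (1, 4) auto-scale thresholds are assumptions.

package channels

import (
	"context"
	"fmt"
	"strconv"
)

// runPipeSketch is hypothetical: it forwards the pipe's results into a Native
// channel via InTo (assumed semantics), pushes a few tasks, and prints the
// results.
func runPipeSketch() {
	op := OperationWithResutl[int, string](func(ctx context.Context, task int, results chan<- string) error {
		results <- strconv.Itoa(task * 2)
		return nil
	})

	pipe := NewPipe(op, WithAutoScaleTaskManager[int](1, 4))
	defer pipe.Close()

	out := NewNative[string](8)
	pipe.InTo(out)

	for i := 0; i < 3; i++ {
		pipe.Push(i)
	}
	for i := 0; i < 3; i++ {
		fmt.Println(<-out.Out())
	}
}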

type Plug

type Plug[I any] struct {
	Close context.CancelFunc
	UnbufferedInChannel[I]
	TaskManager[I]
	// contains filtered or unexported fields
}

func NewPlug

func NewPlug[I any](operation Operation[I], configs ...ConfigFn[I]) *Plug[I]
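
A Plug appears to be the fire-and-forget counterpart of a Pipe: it runs an Operation that produces no results. The sketch below is illustrative only; runPlugSketch is hypothetical, and the single-worker reading of WithMonoTaskManager, as well as the default channel behaviour when no channel option is supplied, are assumptions.

package channels

import (
	"context"
	"log"
)

// runPlugSketch is hypothetical: it builds a Plug around a simple Operation,
// feeds it via the embedded In() channel, and cancels it when done.
func runPlugSketch() {
	op := Operation[string](func(ctx context.Context, task string) error {
		log.Println("processing", task)
		return nil
	})

	plug := NewPlug(op, WithMonoTaskManager[string]())
	defer plug.Close()

	plug.In() <- "hello"
	plug.In() <- "world"
}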

type Queue

type Queue[T any] interface {
	// Length returns the number of elements currently stored in the queue.
	Length() int
	// Peek returns the element at the head of the queue. It will return nil
	// if the queue is empty.
	Peek() T
	// Poll removes and returns the element from the front of the queue. If the
	// queue is empty, it will return nil.
	Poll() T
	// Add puts an element on the end of the queue.
	Add(T)
}

type QueueChannel

type QueueChannel[T any] struct {
	// contains filtered or unexported fields
}

QueueChannel implements the Channel interface with an infinite buffer between the input and the output.

func NewQueueChannel

func NewQueueChannel[T any](buffer Queue[T]) *QueueChannel[T]

func NewQueueChannelWithInputSize

func NewQueueChannelWithInputSize[T any](buffer Queue[T], size int) *QueueChannel[T]
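
A sketch combining the two halves of the package: a DefaultQueue provides the unbounded buffer behind a QueueChannel, so writes to In() should not block on a full buffer. The example is hypothetical and not taken from the package's tests.

package channels

import "fmt"

// ExampleQueueChannel is hypothetical: it backs a QueueChannel with the
// package's ring-buffer queue, writes a few values, and reads them back.
func ExampleQueueChannel() {
	ch := NewQueueChannel[int](NewDefaultQueue[int]())
	defer ch.Close()

	for i := 0; i < 3; i++ {
		ch.In() <- i
	}
	for i := 0; i < 3; i++ {
		fmt.Println(<-ch.Out())
	}
}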

func (*QueueChannel[T]) Cap

func (ch *QueueChannel[T]) Cap() int

func (*QueueChannel[T]) Close

func (ch *QueueChannel[T]) Close()

func (*QueueChannel[T]) In

func (ch *QueueChannel[T]) In() chan T

func (*QueueChannel[T]) Len

func (ch *QueueChannel[T]) Len() int

func (*QueueChannel[T]) Out

func (ch *QueueChannel[T]) Out() <-chan T

type ResizingStrategy

type ResizingStrategy interface {
	Resize(runningWorkers, minWorkers, maxWorkers int) bool
}

func RatedResizer

func RatedResizer(rate int) ResizingStrategy

RatedResizer creates a resizing strategy that can be configured to create workers at a specific rate when the pool has no idle workers. The rate determines the number of tasks to receive before creating an extra worker; a value of 3 can be interpreted as "create a new worker every 3 tasks".
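
To make the rate semantics concrete, the sketch below drives a RatedResizer directly. Where the real pool would call Resize is internal to the package, so the loop, the worker counts, and the expected outcome are all illustrative assumptions.

package channels

import "fmt"

// ExampleRatedResizer is hypothetical: with a rate of 3 and no idle workers,
// roughly one in every three Resize calls should report that an extra worker
// is warranted.
func ExampleRatedResizer() {
	resizer := RatedResizer(3)

	grown := 0
	for task := 1; task <= 9; task++ {
		if resizer.Resize(0, 1, 16) { // runningWorkers, minWorkers, maxWorkers (invented)
			grown++
		}
	}
	fmt.Println("workers added:", grown) // expected to be about 3
}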

type TaskManager

type TaskManager[I any] interface {
	Active() int
	Threshold() int
	Skipped() int
}

type TaskManagerContext

type TaskManagerContext[I any] struct {
	Operation Operation[I]
	context.Context
	UnbufferedChannel[I]
}

type UnbufferedChannel

type UnbufferedChannel[T any] interface {
	UnbufferedInChannel[T]
	UnbufferedOutChannel[T]
}

type UnbufferedInChannel

type UnbufferedInChannel[T any] interface {
	In() chan T // The writeable end of the channel.
	Close()     // Closes the channel. It is an error to write to In() after calling Close().
}

type UnbufferedOutChannel

type UnbufferedOutChannel[T any] interface {
	Out() <-chan T // The readable end of the channel.
}
