queue

package
v0.12.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 27, 2022 License: Apache-2.0 Imports: 8 Imported by: 12

Documentation

Index

Constants

View Source
const (
	ErrNotStarted    errhelper.ErrString = "not started"
	ErrStopped       errhelper.ErrString = "stopped"
	ErrKeyNotAllowed errhelper.ErrString = "key not allowed"
)

Errors for timeout queue

Variables

This section is empty.

Functions

This section is empty.

Types

type Job

type Job[K comparable] struct {
	Action JobAction
	Key    K
}

Job item to record action and related resource object

func (Job[K]) String

func (j Job[K]) String() string

type JobAction

type JobAction uint8
const (
	// JobActionAdd for jobs to add or create some resources
	JobActionAdd JobAction = 1 + iota

	// JobActionUpdate for jobs to update some resources
	JobActionUpdate

	// JobActionDelete for jobs to delete some resources
	JobActionDelete

	// JobActionCleanup for jobs to cleanup resources
	JobActionCleanup
)

func (JobAction) String

func (j JobAction) String() string

func (JobAction) Valid added in v0.12.0

func (j JobAction) Valid() bool

Valid returns true if the j is a valid JobAction

type JobEnqueueResult added in v0.12.0

type JobEnqueueResult uint8

JobEnqueueResult

const (
	// JobEnqueueSuccess indicates the job enqueued successfully (with some side effects)
	//
	// Side effects:
	//
	//	- enqueue an "Delete" job cancels "Update" job (also see JobEnqueueCounteractive)
	JobEnqueueSuccess JobEnqueueResult = iota

	// JobEnqueueDup indicates the job to enqueue was discaded due to duplication, it
	// is common for job scheduling and should be considered as success for most cases
	//
	// Dup condition happends if there is job with the same action and key
	JobEnqueueDup

	// JobEnqueueConflict indicates the job to enqueue was rejected because of conflict
	// this result is usually caused by inconsistent application state and should be considered
	// considered unrecoverable
	//
	// Conflict conditions:
	//
	//	- enqueue an "Add" job, but there is an "Update" job
	//	- enqueue an "Update", but there is an "Add" or "Delete" job
	JobEnqueueConflict

	// JobJobEnqueueCounteractive indicates the job to enqueue was canceled as it
	// counteracted with existing job, and that counteracted job also got canceled
	//
	// Counteractive conditions:
	//
	//	- enqueue a "Delete" job, and there is an "Add" job (will cancel that "Add" job)
	JobEnqueueCounteractive

	// JobEnqueueInvalidAction indicates the job to enqueue was rejected because of its
	// invalid JobAction
	JobEnqueueInvalidAction
)

func (JobEnqueueResult) Error added in v0.12.0

func (r JobEnqueueResult) Error() string

func (JobEnqueueResult) String added in v0.12.0

func (r JobEnqueueResult) String() string

type JobQueue

type JobQueue[K comparable] struct {
	// contains filtered or unexported fields
}

JobQueue implements a waiting interruptable blocking queue for job scheduling and deduplication It defines four kinds of job by intention

TODO: refactor it with priority queue (B+Tree)

func NewJobQueue

func NewJobQueue[K comparable]() (q *JobQueue[K])

NewJobQueue creates a paused job queue.

func (*JobQueue[K]) Dequeue added in v0.12.0

func (q *JobQueue[K]) Dequeue() (job Job[K], shouldAcquireMore bool)

Dequeue a job from the queue, it waits until there is a job available or the job queue being paused, shouldAcquireMore indicates whether the queue was paused (true means paused)

func (*JobQueue[K]) Enqueue added in v0.12.0

func (q *JobQueue[K]) Enqueue(job Job[K]) JobEnqueueResult

Enqueue enqueues a job, regardless of the state of the JobQueue.

see JobEnqueueResult for details of the return value.

func (*JobQueue[K]) Find

func (q *JobQueue[K]) Find(key K) (_ Job[K], found bool)

Find a job by the key.

func (*JobQueue[K]) Pause

func (q *JobQueue[K]) Pause()

Pause pauses the Dequeue operations, existing Dequeue() call will return immediately.

func (*JobQueue[K]) Remaining added in v0.12.0

func (q *JobQueue[K]) Remaining(buf []Job[K]) (n int, shortbuf bool)

Remaining copies a list of jobs remaining into buf.

func (*JobQueue[K]) Remove

func (q *JobQueue[K]) Remove(job Job[K]) bool

Remove removes a job from the queue, returns true if there was such job and got removed

func (*JobQueue[K]) Resume

func (q *JobQueue[K]) Resume()

Resume resumes the paused job queue

type SeqDataHandleFunc added in v0.5.4

type SeqDataHandleFunc[T any] func(seq uint64, d T)

type SeqQueue

type SeqQueue[T any] struct {
	// contains filtered or unexported fields
}

SeqQueue is the sequence queue for randomly enqueued ordered data.

func NewSeqQueue

func NewSeqQueue[T any](handleData SeqDataHandleFunc[T]) *SeqQueue[T]

NewSeqQueue returns a empty SeqQueue

func (*SeqQueue[T]) Offer

func (q *SeqQueue[T]) Offer(seq uint64, data T) (complete bool)

Offer an unordered data with its sequence

func (*SeqQueue[T]) Reset

func (q *SeqQueue[T]) Reset()

Reset the SeqQueue for new sequential data

func (*SeqQueue[T]) SetMaxSeq

func (q *SeqQueue[T]) SetMaxSeq(maxSeq uint64) (complete bool)

SetMaxSeq set when should this queue stop enqueuing data

type TimeoutData

type TimeoutData[K comparable, V any] struct {
	Key  K
	Data V
	// contains filtered or unexported fields
}

TimeoutData is the data set used internally

type TimeoutQueue

type TimeoutQueue[K comparable, V any] struct {
	// contains filtered or unexported fields
}

TimeoutQueue to arrange timeout events in a single queue, then you can access them in sequence with channel

func NewTimeoutQueue

func NewTimeoutQueue[K comparable, V any]() *TimeoutQueue[K, V]

NewTimeoutQueue returns an idle TimeoutQueue

func (*TimeoutQueue[K, V]) Allow

func (q *TimeoutQueue[K, V]) Allow(key K)

Allow allow tasks with key, future tasks with the key can be offered

func (*TimeoutQueue[K, V]) Clear

func (q *TimeoutQueue[K, V]) Clear()

Clear out all timeout key-value pairs

func (*TimeoutQueue[K, V]) Find

func (q *TimeoutQueue[K, V]) Find(key K) (_ V, _ bool)

Find timeout key-value pair according to the key

func (*TimeoutQueue[K, V]) Forbid

func (q *TimeoutQueue[K, V]) Forbid(key K)

Forbid forbid tasks with key, future tasks with the key cannot be offered

func (*TimeoutQueue[K, V]) Len

func (q *TimeoutQueue[K, V]) Len() int

Len is used internally for timeout data sort

func (*TimeoutQueue[K, V]) Less

func (q *TimeoutQueue[K, V]) Less(i, j int) bool

Less is used internally for timeout data sort

func (*TimeoutQueue[K, V]) OfferWithDelay

func (q *TimeoutQueue[K, V]) OfferWithDelay(key K, val V, wait time.Duration) error

OfferWithDelay to enqueue key-value pair, timeout after `wait`, if you would like to call Remove to delete the timeout object, `key` must be unique in this queue

func (*TimeoutQueue[K, V]) OfferWithTime

func (q *TimeoutQueue[K, V]) OfferWithTime(key K, val V, at time.Time) error

OfferWithTime to enqueue key-value pair with time, timeout at `time`, if you would like to call Remove to delete the timeout object, `key` must be unique in this queue

func (*TimeoutQueue[K, V]) Remains

func (q *TimeoutQueue[K, V]) Remains() []TimeoutData[K, V]

Remains shows key-value pairs not timed out

func (*TimeoutQueue[K, V]) Remove

func (q *TimeoutQueue[K, V]) Remove(key K) (ret V, _ bool)

Remove a timeout object from the queue according to the key

func (*TimeoutQueue[K, V]) Start

func (q *TimeoutQueue[K, V]) Start(stop <-chan struct{})

Start routine to generate timeout data

func (*TimeoutQueue[K, V]) Swap

func (q *TimeoutQueue[K, V]) Swap(i, j int)

Swap is used internally for timeout data sort

func (*TimeoutQueue[K, V]) TakeCh

func (q *TimeoutQueue[K, V]) TakeCh() <-chan *TimeoutData[K, V]

TakeCh returns the channel from which you can get key-value pairs timed out one by one

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL