queue

package
v0.0.9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 16, 2024 License: Apache-2.0 Imports: 10 Imported by: 2

Documentation

Index

Examples

Constants

This section is empty.

Variables

View Source
var ErrOutOfCapacity = queue.ErrOutOfCapacity

ErrOutOfCapacity 超过容量

Functions

This section is empty.

Types

type BlockingQueue

type BlockingQueue[T any] interface {
	// Enqueue 将元素放入队列。如果在 ctx 超时之前,队列有空闲位置,那么元素会被放入队列;
	// 否则返回 error。
	// 在超时或者调用者主动 cancel 的情况下,所有的实现都必须返回 ctx。
	// 调用者可以通过检查 error 是否为 context.DeadlineExceeded
	// 或者 context.Canceled 来判断入队失败的原因
	// 注意,调用者必须使用 errors.Is 来判断,而不能直接使用 ==
	Enqueue(ctx context.Context, t T) error
	// Dequeue 从队首获得一个元素
	// 如果在 ctx 超时之前,队列中有元素,那么会返回队首的元素,否则返回 error。
	// 在超时或者调用者主动 cancel 的情况下,所有的实现都必须返回 ctx。
	// 调用者可以通过检查 error 是否为 context.DeadlineExceeded
	// 或者 context.Canceled 来判断入队失败的原因
	// 注意,调用者必须使用 errors.Is 来判断,而不能直接使用 ==
	Dequeue(ctx context.Context) (T, error)
}

BlockingQueue 阻塞队列 参考 Queue 普通队列 一个阻塞队列是否遵循 FIFO 取决于具体实现

type ConcurrentArrayBlockingQueue

type ConcurrentArrayBlockingQueue[T any] struct {
	// contains filtered or unexported fields
}

ConcurrentArrayBlockingQueue 有界并发阻塞队列

func NewConcurrentArrayBlockingQueue

func NewConcurrentArrayBlockingQueue[T any](capacity int) *ConcurrentArrayBlockingQueue[T]

NewConcurrentArrayBlockingQueue 创建一个有界阻塞队列 容量会在最开始的时候就初始化好 capacity 必须为正数

Example
q := NewConcurrentArrayBlockingQueue[int](10)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
_ = q.Enqueue(ctx, 22)
val, err := q.Dequeue(ctx)
// 这是例子,实际中你不需要写得那么复杂
switch err {
case context.Canceled:
	// 有人主动取消了,即调用了 cancel 方法。在这个例子里不会出现这个情况
case context.DeadlineExceeded:
	// 超时了
case nil:
	fmt.Println(val)
default:
	// 其它乱七八糟的
}
Output:

22

func (*ConcurrentArrayBlockingQueue[T]) AsSlice

func (c *ConcurrentArrayBlockingQueue[T]) AsSlice() []T

func (*ConcurrentArrayBlockingQueue[T]) Dequeue

func (c *ConcurrentArrayBlockingQueue[T]) Dequeue(ctx context.Context) (T, error)

Dequeue 出队 通过sema来控制容量、超时、阻塞问题

func (*ConcurrentArrayBlockingQueue[T]) Enqueue

func (c *ConcurrentArrayBlockingQueue[T]) Enqueue(ctx context.Context, t T) error

Enqueue 入队 通过sema来控制容量、超时、阻塞问题

func (*ConcurrentArrayBlockingQueue[T]) Len

func (c *ConcurrentArrayBlockingQueue[T]) Len() int

type ConcurrentLinkedBlockingQueue

type ConcurrentLinkedBlockingQueue[T any] struct {
	// contains filtered or unexported fields
}

ConcurrentLinkedBlockingQueue 基于链表的并发阻塞队列 如果 maxSize 是正数。那么就是有界并发阻塞队列 如果不是,就是无界并发阻塞队列, 在这种情况下,入队永远能够成功

func NewConcurrentLinkedBlockingQueue

func NewConcurrentLinkedBlockingQueue[T any](capacity int) *ConcurrentLinkedBlockingQueue[T]

NewConcurrentLinkedBlockingQueue 创建链式阻塞队列 capacity <= 0 时,为无界队列

Example
// 创建一个容量为 10 的有界并发阻塞队列,如果传入 0 或者负数,那么创建的是无界并发阻塞队列
q := NewConcurrentLinkedBlockingQueue[int](10)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
_ = q.Enqueue(ctx, 22)
val, err := q.Dequeue(ctx)
// 这是例子,实际中你不需要写得那么复杂
switch err {
case context.Canceled:
	// 有人主动取消了,即调用了 cancel 方法。在这个例子里不会出现这个情况
case context.DeadlineExceeded:
	// 超时了
case nil:
	fmt.Println(val)
default:
	// 其它乱七八糟的
}
Output:

22

func (*ConcurrentLinkedBlockingQueue[T]) AsSlice

func (c *ConcurrentLinkedBlockingQueue[T]) AsSlice() []T

func (*ConcurrentLinkedBlockingQueue[T]) Dequeue

func (c *ConcurrentLinkedBlockingQueue[T]) Dequeue(ctx context.Context) (T, error)

Dequeue 出队 注意:目前我们已经通过broadcast实现了超时控制

func (*ConcurrentLinkedBlockingQueue[T]) Enqueue

func (c *ConcurrentLinkedBlockingQueue[T]) Enqueue(ctx context.Context, t T) error

Enqueue 入队 注意:目前我们已经通过broadcast实现了超时控制

func (*ConcurrentLinkedBlockingQueue[T]) Len

func (c *ConcurrentLinkedBlockingQueue[T]) Len() int

type ConcurrentLinkedQueue

type ConcurrentLinkedQueue[T any] struct {
	// contains filtered or unexported fields
}

ConcurrentLinkedQueue 无界并发安全队列

func NewConcurrentLinkedQueue

func NewConcurrentLinkedQueue[T any]() *ConcurrentLinkedQueue[T]
Example
q := NewConcurrentLinkedQueue[int]()
_ = q.Enqueue(10)
val, err := q.Dequeue()
if err != nil {
	// 一般意味着队列为空
	fmt.Println(err)
}
fmt.Println(val)
Output:

10

func (*ConcurrentLinkedQueue[T]) Dequeue

func (c *ConcurrentLinkedQueue[T]) Dequeue() (T, error)

func (*ConcurrentLinkedQueue[T]) Enqueue

func (c *ConcurrentLinkedQueue[T]) Enqueue(t T) error

type ConcurrentPriorityQueue

type ConcurrentPriorityQueue[T any] struct {
	// contains filtered or unexported fields
}

func NewConcurrentPriorityQueue

func NewConcurrentPriorityQueue[T any](capacity int, compare ekit.Comparator[T]) *ConcurrentPriorityQueue[T]

NewConcurrentPriorityQueue 创建优先队列 capacity <= 0 时,为无界队列

Example
q := NewConcurrentPriorityQueue[int](10, ekit.ComparatorRealNumber[int])
_ = q.Enqueue(3)
_ = q.Enqueue(2)
_ = q.Enqueue(1)
var vals []int
val, _ := q.Dequeue()
vals = append(vals, val)
val, _ = q.Dequeue()
vals = append(vals, val)
val, _ = q.Dequeue()
vals = append(vals, val)
fmt.Println(vals)
Output:

[1 2 3]

func (*ConcurrentPriorityQueue[T]) Cap

func (c *ConcurrentPriorityQueue[T]) Cap() int

Cap 无界队列返回0,有界队列返回创建队列时设置的值

func (*ConcurrentPriorityQueue[T]) Dequeue

func (c *ConcurrentPriorityQueue[T]) Dequeue() (T, error)

func (*ConcurrentPriorityQueue[T]) Enqueue

func (c *ConcurrentPriorityQueue[T]) Enqueue(t T) error

func (*ConcurrentPriorityQueue[T]) Len

func (c *ConcurrentPriorityQueue[T]) Len() int

func (*ConcurrentPriorityQueue[T]) Peek

func (c *ConcurrentPriorityQueue[T]) Peek() (T, error)

type DelayQueue

type DelayQueue[T Delayable] struct {
	// contains filtered or unexported fields
}

DelayQueue 延时队列 每次出队的元素必然都是已经到期的元素,即 Delay() 返回的值小于等于 0 延时队列本身对时间的精确度并不是很高,其时间精确度主要取决于 time.Timer 所以如果你需要极度精确的延时队列,那么这个结构并不太适合你。 但是如果你能够容忍至多在毫秒级的误差,那么这个结构还是可以使用的

func NewDelayQueue

func NewDelayQueue[T Delayable](c int) *DelayQueue[T]
Example
q := NewDelayQueue[delayElem](10)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
now := time.Now()
_ = q.Enqueue(ctx, delayElem{
	// 3 秒后过期
	deadline: now.Add(time.Second * 3),
	val:      3,
})

_ = q.Enqueue(ctx, delayElem{
	// 2 秒后过期
	deadline: now.Add(time.Second * 2),
	val:      2,
})

_ = q.Enqueue(ctx, delayElem{
	// 1 秒后过期
	deadline: now.Add(time.Second * 1),
	val:      1,
})

var vals []int
val, _ := q.Dequeue(ctx)
vals = append(vals, val.val)
val, _ = q.Dequeue(ctx)
vals = append(vals, val.val)
val, _ = q.Dequeue(ctx)
vals = append(vals, val.val)
fmt.Println(vals)
duration := time.Since(now)
if duration > time.Second*3 {
	fmt.Println("delay!")
}
Output:

[1 2 3]
delay!

func (*DelayQueue[T]) Dequeue

func (d *DelayQueue[T]) Dequeue(ctx context.Context) (T, error)

func (*DelayQueue[T]) Enqueue

func (d *DelayQueue[T]) Enqueue(ctx context.Context, t T) error

type Delayable

type Delayable interface {
	Delay() time.Duration
}

type PriorityQueue added in v0.0.9

type PriorityQueue[T any] struct {
	// contains filtered or unexported fields
}

func NewPriorityQueue added in v0.0.9

func NewPriorityQueue[T any](capacity int, compare ekit.Comparator[T]) *PriorityQueue[T]
Example
package main

import (
	"fmt"

	"github.com/ecodeclub/ekit"
	"github.com/ecodeclub/ekit/internal/queue"
)

func main() {
	// 容量,并且队列里面放的是 int
	pq := queue.NewPriorityQueue(10, ekit.ComparatorRealNumber[int])
	_ = pq.Enqueue(10)
	_ = pq.Enqueue(9)
	val, _ := pq.Dequeue()
	fmt.Println(val)
}
Output:

9

func (*PriorityQueue[T]) Dequeue added in v0.0.9

func (pq *PriorityQueue[T]) Dequeue() (T, error)

func (*PriorityQueue[T]) Enqueue added in v0.0.9

func (pq *PriorityQueue[T]) Enqueue(t T) error

func (*PriorityQueue[T]) Len added in v0.0.9

func (pq *PriorityQueue[T]) Len() int

func (*PriorityQueue[T]) Peek added in v0.0.9

func (pq *PriorityQueue[T]) Peek() (T, error)

type Queue

type Queue[T any] interface {
	// Enqueue 将元素放入队列,如果此时队列已经满了,那么返回错误
	Enqueue(t T) error
	// Dequeue 从队首获得一个元素
	// 如果此时队列里面没有元素,那么返回错误
	Dequeue() (T, error)
}

Queue 普通队列 参考 BlockingQueue 阻塞队列 一个队列是否遵循 FIFO 取决于具体实现

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL