blockingQueues

package module
v0.0.0-...-26531ad Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 30, 2017 License: MIT Imports: 6 Imported by: 0

README

Blocking Queues

GoDoc License

Blocking Queues provides some simple, performant, goroutine safe queues useful as resource pools or job queues. The primary focus is simplicity and high performance without sacrificing readability. In fact I tried to provide good documentation on the code and some examples of usage.

Queues Provided

  • ArrayBlockingQueue: A bounded blocking queue backed by a slice
  • LinkedBlockingQueue: A bounded blocking queue backed by a container/list
  • ConcurrentRingBuffer: A bounded lock-free queue backed by a slice

Installation

go get -u github.com/theodesp/blockingQueues

Usage

Non blocking api

queue, _ := NewArrayBlockingQueue(2)
res, _ := queue.Push(1)
res, _ := queue.Push(2)
res, err := queue.Push(3) // err is not Nil as queue is full
res, err := queue.Pop()
res, err := queue.Pop()
res, err := queue.Pop() // err is not Nil as queue is empty

Blocking api

queue, _ := NewArrayBlockingQueue(2)
res, _ := queue.Put(1)
res, _ := queue.Put(2)
res, err := queue.Put(3) // Will block the current goroutine

// In another goroutine
res, err := queue.Get() // Will unblock the first goroutine and add the last item
res, err := queue.Get()
res, err := queue.Get()
res, err := queue.Get() // Will block the current goroutine

Full API Documentation: https://godoc.org/github.com/theodesp/blockingQueues

Benchmarks

Using:

Model Name:	MacBook Pro
Model Identifier:	MacBookPro12,1
Processor Name:	Intel Core i7
Processor Speed:	3.1 GHz
Number of Processors:	1
Total Number of Cores:	2
L2 Cache (per Core):	256 KB
L3 Cache:	4 MB
Memory:	16 GB
ArrayBlockingQueue

Simple operations - no goroutines

ArrayBlockingQueueSuite.BenchmarkPeek     100000000               21.0 ns/op
ArrayBlockingQueueSuite.BenchmarkPop      100000000               20.7 ns/op
ArrayBlockingQueueSuite.BenchmarkPopOverflow       100000000               20.8 ns/op
ArrayBlockingQueueSuite.BenchmarkPush      50000000                38.9 ns/op
ArrayBlockingQueueSuite.BenchmarkPushOverflow      50000000                39.0 ns/op

Multiple Goroutines - Different ratio of readers/writers - Size of Queue is 1024 items

ArrayBlockingQueueSuite.BenchmarkPut1to1   10000000               169 ns/op
ArrayBlockingQueueSuite.BenchmarkPut2to2    5000000               508 ns/op
ArrayBlockingQueueSuite.BenchmarkPut4to4    1000000              1222 ns/op

The last test is slower as the number of goroutines are double of the available logic cores and it is expected to go slower because of the context switching.

ArrayBlockingQueueSuite.BenchmarkPut1to3   5000000               837 ns/op
ArrayBlockingQueueSuite.BenchmarkPut1to4   1000000              1126 ns/op
ArrayBlockingQueueSuite.BenchmarkPut2to1   5000000               476 ns/op
ArrayBlockingQueueSuite.BenchmarkPut2to3   2000000               799 ns/op
ArrayBlockingQueueSuite.BenchmarkPut3to2   2000000               816 ns/op
ArrayBlockingQueueSuite.BenchmarkPut4to1   1000000              1239 ns/op

Having a different ratio of readers and writers introduce the same amount of latency.

LinkedBlockingQueue
LinkedBlockingQueueSuite.BenchmarkPeek       100000000               21.4 ns/op
LinkedBlockingQueueSuite.BenchmarkPop100000000               24.4 ns/op
LinkedBlockingQueueSuite.BenchmarkPopOverflow        100000000               23.4 ns/op
LinkedBlockingQueueSuite.BenchmarkPush       50000000                47.3 ns/op
LinkedBlockingQueueSuite.BenchmarkPushOverflow       50000000                42.1 ns/op
LinkedBlockingQueueSuite.BenchmarkPut1to1    10000000               246 ns/op
LinkedBlockingQueueSuite.BenchmarkPut1to3     2000000               930 ns/op
LinkedBlockingQueueSuite.BenchmarkPut1to4     1000000              1496 ns/op
LinkedBlockingQueueSuite.BenchmarkPut2to1     5000000               578 ns/op
LinkedBlockingQueueSuite.BenchmarkPut2to2     5000000               560 ns/op
LinkedBlockingQueueSuite.BenchmarkPut2to3     2000000              1053 ns/op
LinkedBlockingQueueSuite.BenchmarkPut3to2     2000000              1041 ns/op
LinkedBlockingQueueSuite.BenchmarkPut4to1     1000000              1488 ns/op
LinkedBlockingQueueSuite.BenchmarkPut4to4     1000000              1451 ns/op
ConcurrentRingBuffer

Test

ConcurrentRingBufferSuite.BenchmarkRingBuffer1to1        20000000                85.7 ns/op
ConcurrentRingBufferSuite.BenchmarkRingBuffer1to3         1000000              2793 ns/op
ConcurrentRingBufferSuite.BenchmarkRingBuffer1to4          500000              5501 ns/op
ConcurrentRingBufferSuite.BenchmarkRingBuffer2to1         5000000               465 ns/op
ConcurrentRingBufferSuite.BenchmarkRingBuffer2to2         5000000               474 ns/op
ConcurrentRingBufferSuite.BenchmarkRingBuffer2to3         1000000              2640 ns/op
ConcurrentRingBufferSuite.BenchmarkRingBuffer3to2         1000000              2766 ns/op
ConcurrentRingBufferSuite.BenchmarkRingBuffer4to1         1000000              5411 ns/op
ConcurrentRingBufferSuite.BenchmarkRingBuffer4to4          500000              5370 ns/op

LICENCE

Copyright © 2017 Theo Despoudis MIT license

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrorCapacity = errors.New("ERROR_CAPACITY: attempt to Create Queue with invalid Capacity")
View Source
var ErrorEmpty = errors.New("ERROR_EMPTY: attempt to Get while Queue is Empty")
View Source
var ErrorFull = errors.New("ERROR_FULL: attempt to Put while Queue is Full")

Functions

This section is empty.

Types

type AbstractCollectionBase

type AbstractCollectionBase interface {
	Size() uint64
	Capacity() uint64
	IsEmpty() bool
	Clear()
}

type ArrayStore

type ArrayStore struct {
	// contains filtered or unexported fields
}

func NewArrayStore

func NewArrayStore(size uint64) *ArrayStore

func (*ArrayStore) Get

func (s *ArrayStore) Get(pos uint64) interface{}

func (*ArrayStore) Remove

func (s *ArrayStore) Remove(pos uint64) interface{}

func (*ArrayStore) Set

func (s *ArrayStore) Set(value interface{}, pos uint64)

func (ArrayStore) Size

func (s ArrayStore) Size() uint64

type BlockingQueue

type BlockingQueue struct {
	// contains filtered or unexported fields
}

func NewArrayBlockingQueue

func NewArrayBlockingQueue(capacity uint64) (*BlockingQueue, error)

Creates an BlockingQueue backed by an Array with the given (fixed) capacity returns an error if the capacity is less than 1

func NewLinkedBlockingQueue

func NewLinkedBlockingQueue(capacity uint64) (*BlockingQueue, error)

Creates an BlockingQueue backed by an LinkedList with the given (fixed) capacity returns an error if the capacity is less than 1

func (*BlockingQueue) Capacity

func (q *BlockingQueue) Capacity() uint64

Capacity returns this current elements remaining capacity, is concurrent safe

func (*BlockingQueue) Clear

func (q *BlockingQueue) Clear()

Clears all the queues elements, cleans up, signals waiters for queue is empty

func (*BlockingQueue) Get

func (q *BlockingQueue) Get() (interface{}, error)

Takes an element from the head of the queue. It blocks the current goroutine if the queue is Empty until notified

func (BlockingQueue) IsEmpty

func (q BlockingQueue) IsEmpty() bool

func (*BlockingQueue) Offer

func (q *BlockingQueue) Offer(item interface{}) (res bool)

Inserts the specified element at the tail of this queue if it is possible to do so immediately without exceeding the queue's capacity, returning true upon success and false if this queue is full. Does not block the current goroutine

func (BlockingQueue) Peek

func (q BlockingQueue) Peek() interface{}

Just attempts to return the tail element of the queue

func (*BlockingQueue) Pop

func (q *BlockingQueue) Pop() (res interface{}, err error)

Pops an element from the head of the queue. Does not block the current goroutine

func (*BlockingQueue) Push

func (q *BlockingQueue) Push(item interface{}) (bool, error)

Pushes the specified element at the tail of the queue. Does not block the current goroutine

func (*BlockingQueue) Put

func (q *BlockingQueue) Put(item interface{}) (bool, error)

Puts an element to the tail of the queue. It blocks the current goroutine if the queue is Full until notified

func (*BlockingQueue) Size

func (q *BlockingQueue) Size() uint64

Size returns this current elements size, is concurrent safe

type ConcurrentRingBuffer

type ConcurrentRingBuffer struct {
	// contains filtered or unexported fields
}

func NewConcurrentRingBuffer

func NewConcurrentRingBuffer(capacity uint64) *ConcurrentRingBuffer

func (*ConcurrentRingBuffer) Get

func (q *ConcurrentRingBuffer) Get() (interface{}, error)

func (*ConcurrentRingBuffer) Put

func (q *ConcurrentRingBuffer) Put(value interface{}) (bool, error)

type Interface

type Interface interface {
	AbstractCollectionBase

	Push(item interface{}) (bool, error)
	Pop() (interface{}, error)

	Get() (interface{}, error)
	Put(item interface{}) (bool, error)
	Offer(item interface{}) bool

	Peek() interface{}
}

All Queues must implement this interface

type LinkedListStore

type LinkedListStore struct {
	// contains filtered or unexported fields
}

func NewLinkedListStore

func NewLinkedListStore(capacity uint64) *LinkedListStore

func (*LinkedListStore) Get

func (s *LinkedListStore) Get(pos uint64) interface{}

func (*LinkedListStore) Remove

func (s *LinkedListStore) Remove(pos uint64) interface{}

func (*LinkedListStore) Set

func (s *LinkedListStore) Set(value interface{}, pos uint64)

func (LinkedListStore) Size

func (s LinkedListStore) Size() uint64

type QueueStore

type QueueStore interface {
	Set(value interface{}, pos uint64)
	Remove(pos uint64) interface{}
	Get(pos uint64) interface{}
	Size() uint64
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL