bigqueue

v1.2.7
Warning

This package is not in the latest version of its module.

Published: May 15, 2023 License: Apache-2.0 Imports: 16 Imported by: 3

README

BigQueue-go

BigQueue-go is a pure Golang implementation of a big, fast, and persistent queue based on memory-mapped files. Its file storage structure is fully compatible with [BigQueue](https://github.com/bulldog2011/bigqueue).


Feature Highlight:

  1. Fast: enqueue and dequeue are both close to O(1) memory accesses, so throughput approaches the speed of direct memory access.
  2. Big: the total size of the queue is limited only by the available disk space.
  3. Persistent: all data in the queue is persisted on disk and is crash-resistant.
  4. Reliable: the OS is responsible for persisting the produced messages even if your process crashes.
  5. Real-time: messages produced by producer threads are immediately visible to consumer threads.
  6. Memory-efficient: an automatic paging and swapping algorithm keeps only the most recently accessed data in memory.
  7. Thread-safe: multiple threads can enqueue and dequeue concurrently without data corruption.
  8. Simple and light: a pure Golang implementation without any third-party library.

Quick Start

Installing

To start using BigQueue-Go, install Go and run go get:

$ go get github.com/jhunters/bigqueue

To run the test cases:

$ go test -v .

Importing bigqueue

To use bigqueue as a file-backed queue, import it as follows:

package main

import (
	"fmt"

	"github.com/jhunters/bigqueue"
)

func main() {
	var queue = new(bigqueue.FileQueue)

	// Open the queue "testqueue" in the current directory; nil is passed for the options.
	err := queue.Open(".", "testqueue", nil)
	if err != nil {
		fmt.Println(err)
		return
	}
	defer queue.Close()

	data := []byte("hello jhunters")

	// Enqueue returns the index assigned to the item.
	i, err := queue.Enqueue(data)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("Enqueued index=", i, string(data))

	// Dequeue returns the index and payload of the item at the front of the queue.
	index, bb, err := queue.Dequeue()
	if err != nil {
		fmt.Println(err)
		return
	}

	fmt.Println("Dequeue data:", index, string(bb))
}

Docs

  1. big queue tutorial
  2. fanout queue tutorial

The Big Picture

[Figures: three design diagrams]

Benchmark test

$ go test -bench . -benchtime=3s -run=^$
goos: linux
goarch: amd64
pkg: github.com/bigqueue
Benchmark_EnqueueOnly-8                  2319403              1479 ns/op
Benchmark_DequeueOnly-8                  4704715               743 ns/op
Benchmark_EnqueueDequeue-8               1536244              2303 ns/op
Benchmark_ParallelEnqueueDequeue-8       1254315              2760 ns/op
PASS
ok      github.com/bigqueue     40.028s

License

BigQueue-Go is Apache 2.0 licensed.

Documentation

Overview

Package bigqueue is a pure Golang implementation of a big, fast, and persistent queue based on memory-mapped files.

  • @Author: Malin Xie
  • @Description:
  • @Date: 2020-08-28 20:06:46


Constants

const (

	// DefaultDataPageSize data file size
	DefaultDataPageSize = 128 * 1024 * 1024

	// DefaultIndexItemsPerPage items numbers in one page
	DefaultIndexItemsPerPage = 17

	// MaxInt64 max value of int64
	MaxInt64 = 0x7fffffffffffffff
	// IndexFileName file name
	IndexFileName = "index"
	// DataFileName file name
	DataFileName = "data"
	// MetaFileName file name
	MetaFileName = "meta_data"
	// FrontFileName file name
	FrontFileName = "front_index"

	Default_Page_Size = 10
)
const (
	// FanoutFrontFileName Fanout FrontFileName file name
	FanoutFrontFileName = "front_index_"
)

Variables

var (
	ErrEnqueueDataNull = errors.New("enqueue data can not be null")

	ErrIndexOutOfBoundTH = errors.New("index is valid which should between tail and head index")

	// SubscribeExistErr repeat call Subscriber method
	ErrSubscribeExistErr = errors.New("Subscriber alread set, can not repeat set")

	// Subscribe should call after queue Open method
	ErrSubscribeFailedNoOpenErr = errors.New("Subscriber method only support after queue opened")
)

These errors can be returned when opening or calling methods on a DB.

var DefaultOptions = &Options{
	DataPageSize:      DefaultDataPageSize,
	indexPageSize:     defaultIndexPageSize,
	IndexItemsPerPage: DefaultIndexItemsPerPage,
	itemsPerPage:      defaultItemsPerPage,
	AutoGCBySeconds:   0,
}

DefaultOptions holds the default options.

Functions

func Assert

func Assert(condition bool, message string, v ...interface{})

Assert panics with the given formatted message if the condition is false.

func BytesToInt

func BytesToInt(b []byte) int64

BytesToInt converts a byte slice to an int64.

func BytesToInt32

func BytesToInt32(b []byte) int32

BytesToInt32 converts a byte slice to an int32.

func GetFileName

func GetFileName(prefix string, suffix string, index int64) string

GetFileName returns the file name joined from the prefix, suffix, and index.

func GetFiles

func GetFiles(pathname string) (*list.List, error)

GetFiles returns all files in the given directory, not including any subdirectories.

func IntToBytes

func IntToBytes(n int64) []byte

IntToBytes converts an int64 to a byte slice.
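The byte order used by IntToBytes and BytesToInt is not stated in this documentation. As a minimal sketch of what such a round trip can look like, the following uses the standard encoding/binary package and assumes big-endian order and a fixed 8-byte width. The helper names intToBytes and bytesToInt are local stand-ins for illustration, not the library's functions.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// intToBytes encodes n as 8 bytes.
// Big-endian order is an assumption made for this sketch.
func intToBytes(n int64) []byte {
	b := make([]byte, 8)
	binary.BigEndian.PutUint64(b, uint64(n))
	return b
}

// bytesToInt decodes the first 8 bytes of b as a big-endian int64.
// It panics if b is shorter than 8 bytes.
func bytesToInt(b []byte) int64 {
	return int64(binary.BigEndian.Uint64(b))
}

func main() {
	b := intToBytes(258)
	fmt.Println(b, bytesToInt(b)) // prints: [0 0 0 0 0 0 1 2] 258
}
```

Check the package source before relying on a particular byte order when reading queue files with other tools.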

func Mod

func Mod(val int64, bits int) int64

Mod return

func PathExists

func PathExists(path string) (bool, error)

PathExists checks whether the target path exists. It returns true if the path exists and false otherwise.

func Printstack added in v1.2.3

func Printstack()

Printstack prints the call stack trace.

func RemoveFiles

func RemoveFiles(pathname string) error

RemoveFiles removes all files from the given directory, not including any subdirectories.

func Warn added in v1.2.3

func Warn(v ...interface{})

Warn prints a log message to os.Stderr.

func Warnf added in v1.2.3

func Warnf(msg string, v ...interface{})

Warnf prints a formatted log message to os.Stderr.

Types

type DB

type DB struct {
	// If you want to read the entire database fast, you can set MmapFlag to
	// syscall.MAP_POPULATE on Linux 2.6.23+ for sequential read-ahead.
	MmapFlags int

	InitialMmapSize int
	// contains filtered or unexported fields
}

DB represents a collection of buckets persisted to a file on disk. All data access is performed through transactions which can be obtained through the DB. All the functions on DB will return a ErrDatabaseNotOpen if accessed before Open() is called.

func (*DB) Close

func (db *DB) Close() error

Close releases all resources.

func (*DB) GoString

func (db *DB) GoString() string

GoString returns the Go string representation of the database.

func (*DB) Open

func (db *DB) Open(mode os.FileMode) error

Open opens the target DB file.

func (*DB) Path

func (db *DB) Path() string

Path returns the path to currently open database file.

type DBFactory

type DBFactory struct {
	InitialMmapSize int
	// contains filtered or unexported fields
}

DBFactory is used to manipulate multiple data files by index number.

func (*DBFactory) Close

func (f *DBFactory) Close() error

Close closes all data files.

type FanOutQueue added in v1.0.2

type FanOutQueue interface {
	// Open to open target file if failed error returns
	Open(dir string, queueName string, options *Options) error

	// IsEmpty determines whether the queue is empty for the given fanout ID.
	// It returns true if empty, false otherwise.
	IsEmpty(fanoutID int64) bool

	// Size returns the available queue size
	Size(fanoutID int64) int64

	// Enqueue appends an item to the queue and returns its index.
	// If any error occurs, a non-nil error is returned.
	Enqueue(data []byte) (int64, error)

	// EnqueueAsync appends an item to the queue asynchronously
	EnqueueAsync(data []byte, fn func(int64, error))

	Dequeue(fanoutID int64) (int64, []byte, error)

	Peek(fanoutID int64) (int64, []byte, error)

	// Skip skips dequeuing the target number of items
	Skip(fanoutID int64, count int64) error

	Close() error

	// Subscribe sets an asynchronous subscriber
	Subscribe(fanoutID int64, fn func(int64, []byte, error)) error

	// FreeSubscribe frees the asynchronous subscriber
	FreeSubscribe(fanoutID int64)

	// FreeAllSubscribe to free all asynchous subscribe
	FreeAllSubscribe()
}

FanOutQueue is a queue that supports a pub-sub (fanout) feature.
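The interface suggests that producers share one Enqueue stream while each fanoutID reads at its own position. The following in-memory model is a sketch of that idea only. The fanoutModel type, the errEmpty error, and the choice that a new fanout ID starts reading from index 0 are assumptions for illustration, and the model is neither persistent nor thread-safe.

```go
package main

import (
	"errors"
	"fmt"
)

// errEmpty is a hypothetical error for this sketch; the library's
// actual behavior on an empty queue is not documented here.
var errEmpty = errors.New("queue is empty")

// fanoutModel is a hypothetical in-memory model of a fanout queue:
// one shared item log and one front index per fanout ID.
type fanoutModel struct {
	items  [][]byte
	fronts map[int64]int64 // next index to read, keyed by fanout ID
}

func newFanoutModel() *fanoutModel {
	return &fanoutModel{fronts: make(map[int64]int64)}
}

// Enqueue appends an item and returns its index.
func (q *fanoutModel) Enqueue(data []byte) (int64, error) {
	q.items = append(q.items, data)
	return int64(len(q.items) - 1), nil
}

// Dequeue returns the next unread item for fanoutID and advances
// only that fanout ID's front index.
func (q *fanoutModel) Dequeue(fanoutID int64) (int64, []byte, error) {
	front := q.fronts[fanoutID]
	if front >= int64(len(q.items)) {
		return -1, nil, errEmpty
	}
	q.fronts[fanoutID] = front + 1
	return front, q.items[front], nil
}

// Size returns the number of items not yet consumed by fanoutID.
func (q *fanoutModel) Size(fanoutID int64) int64 {
	return int64(len(q.items)) - q.fronts[fanoutID]
}

func main() {
	q := newFanoutModel()
	q.Enqueue([]byte("a"))
	q.Enqueue([]byte("b"))

	_, first, _ := q.Dequeue(1)
	// Fanout 1 consumed one item; fanout 2 still sees both.
	fmt.Println(string(first), q.Size(1), q.Size(2)) // prints: a 1 2
}
```

In this model, consuming on one fanout ID never affects what another fanout ID sees, which is the property a pub-sub consumer typically relies on.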

type FileFanoutQueue added in v1.0.2

type FileFanoutQueue struct {
	// contains filtered or unexported fields
}

FileFanoutQueue is a file-backed fanout queue.

func (*FileFanoutQueue) Close added in v1.0.2

func (q *FileFanoutQueue) Close()

Close frees the queue's resources.

func (*FileFanoutQueue) Dequeue added in v1.0.2

func (q *FileFanoutQueue) Dequeue(fanoutID int64) (int64, []byte, error)

Dequeue dequeues an item for the target fanoutID.

func (*FileFanoutQueue) Enqueue added in v1.0.2

func (q *FileFanoutQueue) Enqueue(data []byte) (int64, error)

Enqueue appends an item to the queue and returns its index.

func (*FileFanoutQueue) FreeAllSubscribe added in v1.0.2

func (q *FileFanoutQueue) FreeAllSubscribe()

FreeAllSubscribe frees all subscribers.

func (*FileFanoutQueue) FreeSubscribe added in v1.0.2

func (q *FileFanoutQueue) FreeSubscribe(fanoutID int64)

FreeSubscribe frees the subscriber for the target fanout ID.

func (*FileFanoutQueue) IsEmpty added in v1.0.2

func (q *FileFanoutQueue) IsEmpty(fanoutID int64) bool

IsEmpty reports whether the queue is empty for the target fanoutID.

func (*FileFanoutQueue) Open added in v1.0.2

func (q *FileFanoutQueue) Open(dir string, queueName string, options *Options) error

Open opens the queue files.

func (*FileFanoutQueue) Peek added in v1.0.2

func (q *FileFanoutQueue) Peek(fanoutID int64) (int64, []byte, error)

Peek peeks at the head item for the target fanoutID.

func (*FileFanoutQueue) PeekAll added in v1.1.1

func (q *FileFanoutQueue) PeekAll(fanoutID int64) ([][]byte, []int64, error)

PeekAll retrieves all items from the front of the queue and returns a slice of data and a slice of indexes.

func (*FileFanoutQueue) PeekPagination added in v1.2.2

func (q *FileFanoutQueue) PeekPagination(fanoutID int64, page, pagesize uint64) ([][]byte, []int64, error)

PeekPagination peeks at data in the queue page by page.

func (*FileFanoutQueue) Size added in v1.0.2

func (q *FileFanoutQueue) Size(fanoutID int64) int64

Size returns the number of items for the target fanoutID.

func (*FileFanoutQueue) Skip added in v1.0.2

func (q *FileFanoutQueue) Skip(fanoutID int64, count int64) error

Skip skips dequeuing the target number of items.

func (*FileFanoutQueue) Status added in v1.1.3

func (q *FileFanoutQueue) Status(fanoutID int64) *QueueFilesStatus

Status returns status information for the current queue.

func (*FileFanoutQueue) Subscribe added in v1.0.2

func (q *FileFanoutQueue) Subscribe(fanoutID int64, fn func(int64, []byte, error)) error

Subscribe registers an asynchronous subscriber for the target fanout ID.

type FileQueue

type FileQueue struct {
	// contains filtered or unexported fields
}

FileQueue is a queue implemented with memory-mapped files.

func (*FileQueue) Close

func (q *FileQueue) Close() error

Close closes the file queue.

func (*FileQueue) Dequeue

func (q *FileQueue) Dequeue() (int64, []byte, error)

Dequeue retrieves and removes the item at the front of the queue.

func (*FileQueue) Enqueue

func (q *FileQueue) Enqueue(data []byte) (int64, error)

Enqueue adds an item to the queue and increments HeadIndex.

func (*FileQueue) EnqueueAsync

func (q *FileQueue) EnqueueAsync(data []byte, fn func(int64, error))

EnqueueAsync adds an item to the queue and increments HeadIndex. In asynchronous mode, the result is delivered by calling back the fn function.
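A callback-style API can be adapted to a blocking call with a buffered channel. The sketch below mirrors the documented EnqueueAsync signature in a local interface and exercises it with memQueue, a hypothetical in-memory stand-in that is not the library's FileQueue. The helper name enqueueAndWait is also an assumption for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// asyncEnqueuer mirrors the documented EnqueueAsync signature.
type asyncEnqueuer interface {
	EnqueueAsync(data []byte, fn func(int64, error))
}

// memQueue is a hypothetical in-memory stand-in used only to make
// this sketch runnable without the library.
type memQueue struct {
	mu    sync.Mutex
	items [][]byte
}

// EnqueueAsync stores data on another goroutine and reports the
// assigned index through fn.
func (q *memQueue) EnqueueAsync(data []byte, fn func(int64, error)) {
	go func() {
		q.mu.Lock()
		q.items = append(q.items, data)
		idx := int64(len(q.items) - 1)
		q.mu.Unlock()
		fn(idx, nil)
	}()
}

// enqueueResult carries the two values passed to the callback.
type enqueueResult struct {
	index int64
	err   error
}

// enqueueAndWait blocks until the callback fires and returns its values.
// The channel is buffered so the callback never blocks on send.
func enqueueAndWait(q asyncEnqueuer, data []byte) (int64, error) {
	done := make(chan enqueueResult, 1)
	q.EnqueueAsync(data, func(i int64, err error) {
		done <- enqueueResult{index: i, err: err}
	})
	r := <-done
	return r.index, r.err
}

func main() {
	q := &memQueue{}
	idx, err := enqueueAndWait(q, []byte("hello"))
	fmt.Println(idx, err) // prints: 0 <nil>
}
```

This pattern waits forever if the callback is never invoked, so a real caller may want to add a timeout with select and time.After.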

func (*FileQueue) FreeSubscribe added in v1.0.2

func (q *FileQueue) FreeSubscribe()

FreeSubscribe frees the subscriber.

func (*FileQueue) Gc

func (q *FileQueue) Gc() error

Gc deletes all used data files to free disk space.

BigQueue persists enqueued data in disk files. These data files remain even after the data in them has been dequeued, so your application is responsible for calling this method periodically to delete all used data files and free disk space.
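One way to call Gc periodically is a ticker loop that stops when a context is cancelled. The sketch below depends only on the documented Gc() error signature through a local interface. The names gcer, runPeriodicGC, countingGC, and demoGC are assumptions for illustration, and countingGC is a stand-in rather than a real queue.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// gcer mirrors the documented Gc method.
type gcer interface {
	Gc() error
}

// runPeriodicGC calls q.Gc every interval until ctx is done.
// Errors are passed to onErr when it is non-nil.
func runPeriodicGC(ctx context.Context, q gcer, interval time.Duration, onErr func(error)) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if err := q.Gc(); err != nil && onErr != nil {
				onErr(err)
			}
		}
	}
}

// countingGC is a hypothetical stand-in that only counts Gc calls.
type countingGC struct{ calls int }

func (c *countingGC) Gc() error {
	c.calls++
	return nil
}

// demoGC runs the loop for about 100ms with a 10ms interval and
// returns how many times Gc was called.
func demoGC() int {
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()
	q := &countingGC{}
	runPeriodicGC(ctx, q, 10*time.Millisecond, nil)
	return q.calls
}

func main() {
	fmt.Println("gc ran at least once:", demoGC() > 0)
}
```

In an application, runPeriodicGC would normally run in its own goroutine with a much longer interval. The Options struct also documents an AutoGCBySeconds field that enables automatic GC when set to a value greater than 0, which may remove the need for a loop like this.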

func (*FileQueue) IsEmpty

func (q *FileQueue) IsEmpty() bool

IsEmpty reports whether the queue is empty.

func (*FileQueue) Open

func (q *FileQueue) Open(dir string, queueName string, options *Options) error

Open opens the queue files.

func (*FileQueue) Peek

func (q *FileQueue) Peek() (int64, []byte, error)

Peek retrieves the item at the front of the queue. If an item exists, it returns the item's index and data.

func (*FileQueue) PeekAll added in v1.1.1

func (q *FileQueue) PeekAll() ([][]byte, []int64, error)

PeekAll retrieves all items from the front of the queue and returns a slice of data and a slice of indexes.

func (*FileQueue) PeekPagination added in v1.2.2

func (q *FileQueue) PeekPagination(page, pagesize uint64) ([][]byte, []int64, error)

PeekPagination peeks at data in the queue page by page.

func (*FileQueue) Size

func (q *FileQueue) Size() int64

Size returns the total number of items available in the queue.

func (*FileQueue) Skip

func (q *FileQueue) Skip(count int64) error

Skip advances the front index past the next count items.

func (*FileQueue) Status added in v1.1.3

func (q *FileQueue) Status() *QueueFilesStatus

Status returns status information for the current queue.

func (*FileQueue) Subscribe added in v1.0.2

func (q *FileQueue) Subscribe(fn func(int64, []byte, error)) error

Subscribe registers a callback function that receives messages.

type IOQueue added in v1.2.3

type IOQueue interface {
	Queue

	// Open queue from file io info
	Open(dir string, queueName string, options *Options) error
}

IOQueue is the I/O queue interface that defines all the necessary functions.

func NewAndOpenFileQueue added in v1.2.3

func NewAndOpenFileQueue(dir string, queueName string, options *Options) (IOQueue, error)

NewAndOpenFileQueue initializes a FileQueue and opens it with the target directory and queue name.

type Options

type Options struct {

	// size in bytes of a data page
	DataPageSize int

	// the item count is  1 << IndexItemsPerPage
	IndexItemsPerPage int

	// if value > 0 then enable auto gc features and repeat process gc by the specified interval time in seconds.
	AutoGCBySeconds int
	// contains filtered or unexported fields
}

Options is the options struct.
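The IndexItemsPerPage field is an exponent rather than a count: per the field comment, the item count is 1 << IndexItemsPerPage. The short calculation below works that out for the default values listed under Constants. The constants are copied into the example so that it runs on its own, and the helper name itemsPerIndexPage is an assumption for illustration.

```go
package main

import "fmt"

// Values copied from the constants listed in this document.
const (
	defaultDataPageSize      = 128 * 1024 * 1024
	defaultIndexItemsPerPage = 17
)

// itemsPerIndexPage applies the rule from the Options comment:
// the item count is 1 << IndexItemsPerPage.
func itemsPerIndexPage(indexItemsPerPage int) int64 {
	return int64(1) << uint(indexItemsPerPage)
}

func main() {
	fmt.Println(itemsPerIndexPage(defaultIndexItemsPerPage)) // prints: 131072
	fmt.Println(defaultDataPageSize / (1024 * 1024))          // prints: 128 (MiB per data page)
}
```

With the defaults, one index page therefore covers 131072 items and one data page is 128 MiB.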

type Queue

type Queue interface {
	// IsEmpty determines whether the queue is empty.
	// It returns true if empty, false otherwise.
	IsEmpty() bool

	// Size returns the available queue size
	Size() int64

	// Enqueue appends an item to the queue and returns its index.
	// If any error occurs, a non-nil error is returned.
	Enqueue(data []byte) (int64, error)

	EnqueueAsync(data []byte, fn func(int64, error))

	Dequeue() (int64, []byte, error)

	Peek() (int64, []byte, error)

	// Skip skips dequeuing the target number of items
	Skip(count int64) error

	Close() error

	// Gc deletes all used data files to free disk space.
	Gc() error

	// Subscribe sets an asynchronous subscriber
	Subscribe(fn func(int64, []byte, error)) error

	// FreeSubscribe frees the asynchronous subscriber
	FreeSubscribe()
}

Queue is the interface that defines all the necessary functions.

type QueueFileInfo added in v1.1.3

type QueueFileInfo struct {
	Name      string
	Path      string
	Size      int64
	FileIndex int64
	CanGC     bool
}

QueueFileInfo holds information about a queue file.

type QueueFilesStatus added in v1.1.3

type QueueFilesStatus struct {
	// front index of the big queue,
	FrontIndex int64

	// head index of the array, this is the read write barrier.
	// readers can only read items before this index, and writes can write this index or after
	HeadIndex int64

	// tail index of the array,
	// readers can't read items before this tail
	TailIndex int64

	// head index of the data page, this is the to be appended data page index
	HeadDataPageIndex int64

	// head offset of the data page, this is the to be appended data offset
	HeadDataItemOffset int64

	IndexFileList []*QueueFileInfo
	DataFileList  []*QueueFileInfo
	MetaFileInfo  *QueueFileInfo
	FrontFileInfo *QueueFileInfo
}

QueueFilesStatus holds status information about the queue files.
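The status fields can be combined into simple health figures. The sketch below uses local, trimmed copies of the documented structs so that it runs on its own. Treating HeadIndex minus FrontIndex as the number of pending items is an assumption based on the field comments, and the file names in main are made up for illustration.

```go
package main

import "fmt"

// QueueFileInfo is a local copy of the documented struct,
// trimmed to the fields used in this sketch.
type QueueFileInfo struct {
	Name  string
	Size  int64
	CanGC bool
}

// QueueFilesStatus is a local copy of the documented struct,
// trimmed to the fields used in this sketch.
type QueueFilesStatus struct {
	FrontIndex   int64
	HeadIndex    int64
	DataFileList []*QueueFileInfo
}

// pendingItems assumes that items between FrontIndex (inclusive)
// and HeadIndex (exclusive) are still waiting to be dequeued.
func pendingItems(s *QueueFilesStatus) int64 {
	return s.HeadIndex - s.FrontIndex
}

// reclaimableBytes sums the sizes of data files marked CanGC.
func reclaimableBytes(s *QueueFilesStatus) int64 {
	var total int64
	for _, f := range s.DataFileList {
		if f != nil && f.CanGC {
			total += f.Size
		}
	}
	return total
}

func main() {
	s := &QueueFilesStatus{
		FrontIndex: 3,
		HeadIndex:  10,
		DataFileList: []*QueueFileInfo{
			{Name: "example-0", Size: 1024, CanGC: true}, // hypothetical file
			{Name: "example-1", Size: 2048, CanGC: false}, // hypothetical file
		},
	}
	fmt.Println(pendingItems(s), reclaimableBytes(s)) // prints: 7 1024
}
```

With the real package, the value returned by the documented Status method would take the place of the hand-built struct.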

type QueueFront added in v1.0.2

type QueueFront struct {
	// contains filtered or unexported fields
}

QueueFront is the queue front struct.

type RemoteQueue added in v1.2.3

type RemoteQueue interface {
	Queue

	// Open queue from remote server
	Open(serverUrl string, queueName string)
}

RemoteQueue is the remote server queue interface that defines all the necessary functions.

Directories

Path Synopsis
examples
