queue

package
v0.0.0-...-5f18953
Published: Dec 7, 2017 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Overview

Package queue contains methods and structures for managing a message queue.

Index

Constants

const (
	ProcessedSuccessful = iota
	ProcessedWithError
	ProcessedWaitNext
	ProcessedKillWorker
)

Result codes returned by a worker after execution.
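The result codes are plain integers, so they are hard to read in log output. The following is a minimal sketch of a helper that maps a code to its constant name. The constants are mirrored locally so that the example compiles on its own, and `resultName` is a hypothetical helper, not part of the package.

```go
package main

import "fmt"

// Local mirror of the package constants (same names and order as above),
// so that this sketch compiles without importing the package.
const (
	ProcessedSuccessful = iota
	ProcessedWithError
	ProcessedWaitNext
	ProcessedKillWorker
)

// resultName is a hypothetical helper (not part of the package) that turns
// a worker result code into a readable name for log output.
func resultName(code int) string {
	switch code {
	case ProcessedSuccessful:
		return "ProcessedSuccessful"
	case ProcessedWithError:
		return "ProcessedWithError"
	case ProcessedWaitNext:
		return "ProcessedWaitNext"
	case ProcessedKillWorker:
		return "ProcessedKillWorker"
	default:
		return fmt.Sprintf("unknown(%d)", code)
	}
}

func main() {
	// Print every known code plus one value outside the known range.
	for code := ProcessedSuccessful; code <= ProcessedKillWorker+1; code++ {
		fmt.Println(code, resultName(code))
	}
}
```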

Variables

var DefaultQueueOptions = Options{
	MinimunWorkersCount:      4,
	MaximumWorkersCount:      32,
	StorageOptions:           nil,
	MaximumMessagesPerWorker: 2048,
	InputTimeOut:             5 * time.Second,
	MaximumMessagesInQueue:   2048,
	MaximumQueueMessagesSize: 16 * 1024 * 1024,
}

DefaultQueueOptions holds the default options for a queue.
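Because DefaultQueueOptions is a struct value, assigning it to a local variable makes a copy that can be adjusted without touching the package-level defaults. The sketch below illustrates this with local mirrors of `Options` and `DefaultQueueOptions`. The empty `StorageOptions` stand-in and the `customOptions` helper are assumptions made for the example.

```go
package main

import (
	"fmt"
	"time"
)

// StorageOptions is an empty stand-in here; the real type has more fields.
type StorageOptions struct{}

// Options mirrors the exported fields documented for the package.
type Options struct {
	StorageOptions           *StorageOptions
	InputTimeOut             time.Duration
	MaximumQueueMessagesSize int32
	MaximumMessagesInQueue   uint16
	MinimunWorkersCount      uint16
	MaximumWorkersCount      uint16
	MaximumMessagesPerWorker uint16
}

// DefaultQueueOptions mirrors the documented defaults.
var DefaultQueueOptions = Options{
	MinimunWorkersCount:      4,
	MaximumWorkersCount:      32,
	StorageOptions:           nil,
	MaximumMessagesPerWorker: 2048,
	InputTimeOut:             5 * time.Second,
	MaximumMessagesInQueue:   2048,
	MaximumQueueMessagesSize: 16 * 1024 * 1024,
}

// customOptions is a hypothetical helper: it copies the defaults and
// overrides two fields, leaving DefaultQueueOptions itself unchanged.
func customOptions() *Options {
	opts := DefaultQueueOptions // struct assignment copies the value
	opts.MaximumWorkersCount = 8
	opts.InputTimeOut = time.Second
	return &opts
}

func main() {
	opts := customOptions()
	fmt.Println(opts.MaximumWorkersCount, DefaultQueueOptions.MaximumWorkersCount)
}
```

A pointer obtained this way has the type that the `Options` parameter of CreateQueue expects.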

var DefaultStorageOptions = StorageOptions{
	MaxDataFileSize:                               0x1FFFFFFF,
	FlushOperations:                               512,
	PercentFreeForRecalculateOnExit:               5,
	PercentFreeForRecalculateOnIncrementIndexFile: 10,
	SkipReturnedRecords:                           true,
	SkipDelayPerTry:                               500,
	CheckCRCOnRead:                                false,
	MaxOneTimeOpenedFiles:                         12,
	DeleteInvalidIndexFile:                        true,
}

DefaultStorageOptions holds the default options for the file storage.

Functions

This section is empty.

Types

type Logging

type Logging interface {
	Trace(msg string, a ...interface{})
	Info(msg string, a ...interface{})
	Warning(msg string, a ...interface{})
	Error(msg string, a ...interface{})
}

Logging is the interface that a logging system must implement in order to work with the queue.
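One possible way to satisfy this interface is a thin adapter around the standard library `log.Logger`. The sketch below assumes that `msg` is a printf-style format string, which the documentation does not state. The names `stdLogger`, `newStdLogger` and `demo` are hypothetical.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"log"
)

// Logging mirrors the interface documented above.
type Logging interface {
	Trace(msg string, a ...interface{})
	Info(msg string, a ...interface{})
	Warning(msg string, a ...interface{})
	Error(msg string, a ...interface{})
}

// stdLogger is a hypothetical adapter around the standard library logger.
// ASSUMPTION: msg is treated as a printf-style format string.
type stdLogger struct{ l *log.Logger }

func newStdLogger(w io.Writer) Logging {
	return stdLogger{l: log.New(w, "", 0)}
}

func (s stdLogger) Trace(msg string, a ...interface{})   { s.l.Printf("TRACE "+msg, a...) }
func (s stdLogger) Info(msg string, a ...interface{})    { s.l.Printf("INFO "+msg, a...) }
func (s stdLogger) Warning(msg string, a ...interface{}) { s.l.Printf("WARNING "+msg, a...) }
func (s stdLogger) Error(msg string, a ...interface{})   { s.l.Printf("ERROR "+msg, a...) }

// demo logs two lines into a buffer and returns what was written.
func demo() string {
	var buf bytes.Buffer
	lg := newStdLogger(&buf)
	lg.Info("inserted %d messages", 3)
	lg.Error("insert failed: %v", "disk full")
	return buf.String()
}

func main() {
	fmt.Print(demo())
}
```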

type Options

type Options struct {
	// Options for file storage connected to this queue
	StorageOptions *StorageOptions

	// Within this timeout, a message must be processed or saved to disk
	InputTimeOut time.Duration

	// Maximum total size of the messages that can be processed without storing them to disk
	MaximumQueueMessagesSize int32

	// Maximum number of messages that can be processed without storing them to disk
	MaximumMessagesInQueue uint16

	// Minimum number of workers per queue
	MinimunWorkersCount uint16

	// Maximum number of workers per queue
	MaximumWorkersCount uint16

	// Maximum number of messages that a worker can process at one time
	MaximumMessagesPerWorker uint16
}

Options holds the optional parameters for managing the messages.

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue is the base structure for managing the messages.

func CreateQueue

func CreateQueue(Name, StoragePath string, Log Logging, Factory WorkerFactory, Options *Options) (*Queue, error)

CreateQueue creates a queue and initializes its internal state.

func (*Queue) Close

func (q *Queue) Close()

Close stops the message handler, saves the messages held in memory to disk, and closes all open files.

func (*Queue) Count

func (q *Queue) Count() uint64

Count returns the number of messages in the queue.

func (*Queue) Insert

func (q *Queue) Insert(buf []byte) bool

Insert appends the message to the queue. Depending on the timeout option, it either tries to write the message to disk, or tries to process the message in memory and writes it to disk only if the timeout is about to expire. It returns false if the message was neither processed nor written within the timeout, or if writing to disk failed.
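Since Insert reports failure only through its boolean result, the caller has to decide what to do with a rejected message. The following is a minimal sketch of one option, a bounded retry. It is written against a small local interface that matches the documented signature of Insert, so it does not need the package itself. The names `inserter`, `insertWithRetry` and `flakyQueue` are hypothetical, and whether retrying is appropriate depends on the application.

```go
package main

import (
	"fmt"
	"time"
)

// inserter matches the documented signature of (*Queue).Insert.
type inserter interface {
	Insert(buf []byte) bool
}

// insertWithRetry is a hypothetical helper: it calls Insert up to
// `attempts` times and pauses between failed attempts.
func insertWithRetry(q inserter, msg []byte, attempts int, pause time.Duration) bool {
	for i := 0; i < attempts; i++ {
		if q.Insert(msg) {
			return true
		}
		if i < attempts-1 {
			time.Sleep(pause)
		}
	}
	return false
}

// flakyQueue is a test double that rejects the first `failures` inserts.
type flakyQueue struct {
	failures int
	calls    int
}

func (f *flakyQueue) Insert(buf []byte) bool {
	f.calls++
	return f.calls > f.failures
}

func main() {
	q := &flakyQueue{failures: 2}
	ok := insertWithRetry(q, []byte("hello"), 3, 10*time.Millisecond)
	fmt.Println(ok, q.calls)
}
```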

func (*Queue) InsertFile

func (q *Queue) InsertFile(fileName string) bool

InsertFile appends a file to the queue. After the content of the file has been processed, the file is deleted if the worker reports a successful result.

type QueueItem

type QueueItem struct {
	ID     StorageIdx
	Stream io.ReadSeeker
	// contains filtered or unexported fields
}

QueueItem is an element of the queue.

type QueueWorkerFactory

type QueueWorkerFactory interface {
	CreateWorker() (Worker, error)
	NeedTimeoutProcessing() bool
	CanCreateWorkers() bool
	Close()
}

type StorageIdx

type StorageIdx uint64

StorageIdx is unique identifier of the message in the memory or on the disk

const InvalidIdx StorageIdx = 0xFFFFFFFFFFFFFFFF

InvalidIdx is the value that denotes an invalid index.

type StorageOptions

type StorageOptions struct {
	// Maximum size of the storage's data files
	MaxDataFileSize int64

	// Number of storage operations after which the index file is flushed
	FlushOperations uint32

	// Percentage of free messages at which the index file is rebuilt before closing
	PercentFreeForRecalculateOnExit uint8

	// Percentage of free messages at which the index file is rebuilt when the index file grows
	PercentFreeForRecalculateOnIncrementIndexFile uint8

	// Whether to skip messages returned with an error while their waiting timeout has not yet expired
	SkipReturnedRecords bool

	// Duration of the timeout. The time of the next processing is calculated as TimeOfError+CountOfTheErrors*SkipDelayPerTry
	SkipDelayPerTry uint32

	// Whether to check the CRC of a message before it is sent to a worker
	CheckCRCOnRead bool

	// Number of files that may be open at one time for reading and for writing. Files opened for reading and for writing are counted separately
	MaxOneTimeOpenedFiles int16

	// If the queue index file is corrupted, the index file is recreated and an attempt is made to restore the messages information
	DeleteInvalidIndexFile bool
}

StorageOptions holds the optional parameters for the disk storage of the messages.
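The retry formula given for SkipDelayPerTry, TimeOfError+CountOfTheErrors*SkipDelayPerTry, can be worked through as follows. The documentation does not state the unit of SkipDelayPerTry, so the sketch takes the unit as a parameter and uses milliseconds purely as an assumption. `retryDelay` and `nextTry` are hypothetical helpers, not package functions.

```go
package main

import (
	"fmt"
	"time"
)

// retryDelay computes CountOfTheErrors*SkipDelayPerTry as a duration.
// ASSUMPTION: the unit of SkipDelayPerTry is not documented, so the
// caller supplies it explicitly.
func retryDelay(errorCount, skipDelayPerTry uint32, unit time.Duration) time.Duration {
	return time.Duration(errorCount) * time.Duration(skipDelayPerTry) * unit
}

// nextTry applies the documented formula:
// TimeOfError + CountOfTheErrors*SkipDelayPerTry.
func nextTry(timeOfError time.Time, errorCount, skipDelayPerTry uint32, unit time.Duration) time.Time {
	return timeOfError.Add(retryDelay(errorCount, skipDelayPerTry, unit))
}

func main() {
	failedAt := time.Date(2017, 12, 7, 12, 0, 0, 0, time.UTC)
	// With the default SkipDelayPerTry of 500 and an assumed unit of
	// milliseconds, the delay grows linearly with the error count.
	for errs := uint32(1); errs <= 3; errs++ {
		fmt.Println(errs, retryDelay(errs, 500, time.Millisecond),
			nextTry(failedAt, errs, 500, time.Millisecond).Format(time.RFC3339Nano))
	}
}
```

Under these assumptions the delay is linear in the number of errors, not exponential.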

type Worker

type Worker interface {
	// Processes the message passed in the `*QueueItem`.
	// Afterwards the worker must call `(*Queue).Process` with its unique identifier
	// and the result of the processing, and must push itself into the channel `Worker`
	ProcessMessage(*QueueItem) int

	// Handles the event raised when no messages are available.
	// Afterwards the worker must call `(*Queue).Process` with its unique identifier and
	// the result of the processing, and must send itself into the channel `Worker`
	ProcessTimeout() int
	// Returns the unique identifier of the worker
	GetID() WorkerID
	// Close is called when the queue has finished working with the worker. Here you can close a database connection, etc.
	Close()
}

Worker is the interface that a structure must implement in order to process outgoing messages.
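A minimal sketch of a type that satisfies Worker is shown below. All package types are mirrored locally (exported fields only) so that the example compiles on its own. `countingWorker` is hypothetical, and returning ProcessedSuccessful from ProcessTimeout is an assumption, since the documentation does not say which code is expected there. The `(*Queue).Process` call mentioned in the interface comments is not part of the exported API listed here and is left out.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// Local mirrors of the documented package declarations.
const (
	ProcessedSuccessful = iota
	ProcessedWithError
	ProcessedWaitNext
	ProcessedKillWorker
)

type StorageIdx uint64
type WorkerID uint64

type QueueItem struct {
	ID     StorageIdx
	Stream io.ReadSeeker
}

type Worker interface {
	ProcessMessage(*QueueItem) int
	ProcessTimeout() int
	GetID() WorkerID
	Close()
}

// countingWorker is a hypothetical worker that only counts payload bytes.
type countingWorker struct {
	id        WorkerID
	bytesSeen int
	closed    bool
}

// ProcessMessage reads the whole message stream and reports a result code.
func (w *countingWorker) ProcessMessage(item *QueueItem) int {
	data, err := io.ReadAll(item.Stream)
	if err != nil {
		return ProcessedWithError
	}
	w.bytesSeen += len(data)
	return ProcessedSuccessful
}

// ProcessTimeout has nothing to flush in this sketch.
// ASSUMPTION: ProcessedSuccessful is an acceptable result here.
func (w *countingWorker) ProcessTimeout() int { return ProcessedSuccessful }

func (w *countingWorker) GetID() WorkerID { return w.id }

// Close is where a real worker would release connections or files.
func (w *countingWorker) Close() { w.closed = true }

// Compile-time check that countingWorker satisfies Worker.
var _ Worker = (*countingWorker)(nil)

// demo processes one in-memory message and returns the result code
// together with the number of bytes the worker has seen.
func demo() (int, int) {
	w := &countingWorker{id: 1}
	item := &QueueItem{ID: 7, Stream: bytes.NewReader([]byte("hello"))}
	code := w.ProcessMessage(item)
	return code, w.bytesSeen
}

func main() {
	code, n := demo()
	fmt.Println(code, n)
}
```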

type WorkerFactory

type WorkerFactory interface {
	// Creates a new worker for this factory with a unique ID
	CreateWorker() (Worker, error)
	// Returns true if several messages can be combined into one action (for example,
	// building a large SQL script from many small messages)
	NeedTimeoutProcessing() bool
	CanCreateWorkers() bool
	Close()
}

WorkerFactory is the interface for creating new workers.
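The sketch below shows one way a factory could hand out workers with unique IDs. The meaning of CanCreateWorkers is not documented, so treating it as "the worker limit has not been reached yet" is an assumption, as are the names `limitedFactory` and `noopWorker`. The package types are mirrored locally, with `QueueItem` reduced to an empty stand-in.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Local mirrors of the documented declarations; QueueItem is a stand-in.
type WorkerID uint64
type QueueItem struct{}

type Worker interface {
	ProcessMessage(*QueueItem) int
	ProcessTimeout() int
	GetID() WorkerID
	Close()
}

type WorkerFactory interface {
	CreateWorker() (Worker, error)
	NeedTimeoutProcessing() bool
	CanCreateWorkers() bool
	Close()
}

// noopWorker does nothing; 0 corresponds to ProcessedSuccessful.
type noopWorker struct{ id WorkerID }

func (w noopWorker) ProcessMessage(*QueueItem) int { return 0 }
func (w noopWorker) ProcessTimeout() int           { return 0 }
func (w noopWorker) GetID() WorkerID               { return w.id }
func (w noopWorker) Close()                        {}

// limitedFactory is a hypothetical factory that creates at most `limit`
// workers and gives each one a distinct ID.
type limitedFactory struct {
	mu      sync.Mutex
	nextID  WorkerID
	created int
	limit   int
}

func (f *limitedFactory) CreateWorker() (Worker, error) {
	f.mu.Lock()
	defer f.mu.Unlock()
	if f.created >= f.limit {
		return nil, errors.New("worker limit reached")
	}
	f.created++
	f.nextID++
	return noopWorker{id: f.nextID}, nil
}

// This sketch never batches messages, so no timeout processing is needed.
func (f *limitedFactory) NeedTimeoutProcessing() bool { return false }

// ASSUMPTION: reports whether another CreateWorker call can succeed.
func (f *limitedFactory) CanCreateWorkers() bool {
	f.mu.Lock()
	defer f.mu.Unlock()
	return f.created < f.limit
}

func (f *limitedFactory) Close() {}

// Compile-time check that limitedFactory satisfies WorkerFactory.
var _ WorkerFactory = (*limitedFactory)(nil)

func main() {
	f := &limitedFactory{limit: 2}
	for f.CanCreateWorkers() {
		w, _ := f.CreateWorker()
		fmt.Println("created worker", w.GetID())
	}
}
```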

type WorkerID

type WorkerID uint64

WorkerID is an identifier of the worker

Directories

Path	Synopsis
internal/mmap	Package mmap allows mapping files into memory.
