queue

package module
v0.0.0-...-9f0a137 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 6, 2018 License: MIT Imports: 21 Imported by: 0

README

Go Queue

Multi backend queues for Golang

codecov Codacy BadgeGo Report Card

Install

go get -u github.com/nickalie/go-queue

Example of usage

package main

import (
	"fmt"
	"github.com/nickalie/go-queue"
	"time"
)

func main() {
	for i := 0; i < 5; i++ {
		go consumer(i + 1)
	}

	producer()
}

func producer() {
	i := 0
	for {
		i++
		queue.Put("messages", fmt.Sprintf("message %d", i))
		time.Sleep(time.Second)
	}
}

func consumer(index int) {
	for {
		var message string
		queue.Get("messages", &message)

		fmt.Printf("Consumer %d got a message: %s\n", index, message)
		time.Sleep(2 * time.Second)
	}
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Get

func Get(queueName string, v interface{}) error

Get removes the first element from a queue and put it in the value pointed to by v

func Put

func Put(queueName string, value interface{}) error

Put adds value to the end of a queue.

func RemoveQueue

func RemoveQueue(queueName string) error

func Use

func Use(value Backend)

Use sets backend to manage queues. ChannelBackend is default.

Types

type AMQPBackend

type AMQPBackend struct {
	// contains filtered or unexported fields
}

AMQPBackend provides AMQP-based backend to manage queues. https://en.wikipedia.org/wiki/Advanced_Message_Queuing_Protocol Suitable for multi-host, multi-process and multithreaded environment

func NewAMQPBackend

func NewAMQPBackend(url string) (*AMQPBackend, error)

NewAMQPBackend creates new AMQPBackend

func (*AMQPBackend) Codec

func (b *AMQPBackend) Codec(c Codec) *AMQPBackend

Codec sets codec to encode/decode objects in queues. GOBCodec is default.

func (*AMQPBackend) Get

func (b *AMQPBackend) Get(queueName string, v interface{}) error

Get removes the first element from a queue and put it in the value pointed to by v

func (*AMQPBackend) Put

func (b *AMQPBackend) Put(queueName string, value interface{}) error

Put adds value to the end of a queue.

func (*AMQPBackend) RemoveQueue

func (b *AMQPBackend) RemoveQueue(queueName string) error

type Backend

type Backend interface {
	Put(queueName string, value interface{}) error
	Get(queueName string, value interface{}) error
	RemoveQueue(queueName string) error
}

Backend defines interface to manage queues. ChannelBackend is default.

type BeanstalkBackend

type BeanstalkBackend struct {
	// contains filtered or unexported fields
}

BeanstalkBackend provides beanstalk-based backend to manage queues. Suitable for multi-host, multi-process and multithreaded environment

func NewBeanstalkBackend

func NewBeanstalkBackend(addr string) (*BeanstalkBackend, error)

NewBeanstalkBackend creates new BeanstalkBackend

func (*BeanstalkBackend) Codec

Codec sets codec to encode/decode objects in queues. GOBCodec is default.

func (*BeanstalkBackend) Get

func (b *BeanstalkBackend) Get(queueName string, value interface{}) error

Get removes the first element from a queue and put it in the value pointed to by value

func (*BeanstalkBackend) Put

func (b *BeanstalkBackend) Put(queueName string, value interface{}) error

Put adds value to the end of a queue.

func (*BeanstalkBackend) RemoveQueue

func (b *BeanstalkBackend) RemoveQueue(queueName string) error

type BuntBackend

type BuntBackend struct {
	// contains filtered or unexported fields
}

BuntBackend provides BuntDB-based backend to manage queues. https://github.com/tidwall/buntdb Suitable for multithreaded single process environment

func NewBuntBackend

func NewBuntBackend(path string) (*BuntBackend, error)

NewBuntBackend creates new BuntBackend.

func NewBuntBackendFromDB

func NewBuntBackendFromDB(db *buntdb.DB) *BuntBackend

NewBuntBackendFromDB creates new BuntBackend from bunt.DB object.

func (*BuntBackend) Close

func (b *BuntBackend) Close() error

Close closes buntdb

func (*BuntBackend) Codec

func (b *BuntBackend) Codec(c Codec) *BuntBackend

Codec sets codec to encode/decode objects in queues. GOBCodec is default.

func (*BuntBackend) Get

func (b *BuntBackend) Get(queueName string, v interface{}) error

Get removes the first element from a queue and put it in the value pointed to by v

func (*BuntBackend) Interval

func (b *BuntBackend) Interval(interval time.Duration) *BuntBackend

Interval sets interval to poll new queue element. Default value is one second.

func (*BuntBackend) Put

func (b *BuntBackend) Put(queueName string, value interface{}) error

Put adds value to the end of a queue.

func (*BuntBackend) RemoveQueue

func (b *BuntBackend) RemoveQueue(queueName string) error

func (*BuntBackend) TTL

func (b *BuntBackend) TTL(ttl time.Duration) *BuntBackend

TTL sets message expiration time. Default is 0 - message will never expire

type ChannelBackend

type ChannelBackend struct {
	*sync.Mutex
	// contains filtered or unexported fields
}

ChannelBackend uses go channels to manage queues Suitable for multithreaded single process environment

func NewChannelBackend

func NewChannelBackend() *ChannelBackend

NewChannelBackend creates new channels backend

func (*ChannelBackend) Buffer

func (b *ChannelBackend) Buffer(buffer int) *ChannelBackend

Buffer sets default buffer for new channels created by the backend. Default value is 1000.

func (*ChannelBackend) Channels

func (b *ChannelBackend) Channels(channels map[string]chan interface{}) *ChannelBackend

Channels sets initial channels (queues). Key - queue name, value - go channel

func (*ChannelBackend) Get

func (b *ChannelBackend) Get(queueName string, v interface{}) error

Get removes the first element from a queue and put it in the value pointed to by v

func (*ChannelBackend) Put

func (b *ChannelBackend) Put(queueName string, value interface{}) error

Put adds value to the end of a queue.

func (*ChannelBackend) RemoveQueue

func (b *ChannelBackend) RemoveQueue(queueName string) error

type Codec

type Codec interface {
	Marshal(v interface{}) ([]byte, error)
	Unmarshal(data []byte, v interface{}) error
}

Codec define interface for codecs to encode/decode objects in queues

type FSBackend

type FSBackend struct {
	// contains filtered or unexported fields
}

FSBackend uses file system to manage queues. Suitable for multithreaded and multi-process environments.

func NewFSBackend

func NewFSBackend(path string) (*FSBackend, error)

NewFSBackend creates new FSBackend.

func (*FSBackend) Codec

func (b *FSBackend) Codec(c Codec) *FSBackend

Codec sets codec to encode/decode objects in queues. GOBCodec is default.

func (*FSBackend) Get

func (b *FSBackend) Get(queueName string, v interface{}) error

Get removes the first element from a queue and put it in the value pointed to by v

func (*FSBackend) Interval

func (b *FSBackend) Interval(interval time.Duration) *FSBackend

Interval sets interval to poll new queue element. Default value is one second.

func (*FSBackend) Put

func (b *FSBackend) Put(queueName string, value interface{}) error

Put adds value to the end of a queue.

func (*FSBackend) RemoveQueue

func (b *FSBackend) RemoveQueue(queueName string) error

type GOBCodec

type GOBCodec byte

GOBCodec provides GOB based Codec

func NewGOBCodec

func NewGOBCodec() *GOBCodec

NewGOBCodec creates new GOBCodec

func (*GOBCodec) Marshal

func (c *GOBCodec) Marshal(v interface{}) ([]byte, error)

Marshal returns the GOB encoding of v.

func (*GOBCodec) Unmarshal

func (c *GOBCodec) Unmarshal(data []byte, v interface{}) error

Unmarshal parses the GOB-encoded data and stores the result in the value pointed to by v.

type JSONCodec

type JSONCodec byte

JSONCodec provides JSON based Codec

func NewJSONCodec

func NewJSONCodec() *JSONCodec

NewJSONCodec creates new JSONCodec

func (*JSONCodec) Marshal

func (c *JSONCodec) Marshal(v interface{}) ([]byte, error)

Marshal returns the JSON encoding of v.

func (*JSONCodec) Unmarshal

func (c *JSONCodec) Unmarshal(data []byte, v interface{}) error

Unmarshal parses the JSON-encoded data and stores the result in the value pointed to by v.

type RedisBackend

type RedisBackend struct {
	// contains filtered or unexported fields
}

RedisBackend provides redis-based backend to manage queues. Suitable for multi-host, multi-process and multithreaded environment

func NewRedisBackend

func NewRedisBackend(redisURL string) (*RedisBackend, error)

NewRedisBackend creates new RedisBackend

func NewRedisBackendWithPool

func NewRedisBackendWithPool(pool *redis.Pool) *RedisBackend

NewRedisBackendWithPool creates new RedisBackend with provided redis.Pool

func (*RedisBackend) Codec

func (b *RedisBackend) Codec(c Codec) *RedisBackend

Codec sets codec to encode/decode objects in queues. GOBCodec is default.

func (*RedisBackend) Get

func (b *RedisBackend) Get(queueName string, v interface{}) error

Get removes the first element from a queue and put it in the value pointed to by v

func (*RedisBackend) Put

func (b *RedisBackend) Put(queueName string, value interface{}) error

Put adds value to the end of a queue.

func (*RedisBackend) RemoveQueue

func (b *RedisBackend) RemoveQueue(queueName string) error

type StompBackend

type StompBackend struct {
	// contains filtered or unexported fields
}

StompBackend provides stomp-based backend to manage queues. Suitable for multi-host, multi-process and multithreaded environment

func NewStompBackend

func NewStompBackend(addr string, opts ...func(*stomp.Conn) error) (*StompBackend, error)

NewStompBackend creates new RedisBackend

func (*StompBackend) Codec

func (b *StompBackend) Codec(c Codec) *StompBackend

Codec sets codec to encode/decode objects in queues. GOBCodec is default.

func (*StompBackend) Get

func (b *StompBackend) Get(queueName string, v interface{}) error

Get removes the first element from a queue and put it in the value pointed to by v

func (*StompBackend) Put

func (b *StompBackend) Put(queueName string, value interface{}) error

Put adds value to the end of a queue.

func (*StompBackend) RemoveQueue

func (b *StompBackend) RemoveQueue(queueName string) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL