goworker

Version: v0.0.0-...-5c180f3
Published: Feb 15, 2019 License: MIT Imports: 10 Imported by: 0

README

GO Worker

A Go worker pool with pluggable queue backends. In addition to the built-in MemoryQueue and RedisQueue, you can supply your own implementation of the Queue interface.

Install

go get github.com/sihendra/goworker

Usage

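// Instantiate the worker: in-memory queues of length 10 and a worker count of 5
queueManager := NewQueueManager(NewMemoryQueueFactory(10))
w := NewWorker(queueManager, 5)
defer w.Stop()

// Register the job handlers (dummyJob and dummyJob2 are your own types implementing Job).
// Register, Enqueue, Start and Stop all return an error; the checks are omitted here for brevity.
job := &dummyJob{}
job2 := &dummyJob2{}
w.Register(job, "queue1")
w.Register(job2, "queue2")

// Enqueue the items to be processed
w.Enqueue(map[string]interface{}{"id": "123"}, "queue1")
w.Enqueue("some payload", "queue2")

// Start working
w.Start()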

Documentation

Types

type Job

type Job interface {
	Handle(item QueueItem) error
}

Job defines the handler that is called when a queue item is ready to be processed.
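As an illustration of the Job interface, here is a minimal sketch of a handler. QueueItem and Job are copied locally from the declarations on this page so the example compiles without the module. greetJob is a hypothetical type, and the assumption that the payload arrives as a Go string depends on how the queue serializes items, which this page does not specify.

```go
package main

import (
	"errors"
	"fmt"
)

// QueueItem and Job mirror the declarations documented for this package,
// copied locally so the sketch compiles without the module.
type QueueItem struct {
	QueueName string
	Item      interface{}
}

type Job interface {
	Handle(item QueueItem) error
}

// greetJob is a hypothetical handler that expects a string payload.
type greetJob struct {
	greeted []string
}

// Handle records the payload and rejects anything that is not a string.
func (j *greetJob) Handle(item QueueItem) error {
	name, ok := item.Item.(string)
	if !ok {
		return errors.New("greetJob: payload is not a string")
	}
	j.greeted = append(j.greeted, name)
	return nil
}

func main() {
	var job Job = &greetJob{}
	fmt.Println(job.Handle(QueueItem{QueueName: "queue2", Item: "some payload"}))
	fmt.Println(job.Handle(QueueItem{QueueName: "queue2", Item: 42}))
}
```

Returning an error instead of panicking on an unexpected payload leaves the decision about retries or logging to the caller.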

type Queue

type Queue interface {
	// Push adds the item to the queue
	Push(entry []byte, timeout time.Duration) error
	// Pop fetches one item from the queue
	Pop(timeout time.Duration) ([]byte, error)
	// Channel returns the channel on which items are delivered
	Channel() chan []byte
	// Acknowledge notifies the queue that the item has been received. Most implementations then delete the item from the queue.
	Acknowledge(message []byte)
	// Shutdown stops the queue's background item listener
	Shutdown() error
}

Queue defines the methods used to interact with a queue backend.
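Since the README mentions that you can provide your own Queue, the following is a rough sketch of a channel-backed implementation. The Queue interface is copied locally from this page. chanQueue, newChanQueue and ErrTimeout are hypothetical names, and the timeout behaviour (block until the timeout elapses, then return an error) is an assumption; the built-in queues may behave differently.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Queue mirrors the interface documented for this package, copied locally
// so the sketch compiles without the module.
type Queue interface {
	Push(entry []byte, timeout time.Duration) error
	Pop(timeout time.Duration) ([]byte, error)
	Channel() chan []byte
	Acknowledge(message []byte)
	Shutdown() error
}

// ErrTimeout is hypothetical; the package may report timeouts differently.
var ErrTimeout = errors.New("chanQueue: timed out")

// chanQueue is a hypothetical Queue backed by a buffered channel.
type chanQueue struct {
	items chan []byte
}

func newChanQueue(length int) *chanQueue {
	return &chanQueue{items: make(chan []byte, length)}
}

// Push blocks until there is room or the timeout elapses (assumed semantics).
func (q *chanQueue) Push(entry []byte, timeout time.Duration) error {
	select {
	case q.items <- entry:
		return nil
	case <-time.After(timeout):
		return ErrTimeout
	}
}

// Pop blocks until an item is available or the timeout elapses.
func (q *chanQueue) Pop(timeout time.Duration) ([]byte, error) {
	select {
	case entry := <-q.items:
		return entry, nil
	case <-time.After(timeout):
		return nil, ErrTimeout
	}
}

// Channel exposes the underlying channel for consumers that prefer to range over it.
func (q *chanQueue) Channel() chan []byte { return q.items }

// Acknowledge is a no-op: receiving from the channel already removes the item.
func (q *chanQueue) Acknowledge(message []byte) {}

// Shutdown is a no-op: this sketch has no background listener to stop.
func (q *chanQueue) Shutdown() error { return nil }

// Compile-time check that chanQueue satisfies Queue.
var _ Queue = (*chanQueue)(nil)

func main() {
	q := newChanQueue(1)
	if err := q.Push([]byte("hello"), time.Second); err != nil {
		fmt.Println("push:", err)
		return
	}
	entry, err := q.Pop(time.Second)
	fmt.Println(string(entry), err)
}
```

A backend that persists items would typically do real work in Acknowledge and Shutdown; they are no-ops here only because a Go channel removes an item on receive.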

func NewMemoryQueue

func NewMemoryQueue(length int) Queue

NewMemoryQueue returns an in-memory Queue implementation.

func NewRedisQueue

func NewRedisQueue(name string, pool *redis.Pool) Queue

NewRedisQueue returns a Redis-backed Queue implementation.

type QueueFactory

type QueueFactory func(name string) (Queue, error)

QueueFactory defines a function that creates a new Queue instance. It is used mostly by Worker or QueueManager.

func NewMemoryQueueFactory

func NewMemoryQueueFactory(length int) QueueFactory

NewMemoryQueueFactory returns an in-memory QueueFactory.

func NewRedisQueueFactory

func NewRedisQueueFactory(pool *redis.Pool) QueueFactory

NewRedisQueueFactory returns a Redis-backed QueueFactory.

type QueueItem

type QueueItem struct {
	QueueName string
	Item      interface{}
}

QueueItem wraps any object before it is pushed to the Queue. It may also carry internal metadata.

func NewQueueItemFromBytes

func NewQueueItemFromBytes(data []byte) (item *QueueItem, err error)

NewQueueItemFromBytes converts a byte slice to a QueueItem.

func (QueueItem) ToBytes

func (q QueueItem) ToBytes() ([]byte, error)

ToBytes converts the QueueItem to a byte slice.
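To make the byte round trip concrete, here is a sketch of what such a conversion pair could look like. This page does not state which encoding the package uses, so JSON is purely an assumption, and toBytes and fromBytes are hypothetical stand-ins for ToBytes and NewQueueItemFromBytes rather than the package's code.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// QueueItem mirrors the documented struct, copied locally so the sketch
// compiles without the module.
type QueueItem struct {
	QueueName string
	Item      interface{}
}

// toBytes is a hypothetical stand-in for ToBytes. JSON is an assumption,
// not the confirmed encoding.
func toBytes(q QueueItem) ([]byte, error) {
	return json.Marshal(q)
}

// fromBytes is a hypothetical stand-in for NewQueueItemFromBytes.
func fromBytes(data []byte) (*QueueItem, error) {
	item := &QueueItem{}
	if err := json.Unmarshal(data, item); err != nil {
		return nil, err
	}
	return item, nil
}

func main() {
	in := QueueItem{QueueName: "queue1", Item: map[string]interface{}{"id": "123"}}
	data, err := toBytes(in)
	if err != nil {
		fmt.Println("encode:", err)
		return
	}
	out, err := fromBytes(data)
	if err != nil {
		fmt.Println("decode:", err)
		return
	}
	// With JSON, an object payload decodes into map[string]interface{}.
	payload, ok := out.Item.(map[string]interface{})
	fmt.Println(out.QueueName, payload["id"], ok)
}
```

Under this assumption, a payload loses its concrete Go type in transit: structs come back as maps and all numbers as float64, so a handler has to type-assert accordingly.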

type QueueManager

type QueueManager struct {
	// contains filtered or unexported fields
}

QueueManager manages multiple queues and delivers their items through a single channel.

func NewQueueManager

func NewQueueManager(factory QueueFactory) *QueueManager

NewQueueManager creates a new instance of QueueManager.

func (*QueueManager) AddQueue

func (q *QueueManager) AddQueue(name string) error

AddQueue registers a queue to be listened to and fetched from.

func (*QueueManager) Fetch

func (q *QueueManager) Fetch() chan QueueItem

Fetch returns a channel that delivers queue items from all registered queues.
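Merging several queues into one channel is commonly done with a fan-in pattern. The sketch below shows that pattern in isolation; it is not the QueueManager's actual implementation. fanIn is a hypothetical helper, and storing the raw entry as a string in Item is a simplification.

```go
package main

import (
	"fmt"
	"sync"
)

// QueueItem mirrors the documented struct, copied locally so the sketch
// compiles without the module.
type QueueItem struct {
	QueueName string
	Item      interface{}
}

// fanIn is a hypothetical helper: it forwards entries from every named
// source channel into one output channel, tagging each with its queue name.
// The output is closed once all sources are closed and drained.
func fanIn(sources map[string]chan []byte) chan QueueItem {
	out := make(chan QueueItem)
	var wg sync.WaitGroup
	for name, src := range sources {
		wg.Add(1)
		go func(name string, src chan []byte) {
			defer wg.Done()
			for entry := range src {
				out <- QueueItem{QueueName: name, Item: string(entry)}
			}
		}(name, src)
	}
	// Close the merged channel only after every forwarder has finished.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	q1 := make(chan []byte, 1)
	q2 := make(chan []byte, 1)
	q1 <- []byte("a")
	q2 <- []byte("b")
	close(q1)
	close(q2)

	// Arrival order across queues is not deterministic.
	for item := range fanIn(map[string]chan []byte{"queue1": q1, "queue2": q2}) {
		fmt.Println(item.QueueName, item.Item)
	}
}
```

One goroutine per source keeps a slow or idle queue from blocking the others.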

func (*QueueManager) Push

func (q *QueueManager) Push(queueItem QueueItem) error

Push adds the item to its designated queue.

type Worker

type Worker interface {
	Register(job Job, queueName string) error
	Enqueue(item interface{}, queueName string) error
	Dispatch(job Job, item interface{}, queueName string) error
	Start() error
	Stop() error
}

The Worker interface provides methods to manage and interact with the worker.

func NewWorker

func NewWorker(queue *QueueManager, workerCount int) Worker

NewWorker creates a new instance of Worker.
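For readers curious how a worker count and per-queue job registration can fit together, here is one possible arrangement, sketched under assumptions. It is not the package's implementation: runPool and JobFunc are hypothetical names, and skipping items that have no registered handler is a choice made only for this example.

```go
package main

import (
	"fmt"
	"sync"
)

// QueueItem and Job mirror the declarations documented for this package,
// copied locally so the sketch compiles without the module.
type QueueItem struct {
	QueueName string
	Item      interface{}
}

type Job interface {
	Handle(item QueueItem) error
}

// JobFunc is a hypothetical adapter that lets a plain function act as a Job.
type JobFunc func(item QueueItem) error

func (f JobFunc) Handle(item QueueItem) error { return f(item) }

// runPool is a hypothetical helper: it starts workerCount goroutines that
// read from items and call the Job registered for each item's queue name.
// It returns once items is closed and every worker has finished.
func runPool(items <-chan QueueItem, jobs map[string]Job, workerCount int) {
	var wg sync.WaitGroup
	for i := 0; i < workerCount; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for item := range items {
				job, ok := jobs[item.QueueName]
				if !ok {
					continue // no handler registered for this queue
				}
				if err := job.Handle(item); err != nil {
					fmt.Println("job failed:", err)
				}
			}
		}()
	}
	wg.Wait()
}

func main() {
	var mu sync.Mutex
	handled := 0
	jobs := map[string]Job{
		"queue1": JobFunc(func(item QueueItem) error {
			mu.Lock()
			defer mu.Unlock()
			handled++
			return nil
		}),
	}

	items := make(chan QueueItem, 3)
	for i := 0; i < 3; i++ {
		items <- QueueItem{QueueName: "queue1", Item: i}
	}
	close(items)

	runPool(items, jobs, 5)
	fmt.Println("handled:", handled)
}
```

Because several workers may call the same Job concurrently, a handler with shared state needs its own synchronization, as the mutex in main shows.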
