queue

v1.0.0
Published: Sep 5, 2021 License: MIT Imports: 5 Imported by: 1

README

Queue

A Go library for managing queues on top of Redis. It started as a hiring exercise, but I later found it useful in a custom task-processing project of my own and thought it might be useful more generally.

Installation

$ go get github.com/kavehmz/queue

Usage

package main

import (
	"fmt"
	"time"

	"github.com/kavehmz/queue"
)

func main() {
	var q queue.Queue
	q.Urls([]string{"redis://localhost:6379"})
	q.AddTask(1, "start")
	q.AddTask(2, "start")
	q.AddTask(1, "stop")
	q.AddTask(2, "stop")
	analyzer := func(id int, task chan string, success chan bool) {
		for {
			select {
			case msg := <-task:
				fmt.Println(id, msg)
				if msg == "stop" {
					success <- true
					return
				}
			case <-time.After(2 * time.Second):
				fmt.Println("no new events for 2 seconds for ID", id)
				success <- false
				return
			}
		}
	}
	exitOnEmpty := func() bool {
		return true
	}
	q.AnalysePool(1, exitOnEmpty, analyzer)
}

Approach

The design focuses mainly on horizontal scalability via concurrency, partitioning, and fault detection.

Documentation

Overview

Package queue is a simple queue system written in Go that uses Redis. The design focuses mainly on horizontal scalability via concurrency, partitioning, and fault detection. Queues can be partitioned across more than one Redis server if necessary.

The number of Redis partitions is set by calling the Urls function with a slice of Redis connection URLs. Redis partitioning is needed when a single Redis server cannot handle the load because of I/O, memory, or, in rare situations, CPU limitations.

If a crash occurs, a record of every incomplete task is kept in Redis as a key with this format:

QUEUE::0::PENDING::ID

ID is the ID of the failed task.
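The key layout above can be sketched in a few lines of Go. The helpers pendingKey and pendingID are hypothetical and not part of the library; the sketch assumes the first field is the queue name (default "QUEUE"), the second is the queue index, and that the queue name itself contains no "::".

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// pendingKey is a hypothetical helper (not a library function). It rebuilds the
// documented layout NAME::QUEUE::PENDING::ID, assuming the second field is the
// queue index.
func pendingKey(queueName string, queueIndex, id int) string {
	return queueName + "::" + strconv.Itoa(queueIndex) + "::PENDING::" + strconv.Itoa(id)
}

// pendingID extracts the task ID from a key in that layout. It assumes the
// queue name does not itself contain "::".
func pendingID(key string) (int, error) {
	parts := strings.Split(key, "::")
	if len(parts) != 4 || parts[2] != "PENDING" {
		return 0, fmt.Errorf("not a pending key: %q", key)
	}
	return strconv.Atoi(parts[3])
}

func main() {
	key := pendingKey("QUEUE", 0, 42)
	id, err := pendingID(key)
	fmt.Println(key, id, err)
}
```

After a crash, keys of this shape could be listed with an ordinary Redis client to find the IDs whose tasks did not complete.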

To use this library, declare a Queue struct and set its Redis URLs.

var q Queue
q.Urls([]string{"redis://localhost:6379"})

Tasks are added by calling AddTask. It accepts an ID and the task itself, passed as a string.

q.AddTask(1, "task1")
q.AddTask(2, "task2")

The ID can be used in a special way. If two tasks have the same ID, AnalysePool sends them to the same analyzer goroutine, provided the analyzer waits long enough for the later task to arrive.

q.AddTask(2, "start")
q.AddTask(1, "load")
q.AddTask(2, "load")
q.AddTask(1, "stop")
q.AddTask(2, "stop")
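The routing described above can be illustrated without Redis. The following is only a sketch of the idea, not the library's implementation: dispatch and the task type are hypothetical names, and the sketch starts one goroutine per ID and forwards every task with that ID to it.

```go
package main

import (
	"fmt"
	"sync"
)

// task is a hypothetical pair of ID and payload, mirroring AddTask's arguments.
type task struct {
	id  int
	msg string
}

// dispatch is a hypothetical stand-in for the routing AnalysePool is described
// as doing: one goroutine per ID, and every task with that ID is sent to it.
// It returns the messages each ID's goroutine received, in arrival order.
func dispatch(tasks []task) map[int][]string {
	var (
		mu       sync.Mutex
		wg       sync.WaitGroup
		received = make(map[int][]string)
		channels = make(map[int]chan string)
	)
	for _, t := range tasks {
		ch, ok := channels[t.id]
		if !ok {
			// First task for this ID: start its goroutine.
			ch = make(chan string)
			channels[t.id] = ch
			wg.Add(1)
			go func(id int, in <-chan string) {
				defer wg.Done()
				for msg := range in {
					mu.Lock()
					received[id] = append(received[id], msg)
					mu.Unlock()
				}
			}(t.id, ch)
		}
		ch <- t.msg
	}
	for _, ch := range channels {
		close(ch)
	}
	wg.Wait()
	return received
}

func main() {
	got := dispatch([]task{
		{2, "start"}, {1, "load"}, {2, "load"}, {1, "stop"}, {2, "stop"},
	})
	fmt.Println(got[1], got[2]) // [load stop] [start load stop]
}
```

Each ID sees its own tasks in insertion order, which is what lets an analyzer treat them as one related sequence.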

This feature lets an analyzer process a set of related tasks one after another. If you add ID-related tasks and need to spin up more than one AnalysePool to fetch and distribute them, insert the tasks into separate queues or separate Redis servers. To get separate queues, set the Queues field of the Queue structure. The queue for a task is then chosen as follows:

whichQueue = id % q.Queues
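As a small worked example of this rule, the sketch below prints the queue chosen for a few task IDs. The function whichQueue is a hypothetical helper that only restates the formula, and the constant queues stands in for q.Queues.

```go
package main

import "fmt"

// whichQueue restates the documented rule whichQueue = id % queues.
// It is a hypothetical helper, not a function exported by the library.
func whichQueue(id, queues int) int {
	return id % queues
}

func main() {
	const queues = 2 // stands in for q.Queues
	for id := 1; id <= 4; id++ {
		fmt.Printf("task ID %d -> queue %d\n", id, whichQueue(id, queues))
	}
}
```

With two queues, odd IDs land in queue 1 and even IDs in queue 0, so all tasks sharing an ID always end up in the same queue.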

AnalysePool accepts three parameters. The first, analyzerID, identifies which Redis pool this AnalysePool connects to:

whichRedis = (analyzerID / q.Queues) % len(q.urls)
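To make the integer arithmetic concrete, here is a sketch that evaluates the rule for a few analyzer IDs. whichRedis is a hypothetical helper that restates the formula; queues and redisCount stand in for q.Queues and len(q.urls).

```go
package main

import "fmt"

// whichRedis restates the documented rule
// whichRedis = (analyzerID / queues) % redisCount, using integer division.
// It is a hypothetical helper, not a function exported by the library.
func whichRedis(analyzerID, queues, redisCount int) int {
	return (analyzerID / queues) % redisCount
}

func main() {
	const (
		queues     = 2 // stands in for q.Queues
		redisCount = 2 // stands in for len(q.urls)
	)
	for analyzerID := 0; analyzerID < 5; analyzerID++ {
		fmt.Printf("analyzerID %d -> redis %d\n",
			analyzerID, whichRedis(analyzerID, queues, redisCount))
	}
}
```

With these values, analyzer IDs 0 and 1 map to Redis 0, IDs 2 and 3 map to Redis 1, and ID 4 wraps back to Redis 0.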

AnalysePool also needs two closures, analyzer and exitOnEmpty. Their format is as follows.

analyzer := func(id int, task chan string, success chan bool) {
	for {
		select {
		case msg := <-task:
			//process the task
			if msg == "stop_indicator" {
				success <- true
				return
			}
		}
	}
}
exitOnEmpty := func() bool {
	return true
}
q.AnalysePool(1, exitOnEmpty, analyzer)

Types

type Queue

type Queue struct {
	// AnalyzerBuff sets the number of concurrently running analyzers. It defaults to the number of CPUs if not set.
	AnalyzerBuff int
	// QueueName sets the name used for the queue in the underlying system. Default is "QUEUE".
	QueueName string
	// Queues is the number of queues in each Redis server. This is useful if you have ID-related tasks and need more than one AnalysePool. Default is 1.
	Queues int
	// contains filtered or unexported fields
}

Queue is the structure that encompasses the queue settings and methods.

func (*Queue) AddTask

func (q *Queue) AddTask(id int, task string)

AddTask adds a task to the queue. It accepts an ID and a string. If more than one task is added with the same ID, the queue makes sure they are sent to the same analyzer, as long as the analyzer does not return before the next task with that ID is popped from the queue.

func (*Queue) AnalysePool

func (q *Queue) AnalysePool(analyzerID int, exitOnEmpty func() bool, analyzer func(int, chan string, chan bool))

AnalysePool can be called to process Redis queue(s). analyzerID determines which Redis server AnalysePool connects to; the package overview gives the rule as whichRedis = (analyzerID / q.Queues) % len(q.urls).

exitOnEmpty is a closure that controls the inner loop of AnalysePool when the queue is empty.

exitOnEmpty := func() bool {
	return true
}
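An exitOnEmpty closure does not have to return a constant. The sketch below builds one that tolerates a few empty polls before giving up. It assumes, which the documentation does not state explicitly, that AnalysePool calls the closure each time it finds the queue empty and stops once it returns true. exitAfter is a hypothetical helper name.

```go
package main

import "fmt"

// exitAfter returns a closure with the exitOnEmpty signature. The closure
// reports false for the first n-1 calls and true from the n-th call onward.
// It keeps plain state and is not safe for concurrent use.
func exitAfter(n int) func() bool {
	calls := 0
	return func() bool {
		calls++
		return calls >= n
	}
}

func main() {
	exitOnEmpty := exitAfter(3)
	for i := 1; i <= 3; i++ {
		fmt.Printf("empty poll %d -> exit=%v\n", i, exitOnEmpty())
	}
}
```

A closure like this could be passed as the second argument of AnalysePool in place of the always-true version.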

analyzer is a closure that is called to process the tasks popped from the queue.

analyzer := func(id int, task chan string, success chan bool) {
	for {
		select {
		case msg := <-task:
			if id == 2 {
				time.Sleep(20 * time.Millisecond)
			}
			fmt.Println(id, msg)
			if msg == "stop" {
				success <- true
				return
			}
		case <-time.After(2 * time.Second):
			fmt.Println("no new event for 2 seconds for ID", id)
			success <- false
			return
		}
	}
}

The analyzer closure must be able to accept new tasks without delay and, if needed, process them concurrently. Any delay in accepting a new task blocks AnalysePool.
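One way to meet this requirement is to hand each task to a worker goroutine through a buffered channel, so the analyzer returns to its receive loop right away. This is a sketch under assumptions, not the library's recommended pattern: bufferedAnalyzer and feed are hypothetical names, the buffer size of 64 is arbitrary, and the 10 ms sleep only simulates slow processing.

```go
package main

import (
	"fmt"
	"time"
)

// bufferedAnalyzer has the analyzer signature. It accepts tasks immediately
// and leaves the slow work to a separate goroutine.
func bufferedAnalyzer(id int, task chan string, success chan bool) {
	work := make(chan string, 64) // arbitrary buffer size
	done := make(chan bool)
	go func() {
		sawStop := false
		for msg := range work {
			time.Sleep(10 * time.Millisecond) // simulated slow processing
			if msg == "stop" {
				sawStop = true
			}
		}
		done <- sawStop
	}()
	for {
		select {
		case msg := <-task:
			work <- msg // hand off; blocks only if the buffer is full
			if msg == "stop" {
				close(work)
				success <- <-done // report once the worker has drained the buffer
				return
			}
		case <-time.After(2 * time.Second):
			close(work)
			<-done
			success <- false
			return
		}
	}
}

// feed is a hypothetical driver standing in for AnalysePool in this sketch.
func feed(msgs ...string) bool {
	task := make(chan string)
	success := make(chan bool)
	go bufferedAnalyzer(1, task, success)
	for _, m := range msgs {
		task <- m
	}
	return <-success
}

func main() {
	fmt.Println(feed("start", "load", "stop")) // true
}
```

The sends in feed complete almost immediately even though each task takes 10 ms to process, because the analyzer only enqueues the message before receiving the next one.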

Example

This acts both as a test and as an example in the documentation.

var q Queue
q.Urls([]string{testRedis})
q.pool[0].Do("FLUSHALL")
q.AddTask(1, "start")
q.AddTask(2, "start")
q.AddTask(1, "stop")
q.AddTask(2, "stop")
analyzer := func(id int, msg_channel chan string, success chan bool) {
	for {
		select {
		case msg := <-msg_channel:
			if id == 2 {
				time.Sleep(20 * time.Millisecond)
			}
			fmt.Println(id, msg)
			if msg == "stop" {
				success <- true
				return
			}
		case <-time.After(2 * time.Second):
			fmt.Println("no new event for 2 seconds for ID", id)
			success <- false
			return
		}
	}
}
exitOnEmpty := func() bool {
	return true
}
q.AnalysePool(1, exitOnEmpty, analyzer)
Output:

1 start
1 stop
2 start
2 stop

func (*Queue) Urls

func (q *Queue) Urls(urls []string)

Urls accepts a slice of Redis connection URLs. The slice is used to set up the connections and also determines how many Redis partitions are used. Using more than one Redis server is useful when a single one cannot handle the queue load because of I/O or memory restrictions or, possibly, CPU.
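Because Urls has no return value in the signature above, it can be worth checking the slice before passing it in. The sketch below is a hypothetical pre-flight check, not part of the library. It assumes the redis://host:port form used in the examples on this page; whether the library accepts other schemes is not stated here.

```go
package main

import (
	"fmt"
	"net/url"
)

// checkRedisURLs is a hypothetical helper (not a library function). It rejects
// an empty slice and any entry that is not of the form redis://host[:port].
func checkRedisURLs(urls []string) error {
	if len(urls) == 0 {
		return fmt.Errorf("at least one Redis URL is required")
	}
	for i, raw := range urls {
		u, err := url.Parse(raw)
		if err != nil {
			return fmt.Errorf("url %d: %w", i, err)
		}
		if u.Scheme != "redis" || u.Host == "" {
			return fmt.Errorf("url %d: want redis://host:port, got %q", i, raw)
		}
	}
	return nil
}

func main() {
	partitions := []string{
		"redis://localhost:6379",
		"redis://localhost:6380", // second partition; the port is illustrative
	}
	if err := checkRedisURLs(partitions); err != nil {
		fmt.Println("invalid configuration:", err)
		return
	}
	fmt.Println("partitions:", len(partitions))
	// q.Urls(partitions) would follow here.
}
```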
