redisqueue

Version: v0.0.0-...-d33151c
Published: Mar 1, 2022 | License: Apache-2.0 | Imports: 4 | Imported by: 0

README

go-redis-queue

What and why

This is a Redis-based queue for use in Go. I evaluated many alternatives before writing it, but I disliked the API of most of them, and nearly all were missing one or more of the features I required.

Features

  1. Ability to add arbitrary tasks to a queue in Redis.
  2. Automatic deduplication of tasks. Multiple pushes of the exact same payload do not create any additional work.
  3. Ability to schedule tasks in the future.
  4. Atomic push and pop. Two workers cannot get the same job.
  5. Sorted FIFO queue.
  6. Can act as a priority queue by scheduling a job with a very old timestamp.
  7. Well tested.
  8. Small, concise codebase.
  9. Simple API.
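
To make features 2, 3, 5 and 6 concrete, here is a minimal in-memory sketch of the behavior described above. It is not the library's implementation and does not talk to Redis: memQueue, newMemQueue and demo are hypothetical names, and the choice to keep the original due time when a duplicate payload is pushed is an assumption, since this README does not specify it.

```go
package main

import (
	"fmt"
	"time"
)

// memQueue is a hypothetical in-memory stand-in for the Redis-backed queue.
// It maps each unique payload to its due time.
type memQueue struct {
	due map[string]time.Time
}

func newMemQueue() *memQueue {
	return &memQueue{due: make(map[string]time.Time)}
}

// Schedule queues job to become due at when. It reports false when an
// identical payload is already queued. Assumption: the original due time is
// kept in that case; the README does not say what the real queue does.
func (q *memQueue) Schedule(job string, when time.Time) bool {
	if _, exists := q.due[job]; exists {
		return false
	}
	q.due[job] = when
	return true
}

// Pending counts every queued job, including jobs that are not due yet.
func (q *memQueue) Pending() int { return len(q.due) }

// Pop removes and returns the due job with the earliest due time, or ""
// when no job is due at now. Ties are broken by payload for determinism.
func (q *memQueue) Pop(now time.Time) string {
	best, found := "", false
	for job, when := range q.due {
		if when.After(now) {
			continue // scheduled for the future
		}
		if !found || when.Before(q.due[best]) || (when.Equal(q.due[best]) && job < best) {
			best, found = job, true
		}
	}
	if found {
		delete(q.due, best)
	}
	return best
}

// demo exercises deduplication, scheduling, and priority by old timestamp.
func demo() (dupAdded bool, pending int, order []string) {
	now := time.Date(2022, 3, 1, 12, 0, 0, 0, time.UTC)
	q := newMemQueue()
	q.Schedule("normal", now)
	dupAdded = q.Schedule("normal", now)         // same payload: no extra work
	q.Schedule("later", now.Add(10*time.Minute)) // not due yet
	q.Schedule("urgent", now.Add(-24*time.Hour)) // old timestamp: popped first
	pending = q.Pending()
	for job := q.Pop(now); job != ""; job = q.Pop(now) {
		order = append(order, job)
	}
	return dupAdded, pending, order
}

func main() {
	dupAdded, pending, order := demo()
	fmt.Println(dupAdded, pending, order) // prints: false 3 [urgent normal]
}
```

The job scheduled ten minutes ahead is counted as pending but is not popped, and the job with the day-old timestamp is popped ahead of the one pushed "now".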

Usage

Adding jobs to a queue:

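import (
	"time"

	"github.com/AgileBits/go-redis-queue/redisqueue"
	"github.com/garyburd/redigo/redis"
)

c, err := redis.Dial("tcp", "127.0.0.1:6379")
if err != nil { ... }
defer c.Close()

q := redisqueue.New("some_queue_name", c)

// Push a job that is due immediately.
wasAdded, err := q.Push("basic item")
if err != nil { ... }

// Pending counts every queued job, including scheduled jobs that are not due yet.
queueSize, err := q.Pending()
if err != nil { ... }

// wasAdded and err already exist, so assign with "=" rather than ":=".
wasAdded, err = q.Schedule("scheduled item", time.Now().Add(10*time.Minute))
if err != nil { ... }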

A simple worker processing jobs from a queue:

c, err := redis.Dial("tcp", "127.0.0.1:6379")
if err != nil { ... }
defer c.Close()

q := redisqueue.New("some_queue_name", c)

for !timeToQuit {
  job, err := q.Pop()
  if err != nil { ... }
  if job != "" {
    // process the job.
  } else {
    time.Sleep(2*time.Second)
  }
}
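
The loop above leaves timeToQuit undefined. One possible way to drive shutdown is a context.Context, sketched below. The popper interface, runWorker, and the sliceQueue fake are hypothetical names introduced for illustration; the only thing taken from this package is the Pop() (string, error) signature, so a *redisqueue.Queue should satisfy popper.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// popper is a hypothetical interface covering the one method the loop needs.
type popper interface {
	Pop() (string, error)
}

// runWorker pops and handles jobs until ctx is cancelled or Pop fails.
// When the queue is empty it waits for idle before polling again.
func runWorker(ctx context.Context, q popper, idle time.Duration, handle func(string)) error {
	for {
		select {
		case <-ctx.Done():
			return nil // shutdown requested
		default:
		}
		job, err := q.Pop()
		if err != nil {
			return err
		}
		if job != "" {
			handle(job)
			continue
		}
		// Nothing to do: sleep, but wake up early on shutdown.
		select {
		case <-ctx.Done():
			return nil
		case <-time.After(idle):
		}
	}
}

// sliceQueue is a fake queue used only to exercise the loop without Redis.
type sliceQueue struct{ jobs []string }

func (s *sliceQueue) Pop() (string, error) {
	if len(s.jobs) == 0 {
		return "", nil
	}
	job := s.jobs[0]
	s.jobs = s.jobs[1:]
	return job, nil
}

// demoWorker runs the loop against the fake for a short, fixed time.
func demoWorker() []string {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	var handled []string
	q := &sliceQueue{jobs: []string{"a", "b", "c"}}
	_ = runWorker(ctx, q, 10*time.Millisecond, func(job string) {
		handled = append(handled, job)
	})
	return handled
}

func main() {
	fmt.Println(demoWorker()) // prints: [a b c]
}
```

In a real worker the context would more likely come from signal handling than from a timeout, and the idle interval would be closer to the two seconds used above.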

A batch worker processing jobs from a queue:

c, err := redis.Dial("tcp", "127.0.0.1:6379")
if err != nil { ... }
defer c.Close()

q := redisqueue.New("some_queue_name", c)

for !timeToQuit {
  jobs, err := q.PopJobs(100) // argument is "limit"
  if err != nil { ... }
  if len(jobs) > 0 {
    for _, job := range jobs {
      // process the job.
    }
  } else {
    time.Sleep(2*time.Second)
  }
}

Requirements

  • Redis 2.6.0 or greater
  • github.com/garyburd/redigo/redis
  • Go

Documentation

Types

type Queue

type Queue struct {
	Name string
	// contains filtered or unexported fields
}

Queue holds a reference to a redis connection and a queue name.

func New

func New(queueName string, c redis.Conn) *Queue

New creates a new Queue with the given name, using the given redis connection.

func (*Queue) FlushQueue

func (q *Queue) FlushQueue() error

FlushQueue removes everything from the queue. Useful for testing.

func (*Queue) Pending

func (q *Queue) Pending() (int64, error)

Pending returns the count of jobs pending, including scheduled jobs that are not due yet.

func (*Queue) Pop

func (q *Queue) Pop() (string, error)

Pop removes and returns a single job from the queue. Popping from the same queue concurrently is safe, provided each goroutine uses its own Queue object and redis connection.

func (*Queue) PopJobs

func (q *Queue) PopJobs(limit int) ([]string, error)

PopJobs removes and returns up to limit jobs from the queue. Popping from the same queue concurrently is safe, provided each goroutine uses its own Queue object and redis connection.
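
The rule that each goroutine needs its own Queue and connection can be sketched as a pool of workers that each call an open function before popping. Everything named here (popper, drain, fakeServer, fakeQueue, demoWorkers) is hypothetical and runs without Redis; with this package, open would presumably call redis.Dial and redisqueue.New once per goroutine, as in the usage examples.

```go
package main

import (
	"fmt"
	"sync"
)

// popper is a hypothetical interface matching the Pop signature above.
type popper interface {
	Pop() (string, error)
}

// drain starts n goroutines. Each one opens its own queue handle and pops
// until the queue is empty. It returns the first error reported, if any.
func drain(n int, open func() (popper, error), handle func(job string)) error {
	var wg sync.WaitGroup
	errs := make(chan error, n) // each goroutine sends at most one error
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			q, err := open() // never shared with another goroutine
			if err != nil {
				errs <- err
				return
			}
			for {
				job, err := q.Pop()
				if err != nil {
					errs <- err
					return
				}
				if job == "" {
					return // queue drained
				}
				handle(job)
			}
		}()
	}
	wg.Wait()
	close(errs)
	return <-errs // nil when no goroutine reported an error
}

// fakeServer stands in for the Redis server; its mutex plays the role of
// the atomic pop that the real queue provides.
type fakeServer struct {
	mu   sync.Mutex
	jobs []string
}

// fakeQueue is a per-goroutine handle onto the shared fakeServer.
type fakeQueue struct{ srv *fakeServer }

func (f *fakeQueue) Pop() (string, error) {
	f.srv.mu.Lock()
	defer f.srv.mu.Unlock()
	if len(f.srv.jobs) == 0 {
		return "", nil
	}
	job := f.srv.jobs[0]
	f.srv.jobs = f.srv.jobs[1:]
	return job, nil
}

// demoWorkers drains 100 jobs with 4 workers and counts repeated deliveries.
func demoWorkers() (handled int, duplicates int) {
	srv := &fakeServer{}
	for i := 0; i < 100; i++ {
		srv.jobs = append(srv.jobs, fmt.Sprintf("job-%d", i))
	}
	var mu sync.Mutex
	seen := make(map[string]int)
	open := func() (popper, error) { return &fakeQueue{srv: srv}, nil }
	if err := drain(4, open, func(job string) {
		mu.Lock()
		seen[job]++
		mu.Unlock()
	}); err != nil {
		panic(err)
	}
	for _, n := range seen {
		handled += n
		if n > 1 {
			duplicates++
		}
	}
	return handled, duplicates
}

func main() {
	fmt.Println(demoWorkers()) // prints: 100 0
}
```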

func (*Queue) Push

func (q *Queue) Push(job string) (bool, error)

Push pushes a single job onto the queue. The job string can be in any format; the queue treats it as an opaque payload.
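
Because the job is just a string, structured jobs have to be encoded by the caller. The sketch below uses JSON, which is one option among many; emailJob, encodeJob and decodeJob are hypothetical names. Since deduplication works on the exact payload, an encoding that always produces the same string for the same job is what makes duplicate pushes collapse.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// emailJob is a hypothetical job type used only for illustration.
type emailJob struct {
	To       string `json:"to"`
	Template string `json:"template"`
}

// encodeJob turns a job into the string that would be passed to Push.
// encoding/json writes struct fields in declaration order, so equal jobs
// always produce identical strings.
func encodeJob(j emailJob) (string, error) {
	b, err := json.Marshal(j)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

// decodeJob reverses encodeJob on the string returned by Pop.
func decodeJob(s string) (emailJob, error) {
	var j emailJob
	err := json.Unmarshal([]byte(s), &j)
	return j, err
}

func main() {
	payload, err := encodeJob(emailJob{To: "user@example.com", Template: "welcome"})
	if err != nil {
		panic(err)
	}
	fmt.Println(payload) // prints: {"to":"user@example.com","template":"welcome"}

	job, err := decodeJob(payload)
	if err != nil {
		panic(err)
	}
	fmt.Println(job.To, job.Template) // prints: user@example.com welcome
}
```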

func (*Queue) Schedule

func (q *Queue) Schedule(job string, when time.Time) (bool, error)

Schedule schedules a job for some point in the future or in the past. Scheduling a job far in the past is equivalent to giving it a high priority, because jobs are popped in order of due date.
