jobqueue

package module
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 21, 2023 License: MIT Imports: 1 Imported by: 1

README

kyoo: A Go library providing an unlimited job queue and concurrent worker pools

Build Coverage Go Report Card

About

kyoo is the phonetic transcription of the word queue. It provides a job queue that can hold as much jobs as resources are available on the running system.

The queue has the following characteristics:

  • No limit of jobs to be queued (only limited by system resources = memory)
  • Concurrent processing of jobs using worker pools
  • When stopping queue, pending jobs are still processed

The library contains a simple Job interface and a simple FuncExecutorJob that just executes a given function and implements that interface. With that nearly all kinds of workloads should be processable already but of course it is possible to add custom implementations of the Job interface.

Possible use cases for the library are:

  • Consumers for message queues like RabbitMQ or Amazon SQS
  • Processing web server requests offloading time extensive work into background jobs
  • All kinds of backend processing jobs like image optimization, etc.

Example

The following example shows a simple http server offloading jobs to the jobqueue that is constantly processed in the background.

package main

import (
	"fmt"
	"log"
	"net/http"
	"runtime"
	"time"

	jobqueue "github.com/dirkaholic/kyoo"
	"github.com/dirkaholic/kyoo/job"
)

var queue *jobqueue.JobQueue

func handler(w http.ResponseWriter, r *http.Request) {
	queue.Submit(&job.FuncExecutorJob{Func: func() error {
		return doTheHeavyBackgroundWork(r.URL.Path)
	}})
	fmt.Printf("%s - submitted %s !!\n", time.Now().String(), r.URL.Path)

	fmt.Fprint(w, "Job added to queue.")
}

func main() {
	queue = jobqueue.NewJobQueue(runtime.NumCPU() * 2)
	queue.Start()

	http.HandleFunc("/", handler)
	log.Fatal(http.ListenAndServe(":8080", nil))
}

func doTheHeavyBackgroundWork(path string) error {
	time.Sleep(2 * time.Second)
	fmt.Printf("%s - processed %s !!\n", time.Now().String(), path)
	return nil
}

Test the offloading by sending a bunch of http requests to the server

$ for i in {1..10}; do http http://127.0.0.1:8080/test/$i; done

The output on http server side should be similar like this

2020-01-09 21:36:36.156277 +0100 CET m=+5.733617272 - submitted /test/1 !!
2020-01-09 21:36:36.443521 +0100 CET m=+6.020861136 - submitted /test/2 !!
2020-01-09 21:36:36.730535 +0100 CET m=+6.307874793 - submitted /test/3 !!
2020-01-09 21:36:37.021405 +0100 CET m=+6.598744533 - submitted /test/4 !!
2020-01-09 21:36:37.311973 +0100 CET m=+6.889312431 - submitted /test/5 !!
2020-01-09 21:36:37.609868 +0100 CET m=+7.187208115 - submitted /test/6 !!
2020-01-09 21:36:37.895222 +0100 CET m=+7.472561850 - submitted /test/7 !!
2020-01-09 21:36:38.160524 +0100 CET m=+7.737863891 - processed /test/1 !!
2020-01-09 21:36:38.171491 +0100 CET m=+7.748830724 - submitted /test/8 !!
2020-01-09 21:36:38.445832 +0100 CET m=+8.023171514 - processed /test/2 !!
2020-01-09 21:36:38.448423 +0100 CET m=+8.025762679 - submitted /test/9 !!
2020-01-09 21:36:38.730541 +0100 CET m=+8.307880933 - submitted /test/10 !!
2020-01-09 21:36:38.735158 +0100 CET m=+8.312497505 - processed /test/3 !!
2020-01-09 21:36:39.024788 +0100 CET m=+8.602128093 - processed /test/4 !!
2020-01-09 21:36:39.315991 +0100 CET m=+8.893331115 - processed /test/5 !!
2020-01-09 21:36:39.614848 +0100 CET m=+9.192187633 - processed /test/6 !!
2020-01-09 21:36:39.896692 +0100 CET m=+9.474031970 - processed /test/7 !!
2020-01-09 21:36:40.175952 +0100 CET m=+9.753291345 - processed /test/8 !!
2020-01-09 21:36:40.451877 +0100 CET m=+10.029216847 - processed /test/9 !!
2020-01-09 21:36:40.734289 +0100 CET m=+10.311628415 - processed /test/10 !!

More examples

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Job

type Job interface {
	Process()
}

Job interface for job processing

type JobQueue

type JobQueue struct {
	// contains filtered or unexported fields
}

JobQueue represents a queue for enqueueing jobs to be processed

func NewJobQueue

func NewJobQueue(maxWorkers int) *JobQueue

NewJobQueue creates a new job queue

func (*JobQueue) Start

func (q *JobQueue) Start()

Start starts the workers and dispatcher

func (*JobQueue) Stop

func (q *JobQueue) Stop()

Stop stops the workers and dispatcher Also closes internal job queue channel after the last value has been received

func (*JobQueue) Stopped

func (q *JobQueue) Stopped() bool

Stopped indicates wether the queue is stopped

func (*JobQueue) Submit

func (q *JobQueue) Submit(job Job)

Submit submits a new job to be processed to the internal queue

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker - the worker threads that actually process the jobs

func NewWorker

func NewWorker(readyPool chan chan Job, done *sync.WaitGroup) *Worker

NewWorker creates a new worker

func (*Worker) Start

func (w *Worker) Start()

Start - begins the job processing loop for the worker

func (*Worker) Stop

func (w *Worker) Stop()

Stop stops the worker

Directories

Path Synopsis
examples
sqsworker Module

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL