workqueue

package
v0.4.11 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 18, 2024 License: Apache-2.0 Imports: 7 Imported by: 0

README

WorkQueue

Thw workqueue package is inspired by the taskQueue at https://github.com/richardwilkes/toolbox/tree/master/taskqueue. The differences being:

  • Work queued can be prioritized
  • Work's priority can be adjusted
  • Errors can be monitored by observing a channel

The workqueue.queue allows the processing of work in a queued fashion. By default, the queue's length is set to two times the number of CPUs, and the number of go routines working on the queue equal to the number of CPUs.

Work submitted to the queue takes the form of a function signature of func() error. When work is performed off the queue any errors are communicated back via a channel accessible via a call to the queue's Errors() function.

Work submitted to the queue, by default, all have equal priority and are processed on the queue's go routines in the order they were placed. Of course since the queue's go routines may operate in parallel, the work on the queue may not be finished in the same sequence they were put on the queue depending on go's scheduler.

However, there is an option to set a priority when submitting work to the queue. This will influence the order which work is processed. Work with a lower priority number is taken first (i.e. Work X is priority 1, while work Y is priority 2, resulting in work X taking precedence over work Y).

Sometimes the priority of queued work needs adjusted. For instance if a low priority work is queued for quite a while due to higher priority work being added to the queue. The work's priority can be dynamically adjuested with an option when the work is submitted (see example below).

License

This Source Code Form is subject to the terms of the Apache Public License, version 2.0. If a copy of the APL was not distributed with this file, you can obtain one at https://www.apache.org/licenses/LICENSE-2.0.txt.

Simple Example

package main

import (
	"fmt"
	"github.com/rbell/toolchest/workqueue"
	"sync"
	"sync/atomic"
	"time"
)

func main() {
	// Create queue with number of workers equal to number of cpus and queue length equal to number cpus * 2
	q := workqueue.NewQueue()

	// wg to prevent app from exiting before all work is done
	wg := &sync.WaitGroup{}

	count := atomic.Int32{}

	// make 100 work functions to perform on the queue - each work function will increment count and print the resulting value, then emulate doing some work.
	work := make([]workqueue.Work, 100)
	for i := 0; i < 100; i++ {
		work[i] = func() error {
			defer wg.Done()
			index := count.Add(1)
			fmt.Printf("Doing some work! %v\n", index) // emulate logging
			time.Sleep(time.Millisecond * 100)         // emulate doing some processing
			return nil
		}
	}

	// Queue the work
	for _, w := range work {
		wg.Add(1)
		q.Enqueue(w)
	}

	wg.Wait()
}

Configuration

Configure the queue's length:

q := workqueue.NewQueue(workqueue.WithQueueLength(10))

Configure the number of go routines performing work on the queue:

q := workqueue.NewQueue(workqueue.WithWorkers(2))

Configuration options can be combined:

q := workqueue.NewQueue(workqueue.WithWorkers(2), workqueue.WithQueueLength(10))

Prioritize Work

Work can be prioritized when submitting the work to the queue:

q.QueueWork(workX, workqueue.WithPriority(1))
q.QueueWork(workY, workqueue.WithPriority(2))

(workX will take precidence over workY)

Dynamically Re Prioritizing Work

Work can be re prioritized dynamically:

adjustAt := time.Now().Add(time.Minute)
q.QueueWork(workX, 
	workqueue.WithPriority(100),
	workqueue.WithAdjustPriority(
		func() int {
			if time.Now() > adjustAt {
				return 1
			}
		}
    ))

In the above example, if workX is still queued after one minute, its priority will be set to priority one and, assuming no other work is prioritized above it, workX will be performed ahead of all other work.

Error Monitoring

Errors returned by work placed on the queue can be monitored via a call to the queue's Errors() function:

	q := workqueue.NewQueue()

	// launch go routine to monitor errors
	go func() {
		errCh := q.Errors()
		for {
			err := <-errCh
			if err != nil {
				fmt.Println(err)
			} else {
				break
			}
		}
	}()

Each call to Errors() returns a unique channel allowing multiple routines to "subscribe" to errors being reported by the queue.

Examples

Runnable examples can be found in the toolchest/workqueue/examples folder.

Documentation

Index

Constants

View Source
const (
	IN_QUEUE workState = iota
	IN_PROGRESS
)

work states

Variables

This section is empty.

Functions

func WithAdjustPriority added in v0.1.1

func WithAdjustPriority(adjustment func() int) workOption

WithAdjustPriority adds an adjustment priorty function to the workItem such that it's priority can be dynamically adjusted in queue

func WithName added in v0.2.0

func WithName(name string) workOption

WithName adds a name for the work (for reporting)

func WithPriority

func WithPriority(priority int) workOption

WithPriority sets the priority of the work to be done. Lower number is higher priority.

Types

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue allow work to be queued up and worked on in a set number of go routines

func NewQueue

func NewQueue(options ...WorkQueueOption) *Queue

NewQueue returns a reference to an initialized Queue

func (*Queue) Break

func (w *Queue) Break()

Break stops the queue form accepting any work and any work in queue is skipped

func (*Queue) Dequeue added in v0.2.0

func (w *Queue) Dequeue(id uuid.UUID) error

Dequeue removes from the queue the work item with the specified id. If the work item is in process, then an error is returned.

func (*Queue) Enqueue added in v0.2.0

func (w *Queue) Enqueue(workToDo Work, options ...workOption) uuid.UUID

Enqueue queues work to do on the workChan to be processed

func (*Queue) Errors

func (w *Queue) Errors() chan error

Errors allows monitoring errors that occur on work submitted to queue

func (*Queue) ResizeQueueLength added in v0.2.0

func (w *Queue) ResizeQueueLength(length int)

ResizeQueueLength adjusts the size of the queue

func (*Queue) SetPriority added in v0.2.1

func (w *Queue) SetPriority(id uuid.UUID, priority int) error

SetPriority changes the priority of the queued work item with the uuid.

func (*Queue) Stop

func (w *Queue) Stop()

Stop stops the queue from accepting work

func (*Queue) WorkItems added in v0.2.0

func (w *Queue) WorkItems() []*QueuedWork

WorkItems returns the current state of all queued work items

type QueuedWork added in v0.2.0

type QueuedWork struct {
	// contains filtered or unexported fields
}

func (*QueuedWork) Id added in v0.2.0

func (w *QueuedWork) Id() string

func (*QueuedWork) Name added in v0.2.0

func (w *QueuedWork) Name() string

func (*QueuedWork) Priority added in v0.2.0

func (w *QueuedWork) Priority() int

func (*QueuedWork) State added in v0.2.0

func (w *QueuedWork) State() string

type Work

type Work func() error

type WorkQueueOption

type WorkQueueOption func(*Queue)

func WithQueueLength

func WithQueueLength(queueLength int) WorkQueueOption

WithQueueLength sets the number of functions that can be queued up before routines queueing are blocked

func WithWorkers

func WithWorkers(workerCount int) WorkQueueOption

WithWorkers sets the number of go routines working on the workChan

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL