simplequeue

v1.2.1 Latest
Warning

This package is not in the latest version of its module.
Published: Feb 17, 2024 License: MPL-2.0 Imports: 4 Imported by: 0

README

go-simplequeue

Simple locking queue system with workers.

Also see: https://pkg.go.dev/code.vanderkleijn.net/go-simplequeue

Support

Source code and issues: https://github.com/mvdkleijn/go-simplequeue

Also see: https://vanderkleijn.net/posts/announcing-go-simplequeue/

Usage / Example

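package main

import (
    "context"
    "fmt"
    "math/rand"
    "time"

    // Import path taken from the pkg.go.dev link above; adjust it if you
    // fetch the module from a different path.
    sq "code.vanderkleijn.net/go-simplequeue"
)

// Define a job that conforms to the simplequeue.Job interface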
type MyJob struct {
    id int
}

func (mj *MyJob) ID() int64 {
    return int64(mj.id)
}

func (mj *MyJob) Do() {
    // Let's pause the job for a short, random time (1 to 1000 ms)
    ms := time.Duration(rand.Intn(1000)+1) * time.Millisecond
    time.Sleep(ms)
    fmt.Printf("Job %d executing\n", mj.ID())
}

// Create some jobs for our test
func createJobs(number int) []*MyJob {
    jobs := make([]*MyJob, 0, number)

    for i := 1; i <= number; i++ {
        jobs = append(jobs, &MyJob{id: i})
    }

    return jobs
}

// Run our program
func main() {
    ctx := context.Background()

    // How many workers and jobs we want
    numWorkers := 15
    numJobs := 200

    // Create some jobs with a helper function
    jobs := createJobs(numJobs)

    // Create a queue
    q := sq.CreateQueue(ctx)

    // Initialize the workers
    workers := sq.InitializeWorkers(ctx, numWorkers)

    fmt.Printf("Number of workers in pool: %d\n", len(workers))
    fmt.Printf("Number of jobs for queue: %d\n", len(jobs))

    // Push the jobs onto the Queue
    for _, job := range jobs {
        q.Push(job)
    }

    // Process the queue with some workers
    q.Process(ctx, workers)

    // Show some stats afterwards
    var totalJobsHandled int64
    for _, w := range workers {
        totalJobsHandled += w.Handled()

        fmt.Printf("Worker %d processed a total of %d jobs\n", w.ID(), w.Handled())
    }

    fmt.Printf("Total jobs handled: %d\n", totalJobsHandled)
    fmt.Printf("Total workers: %d\n", len(workers))
}

Licensing

Go-simplequeue is made available under the MPL-2.0 license. The full details are available from the LICENSE file.

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Job

type Job interface {
	ID() int64 // Returns the job's ID
	Do()       // Actually executes the job
}

Job should be implemented by any struct intended for queueing
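
As a minimal sketch of what satisfying Job can look like, the example below redeclares the interface locally so it compiles without the module. EmailJob and its fields are hypothetical names used only for illustration; the compile-time assertion is a common Go idiom, not something the package requires.

```go
package main

import "fmt"

// Job mirrors the interface documented above. It is redeclared here only so
// the sketch compiles on its own; real code would use simplequeue.Job.
type Job interface {
	ID() int64
	Do()
}

// EmailJob is a hypothetical job type used for illustration.
type EmailJob struct {
	id   int64
	to   string
	sent bool
}

// ID returns the job's ID.
func (e *EmailJob) ID() int64 { return e.id }

// Do records that the job ran; a real job would send the message here.
func (e *EmailJob) Do() { e.sent = true }

// Compile-time check that *EmailJob satisfies Job.
var _ Job = (*EmailJob)(nil)

func main() {
	var j Job = &EmailJob{id: 7, to: "someone@example.com"}
	j.Do()
	fmt.Println("ran job", j.ID())
}
```

Because both methods use pointer receivers, it is *EmailJob (not EmailJob) that satisfies the interface, which matches how the README example pushes &MyJob{...} values.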

type Queue

type Queue struct {
	Lock *sync.Mutex
	// contains filtered or unexported fields
}

Queue is a set of Jobs that can be worked on by one or more Workers

func CreateQueue

func CreateQueue(ctx context.Context) *Queue

CreateQueue initializes and returns an empty queue

func (*Queue) Jobs

func (q *Queue) Jobs() int

Jobs returns the number of jobs on the queue

func (*Queue) Pop

func (q *Queue) Pop() *Job

Pop a job off of the queue and return it

func (*Queue) Process

func (q *Queue) Process(ctx context.Context, pool []*Worker)

Process the queue with a pool of workers

func (*Queue) Push

func (q *Queue) Push(job Job)

Push a job onto the queue
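
The signatures above suggest a mutex-guarded collection. As an illustration only, here is a minimal sketch of a locking FIFO with the same method shapes. lockedQueue, newLockedQueue and noopJob are hypothetical names, and returning nil from Pop on an empty queue is an assumption of this sketch, not documented behavior of the package.

```go
package main

import (
	"fmt"
	"sync"
)

// Job mirrors the documented interface so the sketch is self-contained.
type Job interface {
	ID() int64
	Do()
}

// lockedQueue is a hypothetical FIFO guarded by a mutex.
type lockedQueue struct {
	Lock *sync.Mutex
	jobs []Job
}

func newLockedQueue() *lockedQueue {
	return &lockedQueue{Lock: &sync.Mutex{}}
}

// Push appends a job to the back of the queue.
func (q *lockedQueue) Push(job Job) {
	q.Lock.Lock()
	defer q.Lock.Unlock()
	q.jobs = append(q.jobs, job)
}

// Pop removes and returns the oldest job. It returns nil when the queue is
// empty (an assumption of this sketch).
func (q *lockedQueue) Pop() *Job {
	q.Lock.Lock()
	defer q.Lock.Unlock()
	if len(q.jobs) == 0 {
		return nil
	}
	job := q.jobs[0]
	q.jobs = q.jobs[1:]
	return &job
}

// Jobs returns the number of jobs currently queued.
func (q *lockedQueue) Jobs() int {
	q.Lock.Lock()
	defer q.Lock.Unlock()
	return len(q.jobs)
}

// noopJob is a hypothetical job that does nothing.
type noopJob int64

func (n noopJob) ID() int64 { return int64(n) }
func (n noopJob) Do()       {}

func main() {
	q := newLockedQueue()
	q.Push(noopJob(1))
	q.Push(noopJob(2))

	first := q.Pop()
	fmt.Println("popped job", (*first).ID(), "remaining:", q.Jobs())
}
```

Since Pop returns a pointer to an interface value, callers dereference it before calling methods, as in (*first).ID().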

type QueueI

type QueueI interface {
	Pop() *Job                                   // Remove a job from the queue and return it
	Push(job *Job)                               // Add a job onto the queue
	Jobs() int                                   // Returns the number of jobs on the queue
	Process(ctx context.Context, pool []*Worker) // Process the queue with a pool of workers
}

QueueI is intended for possible future expansion

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker is the implementation of a single worker

func InitializeWorkers

func InitializeWorkers(ctx context.Context, num int) []*Worker

InitializeWorkers initializes and returns a pool of workers

func (*Worker) Handled

func (w *Worker) Handled() int64

Handled returns the number of jobs the worker handled

func (*Worker) ID

func (w *Worker) ID() int64

ID returns the worker's ID

type WorkerI

type WorkerI interface {
	ID() int64
	Handled() int64
	Process(job Job)
}

WorkerI is intended for possible future expansion
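
To show what a type satisfying WorkerI might look like, the sketch below redeclares Job and WorkerI locally and implements them with a hypothetical countingWorker. Note that Queue.Process, as documented above, takes []*Worker, so a custom implementation like this is illustrative rather than something the current API accepts.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Job and WorkerI mirror the documented interfaces so the sketch compiles
// without the module.
type Job interface {
	ID() int64
	Do()
}

type WorkerI interface {
	ID() int64
	Handled() int64
	Process(job Job)
}

// countingWorker is a hypothetical worker that counts processed jobs.
type countingWorker struct {
	id      int64
	handled int64
}

func (w *countingWorker) ID() int64 { return w.id }

// Handled reads the counter atomically so it is safe to call concurrently
// with Process.
func (w *countingWorker) Handled() int64 { return atomic.LoadInt64(&w.handled) }

// Process runs the job and then increments the counter.
func (w *countingWorker) Process(job Job) {
	job.Do()
	atomic.AddInt64(&w.handled, 1)
}

// Compile-time check that *countingWorker satisfies WorkerI.
var _ WorkerI = (*countingWorker)(nil)

// printJob is a hypothetical job that reports its own ID.
type printJob int64

func (p printJob) ID() int64 { return int64(p) }
func (p printJob) Do()       { fmt.Printf("job %d done\n", p.ID()) }

func main() {
	var w WorkerI = &countingWorker{id: 1}
	w.Process(printJob(10))
	w.Process(printJob(11))
	fmt.Printf("worker %d handled %d jobs\n", w.ID(), w.Handled())
}
```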
