workmen

package module
v0.0.0-...-16418dd Latest Latest
Published: Feb 6, 2020 License: MIT Imports: 7 Imported by: 0

README

Workmen

A simple library for running jobs and keeping track of their status and output.

Overview

  • Straightforward API
  • Reliable infrastructure
  • Extensive testing
  • Consistency (ResetStartedJobs, DeleteJob)

Documentation

Overview

Package workmen implements a simple library for performing jobs and keeping track of each job's status and output.

The usual workflow consists of creating a manager, starting its loop, creating jobs, and fetching jobs from the repository along with their status and output.

You can retrieve and reset all started jobs with the manager's `ResetStartedJobs` method (this does not stop running goroutines!). It is recommended to call it at the worker's startup so that jobs left in the started state by a previous run do not stay unfinished forever. You can then use `DeleteJob` to remove jobs that shouldn't be restarted.
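The startup recovery step described above can be sketched without a database. This is only an illustration under stated assumptions, not the library's implementation: the `job` struct and `resetStarted` function are hypothetical stand-ins, and the sketch assumes that only jobs which were started but not completed need resetting.

```go
package main

import "fmt"

// job is a simplified, hypothetical stand-in for workmen.Job.
type job struct {
	ID        string
	Started   bool
	Completed bool
}

// resetStarted clears the Started flag on every job that was started but
// never completed, and returns copies of the jobs it reset.
// Assumption: completed jobs are left untouched.
func resetStarted(jobs []job) []job {
	var reset []job
	for i := range jobs {
		if jobs[i].Started && !jobs[i].Completed {
			jobs[i].Started = false
			reset = append(reset, jobs[i])
		}
	}
	return reset
}

func main() {
	jobs := []job{
		{ID: "a", Started: true},                  // interrupted by a crash
		{ID: "b", Started: true, Completed: true}, // finished normally
		{ID: "c"},                                 // never picked up
	}
	for _, j := range resetStarted(jobs) {
		fmt.Println("reset:", j.ID)
	}
}
```

The returned slice plays the role of the list you would inspect to decide which of the reset jobs to delete instead of letting them run again.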

Example (Workflow)

Example demonstrating basic usage and core features of the library.

package main

import (
	"fmt"

	"github.com/michaelkrukov/workmen"
)

func main() {
	memoryDatabase := "file::memory:?mode=memory&cache=shared"

	// Create repository with sqlite3 in-memory database as jobs storage.
	repository := workmen.MakeRepository("sqlite3", memoryDatabase)

	// Create manager with maximum of 16 simultaneous jobs processing
	// and created repository.
	manager := workmen.MakeManager(16, repository)

	// Workaround for using an in-memory database with connection pooling.
	manager.Repository.SetMaxOpenConns(1)

	// Register processor for the specified job type. The processor accepts
	// Job and returns two values - job output and failure flag.
	manager.CreateJobType("echo", func(job workmen.Job) (string, bool) {
		return job.Input, false
	})

	// Reset all started jobs. The worker wasn't running, so any job still
	// marked as started is never going to complete.
	manager.ResetStartedJobs()

	// Start the processing loop in the background.
	go manager.Loop()

	// Create job with specified type and input.
	job, _ := manager.CreateJob("echo", "msg")

	// Wait for the manager to receive jobs at least once and until it has
	// finished processing them. In real-world code you should probably
	// just use `manager.Stop()`.
	manager.PingAndStop()

	// You can get a job from the manager by its ID. It will hold the
	// job's current state. Jobs are plain values.
	job, _ = manager.GetJob(job.ID)

	if job.Completed {
		fmt.Println(job.Output)
	} else {
		fmt.Println("Job is not completed")
	}

}
Output:

msg

Constants

This section is empty.

Variables

var JobTableName = "jobs"

JobTableName can be used to change the default name of the jobs table in the repository.

Functions

This section is empty.

Types

type GORMJobsRepository

type GORMJobsRepository struct {
	Driver string
	URI    string
	DB     *gorm.DB
}

A GORMJobsRepository holds connection data and methods for interacting with jobs using the gorm package. For possible Driver and URI values refer to its documentation - https://gorm.io/docs/connecting_to_the_database.html.

func (*GORMJobsRepository) Close

func (d *GORMJobsRepository) Close()

Close closes the connection to the database.

func (*GORMJobsRepository) SetMaxOpenConns

func (d *GORMJobsRepository) SetMaxOpenConns(n int)

SetMaxOpenConns sets the maximum number of open connections to the database.

type Job

type Job struct {
	ID          uuid.UUID `gorm:"primary_key"`
	Type        string
	Started     bool `gorm:"index:in_started"`
	Completed   bool
	Failed      bool
	Input       string
	Output      string
	Version     int `gorm:"index:in_version"`
	CompletedAt time.Time
	CreatedAt   time.Time
}

A Job represents a unit of processing. It stores the job's ID, Type and Input data. Input is processed by a JobProcessor, and Output is the result of that processing.

func (Job) TableName

func (Job) TableName() string

TableName returns the name of the table with jobs.

type JobProcessor

type JobProcessor func(Job) (string, bool)

JobProcessor is the function type for processing jobs. It should return the job's output and a boolean indicating failure.
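A minimal sketch of a processor with this shape follows. The `job` and `processor` types are local, hypothetical stand-ins that mirror the documented signature so the example runs without the library; putting the error message into the output on failure is a choice made for this sketch, not something the documentation specifies.

```go
package main

import (
	"fmt"
	"strconv"
)

// job is a hypothetical stand-in carrying only the field this sketch uses.
type job struct{ Input string }

// processor mirrors the documented shape of workmen.JobProcessor:
// it returns the job's output and a flag that is true on failure.
type processor func(job) (string, bool)

// double parses Input as an integer and returns twice its value.
// If Input is not an integer, it reports failure and returns the
// parse error text as the output.
func double(j job) (string, bool) {
	n, err := strconv.Atoi(j.Input)
	if err != nil {
		return err.Error(), true
	}
	return strconv.Itoa(n * 2), false
}

func main() {
	var p processor = double
	for _, in := range []string{"21", "oops"} {
		out, failed := p(job{Input: in})
		fmt.Printf("input=%q output=%q failed=%v\n", in, out, failed)
	}
}
```

Because the input and output are both strings, anything structured (numbers, JSON) has to be parsed and serialized inside the processor itself.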

type JobsRepository

type JobsRepository interface {
	SetMaxOpenConns(int)
	Close()
	// contains filtered or unexported methods
}

A JobsRepository is the interface for interacting with stored jobs.

func MakeRepository

func MakeRepository(driver string, uri string) JobsRepository

MakeRepository creates a new JobsRepository with the specified driver and URI settings and initializes the connection. To use PostgreSQL or MySQL, refer to the GORM documentation - https://gorm.io/docs/connecting_to_the_database.html. Currently only GORM is available as a database backend for the repository.

type Manager

type Manager struct {
	Ping       chan chan bool
	Repository JobsRepository
	// contains filtered or unexported fields
}

A Manager coordinates job processing. Repository stores jobs. Ping is used to notify the manager that new jobs are available.

func MakeManager

func MakeManager(maxThreads int, repository JobsRepository) *Manager

MakeManager creates a manager. At most `maxThreads` goroutines will process jobs simultaneously. The manager will use the provided repository for interacting with jobs (see the JobsRepository type for more information).
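One common way to cap the number of goroutines working at once is a buffered channel used as a semaphore. The sketch below shows that general technique only; it is not taken from the library's source, and `runBounded` is a hypothetical helper. It also records the peak number of tasks running together so the limit can be checked.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// runBounded runs every task with at most limit goroutines active at once
// and returns the highest number of tasks observed running simultaneously.
func runBounded(limit int, tasks []func()) int {
	sem := make(chan struct{}, limit) // one slot per allowed worker
	var wg sync.WaitGroup
	var mu sync.Mutex
	running, peak := 0, 0

	for _, task := range tasks {
		sem <- struct{}{} // blocks while limit tasks are already running
		wg.Add(1)
		go func(task func()) {
			defer wg.Done()
			defer func() { <-sem }() // free the slot when the task ends

			mu.Lock()
			running++
			if running > peak {
				peak = running
			}
			mu.Unlock()

			task()

			mu.Lock()
			running--
			mu.Unlock()
		}(task)
	}
	wg.Wait()
	return peak
}

func main() {
	tasks := make([]func(), 10)
	for i := range tasks {
		tasks[i] = func() { time.Sleep(5 * time.Millisecond) }
	}
	peak := runBounded(3, tasks)
	fmt.Println("peak within limit:", peak <= 3)
}
```

The exact peak depends on scheduling, so the example prints only whether the limit held rather than a specific number.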

func (*Manager) CleanCompletedJobs

func (m *Manager) CleanCompletedJobs(cleanBefore time.Time) ([]Job, error)

CleanCompletedJobs removes all completed jobs that were completed before the specified time.
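The cutoff rule can be illustrated on an in-memory slice. This is a sketch under assumptions, not the library's query: the `job` struct, `splitCompletedBefore` and `demo` are hypothetical, and "before" is taken to mean strictly earlier than the cutoff. Checking the `Completed` flag matters because an unfinished job has a zero `CompletedAt`, which is earlier than any realistic cutoff.

```go
package main

import (
	"fmt"
	"time"
)

// job is a simplified, hypothetical stand-in for workmen.Job.
type job struct {
	ID          string
	Completed   bool
	CompletedAt time.Time
}

// splitCompletedBefore partitions jobs into those that are completed and
// finished strictly before cutoff (removed) and all others (kept).
func splitCompletedBefore(jobs []job, cutoff time.Time) (removed, kept []job) {
	for _, j := range jobs {
		if j.Completed && j.CompletedAt.Before(cutoff) {
			removed = append(removed, j)
		} else {
			kept = append(kept, j)
		}
	}
	return removed, kept
}

// demo builds three sample jobs around a fixed cutoff and splits them.
func demo() (removed, kept []job) {
	cutoff := time.Date(2020, time.February, 6, 0, 0, 0, 0, time.UTC)
	jobs := []job{
		{ID: "old", Completed: true, CompletedAt: cutoff.Add(-time.Hour)},
		{ID: "recent", Completed: true, CompletedAt: cutoff.Add(time.Hour)},
		{ID: "pending"}, // zero CompletedAt, but not completed
	}
	return splitCompletedBefore(jobs, cutoff)
}

func main() {
	removed, kept := demo()
	fmt.Println("removed:", len(removed), "kept:", len(kept))
}
```

With the real manager, a typical cutoff would be something like `time.Now().Add(-24 * time.Hour)` to drop results older than a day.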

func (*Manager) CreateJob

func (m *Manager) CreateJob(jobType string, input string) (Job, error)

CreateJob creates a Job to be processed, writes it to the database and pings the manager.

func (*Manager) CreateJobType

func (m *Manager) CreateJobType(jobType string, processor JobProcessor)

CreateJobType sets the function as the processor for jobs of the specified type. It replaces any previous processor for this type.
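The replace-on-register behaviour is what a plain map keyed by job type gives you. The sketch below assumes nothing about how the library stores processors: `registry`, `register` and `run` are hypothetical names, the processor takes a bare input string for brevity, and the map is not safe for concurrent use.

```go
package main

import "fmt"

// processor returns an output and a flag that is true on failure.
type processor func(input string) (string, bool)

// registry maps a job type to the single processor registered for it.
type registry struct{ processors map[string]processor }

func newRegistry() *registry {
	return &registry{processors: map[string]processor{}}
}

// register stores p under jobType, replacing any earlier processor.
func (r *registry) register(jobType string, p processor) {
	r.processors[jobType] = p
}

// run dispatches input to the processor for jobType and reports
// failure when no processor is registered for that type.
func (r *registry) run(jobType, input string) (string, bool) {
	p, ok := r.processors[jobType]
	if !ok {
		return "unknown job type: " + jobType, true
	}
	return p(input)
}

func main() {
	r := newRegistry()
	r.register("echo", func(in string) (string, bool) { return in, false })
	r.register("echo", func(in string) (string, bool) { return in + "!", false })

	out, failed := r.run("echo", "msg")
	fmt.Println(out, failed) // the second registration wins
}
```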

func (*Manager) DeleteJob

func (m *Manager) DeleteJob(job *Job) error

DeleteJob deletes the job without stopping any goroutines.

func (*Manager) GetJob

func (m *Manager) GetJob(id uuid.UUID) (Job, error)

GetJob returns Job with specified ID.

func (*Manager) Loop

func (m *Manager) Loop()

Loop processes jobs from the database when the manager is pinged through the Ping channel.
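A channel of channels, like the `chan chan bool` type of the Ping field, is a common Go pattern for a request that carries its own reply channel. How the manager actually uses the inner channel is not documented here, so the sketch below is an assumption: `loop` and `pingAndWait` are hypothetical, and the reply is treated as an acknowledgement that one round of work has finished.

```go
package main

import "fmt"

// loop waits for pings. Each ping carries a reply channel; the loop runs
// one round of work and then acknowledges on that channel.
// Closing ping stops the loop.
func loop(ping <-chan chan bool, work func()) {
	for reply := range ping {
		work()
		if reply != nil {
			reply <- true
		}
	}
}

// pingAndWait sends a ping and blocks until the loop acknowledges it.
func pingAndWait(ping chan<- chan bool) bool {
	reply := make(chan bool)
	ping <- reply
	return <-reply
}

func main() {
	ping := make(chan chan bool)
	done := make(chan struct{})
	rounds := 0

	go func() {
		loop(ping, func() { rounds++ })
		close(done)
	}()

	pingAndWait(ping)
	pingAndWait(ping)
	close(ping) // no more pings: let the loop exit
	<-done

	fmt.Println("rounds:", rounds)
}
```

Waiting for the acknowledgement before stopping is the same idea the documentation describes for `PingAndStop`: make sure at least one round has run before shutting down.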

func (*Manager) PingAndStop

func (m *Manager) PingAndStop()

PingAndStop waits for at least one database query and then calls Stop.

func (*Manager) ResetStartedJobs

func (m *Manager) ResetStartedJobs() ([]Job, error)

ResetStartedJobs resets all jobs that were started. It does not stop goroutines; it only updates the repository.

func (*Manager) Stop

func (m *Manager) Stop()

Stop attempts to stop the manager and blocks until the manager is stopped.
