Documentation ¶
Overview ¶
Package workmen implements a simple library for performing jobs and keeping track of each job's status and output.
The usual workflow consists of creating a manager, starting its loop, creating jobs, and then getting jobs from the repository together with their status and output.
You can retrieve and reset all started jobs with the manager's `ResetStartedJobs` method. It only updates the repository and will not stop running goroutines. Calling it at the worker's startup is recommended, so that jobs left in the started state by a previous run do not remain incomplete forever. You can then use `DeleteJob` to remove jobs that shouldn't be restarted.
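The startup recovery described above can be sketched as follows. This is only an illustration under stated assumptions: `Job`, `startedJobResetter`, `fakeManager`, and `recoverJobs` are hypothetical local stand-ins so the sketch runs without a database, and the stand-in `Job` uses a string ID instead of `uuid.UUID`. Only the two method signatures are taken from the index of this page.

```go
package main

import "fmt"

// Job is a local stand-in for the library's Job type (only the fields used here).
type Job struct {
	ID   string
	Type string
}

// startedJobResetter mirrors the signatures of the two Manager methods used at startup.
type startedJobResetter interface {
	ResetStartedJobs() ([]Job, error)
	DeleteJob(job *Job) error
}

// recoverJobs resets all started jobs and deletes those that the
// restartable callback rejects. It returns the number of deleted jobs.
func recoverJobs(m startedJobResetter, restartable func(Job) bool) (int, error) {
	jobs, err := m.ResetStartedJobs()
	if err != nil {
		return 0, fmt.Errorf("reset started jobs: %w", err)
	}
	deleted := 0
	for i := range jobs {
		if restartable(jobs[i]) {
			continue
		}
		if err := m.DeleteJob(&jobs[i]); err != nil {
			return deleted, fmt.Errorf("delete job %s: %w", jobs[i].ID, err)
		}
		deleted++
	}
	return deleted, nil
}

// fakeManager is an in-memory stand-in, not part of the library.
type fakeManager struct {
	started []Job
	deleted []string
}

func (f *fakeManager) ResetStartedJobs() ([]Job, error) { return f.started, nil }

func (f *fakeManager) DeleteJob(job *Job) error {
	f.deleted = append(f.deleted, job.ID)
	return nil
}

func main() {
	m := &fakeManager{started: []Job{{ID: "a", Type: "echo"}, {ID: "b", Type: "send-email"}}}
	// Hypothetical policy: do not re-run jobs that have external side effects.
	n, err := recoverJobs(m, func(j Job) bool { return j.Type != "send-email" })
	fmt.Println(n, err, m.deleted)
}
```

With the real library, a `*Manager` would presumably take the place of `fakeManager`, provided the stand-in `Job` is replaced by the library's own type.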
Example (Workflow) ¶
Example demonstrating basic usage and core features of the library.
package main

import (
	"fmt"

	"github.com/michaelkrukov/workmen"
)

func main() {
	memoryDatabase := "file::memory:?mode=memory&cache=shared"

	// Create a repository with an sqlite3 in-memory database as jobs storage.
	repository := workmen.MakeRepository("sqlite3", memoryDatabase)

	// Create a manager that processes at most 16 jobs simultaneously
	// and uses the created repository.
	manager := workmen.MakeManager(16, repository)

	// Workaround for using an in-memory database with connection pooling.
	manager.Repository.SetMaxOpenConns(1)

	// Register a processor for the specified job type. The processor accepts
	// a Job and returns two values: the job output and a failure flag.
	manager.CreateJobType("echo", func(job workmen.Job) (string, bool) {
		return job.Input, false
	})

	// Reset all started jobs (the worker wasn't running, which means that all
	// started jobs are never going to complete).
	manager.ResetStartedJobs()

	// Start the processing loop in the background.
	go manager.Loop()

	// Create a job with the specified type and input.
	job, _ := manager.CreateJob("echo", "msg")

	// Wait for the manager to receive jobs at least once and wait
	// until it has finished processing the received jobs. In the real world
	// you probably should just use `manager.Stop()`.
	manager.PingAndStop()

	// You can get a job from the manager by its ID. It will hold
	// the job's current state. Jobs are plain values.
	job, _ = manager.GetJob(job.ID)

	if job.Completed {
		fmt.Println(job.Output)
	} else {
		fmt.Println("Job is not completed")
	}
}
Output: msg
Index ¶
- Variables
- type GORMJobsRepository
- type Job
- type JobProcessor
- type JobsRepository
- type Manager
- func (m *Manager) CleanCompletedJobs(cleanBefore time.Time) ([]Job, error)
- func (m *Manager) CreateJob(jobType string, input string) (Job, error)
- func (m *Manager) CreateJobType(jobType string, processor JobProcessor)
- func (m *Manager) DeleteJob(job *Job) error
- func (m *Manager) GetJob(id uuid.UUID) (Job, error)
- func (m *Manager) Loop()
- func (m *Manager) PingAndStop()
- func (m *Manager) ResetStartedJobs() ([]Job, error)
- func (m *Manager) Stop()
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var JobTableName = "jobs"
JobTableName can be used to change the default name of the table that stores jobs in the repository.
Functions ¶
This section is empty.
Types ¶
type GORMJobsRepository ¶
A GORMJobsRepository holds connection data and methods for interacting with jobs using the gorm package. For possible Driver and URI values, refer to its documentation: https://gorm.io/docs/connecting_to_the_database.html.
func (*GORMJobsRepository) Close ¶
func (d *GORMJobsRepository) Close()
Close closes connection to the database.
func (*GORMJobsRepository) SetMaxOpenConns ¶
func (d *GORMJobsRepository) SetMaxOpenConns(n int)
SetMaxOpenConns sets the maximum number of open connections to the database.
type Job ¶
type Job struct {
	ID          uuid.UUID `gorm:"primary_key"`
	Type        string
	Started     bool `gorm:"index:in_started"`
	Completed   bool
	Failed      bool
	Input       string
	Output      string
	Version     int `gorm:"index:in_version"`
	CompletedAt time.Time
	CreatedAt   time.Time
}
A Job represents a unit of processing. It stores the job's ID, Type and Input data. Input is processed by a JobProcessor, and Output is the result of that processing.
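As a rough illustration of how the status flags of a Job might be read, here is a minimal sketch. The `Job` type below is a local stand-in carrying only the three boolean flags, and `status` is a hypothetical helper. The precedence of the checks (Failed before Completed before Started) is an assumption, since this page does not state how the library combines the flags.

```go
package main

import "fmt"

// Job is a local stand-in with only the status flags of the struct above.
type Job struct {
	Started   bool
	Completed bool
	Failed    bool
}

// status maps the flags to a human-readable label.
// Assumption: Failed takes precedence over Completed, which takes
// precedence over Started. The library may combine them differently.
func status(j Job) string {
	switch {
	case j.Failed:
		return "failed"
	case j.Completed:
		return "completed"
	case j.Started:
		return "started"
	default:
		return "pending"
	}
}

func main() {
	fmt.Println(status(Job{}))
	fmt.Println(status(Job{Started: true}))
	fmt.Println(status(Job{Started: true, Completed: true}))
	fmt.Println(status(Job{Started: true, Completed: true, Failed: true}))
}
```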
type JobProcessor ¶
JobProcessor is a function type for processing jobs. It should return the job's output and a boolean indicating failure.
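A processor that can fail might look like the sketch below. It assumes the processor shape used in the workflow example, a function taking a Job and returning `(string, bool)`. The `Job` type here is a local stand-in with only the `Input` field, and `sumProcessor` is a hypothetical name.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strconv"
)

// Job is a local stand-in for the library's Job type (only Input is used).
type Job struct {
	Input string
}

// sumProcessor expects Input to be a JSON array of integers and returns
// their sum as the output. On malformed input it returns the error text
// as the output and sets the failure flag to true.
func sumProcessor(job Job) (string, bool) {
	var nums []int
	if err := json.Unmarshal([]byte(job.Input), &nums); err != nil {
		return "invalid input: " + err.Error(), true
	}
	total := 0
	for _, n := range nums {
		total += n
	}
	return strconv.Itoa(total), false
}

func main() {
	out, failed := sumProcessor(Job{Input: "[1, 2, 3]"})
	fmt.Println(out, failed)

	out, failed = sumProcessor(Job{Input: "not json"})
	fmt.Println(out, failed)
}
```

Encoding structured input as a string (JSON here) is one way to work with the string-typed Input and Output fields; it is a design choice of this sketch, not something the library prescribes.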
type JobsRepository ¶
type JobsRepository interface {
	SetMaxOpenConns(int)
	Close()
	// contains filtered or unexported methods
}
A JobsRepository is the interface for interacting with stored jobs.
func MakeRepository ¶
func MakeRepository(driver string, uri string) JobsRepository
MakeRepository creates a new JobsRepository with the specified driver and uri settings and initializes the connection. To use PostgreSQL or MySQL, refer to the GORM documentation: https://gorm.io/docs/connecting_to_the_database.html. Currently only GORM is available as a database backend for the repository.
type Manager ¶
type Manager struct {
	Ping       chan chan bool
	Repository JobsRepository
	// contains filtered or unexported fields
}
A Manager is the coordinator that organizes job processing. Repository stores the jobs. Ping is used to notify the manager that new jobs are available.
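The `chan chan bool` type of Ping suggests the common Go request/acknowledge idiom, in which each request carries its own reply channel. The sketch below shows that general idiom only. How the Manager actually uses the reply channel is not documented on this page, and `serve` and `pingAndWait` are hypothetical names.

```go
package main

import "fmt"

// serve waits for ping requests. Each request is itself a channel on
// which the acknowledgement is sent back once the work is done.
func serve(ping <-chan chan bool, handled *int) {
	for reply := range ping {
		*handled++ // stand-in for "query the repository for new jobs"
		reply <- true
	}
}

// pingAndWait sends a fresh reply channel and blocks until the
// receiver acknowledges the request.
func pingAndWait(ping chan<- chan bool) bool {
	reply := make(chan bool)
	ping <- reply
	return <-reply
}

func main() {
	ping := make(chan chan bool)
	handled := 0
	go serve(ping, &handled)

	ok := pingAndWait(ping)
	// Reading handled is safe here: the increment happens before the
	// acknowledgement is sent, and we have already received it.
	fmt.Println(ok, handled)

	close(ping) // lets serve return
}
```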
func MakeManager ¶
func MakeManager(maxThreads int, repository JobsRepository) *Manager
MakeManager creates a manager. At most `maxThreads` goroutines will process jobs simultaneously. The manager will use the provided repository for interacting with jobs (see the JobsRepository type for more information).
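A limit such as `maxThreads` is often enforced in Go with a buffered channel used as a semaphore. The sketch below shows that general technique and is not taken from the library's source; `runBounded` is a hypothetical helper that also reports the peak concurrency it observed.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// runBounded runs every task, with at most maxThreads running at once,
// and returns the highest number of tasks observed running together.
// maxThreads must be at least 1, otherwise the first send blocks forever.
func runBounded(maxThreads int, tasks []func()) int {
	sem := make(chan struct{}, maxThreads)
	var wg sync.WaitGroup
	var mu sync.Mutex
	running, peak := 0, 0

	for _, task := range tasks {
		sem <- struct{}{} // blocks while maxThreads tasks are running
		wg.Add(1)
		go func(task func()) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot

			mu.Lock()
			running++
			if running > peak {
				peak = running
			}
			mu.Unlock()

			task()

			mu.Lock()
			running--
			mu.Unlock()
		}(task)
	}
	wg.Wait()
	return peak
}

func main() {
	tasks := make([]func(), 20)
	for i := range tasks {
		tasks[i] = func() { time.Sleep(10 * time.Millisecond) }
	}
	fmt.Println("peak concurrency:", runBounded(4, tasks))
}
```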
func (*Manager) CleanCompletedJobs ¶
CleanCompletedJobs removes all completed jobs that were completed before the specified time.
func (*Manager) CreateJob ¶
CreateJob creates a Job to be processed, writes it to the database, and pings the manager.
func (*Manager) CreateJobType ¶
func (m *Manager) CreateJobType(jobType string, processor JobProcessor)
CreateJobType sets the given function as the processor for jobs of the specified type. It replaces any processor previously registered for this type.
func (*Manager) Loop ¶
func (m *Manager) Loop()
Loop processes jobs from the database whenever the manager is pinged through the Ping channel.
func (*Manager) PingAndStop ¶
func (m *Manager) PingAndStop()
PingAndStop waits for at least one database query and then calls Stop.
func (*Manager) ResetStartedJobs ¶
ResetStartedJobs resets all jobs that were started. It will not stop goroutines, only update repository.