jobs

package
v0.11.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 20, 2022 License: Apache-2.0 Imports: 19 Imported by: 0

Documentation

Index

Constants

View Source
const SendJobStatusJobType = "send_job_status"

Variables

View Source
var (
	ErrInvalidJobType   = errors.New("invalid job type")
	ErrPermanentFailure = errors.New("permanent failure")
)

Functions

func PermanentFailure added in v0.8.0

func PermanentFailure(err error) error

Types

type ExecutorFunc added in v0.8.0

type ExecutorFunc func(ctx context.Context, j *Job) error

type GormStore

type GormStore struct {
	// contains filtered or unexported fields
}

func (*GormStore) AcceptJob added in v0.10.0

func (s *GormStore) AcceptJob(j *Job, acceptedGracePeriod time.Duration) error

func (*GormStore) InsertJob

func (s *GormStore) InsertJob(j *Job) error

func (*GormStore) Job

func (s *GormStore) Job(id uuid.UUID) (j Job, err error)

func (*GormStore) Jobs

func (s *GormStore) Jobs(o datastore.ListOptions) (jj []Job, err error)

func (*GormStore) SchedulableJobs added in v0.8.0

func (s *GormStore) SchedulableJobs(acceptedGracePeriod, reSchedulableGracePeriod time.Duration, o datastore.ListOptions) (jj []Job, err error)

func (*GormStore) Status added in v0.9.0

func (s *GormStore) Status() ([]StatusQuery, error)

func (*GormStore) UpdateJob

func (s *GormStore) UpdateJob(j *Job) error

type JSONResponse added in v0.6.0

type JSONResponse struct {
	ID            uuid.UUID `json:"jobId"`
	Type          string    `json:"type"`
	State         State     `json:"state"`
	Error         string    `json:"error"`
	Errors        []string  `json:"errors"`
	Result        string    `json:"result"`
	TransactionID string    `json:"transactionId"`
	CreatedAt     time.Time `json:"createdAt"`
	UpdatedAt     time.Time `json:"updatedAt"`
}

JSONResponse is the HTTP response representation of a Job.

type Job

type Job struct {
	ID                     uuid.UUID      `gorm:"column:id;primary_key;type:uuid;"`
	Type                   string         `gorm:"column:type"`
	State                  State          `gorm:"column:state;default:INIT;index:idx_jobs_state_updated_at"`
	Error                  string         `gorm:"column:error"`
	Errors                 pq.StringArray `gorm:"column:errors;type:text[]"`
	Result                 string         `gorm:"column:result"`
	TransactionID          string         `gorm:"column:transaction_id"`
	ExecCount              int            `gorm:"column:exec_count;default:0"`
	CreatedAt              time.Time      `gorm:"column:created_at"`
	UpdatedAt              time.Time      `gorm:"column:updated_at;index:idx_jobs_state_updated_at"`
	DeletedAt              gorm.DeletedAt `gorm:"column:deleted_at;index"`
	ShouldSendNotification bool           `gorm:"-"` // Whether or not to notify admin (via webhook for example)
	Attributes             datatypes.JSON `gorm:"attributes"`
}

Job is the database model for a job.

func (*Job) BeforeCreate

func (j *Job) BeforeCreate(tx *gorm.DB) (err error)

func (Job) TableName added in v0.9.0

func (Job) TableName() string

func (Job) ToJSONResponse added in v0.6.0

func (j Job) ToJSONResponse() JSONResponse

type JobOption added in v0.9.0

type JobOption func(*Job)

func WithAttributes added in v0.9.0

func WithAttributes(attributes datatypes.JSON) JobOption

type JobQueueStatus added in v0.9.0

type JobQueueStatus struct {
	JobsInit        int `json:"jobsInit"`
	JobsNotAccepted int `json:"jobsNotAccepted"`
	JobsAccepted    int `json:"jobsAccepted"`
	JobsErrored     int `json:"jobsErrored"`
	JobsFailed      int `json:"jobsFailed"`
	JobsCompleted   int `json:"jobsCompleted"`
}

type NotificationConfig added in v0.8.0

type NotificationConfig struct {
	// contains filtered or unexported fields
}

func (*NotificationConfig) SendJobStatus added in v0.8.0

func (cfg *NotificationConfig) SendJobStatus(ctx context.Context, content string) error

func (*NotificationConfig) SendJobStatusWebhook added in v0.8.0

func (cfg *NotificationConfig) SendJobStatusWebhook(ctx context.Context, content string) error

func (*NotificationConfig) ShouldSendJobStatus added in v0.8.0

func (cfg *NotificationConfig) ShouldSendJobStatus() bool

type Result added in v0.6.0

type Result struct {
	Result        string
	TransactionID string
}

type Service

type Service interface {
	List(limit, offset int) (*[]Job, error)
	Details(jobID string) (*Job, error)
}

func NewService

func NewService(store Store) Service

NewService creates a new job service.

type ServiceImpl added in v0.10.0

type ServiceImpl struct {
	// contains filtered or unexported fields
}

ServiceImpl defines the API for job HTTP handlers.

func (*ServiceImpl) Details added in v0.10.0

func (s *ServiceImpl) Details(jobID string) (*Job, error)

Details returns a specific job.

func (*ServiceImpl) List added in v0.10.0

func (s *ServiceImpl) List(limit, offset int) (*[]Job, error)

List returns all jobs in the datastore.

type State added in v0.8.0

type State string

State is a type for Job state.

const (
	Init               State = "INIT"
	Accepted           State = "ACCEPTED"
	NoAvailableWorkers State = "NO_AVAILABLE_WORKERS"
	Error              State = "ERROR"
	Complete           State = "COMPLETE"
	Failed             State = "FAILED"
)

type StatusQuery added in v0.9.0

type StatusQuery struct {
	State State
	Count int
}

type Store

type Store interface {
	Jobs(datastore.ListOptions) ([]Job, error)
	Job(id uuid.UUID) (Job, error)
	InsertJob(*Job) error
	UpdateJob(*Job) error
	AcceptJob(j *Job, acceptedGracePeriod time.Duration) error
	SchedulableJobs(acceptedGracePeriod, reSchedulableGracePeriod time.Duration, o datastore.ListOptions) ([]Job, error)
	Status() ([]StatusQuery, error)
}

Store manages data regarding jobs.

func NewGormStore

func NewGormStore(db *gorm.DB) Store

type WorkerPool

type WorkerPool interface {
	RegisterExecutor(jobType string, executorF ExecutorFunc)
	CreateJob(jobType, txID string, opts ...JobOption) (*Job, error)
	Schedule(j *Job) error
	Status() (WorkerPoolStatus, error)
	Start()
	Stop(wait bool)
	Capacity() uint
	QueueSize() uint
}

func NewWorkerPool

func NewWorkerPool(db Store, capacity uint, workerCount uint, opts ...WorkerPoolOption) WorkerPool

type WorkerPoolImpl added in v0.10.0

type WorkerPoolImpl struct {
	// contains filtered or unexported fields
}

func (*WorkerPoolImpl) Capacity added in v0.10.0

func (wp *WorkerPoolImpl) Capacity() uint

func (*WorkerPoolImpl) CreateJob added in v0.10.0

func (wp *WorkerPoolImpl) CreateJob(jobType, txID string, opts ...JobOption) (*Job, error)

CreateJob constructs a new Job for type `jobType` ready for scheduling.

func (*WorkerPoolImpl) QueueSize added in v0.10.0

func (wp *WorkerPoolImpl) QueueSize() uint

func (*WorkerPoolImpl) RegisterExecutor added in v0.10.0

func (wp *WorkerPoolImpl) RegisterExecutor(jobType string, executorF ExecutorFunc)

func (*WorkerPoolImpl) Schedule added in v0.10.0

func (wp *WorkerPoolImpl) Schedule(j *Job) error

Schedule tries to schedule the job to run immediately.

func (*WorkerPoolImpl) Start added in v0.10.0

func (wp *WorkerPoolImpl) Start()

func (*WorkerPoolImpl) Status added in v0.10.0

func (wp *WorkerPoolImpl) Status() (WorkerPoolStatus, error)

func (*WorkerPoolImpl) Stop added in v0.10.0

func (wp *WorkerPoolImpl) Stop(wait bool)

type WorkerPoolOption added in v0.8.0

type WorkerPoolOption func(*WorkerPoolImpl)

func WithAcceptedGracePeriod added in v0.9.0

func WithAcceptedGracePeriod(d time.Duration) WorkerPoolOption

func WithDbJobPollInterval added in v0.9.0

func WithDbJobPollInterval(d time.Duration) WorkerPoolOption

func WithJobStatusWebhook added in v0.8.0

func WithJobStatusWebhook(u string, timeout time.Duration) WorkerPoolOption

func WithLogger added in v0.9.0

func WithLogger(logger *log.Logger) WorkerPoolOption

func WithMaxJobErrorCount added in v0.9.0

func WithMaxJobErrorCount(count int) WorkerPoolOption

func WithReSchedulableGracePeriod added in v0.9.0

func WithReSchedulableGracePeriod(d time.Duration) WorkerPoolOption

func WithSystemService added in v0.9.0

func WithSystemService(svc system.Service) WorkerPoolOption

type WorkerPoolStatus added in v0.9.0

type WorkerPoolStatus struct {
	JobQueueStatus
	Capacity    int `json:"poolCapacity"`
	WorkerCount int `json:"workerCount"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL