job

package
v0.8.5-0...-a152ffe Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 1, 2021 License: MIT Imports: 28 Imported by: 0

Documentation

Index

Constants

View Source
const (
	LocalJob jobType = iota
	RemoteJob
)
View Source
const (
	AccessTokenKey = contextKey("Access Token Key")
)

Variables

View Source
var (
	RFC3339WithoutTimezone = "2006-01-02T15:04:05"

	ErrInvalidJob       = errors.New("Invalid Local Job. Job's must contain a Name and a Command field")
	ErrInvalidRemoteJob = errors.New("Invalid Remote Job. Job's must contain a Name and a url field")
	ErrInvalidJobType   = errors.New("Invalid Job type. Types supported: 0 for local and 1 for remote")
)
View Source
var (
	ErrJobDisabled       = errors.New("Job cannot run, as it is disabled")
	ErrCmdIsEmpty        = errors.New("Job Command is empty.")
	ErrJobTypeInvalid    = errors.New("Job Type is not valid.")
	ErrInvalidDelimiters = errors.New("Job has invalid templating delimiters.")
)
View Source
var (
	ErrJobDoesntExist = errors.New("The job you requested does not exist")
)
View Source
var (
	Oauth2Config *oauth2.Config = nil
)
View Source
var (
	Status = &jobStatus{
		Started: JobStatus("Started"),
		Running: JobStatus("Running"),
		Failed:  JobStatus("Failed"),
		Success: JobStatus("Success"),
	}
)

Functions

func AuthHandler

func AuthHandler(next http.Handler) http.Handler

func DeleteAll

func DeleteAll(cache JobCache) error

func GetJobToken

func GetJobToken(ctx context.Context) (string, error)

func InitAuth

func InitAuth()

func InitMailer

func InitMailer()

func Notify

func Notify(toAddress string, subject string, message string) error

func NotifyOfJobFailure

func NotifyOfJobFailure(j *Job, run *JobStat) error

func SetupAuth

func SetupAuth(issuer string, audience string, clientId string, clientSecret string, userName string, pwd string,
	tokenUrl string)

Types

type Clock

type Clock struct {
	clock.Clock
	// contains filtered or unexported fields
}

func (*Clock) SetClock

func (clk *Clock) SetClock(in clock.Clock)

func (*Clock) Time

func (clk *Clock) Time() clock.Clock

func (*Clock) TimeSet

func (clk *Clock) TimeSet() (result bool)

type Clocker

type Clocker interface {
	Time() clock.Clock
	TimeSet() bool
}

type ErrJobNotFound

type ErrJobNotFound string

ErrJobNotFound is raised when a Job is unable to be found within a database.

func (ErrJobNotFound) Error

func (id ErrJobNotFound) Error() string

type Job

type Job struct {
	Name string `json:"name"`
	Id   string `json:"id"`

	// Command to run
	// e.g. "bash /path/to/my/script.sh"
	Command string `json:"command"`

	// Email of the owner of this job
	// e.g. "admin@example.com"
	Owner string `json:"owner"`

	// Is this job disabled?
	Disabled bool `json:"disabled"`

	// Jobs that are dependent upon this one will be run after this job runs.
	DependentJobs []string `json:"dependent_jobs"`

	// List of ids of jobs that this job is dependent upon.
	ParentJobs []string `json:"parent_jobs"`

	// Job that gets run after all retries have failed consecutively
	OnFailureJob string `json:"on_failure_job"`

	// ISO 8601 String
	// e.g. "R/2014-03-08T20:00:00.000Z/PT2H"
	Schedule string `json:"schedule"`

	// Number of times to retry on failed attempt for each run.
	Retries uint `json:"retries"`

	// Duration in which it is safe to retry the Job.
	Epsilon string `json:"epsilon"`

	NextRunAt time.Time `json:"next_run_at"`

	// Templating delimiters, the left & right separated by space,
	// for example `{{ }}` or `${ }`.
	//
	// If this field is non-empty, then each time this
	// job is executed, Kala will template its main
	// content as a Go Template with the job itself as data.
	//
	// The Command is templated for local jobs,
	// and Url and Body in RemoteProperties.
	TemplateDelimiters string

	// If the job is disabled (or the system inoperative) and we pass
	// the scheduled run point, when the job becomes active again,
	// normally the job will run immediately.
	// With this setting on, it will not run immediately, but will wait
	// until the next scheduled run time comes along.
	ResumeAtNextScheduledTime bool `json:"resume_at_next_scheduled_time"`

	// Meta data about successful and failed runs.
	Metadata Metadata `json:"metadata"`

	// Type of the job
	JobType jobType `json:"type"`

	// Custom properties for the remote job type
	RemoteProperties RemoteProperties `json:"remote_properties"`

	// Says if a job has been executed right numbers of time
	// and should not been executed again in the future
	IsDone bool `json:"is_done"`
	// contains filtered or unexported fields
}

func GetMockFailingJob

func GetMockFailingJob() *Job

func GetMockJob

func GetMockJob() *Job

func GetMockJobWithGenericSchedule

func GetMockJobWithGenericSchedule(now time.Time) *Job

func GetMockJobWithSchedule

func GetMockJobWithSchedule(repeat int, scheduleTime time.Time, delay string) *Job

func GetMockRecurringJobWithSchedule

func GetMockRecurringJobWithSchedule(scheduleTime time.Time, delay string) *Job

func GetMockRemoteJob

func GetMockRemoteJob(props RemoteProperties) *Job

func NewFromBytes

func NewFromBytes(b []byte) (*Job, error)

NewFromBytes returns a Job instance from a byte representation.

func (Job) Bytes

func (j Job) Bytes() ([]byte, error)

Bytes returns the byte representation of the Job.

func (*Job) Delete

func (j *Job) Delete(cache JobCache) error

func (*Job) DeleteFromDependentJobs

func (j *Job) DeleteFromDependentJobs(cache JobCache) error

DeleteFromDependentJobs

func (*Job) DeleteFromParentJobs

func (j *Job) DeleteFromParentJobs(cache JobCache) error

DeleteFromParentJobs goes through and deletes the current job from any parent jobs.

func (*Job) Disable

func (j *Job) Disable(cache JobCache) error

Disable stops the job from running by stopping its jobTimer. It also sets Job.Disabled to true, which is reflected in the UI.

func (*Job) Enable

func (j *Job) Enable(cache JobCache) error

func (*Job) GetWaitDuration

func (j *Job) GetWaitDuration() time.Duration

func (*Job) Init

func (j *Job) Init(cache JobCache) error

Init fills in the protected fields and parses the iso8601 notation. It also adds the job to the Cache

func (*Job) InitDelayDuration

func (j *Job) InitDelayDuration(checkTime bool) error

InitDelayDuration is used to parsed the iso8601 Schedule notation into its relevant fields in the Job struct. If checkTime is true, then it will return an error if the Scheduled time has passed.

func (*Job) MarshalJSON

func (j *Job) MarshalJSON() ([]byte, error)

need this to fix race condition

func (*Job) Now

func (j *Job) Now() time.Time

func (*Job) ResponseTimeout

func (j *Job) ResponseTimeout() time.Duration

ResponseTimeout sets a default timeout if none specified

func (*Job) Run

func (j *Job) Run(cache JobCache)

func (*Job) RunCmd

func (j *Job) RunCmd() (string, error)

func (*Job) RunOnFailureJob

func (j *Job) RunOnFailureJob(cache JobCache)

Runs the on failure job, if it exists. Does not lock the parent job - it is up to you to do this however you want

func (*Job) SetClock

func (j *Job) SetClock(clk clock.Clock)

func (*Job) SetHeaders

func (j *Job) SetHeaders(req *http.Request, token string)

SetHeaders sets default and user specific headers to the http request

func (*Job) ShouldStartWaiting

func (j *Job) ShouldStartWaiting() bool

func (*Job) StartWaiting

func (j *Job) StartWaiting(cache JobCache, justRan bool)

StartWaiting begins a timer for when it should execute the Jobs .Run() method.

func (*Job) StopTimer

func (j *Job) StopTimer()

func (*Job) TryTemplatize

func (j *Job) TryTemplatize(content string) (string, error)

TryTemplatize returns a string based on a template using data defined in the Job definition.

type JobCache

type JobCache interface {
	Get(id string) (*Job, error)
	GetAll() *JobsMap
	Set(j *Job) error
	Delete(id string) error
	Persist() error
	Enable(j *Job) error
	Disable(j *Job) error
	SaveRun(run *JobStat) error
	UpdateRun(run *JobStat) error
	DeleteRun(runId string) error
	GetAllRuns(jobID string) ([]*JobStat, error)
	GetRun(runID string) (*JobStat, error)
	ClearExpiredRuns() error
}

type JobDB

type JobDB interface {
	GetAll() ([]*Job, error)
	Get(id string) (*Job, error)
	Delete(id string) error
	Save(job *Job) error
	Close() error
	SaveRun(*JobStat) error
	UpdateRun(*JobStat) error
	GetAllRuns(jobID string) ([]*JobStat, error)
	GetRun(runID string) (*JobStat, error)
	DeleteRun(jobId string) error
	ClearExpiredRuns() error
}

type JobRunner

type JobRunner struct {
	// contains filtered or unexported fields
}

func (*JobRunner) LocalRun

func (j *JobRunner) LocalRun() (string, error)

LocalRun executes the Job's local shell command

func (*JobRunner) RemoteRun

func (j *JobRunner) RemoteRun() (string, error)

RemoteRun sends a http request, and checks if the response is valid in time,

func (*JobRunner) Run

func (j *JobRunner) Run(cache JobCache) (*JobStat, Metadata, error)

Run calls the appropriate run function, collects metadata around the success or failure of the Job's execution, and schedules the next run.

type JobStat

type JobStat struct {
	Id                string        `json:"id"`
	JobId             string        `json:"job_id"`
	RanAt             time.Time     `json:"ran_at"`
	NumberOfRetries   uint          `json:"number_of_retries"`
	Status            JobStatus     `json:"status"`
	ExecutionDuration time.Duration `json:"execution_duration"`
	Output            string        `json:"output"`
}

JobStat is used to store metrics about a specific Job .Run()

func NewJobStat

func NewJobStat(jobId string) *JobStat

type JobStatus

type JobStatus string

type JobsMap

type JobsMap struct {
	Jobs map[string]*Job
	Lock sync.RWMutex
}

func NewJobsMap

func NewJobsMap() *JobsMap

type KalaStats

type KalaStats struct {
	ActiveJobs   int `json:"active_jobs"`
	DisabledJobs int `json:"disabled_jobs"`
	Jobs         int `json:"jobs"`

	ErrorCount   uint `json:"error_count"`
	SuccessCount uint `json:"success_count"`

	NextRunAt        time.Time `json:"next_run_at"`
	LastAttemptedRun time.Time `json:"last_attempted_run"`

	CreatedAt time.Time `json:"created"`
}

KalaStats is the struct for storing app-level metrics

func NewKalaStats

func NewKalaStats(cache JobCache) *KalaStats

NewKalaStats is used to easily generate a current app-level metrics report.

type LockFreeJobCache

type LockFreeJobCache struct {
	Clock
	// contains filtered or unexported fields
}

func NewLockFreeJobCache

func NewLockFreeJobCache(jobDB JobDB) *LockFreeJobCache

func NewMockCache

func NewMockCache() *LockFreeJobCache

func (*LockFreeJobCache) ClearExpiredRuns

func (c *LockFreeJobCache) ClearExpiredRuns() error

func (*LockFreeJobCache) Delete

func (c *LockFreeJobCache) Delete(id string) error

func (*LockFreeJobCache) DeleteRun

func (c *LockFreeJobCache) DeleteRun(runId string) error

func (*LockFreeJobCache) Disable

func (c *LockFreeJobCache) Disable(j *Job) error

Disable stops a job from running by stopping its jobTimer. It also sets Job.Disabled to true, which is reflected in the UI.

func (*LockFreeJobCache) Enable

func (c *LockFreeJobCache) Enable(j *Job) error

func (*LockFreeJobCache) Get

func (c *LockFreeJobCache) Get(id string) (*Job, error)

func (*LockFreeJobCache) GetAll

func (c *LockFreeJobCache) GetAll() *JobsMap

func (*LockFreeJobCache) GetAllRuns

func (c *LockFreeJobCache) GetAllRuns(jobID string) ([]*JobStat, error)

func (*LockFreeJobCache) GetRun

func (c *LockFreeJobCache) GetRun(runID string) (*JobStat, error)

func (*LockFreeJobCache) Persist

func (c *LockFreeJobCache) Persist() error

func (*LockFreeJobCache) SaveRun

func (c *LockFreeJobCache) SaveRun(run *JobStat) error

func (*LockFreeJobCache) Set

func (c *LockFreeJobCache) Set(j *Job) error

func (*LockFreeJobCache) Start

func (c *LockFreeJobCache) Start(jobstatTtl time.Duration)

func (*LockFreeJobCache) UpdateRun

func (c *LockFreeJobCache) UpdateRun(run *JobStat) error

type Mailer

type Mailer struct {
	// contains filtered or unexported fields
}

type MemoryDB

type MemoryDB struct {
	// contains filtered or unexported fields
}

func NewMemoryDB

func NewMemoryDB() *MemoryDB

func (*MemoryDB) ClearExpiredRuns

func (m *MemoryDB) ClearExpiredRuns() error

func (*MemoryDB) Close

func (m *MemoryDB) Close() error

func (*MemoryDB) Delete

func (m *MemoryDB) Delete(id string) error

func (*MemoryDB) DeleteRun

func (m *MemoryDB) DeleteRun(id string) error

func (*MemoryDB) Get

func (m *MemoryDB) Get(id string) (*Job, error)

func (*MemoryDB) GetAll

func (m *MemoryDB) GetAll() (ret []*Job, _ error)

func (*MemoryDB) GetAllRuns

func (m *MemoryDB) GetAllRuns(jobID string) (ret []*JobStat, _ error)

func (*MemoryDB) GetRun

func (m *MemoryDB) GetRun(runID string) (ret *JobStat, _ error)

func (*MemoryDB) Save

func (m *MemoryDB) Save(j *Job) error

func (*MemoryDB) SaveRun

func (m *MemoryDB) SaveRun(run *JobStat) error

func (*MemoryDB) UpdateRun

func (m *MemoryDB) UpdateRun(jobStat *JobStat) error

type MemoryJobCache

type MemoryJobCache struct {
	// contains filtered or unexported fields
}

func NewMemoryJobCache

func NewMemoryJobCache(jobDB JobDB) *MemoryJobCache

func (*MemoryJobCache) ClearExpiredRuns

func (c *MemoryJobCache) ClearExpiredRuns() error

func (*MemoryJobCache) Delete

func (c *MemoryJobCache) Delete(id string) error

func (*MemoryJobCache) DeleteRun

func (c *MemoryJobCache) DeleteRun(runId string) error

func (*MemoryJobCache) Disable

func (c *MemoryJobCache) Disable(j *Job) error

Disable stops a job from running by stopping its jobTimer. It also sets Job.Disabled to true, which is reflected in the UI.

func (*MemoryJobCache) Enable

func (c *MemoryJobCache) Enable(j *Job) error

func (*MemoryJobCache) Get

func (c *MemoryJobCache) Get(id string) (*Job, error)

func (*MemoryJobCache) GetAll

func (c *MemoryJobCache) GetAll() *JobsMap

func (*MemoryJobCache) GetAllRuns

func (c *MemoryJobCache) GetAllRuns(jobID string) ([]*JobStat, error)

func (*MemoryJobCache) GetRun

func (c *MemoryJobCache) GetRun(runID string) (*JobStat, error)

func (*MemoryJobCache) Persist

func (c *MemoryJobCache) Persist() error

func (*MemoryJobCache) SaveRun

func (c *MemoryJobCache) SaveRun(run *JobStat) error

func (*MemoryJobCache) Set

func (c *MemoryJobCache) Set(j *Job) error

func (*MemoryJobCache) Start

func (c *MemoryJobCache) Start()

func (*MemoryJobCache) UpdateRun

func (c *MemoryJobCache) UpdateRun(run *JobStat) error

type Metadata

type Metadata struct {
	SuccessCount         uint      `json:"success_count"`
	LastSuccess          time.Time `json:"last_success"`
	ErrorCount           uint      `json:"error_count"`
	LastError            time.Time `json:"last_error"`
	LastAttemptedRun     time.Time `json:"last_attempted_run"`
	NumberOfFinishedRuns uint      `json:"number_of_finished_runs"`
}

type MockDB

type MockDB struct {
	Runs map[string]*JobStat
}

func (*MockDB) ClearExpiredRuns

func (m *MockDB) ClearExpiredRuns() error

func (*MockDB) Close

func (m *MockDB) Close() error

func (*MockDB) Delete

func (m *MockDB) Delete(id string) error

func (*MockDB) DeleteRun

func (m *MockDB) DeleteRun(jobId string) error

func (*MockDB) Get

func (m *MockDB) Get(id string) (*Job, error)

func (*MockDB) GetAll

func (m *MockDB) GetAll() ([]*Job, error)

func (*MockDB) GetAllRuns

func (m *MockDB) GetAllRuns(jobID string) ([]*JobStat, error)

func (*MockDB) GetRun

func (m *MockDB) GetRun(runID string) (*JobStat, error)

func (*MockDB) Save

func (m *MockDB) Save(job *Job) error

func (*MockDB) SaveRun

func (m *MockDB) SaveRun(jobStat *JobStat) error

func (*MockDB) UpdateRun

func (m *MockDB) UpdateRun(jobStat *JobStat) error

type MockDBGetAll

type MockDBGetAll struct {
	MockDB
	// contains filtered or unexported fields
}

func (*MockDBGetAll) GetAll

func (d *MockDBGetAll) GetAll() ([]*Job, error)

type RJob

type RJob Job

Type alias for the recursive call

type RemoteProperties

type RemoteProperties struct {
	Url    string `json:"url"`
	Method string `json:"method"`

	// A body to attach to the http request
	Body string `json:"body"`

	// A list of headers to add to http request (e.g. [{"key": "charset", "value": "UTF-8"}])
	Headers http.Header `json:"headers"`

	// A timeout property for the http request in seconds
	Timeout int `json:"timeout"`

	// A list of expected response codes (e.g. [200, 201])
	ExpectedResponseCodes []int `json:"expected_response_codes"`
}

RemoteProperties Custom properties for the remote job type

Directories

Path Synopsis
storage

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL