import "github.com/ajvb/kala/job"
cache.go cache_util.go clock.go db.go job.go runner.go stats.go test_utils.go
var ( RFC3339WithoutTimezone = "2006-01-02T15:04:05" ErrInvalidJob = errors.New("Invalid Local Job. Job's must contain a Name and a Command field") ErrInvalidRemoteJob = errors.New("Invalid Remote Job. Job's must contain a Name and a url field") ErrInvalidJobType = errors.New("Invalid Job type. Types supported: 0 for local and 1 for remote") )
var ( ErrJobDisabled = errors.New("Job cannot run, as it is disabled") ErrCmdIsEmpty = errors.New("Job Command is empty.") ErrJobTypeInvalid = errors.New("Job Type is not valid.") ErrInvalidDelimiters = errors.New("Job has invalid templating delimiters.") )
ErrJobNotFound is raised when a Job is unable to be found within a database.
func (id ErrJobNotFound) Error() string
type Job struct { Name string `json:"name"` Id string `json:"id"` // Command to run // e.g. "bash /path/to/my/script.sh" Command string `json:"command"` // Email of the owner of this job // e.g. "admin@example.com" Owner string `json:"owner"` // Is this job disabled? Disabled bool `json:"disabled"` // Jobs that are dependent upon this one will be run after this job runs. DependentJobs []string `json:"dependent_jobs"` // List of ids of jobs that this job is dependent upon. ParentJobs []string `json:"parent_jobs"` // Job that gets run after all retries have failed consecutively OnFailureJob string `json:"on_failure_job"` // ISO 8601 String // e.g. "R/2014-03-08T20:00:00.000Z/PT2H" Schedule string `json:"schedule"` // Number of times to retry on failed attempt for each run. Retries uint `json:"retries"` // Duration in which it is safe to retry the Job. Epsilon string `json:"epsilon"` NextRunAt time.Time `json:"next_run_at"` // Templating delimiters, the left & right separated by space, // for example `{{ }}` or `${ }`. // // If this field is non-empty, then each time this // job is executed, Kala will template its main // content as a Go Template with the job itself as data. // // The Command is templated for local jobs, // and Url and Body in RemoteProperties. TemplateDelimiters string // If the job is disabled (or the system inoperative) and we pass // the scheduled run point, when the job becomes active again, // normally the job will run immediately. // With this setting on, it will not run immediately, but will wait // until the next scheduled run time comes along. ResumeAtNextScheduledTime bool `json:"resume_at_next_scheduled_time"` // Meta data about successful and failed runs. 
Metadata Metadata `json:"metadata"` // Type of the job JobType jobType `json:"type"` // Custom properties for the remote job type RemoteProperties RemoteProperties `json:"remote_properties"` // Collection of Job Stats Stats []*JobStat `json:"stats"` // Says if a job has been executed the right number of times // and should not be executed again in the future IsDone bool `json:"is_done"` // contains filtered or unexported fields }
func GetMockRemoteJob(props RemoteProperties) *Job
NewFromBytes returns a Job instance from a byte representation.
Bytes returns the byte representation of the Job.
DeleteFromDependentJobs
DeleteFromParentJobs goes through and deletes the current job from any parent jobs.
Disable stops the job from running by stopping its jobTimer. It also sets Job.Disabled to true, which is reflected in the UI.
Init fills in the protected fields and parses the ISO 8601 notation. It also adds the job to the Cache.
InitDelayDuration is used to parse the ISO 8601 Schedule notation into its relevant fields in the Job struct. If checkTime is true, then it will return an error if the Scheduled time has passed.
need this to fix race condition
Runs the on failure job, if it exists. Does not lock the parent job - it is up to you to do this however you want
StartWaiting begins a timer for when it should execute the Job's .Run() method.
type JobCache interface { Get(id string) (*Job, error) GetAll() *JobsMap Set(j *Job) error Delete(id string) error Persist() error Enable(j *Job) error Disable(j *Job) error }
type JobDB interface { GetAll() ([]*Job, error) Get(id string) (*Job, error) Delete(id string) error Save(job *Job) error Close() error }
type JobRunner struct {
// contains filtered or unexported fields
}
LocalRun executes the Job's local shell command
RemoteRun sends an HTTP request and checks if the response is valid in time.
Run calls the appropriate run function, collects metadata around the success or failure of the Job's execution, and schedules the next run.
type JobStat struct { JobId string `json:"job_id"` RanAt time.Time `json:"ran_at"` NumberOfRetries uint `json:"number_of_retries"` Success bool `json:"success"` ExecutionDuration time.Duration `json:"execution_duration"` }
JobStat is used to store metrics about a specific Job .Run()
type KalaStats struct { ActiveJobs int `json:"active_jobs"` DisabledJobs int `json:"disabled_jobs"` Jobs int `json:"jobs"` ErrorCount uint `json:"error_count"` SuccessCount uint `json:"success_count"` NextRunAt time.Time `json:"next_run_at"` LastAttemptedRun time.Time `json:"last_attempted_run"` CreatedAt time.Time `json:"created"` }
KalaStats is the struct for storing app-level metrics
NewKalaStats is used to easily generate a current app-level metrics report.
type LockFreeJobCache struct { PersistOnWrite bool Clock // contains filtered or unexported fields }
func NewLockFreeJobCache(jobDB JobDB) *LockFreeJobCache
func NewMockCache() *LockFreeJobCache
func (c *LockFreeJobCache) Delete(id string) error
func (c *LockFreeJobCache) Disable(j *Job) error
Disable stops a job from running by stopping its jobTimer. It also sets Job.Disabled to true, which is reflected in the UI.
func (c *LockFreeJobCache) Enable(j *Job) error
func (c *LockFreeJobCache) Get(id string) (*Job, error)
func (c *LockFreeJobCache) GetAll() *JobsMap
func (c *LockFreeJobCache) Persist() error
func (c *LockFreeJobCache) PersistEvery(persistWaitTime time.Duration)
func (c *LockFreeJobCache) Retain() error
func (c *LockFreeJobCache) RetainEvery(retentionWaitTime time.Duration)
func (c *LockFreeJobCache) Set(j *Job) error
type MemoryDB struct {
// contains filtered or unexported fields
}
func NewMemoryJobCache(jobDB JobDB) *MemoryJobCache
func (c *MemoryJobCache) Delete(id string) error
func (c *MemoryJobCache) Disable(j *Job) error
Disable stops a job from running by stopping its jobTimer. It also sets Job.Disabled to true, which is reflected in the UI.
func (c *MemoryJobCache) Enable(j *Job) error
func (c *MemoryJobCache) Get(id string) (*Job, error)
func (c *MemoryJobCache) GetAll() *JobsMap
func (c *MemoryJobCache) Persist() error
func (c *MemoryJobCache) PersistEvery(persistWaitTime time.Duration)
func (c *MemoryJobCache) Set(j *Job) error
func (c *MemoryJobCache) Start(persistWaitTime time.Duration)
type Metadata struct { SuccessCount uint `json:"success_count"` LastSuccess time.Time `json:"last_success"` ErrorCount uint `json:"error_count"` LastError time.Time `json:"last_error"` LastAttemptedRun time.Time `json:"last_attempted_run"` NumberOfFinishedRuns uint `json:"number_of_finished_runs"` }
type MockDB struct{}
func (d *MockDBGetAll) GetAll() ([]*Job, error)
Type alias for the recursive call
type RemoteProperties struct { Url string `json:"url"` Method string `json:"method"` // A body to attach to the http request Body string `json:"body"` // A list of headers to add to http request (e.g. [{"key": "charset", "value": "UTF-8"}]) Headers http.Header `json:"headers"` // A timeout property for the http request in seconds Timeout int `json:"timeout"` // A list of expected response codes (e.g. [200, 201]) ExpectedResponseCodes []int `json:"expected_response_codes"` }
RemoteProperties holds custom properties for the remote job type.
Path | Synopsis |
---|---|
storage/boltdb | |
storage/consul | |
storage/mongo | |
storage/mysql | |
storage/postgres | |
storage/redis |
Package job imports 24 packages (graph) and is imported by 29 packages. Updated 2020-07-09. Refresh now. Tools for package owners.