kala: github.com/ajvb/kala/job Index | Files | Directories

package job

import "github.com/ajvb/kala/job"

Index

Package Files

cache.go cache_util.go clock.go db.go job.go runner.go stats.go test_utils.go

Constants

const (
    LocalJob jobType = iota
    RemoteJob
)

Variables

var (
    RFC3339WithoutTimezone = "2006-01-02T15:04:05"

    ErrInvalidJob       = errors.New("Invalid Local Job. Job's must contain a Name and a Command field")
    ErrInvalidRemoteJob = errors.New("Invalid Remote Job. Job's must contain a Name and a url field")
    ErrInvalidJobType   = errors.New("Invalid Job type. Types supported: 0 for local and 1 for remote")
)
var (
    ErrJobDisabled       = errors.New("Job cannot run, as it is disabled")
    ErrCmdIsEmpty        = errors.New("Job Command is empty.")
    ErrJobTypeInvalid    = errors.New("Job Type is not valid.")
    ErrInvalidDelimiters = errors.New("Job has invalid templating delimiters.")
)
var (
    ErrJobDoesntExist = errors.New("The job you requested does not exist")
)

func DeleteAll Uses

func DeleteAll(cache JobCache) error

type Clock Uses

type Clock struct {
    clock.Clock
    // contains filtered or unexported fields
}

func (*Clock) SetClock Uses

func (clk *Clock) SetClock(in clock.Clock)

func (*Clock) Time Uses

func (clk *Clock) Time() clock.Clock

func (*Clock) TimeSet Uses

func (clk *Clock) TimeSet() (result bool)

type Clocker Uses

type Clocker interface {
    Time() clock.Clock
    TimeSet() bool
}

type ErrJobNotFound Uses

type ErrJobNotFound string

ErrJobNotFound is raised when a Job is unable to be found within a database.

func (ErrJobNotFound) Error Uses

func (id ErrJobNotFound) Error() string

type Job Uses

type Job struct {
    Name string `json:"name"`
    Id   string `json:"id"`

    // Command to run
    // e.g. "bash /path/to/my/script.sh"
    Command string `json:"command"`

    // Email of the owner of this job
    // e.g. "admin@example.com"
    Owner string `json:"owner"`

    // Is this job disabled?
    Disabled bool `json:"disabled"`

    // Jobs that are dependent upon this one will be run after this job runs.
    DependentJobs []string `json:"dependent_jobs"`

    // List of ids of jobs that this job is dependent upon.
    ParentJobs []string `json:"parent_jobs"`

    // Job that gets run after all retries have failed consecutively
    OnFailureJob string `json:"on_failure_job"`

    // ISO 8601 String
    // e.g. "R/2014-03-08T20:00:00.000Z/PT2H"
    Schedule string `json:"schedule"`

    // Number of times to retry on failed attempt for each run.
    Retries uint `json:"retries"`

    // Duration in which it is safe to retry the Job.
    Epsilon string `json:"epsilon"`

    NextRunAt time.Time `json:"next_run_at"`

    // Templating delimiters, the left & right separated by space,
    // for example `{{ }}` or `${ }`.
    //
    // If this field is non-empty, then each time this
    // job is executed, Kala will template its main
    // content as a Go Template with the job itself as data.
    //
    // The Command is templated for local jobs,
    // and Url and Body in RemoteProperties.
    TemplateDelimiters string

    // If the job is disabled (or the system inoperative) and we pass
    // the scheduled run point, when the job becomes active again,
    // normally the job will run immediately.
    // With this setting on, it will not run immediately, but will wait
    // until the next scheduled run time comes along.
    ResumeAtNextScheduledTime bool `json:"resume_at_next_scheduled_time"`

    // Meta data about successful and failed runs.
    Metadata Metadata `json:"metadata"`

    // Type of the job
    JobType jobType `json:"type"`

    // Custom properties for the remote job type
    RemoteProperties RemoteProperties `json:"remote_properties"`

    // Collection of Job Stats
    Stats []*JobStat `json:"stats"`

    // Says if a job has been executed the right number of times
    // and should not be executed again in the future
    IsDone bool `json:"is_done"`
    // contains filtered or unexported fields
}

func GetMockFailingJob Uses

func GetMockFailingJob() *Job

func GetMockJob Uses

func GetMockJob() *Job

func GetMockJobWithGenericSchedule Uses

func GetMockJobWithGenericSchedule(now time.Time) *Job

func GetMockJobWithSchedule Uses

func GetMockJobWithSchedule(repeat int, scheduleTime time.Time, delay string) *Job

func GetMockRecurringJobWithSchedule Uses

func GetMockRecurringJobWithSchedule(scheduleTime time.Time, delay string) *Job

func GetMockRemoteJob Uses

func GetMockRemoteJob(props RemoteProperties) *Job

func NewFromBytes Uses

func NewFromBytes(b []byte) (*Job, error)

NewFromBytes returns a Job instance from a byte representation.

func (Job) Bytes Uses

func (j Job) Bytes() ([]byte, error)

Bytes returns the byte representation of the Job.

func (*Job) Delete Uses

func (j *Job) Delete(cache JobCache) error

func (*Job) DeleteFromDependentJobs Uses

func (j *Job) DeleteFromDependentJobs(cache JobCache) error

DeleteFromDependentJobs

func (*Job) DeleteFromParentJobs Uses

func (j *Job) DeleteFromParentJobs(cache JobCache) error

DeleteFromParentJobs goes through and deletes the current job from any parent jobs.

func (*Job) Disable Uses

func (j *Job) Disable(cache JobCache) error

Disable stops the job from running by stopping its jobTimer. It also sets Job.Disabled to true, which is reflected in the UI.

func (*Job) Enable Uses

func (j *Job) Enable(cache JobCache) error

func (*Job) GetWaitDuration Uses

func (j *Job) GetWaitDuration() time.Duration

func (*Job) Init Uses

func (j *Job) Init(cache JobCache) error

Init fills in the protected fields and parses the ISO 8601 notation. It also adds the job to the Cache.

func (*Job) InitDelayDuration Uses

func (j *Job) InitDelayDuration(checkTime bool) error

InitDelayDuration is used to parse the ISO 8601 Schedule notation into its relevant fields in the Job struct. If checkTime is true, then it will return an error if the scheduled time has passed.

func (*Job) MarshalJSON Uses

func (j *Job) MarshalJSON() ([]byte, error)

MarshalJSON is needed to fix a race condition.

func (*Job) Run Uses

func (j *Job) Run(cache JobCache)

func (*Job) RunCmd Uses

func (j *Job) RunCmd() (string, error)

func (*Job) RunOnFailureJob Uses

func (j *Job) RunOnFailureJob(cache JobCache)

RunOnFailureJob runs the on-failure job, if it exists. It does not lock the parent job; it is up to the caller to do this however they want.

func (*Job) SetClock Uses

func (j *Job) SetClock(clk clock.Clock)

func (*Job) ShouldStartWaiting Uses

func (j *Job) ShouldStartWaiting() bool

func (*Job) StartWaiting Uses

func (j *Job) StartWaiting(cache JobCache, justRan bool)

StartWaiting begins a timer for when it should execute the Job's Run() method.

func (*Job) StopTimer Uses

func (j *Job) StopTimer()

type JobCache Uses

type JobCache interface {
    Get(id string) (*Job, error)
    GetAll() *JobsMap
    Set(j *Job) error
    Delete(id string) error
    Persist() error
    Enable(j *Job) error
    Disable(j *Job) error
}

type JobDB Uses

type JobDB interface {
    GetAll() ([]*Job, error)
    Get(id string) (*Job, error)
    Delete(id string) error
    Save(job *Job) error
    Close() error
}

type JobRunner Uses

type JobRunner struct {
    // contains filtered or unexported fields
}

func (*JobRunner) LocalRun Uses

func (j *JobRunner) LocalRun() (string, error)

LocalRun executes the Job's local shell command

func (*JobRunner) RemoteRun Uses

func (j *JobRunner) RemoteRun() (string, error)

RemoteRun sends an HTTP request and checks whether the response is valid in time.

func (*JobRunner) Run Uses

func (j *JobRunner) Run(cache JobCache) (*JobStat, Metadata, error)

Run calls the appropriate run function, collects metadata around the success or failure of the Job's execution, and schedules the next run.

type JobStat Uses

type JobStat struct {
    JobId             string        `json:"job_id"`
    RanAt             time.Time     `json:"ran_at"`
    NumberOfRetries   uint          `json:"number_of_retries"`
    Success           bool          `json:"success"`
    ExecutionDuration time.Duration `json:"execution_duration"`
}

JobStat is used to store metrics about a specific Job.Run() call.

func GetMockJobStats Uses

func GetMockJobStats(oldDate time.Time, count int) []*JobStat

func NewJobStat Uses

func NewJobStat(id string) *JobStat

type JobsMap Uses

type JobsMap struct {
    Jobs map[string]*Job
    Lock sync.RWMutex
}

func NewJobsMap Uses

func NewJobsMap() *JobsMap

type KalaStats Uses

type KalaStats struct {
    ActiveJobs   int `json:"active_jobs"`
    DisabledJobs int `json:"disabled_jobs"`
    Jobs         int `json:"jobs"`

    ErrorCount   uint `json:"error_count"`
    SuccessCount uint `json:"success_count"`

    NextRunAt        time.Time `json:"next_run_at"`
    LastAttemptedRun time.Time `json:"last_attempted_run"`

    CreatedAt time.Time `json:"created"`
}

KalaStats is the struct for storing app-level metrics

func NewKalaStats Uses

func NewKalaStats(cache JobCache) *KalaStats

NewKalaStats is used to easily generate a current app-level metrics report.

type LockFreeJobCache Uses

type LockFreeJobCache struct {
    PersistOnWrite bool
    Clock
    // contains filtered or unexported fields
}

func NewLockFreeJobCache Uses

func NewLockFreeJobCache(jobDB JobDB) *LockFreeJobCache

func NewMockCache Uses

func NewMockCache() *LockFreeJobCache

func (*LockFreeJobCache) Delete Uses

func (c *LockFreeJobCache) Delete(id string) error

func (*LockFreeJobCache) Disable Uses

func (c *LockFreeJobCache) Disable(j *Job) error

Disable stops a job from running by stopping its jobTimer. It also sets Job.Disabled to true, which is reflected in the UI.

func (*LockFreeJobCache) Enable Uses

func (c *LockFreeJobCache) Enable(j *Job) error

func (*LockFreeJobCache) Get Uses

func (c *LockFreeJobCache) Get(id string) (*Job, error)

func (*LockFreeJobCache) GetAll Uses

func (c *LockFreeJobCache) GetAll() *JobsMap

func (*LockFreeJobCache) Persist Uses

func (c *LockFreeJobCache) Persist() error

func (*LockFreeJobCache) PersistEvery Uses

func (c *LockFreeJobCache) PersistEvery(persistWaitTime time.Duration)

func (*LockFreeJobCache) Retain Uses

func (c *LockFreeJobCache) Retain() error

func (*LockFreeJobCache) RetainEvery Uses

func (c *LockFreeJobCache) RetainEvery(retentionWaitTime time.Duration)

func (*LockFreeJobCache) Set Uses

func (c *LockFreeJobCache) Set(j *Job) error

func (*LockFreeJobCache) Start Uses

func (c *LockFreeJobCache) Start(persistWaitTime time.Duration, jobstatTtl time.Duration)

type MemoryDB Uses

type MemoryDB struct {
    // contains filtered or unexported fields
}

func NewMemoryDB Uses

func NewMemoryDB() *MemoryDB

func (*MemoryDB) Close Uses

func (m *MemoryDB) Close() error

func (*MemoryDB) Delete Uses

func (m *MemoryDB) Delete(id string) error

func (*MemoryDB) Get Uses

func (m *MemoryDB) Get(id string) (*Job, error)

func (*MemoryDB) GetAll Uses

func (m *MemoryDB) GetAll() (ret []*Job, _ error)

func (*MemoryDB) Save Uses

func (m *MemoryDB) Save(j *Job) error

type MemoryJobCache Uses

type MemoryJobCache struct {
    PersistOnWrite bool
    // contains filtered or unexported fields
}

func NewMemoryJobCache Uses

func NewMemoryJobCache(jobDB JobDB) *MemoryJobCache

func (*MemoryJobCache) Delete Uses

func (c *MemoryJobCache) Delete(id string) error

func (*MemoryJobCache) Disable Uses

func (c *MemoryJobCache) Disable(j *Job) error

Disable stops a job from running by stopping its jobTimer. It also sets Job.Disabled to true, which is reflected in the UI.

func (*MemoryJobCache) Enable Uses

func (c *MemoryJobCache) Enable(j *Job) error

func (*MemoryJobCache) Get Uses

func (c *MemoryJobCache) Get(id string) (*Job, error)

func (*MemoryJobCache) GetAll Uses

func (c *MemoryJobCache) GetAll() *JobsMap

func (*MemoryJobCache) Persist Uses

func (c *MemoryJobCache) Persist() error

func (*MemoryJobCache) PersistEvery Uses

func (c *MemoryJobCache) PersistEvery(persistWaitTime time.Duration)

func (*MemoryJobCache) Set Uses

func (c *MemoryJobCache) Set(j *Job) error

func (*MemoryJobCache) Start Uses

func (c *MemoryJobCache) Start(persistWaitTime time.Duration)

type Metadata Uses

type Metadata struct {
    SuccessCount         uint      `json:"success_count"`
    LastSuccess          time.Time `json:"last_success"`
    ErrorCount           uint      `json:"error_count"`
    LastError            time.Time `json:"last_error"`
    LastAttemptedRun     time.Time `json:"last_attempted_run"`
    NumberOfFinishedRuns uint      `json:"number_of_finished_runs"`
}

type MockDB Uses

type MockDB struct{}

func (*MockDB) Close Uses

func (m *MockDB) Close() error

func (*MockDB) Delete Uses

func (m *MockDB) Delete(id string) error

func (*MockDB) Get Uses

func (m *MockDB) Get(id string) (*Job, error)

func (*MockDB) GetAll Uses

func (m *MockDB) GetAll() ([]*Job, error)

func (*MockDB) Save Uses

func (m *MockDB) Save(job *Job) error

type MockDBGetAll Uses

type MockDBGetAll struct {
    MockDB
    // contains filtered or unexported fields
}

func (*MockDBGetAll) GetAll Uses

func (d *MockDBGetAll) GetAll() ([]*Job, error)

type RJob Uses

type RJob Job

RJob is a defined type with Job as its underlying type, used for the recursive call.

type RemoteProperties Uses

type RemoteProperties struct {
    Url    string `json:"url"`
    Method string `json:"method"`

    // A body to attach to the http request
    Body string `json:"body"`

    // Headers to add to the http request, keyed by header name (e.g. {"charset": ["UTF-8"]})
    Headers http.Header `json:"headers"`

    // A timeout property for the http request in seconds
    Timeout int `json:"timeout"`

    // A list of expected response codes (e.g. [200, 201])
    ExpectedResponseCodes []int `json:"expected_response_codes"`
}

RemoteProperties holds custom properties for the remote job type.

Directories

Path | Synopsis
storage/boltdb
storage/consul
storage/mongo
storage/mysql
storage/postgres
storage/redis

Package job imports 24 packages (graph) and is imported by 29 packages. Updated 2020-07-09. Refresh now. Tools for package owners.