Documentation ¶
Index ¶
- Constants
- Variables
- func AuthHandler(next http.Handler) http.Handler
- func DeleteAll(cache JobCache) error
- func GetJobToken(ctx context.Context) (string, error)
- func InitAuth()
- func InitMailer()
- func Notify(toAddress string, subject string, message string) error
- func NotifyOfJobFailure(j *Job, run *JobStat) error
- func SetupAuth(issuer string, audience string, clientId string, clientSecret string, ...)
- type Clock
- type Clocker
- type ErrJobNotFound
- type Job
- func GetMockFailingJob() *Job
- func GetMockJob() *Job
- func GetMockJobWithGenericSchedule(now time.Time) *Job
- func GetMockJobWithSchedule(repeat int, scheduleTime time.Time, delay string) *Job
- func GetMockRecurringJobWithSchedule(scheduleTime time.Time, delay string) *Job
- func GetMockRemoteJob(props RemoteProperties) *Job
- func NewFromBytes(b []byte) (*Job, error)
- func (j Job) Bytes() ([]byte, error)
- func (j *Job) Delete(cache JobCache) error
- func (j *Job) DeleteFromDependentJobs(cache JobCache) error
- func (j *Job) DeleteFromParentJobs(cache JobCache) error
- func (j *Job) Disable(cache JobCache) error
- func (j *Job) Enable(cache JobCache) error
- func (j *Job) GetWaitDuration() time.Duration
- func (j *Job) Init(cache JobCache) error
- func (j *Job) InitDelayDuration(checkTime bool) error
- func (j *Job) MarshalJSON() ([]byte, error)
- func (j *Job) Now() time.Time
- func (j *Job) ResponseTimeout() time.Duration
- func (j *Job) Run(cache JobCache)
- func (j *Job) RunCmd() (string, error)
- func (j *Job) RunOnFailureJob(cache JobCache)
- func (j *Job) SetClock(clk clock.Clock)
- func (j *Job) SetHeaders(req *http.Request, token string)
- func (j *Job) ShouldStartWaiting() bool
- func (j *Job) StartWaiting(cache JobCache, justRan bool)
- func (j *Job) StopTimer()
- func (j *Job) TryTemplatize(content string) (string, error)
- type JobCache
- type JobDB
- type JobRunner
- type JobStat
- type JobStatus
- type JobsMap
- type KalaStats
- type LockFreeJobCache
- func (c *LockFreeJobCache) ClearExpiredRuns() error
- func (c *LockFreeJobCache) Delete(id string) error
- func (c *LockFreeJobCache) DeleteRun(runId string) error
- func (c *LockFreeJobCache) Disable(j *Job) error
- func (c *LockFreeJobCache) Enable(j *Job) error
- func (c *LockFreeJobCache) Get(id string) (*Job, error)
- func (c *LockFreeJobCache) GetAll() *JobsMap
- func (c *LockFreeJobCache) GetAllRuns(jobID string) ([]*JobStat, error)
- func (c *LockFreeJobCache) GetRun(runID string) (*JobStat, error)
- func (c *LockFreeJobCache) Persist() error
- func (c *LockFreeJobCache) SaveRun(run *JobStat) error
- func (c *LockFreeJobCache) Set(j *Job) error
- func (c *LockFreeJobCache) Start(jobstatTtl time.Duration)
- func (c *LockFreeJobCache) UpdateRun(run *JobStat) error
- type Mailer
- type MemoryDB
- func (m *MemoryDB) ClearExpiredRuns() error
- func (m *MemoryDB) Close() error
- func (m *MemoryDB) Delete(id string) error
- func (m *MemoryDB) DeleteRun(id string) error
- func (m *MemoryDB) Get(id string) (*Job, error)
- func (m *MemoryDB) GetAll() (ret []*Job, _ error)
- func (m *MemoryDB) GetAllRuns(jobID string) (ret []*JobStat, _ error)
- func (m *MemoryDB) GetRun(runID string) (ret *JobStat, _ error)
- func (m *MemoryDB) Save(j *Job) error
- func (m *MemoryDB) SaveRun(run *JobStat) error
- func (m *MemoryDB) UpdateRun(jobStat *JobStat) error
- type MemoryJobCache
- func (c *MemoryJobCache) ClearExpiredRuns() error
- func (c *MemoryJobCache) Delete(id string) error
- func (c *MemoryJobCache) DeleteRun(runId string) error
- func (c *MemoryJobCache) Disable(j *Job) error
- func (c *MemoryJobCache) Enable(j *Job) error
- func (c *MemoryJobCache) Get(id string) (*Job, error)
- func (c *MemoryJobCache) GetAll() *JobsMap
- func (c *MemoryJobCache) GetAllRuns(jobID string) ([]*JobStat, error)
- func (c *MemoryJobCache) GetRun(runID string) (*JobStat, error)
- func (c *MemoryJobCache) Persist() error
- func (c *MemoryJobCache) SaveRun(run *JobStat) error
- func (c *MemoryJobCache) Set(j *Job) error
- func (c *MemoryJobCache) Start()
- func (c *MemoryJobCache) UpdateRun(run *JobStat) error
- type Metadata
- type MockDB
- func (m *MockDB) ClearExpiredRuns() error
- func (m *MockDB) Close() error
- func (m *MockDB) Delete(id string) error
- func (m *MockDB) DeleteRun(jobId string) error
- func (m *MockDB) Get(id string) (*Job, error)
- func (m *MockDB) GetAll() ([]*Job, error)
- func (m *MockDB) GetAllRuns(jobID string) ([]*JobStat, error)
- func (m *MockDB) GetRun(runID string) (*JobStat, error)
- func (m *MockDB) Save(job *Job) error
- func (m *MockDB) SaveRun(jobStat *JobStat) error
- func (m *MockDB) UpdateRun(jobStat *JobStat) error
- type MockDBGetAll
- type RJob
- type RemoteProperties
Constants ¶
const ( LocalJob jobType = iota RemoteJob )
const (
AccessTokenKey = contextKey("Access Token Key")
)
Variables ¶
var ( RFC3339WithoutTimezone = "2006-01-02T15:04:05" ErrInvalidJob = errors.New("Invalid Local Job. Job's must contain a Name and a Command field") ErrInvalidRemoteJob = errors.New("Invalid Remote Job. Job's must contain a Name and a url field") ErrInvalidJobType = errors.New("Invalid Job type. Types supported: 0 for local and 1 for remote") )
var ( ErrJobDisabled = errors.New("Job cannot run, as it is disabled") ErrCmdIsEmpty = errors.New("Job Command is empty.") ErrJobTypeInvalid = errors.New("Job Type is not valid.") ErrInvalidDelimiters = errors.New("Job has invalid templating delimiters.") )
var (
ErrJobDoesntExist = errors.New("The job you requested does not exist")
)
var (
Oauth2Config *oauth2.Config = nil
)
var ( Status = &jobStatus{ Started: JobStatus("Started"), Running: JobStatus("Running"), Failed: JobStatus("Failed"), Success: JobStatus("Success"), } )
Functions ¶
func InitMailer ¶
func InitMailer()
func NotifyOfJobFailure ¶
Types ¶
type ErrJobNotFound ¶
type ErrJobNotFound string
ErrJobNotFound is raised when a Job is unable to be found within a database.
func (ErrJobNotFound) Error ¶
func (id ErrJobNotFound) Error() string
type Job ¶
type Job struct { Name string `json:"name"` Id string `json:"id"` // Command to run // e.g. "bash /path/to/my/script.sh" Command string `json:"command"` // Email of the owner of this job // e.g. "admin@example.com" Owner string `json:"owner"` // Is this job disabled? Disabled bool `json:"disabled"` // Jobs that are dependent upon this one will be run after this job runs. DependentJobs []string `json:"dependent_jobs"` // List of ids of jobs that this job is dependent upon. ParentJobs []string `json:"parent_jobs"` // Job that gets run after all retries have failed consecutively OnFailureJob string `json:"on_failure_job"` // ISO 8601 String // e.g. "R/2014-03-08T20:00:00.000Z/PT2H" Schedule string `json:"schedule"` // Number of times to retry on failed attempt for each run. Retries uint `json:"retries"` // Duration in which it is safe to retry the Job. Epsilon string `json:"epsilon"` NextRunAt time.Time `json:"next_run_at"` // Templating delimiters, the left & right separated by space, // for example `{{ }}` or `${ }`. // // If this field is non-empty, then each time this // job is executed, Kala will template its main // content as a Go Template with the job itself as data. // // The Command is templated for local jobs, // and Url and Body in RemoteProperties. TemplateDelimiters string // If the job is disabled (or the system inoperative) and we pass // the scheduled run point, when the job becomes active again, // normally the job will run immediately. // With this setting on, it will not run immediately, but will wait // until the next scheduled run time comes along. ResumeAtNextScheduledTime bool `json:"resume_at_next_scheduled_time"` // Meta data about successful and failed runs. 
Metadata Metadata `json:"metadata"` // Type of the job JobType jobType `json:"type"` // Custom properties for the remote job type RemoteProperties RemoteProperties `json:"remote_properties"` // Says if a job has been executed the right number of times // and should not be executed again in the future IsDone bool `json:"is_done"` // contains filtered or unexported fields }
func GetMockFailingJob ¶
func GetMockFailingJob() *Job
func GetMockJob ¶
func GetMockJob() *Job
func GetMockJobWithSchedule ¶
func GetMockRemoteJob ¶
func GetMockRemoteJob(props RemoteProperties) *Job
func NewFromBytes ¶
NewFromBytes returns a Job instance from a byte representation.
func (*Job) DeleteFromDependentJobs ¶
DeleteFromDependentJobs goes through and deletes the current job from any dependent jobs.
func (*Job) DeleteFromParentJobs ¶
DeleteFromParentJobs goes through and deletes the current job from any parent jobs.
func (*Job) Disable ¶
Disable stops the job from running by stopping its jobTimer. It also sets Job.Disabled to true, which is reflected in the UI.
func (*Job) GetWaitDuration ¶
func (*Job) Init ¶
Init fills in the protected fields and parses the iso8601 notation. It also adds the job to the Cache.
func (*Job) InitDelayDuration ¶
InitDelayDuration is used to parse the iso8601 Schedule notation into its relevant fields in the Job struct. If checkTime is true, then it will return an error if the Scheduled time has passed.
func (*Job) MarshalJSON ¶
MarshalJSON is needed to fix a race condition.
func (*Job) ResponseTimeout ¶
ResponseTimeout sets a default timeout if none is specified.
func (*Job) RunOnFailureJob ¶
RunOnFailureJob runs the on-failure job, if it exists. It does not lock the parent job; it is up to the caller to do this however they want.
func (*Job) SetHeaders ¶
SetHeaders sets default and user specific headers to the http request
func (*Job) ShouldStartWaiting ¶
func (*Job) StartWaiting ¶
StartWaiting begins a timer for when it should execute the Job's Run() method.
type JobCache ¶
type JobCache interface { Get(id string) (*Job, error) GetAll() *JobsMap Set(j *Job) error Delete(id string) error Persist() error Enable(j *Job) error Disable(j *Job) error SaveRun(run *JobStat) error UpdateRun(run *JobStat) error DeleteRun(runId string) error GetAllRuns(jobID string) ([]*JobStat, error) GetRun(runID string) (*JobStat, error) ClearExpiredRuns() error }
type JobDB ¶
type JobDB interface { GetAll() ([]*Job, error) Get(id string) (*Job, error) Delete(id string) error Save(job *Job) error Close() error SaveRun(*JobStat) error UpdateRun(*JobStat) error GetAllRuns(jobID string) ([]*JobStat, error) GetRun(runID string) (*JobStat, error) DeleteRun(jobId string) error ClearExpiredRuns() error }
type JobRunner ¶
type JobRunner struct {
// contains filtered or unexported fields
}
type JobStat ¶
type JobStat struct { Id string `json:"id"` JobId string `json:"job_id"` RanAt time.Time `json:"ran_at"` NumberOfRetries uint `json:"number_of_retries"` Status JobStatus `json:"status"` ExecutionDuration time.Duration `json:"execution_duration"` Output string `json:"output"` }
JobStat is used to store metrics about a specific call to a Job's Run() method.
func NewJobStat ¶
type JobsMap ¶
func NewJobsMap ¶
func NewJobsMap() *JobsMap
type KalaStats ¶
type KalaStats struct { ActiveJobs int `json:"active_jobs"` DisabledJobs int `json:"disabled_jobs"` Jobs int `json:"jobs"` ErrorCount uint `json:"error_count"` SuccessCount uint `json:"success_count"` NextRunAt time.Time `json:"next_run_at"` LastAttemptedRun time.Time `json:"last_attempted_run"` CreatedAt time.Time `json:"created"` }
KalaStats is the struct for storing app-level metrics
func NewKalaStats ¶
NewKalaStats is used to easily generate a current app-level metrics report.
type LockFreeJobCache ¶
type LockFreeJobCache struct { Clock // contains filtered or unexported fields }
func NewLockFreeJobCache ¶
func NewLockFreeJobCache(jobDB JobDB) *LockFreeJobCache
func NewMockCache ¶
func NewMockCache() *LockFreeJobCache
func (*LockFreeJobCache) ClearExpiredRuns ¶
func (c *LockFreeJobCache) ClearExpiredRuns() error
func (*LockFreeJobCache) Delete ¶
func (c *LockFreeJobCache) Delete(id string) error
func (*LockFreeJobCache) DeleteRun ¶
func (c *LockFreeJobCache) DeleteRun(runId string) error
func (*LockFreeJobCache) Disable ¶
func (c *LockFreeJobCache) Disable(j *Job) error
Disable stops a job from running by stopping its jobTimer. It also sets Job.Disabled to true, which is reflected in the UI.
func (*LockFreeJobCache) Enable ¶
func (c *LockFreeJobCache) Enable(j *Job) error
func (*LockFreeJobCache) GetAll ¶
func (c *LockFreeJobCache) GetAll() *JobsMap
func (*LockFreeJobCache) GetAllRuns ¶
func (c *LockFreeJobCache) GetAllRuns(jobID string) ([]*JobStat, error)
func (*LockFreeJobCache) Persist ¶
func (c *LockFreeJobCache) Persist() error
func (*LockFreeJobCache) SaveRun ¶
func (c *LockFreeJobCache) SaveRun(run *JobStat) error
func (*LockFreeJobCache) Set ¶
func (c *LockFreeJobCache) Set(j *Job) error
func (*LockFreeJobCache) Start ¶
func (c *LockFreeJobCache) Start(jobstatTtl time.Duration)
func (*LockFreeJobCache) UpdateRun ¶
func (c *LockFreeJobCache) UpdateRun(run *JobStat) error
type MemoryDB ¶
type MemoryDB struct {
// contains filtered or unexported fields
}
func NewMemoryDB ¶
func NewMemoryDB() *MemoryDB
func (*MemoryDB) ClearExpiredRuns ¶
type MemoryJobCache ¶
type MemoryJobCache struct {
// contains filtered or unexported fields
}
func NewMemoryJobCache ¶
func NewMemoryJobCache(jobDB JobDB) *MemoryJobCache
func (*MemoryJobCache) ClearExpiredRuns ¶
func (c *MemoryJobCache) ClearExpiredRuns() error
func (*MemoryJobCache) Delete ¶
func (c *MemoryJobCache) Delete(id string) error
func (*MemoryJobCache) DeleteRun ¶
func (c *MemoryJobCache) DeleteRun(runId string) error
func (*MemoryJobCache) Disable ¶
func (c *MemoryJobCache) Disable(j *Job) error
Disable stops a job from running by stopping its jobTimer. It also sets Job.Disabled to true, which is reflected in the UI.
func (*MemoryJobCache) Enable ¶
func (c *MemoryJobCache) Enable(j *Job) error
func (*MemoryJobCache) GetAll ¶
func (c *MemoryJobCache) GetAll() *JobsMap
func (*MemoryJobCache) GetAllRuns ¶
func (c *MemoryJobCache) GetAllRuns(jobID string) ([]*JobStat, error)
func (*MemoryJobCache) Persist ¶
func (c *MemoryJobCache) Persist() error
func (*MemoryJobCache) SaveRun ¶
func (c *MemoryJobCache) SaveRun(run *JobStat) error
func (*MemoryJobCache) Set ¶
func (c *MemoryJobCache) Set(j *Job) error
func (*MemoryJobCache) Start ¶
func (c *MemoryJobCache) Start()
func (*MemoryJobCache) UpdateRun ¶
func (c *MemoryJobCache) UpdateRun(run *JobStat) error
type Metadata ¶
type Metadata struct { SuccessCount uint `json:"success_count"` LastSuccess time.Time `json:"last_success"` ErrorCount uint `json:"error_count"` LastError time.Time `json:"last_error"` LastAttemptedRun time.Time `json:"last_attempted_run"` NumberOfFinishedRuns uint `json:"number_of_finished_runs"` }
type MockDB ¶
func (*MockDB) ClearExpiredRuns ¶
type MockDBGetAll ¶
type MockDBGetAll struct { MockDB // contains filtered or unexported fields }
func (*MockDBGetAll) GetAll ¶
func (d *MockDBGetAll) GetAll() ([]*Job, error)
type RemoteProperties ¶
type RemoteProperties struct { Url string `json:"url"` Method string `json:"method"` // A body to attach to the http request Body string `json:"body"` // A list of headers to add to http request (e.g. [{"key": "charset", "value": "UTF-8"}]) Headers http.Header `json:"headers"` // A timeout property for the http request in seconds Timeout int `json:"timeout"` // A list of expected response codes (e.g. [200, 201]) ExpectedResponseCodes []int `json:"expected_response_codes"` }
RemoteProperties holds custom properties for the remote job type.