Documentation ¶
Overview ¶
Package tracker tracks the status of all jobs and handles persistence.
Concurrency properties:
- The job map is protected by a Mutex, but the lock is only required to get a copy of a Status value or to set one, so contention is minimal.
- Status objects are persisted to a Saver by a separate goroutine that periodically updates any modified Status objects. The Status's updatetime is used to determine whether it needs to be saved.
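The two properties above can be sketched as follows. This is a minimal illustration, not the package's implementation: the `status` and `store` types, the `UpdateTime` field, and the `modifiedSince` helper are hypothetical stand-ins, chosen only to show short critical sections and update-time-based change detection.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// status is a hypothetical stand-in for tracker.Status.
type status struct {
	Detail     string
	UpdateTime time.Time // assumed field, used only for change detection
}

// store is a hypothetical stand-in for the tracker's job map.
type store struct {
	mu   sync.Mutex
	jobs map[string]status
}

// get returns a copy of the status; the lock is held only for the map read.
func (s *store) get(key string) (status, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	st, ok := s.jobs[key]
	return st, ok
}

// set stores a value; the lock is held only for the map write.
func (s *store) set(key string, st status) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.jobs[key] = st
}

// modifiedSince returns the keys whose UpdateTime is after lastSave, which
// is one way a saver goroutine could decide what to persist.
func (s *store) modifiedSince(lastSave time.Time) []string {
	s.mu.Lock()
	defer s.mu.Unlock()
	var keys []string
	for k, st := range s.jobs {
		if st.UpdateTime.After(lastSave) {
			keys = append(keys, k)
		}
	}
	return keys
}

// demo stores two entries and reports which one changed after the last save.
func demo() []string {
	s := &store{jobs: map[string]status{}}
	t0 := time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC)
	s.set("a", status{Detail: "old", UpdateTime: t0})
	s.set("b", status{Detail: "new", UpdateTime: t0.Add(2 * time.Minute)})
	return s.modifiedSince(t0.Add(time.Minute))
}

func main() {
	fmt.Println(demo()) // prints [b]
}
```

Because callers only ever hold copies, long-running work on a status never blocks other goroutines from reading or writing the map.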
Index ¶
- Variables
- func YesterdayDate() time.Time
- type Datasets
- type GenericSaver
- type Handler
- type Job
- func (j Job) HasFiles(ctx context.Context, sClient stiface.Client) (bool, error)
- func (j Job) IsDaily() string
- func (j Job) Key() Key
- func (j Job) Path() string
- func (j Job) Prefix() (string, error)
- func (j Job) PrefixStats(ctx context.Context, sClient stiface.Client) ([]*storage.ObjectAttrs, int64, error)
- func (j Job) String() string
- func (j *Job) TablePartition() string
- type JobMap
- type JobService
- type JobWithTarget
- type Key
- type State
- type StateInfo
- type Status
- func (s *Status) Detail() string
- func (s *Status) DetailTime() time.Time
- func (s *Status) Elapsed() time.Duration
- func (s *Status) Error() string
- func (s *Status) Label() string
- func (s *Status) LastStateInfo() StateInfo
- func (s *Status) NewState(state State) StateInfo
- func (s *Status) Prev() State
- func (s *Status) SetDetail(detail string) StateInfo
- func (s *Status) StartTime() time.Time
- func (s *Status) State() State
- func (s *Status) StateChangeTime() time.Time
- type Tracker
- func (tr *Tracker) AddJob(job Job) error
- func (tr *Tracker) GetState() (JobMap, Job, time.Time)
- func (tr *Tracker) GetStatus(key Key) (Status, error)
- func (tr *Tracker) Heartbeat(key Key) error
- func (tr *Tracker) NumFailed() int
- func (tr *Tracker) NumJobs() int
- func (tr *Tracker) SetDetail(key Key, detail string) error
- func (tr *Tracker) SetJobError(key Key, errString string) error
- func (tr *Tracker) SetStatus(key Key, state State, detail string) error
- func (tr *Tracker) Sync(ctx context.Context, lastSave time.Time) (time.Time, error)
- func (tr *Tracker) UpdateJob(key Key, new Status) error
- func (tr *Tracker) WriteHTMLStatusTo(ctx context.Context, w io.Writer) error
Constants ¶
This section is empty.
Variables ¶
var (
	MsgNoJobFound = "No job found. Try again."
	MsgJobExists  = "Job already exists. Try again."
)
var (
	ErrJobAlreadyExists       = errors.New("job already exists")
	ErrJobNotFound            = errors.New("job not found")
	ErrJobIsObsolete          = errors.New("job is obsolete")
	ErrInvalidStateTransition = errors.New("invalid state transition")
	ErrNotYetImplemented      = errors.New("not yet implemented")
	ErrNoChange               = errors.New("no change since last save")
)
Error declarations
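Since these are sentinel errors created with errors.New, callers would typically compare against them with errors.Is, which also works when the error has been wrapped. The sketch below redeclares two of the sentinels locally so that it compiles on its own; the `lookup` and `classify` helpers are hypothetical and not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of two sentinel errors, so the sketch is self-contained.
var (
	ErrJobAlreadyExists = errors.New("job already exists")
	ErrJobNotFound      = errors.New("job not found")
)

// lookup is a hypothetical helper that wraps a sentinel with context.
func lookup(jobs map[string]bool, key string) error {
	if !jobs[key] {
		return fmt.Errorf("lookup %q: %w", key, ErrJobNotFound)
	}
	return nil
}

// classify maps an error to a short label using errors.Is, which sees
// through the wrapping added by fmt.Errorf with %w.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrJobNotFound):
		return "not found"
	case errors.Is(err, ErrJobAlreadyExists):
		return "exists"
	default:
		return "other"
	}
}

func main() {
	jobs := map[string]bool{"a": true}
	fmt.Println(classify(lookup(jobs, "a"))) // prints ok
	fmt.Println(classify(lookup(jobs, "b"))) // prints not found
}
```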
Functions ¶
func YesterdayDate ¶
YesterdayDate returns the date for the daily job (e.g., yesterday UTC).
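One way to compute "yesterday UTC" is sketched below. It is an assumption about the behavior rather than the package's code: the hypothetical `yesterdayUTC` takes the current time as a parameter (unlike YesterdayDate, which takes none) so that the result is reproducible.

```go
package main

import (
	"fmt"
	"time"
)

// yesterdayUTC returns midnight UTC of the day before now.
// time.Date normalizes a day of 0 to the last day of the previous month.
func yesterdayUTC(now time.Time) time.Time {
	y, m, d := now.UTC().Date()
	return time.Date(y, m, d-1, 0, 0, 0, 0, time.UTC)
}

// demo evaluates yesterdayUTC at a fixed instant that crosses a month
// boundary in a leap year.
func demo() string {
	now := time.Date(2024, time.March, 1, 5, 0, 0, 0, time.UTC)
	return yesterdayUTC(now).Format("2006-01-02")
}

func main() {
	fmt.Println(demo()) // prints 2024-02-29
}
```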
Types ¶
type Datasets ¶
Datasets contains the names of the BigQuery datasets used for temporary, raw, and joined tables.
type Handler ¶
type Handler struct {
// contains filtered or unexported fields
}
Handler provides handlers for update, heartbeat, etc.
func NewHandler ¶
func NewHandler(tr *Tracker, js JobService) *Handler
NewHandler returns a Handler that sends updates to the provided Tracker.
type Job ¶
type Job struct {
	Bucket     string
	Experiment string
	Datatype   string
	Date       time.Time
	// Filter is an optional regex to apply to ArchiveURL names
	// Note that HasFiles does not use this, so ETL may process no files.
	Filter   string   `json:",omitempty"`
	Datasets Datasets `json:",omitempty"`
}
Job describes a reprocessing "Job", which includes all data for a particular experiment, type and date.
func (Job) IsDaily ¶
IsDaily returns a string representing whether the job is a Daily job (e.g., job.Date = yesterday).
func (Job) Key ¶
Key returns an identifier for the Job that is unique within the set of all jobs and suitable for use as a map key.
func (Job) PrefixStats ¶
func (j Job) PrefixStats(ctx context.Context, sClient stiface.Client) ([]*storage.ObjectAttrs, int64, error)
PrefixStats queries storage and gets a list of all file objects.
func (*Job) TablePartition ¶
TablePartition returns the BigQuery table partition for this Job's Date.
type JobMap ¶
JobMap is defined to allow custom json marshal/unmarshal. It defines the map from Job to Status.
type JobService ¶
type JobService interface {
NextJob(ctx context.Context) *JobWithTarget
}
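For testing code that depends on JobService, a small in-memory implementation can be enough. The sketch below is hypothetical: `queueService` and `drain` are not part of the package, JobWithTarget is trimmed to its ID field, and returning nil when no job is available is an assumption, since the interface does not document it.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Key and JobWithTarget are trimmed local stand-ins for the package types.
type Key string

type JobWithTarget struct {
	ID Key
}

// JobService matches the interface declared by the package.
type JobService interface {
	NextJob(ctx context.Context) *JobWithTarget
}

// queueService is a hypothetical in-memory JobService.
type queueService struct {
	mu   sync.Mutex
	jobs []JobWithTarget
}

// NextJob pops the next queued job. Returning nil when the queue is empty
// or the context is done is an assumption of this sketch.
func (q *queueService) NextJob(ctx context.Context) *JobWithTarget {
	if ctx.Err() != nil {
		return nil
	}
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.jobs) == 0 {
		return nil
	}
	j := q.jobs[0]
	q.jobs = q.jobs[1:]
	return &j
}

// drain collects job IDs until the service returns nil.
func drain(js JobService) []Key {
	var ids []Key
	ctx := context.Background()
	for j := js.NextJob(ctx); j != nil; j = js.NextJob(ctx) {
		ids = append(ids, j.ID)
	}
	return ids
}

func main() {
	var js JobService = &queueService{jobs: []JobWithTarget{{ID: "a"}, {ID: "b"}}}
	fmt.Println(drain(js)) // prints [a b]
}
```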
type JobWithTarget ¶
type JobWithTarget struct {
	ID          Key // ID used by gardener & parsers to identify a Job's status and configuration.
	Job         Job
	DailyOnly   bool `json:"-"`
	FullHistory bool `json:"-"`
}
JobWithTarget specifies a type/date job and a destination table or GCS prefix.
func (JobWithTarget) Marshal ¶
func (j JobWithTarget) Marshal() []byte
Marshal marshals the JobWithTarget to JSON. If the JobWithTarget type ever includes fields that cannot be marshalled, then Marshal will panic.
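A marshal-or-panic helper of this kind can be sketched with encoding/json. The types below are local copies of the declarations in this package, with the Datasets field left out because its definition is not shown here; `marshal` and `roundTrip` are hypothetical names. The sketch also shows a consequence of the `json:"-"` tags: DailyOnly and FullHistory do not survive a round trip.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Local copies of the package types; Datasets is omitted in this sketch.
type Key string

type Job struct {
	Bucket     string
	Experiment string
	Datatype   string
	Date       time.Time
	Filter     string `json:",omitempty"`
}

type JobWithTarget struct {
	ID          Key
	Job         Job
	DailyOnly   bool `json:"-"`
	FullHistory bool `json:"-"`
}

// marshal encodes j as JSON and panics if encoding fails.
func marshal(j JobWithTarget) []byte {
	b, err := json.Marshal(j)
	if err != nil {
		panic(err)
	}
	return b
}

// roundTrip encodes and decodes j; fields tagged json:"-" come back zeroed.
func roundTrip(j JobWithTarget) JobWithTarget {
	var out JobWithTarget
	if err := json.Unmarshal(marshal(j), &out); err != nil {
		panic(err)
	}
	return out
}

func main() {
	j := JobWithTarget{
		ID:        "k",
		Job:       Job{Bucket: "b", Experiment: "ndt", Datatype: "tcpinfo", Date: time.Date(2020, 1, 2, 0, 0, 0, 0, time.UTC)},
		DailyOnly: true,
	}
	fmt.Println(string(marshal(j)))
	fmt.Println(roundTrip(j).DailyOnly) // prints false: the field is not serialized
}
```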
func (JobWithTarget) String ¶
func (j JobWithTarget) String() string
type Key ¶
type Key string
Key is a unique identifier for a single tracker Job. Key may be used as a map key.
type State ¶
type State string
State types are used for the Status.State values. This is intended to enforce type safety, but the compiler accepts assignment of untyped string constants. 8-(
const (
	Init          State = "init"
	Parsing       State = "parsing"
	ParseError    State = "parseError"
	ParseComplete State = "postProcessing" // Ready for post processing, but not started yet.
	Stabilizing   State = "stabilizing"
	Loading       State = "loading"
	Deduplicating State = "deduplicating"
	Copying       State = "copying"
	Joining       State = "joining"
	Deleting      State = "deleting"
	Finishing     State = "finishing"
	Failed        State = "failed"
	Complete      State = "complete"
)
State values.

TODO: should we allow different states for different datatypes? In principle, a state could be an object that includes available transitions, in which case we would want each datatype to have potentially different transition details, e.g. the dedup query.
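The type-safety caveat noted for State follows from Go's rules for untyped constants: a string literal converts implicitly to any string-based type, while a string variable needs an explicit conversion. The sketch below declares a local State type with two of the values; the `known` helper is hypothetical and shows one way a caller could validate a value at run time.

```go
package main

import "fmt"

// State and two of its values, redeclared locally for the sketch.
type State string

const (
	Init    State = "init"
	Parsing State = "parsing"
)

// known reports whether s is one of the states declared in this sketch.
func known(s State) bool {
	switch s {
	case Init, Parsing:
		return true
	}
	return false
}

func main() {
	// Compiles: an untyped string constant converts implicitly to State.
	var s State = "no-such-state"
	fmt.Println(known(s), known(Parsing)) // prints false true

	raw := "parsing"
	// s = raw // would not compile: a string variable needs a conversion
	s = State(raw)
	fmt.Println(known(s)) // prints true
}
```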
type StateInfo ¶
type StateInfo struct {
	State      State     // const after creation
	Start      time.Time // const after creation
	DetailTime time.Time
	Detail     string // status or error, e.g. last filename in Parsing state.
}
StateInfo describes each state in processing history.
type Status ¶
type Status struct {
	HeartbeatTime time.Time // Time of last ETL heartbeat.
	UpdateCount   int       // Number of updates

	// History has shared backing store. Copy on write is used to avoid
	// changing the underlying StateInfo that is shared by the tracker
	// JobMap and accessed concurrently by other goroutines.
	History []StateInfo
}
A Status describes the state of a bucket/exp/type/YYYY/MM/DD job. Completed jobs are removed from the persistent store. Errored jobs are maintained in the persistent store for debugging. Status should be updated only by the Tracker, which will ensure correct serialization and Saver updates.
func (*Status) Detail ¶
Detail returns the most recent detail string. If the detail is empty, returns the previous state detail.
func (*Status) DetailTime ¶
DetailTime returns the timestamp of the most recent detail update.
func (*Status) Label ¶
Label provides a state label for metrics. If the final state is Failed, it composes the previous and final state, e.g. Loading-Failed.
func (*Status) LastStateInfo ¶
LastStateInfo returns a copy of the StateInfo for the most recent state.
func (*Status) NewState ¶
NewState adds a new StateInfo to the status. If the state is unchanged, it just logs a warning. It returns the previous StateInfo.
func (*Status) SetDetail ¶
SetDetail replaces the most recent StateInfo with a copy containing the new detail. It returns the previous StateInfo value.
func (*Status) StateChangeTime ¶
StateChangeTime returns the start time of the current state.
type Tracker ¶
type Tracker struct {
// contains filtered or unexported fields
}
Tracker keeps track of all the jobs in flight. Only tracker functions should access any of the fields.
func InitTracker ¶
func InitTracker(
	ctx context.Context,
	saverV2 GenericSaver,
	saveInterval time.Duration,
	expirationTime time.Duration,
	cleanupDelay time.Duration) (*Tracker, error)
InitTracker recovers the Tracker state from the provided saver. It may return an error if recovery fails.
func (*Tracker) AddJob ¶
AddJob adds a new job to the Tracker. May return ErrJobAlreadyExists if the job already exists and is still in flight.
func (*Tracker) GetState ¶
GetState returns the full job map, last initialized Job, and last mod time. It also cleans up any expired jobs from the tracker.
func (*Tracker) GetStatus ¶
GetStatus retrieves the status of an existing job. Note that the returned object is a shallow copy, and the History field shares the slice objects with the JobMap.
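The consequence of the shallow copy can be illustrated with local stand-in types. In the sketch below, `Status` and `StateInfo` are trimmed copies of the package types, and `sharesHistory` and `clone` are hypothetical helpers: scalar fields of the copy are private to the caller, but an in-place write to a History element is visible through the original unless the slice is cloned first.

```go
package main

import "fmt"

// Trimmed local stand-ins for the package types.
type StateInfo struct {
	State  string
	Detail string
}

type Status struct {
	UpdateCount int
	History     []StateInfo
}

// sharesHistory reports whether a and b use the same backing array.
func sharesHistory(a, b Status) bool {
	return len(a.History) > 0 && len(b.History) > 0 && &a.History[0] == &b.History[0]
}

// clone returns a Status whose History has its own backing array.
func clone(s Status) Status {
	s.History = append([]StateInfo(nil), s.History...)
	return s
}

func main() {
	stored := Status{History: []StateInfo{{State: "parsing", Detail: "file1"}}}

	shallow := stored                  // what a by-value return gives the caller
	shallow.UpdateCount = 7            // private to the copy
	shallow.History[0].Detail = "oops" // visible through stored as well
	fmt.Println(stored.UpdateCount, stored.History[0].Detail, sharesHistory(stored, shallow))
	// prints 0 oops true

	safe := clone(stored)
	safe.History[0].Detail = "local edit" // stored is unaffected
	fmt.Println(stored.History[0].Detail, sharesHistory(stored, safe))
	// prints oops false
}
```

Callers that only read the returned Status do not need the clone; it matters only when a caller wants to edit History entries locally.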
func (*Tracker) NumJobs ¶
NumJobs returns the number of jobs in flight. This includes jobs in the "Complete" state that have not yet been removed from the saver.
func (*Tracker) SetJobError ¶
SetJobError updates a job's error fields, and handles persistence.
func (*Tracker) SetStatus ¶
SetStatus updates a job's state in memory. It may or may not change the job state. If it does change state, the detail string is applied to the last state, not the new state.
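The ordering described for SetStatus (detail first, on the existing last state, then the transition) can be sketched on a bare history slice. Everything here is a hypothetical illustration with local stand-in types: the `setStatus` function, its copy-before-write approach, and the choice to always overwrite the detail are assumptions, not the package's code.

```go
package main

import "fmt"

// Local stand-ins for the package types, trimmed for the sketch.
type State string

const (
	Parsing State = "parsing"
	Loading State = "loading"
)

type StateInfo struct {
	State  State
	Detail string
}

// setStatus returns a new history in which detail is recorded on the last
// existing entry, and a new entry is appended only if state differs from it.
// The input slice is copied first, so other holders of it see no change.
func setStatus(history []StateInfo, state State, detail string) []StateInfo {
	out := make([]StateInfo, len(history), len(history)+1)
	copy(out, history)
	if n := len(out); n > 0 {
		out[n-1].Detail = detail // applied to the last state, not the new one
	}
	if n := len(out); n == 0 || out[n-1].State != state {
		out = append(out, StateInfo{State: state})
	}
	return out
}

func main() {
	before := []StateInfo{{State: Parsing}}
	after := setStatus(before, Loading, "parsed 12 files")
	fmt.Printf("%+v\n", before) // the original history is untouched
	fmt.Printf("%+v\n", after)  // detail sits on parsing; loading has none yet
}
```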
func (*Tracker) Sync ¶
Sync snapshots the full job state and saves it to persistent storage if and only if it has changed. It returns the time of the last save, which may or may not have been updated.
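A save-only-if-changed check of this kind can be sketched by comparing a last-modified time against the caller's last-save time. The `snapshotter` type below is hypothetical, as are two assumptions it makes: that an unchanged state is reported with ErrNoChange, and that the returned time is the modification time that was saved (the real method may well return the wall-clock time of the save instead).

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Local copy of the package's sentinel, so the sketch is self-contained.
var ErrNoChange = errors.New("no change since last save")

// snapshotter is a hypothetical stand-in for the Tracker.
type snapshotter struct {
	lastMod time.Time // time of the most recent in-memory change
	saves   int       // number of times state was persisted
}

// sync persists state only when it changed after lastSave. On success it
// returns the new last-save time; otherwise it returns lastSave unchanged.
func (s *snapshotter) sync(lastSave time.Time) (time.Time, error) {
	if !s.lastMod.After(lastSave) {
		return lastSave, ErrNoChange
	}
	s.saves++ // stand-in for writing the job map to persistent storage
	return s.lastMod, nil
}

// demo syncs twice with no change in between and reports the number of
// saves and whether the second call returned ErrNoChange.
func demo() (int, bool) {
	t0 := time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC)
	s := &snapshotter{lastMod: t0.Add(time.Minute)}
	saved, _ := s.sync(t0)  // state changed after t0: one save
	_, err := s.sync(saved) // nothing changed since: no save
	return s.saves, errors.Is(err, ErrNoChange)
}

func main() {
	fmt.Println(demo()) // prints 1 true
}
```

Feeding the returned time back into the next call, as `demo` does, is what lets a periodic saver loop skip redundant writes.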