tracker

package
v0.0.0-...-a6a2865 Latest
Warning

This package is not in the latest version of its module.

Published: Feb 15, 2024 License: Apache-2.0 Imports: 20 Imported by: 1

README

Gardener Tracker

The Tracker keeps track of the state of all parsing activities, persists current state to storage, and recovers the system state from storage on startup or recovery.

The tracker is used by other components of Gardener to decide:

  1. what jobs to do next,
  2. when a job has failed and needs to be recovered,
  3. when postprocessing actions should be initiated.

The tracker provides an API to the other Gardener components to answer questions about the system state.

Documentation

Overview

Package tracker tracks status of all jobs, and handles persistence.

Concurrency properties:

  1. The job map is protected by a Mutex, but the lock is only held while getting a copy of a Status value or setting it, so there is minimal contention.
  2. Status objects are persisted to a Saver by a separate goroutine that periodically updates any modified Status objects. The Status's updatetime is used to determine whether it needs to be saved.

Constants

This section is empty.

Variables

var (
	MsgNoJobFound = "No job found. Try again."
	MsgJobExists  = "Job already exists. Try again."
)
var (
	ErrJobAlreadyExists       = errors.New("job already exists")
	ErrJobNotFound            = errors.New("job not found")
	ErrJobIsObsolete          = errors.New("job is obsolete")
	ErrInvalidStateTransition = errors.New("invalid state transition")
	ErrNotYetImplemented      = errors.New("not yet implemented")
	ErrNoChange               = errors.New("no change since last save")
)

Error declarations

Functions

func YesterdayDate

func YesterdayDate() time.Time

YesterdayDate returns the date for the daily job (e.g., yesterday UTC).

Types

type Datasets

type Datasets struct {
	Tmp  string
	Raw  string
	Join string
}

Datasets contains the names of the BigQuery datasets used for temporary, raw, and joined tables.

type GenericSaver

type GenericSaver interface {
	Save(src any) error
	Load(dst any) error
}
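GenericSaver carries no documentation of its own, so the following is only a sketch of what a conforming implementation could look like. The `memSaver` type is hypothetical and keeps the state as JSON bytes in memory, which can be handy in tests; it assumes that Load expects a pointer destination, as json.Unmarshal does.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// GenericSaver mirrors the interface declared above.
type GenericSaver interface {
	Save(src any) error
	Load(dst any) error
}

// memSaver is a hypothetical in-memory implementation that stores JSON bytes.
type memSaver struct {
	data []byte
}

// Save encodes src as JSON and keeps the bytes.
func (m *memSaver) Save(src any) error {
	b, err := json.Marshal(src)
	if err != nil {
		return err
	}
	m.data = b
	return nil
}

// Load decodes the stored JSON into dst, which must be a pointer.
func (m *memSaver) Load(dst any) error {
	if m.data == nil {
		return errors.New("nothing saved")
	}
	return json.Unmarshal(m.data, dst)
}

func main() {
	var s GenericSaver = &memSaver{}
	if err := s.Save(map[string]int{"jobs": 3}); err != nil {
		panic(err)
	}
	var out map[string]int
	if err := s.Load(&out); err != nil {
		panic(err)
	}
	fmt.Println(out["jobs"]) // prints 3
}
```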

type Handler

type Handler struct {
	// contains filtered or unexported fields
}

Handler provides handlers for update, heartbeat, etc.

func NewHandler

func NewHandler(tr *Tracker, js JobService) *Handler

NewHandler returns a Handler that sends updates to the provided Tracker.

func (*Handler) Register

func (h *Handler) Register(mux *http.ServeMux)

Register registers the handlers on the server.

type Job

type Job struct {
	Bucket     string
	Experiment string
	Datatype   string
	Date       time.Time
	// Filter is an optional regex to apply to ArchiveURL names
	// Note that HasFiles does not use this, so ETL may process no files.
	Filter   string   `json:",omitempty"`
	Datasets Datasets `json:",omitempty"`
}

Job describes a reprocessing "Job", which includes all data for a particular experiment, type and date.

func (Job) HasFiles

func (j Job) HasFiles(ctx context.Context, sClient stiface.Client) (bool, error)

HasFiles queries storage and reports whether any file objects exist under the job's prefix.

func (Job) IsDaily

func (j Job) IsDaily() string

IsDaily returns a string representing whether the job is a Daily job (e.g., job.Date = yesterday).

func (Job) Key

func (j Job) Key() Key

Key returns a unique identifier for the Job (within the set of all jobs), suitable for use as a map key.

func (Job) Path

func (j Job) Path() string

Path returns the GCS path prefix to the job data.

func (Job) Prefix

func (j Job) Prefix() (string, error)

Prefix returns the path prefix for a job, not including the gs://bucket-name/.

func (Job) PrefixStats

func (j Job) PrefixStats(ctx context.Context, sClient stiface.Client) ([]*storage.ObjectAttrs, int64, error)

PrefixStats queries storage and gets a list of all file objects.

func (Job) String

func (j Job) String() string

func (*Job) TablePartition

func (j *Job) TablePartition() string

TablePartition returns the BigQuery table partition for this Job's Date.

type JobMap

type JobMap map[Job]Status

JobMap is defined to allow custom json marshal/unmarshal. It defines the map from Job to Status.

func (JobMap) WriteHTML

func (jobs JobMap) WriteHTML(w io.Writer) error

WriteHTML writes a table containing the jobs and status.

type JobService

type JobService interface {
	NextJob(ctx context.Context) *JobWithTarget
}

type JobWithTarget

type JobWithTarget struct {
	ID          Key // ID used by gardener & parsers to identify a Job's status and configuration.
	Job         Job
	DailyOnly   bool `json:"-"`
	FullHistory bool `json:"-"`
}

JobWithTarget specifies a type/date job, and a destination table or GCS prefix.

func (JobWithTarget) Marshal

func (j JobWithTarget) Marshal() []byte

Marshal marshals the JobWithTarget to json. If the JobWithTarget type ever includes fields that cannot be marshalled, then Marshal will panic.
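The panic-on-failure behaviour described above can be sketched as follows. The `jobWithTarget` struct is a reduced stand-in and `mustMarshal` and `panics` are hypothetical helpers; the example also shows that fields tagged `json:"-"` are left out of the encoding.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// jobWithTarget is a simplified stand-in for JobWithTarget.
type jobWithTarget struct {
	ID          string
	DailyOnly   bool `json:"-"`
	FullHistory bool `json:"-"`
}

// mustMarshal encodes v as JSON and panics if encoding fails.
func mustMarshal(v any) []byte {
	b, err := json.Marshal(v)
	if err != nil {
		panic(err)
	}
	return b
}

// panics reports whether calling f panicked.
func panics(f func()) (p bool) {
	defer func() {
		if recover() != nil {
			p = true
		}
	}()
	f()
	return false
}

func main() {
	j := jobWithTarget{ID: "job-1", DailyOnly: true}
	// Fields tagged json:"-" do not appear in the output.
	fmt.Println(string(mustMarshal(j))) // prints {"ID":"job-1"}
	// A channel cannot be encoded as JSON, so this call panics.
	fmt.Println(panics(func() { mustMarshal(make(chan int)) })) // prints true
}
```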

func (JobWithTarget) String

func (j JobWithTarget) String() string

type Key

type Key string

Key is a unique identifier for a single tracker Job. Key may be used as a map key.

type State

type State string

State types are used for the Status.State values. This is intended to enforce type safety, but the compiler accepts string assignment. 8-(
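The limitation mentioned above follows from Go's assignability rules: an untyped string constant can be assigned to a State without conversion, while a value of type string cannot. A minimal sketch, with State and one constant redeclared locally:

```go
package main

import "fmt"

// State mirrors the declaration above.
type State string

const Parsing State = "parsing"

func main() {
	var s State

	// An untyped string constant is assignable to State without conversion,
	// so the compiler does not reject arbitrary literals.
	s = "not-a-real-state"
	fmt.Println(s)

	// A variable of type string is not assignable; it needs a conversion.
	raw := "parsing"
	// s = raw // compile error: cannot use raw (variable of type string) as State value
	s = State(raw)
	fmt.Println(s == Parsing) // prints true
}
```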

const (
	Init          State = "init"
	Parsing       State = "parsing"
	ParseError    State = "parseError"
	ParseComplete State = "postProcessing" // Ready for post processing, but not started yet.
	Stabilizing   State = "stabilizing"
	Loading       State = "loading"
	Deduplicating State = "deduplicating"
	Copying       State = "copying"
	Joining       State = "joining"
	Deleting      State = "deleting"
	Finishing     State = "finishing"
	Failed        State = "failed"
	Complete      State = "complete"
)

State values. TODO: should we allow different states for different datatypes? In principle, a state could be an object that includes available transitions, in which case we would want each datatype to have potentially different transition details, e.g. the dedup query.

type StateInfo

type StateInfo struct {
	State      State     // const after creation
	Start      time.Time // const after creation
	DetailTime time.Time
	Detail     string // status or error, e.g. last filename in Parsing state.
}

StateInfo describes each state in processing history.

type Status

type Status struct {
	HeartbeatTime time.Time // Time of last ETL heartbeat.

	UpdateCount int // Number of updates

	// History has shared backing store.  Copy on write is used to avoid
	// changing the underlying StateInfo that is shared by the tracker
	// JobMap and accessed concurrently by other goroutines.
	History []StateInfo
}

A Status describes the state of a bucket/exp/type/YYYY/MM/DD job. Completed jobs are removed from the persistent store. Errored jobs are maintained in the persistent store for debugging. Status should be updated only by the Tracker, which will ensure correct serialization and Saver updates.
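The copy-on-write rule for History can be sketched as below. The types are reduced stand-ins and `withDetail` is a hypothetical helper; the point is that a shallow copy of a Status shares the backing array of History, so an update must copy the slice before modifying an element.

```go
package main

import "fmt"

// StateInfo and Status are simplified stand-ins for the types above.
type StateInfo struct {
	State  string
	Detail string
}

type Status struct {
	History []StateInfo
}

// withDetail returns a Status whose last StateInfo carries the new detail.
// It copies History first, so other holders of the original slice never
// observe the change.
func withDetail(s Status, detail string) Status {
	if len(s.History) == 0 {
		return s
	}
	h := make([]StateInfo, len(s.History))
	copy(h, s.History)
	h[len(h)-1].Detail = detail
	s.History = h
	return s
}

func main() {
	orig := Status{History: []StateInfo{{State: "parsing", Detail: "file1"}}}
	shallow := orig // shares the backing array with orig
	updated := withDetail(shallow, "file2")
	fmt.Println(orig.History[0].Detail, updated.History[0].Detail) // prints file1 file2
}
```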

func NewStatus

func NewStatus() Status

NewStatus creates a new Status.

func (*Status) Detail

func (s *Status) Detail() string

Detail returns the most recent detail string. If the detail is empty, returns the previous state detail.

func (*Status) DetailTime

func (s *Status) DetailTime() time.Time

DetailTime returns the timestamp of the most recent detail update.

func (*Status) Elapsed

func (s *Status) Elapsed() time.Duration

Elapsed returns the elapsed time of the Job, rounded to nearest second.

func (*Status) Error

func (s *Status) Error() string

func (*Status) Label

func (s *Status) Label() string

Label provides a state label for metrics. If the final state is Failed, it composes the previous and final state, e.g. Loading-Failed.

func (*Status) LastStateInfo

func (s *Status) LastStateInfo() StateInfo

LastStateInfo returns a copy of the StateInfo for the most recent state.

func (*Status) NewState

func (s *Status) NewState(state State) StateInfo

NewState adds a new StateInfo to the status. If the state is unchanged, it just logs a warning. It returns the previous StateInfo.

func (*Status) Prev

func (s *Status) Prev() State

Prev returns the penultimate job State enum, or Init.

func (*Status) SetDetail

func (s *Status) SetDetail(detail string) StateInfo

SetDetail replaces the most recent StateInfo with a copy containing the new detail. It returns the previous StateInfo value.

func (*Status) StartTime

func (s *Status) StartTime() time.Time

StartTime returns the time that this job was started.

func (*Status) State

func (s *Status) State() State

State returns the job State enum.

func (*Status) StateChangeTime

func (s *Status) StateChangeTime() time.Time

StateChangeTime returns the start time of the current state.

type Tracker

type Tracker struct {
	// contains filtered or unexported fields
}

Tracker keeps track of all the jobs in flight. Only tracker functions should access any of the fields.

func InitTracker

func InitTracker(
	ctx context.Context,
	saverV2 GenericSaver,
	saveInterval time.Duration,
	expirationTime time.Duration,
	cleanupDelay time.Duration) (*Tracker, error)

InitTracker recovers the Tracker state from the provided GenericSaver. It may return an error if recovery fails.

func (*Tracker) AddJob

func (tr *Tracker) AddJob(job Job) error

AddJob adds a new job to the Tracker. It may return ErrJobAlreadyExists if the job already exists and is still in flight.

func (*Tracker) GetState

func (tr *Tracker) GetState() (JobMap, Job, time.Time)

GetState returns the full job map, last initialized Job, and last mod time. It also cleans up any expired jobs from the tracker.

func (*Tracker) GetStatus

func (tr *Tracker) GetStatus(key Key) (Status, error)

GetStatus retrieves the status of an existing job. Note that the returned object is a shallow copy, and the History field shares the slice objects with the JobMap.

func (*Tracker) Heartbeat

func (tr *Tracker) Heartbeat(key Key) error

Heartbeat updates a job's heartbeat time.

func (*Tracker) NumFailed

func (tr *Tracker) NumFailed() int

NumFailed returns the number of failed jobs.

func (*Tracker) NumJobs

func (tr *Tracker) NumJobs() int

NumJobs returns the number of jobs in flight. This includes jobs in "Complete" state that have not been removed from saver.

func (*Tracker) SetDetail

func (tr *Tracker) SetDetail(key Key, detail string) error

SetDetail updates a job's detail message in memory.

func (*Tracker) SetJobError

func (tr *Tracker) SetJobError(key Key, errString string) error

SetJobError updates a job's error fields, and handles persistence.

func (*Tracker) SetStatus

func (tr *Tracker) SetStatus(key Key, state State, detail string) error

SetStatus updates a job's state in memory. It may or may not change the job state. If it does change state, the detail string is applied to the last state, not the new state.
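One reading of the rule above is sketched below: the detail is written to the current (soon to be previous) state, and a new state entry with an empty detail is appended only when the state actually changes. The `setStatus` function and the reduced types are hypothetical, and this is an interpretation of the description rather than the package's code.

```go
package main

import "fmt"

type State string

const (
	Parsing       State = "parsing"
	ParseComplete State = "postProcessing"
)

// StateInfo is a simplified stand-in for the type documented earlier.
type StateInfo struct {
	State  State
	Detail string
}

// setStatus is a hypothetical sketch: it records detail on the latest
// StateInfo, then appends a new StateInfo only if the state changes.
// It works on a copy so the caller's slice is never modified.
func setStatus(history []StateInfo, state State, detail string) []StateInfo {
	out := make([]StateInfo, len(history), len(history)+1)
	copy(out, history)
	if n := len(out); n > 0 {
		out[n-1].Detail = detail
		if out[n-1].State == state {
			return out
		}
	}
	return append(out, StateInfo{State: state})
}

func main() {
	h := []StateInfo{{State: Parsing}}
	h = setStatus(h, Parsing, "file 10 of 20")         // same state: detail only
	h = setStatus(h, ParseComplete, "parsed 20 files") // state change
	for _, si := range h {
		fmt.Printf("%s: %q\n", si.State, si.Detail)
	}
}
```

Running the sketch prints `parsing: "parsed 20 files"` followed by `postProcessing: ""`, showing that the detail passed with the state change ends up on the state being left.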

func (*Tracker) Sync

func (tr *Tracker) Sync(ctx context.Context, lastSave time.Time) (time.Time, error)

Sync snapshots the full job state and saves it to persistent storage if and only if it has changed. It returns the time of the last save, which may or may not have been updated.

func (*Tracker) UpdateJob

func (tr *Tracker) UpdateJob(key Key, new Status) error

UpdateJob updates an existing job. It may return ErrJobNotFound if the job no longer exists.

func (*Tracker) WriteHTMLStatusTo

func (tr *Tracker) WriteHTMLStatusTo(ctx context.Context, w io.Writer) error

WriteHTMLStatusTo writes out the status of all jobs to the html writer.
