dfmgr

package module
v0.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 11, 2024 License: MIT Imports: 16 Imported by: 0

README

Dataflow-jobcontrol

This is a package for starting, monitoring and stopping GCP dataflow jobs. It uses a CloudSQL Postgres database to retain a list of jobs for progress tracking.

Main Files
File Purpose
schema/ Postgres db creation scripts
jobdef/ Example dataflow pipeline options json config file
dfmgr.go Logic manager
dfmgr_test.go Tests
pgmgr.go Postgres logic manager
pgmgr_test.go Tests
pgdatamgr.go Data repo interface
Ancillary Files
File Purpose
config.go Boot package parameters, environment var collection
const.go Package constants
entity.go Package structs
env Package environment variables for local/dev installation
gogets Statements for go-getting required packages

Documentation

Index

Constants

View Source
// Job run-state values. The descriptions below refer to Cloud Dataflow job states.
const (
	// CnstStateUnknown indicates that the job's run state isn't specified.
	CnstStateUnknown = "JOB_STATE_UNKNOWN"
	// CnstStateStopped indicates that the job has not yet started to run.
	CnstStateStopped = "JOB_STATE_STOPPED"
	// CnstStateRunning indicates that the job is currently running.
	CnstStateRunning = "JOB_STATE_RUNNING"
	// CnstStateDone indicates that the job has successfully completed. This is a terminal job state. This state may be set by the Cloud Dataflow service, as a transition from `JOB_STATE_RUNNING`. It may also be set via a Cloud Dataflow `UpdateJob` call, if the job has not yet reached a terminal state.
	CnstStateDone = "JOB_STATE_DONE"
	// CnstStateFailed indicates that the job has failed. This is a terminal job state. This state may only be set by the Cloud Dataflow service, and only as a transition from `JOB_STATE_RUNNING`.
	CnstStateFailed = "JOB_STATE_FAILED"
	// CnstStateCancelled indicates that the job has been explicitly cancelled. This is a terminal job state. This state may only be set via a Cloud Dataflow `UpdateJob` call, and only if the job has not yet reached another terminal state.
	CnstStateCancelled = "JOB_STATE_CANCELLED"
	// CnstStateUpdated indicates that the job was successfully updated, meaning that this job was stopped and another job was started, inheriting state from this one. This is a terminal job state. This state may only be set by the Cloud Dataflow service, and only as a transition from `JOB_STATE_RUNNING`.
	CnstStateUpdated = "JOB_STATE_UPDATED"
	// CnstStateDraining indicates that the job is in the process of draining. A draining job has stopped pulling from its input sources and is processing any data that remains in-flight. This state may be set via a Cloud Dataflow `UpdateJob` call, but only as a transition from `JOB_STATE_RUNNING`. Jobs that are draining may only transition to `JOB_STATE_DRAINED`, `JOB_STATE_CANCELLED`, or `JOB_STATE_FAILED`.
	CnstStateDraining = "JOB_STATE_DRAINING"
	// CnstStateDrained indicates that the job has been drained. A drained job terminated by stopping pulling from its input sources and processing any data that remained in-flight when draining was requested. This state is a terminal state, may only be set by the Cloud Dataflow service, and only as a transition from `JOB_STATE_DRAINING`.
	CnstStateDrained = "JOB_STATE_DRAINED"
	// CnstStatePending indicates that the job has been created but is not yet running. Jobs that are pending may only transition to `JOB_STATE_RUNNING` or `JOB_STATE_FAILED`.
	CnstStatePending = "JOB_STATE_PENDING"
	// CnstStateCancelling indicates that the job has been explicitly cancelled and is in the process of stopping. Jobs that are cancelling may only transition to `JOB_STATE_CANCELLED` or `JOB_STATE_FAILED`.
	CnstStateCancelling = "JOB_STATE_CANCELLING"
	// CnstStateQueued indicates that the job has been created but is being delayed until launch. Jobs that are queued may only transition to `JOB_STATE_PENDING` or `JOB_STATE_CANCELLED`.
	CnstStateQueued = "JOB_STATE_QUEUED"
)

Variables

View Source
var (
	// EnvDebugOn controls verbose logging.
	// NOTE(review): where this flag is set is not visible here — confirm before relying on a default.
	EnvDebugOn bool
)
View Source
var (
	// ErrNoDataFound occurs if a JSON result returns null.
	// It is a sentinel value created with errors.New.
	ErrNoDataFound = errors.New("data was not found for this search")
)

errors

Functions

func NewGoogleCredentials

func NewGoogleCredentials(ctx context.Context, path string) (*google.Credentials, error)

NewGoogleCredentials returns a GCP/Google credential loaded from the supplied file path. It is kept for reference and might not be needed.

Types

type DfMgr

// DfMgr covers job management functionality.
type DfMgr struct {
	// contains filtered or unexported fields
}

DfMgr covers job management functionality

func NewMgr

func NewMgr(ctx context.Context, bc cfg.ConfigSetting) (*DfMgr, error)

NewMgr returns a new manager

func (*DfMgr) GetGcsJobDefinition

func (dfm *DfMgr) GetGcsJobDefinition(ctx context.Context, filename string) (*JobRunParameter, error)

GetGcsJobDefinition retrieves a GCS bucket hosted set of parameters for a dataflow job

func (*DfMgr) GetJob

func (dfm *DfMgr) GetJob(ctx context.Context, jobID string) (*DsJob, error)

GetJob gets a job by id

func (*DfMgr) GetJobStatus

func (dfm *DfMgr) GetJobStatus(ctx context.Context, jobID string) (*df.Job, error)

GetJobStatus gets a job from an id

func (*DfMgr) GetJobs

func (dfm *DfMgr) GetJobs(ctx context.Context, appscope, jobtype, jobstate string) ([]*DsJob, error)

GetJobs gets a list of jobs for an appscope

func (*DfMgr) GetLatestJobs

func (dfm *DfMgr) GetLatestJobs(ctx context.Context, appscope, jobtype string, limit int) ([]*DsJob, error)

GetLatestJobs gets the most recent jobs for an appscope and jobtype; limit bounds how many are returned

func (*DfMgr) JobStart

func (dfm *DfMgr) JobStart(ctx context.Context, appscope, jobtype string, jobParam *JobRunParameter) (*JobSimpleMeta, error)

JobStart starts a job from a template

func (*DfMgr) JobStop

func (dfm *DfMgr) JobStop(ctx context.Context, jobID string) (*df.Job, error)

JobStop stops a job

func (*DfMgr) SetGcsJobDefinition

func (dfm *DfMgr) SetGcsJobDefinition(ctx context.Context, filename, contenttype string, jd *JobRunParameter) error

SetGcsJobDefinition stores a set of parameters for a dataflow job in a GCS bucket

type DsJob

// DsJob covers the basic identifier data required to control a dataflow job.
// Every field carries both a json and a datastore struct tag.
type DsJob struct {
	// AppScope is serialized as "appscope".
	// NOTE(review): presumably the application scope that owns the job — confirm against callers.
	AppScope    string     `json:"appscope" datastore:"appscope"`
	// JobID is serialized as "jobid".
	JobID       string     `json:"jobid" datastore:"jobid"`
	// JobType is serialized as "jobtype".
	JobType     string     `json:"jobtype" datastore:"jobtype"`
	// LastStatus is serialized as "laststatus".
	// NOTE(review): presumably one of the JOB_STATE_* strings — confirm.
	LastStatus  string     `json:"laststatus" datastore:"laststatus"`
	// CreatedDate is optional; a nil pointer is omitted from JSON output (omitempty).
	CreatedDate *time.Time `json:"createddate,omitempty" datastore:"createddate"`
	// LastTouched is optional; a nil pointer is omitted from JSON output (omitempty).
	LastTouched *time.Time `json:"lasttouched,omitempty" datastore:"lasttouched"`
}

DsJob covers the basic identifier data required to control a dataflow job

type JobRunParameter

// JobRunParameter contains the full set of parameters to run a dataflow job.
// It is made up of three string-to-string maps, each with its own JSON key.
type JobRunParameter struct {
	// CustomParameters is serialized as "customparameters".
	CustomParameters   map[string]string `json:"customparameters"`
	// RuntimeEnvironment is serialized as "runtimeenvironment".
	RuntimeEnvironment map[string]string `json:"runtimeenvironment"`
	// JobRequest is serialized as "jobrequest".
	JobRequest         map[string]string `json:"jobrequest"`
}

JobRunParameter contains the full set of parameters to run a dataflow job

type JobSimpleMeta

// JobSimpleMeta contains the basic data about a job: its ID, type and current state.
type JobSimpleMeta struct {
	// JobID is serialized as "jobid".
	JobID        string `json:"jobid"`
	// JobType is serialized as "jobtype".
	JobType      string `json:"jobtype"`
	// CurrentState is serialized as "currentstate".
	// NOTE(review): presumably one of the JOB_STATE_* strings — confirm.
	CurrentState string `json:"currentstate"`
}

JobSimpleMeta contains the basic data about a job: its ID, type and current state

type PgMgr

// PgMgr handles interactions with a postgres db store.
type PgMgr struct {
	// contains filtered or unexported fields
}

PgMgr handles interactions with a postgres db store

func NewPgMgr

func NewPgMgr(ctx context.Context, bc cfg.ConfigSetting) (*PgMgr, error)

NewPgMgr creates a new manager

func (*PgMgr) DeleteJob

func (pgm *PgMgr) DeleteJob(ctx context.Context, appscope, jobid string) error

DeleteJob clears a job

func (*PgMgr) DeleteJobArchive

func (pgm *PgMgr) DeleteJobArchive(ctx context.Context, appscope string) error

DeleteJobArchive clears the job archive (older than 24 hours)

func (*PgMgr) GetAppScopeJobCount

func (pgm *PgMgr) GetAppScopeJobCount(ctx context.Context, appscope, jobtype, jobstate string) (int64, error)

GetAppScopeJobCount gets the count of Jobs for a specified appscope

func (*PgMgr) GetAppScopeJobs

func (pgm *PgMgr) GetAppScopeJobs(ctx context.Context, appscope, jobtype, jobstate string) ([]*DsJob, error)

GetAppScopeJobs gets the Jobs for a specified appscope, jobtype and jobstate (latter two can be empty string)

func (*PgMgr) GetJob

func (pgm *PgMgr) GetJob(ctx context.Context, jobid string) (*DsJob, error)

GetJob gets a specific job

func (*PgMgr) GetLatestAppScopeJob

func (pgm *PgMgr) GetLatestAppScopeJob(ctx context.Context, appscope, jobtype string, limit int) ([]*DsJob, error)

GetLatestAppScopeJob gets the latest jobs for a specified appscope and jobtype; limit bounds how many are returned

func (*PgMgr) SaveJob

func (pgm *PgMgr) SaveJob(ctx context.Context, mdp *DsJob) error

SaveJob saves a job

func (*PgMgr) SetJobStatus

func (pgm *PgMgr) SetJobStatus(ctx context.Context, jobid, jobstate string) error

SetJobStatus sets a job status

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL