dag

package module
v0.4.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 22, 2022 License: MIT Imports: 7 Imported by: 2

README

DAG

pipeline status coverage report

Overview

DAG (Directed Acyclic Graph) based job runner.

Statuses

Job statuses
Status Description
JobStatusNotStarted This jobs waits to be started
JobStatusRunning This job is currently running
JobStatusFinishedSuccess This job ran succssesfully err == nil
JobStatusFinishedConditionNotMet The job has to decide it self if it doesn't run and returns this status. This makes sense for example if the DAG runs every day, but specific jobs should only run on Mondays. err == nil & responses["CONDITIONS_NOT_MET"] == "What ever you want to have here"
JobStatusFinishedWithSoftError If a job failes with a soft error, all child jobs will run err != nil & responses["SOFT_ERROR"] == "SOFT_ERROR"
JobStatusFinishedWithErrorAsItHasParentsHaveErrors If at least one parent has ended with JobStatusFinishedWithError or JobStatusFinishedWithErrorAsItHasParentsHaveErrors
JobStatusFinishedWithError If a job failes with an error, all child jobs will not run. These jobs will get the status JobStatusFinishedWithErrorAsItHasParentsHaveErrors err != nil & responses["SOFT_ERROR"] == "SOFT_ERROR" not set
DAG statuses
Status Description
DagStatusNotStarted This jobs waits to be started
DagStatusRunning DAG is running
DagStatusFinishedSuccess DAG ran succssesfully. Even if one or more JobStatusFinishedWithSoftError occured or one or more jobs didn't run because of JobStatusFinishedConditionNotMet
DagStatusFinishedWithErrors If at least one JobStatusFinishedWithError (not JobStatusFinishedWithSoftError) occured

Examples

Dag with a soft error
package main

// DAG job 'Task 2' finishes with a soft error => following tasks will succeed
// As we only have succssesful tasks or soft errors, DAG succeeds

// Using LOGRUS as logger. See example 1 and 2 how to use ZAP

import (
  "errors"
  "fmt"
  "os"
  "time"

  "github.com/sirupsen/logrus"
  "gitlab.com/lesql/dag"
)

func main() {

  mylogger := logrus.New()
  myloggerDag := mylogger.WithFields(logrus.Fields{
    "xyz": "xyz",
  })

  dagHooks := dag.Hooks{
    DagCreated: func(dag *dag.Dag) {
      mylogger.Infof("DagCreated: %s - %s ", dag.Name, dag.ID)
    },
    DagStarted: func(dag *dag.Dag) {
      mylogger.Infoln("DagStarted")
    },
    DagFinishedSuccess: func(dag *dag.Dag) {
      mylogger.Infoln("DagFinishedSuccess")
    },
    DagFinishedFailed: func(dag *dag.Dag) {
      mylogger.Errorln("DagFinishedFailed")
    },
    JobCreated: func(dagJob *dag.Job) {
      mylogger.Infoln("JobCreated")
    },
    JobStarted: func(dagJob *dag.Job) {
      mylogger.Infof("JobStarted: %s - %s", dagJob.Name, dagJob.ID)
    },
    JobFinishedSuccess: func(dagJob *dag.Job) {
      mylogger.Infof("JobFinishedSuccess: %s - %s", dagJob.Name, dagJob.ID)
    },
    JobFinishedFailed: func(dagJob *dag.Job) {
      mylogger.Errorf("JobFinishedFailed: %s - %s", dagJob.Name, dagJob.ID)
    },
  }

  dag, err := dag.New("Dag 1", myloggerDag, dagHooks)
  if err != nil {
    mylogger.Errorln(err)
    os.Exit(1)
  }

  job1, err := dag.CreateJob("Task 1", func() (map[string]string, error) {
    fmt.Println("Running task 1: sleep 12s")
    time.Sleep(12 * time.Second)
    ret := make(map[string]string)
    return ret, nil
  })

  if err != nil {
    logrus.Errorln(err)
    os.Exit(1)
  }

  job2, err := dag.CreateJob("Task 2", func() (map[string]string, error) {
    fmt.Println("Running task 2: sleep 10s")
    time.Sleep(10 * time.Second)
    ret := make(map[string]string)
    ret["SOFT_ERROR"] = "SOFT_ERROR"
    return ret, errors.New("Task 2 failed")
  })

  if err != nil {
    logrus.Errorln(err)
    os.Exit(1)
  }

  job3, err := dag.CreateJob("Task 3", func() (map[string]string, error) {
    fmt.Println("Running task 3: sleep 5s")
    time.Sleep(5 * time.Second)
    ret := make(map[string]string)
    return ret, nil
  })

  if err != nil {
    logrus.Errorln(err)
    os.Exit(1)
  }

  err = job1.Then(job3)
  if err != nil {
    mylogger.Errorln(err)
    os.Exit(1)
  }

  err = job2.Then(job3)
  if err != nil {
    mylogger.Errorln(err)
    os.Exit(1)
  }

  err = dag.Run()
  if err != nil {
    mylogger.Errorln(err)
    os.Exit(1)
  }
}

For more examples, see folder examples

Development

Run tests
go test --race -cover ./...  -v

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func GetPreV04DagStatusText added in v0.4.0

func GetPreV04DagStatusText(status DagStatus) string

GetPreV04DagStatusText returns the text status code from < v0.4.0. !!!! DO NOT USE THESE IN NEW CODE. !!!! It returns the empty string if the code is unknown.

func GetPreV04JobStatusText added in v0.4.0

func GetPreV04JobStatusText(status JobStatus) string

GetPreV04JobStatusText returns the text status code from < v0.4.0. !!!! DO NOT USE THESE IN NEW CODE. !!!! It returns the empty string if the code is unknown.

Types

type Dag

type Dag struct {
	ID         string
	Name       string
	Jobs       map[string]*Job
	JobsOrder  []string
	Status     DagStatus
	CreatedTs  time.Time
	StartedTs  time.Time
	FinishedTs time.Time
	Logger     Logger
	// contains filtered or unexported fields
}

Dag represents directed acyclic graph

func New

func New(name string, lggr Logger, hooksLocal Hooks) (*Dag, error)

New creates new DAG

func (*Dag) CreateJob

func (dag *Dag) CreateJob(name string, task func() (map[string]string, error)) (*Job, error)

CreateJob adds a job to DAG

func (*Dag) GetJob

func (dag *Dag) GetJob(name string) (*Job, error)

GetJob returns a job

func (*Dag) GetJobEdges

func (dag *Dag) GetJobEdges() ([]Edge, error)

GetJobEdges returns a list of job names

func (*Dag) GetJobs

func (dag *Dag) GetJobs() ([]JobBasics, error)

GetJobs returns a list of job names in order as they were added

func (*Dag) JobExists

func (dag *Dag) JobExists(name string) bool

JobExists tests if job exists

func (*Dag) Run

func (dag *Dag) Run() error

Run starts the DAG main loop which controls the jobs

func (*Dag) ShowJobStatuses

func (dag *Dag) ShowJobStatuses() error

type DagStatus added in v0.4.0

type DagStatus string
const (
	DagStatusNotStarted         DagStatus = "DagStatusNotStarted"
	DagStatusRunning            DagStatus = "DagStatusRunning"
	DagStatusFinishedSuccess    DagStatus = "DagStatusFinishedSuccess"
	DagStatusFinishedWithErrors DagStatus = "DagStatusFinishedWithErrors"
)

type Edge

type Edge struct {
	From string `json:"from"`
	To   string `json:"to"`
}

type Hooks

type Hooks struct {
	DagCreated         func(dag *Dag)
	DagStarted         func(dag *Dag)
	DagFinishedSuccess func(dag *Dag)
	DagFinishedFailed  func(dag *Dag)
	JobCreated         func(job *Job)
	JobStarted         func(job *Job)
	JobFinishedSuccess func(job *Job)
	JobFinishedFailed  func(job *Job)
}

Hooks define what happens on certain events like DAG start or JOB finished with error

type Job

type Job struct {
	ID   string
	Name string

	DagID      string
	ParentJobs []*Job
	ChildJobs  []*Job
	Status     JobStatus
	CreatedTs  time.Time
	StartedTs  time.Time
	FinishedTs time.Time
	Data       map[string]string
	Responses  map[string]string
	Logger     Logger
	Mu         sync.RWMutex
	// contains filtered or unexported fields
}

func (*Job) AddData

func (job *Job) AddData(key string, value string)

AddData adds additional data to job

func (*Job) After

func (job *Job) After(parent *Job) error

func (*Job) Then

func (parent *Job) Then(job *Job) error

type JobBasics

type JobBasics struct {
	ID              string `json:"id"`
	Name            string `json:"name"`
	ParentJobsCount int    `json:"parentJobsCount"`
}

type JobStatus added in v0.4.0

type JobStatus string
const (
	JobStatusNotStarted                                JobStatus = "JobStatusNotStarted"
	JobStatusRunning                                   JobStatus = "JobStatusRunning"
	JobStatusFinishedSuccess                           JobStatus = "JobStatusFinishedSuccess"
	JobStatusFinishedConditionNotMet                   JobStatus = "JobStatusFinishedConditionNotMet"
	JobStatusFinishedWithSoftError                     JobStatus = "JobStatusFinishedWithSoftError"
	JobStatusFinishedWithErrorAsItHasParentsHaveErrors JobStatus = "JobStatusFinishedWithErrorAsItHasParentsHaveErrors"
	JobStatusFinishedWithError                         JobStatus = "JobStatusFinishedWithError"
)

type Logger added in v0.2.0

type Logger interface {
	Debug(args ...interface{})
	Debugf(format string, args ...interface{})
	Info(args ...interface{})
	Infof(format string, args ...interface{})
	Error(args ...interface{})
	Errorf(format string, args ...interface{})
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL