jobs

package
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 12, 2019 License: MIT Imports: 19 Imported by: 0

Documentation

Index

Constants

View Source
// Job state names, declared as untyped string constants.
// NOTE(review): presumably these are the values stored in Job.State (a plain
// string field) — confirm against the JobStore implementation.
const (
	StateFinished = "FINISHED"
	StateErrored  = "ERRORED"
	StateRunning  = "RUNNING"
)

Variables

This section is empty.

Functions

func ExtractJobIdFromKey

func ExtractJobIdFromKey(path string, key *stream.FlowKeyValue) (string, error)

func NewEtcdStreamConsumer

func NewEtcdStreamConsumer(ctx context.Context, cli *clientv3.Client, config *StreamConsumerConfiguration) (*streamConsumer, error)

func NewStreamConsumer

func NewStreamConsumer(ctx context.Context, config *StreamConsumerConfiguration, jobStore JobStore, flow horizontal.Flow) *streamConsumer

func RunJobFlowUntil

func RunJobFlowUntil(ctx context.Context, cli *clientv3.Client, path string, cb func(job *Job) (bool, error), timeout time.Duration) error

An interesting idea that can be reused in other cases where a stream needs to be processed up to a certain point.

Types

type Job

// Job is the record describing a single job. Every field is serialized to
// JSON under a key identical to its Go field name.
//
// Args carries the job's parameters as a JobParameters map, and State is a
// plain string. CreatedAt and UpdatedAt are time.Time values; JobStore.Touch
// is documented as updating only UpdatedAt.
// NOTE(review): MaxRetry/CurrentRetry look like a retry budget and the number
// of attempts used so far, and Errors like the accumulated failure messages —
// confirm against the consumer implementation.
type Job struct {
	Id           string        `json:"Id"`
	TaskName     string        `json:"TaskName"`
	Args         JobParameters `json:"Args"`
	State        string        `json:"State"`
	CreatedAt    time.Time     `json:"CreatedAt"`
	UpdatedAt    time.Time     `json:"UpdatedAt"`
	Result       string        `json:"Result"`
	MaxRetry     int64         `json:"MaxRetry"`
	CurrentRetry int64         `json:"CurrentRetry"`
	Errors       []string      `json:"Errors"`
	// ConsumerName is attached to the job when it is picked up by the consumer.
	ConsumerName string `json:"ConsumerName"`
}

func NewJob

func NewJob(taskName string, args JobParameters) (string, *Job)

func NewJobWithRetry

func NewJobWithRetry(taskName string, args JobParameters, maxRetry int) (string, *Job)

func (*Job) IsFailed

func (j *Job) IsFailed() bool

type JobHandler

// JobHandler is the single-method interface for code that handles a job.
type JobHandler interface {
	// Handle receives the JobStore, the job itself and a logrus entry for
	// logging, and reports failure through the returned error.
	// NOTE(review): how a non-nil error affects the job's state or retries is
	// decided by the caller and is not visible here — confirm.
	Handle(jobStore JobStore, j *Job, logger *logrus.Entry) error
}

type JobParameters

// JobParameters is a string-keyed map of arbitrary values. It is the type of
// Job.Args and of the args parameter accepted by NewJob, NewJobWithRetry and
// the Producer Submit methods.
type JobParameters map[string]interface{}

type JobStore

// JobStore is the interface for reading a job and recording changes to it.
// Every method takes a context and identifies the job by its string ID.
// NOTE(review): the Mark* methods presumably move Job.State to the matching
// State* constant, and SaveResult presumably fills Job.Result — confirm
// against the concrete store returned by NewJobStore / NewEtcdJobStore.
type JobStore interface {
	Get(ctx context.Context, jobId string) (*Job, error)
	MarkFinished(ctx context.Context, jobId string) error
	MarkFailed(ctx context.Context, jobId string, err error) error
	SaveResult(ctx context.Context, jobId string, result interface{}) error
	MarkRunning(ctx context.Context, jobId string) error
	// Touch only updates the UpdatedAt field of the job; it doesn't affect any other fields.
	Touch(ctx context.Context, jobId string) error
}

func NewEtcdJobStore

func NewEtcdJobStore(cli *clientv3.Client, path string, consumerName string, runningNoUpdate time.Duration, retentionPeriod time.Duration) JobStore

func NewJobStore

func NewJobStore(cli kvstore.Store, path string, consumerName string, runningNoUpdate time.Duration, retentionPeriod time.Duration) JobStore

type Producer

// Producer is the interface for submitting jobs. Implementations are obtained
// from NewProducer or NewEtcdJobProducer.
//
// Submit and SubmitWithRetry take a task name and JobParameters and return a
// string plus an error; SubmitWithRetry additionally takes maxRetry.
// SubmitRaw takes an already constructed *Job and returns only an error.
// NOTE(review): the returned string is presumably the new job's Id — confirm.
type Producer interface {
	Submit(ctx context.Context, taskName string, args JobParameters) (string, error)
	SubmitRaw(ctx context.Context, job *Job) error
	SubmitWithRetry(ctx context.Context, taskName string, args JobParameters, maxRetry int) (string, error)
}

func NewEtcdJobProducer

func NewEtcdJobProducer(cli *clientv3.Client, path string) Producer

func NewProducer

func NewProducer(cli kvstore.Store, path string) Producer

type Progress

// Progress is built by NewProgress from a JobStore and a logrus entry and
// exposes RunWithProgressResult and RunWithProgressTime, both of which run a
// caller-supplied slowFunc for a given job ID.
// NOTE(review): the fields are hidden in this listing, so what is stored is
// not visible here.
type Progress struct {
	// contains filtered or unexported fields
}

func NewProgress

func NewProgress(jobStore JobStore, logger *logrus.Entry) *Progress

func (*Progress) RunWithProgressResult

func (p *Progress) RunWithProgressResult(ctx context.Context, jobId string, slowFunc func(progressChan chan<- interface{}) error) error

func (*Progress) RunWithProgressTime

func (p *Progress) RunWithProgressTime(ctx context.Context, jobId string, slowFunc func() error, opts ...RunOption) error

type RunOption

// RunOption is a function that receives a *progressOption. Values of this type
// are accepted variadically by Progress.RunWithProgressTime, and WithTickTime
// returns one.
type RunOption func(*progressOption)

func WithTickTime

func WithTickTime(t time.Duration) RunOption

type StreamConsumerConfiguration

// StreamConsumerConfiguration holds the settings passed to NewStreamConsumer
// and NewEtcdStreamConsumer. Every field has a yaml tag matching its Go name;
// Path, ConsumerName and Concurrency are additionally tagged
// validate:"required".
type StreamConsumerConfiguration struct {
	Path         string `validate:"required" yaml:"Path"`
	ConsumerName string `validate:"required" yaml:"ConsumerName"`
	Concurrency  int    `validate:"required" yaml:"Concurrency"`
	// optional, defaults to 5 secs
	// NOTE(review): the original comment read "5ecs" — confirm the default.
	NextRetry       time.Duration `yaml:"NextRetry"`
	RunningNoUpdate time.Duration `yaml:"RunningNoUpdate"`
	FromEnd         bool          `yaml:"FromEnd"`
	// how often to set the job.UpdatedAt field to keep the job running
	HeartBeatInterval time.Duration `yaml:"HeartBeatInterval"`
	RetentionPeriod   time.Duration `yaml:"RetentionPeriod"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL