worker

package
v0.0.0-...-ea25219 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 30, 2015 License: MIT Imports: 30 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// Job statuses: the string values this package defines for a job's
	// status.
	// NOTE(review): presumably these are the values stored in QorJob.Status
	// and passed to QorJob.UpdateStatus — confirm against the implementation.

	// StatusToRun is the "Pending" status.
	StatusToRun   = "Pending"
	// StatusRunning is the "Running" status.
	StatusRunning = "Running"
	// StatusFailed is the "Failed" status.
	StatusFailed  = "Failed"
	// StatusKilled is the "Killed" status.
	StatusKilled  = "Killed"
	// StatusDone is the "Done" status.
	StatusDone    = "Done"
)

Variables

View Source
var (
	// WorkerDataPath holds the string "worker_data".
	// NOTE(review): presumably the directory under which worker data (such
	// as job logs) is stored — confirm against its users.
	WorkerDataPath = "worker_data"

	// DefaultServerHost is computed once, at package initialization, by
	// calling CurrentServerIP(""). If that call fails, the failure and its
	// cause are printed to standard output and whatever value
	// CurrentServerIP returned alongside the error is kept.
	DefaultServerHost = func() string {
		ip, err := CurrentServerIP("")
		if err != nil {
			// Report the underlying error too; without it the message
			// gives no hint about why the lookup failed.
			fmt.Println("failed to retrieve current host ip:", err)
		}
		return ip
	}()
	// DefaultServerUser holds the string "app".
	// NOTE(review): presumably the default for QorJob.ServerUser — confirm.
	DefaultServerUser    = "app"
	// DefaultServerSSHPort holds the string "22".
	// NOTE(review): presumably the default for QorJob.ServerSSHPort — confirm.
	DefaultServerSSHPort = "22"
)
View Source
var (
	// DefaultJobCli is the command line of the current process: the elements
	// of os.Args joined with single spaces, evaluated at package
	// initialization.
	DefaultJobCli = strings.Join(os.Args, " ")
)
View Source
// ErrJobDone is a sentinel error whose message is "job is finished".
// NOTE(review): which functions return it is not visible here — confirm
// before relying on it in callers.
var ErrJobDone = errors.New("job is finished")

Functions

func CurrentServerIP

func CurrentServerIP(name string) (string, error)

func Listen

func Listen()

Listen parses a flag named "job-id". If job-id is zero, it runs as the queue listening server. Otherwise, it runs that specific job and terminates the process after the job has run.

It must be called before http.ListenAndServe.

func RunJob

func RunJob(jobId uint64)

func SetJobDB

func SetJobDB(db *gorm.DB)

Types

type BeanstalkdQueue

// BeanstalkdQueue is a queue type created with NewBeanstalkdQueue(name, addr).
// Its declared methods (Name, Enqueue, Purge, Dequeue) match the method set
// of the Queuer interface.
type BeanstalkdQueue struct {
	// Debug is an optional writer.
	// NOTE(review): presumably debug output is written to it when it is
	// non-nil — confirm against the implementation.
	Debug io.Writer
	// contains filtered or unexported fields
}

func NewBeanstalkdQueue

func NewBeanstalkdQueue(name, addr string) (bq *BeanstalkdQueue)

func (*BeanstalkdQueue) Dequeue

func (bq *BeanstalkdQueue) Dequeue() (jobId uint64, err error)

func (*BeanstalkdQueue) Enqueue

func (bq *BeanstalkdQueue) Enqueue(job *QorJob) (err error)

func (*BeanstalkdQueue) Name

func (bq *BeanstalkdQueue) Name() string

func (*BeanstalkdQueue) Purge

func (bq *BeanstalkdQueue) Purge(job *QorJob) (err error)

type ExtraInput

// ExtraInput is a string-to-string map. Its pointer type declares Scan and
// Value methods with the signatures of database/sql's Scanner and
// database/sql/driver's Valuer, and it is used for the QorJob.ExtraValue and
// QorJob.ExtraFile fields.
// NOTE(review): the serialized form and the meaning of keys and values are
// not visible here — confirm against Scan, Value and Open.
type ExtraInput map[string]string

func (ExtraInput) Open

func (ei ExtraInput) Open(name string) (f *os.File, err error)

func (*ExtraInput) Scan

func (ei *ExtraInput) Scan(src interface{}) error

func (*ExtraInput) Value

func (ei *ExtraInput) Value() (driver.Value, error)

type Job

// Job describes a named kind of job: the queue it uses, an optional admin
// resource, and the callbacks that operate on a concrete *QorJob.
// NOTE(review): when each callback is invoked is not visible here; the notes
// below are inferred from the field names and should be confirmed against
// Run and Kill.
type Job struct {
	// Name is the job's name.
	Name   string
	// Queuer is the queue implementation associated with this job.
	Queuer Queuer

	// Description is a free-form description of the job.
	Description string

	// Resource is an admin resource associated with the job.
	Resource *admin.Resource

	// Handle receives the *QorJob and returns an error.
	// NOTE(review): presumably this is the job's main work function — confirm.
	Handle    func(job *QorJob) error
	// OnKill receives the *QorJob and returns an error.
	// NOTE(review): presumably called when the job is killed — confirm.
	OnKill    func(job *QorJob) error
	// OnStart receives the *QorJob and returns an error.
	// NOTE(review): presumably called before the job starts — confirm.
	OnStart   func(job *QorJob) error
	// OnSuccess receives the *QorJob and returns nothing.
	// NOTE(review): presumably called after a successful run — confirm.
	OnSuccess func(job *QorJob)
	// OnFailed receives the *QorJob and returns nothing.
	// NOTE(review): presumably called after a failed run — confirm.
	OnFailed  func(job *QorJob)
	// contains filtered or unexported fields
}

func (*Job) Enqueue

func (j *Job) Enqueue(job *QorJob) (err error)

func (*Job) Kill

func (j *Job) Kill(job *QorJob) (err error)

func (*Job) Meta

func (j *Job) Meta(meta *admin.Meta)

func (*Job) NewQorJob

func (j *Job) NewQorJob(interval uint64, startAt time.Time, by, cli string) (job *QorJob)

func (*Job) Run

func (j *Job) Run(job *QorJob) (err error)

TODO: Run will pollute DB data; will the status be reset to 0?

func (*Job) URL

func (j *Job) URL() string

type QorJob

// QorJob is the record of a single job: its scheduling parameters, run
// state, counters, server details and extra inputs.
// NOTE(review): how each field is populated is not visible here; hedged
// notes below are inferred from field names and should be confirmed.
type QorJob struct {
	// ID is the job's numeric identifier.
	ID         uint64
	// QueueJobId is an identifier a Queuer may set when enqueuing the job
	// (see the comment on Queuer.Enqueue).
	QueueJobId string

	// Interval is the repeat interval in minutes;
	// 0 runs the job only once.
	Interval uint64
	// StartAt is when to run the job; the zero time value executes the job
	// immediately.
	StartAt time.Time

	// NOTE(review): presumably marks the job as stopped — confirm.
	Stopped bool

	// NOTE(review): Cli is presumably the command line used to run the job
	// (compare DefaultJobCli) — confirm.
	Cli        string
	// NOTE(review): WorkerName and JobName presumably identify the Worker
	// and Job returned by GetWorker and GetJob — confirm.
	WorkerName string
	JobName    string
	// NOTE(review): Status is presumably one of the Status* constants — confirm.
	Status     string
	PID        int // TODO: change it into uint

	// NOTE(review): By presumably records who created or triggered the
	// job — confirm.
	By string

	// NOTE(review): the counters presumably track how many times the job
	// was run, failed, succeeded and was killed — confirm.
	RunCounter     uint64
	FailCounter    uint64
	SuccessCounter uint64
	KillCounter    uint64

	// NOTE(review): presumably the host, user and SSH port of the server
	// the job runs on (compare the DefaultServer* variables) — confirm.
	ServerHost    string
	ServerUser    string
	ServerSSHPort string

	// Extra inputs, stored in columns tagged as SQL type text.
	ExtraValue *ExtraInput `sql:"type:text;"` // Mysql: 64KB
	ExtraFile  *ExtraInput `sql:"type:text;"` // Mysql: 64KB

	// Record timestamps.
	UpdatedAt time.Time
	CreatedAt time.Time
	DeletedAt time.Time

	// NOTE(review): the unit and range of Progress are not visible here —
	// confirm.
	Progress int
	// contains filtered or unexported fields
}

func (*QorJob) GetJob

func (qj *QorJob) GetJob() *Job

func (*QorJob) GetLog

func (j *QorJob) GetLog() (l string)

func (*QorJob) GetLogger

func (j *QorJob) GetLogger() (f *os.File, err error)

func (*QorJob) GetWorker

func (qj *QorJob) GetWorker() *Worker

func (*QorJob) LogPath

func (qj *QorJob) LogPath() string

func (*QorJob) Run

func (j *QorJob) Run() (err error)

func (*QorJob) SaveRunStatus

func (j *QorJob) SaveRunStatus() (err error)

TODO: will dequeuing the job override the value?

func (*QorJob) URL

func (q *QorJob) URL() string

func (*QorJob) UpdateStatus

func (j *QorJob) UpdateStatus(status string) (err error)

type Queuer

// Queuer is the interface a job queue backend implements: it names itself,
// accepts jobs, removes jobs, and hands out the ids of jobs to run.
type Queuer interface {
	// Name returns the Queuer's identifier.
	Name() string
	// Enqueue pushes a job onto the queue. A Queuer may also set an ID value
	// (a string) in the QorJob's QueueJobId if needed.
	// NOTE(review): the original comment also lists "Interval" and "StartAt"
	// here — presumably the QorJob fields an implementation is expected to
	// honor when scheduling; confirm.
	Enqueue(job *QorJob) (err error)
	// Purge removes a job from the queue.
	Purge(job *QorJob) (err error)
	// Dequeue blocks the process until a job id (and an error, if any) is
	// returned.
	Dequeue() (jobId uint64, err error)
}

type Worker

// Worker is a named collection of jobs, created with New(name).
type Worker struct {
	// Name is the worker's name.
	Name string

	// Jobs maps a string key to a *Job.
	// NOTE(review): presumably keyed by Job.Name and filled by NewJob — confirm.
	Jobs map[string]*Job
	// contains filtered or unexported fields
}

func New

func New(name string) *Worker

func (*Worker) InjectQorAdmin

func (w *Worker) InjectQorAdmin(res *admin.Resource)

TODO: UNDONE

func (*Worker) NewJob

func (w *Worker) NewJob(queuer Queuer, name, desc string, handle func(job *QorJob) error) (j *Job)

func (*Worker) ResourceName

func (w *Worker) ResourceName() string

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL