queue

package
v0.2.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 31, 2013 License: MIT Imports: 17 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func AddConveyor

func AddConveyor(conveyorID string, config *Config) error

func GobDecode

func GobDecode(buf []byte, res interface{}) error

func GobEncode

func GobEncode(v interface{}) ([]byte, error)

func RemoveConveyor

func RemoveConveyor(conveyorId string) error

RemoveConveyor stops and removes the conveyor. The conveyor will wait on running tasks to complete before shutting down.

func StartManager

func StartManager(dbFilepath string) error

Types

type Config

// Config holds the tunable settings of a conveyor.
type Config struct {
	NWorker      int32 `json:"n_worker"`       // Number of workers processing tasks simultaneously.
	Throttle     int32 `json:"throttle"`       // Maximum number of task invocations from the queue per second.
	TaskTLimit   int32 `json:"task_t_limit"`   // Time allowed for each task to complete, in seconds.
	TaskMaxTries int32 `json:"task_max_tries"` // Number of tries per task before giving up. Set 0 for unlimited retries.
	LogSize      int32 `json:"log_size"`       // Maximum number of log entries.
}

type Conveyor

// Conveyor describes a single conveyor: its identity, creation and change
// timestamps, pause state and configuration.
type Conveyor struct {
	Object  string    `json:"object"`  // Resource type name. NOTE(review): presumably identifies this JSON object as a conveyor — confirm.
	ID      string    `json:"id"`      // Conveyor identifier.
	Created time.Time `json:"created"` // Time the conveyor was created.
	Changed time.Time `json:"changed"` // Time the conveyor was last changed.
	Paused  bool      `json:"paused"`  // Whether the conveyor is in the paused state.
	Config  Config    `json:"config"`  // Conveyor configuration.
	// contains filtered or unexported fields
}

func GetAllConveyor

func GetAllConveyor() ([]*Conveyor, error)

func GetConveyor

func GetConveyor(conveyorID string) (*Conveyor, error)

func NewConveyor

func NewConveyor(conveyorID string, config *Config) *Conveyor

func (*Conveyor) Add

func (conv *Conveyor) Add(target, payload string, scheduled, recurring int64) (*Task, error)

func (*Conveyor) Flush

func (conv *Conveyor) Flush() error

func (*Conveyor) Init

func (conv *Conveyor) Init() *Conveyor

func (*Conveyor) Pause

func (conv *Conveyor) Pause()

func (*Conveyor) Resume

func (conv *Conveyor) Resume()

func (*Conveyor) Start

func (conv *Conveyor) Start() error

Start starts the conveyor belt and handles retrieving and delegating tasks.

func (*Conveyor) Stats

func (conv *Conveyor) Stats() (*Statistic, error)

func (*Conveyor) Stop

func (conv *Conveyor) Stop()

func (*Conveyor) Tasks

func (conv *Conveyor) Tasks() ([]*Task, error)

type Manager

// Manager holds a set of conveyors in a map keyed by string.
// NOTE(review): the key is presumably the conveyor ID — confirm.
type Manager struct {
	Conveyors map[string]*Conveyor
}

type Scheduler

// Scheduler is bound to a single Conveyor and periodically checks that
// conveyor's schedule list.
type Scheduler struct {
	// Conveyor this scheduler belongs to. Excluded from JSON output.
	Conveyor *Conveyor `json:"-"`

	// Conveyor scheduler id name.
	ScheduleId string `json:"schedule_id"`

	// Conveyor scheduler list name.
	ScheduleList string `json:"schedule_list"`

	// How often the scheduler checks the schedule list.
	// NOTE(review): the original comment says "in seconds", but a
	// time.Duration counts nanoseconds — confirm which unit callers and the
	// JSON representation actually use.
	Rate time.Duration `json:"rate"`
	// contains filtered or unexported fields
}

func NewScheduler

func NewScheduler(conveyor *Conveyor) *Scheduler

func (*Scheduler) Add

func (sched *Scheduler) Add(taskId string, task []byte, time int64) error

func (*Scheduler) Reschedule

func (sched *Scheduler) Reschedule(task *Task) (int32, error)

Reschedule task.

func (*Scheduler) Start

func (sched *Scheduler) Start()

type Statistic

// Statistic carries the counters and timing figures returned by
// (*Conveyor).Stats.
//
// NOTE(review): the field meanings are inferred from their names only and
// should be confirmed against the implementation: the In* fields look like
// current task counts per state, the TotalProcessed* fields like cumulative
// outcome counters, and AvgTime/AvgTimeRecent like average task durations.
type Statistic struct {
	Object                    string        `json:"object"`
	InQueue                   int64         `json:"in_queue"`
	InProcessing              int64         `json:"in_processing"`
	InScheduled               int64         `json:"in_scheduled"`
	TotalProcessed            int           `json:"total_processed"`
	TotalProcessedOK          int           `json:"total_processed_ok"`
	TotalProcessedError       int           `json:"total_processed_error"`
	TotalProcessedRescheduled int           `json:"total_processed_rescheduled"`
	AvgTime                   time.Duration `json:"avg_time"`
	AvgTimeRecent             time.Duration `json:"avg_time_recent"`
}

type Task

// Task is the item type returned by (*Conveyor).Add and (*Conveyor).Tasks and
// accepted by (*Scheduler).Reschedule.
//
// NOTE(review): Target and Payload share their names with the string
// arguments of (*Conveyor).Add. Recurring is int32 here while Add takes its
// recurring argument as int64 — confirm the conversion cannot truncate.
// Units of Delay and Recurring are not stated in this listing; presumably
// seconds — confirm.
type Task struct {
	Object    string `json:"object"`
	ID        string `json:"id"`
	Target    string `json:"target"`
	Payload   string `json:"payload"`
	Tries     int32  `json:"tries"`
	Delay     int32  `json:"delay"`
	Recurring int32  `json:"recurring"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL