Documentation ¶
Index ¶
- Variables
- func Add(key string, flow *Flow)
- func All() map[string]*Flow
- func Join(queues ...*Queue)
- type Attachment
- type Branch
- type Data
- type Flow
- func (f *Flow) AddEdge(node Node)
- func (f *Flow) AddNode(node string, handler Handler) *Flow
- func (f *Flow) Build() *Flow
- func (f *Flow) ConditionalNode(vertex string, conditions map[string]string) *Flow
- func (f *Flow) Edge(inVertex, outVertex string) *Flow
- func (f *Flow) ForEach(inVertex string, childVertex ...string) *Flow
- func (f *Flow) GetKey() string
- func (f *Flow) GetNodeHandler(node string) Handler
- func (f *Flow) GetType() string
- func (f *Flow) Loop(inVertex string, childVertex ...string) *Flow
- func (f *Flow) Node(vertex string) *Flow
- func (f *Flow) OperationCountByType(optType string) int
- func (f *Flow) Process(ctx context.Context, data Data) (Data, error)
- func (f *Flow) RunInBackground() bool
- func (f *Flow) WithRaw(raw *RawFlow) *Flow
- type ForEach
- type Handler
- type Node
- type Payload
- type Queue
- type RawFlow
- type Task
- func (t *Task) After(fn func(ctx context.Context, task *Task)) *Task
- func (t *Task) Attempt(ctx context.Context) (time.Time, error)
- func (t *Task) Attempts() int
- func (t *Task) Done() bool
- func (t *Task) MaxTimeout(d time.Duration) *Task
- func (t *Task) NextAttempt() time.Time
- func (t *Task) NoJitter() *Task
- func (t *Task) NotBefore(date time.Time) *Task
- func (t *Task) Result() error
- func (t *Task) Retries(n int) *Task
- func (t *Task) Within(deadline time.Duration) *Task
- type TaskFunc
- type Vertex
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrAlreadyComplete Returned when a task is attempted which was already successfully completed.
	ErrAlreadyComplete = errors.New("this task was already successfully completed once")
	// ErrDoNotReattempt If this is returned from a task function, the task shall not be re-attempted.
	ErrDoNotReattempt = errors.New("this task should not be re-attempted")
	// ErrMaxRetriesExceeded This task has been attempted too many times.
	ErrMaxRetriesExceeded = errors.New("the maximum retries for this task has been exceeded")
	// Now Set this function to influence the clock that will be used for
	// scheduling re-attempts.
	Now = func() time.Time {
		return time.Now().UTC()
	}
)
var ErrQueueShuttingDown = errors.New("queue is shutting down; new tasks are not being accepted")
Functions ¶
Types ¶
type Attachment ¶
type Data ¶
type Data struct {
	RequestID    string       `json:"request_id"`
	Payload      Payload      `json:"payload"`
	Status       string       `json:"status"`
	Flow         string       `json:"flow"`
	Operation    string       `json:"operation"`
	FailedReason error        `json:"failed_reason"`
	UserID       uint         `json:"user_id"`
	TimeStamp    int64        `json:"time_stamp"`
	Download     bool         `json:"download"`
	FileName     string       `json:"file_name"`
	Attachments  []Attachment `json:"attachments"`
}
func (*Data) LogRecords ¶
func (*Data) MarshalBinary ¶
func (*Data) UnmarshalBinary ¶
type Flow ¶
type Flow struct {
	Key    string `json:"key"`
	Error  error  `json:"error"`
	Status string `json:"status"`
	// contains filtered or unexported fields
}
func (*Flow) ConditionalNode ¶
func (*Flow) GetNodeHandler ¶
func (*Flow) OperationCountByType ¶
func (*Flow) RunInBackground ¶
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
func NewQueue ¶
NewQueue creates a new task queue. The name of the task queue is used in Prometheus label names and must match [a-zA-Z0-9:_] (snake case is used by convention).
func (*Queue) Dispatch ¶
Dispatch attempts any tasks that are due and updates the task schedule. It returns true if there is more work to do.
func (*Queue) Enqueue ¶
Enqueue enqueues a task.
An error will only be returned if the queue has been shut down.
func (*Queue) Shutdown ¶
func (q *Queue) Shutdown()
Shutdown stops accepting new tasks and blocks until all already-queued tasks are complete. The queue must have been started with Start, not Run.
func (*Queue) Start ¶
Start the task queue in the background. If you wish to use the warm shutdown feature, you must use Start, not Run.
func (*Queue) Submit ¶
Submit creates and enqueues a new task, returning the new task. Note that the caller cannot customize settings on the task without creating a race condition, so attempting to do so will panic. See NewTask and (*Queue).Enqueue to create tasks with customized options.
An error will only be returned if the queue has been shut down.
type RawFlow ¶
type RawFlow struct {
	RunInBackground       bool       `json:"run_in_background"`
	ProcessOperationCount int        `json:"process_operation_count"`
	FirstNode             string     `json:"first_node,omitempty"`
	LastNode              string     `json:"last_node,omitempty"`
	Nodes                 []string   `json:"nodes,omitempty"`
	Loops                 [][]string `json:"loops,omitempty"`
	ForEach               []ForEach  `json:"for_each,omitempty"`
	Branches              []Branch   `json:"branches,omitempty"`
	Edges                 [][]string `json:"edges,omitempty"`
}
type Task ¶
type Task struct {
	Metadata map[string]interface{}
	// contains filtered or unexported fields
}
Task stores state for a task that is to be, or has been, executed. Each task may only be executed successfully once.
func (*Task) After ¶
After sets a function which will be executed once the task is completed, successfully or not. The function is passed the context and the task itself; the final result (nil or an error) can be read from the task (see Result).
func (*Task) Attempt ¶
Attempt to execute this task.
If successful, the zero time and nil are returned.
Otherwise, the error returned from the task function is returned to the caller. If an error is returned for which errors.Is(err, ErrDoNotReattempt) is true, the caller should not call Attempt again.
func (*Task) MaxTimeout ¶
MaxTimeout sets the maximum timeout between retries, or zero to exponentially increase the timeout indefinitely. Defaults to 30 minutes.
func (*Task) NextAttempt ¶
NextAttempt returns the time the next attempt is scheduled for, or the zero value if it has not been attempted before.
func (*Task) NoJitter ¶
NoJitter specifies that randomness should not be introduced into the exponential backoff algorithm.
func (*Task) Result ¶
Result returns the result of the task. The task must have been completed for this to be valid.