flow

package module
v0.0.0-...-194f2cf Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 19, 2022 License: MIT Imports: 12 Imported by: 0

README

Introduction

This package provides a simple graph for executing functions as a group.

Features

  • Define nodes of different types: Vertex, Branch and Loop
  • Define branches for conditional nodes

Installation

go get github.com/sujit-baniya/flow

Usage

Basic Flow
package main

import (
	"context"
	"fmt"
	"github.com/sujit-baniya/flow"
)

// Message is a flow handler that rewrites the payload as "message <payload>"
// and passes the data on. It never returns an error.
func Message(ctx context.Context, d flow.Data) (flow.Data, error) {
	prefixed := fmt.Sprintf("message %s", d.Payload)
	d.Payload = flow.Payload(prefixed)
	return d, nil
}

// Send is a flow handler that rewrites the payload as
// "This is send <payload>" and passes the data on. It never returns an error.
func Send(ctx context.Context, d flow.Data) (flow.Data, error) {
	wrapped := fmt.Sprintf("This is send %s", d.Payload)
	d.Payload = flow.Payload(wrapped)
	return d, nil
}

// basicFlow registers the Message and Send handlers, connects them with a
// single message -> send edge, builds the flow and runs it once with the
// payload "Payload". The result is printed; a processing error panics.
func basicFlow() {
	pipeline := flow.New()
	pipeline.AddNode("message", Message)
	pipeline.AddNode("send", Send)
	pipeline.Edge("message", "send")

	input := flow.Data{Payload: flow.Payload("Payload")}
	out, err := pipeline.Build().Process(context.Background(), input)
	if err != nil {
		panic(err)
	}
	fmt.Println(out.ToString())
}

// basicRawFlow runs a message -> send pipeline whose edge is declared in a raw
// JSON definition handed to flow.New; only the handlers are registered in
// code. The result is printed; a processing error panics.
func basicRawFlow() {
	definition := []byte(`{
		"edges": [
			["message", "send"]
		]
	}`)
	pipeline := flow.New(definition)
	pipeline.AddNode("message", Message)
	pipeline.AddNode("send", Send)

	input := flow.Data{Payload: flow.Payload("Payload")}
	out, err := pipeline.Process(context.Background(), input)
	if err != nil {
		panic(err)
	}
	fmt.Println(out.ToString())
}

// main runs both basic examples in turn: the flow assembled in code, then the
// flow assembled from a raw JSON definition.
func main() {
	basicFlow()
	basicRawFlow()
}

Branch Flow
package main

import (
	"context"
	"fmt"
	"encoding/json"
	"github.com/sujit-baniya/flow"
)

// GetRegistration is the handler for the "get-registration" node, the source
// of the get-registration -> verify-user edge. It forwards the data unchanged
// and never returns an error.
func GetRegistration(ctx context.Context, d flow.Data) (flow.Data, error) {
	return d, nil
}

// VerifyUser is the conditional vertex of the registration flow. It decodes
// the payload into a Registration and sets d.Status to select the branch:
// "fail" when the e-mail is already a key of registeredEmail, "pass"
// otherwise.
//
// A payload that cannot be decoded is reported as an error rather than being
// treated as an empty, unregistered e-mail (which would be marked "pass").
func VerifyUser(ctx context.Context, d flow.Data) (flow.Data, error) {
	var reg Registration
	if err := d.ConvertTo(&reg); err != nil {
		return d, fmt.Errorf("verify-user: decoding registration payload: %w", err)
	}
	// Only key presence matters; the bool value stored in the map is not read.
	if _, taken := registeredEmail[reg.Email]; taken {
		d.Status = "fail"
	} else {
		d.Status = "pass"
	}
	return d, nil
}

// CreateUser is the handler mapped to the "pass" condition. It rewrites the
// payload as "create user <payload>" and never returns an error.
func CreateUser(ctx context.Context, d flow.Data) (flow.Data, error) {
	message := fmt.Sprintf("create user %s", d.Payload)
	d.Payload = flow.Payload(message)
	return d, nil
}

// CancelRegistration is the handler mapped to the "fail" condition. It
// rewrites the payload as "cancel user <payload>" and never returns an error.
func CancelRegistration(ctx context.Context, d flow.Data) (flow.Data, error) {
	message := fmt.Sprintf("cancel user %s", d.Payload)
	d.Payload = flow.Payload(message)
	return d, nil
}

// Registration is the request that the examples marshal to JSON and place in
// the flow payload. The fields carry no JSON tags, so encoding/json uses the
// field names "Email" and "Password" as keys.
type Registration struct {
	// Email is the address VerifyUser looks up in registeredEmail.
	Email    string
	// Password is carried along in the payload; no handler in this example
	// reads it.
	Password string
}

// registeredEmail holds the addresses that are already registered. VerifyUser
// only checks whether a key is present; the bool value is not read.
var registeredEmail = map[string]bool{"test@gmail.com": true}

// basicRegistrationFlow assembles the registration flow in code:
// get-registration feeds verify-user, whose status selects create-user
// ("pass") or cancel-registration ("fail"). It then processes two
// registrations in order and prints each result.
//
// Any marshalling or processing error panics, which stops the example at the
// first failure.
func basicRegistrationFlow() {
	flow1 := flow.New()
	flow1.AddNode("get-registration", GetRegistration)
	flow1.AddNode("create-user", CreateUser)
	flow1.AddNode("cancel-registration", CancelRegistration)
	flow1.AddNode("verify-user", VerifyUser)
	flow1.ConditionalNode("verify-user", map[string]string{
		"pass": "create-user",
		"fail": "cancel-registration",
	})
	flow1.Edge("get-registration", "verify-user")

	registrations := []Registration{
		// Present in registeredEmail, so VerifyUser marks it "fail".
		{Email: "test@gmail.com", Password: "admin"},
		// Not present in registeredEmail, so VerifyUser marks it "pass".
		{Email: "test1@gmail.com", Password: "admin"},
	}
	for _, registration := range registrations {
		payload, err := json.Marshal(registration)
		if err != nil {
			panic(err)
		}
		response, err := flow1.Process(context.Background(), flow.Data{
			Payload: payload,
		})
		if err != nil {
			panic(err)
		}
		fmt.Println(response.ToString())
	}
}

// basicRegistrationRawFlow assembles the registration flow from a raw JSON
// definition: the get-registration -> verify-user edge and the verify-user
// branch ("pass" -> create-user, "fail" -> cancel-registration) are declared
// in JSON, and only the handlers are registered in code. It then processes
// two registrations in order and prints each result.
//
// Any marshalling or processing error panics, which stops the example at the
// first failure.
func basicRegistrationRawFlow() {
	rawFlow := []byte(`{
		"edges": [
			["get-registration", "verify-user"]
		],
		"branches":[
			{
				"key": "verify-user",
				"conditional_nodes": {
					"pass": "create-user",
					"fail": "cancel-registration"
				}
			}
		]
	}`)
	flow1 := flow.New(rawFlow)
	flow1.AddNode("get-registration", GetRegistration)
	flow1.AddNode("create-user", CreateUser)
	flow1.AddNode("cancel-registration", CancelRegistration)
	flow1.AddNode("verify-user", VerifyUser)

	registrations := []Registration{
		// Present in registeredEmail, so VerifyUser marks it "fail".
		{Email: "test@gmail.com", Password: "admin"},
		// Not present in registeredEmail, so VerifyUser marks it "pass".
		{Email: "test1@gmail.com", Password: "admin"},
	}
	for _, registration := range registrations {
		payload, err := json.Marshal(registration)
		if err != nil {
			panic(err)
		}
		response, err := flow1.Process(context.Background(), flow.Data{
			Payload: payload,
		})
		if err != nil {
			panic(err)
		}
		fmt.Println(response.ToString())
	}
}

// main runs both registration examples in turn: the flow assembled in code,
// then the flow assembled from a raw JSON definition.
func main() {
	basicRegistrationFlow()
	basicRegistrationRawFlow()
}

Loop Flow
package main

import (
	"context"
	"fmt"
	"encoding/json"
	"github.com/sujit-baniya/flow"
	"strings"
)

// GetSentence splits the payload text on single spaces and replaces the
// payload with the resulting words encoded as a JSON array of strings.
//
// It returns an error only if the words cannot be encoded.
func GetSentence(ctx context.Context, d flow.Data) (flow.Data, error) {
	words := strings.Split(d.ToString(), ` `)
	encoded, err := json.Marshal(words)
	if err != nil {
		return d, fmt.Errorf("get-sentence: encoding words: %w", err)
	}
	d.Payload = encoded
	return d, nil
}

// ForEachWord is the handler for the "for-each-word" node, which main
// registers as the loop vertex with "upper-case" as its child. It forwards
// the data unchanged and never returns an error.
// NOTE(review): presumably the flow itself iterates over the JSON array
// produced by GetSentence — confirm against the Loop implementation.
func ForEachWord(ctx context.Context, d flow.Data) (flow.Data, error) {
	return d, nil
}

// WordUpperCase decodes the payload as a JSON string, lower-cases it, upper-
// cases the first letter of each word and writes the result back as a JSON
// string.
//
// It returns an error when the payload is not a JSON string or the result
// cannot be encoded; previously such a payload was silently treated as "".
func WordUpperCase(ctx context.Context, d flow.Data) (flow.Data, error) {
	var word string
	if err := json.Unmarshal(d.Payload, &word); err != nil {
		return d, fmt.Errorf("upper-case: decoding word: %w", err)
	}
	// strings.Title is deprecated since Go 1.18, but its replacement lives in
	// golang.org/x/text; it is kept here so the example stays stdlib-only.
	encoded, err := json.Marshal(strings.Title(strings.ToLower(word)))
	if err != nil {
		return d, fmt.Errorf("upper-case: encoding word: %w", err)
	}
	d.Payload = encoded
	return d, nil
}

// AppendString decodes the payload as a JSON string and writes it back as a
// JSON string prefixed with "Upper Case: ".
//
// It returns an error when the payload is not a JSON string or the result
// cannot be encoded; previously such a payload was silently treated as "".
func AppendString(ctx context.Context, d flow.Data) (flow.Data, error) {
	var word string
	if err := json.Unmarshal(d.Payload, &word); err != nil {
		return d, fmt.Errorf("append-string: decoding word: %w", err)
	}
	encoded, err := json.Marshal("Upper Case: " + word)
	if err != nil {
		return d, fmt.Errorf("append-string: encoding word: %w", err)
	}
	d.Payload = encoded
	return d, nil
}

// main builds the loop example: get-sentence feeds for-each-word, which loops
// over upper-case, and upper-case feeds append-string. It processes the
// sentence "this is a sentence", prints the result and panics on error.
func main() {
	pipeline := flow.New()
	pipeline.AddNode("get-sentence", GetSentence)
	pipeline.AddNode("for-each-word", ForEachWord)
	pipeline.AddNode("upper-case", WordUpperCase)
	pipeline.AddNode("append-string", AppendString)
	pipeline.Loop("for-each-word", "upper-case")
	pipeline.Edge("get-sentence", "for-each-word")
	pipeline.Edge("upper-case", "append-string")

	input := flow.Data{Payload: flow.Payload("this is a sentence")}
	result, err := pipeline.Process(context.Background(), input)
	if err != nil {
		panic(err)
	}
	fmt.Println(result.ToString())
}

ToDo List

  • Implement async flow and nodes
  • Implement distributed nodes

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrAlreadyComplete is returned when a task is attempted after it has
	// already completed successfully.
	ErrAlreadyComplete = errors.New("this task was already successfully completed once")

	// ErrDoNotReattempt may be returned from a task function to signal that
	// the task must not be re-attempted.
	ErrDoNotReattempt = errors.New("this task should not be re-attempted")

	// ErrMaxRetriesExceeded indicates that the task has been attempted too
	// many times.
	ErrMaxRetriesExceeded = errors.New("the maximum retries for this task has been exceeded")

	// Now returns the current time in UTC. Replace this function to influence
	// the clock that is used for scheduling re-attempts.
	Now = func() time.Time {
		return time.Now().UTC()
	}
)
View Source
// ErrQueueShuttingDown reports that the queue is shutting down and is no
// longer accepting new tasks.
// NOTE(review): presumably the error Enqueue and Submit return once the queue
// has been shut down — confirm against the implementation.
var ErrQueueShuttingDown = errors.New("queue is shutting down; new tasks are not being accepted")

Functions

func Add

func Add(key string, flow *Flow)

func All

func All() map[string]*Flow

func Join

func Join(queues ...*Queue)

Join Shuts down any number of work queues in parallel and blocks until they're all finished.

Types

type Attachment

// Attachment is an element of Data.Attachments.
// NOTE(review): field meanings below are inferred from their names only —
// confirm against the code that produces and consumes attachments.
type Attachment struct {
	// Data presumably holds the raw attachment content.
	Data     []byte
	// File presumably holds the attachment's file name or path.
	File     string
	// MimeType presumably holds the content's MIME type.
	MimeType string
}

type Branch

// Branch is one entry of RawFlow.Branches. In a raw JSON flow definition it
// is written as {"key": ..., "conditional_nodes": {...}}.
type Branch struct {
	// Key names the conditional node (for example "verify-user").
	Key              string            `json:"key"`
	// ConditionalNodes maps a condition (for example "pass" or "fail") to the
	// name of a node.
	// NOTE(review): presumably the condition is matched against the Status
	// the conditional node's handler sets — confirm against the implementation.
	ConditionalNodes map[string]string `json:"conditional_nodes"`
}

type Data

// Data is the value passed to every Handler and returned from it, and the
// input and output of (*Flow).Process.
// NOTE(review): only Payload and Status are exercised by the README examples;
// the purpose of the remaining fields is not visible here.
type Data struct {
	RequestID    string       `json:"request_id"`
	// Payload carries the raw bytes that handlers read and replace.
	Payload      Payload      `json:"payload"`
	// Status is set by a conditional node's handler (for example "pass" or
	// "fail").
	Status       string       `json:"status"`
	Flow         string       `json:"flow"`
	Operation    string       `json:"operation"`
	FailedReason error        `json:"failed_reason"`
	UserID       uint         `json:"user_id"`
	TimeStamp    int64        `json:"time_stamp"`
	Download     bool         `json:"download"`
	FileName     string       `json:"file_name"`
	Attachments  []Attachment `json:"attachments"`
}

func (*Data) ConvertTo

func (d *Data) ConvertTo(rs interface{}) error

func (*Data) GetStatus

func (d *Data) GetStatus() string

func (*Data) Log

func (d *Data) Log() error

func (*Data) LogRecords

func (d *Data) LogRecords(count ...int64) error

func (*Data) MarshalBinary

func (d *Data) MarshalBinary() ([]byte, error)

func (*Data) ToString

func (d *Data) ToString() string

func (*Data) UnmarshalBinary

func (d *Data) UnmarshalBinary(data []byte) error

type Flow

type Flow struct {
	Key    string `json:"key"`
	Error  error  `json:"error"`
	Status string `json:"status"`
	// contains filtered or unexported fields
}

func Get

func Get(key string) *Flow

func New

func New(raw ...Payload) *Flow

func NewRaw

func NewRaw(flow *RawFlow) *Flow

func (*Flow) AddEdge

func (f *Flow) AddEdge(node Node)

func (*Flow) AddNode

func (f *Flow) AddNode(node string, handler Handler) *Flow

func (*Flow) Build

func (f *Flow) Build() *Flow

func (*Flow) ConditionalNode

func (f *Flow) ConditionalNode(vertex string, conditions map[string]string) *Flow

func (*Flow) Edge

func (f *Flow) Edge(inVertex, outVertex string) *Flow

func (*Flow) ForEach

func (f *Flow) ForEach(inVertex string, childVertex ...string) *Flow

func (*Flow) GetKey

func (f *Flow) GetKey() string

func (*Flow) GetNodeHandler

func (f *Flow) GetNodeHandler(node string) Handler

func (*Flow) GetType

func (f *Flow) GetType() string

func (*Flow) Loop

func (f *Flow) Loop(inVertex string, childVertex ...string) *Flow

func (*Flow) Node

func (f *Flow) Node(vertex string) *Flow

func (*Flow) OperationCountByType

func (f *Flow) OperationCountByType(optType string) int

func (*Flow) Process

func (f *Flow) Process(ctx context.Context, data Data) (Data, error)

func (*Flow) RunInBackground

func (f *Flow) RunInBackground() bool

func (*Flow) WithRaw

func (f *Flow) WithRaw(raw *RawFlow) *Flow

type ForEach

// ForEach is one entry of RawFlow.ForEach. Its fields mirror the parameters
// of (*Flow).ForEach(inVertex string, childVertex ...string).
// NOTE(review): presumably InVertex is the iterating node and ChildVertex the
// nodes run per element — confirm against the implementation.
type ForEach struct {
	InVertex    string   `json:"in_vertex"`
	ChildVertex []string `json:"child_vertex"`
}

type Handler

// Handler is the function type registered for a node with (*Flow).AddNode. It
// receives the context and the current Data and returns the Data to pass on,
// or an error.
type Handler func(ctx context.Context, data Data) (Data, error)

type Node

// Node is the method set of a processable element of the graph. Both *Flow
// and *Vertex declare all four methods.
type Node interface {
	// Process handles data and returns the resulting Data or an error.
	Process(ctx context.Context, data Data) (Data, error)
	// AddEdge attaches node to this node.
	// NOTE(review): presumably as an outgoing edge — confirm.
	AddEdge(node Node)
	// GetType returns the node's type as a string.
	GetType() string
	// GetKey returns the node's key as a string.
	GetKey() string
}

type Payload

// Payload is a raw byte slice. It is the type of Data.Payload and of the
// optional raw flow definitions accepted by New.
type Payload []byte

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

func NewQueue

func NewQueue(name string) *Queue

NewQueue Creates a new task queue. The name of the task queue is used in Prometheus label names and must match [a-zA-Z0-9:_] (snake case is used by convention).

func (*Queue) Dispatch

func (q *Queue) Dispatch(ctx context.Context) bool

Dispatch Attempts any tasks which are due and updates the task schedule. Returns true if there is more work to do.

func (*Queue) Enqueue

func (q *Queue) Enqueue(t *Task) error

Enqueue Enqueues a task.

An error will only be returned if the queue has been shut down.

func (*Queue) Now

func (q *Queue) Now(now func() time.Time)

Now Sets the function the queue will use to obtain the current time.

func (*Queue) Run

func (q *Queue) Run(ctx context.Context)

Run the task queue. Blocks until the context is cancelled.

func (*Queue) Shutdown

func (q *Queue) Shutdown()

Shutdown Stops accepting new tasks and blocks until all already-queued tasks are complete. The queue must have been started with Start, not Run.

func (*Queue) Start

func (q *Queue) Start(ctx context.Context)

Start the task queue in the background. If you wish to use the warm shutdown feature, you must use Start, not Run.

func (*Queue) Submit

func (q *Queue) Submit(fn TaskFunc) (*Task, error)

Submit Creates and enqueues a new task, returning the new task. Note that the caller cannot customize settings on the task without creating a race condition; so attempting to will panic. See NewTask and (*Queue).Enqueue to create tasks with customized options.

An error will only be returned if the queue has been shut down.

type RawFlow

// RawFlow is the declarative description of a flow, accepted by NewRaw and
// (*Flow).WithRaw. Its JSON form uses the keys given in the field tags; the
// README examples use "edges" and "branches".
// NOTE(review): the meaning of fields without a comment is not visible here.
type RawFlow struct {
	RunInBackground       bool       `json:"run_in_background"`
	ProcessOperationCount int        `json:"process_operation_count"`
	FirstNode             string     `json:"first_node,omitempty"`
	LastNode              string     `json:"last_node,omitempty"`
	Nodes                 []string   `json:"nodes,omitempty"`
	Loops                 [][]string `json:"loops,omitempty"`
	ForEach               []ForEach  `json:"for_each,omitempty"`
	// Branches lists the conditional nodes and their condition-to-node maps.
	Branches              []Branch   `json:"branches,omitempty"`
	// Edges lists node-name pairs, for example ["message", "send"].
	Edges                 [][]string `json:"edges,omitempty"`
}

type Task

type Task struct {
	Metadata map[string]interface{}
	// contains filtered or unexported fields
}

Task Stores state for a task which shall be or has been executed. Each task may only be executed successfully once.

func NewTask

func NewTask(fn TaskFunc) *Task

NewTask Creates a new task for a given function.

func (*Task) After

func (t *Task) After(fn func(ctx context.Context, task *Task)) *Task

After Sets a function which will be executed once the task is completed, successfully or not. The callee receives the completed task, whose final result (nil or an error) is available from Result.

func (*Task) Attempt

func (t *Task) Attempt(ctx context.Context) (time.Time, error)

Attempt to execute this task.

If successful, the zero time and nil are returned.

Otherwise, the error returned from the task function is returned to the caller. If an error is returned for which errors.Is(err, ErrDoNotReattempt) is true, the caller should not call Attempt again.

func (*Task) Attempts

func (t *Task) Attempts() int

Attempts Returns the number of times this task has been attempted.

func (*Task) Done

func (t *Task) Done() bool

Done Returns true if this task was completed, successfully or not.

func (*Task) MaxTimeout

func (t *Task) MaxTimeout(d time.Duration) *Task

MaxTimeout Sets the maximum timeout between retries, or zero to exponentially increase the timeout indefinitely. Defaults to 30 minutes.

func (*Task) NextAttempt

func (t *Task) NextAttempt() time.Time

NextAttempt Returns the time the next attempt is scheduled for, or the zero value if it has not been attempted before.

func (*Task) NoJitter

func (t *Task) NoJitter() *Task

NoJitter Specifies that randomness should not be introduced into the exponential backoff algorithm.

func (*Task) NotBefore

func (t *Task) NotBefore(date time.Time) *Task

NotBefore Specifies the earliest possible time of the first execution.

func (*Task) Result

func (t *Task) Result() error

Result Returns the result of the task. The task must have been completed for this to be valid.

func (*Task) Retries

func (t *Task) Retries(n int) *Task

Retries Set the maximum number of retries on failure, or -1 to attempt indefinitely. By default, tasks are not retried on failure.

func (*Task) Within

func (t *Task) Within(deadline time.Duration) *Task

Within Specifies an upper limit for the duration of each attempt.

type TaskFunc

// TaskFunc is the function a Task executes; it is what NewTask and
// (*Queue).Submit accept. A non-nil error is returned to the caller of
// (*Task).Attempt, and returning ErrDoNotReattempt signals that the task must
// not be attempted again.
type TaskFunc func(ctx context.Context) error

type Vertex

type Vertex struct {
	Key              string            `json:"key"`
	Type             string            `json:"type"`
	ConditionalNodes map[string]string `json:"conditional_nodes"`
	// contains filtered or unexported fields
}

func (*Vertex) AddEdge

func (v *Vertex) AddEdge(node Node)

func (*Vertex) GetKey

func (v *Vertex) GetKey() string

func (*Vertex) GetType

func (v *Vertex) GetType() string

func (*Vertex) Process

func (v *Vertex) Process(ctx context.Context, data Data) (Data, error)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL