flow

Version: v0.6.6
Published: Dec 6, 2021 License: Apache-2.0 Imports: 3 Imported by: 1

README

FLOW

Simple read-process-write worker with goroutines

Usage

  1. Register In, Out and Process handlers that implement the corresponding interfaces (Reader, Writer and Processor).
  2. Call Serve on the flow.
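
The two steps above can be illustrated with a minimal sketch. It does not import this package: the three interfaces are copied from the Types section so the program compiles on its own, and the `run` function wires the stages by hand as a rough stand-in for what Serve might do. The types `sliceReader`, `upperProcessor` and `memWriter`, the `run` helper, and the convention that each stage calls `wg.Done` itself are all assumptions made for illustration, not documented behavior.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
	"sync/atomic"
)

// Local copies of the documented interfaces so the sketch compiles on its own.
type Reader interface {
	ReadDataToChan() (inChan chan map[string]string)
	Cancel()
	GetReadStatus() (countRead, countMax uint64)
}
type Processor interface {
	ProcessMessage(wg *sync.WaitGroup, inChan, outChan chan map[string]string, goroutineNum int)
}
type Writer interface {
	WriteDataFromChan(wg *sync.WaitGroup, outChan chan map[string]string)
	IsFinished() <-chan struct{}
	GetWriteStatus() (countWrite uint64)
}

// sliceReader (hypothetical) emits a fixed list of messages.
type sliceReader struct {
	read  uint64 // updated atomically
	items []map[string]string
}

func (r *sliceReader) ReadDataToChan() chan map[string]string {
	in := make(chan map[string]string)
	go func() {
		defer close(in) // closing tells the processors there is no more input
		for _, m := range r.items {
			in <- m
			atomic.AddUint64(&r.read, 1)
		}
	}()
	return in
}
func (r *sliceReader) Cancel() {} // nothing to cancel in this sketch
func (r *sliceReader) GetReadStatus() (uint64, uint64) {
	return atomic.LoadUint64(&r.read), uint64(len(r.items))
}

// upperProcessor (hypothetical) upper-cases the "text" field of each message.
type upperProcessor struct{}

func (upperProcessor) ProcessMessage(wg *sync.WaitGroup, inChan, outChan chan map[string]string, goroutineNum int) {
	defer wg.Done() // assumption: each worker signals completion itself
	for m := range inChan {
		outChan <- map[string]string{"text": strings.ToUpper(m["text"])}
	}
}

// memWriter (hypothetical) collects results in memory.
type memWriter struct {
	count uint64 // updated atomically
	rows  []string
	done  chan struct{}
}

func (w *memWriter) WriteDataFromChan(wg *sync.WaitGroup, outChan chan map[string]string) {
	defer wg.Done()
	defer close(w.done) // runs before wg.Done: marks the writer as finished
	for m := range outChan {
		w.rows = append(w.rows, m["text"])
		atomic.AddUint64(&w.count, 1)
	}
}
func (w *memWriter) IsFinished() <-chan struct{} { return w.done }
func (w *memWriter) GetWriteStatus() uint64      { return atomic.LoadUint64(&w.count) }

// run wires the three stages by hand; it only approximates what Serve might do.
func run(r Reader, p Processor, w Writer, workers int) {
	in := r.ReadDataToChan()
	out := make(chan map[string]string)
	var procWG, writeWG sync.WaitGroup
	writeWG.Add(1)
	go w.WriteDataFromChan(&writeWG, out)
	procWG.Add(workers)
	for i := 0; i < workers; i++ {
		go p.ProcessMessage(&procWG, in, out, i)
	}
	procWG.Wait() // all input has been consumed and forwarded
	close(out)    // lets the writer drain and finish
	<-w.IsFinished()
	writeWG.Wait()
}

func main() {
	r := &sliceReader{items: []map[string]string{{"text": "a"}, {"text": "b"}, {"text": "c"}}}
	w := &memWriter{done: make(chan struct{})}
	run(r, upperProcessor{}, w, 2)
	read, total := r.GetReadStatus()
	fmt.Println("read:", read, "of", total, "written:", w.GetWriteStatus())
}
```

With the real package, the same three implementations would presumably be registered through AddInFlow, AddProcessFlow and AddOutFlow and started with Serve instead of the hand-written `run`.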

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Flow

type Flow struct {
	In      map[string]Reader
	Out     map[string]Writer
	Process map[string]Processor
	// contains filtered or unexported fields
}

Flow is an abstract worker that coordinates input (Reader), processing (Processor) and output (Writer).

func NewFlow

func NewFlow() *Flow

NewFlow initializes a new Flow instance.

func (*Flow) AddInFlow

func (f *Flow) AddInFlow(key string, in Reader)

AddInFlow adds a Reader to the flow's inputs under the given key.

func (*Flow) AddOutFlow

func (f *Flow) AddOutFlow(key string, out Writer)

AddOutFlow adds a Writer to the flow's outputs under the given key.

func (*Flow) AddProcessFlow

func (f *Flow) AddProcessFlow(key string, process Processor)

AddProcessFlow adds a Processor to the flow's processors under the given key.

func (*Flow) GetStatus added in v0.4.1

func (f *Flow) GetStatus() FlowStatus

func (*Flow) IsRunning added in v0.4.2

func (f *Flow) IsRunning() bool

func (*Flow) IsStartable added in v0.6.6

func (f *Flow) IsStartable() bool

func (*Flow) Restart added in v0.4.4

func (f *Flow) Restart() error

func (*Flow) Serve

func (f *Flow) Serve(workersCount int, in, out string, processors []string) error

Serve runs the flow in concurrent mode. The returned error is also saved in the description of the flow status, so it does not have to be checked immediately.

func (*Flow) SetInFlow

func (f *Flow) SetInFlow(in map[string]Reader)

SetInFlow sets the flow's map of Readers.

func (*Flow) SetOutFlow

func (f *Flow) SetOutFlow(out map[string]Writer)

SetOutFlow sets the flow's map of Writers.

func (*Flow) SetProcessFlow

func (f *Flow) SetProcessFlow(process map[string]Processor)

SetProcessFlow sets the flow's map of Processors.

func (*Flow) Stop added in v0.3.0

func (f *Flow) Stop() error

Stop stops reading. It is important that the Reader tolerates a call to Cancel even when it is not reading.
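
One way to meet this requirement is to make Cancel idempotent, so that it is safe before reading starts, after it ends, and when called more than once. The sketch below uses `sync.Once` to close a quit channel exactly once. The `cancellableReader` type and its constructor are hypothetical, and GetReadStatus is omitted to keep the focus on cancellation.

```go
package main

import (
	"fmt"
	"sync"
)

// cancellableReader is a hypothetical Reader fragment that shows only the
// cancellation logic.
type cancellableReader struct {
	items []map[string]string
	once  sync.Once
	quit  chan struct{}
}

func newCancellableReader(items []map[string]string) *cancellableReader {
	return &cancellableReader{items: items, quit: make(chan struct{})}
}

// ReadDataToChan emits items until they run out or Cancel is called.
func (r *cancellableReader) ReadDataToChan() chan map[string]string {
	in := make(chan map[string]string)
	go func() {
		defer close(in)
		for _, m := range r.items {
			// Check for cancellation first so an earlier Cancel always wins.
			select {
			case <-r.quit:
				return
			default:
			}
			select {
			case in <- m:
			case <-r.quit:
				return // cancelled while waiting for a consumer
			}
		}
	}()
	return in
}

// Cancel closes the quit channel at most once, so extra calls are harmless.
func (r *cancellableReader) Cancel() {
	r.once.Do(func() { close(r.quit) })
}

func main() {
	r := newCancellableReader([]map[string]string{{"id": "1"}, {"id": "2"}})
	r.Cancel() // not reading yet: must not panic or block
	r.Cancel() // repeated call: still safe
	n := 0
	for range r.ReadDataToChan() {
		n++
	}
	fmt.Println("messages delivered after cancel:", n)
}
```

Closing a channel rather than sending on it means Cancel never blocks, regardless of whether a reading goroutine exists.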

func (*Flow) WithChanBuffer added in v0.3.1

func (f *Flow) WithChanBuffer(chanBuffer uint16) *Flow

func (*Flow) WithError added in v0.6.3

func (f *Flow) WithError(err string) *Flow

func (*Flow) WithWaitToKill added in v0.6.0

func (f *Flow) WithWaitToKill(waitToKill uint16) *Flow

type FlowStatus added in v0.6.0

type FlowStatus struct {
	Status         Status
	Started, Ended time.Time
	Description    string
	CountRead      uint64
	CountWrite     uint64
	CountMax       uint64
}

type Processor

type Processor interface {
	ProcessMessage(wg *sync.WaitGroup, inChan, outChan chan map[string]string, goroutineNum int)
}

Processor processes data in the flow.

type Reader

type Reader interface {
	ReadDataToChan() (inChan chan map[string]string)
	Cancel()
	GetReadStatus() (countRead, countMax uint64)
}

Reader supplies input data to the flow.

type Status added in v0.4.1

type Status uint8
const (
	NOT_EXIST Status = iota
	WAIT_TO_START
	PROCESSING
	ERROR
	CANCELLING
	CANCELLED
	FINISHED
)

func (Status) String added in v0.4.1

func (s Status) String() string
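
The exact strings returned by String are not documented here. As an illustration of the pattern only, the sketch below redeclares the Status constants locally and implements `fmt.Stringer` with a name table. The returned names and the `Status(n)` fallback are assumptions, not the package's actual output.

```go
package main

import "fmt"

// Local copy of the documented enum so the sketch compiles on its own.
type Status uint8

const (
	NOT_EXIST Status = iota
	WAIT_TO_START
	PROCESSING
	ERROR
	CANCELLING
	CANCELLED
	FINISHED
)

// statusNames is indexed by the Status value; the names are assumed.
var statusNames = [...]string{
	"NOT_EXIST", "WAIT_TO_START", "PROCESSING", "ERROR",
	"CANCELLING", "CANCELLED", "FINISHED",
}

// String makes Status satisfy fmt.Stringer, with a fallback for unknown values.
func (s Status) String() string {
	if int(s) < len(statusNames) {
		return statusNames[s]
	}
	return fmt.Sprintf("Status(%d)", uint8(s))
}

func main() {
	// fmt.Println calls String automatically for each value.
	fmt.Println(PROCESSING, FINISHED, Status(42))
}
```

Because the constants start at iota, the zero value of Status is NOT_EXIST, so an uninitialized FlowStatus reports that state.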

type Writer

type Writer interface {
	WriteDataFromChan(wg *sync.WaitGroup, outChan chan map[string]string)
	IsFinished() <-chan struct{}
	GetWriteStatus() (countWrite uint64)
}

Writer outputs data from the flow.
