floc

Published: Dec 1, 2017 License: MIT Imports: 4 Imported by: 0

README

go-floc

Floc: Orchestrate goroutines with ease.

The goal of the project is to make the process of running goroutines in parallel and synchronizing them easy.

Announcements

Version v2 was released on December 1, 2017, and is the main maintained branch.

The master branch became v1_dev. Please note that this change may break your clones of the master branch.

Installation and requirements

The package requires Go v1.8.

To install the package use go get gopkg.in/workanator/go-floc.v1

Documentation and examples

Please refer to the GoDoc reference of the package for more details.

Some examples are available in the GoDoc reference. Additional examples can be found in go-floc-showcase.

Features

  • Easy-to-use functional interface.
  • Simple parallelism and synchronization of jobs.
  • As little overhead as possible, in comparison to the direct use of goroutines and sync primitives.
  • Better control over execution with one entry point and one exit point, achieved by allowing any job to finish execution with Cancel or Complete.

Introduction

Floc introduces some terms which are widely used throughout the package.

Flow

Flow is the overall process which can be controlled through floc.Flow. A flow can be canceled or completed with any arbitrary data at any point of execution. A flow has only one entry point and only one exit point.

// Design the job
job := run.Sequence(do, something, here, ...)

// The entry point - Run the job
floc.Run(flow, state, update, job)

// The exit point - Check the result of the job.
result, data := flow.Result()
State

State is arbitrary data shared across all jobs in the flow. Since floc.State contains shared data, it provides methods which return the data alongside read-only and/or read/write lockers. Returned lockers are not locked; the caller is responsible for obtaining and releasing locks.

// Read data
data, locker := state.DataWithReadLocker()

locker.Lock()
container := data.(*MyContainer)
name := container.Name
date := container.Date
locker.Unlock()

// Write data
data, locker := state.DataWithWriteLocker()

locker.Lock()
container := data.(*MyContainer)
container.Counter = container.Counter + 1
locker.Unlock()

Floc does not require you to use the state locking methods; safe read and write operations can also be done using, for example, sync/atomic. Likewise, Floc does not require the state to hold plain data: the state can contain, say, channels for communication between jobs.

type ChunkStream chan []byte

func WriteToDisk(flow floc.Flow, state floc.State, update floc.Update) {
  stream := state.Data().(ChunkStream)

  file, _ := os.Create("/tmp/file")
  defer file.Close()

  for {
    select {
    case <-flow.Done():
      // return exits the loop; a break here would only leave the select.
      return
    case chunk := <-stream:
      file.Write(chunk)
    }
  }
}
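
For instance, a minimal sketch of a lock-free update using sync/atomic; it assumes the Counter field of MyContainer from the snippets above is an int64:

func IncrementCounter(flow floc.Flow, state floc.State, update floc.Update) {
  // The pointer to the container never changes, so it can be read without a locker.
  container := state.Data().(*MyContainer)

  // Update the counter atomically instead of taking the write locker.
  atomic.AddInt64(&container.Counter, 1)
}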
Update

Update is a function with the prototype floc.Update which is responsible for updating the state. The key identifies which piece of state should be updated, while value contains the data to be written. It is up to the implementation how to interpret key and value.

type Dictionary map[string]interface{}

func UpdateMap(flow floc.Flow, state floc.State, key string, value interface{}) {
  data, locker := state.DataWithWriteLocker()

  locker.Lock()
  defer locker.Unlock()

  m := data.(Dictionary)
  m[key] = value
}
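
A job does not call UpdateMap directly: it receives the update function from floc.Run and invokes it with a key and a value. A minimal sketch, using a hypothetical RememberStartTime job and assuming the flow is run with UpdateMap and a Dictionary state:

func RememberStartTime(flow floc.Flow, state floc.State, update floc.Update) {
  // Delegate the actual write to the update function, UpdateMap in this case.
  update(flow, state, "started_at", time.Now())
}

// Wire everything together.
floc.Run(floc.NewFlow(), floc.NewState(make(Dictionary)), UpdateMap, RememberStartTime)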
Job

A job in Floc is the smallest piece of a flow. The prototype of a job function is floc.Job. Each job has access to floc.State and floc.Update, so it can read/write state data, and to floc.Flow, which allows it to finish the flow with Cancel() or Complete().

The Cancel() and Complete() methods of floc.Flow have a permanent effect, so once the flow is finished it cannot be canceled or completed again.

func ValidateContentLength(flow floc.Flow, state floc.State, update floc.Update) {
  request := state.Data().(http.Request)

  // Cancel the flow with error if request body size is too big
  if request.ContentLength > MaxContentLength {
    flow.Cancel(errors.New("content is too big"))
  }
}
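
A job can finish the flow successfully in the same manner; here is a small sketch with a hypothetical ExtractID job:

func ExtractID(flow floc.Flow, state floc.State, update floc.Update) {
  request := state.Data().(http.Request)

  // Complete the flow and pass the extracted value as the result data.
  flow.Complete(request.URL.Query().Get("id"))
}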

Example

Let's have some fun and write a simple example which calculates some statistics on the given text. The example is designed so that it does not require locking, because each part of the Statistics struct is accessed by only one job at a time.

const Text = `Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed
  do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim
  veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo
  consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum
  dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident,
  sunt in culpa qui officia deserunt mollit anim id est laborum.`

var sanitizeWordRe = regexp.MustCompile(`\W`)

type Statistics struct {
  Words      []string
  Characters int
  Occurrence map[string]int
}

// Split to words and sanitize them
SplitToWords := func(flow floc.Flow, state floc.State, update floc.Update) {
  statistics := state.Data().(*Statistics)

  statistics.Words = strings.Split(Text, " ")
  for i, word := range statistics.Words {
    statistics.Words[i] = sanitizeWordRe.ReplaceAllString(word, "")
  }
}

// Count and sum the number of characters in each word
CountCharacters := func(flow floc.Flow, state floc.State, update floc.Update) {
  statistics := state.Data().(*Statistics)

  for _, word := range statistics.Words {
    statistics.Characters += len(word)
  }
}

// Count the number of unique words
CountUniqueWords := func(flow floc.Flow, state floc.State, update floc.Update) {
  statistics := state.Data().(*Statistics)

  statistics.Occurrence = make(map[string]int)
  for _, word := range statistics.Words {
    statistics.Occurrence[word] = statistics.Occurrence[word] + 1
  }
}

// Print result
PrintResult := func(flow floc.Flow, state floc.State, update floc.Update) {
  statistics := state.Data().(*Statistics)

  fmt.Printf("Words Total       : %d\n", len(statistics.Words))
  fmt.Printf("Unique Word Count : %d\n", len(statistics.Occurrence))
  fmt.Printf("Character Count   : %d\n", statistics.Characters)
}

// Design the job and run it
job := run.Sequence(
  SplitToWords,
  run.Parallel(
    CountCharacters,
    CountUniqueWords,
  ),
  PrintResult,
)

floc.Run(
  floc.NewFlow(),
  floc.NewState(new(Statistics)),
  nil,
  job,
)

// Output:
// Words Total       : 64
// Unique Word Count : 60
// Character Count   : 370

Contributing

Please find information about contributing in CONTRIBUTING.md.

Documentation

Overview

Package floc allows orchestrating goroutines with ease. The goal of the project is to make the process of running goroutines in parallel and synchronizing them easy.

Floc follows these objectives:

-- Split the overall work into a number of small jobs. Floc cannot force you to do that, but doing so grants many advantages, from simpler testing to better control over execution.

-- Make end algorithms clearer and simpler by expressing them through a combination of jobs. In short, floc allows expressing a job through jobs.

-- Provide better control over execution with one entry point and one exit point. That is achieved by allowing any job to finish execution with Cancel or Complete.

-- Simple parallelism and synchronization of jobs.

-- As little overhead as possible, in comparison to the direct use of goroutines and sync primitives.

The package categorizes middleware used for flow building in subpackages.

-- `guard` contains middleware which helps protect the flow from panics or unpredicted behavior.

-- `pred` contains some basic predicates for AND, OR, and NOT logic.

-- `run` provides middleware for designing the flow, i.e. for running jobs sequentially, in parallel, in the background, and so on.

Here is a quick example of what the package is capable of.

// The job computes something complex and does writing of results in
// background.
job := run.Sequence(
  run.Background(WriteToDisk),
  run.While(pred.Not(TestComputed), run.Sequence(
    run.Parallel(
      ComputeSomething,
      ComputeSomethingElse,
      guard.Panic(ComputeDangerousThing),
    ),
    run.Parallel(
      PrepareForWrite,
      UpdateComputedFlag,
    ),
  )),
  CompleteWithSuccess,
)

// The entry point: produce the result.
floc.Run(flow, state, update, job)

// The exit point: consume the result.
result, data := flow.Result()

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewFlowWithDisable

func NewFlowWithDisable(parent Flow) (Flow, DisableFunc)

NewFlowWithDisable creates a new instance of the flow, containing the parent flow, and a disable function which allows disabling calls to Complete and Cancel.
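
A short usage sketch based on the signature above; parentFlow is a placeholder:

parentFlow := floc.NewFlow()
flow, disable := floc.NewFlowWithDisable(parentFlow)

// Once disable() is invoked, calls to Complete and Cancel on the derived
// flow are expected to have no effect.
disable()
flow.Cancel(nil)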

func NewFlowWithResume

func NewFlowWithResume(parent Flow) (Flow, ResumeFunc)

NewFlowWithResume creates a new instance of the flow, containing the parent flow, and a resume function which allows resuming execution of the flow.
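
A usage sketch, again with a placeholder parent flow:

parentFlow := floc.NewFlow()
flow, resume := floc.NewFlowWithResume(parentFlow)

// Cancel the derived flow, then resume execution and get the parent flow back.
flow.Cancel(errors.New("temporary stop"))
parent := resume()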

func Run

func Run(flow Flow, state State, update Update, job Job)

Run runs the job with the given environment. The only purpose of the function at the moment is to make code more expressive. In future releases the function may have additional functionality.

Types

type DisableFunc

type DisableFunc func()

DisableFunc when invoked disables calls to Complete and Cancel.

type Flow

type Flow interface {
	Releaser

	// Done returns a channel that's closed when the flow is done.
	// Successive calls to Done return the same value.
	Done() <-chan struct{}

	// Complete finishes the flow with success status and stops
	// execution of further jobs if any.
	Complete(data interface{})

	// Cancel cancels the execution of the flow.
	Cancel(data interface{})

	// IsFinished tests if execution of the flow is either completed or canceled.
	IsFinished() bool

	// Result returns the result code and the result data of the flow. The call
	// to the function is effective only if the flow is finished.
	Result() (result Result, data interface{})
}

Flow provides the control over execution of the flow.

func NewFlow

func NewFlow() Flow

NewFlow creates a new instance of the flow control. Once the flow is finished the instance should not be copied or reused for controlling other flows.

type Job

type Job func(flow Flow, state State, update Update)

Job is the prototype of a function which does some piece of the overall work. With the parameters it receives, the implementation can control execution of the flow and read/write the state, either directly or with the update function.

type Predicate

type Predicate func(state State) bool

Predicate is the function which calculates the result of some condition.
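
For illustration, a predicate such as TestComputed from the overview example could look roughly like this; the Progress type and its Computed field are assumptions made for the sketch:

type Progress struct {
  Computed bool
}

func TestComputed(state floc.State) bool {
  // A real implementation may need a locker or sync/atomic for safe reads.
  progress := state.Data().(*Progress)
  return progress.Computed
}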

type Releaser

type Releaser interface {
	// Release should be called once when the object is not needed anymore.
	Release()
}

Releaser is responsible for releasing underlying resources.

type Result

type Result int32

Result is the result of flow execution.

const (
	None Result = iota
	Completed
	Canceled
)

Possible results.
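
Checking the result after the flow has finished might look like this:

result, data := flow.Result()
switch {
case result.IsCompleted():
  fmt.Println("flow completed with", data)
case result.IsCanceled():
  fmt.Println("flow canceled with", data)
}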

func (Result) Int32

func (result Result) Int32() int32

Int32 returns the underlying value as int32. That is handy while working with atomic operations.

func (Result) IsCanceled

func (result Result) IsCanceled() bool

IsCanceled tests if the result is Canceled.

func (Result) IsCompleted

func (result Result) IsCompleted() bool

IsCompleted tests if the result is Completed.

func (Result) IsFinished

func (result Result) IsFinished() bool

IsFinished tests if the result is either Completed or Canceled.

func (Result) IsNone

func (result Result) IsNone() bool

IsNone tests if the result is None.

func (Result) IsValid

func (result Result) IsValid() bool

IsValid tests if the result is a valid value.

func (Result) String

func (result Result) String() string

type ResultSet

type ResultSet struct {
	// contains filtered or unexported fields
}

ResultSet is a set of possible results. This set is a simple implementation of a set with no check for duplicate values, and it covers only the basic needs of floc.

func NewResultSet

func NewResultSet(results ...Result) ResultSet

NewResultSet constructs the set with the given results. The function validates all result values first and panics on any invalid result.
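
A brief usage sketch:

set := floc.NewResultSet(floc.Completed, floc.Canceled)

fmt.Println(set.Contains(floc.Completed)) // true
fmt.Println(set.Contains(floc.None))      // false
fmt.Println(set.Len())                    // 2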

func (ResultSet) Contains

func (set ResultSet) Contains(result Result) bool

Contains tests if the set contains the result.

func (ResultSet) Len

func (set ResultSet) Len() int

Len returns the number of items in the set.

type ResumeFunc

type ResumeFunc func() Flow

ResumeFunc, when invoked, resumes the execution of the flow. It is effective in case the flow was Canceled or Completed. The function returns the parent Flow.

type State

type State interface {
	Releaser

	// Returns the contained data.
	Data() (data interface{})

	// Returns the contained data with read-only locker.
	DataWithReadLocker() (data interface{}, readLocker sync.Locker)

	// Returns the contained data with read/write locker.
	DataWithWriteLocker() (data interface{}, writeLocker sync.Locker)

	// Returns the contained data with read-only and read/write lockers.
	DataWithReadAndWriteLockers() (data interface{}, readLocker, writeLocker sync.Locker)
}

State is the container of data shared amongst jobs. Depending on implementation the data can be thread-safe or not.

The state is aware that the contained data may implement the Releaser interface, so if the contained data implements Releaser, a call to state.Release() will be propagated to data.Release() as well.

type Data struct{}

func (Data) Release() {
  fmt.Println("Data released")
}

state := floc.NewState(Data{})
state.Release()

// Output: Data released

func NewState

func NewState(data interface{}) State

NewState creates a new instance of the state container which can contain any arbitrary data. The data can be a primitive type, a complex structure, or even an interface or a function. What the state should contain depends on the task.

type Events struct {
  HeaderReady bool
  BodyReady bool
  DataReady bool
}

state := floc.NewState(new(Events))

The container can hold a nil value as well if no contained data is required.

state := floc.NewState(nil)

type Update

type Update func(flow Flow, state State, key string, value interface{})

Update is a function which may be invoked by a job to update the state and/or the flow. It is up to the concrete implementation how to interpret key and value.

Directories

Path	Synopsis
guard	Package guard contains jobs which help protect execution of the flow from crashing or from unpredicted behavior.
pred	Package pred provides predicates for basic logic.
run	Package run is the collection of jobs which make up the architecture of the flow.
