gofherd

v0.1.4

Warning: This package is not in the latest version of its module.
Published: Feb 20, 2020 License: Apache-2.0 Imports: 8 Imported by: 0

README

Herd of gophers at your service

Gofherd

Gofherd (gof-herd) is a small framework for running user-defined functions with bounded parallelism. Its simple interface gives you a function that accepts tasks, lets you define a function containing the "processing logic", and gives you an output channel to read results from.

Gofherd provides:

  • Bounded parallelism
    • You can configure the number of gophers that run the tasks.
  • Monitoring
    • The current state is exposed as Prometheus-compatible metrics at /metrics.
    • Metrics: gofherd_success_total, gofherd_retry_total, gofherd_failure_total
  • Dynamic parallelism
    • The herd size can be read and changed at runtime with GET/PATCH requests to /herd.
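The bounded-parallelism idea can be illustrated with the standard library alone. The sketch below is not gofherd's implementation; it is a generic worker pool in which a fixed number of goroutines (the "herd") read from an input channel, apply a processing function, and send results to an output channel that is closed once every worker has finished. The helper name runHerd is hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// runHerd starts herdSize goroutines that apply process to every value
// received on in. The returned channel is closed after in has been closed
// and all goroutines have drained it. (Hypothetical helper, not gofherd API.)
func runHerd(herdSize int, in <-chan int, process func(int) int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	for g := 0; g < herdSize; g++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for v := range in {
				out <- process(v)
			}
		}()
	}
	// Close out only once every worker has returned.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	in := make(chan int)
	go func() {
		for i := 0; i < 10; i++ {
			in <- i // blocks until a worker picks the value up
		}
		close(in) // signals that no more work is coming
	}()
	sum := 0
	for r := range runHerd(3, in, func(v int) int { return v + 10 }) {
		sum += r
	}
	fmt.Println(sum) // 145
}
```

As in gofherd, closing the input channel is what lets the output channel close, so the consumer can use a plain `for range` loop.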

Example

go get github.com/darshanime/gofherd

package main

import (
	"fmt"

	gf "github.com/darshanime/gofherd"
)

func LoadWork(herd *gf.Gofherd) {
	for i := 0; i < 10; i++ {
		w := gf.Work{ID: fmt.Sprintf("workdID:%d", i), Body: i}
		// blocking call, returns when work picked up by a goroutine
		herd.SendWork(w)
	}
	// done pushing all work, close input chan to indicate that
	herd.CloseInputChan()
}

// this is the signature of the "processing logic" function
func ProcessWork(w *gf.Work) gf.Status {
	result := w.Body.(int) + 10
	// set the result after processing
	w.SetResult(result)
	return gf.Success
}

func ReviewOutput(outputChan <-chan gf.Work) {
	// output chan is closed when all results are received
	for work := range outputChan {
		fmt.Printf("%s, status: %s, result:%d\n", work.ID, work.Status(), work.Result())
	}
}

func main() {
	// pass the "processing logic" when initializing the herd
	herd := gf.New(ProcessWork)
	// start with a herd size of 0; gophers are added at runtime via PATCH /herd
	herd.SetHerdSize(0)
	// disable retries
	herd.SetMaxRetries(0)
	// serve /metrics and /herd on 127.0.0.1:5555
	herd.SetAddr("127.0.0.1:5555")
	go LoadWork(herd)
	herd.Start()
	ReviewOutput(herd.OutputChan())
}

Get the current herd size: curl -XGET localhost:5555/herd

Then increase the herd size with curl -XPATCH 127.0.0.1:5555/herd -d '{"num": 10}'

Output (the order of results varies between runs because work is processed concurrently):

$ go run main.go
workdID:0, status: success, result:10
workdID:1, status: success, result:11
workdID:2, status: success, result:12
workdID:3, status: success, result:13
workdID:5, status: success, result:15
workdID:4, status: success, result:14
workdID:7, status: success, result:17
workdID:6, status: success, result:16
workdID:9, status: success, result:19
workdID:8, status: success, result:18

Specification

When initializing gofherd with New, you pass a processing logic function with the signature func ProcessWork(w *gf.Work) gf.Status. The returned status can be one of:
const (
	// Success represents a successful processing outcome. It won't be retried.
	Success Status = iota
	// Retry represents a failed processing outcome, but is retriable. It will be retried, up to the limit set by SetMaxRetries.
	Retry
	// Failure represents a failed processing outcome and should not be retried again.
	Failure
)
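How a Retry status interacts with the retry limit can be pictured as a small loop. This is a sketch under an assumption: it treats the limit as the number of additional attempts after the first and turns an exhausted Retry into Failure, which is one possible reading of MaxRetries; gofherd's exact counting may differ. Status is mirrored locally, and attempt is a hypothetical helper.

```go
package main

import "fmt"

// Status mirrors gofherd's Status constants locally for this sketch.
type Status int

const (
	Success Status = iota
	Retry
	Failure
)

// attempt calls process until it returns something other than Retry, or
// until maxRetries retries have been used up, in which case the work is
// treated as a Failure (assumed behavior). It returns the final status and
// the number of calls made. (Hypothetical helper.)
func attempt(process func() Status, maxRetries int) (Status, int) {
	calls := 0
	for {
		calls++
		s := process()
		if s != Retry {
			return s, calls
		}
		if calls > maxRetries {
			return Failure, calls
		}
	}
}

func main() {
	n := 0
	flaky := func() Status {
		n++
		if n < 3 {
			return Retry // transient error: ask for another attempt
		}
		return Success
	}
	s, calls := attempt(flaky, 5)
	fmt.Println(s == Success, calls) // true 3
}
```

With maxRetries set to 0, as in the example above, a Retry result ends after a single call.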

Each unit of "work" is defined as the struct:

type Work struct {
	ID     string
	retry  int64
	status Status
	Body   interface{}
	result interface{}
}

The ID field identifies the work unit and is used to track its status, retry count, etc. The Body field holds the input problem and can be anything that makes sense for the use case at hand. The output answer is stored in the unexported result field, which is set with SetResult and read with Result.
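Because Body is an interface{}, the processing logic has to type-assert it, and the example's w.Body.(int) panics if a unit carries any other type. A more defensive processing function can use the comma-ok form and report Failure instead. To stay runnable without the library, this sketch uses small local stand-ins for Work and Status that mirror the documented ID, Body, SetResult and Result names; in real code you would use gf.Work and gf.Status.

```go
package main

import "fmt"

// Status and Work are local stand-ins for gf.Status and gf.Work.
type Status int

const (
	Success Status = iota
	Retry
	Failure
)

type Work struct {
	ID     string
	Body   interface{}
	result interface{}
}

func (w *Work) SetResult(result interface{}) { w.result = result }
func (w *Work) Result() interface{}          { return w.result }

// ProcessWork adds 10 to an int Body, like the README example, but uses
// the comma-ok assertion so an unexpected Body type yields Failure
// instead of a panic.
func ProcessWork(w *Work) Status {
	n, ok := w.Body.(int)
	if !ok {
		return Failure
	}
	w.SetResult(n + 10)
	return Success
}

func main() {
	good := &Work{ID: "workID:1", Body: 1}
	s := ProcessWork(good)
	fmt.Println(s == Success, good.Result()) // true 11

	bad := &Work{ID: "workID:2", Body: "one"}
	s = ProcessWork(bad)
	fmt.Println(s == Failure, bad.Result()) // true <nil>
}
```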

Work is sent with herd.SendWork, a blocking call that returns when a member of the herd accepts the work. herd.OutputChan() returns a receive-only channel, <-chan Work, which can be used to read the status of successfully processed work units. Gofherd closes this channel on completion.

Callbacks can be registered for the Success, Retry and Failure outcomes with AddSuccessCallback, AddRetryCallback and AddFailureCallback. The corresponding callback is called when the processing logic function returns that status. Make sure the callbacks are safe for concurrent use.

Logging

Gofherd accepts any logger that implements:

type Logger interface {
	Printf(format string, v ...interface{})
}

Example Usage:

logger := log.New(os.Stdout, "gofherd:", log.Ldate|log.Ltime|log.Lshortfile)
herd.SetLogger(logger)
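Any type with a matching Printf method satisfies Logger, so the standard library's *log.Logger works directly. The sketch below redeclares the Logger interface locally (copied from the definition above) so it compiles on its own. It checks at compile time that *log.Logger implements the interface and adds a hypothetical recordingLogger that keeps formatted lines in memory. The mutex is there on the assumption that log calls may come from several goroutines.

```go
package main

import (
	"fmt"
	"log"
	"os"
	"sync"
)

// Logger is copied from gofherd's definition.
type Logger interface {
	Printf(format string, v ...interface{})
}

// Compile-time check: *log.Logger has Printf(string, ...interface{}).
var _ Logger = (*log.Logger)(nil)

// recordingLogger is a hypothetical Logger that stores formatted lines,
// which can be handy in tests.
type recordingLogger struct {
	mu    sync.Mutex
	lines []string
}

func (r *recordingLogger) Printf(format string, v ...interface{}) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.lines = append(r.lines, fmt.Sprintf(format, v...))
}

var _ Logger = (*recordingLogger)(nil)

func main() {
	var l Logger = log.New(os.Stdout, "gofherd:", log.Ldate|log.Ltime|log.Lshortfile)
	l.Printf("herd size: %d", 10)

	rec := &recordingLogger{}
	l = rec
	l.Printf("work %s done", "workID:1")
	fmt.Println(rec.lines[0]) // work workID:1 done
}
```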

Deploying

A Docker Compose deployment is available at github.com/darshanime/gofherd-deploy; you can use it as a starting point for your own setup.

Contributing

Run the tests

go test -race -v ./...

Documentation

Types

type Gofherd

type Gofherd struct {
	// contains filtered or unexported fields
}

Gofherd is the core struct that orchestrates all functionality; all of the package's public methods are defined on it.

func New

func New(processingLogic func(*Work) Status) *Gofherd

New initializes a new Gofherd struct. It takes the processing logic function, with the signature `func(*gf.Work) gf.Status`.

func (*Gofherd) AddFailureCallback

func (gf *Gofherd) AddFailureCallback(f func(*Work))

AddFailureCallback registers a callback that is called with the Work unit when the processing logic function returns Failure. The callback must be safe for concurrent use.

func (*Gofherd) AddRetryCallback

func (gf *Gofherd) AddRetryCallback(f func(*Work))

AddRetryCallback registers a callback that is called with the Work unit when the processing logic function returns Retry. The callback must be safe for concurrent use.

func (*Gofherd) AddSuccessCallback

func (gf *Gofherd) AddSuccessCallback(f func(*Work))

AddSuccessCallback registers a callback that is called with the Work unit when the processing logic function returns Success. The callback must be safe for concurrent use.

func (*Gofherd) CloseInputChan

func (gf *Gofherd) CloseInputChan()

CloseInputChan closes the input channel. Closing it once all work has been sent allows gofherd to shut down gracefully.

func (*Gofherd) DecreaseHerdBy

func (gf *Gofherd) DecreaseHerdBy(num int64)

DecreaseHerdBy decreases the herd size by the given amount.

func (*Gofherd) IncreasedHerdBy

func (gf *Gofherd) IncreasedHerdBy(num int64)

IncreasedHerdBy increases the herd size by the given amount.

func (*Gofherd) OutputChan

func (gf *Gofherd) OutputChan() <-chan Work

OutputChan returns the output channel. It is closed when processing is complete, so it can be read in a `for range` loop.

func (*Gofherd) SendWork

func (gf *Gofherd) SendWork(work Work)

SendWork enqueues Work onto the input channel. It blocks until a member of the herd accepts the work.

func (*Gofherd) SetAddr

func (gf *Gofherd) SetAddr(addr string)

SetAddr sets the address (for example, "127.0.0.1:5555") on which Start spins up the server that serves /metrics and /herd.

func (*Gofherd) SetHerdSize

func (gf *Gofherd) SetHerdSize(num int64)

SetHerdSize sets the herd size, that is, the number of gophers spawned for processing.

func (*Gofherd) SetLogger

func (gf *Gofherd) SetLogger(l Logger)

SetLogger is used to set up logging. If no logger is set, gofherd emits no logs.

func (*Gofherd) SetMaxRetries

func (gf *Gofherd) SetMaxRetries(num int64)

SetMaxRetries sets the maximum number of times a Work unit will be tried before giving up.

func (*Gofherd) Start

func (gf *Gofherd) Start()

Start starts the processing and the server. It returns immediately.

type Logger

type Logger interface {
	Printf(format string, v ...interface{})
}

Logger is the interface accepted by SetLogger and used for log output. It has a single method, Printf, with the signature `Printf(format string, v ...interface{})`.

type Status

type Status int

Status represents the outcome of "processing" Work. It can be one of Success, Retry, Failure.

const (
	// Success represents a successful processing outcome. It won't be retried.
	Success Status = iota
	// Retry represents a failed processing outcome, but is retriable. It will be retried, up to the limit set by SetMaxRetries.
	Retry
	// Failure represents a failed processing outcome and should not be retried again.
	Failure
)

func (Status) String

func (r Status) String() string

type Work

type Work struct {
	ID string

	Body interface{}
	// contains filtered or unexported fields
}

Work is the struct representing the work unit in Gofherd. It has a string ID field, a `Body` field, and an unexported result field; Body and the result are both of type interface{} and store the "problem" and "solution" respectively. The result is accessed through Result and SetResult.

func (*Work) Result

func (w *Work) Result() interface{}

Result is used to access the Result of the Work unit.

func (*Work) SetResult

func (w *Work) SetResult(result interface{})

SetResult is used to set the result for the Work unit.

func (*Work) Status

func (w *Work) Status() Status

Status is used to access the status of the Work unit.
