dingo

package module
v0.0.0-...-561878d Latest
Published: Nov 10, 2021 License: MIT Imports: 15 Imported by: 2

README

dingo

I initiated this project after machinery, a great library that aims to provide a replacement for Celery in #golang. The reasons to create (yet) another task library are:

  • To make sending tasks as easy as possible.
  • To await and receive reports through channels (a channel is a natural way to represent asynchronous results).
  • To get familiar with these #golang concepts: interface, routine, and channel. A distributed task framework is a good topic for practice, :)

One important concept I learned from Celery and inherited in Dingo is that Caller and Worker could share the same codebase.

When you send a task message in Celery, that message will not contain any source code, but only the name of the task you want to execute. This works similarly to how host names work on the internet: every worker maintains a mapping of task names to their actual functions, called the task registry.
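
The task registry idea can be sketched with a plain map. This is only an illustration of the concept: the `registry` type, `dispatch`, and `newRegistry` are hypothetical names, not part of dingo's API.

```go
package main

import (
	"errors"
	"fmt"
)

// registry maps task names to the functions that execute them.
// The type and its methods are illustrative, not dingo's API.
type registry map[string]func(args ...int) int

// dispatch looks up a task by name and runs it, the way a worker
// resolves the name carried in a task message.
func (r registry) dispatch(name string, args ...int) (int, error) {
	fn, ok := r[name]
	if !ok {
		return 0, errors.New("unknown task: " + name)
	}
	return fn(args...), nil
}

// newRegistry registers a single "add" task.
func newRegistry() registry {
	return registry{
		"add": func(args ...int) int {
			sum := 0
			for _, a := range args {
				sum += a
			}
			return sum
		},
	}
}

func main() {
	r := newRegistry()
	sum, err := r.dispatch("add", 2, 3)
	fmt.Println(sum, err) // 5 <nil>

	_, err = r.dispatch("missing")
	fmt.Println(err) // unknown task: missing
}
```

Because only the name travels in the message, both caller and worker need the same name registered, which is why sharing one codebase is convenient.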

Quick Demo

Here is a quick demo of this project in local mode as a background job pool:

package main

import (
	"fmt"
	"github.com/mission-liao/dingo"
)

func main() {
	// initiate a local app
	app, err := dingo.NewApp("local", nil)
	if err != nil {
		return
	}
	// register a worker function
	err = app.Register("add", func(a int, b int) int {
		return a + b
	})
	if err != nil {
		return
	}

	// allocate workers for that function: 2 workers, sharing 1 report channel.
	_, err = app.Allocate("add", 2, 1)
	if err != nil {
		return
	}

	// wrap the report channel with dingo.Result
	result := dingo.NewResult(app.Call("add", dingo.DefaultOption(), 2, 3))
	err = result.Wait(0)
	if err != nil {
		return
	}
	// set a callback, like a promise in JavaScript
	result.OnOK(func(sum int) {
		fmt.Printf("result is: %v\n", sum)
	})

	// release resource
	err = app.Close()
	if err != nil {
		return
	}
}

Features

Invoking Worker Functions with Arbitrary Signatures

(Almost) ANY Function Can Be Your Dingo

These functions can be used as worker functions by dingo:

type Person struct {
  ID int
  Name string
}
func NewEmployee(p *Person, age int) (failed bool) { ... } // struct, OK
func GetEmployees(age int) (employees map[string]*Person) { ... } // map of struct, OK
func DeleteEmployees(names []string) (count int) { ... } // slice, OK
func DoNothing () { ... } // OK

Ideally, you don't have to rewrite your function to fit any specific signature; it's a piece of cake to adapt a function to Dingo.

Here is why some types can't be supported by Dingo. The most compatible exchange format is []byte; to marshall your parameters to and from []byte, we rely on these builtin encoders:

  • encoding/json
  • encoding/gob

Type info is deduced from the signatures of the worker functions you register. With that type info, parameters are unmarshalled from []byte to the closest type. A type-correction procedure is then applied to those parameters before invoking.
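
To see why a correction step is needed: encoding/json decodes every number in an []interface{} as float64, so an int parameter has to be converted back before the call. The sketch below uses only the standard library; `correct` and `invoke` are hypothetical helpers written for this illustration, and dingo's own procedure may differ.

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
)

// correct converts a decoded value to the type the worker function
// expects for one parameter (for example float64 -> int).
func correct(v interface{}, want reflect.Type) (reflect.Value, error) {
	rv := reflect.ValueOf(v)
	if !rv.IsValid() || !rv.Type().ConvertibleTo(want) {
		return reflect.Value{}, fmt.Errorf("cannot convert %T to %v", v, want)
	}
	return rv.Convert(want), nil
}

// invoke decodes a JSON array of arguments and calls fn, using the
// parameter types from fn's signature to correct each argument.
func invoke(fn interface{}, payload []byte) ([]interface{}, error) {
	var raw []interface{}
	if err := json.Unmarshal(payload, &raw); err != nil {
		return nil, err
	}
	ft := reflect.TypeOf(fn)
	if ft == nil || ft.Kind() != reflect.Func || ft.NumIn() != len(raw) {
		return nil, fmt.Errorf("function and arguments do not match")
	}
	in := make([]reflect.Value, len(raw))
	for i, v := range raw {
		cv, err := correct(v, ft.In(i))
		if err != nil {
			return nil, err
		}
		in[i] = cv
	}
	var out []interface{}
	for _, r := range reflect.ValueOf(fn).Call(in) {
		out = append(out, r.Interface())
	}
	return out, nil
}

func main() {
	add := func(a int, b int) int { return a + b }
	// both numbers arrive as float64 and are converted to int before the call
	out, err := invoke(add, []byte(`[2, 3]`))
	fmt.Println(out, err) // [5] <nil>
}
```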

Obviously, it's hard (though not impossible) to handle all types in #golang. As far as I know, these are unsupported by Dingo:

  • interface: unmarshalling requires concrete types. (so error can be marshalled, but can't be unmarshalled)
  • chan: haven't tried yet
  • private fields in structs: they are ignored by json/gob, but it's still possible to support them by providing a customized marshaller and invoker. (please search 'ExampleCustomMarshaller' for details)

Stateful Worker Functions

Dingo Remembers things

Want to create a worker function with state? There are two ways to do this in Dingo:

  • The reflect package allows us to invoke a method of a struct, so you can create an instance of your struct to hold every global and provide its methods as worker functions.
  • Create a closure to enclose states or globals.
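
Both approaches produce ordinary function values. The sketch below shows them in plain Go without dingo; `counter` and `newAccumulator` are hypothetical names chosen for this illustration.

```go
package main

import "fmt"

// counter holds state that a worker function can keep across invocations.
type counter struct {
	total int
}

// Add is a method; the method value c.Add is an ordinary func(int) int.
func (c *counter) Add(n int) int {
	c.total += n
	return c.total
}

// newAccumulator returns a closure that encloses its own running total.
func newAccumulator() func(int) int {
	total := 0
	return func(n int) int {
		total += n
		return total
	}
}

func main() {
	c := &counter{}
	byMethod := c.Add             // state lives in the struct
	byClosure := newAccumulator() // state lives in the closure

	byMethod(2)
	byClosure(2)
	fmt.Println(byMethod(3), byClosure(3)) // 5 5
}
```

Neither variant is safe for concurrent use as written; if more than one worker routine runs the function, the shared state should be guarded, for example with a sync.Mutex.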

Refer to Stateful Worker Functions for more details.

Two Way Binding with Worker Functions

Throwing and Catching with Your Dingo

Besides sending arguments, return values from worker functions can also be accessed. Every time you initiate a task, you will get a report channel.

reports, err := app.Call("yourTask", nil, arg1, arg2 ...)

// synchronous waiting
r := <-reports

// asynchronous waiting
go func () {
  r := <-reports
}()

// access the return values
if r.OK() {
  var ret []interface{} = r.Return()
  ret[0].(int) // by type assertion
}

Or wrap the report channel with dingo.Result, as shown in the Quick Demo.

A Distributed Task Framework with Local Mode

Dingo @Home, or Anywhere

You would prefer a small, local worker pool at an early development stage, and move to a distributed one when going into production. In dingo, there is not much to do for the transfer (besides debugging, :( ).

You've seen a demo for local mode, and it's easy to make it distributed by attaching corresponding components at caller-side and worker-side. A demo: caller and worker.

In short, at Caller side, you need to:

  • register worker functions for tasks
  • configure the default option, ID maker, and marshaller for tasks if needed.
  • attach Producer, Store

And at Worker side, you need to:

  • register the same worker functions as the ones on the Caller side for tasks
  • configure the marshaller for tasks if needed; the marshallers used by Caller and Worker must match.
  • attach Consumer (or NamedConsumer), Reporter
  • allocate worker routines

Customizable

Personalize Your Dingo

Many core behaviors can be customized:

Development Environment Setup

This project has no dependency manager, so you need to install the dependencies yourself:

go get github.com/streadway/amqp
go get github.com/garyburd/redigo/redis
go get github.com/stretchr/testify
go get github.com/satori/go.uuid

Install Redis and RabbitMQ, then run the unit tests from the root folder of dingo:

go test -v ./...

Documentation

Overview

Package dingo is a task/job <=> worker framework for #golang.

Goal

This library tries to make invoking and monitoring tasks as easy as possible.
 - any function can be a worker function, as long as types of its parameters are supported.
 - return values of worker functions are also accessible.
 - could be used locally as a queue for background jobs, or remotely as a distributed task queue when connected with AMQP or Redis.

Design

The design is inspired by
 https://github.com/RichardKnop/machinery
 http://www.celeryproject.org/

A short version of "how a task is invoked" in this library is:
 -------- caller ---------
 - users input arguments are treated as []interface{}
 - marshall []interface{} to []byte
 - send []byte to broker
 - polling return values from the store
 -------- worker ---------
 - consume []byte from broker
 - unmarshall []byte to []interface{}(underlying types might be different)
 - try to apply type-correction on []interface{}
 - invoking the worker function
 - convert its return values to []interface{}
 - marshall []interface{} to []byte
 - send []byte to the store
 -------- caller ---------
 - the byte stream of return values is ready after polling
 - unmarshall []byte to []interface{}
 - try to apply type-correction on []interface{}
 - return []interface{} to users.
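
The steps above can be followed end to end in a small, self-contained sketch. It is not dingo's implementation: two Go channels stand in for the broker and the store, JSON is the encoder, the worker function is a fixed integer addition, and the names `worker` and `call` are hypothetical.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// worker consumes task payloads from the broker channel, invokes the
// worker function (here: integer addition), and sends the marshalled
// return values to the store channel.
func worker(broker <-chan []byte, store chan<- []byte) {
	defer close(store)
	for payload := range broker {
		var args []interface{}
		if err := json.Unmarshal(payload, &args); err != nil || len(args) != 2 {
			continue // a real worker would report the failure
		}
		// hand-written type correction: JSON numbers arrive as float64
		a, okA := args[0].(float64)
		b, okB := args[1].(float64)
		if !okA || !okB {
			continue
		}
		ret, err := json.Marshal([]interface{}{int(a) + int(b)})
		if err != nil {
			continue
		}
		store <- ret
	}
}

// call plays the caller side: marshal the arguments, send them to the
// broker, then wait for the return values from the store.
func call(a, b int) (int, error) {
	broker := make(chan []byte, 1)
	store := make(chan []byte, 1)
	go worker(broker, store)

	payload, err := json.Marshal([]interface{}{a, b})
	if err != nil {
		return 0, err
	}
	broker <- payload
	close(broker)

	raw, ok := <-store
	if !ok {
		return 0, errors.New("worker produced no result")
	}
	var rets []interface{}
	if err := json.Unmarshal(raw, &rets); err != nil {
		return 0, err
	}
	if len(rets) != 1 {
		return 0, errors.New("unexpected number of return values")
	}
	sum, ok := rets[0].(float64)
	if !ok {
		return 0, errors.New("unexpected return type")
	}
	return int(sum), nil
}

func main() {
	sum, err := call(2, 3)
	fmt.Println(sum, err) // 5 <nil>
}
```

The worker here converts its arguments by hand instead of through reflection, which is the kind of shortcut a task-specific invoker can take.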

This library relies heavily on reflection to provide flexibility, therefore
it may run more slowly than libraries that don't use reflection. To overcome this,
users can provide customized marshaller(s) and invoker(s) that avoid reflection. These
customizations are task specific, so users may keep the default marshaller/invoker for
most tasks, and provide a customized marshaller/invoker for tasks that are performance-critical.

Customization

These concepts are abstracted for extensibility and customization; please refer to
the corresponding reference for details:
 - Generation of ID for new tasks: dingo.IDMaker
 - Parameter Marshalling: dingo.Marshaller
 - Worker Function Invoking: dingo.Invoker
 - Task Publishing/Consuming: dingo.Producer/dingo.Consumer/dingo.NamedConsumer
 - Report Publishing/Consuming: dingo.Reporter/dingo.Store

Parameter Types

 Many parameter types are supported by this library, except:
  - interface: there is no way to know the underlying type of an interface.
  - chan: not supported yet.
  - private fields in structs: they are ignored by most encoders. To support them,
    you need to provide a customized marshaller and invoker that can recognize those
    private fields.
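
The private-field limitation is easy to reproduce with encoding/json alone. In this small sketch the `person` type and `roundTrip` helper are made up for the illustration:

```go
package main

import (
	"encoding/json"
	"fmt"
)

type person struct {
	Name string // exported: encoded
	age  int    // unexported: skipped by encoding/json
}

// roundTrip marshals p to JSON and decodes it back into a new value.
func roundTrip(p person) (person, error) {
	var out person
	b, err := json.Marshal(p)
	if err != nil {
		return out, err
	}
	err = json.Unmarshal(b, &out)
	return out, err
}

func main() {
	out, err := roundTrip(person{Name: "dingo", age: 7})
	fmt.Printf("%+v %v\n", out, err) // {Name:dingo age:0} <nil>
}
```

The age is silently lost in transit, so a worker function receiving such a struct would see the zero value.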

TroubleShooting

It's relatively hard to debug a multi-routine library. To find out what went wrong inside, users
can subscribe to failure events (App.Listen).

Constants

This section is empty.

Variables

var ConsumerEvent = struct {
}{}

ConsumerEvent, IDs of events that might be sent to ConsumerHook

var Encode = struct {
	// Default marshalling mode
	Default int
	// JSON marshalling mode
	JSON int
	// Gob marshalling mode
	GOB int
	// JSON-Safe marshalling mode
	JSONSAFE int
}{
	0, 1, 2, 3,
}
var ErrCode = struct {
	// the worker function panic
	Panic int32
	// dingo.App shutdown
	Shutdown int32
}{
	1, 2,
}

Error code used in dingo.Error

var EventCode = struct {
	Generic             int
	TaskDeliveryFailure int
	DuplicatedPolling   int
}{
	0, 1, 2,
}
var EventLvl = struct {
	Debug   int
	Info    int
	Warning int
	Error   int
}{
	0,
	1,
	2,
	3,
}
var ID = struct {
	// default ID maker
	Default int
	// an ID maker implemented via uuid4
	UUID int
	// an ID maker implemented by atomic.AddInt64
	SEQ int
}{
	0, 1, 2,
}
var ObjT = struct {
	/*
		when this type used in dingo.App.Use, it means let
		dingo decide which type would be registered to dingo.App.
	*/
	Default int
	// this object provides dingo.Reporter interface
	Reporter int
	// this object provides dingo.Store interface
	Store int
	// this object provides dingo.Producer interface
	Producer int
	// this object provides dingo.Consumer/dingo.NamedConsumer interface
	Consumer int
	// this is a dingo.mapper object
	Mapper int
	// this is a dingo.worker object
	Worker int
	// this object provides dingo.bridge interface
	Bridge int
	// this object provides dingo.NamedConsumer interface
	NamedConsumer int
	// from chained routines
	ChainRoutine int
	/*
		all object types, when used in dingo.App.Listen, it means
		listen to events from all possible origins.
	*/
	All int
}{
	0,
	(1 << 0),
	(1 << 1),
	(1 << 2),
	(1 << 3),
	(1 << 4),
	(1 << 5),
	(1 << 6),
	(1 << 7),
	(1 << 8),
	int(^uint(0) >> 1),
}

ObjT lists the types of objects. They are bit flags and can be combined. These flags are used in:

  • dingo.App.Use
  • dingo.App.Listen
var ProducerEvent = struct {
	// a new kind of task is declared.
	DeclareTask int
}{
	1,
}

ProducerEvent, event IDs that might be passed to dingo.Producer.ProducerHook

var ReceiptStatus = struct {
	// this task is received successfully.
	OK int
	// something goes wrong
	NOK int
	// dingo can't find workers for this tasks
	WorkerNotFound int
}{
	1, 2, 3,
}

ReceiptStatus allows broker implementer to know if they have to reject the received packet or not.

var ReporterEvent = struct {
	// a sequence of reports from this task is about to fire.
	BeforeReport int
}{
	1,
}

ReporterEvent lists the IDs of events that might be sent to ReporterHook

var ResultError = struct {
	// the report channel returned from 'dingo' is nil
	NoChannel error
	// timeout
	Timeout error
	// the report channel is closed
	ChannelClosed error
	// there is no handler registered, shouldn't call .Then()
	NoHandler error
}{
	errors.New("channel is nil"),
	errors.New("time out"),
	errors.New("channel closed"),
	errors.New("no handler registered"),
}
var Status = struct {
	None int16

	// the task is sent to the consumer.
	Sent int16

	// the task is sent to workers.
	Progress int16

	// the task is done
	Success int16

	// the task execution is failed.
	Fail int16

	// this field should always be the last one
	Count int16
}{
	0, 1, 2, 3, 4, 5,
}
var StoreEvent = struct {
}{}

StoreEvent are those IDs of events that might be sent to StoreHook

Functions

func ComposeBytes

func ComposeBytes(h *Header, bs [][]byte) (b []byte, err error)

ComposeBytes composes a slice of byte arrays into one byte stream, along with a header section.

func DecomposeBytes

func DecomposeBytes(h *Header, b []byte) (bs [][]byte, err error)

DecomposeBytes decomposes a byte stream composed by "ComposeBytes" back into [][]byte.
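
The wire format used by these two functions is not described here. As a rough illustration of the general idea only, the sketch below packs several byte slices into one stream with length prefixes and unpacks them again. The format, the omission of the header, and the names `compose` and `decompose` are all assumptions for this example, not dingo's actual encoding.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// compose concatenates chunks, prefixing each one with its length
// as a 4-byte big-endian integer.
func compose(chunks [][]byte) []byte {
	var out []byte
	for _, c := range chunks {
		var size [4]byte
		binary.BigEndian.PutUint32(size[:], uint32(len(c)))
		out = append(out, size[:]...)
		out = append(out, c...)
	}
	return out
}

// decompose reverses compose and rejects truncated input.
func decompose(b []byte) ([][]byte, error) {
	var chunks [][]byte
	for len(b) > 0 {
		if len(b) < 4 {
			return nil, errors.New("truncated length prefix")
		}
		n := int(binary.BigEndian.Uint32(b[:4]))
		b = b[4:]
		if n > len(b) {
			return nil, errors.New("truncated chunk")
		}
		chunks = append(chunks, b[:n])
		b = b[n:]
	}
	return chunks, nil
}

func main() {
	stream := compose([][]byte{[]byte("2"), []byte(`"three"`)})
	chunks, err := decompose(stream)
	fmt.Println(len(chunks), string(chunks[1]), err) // 2 "three" <nil>
}
```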

func NewLocalBackend

func NewLocalBackend(cfg *Config, to chan *ReportEnvelope) (v *localBackend, err error)

NewLocalBackend allocates a Backend implementation based on 'channel'. Users can provide a channel and share it between multiple Reporter(s) and Store(s) to connect them.

func NewLocalBroker

func NewLocalBroker(cfg *Config, to chan []byte) (v *localBroker, err error)

NewLocalBroker would allocate a Broker implementation based on 'channel'. Users can provide a channel and share it between multiple Producer(s) and Consumer(s) to connect them.

This one only implements the Consumer interface, not the NamedConsumer one, so the dispatching of tasks relies on dingo.mapper.

Types

type App

type App struct {
	// contains filtered or unexported fields
}

App is the core component of dingo.

Example (Local)
package main

import (
	"fmt"
	"time"

	"github.com/mission-liao/dingo"
)

func main() {
	// this example demonstrates a job queue running in the background

	var err error
	defer func() {
		if err != nil {
			fmt.Printf("%v\n", err)
		}
	}()

	// an App in local mode
	app, err := dingo.NewApp("local", nil)
	if err != nil {
		return
	}

	// register a worker function murmur
	err = app.Register("murmur", func(msg string, repeat int, interval time.Duration) {
		for ; repeat > 0; repeat-- {
			<-time.After(interval)
			fmt.Printf("%v\n", msg)
		}
	})
	if err != nil {
		return
	}

	// allocate 5 workers, sharing 1 report channel
	_, err = app.Allocate("murmur", 5, 1)
	if err != nil {
		return
	}

	results := []*dingo.Result{}
	// invoke 10 tasks
	for i := 0; i < 10; i++ {
		results = append(
			results,
			dingo.NewResult(
				// name, option, parameter#1, parameter#2, parameter#3...
				app.Call("murmur", nil, fmt.Sprintf("this is %d speaking", i), 10, 100*time.Millisecond),
			))
	}

	// wait until those tasks are done
	for _, v := range results {
		err = v.Wait(0)
		if err != nil {
			return
		}
	}

	// release resource
	err = app.Close()
	if err != nil {
		return
	}
}
Output:

func NewApp

func NewApp(mode string, cfg *Config) (app *App, err error)

NewApp creates an App. "mode" selects how dingo runs:

  • "local": the App works in local mode, similar to other background worker frameworks.
  • "remote": the App works in remote (distributed) mode; brokers (e.g. AMQP) and backends (e.g. Redis, if required) are needed for it to work.

func (*App) AddIDMaker

func (dg *App) AddIDMaker(expectedID int, m IDMaker) error

AddIDMaker registers a customized IDMaker; the input should be an object that implements IDMaker.

You can register different ID makers for different tasks. Internally, dingo takes the pair (name, id) as the identity of a task.

The requirement of IDMaker:

  • uniqueness of generated string among all generated tasks.
  • routine(thread) safe.

The default IDMaker used by dingo is implemented by uuid4.
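
Both requirements can be met with an atomic counter, in the spirit of the SEQ ID maker mentioned under the ID variable. The sketch below is an assumption-laden illustration: the `seqID` type and the `NewID` method name and signature are hypothetical, since the IDMaker interface itself is not shown in this section.

```go
package main

import (
	"fmt"
	"strconv"
	"sync"
	"sync/atomic"
)

// seqID hands out increasing IDs and is safe for concurrent use.
// The NewID name and signature are assumed for illustration.
type seqID struct {
	last int64
}

// NewID returns the next ID as a decimal string.
func (s *seqID) NewID() (string, error) {
	return strconv.FormatInt(atomic.AddInt64(&s.last, 1), 10), nil
}

// uniqueCount requests IDs from several goroutines at once and
// returns how many distinct IDs were produced.
func uniqueCount(m *seqID, goroutines, perRoutine int) int {
	var (
		mu   sync.Mutex
		seen = map[string]bool{}
		wg   sync.WaitGroup
	)
	for g := 0; g < goroutines; g++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < perRoutine; i++ {
				id, _ := m.NewID()
				mu.Lock()
				seen[id] = true
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return len(seen)
}

func main() {
	fmt.Println(uniqueCount(&seqID{}, 8, 100)) // 800
}
```

A counter like this is unique only within one process; IDs generated by several callers would collide, which is one reason a UUID-based maker is a safer default in distributed mode.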

func (*App) AddMarshaller

func (dg *App) AddMarshaller(expectedID int, m Marshaller) error

AddMarshaller registers a customized Marshaller; the input should be an object that implements both Marshaller and Invoker.

You can pick any builtin Invoker(s)/Marshaller(s) combined with your customized one:

app.AddMarshaller(3, &struct{JsonSafeMarshaller, __your_customized_invoker__})

"expectedID" is the expected identifier of this Marshaller, which could be useful when you need to sync the Marshaller-ID between producers and consumers. 0~3 are occupied by builtin Marshaller(s). Suggested "expectedID" should begin from 100.

func (*App) Allocate

func (dg *App) Allocate(name string, count, share int) (remain int, err error)

Allocate would allocate more workers. When your Consumer(s) implement NamedConsumer, a new listener (to brokers) would be allocated each time you call this function. All allocated workers would serve that listener.

If you want to open more channels to consume from brokers, just call this function multiple times.

parameters:

  • name: the name of tasks.
  • count: count of workers to be initialized.
  • share: the count of workers sharing one report channel.

returns:

  • remain: remaining count of workers that failed to initialize.
  • err: any error produced

func (*App) Call

func (dg *App) Call(name string, opt *Option, args ...interface{}) (reports <-chan *Report, err error)

Call would initiate a task by providing:

  • "name" of tasks
  • execution-"option" of tasks, could be nil
  • argument of corresponding worker function.

A reporting channel would be returned for callers to monitor the status of tasks, and access its result. A suggested procedure to monitor reporting channels is

finished:
  for {
    select {
    case r, ok := <-reports:
      if !ok {
        // dingo.App is closed somewhere else
        break finished
      }

      if r.OK() {
        // the result is ready
        returns := r.Return()
      }
      if r.Fail() {
        // get error
        err := r.Error()
      }

      if r.Done() {
        break finished
      }
    }
  }

Multiple reports would be sent for each task:

  • Sent: the task is already sent to brokers.
  • Progress: the consumer received this task and is about to execute it.
  • Success: this task is finished without error.
  • Fail: this task failed for some reason.

Note: 'Fail' here doesn't mean your worker function failed; it means "dingo" couldn't execute your worker function properly.
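
The report sequence can be simulated without a running App. In the sketch below the status values are copied from the Status variable, while the `report` type, its fields, and `monitor` are hypothetical stand-ins for dingo's own types.

```go
package main

import "fmt"

// Status values copied from the Status variable.
const (
	sent     = 1
	progress = 2
	success  = 3
	fail     = 4
)

// report is a stand-in for dingo's report type; its fields are hypothetical.
type report struct {
	status int
	ret    []interface{}
}

// done reports whether this is the final report of a task.
func (r *report) done() bool {
	return r.status == success || r.status == fail
}

// monitor drains a report channel until a final report arrives or the
// channel is closed, and returns the last report it saw.
func monitor(reports <-chan *report) (last *report, closed bool) {
	for r := range reports {
		last = r
		if r.done() {
			return last, false
		}
	}
	return last, true
}

func main() {
	reports := make(chan *report, 3)
	reports <- &report{status: sent}
	reports <- &report{status: progress}
	reports <- &report{status: success, ret: []interface{}{5}}

	last, closed := monitor(reports)
	fmt.Println(last.status == success, last.ret[0], closed) // true 5 false
}
```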

func (*App) Close

func (dg *App) Close() (err error)

Close is used to release this instance. All reporting channels are closed after returning. However, those sent tasks/reports wouldn't be reclaimed.

func (*App) Listen

func (dg *App) Listen(targets, level, expectedID int) (id int, events <-chan *Event, err error)

Listen subscribes to events from 'dingo' and returns a channel to receive them.

"targets" are instances you want to monitor, they include:

  • dingo.ObjT.Reporter: the Reporter instance attached to this App.
  • dingo.ObjT.Store: the Store instance attached to this App.
  • dingo.ObjT.Producer: the Producer instance attached to this App.
  • dingo.ObjT.Consumer: the Consumer/NamedConsumer instance attached to this App.
  • dingo.ObjT.Mapper: the internal component, turn it on when debugging.
  • dingo.ObjT.Worker: the internal component, turn it on when debug.
  • dingo.ObjT.Bridge: the internal component, turn it on when debug.
  • dingo.ObjT.All: every instance.

They are bit flags and can be combined as "targets", like:

ObjT.Bridge | ObjT.Worker | ...
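
Combining and testing such flags works like any other Go bit mask. The constants below are local copies of the first four non-zero values listed in ObjT, and `has` is a hypothetical helper written for this illustration:

```go
package main

import "fmt"

// Local copies of a few flag values listed in ObjT.
const (
	reporter = 1 << 0
	store    = 1 << 1
	producer = 1 << 2
	consumer = 1 << 3
)

// has reports whether every bit of flag is set in types.
func has(types, flag int) bool {
	return types&flag == flag
}

func main() {
	callerSide := producer | store
	fmt.Println(has(callerSide, producer), has(callerSide, consumer)) // true false

	// a mask with every bit set, like ObjT.All, matches any flag
	all := int(^uint(0) >> 1)
	fmt.Println(has(all, reporter)) // true
}
```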

"level" are minimal severity level expected, include:

  • dingo.EventLvl.Debug
  • dingo.EventLvl.Info
  • dingo.EventLvl.Warning
  • dingo.EventLvl.Error

"id" is the identity of this event channel, which could be used to stop monitoring by calling dingo.App.StopListen.

In general, a dedicated go routine would be initiated for this channel, with an infinite for loop, like this:

for {
  select {
    case e, ok := <-events:
      if !ok {
        // after App.Close(), all reporting channels would be closed,
        // except those channels abandoned by App.StopListen.
        return
      }
      fmt.Printf("%v\n", e)
    case <-quit:
      return
  }
}

func (*App) Register

func (dg *App) Register(name string, fn interface{}) (err error)

Register would register a worker function

parameters:

  • name: name of tasks
  • fn: the function that actually performs the task.

returns:

  • err: any error produced

func (*App) SetIDMaker

func (dg *App) SetIDMaker(name string, id int) error

SetIDMaker would set IDMaker used for a specific kind of tasks

parameters:

  • name: name of tasks
  • id: id of the IDMaker you would like to use when generating tasks.

func (*App) SetMarshaller

func (dg *App) SetMarshaller(name string, taskMash, reportMash int) error

SetMarshaller would set marshallers used for marshalling tasks and reports

parameters:

  • name: name of tasks
  • taskMash, reportMash: id of Marshaller for 'Task' and 'Report'

func (*App) SetOption

func (dg *App) SetOption(name string, opt *Option) error

SetOption would set default option used for a worker function.

func (*App) StopListen

func (dg *App) StopListen(id int) (err error)

StopListen would stop listening events.

Note: those channels stopped by App.StopListen wouldn't be closed but only be reclaimed by GC.

func (*App) Use

func (dg *App) Use(obj Object, types int) (id int, used int, err error)

Use attaches an instance, which could be any instance implementing Reporter, Backend, Producer, or Consumer.

parameters:

  • obj: object to be attached
  • types: interfaces contained in 'obj', refer to dingo.ObjT

returns:

  • id: identifier assigned to this object, 0 is invalid value
  • err: errors

For a producer, the right combination of "types" is ObjT.Producer|ObjT.Store, if reporting is not required, then only ObjT.Producer is used.

For a consumer, the right combination of "types" is ObjT.Consumer|ObjT.Reporter. If reporting is not required (make sure no producer is waiting for reports), then only ObjT.Consumer is used.

Example (Caller)
/*
	import (
		"fmt"
		"time"

		"github.com/mission-liao/dingo"
		"github.com/mission-liao/dingo/amqp"
	)
*/
// this example demonstrates a caller based on AMQP, used along with ExampleApp_Use_worker
// make sure you install a rabbitmq server locally.
var err error
defer func() {
	if err != nil {
		fmt.Printf("%v\n", err)
	}
}()

// an App in remote mode
app, err := dingo.NewApp("remote", nil)
if err != nil {
	return
}

// attach an AMQP producer to publish your tasks
broker, err := dgamqp.NewBroker(dgamqp.DefaultAmqpConfig())
if err != nil {
	return
}
_, _, err = app.Use(broker, dingo.ObjT.Producer)
if err != nil {
	return
}

// attach an AMQP store to receive reports from datastores.
backend, err := dgamqp.NewBackend(dgamqp.DefaultAmqpConfig())
if err != nil {
	return
}
_, _, err = app.Use(backend, dingo.ObjT.Store)
if err != nil {
	return
}

// register a worker function that murmurs
err = app.Register("murmur", func(speech *struct {
	Prologue string
	Script   []string
}, interval time.Duration) (countOfSentence int) {
	// speak the prologue
	fmt.Printf("%v:\n", speech.Prologue)
	countOfSentence++

	// speak the script
	for _, v := range speech.Script {
		<-time.After(interval)
		fmt.Printf("%v\n", v)
		countOfSentence++
	}

	// return the total sentence we talked
	return
})
if err != nil {
	return
}

// compose a script to talk
script := &struct {
	Prologue string
	Script   []string
}{
	Script: []string{
		"Today, I'm announcing this library.",
		"It should be easy to use, ",
		"and fun to play with.",
		"Merry X'mas.",
	},
}

// invoke 20 tasks
results := []*dingo.Result{}
for i := 0; i < 20; i++ {
	script.Prologue = fmt.Sprintf("this is %d speaking", i)
	results = append(results,
		dingo.NewResult(
			// name, option, parameter#1, parameter#2 ...
			app.Call("murmur", nil, script, 100*time.Millisecond),
		))
}

// wait until those tasks are done
for _, v := range results {
	err = v.Wait(0)
	if err != nil {
		return
	}

	// result is accessible
	fmt.Printf("one worker spoke %v sentences\n", v.Last.Return()[0].(int))
}

// release resource
err = app.Close()
if err != nil {
	return
}
Output:

Example (Worker)
/*
	import (
		"fmt"
		"os"
		"time"

		"github.com/mission-liao/dingo"
		"github.com/mission-liao/dingo/amqp"
	)
*/
// this example demonstrates a worker based on AMQP, used along with ExampleApp_Use_caller
// make sure you install a rabbitmq server locally.
var err error
defer func() {
	if err != nil {
		fmt.Printf("%v\n", err)
	}
}()

// an App in remote mode
app, err := dingo.NewApp("remote", nil)
if err != nil {
	return
}

// attach an AMQP consumer to receive tasks
broker, err := dgamqp.NewBroker(dgamqp.DefaultAmqpConfig())
if err != nil {
	return
}
_, _, err = app.Use(broker, dingo.ObjT.NamedConsumer)
if err != nil {
	return
}

// attach an AMQP reporter to publish reports
backend, err := dgamqp.NewBackend(dgamqp.DefaultAmqpConfig())
if err != nil {
	return
}
_, _, err = app.Use(backend, dingo.ObjT.Reporter)
if err != nil {
	return
}

// register a worker function that murmurs
err = app.Register("murmur", func(speech *struct {
	Prologue string
	Script   []string
}, interval time.Duration) (countOfSentence int) {
	// speak the prologue
	fmt.Printf("%v:\n", speech.Prologue)
	countOfSentence++

	// speak the script
	for _, v := range speech.Script {
		<-time.After(interval)
		fmt.Printf("%v\n", v)
		countOfSentence++
	}

	// return the total sentence we talked
	return
})
if err != nil {
	return
}

// allocate 1 worker, sharing 1 report channel
_, err = app.Allocate("murmur", 1, 1)
if err != nil {
	return
}

// wait until a key stroke
stroke := make([]byte, 100)
fmt.Println("waiting for tasks...stop waiting by pressing enter")
os.Stdin.Read(stroke)

// release resource
err = app.Close()
if err != nil {
	return
}
Output:

type Backend

type Backend interface {
	Reporter
	Store
}

Backend interface is composed of Reporter/Store

type BackendTestSuite

type BackendTestSuite struct {
	suite.Suite

	Gen   func() (Backend, error)
	Trans *fnMgr
	Bkd   Backend
	Rpt   Reporter
	Sto   Store
	Tasks []*Task
}

Every dingo.Backend provider should pass this test suite. Example testing code:

type myBackendTestSuite struct {
  dingo.BackendTestSuite
}
func TestMyBackendTestSuite(t *testing.T) {
  suite.Run(t, &myBackendTestSuite{
    dingo.BackendTestSuite{
      Gen: func() (dingo.Backend, error) {
        // generate a new instance of your backend.
      },
    },
  })
}

func (*BackendTestSuite) SetupSuite

func (ts *BackendTestSuite) SetupSuite()

func (*BackendTestSuite) SetupTest

func (ts *BackendTestSuite) SetupTest()

func (*BackendTestSuite) TearDownSuite

func (ts *BackendTestSuite) TearDownSuite()

func (*BackendTestSuite) TearDownTest

func (ts *BackendTestSuite) TearDownTest()

func (*BackendTestSuite) TestBasic

func (ts *BackendTestSuite) TestBasic()

func (*BackendTestSuite) TestExpect

func (ts *BackendTestSuite) TestExpect()

func (*BackendTestSuite) TestOrder

func (ts *BackendTestSuite) TestOrder()

func (*BackendTestSuite) TestSameID

func (ts *BackendTestSuite) TestSameID()

type Broker

type Broker interface {
	Producer
	Consumer
}

Broker interface is composed of Producer/Consumer

type BrokerTestSuite

type BrokerTestSuite struct {
	suite.Suite

	Trans         *fnMgr
	Gen           func() (interface{}, error)
	Pdc           Producer
	Csm           Consumer
	Ncsm          NamedConsumer
	ConsumerNames []string
}

Every dingo.Broker provider should pass this test suite. Example testing code:

type myBrokerTestSuite struct {
  dingo.BrokerTestSuite
}
func TestMyBrokerTestSuite(t *testing.T) {
  suite.Run(t, &myBrokerTestSuite{
    dingo.BrokerTestSuite{
      Gen: func() (interface{}, error) {
        // generate a new instance of your broker.
        // both dingo.Broker and dingo.NamedBroker are acceptable.
      },
    },
  })
}

func (*BrokerTestSuite) SetupSuite

func (ts *BrokerTestSuite) SetupSuite()

func (*BrokerTestSuite) SetupTest

func (ts *BrokerTestSuite) SetupTest()

func (*BrokerTestSuite) TearDownSuite

func (ts *BrokerTestSuite) TearDownSuite()

func (*BrokerTestSuite) TearDownTest

func (ts *BrokerTestSuite) TearDownTest()

func (*BrokerTestSuite) TestBasic

func (ts *BrokerTestSuite) TestBasic()

func (*BrokerTestSuite) TestDuplicated

func (ts *BrokerTestSuite) TestDuplicated()

func (*BrokerTestSuite) TestExpect

func (ts *BrokerTestSuite) TestExpect()

func (*BrokerTestSuite) TestNamed

func (ts *BrokerTestSuite) TestNamed()

type Config

type Config struct {
	Mappers_ int `json:"Mappers"`
}

func DefaultConfig

func DefaultConfig() *Config

func (*Config) Mappers

func (cfg *Config) Mappers(count int) *Config

Mappers is to set the count of mappers initiated. Note: "mapper" is the replacement of "Broker" in local mode.

type Consumer

type Consumer interface {
	// hook for listening event from dingo
	// parameter:
	// - eventID: which event?
	// - payload: corresponding payload, its type depends on 'eventID'
	// returns:
	// - err: errors
	ConsumerHook(eventID int, payload interface{}) (err error)

	// create a new listener to receive tasks
	//
	// parameters:
	// - rcpt: a channel that 'dingo' would send 'TaskReceipt' for tasks from 'tasks' channel.
	// returns:
	// - tasks: 'dingo' would consume from this channel for new tasks
	// - err: any error during initialization
	AddListener(rcpt <-chan *TaskReceipt) (tasks <-chan []byte, err error)

	// all listeners are stopped, their corresponding "tasks" channel(returned from AddListener)
	// would be closed.
	StopAllListeners() (err error)
}

Consumer would consume tasks from broker(s). This kind of Consumer(s) is easier to implement, every task is sent to a single queue, and consumed from a single queue.

The interaction between Consumer(s) and dingo is asynchronous, through the channel you provide in 'AddListener'.

type CustomMarshaller

type CustomMarshaller struct {
	Codec CustomMarshallerCodec
}

CustomMarshaller is a helper Marshaller for users to create customized Marshaller(s) by providing
several hooks. Users just need to take care of the things they know:
  - input arguments
  - output return values
 other payloads of task/report are handled by CustomMarshaller.

 Here is a partial demo with json:
   // worker function, we are going to provide a custom marshaller
   // without any reflect for it.
   fn := func(msg string, category int) (done bool) {
      ...
   }

   // implement CustomMarshallerCodec interface
   type myCodec struct {}
   // encoding arguments
   func (c *myCodec) EncodeArgument(fn interface{}, val []interface{}) ([][]byte, error) {
      bMsg, _ := json.Marshal(val[0])
      bCategory, _ := json.Marshal(val[1])
      return [][]byte{bMsg, bCategory}, nil
   }
   // encoding returns
   func (c *myCodec) EncodeReturn(fn interface{}, val []interface{}) ([][]byte, error) {
      bDone, _ := json.Marshal(val[0])
      return [][]byte{bDone}, nil
   }
   // decoding arguments
   func (c *myCodec) DecodeArgument(fn interface{}, bs [][]byte) ([]interface{}, error) {
      var (
         msg      string
         category int
      )
      // unmarshall each argument
      json.Unmarshal(bs[0], &msg)
      json.Unmarshal(bs[1], &category)
      return []interface{}{msg, category}, nil
   }
   // decoding returns
   func (c *myCodec) DecodeReturn(fn interface{}, bs [][]byte) ([]interface{}, error) {
      var done bool
      json.Unmarshal(bs[0], &done)
      return []interface{}{done}, nil
   }

   // register it to dingo.App
   app.AddMarshaller(expectedMashId, &struct{
      CustomMarshaller
      myCustomInvoker
   }{
      CustomMarshaller{Codec: &myCodec{}},
      myCustomInvoker{},
   })

Example
package main

import (
	"encoding/json"
	"fmt"

	"github.com/mission-liao/dingo"
)

type testCustomMarshallerCodec struct{}

func (me *testCustomMarshallerCodec) Prepare(name string, fn interface{}) (err error) { return }
func (me *testCustomMarshallerCodec) EncodeArgument(_ interface{}, val []interface{}) (bs [][]byte, err error) {
	fmt.Println("encode argument is called")
	b, err := json.Marshal(val[0])
	if err != nil {
		return
	}
	bs = [][]byte{b}
	return
}
func (me *testCustomMarshallerCodec) DecodeArgument(_ interface{}, bs [][]byte) (val []interface{}, err error) {
	fmt.Println("decode argument is called")
	var input []string
	err = json.Unmarshal(bs[0], &input)
	if err != nil {
		return
	}
	val = []interface{}{input}
	return
}
func (me *testCustomMarshallerCodec) EncodeReturn(_ interface{}, val []interface{}) (bs [][]byte, err error) {
	fmt.Println("encode return is called")
	b, err := json.Marshal(val[0])
	if err != nil {
		return
	}
	bs = [][]byte{b}
	return
}
func (me *testCustomMarshallerCodec) DecodeReturn(_ interface{}, bs [][]byte) (val []interface{}, err error) {
	fmt.Println("decode return is called")
	var ret string
	err = json.Unmarshal(bs[0], &ret)
	if err != nil {
		return
	}
	val = []interface{}{ret}
	return
}

type testMyInvoker3 struct{}

func (me *testMyInvoker3) Call(f interface{}, param []interface{}) ([]interface{}, error) {
	fmt.Println("my invoker is called for Call")

	ret := f.(func([]string) string)(param[0].([]string))
	return []interface{}{ret}, nil
}
func (me *testMyInvoker3) Return(f interface{}, returns []interface{}) ([]interface{}, error) {
	fmt.Println("my invoker is called for Return")
	return returns, nil
}

func main() {
	/*
		import (
			"encoding/json"
			"fmt"

			"github.com/mission-liao/dingo"
		)
	*/
	// this example demonstrates the usage of a
	// customized marshaller that encodes every parameter
	// in JSON, invoked through a customized invoker.
	var err error
	defer func() {
		if err != nil {
			fmt.Printf("%v\n", err)
		}
	}()

	// an App in remote mode, with local backend/broker
	app, err := dingo.NewApp("remote", nil)
	if err != nil {
		return
	}

	// attach a local broker.
	broker, err := dingo.NewLocalBroker(dingo.DefaultConfig(), nil)
	if err != nil {
		return
	}
	_, _, err = app.Use(broker, dingo.ObjT.Default)
	if err != nil {
		return
	}
	// attach a local backend
	backend, err := dingo.NewLocalBackend(dingo.DefaultConfig(), nil)
	if err != nil {
		return
	}
	_, _, err = app.Use(backend, dingo.ObjT.Default)
	if err != nil {
		return
	}

	// register customize marshaller & invoker
	err = app.AddMarshaller(101, &struct {
		testMyInvoker3
		dingo.CustomMarshaller
	}{
		testMyInvoker3{},
		dingo.CustomMarshaller{Codec: &testCustomMarshallerCodec{}},
	})
	if err != nil {
		return
	}

	// register worker function
	err = app.Register("concat", func(words []string) (ret string) {
		for _, v := range words {
			ret = ret + v
		}
		return
	})
	if err != nil {
		return
	}

	// change marshaller of worker function
	err = app.SetMarshaller("concat", 101, 101)
	if err != nil {
		return
	}

	// allocate workers
	_, err = app.Allocate("concat", 1, 1)
	if err != nil {
		return
	}

	// trigger the fire...
	result := dingo.NewResult(app.Call("concat", nil, []string{"Merry ", "X", "'mas"}))
	err = result.Wait(0)
	if err != nil {
		return
	}
	result.OnOK(func(ret string) {
		fmt.Printf("%v\n", ret)
	})

	err = app.Close()
	if err != nil {
		return
	}
}
Output:

func (*CustomMarshaller) DecodeReport

func (ms *CustomMarshaller) DecodeReport(h *Header, fn interface{}, b []byte) (report *Report, err error)

func (*CustomMarshaller) DecodeTask

func (ms *CustomMarshaller) DecodeTask(h *Header, fn interface{}, b []byte) (task *Task, err error)

func (*CustomMarshaller) EncodeReport

func (ms *CustomMarshaller) EncodeReport(fn interface{}, report *Report) (b []byte, err error)

func (*CustomMarshaller) EncodeTask

func (ms *CustomMarshaller) EncodeTask(fn interface{}, task *Task) (b []byte, err error)

func (*CustomMarshaller) Prepare

func (ms *CustomMarshaller) Prepare(name string, fn interface{}) (err error)

type CustomMarshallerCodec

type CustomMarshallerCodec interface {

	/*
	 A hook called when CustomMarshaller.Prepare is called.
	*/
	Prepare(name string, fn interface{}) (err error)

	/*
	 encode arguments.
	 - fn: function fingerprint
	 - val: slice of arguments

	 You can encode each argument one by one, and compose them into one
	 slice of byte slices. (or any way you want)
	*/
	EncodeArgument(fn interface{}, val []interface{}) ([][]byte, error)

	/*
	 decode arguments.
	 - fn: function fingerprint
	 - bs: slice of byte slice
	*/
	DecodeArgument(fn interface{}, bs [][]byte) ([]interface{}, error)

	/*
	 encode returns.
	 - fn: function fingerprint
	 - val: slice of returns

	 You can encode each return one by one, and compose them into one
	 slice of byte slices. (or any way you want)
	*/
	EncodeReturn(fn interface{}, val []interface{}) ([][]byte, error)

	/*
	 decode returns.
	 - fn: function fingerprint
	 - bs: slice of byte slice
	*/
	DecodeReturn(fn interface{}, bs [][]byte) ([]interface{}, error)
}

CustomMarshallerCodec is the "codec" consumed by CustomMarshaller. It lets users build a customized marshaller by supplying only the logic that encodes and decodes arguments and returns.
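
The argument half of such a codec can be sketched with only the standard library. This is a minimal sketch, not dingo's implementation: pairCodec and roundTrip are hypothetical names, the worker function is assumed to look like func(msg string, category int) bool, and unlike the abbreviated snippet in the CustomMarshaller documentation every error is checked.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// pairCodec is a hypothetical codec for worker functions shaped like
// func(msg string, category int) bool. Only the argument half is shown.
type pairCodec struct{}

// EncodeArgument marshals each argument into its own byte slice.
func (pairCodec) EncodeArgument(_ interface{}, val []interface{}) ([][]byte, error) {
	bs := make([][]byte, 0, len(val))
	for i, v := range val {
		b, err := json.Marshal(v)
		if err != nil {
			return nil, fmt.Errorf("argument %d: %w", i, err)
		}
		bs = append(bs, b)
	}
	return bs, nil
}

// DecodeArgument restores the concrete types the worker function expects.
func (pairCodec) DecodeArgument(_ interface{}, bs [][]byte) ([]interface{}, error) {
	if len(bs) != 2 {
		return nil, errors.New("expected exactly 2 encoded arguments")
	}
	var (
		msg      string
		category int
	)
	if err := json.Unmarshal(bs[0], &msg); err != nil {
		return nil, err
	}
	if err := json.Unmarshal(bs[1], &category); err != nil {
		return nil, err
	}
	return []interface{}{msg, category}, nil
}

// roundTrip encodes and then decodes a (msg, category) pair.
func roundTrip(msg string, category int) (string, int, error) {
	c := pairCodec{}
	bs, err := c.EncodeArgument(nil, []interface{}{msg, category})
	if err != nil {
		return "", 0, err
	}
	vals, err := c.DecodeArgument(nil, bs)
	if err != nil {
		return "", 0, err
	}
	return vals[0].(string), vals[1].(int), nil
}

func main() {
	msg, category, err := roundTrip("hello", 3)
	fmt.Println(msg, category, err)
}
```

Because the codec decodes into typed variables, the values handed to the invoker already carry the types of the worker function's parameters.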

type Error

type Error struct {
	C int32
	M string
}

A generic error that could be marshalled/unmarshalled by JSON.
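
A short sketch of what "marshalled/unmarshalled by JSON" means for a struct of this shape. wireError is a local stand-in that only mirrors the two documented fields (C and M); transport is a hypothetical helper, and nothing here is taken from dingo's source.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// wireError is a local stand-in mirroring the documented fields of Error.
type wireError struct {
	C int32  // error code
	M string // error message
}

// Error makes wireError satisfy the built-in error interface.
func (e *wireError) Error() string { return e.M }

// transport simulates sending an error across a process boundary as JSON.
func transport(in *wireError) (*wireError, error) {
	b, err := json.Marshal(in)
	if err != nil {
		return nil, err
	}
	out := &wireError{}
	if err := json.Unmarshal(b, out); err != nil {
		return nil, err
	}
	return out, nil
}

func main() {
	out, err := transport(&wireError{C: 7, M: "boom"})
	fmt.Println(out, err)
}
```

Exported fields are what make the round trip work: encoding/json skips unexported fields.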

func NewErr

func NewErr(code int32, err error) *Error

func (*Error) Code

func (err *Error) Code() int32

func (*Error) Error

func (err *Error) Error() string

func (*Error) Msg

func (err *Error) Msg() string

type Event

type Event struct {
	// origin of event: please refer to dingo.ObjT for possible values.
	Origin  int
	Time    time.Time
	Level   int
	Code    int
	Payload interface{}
}

func NewEvent

func NewEvent(orig, lvl, code int, payload interface{}) *Event

func NewEventFromError

func NewEventFromError(orig int, err error) *Event

type GenericInvoker

type GenericInvoker struct{}

This Invoker is a generic one which can convert values from different types. For example:

  • a struct can be converted from a map or another struct.
  • pointers, pointers to pointers, and so on are also handled.

However, this flexibility comes at a price: it is also the slowest option. All builtin Marshaller(s) could be used with this Invoker.

func (*GenericInvoker) Call

func (vk *GenericInvoker) Call(f interface{}, param []interface{}) ([]interface{}, error)

func (*GenericInvoker) Return

func (vk *GenericInvoker) Return(f interface{}, returns []interface{}) ([]interface{}, error)

type GobMarshaller

type GobMarshaller struct{}

GobMarshaller is a marshaller implemented via gob encoding. Note: this Marshaller can work with both GenericInvoker and LazyInvoker.

func (*GobMarshaller) DecodeReport

func (ms *GobMarshaller) DecodeReport(h *Header, fn interface{}, b []byte) (report *Report, err error)

func (*GobMarshaller) DecodeTask

func (ms *GobMarshaller) DecodeTask(h *Header, fn interface{}, b []byte) (task *Task, err error)

func (*GobMarshaller) EncodeReport

func (ms *GobMarshaller) EncodeReport(fn interface{}, report *Report) (b []byte, err error)

func (*GobMarshaller) EncodeTask

func (ms *GobMarshaller) EncodeTask(fn interface{}, task *Task) (b []byte, err error)

func (*GobMarshaller) Prepare

func (ms *GobMarshaller) Prepare(name string, fn interface{}) (err error)

type Header

type Header struct {
	// header type, "dingo" would raise an error when encountering headers with
	// unknown types.
	T int16

	// dingo-generated id for this task
	I string

	// task name
	N string

	// registries(a series of uint64), their usage depends on Marshaller(s).
	R []uint64
}

The common header section of the byte stream marshalled by Marshaller(s). External components (broker.Producer, broker.Consumer, backend.Reporter, backend.Store) could rely on Header to get some identity info from the byte stream they have, like this:

   h, err := DecodeHeader(b)
   // the id of task
   h.ID()
   // the name of task
   h.Name()

Registries could be added to Header. For example, if your Marshaller encodes each argument in a different byte stream, you could record their lengths (in bytes) in the registries section of Header:

   // marshalling
   bs := [][]byte{}
   h := task.H
   for _, v := range args {
     b_, _ := json.Marshal(v)
     h.Append(uint64(len(b_)))
     bs = append(bs, b_)
   }

   // compose those byte streams
   b, _ := h.Flush(0) // header section
   for _, v := range bs {
     b = append(b, v...)
   }

   // unmarshalling
   h, _ = DecodeHeader(b)
   for _, v := range h.Registry() {
     // you could rely on registry to decompose
     // the byte stream here.
   }
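
The length-registry idea can be sketched without Header at all. In this hedged illustration a plain []uint64 plays the role of the registries; compose and decompose are hypothetical helpers, and the real encoding of the header section is not modelled.

```go
package main

import (
	"errors"
	"fmt"
)

// compose concatenates the parts into one stream and records each length,
// playing the role of the registries in the header.
func compose(parts [][]byte) (stream []byte, lengths []uint64) {
	for _, p := range parts {
		lengths = append(lengths, uint64(len(p)))
		stream = append(stream, p...)
	}
	return stream, lengths
}

// decompose splits the stream back into parts using the recorded lengths.
func decompose(stream []byte, lengths []uint64) ([][]byte, error) {
	parts := make([][]byte, 0, len(lengths))
	for _, n := range lengths {
		if uint64(len(stream)) < n {
			return nil, errors.New("stream shorter than recorded lengths")
		}
		parts = append(parts, stream[:n])
		stream = stream[n:]
	}
	return parts, nil
}

func main() {
	stream, lengths := compose([][]byte{[]byte(`"hi"`), []byte(`42`)})
	parts, err := decompose(stream, lengths)
	if err != nil {
		fmt.Println(err)
		return
	}
	for _, p := range parts {
		fmt.Println(string(p))
	}
}
```

Recording lengths avoids the need for a delimiter, so the encoded arguments may contain arbitrary bytes.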

func DecodeHeader

func DecodeHeader(b []byte) (h *Header, err error)

func NewHeader

func NewHeader(id, name string) *Header

func (*Header) Append

func (hd *Header) Append(r uint64)

func (*Header) Flush

func (hd *Header) Flush(prealloc uint64) ([]byte, error)

Flush the header to a byte stream. Note: after flushing, all registries would be reset.

func (*Header) ID

func (hd *Header) ID() string

func (*Header) Length

func (hd *Header) Length() uint64

func (*Header) Name

func (hd *Header) Name() string

func (*Header) Registry

func (hd *Header) Registry() []uint64

func (*Header) Reset

func (hd *Header) Reset()

func (*Header) Type

func (hd *Header) Type() int16

type HetroRoutines

type HetroRoutines struct {
	// contains filtered or unexported fields
}

func NewHetroRoutines

func NewHetroRoutines() *HetroRoutines

func (*HetroRoutines) Close

func (rs *HetroRoutines) Close()

func (*HetroRoutines) Events

func (rs *HetroRoutines) Events() chan *Event

func (*HetroRoutines) New

func (rs *HetroRoutines) New(want int) (quit <-chan int, done chan<- int, idx int)

func (*HetroRoutines) Stop

func (rs *HetroRoutines) Stop(idx int) (err error)

type IDMaker

type IDMaker interface {
	// routine(thread) safe is required.
	NewID() (string, error)
}

IDMaker is an object that can generate a series of identities, typed as string. Each identity should be unique.
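
One way to meet the routine-safety requirement is an atomic counter. The sketch below is an assumption-laden illustration: counterIDMaker and uniqueIDs are hypothetical names, and this is not how SeqIDMaker is necessarily implemented.

```go
package main

import (
	"fmt"
	"strconv"
	"sync"
	"sync/atomic"
)

// counterIDMaker is a hypothetical type with the same NewID method set
// as the IDMaker interface.
type counterIDMaker struct {
	next uint64
}

// NewID returns the next number in the sequence as a string. The atomic
// increment keeps it safe to call from many goroutines at once.
func (m *counterIDMaker) NewID() (string, error) {
	return strconv.FormatUint(atomic.AddUint64(&m.next, 1), 10), nil
}

// uniqueIDs calls NewID from several goroutines and counts distinct results.
func uniqueIDs(workers, perWorker int) int {
	m := &counterIDMaker{}
	var (
		mu   sync.Mutex
		seen = make(map[string]struct{})
		wg   sync.WaitGroup
	)
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < perWorker; j++ {
				id, _ := m.NewID()
				mu.Lock()
				seen[id] = struct{}{}
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return len(seen)
}

func main() {
	fmt.Println(uniqueIDs(4, 100))
}
```

A counter like this is unique only within one process, which is why a sequence-based maker suits local mode rather than a deployment with several callers.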

type Invoker

type Invoker interface {
	// invoke the function "f" with parameters "param"; the returns
	// are represented as []interface{}
	Call(f interface{}, param []interface{}) ([]interface{}, error)

	// when marshal/unmarshal with json, some type information would be lost.
	// this function helps to convert those returns with correct type info provided
	// by function's reflection.
	//
	// parameters:
	// - f: the function
	// - returns: the array of returns
	// returns:
	// converted return array and error
	Return(f interface{}, returns []interface{}) ([]interface{}, error)
}

The responsibilities of Invoker(s) are:

  • invoking the function by converting []interface{} to []reflect.Value, and calling reflect.Call.
  • converting the return value of reflect.Call from []reflect.Value to []interface{}

Because most Marshaller(s) can't unmarshal byte stream to original type(s), Invokers are responsible for type-correction. The builtin Invoker(s) of "dingo" are also categorized by their ability to correct type(s).

Note: all implemented functions should be thread-safe.
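
The two responsibilities and the type-correction step can be sketched with the reflect package. This is a simplified illustration, not the code of any builtin Invoker: call and add are hypothetical names, variadic functions are rejected, and conversion relies on reflect's ConvertibleTo, which is more permissive than a production invoker would likely want (for example, it allows int to string).

```go
package main

import (
	"errors"
	"fmt"
	"reflect"
)

// call invokes f with params, converting each parameter to the type that
// f declares, and returns the results as []interface{}.
func call(f interface{}, params []interface{}) ([]interface{}, error) {
	fv := reflect.ValueOf(f)
	if fv.Kind() != reflect.Func {
		return nil, errors.New("f is not a function")
	}
	ft := fv.Type()
	if ft.IsVariadic() || ft.NumIn() != len(params) {
		return nil, errors.New("parameter count mismatch")
	}
	in := make([]reflect.Value, len(params))
	for i, p := range params {
		if p == nil {
			return nil, fmt.Errorf("parameter %d is nil", i)
		}
		v := reflect.ValueOf(p)
		want := ft.In(i)
		if !v.Type().ConvertibleTo(want) {
			return nil, fmt.Errorf("parameter %d: cannot convert %s to %s", i, v.Type(), want)
		}
		in[i] = v.Convert(want) // type correction
	}
	out := fv.Call(in)
	ret := make([]interface{}, len(out))
	for i, o := range out {
		ret[i] = o.Interface()
	}
	return ret, nil
}

// add is a sample worker function.
func add(a, b int) int { return a + b }

func main() {
	// encoding/json decodes numbers into float64 when the target is
	// interface{}, so the parameters arrive with the wrong type.
	ret, err := call(add, []interface{}{float64(2), float64(3)})
	fmt.Println(ret, err)
}
```

The function holds no shared state, so it is safe to call from multiple goroutines.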

type JSONSafeCodec

type JSONSafeCodec struct{}

func (*JSONSafeCodec) DecodeArgument

func (codec *JSONSafeCodec) DecodeArgument(fn interface{}, bs [][]byte) ([]interface{}, error)

func (*JSONSafeCodec) DecodeReturn

func (codec *JSONSafeCodec) DecodeReturn(fn interface{}, bs [][]byte) ([]interface{}, error)

func (*JSONSafeCodec) EncodeArgument

func (codec *JSONSafeCodec) EncodeArgument(fn interface{}, val []interface{}) ([][]byte, error)

func (*JSONSafeCodec) EncodeReturn

func (codec *JSONSafeCodec) EncodeReturn(fn interface{}, val []interface{}) ([][]byte, error)

func (*JSONSafeCodec) Prepare

func (codec *JSONSafeCodec) Prepare(name string, fn interface{}) (err error)

type JsonMarshaller

type JsonMarshaller struct{}

JsonMarshaller is a marshaller implemented via json encoding. Note: this Marshaller can only work with GenericInvoker.

func (*JsonMarshaller) DecodeReport

func (ms *JsonMarshaller) DecodeReport(h *Header, fn interface{}, b []byte) (report *Report, err error)

func (*JsonMarshaller) DecodeTask

func (ms *JsonMarshaller) DecodeTask(h *Header, fn interface{}, b []byte) (task *Task, err error)

func (*JsonMarshaller) EncodeReport

func (ms *JsonMarshaller) EncodeReport(fn interface{}, report *Report) (b []byte, err error)

func (*JsonMarshaller) EncodeTask

func (ms *JsonMarshaller) EncodeTask(fn interface{}, task *Task) (b []byte, err error)

func (*JsonMarshaller) Prepare

func (ms *JsonMarshaller) Prepare(string, interface{}) (err error)

type LazyInvoker

type LazyInvoker struct{}

A less generic Invoker that can handle pointers with different levels of indirection. This Invoker can only work with GobMarshaller and JsonSafeMarshaller.

func (*LazyInvoker) Call

func (vk *LazyInvoker) Call(f interface{}, param []interface{}) ([]interface{}, error)

func (*LazyInvoker) Return

func (vk *LazyInvoker) Return(f interface{}, returns []interface{}) ([]interface{}, error)

type Marshaller

type Marshaller interface {

	// you can perform any preprocessing for every worker function when registered.
	Prepare(name string, fn interface{}) (err error)

	// Encode a task.
	EncodeTask(fn interface{}, task *Task) (b []byte, err error)

	// Decode a task.
	DecodeTask(h *Header, fn interface{}, b []byte) (task *Task, err error)

	// Encode a report.
	EncodeReport(fn interface{}, report *Report) (b []byte, err error)

	// Decode a report.
	DecodeReport(h *Header, fn interface{}, b []byte) (report *Report, err error)
}

Marshaller(s) are the major component converting between []interface{} and []byte.

  • Note: all marshalled []byte should be prefixed with a Header.
  • Note: all implemented functions should be routine(thread)-safe.

type Meta

type Meta interface {
	ID() string
	Name() string
}

type NamedBroker

type NamedBroker interface {
	Producer
	NamedConsumer
}

NamedBroker interface is composed of Producer/NamedConsumer

type NamedConsumer

type NamedConsumer interface {
	// hook for listening event from dingo
	// parameter:
	// - eventID: which event?
	// - payload: corresponding payload, its type depends on 'eventID'
	// returns:
	// - err: errors
	ConsumerHook(eventID int, payload interface{}) (err error)

	// create a new consumer to receive tasks
	//
	// parameters:
	// - name: name of task to be received
	// - rcpt: a channel through which 'dingo' would send a 'TaskReceipt' for each task received from 'tasks'.
	// returns:
	// - tasks: 'dingo' would consume from this channel for new tasks
	// - err: any error during initialization
	AddListener(name string, rcpt <-chan *TaskReceipt) (tasks <-chan []byte, err error)

	// all listeners are stopped, their corresponding "tasks" channel(returned from AddListener)
	// would be closed.
	StopAllListeners() (err error)
}

NamedConsumer would consume tasks from broker(s). Different kinds of tasks should be sent to different queues, and consumed from different queues.

With this kind of Consumer(s), you can deploy different kinds of workers on machines, and each one of them handles different sets of worker functions.
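
The queue-per-task-name idea can be sketched in memory. This is a hypothetical illustration only: router, send, listen and route are invented names, the queue size is arbitrary, and no real broker or the NamedConsumer interface itself is involved.

```go
package main

import (
	"fmt"
	"sync"
)

// router keeps one in-memory queue per task name.
type router struct {
	mu     sync.Mutex
	queues map[string]chan []byte
}

func newRouter() *router {
	return &router{queues: make(map[string]chan []byte)}
}

// queue returns the queue for name, creating it on first use.
func (r *router) queue(name string) chan []byte {
	r.mu.Lock()
	defer r.mu.Unlock()
	q, ok := r.queues[name]
	if !ok {
		q = make(chan []byte, 16) // arbitrary buffer size for the sketch
		r.queues[name] = q
	}
	return q
}

// send enqueues a task body on the queue named after the task.
func (r *router) send(name string, body []byte) { r.queue(name) <- body }

// listen returns the receive side of the queue for one task name.
func (r *router) listen(name string) <-chan []byte { return r.queue(name) }

// route sends two kinds of tasks and consumes only the "add" queue.
func route() (string, int) {
	r := newRouter()
	r.send("add", []byte("2,3"))
	r.send("concat", []byte("a,b"))
	adds := r.listen("add")
	body := <-adds
	return string(body), len(adds)
}

func main() {
	body, left := route()
	fmt.Println(body, left)
}
```

A worker that listens only on "add" never sees the "concat" task, which is what allows machines to serve different sets of worker functions.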

type Object

type Object interface {
	// what dingo expects from this object
	Expect(types int) error

	// allow dingo to attach event channels used in this object
	Events() ([]<-chan *Event, error)

	// releasing resource
	Close() error
}

Object is an interface. When an object implements it:

  • dingo can have a trigger to release the resource allocated by this object.
  • dingo can aggregate events raised from this object, (those events can be subscribed by dingo.App.Listen)

All objects attached via dingo.App.Use should implement this interface.

type Option

type Option struct {
	// IgnoreReport: stop reporting when executing tasks.
	IR bool
	// MonitorProgress: monitoring the progress of task execution
	MP bool
}

func DefaultOption

func DefaultOption() *Option

func (*Option) GetIgnoreReport

func (opt *Option) GetIgnoreReport() bool

func (*Option) GetMonitorProgress

func (opt *Option) GetMonitorProgress() bool

func (*Option) IgnoreReport

func (opt *Option) IgnoreReport(ignore bool) *Option

func (*Option) MonitorProgress

func (opt *Option) MonitorProgress(only bool) *Option

type Producer

type Producer interface {
	// hook for listening event from dingo
	// parameter:
	// - eventID: which event?
	// - payload: corresponding payload, its type depends on 'eventID'
	// returns:
	// - err: errors
	ProducerHook(eventID int, payload interface{}) (err error)

	// send a task to brokers, it should be a blocking call.
	//
	// parameters:
	// - meta: the meta info of this task to be sent.
	// - b: the byte stream of this task.
	Send(meta Meta, b []byte) error
}

Producer is responsible for sending tasks to broker(s).

type Report

type Report struct {
	H *Header
	P *ReportPayload
}

Report is a report of task execution.

When Report.Done() is true, no more reports would be sent, and either Report.OK() or Report.Fail() would be true.

If Report.OK() is true, the task execution succeeded, and you can grab the return values from Report.Return(). Report.Return() would give you an []interface{}, and the type of every element in that slice would be type-corrected according to the worker function.

If Report.Fail() is true, the task execution failed, and you can grab the reason from Report.Error().

func (*Report) Done

func (r *Report) Done() bool

Done means the task is finished, and no more reports would be sent. Either Error() or Return() would have values to check.

func (*Report) Error

func (r *Report) Error() *Error

Error refers to possible error raised during execution, which is packed into transportable form.

func (*Report) Fail

func (r *Report) Fail() bool

Fail means the task is done and failed, Error() would have values to check.

func (*Report) ID

func (r *Report) ID() string

ID refers to the identifier of this report; all reports belonging to one task share the same ID.

func (*Report) Name

func (r *Report) Name() string

Name refers to the name of this task that the report belongs to.

func (*Report) OK

func (r *Report) OK() bool

OK means the task is done and succeeded, Return() would have values to check.

func (*Report) Option

func (r *Report) Option() *Option

Option refers to the execution options of the report, inherited from the task that it belongs to.

func (*Report) Return

func (r *Report) Return() []interface{}

Return refers to the return values of worker function.

func (*Report) Status

func (r *Report) Status() int16

Status refers to current execution status of this task

type ReportEnvelope

type ReportEnvelope struct {
	ID   Meta
	Body []byte
}

ReportEnvelope is the standard package sent through the channel to Reporter. Its main purpose is to let Reporter know the meta info of the byte stream to send.

type ReportPayload

type ReportPayload struct {
	S int16
	E *Error
	O *Option
	R []interface{}
}

type Reporter

type Reporter interface {
	// hook for listening events from dingo
	// parameter:
	// - eventID: which event?
	// - payload: corresponding payload, its type depends on 'eventID'
	// returns:
	// - err: errors
	ReporterHook(eventID int, payload interface{}) (err error)

	// attach a report channel to backend. what dingo can promise is:
	// - all reports belonging to the same task(name, id) would be sent through the same channel
	//
	// parameters:
	// - name: all reports sent through this channel would have this name
	// - reports: an input channel to receive reports from dingo.
	// returns:
	// - err: errors
	Report(name string, reports <-chan *ReportEnvelope) (id int, err error)
}

Reporter is responsible for sending reports to backend(s). The interaction between Reporter(s) and dingo are asynchronous by channels.

type Result

type Result struct {
	Last *Report
	// contains filtered or unexported fields
}

Result is a wrapper of chan *dingo.Report returned from dingo.App.Call, taking care of the logic to handle asynchronous result from 'dingo'.

Example usage:

r := dingo.NewResult(app.Call(...))

// blocking until done
err := r.Wait(0)
if err == nil {
  r.Last // the last Report
}

// polling for every 1 second
for dingo.ResultError.Timeout == r.Wait(1*time.Second) {
  // logging or ...
}

When the task is done, you could register a handler function, whose fingerprint is identical to the return part of worker functions. For example, if the worker function is:

func ComposeWords(words []string) (count int, composed string)

Its corresponding 'OnOK' handler is:

func (count int, composed string) {...}

When anything goes wrong, you could register a handler function via 'OnNOK', whose fingerprint is

func (*Error, error)

Both failure reports and errors generated in the 'Result' object would be passed to this handler; at least one of them would not be nil.

You can register handlers before calling 'Wait', or call 'Wait' before registering handlers. The ordering doesn't matter. Those handlers would be called exactly once.

func NewResult

func NewResult(reports <-chan *Report, err error) (r *Result)

NewResult is a factory function that simply wraps the return values of a call to dingo.App.Call:

NewResult(app.Call("test", ...))

func (*Result) NOK

func (rt *Result) NOK() bool

NOK is used to check whether the status is NOK. Note: !NOK != OK

func (*Result) OK

func (rt *Result) OK() bool

OK is used to check the status is OK or not

func (*Result) OnNOK

func (rt *Result) OnNOK(efn func(*Error, error))

OnNOK is used to set the handler for the failure case.

func (*Result) OnOK

func (rt *Result) OnOK(fn interface{})

OnOK is used to set the handler for the successful case.

func (*Result) SetInvoker

func (rt *Result) SetInvoker(ivok Invoker)

SetInvoker could assign Invoker for Result.OnOK

func (*Result) Then

func (rt *Result) Then() (err error)

Then is the asynchronous version of 'Result.Wait'

func (*Result) Wait

func (rt *Result) Wait(timeout time.Duration) (err error)

Wait is used to wait forever or for a period of time. Here is the meaning of return:

  • timeout: wait again later
  • other errors: something wrong.
  • nil: done, you can access the result via 'Last' member.

When anything other than 'timeout' is returned, the result of subsequent 'Wait' would remain the same.

Registered callback would be triggered when possible.
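
The wait-forever-or-timeout behaviour described above can be sketched with a select over a report channel and a timer. Everything here is a hypothetical stand-in: report, wait, errTimeout and errClosed are invented for illustration and do not reproduce the internals of Result.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// report is a hypothetical stand-in for a task report.
type report struct {
	done bool
	ret  string
}

var (
	errTimeout = errors.New("timeout")
	errClosed  = errors.New("report channel closed before completion")
)

// wait blocks until a final report arrives or the timeout elapses.
// A timeout of 0 means waiting forever.
func wait(reports <-chan report, timeout time.Duration) (*report, error) {
	var expired <-chan time.Time // a nil channel never fires
	if timeout > 0 {
		timer := time.NewTimer(timeout)
		defer timer.Stop()
		expired = timer.C
	}
	for {
		select {
		case r, ok := <-reports:
			if !ok {
				return nil, errClosed
			}
			if r.done {
				return &r, nil
			}
			// intermediate progress report: keep waiting
		case <-expired:
			return nil, errTimeout
		}
	}
}

// demo feeds one progress report and one final report, then waits.
func demo() (string, error) {
	reports := make(chan report, 2)
	reports <- report{}
	reports <- report{done: true, ret: "ok"}
	r, err := wait(reports, time.Second)
	if err != nil {
		return "", err
	}
	return r.ret, nil
}

// timesOut waits briefly on a channel that never delivers a report.
func timesOut() bool {
	_, err := wait(make(chan report), 10*time.Millisecond)
	return errors.Is(err, errTimeout)
}

func main() {
	fmt.Println(demo())
	fmt.Println(timesOut())
}
```

A caller that gets the timeout error can simply call wait again later, which mirrors the polling loop shown in the Result documentation.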

type Routines

type Routines struct {
	// contains filtered or unexported fields
}

func NewRoutines

func NewRoutines() *Routines

func (*Routines) Close

func (rs *Routines) Close()

Close is used to stop/release all allocated routines, this function should be safe from multiple calls.
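
A common way to make such a Close safe against repeated calls is sync.Once around closing the quit channel. The sketch below assumes that pattern for illustration; pool, spawn and closeTwice are hypothetical names, and the actual implementation of Routines may differ.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// pool is a hypothetical stand-in: a quit channel shared by all goroutines,
// plus a WaitGroup to wait for them.
type pool struct {
	quit    chan int
	wg      sync.WaitGroup
	once    sync.Once
	stopped int32
}

func newPool() *pool { return &pool{quit: make(chan int)} }

// spawn starts a goroutine that runs until the quit channel is closed.
func (p *pool) spawn() {
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		<-p.quit
		atomic.AddInt32(&p.stopped, 1)
	}()
}

// Close signals every goroutine to stop and waits for them to finish.
// sync.Once guards close(p.quit), so repeated calls do not panic.
func (p *pool) Close() {
	p.once.Do(func() { close(p.quit) })
	p.wg.Wait()
}

// closeTwice starts n goroutines, closes the pool twice, and reports how
// many goroutines observed the quit signal.
func closeTwice(n int) int {
	p := newPool()
	for i := 0; i < n; i++ {
		p.spawn()
	}
	p.Close()
	p.Close()
	return int(atomic.LoadInt32(&p.stopped))
}

func main() {
	fmt.Println(closeTwice(3))
}
```

Closing a channel wakes every receiver at once, so a single quit channel is enough no matter how many goroutines were spawned.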

func (*Routines) Events

func (rs *Routines) Events() chan *Event

func (*Routines) New

func (rs *Routines) New() <-chan int

func (*Routines) Wait

func (rs *Routines) Wait() *sync.WaitGroup

type SeqIDMaker

type SeqIDMaker struct {
	// contains filtered or unexported fields
}

SeqIDMaker is an implementation of IDMaker suited for local mode. A sequence of numbers would be generated when called. Usage:

err := app.AddIDMaker(101, &dingo.SeqIDMaker{})

func (*SeqIDMaker) NewID

func (seq *SeqIDMaker) NewID() (string, error)

type Store

type Store interface {
	// hook for listening events from dingo
	// parameter:
	// - eventID: which event?
	// - payload: corresponding payload, its type depends on 'eventID'
	// returns:
	// - err: errors
	StoreHook(eventID int, payload interface{}) (err error)

	// polling reports for tasks
	//
	// parameters:
	// - meta: the meta info of that task to be polled.
	// returns:
	// - reports: the output channel for dingo to receive reports.
	Poll(meta Meta) (reports <-chan []byte, err error)

	// Stop monitoring that task
	//
	// parameters:
	// - meta: the meta info of that task/report to stop polling.
	Done(meta Meta) error
}

Store is responsible for receiving reports from backend(s)

type Task

type Task struct {
	H *Header
	P *TaskPayload
}

Task is the struct that records all information required for task execution, including:

  • name(type) of task
  • identifier of this task, which should be unique among all tasks of the same name.
  • arguments to be passed into worker function
  • execution option

You don't have to know what it is unless you try to implement:

  • dingo.Marshaller
  • dingo.Invoker

You don't have to create it by yourself, every time you call dingo.App.Call, one is generated automatically.

func (*Task) Args

func (t *Task) Args() []interface{}

func (*Task) ID

func (t *Task) ID() string

func (*Task) Name

func (t *Task) Name() string

func (*Task) Option

func (t *Task) Option() *Option

type TaskPayload

type TaskPayload struct {
	O *Option
	A []interface{}
}

type TaskReceipt

type TaskReceipt struct {
	ID      string
	Status  int
	Payload interface{}
}

TaskReceipt is the receipt that allows "dingo" to reject tasks for any reason; the way rejected tasks are handled is Broker(s) dependent.
