Documentation ¶
Overview ¶
Package dingo is a task/job <=> worker framework for #golang.
Goal
This library tries to make invoking and monitoring tasks as easy as possible.
- any function can be a worker function, as long as the types of its parameters are supported.
- return values of worker functions are also accessible.
- it can be used locally as a queue for background jobs, or remotely as a distributed task queue when connected with AMQP or Redis.
Design
The design is inspired by:
- https://github.com/RichardKnop/machinery
- http://www.celeryproject.org/
A short version of "how a task is invoked" in this library is:
-------- caller ---------
- the user's input arguments are treated as []interface{}
- marshal []interface{} to []byte
- send []byte to the broker
- poll return values from the store
-------- worker ---------
- consume []byte from the broker
- unmarshal []byte to []interface{} (underlying types might be different)
- try to apply type-correction on []interface{}
- invoke the worker function
- convert its return values to []interface{}
- marshal []interface{} to []byte
- send []byte to the store
-------- caller ---------
- the byte stream of return values is ready after polling
- unmarshal []byte to []interface{}
- try to apply type-correction on []interface{}
- return []interface{} to users.
This library relies heavily on reflection to provide flexibility; therefore, it may run more slowly than libraries that do not use reflection. To overcome this, users can provide customized marshaller(s) and invoker(s) that avoid reflection. These customizations are task specific, so users may choose the default marshaller/invoker for most tasks, and provide a customized marshaller/invoker for those tasks that are performance-critical.
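The "underlying types might be different" and "type-correction" steps can be illustrated with the standard library alone. The sketch below is not dingo's implementation: roundTrip, correct and invoke are hypothetical helpers, and JSON is assumed as the wire format. It shows an int argument coming back as float64 after a round trip, then being converted to the parameter type the worker function expects.

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
)

// roundTrip marshals arguments to bytes and back, as a hop through a broker would.
func roundTrip(args []interface{}) ([]interface{}, error) {
	b, err := json.Marshal(args)
	if err != nil {
		return nil, err
	}
	var out []interface{}
	if err := json.Unmarshal(b, &out); err != nil {
		return nil, err
	}
	return out, nil
}

// correct converts each decoded value to the parameter type fn expects.
// Only directly convertible types (e.g. float64 -> int) are handled here.
func correct(fn interface{}, vals []interface{}) ([]reflect.Value, error) {
	t := reflect.TypeOf(fn)
	if t == nil || t.Kind() != reflect.Func || t.NumIn() != len(vals) {
		return nil, fmt.Errorf("fn must be a function taking %d arguments", len(vals))
	}
	in := make([]reflect.Value, len(vals))
	for i, v := range vals {
		rv := reflect.ValueOf(v)
		want := t.In(i)
		if !rv.IsValid() || !rv.Type().ConvertibleTo(want) {
			return nil, fmt.Errorf("argument %d: cannot convert to %v", i, want)
		}
		in[i] = rv.Convert(want)
	}
	return in, nil
}

// invoke sends args through the round trip, corrects their types and calls fn.
func invoke(fn interface{}, args ...interface{}) ([]interface{}, error) {
	decoded, err := roundTrip(args)
	if err != nil {
		return nil, err
	}
	in, err := correct(fn, decoded)
	if err != nil {
		return nil, err
	}
	out := reflect.ValueOf(fn).Call(in)
	rets := make([]interface{}, len(out))
	for i, o := range out {
		rets[i] = o.Interface()
	}
	return rets, nil
}

func main() {
	add := func(a, b int) int { return a + b }
	rets, err := invoke(add, 2, 3) // 2 and 3 arrive as float64 after decoding
	fmt.Println(rets[0], err)      // 5 <nil>
}
```

Every call in this sketch pays for reflection, which is the cost the paragraph above refers to when it suggests task-specific marshallers and invokers.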
Customization
These concepts are abstracted for extensibility and customization; please refer to the corresponding reference for details:
- Generation of ID for new tasks: dingo.IDMaker
- Parameter Marshalling: dingo.Marshaller
- Worker Function Invoking: dingo.Invoker
- Task Publishing/Consuming: dingo.Producer/dingo.Consumer/dingo.NamedConsumer
- Report Publishing/Consuming: dingo.Reporter/dingo.Store
Parameter Types
Many parameter types are supported by this library, except:
- interface: there is no way to know the underlying type of an interface.
- chan: not supported yet.
- private fields in structs: they are ignored by most encoders. To support them, you need to provide a customized marshaller and invoker that can recognize those private fields.
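The private-field limitation is easy to reproduce with encoding/json alone. In this minimal sketch the job type and encodeDecode helper are made up for illustration and are not part of dingo; the unexported field is lost after one encode/decode round trip.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// job is a hypothetical task argument.
type job struct {
	Name  string // exported: encoded
	retry int    // unexported: skipped by encoding/json
}

// encodeDecode mimics sending a value through a marshaller and back.
func encodeDecode(in job) (job, error) {
	b, err := json.Marshal(in)
	if err != nil {
		return job{}, err
	}
	var out job
	err = json.Unmarshal(b, &out)
	return out, err
}

func main() {
	out, err := encodeDecode(job{Name: "murmur", retry: 3})
	fmt.Printf("%+v %v\n", out, err) // {Name:murmur retry:0} <nil>
}
```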
Troubleshooting
It's relatively hard to debug a multi-routine library. To find out what went wrong inside, users can subscribe to failure events (see App.Listen).
Index ¶
- Variables
- func ComposeBytes(h *Header, bs [][]byte) (b []byte, err error)
- func DecomposeBytes(h *Header, b []byte) (bs [][]byte, err error)
- func NewLocalBackend(cfg *Config, to chan *ReportEnvelope) (v *localBackend, err error)
- func NewLocalBroker(cfg *Config, to chan []byte) (v *localBroker, err error)
- type App
- func (dg *App) AddIDMaker(expectedID int, m IDMaker) error
- func (dg *App) AddMarshaller(expectedID int, m Marshaller) error
- func (dg *App) Allocate(name string, count, share int) (remain int, err error)
- func (dg *App) Call(name string, opt *Option, args ...interface{}) (reports <-chan *Report, err error)
- func (dg *App) Close() (err error)
- func (dg *App) Listen(targets, level, expectedID int) (id int, events <-chan *Event, err error)
- func (dg *App) Register(name string, fn interface{}) (err error)
- func (dg *App) SetIDMaker(name string, id int) error
- func (dg *App) SetMarshaller(name string, taskMash, reportMash int) error
- func (dg *App) SetOption(name string, opt *Option) error
- func (dg *App) StopListen(id int) (err error)
- func (dg *App) Use(obj Object, types int) (id int, used int, err error)
- type Backend
- type BackendTestSuite
- func (ts *BackendTestSuite) SetupSuite()
- func (ts *BackendTestSuite) SetupTest()
- func (ts *BackendTestSuite) TearDownSuite()
- func (ts *BackendTestSuite) TearDownTest()
- func (ts *BackendTestSuite) TestBasic()
- func (ts *BackendTestSuite) TestExpect()
- func (ts *BackendTestSuite) TestOrder()
- func (ts *BackendTestSuite) TestSameID()
- type Broker
- type BrokerTestSuite
- func (ts *BrokerTestSuite) SetupSuite()
- func (ts *BrokerTestSuite) SetupTest()
- func (ts *BrokerTestSuite) TearDownSuite()
- func (ts *BrokerTestSuite) TearDownTest()
- func (ts *BrokerTestSuite) TestBasic()
- func (ts *BrokerTestSuite) TestDuplicated()
- func (ts *BrokerTestSuite) TestExpect()
- func (ts *BrokerTestSuite) TestNamed()
- type Config
- type Consumer
- type CustomMarshaller
- func (ms *CustomMarshaller) DecodeReport(h *Header, fn interface{}, b []byte) (report *Report, err error)
- func (ms *CustomMarshaller) DecodeTask(h *Header, fn interface{}, b []byte) (task *Task, err error)
- func (ms *CustomMarshaller) EncodeReport(fn interface{}, report *Report) (b []byte, err error)
- func (ms *CustomMarshaller) EncodeTask(fn interface{}, task *Task) (b []byte, err error)
- func (ms *CustomMarshaller) Prepare(name string, fn interface{}) (err error)
- type CustomMarshallerCodec
- type Error
- type Event
- type GenericInvoker
- type GobMarshaller
- func (ms *GobMarshaller) DecodeReport(h *Header, fn interface{}, b []byte) (report *Report, err error)
- func (ms *GobMarshaller) DecodeTask(h *Header, fn interface{}, b []byte) (task *Task, err error)
- func (ms *GobMarshaller) EncodeReport(fn interface{}, report *Report) (b []byte, err error)
- func (ms *GobMarshaller) EncodeTask(fn interface{}, task *Task) (b []byte, err error)
- func (ms *GobMarshaller) Prepare(name string, fn interface{}) (err error)
- type Header
- type HetroRoutines
- type IDMaker
- type Invoker
- type JSONSafeCodec
- func (codec *JSONSafeCodec) DecodeArgument(fn interface{}, bs [][]byte) ([]interface{}, error)
- func (codec *JSONSafeCodec) DecodeReturn(fn interface{}, bs [][]byte) ([]interface{}, error)
- func (codec *JSONSafeCodec) EncodeArgument(fn interface{}, val []interface{}) ([][]byte, error)
- func (codec *JSONSafeCodec) EncodeReturn(fn interface{}, val []interface{}) ([][]byte, error)
- func (codec *JSONSafeCodec) Prepare(name string, fn interface{}) (err error)
- type JsonMarshaller
- func (ms *JsonMarshaller) DecodeReport(h *Header, fn interface{}, b []byte) (report *Report, err error)
- func (ms *JsonMarshaller) DecodeTask(h *Header, fn interface{}, b []byte) (task *Task, err error)
- func (ms *JsonMarshaller) EncodeReport(fn interface{}, report *Report) (b []byte, err error)
- func (ms *JsonMarshaller) EncodeTask(fn interface{}, task *Task) (b []byte, err error)
- func (ms *JsonMarshaller) Prepare(string, interface{}) (err error)
- type LazyInvoker
- type Marshaller
- type Meta
- type NamedBroker
- type NamedConsumer
- type Object
- type Option
- type Producer
- type Report
- type ReportEnvelope
- type ReportPayload
- type Reporter
- type Result
- type Routines
- type SeqIDMaker
- type Store
- type Task
- type TaskPayload
- type TaskReceipt
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ConsumerEvent = struct {
}{}
ConsumerEvent, IDs of events that might be sent to ConsumerHook
var Encode = struct {
	// Default marshalling mode
	Default int
	// JSON marshalling mode
	JSON int
	// Gob marshalling mode
	GOB int
	// JSON-Safe marshalling mode
	JSONSAFE int
}{
	0, 1, 2, 3,
}
var ErrCode = struct {
	// the worker function panic
	Panic int32
	// dingo.App shutdown
	Shutdown int32
}{
	1, 2,
}
Error code used in dingo.Error
var EventCode = struct {
	Generic             int
	TaskDeliveryFailure int
	DuplicatedPolling   int
}{
	0, 1, 2,
}
var EventLvl = struct {
	Debug   int
	Info    int
	Warning int
	Error   int
}{
	0, 1, 2, 3,
}
var ID = struct {
	// default ID maker
	Default int
	// an ID maker implemented via uuid4
	UUID int
	// an ID maker implemented by atomic.AddInt64
	SEQ int
}{
	0, 1, 2,
}
var ObjT = struct {
	/*
	 when this type used in dingo.App.Use, it means let
	 dingo decide which type would be registered to dingo.App.
	*/
	Default int
	// this object provides dingo.Reporter interface
	Reporter int
	// this object provides dingo.Store interface
	Store int
	// this object provides dingo.Producer interface
	Producer int
	// this object provides dingo.Consumer/dingo.NamedConsumer interface
	Consumer int
	// this is a dingo.mapper object
	Mapper int
	// this is a dingo.worker object
	Worker int
	// this object provides dingo.bridge interface
	Bridge int
	// this object provides dingo.NamedConsumer interface
	NamedConsumer int
	// from chained routines
	ChainRoutine int
	/*
	 all object types, when used in dingo.App.Listen, it means
	 listen to events from all possible origins.
	*/
	All int
}{
	0,
	(1 << 0),
	(1 << 1),
	(1 << 2),
	(1 << 3),
	(1 << 4),
	(1 << 5),
	(1 << 6),
	(1 << 7),
	(1 << 8),
	int(^uint(0) >> 1),
}
ObjT lists the types of objects; they are bit flags and can be combined. These flags are used in:
- dingo.App.Use
- dingo.App.Listen
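As a small illustration of how such bit flags combine and are tested, the sketch below uses local stand-in constants that mirror the first few ObjT values listed above. The constants and the has helper are illustrative only, not part of dingo.

```go
package main

import "fmt"

// Local stand-ins mirroring ObjT.Reporter, Store, Producer and Consumer.
const (
	reporter = 1 << iota // 1
	store                // 2
	producer             // 4
	consumer             // 8
)

// has reports whether every bit of flag is set in types.
func has(types, flag int) bool {
	return types&flag == flag
}

func main() {
	types := producer | store // combine two flags with bitwise OR
	fmt.Println(types, has(types, producer), has(types, consumer)) // 6 true false
}
```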
var ProducerEvent = struct {
	// a new kind of task is declared.
	DeclareTask int
}{
	1,
}
ProducerEvent, event IDs that might be passed to dingo.Producer.ProducerHook
var ReceiptStatus = struct {
	// this task is received successfully.
	OK int
	// something went wrong
	NOK int
	// dingo can't find workers for this task
	WorkerNotFound int
}{
	1, 2, 3,
}
ReceiptStatus allows broker implementers to know whether they have to reject the received packet or not.
var ReporterEvent = struct {
	// a sequence of reports from this task is about to fire.
	BeforeReport int
}{
	1,
}
ReporterEvent lists the IDs of events that might be sent to ReporterHook
var ResultError = struct {
	// the report channel returned from 'dingo' is nil
	NoChannel error
	// timeout
	Timeout error
	// the report channel is closed
	ChannelClosed error
	// there is no handler registered, shouldn't call .Then()
	NoHandler error
}{
	errors.New("channel is nil"),
	errors.New("time out"),
	errors.New("channel closed"),
	errors.New("no handler registered"),
}
var Status = struct {
	None int16
	// the task is sent to the consumer.
	Sent int16
	// the task is sent to workers.
	Progress int16
	// the task is done
	Success int16
	// the task execution failed.
	Fail int16
	// this field should always be the last one
	Count int16
}{
	0, 1, 2, 3, 4, 5,
}
var StoreEvent = struct {
}{}
StoreEvent are those IDs of events that might be sent to StoreHook
Functions ¶
func ComposeBytes ¶
ComposeBytes composes a slice of byte slices into one byte stream, along with the header section.
func DecomposeBytes ¶
DecomposeBytes can be used to decompose byte streams composed by "ComposeBytes" into [][]byte
func NewLocalBackend ¶
func NewLocalBackend(cfg *Config, to chan *ReportEnvelope) (v *localBackend, err error)
NewLocalBackend allocates a Backend implementation based on 'channel'. Users can provide a channel and share it between multiple Reporter(s) and Store(s) to connect them.
func NewLocalBroker ¶
NewLocalBroker would allocate a Broker implementation based on 'channel'. Users can provide a channel and share it between multiple Producer(s) and Consumer(s) to connect them.
This one only implements Consumer interface, not the NamedConsumer one. So the dispatching of tasks relies on dingo.mapper
Types ¶
type App ¶
type App struct {
// contains filtered or unexported fields
}
App is the core component of dingo.
Example (Local) ¶
package main

import (
	"fmt"
	"time"

	"github.com/mission-liao/dingo"
)

func main() {
	// this example demonstrates a job queue that runs in the background
	var err error
	defer func() {
		if err != nil {
			fmt.Printf("%v\n", err)
		}
	}()

	// an App in local mode
	app, err := dingo.NewApp("local", nil)
	if err != nil {
		return
	}

	// register a worker function murmur
	err = app.Register("murmur", func(msg string, repeat int, interval time.Duration) {
		for ; repeat > 0; repeat-- {
			select {
			case <-time.After(interval):
				fmt.Printf("%v\n", msg)
			}
		}
	})
	if err != nil {
		return
	}

	// allocate 5 workers, sharing 1 report channel
	_, err = app.Allocate("murmur", 5, 1)
	if err != nil {
		return
	}

	results := []*dingo.Result{}
	// invoke 10 tasks
	for i := 0; i < 10; i++ {
		results = append(
			results,
			dingo.NewResult(
				// name, option, parameter#1, parameter#2, parameter#3...
				app.Call("murmur", nil, fmt.Sprintf("this is %d speaking", i), 10, 100*time.Millisecond),
			))
	}

	// wait until those tasks are done
	for _, v := range results {
		err = v.Wait(0)
		if err != nil {
			return
		}
	}

	// release resource
	err = app.Close()
	if err != nil {
		return
	}
}
Output:
func NewApp ¶
NewApp creates an App; "mode" selects how dingo operates:
- "local": the App works in local mode, which is similar to other background worker frameworks.
- "remote": the App works in remote (distributed) mode; brokers (e.g. AMQP) and backends (e.g. Redis, if required) are needed for it to work.
func (*App) AddIDMaker ¶
AddIDMaker registers a customized IDMaker; the input should be an object that implements IDMaker.
You can register different ID makers for different tasks. Internally, dingo takes (name, id) together as the identity of a task.
The requirement of IDMaker:
- uniqueness of generated string among all generated tasks.
- routine(thread) safe.
The default IDMaker used by dingo is implemented by uuid4.
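The two requirements above (uniqueness and routine safety) can be met with an atomic counter, which is how the SEQ ID maker is described in the ID variable. The sketch below is a stand-alone illustration: the seqID type and its NewID method name are assumptions, not dingo's actual IDMaker interface, and uniqueCount is only a helper to check the result.

```go
package main

import (
	"fmt"
	"strconv"
	"sync"
	"sync/atomic"
)

// seqID is a hypothetical ID maker backed by an atomic counter.
type seqID struct{ n int64 }

// NewID returns the next ID; atomic.AddInt64 makes it safe across goroutines.
func (s *seqID) NewID() string {
	return strconv.FormatInt(atomic.AddInt64(&s.n, 1), 10)
}

// uniqueCount generates IDs from several goroutines and counts distinct ones.
func uniqueCount(m *seqID, routines, perRoutine int) int {
	var (
		mu   sync.Mutex
		wg   sync.WaitGroup
		seen = map[string]struct{}{}
	)
	for i := 0; i < routines; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < perRoutine; j++ {
				id := m.NewID()
				mu.Lock()
				seen[id] = struct{}{}
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return len(seen)
}

func main() {
	fmt.Println(uniqueCount(&seqID{}, 8, 1000)) // 8000
}
```

A counter like this is only unique within one process; callers spread over several machines would need something like the uuid4-based default.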
func (*App) AddMarshaller ¶
func (dg *App) AddMarshaller(expectedID int, m Marshaller) error
AddMarshaller registers a customized Marshaller; the input should be an object that implements both Marshaller and Invoker.
You can pick any builtin Invoker(s)/Marshaller(s) combined with your customized one:
app.AddMarshaller(3, &struct{JsonSafeMarshaller, __your_customized_invoker__})
"expectedID" is the expected identifier of this Marshaller, which could be useful when you need to sync the Marshaller-ID between producers and consumers. 0~3 are occupied by builtin Marshaller(s). Suggested "expectedID" should begin from 100.
func (*App) Allocate ¶
Allocate would allocate more workers. When your Consumer(s) implement NamedConsumer, a new listener (to brokers) would be allocated each time you call this function. All allocated workers would serve that listener.
If you want to open more channels to consume from brokers, just call this function multiple times.
parameters:
- name: the name of tasks.
- count: count of workers to be initialized.
- share: the count of workers sharing one report channel.
returns:
- remain: remaining count of workers that failed to initialize.
- err: any error produced
func (*App) Call ¶
func (dg *App) Call(name string, opt *Option, args ...interface{}) (reports <-chan *Report, err error)
Call would initiate a task by providing:
- "name" of tasks
- execution-"option" of tasks, could be nil
- argument of corresponding worker function.
A reporting channel is returned so that callers can monitor the status of tasks and access their results. A suggested procedure to monitor reporting channels is:
	finished:
		for {
			select {
			case r, ok := <-reports:
				if !ok {
					// dingo.App is closed somewhere else
					break finished
				}
				if r.OK() {
					// the result is ready
					returns := r.Returns()
				}
				if r.Fail() {
					// get error
					err := r.Error()
				}
				if r.Done() {
					break finished
				}
			}
		}
Multiple reports would be sent for each task:
- Sent: the task is already sent to brokers.
- Progress: the consumer received this task, and is about to execute it.
- Success: this task finished without error.
- Fail: this task failed for some reason.
Note: 'Fail' here doesn't mean your worker function failed; it means "dingo" could not execute your worker function properly.
func (*App) Close ¶
Close is used to release this instance. All reporting channels are closed after returning. However, those sent tasks/reports wouldn't be reclaimed.
func (*App) Listen ¶
Listen would subscribe the channel to receive events from 'dingo'.
"targets" are instances you want to monitor, they include:
- dingo.ObjT.Reporter: the Reporter instance attached to this App.
- dingo.ObjT.Store: the Store instance attached to this App.
- dingo.ObjT.Producer: the Producer instance attached to this App.
- dingo.ObjT.Consumer: the Consumer/NamedConsumer instance attached to this App.
- dingo.ObjT.Mapper: an internal component, turn it on when debugging.
- dingo.ObjT.Worker: an internal component, turn it on when debugging.
- dingo.ObjT.Bridge: an internal component, turn it on when debugging.
- dingo.ObjT.All: every instance.
They are bit flags and can be combined as "targets", like:
ObjT.Bridge | ObjT.Worker | ...
"level" is the minimal severity level expected; possible values include:
- dingo.EventLvl.Debug
- dingo.EventLvl.Info
- dingo.EventLvl.Warning
- dingo.EventLvl.Error
"id" is the identity of this event channel, which could be used to stop monitoring by calling dingo.App.StopListen.
In general, a dedicated go routine would be initiated for this channel, with an infinite for loop, like this:
	for {
		select {
		case e, ok := <-events:
			if !ok {
				// after App.Close(), all reporting channels would be closed,
				// except those channels abandoned by App.StopListen.
				return
			}
			fmt.Printf("%v\n", e)
		case <-quit:
			return
		}
	}
func (*App) Register ¶
Register would register a worker function
parameters:
- name: name of tasks
- fn: the function that actually performs the task.
returns:
- err: any error produced
func (*App) SetIDMaker ¶
SetIDMaker would set IDMaker used for a specific kind of tasks
parameters:
- name: name of tasks
- id: id of the IDMaker you would like to use when generating tasks.
func (*App) SetMarshaller ¶
SetMarshaller would set marshallers used for marshalling tasks and reports
parameters:
- name: name of tasks
- taskMash, reportMash: id of Marshaller for 'Task' and 'Report'
func (*App) StopListen ¶
StopListen would stop listening to events.
Note: channels stopped by App.StopListen are not closed; they are only reclaimed by GC.
func (*App) Use ¶
Use is used to attach an instance, instance could be any instance implementing Reporter, Backend, Producer, Consumer.
parameters:
- obj: object to be attached
- types: interfaces contained in 'obj', refer to dingo.ObjT
returns:
- id: identifier assigned to this object, 0 is invalid value
- err: errors
For a producer, the right combination of "types" is ObjT.Producer|ObjT.Store; if reporting is not required, then only ObjT.Producer is used.
For a consumer, the right combination of "types" is ObjT.Consumer|ObjT.Reporter; if reporting is not required (make sure no producer is waiting for reports), then only ObjT.Consumer is used.
Example (Caller) ¶
/*
import (
	"fmt"
	"time"

	"github.com/mission-liao/dingo"
	"github.com/mission-liao/dingo/amqp"
)
*/
// this example demonstrates a caller based on AMQP, used along with ExampleApp_Use_worker
// make sure you install a rabbitmq server locally.
var err error
defer func() {
	if err != nil {
		fmt.Printf("%v\n", err)
	}
}()

// an App in remote mode
app, err := dingo.NewApp("remote", nil)
if err != nil {
	return
}

// attach an AMQP producer to publish your tasks
broker, err := dgamqp.NewBroker(dgamqp.DefaultAmqpConfig())
if err != nil {
	return
}
_, _, err = app.Use(broker, dingo.ObjT.Producer)
if err != nil {
	return
}

// attach an AMQP store to receive reports from datastores.
backend, err := dgamqp.NewBackend(dgamqp.DefaultAmqpConfig())
if err != nil {
	return
}
_, _, err = app.Use(backend, dingo.ObjT.Store)
if err != nil {
	return
}

// register a worker function that murmurs
err = app.Register("murmur", func(speech *struct {
	Prologue string
	Script   []string
}, interval time.Duration) (countOfSentence int) {
	// speak the prologue
	fmt.Printf("%v:\n", speech.Prologue)
	countOfSentence++

	// speak the script
	for _, v := range speech.Script {
		<-time.After(interval)
		fmt.Printf("%v\n", v)
		countOfSentence++
	}

	// return the total number of sentences we spoke
	return
})
if err != nil {
	return
}

// compose a script to talk
script := &struct {
	Prologue string
	Script   []string
}{
	Script: []string{
		"Today, I'm announcing this library.",
		"It should be easy to use, ",
		"and fun to play with.",
		"Merry X'mas.",
	},
}

// invoke 20 tasks
results := []*dingo.Result{}
for i := 0; i < 20; i++ {
	script.Prologue = fmt.Sprintf("this is %d speaking", i)
	results = append(results,
		dingo.NewResult(
			// name, option, parameter#1, parameter#2 ...
			app.Call("murmur", nil, script, 100*time.Millisecond),
		))
}

// wait until those tasks are done
for _, v := range results {
	err = v.Wait(0)
	if err != nil {
		return
	}

	// result is accessible
	fmt.Printf("one worker spoke %v sentences\n", v.Last.Return()[0].(int))
}

// release resource
err = app.Close()
if err != nil {
	return
}
Output:
Example (Worker) ¶
/*
import (
	"fmt"
	"os"
	"time"

	"github.com/mission-liao/dingo"
	"github.com/mission-liao/dingo/amqp"
)
*/
// this example demonstrates a worker based on AMQP, used along with ExampleApp_Use_caller
// make sure you install a rabbitmq server locally.
var err error
defer func() {
	if err != nil {
		fmt.Printf("%v\n", err)
	}
}()

// an App in remote mode
app, err := dingo.NewApp("remote", nil)
if err != nil {
	return
}

// attach an AMQP consumer to receive tasks
broker, err := dgamqp.NewBroker(dgamqp.DefaultAmqpConfig())
if err != nil {
	return
}
_, _, err = app.Use(broker, dingo.ObjT.NamedConsumer)
if err != nil {
	return
}

// attach an AMQP reporter to publish reports
backend, err := dgamqp.NewBackend(dgamqp.DefaultAmqpConfig())
if err != nil {
	return
}
_, _, err = app.Use(backend, dingo.ObjT.Reporter)
if err != nil {
	return
}

// register a worker function that murmurs
err = app.Register("murmur", func(speech *struct {
	Prologue string
	Script   []string
}, interval time.Duration) (countOfSentence int) {
	// speak the prologue
	fmt.Printf("%v:\n", speech.Prologue)
	countOfSentence++

	// speak the script
	for _, v := range speech.Script {
		<-time.After(interval)
		fmt.Printf("%v\n", v)
		countOfSentence++
	}

	// return the total number of sentences we spoke
	return
})
if err != nil {
	return
}

// allocate 1 worker, sharing 1 report channel
_, err = app.Allocate("murmur", 1, 1)
if err != nil {
	return
}

// wait until a key stroke
var stroke []byte = make([]byte, 100)
fmt.Println("waiting for tasks...stop waiting by pressing enter")
os.Stdin.Read(stroke)

// release resource
err = app.Close()
if err != nil {
	return
}
Output:
type BackendTestSuite ¶
type BackendTestSuite struct {
	suite.Suite

	Gen   func() (Backend, error)
	Trans *fnMgr
	Bkd   Backend
	Rpt   Reporter
	Sto   Store
	Tasks []*Task
}
All dingo.Backend providers should pass this test suite. Example testing code:
	type myBackendTestSuite struct {
		dingo.BackendTestSuite
	}

	func TestMyBackendTestSuite(t *testing.T) {
		suite.Run(t, &myBackendTestSuite{
			dingo.BackendTestSuite{
				Gen: func() (dingo.Backend, error) {
					// generate a new instance of your backend.
				},
			},
		})
	}
func (*BackendTestSuite) SetupSuite ¶
func (ts *BackendTestSuite) SetupSuite()
func (*BackendTestSuite) SetupTest ¶
func (ts *BackendTestSuite) SetupTest()
func (*BackendTestSuite) TearDownSuite ¶
func (ts *BackendTestSuite) TearDownSuite()
func (*BackendTestSuite) TearDownTest ¶
func (ts *BackendTestSuite) TearDownTest()
func (*BackendTestSuite) TestBasic ¶
func (ts *BackendTestSuite) TestBasic()
func (*BackendTestSuite) TestExpect ¶
func (ts *BackendTestSuite) TestExpect()
func (*BackendTestSuite) TestOrder ¶
func (ts *BackendTestSuite) TestOrder()
func (*BackendTestSuite) TestSameID ¶
func (ts *BackendTestSuite) TestSameID()
type BrokerTestSuite ¶
type BrokerTestSuite struct {
	suite.Suite

	Trans         *fnMgr
	Gen           func() (interface{}, error)
	Pdc           Producer
	Csm           Consumer
	Ncsm          NamedConsumer
	ConsumerNames []string
}
All dingo.Broker providers should pass this test suite. Example testing code:
	type myBrokerTestSuite struct {
		dingo.BrokerTestSuite
	}

	func TestMyBrokerTestSuite(t *testing.T) {
		suite.Run(t, &myBrokerTestSuite{
			dingo.BrokerTestSuite{
				Gen: func() (interface{}, error) {
					// generate a new instance of your broker.
					// both dingo.Broker and dingo.NamedBroker are acceptable.
				},
			},
		})
	}
func (*BrokerTestSuite) SetupSuite ¶
func (ts *BrokerTestSuite) SetupSuite()
func (*BrokerTestSuite) SetupTest ¶
func (ts *BrokerTestSuite) SetupTest()
func (*BrokerTestSuite) TearDownSuite ¶
func (ts *BrokerTestSuite) TearDownSuite()
func (*BrokerTestSuite) TearDownTest ¶
func (ts *BrokerTestSuite) TearDownTest()
func (*BrokerTestSuite) TestBasic ¶
func (ts *BrokerTestSuite) TestBasic()
func (*BrokerTestSuite) TestDuplicated ¶
func (ts *BrokerTestSuite) TestDuplicated()
func (*BrokerTestSuite) TestExpect ¶
func (ts *BrokerTestSuite) TestExpect()
func (*BrokerTestSuite) TestNamed ¶
func (ts *BrokerTestSuite) TestNamed()
type Config ¶
type Config struct {
Mappers_ int `json:"Mappers"`
}
func DefaultConfig ¶
func DefaultConfig() *Config
type Consumer ¶
type Consumer interface {
	// hook for listening event from dingo
	// parameter:
	// - eventID: which event?
	// - payload: corresponding payload, its type depends on 'eventID'
	// returns:
	// - err: errors
	ConsumerHook(eventID int, payload interface{}) (err error)

	// create a new listener to receive tasks
	//
	// parameters:
	// - rcpt: a channel that 'dingo' would send 'TaskReceipt' for tasks from 'tasks' channel.
	// returns:
	// - tasks: 'dingo' would consume from this channel for new tasks
	// - err: any error during initialization
	AddListener(rcpt <-chan *TaskReceipt) (tasks <-chan []byte, err error)

	// all listeners are stopped, their corresponding "tasks" channel(returned from AddListener)
	// would be closed.
	StopAllListeners() (err error)
}
Consumer consumes tasks from broker(s). This kind of Consumer is easier to implement: every task is sent to a single queue, and consumed from a single queue.
The interaction between Consumer(s) and dingo is asynchronous, via the channel you provide in 'AddListener'.
type CustomMarshaller ¶
type CustomMarshaller struct {
Codec CustomMarshallerCodec
}
CustomMarshaller is a helper Marshaller for users to create customized Marshaller(s) by providing several hooks. Users just need to take care of the things they know:
- input arguments
- output return values
Other payloads of the task/report are handled by CustomMarshaller. Here is a partial demo with json:
	// worker function, we are going to provide a custom marshaller
	// without any reflection for it.
	fn := func(msg string, category int) (done bool) {
		...
	}

	// implement CustomMarshallerCodec interface
	type myCodec struct {}

	// encoding arguments
	func (c *myCodec) EncodeArgument(fn interface{}, val []interface{}) ([][]byte, error) {
		bMsg, _ := json.Marshal(val[0])
		bCategory, _ := json.Marshal(val[1])
		return [][]byte{bMsg, bCategory}, nil
	}

	// encoding returns
	func (c *myCodec) EncodeReturn(fn interface{}, val []interface{}) ([][]byte, error) {
		bDone, _ := json.Marshal(val[0])
		return [][]byte{bDone}, nil
	}

	// decoding arguments
	func (c *myCodec) DecodeArgument(fn interface{}, bs [][]byte) ([]interface{}, error) {
		var (
			msg      string
			category int
		)
		// unmarshal each argument
		json.Unmarshal(bs[0], &msg)
		json.Unmarshal(bs[1], &category)
		return []interface{}{msg, category}, nil
	}

	// decoding returns
	func (c *myCodec) DecodeReturn(fn interface{}, bs [][]byte) ([]interface{}, error) {
		var done bool
		json.Unmarshal(bs[0], &done)
		return []interface{}{done}, nil
	}

	// register it to dingo.App
	app.AddMarshaller(expectedMashId, &struct{
		CustomMarshaller
		myCustomInvoker
	}{
		CustomMarshaller{Codec: &myCodec{}},
		myCustomInvoker{},
	})
Example ¶
package main

import (
	"encoding/json"
	"fmt"

	"github.com/mission-liao/dingo"
)

type testCustomMarshallerCodec struct{}

func (me *testCustomMarshallerCodec) Prepare(name string, fn interface{}) (err error) {
	return
}

func (me *testCustomMarshallerCodec) EncodeArgument(_ interface{}, val []interface{}) (bs [][]byte, err error) {
	fmt.Println("encode argument is called")
	b, err := json.Marshal(val[0])
	if err != nil {
		return
	}
	bs = [][]byte{b}
	return
}

func (me *testCustomMarshallerCodec) DecodeArgument(_ interface{}, bs [][]byte) (val []interface{}, err error) {
	fmt.Println("decode argument is called")
	var input []string
	err = json.Unmarshal(bs[0], &input)
	if err != nil {
		return
	}
	val = []interface{}{input}
	return
}

func (me *testCustomMarshallerCodec) EncodeReturn(_ interface{}, val []interface{}) (bs [][]byte, err error) {
	fmt.Println("encode return is called")
	b, err := json.Marshal(val[0])
	if err != nil {
		return
	}
	bs = [][]byte{b}
	return
}

func (me *testCustomMarshallerCodec) DecodeReturn(_ interface{}, bs [][]byte) (val []interface{}, err error) {
	fmt.Println("decode return is called")
	var ret string
	err = json.Unmarshal(bs[0], &ret)
	if err != nil {
		return
	}
	val = []interface{}{ret}
	return
}

type testMyInvoker3 struct{}

func (me *testMyInvoker3) Call(f interface{}, param []interface{}) ([]interface{}, error) {
	fmt.Println("my invoker is called for Call")
	ret := f.(func([]string) string)(param[0].([]string))
	return []interface{}{ret}, nil
}

func (me *testMyInvoker3) Return(f interface{}, returns []interface{}) ([]interface{}, error) {
	fmt.Println("my invoker is called for Return")
	return returns, nil
}

func main() {
	// this example demonstrates the usage of a
	// customized marshaller that encodes every parameter
	// in JSON, and invokes the worker function with a
	// customized invoker.
	var err error
	defer func() {
		if err != nil {
			fmt.Printf("%v\n", err)
		}
	}()

	// an App in remote mode, with local backend/broker
	app, err := dingo.NewApp("remote", nil)
	if err != nil {
		return
	}

	// attach a local broker.
	broker, err := dingo.NewLocalBroker(dingo.DefaultConfig(), nil)
	if err != nil {
		return
	}
	_, _, err = app.Use(broker, dingo.ObjT.Default)
	if err != nil {
		return
	}

	// attach a local backend
	backend, err := dingo.NewLocalBackend(dingo.DefaultConfig(), nil)
	if err != nil {
		return
	}
	_, _, err = app.Use(backend, dingo.ObjT.Default)
	if err != nil {
		return
	}

	// register customized marshaller & invoker
	err = app.AddMarshaller(101, &struct {
		testMyInvoker3
		dingo.CustomMarshaller
	}{
		testMyInvoker3{},
		dingo.CustomMarshaller{Codec: &testCustomMarshallerCodec{}},
	})
	if err != nil {
		return
	}

	// register worker function
	err = app.Register("concat", func(words []string) (ret string) {
		for _, v := range words {
			ret = ret + v
		}
		return
	})
	if err != nil {
		return
	}

	// change marshaller of worker function
	err = app.SetMarshaller("concat", 101, 101)
	if err != nil {
		return
	}

	// allocate workers
	_, err = app.Allocate("concat", 1, 1)
	if err != nil {
		return
	}

	// trigger the fire...
	result := dingo.NewResult(app.Call("concat", nil, []string{"Merry ", "X", "'mas"}))
	err = result.Wait(0)
	if err != nil {
		return
	}
	result.OnOK(func(ret string) {
		fmt.Printf("%v\n", ret)
	})

	err = app.Close()
	if err != nil {
		return
	}
}
Output:
func (*CustomMarshaller) DecodeReport ¶
func (ms *CustomMarshaller) DecodeReport(h *Header, fn interface{}, b []byte) (report *Report, err error)
func (*CustomMarshaller) DecodeTask ¶
func (ms *CustomMarshaller) DecodeTask(h *Header, fn interface{}, b []byte) (task *Task, err error)
func (*CustomMarshaller) EncodeReport ¶
func (ms *CustomMarshaller) EncodeReport(fn interface{}, report *Report) (b []byte, err error)
func (*CustomMarshaller) EncodeTask ¶
func (ms *CustomMarshaller) EncodeTask(fn interface{}, task *Task) (b []byte, err error)
func (*CustomMarshaller) Prepare ¶
func (ms *CustomMarshaller) Prepare(name string, fn interface{}) (err error)
type CustomMarshallerCodec ¶
type CustomMarshallerCodec interface {
	/*
	 A hook called when CustomMarshaller.Prepare is called.
	*/
	Prepare(name string, fn interface{}) (err error)

	/*
	 encode arguments.
	 - fn: function fingerprint
	 - val: slice of arguments

	 You can encode each argument one by one, and compose them into one
	 slice of byte slices (or any way you want).
	*/
	EncodeArgument(fn interface{}, val []interface{}) ([][]byte, error)

	/*
	 decode arguments.
	 - fn: function fingerprint
	 - bs: slice of byte slices
	*/
	DecodeArgument(fn interface{}, bs [][]byte) ([]interface{}, error)

	/*
	 encode returns.
	 - fn: function fingerprint
	 - val: slice of returns

	 You can encode each return one by one, and compose them into one
	 slice of byte slices (or any way you want).
	*/
	EncodeReturn(fn interface{}, val []interface{}) ([][]byte, error)

	/*
	 decode returns.
	 - fn: function fingerprint
	 - bs: slice of byte slices
	*/
	DecodeReturn(fn interface{}, bs [][]byte) ([]interface{}, error)
}
CustomMarshallerCodec is the "codec" used by CustomMarshaller: users provide a customized marshaller by implementing a codec that encodes/decodes arguments and returns.
type Event ¶
type Event struct {
	// origin of event: please refer to dingo.ObjT for possible values.
	Origin  int
	Time    time.Time
	Level   int
	Code    int
	Payload interface{}
}
func NewEventFromError ¶
type GenericInvoker ¶
type GenericInvoker struct{}
GenericInvoker is a generic Invoker that can convert values between different types. For example:
- a struct can be converted from a map or another struct.
- pointers, pointers to pointers, and so on are also handled.
However, this flexibility comes at a price: it is also the slowest option. All builtin Marshaller(s) can be used with this Invoker.
func (*GenericInvoker) Call ¶
func (vk *GenericInvoker) Call(f interface{}, param []interface{}) ([]interface{}, error)
func (*GenericInvoker) Return ¶
func (vk *GenericInvoker) Return(f interface{}, returns []interface{}) ([]interface{}, error)
type GobMarshaller ¶
type GobMarshaller struct{}
GobMarshaller is a marshaller implemented via gob encoding. Note: this Marshaller can work with both GenericInvoker and LazyInvoker.
func (*GobMarshaller) DecodeReport ¶
func (ms *GobMarshaller) DecodeReport(h *Header, fn interface{}, b []byte) (report *Report, err error)
func (*GobMarshaller) DecodeTask ¶
func (ms *GobMarshaller) DecodeTask(h *Header, fn interface{}, b []byte) (task *Task, err error)
func (*GobMarshaller) EncodeReport ¶
func (ms *GobMarshaller) EncodeReport(fn interface{}, report *Report) (b []byte, err error)
func (*GobMarshaller) EncodeTask ¶
func (ms *GobMarshaller) EncodeTask(fn interface{}, task *Task) (b []byte, err error)
func (*GobMarshaller) Prepare ¶
func (ms *GobMarshaller) Prepare(name string, fn interface{}) (err error)
type Header ¶
type Header struct {
	// header type, "dingo" would raise an error when encountering headers with
	// unknown types.
	T int16

	// dingo-generated id for this task
	I string

	// task name
	N string

	// registries(a series of uint64), their usage depends on Marshaller(s).
	R []uint64
}
Header is the common header section of the byte stream produced by Marshaller(s). External components (broker.Producer, broker.Consumer, backend.Reporter, backend.Store) can rely on Header to get identity info from the byte stream they hold, like this:

h, err := DecodeHeader(b)
// the id of task
h.ID()
// the name of task
h.Name()

Registries can be added to Header. For example, if your Marshaller encodes each argument into a separate byte stream, you could record their lengths (in bytes) in the registries section of Header:

// marshalling
bs := [][]byte{}
h := task.H
for _, v := range args {
	b_, _ := json.Marshal(v)
	h.Append(uint64(len(b_)))
	bs = append(bs, b_)
}

// compose those byte streams
b, _ := h.Flush(0) // header section
for _, v := range bs {
	b = append(b, v...)
}

// unmarshalling
h, _ := DecodeHeader(b)
for _, v := range h.Registry() {
	// you could rely on registry to decompose
	// the byte stream here.
}
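The length-recording idea can be sketched without dingo at all. In the hedged example below, a plain []uint64 stands in for the registries of a Header, and the helper names compose and decompose are hypothetical; it only shows how recorded lengths let a receiver split one concatenated byte stream back into per-argument pieces.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// compose marshals each argument separately, records each encoded length in
// registry, and concatenates the encodings into one body.
func compose(args []interface{}) ([]uint64, []byte, error) {
	var registry []uint64
	var body []byte
	for _, v := range args {
		b, err := json.Marshal(v)
		if err != nil {
			return nil, nil, err
		}
		registry = append(registry, uint64(len(b)))
		body = append(body, b...)
	}
	return registry, body, nil
}

// decompose splits body into pieces whose lengths are given by registry.
func decompose(registry []uint64, body []byte) ([][]byte, error) {
	parts := make([][]byte, 0, len(registry))
	for _, n := range registry {
		if uint64(len(body)) < n {
			return nil, errors.New("body is shorter than the registry claims")
		}
		parts = append(parts, body[:n])
		body = body[n:]
	}
	return parts, nil
}

func main() {
	registry, body, err := compose([]interface{}{"a", 42})
	if err != nil {
		fmt.Println(err)
		return
	}
	parts, err := decompose(registry, body)
	if err != nil {
		fmt.Println(err)
		return
	}
	for _, p := range parts {
		fmt.Println(string(p))
	}
}
```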
func DecodeHeader ¶
type HetroRoutines ¶
type HetroRoutines struct {
// contains filtered or unexported fields
}
func NewHetroRoutines ¶
func NewHetroRoutines() *HetroRoutines
func (*HetroRoutines) Close ¶
func (rs *HetroRoutines) Close()
func (*HetroRoutines) Events ¶
func (rs *HetroRoutines) Events() chan *Event
func (*HetroRoutines) New ¶
func (rs *HetroRoutines) New(want int) (quit <-chan int, done chan<- int, idx int)
func (*HetroRoutines) Stop ¶
func (rs *HetroRoutines) Stop(idx int) (err error)
type IDMaker ¶
IDMaker is an object that can generate a series of identities, typed as string. Each identity should be unique.
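A minimal sketch of a custom ID generator follows, assuming the interface only requires a NewID() (string, error) method (the signature shown for SeqIDMaker.NewID). The type name prefixIDMaker and its prefix scheme are hypothetical, and uniqueness here holds only within a single process.

```go
package main

import (
	"fmt"
	"strconv"
	"sync/atomic"
)

// prefixIDMaker is a hypothetical ID generator: a fixed prefix followed by an
// atomically incremented counter, so concurrent callers never get the same ID.
type prefixIDMaker struct {
	prefix string
	next   uint64
}

// NewID returns the next identity as a string.
func (m *prefixIDMaker) NewID() (string, error) {
	n := atomic.AddUint64(&m.next, 1)
	return m.prefix + "-" + strconv.FormatUint(n, 10), nil
}

func main() {
	m := &prefixIDMaker{prefix: "worker"}
	for i := 0; i < 3; i++ {
		id, err := m.NewID()
		if err != nil {
			fmt.Println(err)
			return
		}
		fmt.Println(id)
	}
}
```

For a distributed deployment, a per-process counter alone would not be enough; the prefix would have to differ per process, or a different scheme would be needed.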
type Invoker ¶
type Invoker interface {
	// invoke the function "f" with parameters "param", and the returns
	// are represented as []interface{}
	Call(f interface{}, param []interface{}) ([]interface{}, error)

	// when marshal/unmarshal with json, some type information would be lost.
	// this function helps to convert those returns with correct type info provided
	// by function's reflection.
	//
	// parameters:
	// - f: the function
	// - returns: the array of returns
	// returns:
	// converted return array and error
	Return(f interface{}, returns []interface{}) ([]interface{}, error)
}
The responsibilities of Invoker(s) are:
- invoking the function by converting []interface{} to []reflect.Value, and calling reflect.Call.
- converting the return value of reflect.Call from []reflect.Value to []interface{}
Because most Marshaller(s) can't unmarshal a byte stream back to its original type(s), Invokers are responsible for type-correction. The builtin Invoker(s) of "dingo" are also categorized by their ability to correct type(s).
Note: all implemented functions should be thread-safe.
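The two responsibilities above can be sketched with the standard reflect package. This is a simplified illustration, not the builtin GenericInvoker or LazyInvoker: the helper name call is hypothetical, and its type-correction rule (use reflect's ConvertibleTo/Convert) is an assumption that covers simple cases such as float64 to int but not, for example, maps to structs.

```go
package main

import (
	"errors"
	"fmt"
	"reflect"
)

// call invokes f with param. Each argument is converted to the parameter type
// declared by f when reflection allows the conversion.
func call(f interface{}, param []interface{}) ([]interface{}, error) {
	fv := reflect.ValueOf(f)
	if fv.Kind() != reflect.Func {
		return nil, errors.New("f is not a function")
	}
	ft := fv.Type()
	if ft.IsVariadic() || ft.NumIn() != len(param) {
		return nil, errors.New("argument count mismatch or variadic function")
	}

	// []interface{} -> []reflect.Value, with type correction
	in := make([]reflect.Value, len(param))
	for i, p := range param {
		want := ft.In(i)
		if p == nil {
			in[i] = reflect.Zero(want)
			continue
		}
		v := reflect.ValueOf(p)
		if !v.Type().ConvertibleTo(want) {
			return nil, fmt.Errorf("argument %d: cannot convert %s to %s", i, v.Type(), want)
		}
		in[i] = v.Convert(want)
	}

	// []reflect.Value -> []interface{}
	out := fv.Call(in)
	ret := make([]interface{}, len(out))
	for i, v := range out {
		ret[i] = v.Interface()
	}
	return ret, nil
}

func main() {
	repeat := func(n int, s string) string {
		out := ""
		for i := 0; i < n; i++ {
			out += s
		}
		return out
	}
	// float64(2) is what a JSON round trip would yield for the integer 2.
	ret, err := call(repeat, []interface{}{float64(2), "ab"})
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(ret[0])
}
```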
type JSONSafeCodec ¶
type JSONSafeCodec struct{}
func (*JSONSafeCodec) DecodeArgument ¶
func (codec *JSONSafeCodec) DecodeArgument(fn interface{}, bs [][]byte) ([]interface{}, error)
func (*JSONSafeCodec) DecodeReturn ¶
func (codec *JSONSafeCodec) DecodeReturn(fn interface{}, bs [][]byte) ([]interface{}, error)
func (*JSONSafeCodec) EncodeArgument ¶
func (codec *JSONSafeCodec) EncodeArgument(fn interface{}, val []interface{}) ([][]byte, error)
func (*JSONSafeCodec) EncodeReturn ¶
func (codec *JSONSafeCodec) EncodeReturn(fn interface{}, val []interface{}) ([][]byte, error)
func (*JSONSafeCodec) Prepare ¶
func (codec *JSONSafeCodec) Prepare(name string, fn interface{}) (err error)
type JsonMarshaller ¶
type JsonMarshaller struct{}
JsonMarshaller is a marshaller implemented via json encoding. Note: this Marshaller can only work with GenericInvoker.
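The restriction is presumably related to the type information lost in a JSON round trip (the Invoker documentation mentions this loss). The sketch below uses only encoding/json to show what arguments look like after being marshalled and unmarshalled as []interface{}; the helper name roundTrip is hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// roundTrip marshals args to JSON and unmarshals them back into []interface{},
// which is roughly what a worker sees before any type correction.
func roundTrip(args []interface{}) ([]interface{}, error) {
	b, err := json.Marshal(args)
	if err != nil {
		return nil, err
	}
	var out []interface{}
	if err := json.Unmarshal(b, &out); err != nil {
		return nil, err
	}
	return out, nil
}

func main() {
	out, err := roundTrip([]interface{}{1, []string{"a"}})
	if err != nil {
		fmt.Println(err)
		return
	}
	// The int became a float64 and the []string became a []interface{}.
	fmt.Printf("%T %T\n", out[0], out[1]) // prints "float64 []interface {}"
}
```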
func (*JsonMarshaller) DecodeReport ¶
func (ms *JsonMarshaller) DecodeReport(h *Header, fn interface{}, b []byte) (report *Report, err error)
func (*JsonMarshaller) DecodeTask ¶
func (ms *JsonMarshaller) DecodeTask(h *Header, fn interface{}, b []byte) (task *Task, err error)
func (*JsonMarshaller) EncodeReport ¶
func (ms *JsonMarshaller) EncodeReport(fn interface{}, report *Report) (b []byte, err error)
func (*JsonMarshaller) EncodeTask ¶
func (ms *JsonMarshaller) EncodeTask(fn interface{}, task *Task) (b []byte, err error)
func (*JsonMarshaller) Prepare ¶
func (ms *JsonMarshaller) Prepare(string, interface{}) (err error)
type LazyInvoker ¶
type LazyInvoker struct{}
LazyInvoker is a less generic Invoker that can handle pointers of different levels. This Invoker can only work with GobMarshaller and JsonSafeMarshaller.
func (*LazyInvoker) Call ¶
func (vk *LazyInvoker) Call(f interface{}, param []interface{}) ([]interface{}, error)
func (*LazyInvoker) Return ¶
func (vk *LazyInvoker) Return(f interface{}, returns []interface{}) ([]interface{}, error)
type Marshaller ¶
type Marshaller interface {
	// you can perform any preprocessing for every worker function when registered.
	Prepare(name string, fn interface{}) (err error)

	// Encode a task.
	EncodeTask(fn interface{}, task *Task) (b []byte, err error)

	// Decode a task.
	DecodeTask(h *Header, fn interface{}, b []byte) (task *Task, err error)

	// Encode a report.
	EncodeReport(fn interface{}, report *Report) (b []byte, err error)

	// Decode a report.
	DecodeReport(h *Header, fn interface{}, b []byte) (report *Report, err error)
}
Marshaller(s) are the major components that convert between []interface{} and []byte.
- Note: all marshalled []byte should be prefixed with a Header.
- Note: all implemented functions should be routine(thread)-safe.
type NamedBroker ¶
type NamedBroker interface {
	Producer
	NamedConsumer
}
NamedBroker interface is composed of Producer/NamedConsumer
type NamedConsumer ¶
type NamedConsumer interface {
	// hook for listening event from dingo
	// parameter:
	// - eventID: which event?
	// - payload: corresponding payload, its type depends on 'eventID'
	// returns:
	// - err: errors
	ConsumerHook(eventID int, payload interface{}) (err error)

	// create a new consumer to receive tasks
	//
	// parameters:
	// - name: name of task to be received
	// - rcpt: a channel that 'dingo' would send 'TaskReceipt' for tasks from 'tasks'.
	// returns:
	// - tasks: 'dingo' would consume from this channel for new tasks
	// - err: any error during initialization
	AddListener(name string, rcpt <-chan *TaskReceipt) (tasks <-chan []byte, err error)

	// all listeners are stopped, their corresponding "tasks" channel(returned from AddListener)
	// would be closed.
	StopAllListeners() (err error)
}
NamedConsumer would consume tasks from broker(s). Different kinds of tasks should be sent to different queues, and consumed from different queues.
With this kind of Consumer(s), you can deploy different kinds of workers on machines, and each one of them handles different sets of worker functions.
type Object ¶
type Object interface {
	// what dingo expects from this object
	Expect(types int) error

	// allow dingo to attach event channels used in this object
	Events() ([]<-chan *Event, error)

	// releasing resource
	Close() error
}
Object is an interface; when an object implements it:
- dingo can have a trigger to release the resource allocated by this object.
- dingo can aggregate events raised by this object (those events can be subscribed to via dingo.App.Listen).
All objects attached via dingo.App.Use should implement this interface.
type Option ¶
type Option struct {
	// IgnoreReport: stop reporting when executing tasks.
	IR bool
	// MonitorProgress: monitoring the progress of task execution
	MP bool
}
func DefaultOption ¶
func DefaultOption() *Option
func (*Option) GetIgnoreReport ¶
func (*Option) GetMonitorProgress ¶
func (*Option) IgnoreReport ¶
func (*Option) MonitorProgress ¶
type Producer ¶
type Producer interface {
	// hook for listening event from dingo
	// parameter:
	// - eventID: which event?
	// - payload: corresponding payload, its type depends on 'eventID'
	// returns:
	// - err: errors
	ProducerHook(eventID int, payload interface{}) (err error)

	// send a task to brokers, it should be a blocking call.
	//
	// parameters:
	// - meta: the meta info of this task to be sent.
	// - b: the byte stream of this task.
	Send(meta Meta, b []byte) error
}
Producer is responsible for sending tasks to broker(s).
type Report ¶
type Report struct {
	H *Header
	P *ReportPayload
}
Report is a report of task execution.
When Report.Done() is true, no more reports would be sent, and either Report.OK() or Report.Fail() would be true.
If Report.OK() is true, the task execution succeeded, and you can grab the return values from Report.Returns(). Report.Returns() gives you an []interface{}, and every element in that slice is type-corrected according to the worker function.
If Report.Fail() is true, the task execution failed, and you can grab the reason from Report.Error().
func (*Report) Done ¶
Done means the task is finished, and no more reports would be sent. Either Error() or Return() would have values to check.
func (*Report) Error ¶
Error refers to possible error raised during execution, which is packed into transportable form.
func (*Report) ID ¶
ID refers to the identifier of this report; all reports belonging to one task share the same ID.
func (*Report) Option ¶
Option refers to the execution options of the report, inherited from the task it belongs to.
type ReportEnvelope ¶
ReportEnvelope is the standard package sent through the channel to Reporter. Its main purpose is to let Reporter know the meta info of the byte stream to send.
type ReportPayload ¶
type Reporter ¶
type Reporter interface {
	// hook for listening events from dingo
	// parameter:
	// - eventID: which event?
	// - payload: corresponding payload, its type depends on 'eventID'
	// returns:
	// - err: errors
	ReporterHook(eventID int, payload interface{}) (err error)

	// attach a report channel to backend. what dingo can promise is:
	// - all reports belonging to the same task(name, id) would be sent through the same channel
	//
	// parameters:
	// - name: all reports sent through this channel would be of this name
	// - reports: an input channel to receive reports from dingo.
	// returns:
	// - err: errors
	Report(name string, reports <-chan *ReportEnvelope) (id int, err error)
}
Reporter is responsible for sending reports to backend(s). The interaction between Reporter(s) and dingo is asynchronous, via channels.
type Result ¶
type Result struct {
	Last *Report
	// contains filtered or unexported fields
}
Result is a wrapper of chan *dingo.Report returned from dingo.App.Call, taking care of the logic to handle asynchronous result from 'dingo'.
Example usage:
r := dingo.NewResult(app.Call(...))

// blocking until done
err := r.Wait(0)
if err == nil {
	r.Last // the last Report
}

// polling for every 1 second
for dingo.ResultError.Timeout == r.Wait(1*time.Second) {
	// logging or ...
}
When the task is done, you could register a handler function, whose fingerprint is identical to the return part of worker functions. For example, if the worker function is:
func ComposeWords(words []string) (count int, composed string)
Its corresponding 'OnOK' handler is:
func (count int, composed string) {...}
When anything goes wrong, you could register a handler function via 'OnNOK', whose fingerprint is
func (*Error, error)
Both failure reports and errors generated in the 'Result' object would be passed to this handler; at least one of them would not be nil.
You can register handlers before calling 'Wait', or call 'Wait' before registering handlers. The ordering doesn't matter. Those handlers would be called exactly once.
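The fingerprint rule for 'OnOK' handlers (handler parameters identical to the worker function's return values) can be checked with reflection. The sketch below is only an illustration of that rule; the helper name handlerMatches is hypothetical and nothing here is taken from dingo's implementation.

```go
package main

import (
	"fmt"
	"reflect"
)

// handlerMatches reports whether handler's parameter list is identical to
// worker's list of return values.
func handlerMatches(worker, handler interface{}) bool {
	wt, ht := reflect.TypeOf(worker), reflect.TypeOf(handler)
	if wt == nil || ht == nil || wt.Kind() != reflect.Func || ht.Kind() != reflect.Func {
		return false
	}
	if wt.NumOut() != ht.NumIn() {
		return false
	}
	for i := 0; i < wt.NumOut(); i++ {
		if wt.Out(i) != ht.In(i) {
			return false
		}
	}
	return true
}

func main() {
	composeWords := func(words []string) (count int, composed string) {
		for _, w := range words {
			composed += w
		}
		return len(words), composed
	}
	good := func(count int, composed string) {}
	bad := func(composed string) {}

	fmt.Println(handlerMatches(composeWords, good)) // prints "true"
	fmt.Println(handlerMatches(composeWords, bad))  // prints "false"
}
```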
func NewResult ¶
NewResult is a factory function; simply wrap the call to dingo.App.Call with it:
NewResult(app.Call("test", ...))
func (*Result) OnOK ¶
func (rt *Result) OnOK(fn interface{})
OnOK is used to set the handler for the successful case.
func (*Result) SetInvoker ¶
SetInvoker assigns the Invoker used by Result.OnOK.
func (*Result) Wait ¶
Wait is used to wait forever or for a period of time. Here is the meaning of the returned error:
- timeout: wait again later
- other errors: something wrong.
- nil: done, you can access the result via 'Last' member.
When anything other than 'timeout' is returned, the result of subsequent 'Wait' would remain the same.
Registered callback would be triggered when possible.
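These semantics (a zero timeout blocks forever, a timeout error means "try again", and any other outcome is sticky) can be sketched over a plain channel. The type result, the method wait, and errTimeout below are hypothetical stand-ins, and the sketch treats a single value on the channel as the final report, which is simpler than what 'Result' handles.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTimeout is a hypothetical stand-in for the timeout error.
var errTimeout = errors.New("timeout")

// result is a hypothetical stand-in for a Result over a channel of strings.
type result struct {
	reports <-chan string
	last    string
	done    bool
	err     error
}

// wait blocks forever when timeout is 0, otherwise for at most timeout.
// Once it returns anything other than errTimeout, later calls return the same.
func (r *result) wait(timeout time.Duration) error {
	if r.done {
		return r.err
	}
	var expire <-chan time.Time // a nil channel never fires: wait forever
	if timeout > 0 {
		t := time.NewTimer(timeout)
		defer t.Stop()
		expire = t.C
	}
	select {
	case v, ok := <-r.reports:
		r.done = true
		if ok {
			r.last = v
		} else {
			r.err = errors.New("channel closed without a report")
		}
		return r.err
	case <-expire:
		return errTimeout
	}
}

func main() {
	ch := make(chan string, 1)
	r := &result{reports: ch}
	go func() {
		time.Sleep(30 * time.Millisecond)
		ch <- "Merry X'mas"
	}()
	// polling: keep waiting while the only problem is a timeout
	for r.wait(10*time.Millisecond) == errTimeout {
		fmt.Println("still waiting")
	}
	fmt.Println(r.last)
}
```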
type Routines ¶
type Routines struct {
// contains filtered or unexported fields
}
func NewRoutines ¶
func NewRoutines() *Routines
type SeqIDMaker ¶
type SeqIDMaker struct {
// contains filtered or unexported fields
}
SeqIDMaker is an implementation of IDMaker suited for local mode. It generates a sequence of numbers, one per call. Usage:
err := app.AddIDMaker(101, &dingo.SeqIDMaker{})
func (*SeqIDMaker) NewID ¶
func (seq *SeqIDMaker) NewID() (string, error)
type Store ¶
type Store interface {
	// hook for listening events from dingo
	// parameter:
	// - eventID: which event?
	// - payload: corresponding payload, its type depends on 'eventID'
	// returns:
	// - err: errors
	StoreHook(eventID int, payload interface{}) (err error)

	// polling reports for tasks
	//
	// parameters:
	// - meta: the meta info of that task to be polled.
	// returns:
	// - reports: the output channel for dingo to receive reports.
	Poll(meta Meta) (reports <-chan []byte, err error)

	// Stop monitoring that task
	//
	// parameters:
	// - meta: the meta info of that task/report to stop polling.
	Done(meta Meta) error
}
Store is responsible for receiving reports from backend(s)
type Task ¶
type Task struct {
	H *Header
	P *TaskPayload
}
Task is the struct that records all information required for task execution, including:
- name(type) of task
- identifier of this task, which should be unique among all tasks of the same name.
- arguments to be passed into worker function
- execution option
You don't have to know what it is unless you try to implement:
- dingo.Marshaller
- dingo.Invoker
You don't have to create it by yourself, every time you call dingo.App.Call, one is generated automatically.
type TaskPayload ¶
type TaskPayload struct {
	O *Option
	A []interface{}
}
type TaskReceipt ¶
TaskReceipt is the receipt that allows "dingo" to reject tasks for any reason; how rejected tasks are handled is Broker-dependent.
Source Files ¶
- backend.go
- backend_local.go
- backend_test_suite.go
- bridge.go
- bridge_local.go
- bridge_remote.go
- broker.go
- broker_local.go
- broker_test_suite.go
- codec_json_safe.go
- config.go
- dingo.go
- err.go
- event.go
- header.go
- id_maker.go
- invoker.go
- invoker_generic.go
- invoker_lazy.go
- mapper.go
- marshaller.go
- marshaller_custom.go
- marshaller_gob.go
- marshaller_json.go
- mgr.go
- mux.go
- object.go
- option.go
- report.go
- result.go
- routine.go
- task.go
- worker.go