Documentation ¶
Overview ¶
Package dispatch implements a message processing mechanism.
The basic idea is goroutine-per-request, which means each request runs in a separate goroutine.
- + + | | | | | | goroutine per request | | | v v v +---------------------+ | Dispatcher | +----------+----------+ +--------+--------+ | | +-----|----------------------------------+ v v | v In-memory entity | +----+ +----+|+---------+ | |Dest| |Dest||| Dest | | +----+ +----+|+---------+ | || Mutex | +------------+ | ||+-------+| access | | | |||Handler+------------->| | | ||+-------+|sequentially | Associated | | ||+Handler+------------->| | | ||+-------+| | Resources | | ||+Handler+------------->| | | ||+-------+| | | | |+---------+ +------------+ | | | +----------------------------------------+
Index ¶
- type AddressBook
- type ConcurrentDest
- type ContentType
- type Context
- type ContextCanceledError
- type Dest
- type DestNotFoundError
- type Dispatcher
- type Handler
- type HandlerFunc
- type LockedDest
- type LockedHandlerFunc
- type Mutex
- type MuxAddressBook
- type MuxDest
- type PanicError
- type ProtocolNotImplementError
- type Request
- type Response
- type Sink
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AddressBook ¶
AddressBook is used to get a Dest by a Request(typically by address).
Note that AddressBook is not necessarily to be a Dest container, it may also be a factory or a loader, etc.
type ConcurrentDest ¶
type ConcurrentDest struct {
// contains filtered or unexported fields
}
ConcurrentDest is a Dest that Requests are processed concurrently.
func NewConcurrentDest ¶
func NewConcurrentDest(h Handler) *ConcurrentDest
NewConcurrentDest creates a ConcurrentDest with provided Handler.
func (*ConcurrentDest) Call ¶
func (d *ConcurrentDest) Call(ctx *Context, r Request) Response
Call is for synchronous communication.
func (*ConcurrentDest) Send ¶
func (d *ConcurrentDest) Send(r Request) error
Send is for asynchronous communication.
type ContentType ¶
type ContentType int
ContentType indicates a Sink's data type.
const ( Bytes ContentType = iota + 1 // binary bytes Text // printable string Json // json string Protobuf // protobuf message )
func (ContentType) String ¶
func (c ContentType) String() string
type Context ¶
type Context struct {
// contains filtered or unexported fields
}
Context is a goroutine scope object which carries Mutexes locked by current goroutine and a cancelation signal.
A Context MUST only be created when:
1. Spawning a new goroutine. 2. In a new goroutine which has not been associated with a Context. (e.g. in http.HandlerFunc)
In other cases, always use associated Context and pass it around.
func NewContextWithCancel ¶
func NewContextWithCancel() (ctx *Context, cancel func())
NewContextWithCancel creates a Context that will be canceled when the returned cancel function is called. Calling cancel more than once will cause panic.
func NewContextWithTimeOut ¶
NewContextWithTimeOut creates a Context that will be canceled after indicated time duration.
func (*Context) AcquireOrCancel ¶
AcquireOrCancel should be called before accessing shared resources. It returns false if cancel is signaled before success lock. Usage pattern:
if ctx.AcquireOrCancel(m) { defer ctx.Release() // ... access shared resources ... } else { // ... canceled ... }
A Context can lock mutiple Mutexes. Note that a race condition may occur in this case (e.g. one locks A then waits to lock B, other locks B then waits to lock A). To simplify the problem, it's recommended always locking only one Mutex at a time.
func (*Context) ReleaseAll ¶
func (ctx *Context) ReleaseAll()
ReleaseAll unlocks all acquired Mutexes, in First-In-Last-Out order.
type ContextCanceledError ¶
type ContextCanceledError struct{}
ContextCanceledError records an error when Context canceled before processing a Request.
func (ContextCanceledError) Error ¶
func (e ContextCanceledError) Error() string
Error returns "Context canceled."
type Dest ¶
Dest is an in-memory entity that accepts Requests and returns Responses. It's typically implemented with a Mutex to make Requests processed sequentially and one or more Handlers to do the logic.
Call() is used for synchronous communication, it returns after Request has been processed; Send() is used for asynchronous communication, it returns immediately and spawns a new goroutine to process Request.
See ConcurrentDest, LockedDest and MuxDest for more details.
type DestNotFoundError ¶
type DestNotFoundError string
DestNotFoundError records an error when no Dest found by Request's Address().
func (DestNotFoundError) Error ¶
func (e DestNotFoundError) Error() string
Error returns error information with the address.
type Dispatcher ¶
type Dispatcher struct {
AddressBook
}
Dispatcher dispatches Requests to corresponding Dest by looking up an AddressBook.
Note that Dispatcher is also a Dest. You can build a multi-level Dispatcher easily.
func (*Dispatcher) Call ¶
func (d *Dispatcher) Call(ctx *Context, r Request) Response
Call dispatches a call request to a Dest. It returns a Response with DestNotFoundError if no Dest found in the AddressBook.
func (*Dispatcher) Send ¶
func (d *Dispatcher) Send(r Request) error
Send dispatches a send request to a Dest. It returns a Response with DestNotFoundError if no Dest found in the AddressBook.
type Handler ¶
Handler is an interface wraps Serve() which processes a Request and returns a Response. Typically, Handler will be registed to a Dest and serve a particular protocol.
type HandlerFunc ¶
HandlerFunc is an adapter from an ordinary function to a Handler.
type LockedDest ¶
type LockedDest struct { Mutex // contains filtered or unexported fields }
LockedDest is a Dest that Requests are processed sequentially.
func NewLockedDest ¶
func NewLockedDest(h Handler) *LockedDest
NewLockedDest creates a LockedDest with provided Handler.
func (*LockedDest) Call ¶
func (d *LockedDest) Call(ctx *Context, r Request) Response
Call is for synchronous communication.
func (*LockedDest) Send ¶
func (d *LockedDest) Send(r Request) error
Send is for asynchronous communication.
type LockedHandlerFunc ¶
LockedHandlerFunc is an adatper from an ordinary function to a auto-locked Handler.
func (LockedHandlerFunc) Serve ¶
func (f LockedHandlerFunc) Serve(ctx *Context, m Mutex, r Request) (rsp Response)
Serve tries to lock Mutex then calls f(r). If ctx is canceled before success lock, it returns a Response with a ContextCanceledError; If panic occurs, it returns a Response with a PanicError.
type Mutex ¶
type Mutex chan struct{}
Mutex is implemented as a buffered channel in order to make it selectable. Always create Mutex by calling NewMutex() to ensure channel's buffer size is 1. Usage:
m := NewMutex() // create m <- struct{}{} // lock <-m // unlock // lock or timeout select { case m <- struct{}{}: // success lock case <-time.After(time.Second): // timeout }
type MuxAddressBook ¶
type MuxAddressBook struct {
// contains filtered or unexported fields
}
MuxAddressBook is a AddressBook uses a Mux to look up Dest.
func NewMuxAddressBook ¶
func NewMuxAddressBook(m *mux.Mux) *MuxAddressBook
NewMuxAddressBook creates a MuxAddressBook with provided Mux.
Note that passed Mux MUST be either empty or only has values of type Dest.
func (*MuxAddressBook) Lookup ¶
func (b *MuxAddressBook) Lookup(r Request) Dest
Lookup matches Request's address to registed Dests.
func (*MuxAddressBook) Register ¶
func (b *MuxAddressBook) Register(address string, dest Dest)
Register binds a Dest to an address.
type MuxDest ¶
type MuxDest struct { Mutex // contains filtered or unexported fields }
MuxDest is a Dest carries mutiple Handlers.
func NewMuxDest ¶
NewMuxDest creates a MuxDest with provided Mux. Note that passed Mux MUST be either empty or only has values of type Handler.
func (*MuxDest) Call ¶
Call is for synchronous communication. It returns a Response with ProtocolNotImplementError if no registered Handler matches the Request's protocol.
type PanicError ¶
type PanicError struct {
// contains filtered or unexported fields
}
PanicError records an error when panic occur when calling a Handler.
func (PanicError) Error ¶
func (e PanicError) Error() string
Error returns panic information and debug stack.
type ProtocolNotImplementError ¶
type ProtocolNotImplementError string
ProtocolNotImplementError records an error when no Handler found by Request's Protocol().
func (ProtocolNotImplementError) Error ¶
func (e ProtocolNotImplementError) Error() string
Error returns error infomation with the protocol.
type Request ¶
Request represents a request message. Protocol() indicates the Request's type and how it will be processed. Address() indicates where the Request will be sent to. Body() returns Request's ContentType and raw data.
func SimpleRequest ¶
SimpleRequest creates a simple Request.
type Response ¶
Response represents a response message corrsponding to a Request. Error() returns an none nil error if any error occurs. Body() returns Response's ContentType and raw data.
func SimpleResponse ¶
SimpleResponse creates a simple Response
type Sink ¶
type Sink struct { ContentType // contains filtered or unexported fields }
A Sink carries binary bytes, string, or a protobuf message. Internally datas are converted to []byte.
func JsonSink ¶
func JsonSink(j interface{}) *Sink
JsonSink creates a Sink by a struct. The Sink's ContentType will be Json.
func ProtoSink ¶
ProtoSink creates a Sink by a protobuf message. The Sink's ContentType will be Protobuf.
func (*Sink) UnmarshalJson ¶
UnmarshalJson unmarshals JSON data and store the result in v.
func (*Sink) UnmarshalProtoMessage ¶
UnmarshalProtoMessage unmarshals data to a protobuf message.