dispatch

v0.0.0-...-86570b4
Published: Dec 18, 2014 License: MIT Imports: 7 Imported by: 1

README

dispatch

Package dispatch implements a message processing mechanism.

The basic idea is goroutine-per-request, which means each request runs in a separate goroutine.

              +  +  +
              |  |  |
              |  |  | goroutine per request
              |  |  |
              v  v  v
     +---------------------+
     |      Dispatcher     |
     +----------+----------+
       +--------+--------+
       |        |  +-----|----------------------------------+
       v        v  |     v       In-memory entity           |
     +----+  +----+|+---------+                             |
     |Dest|  |Dest|||  Dest   |                             |
     +----+  +----+|+---------+                             |
                   ||  Mutex  |             +------------+  |
                   ||+-------+|   access    |            |  |
                   |||Handler+------------->|            |  |
                   ||+-------+|sequentially | Associated |  |
                   ||+Handler+------------->|            |  |
                   ||+-------+|             | Resources  |  |
                   ||+Handler+------------->|            |  |
                   ||+-------+|             |            |  |
                   |+---------+             +------------+  |
                   |                                        |
                   +----------------------------------------+

Documentation

Types

type AddressBook

type AddressBook interface {
	Lookup(r Request) Dest
}

AddressBook is used to get a Dest for a Request (typically by address).

Note that an AddressBook need not be a Dest container; it may also be a factory, a loader, etc.
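
As a rough illustration of the factory case, the sketch below creates a Dest the first time an address is looked up and reuses it afterwards. It does not import this package: Dest is reduced to a one-method stand-in, Lookup takes a plain address string rather than a Request, and lazyBook, room and made are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// Dest is reduced to a marker interface here; the real one has Call and Send.
type Dest interface{ Name() string }

// room is a hypothetical Dest implementation.
type room string

func (r room) Name() string { return string(r) }

// lazyBook is a hypothetical AddressBook-style factory: it creates a Dest on
// first lookup of an address and returns the same Dest on later lookups.
type lazyBook struct {
	mu    sync.Mutex
	dests map[string]Dest
	made  int // number of Dests created so far
}

// Lookup takes an address string (an assumption; the real method takes a Request).
func (b *lazyBook) Lookup(addr string) Dest {
	b.mu.Lock()
	defer b.mu.Unlock()
	if d, ok := b.dests[addr]; ok {
		return d
	}
	if b.dests == nil {
		b.dests = make(map[string]Dest)
	}
	d := room(addr)
	b.dests[addr] = d
	b.made++
	return d
}

func main() {
	b := &lazyBook{}
	b.Lookup("lobby")
	b.Lookup("lobby") // served from the map, nothing new is created
	b.Lookup("hall")
	fmt.Println(b.made)
}
```

Guarding the map with a lock matters here because, in a goroutine-per-request design, several requests may look up the same address at once.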

type ConcurrentDest

type ConcurrentDest struct {
	// contains filtered or unexported fields
}

ConcurrentDest is a Dest that processes Requests concurrently.

func NewConcurrentDest

func NewConcurrentDest(h Handler) *ConcurrentDest

NewConcurrentDest creates a ConcurrentDest with provided Handler.

func (*ConcurrentDest) Call

func (d *ConcurrentDest) Call(ctx *Context, r Request) Response

Call is for synchronous communication.

func (*ConcurrentDest) Send

func (d *ConcurrentDest) Send(r Request) error

Send is for asynchronous communication.

type ContentType

type ContentType int

ContentType indicates a Sink's data type.

const (
	Bytes    ContentType = iota + 1 // binary bytes
	Text                            // printable string
	Json                            // json string
	Protobuf                        // protobuf message
)
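
Because the constants are declared with iota + 1, they take the values 1 through 4 and the zero value of ContentType matches none of them. The sketch below redeclares the type locally to show this; name is a hypothetical helper, since the strings returned by the package's own String method are not documented here.

```go
package main

import "fmt"

// ContentType is redeclared here only so the sketch compiles on its own.
type ContentType int

const (
	Bytes    ContentType = iota + 1 // 1
	Text                            // 2
	Json                            // 3
	Protobuf                        // 4
)

// name is a hypothetical helper, not the package's String method.
func name(c ContentType) string {
	switch c {
	case Bytes:
		return "Bytes"
	case Text:
		return "Text"
	case Json:
		return "Json"
	case Protobuf:
		return "Protobuf"
	}
	return "unknown"
}

func main() {
	var zero ContentType // the zero value is not a valid ContentType
	fmt.Println(int(Bytes), int(Protobuf), name(zero))
}
```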

func (ContentType) String

func (c ContentType) String() string

type Context

type Context struct {
	// contains filtered or unexported fields
}

Context is a goroutine-scoped object which carries the Mutexes locked by the current goroutine and a cancelation signal.

A Context MUST only be created when:

1. Spawning a new goroutine.
2. Running in a new goroutine which has not been associated with a Context (e.g. in an http.HandlerFunc).

In all other cases, use the associated Context and pass it around.

func NewContext

func NewContext() *Context

NewContext creates a Context with no cancelation.

func NewContextWithCancel

func NewContextWithCancel() (ctx *Context, cancel func())

NewContextWithCancel creates a Context that will be canceled when the returned cancel function is called. Calling cancel more than once causes a panic.
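
If a cancel function may be reached from more than one code path, one way to stay within the call-once rule is to wrap it in a sync.Once. The sketch below is self-contained and does not use this package: newCancel is a local stand-in that cancels by closing a channel, which is only an assumption about how the real function behaves.

```go
package main

import (
	"fmt"
	"sync"
)

// newCancel is a local stand-in for NewContextWithCancel. It signals
// cancellation by closing a channel; closing a channel twice panics.
func newCancel() (done <-chan struct{}, cancel func()) {
	ch := make(chan struct{})
	return ch, func() { close(ch) }
}

// once wraps cancel so that repeated calls are harmless.
func once(cancel func()) func() {
	var o sync.Once
	return func() { o.Do(cancel) }
}

// panics reports whether calling f panics.
func panics(f func()) (p bool) {
	defer func() { p = recover() != nil }()
	f()
	return false
}

func main() {
	_, cancel := newCancel()
	safe := once(cancel)
	safe()
	safe() // no-op: the wrapped cancel already ran

	fmt.Println(panics(safe))   // the wrapper never calls cancel again
	fmt.Println(panics(cancel)) // calling the raw cancel a second time panics
}
```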

func NewContextWithTimeOut

func NewContextWithTimeOut(t time.Duration) *Context

NewContextWithTimeOut creates a Context that will be canceled after the indicated duration.

func (*Context) AcquireOrCancel

func (ctx *Context) AcquireOrCancel(m Mutex) bool

AcquireOrCancel should be called before accessing shared resources. It returns false if cancelation is signaled before the lock is acquired. Usage pattern:

if ctx.AcquireOrCancel(m) {
        defer ctx.Release()
        // ... access shared resources ...
} else {
        // ... canceled ...
}

A Context can lock multiple Mutexes. Note that a deadlock may occur in this case (e.g. one goroutine locks A then waits to lock B, while another locks B then waits to lock A). To avoid the problem, it is recommended to lock only one Mutex at a time.
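
The lock-or-cancel behavior described above can be approximated with a single select over the Mutex channel and a cancelation channel. This is a minimal sketch, not the package's implementation: Mutex is redeclared locally and acquireOrCancel is a hypothetical free function standing in for the Context method.

```go
package main

import "fmt"

// Mutex is redeclared locally so the sketch is self-contained.
type Mutex chan struct{}

// acquireOrCancel blocks until m is locked or canceled is closed,
// whichever happens first. It reports whether the lock was taken.
func acquireOrCancel(m Mutex, canceled <-chan struct{}) bool {
	select {
	case m <- struct{}{}:
		return true
	case <-canceled:
		return false
	}
}

func main() {
	m := make(Mutex, 1)
	canceled := make(chan struct{})

	// m is free, so the send succeeds immediately.
	fmt.Println(acquireOrCancel(m, canceled))

	// m is now held and never released; once canceled is closed,
	// only the cancel branch can proceed.
	close(canceled)
	fmt.Println(acquireOrCancel(m, canceled))
}
```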

func (*Context) Canceled

func (ctx *Context) Canceled() <-chan struct{}

func (*Context) Release

func (ctx *Context) Release()

Release unlocks the last acquired Mutex.

func (*Context) ReleaseAll

func (ctx *Context) ReleaseAll()

ReleaseAll unlocks all acquired Mutexes in first-in-last-out order, i.e. the most recently acquired Mutex is unlocked first.
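
One way to picture Release and ReleaseAll is as pop operations on a stack of held Mutexes. The sketch below assumes that model; held and its methods are hypothetical names and say nothing about how Context stores its Mutexes internally.

```go
package main

import "fmt"

// Mutex is redeclared locally so the sketch is self-contained.
type Mutex chan struct{}

// held is a hypothetical stand-in for the bookkeeping a Context performs:
// a stack of the Mutexes locked so far.
type held struct{ stack []Mutex }

// acquire locks m and pushes it on the stack.
func (h *held) acquire(m Mutex) {
	m <- struct{}{}
	h.stack = append(h.stack, m)
}

// release unlocks the most recently acquired Mutex, if any.
func (h *held) release() {
	n := len(h.stack)
	if n == 0 {
		return
	}
	<-h.stack[n-1]
	h.stack = h.stack[:n-1]
}

// releaseAll unlocks everything, newest first.
func (h *held) releaseAll() {
	for len(h.stack) > 0 {
		h.release()
	}
}

func main() {
	a, b, c := make(Mutex, 1), make(Mutex, 1), make(Mutex, 1)
	h := &held{}
	h.acquire(a)
	h.acquire(b)
	h.acquire(c)

	h.release()                         // unlocks c only
	fmt.Println(len(a), len(b), len(c)) // a held Mutex has length 1

	h.releaseAll() // unlocks b, then a
	fmt.Println(len(a), len(b), len(c))
}
```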

type ContextCanceledError

type ContextCanceledError struct{}

ContextCanceledError records an error when a Context is canceled before a Request is processed.

func (ContextCanceledError) Error

func (e ContextCanceledError) Error() string

Error returns "Context canceled."

type Dest

type Dest interface {
	Call(ctx *Context, r Request) Response
	Send(r Request) error
}

Dest is an in-memory entity that accepts Requests and returns Responses. It is typically implemented with a Mutex, so that Requests are processed sequentially, and one or more Handlers that do the logic.

Call() is used for synchronous communication: it returns after the Request has been processed. Send() is used for asynchronous communication: it returns immediately and spawns a new goroutine to process the Request.

See ConcurrentDest, LockedDest and MuxDest for more details.
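
The difference between Call and Send can be sketched with a toy Dest. The example is self-contained and deliberately simplified: requests and responses are plain strings rather than Request and Response values, Call takes no Context, and echoDest is a hypothetical type. The WaitGroup exists only so the example can wait for the asynchronous work to finish.

```go
package main

import (
	"fmt"
	"sync"
)

// echoDest is a hypothetical, simplified Dest.
type echoDest struct {
	mu   sync.Mutex     // makes requests run one at a time
	seen []string       // the associated resource guarded by mu
	wg   sync.WaitGroup // lets the example wait for Send to finish
}

// handle records r and builds a reply while holding the lock.
func (d *echoDest) handle(r string) string {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.seen = append(d.seen, r)
	return "echo: " + r
}

// Call processes r in the caller's goroutine and returns the result.
func (d *echoDest) Call(r string) string { return d.handle(r) }

// Send returns immediately; r is processed in a new goroutine.
func (d *echoDest) Send(r string) error {
	d.wg.Add(1)
	go func() {
		defer d.wg.Done()
		d.handle(r)
	}()
	return nil
}

func main() {
	d := &echoDest{}
	fmt.Println(d.Call("ping"))

	_ = d.Send("pong") // returns before "pong" is necessarily handled
	d.wg.Wait()
	fmt.Println(len(d.seen))
}
```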

type DestNotFoundError

type DestNotFoundError string

DestNotFoundError records an error when no Dest is found for a Request's Address().

func (DestNotFoundError) Error

func (e DestNotFoundError) Error() string

Error returns error information with the address.

type Dispatcher

type Dispatcher struct {
	AddressBook
}

Dispatcher dispatches Requests to corresponding Dest by looking up an AddressBook.

Note that Dispatcher is also a Dest. You can build a multi-level Dispatcher easily.
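
To show why a Dispatcher that is itself a Dest composes into several levels, here is a self-contained sketch with simplified stand-in types. A request is reduced to its address string, a response to a string plus an error, and segmentBook, leaf and errNotFound are hypothetical names that do not exist in this package.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// Dest and AddressBook are simplified stand-ins for the package's interfaces.
type Dest interface {
	Call(addr string) (string, error)
}

type AddressBook interface {
	Lookup(addr string) Dest
}

var errNotFound = errors.New("dest not found")

// Dispatcher forwards a call to whatever Dest its AddressBook returns.
// Because it has a Call method, it is a Dest as well.
type Dispatcher struct{ AddressBook }

func (d *Dispatcher) Call(addr string) (string, error) {
	dest := d.Lookup(addr)
	if dest == nil {
		return "", errNotFound
	}
	return dest.Call(addr)
}

// segmentBook picks a Dest by one path segment of the address.
type segmentBook struct {
	index int
	dests map[string]Dest
}

func (b segmentBook) Lookup(addr string) Dest {
	parts := strings.Split(strings.Trim(addr, "/"), "/")
	if b.index >= len(parts) {
		return nil
	}
	return b.dests[parts[b.index]] // nil when the segment is unknown
}

// leaf is a terminal Dest that just reports what it received.
type leaf string

func (l leaf) Call(addr string) (string, error) {
	return string(l) + " got " + addr, nil
}

func main() {
	// The inner Dispatcher routes on the second segment, the outer on the first.
	inner := &Dispatcher{AddressBook: segmentBook{index: 1, dests: map[string]Dest{"alice": leaf("alice")}}}
	outer := &Dispatcher{AddressBook: segmentBook{index: 0, dests: map[string]Dest{"users": inner}}}

	rsp, err := outer.Call("/users/alice")
	fmt.Println(rsp, err)

	_, err = outer.Call("/users/bob")
	fmt.Println(err)
}
```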

func (*Dispatcher) Call

func (d *Dispatcher) Call(ctx *Context, r Request) Response

Call dispatches a call request to a Dest. It returns a Response with a DestNotFoundError if no Dest is found in the AddressBook.

func (*Dispatcher) Send

func (d *Dispatcher) Send(r Request) error

Send dispatches a send request to a Dest. It returns a DestNotFoundError if no Dest is found in the AddressBook.

type Handler

type Handler interface {
	Serve(ctx *Context, m Mutex, r Request) Response
}

Handler is an interface that wraps Serve(), which processes a Request and returns a Response. Typically, a Handler is registered to a Dest and serves a particular protocol.

type HandlerFunc

type HandlerFunc func(ctx *Context, m Mutex, r Request) Response

HandlerFunc is an adapter from an ordinary function to a Handler.

func (HandlerFunc) Serve

func (f HandlerFunc) Serve(ctx *Context, m Mutex, r Request) (rsp Response)

Serve calls f(ctx, m, r). If f panics, Serve returns a Response with a PanicError.
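
The adapter-plus-recover pattern that Serve describes can be sketched as follows. This is a simplified analogue rather than the package's code: handlerFunc takes and returns strings, and a panic is reported as a plain error instead of a Response carrying a PanicError.

```go
package main

import "fmt"

// handlerFunc is a hypothetical, simplified analogue of HandlerFunc.
type handlerFunc func(r string) string

// serve calls f(r) and converts a panic into an error instead of letting it
// crash the calling goroutine.
func (f handlerFunc) serve(r string) (rsp string, err error) {
	defer func() {
		if p := recover(); p != nil {
			err = fmt.Errorf("panic: %v", p)
		}
	}()
	return f(r), nil
}

func main() {
	ok := handlerFunc(func(r string) string { return "hello " + r })
	bad := handlerFunc(func(r string) string { panic("boom") })

	rsp, err := ok.serve("world")
	fmt.Println(rsp, err)

	_, err = bad.serve("world")
	fmt.Println(err)
}
```

Recovering inside the adapter keeps one failing request from taking down the process, which matters when every request runs in its own goroutine.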

type LockedDest

type LockedDest struct {
	Mutex
	// contains filtered or unexported fields
}

LockedDest is a Dest that processes Requests sequentially.

func NewLockedDest

func NewLockedDest(h Handler) *LockedDest

NewLockedDest creates a LockedDest with provided Handler.

func (*LockedDest) Call

func (d *LockedDest) Call(ctx *Context, r Request) Response

Call is for synchronous communication.

func (*LockedDest) Send

func (d *LockedDest) Send(r Request) error

Send is for asynchronous communication.

type LockedHandlerFunc

type LockedHandlerFunc func(r Request) Response

LockedHandlerFunc is an adapter from an ordinary function to an auto-locked Handler.

func (LockedHandlerFunc) Serve

func (f LockedHandlerFunc) Serve(ctx *Context, m Mutex, r Request) (rsp Response)

Serve tries to lock the Mutex and then calls f(r). If ctx is canceled before the lock is acquired, it returns a Response with a ContextCanceledError. If f panics, it returns a Response with a PanicError.

type Mutex

type Mutex chan struct{}

Mutex is implemented as a buffered channel in order to make it selectable. Always create a Mutex by calling NewMutex() to ensure the channel's buffer size is 1. Usage:

m := NewMutex() // create
m <- struct{}{} // lock
<-m             // unlock

// lock or timeout
select {
case m <- struct{}{}:
	// lock acquired
case <-time.After(time.Second):
	// timed out
}

func NewMutex

func NewMutex() Mutex

NewMutex creates an unlocked Mutex.

func (Mutex) Lock

func (m Mutex) Lock()

Lock locks the Mutex.

func (Mutex) Unlock

func (m Mutex) Unlock()

Unlock unlocks the Mutex.

type MuxAddressBook

type MuxAddressBook struct {
	// contains filtered or unexported fields
}

MuxAddressBook is an AddressBook that uses a Mux to look up a Dest.

func NewMuxAddressBook

func NewMuxAddressBook(m *mux.Mux) *MuxAddressBook

NewMuxAddressBook creates a MuxAddressBook with provided Mux.

Note that the passed Mux MUST either be empty or contain only values of type Dest.

func (*MuxAddressBook) Lookup

func (b *MuxAddressBook) Lookup(r Request) Dest

Lookup matches the Request's address against the registered Dests.

func (*MuxAddressBook) Register

func (b *MuxAddressBook) Register(address string, dest Dest)

Register binds a Dest to an address.

type MuxDest

type MuxDest struct {
	Mutex
	// contains filtered or unexported fields
}

MuxDest is a Dest that carries multiple Handlers.

func NewMuxDest

func NewMuxDest(m *mux.Mux) *MuxDest

NewMuxDest creates a MuxDest with the provided Mux. Note that the passed Mux MUST either be empty or contain only values of type Handler.

func (*MuxDest) Call

func (d *MuxDest) Call(ctx *Context, r Request) Response

Call is for synchronous communication. It returns a Response with ProtocolNotImplementError if no registered Handler matches the Request's protocol.

func (*MuxDest) Handle

func (d *MuxDest) Handle(pattern string, h Handler)

Handle binds a Handler to a pattern.

func (*MuxDest) Send

func (d *MuxDest) Send(r Request) error

Send is for asynchronous communication. It returns a ProtocolNotImplementError if no registered Handler matches the Request's protocol.

type PanicError

type PanicError struct {
	// contains filtered or unexported fields
}

PanicError records an error when a panic occurs while calling a Handler.

func (PanicError) Error

func (e PanicError) Error() string

Error returns panic information and debug stack.

type ProtocolNotImplementError

type ProtocolNotImplementError string

ProtocolNotImplementError records an error when no Handler is found for a Request's Protocol().

func (ProtocolNotImplementError) Error

func (e ProtocolNotImplementError) Error() string

Error returns error information with the protocol.

type Request

type Request interface {
	Protocol() string
	Address() string
	Body() *Sink
}

Request represents a request message. Protocol() indicates the Request's type and how it will be processed. Address() indicates where the Request will be sent to. Body() returns Request's ContentType and raw data.

func SimpleRequest

func SimpleRequest(protocol, address string, body *Sink) Request

SimpleRequest creates a simple Request.

type Response

type Response interface {
	Error() error
	Body() *Sink
}

Response represents a response message corresponding to a Request. Error() returns a non-nil error if any error occurs. Body() returns the Response's ContentType and raw data.

func SimpleResponse

func SimpleResponse(body *Sink, err error) Response

SimpleResponse creates a simple Response.

type Sink

type Sink struct {
	ContentType
	// contains filtered or unexported fields
}

A Sink carries binary bytes, a string, or a protobuf message. Internally, the data is converted to []byte.
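
The idea of converting every payload to []byte at construction time can be sketched with the standard encoding/json package. The sink type, its string content-type tag, and the textSink, jsonSink and decode helpers are all hypothetical. Unlike the JsonSink documented below, this jsonSink returns an error, because the documentation does not say how marshaling failures are handled.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// sink is a hypothetical, trimmed-down analogue of Sink: a content-type tag
// plus the payload as raw bytes.
type sink struct {
	contentType string
	data        []byte
}

// textSink stores the string's bytes as-is.
func textSink(s string) *sink {
	return &sink{contentType: "text", data: []byte(s)}
}

// jsonSink marshals v up front, so the sink only ever stores bytes.
func jsonSink(v interface{}) (*sink, error) {
	b, err := json.Marshal(v)
	if err != nil {
		return nil, err
	}
	return &sink{contentType: "json", data: b}, nil
}

// decode unmarshals the stored JSON bytes into v.
func (s *sink) decode(v interface{}) error {
	return json.Unmarshal(s.data, v)
}

// user is an example payload type.
type user struct {
	Name string `json:"name"`
	Age  int    `json:"age"`
}

func main() {
	s, err := jsonSink(user{Name: "Ann", Age: 30})
	if err != nil {
		panic(err)
	}
	fmt.Println(s.contentType, string(s.data))

	var u user
	if err := s.decode(&u); err != nil {
		panic(err)
	}
	fmt.Println(u.Name, u.Age)
	fmt.Println(string(textSink("hi").data))
}
```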

func BytesSink

func BytesSink(b []byte) *Sink

BytesSink creates a Sink by []byte. The Sink's ContentType will be Bytes.

func JsonSink

func JsonSink(j interface{}) *Sink

JsonSink creates a Sink by a struct. The Sink's ContentType will be Json.

func ProtoSink

func ProtoSink(m proto.Message) *Sink

ProtoSink creates a Sink by a protobuf message. The Sink's ContentType will be Protobuf.

func TextSink

func TextSink(v string) *Sink

TextSink creates a Sink by string. The Sink's ContentType will be Text.

func (*Sink) Bytes

func (s *Sink) Bytes() []byte

Bytes returns data as []byte.

func (*Sink) String

func (s *Sink) String() string

String returns data as a string.

func (*Sink) UnmarshalJson

func (s *Sink) UnmarshalJson(v interface{}) error

UnmarshalJson unmarshals JSON data and stores the result in v.

func (*Sink) UnmarshalProtoMessage

func (s *Sink) UnmarshalProtoMessage(m proto.Message) error

UnmarshalProtoMessage unmarshals data to a protobuf message.

func (*Sink) Write

func (s *Sink) Write(w io.Writer)

Write writes data to a Writer.
