universal

package module
v0.0.0-...-204804b Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 26, 2018 License: MIT Imports: 11 Imported by: 0

README

universal Build Status codecov Go Report Card GoDoc

Package universal provides interfaces for expressing message processing logic and utilities that simplify building and running message processing applications. It requires Go 1.9 or newer.

There are many different ways to build a message processing application with Go. However, the basic requirements of those applications are typically the same:

  1. It must accept an incoming message from a message source (e.g. a message queue or database).
  2. It must perform work on the incoming message and write a result.
  3. It must persist the result in a message destination (e.g. another message queue or database).

An additional practical requirement is that the message processing logic must be callable on-demand to provide visibility into the logic (e.g. for troubleshooting unexpected message processing results).

Simple message processor

A Service receives messages from the provided message source and calls the Processor functions for every message in separate goroutines. The Service can also be used as an http.Handler, which calls the Processor using the HTTP request body as the message body and returns the results in the response body.

Example

type MyMessage struct {
	ctx    context.Context
	input  *bytes.Reader
	result *bytes.Buffer
}

func (msg *MyMessage) Context() context.Context    { return msg.ctx }
func (msg *MyMessage) Read(p []byte) (int, error)  { return msg.input.Read(p) }
func (msg *MyMessage) Write(p []byte) (int, error) { return msg.result.Write(p) }

type MyProcessor struct{}

func (proc *MyProcessor) Process(m universal.Message) error {
	// Read the message, perform work, and write the result.
	_, err := m.Write([]byte("result"))
	return err
}

func (proc *MyProcessor) Finish(m universal.Message, procErr error) {
	if procErr != nil {
		// Handle any error returned from Process.
		return
	}
	message := m.(*MyMessage)
	MessageSource.SendMessage(message.result.Bytes())
}

func main() {
	svc := universal.NewService(&MyProcessor{})
	go svc.Run(func() universal.Message {
		// Get the next message from a message source.
		input := MessageSource.GetMessage()
		return &MyMessage{
			ctx:    context.Background(),
			input:  bytes.NewReader(input.Body),
			result: new(bytes.Buffer),
		}
	}, 30*time.Second)
	http.ListenAndServe(":80", svc)
}

RPC message processor

An RPCService receives messages from the provided message source and calls an existing RPC server, passing the RPC method call results to the RPCFinisher function.

Example

type Args struct {
	A, B int
}

type Arith int

func (t *Arith) Multiply(args *Args, reply *int) error {
	*reply = args.A * args.B
	return nil
}

type MyRPCMessage struct {
	ctx     context.Context
	request rpc.Request
	args    interface{}
}

func (rw *MyRPCMessage) Context() context.Context { return rw.ctx }
func (rw *MyRPCMessage) Request() rpc.Request     { return rw.request }
func (rw *MyRPCMessage) Args() interface{}        { return rw.args }

type MyRPCFinisher struct{}

func (rw *MyRPCFinisher) Finish(
	m universal.RPCMessage,
	r universal.RPCResult,
	rpcErr error,
) {
	if rpcErr != nil {
		// Handle any error returned from the RPC method.
		return
	}
	result := r.(*int)
	MessageSource.SendMessage([]byte(strconv.Itoa(result)))
}

func main() {
	server := rpc.NewServer()
	server.Register(new(Arith))

	svc := universal.NewRPCService(&MyRPCFinisher{})
	go svc.Run(func() universal.RPCMessage {
		// Get the next message from a message source.
		input := MessageSource.GetMessage()
		return &MyRPCMessage{
			ctx:     context.Background(),
			request: rpc.Request{ServiceMethod: input.Method},
			args: &Args{
				A: input.A,
				B: input.B,
			},
		}
	}, server, 30*time.Second)

	server.HandleHTTP(rpc.DefaultRPCPath, rpc.DefaultDebugPath)
	http.ListenAndServe(":80", nil)
}

Documentation

Overview

Package universal provides a way to express an atomic unit of work that can be used as a message handler and an HTTP handler. TODO: Also rpc! TODO: All in separate goroutines!

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type HTTPError

type HTTPError interface {
	StatusCode() int
	Header() http.Header
	encoding.BinaryMarshaler
	error
}

An HTTPError provides HTTP-specific response information for an error. It allows setting a specific HTTP status code, header values, and the response body.

type Message

type Message interface {
	Context() context.Context
	// Reader provides the Message input as a readable byte stream.
	io.Reader
	// Writer provides the Message output as a writable byte stream.
	io.Writer
}

Message is a task to be completed by a Processor.

type Processor

type Processor interface {
	// Process accepts a Message, performing all actions necessary to complete
	// the processing of the Message except for persistence of the result to the
	// output data system (e.g. a message queue or a database). The Messages come
	// from either a Service.Run loop or a Service's HTTP handler. All Process calls
	// run in separate goroutines.
	//
	// Any error returned is be passed to the Finish function. If the Message
	// came from an HTTP request, the error is returned to the caller in the
	// HTTP body.
	Process(Message) error

	// Finish handles persistence of the results of processing a Message to the
	// output data system (e.g. a message queue or a database). The processed
	// Message and any error returned while processing the Message are passed
	// to Finish.
	//
	// Finish is only called from a Service.Run loop and is not called for Messages
	// from a Service's HTTP handler. All Finish calls run in separate goroutines
	// (in the same goroutine as the Process call).
	Finish(Message, error)
}

A Processor implements all logic for handling incoming Messages, either from a Service.Run loop or from a Service's HTTP handler.

type RPCFinisher

type RPCFinisher interface {
	Finish(RPCMessage, RPCResult, error)
}

type RPCMessage

type RPCMessage interface {
	Context() context.Context
	Request() rpc.Request
	Args() interface{}
}

type RPCResult

type RPCResult = interface{}

An RPCResult is the result value from an RPC method call. The caller is responsible for type-asserting the value to the type of the result of the called RPC method (the 2nd argument of the RPC method).

type RPCService

type RPCService struct {
	// contains filtered or unexported fields
}

func NewRPCService

func NewRPCService(finisher RPCFinisher) *RPCService

func (*RPCService) Run

func (svc *RPCService) Run(
	next func() RPCMessage,
	server *rpc.Server,
	shutdownTimeout time.Duration,
) error

Run calls the specified RPC method then Finish on all messages returned by the provided RPCMessage source function, blocking until the provided RPCMessage source function returns nil.

When the provided RPCMessage source returns a nil RPCMessage, Run starts to shut down, blocking until all running goroutines started by Run have returned or until the shutdownTimeout is reached, whichever comes first. If the shutdownTimeout is reached, Run returns an error.

type Service

type Service struct {
	// contains filtered or unexported fields
}

A Service passes Messages to the Processor. Service provides a Run method for providing Messages directly to the Service and an HTTP handler that passes HTTP request bodies as Messages.

func NewService

func NewService(processor Processor) *Service

NewService creates a new Service that uses the provided Processor to process Messages.

func (*Service) Run

func (svc *Service) Run(next func() Message, shutdownTimeout time.Duration) error

Run calls Process then Finish on all Messages returned by the provided Message source function, blocking until the provided Message source function returns nil.

When the provided Message source returns a nil Message, Run starts to shut down, blocking until all running goroutines started by Run have returned or until the shutdownTimeout is reached, whichever comes first. If the shutdownTimeout is reached, Run returns an error.

func (*Service) ServeHTTP

func (svc *Service) ServeHTTP(writer http.ResponseWriter, request *http.Request)

ServeHTTP is an HTTP handler that calls the Service's Process function with the HTTP request as a Message. All body unmarshalling and response marshalling is performed by the Process function, just like in the Service.Run loop.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL