amqprpc

v0.0.0-...-875fc58
Published: Feb 20, 2017 License: Apache-2.0 Imports: 15 Imported by: 0

README

amqprpc

Project amqprpc provides access to the exported methods of an object across a RabbitMQ connection. It is meant as a near drop-in replacement for the built-in, TCP-based net/rpc package, so most of that package's characteristics and limitations are kept intact. The major difference is the use of message queues for transport: one well-defined queue for requests, and a temporary queue, established internally by the client, for responses. The implementation is intended to follow, to a certain extent, the RabbitMQ tutorial Remote procedure call (RPC) (http://www.rabbitmq.com/tutorials/tutorial-six-go.html).
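
The queue layout described above (one shared request queue, one private reply queue per client) matches the reply-to and correlation-ID pattern of the RabbitMQ RPC tutorial. The sketch below imitates that flow with Go channels standing in for broker queues. The names request, response, serve and call are hypothetical, and nothing here is taken from amqprpc's actual implementation.

```go
package main

import "fmt"

// request imitates a message published to the shared request queue.
type request struct {
	CorrelationID string          // lets the client match a reply to its call
	ReplyTo       chan<- response // stands in for the client's temporary reply queue
	A, B          int
}

// response imitates a message published to a reply queue.
type response struct {
	CorrelationID string
	Product       int
}

// serve consumes the request queue and answers on each request's reply queue.
func serve(requests <-chan request) {
	for req := range requests {
		req.ReplyTo <- response{CorrelationID: req.CorrelationID, Product: req.A * req.B}
	}
}

// call publishes one request and waits for the reply on a private queue.
func call(requests chan<- request, id string, a, b int) (int, error) {
	replies := make(chan response, 1) // private, per-client reply queue
	requests <- request{CorrelationID: id, ReplyTo: replies, A: a, B: b}
	resp := <-replies
	if resp.CorrelationID != id {
		return 0, fmt.Errorf("unexpected correlation ID %q", resp.CorrelationID)
	}
	return resp.Product, nil
}

func main() {
	requests := make(chan request)
	go serve(requests)

	product, err := call(requests, "call-1", 7, 8)
	if err != nil {
		panic(err)
	}
	fmt.Println(product) // prints: 56
}
```

With a real broker the reply queue and the correlation ID travel as message properties rather than struct fields, but the matching step is the same.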

Status

Beta

Usage

Go get
go get -u github.com/go-amqprpc/amqprpc
Import
import "github.com/go-amqprpc/amqprpc"
Examples

A simple use case, adapted from net/rpc, looks as follows with queue-based RPC:

A server wishes to export an object of type Arith:

package X

import "errors"

type Args struct {
	A, B int
}

type Quotient struct {
	Quo, Rem int
}

type Arith int

func (t *Arith) Multiply(args *Args, reply *int) error {
	*reply = args.A * args.B
	return nil
}

func (t *Arith) Divide(args *Args, quo *Quotient) error {
	if args.B == 0 {
		return errors.New("divide by zero")
	}
	quo.Quo = args.A / args.B
	quo.Rem = args.A % args.B
	return nil
}

The server then starts the AMQP service:

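package X

import "github.com/go-amqprpc/amqprpc"

const (
	RPCQueueName = "rpc_queue"
	AMQPURI      = "amqp://guest:guest@127.0.0.1:5672//"
)

// The statements below belong inside a function such as main.
arith := new(Arith)
server, err := amqprpc.NewServer(AMQPURI)
if err != nil {
	panic(err)
}
defer server.Close()

// Register reports an error if Arith has no suitable methods.
if err := server.Register(RPCQueueName, arith); err != nil {
	panic(err)
}
// Listen starts the registered services asynchronously.
if err := server.Listen(); err != nil {
	panic(err)
}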

At this point, clients can see a service Arith with methods Arith.Multiply and Arith.Divide. To invoke one, a client first connects to the broker:

package X

import "github.com/go-amqprpc/amqprpc"

client, err := amqprpc.NewClient(AMQPURI)
if err != nil {
	panic(err)
}
defer client.Close()

Then it can make a remote call:

// Synchronous call

args := &Args{A: 7, B: 8}
var reply int
err = client.Call(RPCQueueName, "Arith.Multiply", args, &reply)
if err != nil {
	panic(err)
}
fmt.Printf("Arith: %d*%d=%d\n", args.A, args.B, reply)

or

// Asynchronous call

quotient := new(Quotient)
divCall := client.Go(RPCQueueName, "Arith.Divide", args, quotient, nil)
replyCall := <-divCall.Done // will be equal to divCall
// check replyCall.Error, print quotient, etc.
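
The "check errors" step can be sketched as follows. Because a broker is needed to run amqprpc, this sketch uses the standard net/rpc package over an in-memory net.Pipe instead, so Go takes no queue key here. It assumes that amqprpc's Call carries the outcome in the same Error and Reply fields, as the type documentation suggests. The helpers newPipeClient and divide are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"net/rpc"
)

type Args struct{ A, B int }
type Quotient struct{ Quo, Rem int }
type Arith int

func (t *Arith) Divide(args *Args, quo *Quotient) error {
	if args.B == 0 {
		return errors.New("divide by zero")
	}
	quo.Quo = args.A / args.B
	quo.Rem = args.A % args.B
	return nil
}

// newPipeClient serves Arith on one end of an in-memory pipe and returns
// a net/rpc client attached to the other end.
func newPipeClient() *rpc.Client {
	srv := rpc.NewServer()
	if err := srv.Register(new(Arith)); err != nil {
		panic(err)
	}
	serverConn, clientConn := net.Pipe()
	go srv.ServeConn(serverConn)
	return rpc.NewClient(clientConn)
}

// divide starts an asynchronous call, waits for it, and inspects the result.
func divide(client *rpc.Client, a, b int) (Quotient, error) {
	quotient := new(Quotient)
	divCall := client.Go("Arith.Divide", &Args{A: a, B: b}, quotient, nil)
	replyCall := <-divCall.Done // the same *Call that Go returned
	if replyCall.Error != nil {
		return Quotient{}, replyCall.Error
	}
	return *quotient, nil
}

func main() {
	client := newPipeClient()
	defer client.Close()

	q, err := divide(client, 17, 5)
	fmt.Println(q.Quo, q.Rem, err) // prints: 3 2 <nil>

	_, err = divide(client, 1, 0)
	fmt.Println(err) // prints: divide by zero
}
```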

License

Apache License v2 - see LICENSE for more details.

Documentation

Overview

Project `amqprpc` provides access to the exported methods of an object across a RabbitMQ connection. It is meant as a near drop-in replacement for the built-in, TCP-based `net/rpc` package, so most of that package's characteristics and limitations are kept intact. The major difference is the use of message queues for transport: one well-defined queue for requests, and a temporary queue, established internally by the client, for responses. The implementation is intended to follow, to a certain extent, the RabbitMQ tutorial http://www.rabbitmq.com/tutorials/tutorial-six-go.html

Types

type Call

type Call struct {
	ServiceMethod string      // The name of the service and method to call.
	Args          interface{} // The argument to the function (*struct).
	Reply         interface{} // The reply from the function (*struct).
	Error         error       // After completion, the error status.
	Done          chan *Call  // Strobes when call is complete
	// contains filtered or unexported fields
}

Call represents an active RPC.

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is an RPC client.

func NewClient

func NewClient(url string) (*Client, error)

NewClient returns a new Client that sends requests to the set of services reachable through the broker at the AMQP URI url.

func NewClientConfig

func NewClientConfig(url string, config *ClientConfig) (*Client, error)

NewClientConfig returns a new Client that sends requests to the set of services reachable through the broker at the AMQP URI url, customised with the parameters in config.

func (*Client) Call

func (client *Client) Call(key, serviceMethod string, args interface{}, reply interface{}) error

Call invokes the named function, waits for it to complete, and returns its error status.

func (*Client) Close

func (client *Client) Close()

Close sends a close message to the underlying connection and empties the internal pool of pending messages.

func (*Client) Go

func (client *Client) Go(key, serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call

Go invokes the function asynchronously. It returns the Call structure representing the invocation. The done channel will signal when the call is complete by returning the same Call object. If done is nil, Go will allocate a new channel. If non-nil, done must be buffered or Go will deliberately crash.
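
A non-nil done channel is useful for collecting several calls in one place. The sketch below shows that pattern with the standard net/rpc package over an in-memory pipe (an assumption made so it runs without a broker, which is why Go takes no queue key). The done channel is buffered with room for every outstanding call. The helpers newPipeClient and multiplyAll are hypothetical.

```go
package main

import (
	"fmt"
	"net"
	"net/rpc"
)

type Args struct{ A, B int }
type Arith int

func (t *Arith) Multiply(args *Args, reply *int) error {
	*reply = args.A * args.B
	return nil
}

// newPipeClient serves Arith on one end of an in-memory pipe and returns
// a net/rpc client attached to the other end.
func newPipeClient() *rpc.Client {
	srv := rpc.NewServer()
	if err := srv.Register(new(Arith)); err != nil {
		panic(err)
	}
	serverConn, clientConn := net.Pipe()
	go srv.ServeConn(serverConn)
	return rpc.NewClient(clientConn)
}

// multiplyAll issues one asynchronous call per pair and collects all of them
// from a single done channel with capacity for every outstanding call.
func multiplyAll(client *rpc.Client, pairs []Args) (int, error) {
	done := make(chan *rpc.Call, len(pairs))
	for i := range pairs {
		client.Go("Arith.Multiply", &pairs[i], new(int), done)
	}
	sum := 0
	for range pairs {
		call := <-done // completions may arrive in any order
		if call.Error != nil {
			return 0, call.Error
		}
		sum += *call.Reply.(*int)
	}
	return sum, nil
}

func main() {
	client := newPipeClient()
	defer client.Close()

	sum, err := multiplyAll(client, []Args{{1, 2}, {3, 4}, {5, 6}})
	fmt.Println(sum, err) // prints: 44 <nil>
}
```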

type ClientCodec

type ClientCodec interface {
	// ClientCodec returns a one-time codec for serializing requests
	ClientCodec(*bytes.Buffer) rpc.ClientCodec
	// ContentType is MIME content type for messages encoded with the codec
	ContentType() string
}
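
A custom codec only needs the two methods above. The sketch below is one possible JSON variant built on the standard net/rpc/jsonrpc package. The ClientCodec interface is redeclared locally so the example compiles without amqprpc, and JSONClientCodec, bufferConn and encodeRequest are hypothetical names. How amqprpc itself reads from and writes to the buffer is not documented here, so treat the buffer handling as an assumption.

```go
package main

import (
	"bytes"
	"fmt"
	"net/rpc"
	"net/rpc/jsonrpc"
)

// ClientCodec is a local copy of the documented interface.
type ClientCodec interface {
	ClientCodec(*bytes.Buffer) rpc.ClientCodec
	ContentType() string
}

// bufferConn adapts a *bytes.Buffer to the io.ReadWriteCloser that
// jsonrpc.NewClientCodec expects; Close is a no-op.
type bufferConn struct{ *bytes.Buffer }

func (bufferConn) Close() error { return nil }

// JSONClientCodec is a hypothetical factory producing JSON-RPC codecs.
type JSONClientCodec struct{}

func (JSONClientCodec) ClientCodec(buf *bytes.Buffer) rpc.ClientCodec {
	return jsonrpc.NewClientCodec(bufferConn{buf})
}

func (JSONClientCodec) ContentType() string { return "application/json" }

// Compile-time check that the factory satisfies the interface.
var _ ClientCodec = JSONClientCodec{}

// encodeRequest serializes a single request with a one-time codec and
// returns the bytes that would form the message body.
func encodeRequest(c ClientCodec, method string, param interface{}) (string, error) {
	var buf bytes.Buffer
	codec := c.ClientCodec(&buf)
	req := &rpc.Request{ServiceMethod: method, Seq: 1}
	if err := codec.WriteRequest(req, param); err != nil {
		return "", err
	}
	return buf.String(), nil
}

func main() {
	body, err := encodeRequest(JSONClientCodec{}, "Arith.Multiply", map[string]int{"A": 7, "B": 8})
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s: %s", JSONClientCodec{}.ContentType(), body)
}
```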

type ClientConfig

type ClientConfig struct {
	// PoolSize is the number of threads sending requests to the server
	// and processing responses
	PoolSize uint
	// TLS is the TLS (SSL) connection configuration
	TLS *tls.Config
	// Codec is the encoder/decoder used for serialization of requests
	Codec ClientCodec
}

type GobClientCodec

type GobClientCodec struct {
	// contains filtered or unexported fields
}

func (GobClientCodec) ClientCodec

func (c GobClientCodec) ClientCodec(buf *bytes.Buffer) rpc.ClientCodec

func (GobClientCodec) ContentType

func (c GobClientCodec) ContentType() string

type GobServerCodec

type GobServerCodec struct {
	// contains filtered or unexported fields
}

func (GobServerCodec) ContentType

func (c GobServerCodec) ContentType() string

func (GobServerCodec) ServerCodec

func (c GobServerCodec) ServerCodec(in *bytes.Buffer, out *bytes.Buffer) rpc.ServerCodec

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server is an AMQP-RPC server wrapper.

func NewServer

func NewServer(url string) (*Server, error)

NewServer returns a new Server.

func NewServerConfig

func NewServerConfig(url string, config *ServerConfig) (*Server, error)

NewServerConfig returns a new Server customised by the ServerConfig config: MaxAttempts is the maximum number of attempts to reconnect to the broker after a failure, ConnDelay is the delay between connection attempts, and TLS is the TLS connection configuration.
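
The roles of MaxAttempts and ConnDelay can be pictured as a bounded retry loop. The sketch below is a generic illustration, not amqprpc's reconnection code: connectWithRetry and flakyDial are hypothetical, the delay is expressed as a time.Duration because the unit of ConnDelay is not stated here, and the treatment of values below 1 is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// connectWithRetry calls dial until it succeeds or maxAttempts is reached,
// sleeping for delay after each failed attempt except the last one.
// It returns the number of attempts made.
func connectWithRetry(maxAttempts int, delay time.Duration, dial func() error) (int, error) {
	if maxAttempts < 1 {
		return 0, errors.New("maxAttempts must be at least 1")
	}
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if err = dial(); err == nil {
			return attempt, nil
		}
		if attempt < maxAttempts {
			time.Sleep(delay)
		}
	}
	return maxAttempts, fmt.Errorf("giving up after %d attempts: %w", maxAttempts, err)
}

// flakyDial returns a dial function that fails the first `failures` times.
func flakyDial(failures int) func() error {
	calls := 0
	return func() error {
		calls++
		if calls <= failures {
			return errors.New("broker unavailable")
		}
		return nil
	}
}

func main() {
	attempts, err := connectWithRetry(5, 10*time.Millisecond, flakyDial(2))
	fmt.Println(attempts, err) // prints: 3 <nil>

	_, err = connectWithRetry(2, 10*time.Millisecond, flakyDial(5))
	fmt.Println(err) // prints: giving up after 2 attempts: broker unavailable
}
```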

func (*Server) Close

func (server *Server) Close()

Close sends a request to interrupt the underlying connection.

func (*Server) ConnError

func (server *Server) ConnError() error

ConnError returns the last error reported by the underlying connection attempt.

func (*Server) Listen

func (server *Server) Listen() error

Listen starts all configured services asynchronously.

func (*Server) Register

func (s *Server) Register(key string, rcvr interface{}) error

Register is a queue-key-oriented wrapper around net/rpc's Server.Register. For the given queue key, it configures the set of methods of the receiver value that satisfy the following conditions:

  • exported method of exported type
  • two arguments, both of exported type
  • the second argument is a pointer
  • one return value, of type error

It returns an error if the receiver is not an exported type or has no suitable methods. It also logs the error using package log. The client accesses each method using a string of the form "Type.Method", where Type is the receiver's concrete type.
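
The conditions above are those of net/rpc, so they can be tried out with the standard library alone. In this sketch Arith qualifies, while the hypothetical type Broken does not because its only method returns int instead of error. The helper register is also hypothetical and calls net/rpc directly rather than amqprpc.

```go
package main

import (
	"fmt"
	"net/rpc"
)

type Args struct{ A, B int }

// Arith has one method of the required shape, so registration succeeds.
type Arith int

func (t *Arith) Multiply(args *Args, reply *int) error {
	*reply = args.A * args.B
	return nil
}

// Broken has no suitable method: Multiply returns int instead of error.
type Broken int

func (t *Broken) Multiply(args *Args, reply *int) int {
	*reply = args.A * args.B
	return *reply
}

// register reports whether net/rpc accepts rcvr as a service.
func register(rcvr interface{}) error {
	return rpc.NewServer().Register(rcvr)
}

func main() {
	fmt.Println(register(new(Arith)) == nil)  // prints: true
	fmt.Println(register(new(Broken)) == nil) // prints: false
}
```

The failed registration is also written to the standard logger, which matches the logging behaviour described above.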

func (*Server) RegisterConfig

func (s *Server) RegisterConfig(qn, name string, rcvr interface{}, conf *ServiceConfig) error

func (*Server) RegisterName

func (s *Server) RegisterName(key, name string, rcvr interface{}) error

RegisterName is like Register but uses the provided name for the type instead of the receiver's concrete type.
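
The effect of the name argument can be seen with net/rpc's own RegisterName, used here over an in-memory pipe as a stand-in for amqprpc (an assumption, so no queue key appears). The service is registered as "Calc", a hypothetical name, so clients must call "Calc.Multiply" rather than "Arith.Multiply".

```go
package main

import (
	"fmt"
	"net"
	"net/rpc"
)

type Args struct{ A, B int }
type Arith int

func (t *Arith) Multiply(args *Args, reply *int) error {
	*reply = args.A * args.B
	return nil
}

// newNamedClient registers Arith under the given service name on one end of
// an in-memory pipe and returns a client attached to the other end.
func newNamedClient(name string) *rpc.Client {
	srv := rpc.NewServer()
	if err := srv.RegisterName(name, new(Arith)); err != nil {
		panic(err)
	}
	serverConn, clientConn := net.Pipe()
	go srv.ServeConn(serverConn)
	return rpc.NewClient(clientConn)
}

func main() {
	client := newNamedClient("Calc")
	defer client.Close()

	var reply int
	err := client.Call("Calc.Multiply", &Args{7, 8}, &reply)
	fmt.Println(reply, err) // prints: 56 <nil>

	// The concrete type name is no longer a valid service name.
	err = client.Call("Arith.Multiply", &Args{7, 8}, &reply)
	fmt.Println(err != nil) // prints: true
}
```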

type ServerCodec

type ServerCodec interface {
	// ServerCodec returns a one-time codec used in rpc.Server.ServeRequest
	// call with input byte buffer (invoked with bytes.NewBuffer(d.Body))
	// and a transient output byte buffer for writing a response back to
	// the message broker
	ServerCodec(*bytes.Buffer, *bytes.Buffer) rpc.ServerCodec
	// ContentType is MIME content type for messages encoded with the codec
	ContentType() string
}

ServerCodec is a synchronous codec used to decode amqp.Delivery Body using rpc.ServerCodec
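
The interaction with rpc.Server.ServeRequest described in the interface comments can be sketched with the standard library only. The example below builds a hypothetical JSONServerCodec on net/rpc/jsonrpc, feeds it a request body through the input buffer, and reads the response from the output buffer. bufferPair, serveBody and multiply are illustrative helpers, and the JSON request literal plays the role of a delivery body; amqprpc's own wiring may differ.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/rpc"
	"net/rpc/jsonrpc"
)

type Args struct{ A, B int }
type Arith int

func (t *Arith) Multiply(args *Args, reply *int) error {
	*reply = args.A * args.B
	return nil
}

// bufferPair reads the request from in and writes the response to out,
// giving jsonrpc.NewServerCodec the io.ReadWriteCloser it expects.
type bufferPair struct{ in, out *bytes.Buffer }

func (p bufferPair) Read(b []byte) (int, error)  { return p.in.Read(b) }
func (p bufferPair) Write(b []byte) (int, error) { return p.out.Write(b) }
func (p bufferPair) Close() error                { return nil }

// JSONServerCodec is a hypothetical factory with the ServerCodec method set.
type JSONServerCodec struct{}

func (JSONServerCodec) ServerCodec(in, out *bytes.Buffer) rpc.ServerCodec {
	return jsonrpc.NewServerCodec(bufferPair{in, out})
}

func (JSONServerCodec) ContentType() string { return "application/json" }

// serveBody handles one request body synchronously and returns the
// encoded response body.
func serveBody(srv *rpc.Server, body []byte) ([]byte, error) {
	var out bytes.Buffer
	codec := JSONServerCodec{}.ServerCodec(bytes.NewBuffer(body), &out)
	if err := srv.ServeRequest(codec); err != nil {
		return nil, err
	}
	return out.Bytes(), nil
}

// multiply runs Arith.Multiply through serveBody and decodes the result.
func multiply(a, b int) (int, error) {
	srv := rpc.NewServer()
	if err := srv.Register(new(Arith)); err != nil {
		return 0, err
	}
	req := fmt.Sprintf(`{"method":"Arith.Multiply","params":[{"A":%d,"B":%d}],"id":1}`, a, b)
	resp, err := serveBody(srv, []byte(req))
	if err != nil {
		return 0, err
	}
	var decoded struct{ Result int }
	if err := json.Unmarshal(resp, &decoded); err != nil {
		return 0, err
	}
	return decoded.Result, nil
}

func main() {
	product, err := multiply(7, 8)
	fmt.Println(product, err) // prints: 56 <nil>
}
```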

type ServerConfig

type ServerConfig struct {
	TLS         *tls.Config
	MaxAttempts int
	ConnDelay   uint
}

type ServiceConfig

type ServiceConfig struct {
	Queue    func(*amqp.Channel) (amqp.Queue, error)
	Codec    ServerCodec
	PoolSize uint
}
