rpcmq

package module
v0.0.0-...-8a85d2b Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 13, 2016 License: BSD-3-Clause Imports: 11 Imported by: 6

README

rpcmq GoDoc

Introduction

The package rpcmq implements an RPC protocol over AMQP.

Installation

go get github.com/jroimartin/rpcmq

Documentation

godoc github.com/jroimartin/rpcmq

Examples

The following snippets show how easy is to implement both an RPC client and server using rpcmq. In this example the server registers a new method called "toUpper" that takes an string and convert it to uppercase. On the other hand, the RPC client will invoke this method remotely. Then, after 10 seconds, the client and the server will shutdown.

server.go

package main

import (
	"log"
	"strings"
	"time"

	"github.com/jroimartin/rpcmq"
)

func main() {
	s := rpcmq.NewServer("amqp://amqp_broker:5672",
		"rpc-queue", "rpc-exchange", "direct")
	if err := s.Register("toUpper", toUpper); err != nil {
		log.Fatalf("Register: %v", err)
	}
	if err := s.Init(); err != nil {
		log.Fatalf("Init: %v", err)
	}
	defer s.Shutdown()

	time.Sleep(1 * time.Minute)
}

func toUpper(id string, data []byte) ([]byte, error) {
	log.Printf("Received (%v): toUpper(%v)\n", id, string(data))
	return []byte(strings.ToUpper(string(data))), nil
}

client.go

package main

import (
	"log"
	"time"

	"github.com/jroimartin/rpcmq"
)

func main() {
	c := rpcmq.NewClient("amqp://amqp_broker:5672",
		"rpc-queue", "rpc-client", "rpc-exchange", "direct")
	if err := c.Init(); err != nil {
		log.Fatalf("Init: %v", err)
	}
	defer c.Shutdown()

	go func() {
		for {
			data := []byte("Hello gophers!")
			uuid, err := c.Call("toUpper", data, 0)
			if err != nil {
				log.Println("Call:", err)
			}
			log.Printf("Sent: toUpper(%v) (%v)\n", string(data), uuid)
			time.Sleep(500 * time.Millisecond)
		}
	}()

	go func() {
		for r := range c.Results() {
			if r.Err != "" {
				log.Printf("Received error: %v (%v)", r.Err, r.UUID)
				continue
			}
			log.Printf("Received: %v (%v)\n", string(r.Data), r.UUID)
		}
	}()

	time.Sleep(1 * time.Minute)
}

This code will generate the following output:

screen shot 2015-03-03 at 11 56 33

Documentation

Overview

Package rpcmq implements an RPC protocol over AMQP.

Client/Server initialization

It is important to note that both clients and servers must be initialized before being used. Also, any configuration parameter (TLSConfig, Parallel, Prefetch, RateLimit and DeliveryMode) should be set up before calling Init().

SSL/TLS

When connecting to the broker via amqps protocol, the TLS configuration can be set up using the TLSConfig parameter present in the Client and Server objects. For more information, see the documentation of the package "crypto/tls".

Index

Constants

View Source
const (
	// Persistent delivery mode means that messages will be restored to
	// durable queues during server restart.
	Persistent = DeliveryMode(amqp.Persistent)

	// Transient delivery mode means higher throughput but messages will
	// not be restored on broker restart.
	Transient = DeliveryMode(amqp.Transient)
)

Delivery modes.

Variables

Log is the logger used to register warnings and info messages. If it is nil, no messages will be logged.

View Source
var RetrySleep = 2 * time.Second

RetrySleep is the time between retries if the connection with the broker is lost.

Functions

This section is empty.

Types

type Client

type Client struct {

	// DeliveryMode allows to configure the delivery mode followed by the
	// broker. The default mode is Persistent.
	DeliveryMode DeliveryMode

	// TLSConfig allows to configure the TLS parameters used to connect to
	// the broker via amqps.
	TLSConfig *tls.Config
	// contains filtered or unexported fields
}

A Client is an RPC client, which is used to invoke remote procedures.

func NewClient

func NewClient(uri, msgsQueue, repliesQueue, exchange, kind string) *Client

NewClient returns a reference to a Client object. The paremeter uri is the network address of the broker and msgsQueue/repliesQueue are the names of queues that will be created to exchange the messages between clients and servers. On the other hand, the parameters exchange and kind determine the type of exchange that will be created. In fanout mode the queue name is ignored, so each queue has its own unique id.

func (*Client) Call

func (c *Client) Call(method string, data []byte, ttl time.Duration) (id string, err error)

Call invokes the remote procedure specified by the parameter method, being the parameter data the input passed to it. On the other hand, ttl is the time that this task will remain in the queue before being considered dead. The returned id can be used to identify the result corresponding to each invokation. If ttl is 0, the message will not expire.

func (*Client) Init

func (c *Client) Init() error

Init initializes the Client object. It establishes the connection with the broker, creating a channel and the queues that will be used under the hood.

func (*Client) Results

func (c *Client) Results() <-chan Result

Results returns a channel used to receive the results returned by the invoked procedures.

func (*Client) Shutdown

func (c *Client) Shutdown()

Shutdown shuts down the client gracefully. Using this method will ensure that all replies sent by the RPC servers to the client will be received by the latter.

type DeliveryMode

type DeliveryMode uint8

DeliveryMode represents the boker's delivery mode.

type Function

type Function func(id string, data []byte) ([]byte, error)

Function declares the signature of the methods that can be registered by an RPC server. The id parameter contains the uuid of the task being executed.

type Result

type Result struct {
	UUID string
	Data []byte
	Err  string
}

A Result contains the data returned by the invoked procedure or an error message, in case that it finished with error. The UUID allows to link the result with the procedure call.

type Server

type Server struct {

	// DeliveryMode allows to configure the delivery mode followed by the
	// broker. The default mode is Persistent.
	DeliveryMode DeliveryMode

	// Parallel allows to define the number of methods to be run in
	// parallel.
	Parallel int

	// Prefetch allows to define the number of tasks to be "cached".
	Prefetch int

	// RateLimit allows to define a limit of deliveries handled per second.
	RateLimit time.Duration

	// TLSConfig allows to configure the TLS parameters used to connect to
	// the broker via amqps.
	TLSConfig *tls.Config
	// contains filtered or unexported fields
}

A Server is an RPC sever, which is used to register the methods than can be invoked remotely.

func NewServer

func NewServer(uri, msgsQueue, exchange, kind string) *Server

NewServer returns a reference to a Server object. The paremeter uri is the network address of the broker and msgsQueue is the name of queue that will be created to exchange the messages between clients and servers. On the other hand, the parameters exchange and kind determine the type of exchange that will be created. In fanout mode the queue name is ignored, so each queue has its own unique id.

func (*Server) Init

func (s *Server) Init() error

Init initializes the Server object. It establishes the connection with the broker, creating a channel and the queues that will be used under the hood.

func (*Server) Register

func (s *Server) Register(method string, f Function) error

Register registers a method with the name given by the parameter method and links the function f to it. Register should be called before Init() to avoid dropping messages due to "not registered method" errors.

func (*Server) Shutdown

func (s *Server) Shutdown()

Shutdown shuts down the server gracefully. Using this method will ensure that all requests sent by the RPC clients to the server will be handled by the latter.

Directories

Path Synopsis
_examples
sum
tls

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL