rmq

package module
v0.0.0-...-32f4539

Warning: This package is not in the latest version of its module.

Published: Mar 24, 2018 License: Apache-2.0 Imports: 1 Imported by: 0

README

rmq: R Messaging and Queuing

installation

pre-reqs:

a) make sure you have a working go (golang) installation.

b) make sure you have a working gcc (C compiler) installed.

c) make sure you have R installed.

d) download rmq

$ go get -d github.com/glycerine/rmq

e) tell the C compiler where to find your R headers.

Locate your 'R.h' header wherever it lives in your filesystem (e.g. with find / -name 'R.h' if you have to), then add a '-I' flag pointing at its directory to the gcc lines in src/Makefile at these two places, so the compiler can find 'R.h' and the related headers:

https://github.com/glycerine/rmq/blob/master/src/Makefile#L15

https://github.com/glycerine/rmq/blob/master/src/Makefile#L26

For example, if your R.h header has been installed in /usr/local/lib/R/include, then you would have the Makefile lines look like this:

gcc -fPIC -O2 -c -o interface.o cpp/interface.cpp -Iinclude/ -I/usr/local/lib/R/include

f) then build:

$ cd $GOPATH/src/github.com/glycerine/rmq
$ make install
$ R
> require(rmq)
>

this should also work, once you have done make install to build vendor.tar.gz:

$ cd $GOPATH/src/github.com/glycerine
$ R CMD INSTALL rmq

docs

pdf in R documentation format

godoc API reference

RMQ, or: How to utilize Go libraries from R.

The much-anticipated Go 1.5 release brought strong support for building C-style shared libraries (.so files) from Go source code and libraries (via `go build -buildmode=c-shared`).

This is huge. It opens up many exciting new possibilities. In this proof-of-concept project (rmq), we explore using this new capability to extend R with Go libraries.

Package rmq provides messaging based on msgpack and websockets. It demonstrates calling from R into Golang (Go) libraries to extend R with functionality available in Go.

We use the codec package from https://github.com/ugorji/go for msgpack encoding and decoding. It is a high-performance implementation. We run it in a mode that supports only the updated msgpack 2 (current) spec, which has separate types for UTF-8 strings and binary blobs. This is critical for interoperability with other compiled languages that distinguish between the two (otherwise embedded '\0' zeros in blobs cause problems).

For websockets, we use the terrific https://github.com/gorilla/websocket library, which supports securing your communication with TLS certificates. As time permits, we may add more features aimed at message queuing as well.

## Status

Excellent. Tested on OSX and Linux. Documentation has been written and is available. The package is functionally complete for RPC over websockets and msgpack-based serialization. After some interactive use, I added SIGINT handling so that the web server can be stopped during development with a simple Ctrl-c at the R console. The client side blocks during calls (it does not poll back to R while waiting on the network), but it has a configurable timeout (default 5 seconds) that allows easy client-side error handling.

example R session, showing the msgpack library at work

> library(rmq)
> input = list() # make an R object to serialize.
> input$Blob = as.raw(c(0xff,0xf0,0x06))
> input$D = c("hello","world")
> input$E = c(32, 17)
> o=to.msgpack(input)
> o # look at the raw bytes in msgpack format
 [1] 83 a4 42 6c 6f 62 c4 03 ff f0 06 a1 44 92 a5 68 65 6c 6c 6f a5 77 6f 72 6c
[26] 64 a1 45 92 cb 40 40 00 00 00 00 00 00 cb 40 31 00 00 00 00 00 00
> from.msgpack(o) # now the inverse
$Blob
[1] ff f0 06

$D
[1] "hello" "world"

$E
[1] 32 17

> 

### sample session showing websocket-based RPC, from both the client and the server side:

server-side:

> require(rmq) 
> handler <- function(x) {
        print("handler called back with argument x = ")
        print(x)
        reply = list()
        reply$hi = "there!"
        reply$yum = c(1.1, 2.3)
        reply$input = x
        reply
    }
> listenAndServe(handler, addr = "127.0.0.1:9090")
ListenAndServe listening on address '127.0.0.1:9090'...
[1] "handler called back with argument x = "
$hello
[1] "cran"  "this"  "is"    "great"

  [press Ctrl-c to stop the web-server]
>

client-side:

> require(rmq)
> my.message=list()
> my.message$hello =c("cran","this","is","great")
> rmq.call(addr = "127.0.0.1:9090", my.message)
$hi
[1] "there!"
$input
$input$hello
[1] "cran"  "this"  "is"    "great"
$yum
[1] 1.1 2.3
> 

additional example scripts

I've added two example scripts, example-server.R and example-client.R. These live in the top level of the repo. Run example-server.R first. Then in a different window, run example-client.R. These are simultaneously bash scripts and R source-able scripts; you can run them straight from the shell if 'R' is on your PATH.

And the reverse: embedding R inside your Golang program

In addition to using a Golang library under R, one can alternatively embed R as a library inside a Go executable. The SexpToIface() function makes this equally easy by converting R results into Go values. Here is an example, taken from main() in the main source file src/rmq/rmq.go.

func main() {
        // Give an example also of how to embed R in a Go program.

        // Introduction to embedding R:
        //
        // While RMQ is mainly designed to embed Go under R, it
        // defines functions that make embedding R in Go
        // quite easy too. We use SexpToIface() to generate
        // a Go interface{} value. For simple uses, this may be
        // more than enough.
        //
        // If you wish to turn results into
        // a pre-defined Go structure, the interface{} value could be
        // transformed into msgpack (as in encodeRIntoMsgpack())
        // and from there automatically parsed into Go structures
        // if you define the Go structures and use
        // https://github.com/tinylib/msgp to generate the
        // go struct <-> msgpack encoding/decoding boilerplate.
        // The tinylib/msgp library uses go generate and is
        // blazing fast. This also avoids maintaining a separate
        // IDL file. Your Go source code becomes the defining document
        // for your data structures.

        var iface interface{}
        C.callInitEmbeddedR()
        myRScript := "rnorm(100)" // generate 100 Gaussian(0,1) samples
        var evalErrorOccurred C.int
        r := C.callParseEval(C.CString(myRScript), &evalErrorOccurred)
        if evalErrorOccurred == 0 && r != C.R_NilValue {
                C.Rf_protect(r) // keep r safe from R's garbage collector during conversion
                iface = SexpToIface(r)
                fmt.Printf("\n Embedding R in Golang example: I got back from evaluating myRScript:\n")
                goon.Dump(iface)
                C.Rf_unprotect(1) // unprotect r
        }
        C.callEndEmbeddedR()
}

Copyright 2015 Jason E. Aten, Ph.D.

License: Apache 2.0 for the top-level RMQ code and integration. Individual vendored library components include their own licenses, which are Apache 2.0, MIT, or BSD style. See the src/vendor subdirectories for details.

Requires: Go 1.5.1, for GO15VENDOREXPERIMENT=1 vendoring support.

Documentation

Overview

Summary: rmq passes msgpack2 messages over websockets between Golang and the R statistical language. It is an R package.

rmq: R Messaging and Queuing

## why msgpack

Msgpack is binary and self-describing. It can be extremely fast to parse. Moreover, unlike with protobufs, I don't have to worry about where to get the schema .proto file. The thorny problem of how to *create* new types of objects when I'm inside R simply goes away: the data is self-describing, and new structures can be created at run time.

Msgpack supports a forward-evolution/backward-compatibility strategy similar to that of protobufs: old code ignores new data fields, and new code uses defaults for missing fields when given old data. Hence it allows incremental rolling upgrades of large compute clusters that use it as a protocol, which was the whole raison d'être of protobufs.
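The evolution rule can be sketched in Go. To stay dependency-free, the sketch swaps in the standard library's encoding/json (another self-describing, map-based format) for msgpack; the struct names and the `Tolerance` field are hypothetical. Decoding ignores keys the struct doesn't know about, and fields absent from the data keep whatever default the struct held before decoding.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ConfigV1 is what "old code" knows about.
type ConfigV1 struct {
	Name string
}

// ConfigV2 is "new code": it adds a field that old data will lack.
type ConfigV2 struct {
	Name      string
	Tolerance float64
}

// decodeOld is old code reading (possibly newer) data: unknown keys are ignored.
func decodeOld(data []byte) (ConfigV1, error) {
	var c ConfigV1
	err := json.Unmarshal(data, &c)
	return c, err
}

// decodeNew is new code reading (possibly older) data: the pre-set default
// survives because Unmarshal leaves fields missing from the input untouched.
func decodeNew(data []byte) (ConfigV2, error) {
	c := ConfigV2{Tolerance: 1e-6} // default used when the key is absent
	err := json.Unmarshal(data, &c)
	return c, err
}

func main() {
	v1, err := decodeOld([]byte(`{"Name":"run1","Tolerance":0.5}`))
	if err != nil {
		panic(err)
	}
	v2, err := decodeNew([]byte(`{"Name":"run0"}`))
	if err != nil {
		panic(err)
	}
	fmt.Println(v1.Name, v2.Name, v2.Tolerance)
}
```

Msgpack decoders that map onto structs behave in the same spirit, but the exact default-handling rules depend on the library you use.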

Icing on the cake: msgpack (like websockets) is usable from JavaScript in the browser, which is not true of most alternatives. Because it is very simple, msgpack has massive cross-language support (55 bindings are listed at http://msgpack.org). Overall, msgpack is flexible while being fast, simple and widely supported, making it a great fit for data exchange between interpreted and compiled environments.

## structure of this repo

This repository is mainly structured as an R package. It is designed to be built and installed into an R (statistical environment) installation, using the standard tools for R.

This package doesn't directly create a reusable Go library. Instead, it targets a C shared library (rmq.so) that installs into R using 'R CMD INSTALL rmq'. See 'make install', or 'make build' followed by `install.packages('./rmq_1.0.1.tar.gz', repos=NULL)` from inside R (this assumes the package tarball is in your current directory; if not, adjust the ./ part of the path).

The code also serves as an example of how to use golang inside R.

## embedding R in Golang

While RMQ is mainly designed to embed Go under R, it defines functions, in particular SexpToIface(), that make embedding R in Go quite easy too. See the comments and example in main() of the central rmq.go file (https://github.com/glycerine/rmq/blob/master/src/rmq/rmq.go) for a demonstration.

Functions

func DecodeMsgpackBinArrayHeader

func DecodeMsgpackBinArrayHeader(p []byte) (headerSize int, payloadSize int, totalFrameSize int, err error)

DecodeMsgpackBinArrayHeader parses the first 2-5 bytes of a msgpack-format serialized binary array and returns the headerSize, payloadSize and totalFrameSize for the frame that starts at p[0]. This is a utility function for decoding msgpack objects that are always wrapped in a 2-5 byte binary array header.
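The header layout is fixed by the msgpack spec: bin8 is 0xc4 plus a 1-byte length (2-byte header), bin16 is 0xc5 plus a 2-byte big-endian length (3-byte header), and bin32 is 0xc6 plus a 4-byte big-endian length (5-byte header). Here is a sketch of that parsing; `decodeBinHeader` is a hypothetical stand-in with the same return values as DecodeMsgpackBinArrayHeader, not the package's actual source.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// decodeBinHeader parses a msgpack bin8/bin16/bin32 header starting at p[0].
func decodeBinHeader(p []byte) (headerSize, payloadSize, totalFrameSize int, err error) {
	if len(p) == 0 {
		return 0, 0, 0, errors.New("empty input")
	}
	switch p[0] {
	case 0xc4: // bin8: 1-byte length
		headerSize = 2
	case 0xc5: // bin16: 2-byte big-endian length
		headerSize = 3
	case 0xc6: // bin32: 4-byte big-endian length
		headerSize = 5
	default:
		return 0, 0, 0, fmt.Errorf("byte 0x%02x is not a msgpack bin header", p[0])
	}
	if len(p) < headerSize {
		return 0, 0, 0, errors.New("truncated bin header")
	}
	switch headerSize {
	case 2:
		payloadSize = int(p[1])
	case 3:
		payloadSize = int(binary.BigEndian.Uint16(p[1:3]))
	case 5:
		payloadSize = int(binary.BigEndian.Uint32(p[1:5]))
	}
	return headerSize, payloadSize, headerSize + payloadSize, nil
}

func main() {
	h, n, total, err := decodeBinHeader([]byte{0xc5, 0x01, 0x00})
	fmt.Println(h, n, total, err) // 3 256 259 <nil>
}
```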

func FromMsgpack

func FromMsgpack(s C.SEXP) C.SEXP

FromMsgpack converts a serialized RAW vector of msgpack2-encoded bytes into an R object. We use msgpack2 so that there is a difference between strings (UTF-8 encoded) and binary blobs, which can contain '\0' zeros. The underlying msgpack2 library is the awesome https://github.com/ugorji/go/tree/master/codec library from Ugorji Nwoke.
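Why the string/blob distinction matters on the decoding side can be shown with a small Go sketch. In the old msgpack spec, strings and blobs shared a single "raw" type; msgpack 2 added str8 (0xd9) and the bin family, so a decoder can tell them apart from the type byte alone. `decodeStrOrBin` is hypothetical and handles only fixstr, str8 and bin8; it is not rmq's decoding path, which goes through ugorji/go codec.

```go
package main

import (
	"errors"
	"fmt"
)

// decodeStrOrBin decodes one msgpack value of type fixstr, str8 or bin8.
// Strings come back as Go strings; blobs come back as []byte with zeros intact.
func decodeStrOrBin(p []byte) (interface{}, error) {
	if len(p) == 0 {
		return nil, errors.New("empty input")
	}
	var n, start int
	isStr := true
	switch {
	case p[0]&0xe0 == 0xa0: // fixstr: 101xxxxx, length in the low 5 bits
		n, start = int(p[0]&0x1f), 1
	case p[0] == 0xd9 || p[0] == 0xc4: // str8 or bin8: one length byte follows
		if len(p) < 2 {
			return nil, errors.New("truncated header")
		}
		n, start, isStr = int(p[1]), 2, p[0] == 0xd9
	default:
		return nil, fmt.Errorf("unsupported type byte 0x%02x", p[0])
	}
	if len(p) < start+n {
		return nil, errors.New("truncated value")
	}
	body := p[start : start+n]
	if isStr {
		return string(body), nil
	}
	return append([]byte(nil), body...), nil // copy so the caller owns the blob
}

func main() {
	s, _ := decodeStrOrBin([]byte{0xa2, 'h', 'i'})
	b, _ := decodeStrOrBin([]byte{0xc4, 0x03, 0x00, 0x01, 0x00})
	fmt.Printf("%T %q\n%T % x\n", s, s, b, b)
}
```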

func ListenAndServe

func ListenAndServe(addr_ C.SEXP, handler_ C.SEXP, rho_ C.SEXP) C.SEXP

ListenAndServe is the server part that expects calls from clients in the form of RmqWebsocketCall() invocations. The underlying websocket library is the battle-tested https://github.com/gorilla/websocket library from the Gorilla Web toolkit. http://www.gorillatoolkit.org/

addr_ is a string in "ip:port" format. The server will bind this address and port on the local host.

handler_ is an R function that takes a single argument. It will be called back each time the server receives an incoming message. The returned value of handler becomes the reply to the client.

rho_ is an R environment in which the handler_ callback will occur. The user-level wrapper rmq.server() provides a new environment for every callback by default, so most users won't need to worry about rho_.

Return value: this is always R_NilValue.

Semantics: ListenAndServe() will start a new web server every time it is called. If it exits due to a call into R_CheckUserInterrupt() or Rf_error(), then a background watchdog goroutine will notice the lack of heartbeating after 300ms and will immediately shut down the listening websocket server goroutine. Hence cleanup is fairly automatic.

Signal handling:

SIGINT (Ctrl-c) is noted by R, and since we regularly call R_CheckUserInterrupt(), the user can stop the server by pressing Ctrl-c at the R console. The Go runtime, as embedded in the c-shared library, is not yet used to being embedded, so its (system) signal handling facilities (e.g. signal.Notify) should *not* be used. We go to great pains to preserve the signal handling that R sets up and expects; allowing the Go runtime to see any signals just creates heartache and crashes.

func ReadMsgpackFrame

func ReadMsgpackFrame(rawStream C.SEXP, byteOffset C.SEXP) C.SEXP

ReadMsgpackFrame reads the msgpack frame at byteOffset in rawStream, decodes its 2-5 byte msgpack binary array header (bin8, bin16, or bin32), and returns the decoded-into-R object and the next byteOffset to use. This is a helper for dealing with large streams of msgpack bytes.
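Returning the next offset lets a caller walk a long stream one frame at a time. Here is a hypothetical Go analogue over a plain []byte, assuming every frame is wrapped in a bin8/bin16/bin32 header; `readFrame` returns the raw payload, and decoding that payload into a value is left out.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// readFrame returns the payload of the bin8/bin16/bin32 frame at offset,
// plus the offset of the next frame (loosely mirroring ReadMsgpackFrame).
func readFrame(stream []byte, offset int) (payload []byte, next int, err error) {
	if offset < 0 || offset >= len(stream) {
		return nil, offset, errors.New("offset out of range")
	}
	p := stream[offset:]
	var hdr, n int
	switch {
	case p[0] == 0xc4 && len(p) >= 2: // bin8
		hdr, n = 2, int(p[1])
	case p[0] == 0xc5 && len(p) >= 3: // bin16
		hdr, n = 3, int(binary.BigEndian.Uint16(p[1:3]))
	case p[0] == 0xc6 && len(p) >= 5: // bin32
		hdr, n = 5, int(binary.BigEndian.Uint32(p[1:5]))
	default:
		return nil, offset, fmt.Errorf("no complete bin header at offset %d", offset)
	}
	if len(p) < hdr+n {
		return nil, offset, errors.New("truncated frame")
	}
	return p[hdr : hdr+n], offset + hdr + n, nil
}

func main() {
	// Two frames back to back: bin8 with 2 bytes, then bin8 with 1 byte.
	stream := []byte{0xc4, 0x02, 0xaa, 0xbb, 0xc4, 0x01, 0xcc}
	for off := 0; off < len(stream); {
		payload, next, err := readFrame(stream, off)
		if err != nil {
			panic(err)
		}
		fmt.Printf("frame at %d: % x\n", off, payload)
		off = next
	}
}
```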

func ReadNewlineDelimJson

func ReadNewlineDelimJson(rawStream C.SEXP, byteOffset C.SEXP) C.SEXP

ReadNewlineDelimJson reads a JSON object at byteOffset in rawStream, expects it to be newline-terminated (see http://jsonlines.org/), and returns the decoded-into-R object and the next byteOffset to use (the byte just after the terminating newline). This is a helper for dealing with large newline-delimited streams of JSON text that are stored as raw byte arrays.
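A Go sketch of the same contract, using encoding/json: decode the value at byteOffset and return it with the offset just past its newline. `readJSONLine` is a hypothetical name, and it returns a generic interface{} rather than an R object. Searching for the first '\n' is safe because JSON Lines requires any newline inside a string to be escaped.

```go
package main

import (
	"bytes"
	"encoding/json"
	"errors"
	"fmt"
)

// readJSONLine decodes the newline-terminated JSON value starting at offset
// and returns it together with the offset of the byte just after the newline.
func readJSONLine(stream []byte, offset int) (interface{}, int, error) {
	if offset < 0 || offset >= len(stream) {
		return nil, offset, errors.New("offset out of range")
	}
	nl := bytes.IndexByte(stream[offset:], '\n')
	if nl < 0 {
		return nil, offset, errors.New("missing terminating newline")
	}
	var v interface{}
	if err := json.Unmarshal(stream[offset:offset+nl], &v); err != nil {
		return nil, offset, err
	}
	return v, offset + nl + 1, nil
}

func main() {
	stream := []byte("{\"a\":1}\n{\"b\":[true,null]}\n")
	for off := 0; off < len(stream); {
		v, next, err := readJSONLine(stream, off)
		if err != nil {
			panic(err)
		}
		fmt.Println(off, v)
		off = next
	}
}
```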

func RmqWebsocketCall

func RmqWebsocketCall(addr_ C.SEXP, msg_ C.SEXP, timeout_msec_ C.SEXP) C.SEXP

RmqWebsocketCall() is the client part that talks to the server part waiting in ListenAndServe(), which in turn expects calls from clients in the form of RmqWebsocketCall() invocations. The underlying websocket library is the battle-tested https://github.com/gorilla/websocket library from the Gorilla Web toolkit. http://www.gorillatoolkit.org/

addr_ is an "ip:port" string: where to find the server; it should match the addr_ the server was started with.

msg_ is the R object to be sent to the server.

timeout_msec_ is a numeric count of milliseconds to wait for a reply from the server. Timeouts are the only way we handle servers that accept our connection and then crash or take too long. Although a timeout of 0 will wait forever, this is not recommended: SIGINT (Ctrl-c) will not interrupt a waiting client, so be sure to give it some sane timeout. The default is 5000 msec (5 seconds).

func SexpToIface

func SexpToIface(s C.SEXP) interface{}

SexpToIface() does the heavy lifting of converting from an R value to a Go value. Initially just a subroutine of the internal encodeRIntoMsgpack(), it is also useful on its own for doing things like embedding R inside Go.

Currently VECSXP, REALSXP, INTSXP, RAWSXP, STRSXP, and LGLSXP are supported. In other words, we decode: lists, numeric vectors, integer vectors, raw byte vectors, string vectors, boolean vectors, and recursively defined list elements.

If list elements are named, the named list is turned into a map in Go.
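The list-mapping rule can be modelled in plain Go. The sketch below is hypothetical: `rList` stands in for an R VECSXP with optional names, atomic vectors are represented directly as Go slices, and only the named-versus-unnamed decision is shown, applied recursively. It does not touch cgo or real SEXPs.

```go
package main

import "fmt"

// rList is a hypothetical stand-in for an R list (VECSXP).
// Names is nil for an unnamed list; otherwise len(Names) == len(Elems).
type rList struct {
	Names []string
	Elems []interface{}
}

// toIface converts named lists to map[string]interface{} and unnamed lists
// to []interface{}, recursing into elements. Non-list values such as
// []float64, []string or []byte pass through unchanged.
func toIface(v interface{}) interface{} {
	l, ok := v.(rList)
	if !ok {
		return v
	}
	if l.Names != nil {
		m := make(map[string]interface{}, len(l.Elems))
		for i, e := range l.Elems {
			m[l.Names[i]] = toIface(e)
		}
		return m
	}
	out := make([]interface{}, len(l.Elems))
	for i, e := range l.Elems {
		out[i] = toIface(e)
	}
	return out
}

func main() {
	// Models list(hello = c("cran", "this"), nested = list(1.5, as.raw(0xff)))
	x := rList{
		Names: []string{"hello", "nested"},
		Elems: []interface{}{
			[]string{"cran", "this"},
			rList{Elems: []interface{}{[]float64{1.5}, []byte{0xff}}},
		},
	}
	fmt.Printf("%#v\n", toIface(x))
}
```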

func ToMsgpack

func ToMsgpack(s C.SEXP) C.SEXP

ToMsgpack converts an R object into a serialized RAW vector of msgpack2-encoded bytes. We use msgpack2 so that there is a difference between strings (UTF-8 encoded) and binary blobs, which can contain '\0' zeros. The underlying msgpack2 library is the awesome https://github.com/ugorji/go/tree/master/codec library from Ugorji Nwoke.

Directories

src/rmq: package name: rmq
src/rmq/gorilla-examples: package name: rmq