yarf

v0.0.0
Published: Apr 4, 2019 License: MIT Imports: 10 Imported by: 0

README

yarf - Yet Another RPC Framework

For simple communication between services

Motivation

There are a few RPC frameworks out there, so why one more? The simple answer is that many of them, such as gRPC and Twirp, did not fit our needs. They are often very opinionated and overly complex, and are in some cases something of a black box that tries to solve every problem in many languages.

What we found was that we were writing models in protobuf to be used in frameworks such as gRPC, and then had to map them into local structs on both the client and the server side, since protobuf was not expressive enough. This instead of just having a versioned struct in a common kit repo or similar. In essence, we were fighting the RPC libraries we tried in order to make them work with our use cases.

Overview

Yarf is an RPC framework focusing on ease of use and clear options for how to use it.

Features

  • Separation between protocol and transport
  • Support for synchronous calls, channels and callbacks
  • Support for middleware on both client and server side
  • Support for context passing between client and server
  • Support for custom serialization, at both the protocol and content level
  • Expressive builder pattern for client calls

Supported transport layers

  • http
  • nats

Quickstart

See examples/simple for more examples.

Installation
go get bitbucket.org/modfin/yarf
go get bitbucket.org/modfin/yarf/...
Server
package main

import (
    "crypto/sha256"
    "encoding/base64"
    "errors"
    "log"
    "strings"

    "bitbucket.org/modfin/yarf"
    "bitbucket.org/modfin/yarf/transport/thttp"
)

// SHA256 hashes the request content and returns the digest,
// base64 encoded, in the "hash" param.
func SHA256(req *yarf.Msg, resp *yarf.Msg) (err error) {
    hash := sha256.Sum256(req.Content)
    resp.SetParam("hash", base64.StdEncoding.EncodeToString(hash[:]))
    return nil
}

// join concatenates the strings in the "slice" param and returns
// the result as the response content.
func join(req *yarf.Msg, resp *yarf.Msg) (err error) {
    slice, ok := req.Param("slice").StringSlice()
    if !ok {
        return errors.New("param slice was not a string slice")
    }

    resp.SetContent(strings.Join(slice, ""))
    return nil
}

func main() {
    transport, err := thttp.NewHttpTransporter(thttp.Options{})
    if err != nil {
        log.Fatal(err)
    }
    server := yarf.NewServer(transport, "a", "namespace")

    // HandleFunc derives the endpoint name from the function name.
    server.HandleFunc(SHA256)
    server.HandleFunc(join)

    // Handle registers the handler under an explicit name.
    server.Handle("add", func(req *yarf.Msg, resp *yarf.Msg) (err error) {
        res := req.Param("val1").IntOr(0) + req.Param("val2").IntOr(0)
        resp.SetParam("res", res)
        return nil
    })

    log.Fatal(transport.Start())
}
Client
package main

import (
    "fmt"
    "log"

    "bitbucket.org/modfin/yarf"
    "bitbucket.org/modfin/yarf/transport/thttp"
)

func main() {
    transport, err := thttp.NewHttpTransporter(thttp.Options{Discovery: &thttp.DiscoveryDefault{Host: "localhost"}})
    if err != nil {
        log.Fatal(err)
    }
    client := yarf.NewClient(transport)

    // Sending and receiving params
    msg, err := client.Request("a.namespace.add").
        WithParam("val1", 5).
        WithParam("val2", 7).
        Get()
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println("Result of 5 + 7 =", msg.Param("res").IntOr(-1))

    // Sending binary content
    msg, err = client.Request("a.namespace.SHA256").
        WithBinaryContent([]byte("Hello Yarf")).
        Get()
    if err != nil {
        log.Fatal(err)
    }

    hash, ok := msg.Param("hash").String()
    fmt.Println("ok", ok, "hash", hash)

    var joined string

    // Binding response content to a string (works with structs, slices and so on)
    err = client.Request("a.namespace.join").
        WithParam("slice", []string{"jo", "in", "ed"}).
        BindResponseContent(&joined).
        Done()
    fmt.Println("joined", joined, "err", err)

    // or, using the Call shorthand
    err = client.Call("a.namespace.join", nil, &joined, yarf.NewParam("slice", []string{"joi", "ned"}))
    fmt.Println("joined", joined, "err", err)
}

Test

go test -v ./... or ./test.sh. Docker is required to run the integration tests.

Design

Yarf consists, in large, of five things: API, Middleware, Protocol, Serialization and Transport. We try to provide good defaults that both perform well and are easy to use. But, as most of us know, it is not rare to end up in a corner case where things have to be flexible. Therefore we try to separate things in a clear way that is easy to extend and change.

API

The main API that yarf provides for its users is made up of a client, a server and the messages that pass between the two.

Middleware

Yarf has support for middleware on both the client and the server side of a request, and both sides expect the same function type. Middleware can be used when the need arises to decorate a message, do logging, caching, authentication or anything else requiring interception of messages.

e.g. simple time logging

func(request *yarf.Msg, response *yarf.Msg, next yarf.NextMiddleware) error {

	// Runs before client request
	start := time.Now()

	err := next() // Running other middleware and request to server

	// Runs after client request
	elapsed := time.Since(start)
	function, _ := request.Function() // Function returns (name, ok)
	fmt.Println("Request to function", function, "took", elapsed)

	return err
}

e.g. a simple server side caching

func(request *yarf.Msg, response *yarf.Msg, next yarf.NextMiddleware) error {

	// Runs before handler function

	key, ok := request.Param("cachekey").String()
	if !ok {
		// could not find cachekey in message, runs anyway
		return next()
	}

	// getCachedItem and setCachedItem stand for whatever cache is in use
	b, found := getCachedItem(key)
	if found {
		response.SetBinaryContent(b)
		return nil
	}

	err := next() // Running other middleware and handler

	// Runs after handler function

	if err != nil {
		return err
	}

	// Store the response under the key sent by the client
	setCachedItem(key, response.Content)

	return nil
}

On a client, middleware can be set on a per-request basis (local) or for all requests going through the client (global), and is run as follows:

Client
   |
   V
   Global0 --> Global1 --> Local0 --> Local1 --> Call to transport layer
                                                        |
                                                        V
   Global0 <-- Global1 <-- Local0 <-- Local1 <-- Response from transport layer
   |
   V
Client

On the server, middleware can be set on a per-handler basis (local) or for all requests going through the server (global), and is run as follows:

Incoming from transport layer
   |
   V
   Global0 --> Global1 --> Local0 --> Local1 --> Running handler function
                                                      |
                                                      V
   Global0 <-- Global1 <-- Local0 <-- Local1 <-- Response from handler function
   |
   V
Outgoing to transport layer
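
The ordering in the two diagrams above can be reproduced with a small, stdlib-only sketch. The Msg, NextMiddleware and Middleware types below are local stand-ins that mirror the documented shapes; chain, tracer and runTrace are hypothetical helpers written for this illustration and say nothing about how yarf composes middleware internally.

```go
package main

import (
	"fmt"
	"strings"
)

// Local stand-ins mirroring the documented yarf types, so the sketch
// runs without the library.
type Msg struct {
	Headers map[string]interface{}
	Content []byte
}

type NextMiddleware func() error

type Middleware func(request *Msg, response *Msg, next NextMiddleware) error

// Handler is the last step of the chain: a handler function on the
// server, or the call to the transport layer on a client.
type Handler func(request *Msg, response *Msg) error

// chain wraps final so that global middleware runs first, then local
// middleware, then final; control unwinds in the reverse order.
func chain(global, local []Middleware, final Handler) Handler {
	all := append(append([]Middleware{}, global...), local...)
	return func(request *Msg, response *Msg) error {
		var run func(i int) error
		run = func(i int) error {
			if i == len(all) {
				return final(request, response)
			}
			return all[i](request, response, func() error { return run(i + 1) })
		}
		return run(0)
	}
}

// tracer returns a middleware that records when it is entered and left.
func tracer(name string, trace *[]string) Middleware {
	return func(request *Msg, response *Msg, next NextMiddleware) error {
		*trace = append(*trace, name+" in")
		err := next()
		*trace = append(*trace, name+" out")
		return err
	}
}

// runTrace executes a chain with two global and two local middleware
// and returns the order in which everything was visited.
func runTrace() string {
	var trace []string
	h := chain(
		[]Middleware{tracer("Global0", &trace), tracer("Global1", &trace)},
		[]Middleware{tracer("Local0", &trace), tracer("Local1", &trace)},
		func(request *Msg, response *Msg) error {
			trace = append(trace, "handler")
			return nil
		},
	)
	if err := h(&Msg{}, &Msg{}); err != nil {
		return "error: " + err.Error()
	}
	return strings.Join(trace, " > ")
}

func main() {
	fmt.Println(runTrace())
}
```

A middleware that returns without calling next, like the caching example earlier, simply cuts the chain short: nothing to its right runs, and the middleware to its left still see the return.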

Protocol

The yarf protocol is pretty straightforward but has a few layers to it. It is not really that interesting unless you, for some reason, want your own serialization or need to change it.

A message sent between client and server always starts with a string terminated by a newline, "\n", followed by bytes.

The first line describes the content type of the following bytes, which is needed in order to deserialize the message into a yarf.Msg. It exists in order to allow for different ways of (de)serializing a message.

The remaining bytes are then deserialized into the following struct:

type Msg struct {
	Headers map[string]interface{}
	Content []byte
}

The Headers field of the message struct contains multiple keys that are useful for yarf, but it can also be used to pass parameters. It shall, however, always contain a content-type, which helps yarf in the deserialization of Content, if needed.
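
As a rough illustration of the framing described in this section, the sketch below writes a content-type line, a newline and then the serialized message, and reads it back. It assumes the content-type string is the first line of the frame, uses encoding/json instead of yarf's default msgpack so that it stays stdlib-only, and the content-type value "application/json" is an assumption. encodeFrame and decodeFrame are hypothetical names, not part of yarf.

```go
package main

import (
	"bytes"
	"encoding/json"
	"errors"
	"fmt"
)

// Msg mirrors the struct shown above; it is a local stand-in.
type Msg struct {
	Headers map[string]interface{}
	Content []byte
}

// encodeFrame writes the content-type line followed by the serialized message.
func encodeFrame(contentType string, msg Msg) ([]byte, error) {
	body, err := json.Marshal(msg)
	if err != nil {
		return nil, err
	}
	return append([]byte(contentType+"\n"), body...), nil
}

// decodeFrame splits the frame at the first newline, checks that the
// content type is one this sketch understands, and deserializes the rest.
func decodeFrame(frame []byte) (string, Msg, error) {
	var msg Msg
	i := bytes.IndexByte(frame, '\n')
	if i < 0 {
		return "", msg, errors.New("frame has no content-type line")
	}
	contentType := string(frame[:i])
	if contentType != "application/json" {
		return contentType, msg, fmt.Errorf("no serializer for %q", contentType)
	}
	err := json.Unmarshal(frame[i+1:], &msg)
	return contentType, msg, err
}

func main() {
	in := Msg{
		Headers: map[string]interface{}{"content-type": "text/plain", "function": "a.namespace.join"},
		Content: []byte("joined"),
	}
	frame, err := encodeFrame("application/json", in)
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	contentType, out, err := decodeFrame(frame)
	fmt.Println(contentType, out.Headers["function"], string(out.Content), err)
}
```

Note the two separate content types in play: the one on the first line says how the whole message was serialized, while the content-type header inside the message says how Content was serialized.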

Serialization

This might be a somewhat confusing topic since there are a few layers to it. But in general there are only two that have to be considered: the serialization of the protocol and the serialization of the content.

For both the protocol and the content, a content type shall always be provided. This helps the receiver of the message to deserialize the message and the content.

Serialization can be done in different combinations, independently for each of the two. The default is msgpack for both, but it can be changed on a per client, server or message basis. Since it might be hard to track which server uses what, serializers can be registered in yarf with yarf.RegisterSerializer(serializer). Yarf also provides some extra ones, msgpack and json, which can be registered with import _ "bitbucket.org/modfin/yarf/serializers", and we think this should cover most needs.
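
To make the idea of registering serializers by content type concrete, here is a minimal stdlib-only sketch. The Serializer struct has the same three fields as the yarf.Serializer type in the API reference; the registry map and the register, bind and jsonSerializer helpers are hypothetical stand-ins, and the content-type string is an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Serializer has the same fields as the documented yarf.Serializer struct.
type Serializer struct {
	ContentType string
	Marshal     func(v interface{}) ([]byte, error)
	Unmarshal   func(data []byte, v interface{}) error
}

// registry maps a content type to its serializer. It is a hypothetical
// stand-in and is not safe for concurrent use.
var registry = map[string]Serializer{}

// register makes s available for its content type.
func register(s Serializer) { registry[s.ContentType] = s }

// bind looks up the serializer named by contentType and unmarshals data into v.
func bind(contentType string, data []byte, v interface{}) error {
	s, ok := registry[contentType]
	if !ok {
		return fmt.Errorf("no serializer registered for %q", contentType)
	}
	return s.Unmarshal(data, v)
}

// jsonSerializer builds a Serializer backed by encoding/json.
func jsonSerializer() Serializer {
	return Serializer{
		ContentType: "application/json",
		Marshal:     json.Marshal,
		Unmarshal:   json.Unmarshal,
	}
}

func main() {
	register(jsonSerializer())

	data, err := registry["application/json"].Marshal([]string{"jo", "in", "ed"})
	if err != nil {
		fmt.Println("marshal failed:", err)
		return
	}

	var parts []string
	fmt.Println(bind("application/json", data, &parts), parts)

	// A content type nobody registered cannot be bound.
	fmt.Println(bind("application/msgpack", data, &parts))
}
```

Because the receiver picks the serializer from the content type carried with the message, a client and a server can use different defaults as long as both have the relevant serializers registered.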

Transport

The transport layer works independently of everything else and is responsible for service discovery, transport of data and providing a context that can be canceled from the client to the server.

An implementation using NATS and one using HTTP are provided with yarf.

A transporter shall implement a rather simple API in order to work with yarf. But since different transporters have different properties, some things may vary. E.g. the function namespace when using NATS is global and has no real need for service discovery, while HTTP has a local namespace for each specific service.

TODO

  • Unit testing
  • More documentation
  • Add support for reader and writers, for streaming requests/responses
  • Http Transport
    • Improving service discovery on HTTP transport
      • Consul
      • etcd
      • DNS A
      • DNS SRV
    • Improving loadbalancing on HTTP transport
    • Support for http2 and tls transport
  • Middlewares
    • Proper Logging
    • Statistics and latency collection
    • Circuit breakers
    • Caching
    • Authentication, JWT

Documentation

Index

Constants

const HeaderContentType = "content-type"

HeaderContentType is the content type header param name

const HeaderFunction = "function"

HeaderFunction is the function name header param name

const HeaderStatus = "status"

HeaderStatus is the status header param name

const HeaderUUID = "status"

HeaderUUID is the uuid header param name

const StatusHandlerError = 510

StatusHandlerError the handler function of request failed

const StatusInternalError = 500

StatusInternalError rpc status internal server error

const StatusInternalPanic = 501

StatusInternalPanic rpc status internal server error when recovered from panic

const StatusMarshalError = 550

StatusMarshalError could not marshal data

const StatusOk = 200

StatusOk rpc status ok

const StatusUnmarshalError = 551

StatusUnmarshalError could not unmarshal data

Variables

This section is empty.

Functions

func RegisterSerializer

func RegisterSerializer(serializer Serializer)

RegisterSerializer lets a user register a protocolSerializer for a specific content type. This allows yarf to bind message content to that specific serial format. Yarf's standard serializers can be registered by importing with side effects, e.g. import _ "bitbucket.org/modfin/yarf/serializers"

Types

type CallTransporter

type CallTransporter interface {
	Call(ctx context.Context, function string, requestData []byte) (response []byte, err error)
}

CallTransporter is the interface that must be fulfilled for a transporter to be used as a client.

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is a struct wrapping a transporting layer and methods for using yarf

func NewClient

func NewClient(t CallTransporter) Client

NewClient creates a new yarf client using a specific transporter

func (*Client) Call

func (c *Client) Call(function string, requestData interface{}, responseData interface{}, requestParams ...Param) error

Call is a shorthand that performs a request from a function name and request data. The response is unmarshaled into responseData

func (*Client) Request

func (c *Client) Request(function string) *RPC

Request creates a request builder in yarf

func (*Client) WithMiddleware

func (c *Client) WithMiddleware(middleware ...Middleware)

WithMiddleware adds middleware to client request for pre and post processing

func (*Client) WithProtocolSerializer

func (c *Client) WithProtocolSerializer(serializer Serializer)

WithProtocolSerializer sets the protocolSerializer used for transport.

func (*Client) WithSerializer

func (c *Client) WithSerializer(serializer Serializer)

WithSerializer sets the contentSerializer used for content if not binary

type ListenTransporter

type ListenTransporter interface {
	Listen(function string, toExec func(ctx context.Context, requestData []byte) (responseData []byte)) error
	Close() error
}

ListenTransporter is the interface that must be fulfilled for a transporter to be used as a server

type Middleware

type Middleware func(request *Msg, response *Msg, next NextMiddleware) error

Middleware the function that shall be implemented for a middleware

type Msg

type Msg struct {
	Headers map[string]interface{}
	Content []byte
	// contains filtered or unexported fields
}

Msg represents a message that is being passed between client and server

func (*Msg) BindContent

func (m *Msg) BindContent(content interface{}) (err error)

BindContent is used to unmarshal/bind content data to input interface. It will look for a proper deserializer matching header content-type. Serializer can be registered by yarf.RegisterSerializer()

func (*Msg) ContentType

func (m *Msg) ContentType() (contentType string, ok bool)

ContentType returns the ContentType header of the request, if one exists

func (*Msg) Context

func (m *Msg) Context() context.Context

Context returns the context of the message. This is primarily for use on the server side, in order to monitor Done from client side

func (*Msg) Function

func (m *Msg) Function() (status string, ok bool)

Function returns the function name being called, if one exists

func (*Msg) InternalError

func (m *Msg) InternalError() *Msg

InternalError sets the status header to 500

func (*Msg) Ok

func (m *Msg) Ok() *Msg

Ok sets the status header to 200

func (*Msg) Param

func (m *Msg) Param(key string) *Param

Param retrieves a param from the params header. It is wrapped in a param struct which implements helper methods for accessing params.

func (*Msg) SetBinaryContent

func (m *Msg) SetBinaryContent(content []byte) *Msg

SetBinaryContent sets the input data as content of the message

func (*Msg) SetContent

func (m *Msg) SetContent(content interface{}) *Msg

SetContent sets the input interface as the content of the message

func (*Msg) SetContentType

func (m *Msg) SetContentType(contentType string) *Msg

SetContentType sets the content-type header of the message

func (*Msg) SetContentUsing

func (m *Msg) SetContentUsing(content interface{}, serializer Serializer) *Msg

SetContentUsing serializes the content with a specific contentSerializer and sets it as a binary payload

func (*Msg) SetHeader

func (m *Msg) SetHeader(key string, value interface{}) *Msg

SetHeader sets a generic header of the message

func (*Msg) SetParam

func (m *Msg) SetParam(key string, value interface{}) *Msg

SetParam sets a param in the params header of the message. Which later provides helper methods of de/serializations and defaults.

func (*Msg) SetParams

func (m *Msg) SetParams(params ...Param) *Msg

SetParams sets params in the params header of the message. Which later provides helper methods of de/serializations and defaults.

func (*Msg) SetStatus

func (m *Msg) SetStatus(code int) *Msg

SetStatus sets the status header of the message

func (*Msg) Status

func (m *Msg) Status() (status int, ok bool)

Status returns the status header of the request, if one exists

func (*Msg) UUID

func (m *Msg) UUID() (status string, ok bool)

UUID returns the request uuid

func (*Msg) WithContext

func (m *Msg) WithContext(ctx context.Context) *Msg

WithContext sets the context of the message. The supplied context will replace the current one. If wrapping is intended get the current context first using Context

type NextMiddleware

type NextMiddleware func() error

NextMiddleware simple alias for func() error

type Param

type Param struct {
	// contains filtered or unexported fields
}

Param is a key/value entry and a struct which implements helper methods to help with retrieval of data types from value.

func NewParam

func NewParam(key string, value interface{}) Param

NewParam creates a new key value param from input

func (*Param) Bool

func (m *Param) Bool() (bool, bool)

Bool returns value as a bool, if possible

func (*Param) BoolOr

func (m *Param) BoolOr(def bool) bool

BoolOr returns value as a bool, otherwise the provided default

func (*Param) BoolSlice

func (m *Param) BoolSlice() ([]bool, bool)

BoolSlice returns value as a []bool, if possible

func (*Param) BoolSliceOr

func (m *Param) BoolSliceOr(def []bool) []bool

BoolSliceOr returns value as a []bool, otherwise the provided default

func (*Param) Float

func (m *Param) Float() (float64, bool)

Float returns value as a float64, if possible

func (*Param) FloatOr

func (m *Param) FloatOr(def float64) float64

FloatOr returns value as a float64, otherwise the provided default

func (*Param) FloatSlice

func (m *Param) FloatSlice() ([]float64, bool)

FloatSlice returns value as a []float64, if possible

func (*Param) FloatSliceOr

func (m *Param) FloatSliceOr(def []float64) []float64

FloatSliceOr returns value as a []float64, otherwise the provided default

func (*Param) Int

func (m *Param) Int() (int64, bool)

Int returns value as a int64, if possible

func (*Param) IntOr

func (m *Param) IntOr(def int64) int64

IntOr returns value as a int64, otherwise the provided default

func (*Param) IntSlice

func (m *Param) IntSlice() ([]int64, bool)

IntSlice returns value as a []int64, if possible

func (*Param) IntSliceOr

func (m *Param) IntSliceOr(def []int64) []int64

IntSliceOr returns value as a []int64, otherwise the provided default

func (*Param) IsNil

func (m *Param) IsNil() bool

IsNil returns true if value is nil

func (*Param) IsSlice

func (m *Param) IsSlice() bool

IsSlice returns true if value is a array

func (*Param) Key

func (m *Param) Key() string

Key returns the key of the key/value pair

func (*Param) String

func (m *Param) String() (string, bool)

String returns value as a string, if possible

func (*Param) StringOr

func (m *Param) StringOr(defaultTo string) string

StringOr returns value as a string, otherwise the provided default

func (*Param) StringSlice

func (m *Param) StringSlice() ([]string, bool)

StringSlice returns value as a []string, if possible

func (*Param) StringSliceOr

func (m *Param) StringSliceOr(defaultTo []string) []string

StringSliceOr returns value as a []string, otherwise the provided default

func (*Param) Uint

func (m *Param) Uint() (uint64, bool)

Uint returns value as a uint64, if possible

func (*Param) UintOr

func (m *Param) UintOr(def uint64) uint64

UintOr returns value as a uint64, otherwise the provided default

func (*Param) UintSlice

func (m *Param) UintSlice() ([]uint64, bool)

UintSlice returns value as a []uint64, if possible

func (*Param) UintSliceOr

func (m *Param) UintSliceOr(def []uint64) []uint64

UintSliceOr returns value as a []uint64, otherwise the provided default

func (*Param) Value

func (m *Param) Value() interface{}

Value returns the value of the key/value pair

type RPC

type RPC struct {
	// contains filtered or unexported fields
}

RPC represents a request in yarf and is used to build a request using the builder pattern

func (*RPC) Async

func (r *RPC) Async() *RPCTransit

Async performs the request and returns a transit object.

func (*RPC) BindResponseContent

func (r *RPC) BindResponseContent(content interface{}) *RPC

BindResponseContent will unmarshal response into interface passed into method

func (*RPC) Callbacks

func (r *RPC) Callbacks(msgCallback func(*Msg), errorCallback func(error)) *RPCTransit

Callbacks sets a msgCallback function that is called on success and an errorCallback function that is called on failure. It does nothing if called after exec()

func (*RPC) Channels

func (r *RPC) Channels() (<-chan *Msg, <-chan error)

Channels returns the message and error channels associated with the request. Channels() will call UseChannel() and then exec() if exec() has not been called.

func (*RPC) Done

func (r *RPC) Done() error

Done waits until the rpc request is done and has returned a result. If the result is already resolved, the error will be returned directly

func (*RPC) Error

func (r *RPC) Error() error

Error returns the error of the request, if any, at the point of calling it. This means that it might return nil and later return a non-nil value

func (*RPC) Get

func (r *RPC) Get() (*Msg, error)

Get waits for the request to be done before returning with the resulting message and error.

func (*RPC) WithBinaryContent

func (r *RPC) WithBinaryContent(data []byte) *RPC

WithBinaryContent sets the request's content as binary data, so no marshaling will be performed. It does nothing if called after exec()

func (*RPC) WithContent

func (r *RPC) WithContent(requestData interface{}) *RPC

WithContent sets requests content, it does nothing if called after exec()

func (*RPC) WithContentUsing

func (r *RPC) WithContentUsing(requestData interface{}, serializer Serializer) *RPC

WithContentUsing sets requests content with a specific protocolSerializer, it does nothing if called after exec()

func (*RPC) WithContext

func (r *RPC) WithContext(ctx context.Context) *RPC

WithContext sets context of request for outside control, it does nothing if called after exec()

func (*RPC) WithMiddleware

func (r *RPC) WithMiddleware(middleware ...Middleware) *RPC

WithMiddleware adds middleware to specific request.

func (*RPC) WithParam

func (r *RPC) WithParam(key string, value interface{}) *RPC

WithParam sets a param that can be read by the server side, like a query param in HTTP requests. It does nothing if called after exec()

func (*RPC) WithParams

func (r *RPC) WithParams(params ...Param) *RPC

WithParams sets the provided params so that they can be read by the server side, like query params in HTTP requests. It does nothing if called after exec()

func (*RPC) WithUUID

func (r *RPC) WithUUID(uuid string) *RPC

WithUUID sets the uuid for the request enabling tracing of requests

type RPCError

type RPCError struct {
	Status int
	Msg    string
}

RPCError struct for yarf rpc calls

func NewRPCError

func NewRPCError(status int, msg string) RPCError

NewRPCError creates an RPCError struct for yarf rpc calls

func (RPCError) Error

func (e RPCError) Error() string

type RPCTransit

type RPCTransit struct {
	// contains filtered or unexported fields
}

RPCTransit represents a request in yarf while it is in transit. Its purpose is to restrict the functions that are allowed to be called while the request is in transit, in order to make clear the internal state and what can be done.

func (*RPCTransit) Callbacks

func (r *RPCTransit) Callbacks(callback func(*Msg), errorCallback func(error)) *RPCTransit

Callbacks sets a msgCallback function that is called on success and an errorCallback function that is called on failure. It does nothing if called after exec()

func (*RPCTransit) Channels

func (r *RPCTransit) Channels() (<-chan *Msg, <-chan error)

Channels returns the message and error channels associated with the request; these are created if UseChannels() is called before exec(). Channels() will call UseChannel() and then exec() if exec() has not been called.

func (*RPCTransit) Done

func (r *RPCTransit) Done() error

Done waits until the rpc request is done and has returned a result. If the result is already resolved, the error will be returned directly

func (*RPCTransit) Error

func (r *RPCTransit) Error() error

Error returns the error of the request, if any, at the point of calling it. This means that it might return nil and later return a non-nil value.

func (*RPCTransit) Get

func (r *RPCTransit) Get() (*Msg, error)

Get waits for the request to be done before returning with the resulting message and error. If the response is already resolved, it will return the resulting message or error without waiting for anything

type Serializer

type Serializer struct {
	ContentType string
	Marshal     func(v interface{}) ([]byte, error)
	Unmarshal   func(data []byte, v interface{}) error
}

Serializer is the interface that must be fulfilled for protocolSerializer of data before transport.

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server represents a yarf server with a particular transporter

func NewServer

func NewServer(t ListenTransporter, namespace ...string) Server

NewServer creates a new server with a particular transporter and the provided namespace for its functions

func (*Server) Handle

func (s *Server) Handle(function string, handler func(request *Msg, response *Msg) error, middleware ...Middleware)

Handle creates a server endpoint for yarf using the handler function. The name of the endpoint will be of the format "namespace.function", e.g. my-namespace.Add if the string "Add" is passed in together with a handler function

func (*Server) HandleFunc

func (s *Server) HandleFunc(handler func(request *Msg, response *Msg) error, middleware ...Middleware)

HandleFunc creates a server endpoint for yarf using the handler function. The name of the endpoint will be of the format "namespace.FunctionName", e.g. my-namespace.Add if a function named Add is passed in
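
One way such a name could be derived from a function value is through the runtime and reflect packages, as sketched below. This is only an illustration of the naming scheme: funcName and endpoint are hypothetical helpers, and the source does not say how yarf actually obtains the name. Joining the namespace parts with dots is inferred from the quickstart, where a server created with "a", "namespace" is called as "a.namespace.add".

```go
package main

import (
	"fmt"
	"reflect"
	"runtime"
	"strings"
)

// Add is an example of a named function whose name we want to recover.
func Add(a, b int) int { return a + b }

// funcName returns the bare name of a named function, e.g. "Add".
// It panics if f is not a function.
func funcName(f interface{}) string {
	// The full name is package-qualified, e.g. "main.Add".
	full := runtime.FuncForPC(reflect.ValueOf(f).Pointer()).Name()
	return full[strings.LastIndex(full, ".")+1:]
}

// endpoint joins the namespace parts and the function name with dots.
func endpoint(namespace []string, f interface{}) string {
	parts := append(append([]string{}, namespace...), funcName(f))
	return strings.Join(parts, ".")
}

func main() {
	fmt.Println(endpoint([]string{"a", "namespace"}, Add))
}
```

Anonymous functions get compiler-generated names, which is a good reason to register them with an explicit name through Handle instead, as the quickstart does for "add".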

func (*Server) WithMiddleware

func (s *Server) WithMiddleware(middleware ...Middleware)

WithMiddleware adds middleware to all requests

func (*Server) WithProtocolSerializer

func (s *Server) WithProtocolSerializer(serializer Serializer)

WithProtocolSerializer sets the protocolSerializer used for transport.

func (*Server) WithSerializer

func (s *Server) WithSerializer(serializer Serializer)

WithSerializer sets the default protocolSerializer for content.

type Transporter

type Transporter interface {
	CallTransporter
	ListenTransporter
}

Transporter is the interface that must be fulfilled for a transporter.

Directories

Path Synopsis
example
transport
