rpc

package module
v0.0.0-...-f895429 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 4, 2021 License: Apache-2.0 Imports: 8 Imported by: 0

README

Go RPC (using messaging system)

Go library for RPC client/server based on messaging system (currently support only Kafka)

Test

Features include:

Running example

Server Example

package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"

	"github.com/pongsatt/go-rpc"
	"github.com/pongsatt/go-rpc/example"
	"github.com/pongsatt/go-rpc/messaging"
)

func main() {
	kafkaClient := messaging.NewCpKafkaClient(&messaging.CpKafkaConfig{
		Brokers: "localhost:9092",
	})

	requestReplyClient := rpc.NewRequestReplyClient("RealServer", kafkaClient, &rpc.RequestReplyConfig{})

	err := example.NewRealServerProvider(
		requestReplyClient, &example.RealServer{})
	if err != nil {
		panic(err)
	}
	fmt.Println("server started")

	termChan := make(chan os.Signal)
	signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)

	<-termChan
	kafkaClient.Shutdown()
}

Start kafka server (using Readpanda for simplicity)

./start_servers.sh

Start server

go run example/server/*

Server output

➜  go-rpc go run example/server/*
server started

Client Example

package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/pongsatt/go-rpc"
	"github.com/pongsatt/go-rpc/example"
	"github.com/pongsatt/go-rpc/messaging"
)

func main() {
	kafkaClient := messaging.NewCpKafkaClient(&messaging.CpKafkaConfig{
		Brokers: "localhost:9092",
	})

	timeout := 30 * time.Second
	requestReplyClient := rpc.NewRequestReplyClient("RealServer", kafkaClient, &rpc.RequestReplyConfig{
		Timeout: &timeout,
	})

	// interface -> proxy
	proxy := example.NewServerProxy(requestReplyClient)

	// execute normal code
	client := example.NewCleint(proxy)

	for i := 0; i < 100; i++ {
		start := time.Now()
		id, err := client.Create("test")
		fmt.Printf("time use %s\n", time.Since(start))

		if err != nil {
			fmt.Printf("error creating %v\n", err)
		} else {
			fmt.Printf("got id %s\n", id)
		}
	}

	termChan := make(chan os.Signal)
	signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)

	<-termChan
	kafkaClient.Shutdown()
}

Run client

go run example/client/*

Client output

➜  go-rpc go run example/client/*
time use 275.060096ms
got id Hi test

Code Generation

Server

//go:generate go run github.com/pongsatt/go-rpc/cmd/gen provider -name=RealServer

package example

// RealServer struct
type RealServer struct {
}

// NewRealServer creates new instance
func NewRealServer() *RealServer {
	return &RealServer{}
}

// GetID func
func (proxy *RealServer) GetID(seed string) (string, error) {
	return "Hi " + seed, nil
}

// Create order
func (proxy *RealServer) Create(order *Order) (string, error) {
	return "ok", nil
}

Client

//go:generate go run github.com/pongsatt/go-rpc/cmd/gen proxy

package example

// Server interface
type Server interface {
	GetID(seed string) (string, error)
	Create(order *Order) (string, error)
}

// Client struct
type Client struct {
	server Server
}

// NewCleint new instance
func NewCleint(server Server) *Client {
	return &Client{server}
}

// Create func
func (client *Client) Create(seed string) (string, error) {
	return client.server.GetID(seed)
}

Run go generate

go generate ./...

Output

generating proxy
writing file client_gen.go # proxy for Server interface
generate done
generating provider
writing file realserver_gen.go # run rpc server for RealServer struct
generate done

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Call

func Call(rpcName string, fPtr interface{}, rpcSend func(reqBytes []byte) ([]byte, error))

Call makes rpc call

func Encode

func Encode(data RPCdata) ([]byte, error)

Encode The RPCdata in binary format which can be sent over the network.

func Execute

func Execute(req RPCdata, fFunc interface{}) ([]byte, error)

Execute the given function if present

Types

type MessagingClient

type MessagingClient interface {
	Publish(msg *messaging.Msg) error
	Consume(topic string, groupID string, handler func(msg *messaging.Msg) error) error
	IsLocalConsume(topic string, key string) (bool, error)
}

MessagingClient represents messaging client such as kafka

type RPCdata

type RPCdata struct {
	Name string        // name of the function
	Args []interface{} // request's or response's body expect error.
	Err  string        // Error any executing remote server
}

RPCdata represents the serializing format of structured data

func Decode

func Decode(b []byte) (RPCdata, error)

Decode the binary data into the Go RPC struct

type RequestReplyClient

type RequestReplyClient struct {
	// contains filtered or unexported fields
}

RequestReplyClient represents client instance

func NewRequestReplyClient

func NewRequestReplyClient(topic string, client MessagingClient, config *RequestReplyConfig) *RequestReplyClient

NewRequestReplyClient creates new request reply client instance

func (*RequestReplyClient) Request

func (c *RequestReplyClient) Request(key string, payload []byte) ([]byte, error)

Request sends a request with payload and wait for a response. key is used as a partition key.

func (*RequestReplyClient) SubscribeRequest

func (c *RequestReplyClient) SubscribeRequest(handler func(payload []byte) ([]byte, error)) error

SubscribeRequest waits for a request to come in and call the handler func

type RequestReplyConfig

type RequestReplyConfig struct {
	Timeout *time.Duration
}

RequestReplyConfig represents request reply configuration

Directories

Path Synopsis
cmd
gen
Code generated by generator, DO NOT EDIT.
Code generated by generator, DO NOT EDIT.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL