mqrpc

package module
v0.1.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 15, 2022 License: MIT Imports: 9 Imported by: 2

README

mqrpc

A very simple RPC framework built on top of a message queue system.

  • It currently supports AMQP brokers only (such as RabbitMQ).

Get Package

$ go get github.com/sangwonl/mqrpc

How to use

Import Package
import "github.com/sangwonl/mqrpc"
Creating MqService
const AmqpURI = "amqp://mqrpc:mqrpc@localhost:5672/"
const Namespace = "examples.rpc"
const PeerName = "myPeerName"

svc, err := mqrpc.NewMqService(AmqpURI, Namespace, PeerName)
if err != nil {
    panic(err)
}
Adding Message Handler
svc.AddHandler("someMsgType", func(ctx *mqrpc.Context) interface{} {
    var msgPayload SomeMsgPayload
    json.Unmarshal(ctx.GetMessage().Payload, &msgPayload)

    return nil

    // If you have something to return to the client
    // return AnotherMsgPayload{}
})
Run Service
svc.Run(false)

// If you want to use SendToAny(), the `enableWorker` flag
// must be set to true on the server (worker) side.
// svc.Run(true)
Call RPC
msgClient := mqrpc.DefaultMessageService{MqService: svc}

args := SomeMsgPayload{hello: "world"}
resp, _ := msgClient.Request("myPeerName", "someMsgType", &args, 0)

var result Result
json.Unmarshal(resp.Payload, &result)

Example

You can find more details in /examples.

To try the example, you may need to start a RabbitMQ instance first:

docker run -d \
    --name mqrpc \
    --hostname mqrpc \
    -e RABBITMQ_DEFAULT_USER=mqrpc \
    -e RABBITMQ_DEFAULT_PASS=mqrpc \
    -p 5672:5672 \
    -p 15672:15672 \
    rabbitmq:3.8.5-management

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Context

type Context struct {
	// contains filtered or unexported fields
}

func (*Context) GetMessage

func (c *Context) GetMessage() Message

type DefaultMessageService added in v0.1.0

type DefaultMessageService struct {
	MqService *MqService
}

func (*DefaultMessageService) Broadcast added in v0.1.0

func (m *DefaultMessageService) Broadcast(msgType string, payload interface{}) error

func (*DefaultMessageService) Identifier added in v0.1.0

func (m *DefaultMessageService) Identifier() string

func (*DefaultMessageService) Request added in v0.1.0

func (m *DefaultMessageService) Request(to, msgType string, payload interface{}, timeout time.Duration) (interface{}, error)

func (*DefaultMessageService) Send added in v0.1.0

func (m *DefaultMessageService) Send(to, msgType string, payload interface{}) error

func (*DefaultMessageService) SendToAny added in v0.1.0

func (m *DefaultMessageService) SendToAny(msgType string, payload interface{}) error

type HandlerFunc

type HandlerFunc func(ctx *Context) interface{}

type Message

type Message struct {
	ReplyTo   string    // application use - address to reply to (ex: RPC)
	MessageId string    // application use - message identifier
	Timestamp time.Time // application use - message timestamp
	Type      MsgType   // application use - message type name

	Payload []byte `mapstructure:"Body"`
}

type MqService

type MqService struct {
	// contains filtered or unexported fields
}

func NewMqService

func NewMqService(amqpUri, namespace, peerName string) (*MqService, error)

func (*MqService) AddHandler

func (mq *MqService) AddHandler(msgType MsgType, handler HandlerFunc) error

func (*MqService) Run

func (mq *MqService) Run(enableWorker bool) error

type MsgType added in v0.1.0

type MsgType string
const MsgTypeReservedReply MsgType = "reply"

Directories

Path Synopsis
examples
rpc

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL