minoch

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 10, 2024 License: BSD-3-Clause Imports: 10 Imported by: 0

Documentation

Overview

Package minoch is an implementation of Mino that is using channels and a local manager to exchange messages.

Because it is using only Go channels to communicate, this implementation can only be used by multiple instances in the same process. Its usage is purely to simplify the writing of tests, therefore it also provides some additionnal functionalities like filters.

A filter is called for any message incoming and it will determine if the instance should drop the message.

Documentation Last Review: 06.10.2020

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AddressFactory

type AddressFactory struct {
	serde.Factory
}

AddressFactory is a factory to deserialize Minoch addresses.

- implements mino.AddressFactory

func (AddressFactory) FromText

func (f AddressFactory) FromText(text []byte) mino.Address

FromText implements mino.AddressFactory. It returns an instance of an address from a byte slice.

type Envelope

type Envelope struct {
	// contains filtered or unexported fields
}

Envelope is the wrapper to send messages through streams.

type Filter

type Filter func(mino.Request) bool

Filter is a function called for any request to an RPC which will drop it if it returns false.

type Manager

type Manager struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Manager manages the communication between the local instances of Mino.

func NewManager

func NewManager() *Manager

NewManager creates a new empty manager.

type Minoch

type Minoch struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Minoch is an implementation of the Mino interface using channels. Each instance must have a unique string assigned to it.

- implements mino.Mino

func MustCreate

func MustCreate(manager *Manager, identifier string) *Minoch

MustCreate creates a new minoch instance and panic if the identifier is refused by the manager.

func NewMinoch

func NewMinoch(manager *Manager, identifier string) (*Minoch, error)

NewMinoch creates a new instance of a local Mino instance.

func (*Minoch) AddFilter

func (m *Minoch) AddFilter(filter Filter)

AddFilter adds the filter to all of the RPCs. This must be called before receiving requests.

func (*Minoch) CreateRPC

func (m *Minoch) CreateRPC(name string, h mino.Handler, f serde.Factory) (mino.RPC, error)

CreateRPC creates an RPC that can send to and receive from the unique path.

func (*Minoch) GetAddress

func (m *Minoch) GetAddress() mino.Address

GetAddress implements mino.Mino. It returns the address that other participants should use to contact this instance.

func (*Minoch) GetAddressFactory

func (m *Minoch) GetAddressFactory() mino.AddressFactory

GetAddressFactory implements mino.Mino. It returns the address factory.

func (*Minoch) WithSegment

func (m *Minoch) WithSegment(path string) mino.Mino

WithSegment returns a new mino instance that will have its URI path extended with the provided segment.

type RPC

type RPC struct {
	// contains filtered or unexported fields
}

RPC implements a remote procedure call that is calling its peers using the channels registered by the manager.

- implements mino.RPC

func (RPC) Call

func (c RPC) Call(ctx context.Context,
	req serde.Message, players mino.Players) (<-chan mino.Response, error)

Call implements mino.RPC. It sends the message to all participants and gathers their replies. The context is ignored in the scope of channel communication as there is no blocking I/O. The response channel will receive n responses for n players and be closed eventually.

Example
package main

import (
	"context"
	"fmt"

	"go.dedis.ch/dela/mino"
	"go.dedis.ch/dela/serde"
)

func main() {
	manager := NewManager()

	minoA := MustCreate(manager, "A").WithSegment("example")
	minoB := MustCreate(manager, "B").WithSegment("example")

	rpcA := mino.MustCreateRPC(minoA, "hello", exampleHandler{}, exampleFactory{})
	mino.MustCreateRPC(minoB, "hello", exampleHandler{}, exampleFactory{})

	roster := mino.NewAddresses(minoA.GetAddress(), minoB.GetAddress())

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	msg := exampleMessage{value: "Hello World!"}

	resps, err := rpcA.Call(ctx, msg, roster)
	if err != nil {
		panic("call failed: " + err.Error())
	}

	for resp := range resps {
		reply, err := resp.GetMessageOrError()
		if err != nil {
			panic("error in response: " + err.Error())
		}

		fmt.Println(reply.(exampleMessage).value)
	}

}

// exampleHandler is an RPC handler example.
//
// - implements mino.Handler
type exampleHandler struct {
	mino.UnsupportedHandler
}

// Process implements mino.Handler. It returns the message received.
func (exampleHandler) Process(req mino.Request) (serde.Message, error) {
	return req.Message, nil
}

// exampleMessage is an example of a message.
//
// - implements serde.Message
type exampleMessage struct {
	value string
}

// Serialize implements serde.Message. It returns the value contained in the
// message.
func (m exampleMessage) Serialize(serde.Context) ([]byte, error) {
	return []byte(m.value), nil
}

// exampleFactory is an example of a factory.
//
// - implements serde.Factory
type exampleFactory struct{}

// Deserialize implements serde.Factory. It returns the message using data as
// the inner value.
func (exampleFactory) Deserialize(ctx serde.Context, data []byte) (serde.Message, error) {
	return exampleMessage{value: string(data)}, nil
}
Output:

Hello World!
Hello World!

func (RPC) Stream

func (c RPC) Stream(ctx context.Context, memship mino.Players) (mino.Sender, mino.Receiver, error)

Stream implements mino.RPC. It simulates the stream by using the orchestrator as the router for all the messages. They are redirected to the channel associated with the address.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL