multisocket

package module
v0.0.0-...-f10b52b Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 26, 2019 License: MIT Imports: 8 Imported by: 0

README

multisocket

CircleCI codecov Go Report Card Godoc GitHub

multisocket is an network library. Like nanomsg and zeromq, multisocket supports many transports: inproc, ipc, tcp etc. Unlike them, multisocket implements a message based bidirectional tx/rx independent and stateless Socket, Based on it, various protocols(reqrep, pubsub, pipline, bus etc) are built. multisocket provides sematic and easy to use interfaces to help you build complex message systems.

Quick Start

server
Socket is stateless, supports recveiving concurrently.

    sock := multisocket.NewDefault()
    if err := sock.Listen("tcp://127.0.0.1:30001"); err != nil {
        log.WithField("err", err).Panicf("listen")
    }
    
    worker := func(n int) {
        for {
            msg, err := sock.RecvMsg()
            if err != nil {
                log.WithField("err", err).Errorf("recv")
                continue
            }
            s := string(msg.Content)
            content := []byte(fmt.Sprintf("[#%d]Hello, %s", n, s))
            if err = sock.SendTo(msg.Source, content); err != nil {
                log.WithField("err", err).Errorf("send")
            }
        }
    }
    // recving concurrently
    go worker(0)
    go worker(1)

client
tx/rx are independent

    sock := multisocket.NewDefault()
    if err := sock.Dial("tcp://127.0.0.1:30001"); err != nil {
        log.WithField("err", err).Panicf("dial")
    }
    // sending
    go func() {
        var content string
        idx := 0
        for {
            content = fmt.Sprintf("%s#%d", name, idx)
            if err = sock.Send([]byte(content)); err != nil {
                log.WithField("err", err).Errorf("send")
            }
            log.WithField("id", idx).Infof("send")
            time.Sleep(1000 * time.Millisecond)
            idx++
        }
    }()

    // recving
    go func() {
        for {
            if content, err = sock.Recv(); err != nil { 
                log.WithField("err", err).Errorf("recv")
            }
            fmt.Printf("%s\n", string(content))
        }
    }()

Design

There are three layers: Protocol, Socket, Transport.
multisocket

Transport

Transport is responsible for connections between peers, common transports include: inproc, ipc, tcp, websocket etc. It's easy to implement custom transports if needed.

Socket

Socket is based on Transport, it provides bidrectional tx/rx independent and stateless message communication.

Protocol

Protocols are based on Socket, they provide various communication patterns: request/reply, publish/subscribe, push/pull etc. Also, it's easy to implement your custom protocols.

Socket

Socket consists of thred component: Connector, Sender, Receiver.

Connector

It's responsible for dialing/listening, to establish connections to other peers.
connect types:

  1. N
    at most establish N connections.
  2. no limit
    can establish any connections.
Sender

It's responsible for sending messages to connected peers.
send type:

  1. to one
    fairly choose a peer to send.
  2. to all
    send to all peers.
  3. to dest
    send to a specified destination.
  4. no send
    users can choose not to send any messages.
Receiver

It's responsible for recving messages from peers.
recv types:

  1. no recv
    disscard any recved message from any peer. useful for protocol like: push.
  2. all
    recv all messages from all peers. It's the default behaviour.

Protocols

See wiki.

Examples

See examples/

Acknowledgement

This project is inspired by zeromq and nanomsg, especially nanomsg/mangos from which some code references. Thank all them for their efforts.

Documentation

Index

Constants

View Source
const (
	ErrMsgDropped      = errs.Err("message dropped")
	ErrBrokenPath      = errs.Err("bad destination: broken path")
	ErrInvalidSendType = errs.Err("invalid send type")
)

errors

Variables

View Source
var (
	// OptionDomains is option's domain
	OptionDomains = []string{"Socket"}
	// Options for receiver
	Options = socketOptions{
		NoRecv:          options.NewBoolOption(false),
		RecvQueueSize:   options.NewUint16Option(64),
		NoSend:          options.NewBoolOption(false),
		SendQueueSize:   options.NewUint16Option(64),
		SendTTL:         options.NewUint8Option(message.DefaultMsgTTL),
		SendBestEffort:  options.NewBoolOption(false),
		SendStopTimeout: options.NewTimeDurationOption(5 * time.Second),
	}
)

Functions

func NewPair

func NewPair() (Socket, Socket)

NewPair create a pair of Sockets.

func StartSwitch

func StartSwitch(backSock, frontSock Socket, mid SwitchMiddlewareFunc)

StartSwitch start switch messages between back and front sockets

Types

type ConnectorAction

type ConnectorAction = connector.Action

ConnectorAction is connector's actions

type Socket

type Socket interface {
	options.Options

	ConnectorAction
	Connector() connector.Connector

	RecvMsg() (*message.Message, error)
	SendMsg(msg *message.Message) error                // for forward message
	Send(content []byte) error                         // for initiative send one
	SendAll(content []byte) error                      // for initiative send all
	SendTo(dest message.MsgPath, content []byte) error // for reply send

	Close() error
}

Socket is a network peer

func New

func New(ovs options.OptionValues) Socket

New creates a Socket

func NewDefault

func NewDefault() Socket

NewDefault creates a default Socket

func NewNoRecv

func NewNoRecv(ovs options.OptionValues) Socket

NewNoRecv create a no recv Socket

func NewNoSend

func NewNoSend(ovs options.OptionValues) Socket

NewNoSend create a no send Socket

type SwitchMiddlewareFunc

type SwitchMiddlewareFunc func(msg *message.Message) *message.Message

SwitchMiddlewareFunc check or modidy switch messages

Directories

Path Synopsis
all
Package all is used to register all transports.
Package all is used to register all transports.
ipc
Package ipc implements the IPC transport on top of UNIX domain sockets.
Package ipc implements the IPC transport on top of UNIX domain sockets.
tcp
ws

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL