zmq4chan

v0.0.0-...-ca16ba2
Published: Jul 17, 2015 License: BSD-2-Clause Imports: 3 Imported by: 0

Documentation

Overview

Package zmq4chan extends https://github.com/pebbe/zmq4 with channel I/O.

Since ZeroMQ sockets are not thread-safe, a single socket cannot be used directly for sending and receiving from different goroutines (for example, when using a socket type with an unrestricted send/receive pattern). This package interleaves the ZeroMQ calls safely while providing a simple API. The implementation uses epoll, which makes it Linux-specific.
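
The general idea of confining a non-thread-safe object to one goroutine and exposing it through channels can be sketched with the standard library alone. This is not the package's implementation (which is epoll-based): the loopback type and the serve function are hypothetical names invented for this illustration, and the fake socket simply hands back whatever was sent.

```go
package main

import "fmt"

// loopback is a hypothetical stand-in for a socket that is not safe for
// concurrent use: whatever is sent can later be received, in order.
type loopback struct {
	queue []string
}

func (l *loopback) Send(msg string) { l.queue = append(l.queue, msg) }

func (l *loopback) Recv() string {
	msg := l.queue[0]
	l.queue = l.queue[1:]
	return msg
}

// serve is the only goroutine that touches s. It interleaves sends and
// receives until send is closed and every queued message is delivered.
func serve(s *loopback, send <-chan string, recv chan<- string) {
	defer close(recv)
	for send != nil || len(s.queue) > 0 {
		// A nil channel never becomes ready in a select, so delivery is
		// enabled only while the socket has something to receive.
		var out chan<- string
		var next string
		if len(s.queue) > 0 {
			out, next = recv, s.queue[0]
		}
		select {
		case msg, ok := <-send:
			if !ok {
				send = nil // stop reading; drain what is left
				continue
			}
			s.Send(msg)
		case out <- next:
			s.Recv() // drop the message that was just delivered
		}
	}
}

func main() {
	send := make(chan string)
	recv := make(chan string)
	go serve(&loopback{}, send, recv)

	go func() {
		defer close(send)
		for _, m := range []string{"one", "two", "three"} {
			send <- m
		}
	}()

	for m := range recv {
		fmt.Println(m)
	}
}
```

Callers in any number of goroutines only ever touch the two channels, so the socket itself is never accessed concurrently.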

Multipart messaging can be achieved by combining the basic IO.Add interface with the SendMessageBytes and RecvMessageBytes adapters.

Example:

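io := zmq4chan.NewIO()

s, err := zmq4.NewSocket(zmq4.REP)
if err != nil {
    log.Fatal(err)
}
defer io.Remove(s) // closes the socket and the recv channel

if err = s.Bind("tcp://*:12765"); err != nil {
    log.Fatal(err)
}

recv := make(chan [][]byte)
send := make(chan [][]byte)
defer close(send) // the caller is responsible for closing send

err = io.Add(s, zmq4chan.SendMessageBytes(send), zmq4chan.RecvMessageBytes(recv))
if err != nil {
    log.Fatal(err)
}

// Echo every received multipart message back to the peer.
for msg := range recv {
    send <- msg
}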

Constants

This section is empty.

Variables

This section is empty.

Functions

func RecvMessageBytes

func RecvMessageBytes(recv chan<- [][]byte) chan<- Data

RecvMessageBytes returns a channel that accepts individual message parts and assembles them into complete messages, which are delivered on recv. The caller should arrange for the returned channel to be closed (e.g. by delegating it to an IO instance).
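
A minimal sketch of how such an adapter might assemble parts is shown here. It is not the package's source: joinParts is a hypothetical name, the local Data type only mirrors the documented struct so the example compiles without ZeroMQ, and closing recv when the parts channel is closed is an assumption of this sketch.

```go
package main

import "fmt"

// Data mirrors the documented zmq4chan.Data type so that this sketch
// compiles without ZeroMQ installed.
type Data struct {
	Bytes []byte
	More  bool // true if more parts follow in the same message
}

// joinParts is a hypothetical stand-in for RecvMessageBytes. It buffers
// parts until one arrives with More == false, then delivers the message.
// Assumption: closing the returned channel closes recv; an unfinished
// trailing message is dropped.
func joinParts(recv chan<- [][]byte) chan<- Data {
	parts := make(chan Data)
	go func() {
		defer close(recv)
		var msg [][]byte
		for p := range parts {
			msg = append(msg, p.Bytes)
			if !p.More {
				recv <- msg
				msg = nil // start a fresh message
			}
		}
	}()
	return parts
}

func main() {
	recv := make(chan [][]byte)
	parts := joinParts(recv)

	go func() {
		defer close(parts) // in real use, an IO instance would do this
		parts <- Data{Bytes: []byte("header"), More: true}
		parts <- Data{Bytes: []byte("body")}
	}()

	for msg := range recv {
		fmt.Printf("%d parts: %s %s\n", len(msg), msg[0], msg[1])
	}
}
```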

func SendMessageBytes

func SendMessageBytes(send <-chan [][]byte) <-chan Data

SendMessageBytes reads complete messages from send and returns a channel that yields their individual parts. The channel passed as the parameter should be closed by the caller.
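
The splitting direction can be sketched in the same way. Again this is an illustration rather than the package's code: splitMessages is a hypothetical name, the local Data type mirrors the documented struct, and closing the output channel once the input is closed is an assumption of this sketch.

```go
package main

import "fmt"

// Data mirrors the documented zmq4chan.Data type so that this sketch
// compiles without ZeroMQ installed.
type Data struct {
	Bytes []byte
	More  bool // true if more parts follow in the same message
}

// splitMessages is a hypothetical stand-in for SendMessageBytes. It emits
// each frame of a message as one Data part and sets More on every part
// except the last. Assumption: the output is closed when send is closed.
func splitMessages(send <-chan [][]byte) <-chan Data {
	parts := make(chan Data)
	go func() {
		defer close(parts)
		for msg := range send {
			for i, frame := range msg {
				parts <- Data{Bytes: frame, More: i < len(msg)-1}
			}
		}
	}()
	return parts
}

func main() {
	send := make(chan [][]byte)
	parts := splitMessages(send)

	go func() {
		defer close(send) // the caller closes the send channel
		send <- [][]byte{[]byte("header"), []byte("body")}
	}()

	for p := range parts {
		fmt.Printf("%s more=%t\n", p.Bytes, p.More)
	}
}
```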

Types

type Data

type Data struct {
	Bytes []byte
	More  bool // true if more parts will be sent/received
}

Data holds a message part which will be sent or has been received.

func (Data) String

func (r Data) String() string

String converts r.Bytes to a string.

type IO

type IO struct {
	// contains filtered or unexported fields
}

IO implements channel-based messaging.

func NewIO

func NewIO() (io *IO)

NewIO allocates resources that are not released until the program terminates, so a program should rarely need more than one instance. NewIO panics on error.

func (*IO) Add

func (io *IO) Add(s *zmq.Socket, send <-chan Data, recv chan<- Data) (err error)

Add registers a socket for sending and/or receiving. After this call the caller must not access the socket directly. The send channel (if any) should be closed by the caller.

func (*IO) Remove

func (io *IO) Remove(s *zmq.Socket) (err error)

Remove closes the socket s. If s has been registered with Add, it is also unregistered, and its recv channel (if any) is closed.
