conveyor

package module
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 16, 2019 License: MIT Imports: 0 Imported by: 10

README

Conveyor, an idiomatic and asynchronous Go message broker abstraction

Build Status docs

This Go module provides an abstraction for message queues, brokers, buses and the sort. It is idiomatic and asynchronous because it uses Go channels everywhere

You can find implementations for Kafka, MQTT, NATS, NSQ, RabbitMQ, Redis, AWS SQS, STAN and STOMP at github.com/leolara/conveyor-impl

This module includes an in-memory message broker implementation that is useful for testing

How does it work

With conveyor you send and receive messages using Go channels.

Sending messages

To write to a queue called exampleTopic you would do:

// This creates a channel to publish messages
pubChan := make(chan conveyor.SendEnvelop)

// Links the publication channel to the queue or topic, from now on, what we write on pubChan will get published
// into "exampleTopic"
b.Publish("exampleTopic", pubChan)

// Creates a channel to receive publication errors, we can use one error channel for each publication
// or one for all publications
pubChanErr := make(chan error)

// Sends a message to "exampleTopic" with content []byte{24}, and with errors going pubChanErr
pubChan <- conveyor.NewSendEnvelop([]byte{24}, pubChanErr)

// We MUST read from pubChanErr, as it is idiomatic in Go a nil error means success.
// We could use the same error channel for each publication or use a different time each time.
err := <-pubChanErr
if err != nil {
    fmt.Errorf("got publication error: %s", err)
}

// here repeat as many times as necessary writing on pubChan and reading from pubChanErr

// Closing pubChan will finish the go routine in the broker that handles this publication, releasing resources
close(pubChan)

Receiving messages

In the other side is as idiomatic, asynchronous and channel based as when sending messages.

To receive messages from exampleTopic:


// The object sub is a subscription to "exampleTopic", as you can see the subscription is
// async as it returns a channel that will eventually return the subscription object
sub := <-b.Subscribe("exampleTopic")

// We should check if there was an error subscribing
if sub.Error() != nil {
    panic(sub.Error())
}

// sub.Receive() give us a channel from which receive messages
envelope := <-sub.Receive()

// envelope.Body() returns the content of the message
if len(envelope.Body()) != 1 && envelope.Body()[0] != 24 {
    t.Error("received wrong data")
}

// We should ack the message when we are done with it
envelope.Ack() <- nil

// after this, we can repeat reading from sub.Receive() and writing to envelope.Ack()

// Once we are done, we can unsubscribe to stop receiving messages
sub.Unsubscribe()

Implementation specific details

Both broker.Subscribe and broker.Publish allow optional parameters that are dependent on the broker implementation. Writing nil into envelope.Ack() means a successful processing of the message, sending other values are implementation dependent.

Why is this different from matryer/vice

The go module vice goal is to being able to use Go channels over message brokers transparently, so the code reading and writing does not have to know that there are actually a distributed messages underneath. Conveyor goal is to being able to use message and event brokers, buses and the sort using an idiomatic and asynchonous paradigm, which in Go means using channels.

In summary they are the sides of the same coin:

Vice advantages: works with any code that uses chan []byte

Conveyor advantages: the user controls acknowledgments and can respond to publication errors, also access broker implementation details

Vice disadvantages: user cannot control acknowledgments and cannot respond to publication errors

Conveyor disadvantages: code that uses it needs to be aware of Conveyor interfaces

Contributing

If you have some idea for some changes, please create an issue explaining your idea before sending a pull request. We are happy to have your help.

Documentation

Overview

Package conveyor provides an abstraction for message queues, brokers, buses and the sort. It is idiomatic and asynchronous because it uses Go channels everywhere

In another repo there are implementations for redis, rabbitmq, ...

This repository includes an in-memory message broker implementation that is useful for testing

Please, check README.md for an overview https://github.com/leolara/conveyor/README.md

Example
package main

import (
	"github.com/leolara/conveyor"
	"github.com/leolara/conveyor/memory"
	"sync"
	"time"
)

func main() {
	var wd sync.WaitGroup

	// we use a in-memory broker for testing and examples, you can find many implementations for different brokers at
	// https://github.com/leolara/conveyor-impl
	b := memory.NewBroker()

	wd.Add(2)
	go Producer(b, wd)
	go Consumer(b, wd)

	wd.Wait()
}

func Producer(b conveyor.Broker, wd sync.WaitGroup) {
	defer wd.Done()

	// This creates a channel to publish messages
	pubChan := make(chan conveyor.SendEnvelop)

	// Links the publication channel to the queue or topic, from now on, what we write on pubChan will get published
	// into "exampleTopic"
	b.Publish("exampleTopic", pubChan)

	// Creates a channel to receive publication errors, we can use one error channel for each publication
	// or one for all publications
	pubChanErr := make(chan error)

	// Sends a message to "exampleTopic" with content []byte{24}, and with errors going pubChanErr
	pubChan <- conveyor.NewSendEnvelop([]byte{24}, pubChanErr)

	// We MUST read from pubChanErr, as it is idiomatic in Go a nil error means success.
	// We could use the same error channel for each publication or use a different time each time.
	select {
	case err := <-pubChanErr:
		if err != nil {
			panic(err)
		}
	case <-time.After(10 * time.Millisecond):
		panic("Did not receive empty error")
	}

	// Closing pubChan will finish the go routine in the broker that handles this publication, releasing resources
	close(pubChan)
}

func Consumer(b conveyor.Broker, wd sync.WaitGroup) {
	defer wd.Done()

	// The object sub is a subscription to "exampleTopic", as you can see the subscription is
	// async as it returns a channel that will eventually return the subscription object
	sub := <-b.Subscribe("exampleTopic")

	// We should check if there was an error subscribing
	if sub.Error() != nil {
		panic(sub.Error())
	}

	// sub.Receive() give us a channel from which receive messages
	select {
	case envelope := <-sub.Receive():
		// envelope.Body() returns the content of the message
		if len(envelope.Body()) != 1 && envelope.Body()[0] != 24 {
			panic("received wrong data")
		}
		// We should ack the message when we are done with it
		envelope.Ack() <- nil
	case <-time.After(10 * time.Millisecond):
		panic("Did not receive message")
	}

	// after this, we can repeat reading from sub.Receive() and writing to envelope.Ack()

	// We can unsubscribe to stop receiving messages
	sub.Unsubscribe()

	select {
	case _, ok := <-sub.Receive():
		if ok {
			panic("shouldn't receive anything")
		}
	case <-time.After(10 * time.Millisecond):
		// OK, it should not receive message
	}
}
Output:

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Broker

type Broker interface {
	// Subscribe to a topic/queue
	//  + target is the name of what you are subscribing to
	//  + options are implementation dependant
	// returns a Subscription object asynchronously
	Subscribe(target string, options ...interface{}) <-chan Subscription
	// Publish to a topic/queue
	//  + target is the name of what you are publishing to
	//  + msgs is a channel on which you will send SendEnvelop objects
	//  + options are implementation dependant
	// After calling this method you can publish as many messages as necessary using msgs channel
	Publish(target string, msgs <-chan SendEnvelop, options ...interface{})
}

Broker interface for message brokers

type Message

type Message interface {
	Body() []byte
}

Message contains a body, both ReceiveEnvelope and SendEnvelop extend this

type ReceiveEnvelope

type ReceiveEnvelope interface {
	Message
	// Ack MUST return a channel to which the message receiver must write once to ACK, writing nil means a to ACK in
	// all implementations, other values are implementation dependent
	Ack() chan<- interface{}
}

ReceiveEnvelope encapsulates a message that is being received

func NewReceiveEnvelop

func NewReceiveEnvelop(body []byte, ack chan<- interface{}) ReceiveEnvelope

NewReceiveEnvelop creates an immutable ReceiveEnvelope. Useful when writing a broker implementation and need to send messages to subscribers, you do not have to create your own implementation of ReceiveEnvelope. It is immutable but contains *references* to body and ack, so you should be aware of that

func NewReceiveEnvelopCopy

func NewReceiveEnvelopCopy(body []byte, ack chan<- interface{}) ReceiveEnvelope

NewReceiveEnvelopCopy does like NewReceiveEnvelop but using a copy of body instead of keeping the reference It is immutable but contains a *reference* to ack, so you should be aware of that

type SendEnvelop

type SendEnvelop interface {
	Message
	// Error MUST return a channel, the broker will write nil on success or an error if failure
	Error() chan<- error
}

SendEnvelop encapsulates a message that is being received

func NewSendEnvelop

func NewSendEnvelop(body []byte, err chan error) SendEnvelop

NewSendEnvelop creates an immutable SendEnvelope. Useful when you are sending messages, you do not have to create your own implementation of SendEnvelope. It is immutable but contains *references* to body and err, so you should be aware of that.

type Subscription

type Subscription interface {
	// Receive returns a channel to read and receive messages. The returned channel is always the same, so it is not necessary
	// to call this method for every read.
	Receive() <-chan ReceiveEnvelope
	// Unsubscribe lives up to its name
	Unsubscribe()
	Error() error
}

Subscription encapsulates a subscription to a topic/queue

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL