Documentation ¶
Overview ¶
Package conveyor provides an abstraction for message queues, brokers, buses and the sort. It is idiomatic and asynchronous because it uses Go channels everywhere
In another repo there are implementations for redis, rabbitmq, ...
This repository includes an in-memory message broker implementation that is useful for testing ¶
Please, check README.md for an overview https://github.com/leolara/conveyor/README.md
Example ¶
package main import ( "github.com/leolara/conveyor" "github.com/leolara/conveyor/memory" "sync" "time" ) func main() { var wd sync.WaitGroup // we use a in-memory broker for testing and examples, you can find many implementations for different brokers at // https://github.com/leolara/conveyor-impl b := memory.NewBroker() wd.Add(2) go Producer(b, wd) go Consumer(b, wd) wd.Wait() } func Producer(b conveyor.Broker, wd sync.WaitGroup) { defer wd.Done() // This creates a channel to publish messages pubChan := make(chan conveyor.SendEnvelop) // Links the publication channel to the queue or topic, from now on, what we write on pubChan will get published // into "exampleTopic" b.Publish("exampleTopic", pubChan) // Creates a channel to receive publication errors, we can use one error channel for each publication // or one for all publications pubChanErr := make(chan error) // Sends a message to "exampleTopic" with content []byte{24}, and with errors going pubChanErr pubChan <- conveyor.NewSendEnvelop([]byte{24}, pubChanErr) // We MUST read from pubChanErr, as it is idiomatic in Go a nil error means success. // We could use the same error channel for each publication or use a different time each time. select { case err := <-pubChanErr: if err != nil { panic(err) } case <-time.After(10 * time.Millisecond): panic("Did not receive empty error") } // Closing pubChan will finish the go routine in the broker that handles this publication, releasing resources close(pubChan) } func Consumer(b conveyor.Broker, wd sync.WaitGroup) { defer wd.Done() // The object sub is a subscription to "exampleTopic", as you can see the subscription is // async as it returns a channel that will eventually return the subscription object sub := <-b.Subscribe("exampleTopic") // We should check if there was an error subscribing if sub.Error() != nil { panic(sub.Error()) } // sub.Receive() give us a channel from which receive messages select { case envelope := <-sub.Receive(): // envelope.Body() returns the content of the message if len(envelope.Body()) != 1 && envelope.Body()[0] != 24 { panic("received wrong data") } // We should ack the message when we are done with it envelope.Ack() <- nil case <-time.After(10 * time.Millisecond): panic("Did not receive message") } // after this, we can repeat reading from sub.Receive() and writing to envelope.Ack() // We can unsubscribe to stop receiving messages sub.Unsubscribe() select { case _, ok := <-sub.Receive(): if ok { panic("shouldn't receive anything") } case <-time.After(10 * time.Millisecond): // OK, it should not receive message } }
Output:
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Broker ¶
type Broker interface { // Subscribe to a topic/queue // + target is the name of what you are subscribing to // + options are implementation dependant // returns a Subscription object asynchronously Subscribe(target string, options ...interface{}) <-chan Subscription // Publish to a topic/queue // + target is the name of what you are publishing to // + msgs is a channel on which you will send SendEnvelop objects // + options are implementation dependant // After calling this method you can publish as many messages as necessary using msgs channel Publish(target string, msgs <-chan SendEnvelop, options ...interface{}) }
Broker interface for message brokers
type Message ¶
type Message interface {
Body() []byte
}
Message contains a body, both ReceiveEnvelope and SendEnvelop extend this
type ReceiveEnvelope ¶
type ReceiveEnvelope interface { Message // Ack MUST return a channel to which the message receiver must write once to ACK, writing nil means a to ACK in // all implementations, other values are implementation dependent Ack() chan<- interface{} }
ReceiveEnvelope encapsulates a message that is being received
func NewReceiveEnvelop ¶
func NewReceiveEnvelop(body []byte, ack chan<- interface{}) ReceiveEnvelope
NewReceiveEnvelop creates an immutable ReceiveEnvelope. Useful when writing a broker implementation and need to send messages to subscribers, you do not have to create your own implementation of ReceiveEnvelope. It is immutable but contains *references* to body and ack, so you should be aware of that
func NewReceiveEnvelopCopy ¶
func NewReceiveEnvelopCopy(body []byte, ack chan<- interface{}) ReceiveEnvelope
NewReceiveEnvelopCopy does like NewReceiveEnvelop but using a copy of body instead of keeping the reference It is immutable but contains a *reference* to ack, so you should be aware of that
type SendEnvelop ¶
type SendEnvelop interface { Message // Error MUST return a channel, the broker will write nil on success or an error if failure Error() chan<- error }
SendEnvelop encapsulates a message that is being received
func NewSendEnvelop ¶
func NewSendEnvelop(body []byte, err chan error) SendEnvelop
NewSendEnvelop creates an immutable SendEnvelope. Useful when you are sending messages, you do not have to create your own implementation of SendEnvelope. It is immutable but contains *references* to body and err, so you should be aware of that.
type Subscription ¶
type Subscription interface { // Receive returns a channel to read and receive messages. The returned channel is always the same, so it is not necessary // to call this method for every read. Receive() <-chan ReceiveEnvelope // Unsubscribe lives up to its name Unsubscribe() Error() error }
Subscription encapsulates a subscription to a topic/queue