import "github.com/vardius/message-bus"
Package messagebus provides a simple asynchronous message publisher.
Code:
queueSize := 100 bus := messagebus.New(queueSize) var wg sync.WaitGroup wg.Add(2) bus.Subscribe("topic", func(v bool) { defer wg.Done() fmt.Println("s1", v) }) bus.Subscribe("topic", func(v bool) { defer wg.Done() fmt.Println("s2", v) }) // Publish block only when the buffer of one of the subscribers is full. // change the buffer size altering queueSize when creating new messagebus bus.Publish("topic", true) wg.Wait() // Unordered output: // s1 true // s2 true
Output:
s1 true s2 true
Code:
queueSize := 2
subscribersAmount := 3

// ch collects the values forwarded by the handlers; it is handed to them
// as the second published argument.
ch := make(chan int, queueSize)
defer close(ch)

bus := messagebus.New(queueSize)

// Every subscriber forwards the published value to the channel that was
// published alongside it.
for i := 0; i < subscribersAmount; i++ {
	bus.Subscribe("topic", func(i int, out chan<- int) { out <- i })
}

// Publish from a separate goroutine so this goroutine is free to drain ch
// while the publications are in flight.
go func() {
	for n := 0; n < queueSize; n++ {
		bus.Publish("topic", n, ch)
	}
}()

// Count one received value per (publication, subscriber) pair. A plain
// receive is used because there is only a single channel to wait on.
sum := 0
for sum < subscribersAmount*queueSize {
	<-ch
	sum++
}

fmt.Println(sum)
Output:
6
// MessageBus is a topic-based publish/subscribe message bus.
type MessageBus interface {
	// Publish publishes args to the subscribers of the given topic.
	// It blocks only when the buffer of one of the subscribers is full.
	Publish(topic string, args ...interface{})
	// Close unsubscribes all handlers from the given topic.
	Close(topic string)
	// Subscribe subscribes the handler fn to the given topic.
	// NOTE(review): the conditions under which an error is returned are not
	// visible here; confirm against the implementation.
	Subscribe(topic string, fn interface{}) error
	// Unsubscribe unsubscribes the handler fn from the given topic.
	// NOTE(review): the conditions under which an error is returned are not
	// visible here; confirm against the implementation.
	Unsubscribe(topic string, fn interface{}) error
}
MessageBus implements the publish/subscribe messaging paradigm.
func New(handlerQueueSize int) MessageBus
New creates a new MessageBus. handlerQueueSize sets the buffered channel length per subscriber.
Package messagebus imports 3 packages (graph) and is imported by 2 packages. Updated 2019-10-21. Refresh now. Tools for package owners.