messagebus

v1.1.5
Published: Jun 30, 2020 | License: MIT | Imports: 3 | Imported by: 35

README

🚌 message-bus

A simple asynchronous message bus for Go.

📖 ABOUT

Contributors:

Want to contribute? Feel free to send pull requests!

Have problems, bugs, or feature ideas? We use the GitHub issue tracker to manage them.

📚 Documentation

For documentation (including examples), visit rafallorenz.com/message-bus

For GoDoc reference, visit pkg.go.dev

🚏 HOW TO USE

🚅 Benchmark

➜  message-bus git:(master) ✗ go test -bench=. -cpu=4 -benchmem
goos: darwin
goarch: amd64
pkg: github.com/vardius/message-bus
BenchmarkPublish-4                   	 4430224	       250 ns/op	       0 B/op	       0 allocs/op
BenchmarkSubscribe-4                 	  598240	      2037 ns/op	     735 B/op	       5 allocs/op

👉 Click here to see all benchmark results.

📜 License

This package is released under the MIT license. See the complete license in the package.

Documentation

Overview

Package messagebus provides a simple asynchronous message publisher.

Example
package main

import (
	"fmt"
	"sync"

	messagebus "github.com/vardius/message-bus"
)

func main() {
	queueSize := 100
	bus := messagebus.New(queueSize)

	var wg sync.WaitGroup
	wg.Add(2)

	_ = bus.Subscribe("topic", func(v bool) {
		defer wg.Done()
		fmt.Println("s1", v)
	})

	_ = bus.Subscribe("topic", func(v bool) {
		defer wg.Done()
		fmt.Println("s2", v)
	})

	// Publish blocks only when the buffer of one of the subscribers is full.
	// Change the buffer size by altering queueSize when creating a new message bus.
	bus.Publish("topic", true)
	wg.Wait()

}
Output:

s1 true
s2 true
Example (Second)
package main

import (
	"fmt"

	messagebus "github.com/vardius/message-bus"
)

func main() {
	queueSize := 2
	subscribersAmount := 3

	ch := make(chan int, queueSize)
	defer close(ch)

	bus := messagebus.New(queueSize)

	for i := 0; i < subscribersAmount; i++ {
		_ = bus.Subscribe("topic", func(i int, out chan<- int) { out <- i })
	}

	go func() {
		for n := 0; n < queueSize; n++ {
			bus.Publish("topic", n, ch)
		}
	}()

	sum := 0
	for sum < subscribersAmount*queueSize {
		<-ch
		sum++
	}

	fmt.Println(sum)
}
Output:

6

Types

type MessageBus

type MessageBus interface {
	// Publish publishes arguments to the given topic's subscribers.
	// Publish blocks only when the buffer of one of the subscribers is full.
	Publish(topic string, args ...interface{})
	// Close unsubscribes all handlers from the given topic.
	Close(topic string)
	// Subscribe subscribes a handler to the given topic.
	Subscribe(topic string, fn interface{}) error
	// Unsubscribe unsubscribes a handler from the given topic.
	Unsubscribe(topic string, fn interface{}) error
}

MessageBus implements the publish/subscribe messaging paradigm.
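
Because handlers are passed as `fn interface{}` and messages as `args ...interface{}`, a handler's parameter list has to match what is published to its topic. The following is a minimal sketch of how such a dynamic call can be made with the standard `reflect` package. `callHandler` is a hypothetical helper written for illustration only; it is not part of this package's API, and the package's actual implementation may differ.

```go
package main

import (
	"fmt"
	"reflect"
)

// callHandler is a hypothetical helper (not part of messagebus). It invokes
// fn with args via reflection and returns an error, rather than panicking,
// when fn is not a function or the arguments do not fit its signature.
// Variadic handlers are not supported in this sketch.
func callHandler(fn interface{}, args ...interface{}) error {
	v := reflect.ValueOf(fn)
	if v.Kind() != reflect.Func || v.IsNil() {
		return fmt.Errorf("handler must be a non-nil func, got %T", fn)
	}
	t := v.Type()
	if t.NumIn() != len(args) {
		return fmt.Errorf("handler takes %d arguments, got %d", t.NumIn(), len(args))
	}
	in := make([]reflect.Value, len(args))
	for i, a := range args {
		av := reflect.ValueOf(a)
		if !av.IsValid() || !av.Type().AssignableTo(t.In(i)) {
			return fmt.Errorf("argument %d (%T) does not match %s", i, a, t.In(i))
		}
		in[i] = av
	}
	v.Call(in)
	return nil
}

func main() {
	handler := func(v bool) { fmt.Println("got", v) }

	fmt.Println(callHandler(handler, true))   // handler runs, error is nil
	fmt.Println(callHandler(handler, "oops")) // type mismatch is reported
	fmt.Println(callHandler("not a func", 1)) // non-function is rejected
}
```

Checking the signature before calling matters because `reflect.Value.Call` panics on a mismatched argument list; validating up front turns that into an ordinary error.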

func New

func New(handlerQueueSize int) MessageBus

New creates a new MessageBus. handlerQueueSize sets the buffered channel length per subscriber.
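
To make the effect of handlerQueueSize concrete, here is a small stdlib-only sketch. It assumes each subscriber's queue behaves like an ordinary buffered Go channel, which is what the description above suggests. `tryPublish` is a hypothetical helper, not part of this package, and it uses a non-blocking send purely to show the point at which a plain send would block.

```go
package main

import "fmt"

// tryPublish is a hypothetical helper (not part of messagebus). It attempts
// a non-blocking send and reports whether the message fit into the buffer.
func tryPublish(queue chan<- int, msg int) bool {
	select {
	case queue <- msg:
		return true
	default:
		return false // buffer full: a plain send would block here
	}
}

func main() {
	handlerQueueSize := 2
	queue := make(chan int, handlerQueueSize) // stands in for one subscriber's buffer

	// Nothing is draining the queue yet, so only handlerQueueSize
	// messages fit before a send would block.
	for i := 0; i < 3; i++ {
		fmt.Println("message", i, "accepted:", tryPublish(queue, i))
	}

	<-queue // the subscriber consumes one message, freeing a slot
	fmt.Println("after drain, accepted:", tryPublish(queue, 3))
}
```

With a buffer of 2, the first two messages are accepted, the third is refused until the subscriber drains a slot, and the send after the drain succeeds. A larger handlerQueueSize therefore lets a publisher run further ahead of a slow handler, at the cost of more buffered messages.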
