hub

v1.1.1
Published: Jul 24, 2020 License: Apache-2.0 Imports: 6 Imported by: 22

README

Hub

📨 A fast enough event hub for Go applications using publish/subscribe, with support for topic patterns like RabbitMQ topic exchanges.

Install

To install this library you can use go get, but I encourage you to always vendor your dependencies or use one of the version tags of this project.

go get -u github.com/leandro-lugaresi/hub
dep ensure --add github.com/leandro-lugaresi/hub

Usage

Subscribers

Hub provides subscribers as buffered (cap > 0) and unbuffered (cap = 0) channels, and there are two different types of subscribers:

  • Subscriber: the default subscriber. It is a blocking subscriber, so if the channel is full and you try to send another message, the send operation blocks until the subscriber consumes a message.
  • NonBlockingSubscriber: this subscriber never blocks on the publish side, but if the capacity of the channel is reached, the published message is lost and an alert is triggered. Use it only when losing data is acceptable, e.g. metrics or logs.
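The difference between the two subscriber types can be illustrated with plain Go channels. The following is a minimal sketch, not the library's implementation: trySend is a hypothetical helper that drops the message when the buffer is full, while a plain channel send would block instead.

```go
package main

import "fmt"

// trySend is a hypothetical helper (not part of hub). It attempts a
// non-blocking send and reports false when the channel buffer is full,
// meaning the message is dropped instead of waiting for a receiver.
func trySend(ch chan<- string, msg string) bool {
	select {
	case ch <- msg:
		return true
	default:
		return false
	}
}

func main() {
	ch := make(chan string, 1) // buffered channel with cap = 1

	fmt.Println(trySend(ch, "first"))  // true: the buffer has room
	fmt.Println(trySend(ch, "second")) // false: the buffer is full, message dropped
	fmt.Println(<-ch)                  // first
}
```

A blocking subscriber corresponds to a plain `ch <- msg`, which would wait at the second send until a receiver drains the channel.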

Topics

This library uses the same concept as topic exchanges in RabbitMQ: the message name is used to find all the subscribers that match the topic, like a route. The topic must be a list of words delimited by dots (.). There is one important special case for binding keys: * (star) can substitute for exactly one word.
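To make the matching rule concrete, here is a small sketch of how a pattern with * could be compared against a topic word by word. matchTopic is a hypothetical helper written for illustration; the library itself resolves subscribers through a trie rather than a linear comparison like this one.

```go
package main

import (
	"fmt"
	"strings"
)

// matchTopic is a hypothetical helper (not part of hub). It reports whether a
// dot-delimited topic matches a pattern in which "*" stands for exactly one word.
func matchTopic(pattern, topic string) bool {
	p := strings.Split(pattern, ".")
	t := strings.Split(topic, ".")
	// "*" replaces exactly one word, so the word counts must be equal.
	if len(p) != len(t) {
		return false
	}
	for i := range p {
		if p[i] != "*" && p[i] != t[i] {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(matchTopic("account.login.*", "account.login.failed")) // true
	fmt.Println(matchTopic("account.login.*", "account.foo.failed"))   // false
	fmt.Println(matchTopic("account.login.*", "account.login"))        // false
}
```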

Examples & Demos

package main

import (
	"fmt"
	"sync"

	"github.com/leandro-lugaresi/hub"
)

func main() {
	h := hub.New()
	var wg sync.WaitGroup
	// The cap param creates a buffered channel with cap = 10.
	// If you want an unbuffered channel, use a cap of 0.
	sub := h.Subscribe(10, "account.login.*", "account.changepassword.*")
	wg.Add(1)
	go func(s hub.Subscription) {
		for msg := range s.Receiver {
			fmt.Printf("receive msg with topic %s and id %d\n", msg.Name, msg.Fields["id"])
		}
		wg.Done()
	}(sub)

	h.Publish(hub.Message{
		Name:   "account.login.failed",
		Fields: hub.Fields{"id": 123},
	})
	h.Publish(hub.Message{
		Name:   "account.changepassword.failed",
		Fields: hub.Fields{"id": 456},
	})
	h.Publish(hub.Message{
		Name:   "account.login.success",
		Fields: hub.Fields{"id": 123},
	})
	// message not routed to this subscriber
	h.Publish(hub.Message{
		Name:   "account.foo.failed",
		Fields: hub.Fields{"id": 789},
	})

	// close all the subscribers
	h.Close()
	// wait until all the buffered messages are consumed
	wg.Wait()

	// Output:
	// receive msg with topic account.login.failed and id 123
	// receive msg with topic account.changepassword.failed and id 456
	// receive msg with topic account.login.success and id 123
}

See more here!

Benchmarks

To run the benchmarks you can execute:

make bench

Currently, I only have the benchmarks of the CSTrie used internally. I will provide more benchmarks.

Throughput

The project has one throughput test. To run it, execute:

make throughput

On an Intel(R) Core(TM) i5-4460 CPU @ 3.20GHz x4 we got these results:

go test -v -timeout 60s github.com/leandro-lugaresi/hub -run ^TestThroughput -args -throughput
=== RUN   TestThroughput
1317530.091292 msg/sec
--- PASS: TestThroughput (3.04s)
PASS
ok      github.com/leandro-lugaresi/hub 3.192s

CSTrie

Internally, this project uses an awesome Concurrent Subscription Trie written by @tylertreat. If you want to learn more about it, see this blog post; the code is here.


This project adheres to the Contributor Covenant code of conduct. By participating, you are expected to uphold this code. We appreciate your contribution. Please refer to our contributing guidelines for further information.

Documentation

Constants

const AlertTopic = "hub.subscription.messageslost"

AlertTopic is used to notify when a nonblocking subscriber loses a message. You can subscribe to this topic to log the loss or send metrics.

Variables

This section is empty.

Functions

This section is empty.

Types

type Fields

type Fields map[string]interface{}

Fields is a key/value store for Message values.

func (Fields) String added in v1.1.1

func (f Fields) String() string

type Hub

type Hub struct {
	// contains filtered or unexported fields
}

Hub is a component that provides publish and subscribe capabilities for messages. Every message has a Name used to route it to subscribers, and this works like RabbitMQ topic exchanges: words are separated by dots `.` and you can use `*` as a wildcard.

Example

package main

import (
	"fmt"
	"sync"

	"github.com/leandro-lugaresi/hub"
)

func main() {
	h := hub.New()
	var wg sync.WaitGroup
	// The cap param creates a buffered channel with cap = 10.
	// If you want an unbuffered channel, use a cap of 0.
	sub := h.Subscribe(10, "account.login.*", "account.changepassword.*")

	wg.Add(1)

	go func(s hub.Subscription) {
		for msg := range s.Receiver {
			fmt.Printf("receive msg with topic %s and id %d\n", msg.Name, msg.Fields["id"])
		}

		wg.Done()
	}(sub)

	h.Publish(hub.Message{
		Name:   "account.login.failed",
		Fields: hub.Fields{"id": 123},
	})
	h.Publish(hub.Message{
		Name:   "account.changepassword.failed",
		Fields: hub.Fields{"id": 456},
	})
	h.Publish(hub.Message{
		Name:   "account.login.success",
		Fields: hub.Fields{"id": 123},
	})
	// message not routed to this subscriber
	h.Publish(hub.Message{
		Name:   "account.foo.failed",
		Fields: hub.Fields{"id": 789},
	})

	// close all the subscribers
	h.Close()
	// wait until all the buffered messages are consumed
	wg.Wait()
}

Output:

receive msg with topic account.login.failed and id 123
receive msg with topic account.changepassword.failed and id 456
receive msg with topic account.login.success and id 123

func New

func New() *Hub

New creates and returns a new empty hub.

func (*Hub) Close

func (h *Hub) Close()

Close will unsubscribe all the subscriptions and close them all.

func (*Hub) NonBlockingSubscribe

func (h *Hub) NonBlockingSubscribe(cap int, topics ...string) Subscription

NonBlockingSubscribe creates a nonblocking subscription to receive events for the given topics. This subscriber will lose messages if the buffer reaches its maximum capacity.

func (*Hub) Publish

func (h *Hub) Publish(m Message)

Publish will send an event to all the subscribers matching the event name.

func (*Hub) Subscribe

func (h *Hub) Subscribe(cap int, topics ...string) Subscription

Subscribe creates a blocking subscription to receive events for the given topics. The cap param is used inside the subscriber, in this case to create the channel. As described in the README, a cap of 0 gives an unbuffered channel.

func (*Hub) Unsubscribe

func (h *Hub) Unsubscribe(sub Subscription)

Unsubscribe removes and closes the Subscription.

func (*Hub) With added in v1.1.0

func (h *Hub) With(f Fields) *Hub

With creates a child Hub with the fields added to it. When Publish is called on the child, these Fields are automatically added to the message.
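The effect of With can be pictured as merging two field maps at publish time. The sketch below is only an illustration under stated assumptions: mergeFields is a hypothetical helper, the local Fields type mirrors the one documented above, and the rule that the message's own value wins on a key conflict is an assumption, not documented behavior.

```go
package main

import "fmt"

// Fields mirrors the map type documented above.
type Fields map[string]interface{}

// mergeFields is a hypothetical helper (not part of hub). It returns a new map
// holding the hub-level fields plus the message fields.
// Assumption: on a key conflict the message's own value wins.
func mergeFields(hubFields, msgFields Fields) Fields {
	out := make(Fields, len(hubFields)+len(msgFields))
	for k, v := range hubFields {
		out[k] = v
	}
	for k, v := range msgFields {
		out[k] = v
	}
	return out
}

func main() {
	childFields := Fields{"service": "auth"} // fields attached to a child hub
	msgFields := Fields{"id": 123}           // fields set on a single message

	merged := mergeFields(childFields, msgFields)
	fmt.Println(merged["service"], merged["id"]) // auth 123
}
```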

type Message

type Message struct {
	Name   string
	Body   []byte
	Fields Fields
}

Message represents a message/event passed into the hub. It also contains some helper functions to convert the fields to primitive types.

func (*Message) Topic

func (m *Message) Topic() string

Topic returns the message topic used when the message was sent.

type Subscription

type Subscription struct {
	Topics   []string
	Receiver <-chan Message
	// contains filtered or unexported fields
}

Subscription represents a topic subscription.
