eventbus

Version: v4.1.0
Published: Nov 25, 2022 License: MIT Imports: 2 Imported by: 2

README

Go EventBus

Introduction

This package provides a simple yet powerful event bus.

  • Simple Pub/Sub
  • Async Publishing of events
  • Wildcard Support

Documentation

https://pkg.go.dev/github.com/dtomasi/go-event-bus/v3

Installation

go get github.com/dtomasi/go-event-bus/v3

package main

import "github.com/dtomasi/go-event-bus/v3"

Usage

Simple

Subscribe to and publish events using a simple callback function.

package main

import (
	"fmt"

	eventbus "github.com/dtomasi/go-event-bus/v3"
)

func main() {
	// Create a new instance.
	eb := eventbus.NewEventBus()

	// Subscribe to "foo:baz", or use a wildcard like "foo:*".
	eb.SubscribeCallback("foo:baz", func(topic string, data interface{}) {
		fmt.Println(topic)
		fmt.Println(data)
	})

	// Publish data to the topic.
	eb.Publish("foo:baz", "bar")
}

Synchronous using Channels

Subscribe using an EventChannel.

package main

import (
	"fmt"

	eventbus "github.com/dtomasi/go-event-bus/v3"
)

func main() {
	// Create a new instance.
	eb := eventbus.NewEventBus()

	// Subscribe to "foo:baz", or use a wildcard like "foo:*".
	// Subscribe also returns a *Subscription, which is not needed here.
	eventChannel, _ := eb.Subscribe("foo:baz")

	// To subscribe with an existing channel, use:
	// eb.SubscribeChannel("foo:*", eventChannel)

	// Wait for the incoming event on the channel.
	go func() {
		evt := <-eventChannel
		fmt.Println(evt.Topic)
		fmt.Println(evt.Data)

		// Tell the event bus that you are done.
		// This is only needed for synchronous publishing.
		evt.Done()
	}()

	// Publish data to the topic and wait for the subscriber to call Done.
	eb.Publish("foo:baz", "bar")
}

Async

Publish asynchronously

package main

import (
	"fmt"

	eventbus "github.com/dtomasi/go-event-bus/v3"
)

func main() {
	// Create a new instance.
	eb := eventbus.NewEventBus()

	// Subscribe to "foo:baz", or use a wildcard like "foo:*".
	// Subscribe also returns a *Subscription, which is not needed here.
	eventChannel, _ := eb.Subscribe("foo:baz")

	// To subscribe with an existing channel, use:
	// eb.SubscribeChannel("foo:*", eventChannel)

	// Wait for the incoming event on the channel.
	done := make(chan struct{})
	go func() {
		defer close(done)
		evt := <-eventChannel
		fmt.Println(evt.Topic)
		fmt.Println(evt.Data)
	}()

	// Publish data to the topic asynchronously.
	eb.PublishAsync("foo:baz", "bar")

	// Keep main alive until the subscriber has handled the event.
	<-done
}

Documentation

Overview

Package eventbus provides a simple event system that can be used synchronously and asynchronously, via channels or callbacks. It also supports wildcard subscriptions for listening to a group of event topics.
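To illustrate what a wildcard subscription means, here is a minimal sketch that tests topics against a pattern such as "foo:*". It assumes glob-style matching and uses the standard library's path.Match as a stand-in; the package's own matching rules may differ, and matchTopic is a hypothetical helper, not part of the eventbus API.

```go
package main

import (
	"fmt"
	"path"
)

// matchTopic reports whether topic matches pattern.
// Assumption: path.Match globbing approximates the wildcard rules;
// the eventbus package may implement them differently.
func matchTopic(pattern, topic string) bool {
	ok, err := path.Match(pattern, topic)
	return err == nil && ok
}

func main() {
	for _, topic := range []string{"foo:baz", "foo:bar", "qux:baz"} {
		fmt.Println(topic, matchTopic("foo:*", topic))
	}
}
```

Under this assumption, a subscriber registered for "foo:*" would receive events published to "foo:baz" and "foo:bar" but not to "qux:baz".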

Types

type CallbackFunc

type CallbackFunc func(topic string, data interface{})

CallbackFunc defines the signature of a subscriber callback, which receives the topic and the published data.

type Event

type Event struct {
	Data  interface{}
	Topic string
	// contains filtered or unexported fields
}

Event holds topic name and data.

func (*Event) Done

func (e *Event) Done()

Done calls Done on sync.WaitGroup if set.

type EventBus

type EventBus struct {
	// contains filtered or unexported fields
}

EventBus stores the information about subscribers interested in a particular topic.

func NewEventBus

func NewEventBus() *EventBus

NewEventBus returns a new EventBus instance.

func (*EventBus) HasSubscribers

func (eb *EventBus) HasSubscribers(topic string) bool

HasSubscribers reports whether a topic has subscribers.

func (*EventBus) Publish

func (eb *EventBus) Publish(topic string, data interface{}) interface{}

Publish publishes data to a topic and waits for all subscribers to finish. It creates a sync.WaitGroup internally, so every subscriber must call Done() on the Event it receives.
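The waiting behaviour described above can be modelled with the standard library alone. The following is a rough sketch, not the package's implementation: event, publishSync and handledCount are hypothetical names, and the sketch assumes one unbuffered channel per subscriber.

```go
package main

import (
	"fmt"
	"sync"
)

// event is a hypothetical stand-in for the package's Event type.
type event struct {
	Topic string
	Data  interface{}
	wg    *sync.WaitGroup
}

// Done signals completion; it is a no-op when no WaitGroup is set.
func (e *event) Done() {
	if e.wg != nil {
		e.wg.Done()
	}
}

// publishSync sends one event to every channel and blocks until each
// receiver has called Done.
func publishSync(topic string, data interface{}, subs []chan event) {
	var wg sync.WaitGroup
	wg.Add(len(subs))
	for _, ch := range subs {
		go func(ch chan event) {
			ch <- event{Topic: topic, Data: data, wg: &wg}
		}(ch)
	}
	wg.Wait()
}

// handledCount starts n subscribers and returns how many of them had
// handled the event by the time publishSync returned.
func handledCount(n int) int {
	var mu sync.Mutex
	handled := 0
	subs := make([]chan event, n)
	for i := range subs {
		subs[i] = make(chan event)
		go func(ch chan event) {
			evt := <-ch
			mu.Lock()
			handled++
			mu.Unlock()
			evt.Done()
		}(subs[i])
	}
	publishSync("foo:baz", "bar", subs)
	mu.Lock()
	defer mu.Unlock()
	return handled
}

func main() {
	fmt.Println(handledCount(3)) // prints 3
}
```

In this model a subscriber that never calls Done would block the publisher forever, which is why the Done call matters for synchronous publishing.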

func (*EventBus) PublishAsync

func (eb *EventBus) PublishAsync(topic string, data interface{})

PublishAsync publishes data to a topic asynchronously. It returns no value and does not wait for subscribers to finish.

func (*EventBus) PublishAsyncOnce

func (eb *EventBus) PublishAsyncOnce(topic string, data interface{})

PublishAsyncOnce is the same as PublishAsync but makes sure that the topic is published only once.

func (*EventBus) PublishOnce

func (eb *EventBus) PublishOnce(topic string, data interface{}) interface{}

PublishOnce is the same as Publish but makes sure that the topic is published only once.
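One way to picture the publish-once guarantee is a guarded set of topics that have already been published. This is only a sketch under that assumption: oncePublisher and publishOnce are hypothetical names, and the real PublishOnce may track state differently.

```go
package main

import (
	"fmt"
	"sync"
)

// oncePublisher remembers which topics have already been published.
// Hypothetical type, not part of the eventbus package.
type oncePublisher struct {
	mu        sync.Mutex
	published map[string]bool
}

func newOncePublisher() *oncePublisher {
	return &oncePublisher{published: make(map[string]bool)}
}

// publishOnce calls publish only the first time a topic is seen and
// reports whether the call was made.
func (p *oncePublisher) publishOnce(topic string, publish func()) bool {
	p.mu.Lock()
	if p.published[topic] {
		p.mu.Unlock()
		return false
	}
	p.published[topic] = true
	p.mu.Unlock()

	publish()
	return true
}

func main() {
	p := newOncePublisher()
	calls := 0
	for i := 0; i < 3; i++ {
		p.publishOnce("app:ready", func() { calls++ })
	}
	fmt.Println(calls) // prints 1
}
```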

func (*EventBus) Stats

func (eb *EventBus) Stats() *Stats

Stats returns the event bus statistics as a *Stats value.

func (*EventBus) Subscribe

func (eb *EventBus) Subscribe(topic string) (EventChannel, *Subscription)

Subscribe subscribes to a topic and returns a new EventChannel together with its Subscription.

func (*EventBus) SubscribeCallback

func (eb *EventBus) SubscribeCallback(topic string, callable CallbackFunc) *Subscription

SubscribeCallback provides a simple wrapper that allows registering a CallbackFunc directly instead of a channel.
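A callback wrapper of this kind can be thought of as a goroutine that drains a channel and invokes the function for each event. The sketch below shows the idea with standard-library types only; event, callbackFunc, subscribeCallback and deliver are hypothetical, and calling Done on the subscriber's behalf is an assumption about how such a wrapper would cooperate with synchronous publishing.

```go
package main

import (
	"fmt"
	"sync"
)

// event is a hypothetical stand-in for the package's Event type.
type event struct {
	Topic string
	Data  interface{}
	wg    *sync.WaitGroup
}

// Done signals completion; it is a no-op when no WaitGroup is set.
func (e *event) Done() {
	if e.wg != nil {
		e.wg.Done()
	}
}

type callbackFunc func(topic string, data interface{})

// subscribeCallback returns a channel whose events are passed to fn.
// The goroutine calls Done after each callback so that a synchronous
// publisher is released.
func subscribeCallback(fn callbackFunc) chan event {
	ch := make(chan event)
	go func() {
		for evt := range ch {
			fn(evt.Topic, evt.Data)
			evt.Done()
		}
	}()
	return ch
}

// deliver sends a single event through the wrapper and returns what
// the callback observed.
func deliver(topic string, data interface{}) string {
	var got string
	ch := subscribeCallback(func(topic string, data interface{}) {
		got = fmt.Sprintf("%s=%v", topic, data)
	})

	var wg sync.WaitGroup
	wg.Add(1)
	ch <- event{Topic: topic, Data: data, wg: &wg}
	wg.Wait() // returns once the callback has run
	close(ch)
	return got
}

func main() {
	fmt.Println(deliver("foo:baz", "bar")) // prints foo:baz=bar
}
```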

func (*EventBus) SubscribeChannel

func (eb *EventBus) SubscribeChannel(topic string, ch EventChannel) *Subscription

SubscribeChannel subscribes an existing EventChannel to a topic.

type EventChannel

type EventChannel chan Event

EventChannel is a channel which can accept an Event.

func NewEventChannel

func NewEventChannel() EventChannel

NewEventChannel creates a new EventChannel.

type SafeCounter

type SafeCounter struct {
	// contains filtered or unexported fields
}

SafeCounter is a concurrency safe counter.

func NewSafeCounter

func NewSafeCounter() *SafeCounter

NewSafeCounter creates a new counter.

func (*SafeCounter) Dec

func (c *SafeCounter) Dec()

Dec decrements the counter by 1.

func (*SafeCounter) DecBy

func (c *SafeCounter) DecBy(dec uint)

DecBy decrements the counter by given delta.

func (*SafeCounter) Inc

func (c *SafeCounter) Inc()

Inc increments the counter by 1.

func (*SafeCounter) IncBy

func (c *SafeCounter) IncBy(add uint)

IncBy increments the counter by given delta.

func (*SafeCounter) Value

func (c *SafeCounter) Value() int

Value returns the current value.
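For readers unfamiliar with the pattern, a concurrency-safe counter is commonly built from an integer guarded by a mutex. The sketch below mirrors part of the SafeCounter method set but is not the package's implementation; counter and countConcurrently are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// counter is a hypothetical mutex-guarded counter.
type counter struct {
	mu sync.Mutex
	n  int
}

// IncBy adds the given delta to the counter.
func (c *counter) IncBy(add uint) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.n += int(add)
}

// DecBy subtracts the given delta from the counter.
func (c *counter) DecBy(dec uint) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.n -= int(dec)
}

// Value returns the current value.
func (c *counter) Value() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.n
}

// countConcurrently increments one counter from many goroutines and
// returns the final value.
func countConcurrently(workers int) int {
	var c counter
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.IncBy(1)
		}()
	}
	wg.Wait()
	return c.Value()
}

func main() {
	fmt.Println(countConcurrently(100)) // prints 100
}
```

Without the mutex, concurrent increments could be lost; with it, the final value equals the number of workers.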

type Stats

type Stats struct {
	// contains filtered or unexported fields
}

func (*Stats) GetPublishedCountByTopic

func (s *Stats) GetPublishedCountByTopic(topicName string) int

func (*Stats) GetSubscriberCountByTopic

func (s *Stats) GetSubscriberCountByTopic(topicName string) int

func (*Stats) GetTopicStats

func (s *Stats) GetTopicStats() []*TopicStats

func (*Stats) GetTopicStatsByName

func (s *Stats) GetTopicStatsByName(topicName string) *TopicStats

type Subscription

type Subscription struct {
	// contains filtered or unexported fields
}

func NewSubscription

func NewSubscription() *Subscription

func (*Subscription) Close

func (it *Subscription) Close()

func (*Subscription) OnClose

func (it *Subscription) OnClose(fn func())
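Close and OnClose are undocumented here, so the following is only a guess at the pattern their names suggest: hooks registered with OnClose run when the subscription is closed. The subscription type and closeTwice helper are hypothetical, and making a second Close a no-op is a choice of this sketch, not confirmed behaviour of the package.

```go
package main

import (
	"fmt"
	"sync"
)

// subscription is a hypothetical model of a closable subscription.
type subscription struct {
	mu     sync.Mutex
	closed bool
	hooks  []func()
}

// OnClose registers fn to run when the subscription is closed.
func (s *subscription) OnClose(fn func()) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.hooks = append(s.hooks, fn)
}

// Close runs the registered hooks in order. Assumption: closing an
// already closed subscription does nothing.
func (s *subscription) Close() {
	s.mu.Lock()
	if s.closed {
		s.mu.Unlock()
		return
	}
	s.closed = true
	hooks := s.hooks
	s.mu.Unlock()

	for _, fn := range hooks {
		fn()
	}
}

// closeTwice registers two hooks, closes the subscription twice, and
// returns the steps the hooks recorded.
func closeTwice() []string {
	var steps []string
	s := &subscription{}
	s.OnClose(func() { steps = append(steps, "unsubscribe") })
	s.OnClose(func() { steps = append(steps, "close channel") })
	s.Close()
	s.Close() // no effect in this sketch: hooks already ran
	return steps
}

func main() {
	fmt.Println(closeTwice())
}
```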

type TopicStats

type TopicStats struct {
	Name            string
	PublishedCount  *SafeCounter
	SubscriberCount *SafeCounter
}
