EventBus

package module
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 28, 2019 License: MIT Imports: 7 Imported by: 0

README

EventBus

Version GoDoc Coverage Status Build Status

This is a modified version of asaskevich/EventBus. The primary differences are:

  1. Removed all references to fmt, replacing with log or errors when relevant. This reduces build output by ~7MB.
  2. Converted to a go module.
  3. Travic-CI updates.

Package EventBus is the little and lightweight eventbus with async compatibility for GoLang.

Installation

Make sure that Go is installed on your computer. Type the following command in your terminal:

go get github.com/coreybutler/EventBus

After it the package is ready to use.

Import package in your project

Add following line in your *.go file:

import "github.com/asaskevich/EventBus"

If you unhappy to use long EventBus, you can do something like this:

import (
	evbus "github.com/asaskevich/EventBus"
)
Example
func calculator(a int, b int) {
	fmt.Printf("%d\n", a + b)
}

func main() {
	bus := EventBus.New();
	bus.Subscribe("main:calculator", calculator);
	bus.Publish("main:calculator", 20, 40);
	bus.Unsubscribe("main:calculator", calculator);
}
Implemented methods
  • New()
  • Subscribe()
  • SubscribeOnce()
  • HasCallback()
  • Unsubscribe()
  • Publish()
  • SubscribeAsync()
  • SubscribeOnceAsync()
  • WaitAsync()
New()

New returns new EventBus with empty handlers.

bus := EventBus.New();
Subscribe(topic string, fn interface{}) error

Subscribe to a topic. Returns error if fn is not a function.

func Handler() { ... }
...
bus.Subscribe("topic:handler", Handler)
SubscribeOnce(topic string, fn interface{}) error

Subscribe to a topic once. Handler will be removed after executing. Returns error if fn is not a function.

func HelloWorld() { ... }
...
bus.SubscribeOnce("topic:handler", HelloWorld)
Unsubscribe(topic string, fn interface{}) error

Remove callback defined for a topic. Returns error if there are no callbacks subscribed to the topic.

bus.Unsubscribe("topic:handler", HelloWord);
HasCallback(topic string) bool

Returns true if exists any callback subscribed to the topic.

Publish(topic string, args ...interface{})

Publish executes callback defined for a topic. Any additional argument will be transferred to the callback.

func Handler(str string) { ... }
...
bus.Subscribe("topic:handler", Handler)
...
bus.Publish("topic:handler", "Hello, World!");
SubscribeAsync(topic string, fn interface{}, transactional bool)

Subscribe to a topic with an asynchronous callback. Returns error if fn is not a function.

func slowCalculator(a, b int) {
	time.Sleep(3 * time.Second)
	fmt.Printf("%d\n", a + b)
}

bus := EventBus.New()
bus.SubscribeAsync("main:slow_calculator", slowCalculator, false)

bus.Publish("main:slow_calculator", 20, 60)

fmt.Println("start: do some stuff while waiting for a result")
fmt.Println("end: do some stuff while waiting for a result")

bus.WaitAsync() // wait for all async callbacks to complete

fmt.Println("do some stuff after waiting for result")

Transactional determines whether subsequent callbacks for a topic are run serially (true) or concurrently(false)

SubscribeOnceAsync(topic string, args ...interface{})

SubscribeOnceAsync works like SubscribeOnce except the callback to executed asynchronously

WaitAsync()

WaitAsync waits for all async callbacks to complete.

Cross Process Events

Works with two rpc services:

  • a client service to listen to remotely published events from a server
  • a server service to listen to client subscriptions

server.go

func main() {
    server := NewServer(":2010", "/_server_bus_", New())
    server.Start()
    // ...
    server.EventBus().Publish("main:calculator", 4, 6)
    // ...
    server.Stop()
}

client.go

func main() {
    client := NewClient(":2015", "/_client_bus_", New())
    client.Start()
    client.Subscribe("main:calculator", calculator, ":2010", "/_server_bus_")
    // ...
    client.Stop()
}
Notes

Documentation is available here: godoc.org. Full information about code coverage is also available here: EventBus on gocover.io.

Support

If you do have a contribution for the package feel free to put up a Pull Request or open Issue.

Special thanks to contributors

Documentation

Index

Constants

View Source
const (
	// PublishService - Client service method
	PublishService = "ClientService.PushEvent"
)
View Source
const (
	// RegisterService - Server subscribe service method
	RegisterService = "ServerService.Register"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Bus

type Bus interface {
	BusController
	BusSubscriber
	BusPublisher
}

Bus englobes global (subscribe, publish, control) bus behavior

func New

func New() Bus

New returns new EventBus with empty handlers.

type BusController

type BusController interface {
	HasCallback(topic string) bool
	WaitAsync()
}

BusController defines bus control behavior (checking handler's presence, synchronization)

type BusPublisher

type BusPublisher interface {
	Publish(topic string, args ...interface{})
}

BusPublisher defines publishing-related bus behavior

type BusSubscriber

type BusSubscriber interface {
	Subscribe(topic string, fn interface{}) error
	SubscribeAsync(topic string, fn interface{}, transactional bool) error
	SubscribeOnce(topic string, fn interface{}) error
	SubscribeOnceAsync(topic string, fn interface{}) error
	Unsubscribe(topic string, handler interface{}) error
}

BusSubscriber defines subscription-related bus behavior

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client - object capable of subscribing to a remote event bus

func NewClient

func NewClient(address, path string, eventBus Bus) *Client

NewClient - create a client object with the address and server path

func (*Client) EventBus

func (client *Client) EventBus() Bus

EventBus - returns the underlying event bus

func (*Client) Start

func (client *Client) Start() error

Start - starts the client service to listen to remote events

func (*Client) Stop

func (client *Client) Stop()

Stop - signal for the service to stop serving

func (*Client) Subscribe

func (client *Client) Subscribe(topic string, fn interface{}, serverAddr, serverPath string)

Subscribe subscribes to a topic in a remote event bus

func (*Client) SubscribeOnce

func (client *Client) SubscribeOnce(topic string, fn interface{}, serverAddr, serverPath string)

SubscribeOnce subscribes once to a topic in a remote event bus

type ClientArg

type ClientArg struct {
	Args  []interface{}
	Topic string
}

ClientArg - object containing event for client to publish locally

type ClientService

type ClientService struct {
	// contains filtered or unexported fields
}

ClientService - service object listening to events published in a remote event bus

func (*ClientService) PushEvent

func (service *ClientService) PushEvent(arg *ClientArg, reply *bool) error

PushEvent - exported service to listening to remote events

type EventBus

type EventBus struct {
	// contains filtered or unexported fields
}

EventBus - box for handlers and callbacks.

func (*EventBus) HasCallback

func (bus *EventBus) HasCallback(topic string) bool

HasCallback returns true if exists any callback subscribed to the topic.

func (*EventBus) Publish

func (bus *EventBus) Publish(topic string, args ...interface{})

Publish executes callback defined for a topic. Any additional argument will be transferred to the callback.

func (*EventBus) Subscribe

func (bus *EventBus) Subscribe(topic string, fn interface{}) error

Subscribe subscribes to a topic. Returns error if `fn` is not a function.

func (*EventBus) SubscribeAsync

func (bus *EventBus) SubscribeAsync(topic string, fn interface{}, transactional bool) error

SubscribeAsync subscribes to a topic with an asynchronous callback Transactional determines whether subsequent callbacks for a topic are run serially (true) or concurrently (false) Returns error if `fn` is not a function.

func (*EventBus) SubscribeOnce

func (bus *EventBus) SubscribeOnce(topic string, fn interface{}) error

SubscribeOnce subscribes to a topic once. Handler will be removed after executing. Returns error if `fn` is not a function.

func (*EventBus) SubscribeOnceAsync

func (bus *EventBus) SubscribeOnceAsync(topic string, fn interface{}) error

SubscribeOnceAsync subscribes to a topic once with an asynchronous callback Handler will be removed after executing. Returns error if `fn` is not a function.

func (*EventBus) Unsubscribe

func (bus *EventBus) Unsubscribe(topic string, handler interface{}) error

Unsubscribe removes callback defined for a topic. Returns error if there are no callbacks subscribed to the topic.

func (*EventBus) WaitAsync

func (bus *EventBus) WaitAsync()

WaitAsync waits for all async callbacks to complete

type NetworkBus

type NetworkBus struct {
	*Client
	*Server
	// contains filtered or unexported fields
}

NetworkBus - object capable of subscribing to remote event buses in addition to remote event busses subscribing to it's local event bus. Compoed of a server and client

func NewNetworkBus

func NewNetworkBus(address, path string) *NetworkBus

NewNetworkBus - returns a new network bus object at the server address and path

func (*NetworkBus) EventBus

func (networkBus *NetworkBus) EventBus() Bus

EventBus - returns wrapped event bus

func (*NetworkBus) Start

func (networkBus *NetworkBus) Start() error

Start - helper method to serve a network bus service

func (*NetworkBus) Stop

func (networkBus *NetworkBus) Stop()

Stop - signal for the service to stop serving

type NetworkBusService

type NetworkBusService struct {
	// contains filtered or unexported fields
}

NetworkBusService - object capable of serving the network bus

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server - object capable of being subscribed to by remote handlers

func NewServer

func NewServer(address, path string, eventBus Bus) *Server

NewServer - create a new Server at the address and path

func (*Server) EventBus

func (server *Server) EventBus() Bus

EventBus - returns wrapped event bus

func (*Server) HasClientSubscribed

func (server *Server) HasClientSubscribed(arg *SubscribeArg) bool

HasClientSubscribed - True if a client subscribed to this server with the same topic

func (*Server) Start

func (server *Server) Start() error

Start - starts a service for remote clients to subscribe to events

func (*Server) Stop

func (server *Server) Stop()

Stop - signal for the service to stop serving

type ServerService

type ServerService struct {
	// contains filtered or unexported fields
}

ServerService - service object to listen to remote subscriptions

func (*ServerService) Register

func (service *ServerService) Register(arg *SubscribeArg, success *bool) error

Register - Registers a remote handler to this event bus for a remote subscribe - a given client address only needs to subscribe once event will be republished in local event bus

type SubscribeArg

type SubscribeArg struct {
	ClientAddr    string
	ClientPath    string
	ServiceMethod string
	SubscribeType SubscribeType
	Topic         string
}

SubscribeArg - object to hold subscribe arguments from remote event handlers

type SubscribeType

type SubscribeType int

SubscribeType - how the client intends to subscribe

const (
	// Subscribe - subscribe to all events
	Subscribe SubscribeType = iota
	// SubscribeOnce - subscribe to only one event
	SubscribeOnce
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL