eventbus

package module
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 2, 2020 License: MIT Imports: 7 Imported by: 0

README

EventBus

GoDoc Coverage Status Build Status

package eventbus is the little and lightweight eventbus with async compatibility for GoLang.

Installation

Make sure that Go is installed on your computer. Type the following command in your terminal:

go get github.com/asaskevich/EventBus

After it the package is ready to use.

Import package in your project

Add following line in your *.go file:

import "github.com/asaskevich/EventBus"

If you unhappy to use long EventBus, you can do something like this:

import (
	evbus "github.com/asaskevich/EventBus"
)
Example
func calculator(a int, b int) {
	fmt.Printf("%d\n", a + b)
}

func main() {
	bus := EventBus.New();
	bus.Subscribe("main:calculator", calculator);
	bus.Publish("main:calculator", 20, 40);
	bus.Unsubscribe("main:calculator", calculator);
}
Implemented methods
  • New()
  • Subscribe()
  • SubscribeOnce()
  • HasCallback()
  • Unsubscribe()
  • Publish()
  • SubscribeAsync()
  • SubscribeOnceAsync()
  • WaitAsync()
New()

New returns new EventBus with empty handlers.

bus := EventBus.New();
Subscribe(topic string, fn interface{}) error

Subscribe to a topic. Returns error if fn is not a function.

func Handler() { ... }
...
bus.Subscribe("topic:handler", Handler)
SubscribeOnce(topic string, fn interface{}) error

Subscribe to a topic once. Handler will be removed after executing. Returns error if fn is not a function.

func HelloWorld() { ... }
...
bus.SubscribeOnce("topic:handler", HelloWorld)
Unsubscribe(topic string) error

Remove callback defined for a topic. Returns error if there are no callbacks subscribed to the topic.

bus.Unsubscribe("topic:handler", HelloWord);
HasCallback(topic string) bool

Returns true if exists any callback subscribed to the topic.

Publish(topic string, args ...interface{})

Publish executes callback defined for a topic. Any addional argument will be tranfered to the callback.

func Handler(str string) { ... }
...
bus.Subscribe("topic:handler", Handler)
...
bus.Publish("topic:handler", "Hello, World!");
SubscribeAsync(topic string, fn interface{}, transactional bool)

Subscribe to a topic with an asyncrhonous callback. Returns error if fn is not a function.

func slowCalculator(a, b int) {
	time.Sleep(3 * time.Second)
	fmt.Printf("%d\n", a + b)
}

bus := EventBus.New()
bus.SubscribeAsync("main:slow_calculator", slowCalculator, false)
	
bus.Publish("main:slow_calculator", 20, 60)
	
fmt.Println("start: do some stuff while waiting for a result")
fmt.Println("end: do some stuff while waiting for a result") 
	
bus.WaitAsync() // wait for all async callbacks to complete
	
fmt.Println("do some stuff after waiting for result") 

Transactional determines whether subsequent callbacks for a topic are run serially (true) or concurrently(false)

SubscribeOnceAsync(topic string, args ...interface{})

SubscribeOnceAsync works like SubscribeOnce except the callback to executed asynchronously

WaitAsync()

WaitAsync waits for all async callbacks to complete.

Cross Process Events

Works with two rpc services:

  • a client service to listen to remotely published events from a server
  • a server service to listen to client subscriptions

server.go

func main() {
    server := NewServer(":2010", "/_server_bus_", New())
    server.Start()
    // ...
    server.EventBus().Publish("main:calculator", 4, 6)
    // ...
    server.Stop()
}

client.go

func main() {
    client := NewClient(":2015", "/_client_bus_", New())
    client.Start()
    client.Subscribe("main:calculator", calculator, ":2010", "/_server_bus_")
    // ...
    client.Stop()
}    
Notes

Documentation is available here: godoc.org. Full information about code coverage is also available here: EventBus on gocover.io.

Support

If you do have a contribution for the package feel free to put up a Pull Request or open Issue.

Special thanks to contributors

Documentation

Index

Constants

View Source
const (
	// PublishService - Client service method
	PublishService = "ClientService.PushEvent"
)
View Source
const (
	// RegisterService - Server subscribe service method
	RegisterService = "ServerService.Register"
)

Variables

This section is empty.

Functions

func HasCallback added in v0.1.0

func HasCallback(topic string) bool

HasCallback runs HasCallback on package-level bus singleton

func Publish added in v0.1.0

func Publish(topic string, args ...interface{})

Publish runs Publish on package-level bus singleton

func Subscribe

func Subscribe(topic string, fn interface{}) error

Subscribe runs Subscribe on package-level bus singleton

func SubscribeAsync added in v0.1.0

func SubscribeAsync(topic string, fn interface{}, transactional bool) error

SubscribeAsync runs SubscribeAsync on package-level bus singleton

func SubscribeOnce

func SubscribeOnce(topic string, fn interface{}) error

SubscribeOnce runs SubscribeOnce on package-level bus singleton

func SubscribeOnceAsync added in v0.1.0

func SubscribeOnceAsync(topic string, fn interface{}) error

SubscribeOnceAsync runs SubscribeOnceAsync on package-level bus singleton

func Unsubscribe added in v0.1.0

func Unsubscribe(topic string, handler interface{}) error

Unsubscribe runs Unsubscribe on package-level bus singleton

func WaitAsync added in v0.1.0

func WaitAsync()

WaitAsync runs WaitAsync on package-level bus singleton

Types

type Bus

type Bus struct {
	// contains filtered or unexported fields
}

Bus - box for handlers and callbacks.

func New

func New() *Bus

New returns new Bus with empty handlers.

func (*Bus) HasCallback added in v0.2.0

func (bus *Bus) HasCallback(topic string) bool

HasCallback returns true if exists any callback subscribed to the topic.

func (*Bus) Publish added in v0.2.0

func (bus *Bus) Publish(topic string, args ...interface{})

Publish executes callback defined for a topic. Any additional argument will be transferred to the callback.

func (*Bus) Subscribe added in v0.2.0

func (bus *Bus) Subscribe(topic string, fn interface{}) error

Subscribe subscribes to a topic. Returns error if `fn` is not a function.

func (*Bus) SubscribeAsync added in v0.2.0

func (bus *Bus) SubscribeAsync(topic string, fn interface{}, transactional bool) error

SubscribeAsync subscribes to a topic with an asynchronous callback Transactional determines whether subsequent callbacks for a topic are run serially (true) or concurrently (false) Returns error if `fn` is not a function.

func (*Bus) SubscribeOnce added in v0.2.0

func (bus *Bus) SubscribeOnce(topic string, fn interface{}) error

SubscribeOnce subscribes to a topic once. Handler will be removed after executing. Returns error if `fn` is not a function.

func (*Bus) SubscribeOnceAsync added in v0.2.0

func (bus *Bus) SubscribeOnceAsync(topic string, fn interface{}) error

SubscribeOnceAsync subscribes to a topic once with an asynchronous callback Handler will be removed after executing. Returns error if `fn` is not a function.

func (*Bus) Unsubscribe added in v0.2.0

func (bus *Bus) Unsubscribe(topic string, handler interface{}) error

Unsubscribe removes callback defined for a topic. Returns error if there are no callbacks subscribed to the topic.

func (*Bus) WaitAsync added in v0.2.0

func (bus *Bus) WaitAsync()

WaitAsync waits for all async callbacks to complete

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client - object capable of subscribing to a remote event bus

func NewClient

func NewClient(address, path string, eventBus *Bus) *Client

NewClient - create a client object with the address and server path

func (*Client) Start

func (client *Client) Start() error

Start - starts the client service to listen to remote events

func (*Client) Stop

func (client *Client) Stop()

Stop - signal for the service to stop serving

func (*Client) Subscribe

func (client *Client) Subscribe(topic string, fn interface{}, serverAddr, serverPath string) error

Subscribe subscribes to a topic in a remote event bus

func (*Client) SubscribeOnce

func (client *Client) SubscribeOnce(topic string, fn interface{}, serverAddr, serverPath string) error

SubscribeOnce subscribes once to a topic in a remote event bus

type ClientArg

type ClientArg struct {
	Args  []interface{}
	Topic string
}

ClientArg - object containing event for client to publish locally

type ClientService

type ClientService struct {
	// contains filtered or unexported fields
}

ClientService - service object listening to events published in a remote event bus

func (*ClientService) PushEvent

func (service *ClientService) PushEvent(arg *ClientArg, reply *bool) error

PushEvent - exported service to listening to remote events

type NetworkBus

type NetworkBus struct {
	*Client
	*Server
	// contains filtered or unexported fields
}

NetworkBus - object capable of subscribing to remote event buses in addition to remote event buses subscribing to it's local event bus. Composed of a server and client

func NewNetworkBus

func NewNetworkBus(address, path string) *NetworkBus

NewNetworkBus - returns a new network bus object at the server address and path

func (*NetworkBus) Start

func (networkBus *NetworkBus) Start() error

Start - helper method to serve a network bus service

func (*NetworkBus) Stop

func (networkBus *NetworkBus) Stop()

Stop - signal for the service to stop serving

type NetworkBusService

type NetworkBusService struct {
	// contains filtered or unexported fields
}

NetworkBusService - object capable of serving the network bus

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server - object capable of being subscribed to by remote handlers

func NewServer

func NewServer(address, path string, eventBus *Bus) *Server

NewServer - create a new Server at the address and path

func (*Server) HasClientSubscribed

func (server *Server) HasClientSubscribed(arg *SubscribeArg) bool

HasClientSubscribed - True if a client subscribed to this server with the same topic

func (*Server) Start

func (server *Server) Start() error

Start - starts a service for remote clients to subscribe to events

func (*Server) Stop

func (server *Server) Stop()

Stop - signal for the service to stop serving

type ServerService

type ServerService struct {
	// contains filtered or unexported fields
}

ServerService - service object to listen to remote subscriptions

func (*ServerService) Register

func (service *ServerService) Register(arg *SubscribeArg, success *bool) error

Register - Registers a remote handler to this event bus for a remote subscribe - a given client address only needs to subscribe once event will be republished in local event bus

type SubscribeArg

type SubscribeArg struct {
	ClientAddr    string
	ClientPath    string
	ServiceMethod string
	SubscribeType SubscribeType
	Topic         string
}

SubscribeArg - object to hold subscribe arguments from remote event handlers

type SubscribeType

type SubscribeType int

SubscribeType - how the client intends to subscribe

const (
	// SubscribeTypePermanent - subscribe to all events
	SubscribeTypePermanent SubscribeType = iota
	// SubscribeTypeOnce - subscribe to only one event
	SubscribeTypeOnce
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL