eventbus

package module
v0.0.0-...-6bc1963 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 9, 2024 License: Apache-2.0 Imports: 10 Imported by: 0

README

EventBus

EventBus is a lightweight event bus written in Golang, which provides various enhanced features and functionalities.

Features

  • Easy to use: EventBus is a tiny library with an API that is super easy to learn. No configuration needed, you can directly create a default instance of EventBus at any position in your code.

  • Cross-process: You can enable network options during initialization to use a cross-process event bus.

  • Non-intrusive: All extensions are implemented without impacting the code of the basic functionality. You only need to enable the corresponding feature options during initialization.

  • Extensible: The code provides a unified interface for extending functionality, allowing you to customize your own advanced features.

Installation

Use go get to install this package.

go get github.com/danielhookx/xcontainer

Getting Started

Subscribe Topic And Publish
package main

import (
	"github.com/danielhookx/eventbus"
)

func calculator(a int, b int) {
	fmt.Printf("%d\n", a + b)
}

func main() {
    bus := eventbus.New()
    bus.Subscribe("calculator", calculator);
    bus.Publish("calculator", 10, 20);
}
Sync Subscribe Topic
bus.SubscribeSync("calculator", calculator);
Unsubscribe
bus.Unsubscribe("calculator", calculator)
SubscribeWith

There is an advanced usage of this, using the SubscribeWith method to customize the subscription behavior.

It should be noted that when using SubscribeWith, you need to ensure the uniqueness of the key yourself, and the same key will only be subscribed once.

package main

import (
	"github.com/danielhookx/eventbus"
	"github.com/danielhookx/fission"
)

type mockDist struct {
	key string
}

func createMockDistHandlerFunc(key any) fission.Distribution {
	return &mockDist{
		key: key.(string),
	}
}

func (m *mockDist) Register(ctx context.Context) {
	return
}
func (m *mockDist) Key() any {
	return m.key
}
func (m *mockDist) Dist(data any) error {
	// add your code here
	fmt.Println(data)
	return nil
}
func (m *mockDist) Close() error {
	return nil
}

func main() {
	topic := "main.test"
    bus := eventbus.New()
    bus.SubscribeWith(topic, "key1", createMockDistHandlerFunc)
    bus.Publish(topic, "jack")
	bus.Unsubscribe(topic, "key1")
}
Cross Process Events

You only need to specify the use of netbus during initialization. Later, you can use the cross-process eventbus like the local eventbus.

publisher process

package main

import (
	"github.com/danielhookx/eventbus"
)

func main() {
	rawURL := "tcp://:7633"
	remoteURL := "tcp://localhost:7634"

	bus := eventbus.New(
		eventbus.WithProxys(
			eventbus.NewRPCProxyCreator(rawURL, remoteURL),
		),
	)
	
    bus.Publish("test", "jack")
}

subscriber process

package main

import (
	"github.com/danielhookx/eventbus"
)

func main() {
	rawURL := "tcp://:7634"
	remoteURL := "tcp://localhost:7633"

	bus := eventbus.New(
		eventbus.WithProxys(
			eventbus.NewRPCProxyCreator(rawURL, remoteURL),
		),
	)

    bus.Subscribe("test", func(name string) {
	    fmt.Printf("hello %s\n", name)
    })
}

Contributing

We welcome contributions to this project. Please submit pull requests with your proposed changes or improvements.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BusPublisher

type BusPublisher interface {
	Publish(topic string, args ...interface{})
}

type BusSubscriber

type BusSubscriber interface {
	Subscribe(topic string, fn interface{}) error
	SubscribeSync(topic string, fn interface{}) error
	Unsubscribe(topic string, key any) error
	SubscribeWith(topic string, key any, distHandler fission.CreateDistributionHandleFunc) error
}

type EventBus

type EventBus struct {
	// contains filtered or unexported fields
}

func (*EventBus) Publish

func (bus *EventBus) Publish(topic string, args ...interface{})

func (*EventBus) Subscribe

func (bus *EventBus) Subscribe(topic string, fn interface{}) error

func (*EventBus) SubscribeSync

func (bus *EventBus) SubscribeSync(topic string, fn interface{}) error

func (*EventBus) SubscribeWith

func (bus *EventBus) SubscribeWith(topic string, key any, distHandler fission.CreateDistributionHandleFunc) error

func (*EventBus) Unsubscribe

func (bus *EventBus) Unsubscribe(topic string, key any) error

type Eventbus

type Eventbus interface {
	BusSubscriber
	BusPublisher
}

func New

func New(opt ...EventbusOption) Eventbus

type EventbusOption

type EventbusOption interface {
	// contains filtered or unexported methods
}

func WithProxys

func WithProxys(proxyCreators ...ProxyCreator) EventbusOption

WithProxyCreator returns a EventbusOption that sets the ProxyCreator.

type ProxyCreator

type ProxyCreator func(bus Eventbus) Eventbus

func NewRPCProxyCreator

func NewRPCProxyCreator(rawURL, remoteURL string) ProxyCreator

type PubArgs

type PubArgs struct {
	Topic string
	Data  any
}

type PubReply

type PubReply struct{}

type RPCProxy

type RPCProxy struct {
	// contains filtered or unexported fields
}

func NewRPCProxy

func NewRPCProxy(rawURL, remoteURL string, bus Eventbus) (*RPCProxy, error)

func (*RPCProxy) Publish

func (p *RPCProxy) Publish(topic string, args ...interface{})

func (*RPCProxy) RPCPublish

func (p *RPCProxy) RPCPublish(args *PubArgs, reply *PubReply) error

func (*RPCProxy) RPCSubscribe

func (p *RPCProxy) RPCSubscribe(args *SubArgs, reply *SubReply) error

func (*RPCProxy) RPCSubscribeSync

func (p *RPCProxy) RPCSubscribeSync(args *SubArgs, reply *SubReply) error

func (*RPCProxy) RPCUnsubscribe

func (p *RPCProxy) RPCUnsubscribe(args *UnsubArgs, reply *UnsubReply) error

func (*RPCProxy) Subscribe

func (p *RPCProxy) Subscribe(topic string, fn interface{}) error

func (*RPCProxy) SubscribeSync

func (p *RPCProxy) SubscribeSync(topic string, fn interface{}) error

func (*RPCProxy) SubscribeWith

func (p *RPCProxy) SubscribeWith(topic string, key any, distHandler fission.CreateDistributionHandleFunc) error

func (*RPCProxy) Unsubscribe

func (p *RPCProxy) Unsubscribe(topic string, handler interface{}) error

type SubArgs

type SubArgs struct {
	RemoteURL string
	Topic     string
}

type SubReply

type SubReply struct {
}

type UnsubArgs

type UnsubArgs struct {
	Topic string
}

type UnsubReply

type UnsubReply struct {
}

Directories

Path Synopsis
examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL