goeventbus

package module
v0.6.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 20, 2023 License: MIT Imports: 1 Imported by: 0

README

Go Report Card codecov Go Reference workflow

EventBus for Golang

Description

This is a simple implementation of an event bus in golang. Actually support:

  • publish/subscribe messaging.

Get Started

To start use eventbus in your project, you can run the following command.

go get github.com/stanipetrosyan/go-eventbus

And import

import (
	goeventbus "github.com/StaniPetrosyan/go-eventbus"
)

Publish/Subscribe


var eventbus = goeventbus.NewEventBus()

address := "topic"
options := goeventbus.NewMessageOptions().AddHeader("header", "value")
message := goeventbus.CreateMessage().SetBody("Hi Topic").SetOptions(options)

eventbus.Channel(address).Subscriber().Listen(func(dc goeventbus.Context) {
	fmt.Printf("Message %s\n", dc.Result().Data)
})

for {
	eventbus.Channel(address).Publisher().Publish(message)
	time.Sleep(time.Second)
}

Message

For publishing, you need to create a Message object using this method.

message := goeventbus.CreateMessage().SetBody("Hi Topic")

Each message can have some options:


options := goeventbus.NewMessageOptions().AddHeader("header", "value")
message := goeventbus.CreateMessage()

message.SetOptions(options)

eventBus.Channel("address").Publisher().Publish(message)

Processor


eventbus.Channel("topic1").Processor(func(message goeventbus.Message) bool {
	return logic
})

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Channel added in v0.6.0

type Channel interface {
	Publisher() Publisher
	Subscriber() Subscriber
	Processor(predicate func(message Message) bool) Channel
}

func NewChannel added in v0.6.0

func NewChannel(address string) Channel

type Context

type Context interface {
	Result() Message
}

func NewConsumerContextWithMessage added in v0.6.0

func NewConsumerContextWithMessage(message Message) Context

type DefaultEventBus

type DefaultEventBus struct {
	// contains filtered or unexported fields
}

func (*DefaultEventBus) Channel added in v0.6.0

func (e *DefaultEventBus) Channel(address string) Channel

type EventBus

type EventBus interface {
	Channel(adress string) Channel
}

func NewEventBus

func NewEventBus() EventBus

type Message

type Message struct {
	Data    interface{}
	Options MessageOptions
}

func CreateMessage

func CreateMessage() Message

func (Message) SetBody

func (m Message) SetBody(data interface{}) Message

func (Message) SetOptions

func (m Message) SetOptions(options MessageOptions) Message

func (Message) ToJson

func (m Message) ToJson() ([]byte, error)

type MessageOptions

type MessageOptions struct {
	// contains filtered or unexported fields
}

func NewMessageOptions

func NewMessageOptions() MessageOptions

func (MessageOptions) AddHeader

func (op MessageOptions) AddHeader(key string, value string) MessageOptions

func (MessageOptions) Header

func (op MessageOptions) Header(key string) string

func (MessageOptions) SetHeader

func (op MessageOptions) SetHeader(headers map[string]string) MessageOptions

type Publisher

type Publisher interface {
	Publish(message Message)
}

func NewPublisher added in v0.6.0

func NewPublisher(ch chan Message) Publisher

type Subscriber added in v0.6.0

type Subscriber interface {
	Listen(consumer func(context Context))
}

func NewSubscriber added in v0.6.0

func NewSubscriber(ch chan Message) Subscriber

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL