eventbus

package module
v0.0.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 1, 2022 License: MIT Imports: 3 Imported by: 12

README

go-eventbus

simple strong typed event bus from golang generics.

you can use it as a local eventbus or a mediator for CQRS

  • Publish, Subscribe/SubscribeOnce/Subscribe (with type assertion)
  • Dispatch, AddProcessor

Install

go get github.com/goxiaoy/go-eventbus

Usage

create a eventbus or skip to use Default one
bus := eventbus.New()
Publish and Subscribe
type TestEvent1 struct {
}

func main() {
    ctx := context.Background()
    //Subscribe
    dispose, err := eventbus.Subscribe[*TestEvent1](bus)(func(ctx context.Context, event *TestEvent1) error {
        fmt.Print("do TestEvent1")
        return nil
    })
    if err != nil {
        panic(err)
    }
    //Publish
    err = eventbus.Publish[*TestEvent1](bus)(ctx, &TestEvent1{})
    if err != nil {
        panic(err)
    }
    //UnSubscribe
    err = dispose.Dispose()
    if err != nil {
        panic(err)
    }

}

Subscribe to any

//You can also subscribe to any
dispose, err = eventbus.Subscribe[interface{}](bus)(func(ctx context.Context, event interface{}) error {
    fmt.Println("do any")
    return nil
})

Subscribe once

dispose, err = eventbus.SubscribeOnce[interface{}](bus)(func(ctx context.Context, event interface{}) error {
    fmt.Println("do any")
    return nil
})
Dispatch and Process
//Processor
dispose, err = eventbus.AddProcessor[*TestEvent1, *TestResult1](bus)(func(ctx context.Context, event *TestEvent1) (*TestResult1, error) {
    fmt.Println("return result")
    return &TestResult1{}, err
})
//Dispatch
result, err := eventbus.Dispatch[*TestEvent1, *TestResult1](bus)(ctx, &TestEvent1{})
if err != nil {
    panic(err)
}

Limitation

  • This eventbus is designed as O(n) complexity.
  • Do not support Dispatch/Process Result interface type

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	Default = New()
)
View Source
var (
	ErrNotProcessor = fmt.Errorf("no processor registerd")
)

Functions

This section is empty.

Types

type DispatcherFunc

type DispatcherFunc[TEvent any, TResult any] func(ctx context.Context, event TEvent) (TResult, error)

func Dispatch

func Dispatch[TEvent any, TResult any](es ...*EventBus) DispatcherFunc[TEvent, TResult]

Dispatch return processed result, ErrNotProcessor returned if no matching processor

type DisposeFunc

type DisposeFunc func() error

func (DisposeFunc) Dispose

func (d DisposeFunc) Dispose() error

type EventBus

type EventBus struct {
	// contains filtered or unexported fields
}

func New

func New() *EventBus

type Handler

type Handler[TEvent any] func(ctx context.Context, event TEvent) error

type IDisposable

type IDisposable interface {
	Dispose() error
}

type ProcessableFunc

type ProcessableFunc[TEvent any, TResult any] func(processor Processor[TEvent, TResult]) (IDisposable, error)

func AddProcessor

func AddProcessor[TEvent any, TResult any](es ...*EventBus) ProcessableFunc[TEvent, TResult]

type Processor

type Processor[TEvent any, TResult any] func(ctx context.Context, event TEvent) (TResult, error)

type PublisherFunc

type PublisherFunc[TEvent any] func(ctx context.Context, event TEvent) error

func Publish

func Publish[TEvent any](es ...*EventBus) PublisherFunc[TEvent]

type SubscribeFunc

type SubscribeFunc[TEvent any] func(handler Handler[TEvent]) (IDisposable, error)

func Subscribe

func Subscribe[TEvent any](es ...*EventBus) SubscribeFunc[TEvent]

type SubscribeOnceFunc

type SubscribeOnceFunc[TEvent any] func(handler Handler[TEvent]) (IDisposable, error)

func SubscribeOnce

func SubscribeOnce[TEvent any](es ...*EventBus) SubscribeOnceFunc[TEvent]

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL