emitter

package module
v0.0.0-...-349169d Latest
Warning

This package is not in the latest version of its module.

Published: Apr 11, 2023 License: MIT Imports: 3 Imported by: 83

README

Emitter

The emitter package implements a channel-based pubsub pattern. The design goals are to use Go's concurrency model instead of flat callbacks and to provide a very simple API that is easy to consume.

Why?

Go has an expressive concurrency model, but nobody uses it properly for pubsub as far as I can tell (in the year 2015). I implemented my own solution because I could not find any other that met my expectations. Please read this article for more information.

What does it do?

Brief example

e := &emitter.Emitter{}
go func(){
	<-e.Emit("change", 42) // wait until the event has been sent
	<-e.Emit("change", 37)
	e.Off("*") // unsubscribe any listeners
}()

for event := range e.On("change") {
	// do something with event.Args
	println(event.Int(0)) // cast the first argument to int
}
// listener channel was closed

Constructor

emitter.New takes a uint as the first argument to indicate what buffer size should be used for listeners. It is also possible to change the buffer capacity during runtime using the following code: e.Cap = 10.

By default, the emitter uses one goroutine per listener to send an event. You can change that behavior from asynchronous to synchronous by passing the emitter.Sync flag, as shown here: e.Use("*", emitter.Sync). I recommend specifying middlewares (see below) for the emitter at the beginning.

Wildcard

The package allows publications and subscriptions with wildcards. This feature is based on the path.Match function.

Example:

go e.Emit("something:special", 42)
event := <-e.Once("*") // match any event
println(event.Int(0)) // will print 42

// or emit an event with a wildcard path
go e.Emit("*", 37) // emit for everyone
event = <-e.Once("something:special") // reuse the variable declared above
println(event.Int(0)) // will print 37

Note that wildcard matching uses path.Match, but the library does not return pattern-parsing errors, because wildcards are not its main feature. Validate the topic pattern explicitly with the emitter.Test() function.
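The matching rules come from the standard library, so they can be explored without the emitter itself. The following is a minimal sketch using only path.Match; the helpers matches and validPattern are hypothetical names for this illustration and are not the package's implementation of Test.

```go
package main

import (
	"fmt"
	"path"
)

// matches reports whether topic matches pattern. A malformed pattern is
// treated as "no match" and the error is dropped, which is the behavior
// the note above warns about.
func matches(pattern, topic string) bool {
	ok, _ := path.Match(pattern, topic)
	return ok
}

// validPattern reports whether pattern is well-formed for path.Match.
// Hypothetical helper for this sketch; it relies on path.Match checking
// the whole pattern for syntax errors (Go 1.16 and later).
func validPattern(pattern string) bool {
	_, err := path.Match(pattern, "")
	return err == nil
}

func main() {
	fmt.Println(matches("*", "something:special"))           // wildcard subscription
	fmt.Println(matches("something:*", "something:special")) // prefix wildcard
	fmt.Println(matches("*", "a/b"))                         // '*' does not cross '/'
	fmt.Println(validPattern("["))                           // unterminated character class
}
```

One consequence worth keeping in mind: because path.Match treats '/' as a separator, a "*" pattern does not match topics that contain a slash.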

Middlewares

Predicates are an important part of a pubsub package: it should be possible to skip some events. Middlewares address this problem. A middleware is a function that takes a pointer to the Event as its first argument. A middleware can do the following:

  1. It allows you to modify an event.
  2. It allows skipping the event emitting if needed.
  3. It also allows modification of the event's arguments.
  4. It allows you to specify the mode that describes how exactly an event should be emitted (see below).

There are two ways to add middleware into the event emitting flow:

  • via .On("event", middlewares...)
  • via .Use("event", middlewares...)

The first one adds middlewares only for a particular listener, while the second one adds middlewares for all events with a given topic.

For example:

// use synchronous mode for all events, it also depends
// on the emitter capacity(buffered/unbuffered channels)
e.Use("*", emitter.Sync)
go e.Emit("something:special", 42)

// define predicate
event := <-e.Once("*", func(ev *emitter.Event){
	if ev.Int(0) == 42 {
		// skip sending
		ev.Flags = ev.Flags | emitter.FlagVoid
	}
})
panic("will never happen")
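To make the idea of a middleware chain concrete without depending on the emitter, here is a minimal stdlib-only sketch. The types flag, event and middleware, the constant flagVoid, and the function run are local stand-ins invented for this illustration; they only mimic the shape described above and are not the package's source.

```go
package main

import "fmt"

// flag, event and middleware are local stand-ins for this sketch only.
type flag int

const flagVoid flag = 1 // hypothetical "skip sending" bit

type event struct {
	Topic string
	Flags flag
	Args  []interface{}
}

type middleware func(*event)

// run applies the middlewares in order and reports whether the event
// should still be delivered to the listener channel.
func run(ev *event, mws ...middleware) bool {
	for _, mw := range mws {
		mw(ev)
	}
	return ev.Flags&flagVoid == 0
}

// double rewrites the first argument when it is an int.
func double(ev *event) {
	if len(ev.Args) > 0 {
		if n, ok := ev.Args[0].(int); ok {
			ev.Args[0] = n * 2
		}
	}
}

// skip42 acts as a predicate: it marks the event as void when the
// first argument equals 42.
func skip42(ev *event) {
	if len(ev.Args) > 0 && ev.Args[0] == 42 {
		ev.Flags |= flagVoid
	}
}

func main() {
	ev := &event{Topic: "change", Args: []interface{}{21}}
	deliver := run(ev, double, skip42)
	fmt.Println(ev.Args[0], deliver) // 42 false
}
```

Because the middlewares run in order, skip42 sees the argument after double has already modified it.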

Flags

Flags describe how exactly the event should be emitted. The available options are listed here.

Every event (emitter.Event) has a field called .Flags that contains the flags as a binary mask. Flags can be set only via middlewares (see above).

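There are several predefined middlewares that set the needed flags:

  • emitter.Reset
  • emitter.Once
  • emitter.Void
  • emitter.Skip
  • emitter.Close
  • emitter.Sync

You can chain the above middlewares as shown below: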

e.Use("*", emitter.Void) // skip sending for any events
go e.Emit("surprise", 65536)
event := <-e.On("*", emitter.Reset, emitter.Sync, emitter.Once) // set custom flags for this listener
println(event.Int(0)) // prints 65536

Cancellation

Go gives developers powerful control over concurrency flow. We can tell the state of a channel and whether it would block a goroutine. Using this language construct, we can discard any emitted event. It is good practice to design your application with timeouts so that you can cancel operations when needed, as shown below:

Assume you have a timeout for emitting events:

done := e.Emit("broadcast", "the", "event", "with", "timeout")

select {
case <-done:
	// so the sending is done
case <-time.After(timeout):
	// time is out, let's discard emitting
	close(done)
}

It's pretty useful to control any goroutines inside an emitter instance.
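The underlying language construct can be shown in isolation. The sketch below uses only the standard library and plain channels; trySend and shortTimeout are hypothetical names for this illustration, and the code does not reproduce how the emitter handles its done channel.

```go
package main

import (
	"fmt"
	"time"
)

// shortTimeout is an arbitrary value chosen for the demonstration.
const shortTimeout = 20 * time.Millisecond

// trySend attempts to deliver v on ch and gives up once timeout elapses.
// It reports whether the value was actually sent.
func trySend(ch chan<- int, v int, timeout time.Duration) bool {
	select {
	case ch <- v:
		return true
	case <-time.After(timeout):
		return false
	}
}

func main() {
	buffered := make(chan int, 1) // has room, so the send succeeds at once
	unbuffered := make(chan int)  // nobody receives, so the send would block

	fmt.Println(trySend(buffered, 42, shortTimeout))
	fmt.Println(trySend(unbuffered, 37, shortTimeout))
}
```

The same select shape works with any cancellation signal in place of time.After, for example a channel that the caller closes.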

Callbacks-only usage

Using the emitter in a more traditional way is possible as well. If you don't need the async mode, or you are very careful with application resources, the recipe is to use an emitter with zero capacity, or to use FlagVoid to skip sending into the listener channel and use a middleware as the callback:

e := &emitter.Emitter{}
e.Use("*", emitter.Void)

go e.Emit("change", "field", "value")
e.On("change", func(event *emitter.Event){
	// handle changes here
	field := event.String(0)
	value := event.String(1)
	// ...and so on
})

Groups

Group merges different listeners into one channel. Example:

e1 := &emitter.Emitter{}
e2 := &emitter.Emitter{}
e3 := &emitter.Emitter{}

g := &emitter.Group{Cap: 1}
g.Add(e1.On("first"), e2.On("second"), e3.On("third"))

for event := range g.On() {
	// handle the event
	// event has the fields OriginalTopic and Topic
}

You can also combine several groups into one.

See the api here.
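Merging several channels into one is a general fan-in pattern. The following stdlib-only sketch shows one common way to do it with sync.WaitGroup; merge, source and count are hypothetical helpers for this illustration, and the Group type may well be implemented differently (for instance, this sketch closes the merged channel once all inputs are closed, which is an assumption of the sketch, not documented Group behavior).

```go
package main

import (
	"fmt"
	"sync"
)

// merge forwards every value from the input channels into one output
// channel and closes the output once all inputs are closed.
func merge(capacity int, inputs ...<-chan string) <-chan string {
	out := make(chan string, capacity)
	var wg sync.WaitGroup
	wg.Add(len(inputs))
	for _, in := range inputs {
		go func(in <-chan string) {
			defer wg.Done()
			for v := range in {
				out <- v
			}
		}(in)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// source returns a closed channel preloaded with the given values.
func source(values ...string) <-chan string {
	ch := make(chan string, len(values))
	for _, v := range values {
		ch <- v
	}
	close(ch)
	return ch
}

// count drains ch and returns how many values it carried.
func count(ch <-chan string) int {
	n := 0
	for range ch {
		n++
	}
	return n
}

func main() {
	merged := merge(1, source("first"), source("second"), source("third"))
	for v := range merged {
		fmt.Println(v) // arrival order is not guaranteed
	}
}
```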

Event

Event is a struct that contains event information. The event also has helpers that cast an argument to bool, string, float64, or int by its index, with an optional default value.

Example:


go e.Emit("*", "some string", 42, 37.0, true)
event := <-e.Once("*")

first := event.String(0)
second := event.Int(1)
third := event.Float(2)
fourth := event.Bool(3)

// use the default value if the argument does not exist
dontExists := event.Int(10, 64)
// or use the default value if the type does not match
def := event.Int(0, 128)

// .. and so on
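The default-value behavior can be sketched as a small standalone function. intArg below is a hypothetical helper written for this illustration, not the package's code; in particular, returning 0 when no default is supplied is an assumption of the sketch.

```go
package main

import "fmt"

// intArg returns args[index] as an int. When the index is out of range
// or the value is not an int, it returns the first dflt value, or 0 if
// no default was given (the zero fallback is an assumption).
func intArg(args []interface{}, index uint, dflt ...int) int {
	fallback := 0
	if len(dflt) > 0 {
		fallback = dflt[0]
	}
	if index >= uint(len(args)) {
		return fallback
	}
	if v, ok := args[index].(int); ok {
		return v
	}
	return fallback
}

func main() {
	args := []interface{}{"some string", 42, 37.0, true}
	fmt.Println(intArg(args, 1))      // 42: present and of the right type
	fmt.Println(intArg(args, 10, 64)) // 64: index out of range
	fmt.Println(intArg(args, 0, 128)) // 128: value is a string, not an int
}
```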

License

MIT

Documentation

Overview

Package emitter implements a channel-based pubsub pattern. The design goals are:

  • fully functional and safe
  • simple to understand and use
  • make the code readable, maintainable and minimalistic

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Close

func Close(e *Event)

Close middleware sets FlagClose flag for an event

func Once

func Once(e *Event)

Once middleware sets FlagOnce flag for an event

func Reset

func Reset(e *Event)

Reset middleware resets flags

func Skip

func Skip(e *Event)

Skip middleware sets FlagSkip flag for an event

func Sync

func Sync(e *Event)

Sync middleware sets FlagSync flag for an event

func Test

func Test(pattern string) bool

Test returns boolean value to indicate that given pattern is valid.

What is it for? Internally `emitter` uses the `path.Match` function to find matches. But as this functionality is optional, `Emitter` does not indicate that a pattern is invalid. You should check it separately and explicitly via the `Test` function.

func Void

func Void(e *Event)

Void middleware sets FlagVoid flag for an event

Types

type Emitter

type Emitter struct {
	Cap uint
	// contains filtered or unexported fields
}

Emitter is a struct that lets you emit and receive events, close receiver channels, and get information about topics and listeners.

func New

func New(capacity uint) *Emitter

New returns a newly created Emitter struct. By default, the capacity argument is used to create channels with the given capacity. The OnWithCap method can be used to set a different capacity per listener.

func (*Emitter) Emit

func (e *Emitter) Emit(topic string, args ...interface{}) chan struct{}

Emit emits an event with the remaining arguments to all listeners covered by topic (which can be a pattern).

func (*Emitter) Listeners

func (e *Emitter) Listeners(topic string) []<-chan Event

Listeners returns a slice of the listeners covered by topic (which can be a pattern).

func (*Emitter) Off

func (e *Emitter) Off(topic string, channels ...<-chan Event)

Off unsubscribes all listeners covered by topic, which can be a pattern as well.

func (*Emitter) On

func (e *Emitter) On(topic string, middlewares ...func(*Event)) <-chan Event

On returns a channel that will receive events. It takes middlewares as optional trailing arguments.

func (*Emitter) OnWithCap

func (e *Emitter) OnWithCap(topic string, capacity uint, middlewares ...func(*Event)) <-chan Event

OnWithCap returns a channel that will receive events, created with the specified listener capacity. It takes middlewares as optional trailing arguments.

func (*Emitter) Once

func (e *Emitter) Once(topic string, middlewares ...func(*Event)) <-chan Event

Once works exactly like On (see above) but with `Once` as the first middleware.

func (*Emitter) Topics

func (e *Emitter) Topics() []string

Topics returns all existing topics.

func (*Emitter) Use

func (e *Emitter) Use(pattern string, middlewares ...func(*Event))

Use registers middlewares for the pattern.

type Event

type Event struct {
	Topic, OriginalTopic string
	Flags                Flag
	Args                 []interface{}
}

Event is the structure used to send events. It contains helpers to cast arguments to primitive types easily.

func (Event) Bool

func (e Event) Bool(index uint, dflt ...bool) bool

Bool returns the argument at the given index cast to bool. The `dflt` argument is an optional default value, returned if the cast fails or the index is out of range.

func (Event) Float

func (e Event) Float(index uint, dflt ...float64) float64

Float returns the argument at the given index cast to float64. The `dflt` argument is an optional default value, returned if the cast fails or the index is out of range.

func (Event) Int

func (e Event) Int(index uint, dflt ...int) int

Int returns the argument at the given index cast to int. The `dflt` argument is an optional default value, returned if the cast fails or the index is out of range.

func (Event) String

func (e Event) String(index uint, dflt ...string) string

String returns the argument at the given index cast to string. The `dflt` argument is an optional default value, returned if the cast fails or the index is out of range.

type Flag

type Flag int

Flag is used to describe the behavior you expect.

const (
	// FlagReset only to clear previously defined flags.
	// Example:
	// ee.Use("*", Reset) // clears flags for this pattern
	FlagReset Flag = 0
	// FlagOnce indicates to remove the listener after first sending.
	FlagOnce Flag = 1 << iota
	// FlagVoid indicates to skip sending.
	FlagVoid
	// FlagSkip indicates to skip sending if channel is blocked.
	FlagSkip
	// FlagClose indicates to drop listener if channel is blocked.
	FlagClose
	// FlagSync indicates to send an event synchronously.
	FlagSync
)
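Since the flags form a binary mask, they can be combined and tested with bitwise operators. The sketch below copies the constant declarations shown above into a standalone program; the helper has is a hypothetical addition for this illustration.

```go
package main

import "fmt"

// Flag and the constants below are copied from the declarations above
// so that the program is self-contained.
type Flag int

const (
	FlagReset Flag = 0
	FlagOnce  Flag = 1 << iota // iota is 1 on this line, so FlagOnce is 2
	FlagVoid
	FlagSkip
	FlagClose
	FlagSync
)

// has reports whether f and want share at least one set bit.
// Hypothetical helper, not part of the package.
func has(f, want Flag) bool {
	return f&want != 0
}

func main() {
	var flags Flag
	flags |= FlagOnce | FlagSync // combine two flags into one mask

	fmt.Println(has(flags, FlagSync), has(flags, FlagVoid)) // true false

	flags = FlagReset // clearing the mask removes every flag
	fmt.Println(has(flags, FlagOnce)) // false

	fmt.Println(FlagOnce, FlagVoid, FlagSkip, FlagClose, FlagSync) // 2 4 8 16 32
}
```

Note that because FlagReset occupies the first line of the const block, the remaining flags start at 2 rather than 1; each still owns a distinct bit.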

type Group

type Group struct {
	// Cap is capacity to create new channel
	Cap uint
	// contains filtered or unexported fields
}

Group merges the given subscribed channels into one subscribed channel.

func (*Group) Add

func (g *Group) Add(channels ...<-chan Event)

Add adds channels which were already subscribed to some events.

func (*Group) Flush

func (g *Group) Flush()

Flush resets the group to its initial state. All references will be dropped.

func (*Group) Off

func (g *Group) Off(channels ...<-chan Event)

Off unsubscribes the given channels if any are passed; otherwise it unsubscribes all channels.

func (*Group) On

func (g *Group) On() <-chan Event

On returns subscribed channel.
