observer

v2.0.1
Published: Oct 29, 2022 License: MIT Imports: 5 Imported by: 0

README

This library implements the pub/sub pattern in a generic way. It uses Go's generic types to declare the type of the event.

Differences between v1 and v2

The v2 release contains breaking changes. The most important ones are:

  • The Observable interface does not provide a Clients() int64 method anymore.
  • The constructor NewObserver() has been removed. Instead, use new(observer.Observer[T]).
  • The Observer has gained a NotifyTimeout field that sets a timeout for the NotifyAll method. The default value is 5 * time.Second.

Usage

go get github.com/leonsteinhaeuser/observer/v2

Example

package main

import (
    "fmt"
    "github.com/leonsteinhaeuser/observer/v2"
)

type Event struct {
    ID      int
    Message string
}

var (
    obsrv *observer.Observer[Event] = new(observer.Observer[Event])
)

func main() {
    rspCh, cancelFunc := obsrv.Subscribe()
    defer cancelFunc()
    go func() {
        for {
            fmt.Printf("Received event: %v\n", <-rspCh)
        }
    }()
    fmt.Println("Registered Clients: ", obsrv.Clients())

    obsrv.NotifyAll(Event{
        ID:      1,
        Message: "Hello World",
    })
}

If you would like to see a more detailed example, please take a look at the observer example.

Documentation

Constants

This section is empty.

Variables

var ErrClientAlreadyDeRegistered = errors.New("client already de-registered")

Functions

This section is empty.

Types

type CancelFunc

type CancelFunc func() error

func Handle

func Handle[T any](ctx context.Context, o Observable[T], h func(context.Context, T) error) (func() error, CancelFunc)

Handle builds a repeating message consumer around the provided handler function h. The returned func() error value is suitable for passing to an errgroup's Go() method.

type Observable

type Observable[T any] interface {
	// Subscribe registers a client to the observable.
	Subscribe() (<-chan T, CancelFunc)
	// NotifyAll notifies all registered clients.
	NotifyAll(data T)
}

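Observable is the interface for types that let clients subscribe to events of type T and that notify all registered clients.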

type Observer

type Observer[T any] struct {
	NotifyTimeout time.Duration
	// contains filtered or unexported fields
}

Observer notifies all registered clients. Note that this comment originally pointed to NewObserver, which the README lists as removed in v2; create an instance with new(observer.Observer[T]) instead.

func (*Observer[t]) Clients

func (o *Observer[t]) Clients() int64

Clients returns the number of registered clients.

func (*Observer[T]) NotifyAll

func (o *Observer[T]) NotifyAll(data T)

NotifyAll notifies all registered clients.

func (*Observer[T]) Subscribe

func (o *Observer[T]) Subscribe() (<-chan T, CancelFunc)

Subscribe registers a client to the observer and returns a channel to receive notifications. The returned CancelFunc can be used to de-register the client.

Directories

Path Synopsis
_example
