notificationcenter

Version: v2.3.3
Published: Apr 17, 2024 License: Apache-2.0 Imports: 9 Imported by: 7

README

Notificationcenter pub/sub library

Publish/subscribe (pub/sub) messaging is a form of asynchronous communication between services, widely used in serverless and microservices architectures. In the pub/sub model, every message published to a topic is delivered to all subscribers of that topic. This enables event-driven architectures and decouples applications, which can improve performance, reliability, and scalability.

At its core, the model has two roles: publishers, which send messages, and subscribers, which receive and act on them. Because the two sides are loosely coupled, individual components can change independently of the rest of the system.

Libraries such as this one provide the base building blocks for working with different queue implementations. They abstract the differences between queuing systems, which simplifies the development of pub/sub services and lets developers concentrate on business logic rather than on the details of the messaging infrastructure.
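
The publisher/subscriber relationship described above can be sketched with a small in-memory broker. This is only an illustration of the pattern, not the library's implementation; the `broker` type and its methods are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// handler processes one published message.
type handler func(msg string)

// broker is a hypothetical in-memory topic registry (not part of the library).
type broker struct {
	mu     sync.RWMutex
	topics map[string][]handler
}

func newBroker() *broker {
	return &broker{topics: make(map[string][]handler)}
}

// subscribe attaches a handler to a topic.
func (b *broker) subscribe(topic string, h handler) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.topics[topic] = append(b.topics[topic], h)
}

// publish delivers msg to every handler of the topic and
// returns the number of handlers that received it.
func (b *broker) publish(topic, msg string) int {
	b.mu.RLock()
	handlers := b.topics[topic]
	b.mu.RUnlock()
	for _, h := range handlers {
		h(msg)
	}
	return len(handlers)
}

func main() {
	b := newBroker()
	b.subscribe("events", func(msg string) { fmt.Println("first:", msg) })
	b.subscribe("events", func(msg string) { fmt.Println("second:", msg) })
	b.publish("events", "event 1")
}
```

The publisher only knows the topic name, never the handlers, which is the decoupling the text refers to.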

Usage examples

Basic usage examples.

import (
  "context"
  "log"

  nc "github.com/geniusrabbit/notificationcenter/v2"
  "github.com/geniusrabbit/notificationcenter/v2/nats"
)
Create a new publisher processor
// Create a new publisher processor
eventStream, err := nats.NewPublisher(nats.WithNatsURL("nats://hostname:4222/group?topics=event"))
if err != nil {
  log.Fatal(err)
}

// Register the stream processor under the name "events"
err = nc.Register("events", eventStream)
if err != nil {
  log.Fatal(err)
}
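Send events through the notification publisher
// Send through the global function (message is an application-defined type)
if err := nc.Publish(context.Background(), "events", message{title: "event 1"}); err != nil {
  log.Println(err)
}

// Send through the publisher interface
events := nc.PublisherByName("events")
if err := events.Publish(context.Background(), message{title: "event 2"}); err != nil {
  log.Println(err)
}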
Subscribe to a specific notification stream
import (
  "context"
  "fmt"
  "log"
  "time"

  nc "github.com/geniusrabbit/notificationcenter/v2"
  "github.com/geniusrabbit/notificationcenter/v2/interval" // assumed path of the interval subscriber package
  "github.com/geniusrabbit/notificationcenter/v2/nats"
)

func main() {
  ctx := context.Background()
  events := nats.MustNewSubscriber(nats.WithTopics("events"),
    nats.WithNatsURL("nats://connection"), nats.WithGroupName(`group`))
  nc.Register("events", events)
  nc.Register("refresh", interval.NewSubscriber(time.Minute * 5))

  // Add a new receiver to process the stream "events"
  nc.Subscribe(ctx, "events", func(msg nc.Message) error {
    fmt.Printf("%s\n", msg.Body())
    return msg.Ack()
  })

  // Add a new time-interval receiver to refresh the data every 5 minutes
  // (db is an application-defined object)
  nc.Subscribe(ctx, "refresh", func(msg nc.Message) error {
    return db.Reload()
  })

  // Run subscriber listeners
  if err := nc.Listen(ctx); err != nil {
    log.Fatal(err)
  }
}

TODO

  • Add support for Amazon SQS queue
  • Add support for Redis queue
  • Add support for RabbitMQ queue
  • Add support for MySQL notifications queue
  • Add support for PostgreSQL notifications queue
  • Remove metrics from the queue (DEPRECATED)
  • Add support for NATS & NATS stream
  • Add support for Kafka queue
  • Add support for native Go channels
  • Add support for native Go time intervals

Documentation

Constants

This section is empty.

Variables

var (
	ErrInvalidRegisterParameter     = errors.New(`invalid register parameter`)
	ErrUndefinedPublisherInterface  = errors.New(`undefined publisher interface`)
	ErrUndefinedSubscriberInterface = errors.New(`undefined subscriber interface`)
	ErrInterfaceAlreadySubscribed   = errors.New("[notificationcenter] interface already subscribed")
)

Errors returned by the notification center.

var DefaultRegistry = NewRegistry()

DefaultRegistry is the global registry.

Functions

func Close

func Close() error

Close notification center

func Listen

func Listen(ctx context.Context) error

Listen runs the listen interface of the registered subscribers

func OnClose

func OnClose() <-chan bool

The OnClose event fires only after all interfaces have been closed.

Subscribing to this finishing event is convenient in many application use cases.

```go

func myDatabaseObserver() {
  <- notificationcenter.OnClose()
  // ... Do something
}

```
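
A minimal sketch of the pattern behind such a close notification, using a locally defined `closer` type as a stand-in. Whether the library closes the channel or sends a value on it is not stated here; the sketch assumes the channel is closed so that every waiter is released.

```go
package main

import (
	"fmt"
	"sync"
)

// closer is a hypothetical stand-in for a registry with a close notification.
type closer struct {
	once sync.Once
	done chan bool
}

func newCloser() *closer { return &closer{done: make(chan bool)} }

// OnClose returns a receive-only channel that is released on Close.
func (c *closer) OnClose() <-chan bool { return c.done }

// Close releases all waiters; sync.Once makes repeated calls safe.
func (c *closer) Close() error {
	c.once.Do(func() { close(c.done) })
	return nil
}

func main() {
	c := newCloser()
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		<-c.OnClose() // blocks until Close is called
		fmt.Println("cleanup after close")
	}()
	_ = c.Close()
	wg.Wait()
}
```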

func Publish

func Publish(ctx context.Context, name string, messages ...any) error

Publish one or more messages to the pub-service

func Register

func Register(params ...any) error

Register one or more Publisher or Subscriber services. Input parameters must be passed as ordered pairs: {name, interface}.

Example:

```go
nc.Register(
  "events", kafka.MustNewSubscriber(),
  "notifications", nats.MustNewSubscriber(),
)
```
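
The {name, interface} pairing can be illustrated by a small parser over a variadic parameter list. This is a sketch of the calling convention only; `parsePairs` and `errInvalidParam` are hypothetical and do not reproduce the library's validation.

```go
package main

import (
	"errors"
	"fmt"
)

// errInvalidParam is a local stand-in error for malformed parameter lists.
var errInvalidParam = errors.New("invalid register parameter")

// parsePairs walks params as {name, value} pairs and returns them as a map.
// It rejects odd-length lists, non-string names, and nil values.
func parsePairs(params ...any) (map[string]any, error) {
	if len(params)%2 != 0 {
		return nil, errInvalidParam
	}
	out := make(map[string]any, len(params)/2)
	for i := 0; i < len(params); i += 2 {
		name, ok := params[i].(string)
		if !ok || params[i+1] == nil {
			return nil, errInvalidParam
		}
		out[name] = params[i+1]
	}
	return out, nil
}

func main() {
	pairs, err := parsePairs("events", 1, "notifications", 2)
	fmt.Println(len(pairs), err)

	_, err = parsePairs("events") // missing the interface value
	fmt.Println(errors.Is(err, errInvalidParam))
}
```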

func Subscribe

func Subscribe(ctx context.Context, name string, receiver any) error

Subscribe attaches a new handler to a particular subscriber interface, selected by name

Types

type ErrorHandler

type ErrorHandler func(msg Message, err error)

ErrorHandler type to process error value

type FuncPublisher

type FuncPublisher func(context.Context, ...any) error

FuncPublisher wraps a custom function so that it can be used as a publisher processor

func (FuncPublisher) Publish

func (f FuncPublisher) Publish(ctx context.Context, messages ...any) error

Publish calls the original custom publisher function

type FuncReceiver

type FuncReceiver func(msg Message) error

FuncReceiver implements Receiver interface for a single function

func (FuncReceiver) Receive

func (f FuncReceiver) Receive(msg Message) error

Receive passes a message from the sub-service to the wrapped function for processing

type Message

type Message interface {
	// Context of the message
	Context() context.Context

	// Unique message ID (depends on transport)
	ID() string

	// Body returns message data as bytes
	Body() []byte

	// Acknowledgment of the message processing
	Ack() error
}

Message describes the access methods to the message original object

type ModelSubscriber

type ModelSubscriber struct {

	// Error handler pointer
	ErrorHandler ErrorHandler

	// Panic handler pointer
	PanicHandler PanicHandler
	// contains filtered or unexported fields
}

ModelSubscriber provides an implementation of the subscribe functionality

func (*ModelSubscriber) Close

func (s *ModelSubscriber) Close() error

Close closes all receivers that support the io.Closer interface

func (*ModelSubscriber) ProcessMessage

func (s *ModelSubscriber) ProcessMessage(msg Message) error

ProcessMessage by all receivers

func (*ModelSubscriber) Subscribe

func (s *ModelSubscriber) Subscribe(ctx context.Context, receiver Receiver) error

Subscribe a new receiver to receive messages from the subscription

type MultiPublisher

type MultiPublisher []Publisher

MultiPublisher wrapper

func (MultiPublisher) Publish

func (p MultiPublisher) Publish(ctx context.Context, messages ...any) error

Publish one or more messages to a group of pub-services

type PanicHandler

type PanicHandler func(msg Message, recoverData any)

PanicHandler type to process panic action
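
One way such a handler is typically wired in is a deferred `recover` around the receiver call. The sketch below uses a plain string in place of Message and a hypothetical `safeCall` helper; it does not show the library's internals.

```go
package main

import (
	"errors"
	"fmt"
)

// panicHandler mirrors the shape of PanicHandler with a string message.
type panicHandler func(msg string, recoverData any)

// errPanicked is a hypothetical error reported when the receiver panicked.
var errPanicked = errors.New("receiver panicked")

// safeCall runs fn and routes a panic to onPanic instead of crashing.
// It returns fn's error, or errPanicked when fn panicked.
func safeCall(msg string, fn func(string) error, onPanic panicHandler) (err error) {
	defer func() {
		if rec := recover(); rec != nil {
			if onPanic != nil {
				onPanic(msg, rec)
			}
			err = errPanicked
		}
	}()
	return fn(msg)
}

func main() {
	err := safeCall("event 1",
		func(string) error { panic("boom") },
		func(msg string, rec any) {
			fmt.Printf("recovered %v while handling %q\n", rec, msg)
		},
	)
	fmt.Println(err)
}
```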

type Publisher

type Publisher interface {
	// Publish one or more messages to the pub-service
	Publish(ctx context.Context, messages ...any) error
}

Publisher pipeline base declaration

func PublisherByName

func PublisherByName(name string) Publisher

PublisherByName returns the pub interface by name if it exists, or nil otherwise

type Receiver

type Receiver interface {
	Receive(msg Message) error
}

Receiver describes the message processing interface

func ExtFuncReceiver

func ExtFuncReceiver(f any, decs ...decoder.Decoder) Receiver

ExtFuncReceiver wraps function argument with arbitrary input data type

func ReceiverFrom

func ReceiverFrom(handler any) Receiver

ReceiverFrom converts an incoming handler type to the Receiver interface
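
A conversion of this kind is commonly written as a type switch. The set of handler types the library accepts is not listed here, so the cases below are assumptions chosen for illustration; all types are local stand-ins and a string replaces Message.

```go
package main

import (
	"fmt"
)

// Receiver is a reduced local stand-in for the documented interface.
type Receiver interface {
	Receive(msg string) error
}

// funcReceiver adapts a function to Receiver.
type funcReceiver func(msg string) error

func (f funcReceiver) Receive(msg string) error { return f(msg) }

// receiverFrom converts a few handler shapes to Receiver.
// It returns nil, false for unsupported types.
func receiverFrom(handler any) (Receiver, bool) {
	switch h := handler.(type) {
	case Receiver:
		return h, true
	case func(string) error:
		return funcReceiver(h), true
	case func(string):
		// Wrap a handler without an error result.
		return funcReceiver(func(msg string) error { h(msg); return nil }), true
	default:
		return nil, false
	}
}

func main() {
	r, ok := receiverFrom(func(msg string) { fmt.Println("got", msg) })
	if ok {
		_ = r.Receive("event 1")
	}
	_, ok = receiverFrom(42) // unsupported handler type
	fmt.Println(ok)
}
```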

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry provides functionality of access to pub/sub interfaces by string names.

func NewRegistry

func NewRegistry() *Registry

NewRegistry init new registry object

func (*Registry) Close

func (r *Registry) Close() error

Close notification center

func (*Registry) Listen

func (r *Registry) Listen(ctx context.Context) (err error)

Listen runs subscribers listen interface

func (*Registry) OnClose

func (r *Registry) OnClose() <-chan bool

The OnClose event fires only after all interfaces have been closed.

Subscribing to this finishing event is convenient in many application use cases.

```go

func myDatabaseObserver() {
  <- notificationcenter.OnClose()
  // ... Do something
}

```

func (*Registry) Publish

func (r *Registry) Publish(ctx context.Context, name string, messages ...any) error

Publish one or more messages to the pub-service

func (*Registry) Publisher

func (r *Registry) Publisher(name string) Publisher

Publisher returns the pub interface by name if it exists, or nil otherwise

func (*Registry) Register

func (r *Registry) Register(params ...any) error

Register one or more Publisher or Subscriber services. Input parameters must be passed as ordered pairs: {name, interface}.

Example:

```go
nc.Register(
  "events", kafka.MustNewSubscriber(),
  "notifications", nats.MustNewSubscriber(),
)
```

func (*Registry) Subscribe

func (r *Registry) Subscribe(ctx context.Context, name string, receiver any) error

Subscribe attaches a new handler to a particular subscriber interface, selected by name

func (*Registry) Subscriber

func (r *Registry) Subscriber(name string) Subscriber

Subscriber returns the sub interface by name if it exists, or nil otherwise

type Subscriber

type Subscriber interface {
	io.Closer

	// Subscribe a new receiver to receive messages from the subscription
	Subscribe(ctx context.Context, receiver Receiver) error

	// Start processing queue
	Listen(ctx context.Context) error
}

Subscriber provides methods of working with subscription

func SubscriberByName

func SubscriberByName(name string) Subscriber

SubscriberByName returns the sub interface by name if it exists, or nil otherwise

Directories

Path Synopsis
internal
Package mocks is a generated GoMock package.
Package pg provides the possibility to subscribe to internal postgres events.
wrappers
