pubsub

package
v1.0.2
Published: Nov 26, 2023 License: MIT Imports: 12 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var (
	ErrNoAdapter = errors.New("no adapter matches topic to broadcast the message")
)

Functions

func Broadcast

func Broadcast(topic string, message []byte, options ...*Option) (err error)

Broadcast broadcasts a message on the given topic across the whole cluster.

func DirectBroadcast

func DirectBroadcast(nodeId string, topic string, message []byte, options ...*Option) error

DirectBroadcast broadcasts a message on the given topic to the node identified by nodeId.

func Dispatch

func Dispatch(topic string, message []byte)

Dispatch is used by adapters to process and deliver messages coming from the backend (Redis, Kafka, *MQ), decrypting and decompressing them if necessary.

func LocalBroadcast

func LocalBroadcast(topic string, message any)

LocalBroadcast broadcasts a message on the given topic to the current node only.

`topic` - The topic to broadcast to, e.g. `"users:123"`

`message` - The payload of the broadcast

func Self

func Self() string

Self returns the ID of the current node.

func SetAdapters

func SetAdapters(adapters []AdapterConfig)

SetAdapters configures the topics handled by each adapter.

This allows the application to have adapter instances specialized by topic.

## Example

SetAdapters([]AdapterConfig{
	{Adapter: &RedisAdapter{Addr: "admin.redis-host:6379"}, Topics: []string{"admin:*"}},
	{Adapter: &RedisAdapter{Addr: "global.redis-host:6379"}, Topics: []string{"*"}},
})
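
The example above implies that each topic is routed to an adapter by pattern, with a specific pattern such as `"admin:*"` listed before the catch-all `"*"`. The exact matching rules are not documented here. The following sketch assumes glob-style patterns and first-match-wins ordering, using `path.Match` from the standard library; the `route` type and `pick` function are hypothetical:

```go
package main

import (
	"fmt"
	"path"
)

// route pairs a hypothetical adapter name with its topic patterns.
type route struct {
	name   string
	topics []string
}

// pick returns the name of the first route that has a pattern matching topic.
// Note: path.Match never lets "*" match a "/", so this sketch only suits
// topics without slashes.
func pick(routes []route, topic string) (string, bool) {
	for _, r := range routes {
		for _, pattern := range r.topics {
			if ok, err := path.Match(pattern, topic); err == nil && ok {
				return r.name, true
			}
		}
	}
	return "", false
}

func main() {
	routes := []route{
		{name: "admin", topics: []string{"admin:*"}},
		{name: "global", topics: []string{"*"}},
	}
	for _, topic := range []string{"admin:stats", "users:123"} {
		name, _ := pick(routes, topic)
		fmt.Printf("%s -> %s\n", topic, name)
	}
}
```

Under these assumptions the order of the configuration matters: placing `"*"` first would send every topic to the global instance.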

func SetGlobalOptions

func SetGlobalOptions(options ...*Option)

SetGlobalOptions sets the global options used when sending messages.

func Subscribe

func Subscribe(topic string, dispatcher Dispatcher)

func Unsubscribe

func Unsubscribe(topic string, dispatcher Dispatcher)

Unsubscribe removes the dispatcher from the pubsub adapter's topic.

Types

type Adapter

type Adapter interface {
	// Name returns the adapter name
	Name() string

	// Subscribe must subscribe to the given topic when the adapter has an external broker
	Subscribe(topic string)

	// Unsubscribe must unsubscribe from the given topic when the adapter has an external broker
	Unsubscribe(topic string)

	// Broadcast sends the given topic and message to all nodes in the cluster (except the current node itself).
	Broadcast(topic string, message []byte, opts map[string]any) error
}

Adapter is the specification for implementing a custom PubSub adapter.
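
To illustrate what implementing this interface involves, here is a minimal sketch of a custom adapter that records everything in memory instead of talking to a broker. The interface is copied locally so the example compiles on its own; `recordingAdapter` is hypothetical and is not safe for concurrent use:

```go
package main

import "fmt"

// Adapter is copied from the interface documented above.
type Adapter interface {
	Name() string
	Subscribe(topic string)
	Unsubscribe(topic string)
	Broadcast(topic string, message []byte, opts map[string]any) error
}

// recordingAdapter is a hypothetical adapter that keeps state in memory.
type recordingAdapter struct {
	topics map[string]bool // topics currently subscribed
	sent   []string        // "topic=message" for every broadcast
}

func newRecordingAdapter() *recordingAdapter {
	return &recordingAdapter{topics: map[string]bool{}}
}

func (a *recordingAdapter) Name() string { return "recording" }

func (a *recordingAdapter) Subscribe(topic string) { a.topics[topic] = true }

func (a *recordingAdapter) Unsubscribe(topic string) { delete(a.topics, topic) }

// Broadcast records the message. A real adapter would publish it to its
// broker here and return any transport error.
func (a *recordingAdapter) Broadcast(topic string, message []byte, opts map[string]any) error {
	a.sent = append(a.sent, topic+"="+string(message))
	return nil
}

// Compile-time check that recordingAdapter satisfies Adapter.
var _ Adapter = (*recordingAdapter)(nil)

func main() {
	a := newRecordingAdapter()
	a.Subscribe("users:*")
	if err := a.Broadcast("users:123", []byte("hello"), nil); err != nil {
		fmt.Println("broadcast failed:", err)
		return
	}
	fmt.Println(a.Name(), a.sent)
}
```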

type AdapterConfig

type AdapterConfig struct {

	// Adapter is the adapter instance being configured
	Adapter Adapter

	// Keyring allows a custom Keyring to be used for message encryption
	Keyring *crypto.Keyring

	// Options are passed to the adapter during the broadcast
	Options []Option

	// Topics are the topic name patterns this adapter must match
	Topics []string

	// RawMessage when true, do not encode messages when transmitting to adapter
	RawMessage bool

	// DisableEncryption disables message encryption when true
	DisableEncryption bool

	// DisableCompression disables message compression when true. Compression reduces bandwidth usage at
	// the cost of slightly more CPU utilization.
	DisableCompression bool
}

func GetAdapter

func GetAdapter(topic string) *AdapterConfig

GetAdapter returns the adapter configuration associated with a topic.

type Dispatcher

type Dispatcher interface {
	Dispatch(topic string, message any, from string)
}

func DispatcherFunc

func DispatcherFunc(d func(topic string, message any, from string)) Dispatcher

type DispatcherFuncImpl

type DispatcherFuncImpl struct {
	Dispatcher func(topic string, message any, from string)
}

func (*DispatcherFuncImpl) Dispatch

func (d *DispatcherFuncImpl) Dispatch(topic string, message any, from string)

type DummyAdapter

type DummyAdapter struct {
}

DummyAdapter is the default adapter for local message distribution (only for the current node).

func (*DummyAdapter) Broadcast

func (a *DummyAdapter) Broadcast(topic string, message []byte, opts map[string]any) error

func (*DummyAdapter) Name

func (a *DummyAdapter) Name() string

func (*DummyAdapter) Subscribe

func (a *DummyAdapter) Subscribe(topic string)

func (*DummyAdapter) Unsubscribe

func (a *DummyAdapter) Unsubscribe(topic string)

type Option

type Option struct {
	// contains filtered or unexported fields
}

func O

func O(key string, value any) *Option

func (*Option) Key

func (o *Option) Key() string

func (*Option) Value

func (o *Option) Value() any
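
Option is an opaque key/value pair built with O, while Adapter.Broadcast receives its options as a `map[string]any`. How the package converts between the two is not documented. The sketch below assumes a simple flattening where later options override earlier ones; the struct fields, the `toMap` helper and the option keys `"ttl"` and `"priority"` are all hypothetical:

```go
package main

import "fmt"

// Option mirrors the documented type; the field names are assumptions
// because the real fields are unexported.
type Option struct {
	key   string
	value any
}

// O builds an Option from a key and a value.
func O(key string, value any) *Option { return &Option{key: key, value: value} }

// Key returns the option key.
func (o *Option) Key() string { return o.key }

// Value returns the option value.
func (o *Option) Value() any { return o.value }

// toMap is a hypothetical helper that flattens options into the map shape
// accepted by Adapter.Broadcast. Later options override earlier ones.
func toMap(options ...*Option) map[string]any {
	m := make(map[string]any, len(options))
	for _, o := range options {
		if o == nil {
			continue // skip nil options rather than panic
		}
		m[o.Key()] = o.Value()
	}
	return m
}

func main() {
	opts := toMap(O("ttl", 30), O("priority", "high"), O("ttl", 60))
	fmt.Println(opts["ttl"], opts["priority"])
}
```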
