pubsub

package
v0.0.0-...-a6598b8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 1, 2024 License: Apache-2.0 Imports: 18 Imported by: 7

README

pubsub

pubsub is the library that handles object in-memory storage, persistence of such data, and notification to other interested processes of changes to the data.

For details as to how pubsub works, please see IPC.md.

Documentation

Overview

Package pubsub provides access to publish/subscribe semantics and engine in a single host context across processes. The actual implementation of the publishing and subscribing are provided by an engine passed to `pubsub`.

This documentation covers:

* how to use pubsub in another module * how to implement a driver.

Usage

To use pubsub, you must first instantiate a pubsub instance, passing it a driver, and then use the instance. In general, there will be one pubsub instance per process, but that is not strictly necessary.

Once instantiated, you can retrieve a publisher or a subscriber from the `pubsub.PubSub`.

To instantiate pubsub:

import "github.com/lf-edge/eve/pkg/pillar/pubsub"
ps := pubsub.New(driver)

where `driver` is a `struct` that implements `pubsub.Driver`.

Included is the `SocketDriver`, which uses a Unix-domain socket to communicate between publishers and subscribers, and local directories to store persistent messages.

see the documentation for each element to understand its usage.

For example:

import (
  "github.com/lf-edge/eve/pkg/pillar/pubsub"
  "github.com/lf-edge/eve/pkg/pillar/pubsub/socketdriver"
)

func foo() {
  driver := socketdriver.SocketDriver{}
  ps := pubsub.New(&driver)
  pub, err := ps.Publish("my-agent", element)
  pub, err := ps.PublishPersistent("other-agent", element)
  sub, err := ps.Subscribe("my-agent", element, true, ctx)
}

Driver

The driver is responsible for implementing the underlying mechanics of publishing and subscribing. While `pubsub.PubSub` and its components - `Publication` and `Subscription` - handle the in-memory and diff aspects, the driver itself is responsible for communicating between the publisher and subscriber, and performing any persistence.

The driver is expected to implement the `Driver` interface, which primarily involves being able to return the `DriverPublisher` and `DriverSubscriber`. These in turn are used by `Publication` and `Subscription` to publish and subscribe messages.

The `DriverPublisher` and `DriverSubscriber` are expected to function as follows.

DriverPublisher

The `DriverPublisher` publishes messages and, optionally, persists them. It also can `Unpublish` messages, as well as `Load` all messages from persistence store. Finally, it must be able to set and clear a `restarted` flag/counter.

The actual interface is key-value pairs, where it either is requested to publish a key (string) and value (`interface{}`), or unpublish a key.

See the documentation for the `DriverPublisher` interface to learn more.

DriverSubscriber

The `DriverSubscriber` subscribes to messages. As with the `DriverPublisher`, the caller has no understanding of the underlying mechanism or semantics. Once started, the subscriber is expected to send any changes to the channel which was passed to it at startup. These changes are in the format of `pubsub.Change`, which encapsulates the change operation, key and value.

To listen to several subscriptions at the same time MultiChannelWatch can be used; to see how it can be used, have a look at usbmanager/subscriptions.go

See the documentation for the `DriverSubscriber` interface to learn more.

Index

Constants

View Source
const (
	// Global fixed string for a global subject, i.e. no agent
	Global = "global"
)

Variables

This section is empty.

Functions

func ConnReadCheck

func ConnReadCheck(conn net.Conn) error

ConnReadCheck waits till conn's fd is readable

func EnsureDir

func EnsureDir(dirname string) error

EnsureDir to make sure it exists

func MultiChannelWatch

func MultiChannelWatch(watches []ChannelWatch)

MultiChannelWatch allows listening to several receiving channels of different types at the same time this way the pubsub subscriptions can be managed in an array and be listened to all at once without requiring to write a big select statement

func TypeToName

func TypeToName(something interface{}) string

TypeToName given a particular object, get the desired name for it

Types

type Change

type Change struct {
	// Operation which operation is performed by this change
	Operation Operation
	// Key the key of the affected item, if any
	Key string
	// Value the value of the affected item, if any
	Value []byte
}

Change the message to go into a change channel

type ChannelWatch

type ChannelWatch struct {
	// Chan is the channel to watch for incoming data
	Chan reflect.Value
	// Callback is the function to call with that data (or empty if no data)
	Callback func(value interface{})
}

ChannelWatch describe a channel to watch and the callback to call

type Differ

type Differ interface {
	DetermineDiffs(localCollection LocalCollection) []string
}

Differ interface that updates a LocalCollection from previous state to current state, and returns a slice of keys that have changed

type Driver

type Driver interface {
	// Publisher return a `DriverPublisher` for the given name and topic.
	// The caller passes the `Updaters`, `Restarted` checker and `Differ`.
	// These can be used to:
	// * add to or remove from the updaters
	// * determine if the topic has been restarted
	// * diff the current known state from the target known state
	Publisher(global bool, name, topic string, persistent bool, updaterList *Updaters, restarted Restarted, differ Differ) (DriverPublisher, error)
	// Subscriber return a `DriverSubscriber` for the given name and topic.
	// This is expected to create a `DriverSubscriber`, but not start it.
	// Once started, when changes arrive, they should be published to the provided
	// channel. Each update to the channel is of type `Change`, which encapsulates
	// the operation, key and value.
	Subscriber(global bool, name, topic string, persistent bool, C chan Change) (DriverSubscriber, error)
	// DefaultName Return the default name to use for an agent, when the name
	// is not provided.
	DefaultName() string
}

Driver a backend driver for pubsub

type DriverPublisher

type DriverPublisher interface {
	// Start the publisher, if any startup is necessary.
	// This is expected to return immediately. If it needs to run in the
	// background, it is the responsibility of the driver to run it as a separate
	// goroutine.
	Start() error
	// Load current status from persistence. Usually called only on first start.
	// The implementation is responsible for determining if the load is necessary
	// or already has been performed. If it has been already, it should not change
	// anything. The caller has no knowledge of where the persistent state was
	// stored: disk, databases, or vellum. All it cares about is that it gets
	// a key-value list.
	Load() (map[string][]byte, int, error)
	// Publish a key-value pair to all subscribers and optionally persistence
	Publish(key string, item []byte) error
	// Unpublish a key, i.e. delete it and publish its deletion to all subscribers
	Unpublish(key string) error
	// Restart set the restartCounter for the topic. Zero implies no restart
	Restart(restartCounter int) error

	// Stop publishing
	// This is expected to return immediately.
	Stop() error

	// CheckMaxSize to see if it will fit
	CheckMaxSize(key string, val []byte) error

	// LargeDirName returns the directory to be used for large fields
	LargeDirName() string
}

DriverPublisher interface that a driver for publishing must implement

type DriverSubscriber

type DriverSubscriber interface {
	// Start subscribing to a name and topic and publish changes to the channel.
	// This is expected to return immediately. If it needs to run in the
	// background, it is the responsibility of the driver to run it as a separate
	// goroutine.
	Start() error

	// Load initial status from persistence. Usually called only on first start.
	// The implementation is responsible for determining if the load is necessary
	// or already has been performed. If it has been already, it should not change
	// anything. The caller has no knowledge of where the persistent state was
	// stored: disk, databases, or vellum. All it cares about is that it gets
	// a key-value list.
	Load() (map[string][]byte, int, error)

	// Stop subscribing to a name and topic
	// This is expected to return immediately.
	Stop() error

	// LargeDirName returns the directory to be used for large fields
	LargeDirName() string
}

DriverSubscriber interface that a driver for subscribing must implement

type EmptyDriver

type EmptyDriver struct{}

EmptyDriver struct

func (*EmptyDriver) DefaultName

func (e *EmptyDriver) DefaultName() string

DefaultName function

func (*EmptyDriver) Publisher

func (e *EmptyDriver) Publisher(global bool, name, topic string, persistent bool, updaterList *Updaters, restarted Restarted, differ Differ) (DriverPublisher, error)

Publisher function

func (*EmptyDriver) Subscriber

func (e *EmptyDriver) Subscriber(global bool, name, topic string, persistent bool, C chan Change) (DriverSubscriber, error)

Subscriber function

type EmptyDriverPublisher

type EmptyDriverPublisher struct{}

EmptyDriverPublisher struct

func (*EmptyDriverPublisher) CheckMaxSize

func (e *EmptyDriverPublisher) CheckMaxSize(key string, val []byte) error

CheckMaxSize function

func (*EmptyDriverPublisher) LargeDirName

func (e *EmptyDriverPublisher) LargeDirName() string

LargeDirName where to put large fields

func (*EmptyDriverPublisher) Load

func (e *EmptyDriverPublisher) Load() (map[string][]byte, int, error)

Load function

func (*EmptyDriverPublisher) Publish

func (e *EmptyDriverPublisher) Publish(key string, item []byte) error

Publish function

func (*EmptyDriverPublisher) Restart

func (e *EmptyDriverPublisher) Restart(restartCounter int) error

Restart function

func (*EmptyDriverPublisher) Start

func (e *EmptyDriverPublisher) Start() error

Start function

func (*EmptyDriverPublisher) Stop

func (e *EmptyDriverPublisher) Stop() error

Stop function

func (*EmptyDriverPublisher) Unpublish

func (e *EmptyDriverPublisher) Unpublish(key string) error

Unpublish function

type EmptyDriverSubscriber

type EmptyDriverSubscriber struct{}

EmptyDriverSubscriber struct

func (*EmptyDriverSubscriber) LargeDirName

func (e *EmptyDriverSubscriber) LargeDirName() string

LargeDirName where to put large fields

func (*EmptyDriverSubscriber) Load

func (e *EmptyDriverSubscriber) Load() (map[string][]byte, int, error)

Load function

func (*EmptyDriverSubscriber) Start

func (e *EmptyDriverSubscriber) Start() error

Start function

func (*EmptyDriverSubscriber) Stop

func (e *EmptyDriverSubscriber) Stop() error

Stop function

type Getter

type Getter interface {
	Get(key string) (interface{}, error)
}

Getter - Interface for pub/sub

type LocalCollection

type LocalCollection map[string][]byte

LocalCollection represents an entire local copy of a set of key-value pairs

type Notify

type Notify struct{}

Notify simple struct to pass notification messages

type Operation

type Operation byte

Operation type for a single change operation

const (
	// Restart operation is a restart
	Restart Operation = iota
	// Sync operation is a complete/sync of the initial content
	Sync
	// Delete operation is delete an existing key
	Delete
	// Modify operation is modify the value of an existing key
	Modify
)

type PubSub

type PubSub struct {
	// contains filtered or unexported fields
}

PubSub is a system for publishing and subscribing to messages it manages the creation of Publication and Subscription, which handle the actual implementation of in-memory structures and logic the message passing and persistence are handled by a Driver. Should not be called directly. Instead use the `New()` function.

func New

func New(driver Driver, logger *logrus.Logger, log *base.LogObject) *PubSub

New create a new `PubSub` with a given `Driver`.

func (*PubSub) CheckMaxTimeTopic

func (p *PubSub) CheckMaxTimeTopic(agentName string, topic string, start time.Time,
	warnTime time.Duration, errTime time.Duration)

CheckMaxTimeTopic verifies if the time for a call has exeeded a reasonable number.

func (*PubSub) NewPublication

func (p *PubSub) NewPublication(options PublicationOptions) (Publication, error)

NewPublication creates a new Publication with given options

func (*PubSub) NewSubscription

func (p *PubSub) NewSubscription(options SubscriptionOptions) (Subscription, error)

NewSubscription creates a new Subscription with given options

func (*PubSub) RegisterFileWatchdog

func (p *PubSub) RegisterFileWatchdog(agentName string)

RegisterFileWatchdog tells the watchdog about the touch file

func (*PubSub) RegisterPidWatchdog

func (p *PubSub) RegisterPidWatchdog(agentName string)

RegisterPidWatchdog tells the watchdog about the pid file

func (*PubSub) StillRunning

func (p *PubSub) StillRunning(agentName string, warnTime time.Duration, errTime time.Duration)

StillRunning touches a file per agentName to signal the event loop is still running Those files are observed by the watchdog

type Publication

type Publication interface {
	// CheckMaxSize returns an error if the item is too large
	CheckMaxSize(key string, item interface{}) error
	// Publish - Publish an object
	Publish(key string, item interface{}) error
	// Unpublish - Delete / UnPublish an object
	Unpublish(key string) error
	// SignalRestarted - Signal the publisher has started one more time
	SignalRestarted() error
	// ClearRestarted clear the restarted flag
	ClearRestarted() error
	// Get - Lookup an object
	Get(key string) (interface{}, error)
	// GetAll - Get a copy of the objects.
	GetAll() map[string]interface{}
	// Iterate - Perform some action on all items
	Iterate(function base.StrMapFunc)
	// Close - delete the publisher
	Close() error
}

Publication - Interface to be implemented by a Publication

type PublicationImpl

type PublicationImpl struct {
	// contains filtered or unexported fields
}

PublicationImpl - Publication Implementation. The main structure that implements

Publication interface.

func (*PublicationImpl) CheckMaxSize

func (pub *PublicationImpl) CheckMaxSize(key string, item interface{}) error

CheckMaxSize returns an error if the item is too large and would result in a fatal if it was published

func (*PublicationImpl) ClearRestarted

func (pub *PublicationImpl) ClearRestarted() error

ClearRestarted clear the restart signal

func (*PublicationImpl) Close

func (pub *PublicationImpl) Close() error

Close the publisher

func (*PublicationImpl) DetermineDiffs

func (pub *PublicationImpl) DetermineDiffs(localCollection LocalCollection) []string

DetermineDiffs update a provided LocalCollection to the current state, and return the deleted keys before the added/modified ones

func (*PublicationImpl) Get

func (pub *PublicationImpl) Get(key string) (interface{}, error)

Get the value for a given key

func (*PublicationImpl) GetAll

func (pub *PublicationImpl) GetAll() map[string]interface{}

GetAll enumerate all the key-value pairs for the collection

func (*PublicationImpl) IsRestarted

func (pub *PublicationImpl) IsRestarted() bool

IsRestarted has this publication been set to "restarted"

func (*PublicationImpl) Iterate

func (pub *PublicationImpl) Iterate(function base.StrMapFunc)

Iterate - performs some callback function on all items

func (*PublicationImpl) Publish

func (pub *PublicationImpl) Publish(key string, item interface{}) error

Publish publish a key-value pair

func (*PublicationImpl) RestartCounter

func (pub *PublicationImpl) RestartCounter() int

RestartCounter number of times this this publication been set to "restarted"

func (*PublicationImpl) SignalRestarted

func (pub *PublicationImpl) SignalRestarted() error

SignalRestarted signal that a publication is restarted one more time

func (*PublicationImpl) Unpublish

func (pub *PublicationImpl) Unpublish(key string) error

Unpublish delete a key from the key-value map

type PublicationOptions

type PublicationOptions struct {
	AgentName  string
	AgentScope string
	TopicType  interface{}
	Persistent bool
}

PublicationOptions defines all the possible options a new publication may have

type Restarted

type Restarted interface {
	IsRestarted() bool
	RestartCounter() int
}

Restarted interface that lets you determine if a Publication has been restarted Returns zero if not; the count indicates the number of times it has restarted.

type SubCreateHandler

type SubCreateHandler func(ctx interface{}, key string, status interface{})

SubCreateHandler is a handler to handle creates

type SubDeleteHandler

type SubDeleteHandler func(ctx interface{}, key string, status interface{})

SubDeleteHandler is a handler to handle delete

type SubModifyHandler

type SubModifyHandler func(ctx interface{}, key string, status interface{},
	oldStatus interface{})

SubModifyHandler is a handler for modify notifications which carries the oldStatus

type SubRestartHandler

type SubRestartHandler func(ctx interface{}, restartCount int)

SubRestartHandler generic handler for restarts

type SubSyncHandler

type SubSyncHandler func(ctx interface{}, synchronized bool)

SubSyncHandler generic handler for synchronized

type Subscription

type Subscription interface {
	// Get - get / lookup an object by key
	Get(key string) (interface{}, error)
	// GetAll - Get a copy of the objects.
	GetAll() map[string]interface{}
	// Iterate - Perform some action on all items
	Iterate(function base.StrMapFunc)
	// Restarted report if this subscription has been marked as restarted
	Restarted() bool
	// RestartCounter reports how many times this subscription has been restarted
	RestartCounter() int
	// Synchronized report if this subscription has received initial items
	Synchronized() bool
	// ProcessChange - Invoked on the string msg from Subscription Channel
	ProcessChange(change Change)
	// MsgChan - Message Channel for Subscription
	MsgChan() <-chan Change
	// Activate starts the subscription
	Activate() error
	// Close stops the subscription and removes the state
	Close() error
}

Subscription - Interface to be implemented by a Subscription

type SubscriptionImpl

type SubscriptionImpl struct {
	C                   <-chan Change
	CreateHandler       SubCreateHandler
	ModifyHandler       SubModifyHandler
	DeleteHandler       SubDeleteHandler
	RestartHandler      SubRestartHandler
	SynchronizedHandler SubSyncHandler
	MaxProcessTimeWarn  time.Duration // If set generate warning if ProcessChange
	MaxProcessTimeError time.Duration // If set generate warning if ProcessChange
	Persistent          bool
	// contains filtered or unexported fields
}

SubscriptionImpl handle a subscription to a single agent+topic, optionally scope as well. Never should be instantiated directly. Rather, call `PubSub.Subscribe*`

func (*SubscriptionImpl) Activate

func (sub *SubscriptionImpl) Activate() error

Activate starts the subscription

func (*SubscriptionImpl) Close

func (sub *SubscriptionImpl) Close() error

Close stops the subscription and removes the content

func (*SubscriptionImpl) Get

func (sub *SubscriptionImpl) Get(key string) (interface{}, error)

Get - Get object with specified Key from this Subscription.

func (*SubscriptionImpl) GetAll

func (sub *SubscriptionImpl) GetAll() map[string]interface{}

GetAll - Enumerate all the key, value for the collection

func (*SubscriptionImpl) Iterate

func (sub *SubscriptionImpl) Iterate(function base.StrMapFunc)

Iterate - performs some callback function on all items

func (*SubscriptionImpl) MsgChan

func (sub *SubscriptionImpl) MsgChan() <-chan Change

MsgChan return the Message Channel for the Subscription.

func (*SubscriptionImpl) ProcessChange

func (sub *SubscriptionImpl) ProcessChange(change Change)

ProcessChange process a single change and its parameters. It calls the various handlers (if set) and updates the subscribed collection. The subscribed collection can be accessed using:

foo := s1.Get(key)
fooAll := s1.GetAll()

func (*SubscriptionImpl) RestartCounter

func (sub *SubscriptionImpl) RestartCounter() int

RestartCounter - Check how many times the Publisher has Restarted

func (*SubscriptionImpl) Restarted

func (sub *SubscriptionImpl) Restarted() bool

Restarted - Check if the Publisher has Restarted

func (*SubscriptionImpl) Synchronized

func (sub *SubscriptionImpl) Synchronized() bool

Synchronized -

func (*SubscriptionImpl) Topic

func (sub *SubscriptionImpl) Topic() string

Topic returns the string definiting the topic

type SubscriptionOptions

type SubscriptionOptions struct {
	CreateHandler  SubCreateHandler
	ModifyHandler  SubModifyHandler
	DeleteHandler  SubDeleteHandler
	RestartHandler SubRestartHandler
	SyncHandler    SubSyncHandler
	WarningTime    time.Duration
	ErrorTime      time.Duration
	AgentName      string
	AgentScope     string
	TopicImpl      interface{}
	Activate       bool
	Ctx            interface{}
	Persistent     bool
	MyAgentName    string // For logging
}

SubscriptionOptions options to pass when creating a Subscription

type Updaters

type Updaters struct {
	// contains filtered or unexported fields
}

Updaters list of channels to which notifications should be sent. Global across an entire `PubSub struct`. Can `Add()` and `Remove()`.

func (*Updaters) Add

func (u *Updaters) Add(log *base.LogObject, updater chan Notify, name string, instance int)

Add an updater

func (*Updaters) Remove

func (u *Updaters) Remove(log *base.LogObject, updater chan Notify)

Remove an updater

Directories

Path Synopsis
Package reverse provide a limited variant of pubsub where the subscriber creates the listener and the publisher connects.
Package reverse provide a limited variant of pubsub where the subscriber creates the listener and the publisher connects.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL