pubsub

Version: v0.0.0-...-ceac3b7
Published: Apr 25, 2024 License: Apache-2.0 Imports: 3 Imported by: 24

README

pubsub

PubSub publishes data to subscriptions. However, it can do so much more than just push some data to a subscription. Each subscription is placed in a tree. When data is published, it traverses the tree and finds each interested subscription. This allows for sophisticated filters and routing.

If you have any questions, or want to get attention for a PR or issue, please reach out on the #logging-and-metrics channel in the Cloud Foundry Slack.

Installation

go get code.cloudfoundry.org/go-pubsub

Subscription Trees

A subscription tree is a collection of subscriptions that are organized based on what data they want published to them. When subscribing, a path is provided to give directions to PubSub about where to store the subscription and what data should be published to it.

So for example, say there are three subscriptions with the following paths:

Name    Path
sub-1   ["a", "b", "c"]
sub-2   ["a", "b", "d"]
sub-3   ["a", "b"]

After all three subscriptions have been registered, PubSub will have the following subscription tree:

              a
              |
              b   <-(sub-3)
             / \
(sub-1)->   c   d   <-(sub-2)

To better draw out each subscription's view of the tree:

Sub-1
              a
              |
              b
             /
(sub-1)->   c
Sub-2
              a
              |
              b
               \
                d   <-(sub-2)
Sub-3
              a
              |
              b   <-(sub-3)

To give a few examples of how data could be published:

Single path
              a
              |
              b   <-(sub-3)
               \
                d   <-(sub-2)

In this example both sub-2 and sub-3 would have the data written to them.

Multi-Path
              a
              |
              b   <-(sub-3)
             / \
(sub-1)->   c   d   <-(sub-2)

In this example sub-1, sub-2 and sub-3 would all have the data written to them.

Shorter Path
              a
              |
              b   <-(sub-3)

In this example only sub-3 would have the data written to it.

Other Path
              x
              |
              y

In this example, no subscriptions would have data written to them.
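
The routing rules above can be sketched with a toy tree. This is a minimal, self-contained illustration and not the library's implementation: the node type and the subscribe, publish and buildTree helpers are hypothetical names, and paths are plain strings to match the diagrams.

```go
package main

import "fmt"

// node is one level of a toy subscription tree. It is a hypothetical
// stand-in for illustration, not the library's internal type.
type node struct {
	subs     []string
	children map[string]*node
}

func newNode() *node { return &node{children: map[string]*node{}} }

// subscribe walks (and creates) the nodes along path and stores name on the
// last node.
func (n *node) subscribe(name string, path []string) {
	for _, p := range path {
		child, ok := n.children[p]
		if !ok {
			child = newNode()
			n.children[p] = child
		}
		n = child
	}
	n.subs = append(n.subs, name)
}

// publish follows a single path and collects every subscription stored on a
// node it passes through. A subscription with a shorter path therefore also
// receives data published further down the same branch.
func (n *node) publish(path []string) []string {
	found := append([]string(nil), n.subs...)
	for _, p := range path {
		child, ok := n.children[p]
		if !ok {
			break
		}
		n = child
		found = append(found, n.subs...)
	}
	return found
}

// buildTree registers the three subscriptions from the table above.
func buildTree() *node {
	root := newNode()
	root.subscribe("sub-1", []string{"a", "b", "c"})
	root.subscribe("sub-2", []string{"a", "b", "d"})
	root.subscribe("sub-3", []string{"a", "b"})
	return root
}

func main() {
	root := buildTree()
	fmt.Println(root.publish([]string{"a", "b", "d"})) // [sub-3 sub-2]
	fmt.Println(root.publish([]string{"a", "b"}))      // [sub-3]
	fmt.Println(root.publish([]string{"x", "y"}))      // []
}
```

The multi-path case is the same walk repeated for each branch, so publishing down both c and d reaches all three subscriptions.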

Simple Example:

ps := pubsub.New()
subscription := func(name string) pubsub.Subscription {
	return func(data interface{}) {
		fmt.Printf("%s -> %v\n", name, data)
	}
}

// Paths are []uint64 in the API documented below, so the letters used in the
// diagrams are encoded as numbers here.
const (
	a, b, c, d, e uint64 = 1, 2, 3, 4, 5
	x, y          uint64 = 24, 25
)

ps.Subscribe(subscription("sub-1"), pubsub.WithPath([]uint64{a, b, c}))
ps.Subscribe(subscription("sub-2"), pubsub.WithPath([]uint64{a, b, d}))
ps.Subscribe(subscription("sub-3"), pubsub.WithPath([]uint64{a, b, e}))

ps.Publish("data-1", pubsub.LinearTreeTraverser([]uint64{a, b}))
ps.Publish("data-2", pubsub.LinearTreeTraverser([]uint64{a, b, c}))
ps.Publish("data-3", pubsub.LinearTreeTraverser([]uint64{a, b, d}))
ps.Publish("data-4", pubsub.LinearTreeTraverser([]uint64{x, y}))

// Output:
// sub-1 -> data-2
// sub-2 -> data-3

In this example the LinearTreeTraverser is used to traverse the tree of subscriptions. When an interested subscription is found (in this case sub-1 and sub-2 for data-2 and data-3 respectively), the subscription is handed the data.

More complex examples can be found in the examples directory.

TreeTraversers

A TreeTraverser is used to traverse the subscription tree and find which subscriptions should have the data published to them. A few implementations are provided; however, it is likely a user will need to implement their own to suit their data.

When creating a TreeTraverser it is important to note how the data is structured. A TreeTraverser must be deterministic and ideally stateless. The order the data is parsed and returned (via Traverse()) must align with the given path of Subscribe().

This means that if the TreeTraverser intends to look at field A, then B, and finally C, the subscription path must be A, B and then C (and not B, A, C or any other order).
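
As a rough sketch of that ordering rule, the traversers below inspect field A first and field B second. To keep the example runnable on its own, Paths and TreeTraverser are redeclared locally with the signatures listed under Types; the event struct, the traverseA, traverseB and done functions, and the walk driver are hypothetical names used only for illustration.

```go
package main

import "fmt"

// Local copies of the documented function types, so the sketch compiles
// without importing the library.
type TreeTraverser func(data interface{}) Paths
type Paths func(idx int, data interface{}) (path uint64, nextTraverser TreeTraverser, ok bool)

// event is a hypothetical data type with two routable fields.
type event struct {
	A uint64
	B uint64
}

// traverseA yields field A as the only path and hands off to traverseB.
func traverseA(data interface{}) Paths {
	return func(idx int, data interface{}) (uint64, TreeTraverser, bool) {
		if idx != 0 {
			return 0, nil, false
		}
		return data.(event).A, traverseB, true
	}
}

// traverseB yields field B as the only path and hands off to done.
func traverseB(data interface{}) Paths {
	return func(idx int, data interface{}) (uint64, TreeTraverser, bool) {
		if idx != 0 {
			return 0, nil, false
		}
		return data.(event).B, done, true
	}
}

// done yields no paths, which ends the traversal.
func done(data interface{}) Paths {
	return func(idx int, data interface{}) (uint64, TreeTraverser, bool) {
		return 0, nil, false
	}
}

// walk is a simplified driver that follows only the first path at each level
// and records the path values in the order they were produced.
func walk(t TreeTraverser, data interface{}) []uint64 {
	var visited []uint64
	for {
		p, next, ok := t(data)(0, data)
		if !ok {
			return visited
		}
		visited = append(visited, p)
		if next != nil {
			t = next
		}
	}
}

func main() {
	fmt.Println(walk(traverseA, event{A: 1, B: 2})) // [1 2]
}
```

A subscription interested in this event would therefore need the path []uint64{1, 2}; a path of []uint64{2, 1} would sit on a different node and never be reached.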

Subscriptions

A Subscription is used when publishing data. The given path is used to determine its placement in the subscription tree.

Code Generation

The tree traversers and subscriptions are quite complicated. Laying out a tree structure is not something humans are going to find natural. Therefore a generator is provided for structs.

The generator inspects the struct (at go generate time) and creates the tree layout code. There is a provided example.

Documentation

Overview

Package pubsub provides a library that implements the Publish and Subscribe model. Subscriptions can subscribe to complex data patterns and data will be published to all subscribers that fit the criteria.

Each Subscription when subscribing will walk the underlying subscription tree to find its place in the tree. The given path when subscribing is used to analyze the Subscription and find the correct node to store it in.

As data is published, the TreeTraverser analyzes the data to determine what nodes the data belongs to. Data can be written to multiple subscriptions. This means that when data is published, the system can traverse multiple paths for the data.

Types

type PathAndTraverser

type PathAndTraverser struct {
	Path      uint64
	Traverser TreeTraverser
}

PathAndTraverser is a path and traverser pair.

type Paths

type Paths func(idx int, data interface{}) (path uint64, nextTraverser TreeTraverser, ok bool)

Paths is returned by a TreeTraverser. It describes both how the data is assigned and how to continue analyzing it. It will be called with idx ranging over [0, n], where n is the number of valid paths. This means that a Paths needs to be prepared for an idx that is greater than it has valid data for.

If nextTraverser is nil, then the previous TreeTraverser is used.
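
A minimal sketch of a Paths that tolerates an out-of-range idx follows. The types are redeclared locally with the signatures shown here so the block runs on its own; slicePaths and collect are hypothetical helper names, not part of the package.

```go
package main

import "fmt"

// Local copies of the documented function types.
type TreeTraverser func(data interface{}) Paths
type Paths func(idx int, data interface{}) (path uint64, nextTraverser TreeTraverser, ok bool)

// slicePaths returns a Paths backed by a slice. It reports ok == false for
// any idx outside the slice, which covers the final call with idx == n.
func slicePaths(p []uint64) Paths {
	return func(idx int, data interface{}) (uint64, TreeTraverser, bool) {
		if idx < 0 || idx >= len(p) {
			return 0, nil, false
		}
		// A nil nextTraverser asks the caller to reuse the previous one.
		return p[idx], nil, true
	}
}

// collect calls ps with increasing idx until ok is false, mirroring how a
// caller would discover the number of valid paths.
func collect(ps Paths, data interface{}) []uint64 {
	var out []uint64
	for i := 0; ; i++ {
		p, _, ok := ps(i, data)
		if !ok {
			return out
		}
		out = append(out, p)
	}
}

func main() {
	fmt.Println(collect(slicePaths([]uint64{7, 8, 9}), nil)) // [7 8 9]
}
```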

func CombinePaths

func CombinePaths(p ...Paths) Paths

CombinePaths takes several Paths and flattens them into a single Paths.

func FlatPaths

func FlatPaths(p []uint64) Paths

FlatPaths implements Paths for a slice of paths. It returns nil for every nextTraverser, meaning the given TreeTraverser is used again.

func PathAndTraversers

func PathAndTraversers(t []PathAndTraverser) Paths

PathAndTraversers implements Paths and allows a TreeTraverser to have multiple paths with multiple traversers.

func PathsWithTraverser

func PathsWithTraverser(paths []uint64, a TreeTraverser) Paths

PathsWithTraverser implements Paths for both a slice of paths and a single TreeTraverser. Each path will return the given TreeTraverser.

type PubSub

type PubSub struct {
	// contains filtered or unexported fields
}

PubSub uses the given SubscriptionEnroller to create the subscription tree. It also uses the TreeTraverser to then write to the subscriber. All of PubSub's methods are safe to access concurrently. PubSub should be constructed with New().

func New

func New(opts ...PubSubOption) *PubSub

New constructs a new PubSub.

func (*PubSub) Publish

func (s *PubSub) Publish(d interface{}, a TreeTraverser)

Publish writes data using the TreeTraverser to the interested subscriptions.

func (*PubSub) Subscribe

func (s *PubSub) Subscribe(sub Subscription, opts ...SubscribeOption) Unsubscriber

Subscribe will add a subscription to the PubSub. It returns a function that can be used to unsubscribe. Options can be provided to configure the subscription and its interactions with published data.

type PubSubOption

type PubSubOption interface {
	// contains filtered or unexported methods
}

PubSubOption is used to configure a PubSub.

func WithDeterministicHashing

func WithDeterministicHashing(hashFunction func(interface{}) uint64) PubSubOption

WithDeterministicHashing configures a PubSub that will use the given function to hash each published data point. The hash is used only for a subscription that has set its deterministic routing name.
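
One possible hash function with the required signature is sketched below. It is an assumption for illustration, not something the package ships: hashData is a hypothetical name, and it hashes the fmt representation of the value with FNV-1a, which is only suitable when that representation is stable (for example, values that do not print pointer addresses).

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// hashData has the func(interface{}) uint64 signature documented for
// WithDeterministicHashing. It hashes the default fmt representation of the
// data, so equal printed values always produce the same hash.
func hashData(data interface{}) uint64 {
	h := fnv.New64a()
	fmt.Fprint(h, data) // hash.Hash64 is an io.Writer
	return h.Sum64()
}

func main() {
	fmt.Println(hashData("data-1") == hashData("data-1")) // true
}
```

Going by the signatures on this page, such a function would be passed as pubsub.New(pubsub.WithDeterministicHashing(hashData)).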

func WithNoMutex

func WithNoMutex() PubSubOption

WithNoMutex configures a PubSub that does not have any internal mutexes. This is useful if more complex or custom locking is required. For example, if a subscription needs to subscribe while being published to.

func WithRand

func WithRand(int63 func(max int64) int64) PubSubOption

WithRand configures a PubSub that will use the given function to make sharding decisions. The given function has to match the semantics of math/rand.Int63n.
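
For repeatable sharding in tests, a seeded source can supply a function with the same shape as math/rand.Int63n. This is a small sketch under that assumption; seededInt63n is a hypothetical helper name.

```go
package main

import (
	"fmt"
	"math/rand"
)

// seededInt63n returns a function with the semantics of math/rand.Int63n
// (a value in [0, max)) backed by its own seeded generator, so the sequence
// is reproducible. A *rand.Rand created with rand.New is not safe for
// concurrent use, so guard it if it may be called from several goroutines.
func seededInt63n(seed int64) func(max int64) int64 {
	r := rand.New(rand.NewSource(seed))
	return r.Int63n
}

func main() {
	pick := seededInt63n(1)
	fmt.Println(pick(10) < 10) // true
}
```

Going by the signatures on this page, the result would be passed as pubsub.New(pubsub.WithRand(seededInt63n(1))).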

type SubscribeOption

type SubscribeOption interface {
	// contains filtered or unexported methods
}

SubscribeOption is used to configure a subscription while subscribing.

func WithDeterministicRouting

func WithDeterministicRouting(name string) SubscribeOption

WithDeterministicRouting configures a subscription to have a deterministic routing name. A PubSub configured to use deterministic hashing will use this name and the subscription's shard ID to maintain consistent routing.

func WithPath

func WithPath(path []uint64) SubscribeOption

WithPath configures a subscription to reside at a path. The path determines what data the subscription is interested in. This value should correspond to what the publishing TreeTraverser yields. It defaults to nil (meaning it gets everything).

func WithShardID

func WithShardID(shardID string) SubscribeOption

WithShardID configures a subscription to have a shardID. Subscriptions with a shardID are sharded with any other subscriptions that have the same shardID and path. Defaults to an empty shardID (meaning it does not shard).

type Subscription

type Subscription func(data interface{})

Subscription is a subscription that will have corresponding data written to it.

type TreeTraverser

type TreeTraverser func(data interface{}) Paths

TreeTraverser publishes data to the correct subscriptions. Each data point can be published to several subscriptions. As the data traverses the given paths, it will write to any subscribers that are assigned there. Data can go down multiple paths (i.e., len(paths) > 1).

Traversing a path ends when the returned len(paths) == 0. If len(paths) > 1, then each path will be traversed.
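
The multi-path case can be sketched as follows. The types are redeclared locally with the documented signatures so the block is self-contained; fanOut, leaf, stop and the routes driver are hypothetical names, and routes is a simplified stand-in for whatever the library does internally.

```go
package main

import "fmt"

// Local copies of the documented function types.
type TreeTraverser func(data interface{}) Paths
type Paths func(idx int, data interface{}) (path uint64, nextTraverser TreeTraverser, ok bool)

// fanOut sends every data point down two paths (1 and 2) and continues each
// of them with leaf.
func fanOut(data interface{}) Paths {
	branches := []uint64{1, 2}
	return func(idx int, data interface{}) (uint64, TreeTraverser, bool) {
		if idx < 0 || idx >= len(branches) {
			return 0, nil, false
		}
		return branches[idx], leaf, true
	}
}

// leaf yields the single path 3 and then hands off to stop.
func leaf(data interface{}) Paths {
	return func(idx int, data interface{}) (uint64, TreeTraverser, bool) {
		if idx != 0 {
			return 0, nil, false
		}
		return 3, stop, true
	}
}

// stop yields no paths, so traversal of that branch ends.
func stop(data interface{}) Paths {
	return func(int, interface{}) (uint64, TreeTraverser, bool) { return 0, nil, false }
}

// routes visits every path at every level and returns the full route to each
// node it reaches, in visiting order.
func routes(t TreeTraverser, data interface{}, prefix []uint64) [][]uint64 {
	ps := t(data)
	var out [][]uint64
	for i := 0; ; i++ {
		p, next, ok := ps(i, data)
		if !ok {
			return out
		}
		if next == nil {
			next = t // nil means the previous TreeTraverser is used
		}
		route := append(append([]uint64(nil), prefix...), p)
		out = append(out, route)
		out = append(out, routes(next, data, route)...)
	}
}

func main() {
	fmt.Println(routes(fanOut, nil, nil)) // [[1] [1 3] [2] [2 3]]
}
```

Each route printed corresponds to a node where subscribers could be assigned, so a subscription at path []uint64{2, 3} would receive this data through the second branch.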

func LinearTreeTraverser

func LinearTreeTraverser(a []uint64) TreeTraverser

LinearTreeTraverser implements TreeTraverser on behalf of a slice of paths. If the data does not traverse multiple paths, then this works well.

type Unsubscriber

type Unsubscriber func()

Unsubscriber is returned by Subscribe. It should be invoked to remove a subscription from the PubSub.

Directories

Path Synopsis
examples
internal