package pubsub

import "v.io/x/ref/lib/pubsub"

Package pubsub defines interfaces for accessing dynamically changing process configuration information.

Settings represent configuration parameters and their values. Settings are published to named Streams. Streams are forked to add additional consumers, i.e. readers of the Settings published to the Stream.

Settings are represented by an interface type that wraps the data and provides a name and description for each Setting. Streams are similarly named and also have a description. When a Stream is 'forked', the latest values of all Settings that have been sent over the Stream are made available to the caller. This allows for a rendezvous between the single producer of Settings and multiple consumers of those Settings that may be added at arbitrary points in time.

Streams are hosted by a Publisher type, which in addition to the methods required for managing Streams provides a means to shut down all of the Streams it hosts.
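
The rendezvous described above can be illustrated with a small, standard-library-only sketch. It is not the package's implementation: miniStream, publish and fork are hypothetical names, and plain strings stand in for Settings. It only shows the idea of remembering the latest value per name so that a consumer added later can catch up.

```go
package main

import (
	"fmt"
	"sync"
)

// miniStream is a hypothetical stand-in for a Stream: it remembers the
// latest value per setting name and fans new values out to consumers.
type miniStream struct {
	mu        sync.Mutex
	latest    map[string]string
	consumers []chan string
}

func newMiniStream() *miniStream {
	return &miniStream{latest: make(map[string]string)}
}

// publish records value as the latest for name and forwards it to every
// consumer that has forked the stream so far.
func (s *miniStream) publish(name, value string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.latest[name] = value
	for _, ch := range s.consumers {
		ch <- name + "=" + value
	}
}

// fork registers a new consumer channel and returns a copy of the latest
// values, so a late consumer can synchronise with the stream.
func (s *miniStream) fork(ch chan string) map[string]string {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.consumers = append(s.consumers, ch)
	snapshot := make(map[string]string, len(s.latest))
	for k, v := range s.latest {
		snapshot[k] = v
	}
	return snapshot
}

func main() {
	s := newMiniStream()
	s.publish("ip", "1.2.3.4") // published before any consumer exists

	ch := make(chan string, 1) // buffered so publish does not block
	snapshot := s.fork(ch)
	fmt.Println(snapshot["ip"]) // the value sent before the fork

	s.publish("ip", "1.2.3.5")
	fmt.Println(<-ch) // the value sent after the fork
}
```

The consumer's channel is buffered here for the same reason the ForkStream documentation gives: a reader that cannot keep up would otherwise block the sender.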

Package Files

model.go publisher.go types.go

func Format

func Format(s Setting) string

Format formats a Setting in a consistent manner. It is intended to be used when implementing the Setting interface.

type Any

type Any struct {
    // contains filtered or unexported fields
}

Type Any can be used to represent or implement a Setting of any type.

func (*Any) Description

func (s *Any) Description() string

func (*Any) Name

func (s *Any) Name() string

func (*Any) String

func (s *Any) String() string

func (*Any) Value

func (s *Any) Value() interface{}

type DurationFlag

type DurationFlag struct{ time.Duration }

DurationFlag implements flag.Value in order to provide validation of duration values in the flag package.

func (DurationFlag) Get

func (d DurationFlag) Get() interface{}

Implements flag.Getter.Get

func (*DurationFlag) Set

func (d *DurationFlag) Set(s string) error

Implements flag.Value.Set

func (DurationFlag) String

func (d DurationFlag) String() string

Implements flag.Value.String

type Publisher

type Publisher struct {
    // contains filtered or unexported fields
}

A Publisher provides a mechanism for communicating Settings from a set of producers to multiple consumers. Each producer, together with its associated consumers, is called a Stream. Operations are provided for creating streams (CreateStream) and adding new consumers (ForkStream). Communication is implemented using channels, with the producer and consumers providing channels to send and receive Settings over. A Stream remembers the last value of all Settings that were sent over it; these can be retrieved via ForkStream or the Latest method.

The Publisher may be shut down by calling its Shutdown method. The producers will be notified via the channel returned by CreateStream, at which point they should close the channel they use for publishing Settings. If producers fail to close this channel, the Publisher will leak goroutines (one per stream) when it is shut down.

Code:

in := make(chan pubsub.Setting)
pub := pubsub.NewPublisher()
pub.CreateStream("net", "network settings", in) //nolint:errcheck

// A simple producer of IP address settings.
producer := func() {
    in <- pubsub.NewString("ip", "address", "1.2.3.5")
}

var waiter sync.WaitGroup
waiter.Add(2)

// A simple consumer of IP address Settings.
consumer := func(ch chan pubsub.Setting) {
    fmt.Println(<-ch)
    waiter.Done()
}

// Publish an initial Setting to the Stream.
in <- pubsub.NewString("ip", "address", "1.2.3.4")

// Fork the stream twice, and read the latest value.
ch1 := make(chan pubsub.Setting)
st, _ := pub.ForkStream("net", ch1)
fmt.Println(st.Latest["ip"])
ch2 := make(chan pubsub.Setting)
st, _ = pub.ForkStream("net", ch2)
fmt.Println(st.Latest["ip"])

// Now we can read new Settings as they are generated.
go producer()
go consumer(ch1)
go consumer(ch2)

waiter.Wait()

Output:

ip: address: (string: 1.2.3.4)
ip: address: (string: 1.2.3.4)
ip: address: (string: 1.2.3.5)
ip: address: (string: 1.2.3.5)

func NewPublisher

func NewPublisher() *Publisher

NewPublisher creates a Publisher.

func (*Publisher) CloseFork

func (p *Publisher) CloseFork(name string, ch chan<- Setting) error

CloseFork removes the specified channel from the named stream. The caller must drain the channel before closing it. TODO(cnicolaou): add tests for this.

func (*Publisher) CreateStream

func (p *Publisher) CreateStream(name, description string, ch <-chan Setting) (<-chan struct{}, error)

CreateStream creates a Stream with the provided name and description (note that Settings have their own names and descriptions; these are for the stream). In general, no buffering is required for the channel passed in, since the Publisher itself reads from it. However, if the consumers are slow, the Publisher may be slow in draining the channel, and the producer should provide additional buffering if this is a concern. This mechanism is therefore best suited to rarely changing Settings, such as network address changes forced by DHCP, for which no buffering is needed. The channel returned by CreateStream is closed when the Publisher is shut down; the caller should wait for this to occur and then close the channel it passed to CreateStream.

func (*Publisher) ForkStream

func (p *Publisher) ForkStream(name string, ch chan<- Setting) (*Stream, error)

ForkStream 'forks' the named stream to add a new consumer. The channel provided is to be used to read Settings sent down the stream. This channel will be closed by the Publisher when it is asked to shut down. The reader on this channel must be able to keep up with the flow of Settings through the Stream in order to avoid blocking all other readers and hence should set an appropriate amount of buffering for the channel it passes in. ForkStream returns the most recent values of all Settings previously sent over the stream, thus allowing its caller to synchronise with the stream.

func (*Publisher) Latest

func (p *Publisher) Latest(name string) *Stream

Latest returns information on the requested stream, including the last instance of all Settings, if any, that flowed over it.

func (*Publisher) Shutdown

func (p *Publisher) Shutdown()

Shutdown initiates the process of stopping the operation of the Publisher. All of the channels passed to CreateStream must be closed by their owners to ensure that the Publisher's goroutines exit and are not leaked.

Code:

in := make(chan pubsub.Setting)
pub := pubsub.NewPublisher()
stop, _ := pub.CreateStream("net", "network settings", in)

var producerReady sync.WaitGroup
producerReady.Add(1)
var consumersReady sync.WaitGroup
consumersReady.Add(2)

// A producer to write 100 Settings before signalling that it's
// ready to be shut down. This is purely to demonstrate how to use
// Shutdown.
producer := func() {
    for i := 0; ; i++ {
        select {
        case <-stop:
            close(in)
            return
        default:
            in <- pubsub.NewString("ip", "address", "1.2.3.4")
            if i == 100 {
                producerReady.Done()
            }
        }
    }
}

var waiter sync.WaitGroup
waiter.Add(2)

consumer := func() {
    ch := make(chan pubsub.Setting, 10)
    pub.ForkStream("net", ch) //nolint:errcheck
    consumersReady.Done()
    i := 0
    for {
        if _, ok := <-ch; !ok {
            // The channel has been closed when the publisher
            // is asked to shut down.
            break
        }
        i++
    }
    if i >= 100 {
        // We've received at least 100 Settings as per the producer above.
        fmt.Println("done")
    }
    waiter.Done()
}

go consumer()
go consumer()
consumersReady.Wait()
go producer()
producerReady.Wait()
pub.Shutdown()
waiter.Wait()

Output:

done
done

func (*Publisher) String

func (p *Publisher) String() string

String returns a string representation of the publisher, including the names and descriptions of all the streams it currently supports.

type Setting

type Setting interface {
    String() string
    // Name returns the name of the Setting
    Name() string
    // Description returns the description of the Setting
    Description() string
    // Value returns the value of the Setting.
    Value() interface{}
}

Setting must be implemented by all data types to be sent over Publisher streams.
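
As an illustration of implementing the interface for a custom data type, the sketch below declares a local copy of Setting so that it compiles without the package. The endpoint type and its fields are hypothetical. Its String method imitates the "name: description: (type: value)" layout seen in the example output on this page; with the real package, Format is intended for that purpose.

```go
package main

import "fmt"

// Setting is a local copy of the documented interface, declared here
// only so that the sketch is self-contained.
type Setting interface {
	String() string
	Name() string
	Description() string
	Value() interface{}
}

// endpoint is a hypothetical Setting describing a service address.
type endpoint struct {
	host string
	port int
}

func (e endpoint) Name() string        { return "endpoint" }
func (e endpoint) Description() string { return "service address" }

// Value returns the address as a single "host:port" string.
func (e endpoint) Value() interface{} {
	return fmt.Sprintf("%s:%d", e.host, e.port)
}

// String imitates the layout of the example output on this page; this
// layout is an assumption about what Format produces.
func (e endpoint) String() string {
	v := e.Value()
	return fmt.Sprintf("%s: %s: (%T: %v)", e.Name(), e.Description(), v, v)
}

func main() {
	var s Setting = endpoint{host: "localhost", port: 8080}
	fmt.Println(s)
}
```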

func NewAny

func NewAny(name, description string, value interface{}) Setting

func NewBool

func NewBool(name, description string, value bool) Setting

func NewDuration

func NewDuration(name, description string, value time.Duration) Setting

func NewFloat64

func NewFloat64(name, description string, value float64) Setting

func NewInt

func NewInt(name, description string, value int) Setting

func NewInt64

func NewInt64(name, description string, value int64) Setting

func NewString

func NewString(name, description string, value string) Setting

type Stream

type Stream struct {
    Name, Description string
    // Latest is a map of Setting names to the Setting itself.
    Latest map[string]Setting
}

Stream is returned by Latest and ForkStream. It includes the name and description of the stream and the most recent values of the Settings that flowed through it.
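
A caller will typically walk the Latest map of a Stream. The following sketch does so in sorted name order, using local copies of the documented Stream and Setting types so that it runs on its own. textSetting and summarize are hypothetical, and the nil check is a defensive assumption, since this page does not state what Latest returns for an unknown stream name.

```go
package main

import (
	"fmt"
	"sort"
)

// Setting and Stream are local copies of the documented types.
type Setting interface {
	String() string
	Name() string
	Description() string
	Value() interface{}
}

type Stream struct {
	Name, Description string
	Latest            map[string]Setting
}

// textSetting is a hypothetical Setting holding a string value.
type textSetting struct{ name, desc, value string }

func (t textSetting) Name() string        { return t.name }
func (t textSetting) Description() string { return t.desc }
func (t textSetting) Value() interface{}  { return t.value }
func (t textSetting) String() string      { return t.name + "=" + t.value }

// summarize lists the latest Settings of a stream in name order. Map
// iteration order is random in Go, hence the explicit sort.
func summarize(s *Stream) []string {
	if s == nil { // defensive: behaviour for unknown streams is not documented here
		return nil
	}
	names := make([]string, 0, len(s.Latest))
	for name := range s.Latest {
		names = append(names, name)
	}
	sort.Strings(names)
	lines := make([]string, 0, len(names))
	for _, name := range names {
		lines = append(lines, fmt.Sprintf("%s/%s = %v", s.Name, name, s.Latest[name].Value()))
	}
	return lines
}

func main() {
	st := &Stream{
		Name:        "net",
		Description: "network settings",
		Latest: map[string]Setting{
			"ip": textSetting{"ip", "address", "1.2.3.4"},
			"gw": textSetting{"gw", "gateway", "1.2.3.1"},
		},
	}
	for _, line := range summarize(st) {
		fmt.Println(line)
	}
}
```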

Package pubsub imports 4 packages and is imported by 16 packages. Updated 2020-10-10.