luci: go.chromium.org/luci/logdog/client/butler/bundler Index | Files

package bundler

import "go.chromium.org/luci/logdog/client/butler/bundler"

Package bundler is responsible for efficiently transforming aggregate stream data into Butler messages for export.

A process will instantiate a single Bundler instance. The Bundler manages an elastic set of Stream instances, each of which contains state for a single log Stream.

Each Stream instance will have sequential stream binary data appended to it via Append, which it will collect and organize for export as a series of ButlerLogBundle_Entry protobufs. Streams operate independently and buffer data until it is consumed by their Bundler instance. If a Stream's buffer is full, the Stream will block on appending data, which will, in turn, block its data source.

The Bundler owns the various Stream instances. When its Next() method is called, it will sort through the stream instances to prepare an optimally-sized ButlerLogBundle protobuf for export. The construction of this bundle may block pending data, and may be subject to various data urgency requests.

The Bundler acknowledges the following constraints:

- Data enqueued into a Stream should be exported within a specific period
  of time from its introduction
- The exported ButlerLogBundle protobuf must not exceed a maximum bundle
  size constraint.
- Stream data may be added during the bundling process, and should be
  acknowledged if possible.

When a Stream is finished, its Close method should be called. This alerts the Stream that it will receive no more data, causing it to export a terminal ButlerLogBundle and unregister from the Bundler.

The Bundler may block via its CloseAndFinish() method until all Streams are drained and cleared.

Index

Package Files

binaryParser.go builder.go bundler.go counter.go data.go datagramParser.go doc.go parser.go sizer.go stream.go textParser.go

type Bundler Uses

type Bundler struct {
    // contains filtered or unexported fields
}

Bundler is the main Bundler instance. It exposes goroutine-safe endpoints for stream registration and bundle consumption.

func New Uses

func New(c Config) *Bundler

New instantiates a new Bundler instance.

func (*Bundler) CloseAndFlush Uses

func (b *Bundler) CloseAndFlush()

CloseAndFlush closes the Bundler, alerting it that no more streams will be added and that existing data may be aggressively output.

CloseAndFlush will block until all buffered data has been consumed.

func (*Bundler) GetStreamDescs Uses

func (b *Bundler) GetStreamDescs() map[string]*logpb.LogStreamDescriptor

GetStreamDescs returns the set of registered stream names mapped to their descriptors.

This is intended for testing purposes. DO NOT modify the resulting descriptors.

func (*Bundler) Next Uses

func (b *Bundler) Next() *logpb.ButlerLogBundle

Next returns the next bundle, blocking until it is available.

func (*Bundler) Register Uses

func (b *Bundler) Register(p *streamproto.Properties) (Stream, error)

Register adds a new stream to the Bundler, returning a reference to the registered stream.

The Bundler takes ownership of the supplied Properties, and may modify them as needed.

type Config Uses

type Config struct {
    // Clock is the clock instance that will be used for Bundler and stream
    // timing.
    Clock clock.Clock

    // Project is the project to use.
    Project types.ProjectName
    // Prefix is the common prefix for this set of streams.
    Prefix types.StreamName

    // MaxBufferedBytes is the maximum number of bytes to buffer in memory per
    // stream.
    MaxBufferedBytes int64

    // MaxBundleSize is the maximum bundle size in bytes that may be generated.
    //
    // If this value is zero, no size constraint will be applied to generated
    // bundles.
    MaxBundleSize int

    // MaxBufferDelay is the maximum amount of time we're willing to buffer
    // bundled data. Other factors can cause the bundle to be sent before this,
    // but it is an upper bound.
    MaxBufferDelay time.Duration
}

Config is the Bundler configuration.

type Data Uses

type Data interface {
    chunkstream.Chunk

    // Bind resizes the Chunk buffer and records a timestamp to associate with the
    // data chunk.
    Bind(int, time.Time) Data

    // Timestamp returns the bound timestamp. This will be zero if no timestamp
    // has been bound.
    Timestamp() time.Time
}

Data is a reusable data buffer that is used by Stream instances to ingest data.

Data is initially an empty buffer. Once data is loaded into it, the buffer is resized to the bound data and a timestamp is attached via Bind.

type Stream Uses

type Stream interface {
    // LeaseData allocates and returns a Data block that stream data can be
    // loaded into. The caller should Release() the Data, or transfer ownership to
    // something that will (e.g., Append()).
    //
    // If the leased data is not Released, it is merely inefficient, not fatal.
    LeaseData() Data

    // Append adds a sequential chunk of data to the Stream. Append may block if
    // the data isn't ready to be consumed.
    //
    // Append takes ownership of the data regardless of whether or not it returns
    // an error. The supplied Data must not be referenced after calling Append.
    Append(Data) error

    // Close closes the Stream, flushing any remaining data.
    Close()
}

Stream is an individual Bundler Stream. Data is added to the Stream as a series of ordered binary chunks.

A Stream is not goroutine-safe.

Package bundler imports 23 packages (graph) and is imported by 2 packages. Updated 2018-08-21. Refresh now. Tools for package owners.