luci: Index | Files | Directories

package butler

import ""

Package butler is the main engine for the Butler executable. It is an internal package to support


Package Files

butler.go doc.go stream.go


const (
    // DefaultMaxBufferAge is the default amount of time that a log entry may
    // be buffered before being dispatched.
    DefaultMaxBufferAge = time.Duration(5 * time.Second)

    // DefaultOutputWorkers is the default number of output workers to use.
    DefaultOutputWorkers = 16

type Butler Uses

type Butler struct {
    // contains filtered or unexported fields

Butler is the Butler application structure. The Butler runs until closed. During operation, it acts as a service manager and data router, routing: - Messages from Streams to the attached Output. - Streams from a StreamServer to the Stream list (AddStream).

func New Uses

func New(ctx context.Context, config Config) (*Butler, error)

New instantiates a new Butler instance and starts its processing.

func (*Butler) Activate Uses

func (b *Butler) Activate()

Activate notifies the Butler that its current stream load is sufficient. This enables it to exit Run when it reaches a stream count of zero. Prior to activation, the Butler would block in Run regardless of stream count.

func (*Butler) AddStream Uses

func (b *Butler) AddStream(rc io.ReadCloser, d *logpb.LogStreamDescriptor) error

AddStream adds a Stream to the Butler. This is goroutine-safe.

If no error is returned, the Butler assumes ownership of the supplied stream. The stream will be closed when processing is finished.

If an error is occurred, the caller is still the owner of the stream and is responsible for closing it.

func (*Butler) AddStreamServer Uses

func (b *Butler) AddStreamServer(streamServer streamserver.StreamServer)

AddStreamServer adds a StreamServer to the Butler. This is goroutine-safe and may be called anytime before or during Butler execution.

After this call completes, the Butler assumes ownership of the StreamServer.

func (*Butler) Streams Uses

func (b *Butler) Streams() []types.StreamName

Streams returns a sorted list of stream names that have been registered to the Butler.

func (*Butler) Wait Uses

func (b *Butler) Wait() error

Wait blocks until the Butler instance has completed, returning with the Butler's return code.

type Config Uses

type Config struct {
    // Output is the output instance to use for log dispatch.
    Output output.Output
    // OutputWorkers is the number of simultaneous goroutines that will be used
    // to output Butler log data. If zero, DefaultOutputWorkers will be used.
    OutputWorkers int

    // Project is the project that the log stream will be bound to.
    Project types.ProjectName
    // Prefix is the log stream common prefix value.
    Prefix types.StreamName

    // GlobalTags are a set of global log stream tags to apply to individual
    // streams on registration. Individual stream tags will override tags with
    // the same key.
    GlobalTags streamproto.TagMap

    // BufferLogs, if true, instructs the butler to buffer collected log data
    // before sending it to Output.
    BufferLogs bool
    // If buffering logs, this is the maximum amount of time that a log will
    // be buffered before being marked for dispatch. If this is zero,
    // DefaultMaxBufferAge will be used.
    MaxBufferAge time.Duration

    // NoWrapStreamRegistrationCallback provides a way to opt-out of wrapping
    // StreamRegistrationCallback to call on LogEntries without buffering them
    // to guarantee full LogEntries.
    NoWrapStreamRegistrationCallback bool

    // StreamRegistrationCallback is called on new streams and returns a callback
    // to attach to the stream or nil if no callback is desired.
    // The callback is by default wrapped internally to buffer LogEntries until
    // they're guaranteed complete. This behavior can be turned off explicitly
    // with NoWrapStreamRegistrationCallback above.
    // In wrapped callbacks for text and datagram streams, LogEntry .TimeOffset,
    // and .PrefixIndex will be 0. .StreamIndex and .Sequence WILL NOT correspond
    // to the values that the logdog service sees. They will, however, be
    // internally consistent within the stream.
    // Wrapped datagram streams never send a partial datagram; If the logdog
    // server or stream is shut down while we have a partial datagram buffered,
    // the partially buffered datagram will not be observed by the buffered
    // callback.
    // Wrapping a binary stream is a noop (i.e. your callback will see the exact
    // same values wrapped and unwrapped).
    // When the stream ends (either due to EOF from the user, or when the butler
    // is stopped), your callback will be invoked exactly once with `nil`.
    // Expects passed *logpb.LogStreamDescriptor reference to be safe to keep, and
    // should treat it as read-only.
    StreamRegistrationCallback func(*logpb.LogStreamDescriptor) bundler.StreamChunkCallback

Config is the set of Butler configuration parameters.

func (*Config) Validate Uses

func (c *Config) Validate() error

Validate validates that the configuration is sufficient to instantiate a Butler instance.


bootstrapPackage bootstrap handles Butler-side bootstrapping functionality.
buffered_callbackPackage buffered_callback provides functionality to wrap around LogEntry callbacks to guarantee calling only on complete LogEntries, because the LogDog bundler produces fragmented LogEntries under normal operation, in order to meet time or buffer size requirements.
bundlerPackage bundler is responsible for efficiently transforming aggregate stream data into Butler messages for export.
outputPackage output contains interfaces and implementations for Butler Outputs, which are responsible for delivering Butler protobufs to LogDog collection endpoints.
output/logPackage log implements the "log" Output.
output/pubsubPackage pubsub implements the "pubsub" Output.

Package butler imports 21 packages (graph) and is imported by 6 packages. Updated 2019-08-25. Refresh now. Tools for package owners.