pipeline

package
v0.0.0-...-967d409

This package is not in the latest version of its module.

Published: Apr 30, 2024 License: Apache-2.0 Imports: 4 Imported by: 7

Documentation

Overview

Package pipeline describes a metrics reporting pipeline that accepts reports as input and eventually delivers them (possibly after aggregation) to one or more downstream services. A pipeline generally consists of a collection of aggregators, dispatchers, and endpoints wrapped in RetryingSender objects. Metric values can be provided by an external source (reported via an API), or can be generated by a component such as a heartbeat.

                 -> Aggregator -> ...        -> RetryingSender -> Endpoint A
HTTP -> Selector -> Aggregator -> Dispatcher -> RetryingSender -> Endpoint B
                 -> Aggregator -> ...        -> RetryingSender -> Endpoint C

   Heartbeat  -> Dispatcher -> RetryingSender -> Endpoint A

Functions

func ReleaseAll

func ReleaseAll(components []Component) error

ReleaseAll calls Release on all of the given Components in parallel, returning a multierror if one or more calls fail, or nil if all succeed.

Types

type Component

type Component interface {
	// Use registers new usage of this component. Use should be called whenever this component is
	// added downstream of some other component. When no longer used, Release should be called.
	Use()

	// Release is called when the caller is no longer using this component. It should perform the
	// following steps in order:
	// 1. Decrement the usage counter. If the usage counter is still greater than 0, return nil.
	// 2. Gracefully shut down background processes and wait for completion. Following this step,
	//    no data shall be sent from this component to downstream components.
	// 3. Call Release on all downstream components, waiting for their release operations to
	//    complete.
	//
	// As a result, calling Release on all of the pipeline Input components should result in a graceful
	// shutdown of all Components in the correct order.
	//
	// Release returns an error if it or any of its downstream components generate one.
	Release() error
}

Component represents a single component in a pipeline. Components can be used downstream of multiple other components, enabling creation of fork/join pipeline patterns. Because of this, components implement a reference counting strategy that determines when they should clean up underlying resources.
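
To make the reference counting concrete, here is a small sketch of a join: two upstream stages share one downstream stage. The `stage` type, `newStage`, and the shutdown log are hypothetical and exist only for illustration; the `Component` interface is redeclared locally so the example compiles on its own.

```go
package main

import (
	"fmt"
	"sync"
)

// Component mirrors the interface documented in this package.
type Component interface {
	Use()
	Release() error
}

// stage is a hypothetical Component that follows the documented Release steps.
type stage struct {
	name       string
	mu         sync.Mutex
	uses       int
	downstream []Component
	log        *[]string // records shutdown order for the example only
}

// newStage registers a usage on every downstream component it is wired to.
func newStage(name string, log *[]string, downstream ...Component) *stage {
	for _, d := range downstream {
		d.Use()
	}
	return &stage{name: name, log: log, downstream: downstream}
}

func (s *stage) Use() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.uses++
}

func (s *stage) Release() error {
	// Step 1: decrement the counter and stop if the stage is still in use.
	s.mu.Lock()
	s.uses--
	remaining := s.uses
	s.mu.Unlock()
	if remaining > 0 {
		return nil
	}
	// Step 2: shut down background work (represented here by a log entry).
	*s.log = append(*s.log, s.name)
	// Step 3: release downstream components and wait for each to finish.
	for _, d := range s.downstream {
		if err := d.Release(); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	var order []string
	endpoint := newStage("endpoint", &order)
	a := newStage("aggregatorA", &order, endpoint)
	b := newStage("aggregatorB", &order, endpoint)
	a.Use() // the caller holds both inputs
	b.Use()

	a.Release()
	fmt.Println(order) // [aggregatorA]
	b.Release()
	fmt.Println(order) // [aggregatorA aggregatorB endpoint]
}
```

The shared endpoint shuts down only after the second upstream stage releases it, which is the ordering the interface comment asks for.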

type Endpoint

type Endpoint interface {
	// Endpoint is a Component.
	Component

	// Name returns the name of this endpoint. The name must be unique across all endpoints in the
	// system, and should be constant across restarts of the agent. There can be multiple instances
	// of the same type of endpoint with different names.
	Name() string

	// Send sends the given EndpointReport - previously built by this endpoint - to the reporting
	// service.
	Send(EndpointReport) error

	// BuildReport builds an EndpointReport from the given StampedMetricReport, optionally attaching
	// context.
	BuildReport(report metrics.StampedMetricReport) (EndpointReport, error)

	// IsTransient returns true if the given error indicates that the operation failed due to some
	// transient error and can be retried.
	IsTransient(error) bool
}

Endpoint represents a metric reporting endpoint that the agent reports to, for example Cloud Service Control or PubSub.

type EndpointReport

type EndpointReport struct {
	metrics.StampedMetricReport `json:",inline"`
	Context                     json.RawMessage
}

EndpointReport is a metrics.StampedMetricReport containing optional endpoint-specific context that can be used to help ensure idempotence across retries. For example, if a reporting service requires a unique ID or timestamp that remains the same during each retry so that requests can be deduplicated, that identifier can be generated in BuildReport, persisted in the EndpointReport's context, and resent with each retry.

func NewEndpointReport

func NewEndpointReport(report metrics.StampedMetricReport, context interface{}) (EndpointReport, error)

func (*EndpointReport) UnmarshalContext

func (er *EndpointReport) UnmarshalContext(ctx interface{}) error

UnmarshalContext unmarshals an EndpointReport's context into the given struct.

type Input

type Input interface {
	// Input is also a Component.
	Component

	// AddReport adds a report to the pipeline. It returns an error if one is known immediately,
	// such as a report that refers to unknown metrics. See aggregator.Aggregator.
	AddReport(metrics.MetricReport) error
}

Input represents a Component that accepts reports from an external source.

type InputAdapter

type InputAdapter struct {
	Sender Sender
}

InputAdapter is an Input that converts incoming reports to StampedMetricReport objects and sends them directly to a delegate Sender.

func (*InputAdapter) AddReport

func (a *InputAdapter) AddReport(report metrics.MetricReport) error

func (*InputAdapter) Release

func (a *InputAdapter) Release() error

func (*InputAdapter) Use

func (a *InputAdapter) Use()

type Sender

type Sender interface {
	// Sender is a Component.
	Component

	// Send sends the report downstream. The behavior of the Send operation depends on the type of
	// sender. Some implementations - the Dispatcher, for instance - simply forward the Send to
	// subsequent Senders. Others - like the RetryingSender - may queue the report and attempt to
	// send it at a later time.
	//
	// An error indicates that something failed quickly, but it does not
	// indicate that the operation failed completely (i.e., some senders behind a Dispatcher may have
	// succeeded). Likewise, the lack of an error response does not indicate that the Send operation
	// succeeded, due to the asynchronous nature of a RetryingSender.
	Send(report metrics.StampedMetricReport) error

	// Endpoints returns the transitive list of endpoints that this sender will ultimately send to.
	Endpoints() []string
}

A Sender handles sending StampedMetricReports to remote endpoints.

type Source

type Source interface {

	// Shutdown instructs the source to stop sending metric data, release any held components, and
	// clean up resources.
	Shutdown() error
}

Source represents an autonomous metric data source that runs within the Agent.

type UsageTracker

type UsageTracker struct {
	// contains filtered or unexported fields
}

UsageTracker is a utility that helps track the usage of a Component. It provides Use and Release methods, and calls a close function when Release decrements the usage count to 0.

func (*UsageTracker) Release

func (u *UsageTracker) Release(close func() error) error

func (*UsageTracker) Use

func (u *UsageTracker) Use()
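
Since UsageTracker's fields are unexported, the sketch below shows one plausible way such a helper could work and how a Component might delegate to it. The lowercase `usageTracker` and the `queue` type are hypothetical; only the method signatures follow the ones listed above.

```go
package main

import (
	"fmt"
	"sync"
)

// usageTracker is a hypothetical re-implementation of the documented helper:
// a mutex-guarded counter that runs a close function when it reaches zero.
type usageTracker struct {
	mu    sync.Mutex
	count int
}

func (u *usageTracker) Use() {
	u.mu.Lock()
	defer u.mu.Unlock()
	u.count++
}

// Release decrements the counter and calls closeFn only when the count
// reaches 0. closeFn runs outside the lock so it may block safely.
func (u *usageTracker) Release(closeFn func() error) error {
	u.mu.Lock()
	u.count--
	last := u.count == 0
	u.mu.Unlock()
	if !last {
		return nil
	}
	return closeFn()
}

// queue is a hypothetical Component that delegates counting to the tracker.
type queue struct {
	tracker usageTracker
	closed  bool
}

func (q *queue) Use() { q.tracker.Use() }

func (q *queue) Release() error {
	return q.tracker.Release(func() error {
		q.closed = true // real cleanup and downstream releases would go here
		return nil
	})
}

func main() {
	q := &queue{}
	q.Use()
	q.Use()

	q.Release()
	fmt.Println("closed after first release:", q.closed) // false
	q.Release()
	fmt.Println("closed after second release:", q.closed) // true
}
```

Passing the close function to Release keeps the counting logic in one place, so each Component only has to describe its own cleanup.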
