bundle

package
v4.14.1
Published: Apr 26, 2023 License: MIT Imports: 22 Imported by: 0

Documentation

Overview

Package bundle contains singletons referenced throughout the Benthos codebase that allow imported components to add their constructors and documentation to a service.

Each component type has its own singleton bundle containing every imported implementation of that component. Further bundles can be derived from it to change which components are available.
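The registration pattern described above can be sketched in isolation. This is a minimal model, not the package's implementation: `Spec`, `Constructor`, `Set`, `All`, `Names` and `Without` are hypothetical stand-ins invented for illustration, and only the general idea (a package-level singleton that components add themselves to, from which restricted sets are derived) comes from the overview.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

// Spec is a hypothetical stand-in for docs.ComponentSpec.
type Spec struct {
	Name    string
	Summary string
}

// Constructor is a hypothetical stand-in for a component constructor.
type Constructor func(conf string) (string, error)

type entry struct {
	ctor Constructor
	spec Spec
}

// Set models a per-type bundle: constructors and docs keyed by name.
type Set struct {
	specs map[string]entry
}

// All models a package-level singleton such as AllInputs.
var All = &Set{specs: map[string]entry{}}

// Add registers a constructor and its documentation under spec.Name.
// Rejecting an empty name is an assumption of this sketch.
func (s *Set) Add(ctor Constructor, spec Spec) error {
	if spec.Name == "" {
		return errors.New("component name must not be empty")
	}
	s.specs[spec.Name] = entry{ctor: ctor, spec: spec}
	return nil
}

// Names returns the registered component names in sorted order.
func (s *Set) Names() []string {
	names := make([]string, 0, len(s.specs))
	for name := range s.specs {
		names = append(names, name)
	}
	sort.Strings(names)
	return names
}

// Without derives a new set that omits the named components and
// leaves the original set untouched.
func (s *Set) Without(names ...string) *Set {
	skip := make(map[string]struct{}, len(names))
	for _, n := range names {
		skip[n] = struct{}{}
	}
	derived := &Set{specs: make(map[string]entry, len(s.specs))}
	for name, e := range s.specs {
		if _, drop := skip[name]; !drop {
			derived.specs[name] = e
		}
	}
	return derived
}

// init mimics what an imported component package might do on import.
func init() {
	for _, name := range []string{"stdin", "http_server"} {
		n := name
		ctor := func(conf string) (string, error) { return n + ":" + conf, nil }
		if err := All.Add(ctor, Spec{Name: n}); err != nil {
			panic(err)
		}
	}
}

func main() {
	restricted := All.Without("http_server")
	fmt.Println(All.Names())
	fmt.Println(restricted.Names())
}
```

Deriving a copy rather than mutating the singleton keeps the full set available to other parts of a program while a restricted set is handed to one service.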


Constants

This section is empty.

Variables

var AllBuffers = &BufferSet{
	specs: map[string]bufferSpec{},
}

AllBuffers is a set containing every single buffer that has been imported.

var AllCaches = &CacheSet{
	specs: map[string]cacheSpec{},
}

AllCaches is a set containing every single cache that has been imported.

var AllInputs = &InputSet{
	specs: map[string]inputSpec{},
}

AllInputs is a set containing every single input that has been imported.

var AllMetrics = &MetricsSet{
	specs: map[string]metricsSpec{},
}

AllMetrics is a set containing every single metrics exporter that has been imported.

var AllOutputs = &OutputSet{
	specs: map[string]outputSpec{},
}

AllOutputs is a set containing every single output that has been imported.

var AllProcessors = &ProcessorSet{
	specs: map[string]processorSpec{},
}

AllProcessors is a set containing every single processor that has been imported.

var AllRateLimits = &RateLimitSet{
	specs: map[string]rateLimitSpec{},
}

AllRateLimits is a set containing every single ratelimit that has been imported.

var AllTracers = &TracerSet{
	specs: map[string]tracerSpec{},
}

AllTracers is a set containing every single tracer that has been imported.

var GlobalEnvironment = &Environment{
	buffers:    AllBuffers,
	caches:     AllCaches,
	inputs:     AllInputs,
	outputs:    AllOutputs,
	processors: AllProcessors,
	rateLimits: AllRateLimits,
	metrics:    AllMetrics,
	tracers:    AllTracers,
}

GlobalEnvironment contains service-wide singleton bundles.

Functions

This section is empty.

Types

type BufferConstructor

type BufferConstructor func(buffer.Config, NewManagement) (buffer.Streamed, error)

BufferConstructor constructs a buffer component.

type BufferSet

type BufferSet struct {
	// contains filtered or unexported fields
}

BufferSet contains an explicit set of buffers available to a Benthos service.

func (*BufferSet) Add

func (s *BufferSet) Add(constructor BufferConstructor, spec docs.ComponentSpec) error

Add a new buffer to this set by providing a spec (name, documentation, and constructor).

func (*BufferSet) Docs

func (s *BufferSet) Docs() []docs.ComponentSpec

Docs returns a slice of buffer specs, which document each method.

func (*BufferSet) DocsFor

func (s *BufferSet) DocsFor(name string) (docs.ComponentSpec, bool)

DocsFor returns the documentation for a given component name, along with a boolean indicating whether the component name exists.

func (*BufferSet) Init

func (s *BufferSet) Init(conf buffer.Config, mgr NewManagement) (buffer.Streamed, error)

Init attempts to initialise a buffer from a config.
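The way Add, DocsFor and Init fit together on a set can be illustrated with a simplified model. Everything here is an assumption made for the sketch: `Config`, `Spec`, `Constructor` and `Set` are hypothetical stand-ins for `buffer.Config`, `docs.ComponentSpec`, `BufferConstructor` and `BufferSet`, the constructor returns a plain string instead of a `buffer.Streamed`, and dispatching on a `Type` field with an error for unknown names is a guess at plausible behaviour rather than the documented one.

```go
package main

import (
	"fmt"
)

// Config is a hypothetical stand-in for buffer.Config; only Type matters here.
type Config struct {
	Type string
}

// Spec is a hypothetical stand-in for docs.ComponentSpec.
type Spec struct {
	Name    string
	Summary string
}

// Constructor is a hypothetical stand-in for BufferConstructor.
type Constructor func(conf Config) (string, error)

type bufferSpec struct {
	constructor Constructor
	spec        Spec
}

// Set is a simplified model of BufferSet.
type Set struct {
	specs map[string]bufferSpec
}

// Add registers a constructor together with its documentation.
func (s *Set) Add(c Constructor, spec Spec) {
	s.specs[spec.Name] = bufferSpec{constructor: c, spec: spec}
}

// DocsFor reports the spec for name and whether it is registered.
func (s *Set) DocsFor(name string) (Spec, bool) {
	c, ok := s.specs[name]
	if !ok {
		return Spec{}, false
	}
	return c.spec, true
}

// Init builds the component named by conf.Type, or fails if it is unknown.
func (s *Set) Init(conf Config) (string, error) {
	c, ok := s.specs[conf.Type]
	if !ok {
		return "", fmt.Errorf("unrecognised buffer type: %q", conf.Type)
	}
	return c.constructor(conf)
}

// newSet returns a set with a single illustrative component registered.
func newSet() *Set {
	s := &Set{specs: map[string]bufferSpec{}}
	s.Add(
		func(Config) (string, error) { return "memory buffer", nil },
		Spec{Name: "memory", Summary: "Stores messages in memory."},
	)
	return s
}

func main() {
	s := newSet()
	if spec, ok := s.DocsFor("memory"); ok {
		fmt.Println(spec.Summary)
	}
	if _, err := s.Init(Config{Type: "missing"}); err != nil {
		fmt.Println("init failed:", err)
	}
}
```

The two-value return of `DocsFor` follows Go's usual "comma ok" idiom, so callers can distinguish a missing component from one with an empty spec.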

type CacheConstructor

type CacheConstructor func(cache.Config, NewManagement) (cache.V1, error)

CacheConstructor constructs a cache component.

type CacheSet

type CacheSet struct {
	// contains filtered or unexported fields
}

CacheSet contains an explicit set of caches available to a Benthos service.

func (*CacheSet) Add

func (s *CacheSet) Add(constructor CacheConstructor, spec docs.ComponentSpec) error

Add a new cache to this set by providing a spec (name, documentation, and constructor).

func (*CacheSet) Docs

func (s *CacheSet) Docs() []docs.ComponentSpec

Docs returns a slice of cache specs, which document each method.

func (*CacheSet) DocsFor

func (s *CacheSet) DocsFor(name string) (docs.ComponentSpec, bool)

DocsFor returns the documentation for a given component name, along with a boolean indicating whether the component name exists.

func (*CacheSet) Init

func (s *CacheSet) Init(conf cache.Config, mgr NewManagement) (cache.V1, error)

Init attempts to initialise a cache from a config.

type Environment

type Environment struct {
	// contains filtered or unexported fields
}

Environment is a collection of Benthos component plugins that can be used in order to build and run streaming pipelines with access to different sets of plugins. This is useful for sandboxing, testing, etc.

func NewEnvironment

func NewEnvironment() *Environment

NewEnvironment creates an empty environment.

func (*Environment) BufferAdd

func (e *Environment) BufferAdd(constructor BufferConstructor, spec docs.ComponentSpec) error

BufferAdd adds a new buffer to this environment by providing a constructor and documentation.

func (*Environment) BufferDocs

func (e *Environment) BufferDocs() []docs.ComponentSpec

BufferDocs returns a slice of buffer specs, which document each method.

func (*Environment) BufferInit

func (e *Environment) BufferInit(conf buffer.Config, mgr NewManagement) (buffer.Streamed, error)

BufferInit attempts to initialise a buffer from a config.

func (*Environment) CacheAdd

func (e *Environment) CacheAdd(constructor CacheConstructor, spec docs.ComponentSpec) error

CacheAdd adds a new cache to this environment by providing a constructor and documentation.

func (*Environment) CacheDocs

func (e *Environment) CacheDocs() []docs.ComponentSpec

CacheDocs returns a slice of cache specs, which document each method.

func (*Environment) CacheInit

func (e *Environment) CacheInit(conf cache.Config, mgr NewManagement) (cache.V1, error)

CacheInit attempts to initialise a cache from a config.

func (*Environment) Clone

func (e *Environment) Clone() *Environment

Clone an existing environment to a new one that can be modified independently.
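What "modified independently" means can be shown with a small model. This is a sketch under assumptions: `set`, `Env`, `NewEnv`, `InputAdd` (taking only a name) and `HasInput` are hypothetical simplifications, and copying each underlying map is one way to get independence, not necessarily how the package does it.

```go
package main

import "fmt"

// set is a hypothetical stand-in for one per-type bundle (e.g. InputSet).
type set struct {
	names map[string]struct{}
}

// clone copies the map so the two sets no longer share storage.
func (s *set) clone() *set {
	out := &set{names: make(map[string]struct{}, len(s.names))}
	for n := range s.names {
		out.names[n] = struct{}{}
	}
	return out
}

// Env is a simplified model of Environment with two component types.
type Env struct {
	inputs  *set
	outputs *set
}

// NewEnv creates an empty environment.
func NewEnv() *Env {
	return &Env{
		inputs:  &set{names: map[string]struct{}{}},
		outputs: &set{names: map[string]struct{}{}},
	}
}

// InputAdd registers an input by name only (the real method also takes
// a constructor and a spec).
func (e *Env) InputAdd(name string) {
	e.inputs.names[name] = struct{}{}
}

// HasInput reports whether an input with the given name is registered.
func (e *Env) HasInput(name string) bool {
	_, ok := e.inputs.names[name]
	return ok
}

// Clone returns a copy whose sets can be changed without touching e.
func (e *Env) Clone() *Env {
	return &Env{inputs: e.inputs.clone(), outputs: e.outputs.clone()}
}

func main() {
	base := NewEnv()
	base.InputAdd("stdin")

	sandbox := base.Clone()
	sandbox.InputAdd("test_fixture")

	fmt.Println(base.HasInput("test_fixture")) // false
	fmt.Println(sandbox.HasInput("stdin"))     // true
}
```

A shallow copy of the struct alone would share the maps between both environments, which is why the sketch copies each set.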

func (*Environment) GetDocs

func (e *Environment) GetDocs(name string, ctype docs.Type) (docs.ComponentSpec, bool)

GetDocs returns a documentation spec for an implementation of a component.

func (*Environment) InputAdd

func (e *Environment) InputAdd(constructor InputConstructor, spec docs.ComponentSpec) error

InputAdd adds a new input to this environment by providing a constructor and documentation.

func (*Environment) InputDocs

func (e *Environment) InputDocs() []docs.ComponentSpec

InputDocs returns a slice of input specs, which document each method.

func (*Environment) InputInit

func (e *Environment) InputInit(conf input.Config, mgr NewManagement) (input.Streamed, error)

InputInit attempts to initialise an input from a config.

func (*Environment) MetricsAdd

func (e *Environment) MetricsAdd(constructor MetricConstructor, spec docs.ComponentSpec) error

MetricsAdd adds a new metrics exporter to this environment by providing a constructor and documentation.

func (*Environment) MetricsDocs

func (e *Environment) MetricsDocs() []docs.ComponentSpec

MetricsDocs returns a slice of metrics exporter specs.

func (*Environment) MetricsInit

func (e *Environment) MetricsInit(conf metrics.Config, nm NewManagement) (*metrics.Namespaced, error)

MetricsInit attempts to initialise a metrics exporter from a config.

func (*Environment) OutputAdd

func (e *Environment) OutputAdd(constructor OutputConstructor, spec docs.ComponentSpec) error

OutputAdd adds a new output to this environment by providing a constructor and documentation.

func (*Environment) OutputDocs

func (e *Environment) OutputDocs() []docs.ComponentSpec

OutputDocs returns a slice of output specs, which document each method.

func (*Environment) OutputInit

func (e *Environment) OutputInit(
	conf output.Config,
	mgr NewManagement,
	pipelines ...processor.PipelineConstructorFunc,
) (output.Streamed, error)

OutputInit attempts to initialise an output from a config.

func (*Environment) ProcessorAdd

func (e *Environment) ProcessorAdd(constructor ProcessorConstructor, spec docs.ComponentSpec) error

ProcessorAdd adds a new processor to this environment by providing a constructor and documentation.

func (*Environment) ProcessorDocs

func (e *Environment) ProcessorDocs() []docs.ComponentSpec

ProcessorDocs returns a slice of processor specs, which document each method.

func (*Environment) ProcessorInit

func (e *Environment) ProcessorInit(conf processor.Config, mgr NewManagement) (processor.V1, error)

ProcessorInit attempts to initialise a processor from a config.

func (*Environment) RateLimitAdd

func (e *Environment) RateLimitAdd(constructor RateLimitConstructor, spec docs.ComponentSpec) error

RateLimitAdd adds a new ratelimit to this environment by providing a constructor and documentation.

func (*Environment) RateLimitDocs

func (e *Environment) RateLimitDocs() []docs.ComponentSpec

RateLimitDocs returns a slice of ratelimit specs, which document each method.

func (*Environment) RateLimitInit

func (e *Environment) RateLimitInit(conf ratelimit.Config, mgr NewManagement) (ratelimit.V1, error)

RateLimitInit attempts to initialise a ratelimit from a config.

func (*Environment) TracersAdd

func (e *Environment) TracersAdd(constructor TracerConstructor, spec docs.ComponentSpec) error

TracersAdd adds a new tracer exporter to this environment by providing a constructor and documentation.

func (*Environment) TracersDocs

func (e *Environment) TracersDocs() []docs.ComponentSpec

TracersDocs returns a slice of tracer exporter specs.

func (*Environment) TracersInit

func (e *Environment) TracersInit(conf tracer.Config, nm NewManagement) (trace.TracerProvider, error)

TracersInit attempts to initialise a tracer exporter from a config.

type InputConstructor

type InputConstructor func(input.Config, NewManagement) (input.Streamed, error)

InputConstructor constructs an input component.

type InputSet

type InputSet struct {
	// contains filtered or unexported fields
}

InputSet contains an explicit set of inputs available to a Benthos service.

func (*InputSet) Add

func (s *InputSet) Add(constructor InputConstructor, spec docs.ComponentSpec) error

Add a new input to this set by providing a constructor and documentation.

func (*InputSet) Docs

func (s *InputSet) Docs() []docs.ComponentSpec

Docs returns a slice of input specs, which document each method.

func (*InputSet) DocsFor

func (s *InputSet) DocsFor(name string) (docs.ComponentSpec, bool)

DocsFor returns the documentation for a given component name, along with a boolean indicating whether the component name exists.

func (*InputSet) Init

func (s *InputSet) Init(conf input.Config, mgr NewManagement) (input.Streamed, error)

Init attempts to initialise an input from a config.

type MetricConstructor

type MetricConstructor func(conf metrics.Config, nm NewManagement) (metrics.Type, error)

MetricConstructor constructs a metrics component.

type MetricsSet

type MetricsSet struct {
	// contains filtered or unexported fields
}

MetricsSet contains an explicit set of metrics available to a Benthos service.

func (*MetricsSet) Add

func (s *MetricsSet) Add(constructor MetricConstructor, spec docs.ComponentSpec) error

Add a new metrics exporter to this set by providing a spec (name, documentation, and constructor).

func (*MetricsSet) Docs

func (s *MetricsSet) Docs() []docs.ComponentSpec

Docs returns a slice of metrics specs, which document each method.

func (*MetricsSet) DocsFor

func (s *MetricsSet) DocsFor(name string) (docs.ComponentSpec, bool)

DocsFor returns the documentation for a given component name, along with a boolean indicating whether the component name exists.

func (*MetricsSet) Init

Init attempts to initialise a metrics exporter from a config.

type NewManagement

type NewManagement interface {
	ForStream(id string) NewManagement
	IntoPath(segments ...string) NewManagement
	WithAddedMetrics(m metrics.Type) NewManagement

	Path() []string
	Label() string

	Metrics() metrics.Type
	Logger() log.Modular
	Tracer() trace.TracerProvider
	FS() ifs.FS
	BloblEnvironment() *bloblang.Environment

	RegisterEndpoint(path, desc string, h http.HandlerFunc)

	NewBuffer(conf buffer.Config) (buffer.Streamed, error)
	NewCache(conf cache.Config) (cache.V1, error)
	NewInput(conf input.Config) (input.Streamed, error)
	NewProcessor(conf processor.Config) (processor.V1, error)
	NewOutput(conf output.Config, pipelines ...processor.PipelineConstructorFunc) (output.Streamed, error)
	NewRateLimit(conf ratelimit.Config) (ratelimit.V1, error)

	ProbeCache(name string) bool
	AccessCache(ctx context.Context, name string, fn func(cache.V1)) error
	StoreCache(ctx context.Context, name string, conf cache.Config) error
	RemoveCache(ctx context.Context, name string) error

	ProbeInput(name string) bool
	AccessInput(ctx context.Context, name string, fn func(input.Streamed)) error
	StoreInput(ctx context.Context, name string, conf input.Config) error
	RemoveInput(ctx context.Context, name string) error

	ProbeProcessor(name string) bool
	AccessProcessor(ctx context.Context, name string, fn func(processor.V1)) error
	StoreProcessor(ctx context.Context, name string, conf processor.Config) error
	RemoveProcessor(ctx context.Context, name string) error

	ProbeOutput(name string) bool
	AccessOutput(ctx context.Context, name string, fn func(output.Sync)) error
	StoreOutput(ctx context.Context, name string, conf output.Config) error
	RemoveOutput(ctx context.Context, name string) error

	ProbeRateLimit(name string) bool
	AccessRateLimit(ctx context.Context, name string, fn func(ratelimit.V1)) error
	StoreRateLimit(ctx context.Context, name string, conf ratelimit.Config) error
	RemoveRateLimit(ctx context.Context, name string) error

	GetPipe(name string) (<-chan message.Transaction, error)
	SetPipe(name string, t <-chan message.Transaction)
	UnsetPipe(name string, t <-chan message.Transaction)
}

NewManagement defines the latest API for a Benthos manager, which will become the only API (internally) in Benthos V4.

type OutputConstructor

OutputConstructor constructs an output component.

type OutputSet

type OutputSet struct {
	// contains filtered or unexported fields
}

OutputSet contains an explicit set of outputs available to a Benthos service.

func (*OutputSet) Add

func (s *OutputSet) Add(constructor OutputConstructor, spec docs.ComponentSpec) error

Add a new output to this set by providing a spec (name, documentation, and constructor).

func (*OutputSet) Docs

func (s *OutputSet) Docs() []docs.ComponentSpec

Docs returns a slice of output specs, which document each method.

func (*OutputSet) DocsFor

func (s *OutputSet) DocsFor(name string) (docs.ComponentSpec, bool)

DocsFor returns the documentation for a given component name, along with a boolean indicating whether the component name exists.

func (*OutputSet) Init

Init attempts to initialise an output from a config.

type ProcessorConstructor

type ProcessorConstructor func(conf processor.Config, mgr NewManagement) (processor.V1, error)

ProcessorConstructor constructs a processor component.

type ProcessorSet

type ProcessorSet struct {
	// contains filtered or unexported fields
}

ProcessorSet contains an explicit set of processors available to a Benthos service.

func (*ProcessorSet) Add

func (s *ProcessorSet) Add(constructor ProcessorConstructor, spec docs.ComponentSpec) error

Add a new processor to this set by providing a spec (name, documentation, and constructor).

func (*ProcessorSet) Docs

func (s *ProcessorSet) Docs() []docs.ComponentSpec

Docs returns a slice of processor specs, which document each method.

func (*ProcessorSet) DocsFor

func (s *ProcessorSet) DocsFor(name string) (docs.ComponentSpec, bool)

DocsFor returns the documentation for a given component name, along with a boolean indicating whether the component name exists.

func (*ProcessorSet) Init

Init attempts to initialise a processor from a config.

type RateLimitConstructor

type RateLimitConstructor func(ratelimit.Config, NewManagement) (ratelimit.V1, error)

RateLimitConstructor constructs a ratelimit component.

type RateLimitSet

type RateLimitSet struct {
	// contains filtered or unexported fields
}

RateLimitSet contains an explicit set of ratelimits available to a Benthos service.

func (*RateLimitSet) Add

func (s *RateLimitSet) Add(constructor RateLimitConstructor, spec docs.ComponentSpec) error

Add a new ratelimit to this set by providing a spec (name, documentation, and constructor).

func (*RateLimitSet) Docs

func (s *RateLimitSet) Docs() []docs.ComponentSpec

Docs returns a slice of ratelimit specs, which document each method.

func (*RateLimitSet) DocsFor

func (s *RateLimitSet) DocsFor(name string) (docs.ComponentSpec, bool)

DocsFor returns the documentation for a given component name, along with a boolean indicating whether the component name exists.

func (*RateLimitSet) Init

Init attempts to initialise a ratelimit from a config.

type TracerConstructor

type TracerConstructor func(tracer.Config, NewManagement) (trace.TracerProvider, error)

TracerConstructor constructs a tracer component.

type TracerSet

type TracerSet struct {
	// contains filtered or unexported fields
}

TracerSet contains an explicit set of tracers available to a Benthos service.

func (*TracerSet) Add

func (s *TracerSet) Add(constructor TracerConstructor, spec docs.ComponentSpec) error

Add a new tracer to this set by providing a spec (name, documentation, and constructor).

func (*TracerSet) Docs

func (s *TracerSet) Docs() []docs.ComponentSpec

Docs returns a slice of tracer specs, which document each method.

func (*TracerSet) DocsFor

func (s *TracerSet) DocsFor(name string) (docs.ComponentSpec, bool)

DocsFor returns the documentation for a given component name, along with a boolean indicating whether the component name exists.

func (*TracerSet) Init

Init attempts to initialise a tracer from a config.
