manager

package
v4.0.0-...-13a3402 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 27, 2023 License: MIT Imports: 26 Imported by: 0

Documentation

Overview

Package manager creates and manages multiple streams, providing an API for performing CRUD operations.

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrStreamExists       = errors.New("stream already exists")
	ErrStreamDoesNotExist = errors.New("stream does not exist")
)

Errors specifically returned by a stream manager.

Functions

func OptAPIEnabled

func OptAPIEnabled(b bool) func(*Type)

OptAPIEnabled sets whether the stream manager registers API endpoints for CRUD operations on streams. This is enabled by default.

Types

type ConfigSet

type ConfigSet map[string]stream.Config

ConfigSet is a map of stream configurations mapped by ID, which can be YAML parsed without losing default values inside the stream configs.

func (ConfigSet) UnmarshalYAML

func (c ConfigSet) UnmarshalYAML(value *yaml.Node) error

UnmarshalYAML ensures that when parsing configs that are in a map or slice the default values are still applied.

type StreamProcConstructorFunc

type StreamProcConstructorFunc func(streamID string) (processor.V1, error)

StreamProcConstructorFunc is a closure type that constructs a processor type for new streams, where the id of the stream is provided as an argument.

type StreamStatus

type StreamStatus struct {
	// contains filtered or unexported fields
}

StreamStatus tracks a stream along with information regarding its internals.

func (*StreamStatus) Config

func (s *StreamStatus) Config() stream.Config

Config returns the configuration of the stream.

func (*StreamStatus) IsReady

func (s *StreamStatus) IsReady() bool

IsReady returns a boolean indicating whether the stream is connected at both the input and output level.

func (*StreamStatus) IsRunning

func (s *StreamStatus) IsRunning() bool

IsRunning returns a boolean indicating whether the stream is currently running.

func (*StreamStatus) Metrics

func (s *StreamStatus) Metrics() *metrics.Local

Metrics returns a metrics aggregator of the stream.

func (*StreamStatus) Uptime

func (s *StreamStatus) Uptime() time.Duration

Uptime returns a time.Duration indicating the current uptime of the stream.

type Type

type Type struct {
	// contains filtered or unexported fields
}

Type manages a collection of streams, providing APIs for CRUD operations on the streams.

func New

func New(mgr bundle.NewManagement, opts ...func(*Type)) *Type

New creates a new stream manager.Type.

func (*Type) Create

func (m *Type) Create(id string, conf stream.Config) error

Create attempts to construct and run a new stream under a unique ID. If the ID already exists an error is returned.

func (*Type) Delete

func (m *Type) Delete(ctx context.Context, id string) error

Delete attempts to stop and remove a stream by its ID. Returns an error if the stream was not found, or if clean shutdown fails in the specified period of time.

func (*Type) HandleResourceCRUD

func (m *Type) HandleResourceCRUD(w http.ResponseWriter, r *http.Request)

HandleResourceCRUD is an http.HandleFunc for performing CRUD operations on resource components.

func (*Type) HandleStreamCRUD

func (m *Type) HandleStreamCRUD(w http.ResponseWriter, r *http.Request)

HandleStreamCRUD is an http.HandleFunc for performing CRUD operations on individual streams.

func (*Type) HandleStreamReady

func (m *Type) HandleStreamReady(w http.ResponseWriter, r *http.Request)

HandleStreamReady is an http.HandleFunc for providing a ready check across all streams.

func (*Type) HandleStreamStats

func (m *Type) HandleStreamStats(w http.ResponseWriter, r *http.Request)

HandleStreamStats is an http.HandleFunc for obtaining metrics for a stream.

func (*Type) HandleStreamsCRUD

func (m *Type) HandleStreamsCRUD(w http.ResponseWriter, r *http.Request)

HandleStreamsCRUD is an http.HandleFunc for returning maps of active benthos streams by their id, status and uptime or overwriting the entire set of streams.

func (*Type) Read

func (m *Type) Read(id string) (*StreamStatus, error)

Read attempts to obtain the status of a managed stream. Returns an error if the stream does not exist.

func (*Type) Stop

func (m *Type) Stop(ctx context.Context) error

Stop attempts to gracefully shut down all active streams and close the stream manager.

func (*Type) Update

func (m *Type) Update(ctx context.Context, id string, conf stream.Config) error

Update attempts to stop an existing stream and replace it with a new version of the same stream.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL