config

package
v4.0.0-...-13a3402

This package is not in the latest version of its module.
Published: Aug 27, 2023 License: MIT Imports: 32 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func LintBytes

func LintBytes(lintConf docs.LintConfig, rawBytes []byte) ([]docs.Lint, error)

LintBytes attempts to report errors within a user config. Returns a slice of lint results.

func MarshalYAML

func MarshalYAML(v yaml.Node) ([]byte, error)

MarshalYAML marshals a structure into YAML with consistent formatting across all Benthos components.

func ReadFileEnvSwap

func ReadFileEnvSwap(store ifs.FS, path string, lookupEnvFn func(name string) (string, bool)) (configBytes []byte, lints []docs.Lint, modTime time.Time, err error)

ReadFileEnvSwap reads a file and replaces any environment variable interpolations before returning the contents. Linting errors are returned if the file has an unexpected higher level format, such as invalid UTF-8 encoding.

A modTime timestamp is returned if the modification time of the file is available.

func ReadFileLinted

func ReadFileLinted(fs ifs.FS, path string, skipEnvVarCheck bool, lConf docs.LintConfig, config *Type) ([]docs.Lint, error)

ReadFileLinted will attempt to read a configuration file path into a structure. Returns a slice of lint messages or an error.

func ReplaceEnvVariables

func ReplaceEnvVariables(inBytes []byte, lookupFn func(string) (string, bool)) (replaced []byte, err error)

ReplaceEnvVariables will search a blob of data for the pattern `${FOO:bar}`, where `FOO` is an environment variable name and `bar` is a default value. The `bar` section (including the colon) can be left out if there is no appropriate default value for the field.

For each such pattern found in the blob, the contents of the respective environment variable are read and replace the pattern. If the environment variable is empty or does not exist then either the default value is used or the field is left empty.
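The interpolation rules described above can be illustrated with a small, self-contained sketch. It does not use this package: replaceEnv, envPattern and demoLookup are hypothetical names, and the regular expression is an assumption about the pattern grammar rather than the one the package uses.

```go
package main

import (
	"fmt"
	"regexp"
)

// envPattern matches ${NAME} and ${NAME:default}. This expression is an
// assumption; the grammar accepted by the real package may differ.
var envPattern = regexp.MustCompile(`\$\{([A-Za-z0-9_]+)(?::([^}]*))?\}`)

// replaceEnv is a hypothetical stand-in for ReplaceEnvVariables. It returns
// the interpolated bytes plus the names of variables that had neither a
// value nor a default.
func replaceEnv(in []byte, lookup func(string) (string, bool)) ([]byte, []string) {
	var missing []string
	out := envPattern.ReplaceAllFunc(in, func(m []byte) []byte {
		idx := envPattern.FindSubmatchIndex(m)
		name := string(m[idx[2]:idx[3]])
		if v, ok := lookup(name); ok && v != "" {
			return []byte(v)
		}
		if idx[4] >= 0 {
			// A default was supplied after the colon, possibly empty.
			return m[idx[4]:idx[5]]
		}
		// No value and no default: leave the field empty and record the name.
		missing = append(missing, name)
		return nil
	})
	return out, missing
}

// demoLookup is a fixed lookup used here in place of os.LookupEnv.
func demoLookup(name string) (string, bool) {
	if name == "HOST" {
		return "example.com", true
	}
	return "", false
}

func main() {
	out, missing := replaceEnv([]byte("url: ${HOST}:${PORT:4195}/${PATH}"), demoLookup)
	fmt.Printf("%s\n", out)
	fmt.Println("missing:", missing)
}
```

Passing the lookup as a function, as the real signature does, makes it easy to substitute a fixed map in tests instead of reading the process environment.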

func ShouldReread

func ShouldReread(err error) bool

ShouldReread returns true if the error returned from an update trigger is non-nil and also temporary, and therefore it is worth trying the update again even if the content has not changed.

func Spec

func Spec() docs.FieldSpecs

Spec returns the docs.FieldSpecs for an entire Benthos configuration.

func SpecWithoutStream

func SpecWithoutStream() docs.FieldSpecs

SpecWithoutStream describes a stream config without the core stream fields.

Types

type ErrMissingEnvVars

type ErrMissingEnvVars struct {
	Variables []string

	// Our best attempt at parsing the config that's missing variables by simply
	// inserting an empty string. There's a good chance this is still a valid
	// config! :)
	BestAttempt []byte
}

ErrMissingEnvVars is returned when attempting environment variable interpolations where the referenced environment variables are missing.

func (*ErrMissingEnvVars) Error

func (e *ErrMissingEnvVars) Error() string

Error returns a rather sweet error message.
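A caller might choose to continue with BestAttempt when the only problem is missing variables. The sketch below shows one way to do that with errors.As. It is self-contained: missingEnvVarsError is a local stand-in that mirrors the documented fields of ErrMissingEnvVars, and loadConfig and configOrBestAttempt are hypothetical helpers, not part of this package.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// missingEnvVarsError mirrors the documented shape of ErrMissingEnvVars. It
// is a local stand-in so that the sketch compiles on its own.
type missingEnvVarsError struct {
	Variables   []string
	BestAttempt []byte
}

func (e *missingEnvVarsError) Error() string {
	return "missing environment variables: " + strings.Join(e.Variables, ", ")
}

// loadConfig is a hypothetical loader that always reports one missing
// variable, wrapped in another error to show that errors.As unwraps it.
func loadConfig() ([]byte, error) {
	err := &missingEnvVarsError{
		Variables:   []string{"API_KEY"},
		BestAttempt: []byte("api_key: \"\"\n"),
	}
	return nil, fmt.Errorf("reading config: %w", err)
}

// configOrBestAttempt falls back to BestAttempt when the only problem is
// missing variables, and reports which variables were absent.
func configOrBestAttempt(load func() ([]byte, error)) ([]byte, []string, error) {
	conf, err := load()
	var missing *missingEnvVarsError
	if errors.As(err, &missing) {
		return missing.BestAttempt, missing.Variables, nil
	}
	return conf, nil, err
}

func main() {
	conf, vars, err := configOrBestAttempt(loadConfig)
	if err != nil {
		fmt.Println("fatal:", err)
		return
	}
	fmt.Printf("continuing with %q, missing variables: %v\n", conf, vars)
}
```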

type ErrNoReread

type ErrNoReread struct {
	// contains filtered or unexported fields
}

ErrNoReread is an error type returned from update triggers that indicates an attempt should not be re-made unless the source file has been modified.

func (*ErrNoReread) Error

func (e *ErrNoReread) Error() string

Error returns a human readable error string.

func (*ErrNoReread) Unwrap

func (e *ErrNoReread) Unwrap() error

Unwrap the underlying error.
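The relationship between ErrNoReread and ShouldReread can be sketched as follows. This is an approximation based only on the descriptions in this document: noRereadError and shouldReread are local stand-ins, and the real ShouldReread may apply different rules.

```go
package main

import (
	"errors"
	"fmt"
)

// noRereadError is a local stand-in for ErrNoReread. It wraps an error to
// signal that retrying is pointless until the source file changes.
type noRereadError struct{ wrapped error }

func (e *noRereadError) Error() string { return e.wrapped.Error() }
func (e *noRereadError) Unwrap() error { return e.wrapped }

// shouldReread approximates the documented behaviour of ShouldReread: it is
// true only for non-nil errors that are not marked as no-reread.
func shouldReread(err error) bool {
	if err == nil {
		return false
	}
	var nr *noRereadError
	return !errors.As(err, &nr)
}

// Example errors, all hypothetical.
var (
	errParse     = errors.New("yaml: mapping values are not allowed here")
	errBadConfig = &noRereadError{wrapped: errParse}
	errBusy      = errors.New("resource busy")
	errWrapped   = fmt.Errorf("update failed: %w", errBadConfig)
)

func main() {
	fmt.Println(shouldReread(nil))          // no error, nothing to retry
	fmt.Println(shouldReread(errBusy))      // plain error, worth retrying
	fmt.Println(shouldReread(errBadConfig)) // marked, wait for a file change
	fmt.Println(shouldReread(errWrapped))   // marker found through wrapping
	// Unwrap lets errors.Is reach the underlying cause.
	fmt.Println(errors.Is(errWrapped, errParse))
}
```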

type MainUpdateFunc

type MainUpdateFunc func(conf *Type) error

MainUpdateFunc is a closure function called whenever a main config has been updated. If an error is returned then the attempt will be made again after a grace period.
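The retry behaviour described above might look roughly like the loop below. It is a sketch under stated assumptions: config, mainUpdateFunc, applyWithRetry and flakyUpdater are hypothetical names, and the attempt limit and grace period are illustrative values, not those used by this package.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// config is a placeholder for Type.
type config struct{ LogLevel string }

// mainUpdateFunc mirrors the documented MainUpdateFunc signature.
type mainUpdateFunc func(conf *config) error

// applyWithRetry calls fn until it succeeds or attempts run out, sleeping for
// grace between attempts. It returns the number of calls made.
func applyWithRetry(fn mainUpdateFunc, conf *config, attempts int, grace time.Duration) (int, error) {
	var err error
	for i := 1; i <= attempts; i++ {
		if err = fn(conf); err == nil {
			return i, nil
		}
		if i < attempts {
			time.Sleep(grace)
		}
	}
	return attempts, err
}

// flakyUpdater returns an update func that fails its first n calls.
func flakyUpdater(n int) mainUpdateFunc {
	calls := 0
	return func(conf *config) error {
		calls++
		if calls <= n {
			return errors.New("previous stream still shutting down")
		}
		return nil
	}
}

func main() {
	tries, err := applyWithRetry(flakyUpdater(2), &config{LogLevel: "INFO"}, 5, 10*time.Millisecond)
	fmt.Println("calls:", tries, "error:", err)
}
```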

type OptFunc

type OptFunc func(*Reader)

OptFunc is an option function that changes the behaviour of a config reader.

func OptAddOverrides

func OptAddOverrides(overrides ...string) OptFunc

OptAddOverrides adds one or more override expressions to the config reader, each of the form `path=value`.

func OptSetBootstrapConfig

func OptSetBootstrapConfig(conf *Type) OptFunc

OptSetBootstrapConfig sets a config to be used as the default for each parse. This can be used to change the default behaviours of Benthos configs.

func OptSetLintConfig

func OptSetLintConfig(lConf docs.LintConfig) OptFunc

OptSetLintConfig sets the config used for linting files.

func OptSetStreamPaths

func OptSetStreamPaths(streamsPaths ...string) OptFunc

OptSetStreamPaths marks this config reader as operating in streams mode, and adds a list of paths to obtain individual stream configs from.

func OptTestSuffix

func OptTestSuffix(suffix string) OptFunc

OptTestSuffix configures the suffix given to unit test definition files. This is used to exclude unit tests from being run in streams mode with arbitrary directory walking.
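To illustrate why a test suffix matters during directory walking, the sketch below collects YAML files from an in-memory filesystem and skips those whose name ends in the suffix. It is not the package's walking logic: streamFiles and demoFS are hypothetical, and the suffix `_benthos_test` and the accepted file extensions are assumptions.

```go
package main

import (
	"fmt"
	"io/fs"
	"path"
	"strings"
	"testing/fstest"
)

// streamFiles walks root and returns YAML files whose name, without its
// extension, does not end in testSuffix.
func streamFiles(fsys fs.FS, root, testSuffix string) ([]string, error) {
	var paths []string
	err := fs.WalkDir(fsys, root, func(p string, d fs.DirEntry, err error) error {
		if err != nil {
			return err
		}
		if d.IsDir() {
			return nil
		}
		ext := path.Ext(p)
		if ext != ".yaml" && ext != ".yml" {
			return nil
		}
		if strings.HasSuffix(strings.TrimSuffix(p, ext), testSuffix) {
			// Unit test definition, not a stream config.
			return nil
		}
		paths = append(paths, p)
		return nil
	})
	return paths, err
}

// demoFS builds a small in-memory tree for the example.
func demoFS() fs.FS {
	return fstest.MapFS{
		"streams/a.yaml":              &fstest.MapFile{Data: []byte("input: {}\n")},
		"streams/a_benthos_test.yaml": &fstest.MapFile{Data: []byte("tests: []\n")},
		"streams/nested/b.yml":        &fstest.MapFile{Data: []byte("input: {}\n")},
		"streams/readme.md":           &fstest.MapFile{Data: []byte("notes\n")},
	}
}

func main() {
	paths, err := streamFiles(demoFS(), "streams", "_benthos_test")
	fmt.Println(paths, err)
}
```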

func OptUseFS

func OptUseFS(fs ifs.FS) OptFunc

OptUseFS sets the ifs.FS implementation for the reader to use. By default the OS filesystem is used, and when overridden it is no longer possible to use BeginFileWatching.
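The option functions above follow the functional options pattern. The sketch below shows the general shape with a cut-down, hypothetical reader type; the field names and the splitOverride helper are illustrative assumptions and do not reflect the unexported fields of Reader or how overrides are actually parsed.

```go
package main

import (
	"fmt"
	"strings"
)

// reader is a hypothetical, cut-down stand-in for Reader.
type reader struct {
	mainPath    string
	overrides   []string
	streamsMode bool
	streamPaths []string
}

// optFunc mirrors the documented shape of OptFunc.
type optFunc func(*reader)

// optAddOverrides appends `path=value` override expressions.
func optAddOverrides(overrides ...string) optFunc {
	return func(r *reader) {
		r.overrides = append(r.overrides, overrides...)
	}
}

// optSetStreamPaths marks the reader as operating in streams mode.
func optSetStreamPaths(paths ...string) optFunc {
	return func(r *reader) {
		r.streamsMode = true
		r.streamPaths = paths
	}
}

// newReader applies each option in order to a reader with default settings.
func newReader(mainPath string, opts ...optFunc) *reader {
	r := &reader{mainPath: mainPath}
	for _, opt := range opts {
		opt(r)
	}
	return r
}

// splitOverride splits a `path=value` expression at the first '='.
func splitOverride(expr string) (path, value string, ok bool) {
	return strings.Cut(expr, "=")
}

func main() {
	r := newReader("config.yaml",
		optAddOverrides("logger.level=DEBUG"),
		optSetStreamPaths("./streams"),
	)
	p, v, ok := splitOverride(r.overrides[0])
	fmt.Println(r.streamsMode, p, v, ok)
}
```

Because each option is a plain function, new options can be added later without changing the constructor signature.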

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

Reader provides utilities for parsing a Benthos config as a main file with a collection of resource files, and options such as overrides.

func NewReader

func NewReader(mainPath string, resourcePaths []string, opts ...OptFunc) *Reader

NewReader creates a new config reader.

func (*Reader) BeginFileWatching

func (r *Reader) BeginFileWatching(mgr bundle.NewManagement, strict bool) error

BeginFileWatching creates a goroutine that watches all active configuration files for changes. If a resource is changed then it is swapped out automatically through the provided manager. If a main config or stream config changes then the closures registered with either SubscribeConfigChanges or SubscribeStreamChanges will be called.

WARNING: Either SubscribeConfigChanges or SubscribeStreamChanges must be called before this method, because it is unsafe to register them while watching is in progress.

func (*Reader) Close

func (r *Reader) Close(ctx context.Context) error

Close the reader. When this method exits all reloading will be stopped.

func (*Reader) Read

func (r *Reader) Read() (conf Type, lints []string, err error)

Read a Benthos config from the files and options specified.

func (*Reader) ReadStreams

func (r *Reader) ReadStreams(confs map[string]stream.Config) (lints []string, err error)

ReadStreams attempts to read Benthos stream configs from one or more paths. Stream configs are extracted and added to a provided map, where the ID is derived from the path of the stream config file.

func (*Reader) SubscribeConfigChanges

func (r *Reader) SubscribeConfigChanges(fn MainUpdateFunc) error

SubscribeConfigChanges registers a closure function to be called whenever the main configuration file is updated.

The provided closure should return a nil error if the main config was successfully applied.

func (*Reader) SubscribeStreamChanges

func (r *Reader) SubscribeStreamChanges(fn StreamUpdateFunc) error

SubscribeStreamChanges registers a closure to be called whenever the configuration of a stream is updated.

The provided closure should return a nil error if the stream was successfully replaced.

func (*Reader) TriggerMainUpdate

func (r *Reader) TriggerMainUpdate(mgr bundle.NewManagement, strict bool, newPath string) error

TriggerMainUpdate attempts to re-read the main configuration file, trigger the provided main update func, and apply changes to resources to the provided manager as appropriate.

func (*Reader) TriggerResourceDelete

func (r *Reader) TriggerResourceDelete(mgr bundle.NewManagement, path string) error

TriggerResourceDelete attempts to remove all resources that originated from a given file.

func (*Reader) TriggerResourceUpdate

func (r *Reader) TriggerResourceUpdate(mgr bundle.NewManagement, strict bool, path string) error

TriggerResourceUpdate attempts to re-read a resource configuration file and apply changes to the provided manager as appropriate.

func (*Reader) TriggerStreamUpdate

func (r *Reader) TriggerStreamUpdate(mgr bundle.NewManagement, strict bool, path string) error

TriggerStreamUpdate attempts to re-read a stream configuration file, and trigger the provided stream update func.

type StreamUpdateFunc

type StreamUpdateFunc func(id string, conf *stream.Config) error

StreamUpdateFunc is a closure function called whenever a stream config has been updated. If an error is returned then the attempt will be made again after a grace period.

When the provided config is nil it is a signal that the stream has been deleted, and it is expected that the provided update func should shut that stream down.
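A stream update closure therefore has two jobs: replace a stream when given a config, and shut it down when given nil. The sketch below shows that branching with hypothetical local types; streamConfig is a placeholder for stream.Config, and registry is an illustrative in-memory store rather than anything provided by this package.

```go
package main

import "fmt"

// streamConfig is a placeholder for stream.Config.
type streamConfig struct{ Input string }

// streamUpdateFunc mirrors the documented StreamUpdateFunc signature.
type streamUpdateFunc func(id string, conf *streamConfig) error

// registry tracks running streams by ID.
type registry struct{ running map[string]streamConfig }

func newRegistry() *registry {
	return &registry{running: map[string]streamConfig{}}
}

// update replaces the stream when conf is non-nil and removes it when conf
// is nil. Removing an unknown stream is treated as a no-op so that a repeated
// delete does not cause endless retries.
func (r *registry) update(id string, conf *streamConfig) error {
	if id == "" {
		return fmt.Errorf("stream id must not be empty")
	}
	if conf == nil {
		delete(r.running, id)
		return nil
	}
	r.running[id] = *conf
	return nil
}

func main() {
	reg := newRegistry()
	var fn streamUpdateFunc = reg.update

	_ = fn("orders", &streamConfig{Input: "kafka"})
	fmt.Println("running after update:", len(reg.running))

	_ = fn("orders", nil) // nil config signals deletion
	fmt.Println("running after delete:", len(reg.running))
}
```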

type Type

type Type struct {
	HTTP                   api.Config `json:"http" yaml:"http"`
	stream.Config          `json:",inline" yaml:",inline"`
	manager.ResourceConfig `json:",inline" yaml:",inline"`
	Logger                 log.Config     `json:"logger" yaml:"logger"`
	Metrics                metrics.Config `json:"metrics" yaml:"metrics"`
	Tracer                 tracer.Config  `json:"tracer" yaml:"tracer"`
	SystemCloseDelay       string         `json:"shutdown_delay" yaml:"shutdown_delay"`
	SystemCloseTimeout     string         `json:"shutdown_timeout" yaml:"shutdown_timeout"`
	Tests                  []any          `json:"tests,omitempty" yaml:"tests,omitempty"`
}

Type is the Benthos service configuration struct.

func New

func New() Type

New returns a new configuration with default values.

func (*Type) Clone

func (t *Type) Clone() (Type, error)

Clone a config, creating a new copy that can be mutated in isolation.
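The value of a deep copy is easiest to see with a struct that holds a slice, since a plain assignment would share the backing array. The sketch below clones via a JSON round trip. This is one common technique, chosen here for illustration; it is an assumption and not necessarily how Clone is implemented, and config is a hypothetical cut-down type.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// config is a hypothetical, cut-down stand-in for Type.
type config struct {
	ShutdownTimeout string   `json:"shutdown_timeout"`
	Tests           []string `json:"tests,omitempty"`
}

// clone deep-copies the config by serialising it and parsing the result
// into a fresh value, so that no slices are shared with the original.
func (c *config) clone() (config, error) {
	raw, err := json.Marshal(c)
	if err != nil {
		return config{}, err
	}
	var out config
	if err := json.Unmarshal(raw, &out); err != nil {
		return config{}, err
	}
	return out, nil
}

func main() {
	orig := config{ShutdownTimeout: "20s", Tests: []string{"a"}}
	cp, err := orig.clone()
	if err != nil {
		fmt.Println("clone failed:", err)
		return
	}
	cp.Tests[0] = "b" // mutating the copy leaves the original untouched
	fmt.Println(orig.Tests, cp.Tests)
}
```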
