package output

Version: v4.22.0
Published: Oct 23, 2023 License: MIT Imports: 23 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func Description

func Description(async, batches bool, content string) string

Description appends standard feature descriptions to an output description based on various features of the output.

func IterateBatchedSend

func IterateBatchedSend(msg message.Batch, fn func(int, *message.Part) error) error

IterateBatchedSend executes a closure fn on each message of a batch, where the closure is expected to attempt a send and return an error. If an error is returned then it is added to a batch error in order to support index specific error handling.

However, if a fatal error is returned such as a connection loss or shut down then it is returned immediately.
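
The error handling described above can be sketched as follows. This is a minimal illustration and not the package's implementation: `iterateSend`, `batchError`, `errFatal` and `errRejected` are hypothetical stand-ins, and plain strings take the place of `*message.Part`.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical sentinel errors: errFatal stands in for a connection loss,
// errRejected for an ordinary per-message failure.
var (
	errFatal    = errors.New("connection lost")
	errRejected = errors.New("payload rejected")
)

// batchError is a hypothetical stand-in that records failures by message index.
type batchError struct {
	failed map[int]error
}

func (e *batchError) Error() string {
	return fmt.Sprintf("%d message(s) failed to send", len(e.failed))
}

// iterateSend calls fn for each part. Ordinary errors are collected by index,
// while a fatal error stops the loop and is returned unchanged.
func iterateSend(parts []string, fn func(int, string) error) error {
	var bErr *batchError
	for i, p := range parts {
		err := fn(i, p)
		if err == nil {
			continue
		}
		if errors.Is(err, errFatal) {
			return err
		}
		if bErr == nil {
			bErr = &batchError{failed: map[int]error{}}
		}
		bErr.failed[i] = err
	}
	if bErr != nil {
		return bErr
	}
	return nil // explicit nil avoids returning a typed nil pointer as an error
}

func main() {
	err := iterateSend([]string{"a", "", "c"}, func(i int, p string) error {
		if p == "" {
			return errRejected
		}
		return nil
	})
	var bErr *batchError
	if errors.As(err, &bErr) {
		for i, e := range bErr.failed {
			fmt.Printf("index %d: %v\n", i, e) // index 1: payload rejected
		}
	}
}
```

A caller can then retry only the indexes recorded in the batch error, and treat any other error as a reason to reconnect.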

Types

type AsyncSink

type AsyncSink interface {
	// Connect attempts to establish a connection to the sink, if
	// unsuccessful returns an error. If the attempt is successful (or not
	// necessary) returns nil.
	Connect(ctx context.Context) error

	// WriteBatch should block until either the message is sent (and
	// acknowledged) to a sink, or a transport specific error has occurred, or
	// the Type is closed.
	WriteBatch(ctx context.Context, msg message.Batch) error

	// Close is a blocking call to wait until the component has finished
	// shutting down and cleaning up resources.
	Close(ctx context.Context) error
}

AsyncSink is a type that writes Benthos messages to a third-party sink. If the protocol supports a form of acknowledgement then it is returned by the call to WriteBatch.
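
As an illustration of the three-method shape above, here is a minimal in-memory sink. It is a sketch under assumptions: `Batch` is a hypothetical stand-in for `message.Batch`, and `memorySink` and `run` are invented for this example, since the real package is internal and cannot be imported from outside the module.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// Batch is a hypothetical stand-in for message.Batch.
type Batch [][]byte

// memorySink stores batches in memory. Its methods mirror the Connect,
// WriteBatch and Close signatures of the interface shown above.
type memorySink struct {
	mu        sync.Mutex
	connected bool
	written   []Batch
}

// Connect marks the sink as ready; a real sink would dial its target here.
func (m *memorySink) Connect(ctx context.Context) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.connected = true
	return nil
}

// WriteBatch returns nil only once the batch has been stored, which plays
// the role of an acknowledgement in this sketch.
func (m *memorySink) WriteBatch(ctx context.Context, b Batch) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	m.mu.Lock()
	defer m.mu.Unlock()
	if !m.connected {
		return errors.New("not connected")
	}
	m.written = append(m.written, b)
	return nil
}

// Close releases the (imaginary) connection.
func (m *memorySink) Close(ctx context.Context) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.connected = false
	return nil
}

// run writes once before and once after connecting and reports both results
// together with the number of stored batches.
func run() (before, after error, stored int) {
	ctx := context.Background()
	sink := &memorySink{}
	before = sink.WriteBatch(ctx, Batch{[]byte("hi")})
	_ = sink.Connect(ctx)
	after = sink.WriteBatch(ctx, Batch{[]byte("hi")})
	stored = len(sink.written)
	_ = sink.Close(ctx)
	return before, after, stored
}

func main() {
	before, after, stored := run()
	fmt.Println(before, after, stored) // not connected <nil> 1
}
```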

type AsyncWriter

type AsyncWriter struct {
	// contains filtered or unexported fields
}

AsyncWriter is an output type that writes messages to an AsyncSink.

func (*AsyncWriter) Connected

func (w *AsyncWriter) Connected() bool

Connected returns a boolean indicating whether this output is currently connected to its target.

func (*AsyncWriter) Consume

func (w *AsyncWriter) Consume(ts <-chan message.Transaction) error

Consume assigns a channel of message transactions for the output to read from.

func (*AsyncWriter) TriggerCloseNow

func (w *AsyncWriter) TriggerCloseNow()

TriggerCloseNow shuts down the output and stops processing messages.

func (*AsyncWriter) WaitForClose

func (w *AsyncWriter) WaitForClose(ctx context.Context) error

WaitForClose blocks until the output has closed down.

type BrokerConfig

type BrokerConfig struct {
	Copies   int                `json:"copies" yaml:"copies"`
	Pattern  string             `json:"pattern" yaml:"pattern"`
	Outputs  []Config           `json:"outputs" yaml:"outputs"`
	Batching batchconfig.Config `json:"batching" yaml:"batching"`
}

BrokerConfig contains configuration fields for the Broker output type.

func NewBrokerConfig

func NewBrokerConfig() BrokerConfig

NewBrokerConfig creates a new BrokerConfig with default values.

type CacheConfig

type CacheConfig struct {
	Target      string `json:"target" yaml:"target"`
	Key         string `json:"key" yaml:"key"`
	TTL         string `json:"ttl" yaml:"ttl"`
	MaxInFlight int    `json:"max_in_flight" yaml:"max_in_flight"`
}

CacheConfig contains configuration fields for the Cache output type.

func NewCacheConfig

func NewCacheConfig() CacheConfig

NewCacheConfig creates a new CacheConfig with default values.

type CassandraConfig

type CassandraConfig struct {
	Addresses                []string              `json:"addresses" yaml:"addresses"`
	TLS                      btls.Config           `json:"tls" yaml:"tls"`
	PasswordAuthenticator    PasswordAuthenticator `json:"password_authenticator" yaml:"password_authenticator"`
	DisableInitialHostLookup bool                  `json:"disable_initial_host_lookup" yaml:"disable_initial_host_lookup"`
	Query                    string                `json:"query" yaml:"query"`
	ArgsMapping              string                `json:"args_mapping" yaml:"args_mapping"`
	Consistency              string                `json:"consistency" yaml:"consistency"`
	Timeout                  string                `json:"timeout" yaml:"timeout"`
	LoggedBatch              bool                  `json:"logged_batch" yaml:"logged_batch"`
	// TODO: V4 Remove this and replace with explicit values.
	retries.Config `json:",inline" yaml:",inline"`
	MaxInFlight    int                `json:"max_in_flight" yaml:"max_in_flight"`
	Batching       batchconfig.Config `json:"batching" yaml:"batching"`
}

CassandraConfig contains configuration fields for the Cassandra output type.

func NewCassandraConfig

func NewCassandraConfig() CassandraConfig

NewCassandraConfig creates a new CassandraConfig with default values.

type Config

type Config struct {
	Label        string             `json:"label" yaml:"label"`
	Type         string             `json:"type" yaml:"type"`
	Broker       BrokerConfig       `json:"broker" yaml:"broker"`
	Cache        CacheConfig        `json:"cache" yaml:"cache"`
	Cassandra    CassandraConfig    `json:"cassandra" yaml:"cassandra"`
	Drop         DropConfig         `json:"drop" yaml:"drop"`
	DropOn       DropOnConfig       `json:"drop_on" yaml:"drop_on"`
	Dynamic      DynamicConfig      `json:"dynamic" yaml:"dynamic"`
	Fallback     TryConfig          `json:"fallback" yaml:"fallback"`
	Inproc       string             `json:"inproc" yaml:"inproc"`
	MQTT         MQTTConfig         `json:"mqtt" yaml:"mqtt"`
	Nanomsg      NanomsgConfig      `json:"nanomsg" yaml:"nanomsg"`
	NSQ          NSQConfig          `json:"nsq" yaml:"nsq"`
	Plugin       any                `json:"plugin,omitempty" yaml:"plugin,omitempty"`
	Reject       string             `json:"reject" yaml:"reject"`
	Resource     string             `json:"resource" yaml:"resource"`
	Retry        RetryConfig        `json:"retry" yaml:"retry"`
	SFTP         SFTPConfig         `json:"sftp" yaml:"sftp"`
	STDOUT       STDOUTConfig       `json:"stdout" yaml:"stdout"`
	Subprocess   SubprocessConfig   `json:"subprocess" yaml:"subprocess"`
	Switch       SwitchConfig       `json:"switch" yaml:"switch"`
	SyncResponse struct{}           `json:"sync_response" yaml:"sync_response"`
	Socket       SocketConfig       `json:"socket" yaml:"socket"`
	Processors   []processor.Config `json:"processors" yaml:"processors"`
}

Config is the all-encompassing configuration struct for all output types.

Deprecated: Do not add new components here. Instead, use the public plugin APIs. Examples can be found in: ./internal/impl.

func NewConfig

func NewConfig() Config

NewConfig returns a configuration struct fully populated with default values.

Deprecated: Do not add new components here. Instead, use the public plugin APIs. Examples can be found in: ./internal/impl.

func (*Config) UnmarshalYAML

func (conf *Config) UnmarshalYAML(value *yaml.Node) error

UnmarshalYAML ensures that when parsing configs that are in a map or slice the default values are still applied.
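
The same defaults-preserving idea can be shown with the standard library. The sketch below swaps YAML for `encoding/json` so that it runs without third-party packages; it is not the package's code, and `conf`, `newConf` and `parse` are hypothetical names. The custom unmarshaller starts from the defaults and decodes on top of them, so elements nested in a slice keep any field the input omits.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// conf is a hypothetical, much reduced stand-in for Config.
type conf struct {
	Type   string `json:"type"`
	Copies int    `json:"copies"`
}

// newConf returns the assumed default values for this sketch.
func newConf() conf {
	return conf{Type: "stdout", Copies: 1}
}

// UnmarshalJSON decodes onto a copy of the defaults. The alias type has no
// methods, which prevents infinite recursion into this function.
func (c *conf) UnmarshalJSON(b []byte) error {
	type alias conf
	a := alias(newConf())
	if err := json.Unmarshal(b, &a); err != nil {
		return err
	}
	*c = conf(a)
	return nil
}

// parse decodes a JSON array of configs.
func parse(s string) ([]conf, error) {
	var out []conf
	err := json.Unmarshal([]byte(s), &out)
	return out, err
}

func main() {
	got, err := parse(`[{"type":"drop"},{"copies":3}]`)
	fmt.Println(got, err) // [{drop 1} {stdout 3}] <nil>
}
```

Without the custom method, the omitted fields of each slice element would be left at their zero values rather than the defaults.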

type DropConfig

type DropConfig struct{}

DropConfig contains configuration fields for the drop output type.

func NewDropConfig

func NewDropConfig() DropConfig

NewDropConfig creates a new DropConfig with default values.

type DropOnConditions

type DropOnConditions struct {
	Error        bool   `json:"error" yaml:"error"`
	BackPressure string `json:"back_pressure" yaml:"back_pressure"`
}

DropOnConditions is a config struct representing the different circumstances under which messages should be dropped.

type DropOnConfig

type DropOnConfig struct {
	DropOnConditions `json:",inline" yaml:",inline"`
	Output           *Config `json:"output" yaml:"output"`
}

DropOnConfig contains configuration values for the DropOn output type.

func NewDropOnConfig

func NewDropOnConfig() DropOnConfig

NewDropOnConfig creates a new DropOnConfig with default values.

func (DropOnConfig) MarshalJSON

func (d DropOnConfig) MarshalJSON() ([]byte, error)

MarshalJSON prints an empty object instead of nil.

func (DropOnConfig) MarshalYAML

func (d DropOnConfig) MarshalYAML() (any, error)

MarshalYAML prints an empty object instead of nil.
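
One plausible reading of "an empty object instead of nil" is that a nil nested `Output` pointer is rendered as `{}` rather than `null`. The sketch below illustrates that reading for JSON only, using the hypothetical stand-in types `child` and `wrapper`; it is an assumption about the behaviour, not a copy of the package's method.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// child and wrapper are hypothetical stand-ins for Config and DropOnConfig.
type child struct {
	Type string `json:"type,omitempty"`
}

type wrapper struct {
	Error  bool   `json:"error"`
	Output *child `json:"output"`
}

// MarshalJSON substitutes an empty child for a nil pointer so that the field
// is encoded as {} rather than null.
func (w wrapper) MarshalJSON() ([]byte, error) {
	type alias wrapper // the alias has no methods, so this does not recurse
	a := alias(w)
	if a.Output == nil {
		a.Output = &child{}
	}
	return json.Marshal(a)
}

// encode returns the JSON encoding of w as a string.
func encode(w wrapper) string {
	b, err := json.Marshal(w)
	if err != nil {
		return err.Error()
	}
	return string(b)
}

func main() {
	fmt.Println(encode(wrapper{})) // {"error":false,"output":{}}
}
```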

type DynamicConfig

type DynamicConfig struct {
	Outputs map[string]Config `json:"outputs" yaml:"outputs"`
	Prefix  string            `json:"prefix" yaml:"prefix"`
}

DynamicConfig contains configuration fields for the Dynamic output type.

func NewDynamicConfig

func NewDynamicConfig() DynamicConfig

NewDynamicConfig creates a new DynamicConfig with default values.

type MQTTConfig

type MQTTConfig struct {
	URLs                  []string      `json:"urls" yaml:"urls"`
	QoS                   uint8         `json:"qos" yaml:"qos"`
	Retained              bool          `json:"retained" yaml:"retained"`
	RetainedInterpolated  string        `json:"retained_interpolated" yaml:"retained_interpolated"`
	Topic                 string        `json:"topic" yaml:"topic"`
	ClientID              string        `json:"client_id" yaml:"client_id"`
	DynamicClientIDSuffix string        `json:"dynamic_client_id_suffix" yaml:"dynamic_client_id_suffix"`
	Will                  mqttconf.Will `json:"will" yaml:"will"`
	User                  string        `json:"user" yaml:"user"`
	Password              string        `json:"password" yaml:"password"`
	ConnectTimeout        string        `json:"connect_timeout" yaml:"connect_timeout"`
	WriteTimeout          string        `json:"write_timeout" yaml:"write_timeout"`
	KeepAlive             int64         `json:"keepalive" yaml:"keepalive"`
	MaxInFlight           int           `json:"max_in_flight" yaml:"max_in_flight"`
	TLS                   tls.Config    `json:"tls" yaml:"tls"`
}

MQTTConfig contains configuration fields for the MQTT output type.

func NewMQTTConfig

func NewMQTTConfig() MQTTConfig

NewMQTTConfig creates a new MQTTConfig with default values.

type NSQConfig

type NSQConfig struct {
	Address     string      `json:"nsqd_tcp_address" yaml:"nsqd_tcp_address"`
	Topic       string      `json:"topic" yaml:"topic"`
	UserAgent   string      `json:"user_agent" yaml:"user_agent"`
	TLS         btls.Config `json:"tls" yaml:"tls"`
	MaxInFlight int         `json:"max_in_flight" yaml:"max_in_flight"`
}

NSQConfig contains configuration fields for the NSQ output type.

func NewNSQConfig

func NewNSQConfig() NSQConfig

NewNSQConfig creates a new NSQConfig with default values.

type NanomsgConfig

type NanomsgConfig struct {
	URLs        []string `json:"urls" yaml:"urls"`
	Bind        bool     `json:"bind" yaml:"bind"`
	SocketType  string   `json:"socket_type" yaml:"socket_type"`
	PollTimeout string   `json:"poll_timeout" yaml:"poll_timeout"`
	MaxInFlight int      `json:"max_in_flight" yaml:"max_in_flight"`
}

NanomsgConfig contains configuration fields for the Nanomsg output type.

func NewNanomsgConfig

func NewNanomsgConfig() NanomsgConfig

NewNanomsgConfig creates a new NanomsgConfig with default values.

type PasswordAuthenticator

type PasswordAuthenticator struct {
	Enabled  bool   `json:"enabled" yaml:"enabled"`
	Username string `json:"username" yaml:"username"`
	Password string `json:"password" yaml:"password"`
}

PasswordAuthenticator contains the fields that will be used to authenticate with the Cassandra cluster.

type RetryConfig

type RetryConfig struct {
	Output         *Config `json:"output" yaml:"output"`
	retries.Config `json:",inline" yaml:",inline"`
}

RetryConfig contains configuration values for the Retry output type.

func NewRetryConfig

func NewRetryConfig() RetryConfig

NewRetryConfig creates a new RetryConfig with default values.

func (RetryConfig) MarshalJSON

func (r RetryConfig) MarshalJSON() ([]byte, error)

MarshalJSON prints an empty object instead of nil.

func (RetryConfig) MarshalYAML

func (r RetryConfig) MarshalYAML() (any, error)

MarshalYAML prints an empty object instead of nil.

type SFTPConfig

type SFTPConfig struct {
	Address     string                `json:"address" yaml:"address"`
	Path        string                `json:"path" yaml:"path"`
	Codec       string                `json:"codec" yaml:"codec"`
	Credentials sftpSetup.Credentials `json:"credentials" yaml:"credentials"`
	MaxInFlight int                   `json:"max_in_flight" yaml:"max_in_flight"`
}

SFTPConfig contains configuration fields for the SFTP output type.

func NewSFTPConfig

func NewSFTPConfig() SFTPConfig

NewSFTPConfig creates a new SFTPConfig with default values.

type STDOUTConfig

type STDOUTConfig struct {
	Codec string `json:"codec" yaml:"codec"`
}

STDOUTConfig contains configuration fields for the stdout based output type.

func NewSTDOUTConfig

func NewSTDOUTConfig() STDOUTConfig

NewSTDOUTConfig creates a new STDOUTConfig with default values.

type SocketConfig

type SocketConfig struct {
	Network string `json:"network" yaml:"network"`
	Address string `json:"address" yaml:"address"`
	Codec   string `json:"codec" yaml:"codec"`
}

SocketConfig contains configuration fields for the Socket output type.

func NewSocketConfig

func NewSocketConfig() SocketConfig

NewSocketConfig creates a new SocketConfig with default values.

type Streamed

type Streamed interface {
	// Consume starts the type receiving transactions from a Transactor.
	Consume(<-chan message.Transaction) error

	// Connected returns a boolean indicating whether this output is currently
	// connected to its target.
	Connected() bool

	// TriggerCloseNow triggers the shut down of this component but should not
	// block the calling goroutine.
	TriggerCloseNow()

	// WaitForClose is a blocking call to wait until the component has finished
	// shutting down and cleaning up resources.
	WaitForClose(ctx context.Context) error
}

Streamed is a common interface implemented by outputs that provides channel-based streaming APIs.
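
The lifecycle implied by these four methods (start consuming, stop on request or when the channel closes, then wait for shutdown) can be sketched as below. `Transaction` here is a hypothetical stand-in for `message.Transaction`, reduced to a payload and an acknowledgement callback, and `collector` and `run` are invented for the example.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Transaction is a hypothetical stand-in for message.Transaction.
type Transaction struct {
	Payload string
	Ack     func(error)
}

// collector records payloads and acknowledges each transaction.
type collector struct {
	closeOnce sync.Once
	closeNow  chan struct{}
	done      chan struct{}
	seen      []string // written by the consume loop, read after done closes
}

func newCollector() *collector {
	return &collector{closeNow: make(chan struct{}), done: make(chan struct{})}
}

// Consume starts a goroutine that reads until ts is closed or a close is
// triggered, then signals completion by closing done.
func (c *collector) Consume(ts <-chan Transaction) error {
	go func() {
		defer close(c.done)
		for {
			select {
			case t, open := <-ts:
				if !open {
					return
				}
				c.seen = append(c.seen, t.Payload)
				t.Ack(nil)
			case <-c.closeNow:
				return
			}
		}
	}()
	return nil
}

// Connected always reports true because this sketch has no real target.
func (c *collector) Connected() bool { return true }

// TriggerCloseNow requests shutdown without blocking; it is safe to call twice.
func (c *collector) TriggerCloseNow() {
	c.closeOnce.Do(func() { close(c.closeNow) })
}

// WaitForClose blocks until the consume loop exits or ctx is cancelled.
func (c *collector) WaitForClose(ctx context.Context) error {
	select {
	case <-c.done:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// run feeds two transactions through a collector and waits for it to close.
func run() ([]string, error) {
	c := newCollector()
	ts := make(chan Transaction)
	if err := c.Consume(ts); err != nil {
		return nil, err
	}
	for _, s := range []string{"a", "b"} {
		acked := make(chan error, 1)
		ts <- Transaction{Payload: s, Ack: func(err error) { acked <- err }}
		if err := <-acked; err != nil {
			return nil, err
		}
	}
	close(ts)
	err := c.WaitForClose(context.Background())
	return c.seen, err
}

func main() {
	seen, err := run()
	fmt.Println(seen, err) // [a b] <nil>
}
```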

func NewAsyncWriter

func NewAsyncWriter(typeStr string, maxInflight int, w AsyncSink, mgr component.Observability) (Streamed, error)

NewAsyncWriter creates a Streamed implementation around an AsyncSink.

func OnlySinglePayloads

func OnlySinglePayloads(out Streamed) Streamed

OnlySinglePayloads expands message batches into individual payloads, respecting the max in flight of the wrapped output. This is a more efficient way of feeding messages into an output that handles its own batching mechanism internally, or does not support batching at all.
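
The batch expansion itself can be pictured with the following sketch. It only shows the splitting step: `Batch`, `splitBatches` and `collect` are hypothetical, and the sketch deliberately omits acknowledgement handling and the max-in-flight limit that the real wrapper respects.

```go
package main

import "fmt"

// Batch is a hypothetical stand-in for message.Batch.
type Batch []string

// splitBatches reads batches from in and emits every payload as its own
// single-element batch, preserving order.
func splitBatches(in <-chan Batch) <-chan Batch {
	out := make(chan Batch)
	go func() {
		defer close(out)
		for b := range in {
			for _, p := range b {
				out <- Batch{p}
			}
		}
	}()
	return out
}

// collect pushes the given batches through splitBatches and gathers the result.
func collect(batches ...Batch) []Batch {
	in := make(chan Batch, len(batches))
	for _, b := range batches {
		in <- b
	}
	close(in)
	var got []Batch
	for b := range splitBatches(in) {
		got = append(got, b)
	}
	return got
}

func main() {
	fmt.Println(collect(Batch{"a", "b"}, Batch{"c"})) // [[a] [b] [c]]
}
```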

func WrapWithPipelines

func WrapWithPipelines(out Streamed, pipeConstructors ...iprocessor.PipelineConstructorFunc) (Streamed, error)

WrapWithPipelines wraps an output with a variadic number of pipelines.

type SubprocessConfig

type SubprocessConfig struct {
	Name  string   `json:"name" yaml:"name"`
	Args  []string `json:"args" yaml:"args"`
	Codec string   `json:"codec" yaml:"codec"`
}

SubprocessConfig contains configuration for the Subprocess output type.

func NewSubprocessConfig

func NewSubprocessConfig() SubprocessConfig

NewSubprocessConfig creates a new SubprocessConfig with default values.

type SwitchConfig

type SwitchConfig struct {
	RetryUntilSuccess bool               `json:"retry_until_success" yaml:"retry_until_success"`
	StrictMode        bool               `json:"strict_mode" yaml:"strict_mode"`
	Cases             []SwitchConfigCase `json:"cases" yaml:"cases"`
}

SwitchConfig contains configuration fields for the Switch output type.

func NewSwitchConfig

func NewSwitchConfig() SwitchConfig

NewSwitchConfig creates a new SwitchConfig with default values.

type SwitchConfigCase

type SwitchConfigCase struct {
	Check    string `json:"check" yaml:"check"`
	Continue bool   `json:"continue" yaml:"continue"`
	Output   Config `json:"output" yaml:"output"`
}

SwitchConfigCase contains configuration fields per output of a switch type.

func NewSwitchConfigCase

func NewSwitchConfigCase() SwitchConfigCase

NewSwitchConfigCase creates a new switch output config with default values.

type Sync

type Sync interface {
	// WriteTransaction attempts to write a transaction to an output.
	WriteTransaction(context.Context, message.Transaction) error

	// Connected returns a boolean indicating whether this output is currently
	// connected to its target.
	Connected() bool

	// TriggerStopConsuming instructs the output to start shutting down
	// resources once all pending messages are delivered and acknowledged.
	TriggerStopConsuming()

	// TriggerCloseNow triggers the shut down of this component but should not
	// block the calling goroutine.
	TriggerCloseNow()

	// WaitForClose is a blocking call to wait until the component has finished
	// shutting down and cleaning up resources.
	WaitForClose(ctx context.Context) error
}

Sync is a common interface implemented by outputs that provides synchronous writing APIs.

type TryConfig

type TryConfig []Config

TryConfig contains configuration fields for the Try output type.

func NewTryConfig

func NewTryConfig() TryConfig

NewTryConfig creates a new TryConfig with default values.

type WithPipeline

type WithPipeline struct {
	// contains filtered or unexported fields
}

WithPipeline is a type that wraps both an output type and a pipeline type by routing the pipeline through the output, and implements the Streamed interface in order to act like an ordinary output.

func WrapWithPipeline

func WrapWithPipeline(out Streamed, pipeConstructor iprocessor.PipelineConstructorFunc) (*WithPipeline, error)

WrapWithPipeline routes a processing pipeline directly into an output and returns a type that manages both and acts like an ordinary output.

func (*WithPipeline) Connected

func (i *WithPipeline) Connected() bool

Connected returns a boolean indicating whether this output is currently connected to its target.

func (*WithPipeline) Consume

func (i *WithPipeline) Consume(tsChan <-chan message.Transaction) error

Consume starts the type listening to a message channel from a producer.

func (*WithPipeline) TriggerCloseNow

func (i *WithPipeline) TriggerCloseNow()

TriggerCloseNow triggers a closure of this object but does not block.

func (*WithPipeline) WaitForClose

func (i *WithPipeline) WaitForClose(ctx context.Context) error

WaitForClose is a blocking call to wait until the object has finished closing down and cleaning up resources.
