output

package
v4.14.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 26, 2023 License: MIT Imports: 32 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// GCPCloudStorageErrorIfExistsCollisionMode - error-if-exists.
	GCPCloudStorageErrorIfExistsCollisionMode = "error-if-exists"

	// GCPCloudStorageAppendCollisionMode - append.
	GCPCloudStorageAppendCollisionMode = "append"

	// GCPCloudStorageIgnoreCollisionMode - ignore.
	GCPCloudStorageIgnoreCollisionMode = "ignore"

	// GCPCloudStorageOverwriteCollisionMode - overwrite.
	GCPCloudStorageOverwriteCollisionMode = "overwrite"
)

Variables

View Source
var InjectTracingSpanMappingDocs = docs.FieldBloblang(
	"inject_tracing_map",
	"EXPERIMENTAL: A [Bloblang mapping](/docs/guides/bloblang/about) used to inject an object containing tracing propagation information into outbound messages. The specification of the injected fields will match the format used by the service wide tracer.",
	`meta = meta().merge(this)`,
	`root.meta.span = this`,
).AtVersion("3.45.0").Advanced()

InjectTracingSpanMappingDocs returns a field spec describing an inject tracing span mapping.

Functions

func Description

func Description(async, batches bool, content string) string

Description appends standard feature descriptions to an output description based on various features of the output.

func IterateBatchedSend

func IterateBatchedSend(msg message.Batch, fn func(int, *message.Part) error) error

IterateBatchedSend executes a closure fn on each message of a batch, where the closure is expected to attempt a send and return an error. If an error is returned then it is added to a batch error in order to support index specific error handling.

However, if a fatal error is returned such as a connection loss or shut down then it is returned immediately.

Types

type AMQP1Config

type AMQP1Config struct {
	URL                          string                       `json:"url" yaml:"url"`
	TargetAddress                string                       `json:"target_address" yaml:"target_address"`
	MaxInFlight                  int                          `json:"max_in_flight" yaml:"max_in_flight"`
	TLS                          btls.Config                  `json:"tls" yaml:"tls"`
	SASL                         shared.SASLConfig            `json:"sasl" yaml:"sasl"`
	Metadata                     metadata.ExcludeFilterConfig `json:"metadata" yaml:"metadata"`
	ApplicationPropertiesMapping string                       `json:"application_properties_map" yaml:"application_properties_map"`
}

AMQP1Config contains configuration fields for the AMQP1 output type.

func NewAMQP1Config

func NewAMQP1Config() AMQP1Config

NewAMQP1Config creates a new AMQP1Config with default values.

type AMQPConfig

type AMQPConfig struct {
	URLs            []string                     `json:"urls" yaml:"urls"`
	MaxInFlight     int                          `json:"max_in_flight" yaml:"max_in_flight"`
	Exchange        string                       `json:"exchange" yaml:"exchange"`
	ExchangeDeclare AMQPExchangeDeclareConfig    `json:"exchange_declare" yaml:"exchange_declare"`
	BindingKey      string                       `json:"key" yaml:"key"`
	Type            string                       `json:"type" yaml:"type"`
	ContentType     string                       `json:"content_type" yaml:"content_type"`
	ContentEncoding string                       `json:"content_encoding" yaml:"content_encoding"`
	Metadata        metadata.ExcludeFilterConfig `json:"metadata" yaml:"metadata"`
	Priority        string                       `json:"priority" yaml:"priority"`
	Persistent      bool                         `json:"persistent" yaml:"persistent"`
	Mandatory       bool                         `json:"mandatory" yaml:"mandatory"`
	Immediate       bool                         `json:"immediate" yaml:"immediate"`
	TLS             btls.Config                  `json:"tls" yaml:"tls"`
	Timeout         string                       `json:"timeout" yaml:"timeout"`
}

AMQPConfig contains configuration fields for the AMQP output type.

func NewAMQPConfig

func NewAMQPConfig() AMQPConfig

NewAMQPConfig creates a new AMQPConfig with default values.

type AMQPExchangeDeclareConfig

type AMQPExchangeDeclareConfig struct {
	Enabled bool   `json:"enabled" yaml:"enabled"`
	Type    string `json:"type" yaml:"type"`
	Durable bool   `json:"durable" yaml:"durable"`
}

AMQPExchangeDeclareConfig contains fields indicating whether the target AMQP exchange needs to be declared, as well as any fields specifying how to accomplish that.

type AmazonS3Config

type AmazonS3Config struct {
	sess.Config             `json:",inline" yaml:",inline"`
	Bucket                  string                       `json:"bucket" yaml:"bucket"`
	ForcePathStyleURLs      bool                         `json:"force_path_style_urls" yaml:"force_path_style_urls"`
	Path                    string                       `json:"path" yaml:"path"`
	Tags                    map[string]string            `json:"tags" yaml:"tags"`
	ContentType             string                       `json:"content_type" yaml:"content_type"`
	ContentEncoding         string                       `json:"content_encoding" yaml:"content_encoding"`
	CacheControl            string                       `json:"cache_control" yaml:"cache_control"`
	ContentDisposition      string                       `json:"content_disposition" yaml:"content_disposition"`
	ContentLanguage         string                       `json:"content_language" yaml:"content_language"`
	WebsiteRedirectLocation string                       `json:"website_redirect_location" yaml:"website_redirect_location"`
	Metadata                metadata.ExcludeFilterConfig `json:"metadata" yaml:"metadata"`
	StorageClass            string                       `json:"storage_class" yaml:"storage_class"`
	Timeout                 string                       `json:"timeout" yaml:"timeout"`
	KMSKeyID                string                       `json:"kms_key_id" yaml:"kms_key_id"`
	ServerSideEncryption    string                       `json:"server_side_encryption" yaml:"server_side_encryption"`
	MaxInFlight             int                          `json:"max_in_flight" yaml:"max_in_flight"`
	Batching                batchconfig.Config           `json:"batching" yaml:"batching"`
}

AmazonS3Config contains configuration fields for the AmazonS3 output type.

func NewAmazonS3Config

func NewAmazonS3Config() AmazonS3Config

NewAmazonS3Config creates a new Config with default values.

type AmazonSQSConfig

type AmazonSQSConfig struct {
	SessionConfig          `json:",inline" yaml:",inline"`
	URL                    string                       `json:"url" yaml:"url"`
	MessageGroupID         string                       `json:"message_group_id" yaml:"message_group_id"`
	MessageDeduplicationID string                       `json:"message_deduplication_id" yaml:"message_deduplication_id"`
	Metadata               metadata.ExcludeFilterConfig `json:"metadata" yaml:"metadata"`
	MaxInFlight            int                          `json:"max_in_flight" yaml:"max_in_flight"`
	retries.Config         `json:",inline" yaml:",inline"`
	Batching               batchconfig.Config `json:"batching" yaml:"batching"`
}

AmazonSQSConfig contains configuration fields for the output AmazonSQS type.

func NewAmazonSQSConfig

func NewAmazonSQSConfig() AmazonSQSConfig

NewAmazonSQSConfig creates a new Config with default values.

type AsyncSink

type AsyncSink interface {
	// Connect attempts to establish a connection to the sink, if
	// unsuccessful returns an error. If the attempt is successful (or not
	// necessary) returns nil.
	Connect(ctx context.Context) error

	// WriteBatch should block until either the message is sent (and
	// acknowledged) to a sink, or a transport specific error has occurred, or
	// the Type is closed.
	WriteBatch(ctx context.Context, msg message.Batch) error

	// Close is a blocking call to wait until the component has finished
	// shutting down and cleaning up resources.
	Close(ctx context.Context) error
}

AsyncSink is a type that writes Benthos messages to a third party sink. If the protocol supports a form of acknowledgement then it will be returned by the call to Write.

type AsyncWriter

type AsyncWriter struct {
	// contains filtered or unexported fields
}

AsyncWriter is an output type that writes messages to a writer.Type.

func (*AsyncWriter) Connected

func (w *AsyncWriter) Connected() bool

Connected returns a boolean indicating whether this output is currently connected to its target.

func (*AsyncWriter) Consume

func (w *AsyncWriter) Consume(ts <-chan message.Transaction) error

Consume assigns a messages channel for the output to read.

func (*AsyncWriter) SetInjectTracingMap

func (w *AsyncWriter) SetInjectTracingMap(exec *mapping.Executor)

SetInjectTracingMap sets a mapping to be used for injecting tracing events into messages.

func (*AsyncWriter) TriggerCloseNow

func (w *AsyncWriter) TriggerCloseNow()

TriggerCloseNow shuts down the output and stops processing messages.

func (*AsyncWriter) WaitForClose

func (w *AsyncWriter) WaitForClose(ctx context.Context) error

WaitForClose blocks until the File output has closed down.

type AzureBlobStorageConfig

type AzureBlobStorageConfig struct {
	StorageAccount          string `json:"storage_account" yaml:"storage_account"`
	StorageAccessKey        string `json:"storage_access_key" yaml:"storage_access_key"`
	StorageSASToken         string `json:"storage_sas_token" yaml:"storage_sas_token"`
	StorageConnectionString string `json:"storage_connection_string" yaml:"storage_connection_string"`
	Container               string `json:"container" yaml:"container"`
	Path                    string `json:"path" yaml:"path"`
	BlobType                string `json:"blob_type" yaml:"blob_type"`
	PublicAccessLevel       string `json:"public_access_level" yaml:"public_access_level"`
	MaxInFlight             int    `json:"max_in_flight" yaml:"max_in_flight"`
}

AzureBlobStorageConfig contains configuration fields for the AzureBlobStorage output type.

func NewAzureBlobStorageConfig

func NewAzureBlobStorageConfig() AzureBlobStorageConfig

NewAzureBlobStorageConfig creates a new Config with default values.

type AzureQueueStorageConfig

type AzureQueueStorageConfig struct {
	StorageAccount          string             `json:"storage_account" yaml:"storage_account"`
	StorageAccessKey        string             `json:"storage_access_key" yaml:"storage_access_key"`
	StorageConnectionString string             `json:"storage_connection_string" yaml:"storage_connection_string"`
	QueueName               string             `json:"queue_name" yaml:"queue_name"`
	TTL                     string             `json:"ttl" yaml:"ttl"`
	MaxInFlight             int                `json:"max_in_flight" yaml:"max_in_flight"`
	Batching                batchconfig.Config `json:"batching" yaml:"batching"`
}

AzureQueueStorageConfig contains configuration fields for the output Azure Queue Storage type.

func NewAzureQueueStorageConfig

func NewAzureQueueStorageConfig() AzureQueueStorageConfig

NewAzureQueueStorageConfig creates a new Config with default values.

type AzureTableStorageConfig

type AzureTableStorageConfig struct {
	StorageAccount          string             `json:"storage_account" yaml:"storage_account"`
	StorageAccessKey        string             `json:"storage_access_key" yaml:"storage_access_key"`
	StorageConnectionString string             `json:"storage_connection_string" yaml:"storage_connection_string"`
	TableName               string             `json:"table_name" yaml:"table_name"`
	PartitionKey            string             `json:"partition_key" yaml:"partition_key"`
	RowKey                  string             `json:"row_key" yaml:"row_key"`
	Properties              map[string]string  `json:"properties" yaml:"properties"`
	InsertType              string             `json:"insert_type" yaml:"insert_type"`
	TransactionType         string             `json:"transaction_type" yaml:"transaction_type"`
	Timeout                 string             `json:"timeout" yaml:"timeout"`
	MaxInFlight             int                `json:"max_in_flight" yaml:"max_in_flight"`
	Batching                batchconfig.Config `json:"batching" yaml:"batching"`
}

AzureTableStorageConfig contains configuration fields for the AzureTableStorage output type.

func NewAzureTableStorageConfig

func NewAzureTableStorageConfig() AzureTableStorageConfig

NewAzureTableStorageConfig creates a new Config with default values.

type BrokerConfig

type BrokerConfig struct {
	Copies   int                `json:"copies" yaml:"copies"`
	Pattern  string             `json:"pattern" yaml:"pattern"`
	Outputs  []Config           `json:"outputs" yaml:"outputs"`
	Batching batchconfig.Config `json:"batching" yaml:"batching"`
}

BrokerConfig contains configuration fields for the Broker output type.

func NewBrokerConfig

func NewBrokerConfig() BrokerConfig

NewBrokerConfig creates a new BrokerConfig with default values.

type CacheConfig

type CacheConfig struct {
	Target      string `json:"target" yaml:"target"`
	Key         string `json:"key" yaml:"key"`
	TTL         string `json:"ttl" yaml:"ttl"`
	MaxInFlight int    `json:"max_in_flight" yaml:"max_in_flight"`
}

CacheConfig contains configuration fields for the Cache output type.

func NewCacheConfig

func NewCacheConfig() CacheConfig

NewCacheConfig creates a new Config with default values.

type CassandraConfig

type CassandraConfig struct {
	Addresses                []string              `json:"addresses" yaml:"addresses"`
	TLS                      btls.Config           `json:"tls" yaml:"tls"`
	PasswordAuthenticator    PasswordAuthenticator `json:"password_authenticator" yaml:"password_authenticator"`
	DisableInitialHostLookup bool                  `json:"disable_initial_host_lookup" yaml:"disable_initial_host_lookup"`
	Query                    string                `json:"query" yaml:"query"`
	ArgsMapping              string                `json:"args_mapping" yaml:"args_mapping"`
	Consistency              string                `json:"consistency" yaml:"consistency"`
	Timeout                  string                `json:"timeout" yaml:"timeout"`
	LoggedBatch              bool                  `json:"logged_batch" yaml:"logged_batch"`
	// TODO: V4 Remove this and replace with explicit values.
	retries.Config `json:",inline" yaml:",inline"`
	MaxInFlight    int                `json:"max_in_flight" yaml:"max_in_flight"`
	Batching       batchconfig.Config `json:"batching" yaml:"batching"`
}

CassandraConfig contains configuration fields for the Cassandra output type.

func NewCassandraConfig

func NewCassandraConfig() CassandraConfig

NewCassandraConfig creates a new CassandraConfig with default values.

type Config

type Config struct {
	Label              string                  `json:"label" yaml:"label"`
	Type               string                  `json:"type" yaml:"type"`
	AMQP09             AMQPConfig              `json:"amqp_0_9" yaml:"amqp_0_9"`
	AMQP1              AMQP1Config             `json:"amqp_1" yaml:"amqp_1"`
	AWSDynamoDB        DynamoDBConfig          `json:"aws_dynamodb" yaml:"aws_dynamodb"`
	AWSKinesis         KinesisConfig           `json:"aws_kinesis" yaml:"aws_kinesis"`
	AWSKinesisFirehose KinesisFirehoseConfig   `json:"aws_kinesis_firehose" yaml:"aws_kinesis_firehose"`
	AWSS3              AmazonS3Config          `json:"aws_s3" yaml:"aws_s3"`
	AWSSNS             SNSConfig               `json:"aws_sns" yaml:"aws_sns"`
	AWSSQS             AmazonSQSConfig         `json:"aws_sqs" yaml:"aws_sqs"`
	AzureBlobStorage   AzureBlobStorageConfig  `json:"azure_blob_storage" yaml:"azure_blob_storage"`
	AzureQueueStorage  AzureQueueStorageConfig `json:"azure_queue_storage" yaml:"azure_queue_storage"`
	AzureTableStorage  AzureTableStorageConfig `json:"azure_table_storage" yaml:"azure_table_storage"`
	Broker             BrokerConfig            `json:"broker" yaml:"broker"`
	Cache              CacheConfig             `json:"cache" yaml:"cache"`
	Cassandra          CassandraConfig         `json:"cassandra" yaml:"cassandra"`
	Drop               DropConfig              `json:"drop" yaml:"drop"`
	DropOn             DropOnConfig            `json:"drop_on" yaml:"drop_on"`
	Dynamic            DynamicConfig           `json:"dynamic" yaml:"dynamic"`
	Elasticsearch      ElasticsearchConfig     `json:"elasticsearch" yaml:"elasticsearch"`
	Fallback           TryConfig               `json:"fallback" yaml:"fallback"`
	File               FileConfig              `json:"file" yaml:"file"`
	GCPCloudStorage    GCPCloudStorageConfig   `json:"gcp_cloud_storage" yaml:"gcp_cloud_storage"`
	GCPPubSub          GCPPubSubConfig         `json:"gcp_pubsub" yaml:"gcp_pubsub"`
	HDFS               HDFSConfig              `json:"hdfs" yaml:"hdfs"`
	HTTPServer         HTTPServerConfig        `json:"http_server" yaml:"http_server"`
	Inproc             string                  `json:"inproc" yaml:"inproc"`
	Kafka              KafkaConfig             `json:"kafka" yaml:"kafka"`
	MongoDB            MongoDBConfig           `json:"mongodb" yaml:"mongodb"`
	MQTT               MQTTConfig              `json:"mqtt" yaml:"mqtt"`
	Nanomsg            NanomsgConfig           `json:"nanomsg" yaml:"nanomsg"`
	NATS               NATSConfig              `json:"nats" yaml:"nats"`
	NATSStream         NATSStreamConfig        `json:"nats_stream" yaml:"nats_stream"`
	NSQ                NSQConfig               `json:"nsq" yaml:"nsq"`
	Plugin             any                     `json:"plugin,omitempty" yaml:"plugin,omitempty"`
	RedisHash          RedisHashConfig         `json:"redis_hash" yaml:"redis_hash"`
	RedisList          RedisListConfig         `json:"redis_list" yaml:"redis_list"`
	RedisPubSub        RedisPubSubConfig       `json:"redis_pubsub" yaml:"redis_pubsub"`
	RedisStreams       RedisStreamsConfig      `json:"redis_streams" yaml:"redis_streams"`
	Reject             string                  `json:"reject" yaml:"reject"`
	Resource           string                  `json:"resource" yaml:"resource"`
	Retry              RetryConfig             `json:"retry" yaml:"retry"`
	SFTP               SFTPConfig              `json:"sftp" yaml:"sftp"`
	STDOUT             STDOUTConfig            `json:"stdout" yaml:"stdout"`
	Subprocess         SubprocessConfig        `json:"subprocess" yaml:"subprocess"`
	Switch             SwitchConfig            `json:"switch" yaml:"switch"`
	SyncResponse       struct{}                `json:"sync_response" yaml:"sync_response"`
	Socket             SocketConfig            `json:"socket" yaml:"socket"`
	Processors         []processor.Config      `json:"processors" yaml:"processors"`
}

Config is the all encompassing configuration struct for all output types. Deprecated: Do not add new components here. Instead, use the public plugin APIs. Examples can be found in: ./internal/impl.

func NewConfig

func NewConfig() Config

NewConfig returns a configuration struct fully populated with default values. Deprecated: Do not add new components here. Instead, use the public plugin APIs. Examples can be found in: ./internal/impl.

func (*Config) UnmarshalYAML

func (conf *Config) UnmarshalYAML(value *yaml.Node) error

UnmarshalYAML ensures that when parsing configs that are in a map or slice the default values are still applied.

type DropConfig

type DropConfig struct{}

DropConfig contains configuration fields for the drop output type.

func NewDropConfig

func NewDropConfig() DropConfig

NewDropConfig creates a new DropConfig with default values.

type DropOnConditions

type DropOnConditions struct {
	Error        bool   `json:"error" yaml:"error"`
	BackPressure string `json:"back_pressure" yaml:"back_pressure"`
}

DropOnConditions is a config struct representing the different circumstances under which messages should be dropped.

type DropOnConfig

type DropOnConfig struct {
	DropOnConditions `json:",inline" yaml:",inline"`
	Output           *Config `json:"output" yaml:"output"`
}

DropOnConfig contains configuration values for the DropOn output type.

func NewDropOnConfig

func NewDropOnConfig() DropOnConfig

NewDropOnConfig creates a new DropOnConfig with default values.

func (DropOnConfig) MarshalJSON

func (d DropOnConfig) MarshalJSON() ([]byte, error)

MarshalJSON prints an empty object instead of nil.

func (DropOnConfig) MarshalYAML

func (d DropOnConfig) MarshalYAML() (any, error)

MarshalYAML prints an empty object instead of nil.

type DynamicConfig

type DynamicConfig struct {
	Outputs map[string]Config `json:"outputs" yaml:"outputs"`
	Prefix  string            `json:"prefix" yaml:"prefix"`
}

DynamicConfig contains configuration fields for the Dynamic output type.

func NewDynamicConfig

func NewDynamicConfig() DynamicConfig

NewDynamicConfig creates a new DynamicConfig with default values.

type DynamoDBConfig

type DynamoDBConfig struct {
	SessionConfig  `json:",inline" yaml:",inline"`
	Table          string            `json:"table" yaml:"table"`
	StringColumns  map[string]string `json:"string_columns" yaml:"string_columns"`
	JSONMapColumns map[string]string `json:"json_map_columns" yaml:"json_map_columns"`
	TTL            string            `json:"ttl" yaml:"ttl"`
	TTLKey         string            `json:"ttl_key" yaml:"ttl_key"`
	MaxInFlight    int               `json:"max_in_flight" yaml:"max_in_flight"`
	retries.Config `json:",inline" yaml:",inline"`
	Batching       batchconfig.Config `json:"batching" yaml:"batching"`
}

DynamoDBConfig contains config fields for the DynamoDB output type.

func NewDynamoDBConfig

func NewDynamoDBConfig() DynamoDBConfig

NewDynamoDBConfig creates a DynamoDBConfig populated with default values.

type ElasticsearchAuthConfig

type ElasticsearchAuthConfig struct {
	Enabled  bool   `json:"enabled" yaml:"enabled"`
	Username string `json:"username" yaml:"username"`
	Password string `json:"password" yaml:"password"`
}

ElasticsearchAuthConfig contains basic authentication fields.

type ElasticsearchConfig

type ElasticsearchConfig struct {
	URLs            []string                `json:"urls" yaml:"urls"`
	Sniff           bool                    `json:"sniff" yaml:"sniff"`
	Healthcheck     bool                    `json:"healthcheck" yaml:"healthcheck"`
	ID              string                  `json:"id" yaml:"id"`
	Action          string                  `json:"action" yaml:"action"`
	Index           string                  `json:"index" yaml:"index"`
	Pipeline        string                  `json:"pipeline" yaml:"pipeline"`
	Routing         string                  `json:"routing" yaml:"routing"`
	Type            string                  `json:"type" yaml:"type"`
	Timeout         string                  `json:"timeout" yaml:"timeout"`
	TLS             btls.Config             `json:"tls" yaml:"tls"`
	Auth            ElasticsearchAuthConfig `json:"basic_auth" yaml:"basic_auth"`
	AWS             OptionalAWSConfig       `json:"aws" yaml:"aws"`
	GzipCompression bool                    `json:"gzip_compression" yaml:"gzip_compression"`
	MaxInFlight     int                     `json:"max_in_flight" yaml:"max_in_flight"`
	retries.Config  `json:",inline" yaml:",inline"`
	Batching        batchconfig.Config `json:"batching" yaml:"batching"`
}

ElasticsearchConfig contains configuration fields for the Elasticsearch output type.

func NewElasticsearchConfig

func NewElasticsearchConfig() ElasticsearchConfig

NewElasticsearchConfig creates a new ElasticsearchConfig with default values.

type FileConfig

type FileConfig struct {
	Path  string `json:"path" yaml:"path"`
	Codec string `json:"codec" yaml:"codec"`
}

FileConfig contains configuration fields for the file based output type.

func NewFileConfig

func NewFileConfig() FileConfig

NewFileConfig creates a new FileConfig with default values.

type GCPCloudStorageConfig

type GCPCloudStorageConfig struct {
	Bucket          string             `json:"bucket" yaml:"bucket"`
	Path            string             `json:"path" yaml:"path"`
	ContentType     string             `json:"content_type" yaml:"content_type"`
	ContentEncoding string             `json:"content_encoding" yaml:"content_encoding"`
	ChunkSize       int                `json:"chunk_size" yaml:"chunk_size"`
	MaxInFlight     int                `json:"max_in_flight" yaml:"max_in_flight"`
	Batching        batchconfig.Config `json:"batching" yaml:"batching"`
	CollisionMode   string             `json:"collision_mode" yaml:"collision_mode"`
}

GCPCloudStorageConfig contains configuration fields for the GCP Cloud Storage output type.

func NewGCPCloudStorageConfig

func NewGCPCloudStorageConfig() GCPCloudStorageConfig

NewGCPCloudStorageConfig creates a new Config with default values.

type GCPPubSubConfig

type GCPPubSubConfig struct {
	ProjectID      string                       `json:"project" yaml:"project"`
	TopicID        string                       `json:"topic" yaml:"topic"`
	MaxInFlight    int                          `json:"max_in_flight" yaml:"max_in_flight"`
	PublishTimeout string                       `json:"publish_timeout" yaml:"publish_timeout"`
	Metadata       metadata.ExcludeFilterConfig `json:"metadata" yaml:"metadata"`
	OrderingKey    string                       `json:"ordering_key" yaml:"ordering_key"`
	Endpoint       string                       `json:"endpoint" yaml:"endpoint"`
	FlowControl    GCPPubSubFlowControlConfig   `json:"flow_control" yaml:"flow_control"`
}

GCPPubSubConfig contains configuration fields for the output GCPPubSub type.

func NewGCPPubSubConfig

func NewGCPPubSubConfig() GCPPubSubConfig

NewGCPPubSubConfig creates a new Config with default values.

type GCPPubSubFlowControlConfig

type GCPPubSubFlowControlConfig struct {
	MaxOutstandingMessages int    `json:"max_outstanding_messages" yaml:"max_outstanding_messages"`
	MaxOutstandingBytes    int    `json:"max_outstanding_bytes" yaml:"max_outstanding_bytes"`
	LimitExceededBehavior  string `json:"limit_exceeded_behavior" yaml:"limit_exceeded_behavior"`
}

GCPPubSubFlowControlConfig configures a flow control policy of the PubSub client. This affects the internal buffering mechanism that the client manages for each topic when publishing messages.

func NewGCPPubSubFlowControlConfig

func NewGCPPubSubFlowControlConfig() GCPPubSubFlowControlConfig

NewGCPPubSubFlowControlConfig creates a new flow control policy with default values.

type HDFSConfig

type HDFSConfig struct {
	Hosts       []string           `json:"hosts" yaml:"hosts"`
	User        string             `json:"user" yaml:"user"`
	Directory   string             `json:"directory" yaml:"directory"`
	Path        string             `json:"path" yaml:"path"`
	MaxInFlight int                `json:"max_in_flight" yaml:"max_in_flight"`
	Batching    batchconfig.Config `json:"batching" yaml:"batching"`
}

HDFSConfig contains configuration fields for the HDFS output type.

func NewHDFSConfig

func NewHDFSConfig() HDFSConfig

NewHDFSConfig creates a new Config with default values.

type HTTPServerConfig

type HTTPServerConfig struct {
	Address      string                `json:"address" yaml:"address"`
	Path         string                `json:"path" yaml:"path"`
	StreamPath   string                `json:"stream_path" yaml:"stream_path"`
	WSPath       string                `json:"ws_path" yaml:"ws_path"`
	AllowedVerbs []string              `json:"allowed_verbs" yaml:"allowed_verbs"`
	Timeout      string                `json:"timeout" yaml:"timeout"`
	CertFile     string                `json:"cert_file" yaml:"cert_file"`
	KeyFile      string                `json:"key_file" yaml:"key_file"`
	CORS         httpserver.CORSConfig `json:"cors" yaml:"cors"`
}

HTTPServerConfig contains configuration fields for the HTTPServer output type.

func NewHTTPServerConfig

func NewHTTPServerConfig() HTTPServerConfig

NewHTTPServerConfig creates a new HTTPServerConfig with default values.

type KafkaConfig

type KafkaConfig struct {
	Addresses        []string    `json:"addresses" yaml:"addresses"`
	ClientID         string      `json:"client_id" yaml:"client_id"`
	RackID           string      `json:"rack_id" yaml:"rack_id"`
	Key              string      `json:"key" yaml:"key"`
	Partitioner      string      `json:"partitioner" yaml:"partitioner"`
	Partition        string      `json:"partition" yaml:"partition"`
	Topic            string      `json:"topic" yaml:"topic"`
	Compression      string      `json:"compression" yaml:"compression"`
	MaxMsgBytes      int         `json:"max_msg_bytes" yaml:"max_msg_bytes"`
	Timeout          string      `json:"timeout" yaml:"timeout"`
	AckReplicas      bool        `json:"ack_replicas" yaml:"ack_replicas"`
	TargetVersion    string      `json:"target_version" yaml:"target_version"`
	TLS              btls.Config `json:"tls" yaml:"tls"`
	SASL             sasl.Config `json:"sasl" yaml:"sasl"`
	MaxInFlight      int         `json:"max_in_flight" yaml:"max_in_flight"`
	retries.Config   `json:",inline" yaml:",inline"`
	RetryAsBatch     bool                         `json:"retry_as_batch" yaml:"retry_as_batch"`
	Batching         batchconfig.Config           `json:"batching" yaml:"batching"`
	StaticHeaders    map[string]string            `json:"static_headers" yaml:"static_headers"`
	Metadata         metadata.ExcludeFilterConfig `json:"metadata" yaml:"metadata"`
	InjectTracingMap string                       `json:"inject_tracing_map" yaml:"inject_tracing_map"`
}

KafkaConfig contains configuration fields for the Kafka output type.

func NewKafkaConfig

func NewKafkaConfig() KafkaConfig

NewKafkaConfig creates a new KafkaConfig with default values.

type KinesisConfig

type KinesisConfig struct {
	SessionConfig  `json:",inline" yaml:",inline"`
	Stream         string `json:"stream" yaml:"stream"`
	HashKey        string `json:"hash_key" yaml:"hash_key"`
	PartitionKey   string `json:"partition_key" yaml:"partition_key"`
	MaxInFlight    int    `json:"max_in_flight" yaml:"max_in_flight"`
	retries.Config `json:",inline" yaml:",inline"`
	Batching       batchconfig.Config `json:"batching" yaml:"batching"`
}

KinesisConfig contains configuration fields for the Kinesis output type.

func NewKinesisConfig

func NewKinesisConfig() KinesisConfig

NewKinesisConfig creates a new Config with default values.

type KinesisFirehoseConfig

type KinesisFirehoseConfig struct {
	SessionConfig  `json:",inline" yaml:",inline"`
	Stream         string `json:"stream" yaml:"stream"`
	MaxInFlight    int    `json:"max_in_flight" yaml:"max_in_flight"`
	retries.Config `json:",inline" yaml:",inline"`
	Batching       batchconfig.Config `json:"batching" yaml:"batching"`
}

KinesisFirehoseConfig contains configuration fields for the KinesisFirehose output type.

func NewKinesisFirehoseConfig

func NewKinesisFirehoseConfig() KinesisFirehoseConfig

NewKinesisFirehoseConfig creates a new Config with default values.

type MQTTConfig

type MQTTConfig struct {
	URLs                  []string      `json:"urls" yaml:"urls"`
	QoS                   uint8         `json:"qos" yaml:"qos"`
	Retained              bool          `json:"retained" yaml:"retained"`
	RetainedInterpolated  string        `json:"retained_interpolated" yaml:"retained_interpolated"`
	Topic                 string        `json:"topic" yaml:"topic"`
	ClientID              string        `json:"client_id" yaml:"client_id"`
	DynamicClientIDSuffix string        `json:"dynamic_client_id_suffix" yaml:"dynamic_client_id_suffix"`
	Will                  mqttconf.Will `json:"will" yaml:"will"`
	User                  string        `json:"user" yaml:"user"`
	Password              string        `json:"password" yaml:"password"`
	ConnectTimeout        string        `json:"connect_timeout" yaml:"connect_timeout"`
	WriteTimeout          string        `json:"write_timeout" yaml:"write_timeout"`
	KeepAlive             int64         `json:"keepalive" yaml:"keepalive"`
	MaxInFlight           int           `json:"max_in_flight" yaml:"max_in_flight"`
	TLS                   tls.Config    `json:"tls" yaml:"tls"`
}

MQTTConfig contains configuration fields for the MQTT output type.

func NewMQTTConfig

func NewMQTTConfig() MQTTConfig

NewMQTTConfig creates a new MQTTConfig with default values.

type MongoDBConfig

type MongoDBConfig struct {
	MongoConfig client.Config `json:",inline" yaml:",inline"`

	Operation    string              `json:"operation" yaml:"operation"`
	WriteConcern client.WriteConcern `json:"write_concern" yaml:"write_concern"`

	FilterMap   string `json:"filter_map" yaml:"filter_map"`
	DocumentMap string `json:"document_map" yaml:"document_map"`
	HintMap     string `json:"hint_map" yaml:"hint_map"`

	// DeleteEmptyValue bool `json:"delete_empty_value" yaml:"delete_empty_value"`
	Upsert      bool               `json:"upsert" yaml:"upsert"`
	MaxInFlight int                `json:"max_in_flight" yaml:"max_in_flight"`
	RetryConfig retries.Config     `json:",inline" yaml:",inline"`
	Batching    batchconfig.Config `json:"batching" yaml:"batching"`
}

MongoDBConfig contains config fields for the MongoDB output type.

func NewMongoDBConfig

func NewMongoDBConfig() MongoDBConfig

NewMongoDBConfig creates a MongoDB populated with default values.

type NATSConfig

type NATSConfig struct {
	URLs        []string          `json:"urls" yaml:"urls"`
	Subject     string            `json:"subject" yaml:"subject"`
	Headers     map[string]string `json:"headers" yaml:"headers"`
	MaxInFlight int               `json:"max_in_flight" yaml:"max_in_flight"`
	TLS         btls.Config       `json:"tls" yaml:"tls"`
	Auth        auth.Config       `json:"auth" yaml:"auth"`
}

NATSConfig contains configuration fields for the NATS output type.

func NewNATSConfig

func NewNATSConfig() NATSConfig

NewNATSConfig creates a new NATSConfig with default values.

type NATSStreamConfig

type NATSStreamConfig struct {
	URLs        []string    `json:"urls" yaml:"urls"`
	ClusterID   string      `json:"cluster_id" yaml:"cluster_id"`
	ClientID    string      `json:"client_id" yaml:"client_id"`
	Subject     string      `json:"subject" yaml:"subject"`
	MaxInFlight int         `json:"max_in_flight" yaml:"max_in_flight"`
	TLS         btls.Config `json:"tls" yaml:"tls"`
	Auth        auth.Config `json:"auth" yaml:"auth"`
}

NATSStreamConfig contains configuration fields for the NATSStream output type.

func NewNATSStreamConfig

func NewNATSStreamConfig() NATSStreamConfig

NewNATSStreamConfig creates a new NATSStreamConfig with default values.

type NSQConfig

type NSQConfig struct {
	Address     string      `json:"nsqd_tcp_address" yaml:"nsqd_tcp_address"`
	Topic       string      `json:"topic" yaml:"topic"`
	UserAgent   string      `json:"user_agent" yaml:"user_agent"`
	TLS         btls.Config `json:"tls" yaml:"tls"`
	MaxInFlight int         `json:"max_in_flight" yaml:"max_in_flight"`
}

NSQConfig contains configuration fields for the NSQ output type.

func NewNSQConfig

func NewNSQConfig() NSQConfig

NewNSQConfig creates a new NSQConfig with default values.

type NanomsgConfig

type NanomsgConfig struct {
	URLs        []string `json:"urls" yaml:"urls"`
	Bind        bool     `json:"bind" yaml:"bind"`
	SocketType  string   `json:"socket_type" yaml:"socket_type"`
	PollTimeout string   `json:"poll_timeout" yaml:"poll_timeout"`
	MaxInFlight int      `json:"max_in_flight" yaml:"max_in_flight"`
}

NanomsgConfig contains configuration fields for the Nanomsg output type.

func NewNanomsgConfig

func NewNanomsgConfig() NanomsgConfig

NewNanomsgConfig creates a new NanomsgConfig with default values.

type OptionalAWSConfig

type OptionalAWSConfig struct {
	Enabled     bool `json:"enabled" yaml:"enabled"`
	sess.Config `json:",inline" yaml:",inline"`
}

OptionalAWSConfig contains config fields for AWS authentication with an enable flag.

type PasswordAuthenticator

type PasswordAuthenticator struct {
	Enabled  bool   `json:"enabled" yaml:"enabled"`
	Username string `json:"username" yaml:"username"`
	Password string `json:"password" yaml:"password"`
}

PasswordAuthenticator contains the fields that will be used to authenticate with the Cassandra cluster.

type RedisHashConfig

type RedisHashConfig struct {
	bredis.Config  `json:",inline" yaml:",inline"`
	Key            string            `json:"key" yaml:"key"`
	WalkMetadata   bool              `json:"walk_metadata" yaml:"walk_metadata"`
	WalkJSONObject bool              `json:"walk_json_object" yaml:"walk_json_object"`
	Fields         map[string]string `json:"fields" yaml:"fields"`
	MaxInFlight    int               `json:"max_in_flight" yaml:"max_in_flight"`
}

RedisHashConfig contains configuration fields for the RedisHash output type.

func NewRedisHashConfig

func NewRedisHashConfig() RedisHashConfig

NewRedisHashConfig creates a new RedisHashConfig with default values.

type RedisListConfig

type RedisListConfig struct {
	bredis.Config `json:",inline" yaml:",inline"`
	Key           string             `json:"key" yaml:"key"`
	MaxInFlight   int                `json:"max_in_flight" yaml:"max_in_flight"`
	Batching      batchconfig.Config `json:"batching" yaml:"batching"`
}

RedisListConfig contains configuration fields for the RedisList output type.

func NewRedisListConfig

func NewRedisListConfig() RedisListConfig

NewRedisListConfig creates a new RedisListConfig with default values.

type RedisPubSubConfig

type RedisPubSubConfig struct {
	bredis.Config `json:",inline" yaml:",inline"`
	Channel       string             `json:"channel" yaml:"channel"`
	MaxInFlight   int                `json:"max_in_flight" yaml:"max_in_flight"`
	Batching      batchconfig.Config `json:"batching" yaml:"batching"`
}

RedisPubSubConfig contains configuration fields for the RedisPubSub output type.

func NewRedisPubSubConfig

func NewRedisPubSubConfig() RedisPubSubConfig

NewRedisPubSubConfig creates a new RedisPubSubConfig with default values.

type RedisStreamsConfig

type RedisStreamsConfig struct {
	bredis.Config `json:",inline" yaml:",inline"`
	Stream        string                       `json:"stream" yaml:"stream"`
	BodyKey       string                       `json:"body_key" yaml:"body_key"`
	MaxLenApprox  int64                        `json:"max_length" yaml:"max_length"`
	MaxInFlight   int                          `json:"max_in_flight" yaml:"max_in_flight"`
	Metadata      metadata.ExcludeFilterConfig `json:"metadata" yaml:"metadata"`
	Batching      batchconfig.Config           `json:"batching" yaml:"batching"`
}

RedisStreamsConfig contains configuration fields for the RedisStreams output type.

func NewRedisStreamsConfig

func NewRedisStreamsConfig() RedisStreamsConfig

NewRedisStreamsConfig creates a new RedisStreamsConfig with default values.

type RetryConfig

type RetryConfig struct {
	Output         *Config `json:"output" yaml:"output"`
	retries.Config `json:",inline" yaml:",inline"`
}

RetryConfig contains configuration values for the Retry output type.

func NewRetryConfig

func NewRetryConfig() RetryConfig

NewRetryConfig creates a new RetryConfig with default values.

func (RetryConfig) MarshalJSON

func (r RetryConfig) MarshalJSON() ([]byte, error)

MarshalJSON prints an empty object instead of nil.

func (RetryConfig) MarshalYAML

func (r RetryConfig) MarshalYAML() (any, error)

MarshalYAML prints an empty object instead of nil.

type SFTPConfig

type SFTPConfig struct {
	Address     string                `json:"address" yaml:"address"`
	Path        string                `json:"path" yaml:"path"`
	Codec       string                `json:"codec" yaml:"codec"`
	Credentials sftpSetup.Credentials `json:"credentials" yaml:"credentials"`
	MaxInFlight int                   `json:"max_in_flight" yaml:"max_in_flight"`
}

SFTPConfig contains configuration fields for the SFTP output type.

func NewSFTPConfig

func NewSFTPConfig() SFTPConfig

NewSFTPConfig creates a new Config with default values.

type SNSConfig

type SNSConfig struct {
	TopicArn               string                       `json:"topic_arn" yaml:"topic_arn"`
	MessageGroupID         string                       `json:"message_group_id" yaml:"message_group_id"`
	MessageDeduplicationID string                       `json:"message_deduplication_id" yaml:"message_deduplication_id"`
	Metadata               metadata.ExcludeFilterConfig `json:"metadata" yaml:"metadata"`
	SessionConfig          `json:",inline" yaml:",inline"`
	Timeout                string `json:"timeout" yaml:"timeout"`
	MaxInFlight            int    `json:"max_in_flight" yaml:"max_in_flight"`
}

SNSConfig contains configuration fields for the output SNS type.

func NewSNSConfig

func NewSNSConfig() SNSConfig

NewSNSConfig creates a new Config with default values.

type STDOUTConfig

type STDOUTConfig struct {
	Codec string `json:"codec" yaml:"codec"`
}

STDOUTConfig contains configuration fields for the stdout based output type.

func NewSTDOUTConfig

func NewSTDOUTConfig() STDOUTConfig

NewSTDOUTConfig creates a new STDOUTConfig with default values.

type SessionConfig

type SessionConfig struct {
	sess.Config `json:",inline" yaml:",inline"`
}

SessionConfig hides a general AWS session config struct.

type SocketConfig

type SocketConfig struct {
	Network string `json:"network" yaml:"network"`
	Address string `json:"address" yaml:"address"`
	Codec   string `json:"codec" yaml:"codec"`
}

SocketConfig contains configuration fields for the Socket output type.

func NewSocketConfig

func NewSocketConfig() SocketConfig

NewSocketConfig creates a new SocketConfig with default values.

type Streamed

type Streamed interface {
	// Consume starts the type receiving transactions from a Transactor.
	Consume(<-chan message.Transaction) error

	// Connected returns a boolean indicating whether this output is currently
	// connected to its target.
	Connected() bool

	// TriggerCloseNow triggers the shut down of this component but should not
	// block the calling goroutine.
	TriggerCloseNow()

	// WaitForClose is a blocking call to wait until the component has finished
	// shutting down and cleaning up resources.
	WaitForClose(ctx context.Context) error
}

Streamed is a common interface implemented by outputs and provides channel based streaming APIs.

func NewAsyncWriter

func NewAsyncWriter(typeStr string, maxInflight int, w AsyncSink, mgr component.Observability) (Streamed, error)

NewAsyncWriter creates a Streamed implementation around an AsyncSink.

func OnlySinglePayloads

func OnlySinglePayloads(out Streamed) Streamed

OnlySinglePayloads expands message batches into individual payloads, respecting the max in flight of the wrapped output. This is a more efficient way of feeding messages into an output that handles its own batching mechanism internally, or does not support batching at all.

func WrapWithPipelines

func WrapWithPipelines(out Streamed, pipeConstructors ...iprocessor.PipelineConstructorFunc) (Streamed, error)

WrapWithPipelines wraps an output with a variadic number of pipelines.

type SubprocessConfig

type SubprocessConfig struct {
	Name  string   `json:"name" yaml:"name"`
	Args  []string `json:"args" yaml:"args"`
	Codec string   `json:"codec" yaml:"codec"`
}

SubprocessConfig contains configuration for the Subprocess input type.

func NewSubprocessConfig

func NewSubprocessConfig() SubprocessConfig

NewSubprocessConfig creates a new SubprocessConfig with default values.

type SwitchConfig

type SwitchConfig struct {
	RetryUntilSuccess bool               `json:"retry_until_success" yaml:"retry_until_success"`
	StrictMode        bool               `json:"strict_mode" yaml:"strict_mode"`
	Cases             []SwitchConfigCase `json:"cases" yaml:"cases"`
}

SwitchConfig contains configuration fields for the switchOutput output type.

func NewSwitchConfig

func NewSwitchConfig() SwitchConfig

NewSwitchConfig creates a new SwitchConfig with default values.

type SwitchConfigCase

type SwitchConfigCase struct {
	Check    string `json:"check" yaml:"check"`
	Continue bool   `json:"continue" yaml:"continue"`
	Output   Config `json:"output" yaml:"output"`
}

SwitchConfigCase contains configuration fields per output of a switch type.

func NewSwitchConfigCase

func NewSwitchConfigCase() SwitchConfigCase

NewSwitchConfigCase creates a new switch output config with default values.

type Sync

type Sync interface {
	// WriteTransaction attempts to write a transaction to an output.
	WriteTransaction(context.Context, message.Transaction) error

	// Connected returns a boolean indicating whether this output is currently
	// connected to its target.
	Connected() bool

	// TriggerStopConsuming instructs the output to start shutting down
	// resources once all pending messages are delivered and acknowledged.
	TriggerStopConsuming()

	// TriggerCloseNow triggers the shut down of this component but should not
	// block the calling goroutine.
	TriggerCloseNow()

	// WaitForClose is a blocking call to wait until the component has finished
	// shutting down and cleaning up resources.
	WaitForClose(ctx context.Context) error
}

Sync is a common interface implemented by outputs and provides synchronous based writing APIs.

type TryConfig

type TryConfig []Config

TryConfig contains configuration fields for the Try output type.

func NewTryConfig

func NewTryConfig() TryConfig

NewTryConfig creates a new BrokerConfig with default values.

type WithPipeline

type WithPipeline struct {
	// contains filtered or unexported fields
}

WithPipeline is a type that wraps both an output type and a pipeline type by routing the pipeline through the output, and implements the output.Type interface in order to act like an ordinary output.

func WrapWithPipeline

func WrapWithPipeline(out Streamed, pipeConstructor iprocessor.PipelineConstructorFunc) (*WithPipeline, error)

WrapWithPipeline routes a processing pipeline directly into an output and returns a type that manages both and acts like an ordinary output.

func (*WithPipeline) Connected

func (i *WithPipeline) Connected() bool

Connected returns a boolean indicating whether this output is currently connected to its target.

func (*WithPipeline) Consume

func (i *WithPipeline) Consume(tsChan <-chan message.Transaction) error

Consume starts the type listening to a message channel from a producer.

func (*WithPipeline) TriggerCloseNow

func (i *WithPipeline) TriggerCloseNow()

TriggerCloseNow triggers a closure of this object but does not block.

func (*WithPipeline) WaitForClose

func (i *WithPipeline) WaitForClose(ctx context.Context) error

WaitForClose is a blocking call to wait until the object has finished closing down and cleaning up resources.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL