benthos: github.com/Jeffail/benthos/lib/stream

package stream

import "github.com/Jeffail/benthos/lib/stream"

Package stream creates and manages a full Benthos stream pipeline, consisting of an input layer of consumers, an optional buffer layer, a processing pipelines layer, and an output layer of producers:

Inputs -> Buffer -> Processing Pipelines -> Outputs

The number of parallel input consumers, processing pipelines, and output producers depends on the configuration of the stream.

Custom Stream Processors

It is possible to construct a stream with your own custom processor implementations embedded within it. Your processors are then executed for each discrete message batch, after any other configured processors.

Your custom processors will be constructed once per pipeline processing thread, e.g. with four pipeline processing threads the pipeline would look like this:

Inputs -> Buffer -> Processing Pipeline -> Custom Processor -> Outputs
                 \  Processing Pipeline -> Custom Processor /
                 \  Processing Pipeline -> Custom Processor /
                 \  Processing Pipeline -> Custom Processor /
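The once-per-thread construction shown in the diagram can be pictured with a small standard-library model. This is only a sketch and not Benthos code: Processor, upper, and runPipelines are hypothetical names, and plain strings stand in for message batches. It shows a constructor function being called once for each processing thread, so that every thread owns its own processor instance.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// Processor is a hypothetical stand-in for a custom processor.
type Processor interface {
	Process(msg string) string
}

// upper is a trivial processor that upper-cases each message.
type upper struct{}

func (upper) Process(msg string) string { return strings.ToUpper(msg) }

// runPipelines starts one worker per thread, calls ctor once per worker, and
// returns the processed messages plus the number of constructor calls.
func runPipelines(threads int, ctor func() Processor, in []string) ([]string, int) {
	jobs := make(chan string)
	results := make(chan string, len(in))
	constructed := 0

	var wg sync.WaitGroup
	for i := 0; i < threads; i++ {
		proc := ctor() // one processor instance per processing thread
		constructed++
		wg.Add(1)
		go func() {
			defer wg.Done()
			for msg := range jobs {
				results <- proc.Process(msg)
			}
		}()
	}

	for _, msg := range in {
		jobs <- msg
	}
	close(jobs)
	wg.Wait()
	close(results)

	var out []string
	for r := range results {
		out = append(out, r)
	}
	return out, constructed
}

func main() {
	out, n := runPipelines(4, func() Processor { return upper{} }, []string{"a", "b", "c"})
	fmt.Println(len(out), "messages processed by", n, "processor instances")
}
```

Because each worker holds a private instance, a processor written this way does not need to guard its own fields against concurrent access from other threads. The order of the results is not deterministic.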

Plugins

Benthos components (inputs, processors, conditions, outputs, etc.) are pluggable by design, and can be complemented with your own implementations by calling RegisterPlugin on a component package.

This method is more complicated than simply adding a custom stream processor, but allows you to use your custom implementations in the same flexible way that native Benthos types can be used.

Message Batches

In Benthos every message is a batch, and the configuration of a stream determines the size of each batch (usually 1). All processors, including your custom implementations, therefore operate on batches.

Sometimes your custom processors will require batches of a certain size in order to function. It is recommended that you perform message batching using the standard Benthos batch or combine processors, as it will ensure resiliency throughout the stream pipeline. For example, you can add a batch processor to your input layer:

conf := stream.NewConfig()

conf.Input.Type = input.TypeKafka
conf.Input.Kafka.Addresses = []string{"localhost:9092"}
conf.Input.Kafka.Topic = "example_topic_one"

conf.Input.Processors = append(conf.Input.Processors, processor.NewConfig())
conf.Input.Processors[0].Type = processor.TypeBatch
conf.Input.Processors[0].Batch.ByteSize = 10000000 // 10MB
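To make the idea of batching by byte size concrete, here is a minimal standard-library sketch. It is not the Benthos batch processor: batchByBytes is a hypothetical helper, and the rule it applies (close a batch once its accumulated size reaches the threshold) is an assumption chosen for illustration.

```go
package main

import "fmt"

// batchByBytes groups payloads into batches, closing a batch once its
// accumulated size reaches byteSize. Hypothetical helper for illustration.
func batchByBytes(payloads [][]byte, byteSize int) [][][]byte {
	var batches [][][]byte
	var current [][]byte
	size := 0
	for _, p := range payloads {
		current = append(current, p)
		size += len(p)
		if size >= byteSize {
			batches = append(batches, current)
			current, size = nil, 0
		}
	}
	if len(current) > 0 {
		// Trailing partial batch; emitted here only because the input is finite.
		batches = append(batches, current)
	}
	return batches
}

func main() {
	payloads := [][]byte{
		[]byte("aaaa"), []byte("bbbb"), []byte("cc"), []byte("dddddd"), []byte("e"),
	}
	for i, b := range batchByBytes(payloads, 8) {
		fmt.Printf("batch %d: %d parts\n", i, len(b))
	}
}
```

With a threshold of 8 bytes the five payloads above fall into three batches of 2, 2, and 1 parts. A live stream would typically hold the final partial batch until more data arrives rather than flushing it immediately.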

Horizontal Scaling

The standard processors of a Benthos stream are stateless and can therefore be scaled horizontally without affecting the results. Horizontal scaling then depends only on the data sources of a stream.

Most message queues/protocols provide mechanisms to automatically distribute messages horizontally across consumers. Horizontally scaling Benthos is therefore as simple as applying those means.

Kafka, for example, allows you to distribute messages across partitions, which can either be statically distributed across consumers or, using the kafka_balanced input type, can be dynamically distributed across consumers.
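A static distribution of partitions across consumers can be sketched as follows. This is an illustration only: assignPartitions is a hypothetical helper and round-robin is just one possible assignment scheme. It does not reflect how Kafka or the kafka_balanced input rebalance partitions dynamically.

```go
package main

import "fmt"

// assignPartitions statically spreads partition IDs 0..partitions-1 over the
// given number of consumers in round-robin order. Hypothetical helper.
func assignPartitions(partitions, consumers int) [][]int {
	if consumers <= 0 {
		return nil
	}
	out := make([][]int, consumers)
	for p := 0; p < partitions; p++ {
		out[p%consumers] = append(out[p%consumers], p)
	}
	return out
}

func main() {
	for c, parts := range assignPartitions(8, 3) {
		fmt.Printf("consumer %d reads partitions %v\n", c, parts)
	}
}
```

With eight partitions and three consumers, the consumers receive partitions [0 3 6], [1 4 7], and [2 5] respectively. Adding a consumer changes only this assignment, not the processing logic, which is why stateless processors scale out without further work.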

Vertical Scaling

Vertically scaled message processing can be done in Benthos with parallel processing pipelines, where the number of threads is configurable in the pipeline section of a stream configuration. However, in order to saturate those processing threads your configuration needs one of two things: multiple parallel inputs or a memory buffer.

Adding a memory buffer is a simple way of scaling a single input consumer across processing threads, but this removes the automatic delivery guarantees that Benthos provides.

Instead, it is recommended that you create parallel input sources, the number of which should at least match the number of processing threads. This retains the delivery guarantees of your sources and sinks by keeping them tightly coupled, and is done by configuring a broker input type. For example, to process across four threads with eight parallel consumers:

// Create a Kafka input with automatic partition balancing
inputConf := input.NewConfig()

inputConf.Type = input.TypeKafkaBalanced
inputConf.KafkaBalanced.Addresses = []string{"localhost:9092"}
inputConf.KafkaBalanced.Topics = []string{"example_topic_one"}

// Create a decompression processor (default gzip)
processorConf := processor.NewConfig()
processorConf.Type = processor.TypeDecompress

// Create a stream with eight parallel consumers and four processing threads
conf := stream.NewConfig()

conf.Input.Type = input.TypeBroker
conf.Input.Broker.Inputs = append(conf.Input.Broker.Inputs, inputConf)
conf.Input.Broker.Copies = 8

conf.Pipeline.Processors = append(conf.Pipeline.Processors, processorConf)
conf.Pipeline.Threads = 4

Delivery Guarantees

A Benthos stream without a buffer (the default) guarantees at-least-once message delivery matching the source and sink protocols used. For example, if you are consuming from a Kafka stream and producing to a Kafka stream then Benthos matches the at-least-once delivery guarantees of Kafka.

If you configure a stream with a buffer then your delivery guarantees will depend on the resiliency of the buffer method you have chosen.
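The meaning of at-least-once delivery can be shown with a small standard-library model. This is a sketch under stated assumptions, not Benthos internals: flakySink and deliver are hypothetical, and the sink is rigged so that the first write of every message is stored but reported as failed, as if the response had been lost.

```go
package main

import (
	"errors"
	"fmt"
)

// flakySink stores every write, but reports the first attempt for each
// message as failed, simulating a lost response from the sink.
type flakySink struct {
	attempts map[string]int
	written  []string
}

func (s *flakySink) Write(msg string) error {
	s.written = append(s.written, msg)
	s.attempts[msg]++
	if s.attempts[msg] == 1 {
		return errors.New("response lost")
	}
	return nil
}

// deliver acknowledges a message only after the sink confirms the write.
// An unconfirmed write is retried, which models redelivery by the source.
func deliver(msgs []string, sink *flakySink) []string {
	var acked []string
	for _, m := range msgs {
		for sink.Write(m) != nil {
			// Not acknowledged yet: try again.
		}
		acked = append(acked, m)
	}
	return acked
}

func main() {
	sink := &flakySink{attempts: map[string]int{}}
	acked := deliver([]string{"a", "b"}, sink)
	fmt.Println(acked, sink.written) // prints "[a b] [a a b b]"
}
```

No message is acknowledged before it has been written, so nothing is lost, but each message reaches the sink twice. Duplicates of this kind are the price of at-least-once delivery.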

Processor Idempotency

Benthos processors are usually stateless operations that are idempotent by their nature, meaning duplicate messages travelling the pipeline do not impact the result of the processor itself.

If your custom processors are stateful and exhibit side effects you will need to implement your own tooling in order to guarantee exactly-once processing of messages.
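One common form of such tooling is deduplication by message ID. The sketch below assumes every message carries a unique identifier, which is not something the text above promises. The deduper and counter types are hypothetical, and the seen set lives in memory only, so it would not survive a restart.

```go
package main

import "fmt"

// deduper remembers which message IDs have already been applied.
type deduper struct {
	seen map[string]struct{}
}

func newDeduper() *deduper {
	return &deduper{seen: make(map[string]struct{})}
}

// firstTime reports whether id has not been seen before, and records it.
func (d *deduper) firstTime(id string) bool {
	if _, ok := d.seen[id]; ok {
		return false
	}
	d.seen[id] = struct{}{}
	return true
}

// counter is a stateful processor: every applied message changes its total.
type counter struct {
	dedupe *deduper
	total  int
}

// process adds amount to the total unless the message ID was already applied.
func (c *counter) process(id string, amount int) {
	if !c.dedupe.firstTime(id) {
		return // duplicate delivery, skip the side effect
	}
	c.total += amount
}

func main() {
	c := &counter{dedupe: newDeduper()}
	c.process("msg-1", 10)
	c.process("msg-1", 10) // redelivered duplicate
	c.process("msg-2", 5)
	fmt.Println(c.total) // prints 15
}
```

Without the check the duplicate would push the total to 25. A production version would need a durable, shared store for the seen IDs and a policy for expiring them.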

ExampleBase64Encoder demonstrates running a Kafka to Kafka stream where each incoming message is encoded with base64.

Code:

package main

import (
    "bytes"
    "encoding/base64"
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/Jeffail/benthos/v3/lib/input"
    "github.com/Jeffail/benthos/v3/lib/output"
    "github.com/Jeffail/benthos/v3/lib/stream"
    "github.com/Jeffail/benthos/v3/lib/types"
)

// Base64Encoder is a types.Processor implementation that base64 encodes
// all messages travelling through a Benthos stream.
type Base64Encoder struct{}

// ProcessMessage base64 encodes all messages.
func (p Base64Encoder) ProcessMessage(m types.Message) ([]types.Message, types.Response) {
    // Create a copy of the original message
    result := m.Copy()

    // For each message part replace its contents with the base64 encoded
    // version.
    result.Iter(func(i int, part types.Part) error {
        var buf bytes.Buffer

        e := base64.NewEncoder(base64.StdEncoding, &buf)
        e.Write(part.Get())
        e.Close()

        part.Set(buf.Bytes())
        return nil
    })

    return []types.Message{result}, nil
}

// CloseAsync shuts down the processor and stops processing requests.
func (p Base64Encoder) CloseAsync() {
    // Do nothing as our processor doesn't require resource cleanup.
}

// WaitForClose blocks until the processor has closed down.
func (p Base64Encoder) WaitForClose(timeout time.Duration) error {
    // Do nothing as our processor doesn't require resource cleanup.
    return nil
}

// ExampleBase64Encoder demonstrates running a Kafka to Kafka stream where each
// incoming message is encoded with base64.
func main() {
    conf := stream.NewConfig()

    conf.Input.Type = input.TypeKafka
    conf.Input.Kafka.Addresses = []string{
        "localhost:9092",
    }
    conf.Input.Kafka.Topic = "example_topic_one"

    conf.Output.Type = output.TypeKafka
    conf.Output.Kafka.Addresses = []string{
        "localhost:9092",
    }
    conf.Output.Kafka.Topic = "example_topic_two"

    // The constructor is called once per pipeline processing thread.
    s, err := stream.New(conf, stream.OptAddProcessors(func() (types.Processor, error) {
        return Base64Encoder{}, nil
    }))
    if err != nil {
        panic(err)
    }

    defer s.Stop(time.Second)

    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)

    // Wait for a termination signal (SIGINT or SIGTERM)
    <-sigChan
    log.Println("Received termination signal, the service is closing.")
}

ExampleSplitToBatch demonstrates running a Kafka to Kafka stream where each incoming message is parsed as a line delimited blob of payloads and the payloads are sent on as a single batch of messages.

Code:

package main

import (
    "bytes"
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/Jeffail/benthos/v3/lib/input"
    "github.com/Jeffail/benthos/v3/lib/message"
    "github.com/Jeffail/benthos/v3/lib/output"
    "github.com/Jeffail/benthos/v3/lib/stream"
    "github.com/Jeffail/benthos/v3/lib/types"
)

// SplitToBatch is a types.Processor implementation that reads a single message
// containing line delimited payloads and splits the payloads into a single
// batch of messages per line.
type SplitToBatch struct{}

// ProcessMessage splits messages of a batch by lines and sends them onwards as
// a batch of messages.
func (p SplitToBatch) ProcessMessage(m types.Message) ([]types.Message, types.Response) {
    var splitParts [][]byte
    m.Iter(func(i int, b types.Part) error {
        splitParts = append(splitParts, bytes.Split(b.Get(), []byte("\n"))...)
        return nil
    })

    return []types.Message{message.New(splitParts)}, nil
}

// CloseAsync shuts down the processor and stops processing requests.
func (p SplitToBatch) CloseAsync() {
    // Do nothing as our processor doesn't require resource cleanup.
}

// WaitForClose blocks until the processor has closed down.
func (p SplitToBatch) WaitForClose(timeout time.Duration) error {
    // Do nothing as our processor doesn't require resource cleanup.
    return nil
}

// ExampleSplitToBatch demonstrates running a Kafka to Kafka stream where each
// incoming message is parsed as a line delimited blob of payloads and the
// payloads are sent on as a single batch of messages.
func main() {
    conf := stream.NewConfig()

    conf.Input.Type = input.TypeKafka
    conf.Input.Kafka.Addresses = []string{
        "localhost:9092",
    }
    conf.Input.Kafka.Topic = "example_topic_one"

    conf.Output.Type = output.TypeKafka
    conf.Output.Kafka.Addresses = []string{
        "localhost:9092",
    }
    conf.Output.Kafka.Topic = "example_topic_two"

    // The constructor is called once per pipeline processing thread.
    s, err := stream.New(conf, stream.OptAddProcessors(func() (types.Processor, error) {
        return SplitToBatch{}, nil
    }))
    if err != nil {
        panic(err)
    }

    defer s.Stop(time.Second)

    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)

    // Wait for a termination signal (SIGINT or SIGTERM)
    <-sigChan
    log.Println("Received termination signal, the service is closing.")
}

ExampleSplitToMessages demonstrates running a Kafka to Kafka stream where each incoming message is parsed as a line delimited blob of payloads and the payloads are sent on as a single message per payload.

Code:

package main

import (
    "bytes"
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/Jeffail/benthos/v3/lib/input"
    "github.com/Jeffail/benthos/v3/lib/message"
    "github.com/Jeffail/benthos/v3/lib/output"
    "github.com/Jeffail/benthos/v3/lib/stream"
    "github.com/Jeffail/benthos/v3/lib/types"
)

// SplitToMessages is a types.Processor implementation that reads a single
// message containing line delimited payloads and splits the payloads into a
// single message per line.
type SplitToMessages struct{}

// ProcessMessage splits messages of a batch by lines and sends them onwards as
// an individual message per payload.
func (p SplitToMessages) ProcessMessage(m types.Message) ([]types.Message, types.Response) {
    var splitParts [][]byte
    m.Iter(func(i int, b types.Part) error {
        splitParts = append(splitParts, bytes.Split(b.Get(), []byte("\n"))...)
        return nil
    })

    // Wrap each payload in its own single-part message.
    messages := make([]types.Message, len(splitParts))
    for i, part := range splitParts {
        messages[i] = message.New([][]byte{part})
    }
    return messages, nil
}

// CloseAsync shuts down the processor and stops processing requests.
func (p SplitToMessages) CloseAsync() {
    // Do nothing as our processor doesn't require resource cleanup.
}

// WaitForClose blocks until the processor has closed down.
func (p SplitToMessages) WaitForClose(timeout time.Duration) error {
    // Do nothing as our processor doesn't require resource cleanup.
    return nil
}

// ExampleSplitToMessages demonstrates running a Kafka to Kafka stream where
// each incoming message is parsed as a line delimited blob of payloads and the
// payloads are sent on as a single message per payload.
func main() {
    conf := stream.NewConfig()

    conf.Input.Type = input.TypeKafka
    conf.Input.Kafka.Addresses = []string{
        "localhost:9092",
    }
    conf.Input.Kafka.Topic = "example_topic_one"

    conf.Output.Type = output.TypeKafka
    conf.Output.Kafka.Addresses = []string{
        "localhost:9092",
    }
    conf.Output.Kafka.Topic = "example_topic_two"

    // The constructor is called once per pipeline processing thread.
    s, err := stream.New(conf, stream.OptAddProcessors(func() (types.Processor, error) {
        return SplitToMessages{}, nil
    }))
    if err != nil {
        panic(err)
    }

    defer s.Stop(time.Second)

    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)

    // Wait for a termination signal (SIGINT or SIGTERM)
    <-sigChan
    log.Println("Received termination signal, the service is closing.")
}

ExampleYAMLConfig demonstrates running a Benthos stream with a configuration parsed from a YAML file and a custom processor.

Code:

package main

import (
    "io/ioutil"
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/Jeffail/benthos/v3/lib/stream"
    "github.com/Jeffail/benthos/v3/lib/types"
    yaml "gopkg.in/yaml.v3"
)

// CustomProcessor is a types.Processor implementation that does nothing.
type CustomProcessor struct{}

// ProcessMessage does nothing.
func (p CustomProcessor) ProcessMessage(m types.Message) ([]types.Message, types.Response) {
    return []types.Message{m}, nil
}

// CloseAsync shuts down the processor and stops processing requests.
func (p CustomProcessor) CloseAsync() {
    // Do nothing as our processor doesn't require resource cleanup.
}

// WaitForClose blocks until the processor has closed down.
func (p CustomProcessor) WaitForClose(timeout time.Duration) error {
    // Do nothing as our processor doesn't require resource cleanup.
    return nil
}

// ExampleYAMLConfig demonstrates running a Benthos stream with a configuration
// parsed from a YAML file and a custom processor.
func main() {
    confBytes, err := ioutil.ReadFile("./foo.yaml")
    if err != nil {
        panic(err)
    }

    // Start from the defaults so that fields absent from the file keep
    // their default values.
    conf := stream.NewConfig()
    if err = yaml.Unmarshal(confBytes, &conf); err != nil {
        panic(err)
    }

    // The constructor is called once per pipeline processing thread.
    s, err := stream.New(conf, stream.OptAddProcessors(func() (types.Processor, error) {
        return CustomProcessor{}, nil
    }))
    if err != nil {
        panic(err)
    }

    defer s.Stop(time.Second)

    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)

    // Wait for a termination signal (SIGINT or SIGTERM)
    <-sigChan
    log.Println("Received termination signal, the service is closing.")
}

Package Files

config.go package.go type.go

func OptAddProcessors

func OptAddProcessors(procs ...types.ProcessorConstructorFunc) func(*Type)

OptAddProcessors adds additional processors that will be constructed for each logical thread of the processing pipeline layer of the Benthos stream.

func OptOnClose

func OptOnClose(onClose func()) func(*Type)

OptOnClose sets a closure to be called when the stream closes.

func OptSetLogSimple

func OptSetLogSimple(l log.PrintFormatter) func(*Type)

OptSetLogSimple sets the logging output to a simpler log interface (implemented by the standard *log.Logger).

func OptSetLogger

func OptSetLogger(l log.Modular) func(*Type)

OptSetLogger sets the logging output to be used by all components of the stream. To avoid implementing the log.Modular interface with a custom logger, consider using OptSetLogSimple instead.

func OptSetManager

func OptSetManager(mgr types.Manager) func(*Type)

OptSetManager sets the service manager to be used by all components of the stream.

func OptSetStats

func OptSetStats(stats metrics.Type) func(*Type)

OptSetStats sets the metrics aggregator to be used by all components of the stream.
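The Opt functions above all return a func(*Type), which is the functional options pattern. The sketch below illustrates that pattern in isolation using only the standard library. Stream, Option, WithName, WithOnClose, and NewStream are hypothetical names invented for this example and are not part of the Benthos API.

```go
package main

import "fmt"

// Stream is a hypothetical type configured through options.
type Stream struct {
	name    string
	onClose func()
}

// Option mutates a Stream during construction.
type Option func(*Stream)

// WithName overrides the default name.
func WithName(n string) Option {
	return func(s *Stream) { s.name = n }
}

// WithOnClose registers a closure to run when the stream closes.
func WithOnClose(f func()) Option {
	return func(s *Stream) { s.onClose = f }
}

// NewStream applies defaults first, then each option in order.
func NewStream(opts ...Option) *Stream {
	s := &Stream{name: "default", onClose: func() {}}
	for _, opt := range opts {
		opt(s)
	}
	return s
}

// Close runs the registered onClose closure.
func (s *Stream) Close() { s.onClose() }

func main() {
	closed := false
	s := NewStream(WithName("example"), WithOnClose(func() { closed = true }))
	s.Close()
	fmt.Println(s.name, closed) // prints "example true"
}
```

The appeal of this design is that the constructor signature stays stable as new options are added, and callers only mention the settings they want to change.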

type Config

type Config struct {
    Input    input.Config    `json:"input" yaml:"input"`
    Buffer   buffer.Config   `json:"buffer" yaml:"buffer"`
    Pipeline pipeline.Config `json:"pipeline" yaml:"pipeline"`
    Output   output.Config   `json:"output" yaml:"output"`
}

Config is a configuration struct representing all four layers of a Benthos stream.

func NewConfig

func NewConfig() Config

NewConfig returns a new configuration with default values.

func (Config) Sanitised

func (c Config) Sanitised() (interface{}, error)

Sanitised returns a sanitised copy of the Benthos configuration, meaning fields of no consequence (unused inputs, outputs, processors etc) are excluded.

type Type

type Type struct {
    // contains filtered or unexported fields
}

Type creates and manages the lifetime of a Benthos stream.

func New

func New(conf Config, opts ...func(*Type)) (*Type, error)

New creates a new stream.Type.

func (*Type) Stop

func (t *Type) Stop(timeout time.Duration) error

Stop attempts to close the stream within the specified timeout period. Initially the attempt is graceful, but as the timeout draws close the attempt becomes progressively less graceful.

Directories

Path       Synopsis
manager    Package manager creates and manages multiple streams, providing an API for performing CRUD operations.

Package stream imports 11 packages and is imported by 5 packages. Updated 2019-09-09.