firebolt

package module
v0.1.12
Published: Mar 23, 2023 | License: Apache-2.0 | Imports: 4 | Imported by: 0

README

A golang framework for streaming event processing & data pipeline apps

Introduction

Firebolt has a simple model intended to make it easier to write reliable pipeline applications that process a stream of data.

It can be used to build systems such as:

  • logging/observability pipelines
  • streaming ETL
  • event processing pipelines

Every application's pipeline starts with a single source, the component that receives events from some external system. Sources must implement the node.Source interface.

We provide one built-in source:

  • kafkaconsumer - Events come from a Kafka topic and are passed to the root nodes as []byte.

The processing in your application is executed by its nodes, which form a processing tree. Data (events) flows down this tree; a parent node passes its results down to its child nodes. Nodes may process events synchronously or asynchronously. A synchronous variant, node.FanoutNode, returns a slice of results for fanout or 'demultiplexing' use cases. Each node must implement the node.SyncNode, node.FanoutNode, or node.AsyncNode interface accordingly.
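
A minimal sketch of a synchronous node, for illustration only: the Process signature shown here is an assumption based on the Process() references elsewhere on this page, and the real node.SyncNode interface also includes lifecycle methods covered in the Sync Nodes documentation below. Returning a nil event filters it from the stream, as described under 'simple stream filtering' below.

package example

import (
	"strings"

	"github.com/digitalocean/firebolt"
)

// UppercaseNode is a hypothetical synchronous node: it uppercases string
// payloads, filters empty events, and reports non-string payloads as errors.
type UppercaseNode struct{}

// Process handles one event; the returned event is passed to this node's children.
func (n *UppercaseNode) Process(event *firebolt.Event) (*firebolt.Event, error) {
	str, ok := event.Payload.(string)
	if !ok {
		return nil, firebolt.NewFBError("ERR_NOT_STRING", "expected a string payload")
	}
	if str == "" {
		return nil, nil // returning a nil event filters it out of the stream
	}
	return event.WithPayload(strings.ToUpper(str)), nil
}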

We provide two built-in node types:

  • kafkaproducer - Events are produced onto a Kafka topic by an asynchronous producer.
  • elasticsearch - Events are bulk indexed into Elasticsearch.

Firebolt has both run-time and compile-time dependencies on librdkafka; see Developing below.

Example: Logging Pipeline

At DigitalOcean, our first use of Firebolt was in our logging pipeline. This pipeline consumes logs from just about every system we run. The diagram below depicts the source and nodes in this application.

This system uses the built-in kafkaconsumer source (in yellow) and kafkaproducer and elasticsearch nodes (in green). The blue nodes are custom to this application.

Logging Pipeline Node Diagram

What does Firebolt do for me?

Firebolt is intended to address a number of concerns that are common to near-realtime data pipeline applications, making it easy to run a clustered application that scales predictably to handle large data volume.

It is not an analytics tool - it does not provide an easy way to support 'wide operations' like record grouping, windowing, or sorting that require shuffling data within the cluster. Firebolt is for 'straight through' processing pipelines that are not sensitive to the order in which events are processed.

Some of the concerns Firebolt addresses include:

  • kafka sources - Minimal configuration and no code required to consume from a Kafka topic, consumer lag metrics included.
  • kafka sinks - Same for producing to a Kafka topic.
  • loose coupling - Nodes in the pipeline are loosely coupled, making them easily testable and highly reusable.
  • simple stream filtering - Filter the stream by returning nil in your nodes.
  • convenient error handling - Send events that fail processing to a Kafka topic for recovery or analysis with a few lines of config.
  • outage recovery: offset management - Configurable Kafka offset management during recovery lets you determine the maximum "catch up" to attempt after an outage, so you can quickly get back to realtime processing.
  • outage recovery: parallel recovery - After an outage, process realtime data and "fill in" the outage time window in parallel, with a rate limit on the recovery window.
  • monitorability - Firebolt exposes Prometheus metrics to track the performance of your Source and all Nodes without writing code. Your nodes can expose their own custom internal metrics as needed.
  • leader election - Firebolt uses ZooKeeper to conduct leader elections, facilitating any processing that may need to be conducted on one-and-only-one instance.

Documentation

  1. Configuration - The configuration file format

  2. Execution - How Firebolt processes your data

  3. Registry - Adding node types to the registry

  4. Sample Application Code - Example code for running the Firebolt executor

  5. Sources - Implementing and using sources

  6. Sync Nodes - Implementing and using synchronous nodes

  7. Fanout Nodes - Implementing and using fanout nodes

  8. Async Nodes - Implementing and using asynchronous nodes

  9. Leader Election - Starting leader election and accessing election results

  10. Messaging - How to send and receive messages between the components of your system

  11. Metrics - What metrics are exposed by default, and how to add custom metrics to your nodes

Built-In Types

  1. Kafka Producer - Node for producing events onto a Kafka topic

  2. Elasticsearch - Node for indexing documents to an Elasticsearch cluster

Developing

Firebolt depends on librdkafka v1.3.0 or later. To get started building a firebolt app (or working on firebolt itself), install librdkafka by following the librdkafka installation instructions.

An example for debian-based distros:

sudo wget -qO - https://packages.confluent.io/deb/5.4/archive.key | sudo apt-key add -
sudo add-apt-repository "deb [arch=amd64] https://packages.confluent.io/deb/5.4 stable main"
sudo apt-get update
sudo apt-get install -y librdkafka1 librdkafka-dev

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AsyncEvent

type AsyncEvent struct {
	*Event
	ReturnError    func(error)
	ReturnEvent    func(*AsyncEvent)
	ReturnFiltered func()
}

AsyncEvent is a version of Event for nodes that support asynchronous processing.

func NewAsyncEvent

func NewAsyncEvent(event *Event, errFunc func(error), eventFunc func(*AsyncEvent), filterFunc func()) *AsyncEvent

NewAsyncEvent creates a version of the passed Event suitable for asynchronous processing.

func (*AsyncEvent) WithPayload

func (ae *AsyncEvent) WithPayload(payload interface{}) *AsyncEvent

WithPayload returns a clone of this event with the payload replaced.
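
A hedged usage sketch: the handleAsync function below is hypothetical and the surrounding node.AsyncNode plumbing is omitted, but it uses only the callbacks documented above, reporting exactly one outcome per event.

package example

import (
	"strings"

	"github.com/digitalocean/firebolt"
)

// handleAsync is a hypothetical asynchronous handler; it reports its result
// through exactly one of the AsyncEvent callbacks.
func handleAsync(ae *firebolt.AsyncEvent) {
	str, ok := ae.Payload.(string) // Payload comes from the embedded *Event
	if !ok {
		ae.ReturnError(firebolt.NewFBError("ERR_NOT_STRING", "expected a string payload"))
		return
	}
	if str == "" {
		ae.ReturnFiltered() // drop this event from the stream
		return
	}
	ae.ReturnEvent(ae.WithPayload(strings.ToUpper(str))) // pass the result downstream
}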

type Event

type Event struct {
	Payload  interface{} `json:"payload"`
	Created  time.Time   `json:"created"`
	Recovery bool        `json:"recovery"`
}

Event is the struct passed through the node graph.

func (*Event) WithPayload

func (e *Event) WithPayload(payload interface{}) *Event

WithPayload returns a clone of this event with the payload replaced.
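
A brief illustrative sketch, assuming (per the doc comment above) that WithPayload copies the remaining fields while replacing the payload; the payload values are arbitrary.

package example

import (
	"fmt"
	"time"

	"github.com/digitalocean/firebolt"
)

func main() {
	original := &firebolt.Event{
		Payload: []byte(`{"msg":"hello"}`),
		Created: time.Now(),
	}

	// Clone the event, replacing only the payload; Created and Recovery carry over.
	parsed := original.WithPayload(map[string]string{"msg": "hello"})

	fmt.Printf("original payload: %T, clone payload: %T\n", original.Payload, parsed.Payload)
}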

type EventError

type EventError struct {
	Timestamp time.Time   `json:"timestamp"`
	Event     interface{} `json:"event"`
	Err       error       `json:"error"`
}

EventError is the structure passed to any `error_handler`. When an error occurs in a node that has an error_handler configured, firebolt wraps up the error and the causing event in an EventError. This EventError needs to be handled in your error_handler's Process() method.

func NewEventError

func NewEventError(event *Event, err error) EventError

NewEventError constructs an EventError from the given event and error.

func (EventError) MarshalJSON

func (ee EventError) MarshalJSON() ([]byte, error)

MarshalJSON converts an EventError to json, replacing any err values that are not FBError with a FBError wrapper so that the generated JSON has consistent structure.
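
A small sketch of constructing and serializing an EventError, assuming only the constructor and marshaling behavior documented above; the sample payload and error code are invented.

package example

import (
	"encoding/json"
	"fmt"
	"time"

	"github.com/digitalocean/firebolt"
)

func main() {
	event := &firebolt.Event{Payload: "bad record", Created: time.Now()}
	cause := firebolt.NewFBError("ERR_PARSE", "could not parse record")

	// Wrap the failed event and its error, as firebolt does before invoking an error_handler.
	eventErr := firebolt.NewEventError(event, cause)

	// MarshalJSON normalizes non-FBError values into an FBError wrapper for consistent JSON.
	data, err := json.Marshal(eventErr)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(data))
}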

type FBError

type FBError struct {
	Code      string      `json:"code"`
	Msg       string      `json:"message"`
	ErrorInfo interface{} `json:"errorinfo,omitempty"`
}

FBError is an optional error type that can be returned from the Process() method in your nodes.

func NewFBError

func NewFBError(code string, msg string, opts ...FBErrorOpt) FBError

NewFBError constructs a firebolt error. Returning a FBError (rather than just 'error') from the Process() method in your nodes will result in more structured error reports if you use `error_handler` nodes.

func (FBError) Error

func (f FBError) Error() string

type FBErrorOpt

type FBErrorOpt func(FBError) FBError

FBErrorOpt is an option that allows you to add optional data to an FBError when calling the constructor.

func WithInfo

func WithInfo(info interface{}) FBErrorOpt

WithInfo adds information to FBError.ErrorInfo.
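
A brief sketch of returning a structured error, as a node's Process() method might; the error code, message, and info map are illustrative.

package example

import (
	"fmt"

	"github.com/digitalocean/firebolt"
)

// validate is a hypothetical helper that a node's Process() method might call.
func validate(payload interface{}) error {
	raw, ok := payload.([]byte)
	if !ok || len(raw) == 0 {
		// WithInfo attaches structured context that is carried in ErrorInfo.
		return firebolt.NewFBError(
			"ERR_EMPTY_PAYLOAD",
			"payload must be a non-empty byte slice",
			firebolt.WithInfo(map[string]interface{}{"type": fmt.Sprintf("%T", payload)}),
		)
	}
	return nil
}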

type Nodeconfig

type Nodeconfig map[string]string

Nodeconfig holds a node's configuration.

func (Nodeconfig) Float64Config added in v0.1.12

func (c Nodeconfig) Float64Config(name string, defaultValue float64, minValue float64, maxValue float64) (float64, error)

Float64Config validates and fetches the float64-typed optional config value specified by 'name', using the 'defaultValue' if no value was provided in the configuration. The default float64 (if used) is formatted with the platform and Go default precision and width (%f formatting).

func (Nodeconfig) Float64ConfigRequired added in v0.1.12

func (c Nodeconfig) Float64ConfigRequired(name string, minValue, maxValue float64) (float64, error)

Float64ConfigRequired validates and fetches the float64-typed required config value specified by 'name', returning an error if no value was provided in the configuration.

func (Nodeconfig) IntConfig

func (c Nodeconfig) IntConfig(name string, defaultValue int, minValue int, maxValue int) (int, error)

IntConfig validates and fetches the int-typed optional config value specified by 'name', using the 'defaultValue' if no value was provided in the configuration.

func (Nodeconfig) IntConfigRequired

func (c Nodeconfig) IntConfigRequired(name string, minValue int, maxValue int) (int, error)

IntConfigRequired validates and fetches the int-typed required config value specified by 'name', returning an error if no value was provided in the configuration.

func (Nodeconfig) StringConfig

func (c Nodeconfig) StringConfig(name string, defaultValue string) (string, error)

StringConfig validates and fetches the string-typed optional config value specified by 'name', using the 'defaultValue' if no value was provided in the configuration.

func (Nodeconfig) StringConfigRequired

func (c Nodeconfig) StringConfigRequired(name string) (string, error)

StringConfigRequired validates and fetches the string-typed required config value specified by 'name', returning an error if no value was provided in the configuration.
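
A sketch of how a node's setup code might use these helpers; the config keys, default, and range below are invented for illustration.

package example

import (
	"github.com/digitalocean/firebolt"
)

// parseConfig is a hypothetical helper that validates a node's configuration map.
func parseConfig(config map[string]string) (string, int, error) {
	cfg := firebolt.Nodeconfig(config)

	// Required string value; returns an error if the key is missing.
	topic, err := cfg.StringConfigRequired("target-topic")
	if err != nil {
		return "", 0, err
	}

	// Optional int value with a default of 100, constrained to the range 1..10000.
	batchSize, err := cfg.IntConfig("batch-size", 100, 1, 10000)
	if err != nil {
		return "", 0, err
	}

	return topic, batchSize, nil
}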

type ProduceRequest added in v0.1.2

type ProduceRequest interface {
	Topic() string
	Message() []byte
}

ProduceRequest is a request to produce a single message to a topic in a messaging system (AMQP, Kafka, ZMQ, etc).

type SimpleProduceRequest added in v0.1.2

type SimpleProduceRequest struct {
	TargetTopic  string
	MessageBytes []byte
}

SimpleProduceRequest is a default implementation of ProduceRequest that can be used in simple cases to request that a message be produced.

func (*SimpleProduceRequest) Message added in v0.1.2

func (s *SimpleProduceRequest) Message() []byte

Message returns the raw message bytes.

func (*SimpleProduceRequest) Topic added in v0.1.2

func (s *SimpleProduceRequest) Topic() string

Topic returns the target topic in the destination messaging system to which this message should be sent.
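
A minimal sketch of constructing a produce request; the topic name and message source are placeholders.

package example

import (
	"github.com/digitalocean/firebolt"
)

// newAuditRequest wraps raw message bytes in a produce request for a
// hypothetical "audit-events" topic; *SimpleProduceRequest satisfies
// ProduceRequest through its Topic() and Message() methods.
func newAuditRequest(msg []byte) firebolt.ProduceRequest {
	return &firebolt.SimpleProduceRequest{
		TargetTopic:  "audit-events",
		MessageBytes: msg,
	}
}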

