rbforwarder

package module
v0.0.0-...-41c7550 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 14, 2024 License: LGPL-3.0 Imports: 7 Imported by: 0

README

rbforwarder

rbforwarder is an extensible tool for processing data asynchronously. It allows you to create a custom pipeline in a modular fashion.

You can use rbforwarder to build a pipeline that decodes a JSON, filter or add fields, encode the data again to JSON and send it using multiple protocols HTTP, MQTT, AMQP, etc. It's easy to write a pipeline component.

Built-in features

  • Support multiple workers for each components.
  • Asynchronous report system. Get responses on a separate goroutine.
  • Built-in message retrying. rbforwarder can retry messages on fail.
  • Instrumentation, to have an overview of how the application is performing.

Components

  • Send data to an endpoint
    • MQTT
    • HTTP
    • Kafka
  • Decoders / Encoders
    • JSON
  • Utility
    • Limiter
    • Batcher (supports deflate)

Road Map

The application is on hard development, breaking changes expected until 1.0.

Milestone Feature Status
0.1 Pipeline Done
0.2 Reporter Done
0.3 HTTP component Done
0.4 Batcher component Done
0.5 Limiter component Done
0.6 Instrumentation Pending
0.8 JSON component Pending
0.9 MQTT component Pending
1.0 Kafka component Pending

Documentation

Index

Constants

This section is empty.

Variables

View Source
var Logger = logrus.NewEntry(log)

Logger for the package

View Source
var Version = "0.5"

Version is the current tag

Functions

This section is empty.

Types

type Component

type Component struct {
	// contains filtered or unexported fields
}

Component contains information about a pipeline component

type Config

type Config struct {
	Retries   int
	Backoff   int
	QueueSize int
}

Config stores the configuration for a forwarder

type RBForwarder

type RBForwarder struct {
	// contains filtered or unexported fields
}

RBForwarder is the main objecto of the package. It has the main methods for send messages and get reports. It has a backend for routing messages between workers

func NewRBForwarder

func NewRBForwarder(config Config) *RBForwarder

NewRBForwarder creates a new Forwarder object

func (*RBForwarder) Close

func (f *RBForwarder) Close()

Close stops pending actions

func (*RBForwarder) GetOrderedReports

func (f *RBForwarder) GetOrderedReports() <-chan interface{}

GetOrderedReports is the same as GetReports() but the reports are delivered in order

func (*RBForwarder) GetReports

func (f *RBForwarder) GetReports() <-chan interface{}

GetReports is used by the source to get a report for a sent message. Reports are delivered on the same order that was sent

func (*RBForwarder) Produce

func (f *RBForwarder) Produce(data []byte, opts map[string]interface{}, opaque interface{}) error

Produce is used by the source to send messages to the backend

func (*RBForwarder) PushComponents

func (f *RBForwarder) PushComponents(components []interface{})

PushComponents adds a new component to the pipeline

func (*RBForwarder) Run

func (f *RBForwarder) Run()

Run starts getting messages

type Report

type Report struct {
	Component int
	Code      int
	Status    string
	Opaque    interface{}
	// contains filtered or unexported fields
}

Report contains information abot a delivered message

Directories

Path Synopsis
components

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL