plugin

v1.1.3
Published: Oct 26, 2023 License: Apache-2.0 Imports: 23 Imported by: 2

README

Fluent-bit plugin

This repository contains the Go interfaces and data types required to create Fluent Bit plugins in Go.

Architectural overview

Plugins are implemented via proxy callbacks. Fluent-bit implements a generic proxy approach that can be used to load and invoke dynamic shared object (DSO) callbacks.

The following is the state machine of callbacks.

stateDiagram-v2
    state "fluent-bit engine starts" as start
    start --> FLBPluginRegister
    FLBPluginRegister --> FLBPluginInit : Locate configured plugins
    FLBPluginInit --> FLB_ERR
    FLB_ERR --> FLBPluginUnregister
    FLBPluginInit --> FLB_OK
    FLB_OK --> instance

    state "plugin instance" as instance
    instance --> FLBPluginInputCallback: collect() data
    FLBPluginInputCallback --> WriteMsgPack

    instance --> FLBPluginOutputCallback : Flush(data)
    FLBPluginOutputCallback --> WriteMsgPack
    WriteMsgPack --> [*]
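
The start-up part of the diagram can also be read as a small transition table. The following is a minimal sketch of that reading only. The State type and the next and walk helpers are hypothetical names introduced for illustration and are not part of the plugin package.

```go
package main

import "fmt"

// State names mirror nodes of the diagram above. The type itself is
// hypothetical and does not exist in the plugin package.
type State string

const (
	Register   State = "FLBPluginRegister"
	Init       State = "FLBPluginInit"
	Instance   State = "plugin instance"
	Unregister State = "FLBPluginUnregister"
)

// next returns the state that follows s. initOK only matters for Init, where
// FLB_OK leads to a running instance and FLB_ERR leads to unregistering.
func next(s State, initOK bool) (State, bool) {
	switch s {
	case Register:
		return Init, true
	case Init:
		if initOK {
			return Instance, true
		}
		return Unregister, true
	default:
		// Instance and Unregister are treated as terminal in this sketch.
		return s, false
	}
}

// walk follows the transitions from Register until a terminal state.
func walk(initOK bool) []State {
	path := []State{Register}
	s := Register
	for {
		n, ok := next(s, initOK)
		if !ok {
			return path
		}
		path = append(path, n)
		s = n
	}
}

func main() {
	fmt.Println(walk(true))  // path taken when Init reports FLB_OK
	fmt.Println(walk(false)) // path taken when Init reports FLB_ERR
}
```

The input and output callbacks that a running instance receives are left out here, since they repeat for as long as the instance lives.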

Writing a plugin

Plugins can be written for output and input processing.

As an example, the following are the minimum steps for writing an input plugin. Declaring a plugin requires implementing the InputPlugin interface by explicitly defining its two methods, Init and Collect:

package main

import (
	"context"
	"errors"
	"time"

	"github.com/calyptia/plugin"
)

// Plugin needs to be registered as an input type plugin in the initialisation phase
func init() {
	plugin.RegisterInput("go-test-input-plugin", "Golang input plugin for testing", &dummyPlugin{})
}

type dummyPlugin struct {
	foo string
}

// Init receives an instance of the configuration loader so that all the required
// configuration entries can be retrieved within the plugin context.
func (plug *dummyPlugin) Init(ctx context.Context, fbit *plugin.Fluentbit) error {
	plug.foo = fbit.Conf.String("foo")
	return nil
}

// Collect is invoked by the fluent-bit engine after the initialisation is successful.
// This method can block, as each plugin runs in its own thread. Be aware that the main context
// can be cancelled at any given time, so handle the context properly within this method.
// The ch parameter is a channel handled by the runtime that receives the messages collected by
// the plugin. Make sure to account for channel closure and to use the `plugin.Message` struct
// for messages generated by plugins.
func (plug *dummyPlugin) Collect(ctx context.Context, ch chan<- plugin.Message) error {
	tick := time.NewTicker(time.Second)
	defer tick.Stop() // release the ticker when Collect returns

	for {
		select {
		case <-ctx.Done():
			err := ctx.Err()
			if err != nil && !errors.Is(err, context.Canceled) {
				return err
			}

			return nil
		case <-tick.C:
			ch <- plugin.Message{
				Time: time.Now(),
				Record: map[string]string{
					"message": "hello from go-test-input-plugin",
					"foo":     plug.foo,
				},
			}
		}
	}
}

func main() {}
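
The Collect contract described above (send on the channel, return when the context ends) can be exercised without the fluent-bit runtime. The following is a minimal sketch under stated assumptions: Message is a local stand-in for plugin.Message so the block compiles on its own, and runFor, tickCollect and demo are hypothetical helpers, not part of the package.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Message is a local stand-in for plugin.Message; only the exported fields
// shown in the package documentation are mirrored.
type Message struct {
	Time   time.Time
	Record any
}

// collectFunc has the shape of InputPlugin.Collect, using the stand-in type.
type collectFunc func(ctx context.Context, ch chan<- Message) error

// runFor (hypothetical) runs collect for at most d, gathers everything it
// sends, and returns the messages together with the error from collect.
func runFor(d time.Duration, collect collectFunc) ([]Message, error) {
	ctx, cancel := context.WithTimeout(context.Background(), d)
	defer cancel()

	ch := make(chan Message)
	errc := make(chan error, 1)
	go func() {
		errc <- collect(ctx, ch)
		close(ch) // lets the range loop below finish
	}()

	var got []Message
	for m := range ch {
		got = append(got, m)
	}
	return got, <-errc
}

// tickCollect sends one message per interval until the context ends.
func tickCollect(interval time.Duration) collectFunc {
	return func(ctx context.Context, ch chan<- Message) error {
		tick := time.NewTicker(interval)
		defer tick.Stop()
		for {
			select {
			case <-ctx.Done():
				return nil
			case <-tick.C:
				msg := Message{Time: time.Now(), Record: map[string]string{"message": "hello"}}
				select {
				case ch <- msg:
				case <-ctx.Done(): // do not block on send after cancellation
					return nil
				}
			}
		}
	}
}

// demo runs the collector briefly and reports how many messages arrived.
func demo() (int, error) {
	msgs, err := runFor(100*time.Millisecond, tickCollect(10*time.Millisecond))
	return len(msgs), err
}

func main() {
	n, err := demo()
	fmt.Println(n > 0, err)
}
```

Guarding the send with a second select is a design choice of this sketch: it keeps the collector from blocking forever if the receiver stops reading after cancellation.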

Adding metrics

Plugins can share their metrics over the fluent-bit proxy interface. As an example, the following are the minimum steps for sharing a plugin's metrics, using the metric interfaces to implement them.

package main

import (
	"context"
	"errors"
	"time"

	"github.com/calyptia/plugin"
	"github.com/calyptia/plugin/metric"
)

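// The plugin is registered in the same way as in the previous example.
func init() {
	plugin.RegisterInput("go-test-input-plugin", "Golang input plugin for testing", &dummyPlugin{})
}

type dummyPlugin struct {
	foo            string
	counterExample metric.Counter
}

func (plug *dummyPlugin) Init(ctx context.Context, fbit *plugin.Fluentbit) error {
	// foo is read from the configuration because Collect includes it in every record.
	plug.foo = fbit.Conf.String("foo")
	plug.counterExample = fbit.Metrics.NewCounter("example_metric_total", "Total number of example metrics", "go-test-input-plugin")
	return nil
}

func (plug *dummyPlugin) Collect(ctx context.Context, ch chan<- plugin.Message) error {
	tick := time.NewTicker(time.Second)
	defer tick.Stop() // release the ticker when Collect returns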

	for {
		select {
		case <-ctx.Done():
			err := ctx.Err()
			if err != nil && !errors.Is(err, context.Canceled) {
				return err
			}

			return nil
		case <-tick.C:
			plug.counterExample.Add(1)

			ch <- plugin.Message{
				Time: time.Now(),
				Record: map[string]string{
					"message": "hello from go-test-input-plugin",
					"foo":     plug.foo,
				},
			}
		}
	}
}

func main() {}

Building a plugin

A plugin can be built locally using go build as:

go build -trimpath -buildmode c-shared -o ./bin/go-test-input-plugin.so .

Or cross-compiled to linux/amd64 from another machine using zig (the example below works on darwin/arm64):

CGO_ENABLED=1 \
GOOS=linux \
GOARCH=amd64 \
CC="zig cc -target x86_64-linux-gnu -isystem /usr/include -L/usr/lib/x86_64-linux-gnu" \
CXX="zig c++ -target x86_64-linux-gnu -isystem /usr/include -L/usr/lib/x86_64-linux-gnu" \
go build -buildmode=c-shared -trimpath -o ./my-plugin-linux-amd64.so ./...

Or using a Dockerfile as follows:

FROM golang:latest AS builder

WORKDIR /fluent-bit

COPY go.mod .
COPY go.sum .

RUN go mod download
RUN go mod verify

COPY . .

RUN go build -trimpath -buildmode c-shared -o ./bin/go-test-input-plugin.so .

FROM ghcr.io/calyptia/enterprise/advanced:main

COPY --from=builder /fluent-bit/bin/go-test-input-plugin.so /fluent-bit/etc/

ENTRYPOINT [ "/fluent-bit/bin/fluent-bit" ]
CMD [ "/fluent-bit/bin/fluent-bit", "-c", "/fluent-bit/etc/fluent-bit.conf" ]

Then create a fluent-bit.conf as follows:

[SERVICE]
    flush           1
    log_level       info
    plugins_file    /fluent-bit/etc/plugins.conf

[INPUT]
    Name go-test-input-plugin
    Tag  test-input
    foo  bar

Also a plugins.conf definition has to be provided, as follows:

[PLUGINS]
    Path /fluent-bit/etc/go-test-input-plugin.so

Build the image and run the Docker container as follows:

docker build -t my-fluent-bit-plugin:main .
docker run --platform=linux/amd64 -v $(pwd)/fluent-bit.conf:/fluent-bit/etc/fluent-bit.conf:ro -v $(pwd)/plugins.conf:/fluent-bit/etc/plugins.conf:ro my-fluent-bit-plugin:main

For further examples, please check the examples or testdata folders.

Running tests

The local tests can be run with:

go test -v ./...

Contributing

Please feel free to open PR(s) on this repository and to report any bugs or feature requests on the GitHub issues page.

Documentation

Overview

Package plugin implements the global context and objects required to run an instance of a plugin, as well as the interfaces for input and output plugins.

Functions

func FLBPluginExit

func FLBPluginExit() int

FLBPluginExit is invoked once the plugin instance exits from the fluent-bit context.

func FLBPluginFlush

func FLBPluginFlush(data unsafe.Pointer, clength C.int, ctag *C.char) int

FLBPluginFlush is invoked by the fluent-bit runtime once there is data for the corresponding plugin in the pipeline. A data pointer, a length and a tag are passed to the plugin interface implementation.

func FLBPluginInit

func FLBPluginInit(ptr unsafe.Pointer) int

FLBPluginInit is invoked once by the fluent-bit runtime during the initialisation phase. All of the plugin context should be initialised here, along with any data or flags the plugin requires to execute the collect or flush callbacks.

func FLBPluginInputCallback

func FLBPluginInputCallback(data *unsafe.Pointer, csize *C.size_t) int

FLBPluginInputCallback is invoked by the fluent-bit runtime once the plugin has been initialised. The plugin implementation is responsible for handling the incoming data and the context that gets passed. For long-living collectors, the plugin itself should keep a running thread, and fluent-bit will not execute further callbacks.

This function will invoke Collect only once to preserve backward compatible behavior. There are unit tests to enforce this behavior.
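
The "only once" guarantee can be illustrated with sync.Once from the standard library. This is a sketch of the general technique, not the package's actual implementation, which the documentation above does not describe. The onceRunner type and countStarts helper are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// onceRunner is a hypothetical wrapper: it runs fn on the first call to
// Callback and ignores every later call.
type onceRunner struct {
	once sync.Once
	fn   func()
}

// Callback may be called any number of times; fn still runs a single time.
func (r *onceRunner) Callback() {
	r.once.Do(r.fn)
}

// countStarts invokes Callback the given number of times and reports how
// often the wrapped function actually ran.
func countStarts(calls int) int {
	starts := 0
	r := &onceRunner{fn: func() { starts++ }}
	for i := 0; i < calls; i++ {
		r.Callback()
	}
	return starts
}

func main() {
	fmt.Println(countStarts(5)) // prints 1
}
```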

func FLBPluginInputCleanupCallback added in v0.1.6

func FLBPluginInputCleanupCallback(data unsafe.Pointer) int

FLBPluginInputCleanupCallback releases the memory used during the input callback.

func FLBPluginInputPause added in v1.1.2

func FLBPluginInputPause()

FLBPluginInputPause is invoked by the fluent-bit runtime once the plugin has been paused; the plugin then enters the paused state.

func FLBPluginInputPreRun added in v1.1.2

func FLBPluginInputPreRun(useHotReload C.int) int

FLBPluginInputPreRun is invoked by the fluent-bit runtime once the plugin has been initialised. It is invoked only once, before the input callbacks are executed.

func FLBPluginInputResume added in v1.1.2

func FLBPluginInputResume()

FLBPluginInputResume is invoked by the fluent-bit runtime once the plugin has been resumed; the plugin then returns to the running state.

func FLBPluginOutputPreExit added in v1.1.2

func FLBPluginOutputPreExit()

FLBPluginOutputPreExit is invoked by the fluent-bit runtime once the plugin has been exited; the plugin then enters the exiting state.

func FLBPluginOutputPreRun added in v1.1.2

func FLBPluginOutputPreRun(useHotReload C.int) int

func FLBPluginPreRegister added in v1.1.2

func FLBPluginPreRegister(hotReloading C.int) int

func FLBPluginRegister

func FLBPluginRegister(def unsafe.Pointer) int

FLBPluginRegister registers a plugin in the context of the fluent-bit runtime. A name and description can be provided.

func RegisterInput

func RegisterInput(name, desc string, in InputPlugin)

RegisterInput plugin. This function must be called only once per file.

func RegisterOutput

func RegisterOutput(name, desc string, out OutputPlugin)

RegisterOutput plugin. This function must be called only once per file.

Types

type ConfigLoader

type ConfigLoader interface {
	String(key string) string
}

ConfigLoader interface to represent a fluent-bit configuration loader.
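
Because ConfigLoader has a single method, a map-backed stand-in is enough to unit test an Init method without the runtime. The following is a minimal sketch: the interface is redeclared locally so the block compiles on its own, and mapConfig and stringOr are hypothetical helpers. Returning an empty string for a missing key is an assumption of this sketch, not documented behaviour of the real loader.

```go
package main

import "fmt"

// ConfigLoader mirrors the interface documented above so this sketch is
// self-contained. A real plugin would use plugin.ConfigLoader instead.
type ConfigLoader interface {
	String(key string) string
}

// mapConfig is a hypothetical in-memory loader for tests.
type mapConfig map[string]string

// String returns the value for key, or "" when the key is absent
// (an assumption of this sketch).
func (m mapConfig) String(key string) string { return m[key] }

// stringOr is a hypothetical helper that falls back to def for empty values.
func stringOr(c ConfigLoader, key, def string) string {
	if v := c.String(key); v != "" {
		return v
	}
	return def
}

func main() {
	var conf ConfigLoader = mapConfig{"foo": "bar"}
	fmt.Println(stringOr(conf, "foo", "default"))
	fmt.Println(stringOr(conf, "missing", "default"))
}
```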

type Fluentbit added in v0.1.4

type Fluentbit struct {
	Conf    ConfigLoader
	Metrics Metrics
	Logger  Logger
}

type InputPlugin

type InputPlugin interface {
	Init(ctx context.Context, fbit *Fluentbit) error
	Collect(ctx context.Context, ch chan<- Message) error
}

InputPlugin interface to represent an input fluent-bit plugin.

type Logger added in v0.1.4

type Logger interface {
	Error(format string, a ...any)
	Warn(format string, a ...any)
	Info(format string, a ...any)
	Debug(format string, a ...any)
}

Logger interface to represent a fluent-bit logging mechanism.
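
The Logger methods take a format string and arguments in the style of fmt.Sprintf. As a sketch of how a plugin's logging could be captured in tests, the following records formatted lines in memory. The interface is redeclared locally so the block is self-contained, and memLogger is a hypothetical type, not part of the package.

```go
package main

import (
	"fmt"
	"strings"
)

// Logger mirrors the interface documented above so this sketch compiles on
// its own. A real plugin would use the Logger field of plugin.Fluentbit.
type Logger interface {
	Error(format string, a ...any)
	Warn(format string, a ...any)
	Info(format string, a ...any)
	Debug(format string, a ...any)
}

// memLogger is a hypothetical Logger that keeps every line in memory.
type memLogger struct {
	lines []string
}

// log formats the message and stores it prefixed with its level.
func (l *memLogger) log(level, format string, a ...any) {
	l.lines = append(l.lines, level+" "+fmt.Sprintf(format, a...))
}

func (l *memLogger) Error(format string, a ...any) { l.log("error", format, a...) }
func (l *memLogger) Warn(format string, a ...any)  { l.log("warn", format, a...) }
func (l *memLogger) Info(format string, a ...any)  { l.log("info", format, a...) }
func (l *memLogger) Debug(format string, a ...any) { l.log("debug", format, a...) }

func main() {
	ml := &memLogger{}
	var lg Logger = ml
	lg.Info("collected %d records", 3)
	lg.Warn("retrying in %s", "1s")
	fmt.Println(strings.Join(ml.lines, "\n"))
}
```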

type Message

type Message struct {
	Time   time.Time
	Record any
	// contains filtered or unexported fields
}

Message struct to store a fluent-bit message that is collected (input) or flushed (output) by a plugin implementation.

func (Message) Tag

func (m Message) Tag() string

Tag is available at output.

type Metrics added in v0.1.3

type Metrics interface {
	NewCounter(name, desc string, labelValues ...string) metric.Counter
	NewGauge(name, desc string, labelValues ...string) metric.Gauge
}

Metrics builder.

type OutputPlugin

type OutputPlugin interface {
	Init(ctx context.Context, fbit *Fluentbit) error
	Flush(ctx context.Context, ch <-chan Message) error
}

OutputPlugin interface to represent an output fluent-bit plugin.
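
An output plugin receives its messages on a receive-only channel in Flush. The following is a minimal sketch of a drain loop with that shape, assuming the runtime closes the channel once the current batch is delivered (an assumption of this sketch, not stated above). Message is a local stand-in for plugin.Message, and flushAll and demo are hypothetical helpers.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Message is a local stand-in for plugin.Message; only the exported fields
// shown in the package documentation are mirrored.
type Message struct {
	Time   time.Time
	Record any
}

// flushAll has the shape of OutputPlugin.Flush plus a handler: it passes
// every message to handle until ch is closed or ctx is cancelled.
func flushAll(ctx context.Context, ch <-chan Message, handle func(Message)) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case msg, ok := <-ch:
			if !ok {
				return nil // channel closed: nothing left to flush
			}
			handle(msg)
		}
	}
}

// demo feeds two messages through flushAll and reports how many were handled.
func demo() (int, error) {
	ch := make(chan Message, 2)
	ch <- Message{Time: time.Now(), Record: map[string]string{"message": "one"}}
	ch <- Message{Time: time.Now(), Record: map[string]string{"message": "two"}}
	close(ch)

	handled := 0
	err := flushAll(context.Background(), ch, func(Message) { handled++ })
	return handled, err
}

func main() {
	n, err := demo()
	fmt.Println(n, err)
}
```

In a real plugin the loop body would live in the Flush method of a type passed to RegisterOutput, with plugin.Message in place of the stand-in.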

Directories

Path Synopsis
Package configloader provides functionality to load Calyptia configurations from a set of specified configuration files.
Package metric provides the Counter and Gauge interfaces.
