eventlogger

v0.2.9
Published: Feb 16, 2024 License: MPL-2.0 Imports: 14 Imported by: 8

README

go-eventlogger

go-eventlogger is a flexible event system library implemented as a pub/sub model supporting middleware.

The library's clients submit events to a Broker that routes them through a pipeline. Each node in the pipeline can process the event by modifying it, filtering it, persisting it, etc.

Stability Notice

While this library is fully open source and HashiCorp will be maintaining it (since we are and will be making extensive use of it), the API and output format are subject to minor changes as we fully bake and vet it in our projects. This notice will be removed once it's fully integrated into our major projects and no further changes are anticipated.

Usage

An Event is a collection of data, analogous to a log entry, that we want to process via pipelines. A pipeline is a graph composed of nodes. The client provides an event type and payload, and any other fields are generated as part of processing. The library will not attempt to discover whether configured formatter/marshaller nodes can actually handle the arbitrary payloads; it is up to the encapsulating program to put any such constraints on the user via its API.

The library's clients submit events to a Broker that routes them through pipelines (graphs) based on their type. A pipeline is a graph composed of Nodes. A Node processes an Event in some way -- modifying it, filtering it, persisting it, etc. A Sink is a Node that persists an Event.

Broker

Clients interact with the library via the Broker by sending events. A Broker processes incoming Events, by sending them to the pipelines (graphs) associated with the Event's type. A given Broker, along with its associated set of pipelines (graphs), will be configured programmatically.

Nodes

A Node is an element of a Pipeline that can perform operations on an Event. A Node has a Type, one of: Filter, Formatter, Sink, or FormatterFilter (see NodeType).

Node example

Examples of things that a Node might do to an Event include:

  • Modify the Event by storing a change description in Mutations. Changes could be described as a (jsonpointer, interface{}) key-value pair.
  • Filter the Event out of the pipeline by returning nil.
  • Get the Event ready for a Sink by rendering (formatting) it in some way, e.g. as JSON, so that downstream Sinks in the pipeline can then write it without any extra work. Rendered events are stored in the Formatted map.
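
The filtering and formatting behaviours above can be sketched with the standard library alone. This is only an illustration: `event`, `node`, `dropDebug`, `formatJSON`, and `run` are hypothetical stand-ins, not the library's own types or functions, and the "debug" event type is made up for the example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// event is a hypothetical stand-in for the library's Event.
type event struct {
	Type      string
	Payload   map[string]string
	Formatted map[string][]byte
}

// node processes an event; returning (nil, nil) filters the event out.
type node func(e *event) (*event, error)

// dropDebug filters out events whose type is "debug" (an assumed type name).
func dropDebug(e *event) (*event, error) {
	if e.Type == "debug" {
		return nil, nil
	}
	return e, nil
}

// formatJSON renders the payload and stores it under the "json" key.
func formatJSON(e *event) (*event, error) {
	b, err := json.Marshal(e.Payload)
	if err != nil {
		return nil, err
	}
	if e.Formatted == nil {
		e.Formatted = map[string][]byte{}
	}
	e.Formatted["json"] = b
	return e, nil
}

// run passes the event through each node in order and stops as soon as a
// node returns an error or filters the event out.
func run(e *event, nodes ...node) (*event, error) {
	for _, n := range nodes {
		var err error
		if e, err = n(e); err != nil || e == nil {
			return nil, err
		}
	}
	return e, nil
}

func main() {
	in := &event{Type: "audit", Payload: map[string]string{"user": "alice"}}
	out, err := run(in, dropDebug, formatJSON)
	if err != nil || out == nil {
		fmt.Println("event not delivered:", err)
		return
	}
	fmt.Println(string(out.Formatted["json"]))
}
```

A sink placed after `formatJSON` would only need to read `Formatted["json"]` and write the bytes as they are.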

Pipeline

A Pipeline is a pointer to the root of an interconnected sequence of Nodes.

All pipelines with a Sink must contain a Formatter that precedes the Sink and formats events in the Sink's required format.

A FileSink without a specified Format defaults to JSON, so a JSONFormatter must precede the FileSink in the pipeline.

All pipelines must end with a sink node.
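
The two ordering rules above (a formatter before the sink, and a sink at the end) can be checked with a small function. The sketch below is an assumption about how such a check might look; `nodeType` and `validate` are hypothetical names, and the library's own validation may differ.

```go
package main

import (
	"errors"
	"fmt"
)

// nodeType is a hypothetical stand-in for the library's NodeType.
type nodeType int

const (
	filter nodeType = iota
	formatter
	sink
)

// validate checks an ordered list of node types against the two rules:
// the last node is a sink, and a formatter appears before any sink.
func validate(nodes []nodeType) error {
	if len(nodes) == 0 || nodes[len(nodes)-1] != sink {
		return errors.New("pipeline must end with a sink")
	}
	seenFormatter := false
	for _, n := range nodes {
		switch n {
		case formatter:
			seenFormatter = true
		case sink:
			if !seenFormatter {
				return errors.New("sink must be preceded by a formatter")
			}
		}
	}
	return nil
}

func main() {
	fmt.Println(validate([]nodeType{filter, formatter, sink}))
	fmt.Println(validate([]nodeType{filter, sink}))
	fmt.Println(validate([]nodeType{filter, formatter}))
}
```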

Contributing

First: if you're unsure or afraid of anything, just ask or submit the issue or pull request anyways. You won't be yelled at for giving your best effort. The worst that can happen is that you'll be politely asked to change something. We appreciate any sort of contributions, and don't want a wall of rules to get in the way of that.

That said, if you want to ensure that a pull request is likely to be merged, talk to us! A great way to do this is in issues themselves. When you want to work on an issue, comment on it first and tell us the approach you want to take.

Build

Building requires the following to be installed locally:

  • Go v1.16 or greater

Please note that development may require other tools; to install the set of tools at the versions used by the Boundary team, run:

make tools

Before opening a PR, please run:

make fmt

Documentation

Constants

const (
	JSONFormat = "json"
)

Variables

var (
	ErrInvalidParameter = errors.New("invalid parameter")
	ErrNodeNotFound     = errors.New("node not found")
)

Functions

This section is empty.

Types

type Broker

type Broker struct {
	// contains filtered or unexported fields
}

Broker is the top-level entity used in the library for configuring the system and for sending events.

Brokers have registered Nodes which may be composed into registered Pipelines for EventTypes.

A Node may be a filter, formatter or sink (see NodeType).

A Broker may have multiple Pipelines.

EventTypes may have multiple Pipelines.

A Pipeline for an EventType may contain multiple filters, one formatter and one sink.

If a Pipeline does not have a formatter, then the event will not be written to the Sink.

A Node can be shared across multiple pipelines.

func NewBroker

func NewBroker(_ ...Option) (*Broker, error)

NewBroker creates a new Broker applying any relevant supplied options. Options are currently accepted, but none are applied.

func (*Broker) IsAnyPipelineRegistered added in v0.2.3

func (b *Broker) IsAnyPipelineRegistered(e EventType) bool

IsAnyPipelineRegistered returns whether a pipeline for a given event type is already registered or not.

func (Broker) Now

func (c Broker) Now() time.Time

Now returns the current time

func (*Broker) RegisterNode

func (b *Broker) RegisterNode(id NodeID, node Node, opt ...Option) error

RegisterNode assigns a node ID to a node. Node IDs should be unique. A Node may be a filter, formatter or sink (see NodeType). Nodes can be shared across multiple pipelines. Accepted options: WithNodeRegistrationPolicy (default: AllowOverwrite).

func (*Broker) RegisterPipeline

func (b *Broker) RegisterPipeline(def Pipeline, opt ...Option) error

RegisterPipeline adds a pipeline to the broker. Accepted options: WithPipelineRegistrationPolicy (default: AllowOverwrite).

func (*Broker) RemoveNode added in v0.2.5

func (b *Broker) RemoveNode(ctx context.Context, id NodeID) error

RemoveNode will remove a node from the broker if it is not currently in use. This is useful if RegisterNode was used successfully prior to a failed RegisterPipeline call referencing those nodes.

func (*Broker) RemovePipeline

func (b *Broker) RemovePipeline(t EventType, id PipelineID) error

RemovePipeline removes a pipeline from the broker.

func (*Broker) RemovePipelineAndNodes added in v0.2.0

func (b *Broker) RemovePipelineAndNodes(ctx context.Context, t EventType, id PipelineID) (bool, error)

RemovePipelineAndNodes will attempt to remove all nodes referenced by the pipeline. Any nodes that are referenced by other pipelines will not be removed.

Failed preconditions will result in a return of false with an error and neither the pipeline nor nodes will be deleted.

Once we start deleting the pipeline and nodes, we will continue until completion, but we'll return true along with any errors encountered (as multierror.Error).
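
The rule that shared nodes survive removal can be illustrated with a simple reference check. This is a sketch under assumptions, not the Broker's implementation: `registry` and its method are hypothetical, and pipelines are modelled as plain lists of node IDs.

```go
package main

import (
	"fmt"
	"sort"
)

// registry is a hypothetical stand-in mapping pipeline IDs to node IDs.
type registry map[string][]string

// removePipelineAndNodes deletes the pipeline and returns the IDs of its
// nodes that no remaining pipeline references (the ones safe to remove).
func (r registry) removePipelineAndNodes(id string) []string {
	nodes, ok := r[id]
	if !ok {
		return nil
	}
	delete(r, id)

	// Collect every node still referenced by another pipeline.
	inUse := map[string]bool{}
	for _, ids := range r {
		for _, n := range ids {
			inUse[n] = true
		}
	}

	var removed []string
	for _, n := range nodes {
		if !inUse[n] {
			removed = append(removed, n)
		}
	}
	sort.Strings(removed)
	return removed
}

func main() {
	r := registry{
		"audit-file":   {"filter", "json", "file-sink"},
		"audit-stdout": {"filter", "json", "stdout-sink"},
	}
	// Only file-sink is unique to audit-file, so only it is removed.
	fmt.Println(r.removePipelineAndNodes("audit-file"))
}
```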

func (*Broker) Reopen

func (b *Broker) Reopen(ctx context.Context) error

Reopen calls every registered Node's Reopen() function. The intention is to ask all nodes to reopen any files they have open. This is typically used as part of log rotation: after rotating, the rotator sends a signal to the application, which then would invoke this method. Another typical use case is to have all Nodes reevaluate any external configuration they might have.

func (*Broker) Send

func (b *Broker) Send(ctx context.Context, t EventType, payload interface{}) (Status, error)

Send writes an event of type t to all registered pipelines concurrently and reports on the result. An error will only be returned if a pipeline's delivery policies could not be satisfied.

func (*Broker) SetSuccessThreshold

func (b *Broker) SetSuccessThreshold(t EventType, successThreshold int) error

SetSuccessThreshold sets the success threshold per EventType. For the overall processing of a given event to be considered a success, at least as many pipelines as the threshold value must successfully process the event. This means that a filter could of course filter an event before it reaches the pipeline's sink, but it would still count as success when it comes to meeting this threshold. Use this when you want to allow the filtering of events without causing an error because an event was filtered.
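
As a rough model of the threshold semantics, the sketch below fans an event out to several pipelines concurrently and compares the number of successes against a threshold. Everything here is hypothetical (`pipeline`, `send`, `okPipeline`, `failingPipeline`); it shows the counting rule only and is not how the Broker is implemented.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// pipeline is a hypothetical stand-in: it returns nil when it processed
// the payload successfully (including when it filtered the event).
type pipeline func(payload string) error

func okPipeline(string) error      { return nil }
func failingPipeline(string) error { return errors.New("sink unavailable") }

// send runs every pipeline concurrently and succeeds only if at least
// threshold pipelines return without error.
func send(payload string, threshold int, pipelines ...pipeline) (int, error) {
	var (
		wg        sync.WaitGroup
		mu        sync.Mutex
		successes int
	)
	for _, p := range pipelines {
		wg.Add(1)
		go func(p pipeline) {
			defer wg.Done()
			if p(payload) == nil {
				mu.Lock()
				successes++
				mu.Unlock()
			}
		}(p)
	}
	wg.Wait()
	if successes < threshold {
		return successes, fmt.Errorf("%d of %d pipelines succeeded, need %d",
			successes, len(pipelines), threshold)
	}
	return successes, nil
}

func main() {
	n, err := send("hello", 1, okPipeline, failingPipeline)
	fmt.Println(n, err)
	n, err = send("hello", 2, okPipeline, failingPipeline)
	fmt.Println(n, err)
}
```

With a threshold of 1 the failing pipeline is tolerated; raising the threshold to 2 turns the same outcome into an error.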

func (*Broker) SetSuccessThresholdSinks added in v0.1.2

func (b *Broker) SetSuccessThresholdSinks(t EventType, successThresholdSinks int) error

SetSuccessThresholdSinks sets the success threshold per EventType. For the overall processing of a given event to be considered a success, at least as many sinks as the threshold value must successfully process the event.

func (*Broker) StopTimeAt

func (b *Broker) StopTimeAt(now time.Time)

StopTimeAt allows you to "stop" the Broker's timestamp clock at a predictable point in time, so timestamps are predictable for testing.
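
The idea of a stoppable clock can be sketched as follows. The `clock` type and `fixedInstant` helper are hypothetical and use only the standard library; the Broker's internal clock may be structured differently.

```go
package main

import (
	"fmt"
	"time"
)

// clock is a hypothetical stand-in for the Broker's timestamp source.
type clock struct {
	frozen  bool
	stopped time.Time
}

// StopTimeAt freezes the clock at the given instant.
func (c *clock) StopTimeAt(now time.Time) {
	c.frozen = true
	c.stopped = now
}

// Now returns the frozen instant if one was set, otherwise wall-clock time.
func (c *clock) Now() time.Time {
	if c.frozen {
		return c.stopped
	}
	return time.Now()
}

// fixedInstant returns an arbitrary instant used by the example.
func fixedInstant() time.Time {
	return time.Date(2024, time.February, 16, 0, 0, 0, 0, time.UTC)
}

func main() {
	var c clock
	c.StopTimeAt(fixedInstant())
	// Every call now yields the same timestamp, which keeps tests stable.
	fmt.Println(c.Now().Format(time.RFC3339))
}
```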

func (*Broker) SuccessThreshold added in v0.2.8

func (b *Broker) SuccessThreshold(t EventType) (int, bool)

SuccessThreshold returns the configured success threshold per EventType. For the overall processing of a given event to be considered a success, at least as many filter or sink nodes as the threshold value must successfully process the event. The threshold is returned (default: 0) along with a boolean indicating whether the EventType was registered with the broker; if true, the threshold is accurate for the specified EventType.

func (*Broker) SuccessThresholdSinks added in v0.2.8

func (b *Broker) SuccessThresholdSinks(t EventType) (int, bool)

SuccessThresholdSinks returns the configured success threshold per EventType. For the overall processing of a given event to be considered a success, at least as many sink nodes as the threshold value must successfully process the event. The threshold is returned (default: 0) along with a boolean indicating whether the EventType was registered with the broker; if true, the threshold is accurate for the specified EventType.

type Closer added in v0.2.0

type Closer interface {
	Close(ctx context.Context) error
}

Closer will close without error

type Event

type Event struct {
	// Type of Event
	Type EventType

	// CreatedAt defines the time the event was Sent
	CreatedAt time.Time

	// Formatted is used by Formatters to store formatted Event data which Sinks
	// can use when writing. The keys correspond to different formats (json,
	// text, etc).
	Formatted map[string][]byte

	// Payload is the Event's payload data
	Payload interface{}
	// contains filtered or unexported fields
}

An Event is analogous to a log entry.

func (*Event) Format

func (e *Event) Format(formatType string) ([]byte, bool)

Format will retrieve the formatted value for the specified format type. The two value return allows the caller to determine the existence of the format type.

func (*Event) FormattedAs

func (e *Event) FormattedAs(formatType string, formattedValue []byte)

FormattedAs sets a formatted value for the event, for the specified format type. Any existing value for the type is overwritten.
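
The Format and FormattedAs pair behaves like a keyed store with a "comma ok" lookup. The sketch below mirrors the two signatures on a hypothetical `event` type that holds only the Formatted map; it is an illustration, not the library's code.

```go
package main

import "fmt"

// event is a hypothetical stand-in holding only the Formatted map.
type event struct {
	Formatted map[string][]byte
}

// FormattedAs stores a rendered value, overwriting any existing one.
func (e *event) FormattedAs(formatType string, v []byte) {
	if e.Formatted == nil {
		e.Formatted = make(map[string][]byte)
	}
	e.Formatted[formatType] = v
}

// Format returns the rendered value and whether it exists.
func (e *event) Format(formatType string) ([]byte, bool) {
	v, ok := e.Formatted[formatType]
	return v, ok
}

func main() {
	e := &event{}
	e.FormattedAs("json", []byte(`{"msg":"hi"}`))

	if b, ok := e.Format("json"); ok {
		fmt.Println(string(b))
	}
	// A sink asking for a format no formatter produced gets ok == false.
	_, ok := e.Format("text")
	fmt.Println(ok)
}
```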

type EventType

type EventType string

EventType is a string that uniquely identifies the type of an Event within a given Broker.

type FileSink

type FileSink struct {
	// Path is the complete path of the log file directory, excluding FileName
	Path string

	// FileName is the name of the log file
	FileName string

	// Mode is the file's mode and permission bits
	Mode os.FileMode

	// LastCreated represents the creation time of the latest log
	LastCreated time.Time

	// MaxBytes is the maximum number of desired bytes for a log file
	MaxBytes int

	// BytesWritten is the number of bytes written in the current log file
	BytesWritten int64

	// MaxFiles is the maximum number of old files to keep before removing them
	MaxFiles int

	// MaxDuration is the maximum duration allowed between each file rotation
	MaxDuration time.Duration

	// Format specifies the format the []byte representation is formatted in
	// Defaults to JSONFormat
	Format string

	// TimestampOnlyOnRotate specifies the file currently being written
	// should not contain a timestamp in the name even if rotation is
	// enabled.
	//
	// If false (the default) all files, including the currently written
	// one, will contain a timestamp in the filename.
	TimestampOnlyOnRotate bool
	// contains filtered or unexported fields
}

FileSink writes the []byte representation of an Event to a file as a string.

func (*FileSink) Name

func (fs *FileSink) Name() string

Name returns a representation of the Sink's name

func (*FileSink) Process

func (fs *FileSink) Process(_ context.Context, e *Event) (*Event, error)

Process writes the []byte representation of an Event to a file as a string.

func (*FileSink) Reopen

func (fs *FileSink) Reopen() error

Reopen will close, rotate and reopen the Sink's file.

func (*FileSink) Type

func (_ *FileSink) Type() NodeType

Type describes the type of the node as a Sink.

type Filter

type Filter struct {
	// Predicate is a func that returns true if we want to keep the Event.
	Predicate Predicate
	// contains filtered or unexported fields
}

Filter is a Node that's used for filtering out events from the Pipeline.

func (*Filter) Name

func (f *Filter) Name() string

Name returns a representation of the Filter's name

func (*Filter) Process

func (f *Filter) Process(ctx context.Context, e *Event) (*Event, error)

Process will call the Filter's Predicate func to determine whether to return the Event or filter it out of the Pipeline (Filtered Events return nil, nil, which is a successful response).

func (*Filter) Reopen

func (f *Filter) Reopen() error

Reopen is a no op for Filters.

func (*Filter) Type

func (f *Filter) Type() NodeType

Type describes the type of the node as a Filter.

type JSONFormatter

type JSONFormatter struct{}

JSONFormatter is a Formatter Node which formats the Event as JSON.

func (*JSONFormatter) Name

func (w *JSONFormatter) Name() string

Name returns a representation of the Formatter's name

func (*JSONFormatter) Process

func (w *JSONFormatter) Process(ctx context.Context, e *Event) (*Event, error)

Process formats the Event as JSON and stores that formatted data in Event.Formatted with a key of "json"

func (*JSONFormatter) Reopen

func (w *JSONFormatter) Reopen() error

Reopen is a no op

func (*JSONFormatter) Type

func (w *JSONFormatter) Type() NodeType

Type describes the type of the node as a Formatter.

type JSONFormatterFilter

type JSONFormatterFilter struct {
	Predicate func(e interface{}) (bool, error)
}

JSONFormatterFilter is a Formatter Node which formats the Event as JSON and then may filter the event based on the struct used to format the JSON. This is useful when you want to specify filters based on structure of the formatted JSON vs the structure of the event.

func (*JSONFormatterFilter) Name

func (w *JSONFormatterFilter) Name() string

Name returns a representation of the FormatterFilter's name

func (*JSONFormatterFilter) Process

func (w *JSONFormatterFilter) Process(ctx context.Context, e *Event) (*Event, error)

Process formats the Event as JSON and stores that formatted data in Event.Formatted with a key of "json" and then may filter the event based on the struct used to format the JSON.

func (*JSONFormatterFilter) Reopen

func (w *JSONFormatterFilter) Reopen() error

Reopen is a no op

func (*JSONFormatterFilter) Type

func (w *JSONFormatterFilter) Type() NodeType

Type describes the type of the node as a NodeTypeFormatterFilter.

type Node

type Node interface {
	// Process does something with the Event: filter, redaction,
	// marshalling, persisting.
	Process(ctx context.Context, e *Event) (*Event, error)
	// Reopen is used to re-read any config stored externally
	// and to close and reopen files, e.g. for log rotation.
	Reopen() error
	// Type describes the type of the node.  This is mostly just used to
	// validate that pipelines are sensibly arranged, e.g. ending with a sink.
	Type() NodeType
}

A Node in a graph

type NodeController added in v0.2.0

type NodeController struct {
	// contains filtered or unexported fields
}

A NodeController is used by a Broker to attempt additional control of a given node. For instance, when a Node supports being closed via the Closer interface.

func NewNodeController added in v0.2.0

func NewNodeController(n Node) *NodeController

NewNodeController creates a new NodeController for a given Node. The Node should be the original value registered with the broker, or have an Unwrap method returning the original Node (see NodeUnwrapper interface).

If the Node implements any of the following methods, the NodeController will call them as appropriate/needed:

Close(ctx context.Context) error

func (*NodeController) Close added in v0.2.0

func (nc *NodeController) Close(ctx context.Context) error

Close the Node if it implements the Closer interface, and if required use the NodeUnwrapper interface to unwrap it before closing it.
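
The interplay between a closable node and a wrapper that exposes Unwrap can be sketched with type assertions. The interfaces and types below are hypothetical stand-ins, and the order of checks (try Close first, then unwrap) is an assumption rather than a statement about NodeController.

```go
package main

import (
	"context"
	"fmt"
)

// node, closer and unwrapper are hypothetical stand-ins for the library's
// Node, Closer and NodeUnwrapper interfaces.
type node interface{ Name() string }

type closer interface {
	Close(ctx context.Context) error
}

type unwrapper interface{ Unwrap() node }

// fileNode is a node that holds a resource and can be closed.
type fileNode struct{ closed bool }

func (f *fileNode) Name() string                { return "file" }
func (f *fileNode) Close(context.Context) error { f.closed = true; return nil }

// wrapped decorates another node without exposing Close itself.
type wrapped struct{ inner node }

func (w wrapped) Name() string { return "wrapped(" + w.inner.Name() + ")" }
func (w wrapped) Unwrap() node { return w.inner }

// closeNode closes n if it is a closer, otherwise unwraps it and tries again.
func closeNode(ctx context.Context, n node) error {
	for n != nil {
		if c, ok := n.(closer); ok {
			return c.Close(ctx)
		}
		u, ok := n.(unwrapper)
		if !ok {
			return nil // nothing to close
		}
		n = u.Unwrap()
	}
	return nil
}

// demo wraps a fileNode, closes it through the wrapper, and reports the result.
func demo() (bool, error) {
	f := &fileNode{}
	err := closeNode(context.Background(), wrapped{inner: f})
	return f.closed, err
}

func main() {
	closed, err := demo()
	fmt.Println(closed, err)
}
```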

type NodeID

type NodeID string

NodeID is a string that uniquely identifies a Node.

type NodeType

type NodeType int

NodeType defines the possible Node types in the system.

const (
	NodeTypeFilter NodeType
	NodeTypeFormatter
	NodeTypeSink
	NodeTypeFormatterFilter // A node that formats and then filters the events based on the new format.
)

type NodeUnwrapper added in v0.2.0

type NodeUnwrapper interface {
	Unwrap() Node
}

NodeUnwrapper will unwrap a node, returning the original value (see NewNodeController docs)

type Option added in v0.2.0

type Option func(*options) error

Option allows options to be passed as arguments.

func WithNodeRegistrationPolicy added in v0.2.0

func WithNodeRegistrationPolicy(policy RegistrationPolicy) Option

WithNodeRegistrationPolicy configures the option that determines the node registration policy.

func WithPipelineRegistrationPolicy added in v0.2.0

func WithPipelineRegistrationPolicy(policy RegistrationPolicy) Option

WithPipelineRegistrationPolicy configures the option that determines the pipeline registration policy.

type Pipeline

type Pipeline struct {
	// PipelineID uniquely identifies the Pipeline
	PipelineID PipelineID

	// EventType defines the type of event the Pipeline processes
	EventType EventType

	// NodeIDs defines the Pipeline's list of nodes
	NodeIDs []NodeID
}

Pipeline defines a pipe: its ID, the EventType it's for, and the nodes that it contains. Nodes can be shared across multiple pipelines.

type PipelineID

type PipelineID string

PipelineID is a string that uniquely identifies a Pipeline within a given EventType.

type Predicate

type Predicate func(e *Event) (bool, error)

Predicate is a func that returns true if we want to keep the Event.

type RegistrationPolicy added in v0.2.0

type RegistrationPolicy string

RegistrationPolicy is used to specify what kind of policy should apply when registering components (e.g. Pipeline, Node) with the Broker

const (
	AllowOverwrite RegistrationPolicy = "AllowOverwrite"
	DenyOverwrite  RegistrationPolicy = "DenyOverwrite"
)
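
How the two policies differ can be shown with a map-backed registry. This is a minimal sketch under assumptions: `policy`, `register`, and `errExists` are hypothetical, and the error the Broker actually returns under DenyOverwrite is not specified here.

```go
package main

import (
	"errors"
	"fmt"
)

// policy is a hypothetical stand-in for RegistrationPolicy.
type policy string

const (
	allowOverwrite policy = "AllowOverwrite"
	denyOverwrite  policy = "DenyOverwrite"
)

// errExists is an assumed error value for this example only.
var errExists = errors.New("already registered")

// register stores value under id, honouring the overwrite policy.
func register(reg map[string]string, id, value string, p policy) error {
	if _, exists := reg[id]; exists && p == denyOverwrite {
		return errExists
	}
	reg[id] = value
	return nil
}

func main() {
	reg := map[string]string{}
	fmt.Println(register(reg, "sink", "v1", denyOverwrite)) // first registration
	fmt.Println(register(reg, "sink", "v2", denyOverwrite)) // rejected, v1 kept
	err := register(reg, "sink", "v3", allowOverwrite)      // replaces v1
	fmt.Println(err, reg["sink"])
}
```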

type Status

type Status struct {

	// Warnings lists any non-fatal errors that occurred while sending an Event.
	Warnings []error
	// contains filtered or unexported fields
}

Status describes the result of a Send.

func (Status) Complete added in v0.2.6

func (s Status) Complete() []NodeID

Complete returns the IDs of 'filter' and 'sink' type nodes that successfully processed the Event, resulting in immediate completion of a particular Pipeline.

func (Status) CompleteSinks added in v0.2.6

func (s Status) CompleteSinks() []NodeID

CompleteSinks returns the IDs of 'sink' type nodes that successfully processed the Event, resulting in immediate completion of a particular Pipeline.

Directories

Path Synopsis
filters
  gated: Package gated implements a Filter that provides the ability to buffer events based on their IDs until an event is flushed.
  encrypt (Module)
formatter_filters
  cloudevents: Package cloudevents includes a formatting/filter Node which will transform and encode Events into JSON or Text which conforms to the cloudevents spec.
sinks
  channel: Package channel implements Sink which sends events to a channel.
  writer: Package writer implements Sink which writes the []byte representation of an Event to an io.Writer as a string.
