Documentation ¶
Index ¶
- Constants
- Variables
- type Broker
- func (b *Broker) IsAnyPipelineRegistered(e EventType) bool
- func (c Broker) Now() time.Time
- func (b *Broker) RegisterNode(id NodeID, node Node, opt ...Option) error
- func (b *Broker) RegisterPipeline(def Pipeline, opt ...Option) error
- func (b *Broker) RemoveNode(ctx context.Context, id NodeID) error
- func (b *Broker) RemovePipeline(t EventType, id PipelineID) error
- func (b *Broker) RemovePipelineAndNodes(ctx context.Context, t EventType, id PipelineID) (bool, error)
- func (b *Broker) Reopen(ctx context.Context) error
- func (b *Broker) Send(ctx context.Context, t EventType, payload interface{}) (Status, error)
- func (b *Broker) SetSuccessThreshold(t EventType, successThreshold int) error
- func (b *Broker) SetSuccessThresholdSinks(t EventType, successThresholdSinks int) error
- func (b *Broker) StopTimeAt(now time.Time)
- func (b *Broker) SuccessThreshold(t EventType) (int, bool)
- func (b *Broker) SuccessThresholdSinks(t EventType) (int, bool)
- type Closer
- type Event
- type EventType
- type FileSink
- type Filter
- type JSONFormatter
- type JSONFormatterFilter
- type Node
- type NodeController
- type NodeID
- type NodeType
- type NodeUnwrapper
- type Option
- type Pipeline
- type PipelineID
- type Predicate
- type RegistrationPolicy
- type Status
Constants ¶
const (
JSONFormat = "json"
)
Variables ¶
var ( ErrInvalidParameter = errors.New("invalid parameter") ErrNodeNotFound = errors.New("node not found") )
Functions ¶
This section is empty.
Types ¶
type Broker ¶
type Broker struct {
// contains filtered or unexported fields
}
Broker is the top-level entity used in the library for configuring the system and for sending events.
Brokers have registered Nodes which may be composed into registered Pipelines for EventTypes.
A Node may be a filter, formatter or sink (see NodeType).
A Broker may have multiple Pipelines.
EventTypes may have multiple Pipelines.
A Pipeline for an EventType may contain multiple filters, one formatter and one sink.
If a Pipeline does not have a formatter, then the event will not be written to the Sink.
A Node can be shared across multiple pipelines.
func NewBroker ¶
NewBroker creates a new Broker applying any relevant supplied options. Options are currently accepted, but none are applied.
func (*Broker) IsAnyPipelineRegistered ¶ added in v0.2.3
IsAnyPipelineRegistered returns whether a pipeline for a given event type is already registered or not.
func (*Broker) RegisterNode ¶
RegisterNode assigns a node ID to a node. Node IDs should be unique. A Node may be a filter, formatter or sink (see NodeType). Nodes can be shared across multiple pipelines. Accepted options: WithNodeRegistrationPolicy (default: AllowOverwrite).
func (*Broker) RegisterPipeline ¶
RegisterPipeline adds a pipeline to the broker. Accepted options: WithPipelineRegistrationPolicy (default: AllowOverwrite).
func (*Broker) RemoveNode ¶ added in v0.2.5
RemoveNode will remove a node from the broker, if it is not currently in use This is useful if RegisterNode was used successfully prior to a failed RegisterPipeline call referencing those nodes
func (*Broker) RemovePipeline ¶
func (b *Broker) RemovePipeline(t EventType, id PipelineID) error
RemovePipeline removes a pipeline from the broker.
func (*Broker) RemovePipelineAndNodes ¶ added in v0.2.0
func (b *Broker) RemovePipelineAndNodes(ctx context.Context, t EventType, id PipelineID) (bool, error)
RemovePipelineAndNodes will attempt to remove all nodes referenced by the pipeline. Any nodes that are referenced by other pipelines will not be removed.
Failed preconditions will result in a return of false with an error and neither the pipeline nor nodes will be deleted.
Once we start deleting the pipeline and nodes, we will continue until completion, but we'll return true along with any errors encountered (as multierror.Error).
func (*Broker) Reopen ¶
Reopen calls every registered Node's Reopen() function. The intention is to ask all nodes to reopen any files they have open. This is typically used as part of log rotation: after rotating, the rotator sends a signal to the application, which then would invoke this method. Another typically use-case is to have all Nodes reevaluated any external configuration they might have.
func (*Broker) Send ¶
Send writes an event of type t to all registered pipelines concurrently and reports on the result. An error will only be returned if a pipeline's delivery policies could not be satisfied.
func (*Broker) SetSuccessThreshold ¶
SetSuccessThreshold sets the success threshold per EventType. For the overall processing of a given event to be considered a success, at least as many pipelines as the threshold value must successfully process the event. This means that a filter could of course filter an event before it reaches the pipeline's sink, but it would still count as success when it comes to meeting this threshold. Use this when you want to allow the filtering of events without causing an error because an event was filtered.
func (*Broker) SetSuccessThresholdSinks ¶ added in v0.1.2
SetSuccessThresholdSinks sets the success threshold per EventType. For the overall processing of a given event to be considered a success, at least as many sinks as the threshold value must successfully process the event.
func (*Broker) StopTimeAt ¶
StopTimeAt allows you to "stop" the Broker's timestamp clock at a predicable point in time, so timestamps are predictable for testing.
func (*Broker) SuccessThreshold ¶ added in v0.2.8
SuccessThreshold returns the configured success threshold per EventType. For the overall processing of a given event to be considered a success, at least as many filter or sink nodes as the threshold value must successfully process the event. The threshold is returned (default: 0), along with a boolean indicating whether the EventType was registered with the broker, if true, the threshold is accurate for the specified EventType.
func (*Broker) SuccessThresholdSinks ¶ added in v0.2.8
SuccessThresholdSinks returns the configured success threshold per EventType. For the overall processing of a given event to be considered a success, at least as many sink nodes as the threshold value must successfully process the event. The threshold is returned (default: 0), along with a boolean indicating whether the EventType was registered with the broker, if true, the threshold is accurate for the specified EventType.
type Event ¶
type Event struct { // Type of Event Type EventType // CreatedAt defines the time the event was Sent CreatedAt time.Time // Formatted used by Formatters to store formatted Event data which Sinks // can use when writing. The keys correspond to different formats (json, // text, etc). Formatted map[string][]byte // Payload is the Event's payload data Payload interface{} // contains filtered or unexported fields }
An Event is analogous to a log entry.
func (*Event) Format ¶
Format will retrieve the formatted value for the specified format type. The two value return allows the caller to determine the existence of the format type.
func (*Event) FormattedAs ¶
FormattedAs sets a formatted value for the event, for the specified format type. Any existing value for the type is overwritten.
type EventType ¶
type EventType string
EventType is a string that uniquely identifies the type of an Event within a given Broker.
type FileSink ¶
type FileSink struct { // Path is the complete path of the log file directory, excluding FileName Path string // FileName is the name of the log file FileName string // Mode is the file's mode and permission bits Mode os.FileMode // LastCreated represents the creation time of the latest log LastCreated time.Time // MaxBytes is the maximum number of desired bytes for a log file MaxBytes int // BytesWritten is the number of bytes written in the current log file BytesWritten int64 // MaxFiles is the maximum number of old files to keep before removing them MaxFiles int // MaxDuration is the maximum duration allowed between each file rotation MaxDuration time.Duration // Format specifies the format the []byte representation is formatted in // Defaults to JSONFormat Format string // TimestampOnlyOnRotate specifies the file currently being written // should not contain a timestamp in the name even if rotation is // enabled. // // If false (the default) all files, including the currently written // one, will contain a timestamp in the filename. TimestampOnlyOnRotate bool // contains filtered or unexported fields }
FileSink writes the []byte representation of an Event to a file as a string.
func (*FileSink) Process ¶
Process writes the []byte representation of an Event to a file as a string.
type Filter ¶
type Filter struct { // Predicate is a func that returns true if we want to keep the Event. Predicate Predicate // contains filtered or unexported fields }
Filter is a Node that's used for filtering out events from the Pipeline.
type JSONFormatter ¶
type JSONFormatter struct{}
JSONFormatter is a Formatter Node which formats the Event as JSON.
func (*JSONFormatter) Name ¶
func (w *JSONFormatter) Name() string
Name returns a representation of the Formatter's name
func (*JSONFormatter) Process ¶
Process formats the Event as JSON and stores that formatted data in Event.Formatted with a key of "json"
func (*JSONFormatter) Type ¶
func (w *JSONFormatter) Type() NodeType
Type describes the type of the node as a Formatter.
type JSONFormatterFilter ¶
JSONFormatterFilter is a Formatter Node which formats the Event as JSON and then may filter the event based on the struct used to format the JSON. This is useful when you want to specify filters based on structure of the formatted JSON vs the structure of the event.
func (*JSONFormatterFilter) Name ¶
func (w *JSONFormatterFilter) Name() string
Name returns a representation of the FormatterFilter's name
func (*JSONFormatterFilter) Process ¶
Process formats the Event as JSON and stores that formatted data in Event.Formatted with a key of "json" and then may filter the event based on the struct used to format the JSON.
func (*JSONFormatterFilter) Type ¶
func (w *JSONFormatterFilter) Type() NodeType
Type describes the type of the node as a NodeTypeFormatterFilter.
type Node ¶
type Node interface { // Process does something with the Event: filter, redaction, // marshalling, persisting. Process(ctx context.Context, e *Event) (*Event, error) // Reopen is used to re-read any config stored externally // and to close and reopen files, e.g. for log rotation. Reopen() error // Type describes the type of the node. This is mostly just used to // validate that pipelines are sensibly arranged, e.g. ending with a sink. Type() NodeType }
A Node in a graph
type NodeController ¶ added in v0.2.0
type NodeController struct {
// contains filtered or unexported fields
}
A NodeController is used by a Broker to attempt additional control of a given node. For instance, when a Node supports being closed via the Closer interface.
func NewNodeController ¶ added in v0.2.0
func NewNodeController(n Node) *NodeController
NewNodeController creates a new NodeController for a given Node. The Node should be the original value registered with the broker, or have an Unwrap method returning the original Node (see NodeUnwrapper interface).
If the Node implements any of the following methods, the NodeController will call them as appropriate/needed:
Close() error
type NodeType ¶
type NodeType int
NodeType defines the possible Node type's in the system.
const ( NodeTypeFilter NodeType NodeTypeFormatter NodeTypeSink NodeTypeFormatterFilter // A node that formats and then filters the events based on the new format. )
type NodeUnwrapper ¶ added in v0.2.0
type NodeUnwrapper interface {
Unwrap() Node
}
NodeUnwrapper will unwrap a node, returning the original value (see NewNodeController docs)
type Option ¶ added in v0.2.0
type Option func(*options) error
Option allows options to be passed as arguments.
func WithNodeRegistrationPolicy ¶ added in v0.2.0
func WithNodeRegistrationPolicy(policy RegistrationPolicy) Option
WithNodeRegistrationPolicy configures the option that determines the node registration policy.
func WithPipelineRegistrationPolicy ¶ added in v0.2.0
func WithPipelineRegistrationPolicy(policy RegistrationPolicy) Option
WithPipelineRegistrationPolicy configures the option that determines the pipeline registration policy.
type Pipeline ¶
type Pipeline struct { // PipelineID uniquely identifies the Pipeline PipelineID PipelineID // EventType defines the type of event the Pipeline processes EventType EventType // NodeIDs defines Pipeline's the list of nodes NodeIDs []NodeID }
Pipeline defines a pipe: its ID, the EventType it's for, and the nodes that it contains. Nodes can be shared across multiple pipelines.
type PipelineID ¶
type PipelineID string
PipelineID is a string that uniquely identifies a Pipeline within a given EventType.
type RegistrationPolicy ¶ added in v0.2.0
type RegistrationPolicy string
RegistrationPolicy is used to specify what kind of policy should apply when registering components (e.g. Pipeline, Node) with the Broker
const ( AllowOverwrite RegistrationPolicy = "AllowOverwrite" DenyOverwrite RegistrationPolicy = "DenyOverwrite" )
type Status ¶
type Status struct { // Warnings lists any non-fatal errors that occurred while sending an Event. Warnings []error // contains filtered or unexported fields }
Status describes the result of a Send.
func (Status) Complete ¶ added in v0.2.6
Complete returns the IDs of 'filter' and 'sink' type nodes that successfully processed the Event, resulting in immediate completion of a particular Pipeline.
func (Status) CompleteSinks ¶ added in v0.2.6
CompleteSinks returns the IDs of 'sink' type nodes that successfully processed the Event, resulting in immediate completion of a particular Pipeline.
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
filters
|
|
gated
Package gated implements a Filter that provides the ability to buffer events based on their IDs until an event is flushed.
|
Package gated implements a Filter that provides the ability to buffer events based on their IDs until an event is flushed. |
encrypt
Module
|
|
formatter_filters
|
|
cloudevents
Package cloudevents includes a formatting/filter Node which will transform and encode Events into JSON or Text which conforms to the cloudevents spec.
|
Package cloudevents includes a formatting/filter Node which will transform and encode Events into JSON or Text which conforms to the cloudevents spec. |
sinks
|
|
channel
Package channel implements Sink which sends events to a channel.
|
Package channel implements Sink which sends events to a channel. |
writer
Package writer implements Sink which writes the []byte respresentation of an Event to an io.Writer as a string.
|
Package writer implements Sink which writes the []byte respresentation of an Event to an io.Writer as a string. |