core

package
v0.3.2
Warning

This package is not in the latest version of its module.

Published: Jun 22, 2015 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Index

Constants

const (
	// LogInternalStream is the name of the internal message channel (logs)
	LogInternalStream = "_GOLLUM_"
	// WildcardStream is the name of the "all streams" channel
	WildcardStream = "*"
	// DroppedStream is the name of the stream used to store dropped messages
	DroppedStream = "_DROPPED_"
)
const (
	// PluginControlStop will cause the consumer to halt and shutdown.
	PluginControlStop = PluginControl(1)

	// PluginControlRoll notifies the consumer about a reconnect or reopen request
	PluginControlRoll = PluginControl(2)
)

Variables

var (
	// LogInternalStreamID is the ID of the "_GOLLUM_" stream
	LogInternalStreamID = GetStreamID(LogInternalStream)

	// WildcardStreamID is the ID of the "*" stream
	WildcardStreamID = GetStreamID(WildcardStream)

	// DroppedStreamID is the ID of the "_DROPPED_" stream
	DroppedStreamID = GetStreamID(DroppedStream)
)
var MessageCount = uint32(0)

MessageCount holds the number of messages processed since the last call to GetAndResetMessageCount.

var StreamTypes = StreamRegistry{
	// contains filtered or unexported fields
}

StreamTypes is the global instance of StreamRegistry used to store all registered streams.

Functions

func EnableRetryQueue

func EnableRetryQueue(size int)

EnableRetryQueue creates a retried messages channel using the given size.

func GetAndResetMessageCount

func GetAndResetMessageCount() uint32

GetAndResetMessageCount returns the current message counter and resets it to 0. This function is thread-safe.
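
One common way to get a thread-safe read-and-reset is a single atomic swap. The following is a minimal sketch under that assumption, not the package's confirmed implementation; `messageCount`, `countMessage` and `getAndResetMessageCount` are local stand-ins for illustration.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// messageCount is a local stand-in for the package-level MessageCount.
var messageCount uint32

// countMessage increments the counter atomically (hypothetical helper).
func countMessage() {
	atomic.AddUint32(&messageCount, 1)
}

// getAndResetMessageCount reads the counter and sets it to 0 in one atomic
// step, so no increment can be lost between the read and the reset.
func getAndResetMessageCount() uint32 {
	return atomic.SwapUint32(&messageCount, 0)
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			countMessage()
		}()
	}
	wg.Wait()
	fmt.Println(getAndResetMessageCount()) // 100
	fmt.Println(getAndResetMessageCount()) // 0
}
```

Reading and resetting in two separate steps would allow an increment to slip in between them, which is why the sketch uses a swap.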

func GetRetryQueue

func GetRetryQueue() <-chan Message

GetRetryQueue returns read access to the retry queue.

Types

type AsyncMessageSource

type AsyncMessageSource interface {
	MessageSource

	// EnqueueResponse sends a message to the source of another message.
	EnqueueResponse(msg Message)
}

AsyncMessageSource extends the MessageSource interface to allow a backchannel that simply forwards any message coming from the producer.

type Config

type Config struct {
	Values  []map[string]shared.MarshalMap
	Plugins []PluginConfig
}

Config represents the top-level config containing all plugin configs.

func ReadConfig

func ReadConfig(path string) (*Config, error)

ReadConfig parses a YAML config file into a new Config struct.

type Consumer

type Consumer interface {
	// Consume should implement the main loop that fetches messages from a given
	// source and pushes them to the Message channel.
	Consume(*sync.WaitGroup)

	// Streams returns the streams this consumer is writing to.
	Streams() []MessageStreamID

	// Control returns write access to this consumer's control channel.
	// See PluginControl* constants.
	Control() chan<- PluginControl
}

Consumer is an interface for plugins that receive data from outside sources and generate Message objects from this data.

type ConsumerBase

type ConsumerBase struct {
	// contains filtered or unexported fields
}

ConsumerBase is the base class for consumers. All consumers support a common subset of configuration options:

  - "consumer.Something":
    Enable: true
    Stream:
      - "error"
      - "default"

Enable switches the consumer on or off. By default this value is set to true.

Stream contains either a single string or a list of strings defining the message channels this consumer will produce. By default this is set to "*" which means only producers set to consume "all streams" will get these messages.

func (ConsumerBase) AddMainWorker

func (cons ConsumerBase) AddMainWorker(workers *sync.WaitGroup)

AddMainWorker adds the first worker to the waitgroup

func (ConsumerBase) AddWorker

func (cons ConsumerBase) AddWorker()

AddWorker adds an additional worker to the waitgroup. Assumes that either MarkAsActive or SetWaitGroup has been called beforehand.

func (*ConsumerBase) Configure

func (cons *ConsumerBase) Configure(conf PluginConfig) error

Configure initializes standard consumer values from a plugin config.

func (*ConsumerBase) Control

func (cons *ConsumerBase) Control() chan<- PluginControl

Control returns write access to this consumer's control channel. See PluginControl* constants.

func (*ConsumerBase) DefaultControlLoop

func (cons *ConsumerBase) DefaultControlLoop(onRoll func())

DefaultControlLoop provides a consumer main loop that is sufficient for most use cases.

func (*ConsumerBase) Enqueue

func (cons *ConsumerBase) Enqueue(data []byte, sequence uint64)

Enqueue creates a new message from a given byte slice and passes it to EnqueueMessage. Note that data is not copied, just referenced by the message.

func (*ConsumerBase) EnqueueCopy

func (cons *ConsumerBase) EnqueueCopy(data []byte, sequence uint64)

EnqueueCopy behaves like Enqueue but creates a copy of data that is attached to the message.

func (*ConsumerBase) EnqueueMessage

func (cons *ConsumerBase) EnqueueMessage(msg Message)

EnqueueMessage passes a given message to all streams. Only the StreamID of the message is modified, everything else is passed as-is.

func (*ConsumerBase) ProcessCommand

func (cons *ConsumerBase) ProcessCommand(command PluginControl, onRoll func()) bool

ProcessCommand provides a callback-based way to react to the different consumer commands. Returns true if PluginControlStop was triggered.

func (ConsumerBase) SetWorkerWaitGroup

func (cons ConsumerBase) SetWorkerWaitGroup(workers *sync.WaitGroup)

SetWorkerWaitGroup forwards to Plugin.SetWorkerWaitGroup for this consumer's internal plugin state. This method is also called by AddMainWorker.

func (*ConsumerBase) Streams

func (cons *ConsumerBase) Streams() []MessageStreamID

Streams returns an array with all stream ids this consumer is writing to.

func (*ConsumerBase) TickerControlLoop

func (cons *ConsumerBase) TickerControlLoop(interval time.Duration, onRoll func(), onTick func())

TickerControlLoop is like DefaultControlLoop but executes a given function at every given interval tick, too.
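
A loop of this kind is typically a `select` over the control channel and a ticker. The sketch below illustrates that shape with local stand-in types. `pluginControl`, `controlStop`, `controlRoll` and `tickerControlLoop` are hypothetical names that mirror the documented ones, and the real method may differ in detail.

```go
package main

import (
	"fmt"
	"time"
)

// pluginControl mirrors the PluginControl enumeration for this sketch.
type pluginControl int

const (
	controlStop pluginControl = 1 // mirrors PluginControlStop
	controlRoll pluginControl = 2 // mirrors PluginControlRoll
)

// tickerControlLoop handles control commands and calls onTick at every
// interval tick. It returns once controlStop has been received.
func tickerControlLoop(control <-chan pluginControl, interval time.Duration, onRoll func(), onTick func()) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case cmd := <-control:
			switch cmd {
			case controlStop:
				return
			case controlRoll:
				if onRoll != nil {
					onRoll()
				}
			}
		case <-ticker.C:
			if onTick != nil {
				onTick()
			}
		}
	}
}

// demo runs the loop, waits for the first tick, then sends a roll and a stop.
func demo() (rolls int, ticked bool) {
	control := make(chan pluginControl)
	done := make(chan struct{})
	firstTick := make(chan struct{}, 1)
	go func() {
		tickerControlLoop(control, 5*time.Millisecond,
			func() { rolls++ },
			func() {
				select {
				case firstTick <- struct{}{}: // record only the first tick
				default:
				}
			})
		close(done)
	}()
	<-firstTick
	control <- controlRoll
	control <- controlStop
	<-done
	return rolls, true
}

func main() {
	rolls, ticked := demo()
	fmt.Println("rolls:", rolls, "ticked:", ticked)
}
```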

func (ConsumerBase) WorkerDone

func (cons ConsumerBase) WorkerDone()

WorkerDone removes an additional worker from the waitgroup.

type ConsumerError

type ConsumerError struct {
	// contains filtered or unexported fields
}

ConsumerError can be used to return consumer related errors e.g. during a call to Configure

func NewConsumerError

func NewConsumerError(args ...interface{}) ConsumerError

NewConsumerError creates a new ConsumerError

func (ConsumerError) Error

func (err ConsumerError) Error() string

Error satisfies the error interface for the ConsumerError struct

type Filter

type Filter interface {
	Accepts(msg Message) bool
}

Filter allows custom message filtering for ProducerBase derived plugins. Producers not deriving from ProducerBase might utilize this one, too.
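
As an illustration of what an implementation might look like, the sketch below defines a filter that accepts only messages containing a given token. `message` is a reduced local stand-in for Message (only Data is modelled), and `containsFilter` is a hypothetical type, not part of the package.

```go
package main

import (
	"bytes"
	"fmt"
)

// message is a reduced stand-in for core.Message; only Data is modelled.
type message struct {
	Data []byte
}

// filter mirrors the Filter interface for this sketch.
type filter interface {
	Accepts(msg message) bool
}

// containsFilter is a hypothetical filter that accepts only messages whose
// payload contains the configured token.
type containsFilter struct {
	token []byte
}

// Accepts reports whether the payload contains the token.
func (f containsFilter) Accepts(msg message) bool {
	return bytes.Contains(msg.Data, f.token)
}

func main() {
	var f filter = containsFilter{token: []byte("ERROR")}
	fmt.Println(f.Accepts(message{Data: []byte("ERROR: disk full")})) // true
	fmt.Println(f.Accepts(message{Data: []byte("INFO: started")}))    // false
}
```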

type Formatter

type Formatter interface {
	// Format transfers the message payload into a new format. The payload may
	// then be reassigned to the original or a new message.
	// In addition to that the formatter may change the stream of the message.
	Format(msg Message) ([]byte, MessageStreamID)
}

Formatter is the interface definition for message formatters
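
A formatter returns a new payload together with the stream the message should go to. The sketch below shows a hypothetical `sequenceFormatter` that prefixes the payload with the sequence number and leaves the stream unchanged. `message` and `messageStreamID` are local stand-ins for the package types.

```go
package main

import "fmt"

// messageStreamID and message are reduced stand-ins for the package types.
type messageStreamID uint64

type message struct {
	Data     []byte
	StreamID messageStreamID
	Sequence uint64
}

// formatter mirrors the Formatter interface for this sketch.
type formatter interface {
	Format(msg message) ([]byte, messageStreamID)
}

// sequenceFormatter is a hypothetical formatter that prefixes the payload
// with "<sequence>:" and keeps the original stream.
type sequenceFormatter struct{}

// Format builds a new byte slice so the original payload is not modified.
func (sequenceFormatter) Format(msg message) ([]byte, messageStreamID) {
	prefix := fmt.Sprintf("%d:", msg.Sequence)
	out := make([]byte, 0, len(prefix)+len(msg.Data))
	out = append(out, prefix...)
	out = append(out, msg.Data...)
	return out, msg.StreamID
}

func main() {
	var f formatter = sequenceFormatter{}
	payload, stream := f.Format(message{Data: []byte("hello"), StreamID: 7, Sequence: 42})
	fmt.Printf("%s %d\n", payload, stream) // 42:hello 7
}
```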

type LinkableMessageSource

type LinkableMessageSource interface {
	MessageSource
	// Link the message source to the message receiver. This makes it possible
	// to create stable "pipes" between e.g. a consumer and producer.
	Link(pipe interface{})

	// IsLinked has to return true if Link executed successfully and does not
	// need to be called again.
	IsLinked() bool
}

LinkableMessageSource extends the MessageSource interface to allow a pipe like behaviour between two components that communicate messages.

type LogConsumer

type LogConsumer struct {
	// contains filtered or unexported fields
}

LogConsumer is an internal consumer plugin used indirectly by the gollum log package.

func (*LogConsumer) Configure

func (cons *LogConsumer) Configure(conf PluginConfig) error

Configure initializes this consumer with values from a plugin config.

func (*LogConsumer) Consume

func (cons *LogConsumer) Consume(threads *sync.WaitGroup)

Consume starts listening for control statements

func (*LogConsumer) Control

func (cons *LogConsumer) Control() chan<- PluginControl

Control returns a handle to the control channel

func (LogConsumer) IsPaused

func (cons LogConsumer) IsPaused() bool

IsPaused returns false as Pause is not implemented

func (LogConsumer) Pause

func (cons LogConsumer) Pause()

Pause is not implemented

func (LogConsumer) Resume

func (cons LogConsumer) Resume()

Resume is not implemented as Pause is not implemented.

func (*LogConsumer) Streams

func (cons *LogConsumer) Streams() []MessageStreamID

Streams always returns an array with one member - the internal log stream

func (LogConsumer) Write

func (cons LogConsumer) Write(data []byte) (int, error)

Write fulfills the io.Writer interface

type MappedStream

type MappedStream struct {
	StreamID MessageStreamID
	Stream   Stream
}

MappedStream holds a stream and the id the stream is assigned to

type Message

type Message struct {
	Data      []byte
	StreamID  MessageStreamID
	Source    MessageSource
	Timestamp time.Time
	Sequence  uint64
}

Message is a container used for storing the internal state of messages. This struct is passed between consumers and producers.

func NewMessage

func NewMessage(source MessageSource, data []byte, sequence uint64) Message

NewMessage creates a new message from a given data stream

func (Message) Drop

func (msg Message) Drop(timeout time.Duration)

Drop pushes a message to the retry queue and sets the stream to _DROPPED_. This queue can be consumed by the loopback consumer. If no such consumer has been configured, the message is lost.

func (Message) Enqueue

func (msg Message) Enqueue(channel chan<- Message, timeout time.Duration)

Enqueue is a convenience function to push a message to a channel while waiting for a timeout instead of just blocking. Passing a timeout of -1 will discard the message. Passing a timeout of 0 will always block. Messages that time out will be passed to the dropped queue if a Dropped consumer exists.
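
The three timeout cases can be sketched with a `select` and a timer. This is an illustration on a plain string channel, not the package's code. `enqueueWithTimeout` is a hypothetical helper, and it assumes that a negative timeout discards the message only when the channel cannot accept it immediately.

```go
package main

import (
	"fmt"
	"time"
)

// enqueueWithTimeout reports whether msg was delivered to ch.
//
//	timeout == 0: block until the message is delivered
//	timeout <  0: try once, discard if the channel is full (assumption)
//	timeout >  0: wait up to timeout, then give up
func enqueueWithTimeout(ch chan<- string, msg string, timeout time.Duration) bool {
	if timeout == 0 {
		ch <- msg
		return true
	}

	// Fast path: deliver immediately if the channel has room.
	select {
	case ch <- msg:
		return true
	default:
	}

	if timeout < 0 {
		return false // discarded
	}

	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case ch <- msg:
		return true
	case <-timer.C:
		return false // the real package would pass this on to the dropped queue
	}
}

// demo fills a one-slot channel and then exercises the two failure cases.
func demo() (bool, bool, bool) {
	ch := make(chan string, 1)
	first := enqueueWithTimeout(ch, "a", 0)                   // room available: delivered
	second := enqueueWithTimeout(ch, "b", -1)                 // full: discarded
	third := enqueueWithTimeout(ch, "c", 10*time.Millisecond) // still full: times out
	return first, second, third
}

func main() {
	fmt.Println(demo()) // true false false
}
```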

func (Message) Retry

func (msg Message) Retry(timeout time.Duration)

Retry pushes a message to the retry queue. This queue can be consumed by the loopback consumer. If no such consumer has been configured, the message is lost.

func (Message) String

func (msg Message) String() string

String implements the stringer interface

type MessageBatch

type MessageBatch struct {
	// contains filtered or unexported fields
}

MessageBatch is a helper class for producers to format and store messages into a single buffer that is flushed to an io.Writer. You can use the Reached* functions to determine whether a flush should be called, i.e. if a timeout or size threshold has been reached.

func NewMessageBatch

func NewMessageBatch(size int, format Formatter) *MessageBatch

NewMessageBatch creates a new MessageBatch with a given size (in bytes) and a given formatter.

func (*MessageBatch) Append

func (batch *MessageBatch) Append(msg Message) bool

Append formats a message and appends it to the internal buffer. If the message does not fit into the buffer this function returns false. If the message can never fit into the buffer (too large), true is returned and an error is logged.

func (*MessageBatch) Flush

func (batch *MessageBatch) Flush(resource io.Writer, validate func() bool, onError func(error) bool)

Flush writes the content of the buffer to a given resource and resets the internal state, i.e. the buffer is empty after a call to Flush. Writing will be done in a separate go routine to be non-blocking.

The validate callback will be called after messages have been successfully written to the io.Writer. If validate returns false the buffer will not be reset (automatic retry). If validate is nil a return value of true is assumed (buffer reset).

The onError callback will be called if the io.Writer returned an error. If onError returns false the buffer will not be reset (automatic retry). If onError is nil a return value of true is assumed (buffer reset).

func (MessageBatch) IsEmpty

func (batch MessageBatch) IsEmpty() bool

IsEmpty returns true if no data is stored in the buffer

func (MessageBatch) ReachedSizeThreshold

func (batch MessageBatch) ReachedSizeThreshold(size int) bool

ReachedSizeThreshold returns true if the bytes stored in the buffer are above or equal to the size given. If there is no data this function returns false.

func (MessageBatch) ReachedTimeThreshold

func (batch MessageBatch) ReachedTimeThreshold(timeout time.Duration) bool

ReachedTimeThreshold returns true if the last flush was more than timeout ago. If there is no data this function returns false.
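
The two threshold checks are usually combined into one flush decision. The sketch below models that logic on a hypothetical `batch` type. The field names and the `shouldFlush` helper are assumptions for illustration, since MessageBatch keeps its fields unexported.

```go
package main

import (
	"fmt"
	"time"
)

// batch is a hypothetical stand-in for MessageBatch.
type batch struct {
	buffer    []byte
	lastFlush time.Time
}

func (b *batch) isEmpty() bool { return len(b.buffer) == 0 }

// reachedSizeThreshold is true if data is stored and its size is >= size.
func (b *batch) reachedSizeThreshold(size int) bool {
	return !b.isEmpty() && len(b.buffer) >= size
}

// reachedTimeThreshold is true if data is stored and the last flush was more
// than timeout ago.
func (b *batch) reachedTimeThreshold(timeout time.Duration) bool {
	return !b.isEmpty() && time.Since(b.lastFlush) > timeout
}

// shouldFlush combines both checks (hypothetical helper).
func (b *batch) shouldFlush(size int, timeout time.Duration) bool {
	return b.reachedSizeThreshold(size) || b.reachedTimeThreshold(timeout)
}

// demo evaluates an empty batch, a batch at the size limit and a stale batch.
func demo() (bool, bool, bool) {
	empty := &batch{lastFlush: time.Now().Add(-time.Hour)}
	full := &batch{buffer: []byte("abc"), lastFlush: time.Now()}
	stale := &batch{buffer: []byte("abc"), lastFlush: time.Now().Add(-time.Hour)}
	return empty.shouldFlush(1, time.Minute),
		full.shouldFlush(3, time.Minute),
		stale.shouldFlush(1024, time.Minute)
}

func main() {
	fmt.Println(demo()) // false true true
}
```

An empty batch never triggers a flush, even if the last flush was long ago, which matches the "if there is no data this function returns false" rule above.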

func (*MessageBatch) Touch

func (batch *MessageBatch) Touch()

Touch resets the timer queried by ReachedTimeThreshold, i.e. this resets the automatic flush timeout

func (*MessageBatch) WaitForFlush

func (batch *MessageBatch) WaitForFlush(timeout time.Duration)

WaitForFlush blocks until the current flush command returns. Passing a timeout > 0 will unblock this function after the given duration at the latest.
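
Waiting for background work with an upper time bound can be sketched with a `sync.WaitGroup` and a timer. `waitTimeout` is a hypothetical helper, and treating a timeout of 0 or less as "wait indefinitely" is an assumption inferred from the description above.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// waitTimeout blocks until wg is done or timeout has elapsed. It reports
// whether wg finished in time. A timeout <= 0 waits indefinitely (assumption).
func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()

	if timeout <= 0 {
		<-done
		return true
	}

	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case <-done:
		return true
	case <-timer.C:
		return false
	}
}

// demo waits once for work that finishes quickly and once for work that
// does not finish within the timeout.
func demo() (finished bool, timedOut bool) {
	var fast sync.WaitGroup
	fast.Add(1)
	go func() {
		defer fast.Done()
		time.Sleep(5 * time.Millisecond)
	}()
	finished = waitTimeout(&fast, time.Second)

	var slow sync.WaitGroup
	slow.Add(1)
	timedOut = !waitTimeout(&slow, 10*time.Millisecond)
	slow.Done() // release the helper goroutine started by waitTimeout
	return finished, timedOut
}

func main() {
	fmt.Println(demo()) // true true
}
```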

type MessageSource

type MessageSource interface {
}

MessageSource defines methods that are common to all message sources. Currently this is only a placeholder.

type MessageStreamID

type MessageStreamID uint64

MessageStreamID is the "compiled name" of a stream

func GetStreamID

func GetStreamID(stream string) MessageStreamID

GetStreamID returns the integer representation of a given stream name.

type Plugin

type Plugin interface {
	Configure(conf PluginConfig) error
}

Plugin is the base class for any runtime class that can be configured and instantiated during runtime.

func NewPlugin

func NewPlugin(config PluginConfig) (Plugin, error)

NewPlugin creates a new plugin from the type information stored in its config. This function internally calls NewPluginWithType.

func NewPluginWithType

func NewPluginWithType(typename string, config PluginConfig) (Plugin, error)

NewPluginWithType creates a new plugin of a given type and initializes it using the given config (i.e. passes that config to Configure). The type passed to this function may differ from the type stored in the config. If the type is meant to match use NewPlugin instead of NewPluginWithType. This function returns nil, error if the plugin could not be instantiated or plugin, error if Configure failed.

type PluginConfig

type PluginConfig struct {
	Typename  string
	Enable    bool
	Instances int
	Stream    []string
	Settings  shared.MarshalMap
	// contains filtered or unexported fields
}

PluginConfig is a configuration for a specific plugin

func NewPluginConfig

func NewPluginConfig(typename string) PluginConfig

NewPluginConfig creates a new plugin config with default values. By default the plugin is enabled, has a buffered channel with 4096 slots, has one instance, is bound to no streams and has no additional settings.

func (PluginConfig) GetBool

func (conf PluginConfig) GetBool(key string, defaultValue bool) bool

GetBool tries to read a non-predefined, boolean value from a PluginConfig. If that value is not found defaultValue is returned.

func (PluginConfig) GetInt

func (conf PluginConfig) GetInt(key string, defaultValue int) int

GetInt tries to read a non-predefined, integer value from a PluginConfig. If that value is not found defaultValue is returned.

func (PluginConfig) GetStreamMap

func (conf PluginConfig) GetStreamMap(key string, defaultValue string) map[MessageStreamID]string

GetStreamMap tries to read a non-predefined, stream to string map from a plugin config. A mapping on the wildcard stream is always returned. The target is either defaultValue or a value defined by the config.

func (PluginConfig) GetStreamRoutes

func (conf PluginConfig) GetStreamRoutes(key string) map[MessageStreamID][]MessageStreamID

GetStreamRoutes tries to read a non-predefined, stream to stream map from a plugin config. If no routes are defined an empty map is returned

func (PluginConfig) GetString

func (conf PluginConfig) GetString(key string, defaultValue string) string

GetString tries to read a non-predefined, string value from a PluginConfig. If that value is not found defaultValue is returned.

func (PluginConfig) GetStringArray

func (conf PluginConfig) GetStringArray(key string, defaultValue []string) []string

GetStringArray tries to read a non-predefined, string array from a PluginConfig. If that value is not found defaultValue is returned.

func (PluginConfig) GetStringMap

func (conf PluginConfig) GetStringMap(key string, defaultValue map[string]string) map[string]string

GetStringMap tries to read a non-predefined, string to string map from a PluginConfig. If that value is not found defaultValue is returned.

func (PluginConfig) GetValue

func (conf PluginConfig) GetValue(key string, defaultValue interface{}) interface{}

GetValue tries to read a non-predefined, untyped value from a PluginConfig. If that value is not found defaultValue is returned.
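
The getters above share one pattern: look up a key and fall back to the default. The sketch below shows that pattern on a hypothetical `pluginConfig` backed by a plain map. Falling back to the default when the stored value has the wrong type is an assumption, and the real PluginConfig may behave differently.

```go
package main

import "fmt"

// pluginConfig is a hypothetical stand-in that keeps settings in a plain map.
type pluginConfig struct {
	settings map[string]interface{}
}

// getValue returns the stored value or defaultValue if the key is missing.
func (c pluginConfig) getValue(key string, defaultValue interface{}) interface{} {
	if v, ok := c.settings[key]; ok {
		return v
	}
	return defaultValue
}

// getInt returns the stored int, or defaultValue if the key is missing or
// holds a different type (assumption).
func (c pluginConfig) getInt(key string, defaultValue int) int {
	if v, ok := c.settings[key].(int); ok {
		return v
	}
	return defaultValue
}

// getString works like getInt but for strings.
func (c pluginConfig) getString(key string, defaultValue string) string {
	if v, ok := c.settings[key].(string); ok {
		return v
	}
	return defaultValue
}

func main() {
	conf := pluginConfig{settings: map[string]interface{}{
		"BatchSize": 512,
		"File":      "/tmp/out.log",
	}}
	fmt.Println(conf.getInt("BatchSize", 8192))      // 512
	fmt.Println(conf.getInt("FlushTimeout", 5))      // 5
	fmt.Println(conf.getString("File", "/dev/null")) // /tmp/out.log
	fmt.Println(conf.getValue("Missing", "n/a"))     // n/a
}
```

The keys `BatchSize`, `FlushTimeout` and `File` are made up for the example.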

func (PluginConfig) HasValue

func (conf PluginConfig) HasValue(key string) bool

HasValue returns true if the given key has been set as a config option. This function only takes non-predefined settings into account.

func (PluginConfig) Override

func (conf PluginConfig) Override(key string, value interface{})

Override sets or overrides a configuration value for non-predefined options.

func (*PluginConfig) Read

func (conf *PluginConfig) Read(values shared.MarshalMap)

Read analyzes a given key/value map to extract the configuration values valid for each plugin. All non-default values are written to the Settings member.

func (PluginConfig) Validate

func (conf PluginConfig) Validate() bool

Validate should be called after a configuration has been processed. It will check the keys read from the config files against the keys requested up to this point. Unknown keys will be written to the error log.

type PluginControl

type PluginControl int

PluginControl is an enumeration used by the Producer.control() channel

type PluginRunState

type PluginRunState struct {
	// contains filtered or unexported fields
}

PluginRunState is used in some plugins to store information about the execution state of the plugin (i.e. if it is running or not) as well as threading primitives that enable gollum to wait for a plugin to properly shut down.

func (*PluginRunState) AddWorker

func (state *PluginRunState) AddWorker()

AddWorker adds a worker to the waitgroup configured by SetWorkerWaitGroup.

func (*PluginRunState) IsPaused

func (state *PluginRunState) IsPaused() bool

IsPaused implements the MessageSource interface

func (*PluginRunState) Pause

func (state *PluginRunState) Pause()

Pause implements the MessageSource interface

func (*PluginRunState) Resume

func (state *PluginRunState) Resume()

Resume implements the MessageSource interface

func (*PluginRunState) SetWorkerWaitGroup

func (state *PluginRunState) SetWorkerWaitGroup(workers *sync.WaitGroup)

SetWorkerWaitGroup sets the WaitGroup used to manage workers

func (*PluginRunState) WorkerDone

func (state *PluginRunState) WorkerDone()

WorkerDone removes a worker from the waitgroup configured by SetWorkerWaitGroup.

type Producer

type Producer interface {
	// Enqueue sends a message to the producer. The producer may reject
	// the message or drop it after a given timeout. Enqueue can block.
	Enqueue(msg Message)

	// Produce should implement a main loop that passes messages from the
	// message channel to some other service like the console.
	// This can be part of this function or a separate go routine.
	// Produce is always called as a go routine.
	Produce(workers *sync.WaitGroup)

	// Streams returns the streams this producer is listening to.
	Streams() []MessageStreamID

	// Control returns write access to this producer's control channel.
	// See PluginControl* constants.
	Control() chan<- PluginControl
}

Producer is an interface for plugins that pass messages to other services, files or storages.

type ProducerBase

type ProducerBase struct {
	// contains filtered or unexported fields
}

ProducerBase is the base class for producers. All producers support a common subset of configuration options:

  - "producer.Something":
    Enable: true
    Channel: 1024
    ChannelTimeout: 200
    Formatter: "format.Envelope"
    Stream:
      - "error"
      - "default"

Enable switches the producer on or off. By default this value is set to true.

Channel sets the size of the channel used to communicate messages. By default this value is set to 8192.

ChannelTimeoutMs sets a timeout in milliseconds for messages to wait if this producer's queue is full. A timeout of -1 or lower will drop the message without notice. A timeout of 0 will block until the queue is free. This is the default. A timeout of 1 or higher will wait x milliseconds for the queue to become available again. If this does not happen, the message will be sent to the retry channel.

Stream contains either a single string or a list of strings defining the message channels this producer will consume. By default this is set to "*" which means "listen to all streams but the internal".

Formatter sets a formatter to use. Each formatter has its own set of options which can be set here, too. By default this is set to format.Forward.

func (ProducerBase) AddMainWorker

func (prod ProducerBase) AddMainWorker(workers *sync.WaitGroup)

AddMainWorker adds the first worker to the waitgroup

func (ProducerBase) AddWorker

func (prod ProducerBase) AddWorker()

AddWorker adds an additional worker to the waitgroup. Assumes that either MarkAsActive or SetWaitGroup has been called beforehand.

func (*ProducerBase) Close

func (prod *ProducerBase) Close(onMessage func(msg Message))

Close closes the internal message channel and sends all remaining messages to the given callback. This function is called by *ControlLoop after a quit command has been received.

func (*ProducerBase) Configure

func (prod *ProducerBase) Configure(conf PluginConfig) error

Configure initializes the standard producer config values.

func (*ProducerBase) Control

func (prod *ProducerBase) Control() chan<- PluginControl

Control returns write access to this producer's control channel. See PluginControl* constants.

func (*ProducerBase) DefaultControlLoop

func (prod *ProducerBase) DefaultControlLoop(onMessage func(msg Message), onRoll func())

DefaultControlLoop provides a producer main loop that is sufficient for most use cases. Before this function exits Close will be called.

func (*ProducerBase) Enqueue

func (prod *ProducerBase) Enqueue(msg Message)

Enqueue will add the message to the internal channel so it can be processed by the producer main loop.

func (*ProducerBase) Format

func (prod *ProducerBase) Format(msg Message) ([]byte, MessageStreamID)

Format calls the formatter's Format function

func (*ProducerBase) GetFormatter

func (prod *ProducerBase) GetFormatter() Formatter

GetFormatter returns the formatter of this producer

func (ProducerBase) GetTimeout

func (prod ProducerBase) GetTimeout() time.Duration

GetTimeout returns the duration this producer will block before a message is dropped. A value of -1 will cause the message to drop. A value of 0 will cause the producer to always block.

func (*ProducerBase) Messages

func (prod *ProducerBase) Messages() chan<- Message

Messages returns write access to the message channel this producer reads from.

func (ProducerBase) Next

func (prod ProducerBase) Next() (Message, bool)

Next returns the latest message from the channel as well as the open state of the channel. This function blocks if the channel is empty.

func (ProducerBase) NextNonBlocking

func (prod ProducerBase) NextNonBlocking(onMessage func(msg Message)) bool

NextNonBlocking calls the given callback if a message is queued; otherwise it returns immediately. Returns false if no message was received.
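
A non-blocking receive in Go is a `select` with a `default` case. The sketch below shows the idea on a plain string channel. `nextNonBlocking` is a local stand-in, and returning false for a closed channel is an assumption.

```go
package main

import "fmt"

// nextNonBlocking calls onMessage if a message is waiting on the channel and
// returns true. It returns false right away if the channel is empty or closed.
func nextNonBlocking(messages <-chan string, onMessage func(msg string)) bool {
	select {
	case msg, ok := <-messages:
		if !ok {
			return false // channel closed (assumption: treated like "no message")
		}
		onMessage(msg)
		return true
	default:
		return false
	}
}

// demo queues one message and polls twice.
func demo() ([]string, bool, bool) {
	messages := make(chan string, 1)
	messages <- "hello"

	var seen []string
	collect := func(msg string) { seen = append(seen, msg) }

	first := nextNonBlocking(messages, collect)  // message available
	second := nextNonBlocking(messages, collect) // channel empty
	return seen, first, second
}

func main() {
	seen, first, second := demo()
	fmt.Println(seen, first, second) // [hello] true false
}
```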

func (*ProducerBase) PauseAllStreams

func (prod *ProducerBase) PauseAllStreams(capacity int)

PauseAllStreams sends the Pause() command to all streams this producer is listening to.

func (*ProducerBase) ProcessCommand

func (prod *ProducerBase) ProcessCommand(command PluginControl, onRoll func()) bool

ProcessCommand provides a callback-based way to react to the different producer commands. Returns true if PluginControlStop was triggered.

func (*ProducerBase) ResumeAllStreams

func (prod *ProducerBase) ResumeAllStreams()

ResumeAllStreams sends the Resume() command to all streams this producer is listening to.

func (ProducerBase) SetWorkerWaitGroup

func (prod ProducerBase) SetWorkerWaitGroup(workers *sync.WaitGroup)

SetWorkerWaitGroup forwards to Plugin.SetWorkerWaitGroup for this producer's internal plugin state. This method is also called by AddMainWorker.

func (*ProducerBase) Streams

func (prod *ProducerBase) Streams() []MessageStreamID

Streams returns the streams this producer is listening to.

func (*ProducerBase) TickerControlLoop

func (prod *ProducerBase) TickerControlLoop(interval time.Duration, onMessage func(msg Message), onRoll func(), onTimeOut func())

TickerControlLoop is like DefaultControlLoop but executes a given function at every given interval tick, too. Before this function exits Close will be called.

func (ProducerBase) WorkerDone

func (prod ProducerBase) WorkerDone()

WorkerDone removes an additional worker from the waitgroup.

type ProducerError

type ProducerError struct {
	// contains filtered or unexported fields
}

ProducerError can be used to return producer-related errors, e.g. during a call to Configure

func NewProducerError

func NewProducerError(args ...interface{}) ProducerError

NewProducerError creates a new ProducerError

func (ProducerError) Error

func (err ProducerError) Error() string

Error satisfies the error interface for the ProducerError struct

type SerialMessageSource

type SerialMessageSource interface {
	AsyncMessageSource

	// Notify the end of the response stream
	ResponseDone()
}

SerialMessageSource extends the AsyncMessageSource interface to allow waiting for all parts of the response to be submitted.

type Stream

type Stream interface {
	// Pause causes this stream to go silent. Messages should be queued or cause
	// a blocking call. The passed capacity can be used to configure internal
	// channel for buffering incoming messages while this stream is paused.
	Pause(capacity int)

	// Resume causes this stream to send messages again after Pause() had been
	// called. Any buffered messages need to be sent by this method or by a
	// separate go routine.
	Resume()

	// AddProducer adds one or more producers to this stream, i.e. the producers
	// listening to messages on this stream.
	AddProducer(producers ...Producer)

	// Enqueue sends a given message to all registered producers
	Enqueue(msg Message)
}

Stream defines the interface for all stream plugins

type StreamBase

type StreamBase struct {
	Filter     Filter
	Format     Formatter
	Producers  []Producer
	Distribute func(msg Message)
	// contains filtered or unexported fields
}

StreamBase defines the standard stream implementation. New stream types should derive from this class. StreamBase allows streams to set and execute filters as well as format a message. Types derived from StreamBase should set the Distribute member instead of overloading the Enqueue method. See stream.Broadcast for default configuration values and examples.

func (*StreamBase) AddProducer

func (stream *StreamBase) AddProducer(producers ...Producer)

AddProducer adds all producers to the list of known producers. Duplicates will be filtered.

func (*StreamBase) Configure

func (stream *StreamBase) Configure(conf PluginConfig) error

Configure sets up all values required by StreamBase

func (*StreamBase) Enqueue

func (stream *StreamBase) Enqueue(msg Message)

Enqueue checks the filter, formats the message and sends it to all producers registered. Functions deriving from StreamBase can set the Distribute member to hook into this function.

func (*StreamBase) Pause

func (stream *StreamBase) Pause(capacity int)

Pause will cause this stream to go silent. Messages will be queued to an internal channel that can be configured in size by setting the capacity parameter. Pass a capacity of 0 to disable buffering. Calling Pause on an already paused stream is ignored.

func (*StreamBase) Resume

func (stream *StreamBase) Resume()

Resume causes this stream to send messages again after Pause() had been called. Any buffered messages will be sent by a separate go routine. Calling Resume on a stream that is not paused is ignored.

type StreamRegistry

type StreamRegistry struct {
	// contains filtered or unexported fields
}

StreamRegistry holds streams mapped by their MessageStreamID as well as a reverse lookup of MessageStreamID to stream name.

func (StreamRegistry) AddWildcardProducersToStream

func (registry StreamRegistry) AddWildcardProducersToStream(stream Stream)

AddWildcardProducersToStream adds all known wildcard producers to a given stream. The state of the wildcard list is undefined during the configuration phase.

func (StreamRegistry) ForEachStream

func (registry StreamRegistry) ForEachStream(callback func(streamID MessageStreamID, stream Stream))

ForEachStream loops over all registered streams and calls the given function.

func (StreamRegistry) GetStream

func (registry StreamRegistry) GetStream(id MessageStreamID) Stream

GetStream returns a registered stream or nil

func (StreamRegistry) GetStreamByName

func (registry StreamRegistry) GetStreamByName(name string) Stream

GetStreamByName returns a registered stream by name. See GetStream.

func (StreamRegistry) GetStreamName

func (registry StreamRegistry) GetStreamName(streamID MessageStreamID) string

GetStreamName does a reverse lookup for a given MessageStreamID and returns the corresponding name. If the MessageStreamID is not registered, an empty string is returned.
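
The relationship between stream names, stream ids and the reverse lookup can be sketched as follows. How the package actually derives an id from a name is not stated here, so the FNV-1a hash below is purely an assumption, and `registry`, `getStreamID` and `getStreamName` are hypothetical stand-ins.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

// streamID is a stand-in for MessageStreamID.
type streamID uint64

// registry keeps a reverse lookup from stream id to stream name.
type registry struct {
	mu    sync.Mutex
	names map[streamID]string
}

func newRegistry() *registry {
	return &registry{names: make(map[streamID]string)}
}

// getStreamID derives an id from the name (FNV-1a here, an assumption) and
// remembers the name for later reverse lookups.
func (r *registry) getStreamID(name string) streamID {
	h := fnv.New64a()
	h.Write([]byte(name)) // hash.Hash writes never return an error
	id := streamID(h.Sum64())

	r.mu.Lock()
	r.names[id] = name
	r.mu.Unlock()
	return id
}

// getStreamName returns the name for id, or "" if the id is unknown.
func (r *registry) getStreamName(id streamID) string {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.names[id]
}

func main() {
	reg := newRegistry()
	id := reg.getStreamID("_DROPPED_")
	fmt.Println(reg.getStreamName(id))          // _DROPPED_
	fmt.Println(reg.getStreamName(id+1) == "") // true
}
```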

func (*StreamRegistry) GetStreamOrFallback

func (registry *StreamRegistry) GetStreamOrFallback(streamID MessageStreamID) Stream

GetStreamOrFallback returns the stream for the given id if it is registered. If no stream is registered for the given id the default stream is used. The default stream is equivalent to an unconfigured stream.Broadcast with all wildcard producers already added.

func (StreamRegistry) IsStreamRegistered

func (registry StreamRegistry) IsStreamRegistered(id MessageStreamID) bool

IsStreamRegistered returns true if the stream for the given id is registered.

func (*StreamRegistry) Register

func (registry *StreamRegistry) Register(stream Stream, streamID MessageStreamID)

Register registers a stream plugin to a given stream id

func (*StreamRegistry) RegisterWildcardProducer

func (registry *StreamRegistry) RegisterWildcardProducer(producers ...Producer)

RegisterWildcardProducer adds a new producer to the list of known wildcard producers. This list has to be added to new streams upon creation to send messages to producers listening to *. Duplicates will be filtered. The state of this list is undefined during the configuration phase.
