feeder

package
v2.0.0+incompatible

Warning: This package is not in the latest version of its module.
Published: May 7, 2019 | License: BSD-3-Clause | Imports: 19 | Imported by: 1

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AMQPFeeder

type AMQPFeeder struct {
	StopChan            chan bool
	StoppedChan         chan bool
	IsRunning           bool
	Consumer            *Consumer
	URL                 string
	Exchanges           []string
	Queue               string
	MakeObservationFunc format.MakeObservationFunc
}

AMQPFeeder is a Feeder that accepts input via AMQP queues.

func MakeAMQPFeeder

func MakeAMQPFeeder(url string, exchanges []string, queue string) *AMQPFeeder

MakeAMQPFeeder returns a new AMQPFeeder that connects to the AMQP server at the given URL and creates a new queue with the given name, bound to the provided exchanges.

func (*AMQPFeeder) NewConsumer

func (f *AMQPFeeder) NewConsumer(amqpURI string, exchanges []string, exchangeType, queueName, key,
	ctag string, out chan observation.InputObservation) (*Consumer, error)

NewConsumer returns a new Consumer.

func (*AMQPFeeder) NewConsumerWithReconnector

func (f *AMQPFeeder) NewConsumerWithReconnector(amqpURI string, exchanges []string, exchangeType,
	queueName, key, ctag string, out chan observation.InputObservation,
	reconnector func(string) (wabbit.Conn, string, error)) (*Consumer, error)

NewConsumerWithReconnector creates a new consumer with the given properties. The callback function is called for each delivery accepted from a consumer channel.

func (*AMQPFeeder) Run

func (f *AMQPFeeder) Run(out chan observation.InputObservation) error

Run starts the feeder.

func (*AMQPFeeder) SetInputDecoder

func (f *AMQPFeeder) SetInputDecoder(fn format.MakeObservationFunc)

SetInputDecoder states that the given MakeObservationFunc should be used to parse and decode data delivered to this feeder.

func (*AMQPFeeder) Stop

func (f *AMQPFeeder) Stop(stopChan chan bool)

Stop causes the feeder to stop accepting deliveries and close all associated channels, including the passed notification channel.

type Consumer

type Consumer struct {
	URL string

	Callback            func(wabbit.Delivery)
	StopReconnection    chan bool
	ChanMutex           sync.Mutex
	ConnMutex           sync.Mutex
	OutChan             chan observation.InputObservation
	MakeObservationFunc format.MakeObservationFunc
	ErrorChan           chan wabbit.Error
	Reconnector         func(string) (wabbit.Conn, string, error)
	Connector           func(*Consumer) error
	// contains filtered or unexported fields
}

Consumer reads and processes messages from a fake RabbitMQ server.

func (*Consumer) Shutdown

func (c *Consumer) Shutdown() error

Shutdown shuts down a consumer, closing down its channels and connections.

type Feeder

type Feeder interface {
	Run(chan observation.InputObservation) error
	SetInputDecoder(format.MakeObservationFunc)
	Stop(chan bool)
}

Feeder is the interface for a component that accepts observations in a specific format and feeds them into a channel of InputObservations. An input decoder, given as a MakeObservationFunc, describes the operations needed to transform the input format into an InputObservation.
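
The lifecycle implied by the interface (set a decoder, Run, then Stop with a notification channel) can be sketched as follows. This is a self-contained illustration, not code from the package: InputObservation and MakeObservationFunc are local stand-ins whose shapes are assumed, and SliceFeeder, lineDecoder and collect are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// InputObservation stands in for observation.InputObservation.
type InputObservation struct{ Raw string }

// MakeObservationFunc stands in for format.MakeObservationFunc; this
// signature is an assumption.
type MakeObservationFunc func(input []byte, out chan InputObservation) error

// Feeder mirrors the documented interface, using the stand-in types.
type Feeder interface {
	Run(chan InputObservation) error
	SetInputDecoder(MakeObservationFunc)
	Stop(chan bool)
}

// SliceFeeder is a hypothetical Feeder that replays fixed inputs.
type SliceFeeder struct {
	inputs   []string
	decode   MakeObservationFunc
	stop     chan struct{}
	finished chan struct{}
}

func NewSliceFeeder(inputs ...string) *SliceFeeder {
	return &SliceFeeder{
		inputs:   inputs,
		stop:     make(chan struct{}),
		finished: make(chan struct{}),
	}
}

func (f *SliceFeeder) SetInputDecoder(fn MakeObservationFunc) { f.decode = fn }

// Run decodes the inputs in a background goroutine until done or stopped.
func (f *SliceFeeder) Run(out chan InputObservation) error {
	if f.decode == nil {
		return errors.New("no input decoder set")
	}
	go func() {
		defer close(f.finished)
		for _, in := range f.inputs {
			select {
			case <-f.stop:
				return
			default:
			}
			_ = f.decode([]byte(in), out) // a real feeder would log the error
		}
	}()
	return nil
}

// Stop waits for the goroutine started by Run, then closes stopChan.
// It must be called once, after a successful Run.
func (f *SliceFeeder) Stop(stopChan chan bool) {
	close(f.stop)
	<-f.finished
	close(stopChan)
}

// lineDecoder is a hypothetical decoder: one observation per non-empty line.
func lineDecoder(input []byte, out chan InputObservation) error {
	for _, line := range strings.Split(string(input), "\n") {
		if line = strings.TrimSpace(line); line != "" {
			out <- InputObservation{Raw: line}
		}
	}
	return nil
}

// collect runs f, reads exactly n observations, then stops f.
func collect(f Feeder, n int) ([]string, error) {
	f.SetInputDecoder(lineDecoder)
	out := make(chan InputObservation)
	if err := f.Run(out); err != nil {
		return nil, err
	}
	got := make([]string, 0, n)
	for len(got) < n {
		got = append(got, (<-out).Raw)
	}
	stopped := make(chan bool)
	f.Stop(stopped)
	<-stopped // already closed when Stop returns
	return got, nil
}

func main() {
	got, err := collect(NewSliceFeeder("a\nb\n", "c"), 3)
	fmt.Println(got, err)
}
```

In this sketch the caller must read every observation before calling Stop, because the decoder blocks on an unbuffered channel. A production feeder would likely need a way to abandon a pending send.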

type HTTPFeeder

type HTTPFeeder struct {
	StopChan            chan bool
	StoppedChan         chan bool
	IsRunning           bool
	Port                int
	Host                string
	MakeObservationFunc format.MakeObservationFunc
	Server              *http.Server
	OutChan             chan observation.InputObservation
}

HTTPFeeder is a Feeder implementation that accepts HTTP requests to obtain observations.
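
As a rough illustration of how an HTTP handler might turn request bodies into observations, the sketch below passes each body to a decoder and reports decoding failures as a 400 response. It does not reproduce HTTPFeeder itself: the stand-in types, bodyHandler, rawDecoder and post are hypothetical, and the status codes chosen are assumptions.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
)

// InputObservation stands in for observation.InputObservation.
type InputObservation struct{ Raw string }

// MakeObservationFunc stands in for format.MakeObservationFunc; this
// signature is an assumption.
type MakeObservationFunc func(input []byte, out chan InputObservation) error

// bodyHandler is a hypothetical http.Handler that feeds request bodies
// to a decoder, which writes observations to out.
type bodyHandler struct {
	decode MakeObservationFunc
	out    chan InputObservation
}

func (h *bodyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	body, err := io.ReadAll(r.Body)
	if err != nil {
		http.Error(w, "cannot read body", http.StatusBadRequest)
		return
	}
	if err := h.decode(body, h.out); err != nil {
		http.Error(w, "cannot decode body", http.StatusBadRequest)
		return
	}
	w.WriteHeader(http.StatusOK)
}

// rawDecoder is a hypothetical decoder that rejects empty input and
// otherwise emits the whole body as one observation.
func rawDecoder(input []byte, out chan InputObservation) error {
	if len(input) == 0 {
		return errors.New("empty body")
	}
	out <- InputObservation{Raw: string(input)}
	return nil
}

// post sends body to h in-process and returns the response status code.
func post(h http.Handler, body string) int {
	req := httptest.NewRequest(http.MethodPost, "/", strings.NewReader(body))
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, req)
	return rec.Code
}

func main() {
	out := make(chan InputObservation, 1)
	h := &bodyHandler{decode: rawDecoder, out: out}
	code := post(h, "hello")
	obs := <-out
	fmt.Println(code, obs.Raw)
}
```

The output channel is buffered here so that the handler does not block while nobody is reading. Whether the real feeder buffers is not stated in this documentation.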

func MakeHTTPFeeder

func MakeHTTPFeeder(host string, port int) *HTTPFeeder

MakeHTTPFeeder creates a new HTTPFeeder listening on the given host and port.

func (*HTTPFeeder) Run

func (f *HTTPFeeder) Run(out chan observation.InputObservation) error

Run starts the feeder.

func (*HTTPFeeder) ServeHTTP

func (f *HTTPFeeder) ServeHTTP(w http.ResponseWriter, r *http.Request)

ServeHTTP implements the http.Handler interface.

func (*HTTPFeeder) SetInputDecoder

func (f *HTTPFeeder) SetInputDecoder(fn format.MakeObservationFunc)

SetInputDecoder states that the given MakeObservationFunc should be used to parse and decode data delivered to this feeder.

func (*HTTPFeeder) Stop

func (f *HTTPFeeder) Stop(stopChan chan bool)

Stop causes the feeder to stop accepting requests and close all associated channels, including the passed notification channel.

type Setup

type Setup struct {
	Feeder []struct {
		Name        string `yaml:"name"`
		Type        string `yaml:"type"`
		InputFormat string `yaml:"input_format"`
		// for AMQP
		URL      string   `yaml:"url"`
		Exchange []string `yaml:"exchange"`
		// for HTTP etc.
		ListenHost string `yaml:"listen_host"`
		ListenPort int    `yaml:"listen_port"`
		// for socket input
		Path string `yaml:"path"`
	} `yaml:"feeder"`
	Feeders map[string]Feeder
}

Setup describes a collection of feeders that should be active, including their configuration settings.

func LoadSetup

func LoadSetup(in []byte) (*Setup, error)

LoadSetup creates a new Setup from a byte array containing YAML.
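
To make the expected YAML shape concrete, the sketch below embeds a sample document in a Go string. The keys are taken from the yaml struct tags of Setup. Every value is an assumption made for illustration, including the type strings (amqp, http, socket), the input_format name, the URL, the exchange name and the socket path. Which values LoadSetup actually accepts is not stated in this documentation.

```go
package main

import "fmt"

// exampleSetup is a hypothetical configuration document. Only the keys are
// derived from the Setup struct tags; all values are placeholders.
const exampleSetup = `feeder:
  - name: AMQP input
    type: amqp
    input_format: example_format
    url: amqp://guest:guest@localhost:5672
    exchange: [ observations ]
  - name: HTTP input
    type: http
    input_format: example_format
    listen_host: 127.0.0.1
    listen_port: 8081
  - name: Socket input
    type: socket
    input_format: example_format
    path: /tmp/feeder.sock
`

func main() {
	// In a real program the bytes would be handed to the loader, for
	// example LoadSetup([]byte(exampleSetup)).
	fmt.Print(exampleSetup)
}
```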

func (*Setup) Run

func (fs *Setup) Run(in chan observation.InputObservation) error

Run starts all feeders described in the setup in the background. Use Stop to stop them.

func (*Setup) Stop

func (fs *Setup) Stop(stopChan chan bool)

Stop causes all feeders described in the setup to cease processing input. The stopChan will be closed once all feeders are done shutting down.
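
One way to obtain the behaviour described for Stop is to stop every feeder concurrently and close the shared channel once all of them have reported back. The following is a minimal sketch under that assumption, not the package's implementation: stopper, stopAll and fakeFeeder are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// stopper is the part of the Feeder interface this sketch needs.
type stopper interface {
	Stop(chan bool)
}

// stopAll asks every feeder to stop and closes stopChan once each feeder
// has closed its own notification channel.
func stopAll(feeders map[string]stopper, stopChan chan bool) {
	var wg sync.WaitGroup
	for _, f := range feeders {
		wg.Add(1)
		go func(f stopper) {
			defer wg.Done()
			done := make(chan bool)
			f.Stop(done)
			<-done // wait until this feeder reports that it has stopped
		}(f)
	}
	go func() {
		wg.Wait()
		close(stopChan)
	}()
}

// fakeFeeder is a hypothetical feeder that only records being stopped.
type fakeFeeder struct {
	stopped bool
}

func (f *fakeFeeder) Stop(done chan bool) {
	f.stopped = true
	close(done)
}

func main() {
	a, b := &fakeFeeder{}, &fakeFeeder{}
	stopChan := make(chan bool)
	stopAll(map[string]stopper{"a": a, "b": b}, stopChan)
	<-stopChan // closed only after both feeders have stopped
	fmt.Println(a.stopped, b.stopped)
}
```

Because stopAll returns immediately, the caller decides whether to block on stopChan or to continue with other shutdown work in the meantime.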

type SocketFeeder

type SocketFeeder struct {
	ObsChan             chan observation.InputObservation
	Verbose             bool
	Running             bool
	InputListener       net.Listener
	MakeObservationFunc format.MakeObservationFunc
	StopChan            chan bool
	StoppedChan         chan bool
}

SocketFeeder is a Feeder implementation that reads data from a UNIX socket.

func MakeSocketFeeder

func MakeSocketFeeder(inputSocket string) (*SocketFeeder, error)

MakeSocketFeeder returns a new SocketFeeder reading from the Unix socket inputSocket. Parsed observations are written to the channel passed to Run. If the socket cannot be created for listening, a non-nil error is returned.
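
The general technique of listening on a Unix socket and forwarding incoming data to a channel can be sketched with the standard library alone. This is an illustration under stated assumptions, not the SocketFeeder code: listenLines and roundTrip are hypothetical helpers, and the newline-delimited framing is an assumption about the input.

```go
package main

import (
	"bufio"
	"fmt"
	"net"
	"os"
	"path/filepath"
)

// listenLines listens on the Unix socket at path and sends every line
// received on any connection to out. It returns an error if the socket
// cannot be created.
func listenLines(path string, out chan<- string) (net.Listener, error) {
	ln, err := net.Listen("unix", path)
	if err != nil {
		return nil, err
	}
	go func() {
		for {
			conn, err := ln.Accept()
			if err != nil {
				return // the listener was closed
			}
			go func(c net.Conn) {
				defer c.Close()
				sc := bufio.NewScanner(c)
				for sc.Scan() {
					out <- sc.Text()
				}
			}(conn)
		}
	}()
	return ln, nil
}

// roundTrip writes one line to a temporary socket and returns what the
// listener forwarded.
func roundTrip(line string) (string, error) {
	dir, err := os.MkdirTemp("", "feeder")
	if err != nil {
		return "", err
	}
	defer os.RemoveAll(dir)
	path := filepath.Join(dir, "in.sock")

	out := make(chan string, 1)
	ln, err := listenLines(path, out)
	if err != nil {
		return "", err
	}
	defer ln.Close()

	conn, err := net.Dial("unix", path)
	if err != nil {
		return "", err
	}
	defer conn.Close()
	if _, err := fmt.Fprintln(conn, line); err != nil {
		return "", err
	}
	return <-out, nil
}

func main() {
	got, err := roundTrip("hello")
	fmt.Println(got, err)
}
```

Closing the listener ends the accept loop, which is one plausible way for a Stop method to shut such a reader down.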

func (*SocketFeeder) Run

func (sf *SocketFeeder) Run(out chan observation.InputObservation) error

Run starts the feeder.

func (*SocketFeeder) SetInputDecoder

func (sf *SocketFeeder) SetInputDecoder(fn format.MakeObservationFunc)

SetInputDecoder states that the given MakeObservationFunc should be used to parse and decode data delivered to this feeder.

func (*SocketFeeder) Stop

func (sf *SocketFeeder) Stop(stoppedChan chan bool)

Stop causes the SocketFeeder to stop reading from the socket and close all associated channels, including the passed notification channel.
