sender

package
v0.27.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 18, 2024 License: LGPL-2.1 Imports: 32 Imported by: 0

Documentation

Overview

Package sender is a set of types that implement skogul.Sender. A Sender in skogul is a simple primitive that receives skogul metrics and "does something with them".

The traditional and obvious sender accepts metrics and uses and external service to persist them to disk. E.g.: the InfluxDB sender stores the metrics to influxdb. The postgres sender accepts metrics and stores to postgres, and so forth.

The other type of senders are "internal", typical for routing. The classic examples are the "dupe" sender that accepts metrics and passes them on to multiple other senders - e.g.: Store to both postgres and influxdb. An other classic is the "fallback" sender: It has a list of senders and tries each one in order until one succeeds, allowing you to send to a primary influxdb normally - if influx fails, write to local disk, if that fails, write a message to the log.

The only thing a sender "must" do is implement Send(c *skogul.Container), and it is disallowed to modify the container in Send(), since multiple senders might be working on it at the same time.

To make a sender configurable, simply ensure data types in the type definition can be Unmarshalled from JSON. A small note on that is that it is necessary to use "SenderRef" and "HandlerRef" objects instead of Sender and Handler directly for now. This is to let the config engine track references that haven't resolved yet.

It also means certain data types need to be avoided or worked around. Currently, time.Duration is such an example, as it is missing a JSON unmrashaller. For such data types, a simple wrapper will do the trick, e.g. skogul.Duration wraps time.Duration.

Index

Constants

This section is empty.

Variables

Auto maps sender-names to sender implementation, used for auto configuration.

Functions

This section is empty.

Types

type Backoff

type Backoff struct {
	Next    skogul.SenderRef `doc:"The sender to try"`
	Base    skogul.Duration  `doc:"Initial delay after a failure. Will double for each retry"`
	Retries uint64           `doc:"Number of retries before giving up"`
	// contains filtered or unexported fields
}

Backoff sender will send to Next, but retry up to Retries times, with exponential backoff, starting with time.Duration

func (*Backoff) Send

func (bo *Backoff) Send(c *skogul.Container) error

Send with a delay

type Batch

type Batch struct {
	Next      skogul.SenderRef `doc:"Sender that will receive batched metrics"`
	Interval  skogul.Duration  `doc:"Flush the bucket after this duration regardless of how full it is. Default is 1s."`
	Threshold int              `doc:"Flush the bucket after reaching this amount of metrics. Default is 10."`
	Threads   int              `doc:"Number of threads for batch sender. Defaults to number of CPU cores."`
	Burner    skogul.SenderRef `` /* 520-byte string literal not displayed */
	// contains filtered or unexported fields
}

Batch sender collects metrics into a single container then passes them on after Threshold number of metrics are collected. In case Threshold is "never" reached, it will periodically flush metrics if no message has been received in Interval time.

Internally, the Batch sender consists of three parts. The first part is the Send() part, which just pushes the received container onto a channel.

The second part, which is a single, dedicated go routine, picks up said container and adds it to a batch-container. When the batch container is "full" (e.g.: exceeds Threshold) - or a timeout is reached - the batch container is pushed onto a second channel and a new, empty, batch container is created.

The third part picks up the ready-to-send containers and issues next.Send() on them. This is a separate go routine, one per NumCPU.

This means that:

  1. Batch sender will both do a "fan-in" from potentially multiple Send() calls.
  2. ... and do a fan-out afterwards.
  3. Send() will only block if two channels are full.

func (*Batch) Send

func (bat *Batch) Send(c *skogul.Container) error

Send batches up multiple metrics and passes them on after an interval or a set size is reached. It never returns error, since there is no way to know.

func (*Batch) Verify added in v0.19.0

func (bat *Batch) Verify() error

type Counter

type Counter struct {
	Next   skogul.SenderRef  `doc:"Reference to the next sender in the chain"`
	Stats  skogul.HandlerRef `doc:"Handler that will receive the stats periodically"`
	Period skogul.Duration   `doc:"How often to emit stats. Defaults to 1 second." example:"5s"`
	// contains filtered or unexported fields
}

Counter sender emits, periodically, the flow-rate of metrics through it. The stats are sent on to the Stats-sender every Period.

To avoid locks and support multiple go routines using the same counter, stats are sent over a channel to a separate goroutine that does the actual aggregation and calculation.

func (*Counter) Send

func (co *Counter) Send(c *skogul.Container) error

Send counts metrics, sends the count on a channel, then executes the next sender in the chain.

func (*Counter) Verify added in v0.11.0

func (co *Counter) Verify() error

Verify that the count sender works

type Debug

type Debug struct {
	Prefix  string            `doc:"Prefix to print before any metric"`
	Encoder skogul.EncoderRef `doc:"Which encoder to use. Defaults to prettyjson."`
}

Debug sender simply prints the metrics in json-marshalled format to stdout.

func (*Debug) Send

func (db *Debug) Send(c *skogul.Container) error

Send prints the JSON-formatted container to stdout

type Detacher

type Detacher struct {
	Next  skogul.SenderRef `doc:"Sender that receives the metrics."`
	Depth int              `doc:"How many containers can be pending delivery before we start blocking. Defaults to 1000."`
	// contains filtered or unexported fields
}

Detacher accepts a message, sends it to a channel, then picks it up on the other end in a separate go routine. This, unfortunately, leads to fan-in: if used in conjunction with HTTP receiver, for example, you end up going from multiple independent go routines to a single one, which is probably not what you want.

The purpose is to smooth out reading.

func (*Detacher) Send

func (de *Detacher) Send(c *skogul.Container) error

Send ensures a consumer exists, then transmits the container on a channel and returns immediately.

type Dupe

type Dupe struct {
	Next []*skogul.SenderRef `doc:"List of senders that will receive metrics, in order."`
}

Dupe sender executes all provided senders in turn.

func (*Dupe) Send

func (dp *Dupe) Send(c *skogul.Container) error

Send sends data down stream

type EnrichmentUpdater added in v0.15.1

type EnrichmentUpdater struct {
	Enricher skogul.TransformerRef `doc:"The enrichment transformer to update."`
}

EnrichmentUpdater sends any received container/metric to the update-function of the provided transformer, allowing on-the-fly updates to enrichment.

func (*EnrichmentUpdater) Send added in v0.15.1

Uses received metrics to update the enrichment transformer

func (*EnrichmentUpdater) Verify added in v0.15.1

func (e *EnrichmentUpdater) Verify() error

type ErrDiverter

type ErrDiverter struct {
	Next   skogul.SenderRef  `doc:"Send normal metrics here."`
	Err    skogul.HandlerRef `doc:"If the sender under Next fails, convert the error to a metric and send it here."`
	RetErr bool              `` /* 130-byte string literal not displayed */
}

ErrDiverter calls the Next sender, but if it fails, it will convert the error to a Container and send that to Err.

func (*ErrDiverter) Send

func (ed *ErrDiverter) Send(c *skogul.Container) error

Send data to the next sender. If it fails, use the Err sender.

type Fallback

type Fallback struct {
	Next []*skogul.SenderRef `doc:"Ordered list of senders that will potentially receive metrics."`
}

Fallback sender tries each provided sender in turn before failing.

E.g.:

primary := sender.InfluxDB{....}
secondary := sender.Queue{....} // Not implemented yet
emergency := sender.Debug{}

fallback := sender.Fallback{}
fallback.Add(&primary)
fallback.Add(&secondary)
fallback.Add(&emergency)

This will send data to Influx normally. If Influx fails, it will send it to a queue. If that fails, it will print it to stdout.

func (*Fallback) Send

func (fb *Fallback) Send(c *skogul.Container) error

Send sends data down stream XXX: Need to log failures?

type Fanout

type Fanout struct {
	Next    skogul.SenderRef `doc:"Sender receiving the metrics"`
	Workers int              `doc:"Number of worker threads in use. To _fan_in_ you can set this to 1."`
	// contains filtered or unexported fields
}

Fanout sender implements a worker pool for passing data on. This SHOULD be unnecessary, as the receiver should ideally do this for us (e.g.: the HTTP receiver does this natively). However, there might be times where it makes sense, specially since this can be used in reverse too: you can use the Fanout sender to limit the degree of concurrency that downstream is exposed to.

Again, this should really not be needed. If you use the fanout sender, be sure you understand why.

There only settings provided is "Next" to provide the next sender, and "Workers", that defines the size of the worker pool.

func (*Fanout) Send

func (fo *Fanout) Send(c *skogul.Container) error

Send ensures the workers are booted, then picks up a channel from available workers and sends the container to that container.

type File added in v0.8.0

type File struct {
	Path    string            `doc:"Absolute path to file to write. DEPRECATED - replaced by option File (to keep options more consistent across modules)."`
	File    string            `doc:"Absolute path to file to write to."`
	Append  bool              `` /* 130-byte string literal not displayed */
	Encoder skogul.EncoderRef `doc:"Which encoder to use. Defaults to JSON."`
	// contains filtered or unexported fields
}

File sender writes data to a file in various different fashions. Typical use will be debugging (write to disk) and writing to a FIFO for example.

Created file under path given by File option.

When SIGHUP signal is received File will be truncated. In case SIGHUP is received and the file doesn't exists the file will be created.

When Append option is supplied, and this sender receives a SIGHUP data will be appended to file, if the file exists.

func (*File) Deprecated added in v0.19.0

func (f *File) Deprecated() error

func (*File) Send added in v0.8.0

func (f *File) Send(c *skogul.Container) error

Send receives a skogul container and writes it to file.

func (*File) Verify added in v0.8.0

func (f *File) Verify() error

Verify checks that the configuration options are set appropriately

type ForwardAndFail

type ForwardAndFail struct {
	Next skogul.SenderRef `doc:"Sender receiving the metrics"`
}

ForwardAndFail sender will pass the container to the Next sender, but always returns an error. The use-case for this is to allow the fallback Sender or similar to eventually send data to a sender that ALWAYS works, e.g. the Debug-sender or just printing a message in the log, but we still want to propagate the error upwards in the stack so clients can take appropriate action.

Example use:

faf := sender.ForwardAndFail{Next: skogul.Debug{}} fb := sender.Fallback{Next: []skogul.Sender{influx, faf}}

func (*ForwardAndFail) Send

func (faf *ForwardAndFail) Send(c *skogul.Container) error

Send forwards the data to the next sender and always returns an error.

type HTTP

type HTTP struct {
	URL              string            `doc:"Fully qualified URL to send data to." example:"http://localhost:6081/ https://user:password@[::1]:6082/"`
	Headers          map[string]string `doc:"HTTP headers to be added to every request"`
	Timeout          skogul.Duration   `doc:"HTTP timeout."`
	Insecure         bool              `doc:"Disable TLS certificate validation."`
	ConnsPerHost     int               `doc:"Max concurrent connections per host. Should reflect ulimit -n. Defaults to unlimited."`
	IdleConnsPerHost int               `doc:"Max idle connections retained per host. Should reflect expected concurrency. Defaults to 2 + runtime.NumCPU."`
	RootCA           string            `doc:"Path to an alternate root CA used to verify server certificates. Leave blank to use system defaults."`
	Certfile         string            `doc:"Path to certificate file for TLS Client Certificate."`
	Keyfile          string            `doc:"Path to key file for TLS Client Certificate."`
	Encoder          skogul.EncoderRef `doc:"Encoder to use. Defaults to JSON-encoding."`
	// contains filtered or unexported fields
}

HTTP sender POSTs the Skogul JSON-encoded data to the provided URL.

func (*HTTP) GetStats added in v0.12.0

func (ht *HTTP) GetStats() *skogul.Metric

GetStats prepares a skogul metric with stats for the HTTP sender.

func (*HTTP) Send

func (ht *HTTP) Send(c *skogul.Container) error

Send POSTS data

func (*HTTP) Verify

func (ht *HTTP) Verify() error

Verify checks that configuration is sensible

type InfluxDB

type InfluxDB struct {
	URL                     string          `doc:"URL to InfluxDB API. Must include write end-point and database to write to." example:"http://[::1]:8086/write?db=foo"`
	Measurement             string          `doc:"Measurement name to write to."`
	MeasurementFromMetadata string          `` /* 177-byte string literal not displayed */
	Timeout                 skogul.Duration `doc:"HTTP timeout"`
	ConvertIntToFloat       bool            `doc:"Convert all integers to floats. Don't do this unless you really know why you're doing this."`
	Token                   skogul.Secret   `doc:"Authorization token used in InfluxDB 2.0"`
	// contains filtered or unexported fields
}

InfluxDB posts data to the provided URL and measurement, using the InfluxDB line format over HTTP.

func (*InfluxDB) Send

func (idb *InfluxDB) Send(c *skogul.Container) error

Send data to Influx, re-using idb.client.

func (*InfluxDB) Verify

func (idb *InfluxDB) Verify() error

Verify does a shallow verification of settings

type Kafka added in v0.16.0

type Kafka struct {
	Topic    string `doc:"Topic to write to."`
	Sync     bool   `doc:"Synchronous or not. By default, the sender is async."`
	Address  string `doc:"Address for the broker."`
	ClientID string `doc:"ClientID to use - uses lower-case skogul by default."`
	TLS      bool   `doc:"Enable TLS, off by default."`
	Username string `doc:"Username for SASL auth."`
	Password string `doc:"Password for SASL auth."`
	Encoder  skogul.EncoderRef
	// contains filtered or unexported fields
}

Kafka sender is a MVP-variant, and further features are reasonable and expected, including but not limited to:

- Authentication (coming before release) - Better control of batching, probably - Dynamic keys from metadata - Adjustment of various timeouts

func (*Kafka) Send added in v0.16.0

func (k *Kafka) Send(c *skogul.Container) error

type Log

type Log struct {
	Message string `doc:"Message to print."`
}

Log sender simply executes log.Print() on a predefined message.

Intended use is in combination with other senders, e.g. to explain WHY sender.Debug() was used.

func (Log) Send

func (lg Log) Send(c *skogul.Container) error

Send logs a message and does no further processing

type MQTT

type MQTT struct {
	Broker   string   `doc:"Address of broker to send to" example:"[::1]:8888"`
	Topics   []string `doc:"Topic(s) to publish events to"`
	Username string   `doc:"MQTT broker authorization username"`
	Password string   `doc:"MQTT broker authorization password"`
	ClientID string   `doc:"Custom client id to use (default: random)"`
	// contains filtered or unexported fields
}

MQTT Sender publishes messages on a MQTT message bus.

FIXME: The MQTT-sender and receiver should be updated to not use the url-encoded scheme.

func (*MQTT) Send

func (handler *MQTT) Send(c *skogul.Container) error

Send publishes the container in skogul JSON-encoded format on an MQTT topic.

func (*MQTT) Verify added in v0.4.0

func (handler *MQTT) Verify() error

Verify makes sure required configuration options are set

type Match added in v0.15.0

type Match struct {
	Conditions []map[string]interface{} `doc:"Array of metadata headers and required values."`
	Next       *skogul.SenderRef        `doc:"Sender to use in case of a match."`
}

Match describes a list of conditions that need to match for a sender to receive metrics.

type MnR

type MnR struct {
	Address      string `doc:"Address to send data to" example:"192.168.1.99:1234"`
	DefaultGroup string `doc:"Default group to use if the metadatafield group is missing."`
}

MnR sender writes to M&R port collector.

The output format is:

<timestamp>\t<groupname>\t<variable>\t<value>(\t<property>=<value>)*

Example:

1199145600 group myDevice.Variable1 100 device=myDevice name=MyVariable1

Two special metadata fields can be provided: "group" will set the M&R storage group, and "prefix" will be used to prefix all individual data variables.

E.g:

{
    "template": {
	    "timestamp": "2019-03-15T11:08:02+01:00",
	    "metadata": {
		"server": "somewhere.example.com"
	    }
    },
    "metrics": [
	{
	    "metadata": {
		"prefix": "myDevice.",
		"key": "value",
		"paramkey": "paramvalue"
	    },
	    "data": {
		"astring": "text",
		"float": 1.11,
		"integer": 5
	    }
	}
    ]
}

Will result in:

1552644482	group	myDevice.astring	text		key=value	paramkey=paramvalue	server=somewhere.example.com
1552644482	group	myDevice.float	1.11		key=value	paramkey=paramvalue	server=somewhere.example.com
1552644482	group	myDevice.integer	5		key=value	paramkey=paramvalue	server=somewhere.example.com

The default group is set to that of MnR DefaultGroup. If this is unset, the default group is "group". Meaning:

- If metadata provides "group" key, this is used - Otherwise, if DefaultGroup is set in MnR sender, this is used - Otherwise, "group" is used.

func (*MnR) Send

func (mnr *MnR) Send(c *skogul.Container) error

Send to MnR.

Implementation details: We need to write each value as its own variable to MnR, so we start by constructing two buffers for what comes before and after the key\tvalue, then iterate over m.Data.

Also, we open a new TCP connection for each call to Send() at the moment, which is really suboptimal for large quantities of data, but ok for occasional data dumps. If large metric containers are received, the cost will be negligible. But this should, of course, be fixed in the future.

type Nats added in v0.21.0

type Nats struct {
	Servers       string   `doc:"Comma separated list of nats URLs"`
	Subject       string   `doc:"Subject to publish messages on"`
	SubjectAppend []string `doc:"Append theese Metadata fields to subject"`
	Name          string   `doc:"Client name"`
	Username      string   `doc:"Client username"`
	Password      string   `doc:"Client password"`
	TLSClientKey  string   `doc:"TLS client key file path"`
	TLSClientCert string   `doc:"TLS client cert file path"`
	TLSCACert     string   `doc:"CA cert file path"`
	UserCreds     string   `doc:"Nats credentials file path"`
	NKeyFile      string   `doc:"Nats nkey file path"`
	Insecure      bool     `doc:"TLS InsecureSkipVerify"`
	Encoder       skogul.EncoderRef
	// contains filtered or unexported fields
}

func (*Nats) Send added in v0.21.0

func (n *Nats) Send(c *skogul.Container) error

func (*Nats) Verify added in v0.21.0

func (n *Nats) Verify() error

Verify configuration

type Net

type Net struct {
	Address string `doc:"Address to send data to" example:"192.168.1.99:1234"`
	Network string `doc:"Network, according to net.Dial. Typically udp or tcp."`
}

Net sends metrics to a network address FIXME: Use Encoder

func (*Net) Send

func (n *Net) Send(c *skogul.Container) error

Send sends metrics to a network address, json-encoded

func (*Net) Verify added in v0.10.9

func (n *Net) Verify() error

type Null

type Null struct{}

Null sender does nothing and returns nil - mainly for test-purposes

func (*Null) Send

func (n *Null) Send(c *skogul.Container) error

Send just returns nil

type Rabbitmq added in v0.24.0

type Rabbitmq struct {
	Username skogul.Secret     `doc:"Username for rabbitmq instance"`
	Password skogul.Secret     `doc:"Password for rabbitmq instance"`
	Host     string            `doc:"Hostname for rabbitmq instance. Fallback is localhost"`
	Port     string            `doc:"Port for rabbitmq instance. Fallback is 5672"`
	Queue    string            `doc:"Queue to write to"`
	Encoder  skogul.EncoderRef `doc:"Encoder to use. Fallback is json"`
	Timeout  int               `doc:"Timeout for rabbitmq instance connection. Fallback is 10 seconds."`
	// contains filtered or unexported fields
}

func (*Rabbitmq) Send added in v0.24.0

func (r *Rabbitmq) Send(c *skogul.Container) error

func (*Rabbitmq) Verify added in v0.24.0

func (r *Rabbitmq) Verify() error

type SNMP added in v0.24.0

type SNMP struct {
	Port      uint16                 `doc:"Snmp port"`
	Community string                 `doc:"Snmp communit field"`
	Version   string                 `doc:"Snmp version possible values: 2c, 3"`
	Target    string                 `doc:"Snmp target"`
	Oidmap    map[string]interface{} `doc:"Snmp oid to json field mapping"`
	Timeout   uint                   `doc:"Snmp timeout, default 5 seconds"`

	SnmpTrapOID string `doc:"Value of the snmp trap oid pdu"`
	// contains filtered or unexported fields
}

func (*SNMP) Send added in v0.24.0

func (x *SNMP) Send(c *skogul.Container) error

type SQL

type SQL struct {
	ConnStr string `` /* 233-byte string literal not displayed */
	Query   string `` /* 708-byte string literal not displayed */
	Driver  string `doc:"Database driver/system. Currently suported: mysql and postgres."`
	// contains filtered or unexported fields
}

SQL sender connects to a SQL Database, currently either MySQL(or Mariadb I suppose) or Postgres. The Connection String for MySQL is specified at https://github.com/go-sql-driver/mysql/ and postgres at http://www.postgresql.org/docs/current/static/libpq-connect.html#LIBPQ-CONNSTRING .

The query is expanded using os.Expand() and will fill in timestamp/metadata/data. The sender will prep the query and essentially covert INSERT INTO foo VLAUES(${timestamp},${metadata.foo},${someData}) to foo("INSERT INTO foo VALUES(?,?,?)", timestamp, foo, someData), so they will be sensibly escaped.

func (*SQL) Send

func (sq *SQL) Send(c *skogul.Container) error

Send will send to the database, after first ensuring the connection is OK.

func (*SQL) Verify

func (sq *SQL) Verify() error

Verify ensures options are set, but currently doesn't check very well, since it is disallowed from connecting to a database and such.

type Sleeper

type Sleeper struct {
	Next     skogul.SenderRef `doc:"Sender that will receive delayed metrics"`
	MaxDelay skogul.Duration  `doc:"The maximum delay we will suffer"`
	Base     skogul.Duration  `doc:"The baseline - or minimum - delay"`
	Verbose  bool             `doc:"If set to true, will log delay durations"`
}

The Sleeper sender injects a random delay between Base and Base+MaxDelay before passing execution over to the Next sender.

The purpose is testing.

func (*Sleeper) Send

func (sl *Sleeper) Send(c *skogul.Container) error

Send sleeps a random duration according to Sleeper spec, then passes the data to the next sender.

type Splunk added in v0.10.0

type Splunk struct {
	URL           string        `doc:"URL to Splunk HTTP Event Collector (HEC)"`
	Token         skogul.Secret `doc:"Token for HTTP Authorization header for HEC endpoint."`
	Index         string        `doc:"Custom Splunk index to send event to."`
	HostnameField string        `` /* 134-byte string literal not displayed */
	SourceField   string        `doc:"Name of the metadata field with the source. Will fallback to the value set in Source if not found."`
	Source        string        `` /* 138-byte string literal not displayed */
	HTTP          *HTTP         `doc:"HTTP sender options. URL is overwritten from this config, the rest will be HTTP sender defaults unless overridden."`
	// contains filtered or unexported fields
}

Splunk contains the configuration parameters for this sender.

func (*Splunk) Send added in v0.10.0

func (s *Splunk) Send(c *skogul.Container) error

Send sends a skogul container to Splunk HEC

func (*Splunk) Verify added in v0.10.0

func (s *Splunk) Verify() error

Verify verifies that the sender config is valid

type Switch added in v0.15.0

type Switch struct {
	Default *skogul.SenderRef   `doc:"Default sender to use if no other match is made. If not specified, metrics are discarded."`
	Map     []Match             `doc:"List of match conditions."`
	Next    []*skogul.SenderRef `doc:"Ordered list of senders that will potentially receive metrics."`
}

Switch sender sends metrics selectively based on metadata.

Example config:

{
	"type": "switch",
	"map": [
		{
			"conditions": [{
				"router": "foo",
				"interface": "ae12"
			},
			{
				"router": "bar",
				"interface": "ae0"
			}],
			"next": "customerExportA"
		},
		{
			"conditions": [{
				"router": "foo",
				"interface": "ae5"
			}],
			"next": "customerExportB"
		}
	],
	"default": "log-no-customer"

func (*Switch) Send added in v0.15.0

func (sw *Switch) Send(c *skogul.Container) error

Send sends data down stream. Note that it is allowed to create new containers, but we CAN NOT modify the original, and we CAN NOT modify the metrics them self. This means that this is not an optimal

type TCP added in v0.25.0

type TCP struct {
	Address      string            `doc:"Address to send data to" example:"192.168.1.99:1234"`
	MaxRetries   int               `` /* 150-byte string literal not displayed */
	Encoder      skogul.EncoderRef `doc:"Encoder to use. Defaults to JSON-encoding."`
	DialTimeout  skogul.Duration   `doc:"Timeout for dialing. Includes DNS lookups and tcp connect."`
	WriteTimeout skogul.Duration   `doc:"Write timeout. Strongly advised to set this to single-digit seconds."`
	KeepAlive    skogul.Duration   `doc:"Keepalive timer for TCP."`
	Delimiter    []byte            `` /* 128-byte string literal not displayed */
	Threads      int               `` /* 177-byte string literal not displayed */
	// contains filtered or unexported fields
}

TCP sender is optimized around TCP sockets, though it is by no means perfect. Ideally, use a properly stateful protocol like HTTP.

The main issue with the TCP sender is buffering and error detection. It is difficult to be both performant and detect errors as they happen, and this isn't made easier by the abstractions of Go.

As such, this sender works well, but does not offer a guarantee that errors are actually detected when they happen.

func (*TCP) Send added in v0.25.0

func (t *TCP) Send(c *skogul.Container) error

Send sends metrics over TCP

func (*TCP) Verify added in v0.25.0

func (t *TCP) Verify() error

type Test

type Test struct {
	// contains filtered or unexported fields
}

Test sender is used to facilitate tests, and discards any metrics, but increments the Received counter.

func (*Test) Received

func (rcv *Test) Received() uint64

Received returns the amount of containers received

func (*Test) Send

func (rcv *Test) Send(c *skogul.Container) error

Send discards data and increments the Received counter

func (*Test) Set

func (rcv *Test) Set(v uint64)

Set atomicly sets the received counter to v

func (*Test) SetSync

func (rcv *Test) SetSync(v bool)

SetSync sets the tester up for synchronized testing. Probably should be the default from now on....

func (*Test) TestNegative

func (rcv *Test) TestNegative(t failer, s skogul.Sender, c *skogul.Container)

TestNegative sends data on s and expects to fail.

func (*Test) TestQuick

func (rcv *Test) TestQuick(t failer, sender skogul.Sender, c *skogul.Container, received uint64)

TestQuick sends data on the sender and waits 5 milliseconds before checking that the data was received on the other end.

func (*Test) TestSync

func (rcv *Test) TestSync(t failer, s skogul.Sender, c *skogul.Container, send int, received int)

TestSync sends items on and waits until an ack of reception has been seen from the other end. send and received are usually the same, but not always, e.g., for the batch sender.

FIXME: Should/must add a select here to handle timeouts.

func (*Test) TestTime

func (rcv *Test) TestTime(t failer, s skogul.Sender, c *skogul.Container, received uint64, delay time.Duration)

TestTime sends the container on the specified sender, waits delay period of time, then verifies that rcv has received the expected number of containers.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL