mode

package
v0.0.0-...-320d922 Latest
Warning

This package is not in the latest version of its module.

Published: May 10, 2018 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Overview

Package mode defines and implements output strategies with failover or load-balancing modes for use by output plugins.

Constants

This section is empty.

Variables

var (
	// ErrNoConnectionConfigured indicates no configured connections for publishing.
	ErrNoConnectionConfigured = errors.New("No connection configured")
)
var ErrNoHostsConfigured = errors.New("no host configuration found")

ErrNoHostsConfigured indicates a missing host or hosts configuration.

var (
	// ErrTempBulkFailure indicates that PublishEvents failed temporarily and
	// the send should be retried.
	ErrTempBulkFailure = errors.New("temporary bulk send failure")
)

Functions

func ReadHostList

func ReadHostList(cfg *ucfg.Config) ([]string, error)

Types

type AsyncLoadBalancerMode

type AsyncLoadBalancerMode struct {
	// contains filtered or unexported fields
}

AsyncLoadBalancerMode balances the sending of events between multiple connections.

The balancing algorithm is mostly pull-based, with multiple workers trying to pull some amount of work from a shared queue. Workers try to get a new work item only if they have a working/active connection. Workers without an active connection do not participate until a connection has been re-established. Due to its pull-based nature, the algorithm load-balances events randomly, with workers that have lower latencies/turn-around times potentially getting more work items than workers with higher latencies. The algorithm thus adapts dynamically to the resource availability of the servers that events are forwarded to.

Workers not participating in the load-balancing continuously try to reconnect to their configured endpoints. Once a new connection has been established, these workers participate in load-balancing again.

If a connection becomes unavailable, the events are rescheduled for another connection to pick up. Rescheduling events is limited to a maximum number of send attempts. If events have not been sent after the maximum number of allowed attempts, they are dropped.

Like network connections, distributing events to workers is subject to a timeout. If no worker is available to pick up a message for sending, the message is dropped internally after max_retries. If the mode or the message requires guaranteed delivery, the message is retried indefinitely.
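The pull-based scheme and the attempt limit described above can be illustrated with a small, self-contained sketch. It does not use this package: the `batch` type, the `sendFunc` hook, and the `balance` function are hypothetical names introduced only for illustration, and the real implementation may differ in how it queues and signals events.

```go
package main

import (
	"fmt"
	"sync"
)

// batch is a hypothetical stand-in for a group of events.
type batch struct {
	id       int
	attempts int
}

// sendFunc is a hypothetical send hook; it reports whether the send succeeded.
type sendFunc func(worker int, b batch) bool

// balance lets workers pull batches from a shared queue. A failed batch is
// rescheduled for any worker to pick up until maxAttempts is reached, after
// which it is dropped. It returns the number of delivered and dropped batches.
func balance(batches []batch, workers, maxAttempts int, send sendFunc) (delivered, dropped int) {
	if workers < 1 {
		workers = 1
	}
	// Each batch is in the queue at most once, so this buffer never fills up.
	work := make(chan batch, len(batches))
	var pending, running sync.WaitGroup
	var mu sync.Mutex

	pending.Add(len(batches))
	for _, b := range batches {
		work <- b
	}

	for w := 0; w < workers; w++ {
		running.Add(1)
		go func(id int) {
			defer running.Done()
			for b := range work { // pull the next work item
				b.attempts++
				ok := send(id, b)
				if !ok && b.attempts < maxAttempts {
					work <- b // reschedule for another worker
					continue
				}
				mu.Lock()
				if ok {
					delivered++
				} else {
					dropped++
				}
				mu.Unlock()
				pending.Done()
			}
		}(w)
	}

	pending.Wait() // every batch was either delivered or dropped
	close(work)
	running.Wait()
	return delivered, dropped
}

func main() {
	batches := []batch{{id: 1}, {id: 2}, {id: 3}, {id: 4}, {id: 5}}
	// Assumption for the demo: batch 3 can never be sent.
	send := func(_ int, b batch) bool { return b.id != 3 }
	delivered, dropped := balance(batches, 2, 3, send)
	fmt.Printf("delivered=%d dropped=%d\n", delivered, dropped)
}
```

Because workers only take a new item when they are free, a faster worker naturally ends up processing more batches, which is the adaptive behaviour the description refers to.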

func NewAsyncLoadBalancerMode

func NewAsyncLoadBalancerMode(
	clients []AsyncProtocolClient,
	maxAttempts int,
	waitRetry, timeout, maxWaitRetry time.Duration,
) (*AsyncLoadBalancerMode, error)

NewAsyncLoadBalancerMode creates a new load balancer connection mode.

func (*AsyncLoadBalancerMode) Close

func (m *AsyncLoadBalancerMode) Close() error

Close stops all workers and closes all open connections. In flight events are signaled as failed.

func (*AsyncLoadBalancerMode) PublishEvent

func (m *AsyncLoadBalancerMode) PublishEvent(
	signaler outputs.Signaler,
	opts outputs.Options,
	event common.MapStr,
) error

PublishEvent forwards the event to some load balancing worker.

func (*AsyncLoadBalancerMode) PublishEvents

func (m *AsyncLoadBalancerMode) PublishEvents(
	signaler outputs.Signaler,
	opts outputs.Options,
	events []common.MapStr,
) error

PublishEvents forwards events to some load balancing worker.

type AsyncProtocolClient

type AsyncProtocolClient interface {
	Connectable

	AsyncPublishEvents(cb func([]common.MapStr, error), events []common.MapStr) error

	AsyncPublishEvent(cb func(error), event common.MapStr) error
}

AsyncProtocolClient is an output-plugin-specific client implementation for asynchronously encoding and publishing events.

func MakeAsyncClients

func MakeAsyncClients(
	config *ucfg.Config,
	newClient func(string) (AsyncProtocolClient, error),
) ([]AsyncProtocolClient, error)

func NewAsyncFailoverClient

func NewAsyncFailoverClient(clients []AsyncProtocolClient) []AsyncProtocolClient

type Connectable

type Connectable interface {
	// Connect establishes a connection to the client's sink.
	// The connection attempt shall report an error if no connection could be
	// established within the given time interval. A timeout value of 0 means
	// wait forever.
	Connect(timeout time.Duration) error

	// Close closes the established connection.
	Close() error

	// IsConnected indicates the client's connection state. If the connection
	// has been lost while publishing events, IsConnected must return false. As
	// long as IsConnected returns false, an output plugin might try to
	// re-establish the connection by calling Connect.
	IsConnected() bool
}

type ConnectionMode

type ConnectionMode interface {
	// Close stops the mode's publisher loop and closes all its
	// associated clients.
	Close() error

	// PublishEvents will send all events (potentially asynchronously) to its
	// clients.
	PublishEvents(trans outputs.Signaler, opts outputs.Options, events []common.MapStr) error

	// PublishEvent will send an event to its clients.
	PublishEvent(trans outputs.Signaler, opts outputs.Options, event common.MapStr) error
}

ConnectionMode takes care of connecting to hosts and potentially doing load balancing and/or failover.

func NewAsyncConnectionMode

func NewAsyncConnectionMode(
	clients []AsyncProtocolClient,
	failover bool,
	maxAttempts int,
	waitRetry, timeout, maxWaitRetry time.Duration,
) (ConnectionMode, error)

func NewConnectionMode

func NewConnectionMode(
	clients []ProtocolClient,
	failover bool,
	maxAttempts int,
	waitRetry, timeout, maxWaitRetry time.Duration,
) (ConnectionMode, error)

type LoadBalancerMode

type LoadBalancerMode struct {
	// contains filtered or unexported fields
}

LoadBalancerMode balances the sending of events between multiple connections.

The balancing algorithm is mostly pull-based, with multiple workers trying to pull some amount of work from a shared queue. Workers try to get a new work item only if they have a working/active connection. Workers without an active connection do not participate until a connection has been re-established. Due to its pull-based nature, the algorithm load-balances events randomly, with workers that have lower latencies/turn-around times potentially getting more work items than workers with higher latencies. The algorithm thus adapts dynamically to the resource availability of the servers that events are forwarded to.

Workers not participating in the load-balancing continuously try to reconnect to their configured endpoints. Once a new connection has been established, these workers participate in load-balancing again.

If a connection becomes unavailable, the events are rescheduled for another connection to pick up. Rescheduling events is limited to a maximum number of send attempts. If events have not been sent after the maximum number of allowed attempts, they are dropped.

Distributing events to workers is subject to a timeout. If no worker is available to pick up a message for sending, the message is dropped internally.

func NewLoadBalancerMode

func NewLoadBalancerMode(
	clients []ProtocolClient,
	maxAttempts int,
	waitRetry, timeout, maxWaitRetry time.Duration,
) (*LoadBalancerMode, error)

NewLoadBalancerMode creates a new load balancer connection mode.

func (*LoadBalancerMode) Close

func (m *LoadBalancerMode) Close() error

Close waits for the workers to end and connections to close.

func (*LoadBalancerMode) PublishEvent

func (m *LoadBalancerMode) PublishEvent(
	signaler outputs.Signaler,
	opts outputs.Options,
	event common.MapStr,
) error

PublishEvent forwards the event to some load balancing worker.

func (*LoadBalancerMode) PublishEvents

func (m *LoadBalancerMode) PublishEvents(
	signaler outputs.Signaler,
	opts outputs.Options,
	events []common.MapStr,
) error

PublishEvents forwards events to some load balancing worker.

type ProtocolClient

type ProtocolClient interface {
	Connectable

	// PublishEvents sends events to the client's sink. On failure or timeout err
	// must be set. If connection has been lost, IsConnected must return false
	// in future calls.
	// PublishEvents is free to publish only a subset of given events, even in
	// error case. On return nextEvents contains all events not yet published.
	PublishEvents(events []common.MapStr) (nextEvents []common.MapStr, err error)

	// PublishEvent sends one event to the client's sink. On failure an error is
	// returned.
	PublishEvent(event common.MapStr) error
}

ProtocolClient is an output-plugin-specific client implementation for encoding and publishing events. A ProtocolClient must be able to connect to its sink and indicate connection failures so that it can be reconnected by the output plugin.
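The nextEvents contract of PublishEvents (a call may publish only a subset and return the rest) suggests a resubmission loop on the caller's side. The sketch below is self-contained and makes several assumptions: `Event` stands in for common.MapStr, `publisher` is a local subset of ProtocolClient, and `chunkClient` and `publishAll` are hypothetical names. Only failed calls count as attempts here, which may not match the package's own retry accounting.

```go
package main

import (
	"errors"
	"fmt"
)

// Event is a stand-in for common.MapStr so the example has no dependencies.
type Event map[string]interface{}

// publisher is the subset of ProtocolClient used by this sketch.
type publisher interface {
	PublishEvents(events []Event) (nextEvents []Event, err error)
}

// chunkClient is a hypothetical client that accepts at most limit events per
// call and returns an error whenever it leaves events unsent.
type chunkClient struct {
	limit int
	sent  int
}

func (c *chunkClient) PublishEvents(events []Event) ([]Event, error) {
	n := len(events)
	if n > c.limit {
		n = c.limit
	}
	c.sent += n
	if n < len(events) {
		return events[n:], errors.New("partial send")
	}
	return nil, nil
}

// publishAll resubmits whatever PublishEvents returns in nextEvents until
// nothing is left or maxAttempts calls have failed. It returns the number of
// events that remained unsent together with the last error.
func publishAll(p publisher, events []Event, maxAttempts int) (unsent int, err error) {
	for attempt := 0; len(events) > 0; {
		events, err = p.PublishEvents(events)
		if err != nil {
			attempt++
			if attempt >= maxAttempts {
				return len(events), err
			}
		}
	}
	return 0, nil
}

func main() {
	events := make([]Event, 5)
	client := &chunkClient{limit: 2}
	unsent, err := publishAll(client, events, 3)
	fmt.Println(unsent, err, client.sent)
}
```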

func MakeClients

func MakeClients(
	config *ucfg.Config,
	newClient func(string) (ProtocolClient, error),
) ([]ProtocolClient, error)

MakeClients creates a list of ProtocolClient instances from the outputer configuration's host list and a client factory function.

func NewFailoverClient

func NewFailoverClient(clients []ProtocolClient) []ProtocolClient

type SingleConnectionMode

type SingleConnectionMode struct {
	// contains filtered or unexported fields
}

SingleConnectionMode sends all output over a single connection. If the connection is not available, the output plugin blocks until the connection is either available again or the connection mode is closed by Close.

func NewSingleConnectionMode

func NewSingleConnectionMode(
	client ProtocolClient,
	maxAttempts int,
	waitRetry, timeout, maxWaitRetry time.Duration,
) (*SingleConnectionMode, error)

NewSingleConnectionMode creates a new single connection mode using exactly one ProtocolClient connection.

func (*SingleConnectionMode) Close

func (s *SingleConnectionMode) Close() error

Close closes the underlying connection.

func (*SingleConnectionMode) PublishEvent

func (s *SingleConnectionMode) PublishEvent(
	signaler outputs.Signaler,
	opts outputs.Options,
	event common.MapStr,
) error

PublishEvent forwards a single event. On failure PublishEvent tries to reconnect.

func (*SingleConnectionMode) PublishEvents

func (s *SingleConnectionMode) PublishEvents(
	signaler outputs.Signaler,
	opts outputs.Options,
	events []common.MapStr,
) error

PublishEvents tries to publish the events with retries if connection becomes unavailable. On failure PublishEvents tries to reconnect.
