Documentation ¶
Index ¶
- Constants
- func CreateEntities(ctx context.Context, factoryFunc contracts.EntityFactoryFunc, ...) (<-chan contracts.Entity, <-chan error)
- func SetChecksums(ctx context.Context, entities <-chan contracts.Entity, ...) (<-chan contracts.Entity, <-chan error)
- func WrapCmdErr(cmd redis.Cmder) error
- type Client
- func (c *Client) HMYield(ctx context.Context, key string, fields ...string) (<-chan HPair, <-chan error)
- func (c *Client) HYield(ctx context.Context, key string) (<-chan HPair, <-chan error)
- func (c *Client) XReadUntilResult(ctx context.Context, a *redis.XReadArgs) ([]redis.XStream, error)
- func (c Client) YieldAll(ctx context.Context, subject *common.SyncSubject) (<-chan contracts.Entity, <-chan error)
- type HPair
- type Heartbeat
- type HeartbeatMessage
- type Options
- type Streams
Constants ¶
const Timeout = time.Minute
Timeout defines how long a heartbeat may be absent if a heartbeat has already been received. After this time, a heartbeat loss is propagated.
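The rule described above can be sketched as a small predicate. This is a hedged illustration only: `heartbeatLost`, the millisecond timestamps, and the convention that `0` means "nothing received yet" are assumptions of the sketch, not part of the package.

```go
package main

import (
	"fmt"
	"time"
)

// timeout mirrors the package's Timeout constant.
const timeout = time.Minute

// heartbeatLost reports whether a loss should be signaled at nowMs.
// Both arguments are Unix timestamps in milliseconds. lastReceivedMs == 0
// means that no heartbeat has been received yet (an assumption of this sketch).
func heartbeatLost(lastReceivedMs, nowMs int64) bool {
	if lastReceivedMs == 0 {
		return false // nothing received so far, so nothing can be lost
	}
	return time.Duration(nowMs-lastReceivedMs)*time.Millisecond > timeout
}

func main() {
	// Pretend the last heartbeat arrived 90 seconds ago.
	last := time.Now().Add(-90 * time.Second).UnixMilli()
	fmt.Println(heartbeatLost(last, time.Now().UnixMilli()))
}
```

Whether the boundary is strict (`>`) or inclusive (`>=`) is not stated in the documentation; the sketch picks the strict comparison.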
Functions ¶
func CreateEntities ¶
func CreateEntities(ctx context.Context, factoryFunc contracts.EntityFactoryFunc, pairs <-chan HPair, concurrent int) (<-chan contracts.Entity, <-chan error)
CreateEntities creates entities from the given Redis field-value pairs using the specified factory function and streams them on a returned channel.
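The pattern behind this function (a pool of workers turning field-value pairs into entities, with results and errors on separate channels) might look roughly like the following. This is a minimal, standard-library-only sketch: `pair`, `host`, `createEntities`, and `decodeAll` are hypothetical stand-ins, and decoding the value as JSON is an assumption of the sketch rather than documented behavior.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"sync"
)

// pair is a hypothetical stand-in for HPair: one hash field and its value.
type pair struct{ Field, Value string }

// host is a hypothetical entity type used only for this sketch.
type host struct {
	ID   string `json:"-"`
	Name string `json:"name"`
}

// createEntities decodes pairs into hosts using `concurrent` workers.
// Both returned channels are closed once all workers have finished.
func createEntities(ctx context.Context, pairs <-chan pair, concurrent int) (<-chan *host, <-chan error) {
	out := make(chan *host)
	// Each worker sends at most one error, so this buffer never fills up.
	errs := make(chan error, concurrent)

	var wg sync.WaitGroup
	for i := 0; i < concurrent; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for p := range pairs {
				h := &host{ID: p.Field}
				if err := json.Unmarshal([]byte(p.Value), h); err != nil {
					errs <- fmt.Errorf("field %q: %w", p.Field, err)
					return
				}
				select {
				case out <- h:
				case <-ctx.Done():
					errs <- ctx.Err()
					return
				}
			}
		}()
	}

	go func() {
		wg.Wait()
		close(out)
		close(errs)
	}()
	return out, errs
}

// decodeAll feeds a fixed set of pairs through createEntities and
// returns a field -> name map plus the first error, if any.
func decodeAll(values map[string]string, concurrent int) (map[string]string, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	pairs := make(chan pair)
	go func() {
		defer close(pairs)
		for f, v := range values {
			select {
			case pairs <- pair{f, v}:
			case <-ctx.Done():
				return
			}
		}
	}()

	entities, errs := createEntities(ctx, pairs, concurrent)
	names := make(map[string]string)
	for h := range entities {
		names[h.ID] = h.Name
	}
	// errs is closed right after entities, so this read returns promptly.
	return names, <-errs
}

func main() {
	names, err := decodeAll(map[string]string{"h1": `{"name":"web"}`}, 2)
	fmt.Println(names, err)
}
```

Keeping errors on their own channel lets a consumer drain results and react to failures independently, which matches the two-channel return shape in the signature above.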
func SetChecksums ¶
func SetChecksums(ctx context.Context, entities <-chan contracts.Entity, checksums map[string]contracts.Entity, concurrent int) (<-chan contracts.Entity, <-chan error)
SetChecksums concurrently reads from the given entities channel, sets each entity's checksum using the specified map, and streams the results on a returned channel.
func WrapCmdErr ¶
func WrapCmdErr(cmd redis.Cmder) error
WrapCmdErr adds the command itself and the stack of the current goroutine to the command's error if any.
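To illustrate the idea, here is a hedged sketch that uses only the standard library. The `cmder` interface, `fakeCmd`, `stackError`, and the message format are assumptions made for this example; the package's actual implementation and wording may differ.

```go
package main

import (
	"errors"
	"fmt"
	"runtime/debug"
)

// cmder is a reduced, hypothetical stand-in for redis.Cmder.
type cmder interface {
	String() string
	Err() error
}

// stackError carries the wrapped error plus the stack captured at wrap time.
type stackError struct {
	err   error
	stack []byte
}

func (e *stackError) Error() string { return e.err.Error() }
func (e *stackError) Unwrap() error { return e.err }
func (e *stackError) Stack() []byte { return e.stack }

// wrapCmdErr returns nil if the command succeeded. Otherwise it prefixes the
// error with the command text and attaches the current goroutine's stack.
func wrapCmdErr(cmd cmder) error {
	err := cmd.Err()
	if err == nil {
		return nil
	}
	return &stackError{
		err:   fmt.Errorf("can't perform %q: %w", cmd.String(), err),
		stack: debug.Stack(),
	}
}

// fakeCmd is a test double that lets the sketch run without a Redis server.
type fakeCmd struct {
	text string
	err  error
}

func (c fakeCmd) String() string { return c.text }
func (c fakeCmd) Err() error     { return c.err }

// errTimeout is an example failure used by main.
var errTimeout = errors.New("i/o timeout")

func main() {
	err := wrapCmdErr(fakeCmd{text: "hget a b", err: errTimeout})
	fmt.Println(err)
	fmt.Println(errors.Is(err, errTimeout))
}
```

Because the wrapper implements `Unwrap`, callers can still match the original error with `errors.Is` or `errors.As`.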
Types ¶
type Client ¶
type Client struct {
	*redis.Client

	Options *Options
	// contains filtered or unexported fields
}
Client is a wrapper around redis.Client with streaming and logging capabilities.
func NewClient ¶
NewClient returns a new icingaredis.Client wrapper for a pre-existing *redis.Client.
func (*Client) HMYield ¶
func (c *Client) HMYield(ctx context.Context, key string, fields ...string) (<-chan HPair, <-chan error)
HMYield yields HPair field-value pairs for the specified fields in the hash stored at key.
func (*Client) HYield ¶
func (c *Client) HYield(ctx context.Context, key string) (<-chan HPair, <-chan error)
HYield yields HPair field-value pairs for all fields in the hash stored at key.
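func (*Client) XReadUntilResult ¶
func (c *Client) XReadUntilResult(ctx context.Context, a *redis.XReadArgs) ([]redis.XStream, error)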
XReadUntilResult (repeatedly) calls XREAD with the specified arguments until a result is returned. Each call blocks at most for the duration specified in Options.BlockTimeout until data is available before it times out and the next call is made. This also means that an already set block timeout is overridden.
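The retry behavior described above can be sketched without a Redis connection. In this hedged example, `reader` stands in for a blocking XREAD call and `errNoData` stands in for the "timed out without data" result; both names, as well as `readUntilResult` and `readAfter`, are hypothetical.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errNoData is a hypothetical sentinel for "the blocking read timed out
// without returning any entries".
var errNoData = errors.New("no data")

// reader is a hypothetical stand-in for one blocking XREAD call.
type reader func(ctx context.Context, block time.Duration) ([]string, error)

// readUntilResult calls read until it returns data or a real error.
// The caller-supplied block duration is always used for every call.
func readUntilResult(ctx context.Context, read reader, block time.Duration) ([]string, error) {
	for {
		if err := ctx.Err(); err != nil {
			return nil, err // stop retrying once the context is done
		}
		msgs, err := read(ctx, block)
		if errors.Is(err, errNoData) {
			continue // timed out without data: issue the next call
		}
		return msgs, err
	}
}

// readAfter simulates a stream that is empty for the first emptyCalls reads.
// It returns the result together with the number of read attempts.
func readAfter(emptyCalls int) ([]string, int, error) {
	calls := 0
	read := func(ctx context.Context, block time.Duration) ([]string, error) {
		calls++
		if calls <= emptyCalls {
			return nil, errNoData
		}
		return []string{"heartbeat"}, nil
	}
	msgs, err := readUntilResult(context.Background(), read, time.Second)
	return msgs, calls, err
}

func main() {
	msgs, calls, err := readAfter(3)
	fmt.Println(msgs, calls, err)
}
```

Bounding each call with a block timeout, rather than blocking indefinitely, gives the loop a regular opportunity to notice a canceled context.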
type Heartbeat ¶
type Heartbeat struct {
// contains filtered or unexported fields
}
Heartbeat periodically reads heartbeats from a Redis stream and signals on the Events channel when they are received. It also signals when a heartbeat is lost.
func NewHeartbeat ¶
NewHeartbeat returns a new Heartbeat and starts the heartbeat controller loop.
func (*Heartbeat) Close ¶
Close stops the heartbeat controller loop, waits for it to finish, and returns an error if any. Implements the io.Closer interface.
func (*Heartbeat) Done ¶
func (h *Heartbeat) Done() <-chan struct{}
Done returns a channel that will be closed when the heartbeat controller loop has ended.
func (*Heartbeat) Err ¶
Err returns an error if Done has been closed and there is an error. Otherwise, it returns nil.
func (*Heartbeat) Events ¶
func (h *Heartbeat) Events() <-chan *HeartbeatMessage
Events returns a channel that is sent to on heartbeat events.
A non-nil pointer signals that a heartbeat was received from Icinga 2 whereas a nil pointer signals a heartbeat loss.
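A consumer of such a channel has to branch on nil versus non-nil. The following is a minimal sketch of that branching; `message` is a hypothetical stand-in for HeartbeatMessage, and ranging until the channel is closed is a simplification (real code would more likely select on Events together with Done).

```go
package main

import "fmt"

// message is a hypothetical stand-in for HeartbeatMessage.
type message struct{ receivedMs int64 }

// summarize drains events and records, in order, whether each event was a
// received heartbeat (non-nil pointer) or a heartbeat loss (nil pointer).
func summarize(events <-chan *message) []string {
	var out []string
	for m := range events {
		if m == nil {
			out = append(out, "lost")
			continue
		}
		out = append(out, fmt.Sprintf("beat@%d", m.receivedMs))
	}
	return out
}

func main() {
	events := make(chan *message, 3)
	events <- &message{receivedMs: 100}
	events <- nil // heartbeat loss
	events <- &message{receivedMs: 200}
	close(events)

	fmt.Println(summarize(events))
}
```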
func (*Heartbeat) LastReceived ¶
LastReceived returns the last heartbeat's receive time in milliseconds.
type HeartbeatMessage ¶
type HeartbeatMessage struct {
// contains filtered or unexported fields
}
HeartbeatMessage represents a heartbeat received from Icinga 2 together with a timestamp when it was received.
func (*HeartbeatMessage) EnvironmentID ¶
func (m *HeartbeatMessage) EnvironmentID() (types.Binary, error)
EnvironmentID returns the Icinga DB environment ID stored in the heartbeat message.
func (*HeartbeatMessage) ExpiryTime ¶
func (m *HeartbeatMessage) ExpiryTime() time.Time
ExpiryTime returns the timestamp when the heartbeat expires.
func (*HeartbeatMessage) Stats ¶
func (m *HeartbeatMessage) Stats() *v1.StatsMessage
Stats returns the underlying heartbeat message from the icinga:stats stream.
type Options ¶
type Options struct {
	BlockTimeout        time.Duration `yaml:"block_timeout" default:"1s"`
	HMGetCount          int           `yaml:"hmget_count" default:"4096"`
	HScanCount          int           `yaml:"hscan_count" default:"4096"`
	MaxHMGetConnections int           `yaml:"max_hmget_connections" default:"8"`
	Timeout             time.Duration `yaml:"timeout" default:"30s"`
	XReadCount          int           `yaml:"xread_count" default:"4096"`
}
Options defines user-configurable Redis options.
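The `default` struct tags indicate the value each field takes when it is left unset. Which mechanism the package uses to apply them is not stated here, so the following is only a sketch of the idea using the standard `reflect` package; the lowercase `options` copy and `applyDefaults` are hypothetical.

```go
package main

import (
	"fmt"
	"reflect"
	"strconv"
	"time"
)

// options copies the field set and tags of Options for this sketch.
type options struct {
	BlockTimeout        time.Duration `yaml:"block_timeout" default:"1s"`
	HMGetCount          int           `yaml:"hmget_count" default:"4096"`
	HScanCount          int           `yaml:"hscan_count" default:"4096"`
	MaxHMGetConnections int           `yaml:"max_hmget_connections" default:"8"`
	Timeout             time.Duration `yaml:"timeout" default:"30s"`
	XReadCount          int           `yaml:"xread_count" default:"4096"`
}

// applyDefaults fills every zero-valued field from its `default` tag.
// Only the two field types used above are supported.
func applyDefaults(o *options) error {
	v := reflect.ValueOf(o).Elem()
	t := v.Type()
	for i := 0; i < t.NumField(); i++ {
		f := v.Field(i)
		def, ok := t.Field(i).Tag.Lookup("default")
		if !ok || !f.IsZero() {
			continue // no default declared, or the user already set a value
		}
		switch f.Interface().(type) {
		case time.Duration:
			d, err := time.ParseDuration(def)
			if err != nil {
				return fmt.Errorf("%s: %w", t.Field(i).Name, err)
			}
			f.SetInt(int64(d))
		case int:
			n, err := strconv.Atoi(def)
			if err != nil {
				return fmt.Errorf("%s: %w", t.Field(i).Name, err)
			}
			f.SetInt(int64(n))
		default:
			return fmt.Errorf("%s: unsupported type %s", t.Field(i).Name, f.Type())
		}
	}
	return nil
}

func main() {
	o := options{HScanCount: 100} // user overrides a single value
	if err := applyDefaults(&o); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", o)
}
```

One consequence of treating the zero value as "unset" in this sketch is that a user cannot explicitly configure `0` for a field that has a default.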