channel

package
v0.1.13 Latest
Warning

This package is not in the latest version of its module.

Published: Mar 5, 2024 License: Apache-2.0 Imports: 13 Imported by: 1

Documentation

Overview

Package channel provides fleetspeak.Message passing over interprocess pipes.

Constants

This section is empty.

Variables

var (
	// MagicTimeout is how long we are willing to wait for a magic
	// number. Public to support testing. Should only be changed when no
	// channels are active.
	MagicTimeout = 5 * time.Minute

	// MessageTimeout is how long we are willing to wait for a message
	// body. Public to support testing. Should only be changed when no
	// channels are active.
	MessageTimeout = 5 * time.Minute
)

Functions

This section is empty.

Types

type Builder

type Builder func() (c *Channel, cancel func())

Builder must return a new Channel connected to the target process, along with a cancel function that should shut down the Channel and release any associated resources.

Builder may return (nil, nil) if the system is shutting down and the RelentlessChannel using the builder should stop. Otherwise, it should return only once it has a Channel.

type Channel

type Channel struct {
	In <-chan *fspb.Message // Messages received from the other process. Will be closed when the underlying pipe is closed.

	Out chan<- *fspb.Message // Messages to send to the other process. Close to shut down the Channel.

	Err <-chan error // Any errors encountered.
	// contains filtered or unexported fields
}

Channel handles the communication of fspb.Messages over interprocess pipes.

NOTE: once any error occurs, the channel may be only partially functional. In that case, the channel should be shut down and recreated.

In particular, once an error is written to Err, the user of Channel is responsible for ensuring that any current operations against the provided io.Reader and io.Writer interfaces will unblock and terminate.
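A typical consumer reads In and Err together and stops at the first error, since the Channel may be only partially functional afterwards. The sketch below shows that select loop on a stand-in type. `pipeChannel` copies the field layout of Channel but carries strings instead of `*fspb.Message`, and `drain` and `errPipeBroken` are hypothetical names introduced for the example.

```go
package main

import "errors"

// pipeChannel is a hypothetical stand-in with the same field layout as
// channel.Channel; strings replace *fspb.Message.
type pipeChannel struct {
	In  <-chan string
	Out chan<- string
	Err <-chan error
}

// errPipeBroken is an example error for demonstration purposes.
var errPipeBroken = errors.New("pipe broken")

// drain reads from c.In until it is closed or an error arrives on c.Err.
// It returns the messages received so far and the first error, if any.
// After an error the caller is expected to shut the channel down and
// recreate it.
func drain(c *pipeChannel) ([]string, error) {
	var got []string
	in, errs := c.In, c.Err
	for {
		select {
		case m, ok := <-in:
			if !ok {
				return got, nil // underlying pipe closed
			}
			got = append(got, m)
		case err, ok := <-errs:
			if !ok {
				errs = nil // closed: stop selecting on it
				continue
			}
			return got, err
		}
	}
}
```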

func New

func New(pr io.ReadCloser, pw io.WriteCloser) *Channel

New instantiates a Channel. pr and pw will be closed when the Channel is shut down.

func (*Channel) Wait

func (c *Channel) Wait()

Wait waits for all underlying threads to shut down.

type RelentlessAcknowledger

type RelentlessAcknowledger struct {
	In <-chan service.AckMessage // Wraps Channel.In.
	// contains filtered or unexported fields
}

RelentlessAcknowledger partially wraps a Channel. It assumes that the other end of the Channel is attached to a RelentlessChannel and implements the acknowledgement protocol which RelentlessChannel expects.

Once a Channel is so wrapped, the caller should read from RelentlessAcknowledger.In instead of Channel.In. The resulting AckMessages should be acknowledged in order to inform the attached RelentlessChannel that the message was successfully handled.
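The receive-then-acknowledge flow can be sketched as follows. `ackMessage` is a hypothetical stand-in for `service.AckMessage`, assumed here to pair a payload with an `Ack` callback. `handleAll` and `errRejected` are likewise illustrative names, and how the attached RelentlessChannel treats a message that is never acknowledged is not specified on this page.

```go
package main

import "errors"

// ackMessage is a hypothetical stand-in for service.AckMessage: a payload
// plus a callback that reports successful handling to the sender.
type ackMessage struct {
	M   string
	Ack func()
}

// errRejected is an example error a handler might return.
var errRejected = errors.New("message rejected")

// handleAll processes each message from in and acknowledges it only after
// handle succeeds. Messages that fail are left unacknowledged. It returns
// the number of messages acknowledged.
func handleAll(in <-chan ackMessage, handle func(string) error) int {
	acked := 0
	for am := range in {
		if err := handle(am.M); err != nil {
			continue // do not acknowledge a message that was not handled
		}
		if am.Ack != nil { // defensive check; an assumption of this sketch
			am.Ack()
		}
		acked++
	}
	return acked
}
```

Acknowledging only after the handler returns successfully means a crash in the middle of handling leaves the message unacknowledged.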

func NewRelentlessAcknowledger

func NewRelentlessAcknowledger(c *Channel, size int) *RelentlessAcknowledger

NewRelentlessAcknowledger creates a RelentlessAcknowledger wrapping c, buffered to smoothly handle 'size' simultaneously unacknowledged messages.

func (*RelentlessAcknowledger) Stop

func (a *RelentlessAcknowledger) Stop()

Stop stops the RelentlessAcknowledger and closes a.In.

type RelentlessChannel

type RelentlessChannel struct {
	In  <-chan *fspb.Message      // Messages received from the other process.
	Out chan<- service.AckMessage // Messages to send to the other process. Close to shut down.
	// contains filtered or unexported fields
}

A RelentlessChannel is like a Channel, but relentless. Essentially it wraps a Channel, which it recreates on error. Furthermore, it maintains a collection of messages which have not been acknowledged, and resends them after channel recreation. It also provides a mechanism for messages sent through it to be acknowledged by the other side of the channel.
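The bookkeeping described above (remember what was sent, forget what was acknowledged, resend the rest after recreation) can be illustrated with a small structure. This is a sketch of the general technique, not RelentlessChannel's implementation: the `pending` type, integer IDs, and resending in ascending ID order are all assumptions made for the example.

```go
package main

import "sort"

// pending tracks sent-but-unacknowledged messages by a caller-chosen ID.
type pending struct {
	msgs map[int]string
}

func newPending() *pending { return &pending{msgs: make(map[int]string)} }

// sent records that message m was written with the given id.
func (p *pending) sent(id int, m string) { p.msgs[id] = m }

// acked forgets a message once the other side has acknowledged it.
func (p *pending) acked(id int) { delete(p.msgs, id) }

// resend passes every unacknowledged message to out in ascending ID
// order, as one might do after the underlying channel is recreated.
func (p *pending) resend(out func(string)) {
	ids := make([]int, 0, len(p.msgs))
	for id := range p.msgs {
		ids = append(ids, id)
	}
	sort.Ints(ids)
	for _, id := range ids {
		out(p.msgs[id])
	}
}
```

Because a message may be resent after it was delivered but before its acknowledgement arrived, a receiver in a scheme like this should tolerate duplicates.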

func NewRelentlessChannel

func NewRelentlessChannel(b Builder) *RelentlessChannel

NewRelentlessChannel returns a RelentlessChannel which wraps the Builder b and uses it to create channels.
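The way a Builder is driven can be pictured as a supervision loop: build, run until failure, cancel, and build again, stopping when the builder returns (nil, nil). The sketch below shows that loop under stated assumptions. `session`, `sessionBuilder`, and `superviseLoop` are hypothetical stand-ins and do not reflect the internals of RelentlessChannel.

```go
package main

// session is a hypothetical stand-in for a live channel: run blocks until
// the channel fails and returns the error that ended it.
type session struct {
	run func() error
}

// sessionBuilder mirrors the shape of channel.Builder for the stand-in.
type sessionBuilder func() (s *session, cancel func())

// superviseLoop builds a session, runs it until it ends, cancels it, and
// repeats. It stops when the builder returns a nil session and reports
// how many sessions were built.
func superviseLoop(b sessionBuilder) int {
	built := 0
	for {
		s, cancel := b()
		if s == nil {
			return built // builder signalled shutdown
		}
		built++
		_ = s.run() // a real loop would log or inspect this error
		if cancel != nil {
			cancel() // release the failed session's resources
		}
	}
}
```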

Directories

Path Synopsis
proto