channel

package
v0.0.0-...-50c22c9 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 1, 2023 License: Apache-2.0 Imports: 25 Imported by: 0

Documentation

Index

Constants

View Source
const (
	ConnectionIdHeader              = 0
	ReplyForHeader                  = 1
	ResultSuccessHeader             = 2
	HelloRouterAdvertisementsHeader = 3
	HelloVersionHeader              = 4

	// Headers in the range 128-255 inclusive will be reflected when creating replies
	ReflectedHeaderBitMask = 1 << 7
	MaxReflectedHeader     = (1 << 8) - 1
)

Message header notes:

  • 0-127 reserved for channel
  • 128-255 reserved for headers that need to be reflected back to sender on responses
  • 128 is used for a message UUID for tracing
  • 1000-1099 reserved for edge messages
  • 1100-1199 is reserved for control plane messages
  • 2000-2500 is reserved for xgress messages
  • 2000-2255 is reserved for xgress implementation headers
View Source
const (
	ContentTypeHelloType           = 0
	ContentTypePingType            = 1
	ContentTypeResultType          = 2
	ContentTypeLatencyType         = 3
	ContentTypeLatencyResponseType = 4
)
View Source
const (
	Highest  = 0
	High     = 1024
	Standard = 4096
	Low      = 10240
)
View Source
const AnyContentType = -1
View Source
const DECODER = "channel"
View Source
const DecoderFieldName = "__decoder__"
View Source
const HelloSequence = -1
View Source
const MessageFieldName = "__message__"

Variables

View Source
var ListenerClosedError = listenerClosedError{}
View Source
var UnknownVersionError = errors.New("channel synchronization error, bad magic number")

Functions

func AcceptNextChannel

func AcceptNextChannel(logicalName string, underlayFactory UnderlayFactory, options *Options, tcfg transport.Configuration) error

func DecodeTraceAndFormat

func DecodeTraceAndFormat(decode []byte) string

func SetUnderlayRegistrySequence

func SetUnderlayRegistrySequence(sequence *sequence.Sequence)

Types

type BindHandler

type BindHandler interface {
	BindChannel(ch Channel) error
}

type Binding

type Binding interface {
	Bind(h BindHandler) error
	AddPeekHandler(h PeekHandler)
	AddTransformHandler(h TransformHandler)
	AddReceiveHandler(h ReceiveHandler)
	AddErrorHandler(h ErrorHandler)
	AddCloseHandler(h CloseHandler)
	SetUserData(data interface{})
	GetUserData() interface{}
}

Binding is used to add handlers to Channel.

NOTE: It is intended that the Add* methods are used at initial channel setup, and not invoked on an in-service Channel. This API may change in the future to enforce those semantics programmatically.

type Channel

type Channel interface {
	Identity
	SetLogicalName(logicalName string)
	Binding
	Sender
	io.Closer
	IsClosed() bool
	Underlay() Underlay
	StartRx()
	GetTimeSinceLastRead() time.Duration
}

Channel represents an asynchronous, message-passing endpoint, designed to sit on top of an underlay.

func NewChannel

func NewChannel(logicalName string, underlayFactory UnderlayFactory, options *Options) (Channel, error)

func NewChannelWithTransportConfiguration

func NewChannelWithTransportConfiguration(logicalName string, underlayFactory UnderlayFactory, options *Options, tcfg transport.Configuration) (Channel, error)

type CloseHandler

type CloseHandler interface {
	HandleClose(ch Channel)
}

type ConnectOptions

type ConnectOptions struct {
	MaxQueuedConnects      int
	MaxOutstandingConnects int
	ConnectTimeoutMs       int
}

func DefaultConnectOptions

func DefaultConnectOptions() ConnectOptions

func (*ConnectOptions) ConnectTimeout

func (co *ConnectOptions) ConnectTimeout() time.Duration

func (*ConnectOptions) Validate

func (co *ConnectOptions) Validate() error

type ConnectionHandler

type ConnectionHandler interface {
	HandleConnection(hello *Hello, certificates []*x509.Certificate) error
}

type Decoder

type Decoder struct{}

func (Decoder) Decode

func (d Decoder) Decode(msg *Message) ([]byte, bool)

type ErrorHandler

type ErrorHandler interface {
	HandleError(err error, ch Channel)
}

type Hello

type Hello struct {
	IdToken string
	Headers map[int32][]byte
}

func UnmarshalHello

func UnmarshalHello(message *Message) *Hello

type Identity

type Identity interface {
	// The TokenId used to represent the identity of this channel to lower-level resources.
	//
	Id() *identity.TokenId

	// The LogicalName represents the purpose or usage of this channel (e.g. 'ctrl', 'mgmt', 'r/001', etc.). Usually used
	// by humans to understand the logical purpose of a channel.
	//
	LogicalName() string

	// The ConnectionId represents the identity of this Channel to internal API components ("instance identifier").
	// Usually used by the Channel framework to differentiate Channel instances.
	//
	ConnectionId() string

	// Certificates contains the identity certificates provided by the peer.
	//
	Certificates() []*x509.Certificate

	// Label constructs a consistently-formatted string used for context logging purposes, from the components above.
	//
	Label() string
}

type LatencyHandler

type LatencyHandler struct {
	// contains filtered or unexported fields
}

LatencyHandler responds to latency messages with Result messages.

func (*LatencyHandler) ContentType

func (h *LatencyHandler) ContentType() int32

func (*LatencyHandler) HandleReceive

func (h *LatencyHandler) HandleReceive(msg *Message, ch Channel)

type MemoryContext

type MemoryContext struct {
	// contains filtered or unexported fields
}

func NewMemoryContext

func NewMemoryContext() *MemoryContext

type Message

type Message struct {
	MessageHeader
	Body []byte
}

func NewHello

func NewHello(idToken string, attributes map[int32][]byte) *Message

func NewMessage

func NewMessage(contentType int32, body []byte) *Message

func NewResult

func NewResult(success bool, message string) *Message

func ReadWSMessage

func ReadWSMessage(peer io.Reader) (*Message, error)

func (*Message) ReplyTo

func (m *Message) ReplyTo(o *Message)

func (*Message) String

func (m *Message) String() string

type MessageHeader

type MessageHeader struct {
	ContentType int32

	Headers map[int32][]byte
	// contains filtered or unexported fields
}

func (*MessageHeader) GetBoolHeader

func (header *MessageHeader) GetBoolHeader(key int32) (bool, bool)

func (*MessageHeader) GetByteHeader

func (header *MessageHeader) GetByteHeader(key int32) (byte, bool)

func (*MessageHeader) GetStringHeader

func (header *MessageHeader) GetStringHeader(key int32) (string, bool)

func (*MessageHeader) GetUint16Header

func (header *MessageHeader) GetUint16Header(key int32) (uint16, bool)

func (*MessageHeader) GetUint32Header

func (header *MessageHeader) GetUint32Header(key int32) (uint32, bool)

func (*MessageHeader) GetUint64Header

func (header *MessageHeader) GetUint64Header(key int32) (uint64, bool)

func (*MessageHeader) IsReply

func (header *MessageHeader) IsReply() bool

func (*MessageHeader) IsReplyingTo

func (header *MessageHeader) IsReplyingTo(sequence int32) bool

func (*MessageHeader) PutBoolHeader

func (header *MessageHeader) PutBoolHeader(key int32, value bool)

func (*MessageHeader) PutByteHeader

func (header *MessageHeader) PutByteHeader(key int32, value byte)

func (*MessageHeader) PutUint16Header

func (header *MessageHeader) PutUint16Header(key int32, value uint16)

func (*MessageHeader) PutUint32Header

func (header *MessageHeader) PutUint32Header(key int32, value uint32)

func (*MessageHeader) PutUint64Header

func (header *MessageHeader) PutUint64Header(key int32, value uint64)

func (*MessageHeader) ReplyFor

func (header *MessageHeader) ReplyFor() int32

func (*MessageHeader) Sequence

func (header *MessageHeader) Sequence() int32

type Options

type Options struct {
	OutQueueSize int
	BindHandlers []BindHandler
	PeekHandlers []PeekHandler
	ConnectOptions
	DelayRxStart bool
}

func DefaultOptions

func DefaultOptions() *Options

func LoadOptions

func LoadOptions(data map[interface{}]interface{}) *Options

func (Options) String

func (o Options) String() string

type PeekHandler

type PeekHandler interface {
	Connect(ch Channel, remoteAddress string)
	Rx(m *Message, ch Channel)
	Tx(m *Message, ch Channel)
	Close(ch Channel)
}

type Priority

type Priority uint32

type ReceiveHandler

type ReceiveHandler interface {
	ContentType() int32
	HandleReceive(m *Message, ch Channel)
}

type Result

type Result struct {
	Success bool
	Message string
}

func UnmarshalResult

func UnmarshalResult(message *Message) *Result

type Sender

type Sender interface {
	Send(m *Message) error
	SendWithPriority(m *Message, p Priority) error
	SendAndSync(m *Message) (chan error, error)
	SendAndSyncWithPriority(m *Message, p Priority) (chan error, error)
	SendWithTimeout(m *Message, timeout time.Duration) error
	SendPrioritizedWithTimeout(m *Message, p Priority, timeout time.Duration) error
	SendAndWaitWithTimeout(m *Message, timeout time.Duration) (*Message, error)
	SendPrioritizedAndWaitWithTimeout(m *Message, p Priority, timeout time.Duration) (*Message, error)
	SendAndWait(m *Message) (chan *Message, error)
	SendAndWaitWithPriority(m *Message, p Priority) (chan *Message, error)
	SendForReply(msg TypedMessage, timeout time.Duration) (*Message, error)
	SendForReplyAndDecode(msg TypedMessage, timeout time.Duration, result TypedMessage) error
}

type TraceHandler

type TraceHandler struct {
	// contains filtered or unexported fields
}

func NewTraceHandler

func NewTraceHandler(path string, id *identity.TokenId) (*TraceHandler, error)

func (*TraceHandler) AddDecoder

func (h *TraceHandler) AddDecoder(decoder TraceMessageDecoder)

func (TraceHandler) Close

func (h TraceHandler) Close(ch Channel)

func (*TraceHandler) Connect

func (h *TraceHandler) Connect(ch Channel, remoteAddress string)

func (TraceHandler) Rx

func (h TraceHandler) Rx(msg *Message, ch Channel)

func (TraceHandler) Tx

func (h TraceHandler) Tx(msg *Message, ch Channel)

type TraceMessageDecode

type TraceMessageDecode map[string]interface{}

func NewTraceMessageDecode

func NewTraceMessageDecode(decoder, message string) TraceMessageDecode

func (TraceMessageDecode) MarshalResult

func (d TraceMessageDecode) MarshalResult() ([]byte, bool)

func (TraceMessageDecode) MarshalTraceMessageDecode

func (d TraceMessageDecode) MarshalTraceMessageDecode() ([]byte, error)

type TraceMessageDecoder

type TraceMessageDecoder interface {
	Decode(msg *Message) ([]byte, bool)
}

type TransformHandler

type TransformHandler interface {
	Rx(m *Message, ch Channel)
	Tx(m *Message, ch Channel)
}

type TypedMessage

type TypedMessage interface {
	proto.Message
	GetContentType() int32
}

type Underlay

type Underlay interface {
	Rx() (*Message, error)
	Tx(m *Message) error
	Identity
	io.Closer
	IsClosed() bool
	Headers() map[int32][]byte
}

Underlay abstracts a physical communications channel, typically sitting on top of 'transport'.

type UnderlayFactory

type UnderlayFactory interface {
	Create(timeout time.Duration, tcfg transport.Configuration) (Underlay, error)
}

UnderlayFactory is used by Channel to obtain an Underlay instance. An underlay "dialer" or "listener" implements UnderlayFactory, to provide instances to Channel.

func NewClassicDialer

func NewClassicDialer(identity *identity.TokenId, endpoint transport.Address, headers map[int32][]byte) UnderlayFactory

func NewMemoryDialer

func NewMemoryDialer(identity *identity.TokenId, headers map[int32][]byte, ctx *MemoryContext) UnderlayFactory

func NewReconnectingDialer

func NewReconnectingDialer(identity *identity.TokenId, endpoint transport.Address, headers map[int32][]byte) UnderlayFactory

func NewReconnectingDialerWithHandler

func NewReconnectingDialerWithHandler(identity *identity.TokenId, endpoint transport.Address, headers map[int32][]byte, reconnectHandler func()) UnderlayFactory

type UnderlayListener

type UnderlayListener interface {
	Listen(handlers ...ConnectionHandler) error
	UnderlayFactory
	io.Closer
}

UnderlayListener represents a component designed to listen for incoming peer connections on some abstracted underlay facility.

func NewClassicListener

func NewClassicListener(identity *identity.TokenId, endpoint transport.Address, connectOptions ConnectOptions, headers map[int32][]byte) UnderlayListener

func NewClassicListenerWithTransportConfiguration

func NewClassicListenerWithTransportConfiguration(identity *identity.TokenId, endpoint transport.Address, connectOptions ConnectOptions, tcfg transport.Configuration, headers map[int32][]byte) UnderlayListener

func NewMemoryListener

func NewMemoryListener(identity *identity.TokenId, ctx *MemoryContext) UnderlayListener

type UnsupportedVersionError

type UnsupportedVersionError struct {
	// contains filtered or unexported fields
}

func (UnsupportedVersionError) Error

func (u UnsupportedVersionError) Error() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL