channel

package module
v2.0.128 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 9, 2024 License: Apache-2.0 Imports: 27 Imported by: 106

README

Channel

channel is a binary message framework. It includes the following features:

  • Supports registering message type handlers
  • Supports request/response pattern as well as unidirectional message streams
  • Allows plugging observers to enable metrics and other use cases
  • Async or sync patterns
  • Protobufs supported but not required

Documentation

Overview

Example
addr, err := tcp.AddressParser{}.Parse("tcp:localhost:6565")
if err != nil {
	panic(err)
}
dialId := &identity.TokenId{Token: "echo-client"}
underlayFactory := channel.NewClassicDialer(dialId, addr, nil)

ch, err := channel.NewChannel("echo-test", underlayFactory, nil, nil)
if err != nil {
	panic(err)
}

helloMessageType := int32(256)
msg := channel.NewMessage(helloMessageType, []byte("hello!"))

// Can send the message on the channel. The call will return once the message is queued
if err := ch.Send(msg); err != nil {
	panic(err)
}

// Can also have the message send itself on the channel
if err := msg.Send(ch); err != nil {
	panic(err)
}

// Can set a priority on the message before sending. This will only affect the order in the send queue
if err := msg.WithPriority(channel.High).Send(ch); err != nil {
	panic(err)
}

// Can set a timeout before sending. If the message can't be queued before the timeout, an error will be returned
// If the timeout expires before the message can be sent, the message won't be sent
if err := msg.WithTimeout(time.Second).Send(ch); err != nil {
	panic(err)
}

// Can set a timeout before sending and wait for the message to be written to the wire. If the timeout expires
// before the message is sent, the message won't be sent and a TimeoutError will be returned
if err := msg.WithTimeout(time.Second).SendAndWaitForWire(ch); err != nil {
	panic(err)
}

// Can set a timeout before sending and waiting for a reply message. If the timeout expires before the message is
// sent, the message won't be sent and a TimeoutError will be returned. If the timeout expires before the reply
// arrives a TimeoutError will be returned.
reply, err := msg.WithTimeout(time.Second).SendForReply(ch)
if err != nil {
	panic(err)
}
fmt.Println(string(reply.Body))
Output:

Index

Examples

Constants

View Source
const (
	DefaultOutstandingConnects = 16
	DefaultQueuedConnects      = 1
	DefaultConnectTimeout      = 5 * time.Second

	MinQueuedConnects      = 1
	MinOutstandingConnects = 1
	MinConnectTimeout      = 30 * time.Millisecond

	MaxQueuedConnects      = 5000
	MaxOutstandingConnects = 1000
	MaxConnectTimeout      = 60000 * time.Millisecond

	DefaultOutQueueSize = 4
)
View Source
const (
	DefaultHeartbeatSendInterval  = 10 * time.Second
	DefaultHeartbeatCheckInterval = time.Second
	DefaultHeartbeatTimeout       = 30 * time.Second
)
View Source
const (
	ConnectionIdHeader              = 0
	ReplyForHeader                  = 1
	ResultSuccessHeader             = 2
	HelloRouterAdvertisementsHeader = 3
	HelloVersionHeader              = 4
	HeartbeatHeader                 = 5
	HeartbeatResponseHeader         = 6
	TypeHeader                      = 7
	IdHeader                        = 8

	// Headers in the range 128-255 inclusive will be reflected when creating replies
	ReflectedHeaderBitMask = 1 << 7
	MaxReflectedHeader     = (1 << 8) - 1
)

Message header notes:

  • 0-127 reserved for channel
  • 128-255 reserved for headers that need to be reflected back to sender on responses
  • 128 is used for a message UUID for tracing
  • 1000-1099 reserved for edge messages
  • 1100-1199 is reserved for control plane messages
  • 2000-2500 is reserved for xgress messages
  • 2000-2255 is reserved for xgress implementation headers
View Source
const (
	ContentTypeHelloType           = 0
	ContentTypePingType            = 1
	ContentTypeResultType          = 2
	ContentTypeLatencyType         = 3
	ContentTypeLatencyResponseType = 4
	ContentTypeHeartbeat           = 5
)
View Source
const (
	Highest  = 0
	High     = 1024
	Standard = 4096
	Low      = 10240
)
View Source
const AnyContentType = -1
View Source
const BadMagicNumberError = stringError("protocol error: invalid header")
View Source
const DECODER = "channel"
View Source
const DecoderFieldName = "__decoder__"
View Source
const HelloSequence = -1
View Source
const MessageFieldName = "__message__"

Variables

View Source
var ListenerClosedError = listenerClosedError{}

Functions

func AcceptNextChannel

func AcceptNextChannel(logicalName string, underlayFactory UnderlayFactory, bindHandler BindHandler, options *Options, tcfg transport.Configuration) error

func ConfigureHeartbeat

func ConfigureHeartbeat(binding Binding, heartbeatInterval time.Duration, checkInterval time.Duration, cb HeartbeatCallback)

ConfigureHeartbeat sets up heartbeats on the given channel. It assumes that an equivalent setup happens on the other side of the channel.

When possible, heartbeats will be sent on existing traffic. When a heartbeat is due to be sent, the next message sent will include a heartbeat header. If no message is sent by the time the checker runs on checkInterval, a standalone heartbeat message will be sent.

Similarly, when a message with a heartbeat header is received, the next sent message will have a header set with the heartbeat response. If no message is sent within a few milliseconds, a standalone heartbeat response will be sent.

func DecodeString added in v2.0.118

func DecodeString(t string, b []byte) ([]byte, string, error)

func DecodeStringSlice added in v2.0.118

func DecodeStringSlice(b []byte) ([]string, error)

func DecodeStringToStringMap added in v2.0.118

func DecodeStringToStringMap(b []byte) (map[string]string, error)

func DecodeU32ToBytesMap added in v2.0.118

func DecodeU32ToBytesMap(b []byte) (map[uint32][]byte, error)

func EncodeStringSlice added in v2.0.118

func EncodeStringSlice(strSlice []string) []byte

func EncodeStringToStringMap added in v2.0.118

func EncodeStringToStringMap(m map[string]string) []byte

func EncodeU32ToBytesMap added in v2.0.118

func EncodeU32ToBytesMap(m map[uint32][]byte) []byte

func GetRetryVersion

func GetRetryVersion(err error) (uint32, bool)

func IsTimeout

func IsTimeout(err error) bool

func MarshalV2

func MarshalV2(m *Message) ([]byte, error)

MarshalV2 converts a *Message into a block of V2 wire format data.

func NewClassicListenerF added in v2.0.112

func NewClassicListenerF(identity *identity.TokenId, endpoint transport.Address, config ListenerConfig, f func(underlay Underlay)) (io.Closer, error)

func NewErrorContext

func NewErrorContext(err error) context.Context

func NewWSListener

func NewWSListener(peer transport.Conn) *wsListener

func NextConnectionId

func NextConnectionId() (string, error)

func WriteUnknownVersionResponse

func WriteUnknownVersionResponse(writer io.Writer)

Types

type AsyncFunctionReceiveAdapter

type AsyncFunctionReceiveAdapter struct {
	Type    int32
	Handler ReceiveHandlerF
}

func (*AsyncFunctionReceiveAdapter) ContentType

func (adapter *AsyncFunctionReceiveAdapter) ContentType() int32

func (*AsyncFunctionReceiveAdapter) HandleReceive

func (adapter *AsyncFunctionReceiveAdapter) HandleReceive(m *Message, ch Channel)

type BaseSendListener

type BaseSendListener struct{}

BaseSendListener is a type that may be used to provide default methods for SendListener implementations.

func (BaseSendListener) NotifyAfterWrite

func (BaseSendListener) NotifyAfterWrite()

func (BaseSendListener) NotifyBeforeWrite

func (BaseSendListener) NotifyBeforeWrite()

func (BaseSendListener) NotifyErr

func (BaseSendListener) NotifyErr(error)

func (BaseSendListener) NotifyQueued

func (BaseSendListener) NotifyQueued()

type BaseSendable

type BaseSendable struct{}

BaseSendable is a type that may be used to provide default methods for Sendable implementations.

func (BaseSendable) Context

func (BaseSendable) Context() context.Context

func (BaseSendable) Msg

func (BaseSendable) Msg() *Message

func (BaseSendable) Priority

func (BaseSendable) Priority() Priority

func (BaseSendable) ReplyReceiver

func (BaseSendable) ReplyReceiver() ReplyReceiver

func (BaseSendable) SendListener

func (BaseSendable) SendListener() SendListener

type BindHandler

type BindHandler interface {
	BindChannel(binding Binding) error
}

func BindHandlers added in v2.0.45

func BindHandlers(handlers ...BindHandler) BindHandler

BindHandlers takes the given handlers and returns a BindHandler which runs the handlers one at a time, returning an error as soon as an error is encountered, or nil, if no errors are encountered.

type BindHandlerF

type BindHandlerF func(binding Binding) error

func (BindHandlerF) BindChannel

func (f BindHandlerF) BindChannel(binding Binding) error

type Binding

type Binding interface {
	Bind(h BindHandler) error
	AddPeekHandler(h PeekHandler)
	AddTransformHandler(h TransformHandler)
	AddReceiveHandler(contentType int32, h ReceiveHandler)
	AddReceiveHandlerF(contentType int32, h ReceiveHandlerF)
	AddTypedReceiveHandler(h TypedReceiveHandler)
	AddErrorHandler(h ErrorHandler)
	AddCloseHandler(h CloseHandler)
	SetUserData(data interface{})
	GetUserData() interface{}
	GetChannel() Channel
}

Binding is used to add handlers to Channel.

NOTE: It is intended that the Add* methods are used at initial channel setup, and not invoked on an in-service Channel. The Binding should not be retained once the channel setup is complete.

type Channel

type Channel interface {
	Identity
	SetLogicalName(logicalName string)
	Sender
	io.Closer
	IsClosed() bool
	Underlay() Underlay
	StartRx()
	GetTimeSinceLastRead() time.Duration
}

Channel represents an asynchronous, message-passing framework, designed to sit on top of an underlay.

func NewChannel

func NewChannel(logicalName string, underlayFactory UnderlayFactory, bindHandler BindHandler, options *Options) (Channel, error)

func NewChannelWithTransportConfiguration

func NewChannelWithTransportConfiguration(logicalName string, underlayFactory UnderlayFactory, bindHandler BindHandler, options *Options, tcfg transport.Configuration) (Channel, error)

func NewChannelWithUnderlay

func NewChannelWithUnderlay(logicalName string, underlay Underlay, bindHandler BindHandler, options *Options) (Channel, error)

type CloseHandler

type CloseHandler interface {
	HandleClose(ch Channel)
}

type CloseHandlerF

type CloseHandlerF func(ch Channel)

func (CloseHandlerF) HandleClose

func (self CloseHandlerF) HandleClose(ch Channel)

type ClosedError

type ClosedError struct{}

func (ClosedError) Error

func (ClosedError) Error() string

type ConnectOptions

type ConnectOptions struct {
	MaxQueuedConnects      int
	MaxOutstandingConnects int
	ConnectTimeout         time.Duration
}

func DefaultConnectOptions

func DefaultConnectOptions() ConnectOptions

func (*ConnectOptions) Validate

func (co *ConnectOptions) Validate() error

type ConnectionHandler

type ConnectionHandler interface {
	HandleConnection(hello *Hello, certificates []*x509.Certificate) error
}

type Decoder

type Decoder struct{}

func (Decoder) Decode

func (d Decoder) Decode(msg *Message) ([]byte, bool)

type Envelope

type Envelope interface {
	// WithPriority returns an Envelope with the given priority
	WithPriority(p Priority) Envelope

	// WithTimeout returns a TimeoutEnvelope with a context using the given timeout
	WithTimeout(duration time.Duration) TimeoutEnvelope

	// Send sends the envelope on the given Channel
	Send(ch Channel) error

	// ReplyTo allows setting the reply header in a fluent style
	ReplyTo(msg *Message) Envelope

	// ToSendable converts the Envelope into a Sendable, which can be submitted to a Channel for sending
	ToSendable() Sendable
}

Envelope allows setting message priority and context. Message is an Envelope (as well as a Sendable)

func NewErrorEnvelope

func NewErrorEnvelope(err error) Envelope

type ErrorHandler

type ErrorHandler interface {
	HandleError(err error, ch Channel)
}

type ErrorHandlerF

type ErrorHandlerF func(err error, ch Channel)

func (ErrorHandlerF) HandleError

func (self ErrorHandlerF) HandleError(err error, ch Channel)

type Headers

type Headers map[int32][]byte

func (Headers) GetBoolHeader

func (self Headers) GetBoolHeader(key int32) (bool, bool)

func (Headers) GetByteHeader

func (self Headers) GetByteHeader(key int32) (byte, bool)

func (Headers) GetStringHeader

func (self Headers) GetStringHeader(key int32) (string, bool)

func (Headers) GetStringSliceHeader added in v2.0.118

func (self Headers) GetStringSliceHeader(key int32) ([]string, bool, error)

func (Headers) GetStringToStringMapHeader added in v2.0.118

func (self Headers) GetStringToStringMapHeader(key int32) (map[string]string, bool, error)

func (Headers) GetU32ToBytesMapHeader added in v2.0.118

func (self Headers) GetU32ToBytesMapHeader(key int32) (map[uint32][]byte, bool, error)

func (Headers) GetUint16Header

func (self Headers) GetUint16Header(key int32) (uint16, bool)

func (Headers) GetUint32Header

func (self Headers) GetUint32Header(key int32) (uint32, bool)

func (Headers) GetUint64Header

func (self Headers) GetUint64Header(key int32) (uint64, bool)

func (Headers) PutBoolHeader

func (self Headers) PutBoolHeader(key int32, value bool)

func (Headers) PutByteHeader

func (self Headers) PutByteHeader(key int32, value byte)

func (Headers) PutStringHeader

func (self Headers) PutStringHeader(key int32, value string)

func (Headers) PutStringSliceHeader added in v2.0.118

func (self Headers) PutStringSliceHeader(key int32, s []string)

func (Headers) PutStringToStringMapHeader added in v2.0.118

func (self Headers) PutStringToStringMapHeader(key int32, m map[string]string)

func (Headers) PutU32ToBytesMapHeader added in v2.0.118

func (self Headers) PutU32ToBytesMapHeader(key int32, m map[uint32][]byte)

func (Headers) PutUint16Header

func (self Headers) PutUint16Header(key int32, value uint16)

func (Headers) PutUint32Header

func (self Headers) PutUint32Header(key int32, value uint32)

func (Headers) PutUint64Header

func (self Headers) PutUint64Header(key int32, value uint64)

type HeartbeatCallback

type HeartbeatCallback interface {
	HeartbeatTx(ts int64)
	HeartbeatRx(ts int64)
	HeartbeatRespTx(ts int64)
	HeartbeatRespRx(ts int64)
	CheckHeartBeat()
}

HeartbeatCallback provides an interface that is notified when various heartbeat events take place.

type HeartbeatOptions added in v2.0.35

type HeartbeatOptions struct {
	SendInterval             time.Duration `json:"sendInterval"`
	CheckInterval            time.Duration `json:"checkInterval"`
	CloseUnresponsiveTimeout time.Duration `json:"closeUnresponsiveTimeout"`
	// contains filtered or unexported fields
}

func DefaultHeartbeatOptions added in v2.0.35

func DefaultHeartbeatOptions() *HeartbeatOptions

func LoadHeartbeatOptions added in v2.0.35

func LoadHeartbeatOptions(data map[interface{}]interface{}) (*HeartbeatOptions, error)

func (*HeartbeatOptions) GetDuration added in v2.0.35

func (self *HeartbeatOptions) GetDuration(name string) (*time.Duration, error)

type Hello

type Hello struct {
	IdToken string
	Headers map[int32][]byte
}

func UnmarshalHello

func UnmarshalHello(message *Message) *Hello

type Identity

type Identity interface {
	// The Id used to represent the identity of this channel to lower-level resources.
	//
	Id() string

	// The LogicalName represents the purpose or usage of this channel (e.g. 'ctrl', 'mgmt', 'r/001', etc.). Usually used
	// by humans to understand the logical purpose of a channel.
	//
	LogicalName() string

	// The ConnectionId represents the identity of this Channel to internal API components ("instance identifier").
	// Usually used by the Channel framework to differentiate Channel instances.
	//
	ConnectionId() string

	// Certificates contains the identity certificates provided by the peer.
	//
	Certificates() []*x509.Certificate

	// Label constructs a consistently-formatted string used for context logging purposes, from the components above.
	//
	Label() string
}

type ListenerConfig

type ListenerConfig struct {
	ConnectOptions
	Headers            map[int32][]byte
	TransportConfig    transport.Configuration
	PoolConfigurator   func(config *goroutines.PoolConfig)
	ConnectionHandlers []ConnectionHandler
}

func DefaultListenerConfig

func DefaultListenerConfig() ListenerConfig

type Message

type Message struct {
	MessageHeader
	Body []byte
}

func NewHello

func NewHello(idToken string, attributes map[int32][]byte) *Message

func NewMessage

func NewMessage(contentType int32, body []byte) *Message

func NewResult

func NewResult(success bool, message string) *Message

func ReadV2

func ReadV2(peer io.Reader) (*Message, error)

ReadV2 reads a V2 message from the given reader and returns the unmarshalled message

func (*Message) Context

func (m *Message) Context() context.Context

func (*Message) Msg

func (m *Message) Msg() *Message

func (*Message) Priority

func (m *Message) Priority() Priority

func (*Message) ReplyReceiver

func (m *Message) ReplyReceiver() ReplyReceiver

func (*Message) ReplyTo

func (m *Message) ReplyTo(o *Message) Envelope

func (*Message) Send

func (m *Message) Send(ch Channel) error

func (*Message) SendListener

func (m *Message) SendListener() SendListener

func (*Message) SetSequence

func (m *Message) SetSequence(seq int32)

func (*Message) String

func (m *Message) String() string

func (*Message) ToSendable

func (m *Message) ToSendable() Sendable

func (*Message) WithPriority

func (m *Message) WithPriority(p Priority) Envelope

func (*Message) WithTimeout

func (m *Message) WithTimeout(duration time.Duration) TimeoutEnvelope

type MessageHeader

type MessageHeader struct {
	ContentType int32

	Headers Headers
	// contains filtered or unexported fields
}

func (*MessageHeader) GetBoolHeader

func (header *MessageHeader) GetBoolHeader(key int32) (bool, bool)

func (*MessageHeader) GetByteHeader

func (header *MessageHeader) GetByteHeader(key int32) (byte, bool)

func (*MessageHeader) GetStringHeader

func (header *MessageHeader) GetStringHeader(key int32) (string, bool)

func (*MessageHeader) GetStringSliceHeader added in v2.0.118

func (header *MessageHeader) GetStringSliceHeader(key int32) ([]string, bool, error)

func (*MessageHeader) GetStringToStringMapHeader added in v2.0.118

func (header *MessageHeader) GetStringToStringMapHeader(key int32) (map[string]string, bool, error)

func (*MessageHeader) GetU32ToBytesMapHeader added in v2.0.118

func (header *MessageHeader) GetU32ToBytesMapHeader(key int32) (map[uint32][]byte, bool, error)

func (*MessageHeader) GetUint16Header

func (header *MessageHeader) GetUint16Header(key int32) (uint16, bool)

func (*MessageHeader) GetUint32Header

func (header *MessageHeader) GetUint32Header(key int32) (uint32, bool)

func (*MessageHeader) GetUint64Header

func (header *MessageHeader) GetUint64Header(key int32) (uint64, bool)

func (*MessageHeader) IsReply

func (header *MessageHeader) IsReply() bool

func (*MessageHeader) IsReplyingTo

func (header *MessageHeader) IsReplyingTo(sequence int32) bool

func (*MessageHeader) PutBoolHeader

func (header *MessageHeader) PutBoolHeader(key int32, value bool)

func (*MessageHeader) PutByteHeader

func (header *MessageHeader) PutByteHeader(key int32, value byte)

func (*MessageHeader) PutStringHeader

func (header *MessageHeader) PutStringHeader(key int32, value string)

func (*MessageHeader) PutStringSliceHeader added in v2.0.118

func (header *MessageHeader) PutStringSliceHeader(key int32, s []string)

func (*MessageHeader) PutStringToStringMapHeader added in v2.0.118

func (header *MessageHeader) PutStringToStringMapHeader(key int32, m map[string]string)

func (*MessageHeader) PutU32ToBytesMapHeader added in v2.0.118

func (header *MessageHeader) PutU32ToBytesMapHeader(key int32, m map[uint32][]byte)

func (*MessageHeader) PutUint16Header

func (header *MessageHeader) PutUint16Header(key int32, value uint16)

func (*MessageHeader) PutUint32Header

func (header *MessageHeader) PutUint32Header(key int32, value uint32)

func (*MessageHeader) PutUint64Header

func (header *MessageHeader) PutUint64Header(key int32, value uint64)

func (*MessageHeader) ReplyFor

func (header *MessageHeader) ReplyFor() int32

func (*MessageHeader) Sequence

func (header *MessageHeader) Sequence() int32

type Options

type Options struct {
	OutQueueSize int
	ConnectOptions
	DelayRxStart bool
	WriteTimeout time.Duration
}

func DefaultOptions

func DefaultOptions() *Options

func LoadOptions

func LoadOptions(data map[interface{}]interface{}) (*Options, error)

func (Options) String

func (o Options) String() string

type PeekHandler

type PeekHandler interface {
	Connect(ch Channel, remoteAddress string)
	Rx(m *Message, ch Channel)
	Tx(m *Message, ch Channel)
	Close(ch Channel)
}

type Priority

type Priority uint32

type ReceiveHandler

type ReceiveHandler interface {
	HandleReceive(m *Message, ch Channel)
}

type ReceiveHandlerF

type ReceiveHandlerF func(m *Message, ch Channel)

func (ReceiveHandlerF) HandleReceive

func (self ReceiveHandlerF) HandleReceive(m *Message, ch Channel)

type ReplyReceiver

type ReplyReceiver interface {
	AcceptReply(*Message)
}

ReplyReceiver is used to get notified when a Message reply arrives

type Result

type Result struct {
	Success bool
	Message string
}

func UnmarshalResult

func UnmarshalResult(message *Message) *Result

type SendListener

type SendListener interface {
	// NotifyQueued is called when the message has been queued for send
	NotifyQueued()
	// NotifyBeforeWrite is called before send is called
	NotifyBeforeWrite()
	// NotifyAfterWrite is called after the message has been written to the Underlay
	NotifyAfterWrite()
	// NotifyErr is called if the Sendable context errors before send or if writing to the Underlay fails
	NotifyErr(error)
}

SendListener is notified at the various stages of a message send

type Sendable

type Sendable interface {
	// Msg returns the Message to send
	Msg() *Message

	// SetSequence sets a sequence number indicating in which order the message was received
	SetSequence(seq int32)

	// Sequence returns the sequence number
	Sequence() int32

	// Priority returns the Priority of the Message
	Priority() Priority

	// Context returns the Context used for timeouts/cancelling message sends, etc
	Context() context.Context

	// SendListener returns the SendListener to invoke at each stage of the send operation
	SendListener() SendListener

	// ReplyReceiver returns the ReplyReceiver to be invoked when a reply for the message is received, or nil if
	// no ReplyReceiver should be invoked if or when a reply is received
	ReplyReceiver() ReplyReceiver
}

Sendable encapsulates all the data and callbacks that a Channel requires when sending a Message.

type Sender

type Sender interface {
	// Send will send the given Sendable. If the Sender is busy, it will wait until either the Sender
	// can process the Sendable, the channel is closed or the associated context.Context times out
	Send(s Sendable) error

	// TrySend will send the given Sendable. If the Sender is busy (outgoing message queue is full), it will return
	// immediately rather than wait. The boolean return indicates whether the message was queued or not
	TrySend(s Sendable) (bool, error)
}

type TimeoutEnvelope

type TimeoutEnvelope interface {
	Envelope

	// SendAndWaitForWire will wait until the configured timeout or until the message is sent, whichever comes first
	// If the timeout happens first, the context error will be returned, wrapped by a TimeoutError
	SendAndWaitForWire(ch Channel) error

	// SendForReply will wait until the configured timeout or until a reply is received, whichever comes first
	// If the timeout happens first, the context error will be returned, wrapped by a TimeoutError
	SendForReply(ch Channel) (*Message, error)
}

TimeoutEnvelope has timeout-related convenience methods, such as waiting for a Message to be written to the wire or waiting for a Message reply.

type TimeoutError

type TimeoutError struct {
	// contains filtered or unexported fields
}

TimeoutError is used to indicate a timeout happened

func (TimeoutError) Unwrap

func (self TimeoutError) Unwrap() error

type TraceHandler

type TraceHandler struct {
	// contains filtered or unexported fields
}

func NewTraceHandler

func NewTraceHandler(path string, id string) (*TraceHandler, error)

func (*TraceHandler) AddDecoder

func (h *TraceHandler) AddDecoder(decoder TraceMessageDecoder)

func (TraceHandler) Close

func (h TraceHandler) Close(ch Channel)

func (*TraceHandler) Connect

func (h *TraceHandler) Connect(ch Channel, remoteAddress string)

func (TraceHandler) Rx

func (h TraceHandler) Rx(msg *Message, ch Channel)

func (TraceHandler) Tx

func (h TraceHandler) Tx(msg *Message, ch Channel)

type TraceMessageDecode

type TraceMessageDecode map[string]interface{}

func NewTraceMessageDecode

func NewTraceMessageDecode(decoder, message string) TraceMessageDecode

func (TraceMessageDecode) MarshalResult

func (d TraceMessageDecode) MarshalResult() ([]byte, bool)

func (TraceMessageDecode) MarshalTraceMessageDecode

func (d TraceMessageDecode) MarshalTraceMessageDecode() ([]byte, error)

type TraceMessageDecoder

type TraceMessageDecoder interface {
	Decode(msg *Message) ([]byte, bool)
}

type TransformHandler

type TransformHandler interface {
	Rx(m *Message, ch Channel)
	Tx(m *Message, ch Channel)
}

type TypedReceiveHandler

type TypedReceiveHandler interface {
	ContentType() int32
	ReceiveHandler
}

type Underlay

type Underlay interface {
	Rx() (*Message, error)
	Tx(m *Message) error
	Identity
	io.Closer
	IsClosed() bool
	Headers() map[int32][]byte
	SetWriteTimeout(duration time.Duration) error
	SetWriteDeadline(time time.Time) error
	GetLocalAddr() net.Addr
	GetRemoteAddr() net.Addr
}

Underlay abstracts a physical communications channel, typically sitting on top of 'transport'.

type UnderlayAcceptor

type UnderlayAcceptor interface {
	AcceptUnderlay(u Underlay) error
}

An UnderlayAcceptor takes an Underlay and generally turns it into a channel for a specific use. It can be used when handling multiple channel types on a single listener.

type UnderlayDispatcher

type UnderlayDispatcher struct {
	// contains filtered or unexported fields
}

An UnderlayDispatcher accepts underlays from an underlay listener and hands them off to UnderlayAcceptor instances, based on the TypeHeader.

func NewUnderlayDispatcher

func NewUnderlayDispatcher(config UnderlayDispatcherConfig) *UnderlayDispatcher

func (*UnderlayDispatcher) Run

func (self *UnderlayDispatcher) Run()

type UnderlayDispatcherConfig

type UnderlayDispatcherConfig struct {
	Listener        UnderlayListener
	ConnectTimeout  time.Duration
	TransportConfig transport.Configuration
	Acceptors       map[string]UnderlayAcceptor
	DefaultAcceptor UnderlayAcceptor
}

type UnderlayFactory

type UnderlayFactory interface {
	Create(timeout time.Duration, tcfg transport.Configuration) (Underlay, error)
}

UnderlayFactory is used by Channel to obtain an Underlay instance. An underlay "dialer" or "listener" implements UnderlayFactory to provide instances to Channel.

func NewClassicDialer

func NewClassicDialer(identity *identity.TokenId, endpoint transport.Address, headers map[int32][]byte) UnderlayFactory

func NewClassicDialerWithBindAddress

func NewClassicDialerWithBindAddress(identity *identity.TokenId, endpoint transport.Address, localBinding string, headers map[int32][]byte) UnderlayFactory

func NewExistingConnDialer

func NewExistingConnDialer(id *identity.TokenId, peer net.Conn, headers map[int32][]byte) UnderlayFactory

func NewExistingConnListener

func NewExistingConnListener(identity *identity.TokenId, peer net.Conn, headers map[int32][]byte) UnderlayFactory

func NewReconnectingDialer

func NewReconnectingDialer(identity *identity.TokenId, endpoint transport.Address, headers map[int32][]byte) UnderlayFactory

func NewReconnectingDialerWithHandler

func NewReconnectingDialerWithHandler(identity *identity.TokenId, endpoint transport.Address, headers map[int32][]byte, reconnectHandler func()) UnderlayFactory

func NewReconnectingDialerWithHandlerAndLocalBinding

func NewReconnectingDialerWithHandlerAndLocalBinding(identity *identity.TokenId, endpoint transport.Address, localBinding string, headers map[int32][]byte, reconnectHandler func()) UnderlayFactory

type UnderlayListener

type UnderlayListener interface {
	Listen(handlers ...ConnectionHandler) error
	UnderlayFactory
	io.Closer
}

UnderlayListener represents a component designed to listen for incoming peer connections.

func NewClassicListener

func NewClassicListener(identity *identity.TokenId, endpoint transport.Address, config ListenerConfig) UnderlayListener

type UnsupportedVersionError

type UnsupportedVersionError struct {
	// contains filtered or unexported fields
}

func (UnsupportedVersionError) Error

func (u UnsupportedVersionError) Error() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL