interop

package
v0.5.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 8, 2023 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Overview

Package interop lets all the languages talk to each other.

Index

Constants

View Source
const (
	MessageIDHeader            = "Interop-Rpc-Id"
	MessageClassHeader         = "Interop-Rpc-Class"
	MessageErrorHeader         = "Interop-Error"
	MessageContentTypeHeader   = "Content-Type"
	MessageContentLengthHeader = "Content-Length"
)
View Source
const HeaderDelimiter = "\n"

Variables

View Source
var ContentTypeBinary = StdContentTypes.Register("application/octet-stream", encoding.Null)
View Source
var ContentTypeJSON = StdContentTypes.Register("application/json", encoding.JSON)

Functions

func NewReader

func NewReader(r io.Reader) *reader

func NewWriter

func NewWriter(w io.Writer) *writer

func ReadAllMessages

func ReadAllMessages(reader Reader, callback func(message Message) error) (err error)

func WriteHeaders

func WriteHeaders(headers []Header, w io.Writer) (n int64, err error)

func WriteMessage

func WriteMessage(message Message, w io.Writer) (n int64, err error)

Types

type Conn

type Conn interface {
	Reader
	Writer
}

func BuildConn

func BuildConn(reader io.Reader, writer io.Writer) Conn

func CombineReaderWriter

func CombineReaderWriter(reader Reader, writer Writer) Conn

func NewConn

func NewConn(conn io.ReadWriter) Conn

func RpcRequestSource added in v0.5.0

func RpcRequestSource(request Message) Conn

RpcRequestSource returns the Conn from which an RPC Message was received. If the Message did not come from an RpcServer, nil is returned.

func StdioConn

func StdioConn() Conn

func Wrap

func Wrap(conn Conn, readInterceptor, writeInterceptor Interceptor) Conn

Wrap wraps a Conn with optional read (inbound) and write (outbound) interceptors, returning a new Conn.

type ContentType added in v0.3.0

type ContentType struct {
	encoding.Marshaler
	Name string
}

func (*ContentType) Decode added in v0.3.0

func (c *ContentType) Decode(m Message) (interface{}, error)

func (*ContentType) DecodeTo added in v0.3.0

func (c *ContentType) DecodeTo(m Message, t interface{}) error

func (*ContentType) Encode added in v0.3.0

func (c *ContentType) Encode(value interface{}) (Message, error)

func (*ContentType) EncodeTo added in v0.3.0

func (c *ContentType) EncodeTo(builder *MessageBuilder, value interface{}) error

type ContentTypes added in v0.3.0

type ContentTypes []*ContentType
var StdContentTypes ContentTypes

func (ContentTypes) Decode added in v0.3.0

func (t ContentTypes) Decode(message Message) (result interface{}, err error)

func (ContentTypes) DecodeTo added in v0.3.0

func (t ContentTypes) DecodeTo(message Message, target interface{}) (err error)

func (ContentTypes) Encode added in v0.3.0

func (t ContentTypes) Encode(contentTypeName string, value interface{}) (Message, error)

func (ContentTypes) EncodeTo added in v0.3.0

func (t ContentTypes) EncodeTo(contentTypeName string, builder *MessageBuilder, value interface{}) error

func (ContentTypes) FindByName added in v0.3.0

func (t ContentTypes) FindByName(contentTypeName string) *ContentType

func (ContentTypes) Marshal added in v0.3.0

func (t ContentTypes) Marshal(contentTypeName string, value interface{}) (b []byte, err error)

func (*ContentTypes) Register added in v0.3.0

func (t *ContentTypes) Register(contentTypeName string, marshaller encoding.Marshaler) (contentType *ContentType)

func (ContentTypes) Unmarshal added in v0.3.0

func (t ContentTypes) Unmarshal(contentTypeName string, b []byte) (result interface{}, err error)

func (ContentTypes) UnmarshalTo added in v0.3.0

func (t ContentTypes) UnmarshalTo(contentTypeName string, b []byte, target interface{}) (err error)

type Error

type Error string
const (
	ErrAlreadyClosed    Error = "already closed"
	ErrEventHasID       Error = "event must not have an ID"
	ErrUnrecognisedType Error = "unrecognised content type"
)

func (Error) Error

func (e Error) Error() string

type EventDispatcher

type EventDispatcher struct {
	// contains filtered or unexported fields
}

func (*EventDispatcher) Dispatch

func (d *EventDispatcher) Dispatch(event Message) (err error)

func (*EventDispatcher) Handle

func (d *EventDispatcher) Handle(matcher Matcher, handler Handler)

func (*EventDispatcher) HandleClassName

func (d *EventDispatcher) HandleClassName(name string, handler Handler)

func (*EventDispatcher) HandleClassRegexp

func (d *EventDispatcher) HandleClassRegexp(pattern *regexp.Regexp, handler Handler)

type Handler

type Handler interface {
	Handle(event Message) error
}

type HandlerFunc

type HandlerFunc func(event Message) error

func (HandlerFunc) Handle

func (f HandlerFunc) Handle(event Message) error

type Header

type Header interface {
	Name() string
	Value() string
}

func NewHeader

func NewHeader(name, value string) Header

func ParseHeader

func ParseHeader(headerLine []byte) (Header, error)

type Headers

type Headers []Header

func (*Headers) Add

func (h *Headers) Add(name, value string) *Headers

func (*Headers) Delete

func (h *Headers) Delete(name string) *Headers

func (Headers) Get

func (h Headers) Get(name string) string

func (Headers) GetAll

func (h Headers) GetAll(name string) (values []string)

func (*Headers) Set

func (h *Headers) Set(name, value string) *Headers

type Interceptor

type Interceptor func(message Message, writer Writer) error

An Interceptor receives a Message and a Writer, and should write a modified version of the message to the writer. Messages can be removed from a message bus by skipping Write() calls. Multiple writes within a single intercept are permitted. Writing the message once, without modification, to the writer, is equivalent to having no interceptor.

type Matcher

type Matcher interface {
	Match(message Message) bool
}

func MatchClassName

func MatchClassName(name string) Matcher

func MatchClassRegexp

func MatchClassRegexp(pattern *regexp.Regexp) Matcher

type MatcherFunc

type MatcherFunc func(message Message) bool

func (MatcherFunc) Match

func (f MatcherFunc) Match(message Message) bool

type Message

type Message interface {
	GetHeader(name string) string
	GetHeaders(name string) []string
	GetAllHeaders() []Header
	Body() []byte
}

type MessageBuilder

type MessageBuilder struct {
	// contains filtered or unexported fields
}

func DuplicateMessage

func DuplicateMessage(message Message) *MessageBuilder

func NewRpcMessage

func NewRpcMessage(class string) *MessageBuilder

func (*MessageBuilder) AddError

func (b *MessageBuilder) AddError(err error) *MessageBuilder

func (*MessageBuilder) AddHeader

func (b *MessageBuilder) AddHeader(name, value string) *MessageBuilder

func (*MessageBuilder) Body

func (m *MessageBuilder) Body() []byte

func (*MessageBuilder) GetAllHeaders

func (m *MessageBuilder) GetAllHeaders() []Header

func (*MessageBuilder) GetHeader

func (m *MessageBuilder) GetHeader(name string) string

func (*MessageBuilder) GetHeaders

func (m *MessageBuilder) GetHeaders(name string) []string

func (*MessageBuilder) SetBody

func (b *MessageBuilder) SetBody(body []byte) *MessageBuilder

func (*MessageBuilder) SetContent added in v0.3.0

func (b *MessageBuilder) SetContent(contentType *ContentType, content interface{}) error

func (*MessageBuilder) SetHeader

func (b *MessageBuilder) SetHeader(name, value string) *MessageBuilder

type MultiWriter added in v0.3.2

type MultiWriter struct {
	// contains filtered or unexported fields
}

func (*MultiWriter) Subscribe added in v0.3.2

func (m *MultiWriter) Subscribe(writer Writer) (unsubscribe func())

func (*MultiWriter) Write added in v0.3.2

func (m *MultiWriter) Write(message Message) error

type MultiWriterError added in v0.3.2

type MultiWriterError struct {
	Writer Writer
	Error  error
}

type MultiWriterErrors added in v0.3.2

type MultiWriterErrors []MultiWriterError

func (MultiWriterErrors) Error added in v0.3.2

func (m MultiWriterErrors) Error() string

type Pipe

type Pipe struct {
	// contains filtered or unexported fields
}

func NewBufferedPipe

func NewBufferedPipe(bufferSize int) *Pipe

func NewPipe

func NewPipe() *Pipe

func (*Pipe) Close

func (p *Pipe) Close() error

func (*Pipe) CloseWithError

func (p *Pipe) CloseWithError(err error) error

func (*Pipe) Read

func (p *Pipe) Read() (message Message, err error)

func (*Pipe) Write

func (p *Pipe) Write(message Message) (err error)

type Reader

type Reader interface {
	Read() (message Message, err error)
}

func NewReadInterceptor

func NewReadInterceptor(reader Reader, interceptor Interceptor) Reader

type Responder

type Responder interface {
	Respond(ctx context.Context, request Message, response *MessageBuilder)
}

type ResponderContextFunc added in v0.5.0

type ResponderContextFunc func(ctx context.Context, request Message, response *MessageBuilder)

func (ResponderContextFunc) Respond added in v0.5.0

func (f ResponderContextFunc) Respond(ctx context.Context, request Message, response *MessageBuilder)

type ResponderFunc

type ResponderFunc func(request Message, response *MessageBuilder)

func (ResponderFunc) Respond

func (f ResponderFunc) Respond(_ context.Context, request Message, response *MessageBuilder)

type RpcClient

type RpcClient struct {
	Events   *EventDispatcher
	IDPrefix string
	Timeout  time.Duration
	// contains filtered or unexported fields
}

func NewRpcClient

func NewRpcClient(conn Conn) (client *RpcClient)

func (*RpcClient) Call

func (c *RpcClient) Call(class string) (response Message, err error)

func (*RpcClient) CallContext

func (c *RpcClient) CallContext(ctx context.Context, class string) (response Message, err error)

func (*RpcClient) CallWithContent added in v0.3.0

func (c *RpcClient) CallWithContent(class string, contentType *ContentType, content interface{}) (response Message, err error)

func (*RpcClient) CallWithContentContext added in v0.3.0

func (c *RpcClient) CallWithContentContext(ctx context.Context, class string, contentType *ContentType, content interface{}) (response Message, err error)

func (*RpcClient) Run

func (c *RpcClient) Run() (err error)

func (*RpcClient) Send

func (c *RpcClient) Send(request Message) (response Message, err error)

func (*RpcClient) SendContext

func (c *RpcClient) SendContext(ctx context.Context, request Message) (response Message, err error)

func (*RpcClient) Start

func (b *RpcClient) Start()

Start calls the receiver's Run method in a new goroutine. Call Wait to block until Run finishes and to obtain the error it returned.

func (*RpcClient) Wait

func (b *RpcClient) Wait() error

Wait blocks until the Run call started in the background by Start has finished, and returns its error.

type RpcDispatcher

type RpcDispatcher struct {
	// contains filtered or unexported fields
}

func (*RpcDispatcher) Handle

func (d *RpcDispatcher) Handle(matcher Matcher, responder Responder)

func (*RpcDispatcher) HandleClassName

func (d *RpcDispatcher) HandleClassName(name string, responder Responder)

func (*RpcDispatcher) HandleClassRegexp

func (d *RpcDispatcher) HandleClassRegexp(pattern *regexp.Regexp, responder Responder)

func (*RpcDispatcher) Respond added in v0.3.2

func (d *RpcDispatcher) Respond(ctx context.Context, request Message, response *MessageBuilder)

type RpcMultiServer added in v0.3.2

type RpcMultiServer struct {
	RpcDispatcher
	// contains filtered or unexported fields
}

func (*RpcMultiServer) Run added in v0.3.2

func (s *RpcMultiServer) Run(conn Conn) (err error)

func (*RpcMultiServer) Send added in v0.3.2

func (s *RpcMultiServer) Send(event Message) error

func (*RpcMultiServer) Wait added in v0.5.0

func (s *RpcMultiServer) Wait()

Wait blocks while connections are open and readable. Use WaitClean to also wait for all requests to complete.

func (*RpcMultiServer) WaitClean added in v0.5.0

func (s *RpcMultiServer) WaitClean()

WaitClean blocks while connections are open, or while RPC requests are running.

func (*RpcMultiServer) WaitCleanContext added in v0.5.0

func (s *RpcMultiServer) WaitCleanContext(ctx context.Context) error

func (*RpcMultiServer) WaitContext added in v0.5.0

func (s *RpcMultiServer) WaitContext(ctx context.Context) error

type RpcServer

type RpcServer struct {
	RpcDispatcher
	// contains filtered or unexported fields
}

func NewRpcServer

func NewRpcServer(conn Conn) (server *RpcServer)

func (*RpcServer) Run

func (s *RpcServer) Run() (err error)

func (*RpcServer) Send

func (s *RpcServer) Send(event Message) error

func (*RpcServer) Start

func (b *RpcServer) Start()

Start calls the receiver's Run method in a new goroutine. Call Wait to block until Run finishes and to obtain the error it returned.

func (*RpcServer) Wait

func (b *RpcServer) Wait() error

Wait blocks until the Run call started in the background by Start has finished, and returns its error.

func (*RpcServer) WaitClean added in v0.5.0

func (s *RpcServer) WaitClean()

WaitClean blocks while RPC requests are running, even after Run has returned.

func (*RpcServer) WaitCleanContext added in v0.5.0

func (s *RpcServer) WaitCleanContext(ctx context.Context) error

type Writer

type Writer interface {
	Write(message Message) (err error)
}

func NewWriteInterceptor

func NewWriteInterceptor(writer Writer, interceptor Interceptor) Writer

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL