rpc

package
v0.0.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 8, 2023 License: MIT Imports: 14 Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Predefined ProcedureType values. Each one wraps the string shown in its
// literal ("unknown", "source", "duplex", "async").
// NOTE(review): these presumably correspond to the "type" field of
// RequestBody — confirm against the marshaling code.
var (
	// ProcedureTypeUnknown is used for procedures that don't have a type.
	ProcedureTypeUnknown = ProcedureType{"unknown"} // some procedures don't have a type
	// ProcedureTypeSource is the "source" procedure type.
	ProcedureTypeSource  = ProcedureType{"source"}
	// ProcedureTypeDuplex is the "duplex" procedure type.
	ProcedureTypeDuplex  = ProcedureType{"duplex"}
	// ProcedureTypeAsync is the "async" procedure type.
	ProcedureTypeAsync   = ProcedureType{"async"}
)
View Source
var (
	// ErrRemoteEnd signals that the remote closed the stream but didn't signal
	// that an error occurred. It is one of the values that
	// ResponseWithError.Err is documented to hold.
	ErrRemoteEnd = errors.New("remote end")
)

Functions

func GetRemoteIdentityFromContext

func GetRemoteIdentityFromContext(ctx context.Context) (identity.Public, bool)

func MarshalRequestBody

func MarshalRequestBody(req *Request) ([]byte, error)

func MustMarshalRequestBody

func MustMarshalRequestBody(req *Request) []byte

func NewRemoteError

func NewRemoteError(response []byte) error

func PutConnectionIdInContext

func PutConnectionIdInContext(ctx context.Context, id ConnectionId) context.Context

func PutRemoteIdentityInContext

func PutRemoteIdentityInContext(ctx context.Context, remoteIdentity identity.Public) context.Context

Types

type Connection

type Connection struct {
	// contains filtered or unexported fields
}

func NewConnection

func NewConnection(
	id ConnectionId,
	wasInitiatedByRemote bool,
	raw RawConnection,
	handler RequestHandler,
	logger logging.Logger,
) (*Connection, error)

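NewConnection is the only way of creating a new Connection; the zero value is invalid. Terminating the provided context is equivalent to calling Close. The provided context is used as a base context for the contexts passed to the request handler. (Note: NewConnection itself takes no context parameter, so "the provided context" presumably refers to the one passed to Loop; confirm against the implementation.) Connection takes over managing the RawConnection, which must not be used further by the caller.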

func (*Connection) Close

func (c *Connection) Close() error

Close always returns nil. In theory, shutting down a Secure Scuttlebutt RPC connection can result in an error, because a goodbye message for the entire connection has to be sent successfully to the other side. Those errors are not made available, however, as it is unclear what to do with them.

func (*Connection) Loop

func (c *Connection) Loop(ctx context.Context) error

func (*Connection) PerformRequest

func (c *Connection) PerformRequest(ctx context.Context, req *Request) (ResponseStream, error)

func (*Connection) String

func (c *Connection) String() string

func (*Connection) WasInitiatedByRemote

func (c *Connection) WasInitiatedByRemote() bool

type ConnectionId

type ConnectionId struct {
	// contains filtered or unexported fields
}

func GetConnectionIdFromContext

func GetConnectionIdFromContext(ctx context.Context) (ConnectionId, bool)

func NewConnectionId

func NewConnectionId(id int) ConnectionId

func (ConnectionId) String

func (c ConnectionId) String() string

type ConnectionIdGenerator

type ConnectionIdGenerator struct {
	// contains filtered or unexported fields
}

func NewConnectionIdGenerator

func NewConnectionIdGenerator() *ConnectionIdGenerator

func (*ConnectionIdGenerator) Generate

func (v *ConnectionIdGenerator) Generate() ConnectionId

type IncomingMessage

// IncomingMessage is the element type of the channel returned by the
// IncomingMessages method of Stream and RequestStream.
type IncomingMessage struct {
	// Body holds the message body as raw bytes.
	Body []byte
}

type MessageSender

// MessageSender is anything that can send a transport message. It is the
// dependency accepted by NewRequestStream, NewRequestStreams and
// NewResponseStreams; RawConnection declares a Send method with the same
// signature.
type MessageSender interface {
	// Send sends msg and reports any failure as an error.
	Send(msg *transport.Message) error
}

type Procedure

type Procedure struct {
	// contains filtered or unexported fields
}

func MustNewProcedure

func MustNewProcedure(name ProcedureName, typ ProcedureType) Procedure

func NewProcedure

func NewProcedure(name ProcedureName, typ ProcedureType) (Procedure, error)

func (Procedure) Name

func (p Procedure) Name() ProcedureName

func (Procedure) Typ

func (p Procedure) Typ() ProcedureType

type ProcedureName

type ProcedureName struct {
	// contains filtered or unexported fields
}

func MustNewProcedureName

func MustNewProcedureName(name []string) ProcedureName

func NewProcedureName

func NewProcedureName(name []string) (ProcedureName, error)

func (ProcedureName) Components

func (n ProcedureName) Components() []string

func (ProcedureName) Equal

func (n ProcedureName) Equal(o ProcedureName) bool

func (ProcedureName) IsZero

func (n ProcedureName) IsZero() bool

func (ProcedureName) String

func (n ProcedureName) String() string

type ProcedureType

type ProcedureType struct {
	// contains filtered or unexported fields
}

func (ProcedureType) IsZero

func (t ProcedureType) IsZero() bool

type RawConnection

// RawConnection is the message-level connection handed to NewConnection.
// Per the NewConnection documentation, the Connection takes over managing it
// and it must not be used further afterwards.
type RawConnection interface {
	// Next returns the next message or an error.
	// NOTE(review): presumably blocks until a message arrives — confirm
	// against the implementation.
	Next() (*transport.Message, error)

	// Send sends msg and reports any failure as an error.
	Send(msg *transport.Message) error

	// Close closes the connection.
	Close() error
}

type RemoteError

type RemoteError struct {
	// contains filtered or unexported fields
}

func (RemoteError) As

func (e RemoteError) As(target interface{}) bool

func (RemoteError) Error

func (e RemoteError) Error() string

func (RemoteError) Is

func (e RemoteError) Is(target error) bool

func (RemoteError) Response

func (e RemoteError) Response() []byte

type Request

type Request struct {
	// contains filtered or unexported fields
}

func MustNewRequest

func MustNewRequest(name ProcedureName, typ ProcedureType, arguments json.RawMessage) *Request

func NewRequest

func NewRequest(name ProcedureName, typ ProcedureType, arguments json.RawMessage) (*Request, error)

func (Request) Arguments

func (r Request) Arguments() json.RawMessage

func (Request) Name

func (r Request) Name() ProcedureName

func (Request) Type

func (r Request) Type() ProcedureType

type RequestBody

// RequestBody is the JSON representation of a request, encoded as an object
// with the keys "name", "type" and "args".
// NOTE(review): presumably this is what MarshalRequestBody produces from a
// Request — confirm against the implementation.
type RequestBody struct {
	// Name is the procedure name as a list of string components.
	Name []string        `json:"name"`
	// Type is the procedure type as a string.
	Type string          `json:"type"`
	// Args holds the request arguments as raw, undecoded JSON.
	Args json.RawMessage `json:"args"`
}

type RequestHandler

// RequestHandler handles requests. It is the handler type accepted by
// NewConnection and NewRequestStreams.
type RequestHandler interface {
	// HandleRequest should respond to the provided request using the provided
	// stream s. Implementations must eventually call Stream.CloseWithError.
	// HandleRequest may block as it is executed in a goroutine.
	HandleRequest(ctx context.Context, s Stream, req *Request)
}

type RequestStream

type RequestStream struct {
	// contains filtered or unexported fields
}

func NewRequestStream

func NewRequestStream(ctx context.Context, onLocalClose onLocalCloseFn, number int, typ ProcedureType, raw MessageSender) (*RequestStream, error)

func (*RequestStream) CloseWithError

func (rs *RequestStream) CloseWithError(err error) error

func (*RequestStream) HandleNewMessage

func (rs *RequestStream) HandleNewMessage(msg transport.Message) error

func (*RequestStream) IncomingMessages

func (rs *RequestStream) IncomingMessages() (<-chan IncomingMessage, error)

func (*RequestStream) RequestNumber

func (rs *RequestStream) RequestNumber() int

func (*RequestStream) WriteMessage

func (rs *RequestStream) WriteMessage(body []byte, bodyType transport.MessageBodyType) error

type RequestStreams

type RequestStreams struct {
	// contains filtered or unexported fields
}

RequestStreams is used for handling streams initiated by the remote (for which incoming messages have positive request numbers).

func NewRequestStreams

func NewRequestStreams(raw MessageSender, handler RequestHandler, logger logging.Logger) *RequestStreams

NewRequestStreams creates new request streams, which use the provided context to run the loop that cleans up closed streams. The lifecycle of this context should most likely be the same as the lifecycle of the underlying connection. (Note: the signature shown above takes no context parameter, so this description may be out of date.)

func (*RequestStreams) Close added in v0.0.2

func (s *RequestStreams) Close()

func (*RequestStreams) HandleIncomingRequest

func (s *RequestStreams) HandleIncomingRequest(ctx context.Context, msg *transport.Message) error

HandleIncomingRequest processes incoming messages: requests to open a new stream, messages which are a part of an open duplex stream initiated by the remote or messages closing a stream initiated by the remote. Returning an error from this function shuts down the entire connection.

type Response

type Response struct {
	// contains filtered or unexported fields
}

func NewResponse

func NewResponse(b []byte) *Response

func (Response) Bytes

func (r Response) Bytes() []byte

type ResponseStream

// ResponseStream represents a stream that we initiated. It is the value
// returned by Connection.PerformRequest.
type ResponseStream interface {
	// WriteMessage sends a message with the given body and body type.
	WriteMessage(body []byte, bodyType transport.MessageBodyType) error

	// Channel returns a receive-only channel of ResponseWithError values;
	// each value carries either a response or an error.
	Channel() <-chan ResponseWithError

	// Ctx returns a context.
	// NOTE(review): presumably the context governing this stream's lifetime —
	// confirm against the implementation.
	Ctx() context.Context
}

ResponseStream represents a stream that we initiated.

type ResponseStreams

type ResponseStreams struct {
	// contains filtered or unexported fields
}

ResponseStreams is used for handling streams initiated by us (for which incoming messages have negative request numbers).

func NewResponseStreams

func NewResponseStreams(raw MessageSender, logger logging.Logger) *ResponseStreams

func (*ResponseStreams) Close

func (s *ResponseStreams) Close()

func (*ResponseStreams) HandleIncomingResponse

func (s *ResponseStreams) HandleIncomingResponse(msg *transport.Message) error

HandleIncomingResponse processes an incoming response. Returning an error from this function shuts down the entire connection.

func (*ResponseStreams) Open

type ResponseWithError

// ResponseWithError is the element type of the channel returned by
// ResponseStream.Channel. It carries either a response or an error.
type ResponseWithError struct {
	// Value is only set if Err is nil.
	Value *Response

	// If Err is not nil then it may be ErrRemoteEnd, a RemoteError or a
	// different error.
	Err error
}

TODO: make the fields private and add a constructor.

type Stream

// Stream is what a RequestHandler receives in HandleRequest in order to
// respond to a request.
type Stream interface {
	// WriteMessage sends a message over the underlying stream.
	WriteMessage(body []byte, bodyType transport.MessageBodyType) error

	// CloseWithError terminates the underlying stream. Error is sent to the
	// other party. Error can be nil.
	CloseWithError(err error) error

	// IncomingMessages gives the caller access to incoming messages. Returns an
	// error if this isn't a duplex stream.
	IncomingMessages() (<-chan IncomingMessage, error)
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL