Documentation ¶
Index ¶
- Variables
- func GetRemoteIdentityFromContext(ctx context.Context) (identity.Public, bool)
- func MarshalRequestBody(req *Request) ([]byte, error)
- func MustMarshalRequestBody(req *Request) []byte
- func NewRemoteError(response []byte) error
- func PutConnectionIdInContext(ctx context.Context, id ConnectionId) context.Context
- func PutRemoteIdentityInContext(ctx context.Context, remoteIdentity identity.Public) context.Context
- type Connection
- type ConnectionId
- type ConnectionIdGenerator
- type IncomingMessage
- type MessageSender
- type Procedure
- type ProcedureName
- type ProcedureType
- type RawConnection
- type RemoteError
- type Request
- type RequestBody
- type RequestHandler
- type RequestStream
- func (rs *RequestStream) CloseWithError(err error) error
- func (rs *RequestStream) HandleNewMessage(msg transport.Message) error
- func (rs *RequestStream) IncomingMessages() (<-chan IncomingMessage, error)
- func (rs *RequestStream) RequestNumber() int
- func (rs *RequestStream) WriteMessage(body []byte, bodyType transport.MessageBodyType) error
- type RequestStreams
- type Response
- type ResponseStream
- type ResponseStreams
- type ResponseWithError
- type Stream
Constants ¶
This section is empty.
Variables ¶
var ( ProcedureTypeUnknown = ProcedureType{"unknown"} // some procedures don't have a type ProcedureTypeSource = ProcedureType{"source"} ProcedureTypeDuplex = ProcedureType{"duplex"} ProcedureTypeAsync = ProcedureType{"async"} )
var ( // ErrRemoteEnd signals that the remote closed the stream but didn't signal // that an error occurred. ErrRemoteEnd = errors.New("remote end") )
Functions ¶
func MarshalRequestBody ¶
func MustMarshalRequestBody ¶
func NewRemoteError ¶
func PutConnectionIdInContext ¶
func PutConnectionIdInContext(ctx context.Context, id ConnectionId) context.Context
Types ¶
type Connection ¶
type Connection struct {
// contains filtered or unexported fields
}
func NewConnection ¶
func NewConnection( id ConnectionId, wasInitiatedByRemote bool, raw RawConnection, handler RequestHandler, logger logging.Logger, ) (*Connection, error)
NewConnection is the only way of creating a new Connection, zero value is invalid. Terminating the provided context is equivalent to calling Close. The provided context is used as a base context for the contexts passed to the request handler. Connection takes over managing RawConnection which must not be used further.
func (*Connection) Close ¶
func (c *Connection) Close() error
Close always returns nil. In theory shutting down a Secure Scuttlebutt RPC connection can result in an error as a goodbye message for the entire connection has to be sent successfully to the other side but those errors are not made available as it is unclear what to do with them.
func (*Connection) PerformRequest ¶
func (c *Connection) PerformRequest(ctx context.Context, req *Request) (ResponseStream, error)
func (*Connection) String ¶
func (c *Connection) String() string
func (*Connection) WasInitiatedByRemote ¶
func (c *Connection) WasInitiatedByRemote() bool
type ConnectionId ¶
type ConnectionId struct {
// contains filtered or unexported fields
}
func GetConnectionIdFromContext ¶
func GetConnectionIdFromContext(ctx context.Context) (ConnectionId, bool)
func NewConnectionId ¶
func NewConnectionId(id int) ConnectionId
func (ConnectionId) String ¶
func (c ConnectionId) String() string
type ConnectionIdGenerator ¶
type ConnectionIdGenerator struct {
// contains filtered or unexported fields
}
func NewConnectionIdGenerator ¶
func NewConnectionIdGenerator() *ConnectionIdGenerator
func (*ConnectionIdGenerator) Generate ¶
func (v *ConnectionIdGenerator) Generate() ConnectionId
type IncomingMessage ¶
type IncomingMessage struct {
Body []byte
}
type MessageSender ¶
type Procedure ¶
type Procedure struct {
// contains filtered or unexported fields
}
func MustNewProcedure ¶
func MustNewProcedure(name ProcedureName, typ ProcedureType) Procedure
func NewProcedure ¶
func NewProcedure(name ProcedureName, typ ProcedureType) (Procedure, error)
func (Procedure) Name ¶
func (p Procedure) Name() ProcedureName
func (Procedure) Typ ¶
func (p Procedure) Typ() ProcedureType
type ProcedureName ¶
type ProcedureName struct {
// contains filtered or unexported fields
}
func MustNewProcedureName ¶
func MustNewProcedureName(name []string) ProcedureName
func NewProcedureName ¶
func NewProcedureName(name []string) (ProcedureName, error)
func (ProcedureName) Components ¶
func (n ProcedureName) Components() []string
func (ProcedureName) Equal ¶
func (n ProcedureName) Equal(o ProcedureName) bool
func (ProcedureName) IsZero ¶
func (n ProcedureName) IsZero() bool
func (ProcedureName) String ¶
func (n ProcedureName) String() string
type ProcedureType ¶
type ProcedureType struct {
// contains filtered or unexported fields
}
func (ProcedureType) IsZero ¶
func (t ProcedureType) IsZero() bool
type RawConnection ¶
type RemoteError ¶
type RemoteError struct {
// contains filtered or unexported fields
}
func (RemoteError) As ¶
func (e RemoteError) As(target interface{}) bool
func (RemoteError) Error ¶
func (e RemoteError) Error() string
func (RemoteError) Is ¶
func (e RemoteError) Is(target error) bool
func (RemoteError) Response ¶
func (e RemoteError) Response() []byte
type Request ¶
type Request struct {
// contains filtered or unexported fields
}
func MustNewRequest ¶
func MustNewRequest(name ProcedureName, typ ProcedureType, arguments json.RawMessage) *Request
func NewRequest ¶
func NewRequest(name ProcedureName, typ ProcedureType, arguments json.RawMessage) (*Request, error)
func (Request) Arguments ¶
func (r Request) Arguments() json.RawMessage
func (Request) Name ¶
func (r Request) Name() ProcedureName
func (Request) Type ¶
func (r Request) Type() ProcedureType
type RequestBody ¶
type RequestBody struct { Name []string `json:"name"` Type string `json:"type"` Args json.RawMessage `json:"args"` }
type RequestHandler ¶
type RequestStream ¶
type RequestStream struct {
// contains filtered or unexported fields
}
func NewRequestStream ¶
func NewRequestStream(ctx context.Context, onLocalClose onLocalCloseFn, number int, typ ProcedureType, raw MessageSender) (*RequestStream, error)
func (*RequestStream) CloseWithError ¶
func (rs *RequestStream) CloseWithError(err error) error
func (*RequestStream) HandleNewMessage ¶
func (rs *RequestStream) HandleNewMessage(msg transport.Message) error
func (*RequestStream) IncomingMessages ¶
func (rs *RequestStream) IncomingMessages() (<-chan IncomingMessage, error)
func (*RequestStream) RequestNumber ¶
func (rs *RequestStream) RequestNumber() int
func (*RequestStream) WriteMessage ¶
func (rs *RequestStream) WriteMessage(body []byte, bodyType transport.MessageBodyType) error
type RequestStreams ¶
type RequestStreams struct {
// contains filtered or unexported fields
}
RequestStreams is used for handling streams initiated by remote (for which incoming messages have positive request numbers).
func NewRequestStreams ¶
func NewRequestStreams(raw MessageSender, handler RequestHandler, logger logging.Logger) *RequestStreams
NewRequestStreams creates new request streams which use the provided context to run the loop cleaning up closed streams. The lifecycle of this context should most likely be the same as the lifecycle of the underlying connection.
func (*RequestStreams) Close ¶ added in v0.0.2
func (s *RequestStreams) Close()
func (*RequestStreams) HandleIncomingRequest ¶
HandleIncomingRequest processes incoming messages: requests to open a new stream, messages which are a part of an open duplex stream initiated by the remote or messages closing a stream initiated by the remote. Returning an error from this function shuts down the entire connection.
type Response ¶
type Response struct {
// contains filtered or unexported fields
}
func NewResponse ¶
type ResponseStream ¶
type ResponseStream interface { WriteMessage(body []byte, bodyType transport.MessageBodyType) error Channel() <-chan ResponseWithError Ctx() context.Context }
ResponseStream represents a stream that we initiated.
type ResponseStreams ¶
type ResponseStreams struct {
// contains filtered or unexported fields
}
ResponseStreams is used for handling streams initiated by us (for which incoming messages have negative request numbers).
func NewResponseStreams ¶
func NewResponseStreams(raw MessageSender, logger logging.Logger) *ResponseStreams
func (*ResponseStreams) Close ¶
func (s *ResponseStreams) Close()
func (*ResponseStreams) HandleIncomingResponse ¶
func (s *ResponseStreams) HandleIncomingResponse(msg *transport.Message) error
HandleIncomingResponse processes an incoming response. Returning an error from this function shuts down the entire connection.
func (*ResponseStreams) Open ¶
func (s *ResponseStreams) Open(ctx context.Context, req *Request) (ResponseStream, error)
type ResponseWithError ¶
type ResponseWithError struct { // Value is only set if Err is nil. Value *Response // If Err is not nil then it may be of ErrRemoteEnd, RemoteError or a // different error. Err error }
todo private fields and constructor
type Stream ¶
type Stream interface { // WriteMessage sends a message over the underlying stream. WriteMessage(body []byte, bodyType transport.MessageBodyType) error // CloseWithError terminates the underlying stream. Error is sent to the // other party. Error can be nil. CloseWithError(err error) error // IncomingMessages gives the caller access to incoming messages. Returns an // error if this isn't a duplex stream. IncomingMessages() (<-chan IncomingMessage, error) }