sprout-go: git.sr.ht/~whereswaldon/sprout-go

package sprout

import "git.sr.ht/~whereswaldon/sprout-go"

Package sprout provides types and utilities for implementing client and server programs that speak the Sprout Protocol. The Sprout Protocol is specified here:

https://man.sr.ht/~whereswaldon/arborchat/specifications/sprout.md

NOTE: this package requires using a fork of golang.org/x/crypto, and you must therefore include the following in your `go.mod`:

replace golang.org/x/crypto => github.com/ProtonMail/crypto <version-from-sprout-go's-go.mod>

This package exports several important types.

The Conn type wraps a connection-oriented transport (usually a TCP connection) and provides methods for sending sprout messages and reading sprout messages off of the connection. It has a number of exported fields which are functions that should handle incoming messages. These must be set by the user, and their behavior should conform to the Sprout specification. If using a Conn directly, be sure to invoke the ReadMessage() method properly to ensure that you receive replies.

The Worker type wraps a Conn and provides automatic implementations of both the handler functions for each sprout message and the processing loop that will read new messages and dispatch their handlers. You can send messages on a worker by calling Conn methods via struct embedding. It has an exported embedded Conn.

The Conn type has both synchronous and asynchronous methods for sending messages. The synchronous ones block until they receive a response or their timeout channel emits a value. Details on how to use these methods follow.

Note: The Send* methods

The non-Async methods block until they get a response or until their timeout is reached. There are several cases in which they will return an error:

- There is a network problem sending the message or receiving the response

- There is a problem creating the outbound message or parsing the inbound response

- The status message received in response is not sprout.StatusOk. In this case, the error will be of type sprout.Status

The recommended way to invoke synchronous Send*() methods is with a time.Ticker as the input channel, like so:

err := s.SendVersion(time.NewTicker(time.Second*5).C)

Note: The Send*Async methods

The Async versions of each send operation provide more granular control over blocking behavior. They return a chan interface{}, but will never send anything other than a sprout.Status or sprout.Response over that channel. It is safe to assume that the value will be one of those two.

The Async versions also return a handle for the request called a MessageID. This can be used to cancel the request in the event that it doesn't have a response or the response no longer matters. This can be done manually using the Cancel() method on the Conn type. The synchronous version of each send method handles this for you, but it must be done manually with the async variant.

An example of the appropriate use of an async method:

// ids is a []*fields.QualifiedHash; SendQueryAsync is variadic.
resultChan, messageID, err := conn.SendQueryAsync(ids...)
if err != nil {
    // handle err
}
select {
case data := <-resultChan:
    switch asConcrete := data.(type) {
    case sprout.Status:
        // handle status
    case sprout.Response:
        // handle Response
    }
case <-time.NewTicker(time.Second*5).C:
    conn.Cancel(messageID)
    // handle timeout
}

Index

Package Files

doc.go session.go sprout.go supervisor.go worker.go

Constants

const (
    CurrentMajor = 0
    CurrentMinor = 0
)

func LaunchSupervisedWorker

func LaunchSupervisedWorker(done <-chan struct{}, addr string, s store.ExtendedStore, tlsConfig *tls.Config, logger *log.Logger)

LaunchSupervisedWorker launches a worker in a new goroutine that will connect to `addr` and use `s` as its node storage. It will dial using the provided `tlsConfig`, and it will log errors on the given `logger`.

BUG(whereswaldon): this interface is experimental and likely to change.

func NodeFromBase64URL

func NodeFromBase64URL(in string) (forest.Node, error)

type Conn

type Conn struct {
    // Write side of connection, synchronized with mutex
    sync.Mutex
    Conn io.ReadWriteCloser

    // Read side of connection, buffered for parse simplicity
    BufferedConn io.Reader

    // Protocol version in use
    Major, Minor int

    // Map from messageID to channel waiting for response
    PendingStatus sync.Map

    OnVersion     func(s *Conn, messageID MessageID, major, minor int) error
    OnList        func(s *Conn, messageID MessageID, nodeType fields.NodeType, quantity int) error
    OnQuery       func(s *Conn, messageID MessageID, nodeIds []*fields.QualifiedHash) error
    OnAncestry    func(s *Conn, messageID MessageID, nodeID *fields.QualifiedHash, levels int) error
    OnLeavesOf    func(s *Conn, messageID MessageID, nodeID *fields.QualifiedHash, quantity int) error
    OnSubscribe   func(s *Conn, messageID MessageID, nodeID *fields.QualifiedHash) error
    OnUnsubscribe func(s *Conn, messageID MessageID, nodeID *fields.QualifiedHash) error
    OnAnnounce    func(s *Conn, messageID MessageID, nodes []forest.Node) error
    // contains filtered or unexported fields
}

func NewConn

func NewConn(transport io.ReadWriteCloser) (*Conn, error)

NewConn constructs a sprout connection using the provided transport. Writes to the transport are expected to reach the other end of the sprout connection, and reads should deliver bytes from the other end. The expected use is TCP connections, though other transports are possible.

func (*Conn) Cancel

func (s *Conn) Cancel(messageID MessageID)

Cancel deallocates the response structures associated with the protocol message with the given identifier. This is primarily useful when the other end of the connection has not responded in a long time, and we are interested in cleaning up the resources used in waiting for them to respond. An attempt to cancel a message that is not waiting for a response will have no effect.
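To make the bookkeeping behind Cancel concrete, here is a minimal sketch of a registry of pending requests built on sync.Map, in the spirit of the PendingStatus field. The pending type and its register, cancel, and deliver methods are hypothetical names chosen for illustration; the real Conn may organize this differently.

```go
package main

import (
	"fmt"
	"sync"
)

// MessageID mirrors the sprout type of the same name.
type MessageID int

// pending is a hypothetical registry of requests awaiting a reply.
type pending struct {
	waiting sync.Map // MessageID -> chan interface{}
}

// register records that a reply to id is expected and returns the channel
// on which that reply will be delivered.
func (p *pending) register(id MessageID) <-chan interface{} {
	ch := make(chan interface{}, 1) // buffered so deliver never blocks
	p.waiting.Store(id, ch)
	return ch
}

// cancel forgets about id. Cancelling an unknown id has no effect.
func (p *pending) cancel(id MessageID) {
	p.waiting.Delete(id)
}

// deliver hands reply to whoever is waiting on id. It returns false when
// nobody is waiting, which is the situation that would surface as an
// unsolicited message.
func (p *pending) deliver(id MessageID, reply interface{}) bool {
	v, ok := p.waiting.LoadAndDelete(id)
	if !ok {
		return false
	}
	v.(chan interface{}) <- reply
	return true
}

func main() {
	var p pending

	ch := p.register(1)
	fmt.Println(p.deliver(1, "response")) // a waiter exists for message 1
	fmt.Println(<-ch)

	p.register(2)
	p.cancel(2)
	fmt.Println(p.deliver(2, "late response")) // the waiter was cancelled
}
```

In this sketch a reply that arrives after cancellation is simply reported as undeliverable, which matches the description of cancellation as one source of unsolicited messages.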

func (*Conn) ReadMessage

func (s *Conn) ReadMessage() error

ReadMessage reads and parses a single sprout protocol message off of the connection. It calls the appropriate OnVerb handler function when it parses a message, and it returns any parse errors. It will block when no messages are available.

This method must be called in a loop in order for the sprout connection to be able to receive messages properly. This isn't done automatically by the Conn type in order to provide flexibility on how to handle errors from this method. The Worker type can wrap a Conn to both implement its handler functions and call this method automatically.

This method may return an UnsolicitedMessageError in some cases. This may be due to a local timeout/request cancellation, and should generally not be cause to close the connection entirely.

func (*Conn) SendAncestry

func (s *Conn) SendAncestry(nodeID *fields.QualifiedHash, levels int, timeoutChan <-chan time.Time) (Response, error)

SendAncestry requests the ancestry of the node with the given id. The levels parameter specifies the maximum number of levels of ancestry to return.

func (*Conn) SendAncestryAsync

func (s *Conn) SendAncestryAsync(nodeID *fields.QualifiedHash, levels int) (<-chan interface{}, MessageID, error)

SendAncestryAsync requests the ancestry of the node with the given id. The levels parameter specifies the maximum number of levels of ancestry to return. See the package-level documentation for details on how to use the Async methods.

func (*Conn) SendAnnounce

func (s *Conn) SendAnnounce(nodes []forest.Node, timeoutChan <-chan time.Time) error

SendAnnounce announces the existence of the given nodes to the peer on the other end of the sprout connection.

func (*Conn) SendAnnounceAsync

func (s *Conn) SendAnnounceAsync(nodes []forest.Node) (<-chan interface{}, MessageID, error)

SendAnnounceAsync announces the existence of the given nodes to the peer on the other end of the sprout connection. See the package-level documentation for details on how to use Async methods.

func (*Conn) SendLeavesOf

func (s *Conn) SendLeavesOf(nodeId *fields.QualifiedHash, quantity int, timeoutChan <-chan time.Time) (Response, error)

SendLeavesOf returns up to quantity nodes that are leaves in the tree rooted at the given ID.

func (*Conn) SendLeavesOfAsync

func (s *Conn) SendLeavesOfAsync(nodeId *fields.QualifiedHash, quantity int) (<-chan interface{}, MessageID, error)

SendLeavesOfAsync returns up to quantity nodes that are leaves in the tree rooted at the given ID. For a description of how to use the Async methods, see the package-level documentation.

func (*Conn) SendList

func (s *Conn) SendList(nodeType fields.NodeType, quantity int, timeoutChan <-chan time.Time) (Response, error)

SendList requests a list of recent nodes of a particular node type from the other end of the sprout connection.

func (*Conn) SendListAsync

func (s *Conn) SendListAsync(nodeType fields.NodeType, quantity int) (<-chan interface{}, MessageID, error)

SendListAsync requests a list of recent nodes of a particular node type from the other end of the sprout connection. The requested quantity is the maximum number of nodes that the other end should provide, though it may provide significantly fewer. See the package-level documentation for details on how to use the Async methods.

func (*Conn) SendQuery

func (s *Conn) SendQuery(nodeIds []*fields.QualifiedHash, timeoutChan <-chan time.Time) (Response, error)

SendQuery requests the nodes with a list of IDs from the other side of the sprout connection.

func (*Conn) SendQueryAsync

func (s *Conn) SendQueryAsync(nodeIds ...*fields.QualifiedHash) (<-chan interface{}, MessageID, error)

SendQueryAsync requests the nodes with a list of IDs from the other side of the sprout connection. See the package-level documentation for details on how to use the Async methods.

func (*Conn) SendResponse

func (s *Conn) SendResponse(msgID MessageID, nodes []forest.Node) error

func (*Conn) SendStatus

func (s *Conn) SendStatus(targetMessageID MessageID, errorCode StatusCode) error

SendStatus responds to the message with the given targetMessageID with the given status code. It is always synchronous, and will return any error in transmitting the message.

func (*Conn) SendSubscribe

func (s *Conn) SendSubscribe(community *forest.Community, timeoutChan <-chan time.Time) error

SendSubscribe attempts to add the given community ID to the list of subscribed IDs for this connection. If it succeeds, both peers are required to exchange new nodes for that community using Announce().

func (*Conn) SendSubscribeAsync

func (s *Conn) SendSubscribeAsync(community *forest.Community) (<-chan interface{}, MessageID, error)

SendSubscribeAsync attempts to add the given community ID to the list of subscribed IDs for this connection. If it succeeds, both peers are required to exchange new nodes for that community using Announce(). For details on how to use Async methods, see the package-level documentation.

func (*Conn) SendSubscribeByID

func (s *Conn) SendSubscribeByID(community *fields.QualifiedHash, timeoutChan <-chan time.Time) error

SendSubscribeByID attempts to add the given community ID to the list of subscribed IDs for this connection. If it succeeds, both peers are required to exchange new nodes for that community using Announce().

func (*Conn) SendSubscribeByIDAsync

func (s *Conn) SendSubscribeByIDAsync(community *fields.QualifiedHash) (<-chan interface{}, MessageID, error)

SendSubscribeByIDAsync attempts to add the given community ID to the list of subscribed IDs for this connection. If it succeeds, both peers are required to exchange new nodes for that community using Announce(). For details on how to use Async methods, see the package-level documentation.

func (*Conn) SendUnsubscribe

func (s *Conn) SendUnsubscribe(community *forest.Community, timeoutChan <-chan time.Time) error

SendUnsubscribe attempts to remove the given community ID from the list of subscribed IDs for this connection. If it succeeds, the peers are no longer required to exchange new nodes for that community using Announce().

func (*Conn) SendUnsubscribeAsync

func (s *Conn) SendUnsubscribeAsync(community *forest.Community) (<-chan interface{}, MessageID, error)

SendUnsubscribeAsync attempts to remove the given community ID from the list of subscribed IDs for this connection. If it succeeds, the peers are no longer required to exchange new nodes for that community using Announce(). For details on how to use Async methods, see the package-level documentation.

func (*Conn) SendUnsubscribeByID

func (s *Conn) SendUnsubscribeByID(community *fields.QualifiedHash, timeoutChan <-chan time.Time) error

SendUnsubscribeByID attempts to remove the given community ID from the list of subscribed IDs for this connection. If it succeeds, the peers are no longer required to exchange new nodes for that community using Announce().

func (*Conn) SendUnsubscribeByIDAsync

func (s *Conn) SendUnsubscribeByIDAsync(community *fields.QualifiedHash) (<-chan interface{}, MessageID, error)

SendUnsubscribeByIDAsync attempts to remove the given community ID from the list of subscribed IDs for this connection. If it succeeds, the peers are no longer required to exchange new nodes for that community using Announce(). For details on how to use Async methods, see the package-level documentation.

func (*Conn) SendVersion

func (s *Conn) SendVersion(timeoutChan <-chan time.Time) error

SendVersion notifies the other end of the sprout connection of our supported protocol version number.

func (*Conn) SendVersionAsync

func (s *Conn) SendVersionAsync() (<-chan interface{}, MessageID, error)

SendVersionAsync notifies the other end of the sprout connection of our supported protocol version number. See the package-level documentation for details on how to use the Async methods properly.

type MessageID

type MessageID int

type Response

type Response struct {
    Nodes []forest.Node
}

type Session

type Session struct {
    sync.RWMutex
    Communities map[*fields.QualifiedHash]struct{}
}

Session stores the state of a sprout connection between hosts, which is currently just the subscribed community set.
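The subscribed set can be pictured as a map guarded by a read-write mutex. The sketch below is a hypothetical stand-in, not the package's Session: it keys the set by a string form of the community ID, which is an assumption made so the example runs on its own, whereas the real type keys its map by *fields.QualifiedHash.

```go
package main

import (
	"fmt"
	"sync"
)

// subscriptions is a hypothetical stand-in for Session. Keying by string
// is an assumption of this sketch.
type subscriptions struct {
	sync.RWMutex
	communities map[string]struct{}
}

func newSubscriptions() *subscriptions {
	return &subscriptions{communities: make(map[string]struct{})}
}

// Subscribe adds id to the set. Adding an id twice is harmless.
func (s *subscriptions) Subscribe(id string) {
	s.Lock()
	defer s.Unlock()
	s.communities[id] = struct{}{}
}

// Unsubscribe removes id from the set. Removing an absent id is harmless.
func (s *subscriptions) Unsubscribe(id string) {
	s.Lock()
	defer s.Unlock()
	delete(s.communities, id)
}

// IsSubscribed takes only the read lock, so concurrent lookups do not
// block one another.
func (s *subscriptions) IsSubscribed(id string) bool {
	s.RLock()
	defer s.RUnlock()
	_, ok := s.communities[id]
	return ok
}

func main() {
	s := newSubscriptions()
	s.Subscribe("community-a")
	fmt.Println(s.IsSubscribed("community-a"), s.IsSubscribed("community-b"))
	s.Unsubscribe("community-a")
	fmt.Println(s.IsSubscribed("community-a"))
}
```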

func NewSession

func NewSession() *Session

func (*Session) IsSubscribed

func (c *Session) IsSubscribed(communityID *fields.QualifiedHash) bool

func (*Session) Subscribe

func (c *Session) Subscribe(communityID *fields.QualifiedHash)

func (*Session) Unsubscribe

func (c *Session) Unsubscribe(communityID *fields.QualifiedHash)

type Status

type Status struct {
    Code StatusCode
}

func (Status) Error

func (s Status) Error() string

type StatusCode

type StatusCode int

StatusCode represents the status of a sprout protocol message.

const (
    StatusOk            StatusCode = 0
    ErrorMalformed      StatusCode = 1
    ErrorProtocolTooOld StatusCode = 2
    ErrorProtocolTooNew StatusCode = 3
    ErrorUnknownNode    StatusCode = 4
)

func (StatusCode) String

func (s StatusCode) String() string

String converts the status code into a human-readable error message.

type UnsolicitedMessageError

type UnsolicitedMessageError struct {
    // The ID that the unsolicited message was in response to
    MessageID
}

UnsolicitedMessageError is an error indicating that a sprout peer sent a response or status message with an ID that was unexpected. This could occur when we cancelled waiting on a request (such as a timeout), when the peer has a bug (double response, incorrect target message id in response), or when the peer is misbehaving.

func (UnsolicitedMessageError) Error

func (u UnsolicitedMessageError) Error() string

type Verb

type Verb string

const (
    VersionVerb     Verb = "version"
    ListVerb        Verb = "list"
    QueryVerb       Verb = "query"
    AncestryVerb    Verb = "ancestry"
    LeavesOfVerb    Verb = "leaves_of"
    SubscribeVerb   Verb = "subscribe"
    UnsubscribeVerb Verb = "unsubscribe"
    AnnounceVerb    Verb = "announce"
    ResponseVerb    Verb = "response"
    StatusVerb      Verb = "status"
)

type Worker

type Worker struct {
    Done           <-chan struct{}
    DefaultTimeout time.Duration
    *Conn
    *log.Logger
    *Session
    SubscribableStore store.ExtendedStore
    // contains filtered or unexported fields
}

func NewWorker

func NewWorker(done <-chan struct{}, conn net.Conn, s store.ExtendedStore) (*Worker, error)

func (*Worker) BootstrapLocalStore

func (c *Worker) BootstrapLocalStore(maxCommunities int)

BootstrapLocalStore is a utility method for loading all available content from the peer on the other end of the sprout connection. It will

- discover all communities

- fetch the signing identities of those communities

- validate and insert those identities and communities into the worker's store

- subscribe to all of those communities

- fetch all leaves of those communities

- fetch the ancestry of each leaf and validate it (fetching identities as necessary), inserting nodes that pass validation into the store

func (*Worker) HandleNewNode

func (c *Worker) HandleNewNode(node forest.Node)

HandleNewNode asynchronously announces the new node if appropriate.

func (*Worker) IngestNode

func (c *Worker) IngestNode(node forest.Node) error

IngestNode makes a best-effort attempt to validate and insert the given node. It will fetch the author (if not already available), then attempt to validate the node. If that validation fails, it will attempt to fetch the node's entire ancestry and the authorship of each ancestry node. It will validate each ancestor and insert them into the local store (if they are not already there), then it will attempt to re-validate the original node after processing its entire ancestry.

It will return the first error during this chain of validations.

func (*Worker) OnAncestry

func (c *Worker) OnAncestry(s *Conn, messageID MessageID, nodeID *fields.QualifiedHash, levels int) error

func (*Worker) OnAnnounce

func (c *Worker) OnAnnounce(s *Conn, messageID MessageID, nodes []forest.Node) error

func (*Worker) OnLeavesOf

func (c *Worker) OnLeavesOf(s *Conn, messageID MessageID, nodeID *fields.QualifiedHash, quantity int) error

func (*Worker) OnList

func (c *Worker) OnList(s *Conn, messageID MessageID, nodeType fields.NodeType, quantity int) error

func (*Worker) OnQuery

func (c *Worker) OnQuery(s *Conn, messageID MessageID, nodeIds []*fields.QualifiedHash) error

func (*Worker) OnSubscribe

func (c *Worker) OnSubscribe(s *Conn, messageID MessageID, nodeID *fields.QualifiedHash) (err error)

func (*Worker) OnUnsubscribe

func (c *Worker) OnUnsubscribe(s *Conn, messageID MessageID, nodeID *fields.QualifiedHash) (err error)

func (*Worker) OnVersion

func (c *Worker) OnVersion(s *Conn, messageID MessageID, major, minor int) error

func (*Worker) Run

func (c *Worker) Run()

Bugs

this interface is experimental and likely to change.

Directories

Path     Synopsis
watch

Package sprout imports 15 packages. Updated 2020-07-14.