Documentation ¶
Overview ¶
Package carmirror provides a generic Go implementation of the CAR Mirror protocol.
Index ¶
- func BlockEqual[I BlockId](a RawBlock[I], b RawBlock[I]) bool
- type Block
- type BlockId
- type BlockIdRef
- type BlockReceiver
- type BlockSender
- type BlockStore
- type ByteAndBlockReader
- type Flags
- type MutablePointerResolver
- type Orchestrator
- type RawBlock
- type ReadableBlockStore
- type SessionEvent
- type SimpleStatusAccumulator
- func (ssa *SimpleStatusAccumulator[I]) Have(id I) error
- func (ssa *SimpleStatusAccumulator[I]) HaveCount() uint
- func (ssa *SimpleStatusAccumulator[I]) Receive(id I) error
- func (ssa *SimpleStatusAccumulator[I]) Send(sender StatusSender[I]) error
- func (ssa *SimpleStatusAccumulator[I]) Want(id I) error
- func (ssa *SimpleStatusAccumulator[I]) WantCount() uint
- type SinkSession
- func (ss *SinkSession[I, F]) AccumulateStatus(id I) error
- func (ss *SinkSession[I, F]) Cancel() error
- func (ss *SinkSession[I, F]) Close() error
- func (ss *SinkSession[I, F]) Done() <-chan error
- func (ss *SinkSession[I, F]) Enqueue(id I) error
- func (ss *SinkSession[I, F]) HandleBlock(rawBlock RawBlock[I])
- func (ss *SinkSession[I, F]) HandleBlocks(rawBlocks []RawBlock[I])
- func (ss *SinkSession[I, F]) Info() *SinkSessionInfo[F]
- func (ss *SinkSession[I, F]) IsClosed() bool
- func (ss *SinkSession[I, F]) Orchestrator() Orchestrator[F]
- func (ss *SinkSession[I, F]) Run(statusSender StatusSender[I])
- func (ss *SinkSession[I, F]) Started() <-chan bool
- type SinkSessionInfo
- type SourceSession
- func (ss *SourceSession[I, F]) Cancel() error
- func (ss *SourceSession[I, F]) Close() error
- func (ss *SourceSession[I, F]) Done() <-chan error
- func (ss *SourceSession[I, F]) Enqueue(id I) error
- func (ss *SourceSession[I, F]) HandleStatus(have filter.Filter[I], want []I)
- func (ss *SourceSession[I, F]) Info() *SourceSessionInfo[F]
- func (ss *SourceSession[I, F]) IsClosed() bool
- func (ss *SourceSession[I, F]) NumBlocksToSend() int
- func (ss *SourceSession[I, F]) Orchestrator() Orchestrator[F]
- func (ss *SourceSession[I, F]) Run(blockSender BlockSender[I])
- func (ss *SourceSession[I, F]) Started() <-chan bool
- type SourceSessionInfo
- type StatusAccumulator
- type StatusReceiver
- type StatusSender
- type SynchronizedBlockStore
- func (bs *SynchronizedBlockStore[I]) Add(ctx context.Context, block RawBlock[I]) (Block[I], error)
- func (bs *SynchronizedBlockStore[I]) AddMany(ctx context.Context, blocks []RawBlock[I]) ([]Block[I], error)
- func (bs *SynchronizedBlockStore[I]) All(ctx context.Context) (<-chan I, error)
- func (bs *SynchronizedBlockStore[I]) Get(ctx context.Context, id I) (Block[I], error)
- func (bs *SynchronizedBlockStore[I]) Has(ctx context.Context, id I) (bool, error)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type Block ¶
type Block[I BlockId] interface {
	RawBlock[I]
	// Children returns the BlockIds of the children of this block.
	Children() []I
}
Block is an immutable data block referenced by a unique ID. It may reference other blocks by Id.
type BlockId ¶
type BlockId interface {
	comparable
	encoding.BinaryMarshaler
	json.Marshaler
	cbor.Marshaler
	String() string
}
BlockId represents a unique identifier for a Block. This interface only represents the identifier, not the Block. The interface is chosen for compatibility with ipfs/go-cid - noting that go-cid is, for the moment, comparable.
It's annoying that go-cid doesn't implement cbor.Marshaler/cbor.Unmarshaler because the binary representation of a CID and the CBOR canonical representation are quite different:
- [https://pkg.go.dev/github.com/ipfs/go-cid#Cast] - binary form of CID
- [https://ipld.io/specs/codecs/dag-cbor/spec/#links] - CBOR representation
Unfortunately, we intend to leverage the CAR v1 spec for compatibility, which, oddly, specifies using the CBOR form of the CID in the header and the raw byte form in the body. Thus, our interface here needs to be able to support both.
type BlockIdRef ¶
type BlockIdRef[T BlockId] interface {
	*T
	encoding.BinaryUnmarshaler
	json.Unmarshaler
	cbor.Unmarshaler
	Read(ByteAndBlockReader) (int, error)
}
BlockIdRef is a reference to a BlockId. In Go, you are not supposed to have a mix of pointer and non-pointer receivers for the same interface. go-cid has a mix of pointer and non-pointer receivers. Since unmarshaling a BlockId will require a pointer receiver in order to update the BlockId, we needed a separate interface.
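The pointer-constraint shape used by BlockIdRef can be illustrated in isolation. The following is a minimal sketch, assuming a hypothetical fixed-size identifier (demoId) and a reduced constraint (idRef) that only requires encoding.BinaryUnmarshaler; neither name is part of the package.

```go
package main

import (
	"encoding"
	"errors"
	"fmt"
)

// demoId is a hypothetical fixed-size identifier standing in for a real BlockId.
type demoId [4]byte

// UnmarshalBinary needs a pointer receiver because it overwrites the value.
func (id *demoId) UnmarshalBinary(data []byte) error {
	if len(data) != len(id) {
		return errors.New("demoId: wrong length")
	}
	copy(id[:], data)
	return nil
}

// idRef mirrors the shape of BlockIdRef in reduced form (an assumption for
// this sketch): its type set is exactly *T, and *T must be able to unmarshal.
type idRef[T any] interface {
	*T
	encoding.BinaryUnmarshaler
}

// decode builds a T value by unmarshaling through its pointer type R.
// R is inferred from T, so callers only name the value type.
func decode[T any, R idRef[T]](data []byte) (T, error) {
	var id T
	err := R(&id).UnmarshalBinary(data)
	return id, err
}

func main() {
	id, err := decode[demoId]([]byte{1, 2, 3, 4})
	fmt.Println(id, err)
}
```

Generic code can then work with the value type I for comparison and map keys, and reach for the pointer type only when it has to decode an identifier.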
type BlockReceiver ¶
type BlockReceiver[I BlockId] interface {
	// HandleBlock is called on receipt of a new block.
	HandleBlock(block RawBlock[I])
	// HandleBlocks is called on receipt of a new batch of blocks.
	HandleBlocks(block []RawBlock[I])
}
BlockReceiver is responsible for receiving blocks at the Sink.
type BlockSender ¶
type BlockSender[I BlockId] interface {
	// Send a block
	SendBlock(block RawBlock[I]) error
	// Ensure any blocks queued for sending are actually sent; should block until all blocks are sent.
	Flush() error
	// Close the sender gracefully, ensuring any pending blocks are flushed.
	Close() error
	// Len returns the number of blocks queued for sending.
	Len() int
}
BlockSender is responsible for sending blocks from the Source, immediately and asynchronously, or via a buffer. The details are up to the implementor.
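One way to read the buffered variant of this contract is sketched below. It is only an illustration: rawBlock is a simplified stand-in for RawBlock[I], and batchSender with its deliver callback are hypothetical names, not types from this package.

```go
package main

import (
	"errors"
	"fmt"
)

// rawBlock is a simplified stand-in for RawBlock[I].
type rawBlock struct {
	id   string
	data []byte
}

// batchSender is a hypothetical buffering sender: SendBlock queues,
// Flush hands the queued batch to a transport callback.
type batchSender struct {
	pending []rawBlock
	deliver func([]rawBlock) error // transport hook, invented for this sketch
	closed  bool
}

// SendBlock queues a block; nothing leaves the process until Flush.
func (s *batchSender) SendBlock(b rawBlock) error {
	if s.closed {
		return errors.New("sender is closed")
	}
	s.pending = append(s.pending, b)
	return nil
}

// Flush delivers every queued block and returns once delivery has finished.
func (s *batchSender) Flush() error {
	if len(s.pending) == 0 {
		return nil
	}
	batch := s.pending
	s.pending = nil
	return s.deliver(batch)
}

// Close flushes pending blocks, then rejects further sends.
func (s *batchSender) Close() error {
	err := s.Flush()
	s.closed = true
	return err
}

// Len reports the number of blocks still queued.
func (s *batchSender) Len() int { return len(s.pending) }

func main() {
	s := &batchSender{deliver: func(batch []rawBlock) error {
		fmt.Println("delivering", len(batch), "blocks")
		return nil
	}}
	s.SendBlock(rawBlock{id: "a"})
	s.SendBlock(rawBlock{id: "b"})
	fmt.Println("queued:", s.Len())
	fmt.Println("close error:", s.Close())
}
```

An immediate sender would simply deliver inside SendBlock and make Flush a no-op; the interface leaves that choice to the implementor.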
type BlockStore ¶
type BlockStore[I BlockId] interface {
	ReadableBlockStore[I]
	// Add adds a given block into the blockstore.
	Add(context.Context, RawBlock[I]) (Block[I], error)
	// AddMany adds a given set of blocks into the blockstore.
	AddMany(context.Context, []RawBlock[I]) ([]Block[I], error)
}
BlockStore represents read and write operations for a store of blocks.
type ByteAndBlockReader ¶
type ByteAndBlockReader interface {
	io.Reader
	io.ByteReader
}
ByteAndBlockReader is an io.Reader that also implements io.ByteReader.
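Standard library readers such as *bytes.Reader and *bufio.Reader already provide both methods. The sketch below uses a local copy of the interface (byteAndBlockReader) and an invented one-byte length prefix purely for illustration; the framing is not the CAR format.

```go
package main

import (
	"bufio"
	"bytes"
	"fmt"
	"io"
	"strings"
)

// byteAndBlockReader is a local copy of the interface shown above.
type byteAndBlockReader interface {
	io.Reader
	io.ByteReader
}

// readPrefixed reads a one-byte length followed by that many payload bytes.
// ReadByte handles the small header, Read (via io.ReadFull) handles the body.
func readPrefixed(r byteAndBlockReader) ([]byte, error) {
	n, err := r.ReadByte()
	if err != nil {
		return nil, err
	}
	buf := make([]byte, n)
	if _, err := io.ReadFull(r, buf); err != nil {
		return nil, err
	}
	return buf, nil
}

// demo decodes one frame from each of two standard library reader types.
func demo() (string, string, error) {
	a, err := readPrefixed(bytes.NewReader([]byte{3, 'c', 'a', 'r'}))
	if err != nil {
		return "", "", err
	}
	b, err := readPrefixed(bufio.NewReader(strings.NewReader("\x02hi")))
	if err != nil {
		return "", "", err
	}
	return string(a), string(b), nil
}

func main() {
	a, b, err := demo()
	fmt.Println(a, b, err)
}
```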
type MutablePointerResolver ¶
type MutablePointerResolver[I BlockId] interface {
	// Resolve attempts to resolve ptr into a block ID.
	Resolve(ptr string) (I, error)
}
MutablePointerResolver is responsible for resolving a pointer into a BlockId.
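As a rough illustration, a resolver can be as small as a lookup table. In this sketch the BlockId constraint is relaxed to comparable so that it has no external dependencies, and mapResolver is a hypothetical type, not part of the package.

```go
package main

import "fmt"

// resolver mirrors MutablePointerResolver with the constraint relaxed
// to comparable (an assumption made to keep the sketch dependency-free).
type resolver[I comparable] interface {
	Resolve(ptr string) (I, error)
}

// mapResolver is a hypothetical in-memory resolver backed by a map
// from pointer name to block ID.
type mapResolver[I comparable] struct {
	pointers map[string]I
}

// Resolve returns the ID registered for ptr, or an error if there is none.
func (r *mapResolver[I]) Resolve(ptr string) (I, error) {
	id, ok := r.pointers[ptr]
	if !ok {
		var zero I
		return zero, fmt.Errorf("unknown pointer %q", ptr)
	}
	return id, nil
}

func main() {
	var r resolver[string] = &mapResolver[string]{
		pointers: map[string]string{"latest": "block-42"},
	}
	id, err := r.Resolve("latest")
	fmt.Println(id, err)
}
```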
type Orchestrator ¶
type Orchestrator[F Flags] interface {
	// Notify is used to notify the orchestrator of an event.
	Notify(SessionEvent) error
	// Get the current state of the Orchestrator.
	State() F
	// IsClosed returns true if the orchestrator is closed.
	IsClosed() bool
	// IsRequester returns true if the Orchestrator is the requester.
	IsRequester() bool
	// IsSafeStateToClose returns true if the state of the Orchestrator indicates that the session should close.
	// Other factors may need to be taken into account as well to determine if close should happen, but this
	// tells us if the states indicate a close is safe.
	IsSafeStateToClose() bool
	// ShouldFlush returns true if the state of the Orchestrator indicates that the session should flush.
	ShouldFlush() bool
}
Orchestrator is responsible for managing the flow of blocks and/or status. Typically the Orchestrator is some external object such as a connection which is aware of details which are specific to the implementation (such as whether the communication is synchronous or batch oriented).
type RawBlock ¶
type RawBlock[I BlockId] interface {
	// Id returns the BlockId for the Block.
	Id() I
	// RawData returns the raw data bytes for the Block.
	RawData() []byte
	// Size returns the size of the block
	Size() int64
}
RawBlock represents a raw block before any association with a blockstore.
type ReadableBlockStore ¶
type ReadableBlockStore[I BlockId] interface {
	// Get returns the block from the blockstore with the given ID.
	Get(context.Context, I) (Block[I], error)
	// Has returns true if the blockstore has a block with the given ID.
	Has(context.Context, I) (bool, error)
	// All returns a channel that will receive all of the block IDs in this store.
	All(context.Context) (<-chan I, error)
}
ReadableBlockStore represents read operations for a store of blocks.
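The channel-returning All method is the least obvious part of this interface. A minimal sketch of one possible shape follows, assuming string IDs and raw byte slices in place of I and Block[I]; memStore and countAll are hypothetical names introduced for the example.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// memStore is a hypothetical read-only store keyed by string IDs.
type memStore struct {
	blocks map[string][]byte
}

// Get returns the stored bytes for id, or an error if the block is absent.
func (s *memStore) Get(ctx context.Context, id string) ([]byte, error) {
	data, ok := s.blocks[id]
	if !ok {
		return nil, errors.New("block not found: " + id)
	}
	return data, nil
}

// Has reports whether the store holds a block with the given ID.
func (s *memStore) Has(ctx context.Context, id string) (bool, error) {
	_, ok := s.blocks[id]
	return ok, nil
}

// All streams every ID on a channel and closes it when done.
// The goroutine stops early if the caller cancels ctx.
func (s *memStore) All(ctx context.Context) (<-chan string, error) {
	ids := make(chan string)
	go func() {
		defer close(ids)
		for id := range s.blocks {
			select {
			case ids <- id:
			case <-ctx.Done():
				return
			}
		}
	}()
	return ids, nil
}

// countAll drains the channel returned by All and counts the IDs.
func countAll(s *memStore) int {
	ids, err := s.All(context.Background())
	if err != nil {
		return 0
	}
	n := 0
	for range ids {
		n++
	}
	return n
}

func main() {
	s := &memStore{blocks: map[string][]byte{"a": {1}, "b": {2}}}
	fmt.Println("ids in store:", countAll(s))
}
```

Selecting on ctx.Done() matters here: without it, a caller that stops reading early would leave the producing goroutine blocked forever.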
type SessionEvent ¶
type SessionEvent uint16
SessionEvent is an event that can occur during a session. When events occur, they trigger updates to session state. The Orchestrator, which is typically some object outside the session which is aware of implementation-specific information about the channel over which communication is actually occurring, may also 'wait' on notification of an event.
const (
	BEGIN_SESSION SessionEvent = iota // A session has begun
	END_SESSION // A session has ended
	BEGIN_DRAINING // No more data is currently available for processing
	END_DRAINING // Session has completed processing related to the draining event
	BEGIN_CLOSE // The session has been notified it should close when current processing is complete
	END_CLOSE // Immediate actions to start the session closing are complete
	BEGIN_FLUSH // Flush has been called on a SimpleBatchBlockSender to send a batch of blocks from Source to Sink
	END_FLUSH // Flush has completed
	BEGIN_SEND // On Source, an individual block has been sent to the BlockSender. On Sink, we are 'sending' status to the StatusAccumulator
	END_SEND // Send has completed
	BEGIN_RECEIVE // An individual block or status update is received
	END_RECEIVE // Receive operation is completed
	BEGIN_PROCESSING // Begin main processing loop
	END_PROCESSING // End main processing loop
	// TODO: Change comments to allow BATCH to be about blocks and status too.
	BEGIN_BATCH // SimpleBatchBlockReceiver has started processing a batch of blocks on the Sink
	END_BATCH // Finished processing a batch of blocks
	BEGIN_ENQUEUE // Request made to start transfer of a specific block and its children
	END_ENQUEUE // Enqueue is complete
	CANCEL // Request made to immediately end session and abandon any current transfer
)
Core session event constants.
func (SessionEvent) String ¶
func (se SessionEvent) String() string
String returns a string representation of the session event.
type SimpleStatusAccumulator ¶
type SimpleStatusAccumulator[I BlockId] struct { // contains filtered or unexported fields }
SimpleStatusAccumulator is a simple implementation of StatusAccumulator. Its operations are protected by a mutex, so it is safe to use from multiple goroutines.
func NewSimpleStatusAccumulator ¶
func NewSimpleStatusAccumulator[I BlockId](filter filter.Filter[I]) *SimpleStatusAccumulator[I]
NewSimpleStatusAccumulator creates a new SimpleStatusAccumulator.
func (*SimpleStatusAccumulator[I]) Have ¶
func (ssa *SimpleStatusAccumulator[I]) Have(id I) error
Have marks the given block as having been received.
func (*SimpleStatusAccumulator[I]) HaveCount ¶
func (ssa *SimpleStatusAccumulator[I]) HaveCount() uint
HaveCount returns the number of blocks that have been received.
func (*SimpleStatusAccumulator[I]) Receive ¶
func (ssa *SimpleStatusAccumulator[I]) Receive(id I) error
Receive marks the given block as received.
func (*SimpleStatusAccumulator[I]) Send ¶
func (ssa *SimpleStatusAccumulator[I]) Send(sender StatusSender[I]) error
Send sends the current status using the given StatusSender.
func (*SimpleStatusAccumulator[I]) Want ¶
func (ssa *SimpleStatusAccumulator[I]) Want(id I) error
Want marks the given block as wanted.
func (*SimpleStatusAccumulator[I]) WantCount ¶
func (ssa *SimpleStatusAccumulator[I]) WantCount() uint
WantCount returns the number of blocks that are currently wanted.
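The mutex-protected bookkeeping described for SimpleStatusAccumulator might look roughly like the sketch below. It is not the package's implementation: IDs are plain strings, the 'have' side is an exact set rather than a lossy filter, Send is omitted, and the rule that Receive clears a matching want is an assumption.

```go
package main

import (
	"fmt"
	"sync"
)

// accumulator is a hypothetical, simplified status accumulator.
// Every method takes the mutex, so concurrent calls are safe.
type accumulator struct {
	mu    sync.Mutex
	haves map[string]struct{}
	wants map[string]struct{}
}

func newAccumulator() *accumulator {
	return &accumulator{
		haves: make(map[string]struct{}),
		wants: make(map[string]struct{}),
	}
}

// Have records that the Sink already holds the block.
func (a *accumulator) Have(id string) error {
	a.mu.Lock()
	defer a.mu.Unlock()
	a.haves[id] = struct{}{}
	return nil
}

// Want records that the Sink needs the block.
func (a *accumulator) Want(id string) error {
	a.mu.Lock()
	defer a.mu.Unlock()
	a.wants[id] = struct{}{}
	return nil
}

// Receive clears any outstanding want for a block that has arrived
// (assumed behavior for this sketch).
func (a *accumulator) Receive(id string) error {
	a.mu.Lock()
	defer a.mu.Unlock()
	delete(a.wants, id)
	return nil
}

// HaveCount returns the number of unique haves recorded.
func (a *accumulator) HaveCount() uint {
	a.mu.Lock()
	defer a.mu.Unlock()
	return uint(len(a.haves))
}

// WantCount returns the number of unique wants outstanding.
func (a *accumulator) WantCount() uint {
	a.mu.Lock()
	defer a.mu.Unlock()
	return uint(len(a.wants))
}

// wantAll registers n wants from n goroutines to exercise the locking.
func wantAll(a *accumulator, n int) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			a.Want(fmt.Sprintf("block-%d", i))
		}(i)
	}
	wg.Wait()
}

func main() {
	a := newAccumulator()
	wantAll(a, 100)
	a.Receive("block-0")
	fmt.Println("wants outstanding:", a.WantCount())
}
```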
type SinkSession ¶
SinkSession is a session that receives blocks and sends out status updates. Two type parameters must be supplied: the concrete type of the BlockId and the type of the session state.
func NewSinkSession ¶
func NewSinkSession[I BlockId, F Flags]( store BlockStore[I], statusAccumulator StatusAccumulator[I], orchestrator Orchestrator[F], stats stats.Stats, maxBlocksPerRound uint32, requester bool, ) *SinkSession[I, F]
NewSinkSession creates a new SinkSession.
func (*SinkSession[I, F]) AccumulateStatus ¶
func (ss *SinkSession[I, F]) AccumulateStatus(id I) error
Accumulates the status of the block with the given id and all of its children.
func (*SinkSession[I, F]) Cancel ¶
func (ss *SinkSession[I, F]) Cancel() error
Cancel cancels the session. The session does not *immediately* terminate; the orchestrator plays a role in deciding this. However, transfers in progress will not usually complete.
func (*SinkSession[I, F]) Close ¶
func (ss *SinkSession[I, F]) Close() error
Closes the sink session. Note that the session does not close immediately; this method will return before the session is closed. Exactly *when* the session closes is determined by the orchestrator, but in general this should be only after the session has completed all transfers which are currently in progress.
func (*SinkSession[I, F]) Done ¶
func (ss *SinkSession[I, F]) Done() <-chan error
Done returns an error channel which will be closed when the session is complete.
func (*SinkSession[I, F]) Enqueue ¶
func (ss *SinkSession[I, F]) Enqueue(id I) error
Enqueue enqueues a block for transfer. The block and its children will be retrieved from the related source session.
func (*SinkSession[I, F]) HandleBlock ¶
func (ss *SinkSession[I, F]) HandleBlock(rawBlock RawBlock[I])
HandleBlock handles a block that is being received. Adds the block to the session's store, and then queues the block for further processing.
func (*SinkSession[I, F]) HandleBlocks ¶
func (ss *SinkSession[I, F]) HandleBlocks(rawBlocks []RawBlock[I])
HandleBlocks handles a slice of blocks that are being received. Adds the blocks to the session's store, and then queues the blocks for further processing.
func (*SinkSession[I, F]) Info ¶
func (ss *SinkSession[I, F]) Info() *SinkSessionInfo[F]
Get information about this session
func (*SinkSession[I, F]) IsClosed ¶
func (ss *SinkSession[I, F]) IsClosed() bool
IsClosed returns true if the session is closed.
func (*SinkSession[I, F]) Orchestrator ¶
func (ss *SinkSession[I, F]) Orchestrator() Orchestrator[F]
Get the orchestrator for this session
func (*SinkSession[I, F]) Run ¶
func (ss *SinkSession[I, F]) Run( statusSender StatusSender[I], )
Runs the receiver session. Pulls blocks from the queue of received blocks, then checks the block's descendants to see if any are already present in the store, accumulating status accordingly. Terminates when the orchestrator's IsClosed method returns true.
func (*SinkSession[I, F]) Started ¶
func (ss *SinkSession[I, F]) Started() <-chan bool
Started returns a bool channel which will receive a value when the session has started.
type SinkSessionInfo ¶
type SinkSessionInfo[F Flags] struct {
	PendingBlocksCount uint // Number of received blocks currently pending processing
	State F // State from Orchestrator
	HavesEstimate uint // Estimate of the number of 'haves' accumulated since status last sent
	Wants uint // Number of 'wants' accumulated since status last sent
}
Struct for returning summary information about the session. This is intended to allow implementers to provide users with information concerning transfers in progress.
func (*SinkSessionInfo[F]) String ¶
func (inf *SinkSessionInfo[F]) String() string
Default string representation of SinkSessionInfo
type SourceSession ¶
SourceSession is a session that sends blocks and receives status updates. Two type parameters must be supplied: the concrete type of the BlockId and the type of the session state.
func NewSourceSession ¶
func NewSourceSession[I BlockId, F Flags]( store BlockStore[I], filter filter.Filter[I], orchestrator Orchestrator[F], stats stats.Stats, maxBlocksPerRound uint32, maxBlocksPerColdCall uint32, requester bool, ) *SourceSession[I, F]
NewSourceSession creates a new SourceSession.
func (*SourceSession[I, F]) Cancel ¶
func (ss *SourceSession[I, F]) Cancel() error
Cancel cancels the session. The session does not *immediately* terminate; the orchestrator plays a role in deciding this. However, transfers in progress will not usually complete.
func (*SourceSession[I, F]) Close ¶
func (ss *SourceSession[I, F]) Close() error
Closes the source session. Note that the session does not close immediately; this method will return before the session is closed. Exactly *when* the session closes is determined by the orchestrator, but in general this should be only after the session has completed all transfers which are currently in progress.
func (*SourceSession[I, F]) Done ¶
func (ss *SourceSession[I, F]) Done() <-chan error
Done returns an error channel that will be closed when the session is closed. If the session is closed due to an error, the error will be sent on the channel.
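A channel with these semantics can be consumed with a single receive: a closed channel yields the zero value (nil), and a buffered error sent before the close is delivered first. The sketch below demonstrates only that channel behavior; finish is a hypothetical producer standing in for a session, not the package's code.

```go
package main

import (
	"errors"
	"fmt"
)

// errDemo is an illustrative failure used by the example.
var errDemo = errors.New("transfer failed")

// finish imitates a session ending: it optionally sends an error on a
// buffered channel and then closes it. This is a stand-in, not the
// SourceSession implementation.
func finish(err error) <-chan error {
	done := make(chan error, 1)
	if err != nil {
		done <- err
	}
	close(done)
	return done
}

// waitDone blocks until the session is done and returns its error, if any.
// Receiving from a closed, empty channel returns nil.
func waitDone(done <-chan error) error {
	return <-done
}

func main() {
	fmt.Println("clean finish:", waitDone(finish(nil)))
	fmt.Println("failed finish:", waitDone(finish(errDemo)))
}
```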
func (*SourceSession[I, F]) Enqueue ¶
func (ss *SourceSession[I, F]) Enqueue(id I) error
Enqueue enqueues a block id to be sent.
func (*SourceSession[I, F]) HandleStatus ¶
func (ss *SourceSession[I, F]) HandleStatus( have filter.Filter[I], want []I, )
HandleStatus handles incoming status, updating the filter and pending blocks list.
func (*SourceSession[I, F]) Info ¶
func (ss *SourceSession[I, F]) Info() *SourceSessionInfo[F]
Retrieve information about this session
func (*SourceSession[I, F]) IsClosed ¶
func (ss *SourceSession[I, F]) IsClosed() bool
IsClosed returns true if the session is closed.
func (*SourceSession[I, F]) NumBlocksToSend ¶
func (ss *SourceSession[I, F]) NumBlocksToSend() int
func (*SourceSession[I, F]) Orchestrator ¶
func (ss *SourceSession[I, F]) Orchestrator() Orchestrator[F]
Orchestrator returns the orchestrator for this session.
func (*SourceSession[I, F]) Run ¶
func (ss *SourceSession[I, F]) Run( blockSender BlockSender[I], )
Runs the sender session. Polls the queue of pending blocks, and sends the next block and then queues the children of that block to be sent. Only sends a block if it does not match the Filter, or if it is specifically requested via Enqueue or included as a 'want' in received status.
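The selection rule in this loop can be sketched as a queue walk. The version below is a simplified model, not the session's code: node, planSend, and the exact have set are hypothetical, the real Filter is lossy rather than exact, and skipping the children of a filtered block is an assumption drawn from the description above.

```go
package main

import "fmt"

// node is a simplified block: an ID plus the IDs of its children.
type node struct {
	id       string
	children []string
}

// planSend walks the pending queue and returns the IDs that would be sent.
// A block is sent if it was explicitly requested or is absent from have;
// children are queued only for blocks that are actually sent.
func planSend(store map[string]node, have map[string]bool, requested []string) []string {
	type item struct {
		id        string
		requested bool
	}
	queue := make([]item, 0, len(requested))
	for _, id := range requested {
		queue = append(queue, item{id, true})
	}
	seen := make(map[string]bool)
	var sent []string
	for len(queue) > 0 {
		next := queue[0]
		queue = queue[1:]
		if seen[next.id] {
			continue // already considered in this round
		}
		seen[next.id] = true
		n, ok := store[next.id]
		if !ok {
			continue // the source does not hold this block
		}
		if have[next.id] && !next.requested {
			continue // the filter says the Sink already has it
		}
		sent = append(sent, n.id)
		for _, child := range n.children {
			queue = append(queue, item{child, false})
		}
	}
	return sent
}

func main() {
	store := map[string]node{
		"root": {id: "root", children: []string{"a", "b"}},
		"a":    {id: "a", children: []string{"c"}},
		"b":    {id: "b"},
		"c":    {id: "c"},
	}
	have := map[string]bool{"a": true}
	fmt.Println(planSend(store, have, []string{"root"}))
}
```

With "a" reported as present, the walk sends "root" and "b" and never reaches "c"; requesting "a" explicitly would override the filter and send it along with "c".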
func (*SourceSession[I, F]) Started ¶
func (ss *SourceSession[I, F]) Started() <-chan bool
Started returns a channel that will be closed when the session is started.
type SourceSessionInfo ¶
type SourceSessionInfo[F Flags] struct {
	PendingBlocksCount uint // Number of blocks awaiting transmission
	State F // Internal session state provided by the Orchestrator
	HavesEstimate uint // Estimate of the number of specific block Ids we know are present on the Sink
}
Struct for returning summary information about a Source Session. This allows implementers to provide user feedback on the state of in-progress transfers.
func (*SourceSessionInfo[F]) String ¶
func (inf *SourceSessionInfo[F]) String() string
Default user-friendly representation of SourceSessionInfo
type StatusAccumulator ¶
type StatusAccumulator[I BlockId] interface {
	// Have records that the Sink has a block.
	Have(I) error
	// Get the number of unique Haves since the last Send
	HaveCount() uint
	// Want records that the Sink wants a block.
	Want(I) error
	// Get the number of unique Wants since the last Send
	WantCount() uint
	// Send sends the status to the StatusSender.
	Send(StatusSender[I]) error
	// Receive records that the block id has been received, updating any accumulated status as a result.
	Receive(I) error
}
StatusAccumulator is responsible for collecting status on the Sink so that it can be sent to the Source.
type StatusReceiver ¶
type StatusReceiver[I BlockId] interface {
	// HandleStatus is called on receipt of a new status.
	// The have filter is a lossy filter of the blocks that the Sink has.
	// The want list is a list of blocks that the Sink wants.
	HandleStatus(have filter.Filter[I], want []I)
}
StatusReceiver is responsible for receiving status on the Source.
type StatusSender ¶
type StatusSender[I BlockId] interface {
	// SendStatus sends the status to the SourceSession.
	// The have filter is a lossy filter of the blocks that the SinkSession has.
	// The want list is a list of blocks that the SinkSession wants.
	SendStatus(have filter.Filter[I], want []I) error
	// Close the Status Sender
	Close() error
}
StatusSender is responsible for sending status from the Sink. The key intuition of CAR Mirror is that information about blocks already present on a Sink can be sent efficiently using a lossy filter. We also need to be able to request specific blocks from the Source. 'Status' is therefore formally a Filter of blocks the Sink already has and a list of blocks the Sink definitely wants.
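To make the "lossy filter plus want list" idea concrete, here is a small sketch that uses a Bloom filter as the lossy structure. The bloom and status types are illustrative assumptions; this document does not specify which filter implementations filter.Filter covers, and the sizes chosen here are arbitrary.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// bloom is a tiny Bloom filter: it may report false positives
// but never false negatives.
type bloom struct {
	bits []bool
	k    uint64 // number of probe positions per item
}

func newBloom(m int, k uint64) *bloom {
	return &bloom{bits: make([]bool, m), k: k}
}

// positions derives k probe positions from one 64-bit FNV hash
// using double hashing.
func (b *bloom) positions(id string) []uint64 {
	h := fnv.New64a()
	h.Write([]byte(id))
	sum := h.Sum64()
	h1, h2 := sum&0xffffffff, sum>>32
	out := make([]uint64, b.k)
	for i := uint64(0); i < b.k; i++ {
		out[i] = (h1 + i*h2) % uint64(len(b.bits))
	}
	return out
}

// Add marks id as present.
func (b *bloom) Add(id string) {
	for _, p := range b.positions(id) {
		b.bits[p] = true
	}
}

// MayContain reports whether id might have been added.
func (b *bloom) MayContain(id string) bool {
	for _, p := range b.positions(id) {
		if !b.bits[p] {
			return false
		}
	}
	return true
}

// status pairs the lossy 'have' summary with the exact 'want' list.
type status struct {
	have *bloom
	want []string
}

func main() {
	s := status{have: newBloom(1024, 3), want: []string{"block-z"}}
	s.have.Add("block-a")
	s.have.Add("block-b")
	fmt.Println("sink may have block-a:", s.have.MayContain("block-a"))
	fmt.Println("sink wants:", s.want)
}
```

The want list stays exact because a false positive in the filter could otherwise cause a block the Sink needs to be withheld.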
type SynchronizedBlockStore ¶
type SynchronizedBlockStore[I BlockId] struct { // contains filtered or unexported fields }
SynchronizedBlockStore is a BlockStore that is also thread-safe.
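A wrapper of this kind is typically a thin decorator that takes a lock around each call to the inner store. The sketch below shows the pattern with reduced, hypothetical interfaces (store, mapStore, syncStore); the use of sync.RWMutex is an assumption, since the locking strategy of SynchronizedBlockStore is not documented here.

```go
package main

import (
	"fmt"
	"sync"
)

// store is a reduced stand-in for BlockStore with string IDs.
type store interface {
	Add(id string, data []byte)
	Has(id string) bool
}

// mapStore is a plain map-backed store that is not safe for concurrent use.
type mapStore map[string][]byte

func (m mapStore) Add(id string, data []byte) { m[id] = data }
func (m mapStore) Has(id string) bool         { _, ok := m[id]; return ok }

// syncStore wraps any store and serializes access with a read-write lock.
type syncStore struct {
	mu    sync.RWMutex
	inner store
}

func newSyncStore(inner store) *syncStore {
	return &syncStore{inner: inner}
}

// Add takes the write lock because it mutates the inner store.
func (s *syncStore) Add(id string, data []byte) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.inner.Add(id, data)
}

// Has takes the read lock so that lookups can proceed in parallel.
func (s *syncStore) Has(id string) bool {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.inner.Has(id)
}

// fill adds n blocks from n goroutines to exercise the locking.
func fill(s *syncStore, n int) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			s.Add(fmt.Sprintf("block-%d", i), []byte{byte(i)})
		}(i)
	}
	wg.Wait()
}

func main() {
	s := newSyncStore(mapStore{})
	fill(s, 50)
	fmt.Println("has block-49:", s.Has("block-49"))
}
```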
func NewSynchronizedBlockStore ¶
func NewSynchronizedBlockStore[I BlockId](store BlockStore[I]) *SynchronizedBlockStore[I]
NewSynchronizedBlockStore creates a new SynchronizedBlockStore.
func (*SynchronizedBlockStore[I]) Add ¶
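func (bs *SynchronizedBlockStore[I]) Add(ctx context.Context, block RawBlock[I]) (Block[I], error)
Add adds a given RawBlock to the synchronized blockstore, returning the Block.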
func (*SynchronizedBlockStore[I]) AddMany ¶
func (bs *SynchronizedBlockStore[I]) AddMany(ctx context.Context, blocks []RawBlock[I]) ([]Block[I], error)
AddMany adds a given set of RawBlocks to the synchronized blockstore, returning the Blocks.
func (*SynchronizedBlockStore[I]) All ¶
func (bs *SynchronizedBlockStore[I]) All(ctx context.Context) (<-chan I, error)
All returns a channel that will receive all of the block IDs in this synchronized blockstore.