core

package
v0.0.0-...-ee0e00b Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 16, 2023 License: Apache-2.0 Imports: 16 Imported by: 1

Documentation

Overview

Package carmirror provides a generic Go implementation of the CAR Mirror protocol.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func BlockEqual

func BlockEqual[I BlockId](a RawBlock[I], b RawBlock[I]) bool

BlockEqual returns true if the two blocks are equal.

Types

type Block

type Block[I BlockId] interface {
	RawBlock[I]

	// Children returns the BlockIds of the children of this block.
	Children() []I
}

Block is an immutable data block referenced by a unique ID. It may reference other blocks by Id.

type BlockId

type BlockId interface {
	comparable
	encoding.BinaryMarshaler
	json.Marshaler
	cbor.Marshaler
	String() string
}

BlockId represents a unique identifier for a Block. This interface only represents the identifier, not the Block. The interface is chosen for compatibility with ipfs/go-cid - noting that go-cid is, for the moment, comparable.

It's annoying that go-cid doesn't implement cbor.Marshaler/cbor.Unmarshaler because the binary representation of a CID and the CBOR canonical representation are quite different:

Unfortunately, we intend to leverage the CAR v1 spec for compatibility, which, oddly, specifies using the CBOR form of the CID in the header and the raw byte form in the body. Thus, our interface here needs to be able support both.

type BlockIdRef

type BlockIdRef[T BlockId] interface {
	*T
	encoding.BinaryUnmarshaler
	json.Unmarshaler
	cbor.Unmarshaler
	Read(ByteAndBlockReader) (int, error)
}

BlockIdRef is a reference to a BlockId. In Go, you are not supposed to have a mix of pointer and non-pointer receivers for the same interface. go-cid has a mix of pointer and non-pointer receivers. Since unmarshaling a BlockId will require a pointer receiver in order to update the BlockId, we needed a separate interface.

type BlockReceiver

type BlockReceiver[I BlockId] interface {
	// HandleBlock is called on receipt of a new block.
	HandleBlock(block RawBlock[I])

	// HandleBlocks is called on receipt of a new batch of blocks.
	HandleBlocks(block []RawBlock[I])
}

BlockReceiver is responsible for receiving blocks at the Sink.

type BlockSender

type BlockSender[I BlockId] interface {
	// Send a block
	SendBlock(block RawBlock[I]) error
	// Ensure any blocks queued for sending are actually sent; should block until all blocks are sent.
	Flush() error
	// Close the sender gracefully, ensuring any pending blocks are flushed.
	Close() error
	// Len returns the number of blocks queued for sending.
	Len() int
}

BlockSender is responsible for sending blocks from the Source, immediately and asynchronously, or via a buffer. The details are up to the implementor.

type BlockStore

type BlockStore[I BlockId] interface {
	ReadableBlockStore[I]

	// Add adds a given block into the blockstore.
	Add(context.Context, RawBlock[I]) (Block[I], error)

	// AddMany adds a given set of blocks into the blockstore.
	AddMany(context.Context, []RawBlock[I]) ([]Block[I], error)
}

BlockStore represents read and write operations for a store of blocks.

type ByteAndBlockReader

type ByteAndBlockReader interface {
	io.Reader
	io.ByteReader
}

ByteAndBlockReader is an io.Reader that also implements io.ByteReader.

type Flags

type Flags constraints.Unsigned

Flags represent the internal state of a session.

type MutablePointerResolver

type MutablePointerResolver[I BlockId] interface {
	// Resolve attempts to resolve ptr into a block ID.
	Resolve(ptr string) (I, error)
}

MutablePointerResolver is responsible for resolving a pointer into a BlockId.

type Orchestrator

type Orchestrator[F Flags] interface {
	// Notify is used to notify the orchestrator of an event.
	Notify(SessionEvent) error
	// Get the current state of the Orchestrator.
	State() F
	// IsClosed returns true if the orchestrator is closed.
	IsClosed() bool

	// IsRequester returns true if the Orchestrator is the requester.
	IsRequester() bool

	// IsSafeStateToClose returns true if the state of the Orchestrator indicates that the session should close.
	// Other factors may need to be taken into account as well to determine if close should happen, but this
	// tells us if the states indicate a close is safe.
	IsSafeStateToClose() bool

	// ShouldFlush returns true if the state of the Orchestrator indicates that the session should flush.
	ShouldFlush() bool
}

Orchestrator is responsible for managing the flow of blocks and/or status. Typically the Orchestrator is some external object such as a connection which is aware of details which are specific to the implementation (such as whether the communication is synchronous or batch oriented).

type RawBlock

type RawBlock[I BlockId] interface {
	// Id returns the BlockId for the Block.
	Id() I
	// RawData returns the raw data bytes for the Block.
	RawData() []byte
	// Size returns the size of the block
	Size() int64
}

RawBlock represents a raw block before any association with a blockstore.

type ReadableBlockStore

type ReadableBlockStore[I BlockId] interface {
	// Get returns the block from the blockstore with the given ID.
	Get(context.Context, I) (Block[I], error)

	// Has returns true if the blockstore has a block with the given ID.
	Has(context.Context, I) (bool, error)

	// All returns a channel that will receive all of the block IDs in this store.
	All(context.Context) (<-chan I, error)
}

ReadableBlockStore represents read operations for a store of blocks.

type SessionEvent

type SessionEvent uint16

SessionEvent is an event that can occur during a session. When events occur, they trigger updates to session state. The Orchestrator, which is typically some object outside the session which is aware of implementation specific information about the channel over which communication is actually occuring, may also 'wait' on notification of an event.

const (
	BEGIN_SESSION    SessionEvent = iota // A session has begun
	END_SESSION                          // A session has ended
	BEGIN_DRAINING                       // No more data is currently available for processing
	END_DRAINING                         // Session has completed processing related to the draining event
	BEGIN_CLOSE                          // The session has been notified it should close when current processing is complete
	END_CLOSE                            // Immediate actions to start the session closing are complete
	BEGIN_FLUSH                          // Flush has been called on a SimpleBatchBlockSender to send a batch of blocks from Source to Sink
	END_FLUSH                            // Flush has completed
	BEGIN_SEND                           // On Source, an individual block has been sent to the BlockSender. On Sink, we are 'sending' status to the StatusAccumulator
	END_SEND                             // Send has completed
	BEGIN_RECEIVE                        // An individual block or status update is received
	END_RECEIVE                          // Receive operation is completed
	BEGIN_PROCESSING                     // Begin main processing loop
	END_PROCESSING                       // End main procesing loop
	// TODO: Change comments to allow BATCH to be about blocks and status too.
	BEGIN_BATCH   // SimpleBatchBlockReceiver has started processing a batch of blocks on the Sink
	END_BATCH     // Finished processing a batch of blocks
	BEGIN_ENQUEUE // Request made to start transfer of a specific block and its children
	END_ENQUEUE   // Enqueue is complete
	CANCEL        // Request made to immediately end session and abandon any current transfer
)

Core session event constants.

func (SessionEvent) String

func (se SessionEvent) String() string

String returns a string representation of the session event.

type SimpleStatusAccumulator

type SimpleStatusAccumulator[I BlockId] struct {
	// contains filtered or unexported fields
}

SimpleStatusAccumulator is a simple implementation of StatusAccumulator. Its operations are protected by a mutex, so it is safe to use from multiple goroutines.

func NewSimpleStatusAccumulator

func NewSimpleStatusAccumulator[I BlockId](filter filter.Filter[I]) *SimpleStatusAccumulator[I]

NewSimpleStatusAccumulator creates a new SimpleStatusAccumulator.

func (*SimpleStatusAccumulator[I]) Have

func (ssa *SimpleStatusAccumulator[I]) Have(id I) error

Have marks the given block as having been received.

func (*SimpleStatusAccumulator[I]) HaveCount

func (ssa *SimpleStatusAccumulator[I]) HaveCount() uint

HaveCount returns the number of blocks that have been received.

func (*SimpleStatusAccumulator[I]) Receive

func (ssa *SimpleStatusAccumulator[I]) Receive(id I) error

Receive marks the given block as received.

func (*SimpleStatusAccumulator[I]) Send

func (ssa *SimpleStatusAccumulator[I]) Send(sender StatusSender[I]) error

Send sends the current status using the given StatusSender.

func (*SimpleStatusAccumulator[I]) Want

func (ssa *SimpleStatusAccumulator[I]) Want(id I) error

Want marks the given block as wanted.

func (*SimpleStatusAccumulator[I]) WantCount

func (ssa *SimpleStatusAccumulator[I]) WantCount() uint

WantCount returns the number of blocks that are currently wanted.

type SinkSession

type SinkSession[
	I BlockId,
	F Flags,
] struct {
	// contains filtered or unexported fields
}

SinkSession is a session that receives blocks and sends out status updates. Two type parameters must be supplied, which are the concrete type of the BlockId and the session State

func NewSinkSession

func NewSinkSession[I BlockId, F Flags](
	store BlockStore[I],
	statusAccumulator StatusAccumulator[I],
	orchestrator Orchestrator[F],
	stats stats.Stats,
	maxBlocksPerRound uint32,
	requester bool,
) *SinkSession[I, F]

NewSinkSession creates a new SinkSession.

func (*SinkSession[I, F]) AccumulateStatus

func (ss *SinkSession[I, F]) AccumulateStatus(id I) error

Accumulates the status of the block with the given id and all of its children.

func (*SinkSession[I, F]) Cancel

func (ss *SinkSession[I, F]) Cancel() error

Cancel cancels the session. The session does not *immediately* terminate; the orchestrator plays a role in deciding this. However, transfers in progress will not usually complete.

func (*SinkSession[I, F]) Close

func (ss *SinkSession[I, F]) Close() error

Closes the sink session. Note that the session does not close immediately; this method will return before the session is closed. Exactly *when* the session closes is determined by the orchestrator, but in general this should be only after the session has completed all transfers which are currently in progresss.

func (*SinkSession[I, F]) Done

func (ss *SinkSession[I, F]) Done() <-chan error

Done returns an error channel which will be closed when the session is complete.

func (*SinkSession[I, F]) Enqueue

func (ss *SinkSession[I, F]) Enqueue(id I) error

Enqueue a block for transfer (will retrieve block and children from the related source session)

func (*SinkSession[I, F]) HandleBlock

func (ss *SinkSession[I, F]) HandleBlock(rawBlock RawBlock[I])

HandleBlock handles a block that is being received. Adds the block to the session's store, and then queues the block for further processing.

func (*SinkSession[I, F]) HandleBlocks

func (ss *SinkSession[I, F]) HandleBlocks(rawBlocks []RawBlock[I])

HandleBlocks handles a slice of blocks that are being received. Adds the blocks to the session's store, and then queues the blocks for further processing.

func (*SinkSession[I, F]) Info

func (ss *SinkSession[I, F]) Info() *SinkSessionInfo[F]

Get information about this session

func (*SinkSession[I, F]) IsClosed

func (ss *SinkSession[I, F]) IsClosed() bool

IsClosed returns true if the session is closed.

func (*SinkSession[I, F]) Orchestrator

func (ss *SinkSession[I, F]) Orchestrator() Orchestrator[F]

Get the orchestrator for this session

func (*SinkSession[I, F]) Run

func (ss *SinkSession[I, F]) Run(
	statusSender StatusSender[I],
)

Runs the receiver session. Pulls blocks from the queue of received blocks, then checks the block descendents to see if any are already present in the store, accumulating status accordingly. Terminates when the orchestrator's IsClosed method returns true.

func (*SinkSession[I, F]) Started

func (ss *SinkSession[I, F]) Started() <-chan bool

Started returns a bool channel which will receive a value when the session has started.

type SinkSessionInfo

type SinkSessionInfo[F Flags] struct {
	PendingBlocksCount uint // Number of received blocks currently pending processing
	State              F    // State from Orchestrator
	HavesEstimate      uint // Estimate of the number of 'haves' accumulated since status last sent
	Wants              uint // Number of 'wants' accumulated since status last sent
}

Struct for returning summary information about the session. This is intended to allow implementers to provide users with information concerning transfers in progress.

func (*SinkSessionInfo[F]) String

func (inf *SinkSessionInfo[F]) String() string

Default string representation of SinkSessionInfo

type SourceSession

type SourceSession[
	I BlockId,
	F Flags,
] struct {
	// contains filtered or unexported fields
}

SinkSession is a session that sends blocks and receives status updates. Two type parameters must be supplied, which are the concrete type of the BlockId and the session State

func NewSourceSession

func NewSourceSession[I BlockId, F Flags](
	store BlockStore[I],
	filter filter.Filter[I],
	orchestrator Orchestrator[F],
	stats stats.Stats,
	maxBlocksPerRound uint32,
	maxBlocksPerColdCall uint32,
	requester bool,
) *SourceSession[I, F]

NewSourceSession creates a new SourceSession.

func (*SourceSession[I, F]) Cancel

func (ss *SourceSession[I, F]) Cancel() error

Cancel cancels the session. The session does not *immediately* terminate; the orchestrator plays a role in deciding this. However, transfers in progress will not usually complete.

func (*SourceSession[I, F]) Close

func (ss *SourceSession[I, F]) Close() error

Closes the source session. Note that the session does not close immediately; this method will return before the session is closed. Exactly *when* the session closes is determined by the orchestrator, but in general this should be only after the session has completed all transfers which are currently in progresss.

func (*SourceSession[I, F]) Done

func (ss *SourceSession[I, F]) Done() <-chan error

Done returns an error channel that will be closed when the session is closed. If the session is closed due to an error, the error will be sent on the channel.

func (*SourceSession[I, F]) Enqueue

func (ss *SourceSession[I, F]) Enqueue(id I) error

Enqueue enqueues a block id to be sent.

func (*SourceSession[I, F]) HandleStatus

func (ss *SourceSession[I, F]) HandleStatus(
	have filter.Filter[I],
	want []I,
)

HandleStatus handles incoming status, updating the filter and pending blocks list.

func (*SourceSession[I, F]) Info

func (ss *SourceSession[I, F]) Info() *SourceSessionInfo[F]

Retrieve information about this session

func (*SourceSession[I, F]) IsClosed

func (ss *SourceSession[I, F]) IsClosed() bool

IsClosed returns true if the session is closed.

func (*SourceSession[I, F]) NumBlocksToSend

func (ss *SourceSession[I, F]) NumBlocksToSend() int

func (*SourceSession[I, F]) Orchestrator

func (ss *SourceSession[I, F]) Orchestrator() Orchestrator[F]

Retrieve the current session state

func (*SourceSession[I, F]) Run

func (ss *SourceSession[I, F]) Run(
	blockSender BlockSender[I],
)

Runs the sender session. Polls the queue of pending blocks, and sends the next block and then queues the children of that block to be sent. Only sends a block if it does not match the Filter, or if it is specifically requested via Enqueue or included as a 'want' in received status.

func (*SourceSession[I, F]) Started

func (ss *SourceSession[I, F]) Started() <-chan bool

Started returns a channel that will be closed when the session is started.

type SourceSessionInfo

type SourceSessionInfo[F Flags] struct {
	PendingBlocksCount uint // Number of blocks awaiting transmission
	State              F    // Internal sessions state provided by the Orchestraotr
	HavesEstimate      uint // Estimate of the number of specific block Ids we know are present on the Sink
}

Struct for returting summary information about a Source Session. This is allows implementers to provide user feedback on the state of in-progress transfers.

func (*SourceSessionInfo[F]) String

func (inf *SourceSessionInfo[F]) String() string

Default user-friendly representation of SourceSessionInfo

type StatusAccumulator

type StatusAccumulator[I BlockId] interface {
	// Have records that the Sink has a block.
	Have(I) error
	// Get the number of unique Haves since the last Send
	HaveCount() uint
	// Want records that the Sink wants a block.
	Want(I) error
	// Get the number of unique Wants since the last Send
	WantCount() uint

	// Send sends the status to the StatusSender.
	Send(StatusSender[I]) error

	// Receive records that the block id has been received, updating any accumulated status as a result.
	Receive(I) error
}

StatusAccumulator is responsible for collecting status on the Sink so that it can be sent to the Source.

type StatusReceiver

type StatusReceiver[I BlockId] interface {
	// HandleStatus is called on receipt of a new status.
	// The have filter is a lossy filter of the blocks that the Sink has.
	// The want list is a list of blocks that the Sink wants.
	HandleStatus(have filter.Filter[I], want []I)
}

StatusReceiver is responsible for receiving status on the Source.

type StatusSender

type StatusSender[I BlockId] interface {
	// SendStatus sends the status to the SourceSession.
	// The have filter is a lossy filter of the blocks that the SinkSession has.
	// The want list is a list of blocks that the SinkSession wants.
	SendStatus(have filter.Filter[I], want []I) error

	// Close the Status Sender
	Close() error
}

StatusSender is responsible for sending status from the Sink. The key intuition of CAR Mirror is information about blocks already present on a Sink can be sent efficiently using a lossy filter. We also need to be able to request specific blocks from the source. 'Status' is therefore formally a Filter of blocks the source already have and a list of blocks the source definitely wants.

type SynchronizedBlockStore

type SynchronizedBlockStore[I BlockId] struct {
	// contains filtered or unexported fields
}

SynchronizedBlockStore is a BlockStore that is also thread-safe.

func NewSynchronizedBlockStore

func NewSynchronizedBlockStore[I BlockId](store BlockStore[I]) *SynchronizedBlockStore[I]

NewSynchronizedBlockStore creates a new SynchronizedBlockStore.

func (*SynchronizedBlockStore[I]) Add

func (bs *SynchronizedBlockStore[I]) Add(ctx context.Context, block RawBlock[I]) (Block[I], error)

Add adds a given RawBlock to the synchronized blockstore, returnng the Block.

func (*SynchronizedBlockStore[I]) AddMany

func (bs *SynchronizedBlockStore[I]) AddMany(ctx context.Context, blocks []RawBlock[I]) ([]Block[I], error)

AddMany adds a given set of RawBlocks to the synchronized blockstore, returning the Blocks.

func (*SynchronizedBlockStore[I]) All

func (bs *SynchronizedBlockStore[I]) All(ctx context.Context) (<-chan I, error)

All returns a channel that will receive all of the block IDs in this synchronized blockstore.

func (*SynchronizedBlockStore[I]) Get

func (bs *SynchronizedBlockStore[I]) Get(ctx context.Context, id I) (Block[I], error)

Get returns the block from the synchronized blockstore with the given ID.

func (*SynchronizedBlockStore[I]) Has

func (bs *SynchronizedBlockStore[I]) Has(ctx context.Context, id I) (bool, error)

Has returns true if the synchronized blockstore has a block with the given ID.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL