eestream

package
v1.13.0
Published: May 7, 2024 License: MIT Imports: 25 Imported by: 5

Documentation

Index

Constants

This section is empty.

Variables

var (
	// Error is the default eestream errs class.
	Error = errs.Class("eestream")

	// QuiescentError is the class of errors returned when a stream is quiescent
	// and should be restarted.
	QuiescentError = errs.Class("quiescence")
)

Functions

func CalcPieceSize

func CalcPieceSize(dataSize int64, scheme ErasureScheme) int64

CalcPieceSize calculates what would be the piece size of the encoded data after erasure coding data with dataSize using the given ErasureScheme.
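
For instance, a minimal sketch of computing a piece size, assuming this package lives at storj.io/uplink/private/eestream and using illustrative Reed-Solomon parameters:

package main

import (
	"fmt"

	"storj.io/uplink/private/eestream" // assumed import path
)

func main() {
	// Illustrative parameters: k=29 required pieces, n=80 total pieces,
	// 256-byte erasure shares.
	fec, err := eestream.NewFEC(29, 80)
	if err != nil {
		panic(err)
	}
	scheme := eestream.NewRSScheme(fec, 256)

	// Piece size of the encoded output for a 64 MiB segment.
	fmt.Println(eestream.CalcPieceSize(64*1024*1024, scheme))
}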

func Decode

func Decode(rrs map[int]ranger.Ranger, es ErasureScheme, mbm int, forceErrorDetection bool) (ranger.Ranger, error)

Decode takes a map of Rangers and an ErasureScheme and returns a combined Ranger.

rrs is a map of erasure piece numbers to erasure piece rangers. mbm is the maximum memory (in bytes) to be allocated for read buffers. If set to 0, the minimum possible memory will be used. If forceErrorDetection is set to true, then k+1 pieces will always be required for decoding, so that corrupted pieces can be detected.
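
A hedged sketch, assuming the pieces are in memory and that ranger.ByteRanger from the companion ranger package can wrap them (piece0 through piece2 are hypothetical inputs):

// Recovered pieces, keyed by erasure piece number.
pieces := map[int]ranger.Ranger{
	0: ranger.ByteRanger(piece0),
	1: ranger.ByteRanger(piece1),
	2: ranger.ByteRanger(piece2),
}

combined, err := eestream.Decode(pieces, scheme, 0, false) // mbm=0: minimum memory
if err != nil {
	return err
}
r, err := combined.Range(ctx, 0, combined.Size())
if err != nil {
	return err
}
defer func() { _ = r.Close() }()
data, err := io.ReadAll(r)
if err != nil {
	return err
}
_ = data // decoded stream contents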

func DecodeReaders2 added in v1.6.0

func DecodeReaders2(ctx context.Context, cancel func(), rs map[int]io.ReadCloser, es ErasureScheme, expectedSize int64, mbm int, forceErrorDetection bool) io.ReadCloser

DecodeReaders2 takes a map of readers and an ErasureScheme returning a combined Reader.

rs is a map of erasure piece numbers to erasure piece streams. expectedSize is the number of bytes expected to be returned by the Reader. mbm is the maximum memory (in bytes) to be allocated for read buffers. If set to 0, the minimum possible memory will be used. If forceErrorDetection is set to true, then k+1 pieces will always be required for decoding, so that corrupted pieces can be detected.
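
A minimal usage sketch, assuming pieceStreams maps erasure piece numbers to already-opened piece streams and scheme is an ErasureScheme from NewRSScheme:

ctx, cancel := context.WithCancel(context.Background())
decoded := eestream.DecodeReaders2(ctx, cancel, pieceStreams, scheme, expectedSize, 0, false)
defer func() { _ = decoded.Close() }()

data, err := io.ReadAll(decoded)
if err != nil {
	return err
}
_ = data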

func EncodeReader2 added in v1.6.0

func EncodeReader2(ctx context.Context, r io.Reader, rs RedundancyStrategy) (_ []io.ReadCloser, err error)

EncodeReader2 takes a Reader and a RedundancyStrategy and returns a slice of io.ReadClosers.
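
A hedged sketch of encoding an in-memory segment (parameters are illustrative; ctx and segment are assumed inputs):

fec, err := eestream.NewFEC(2, 4) // k=2 of n=4, for illustration
if err != nil {
	return err
}
scheme := eestream.NewRSScheme(fec, 64)                  // 64-byte erasure shares
rs, err := eestream.NewRedundancyStrategy(scheme, 0, 0) // thresholds default to TotalCount
if err != nil {
	return err
}

readers, err := eestream.EncodeReader2(ctx, bytes.NewReader(segment), rs)
if err != nil {
	return err
}
for num, r := range readers {
	piece, err := io.ReadAll(r) // encoded piece `num`, ready to store
	_ = r.Close()
	if err != nil {
		return err
	}
	_, _ = num, piece
}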

func NewFEC added in v1.12.2

func NewFEC(k, n int) (*infectious.FEC, error)

NewFEC creates a *FEC using k required pieces and n total pieces. Encoding data with this *FEC will generate n pieces, and decoding data requires k uncorrupted pieces. During decode, when more than k pieces exist, corrupted data can be detected and recovered from.

Types

type Batch added in v1.13.0

type Batch struct {
	// contains filtered or unexported fields
}

A Batch is a reference counted slice of erasure shares. Batches are returned by BatchPool.Get with a starting reference count of 1.

func (*Batch) Claim added in v1.13.0

func (b *Batch) Claim() bool

Claim adds 1 to the batch reference count and returns true if the batch was claimable. See Release.

func (*Batch) Release added in v1.13.0

func (b *Batch) Release()

Release subtracts 1 from the batch reference count, returning the batch to the pool when it hits zero. Future Claim calls will return false once the counter drops to zero.

func (*Batch) Slice added in v1.13.0

func (b *Batch) Slice() []byte

Slice returns the batch's underlying memory allocation.

type BatchPool added in v1.13.0

type BatchPool struct {
	// contains filtered or unexported fields
}

A BatchPool is a sync.Pool that deals with batches of erasure shares, serialized as []byte slices of a fixed size. The fixed size is the largest multiple of the erasure share size that fits in standardBufSize.

func NewBatchPool added in v1.13.0

func NewBatchPool(shareSize int) *BatchPool

NewBatchPool creates a BatchPool with the given erasure share size.

func (*BatchPool) GetAndClaim added in v1.13.0

func (b *BatchPool) GetAndClaim() *Batch

GetAndClaim returns a batch from the pool with its reference count already at 1. To free the batch, a Release call is needed.
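
A sketch of the intended reference-counting pattern, on the assumption that Release is the call that frees the reference acquired by GetAndClaim:

pool := eestream.NewBatchPool(256) // 256-byte erasure shares

batch := pool.GetAndClaim() // reference count starts at 1
buf := batch.Slice()

// Handing the batch to another goroutine: it must Claim before use
// and Release when done.
if batch.Claim() {
	go func() {
		defer batch.Release()
		_ = buf // read erasure shares out of buf
	}()
}

batch.Release() // drop our own reference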

func (*BatchPool) Size added in v1.13.0

func (b *BatchPool) Size() int

Size returns the buffer size used in this pool.

type EncodedRanger

type EncodedRanger struct {
	// contains filtered or unexported fields
}

EncodedRanger will take an existing Ranger and provide a means to get multiple Ranged sub-Readers. EncodedRanger does not match the normal Ranger interface.

func NewEncodedRanger

func NewEncodedRanger(rr ranger.Ranger, rs RedundancyStrategy) (*EncodedRanger, error)

NewEncodedRanger creates an EncodedRanger from the given Ranger and RedundancyStrategy. See NewRedundancyStrategy for the meaning of the repair and optimal thresholds.

func (*EncodedRanger) OutputSize

func (er *EncodedRanger) OutputSize() int64

OutputSize is like Ranger.Size but returns the Size of the erasure encoded pieces that come out.

func (*EncodedRanger) Range

func (er *EncodedRanger) Range(ctx context.Context, offset, length int64) (_ []io.ReadCloser, err error)

Range is like Ranger.Range, but returns a slice of Readers.
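
A sketch, reusing the assumed ranger.ByteRanger wrapper from the Decode example above (offset and length are taken to refer to the unencoded data, which is an assumption):

er, err := eestream.NewEncodedRanger(ranger.ByteRanger(segment), rs)
if err != nil {
	return err
}
readers, err := er.Range(ctx, 0, int64(len(segment)))
if err != nil {
	return err
}
for num, r := range readers {
	// r streams the matching range of encoded piece `num`.
	_ = num
	_ = r.Close()
}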

type ErasureScheme

type ErasureScheme interface {
	// Encode will take 'in' and call 'out' with erasure coded pieces.
	Encode(in []byte, out func(num int, data []byte)) error

	// EncodeSingle will take 'in' with the stripe and fill 'out' with the erasure share for piece 'num'.
	EncodeSingle(in, out []byte, num int) error

	// Decode will take a mapping of available erasure coded piece num -> data,
	// 'in', and append the combined data to 'out', returning it.
	Decode(out []byte, in []infectious.Share) ([]byte, error)

	// Rebuild is a direct call to infectious.Rebuild, which does no error
	// detection and is faster.
	Rebuild(in []infectious.Share, out func(infectious.Share)) error

	// ErasureShareSize is the size of the erasure shares that come from Encode
	// and are passed to Decode.
	ErasureShareSize() int

	// StripeSize is the size the stripes that are passed to Encode and come
	// from Decode.
	StripeSize() int

	// Encode will generate this many erasure shares and therefore this many pieces.
	TotalCount() int

	// Decode requires at least this many pieces.
	RequiredCount() int
}

ErasureScheme represents the general format of any erasure scheme algorithm. If this interface can be implemented, the rest of this library will work with it.

func NewRSScheme

func NewRSScheme(fc *infectious.FEC, erasureShareSize int) ErasureScheme

NewRSScheme returns a Reed-Solomon-based ErasureScheme.
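
A hedged round-trip sketch through Encode and Decode, using the Share alias defined below (parameters are illustrative):

fec, err := eestream.NewFEC(2, 4)
if err != nil {
	return err
}
scheme := eestream.NewRSScheme(fec, 16) // 16-byte shares, so StripeSize() == 32

stripe := make([]byte, scheme.StripeSize())
copy(stripe, "hello, erasure world")

// Encode one stripe into TotalCount() erasure shares.
shares := make([]eestream.Share, 0, scheme.TotalCount())
err = scheme.Encode(stripe, func(num int, data []byte) {
	shares = append(shares, eestream.Share{
		Number: num,
		Data:   append([]byte(nil), data...), // copy: data may be reused
	})
})
if err != nil {
	return err
}

// Decode from k+1 shares, so a corrupted share could be detected.
out, err := scheme.Decode(nil, shares[:scheme.RequiredCount()+1])
if err != nil {
	return err
}
_ = out // equals stripe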

func NewUnsafeRSScheme

func NewUnsafeRSScheme(fc *infectious.FEC, erasureShareSize int) ErasureScheme

NewUnsafeRSScheme returns a Reed-Solomon-based ErasureScheme without error correction.

type PiecesProgress added in v1.13.0

type PiecesProgress struct {
	// contains filtered or unexported fields
}

PiecesProgress is an interesting concurrency primitive we don't know what to call, but it's kind of like those old punch clocks that employees would clock in and out of. Imagine that Looney Tunes episode where Sam the Sheepdog and Wile E. Coyote are clocking in and out.

In our case, PiecesProgress is two dimensional:

  • There is the neededShares vs total dimension - this is the number of shares that are necessary for Reed Solomon construction, for instance.
  • There is the current watermark of what stripe specific pieces are working on.

A number of piece readers keep PiecesProgress updated with how many shares they have downloaded within their piece. When a piece reader reports that it has downloaded n more shares, and that report is what brings a certain watermark or stripe up to the neededShares necessary shares, PiecesProgress tells that piece reader to wake up the combining layer.

This data structure is designed to trigger these wakeups as rarely as possible.
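
A sketch of that wakeup pattern (the channel and goroutine structure is illustrative, not part of the API):

progress := eestream.NewPiecesProgress(29, 80) // 29 shares needed, 80 piece readers
wake := make(chan struct{}, 1)

// A piece reader, after completing one more share for its piece index:
onShare := func(idx int) {
	if progress.SharesCompleted(idx, 1) {
		select { // wake the combiner without blocking if it's already awake
		case wake <- struct{}{}:
		default:
		}
	}
}

// The combining layer:
go func() {
	for range wake {
		progress.AcknowledgeNewStripes() // re-arm alarms before combining
		// ... combine every stripe that now has enough shares ...
	}
}()
_ = onShare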

func NewPiecesProgress added in v1.13.0

func NewPiecesProgress(minimum, total int32) *PiecesProgress

NewPiecesProgress constructs PiecesProgress with a neededShares number of necessary shares per stripe, out of total shares. PiecesProgress doesn't care how many stripes there are but will keep track of which stripe each share reader is on.

func (*PiecesProgress) AcknowledgeNewStripes added in v1.13.0

func (y *PiecesProgress) AcknowledgeNewStripes()

AcknowledgeNewStripes tells PiecesProgress that the combiner has woken up and new alarms are okay to trigger.

func (*PiecesProgress) IncreaseNeededShares added in v1.13.0

func (y *PiecesProgress) IncreaseNeededShares() bool

IncreaseNeededShares tells PiecesProgress that going forward, we need more shares per watermark value.

func (*PiecesProgress) NeededShares added in v1.13.0

func (y *PiecesProgress) NeededShares() int32

NeededShares returns the current value of the number of shares required at a given watermark.

func (*PiecesProgress) PieceSharesReceived added in v1.13.0

func (y *PiecesProgress) PieceSharesReceived(idx int) int32

PieceSharesReceived returns the current watermark for reader idx.

func (*PiecesProgress) ProgressSnapshot added in v1.13.0

func (y *PiecesProgress) ProgressSnapshot(out []int32) []int32

ProgressSnapshot returns a snapshot of the current progress. No locks are held so it doesn't represent a single point in time in the presence of concurrent mutations.

func (*PiecesProgress) SetStripesNeeded added in v1.13.0

func (y *PiecesProgress) SetStripesNeeded(required int32)

SetStripesNeeded tells PiecesProgress which stripe is needed next.

func (*PiecesProgress) SharesCompleted added in v1.13.0

func (y *PiecesProgress) SharesCompleted(idx int, delta int32) bool

SharesCompleted adds some read events to a given index. If SharesCompleted returns true, then the calling reader should wake up the combiner.

type RedundancyStrategy

type RedundancyStrategy struct {
	ErasureScheme
	// contains filtered or unexported fields
}

RedundancyStrategy is an ErasureScheme with a repair and optimal thresholds.

func NewRedundancyStrategy

func NewRedundancyStrategy(es ErasureScheme, repairThreshold, optimalThreshold int) (RedundancyStrategy, error)

NewRedundancyStrategy from the given ErasureScheme, repair and optimal thresholds.

repairThreshold is the minimum repair threshold. If set to 0, it will be reset to the TotalCount of the ErasureScheme. optimalThreshold is the optimal threshold. If set to 0, it will be reset to the TotalCount of the ErasureScheme.
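
For example, with illustrative numbers:

// Repair when fewer than 35 healthy pieces remain; consider the upload
// successful once 65 pieces are stored.
rs, err := eestream.NewRedundancyStrategy(scheme, 35, 65)
if err != nil {
	return err
}
_ = rs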

func NewRedundancyStrategyFromProto

func NewRedundancyStrategyFromProto(scheme *pb.RedundancyScheme) (RedundancyStrategy, error)

NewRedundancyStrategyFromProto creates new RedundancyStrategy from the given RedundancyScheme protobuf.

func NewRedundancyStrategyFromStorj

func NewRedundancyStrategyFromStorj(scheme storj.RedundancyScheme) (RedundancyStrategy, error)

NewRedundancyStrategyFromStorj creates new RedundancyStrategy from the given storj.RedundancyScheme.

func (*RedundancyStrategy) OptimalThreshold

func (rs *RedundancyStrategy) OptimalThreshold() int

OptimalThreshold is the number of available erasure pieces above which there is no need for the data to be repaired.

func (*RedundancyStrategy) RepairThreshold

func (rs *RedundancyStrategy) RepairThreshold() int

RepairThreshold is the number of available erasure pieces below which the data must be repaired to avoid loss.

type Share added in v1.12.2

type Share = infectious.Share

A Share represents a piece of the FEC-encoded data.

type StreamingPiece added in v1.13.0

type StreamingPiece struct {
	// contains filtered or unexported fields
}

A StreamingPiece is an in-memory storage location for a stream of bytes being operated on by a single producer and a single consumer in atomic units of a given erasure share size. The StreamingPiece type must know its full expected size up front, and allocates slots for each BatchPool batch of erasure shares up to that total size. It will hydrate these slots on demand and free them back to the BatchPool as they are consumed.

func NewStreamingPiece added in v1.13.0

func NewStreamingPiece(shareSize int, totalSize int64, pool *BatchPool) *StreamingPiece

NewStreamingPiece creates a buffer that stores data in units of size shareSize, with a total size of totalSize bytes. It uses pool to hydrate and return the buffers in its slots.

func (*StreamingPiece) Err added in v1.13.0

func (b *StreamingPiece) Err() error

Err returns the last error encountered during reading.

func (*StreamingPiece) MarkCompleted added in v1.13.0

func (b *StreamingPiece) MarkCompleted(sharesCompleted int)

MarkCompleted tells the StreamingPiece to return some internal batches back to the BatchPool, since we don't need them anymore. It will assume that none of the first sharesCompleted units will be asked for again.

func (*StreamingPiece) ReadShare added in v1.13.0

func (b *StreamingPiece) ReadShare(shareIdx int) (data []byte, release func(), err error)

ReadShare returns a byte slice referencing the data for the share with index shareIdx. Note that shareIdx is not the Reed-Solomon share number, since all shares in this buffer share the same Reed-Solomon share number. If the share at shareIdx cannot be returned, ReadShare returns an error, which may be a read error determined by ReadSharesFrom. The release callback must be called when the share is done being read.

func (*StreamingPiece) ReadSharesFrom added in v1.13.0

func (b *StreamingPiece) ReadSharesFrom(r io.Reader) (shareCount int, done bool)

ReadSharesFrom calls r.Read() once and returns the number of full shares that are newly and completely read as a result of that call. If r.Read() returns an error or io.EOF, or if no more data is expected, done will be true. Any read error is available from Err() or ReadShare().
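
Putting these methods together, a hedged sketch of filling a StreamingPiece from a stream and consuming its shares (src, shareSize, and totalSize are assumed inputs):

pool := eestream.NewBatchPool(shareSize)
piece := eestream.NewStreamingPiece(shareSize, totalSize, pool)

// Producer: pull from the stream until the piece reports it is done.
completed := 0
for {
	n, done := piece.ReadSharesFrom(src)
	completed += n
	if done {
		break
	}
}
if err := piece.Err(); err != nil && !errors.Is(err, io.EOF) {
	return err
}

// Consumer: read each share, then hand the batches back to the pool.
for i := 0; i < completed; i++ {
	data, release, err := piece.ReadShare(i)
	if err != nil {
		return err
	}
	_ = data // use the share
	release()
}
piece.MarkCompleted(completed)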

type StripeReader

type StripeReader struct {
	// contains filtered or unexported fields
}

StripeReader reads from a collection of piece io.ReadClosers in parallel, recombining them into a single stream using an ErasureScheme.

func NewStripeReader

func NewStripeReader(readers map[int]io.ReadCloser, scheme ErasureScheme, totalStripes int,
	errorDetection bool) *StripeReader

NewStripeReader makes a new StripeReader using the provided map of share number to io.ReadClosers, an ErasureScheme, the total number of stripes in the stream, and whether or not to use the Erasure Scheme's error detection.

func (*StripeReader) Close

func (s *StripeReader) Close() error

Close does *not* close the readers it received in the constructor. Close does *not* wait for reader goroutines to shut down. See CloseAndWait if you want other behavior. Close mimics the older eestream.StripeReader behavior.

func (*StripeReader) CloseAndWait added in v1.13.0

func (s *StripeReader) CloseAndWait() error

CloseAndWait closes all readers and waits for all goroutines.

func (*StripeReader) ReadStripes added in v1.12.2

func (s *StripeReader) ReadStripes(ctx context.Context, nextStripe int64, out []byte) (_ []byte, count int, err error)

ReadStripes returns 1 or more stripes. out is overwritten.
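
A usage sketch, with readers, scheme, and totalStripes as described for NewStripeReader (process is a hypothetical consumer of the decoded bytes):

sr := eestream.NewStripeReader(readers, scheme, totalStripes, true)
defer func() { _ = sr.CloseAndWait() }()

var buf []byte
for next := int64(0); next < int64(totalStripes); {
	out, count, err := sr.ReadStripes(ctx, next, buf[:0])
	if err != nil {
		return err
	}
	process(out) // `count` stripes of scheme.StripeSize() bytes each
	next += int64(count)
	buf = out
}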

