Documentation ¶
Index ¶
- Constants
- Variables
- func EventsUnaryClientInterceptor(collector *events.Collector) grpc.UnaryClientInterceptor
- func GetHost(err error) string
- func GetJsonEncodedRequest(err error) (string, error)
- func GetRequest(err error) interface{}
- func GetRpc(err error) string
- func GetStatus(err error) *status.Status
- func HashSetToSlices(hashes hash.HashSet) ([]hash.Hash, [][]byte)
- func HashesToSlices(hashes []hash.Hash) [][]byte
- func HttpPostUpload(ctx context.Context, httpFetcher HTTPFetcher, ...) error
- func IsChunkStoreRpcErr(err error) bool
- func ParseByteSlices(byteSlices [][]byte) (hash.HashSet, map[hash.Hash]int)
- func RetryingUnaryClientInterceptor(ctx context.Context, method string, req, reply interface{}, ...) error
- func StatsFlusherToColorError(r StatsRecorder)
- type CacheStats
- type ChunkCache
- type ConcurrencyParams
- type DoltChunkStore
- func (dcs *DoltChunkStore) AddTableFilesToManifest(ctx context.Context, fileIdToNumChunks map[string]int) error
- func (dcs *DoltChunkStore) Close() error
- func (dcs *DoltChunkStore) Commit(ctx context.Context, current, last hash.Hash) (bool, error)
- func (dcs *DoltChunkStore) Get(ctx context.Context, h hash.Hash) (chunks.Chunk, error)
- func (dcs *DoltChunkStore) GetMany(ctx context.Context, hashes hash.HashSet, ...) error
- func (dcs *DoltChunkStore) GetManyCompressed(ctx context.Context, hashes hash.HashSet, ...) error
- func (dcs *DoltChunkStore) Has(ctx context.Context, h hash.Hash) (bool, error)
- func (dcs *DoltChunkStore) HasMany(ctx context.Context, hashes hash.HashSet) (hash.HashSet, error)
- func (dcs *DoltChunkStore) PruneTableFiles(ctx context.Context) error
- func (dcs *DoltChunkStore) Put(ctx context.Context, c chunks.Chunk) error
- func (dcs *DoltChunkStore) Rebase(ctx context.Context) error
- func (dcs *DoltChunkStore) Root(ctx context.Context) (hash.Hash, error)
- func (dcs *DoltChunkStore) SetLogger(logger chunks.DebugLogger)
- func (dcs *DoltChunkStore) SetRootChunk(ctx context.Context, root, previous hash.Hash) error
- func (dcs *DoltChunkStore) Size(ctx context.Context) (uint64, error)
- func (dcs *DoltChunkStore) Sources(ctx context.Context) (hash.Hash, []nbs.TableFile, []nbs.TableFile, error)
- func (dcs *DoltChunkStore) Stats() interface{}
- func (dcs *DoltChunkStore) StatsSummary() string
- func (dcs *DoltChunkStore) SupportedOperations() nbs.TableFileStoreOps
- func (dcs *DoltChunkStore) Version() string
- func (dcs *DoltChunkStore) WithChunkCache(cache ChunkCache) *DoltChunkStore
- func (dcs *DoltChunkStore) WithDownloadConcurrency(concurrency ConcurrencyParams) *DoltChunkStore
- func (dcs *DoltChunkStore) WithHTTPFetcher(fetcher HTTPFetcher) *DoltChunkStore
- func (dcs *DoltChunkStore) WithNoopChunkCache() *DoltChunkStore
- func (dcs *DoltChunkStore) WriteTableFile(ctx context.Context, fileId string, numChunks int, contentHash []byte, ...) error
- type DoltRemoteTableFile
- type DurationEstimator
- type DurationObserver
- type DynamicEstimator
- type EstimateStrategy
- type ExponentialHedgeStrategy
- type FixedHedgeStrategy
- type GetRange
- func (gr *GetRange) Append(other *GetRange)
- func (gr *GetRange) ChunkByteRange(i int) (uint64, uint64)
- func (gr *GetRange) ChunkEndOffset(i int) uint64
- func (gr *GetRange) ChunkStartOffset(i int) uint64
- func (gr *GetRange) GapBetween(i, j int) uint64
- func (gr *GetRange) GetDownloadFunc(ctx context.Context, stats StatsRecorder, fetcher HTTPFetcher, ...) func() error
- func (gr *GetRange) NumBytesInRanges() uint64
- func (gr *GetRange) NumChunks() int
- func (gr *GetRange) RangeLen() uint64
- func (gr *GetRange) ResourcePath() string
- func (gr *GetRange) Sort()
- func (gr *GetRange) SplitAtGaps(maxGapBytes uint64) []*GetRange
- type HTTPFetcher
- type HedgeStrategy
- type Hedger
- type HistogramStatsRecorder
- func (r *HistogramStatsRecorder) RecordDownloadAttemptStart(hedge, retry int, offset, size uint64)
- func (r *HistogramStatsRecorder) RecordDownloadComplete(hedge, retry int, size uint64, d time.Duration)
- func (r *HistogramStatsRecorder) RecordTimeToFirstByte(hedge, retry int, size uint64, d time.Duration)
- func (r *HistogramStatsRecorder) WriteSummaryTo(w io.Writer) error
- type MinHedgeStrategy
- type NoopObserver
- type NullStatsRecorder
- func (NullStatsRecorder) RecordDownloadAttemptStart(hedge, retry int, offset, size uint64)
- func (NullStatsRecorder) RecordDownloadComplete(hedge, retry int, size uint64, d time.Duration)
- func (NullStatsRecorder) RecordTimeToFirstByte(hedge, retry int, size uint64, d time.Duration)
- func (NullStatsRecorder) WriteSummaryTo(io.Writer) error
- type PercentileEstimator
- type RpcError
- type Sizer
- type StatsRecorder
- type Work
Constants ¶
const HedgeDownloadSizeLimit = 4 * 1024 * 1024
Only hedge downloads of ranges < 4MB in length for now.
const MaxFetchSize = 128 * 1024 * 1024
Variables ¶
var ErrInvalidDoltSpecPath = errors.New("invalid dolt spec path")
var ErrRemoteTableFileGet = errors.New("HTTP GET for remote table file failed")
var ErrUploadFailed = errors.New("upload failed")
var HttpError = errors.New("http")
var MaxHedgesPerRequest = 1
var StatsFactory func() StatsRecorder = NullStatsRecorderFactory
var StatsFlusher func(StatsRecorder) = func(StatsRecorder) {}
Functions ¶
func EventsUnaryClientInterceptor ¶
func EventsUnaryClientInterceptor(collector *events.Collector) grpc.UnaryClientInterceptor
func GetJsonEncodedRequest ¶
func GetRequest ¶
func GetRequest(err error) interface{}
func HashSetToSlices ¶
HashSetToSlices takes a HashSet and converts it to a slice of hashes, and a slice of byte slices
func HashesToSlices ¶
HashesToSlices takes a list of hashes and converts each hash to a byte slice returning a slice of byte slices
func HttpPostUpload ¶
func HttpPostUpload(ctx context.Context, httpFetcher HTTPFetcher, post *remotesapi.HttpPostTableFile, contentHash []byte, contentLength int64, body io.ReadCloser) error
func IsChunkStoreRpcErr ¶
func ParseByteSlices ¶
ParseByteSlices takes a slice of byte slices and converts it to a HashSet, and a map from hash to its index in the original slice
func RetryingUnaryClientInterceptor ¶
func RetryingUnaryClientInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error
func StatsFlusherToColorError ¶
func StatsFlusherToColorError(r StatsRecorder)
Types ¶
type CacheStats ¶
type CacheStats interface {
CacheHits() uint32
}
type ChunkCache ¶
type ChunkCache interface {
	// Put puts a slice of chunks into the cache.
	Put(c []nbs.CompressedChunk) bool

	// Get gets a map of hash to chunk for a set of hashes. In the event that a chunk is not in the cache, chunks.Empty
	// is put in its place.
	Get(h hash.HashSet) map[hash.Hash]nbs.CompressedChunk

	// Has takes a set of hashes and returns the set of hashes that the cache currently does not have in it.
	Has(h hash.HashSet) (absent hash.HashSet)

	// PutChunk puts a single chunk in the cache. true is returned in the event that the chunk was cached successfully
	// and false is returned if that chunk is already in the cache.
	PutChunk(chunk nbs.CompressedChunk) bool

	// GetAndClearChunksToFlush gets a map of hash to chunk which includes all the chunks that were put in the cache
	// between the last time GetAndClearChunksToFlush was called and now.
	GetAndClearChunksToFlush() map[hash.Hash]nbs.CompressedChunk
}
ChunkCache is an interface used for caching chunks
type ConcurrencyParams ¶
type DoltChunkStore ¶
type DoltChunkStore struct {
// contains filtered or unexported fields
}
func NewDoltChunkStore ¶
func NewDoltChunkStore(ctx context.Context, nbf *types.NomsBinFormat, org, repoName, host string, csClient remotesapi.ChunkStoreServiceClient) (*DoltChunkStore, error)
func NewDoltChunkStoreFromPath ¶
func NewDoltChunkStoreFromPath(ctx context.Context, nbf *types.NomsBinFormat, path, host string, csClient remotesapi.ChunkStoreServiceClient) (*DoltChunkStore, error)
func (*DoltChunkStore) AddTableFilesToManifest ¶
func (dcs *DoltChunkStore) AddTableFilesToManifest(ctx context.Context, fileIdToNumChunks map[string]int) error
AddTableFilesToManifest adds table files to the manifest
func (*DoltChunkStore) Close ¶
func (dcs *DoltChunkStore) Close() error
Close tears down any resources in use by the implementation. After Close(), the ChunkStore may not be used again. It is NOT SAFE to call Close() concurrently with any other ChunkStore method; behavior is undefined and probably crashy.
func (*DoltChunkStore) Commit ¶
Commit atomically attempts to persist all novel Chunks and update the persisted root hash from last to current (or keeps it the same). If last doesn't match the root in persistent storage, returns false.
func (*DoltChunkStore) Get ¶
Get the Chunk for the value of the hash in the store. If the hash is absent from the store EmptyChunk is returned.
func (*DoltChunkStore) GetManyCompressed ¶
func (dcs *DoltChunkStore) GetManyCompressed(ctx context.Context, hashes hash.HashSet, found func(context.Context, nbs.CompressedChunk)) error
GetManyCompressed gets the compressed Chunks with |hashes| from the store. On return, |found| will have been called with every chunk which has been found. Any non-present chunks will silently be ignored.
func (*DoltChunkStore) Has ¶
Returns true iff the value at the address |h| is contained in the store
func (*DoltChunkStore) HasMany ¶
Returns a new HashSet containing any members of |hashes| that are absent from the store.
func (*DoltChunkStore) PruneTableFiles ¶
func (dcs *DoltChunkStore) PruneTableFiles(ctx context.Context) error
PruneTableFiles deletes old table files that are no longer referenced in the manifest.
func (*DoltChunkStore) Put ¶
Put caches c. Upon return, c must be visible to subsequent Get and Has calls, but must not be persistent until a call to Flush(). Put may be called concurrently with other calls to Put(), Get(), GetMany(), Has() and HasMany().
func (*DoltChunkStore) Rebase ¶
func (dcs *DoltChunkStore) Rebase(ctx context.Context) error
Rebase brings this ChunkStore into sync with the persistent storage's current root.
func (*DoltChunkStore) Root ¶
Root returns the root of the database as of the time the ChunkStore was opened or the most recent call to Rebase.
func (*DoltChunkStore) SetLogger ¶
func (dcs *DoltChunkStore) SetLogger(logger chunks.DebugLogger)
func (*DoltChunkStore) SetRootChunk ¶
SetRootChunk changes the root chunk hash from the previous value to the new root.
func (*DoltChunkStore) Sources ¶
func (dcs *DoltChunkStore) Sources(ctx context.Context) (hash.Hash, []nbs.TableFile, []nbs.TableFile, error)
Sources retrieves the current root hash, a list of all the table files (which may include appendix table files) and a list of only appendix table files
func (*DoltChunkStore) Stats ¶
func (dcs *DoltChunkStore) Stats() interface{}
Stats may return some kind of struct that reports statistics about the ChunkStore instance. The type is implementation-dependent, and impls may return nil
func (*DoltChunkStore) StatsSummary ¶
func (dcs *DoltChunkStore) StatsSummary() string
StatsSummary may return a string containing summarized statistics for this ChunkStore. It must return "Unsupported" if this operation is not supported.
func (*DoltChunkStore) SupportedOperations ¶
func (dcs *DoltChunkStore) SupportedOperations() nbs.TableFileStoreOps
func (*DoltChunkStore) Version ¶
func (dcs *DoltChunkStore) Version() string
Returns the NomsVersion with which this ChunkSource is compatible.
func (*DoltChunkStore) WithChunkCache ¶
func (dcs *DoltChunkStore) WithChunkCache(cache ChunkCache) *DoltChunkStore
func (*DoltChunkStore) WithDownloadConcurrency ¶
func (dcs *DoltChunkStore) WithDownloadConcurrency(concurrency ConcurrencyParams) *DoltChunkStore
func (*DoltChunkStore) WithHTTPFetcher ¶
func (dcs *DoltChunkStore) WithHTTPFetcher(fetcher HTTPFetcher) *DoltChunkStore
func (*DoltChunkStore) WithNoopChunkCache ¶
func (dcs *DoltChunkStore) WithNoopChunkCache() *DoltChunkStore
func (*DoltChunkStore) WriteTableFile ¶
func (dcs *DoltChunkStore) WriteTableFile(ctx context.Context, fileId string, numChunks int, contentHash []byte, getRd func() (io.ReadCloser, uint64, error)) error
WriteTableFile reads a table file from the provided reader and writes it to the chunk store.
type DoltRemoteTableFile ¶
type DoltRemoteTableFile struct {
// contains filtered or unexported fields
}
DoltRemoteTableFile is an implementation of a TableFile that lives in a DoltChunkStore
func (DoltRemoteTableFile) FileID ¶
func (drtf DoltRemoteTableFile) FileID() string
FileID gets the id of the file
func (DoltRemoteTableFile) NumChunks ¶
func (drtf DoltRemoteTableFile) NumChunks() int
NumChunks returns the number of chunks in a table file
func (DoltRemoteTableFile) Open ¶
func (drtf DoltRemoteTableFile) Open(ctx context.Context) (io.ReadCloser, uint64, error)
Open returns an io.ReadCloser which can be used to read the bytes of a table file.
type DurationEstimator ¶
type DurationEstimator interface {
	// Duration returns the expected |time.Duration| of a Work with |Size| |sz|.
	Duration(sz uint64) time.Duration
}
DurationEstimator returns an estimated duration given the size of a Work
type DurationObserver ¶
type DurationObserver interface {
	// Observe is called by |Hedger| when work is completed. |sz| is the |Size|
	// of the work. |n| specifies which |Run| called by the Hedger completed,
	// with 1 being the first |Run|. |d| is the duration the |Run| function
	// took for the |Run| that completed. |err| is any |error| returned from
	// |Run|.
	Observe(sz uint64, n int, d time.Duration, err error)
}
DurationObserver observes Work completions
type DynamicEstimator ¶
type DynamicEstimator interface {
	DurationEstimator
	DurationObserver
}
DynamicEstimator returns an estimated |Duration| for Work that is dynamically updated by observations from |Observe|.
type EstimateStrategy ¶
type EstimateStrategy struct {
// contains filtered or unexported fields
}
EstimateStrategy wraps a DurationEstimator. It wants to hedge Work |Run|'s when it takes longer than the Work estimate.
func NewEstimateStrategy ¶
func NewEstimateStrategy(e DurationEstimator) *EstimateStrategy
NewEstimateStrategy returns a new EstimateStrategy
type ExponentialHedgeStrategy ¶
type ExponentialHedgeStrategy struct {
// contains filtered or unexported fields
}
ExponentialHedgeStrategy increases the |underlying|'s nextTry by a factor of two for every hedge attempt, including the first attempt.
func NewExponentialHedgeStrategy ¶
func NewExponentialHedgeStrategy(u HedgeStrategy) *ExponentialHedgeStrategy
NewExponentialHedgeStrategy returns a new ExponentialHedgeStrategy
type FixedHedgeStrategy ¶
FixedHedgeStrategy always returns |FixedNextTry| from |NextTry|
func NewFixedHedgeStrategy ¶
func NewFixedHedgeStrategy(fixedNextTry time.Duration) *FixedHedgeStrategy
NewFixedHedgeStrategy returns a new FixedHedgeStrategy
type GetRange ¶
type GetRange remotesapi.HttpGetRange
func (*GetRange) ChunkEndOffset ¶
func (*GetRange) ChunkStartOffset ¶
func (*GetRange) GapBetween ¶
func (*GetRange) GetDownloadFunc ¶
func (gr *GetRange) GetDownloadFunc(ctx context.Context, stats StatsRecorder, fetcher HTTPFetcher, chunkChan chan nbs.CompressedChunk, pathToUrl resourcePathToUrlFunc) func() error
func (*GetRange) NumBytesInRanges ¶
func (*GetRange) ResourcePath ¶
func (*GetRange) SplitAtGaps ¶
type HedgeStrategy ¶
type HedgeStrategy interface {
	// NextTry determines how long to wait before hedging a Work's |Run|
	// function. |sz| is the |Size| of the Work, |elapsed| is the time since the
	// first |Run| was called, and |n| is the hedge number starting from 1.
	NextTry(sz uint64, elapsed time.Duration, n int) time.Duration
}
HedgeStrategy is used by Hedger to decide when to launch concurrent |Run| calls.
type Hedger ¶
type Hedger struct {
// contains filtered or unexported fields
}
Hedger can |Do| Work, potentially invoking Work |Run|'s more than once concurrently if it is taking longer than |strat|'s |NextTry| duration. Completed Work gets reported to the DurationObserver.
var DownloadHedger *Hedger
func NewHedger ¶
func NewHedger(maxOutstanding int64, strat HedgeStrategy, observer DurationObserver) *Hedger
NewHedger returns a new Hedger. |maxOutstanding| is the most hedged |Run|'s that can be outstanding. If a |Run| would be hedged, but there are already maxOutstanding hedged |Run|'s, nothing happens instead. |strat| is the HedgeStrategy to use for this hedger. |observer| is a DurationObserver that will receive |Observe|'s when a Work completes.
type HistogramStatsRecorder ¶
type HistogramStatsRecorder struct {
// contains filtered or unexported fields
}
func NewHistorgramStatsRecorder ¶
func NewHistorgramStatsRecorder() *HistogramStatsRecorder
func (*HistogramStatsRecorder) RecordDownloadAttemptStart ¶
func (r *HistogramStatsRecorder) RecordDownloadAttemptStart(hedge, retry int, offset, size uint64)
func (*HistogramStatsRecorder) RecordDownloadComplete ¶
func (r *HistogramStatsRecorder) RecordDownloadComplete(hedge, retry int, size uint64, d time.Duration)
func (*HistogramStatsRecorder) RecordTimeToFirstByte ¶
func (r *HistogramStatsRecorder) RecordTimeToFirstByte(hedge, retry int, size uint64, d time.Duration)
func (*HistogramStatsRecorder) WriteSummaryTo ¶
func (r *HistogramStatsRecorder) WriteSummaryTo(w io.Writer) error
type MinHedgeStrategy ¶
type MinHedgeStrategy struct {
// contains filtered or unexported fields
}
MinHedgeStrategy bounds an underlying strategy's NextTry duration to be above |min|.
func NewMinHedgeStrategy ¶
func NewMinHedgeStrategy(min time.Duration, underlying HedgeStrategy) *MinHedgeStrategy
NewMinHedgeStrategy returns a new MinHedgeStrategy
type NoopObserver ¶
type NoopObserver struct { }
NoopObserver is a DurationObserver that has a noop |Observe|
func NewNoopObserver ¶
func NewNoopObserver() *NoopObserver
NewNoopObserver returns a new NoopObserver
type NullStatsRecorder ¶
type NullStatsRecorder struct { }
func (NullStatsRecorder) RecordDownloadAttemptStart ¶
func (NullStatsRecorder) RecordDownloadAttemptStart(hedge, retry int, offset, size uint64)
func (NullStatsRecorder) RecordDownloadComplete ¶
func (NullStatsRecorder) RecordDownloadComplete(hedge, retry int, size uint64, d time.Duration)
func (NullStatsRecorder) RecordTimeToFirstByte ¶
func (NullStatsRecorder) RecordTimeToFirstByte(hedge, retry int, size uint64, d time.Duration)
func (NullStatsRecorder) WriteSummaryTo ¶
func (NullStatsRecorder) WriteSummaryTo(io.Writer) error
type PercentileEstimator ¶
type PercentileEstimator struct {
	Percentile float64
	// contains filtered or unexported fields
}
PercentileEstimator is a DynamicEstimator which puts all |Observe| durations into a histogram and returns the current value of the provided |Percentile| in that histogram for the estimated |Duration|. |sz| is ignored.
func NewPercentileEstimator ¶
func NewPercentileEstimator(low, high time.Duration, perc float64) *PercentileEstimator
NewPercentileEstimator returns an initialized |PercentileEstimator|.
type RpcError ¶
type RpcError struct {
// contains filtered or unexported fields
}
func NewRpcError ¶
func (*RpcError) FullDetails ¶
type StatsRecorder ¶
type StatsRecorder interface {
	RecordTimeToFirstByte(hedge, retry int, size uint64, d time.Duration)
	RecordDownloadAttemptStart(hedge, retry int, offset, size uint64)
	RecordDownloadComplete(hedge, retry int, size uint64, d time.Duration)
	WriteSummaryTo(io.Writer) error
}
func HistogramStatsRecorderFactory ¶
func HistogramStatsRecorderFactory() StatsRecorder
func NullStatsRecorderFactory ¶
func NullStatsRecorderFactory() StatsRecorder
type Work ¶
type Work struct {
	// Run is the function that will be called by |Hedger.Do|. It will be
	// called at least once, and possibly multiple times depending on how
	// long it takes and the |Hedger|'s |Strategy|.
	Run func(ctx context.Context, n int) (interface{}, error)

	// Size is an integer representation of the size of the work.
	// Potentially used by |Strategy|, not used by |Hedger|.
	Size uint64
}
Work is a description of work that can be hedged. The supplied Run function should expect to potentially be called multiple times concurrently, and it should respect |ctx| cancellation. |Size| will be used to estimate Run's duration. This estimate is also used to determine when a Work's Run should be hedged with concurrent Run call(s).