Documentation ¶
Index ¶
- Constants
- Variables
- func IntMax(a int, b int) int
- func IntMin(a int, b int) int
- func Uint64Max(a uint64, b uint64) uint64
- func Uint64Min(a uint64, b uint64) uint64
- type ChunkQueue
- func (h *ChunkQueue) Chunk(idx int) *types.Chunk
- func (h *ChunkQueue) Len() int
- func (h *ChunkQueue) Less(i, j int) bool
- func (h *ChunkQueue) Lock()
- func (h *ChunkQueue) Peek() *types.Chunk
- func (h *ChunkQueue) Pop() interface{}
- func (h *ChunkQueue) Push(x interface{})
- func (h *ChunkQueue) Swap(i, j int)
- func (h *ChunkQueue) Unlock()
- type LineageDifferenceRank
- type LineageStorage
- func (s *LineageStorage) ClearBackup()
- func (s *LineageStorage) Commit() (*types.CommitOption, error)
- func (s *LineageStorage) ConfigS3(bucket string, prefix string)
- func (s *LineageStorage) Del(key string, reason string) *types.OpRet
- func (s *LineageStorage) IsConsistent(meta *types.LineageMeta) (bool, error)
- func (s *LineageStorage) Keys() <-chan string
- func (s *LineageStorage) Len() int
- func (s *LineageStorage) Recover(meta *types.LineageMeta) (bool, <-chan error)
- func (s *LineageStorage) StartTracker()
- func (s *LineageStorage) Status(short bool) (confirmedTerm uint64, status types.LineageStatus)
- func (s *LineageStorage) StopTracker() error
- func (s *LineageStorage) Validate(meta *types.LineageMeta) (types.LineageValidationResult, error)
- type PersistHelper
- type PersistentStorage
- type PersistentStorageSignal
- type SimpleDifferenceRank
- type Storage
- func (s *Storage) ConfigLogger(lvl int, color bool)
- func (s *Storage) Del(key string, reason string) *types.OpRet
- func (s *Storage) Get(key string) (string, []byte, *types.OpRet)
- func (s *Storage) GetStream(key string) (string, resp.AllReadCloser, *types.OpRet)
- func (s *Storage) Id() uint64
- func (s *Storage) Keys() <-chan string
- func (s *Storage) Len() int
- func (s *Storage) Meta() types.StorageMeta
- func (s *Storage) Set(key string, chunkId string, val []byte) *types.OpRet
- func (s *Storage) SetStream(key string, chunkId string, valReader resp.AllReadCloser) *types.OpRet
- type StorageHelper
- type StorageMeta
- func (m *StorageMeta) BackupSize() uint64
- func (m *StorageMeta) Calibrate()
- func (m *StorageMeta) Capacity() uint64
- func (m *StorageMeta) DecreaseSize(dec uint64) uint64
- func (m *StorageMeta) Effective() uint64
- func (m *StorageMeta) IncreaseSize(inc uint64) uint64
- func (m *StorageMeta) ModifiedSize() uint64
- func (m *StorageMeta) Reserved() uint64
- func (m *StorageMeta) Size() uint64
- func (m *StorageMeta) System() uint64
- func (m *StorageMeta) Waterline() uint64
- type StorageSignal
Constants ¶
const ( LINEAGE_KEY = "%s%s/lineage-%d" SNAPSHOT_KEY = "%s%s/snapshot-%d.gz" RECOVERING_NONE uint32 = 0x00 RECOVERING_MAIN uint32 = 0x01 RECOVERING_BACKUP uint32 = 0x02 LineageStorageOverhead = StorageOverhead // Reuse StorageOverhead BackupStoreageReservation = 0.1 // 1/N * 2, N = 20, backups per lambda. MaximumCommitDelay = 1 * time.Second // 1 second )
const ( CHUNK_KEY = "%schunks/%s%s" // StorageSignalFlagForceCommit Indicate a PersistentStorage to commit regardless of any queued operations. StorageSignalFlagForceCommit = 0x0001 )
const ( CalibratePriorityNormal types.CalibratePriority = iota CalibratePriorityRecover CalibratePriorityMax = CalibratePriorityRecover )
const CalibrateFactor = 0.9
const (
StorageOverhead = 100000000 // 100 MB
)
Variables ¶
var ( Backups = 10 // Updated: 6/30/2010 // Try download as few as lineage file in batch: try one snapshot each term SnapshotInterval = uint64(1) // To minimize lineage recovery latency, try download the same file with multiple contenders. LineageRecoveryContenders = 3 // Errors ErrRecovering = errors.New("already recovering") ErrRecovered = errors.New("already recovered") ErrRecoveryInterrupted = errors.New("recovery interrupted") ErrBackupSetForbidden = errors.New("forbidden to set backup objects") )
var ( Concurrency = types.DownloadConcurrency Buckets = 1 Hasher = &util.Hasher{} Segmentor = regexp.MustCompile(`[0-9a-f]{2}`) ErrTrackerNotStarted = errors.New("tracker not started") )
var ( FunctionPrefix string FunctionPrefixMatcher = regexp.MustCompile(`\d+$`) ContextKeyLog = "log" ErrOOStorage = errors.New("out of storage") )
var (
// SimpleDifferenceRankSignificanceRatio diffrank parameter
SimpleDifferenceRankSignificanceRatio = 1.0
)
Functions ¶
Types ¶
type ChunkQueue ¶
type ChunkQueue struct {
// contains filtered or unexported fields
}
func (*ChunkQueue) Len ¶
func (h *ChunkQueue) Len() int
func (*ChunkQueue) Less ¶
func (h *ChunkQueue) Less(i, j int) bool
func (*ChunkQueue) Lock ¶
func (h *ChunkQueue) Lock()
func (*ChunkQueue) Peek ¶
func (h *ChunkQueue) Peek() *types.Chunk
func (*ChunkQueue) Pop ¶
func (h *ChunkQueue) Pop() interface{}
func (*ChunkQueue) Push ¶
func (h *ChunkQueue) Push(x interface{})
func (*ChunkQueue) Swap ¶
func (h *ChunkQueue) Swap(i, j int)
func (*ChunkQueue) Unlock ¶
func (h *ChunkQueue) Unlock()
type LineageDifferenceRank ¶
type LineageDifferenceRank interface { Reset(float64) AddOp(*types.LineageOp) IsSignificant(float64) bool Rank() float64 }
LineageDifferenceRank Metric used to decide if a fast recovery should be requested.
type LineageStorage ¶
type LineageStorage struct { *PersistentStorage // contains filtered or unexported fields }
Storage with lineage
func NewLineageStorage ¶
func NewLineageStorage(id uint64, cap uint64) *LineageStorage
func (*LineageStorage) ClearBackup ¶
func (s *LineageStorage) ClearBackup()
func (*LineageStorage) Commit ¶
func (s *LineageStorage) Commit() (*types.CommitOption, error)
func (*LineageStorage) ConfigS3 ¶
func (s *LineageStorage) ConfigS3(bucket string, prefix string)
Lineage Implementation
func (*LineageStorage) IsConsistent ¶
func (s *LineageStorage) IsConsistent(meta *types.LineageMeta) (bool, error)
func (*LineageStorage) Keys ¶
func (s *LineageStorage) Keys() <-chan string
func (*LineageStorage) Len ¶
func (s *LineageStorage) Len() int
func (*LineageStorage) Recover ¶
func (s *LineageStorage) Recover(meta *types.LineageMeta) (bool, <-chan error)
Recover based on the term of specified meta. We support partial recovery. Errors during recovery will be sent to returned channel. The recovery ends if returned channel is closed. If the first return value is false, no fast recovery is needed.
func (*LineageStorage) StartTracker ¶
func (s *LineageStorage) StartTracker()
func (*LineageStorage) Status ¶
func (s *LineageStorage) Status(short bool) (confirmedTerm uint64, status types.LineageStatus)
Status returns the status of the storage. If short is specified, returns nil if all terms confirmed, or returns the meta of main storage only.
func (*LineageStorage) StopTracker ¶
func (s *LineageStorage) StopTracker() error
func (*LineageStorage) Validate ¶
func (s *LineageStorage) Validate(meta *types.LineageMeta) (types.LineageValidationResult, error)
type PersistHelper ¶
type PersistHelper interface {
// contains filtered or unexported methods
}
type PersistentStorage ¶
type PersistentStorage struct { *Storage // contains filtered or unexported fields }
PersistentStorage Storage with S3 as persistent layer
func NewPersistentStorage ¶
func NewPersistentStorage(id uint64, cap uint64) *PersistentStorage
func (*PersistentStorage) ConfigS3 ¶
func (s *PersistentStorage) ConfigS3(bucket string, prefix string)
func (*PersistentStorage) SetRecovery ¶
func (*PersistentStorage) StartTracker ¶
func (s *PersistentStorage) StartTracker()
func (*PersistentStorage) StopTracker ¶
func (s *PersistentStorage) StopTracker() error
type PersistentStorageSignal ¶
type PersistentStorageSignal uint32
func (PersistentStorageSignal) Flags ¶
func (sig PersistentStorageSignal) Flags() uint32
type SimpleDifferenceRank ¶
type SimpleDifferenceRank struct {
// contains filtered or unexported fields
}
SimpleDifferenceRank only count the number of object stored. Only set operations are considered. The significant change is considered if difference of operations is larger than the number of backups, which means
changes / backups >= 1
For now, we assume # of backups will be around 10. It is reasonable to change to a smaller value if backup increases.
func NewSimpleDifferenceRank ¶
func NewSimpleDifferenceRank(backups int) *SimpleDifferenceRank
NewSimpleDifferenceRank Create a SimpleDifferenceRank instance.
func (*SimpleDifferenceRank) AddOp ¶
func (dr *SimpleDifferenceRank) AddOp(op *types.LineageOp)
AddOp Notify rank that an operation is performed.
func (*SimpleDifferenceRank) IsSignificant ¶
func (dr *SimpleDifferenceRank) IsSignificant(rank float64) bool
IsSignificant Determind if specified rank is significant different from the caller.
func (*SimpleDifferenceRank) Rank ¶
func (dr *SimpleDifferenceRank) Rank() float64
Rank Get the rank in numeric form.
func (*SimpleDifferenceRank) Reset ¶
func (dr *SimpleDifferenceRank) Reset(rank float64)
Reset Reset rank
type Storage ¶
type Storage struct {
// contains filtered or unexported fields
}
Storage with lineage
func NewStorage ¶
func (*Storage) ConfigLogger ¶
func (*Storage) Meta ¶
func (s *Storage) Meta() types.StorageMeta
type StorageHelper ¶
type StorageHelper interface {
// contains filtered or unexported methods
}
type StorageMeta ¶
type StorageMeta struct { Cap uint64 // Capacity of the lambda. Overhead uint64 // Minimum overhead reserved. Rsrved uint64 // Other reserved storage capacity. // contains filtered or unexported fields }
func (*StorageMeta) BackupSize ¶
func (m *StorageMeta) BackupSize() uint64
func (*StorageMeta) Calibrate ¶
func (m *StorageMeta) Calibrate()
func (*StorageMeta) Capacity ¶
func (m *StorageMeta) Capacity() uint64
func (*StorageMeta) DecreaseSize ¶
func (m *StorageMeta) DecreaseSize(dec uint64) uint64
func (*StorageMeta) Effective ¶
func (m *StorageMeta) Effective() uint64
func (*StorageMeta) IncreaseSize ¶
func (m *StorageMeta) IncreaseSize(inc uint64) uint64
func (*StorageMeta) ModifiedSize ¶
func (m *StorageMeta) ModifiedSize() uint64
func (*StorageMeta) Reserved ¶
func (m *StorageMeta) Reserved() uint64
func (*StorageMeta) Size ¶
func (m *StorageMeta) Size() uint64
func (*StorageMeta) System ¶
func (m *StorageMeta) System() uint64
func (*StorageMeta) Waterline ¶
func (m *StorageMeta) Waterline() uint64
type StorageSignal ¶
type StorageSignal interface { // Flags flags of signal to customize behavior. Flags() uint32 }