Documentation ¶
Index ¶
- Constants
- Variables
- type Config
- type Curator
- type CuratorCtlHandler
- type CuratorSrvHandler
- func (h *CuratorSrvHandler) AckExtendBlob(req core.AckExtendBlobReq, reply *core.AckExtendBlobReply) error
- func (h *CuratorSrvHandler) CreateBlob(req core.CreateBlobReq, reply *core.CreateBlobReply) error
- func (h *CuratorSrvHandler) DeleteBlob(id core.BlobID, reply *core.Error) error
- func (h *CuratorSrvHandler) ExtendBlob(req core.ExtendBlobReq, reply *core.ExtendBlobReply) error
- func (h *CuratorSrvHandler) FixVersion(req core.FixVersionReq, reply *core.Error) error
- func (h *CuratorSrvHandler) GetTracts(req core.GetTractsReq, reply *core.GetTractsReply) error
- func (h *CuratorSrvHandler) ListBlobs(req core.ListBlobsReq, reply *core.ListBlobsReply) error
- func (h *CuratorSrvHandler) ReportBadTS(req core.ReportBadTSReq, reply *core.Error) error
- func (h *CuratorSrvHandler) SetMetadata(req core.SetMetadataReq, reply *core.Error) error
- func (h *CuratorSrvHandler) StatBlob(id core.BlobID, reply *core.StatBlobReply) error
- func (h *CuratorSrvHandler) UndeleteBlob(id core.BlobID, reply *core.Error) error
- type DyConfig
- type FailureDomainService
- type MasterConnection
- type RPCMasterConnection
- type RPCTractserverTalker
- func (t *RPCTractserverTalker) CheckTracts(addr string, tsid core.TractserverID, tracts []core.TractState) core.Error
- func (t *RPCTractserverTalker) CtlStatTract(addr string, tsid core.TractserverID, id core.TractID, version int) (reply core.StatTractReply)
- func (t *RPCTractserverTalker) GCTract(addr string, tsid core.TractserverID, old []core.TractState, ...) core.Error
- func (t *RPCTractserverTalker) PackTracts(addr string, tsid core.TractserverID, length int, tracts []*core.PackTractSpec, ...) (reply core.Error)
- func (t *RPCTractserverTalker) PullTract(addr string, tsid core.TractserverID, from []string, id core.TractID, ...) core.Error
- func (t *RPCTractserverTalker) RSEncode(addr string, tsid core.TractserverID, id core.RSChunkID, length int, ...) (reply core.Error)
- func (t *RPCTractserverTalker) SetVersion(addr string, tsid core.TractserverID, id core.TractID, newVersion int, ...) core.Error
- type RackBasedFailureDomain
- type Server
- type StatusData
- type TSIDSet
- type TractserverTalker
Constants ¶
const ( // RSPieceLength is how large we should aim to make each chunk piece. This // funny value will ensure that each piece comes out <= 64MiB after // ChecksumFile and any future overhead (let's reserve 64 bytes per 64KB). // But it means that we can only fit 7.992 tracts in a chunk, so we'll need // to have some partial tracts to make best use of our space. RSPieceLength = 64*1024*1024 - 64*1024 - 64 // REPLICATED is just copied here to make some code slightly shorter. REPLICATED = core.StorageClassREPLICATED )
Variables ¶
var DefaultDyConfig = DyConfig{
RSEncodeBandwidthGbps: 5.0,
RecoveryBandwidthGbps: 10.0,
CuratorGroups: 4,
}
DefaultDyConfig holds default values for dynamic configuration.
var DefaultProdConfig = Config{ CuratorSeed: time.Now().UnixNano(), UseFailure: false, MaxReplFactor: 10, MaxTractsToExtend: 20, RejectReqThreshold: 100, PartitionMonInterval: 5 * time.Minute, MinFreeBlobSlot: uint64(core.MaxBlobKey / 4 * 3), FreeMemLimit: 2 * 1024 * 1024 * 1024, ConsistencyCheckInterval: 1 * time.Minute, ConsistencyCheckBatchSize: 5000, MasterHeartbeatInterval: 10 * time.Second, MetadataGCInterval: time.Hour, MetadataUndeleteTime: 3 * 24 * time.Hour, TsUnhealthy: 1 * time.Minute, TsDown: 15 * time.Minute, TsHeartbeatGracePeriod: 3 * time.Minute, SyncInterval: 10 * time.Minute, WriteDelay: (8*24 + 12) * time.Hour, }
DefaultProdConfig specifies the default values for Config that are used in the production environment.
var DefaultTestConfig = Config{ CuratorSeed: 31337, UseFailure: true, MaxReplFactor: 10, MaxTractsToExtend: 20, RejectReqThreshold: 100, PartitionMonInterval: 5 * time.Minute, MinFreeBlobSlot: uint64(core.MaxBlobKey / 4 * 3), FreeMemLimit: 2 * 1024 * 1024 * 1024, ConsistencyCheckInterval: 10 * time.Second, ConsistencyCheckBatchSize: 10, MasterHeartbeatInterval: 10 * time.Second, MetadataGCInterval: 15 * time.Minute, MetadataUndeleteTime: 3 * time.Hour, TsUnhealthy: 20 * time.Second, TsDown: 30 * time.Second, TsHeartbeatGracePeriod: 30 * time.Second, SyncInterval: 5 * time.Second, WriteDelay: 30 * time.Second, }
DefaultTestConfig specifies the default values for Config that are used in the testing environment.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct { CuratorSeed int64 // Seed for the random number generator. MaxReplFactor int // What is the max replication factor of a blob? MaxTractsToExtend int // At most how many tracts can one extend a blob by at a time? MasterSpec string // How to find masters. Addr string // Address for RPCs. UseFailure bool // Whether to enable the failure service. RejectReqThreshold int // Pending client request limit. PartitionMonInterval time.Duration // At what interval do we check partitions? MinFreeBlobSlot uint64 // Ask for a new partition when the number of free blob slots in existing partitions drops below this threshold. FreeMemLimit uint64 // No new partition if free system memory drops below this. ConsistencyCheckInterval time.Duration // How often to do a consistency check ConsistencyCheckBatchSize int // How many blobs to checksum at once RaftACSpec string // Spec for raft autoconfig. // --- Master --- MasterHeartbeatInterval time.Duration // --- GC --- MetadataGCInterval time.Duration MetadataUndeleteTime time.Duration // --- Tract Server Monitor --- // TODO(PL-1107) // Unhealthy is anything that hasn't beaten recently but hasn't been offline // long enough to be re-replicated. Unhealthy servers won't host new data as // we expect writes to them to fail. // // NOTE: Needs to be related to tractserver/server.go:curatorHeartbeatInterval. // The idea is that if we miss a few heartbeats we'll re-replicate data to // unblock clients. TsUnhealthy time.Duration // Down is anything that hasn't beaten in so long that it is probably offline // for good and the data on it should be considered gone. TsDown time.Duration // When a curator becomes a new leader tract servers might take some time to // know it. A newly elected curator leader will not claim any tract servers // unhealthy or down during the grace period. TsHeartbeatGracePeriod time.Duration // --- Tract Server --- // The interval of sending tracts to tract servers for finding missing tracts. 
SyncInterval time.Duration // --- Erasure Coding --- // Time after last write that a blob can be considered for erasure coding. WriteDelay time.Duration }
Config encapsulates parameters for Curator.
type Curator ¶
type Curator struct {
// contains filtered or unexported fields
}
Curator manages metadata for tractservers.
func NewCurator ¶
func NewCurator(curatorCfg *Config, mc MasterConnection, tt TractserverTalker, stateCfg durable.StateConfig, r *raft.Raft, fds FailureDomainService, getTime func() time.Time, metricNameSuffix string, ) *Curator
NewCurator creates and returns a new Curator.
func (*Curator) LeadershipChange ¶
LeadershipChange is called from the replication level when our leadership status changes. We want to send heartbeats to the master, but only if we think we're leader.
type CuratorCtlHandler ¶
type CuratorCtlHandler struct {
// contains filtered or unexported fields
}
CuratorCtlHandler handles heartbeat messages and other non-client-generated RPCs.
func (*CuratorCtlHandler) CuratorTractserverHeartbeat ¶
func (h *CuratorCtlHandler) CuratorTractserverHeartbeat(msg core.CuratorTractserverHeartbeatReq, reply *core.CuratorTractserverHeartbeatReply) error
CuratorTractserverHeartbeat is called by a tractserver to notify the curator that it is still alive.
type CuratorSrvHandler ¶
type CuratorSrvHandler struct {
// contains filtered or unexported fields
}
CuratorSrvHandler defines methods that conform to Go's RPC requirements.
func (*CuratorSrvHandler) AckExtendBlob ¶
func (h *CuratorSrvHandler) AckExtendBlob(req core.AckExtendBlobReq, reply *core.AckExtendBlobReply) error
AckExtendBlob is the RPC callback for acknowledging the success of extending a blob.
func (*CuratorSrvHandler) CreateBlob ¶
func (h *CuratorSrvHandler) CreateBlob(req core.CreateBlobReq, reply *core.CreateBlobReply) error
CreateBlob is the RPC callback for creating a blob.
func (*CuratorSrvHandler) DeleteBlob ¶
DeleteBlob is the RPC callback for deleting a blob.
func (*CuratorSrvHandler) ExtendBlob ¶
func (h *CuratorSrvHandler) ExtendBlob(req core.ExtendBlobReq, reply *core.ExtendBlobReply) error
ExtendBlob is the RPC callback for extending a blob.
func (*CuratorSrvHandler) FixVersion ¶
func (h *CuratorSrvHandler) FixVersion(req core.FixVersionReq, reply *core.Error) error
FixVersion is sent by a client when they're trying to interact with a tractserver but have invalid version information.
func (*CuratorSrvHandler) GetTracts ¶
func (h *CuratorSrvHandler) GetTracts(req core.GetTractsReq, reply *core.GetTractsReply) error
GetTracts is the RPC callback for getting tract location information for a read or a write.
func (*CuratorSrvHandler) ListBlobs ¶
func (h *CuratorSrvHandler) ListBlobs(req core.ListBlobsReq, reply *core.ListBlobsReply) error
ListBlobs returns a range of blob keys in a partition.
func (*CuratorSrvHandler) ReportBadTS ¶
func (h *CuratorSrvHandler) ReportBadTS(req core.ReportBadTSReq, reply *core.Error) error
ReportBadTS is sent by a client when they can't talk to the host in req. We can use this information to repair tracts more quickly.
func (*CuratorSrvHandler) SetMetadata ¶
func (h *CuratorSrvHandler) SetMetadata(req core.SetMetadataReq, reply *core.Error) error
SetMetadata is the RPC callback for changing blob metadata.
func (*CuratorSrvHandler) StatBlob ¶
func (h *CuratorSrvHandler) StatBlob(id core.BlobID, reply *core.StatBlobReply) error
StatBlob is the RPC callback for getting information about a blob.
func (*CuratorSrvHandler) UndeleteBlob ¶
UndeleteBlob is the RPC callback for undeleting a blob.
type DyConfig ¶
type DyConfig struct { // How much bandwidth we can use for RS encoding. RSEncodeBandwidthGbps float32 // How much bandwidth can we use for recovery (all types). RecoveryBandwidthGbps float32 // Number of curator groups in this cluster. CuratorGroups int }
DyConfig holds values that can be dynamically configured in the curator.
type FailureDomainService ¶
type FailureDomainService interface { // FailureDomain retrieves the failure domain hierarchy for 'hosts'. The // returned failure domains for each host are organized from the lowest // level (host) to the highest (e.g., datacenter). The number of // levels should be the same across all hosts. Nil is returned if no // information is available. // // Use our current naming convention for example, // hosts := []string{"bbaa01", "bbaa02", "ggaa23"} // can be mapped to the following result, // { // {"bbaa01", "bbaa", "bb"}, // {host, rack, cluster} // {"bbaa02", "bbaa", "bb"}, // {"ggaa23", "ggaa", "gg"}, // } GetFailureDomain(hosts []string) [][]string }
FailureDomainService defines the interface for the curator to learn about the failure domain hierarchy of the cluster.
It is assumed that backup/redundancy links are ignored and thus the failure domain hierarchy for the entire cluster can be described as a tree or a forest consisting of trees of equal height. The leaves of the tree(s) are tractservers, and moving up, one might see racks, rows, clusters, etc. Each entity (host, rack, ...) should have a globally unique name.
PL-1362.
type MasterConnection ¶
type MasterConnection interface { // RegisterCurator sends a request to the master to register this curator. RegisterCurator() (core.RegisterCuratorReply, core.Error) // CuratorHeartbeat sends a heartbeat to the master. CuratorHeartbeat(core.CuratorID) (core.CuratorHeartbeatReply, core.Error) // NewPartition sends a request for a new partition to the master. NewPartition(core.CuratorID) (core.NewPartitionReply, core.Error) }
MasterConnection is a connection to a group of masters. Connection establishment is done transparently.
func NewRPCMasterConnection ¶
func NewRPCMasterConnection(addr string, masterSpec string) MasterConnection
NewRPCMasterConnection creates a new RPCMasterConnection to the master set.
type RPCMasterConnection ¶
type RPCMasterConnection struct {
// contains filtered or unexported fields
}
RPCMasterConnection implements MasterConnection based on Go's RPC package.
func (*RPCMasterConnection) CuratorHeartbeat ¶
func (r *RPCMasterConnection) CuratorHeartbeat(id core.CuratorID) (core.CuratorHeartbeatReply, core.Error)
CuratorHeartbeat sends a heartbeat to the master and returns its reply.
func (*RPCMasterConnection) NewPartition ¶
func (r *RPCMasterConnection) NewPartition(id core.CuratorID) (core.NewPartitionReply, core.Error)
NewPartition sends a request for a new partition to the master and returns its reply.
func (*RPCMasterConnection) RegisterCurator ¶
func (r *RPCMasterConnection) RegisterCurator() (core.RegisterCuratorReply, core.Error)
RegisterCurator checks to see if we're already registered. If not, it sends a registration request to the master and logs the registration information.
type RPCTractserverTalker ¶
type RPCTractserverTalker struct {
// contains filtered or unexported fields
}
RPCTractserverTalker implements TractserverTalker using ConnectionCache.
func (*RPCTractserverTalker) CheckTracts ¶
func (t *RPCTractserverTalker) CheckTracts(addr string, tsid core.TractserverID, tracts []core.TractState) core.Error
CheckTracts implements TractserverTalker.CheckTracts.
func (*RPCTractserverTalker) CtlStatTract ¶
func (t *RPCTractserverTalker) CtlStatTract(addr string, tsid core.TractserverID, id core.TractID, version int) (reply core.StatTractReply)
CtlStatTract implements TractserverTalker.CtlStatTract.
func (*RPCTractserverTalker) GCTract ¶
func (t *RPCTractserverTalker) GCTract(addr string, tsid core.TractserverID, old []core.TractState, gone []core.TractID) core.Error
GCTract implements TractserverTalker.GCTract.
func (*RPCTractserverTalker) PackTracts ¶
func (t *RPCTractserverTalker) PackTracts( addr string, tsid core.TractserverID, length int, tracts []*core.PackTractSpec, id core.RSChunkID) (reply core.Error)
PackTracts implements TractserverTalker.PackTracts.
func (*RPCTractserverTalker) PullTract ¶
func (t *RPCTractserverTalker) PullTract(addr string, tsid core.TractserverID, from []string, id core.TractID, version int) core.Error
PullTract implements TractserverTalker.PullTract.
func (*RPCTractserverTalker) RSEncode ¶
func (t *RPCTractserverTalker) RSEncode( addr string, tsid core.TractserverID, id core.RSChunkID, length int, srcs, dests []core.TSAddr, im []int) (reply core.Error)
RSEncode implements TractserverTalker.RSEncode.
func (*RPCTractserverTalker) SetVersion ¶
func (t *RPCTractserverTalker) SetVersion(addr string, tsid core.TractserverID, id core.TractID, newVersion int, conditionalStamp uint64) core.Error
SetVersion implements TractserverTalker.SetVersion.
type RackBasedFailureDomain ¶
type RackBasedFailureDomain struct { }
RackBasedFailureDomain is an implementation of FailureDomainService that parses cluster and rack names out of hostname. Following our current naming convention, a host is named as "XXYYDD", where "XX" stands for a cluster, "YY" stands for a rack, and "DD" stands for a machine. For example, "bbaa20" is machine "20" in rack "aa" of cluster "bb". As a result, RackBasedFailureDomain doesn't need to maintain any state.
Each entity (host, rack, cluster) should have a globally unique name. As a result, "bbaa20" will be mapped to {"bbaa20", "bbaa", "bb"}, because there could be another rack "aa" under a different cluster and similarly another host "20" somewhere under a different rack.
func (RackBasedFailureDomain) GetFailureDomain ¶
func (RackBasedFailureDomain) GetFailureDomain(hosts []string) [][]string
GetFailureDomain simply parses the strings.
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server is the RPC server for a Curator.
type StatusData ¶
type StatusData struct { JobName string Cfg Config ID core.CuratorID LeaderAddr string Members []string RaftTerm uint64 NumPartitions int FreeMem uint64 TotalMem uint64 FreeSpace uint64 Tractservers []tractserverData Reboot time.Time CtlRPC map[string]string SrvRPC map[string]string CuratorStats map[string]interface{} Now time.Time }
StatusData includes curator status info.
type TSIDSet ¶
type TSIDSet struct {
// contains filtered or unexported fields
}
TSIDSet is a representation of a set of tractserver IDs. Up to three IDs can be packed in a pointer with no external storage, as long as they fit in 20 bits. The zero value (a nil pointer) represents the empty set. We use bit ranges (from lsb):
0-1: tag bits: 0 is pointer, 1-3 is count of ids 2-21: id 1 22-41: id 2 42-61: id 3 62: unused 63: always set if tag != 0 (to make it not look like a pointer)
If we have more than three IDs, or any ID can't fit in 20 bits, then we hold a pointer to a plain []core.TractserverID.
We call this a "Set" because it's used like one, even though it does preserve order, and Add doesn't check for duplicates.
This casting is technically not allowed by the unsafe rules. To make it a little safer, we set the highest bit so the stuffed values definitely don't look like a valid pointer (on x86-64). Nothing should ever look at these except the GC, and the GC will skip pointers that fall outside the malloc heap range (as of Go 1.7).
func (TSIDSet) Add ¶
func (s TSIDSet) Add(id core.TractserverID) TSIDSet
Add returns a new TSIDSet with the new id added. The old value should be considered invalidated (just like append).
func (TSIDSet) Contains ¶
func (s TSIDSet) Contains(id core.TractserverID) bool
Contains returns true if the given id is in the set.
func (TSIDSet) Get ¶
func (s TSIDSet) Get(idx int) core.TractserverID
Get returns the id at the given index. It will panic if idx is out of bounds (just like slice indexing).
func (TSIDSet) ToSlice ¶
func (s TSIDSet) ToSlice() []core.TractserverID
ToSlice returns a slice with the same ids as this set.
type TractserverTalker ¶
type TractserverTalker interface { // SetVersion asks the tractserver (tsid, addr) to set the version of 'id' to newVersion. // This will only succeed if the version at the tractserver is newVersion or newVersion-1. SetVersion(addr string, tsid core.TractserverID, id core.TractID, newVersion int, conditionalStamp uint64) core.Error // PullTract asks the tractserver at 'addr' with id 'tsid' to pull the tract with id 'id' and // version 'version' from the source hosts 'from', each of which should own a copy of the tract. PullTract(addr string, tsid core.TractserverID, from []string, id core.TractID, version int) core.Error // CheckTracts asks the tractserver at 'addr' with id 'tsid' if it has the tracts 'tracts'. CheckTracts(addr string, tsid core.TractserverID, tracts []core.TractState) core.Error // GCTract tells the tractserver at 'addr' that the tracts in 'old' are obsolete and the blob that // has the tracts in 'gone' has been deleted. // // Any host may delete the tracts in 'old' (or any older version thereof). // The tracts in 'gone' are safe to be deleted from any host as the blobs are permanently gone. // // Returns any error encountered when trying to talk to the tractserver. GCTract(addr string, tsid core.TractserverID, old []core.TractState, gone []core.TractID) core.Error // CtlStatTract is like the client StatTract, but bypasses request limits. CtlStatTract(addr string, tsid core.TractserverID, id core.TractID, version int) core.StatTractReply // PackTracts asks the tractserver to create a chunk with a bunch of tracts in it. // It will pull data from other tractservers and write it to the local disk. PackTracts(addr string, tsid core.TractserverID, length int, tracts []*core.PackTractSpec, id core.RSChunkID) core.Error // RSEncode asks the tractserver to pull N chunks from other tractservers, compute M // parity chunks, and write the parity chunks to other tractservers. 
RSEncode(addr string, tsid core.TractserverID, id core.RSChunkID, length int, srcs, dests []core.TSAddr, indexMap []int) core.Error }
TractserverTalker is an abstraction of the transport layer between the Curator and the Tractserver.
func NewRPCTractserverTalker ¶
func NewRPCTractserverTalker() TractserverTalker
NewRPCTractserverTalker returns a new Golang RPC based implementation of TractserverTalker.
Source Files ¶
- config.go
- curator.go
- dyconfig.go
- failure_domain.go
- leader.go
- master_conn.go
- pack_tracts.go
- pack_tracts_context.go
- reconstruct.go
- recovery_loop.go
- rereplicate.go
- rpc_master_conn.go
- rpc_tractserver_talker.go
- server.go
- status.go
- storage_class_loop.go
- sync_state.go
- time_updates.go
- tractserver_monitor.go
- tractserver_picker.go
- tractserver_talker.go
- tsidset.go