curator

package
v0.0.0-...-fd5963e Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 13, 2019 License: MIT Imports: 33 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// RSPieceLength is the size we aim for in each chunk piece. The value is
	// deliberately a little under 64MiB so that, after ChecksumFile and any
	// future overhead, every piece still comes out <= 64MiB. The reservation is
	// 64 bytes per 64KB (64KiB in total for a 64MiB piece), plus another 64
	// bytes. The trade-off is that a chunk then holds only ~7.992 tracts, so
	// some partial tracts are needed to make the best use of the space.
	RSPieceLength = 64<<20 - 64<<10 - 64

	// REPLICATED aliases core.StorageClassREPLICATED only to keep some code
	// a little shorter.
	REPLICATED = core.StorageClassREPLICATED
)

Variables

View Source
// DefaultDyConfig holds the default values for the curator's dynamically
// configurable settings.
var DefaultDyConfig = DyConfig{
	CuratorGroups: 4,

	// Bandwidth budgets, in Gbps.
	RecoveryBandwidthGbps: 10,
	RSEncodeBandwidthGbps: 5,
}

DefaultDyConfig holds default values for dynamic configuration.

View Source
// DefaultProdConfig holds the Config defaults for production deployments.
// Fields not set here (MasterSpec, Addr, RaftACSpec) keep their zero values
// and must be supplied by the caller.
var DefaultProdConfig = Config{
	// Seeded from the wall clock once, when the package is initialized.
	CuratorSeed: time.Now().UnixNano(),
	UseFailure:  false,

	// Blob and request limits.
	MaxReplFactor:      10,
	MaxTractsToExtend:  20,
	RejectReqThreshold: 100,

	// Partition management.
	PartitionMonInterval: 5 * time.Minute,
	MinFreeBlobSlot:      uint64(core.MaxBlobKey / 4 * 3),
	FreeMemLimit:         2 << 30, // 2GiB

	// Consistency checking.
	ConsistencyCheckInterval:  time.Minute,
	ConsistencyCheckBatchSize: 5000,

	MasterHeartbeatInterval: 10 * time.Second,

	// Metadata GC.
	MetadataGCInterval:   time.Hour,
	MetadataUndeleteTime: 72 * time.Hour, // 3 days

	// Tractserver monitoring.
	TsUnhealthy:            time.Minute,
	TsDown:                 15 * time.Minute,
	TsHeartbeatGracePeriod: 3 * time.Minute,

	SyncInterval: 10 * time.Minute,

	// Erasure coding waits 8.5 days after the last write.
	WriteDelay: 8*24*time.Hour + 12*time.Hour,
}

DefaultProdConfig specifies the default Config values used in the production environment.

View Source
// DefaultTestConfig holds the Config defaults for test environments. Compared
// with DefaultProdConfig it uses a fixed seed, enables the failure service,
// and shortens most intervals and timeouts.
var DefaultTestConfig = Config{
	// A constant seed rather than a clock-based one (presumably so that runs
	// are reproducible).
	CuratorSeed: 31337,
	UseFailure:  true,

	// Blob and request limits.
	MaxReplFactor:      10,
	MaxTractsToExtend:  20,
	RejectReqThreshold: 100,

	// Partition management.
	PartitionMonInterval: 5 * time.Minute,
	MinFreeBlobSlot:      uint64(core.MaxBlobKey / 4 * 3),
	FreeMemLimit:         2 << 30, // 2GiB

	// Consistency checking: frequent, small batches.
	ConsistencyCheckInterval:  10 * time.Second,
	ConsistencyCheckBatchSize: 10,

	MasterHeartbeatInterval: 10 * time.Second,

	// Metadata GC.
	MetadataGCInterval:   15 * time.Minute,
	MetadataUndeleteTime: 3 * time.Hour,

	// Tractserver monitoring.
	TsUnhealthy:            20 * time.Second,
	TsDown:                 30 * time.Second,
	TsHeartbeatGracePeriod: 30 * time.Second,

	SyncInterval: 5 * time.Second,

	WriteDelay: 30 * time.Second,
}

DefaultTestConfig specifies the default Config values used in the testing environment.

Functions

This section is empty.

Types

type Config

// Config encapsulates parameters for Curator. DefaultProdConfig and
// DefaultTestConfig provide starting values, and Validate checks a Config for
// obviously wrong settings.
type Config struct {
	CuratorSeed       int64 // Seed for the random number generator.
	MaxReplFactor     int   // Maximum replication factor allowed for a blob.
	MaxTractsToExtend int   // Maximum number of tracts a blob can be extended by at a time.

	MasterSpec string // How to find masters.

	Addr               string // Address for RPCs.
	UseFailure         bool   // Whether to enable the failure service.
	RejectReqThreshold int    // Pending client request limit.

	PartitionMonInterval time.Duration // Interval at which partitions are checked.
	MinFreeBlobSlot      uint64        // Ask for a new partition when the number of free blob slots in existing partitions drops below this threshold.
	FreeMemLimit         uint64        // No new partition if free system memory drops below this (presumably in bytes; defaults use 2GiB).

	ConsistencyCheckInterval  time.Duration // How often to do a consistency check.
	ConsistencyCheckBatchSize int           // How many blobs to checksum at once.

	RaftACSpec string // Spec for raft autoconfig.

	// --- Master ---
	// Interval between heartbeats sent to the master.
	// NOTE(review): presumably only sent while this curator believes it is
	// the leader — confirm.
	MasterHeartbeatInterval time.Duration

	// --- GC ---
	// NOTE(review): meanings inferred from the names — confirm.
	// MetadataGCInterval: how often metadata GC runs.
	// MetadataUndeleteTime: how long a deleted blob's metadata is kept (and so
	// can still be undeleted) before GC may remove it.
	MetadataGCInterval   time.Duration
	MetadataUndeleteTime time.Duration

	// --- Tract Server Monitor ---
	// TODO(PL-1107)
	// Unhealthy is anything that hasn't beaten recently but hasn't been offline
	// long enough to be re-replicated.  Unhealthy servers won't host new data as
	// we expect writes to them to fail.
	//
	// NOTE: Needs to be related to tractserver/server.go:curatorHeartbeatInterval.
	// The idea is that if we miss a few heartbeats we'll re-replicate data to
	// unblock clients.
	TsUnhealthy time.Duration
	// Down is anything that hasn't beaten in so long that it is probably
	// offline, and the data on it should be considered gone.
	TsDown time.Duration
	// When a curator becomes the new leader, tract servers might take some time
	// to learn about it. A newly elected curator leader will not declare any
	// tract servers unhealthy or down during this grace period.
	TsHeartbeatGracePeriod time.Duration

	// --- Tract Server ---
	// Interval at which tracts are sent to tract servers so that missing tracts
	// can be found.
	SyncInterval time.Duration

	// --- Erasure Coding ---
	// Time after last write that a blob can be considered for erasure coding.
	WriteDelay time.Duration
}

Config encapsulates parameters for Curator.

func (*Config) Validate

func (c *Config) Validate() error

Validate checks that the configuration object has reasonable (not obviously wrong) values.

type Curator

type Curator struct {
	// contains filtered or unexported fields
}

Curator manages metadata for tractservers.

func NewCurator

func NewCurator(curatorCfg *Config,
	mc MasterConnection,
	tt TractserverTalker,
	stateCfg durable.StateConfig,
	r *raft.Raft,
	fds FailureDomainService,
	getTime func() time.Time,
	metricNameSuffix string,
) *Curator

NewCurator creates and returns a new Curator.

func (*Curator) LeadershipChange

func (c *Curator) LeadershipChange(iAmLeader bool)

LeadershipChange is called by the replication layer whenever our leadership status changes. We want to send heartbeats to the master, but only while we think we're the leader.

type CuratorCtlHandler

type CuratorCtlHandler struct {
	// contains filtered or unexported fields
}

CuratorCtlHandler handles heartbeat messages and other non-client-generated RPCs.

func (*CuratorCtlHandler) CuratorTractserverHeartbeat

CuratorTractserverHeartbeat is called by a tractserver to notify the curator that it is still alive.

type CuratorSrvHandler

type CuratorSrvHandler struct {
	// contains filtered or unexported fields
}

CuratorSrvHandler defines methods that conform to the requirements of Go's RPC package.

func (*CuratorSrvHandler) AckExtendBlob

func (h *CuratorSrvHandler) AckExtendBlob(req core.AckExtendBlobReq, reply *core.AckExtendBlobReply) error

AckExtendBlob is the RPC callback for acknowledging the success of extending a blob.

func (*CuratorSrvHandler) CreateBlob

func (h *CuratorSrvHandler) CreateBlob(req core.CreateBlobReq, reply *core.CreateBlobReply) error

CreateBlob is the RPC callback for creating a blob.

func (*CuratorSrvHandler) DeleteBlob

func (h *CuratorSrvHandler) DeleteBlob(id core.BlobID, reply *core.Error) error

DeleteBlob is the RPC callback for deleting a blob.

func (*CuratorSrvHandler) ExtendBlob

func (h *CuratorSrvHandler) ExtendBlob(req core.ExtendBlobReq, reply *core.ExtendBlobReply) error

ExtendBlob is the RPC callback for extending a blob.

func (*CuratorSrvHandler) FixVersion

func (h *CuratorSrvHandler) FixVersion(req core.FixVersionReq, reply *core.Error) error

FixVersion is sent by a client when they're trying to interact with a tractserver but have invalid version information.

func (*CuratorSrvHandler) GetTracts

func (h *CuratorSrvHandler) GetTracts(req core.GetTractsReq, reply *core.GetTractsReply) error

GetTracts is the RPC callback for getting tract location information for a read or a write.

func (*CuratorSrvHandler) ListBlobs

func (h *CuratorSrvHandler) ListBlobs(req core.ListBlobsReq, reply *core.ListBlobsReply) error

ListBlobs returns a range of blob keys in a partition.

func (*CuratorSrvHandler) ReportBadTS

func (h *CuratorSrvHandler) ReportBadTS(req core.ReportBadTSReq, reply *core.Error) error

ReportBadTS is sent by a client when they can't talk to the host in req. We can use this information to repair tracts more quickly.

func (*CuratorSrvHandler) SetMetadata

func (h *CuratorSrvHandler) SetMetadata(req core.SetMetadataReq, reply *core.Error) error

SetMetadata is the RPC callback for changing blob metadata.

func (*CuratorSrvHandler) StatBlob

func (h *CuratorSrvHandler) StatBlob(id core.BlobID, reply *core.StatBlobReply) error

StatBlob is the RPC callback for getting information about a blob.

func (*CuratorSrvHandler) UndeleteBlob

func (h *CuratorSrvHandler) UndeleteBlob(id core.BlobID, reply *core.Error) error

UndeleteBlob is the RPC callback for undeleting a blob.

type DyConfig

// DyConfig holds values that can be dynamically configured in the curator.
// NOTE(review): how and when updated values are applied is not visible here.
type DyConfig struct {

	// How much bandwidth we can use for RS encoding, in Gbps (per the field
	// name).
	RSEncodeBandwidthGbps float32

	// How much bandwidth we can use for recovery (all types), in Gbps (per the
	// field name).
	RecoveryBandwidthGbps float32

	// Number of curator groups in this cluster.
	CuratorGroups int
}

DyConfig holds values that can be dynamically configured in the curator.

type FailureDomainService

// FailureDomainService lets the curator learn the failure domain hierarchy
// (host, rack, cluster, ...) of the cluster.
type FailureDomainService interface {
	// GetFailureDomain retrieves the failure domain hierarchy for 'hosts'. The
	// returned failure domains for each host are organized from the lowest
	// level (host) to the highest (e.g., datacenter). The number of
	// levels should be the same across all hosts. Nil is returned if no
	// information is available.
	//
	// Using our current naming convention, for example,
	//   hosts := []string{"bbaa01", "bbaa02", "ggaa23"}
	// can be mapped to the following result,
	//   {
	//     {"bbaa01", "bbaa", "bb"}, // {host, rack, cluster}
	//     {"bbaa02", "bbaa", "bb"},
	//     {"ggaa23", "ggaa", "gg"},
	//   }
	GetFailureDomain(hosts []string) [][]string
}

FailureDomainService defines the interface for the curator to learn about the failure domain hierarchy of the cluster.

It is assumed that backup/redundancy links are ignored and thus the failure domain hierarchy for the entire cluster can be described as a tree or a forest consisting of trees of equal height. The leaves of the tree(s) are tractservers, and moving up, one might see racks, rows, clusters, etc. Each entity (host, rack, ...) should have a globally unique name.

See PL-1362.

type MasterConnection

// MasterConnection is a connection to a group of masters. Implementations
// establish connections transparently. Every method returns the master's reply
// together with a core.Error describing the outcome.
type MasterConnection interface {
	// RegisterCurator sends a request to the master to register this curator.
	RegisterCurator() (core.RegisterCuratorReply, core.Error)

	// CuratorHeartbeat sends a heartbeat for the given curator ID to the master.
	CuratorHeartbeat(core.CuratorID) (core.CuratorHeartbeatReply, core.Error)

	// NewPartition sends a request for a new partition, for the given curator
	// ID, to the master.
	NewPartition(core.CuratorID) (core.NewPartitionReply, core.Error)
}

MasterConnection is a connection to a group of masters. Connection establishment is done transparently.

func NewRPCMasterConnection

func NewRPCMasterConnection(addr string, masterSpec string) MasterConnection

NewRPCMasterConnection creates a new RPCMasterConnection to the master set.

type RPCMasterConnection

type RPCMasterConnection struct {
	// contains filtered or unexported fields
}

RPCMasterConnection implements MasterConnection based on Go's RPC package.

func (*RPCMasterConnection) CuratorHeartbeat

CuratorHeartbeat sends a heartbeat to the master and returns its reply.

func (*RPCMasterConnection) NewPartition

NewPartition sends a request for a new partition to the master and returns its reply.

func (*RPCMasterConnection) RegisterCurator

func (r *RPCMasterConnection) RegisterCurator() (core.RegisterCuratorReply, core.Error)

RegisterCurator checks to see if we're already registered. If not, it sends a registration request to the master and logs the registration information.

type RPCTractserverTalker

type RPCTractserverTalker struct {
	// contains filtered or unexported fields
}

RPCTractserverTalker implements TractserverTalker using ConnectionCache.

func (*RPCTractserverTalker) CheckTracts

func (t *RPCTractserverTalker) CheckTracts(addr string, tsid core.TractserverID, tracts []core.TractState) core.Error

CheckTracts implements TractserverTalker.CheckTracts.

func (*RPCTractserverTalker) CtlStatTract

func (t *RPCTractserverTalker) CtlStatTract(addr string, tsid core.TractserverID, id core.TractID, version int) (reply core.StatTractReply)

CtlStatTract implements TractserverTalker.CtlStatTract.

func (*RPCTractserverTalker) GCTract

func (t *RPCTractserverTalker) GCTract(addr string, tsid core.TractserverID, old []core.TractState, gone []core.TractID) core.Error

GCTract implements TractserverTalker.GCTract.

func (*RPCTractserverTalker) PackTracts

func (t *RPCTractserverTalker) PackTracts(
	addr string, tsid core.TractserverID, length int, tracts []*core.PackTractSpec, id core.RSChunkID) (reply core.Error)

PackTracts implements TractserverTalker.PackTracts.

func (*RPCTractserverTalker) PullTract

func (t *RPCTractserverTalker) PullTract(addr string, tsid core.TractserverID, from []string, id core.TractID, version int) core.Error

PullTract implements TractserverTalker.PullTract.

func (*RPCTractserverTalker) RSEncode

func (t *RPCTractserverTalker) RSEncode(
	addr string, tsid core.TractserverID, id core.RSChunkID, length int, srcs, dests []core.TSAddr, im []int) (reply core.Error)

RSEncode implements TractserverTalker.RSEncode.

func (*RPCTractserverTalker) SetVersion

func (t *RPCTractserverTalker) SetVersion(addr string, tsid core.TractserverID, id core.TractID, newVersion int, conditionalStamp uint64) core.Error

SetVersion implements TractserverTalker.SetVersion.

type RackBasedFailureDomain

// RackBasedFailureDomain derives failure domains purely by parsing host names,
// so it carries no state.
type RackBasedFailureDomain struct{}

RackBasedFailureDomain is an implementation of FailureDomainService that parses cluster and rack names out of the hostname. Following our current naming convention, a host is named "XXYYDD", where "XX" stands for a cluster, "YY" for a rack, and "DD" for a machine. For example, "bbaa20" is machine "20" in rack "aa" of cluster "bb". As a result, RackBasedFailureDomain doesn't need to maintain any state.

Each entity (host, rack, cluster) should have a globally unique name. As a result, "bbaa20" will be mapped to {"bbaa20", "bbaa", "bb"}, because there could be another rack "aa" under a different cluster and similarly another host "20" somewhere under a different rack.

func (RackBasedFailureDomain) GetFailureDomain

func (RackBasedFailureDomain) GetFailureDomain(hosts []string) [][]string

GetFailureDomain simply parses the strings.

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server is the RPC server for a Curator.

func NewServer

func NewServer(curator *Curator, cfg *Config, raft *raft.Raft, storage *raft.Storage) *Server

NewServer creates a new Server. The server does not listen for or serve requests until Start() is called on it.

func (*Server) Start

func (s *Server) Start() (err error)

Start starts the RPC server. Start is blocking and will run forever.

type StatusData

// StatusData includes curator status info.
//
// NOTE(review): the field notes below are inferred from names and types;
// confirm them against the code that populates this struct.
//   - Cfg is the curator's Config; ID is its core.CuratorID.
//   - LeaderAddr, Members and RaftTerm presumably describe the raft group.
//   - FreeMem, TotalMem and FreeSpace are presumably byte counts.
//   - CtlRPC and SrvRPC presumably hold per-RPC stats for the control and
//     client-facing handlers.
//   - Reboot is presumably the process start time; Now is presumably the time
//     this snapshot was taken.
type StatusData struct {
	JobName       string
	Cfg           Config
	ID            core.CuratorID
	LeaderAddr    string
	Members       []string
	RaftTerm      uint64
	NumPartitions int
	FreeMem       uint64
	TotalMem      uint64
	FreeSpace     uint64
	Tractservers  []tractserverData

	Reboot       time.Time
	CtlRPC       map[string]string
	SrvRPC       map[string]string
	CuratorStats map[string]interface{}
	Now          time.Time
}

StatusData includes curator status info.

type TSIDSet

type TSIDSet struct {
	// contains filtered or unexported fields
}

TSIDSet is a representation of a set of tractserver IDs. Up to three IDs can be packed in a pointer with no external storage, as long as each ID fits in 20 bits. The zero value (a nil pointer) represents the empty set. We use bit ranges (from lsb):

0-1: tag bits: 0 is pointer, 1-3 is count of ids
2-21: id 1
22-41: id 2
42-61: id 3
62: unused
63: always set if tag != 0 (to make it not look like a pointer)

If we have more than three IDs, or any ID can't fit in 20 bits, then we hold a pointer to a plain []core.TractserverID.

We call this a "Set" because it's used like one, even though it does preserve order, and Add doesn't check for duplicates.

This casting is technically not allowed by the unsafe rules. To make it a little safer, we set the highest bit so the stuffed values definitely don't look like a valid pointer (on x86-64). Nothing should ever look at these except the GC, and the GC will skip pointers that fall outside the malloc heap range (as of Go 1.7).

func (TSIDSet) Add

func (s TSIDSet) Add(id core.TractserverID) TSIDSet

Add returns a new TSIDSet with the new id added. The old value should be considered invalidated (just like append).

func (TSIDSet) Contains

func (s TSIDSet) Contains(id core.TractserverID) bool

Contains returns true if the given id is in the set.

func (TSIDSet) Get

func (s TSIDSet) Get(idx int) core.TractserverID

Get returns the id at the given index. It will panic if idx is out of bounds (just like slice indexing).

func (TSIDSet) Len

func (s TSIDSet) Len() int

Len returns the number of ids stored in this set.

func (TSIDSet) Merge

func (s TSIDSet) Merge(t TSIDSet) TSIDSet

Merge returns a new set with all the ids in this set and another set.

func (TSIDSet) ToSlice

func (s TSIDSet) ToSlice() []core.TractserverID

ToSlice returns a slice with the same ids as this set.

type TractserverTalker

// TractserverTalker is an abstraction of the transport layer between the
// Curator and the Tractserver. Every method identifies its target tractserver
// by both its address ('addr') and its ID ('tsid').
type TractserverTalker interface {
	// SetVersion asks the tractserver (tsid, addr) to set the version of 'id' to newVersion.
	// This will only succeed if the version at the tractserver is newVersion or newVersion-1.
	// NOTE(review): the meaning of conditionalStamp is not documented here — confirm.
	SetVersion(addr string, tsid core.TractserverID, id core.TractID, newVersion int, conditionalStamp uint64) core.Error

	// PullTract asks the tractserver at 'addr' with id 'tsid' to pull the tract with id 'id' and
	// version 'version' from the source hosts 'from', each of which should own a copy of the tract.
	PullTract(addr string, tsid core.TractserverID, from []string, id core.TractID, version int) core.Error

	// CheckTracts asks the tractserver at 'addr' with id 'tsid' whether it has the tracts in 'tracts'.
	CheckTracts(addr string, tsid core.TractserverID, tracts []core.TractState) core.Error

	// GCTract tells the tractserver at 'addr' that the tracts in 'old' are obsolete and the blob that
	// has the tracts in 'gone' has been deleted.
	//
	// Any host may delete the tracts in 'old' (or any older version thereof).
	// The tracts in 'gone' are safe to be deleted from any host as the blobs are permanently gone.
	//
	// Returns any error encountered when trying to talk to the tractserver.
	GCTract(addr string, tsid core.TractserverID, old []core.TractState, gone []core.TractID) core.Error

	// CtlStatTract is like the client StatTract, but bypasses request limits.
	// Unlike the other methods, it returns a full core.StatTractReply.
	CtlStatTract(addr string, tsid core.TractserverID, id core.TractID, version int) core.StatTractReply

	// PackTracts asks the tractserver to create a chunk with a bunch of tracts in it.
	// It will pull data from other tractservers and write it to the local disk.
	PackTracts(addr string, tsid core.TractserverID, length int, tracts []*core.PackTractSpec, id core.RSChunkID) core.Error

	// RSEncode asks the tractserver to pull N chunks from other tractservers, compute M
	// parity chunks, and write the parity chunks to other tractservers.
	// NOTE(review): presumably 'srcs' are the N source chunk locations and 'dests' the M
	// parity destinations; the semantics of 'indexMap' are not documented here — confirm.
	RSEncode(addr string, tsid core.TractserverID, id core.RSChunkID, length int, srcs, dests []core.TSAddr, indexMap []int) core.Error
}

TractserverTalker is an abstraction of the transport layer between the Curator and the Tractserver.

func NewRPCTractserverTalker

func NewRPCTractserverTalker() TractserverTalker

NewRPCTractserverTalker returns a new Golang RPC based implementation of TractserverTalker.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL