Documentation ¶
Index ¶
- Constants
- Variables
- func NewDiskController(s *Store) *diskController
- type Config
- type CuratorTalker
- type Disk
- type Manager
- func (m *Manager) Close(f interface{}) core.Error
- func (m *Manager) CloseDir(d interface{}) core.Error
- func (m *Manager) Delete(id core.TractID) core.Error
- func (m *Manager) Getxattr(f interface{}, name string) ([]byte, core.Error)
- func (m *Manager) Open(ctx context.Context, id core.TractID, flags int) (interface{}, core.Error)
- func (m *Manager) OpenDir() (interface{}, core.Error)
- func (m *Manager) Read(ctx context.Context, f interface{}, b []byte, off int64) (int, core.Error)
- func (m *Manager) ReadDir(d interface{}) ([]core.TractID, core.Error)
- func (m *Manager) Scrub(id core.TractID) (int64, core.Error)
- func (m *Manager) SetConfig(ncfg *Config)
- func (m *Manager) SetControlFlags(flags core.DiskControlFlags) core.Error
- func (m *Manager) Setxattr(f interface{}, name string, value []byte) core.Error
- func (m *Manager) Size(f interface{}) (int64, core.Error)
- func (m *Manager) Statfs() core.FsStatus
- func (m *Manager) Status() core.DiskStatus
- func (m *Manager) Stop()
- func (m *Manager) String() string
- func (m *Manager) Write(ctx context.Context, f interface{}, b []byte, off int64) (int, core.Error)
- type MemDisk
- func (m *MemDisk) Close(f interface{}) core.Error
- func (m *MemDisk) CloseDir(d interface{}) core.Error
- func (m *MemDisk) Delete(id core.TractID) core.Error
- func (m *MemDisk) Getxattr(f interface{}, name string) ([]byte, core.Error)
- func (m *MemDisk) Open(ctx context.Context, id core.TractID, flags int) (interface{}, core.Error)
- func (m *MemDisk) OpenDir() (interface{}, core.Error)
- func (m *MemDisk) Read(_ context.Context, f interface{}, b []byte, off int64) (int, core.Error)
- func (m *MemDisk) ReadDir(d interface{}) ([]core.TractID, core.Error)
- func (m *MemDisk) Scrub(id core.TractID) (int64, core.Error)
- func (m *MemDisk) SetControlFlags(flags core.DiskControlFlags) core.Error
- func (m *MemDisk) Setxattr(f interface{}, name string, value []byte) core.Error
- func (m *MemDisk) Size(f interface{}) (int64, core.Error)
- func (m *MemDisk) Statfs() core.FsStatus
- func (m *MemDisk) Status() core.DiskStatus
- func (m *MemDisk) Stop()
- func (m *MemDisk) Write(_ context.Context, f interface{}, b []byte, off int64) (int, core.Error)
- type MetadataStore
- type PQAble
- type Priority
- type PriorityQueue
- type RPCCuratorTalker
- type RPCMasterConnection
- func (r *RPCMasterConnection) MasterTractserverHeartbeat(ctx context.Context, id core.TractserverID, addr string, disks []core.FsStatus) (core.MasterTractserverHeartbeatReply, core.Error)
- func (r *RPCMasterConnection) RegisterTractserver(ctx context.Context, addr string) (core.RegisterTractserverReply, core.Error)
- type RPCTractserverTalker
- type Server
- type StatusData
- type Store
- func (s *Store) AddDisk(disk Disk) error
- func (s *Store) Check(tracts []core.TractState) (missing []core.TractState)
- func (s *Store) Config() *Config
- func (s *Store) Create(ctx context.Context, id core.TractID, b []byte, off int64) core.Error
- func (s *Store) DiskCount() (t int)
- func (s *Store) DiskExists(root string) bool
- func (s *Store) GCTracts(old []core.TractState, gone []core.TractID)
- func (s *Store) GetBadTracts(parts []core.PartitionID, limit int) (bad []core.TractID)
- func (s *Store) GetID() core.TractserverID
- func (s *Store) GetSomeTractsByPartition(shard uint64) map[core.PartitionID][]core.TractID
- func (s *Store) GetTractsByDisk(i int) (out []core.TractID)
- func (s *Store) HasID(id core.TractserverID) bool
- func (s *Store) PackTracts(ctx context.Context, length int, srcs []*core.PackTractSpec, ...) core.Error
- func (s *Store) PullTract(ctx context.Context, sources []string, id core.TractID, version int) (err core.Error)
- func (s *Store) RSEncode(ctx context.Context, baseid core.RSChunkID, length int, ...) (err core.Error)
- func (s *Store) Read(ctx context.Context, id core.TractID, version int, length int, off int64) ([]byte, core.Error)
- func (s *Store) RemoveDisk(root string) error
- func (s *Store) SetConfig(ncfg Config)
- func (s *Store) SetControlFlags(root string, flags core.DiskControlFlags) (err core.Error)
- func (s *Store) SetID(id core.TractserverID) (core.TractserverID, core.Error)
- func (s *Store) SetVersion(id core.TractID, newVersion int, conditionalStamp uint64) (finalVersion int, err core.Error)
- func (s *Store) Stat(ctx context.Context, id core.TractID, version int) (int64, uint64, core.Error)
- func (s *Store) Write(ctx context.Context, id core.TractID, version int, b []byte, off int64) core.Error
- type TSCtlHandler
- func (h *TSCtlHandler) CheckTracts(req core.CheckTractsReq, reply *core.CheckTractsReply) error
- func (h *TSCtlHandler) CtlRead(req core.ReadReq, reply *core.ReadReply) error
- func (h *TSCtlHandler) CtlStatTract(req core.StatTractReq, reply *core.StatTractReply) error
- func (h *TSCtlHandler) CtlWrite(req core.WriteReq, reply *core.Error) error
- func (h *TSCtlHandler) GCTract(req core.GCTractReq, reply *core.Error) error
- func (h *TSCtlHandler) GetTSID(req struct{}, reply *core.TractserverID) error
- func (h *TSCtlHandler) PackTracts(req core.PackTractsReq, reply *core.Error) error
- func (h *TSCtlHandler) PullTract(req core.PullTractReq, reply *core.Error) error
- func (h *TSCtlHandler) RSEncode(req core.RSEncodeReq, reply *core.Error) error
- func (h *TSCtlHandler) SetVersion(req core.SetVersionReq, reply *core.SetVersionReply) error
- type TSSrvHandler
- func (h *TSSrvHandler) Cancel(id string, reply *core.Error) error
- func (h *TSSrvHandler) CreateTract(req core.CreateTractReq, reply *core.Error) error
- func (h *TSSrvHandler) GetDiskInfo(req core.GetDiskInfoReq, reply *core.GetDiskInfoReply) error
- func (h *TSSrvHandler) Read(req core.ReadReq, reply *core.ReadReply) error
- func (h *TSSrvHandler) SetControlFlags(req core.SetControlFlagsReq, reply *core.Error) error
- func (h *TSSrvHandler) StatTract(req core.StatTractReq, reply *core.StatTractReply) error
- func (h *TSSrvHandler) Write(req core.WriteReq, reply *core.Error) error
- type TractserverTalker
Constants ¶
const ( // WRITE indicates that the tract should be accessed exclusively, but for a // short time (typical disk latency). Waiters might want to block. WRITE = iota // LONG_WRITE indicates the tract will be accessed exclusively for an // indeterminate time. LONG_WRITE // READ indicates that the tract can be shared with other readers. READ )
Variables ¶
var ( ErrDiskExists = errors.New("A disk with that root already exists") ErrDiskNotFound = errors.New("A disk with that root doesn't exist") ErrTooManyDisks = errors.New("Too many disks!") )
var DefaultProdConfig = Config{ DiskStatusCacheTTL: time.Minute, Addr: "localhost:59900", DiskControllerBase: "/var/tmp/blb-tractserver", RejectReqThreshold: 1000, RejectCtlReqThreshold: 1000, UseFailure: false, ScrubRate: 3 * 1000 * 1000, RegistrationRetry: 5 * time.Second, MasterHeartbeatInterval: 10 * time.Second, MasterHeartbeatRetry: 5 * time.Second, CuratorHeartbeatInterval: 5 * time.Second, TractsPerCuratorHeartbeat: 100, BadTractsPerHeartbeat: 2500, UnlinkTractDelay: 7 * 24 * time.Hour, SweepTractInterval: 24 * time.Hour, DiskLowThreshold: 1024 * 1024 * 1024, Workers: 1, DropCache: true, EncodeIncrementSize: 4 << 20, }
DefaultProdConfig specifies the default values for Config that are used for production. TODO(PL-1113). Some of these values are sized for the current configuration of 12 x 4TB disks, holding up to 1M tracts each, and four curator groups. If we deploy with a significantly different configuration, we may have to revisit these.
var DefaultTestConfig = Config{ DiskStatusCacheTTL: 20 * time.Second, Addr: "localhost:59900", DiskControllerBase: "/var/tmp/blb-tractserver", RejectReqThreshold: 1000, RejectCtlReqThreshold: 1000, UseFailure: true, ScrubRate: 10 * 1024, RegistrationRetry: 5 * time.Second, MasterHeartbeatInterval: 10 * time.Second, MasterHeartbeatRetry: 5 * time.Second, CuratorHeartbeatInterval: 5 * time.Second, TractsPerCuratorHeartbeat: 100, BadTractsPerHeartbeat: 2500, UnlinkTractDelay: 7 * 24 * time.Hour, SweepTractInterval: 24 * time.Hour, DiskLowThreshold: 1024 * 1024 * 1024, Workers: 1, DropCache: true, EncodeIncrementSize: 1 << 20, }
DefaultTestConfig specifies the default values for Config that are used for testing.
var ErrQueueFull = errors.New("Queue is full, cannot add additional items")
ErrQueueFull is the error returned when a push fails due to a full queue.
var Modes = []string{"WRITE", "LONG_WRITE", "READ"}
Modes maps the above constants to strings.
Functions ¶
func NewDiskController ¶
func NewDiskController(s *Store) *diskController
NewDiskController creates a new disk controller, listening on a unix socket based on the address in the config, adding and removing disks from the given store.
Types ¶
type Config ¶
type Config struct { MasterSpec string // How to find masters. Addr string // Address for service. DiskControllerBase string // Base directory for disk controller unix sockets. RejectReqThreshold int // Pending incoming client requests on 'Addr' are rejected after this threshold. RejectCtlReqThreshold int // Pending incoming control requests on 'Addr' are rejected after this threshold. UseFailure bool // Whether to enable the failure service. // --- Data and Metadata --- DiskStatusCacheTTL time.Duration // How long a cached disk status stays valid. // --- Disk Scrubbing --- ScrubRate uint64 // How many bytes per second for data scrubbing. // --- Master --- // How long to wait after an unsuccessful registration. RegistrationRetry time.Duration // How often to send heartbeats to the master. MasterHeartbeatInterval time.Duration // How long to wait after an unsuccessful heartbeat. MasterHeartbeatRetry time.Duration // --- Curator --- // How often to send heartbeats to curators. CuratorHeartbeatInterval time.Duration // How many tracts do we put into each heartbeat message to the curator for them to consider GC-ing? TractsPerCuratorHeartbeat int // Max number of bad tracts to report per heartbeat. BadTractsPerHeartbeat int // --- Disk Manager --- // How long do we delay before _really_ deleting tracts UnlinkTractDelay time.Duration // How often do we sweep for deleted tracts. SweepTractInterval time.Duration // We treat a disk as full when the available space is found to be below this // value (in bytes) during Statfs. DiskLowThreshold uint64 // How many IO workers per disk. Workers int // Whether to attempt to keep data out of buffer cache. DropCache bool // --- Reed-Solomon --- EncodeIncrementSize int // If OverrideID.IsValid, will skip contacting the master for an ID and just use this. // The default value is not valid. OverrideID int }
Config encapsulates parameters for Tractserver.
type CuratorTalker ¶
type CuratorTalker interface { // CuratorTractserverHeartbeat sends a heartbeat to the curator at 'curatorAddr' // to notify that this tractserver is still up. The curator returns which partitions it is // in charge of so that we can promptly report corruption to that curator. CuratorTractserverHeartbeat(curatorAddr string, beat core.CuratorTractserverHeartbeatReq) ([]core.PartitionID, error) // Close closes the connection to the curator addressed at 'curatorAddr'. Close(curatorAddr string) }
CuratorTalker is the aggregation of connections to curators.
type Disk ¶
type Disk interface { // Open opens a tract for I/O or metadata changes. // Returns a file handle for the opened tract and core.NoError on success, // another error otherwise. Open(ctx context.Context, id core.TractID, flags int) (interface{}, core.Error) // Close closes a file handle created by calling Open. // Returns core.NoError if the file was opened and successfully closed. // Returns another error otherwise. Close(f interface{}) core.Error // Write writes to the provided tract. Write(ctx context.Context, f interface{}, b []byte, off int64) (int, core.Error) // Read reads from the provided tract. Read(ctx context.Context, f interface{}, b []byte, off int64) (int, core.Error) // Scrub scrubs the provided tract (read and verify checksums, then discard data). // Returns the size of the file. Scrub(id core.TractID) (int64, core.Error) // Size returns the size of the provided tract. Size(f interface{}) (int64, core.Error) // Delete removes the provided tract. Delete(id core.TractID) core.Error // OpenDir gets an iterator over the full set of tracts on the disk. The // value returned can be passed to ReadDir and CloseDir. OpenDir() (interface{}, core.Error) // ReadDir reads some tract IDs from the disk. It will return ErrEOF when // it's done iterating through the directory. ReadDir(d interface{}) ([]core.TractID, core.Error) // CloseDir closes a directory iterator opened by OpenDir. CloseDir(d interface{}) core.Error // Getxattr gets the value of xattr named 'name' in the open tract 'f'. Getxattr(f interface{}, name string) ([]byte, core.Error) // Setxattr sets xattr named 'name' to value 'value' in the open tract 'f'. Setxattr(f interface{}, name string, value []byte) core.Error // Statfs returns heavyweight information about the disk under management. Statfs() core.FsStatus // Return lightweight information about the health of the disk under management. Status() core.DiskStatus // Sets control flags for this disk. This is persistent: these flags will // persist unless changed with another call to SetControlFlags, or another // mechanism specific to this disk. SetControlFlags(core.DiskControlFlags) core.Error // Stop causes the disk to release all allocated resources and return ErrDiskRemoved // to all subsequent calls except Close, CloseDir, and Status. Stop() }
Disk is a wrapper around an actual disk that we use for testing.
Disk is thread-safe in general, BUT: each individual tract must be used in a way that is compatible with a read-write lock. That is, multiple threads may have a single tract open for read at once, but at most one thread may have a tract open for writing. If multiple threads try to write to a tract concurrently, it may get corrupted. Disk may return an error in that case, but is not required to.
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
A Manager prioritizes I/O to a disk and executes it.
This is the set of errors that can be returned from methods on Manager and the implied semantics:
core.NoError -- everything was fine.
core.ErrEOF -- could not read as much data as asked to.
core.ErrNoSpace -- the filesystem is full.
core.ErrInvalidArgument -- a bad file descriptor or similar, shouldn't see this but don't know what steps to take.
core.ErrCorruptData -- the tract is corrupt and should be reported to the curator.
core.ErrIO -- the filesystem may be corrupt, all tracts are suspect.
core.ErrDiskRemoved -- Stop has been called.
func NewManager ¶
NewManager creates a new Manager to manage the disk rooted at 'root'.
NewManager is thread-safe in that it will never produce *corrupt data* when handling parallel mutations to a file, but it may produce inconsistent data.
func (*Manager) Close ¶
Close closes an Open'd file. After calling Close on a file, the caller must not use the file handle for any future operation (even if Close returns an error).
func (*Manager) Getxattr ¶
Getxattr returns the value of the xattr named 'name' in the open tract 'f'.
func (*Manager) Open ¶
Open creates (if necessary) and opens a file. The opaque value returned is a file handle and may be used in future calls to Manager.
func (*Manager) SetConfig ¶
SetConfig notifies the Manager of a new configuration. Not all fields support dynamic configuration (only one does so far: worker count).
func (*Manager) SetControlFlags ¶
func (m *Manager) SetControlFlags(flags core.DiskControlFlags) core.Error
SetControlFlags sets flags for this disk. Note that flags may take effect even if an error is returned.
func (*Manager) Setxattr ¶
Setxattr sets the xattr named 'name' to value 'value' in the open tract 'f'.
func (*Manager) Status ¶
func (m *Manager) Status() core.DiskStatus
Status returns lightweight health information about this disk.
func (*Manager) Stop ¶
func (m *Manager) Stop()
Stop stops the manager and all associated goroutines.
type MemDisk ¶
type MemDisk struct {
// contains filtered or unexported fields
}
MemDisk is a memory-only implementation of the Disk interface that is useful for testing. Files are stored in a map.
func (*MemDisk) SetControlFlags ¶
func (m *MemDisk) SetControlFlags(flags core.DiskControlFlags) core.Error
SetControlFlags sets flags for this disk.
func (*MemDisk) Status ¶
func (m *MemDisk) Status() core.DiskStatus
Status returns lightweight disk status.
type MetadataStore ¶
type MetadataStore struct {
// contains filtered or unexported fields
}
MetadataStore stores the metadata (master assigned id) of the tractserver. MetadataStore is thread-safe.
func NewMetadataStore ¶
func NewMetadataStore() *MetadataStore
NewMetadataStore creates a new metadata store. Metadata will be read from all 'disks' and merged on startup, and written to all 'disks'.
type PQAble ¶
type PQAble interface { // Less returns true if the element should be ordered before its peer. Less(interface{}) bool }
PQAble is the interface for elements that can be ordered relative to one another: of two comparable elements, the one that is smaller (as reported by Less) will be ordered before the other.
type Priority ¶
type Priority int
The Priority of a request. We map client-supplied priorities to our own scale to reduce coupling.
type PriorityQueue ¶
type PriorityQueue struct {
// contains filtered or unexported fields
}
PriorityQueue of Requests, using the "container/heap" interface. The element with the highest priority() will be Pop()'d first.
func NewPriorityQueue ¶
func NewPriorityQueue(max int) *PriorityQueue
NewPriorityQueue creates a new PriorityQueue with max capacity 'max'. If max is less than or equal to zero, there is no limit.
func (*PriorityQueue) Len ¶
func (pq *PriorityQueue) Len() int
Len returns the number of items currently in the queue.
func (*PriorityQueue) Pop ¶
func (pq *PriorityQueue) Pop() PQAble
Pop removes an element from the priority queue. Blocks until an element can be removed.
func (*PriorityQueue) TryPush ¶
func (pq *PriorityQueue) TryPush(item PQAble) error
TryPush tries to push an element to the priority queue. If there is no space in the queue, returns ErrQueueFull. Otherwise, returns nil.
type RPCCuratorTalker ¶
type RPCCuratorTalker struct {
// contains filtered or unexported fields
}
RPCCuratorTalker implements CuratorTalker using ConnectionCache.
func NewRPCCuratorTalker ¶
func NewRPCCuratorTalker() *RPCCuratorTalker
NewRPCCuratorTalker returns a new RPCCuratorTalker.
func (*RPCCuratorTalker) Close ¶
func (ct *RPCCuratorTalker) Close(curatorAddr string)
Close implements CuratorTalker.
func (*RPCCuratorTalker) CuratorTractserverHeartbeat ¶
func (ct *RPCCuratorTalker) CuratorTractserverHeartbeat(curatorAddr string, beat core.CuratorTractserverHeartbeatReq) ([]core.PartitionID, error)
CuratorTractserverHeartbeat implements CuratorTalker.
type RPCMasterConnection ¶
type RPCMasterConnection struct {
// contains filtered or unexported fields
}
RPCMasterConnection encapsulates a connection to a master group. It will probe who the leader is in the master group and send requests (for now only Heartbeat) to the master leader.
func NewRPCMasterConnection ¶
func NewRPCMasterConnection(masterSpec string) *RPCMasterConnection
NewRPCMasterConnection creates a new connection to the master group specified by 'masterSpec'.
func (*RPCMasterConnection) MasterTractserverHeartbeat ¶
func (r *RPCMasterConnection) MasterTractserverHeartbeat(ctx context.Context, id core.TractserverID, addr string, disks []core.FsStatus) (core.MasterTractserverHeartbeatReply, core.Error)
MasterTractserverHeartbeat sends a heartbeat to the master and returns its reply.
func (*RPCMasterConnection) RegisterTractserver ¶
func (r *RPCMasterConnection) RegisterTractserver(ctx context.Context, addr string) (core.RegisterTractserverReply, core.Error)
RegisterTractserver is called only once when a new tractserver is added to the cluster. It tries forever to get a tractserver ID from the master, until a valid tractserver ID is assigned.
type RPCTractserverTalker ¶
type RPCTractserverTalker struct {
// contains filtered or unexported fields
}
RPCTractserverTalker is a Go RPC-based implementation of TractserverTalker.
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server is the RPC server for the TractStore.
func NewServer ¶
func NewServer(store *Store, ct CuratorTalker, mc *RPCMasterConnection, cfg *Config) *Server
NewServer creates a new Server. The server does not listen for or serve requests until Start() is called on it.
type StatusData ¶
type StatusData struct { JobName string Cfg Config ID core.TractserverID FreeMem uint64 TotalMem uint64 FsStatus []core.FsStatus Reboot time.Time // When was the last reboot? CtlRPC map[string]string SrvRPC map[string]string Now time.Time }
StatusData includes tractserver status info.
type Store ¶
type Store struct {
// contains filtered or unexported fields
}
Store is a tract store. A tract store is a replication layer on top of a durable local disk layer.
func NewStore ¶
func NewStore(tt TractserverTalker, m *MetadataStore, config *Config) *Store
NewStore returns a new tract store. The caller must provide a way to store data and a way to track versions.
func (*Store) AddDisk ¶
AddDisk adds a disk to the store. If AddDisk returns nil, the disk is added and belongs to the store. If this returns an error, the caller should probably Stop the disk.
func (*Store) Check ¶
func (s *Store) Check(tracts []core.TractState) (missing []core.TractState)
Check verifies that we have 'tracts', and our version for the tracts is at least the version provided. If we're missing the tract or our version is too old, we report it as missing. PL-1114
func (*Store) Create ¶
Create creates a tract. This involves creating an empty file on disk, then adding a version for it.
func (*Store) DiskExists ¶
DiskExists returns true if a disk with the given root exists.
func (*Store) GCTracts ¶
func (s *Store) GCTracts(old []core.TractState, gone []core.TractID)
GCTracts reads the tract state returned from the curator and decides if a tract is stale. If so, it removes the tract from 's.Disk'. Error is returned if we fail to remove it.
We can delete our copy of the tract if it has version <= the version in the GC message.
func (*Store) GetBadTracts ¶
GetBadTracts returns the tracts that we know are missing or corrupt for the provided partition(s).
func (*Store) GetID ¶
func (s *Store) GetID() core.TractserverID
GetID returns the tractserver's ID.
func (*Store) GetSomeTractsByPartition ¶
GetSomeTractsByPartition returns tract ids grouped by partition. To reduce the need to allocate very large slices of tract ids, it returns only a subset of the known tracts with each call, controlled by the low bits of 'shard'. To ensure that all tracts are visited, a caller should loop and increment shard with each call.
func (*Store) GetTractsByDisk ¶
GetTractsByDisk returns all tracts on the given disk (by index).
func (*Store) HasID ¶
func (s *Store) HasID(id core.TractserverID) bool
HasID returns true if this tractserver has a valid ID and it matches the provided 'id', false otherwise.
func (*Store) PackTracts ¶
func (s *Store) PackTracts(ctx context.Context, length int, srcs []*core.PackTractSpec, dest core.RSChunkID) core.Error
PackTracts reads tracts from other tractservers and writes them to a local RS data chunk.
func (*Store) PullTract ¶
func (s *Store) PullTract(ctx context.Context, sources []string, id core.TractID, version int) (err core.Error)
PullTract attempts to read the tract (id, version) from the given 'sources'. It tries each source sequentially and stops if it succeeds early. It returns the latest error if pulling from all source hosts fails.
func (*Store) RSEncode ¶
func (s *Store) RSEncode(ctx context.Context, baseid core.RSChunkID, length int, srcs, dests []core.TSAddr, indexMap []int) (err core.Error)
RSEncode performs the RS parity computation, reading data and writing parity to other tractservers.
func (*Store) Read ¶
func (s *Store) Read(ctx context.Context, id core.TractID, version int, length int, off int64) ([]byte, core.Error)
Read is called from the RPC layer to perform a read.
func (*Store) RemoveDisk ¶
RemoveDisk removes a disk from the store, identified by its root. This will Stop the disk. All disk-related goroutines and resources may not be freed by the time this returns, but they should be soon after.
func (*Store) SetConfig ¶
SetConfig notifies the Store of a new configuration. Not all fields support dynamic configuration changes.
func (*Store) SetControlFlags ¶
SetControlFlags changes control flags on one disk (specified by root). If the given root is unknown, it returns ErrFileNotFound.
func (*Store) SetID ¶
func (s *Store) SetID(id core.TractserverID) (core.TractserverID, core.Error)
SetID sets the tractserver's id to 'id'.
func (*Store) SetVersion ¶
func (s *Store) SetVersion(id core.TractID, newVersion int, conditionalStamp uint64) (finalVersion int, err core.Error)
SetVersion tries to set the version for an existing tract 'id' to 'newVersion'. The rule is that 'newVersion' must be <= ourVersion+1. Then the version is set to max(ourVersion, newVersion) and this value is returned.
On success, returns the version number of the tract (may be unmodified) and core.NoError. On failure, returns another core.Error.
type TSCtlHandler ¶
type TSCtlHandler struct {
// contains filtered or unexported fields
}
TSCtlHandler handles all control messages.
func (*TSCtlHandler) CheckTracts ¶
func (h *TSCtlHandler) CheckTracts(req core.CheckTractsReq, reply *core.CheckTractsReply) error
CheckTracts is a request from a curator to verify that we have the tracts the curator thinks we do.
func (*TSCtlHandler) CtlRead ¶
CtlRead reads from a tract, bypassing request limits. CtlReads are not cancellable.
func (*TSCtlHandler) CtlStatTract ¶
func (h *TSCtlHandler) CtlStatTract(req core.StatTractReq, reply *core.StatTractReply) error
CtlStatTract returns the size of a tract.
func (*TSCtlHandler) CtlWrite ¶
CtlWrite does a write to a tract, bypassing request limits. Unlike regular Write, CtlWrite creates the tract when Off is zero.
func (*TSCtlHandler) GCTract ¶
func (h *TSCtlHandler) GCTract(req core.GCTractReq, reply *core.Error) error
GCTract is a request to garbage collect the provided tracts as we don't need to store them anymore.
func (*TSCtlHandler) GetTSID ¶
func (h *TSCtlHandler) GetTSID(req struct{}, reply *core.TractserverID) error
GetTSID returns the id of this tractserver. This is only used for testing purposes and not in production.
func (*TSCtlHandler) PackTracts ¶
func (h *TSCtlHandler) PackTracts(req core.PackTractsReq, reply *core.Error) error
PackTracts instructs the tractserver to read a bunch of tracts from other tractservers and write them to one RS data chunk on the local tractserver.
func (*TSCtlHandler) PullTract ¶
func (h *TSCtlHandler) PullTract(req core.PullTractReq, reply *core.Error) error
PullTract copies a tract from an existing tractserver.
func (*TSCtlHandler) RSEncode ¶
func (h *TSCtlHandler) RSEncode(req core.RSEncodeReq, reply *core.Error) error
RSEncode instructs the tractserver to read a bunch of RS data chunks, perform the RS parity computation, and write out parity chunks to other tractservers.
func (*TSCtlHandler) SetVersion ¶
func (h *TSCtlHandler) SetVersion(req core.SetVersionReq, reply *core.SetVersionReply) error
SetVersion sets the version of a tract. This is done by the curator when it changes replication group membership.
type TSSrvHandler ¶
type TSSrvHandler struct {
// contains filtered or unexported fields
}
TSSrvHandler handles all client requests.
func (*TSSrvHandler) Cancel ¶
func (h *TSSrvHandler) Cancel(id string, reply *core.Error) error
Cancel attempts to signal the pending operation identified by 'id' that it should be canceled.
func (*TSSrvHandler) CreateTract ¶
func (h *TSSrvHandler) CreateTract(req core.CreateTractReq, reply *core.Error) error
CreateTract creates a tract and writes data to it. Upon success, the tract can be further Read/Write'd, and will have version 1.
func (*TSSrvHandler) GetDiskInfo ¶
func (h *TSSrvHandler) GetDiskInfo(req core.GetDiskInfoReq, reply *core.GetDiskInfoReply) error
GetDiskInfo returns a summary of disk info in reply.
func (*TSSrvHandler) SetControlFlags ¶
func (h *TSSrvHandler) SetControlFlags(req core.SetControlFlagsReq, reply *core.Error) error
SetControlFlags changes control flags for a disk.
func (*TSSrvHandler) StatTract ¶
func (h *TSSrvHandler) StatTract(req core.StatTractReq, reply *core.StatTractReply) error
StatTract returns the size of a tract.
type TractserverTalker ¶
type TractserverTalker interface { // CtlRead does a normal read of the tract (id, version) from 'addr'. CtlRead(ctx context.Context, addr string, id core.TractID, version int, len int, offset int64) ([]byte, core.Error) // CtlWrite writes data to a tract or RS chunk. CtlWrite(ctx context.Context, addr string, id core.TractID, v int, offset int64, b []byte) core.Error }
TractserverTalker manages talking to other tractservers to replicate data.
func NewRPCTractserverTalker ¶
func NewRPCTractserverTalker() TractserverTalker
NewRPCTractserverTalker returns a new RPCTractserverTalker.