tractserver

package
v0.0.0-...-fd5963e Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 13, 2019 License: MIT Imports: 36 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// WRITE indicates that the tract should be accessed exclusively, but for a
	// short time (typical disk latency). Waiters might want to block.
	WRITE = iota

	// LONG_WRITE indicates the tract will be accessed exclusively for an
	// indeterminate time.
	LONG_WRITE

	// READ indicates that the tract can be shared with other readers.
	READ
)

Variables

View Source
var (
	ErrDiskExists   = errors.New("A disk with that root already exists")
	ErrDiskNotFound = errors.New("A disk with that root doesn't exist")
	ErrTooManyDisks = errors.New("Too many disks!")
)
View Source
var DefaultProdConfig = Config{

	DiskStatusCacheTTL: time.Minute,

	Addr: "localhost:59900",

	DiskControllerBase: "/var/tmp/blb-tractserver",

	RejectReqThreshold:    1000,
	RejectCtlReqThreshold: 1000,

	UseFailure: false,

	ScrubRate: 3 * 1000 * 1000,

	RegistrationRetry:       5 * time.Second,
	MasterHeartbeatInterval: 10 * time.Second,
	MasterHeartbeatRetry:    5 * time.Second,

	CuratorHeartbeatInterval:  5 * time.Second,
	TractsPerCuratorHeartbeat: 100,
	BadTractsPerHeartbeat:     2500,

	UnlinkTractDelay:   7 * 24 * time.Hour,
	SweepTractInterval: 24 * time.Hour,
	DiskLowThreshold:   1024 * 1024 * 1024,
	Workers:            1,
	DropCache:          true,

	EncodeIncrementSize: 4 << 20,
}

DefaultProdConfig specifies the default values for Config that is used for production. TODO(PL-1113). Some of these values are sized for the current configuration of 12 x 4TB disks, holding up to 1M tracts each, and four curator groups. If we deploy with a significantly different configuration, we may have to revisit these.

View Source
var DefaultTestConfig = Config{

	DiskStatusCacheTTL: 20 * time.Second,

	Addr: "localhost:59900",

	DiskControllerBase: "/var/tmp/blb-tractserver",

	RejectReqThreshold:    1000,
	RejectCtlReqThreshold: 1000,

	UseFailure: true,

	ScrubRate: 10 * 1024,

	RegistrationRetry:       5 * time.Second,
	MasterHeartbeatInterval: 10 * time.Second,
	MasterHeartbeatRetry:    5 * time.Second,

	CuratorHeartbeatInterval:  5 * time.Second,
	TractsPerCuratorHeartbeat: 100,
	BadTractsPerHeartbeat:     2500,

	UnlinkTractDelay:   7 * 24 * time.Hour,
	SweepTractInterval: 24 * time.Hour,
	DiskLowThreshold:   1024 * 1024 * 1024,
	Workers:            1,
	DropCache:          true,

	EncodeIncrementSize: 1 << 20,
}

DefaultTestConfig specifies the default values for Config that is used for testing.

View Source
var ErrQueueFull = errors.New("Queue is full, cannot add additional items")

ErrQueueFull is the error returned when a push fails due to a full queue.

View Source
var Modes = []string{"WRITE", "LONG_WRITE", "READ"}

Modes maps above constants to strings.

Functions

func NewDiskController

func NewDiskController(s *Store) *diskController

NewDiskController creates a new disk controller, listening on a unix socket based on the address in the config, adding and removing disks from the given store.

Types

type Config

type Config struct {
	MasterSpec            string // How to find masters.
	Addr                  string // Address for service.
	DiskControllerBase    string // Base directory for disk controller unix sockets.
	RejectReqThreshold    int    // Pending incoming client requests on 'Addr' are rejected after this threshold.
	RejectCtlReqThreshold int    // Pending incoming control requests on 'Addr' are rejected after this threshold.
	UseFailure            bool   // Whether to enable the failure service.

	// --- Data and Metadata ---
	DiskStatusCacheTTL time.Duration // How long a cached disk status stays valid.

	// --- Disk Scrubbing ---
	ScrubRate uint64 // How many bytes per second for data scrubbing.

	// --- Master ---
	// How long to wait after an unsuccessful registration.
	RegistrationRetry time.Duration
	// How often to send heartbeats to the master.
	MasterHeartbeatInterval time.Duration
	// How long to wait after an unsuccessful heartbeat.
	MasterHeartbeatRetry time.Duration

	// --- Curator ---
	// How often to send heartbeats to curators.
	CuratorHeartbeatInterval time.Duration
	// How many tracts do we put into each heartbeat message to the curator for them to consider GC-ing?
	TractsPerCuratorHeartbeat int
	// Max number of bad tracts to report per heartbeat.
	BadTractsPerHeartbeat int

	// --- Disk Manager ---
	// How long do we delay before _really_ deleting tracts
	UnlinkTractDelay time.Duration
	// How often do we sweep for deleted tracts.
	SweepTractInterval time.Duration
	// We treat a disk is full when the available space is found to be below this
	// value (in bytes) during Statfs.
	DiskLowThreshold uint64
	// How many IO workers per disk.
	Workers int
	// Whether to attempt to keep data out of buffer cache.
	DropCache bool

	// --- Reed-Solomon ---
	EncodeIncrementSize int

	// If OverrideID.IsValid, will skip contacting the master for an ID and just use this.
	// The default value is not valid.
	OverrideID int
}

Config encapsulates parameters for Tractserver.

func (Config) Validate

func (c Config) Validate() error

Validate validates the configuration object has reasonable(not obviously wrong) values.

type CuratorTalker

type CuratorTalker interface {
	// CuratorTractserverHeartbeat sends a heartbeat to the curator at 'curatorAddr'
	// to notify that this tractserver is still up.  The curator returns which partitions it is
	// in charge of so that we can promptly report corruption to that curator.
	CuratorTractserverHeartbeat(curatorAddr string, beat core.CuratorTractserverHeartbeatReq) ([]core.PartitionID, error)

	// close closes the connection to the curator addressed at 'curatorAddr'.
	Close(curatorAddr string)
}

CuratorTalker is the aggregation of connections to curators.

type Disk

type Disk interface {
	// Open opens a tract for I/O or metadata changes.
	// Returns a file handle for the opened tract and core.NoError on success,
	// another error otherwise.
	Open(ctx context.Context, id core.TractID, flags int) (interface{}, core.Error)

	// Close closes a file handle created by calling Open.
	// Returns core.NoError if the file was opened and successfully closed.
	// Returns another error otherwise.
	Close(f interface{}) core.Error

	// Write writes to the provided tract.
	Write(ctx context.Context, f interface{}, b []byte, off int64) (int, core.Error)

	// Read reads from the provided tract.
	Read(ctx context.Context, f interface{}, b []byte, off int64) (int, core.Error)

	// Scrub scrubs the provided tract (read and verify checksums, then discard data).
	// Returns the size of the file.
	Scrub(id core.TractID) (int64, core.Error)

	// Size returns the size of the provided tract.
	Size(f interface{}) (int64, core.Error)

	// Delete removes the provided tract.
	Delete(id core.TractID) core.Error

	// OpenDir gets an iterator over the full set of tracts on the disk. The
	// value returned can be passed to ReadDir and CloseDir.
	OpenDir() (interface{}, core.Error)

	// ReadDir reads some tract IDs from the disk. It will return ErrEOF when
	// it's done iterating through the directory.
	ReadDir(d interface{}) ([]core.TractID, core.Error)

	// CloseDir closes a directory iterator opened by OpenDir.
	CloseDir(d interface{}) core.Error

	// Getxattr gets the value of xattr named 'name' in the open tract 'f'.
	Getxattr(f interface{}, name string) ([]byte, core.Error)

	// Setxattr sets xattr named 'name' to value 'value' in the open tract 'f'.
	Setxattr(f interface{}, name string, value []byte) core.Error

	// Statfs returns heavyweight information about the disk under management.
	Statfs() core.FsStatus

	// Return lightweight information about the health of the disk under management.
	Status() core.DiskStatus

	// Sets control flags for this disk. This is persistent: these flags will
	// persist unless changed with another call to SetControlFlags, or another
	// mechanism specific to this disk.
	SetControlFlags(core.DiskControlFlags) core.Error

	// Stop causes the disk to release all allocated resources and return ErrDiskRemoved
	// to all subsequent calls except Close, CloseDir, and Status.
	Stop()
}

Disk is a wrapper around an actual disk that we use for testing.

Disk is thread-safe in general, BUT: each individual tract must be used in a way that is compatible with a read-write lock. That is, multiple threads may have a single tract open for read at once, but at most one thread may have a tract open for writing. If multiple threads try to write to a tract concurrently, it may get corrupted. Disk may return an error in that case, but is not required to.

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

A Manager prioritizes I/O to a disk and executes it.

This is the set of errors that can be returned from methods on Manager and the implied semantics:

core.NoError -- everything was fine. core.ErrEOF -- could not read as much data as asked to. core.ErrNoSpace -- the filesystem is full. core.ErrInvalidArgument -- a bad file descriptor or similar, shouldn't see this but don't know what steps to take. core.ErrCorruptData -- the tract is corrupt and should be reported to the curator. core.ErrIO -- the filesystem may be corrupt, all tracts are suspect. core.ErrDiskRemoved -- Stop has been called.

func NewManager

func NewManager(root string, config *Config) (*Manager, error)

NewManager creates a new Manager to manage the disk rooted at 'root'.

NewManager is thread-safe in that it will never produce *corrupt data* when handling parallel mutations to a file, but it may produce inconsistent data.

func (*Manager) Close

func (m *Manager) Close(f interface{}) core.Error

Close closes an Open'd file. After calling Close on a file, the caller must not use the file handle for any future operation (even if Close returns an error).

func (*Manager) CloseDir

func (m *Manager) CloseDir(d interface{}) core.Error

CloseDir closes the directory.

func (*Manager) Delete

func (m *Manager) Delete(id core.TractID) core.Error

Delete removes the data identified by tract 'id'.

func (*Manager) Getxattr

func (m *Manager) Getxattr(f interface{}, name string) ([]byte, core.Error)

Getxattr returns the value of the xattr named 'name' in the open tract 'f'.

func (*Manager) Open

func (m *Manager) Open(ctx context.Context, id core.TractID, flags int) (interface{}, core.Error)

Open creates (if necessary) and opens a file. The opaque value returned is a file handle and may be used in future calls to Manager.

func (*Manager) OpenDir

func (m *Manager) OpenDir() (interface{}, core.Error)

OpenDir returns a value that can be used to read all tract ids.

func (*Manager) Read

func (m *Manager) Read(ctx context.Context, f interface{}, b []byte, off int64) (int, core.Error)

Read reads len(b) bytes into 'b' from offset 'off' in the open tract 'f'.

func (*Manager) ReadDir

func (m *Manager) ReadDir(d interface{}) ([]core.TractID, core.Error)

ReadDir reads a set of tract ids.

func (*Manager) Scrub

func (m *Manager) Scrub(id core.TractID) (int64, core.Error)

Scrub checks whether or not the tract 'id' is corrupt.

func (*Manager) SetConfig

func (m *Manager) SetConfig(ncfg *Config)

SetConfig notifies the Manager of a new configuration. Not all fields support dynamic configuration (only one does so far: worker count).

func (*Manager) SetControlFlags

func (m *Manager) SetControlFlags(flags core.DiskControlFlags) core.Error

SetControlFlags sets flags for this disk. Note that flags may take effect even if an error is returned.

func (*Manager) Setxattr

func (m *Manager) Setxattr(f interface{}, name string, value []byte) core.Error

Setxattr sets the xattr named 'name' to value 'value' in the open tract 'f'.

func (*Manager) Size

func (m *Manager) Size(f interface{}) (int64, core.Error)

Size returns the size of the data stored in the tract.

func (*Manager) Statfs

func (m *Manager) Statfs() core.FsStatus

Statfs returns information about the underlying filesystem.

func (*Manager) Status

func (m *Manager) Status() core.DiskStatus

Status returns lightweight health information about this disk.

func (*Manager) Stop

func (m *Manager) Stop()

Stop stops the manager and all associated goroutines.

func (*Manager) String

func (m *Manager) String() string

String returns the name of the disk, for logging.

func (*Manager) Write

func (m *Manager) Write(ctx context.Context, f interface{}, b []byte, off int64) (int, core.Error)

Write writes len(b) bytes from 'b' starting at offset 'off' in the open tract 'f'.

type MemDisk

type MemDisk struct {
	// contains filtered or unexported fields
}

MemDisk is a memory-only implementation of the Disk interface that is useful for testing. Files are stored in a map.

func NewMemDisk

func NewMemDisk() *MemDisk

NewMemDisk returns a new MemDisk.

func (*MemDisk) Close

func (m *MemDisk) Close(f interface{}) core.Error

Close closes an open tract.

func (*MemDisk) CloseDir

func (m *MemDisk) CloseDir(d interface{}) core.Error

CloseDir closes the directory.

func (*MemDisk) Delete

func (m *MemDisk) Delete(id core.TractID) core.Error

Delete removes a tract from the MemDisk.

func (*MemDisk) Getxattr

func (m *MemDisk) Getxattr(f interface{}, name string) ([]byte, core.Error)

Getxattr gets the value for the named xattr.

func (*MemDisk) Open

func (m *MemDisk) Open(ctx context.Context, id core.TractID, flags int) (interface{}, core.Error)

Open opens a tract.

func (*MemDisk) OpenDir

func (m *MemDisk) OpenDir() (interface{}, core.Error)

OpenDir returns a value that can be used to read all tract ids.

func (*MemDisk) Read

func (m *MemDisk) Read(_ context.Context, f interface{}, b []byte, off int64) (int, core.Error)

Read reads from the MemDisk.

func (*MemDisk) ReadDir

func (m *MemDisk) ReadDir(d interface{}) ([]core.TractID, core.Error)

ReadDir reads a set of tract ids.

func (*MemDisk) Scrub

func (m *MemDisk) Scrub(id core.TractID) (int64, core.Error)

Scrub just assumes everything is good.

func (*MemDisk) SetControlFlags

func (m *MemDisk) SetControlFlags(flags core.DiskControlFlags) core.Error

SetControlFlags sets flags for this disk.

func (*MemDisk) Setxattr

func (m *MemDisk) Setxattr(f interface{}, name string, value []byte) core.Error

Setxattr sets the value for the named xattr.

func (*MemDisk) Size

func (m *MemDisk) Size(f interface{}) (int64, core.Error)

Size returns the size in bytes of the provided tract.

func (*MemDisk) Statfs

func (m *MemDisk) Statfs() core.FsStatus

Statfs returns fake information about the underlying "file system"

func (*MemDisk) Status

func (m *MemDisk) Status() core.DiskStatus

Status returns lightweight disk status.

func (*MemDisk) Stop

func (m *MemDisk) Stop()

Stop releases resources associated with this disk.

func (*MemDisk) Write

func (m *MemDisk) Write(_ context.Context, f interface{}, b []byte, off int64) (int, core.Error)

Write writes to the MemDisk.

type MetadataStore

type MetadataStore struct {
	// contains filtered or unexported fields
}

MetadataStore stores the metadata (master assigned id) of the tractserver. MetadataStore is thread-safe.

func NewMetadataStore

func NewMetadataStore() *MetadataStore

NewMetadataStore creates a new metadata store. Metadata will be read from and all 'disks' and merged on startup, and written to all 'disks'.

type PQAble

type PQAble interface {
	// Less returns true if the element should be ordered before its peer.
	Less(interface{}) bool
}

PQAble if two elements are comparable, the one that is smaller will be ordered before the other.

type Priority

type Priority int

The Priority of a request. We map client-supplied priorities to our own scale to reduce coupling.

const (
	LowPri     Priority = 10
	MedPri     Priority = 20
	HighPri    Priority = 30
	ScrubPri            = LowPri  // Scrub at low priority.
	ControlPri          = HighPri // Control requests get high priority.
)

Pre-defined priority levels.

type PriorityQueue

type PriorityQueue struct {
	// contains filtered or unexported fields
}

PriorityQueue of Requests, using the "container/heap" interface. The element with the highest priority() will be Pop()'d first.

func NewPriorityQueue

func NewPriorityQueue(max int) *PriorityQueue

NewPriorityQueue creates a new PriorityQueue with max capacity 'max'. If max is less than or equal to zero, there is no limit.

func (*PriorityQueue) Len

func (pq *PriorityQueue) Len() int

Len returns the number of items currently in the queue.

func (*PriorityQueue) Pop

func (pq *PriorityQueue) Pop() PQAble

Pop removes an element from the priority queue. Blocks until an element can be removed.

func (*PriorityQueue) TryPush

func (pq *PriorityQueue) TryPush(item PQAble) error

TryPush tries to push an element to the priority queue. If there is no space in the queue, returns ErrQueueFull. Otherwise, returns nil.

type RPCCuratorTalker

type RPCCuratorTalker struct {
	// contains filtered or unexported fields
}

RPCCuratorTalker implements CuratorTalker using ConnectionCache.

func NewRPCCuratorTalker

func NewRPCCuratorTalker() *RPCCuratorTalker

NewRPCCuratorTalker returns a new RPCCuratorTalker.

func (*RPCCuratorTalker) Close

func (ct *RPCCuratorTalker) Close(curatorAddr string)

Close implements CuratorTalker.

func (*RPCCuratorTalker) CuratorTractserverHeartbeat

func (ct *RPCCuratorTalker) CuratorTractserverHeartbeat(curatorAddr string, beat core.CuratorTractserverHeartbeatReq) ([]core.PartitionID, error)

CuratorTractserverHeartbeat implements CuratorTalker.

type RPCMasterConnection

type RPCMasterConnection struct {
	// contains filtered or unexported fields
}

RPCMasterConnection encapsulates a connection to a master group. It will probe who the leader is in the master group and send requests(for now only Heartbeat) to the master leader.

func NewRPCMasterConnection

func NewRPCMasterConnection(masterSpec string) *RPCMasterConnection

NewRPCMasterConnection creates a new connection to a master group specified in 'addrs'.

func (*RPCMasterConnection) MasterTractserverHeartbeat

func (r *RPCMasterConnection) MasterTractserverHeartbeat(ctx context.Context, id core.TractserverID, addr string, disks []core.FsStatus) (core.MasterTractserverHeartbeatReply, core.Error)

MasterTractserverHeartbeat sends a heartbeat to the master and returns its reply.

func (*RPCMasterConnection) RegisterTractserver

func (r *RPCMasterConnection) RegisterTractserver(ctx context.Context, addr string) (core.RegisterTractserverReply, core.Error)

RegisterTractserver is called only once when a new tractserver is added to the cluster. It tries forever to get a tractserver ID from the master, until a valid tractserver ID is assigned.

type RPCTractserverTalker

type RPCTractserverTalker struct {
	// contains filtered or unexported fields
}

RPCTractserverTalker is a Go RPC-based implementation of TractserverTalker.

func (*RPCTractserverTalker) CtlRead

func (t *RPCTractserverTalker) CtlRead(ctx context.Context, addr string, id core.TractID, v, len int, off int64) ([]byte, core.Error)

CtlRead reads the tract (id, v) from 'addr'.

func (*RPCTractserverTalker) CtlWrite

func (t *RPCTractserverTalker) CtlWrite(ctx context.Context, addr string, id core.TractID, v int, offset int64, b []byte) (reply core.Error)

CtlWrite writes to the tract (id, v) on addr at the given offset.

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server is the RPC server for the TractStore

func NewServer

func NewServer(store *Store, ct CuratorTalker, mc *RPCMasterConnection, cfg *Config) *Server

NewServer creates a new Server. The server does not listen for or serve requests until Start() is called on it.

func (*Server) Start

func (s *Server) Start() (err error)

Start starts the TractServer by launching goroutines to accept RPC requests.

type StatusData

type StatusData struct {
	JobName  string
	Cfg      Config
	ID       core.TractserverID
	FreeMem  uint64
	TotalMem uint64

	FsStatus []core.FsStatus

	Reboot time.Time // When was the last reboot?
	CtlRPC map[string]string
	SrvRPC map[string]string
	Now    time.Time
}

StatusData includes tractserver status info.

type Store

type Store struct {
	// contains filtered or unexported fields
}

Store is a tract store. A tract store is a replication layer on top of a durable local disk layer.

func NewStore

func NewStore(tt TractserverTalker, m *MetadataStore, config *Config) *Store

NewStore returns a new tract store. The caller must provide a way to store data and a way to track versions.

func (*Store) AddDisk

func (s *Store) AddDisk(disk Disk) error

AddDisk adds a disk to the store. If AddDisk returns nil, the disk is added and belongs to the store. If this returns an error, the caller should probably Stop the disk.

func (*Store) Check

func (s *Store) Check(tracts []core.TractState) (missing []core.TractState)

Check verifies that we have 'tracts', and our version for the tracts is at least the version provided. If we're missing the tract or our version is too old, we report it as missing. PL-1114

func (*Store) Config

func (s *Store) Config() *Config

Config returns the current configuration.

func (*Store) Create

func (s *Store) Create(ctx context.Context, id core.TractID, b []byte, off int64) core.Error

Create creates a tract. This involves creating an empty file on disk, then adding a version for it.

func (*Store) DiskCount

func (s *Store) DiskCount() (t int)

DiskCount the number of disks the tractserver has.

func (*Store) DiskExists

func (s *Store) DiskExists(root string) bool

DiskExists returns true if a disk with the given root exists.

func (*Store) GCTracts

func (s *Store) GCTracts(old []core.TractState, gone []core.TractID)

GCTracts reads the tract state returned from the curator and decides if a tract is stale. If so, it removes the tract from 's.Disk'. Error is returned if we fail to remove it.

We can delete our copy of the tract if it has version <= the version in the GC message.

func (*Store) GetBadTracts

func (s *Store) GetBadTracts(parts []core.PartitionID, limit int) (bad []core.TractID)

GetBadTracts returns the tracts that we know are missing or corrupt for the provided partition(s).

func (*Store) GetID

func (s *Store) GetID() core.TractserverID

GetID returns the tractserver's ID.

func (*Store) GetSomeTractsByPartition

func (s *Store) GetSomeTractsByPartition(shard uint64) map[core.PartitionID][]core.TractID

GetSomeTractsByPartition returns tract ids grouped by partition. To reduce the need to allocate very large slices of tract ids, it returns only a subset of the known tracts with each call, controlled by the low bits of 'shard'. To ensure that all tracts are visited, a caller should loop and increment shard with each call.

func (*Store) GetTractsByDisk

func (s *Store) GetTractsByDisk(i int) (out []core.TractID)

GetTractsByDisk returns all tracts on the given disk (by index).

func (*Store) HasID

func (s *Store) HasID(id core.TractserverID) bool

HasID returns true if this tractserver has a valid ID and it matches the provided 'id', false otherwise.

func (*Store) PackTracts

func (s *Store) PackTracts(ctx context.Context, length int, srcs []*core.PackTractSpec, dest core.RSChunkID) core.Error

PackTracts reads tracts from other tractservers and writes them to a local RS data chunk.

func (*Store) PullTract

func (s *Store) PullTract(ctx context.Context, sources []string, id core.TractID, version int) (err core.Error)

PullTract attempts to read the tract (id, version) from the given 'sources'. It tries each source sequentially and stops if it succeeds early. It returns the latest error if pulling from all source hosts fails.

func (*Store) RSEncode

func (s *Store) RSEncode(ctx context.Context, baseid core.RSChunkID, length int, srcs, dests []core.TSAddr, indexMap []int) (err core.Error)

RSEncode performs the RS parity computation, reading data and writing parity to other tractservers.

func (*Store) Read

func (s *Store) Read(ctx context.Context, id core.TractID, version int, length int, off int64) ([]byte, core.Error)

Read is called from the RPC layer to perform a read.

func (*Store) RemoveDisk

func (s *Store) RemoveDisk(root string) error

RemoveDisk removes a disk from the store, identified by its root. This will Stop the disk. All disk-related goroutines and resources may not be freed by the time this returns, but they should be soon after.

func (*Store) SetConfig

func (s *Store) SetConfig(ncfg Config)

SetConfig notifies the Store of a new configuration. Not all fields support dynamic configuration changes.

func (*Store) SetControlFlags

func (s *Store) SetControlFlags(root string, flags core.DiskControlFlags) (err core.Error)

SetControlFlags changes control flags on one disk (specified by root). If the given root is unknown, it returns ErrFileNotFound.

func (*Store) SetID

SetID sets the tractserver's id to 'id'.

func (*Store) SetVersion

func (s *Store) SetVersion(id core.TractID, newVersion int, conditionalStamp uint64) (finalVersion int, err core.Error)

SetVersion tries to set the version for an existing tract 'id' to 'newVersion'. The rule is that 'newVersion' must be <= ourVersion+1. Then the version is set to max(ourVersion, newVersion) and this value is returned.

On success, returns the version number of the tract (may be unmodified) and core.NoError. On failure, returns another core.Error.

func (*Store) Stat

func (s *Store) Stat(ctx context.Context, id core.TractID, version int) (int64, uint64, core.Error)

Stat is called from the RPC layer to get the size and mod stamp of a tract.

func (*Store) Write

func (s *Store) Write(ctx context.Context, id core.TractID, version int, b []byte, off int64) core.Error

Write is called from the RPC layer to write to a tract. See curator/rereplicate.go for the details of how writes and version numbers interact.

type TSCtlHandler

type TSCtlHandler struct {
	// contains filtered or unexported fields
}

TSCtlHandler handles all control messages.

func (*TSCtlHandler) CheckTracts

func (h *TSCtlHandler) CheckTracts(req core.CheckTractsReq, reply *core.CheckTractsReply) error

CheckTracts is a request from a curator to verify that we have the tracts the curator thinks we do.

func (*TSCtlHandler) CtlRead

func (h *TSCtlHandler) CtlRead(req core.ReadReq, reply *core.ReadReply) error

CtlRead reads from a tract, bypassing request limits. CtlReads are not cancellable.

func (*TSCtlHandler) CtlStatTract

func (h *TSCtlHandler) CtlStatTract(req core.StatTractReq, reply *core.StatTractReply) error

CtlStatTract returns the size of a tract.

func (*TSCtlHandler) CtlWrite

func (h *TSCtlHandler) CtlWrite(req core.WriteReq, reply *core.Error) error

CtlWrite does a write to a tract, bypassing request limits. Unlike regular Write, CtlWrite creates the tract when Off is zero.

func (*TSCtlHandler) GCTract

func (h *TSCtlHandler) GCTract(req core.GCTractReq, reply *core.Error) error

GCTract is a request to garbage collect the provided tracts as we don't need to store them anymore.

func (*TSCtlHandler) GetTSID

func (h *TSCtlHandler) GetTSID(req struct{}, reply *core.TractserverID) error

GetTSID returns the id of this tractserver. This is only used for testing purposes and not in production.

func (*TSCtlHandler) PackTracts

func (h *TSCtlHandler) PackTracts(req core.PackTractsReq, reply *core.Error) error

PackTracts instructs the tractserver to read a bunch of tracts from other tractservers and write them to one RS data chunk on the local tractserver.

func (*TSCtlHandler) PullTract

func (h *TSCtlHandler) PullTract(req core.PullTractReq, reply *core.Error) error

PullTract copies a tract from an existing tractserver.

func (*TSCtlHandler) RSEncode

func (h *TSCtlHandler) RSEncode(req core.RSEncodeReq, reply *core.Error) error

RSEncode instructs the tractserver to read a bunch of RS data chunks, perform the RS parity computation, and write out parity chunks to other tractservers.

func (*TSCtlHandler) SetVersion

func (h *TSCtlHandler) SetVersion(req core.SetVersionReq, reply *core.SetVersionReply) error

SetVersion sets the version of a tract. This is done by the curator when it changes replication group membership.

type TSSrvHandler

type TSSrvHandler struct {
	// contains filtered or unexported fields
}

TSSrvHandler handles all client requests.

func (*TSSrvHandler) Cancel

func (h *TSSrvHandler) Cancel(id string, reply *core.Error) error

Cancel attempts to signal the pending operation identified by 'id' that it should be canceled.

func (*TSSrvHandler) CreateTract

func (h *TSSrvHandler) CreateTract(req core.CreateTractReq, reply *core.Error) error

CreateTract creates a tract and writes data to it. Upon success, the tract can be further Read/Write'd, and will have version 1.

func (*TSSrvHandler) GetDiskInfo

func (h *TSSrvHandler) GetDiskInfo(req core.GetDiskInfoReq, reply *core.GetDiskInfoReply) error

GetDiskInfo returns a summary of disk info in reply.

func (*TSSrvHandler) Read

func (h *TSSrvHandler) Read(req core.ReadReq, reply *core.ReadReply) error

Read reads from a tract.

func (*TSSrvHandler) SetControlFlags

func (h *TSSrvHandler) SetControlFlags(req core.SetControlFlagsReq, reply *core.Error) error

SetControlFlags changes control flags for a disk.

func (*TSSrvHandler) StatTract

func (h *TSSrvHandler) StatTract(req core.StatTractReq, reply *core.StatTractReply) error

StatTract returns the size of a tract.

func (*TSSrvHandler) Write

func (h *TSSrvHandler) Write(req core.WriteReq, reply *core.Error) error

Write does a write to a tract on this tractserver.

type TractserverTalker

type TractserverTalker interface {
	// CtlRead does a normal read of the tract (id, version) from 'addr'.
	CtlRead(ctx context.Context, addr string, id core.TractID, version int, len int, offset int64) ([]byte, core.Error)

	// CtlWrite writes data to a tract or RS chunk.
	CtlWrite(ctx context.Context, addr string, id core.TractID, v int, offset int64, b []byte) core.Error
}

TractserverTalker manages talking to other tractservers to replicate data.

func NewRPCTractserverTalker

func NewRPCTractserverTalker() TractserverTalker

NewRPCTractserverTalker returns a new RPCTractserverTalker.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL