controller

package
v3.35.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 30, 2023 License: Apache-2.0, Apache-2.0 Imports: 15 Imported by: 0

Documentation

Overview

Package controller provides the core Controller struct.

Index

Constants

View Source
const (
	// ErrCodeCustom can be used to return a custom error message.
	ErrCodeCustom errors.Code = "CustomError"

	// ErrCodeInternal can be used when the cause of an error can't be
	// determined. It can be accompanied by a single string message.
	ErrCodeInternal errors.Code = "InternalError"

	// ErrCodeTODO can be used as a placeholder until a proper error code is
	// created and assigned.
	ErrCodeTODO errors.Code = "TODOError"

	ErrCodeNodeExists      errors.Code = "NodeExists"
	ErrCodeNodeKeyInvalid  errors.Code = "NodeKeyInvalid"
	ErrCodeNoAvailableNode errors.Code = "NoAvailableNode"

	ErrCodeRoleTypeInvalid errors.Code = "RoleTypeInvalid"

	ErrCodeDirectiveSendFailure errors.Code = "DirectiveSendFailure"

	ErrCodeInvalidRequest errors.Code = "InvalidRequest"

	ErrCodeUnassignedJobs errors.Code = "UnassignedJobs"

	UndefinedErrorMessage string = "undefined message format"
)

Variables

This section is empty.

Functions

func NewErrCustom

func NewErrCustom() error

NewErrCustom can be used to return a custom error message.

func NewErrDirectiveSendFailure

func NewErrDirectiveSendFailure(msg string) error

func NewErrInternal

func NewErrInternal(msg string) error

NewErrInternal can be used when the cause of an error can't be determined. It can be accompanied by a single string message.

func NewErrInvalidRequest

func NewErrInvalidRequest(msg string) error

func NewErrNoAvailableNode

func NewErrNoAvailableNode() error

func NewErrNodeExists

func NewErrNodeExists(addr dax.Address) error

func NewErrNodeKeyInvalid

func NewErrNodeKeyInvalid(addr dax.Address) error

func NewErrRoleTypeInvalid

func NewErrRoleTypeInvalid(roleType dax.RoleType) error

func NewErrUnassignedJobs

func NewErrUnassignedJobs(jobs []dax.Job) error

func NewNopNodeService

func NewNopNodeService() *nopNodeService

Types

type AddressSet

type AddressSet map[dax.Address]struct{}

AddressSet is a set of strings.

func NewAddressSet

func NewAddressSet() AddressSet

func (AddressSet) Add

func (s AddressSet) Add(p dax.Address)

func (AddressSet) Contains

func (s AddressSet) Contains(p dax.Address) bool

func (AddressSet) Merge

func (s AddressSet) Merge(o AddressSet)

func (AddressSet) Minus

func (s AddressSet) Minus(m AddressSet) []dax.Address

func (AddressSet) Remove

func (s AddressSet) Remove(p dax.Address)

func (AddressSet) SortedSlice

func (s AddressSet) SortedSlice() []dax.Address

type Balancer

type Balancer interface {
	// AddWorker adds a worker to the global pool of available workers.
	AddWorker(tx dax.Transaction, node *dax.Node) ([]dax.WorkerDiff, error)

	// RemoveWorker removes a worker from the system. If the worker is currently
	// assigned to a database and has jobs, it will be removed and its jobs will
	// be either transferred to other workers or placed on the free job list.
	RemoveWorker(tx dax.Transaction, addr dax.Address) ([]dax.WorkerDiff, error)

	// FreeWorkers dissociates the given workers from a database.
	FreeWorkers(tx dax.Transaction, addrs ...dax.Address) error

	// AddJobs adds new jobs for the given database.
	AddJobs(tx dax.Transaction, roleType dax.RoleType, qtid dax.QualifiedTableID, jobs ...dax.Job) ([]dax.WorkerDiff, error)

	// RemoveJobs removes jobs for the given database.
	RemoveJobs(tx dax.Transaction, roleType dax.RoleType, qtid dax.QualifiedTableID, jobs ...dax.Job) ([]dax.WorkerDiff, error)

	// BalanceDatabase forces a database balance. TODO(tlt): currently this is
	// only used in tests, so perhaps we can get rid of it.
	BalanceDatabase(tx dax.Transaction, qdbid dax.QualifiedDatabaseID) ([]dax.WorkerDiff, error)

	// CurrentState returns the workers and jobs currently active for the given
	// database.
	CurrentState(tx dax.Transaction, roleType dax.RoleType, qdbid dax.QualifiedDatabaseID) ([]dax.WorkerInfo, error)

	// WorkerState returns the jobs currently active for the given worker.
	WorkerState(tx dax.Transaction, roleType dax.RoleType, addr dax.Address) (dax.WorkerInfo, error)

	// WorkersForJobs returns the workers and jobs currently responsible for the
	// given jobs.
	WorkersForJobs(tx dax.Transaction, roleType dax.RoleType, qdbid dax.QualifiedDatabaseID, jobs ...dax.Job) ([]dax.WorkerInfo, error)

	// WorkersForTable returns the workers responsible for any job related to
	// the given table.
	WorkersForTable(tx dax.Transaction, roleType dax.RoleType, qtid dax.QualifiedTableID) ([]dax.WorkerInfo, error)

	// ReadNode returns the node for the given address.
	ReadNode(tx dax.Transaction, addr dax.Address) (*dax.Node, error)

	// Nodes returns all nodes known by the Balancer.
	Nodes(tx dax.Transaction) ([]*dax.Node, error)
}

type Config

type Config struct {
	Director Director

	// Poller
	PollInterval time.Duration `toml:"poll-interval"`

	// Storage
	StorageMethod string `toml:"storage-method"`

	SQLDB *SQLDBConfig `toml:"sqldb"`

	SnapshotterDir string `toml:"snapshotter-dir"`
	WriteloggerDir string `toml:"writelogger-dir"`

	// RegistrationBatchTimeout is the time that the controller will
	// wait after a node registers itself to see if any more nodes
	// will register before sending out directives to all nodes which
	// have been registered.
	RegistrationBatchTimeout time.Duration

	// SnappingTurtleTimeout is the period on which the automatic
	// snapshotting routine will run. If performing all the snapshots
	// takes longer than this amount of time, snapshotting will run
	// continuously. If it finishes before the timeout, it will wait
	// until the timeout expires to start another round of snapshots.
	SnappingTurtleTimeout time.Duration

	Logger logger.Logger `toml:"-"`
}

type Controller

type Controller struct {
	// Schemar is used by the controller to get table, and other schema,
	// information.
	Schemar  schemar.Schemar
	Balancer Balancer

	Transactor Transactor

	Snapshotter *snapshotter.Snapshotter
	Writelogger *writelogger.Writelogger

	// Director is used to send directives to computer workers.
	Director Director

	DirectiveVersion dax.DirectiveVersion
	// contains filtered or unexported fields
}

func New

func New(cfg Config) *Controller

New returns a new instance of Controller with default values.

func (*Controller) AddAddresses

func (c *Controller) AddAddresses(ctx context.Context, addrs ...dax.Address) error

func (*Controller) CheckInNode

func (c *Controller) CheckInNode(ctx context.Context, n *dax.Node) error

CheckInNode handles a "check-in" from a compute node. These come periodically, and if the controller already knows about the compute node, it can simply no-op. If, however, the controller is not aware of the node checking in, then that probably means that the poller has removed that node from its list (perhaps due to a network fault) and therefore the node needs to be re-registered.

func (*Controller) ComputeNodes

func (c *Controller) ComputeNodes(ctx context.Context, qtid dax.QualifiedTableID, shards dax.ShardNums) ([]dax.ComputeNode, error)

ComputeNodes returns the compute nodes for the given table/shards. It always uses a read transaction. The writable equivalent to this method is `IngestShard`.

func (*Controller) CreateDatabase

func (c *Controller) CreateDatabase(ctx context.Context, qdb *dax.QualifiedDatabase) error

CreateDatabase adds a database to the schemar.

func (*Controller) CreateField

func (c *Controller) CreateField(ctx context.Context, qtid dax.QualifiedTableID, fld *dax.Field) error

func (*Controller) CreateNode

func (c *Controller) CreateNode(context.Context, dax.Address, *dax.Node) error

func (*Controller) CreateTable

func (c *Controller) CreateTable(ctx context.Context, qtbl *dax.QualifiedTable) error

CreateTable adds a table to the schemar, and then sends directives to all affected nodes based on the change.

func (*Controller) CurrentState added in v3.33.0

func (c *Controller) CurrentState(ctx context.Context) ([]dax.WorkerInfo, error)

func (*Controller) DatabaseByID

func (c *Controller) DatabaseByID(ctx context.Context, qdbid dax.QualifiedDatabaseID) (*dax.QualifiedDatabase, error)

DatabaseByID returns the database for the given id.

func (*Controller) DatabaseByName

func (c *Controller) DatabaseByName(ctx context.Context, orgID dax.OrganizationID, dbname dax.DatabaseName) (*dax.QualifiedDatabase, error)

DatabaseByName returns the database for the given name.

func (*Controller) Databases

func (c *Controller) Databases(ctx context.Context, orgID dax.OrganizationID, ids ...dax.DatabaseID) ([]*dax.QualifiedDatabase, error)

func (*Controller) DebugNodes

func (c *Controller) DebugNodes(ctx context.Context) ([]*dax.Node, error)

func (*Controller) DeleteNode

func (c *Controller) DeleteNode(context.Context, dax.Address) error

func (*Controller) DeregisterNodes

func (c *Controller) DeregisterNodes(ctx context.Context, addresses ...dax.Address) error

DeregisterNodes removes nodes from the controller's list of registered nodes. It sends directives to the removed nodes, but ignores errors.

func (*Controller) DropDatabase

func (c *Controller) DropDatabase(ctx context.Context, qdbid dax.QualifiedDatabaseID) error

func (*Controller) DropField

func (c *Controller) DropField(ctx context.Context, qtid dax.QualifiedTableID, fldName dax.FieldName) error

func (*Controller) DropTable

func (c *Controller) DropTable(ctx context.Context, qtid dax.QualifiedTableID) error

DropTable removes a table from the schema and sends directives to all affected nodes based on the change.

func (*Controller) IngestPartition

func (c *Controller) IngestPartition(ctx context.Context, qtid dax.QualifiedTableID, partition dax.PartitionNum) (dax.Address, error)

func (*Controller) IngestShard

func (c *Controller) IngestShard(ctx context.Context, qtid dax.QualifiedTableID, shrdNum dax.ShardNum) (dax.Address, error)

IngestShard handles an ingest shard request.

func (*Controller) Logger added in v3.35.0

func (c *Controller) Logger() logger.Logger

func (*Controller) Nodes

func (c *Controller) Nodes(ctx context.Context) ([]*dax.Node, error)

func (*Controller) ReadNode

func (c *Controller) ReadNode(context.Context, dax.Address) (*dax.Node, error)

func (*Controller) RegisterNode

func (c *Controller) RegisterNode(ctx context.Context, n *dax.Node) error

RegisterNode adds a node to the controller's list of registered nodes. It makes no guarantees about when the node will actually be used for anything or assigned any jobs.

func (*Controller) RegisterNodes

func (c *Controller) RegisterNodes(ctx context.Context, nodes ...*dax.Node) error

RegisterNodes adds nodes to the controller's list of registered nodes.

func (*Controller) RemoveAddresses

func (c *Controller) RemoveAddresses(ctx context.Context, addrs ...dax.Address) error

func (*Controller) RemoveShards

func (c *Controller) RemoveShards(ctx context.Context, qtid dax.QualifiedTableID, shards ...dax.ShardNum) error

RemoveShards deregisters the table/shard combinations with the controller and sends the necessary directives.

func (*Controller) SetDatabaseOption added in v3.32.0

func (c *Controller) SetDatabaseOption(ctx context.Context, qdbid dax.QualifiedDatabaseID, option string, value string) error

SetDatabaseOption sets the option on the given database.

func (*Controller) SnapshotFieldKeys

func (c *Controller) SnapshotFieldKeys(ctx context.Context, qtid dax.QualifiedTableID, field dax.FieldName) error

SnapshotFieldKeys forces the translate node responsible for the given field to snapshot the keys for that field, then increment its version for logs written to the Writelogger.

func (*Controller) SnapshotShardData

func (c *Controller) SnapshotShardData(ctx context.Context, qtid dax.QualifiedTableID, shardNum dax.ShardNum) error

SnapshotShardData forces the compute node responsible for the given shard to snapshot that shard, then increment its shard version for logs written to the Writelogger.

func (*Controller) SnapshotTable

func (c *Controller) SnapshotTable(ctx context.Context, qtid dax.QualifiedTableID) error

SnapshotTable snapshots a table. It might also snapshot everything else... no guarantees here, only used in tests as of this writing.

func (*Controller) SnapshotTableKeys

func (c *Controller) SnapshotTableKeys(ctx context.Context, qtid dax.QualifiedTableID, partitionNum dax.PartitionNum) error

SnapshotTableKeys forces the translate node responsible for the given partition to snapshot the table keys for that partition, then increment its version for logs written to the Writelogger.

func (*Controller) Start

func (c *Controller) Start() error

Start starts long running subroutines.

func (*Controller) Stop

func (c *Controller) Stop() error

Stop stops the node registration routine.

func (*Controller) TableByID

func (c *Controller) TableByID(ctx context.Context, qtid dax.QualifiedTableID) (*dax.QualifiedTable, error)

TableByID returns a table by quaified table id.

func (*Controller) TableByName

func (c *Controller) TableByName(ctx context.Context, qdbid dax.QualifiedDatabaseID, name dax.TableName) (*dax.QualifiedTable, error)

TableByName gets the full table by name.

func (*Controller) TableID

TableID returns the table id by table name. TODO(tlt): try to phase this out in favor of TableByName().

func (*Controller) Tables

func (c *Controller) Tables(ctx context.Context, qdbid dax.QualifiedDatabaseID, ids ...dax.TableID) ([]*dax.QualifiedTable, error)

Tables returns a list of tables by name.

func (*Controller) TranslateNodes

func (c *Controller) TranslateNodes(ctx context.Context, qtid dax.QualifiedTableID, partitions dax.PartitionNums) ([]dax.TranslateNode, error)

TranslateNodes returns the translate nodes for the given table/partitions. It always uses a read transaction. The writable equivalent to this method is `IngestPartition`.

type Director

type Director interface {
	SendDirective(ctx context.Context, dir *dax.Directive) error
	SendSnapshotShardDataRequest(ctx context.Context, req *dax.SnapshotShardDataRequest) error
	SendSnapshotTableKeysRequest(ctx context.Context, req *dax.SnapshotTableKeysRequest) error
	SendSnapshotFieldKeysRequest(ctx context.Context, req *dax.SnapshotFieldKeysRequest) error
}

type NewBalancerFn

type NewBalancerFn func(string, logger.Logger) Balancer

type NodeService

type NodeService interface {
	CreateNode(dax.Transaction, dax.Address, *dax.Node) error
	ReadNode(dax.Transaction, dax.Address) (*dax.Node, error)
	DeleteNode(dax.Transaction, dax.Address) error
	Nodes(dax.Transaction) ([]*dax.Node, error)
}

NodeService represents a service for managing Nodes.

type NopBalancer

type NopBalancer struct{}

NopBalancer is a no-op implementation of the Balancer interface.

func NewNopBalancer

func NewNopBalancer() *NopBalancer

func (*NopBalancer) AddJobs

func (b *NopBalancer) AddJobs(tx dax.Transaction, roleType dax.RoleType, qtid dax.QualifiedTableID, jobs ...dax.Job) ([]dax.WorkerDiff, error)

func (*NopBalancer) AddWorker

func (b *NopBalancer) AddWorker(tx dax.Transaction, node *dax.Node) ([]dax.WorkerDiff, error)

func (*NopBalancer) BalanceDatabase

func (b *NopBalancer) BalanceDatabase(tx dax.Transaction, qdbid dax.QualifiedDatabaseID) ([]dax.WorkerDiff, error)

func (*NopBalancer) CurrentState

func (b *NopBalancer) CurrentState(tx dax.Transaction, roleType dax.RoleType, qdbid dax.QualifiedDatabaseID) ([]dax.WorkerInfo, error)

func (*NopBalancer) FreeWorkers added in v3.35.0

func (b *NopBalancer) FreeWorkers(tx dax.Transaction, addrs ...dax.Address) error

func (*NopBalancer) Nodes

func (b *NopBalancer) Nodes(tx dax.Transaction) ([]*dax.Node, error)

func (*NopBalancer) ReadNode

func (b *NopBalancer) ReadNode(tx dax.Transaction, addr dax.Address) (*dax.Node, error)

func (*NopBalancer) RemoveJobs

func (b *NopBalancer) RemoveJobs(tx dax.Transaction, roleType dax.RoleType, qtid dax.QualifiedTableID, jobs ...dax.Job) ([]dax.WorkerDiff, error)

func (*NopBalancer) RemoveWorker

func (b *NopBalancer) RemoveWorker(tx dax.Transaction, addr dax.Address) ([]dax.WorkerDiff, error)

func (*NopBalancer) WorkerState

func (b *NopBalancer) WorkerState(tx dax.Transaction, roleType dax.RoleType, addr dax.Address) (dax.WorkerInfo, error)

func (*NopBalancer) WorkersForJobs

func (b *NopBalancer) WorkersForJobs(tx dax.Transaction, roleType dax.RoleType, qdbid dax.QualifiedDatabaseID, jobs ...dax.Job) ([]dax.WorkerInfo, error)

func (*NopBalancer) WorkersForTable

func (b *NopBalancer) WorkersForTable(tx dax.Transaction, roleType dax.RoleType, qtid dax.QualifiedTableID) ([]dax.WorkerInfo, error)

type NopDirector

type NopDirector struct{}

NopDirector is a no-op implementation of the Director interface.

func NewNopDirector

func NewNopDirector() *NopDirector

func (*NopDirector) SendDirective

func (d *NopDirector) SendDirective(ctx context.Context, dir *dax.Directive) error

func (*NopDirector) SendSnapshotFieldKeysRequest

func (d *NopDirector) SendSnapshotFieldKeysRequest(ctx context.Context, req *dax.SnapshotFieldKeysRequest) error

func (*NopDirector) SendSnapshotShardDataRequest

func (d *NopDirector) SendSnapshotShardDataRequest(ctx context.Context, req *dax.SnapshotShardDataRequest) error

func (*NopDirector) SendSnapshotTableKeysRequest

func (d *NopDirector) SendSnapshotTableKeysRequest(ctx context.Context, req *dax.SnapshotTableKeysRequest) error

type SQLDBConfig added in v3.35.0

type SQLDBConfig struct {
	// Dialect is the pop dialect to use. Example: "postgres" or "sqlite3" or "mysql"
	Dialect string
	// The name of your database. Example: "foo_development"
	Database string
	// The host of your database. Example: "127.0.0.1"
	Host string
	// The port of your database. Example: 1234
	// Will default to the "default" port for each dialect.
	Port string
	// The username of the database user. Example: "root"
	User string
	// The password of the database user. Example: "password"
	Password string
	// Instead of specifying each individual piece of the
	// connection you can instead just specify the URL of the
	// database. Example: "postgres://postgres:postgres@localhost:5432/pop_test?sslmode=disable"
	URL string
	// Defaults to 0 "unlimited". See https://golang.org/pkg/database/sql/#DB.SetMaxOpenConns
	Pool int
	// Defaults to 2. See https://golang.org/pkg/database/sql/#DB.SetMaxIdleConns
	IdlePool int
	// Defaults to 0 "unlimited". See https://golang.org/pkg/database/sql/#DB.SetConnMaxLifetime
	ConnMaxLifetime time.Duration
	// Defaults to 0 "unlimited". See https://golang.org/pkg/database/sql/#DB.SetConnMaxIdleTime
	ConnMaxIdleTime time.Duration
}

func NewSQLDBConfig added in v3.35.0

func NewSQLDBConfig() *SQLDBConfig

type StringSet

type StringSet map[string]struct{}

StringSet is a set of strings.

func NewStringSet

func NewStringSet() StringSet

func (StringSet) Add

func (s StringSet) Add(p string)

func (StringSet) Contains

func (s StringSet) Contains(p string) bool

func (StringSet) Minus

func (s StringSet) Minus(m StringSet) []string

func (StringSet) Remove

func (s StringSet) Remove(p string)

func (StringSet) SortedSlice

func (s StringSet) SortedSlice() []string

type TableSet

type TableSet map[dax.TableKey]struct{}

TableSet is a set of strings.

func NewTableSet

func NewTableSet() TableSet

func (TableSet) Add

func (s TableSet) Add(t dax.TableKey)

func (TableSet) Contains

func (s TableSet) Contains(t dax.TableKey) bool

func (TableSet) Minus

func (s TableSet) Minus(m TableSet) dax.TableKeys

func (TableSet) QualifiedSortedSlice

func (s TableSet) QualifiedSortedSlice() map[dax.QualifiedDatabaseID]dax.TableIDs

func (TableSet) Remove

func (s TableSet) Remove(t dax.TableKey)

func (TableSet) SortedSlice

func (s TableSet) SortedSlice() dax.TableKeys

type Transactor added in v3.35.0

type Transactor interface {
	// Start is useful for Transactor implementations which need to establish a
	// connection. We don't want to do that in the NewImplementation() function;
	// we want that to happen upon Start().
	Start() error

	BeginTx(ctx context.Context, writable bool) (dax.Transaction, error)
	Close() error
}

Directories

Path Synopsis
Package balancer is an implementation of the controller.Balancer interface.
Package balancer is an implementation of the controller.Balancer interface.
Package client is an HTTP client for Controller.
Package client is an HTTP client for Controller.
Package http provides the http implementation of the Director interface.
Package http provides the http implementation of the Director interface.
Package partitioner provides the Partitioner type, which provides helper methods for determining partitions based on string keys.
Package partitioner provides the Partitioner type, which provides helper methods for determining partitions based on string keys.
Package poller provides the core Poller struct.
Package poller provides the core Poller struct.
Package schemar provides the core Schemar interface.
Package schemar provides the core Schemar interface.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL