cassandra

package
v0.0.0-...-ddc1a4a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 9, 2022 License: Apache-2.0 Imports: 43 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func FillObject

func FillObject(data map[string]interface{}, object interface{}, objType reflect.Type) error

FillObject fills the data from DB into an object

func Less

func Less(orderByList []*query.OrderBy, t1 *task.TaskInfo, t2 *task.TaskInfo) bool

Less function holds the task sorting logic

func SetObjectField

func SetObjectField(object interface{}, fieldName string, value interface{}) error

SetObjectField sets the field named fieldName in object to the given value

func ToOrmConfig

func ToOrmConfig(c *Config) *ormcassandra.Config

ToOrmConfig is needed to generate ORM config from legacy config so that the ORM code doesn't depend on legacy storage code and can be imported into the legacy code

Types

type Config

// Config is the config for the cassandra Store. Each field is loaded from the
// YAML key named in its tag: connection, store_name, migrations,
// max_parallel_batches, max_updates_job and replication.
// StoreName indicates the keyspace (see MigrateString).
type Config struct {
	CassandraConn *impl.CassandraConn `yaml:"connection"`
	StoreName     string              `yaml:"store_name"`
	Migrations    string              `yaml:"migrations"`
	// MaxParallelBatches controls the maximum number of go routines run to create tasks
	MaxParallelBatches int `yaml:"max_parallel_batches"`
	// MaxUpdatesPerJob controls the maximum number of
	// updates per job kept in the database.
	// Note that its YAML key is max_updates_job, not max_updates_per_job.
	MaxUpdatesPerJob int `yaml:"max_updates_job"`
	// Replication controls the replication config of the keyspace
	Replication *Replication `yaml:"replication"`
}

Config is the config for cassandra Store

func GenerateTestCassandraConfig

func GenerateTestCassandraConfig() *Config

GenerateTestCassandraConfig generates a test config for a local C* client. This is meant for sharing testing code only, not for production.

func (*Config) AutoMigrate

func (c *Config) AutoMigrate() []error

AutoMigrate migrates the db schemas for cassandra

func (*Config) MigrateString

func (c *Config) MigrateString() string

MigrateString returns the db string required for database migration. The code assumes that the keyspace (indicated by StoreName) is already created.

type FrameworkInfoRecord

// FrameworkInfoRecord tracks the framework info: a framework name and ID, a
// Mesos stream ID, and an update time and update host. Every field carries a
// cql tag with its snake_case name.
type FrameworkInfoRecord struct {
	FrameworkName string    `cql:"framework_name"`
	FrameworkID   string    `cql:"framework_id"`
	MesosStreamID string    `cql:"mesos_stream_id"`
	UpdateTime    time.Time `cql:"update_time"`
	UpdateHost    string    `cql:"update_host"`
}

FrameworkInfoRecord tracks the framework info

type JobConfigRecord

// JobConfigRecord corresponds to a peloton job config.
//
// Config and ConfigAddOn are raw byte slices; the GetJobConfig and
// GetConfigAddOn methods return the unmarshaled job.JobConfig and
// models.ConfigAddOn (presumably decoded from these two fields — confirm).
// Version and Config carry no cql tag, unlike the other fields.
type JobConfigRecord struct {
	JobID        querybuilder.UUID `cql:"job_id"`
	Version      int
	CreationTime time.Time `cql:"creation_time"`
	Config       []byte
	ConfigAddOn  []byte `cql:"config_addon"`
}

JobConfigRecord corresponds to a peloton job config.

func (*JobConfigRecord) GetConfigAddOn

func (j *JobConfigRecord) GetConfigAddOn() (*models.ConfigAddOn, error)

GetConfigAddOn returns the unmarshaled models.ConfigAddOn

func (*JobConfigRecord) GetJobConfig

func (j *JobConfigRecord) GetJobConfig() (*job.JobConfig, error)

GetJobConfig returns the unmarshaled job.JobConfig

type JobRuntimeRecord

// JobRuntimeRecord contains job runtime info.
//
// RuntimeInfo is a raw byte slice; GetJobRuntime returns a *job.RuntimeInfo
// (presumably decoded from it — confirm).
type JobRuntimeRecord struct {
	JobID       querybuilder.UUID `cql:"job_id"`
	State       string            `cql:"state"`
	UpdateTime  time.Time         `cql:"update_time"`
	RuntimeInfo []byte            `cql:"runtime_info"`
}

JobRuntimeRecord contains job runtime info

func (*JobRuntimeRecord) GetJobRuntime

func (t *JobRuntimeRecord) GetJobRuntime() (*job.RuntimeInfo, error)

GetJobRuntime returns the job.RuntimeInfo from a JobRuntimeRecord

type Migrator

// Migrator manages the keyspace schema versions.
// Its only exported field is a pointer to the store Config.
type Migrator struct {
	Config *Config
}

Migrator manages the keyspace schema versions

func NewMigrator

func NewMigrator(config *Config) (*Migrator, error)

NewMigrator creates a DB migrator

func (*Migrator) UpSync

func (m *Migrator) UpSync() []error

UpSync applies the schema migrations to new version

func (*Migrator) Version

func (m *Migrator) Version() (uint64, error)

Version returns the current version of migration. Returns 0 in error case.

type PersistentVolumeRecord

// PersistentVolumeRecord contains persistent volume info.
//
// Hostname and State carry no cql tag, unlike the other fields. The
// CreateTime field is tagged creation_time, so the Go field name and the
// cql name differ.
type PersistentVolumeRecord struct {
	VolumeID      string `cql:"volume_id"`
	JobID         string `cql:"job_id"`
	InstanceID    int    `cql:"instance_id"`
	Hostname      string
	State         string
	GoalState     string    `cql:"goal_state"`
	SizeMB        int       `cql:"size_mb"`
	ContainerPath string    `cql:"container_path"`
	CreateTime    time.Time `cql:"creation_time"`
	UpdateTime    time.Time `cql:"update_time"`
}

PersistentVolumeRecord contains persistent volume info.

type Replica

// Replica is the config for Cassandra replicas: one name/value pair,
// loaded from the YAML keys name and value.
type Replica struct {
	// Name of the replica config, i.e. replication_factor for
	// SimpleStrategy or datacenter1 for NetworkTopologyStrategy
	Name string `yaml:"name"`
	// Value of the replica config such as number of replicas
	Value int `yaml:"value"`
}

Replica is the config for Cassandra replicas

type Replication

// Replication is the config for Cassandra replication: a strategy name plus
// a list of Replica entries, loaded from the YAML keys strategy and replicas.
type Replication struct {
	// Strategy controls the replication strategy. Only two strategies
	// are supported: SimpleStrategy and NetworkTopologyStrategy
	Strategy string `yaml:"strategy"`
	// Replicas controls the number of replicas of the keyspace. For
	// SimpleStrategy, it is a single replication_factor like 3. For
	// NetworkTopologyStrategy, it will be a list of <datacenter,
	// replicas> pairs like {'dc1': '3', 'dc2': '3'}
	Replicas []*Replica `yaml:"replicas"`
}

Replication is the config for Cassandra replication

type ResourcePoolRecord

// ResourcePoolRecord corresponds to a peloton resource pool.
//
// RespoolConfig is stored as a string; GetResourcePoolConfig returns a
// *respool.ResourcePoolConfig (presumably decoded from it — confirm).
// Owner carries no cql tag, unlike the other fields.
type ResourcePoolRecord struct {
	RespoolID     string `cql:"respool_id"`
	RespoolConfig string `cql:"respool_config"`
	Owner         string
	CreationTime  time.Time `cql:"creation_time"`
	UpdateTime    time.Time `cql:"update_time"`
}

ResourcePoolRecord corresponds to a peloton resource pool. TODO: Add versioning.

func (*ResourcePoolRecord) GetResourcePoolConfig

func (r *ResourcePoolRecord) GetResourcePoolConfig() (*respool.ResourcePoolConfig, error)

GetResourcePoolConfig returns the unmarshaled respool.ResourcePoolConfig

type SortUpdateInfo

// SortUpdateInfo is the element type of SortedUpdateList, which sorts
// updates by job configuration version. All of its fields are unexported.
type SortUpdateInfo struct {
	// contains filtered or unexported fields
}

SortUpdateInfo is the structure used by the sortable interface for updates, where the sorting will be done according to the job configuration version for a given job.

type SortUpdateInfoTS

// SortUpdateInfoTS is the element type of SortedUpdateListTS, which sorts
// updates by update create timestamp. All of its fields are unexported.
type SortUpdateInfoTS struct {
	// contains filtered or unexported fields
}

SortUpdateInfoTS is the structure used by the sortable interface for updates, where the sorting will be done according to the update create timestamp for a given job.

type SortedTaskInfoList

// SortedTaskInfoList is a slice of *task.TaskInfo whose Len, Less and Swap
// methods match the method set of sort.Interface.
type SortedTaskInfoList []*task.TaskInfo

SortedTaskInfoList makes TaskInfo implement sortable interface

func (SortedTaskInfoList) Len

func (a SortedTaskInfoList) Len() int

func (SortedTaskInfoList) Less

func (a SortedTaskInfoList) Less(i, j int) bool

func (SortedTaskInfoList) Swap

func (a SortedTaskInfoList) Swap(i, j int)

type SortedUpdateList

// SortedUpdateList is a slice of *SortUpdateInfo whose Len, Less and Swap
// methods match the method set of sort.Interface; ordering is by job
// configuration version.
type SortedUpdateList []*SortUpdateInfo

SortedUpdateList implements a sortable interface for updates according to the job configuration versions for a given job.

func (SortedUpdateList) Len

func (u SortedUpdateList) Len() int

func (SortedUpdateList) Less

func (u SortedUpdateList) Less(i, j int) bool

func (SortedUpdateList) Swap

func (u SortedUpdateList) Swap(i, j int)

type SortedUpdateListTS

// SortedUpdateListTS is a slice of *SortUpdateInfoTS whose Len, Less and
// Swap methods match the method set of sort.Interface; ordering is by
// update create time.
type SortedUpdateListTS []*SortUpdateInfoTS

SortedUpdateListTS implements a sortable interface for updates according to the create time for a given job.

func (SortedUpdateListTS) Len

func (u SortedUpdateListTS) Len() int

func (SortedUpdateListTS) Less

func (u SortedUpdateListTS) Less(i, j int) bool

func (SortedUpdateListTS) Swap

func (u SortedUpdateListTS) Swap(i, j int)

type Store

// Store implements JobStore, TaskStore, UpdateStore, FrameworkInfoStore, and
// PersistentVolumeStore using a cassandra backend.
type Store struct {
	// DataStore is the api.DataStore handle held by the Store
	// (presumably the one its queries are issued through — confirm).
	DataStore api.DataStore

	// Conf is the cassandra Config held by the Store
	// (presumably the one passed to NewStore — confirm).
	Conf *Config
	// contains filtered or unexported fields
}

Store implements JobStore, TaskStore, UpdateStore, FrameworkInfoStore, and PersistentVolumeStore using a cassandra backend. TODO: Break this up into different files (and/or structs) that implement each of these interfaces to keep code modular.

func NewStore

func NewStore(config *Config, scope tally.Scope) (*Store, error)

NewStore creates a Store

func (*Store) AddWorkflowEvent

func (s *Store) AddWorkflowEvent(
	ctx context.Context,
	updateID *peloton.UpdateID,
	instanceID uint32,
	workflowType models.WorkflowType,
	workflowState update.State) error

AddWorkflowEvent adds workflow events for an update and instance to track the progress

func (*Store) CreatePersistentVolume

func (s *Store) CreatePersistentVolume(ctx context.Context, volume *pb_volume.PersistentVolumeInfo) error

CreatePersistentVolume creates a persistent volume entry.

func (*Store) CreateTaskRuntime

func (s *Store) CreateTaskRuntime(
	ctx context.Context,
	jobID *peloton.JobID,
	instanceID uint32,
	runtime *task.RuntimeInfo,
	owner string,
	jobType job.JobType) error

CreateTaskRuntime creates a task runtime for a peloton job

func (*Store) CreateUpdate

func (s *Store) CreateUpdate(
	ctx context.Context,
	updateInfo *models.UpdateModel,
) error

CreateUpdate creates a new update entry in DB. If it already exists, the create will return an error.

func (*Store) DeleteJob

func (s *Store) DeleteJob(
	ctx context.Context,
	jobID string) error

DeleteJob deletes a job and associated tasks, by job id. TODO: This implementation is not perfect: if it hits a transient error, the job or some tasks may not be fully deleted.

func (*Store) DeletePodEvents

func (s *Store) DeletePodEvents(
	ctx context.Context,
	jobID string,
	instanceID uint32,
	fromRunID uint64,
	toRunID uint64,
) error

DeletePodEvents deletes the pod events for the provided JobID, InstanceID and RunID in the half-open range [fromRunID, toRunID)

func (*Store) DeleteTaskRuntime

func (s *Store) DeleteTaskRuntime(
	ctx context.Context,
	id *peloton.JobID,
	instanceID uint32) error

DeleteTaskRuntime deletes the runtime of a particular task. It is used to delete a task when an update workflow reduces the instance count during an update. The pod events are retained in case the user wants to fetch the events or the logs from a previous run of a deleted task. The task configurations from previous versions are retained in case auto-rollback gets triggered.

func (*Store) DeleteUpdate

func (s *Store) DeleteUpdate(
	ctx context.Context,
	updateID *peloton.UpdateID,
	jobID *peloton.JobID,
	jobConfigVersion uint64) error

DeleteUpdate deletes the update from the update_info table and deletes all job and task configurations created for the update.

func (*Store) GetFrameworkID

func (s *Store) GetFrameworkID(ctx context.Context, frameworkName string) (string, error)

GetFrameworkID reads the framework id for a framework name

func (*Store) GetMaxJobConfigVersion

func (s *Store) GetMaxJobConfigVersion(
	ctx context.Context,
	jobID string) (uint64, error)

GetMaxJobConfigVersion returns the maximum version of configs of a given job

func (*Store) GetMesosStreamID

func (s *Store) GetMesosStreamID(ctx context.Context, frameworkName string) (string, error)

GetMesosStreamID reads the mesos stream id for a framework name

func (*Store) GetPersistentVolume

func (s *Store) GetPersistentVolume(ctx context.Context, volumeID *peloton.VolumeID) (*pb_volume.PersistentVolumeInfo, error)

GetPersistentVolume gets the persistent volume object.

func (*Store) GetPodEvents

func (s *Store) GetPodEvents(
	ctx context.Context,
	jobID string,
	instanceID uint32,
	podID ...string) ([]*pod.PodEvent, error)

GetPodEvents returns pod events for a Job + Instance + PodID (optional). Pod events are sorted by PodID + Timestamp. It is only called from this file.

func (*Store) GetTaskByID

func (s *Store) GetTaskByID(ctx context.Context, taskID string) (*task.TaskInfo, error)

GetTaskByID returns the task (task.TaskInfo) for the given task ID

func (*Store) GetTaskConfigs

func (s *Store) GetTaskConfigs(ctx context.Context, id *peloton.JobID,
	instanceIDs []uint32, version uint64) (map[uint32]*task.TaskConfig, *models.ConfigAddOn, error)

GetTaskConfigs returns the task configs for a list of instance IDs, job ID and config version.

func (*Store) GetTaskForJob

func (s *Store) GetTaskForJob(ctx context.Context, jobID string, instanceID uint32) (map[uint32]*task.TaskInfo, error)

GetTaskForJob returns a task by jobID and instanceID

func (*Store) GetTaskRuntime

func (s *Store) GetTaskRuntime(ctx context.Context, jobID *peloton.JobID, instanceID uint32) (*task.RuntimeInfo, error)

GetTaskRuntime for a job and instance id.

func (*Store) GetTaskRuntimesForJobByRange

func (s *Store) GetTaskRuntimesForJobByRange(ctx context.Context,
	id *peloton.JobID, instanceRange *task.InstanceRange) (map[uint32]*task.RuntimeInfo, error)

GetTaskRuntimesForJobByRange returns the Task RuntimeInfo for batch jobs by instance ID range.

func (*Store) GetTasksByQuerySpec

func (s *Store) GetTasksByQuerySpec(
	ctx context.Context,
	jobID *peloton.JobID,
	spec *task.QuerySpec) (map[uint32]*task.TaskInfo, error)

GetTasksByQuerySpec returns the tasks for a peloton job which satisfy the QuerySpec. The field 'state' is filtered by the DB query; the fields 'name' and 'host' are filtered as well.

func (*Store) GetTasksForJob

func (s *Store) GetTasksForJob(ctx context.Context, id *peloton.JobID) (map[uint32]*task.TaskInfo, error)

GetTasksForJob returns all the task runtimes (no configuration) in a map of tasks.TaskInfo for a peloton job

func (*Store) GetTasksForJobAndStates

func (s *Store) GetTasksForJobAndStates(
	ctx context.Context,
	id *peloton.JobID,
	states []task.TaskState) (map[uint32]*task.TaskInfo, error)

GetTasksForJobAndStates returns the tasks for a peloton job which are in one of the specified states. The result map holds task.TaskInfo values keyed by a uint32 (presumably the instance ID).

func (*Store) GetTasksForJobByRange

func (s *Store) GetTasksForJobByRange(ctx context.Context,
	id *peloton.JobID, instanceRange *task.InstanceRange) (map[uint32]*task.TaskInfo, error)

GetTasksForJobByRange returns the TaskInfo for batch jobs by instance ID range.

func (*Store) GetTasksForJobResultSet

func (s *Store) GetTasksForJobResultSet(ctx context.Context, id *peloton.JobID) ([]map[string]interface{}, error)

GetTasksForJobResultSet returns the result set that can be used to iterate over each task in a job. The caller needs to call result.Close().

func (*Store) GetUpdate

func (s *Store) GetUpdate(ctx context.Context, id *peloton.UpdateID) (
	*models.UpdateModel,
	error)

GetUpdate fetches the job update stored in the DB.

func (*Store) GetUpdateProgress

func (s *Store) GetUpdateProgress(ctx context.Context, id *peloton.UpdateID) (
	*models.UpdateModel,
	error)

GetUpdateProgress fetches the job update progress, which includes the instances already updated, instances being updated and the current state of the update.

func (*Store) GetUpdatesForJob

func (s *Store) GetUpdatesForJob(
	ctx context.Context,
	jobID string,
) ([]*peloton.UpdateID, error)

GetUpdatesForJob returns the list of job updates created for a given job.

func (*Store) GetWorkflowEvents

func (s *Store) GetWorkflowEvents(
	ctx context.Context,
	updateID *peloton.UpdateID,
	instanceID uint32,
	limit uint32,
) ([]*stateless.WorkflowEvent, error)

GetWorkflowEvents gets workflow events for an update and instance, events are sorted in descending create timestamp

func (*Store) ModifyUpdate

func (s *Store) ModifyUpdate(
	ctx context.Context,
	updateInfo *models.UpdateModel) error

ModifyUpdate modifies the progress of an update, the instances to update/remove/add and the job config version

func (*Store) QueryJobs

func (s *Store) QueryJobs(ctx context.Context, respoolID *peloton.ResourcePoolID, spec *job.QuerySpec, summaryOnly bool) ([]*job.JobInfo, []*job.JobSummary, uint32, error)

QueryJobs returns all jobs in the resource pool that matches the spec.

func (*Store) QueryTasks

func (s *Store) QueryTasks(
	ctx context.Context,
	jobID *peloton.JobID,
	spec *task.QuerySpec) ([]*task.TaskInfo, uint32, error)

QueryTasks returns the tasks filtered on states(spec.TaskStates) in the given offset..offset+limit range.

func (*Store) SetMesosFrameworkID

func (s *Store) SetMesosFrameworkID(ctx context.Context, frameworkName string, frameworkID string) error

SetMesosFrameworkID stores the mesos framework id for a framework name

func (*Store) SetMesosStreamID

func (s *Store) SetMesosStreamID(ctx context.Context, frameworkName string, mesosStreamID string) error

SetMesosStreamID stores the mesos stream id for a framework name

func (*Store) UpdatePersistentVolume

func (s *Store) UpdatePersistentVolume(ctx context.Context, volumeInfo *pb_volume.PersistentVolumeInfo) error

UpdatePersistentVolume updates persistent volume info.

func (*Store) UpdateTaskRuntime

func (s *Store) UpdateTaskRuntime(
	ctx context.Context,
	jobID *peloton.JobID,
	instanceID uint32,
	runtime *task.RuntimeInfo,
	jobType job.JobType) error

UpdateTaskRuntime updates a task for a peloton job

func (*Store) WriteUpdateProgress

func (s *Store) WriteUpdateProgress(
	ctx context.Context,
	updateInfo *models.UpdateModel) error

WriteUpdateProgress writes the progress of the job update to the DB. The inputs to this function are the only mutable fields in update.

type TaskConfigRecord

// TaskConfigRecord corresponds to a peloton task config.
//
// Config and ConfigAddOn are raw byte slices; the GetTaskConfig and
// GetConfigAddOn methods return the unmarshaled task.TaskConfig and
// models.ConfigAddOn (presumably decoded from these two fields — confirm).
// Version and Config carry no cql tag, unlike the other fields.
type TaskConfigRecord struct {
	JobID        querybuilder.UUID `cql:"job_id"`
	Version      int
	InstanceID   int       `cql:"instance_id"`
	CreationTime time.Time `cql:"creation_time"`
	Config       []byte
	ConfigAddOn  []byte `cql:"config_addon"`
}

TaskConfigRecord corresponds to a peloton task config

func (*TaskConfigRecord) GetConfigAddOn

func (t *TaskConfigRecord) GetConfigAddOn() (*models.ConfigAddOn, error)

GetConfigAddOn returns the unmarshaled models.ConfigAddOn

func (*TaskConfigRecord) GetTaskConfig

func (t *TaskConfigRecord) GetTaskConfig() (*task.TaskConfig, error)

GetTaskConfig returns the unmarshaled task.TaskConfig

type TaskRuntimeRecord

// TaskRuntimeRecord corresponds to a peloton task.
//
// RuntimeInfo is a raw byte slice; GetTaskRuntime returns a
// *task.RuntimeInfo (presumably decoded from it — confirm).
// Version and State carry no cql tag, unlike the other fields.
type TaskRuntimeRecord struct {
	JobID       querybuilder.UUID `cql:"job_id"`
	InstanceID  int               `cql:"instance_id"`
	Version     int64
	UpdateTime  time.Time `cql:"update_time"`
	State       string
	RuntimeInfo []byte `cql:"runtime_info"`
}

TaskRuntimeRecord corresponds to a peloton task

func (*TaskRuntimeRecord) GetTaskRuntime

func (t *TaskRuntimeRecord) GetTaskRuntime() (*task.RuntimeInfo, error)

GetTaskRuntime returns the unmarshaled task.RuntimeInfo

type TaskStateChangeRecord

// TaskStateChangeRecord tracks a peloton task state transition.
//
// Every field is a string except InstanceID (uint32); in particular
// EventTime and Healthy are held as strings rather than as time.Time or
// bool. Every field carries a cql tag with its snake_case name.
type TaskStateChangeRecord struct {
	TaskState          string `cql:"task_state"`
	EventTime          string `cql:"event_time"`
	TaskHost           string `cql:"task_host"`
	JobID              string `cql:"job_id"`
	InstanceID         uint32 `cql:"instance_id"`
	MesosTaskID        string `cql:"mesos_task_id"`
	Message            string `cql:"message"`
	Healthy            string `cql:"healthy"`
	Reason             string `cql:"reason"`
	AgentID            string `cql:"agent_id"`
	PrevMesosTaskID    string `cql:"prev_mesos_task_id"`
	DesiredMesosTaskID string `cql:"desired_mesos_task_id"`
}

TaskStateChangeRecord tracks a peloton task state transition

type TaskStateChangeRecords

// TaskStateChangeRecords tracks a peloton task's state transition events.
//
// Events is a slice of strings; GetStateChangeRecords returns
// []*TaskStateChangeRecord (presumably one decoded from each entry —
// confirm). Events carries no cql tag, unlike the other fields.
type TaskStateChangeRecords struct {
	JobID      querybuilder.UUID `cql:"job_id"`
	InstanceID int               `cql:"instance_id"`
	Events     []string
}

TaskStateChangeRecords tracks a peloton task's state transition events

func (*TaskStateChangeRecords) GetStateChangeRecords

func (t *TaskStateChangeRecords) GetStateChangeRecords() ([]*TaskStateChangeRecord, error)

GetStateChangeRecords returns the TaskStateChangeRecord array

type UpdateRecord

// UpdateRecord tracks the job update info.
//
// Points worth knowing when reading the fields:
//   - State, PrevState and Type are tagged update_state, update_prev_state
//     and update_type, so the Go field name and the cql name differ.
//   - PrevJobConfigVersion is tagged job_config_prev_version.
//   - The Instances* lists are []int here, while the GetInstances* and
//     GetProcessingInstances methods return []uint32.
//   - CompletionTime is a string, whereas CreationTime and UpdateTime are
//     time.Time.
//   - UpdateOptions is a raw byte slice; GetUpdateConfig returns a
//     *update.UpdateConfig (presumably decoded from it — confirm).
type UpdateRecord struct {
	UpdateID             querybuilder.UUID `cql:"update_id"`
	UpdateOptions        []byte            `cql:"update_options"`
	State                string            `cql:"update_state"`
	PrevState            string            `cql:"update_prev_state"`
	Type                 string            `cql:"update_type"`
	JobID                querybuilder.UUID `cql:"job_id"`
	InstancesTotal       int               `cql:"instances_total"`
	InstancesCurrent     []int             `cql:"instances_current"`
	InstancesUpdated     []int             `cql:"instances_updated"`
	InstancesAdded       []int             `cql:"instances_added"`
	InstancesRemoved     []int             `cql:"instances_removed"`
	InstancesDone        int               `cql:"instances_done"`
	InstancesFailed      int               `cql:"instances_failed"`
	JobConfigVersion     int64             `cql:"job_config_version"`
	PrevJobConfigVersion int64             `cql:"job_config_prev_version"`
	CreationTime         time.Time         `cql:"creation_time"`
	UpdateTime           time.Time         `cql:"update_time"`
	OpaqueData           string            `cql:"opaque_data"`
	CompletionTime       string            `cql:"completion_time"`
}

UpdateRecord tracks the job update info

func (*UpdateRecord) GetInstancesAdded

func (u *UpdateRecord) GetInstancesAdded() []uint32

GetInstancesAdded returns a list of tasks to be added

func (*UpdateRecord) GetInstancesRemoved

func (u *UpdateRecord) GetInstancesRemoved() []uint32

GetInstancesRemoved returns a list of tasks to be removed

func (*UpdateRecord) GetInstancesUpdated

func (u *UpdateRecord) GetInstancesUpdated() []uint32

GetInstancesUpdated returns a list of tasks to be updated

func (*UpdateRecord) GetProcessingInstances

func (u *UpdateRecord) GetProcessingInstances() []uint32

GetProcessingInstances returns a list of tasks currently being updated.

func (*UpdateRecord) GetUpdateConfig

func (u *UpdateRecord) GetUpdateConfig() (*update.UpdateConfig, error)

GetUpdateConfig unmarshals and returns the configuration of the job update.

type UpdateViewRecord

// UpdateViewRecord tracks the job update info from the materialized view:
// an update ID, the job ID it belongs to, and a creation time.
type UpdateViewRecord struct {
	UpdateID     querybuilder.UUID `cql:"update_id"`
	JobID        querybuilder.UUID `cql:"job_id"`
	CreationTime time.Time         `cql:"creation_time"`
}

UpdateViewRecord tracks the job update info from materialized view

Directories

Path Synopsis
Package api provides data access management to structured storage backend
Package api provides data access management to structured storage backend

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL