dao

package
v0.2.0-alpha Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 2, 2024 License: MIT Imports: 17 Imported by: 0

Documentation

Index

Constants

View Source
// BatchSize is a fixed batch size of 50.
// NOTE(review): presumably the number of statements grouped per batch by
// ScheduleDaoImpl.UpdateStatus ("updates status in batches") — confirm.
const BatchSize = 50

Variables

View Source
// Table names and statement templates used by the cluster DAO.
//
// Two placeholder styles appear below:
//   - "?" bind markers (QueryInsertEntity, QueryInsertApp, KeyAppByIds), whose
//     values are supplied separately from the statement text;
//   - fmt verbs (%s, %d), whose values are spliced into the statement text
//     itself. NOTE(review): presumably filled in with fmt.Sprintf by the
//     callers — confirm. These are only safe for trusted values; prefer bind
//     markers for anything caller-supplied.
//
// These are variables, not constants, so they can be reassigned at runtime.
var (
	// Names of the tables the statements below are built against.
	KeyEntityTable = "entity"
	KeyNodeTable   = "nodes"
	KeyAppTable    = "apps"
	// NOTE(review): presumably the id of the app row that holds the maximum
	// allowed configuration checked by ValidateConfigurations — confirm.
	MaxConfigApp   = "maxConfig"

	// Reads id and status from the nodes table for one nodename (%s).
	KeyEntitiesOfNode    = "SELECT id, status FROM " + KeyNodeTable + " WHERE nodename='%s';"
	// Reads every row of the entity table.
	KeyGetAllEntities    = "SELECT id, nodename, status, history FROM " + KeyEntityTable + ";"
	// Reads one entity row by id (%s).
	KeyGetEntity         = "SELECT id, nodename, status, history FROM " + KeyEntityTable + " WHERE id='%s';"
	// Verbs in order: nodename (%s), status (%d), history (%s), id (%s).
	KeyUpdateEntityInfo  = "UPDATE " + KeyEntityTable + " SET nodename='%s', status=%d, history='%s' WHERE id='%s';"
	// Bind markers in order: id, nodename, status.
	QueryInsertEntity    = "INSERT INTO " + KeyEntityTable + " (id, nodename, status) VALUES (?, ?, ?)"
	// Bind markers in order: id, partitions, active, configuration.
	QueryInsertApp       = "INSERT INTO " + KeyAppTable + " (id, partitions, active, configuration) VALUES (?, ?, ?, ?)"
	// Reads one app row by id (%s).
	KeyAppById           = "SELECT id, partitions, active, configuration FROM " + KeyAppTable + " WHERE id='%s';"
	// Reads app rows matching exactly two ids, supplied as bind markers.
	KeyAppByIds          = "SELECT id, partitions, active, configuration FROM " + KeyAppTable + " WHERE id in (?, ?);"
	// Reads every row of the apps table.
	// NOTE(review): "Gel" looks like a typo for "Get"; the name is exported,
	// so it is left unchanged here.
	KeyGelAllApps        = "SELECT id, partitions, active, configuration FROM " + KeyAppTable + ";"
	// Verbs in order: active (%s, spliced in unquoted), id (%s).
	QueryUpdateAppStatus = "UPDATE " + KeyAppTable + " set active = %s where id='%s'"
	// Reads only the configuration column of one app by id (%s).
	QueryGetConfig       = "SELECT configuration FROM " + KeyAppTable + " WHERE id='%s';"
	// Verbs in order: configuration (%s), id (%s).
	QueryUpdateConfig    = "UPDATE " + KeyAppTable + " SET configuration='%s' WHERE id='%s';"
)

Functions

This section is empty.

Types

type AppMap

// AppMap is the app cache held in ClusterDaoImplCassandra.AppMap; entries are
// removed from it by InvalidateSingleAppCache. All of its fields are
// unexported, so it cannot be populated from outside this package.
type AppMap struct {
	// contains filtered or unexported fields
}

type ClusterDao

// ClusterDao is the data-access interface for entities, apps and per-app
// configurations. ClusterDaoImplCassandra and DummyClusterDaoImpl both declare
// every method listed here. The per-method notes restate what this package
// documents for the Cassandra implementation.
type ClusterDao interface {
	// GetAllEntitiesInfoOfNode returns the entities assigned to nodeName.
	// It has no error result; the Cassandra implementation is documented to
	// raise a fatal exception when the lookup fails.
	GetAllEntitiesInfoOfNode(nodeName string) []e.EntityInfo
	// GetAllEntitiesInfo returns all entities. It has no error result; the
	// Cassandra implementation is documented to raise a fatal exception
	// when the lookup fails.
	GetAllEntitiesInfo() []e.EntityInfo
	// GetEntityInfo returns the entity with the given id. It has no error
	// result.
	GetEntityInfo(id string) e.EntityInfo
	// UpdateEntityStatus updates the status of the entity with the given id;
	// the Cassandra implementation also updates the entity's status history.
	UpdateEntityStatus(id string, nodeName string, status int) error
	// GetApp looks up a single app by name.
	GetApp(appName string) (store.App, error)
	// InvalidateSingleAppCache removes one app from the AppMap cache.
	InvalidateSingleAppCache(appName string)
	// InsertApp stores a new app; if Active is not set the app is created in
	// a deactivated state.
	InsertApp(app store.App) error
	// CreateEntity creates a new entity in a disabled state.
	CreateEntity(info e.EntityInfo) error
	// UpdateAppActiveStatus updates the active status of an app.
	UpdateAppActiveStatus(appName string, activeStatus bool) error
	// GetApps returns the app identified by appId when one is provided,
	// otherwise all apps.
	GetApps(appId string) ([]store.App, error)
	// GetDCAwareApp looks up a single app by name.
	// NOTE(review): undocumented; presumably a data-center-aware variant of
	// GetApp — confirm against the implementation.
	GetDCAwareApp(appName string) (store.App, error)
	// CreateConfigurations creates the configuration for appId.
	CreateConfigurations(appId string, configuration store.Configuration) (store.Configuration, error)
	// GetConfiguration returns the configuration for appId.
	GetConfiguration(appId string) (store.Configuration, error)
	// UpdateConfiguration updates the configuration for appId.
	UpdateConfiguration(appId string, configuration store.Configuration) (store.Configuration, error)
	// DeleteConfiguration deletes the configuration for appId.
	DeleteConfiguration(appId string) (store.Configuration, error)
}

type ClusterDaoImplCassandra

// ClusterDaoImplCassandra is the Cassandra-backed type that declares every
// ClusterDao method (on a pointer receiver), plus ValidateConfigurations.
// Instances are obtained from GetClusterDaoImpl.
type ClusterDaoImplCassandra struct {
	// Session is the database session wrapper.
	// NOTE(review): presumably the handle all statements run through — confirm.
	Session db_wrapper.SessionInterface
	// AppMap is the app cache that InvalidateSingleAppCache removes entries from.
	AppMap  AppMap
	// Conf is the configuration passed to GetClusterDaoImpl.
	// NOTE(review): assumed to be stored as-is by the constructor — confirm.
	Conf    *conf.Configuration
	// Monitor is the monitor passed to GetClusterDaoImpl.
	// NOTE(review): assumed to be stored as-is by the constructor — confirm.
	Monitor p.Monitor
}

func GetClusterDaoImpl

func GetClusterDaoImpl(conf *conf.Configuration, monitor p.Monitor) *ClusterDaoImplCassandra

TODO: Should we make it a singleton?

func (*ClusterDaoImplCassandra) CreateConfigurations

func (c *ClusterDaoImplCassandra) CreateConfigurations(appId string, configuration store.Configuration) (store.Configuration, error)

Create configuration for a given appId and configuration

func (*ClusterDaoImplCassandra) CreateEntity

func (c *ClusterDaoImplCassandra) CreateEntity(entityInfo e.EntityInfo) error

CreateEntity creates a new entity in the Cassandra database in a disabled state.

func (*ClusterDaoImplCassandra) DeleteConfiguration

func (c *ClusterDaoImplCassandra) DeleteConfiguration(appId string) (store.Configuration, error)

Delete the configurations for a given appId

func (*ClusterDaoImplCassandra) GetAllEntitiesInfo

func (c *ClusterDaoImplCassandra) GetAllEntitiesInfo() []e.EntityInfo

Get all the entities. Raises a fatal exception if fetching the entities fails. TODO: Return error.

func (*ClusterDaoImplCassandra) GetAllEntitiesInfoOfNode

func (c *ClusterDaoImplCassandra) GetAllEntitiesInfoOfNode(nodeName string) []e.EntityInfo

Get all the entities assigned to the node. Raises a fatal exception if fetching the entities fails. TODO: Return error.

func (*ClusterDaoImplCassandra) GetApp

func (c *ClusterDaoImplCassandra) GetApp(appName string) (store.App, error)

func (*ClusterDaoImplCassandra) GetApps

func (c *ClusterDaoImplCassandra) GetApps(appId string) ([]store.App, error)

GetApps retrieves application data from the Cassandra database. If an appId is provided, it returns data for the specific app. Otherwise, it returns data for all apps.

func (*ClusterDaoImplCassandra) GetConfiguration

func (c *ClusterDaoImplCassandra) GetConfiguration(appId string) (store.Configuration, error)

Get app configurations for a given appId

func (*ClusterDaoImplCassandra) GetDCAwareApp

func (c *ClusterDaoImplCassandra) GetDCAwareApp(appName string) (store.App, error)

func (*ClusterDaoImplCassandra) GetEntityInfo

func (c *ClusterDaoImplCassandra) GetEntityInfo(id string) e.EntityInfo

Get entity by id. TODO: Return error.

func (*ClusterDaoImplCassandra) InsertApp

func (c *ClusterDaoImplCassandra) InsertApp(app store.App) error

By default, if the Active parameter is not set, the app is created in a deactivated state.

func (*ClusterDaoImplCassandra) InvalidateSingleAppCache

func (c *ClusterDaoImplCassandra) InvalidateSingleAppCache(appName string)

InvalidateSingleAppCache removes a specific app from the AppMap cache.

func (*ClusterDaoImplCassandra) UpdateAppActiveStatus

func (c *ClusterDaoImplCassandra) UpdateAppActiveStatus(appName string, activeStatus bool) error

UpdateAppActiveStatus updates the active status of an application in the Cassandra database.

func (*ClusterDaoImplCassandra) UpdateConfiguration

func (c *ClusterDaoImplCassandra) UpdateConfiguration(appId string, configuration store.Configuration) (store.Configuration, error)

Update the configurations for given appId and configurations

func (*ClusterDaoImplCassandra) UpdateEntityStatus

func (c *ClusterDaoImplCassandra) UpdateEntityStatus(id string, nodename string, status int) error

UpdateEntityStatus updates the status of a specific entity in the Cassandra database. It also updates the history of the entity status changes.

func (*ClusterDaoImplCassandra) ValidateConfigurations

func (c *ClusterDaoImplCassandra) ValidateConfigurations(config store.Configuration) error

App configurations are validated against max configs

type DummyClusterDaoImpl

// DummyClusterDaoImpl is a field-less type that declares every ClusterDao
// method on a value receiver.
// NOTE(review): presumably a no-op stand-in for tests — confirm against the
// method bodies.
type DummyClusterDaoImpl struct {
}

func (DummyClusterDaoImpl) CreateConfigurations

func (d DummyClusterDaoImpl) CreateConfigurations(appId string, configuration store.Configuration) (store.Configuration, error)

func (DummyClusterDaoImpl) CreateEntity

func (d DummyClusterDaoImpl) CreateEntity(info e.EntityInfo) error

func (DummyClusterDaoImpl) DeleteConfiguration

func (d DummyClusterDaoImpl) DeleteConfiguration(appId string) (store.Configuration, error)

func (DummyClusterDaoImpl) GetAllEntitiesInfo

func (d DummyClusterDaoImpl) GetAllEntitiesInfo() []e.EntityInfo

func (DummyClusterDaoImpl) GetAllEntitiesInfoOfNode

func (d DummyClusterDaoImpl) GetAllEntitiesInfoOfNode(nodeName string) []e.EntityInfo

func (DummyClusterDaoImpl) GetApp

func (d DummyClusterDaoImpl) GetApp(appName string) (store.App, error)

func (DummyClusterDaoImpl) GetApps

func (d DummyClusterDaoImpl) GetApps(appId string) ([]store.App, error)

func (DummyClusterDaoImpl) GetConfiguration

func (d DummyClusterDaoImpl) GetConfiguration(appId string) (store.Configuration, error)

func (DummyClusterDaoImpl) GetDCAwareApp

func (d DummyClusterDaoImpl) GetDCAwareApp(appName string) (store.App, error)

func (DummyClusterDaoImpl) GetEntityInfo

func (d DummyClusterDaoImpl) GetEntityInfo(id string) e.EntityInfo

func (DummyClusterDaoImpl) InsertApp

func (d DummyClusterDaoImpl) InsertApp(app store.App) error

func (DummyClusterDaoImpl) InvalidateSingleAppCache

func (d DummyClusterDaoImpl) InvalidateSingleAppCache(appName string)

func (DummyClusterDaoImpl) UpdateAppActiveStatus

func (d DummyClusterDaoImpl) UpdateAppActiveStatus(appName string, activeStatus bool) error

func (DummyClusterDaoImpl) UpdateConfiguration

func (d DummyClusterDaoImpl) UpdateConfiguration(appId string, configuration store.Configuration) (store.Configuration, error)

func (DummyClusterDaoImpl) UpdateEntityStatus

func (d DummyClusterDaoImpl) UpdateEntityStatus(id string, nodeName string, status int) error

type DummyScheduleDaoImpl

// DummyScheduleDaoImpl is a field-less type that declares every ScheduleDao
// method on a pointer receiver.
// NOTE(review): presumably a no-op stand-in for tests — confirm against the
// method bodies.
type DummyScheduleDaoImpl struct{}

func (*DummyScheduleDaoImpl) BulkAction

func (d *DummyScheduleDaoImpl) BulkAction(app s.App, partitionId int, scheduleTimeGroup time.Time, status []s.Status, actionType s.ActionType) error

func (*DummyScheduleDaoImpl) CreateRun

func (d *DummyScheduleDaoImpl) CreateRun(schedule s.Schedule, app s.App) (s.Schedule, error)

func (*DummyScheduleDaoImpl) CreateSchedule

func (d *DummyScheduleDaoImpl) CreateSchedule(schedule s.Schedule, app s.App) (s.Schedule, error)

func (*DummyScheduleDaoImpl) DeleteSchedule

func (d *DummyScheduleDaoImpl) DeleteSchedule(uuid gocql.UUID) (s.Schedule, error)

func (*DummyScheduleDaoImpl) EnrichSchedule

func (d *DummyScheduleDaoImpl) EnrichSchedule(schedule *s.Schedule) error

func (*DummyScheduleDaoImpl) GetCronSchedulesByApp

func (d *DummyScheduleDaoImpl) GetCronSchedulesByApp(appId string, status s.Status) ([]s.Schedule, []string)

func (*DummyScheduleDaoImpl) GetEnrichedSchedule

func (d *DummyScheduleDaoImpl) GetEnrichedSchedule(uuid gocql.UUID) (s.Schedule, error)

func (*DummyScheduleDaoImpl) GetPaginatedSchedules

func (d *DummyScheduleDaoImpl) GetPaginatedSchedules(appId string, partitions int, timeRange Range, size int64, status s.Status, pageState []byte, continuationStartTime time.Time) ([]s.Schedule, []byte, time.Time, error)

func (*DummyScheduleDaoImpl) GetRecurringScheduleByPartition

func (d *DummyScheduleDaoImpl) GetRecurringScheduleByPartition(partitionId int) ([]s.Schedule, []error)

func (*DummyScheduleDaoImpl) GetSchedule

func (d *DummyScheduleDaoImpl) GetSchedule(uuid gocql.UUID) (s.Schedule, error)

func (*DummyScheduleDaoImpl) GetScheduleRuns

func (d *DummyScheduleDaoImpl) GetScheduleRuns(uuid gocql.UUID, size int64, when string, pageState []byte) ([]s.Schedule, []byte, error)

func (*DummyScheduleDaoImpl) GetSchedulesForEntity

func (d *DummyScheduleDaoImpl) GetSchedulesForEntity(appId string, partitionId int, timeBucket time.Time, pageState []byte) db_wrapper.IterInterface

func (*DummyScheduleDaoImpl) OptimizedEnrichSchedule

func (d *DummyScheduleDaoImpl) OptimizedEnrichSchedule(schedules []s.Schedule) ([]s.Schedule, error)

func (*DummyScheduleDaoImpl) UpdateStatus

func (d *DummyScheduleDaoImpl) UpdateStatus(schedules []s.Schedule, app s.App) error

type Range

// Range is the time window used to filter schedules; it is the timeRange
// parameter of GetPaginatedSchedules.
// NOTE(review): whether the bounds are inclusive is not visible here — confirm.
type Range struct {
	// StartTime is the start time of the schedules to filter by.
	StartTime time.Time
	// EndTime is the end time of the schedules to filter by.
	EndTime time.Time
}

type ScheduleDao

// ScheduleDao is the data-access interface for schedules and their runs.
// ScheduleDaoImpl and DummyScheduleDaoImpl both declare every method listed
// here. The per-method notes restate what this package documents for
// ScheduleDaoImpl.
type ScheduleDao interface {
	// CreateSchedule persists the schedule; the tables written depend on
	// whether the schedule is recurring.
	CreateSchedule(schedule s.Schedule, app s.App) (s.Schedule, error)
	// GetRecurringScheduleByPartition returns all recurring schedules for
	// the partition, together with any errors hit while fetching them.
	GetRecurringScheduleByPartition(partitionId int) ([]s.Schedule, []error)
	// GetSchedule returns the schedule with the given id, trying one-time
	// schedules first and falling back to recurring schedules.
	GetSchedule(uuid gocql.UUID) (s.Schedule, error)
	// GetEnrichedSchedule returns a single schedule enriched with status data.
	GetEnrichedSchedule(uuid gocql.UUID) (s.Schedule, error)
	// EnrichSchedule enriches a non-recurring schedule with status data,
	// modifying it through the pointer.
	EnrichSchedule(schedule *s.Schedule) error
	// DeleteSchedule deletes the schedule with the given id: rows of
	// one-time schedules are removed, recurring schedules have their status
	// marked instead.
	DeleteSchedule(uuid gocql.UUID) (s.Schedule, error)
	// GetScheduleRuns returns up to size top runs for the schedule id.
	// NOTE(review): the []byte result is presumably the next page state and
	// the accepted values of when are not documented — confirm.
	GetScheduleRuns(uuid gocql.UUID, size int64, when string, pageState []byte) ([]s.Schedule, []byte, error)
	// CreateRun creates a one-time schedule for a recurring schedule and
	// persists it in the schedule and runs tables.
	CreateRun(schedule s.Schedule, app s.App) (s.Schedule, error)
	// UpdateStatus updates the status of the schedules in batches.
	UpdateStatus(schedules []s.Schedule, app s.App) error
	// GetPaginatedSchedules is undocumented in this package.
	// NOTE(review): presumably returns one page of schedules for appId within
	// timeRange, plus the next page state and continuation start time —
	// confirm against the implementation.
	GetPaginatedSchedules(appId string, partitions int, timeRange Range, size int64, status s.Status, pageState []byte, continuationStartTime time.Time) ([]s.Schedule, []byte, time.Time, error)
	// GetSchedulesForEntity fetches schedules from the db by appId and time
	// in a paginated way, returning an iterator rather than a slice.
	GetSchedulesForEntity(appId string, partitionId int, timeBucket time.Time, pageState []byte) db_wrapper.IterInterface
	// OptimizedEnrichSchedule fetches status data in bulk and sets the status
	// on each schedule, whether or not it is present in the status table.
	OptimizedEnrichSchedule(schedules []s.Schedule) ([]s.Schedule, error)
	// GetCronSchedulesByApp is undocumented in this package.
	// NOTE(review): the meaning of the []string result is not visible here
	// (possibly error messages) — confirm.
	GetCronSchedulesByApp(appId string, status s.Status) ([]s.Schedule, []string)
	// BulkAction is undocumented in this package.
	// NOTE(review): presumably applies actionType to the schedules of one
	// app partition and time group that match status — confirm.
	BulkAction(app s.App, partitionId int, scheduleTimeGroup time.Time, status []s.Status, actionType s.ActionType) error
}

type ScheduleDaoImpl

// ScheduleDaoImpl is the type that declares every ScheduleDao method on a
// pointer receiver; its method docs describe persisting to Cassandra.
// Instances are obtained from GetScheduleDaoImpl.
type ScheduleDaoImpl struct {
	// Session is the database session wrapper.
	// NOTE(review): presumably the handle all statements run through — confirm.
	Session db_wrapper.SessionInterface
	// Conf is the configuration passed to GetScheduleDaoImpl.
	// NOTE(review): assumed to be stored as-is by the constructor — confirm.
	Conf    *conf.Configuration
	// Monitor is the monitor passed to GetScheduleDaoImpl.
	// NOTE(review): assumed to be stored as-is by the constructor — confirm.
	Monitor p.Monitor
}

func GetScheduleDaoImpl

func GetScheduleDaoImpl(conf *conf.Configuration, monitor p.Monitor) *ScheduleDaoImpl

func (*ScheduleDaoImpl) BulkAction

func (s *ScheduleDaoImpl) BulkAction(app store.App, partitionId int, scheduleTimeGroup time.Time, status []store.Status, actionType store.ActionType) error

func (*ScheduleDaoImpl) CreateRun

func (s *ScheduleDaoImpl) CreateRun(schedule store.Schedule, app store.App) (store.Schedule, error)

Create a one-time schedule for a recurring schedule. The schedule will be persisted in the schedule and runs tables. Returns a non-nil error in case persisting the data fails.

func (*ScheduleDaoImpl) CreateSchedule

func (s *ScheduleDaoImpl) CreateSchedule(schedule store.Schedule, app store.App) (store.Schedule, error)

Persist the schedule details in Cassandra. The tables to which the schedule is written are determined by whether it is a recurring schedule or not. Returns an error if writing the schedule fails.

func (*ScheduleDaoImpl) DeleteSchedule

func (s *ScheduleDaoImpl) DeleteSchedule(uuid gocql.UUID) (store.Schedule, error)

Delete the schedule with the given id. For one-time schedules, the rows are removed from the schedule table, whereas for recurring schedules the status is marked accordingly. Returns a non-nil error in case deleting the row from Cassandra fails.

func (*ScheduleDaoImpl) EnrichSchedule

func (s *ScheduleDaoImpl) EnrichSchedule(schedule *store.Schedule) error

Enrich a non-recurring schedule with status data. Returns a non-nil error if enriching the schedule fails.

func (*ScheduleDaoImpl) GetCronSchedulesByApp

func (s *ScheduleDaoImpl) GetCronSchedulesByApp(appId string, status store.Status) ([]store.Schedule, []string)

func (*ScheduleDaoImpl) GetEnrichedSchedule

func (s *ScheduleDaoImpl) GetEnrichedSchedule(uuid gocql.UUID) (store.Schedule, error)

Get a single schedule enriched with status data. Returns a non-nil error if fetching the details of the schedule fails.

func (*ScheduleDaoImpl) GetPaginatedSchedules

func (s *ScheduleDaoImpl) GetPaginatedSchedules(appId string, partitions int, timeRange Range, size int64, status store.Status, pageState []byte, continuationStartTime time.Time) ([]store.Schedule, []byte, time.Time, error)

func (*ScheduleDaoImpl) GetRecurringScheduleByPartition

func (s *ScheduleDaoImpl) GetRecurringScheduleByPartition(partitionId int) ([]store.Schedule, []error)

Get all recurring schedules with the given partition id. Returns a list of schedules, and non-nil errors in case fetching the details fails.

func (*ScheduleDaoImpl) GetSchedule

func (s *ScheduleDaoImpl) GetSchedule(uuid gocql.UUID) (store.Schedule, error)

Get a single schedule with the supplied id. Attempts to find a one-time schedule and falls back to recurring schedules if not found. Returns a non-nil error if fetching the details of the schedule fails.

func (*ScheduleDaoImpl) GetScheduleRuns

func (s *ScheduleDaoImpl) GetScheduleRuns(uuid gocql.UUID, size int64, when string, pageState []byte) ([]store.Schedule, []byte, error)

Get size number of top runs for a given schedule id.

func (*ScheduleDaoImpl) GetSchedulesForEntity

func (s *ScheduleDaoImpl) GetSchedulesForEntity(appId string, partitionId int, timeBucket time.Time, pageState []byte) db_wrapper.IterInterface

Fetch schedules from the db based on appId and time range, in a paginated way.

func (*ScheduleDaoImpl) OptimizedEnrichSchedule

func (s *ScheduleDaoImpl) OptimizedEnrichSchedule(schedules []store.Schedule) ([]store.Schedule, error)

Fetch status data in bulk by appId, partitionId and scheduleIds. Set the status for schedules present in the status table. Set the status for schedules not present in the status table. Return an error if there is any error while fetching data.

func (*ScheduleDaoImpl) UpdateStatus

func (s *ScheduleDaoImpl) UpdateStatus(schedules []store.Schedule, app store.App) error

Updates status in batches. Sets the TTL to the same value as the buffer TTL, as data is added to this table after the callback is fired.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL