service

package
v0.0.15 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 27, 2024 License: MIT Imports: 22 Imported by: 0

Documentation

Index

Constants

View Source
const (
	CheckResourceTimeBuffer = 3
)
View Source
const (
	ContainerIdShortSize = 12
)

Variables

This section is empty.

Functions

This section is empty.

Types

type CPTaskWatcher added in v0.0.14

type CPTaskWatcher interface {
	datastructure.Observer[models.TaskEventBus]

	AddTaskToWatch(taskID primitive.ObjectID)
	WatcherLoop(ctx context.Context)
}

CPTaskWatcher is a service that watches distributed tasks and detects those for which the WorkerNode has not reported back within a specified time after distribution.

func ProvideCPTaskWatcher added in v0.0.14

func ProvideCPTaskWatcher(taskService Task, config config.TaskWatcherConfigModel) CPTaskWatcher

type ContainerService

type ContainerService struct {
	// contains filtered or unexported fields
}

func (*ContainerService) AddContainerTakeDownTimer added in v0.0.5

func (c *ContainerService) AddContainerTakeDownTimer(containerId string) error

func (*ContainerService) DownContainer added in v0.0.5

func (c *ContainerService) DownContainer(ctx context.Context, container container.IContainer) error

func (*ContainerService) GetContainerBuffer

func (c *ContainerService) GetContainerBuffer() datastructure.Buffer[string, container.IContainer]

func (*ContainerService) GetContainerCoolDownState

func (c *ContainerService) GetContainerCoolDownState() datastructure.Buffer[string, time.Time]

func (*ContainerService) GetContainerIdShort

func (c *ContainerService) GetContainerIdShort() []string

func (*ContainerService) RemoveContainerTakeDownTimer added in v0.0.5

func (c *ContainerService) RemoveContainerTakeDownTimer(containerId string)

func (*ContainerService) StartContainer

func (c *ContainerService) StartContainer(ctx context.Context, imageUrl string) (container.IContainer, error)

type DynamicScalingService

type DynamicScalingService struct {
	// contains filtered or unexported fields
}

func (*DynamicScalingService) CheckResourceUsageLimit

func (d *DynamicScalingService) CheckResourceUsageLimit(ctx context.Context) (*models.CheckResourceReport, error)

func (*DynamicScalingService) CheckResourceUsageLimitWithTimeBuffer

func (d *DynamicScalingService) CheckResourceUsageLimitWithTimeBuffer(ctx context.Context) (*models.CheckResourceReport, error)

type IContainer

type IContainer interface {
	StartContainer(ctx context.Context, imageUrl string) (container.IContainer, error)
	GetContainerIdShort() []string
	GetContainerCoolDownState() datastructure.Buffer[string, time.Time]
	GetContainerBuffer() datastructure.Buffer[string, container.IContainer]
	DownContainer(ctx context.Context, container container.IContainer) error
	AddContainerTakeDownTimer(containerId string) error
	RemoveContainerTakeDownTimer(containerImage string)
}

func ProvideContainer

func ProvideContainer(dockerClient *client.Client, config config.WorkerConfigModel, meter metric.Meter) IContainer

type IDynamicScaling

type IDynamicScaling interface {
	CheckResourceUsageLimit(ctx context.Context) (*models.CheckResourceReport, error)
	CheckResourceUsageLimitWithTimeBuffer(ctx context.Context) (*models.CheckResourceReport, error)
}

func ProvideDynamicScaling

func ProvideDynamicScaling(containerService IContainer, resourceMonitoringService IResourceMonitor, config config.WorkerConfigModel) IDynamicScaling

type IResourceMonitor

type IResourceMonitor interface {
	GetResourceUsage() ([]models.ContainerResourceUsage, error)
	GetSystemMemUsage() (*models.MemoryUsage, error)
	GetSystemCpuUsage(ctx context.Context) (*models.CpuUsage, error)
	CalculateAvailableResource(ctx context.Context) (*models.AvailableResource, error)
}

func ProvideResourcesMonitor

func ProvideResourcesMonitor(dockerClient *client.Client, workerService IContainer, config config.WorkerConfigModel) IResourceMonitor

type Job

type Job interface {
	GetJob(ctx context.Context, id primitive.ObjectID) (*models.Job, error)
	CreateJob(ctx context.Context, name, imageURL string, isExperiment bool, distributionLogic models.DistributorName) (*models.Job, error)
	ListJob(ctx context.Context) ([]models.Job, error)
	ListJobReadyToDistribute(ctx context.Context) ([]models.Job, error)
	UpdateJobStatusToFinish(ctx context.Context, id *primitive.ObjectID) error
	UpdateJobStatusToDistributing(ctx context.Context, id *primitive.ObjectID) error
	UpdateJobStatusToExperimenting(ctx context.Context, id *primitive.ObjectID) error
	UpdateJobToFailed(ctx context.Context, id *primitive.ObjectID, message string, inputErr error) error
}

func ProvideJobService

func ProvideJobService(jobRepository repository.IJob) Job

type ResourceMonitor

type ResourceMonitor struct {
	// contains filtered or unexported fields
}

func (*ResourceMonitor) CalculateAvailableResource added in v0.0.5

func (r *ResourceMonitor) CalculateAvailableResource(ctx context.Context) (*models.AvailableResource, error)

func (*ResourceMonitor) GetResourceUsage

func (r *ResourceMonitor) GetResourceUsage() ([]models.ContainerResourceUsage, error)

func (*ResourceMonitor) GetSystemCpuUsage

func (r *ResourceMonitor) GetSystemCpuUsage(ctx context.Context) (*models.CpuUsage, error)

func (*ResourceMonitor) GetSystemMemUsage

func (r *ResourceMonitor) GetSystemMemUsage() (*models.MemoryUsage, error)

type Task

type Task interface {
	GetAvailableTask(ctx context.Context, jobIDs []models.Job) (*models.Job, []models.Task, error)
	UpdateTaskAfterDistribution(ctx context.Context, successTasks []models.Task, errorTasks []models.DistributeError) error
	UpdateTaskWorkOnFailure(ctx context.Context, taskID primitive.ObjectID, nodeID string, errMessage string) error
	UpdateTaskSuccess(ctx context.Context, taskID primitive.ObjectID, nodeID string, result []byte, averageCPUUsage float32, averageMemoryUsage float64) error
	UpdateTaskWaitTimeout(ctx context.Context, taskID primitive.ObjectID) error
	CreateTask(ctx context.Context, job *models.Job, taskAttributes [][]byte, isExperiment bool) ([]models.Task, error)
	GetTaskByJob(ctx context.Context, job *models.Job) ([]models.Task, error)
	GetTaskByID(ctx context.Context, taskID primitive.ObjectID) (*models.Task, error)
	GetAverageResourceUsage(ctx context.Context, jobID *primitive.ObjectID) (*models.TaskResourceUsage, error)
	UpdateTaskToBeReadyToBeDistributed(ctx context.Context, jobID *primitive.ObjectID) error
	CountUnfinishedTaskByJobID(ctx context.Context, jobID *primitive.ObjectID) (int64, error)
	UpdateAllTaskToWorkOnFailure(ctx context.Context, job *models.Job, jobErrorMessage string) error
	GetAllDistributedTask(ctx context.Context) ([]models.Task, error)
}

func ProvideTaskService

func ProvideTaskService(taskRepository repository.ITask) Task

Directories

Path Synopsis
Package mock_service is a generated GoMock package.
Package mock_service is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL