Documentation ¶
Index ¶
- Constants
- type CPTaskWatcher
- type ContainerService
- func (c *ContainerService) AddContainerTakeDownTimer(containerId string) error
- func (c *ContainerService) DownContainer(ctx context.Context, container container.IContainer) error
- func (c *ContainerService) GetContainerBuffer() datastructure.Buffer[string, container.IContainer]
- func (c *ContainerService) GetContainerCoolDownState() datastructure.Buffer[string, time.Time]
- func (c *ContainerService) GetContainerIdShort() []string
- func (c *ContainerService) RemoveContainerTakeDownTimer(containerId string)
- func (c *ContainerService) StartContainer(ctx context.Context, imageUrl string) (container.IContainer, error)
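- type DynamicScalingService
- func (d *DynamicScalingService) CheckResourceUsageLimit(ctx context.Context) (*models.CheckResourceReport, error)
- func (d *DynamicScalingService) CheckResourceUsageLimitWithTimeBuffer(ctx context.Context) (*models.CheckResourceReport, error)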
- type IContainer
- type IDynamicScaling
- type IResourceMonitor
- type Job
- type ResourceMonitor
- func (r *ResourceMonitor) CalculateAvailableResource(ctx context.Context) (*models.AvailableResource, error)
- func (r *ResourceMonitor) GetResourceUsage() ([]models.ContainerResourceUsage, error)
- func (r *ResourceMonitor) GetSystemCpuUsage(ctx context.Context) (*models.CpuUsage, error)
- func (r *ResourceMonitor) GetSystemMemUsage() (*models.MemoryUsage, error)
- type Task
Constants ¶
View Source
const (
CheckResourceTimeBuffer = 3
)
View Source
const (
ContainerIdShortSize = 12
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type CPTaskWatcher ¶ added in v0.0.14
type CPTaskWatcher interface { datastructure.Observer[models.TaskEventBus] AddTaskToWatch(taskID primitive.ObjectID) WatcherLoop(ctx context.Context) }
CPTaskWatcher is a service that watches for tasks for which no response has been received from the WorkerNode within a specified time after being distributed.
func ProvideCPTaskWatcher ¶ added in v0.0.14
func ProvideCPTaskWatcher(taskService Task, config config.TaskWatcherConfigModel) CPTaskWatcher
type ContainerService ¶
type ContainerService struct {
// contains filtered or unexported fields
}
func (*ContainerService) AddContainerTakeDownTimer ¶ added in v0.0.5
func (c *ContainerService) AddContainerTakeDownTimer(containerId string) error
func (*ContainerService) DownContainer ¶ added in v0.0.5
func (c *ContainerService) DownContainer(ctx context.Context, container container.IContainer) error
func (*ContainerService) GetContainerBuffer ¶
func (c *ContainerService) GetContainerBuffer() datastructure.Buffer[string, container.IContainer]
func (*ContainerService) GetContainerCoolDownState ¶
func (c *ContainerService) GetContainerCoolDownState() datastructure.Buffer[string, time.Time]
func (*ContainerService) GetContainerIdShort ¶
func (c *ContainerService) GetContainerIdShort() []string
func (*ContainerService) RemoveContainerTakeDownTimer ¶ added in v0.0.5
func (c *ContainerService) RemoveContainerTakeDownTimer(containerId string)
func (*ContainerService) StartContainer ¶
func (c *ContainerService) StartContainer(ctx context.Context, imageUrl string) (container.IContainer, error)
type DynamicScalingService ¶
type DynamicScalingService struct {
// contains filtered or unexported fields
}
func (*DynamicScalingService) CheckResourceUsageLimit ¶
func (d *DynamicScalingService) CheckResourceUsageLimit(ctx context.Context) (*models.CheckResourceReport, error)
func (*DynamicScalingService) CheckResourceUsageLimitWithTimeBuffer ¶
func (d *DynamicScalingService) CheckResourceUsageLimitWithTimeBuffer(ctx context.Context) (*models.CheckResourceReport, error)
type IContainer ¶
type IContainer interface { StartContainer(ctx context.Context, imageUrl string) (container.IContainer, error) GetContainerIdShort() []string GetContainerCoolDownState() datastructure.Buffer[string, time.Time] GetContainerBuffer() datastructure.Buffer[string, container.IContainer] DownContainer(ctx context.Context, container container.IContainer) error AddContainerTakeDownTimer(containerId string) error RemoveContainerTakeDownTimer(containerImage string) }
func ProvideContainer ¶
func ProvideContainer(dockerClient *client.Client, config config.WorkerConfigModel, meter metric.Meter) IContainer
type IDynamicScaling ¶
type IDynamicScaling interface { CheckResourceUsageLimit(ctx context.Context) (*models.CheckResourceReport, error) CheckResourceUsageLimitWithTimeBuffer(ctx context.Context) (*models.CheckResourceReport, error) }
func ProvideDynamicScaling ¶
func ProvideDynamicScaling(containerService IContainer, resourceMonitoringService IResourceMonitor, config config.WorkerConfigModel) IDynamicScaling
type IResourceMonitor ¶
type IResourceMonitor interface { GetResourceUsage() ([]models.ContainerResourceUsage, error) GetSystemMemUsage() (*models.MemoryUsage, error) GetSystemCpuUsage(ctx context.Context) (*models.CpuUsage, error) CalculateAvailableResource(ctx context.Context) (*models.AvailableResource, error) }
func ProvideResourcesMonitor ¶
func ProvideResourcesMonitor(dockerClient *client.Client, workerService IContainer, config config.WorkerConfigModel) IResourceMonitor
type Job ¶
type Job interface { GetJob(ctx context.Context, id primitive.ObjectID) (*models.Job, error) CreateJob(ctx context.Context, name, imageURL string, isExperiment bool, distributionLogic models.DistributorName) (*models.Job, error) ListJob(ctx context.Context) ([]models.Job, error) ListJobReadyToDistribute(ctx context.Context) ([]models.Job, error) UpdateJobStatusToFinish(ctx context.Context, id *primitive.ObjectID) error UpdateJobStatusToDistributing(ctx context.Context, id *primitive.ObjectID) error UpdateJobStatusToExperimenting(ctx context.Context, id *primitive.ObjectID) error UpdateJobToFailed(ctx context.Context, id *primitive.ObjectID, message string, inputErr error) error }
func ProvideJobService ¶
func ProvideJobService(jobRepository repository.IJob) Job
type ResourceMonitor ¶
type ResourceMonitor struct {
// contains filtered or unexported fields
}
func (*ResourceMonitor) CalculateAvailableResource ¶ added in v0.0.5
func (r *ResourceMonitor) CalculateAvailableResource(ctx context.Context) (*models.AvailableResource, error)
func (*ResourceMonitor) GetResourceUsage ¶
func (r *ResourceMonitor) GetResourceUsage() ([]models.ContainerResourceUsage, error)
func (*ResourceMonitor) GetSystemCpuUsage ¶
func (r *ResourceMonitor) GetSystemCpuUsage(ctx context.Context) (*models.CpuUsage, error)
func (*ResourceMonitor) GetSystemMemUsage ¶
func (r *ResourceMonitor) GetSystemMemUsage() (*models.MemoryUsage, error)
type Task ¶
type Task interface { GetAvailableTask(ctx context.Context, jobIDs []models.Job) (*models.Job, []models.Task, error) UpdateTaskAfterDistribution(ctx context.Context, successTasks []models.Task, errorTasks []models.DistributeError) error UpdateTaskWorkOnFailure(ctx context.Context, taskID primitive.ObjectID, nodeID string, errMessage string) error UpdateTaskSuccess(ctx context.Context, taskID primitive.ObjectID, nodeID string, result []byte, averageCPUUsage float32, averageMemoryUsage float64) error UpdateTaskWaitTimeout(ctx context.Context, taskID primitive.ObjectID) error CreateTask(ctx context.Context, job *models.Job, taskAttributes [][]byte, isExperiment bool) ([]models.Task, error) GetTaskByJob(ctx context.Context, job *models.Job) ([]models.Task, error) GetTaskByID(ctx context.Context, taskID primitive.ObjectID) (*models.Task, error) GetAverageResourceUsage(ctx context.Context, jobID *primitive.ObjectID) (*models.TaskResourceUsage, error) UpdateTaskToBeReadyToBeDistributed(ctx context.Context, jobID *primitive.ObjectID) error CountUnfinishedTaskByJobID(ctx context.Context, jobID *primitive.ObjectID) (int64, error) UpdateAllTaskToWorkOnFailure(ctx context.Context, job *models.Job, jobErrorMessage string) error GetAllDistributedTask(ctx context.Context) ([]models.Task, error) }
func ProvideTaskService ¶
func ProvideTaskService(taskRepository repository.ITask) Task
Source Files ¶
Directories ¶
| Path | Synopsis |
|---|---|
| | Package mock_service is a generated GoMock package. |
| | Package mock_service is a generated GoMock package. |
Click to show internal directories.
Click to hide internal directories.