cachewarmer

package
v1.3.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 5, 2022 License: CC-BY-4.0 Imports: 28 Imported by: 0

Documentation

Overview

Copyright (C) Microsoft Corporation. All rights reserved. Licensed under the MIT License. See LICENSE-CODE in the project root for license information.

Copyright (C) Microsoft Corporation. All rights reserved. Licensed under the MIT License. See LICENSE-CODE in the project root for license information. Package cachewarmer implements the structures, methods, and functions used by the cache warmer

Copyright (C) Microsoft Corporation. All rights reserved. Licensed under the MIT License. See LICENSE-CODE in the project root for license information.

Copyright (C) Microsoft Corporation. All rights reserved. Licensed under the MIT License. See LICENSE-CODE in the project root for license information.

Copyright (C) Microsoft Corporation. All rights reserved. Licensed under the MIT License. See LICENSE-CODE in the project root for license information.

Copyright (C) Microsoft Corporation. All rights reserved. Licensed under the MIT License. See LICENSE-CODE in the project root for license information.

Copyright (C) Microsoft Corporation. All rights reserved. Licensed under the MIT License. See LICENSE-CODE in the project root for license information.

Copyright (C) Microsoft Corporation. All rights reserved. Licensed under the MIT License. See LICENSE-CODE in the project root for license information.

Copyright (C) Microsoft Corporation. All rights reserved. Licensed under the MIT License. See LICENSE-CODE in the project root for license information.

Copyright (C) Microsoft Corporation. All rights reserved. Licensed under the MIT License. See LICENSE-CODE in the project root for license information.

Copyright (C) Microsoft Corporation. All rights reserved. Licensed under the MIT License. See LICENSE-CODE in the project root for license information.

Index

Constants

View Source
const (
	B = 1 << (10 * iota)
	KB
	MB
	GB
)

sizes

View Source
const (
	MinimumSingleFileSize = int64(100 * MB)
	MaximumJobSize        = int64(1500 * MB)

	MaximumFilesToRead = 200

	// golang uses an 8192 buffer passed to getdents64 so we'll choose 128 because we get these on the first call anyway
	MinimumJobsOnDirRead = 128

	WarmPathJobQueueSuffix = "job"
	WorkQueueSuffix        = "work"

	// the base mount path
	DefaultCacheWarmerMountPath = "/mnt/cachewarmer"

	NumberOfMessagesToDequeue    = 1
	CacheWarmerVisibilityTimeout = time.Duration(60) * time.Second // 1 minute visibility timeout

	// retry mounting for 10 minutes
	MountRetryCount        = 60
	MountRetrySleepSeconds = 10

	// this size is the most common, and will stand up the fastest
	VMSSNodeSize = "Standard_D2s_v3"
	VmssName     = "cwvmss"

	// the controller will work in an airgapped environment
	MarketPlacePublisher = "microsoft-avere"
	MarketPlaceOffer     = "vfxt"
	MarketPlaceSku       = "avere-vfxt-controller"
	PlanName             = "avere-vfxt-controller"
	PlanPublisherName    = "microsoft-avere"
	PlanProductName      = "vfxt"

	// file read settings
	ReadPageSize = 10 * MB

	WorkerMultiplier        = 2
	MinimumJobsBeforeRefill = 100

	SubscriptionIdEnvVar = "AZURE_SUBSCRIPTION_ID"
)

Variables

This section is empty.

Functions

func AlreadyMounted

func AlreadyMounted(mountPath string) bool

func BashCommand

func BashCommand(cmdstr string) (bytes.Buffer, bytes.Buffer, error)

func CreateVmss

func CreateVmss(ctx context.Context, azureClients *AzureClients, vmssModel compute.VirtualMachineScaleSet) (vmss compute.VirtualMachineScaleSet, err error)

func DeleteVmss

func DeleteVmss(ctx context.Context, azureClients *AzureClients, name string) error

func EnsureWarmPath

func EnsureWarmPath(jobMountAddress string, jobExportPath string, jobBasePath string) (string, error)

EnsureWarmPath ensures that the path is mounted and exists

func FileMatches

func FileMatches(inclusionList []string, exclusionList []string, maxFileSizeBytes int64, filename string, filesize int64) bool

func GetLocalMountPath

func GetLocalMountPath(jobMountAddress string, jobExportPath string) string

func GetPrimaryStorageKey added in v1.3.0

func GetPrimaryStorageKey(ctx context.Context, resourceGroup string, accountName string) (string, error)

func GetResourceName

func GetResourceName(id string) string

func GetSubnetId

func GetSubnetId(ctx context.Context, azureClients *AzureClients) (string, error)

GetSubnetId returns the subnet of the current VM

func GetSubscriptionID added in v1.3.0

func GetSubscriptionID() (string, error)

func IsDirectory

func IsDirectory(path string) (bool, error)

func MountPath

func MountPath(address string, exportPath string, localPath string) error

func SwapResourceName

func SwapResourceName(id string, resourceName string) string

func VmssExists

func VmssExists(ctx context.Context, azureClients *AzureClients, name string) (bool, error)

Types

type AzureClients

type AzureClients struct {
	VMClient      compute.VirtualMachinesClient
	VMSSClient    compute.VirtualMachineScaleSetsClient
	NICClient     network.InterfacesClient
	LocalMetadata ComputeMetadata
}

func InitializeAzureClients

func InitializeAzureClients() (*AzureClients, error)

type CacheWarmerCloudInit

type CacheWarmerCloudInit struct {
	LocalMountPath      string
	BootstrapAddress    string
	BootstrapExportPath string
	BootstrapScriptPath string
	EnvVars             string
}

func InitializeCloutInit

func InitializeCloutInit(
	httpProxyStr string,
	httpsProxyStr string,
	noProxyStr string,
	bootstrapAddress string,
	bootstrapExportPath string,
	bootstrapScriptPath string,
	storageAccount string,
	storageKey string,
	queueNamePrefix string) *CacheWarmerCloudInit

func (*CacheWarmerCloudInit) GetCacheWarmerCloudInit

func (c *CacheWarmerCloudInit) GetCacheWarmerCloudInit() (string, error)

type CacheWarmerQueues

type CacheWarmerQueues struct {
	// contains filtered or unexported fields
}

func InitializeCacheWarmerQueues

func InitializeCacheWarmerQueues(
	ctx context.Context,
	storageAccount string,
	storageKey string,
	queueNamePrefix string) (*CacheWarmerQueues, error)

func (*CacheWarmerQueues) DeleteWarmPathJob

func (q *CacheWarmerQueues) DeleteWarmPathJob(warmPathJob *WarmPathJob) error

func (*CacheWarmerQueues) DeleteWorkerJob

func (q *CacheWarmerQueues) DeleteWorkerJob(workerJob *WorkerJob) error

func (*CacheWarmerQueues) GetWarmPathJob

func (q *CacheWarmerQueues) GetWarmPathJob() (*WarmPathJob, error)

func (*CacheWarmerQueues) GetWorkerJob

func (q *CacheWarmerQueues) GetWorkerJob() (*WorkerJob, error)

func (*CacheWarmerQueues) IsJobQueueEmpty

func (q *CacheWarmerQueues) IsJobQueueEmpty() (bool, error)

IsJobQueueEmpty returns true if there are no visible or invisible items in the job queue

func (*CacheWarmerQueues) IsWorkQueueEmpty

func (q *CacheWarmerQueues) IsWorkQueueEmpty() (bool, error)

func (*CacheWarmerQueues) PeekWorkerJob

func (q *CacheWarmerQueues) PeekWorkerJob() (*WorkerJob, error)

func (*CacheWarmerQueues) StillProcessingWarmPathJob

func (q *CacheWarmerQueues) StillProcessingWarmPathJob(warmPathJob *WarmPathJob) error

func (*CacheWarmerQueues) StillProcessingWorkerJob

func (q *CacheWarmerQueues) StillProcessingWorkerJob(workerJob *WorkerJob) error

func (*CacheWarmerQueues) WriteWarmPathJob

func (q *CacheWarmerQueues) WriteWarmPathJob(job WarmPathJob) error

func (*CacheWarmerQueues) WriteWorkerJob

func (q *CacheWarmerQueues) WriteWorkerJob(workerjob *WorkerJob) error

type ComputeMetadata

type ComputeMetadata struct {
	SubscriptionId string `json:"subscriptionId"`
	ResourceGroup  string `json:"resourceGroupName"`
	Location       string `json:"location"`
	Name           string `json:"name"`
}

func GetComputeMetadata

func GetComputeMetadata() (*ComputeMetadata, error)

type FileToWarm

type FileToWarm struct {
	WarmFileFullPath string
	StartByte        int64
	StopByte         int64
}

func InitializeFileToWarm

func InitializeFileToWarm(warmFilePath string, startByte int64, stopByte int64) FileToWarm

type WarmPathJob

type WarmPathJob struct {
	WarmTargetMountAddresses []string
	WarmTargetExportPath     string
	WarmTargetPath           string
	InclusionList            []string
	ExclusionList            []string
	MaxFileSizeBytes         int64
	// contains filtered or unexported fields
}

WarmPathJob contains the information for a new job item

func InitializeWarmPathJob

func InitializeWarmPathJob(
	warmTargetMountAddresses string,
	warmTargetExportPath string,
	warmTargetPath string,
	inclusionCsv string,
	exclusionCsv string,
	maxFileSizeBytes int64) *WarmPathJob

InitializeWarmPathJob initializes the warm path job structure

func InitializeWarmPathJobFromString

func InitializeWarmPathJobFromString(warmPathJobContents string) (*WarmPathJob, error)

InitializeWarmPathJobFromString reads warmPathJobContents

func (*WarmPathJob) FileMatches

func (j *WarmPathJob) FileMatches(filename string, filesize int64) bool

func (*WarmPathJob) GetQueueMessageInfo

func (j *WarmPathJob) GetQueueMessageInfo() (azqueue.MessageID, azqueue.PopReceipt)

func (*WarmPathJob) GetWarmPathJobFileContents

func (j *WarmPathJob) GetWarmPathJobFileContents() (string, error)

GetWarmPathJobFileContents returns the contents of the file

func (*WarmPathJob) SetQueueMessageInfo

func (j *WarmPathJob) SetQueueMessageInfo(id azqueue.MessageID, popReceipt azqueue.PopReceipt)

type WarmPathManager

type WarmPathManager struct {
	AzureClients *AzureClients
	WorkerCount  int64
	Queues       *CacheWarmerQueues
	// contains filtered or unexported fields
}

WarmPathManager contains the information for the manager

func InitializeWarmPathManager

func InitializeWarmPathManager(
	azureClients *AzureClients,
	workerCount int64,
	queues *CacheWarmerQueues,
	bootstrapMountAddress string,
	bootstrapExportPath string,
	bootstrapScriptPath string,
	vmssUserName string,
	vmssPassword string,
	vmssSshPublicKey string,
	vmssSubnet string,
	storageAccount string,
	storageKey string,
	queueNamePrefix string) *WarmPathManager

InitializeWarmPathManager initializes the warm path manager structure

func (*WarmPathManager) EnsureVmssDeleted

func (m *WarmPathManager) EnsureVmssDeleted(ctx context.Context)

func (*WarmPathManager) EnsureVmssRunning

func (m *WarmPathManager) EnsureVmssRunning(ctx context.Context)

func (*WarmPathManager) RunJobGenerator

func (m *WarmPathManager) RunJobGenerator(ctx context.Context, syncWaitGroup *sync.WaitGroup)

func (*WarmPathManager) RunVMSSManager

func (m *WarmPathManager) RunVMSSManager(ctx context.Context, syncWaitGroup *sync.WaitGroup)

type WorkQueue

type WorkQueue struct {
	// contains filtered or unexported fields
}

WorkQueue holds the FileToWarm work items that are added with AddWorkItem and retrieved with GetNextWorkItem

func InitializeWorkQueue

func InitializeWorkQueue() *WorkQueue

func (*WorkQueue) AddWorkItem

func (q *WorkQueue) AddWorkItem(fileToWarm FileToWarm)

func (*WorkQueue) GetNextWorkItem

func (q *WorkQueue) GetNextWorkItem() (FileToWarm, bool)

GetNextWorkItem retrieves the next workItem

func (*WorkQueue) IsEmpty

func (q *WorkQueue) IsEmpty() bool

func (*WorkQueue) WorkItemCount

func (q *WorkQueue) WorkItemCount() int

type Worker

type Worker struct {
	Queues *CacheWarmerQueues
	// contains filtered or unexported fields
}

Worker contains the information for the worker

func InitializeWorker

func InitializeWorker(queues *CacheWarmerQueues) *Worker

InitializeWorker initializes the worker structure

func (*Worker) QueueWork

func (w *Worker) QueueWork(localPaths []string, filenames []string) int

func (*Worker) RunWorkerManager

func (w *Worker) RunWorkerManager(ctx context.Context, syncWaitGroup *sync.WaitGroup)

type WorkerJob

type WorkerJob struct {
	WarmTargetMountAddresses []string
	WarmTargetExportPath     string
	WarmTargetPath           string
	StartByte                int64
	StopByte                 int64
	ApplyFilter              bool
	StartFileFilter          string
	EndFileFilter            string
	InclusionList            []string
	ExclusionList            []string
	MaxFileSizeBytes         int64
	// contains filtered or unexported fields
}

WorkerJob contains the information for a worker job item

func InitializeWorkerJob

func InitializeWorkerJob(
	warmTargetMountAddresses []string,
	warmTargetExportPath string,
	warmTargetPath string,
	inclusionList []string,
	exclusionList []string,
	maxFileSizeBytes int64) *WorkerJob

InitializeWorkerJob initializes the worker job structure

func InitializeWorkerJobForLargeFile

func InitializeWorkerJobForLargeFile(
	warmTargetMountAddresses []string,
	warmTargetExportPath string,
	warmTargetPath string,
	startByte int64,
	stopByte int64,
	inclusionList []string,
	exclusionList []string,
	maxFileSizeBytes int64) *WorkerJob

InitializeWorkerJobForLargeFile initializes the worker job structure for a large file, covering the byte range from startByte to stopByte

func InitializeWorkerJobFromString

func InitializeWorkerJobFromString(workerJobContents string) (*WorkerJob, error)

InitializeWorkerJobFromString reads workerJobContents

func InitializeWorkerJobWithFilter

func InitializeWorkerJobWithFilter(
	warmTargetMountAddresses []string,
	warmTargetExportPath string,
	warmTargetPath string,
	startFileFilter string,
	endFileFilter string,
	inclusionList []string,
	exclusionList []string,
	maxFileSizeBytes int64) *WorkerJob

InitializeWorkerJobWithFilter initializes the worker job structure

func (*WorkerJob) FilterFiles

func (j *WorkerJob) FilterFiles(dirEntries []os.FileInfo) []string

func (*WorkerJob) GetQueueMessageInfo

func (j *WorkerJob) GetQueueMessageInfo() (azqueue.MessageID, azqueue.PopReceipt)

func (*WorkerJob) GetWorkerJobFileContents

func (j *WorkerJob) GetWorkerJobFileContents() (string, error)

GetWorkerJobFileContents returns the contents of the file

func (*WorkerJob) SetQueueMessageInfo

func (j *WorkerJob) SetQueueMessageInfo(id azqueue.MessageID, popReceipt azqueue.PopReceipt)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL