Documentation ¶
Index ¶
- Constants
- Variables
- func CheckAgent(agentUrl string) error
- func InitQueue(transferQueueSize int, storageQueueSize int, mfile string, minterval int64, ...)
- func NewRouter(interval string, agent *map[string]string, csvFile string) *cron.Cron
- func RedirectRequest(t *TransferRequest) error
- func SubmitRequest(j []Job, src string, dst string) error
- type AgentStatus
- type CallerFunc
- type Catalog
- func (c *Catalog) Add(entry CatalogEntry) error
- func (c *Catalog) Dump() []byte
- func (c *Catalog) Exec(stm, status, rid string) error
- func (c *Catalog) Files(dataset, block, lfn string) []string
- func (c *Catalog) GetStatus(rid string) (string, error)
- func (c *Catalog) GetTransfers(time0, time1 string) ([]TransferData, error)
- func (c *Catalog) InsertRequest(r TransferRequest) error
- func (c *Catalog) InsertTransfers(time int64, cpuUsage float64, memUsage float64, throughput float64)
- func (c *Catalog) ListRequest(query string) ([]TransferRequest, error)
- func (c *Catalog) PfnFiles(dataset, block, lfn string) []string
- func (c *Catalog) Records(req TransferRequest) []CatalogEntry
- func (c *Catalog) RetrieveRequest(r *TransferRequest) error
- func (c *Catalog) Snapshot() map[string][]string
- func (c *Catalog) Transfers(time0, time1 string) []CatalogEntry
- func (c *Catalog) UpdateRequest(rid string, status string) error
- type CatalogEntry
- type CentralCatalog
- type Decorator
- type Dispatcher
- type FileSystemStager
- func (s *FileSystemStager) Access(lfn string) string
- func (s *FileSystemStager) Exist(lfn string) bool
- func (s *FileSystemStager) Read(lfn string, chunk int64) ([]byte, error)
- func (s *FileSystemStager) Stage(lfn string) error
- func (s *FileSystemStager) Write(data []byte, lfn string) (string, int64, string, error)
- type Item
- type Job
- type Metrics
- type PriorityQueue
- func (pq *PriorityQueue) Delete(rid string) bool
- func (pq *PriorityQueue) GetAllRequest() []TransferRequest
- func (pq PriorityQueue) Len() int
- func (pq PriorityQueue) Less(i, j int) bool
- func (pq *PriorityQueue) Pop() interface{}
- func (pq *PriorityQueue) Push(x interface{})
- func (pq PriorityQueue) Swap(i, j int)
- type Processor
- type Record
- type Request
- type RequestFunc
- type Router
- type SourceStats
- type Stager
- type TransferData
- type TransferRequest
- type Worker
Constants ¶
const MIN_FLOAT = -1000000
MIN_FLOAT defines the minimum float value; if an error occurs while getting predictions, the default value is set to MIN_FLOAT
Variables ¶
var DB *sql.DB
DB is global pointer to sql database object, it is initialized once when server starts
var DBTYPE string
DBTYPE holds database type, e.g. sqlite3
var DefaultProcessor = &Processor{}
DefaultProcessor is a default processor instance
var RouterModel bool
RouterModel tells whether the agent enables the router
var StorageQueue chan Job
StorageQueue is a buffered channel that we can send work requests on.
var TransferDelayThreshold int
TransferDelayThreshold controls maximum threshold in seconds TransferRequest will wait before giving up
var TransferQueue chan Job
TransferQueue is a channel of jobs used to handle the transfer process
var TransferType string
TransferType decides which pull or push based model is used
Functions ¶
func InitQueue ¶
func InitQueue(transferQueueSize int, storageQueueSize int, mfile string, minterval int64, monitorTime int64, router bool)
InitQueue initializes RequestQueue, TransferQueue and StorageQueue
func RedirectRequest ¶
func RedirectRequest(t *TransferRequest) error
RedirectRequest sends request to appropriate agent(s) either based on routing predictions or to src/dst agents for push/pull model
Types ¶
type AgentStatus ¶
type AgentStatus struct { Url string `json:"url"` // agent url Name string `json:"name"` // agent name or alias TimeStamp int64 `json:"ts"` // time stamp Catalog string `json:"catalog"` // underlying TFC catalog Protocol string `json:"protocol"` // underlying transfer protocol Backend string `json:"backend"` // underlying transfer backend Tool string `json:"tool"` // underlying transfer tool, e.g. xrdcp ToolOpts string `json:"toolopts"` // options for backend tool Agents map[string]string `json:"agents"` // list of known agents Addrs []string `json:"addrs"` // list of all IP addresses Metrics map[string]int64 `json:"metrics"` // agent metrics CpuUsage float64 `json:"cpuusage"` // percentage of cpu used MemUsage float64 `json:"memusage"` // Avg RAM used in MB }
AgentStatus data type
func (*AgentStatus) String ¶
func (a *AgentStatus) String() string
String provides string representation of given agent status
type CallerFunc ¶
type CallerFunc func(agent, src, dst string)
CallerFunc type func(string, string, string)
func AuthzDecorator ¶
func AuthzDecorator(fn CallerFunc, policy string) CallerFunc
AuthzDecorator provides skeleton for performing authorization check with given function
type Catalog ¶
type Catalog struct { Type string `json:"type"` // catalog type, e.g. sqlite3, etc. Uri string `json:"uri"` // catalog uri, e.g. file.db Login string `json:"login"` // database login Password string `json:"password"` // database password Owner string `json:"owner"` // used by ORACLE DB, defines owner of the database }
Catalog represents Trivial File Catalog (TFC) of the model
var TFC Catalog
TFC stands for Trivial File Catalog
func (*Catalog) Add ¶
func (c *Catalog) Add(entry CatalogEntry) error
Add method adds entry to a catalog
func (*Catalog) GetTransfers ¶
func (c *Catalog) GetTransfers(time0, time1 string) ([]TransferData, error)
GetTransfers provides details about transfers in a given time interval
func (*Catalog) InsertRequest ¶
func (c *Catalog) InsertRequest(r TransferRequest) error
InsertRequest inserts new request
func (*Catalog) InsertTransfers ¶
func (c *Catalog) InsertTransfers(time int64, cpuUsage float64, memUsage float64, throughput float64)
InsertTransfers inserts new row to TRANSFERS table
func (*Catalog) ListRequest ¶
func (c *Catalog) ListRequest(query string) ([]TransferRequest, error)
ListRequest gets specific type of transfer requests according to status
func (*Catalog) Records ¶
func (c *Catalog) Records(req TransferRequest) []CatalogEntry
Records returns catalog records for a given transfer request
func (*Catalog) RetrieveRequest ¶
func (c *Catalog) RetrieveRequest(r *TransferRequest) error
RetrieveRequest gets the request details based on request id
func (*Catalog) Snapshot ¶
Snapshot takes a snapshot of the TFC catalog and returns it as a map which holds table names and lists of rows, where each row is represented as comma-separated values
func (*Catalog) Transfers ¶
func (c *Catalog) Transfers(time0, time1 string) []CatalogEntry
Transfers method returns transfers of the agent in given time interval
type CatalogEntry ¶
type CatalogEntry struct { Lfn string `json:"lfn"` // lfn stands for Logical File Name Pfn string `json:"pfn"` // pfn stands for Physical File Name Dataset string `json:"dataset"` // dataset represents collection of blocks Block string `json:"block"` // block idetify single block within a dataset Bytes int64 `json:"bytes"` // size of the files in bytes Hash string `json:"hash"` // hash represents checksum of the pfn TransferTime int64 `json:"transferTime"` // transfer time Timestamp int64 `json:"timestamp"` // time stamp }
CatalogEntry represents an entry in TFC
func GetRecords ¶
func GetRecords(tr TransferRequest, agent string) ([]CatalogEntry, error)
GetRecords gets catalog entries from the given agent
func (*CatalogEntry) String ¶
func (c *CatalogEntry) String() string
String provides string representation of CatalogEntry
type CentralCatalog ¶
type CentralCatalog struct {
Path string `json:"path"` // path to central catalog
}
CentralCatalog represents the structure of the Central Catalog. It is represented by a list of tables where each table contains a list of records
var CC CentralCatalog
CC represents an instance of CentralCatalog
type Decorator ¶
Decorator wraps a request with extra behavior
func PullTransfer ¶
func PullTransfer() Decorator
PullTransfer returns a Decorator that performs request transfers by pull model
func PushTransfer ¶
func PushTransfer() Decorator
PushTransfer returns a Decorator that performs request transfers by push model
type Dispatcher ¶
type Dispatcher struct { // A pool of workers channels that are registered with the dispatcher JobPool chan chan Job MaxWorkers int BufferSize int }
Dispatcher implementation
func NewDispatcher ¶
func NewDispatcher(maxWorkers int, bufferSize int) *Dispatcher
NewDispatcher returns new instance of Dispatcher type
func (*Dispatcher) StorageRunner ¶
func (d *Dispatcher) StorageRunner()
StorageRunner function starts the worker and dispatches it as a goroutine
func (*Dispatcher) TransferRunner ¶
func (d *Dispatcher) TransferRunner()
TransferRunner function starts the worker and dispatches it as a goroutine
type FileSystemStager ¶
type FileSystemStager struct { Pool string // pool area on file system Catalog Catalog // TFC catalog of the agent }
FileSystemStager defines simple file-based stager
var AgentStager *FileSystemStager
AgentStager represents an instance of the agent's stager
func NewStager ¶
func NewStager(pool string, catalog Catalog) *FileSystemStager
NewStager returns new instance of FileSystemStager type
func (*FileSystemStager) Access ¶
func (s *FileSystemStager) Access(lfn string) string
Access provides access path to the file
func (*FileSystemStager) Exist ¶
func (s *FileSystemStager) Exist(lfn string) bool
Exist implements exists functionality of the Stager interface
func (*FileSystemStager) Read ¶
func (s *FileSystemStager) Read(lfn string, chunk int64) ([]byte, error)
Read implements read functionality of the Stager interface. This function gets the file associated with lfn from the pool area and returns its content
func (*FileSystemStager) Stage ¶
func (s *FileSystemStager) Stage(lfn string) error
Stage implements stage functionality of the Stager interface. This function takes the given lfn and places it into the internal pool area
type Item ¶
type Item struct { Value TransferRequest // contains filtered or unexported fields }
An Item is something we manage in a priority queue.
type Job ¶
type Job struct { TransferRequest TransferRequest `json:"request"` // TransferRequest Action string `json:"action"` // Action to apply to TransferRequest, e.g. delete or transfer }
Job represents the job to be run
func (*Job) RequestSuccess ¶
func (j *Job) RequestSuccess()
RequestSuccess function to handle success jobs
func (*Job) UpdateRequest ¶
UpdateRequest sends request to main agent to update request status in its persistent store (REQUESTS table)
type Metrics ¶
type Metrics struct { In metrics.Counter // number of live transfer requests Failed metrics.Counter // number of failed transfer requests Total metrics.Counter // total number of transfer requests TotalBytes metrics.Counter // total number of bytes by this agent Bytes metrics.Counter // number of bytes in progress CpuUsage metrics.GaugeFloat64 // CPU usage in percentage MemUsage metrics.GaugeFloat64 // Memory usage in MB Tick metrics.Counter // Store cpu ticks MaxTick int64 // Max tick after which reset metrics }
Metrics of the agent
var AgentMetrics Metrics
AgentMetrics defines various metrics about the agent work
func (*Metrics) GetCurrentStats ¶
func (m *Metrics) GetCurrentStats()
GetCurrentStats function to get current system usage
type PriorityQueue ¶
type PriorityQueue []*Item
A PriorityQueue implements heap.Interface and holds Items.
var RequestQueue PriorityQueue
RequestQueue is a queue to sort the requests according to priority.
func (*PriorityQueue) Delete ¶
func (pq *PriorityQueue) Delete(rid string) bool
Delete request from PriorityQueue. The complexity is O(n) where n = heap.Len()
func (*PriorityQueue) GetAllRequest ¶
func (pq *PriorityQueue) GetAllRequest() []TransferRequest
GetAllRequest gets the entire list of requests
func (PriorityQueue) Less ¶
func (pq PriorityQueue) Less(i, j int) bool
Less provides less function for PriorityQueue
func (*PriorityQueue) Pop ¶
func (pq *PriorityQueue) Pop() interface{}
Pop provides pop function for PriorityQueue
func (*PriorityQueue) Push ¶
func (pq *PriorityQueue) Push(x interface{})
Push provides push function for PriorityQueue
func (PriorityQueue) Swap ¶
func (pq PriorityQueue) Swap(i, j int)
Swap provides swap function for PriorityQueue
type Processor ¶
type Processor struct { }
Processor is an object which processes a given task. The logic of the Processor should be implemented.
func (*Processor) Process ¶
func (e *Processor) Process(t *TransferRequest) error
Process defines execution process for a given task
type Record ¶
type Record map[string]interface{}
Record represents the main DB record we work with
var DBSQL Record
DBSQL represents the common record we get from a DB SQL statement
type Request ¶
type Request interface {
Process(*TransferRequest) error
}
Request interface defines a task process
type RequestFunc ¶
type RequestFunc func(*TransferRequest) error
RequestFunc is a function type that implements the Request interface
func (RequestFunc) Process ¶
func (f RequestFunc) Process(t *TransferRequest) error
Process is a method of RequestFunc; it is what makes RequestFunc satisfy the Request interface
type Router ¶
type Router struct { CronInterval string // Helps to set hourly based cron job LinearRegression *regression.Regression // machine learning model CSVfile string // historical data file Agents *map[string]string // list of connected agents }
Router structure defines attributes of the router
var AgentRouter Router
AgentRouter helps to call router's methods
func (*Router) FindSource ¶
func (r *Router) FindSource(tr *TransferRequest) ([]SourceStats, int, error)
FindSource finds appropriate source agent(s) for given transfer request
func (*Router) InitialTrain ¶
func (r *Router) InitialTrain()
InitialTrain function trains the router on previous data (after restarting it)
type SourceStats ¶
type SourceStats struct { SrcUrl string SrcAlias string Jobs []Job // contains filtered or unexported fields }
SourceStats structure stores source information
func GetUnionCatalog ¶
func GetUnionCatalog(tRequest *TransferRequest) (*set.SetNonTS, []SourceStats, map[string][]string)
GetUnionCatalog function to get the union of files
type Stager ¶
type Stager interface { Stage(lfn string) error Read(lfn string, chunk int64) ([]byte, error) Write([]byte) (string, int64, string) Exist(lfn string) bool Access(lfn string) string }
Stager interface defines abstract functionality of the file stage system
type TransferData ¶
type TransferData struct { Timestamp int64 `json:"timestamp"` // Helps to get data historically CpuUsage float64 `json:"cpu"` // percentage of cpu used MemUsage float64 `json:"ram"` // ram used in MB Throughput float64 `json:"throughput"` // network throughput during transfer in MB }
TransferData helps to structure the rows of transfers table
type TransferRequest ¶
type TransferRequest struct { TimeStamp int64 `json:"ts"` // timestamp of the request Lfn string `json:"file"` // LFN name to be transferred Block string `json:"block"` // block name to be transferred Dataset string `json:"dataset"` // dataset name to be transferred SrcUrl string `json:"srcUrl"` // source agent URL which initiate the transfer SrcAlias string `json:"srcAlias"` // source agent name DstUrl string `json:"dstUrl"` // destination agent URL which will consume the transfer DstAlias string `json:"dstAlias"` // destination agent name RegUrl string `json:"regUrl"` // registration agent url (main agent) RegAlias string `json:"regAlias"` // registration agent name Delay int `json:"delay"` // transfer delay time, i.e. post-pone transfer Id string `json:"id"` // unique id of each request Priority int `json:"priority"` // priority of request Status string `json:"status"` // Identify the category of request }
TransferRequest data type
func ResolveRequest ¶
func ResolveRequest(t TransferRequest) []TransferRequest
ResolveRequest will resolve input transfer request into series of requests with known lfn/block/dataset triplets
func (*TransferRequest) Clone ¶
func (t *TransferRequest) Clone() TransferRequest
Clone provides copy of transfer request
func (*TransferRequest) Delete ¶
func (t *TransferRequest) Delete() error
Delete performs deletion of transfer request
func (*TransferRequest) RunPull ¶
func (t *TransferRequest) RunPull() error
RunPull method performs a job on transfer request. It will use pull model
func (*TransferRequest) RunPush ¶
func (t *TransferRequest) RunPush() error
RunPush method performs a job on transfer request. It will use push model
func (*TransferRequest) Store ¶
func (t *TransferRequest) Store() error
Store method stores a job in heap and db
func (*TransferRequest) String ¶
func (t *TransferRequest) String() string
String method returns string representation of transfer request
func (*TransferRequest) UUID ¶
func (t *TransferRequest) UUID() string
UUID generates unique id for transfer request
type Worker ¶
type Worker struct { Id int JobPool chan chan Job JobChannel chan Job // contains filtered or unexported fields }
Worker represents the worker that executes the job