processor

package v0.1.1

Published: Jan 26, 2021 License: GPL-3.0 Imports: 24 Imported by: 0

Documentation

Overview

Package processor is one of the core entities of the downloader. It facilitates the processing of Jobs. Its main responsibility is to manage the creation and destruction of workerPools, which actually perform the Job download process.

Each WorkerPool processes jobs belonging to a single aggregation and is in charge of imposing the corresponding rate-limit rules. Job routing for each aggregation is performed through a Redis list, which is popped periodically by each WorkerPool. Popped jobs are then published to the WorkerPool's job channel. Worker pools spawn worker goroutines (up to a max concurrency limit set for each aggregation) that consume from the aforementioned job channel and perform the actual download; a minimal sketch of this pattern follows the diagram below.

-----------------------------------------
|              Processor                |
|                                       |
|    ----------          ----------     |
|    |   WP   |          |   WP   |     |
|    |--------|          |--------|     |
|    |   W    |          |  W  W  |     |
|    | W   W  |          |  W  W  |     |
|    ----------          ----------     |
|                                       |
-----------------------------------------
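
A minimal, self-contained sketch of the pattern described above, not the package's actual implementation: a feeder publishes popped jobs to a channel and a bounded set of worker goroutines consumes from it. The Job type, the feeder, and the download step are hypothetical stand-ins, with a plain slice standing in for the Redis list.

package main

import (
	"fmt"
	"sync"
	"time"
)

// Job is a hypothetical stand-in for the downloader's job type.
type Job struct{ URL string }

// workerPool sketches one WP from the diagram: a feeder goroutine
// periodically "pops" jobs and publishes them to jobCh, while
// maxConcurrency workers consume from jobCh and "download".
func workerPool(jobs []Job, maxConcurrency int) {
	jobCh := make(chan Job)

	go func() {
		for _, j := range jobs {
			jobCh <- j
			time.Sleep(10 * time.Millisecond) // stands in for the periodic pop
		}
		close(jobCh)
	}()

	var wg sync.WaitGroup
	for i := 0; i < maxConcurrency; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for j := range jobCh {
				// The actual download would happen here.
				fmt.Printf("worker %d: downloading %s\n", id, j.URL)
			}
		}(i)
	}
	wg.Wait()
}

func main() {
	workerPool([]Job{{"http://example.com/a"}, {"http://example.com/b"}}, 2)
}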

Cancellation and shutdown are coordinated through the use of contexts all along the stack. When a shutdown signal is received from the application, it propagates from the processor to the active worker pools, stopping any in-progress jobs and gracefully shutting down the corresponding workers.
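
As a rough illustration of this propagation (the package's actual wiring may differ, and every name below is hypothetical), cancelling a parent context stops all workers derived from it:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

func main() {
	// ctx stands in for the processor-level context; cancelling it
	// propagates to every worker goroutine derived from it.
	ctx, cancel := context.WithCancel(context.Background())

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			select {
			case <-ctx.Done(): // the shutdown signal reached this worker
				fmt.Printf("worker %d: stopping in-progress job\n", id)
			case <-time.After(time.Minute): // stand-in for a long download
				fmt.Printf("worker %d: job finished\n", id)
			}
		}(i)
	}

	cancel() // simulate the application-level shutdown signal
	wg.Wait()
}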

Index

Constants

This section is empty.

Variables

var (
	// RetryBackoffDuration indicates the time to wait between retries.
	RetryBackoffDuration = 2 * time.Minute
)

Functions

This section is empty.

Types

type Processor

type Processor struct {
	Storage *storage.Storage

	// ScanInterval is the number of seconds to wait before re-scanning
	// Redis for new Aggregations.
	ScanInterval int

	// StorageDir is the filesystem location where the actual downloads
	// will be saved.
	StorageDir string

	// Client is the HTTP client that will be used for the download requests.
	Client *http.Client

	// RequestHeaders are the default request headers used for file
	// downloads when a job does not specify its own.
	RequestHeaders map[string]string

	Log *log.Logger

	// StatsIntvl is the interval between stats flushes.
	StatsIntvl time.Duration
	// contains filtered or unexported fields
}

Processor is the main entity of the downloader. For more information on its architecture, see the package-level documentation.

func New

func New(storage *storage.Storage, scanInterval int, storageDir string, logger *log.Logger) (Processor, error)

New initializes and returns a Processor, or an error if storageDir is not writable.

TODO: only add REQUIRED arguments, the rest should be set from the struct
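
A minimal usage sketch, not taken from the package itself: the import paths are assumptions based on the module, and obtaining the *storage.Storage value is left to the caller, since its constructor lives outside this package.

package main

import (
	"log"
	"os"
	"time"

	"github.com/skroutz/downloader/processor" // assumed import path
	"github.com/skroutz/downloader/storage"   // assumed import path
)

// newProcessor is a hypothetical helper showing the intended split:
// required arguments go through New, optional fields are set on the
// struct afterwards (as the TODO above suggests).
func newProcessor(st *storage.Storage) (processor.Processor, error) {
	logger := log.New(os.Stderr, "[processor] ", log.LstdFlags)

	p, err := processor.New(st, 5, "/tmp/downloads", logger)
	if err != nil {
		return processor.Processor{}, err
	}

	p.StatsIntvl = 10 * time.Second
	p.RequestHeaders = map[string]string{"User-Agent": "downloader"}
	return p, nil
}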

func (*Processor) Start

func (p *Processor) Start(closeCh chan struct{})

Start starts p.

It spawns helper goroutines and starts spawning worker pools by scanning Redis for new Aggregations.
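
A sketch of driving Start and shutting down gracefully; it continues the newProcessor sketch above (adding "os/signal" and "syscall" to its imports) and assumes Start blocks until closeCh is closed, which its signature suggests but the docs do not state.

// run wires up a Processor and shuts it down on SIGINT/SIGTERM.
func run(st *storage.Storage) error {
	p, err := newProcessor(st) // hypothetical helper from the sketch above
	if err != nil {
		return err
	}

	closeCh := make(chan struct{})
	go func() {
		sigCh := make(chan os.Signal, 1)
		signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
		<-sigCh        // wait for a shutdown signal from the OS
		close(closeCh) // propagate it down the context chain
	}()

	p.Start(closeCh) // assumed to block until closeCh is closed
	return nil
}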

Directories

Path	Synopsis
diskcheck	Package diskcheck provides a communication channel for checking the disk health.
errors	Package errors contains types and interfaces representing download errors.
