commoncrawl

package
v0.0.0-...-91368a2
Published: Nov 12, 2018 License: GPL-3.0 Imports: 27 Imported by: 0

Documentation

Constants

const (
	// PhaseMap indicates that the common crawl processing is in map phase
	PhaseMap = "map"
	// PhaseReduce indicates that the common crawl processing is in reduce phase
	PhaseReduce = "reduce"
)

const (
	// PublicDatasetsPrefix is the source of all WARC files
	PublicDatasetsPrefix = "https://commoncrawl.s3.amazonaws.com/"
	// DownloadTimeout represents the time that the downloader will sit idle
	// before emitting a checkpoint signal.
	DownloadTimeout = time.Second * 10
	// DownloadDelay is the interval between each file download
	// This is to make sure that file downloads are staggered so that
	// a somewhat steady stream of data flows out of the system
	// without huge bursts.
	DownloadDelay = time.Millisecond
)

const (
	// MonitorLogInterval is the interval between log messages from the monitor
	MonitorLogInterval = time.Second * 10
	// MonitorCompletedFilesOutputInterval is the amount of time between checks for completed work items.
	// If the monitor detects completed files, an attempt is made to send them
	// through the completed work items output channel. If the send is blocked,
	// the monitor tries again after the next interval.
	MonitorCompletedFilesOutputInterval = time.Millisecond * 100
	// ProcessStuckTimeout is the amount of time the monitor will wait for some update
	// before killing the process
	ProcessStuckTimeout = time.Minute * 10
	// FileStuckTimeout is the amount of time the monitor will wait for some update to
	// a file before giving up on processing it.
	FileStuckTimeout = time.Minute * 5
	// FileCompletionCooldown is the minimum time since last update before marking a file as completed
	FileCompletionCooldown = time.Second * 9
)

const (
	// BufferSize is set to 100. Certain channels that require a buffer will have one of this size.
	BufferSize = 100
)

const (
	// CompletedFilenamesPath is the path to the file containing the list of completed WARC files
	CompletedFilenamesPath = "completed-files.txt"
)

const (
	// DefaultOutputStoreCapacity is set to 100K. This is the initial capacity of the OutputStore.
	DefaultOutputStoreCapacity = 100 * 1000
)

Variables

This section is empty.

Functions

func CatchFatalError

func CatchFatalError(logger logrus.FieldLogger) func()

CatchFatalError returns a function that can be used to report panics. Once the error is reported, the process exits abnormally.

Usage:

```
defer commoncrawl.CatchFatalError(logger)()
```

func CreateFileIfAbsent

func CreateFileIfAbsent(filename string) error

CreateFileIfAbsent creates the specified file if it does not already exist.

* filename - Name of the file that should be created

func LoadFileLines

func LoadFileLines(filename string) ([]string, error)

LoadFileLines reads a file into a slice of strings. Each line has whitespace trimmed.

* filename - Name of the file that should be loaded

func SegmentOutputItems

func SegmentOutputItems(items []OutputItem, maxSegmentSize int, callback func([]OutputItem))

SegmentOutputItems will break up a slice of output items into multiple segments, invoking the callback for each segment. Each segment will contain at most maxSegmentSize items.

* items - Input list of items to break into segments
* maxSegmentSize - No segment should be larger than this
* callback - Invoked for each segment
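
The sketch below shows how segmentation might be combined with parallel serialization so that no single batch grows unbounded. The helper name, the segment size, and the import path are illustrative, not part of this package:

```
package example

import (
	"github.com/sirupsen/logrus"

	commoncrawl "example.com/commoncrawl" // placeholder; use the real module path
)

// serializeInBatches serializes items in segments of at most 1000 items each.
func serializeInBatches(items []commoncrawl.OutputItem, logger logrus.FieldLogger) [][]byte {
	var out [][]byte
	commoncrawl.SegmentOutputItems(items, 1000, func(segment []commoncrawl.OutputItem) {
		// Each callback invocation receives at most 1000 items.
		out = append(out, commoncrawl.SerializeOutputItemsInParallel(segment, logger)...)
	})
	return out
}
```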

func SerializeOutputItemsInParallel

func SerializeOutputItemsInParallel(items []OutputItem, logger logrus.FieldLogger) [][]byte

SerializeOutputItemsInParallel converts output items into JSON byte slices using every CPU available.

* items - The items that need to be serialized into JSON
* logger - Log events

func StringSetToSlice

func StringSetToSlice(strings map[string]bool) []string

StringSetToSlice converts a set of strings into a slice of strings

func StringSliceToSet

func StringSliceToSet(strings []string) map[string]bool

StringSliceToSet converts a slice of strings into a set of strings

func WithRetries

func WithRetries(clock clockwork.Clock, retries int, operation func() error) error

WithRetries will continue to attempt an operation up to the specified number of retries with an exponential backoff. If the operation is still unsuccessful after all retries are exhausted (operation returns an error), that error is returned.
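
A usage sketch: retry a flaky operation a few times before giving up. doUpload and the import path are hypothetical stand-ins:

```
package main

import (
	"log"

	"github.com/jonboulle/clockwork"

	commoncrawl "example.com/commoncrawl" // placeholder; use the real module path
)

func doUpload() error { return nil } // stand-in for a flaky operation

func main() {
	clock := clockwork.NewRealClock()
	// Attempt the operation, retrying up to 3 times with exponential backoff.
	if err := commoncrawl.WithRetries(clock, 3, doUpload); err != nil {
		// Retries exhausted; err is the error from the final attempt.
		log.Fatalf("upload failed after retries: %v", err)
	}
}
```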

func WriteFileLines

func WriteFileLines(filename string, lines []string) error

WriteFileLines writes a slice of strings to a file. Each string is written to the file, followed by a newline character.

* filename - Name of the destination file
* lines - Slice of strings that should be written
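
A round trip through WriteFileLines and LoadFileLines; the filenames and import path are illustrative:

```
package main

import (
	"fmt"
	"log"

	commoncrawl "example.com/commoncrawl" // placeholder; use the real module path
)

func main() {
	lines := []string{"file-00001.warc.gz", "file-00002.warc.gz"}
	// Each string is written to the file followed by a newline.
	if err := commoncrawl.WriteFileLines("completed-files.txt", lines); err != nil {
		log.Fatal(err)
	}
	// Reading the file back trims whitespace from each line.
	loaded, err := commoncrawl.LoadFileLines("completed-files.txt")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(loaded) // [file-00001.warc.gz file-00002.warc.gz]
}
```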

Types

type BlobReader

type BlobReader struct {
	// contains filtered or unexported fields
}

BlobReader implements the Reader interface. It will read an open file and output a stream of byte slices. It expects input files to be in gzip format as newline delimited text. Newline characters are not included in the output.

func NewBlobReader

func NewBlobReader(logger logrus.FieldLogger) *BlobReader

NewBlobReader returns a new instance of BlobReader

* logger - Log events

func (*BlobReader) Read

func (reader *BlobReader) Read(file io.ReadCloser, filename string, callback func(interface{}))

Reads a file and sends a stream of record objects to callback

* file - The open file that should be read
* callback - Should be called for each record that is read

type Controller

type Controller struct {
	// contains filtered or unexported fields
}

The Controller bootstraps the common crawl process

func NewController

func NewController(
	conf *ControllerConfiguration,
) *Controller

NewController returns a new instance of `Controller`

* conf - Contains all necessary dependencies for the process to function.

func (*Controller) Run

func (controller *Controller) Run()

Run launches its own goroutine. It will continue running until an endOfInput signal is received, after which it will emit an endOfProcess signal and terminate.

type ControllerConfiguration

type ControllerConfiguration struct {
	AWSRegion         string             // Required - AWS region to use for AWS services
	TaskConfiguration *TaskConfiguration // Required - Configuration payload for the task
	InputSource       InputSource        // Required - Produces work items to be processed
	EndOfProcess      chan<- bool        // Required - Output channel for end of process signal
	RecordReader      RecordReader       // Required - Reads an open file into a stream of records

	Plugins map[string]Plugin // Plugins that will provide specific mappers and reducers for processing

	// TODO: configure output writer from plugin
	OutputFileWriter OutputFileWriter   // Optional - Writer to save data to output files
	StatusWriter     StatusWriter       // Optional - Sends status notifications.
	HTTPClient       HTTPClient         // Optional - Client to make http calls to S3. Defaults to http.DefaultClient
	Clock            clockwork.Clock    // Optional - Pauses and checks the time. Defaults to an instance of RealClock
	Logger           logrus.FieldLogger // Optional - passes in a logger that will be used by all components
}

A ControllerConfiguration contains the necessary dependencies that a Controller will need to function. Some dependencies are optional and have default values.
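
A wiring sketch under stated assumptions: the task configuration and plugin come from elsewhere, and the import path is a placeholder. Only the required fields plus Logger are set; the remaining optional fields fall back to their documented defaults:

```
package example

import (
	"github.com/sirupsen/logrus"

	commoncrawl "example.com/commoncrawl" // placeholder; use the real module path
)

func run(task *commoncrawl.TaskConfiguration, myPlugin commoncrawl.Plugin) {
	logger := logrus.New()
	endOfProcess := make(chan bool, 1)

	controller := commoncrawl.NewController(&commoncrawl.ControllerConfiguration{
		AWSRegion:         "us-east-1",
		TaskConfiguration: task,
		InputSource: commoncrawl.NewFilesystemInput(
			"warc-paths.txt", commoncrawl.NewDefaultWorkItemReader(), logger),
		EndOfProcess: endOfProcess,
		RecordReader: commoncrawl.NewWARCReader(logger),
		Plugins:      map[string]commoncrawl.Plugin{"example": myPlugin},
		Logger:       logger,
	})
	controller.Run() // launches its own goroutine
	<-endOfProcess   // block until the end-of-process signal arrives
}
```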

type DefaultWorkItemReader

type DefaultWorkItemReader struct{}

DefaultWorkItemReader implements the WorkItemReader interface. WorkItems are produced with a WorkItemID and a single source file that are the same as the input message.

func NewDefaultWorkItemReader

func NewDefaultWorkItemReader() *DefaultWorkItemReader

NewDefaultWorkItemReader returns a new instance of DefaultWorkItemReader

func (*DefaultWorkItemReader) ReadWorkItem

func (workItemReader *DefaultWorkItemReader) ReadWorkItem(message string) (*WorkItem, error)

ReadWorkItem parses the message into a `WorkItem` instance.
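
For example (a fragment sketch; the path is illustrative and fmt/log are assumed to be imported). With the default reader, both the ID and the single source file echo the input message:

```
reader := commoncrawl.NewDefaultWorkItemReader()
item, err := reader.ReadWorkItem("crawl-data/segments/example.warc.gz")
if err != nil {
	// Not expected for a plain path message, but handle it regardless.
	log.Fatal(err)
}
fmt.Println(item.WorkItemID)  // crawl-data/segments/example.warc.gz
fmt.Println(item.SourceFiles) // [crawl-data/segments/example.warc.gz]
```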

type Downloader

type Downloader struct {
	// contains filtered or unexported fields
}

Downloader coordinates WARC file downloads

func NewDownloader

func NewDownloader(
	fileDownloadFactory FileDownloadFactory,
	clock clockwork.Clock,
	workItemsInput <-chan *WorkItem,
	workItemReceivedListener chan<- *WorkItem,
	downloadOutput chan<- FileDownload,
	logger logrus.FieldLogger,
) *Downloader

NewDownloader returns a new instance of Downloader

* fileDownloadFactory - Creates FileDownload instances
* clock - Used for exponential backoff
* workItemsInput - Input channel of work items to download
* workItemReceivedListener - Notified as each work item is received
* downloadOutput - Output channel of `FileDownload` instances
* logger - Log events

func (*Downloader) Run

func (downloader *Downloader) Run()

Run launches its own goroutine. It will begin pulling work items from workItemsInput, notifying workItemReceivedListener as each one is received, and emitting FileDownload objects on downloadOutput.

type FileDownload

type FileDownload interface {
	// Open gets a handle on an open data file
	Open() (io.ReadCloser, error)
	// Filename returns the name of the data file
	Filename() string
	// WorkItemID returns the work item associated with the file
	WorkItemID() string
}

FileDownload represents a data file that can be downloaded

type FileDownloadFactory

type FileDownloadFactory interface {
	// NewFileDownload returns a new FileDownload object
	//
	// * workItemID - Work item associated with the file
	// * filename - Full path to the file to download
	NewFileDownload(workItemID string, filename string) FileDownload
}

A FileDownloadFactory creates FileDownload instances

func NewFileDownloadFactory

func NewFileDownloadFactory(region string, conf *TaskConfiguration, clock clockwork.Clock, httpClient HTTPClient, logger logrus.FieldLogger) FileDownloadFactory

NewFileDownloadFactory returns a new FileDownloadFactory based on configuration

type FileMockHTTPClient

type FileMockHTTPClient struct {
	Clock   clockwork.Clock
	Latency time.Duration
}

FileMockHTTPClient emulates an http client by reading files out of testdata/

func (*FileMockHTTPClient) Get

func (client *FileMockHTTPClient) Get(url string) (*http.Response, error)

Get gets the file name from the specified URL and opens it as a file from the testdata directory. The only populated field of the returned Response is the Body, which is a handle on the open file.

type FileStatus

type FileStatus struct {
	LastUpdate       time.Time
	RecordsParsed    int
	RecordsProcessed int
	OutputsMapped    int
	OutputsReduced   int
	EndOfFile        bool
}

FileStatus represents the current state of an individual file that is being processed.

type FilesystemInput

type FilesystemInput struct {
	// contains filtered or unexported fields
}

FilesystemInput reads in a list of WARC file paths from a local file and pushes each file path to an output channel for downstream processing. Files that have already been marked as completed from previous checkpoints will not be sent.

func NewFilesystemInput

func NewFilesystemInput(indexFile string, workItemReader WorkItemReader, logger logrus.FieldLogger) *FilesystemInput

NewFilesystemInput returns a new instance of FilesystemInput

* indexFile - Path to the local file containing the list of WARC file paths
* workItemReader - Parses each entry into a `WorkItem`
* logger - Log events

func (*FilesystemInput) AbortedWorkItemsInput

func (input *FilesystemInput) AbortedWorkItemsInput() chan<- string

AbortedWorkItemsInput returns an input channel of aborted work item ids

func (*FilesystemInput) CompletedWorkItemsInput

func (input *FilesystemInput) CompletedWorkItemsInput() chan<- string

CompletedWorkItemsInput returns an input channel of completed work item ids

func (*FilesystemInput) Run

func (input *FilesystemInput) Run()

Run launches its own goroutine. It will read the list of file paths found in `indexFile` and emit a WorkItem for each path on the WorkItemsOutput channel.

func (*FilesystemInput) WorkItemsOutput

func (input *FilesystemInput) WorkItemsOutput() <-chan *WorkItem

WorkItemsOutput returns an output channel of WorkItem

type FlushRequest

type FlushRequest struct {
	WorkItemID string                                  // ID of work item data to flush
	Callback   chan<- map[string]map[string]OutputItem // Callback for flushed data
}

FlushRequest represents a request to flush the OutputStore

type HTTPClient

type HTTPClient interface {
	// Get issues a GET to the specified URL
	Get(url string) (*http.Response, error)
}

HTTPClient is a stripped-down interface for fetching files over http. `http.DefaultClient` satisfies this interface.
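
Because of that, the relationship can be stated as a package-level compile-time assertion (a fragment sketch; commoncrawl is the imported package name):

```
// Compile-time proof that *http.Client satisfies HTTPClient.
var _ commoncrawl.HTTPClient = http.DefaultClient
```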

type HTTPFileDownload

type HTTPFileDownload struct {
	// contains filtered or unexported fields
}

HTTPFileDownload represents a connection to a WARC file in the common crawl public dataset

func (*HTTPFileDownload) Filename

func (fileDownload *HTTPFileDownload) Filename() string

Filename returns the name of the WARC file

func (*HTTPFileDownload) Open

func (fileDownload *HTTPFileDownload) Open() (io.ReadCloser, error)

Open gets a handle on an open WARC file from S3

func (*HTTPFileDownload) WorkItemID

func (fileDownload *HTTPFileDownload) WorkItemID() string

WorkItemID returns the work item associated with the file

type HTTPFileDownloadFactory

type HTTPFileDownloadFactory struct {
	// contains filtered or unexported fields
}

HTTPFileDownloadFactory creates HTTPFileDownload instances

func NewHTTPFileDownloadFactory

func NewHTTPFileDownloadFactory(datasetHost string, httpClient HTTPClient, clock clockwork.Clock) *HTTPFileDownloadFactory

NewHTTPFileDownloadFactory returns a new instance of HTTPFileDownloadFactory

* datasetHost - Host that serves the data files
* httpClient - HTTP client that will be used to download
* clock - Used for exponential backoff

func (*HTTPFileDownloadFactory) NewFileDownload

func (factory *HTTPFileDownloadFactory) NewFileDownload(workItemID string, filename string) FileDownload

NewFileDownload returns a new HTTPFileDownload object

* workItemID - Work item associated with the file
* filename - Full path to the file to download

type InputSource

type InputSource interface {
	// Run launches its own goroutine.
	// Begins reading input and producing WorkItem instances for processing.
	Run()
	// WorkItemsOutput returns an output channel of WorkItem
	WorkItemsOutput() <-chan *WorkItem
	// CompletedWorkItemsInput returns an input channel of completed work item ids
	CompletedWorkItemsInput() chan<- string
}

An InputSource represents the source of work items to be processed

type JSONReader

type JSONReader struct {
	*BlobReader
	// contains filtered or unexported fields
}

JSONReader implements the Reader interface. It will read an open file and output a stream of records. It is designed to capture the output of the default OutputWriter implementations that are provided for writing data extracted from the common crawl. This allows for additional processing to be done on extracted data.

func NewJSONReader

func NewJSONReader(recordFactory func() interface{}, logger logrus.FieldLogger) *JSONReader

NewJSONReader returns a new instance of JSONReader

* recordFactory - Returns a new, empty record instance for each record to be decoded
* logger - Log events

func (*JSONReader) Read

func (reader *JSONReader) Read(file io.ReadCloser, filename string, callback func(interface{}))

Reads a file and sends a stream of record objects to callback

* file - The open file that should be read
* callback - Should be called for each record that is read

type LambdaHandler

type LambdaHandler struct {
	// contains filtered or unexported fields
}

LambdaHandler provides functionality to pull tasks off of an SQS queue that will configure a common crawl lambda task

func NewLambdaHandler

func NewLambdaHandler(task func(*TaskConfiguration), logger logrus.FieldLogger) *LambdaHandler

NewLambdaHandler returns a new instance of `LambdaHandler`

func (*LambdaHandler) Handle

func (handler *LambdaHandler) Handle() error

Handle pulls a message off of SQS and parses it into a `TaskConfiguration` struct

type LocalFileDownload

type LocalFileDownload struct {
	// contains filtered or unexported fields
}

LocalFileDownload points to a source file in the local file system

func (*LocalFileDownload) Filename

func (fileDownload *LocalFileDownload) Filename() string

Filename returns the name of the file

func (*LocalFileDownload) Open

func (fileDownload *LocalFileDownload) Open() (io.ReadCloser, error)

Open gets a handle on an open data file

func (*LocalFileDownload) WorkItemID

func (fileDownload *LocalFileDownload) WorkItemID() string

WorkItemID returns the work item associated with the file

type LocalFileDownloadFactory

type LocalFileDownloadFactory struct {
}

LocalFileDownloadFactory creates LocalFileDownload instances

func NewLocalFileDownloadFactory

func NewLocalFileDownloadFactory() *LocalFileDownloadFactory

NewLocalFileDownloadFactory returns a new instance of LocalFileDownloadFactory

func (*LocalFileDownloadFactory) NewFileDownload

func (factory *LocalFileDownloadFactory) NewFileDownload(workItemID string, filename string) FileDownload

NewFileDownload returns a new LocalFileDownload object

* workItemID - The associated work item
* filename - Full path to the file to download

type LocalOutputFileWriter

type LocalOutputFileWriter struct{}

LocalOutputFileWriter implements the OutputFileWriter interface. Writes files to the local file system

func (*LocalOutputFileWriter) WriteOutputFile

func (fileWriter *LocalOutputFileWriter) WriteOutputFile(filepath string, data []byte) error

WriteOutputFile writes out the specified file to the local filesystem

type LogStatusWriter

type LogStatusWriter struct {
	// contains filtered or unexported fields
}

LogStatusWriter sends status notifications to the console

func NewLogStatusWriter

func NewLogStatusWriter(logger logrus.FieldLogger) *LogStatusWriter

NewLogStatusWriter returns a new instance of LogStatusWriter

func (*LogStatusWriter) WriteStatus

func (writer *LogStatusWriter) WriteStatus(status ProcessStatus)

WriteStatus sends a notification about the current state of the system to the console.

type MockHTTPClient

type MockHTTPClient struct {
	mock.Mock
}

MockHTTPClient is an autogenerated mock type for the HTTPClient type

func (*MockHTTPClient) Get

func (_m *MockHTTPClient) Get(url string) (*http.Response, error)

Get provides a mock function with given fields: url

type MockInputSource

type MockInputSource struct {
	mock.Mock
}

MockInputSource is an autogenerated mock type for the InputSource type

func (*MockInputSource) AbortedWorkItemsInput

func (_m *MockInputSource) AbortedWorkItemsInput() chan<- string

AbortedWorkItemsInput provides a mock function with given fields:

func (*MockInputSource) CompletedWorkItemsInput

func (_m *MockInputSource) CompletedWorkItemsInput() chan<- string

CompletedWorkItemsInput provides a mock function with given fields:

func (*MockInputSource) Run

func (_m *MockInputSource) Run()

Run provides a mock function with given fields:

func (*MockInputSource) WorkItemsOutput

func (_m *MockInputSource) WorkItemsOutput() <-chan *WorkItem

WorkItemsOutput provides a mock function with given fields:

type MockOutputFileWriter

type MockOutputFileWriter struct {
	mock.Mock
}

MockOutputFileWriter is an autogenerated mock type for the OutputFileWriter type

func (*MockOutputFileWriter) WriteOutputFile

func (_m *MockOutputFileWriter) WriteOutputFile(filepath string, data []byte) error

WriteOutputFile provides a mock function with given fields: filepath, data

type MockStatusWriter

type MockStatusWriter struct {
	mock.Mock
}

MockStatusWriter is an autogenerated mock type for the StatusWriter type

func (*MockStatusWriter) GetStatusCalls

func (_m *MockStatusWriter) GetStatusCalls() []ProcessStatus

GetStatusCalls returns a list of all ProcessStatus objects passed into WriteStatus

func (*MockStatusWriter) WriteStatus

func (_m *MockStatusWriter) WriteStatus(status ProcessStatus)

WriteStatus provides a mock function with given fields: status

type Monitor

type Monitor struct {
	// contains filtered or unexported fields
}

The Monitor receives signals from other parts in the system, prints out aggregated metrics at a regular interval, and emits a signal when the system is ready to perform a save of collected data.

func NewMonitor

func NewMonitor(
	statusWriter StatusWriter,
	clock clockwork.Clock,
	recordsParsedInput <-chan string,
	recordsProcessedInput <-chan string,
	outputsMappedInput <-chan string,
	outputsReducedInput <-chan string,
	workItemReceivedInput <-chan *WorkItem,
	filesParsingInput <-chan string,
	filesParsedInput <-chan string,
	completedWorkItemsOutput chan<- string,
	endOfInput chan<- bool,
	logger logrus.FieldLogger,
) *Monitor

NewMonitor returns a new instance of Monitor

* statusWriter - Sends status notifications
* clock - Emits events at an interval
* recordsParsedInput - Input channel of parsed records count
* recordsProcessedInput - Input channel of processed records count
* outputsMappedInput - Input channel of mapped outputs
* outputsReducedInput - Input channel of reduced outputs
* workItemReceivedInput - Input channel of work items
* filesParsingInput - Input channel of files being parsed
* filesParsedInput - Input channel of files fully parsed
* completedWorkItemsOutput - Output channel of completed work item ids
* endOfInput - Output channel indicating end of input (process complete)
* logger - Log events

func (*Monitor) Run

func (monitor *Monitor) Run()

Run launches its own goroutine. It will proceed to monitor inputs and emit outputs as designed, while printing out a summary of the state of the process to the console at a regular interval.

type OutputFileWriter

type OutputFileWriter interface {
	// Write out the specified file
	WriteOutputFile(filepath string, data []byte) error
}

OutputFileWriter writes specific output files.

func NewOutputFileWriter

func NewOutputFileWriter(region string, conf *TaskConfiguration) OutputFileWriter

NewOutputFileWriter returns a new instance of OutputFileWriter based on configuration

type OutputFilter

type OutputFilter interface {
	// TestOutputItem returns true if the output item should be saved
	TestOutputItem(item OutputItem) bool
}

An OutputFilter can filter the stream of output items before they are processed by the OutputWriter

type OutputItem

type OutputItem interface {
	// OutputSet indicates the output folder for the data
	OutputSet() string
	// Key is the unique identifier for this output.
	Key() string
	// Value is the data contained by the output. This can be the same object as the OutputItem.
	Value() interface{}
}

OutputItem represents output data produced when processing a `WARCRecord`.

type OutputItems

type OutputItems []OutputItem

OutputItems implements sort.Interface for []OutputItem based on the Key() return value.

func (OutputItems) Len

func (a OutputItems) Len() int

func (OutputItems) Less

func (a OutputItems) Less(i, j int) bool

func (OutputItems) Swap

func (a OutputItems) Swap(i, j int)
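
Because OutputItems satisfies sort.Interface, a slice of items can be ordered by key with the standard library (a fragment sketch, assuming sort is imported):

```
sorted := commoncrawl.OutputItems(items) // items is a []commoncrawl.OutputItem
sort.Sort(sorted)                        // ascending by Key()
```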

type OutputReducer

type OutputReducer interface {
	// Reduce merges two OutputItems with the same key into a single item.
	// The details of how the merge is performed depend on the implementation.
	Reduce(existingItem OutputItem, newItem OutputItem) OutputItem
}

An OutputReducer is responsible for reducing the values of OutputItems with the same key into a single OutputItem.
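
A minimal concrete sketch, assuming a hypothetical count-per-key output type; none of these names come from the package:

```
// countItem is a hypothetical OutputItem that counts occurrences of a key.
type countItem struct {
	set   string
	key   string
	Count int `json:"count"`
}

func (c *countItem) OutputSet() string  { return c.set }
func (c *countItem) Key() string        { return c.key }
func (c *countItem) Value() interface{} { return c }

// countReducer merges two countItems with the same key by summing counts.
type countReducer struct{}

func (countReducer) Reduce(existing, incoming commoncrawl.OutputItem) commoncrawl.OutputItem {
	merged := existing.(*countItem)
	merged.Count += incoming.(*countItem).Count
	return merged
}
```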

type OutputStore

type OutputStore struct {
	// contains filtered or unexported fields
}

The OutputStore accumulates data that is output by the processor pool

Each OutputItem that is sent to the OutputStore is accumulated under the unique key returned by its Key() method. Upon receiving a flush signal, the OutputStore will send the current data set and clear it, resetting the OutputStore to its initial state.

func NewOutputStore

func NewOutputStore(
	reducers map[string]OutputReducer,
	dataInput <-chan RecordOutput,
	flushSignal <-chan FlushRequest,
	outputReducedOutput chan<- string,
	stuckFileInput <-chan string,
	logger logrus.FieldLogger,
) *OutputStore

NewOutputStore returns a new instance of OutputStore

* reducers - Map of reducers used for combining OutputItems
* dataInput - Input channel of OutputItems
* flushSignal - Input channel of flush requests
* outputReducedOutput - Output channel of reduced outputs
* stuckFileInput - Input channel of stuck file names
* logger - Log events

func (*OutputStore) Run

func (store *OutputStore) Run()

Run launches its own goroutine. The store will listen to both input channels. When it receives OutputItems from the dataInput channel, it will use the reducer to combine it with any existing OutputItem and save the result, or simply save the OutputItem if no previous entry exists. Upon receiving a flush signal, the entire data set will be returned on the provided channel, and a new empty dataset will replace it in the store.

type OutputWriter

type OutputWriter struct {
	// contains filtered or unexported fields
}

The OutputWriter is responsible for persisting the output of common crawl processing. The specific storage medium, output format, and file or record naming are decided by the implementation.

func NewOutputWriter

func NewOutputWriter(
	fileWriter OutputFileWriter,
	outputFolder string,
	logger logrus.FieldLogger,
	shards int,
) *OutputWriter

NewOutputWriter returns a new instance of OutputWriter

* fileWriter - Write output files
* outputFolder - Folder that will contain output files
* logger - Log events
* shards - Number of shards for output

func (*OutputWriter) WriteOutputData

func (writer *OutputWriter) WriteOutputData(outputSet string, data []OutputItem)

WriteOutputData writes a segment of output data to a destination.

* outputSet - Name of the output set the data belongs to
* data - Data that should be persisted

type ParsedRecord

type ParsedRecord struct {
	WorkItemID string      // Name of the work item associated with the file
	SourceFile string      // Name of the file where the record originated
	Record     interface{} // Contents of the record
}

A ParsedRecord is the output of a Parser

type Parser

type Parser struct {
	// contains filtered or unexported fields
}

A Parser is responsible for parsing raw file data into record objects.

func NewParser

func NewParser(
	reader RecordReader,
	fileInput <-chan FileDownload,
	recordOutput chan<- ParsedRecord,
	fileParsingListener chan<- string,
	fileParsedListener chan<- string,
	recordReadListener chan<- string,
	logger logrus.FieldLogger,
) *Parser

NewParser returns a new instance of Parser

* reader - Reads files into records
* fileInput - Input channel of FileDownload instances
* recordOutput - Output channel of record instances
* fileParsingListener - Notified when a file begins parsing
* fileParsedListener - Notified when a file is completely parsed
* recordReadListener - Notified for each record that is read from the file
* logger - Log events

func (*Parser) Run

func (parser *Parser) Run()

Run launches its own goroutine. It will begin pulling FileDownload objects from fileInput and sending ParsedRecord objects to recordOutput.

type Plugin

type Plugin interface {
	// Creates a new plugin instance with the desired configuration
	NewInstance(config *PluginConfiguration) PluginInstance
}

Plugin provides specific functionality that can utilize the commoncrawl processing framework

type PluginConfiguration

type PluginConfiguration struct {
	PluginName   string            // Indicate which plugin to use
	ConfigData   map[string]string // Custom configuration data for the plugin
	OutputSet    string            // Name of output data set - subfolder will be named after this
	OutputShards int               // Number of output shards for this plugin configuration
}

PluginConfiguration provides a way to customize the behavior of a plugin, output subfolder name, and number of shards.

type PluginInstance

type PluginInstance interface {
	// Mapper that will process map-phase records for this plugin
	Mapper() RecordMapper
	// Mapper that will process reduce-phase records for this plugin.
	// Reduce-phase records are json blobs of the output items produced
	// by the map-phase mapper.
	ReductionMapper() RecordMapper
	// Reducer to use for this plugin
	Reducer() OutputReducer
	// Filter for output before it is saved
	OutputFilter() OutputFilter
}

PluginInstance is a configured instance of a plugin that provides specific functionality that can utilize the commoncrawl processing framework
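
A minimal PluginInstance sketch that wires together the hypothetical countReducer from the OutputReducer example above and the hostMapper sketched under RecordMapper below; keepAll is likewise invented for illustration:

```
// countPlugin builds instances of the hypothetical counting pipeline.
type countPlugin struct{}

func (countPlugin) NewInstance(config *commoncrawl.PluginConfiguration) commoncrawl.PluginInstance {
	return countInstance{}
}

type countInstance struct{}

func (countInstance) Mapper() commoncrawl.RecordMapper { return hostMapper{} }

// A real plugin would return a mapper here that handles records parsed
// back from map-phase JSON; the same mapper is reused only for brevity.
func (countInstance) ReductionMapper() commoncrawl.RecordMapper { return hostMapper{} }

func (countInstance) Reducer() commoncrawl.OutputReducer     { return countReducer{} }
func (countInstance) OutputFilter() commoncrawl.OutputFilter { return keepAll{} }

// keepAll is a hypothetical OutputFilter that retains every item.
type keepAll struct{}

func (keepAll) TestOutputItem(item commoncrawl.OutputItem) bool { return true }
```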

type ProcessStatus

type ProcessStatus struct {
	LastUpdate       time.Time // Last time the process received any kind of update
	RecordsParsed    int       // Total # of records parsed
	RecordsProcessed int       // Total # of records fully processed
	OutputsMapped    int       // Total # of outputs mapped
	OutputsReduced   int       // Total # of outputs reduced
	FilesInProcess   int       // # of files currently being processed
	FilesParsed      int       // Total # of files parsed
	OutputFilesSaved int       // Total # of output files that have been saved
	ProcessComplete  bool      // True if process is completed
}

ProcessStatus represents the current state of the common crawl process

type RecordMapper

type RecordMapper interface {
	// Maps a record into zero to many OutputItems.
	//
	// * record - A record object containing data about a webpage
	// * output - Each OutputItem that is sent to this callback will be reduced in the output.
	MapRecord(record interface{}, output func(OutputItem))
}

A RecordMapper is responsible for converting a record object into the desired output form.

The RecordMapper should be able to accept input for the configured RecordParser
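
A hedged mapper sketch reusing the hypothetical countItem type from the OutputReducer example; it assumes records arrive as *url.URL values (import "net/url"), which depends entirely on the configured RecordReader:

```
// hostMapper emits one countItem per record, keyed by the record's host.
type hostMapper struct{}

func (hostMapper) MapRecord(record interface{}, output func(commoncrawl.OutputItem)) {
	u, ok := record.(*url.URL)
	if !ok {
		return // skip records of an unexpected type
	}
	output(&countItem{set: "hosts", key: u.Host, Count: 1})
}
```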

type RecordOutput

type RecordOutput struct {
	WorkItemID string     // Name of the work item associated with the file
	SourceFile string     // Name of the file where the data originated
	OutputItem OutputItem // Output data
}

A RecordOutput wraps an OutputItem and refers to the source file.

type RecordProcessor

type RecordProcessor struct {
	// contains filtered or unexported fields
}

A RecordProcessor is responsible for converting a stream of parsed records into a stream of output data.

func NewRecordProcessor

func NewRecordProcessor(
	mappers []RecordMapper,
	recordInput <-chan ParsedRecord,
	dataOutput chan<- RecordOutput,
	recordProcessedListener chan<- string,
	outputMappedListener chan<- string,
	logger logrus.FieldLogger,
) *RecordProcessor

NewRecordProcessor returns a new instance of RecordProcessor

* mappers - Mappers used to convert each record object into `OutputItems`
* recordInput - Input channel of record objects
* dataOutput - Output channel of OutputItem objects
* recordProcessedListener - Listener for completed records
* outputMappedListener - Listener for mapped OutputItems
* logger - Log events

func (*RecordProcessor) Run

func (processor *RecordProcessor) Run()

Run launches its own goroutine. It will begin pulling ParsedRecord objects from recordInput, converting them to OutputItem objects, and sending each OutputItem object to dataOutput. Listeners will be notified when each record is processed.

type RecordReader

type RecordReader interface {
	// Reads a file and sends a stream of record objects to callback
	//
	// * file - The open file that should be read
	// * callback - Should be called for each record that is read
	Read(file io.ReadCloser, filename string, callback func(interface{}))
}

A RecordReader is responsible for outputting a stream of records from an open file.

type S3FileDownload

type S3FileDownload struct {
	// contains filtered or unexported fields
}

S3FileDownload represents a handle on a file in an S3 bucket

func (*S3FileDownload) Filename

func (fileDownload *S3FileDownload) Filename() string

Filename returns the name of the S3 file

func (*S3FileDownload) Open

func (fileDownload *S3FileDownload) Open() (io.ReadCloser, error)

Open gets a handle on an open data file from S3

func (*S3FileDownload) WorkItemID

func (fileDownload *S3FileDownload) WorkItemID() string

WorkItemID returns the work item associated with the file

type S3FileDownloadFactory

type S3FileDownloadFactory struct {
	// contains filtered or unexported fields
}

S3FileDownloadFactory creates S3FileDownload instances

func NewS3FileDownloadFactory

func NewS3FileDownloadFactory(regionName string, s3BucketName string, clock clockwork.Clock) *S3FileDownloadFactory

NewS3FileDownloadFactory returns a new instance of S3FileDownloadFactory

* regionName - AWS region name of the S3 bucket
* s3BucketName - S3 input bucket name
* clock - Used for exponential backoff

func (*S3FileDownloadFactory) NewFileDownload

func (factory *S3FileDownloadFactory) NewFileDownload(workItemID string, filename string) FileDownload

NewFileDownload returns a new S3FileDownload object

* workItemID - Work item associated with the file
* filename - Full path to the file to download

type S3OutputFileWriter

type S3OutputFileWriter struct {
	// contains filtered or unexported fields
}

S3OutputFileWriter implements the OutputFileWriter interface. Writes files to an S3 bucket

func NewS3OutputFileWriter

func NewS3OutputFileWriter(regionName string, s3BucketName string) *S3OutputFileWriter

NewS3OutputFileWriter returns a new instance of S3OutputFileWriter

* regionName - AWS region
* s3BucketName - S3 output bucket name

func (*S3OutputFileWriter) WriteOutputFile

func (fileWriter *S3OutputFileWriter) WriteOutputFile(filepath string, data []byte) error

WriteOutputFile writes out the specified file to the S3 bucket

type S3WorkItemReader

type S3WorkItemReader struct {
	// contains filtered or unexported fields
}

S3WorkItemReader implements the WorkItemReader interface for S3 source files.

func NewS3WorkItemReader

func NewS3WorkItemReader(regionName string, conf *TaskConfiguration) (*S3WorkItemReader, error)

NewS3WorkItemReader returns a new instance of S3WorkItemReader

* regionName - AWS region of the input bucket
* conf - The InputBucket field is required

func (*S3WorkItemReader) ReadWorkItem

func (workItemReader *S3WorkItemReader) ReadWorkItem(message string) (*WorkItem, error)

ReadWorkItem parses the message into a `WorkItem` instance. It queries S3 using the message as a key prefix and sets the resulting list of keys as the source files in the `WorkItem`.

type SQSInput

type SQSInput struct {
	// contains filtered or unexported fields
}

SQSInput polls an SQS queue for work item messages and pushes each resulting WorkItem to an output channel for downstream processing.

func NewSQSInput

func NewSQSInput(
	regionName string,
	sqsQueueName string,
	logger logrus.FieldLogger,
	workItemReader WorkItemReader,
) *SQSInput

NewSQSInput returns a new instance of SQSInput

* regionName - Name of the AWS region where the queue is located
* sqsQueueName - Name of the SQS queue that should be polled for messages
* logger - Log events
* workItemReader - Parses queue messages into `WorkItem` instances

func (*SQSInput) AbortedWorkItemsInput

func (input *SQSInput) AbortedWorkItemsInput() chan<- string

AbortedWorkItemsInput returns an input channel of aborted work item ids

func (*SQSInput) CompletedWorkItemsInput

func (input *SQSInput) CompletedWorkItemsInput() chan<- string

CompletedWorkItemsInput returns an input channel of completed work item ids

func (*SQSInput) Run

func (input *SQSInput) Run()

Run launches its own goroutine. It will poll the SQS queue for messages and emit a WorkItem for each message on the WorkItemsOutput channel.

func (*SQSInput) WorkItemsOutput

func (input *SQSInput) WorkItemsOutput() <-chan *WorkItem

WorkItemsOutput returns an output channel of WorkItem

type SingleInputSource

type SingleInputSource struct {
	// contains filtered or unexported fields
}

SingleInputSource implements the InputSource interface, and is designed to provide a single work item from SQS.

func NewSingleInputSource

func NewSingleInputSource(
	region string,
	conf *TaskConfiguration,
	logger logrus.FieldLogger,
) *SingleInputSource

NewSingleInputSource returns a new instance of `SingleInputSource`

* region - AWS region to use for AWS services
* conf - Task configuration describing the files to process; the SQS receipt handle is used as the work item id
* logger - Log events

func (*SingleInputSource) CompletedWorkItemsInput

func (input *SingleInputSource) CompletedWorkItemsInput() chan<- string

CompletedWorkItemsInput returns an input channel of completed work item ids

func (*SingleInputSource) Run

func (input *SingleInputSource) Run()

Run launches its own goroutine. Begins reading input and producing WorkItem instances for processing.

func (*SingleInputSource) WorkItemsOutput

func (input *SingleInputSource) WorkItemsOutput() <-chan *WorkItem

WorkItemsOutput returns an output channel of WorkItem

type StatusWriter

type StatusWriter interface {
	// WriteStatus sends a notification about the current state of the system.
	WriteStatus(status ProcessStatus)
}

A StatusWriter is responsible for conveying the current state of the system. The details of how this is done are left up to the implementation.

type TaskConfiguration

type TaskConfiguration struct {
	// How many threads are used for parsing data files
	DownloadPoolSize int
	// S3 bucket for input files. Necessary for pulling data from private buckets.
	InputBucket          string
	OutputBucket         string
	OutputFolder         string
	Phase                string
	InputFilename        string
	DatasetHost          string // Used to download non-commoncrawl data files through http
	PluginConfigurations []*PluginConfiguration
}

TaskConfiguration is the payload that is expected from SQS that instructs a common crawl lambda task on how to run.

type WARCReader

type WARCReader struct {
	// contains filtered or unexported fields
}

WARCReader implements the Reader interface. It will read an open file and output a stream of WARCRecord objects.

func NewWARCReader

func NewWARCReader(logger logrus.FieldLogger) *WARCReader

NewWARCReader returns a new instance of WARCReader

* logger - Log events

func (*WARCReader) Read

func (reader *WARCReader) Read(file io.ReadCloser, filename string, callback func(interface{}))

Reads a file and sends a stream of record objects to callback

* file - The open file that should be read
* callback - Should be called for each record that is read

type WorkItem

type WorkItem struct {
	WorkItemID  string   // ID of the work item
	SourceFiles []string // Source files to be processed
}

A WorkItem is a single task for the process, and will have one or more source files that need to be processed.

type WorkItemReader

type WorkItemReader interface {
	// ReadWorkItem parses the message into a `WorkItem` instance
	ReadWorkItem(message string) (*WorkItem, error)
}

WorkItemReader parses raw messages from a queue and outputs `WorkItem` instances.

type WorkItemStatus

type WorkItemStatus struct {
	WorkItem       *WorkItem
	CompletedFiles []string
}

WorkItemStatus represents the current state of a work item

Directories

Path Synopsis
examples
