Documentation ¶
Index ¶
- Constants
- func CatchFatalError(logger logrus.FieldLogger) func()
- func CreateFileIfAbsent(filename string) error
- func LoadFileLines(filename string) ([]string, error)
- func SegmentOutputItems(items []OutputItem, maxSegmentSize int, callback func([]OutputItem))
- func SerializeOutputItemsInParallel(items []OutputItem, logger logrus.FieldLogger) [][]byte
- func StringSetToSlice(strings map[string]bool) []string
- func StringSliceToSet(strings []string) map[string]bool
- func WithRetries(clock clockwork.Clock, retries int, operation func() error) error
- func WriteFileLines(filename string, lines []string) error
- type BlobReader
- type Controller
- type ControllerConfiguration
- type DefaultWorkItemReader
- type Downloader
- type FileDownload
- type FileDownloadFactory
- type FileMockHTTPClient
- type FileStatus
- type FilesystemInput
- type FlushRequest
- type HTTPClient
- type HTTPFileDownload
- type HTTPFileDownloadFactory
- type InputSource
- type JSONReader
- type LambdaHandler
- type LocalFileDownload
- type LocalFileDownloadFactory
- type LocalOutputFileWriter
- type LogStatusWriter
- type MockHTTPClient
- type MockInputSource
- type MockOutputFileWriter
- type MockStatusWriter
- type Monitor
- type OutputFileWriter
- type OutputFilter
- type OutputItem
- type OutputItems
- type OutputReducer
- type OutputStore
- type OutputWriter
- type ParsedRecord
- type Parser
- type Plugin
- type PluginConfiguration
- type PluginInstance
- type ProcessStatus
- type RecordMapper
- type RecordOutput
- type RecordProcessor
- type RecordReader
- type S3FileDownload
- type S3FileDownloadFactory
- type S3OutputFileWriter
- type S3WorkItemReader
- type SQSInput
- type SingleInputSource
- type StatusWriter
- type TaskConfiguration
- type WARCReader
- type WorkItem
- type WorkItemReader
- type WorkItemStatus
Constants ¶
const (
	// PhaseMap indicates that the common crawl processing is in map phase
	PhaseMap = "map"
	// PhaseReduce indicates that the common crawl processing is in reduce phase
	PhaseReduce = "reduce"
)
const (
	// PublicDatasetsPrefix is the source of all WARC files
	PublicDatasetsPrefix = "https://commoncrawl.s3.amazonaws.com/"
	// DownloadTimeout represents the time that the downloader will sit idle
	// before emitting a checkpoint signal.
	DownloadTimeout = time.Second * 10
	// DownloadDelay is the interval between each file download.
	// This is to make sure that file downloads are staggered so that
	// a somewhat steady stream of data flows out of the system
	// without huge bursts.
	DownloadDelay = time.Millisecond
)
const (
	// MonitorLogInterval is the interval between each log message from the monitor
	MonitorLogInterval = time.Second * 10
	// MonitorCompletedFilesOutputInterval - The amount of time between each check for completed work items.
	// If the monitor detects completed files, an attempt is made to send them
	// through the completed work items output channel. If the send is blocked,
	// the monitor tries again after the next duration.
	MonitorCompletedFilesOutputInterval = time.Millisecond * 100
	// ProcessStuckTimeout is the amount of time the monitor will wait for some update
	// before killing the process
	ProcessStuckTimeout = time.Minute * 10
	// FileStuckTimeout is the amount of time the monitor will wait for some update to
	// a file before giving up on processing it.
	FileStuckTimeout = time.Minute * 5
	// FileCompletionCooldown is the minimum time since last update before marking a file as completed
	FileCompletionCooldown = time.Second * 9
)
const (
// BufferSize is set to 100. Certain channels that require a buffer will have one of this size.
BufferSize = 100
)
const (
// CompletedFilenamesPath is the path to the file containing the list of completed WARC files
CompletedFilenamesPath = "completed-files.txt"
)
const (
// DefaultOutputStoreCapacity is set to 100K. This is the initial capacity of the OutputStore.
DefaultOutputStoreCapacity = 100 * 1000
)
Variables ¶
This section is empty.
Functions ¶
func CatchFatalError ¶
func CatchFatalError(logger logrus.FieldLogger) func()
CatchFatalError returns a function that can be used to report panics. Once the error is reported, the process exits abnormally.
Usage:
```
defer commoncrawl.CatchFatalError(logger)()
```
func CreateFileIfAbsent ¶
CreateFileIfAbsent creates the specified file if it does not already exist.
* filename - Name of the file that should be created
func LoadFileLines ¶
LoadFileLines reads a file into a slice of strings. Each line has whitespace trimmed.
* filename - Name of the file that should be loaded
func SegmentOutputItems ¶
func SegmentOutputItems(items []OutputItem, maxSegmentSize int, callback func([]OutputItem))
SegmentOutputItems will break up a slice of output items into multiple segments, invoking the callback for each segment. All segments will be at most maxSegmentSize.
* items - Input list of items to break into segments
* maxSegmentSize - No segment should be larger than this
* callback - Invoked for each segment
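The segmenting behaviour described above can be sketched as follows. This is a minimal illustration, not the package's implementation: `segmentItems` is a hypothetical stand-in that works on plain strings rather than `OutputItem` values, and skipping the callback entirely for a non-positive `maxSegmentSize` is an assumption.

```go
package main

import "fmt"

// segmentItems is a hypothetical stand-in for SegmentOutputItems. It walks the
// slice in steps of maxSegmentSize and hands each sub-slice to the callback.
func segmentItems(items []string, maxSegmentSize int, callback func([]string)) {
	if maxSegmentSize <= 0 {
		return // assumption: a non-positive size produces no segments
	}
	for start := 0; start < len(items); start += maxSegmentSize {
		end := start + maxSegmentSize
		if end > len(items) {
			end = len(items) // the final segment may be shorter
		}
		callback(items[start:end])
	}
}

func main() {
	segmentItems([]string{"a", "b", "c", "d", "e"}, 2, func(segment []string) {
		fmt.Println(segment)
	})
}
```

Each segment is a view into the original slice, so a callback that keeps a segment beyond the call would need to copy it.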
func SerializeOutputItemsInParallel ¶
func SerializeOutputItemsInParallel(items []OutputItem, logger logrus.FieldLogger) [][]byte
SerializeOutputItemsInParallel converts output items into JSON byte slices using every CPU available.
* items - The items that need to be serialized into JSON
* logger - Log events
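One way to serialize with every available CPU is a small worker pool, sketched below. The function name `serializeInParallel`, the use of `interface{}` values in place of `OutputItem`, and the choice to leave a `nil` entry for items that fail to marshal are all assumptions made for illustration; the package may handle errors differently (for example by logging them).

```go
package main

import (
	"encoding/json"
	"fmt"
	"runtime"
	"sync"
)

// serializeInParallel is a hypothetical sketch. It starts one worker per CPU;
// each worker marshals the items whose indexes it receives.
func serializeInParallel(items []interface{}) [][]byte {
	results := make([][]byte, len(items))
	indexes := make(chan int)
	var wg sync.WaitGroup
	for w := 0; w < runtime.NumCPU(); w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := range indexes {
				data, err := json.Marshal(items[i])
				if err != nil {
					continue // assumption: failed items are left as nil
				}
				results[i] = data // each index is written by exactly one worker
			}
		}()
	}
	for i := range items {
		indexes <- i
	}
	close(indexes)
	wg.Wait()
	return results
}

func main() {
	for _, data := range serializeInParallel([]interface{}{map[string]int{"a": 1}, "x"}) {
		fmt.Println(string(data))
	}
}
```

Because results are stored by index, the output order matches the input order regardless of which worker finishes first.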
func StringSetToSlice ¶
StringSetToSlice converts a set of strings into a slice of strings
func StringSliceToSet ¶
StringSliceToSet converts a slice of strings into a set of strings
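The two conversions can be sketched as below, using the `map[string]bool` set representation shown in the signatures. The helper names are hypothetical, and sorting the resulting slice is an assumption added here only to make the output deterministic; the package's `StringSetToSlice` may return elements in map iteration order.

```go
package main

import (
	"fmt"
	"sort"
)

// sliceToSet is a hypothetical equivalent of StringSliceToSet.
// Duplicate entries collapse into a single key.
func sliceToSet(values []string) map[string]bool {
	set := make(map[string]bool, len(values))
	for _, value := range values {
		set[value] = true
	}
	return set
}

// setToSlice is a hypothetical equivalent of StringSetToSlice.
// The result is sorted (an assumption) so that it is reproducible.
func setToSlice(set map[string]bool) []string {
	values := make([]string, 0, len(set))
	for value := range set {
		values = append(values, value)
	}
	sort.Strings(values)
	return values
}

func main() {
	fmt.Println(setToSlice(sliceToSet([]string{"b", "a", "b"})))
}
```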
func WithRetries ¶
WithRetries will continue to attempt an operation up to the specified number of retries with an exponential backoff. If the operation is still unsuccessful after all retries are exhausted (operation returns an error), that error is returned.
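A retry loop with exponential backoff can be sketched as follows. This is not the package's code: the real function takes a `clockwork.Clock`, whereas this hypothetical `withRetries` takes a plain sleep function so the example needs only the standard library. The 100 ms starting delay, the doubling factor, and the reading of `retries` as "one initial attempt plus that many retries" are assumptions.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTransient is a hypothetical error used by the demo.
var errTransient = errors.New("transient failure")

// noSleep is a helper that skips waiting, useful when exercising the loop.
func noSleep(time.Duration) {}

// withRetries runs operation once and then up to `retries` more times,
// doubling the delay between attempts. The last error is returned.
func withRetries(sleep func(time.Duration), retries int, operation func() error) error {
	delay := 100 * time.Millisecond // assumed initial delay
	var err error
	for attempt := 0; attempt <= retries; attempt++ {
		if err = operation(); err == nil {
			return nil
		}
		if attempt < retries {
			sleep(delay)
			delay *= 2
		}
	}
	return err
}

func main() {
	attempts := 0
	err := withRetries(func(d time.Duration) { fmt.Println("backing off for", d) }, 3, func() error {
		attempts++
		if attempts < 3 {
			return errTransient
		}
		return nil
	})
	fmt.Println("attempts:", attempts, "error:", err)
}
```

Injecting the sleep function plays the same role as the clock parameter in the documented signature: tests can run without real delays.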
func WriteFileLines ¶
WriteFileLines writes a slice of strings to a file. Each string is written to the file, followed by a newline character.
* filename - Name of the destination file
* lines - Slice of strings that should be written
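The write and load helpers are natural inverses, which the sketch below illustrates with standard-library calls only. `writeLines`, `loadLines`, and `roundTrip` are hypothetical names, and the exact trimming and buffering behaviour of the package's functions is assumed from their descriptions.

```go
package main

import (
	"bufio"
	"fmt"
	"os"
	"path/filepath"
	"strings"
)

// writeLines writes each string followed by a newline (hypothetical sketch).
func writeLines(filename string, lines []string) error {
	file, err := os.Create(filename)
	if err != nil {
		return err
	}
	defer file.Close()
	writer := bufio.NewWriter(file)
	for _, line := range lines {
		if _, err := writer.WriteString(line + "\n"); err != nil {
			return err
		}
	}
	return writer.Flush()
}

// loadLines reads the file line by line, trimming surrounding whitespace.
func loadLines(filename string) ([]string, error) {
	file, err := os.Open(filename)
	if err != nil {
		return nil, err
	}
	defer file.Close()
	var lines []string
	scanner := bufio.NewScanner(file)
	for scanner.Scan() {
		lines = append(lines, strings.TrimSpace(scanner.Text()))
	}
	return lines, scanner.Err()
}

// roundTrip writes lines to a temporary file and reads them back.
func roundTrip(lines []string) ([]string, error) {
	dir, err := os.MkdirTemp("", "lines-example")
	if err != nil {
		return nil, err
	}
	defer os.RemoveAll(dir)
	filename := filepath.Join(dir, "lines.txt")
	if err := writeLines(filename, lines); err != nil {
		return nil, err
	}
	return loadLines(filename)
}

func main() {
	lines, err := roundTrip([]string{" first ", "second"})
	fmt.Println(lines, err)
}
```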
Types ¶
type BlobReader ¶
type BlobReader struct {
// contains filtered or unexported fields
}
BlobReader implements the Reader interface. It will read an open file and output a stream of byte slices. It expects input files to be in gzip format as newline delimited text. Newline characters are not included in the output.
func NewBlobReader ¶
func NewBlobReader(logger logrus.FieldLogger) *BlobReader
NewBlobReader returns a new instance of BlobReader
* logger - Log events
func (*BlobReader) Read ¶
func (reader *BlobReader) Read(file io.ReadCloser, filename string, callback func(interface{}))
Reads a file and sends a stream of record objects to callback
* file - The open file that should be read
* callback - Should be called for each record that is read
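Reading gzip-compressed, newline-delimited input into byte slices can be sketched with `compress/gzip` and `bufio.Scanner`, as below. `readBlobs` and `gzipLines` are hypothetical helpers, not part of the package, and the callback here receives `[]byte` directly rather than `interface{}`. Note that `bufio.Scanner` rejects lines longer than its buffer (64 KiB by default), so a real reader handling large records would need a larger buffer or a different approach.

```go
package main

import (
	"bufio"
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

// readBlobs decompresses the file and passes each line, without its trailing
// newline, to the callback. It closes the file when done.
func readBlobs(file io.ReadCloser, callback func([]byte)) error {
	defer file.Close()
	gz, err := gzip.NewReader(file)
	if err != nil {
		return err
	}
	defer gz.Close()
	scanner := bufio.NewScanner(gz)
	for scanner.Scan() {
		// Copy the bytes: the scanner reuses its buffer between calls.
		line := append([]byte(nil), scanner.Bytes()...)
		callback(line)
	}
	return scanner.Err()
}

// gzipLines builds an in-memory gzip stream for the demo. Write errors are
// ignored because the destination is a bytes.Buffer.
func gzipLines(lines ...string) io.ReadCloser {
	var buf bytes.Buffer
	gz := gzip.NewWriter(&buf)
	for _, line := range lines {
		gz.Write([]byte(line + "\n"))
	}
	gz.Close()
	return io.NopCloser(&buf)
}

func main() {
	err := readBlobs(gzipLines("one", "two"), func(blob []byte) {
		fmt.Printf("%q\n", blob)
	})
	fmt.Println("error:", err)
}
```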
type Controller ¶
type Controller struct {
// contains filtered or unexported fields
}
The Controller bootstraps the common crawl process
func NewController ¶
func NewController(conf *ControllerConfiguration) *Controller
NewController returns a new instance of `Controller`
* conf - Contains all necessary dependencies for the process to function.
func (*Controller) Run ¶
func (controller *Controller) Run()
Run launches its own goroutine. It will continue running until an endOfInput signal is received, after which it will emit an endOfProcess signal and terminate.
type ControllerConfiguration ¶
type ControllerConfiguration struct {
	AWSRegion string // Required - AWS region to use for AWS services
	TaskConfiguration *TaskConfiguration // Required - Input channel of filenames to process
	InputSource InputSource // Required - Produces work items to be processed
	EndOfProcess chan<- bool // Required - Output channel for end of process signal
	RecordReader RecordReader // Required - Reads an open file into a stream of records
	Plugins map[string]Plugin // Plugins that will provide specific mappers and reducers for processing
	// TODO: configure output writer from plugin
	OutputFileWriter OutputFileWriter // Optional - Writer to save data to output files
	StatusWriter StatusWriter // Optional - Sends status notifications.
	HTTPClient HTTPClient // Optional - Client to make http calls to S3. Defaults to http.DefaultClient
	Clock clockwork.Clock // Optional - Pauses and checks the time. Defaults to an instance of RealClock
	Logger logrus.FieldLogger // Optional - passes in a logger that will be used by all components
}
A ControllerConfiguration contains the necessary dependencies that a Controller will need to function. Some dependencies are optional and have default values.
type DefaultWorkItemReader ¶
type DefaultWorkItemReader struct{}
DefaultWorkItemReader implements the WorkItemReader interface. WorkItems are produced with a WorkItemId and a single source file that are the same as the input message.
func NewDefaultWorkItemReader ¶
func NewDefaultWorkItemReader() *DefaultWorkItemReader
NewDefaultWorkItemReader returns a new instance of DefaultWorkItemReader
func (*DefaultWorkItemReader) ReadWorkItem ¶
func (workItemReader *DefaultWorkItemReader) ReadWorkItem(message string) (*WorkItem, error)
ReadWorkItem parses the message into a `WorkItem` instance.
type Downloader ¶
type Downloader struct {
// contains filtered or unexported fields
}
Downloader coordinates WARC file downloads
func NewDownloader ¶
func NewDownloader(
	fileDownloadFactory FileDownloadFactory,
	clock clockwork.Clock,
	workItemsInput <-chan *WorkItem,
	workItemReceivedListener chan<- *WorkItem,
	downloadOutput chan<- FileDownload,
	logger logrus.FieldLogger,
) *Downloader
NewDownloader returns a new instance of Downloader
* fileDownloadFactory - Creates FileDownload instances
* clock - Used for exponential backoff
* checkpointSize - The number of files to emit before emitting a checkpoint signal
* filenamesInput - Input channel of file paths to download
* downloadOutput - Output channel of `FileDownload` instances
* logger - Log events
func (*Downloader) Run ¶
func (downloader *Downloader) Run()
Run launches its own goroutine. It will begin pulling filenames from filenamesInput and emitting FileDownload objects from downloadOutput. Once a checkpoint is reached, a checkpoint signal will be emitted from checkpointsOutput. The value of the checkpoint signal will be the number of files processed since the last checkpoint. This may be less than the checkpoint size if the downloader has stopped receiving file names from filenamesInput.
type FileDownload ¶
type FileDownload interface {
	// Open gets a handle on an open data file
	Open() (io.ReadCloser, error)
	// Filename returns the name of the data file
	Filename() string
	// WorkItemID returns the work item associated with the file
	WorkItemID() string
}
FileDownload represents a data file that can be downloaded
type FileDownloadFactory ¶
type FileDownloadFactory interface {
	// NewFileDownload returns a new FileDownload object
	//
	// * workItemID - Work item associated with the file
	// * filename - Full path to the file to download
	NewFileDownload(workItemID string, filename string) FileDownload
}
A FileDownloadFactory creates FileDownload instances
func NewFileDownloadFactory ¶
func NewFileDownloadFactory(region string, conf *TaskConfiguration, clock clockwork.Clock, httpClient HTTPClient, logger logrus.FieldLogger) FileDownloadFactory
NewFileDownloadFactory returns a new FileDownloadFactory based on configuration
type FileMockHTTPClient ¶
FileMockHTTPClient emulates an http client by reading files out of testdata/
func (*FileMockHTTPClient) Get ¶
func (client *FileMockHTTPClient) Get(url string) (*http.Response, error)
Get gets the file name from the specified URL and opens it as a file from the testdata directory. The only populated field of the returned Response is the Body, which is a handle on the open file.
type FileStatus ¶
type FileStatus struct {
	LastUpdate time.Time
	RecordsParsed int
	RecordsProcessed int
	OutputsMapped int
	OutputsReduced int
	EndOfFile bool
}
FileStatus represents the current state of an individual file that is being processed.
type FilesystemInput ¶
type FilesystemInput struct {
// contains filtered or unexported fields
}
FilesystemInput reads in a list of WARC file paths from a local file and pushes each file path to an output channel for downstream processing. Files that have already been marked as completed from previous checkpoints will not be sent.
func NewFilesystemInput ¶
func NewFilesystemInput(indexFile string, workItemReader WorkItemReader, logger logrus.FieldLogger) *FilesystemInput
NewFilesystemInput returns a new instance of FilesystemInput
* filenamesOutput - Output channel of WARC file paths
func (*FilesystemInput) AbortedWorkItemsInput ¶
func (input *FilesystemInput) AbortedWorkItemsInput() chan<- string
AbortedWorkItemsInput returns an input channel of aborted work item ids
func (*FilesystemInput) CompletedWorkItemsInput ¶
func (input *FilesystemInput) CompletedWorkItemsInput() chan<- string
CompletedWorkItemsInput returns an input channel of completed work item ids
func (*FilesystemInput) Run ¶
func (input *FilesystemInput) Run()
Run launches its own goroutine. It will read the list of file paths found in `indexFile` and emit each path to `filenamesOutput`.
func (*FilesystemInput) WorkItemsOutput ¶
func (input *FilesystemInput) WorkItemsOutput() <-chan *WorkItem
WorkItemsOutput returns an output channel of WorkItem
type FlushRequest ¶
type FlushRequest struct {
	WorkItemID string // ID of work item data to flush
	Callback chan<- map[string]map[string]OutputItem // Callback for flushed data
}
FlushRequest represents a request to flush the OutputStore
type HTTPClient ¶
type HTTPClient interface {
	// Get issues a GET to the specified URL
	Get(url string) (*http.Response, error)
}
HTTPClient is a stripped-down interface for fetching files over http. `http.DefaultClient` satisfies this interface.
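The claim that `http.DefaultClient` satisfies this interface can be checked at compile time, and the same interface makes it easy to substitute a test double. In the sketch below the interface is copied from the documentation above, while `stubClient`, `statusOf`, and the example URL are hypothetical and exist only for illustration.

```go
package main

import (
	"fmt"
	"net/http"
)

// HTTPClient mirrors the interface documented above.
type HTTPClient interface {
	Get(url string) (*http.Response, error)
}

// Compile-time check: *http.Client has a matching Get method.
var _ HTTPClient = http.DefaultClient

// stubClient is a hypothetical test double that returns a fixed status
// without touching the network.
type stubClient struct{ status int }

func (c stubClient) Get(url string) (*http.Response, error) {
	return &http.Response{StatusCode: c.status, Body: http.NoBody}, nil
}

// statusOf works with any HTTPClient, real or stubbed.
func statusOf(client HTTPClient, url string) (int, error) {
	resp, err := client.Get(url)
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()
	return resp.StatusCode, nil
}

func main() {
	code, err := statusOf(stubClient{status: 200}, "https://example.invalid/file.warc.gz")
	fmt.Println(code, err)
}
```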
type HTTPFileDownload ¶
type HTTPFileDownload struct {
// contains filtered or unexported fields
}
HTTPFileDownload represents a connection to a WARC file in the common crawl public dataset
func (*HTTPFileDownload) Filename ¶
func (fileDownload *HTTPFileDownload) Filename() string
Filename returns the name of the WARC file
func (*HTTPFileDownload) Open ¶
func (fileDownload *HTTPFileDownload) Open() (io.ReadCloser, error)
Open gets a handle on an open WARC file from S3
func (*HTTPFileDownload) WorkItemID ¶
func (fileDownload *HTTPFileDownload) WorkItemID() string
WorkItemID returns the work item associated with the file
type HTTPFileDownloadFactory ¶
type HTTPFileDownloadFactory struct {
// contains filtered or unexported fields
}
HTTPFileDownloadFactory creates HTTPFileDownload instances
func NewHTTPFileDownloadFactory ¶
func NewHTTPFileDownloadFactory(datasetHost string, httpClient HTTPClient, clock clockwork.Clock) *HTTPFileDownloadFactory
NewHTTPFileDownloadFactory returns a new instance of HTTPFileDownloadFactory
* httpClient - HTTP client that will be used to download
* clock - Used for exponential backoff
func (*HTTPFileDownloadFactory) NewFileDownload ¶
func (factory *HTTPFileDownloadFactory) NewFileDownload(workItemID string, filename string) FileDownload
NewFileDownload returns a new HTTPFileDownload object
* filename - Full path to the file to download
type InputSource ¶
type InputSource interface {
	// Run launches its own goroutine.
	// Begins reading input and producing WorkItem instances for processing.
	Run()
	// WorkItemsOutput returns an output channel of WorkItem
	WorkItemsOutput() <-chan *WorkItem
	// CompletedWorkItemsInput returns an input channel of completed work item ids
	CompletedWorkItemsInput() chan<- string
}
An InputSource represents the source of work items to be processed
type JSONReader ¶
type JSONReader struct {
	*BlobReader
	// contains filtered or unexported fields
}
JSONReader implements the Reader interface. It will read an open file and output a stream of records. It is designed to capture the output of the default OutputWriter implementations that are provided for writing data extracted from the common crawl. This allows for additional processing to be done on extracted data.
func NewJSONReader ¶
func NewJSONReader(recordFactory func() interface{}, logger logrus.FieldLogger) *JSONReader
NewJSONReader returns a new instance of JSONReader
* logger - Log events
func (*JSONReader) Read ¶
func (reader *JSONReader) Read(file io.ReadCloser, filename string, callback func(interface{}))
Reads a file and sends a stream of record objects to callback
* file - The open file that should be read
* callback - Should be called for each record that is read
type LambdaHandler ¶
type LambdaHandler struct {
// contains filtered or unexported fields
}
LambdaHandler provides functionality to pull tasks off of an SQS queue that will configure a common crawl lambda task
func NewLambdaHandler ¶
func NewLambdaHandler(task func(*TaskConfiguration), logger logrus.FieldLogger) *LambdaHandler
NewLambdaHandler returns a new instance of `LambdaHandler`
func (*LambdaHandler) Handle ¶
func (handler *LambdaHandler) Handle() error
Handle pulls a message off of SQS and parses it into a `TaskConfiguration` struct
type LocalFileDownload ¶
type LocalFileDownload struct {
// contains filtered or unexported fields
}
LocalFileDownload points to a source file in the local file system
func (*LocalFileDownload) Filename ¶
func (fileDownload *LocalFileDownload) Filename() string
Filename returns the name of the file
func (*LocalFileDownload) Open ¶
func (fileDownload *LocalFileDownload) Open() (io.ReadCloser, error)
Open gets a handle on an open data file
func (*LocalFileDownload) WorkItemID ¶
func (fileDownload *LocalFileDownload) WorkItemID() string
WorkItemID returns the work item associated with the file
type LocalFileDownloadFactory ¶
type LocalFileDownloadFactory struct { }
LocalFileDownloadFactory creates LocalFileDownload instances
func NewLocalFileDownloadFactory ¶
func NewLocalFileDownloadFactory() *LocalFileDownloadFactory
NewLocalFileDownloadFactory returns a new instance of LocalFileDownloadFactory
func (*LocalFileDownloadFactory) NewFileDownload ¶
func (factory *LocalFileDownloadFactory) NewFileDownload(workItemID string, filename string) FileDownload
NewFileDownload returns a new LocalFileDownload object
* workItemID - The associated work item
* filename - Full path to the file to download
type LocalOutputFileWriter ¶
type LocalOutputFileWriter struct{}
LocalOutputFileWriter implements the OutputFileWriter interface. It writes files to the local file system.
func (*LocalOutputFileWriter) WriteOutputFile ¶
func (fileWriter *LocalOutputFileWriter) WriteOutputFile(filepath string, data []byte) error
WriteOutputFile writes out the specified file to the local filesystem
type LogStatusWriter ¶
type LogStatusWriter struct {
// contains filtered or unexported fields
}
LogStatusWriter sends status notifications to the console
func NewLogStatusWriter ¶
func NewLogStatusWriter(logger logrus.FieldLogger) *LogStatusWriter
NewLogStatusWriter returns a new instance of LogStatusWriter
func (*LogStatusWriter) WriteStatus ¶
func (writer *LogStatusWriter) WriteStatus(status ProcessStatus)
WriteStatus sends a notification about the current state of the system to the console.
type MockHTTPClient ¶
MockHTTPClient is an autogenerated mock type for the HTTPClient type
type MockInputSource ¶
MockInputSource is an autogenerated mock type for the InputSource type
func (*MockInputSource) AbortedWorkItemsInput ¶
func (_m *MockInputSource) AbortedWorkItemsInput() chan<- string
AbortedWorkItemsInput provides a mock function with given fields:
func (*MockInputSource) CompletedWorkItemsInput ¶
func (_m *MockInputSource) CompletedWorkItemsInput() chan<- string
CompletedWorkItemsInput provides a mock function with given fields:
func (*MockInputSource) Run ¶
func (_m *MockInputSource) Run()
Run provides a mock function with given fields:
func (*MockInputSource) WorkItemsOutput ¶
func (_m *MockInputSource) WorkItemsOutput() <-chan *WorkItem
WorkItemsOutput provides a mock function with given fields:
type MockOutputFileWriter ¶
MockOutputFileWriter is an autogenerated mock type for the OutputFileWriter type
func (*MockOutputFileWriter) WriteOutputFile ¶
func (_m *MockOutputFileWriter) WriteOutputFile(filepath string, data []byte) error
WriteOutputFile provides a mock function with given fields: filepath, data
type MockStatusWriter ¶
MockStatusWriter is an autogenerated mock type for the StatusWriter type
func (*MockStatusWriter) GetStatusCalls ¶
func (_m *MockStatusWriter) GetStatusCalls() []ProcessStatus
GetStatusCalls returns a list of all ProcessStatus objects passed into WriteStatus
func (*MockStatusWriter) WriteStatus ¶
func (_m *MockStatusWriter) WriteStatus(status ProcessStatus)
WriteStatus provides a mock function with given fields: status
type Monitor ¶
type Monitor struct {
// contains filtered or unexported fields
}
The Monitor receives signals from other parts in the system, prints out aggregated metrics at a regular interval, and emits a signal when the system is ready to perform a save of collected data.
func NewMonitor ¶
func NewMonitor(
	statusWriter StatusWriter,
	clock clockwork.Clock,
	recordsParsedInput <-chan string,
	recordsProcessedInput <-chan string,
	outputsMappedInput <-chan string,
	outputsReducedInput <-chan string,
	workItemReceivedInput <-chan *WorkItem,
	filesParsingInput <-chan string,
	filesParsedInput <-chan string,
	completedWorkItemsOutput chan<- string,
	endOfInput chan<- bool,
	logger logrus.FieldLogger,
) *Monitor
NewMonitor returns a new instance of Monitor
* statusWriter - Sends status notifications
* clock - Emits events at an interval
* checkpointSize - Maximum number of files to complete before saving
* recordsParsedInput - Input channel of parsed records count
* recordsProcessedInput - Input channel of processed records count
* outputsMappedInput - Input channel of mapped outputs
* outputsReducedInput - Input channel of reduced outputs
* workItemReceivedInput - Input channel of work items
* filesParsingInput - Input channel of files being parsed
* filesParsedInput - Input channel of files fully parsed
* checkpointFilesOutput - Output channel of files completed since last checkpoint
* endOfInput - Output channel indicating end of input (process complete)
* logger - Log events
type OutputFileWriter ¶
type OutputFileWriter interface {
	// Write out the specified file
	WriteOutputFile(filepath string, data []byte) error
}
OutputFileWriter writes specific output files.
func NewOutputFileWriter ¶
func NewOutputFileWriter(region string, conf *TaskConfiguration) OutputFileWriter
NewOutputFileWriter returns a new instance of OutputFileWriter based on configuration
type OutputFilter ¶
type OutputFilter interface {
	// TestOutputItem returns true if the output item should be saved
	TestOutputItem(item OutputItem) bool
}
An OutputFilter can modify the stream of output items before they are processed by the OutputWriter
type OutputItem ¶
type OutputItem interface {
	// OutputSet indicates the output folder for the data
	OutputSet() string
	// Key is the unique identifier for this output.
	Key() string
	// Value is the data contained by the output. This can be the same object as the OutputItem.
	Value() interface{}
}
OutputItem represents output data produced when processing a `WARCRecord`.
type OutputItems ¶
type OutputItems []OutputItem
OutputItems implements sort.Interface for []OutputItem based on the Key() return value.
func (OutputItems) Len ¶
func (a OutputItems) Len() int
func (OutputItems) Less ¶
func (a OutputItems) Less(i, j int) bool
func (OutputItems) Swap ¶
func (a OutputItems) Swap(i, j int)
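Sorting by key through `sort.Interface` can be sketched as below. The `OutputItem` interface is copied from the documentation above; the method bodies on `OutputItems` are an assumption (ascending lexical order of `Key()`), and `hostCount` and `sortedKeys` are hypothetical names introduced for the example.

```go
package main

import (
	"fmt"
	"sort"
)

// OutputItem mirrors the interface documented in this package.
type OutputItem interface {
	OutputSet() string
	Key() string
	Value() interface{}
}

// OutputItems orders items by Key(); the method bodies are assumed.
type OutputItems []OutputItem

func (a OutputItems) Len() int           { return len(a) }
func (a OutputItems) Less(i, j int) bool { return a[i].Key() < a[j].Key() }
func (a OutputItems) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }

// hostCount is a hypothetical OutputItem used only for this example.
type hostCount struct {
	host  string
	count int
}

func (h hostCount) OutputSet() string  { return "hosts" }
func (h hostCount) Key() string        { return h.host }
func (h hostCount) Value() interface{} { return h.count }

// sortedKeys sorts the items in place and returns their keys in order.
func sortedKeys(items OutputItems) []string {
	sort.Sort(items)
	keys := make([]string, 0, len(items))
	for _, item := range items {
		keys = append(keys, item.Key())
	}
	return keys
}

func main() {
	items := OutputItems{hostCount{"b.example", 2}, hostCount{"a.example", 7}}
	fmt.Println(sortedKeys(items))
}
```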
type OutputReducer ¶
type OutputReducer interface {
	// Reduce merges two OutputItems with the same key into a single item.
	// The details of how the merge is performed depend on the implementation.
	Reduce(existingItem OutputItem, newItem OutputItem) OutputItem
}
An OutputReducer is responsible for reducing the values of OutputItems with the same key into a single OutputItem.
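As an illustration of what an implementation might look like, the sketch below merges two items with the same key by summing their counts. The interfaces are copied from the documentation; `wordCount` and `sumReducer` are hypothetical, and the reducer assumes both items are `wordCount` values (the type assertions would panic otherwise).

```go
package main

import "fmt"

// OutputItem and OutputReducer mirror the interfaces documented above.
type OutputItem interface {
	OutputSet() string
	Key() string
	Value() interface{}
}

type OutputReducer interface {
	Reduce(existingItem OutputItem, newItem OutputItem) OutputItem
}

// wordCount is a hypothetical OutputItem holding a running total.
type wordCount struct {
	word  string
	count int
}

func (w wordCount) OutputSet() string  { return "words" }
func (w wordCount) Key() string        { return w.word }
func (w wordCount) Value() interface{} { return w.count }

// sumReducer is a hypothetical reducer that adds the two counts together.
type sumReducer struct{}

func (sumReducer) Reduce(existingItem OutputItem, newItem OutputItem) OutputItem {
	existing := existingItem.(wordCount) // assumes both items are wordCount
	incoming := newItem.(wordCount)
	return wordCount{word: existing.word, count: existing.count + incoming.count}
}

func main() {
	var reducer OutputReducer = sumReducer{}
	merged := reducer.Reduce(wordCount{"go", 2}, wordCount{"go", 3})
	fmt.Println(merged.Key(), merged.Value())
}
```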
type OutputStore ¶
type OutputStore struct {
// contains filtered or unexported fields
}
The OutputStore accumulates data that is output by the processor pool
Each OutputItem that is sent to the OutputStore is accumulated by unique key returned by the Key() method. Upon receiving a flush signal, the OutputStore will send the current data set and clear it, resetting the OutputStore to its initial state.
func NewOutputStore ¶
func NewOutputStore(
	reducers map[string]OutputReducer,
	dataInput <-chan RecordOutput,
	flushSignal <-chan FlushRequest,
	outputReducedOutput chan<- string,
	stuckFileInput <-chan string,
	logger logrus.FieldLogger,
) *OutputStore
NewOutputStore returns a new instance of OutputStore
* reducers - Used for reducing OutputItems
* dataInput - Input channel of OutputItems
* flushSignal - Input channel of flush signals
* outputReducedOutput - Output channel of reduced outputs
func (*OutputStore) Run ¶
func (store *OutputStore) Run()
Run launches its own goroutine. The store will listen to both input channels. When it receives OutputItems from the dataInput channel, it will use the reducer to combine it with any existing OutputItem and save the result, or simply save the OutputItem if no previous entry exists. Upon receiving a flush signal, the entire data set will be returned on the provided channel, and a new empty dataset will replace it in the store.
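The accumulate-then-flush loop described above can be sketched with a single goroutine selecting over two channels. This is a simplified illustration, not the package's implementation: `keyed`, `flushRequest`, `runStore`, and `collect` are hypothetical names, values are plain integers instead of `OutputItem`s, and the store is keyed by a single string rather than by output set and key.

```go
package main

import "fmt"

// keyed is a hypothetical stand-in for an OutputItem with an integer value.
type keyed struct {
	key   string
	value int
}

// flushRequest carries the channel on which the current data set is returned.
type flushRequest struct {
	callback chan<- map[string]int
}

// runStore launches a goroutine that reduces incoming items by key and hands
// over the whole data set on each flush request. It stops when data is closed.
func runStore(data <-chan keyed, flush <-chan flushRequest, reduce func(existing, incoming int) int) {
	go func() {
		store := make(map[string]int)
		for {
			select {
			case item, ok := <-data:
				if !ok {
					return
				}
				if existing, found := store[item.key]; found {
					store[item.key] = reduce(existing, item.value)
				} else {
					store[item.key] = item.value
				}
			case request := <-flush:
				request.callback <- store
				store = make(map[string]int) // start again from an empty data set
			}
		}
	}()
}

// collect feeds the items through a store and returns the flushed result.
func collect(items []keyed) map[string]int {
	data := make(chan keyed)
	flush := make(chan flushRequest)
	runStore(data, flush, func(existing, incoming int) int { return existing + incoming })
	for _, item := range items {
		data <- item
	}
	callback := make(chan map[string]int)
	flush <- flushRequest{callback: callback}
	snapshot := <-callback
	close(data)
	return snapshot
}

func main() {
	fmt.Println(collect([]keyed{{"a", 1}, {"a", 2}, {"b", 5}}))
}
```

Because only the store goroutine touches the map, no mutex is needed; replacing the map on flush means the receiver owns the returned data set outright.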
type OutputWriter ¶
type OutputWriter struct {
// contains filtered or unexported fields
}
The OutputWriter is responsible for persisting the output of common crawl processing. Specific storage medium, output format, file or record name, is decided by the implementation.
func NewOutputWriter ¶
func NewOutputWriter(
	fileWriter OutputFileWriter,
	outputFolder string,
	logger logrus.FieldLogger,
	shards int,
) *OutputWriter
NewOutputWriter returns a new instance of OutputWriter
* fileWriter - Write output files
* outputFolder - Folder that will contain output files
* logger - Log events
* shards - Number of shards for output
func (*OutputWriter) WriteOutputData ¶
func (writer *OutputWriter) WriteOutputData(outputSet string, data []OutputItem)
WriteOutputData writes a segment of output data to a destination.
* data - Data that should be persisted.
type ParsedRecord ¶
type ParsedRecord struct {
	WorkItemID string // Name of the work item associated with the file
	SourceFile string // Name of the file where the record originated
	Record interface{} // Contents of the record
}
A ParsedRecord is the output of a Parser
type Parser ¶
type Parser struct {
// contains filtered or unexported fields
}
A Parser is responsible for parsing raw file data into record objects.
func NewParser ¶
func NewParser(
	reader RecordReader,
	fileInput <-chan FileDownload,
	recordOutput chan<- ParsedRecord,
	fileParsingListener chan<- string,
	fileParsedListener chan<- string,
	recordReadListener chan<- string,
	logger logrus.FieldLogger,
) *Parser
NewParser returns a new instance of Parser
* reader - Reads files into records
* fileInput - Input channel of FileDownload instances
* recordOutput - Output channel of record instances
* fileParsingListener - Notified when a file begins parsing
* fileParsedListener - Notified when a file is completely parsed
* recordReadListener - Notified for each record that is read from the file
* logger - Log events
type Plugin ¶
type Plugin interface {
	// Creates a new plugin instance with the desired configuration
	NewInstance(config *PluginConfiguration) PluginInstance
}
Plugin provides specific functionality that can utilize the commoncrawl processing framework
type PluginConfiguration ¶
type PluginConfiguration struct {
	PluginName string // Indicate which plugin to use
	ConfigData map[string]string // Custom configuration data for the plugin
	OutputSet string // Name of output data set - subfolder will be named after this
	OutputShards int // Number of output shards for this plugin configuration
}
PluginConfiguration provides a way to customize the behavior of a plugin, output subfolder name, and number of shards.
type PluginInstance ¶
type PluginInstance interface {
	// Mapper that will process map-phase records for this plugin
	Mapper() RecordMapper
	// Mapper that will process reduce-phase records for this plugin.
	// Reduce-phase records are json blobs of the output items produced
	// by the map-phase mapper.
	ReductionMapper() RecordMapper
	// Reducer to use for this plugin
	Reducer() OutputReducer
	// Filter for output before it is saved
	OutputFilter() OutputFilter
}
PluginInstance is a configured instance of a plugin that provides specific functionality that can utilize the commoncrawl processing framework
type ProcessStatus ¶
type ProcessStatus struct {
	LastUpdate time.Time // Last time the process received any kind of update
	RecordsParsed int // Total # of records parsed
	RecordsProcessed int // Total # of records fully processed
	OutputsMapped int // Total # of outputs mapped
	OutputsReduced int // Total # of outputs reduced
	FilesInProcess int // # of files currently being processed
	FilesParsed int // Total # of files parsed
	OutputFilesSaved int // Total # of output files that have been saved
	ProcessComplete bool // True if process is completed
}
ProcessStatus represents the current state of the common crawl process
type RecordMapper ¶
type RecordMapper interface {
	// Maps a record into zero to many OutputItems.
	//
	// * record - A record object containing data about a webpage
	// * output - Each OutputItem that is sent to this callback will be reduced in the output.
	MapRecord(record interface{}, output func(OutputItem))
}
A RecordMapper is responsible for converting a record object into the desired output form.
The RecordMapper should be able to accept input for the configured RecordParser
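A mapper that turns one record into several output items might look like the sketch below. The interfaces are copied from the documentation; `wordItem`, `wordMapper`, and `mappedKeys` are hypothetical, and treating the record as a plain string is an assumption made to keep the example free of any particular record type.

```go
package main

import (
	"fmt"
	"strings"
)

// OutputItem and RecordMapper mirror the interfaces documented above.
type OutputItem interface {
	OutputSet() string
	Key() string
	Value() interface{}
}

type RecordMapper interface {
	MapRecord(record interface{}, output func(OutputItem))
}

// wordItem is a hypothetical OutputItem: one occurrence of a word.
type wordItem string

func (w wordItem) OutputSet() string  { return "words" }
func (w wordItem) Key() string        { return string(w) }
func (w wordItem) Value() interface{} { return 1 }

// wordMapper emits one lower-cased wordItem per whitespace-separated word.
// Records that are not strings are ignored (an assumption).
type wordMapper struct{}

func (wordMapper) MapRecord(record interface{}, output func(OutputItem)) {
	text, ok := record.(string)
	if !ok {
		return
	}
	for _, word := range strings.Fields(text) {
		output(wordItem(strings.ToLower(word)))
	}
}

// mappedKeys collects the keys of every item the mapper emits for a record.
func mappedKeys(mapper RecordMapper, record interface{}) []string {
	var keys []string
	mapper.MapRecord(record, func(item OutputItem) {
		keys = append(keys, item.Key())
	})
	return keys
}

func main() {
	fmt.Println(mappedKeys(wordMapper{}, "Go go Gopher"))
}
```

Emitting duplicate keys is deliberate here: combining items that share a key is the job of an `OutputReducer`, not the mapper.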
type RecordOutput ¶
type RecordOutput struct {
	WorkItemID string // Name of the work item associated with the file
	SourceFile string // Name of the file where the data originated
	OutputItem OutputItem // Output data
}
A RecordOutput wraps an OutputItem and refers to the source file
type RecordProcessor ¶
type RecordProcessor struct {
// contains filtered or unexported fields
}
A RecordProcessor is responsible for converting a stream of WARCRecord objects into a stream of output data.
func NewRecordProcessor ¶
func NewRecordProcessor(
	mappers []RecordMapper,
	recordInput <-chan ParsedRecord,
	dataOutput chan<- RecordOutput,
	recordProcessedListener chan<- string,
	outputMappedListener chan<- string,
	logger logrus.FieldLogger,
) *RecordProcessor
NewRecordProcessor returns a new instance of RecordProcessor
* mappers - Map of data type to mapper. Used to convert each WARCRecord object into `OutputItems`
* recordInput - Input channel of record objects
* dataOutput - Output channel of OutputItem objects
* recordProcessedListener - Listener for completed records
* outputMappedListener - Listener for mapped OutputItems
* logger - Log events
func (*RecordProcessor) Run ¶
func (processor *RecordProcessor) Run()
Run launches its own goroutine. It will begin pulling WARCRecord objects from recordInput, converting them to OutputItem objects, and sending each OutputItem object to dataOutput. Listeners will be notified when each record is processed.
type RecordReader ¶
type RecordReader interface {
	// Reads a file and sends a stream of record objects to callback
	//
	// * file - The open file that should be read
	// * callback - Should be called for each record that is read
	Read(file io.ReadCloser, filename string, callback func(interface{}))
}
A RecordReader is responsible for outputting a stream of records from an open file.
type S3FileDownload ¶
type S3FileDownload struct {
// contains filtered or unexported fields
}
S3FileDownload represents a handle on a file in an S3 bucket
func (*S3FileDownload) Filename ¶
func (fileDownload *S3FileDownload) Filename() string
Filename returns the name of the S3 file
func (*S3FileDownload) Open ¶
func (fileDownload *S3FileDownload) Open() (io.ReadCloser, error)
Open gets a handle on an open data file from S3
func (*S3FileDownload) WorkItemID ¶
func (fileDownload *S3FileDownload) WorkItemID() string
WorkItemID returns the work item associated with the file
type S3FileDownloadFactory ¶
type S3FileDownloadFactory struct {
// contains filtered or unexported fields
}
S3FileDownloadFactory creates S3FileDownload instances
func NewS3FileDownloadFactory ¶
func NewS3FileDownloadFactory(regionName string, s3BucketName string, clock clockwork.Clock) *S3FileDownloadFactory
NewS3FileDownloadFactory returns a new instance of S3FileDownloadFactory
* regionName - AWS Region name of the S3 bucket
* s3BucketName - S3 Input bucket name
* clock - Used for exponential backoff
func (*S3FileDownloadFactory) NewFileDownload ¶
func (factory *S3FileDownloadFactory) NewFileDownload(workItemID string, filename string) FileDownload
NewFileDownload returns a new S3FileDownload object
* filename - Full path to the file to download
type S3OutputFileWriter ¶
type S3OutputFileWriter struct {
// contains filtered or unexported fields
}
S3OutputFileWriter implements the OutputFileWriter interface. Writes files to an S3 bucket
func NewS3OutputFileWriter ¶
func NewS3OutputFileWriter(regionName string, s3BucketName string) *S3OutputFileWriter
NewS3OutputFileWriter returns a new instance of S3OutputFileWriter
* regionName - AWS region
* s3BucketName - S3 output bucket name
func (*S3OutputFileWriter) WriteOutputFile ¶
func (fileWriter *S3OutputFileWriter) WriteOutputFile(filepath string, data []byte) error
WriteOutputFile writes out the specified file to the S3 bucket
type S3WorkItemReader ¶
type S3WorkItemReader struct {
// contains filtered or unexported fields
}
S3WorkItemReader implements the WorkItemReader interface for S3 source files.
func NewS3WorkItemReader ¶
func NewS3WorkItemReader(regionName string, conf *TaskConfiguration) (*S3WorkItemReader, error)
NewS3WorkItemReader returns a new instance of S3WorkItemReader
* conf - AWSRegionName and S3InputBucket are required.
func (*S3WorkItemReader) ReadWorkItem ¶
func (workItemReader *S3WorkItemReader) ReadWorkItem(message string) (*WorkItem, error)
ReadWorkItem parses the message into a `WorkItem` instance. It queries S3 using the prefix in the message and sets the resulting list of keys as the source files of the `WorkItem`.
type SQSInput ¶
type SQSInput struct {
// contains filtered or unexported fields
}
SQSInput polls an SQS queue for work items and pushes each one to an output channel for downstream processing. Work items that have already been marked as completed will not be sent again.
func NewSQSInput ¶
func NewSQSInput(
	regionName string,
	sqsQueueName string,
	logger logrus.FieldLogger,
	workItemReader WorkItemReader,
) *SQSInput
NewSQSInput returns a new instance of SQSInput
* regionName - Name of the AWS region where the queue is located
* sqsQueueName - Name of SQS queue that should be polled for files
* logger - Log events
* workItemReader - Parses each queue message into a `WorkItem`

The work item output channel and the completed and aborted work item input channels are not constructor parameters; they are obtained through WorkItemsOutput, CompletedWorkItemsInput, and AbortedWorkItemsInput. Marking a work item as completed ensures that it is not processed multiple times.
func (*SQSInput) AbortedWorkItemsInput ¶
AbortedWorkItemsInput returns an input channel of aborted work item ids
func (*SQSInput) CompletedWorkItemsInput ¶
CompletedWorkItemsInput returns an input channel of completed work item ids
func (*SQSInput) Run ¶
func (input *SQSInput) Run()
Run launches its own goroutine. It will poll the SQS queue and emit each resulting work item to the channel returned by WorkItemsOutput.
func (*SQSInput) WorkItemsOutput ¶
WorkItemsOutput returns an output channel of WorkItem
type SingleInputSource ¶
type SingleInputSource struct {
// contains filtered or unexported fields
}
SingleInputSource implements the InputSource interface, and is designed to provide a single work item from SQS.
func NewSingleInputSource ¶
func NewSingleInputSource(
	region string,
	conf *TaskConfiguration,
	logger logrus.FieldLogger,
) *SingleInputSource
NewSingleInputSource returns a new instance of `SingleInputSource`
* receiptHandle - Receipt handle from SQS message. This is used as the work item id.
* filenames - List of files to process
func (*SingleInputSource) CompletedWorkItemsInput ¶
func (input *SingleInputSource) CompletedWorkItemsInput() chan<- string
CompletedWorkItemsInput returns an input channel of completed work item ids
func (*SingleInputSource) Run ¶
func (input *SingleInputSource) Run()
Run launches its own goroutine. Begins reading input and producing WorkItem instances for processing.
func (*SingleInputSource) WorkItemsOutput ¶
func (input *SingleInputSource) WorkItemsOutput() <-chan *WorkItem
WorkItemsOutput returns an output channel of WorkItem
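The three methods above suggest a consumer loop like the one sketched here. `staticInput` is a hypothetical in-memory source with the same method set, and `workItem` is a local copy of the documented `WorkItem` fields; neither is part of the package. Closing the output channel once all items are sent is an assumption of this sketch.

```go
package main

import "fmt"

// workItem is a local copy of the documented WorkItem fields.
type workItem struct {
	WorkItemID  string
	SourceFiles []string
}

// staticInput is a hypothetical in-memory source exposing the same
// Run / WorkItemsOutput / CompletedWorkItemsInput method set.
type staticInput struct {
	items     []*workItem
	output    chan *workItem
	completed chan string
}

func newStaticInput(items []*workItem) *staticInput {
	return &staticInput{
		items:  items,
		output: make(chan *workItem),
		// Buffered so that reporting completion never blocks the consumer.
		completed: make(chan string, len(items)),
	}
}

func (s *staticInput) WorkItemsOutput() <-chan *workItem      { return s.output }
func (s *staticInput) CompletedWorkItemsInput() chan<- string { return s.completed }

// Run launches its own goroutine and emits every work item once.
func (s *staticInput) Run() {
	go func() {
		defer close(s.output)
		for _, item := range s.items {
			s.output <- item
		}
	}()
}

// drain consumes every work item and reports each one as completed.
func drain(s *staticInput) []string {
	s.Run()
	var done []string
	for item := range s.WorkItemsOutput() {
		// Real processing of item.SourceFiles would happen here.
		s.CompletedWorkItemsInput() <- item.WorkItemID
		done = append(done, item.WorkItemID)
	}
	return done
}

func main() {
	src := newStaticInput([]*workItem{
		{WorkItemID: "w1", SourceFiles: []string{"a.warc.gz"}},
		{WorkItemID: "w2", SourceFiles: []string{"b.warc.gz"}},
	})
	fmt.Println(drain(src))
}
```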
type StatusWriter ¶
type StatusWriter interface {
	// WriteStatus sends a notification about the current state of the system.
	WriteStatus(status ProcessStatus)
}
A StatusWriter is responsible for conveying the current state of the system. The details of how this is done are left up to the implementation.
type TaskConfiguration ¶
type TaskConfiguration struct {
	// How many threads are used for parsing data files
	DownloadPoolSize int
	// S3 bucket for input files. Necessary for pulling data from private buckets.
	InputBucket          string
	OutputBucket         string
	OutputFolder         string
	Phase                string
	InputFilename        string
	DatasetHost          string // Used to download non-commoncrawl data files through http
	PluginConfigurations []*PluginConfiguration
}
TaskConfiguration is the payload that is expected from SQS that instructs a common crawl lambda task on how to run.
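As a rough sketch of how such a payload might be decoded, the example below assumes the SQS message body is JSON whose keys match the Go field names; neither the wire format nor the key names are confirmed by this documentation. `taskConfiguration` and `parseTask` are hypothetical, `PluginConfigurations` is left out because its fields are not shown here, and the phase check against "map" and "reduce" (the values of `PhaseMap` and `PhaseReduce`) is an added assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// taskConfiguration is a local copy of the scalar TaskConfiguration fields.
type taskConfiguration struct {
	DownloadPoolSize int
	InputBucket      string
	OutputBucket     string
	OutputFolder     string
	Phase            string
	InputFilename    string
	DatasetHost      string
}

// parseTask decodes a JSON payload and rejects phases other than the two
// documented constants. Both the JSON format and the validation are
// assumptions of this sketch.
func parseTask(payload string) (*taskConfiguration, error) {
	var conf taskConfiguration
	if err := json.Unmarshal([]byte(payload), &conf); err != nil {
		return nil, fmt.Errorf("decoding task configuration: %w", err)
	}
	if conf.Phase != "map" && conf.Phase != "reduce" {
		return nil, fmt.Errorf("unknown phase %q", conf.Phase)
	}
	return &conf, nil
}

func main() {
	conf, err := parseTask(`{"DownloadPoolSize": 4, "InputBucket": "example-input", "Phase": "map"}`)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(conf.DownloadPoolSize, conf.InputBucket, conf.Phase)
}
```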
type WARCReader ¶
type WARCReader struct {
// contains filtered or unexported fields
}
WARCReader implements the RecordReader interface. It will read an open file and output a stream of WARCRecord objects.
func NewWARCReader ¶
func NewWARCReader(logger logrus.FieldLogger) *WARCReader
NewWARCReader returns a new instance of WARCReader
* logger - Log events
func (*WARCReader) Read ¶
func (reader *WARCReader) Read(file io.ReadCloser, filename string, callback func(interface{}))
Reads a file and sends a stream of record objects to callback
* file - The open file that should be read
* callback - Should be called for each record that is read
type WorkItem ¶
type WorkItem struct {
	WorkItemID  string   // ID of the work item
	SourceFiles []string // Source files to be processed
}
A WorkItem is a single task for the process, and will have one or more source files that need to be processed.
type WorkItemReader ¶
type WorkItemReader interface {
	// ReadWorkItem parses the message into a `WorkItem` instance
	ReadWorkItem(message string) (*WorkItem, error)
}
WorkItemReader parses raw messages from a queue and outputs `WorkItem` instances.
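A minimal sketch of an implementation of this interface, loosely modelled on the prefix lookup described for S3WorkItemReader but backed by an in-memory key list instead of S3. `prefixReader` is hypothetical, and treating the whole message as both the key prefix and the work item ID is an assumption made only for illustration; the real message format is not documented here.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
	"strings"
)

// workItem is a local copy of the documented WorkItem fields.
type workItem struct {
	WorkItemID  string
	SourceFiles []string
}

// workItemReader mirrors the method set of the WorkItemReader interface.
type workItemReader interface {
	ReadWorkItem(message string) (*workItem, error)
}

// prefixReader is a hypothetical reader over an in-memory key list.
type prefixReader struct {
	keys []string
}

// ReadWorkItem treats the message as a key prefix (an assumption) and
// returns every matching key as a source file, sorted for stable output.
func (r prefixReader) ReadWorkItem(message string) (*workItem, error) {
	prefix := strings.TrimSpace(message)
	if prefix == "" {
		return nil, errors.New("empty message")
	}
	var files []string
	for _, key := range r.keys {
		if strings.HasPrefix(key, prefix) {
			files = append(files, key)
		}
	}
	if len(files) == 0 {
		return nil, fmt.Errorf("no keys match prefix %q", prefix)
	}
	sort.Strings(files)
	return &workItem{WorkItemID: prefix, SourceFiles: files}, nil
}

func main() {
	var reader workItemReader = prefixReader{keys: []string{
		"crawl/2020/b.warc.gz",
		"crawl/2020/a.warc.gz",
		"crawl/2019/z.warc.gz",
	}}
	item, err := reader.ReadWorkItem("crawl/2020/")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(item.WorkItemID, item.SourceFiles)
}
```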
type WorkItemStatus ¶
WorkItemStatus represents the current state of a work item
Source Files ¶
- blob_reader.go
- configuration.go
- controller.go
- download_factory.go
- downloader.go
- filesystem_input.go
- input.go
- json_reader.go
- lambda_handler.go
- local_output_file_writer.go
- log_status_writer.go
- mock_HTTPClient.go
- mock_InputSource.go
- mock_OutputFileWriter.go
- mock_StatusWriter.go
- monitor.go
- output.go
- output_writer.go
- parser.go
- plugin.go
- processor.go
- s3_output_file_writer.go
- singleinput.go
- sqs_input.go
- utils.go
- warc_reader.go
- work_items.go