merging

package
v0.0.0-...-8feb696 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 18, 2019 License: MIT Imports: 33 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func HandleMergeRequest

func HandleMergeRequest(ctx context.Context, event events.SQSEvent) (string, error)

Types

type FileProcessor

type FileProcessor struct {
	// contains filtered or unexported fields
}

FileProcessor is a worker go routine that will process files out of the "filesToProcess" channel.

func MakeFileProcessor

func MakeFileProcessor(region string, filesToProcess <-chan *data.FileToProcessInfo, recordChannel chan<- *data.LogToProcess, returnToSQSChannel chan<- *data.FileToProcessInfo, fileProcessorsWG *sync.WaitGroup, recordTracker *RecordTracker) *FileProcessor

MakeFileProcessor creates a new FileProcessor and returns it.

func (*FileProcessor) Run

func (fp *FileProcessor) Run()

Run starts the FileProcessor loop, reading files off the channel

type RecordMerger

type RecordMerger struct {
	// contains filtered or unexported fields
}

RecordMerger merges as many records as possible into a single file over the period of time that it's processing. Once it has enough data to form a complete file it'll write that file to S3 and begin building a new file.

func MakeRecordMerger

func MakeRecordMerger(forHour int, deadline time.Time, writeToBucket string, writeToPath string, records <-chan *data.LogToProcess, recordTracker *RecordTracker) (*RecordMerger, *sync.WaitGroup)

MakeRecordMerger creates a RecordMerger worker and returns it.

func (*RecordMerger) Run

func (m *RecordMerger) Run()

Run starts the main processing loop

type RecordTracker

type RecordTracker struct {
	// contains filtered or unexported fields
}

func (*RecordTracker) GetRecord

func (t *RecordTracker) GetRecord(file *data.FileToProcessInfo) (*TrackRecord, error)

func (*RecordTracker) MarkFileAsDone

func (t *RecordTracker) MarkFileAsDone(file *data.FileToProcessInfo) (bool, error)

func (*RecordTracker) MarkFileAsFailed

func (t *RecordTracker) MarkFileAsFailed(file *data.FileToProcessInfo) (bool, error)

func (*RecordTracker) MarkFileAsProcessing

func (t *RecordTracker) MarkFileAsProcessing(file *data.FileToProcessInfo) int

MarkFileAsProcessing will create the tracking record (if needed), and mark it as processing. Returns an indicator of the success or failure of doing this.

Return:

-1 => Failure creating/updating the tracking record, this file should be attempted again later
0 => File already processed, this file should be ignored
1 => Record created/updated successfully, it is safe to process this file

type TrackRecord

type TrackRecord struct {
	ID        string    `json:"id"`
	State     int       `json:"state"`
	StartTime time.Time `json:"start"`
	Expiry    time.Time `json:"expiry"`
}

type WriterInfo

type WriterInfo struct {
	RecordMap       map[string]*data.LogToProcess
	Timestamp       time.Time
	NumRecords      int64
	WritableRecords int64
	// contains filtered or unexported fields
}

WriterInfo is used for the writer channel

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL