Documentation ¶
Overview ¶
Package etl provides all major interfaces used across packages.
Index ¶
- Constants
- Variables
- func DirToTablename(dir string) string
- func GetFilename(filename string) (string, error)
- func GetIATACode(rawFilename string) string
- func GetIntFromIPv4(p4 net.IP) uint
- func GetIntFromIPv6Upper(p6 net.IP) uint64
- func IsBatchService() bool
- func NumberBitsDifferent(first string, second string) (int, int)
- type DataPath
- type DataType
- type Inserter
- type InserterParams
- type Parser
- type ProcessingError
- type RowStats
- type TestSource
- type Uploader
Constants ¶
const (
	ANNOTATION      = DataType("annotation")
	NDT             = DataType("ndt")
	NDT5            = DataType("ndt5")
	NDT7            = DataType("ndt7")
	NDT_OMIT_DELTAS = DataType("ndt_nodelta") // to support larger buffer size.
	SS              = DataType("sidestream")
	PT              = DataType("traceroute")
	SW              = DataType("switch")
	TCPINFO         = DataType("tcpinfo")
	INVALID         = DataType("invalid")
)
These constants enumerate the different data types. TODO - use camelcase.
const BucketPattern = `gs://([^/]*)/`
BucketPattern is used to extract gsutil bucket name.
const DatePathPattern = `(\d{4}/[01]\d/[0123]\d)/`
DatePathPattern is used to extract the date directory part of the path, e.g. 2017/01/02
const ExpTypePattern = `(?:([a-z-]+)/)?([a-z0-9-]+)/` // experiment OR experiment/type.
ExpTypePattern is used to extract the experiment or experiment/type part of the path.
const MlabDomain = `measurement-lab.org`
MlabDomain is the DNS domain for all mlab servers.
const YYYYMMDD = `\d{4}[01]\d[0123]\d`
YYYYMMDD is a regexp string for identifying dense dates.
Variables ¶
var (
	// ErrBufferFull is returned when an InsertBuffer is full.
	ErrBufferFull = errors.New("insert buffer is full")

	// ErrBadDataType is returned when a path does not have a valid datatype.
	ErrBadDataType = errors.New("unknown data type")
)
Inserter-related error variables.
var ErrHighInsertionFailureRate = errors.New("too many insertion failures")
ErrHighInsertionFailureRate should be returned by TaskError when there are more than 10% BQ insertion errors.
var IsBatch bool
IsBatch indicates this process is a batch processing service.
var OmitDeltas bool
OmitDeltas indicates we should NOT process all snapshots.
Functions ¶
func DirToTablename ¶
DirToTablename translates a GCS directory name to a BigQuery table name.
func GetFilename ¶
GetFilename converts a request received from the queue into a filename. TODO(dev) Add unit test
func GetIATACode ¶
GetIATACode extracts the IATA code, such as "acc", from a file name such as 20170501T000000Z-mlab1-acc02-paris-traceroute-0000.tgz.
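One way to perform this extraction is to split the file name on hyphens and take the first three characters of the site field. The sketch below assumes the field layout of the example file name; iataCode is a hypothetical stand-in, not the package's implementation, and its empty-string result for malformed names is likewise an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// iataCode (hypothetical stand-in for GetIATACode) returns the first three
// characters of the third hyphen-separated field, e.g. "acc" from "acc02".
// It returns "" when the name does not have the assumed layout.
func iataCode(rawFilename string) string {
	parts := strings.Split(rawFilename, "-")
	if len(parts) < 3 || len(parts[2]) < 3 {
		return ""
	}
	return parts[2][:3]
}

func main() {
	fmt.Println(iataCode("20170501T000000Z-mlab1-acc02-paris-traceroute-0000.tgz")) // prints "acc"
}
```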
func GetIntFromIPv4 ¶
GetIntFromIPv4 converts an IPv4 address to the equivalent 32-bit integer value, returned as a uint.
func GetIntFromIPv6Upper ¶
GetIntFromIPv6Upper converts the upper 64 bits of an IPv6 address into uint64.
func IsBatchService ¶
func IsBatchService() bool
IsBatchService returns true if this is a batch service.
Types ¶
type DataPath ¶
type DataPath struct {
	URI string // The full URI

	// These fields are from the bucket and path
	Bucket   string // the GCS bucket name.
	ExpDir   string // the experiment directory.
	DataType string //
	DatePath string // the YYYY/MM/DD date path.

	// The rest are from the filename
	PackedDate string // the YYYYMMDD date.
	PackedTime string // the HHMMSS time.
	DataType2  string // new platform also embeds the data type in the filename
	Host       string // the short server name, e.g. mlab1.
	Site       string // the pod/site name, e.g. ams02.
	Experiment string // the experiment name, e.g. ndt, typically identical to ExpDir
	FileNumber string // the file number, e.g. 0001
	Embargo    string // optional
	Suffix     string // the archive suffix, e.g. .tgz
}
DataPath breaks out the components of a task filename.
func ValidateTestPath ¶
ValidateTestPath validates a task filename.
func (DataPath) GetDataType ¶
GetDataType determines the type of data stored in a file from its complete filename.
type DataType ¶
type DataType string
DataType identifies the type of data handled by a parser.
func (DataType) BQBufferSize ¶
BQBufferSize returns the appropriate BQ insert buffer size.
func (DataType) BigqueryProject ¶
BigqueryProject returns the appropriate project.
type Inserter ¶
type Inserter interface {
	// Put synchronously sends a slice of rows to BigQuery
	// This is THREADSAFE
	Put(rows []interface{}) error

	// PutAsync asynchronously sends a slice of rows to BigQuery.
	// It is THREADSAFE.
	// It may block if there is already a Put (or Flush) in progress.
	// To synchronize following PutAsync, call one of the stats functions,
	// e.g. Committed() or Failed()
	PutAsync(rows []interface{})

	// InsertRow inserts one row into the insert buffer.
	// Deprecated: Please use AddRow and FlushAsync instead.
	InsertRow(data interface{}) error

	// InsertRows inserts multiple rows into the insert buffer.
	// Deprecated: Please use AddRow and FlushAsync instead.
	InsertRows(data []interface{}) error

	// Flush flushes any rows in the buffer out to bigquery.
	// This is synchronous - on return, rows should be committed.
	// Deprecated: Please use external buffer, Put, and PutAsync instead.
	Flush() error

	// Base Table name of the BQ table that the uploader pushes to.
	TableBase() string

	// Table name suffix of the BQ table that the uploader pushes to.
	TableSuffix() string

	// Full table name of the BQ table that the uploader pushes to,
	// including $YYYYMMNN, or _YYYYMMNN
	FullTableName() string

	// Dataset name of the BQ dataset containing the table.
	Dataset() string

	// Project name
	Project() string

	RowStats // Inserter must implement RowStats
}
Inserter is a data sink that writes to BigQuery tables. Inserters should provide the invariants:
After Flush() returns, RowsInBuffer == 0
type InserterParams ¶
type InserterParams struct {
	// These specify the google cloud project:dataset.table to write to.
	Project string
	Dataset string
	Table   string

	// Suffix may be table suffix _YYYYMMDD or partition $YYYYMMDD
	Suffix string // Table name suffix for templated tables or partitions.

	BufferSize    int           // Number of rows to buffer before writing to backend.
	PutTimeout    time.Duration // max duration of bigquery Put ops. (for context)
	MaxRetryDelay time.Duration // Maximum backoff time for Put retries.
}
InserterParams holds the parameters for NewInserter.
type Parser ¶
type Parser interface {
	// IsParsable reports a canonical file "kind" and whether the file appears to
	// be parsable based on the name and content size. A true result does not
	// guarantee that ParseAndInsert will succeed, but a false result means that
	// ParseAndInsert can be skipped.
	IsParsable(testName string, test []byte) (string, bool)

	// meta - metadata, e.g. from the original tar file name.
	// testName - Name of test file (typically extracted from a tar file)
	// test - binary test data
	ParseAndInsert(meta map[string]bigquery.Value, testName string, test []byte) error

	// Flush flushes any pending rows.
	Flush() error

	// TableName of the table that this Parser inserts into.
	// Used for metrics and logging.
	TableName() string

	// FullTableName of the BQ table that the uploader pushes to,
	// including $YYYYMMNN, or _YYYYMMNN
	FullTableName() string

	// Task level error, based on failed rows, or any other criteria.
	TaskError() error

	RowStats // Parser must implement RowStats
}
Parser is the generic interface implemented by each experiment parser.
type ProcessingError ¶
ProcessingError extends error to provide dataType and detail for metrics, and appropriate return codes for http handlers.
type RowStats ¶
type RowStats interface {
	// RowsInBuffer returns the count of rows currently in the buffer.
	RowsInBuffer() int
	// Committed returns the count of rows successfully committed to BQ.
	Committed() int
	// Accepted returns the count of all rows received through InsertRow(s)
	Accepted() int
	// Failed returns the count of all rows that could not be committed.
	Failed() int
}
RowStats interface defines some useful Inserter stats that will also be implemented by Parser. RowStats implementations should provide the invariants:
Accepted == Failed + Committed + RowsInBuffer
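The invariant can be illustrated with a small bookkeeping type. This is a minimal sketch under stated assumptions: rowCounter, add, flush, and invariantHolds are hypothetical names, the type is not thread-safe, and it does not talk to BigQuery; it only shows how the four counters can be kept consistent.

```go
package main

import "fmt"

// rowCounter is a hypothetical, non-thread-safe type that tracks the four
// RowStats counters so that Accepted == Failed + Committed + RowsInBuffer.
type rowCounter struct {
	accepted, committed, failed, buffered int
}

// add records n newly accepted rows, which start out in the buffer.
func (c *rowCounter) add(n int) {
	c.accepted += n
	c.buffered += n
}

// flush empties the buffer, counting failedRows of the buffered rows as
// failed and the remainder as committed.
func (c *rowCounter) flush(failedRows int) {
	if failedRows < 0 {
		failedRows = 0
	}
	if failedRows > c.buffered {
		failedRows = c.buffered
	}
	c.committed += c.buffered - failedRows
	c.failed += failedRows
	c.buffered = 0
}

func (c *rowCounter) RowsInBuffer() int { return c.buffered }
func (c *rowCounter) Committed() int    { return c.committed }
func (c *rowCounter) Accepted() int     { return c.accepted }
func (c *rowCounter) Failed() int       { return c.failed }

// invariantHolds checks the RowStats invariant.
func (c *rowCounter) invariantHolds() bool {
	return c.Accepted() == c.Failed()+c.Committed()+c.RowsInBuffer()
}

func main() {
	c := &rowCounter{}
	c.add(10)
	fmt.Println(c.invariantHolds(), c.RowsInBuffer())
	c.flush(2)
	fmt.Println(c.invariantHolds(), c.RowsInBuffer(), c.Committed(), c.Failed())
}
```

After flush, RowsInBuffer is 0, which also matches the invariant stated for Inserter.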
type TestSource ¶
type TestSource interface {
	// NextTest reads the next test object from the tar file.
	// Skips reading contents of any file larger than maxSize, returning empty data
	// and storage.ErrOversizeFile.
	// Returns io.EOF when there are no more tests.
	NextTest(maxSize int64) (string, []byte, error)
	Close() error

	Detail() string   // Detail for logs.
	Type() string     // Data type for logs and metrics
	Date() civil.Date // Date associated with test source
}
TestSource provides a source of test data.
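A consumer of a TestSource would typically loop on NextTest until io.EOF, skipping oversize files. The sketch below is illustrative only: testReader is a hypothetical subset of the interface (so the example needs no external packages), sliceSource is a hypothetical in-memory fake, errOversizeFile stands in for storage.ErrOversizeFile, and returning the file name alongside the oversize error is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// errOversizeFile stands in for storage.ErrOversizeFile in this sketch.
var errOversizeFile = errors.New("oversize file")

// testReader is the subset of TestSource used by this example.
type testReader interface {
	NextTest(maxSize int64) (string, []byte, error)
	Close() error
}

// sliceSource is a hypothetical in-memory source used only for illustration.
type sliceSource struct {
	names []string
	data  [][]byte
	next  int
}

func (s *sliceSource) NextTest(maxSize int64) (string, []byte, error) {
	if s.next >= len(s.names) {
		return "", nil, io.EOF
	}
	i := s.next
	s.next++
	if int64(len(s.data[i])) > maxSize {
		// Oversize: return empty data and the oversize error.
		return s.names[i], nil, errOversizeFile
	}
	return s.names[i], s.data[i], nil
}

func (s *sliceSource) Close() error { return nil }

// drain reads every test from src and returns the names that were read and
// the names that were skipped as oversize.
func drain(src testReader, maxSize int64) ([]string, []string, error) {
	defer src.Close()
	var read, skipped []string
	for {
		name, _, err := src.NextTest(maxSize)
		switch {
		case errors.Is(err, io.EOF):
			return read, skipped, nil
		case errors.Is(err, errOversizeFile):
			skipped = append(skipped, name)
		case err != nil:
			return read, skipped, err
		default:
			read = append(read, name)
		}
	}
}

func main() {
	src := &sliceSource{
		names: []string{"small.txt", "large.txt"},
		data:  [][]byte{[]byte("ok"), []byte("far too large")},
	}
	read, skipped, err := drain(src, 4)
	fmt.Println(read, skipped, err)
}
```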