package etl

v2.4.3+incompatible
Published: Aug 5, 2020 License: Apache-2.0 Imports: 14 Imported by: 1

Documentation

Overview

Package etl provides all major interfaces used across packages.

Constants

const (
	ANNOTATION      = DataType("annotation")
	NDT             = DataType("ndt")
	NDT5            = DataType("ndt5")
	NDT7            = DataType("ndt7")
	NDT_OMIT_DELTAS = DataType("ndt_nodelta") // to support larger buffer size.
	SS              = DataType("sidestream")
	PT              = DataType("traceroute")
	SW              = DataType("switch")
	TCPINFO         = DataType("tcpinfo")
	INVALID         = DataType("invalid")
)

These constants enumerate the different data types. TODO - use camelcase.

const BucketPattern = `gs://([^/]*)/`

BucketPattern is used to extract the GCS bucket name.

const DatePathPattern = `(\d{4}/[01]\d/[0123]\d)/`

DatePathPattern is used to extract the date directory part of the path, e.g. 2017/01/02

const ExpTypePattern = `(?:([a-z-]+)/)?([a-z0-9-]+)/` // experiment OR experiment/type.

ExpTypePattern is used to extract the experiment or experiment/type part of the path.

const MlabDomain = `measurement-lab.org`

MlabDomain is the DNS domain for all mlab servers.

const YYYYMMDD = `\d{4}[01]\d[0123]\d`

YYYYMMDD is a regexp string for identifying dense dates.
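
As a minimal sketch, these patterns can be composed with the standard regexp package to pull the pieces out of an archive URI. The import path and the URI below are illustrative assumptions, not values taken from this package.

package main

import (
	"fmt"
	"regexp"

	"github.com/m-lab/etl/etl" // assumed import path for this package
)

func main() {
	// Compose the exported patterns into a single expression.
	re := regexp.MustCompile(etl.BucketPattern + etl.ExpTypePattern + etl.DatePathPattern)
	uri := "gs://example-bucket/ndt/ndt7/2020/03/01/archive.tgz" // illustrative URI
	if m := re.FindStringSubmatch(uri); m != nil {
		fmt.Println("bucket:", m[1]) // example-bucket
		fmt.Println("exp:   ", m[2]) // ndt
		fmt.Println("type:  ", m[3]) // ndt7
		fmt.Println("date:  ", m[4]) // 2020/03/01
	}
}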

Variables

var (
	// ErrBufferFull is returned when an InsertBuffer is full.
	ErrBufferFull = errors.New("insert buffer is full")

	// ErrBadDataType is returned when a path does not have a valid datatype.
	ErrBadDataType = errors.New("unknown data type")
)

Inserter-related errors.

var ErrHighInsertionFailureRate = errors.New("too many insertion failures")

ErrHighInsertionFailureRate should be returned by TaskError when there are more than 10% BQ insertion errors.

var IsBatch bool

IsBatch indicates this process is a batch processing service.

var OmitDeltas bool

OmitDeltas indicates we should NOT process all snapshots.

Functions

func DirToTablename

func DirToTablename(dir string) string

DirToTablename translates a GCS directory name to a BigQuery table name.

func GetFilename

func GetFilename(filename string) (string, error)

GetFilename converts a request received from the queue into a filename. TODO(dev) Add unit test

func GetIATACode

func GetIATACode(rawFilename string) string

GetIATACode extracts an IATA code, e.g. "acc", from a file name like 20170501T000000Z-mlab1-acc02-paris-traceroute-0000.tgz

func GetIntFromIPv4

func GetIntFromIPv4(p4 net.IP) uint

GetIntFromIPv4 converts an IPv4 address to the equivalent uint32.

func GetIntFromIPv6Upper

func GetIntFromIPv6Upper(p6 net.IP) uint64

GetIntFromIPv6Upper converts the upper 64 bits of an IPv6 address into uint64.
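
A minimal usage sketch for the two conversions, assuming the import path github.com/m-lab/etl/etl; the addresses are illustrative, and the 4-byte and 16-byte forms are passed explicitly.

package main

import (
	"fmt"
	"net"

	"github.com/m-lab/etl/etl" // assumed import path for this package
)

func main() {
	v4 := etl.GetIntFromIPv4(net.ParseIP("192.0.2.1").To4())
	v6 := etl.GetIntFromIPv6Upper(net.ParseIP("2001:db8::1").To16())
	fmt.Printf("%d %d\n", v4, v6)
}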

func IsBatchService

func IsBatchService() bool

IsBatchService returns true if this is a batch service.

func NumberBitsDifferent

func NumberBitsDifferent(first string, second string) (int, int)

NumberBitsDifferent computes how many trailing bits differ between two IP addresses. The second returned number is 4 for IPv4, 6 for IPv6, and 0 for invalid input.
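
A usage sketch, again assuming the import path github.com/m-lab/etl/etl; the addresses are illustrative.

package main

import (
	"fmt"

	"github.com/m-lab/etl/etl" // assumed import path for this package
)

func main() {
	bits, family := etl.NumberBitsDifferent("192.168.1.10", "192.168.1.25")
	fmt.Println(bits, family) // family is 4 here; IPv6 inputs yield 6, invalid input yields 0
}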

Types

type DataPath

type DataPath struct {
	URI string // The full URI

	// These fields are from the bucket and path
	Bucket   string // the GCS bucket name.
	ExpDir   string // the experiment directory.
	DataType string // the data type from the path, e.g. ndt7.
	DatePath string // the YYYY/MM/DD date path.
	// The rest are from the filename
	PackedDate string // the YYYYMMDD date.
	PackedTime string // the HHMMSS time.
	DataType2  string // new platform also embeds the data type in the filename
	Host       string // the short server name, e.g. mlab1.
	Site       string // the pod/site name, e.g. ams02.
	Experiment string // the experiment name, e.g. ndt, typically identical to ExpDir
	FileNumber string // the file number, e.g. 0001
	Embargo    string // optional
	Suffix     string // the archive suffix, e.g. .tgz
}

DataPath breaks out the components of a task filename.

func ValidateTestPath

func ValidateTestPath(path string) (DataPath, error)

ValidateTestPath validates a task filename.
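
A sketch of validating a task filename and reading the resulting DataPath fields. The path below merely follows the layout the fields describe and the import path is assumed, so a real archive name may differ.

package main

import (
	"fmt"

	"github.com/m-lab/etl/etl" // assumed import path for this package
)

func main() {
	// Illustrative path; real archive names come from the task queue.
	path := "gs://example-bucket/ndt/ndt7/2020/03/01/20200301T000000.000000Z-ndt7-mlab1-ams02-ndt.tgz"
	dp, err := etl.ValidateTestPath(path)
	if err != nil {
		fmt.Println("path did not validate:", err)
		return
	}
	fmt.Println(dp.Bucket, dp.ExpDir, dp.DataType, dp.DatePath, dp.Host, dp.Site)
	fmt.Println(dp.GetDataType(), dp.TableBase())
}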

func (DataPath) GetDataType

func (dp DataPath) GetDataType() DataType

GetDataType finds the type of data stored in a file from its complete filename

func (DataPath) TableBase

func (dp DataPath) TableBase() string

TableBase returns the base bigquery table name associated with the DataPath data type.

type DataType

type DataType string

DataType identifies the type of data handled by a parser.

func (DataType) BQBufferSize

func (dt DataType) BQBufferSize() int

BQBufferSize returns the appropriate BQ insert buffer size.

func (DataType) BigqueryProject

func (dt DataType) BigqueryProject() string

BigqueryProject returns the appropriate project.

func (DataType) Dataset

func (dt DataType) Dataset() string

Dataset returns the appropriate dataset to use. This is a bit of a hack, but works for our current needs.

func (DataType) Table

func (dt DataType) Table() string

Table returns the appropriate table to use.
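
As a sketch, the DataType methods can be combined to show where rows of a given type are routed; the output depends on deployment flags such as IsBatch, and the import path is assumed.

package main

import (
	"fmt"

	"github.com/m-lab/etl/etl" // assumed import path for this package
)

func main() {
	dt := etl.NDT7
	fmt.Printf("%s.%s.%s (buffer %d rows)\n",
		dt.BigqueryProject(), dt.Dataset(), dt.Table(), dt.BQBufferSize())
}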

type Inserter

type Inserter interface {
	// Put synchronously sends a slice of rows to BigQuery
	// This is THREADSAFE
	Put(rows []interface{}) error
	// PutAsync asynchronously sends a slice of rows to BigQuery.
	// It is THREADSAFE.
	// It may block if there is already a Put (or Flush) in progress.
	// To synchronize following PutAsync, call one of the stats functions,
	// e.g. Committed() or Failed()
	PutAsync(rows []interface{})

	// InsertRow inserts one row into the insert buffer.
	// Deprecated:  Please use AddRow and FlushAsync instead.
	InsertRow(data interface{}) error
	// InsertRows inserts multiple rows into the insert buffer.
	// Deprecated:  Please use AddRow and FlushAsync instead.
	InsertRows(data []interface{}) error

	// Flush flushes any rows in the buffer out to bigquery.
	// This is synchronous - on return, rows should be committed.
	// Deprecated:  Please use external buffer, Put, and PutAsync instead.
	Flush() error

	// Base Table name of the BQ table that the uploader pushes to.
	TableBase() string
	// Table name suffix of the BQ table that the uploader pushes to.
	TableSuffix() string
	// Full table name of the BQ table that the uploader pushes to,
	// including $YYYYMMDD, or _YYYYMMDD
	FullTableName() string
	// Dataset name of the BQ dataset containing the table.
	Dataset() string
	// Project name
	Project() string

	RowStats // Inserter must implement RowStats
}

Inserter is a data sink that writes to BigQuery tables. Inserters should provide the invariants:

After Flush() returns, RowsInBuffer == 0
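
A caller-side sketch of the preferred Put/PutAsync pattern, assuming the etl package is imported; ins and rows come from the caller, and the 10% threshold simply mirrors the ErrHighInsertionFailureRate convention above rather than any logic in this package.

// upload sends rows asynchronously and reports a task-level error if too
// many of the rows seen so far have failed.
func upload(ins etl.Inserter, rows []interface{}) error {
	ins.PutAsync(rows)
	// Calling a stats method synchronizes with the outstanding PutAsync.
	committed, failed := ins.Committed(), ins.Failed()
	if failed > 0 && float64(failed)/float64(committed+failed) > 0.1 {
		return etl.ErrHighInsertionFailureRate
	}
	return nil
}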

type InserterParams

type InserterParams struct {
	// These specify the google cloud project:dataset.table to write to.
	Project string
	Dataset string
	Table   string

	// Suffix may be table suffix _YYYYMMDD or partition $YYYYMMDD
	Suffix string // Table name suffix for templated tables or partitions.

	BufferSize int // Number of rows to buffer before writing to backend.

	PutTimeout    time.Duration // max duration of bigquery Put ops.  (for context)
	MaxRetryDelay time.Duration // Maximum backoff time for Put retries.
}

InserterParams holds the parameters used by NewInserter.
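
A sketch of filling in InserterParams, assuming the etl and time packages are imported; the project, dataset, and table names are placeholders, and NewInserter itself is not part of this package.

// exampleParams returns placeholder parameters; real values depend on deployment.
func exampleParams() etl.InserterParams {
	return etl.InserterParams{
		Project:       "example-project",
		Dataset:       "base_tables",
		Table:         "ndt7",
		Suffix:        "$20200301", // partition suffix; use _20200301 for templated tables
		BufferSize:    500,
		PutTimeout:    60 * time.Second,
		MaxRetryDelay: 2 * time.Minute,
	}
}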

type Parser

type Parser interface {
	// IsParsable reports a canonical file "kind" and whether the file appears to
	// be parsable based on the name and content size. A true result does not
	// guarantee that ParseAndInsert will succeed, but a false result means that
	// ParseAndInsert can be skipped.
	IsParsable(testName string, test []byte) (string, bool)

	// ParseAndInsert parses a single test and inserts the resulting rows.
	// meta - metadata, e.g. from the original tar file name.
	// testName - name of the test file (typically extracted from a tar file).
	// test - binary test data.
	ParseAndInsert(meta map[string]bigquery.Value, testName string, test []byte) error

	// Flush flushes any pending rows.
	Flush() error

	// TableName of the table that this Parser inserts into.
	// Used for metrics and logging.
	TableName() string

	// FullTableName of the BQ table that the uploader pushes to,
	// including $YYYYMMDD, or _YYYYMMDD
	FullTableName() string

	// TaskError returns a task-level error, based on failed rows or any other criteria.
	TaskError() error

	RowStats // Parser must implement RowStats
}

Parser is the generic interface implemented by each experiment parser.
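
A caller-side sketch of driving a Parser over a single test file, assuming the etl package and the same bigquery package used by the interface (cloud.google.com/go/bigquery) are imported; p, meta, testName, and data come from the surrounding task loop.

// parseOne skips files the parser does not recognize and inserts the rest.
func parseOne(p etl.Parser, meta map[string]bigquery.Value, testName string, data []byte) error {
	if _, ok := p.IsParsable(testName, data); !ok {
		return nil // safe to skip, per the IsParsable contract
	}
	return p.ParseAndInsert(meta, testName, data)
}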

type ProcessingError

type ProcessingError interface {
	DataType() string
	Detail() string
	Code() int
	error
}

ProcessingError extends error to provide dataType and detail for metrics, and appropriate return codes for http handlers.
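
A sketch of how an HTTP handler might unwrap a ProcessingError to choose a response code, assuming the errors, net/http, and etl packages are imported; the helper name is illustrative.

// reportError writes the code carried by a ProcessingError, if err wraps one.
func reportError(w http.ResponseWriter, err error) {
	var pe etl.ProcessingError
	if errors.As(err, &pe) {
		// pe.DataType() and pe.Detail() would also feed metrics here.
		http.Error(w, pe.Detail(), pe.Code())
		return
	}
	http.Error(w, err.Error(), http.StatusInternalServerError)
}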

type RowStats

type RowStats interface {
	// RowsInBuffer returns the count of rows currently in the buffer.
	RowsInBuffer() int
	// Committed returns the count of rows successfully committed to BQ.
	Committed() int
	// Accepted returns the count of all rows received through InsertRow(s)
	Accepted() int
	// Failed returns the count of all rows that could not be committed.
	Failed() int
}

RowStats interface defines some useful Inserter stats that will also be implemented by Parser. RowStats implementations should provide the invariants:

Accepted == Failed + Committed + RowsInBuffer
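
The invariant can be written directly as a check, which is handy in tests; rs is assumed to come from the caller.

// statsConsistent reports whether a RowStats implementation satisfies the
// invariant stated above.
func statsConsistent(rs etl.RowStats) bool {
	return rs.Accepted() == rs.Failed()+rs.Committed()+rs.RowsInBuffer()
}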

type TestSource

type TestSource interface {
	// NextTest reads the next test object from the tar file.
	// Skips reading contents of any file larger than maxSize, returning empty data
	// and storage.ErrOversizeFile.
	// Returns io.EOF when there are no more tests.
	NextTest(maxSize int64) (string, []byte, error)
	Close() error

	Detail() string   // Detail for logs.
	Type() string     // Data type for logs and metrics
	Date() civil.Date // Date associated with test source
}

TestSource provides a source of test data.
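
A sketch of draining a TestSource; src and handle come from the caller, maxSize is an illustrative limit, and storage.ErrOversizeFile is assumed to come from this module's storage package.

// drain reads every test in src, skipping oversize files.
func drain(src etl.TestSource, handle func(name string, data []byte) error) error {
	defer src.Close()
	const maxSize = 200 * 1024 * 1024 // illustrative 200MB limit
	for {
		name, data, err := src.NextTest(maxSize)
		if err == io.EOF {
			return nil // no more tests
		}
		if err == storage.ErrOversizeFile {
			continue // data is empty for oversize files; skip them
		}
		if err != nil {
			return err
		}
		if err := handle(name, data); err != nil {
			return err
		}
	}
}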

type Uploader

type Uploader interface {
	Put(ctx context.Context, src interface{}) error
}

Uploader defines the BQ Uploader interface, for injection.
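
Because Uploader is a single-method interface, a test double is trivial; this fake is illustrative and not part of the package.

// fakeUploader satisfies etl.Uploader and records rows instead of calling BigQuery.
type fakeUploader struct {
	rows []interface{}
}

func (f *fakeUploader) Put(ctx context.Context, src interface{}) error {
	f.rows = append(f.rows, src)
	return nil
}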
