ingestion

package
v0.0.0-...-03d6fc4
Warning

This package is not in the latest version of its module.

Published: Jan 23, 2019 License: BSD-3-Clause Imports: 31 Imported by: 0

Documentation


Constants

const (
	// TABLE_FILES_PROCESSED is the table to keep track of processed files.
	TABLE_FILES_PROCESSED = "files-processed"

	// COLFAM_FILES_PROCESSED is the column family used to keep track of processed files.
	COLFAM_FILES_PROCESSED = "fproc"

	// COL_STATE is the column used to keep track of processed files.
	COL_STATE = "st"
)
const (
	MEASUREMENT_INGESTION = "ingestion"
	TAG_INGESTION_METRIC  = "metric"
	TAG_INGESTER_ID       = "ingester"
	TAG_INGESTER_SOURCE   = "source"
)

Tag names used to collect metrics.

const (
	// Limit the number of times the ingester tries to get a file before giving up.
	MAX_URI_GET_TRIES = 4
)
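A bounded retry loop of the kind this limit implies might look like the following minimal sketch. The helper getWithRetries, the fetch callback and errTransient are hypothetical names used for illustration only; the package's actual retry logic is not shown in this documentation.

```go
package main

import (
	"errors"
	"fmt"
)

// MAX_URI_GET_TRIES mirrors the documented constant.
const MAX_URI_GET_TRIES = 4

// errTransient is a hypothetical error used to simulate a failed fetch.
var errTransient = errors.New("transient failure")

// getWithRetries is a hypothetical helper. It calls fetch until it succeeds
// or MAX_URI_GET_TRIES attempts have been made. It returns the data, the
// number of attempts made, and the wrapped last error on failure.
func getWithRetries(fetch func() ([]byte, error)) ([]byte, int, error) {
	var lastErr error
	for try := 1; try <= MAX_URI_GET_TRIES; try++ {
		data, err := fetch()
		if err == nil {
			return data, try, nil
		}
		lastErr = err
	}
	return nil, MAX_URI_GET_TRIES, fmt.Errorf("giving up after %d tries: %w", MAX_URI_GET_TRIES, lastErr)
}

func main() {
	calls := 0
	// Simulated fetch that fails twice before succeeding.
	data, tries, err := getWithRetries(func() ([]byte, error) {
		calls++
		if calls < 3 {
			return nil, errTransient
		}
		return []byte("payload"), nil
	})
	fmt.Println(string(data), tries, err) // prints "payload 3 <nil>"
}
```

A real implementation would probably wait between attempts; the sketch omits any backoff to keep the bound itself in focus.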
const PROCESSED_FILES_BUCKET = "processed_files"

BoltDB bucket where MD5 hashes of processed files are stored.

Variables

var (
	// VAL_TRUE is a true value.
	VAL_TRUE = []byte("t")

	// BigTableConfig describes the tables and column families used by this
	// package. It can be used by bt.InitBigtable to set up the tables.
	BigTableConfig = bt.TableConfig{
		TABLE_FILES_PROCESSED: {
			COLFAM_FILES_PROCESSED,
		},
	}
)
var (
	// IgnoreResultsFileErr can be returned by the Process function of a processor to
	// indicate that this file should be considered ignored. It is up to the processor
	// to write to the log.
	IgnoreResultsFileErr = errors.New("Ignore this file.")
)
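As an illustration of how a sentinel error like this is typically used, the sketch below has a processor return it for files it wants skipped and a caller tell "ignored" apart from "failed". Everything here is a local stand-in: the variable copies the documented declaration, while process and classify are hypothetical helpers, and how the ingester itself reacts to the error is not described in this documentation.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// IgnoreResultsFileErr is a local copy of the package's sentinel error.
var IgnoreResultsFileErr = errors.New("Ignore this file.")

// process is a hypothetical processor body. It skips files that are not
// JSON, logs the reason itself, and reports the skip via the sentinel.
func process(name string) error {
	if !strings.HasSuffix(name, ".json") {
		fmt.Printf("skipping %s: not a JSON result file\n", name)
		return IgnoreResultsFileErr
	}
	return nil
}

// classify shows how a caller might distinguish ignored files from failures.
func classify(err error) string {
	switch {
	case err == nil:
		return "ingested"
	case errors.Is(err, IgnoreResultsFileErr):
		return "ignored"
	default:
		return "failed"
	}
}

func main() {
	fmt.Println(classify(process("results.json"))) // prints "ingested"
	fmt.Println(classify(process("notes.txt")))    // logs the skip, then prints "ignored"
}
```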

Functions

func MockVCS

func MockVCS(commits []*vcsinfo.LongCommit, depsContentMap map[string]string, pathContentMap map[string]string) vcsinfo.VCS

MockVCS returns an instance of VCS that returns the commits passed as arguments. To control the GetFile function use these two parameters:

depsContentMap maps a commit hash to the content of a dependency file.
pathContentMap maps a file name to its string content.

Currently, GetFile considers either the fileName or the hash, but not a combination of both. The fileName takes priority.
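The lookup order described above can be sketched as follows. lookupFile is a hypothetical stand-in, not the mock's actual GetFile code; it only illustrates that a match on the file name wins over a match on the commit hash.

```go
package main

import (
	"errors"
	"fmt"
)

// lookupFile is a hypothetical stand-in for the mock's GetFile logic: a
// match on fileName wins; only if there is none is the commit hash tried.
func lookupFile(depsContentMap, pathContentMap map[string]string, fileName, commitHash string) (string, error) {
	if content, ok := pathContentMap[fileName]; ok {
		return content, nil
	}
	if content, ok := depsContentMap[commitHash]; ok {
		return content, nil
	}
	return "", errors.New("no content for file name or commit hash")
}

func main() {
	deps := map[string]string{"abc123": "deps content"}
	paths := map[string]string{"README": "readme content"}

	// Both keys match; the file name takes priority.
	content, _ := lookupFile(deps, paths, "README", "abc123")
	fmt.Println(content) // prints "readme content"

	// Only the commit hash matches.
	content, _ = lookupFile(deps, paths, "DEPS", "abc123")
	fmt.Println(content) // prints "deps content"
}
```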

func Register

func Register(id string, constructor Constructor)

Register registers the given constructor to create an instance of a Processor.
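Registration by id usually follows the registry pattern sketched below. This is a minimal illustration under simplifying assumptions, not the package's implementation: Processor and Constructor are reduced stand-ins (the real Constructor takes a VCS, an IngesterConfig, an HTTP client and an event bus), and newProcessor and nopProcessor are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// Processor and Constructor are simplified stand-ins for the package's types.
type Processor interface {
	Process(fileName string) error
}

type Constructor func(config map[string]string) (Processor, error)

var (
	mu           sync.Mutex
	constructors = map[string]Constructor{}
)

// Register stores the constructor under the given id.
func Register(id string, constructor Constructor) {
	mu.Lock()
	defer mu.Unlock()
	constructors[id] = constructor
}

// newProcessor is a hypothetical lookup: it instantiates a Processor by id.
func newProcessor(id string, config map[string]string) (Processor, error) {
	mu.Lock()
	constructor, ok := constructors[id]
	mu.Unlock()
	if !ok {
		return nil, fmt.Errorf("no constructor registered for %q", id)
	}
	return constructor(config)
}

// nopProcessor is a hypothetical Processor that accepts every file.
type nopProcessor struct{}

func (nopProcessor) Process(fileName string) error { return nil }

func main() {
	Register("nop", func(config map[string]string) (Processor, error) {
		return nopProcessor{}, nil
	})

	p, err := newProcessor("nop", nil)
	fmt.Println(p != nil, err) // prints "true <nil>"

	_, err = newProcessor("missing", nil)
	fmt.Println(err) // prints: no constructor registered for "missing"
}
```

Implementations commonly call Register from an init function so that importing a processor package is enough to make it available by name.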

Types

type BTIStore

type BTIStore struct {
	// contains filtered or unexported fields
}

BTIStore implements the IngestionStore interface.

func (*BTIStore) Clear

func (b *BTIStore) Clear() error

Clear implements the IngestionStore interface.

func (*BTIStore) Close

func (b *BTIStore) Close() error

Close implements the IngestionStore interface.

func (*BTIStore) ContainsResultFileHash

func (b *BTIStore) ContainsResultFileHash(md5Sum string) (bool, error)

ContainsResultFileHash implements the IngestionStore interface.

func (*BTIStore) SetResultFileHash

func (b *BTIStore) SetResultFileHash(md5Sum string) error

SetResultFileHash implements the IngestionStore interface.

type Constructor

type Constructor func(vcs vcsinfo.VCS, config *sharedconfig.IngesterConfig, client *http.Client, eventBus eventbus.EventBus) (Processor, error)

Constructor is the signature that has to be implemented to register a Processor implementation to be instantiated by name from a config struct.

vcs is an instance that might be shared across multiple ingesters.
config is usually parsed from a JSON5 file.
client can be assumed to be ready to serve the needs of the resulting Processor.
eventBus is the eventbus to be used by the ingester (optional).

type FileSystemSource

type FileSystemSource struct {
	// contains filtered or unexported fields
}

FileSystemSource implements the Source interface to read from the local file system.

func (*FileSystemSource) ID

func (f *FileSystemSource) ID() string

See Source interface.

func (*FileSystemSource) Poll

func (f *FileSystemSource) Poll(startTime, endTime int64) <-chan ResultFileLocation

See Source interface.

func (*FileSystemSource) SetEventChannel

func (f *FileSystemSource) SetEventChannel(resultCh chan<- ResultFileLocation) error

SetEventChannel implements the Source interface.

type GoogleStorageSource

type GoogleStorageSource struct {
	// contains filtered or unexported fields
}

GoogleStorageSource implements the Source interface for Google Storage.

func (*GoogleStorageSource) ID

func (g *GoogleStorageSource) ID() string

See Source interface.

func (*GoogleStorageSource) Poll

func (g *GoogleStorageSource) Poll(startTime, endTime int64) <-chan ResultFileLocation

See Source interface.

func (*GoogleStorageSource) SetEventChannel

func (g *GoogleStorageSource) SetEventChannel(resultCh chan<- ResultFileLocation) error

SetEventChannel implements the Source interface.

type Ingester

type Ingester struct {
	// contains filtered or unexported fields
}

Ingester is the main type that drives ingestion for a single type.

func IngestersFromConfig

func IngestersFromConfig(ctx context.Context, config *sharedconfig.Config, client *http.Client, eventBus eventbus.EventBus, ingestionStore IngestionStore) ([]*Ingester, error)

IngestersFromConfig creates a list of ingesters from a config struct, which is usually created by parsing a config file. client is assumed to be suitable for the given application: if, for example, the processors of the current application require an authenticated HTTP client, client is expected to meet that requirement.

func NewIngester

func NewIngester(ingesterID string, ingesterConf *sharedconfig.IngesterConfig, vcs vcsinfo.VCS, sources []Source, processor Processor, ingestionStore IngestionStore, eventBus eventbus.EventBus) (*Ingester, error)

NewIngester creates a new ingester with the given id and configuration around the supplied vcs (version control system), input sources and Processor instance. The ingester is driven by storage events, with a background process that polls the storage locations. The given eventBus cannot be nil and must be shared with the sources that are passed. For polling-based ingestion only, use an in-memory eventbus (created via eventbus.New()). To drive ingestion from storage events, use a PubSub-based eventbus (created via the gevent.New(...) function).

func (*Ingester) Close

func (i *Ingester) Close() error

Close stops the ingestion process. Currently only used for testing. It's mainly intended to terminate as many goroutines as possible.

func (*Ingester) Start

func (i *Ingester) Start(ctx context.Context) error

Start starts the ingester in a new goroutine.

type IngestionStore

type IngestionStore interface {
	// Clear completely clears the datastore. Mostly used for testing.
	Clear() error

	// SetResultFileHash sets the given md5 hash in the database.
	SetResultFileHash(md5 string) error

	// ContainsResultFileHash returns true if the provided md5 hash is in the DB.
	ContainsResultFileHash(md5 string) (bool, error)

	// Close closes the ingestion store.
	Close() error
}

IngestionStore keeps track of files being ingested based on their MD5 hashes.
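For tests, or to see the contract at a glance, a map-backed implementation is enough. The sketch below copies the interface as documented; memStore and newMemStore are hypothetical names and are not part of the package.

```go
package main

import (
	"fmt"
	"sync"
)

// IngestionStore is copied from the documented interface.
type IngestionStore interface {
	Clear() error
	SetResultFileHash(md5 string) error
	ContainsResultFileHash(md5 string) (bool, error)
	Close() error
}

// memStore is a hypothetical in-memory IngestionStore.
type memStore struct {
	mu     sync.Mutex
	hashes map[string]bool
}

func newMemStore() IngestionStore {
	return &memStore{hashes: map[string]bool{}}
}

// Clear drops every recorded hash.
func (m *memStore) Clear() error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.hashes = map[string]bool{}
	return nil
}

// SetResultFileHash records that the file with this hash was processed.
func (m *memStore) SetResultFileHash(md5 string) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.hashes[md5] = true
	return nil
}

// ContainsResultFileHash reports whether the hash was recorded.
func (m *memStore) ContainsResultFileHash(md5 string) (bool, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.hashes[md5], nil
}

// Close has nothing to release for an in-memory store.
func (m *memStore) Close() error { return nil }

func main() {
	store := newMemStore()
	_ = store.SetResultFileHash("5d41402abc4b2a76b9719d911017c592")

	seen, _ := store.ContainsResultFileHash("5d41402abc4b2a76b9719d911017c592")
	fmt.Println(seen) // prints "true"

	_ = store.Clear()
	seen, _ = store.ContainsResultFileHash("5d41402abc4b2a76b9719d911017c592")
	fmt.Println(seen) // prints "false"

	_ = store.Close()
}
```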

func NewBTIStore

func NewBTIStore(projectID, bigTableInstance, nameSpace string) (IngestionStore, error)

NewBTIStore creates a BigTable-backed implementation of IngestionStore. nameSpace is a prefix that is added to every row key to allow multitenancy.
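The effect of a namespace prefix can be illustrated with a small sketch. The rowKey helper and the ":" separator are assumptions made for this example; the actual row key layout used by BTIStore is not documented here.

```go
package main

import "fmt"

// rowKey is a hypothetical helper that builds a row key by prefixing the
// MD5 sum with the namespace. The ":" separator is an assumption.
func rowKey(nameSpace, md5Sum string) string {
	return nameSpace + ":" + md5Sum
}

func main() {
	const sum = "5d41402abc4b2a76b9719d911017c592"

	// The same file hash yields distinct rows for different tenants, so
	// several applications can share one table without collisions.
	fmt.Println(rowKey("gold", sum)) // prints "gold:5d41402abc4b2a76b9719d911017c592"
	fmt.Println(rowKey("perf", sum)) // prints "perf:5d41402abc4b2a76b9719d911017c592"
}
```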

type MockVCSImpl

type MockVCSImpl struct {
	// contains filtered or unexported fields
}

func (MockVCSImpl) ByIndex

func (m MockVCSImpl) ByIndex(ctx context.Context, N int) (*vcsinfo.LongCommit, error)

func (MockVCSImpl) Details

func (m MockVCSImpl) Details(ctx context.Context, hash string, getBranches bool) (*vcsinfo.LongCommit, error)

func (MockVCSImpl) From

func (m MockVCSImpl) From(start time.Time) []string

func (MockVCSImpl) GetFile

func (m MockVCSImpl) GetFile(ctx context.Context, fileName, commitHash string) (string, error)

func (MockVCSImpl) IndexOf

func (m MockVCSImpl) IndexOf(ctx context.Context, hash string) (int, error)

func (MockVCSImpl) LastNIndex

func (m MockVCSImpl) LastNIndex(N int) []*vcsinfo.IndexCommit

func (MockVCSImpl) Range

func (m MockVCSImpl) Range(begin, end time.Time) []*vcsinfo.IndexCommit

func (MockVCSImpl) ResolveCommit

func (m MockVCSImpl) ResolveCommit(ctx context.Context, commitHash string) (string, error)

func (MockVCSImpl) SetSecondaryRepo

func (m MockVCSImpl) SetSecondaryRepo(secVCS vcsinfo.VCS, extractor depot_tools.DEPSExtractor)

func (MockVCSImpl) Update

func (m MockVCSImpl) Update(ctx context.Context, pull, allBranches bool) error

type Processor

type Processor interface {
	// Process ingests a single result file.
	Process(ctx context.Context, resultsFile ResultFileLocation) error
}

Processor is the core of an ingester. It takes instances of ResultFileLocation and ingests them. It is responsible for the storage of ingested data.

type ResultFileLocation

type ResultFileLocation interface {
	// Open returns a reader for the content of the file.
	Open() (io.ReadCloser, error)

	// Name returns the full path of the file. The last segment is usually the
	// file name.
	Name() string

	// StorageIDs returns the bucket and object ID for the given location.
	StorageIDs() (string, string)

	// MD5 returns the MD5 hash of the content of the file.
	MD5() string

	// TimeStamp returns the timestamp when the file was last updated.
	TimeStamp() int64

	// Content returns the content of the file if it has been read or nil otherwise.
	Content() []byte
}

ResultFileLocation is an abstract interface to a file-like object that contains results that need to be ingested.
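A minimal in-memory implementation makes the interface concrete. In the sketch below the interface is copied from the documentation, while memResult is a hypothetical type. Two choices are assumptions of this example rather than documented behavior: the MD5 is hex-encoded, and the bucket returned by StorageIDs is empty.

```go
package main

import (
	"bytes"
	"crypto/md5"
	"encoding/hex"
	"fmt"
	"io"
)

// ResultFileLocation is copied from the documented interface.
type ResultFileLocation interface {
	Open() (io.ReadCloser, error)
	Name() string
	StorageIDs() (string, string)
	MD5() string
	TimeStamp() int64
	Content() []byte
}

// memResult is a hypothetical in-memory ResultFileLocation.
type memResult struct {
	name    string
	content []byte
	updated int64 // seconds since the epoch
}

func (m *memResult) Open() (io.ReadCloser, error) {
	return io.NopCloser(bytes.NewReader(m.content)), nil
}

func (m *memResult) Name() string { return m.name }

// StorageIDs returns an empty bucket because nothing is stored remotely.
func (m *memResult) StorageIDs() (string, string) { return "", m.name }

// MD5 returns the hex-encoded MD5 of the content (encoding is an assumption).
func (m *memResult) MD5() string {
	sum := md5.Sum(m.content)
	return hex.EncodeToString(sum[:])
}

func (m *memResult) TimeStamp() int64 { return m.updated }

func (m *memResult) Content() []byte { return m.content }

func main() {
	var loc ResultFileLocation = &memResult{
		name:    "dir/results.json",
		content: []byte("hello"),
		updated: 1548201600,
	}

	r, err := loc.Open()
	if err != nil {
		fmt.Println("open failed:", err)
		return
	}
	defer r.Close()

	data, err := io.ReadAll(r)
	if err != nil {
		fmt.Println("read failed:", err)
		return
	}
	fmt.Println(loc.Name(), loc.MD5(), string(data))
	// prints "dir/results.json 5d41402abc4b2a76b9719d911017c592 hello"
}
```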

func FileSystemResult

func FileSystemResult(path, rootDir string) (ResultFileLocation, error)

FileSystemResult returns a ResultFileLocation for files. path is the path where the target file resides and rootDir is the root of all paths.

type Source

type Source interface {
	// ID returns a unique identifier for this source.
	ID() string

	// Poll returns a channel to read all the result files that originated between
	// the given timestamps in seconds since the epoch.
	Poll(startTime, endTime int64) <-chan ResultFileLocation

	// SetEventChannel configures storage events and sets up routines to send
	// new results to the given channel.
	SetEventChannel(resultCh chan<- ResultFileLocation) error
}

Source defines an ingestion source that returns lists of result files either through polling or in an event-driven mode.
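The polling half of the interface can be sketched with an in-memory source. To stay self-contained, the example replaces ResultFileLocation with a reduced result struct; memSource is a hypothetical type, and treating both time bounds as inclusive is an assumption, since the documentation does not say.

```go
package main

import "fmt"

// result is a reduced stand-in for ResultFileLocation.
type result struct {
	name string
	ts   int64 // seconds since the epoch
}

// memSource is a hypothetical in-memory source.
type memSource struct {
	id      string
	results []result
}

func (s *memSource) ID() string { return s.id }

// Poll returns a channel that yields every result whose timestamp lies in
// [startTime, endTime] and is closed once all matches have been sent.
func (s *memSource) Poll(startTime, endTime int64) <-chan result {
	ch := make(chan result)
	go func() {
		defer close(ch)
		for _, r := range s.results {
			if r.ts >= startTime && r.ts <= endTime {
				ch <- r
			}
		}
	}()
	return ch
}

// SetEventChannel does nothing here: an in-memory source has no storage
// events to forward to the given channel.
func (s *memSource) SetEventChannel(resultCh chan<- result) error { return nil }

func main() {
	src := &memSource{
		id: "mem",
		results: []result{
			{"old.json", 5},
			{"a.json", 10},
			{"b.json", 20},
			{"new.json", 30},
		},
	}

	// Ranging over the channel ends when the source closes it.
	for r := range src.Poll(10, 20) {
		fmt.Println(src.ID(), r.name)
	}
	// prints "mem a.json" then "mem b.json"
}
```

Closing the channel from the producing goroutine lets callers drain a poll with a plain range loop.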

func NewFileSystemSource

func NewFileSystemSource(baseName, rootDir string) (Source, error)

func NewGoogleStorageSource

func NewGoogleStorageSource(baseName, bucket, rootDir string, client *http.Client, eventBus eventbus.EventBus) (Source, error)

NewGoogleStorageSource returns a new instance of GoogleStorageSource based on the bucket and directory provided. The id is used to identify the Source and is generally the same id as the ingester.
