protocol

package
v0.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 20, 2022 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	//ErrUniqueConstraintNotFound is an error
	ErrUniqueConstraintNotFound = errors.New("unique constraint not found")

	//ErrTableMetadataNotFound should be thrown when a table is not exist
	ErrTableMetadataNotFound = errors.New("table metadata not found")
)
View Source
var (
	//ErrProfileNotFound is an error
	ErrProfileNotFound = errors.New("profile not found")
	//ErrProfileInvalid when a profile doesnt have any status in status Log
	//normally profile at least has one status in status Log
	ErrProfileInvalid = errors.New("profile invalid")
)
View Source
var (
	//ErrAuditIDInvalid thrown when access audit that is not belong to a profile
	ErrAuditIDInvalid = errors.New("error audit not belong to profile")
)
View Source
var (
	//ErrAuditNotFound is an error
	ErrAuditNotFound = errors.New("audit not found")
)
View Source
var (
	//ErrAuditResultNotFound is an error
	ErrAuditResultNotFound = errors.New("audit result not found")
)
View Source
var ErrEntityNotFound = errors.New("entity not found")
View Source
var ErrFileNotFound = errors.New("file not found")
View Source
var (
	//ErrNoProfileMetricFound thrown when metric store do not find any profile metric record
	ErrNoProfileMetricFound = errors.New("no profile metric found")
)
View Source
var ErrPartitionExpressionIsNotSupported = errors.New("partition expression is not supported for this table")
View Source
var (
	//ErrStatusNotFound is an error when getting status
	ErrStatusNotFound = errors.New("status not found")
)
View Source
var (
	//ErrToleranceNotFound thrown when tolerance for a tableID not found
	ErrToleranceNotFound = errors.New("tolerance for tableID not found")
)
View Source
var Ext = ".yaml"

Ext is the default YAML file extension.

View Source
var GitSshUrlPattern = regexp.MustCompile(`^git@.+\.git$`)

GitSshUrlPattern is the regular expression for the supported git SSH URL format, e.g. git@www.git.com:group/repo.git. It can be used to verify that a URL is in the supported format.

Functions

func FormIssueSummary

func FormIssueSummary(auditResults []*AuditReport) string

FormIssueSummary creates an issue summary from the given audit reports.

func IsSpecInvalidError

func IsSpecInvalidError(err error) bool

func IsUploadSpecValidationError

func IsUploadSpecValidationError(err error) bool

Types

type AuditGroup

type AuditGroup []*AuditReport

AuditGroup is a slice of AuditReport that supports group-by operations.

func (AuditGroup) ByFieldID

func (ag AuditGroup) ByFieldID() map[string][]*AuditReport

ByFieldID groups the audit reports by field ID.

func (AuditGroup) ByGroupValue

func (ag AuditGroup) ByGroupValue() map[string][]*AuditReport

ByGroupValue groups the audit reports by group value.

func (AuditGroup) ByPartitionDate

func (ag AuditGroup) ByPartitionDate() map[string][]*AuditReport

ByPartitionDate groups the audit reports by partition date.

type AuditPublisher

type AuditPublisher interface {
	PublishAuditResult(audit *job.Audit, auditResult []*AuditReport) error
	Close(ctx context.Context) error
}

AuditPublisher publishes audit results.

type AuditReport

type AuditReport struct {
	AuditID        string
	Partition      string
	GroupValue     string
	TableURN       string
	FieldID        string
	MetricName     metric.Type
	MetricValue    float64
	Condition      string
	Metadata       map[string]interface{}
	ToleranceRules []ToleranceRule
	PassFlag       bool
	EventTimestamp time.Time
}

AuditReport is the result of an audit.

type AuditResult

type AuditResult struct {
	Audit        *job.Audit
	AuditReports []*AuditReport
}

AuditResult holds an audit job and its report details.

type AuditResultStore

type AuditResultStore interface {
	StoreResults(results []*AuditReport) error
}

AuditResultStore stores audit results.

type AuditService

type AuditService interface {
	//RunAudit start audit service
	RunAudit(profileID string) (*AuditResult, error)
}

AuditService is the auditor service.

type AuditStore

type AuditStore interface {
	CreateAudit(audit *job.Audit) (*job.Audit, error)
	UpdateAudit(audit *job.Audit) error
}

AuditStore is the store for audit entities.

type AuditSummary

type AuditSummary struct {
	IsPass  bool
	Message string
}

AuditSummary is the summary of an audit.

type AuditSummaryFactory

type AuditSummaryFactory interface {
	Create(auditResults []*AuditReport, auditJob *job.Audit) (*AuditSummary, error)
}

type Auditor

type Auditor interface {
	Audit(audit *job.Audit) ([]*AuditReport, error)
}

Auditor compares quality results against tolerances.

type BigqueryJob

type BigqueryJob struct {
	ID        string
	ProfileID string

	//BqID is bigquery job ID provided by query execution
	BqID string

	CreatedAt time.Time
}

BigqueryJob holds the BigQuery job information of a profile task.

type BigqueryJobStore

type BigqueryJobStore interface {
	Store(bigqueryJob *BigqueryJob) error
}

BigqueryJobStore stores the BigQuery jobs created by profile jobs.

type Comparator

type Comparator string

Comparator is the comparison operator of a tolerance rule.

const (
	//ComparatorLessThan metric < value
	ComparatorLessThan Comparator = "less_than"
	//ComparatorLessThanEq metric <= value
	ComparatorLessThanEq Comparator = "less_than_eq"
	//ComparatorMoreThan metric > value
	ComparatorMoreThan Comparator = "more_than"
	//ComparatorMoreThanEq metric >= value
	ComparatorMoreThanEq Comparator = "more_than_eq"
)

func (Comparator) String

func (c Comparator) String() string

type ConstraintStore

type ConstraintStore interface {
	FetchConstraints(tableID string) ([]string, error)
}

ConstraintStore is a store that fetches the constraints of a table, given its table ID.

type Entity

type Entity struct {
	ID            string
	Name          string
	Environment   string
	GitURL        string
	GcpProjectIDs []string
	CreatedAt     time.Time
	UpdatedAt     time.Time
}

Entity is information about an entity

type EntityFinder

type EntityFinder []*Entity

func (EntityFinder) FindByProjectID

func (e EntityFinder) FindByProjectID(projectID string) (*Entity, error)

type EntityStore

type EntityStore interface {
	Save(entity *Entity) (*Entity, error)
	Create(entity *Entity) (*Entity, error)
	Get(ID string) (*Entity, error)
	GetEntityByGitURL(gitURL string) (*Entity, error)
	GetEntityByProjectID(gcpProjectID string) (*Entity, error)
	GetAll() ([]*Entity, error)
	Update(entity *Entity) (*Entity, error)
}

EntityStore is the storage for Entity records.

type Entry

type Entry struct {
	// contains filtered or unexported fields
}

Entry is an entry struct used for logging.

func NewEntry

func NewEntry() Entry

NewEntry constructs an Entry.

func (Entry) Group

func (e Entry) Group() string

Group returns the group.

func (Entry) JobID

func (e Entry) JobID() string

JobID returns the job ID.

func (Entry) JobType

func (e Entry) JobType() job.Type

JobType returns the job type.

func (Entry) Partition

func (e Entry) Partition() string

Partition returns the partition.

func (Entry) Status

func (e Entry) Status() string

Status returns the status.

func (Entry) TableURN

func (e Entry) TableURN() string

TableURN returns the table URN.

func (Entry) WithGroup

func (e Entry) WithGroup(group string) Entry

WithGroup returns an Entry with the given group set.

func (Entry) WithJobID

func (e Entry) WithJobID(jobID string) Entry

WithJobID returns an Entry with the given job ID (e.g. a profile or audit job ID) set.

func (Entry) WithJobType

func (e Entry) WithJobType(jobType job.Type) Entry

WithJobType returns an Entry with the given job type set.

func (Entry) WithPartition

func (e Entry) WithPartition(partition string) Entry

WithPartition returns an Entry with the given partition set.

func (Entry) WithStatus

func (e Entry) WithStatus(status string) Entry

WithStatus returns an Entry with the given status set.

func (Entry) WithTableURN

func (e Entry) WithTableURN(tableURN string) Entry

WithTableURN returns an Entry with the given table URN set.

type ErrSpecInvalid

type ErrSpecInvalid struct {
	URN    string
	Errors []error
}

ErrSpecInvalid is the error returned when spec content is invalid; it contains the list of validation errors.

func (*ErrSpecInvalid) Error

func (e *ErrSpecInvalid) Error() string

type ErrUploadSpecValidation

type ErrUploadSpecValidation struct {
	Errors []error
}

ErrUploadSpecValidation is the error returned when an upload fails because of invalid specs; it contains the list of invalid-spec errors.

func (*ErrUploadSpecValidation) Error

func (s *ErrUploadSpecValidation) Error() string

type File

type File struct {
	Path    string
	Content []byte
}

File is a file identified by its path, together with its content.

type FileStore

type FileStore interface {
	Get(filePath string) (*File, error)
	GetAll() ([]*File, error)
	GetPaths() ([]string, error)
	Create(file *File) error
	Delete(filePath string) error
}

FileStore is file-based storage that contains tolerance specs.

type FileStoreFactory

type FileStoreFactory interface {
	Create(URL string) (FileStore, error)
}

FileStoreFactory creates FileStore instances.

type GitAuth

type GitAuth interface {
	transport.AuthMethod
}

GitAuth describes authentication configuration for GitRepository

type GitInfo

type GitInfo struct {

	//URL is a git url it must comply GitSshUrlPattern pattern format
	//this will be used to do git clone using ssh protocol
	URL string

	//CommitID is commit id that will be to be checked out
	//empty string means checking out latest revision
	CommitID string

	//PathPrefix is path prefix on git repository where the predator spec root directory structure is located
	//for example using Default directory structure, the files is flatly placed on a folder
	//if git url is git@github.com:username/project.git then in the repository the spec files placed under PathPrefix/ dir
	PathPrefix string
}

GitInfo holds git repository information.

type GitRepository

type GitRepository interface {
	Checkout(commit string) (FileStore, error)
}

GitRepository is a git repository.

type GitRepositoryFactory

type GitRepositoryFactory interface {
	Create(url string) GitRepository
	CreateWithPrefix(url string, pathPrefix string) GitRepository
}

GitRepositoryFactory creates GitRepository instances.

type Label

type Label struct {
	Project string
	Dataset string
	Table   string
}

Label is a structured BigQuery resource identifier.

func ParseLabel

func ParseLabel(URN string) (*Label, error)

ParseLabel creates a Label from a fully qualified BigQuery identifier string.

func (*Label) String

func (l *Label) String() string

type Message

type Message struct {
	Key   proto.Message
	Value proto.Message
}

type MessageProvider

type MessageProvider interface {
	Get() (*Message, error)
}

type MessageProviderFactory

type MessageProviderFactory interface {
	CreateProfileMessage(profile *job.Profile, metrics []*metric.Metric) []MessageProvider
	CreateAuditMessage(audit *job.Audit, auditResult []*AuditReport) []MessageProvider
}

type MetadataStore

type MetadataStore interface {
	//GetMetadata to fetch metadata as requirement for profiling
	GetMetadata(tableID string) (*meta.TableSpec, error)
	//GetUniqueConstraints to fetch unique constraints to calculate duplication metric
	GetUniqueConstraints(tableID string) ([]string, error)
}

MetadataStore is a store for retrieving table metadata information.

type MetricGenerator

type MetricGenerator interface {
	//Generate metrics
	Generate(entry Entry, config *job.Profile) ([]*metric.Metric, error)
}

MetricGenerator generates metrics.

type MetricProfiler

type MetricProfiler interface {
	Profile(entry Entry, profile *job.Profile, metricSpecs []*metric.Spec) ([]*metric.Metric, error)
}

MetricProfiler collects metrics; it performs the metric calculations that produce the metric values.

type MetricQuery

type MetricQuery struct {
	ProfileID   string
	Partition   string
	MetricTypes []metric.Type
	URN         string
}

MetricQuery is a field selector for querying metrics.

type MetricResultIdentifier

type MetricResultIdentifier struct {
	TableURN  string
	StartDate string
	EndDate   string
}

MetricResultIdentifier identifies a metric result by fields other than its ID.

type MetricSpecGenerator

type MetricSpecGenerator interface {
	Generate(tableSpec *meta.TableSpec, tolerances []*Tolerance) ([]*metric.Spec, error)
	GenerateMetricSpec(urn string) ([]*metric.Spec, error)
}

MetricSpecGenerator produces the metric specifications to be collected.

type MetricStore

type MetricStore interface {
	Store(profile *job.Profile, metrics []*metric.Metric) error
	GetMetricsByProfileID(ID string) ([]*metric.Metric, error)
}

MetricStore stores profile results.

type PartitionScanner

type PartitionScanner interface {
	GetAffectedPartition(tableURN string, lastModifiedTimestamp time.Time) ([]string, error)
}

PartitionScanner gets the affected partitions of a table using the last modified timestamp.

type PathResolver

type PathResolver interface {
	GetPath(urn string) (string, error)
	GetURN(filePath string) (string, error)
}

PathResolver resolves between resource names (URNs) and file paths, supporting multiple directory layouts.

type PathType

type PathType string

PathType is a supported directory structure type of a FileStore.

const (
	//Git is path type that used to resolve git directory structure
	Git PathType = "git"
	//MultiTenancy is path type that used to resolve multi tenancy directory structure
	MultiTenancy PathType = "multi_tenancy"
	//Default simple path type, mapping from project.dataset.table to project.dataset.table.yaml
	Default PathType = "default"
)

type ProfileBQLogger

type ProfileBQLogger interface {
	Log(entry Entry, bqJobID string) error
}

ProfileBQLogger logs the mapping between a profile ID and a BigQuery job ID.

type ProfileConfig

type ProfileConfig struct {
	ProfileID   string
	TableSpec   *meta.TableSpec
	MetricSpecs []*metric.Spec
	Partition   string
}

ProfileConfig identifies what to profile: the profile ID, table spec, metric specs, and partition.

type ProfileGroup

type ProfileGroup []*ProfileMetric

ProfileGroup is a slice of ProfileMetric that supports group-by operations.

func (ProfileGroup) ByPartitionDate

func (pg ProfileGroup) ByPartitionDate() map[string][]*ProfileMetric

ByPartitionDate groups the profile metrics by partition date.

type ProfileMetric

type ProfileMetric struct {
	ID             string
	ProfileID      string
	TableURN       string
	Partition      string
	FieldID        string
	OwnerType      metric.Owner
	Category       metric.Category
	Condition      string
	MetricName     metric.Type
	MetricValue    float64
	EventTimestamp time.Time
}

ProfileMetric is a single metric produced by the publisher.

type ProfilePublisher

type ProfilePublisher interface {
	Publish(profileJob *job.Profile, metrics []*metric.Metric) error
	Close(context.Context) error
}

ProfilePublisher publishes the metrics of a profile job.

type ProfileService

type ProfileService interface {
	//CreateProfile create profile job
	CreateProfile(detail *job.Profile) (*job.Profile, error)
	Get(ID string) (*job.Profile, error)
	WaitAll(ctx context.Context) error
	GetLog(ID string) ([]*Status, error)
}

ProfileService is the profiler service.

type ProfileStatisticGenerator

type ProfileStatisticGenerator interface {
	Generate(profile *job.Profile) error
}

ProfileStatisticGenerator generates profile statistics.

type ProfileStore

type ProfileStore interface {
	Create(profile *job.Profile) (*job.Profile, error)
	Update(profile *job.Profile) error
	Get(ID string) (*job.Profile, error)
}

ProfileStore stores profiles.

type ProtoBuilder

type ProtoBuilder interface {
	Build() (proto.Message, error)
}

type Publisher

type Publisher interface {
	Publish(provider MessageProvider) error
	Close(ctx context.Context) error
}

type PublisherType

type PublisherType string

PublisherType is the type of a supported publisher.

const (
	//Kafka for publish to apache kafka
	Kafka PublisherType = "kafka"
	//Console for publish to terminal console or log (for testing purpose)
	Console PublisherType = "console"
	//Dummy if publish to none
	Dummy PublisherType = "none"
)

type QueryExecutor

type QueryExecutor interface {
	Run(profile *job.Profile, query string, queryType job.QueryType) ([]Row, error)
}

QueryExecutor executes a BigQuery SQL query script and returns the result as a list of Rows.

type Row

type Row map[string]interface{}

Row is a single table row: the map key is the column name and the value is the cell value.

type SQLExpressionFactory

type SQLExpressionFactory interface {
	CreatePartitionExpression(urn string) (string, error)
}

SQLExpressionFactory generates SQL expressions.

type Sink

type Sink interface {
	Sink(message *Message) error
	Close(ctx context.Context) error
}

type SinkConfig

type SinkConfig struct {
	Type   PublisherType
	Broker []string
	Topic  string
}

type SinkFactory

type SinkFactory interface {
	Create(config *SinkConfig) Sink
}

type SpecValidator

type SpecValidator interface {
	//Validate content of data quality spec should return error ErrSpecInvalid when field or table not found
	Validate(spec *ToleranceSpec) error
}

type Status

type Status struct {
	ID             string
	JobID          string
	JobType        job.Type
	Status         string
	Message        string
	EventTimestamp time.Time
}

Status is the status of a task.

type StatusLogger

type StatusLogger interface {
	Log(entry Entry, message string) error
}

StatusLogger logs status.

type StatusStore

type StatusStore interface {
	Store(status *Status) error
	GetLatestStatusByIDandType(jobID string, jobType job.Type) (*Status, error)
	GetStatusLogByIDandType(jobID string, jobType job.Type) ([]*Status, error)
}

StatusStore stores the status of profile and audit processes.

type Task

type Task interface {
	Run() (interface{}, error)
}

Task is a unit of operation.

type Tolerance

type Tolerance struct {
	ID             string
	TableURN       string
	FieldID        string
	MetricName     metric.Type
	Condition      string //condition for invalid_pct metric
	Metadata       map[string]interface{}
	ToleranceRules []ToleranceRule
	CreatedAt      time.Time
	UpdatedAt      time.Time
}

Tolerance is the tolerance of a quality metric.

type ToleranceRule

type ToleranceRule struct {
	Comparator Comparator `json:"comparator"`
	Value      float64    `json:"value"`
}

ToleranceRule represents a tolerance comparator and its value.

type ToleranceSpec

type ToleranceSpec struct {
	URN        string
	Tolerances []*Tolerance
}

type ToleranceSpecStateStore

type ToleranceSpecStateStore interface {
	SaveTolerances(profileID string, tolerances []*Tolerance) error
	GetTolerancesByProfileID(profileID string) ([]*Tolerance, error)
}

ToleranceSpecStateStore stores the tolerances used for profiling and auditing.

type ToleranceStore

type ToleranceStore interface {
	Create(spec *ToleranceSpec) error
	GetByTableID(tableID string) (*ToleranceSpec, error)
	Delete(tableID string) error
	GetAll() ([]*ToleranceSpec, error)
	GetByProjectID(projectID string) ([]*ToleranceSpec, error)

	//GetResourceNames provide information all of tableID in the stored specs
	GetResourceNames() ([]string, error)
}

ToleranceStore fetches the quality tolerances.

type ToleranceStoreFactory

type ToleranceStoreFactory interface {
	Create(URL string, multiTenancyEnabled bool) (ToleranceStore, error)
	CreateWithOptions(store FileStore, pathType PathType) (ToleranceStore, error)
}

ToleranceStoreFactory creates ToleranceStore instances.

type UploadFactory

type UploadFactory interface {
	Create(gitRepo *GitInfo) (Task, error)
}

UploadFactory creates upload tasks.

type ValidatedMetric

type ValidatedMetric struct {
	Metric         *metric.Metric
	ToleranceRules []ToleranceRule
	PassFlag       bool
}

ValidatedMetric is an audited metric together with its tolerance rules and pass flag.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL