datacatalog

package
v0.0.0-...-267b159 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 17, 2023 License: Apache-2.0 Imports: 28 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func DatasetIDToIdentifier

func DatasetIDToIdentifier(id *datacatalog.DatasetID) *core.Identifier

func EventCatalogMetadata

func EventCatalogMetadata(datasetID *datacatalog.DatasetID, tag *datacatalog.Tag, sourceID *core.TaskExecutionIdentifier) *core.CatalogMetadata

Given the Catalog Information (returned from a Catalog call), returns the CatalogMetadata that is populated in the event.

func GenerateArtifactTagName

func GenerateArtifactTagName(ctx context.Context, inputs *core.LiteralMap) (string, error)

Generate a tag by hashing the input values

func GenerateDatasetIDForTask

func GenerateDatasetIDForTask(ctx context.Context, k catalog.Key) (*datacatalog.DatasetID, error)

Get the DataSetID for a task. NOTE: the version of the task is a combination of both the discoverable_version and the task signature. This is because the interface may of changed even if the discoverable_version hadn't.

func GenerateTaskOutputsFromArtifact

func GenerateTaskOutputsFromArtifact(id core.Identifier, taskInterface core.TypedInterface, artifact *datacatalog.Artifact) (*core.LiteralMap, error)

Transform the artifact Data into task execution outputs as a literal map

func GetArtifactMetadataForSource

func GetArtifactMetadataForSource(taskExecutionID *core.TaskExecutionIdentifier) *datacatalog.Metadata

func GetDatasetMetadataForSource

func GetDatasetMetadataForSource(taskExecutionID *core.TaskExecutionIdentifier) *datacatalog.Metadata

Understanding Catalog Identifiers DatasetID represents the ID of the dataset. For Nebula this represents the ID of the generating task and the version calculated as the hash of the interface & cache version. refer to `GenerateDatasetIDForTask` TaskID is the same as the DatasetID + name: (DataSetID - namespace) + task version which is stored in the metadata ExecutionID is stored only in the metadata (project and domain available after Jul-2020) NodeExecID = Execution ID + Node ID (available after Jul-2020) TaskExecID is the same as the NodeExecutionID + attempt (attempt is available in Metadata) after Jul-2020

func GetOrDefault

func GetOrDefault(m map[string]string, key, defaultValue string) string

Returns a default value, if the given key is not found in the map, else returns the value in the map

func GetSourceFromMetadata

func GetSourceFromMetadata(datasetMd, artifactMd *datacatalog.Metadata, currentID core.Identifier) (*core.TaskExecutionIdentifier, error)

GetSourceFromMetadata returns the Source TaskExecutionIdentifier from the catalog metadata For all the information not available it returns Unknown. This is because as of July-2020 Catalog does not have all the information. After the first deployment of this code, it will have this and the "unknown's" can be phased out

Types

type CatalogClient

type CatalogClient struct {
	// contains filtered or unexported fields
}

CatalogClient is the client that caches task executions to DataCatalog service.

func NewDataCatalog

func NewDataCatalog(ctx context.Context, endpoint string, insecureConnection bool, maxCacheAge time.Duration, useAdminAuth bool, defaultServiceConfig string, authOpt ...grpc.DialOption) (*CatalogClient, error)

NewDataCatalog creates a new Datacatalog client for task execution caching

func (*CatalogClient) CreateArtifact

func (m *CatalogClient) CreateArtifact(ctx context.Context, key catalog.Key, datasetID *datacatalog.DatasetID, inputs *core.LiteralMap, outputs *core.LiteralMap, metadata catalog.Metadata) (catalog.Status, error)

CreateArtifact creates an Artifact in datacatalog including its associated ArtifactData and tags it with a hash of the provided input values for retrieval.

func (*CatalogClient) CreateDataset

func (m *CatalogClient) CreateDataset(ctx context.Context, key catalog.Key, metadata *datacatalog.Metadata) (*datacatalog.DatasetID, error)

CreateDataset creates a Dataset in datacatalog including the associated metadata.

func (*CatalogClient) Get

func (m *CatalogClient) Get(ctx context.Context, key catalog.Key) (catalog.Entry, error)

Get the cached task execution from Catalog. These are the steps taken: - Verify there is a Dataset created for the Task - Lookup the Artifact that is tagged with the hash of the input values - The artifactData contains the literal values that serve as the task outputs

func (*CatalogClient) GetArtifactByTag

func (m *CatalogClient) GetArtifactByTag(ctx context.Context, tagName string, dataset *datacatalog.Dataset) (*datacatalog.Artifact, error)

GetArtifactByTag retrieves an artifact using the provided tag and dataset.

func (*CatalogClient) GetDataset

func (m *CatalogClient) GetDataset(ctx context.Context, key catalog.Key) (*datacatalog.Dataset, error)

GetDataset retrieves a dataset that is associated with the task represented by the provided catalog.Key.

func (*CatalogClient) GetOrExtendReservation

func (m *CatalogClient) GetOrExtendReservation(ctx context.Context, key catalog.Key, ownerID string, heartbeatInterval time.Duration) (*datacatalog.Reservation, error)

GetOrExtendReservation attempts to get a reservation for the cacheable task. If you have previously acquired a reservation it will be extended. If another entity holds the reservation that is returned.

func (*CatalogClient) Put

func (m *CatalogClient) Put(ctx context.Context, key catalog.Key, reader io.OutputReader, metadata catalog.Metadata) (catalog.Status, error)

Put stores the result of a task execution as a cached Artifact and associates it with the data by tagging it with the hash of the input values. The CatalogClient will ensure a dataset exists for the Artifact to be created. A Dataset represents the project/domain/name/version of the task executed. Lastly, CatalogClient will create an Artifact tagged with the input value hash and store the provided execution data.

func (*CatalogClient) ReleaseReservation

func (m *CatalogClient) ReleaseReservation(ctx context.Context, key catalog.Key, ownerID string) error

ReleaseReservation attempts to release a reservation for a cacheable task. If the reservation does not exist (e.x. it never existed or has been acquired by another owner) then this call still succeeds.

func (*CatalogClient) Update

func (m *CatalogClient) Update(ctx context.Context, key catalog.Key, reader io.OutputReader, metadata catalog.Metadata) (catalog.Status, error)

Update stores the result of a task execution as a cached Artifact, overwriting any already stored data from a previous execution. The CatalogClient will ensure the referenced dataset exists and will silently create a new Artifact if the referenced key does not exist in datacatalog yet. After the operation succeeds, an artifact with the given key and data will be stored in catalog and a tag with the has of the input values will exist.

func (*CatalogClient) UpdateArtifact

func (m *CatalogClient) UpdateArtifact(ctx context.Context, key catalog.Key, datasetID *datacatalog.DatasetID, inputs *core.LiteralMap, outputs *core.LiteralMap, metadata catalog.Metadata) (catalog.Status, error)

UpdateArtifact overwrites the ArtifactData of an existing artifact with the provided data in datacatalog.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL