catalog

package
v0.0.0-...-267b159 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 17, 2023 License: Apache-2.0 Imports: 22 Imported by: 8

Documentation

Index

Constants

View Source
const (
	ErrResponseNotReady errors.ErrorCode = "RESPONSE_NOT_READY"
	ErrSystemError      errors.ErrorCode = "SYSTEM_ERROR"
)

Variables

This section is empty.

Functions

func HashLiteralMap

func HashLiteralMap(ctx context.Context, literalMap *core.LiteralMap) (string, error)

func IsNotFound

func IsNotFound(err error) bool

func NewWriterProcessor

func NewWriterProcessor(catalogClient Client) workqueue.Processor

Types

type AsyncClient

type AsyncClient interface {
	// Returns if an entry exists for the given task and input. It returns the data as a LiteralMap
	Download(ctx context.Context, requests ...DownloadRequest) (outputFuture DownloadFuture, err error)

	// Adds a new entry to catalog for the given task execution context and the generated output
	Upload(ctx context.Context, requests ...UploadRequest) (putFuture UploadFuture, err error)
}

An interface that helps async interaction with catalog service

type AsyncClientImpl

type AsyncClientImpl struct {
	Reader workqueue.IndexedWorkQueue
	Writer workqueue.IndexedWorkQueue
}

An async-client for catalog that can queue download and upload requests on workqueues.

func NewAsyncClient

func NewAsyncClient(client Client, cfg Config, scope promutils.Scope) (AsyncClientImpl, error)

func (AsyncClientImpl) Download

func (c AsyncClientImpl) Download(ctx context.Context, requests ...DownloadRequest) (outputFuture DownloadFuture, err error)

func (AsyncClientImpl) Start

func (c AsyncClientImpl) Start(ctx context.Context) error

func (AsyncClientImpl) Upload

func (c AsyncClientImpl) Upload(ctx context.Context, requests ...UploadRequest) (putFuture UploadFuture, err error)

type Client

type Client interface {
	// Get returns the artifact associated with the given key.
	Get(ctx context.Context, key Key) (Entry, error)
	// GetOrExtendReservation tries to retrieve a (valid) reservation for the given key, creating a new one using the
	// specified owner ID if none was found or updating an existing one if it has expired.
	GetOrExtendReservation(ctx context.Context, key Key, ownerID string, heartbeatInterval time.Duration) (*datacatalog.Reservation, error)
	// Put stores the given data using the specified key, creating artifact entries as required.
	// To update an existing artifact, use Update instead.
	Put(ctx context.Context, key Key, reader io.OutputReader, metadata Metadata) (Status, error)
	// Update updates existing data stored at the specified key, overwriting artifact entries with the new data provided.
	// To create a new (non-existent) artifact, use Put instead.
	Update(ctx context.Context, key Key, reader io.OutputReader, metadata Metadata) (Status, error)
	// ReleaseReservation releases an acquired reservation for the given key and owner ID.
	ReleaseReservation(ctx context.Context, key Key, ownerID string) error
}

Client represents the default Catalog client that allows memoization and indexing of intermediate data in Nebula

type Config

type Config struct {
	ReaderWorkqueueConfig workqueue.Config `` /* 168-byte string literal not displayed */
	WriterWorkqueueConfig workqueue.Config `` /* 168-byte string literal not displayed */
}

func GetConfig

func GetConfig() *Config

func (Config) GetPFlagSet

func (cfg Config) GetPFlagSet(prefix string) *pflag.FlagSet

GetPFlagSet will return strongly types pflags for all fields in Config and its nested types. The format of the flags is json-name.json-sub-name... etc.

type DownloadFuture

type DownloadFuture interface {
	Future

	// Gets the actual response from the future. This will return an error if the future isn't ready yet.
	GetResponse() (DownloadResponse, error)
}

Catalog download future to represent async process of downloading catalog artifacts.

type DownloadRequest

type DownloadRequest struct {
	Key    Key
	Target io.OutputWriter
}

Catalog Download Request to represent async operation download request.

type DownloadResponse

type DownloadResponse interface {
	// Gets a bit set representing which items from the request were cached.
	GetCachedResults() *bitarray.BitSet

	// Gets the total size of the cached result.
	GetResultsSize() int

	// A convenience method to retrieve the number of cached items.
	GetCachedCount() int
}

Catalog download response.

type Entry

type Entry struct {
	// contains filtered or unexported fields
}

Indicates the Entry in Catalog that was populated

func NewCatalogEntry

func NewCatalogEntry(outputs io.OutputReader, status Status) Entry

func NewFailedCatalogEntry

func NewFailedCatalogEntry(status Status) Entry

func (Entry) GetOutputs

func (e Entry) GetOutputs() io.OutputReader

func (Entry) GetStatus

func (e Entry) GetStatus() Status

type Future

type Future interface {
	// Gets the response status for the future. If the future represents multiple operations, the status will only be
	// ready if all of them are.
	GetResponseStatus() ResponseStatus

	// Sets a callback handler to be called when the future status changes to ready.
	OnReady(handler ReadyHandler)

	GetResponseError() error
}

A generic Future interface to represent async operations results

type Key

type Key struct {
	Identifier     core.Identifier
	CacheVersion   string
	TypedInterface core.TypedInterface
	InputReader    io.InputReader
}

An identifier for a catalog object.

func (Key) String

func (k Key) String() string

type Metadata

type Metadata struct {
	WorkflowExecutionIdentifier *core.WorkflowExecutionIdentifier
	NodeExecutionIdentifier     *core.NodeExecutionIdentifier
	TaskExecutionIdentifier     *core.TaskExecutionIdentifier
}

Metadata to be associated with the catalog object

type ReaderProcessor

type ReaderProcessor struct {
	// contains filtered or unexported fields
}

func NewReaderProcessor

func NewReaderProcessor(catalogClient Client) ReaderProcessor

func (ReaderProcessor) Process

type ReaderWorkItem

type ReaderWorkItem struct {
	// contains filtered or unexported fields
}

func NewReaderWorkItem

func NewReaderWorkItem(key Key, outputsWriter io.OutputWriter) *ReaderWorkItem

func (ReaderWorkItem) IsCached

func (item ReaderWorkItem) IsCached() bool

type ReadyHandler

type ReadyHandler func(ctx context.Context, future Future)

type ReservationEntry

type ReservationEntry struct {
	// contains filtered or unexported fields
}

ReservationEntry encapsulates the current state of an artifact reservation within the catalog

func NewReservationEntry

func NewReservationEntry(expiresAt time.Time, heartbeatInterval time.Duration, ownerID string, status core.CatalogReservation_Status) ReservationEntry

Creates a new ReservationEntry populated with the specified parameters

func NewReservationEntryStatus

func NewReservationEntryStatus(status core.CatalogReservation_Status) ReservationEntry

Creates a new ReservationEntry using the status, all other fields are set to default values

func (ReservationEntry) GetExpiresAt

func (r ReservationEntry) GetExpiresAt() time.Time

Returns the expiration timestamp at which the reservation will no longer be valid

func (ReservationEntry) GetHeartbeatInterval

func (r ReservationEntry) GetHeartbeatInterval() time.Duration

Returns the heartbeat interval, denoting how often the catalog expects a reservation extension request

func (ReservationEntry) GetOwnerID

func (r ReservationEntry) GetOwnerID() string

Returns the ID of the current reservation owner

func (ReservationEntry) GetStatus

Returns the status of the attempted reservation operation

type ResponseStatus

type ResponseStatus uint8
const (
	ResponseStatusNotReady ResponseStatus = iota
	ResponseStatusReady
)

type Status

type Status struct {
	// contains filtered or unexported fields
}

Indicates that status of the query to Catalog. This can be returned for both Get and Put calls

func NewStatus

func NewStatus(cacheStatus core.CatalogCacheStatus, md *core.CatalogMetadata) Status

func (Status) GetCacheStatus

func (s Status) GetCacheStatus() core.CatalogCacheStatus

func (Status) GetMetadata

func (s Status) GetMetadata() *core.CatalogMetadata

type UploadFuture

type UploadFuture interface {
	Future
}

Catalog Sidecar future to represent async process of uploading catalog artifacts.

type UploadRequest

type UploadRequest struct {
	Key              Key
	ArtifactData     io.OutputReader
	ArtifactMetadata Metadata
}

type WriterWorkItem

type WriterWorkItem struct {
	// contains filtered or unexported fields
}

func NewWriterWorkItem

func NewWriterWorkItem(key Key, data io.OutputReader, metadata Metadata) *WriterWorkItem

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL