delta

package module
v0.0.0-...-963cbb1 Latest
Warning: This package is not in the latest version of its module.
Published: Apr 8, 2024 License: Apache-2.0 Imports: 37 Imported by: 0

README

Delta Go


A native Go implementation of the Delta[https://delta.io/] protocol. This library started as a port of delta-rs[https://github.com/delta-io/delta-rs/tree/main/rust]. This project is in alpha and the API is under development.

The current implementation is designed for highly concurrent writes; reads are not yet supported.

Usage

Create a Table with an object store, state store, and locks

	// Object store for data and log files, plus a state store and locks
	// for coordinating concurrent commits and checkpoints.
	store := filestore.New(tmpPath)
	state := filestate.New(tmpPath, "_delta_log/_commit.state")
	lock := filelock.New(tmpPath, "_delta_log/_commit.lock", filelock.Options{})
	checkpointLock := filelock.New(tmpPath, "_delta_log/_checkpoint.lock", filelock.Options{})

	// getSchema() is a placeholder for building the table schema (see GetSchema below).
	table := delta.NewTable(store, lock, state)
	metadata := delta.NewTableMetaData("Test Table", "test description", new(delta.Format).Default(), getSchema(), []string{}, make(map[string]string))
	err := table.Create(*metadata, new(delta.Protocol).Default(), delta.CommitInfo{}, []delta.Add{})

Commit an Add action to the _delta_log/

	// fileName, p, and stats describe a parquet file that has already been
	// written to the store; stats are generated by the caller (see Stats).
	add := delta.Add{
		Path:             fileName,
		Size:             p.Size,
		DataChange:       true,
		ModificationTime: time.Now().UnixMilli(),
		Stats:            string(stats.Json()),
		PartitionValues:  make(map[string]string),
	}
	transaction := table.CreateTransaction(delta.NewTransactionOptions())

	transaction.AddAction(add)
	operation := delta.Write{Mode: delta.Overwrite}
	appMetaData := make(map[string]any)
	appMetaData["test"] = 123

	transaction.SetAppMetadata(appMetaData)
	transaction.SetOperation(operation)
	v, err := transaction.Commit() // v is the new table version

Read the data

Start a pyspark session

pyspark --packages io.delta:delta-core_2.12:2.2.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
df = spark.read.format("delta").load("table")
df.show()

Limitations / TODO

Checkpoints:

  • The checkpoint checksum is not being written or validated

Other:

  • Nested schemas (containing nested structs, arrays, or maps) are not supported
  • Deletion vectors are not supported
  • Table features are not supported
  • Change data files are not supported
  • CDC files are not supported
  • Add stats need to be manually generated instead of being read from the parquet file

Documentation

Overview

Package delta contains the resources required to interact with a Delta table.

Index

Constants

This section is empty.

Variables

var (
	// ErrActionJSONFormat is returned when there is an error reading actions from a commit log
	ErrActionJSONFormat error = errors.New("invalid format for action JSON")
	// ErrActionUnknown is returned when there is an unknown action in a commit log
	ErrActionUnknown error = errors.New("unknown action")
	// ErrAddZeroSize is returned when an add action has zero size
	ErrAddZeroSize error = errors.New("add size must not be zero to prevent optimize failures")
)
var (
	// ErrCheckpointAlreadyExists is returned when trying to create a checkpoint but it already exists
	ErrCheckpointAlreadyExists error = errors.New("checkpoint already exists")
	// ErrCheckpointRowCountMismatch is returned when the checkpoint is generated with a different row count
	// than expected from the table state.  This indicates an internal error.
	ErrCheckpointRowCountMismatch error = errors.New("checkpoint generated with unexpected row count")
	// ErrCheckpointIncomplete is returned when trying to read a multi-part checkpoint but not all parts exist
	ErrCheckpointIncomplete error = errors.New("checkpoint is missing parts")
	// ErrCheckpointInvalidMultipartFileName is returned when a multi-part checkpoint file has the wrong number of parts in the filename
	ErrCheckpointInvalidMultipartFileName error = errors.New("checkpoint file name is invalid")
	// ErrCheckpointAddZeroSize is returned if there is an Add action with size 0
	// because including this would cause subsequent Optimize operations to fail.
	ErrCheckpointAddZeroSize error = errors.New("zero size in add not allowed")
	// ErrCheckpointEntryMultipleActions is returned if a checkpoint entry has more than one non-null action
	ErrCheckpointEntryMultipleActions error = errors.New("checkpoint entry contains multiple actions")
	// ErrCheckpointOptimizationWorkingFolder is returned if there is a problem with the optimization working folder
	ErrCheckpointOptimizationWorkingFolder error = errors.New("error using checkpoint optimization working folder")
)
var (
	// ErrExceededCommitRetryAttempts is returned when the maximum number of commit retry attempts has been exceeded.
	ErrExceededCommitRetryAttempts error = errors.New("exceeded commit retry attempts")
	// ErrNotATable is returned when a Delta table is not valid.
	ErrNotATable error = errors.New("not a table")
	// ErrInvalidVersion is returned when a version is invalid.
	ErrInvalidVersion error = errors.New("invalid version")
	// ErrUnableToLoadVersion is returned when a version cannot be loaded.
	ErrUnableToLoadVersion error = errors.New("unable to load specified version")
	// ErrLockFailed is returned when a lock fails unexpectedly without an error.
	ErrLockFailed error = errors.New("lock failed unexpectedly without an error")
	// ErrNotImplemented is returned when a feature has not been implemented.
	ErrNotImplemented error = errors.New("not implemented")
	// ErrUnsupportedReaderVersion is returned when a reader version is unsupported.
	ErrUnsupportedReaderVersion error = errors.New("reader version is unsupported")
	// ErrUnsupportedWriterVersion is returned when a writer version is unsupported.
	ErrUnsupportedWriterVersion error = errors.New("writer version is unsupported")
	// ErrFailedToCopyTempFile is returned when a temp file fails to be copied into a commit URI.
	ErrFailedToCopyTempFile error = errors.New("failed to copy temp file")
	// ErrFailedToAcknowledgeCommit is returned when a commit fails to be acknowledged.
	ErrFailedToAcknowledgeCommit error = errors.New("failed to acknowledge commit")
)
var (
	// ErrMissingMetadata is returned if trying to create a checkpoint with no metadata
	ErrMissingMetadata error = errors.New("missing metadata")
	// ErrConvertingCheckpointAdd is returned if there is an error converting an Add action to checkpoint format
	ErrConvertingCheckpointAdd error = errors.New("unable to generate checkpoint add")
	// ErrCDCNotSupported is returned if a CDC action is seen when generating a checkpoint
	ErrCDCNotSupported error = errors.New("cdc is not supported")
	// ErrReadingCheckpoint is returned if there is an error reading a checkpoint
	ErrReadingCheckpoint error = errors.New("unable to read checkpoint")
	// ErrVersionOutOfOrder is returned if the versions are out of order when loading the table state
	// This would indicate an internal logic error
	ErrVersionOutOfOrder error = errors.New("versions out of order during update")
)
var (
	// ErrConfigValidation is returned when a Delta configuration cannot be validated.
	ErrConfigValidation = errors.New("error validating delta configuration")
)
var (
	// ErrParseSchema is returned when parsing the schema from JSON fails
	ErrParseSchema error = errors.New("unable to parse schema")
)

Functions

func BaseCommitURI

func BaseCommitURI() storage.Path

BaseCommitURI returns the base path of a commit URI.

func CommitOrCheckpointVersionFromURI

func CommitOrCheckpointVersionFromURI(path storage.Path) (bool, int64)

CommitOrCheckpointVersionFromURI returns true plus the version if the URI is a valid commit or checkpoint filename.

func CommitURIFromVersion

func CommitURIFromVersion(version int64) storage.Path

CommitURIFromVersion returns the URI of commit version.

func CommitVersionFromURI

func CommitVersionFromURI(path storage.Path) (bool, int64)

CommitVersionFromURI returns true plus the version if the URI is a valid commit filename.

func CreateCheckpoint

func CreateCheckpoint(store storage.ObjectStore, checkpointLock lock.Locker, checkpointConfiguration *CheckpointConfiguration, version int64) (checkpointed bool, err error)

CreateCheckpoint creates a checkpoint for a table located at the store, for the given version. If expired log cleanup is enabled on this table, then after a successful checkpoint the cleanup runs to delete expired logs. It returns whether the checkpoint was created, along with any error. If the lock cannot be obtained, it does not retry; if other processes are checkpointing, there is no need to duplicate the effort.
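
For example, a checkpoint could be written after a successful commit, reusing store and checkpointLock from the Usage section above. A minimal sketch; the MaxRowsPerPart override and the version variable v (e.g. the version returned by Commit) are illustrative:

	config := delta.NewCheckpointConfiguration()
	config.MaxRowsPerPart = 10000 // illustrative override; the default is 50k

	checkpointed, err := delta.CreateCheckpoint(store, checkpointLock, config, v)
	if err != nil {
		// handle the error
	}
	if !checkpointed {
		// the lock was not obtained; another process is checkpointing
	}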

func DoesCheckpointVersionExist

func DoesCheckpointVersionExist(store storage.ObjectStore, version int64, validateAllPartsExist bool) (bool, error)

DoesCheckpointVersionExist returns true if the given checkpoint version exists, either as a single- or multi-part checkpoint

func IsValidCommitOrCheckpointURI

func IsValidCommitOrCheckpointURI(path storage.Path) bool

IsValidCommitOrCheckpointURI returns true if a URI is a valid commit or checkpoint file name. Otherwise, it returns false.

func IsValidCommitURI

func IsValidCommitURI(path storage.Path) bool

IsValidCommitURI returns true if a URI is a valid commit filename (not a checkpoint file, and not a temp commit).

func LogEntryFromActions

func LogEntryFromActions(actions []Action) ([]byte, error)

LogEntryFromActions creates a log entry from a list of actions.

func UpdateStats

func UpdateStats[T constraints.Ordered](s *Stats, k string, vpt *T)

UpdateStats computes Stats.NullCount, Stats.MinValues, and Stats.MaxValues for a given key/value struct property. The struct property is passed in as a pointer so that it can be evaluated as nil (NULL). TODO: handle struct types.
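
A minimal sketch of accumulating stats while buffering rows, assuming the maps in Stats must be initialized by the caller; rows is a hypothetical slice of records with ID int64 and Name *string fields, and add is an Add action as in the Usage section:

	stats := delta.Stats{
		NumRecords:  int64(len(rows)),
		TightBounds: true,
		NullCount:   make(map[string]int64),
		MinValues:   make(map[string]any),
		MaxValues:   make(map[string]any),
	}
	for _, row := range rows {
		delta.UpdateStats(&stats, "id", &row.ID)    // non-nil pointer: updates min/max for "id"
		delta.UpdateStats(&stats, "name", row.Name) // a nil *string is counted as NULL
	}
	add.Stats = string(stats.JSON())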

Types

type Action

type Action interface {
}

Action represents a Delta log action, such as one that describes a parquet data file that is part of the table.

func ActionsFromLogEntries

func ActionsFromLogEntries(logEntries []byte) ([]Action, error)

ActionsFromLogEntries retrieves all the actions from a log.
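
A minimal round-trip sketch through LogEntryFromActions and ActionsFromLogEntries; the path and size are illustrative:

	add := delta.Add{Path: "part-00000.snappy.parquet", Size: 1024, DataChange: true}
	entry, err := delta.LogEntryFromActions([]delta.Action{add})
	if err != nil {
		// handle the error
	}
	actions, err := delta.ActionsFromLogEntries(entry)
	// actions now holds the Add parsed back out of the log entry
	_ = actions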

type ActionKey

type ActionKey string

ActionKey represents a Delta action.

const (
	// AddActionKey represents an Add action.
	AddActionKey ActionKey = "add"
	// RemoveActionKey represents a Remove action.
	RemoveActionKey ActionKey = "remove"
	// CommitInfoActionKey represents a CommitInfo action.
	CommitInfoActionKey ActionKey = "commitInfo"
	// ProtocolActionKey represents a Protocol action.
	ProtocolActionKey ActionKey = "protocol"
	// MetaDataActionKey represents a metaData action.
	MetaDataActionKey ActionKey = "metaData"
	// FormatActionKey represents a Format action.
	FormatActionKey ActionKey = "format"
	// TransactionActionKey represents a Txn action.
	TransactionActionKey ActionKey = "txn"
	// CDCActionKey represents a CDC action.
	CDCActionKey ActionKey = "cdc"
)

type Add

type Add struct {
	// A relative path, from the root of the table, to a file that should be added to the table
	Path string `json:"path" parquet:"name=path, repetition=OPTIONAL, converted=UTF8"`
	// A map from partition column to value for this file
	// This field is required even without a partition.
	PartitionValues map[string]string `json:"partitionValues" parquet:"name=partitionValues, repetition=OPTIONAL, keyconverted=UTF8, valueconverted=UTF8"`
	// The size of this file in bytes
	Size int64 `json:"size" parquet:"name=size, repetition=OPTIONAL"`
	// The time this file was created, as milliseconds since the epoch
	ModificationTime int64 `json:"modificationTime" parquet:"name=modificationTime, repetition=OPTIONAL"`
	// When false the file must already be present in the table or the records in the added file
	// must be contained in one or more remove actions in the same version
	//
	// streaming queries that are tailing the transaction log can use this flag to skip actions
	// that would not affect the final results.
	DataChange bool `json:"dataChange" parquet:"name=dataChange, repetition=OPTIONAL"`
	// Map containing metadata about this file
	Tags map[string]string `json:"tags,omitempty" parquet:"name=tags, repetition=OPTIONAL, keyconverted=UTF8, valueconverted=UTF8"`
	// Contains statistics (e.g., count, min/max values for columns) about the data in this file
	Stats string `json:"stats" parquet:"name=stats, repetition=OPTIONAL, converted=UTF8"`
}

An Add action is typed to allow the stats_parsed and partitionValues_parsed fields to be written to checkpoints with the correct schema without using reflection. The Add variant is for a non-partitioned table; the PartitionValuesParsed field will be omitted.

func NewAdd

func NewAdd(store storage.ObjectStore, location storage.Path, partitionValues map[string]string) (*Add, []string, error)

NewAdd returns a new Add action, using the given location and partition values. The modification time will be set to now, and the size and stats will be retrieved from the parquet file at the given location. It also returns a list of columns that did not have stats set in the parquet file.
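
As an alternative to building the Add by hand as in the Usage section, a sketch (assuming the storage package's NewPath constructor; fileName is the relative path of a parquet file already written to the store):

	add, missingStats, err := delta.NewAdd(store, storage.NewPath(fileName), map[string]string{})
	if err != nil {
		// handle the error
	}
	if len(missingStats) > 0 {
		// these columns had no stats in the parquet file
	}
	transaction.AddAction(*add)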

type CDC

type CDC struct {
	/// A relative path, from the root of the table, or an
	/// absolute path to a CDC file
	Path string `json:"path" parquet:"name=path, repetition=OPTIONAL, converted=UTF8"`
	/// The size of this file in bytes
	Size int64 `json:"size" parquet:"name=size, repetition=OPTIONAL"`
	/// A map from partition column to value for this file
	PartitionValues map[string]string `json:"partitionValues"`
	/// Should always be set to false because they do not change the underlying data of the table
	DataChange bool `json:"dataChange" parquet:"name=dataChange, repetition=OPTIONAL"`
	/// Map containing metadata about this file
	Tags *map[string]string `json:"tags,omitempty"`
}

CDC represents a CDC action.

type CheckPoint

type CheckPoint struct {
	/// Delta table version
	Version int64 `json:"version"`
	// The number of actions in the checkpoint. -1 if not available.
	Size int64 `json:"size"`
	// The number of parts if the checkpoint has multiple parts.  Omit if single part.
	Parts *int32 `json:"parts,omitempty"`
	// Size of the checkpoint in bytes
	SizeInBytes   int64 `json:"sizeInBytes"`
	NumOfAddFiles int64 `json:"numOfAddFiles"`
}

CheckPoint holds the metadata for a checkpoint file. This gets written out to _last_checkpoint.

type CheckpointConfiguration

type CheckpointConfiguration struct {
	// Maximum numbers of rows to include in each multi-part checkpoint part
	// Current default 50k
	MaxRowsPerPart int
	// Allow checkpointing even if the table reader version or writer version is greater than supported
	// by this client. Defaults to false.
	// **WARNING** If you set this to true and the table being checkpointed uses features that are not supported by this
	// client, the resulting checkpoint might fail unpredictably and silently; this could cause data loss or corruption
	UnsafeIgnoreUnsupportedReaderWriterVersionErrors bool
	// Disable any cleanup after checkpointing, even if it was enabled in the table configuration.
	// Defaults to false.
	DisableCleanup bool
	// Configure use of on-disk intermediate storage to reduce memory requirements
	ReadWriteConfiguration OptimizeCheckpointConfiguration
}

CheckpointConfiguration contains additional configuration for checkpointing

func NewCheckpointConfiguration

func NewCheckpointConfiguration() *CheckpointConfiguration

NewCheckpointConfiguration returns the default configuration for creating checkpoints

type CheckpointEntry

type CheckpointEntry struct {
	Txn      *Txn      `parquet:"name=txn"`
	Add      *Add      `parquet:"name=add"`
	Remove   *Remove   `parquet:"name=remove"`
	MetaData *MetaData `parquet:"name=metaData"`
	Protocol *Protocol `parquet:"name=protocol"`
	Cdc      *CDC      `parquet:"-"` // CDC not implemented yet
}

CheckpointEntry contains a single entry in the checkpoint Parquet file. All but one of the pointers should be nil.

type CommitInfo

type CommitInfo map[string]interface{}

CommitInfo represents a CommitInfo action.

type ConfigKey

type ConfigKey string

ConfigKey represents a Delta configuration.

const (
	// AppendOnlyDeltaConfigKey represents the Delta configuration to specify whether a table is append-only.
	AppendOnlyDeltaConfigKey ConfigKey = "delta.appendOnly"
	// CheckpointIntervalDeltaConfigKey represents the Delta configuration to specify a checkpoint interval.
	CheckpointIntervalDeltaConfigKey ConfigKey = "delta.checkpointInterval"
	// AutoOptimizeAutoCompactDeltaConfigKey represents the Delta configuration to specify whether auto compaction needs to be enabled.
	AutoOptimizeAutoCompactDeltaConfigKey ConfigKey = "delta.autoOptimize.autoCompact"
	// AutoOptimizeOptimizeWriteDeltaConfigKey represents the Delta configuration to specify whether optimized writing needs to be enabled.
	AutoOptimizeOptimizeWriteDeltaConfigKey ConfigKey = "delta.autoOptimize.optimizeWrite"
	// CheckpointWriteStatsAsJSONDeltaConfigKey represents the Delta configuration to specify whether stats need to be written as a JSON object in a checkpoint.
	CheckpointWriteStatsAsJSONDeltaConfigKey ConfigKey = "delta.checkpoint.writeStatsAsJson"
	// CheckpointWriteStatsAsStructDeltaConfigKey represents the Delta configuration to specify whether stats need to be written as a struct in a checkpoint.
	CheckpointWriteStatsAsStructDeltaConfigKey ConfigKey = "delta.checkpoint.writeStatsAsStruct"
	// ColumnMappingModeDeltaConfigKey represents the Delta configuration to specify whether column mapping needs to be enabled.
	ColumnMappingModeDeltaConfigKey ConfigKey = "delta.columnMapping.mode"
	// DataSkippingNumIndexedColsDeltaConfigKey represents the Delta configuration to specify the number of columns for which to collect stats.
	DataSkippingNumIndexedColsDeltaConfigKey ConfigKey = "delta.dataSkippingNumIndexedCols"
	// DeletedFileRetentionDurationDeltaConfigKey represents the Delta configuration to specify the retention duration of a deleted file.
	DeletedFileRetentionDurationDeltaConfigKey ConfigKey = "delta.deletedFileRetentionDuration"
	// EnableChangeDataFeedDeltaConfigKey represents the Delta configuration to specify whether change data feed needs to be enabled.
	EnableChangeDataFeedDeltaConfigKey ConfigKey = "delta.enableChangeDataFeed"
	// IsolationLevelDeltaConfigKey represents the Delta configuration to specify what isolation level to use.
	IsolationLevelDeltaConfigKey ConfigKey = "delta.isolationLevel"
	// LogRetentionDurationDeltaConfigKey represents the Delta configuration to specify the retention duration of commit logs.
	LogRetentionDurationDeltaConfigKey ConfigKey = "delta.logRetentionDuration"
	// EnableExpiredLogCleanupDeltaConfigKey represents the Delta configuration to specify whether expired commit logs need be cleaned up.
	EnableExpiredLogCleanupDeltaConfigKey ConfigKey = "delta.enableExpiredLogCleanup"
	// MinReaderVersionDeltaConfigKey represents the Delta configuration to specify the minimum reader version.
	MinReaderVersionDeltaConfigKey ConfigKey = "delta.minReaderVersion"
	// MinWriterVersionDeltaConfigKey represents the Delta configuration to specify the minimum writer version.
	MinWriterVersionDeltaConfigKey ConfigKey = "delta.minWriterVersion"
	// RandomizeFilePrefixesDeltaConfigKey represents the Delta configuration to specify whether file prefixes should be randomized.
	RandomizeFilePrefixesDeltaConfigKey ConfigKey = "delta.randomizeFilePrefixes"
	// RandomPrefixLengthDeltaConfigKey represents the Delta configuration to specify the number of characters generated for random prefixes.
	RandomPrefixLengthDeltaConfigKey ConfigKey = "delta.randomPrefixLength"
	// SetTransactionRetentionDurationDeltaConfigKey represents the Delta configuration to specify the retention duration of a transaction.
	SetTransactionRetentionDurationDeltaConfigKey ConfigKey = "delta.setTransactionRetentionDuration"
	// TargetFileSizeDeltaConfigKey represents the Delta configuration to specify the target size of a file.
	TargetFileSizeDeltaConfigKey ConfigKey = "delta.targetFileSize"
	// TuneFileSizesForRewritesDeltaConfigKey represents the Delta configuration to specify whether file sizes need to be tuned for rewrites.
	TuneFileSizesForRewritesDeltaConfigKey ConfigKey = "delta.tuneFileSizesForRewrites"
)

type Create

type Create struct {
	/// The save mode used during the create.
	Mode SaveMode `json:"mode"`
	/// The storage location of the new table
	Location string `json:"location"`
	/// The min reader and writer protocol versions of the table
	Protocol Protocol
	/// Metadata associated with the new table
	MetaData TableMetaData
}

Create represents a Delta `Create` operation. It would usually only create the table; if data is also written, a `Write` operation is more appropriate.

func (Create) GetCommitInfo

func (op Create) GetCommitInfo() CommitInfo

GetCommitInfo retrieves commit info.

type Format

type Format struct {
	/// Name of the encoding for files in this table.
	// Default: "parquet"
	Provider string `json:"provider" parquet:"name=provider, repetition=OPTIONAL, converted=UTF8"`
	/// A map containing configuration options for the format.
	// Default: {}
	Options map[string]string `json:"options" parquet:"name=options, repetition=OPTIONAL, keyconverted=UTF8, valueconverted=UTF8"`
}

Format describes the data format of files in the table. https://github.com/delta-io/delta/blob/master/PROTOCOL.md#format-specification

func (*Format) Default

func (format *Format) Default() Format

Default provides the correct default format options as of Delta Lake 0.3.0 (https://github.com/delta-io/delta/blob/master/PROTOCOL.md#format-specification). As of Delta Lake 0.3.0, user-facing APIs only allow the creation of tables where format = 'parquet' and options = {}.

type GUID

type GUID string

GUID is a type alias for a string expected to match a GUID/UUID format.

type MetaData

type MetaData struct {
	/// Unique identifier for this table
	ID uuid.UUID `json:"id" parquet:"-"`
	/// Parquet library cannot import to UUID
	IDAsString string `json:"-" parquet:"name=id, repetition=OPTIONAL, converted=UTF8"`
	/// User-provided identifier for this table
	Name *string `json:"name" parquet:"name=name, repetition=OPTIONAL, converted=UTF8"`
	/// User-provided description for this table
	Description *string `json:"description" parquet:"name=description, repetition=OPTIONAL, converted=UTF8"`
	/// Specification of the encoding for the files stored in the table
	Format Format `json:"format" parquet:"name=format, repetition=OPTIONAL"`
	/// Schema of the table
	SchemaString string `json:"schemaString" parquet:"name=schemaString, repetition=OPTIONAL, converted=UTF8"`
	/// An array containing the names of columns by which the data should be partitioned
	PartitionColumns []string `json:"partitionColumns" parquet:"name=partitionColumns, repetition=OPTIONAL, valueconverted=UTF8"`
	/// A map containing configuration options for the table
	Configuration map[string]string `json:"configuration" parquet:"name=configuration, repetition=OPTIONAL, keyconverted=UTF8, valueconverted=UTF8"`
	/// The time when this metadata action is created, in milliseconds since the Unix epoch
	CreatedTime *int64 `json:"createdTime" parquet:"name=createdTime, repetition=OPTIONAL"`
}

MetaData represents the action that describes the metadata of the table. This is a top-level action in Delta log entries.

func (*MetaData) GetSchema

func (md *MetaData) GetSchema() (Schema, error)

GetSchema returns the table schema from the embedded schema string contained within the metadata action.

type OnDiskTableState

type OnDiskTableState struct {
	// contains filtered or unexported fields
}

OnDiskTableState contains information about the table state that is stored on disk instead of in memory

type Operation

type Operation interface {
	GetCommitInfo() CommitInfo
}

Operation represents the operation performed when creating a new log entry with one or more actions.

type OptimizeCheckpointConfiguration

type OptimizeCheckpointConfiguration struct {
	// Use an intermediate on-disk storage location to reduce memory
	OnDiskOptimization bool
	WorkingStore       storage.ObjectStore
	WorkingFolder      storage.Path
	// If these are > 1, checkpoint read and write operations will use this many goroutines
	ConcurrentCheckpointRead  int
	ConcurrentCheckpointWrite int
}

OptimizeCheckpointConfiguration holds settings for optimizing checkpoint read and write operations

func NewOptimizeCheckpointConfiguration

func NewOptimizeCheckpointConfiguration(store storage.ObjectStore, version int64) (*OptimizeCheckpointConfiguration, error)

NewOptimizeCheckpointConfiguration returns a default enabled optimization configuration, with a working folder in the table store's _delta_log/.tmp/ folder and no concurrency enabled.

type OutputMode

type OutputMode string

OutputMode represents the output mode used in streaming operations.

const (
	// AppendOutputMode causes only new rows to be written when new data is available.
	AppendOutputMode OutputMode = "Append"
	// Complete causes the full output (all rows) to be written whenever new data is available.
	Complete OutputMode = "Complete"
	// Update causes only rows with updates to be written when new or changed data is available.
	Update OutputMode = "Update"
)

type PreparedCommit

type PreparedCommit struct {
	URI storage.Path
}

PreparedCommit holds the URI of a temp commit.

type Protocol

type Protocol struct {
	/// Minimum version of the Delta read protocol a client must implement to correctly read the
	/// table.
	MinReaderVersion int32 `json:"minReaderVersion" parquet:"name=minReaderVersion, repetition=OPTIONAL"`
	/// Minimum version of the Delta write protocol a client must implement to correctly write the
	/// table.
	MinWriterVersion int32 `json:"minWriterVersion" parquet:"name=minWriterVersion, repetition=OPTIONAL"`
}

Protocol represents the action used to increase the version of the Delta protocol required to read or write to the table.

func (*Protocol) Default

func (protocol *Protocol) Default() Protocol

Default sets the minimum reader and writer version to 1.

type Remove

type Remove struct {
	/// The path of the file that is removed from the table.
	Path string `json:"path" parquet:"name=path, repetition=OPTIONAL, converted=UTF8"`
	/// The timestamp when the remove was added to table state.
	DeletionTimestamp *int64 `json:"deletionTimestamp" parquet:"name=deletionTimestamp, repetition=OPTIONAL"`
	/// Whether data is changed by the remove. A table optimize will report this as false for
	/// example, since it adds and removes files by combining many files into one.
	DataChange bool `json:"dataChange" parquet:"name=dataChange, repetition=OPTIONAL"`
	/// When true the fields partitionValues, size, and tags are present
	///
	/// NOTE: Although this is defined as required in the Scala Delta implementation, some writers
	/// still leave it null, so it is kept optional for compatibility.
	ExtendedFileMetadata bool `json:"extendedFileMetadata" parquet:"name=extendedFileMetadata, repetition=OPTIONAL"`
	/// A map from partition column to value for this file.
	PartitionValues *map[string]string `json:"partitionValues" parquet:"name=partitionValues, repetition=OPTIONAL, keyconverted=UTF8, valueconverted=UTF8"`
	/// Size of this file in bytes
	Size *int64 `json:"size" parquet:"name=size, repetition=OPTIONAL"`
	/// Map containing metadata about this file
	Tags *map[string]string `json:"tags" parquet:"-"`
}

Remove represents a tombstone (deleted file) in the Delta log. This is a top-level action in Delta log entries.

type SaveMode

type SaveMode string

SaveMode represents the save mode used when performing an Operation.

const (
	// Append causes files to be appended to the target location.
	Append SaveMode = "Append"
	// Overwrite causes a target location to be overwritten.
	Overwrite SaveMode = "Overwrite"
	// ErrorIfExists causes an operation to fail if files exist for the target.
	ErrorIfExists SaveMode = "ErrorIfExists"
	// Ignore causes an operation to not proceed or change any data if files exist for the target.
	Ignore SaveMode = "Ignore"
)

type Schema

type Schema = SchemaTypeStruct

Schema represents the schema of the Delta table.

type SchemaDataType

type SchemaDataType interface{}

SchemaDataType is one of: SchemaDataTypeName | SchemaTypeArray | SchemaTypeMap | SchemaTypeStruct. We can't use a union constraint because the type is recursive.

type SchemaDataTypeName

type SchemaDataTypeName string

SchemaDataTypeName contains the string name of a schema data type.

const (
	// String is the schema data type representing a string.
	String SchemaDataTypeName = "string" //  * string: utf8
	// Long is the schema data type representing a long.
	Long SchemaDataTypeName = "long" //  * long  // undocumented, i64?
	// Integer is the schema data type representing an integer.
	Integer SchemaDataTypeName = "integer" //  * integer: i32
	// Short is the schema data type representing a short.
	Short SchemaDataTypeName = "short" //  * short: i16
	// Byte is the schema data type representing a byte.
	Byte SchemaDataTypeName = "byte" //  * byte: i8
	// Float is the schema data type representing a float.
	Float SchemaDataTypeName = "float" //  * float: f32
	// Double is the schema data type representing a double.
	Double SchemaDataTypeName = "double" //  * double: f64
	// Boolean is the schema data type representing a boolean.
	Boolean SchemaDataTypeName = "boolean" //  * boolean: bool
	// Binary is the schema data type representing a binary.
	Binary SchemaDataTypeName = "binary" //  * binary: a sequence of binary data
	// Date is the schema data type representing a date.
	Date SchemaDataTypeName = "date" //  * date: A calendar date, represented as a year-month-day triple without a timezone
	// Timestamp is the schema data type representing a timestamp.
	Timestamp SchemaDataTypeName = "timestamp" //  * timestamp: Microsecond precision timestamp without a timezone
	// Struct is the schema data type representing a struct.
	Struct SchemaDataTypeName = "struct" //  * struct:
	// Array is the schema data type representing an array.
	Array SchemaDataTypeName = "array" //  * array:
	// Map is the schema data type representing a map.
	Map SchemaDataTypeName = "map" //  * map:
	// Unknown is the schema data type representing an unknown.
	Unknown SchemaDataTypeName = "unknown"
)

type SchemaField

type SchemaField struct {
	// Name of this (possibly nested) column
	Name string         `json:"name"`
	Type SchemaDataType `json:"type"`
	// Boolean denoting whether this field can be null
	Nullable bool `json:"nullable"`
	// A JSON map containing information about this column. Keys prefixed with Delta are reserved
	// for the implementation.
	Metadata map[string]any `json:"metadata"`
}

SchemaField describes a specific field of the Delta table schema.

func (*SchemaField) UnmarshalJSON

func (s *SchemaField) UnmarshalJSON(b []byte) error

UnmarshalJSON unmarshals a JSON object into a schema field.

type SchemaTypeArray

type SchemaTypeArray struct {
	Type         SchemaDataTypeName `json:"type"` // Has to be "array"
	ElementType  SchemaDataType     `json:"elementType"`
	ContainsNull bool               `json:"containsNull"`
}

SchemaTypeArray represents an array field

type SchemaTypeMap

type SchemaTypeMap struct {
	Type              SchemaDataTypeName `json:"type"` // Has to be "map"
	KeyType           SchemaDataType     `json:"keyType"`
	ValueType         SchemaDataType     `json:"valueType"`
	ValueContainsNull bool               `json:"valueContainsNull"`
}

SchemaTypeMap represents a map field

type SchemaTypeStruct

type SchemaTypeStruct struct {
	Type   SchemaDataTypeName `json:"type"` // Has to be "struct"
	Fields []SchemaField      `json:"fields"`
}

SchemaTypeStruct represents a struct in the schema

func GetSchema

func GetSchema(i any) SchemaTypeStruct

GetSchema recursively walks over the given struct interface i and extracts SchemaTypeStruct StructFields using reflection. TODO: Handle error cases where types are not compatible with Spark types (https://github.com/delta-io/delta/blob/master/PROTOCOL.md#schema-serialization-format), e.g. a `Value int` field is not currently readable with spark.read.format("delta").load("...").
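
A sketch of how this might stand in for the getSchema() placeholder in the README example; how field names and nullability are derived from the struct fields is not shown here and may require struct tags:

	type row struct {
		ID   int64
		Name string
	}
	schema := delta.GetSchema(row{}) // walks the struct via reflection
	metadata := delta.NewTableMetaData("Test Table", "test description",
		new(delta.Format).Default(), schema, []string{}, make(map[string]string))
	_ = metadata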

func (*SchemaTypeStruct) JSON

func (s *SchemaTypeStruct) JSON() []byte

JSON marshals a struct type field in a schema into a JSON object.

type Stats

type Stats struct {
	NumRecords  int64            `json:"numRecords" parquet:"name=numRecords, repetition=OPTIONAL"`
	TightBounds bool             `json:"tightBounds" parquet:"name=tightBounds, repetition=OPTIONAL"`
	MinValues   map[string]any   `json:"minValues" parquet:"name=minValues, repetition=OPTIONAL, keyconverted=UTF8"`
	MaxValues   map[string]any   `json:"maxValues" parquet:"name=maxValues, repetition=OPTIONAL, keyconverted=UTF8"`
	NullCount   map[string]int64 `json:"nullCount" parquet:"name=nullCount, repetition=OPTIONAL, keyconverted=UTF8, valuetype=INT64"`
}

Stats contains statistics about a Parquet file in an Add action

func StatsFromJSON

func StatsFromJSON(b []byte) (*Stats, error)

StatsFromJSON parses JSON into a Stats object

func StatsFromParquet

func StatsFromParquet(store storage.ObjectStore, add *Add) (*Stats, []string, error)

StatsFromParquet retrieves stats directly from the Parquet file in the Add action. It does not currently support nested types, or logical types that can't be generated in Spark (UUID, interval, JSON, BSON). It also will not return stats for timestamps stored in int96 columns, because the Parquet file won't have those stats.
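
A sketch of filling in Add.Stats from the file itself rather than computing stats by hand; add is an Add action pointing at a parquet file already in the store:

	stats, missingColumns, err := delta.StatsFromParquet(store, &add)
	if err != nil {
		// handle the error
	}
	if len(missingColumns) > 0 {
		// stats were unavailable for these columns (e.g. nested or int96 timestamp columns)
	}
	add.Stats = string(stats.JSON())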

func (*Stats) JSON

func (s *Stats) JSON() []byte

JSON converts the stats into JSON

type StatsDecimal

type StatsDecimal string

StatsDecimal allows us to store decimal stats as a string and write to JSON without quotes

func (StatsDecimal) MarshalJSON

func (sd StatsDecimal) MarshalJSON() ([]byte, error)

MarshalJSON writes the decimal string without surrounding quotes

type StreamingUpdate

type StreamingUpdate struct {
	/// The output mode the streaming writer is using.
	OutputMode OutputMode
	/// The query id of the streaming writer.
	QueryID string
	/// The epoch id of the written micro-batch.
	EpochID int64
}

StreamingUpdate represents a Delta `StreamingUpdate` operation.

type Table

type Table struct {
	// The state of the table as of the most recent loaded Delta log entry.
	State TableState
	// The remote store of the state of the table as of the most recent loaded Delta log entry.
	StateStore state.Store
	// object store to access log and data files
	Store storage.ObjectStore
	// Locking client to ensure optimistic locked commits from distributed workers
	LockClient lock.Locker
	// file metadata for latest checkpoint
	LastCheckPoint *CheckPoint
	// table versions associated with timestamps
	VersionTimestamp map[int64]time.Time
	// Log store which provides multi-cluster write support
	LogStore logstore.LogStore
}

Table represents a Delta table.

func NewTable

func NewTable(store storage.ObjectStore, lock lock.Locker, stateStore state.Store) *Table

NewTable creates a new Table struct without loading any data from backing storage.

NOTE: This is for advanced users. If you don't know why you need to use this method, please call one of the `OpenTable` helper functions instead.

func NewTableWithLogStore

func NewTableWithLogStore(store storage.ObjectStore, lock lock.Locker, logStore logstore.LogStore) *Table

NewTableWithLogStore creates a new Table instance with a log store configured.

func OpenTable

func OpenTable(store storage.ObjectStore, lock lock.Locker, stateStore state.Store) (*Table, error)

OpenTable loads the latest version of the table. If the table reader or writer version is greater than the client supports, the table will still be opened, but an error will also be returned.
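
A minimal sketch, reusing store, lock, and state from the Usage section and assuming the returned error wraps the ErrUnsupportedReaderVersion/ErrUnsupportedWriterVersion sentinels:

	table, err := delta.OpenTable(store, lock, state)
	if errors.Is(err, delta.ErrUnsupportedReaderVersion) || errors.Is(err, delta.ErrUnsupportedWriterVersion) {
		// the table is still opened, but this client may not support all of its features
	} else if err != nil {
		// handle other errors
	}
	fmt.Println(table.State.Version) // latest loaded version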

func OpenTableWithConfiguration

func OpenTableWithConfiguration(store storage.ObjectStore, lock lock.Locker, stateStore state.Store, config *OptimizeCheckpointConfiguration) (*Table, error)

OpenTableWithConfiguration loads the latest version of the table, using the given configuration for optimization settings

func OpenTableWithVersion

func OpenTableWithVersion(store storage.ObjectStore, lock lock.Locker, stateStore state.Store, version int64) (*Table, error)

OpenTableWithVersion loads the table at this specific version. If the table reader or writer version is greater than the client supports, the table will still be opened, but an error will also be returned.

func OpenTableWithVersionAndConfiguration

func OpenTableWithVersionAndConfiguration(store storage.ObjectStore, lock lock.Locker, stateStore state.Store, version int64, config *OptimizeCheckpointConfiguration) (*Table, error)

OpenTableWithVersionAndConfiguration loads the table at this specific version using the given configuration for optimization settings

func (*Table) Create

func (t *Table) Create(metadata TableMetaData, protocol Protocol, commitInfo CommitInfo, addActions []Add) error

Create creates a Table with version 0 given the provided MetaData, Protocol, and CommitInfo. Note that if the protocol MinReaderVersion or MinWriterVersion is too high, the table will be created and then an error will be returned

func (*Table) CreateCheckpoint

func (t *Table) CreateCheckpoint(checkpointLock lock.Locker, checkpointConfiguration *CheckpointConfiguration, version int64) (bool, error)

CreateCheckpoint creates a checkpoint for this table at the given version. The existing table state will not be used or modified; a new table instance will be opened at the checkpoint version. It returns whether the checkpoint was created, along with any error. If the lock cannot be obtained, it does not retry.

func (*Table) CreateTransaction

func (t *Table) CreateTransaction(opts TransactionOptions) *Transaction

CreateTransaction creates a new Transaction for the Table. The transaction holds a mutable reference to the Table, preventing other references until the transaction is dropped.

func (*Table) Exists

func (t *Table) Exists() (bool, error)

Exists checks if a Table with version 0 exists in the object store.

func (*Table) GetCheckpointDataPaths

func (t *Table) GetCheckpointDataPaths(checkpoint *CheckPoint) []storage.Path

GetCheckpointDataPaths returns the expected file path(s) for the given checkpoint's Parquet files. If it is a multi-part checkpoint, there will be one path for each part.

func (*Table) LatestVersion

func (t *Table) LatestVersion() (int64, error)

LatestVersion gets the latest version of a table.

func (*Table) Load

func (t *Table) Load(config *OptimizeCheckpointConfiguration) error

Load loads the table state using the given configuration

func (*Table) LoadVersion

func (t *Table) LoadVersion(version *int64) error

LoadVersion loads the table state at the specified version using default configuration options

func (*Table) LoadVersionWithConfiguration

func (t *Table) LoadVersionWithConfiguration(version *int64, config *OptimizeCheckpointConfiguration) error

LoadVersionWithConfiguration loads the table state at the specified version using the given configuration

func (*Table) ReadCommitVersion

func (t *Table) ReadCommitVersion(version int64) ([]Action, error)

ReadCommitVersion retrieves the actions from a commit log.
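
A sketch of inspecting the actions in a commit; the concrete pointer types used in the type switch are an assumption here:

	actions, err := table.ReadCommitVersion(0)
	if err != nil {
		// handle the error
	}
	for _, action := range actions {
		switch a := action.(type) {
		case *delta.Add:
			fmt.Println("add:", a.Path)
		case *delta.Remove:
			fmt.Println("remove:", a.Path)
		case *delta.MetaData:
			fmt.Println("schema:", a.SchemaString)
		}
	}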

type TableMetaData

type TableMetaData struct {
	// Unique identifier for this table
	ID uuid.UUID
	/// User-provided identifier for this table
	Name string
	/// User-provided description for this table
	Description string
	/// Specification of the encoding for the files stored in the table
	Format Format
	/// Schema of the table
	Schema Schema
	/// An array containing the names of columns by which the data should be partitioned
	PartitionColumns []string
	/// The time when this metadata action is created, in milliseconds since the Unix epoch
	CreatedTime time.Time
	/// table properties
	Configuration map[string]string
}

TableMetaData represents the metadata of a Delta table.

func NewTableMetaData

func NewTableMetaData(name string, description string, format Format, schema Schema, partitionColumns []string, configuration map[string]string) *TableMetaData

NewTableMetaData creates a new TableMetaData instance.

type TableState

type TableState struct {
	// current table version represented by this table state
	Version int64
	// A remove action should remain in the state of the table as a tombstone until it has expired.
	// A tombstone expires when the creation timestamp of the Delta file exceeds the expiration threshold.
	// This is empty if on-disk optimization is enabled
	Tombstones map[string]Remove
	// Active files for table state
	// This is empty if on-disk optimization is enabled
	Files map[string]Add
	// Information added to individual commits
	CommitInfos           []CommitInfo
	AppTransactionVersion map[string]int64
	MinReaderVersion      int32
	MinWriterVersion      int32
	// Table metadata corresponding to current version
	CurrentMetadata *TableMetaData
	// Retention period for tombstones as time.Duration (nanoseconds)
	TombstoneRetention time.Duration
	// Retention period for log entries as time.Duration (nanoseconds)
	LogRetention time.Duration
	// Expired log cleanup has not been thoroughly tested, so marking as experimental
	ExperimentalEnableExpiredLogCleanup bool

	OnDiskTableState
	// contains filtered or unexported fields
}

TableState maintains the current known state of a table. It is used when reading and generating checkpoints. If on-disk optimization is enabled, some of the information here is empty, as the state is offloaded to disk to reduce memory use.

func NewTableState

func NewTableState(version int64) *TableState

NewTableState creates an empty table state for the given version

func NewTableStateFromActions

func NewTableStateFromActions(actions []Action, version int64) (*TableState, error)

NewTableStateFromActions generates table state from a list of actions

func NewTableStateFromCommit

func NewTableStateFromCommit(table *Table, version int64) (*TableState, error)

NewTableStateFromCommit reads a specific commit version and returns the contained TableState

func (*TableState) FileCount

func (tableState *TableState) FileCount() int

FileCount returns the total number of Parquet files making up the table at the loaded version

func (*TableState) TombstoneCount

func (tableState *TableState) TombstoneCount() int

TombstoneCount returns the total number of tombstones (logically but not physically deleted files) in the table at the loaded version

type Transaction

type Transaction struct {
	Table       *Table
	Actions     []Action
	Operation   Operation
	AppMetadata map[string]any
	// contains filtered or unexported fields
}

Transaction represents a Delta transaction. Clients that do not need to mutate action content in case a Transaction conflict is encountered may use the `Commit` method and rely on optimistic concurrency to determine the appropriate Delta version number for a commit. A good example of this type of client is an append-only client that does not need to maintain Transaction state with external systems. Clients that may need to do conflict resolution if the Delta version changes should use the `prepare_commit` and `try_commit_transaction` methods and manage the Delta version themselves, so that they can resolve data conflicts that may occur between Delta versions.

Please note that in the case of a non-retryable error, a temporary commit file such as `_delta_log/_commit_<uuid>.json` will be orphaned in storage.

func (*Transaction) AddAction

func (t *Transaction) AddAction(action Action)

AddAction adds an arbitrary "action" to the actions associated with this transaction.

func (*Transaction) AddActions

func (t *Transaction) AddActions(actions []Action)

AddActions adds an arbitrary number of actions to the actions associated with this transaction.

func (*Transaction) Commit

func (t *Transaction) Commit() (int64, error)

Commit commits the given actions to the Delta log. This method will retry the transaction commit based on the value of `MaxRetryCommitAttempts` set in `TransactionOptions`.
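
For example, the defaults can be adjusted before creating the transaction; the specific values here are illustrative:

	opts := delta.NewTransactionOptions()
	opts.MaxRetryCommitAttempts = 3
	opts.RetryWaitDuration = 100 * time.Millisecond

	transaction := table.CreateTransaction(opts)
	transaction.AddAction(add)
	version, err := transaction.Commit()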

func (*Transaction) CommitLogStore

func (t *Transaction) CommitLogStore() (int64, error)

CommitLogStore writes actions to a file.

To commit for Delta version N:

  • Step 0: Fail if N.json already exists in the file system.
  • Step 1: Ensure that N-1.json exists. If not, perform a recovery.
  • Step 2: PREPARE the commit: write the actions into temp file T(N), then write an uncompleted commit entry E(N, T(N)) with mutual exclusion to the log store.
  • Step 3: COMMIT the commit to the Delta log: copy T(N) into N.json.
  • Step 4: ACKNOWLEDGE the commit: overwrite and complete commit entry E in the log store.
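
A minimal sketch of committing through a log store (which provides multi-cluster write support); the log store construction is elided here because it depends on the backend (e.g. the dynamodblogstore subpackage):

	var logStore logstore.LogStore // backend-specific construction elided

	table := delta.NewTableWithLogStore(store, lock, logStore)
	transaction := table.CreateTransaction(delta.NewTransactionOptions())
	transaction.AddAction(add)
	version, err := transaction.CommitLogStore()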

func (*Transaction) ReadActions

func (t *Transaction) ReadActions(path storage.Path) ([]Action, error)

ReadActions gets actions from a file.

With many concurrent readers/writers, there's a chance that concurrent recovery operations occur on the same file, i.e. the same temp file T(N) is copied into the target N.json file more than once. Though data loss will *NOT* occur, readers of N.json may receive an error from S3 as the ETag of N.json was changed. This is safe to retry, so we do so here.

func (*Transaction) SetAppMetadata

func (t *Transaction) SetAppMetadata(appMetadata map[string]any)

SetAppMetadata sets the app metadata for this transaction.

func (*Transaction) SetOperation

func (t *Transaction) SetOperation(operation Operation)

SetOperation sets the Delta operation for this transaction.

type TransactionOptions

type TransactionOptions struct {
	// number of retry attempts allowed when committing a transaction
	MaxRetryCommitAttempts uint32
	// RetryWaitDuration sets the amount of time to wait between retries of the transaction
	RetryWaitDuration time.Duration
	// Number of retry attempts allowed when reading actions from a log entry
	MaxRetryReadAttempts uint16
	// Number of retry attempts allowed when writing actions to a log entry
	MaxRetryWriteAttempts uint32
	// number of retry commit attempts before loading the latest version from the table rather
	// than using the state store
	RetryCommitAttemptsBeforeLoadingTable uint32
	// Number of retry attempts allowed when fixing the Delta log
	MaxRetryLogFixAttempts uint16
}

TransactionOptions customizes the behavior of a transaction.

func NewTransactionOptions

func NewTransactionOptions() TransactionOptions

NewTransactionOptions returns the default transaction options.

type Txn

type Txn struct {
	/// A unique identifier for the application performing the transaction.
	AppID string `json:"appId" parquet:"name=appId, repetition=OPTIONAL, converted=UTF8"`
	/// An application-specific numeric identifier for this transaction.
	Version int64 `json:"version" parquet:"name=version, repetition=OPTIONAL"`
	/// The time when this transaction action was created in milliseconds since the Unix epoch.
	LastUpdated *int64 `json:"-" parquet:"name=lastUpdated, repetition=OPTIONAL"`
}

Txn represents the action used by streaming systems to track progress using application-specific versions to enable idempotency.

type Write

type Write struct {
	/// The save mode used during the write.
	Mode SaveMode `json:"mode"`
	/// The columns the write is partitioned by.
	PartitionBy []string `json:"partitionBy"`
	/// The predicate used during the write.
	Predicate []string `json:"predicate"`
}

Write represents a Delta `Write` operation. Write operations will typically only include `Add` actions.

func (Write) GetCommitInfo

func (op Write) GetCommitInfo() CommitInfo

GetCommitInfo retrieves commit info.

Directories

Path Synopsis
examples
internal
dynamodbutils
Package dynamodbutils implements utilities used to interact with DynamoDB.
s3utils
Package s3utils implements utilities used to interact with S3.
lock
Package lock contains the resources required to create a lock.
dynamolock
Package dynamolock contains the resources required to create a DynamoDB lock.
filelock
Package filelock provides the resources required to create a file lock.
nillock
Package nillock contains the resources required to create a nil lock.
redislock
Package redislock contains the resources required to create a Redis lock.
logstore
Package logstore contains the resources required to create a log store.
dynamodblogstore
Package dynamodblogstore contains the resources required to create a DynamoDB log store.
state
Package state contains the resources required to create a state store.
dynamostate
Package dynamostate contains the resources required to create a DynamoDB state store.
filestate
Package filestate contains the resources required to create a file state store.
localstate
Package localstate contains the resources required to create a local state store.
redisstate
Package redisstate contains the resources required to create a Redis state store.
storage
Package storage contains the resources required to interact with an object store.
filestore
Package filestore contains the resources required to interact with a file store.
s3store
Package s3store contains the resources required to interact with an S3 store.
