destination

package
v3.10.6 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 13, 2023 License: MPL-2.0 Imports: 20 Imported by: 2

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func PluginTestSuiteRunner

func PluginTestSuiteRunner(t *testing.T, newPlugin NewPluginFunc, destSpec specs.Destination, tests PluginTestSuiteTests, testOptions ...func(o *PluginTestSuiteRunnerOptions))

func RecordDiff

func RecordDiff(l, r arrow.Record) string

func WithTestIgnoreNullsInLists added in v3.6.0

func WithTestIgnoreNullsInLists() func(o *PluginTestSuiteRunnerOptions)

func WithTestSourceAllowNull added in v3.7.0

func WithTestSourceAllowNull(allowNull func(arrow.DataType) bool) func(o *PluginTestSuiteRunnerOptions)

func WithTestSourceSkipDates added in v3.6.0

func WithTestSourceSkipDates() func(o *PluginTestSuiteRunnerOptions)

func WithTestSourceSkipDecimals added in v3.7.0

func WithTestSourceSkipDecimals() func(o *PluginTestSuiteRunnerOptions)

func WithTestSourceSkipDurations added in v3.6.0

func WithTestSourceSkipDurations() func(o *PluginTestSuiteRunnerOptions)

func WithTestSourceSkipIntervals added in v3.6.0

func WithTestSourceSkipIntervals() func(o *PluginTestSuiteRunnerOptions)

func WithTestSourceSkipLargeTypes added in v3.6.0

func WithTestSourceSkipLargeTypes() func(o *PluginTestSuiteRunnerOptions)

func WithTestSourceSkipLists added in v3.6.0

func WithTestSourceSkipLists() func(o *PluginTestSuiteRunnerOptions)

func WithTestSourceSkipMaps added in v3.6.0

func WithTestSourceSkipMaps() func(o *PluginTestSuiteRunnerOptions)

func WithTestSourceSkipStructs added in v3.6.0

func WithTestSourceSkipStructs() func(o *PluginTestSuiteRunnerOptions)

func WithTestSourceSkipTimes added in v3.6.0

func WithTestSourceSkipTimes() func(o *PluginTestSuiteRunnerOptions)

func WithTestSourceSkipTimestamps added in v3.6.0

func WithTestSourceSkipTimestamps() func(o *PluginTestSuiteRunnerOptions)

func WithTestSourceTimePrecision added in v3.6.0

func WithTestSourceTimePrecision(precision time.Duration) func(o *PluginTestSuiteRunnerOptions)

Types

type AllowNullFunc added in v3.7.0

// AllowNullFunc is a predicate over an arrow data type. It is the type of
// PluginTestSuiteRunnerOptions.AllowNull, which is documented as deciding
// whether a data type may be correctly represented as null.
type AllowNullFunc func(arrow.DataType) bool

type Client

// Client is the interface a destination client must satisfy; NewClientFunc
// returns a value of this type. It embeds ManagedWriter and UnmanagedWriter
// and adds the Migrate, Read, DeleteStale and Close methods.
type Client interface {
	// Migrate receives the set of tables.
	// NOTE(review): what migration does to the destination is implementation-defined and not visible here.
	Migrate(ctx context.Context, tables schema.Tables) error
	// Read takes a table and a source name. res is a send-only channel, so
	// results can only be delivered by sending arrow.Record values on it.
	Read(ctx context.Context, table *schema.Table, sourceName string, res chan<- arrow.Record) error
	// ManagedWriter contributes WriteTableBatch.
	ManagedWriter
	// UnmanagedWriter contributes Write and Metrics.
	UnmanagedWriter
	// DeleteStale takes the tables, a source name and a sync time.
	// NOTE(review): presumably removes data for sourceName that predates syncTime — confirm against an implementation.
	DeleteStale(ctx context.Context, tables schema.Tables, sourceName string, syncTime time.Time) error
	// Close takes a context and reports failure through the returned error.
	Close(ctx context.Context) error
}

type ClientResource

// ClientResource pairs a table name with a slice of untyped values.
// NOTE(review): presumably Data holds the values of one resource belonging to TableName — confirm against callers.
type ClientResource struct {
	// TableName is the name of a table.
	TableName string
	// Data is a slice of arbitrary values.
	Data      []any
}

type ManagedWriter

// ManagedWriter is a single-method interface for writing a batch of records
// for one table. It is embedded in Client.
// NOTE(review): presumably this is the write path enabled by the WithManagedWriter option — confirm.
type ManagedWriter interface {
	// WriteTableBatch receives one table together with a slice of arrow records.
	WriteTableBatch(ctx context.Context, table *schema.Table, data []arrow.Record) error
}

type Metrics

// Metrics holds write counters. It is the return type of Plugin.Metrics and
// of UnmanagedWriter.Metrics.
type Metrics struct {
	// Errors is the number of errors / failed writes.
	Errors uint64
	// Writes is the number of successful writes.
	Writes uint64
}

type MigrateStrategy

// MigrateStrategy holds one specs.MigrateMode per kind of schema change.
// PluginTestSuiteTests carries two of these (MigrateStrategyOverwrite and
// MigrateStrategyAppend).
type MigrateStrategy struct {
	// AddColumn is the migrate mode for the add-column case.
	AddColumn           specs.MigrateMode
	// AddColumnNotNull is the migrate mode for the add-column case with a NOT NULL column.
	// NOTE(review): "NotNull" is read from the field name only — confirm.
	AddColumnNotNull    specs.MigrateMode
	// RemoveColumn is the migrate mode for the remove-column case.
	RemoveColumn        specs.MigrateMode
	// RemoveColumnNotNull is the migrate mode for the remove-column case with a NOT NULL column.
	// NOTE(review): "NotNull" is read from the field name only — confirm.
	RemoveColumnNotNull specs.MigrateMode
	// ChangeColumn is the migrate mode for the change-column case.
	ChangeColumn        specs.MigrateMode
}

MigrateStrategy defines which tests we should include

type NewClientFunc

// NewClientFunc is the constructor signature for a Client: it takes a
// context, a zerolog logger and the destination spec, and returns the Client
// or an error. NewPlugin accepts a value of this type.
type NewClientFunc func(context.Context, zerolog.Logger, specs.Destination) (Client, error)

type NewPluginFunc

// NewPluginFunc is a zero-argument factory returning a *Plugin. It is the
// type of the newPlugin parameter of PluginTestSuiteRunner.
type NewPluginFunc func() *Plugin

type Option

// Option is a functional option applied to a *Plugin. NewPlugin accepts any
// number of them; WithBatchTimeout, WithDefaultBatchSize,
// WithDefaultBatchSizeBytes and WithManagedWriter each return one.
type Option func(*Plugin)

func WithBatchTimeout

func WithBatchTimeout(seconds int) Option

func WithDefaultBatchSize

func WithDefaultBatchSize(defaultBatchSize int) Option

func WithDefaultBatchSizeBytes

func WithDefaultBatchSizeBytes(defaultBatchSizeBytes int) Option

func WithManagedWriter

func WithManagedWriter() Option

type Plugin

// Plugin is the destination plugin type returned by NewPlugin. It exposes no
// exported fields; it is configured through Option values and used through
// its methods (Init, Migrate, Read, Write, DeleteStale, Metrics, Close, Name,
// Version).
type Plugin struct {
	// contains filtered or unexported fields
}

func NewPlugin

func NewPlugin(name string, version string, newClientFunc NewClientFunc, opts ...Option) *Plugin

NewPlugin creates a new destination plugin

func (*Plugin) Close

func (p *Plugin) Close(ctx context.Context) error

func (*Plugin) DeleteStale

func (p *Plugin) DeleteStale(ctx context.Context, tables schema.Tables, sourceName string, syncTime time.Time) error

func (*Plugin) Init

func (p *Plugin) Init(ctx context.Context, logger zerolog.Logger, spec specs.Destination) error

We need lazy loading because we want to be able to initialize after [sentence truncated in the source documentation — TODO: complete from the original comment].

func (*Plugin) Metrics

func (p *Plugin) Metrics() Metrics

func (*Plugin) Migrate

func (p *Plugin) Migrate(ctx context.Context, tables schema.Tables) error

We implement all DestinationClient functions so that we can hook into pre- and post-call behavior.

func (*Plugin) Name

func (p *Plugin) Name() string

func (*Plugin) Read

func (p *Plugin) Read(ctx context.Context, table *schema.Table, sourceName string, res chan<- arrow.Record) error

func (*Plugin) Version

func (p *Plugin) Version() string

func (*Plugin) Write

func (p *Plugin) Write(ctx context.Context, sourceSpec specs.Source, tables schema.Tables, syncTime time.Time, res <-chan arrow.Record) error

type PluginTestSuite

// PluginTestSuite exposes no exported fields or documented methods here.
// NOTE(review): presumably the internal state used by PluginTestSuiteRunner — confirm against the implementation.
type PluginTestSuite struct {
	// contains filtered or unexported fields
}

type PluginTestSuiteRunnerOptions added in v3.6.0

// PluginTestSuiteRunnerOptions is the options struct mutated by the
// functional options passed to PluginTestSuiteRunner (each WithTest* function
// returns a func(o *PluginTestSuiteRunnerOptions)).
type PluginTestSuiteRunnerOptions struct {
	// IgnoreNullsInLists allows stripping null values from lists before comparison.
	// Destination setups that don't support nulls in lists should set this to true.
	IgnoreNullsInLists bool

	// AllowNull is a custom func to determine whether a data type may be correctly represented as null.
	// Destinations that have problems representing some data types should provide a custom implementation here.
	// If this param is empty, the default is to allow all data types to be nullable.
	// When the value returned by this func is `true` the comparison is made with the empty value instead of null.
	// NOTE(review): the sentence above reads inverted relative to the first one (a type that
	// may NOT be null is the one you would expect to compare against the empty value, i.e. a
	// `false` return) — confirm against the implementation and fix the wording if so.
	AllowNull AllowNullFunc

	// TestSourceOptions is embedded, so its fields are promoted onto this struct.
	// NOTE(review): presumably this is where the WithTestSourceSkip* / WithTestSourceTimePrecision options store their settings — confirm.
	schema.TestSourceOptions
}

type PluginTestSuiteTests

// PluginTestSuiteTests is the tests argument of PluginTestSuiteRunner. Its
// Skip* flags each turn off one test (or test step) of the suite, and its two
// MigrateStrategy fields configure the migrate tests.
type PluginTestSuiteTests struct {
	// SkipOverwrite skips testing for "overwrite" mode. Use if the destination
	// plugin doesn't support this feature.
	SkipOverwrite bool

	// SkipDeleteStale skips testing "delete-stale" mode. Use if the destination
	// plugin doesn't support this feature.
	SkipDeleteStale bool

	// SkipAppend skips testing for "append" mode. Use if the destination
	// plugin doesn't support this feature.
	SkipAppend bool

	// SkipSecondAppend skips the second append step in the test.
	// This is useful in cases like cloud storage where you can't append to an
	// existing object after the file has been closed.
	SkipSecondAppend bool

	// SkipMigrateAppend skips a test for the migrate function where a column is added,
	// data is appended, then the column is removed and more data appended, checking that the migrations handle
	// this correctly.
	SkipMigrateAppend bool
	// SkipMigrateAppendForce skips a test for the migrate function where a column is changed in force mode.
	SkipMigrateAppendForce bool

	// SkipMigrateOverwrite skips a test for the migrate function where a column is added,
	// data is appended, then the column is removed and more data overwritten, checking that the migrations handle
	// this correctly.
	SkipMigrateOverwrite bool
	// SkipMigrateOverwriteForce skips a test for the migrate function where a column is changed in force mode.
	SkipMigrateOverwriteForce bool

	// MigrateStrategyOverwrite and MigrateStrategyAppend are MigrateStrategy values,
	// which define which tests should be included.
	// NOTE(review): presumably they apply to the overwrite-mode and append-mode migrate tests respectively — confirm.
	MigrateStrategyOverwrite MigrateStrategy
	MigrateStrategyAppend    MigrateStrategy
}

type UnimplementedManagedWriter

// UnimplementedManagedWriter is an empty struct with a WriteTableBatch method.
// NOTE(review): presumably meant to be embedded by clients that do not support managed writes, so they still satisfy ManagedWriter — confirm what the method returns.
type UnimplementedManagedWriter struct{}

func (UnimplementedManagedWriter) WriteTableBatch

type UnimplementedUnmanagedWriter

// UnimplementedUnmanagedWriter is an empty struct with Write and Metrics methods.
// NOTE(review): presumably meant to be embedded by clients that do not support unmanaged writes, so they still satisfy UnmanagedWriter — confirm what the methods return.
type UnimplementedUnmanagedWriter struct{}

func (UnimplementedUnmanagedWriter) Metrics

func (UnimplementedUnmanagedWriter) Write

type UnmanagedWriter

// UnmanagedWriter is the interface for writing from a stream of records and
// reporting write metrics. It is embedded in Client.
type UnmanagedWriter interface {
	// Write takes the tables and a receive-only channel of arrow records.
	// NOTE(review): presumably the implementation consumes res until it is closed — confirm.
	Write(ctx context.Context, tables schema.Tables, res <-chan arrow.Record) error
	// Metrics returns a Metrics value (its Errors and Writes counters).
	Metrics() Metrics
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL