destination

package
v1.45.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 12, 2023 License: MPL-2.0 Imports: 18 Imported by: 4

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func PluginTestSuiteRunner

func PluginTestSuiteRunner(t *testing.T, newPlugin NewPluginFunc, destSpec specs.Destination, tests PluginTestSuiteTests)

func SetDestinationManagedCqColumns

func SetDestinationManagedCqColumns(tables []*schema.Table)

SetDestinationManagedCqColumns overwrites or adds the CQ columns that are managed by destination plugins (_cq_sync_time, _cq_source_name).

Types

type Client

type Client interface {
	schema.CQTypeTransformer
	ReverseTransformValues(table *schema.Table, values []any) (schema.CQTypes, error)
	Migrate(ctx context.Context, tables schema.Tables) error
	Read(ctx context.Context, table *schema.Table, sourceName string, res chan<- []any) error
	ManagedWriter
	UnmanagedWriter
	DeleteStale(ctx context.Context, tables schema.Tables, sourceName string, syncTime time.Time) error
	Close(ctx context.Context) error
}

type ClientResource

type ClientResource struct {
	TableName string
	Data      []any
}

type DefaultReverseTransformer

type DefaultReverseTransformer struct {
}

func (*DefaultReverseTransformer) ReverseTransformValues

func (*DefaultReverseTransformer) ReverseTransformValues(table *schema.Table, values []any) (schema.CQTypes, error)

ReverseTransformValues makes a best-effort attempt to convert a slice of values to CQTypes based on the provided table's columns.

type ManagedWriter added in v1.13.0

type ManagedWriter interface {
	WriteTableBatch(ctx context.Context, table *schema.Table, data [][]any) error
}

type Metrics

type Metrics struct {
	// Errors number of errors / failed writes
	Errors uint64
	// Writes number of successful writes
	Writes uint64
}

type MigrateStrategy added in v1.38.0

type MigrateStrategy struct {
	AddColumn           specs.MigrateMode
	AddColumnNotNull    specs.MigrateMode
	RemoveColumn        specs.MigrateMode
	RemoveColumnNotNull specs.MigrateMode
	ChangeColumn        specs.MigrateMode
}

MigrateStrategy defines which migrate tests we should include, with one migrate mode per kind of column change.

type NewClientFunc

type NewClientFunc func(context.Context, zerolog.Logger, specs.Destination) (Client, error)

type NewPluginFunc added in v1.36.3

type NewPluginFunc func() *Plugin

type Option added in v1.13.0

type Option func(*Plugin)

func WithBatchTimeout added in v1.13.0

func WithBatchTimeout(seconds int) Option

func WithDefaultBatchSize added in v1.16.0

func WithDefaultBatchSize(defaultBatchSize int) Option

func WithDefaultBatchSizeBytes added in v1.23.0

func WithDefaultBatchSizeBytes(defaultBatchSizeBytes int) Option

func WithManagedWriter added in v1.13.1

func WithManagedWriter() Option

type Plugin

type Plugin struct {
	// contains filtered or unexported fields
}

func NewPlugin added in v1.12.7

func NewPlugin(name string, version string, newClientFunc NewClientFunc, opts ...Option) *Plugin

NewPlugin creates a new destination plugin

func (*Plugin) Close

func (p *Plugin) Close(ctx context.Context) error

func (*Plugin) DeleteStale

func (p *Plugin) DeleteStale(ctx context.Context, tables schema.Tables, sourceName string, syncTime time.Time) error

func (*Plugin) Init

func (p *Plugin) Init(ctx context.Context, logger zerolog.Logger, spec specs.Destination) error

Init is separate from NewPlugin because we need lazy loading: we want to be able to initialize the plugin after it has been created.

func (*Plugin) Metrics

func (p *Plugin) Metrics() Metrics

func (*Plugin) Migrate

func (p *Plugin) Migrate(ctx context.Context, tables schema.Tables) error

We implement all destination Client functions on Plugin so that we can hook into pre- and post-call behavior.

func (*Plugin) Name

func (p *Plugin) Name() string

func (*Plugin) Read

func (p *Plugin) Read(ctx context.Context, table *schema.Table, sourceName string, res chan<- schema.CQTypes) error

func (*Plugin) Version

func (p *Plugin) Version() string

func (*Plugin) Write

func (p *Plugin) Write(ctx context.Context, sourceSpec specs.Source, tables schema.Tables, syncTime time.Time, res <-chan schema.DestinationResource) error

type PluginTestSuite added in v1.13.0

type PluginTestSuite struct {
	// contains filtered or unexported fields
}

type PluginTestSuiteTests added in v1.13.0

type PluginTestSuiteTests struct {
	// SkipOverwrite skips testing for "overwrite" mode. Use if the destination
	// plugin doesn't support this feature.
	SkipOverwrite bool

	// SkipDeleteStale skips testing "delete-stale" mode. Use if the destination
	// plugin doesn't support this feature.
	SkipDeleteStale bool

	// SkipAppend skips testing for "append" mode. Use if the destination
	// plugin doesn't support this feature.
	SkipAppend bool

	// SkipSecondAppend skips the second append step in the test.
	// This is useful in cases like cloud storage where you can't append to an
	// existing object after the file has been closed.
	SkipSecondAppend bool

	// SkipMigrateAppend skips a test for the migrate function where a column is added,
	// data is appended, then the column is removed and more data appended, checking that the migrations handle
	// this correctly.
	SkipMigrateAppend bool
	// SkipMigrateAppendForce skips a test for the migrate function where a column is changed in force mode
	SkipMigrateAppendForce bool

	// SkipMigrateOverwrite skips a test for the migrate function where a column is added,
	// data is appended, then the column is removed and more data overwritten, checking that the migrations handle
	// this correctly.
	SkipMigrateOverwrite bool
	// SkipMigrateOverwriteForce skips a test for the migrate function where a column is changed in force mode
	SkipMigrateOverwriteForce bool

	MigrateStrategyOverwrite MigrateStrategy
	MigrateStrategyAppend    MigrateStrategy
}

type UnimplementedManagedWriter added in v1.13.0

type UnimplementedManagedWriter struct{}

func (*UnimplementedManagedWriter) WriteTableBatch added in v1.13.0

func (*UnimplementedManagedWriter) WriteTableBatch(context.Context, *schema.Table, [][]any) error

type UnimplementedUnmanagedWriter added in v1.13.0

type UnimplementedUnmanagedWriter struct{}

func (*UnimplementedUnmanagedWriter) Metrics added in v1.13.0

func (*UnimplementedUnmanagedWriter) Write added in v1.13.0

type UnmanagedWriter added in v1.13.0

type UnmanagedWriter interface {
	Write(ctx context.Context, tables schema.Tables, res <-chan *ClientResource) error
	Metrics() Metrics
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL