sdk

package module
v0.9.1
Published: Apr 24, 2024 License: Apache-2.0 Imports: 32 Imported by: 297

README

Conduit Connector SDK


This repository contains the Go software development kit for implementing a connector for Conduit. If you want to implement a connector in another language please have a look at the connector protocol.

Quickstart

Create a new folder and initialize a fresh go module:

go mod init example.com/conduit-connector-demo

Add the connector SDK dependency:

go get github.com/conduitio/conduit-connector-sdk

With this you can start implementing the connector. To implement a source (a connector that reads from a 3rd party resource and sends data to Conduit) create a struct that implements sdk.Source. To implement a destination (a connector that receives data from Conduit and writes it to a 3rd party resource) create a struct that implements sdk.Destination. You can implement both to make a connector that can be used either as a source or a destination.

Apart from the source and/or destination you should create a global variable of type sdk.Connector that contains references to constructors for sdk.Source, sdk.Destination and sdk.Specification.

The last part is the entrypoint; it needs to call sdk.Serve and pass in the connector mentioned above.

package main

import (
	demo "example.com/conduit-connector-demo"
	sdk "github.com/conduitio/conduit-connector-sdk"
)

func main() {
	sdk.Serve(demo.Connector)
}

Now you can build the standalone connector:

go build path/to/main.go

You will get a compiled binary that Conduit can use as a connector. To run your connector as part of a Conduit pipeline you can create it using the connectors API and specify the path to the compiled connector binary in the plugin field.

Here is an example request to POST /v1/connectors (find more about the Conduit API):

{
  "type": "TYPE_SOURCE",
  "plugin": "/path/to/compiled/connector/binary",
  "pipelineId": "...",
  "config": {
    "name": "my-connector",
    "settings": {
      "my-key": "my-value"
    }
  }
}

Find out more information on building a connector in the Go doc reference.

FAQ

Q: How to identify the source from which a record originated?

A connector can use whatever means available to associate a record with the originating source. However, to promote compatibility between connectors, we highly recommend that a record's metadata is used to indicate from which collection[^1] a record originated.

The metadata key to be used is opencdc.collection, which can be accessed through the sdk.MetadataCollection constant.

For example, if a record was read from a database table called employees, it should have the following in its metadata:

{
  "opencdc.collection": "employees",
  // other metadata
}
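
A source can set this field through the Metadata helpers before emitting a record. The following is a minimal sketch; the position, key and payload values are placeholders:

metadata := sdk.Metadata{}
metadata.SetCollection("employees") // sets the "opencdc.collection" key

record := sdk.Util.Source.NewRecordCreate(
	sdk.Position("employees/1"),            // hypothetical position
	metadata,
	sdk.RawData("1"),                       // key
	sdk.StructuredData{"name": "Jane Doe"}, // payload
)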

Additionally, Conduit automatically adds the following metadata to each record:

  • conduit.source.plugin.name: the source plugin that created a record
  • conduit.source.plugin.version: version of the source plugin that created this record

More information about metadata in OpenCDC records can be found here.

Q: If a destination connector is able to write to multiple tables (topics, collections, indexes, etc.), how should a record be routed to the correct destination?

Similarly to above, we recommend that the metadata key "opencdc.collection" is used.

For example, if a record has the metadata field "opencdc.collection" set to employees, then the PostgreSQL destination connector will write it to the employees table.
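
In code, a destination can look up this value with Metadata.GetCollection and fall back to a default. A minimal sketch, assuming a hypothetical defaultTable field on the destination:

func (d *Destination) tableFor(r sdk.Record) string {
	collection, err := r.Metadata.GetCollection()
	if err != nil {
		return d.defaultTable // hypothetical configured fallback
	}
	return collection
}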

Q: Is there a standard format for errors?

Conduit doesn't expect any specific error format. We still encourage developers to follow the conventional error message formatting and include enough contextual information to make debugging as easy as possible (e.g. stack trace, information about the value that caused the error, internal state).
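
For example, wrapping an error with fmt.Errorf and the %w verb keeps the original error inspectable while adding context (the client and row names below are hypothetical):

if err := d.client.Insert(ctx, row); err != nil {
	return fmt.Errorf("failed to insert row with key %q: %w", row.Key, err)
}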

Q: Is there a standard format for logging?

Developers should use sdk.Logger to retrieve a *zerolog.Logger instance. It can be used to emit structured and leveled log messages that will be included in Conduit logs.

Keep in mind that logging in the hot path (e.g. reading or writing a record) can have a negative impact on performance and should be avoided. If you really want to add a log message in the hot path please use the "trace" level.
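
For example, a trace-level message in Read keeps the hot path cheap while still allowing detailed output when the log level is lowered (rec is assumed to be the record that was just read):

logger := sdk.Logger(ctx)
logger.Trace().
	Str("position", string(rec.Position)).
	Msg("read record")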

Q: How do I enable logging in my tests?

By default, logging calls made using the sdk.Logger in your tests will not produce any output. To enable logging while running your connector tests or debugging, you need to pass a custom context with a zerolog logger attached:

func TestFoo(t *testing.T) {
	logger := zerolog.New(zerolog.NewTestWriter(t))
	ctx := logger.WithContext(context.Background())

	// pass ctx to connector functions ...
}

Q: Do I need to worry about ordering?

In the case of a destination connector you do not have to worry about ordering. Conduit will supply records one by one, in the order they were produced by the source.

A source connector, on the other hand, is in charge of producing records and thus dictates the order. That said, you do not have to worry about concurrent reads: the SDK calls Source.Read repeatedly and only in one goroutine, so all you have to do is return one record at a time.
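
A common pattern is to buffer records internally and have Read return them one at a time, returning ErrBackoffRetry when nothing is available. A minimal sketch, assuming a hypothetical buffer channel that is filled by a background fetcher:

func (s *Source) Read(ctx context.Context) (sdk.Record, error) {
	select {
	case rec := <-s.buffer: // hypothetical channel of buffered records
		return rec, nil
	case <-ctx.Done():
		return sdk.Record{}, ctx.Err()
	default:
		return sdk.Record{}, sdk.ErrBackoffRetry // no record right now, try again later
	}
}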

Examples

For examples of simple connectors you can look at existing connectors like conduit-connector-generator or conduit-connector-file.

[^1]: Collection is a generic term used in Conduit to describe an entity in a 3rd party system from which records are read or to which records are written. Examples are: topics (in Kafka), tables (in a database), indexes (in a search engine), and collections (in NoSQL databases).

Documentation

Overview

Package sdk implements utilities for implementing a Conduit connector.

Getting started

Conduit connectors can be thought of as the edges of a Conduit pipeline. They are responsible for reading records from and writing records to third party systems. Conduit uses connectors as plugins that hide the intricacies of working with a particular third party system, so that Conduit itself can focus on efficiently processing records and moving them safely from sources to destinations.

To implement a connector, start by defining a global variable of type Connector, preferably in connector.go at the root of your project to make it easy to discover.

var Connector = sdk.Connector{
    NewSpecification: Specification,  // Specification is my connector's specification
    NewSource:        NewSource,      // NewSource is the constructor for my source
    NewDestination:   NewDestination, // NewDestination is the constructor for my destination
}

Connector will be used as the starting point for accessing three main connector components that you need to provide:

  • Specification contains general information about the plugin like its name and what it does. Writing a specification is relatively simple and straightforward, for more info check the corresponding field docs of Specification.
  • Source is the connector part that knows how to fetch data from the third party system and convert it to a Record.
  • Destination is the connector part that knows how to write a Record to the third party system.

General advice for implementing connectors:

  • The SDK provides a structured logger that can be retrieved with Logger. It allows you to create structured and leveled output that will be included as part of the Conduit logs.
  • If you want to add logging to the hot path (i.e. code that is executed for every record that is read or written) you should use the log level "trace", otherwise it can greatly impact the performance of your connector.

Source

A Source is responsible for continuously reading data from a third party system and returning it in the form of a Record.

Every Source implementation needs to include an UnimplementedSource to satisfy the interface. This allows us to potentially change the interface in the future while remaining backwards compatible with existing Source implementations.

type Source struct {
  sdk.UnimplementedSource
}

You need to implement the functions required by Source and provide your own implementations. Please look at the documentation of Source for further information about individual functions.

You should also create a constructor function for your source struct. Note that this is the same function that should be set as the value of Connector.NewSource. The constructor should be used to wrap your source in the default middleware. You can add additional middleware, but unless you have a very good reason, you should always include the default middleware.

func NewSource() sdk.Source {
  return sdk.SourceWithMiddleware(&Source{}, sdk.DefaultSourceMiddleware()...)
}

Additional tips for implementing a source:

  • The SDK provides utilities for certain operations like creating records in SourceUtil. You can access it through the global variable Util.Source.
  • The function Source.Ack is optional and does not have to be implemented.
  • Source is responsible for creating record positions that should ideally uniquely identify a record. Think carefully about what you will store in the position; it should give the source enough information to resume reading records at that specific position (see the sketch after this list).
  • The SDK provides acceptance tests; if your source doesn't pass them, your implementation has a bug¹.
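
Regarding positions: a Position is just a byte slice, so encoding a small struct as JSON is a common choice. This is only a sketch under assumed fields (it needs the encoding/json import), not a prescribed format:

type position struct {
	Table  string `json:"table"`
	Offset int64  `json:"offset"`
}

func (p position) toSDKPosition() (sdk.Position, error) {
	b, err := json.Marshal(p)
	return sdk.Position(b), err
}

func parsePosition(p sdk.Position) (position, error) {
	var out position
	err := json.Unmarshal(p, &out)
	return out, err
}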

Destination

A Destination is responsible for writing Records to third party systems.

Every Destination implementation needs to include an UnimplementedDestination to satisfy the interface. This allows us to potentially change the interface in the future while remaining backwards compatible with existing Destination implementations.

type Destination struct {
  sdk.UnimplementedDestination
}

You need to implement the functions required by Destination and provide your own implementations. Please look at the documentation of Destination for further information about individual functions.

You should also create a constructor function for your destination struct. Note that this is the same function that should be set as the value of Connector.NewDestination. The constructor should be used to wrap your destination in the default middleware. You can add additional middleware, but unless you have a very good reason, you should always include the default middleware.

func NewDestination() sdk.Destination {
  return sdk.DestinationWithMiddleware(&Destination{}, sdk.DefaultDestinationMiddleware()...)
}

Additional tips for implementing a destination:

  • The SDK provides utilities for certain operations like routing records based on their operation in DestinationUtil. You can access it through the global variable Util.Destination.
  • If your destination writes whole records to the third party system, you should use Record.Bytes to get the raw record representation.
  • If possible, make your destination writes idempotent. It is possible that the destination will receive the same record twice after a pipeline restart.
  • Some sources won't be able to distinguish create and update operations. In case your destination is updating data in place, we recommend upserting the record on a create or update (see the sketch after this list).
  • The SDK provides acceptance tests; if your destination doesn't pass them, your implementation has a bug¹.
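
A Write implementation typically processes records in order and reports how many were written before the first failure; upserting on create, update and snapshot keeps writes idempotent. A sketch with hypothetical upsert and delete helpers:

func (d *Destination) Write(ctx context.Context, records []sdk.Record) (int, error) {
	for i, r := range records {
		var err error
		switch r.Operation {
		case sdk.OperationDelete:
			err = d.delete(ctx, r) // hypothetical helper
		default:
			err = d.upsert(ctx, r) // create, update and snapshot are all upserted
		}
		if err != nil {
			return i, fmt.Errorf("failed to write record %d: %w", i, err)
		}
	}
	return len(records), nil
}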

Acceptance tests

The SDK provides acceptance tests that can be run in a simple Go test.¹

To run acceptance tests you should create a test file, preferably named acceptance_test.go at the root of your project to make it easy to discover. Inside it, create a Go test that triggers the function AcceptanceTest.

func TestAcceptance(t *testing.T) {
  // set up dependencies here
  sdk.AcceptanceTest(t, sdk.ConfigurableAcceptanceTestDriver{
    Config: sdk.ConfigurableAcceptanceTestDriverConfig{
      Connector: Connector, // Connector is the global variable from your connector
      SourceConfig: map[string]string{ … },
      DestinationConfig: map[string]string{ … },
    },
  })
}

AcceptanceTest uses the AcceptanceTestDriver for certain operations. The SDK already provides a default implementation for the driver with ConfigurableAcceptanceTestDriver, although you can supply your own implementation if you need to adjust the behavior of acceptance tests for your connector.

Some acceptance tests will try to write data using the destination and then read the same data using the source. Because of that you need to make sure that both configurations point to the exact same data store (e.g. in case of the file connector the source and destination need to read and write to the same file).

If your connector does not implement both sides of the connector (a source and a destination) you will need to write a custom driver that knows how to read or write, depending on which side of the connector is not implemented. Here is an example of how to do that:

type CustomAcceptanceTestDriver struct {
  sdk.ConfigurableAcceptanceTestDriver
}

func (d *CustomAcceptanceTestDriver) ReadFromDestination(t *testing.T, records []sdk.Record) []sdk.Record {
  // implement read
  return records
}

func (d *CustomAcceptanceTestDriver) WriteToSource(t *testing.T, records []sdk.Record) []sdk.Record {
  // implement write
  return records
}

For more information about what behavior can be customized please refer to the AcceptanceTestDriver interface.

¹Acceptance tests are currently still experimental.

Index

Constants

const (
	// MetadataOpenCDCVersion is a Record.Metadata key for the version of the
	// OpenCDC format (e.g. "v1"). This field exists to ensure the OpenCDC
	// format version can be easily identified in case the record gets marshaled
	// into a different untyped format (e.g. JSON).
	MetadataOpenCDCVersion = cpluginv1.MetadataOpenCDCVersion

	// MetadataCreatedAt is a Record.Metadata key for the time when the record
	// was created in the 3rd party system. The expected format is a unix
	// timestamp in nanoseconds.
	MetadataCreatedAt = cpluginv1.MetadataCreatedAt
	// MetadataReadAt is a Record.Metadata key for the time when the record was
	// read from the 3rd party system. The expected format is a unix timestamp
	// in nanoseconds.
	MetadataReadAt = cpluginv1.MetadataReadAt
	// MetadataCollection is a Record.Metadata key for the name of the collection
	// where the record originated from and/or where it should be stored.
	MetadataCollection = cpluginv1.MetadataCollection

	// MetadataConduitSourcePluginName is a Record.Metadata key for the name of
	// the source plugin that created this record.
	MetadataConduitSourcePluginName = cpluginv1.MetadataConduitSourcePluginName
	// MetadataConduitSourcePluginVersion is a Record.Metadata key for the
	// version of the source plugin that created this record.
	MetadataConduitSourcePluginVersion = cpluginv1.MetadataConduitSourcePluginVersion
	// MetadataConduitDestinationPluginName is a Record.Metadata key for the
	// name of the destination plugin that has written this record
	// (only available in records once they are written by a destination).
	MetadataConduitDestinationPluginName = cpluginv1.MetadataConduitDestinationPluginName
	// MetadataConduitDestinationPluginVersion is a Record.Metadata key for the
	// version of the destination plugin that has written this record
	// (only available in records once they are written by a destination).
	MetadataConduitDestinationPluginVersion = cpluginv1.MetadataConduitDestinationPluginVersion

	// MetadataConduitSourceConnectorID is a Record.Metadata key for the ID of
	// the source connector that produced this record.
	MetadataConduitSourceConnectorID = cpluginv1.MetadataConduitSourceConnectorID
	// MetadataConduitDLQNackError is a Record.Metadata key for the error that
	// caused a record to be nacked and pushed to the dead-letter queue.
	MetadataConduitDLQNackError = cpluginv1.MetadataConduitDLQNackError
	// MetadataConduitDLQNackNodeID is a Record.Metadata key for the ID of the
	// internal node that nacked the record.
	MetadataConduitDLQNackNodeID = cpluginv1.MetadataConduitDLQNackNodeID
)

Variables

var (
	// ErrBackoffRetry can be returned by Source.Read to signal the SDK there is
	// no record to fetch right now and it should try again later.
	ErrBackoffRetry = errors.New("backoff retry")

	// ErrUnimplemented is returned in functions of plugins that don't implement
	// a certain method.
	ErrUnimplemented = errors.New("the connector plugin does not implement this action, please check the source code of the connector and make sure all required connector methods are implemented")

	// ErrMetadataFieldNotFound is returned in metadata utility functions when a
	// metadata field is not found.
	ErrMetadataFieldNotFound = errors.New("metadata field not found")
)
var (
	ErrUnrecognizedParameter    = config.ErrUnrecognizedParameter
	ErrInvalidParameterValue    = config.ErrInvalidParameterValue
	ErrInvalidParameterType     = config.ErrInvalidParameterType
	ErrInvalidValidationType    = config.ErrInvalidValidationType
	ErrRequiredParameterMissing = config.ErrRequiredParameterMissing

	ErrLessThanValidationFail    = config.ErrLessThanValidationFail
	ErrGreaterThanValidationFail = config.ErrGreaterThanValidationFail
	ErrInclusionValidationFail   = config.ErrInclusionValidationFail
	ErrExclusionValidationFail   = config.ErrExclusionValidationFail
	ErrRegexValidationFail       = config.ErrRegexValidationFail
)
var Util = struct {
	// SourceUtil provides utility methods for implementing a source.
	Source SourceUtil
	// DestinationUtil provides utility methods for implementing a destination.
	Destination DestinationUtil
	// ParseConfig can be used to parse a config map into a struct. Under the
	// hood, this function uses the library mitchellh/mapstructure, with the
	// "mapstructure" tag renamed to "json", so to rename a key, use the "json"
	// tag. To embed structs, append ",squash" to your tag. For more details
	// and docs, check https://pkg.go.dev/github.com/mitchellh/mapstructure.
	ParseConfig func(map[string]string, interface{}) error
}{
	ParseConfig: parseConfig,
}

Util provides utilities for implementing connectors.
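
For example, Util.ParseConfig can populate a typed config struct from the raw map passed to Configure. A minimal sketch; the field and parameter names are assumptions:

type Config struct {
	URL   string `json:"url"`
	Table string `json:"table"`
}

func (s *Source) Configure(ctx context.Context, cfg map[string]string) error {
	var parsed Config
	if err := sdk.Util.ParseConfig(cfg, &parsed); err != nil {
		return err
	}
	s.config = parsed // hypothetical field on the source
	return nil
}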

Functions

func AcceptanceTest added in v0.3.0

func AcceptanceTest(t *testing.T, driver AcceptanceTestDriver)

AcceptanceTest is the acceptance test that all connector implementations should pass. It should manually be called from a test case in each implementation:

func TestAcceptance(t *testing.T) {
    // set up test dependencies ...
    sdk.AcceptanceTest(t, sdk.ConfigurableAcceptanceTestDriver{
        Config: sdk.ConfigurableAcceptanceTestDriverConfig{
            Connector: myConnector,
            SourceConfig: map[string]string{...},      // valid source config
            DestinationConfig: map[string]string{...}, // valid destination config
        },
    })
}

func BenchmarkSource added in v0.9.0

func BenchmarkSource(
	b *testing.B,
	s Source,
	cfg map[string]string,
)

BenchmarkSource is a benchmark that any source implementation can run to figure out its performance. The benchmark expects that the source resource contains at least b.N number of records. This should be prepared before the benchmark is executed. The function should be manually called from a benchmark function:

func BenchmarkConnector(b *testing.B) {
    // set up test dependencies and write b.N records to source resource ...
    sdk.BenchmarkSource(
        b,
        mySourceConnector,
        map[string]string{...}, // valid source config
    )
}

The benchmark can be run with a specific number of records by supplying the option -benchtime=Nx, where N is the number of records to be benchmarked (e.g. -benchtime=100x benchmarks reading 100 records).
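
For example, to benchmark reading 100 records:

go test -bench BenchmarkConnector -benchtime=100x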

func Logger

func Logger(ctx context.Context) *zerolog.Logger

Logger returns an instance of a logger that can be used for leveled and structured logging in a plugin. The logger will respect the log level configured in Conduit.

func NewDestinationPlugin

func NewDestinationPlugin(impl Destination) cpluginv1.DestinationPlugin

NewDestinationPlugin takes a Destination and wraps it into an adapter that converts it into a cpluginv1.DestinationPlugin. If the parameter is nil it will wrap UnimplementedDestination instead.

func NewSourcePlugin

func NewSourcePlugin(impl Source) cpluginv1.SourcePlugin

NewSourcePlugin takes a Source and wraps it into an adapter that converts it into a cpluginv1.SourcePlugin. If the parameter is nil it will wrap UnimplementedSource instead.

func NewSpecifierPlugin

func NewSpecifierPlugin(specs Specification, source Source, dest Destination) cpluginv1.SpecifierPlugin

NewSpecifierPlugin takes a Specification and wraps it into an adapter that converts it into a cpluginv1.SpecifierPlugin.

func Serve

func Serve(c Connector)

Serve starts the plugin and takes care of its whole lifecycle by blocking until the plugin can safely stop running. Any fixable errors will be output to os.Stderr and the process will exit with a status code of 1. Serve will panic for unexpected conditions where a user's fix is unknown.

It is essential that nothing gets written to stdout or stderr before this function is called, as the first output is used to perform the initial handshake.

Plugins should call Serve in their main() functions.

Types

type AcceptanceTestDriver added in v0.3.0

type AcceptanceTestDriver interface {
	// Context returns the context to use in tests.
	Context() context.Context
	// Connector is the connector to be tested.
	Connector() Connector

	// SourceConfig should be a valid config for a source connector, reading
	// from the same location as the destination will write to.
	SourceConfig(*testing.T) map[string]string
	// DestinationConfig should be a valid config for a destination connector,
	// writing to the same location as the source will read from.
	DestinationConfig(*testing.T) map[string]string

	// BeforeTest is executed before each acceptance test.
	BeforeTest(*testing.T)
	// AfterTest is executed after each acceptance test.
	AfterTest(*testing.T)

	// GoleakOptions will be applied to goleak.VerifyNone. Can be used to
	// suppress false positive goroutine leaks.
	GoleakOptions(*testing.T) []goleak.Option

	// GenerateRecord will generate a new Record for a certain Operation. It's
	// the responsibility of the AcceptanceTestDriver implementation to provide
	// records with appropriate contents (e.g. appropriate type of payload).
	// The generated record will contain mixed data types in the field Key and
	// Payload (i.e. RawData and StructuredData), unless configured otherwise
	// (see ConfigurableAcceptanceTestDriverConfig.GenerateDataType).
	GenerateRecord(*testing.T, Operation) Record

	// WriteToSource receives a slice of records that should be prepared in the
	// 3rd party system so that the source will read them. The returned slice
	// will be used to verify the source connector can successfully execute
	// reads.
	// It is encouraged for the driver to return the same slice, unless there is
	// no way to write the records to the 3rd party system, in which case the
	// returned slice should contain the expected records a source should read.
	WriteToSource(*testing.T, []Record) []Record
	// ReadFromDestination should return a slice with the records that were
	// written to the destination. The slice will be used to verify the
	// destination has successfully executed writes.
	// The parameter contains records that were actually written to the
	// destination. These will be compared to the returned slice of records. It
	// is encouraged for the driver to only touch the input records to change
	// the order of records and to not change the records themselves.
	ReadFromDestination(*testing.T, []Record) []Record

	// ReadTimeout controls the time the test should wait for a read operation
	// to return before it considers the operation as failed.
	ReadTimeout() time.Duration
	// WriteTimeout controls the time the test should wait for a write operation
	// to return before it considers the operation as failed.
	WriteTimeout() time.Duration
}

AcceptanceTestDriver is the object that each test uses for fetching the connector and its configurations. The SDK provides a default implementation ConfigurableAcceptanceTestDriver that should fit most use cases. In case more flexibility is needed you can create your own driver, include the default driver in the struct and override methods as needed.

type Change added in v0.3.0

type Change struct {
	// Before contains the data before the operation occurred. This field is
	// optional and should only be populated for operations OperationUpdate or
	// OperationDelete (if the system supports fetching the data before the
	// operation).
	Before Data `json:"before"`
	// After contains the data after the operation occurred. This field should
	// be populated for all operations except OperationDelete.
	After Data `json:"after"`
}

type ConfigurableAcceptanceTestDriver added in v0.3.0

type ConfigurableAcceptanceTestDriver struct {
	Config ConfigurableAcceptanceTestDriverConfig
	// contains filtered or unexported fields
}

ConfigurableAcceptanceTestDriver is the default implementation of AcceptanceTestDriver. It provides a convenient way of configuring the driver without the need of implementing a custom driver from scratch.

func (ConfigurableAcceptanceTestDriver) AfterTest added in v0.3.0

func (d ConfigurableAcceptanceTestDriver) AfterTest(t *testing.T)

func (ConfigurableAcceptanceTestDriver) BeforeTest added in v0.3.0

func (d ConfigurableAcceptanceTestDriver) BeforeTest(t *testing.T)

func (ConfigurableAcceptanceTestDriver) Connector added in v0.3.0

func (ConfigurableAcceptanceTestDriver) Context added in v0.9.0

func (ConfigurableAcceptanceTestDriver) DestinationConfig added in v0.3.0

func (d ConfigurableAcceptanceTestDriver) DestinationConfig(*testing.T) map[string]string

func (ConfigurableAcceptanceTestDriver) GenerateData added in v0.3.0

func (d ConfigurableAcceptanceTestDriver) GenerateData(t *testing.T) Data

GenerateData generates either RawData or StructuredData depending on the configured data type (see ConfigurableAcceptanceTestDriverConfig.GenerateDataType).

func (ConfigurableAcceptanceTestDriver) GenerateRecord added in v0.3.0

func (d ConfigurableAcceptanceTestDriver) GenerateRecord(t *testing.T, op Operation) Record

func (ConfigurableAcceptanceTestDriver) GenerateValue added in v0.3.0

func (d ConfigurableAcceptanceTestDriver) GenerateValue(t *testing.T) interface{}

GenerateValue generates a random value of a random builtin type.

func (ConfigurableAcceptanceTestDriver) GoleakOptions added in v0.3.0

func (d ConfigurableAcceptanceTestDriver) GoleakOptions(_ *testing.T) []goleak.Option

func (ConfigurableAcceptanceTestDriver) ReadFromDestination added in v0.3.0

func (d ConfigurableAcceptanceTestDriver) ReadFromDestination(t *testing.T, records []Record) []Record

ReadFromDestination by default opens the source and reads all records from the source. It is expected that the destination is writing to the same location the source is reading from. If the connector does not implement a source the function will fail the test.

func (ConfigurableAcceptanceTestDriver) ReadTimeout added in v0.3.0

func (ConfigurableAcceptanceTestDriver) Skip added in v0.3.0

func (ConfigurableAcceptanceTestDriver) SourceConfig added in v0.3.0

func (ConfigurableAcceptanceTestDriver) WriteTimeout added in v0.3.0

func (ConfigurableAcceptanceTestDriver) WriteToSource added in v0.3.0

func (d ConfigurableAcceptanceTestDriver) WriteToSource(t *testing.T, records []Record) []Record

WriteToSource by default opens the destination and writes records to the destination. It is expected that the destination is writing to the same location the source is reading from. If the connector does not implement a destination the function will fail the test.

type ConfigurableAcceptanceTestDriverConfig added in v0.3.0

type ConfigurableAcceptanceTestDriverConfig struct {
	// Context is the context to use in tests. The default is a context with a
	// logger that writes to the test output.
	Context context.Context
	// Connector is the connector to be tested.
	Connector Connector

	// SourceConfig should be a valid config for a source connector, reading
	// from the same location as the destination will write to.
	SourceConfig map[string]string
	// DestinationConfig should be a valid config for a destination connector,
	// writing to the same location as the source will read from.
	DestinationConfig map[string]string

	// BeforeTest is executed before each acceptance test.
	BeforeTest func(t *testing.T)
	// AfterTest is executed after each acceptance test.
	AfterTest func(t *testing.T)

	// GoleakOptions will be applied to goleak.VerifyNone. Can be used to
	// suppress false positive goroutine leaks.
	GoleakOptions []goleak.Option

	// Skip is a slice of regular expressions used to identify tests that should
	// be skipped. The full test name will be matched against all regular
	// expressions and the test will be skipped if a match is found.
	Skip []string

	// GenerateDataType controls which Data type will be generated in test
	// records. The default is GenerateMixedData which will produce both RawData
	// and StructuredData. To generate only one type of data set this field to
	// GenerateRawData or GenerateStructuredData.
	GenerateDataType GenerateDataType

	// ReadTimeout controls the time the test should wait for a read operation
	// to return a record before it considers the operation as failed. The
	// default timeout is 5 seconds. This value should be changed only if there
	// is a good reason (uncontrollable limitations of the 3rd party system).
	ReadTimeout time.Duration
	// WriteTimeout controls the time the test should wait for a write operation
	// to return a record before it considers the operation as failed. The
	// default timeout is 5 seconds. This value should be changed only if there
	// is a good reason (uncontrollable limitations of the 3rd party system).
	WriteTimeout time.Duration
}

ConfigurableAcceptanceTestDriverConfig contains the configuration for ConfigurableAcceptanceTestDriver.

type Connector added in v0.3.0

type Connector struct {
	// NewSpecification should create a new Specification that describes the
	// connector. This field is mandatory, if it is empty the connector won't
	// work.
	NewSpecification func() Specification
	// NewSource should create a new Source plugin. If the plugin doesn't
	// implement a source connector this field can be nil.
	NewSource func() Source
	// NewDestination should create a new Destination plugin. If the plugin
	// doesn't implement a destination connector this field can be nil.
	NewDestination func() Destination
}

Connector combines all constructors for each plugin into one struct.

type Converter added in v0.5.0

type Converter interface {
	Name() string
	Configure(map[string]string) (Converter, error)
	Convert(Record) (any, error)
}

Converter is a type that can change the structure of a Record. It's used in destination connectors to change the output structure (e.g. OpenCDC records, Debezium records, etc.).

type Data

type Data interface {
	Bytes() []byte
	// contains filtered or unexported methods
}

Data is a structure that contains some bytes. The only structs implementing Data are RawData and StructuredData.
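
For example, a key is often RawData while a payload is often StructuredData; both satisfy the Data interface:

var key sdk.Data = sdk.RawData("user-42")
var payload sdk.Data = sdk.StructuredData{
	"id":    42,
	"email": "user@example.com",
}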

type DebeziumConverter added in v0.5.0

type DebeziumConverter struct {
	SchemaName string
	RawDataKey string
}

DebeziumConverter outputs a Debezium record.

func (DebeziumConverter) Configure added in v0.5.0

func (c DebeziumConverter) Configure(opt map[string]string) (Converter, error)

func (DebeziumConverter) Convert added in v0.5.0

func (c DebeziumConverter) Convert(r Record) (any, error)

func (DebeziumConverter) Name added in v0.5.0

func (c DebeziumConverter) Name() string

type Destination

type Destination interface {
	// Parameters is a map of named Parameters that describe how to configure
	// the Destination.
	Parameters() map[string]Parameter

	// Configure is the first function to be called in a connector. It provides the
	// connector with the configuration that needs to be validated and stored.
	// In case the configuration is not valid it should return an error.
	// Testing if your connector can reach the configured data source should be
	// done in Open, not in Configure.
	// The connector SDK will sanitize, apply defaults and validate the
	// configuration before calling this function. This means that the
	// configuration will always contain all keys defined in Parameters
	// (unprovided keys will have their default values) and all non-empty
	// values will be of the correct type.
	Configure(context.Context, map[string]string) error

	// Open is called after Configure to signal the plugin it can prepare to
	// start writing records. If needed, the plugin should open connections in
	// this function.
	Open(context.Context) error

	// Write writes len(r) records from r to the destination right away without
	// caching. It should return the number of records written from r
	// (0 <= n <= len(r)) and any error encountered that caused the write to
	// stop early. Write must return a non-nil error if it returns n < len(r).
	Write(ctx context.Context, r []Record) (n int, err error)

	// Teardown signals to the plugin that all records were written and there
	// will be no more calls to any other function. After Teardown returns, the
	// plugin should be ready for a graceful shutdown.
	Teardown(context.Context) error

	// LifecycleOnCreated is called after Configure and before Open when the
	// connector is run for the first time. This call will be skipped if the
	// connector was already started before. This method can be used to do some
	// initialization that needs to happen only once in the lifetime of a
	// connector (e.g. create a bucket). Anything that the connector creates in
	// this method is considered to be owned by this connector and should be
	// cleaned up in LifecycleOnDeleted.
	LifecycleOnCreated(ctx context.Context, config map[string]string) error
	// LifecycleOnUpdated is called after Configure and before Open when the
	// connector configuration has changed since the last run. This call will be
	// skipped if the connector configuration did not change. It can be used to
	// update anything that was initialized in LifecycleOnCreated, in case the
	// configuration change affects it.
	LifecycleOnUpdated(ctx context.Context, configBefore, configAfter map[string]string) error
	// LifecycleOnDeleted is called when the connector was deleted. It will be
	// the only method that is called in that case. This method can be used to
	// clean up anything that was initialized in LifecycleOnCreated.
	LifecycleOnDeleted(ctx context.Context, config map[string]string) error
	// contains filtered or unexported methods
}

Destination receives records from Conduit and writes them to 3rd party resources. All implementations must embed UnimplementedDestination for forward compatibility.

func DestinationWithMiddleware added in v0.3.0

func DestinationWithMiddleware(d Destination, middleware ...DestinationMiddleware) Destination

DestinationWithMiddleware wraps the destination into the supplied middleware.

type DestinationMiddleware added in v0.3.0

type DestinationMiddleware interface {
	Wrap(Destination) Destination
}

DestinationMiddleware wraps a Destination and adds functionality to it.

func DefaultDestinationMiddleware added in v0.3.0

func DefaultDestinationMiddleware() []DestinationMiddleware

DefaultDestinationMiddleware returns a slice of middleware that should be added to all destinations unless there's a good reason not to.

type DestinationUtil added in v0.3.0

type DestinationUtil struct{}

DestinationUtil provides utility methods for implementing a destination. Use it by calling Util.Destination.*.

func (DestinationUtil) Route added in v0.3.0

func (DestinationUtil) Route(
	ctx context.Context,
	rec Record,
	handleCreate func(context.Context, Record) error,
	handleUpdate func(context.Context, Record) error,
	handleDelete func(context.Context, Record) error,
	handleSnapshot func(context.Context, Record) error,
) error

Route makes it easier to implement a destination that mutates entities in place and thus handles different operations differently. It will inspect the operation on the record and based on that choose which handler to call.

Example usage:

func (d *Destination) Write(ctx context.Context, r sdk.Record) error {
  return d.Util.Route(ctx, r,
    d.handleInsert,
    d.handleUpdate,
    d.handleDelete,
    d.handleSnapshot, // we could also reuse d.handleInsert
  )
}
func (d *Destination) handleInsert(ctx context.Context, r sdk.Record) error {
  ...
}

type DestinationWithBatch added in v0.3.0

type DestinationWithBatch struct {
	// DefaultBatchSize is the default value for the batch size.
	DefaultBatchSize int
	// DefaultBatchDelay is the default value for the batch delay.
	DefaultBatchDelay time.Duration
}

DestinationWithBatch adds support for batching on the destination. It adds two parameters to the destination config:

  • `sdk.batch.size` - Maximum size of batch before it gets written to the destination.
  • `sdk.batch.delay` - Maximum delay before an incomplete batch is written to the destination.

To change the defaults of these parameters use the fields of this struct.
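
For example, to change the batching defaults, the middleware can be constructed explicitly when wrapping the destination (the values below are assumptions; the remaining default middleware is omitted for brevity):

func NewDestination() sdk.Destination {
	return sdk.DestinationWithMiddleware(&Destination{},
		sdk.DestinationWithBatch{
			DefaultBatchSize:  100,
			DefaultBatchDelay: time.Second,
		},
		// other middleware from sdk.DefaultDestinationMiddleware() can be added here as well
	)
}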

func (DestinationWithBatch) BatchDelayParameterName added in v0.5.0

func (d DestinationWithBatch) BatchDelayParameterName() string

func (DestinationWithBatch) BatchSizeParameterName added in v0.5.0

func (d DestinationWithBatch) BatchSizeParameterName() string

func (DestinationWithBatch) Wrap added in v0.3.0

Wrap a Destination into the batching middleware.

type DestinationWithRateLimit added in v0.3.0

type DestinationWithRateLimit struct {
	// DefaultRatePerSecond is the default value for the rate per second.
	DefaultRatePerSecond float64
	// DefaultBurst is the default value for the allowed burst count.
	DefaultBurst int
}

DestinationWithRateLimit adds support for rate limiting to the destination. It adds two parameters to the destination config:

  • `sdk.rate.perSecond` - Maximum times the Write function can be called per second (0 means no rate limit).
  • `sdk.rate.burst` - Allow bursts of at most X writes (0 means that bursts are not allowed).

To change the defaults of these parameters use the fields of this struct.

func (DestinationWithRateLimit) RateBurstParameterName added in v0.5.0

func (d DestinationWithRateLimit) RateBurstParameterName() string

func (DestinationWithRateLimit) RatePerSecondParameterName added in v0.5.0

func (d DestinationWithRateLimit) RatePerSecondParameterName() string

func (DestinationWithRateLimit) Wrap added in v0.3.0

Wrap a Destination into the rate limiting middleware.

type DestinationWithRecordFormat added in v0.5.0

type DestinationWithRecordFormat struct {
	// DefaultRecordFormat is the default record format.
	DefaultRecordFormat string
	RecordFormatters    []RecordFormatter
}

DestinationWithRecordFormat adds support for changing the output format of records, specifically of the Record.Bytes method. It adds two parameters to the destination config:

  • `sdk.record.format` - The format of the output record. The inclusion validation exposes a list of valid options.
  • `sdk.record.format.options` - Options are used to configure the format.

func (DestinationWithRecordFormat) DefaultRecordFormatters added in v0.5.0

func (d DestinationWithRecordFormat) DefaultRecordFormatters() []RecordFormatter

DefaultRecordFormatters returns the list of record formatters that are used if DestinationWithRecordFormat.RecordFormatters is nil.

func (DestinationWithRecordFormat) RecordFormatOptionsParameterName added in v0.5.0

func (d DestinationWithRecordFormat) RecordFormatOptionsParameterName() string

func (DestinationWithRecordFormat) RecordFormatParameterName added in v0.5.0

func (d DestinationWithRecordFormat) RecordFormatParameterName() string

func (DestinationWithRecordFormat) Wrap added in v0.5.0

Wrap a Destination into the record format middleware.

type Encoder added in v0.5.0

type Encoder interface {
	Name() string
	Configure(options map[string]string) (Encoder, error)
	Encode(r any) ([]byte, error)
}

Encoder is a type that can encode an arbitrary struct into a byte slice. It's used in destination connectors to encode records into different formats (e.g. JSON, Avro etc.).

type GenerateDataType added in v0.3.0

type GenerateDataType int

GenerateDataType is used in acceptance tests to control what data type will be generated.

const (
	GenerateMixedData GenerateDataType = iota
	GenerateRawData
	GenerateStructuredData
)

type GenericRecordFormatter added in v0.5.0

type GenericRecordFormatter struct {
	Converter
	Encoder
}

GenericRecordFormatter is a formatter that uses a Converter and Encoder to format a record.

func (GenericRecordFormatter) Configure added in v0.5.0

func (rf GenericRecordFormatter) Configure(optRaw string) (RecordFormatter, error)

func (GenericRecordFormatter) Format added in v0.5.0

func (rf GenericRecordFormatter) Format(r Record) ([]byte, error)

Format converts and encodes record into a byte array.

func (GenericRecordFormatter) Name added in v0.5.0

func (rf GenericRecordFormatter) Name() string

Name returns the name of the record formatter combined from the converter name and encoder name.

type JSONEncoder added in v0.5.0

type JSONEncoder struct{}

JSONEncoder is an Encoder that outputs JSON.

func (JSONEncoder) Configure added in v0.5.0

func (e JSONEncoder) Configure(map[string]string) (Encoder, error)

func (JSONEncoder) Encode added in v0.5.0

func (e JSONEncoder) Encode(v any) ([]byte, error)

func (JSONEncoder) Name added in v0.5.0

func (e JSONEncoder) Name() string

type Metadata added in v0.3.0

type Metadata map[string]string

func (Metadata) GetCollection added in v0.9.0

func (m Metadata) GetCollection() (string, error)

GetCollection returns the value for key MetadataCollection. If the value does not exist or is empty the function returns ErrMetadataFieldNotFound.

func (Metadata) GetConduitDLQNackError added in v0.9.0

func (m Metadata) GetConduitDLQNackError() (string, error)

GetConduitDLQNackError returns the value for key MetadataConduitDLQNackError. If the value does not exist or is empty the function returns ErrMetadataFieldNotFound.

func (Metadata) GetConduitDLQNackNodeID added in v0.9.0

func (m Metadata) GetConduitDLQNackNodeID() (string, error)

GetConduitDLQNackNodeID returns the value for key MetadataConduitDLQNackNodeID. If the value does not exist or is empty the function returns ErrMetadataFieldNotFound.

func (Metadata) GetConduitDestinationPluginName added in v0.4.0

func (m Metadata) GetConduitDestinationPluginName() (string, error)

GetConduitDestinationPluginName returns the value for key MetadataConduitDestinationPluginName. If the value does not exist or is empty the function returns ErrMetadataFieldNotFound.

func (Metadata) GetConduitDestinationPluginVersion added in v0.4.0

func (m Metadata) GetConduitDestinationPluginVersion() (string, error)

GetConduitDestinationPluginVersion returns the value for key MetadataConduitDestinationPluginVersion. If the value does not exist or is empty the function returns ErrMetadataFieldNotFound.

func (Metadata) GetConduitSourceConnectorID added in v0.9.0

func (m Metadata) GetConduitSourceConnectorID() (string, error)

GetConduitSourceConnectorID returns the value for key MetadataConduitSourceConnectorID. If the value does not exist or is empty the function returns ErrMetadataFieldNotFound.

func (Metadata) GetConduitSourcePluginName added in v0.4.0

func (m Metadata) GetConduitSourcePluginName() (string, error)

GetConduitSourcePluginName returns the value for key MetadataConduitSourcePluginName. If the value does not exist or is empty the function returns ErrMetadataFieldNotFound.

func (Metadata) GetConduitSourcePluginVersion added in v0.4.0

func (m Metadata) GetConduitSourcePluginVersion() (string, error)

GetConduitSourcePluginVersion returns the value for key MetadataConduitSourcePluginVersion. If the value does not exist or is empty the function returns ErrMetadataFieldNotFound.

func (Metadata) GetCreatedAt added in v0.3.0

func (m Metadata) GetCreatedAt() (time.Time, error)

GetCreatedAt parses the value for key MetadataCreatedAt as a unix timestamp. If the value does not exist or the value is empty the function returns ErrMetadataFieldNotFound. If the value is not a valid unix timestamp in nanoseconds the function returns an error.

func (Metadata) GetOpenCDCVersion added in v0.3.0

func (m Metadata) GetOpenCDCVersion() (string, error)

GetOpenCDCVersion returns the value for key MetadataOpenCDCVersion. If the value does not exist or is empty the function returns ErrMetadataFieldNotFound.

func (Metadata) GetReadAt added in v0.3.0

func (m Metadata) GetReadAt() (time.Time, error)

GetReadAt parses the value for key MetadataReadAt as a unix timestamp. If the value does not exist or the value is empty the function returns ErrMetadataFieldNotFound. If the value is not a valid unix timestamp in nanoseconds the function returns an error.

func (Metadata) SetCollection added in v0.9.0

func (m Metadata) SetCollection(collection string)

SetCollection sets the metadata value for key MetadataCollection.

func (Metadata) SetConduitDLQNackError added in v0.9.0

func (m Metadata) SetConduitDLQNackError(err string)

SetConduitDLQNackError sets the metadata value for key MetadataConduitDLQNackError.

func (Metadata) SetConduitDLQNackNodeID added in v0.9.0

func (m Metadata) SetConduitDLQNackNodeID(id string)

SetConduitDLQNackNodeID sets the metadata value for key MetadataConduitDLQNackNodeID.

func (Metadata) SetConduitDestinationPluginName added in v0.4.0

func (m Metadata) SetConduitDestinationPluginName(name string)

SetConduitDestinationPluginName sets the metadata value for key MetadataConduitDestinationPluginName.

func (Metadata) SetConduitDestinationPluginVersion added in v0.4.0

func (m Metadata) SetConduitDestinationPluginVersion(version string)

SetConduitDestinationPluginVersion sets the metadata value for key MetadataConduitDestinationPluginVersion.

func (Metadata) SetConduitSourceConnectorID added in v0.9.0

func (m Metadata) SetConduitSourceConnectorID(id string)

SetConduitSourceConnectorID sets the metadata value for key MetadataConduitSourceConnectorID.

func (Metadata) SetConduitSourcePluginName added in v0.4.0

func (m Metadata) SetConduitSourcePluginName(name string)

SetConduitSourcePluginName sets the metadata value for key MetadataConduitSourcePluginName.

func (Metadata) SetConduitSourcePluginVersion added in v0.4.0

func (m Metadata) SetConduitSourcePluginVersion(version string)

SetConduitSourcePluginVersion sets the metadata value for key MetadataConduitSourcePluginVersion.

func (Metadata) SetCreatedAt added in v0.3.0

func (m Metadata) SetCreatedAt(createdAt time.Time)

SetCreatedAt sets the metadata value for key MetadataCreatedAt as a unix timestamp in nanoseconds.

func (Metadata) SetOpenCDCVersion added in v0.3.0

func (m Metadata) SetOpenCDCVersion()

SetOpenCDCVersion sets the metadata value for key MetadataOpenCDCVersion to the current version of OpenCDC used.

func (Metadata) SetReadAt added in v0.3.0

func (m Metadata) SetReadAt(createdAt time.Time)

SetReadAt sets the metadata value for key MetadataReadAt as a unix timestamp in nanoseconds.

type OpenCDCConverter added in v0.5.0

type OpenCDCConverter struct{}

OpenCDCConverter outputs an OpenCDC record (it does not change the structure of the record).

func (OpenCDCConverter) Configure added in v0.5.0

func (c OpenCDCConverter) Configure(map[string]string) (Converter, error)

func (OpenCDCConverter) Convert added in v0.5.0

func (c OpenCDCConverter) Convert(r Record) (any, error)

func (OpenCDCConverter) Name added in v0.5.0

func (c OpenCDCConverter) Name() string

type Operation added in v0.3.0

type Operation int

Operation defines what triggered the creation of a record.

const (
	OperationCreate   Operation = iota + 1 // create
	OperationUpdate                        // update
	OperationDelete                        // delete
	OperationSnapshot                      // snapshot
)

func (Operation) MarshalText added in v0.3.0

func (i Operation) MarshalText() ([]byte, error)

func (Operation) String added in v0.3.0

func (i Operation) String() string

func (*Operation) UnmarshalText added in v0.3.0

func (i *Operation) UnmarshalText(b []byte) error

type Parameter

type Parameter struct {
	// Default is the default value of the parameter, if any.
	Default string
	// Required controls if the parameter will be shown as required or optional.
	// Deprecated: add ValidationRequired to Parameter.Validations instead.
	Required bool
	// Description holds a description of the field and how to configure it.
	Description string
	// Type defines the parameter data type.
	Type ParameterType
	// Validations slice of validations to be checked for the parameter.
	Validations []Validation
}

Parameter defines a single connector parameter.

type ParameterType added in v0.4.0

type ParameterType config.ParameterType
const (
	ParameterTypeString ParameterType = iota + 1
	ParameterTypeInt
	ParameterTypeFloat
	ParameterTypeBool
	ParameterTypeFile
	ParameterTypeDuration
)

type Position

type Position []byte

Position is a unique identifier for a record. It is the responsibility of the Source to choose and assign record positions; it can freely choose a format that makes sense and contains everything needed to restart a pipeline at a certain position.

type RawData

type RawData []byte

RawData contains unstructured data in form of a byte slice.

func (RawData) Bytes

func (d RawData) Bytes() []byte

Bytes simply casts RawData to a byte slice.

type Record

type Record struct {
	// Position uniquely represents the record.
	Position Position `json:"position"`
	// Operation defines what triggered the creation of a record. There are four
	// possibilities: create, update, delete or snapshot. The first three
	// operations are encountered during normal CDC operation, while "snapshot"
	// is meant to represent records during an initial load. Depending on the
	// operation, the record will contain either the payload before the change,
	// after the change, or both (see field Payload).
	Operation Operation `json:"operation"`
	// Metadata contains additional information regarding the record.
	Metadata Metadata `json:"metadata"`

	// Key represents a value that should identify the entity (e.g. database
	// row).
	Key Data `json:"key"`
	// Payload holds the payload change (data before and after the operation
	// occurred).
	Payload Change `json:"payload"`
	// contains filtered or unexported fields
}

Record represents a single data record produced by a source and/or consumed by a destination connector.

func (Record) Bytes added in v0.3.0

func (r Record) Bytes() []byte

Bytes returns the JSON encoding of the Record.

type RecordFormatter added in v0.5.0

type RecordFormatter interface {
	Name() string
	Configure(string) (RecordFormatter, error)
	Format(Record) ([]byte, error)
}

RecordFormatter is a type that can format a record to bytes. It's used in destination connectors to change the output structure and format.

type Source

type Source interface {
	// Parameters is a map of named Parameters that describe how to configure
	// the Source.
	Parameters() map[string]Parameter

	// Configure is the first function to be called in a connector. It provides the
	// connector with the configuration that needs to be validated and stored.
	// In case the configuration is not valid it should return an error.
	// Testing if your connector can reach the configured data source should be
	// done in Open, not in Configure.
	// The connector SDK will sanitize, apply defaults and validate the
	// configuration before calling this function. This means that the
	// configuration will always contain all keys defined in Parameters
	// (unprovided keys will have their default values) and all non-empty
	// values will be of the correct type.
	Configure(context.Context, map[string]string) error

	// Open is called after Configure to signal the plugin it can prepare to
	// start producing records. If needed, the plugin should open connections in
	// this function. The position parameter will contain the position of the
	// last record that was successfully processed, Source should therefore
	// start producing records after this position. The context passed to Open
	// will be cancelled once the plugin receives a stop signal from Conduit.
	Open(context.Context, Position) error

	// Read returns a new Record and is supposed to block until there is either
	// a new record or the context gets cancelled. It can also return the error
	// ErrBackoffRetry to signal to the SDK it should call Read again with a
	// backoff retry.
	// If Read receives a cancelled context or the context is cancelled while
	// Read is running it must stop retrieving new records from the source
	// system and start returning records that have already been buffered. If
	// there are no buffered records left Read must return the context error to
	// signal a graceful stop. If Read returns ErrBackoffRetry while the context
	// is cancelled it will also signal that there are no records left and Read
	// won't be called again.
	// After Read returns an error the function won't be called again (except if
	// the error is ErrBackoffRetry, as mentioned above).
	// Read can be called concurrently with Ack.
	Read(context.Context) (Record, error)
	// Ack signals to the implementation that the record with the supplied
	// position was successfully processed. This method might be called after
	// the context of Read is already cancelled, since there might be
	// outstanding acks that need to be delivered. When Teardown is called it is
	// guaranteed there won't be any more calls to Ack.
	// Ack can be called concurrently with Read.
	Ack(context.Context, Position) error

	// Teardown signals to the plugin that there will be no more calls to any
	// other function. After Teardown returns, the plugin should be ready for a
	// graceful shutdown.
	Teardown(context.Context) error

	// LifecycleOnCreated is called after Configure and before Open when the
	// connector is run for the first time. This call will be skipped if the
	// connector was already started before. This method can be used to do some
	// initialization that needs to happen only once in the lifetime of a
	// connector (e.g. create a logical replication slot). Anything that the
	// connector creates in this method is considered to be owned by this
	// connector and should be cleaned up in LifecycleOnDeleted.
	LifecycleOnCreated(ctx context.Context, config map[string]string) error
	// LifecycleOnUpdated is called after Configure and before Open when the
	// connector configuration has changed since the last run. This call will be
	// skipped if the connector configuration did not change. It can be used to
	// update anything that was initialized in LifecycleOnCreated, in case the
	// configuration change affects it.
	LifecycleOnUpdated(ctx context.Context, configBefore, configAfter map[string]string) error
	// LifecycleOnDeleted is called when the connector was deleted. It will be
	// the only method that is called in that case. This method can be used to
	// clean up anything that was initialized in LifecycleOnCreated.
	LifecycleOnDeleted(ctx context.Context, config map[string]string) error
	// contains filtered or unexported methods
}

Source fetches records from 3rd party resources and sends them to Conduit. All implementations must embed UnimplementedSource for forward compatibility.

func SourceWithMiddleware added in v0.3.0

func SourceWithMiddleware(d Source, middleware ...SourceMiddleware) Source

SourceWithMiddleware wraps the source into the supplied middleware.

type SourceMiddleware added in v0.3.0

type SourceMiddleware interface {
	Wrap(Source) Source
}

SourceMiddleware wraps a Source and adds functionality to it.

func DefaultSourceMiddleware added in v0.3.0

func DefaultSourceMiddleware() []SourceMiddleware

DefaultSourceMiddleware returns a slice of middleware that should be added to all sources unless there's a good reason not to.

type SourceUtil added in v0.3.0

type SourceUtil struct{}

SourceUtil provides utility methods for implementing a source. Use it by calling Util.Source.*.

func (SourceUtil) NewRecordCreate added in v0.3.0

func (SourceUtil) NewRecordCreate(
	position Position,
	metadata Metadata,
	key Data,
	payload Data,
) Record

NewRecordCreate can be used to instantiate a record with OperationCreate.

func (SourceUtil) NewRecordDelete added in v0.3.0

func (SourceUtil) NewRecordDelete(
	position Position,
	metadata Metadata,
	key Data,
) Record

NewRecordDelete can be used to instantiate a record with OperationDelete.

func (SourceUtil) NewRecordSnapshot added in v0.3.0

func (SourceUtil) NewRecordSnapshot(
	position Position,
	metadata Metadata,
	key Data,
	payload Data,
) Record

NewRecordSnapshot can be used to instantiate a record with OperationSnapshot.

func (SourceUtil) NewRecordUpdate added in v0.3.0

func (SourceUtil) NewRecordUpdate(
	position Position,
	metadata Metadata,
	key Data,
	payloadBefore Data,
	payloadAfter Data,
) Record

NewRecordUpdate can be used to instantiate a record with OperationUpdate.

type Specification

type Specification struct {
	// Name is the name of the plugin.
	Name string
	// Summary is a brief description of the plugin and what it does. Try not to
	// exceed 200 characters.
	Summary string
	// Description is a more long form area appropriate for README-like text
	// that the author can provide for explaining the behavior of the connector
	// or specific parameters.
	Description string
	// Version string. Should be prepended with `v` like Go, e.g. `v1.54.3`.
	Version string
	// Author declares the entity that created or maintains this plugin.
	Author string
}

Specification contains general information regarding the plugin like its name and what it does.
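
For example, a connector's Specification constructor (the function assigned to Connector.NewSpecification) could look like this; all values are placeholders:

func Specification() sdk.Specification {
	return sdk.Specification{
		Name:        "demo",
		Summary:     "A demo connector for an example 3rd party system.",
		Description: "The demo connector is able to read from and write to the example system.",
		Version:     "v0.1.0",
		Author:      "Example Inc.",
	}
}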

type StructuredData

type StructuredData map[string]interface{}

StructuredData contains data in the form of a map with string keys and arbitrary values.

func (StructuredData) Bytes

func (d StructuredData) Bytes() []byte

Bytes returns the JSON encoding of the map.

type TemplateRecordFormatter added in v0.5.0

type TemplateRecordFormatter struct {
	// contains filtered or unexported fields
}

TemplateRecordFormatter is a RecordFormatter that formats a record using a Go template.

func (TemplateRecordFormatter) Configure added in v0.5.0

func (e TemplateRecordFormatter) Configure(tmpl string) (RecordFormatter, error)

func (TemplateRecordFormatter) Format added in v0.5.0

func (e TemplateRecordFormatter) Format(r Record) ([]byte, error)

func (TemplateRecordFormatter) Name added in v0.5.0

type UnimplementedDestination

type UnimplementedDestination struct{}

UnimplementedDestination should be embedded to have forward compatible implementations.

func (UnimplementedDestination) Configure

Configure needs to be overridden in the actual implementation.

func (UnimplementedDestination) LifecycleOnCreated added in v0.6.0

func (UnimplementedDestination) LifecycleOnCreated(context.Context, map[string]string) error

LifecycleOnCreated won't do anything by default.

func (UnimplementedDestination) LifecycleOnDeleted added in v0.6.0

func (UnimplementedDestination) LifecycleOnDeleted(context.Context, map[string]string) error

LifecycleOnDeleted won't do anything by default.

func (UnimplementedDestination) LifecycleOnUpdated added in v0.6.0

func (UnimplementedDestination) LifecycleOnUpdated(context.Context, map[string]string, map[string]string) error

LifecycleOnUpdated won't do anything by default.

func (UnimplementedDestination) Open

Open needs to be overridden in the actual implementation.

func (UnimplementedDestination) Parameters added in v0.3.0

func (UnimplementedDestination) Parameters() map[string]Parameter

Parameters needs to be overridden in the actual implementation.

func (UnimplementedDestination) Teardown

Teardown needs to be overridden in the actual implementation.

func (UnimplementedDestination) Write

Write needs to be overridden in the actual implementation.

type UnimplementedSource

type UnimplementedSource struct{}

UnimplementedSource should be embedded to have forward compatible implementations.

func (UnimplementedSource) Ack

Ack should be overridden if acks need to be forwarded to the source, otherwise it is optional.

func (UnimplementedSource) Configure

Configure needs to be overridden in the actual implementation.

func (UnimplementedSource) LifecycleOnCreated added in v0.6.0

func (UnimplementedSource) LifecycleOnCreated(context.Context, map[string]string) error

LifecycleOnCreated won't do anything by default.

func (UnimplementedSource) LifecycleOnDeleted added in v0.6.0

func (UnimplementedSource) LifecycleOnDeleted(context.Context, map[string]string) error

LifecycleOnDeleted won't do anything by default.

func (UnimplementedSource) LifecycleOnUpdated added in v0.6.0

func (UnimplementedSource) LifecycleOnUpdated(context.Context, map[string]string, map[string]string) error

LifecycleOnUpdated won't do anything by default.

func (UnimplementedSource) Open

Open needs to be overridden in the actual implementation.

func (UnimplementedSource) Parameters added in v0.3.0

func (UnimplementedSource) Parameters() map[string]Parameter

Parameters needs to be overridden in the actual implementation.

func (UnimplementedSource) Read

Read needs to be overridden in the actual implementation.

func (UnimplementedSource) Teardown

Teardown needs to be overridden in the actual implementation.

type Validation added in v0.4.0

type Validation interface {
	// contains filtered or unexported methods
}

type ValidationExclusion added in v0.4.0

type ValidationExclusion struct {
	List []string
}

type ValidationGreaterThan added in v0.4.0

type ValidationGreaterThan struct {
	Value float64
}

type ValidationInclusion added in v0.4.0

type ValidationInclusion struct {
	List []string
}

type ValidationLessThan added in v0.4.0

type ValidationLessThan struct {
	Value float64
}

type ValidationRegex added in v0.4.0

type ValidationRegex struct {
	Regex *regexp.Regexp
}

type ValidationRequired added in v0.4.0

type ValidationRequired struct{}

Directories

Path Synopsis
cmd
kafkaconnect	Package kafkaconnect contains utility functions and structures for processing Kafka Connect compatible data.
