Documentation ¶
Overview ¶
Package sdk provides utilities for implementing a Conduit connector.
Getting started ¶
Conduit connectors can be thought of as the edges of a Conduit pipeline. They are responsible for reading records from and writing records to third party systems. Conduit uses connectors as plugins that hide the intricacies of working with a particular third party system, so that Conduit itself can focus on efficiently processing records and moving them safely from sources to destinations.
To implement a connector, start by defining a global variable of type Connector, preferably in connector.go at the root of your project to make it easy to discover.
var Connector = sdk.Connector{
	NewSpecification: Specification,  // Specification is my connector's specification
	NewSource:        NewSource,      // NewSource is the constructor for my source
	NewDestination:   NewDestination, // NewDestination is the constructor for my destination
}
Connector will be used as the starting point for accessing three main connector components that you need to provide:
- Specification contains general information about the plugin, such as its name and what it does. Writing a specification is straightforward; for more information, check the field docs of Specification.
- Source is the connector part that knows how to fetch data from the third party system and convert it to a Record.
- Destination is the connector part that knows how to write a Record to the third party system.
General advice for implementing connectors:
- The SDK provides a structured logger that can be retrieved with Logger. It allows you to create structured and leveled output that will be included as part of the Conduit logs.
- If you want to add logging to the hot path (i.e. code that is executed for every record that is read or written) you should use the log level "trace", otherwise it can greatly impact the performance of your connector.
Source ¶
A Source is responsible for continuously reading data from a third party system and returning it in the form of a Record.
Every Source implementation needs to include an UnimplementedSource to satisfy the interface. This allows us to potentially change the interface in the future while remaining backwards compatible with existing Source implementations.
type Source struct {
	sdk.UnimplementedSource
}
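The reason embedding keeps existing implementations compiling can be shown without the SDK. The following is a minimal sketch that uses hypothetical stand-in types (`source`, `unimplementedSource`, `errUnimplemented`) rather than the SDK's own, and a reduced two-method interface chosen only for illustration. The embedded struct supplies a default for every interface method, so a type that overrides only some of them still satisfies the interface.

```go
package main

import (
	"errors"
	"fmt"
)

// errUnimplemented is a hypothetical stand-in for sdk.ErrUnimplemented.
var errUnimplemented = errors.New("not implemented")

// source is a hypothetical, reduced stand-in for the sdk.Source interface.
type source interface {
	Read() (string, error)
	Ack(position string) error
}

// unimplementedSource plays the role of sdk.UnimplementedSource in this
// sketch: it provides a default for every method of the interface.
type unimplementedSource struct{}

func (unimplementedSource) Read() (string, error) { return "", errUnimplemented }
func (unimplementedSource) Ack(string) error      { return errUnimplemented }

// mySource embeds the defaults and overrides only Read.
type mySource struct {
	unimplementedSource
}

func (mySource) Read() (string, error) { return "record-1", nil }

func main() {
	// mySource satisfies source because Ack is promoted from the embedded struct.
	var s source = mySource{}

	rec, err := s.Read()
	fmt.Println(rec, err)

	// Ack was not overridden, so the embedded default answers.
	fmt.Println(errors.Is(s.Ack("pos"), errUnimplemented))
}
```

If a method were later added to the interface and to the embedded struct, `mySource` would keep compiling without changes, which is the backwards compatibility described above.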
You need to implement the functions required by Source and provide your own implementations. Please look at the documentation of Source for further information about individual functions.
You should also create a constructor function for your source struct. Note that this is the same function that should be set as the value of Connector.NewSource. The constructor should be used to wrap your source in the default middleware. You can add additional middleware, but unless you have a very good reason, you should always include the default middleware.
func NewSource() sdk.Source {
	return sdk.SourceWithMiddleware(&Source{}, sdk.DefaultSourceMiddleware()...)
}
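To illustrate what wrapping in middleware means, here is a minimal sketch of the decorator pattern behind a `Wrap`-style middleware interface. All types below (`source`, `middleware`, `prefixMiddleware`, `withMiddleware`) are hypothetical stand-ins, and the order in which middleware is applied is an assumption of this sketch, not a statement about the SDK.

```go
package main

import "fmt"

// source is a hypothetical, reduced stand-in for sdk.Source.
type source interface {
	Read() (string, error)
}

// middleware takes a source and returns a source with added behavior.
type middleware interface {
	Wrap(source) source
}

// withMiddleware applies the middleware in order, so the last one ends up
// outermost. This ordering is an assumption made for the sketch.
func withMiddleware(s source, mw ...middleware) source {
	for _, m := range mw {
		s = m.Wrap(s)
	}
	return s
}

// baseSource is the undecorated implementation.
type baseSource struct{}

func (baseSource) Read() (string, error) { return "payload", nil }

// prefixMiddleware decorates every record with a prefix.
type prefixMiddleware struct{ prefix string }

func (m prefixMiddleware) Wrap(s source) source {
	return prefixedSource{inner: s, prefix: m.prefix}
}

// prefixedSource is the wrapper returned by prefixMiddleware.
type prefixedSource struct {
	inner  source
	prefix string
}

func (p prefixedSource) Read() (string, error) {
	rec, err := p.inner.Read()
	if err != nil {
		return "", err
	}
	return p.prefix + rec, nil
}

func main() {
	s := withMiddleware(baseSource{}, prefixMiddleware{prefix: "a:"}, prefixMiddleware{prefix: "b:"})
	rec, _ := s.Read()
	fmt.Println(rec) // prints "b:a:payload"
}
```

Because every wrapper satisfies the same interface as the thing it wraps, the caller cannot tell a decorated source from a plain one, which is why the constructor can return the wrapped value directly.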
Additional tips for implementing a source:
- The SDK provides utilities for certain operations like creating records in SourceUtil. You can access it through the global variable Util.Source.
- The function Source.Ack is optional and does not have to be implemented.
- Source is responsible for creating record positions that should ideally uniquely identify a record. Think carefully about what you will store in the position; it should give the source enough information to resume reading records at that specific position.
- The SDK provides acceptance tests. If your source doesn't pass them, your implementation has a bug¹.
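As an illustration of the advice on positions, the sketch below serializes everything a file-based source would need to resume into an opaque byte slice. It assumes a position is a plain byte slice; the `filePosition` struct, its fields and the JSON encoding are hypothetical choices for this example, not something the SDK prescribes.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// position is a stand-in for an opaque, byte-slice record position.
type position []byte

// filePosition is a hypothetical payload holding what a file-based source
// would need in order to resume reading.
type filePosition struct {
	File   string `json:"file"`
	Offset int64  `json:"offset"`
}

// encodePosition serializes the resume information into a position.
func encodePosition(p filePosition) (position, error) {
	b, err := json.Marshal(p)
	if err != nil {
		return nil, fmt.Errorf("failed to encode position: %w", err)
	}
	return position(b), nil
}

// decodePosition restores the resume information. An empty position is
// treated as "start from the beginning".
func decodePosition(pos position) (filePosition, error) {
	var p filePosition
	if len(pos) == 0 {
		return p, nil
	}
	if err := json.Unmarshal(pos, &p); err != nil {
		return p, fmt.Errorf("failed to decode position: %w", err)
	}
	return p, nil
}

func main() {
	pos, err := encodePosition(filePosition{File: "data.log", Offset: 128})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(pos))

	resumed, err := decodePosition(pos)
	fmt.Println(resumed.File, resumed.Offset, err)
}
```

A self-describing encoding like this makes it possible to add fields later without invalidating positions that were stored by an older version of the connector.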
Destination ¶
A Destination is responsible for writing Records to third party systems.
Every Destination implementation needs to include an UnimplementedDestination to satisfy the interface. This allows us to potentially change the interface in the future while remaining backwards compatible with existing Destination implementations.
type Destination struct {
	sdk.UnimplementedDestination
}
You need to implement the functions required by Destination and provide your own implementations. Please look at the documentation of Destination for further information about individual functions.
You should also create a constructor function for your destination struct. Note that this is the same function that should be set as the value of Connector.NewDestination. The constructor should be used to wrap your destination in the default middleware. You can add additional middleware, but unless you have a very good reason, you should always include the default middleware.
func NewDestination() sdk.Destination {
	return sdk.DestinationWithMiddleware(&Destination{}, sdk.DefaultDestinationMiddleware()...)
}
Additional tips for implementing a destination:
- The SDK provides utilities for certain operations like routing records based on their operation in DestinationUtil. You can access it through the global variable Util.Destination.
- If your destination writes records as a whole to the destination you should use Record.Bytes to get the raw record representation.
- If possible, make your destination writes idempotent. It is possible that the destination will receive the same record twice after a pipeline restart.
- Some sources won't be able to distinguish create and update operations. If your destination updates data in place, we recommend upserting the record on a create or update.
- The SDK provides acceptance tests. If your destination doesn't pass them, your implementation has a bug¹.
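The advice on idempotency and upserts can be sketched with an in-memory store. The `record` and `store` types and the string operation names are hypothetical and stand in for the SDK's Record and Operation types. Create and update are both handled as an upsert keyed by the record key, so replaying a batch leaves the store unchanged.

```go
package main

import "fmt"

// record is a hypothetical, reduced stand-in for an SDK record.
type record struct {
	Operation string // "create", "update" or "delete"
	Key       string
	Payload   string
}

// store is a toy in-place data store keyed by record key.
type store map[string]string

// write applies the records. Create and update are both treated as an
// upsert, which makes replaying the same records harmless.
func (s store) write(records []record) {
	for _, r := range records {
		switch r.Operation {
		case "delete":
			delete(s, r.Key) // deleting a missing key is a no-op
		default:
			s[r.Key] = r.Payload
		}
	}
}

func main() {
	s := store{}
	batch := []record{
		{Operation: "create", Key: "user-1", Payload: "alice"},
		{Operation: "update", Key: "user-2", Payload: "bob"}, // update for a key the store has never seen
	}

	s.write(batch)
	s.write(batch) // simulate the same records arriving again after a restart

	fmt.Println(len(s), s["user-1"], s["user-2"]) // prints "2 alice bob"
}
```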
Acceptance tests ¶
The SDK provides acceptance tests that can be run in a simple Go test.¹
To run acceptance tests, create a test file, preferably named acceptance_test.go, at the root of your project to make it easy to discover. Inside, create a Go test that calls the function AcceptanceTest.
func TestAcceptance(t *testing.T) {
	// set up dependencies here
	sdk.AcceptanceTest(t, sdk.ConfigurableAcceptanceTestDriver{
		Config: sdk.ConfigurableAcceptanceTestDriverConfig{
			Connector:         Connector, // Connector is the global variable from your connector
			SourceConfig:      map[string]string{ … },
			DestinationConfig: map[string]string{ … },
		},
	})
}
AcceptanceTest uses the AcceptanceTestDriver for certain operations. The SDK already provides a default implementation for the driver with ConfigurableAcceptanceTestDriver, although you can supply your own implementation if you need to adjust the behavior of acceptance tests for your connector.
Some acceptance tests will try to write data using the destination and then read the same data using the source. Because of that, you need to make sure that both configurations point to exactly the same data store (e.g. in case of the file connector the source and destination need to read from and write to the same file).
If your connector does not implement both sides of the connector (a source and a destination) you will need to write a custom driver that knows how to read or write, depending on which side of the connector is not implemented. Here is an example of how to do that:
type CustomAcceptanceTestDriver struct {
	sdk.ConfigurableAcceptanceTestDriver
}

func (d *CustomAcceptanceTestDriver) ReadFromDestination(t *testing.T, records []sdk.Record) []sdk.Record {
	// implement read
	return records // placeholder: return the records read from the destination
}

func (d *CustomAcceptanceTestDriver) WriteToSource(t *testing.T, records []sdk.Record) []sdk.Record {
	// implement write
	return records // placeholder: return the records the source is expected to read
}
For more information about what behavior can be customized please refer to the AcceptanceTestDriver interface.
¹Acceptance tests are currently still experimental.
Index ¶
- Constants
- Variables
- func AcceptanceTest(t *testing.T, driver AcceptanceTestDriver)
- func BenchmarkSource(b *testing.B, s Source, cfg map[string]string)
- func Logger(ctx context.Context) *zerolog.Logger
- func NewDestinationPlugin(impl Destination) cpluginv1.DestinationPlugin
- func NewSourcePlugin(impl Source) cpluginv1.SourcePlugin
- func NewSpecifierPlugin(specs Specification, source Source, dest Destination) cpluginv1.SpecifierPlugin
- func Serve(c Connector)
- type AcceptanceTestDriver
- type Change
- type ConfigurableAcceptanceTestDriver
- func (d ConfigurableAcceptanceTestDriver) AfterTest(t *testing.T)
- func (d ConfigurableAcceptanceTestDriver) BeforeTest(t *testing.T)
- func (d ConfigurableAcceptanceTestDriver) Connector() Connector
- func (d ConfigurableAcceptanceTestDriver) Context() context.Context
- func (d ConfigurableAcceptanceTestDriver) DestinationConfig(*testing.T) map[string]string
- func (d ConfigurableAcceptanceTestDriver) GenerateData(t *testing.T) Data
- func (d ConfigurableAcceptanceTestDriver) GenerateRecord(t *testing.T, op Operation) Record
- func (d ConfigurableAcceptanceTestDriver) GenerateValue(t *testing.T) interface{}
- func (d ConfigurableAcceptanceTestDriver) GoleakOptions(_ *testing.T) []goleak.Option
- func (d ConfigurableAcceptanceTestDriver) ReadFromDestination(t *testing.T, records []Record) []Record
- func (d ConfigurableAcceptanceTestDriver) ReadTimeout() time.Duration
- func (d ConfigurableAcceptanceTestDriver) Skip(t *testing.T)
- func (d ConfigurableAcceptanceTestDriver) SourceConfig(*testing.T) map[string]string
- func (d ConfigurableAcceptanceTestDriver) WriteTimeout() time.Duration
- func (d ConfigurableAcceptanceTestDriver) WriteToSource(t *testing.T, records []Record) []Record
- type ConfigurableAcceptanceTestDriverConfig
- type Connector
- type Converter
- type Data
- type DebeziumConverter
- type Destination
- type DestinationMiddleware
- type DestinationUtil
- type DestinationWithBatch
- type DestinationWithRateLimit
- type DestinationWithRecordFormat
- func (d DestinationWithRecordFormat) DefaultRecordFormatters() []RecordFormatter
- func (d DestinationWithRecordFormat) RecordFormatOptionsParameterName() string
- func (d DestinationWithRecordFormat) RecordFormatParameterName() string
- func (d DestinationWithRecordFormat) Wrap(impl Destination) Destination
- type Encoder
- type GenerateDataType
- type GenericRecordFormatter
- type JSONEncoder
- type Metadata
- func (m Metadata) GetCollection() (string, error)
- func (m Metadata) GetConduitDLQNackError() (string, error)
- func (m Metadata) GetConduitDLQNackNodeID() (string, error)
- func (m Metadata) GetConduitDestinationPluginName() (string, error)
- func (m Metadata) GetConduitDestinationPluginVersion() (string, error)
- func (m Metadata) GetConduitSourceConnectorID() (string, error)
- func (m Metadata) GetConduitSourcePluginName() (string, error)
- func (m Metadata) GetConduitSourcePluginVersion() (string, error)
- func (m Metadata) GetCreatedAt() (time.Time, error)
- func (m Metadata) GetOpenCDCVersion() (string, error)
- func (m Metadata) GetReadAt() (time.Time, error)
- func (m Metadata) SetCollection(collection string)
- func (m Metadata) SetConduitDLQNackError(err string)
- func (m Metadata) SetConduitDLQNackNodeID(id string)
- func (m Metadata) SetConduitDestinationPluginName(name string)
- func (m Metadata) SetConduitDestinationPluginVersion(version string)
- func (m Metadata) SetConduitSourceConnectorID(id string)
- func (m Metadata) SetConduitSourcePluginName(name string)
- func (m Metadata) SetConduitSourcePluginVersion(version string)
- func (m Metadata) SetCreatedAt(createdAt time.Time)
- func (m Metadata) SetOpenCDCVersion()
- func (m Metadata) SetReadAt(createdAt time.Time)
- type OpenCDCConverter
- type Operation
- type Parameter
- type ParameterType
- type Position
- type RawData
- type Record
- type RecordFormatter
- type Source
- type SourceMiddleware
- type SourceUtil
- func (SourceUtil) NewRecordCreate(position Position, metadata Metadata, key Data, payload Data) Record
- func (SourceUtil) NewRecordDelete(position Position, metadata Metadata, key Data) Record
- func (SourceUtil) NewRecordSnapshot(position Position, metadata Metadata, key Data, payload Data) Record
- func (SourceUtil) NewRecordUpdate(position Position, metadata Metadata, key Data, payloadBefore Data, ...) Record
- type Specification
- type StructuredData
- type TemplateRecordFormatter
- type UnimplementedDestination
- func (UnimplementedDestination) Configure(context.Context, map[string]string) error
- func (UnimplementedDestination) LifecycleOnCreated(context.Context, map[string]string) error
- func (UnimplementedDestination) LifecycleOnDeleted(context.Context, map[string]string) error
- func (UnimplementedDestination) LifecycleOnUpdated(context.Context, map[string]string, map[string]string) error
- func (UnimplementedDestination) Open(context.Context) error
- func (UnimplementedDestination) Parameters() map[string]Parameter
- func (UnimplementedDestination) Teardown(context.Context) error
- func (UnimplementedDestination) Write(context.Context, []Record) (int, error)
- type UnimplementedSource
- func (UnimplementedSource) Ack(context.Context, Position) error
- func (UnimplementedSource) Configure(context.Context, map[string]string) error
- func (UnimplementedSource) LifecycleOnCreated(context.Context, map[string]string) error
- func (UnimplementedSource) LifecycleOnDeleted(context.Context, map[string]string) error
- func (UnimplementedSource) LifecycleOnUpdated(context.Context, map[string]string, map[string]string) error
- func (UnimplementedSource) Open(context.Context, Position) error
- func (UnimplementedSource) Parameters() map[string]Parameter
- func (UnimplementedSource) Read(context.Context) (Record, error)
- func (UnimplementedSource) Teardown(context.Context) error
- type Validation
- type ValidationExclusion
- type ValidationGreaterThan
- type ValidationInclusion
- type ValidationLessThan
- type ValidationRegex
- type ValidationRequired
Constants ¶
const (
	// MetadataOpenCDCVersion is a Record.Metadata key for the version of the
	// OpenCDC format (e.g. "v1"). This field exists to ensure the OpenCDC
	// format version can be easily identified in case the record gets marshaled
	// into a different untyped format (e.g. JSON).
	MetadataOpenCDCVersion = cpluginv1.MetadataOpenCDCVersion

	// MetadataCreatedAt is a Record.Metadata key for the time when the record
	// was created in the 3rd party system. The expected format is a unix
	// timestamp in nanoseconds.
	MetadataCreatedAt = cpluginv1.MetadataCreatedAt
	// MetadataReadAt is a Record.Metadata key for the time when the record was
	// read from the 3rd party system. The expected format is a unix timestamp
	// in nanoseconds.
	MetadataReadAt = cpluginv1.MetadataReadAt

	// MetadataCollection is a Record.Metadata key for the name of the collection
	// where the record originated from and/or where it should be stored.
	MetadataCollection = cpluginv1.MetadataCollection

	// MetadataConduitSourcePluginName is a Record.Metadata key for the name of
	// the source plugin that created this record.
	MetadataConduitSourcePluginName = cpluginv1.MetadataConduitSourcePluginName
	// MetadataConduitSourcePluginVersion is a Record.Metadata key for the
	// version of the source plugin that created this record.
	MetadataConduitSourcePluginVersion = cpluginv1.MetadataConduitSourcePluginVersion
	// MetadataConduitDestinationPluginName is a Record.Metadata key for the
	// name of the destination plugin that has written this record
	// (only available in records once they are written by a destination).
	MetadataConduitDestinationPluginName = cpluginv1.MetadataConduitDestinationPluginName
	// MetadataConduitDestinationPluginVersion is a Record.Metadata key for the
	// version of the destination plugin that has written this record
	// (only available in records once they are written by a destination).
	MetadataConduitDestinationPluginVersion = cpluginv1.MetadataConduitDestinationPluginVersion

	// MetadataConduitSourceConnectorID is a Record.Metadata key for the ID of
	// the source connector that produced this record.
	MetadataConduitSourceConnectorID = cpluginv1.MetadataConduitSourceConnectorID
	// MetadataConduitDLQNackError is a Record.Metadata key for the error that
	// caused a record to be nacked and pushed to the dead-letter queue.
	MetadataConduitDLQNackError = cpluginv1.MetadataConduitDLQNackError
	// MetadataConduitDLQNackNodeID is a Record.Metadata key for the ID of the
	// internal node that nacked the record.
	MetadataConduitDLQNackNodeID = cpluginv1.MetadataConduitDLQNackNodeID
)
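The "unix timestamp in nanoseconds" format used by MetadataCreatedAt and MetadataReadAt can be sketched as follows. This is an illustration on a plain `map[string]string`; the key `example.createdAt` is a placeholder invented for this sketch (the real key is whatever value the constant holds), and the helper names are hypothetical. In a connector, the Metadata getters and setters listed in the index are the intended way to do this.

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

// metadataCreatedAt is a placeholder key used only in this sketch.
const metadataCreatedAt = "example.createdAt"

// setCreatedAt stores t as a base-10 unix timestamp in nanoseconds.
func setCreatedAt(m map[string]string, t time.Time) {
	m[metadataCreatedAt] = strconv.FormatInt(t.UnixNano(), 10)
}

// getCreatedAt parses the stored timestamp back into a time.Time.
func getCreatedAt(m map[string]string) (time.Time, error) {
	raw, ok := m[metadataCreatedAt]
	if !ok {
		return time.Time{}, fmt.Errorf("metadata field %q not found", metadataCreatedAt)
	}
	nanos, err := strconv.ParseInt(raw, 10, 64)
	if err != nil {
		return time.Time{}, fmt.Errorf("invalid timestamp %q: %w", raw, err)
	}
	return time.Unix(0, nanos), nil
}

func main() {
	m := map[string]string{}
	created := time.Date(2023, time.January, 2, 3, 4, 5, 6, time.UTC)

	setCreatedAt(m, created)
	got, err := getCreatedAt(m)

	// Equal compares instants, so the differing time zone does not matter.
	fmt.Println(m[metadataCreatedAt], got.Equal(created), err)
}
```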
Variables ¶
var (
	// ErrBackoffRetry can be returned by Source.Read to signal the SDK there is
	// no record to fetch right now and it should try again later.
	ErrBackoffRetry = errors.New("backoff retry")
	// ErrUnimplemented is returned in functions of plugins that don't implement
	// a certain method.
	ErrUnimplemented = errors.New("the connector plugin does not implement this action, please check the source code of the connector and make sure all required connector methods are implemented")
	// ErrMetadataFieldNotFound is returned in metadata utility functions when a
	// metadata field is not found.
	ErrMetadataFieldNotFound = errors.New("metadata field not found")
)
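The role of ErrBackoffRetry as a sentinel error can be sketched with standard library code only. Everything below is a hypothetical stand-in: `errBackoffRetry` mirrors the SDK variable, `queueSource` is a toy source, and `drain` is a simplified caller that stops at the first backoff signal, whereas a real caller would wait and call read again.

```go
package main

import (
	"errors"
	"fmt"
)

// errBackoffRetry is a hypothetical stand-in for sdk.ErrBackoffRetry.
var errBackoffRetry = errors.New("backoff retry")

// queueSource is a toy source backed by an in-memory queue.
type queueSource struct {
	pending []string
}

// read returns the next record, or errBackoffRetry when nothing is
// available right now.
func (s *queueSource) read() (string, error) {
	if len(s.pending) == 0 {
		return "", errBackoffRetry
	}
	rec := s.pending[0]
	s.pending = s.pending[1:]
	return rec, nil
}

// drain reads until the source signals that no record is available.
// Any other error is treated as a failure and returned to the caller.
func drain(s *queueSource) ([]string, error) {
	var out []string
	for {
		rec, err := s.read()
		if errors.Is(err, errBackoffRetry) {
			return out, nil // not a failure, just nothing to read yet
		}
		if err != nil {
			return out, err
		}
		out = append(out, rec)
	}
}

func main() {
	got, err := drain(&queueSource{pending: []string{"a", "b"}})
	fmt.Println(got, err) // prints "[a b] <nil>"
}
```

Using `errors.Is` rather than `==` means the signal is still recognized if the source wraps the sentinel with additional context.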
var (
	ErrUnrecognizedParameter     = config.ErrUnrecognizedParameter
	ErrInvalidParameterValue     = config.ErrInvalidParameterValue
	ErrInvalidParameterType      = config.ErrInvalidParameterType
	ErrInvalidValidationType     = config.ErrInvalidValidationType
	ErrRequiredParameterMissing  = config.ErrRequiredParameterMissing
	ErrLessThanValidationFail    = config.ErrLessThanValidationFail
	ErrGreaterThanValidationFail = config.ErrGreaterThanValidationFail
	ErrInclusionValidationFail   = config.ErrInclusionValidationFail
	ErrExclusionValidationFail   = config.ErrExclusionValidationFail
	ErrRegexValidationFail       = config.ErrRegexValidationFail
)
var Util = struct {
	// SourceUtil provides utility methods for implementing a source.
	Source SourceUtil
	// DestinationUtil provides utility methods for implementing a destination.
	Destination DestinationUtil
	// ParseConfig parses a config map into a struct. Under the hood, this
	// function uses the library mitchellh/mapstructure, with the
	// "mapstructure" tag renamed to "json", so to rename a key, use the "json"
	// tag and set a value directly. To embed structs, append ",squash" to your
	// tag. For more details and docs, check
	// https://pkg.go.dev/github.com/mitchellh/mapstructure
	ParseConfig func(map[string]string, interface{}) error
}{
	ParseConfig: parseConfig,
}
Util provides utilities for implementing connectors.
Functions ¶
func AcceptanceTest ¶ added in v0.3.0
func AcceptanceTest(t *testing.T, driver AcceptanceTestDriver)
AcceptanceTest is the acceptance test that all connector implementations should pass. It should be called manually from a test case in each implementation:
func TestAcceptance(t *testing.T) {
	// set up test dependencies
	...
	sdk.AcceptanceTest(t, sdk.ConfigurableAcceptanceTestDriver{
		Config: sdk.ConfigurableAcceptanceTestDriverConfig{
			Connector:         myConnector,
			SourceConfig:      map[string]string{...}, // valid source config
			DestinationConfig: map[string]string{...}, // valid destination config
		},
	})
}
func BenchmarkSource ¶ added in v0.9.0
BenchmarkSource is a benchmark that any source implementation can run to measure its performance. The benchmark expects the source resource to contain at least b.N records. This should be prepared before the benchmark is executed. The function should be called manually from a benchmark function:
func BenchmarkConnector(b *testing.B) {
	// set up test dependencies and write b.N records to source resource
	...
	sdk.BenchmarkSource(
		b,
		mySourceConnector,
		map[string]string{...}, // valid source config
	)
}
The benchmark can be run with a specific number of records by supplying the option -benchtime=Nx, where N is the number of records to be benchmarked (e.g. -benchtime=100x benchmarks reading 100 records).
func Logger ¶
Logger returns an instance of a logger that can be used for leveled and structured logging in a plugin. The logger will respect the log level configured in Conduit.
func NewDestinationPlugin ¶
func NewDestinationPlugin(impl Destination) cpluginv1.DestinationPlugin
NewDestinationPlugin takes a Destination and wraps it into an adapter that converts it into a cpluginv1.DestinationPlugin. If the parameter is nil it will wrap UnimplementedDestination instead.
func NewSourcePlugin ¶
func NewSourcePlugin(impl Source) cpluginv1.SourcePlugin
NewSourcePlugin takes a Source and wraps it into an adapter that converts it into a cpluginv1.SourcePlugin. If the parameter is nil it will wrap UnimplementedSource instead.
func NewSpecifierPlugin ¶
func NewSpecifierPlugin(specs Specification, source Source, dest Destination) cpluginv1.SpecifierPlugin
NewSpecifierPlugin takes a Specification and wraps it into an adapter that converts it into a cpluginv1.SpecifierPlugin.
func Serve ¶
func Serve(c Connector)
Serve starts the plugin and takes care of its whole lifecycle by blocking until the plugin can safely stop running. Any fixable errors will be output to os.Stderr and the process will exit with a status code of 1. Serve will panic for unexpected conditions where a user's fix is unknown.
It is essential that nothing gets written to stdout or stderr before this function is called, as the first output is used to perform the initial handshake.
Plugins should call Serve in their main() functions.
Types ¶
type AcceptanceTestDriver ¶ added in v0.3.0
type AcceptanceTestDriver interface {
	// Context returns the context to use in tests.
	Context() context.Context
	// Connector is the connector to be tested.
	Connector() Connector

	// SourceConfig should be a valid config for a source connector, reading
	// from the same location as the destination will write to.
	SourceConfig(*testing.T) map[string]string
	// DestinationConfig should be a valid config for a destination connector,
	// writing to the same location as the source will read from.
	DestinationConfig(*testing.T) map[string]string

	// BeforeTest is executed before each acceptance test.
	BeforeTest(*testing.T)
	// AfterTest is executed after each acceptance test.
	AfterTest(*testing.T)

	// GoleakOptions will be applied to goleak.VerifyNone. Can be used to
	// suppress false positive goroutine leaks.
	GoleakOptions(*testing.T) []goleak.Option

	// GenerateRecord will generate a new Record for a certain Operation. It's
	// the responsibility of the AcceptanceTestDriver implementation to provide
	// records with appropriate contents (e.g. appropriate type of payload).
	// The generated record will contain mixed data types in the field Key and
	// Payload (i.e. RawData and StructuredData), unless configured otherwise
	// (see ConfigurableAcceptanceTestDriverConfig.GenerateDataType).
	GenerateRecord(*testing.T, Operation) Record

	// WriteToSource receives a slice of records that should be prepared in the
	// 3rd party system so that the source will read them. The returned slice
	// will be used to verify the source connector can successfully execute
	// reads.
	// It is encouraged for the driver to return the same slice, unless there is
	// no way to write the records to the 3rd party system, then the returning
	// slice should contain the expected records a source should read.
	WriteToSource(*testing.T, []Record) []Record
	// ReadFromDestination should return a slice with the records that were
	// written to the destination. The slice will be used to verify the
	// destination has successfully executed writes.
	// The parameter contains records that were actually written to the
	// destination. These will be compared to the returned slice of records. It
	// is encouraged for the driver to only touch the input records to change
	// the order of records and to not change the records themselves.
	ReadFromDestination(*testing.T, []Record) []Record

	// ReadTimeout controls the time the test should wait for a read operation
	// to return before it considers the operation as failed.
	ReadTimeout() time.Duration
	// WriteTimeout controls the time the test should wait for a write operation
	// to return before it considers the operation as failed.
	WriteTimeout() time.Duration
}
AcceptanceTestDriver is the object that each test uses for fetching the connector and its configurations. The SDK provides a default implementation ConfigurableAcceptanceTestDriver that should fit most use cases. In case more flexibility is needed you can create your own driver, include the default driver in the struct and override methods as needed.
type Change ¶ added in v0.3.0
type Change struct {
	// Before contains the data before the operation occurred. This field is
	// optional and should only be populated for operations OperationUpdate
	// and OperationDelete (if the system supports fetching the data before the
	// operation).
	Before Data `json:"before"`
	// After contains the data after the operation occurred. This field should
	// be populated for all operations except OperationDelete.
	After Data `json:"after"`
}
type ConfigurableAcceptanceTestDriver ¶ added in v0.3.0
type ConfigurableAcceptanceTestDriver struct {
	Config ConfigurableAcceptanceTestDriverConfig
	// contains filtered or unexported fields
}
ConfigurableAcceptanceTestDriver is the default implementation of AcceptanceTestDriver. It provides a convenient way of configuring the driver without the need of implementing a custom driver from scratch.
func (ConfigurableAcceptanceTestDriver) AfterTest ¶ added in v0.3.0
func (d ConfigurableAcceptanceTestDriver) AfterTest(t *testing.T)
func (ConfigurableAcceptanceTestDriver) BeforeTest ¶ added in v0.3.0
func (d ConfigurableAcceptanceTestDriver) BeforeTest(t *testing.T)
func (ConfigurableAcceptanceTestDriver) Connector ¶ added in v0.3.0
func (d ConfigurableAcceptanceTestDriver) Connector() Connector
func (ConfigurableAcceptanceTestDriver) Context ¶ added in v0.9.0
func (d ConfigurableAcceptanceTestDriver) Context() context.Context
func (ConfigurableAcceptanceTestDriver) DestinationConfig ¶ added in v0.3.0
func (d ConfigurableAcceptanceTestDriver) DestinationConfig(*testing.T) map[string]string
func (ConfigurableAcceptanceTestDriver) GenerateData ¶ added in v0.3.0
func (d ConfigurableAcceptanceTestDriver) GenerateData(t *testing.T) Data
GenerateData generates either RawData or StructuredData depending on the configured data type (see ConfigurableAcceptanceTestDriverConfig.GenerateDataType).
func (ConfigurableAcceptanceTestDriver) GenerateRecord ¶ added in v0.3.0
func (d ConfigurableAcceptanceTestDriver) GenerateRecord(t *testing.T, op Operation) Record
func (ConfigurableAcceptanceTestDriver) GenerateValue ¶ added in v0.3.0
func (d ConfigurableAcceptanceTestDriver) GenerateValue(t *testing.T) interface{}
GenerateValue generates a random value of a random builtin type.
func (ConfigurableAcceptanceTestDriver) GoleakOptions ¶ added in v0.3.0
func (d ConfigurableAcceptanceTestDriver) GoleakOptions(_ *testing.T) []goleak.Option
func (ConfigurableAcceptanceTestDriver) ReadFromDestination ¶ added in v0.3.0
func (d ConfigurableAcceptanceTestDriver) ReadFromDestination(t *testing.T, records []Record) []Record
ReadFromDestination by default opens the source and reads all records from the source. It is expected that the destination is writing to the same location the source is reading from. If the connector does not implement a source the function will fail the test.
func (ConfigurableAcceptanceTestDriver) ReadTimeout ¶ added in v0.3.0
func (d ConfigurableAcceptanceTestDriver) ReadTimeout() time.Duration
func (ConfigurableAcceptanceTestDriver) Skip ¶ added in v0.3.0
func (d ConfigurableAcceptanceTestDriver) Skip(t *testing.T)
func (ConfigurableAcceptanceTestDriver) SourceConfig ¶ added in v0.3.0
func (d ConfigurableAcceptanceTestDriver) SourceConfig(*testing.T) map[string]string
func (ConfigurableAcceptanceTestDriver) WriteTimeout ¶ added in v0.3.0
func (d ConfigurableAcceptanceTestDriver) WriteTimeout() time.Duration
func (ConfigurableAcceptanceTestDriver) WriteToSource ¶ added in v0.3.0
func (d ConfigurableAcceptanceTestDriver) WriteToSource(t *testing.T, records []Record) []Record
WriteToSource by default opens the destination and writes records to the destination. It is expected that the destination is writing to the same location the source is reading from. If the connector does not implement a destination the function will fail the test.
type ConfigurableAcceptanceTestDriverConfig ¶ added in v0.3.0
type ConfigurableAcceptanceTestDriverConfig struct {
	// Context is the context to use in tests. The default is a context with a
	// logger that writes to the test output.
	Context context.Context
	// Connector is the connector to be tested.
	Connector Connector

	// SourceConfig should be a valid config for a source connector, reading
	// from the same location as the destination will write to.
	SourceConfig map[string]string
	// DestinationConfig should be a valid config for a destination connector,
	// writing to the same location as the source will read from.
	DestinationConfig map[string]string

	// BeforeTest is executed before each acceptance test.
	BeforeTest func(t *testing.T)
	// AfterTest is executed after each acceptance test.
	AfterTest func(t *testing.T)

	// GoleakOptions will be applied to goleak.VerifyNone. Can be used to
	// suppress false positive goroutine leaks.
	GoleakOptions []goleak.Option

	// Skip is a slice of regular expressions used to identify tests that should
	// be skipped. The full test name will be matched against all regular
	// expressions and the test will be skipped if a match is found.
	Skip []string

	// GenerateDataType controls which Data type will be generated in test
	// records. The default is GenerateMixedData which will produce both RawData
	// and StructuredData. To generate only one type of data set this field to
	// GenerateRawData or GenerateStructuredData.
	GenerateDataType GenerateDataType

	// ReadTimeout controls the time the test should wait for a read operation
	// to return a record before it considers the operation as failed. The
	// default timeout is 5 seconds. This value should be changed only if there
	// is a good reason (uncontrollable limitations of the 3rd party system).
	ReadTimeout time.Duration
	// WriteTimeout controls the time the test should wait for a write operation
	// to return a record before it considers the operation as failed. The
	// default timeout is 5 seconds. This value should be changed only if there
	// is a good reason (uncontrollable limitations of the 3rd party system).
	WriteTimeout time.Duration
}
ConfigurableAcceptanceTestDriverConfig contains the configuration for ConfigurableAcceptanceTestDriver.
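How a list of regular expressions such as the Skip field can select tests is sketched below. The `shouldSkip` helper and the test names are hypothetical, and the use of unanchored matching (a pattern may match anywhere in the name) is an assumption of this sketch rather than a confirmed detail of the SDK.

```go
package main

import (
	"fmt"
	"regexp"
)

// shouldSkip reports whether testName matches at least one pattern.
// Matching is unanchored, so a pattern can match any part of the name.
func shouldSkip(testName string, patterns []string) (bool, error) {
	for _, p := range patterns {
		re, err := regexp.Compile(p)
		if err != nil {
			return false, fmt.Errorf("invalid skip pattern %q: %w", p, err)
		}
		if re.MatchString(testName) {
			return true, nil
		}
	}
	return false, nil
}

func main() {
	// Hypothetical patterns and test names, for illustration only.
	patterns := []string{`TestSource_.*`, `Resume$`}
	names := []string{
		"TestAcceptance/TestSource_Read_Success",
		"TestAcceptance/TestDestination_Write_Success",
	}

	for _, name := range names {
		skip, err := shouldSkip(name, patterns)
		if err != nil {
			panic(err)
		}
		fmt.Println(name, skip)
	}
}
```

Anchoring a pattern with `^` and `$` narrows it to an exact test name when a broad match would skip more tests than intended.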
type Connector ¶ added in v0.3.0
type Connector struct {
	// NewSpecification should create a new Specification that describes the
	// connector. This field is mandatory, if it is empty the connector won't
	// work.
	NewSpecification func() Specification
	// NewSource should create a new Source plugin. If the plugin doesn't
	// implement a source connector this field can be nil.
	NewSource func() Source
	// NewDestination should create a new Destination plugin. If the plugin
	// doesn't implement a destination connector this field can be nil.
	NewDestination func() Destination
}
Connector combines all constructors for each plugin into one struct.
type Converter ¶ added in v0.5.0
type Converter interface {
	Name() string
	Configure(map[string]string) (Converter, error)
	Convert(Record) (any, error)
}
Converter is a type that can change the structure of a Record. It's used in destination connectors to change the output structure (e.g. opencdc records, debezium records etc.).
type Data ¶
type Data interface {
	Bytes() []byte
	// contains filtered or unexported methods
}
Data is a structure that contains some bytes. The only structs implementing Data are RawData and StructuredData.
type DebeziumConverter ¶ added in v0.5.0
DebeziumConverter outputs a Debezium record.
func (DebeziumConverter) Configure ¶ added in v0.5.0
func (c DebeziumConverter) Configure(opt map[string]string) (Converter, error)
func (DebeziumConverter) Convert ¶ added in v0.5.0
func (c DebeziumConverter) Convert(r Record) (any, error)
func (DebeziumConverter) Name ¶ added in v0.5.0
func (c DebeziumConverter) Name() string
type Destination ¶
type Destination interface {
	// Parameters is a map of named Parameters that describe how to configure
	// the Destination.
	Parameters() map[string]Parameter

	// Configure is the first function to be called in a connector. It provides the
	// connector with the configuration that needs to be validated and stored.
	// In case the configuration is not valid it should return an error.
	// Testing if your connector can reach the configured data source should be
	// done in Open, not in Configure.
	// The connector SDK will sanitize, apply defaults and validate the
	// configuration before calling this function. This means that the
	// configuration will always contain all keys defined in Parameters
	// (unprovided keys will have their default values) and all non-empty
	// values will be of the correct type.
	Configure(context.Context, map[string]string) error

	// Open is called after Configure to signal the plugin it can prepare to
	// start writing records. If needed, the plugin should open connections in
	// this function.
	Open(context.Context) error

	// Write writes len(r) records from r to the destination right away without
	// caching. It should return the number of records written from r
	// (0 <= n <= len(r)) and any error encountered that caused the write to
	// stop early. Write must return a non-nil error if it returns n < len(r).
	Write(ctx context.Context, r []Record) (n int, err error)

	// Teardown signals to the plugin that all records were written and there
	// will be no more calls to any other function. After Teardown returns, the
	// plugin should be ready for a graceful shutdown.
	Teardown(context.Context) error

	// LifecycleOnCreated is called after Configure and before Open when the
	// connector is run for the first time. This call will be skipped if the
	// connector was already started before. This method can be used to do some
	// initialization that needs to happen only once in the lifetime of a
	// connector (e.g. create a bucket). Anything that the connector creates in
	// this method is considered to be owned by this connector and should be
	// cleaned up in LifecycleOnDeleted.
	LifecycleOnCreated(ctx context.Context, config map[string]string) error
	// LifecycleOnUpdated is called after Configure and before Open when the
	// connector configuration has changed since the last run. This call will be
	// skipped if the connector configuration did not change. It can be used to
	// update anything that was initialized in LifecycleOnCreated, in case the
	// configuration change affects it.
	LifecycleOnUpdated(ctx context.Context, configBefore, configAfter map[string]string) error
	// LifecycleOnDeleted is called when the connector was deleted. It will be
	// the only method that is called in that case. This method can be used to
	// clean up anything that was initialized in LifecycleOnCreated.
	LifecycleOnDeleted(ctx context.Context, config map[string]string) error
	// contains filtered or unexported methods
}
Destination receives records from Conduit and writes them to third-party resources. All implementations must embed UnimplementedDestination for forward compatibility.
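The return contract of Write (n records written, and a non-nil error whenever n < len(r)) can be illustrated with a minimal, self-contained sketch. It does not import the SDK: Record, memoryDestination, errEmptyPayload and demo are hypothetical stand-ins introduced only for this illustration.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Record is a hypothetical stand-in for sdk.Record, reduced to a payload.
type Record struct {
	Payload string
}

// errEmptyPayload is a hypothetical error used by this sketch only.
var errEmptyPayload = errors.New("empty payload")

// memoryDestination is a hypothetical destination that keeps records in
// memory and rejects records with an empty payload.
type memoryDestination struct {
	written []string
}

// Write returns the number of records written from r, and a non-nil error
// whenever that number is smaller than len(r).
func (d *memoryDestination) Write(ctx context.Context, r []Record) (int, error) {
	for i, rec := range r {
		if err := ctx.Err(); err != nil {
			return i, err // stopped early because the context was cancelled
		}
		if rec.Payload == "" {
			return i, fmt.Errorf("record %d: %w", i, errEmptyPayload)
		}
		d.written = append(d.written, rec.Payload)
	}
	return len(r), nil
}

// demo writes a batch whose third record is invalid.
func demo() (int, error) {
	d := &memoryDestination{}
	return d.Write(context.Background(), []Record{{"a"}, {"b"}, {""}, {"c"}})
}

func main() {
	n, err := demo()
	fmt.Println(n, err) // 2 record 2: empty payload
}
```

The records before the failing one count as written, so the caller can tell exactly where the batch stopped.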
func DestinationWithMiddleware ¶ added in v0.3.0
func DestinationWithMiddleware(d Destination, middleware ...DestinationMiddleware) Destination
DestinationWithMiddleware wraps the destination into the supplied middleware.
type DestinationMiddleware ¶ added in v0.3.0
type DestinationMiddleware interface {
Wrap(Destination) Destination
}
DestinationMiddleware wraps a Destination and adds functionality to it.
func DefaultDestinationMiddleware ¶ added in v0.3.0
func DefaultDestinationMiddleware() []DestinationMiddleware
DefaultDestinationMiddleware returns a slice of middleware that should be added to all destinations unless there's a good reason not to.
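How a list of middleware wraps a destination can be sketched without the SDK. Writer, Middleware, sink, tag and withMiddleware below are hypothetical stand-ins, and the ordering (the first middleware in the list becomes the outermost wrapper) is an assumption of this sketch, not something stated in these docs.

```go
package main

import "fmt"

// Writer is a hypothetical, reduced stand-in for sdk.Destination.
type Writer interface {
	Write(records []string) (int, error)
}

// Middleware mirrors the shape of DestinationMiddleware.
type Middleware interface {
	Wrap(Writer) Writer
}

// sink is the innermost writer; it records that it was called.
type sink struct{ calls *[]string }

func (s sink) Write(records []string) (int, error) {
	*s.calls = append(*s.calls, "sink")
	return len(records), nil
}

// tag is a middleware that notes its name before delegating.
type tag struct {
	name  string
	calls *[]string
}

func (t tag) Wrap(next Writer) Writer {
	return tagged{name: t.name, calls: t.calls, next: next}
}

type tagged struct {
	name  string
	calls *[]string
	next  Writer
}

func (t tagged) Write(records []string) (int, error) {
	*t.calls = append(*t.calls, t.name)
	return t.next.Write(records)
}

// withMiddleware applies the middleware in reverse so that the first entry
// ends up outermost (assumed ordering).
func withMiddleware(w Writer, middleware ...Middleware) Writer {
	for i := len(middleware) - 1; i >= 0; i-- {
		w = middleware[i].Wrap(w)
	}
	return w
}

// demo wraps a sink in two middleware and returns the observed call order.
func demo() (int, []string) {
	var calls []string
	w := withMiddleware(sink{&calls}, tag{"first", &calls}, tag{"second", &calls})
	n, _ := w.Write([]string{"a", "b"})
	return n, calls
}

func main() {
	fmt.Println(demo()) // 2 [first second sink]
}
```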
type DestinationUtil ¶ added in v0.3.0
type DestinationUtil struct{}
DestinationUtil provides utility methods for implementing a destination. Use it by calling Util.Destination.*.
func (DestinationUtil) Route ¶ added in v0.3.0
func (DestinationUtil) Route( ctx context.Context, rec Record, handleCreate func(context.Context, Record) error, handleUpdate func(context.Context, Record) error, handleDelete func(context.Context, Record) error, handleSnapshot func(context.Context, Record) error, ) error
Route makes it easier to implement a destination that mutates entities in place and thus handles different operations differently. It will inspect the operation on the record and based on that choose which handler to call.
Example usage:
func (d *Destination) Write(ctx context.Context, records []sdk.Record) (int, error) {
	for i, r := range records {
		err := sdk.Util.Destination.Route(ctx, r,
			d.handleInsert,
			d.handleUpdate,
			d.handleDelete,
			d.handleSnapshot, // we could also reuse d.handleInsert
		)
		if err != nil {
			return i, err
		}
	}
	return len(records), nil
}

func (d *Destination) handleInsert(ctx context.Context, r sdk.Record) error {
	...
}
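The dispatch that Route performs can be sketched as a switch on the record's operation. This is a self-contained illustration rather than the SDK's implementation: the Operation constants, their numeric values, the handler type and errUnknownOperation are assumptions made for the sketch.

```go
package main

import (
	"errors"
	"fmt"
)

// Operation is a hypothetical stand-in for sdk.Operation; the numeric values
// are assumed for this sketch.
type Operation int

const (
	OperationCreate Operation = iota + 1
	OperationUpdate
	OperationDelete
	OperationSnapshot
)

// Record is a reduced stand-in for sdk.Record.
type Record struct {
	Operation Operation
	Key       string
}

type handler func(Record) error

var errUnknownOperation = errors.New("unknown operation")

// route inspects the operation and calls the matching handler.
func route(rec Record, onCreate, onUpdate, onDelete, onSnapshot handler) error {
	switch rec.Operation {
	case OperationCreate:
		return onCreate(rec)
	case OperationUpdate:
		return onUpdate(rec)
	case OperationDelete:
		return onDelete(rec)
	case OperationSnapshot:
		return onSnapshot(rec)
	default:
		return fmt.Errorf("record %q: %w (%d)", rec.Key, errUnknownOperation, rec.Operation)
	}
}

// demo applies a few records to an in-memory table, then routes a record
// with an invalid operation.
func demo() (map[string]bool, error) {
	table := map[string]bool{}
	upsert := func(r Record) error { table[r.Key] = true; return nil }
	remove := func(r Record) error { delete(table, r.Key); return nil }

	records := []Record{
		{OperationSnapshot, "a"},
		{OperationCreate, "b"},
		{OperationDelete, "a"},
	}
	for _, r := range records {
		if err := route(r, upsert, upsert, remove, upsert); err != nil {
			return table, err
		}
	}
	return table, route(Record{Key: "x"}, upsert, upsert, remove, upsert)
}

func main() {
	fmt.Println(demo())
}
```

Reusing one handler for create, update and snapshot, as demo does, suits destinations where all three are an upsert.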
type DestinationWithBatch ¶ added in v0.3.0
type DestinationWithBatch struct {
	// DefaultBatchSize is the default value for the batch size.
	DefaultBatchSize int
	// DefaultBatchDelay is the default value for the batch delay.
	DefaultBatchDelay time.Duration
}
DestinationWithBatch adds support for batching on the destination. It adds two parameters to the destination config:
- `sdk.batch.size` - Maximum size of batch before it gets written to the destination.
- `sdk.batch.delay` - Maximum delay before an incomplete batch is written to the destination.
To change the defaults of these parameters use the fields of this struct.
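The interplay of the two batching parameters can be sketched with a small buffer that flushes when it is full and can also be flushed on demand. This is not the SDK's middleware: batcher and demo are hypothetical, and the delay timer is only simulated by an explicit Flush call.

```go
package main

import "fmt"

// batcher is a hypothetical buffer that collects records until size is
// reached and then hands the batch to flush.
type batcher struct {
	size  int
	buf   []string
	flush func(batch []string) error
}

// Add buffers a record and flushes once the batch is full.
func (b *batcher) Add(rec string) error {
	b.buf = append(b.buf, rec)
	if len(b.buf) < b.size {
		return nil
	}
	return b.Flush()
}

// Flush writes whatever is buffered. A delay timer would call this for an
// incomplete batch once the maximum delay has elapsed.
func (b *batcher) Flush() error {
	if len(b.buf) == 0 {
		return nil
	}
	batch := b.buf
	b.buf = nil
	return b.flush(batch)
}

// demo adds five records with a batch size of two and then flushes the rest.
func demo() [][]string {
	var batches [][]string
	b := &batcher{size: 2, flush: func(batch []string) error {
		batches = append(batches, batch)
		return nil
	}}
	for _, rec := range []string{"a", "b", "c", "d", "e"} {
		_ = b.Add(rec)
	}
	_ = b.Flush() // stands in for the delay expiring
	return batches
}

func main() {
	fmt.Println(demo()) // [[a b] [c d] [e]]
}
```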
func (DestinationWithBatch) BatchDelayParameterName ¶ added in v0.5.0
func (d DestinationWithBatch) BatchDelayParameterName() string
func (DestinationWithBatch) BatchSizeParameterName ¶ added in v0.5.0
func (d DestinationWithBatch) BatchSizeParameterName() string
func (DestinationWithBatch) Wrap ¶ added in v0.3.0
func (d DestinationWithBatch) Wrap(impl Destination) Destination
Wrap a Destination into the batching middleware.
type DestinationWithRateLimit ¶ added in v0.3.0
type DestinationWithRateLimit struct {
	// DefaultRatePerSecond is the default value for the rate per second.
	DefaultRatePerSecond float64
	// DefaultBurst is the default value for the allowed burst count.
	DefaultBurst int
}
DestinationWithRateLimit adds support for rate limiting to the destination. It adds two parameters to the destination config:
- `sdk.rate.perSecond` - Maximum number of times the Write function can be called per second (0 means no rate limit).
- `sdk.rate.burst` - Allow bursts of at most X writes (0 means that bursts are not allowed).
To change the defaults of these parameters use the fields of this struct.
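The meaning of a rate per second combined with a burst can be sketched as a token bucket. This is an illustration under stated assumptions, not the SDK's implementation: limiter, newLimiter and epoch are hypothetical, a burst of 0 is treated as a bucket capacity of one, and the sketch rejects a call where a real middleware might instead wait.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// limiter is a hypothetical token bucket. Tokens refill at perSecond and are
// capped at capacity, which bounds the size of a burst.
type limiter struct {
	perSecond float64
	capacity  float64
	tokens    float64
	last      time.Time
}

// newLimiter starts with a full bucket. A burst below 1 is treated as a
// capacity of 1, so single writes are still possible (assumption).
func newLimiter(perSecond float64, burst int, now time.Time) *limiter {
	capacity := math.Max(1, float64(burst))
	return &limiter{perSecond: perSecond, capacity: capacity, tokens: capacity, last: now}
}

// Allow reports whether a write may happen at the given time.
func (l *limiter) Allow(now time.Time) bool {
	if l.perSecond <= 0 {
		return true // 0 means no rate limit
	}
	elapsed := now.Sub(l.last).Seconds()
	l.last = now
	l.tokens = math.Min(l.capacity, l.tokens+elapsed*l.perSecond)
	if l.tokens < 1 {
		return false
	}
	l.tokens--
	return true
}

// epoch is a fixed reference time so the example is deterministic.
var epoch = time.Unix(0, 0)

func main() {
	l := newLimiter(2, 2, epoch)
	fmt.Println(l.Allow(epoch), l.Allow(epoch), l.Allow(epoch)) // true true false
	fmt.Println(l.Allow(epoch.Add(500 * time.Millisecond)))     // true
}
```

Passing the clock in as an argument keeps the sketch testable without sleeping.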
func (DestinationWithRateLimit) RateBurstParameterName ¶ added in v0.5.0
func (d DestinationWithRateLimit) RateBurstParameterName() string
func (DestinationWithRateLimit) RatePerSecondParameterName ¶ added in v0.5.0
func (d DestinationWithRateLimit) RatePerSecondParameterName() string
func (DestinationWithRateLimit) Wrap ¶ added in v0.3.0
func (d DestinationWithRateLimit) Wrap(impl Destination) Destination
Wrap a Destination into the rate limiting middleware.
type DestinationWithRecordFormat ¶ added in v0.5.0
type DestinationWithRecordFormat struct {
	// DefaultRecordFormat is the default record format.
	DefaultRecordFormat string
	RecordFormatters []RecordFormatter
}
DestinationWithRecordFormat adds support for changing the output format of records, specifically of the Record.Bytes method. It adds two parameters to the destination config:
- `sdk.record.format` - The format of the output record. The inclusion validation exposes a list of valid options.
- `sdk.record.format.options` - Options are used to configure the format.
func (DestinationWithRecordFormat) DefaultRecordFormatters ¶ added in v0.5.0
func (d DestinationWithRecordFormat) DefaultRecordFormatters() []RecordFormatter
DefaultRecordFormatters returns the list of record formatters that are used if DestinationWithRecordFormat.RecordFormatters is nil.
func (DestinationWithRecordFormat) RecordFormatOptionsParameterName ¶ added in v0.5.0
func (d DestinationWithRecordFormat) RecordFormatOptionsParameterName() string
func (DestinationWithRecordFormat) RecordFormatParameterName ¶ added in v0.5.0
func (d DestinationWithRecordFormat) RecordFormatParameterName() string
func (DestinationWithRecordFormat) Wrap ¶ added in v0.5.0
func (d DestinationWithRecordFormat) Wrap(impl Destination) Destination
Wrap a Destination into the record format middleware.
type Encoder ¶ added in v0.5.0
type Encoder interface {
	Name() string
	Configure(options map[string]string) (Encoder, error)
	Encode(r any) ([]byte, error)
}
Encoder is a type that can encode an arbitrary struct into a byte slice. It's used in destination connectors to encode records into different formats (e.g. JSON, Avro etc.).
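A minimal sketch of a type satisfying this interface, using only the standard library. The Encoder interface is redeclared locally so the example is self-contained; jsonEncoder and its "indent" option are hypothetical and are not the SDK's JSONEncoder.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Encoder repeats the interface shape documented above.
type Encoder interface {
	Name() string
	Configure(options map[string]string) (Encoder, error)
	Encode(r any) ([]byte, error)
}

// jsonEncoder is a hypothetical encoder with one optional setting.
type jsonEncoder struct {
	indent string
}

func (e jsonEncoder) Name() string { return "json" }

// Configure returns a configured copy and rejects unknown options.
// The "indent" option is an assumption made for this sketch.
func (e jsonEncoder) Configure(options map[string]string) (Encoder, error) {
	for k, v := range options {
		switch k {
		case "indent":
			e.indent = v
		default:
			return nil, fmt.Errorf("unknown option %q", k)
		}
	}
	return e, nil
}

// Encode marshals any value to JSON, indented if configured.
func (e jsonEncoder) Encode(r any) ([]byte, error) {
	if e.indent == "" {
		return json.Marshal(r)
	}
	return json.MarshalIndent(r, "", e.indent)
}

func main() {
	enc, err := jsonEncoder{}.Configure(nil)
	if err != nil {
		panic(err)
	}
	out, err := enc.Encode(map[string]any{"id": 1})
	fmt.Println(string(out), err) // {"id":1} <nil>
}
```

Configure uses a value receiver and returns the modified copy, so the unconfigured encoder stays reusable.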
type GenerateDataType ¶ added in v0.3.0
type GenerateDataType int
GenerateDataType is used in acceptance tests to control what data type will be generated.
const (
	GenerateMixedData GenerateDataType = iota
	GenerateRawData
	GenerateStructuredData
)
type GenericRecordFormatter ¶ added in v0.5.0
GenericRecordFormatter is a formatter that uses a Converter and Encoder to format a record.
func (GenericRecordFormatter) Configure ¶ added in v0.5.0
func (rf GenericRecordFormatter) Configure(optRaw string) (RecordFormatter, error)
func (GenericRecordFormatter) Format ¶ added in v0.5.0
func (rf GenericRecordFormatter) Format(r Record) ([]byte, error)
Format converts and encodes a record into a byte slice.
func (GenericRecordFormatter) Name ¶ added in v0.5.0
func (rf GenericRecordFormatter) Name() string
Name returns the name of the record formatter combined from the converter name and encoder name.
type JSONEncoder ¶ added in v0.5.0
type JSONEncoder struct{}
JSONEncoder is an Encoder that outputs JSON.
func (JSONEncoder) Configure ¶ added in v0.5.0
func (e JSONEncoder) Configure(map[string]string) (Encoder, error)
func (JSONEncoder) Name ¶ added in v0.5.0
func (e JSONEncoder) Name() string
type Metadata ¶ added in v0.3.0
func (Metadata) GetCollection ¶ added in v0.9.0
GetCollection returns the value for key MetadataCollection. If the value does not exist or is empty the function returns ErrMetadataFieldNotFound.
func (Metadata) GetConduitDLQNackError ¶ added in v0.9.0
GetConduitDLQNackError returns the value for key MetadataConduitDLQNackError. If the value does not exist or is empty the function returns ErrMetadataFieldNotFound.
func (Metadata) GetConduitDLQNackNodeID ¶ added in v0.9.0
GetConduitDLQNackNodeID returns the value for key MetadataConduitDLQNackNodeID. If the value does not exist or is empty the function returns ErrMetadataFieldNotFound.
func (Metadata) GetConduitDestinationPluginName ¶ added in v0.4.0
GetConduitDestinationPluginName returns the value for key MetadataConduitDestinationPluginName. If the value does not exist or is empty the function returns ErrMetadataFieldNotFound.
func (Metadata) GetConduitDestinationPluginVersion ¶ added in v0.4.0
GetConduitDestinationPluginVersion returns the value for key MetadataConduitDestinationPluginVersion. If the value does not exist or is empty the function returns ErrMetadataFieldNotFound.
func (Metadata) GetConduitSourceConnectorID ¶ added in v0.9.0
GetConduitSourceConnectorID returns the value for key MetadataConduitSourceConnectorID. If the value does not exist or is empty the function returns ErrMetadataFieldNotFound.
func (Metadata) GetConduitSourcePluginName ¶ added in v0.4.0
GetConduitSourcePluginName returns the value for key MetadataConduitSourcePluginName. If the value does not exist or is empty the function returns ErrMetadataFieldNotFound.
func (Metadata) GetConduitSourcePluginVersion ¶ added in v0.4.0
GetConduitSourcePluginVersion returns the value for key MetadataConduitSourcePluginVersion. If the value does not exist or is empty the function returns ErrMetadataFieldNotFound.
func (Metadata) GetCreatedAt ¶ added in v0.3.0
GetCreatedAt parses the value for key MetadataCreatedAt as a unix timestamp. If the value does not exist or the value is empty the function returns ErrMetadataFieldNotFound. If the value is not a valid unix timestamp in nanoseconds the function returns an error.
func (Metadata) GetOpenCDCVersion ¶ added in v0.3.0
GetOpenCDCVersion returns the value for key MetadataOpenCDCVersion. If the value does not exist or is empty the function returns ErrMetadataFieldNotFound.
func (Metadata) GetReadAt ¶ added in v0.3.0
GetReadAt parses the value for key MetadataReadAt as a unix timestamp. If the value does not exist or the value is empty the function returns ErrMetadataFieldNotFound. If the value is not a valid unix timestamp in nanoseconds the function returns an error.
func (Metadata) SetCollection ¶ added in v0.9.0
SetCollection sets the metadata value for key MetadataCollection.
func (Metadata) SetConduitDLQNackError ¶ added in v0.9.0
SetConduitDLQNackError sets the metadata value for key MetadataConduitDLQNackError.
func (Metadata) SetConduitDLQNackNodeID ¶ added in v0.9.0
SetConduitDLQNackNodeID sets the metadata value for key MetadataConduitDLQNackNodeID.
func (Metadata) SetConduitDestinationPluginName ¶ added in v0.4.0
SetConduitDestinationPluginName sets the metadata value for key MetadataConduitDestinationPluginName.
func (Metadata) SetConduitDestinationPluginVersion ¶ added in v0.4.0
SetConduitDestinationPluginVersion sets the metadata value for key MetadataConduitDestinationPluginVersion.
func (Metadata) SetConduitSourceConnectorID ¶ added in v0.9.0
SetConduitSourceConnectorID sets the metadata value for key MetadataConduitSourceConnectorID.
func (Metadata) SetConduitSourcePluginName ¶ added in v0.4.0
SetConduitSourcePluginName sets the metadata value for key MetadataConduitSourcePluginName.
func (Metadata) SetConduitSourcePluginVersion ¶ added in v0.4.0
SetConduitSourcePluginVersion sets the metadata value for key MetadataConduitSourcePluginVersion.
func (Metadata) SetCreatedAt ¶ added in v0.3.0
SetCreatedAt sets the metadata value for key MetadataCreatedAt as a unix timestamp in nanoseconds.
func (Metadata) SetOpenCDCVersion ¶ added in v0.3.0
func (m Metadata) SetOpenCDCVersion()
SetOpenCDCVersion sets the metadata value for key MetadataOpenCDCVersion to the current version of OpenCDC used.
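The timestamp getters and setters store a unix timestamp in nanoseconds as a string. A self-contained sketch of that round trip follows. Metadata is redeclared locally as a string map, and keyCreatedAt, errFieldNotFound, the lowercase method names and sample are assumptions made for this sketch, not the SDK's identifiers.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
	"time"
)

// Metadata is a hypothetical stand-in for sdk.Metadata.
type Metadata map[string]string

// keyCreatedAt is an assumed key name used only in this sketch.
const keyCreatedAt = "opencdc.createdAt"

var errFieldNotFound = errors.New("metadata field not found")

// setCreatedAt stores t as a unix timestamp in nanoseconds.
func (m Metadata) setCreatedAt(t time.Time) {
	m[keyCreatedAt] = strconv.FormatInt(t.UnixNano(), 10)
}

// createdAt parses the stored value. A missing or empty value yields
// errFieldNotFound; a malformed value yields a parse error.
func (m Metadata) createdAt() (time.Time, error) {
	raw := m[keyCreatedAt]
	if raw == "" {
		return time.Time{}, fmt.Errorf("%q: %w", keyCreatedAt, errFieldNotFound)
	}
	nanos, err := strconv.ParseInt(raw, 10, 64)
	if err != nil {
		return time.Time{}, fmt.Errorf("%q is not a unix timestamp in nanoseconds: %w", raw, err)
	}
	return time.Unix(0, nanos), nil
}

// sample is a fixed instant used to keep the example deterministic.
var sample = time.Unix(1700000000, 123)

func main() {
	m := Metadata{}
	m.setCreatedAt(sample)
	got, err := m.createdAt()
	fmt.Println(got.Equal(sample), err) // true <nil>
}
```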
type OpenCDCConverter ¶ added in v0.5.0
type OpenCDCConverter struct{}
OpenCDCConverter outputs an OpenCDC record (it does not change the structure of the record).
func (OpenCDCConverter) Configure ¶ added in v0.5.0
func (c OpenCDCConverter) Configure(map[string]string) (Converter, error)
func (OpenCDCConverter) Convert ¶ added in v0.5.0
func (c OpenCDCConverter) Convert(r Record) (any, error)
func (OpenCDCConverter) Name ¶ added in v0.5.0
func (c OpenCDCConverter) Name() string
type Operation ¶ added in v0.3.0
type Operation int
Operation defines what triggered the creation of a record.
func (Operation) MarshalText ¶ added in v0.3.0
func (*Operation) UnmarshalText ¶ added in v0.3.0
type Parameter ¶
type Parameter struct {
	// Default is the default value of the parameter, if any.
	Default string
	// Required controls if the parameter will be shown as required or optional.
	// Deprecated: add ValidationRequired to Parameter.Validations instead.
	Required bool
	// Description holds a description of the field and how to configure it.
	Description string
	// Type defines the parameter data type.
	Type ParameterType
	// Validations is a slice of validations to be checked for the parameter.
	Validations []Validation
}
Parameter defines a single connector parameter.
type ParameterType ¶ added in v0.4.0
type ParameterType config.ParameterType
const (
	ParameterTypeString ParameterType = iota + 1
	ParameterTypeInt
	ParameterTypeFloat
	ParameterTypeBool
	ParameterTypeFile
	ParameterTypeDuration
)
type Position ¶
type Position []byte
Position is a unique identifier for a record. It is the responsibility of the Source to choose and assign record positions. The Source can freely choose a format that makes sense and contains everything needed to restart a pipeline at a certain position.
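Since the format of a position is up to the source, one common choice is to serialize a small struct. The sketch below assumes a file-based source; filePosition, toPosition and parsePosition are hypothetical names, and Position is redeclared locally to keep the example self-contained.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Position repeats the type documented above.
type Position []byte

// filePosition is a hypothetical position format for a file-based source: it
// holds everything needed to resume reading.
type filePosition struct {
	File   string `json:"file"`
	Offset int64  `json:"offset"`
}

// toPosition serializes the struct into the opaque bytes stored on a record.
func (p filePosition) toPosition() (Position, error) {
	b, err := json.Marshal(p)
	if err != nil {
		return nil, err
	}
	return Position(b), nil
}

// parsePosition restores the struct. An empty position means there is
// nothing to resume from, so the zero value is returned.
func parsePosition(pos Position) (filePosition, error) {
	var p filePosition
	if len(pos) == 0 {
		return p, nil
	}
	if err := json.Unmarshal(pos, &p); err != nil {
		return p, fmt.Errorf("invalid position: %w", err)
	}
	return p, nil
}

func main() {
	pos, _ := filePosition{File: "a.log", Offset: 42}.toPosition()
	fmt.Println(string(pos)) // {"file":"a.log","offset":42}
}
```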
type Record ¶
type Record struct {
	// Position uniquely represents the record.
	Position Position `json:"position"`
	// Operation defines what triggered the creation of a record. There are four
	// possibilities: create, update, delete or snapshot. The first three
	// operations are encountered during normal CDC operation, while "snapshot"
	// is meant to represent records during an initial load. Depending on the
	// operation, the record will contain either the payload before the change,
	// after the change, or both (see field Payload).
	Operation Operation `json:"operation"`
	// Metadata contains additional information regarding the record.
	Metadata Metadata `json:"metadata"`
	// Key represents a value that should identify the entity (e.g. database
	// row).
	Key Data `json:"key"`
	// Payload holds the payload change (data before and after the operation
	// occurred).
	Payload Change `json:"payload"`
	// contains filtered or unexported fields
}
Record represents a single data record produced by a source and/or consumed by a destination connector.
type RecordFormatter ¶ added in v0.5.0
type RecordFormatter interface {
	Name() string
	Configure(string) (RecordFormatter, error)
	Format(Record) ([]byte, error)
}
RecordFormatter is a type that can format a record to bytes. It's used in destination connectors to change the output structure and format.
type Source ¶
type Source interface {
	// Parameters is a map of named Parameters that describe how to configure
	// the Source.
	Parameters() map[string]Parameter

	// Configure is the first function to be called in a connector. It provides the
	// connector with the configuration that needs to be validated and stored.
	// In case the configuration is not valid it should return an error.
	// Testing if your connector can reach the configured data source should be
	// done in Open, not in Configure.
	// The connector SDK will sanitize, apply defaults and validate the
	// configuration before calling this function. This means that the
	// configuration will always contain all keys defined in Parameters
	// (unprovided keys will have their default values) and all non-empty
	// values will be of the correct type.
	Configure(context.Context, map[string]string) error

	// Open is called after Configure to signal the plugin it can prepare to
	// start producing records. If needed, the plugin should open connections in
	// this function. The position parameter will contain the position of the
	// last record that was successfully processed, Source should therefore
	// start producing records after this position. The context passed to Open
	// will be cancelled once the plugin receives a stop signal from Conduit.
	Open(context.Context, Position) error

	// Read returns a new Record and is supposed to block until there is either
	// a new record or the context gets cancelled. It can also return the error
	// ErrBackoffRetry to signal to the SDK it should call Read again with a
	// backoff retry.
	// If Read receives a cancelled context or the context is cancelled while
	// Read is running it must stop retrieving new records from the source
	// system and start returning records that have already been buffered. If
	// there are no buffered records left Read must return the context error to
	// signal a graceful stop. If Read returns ErrBackoffRetry while the context
	// is cancelled it will also signal that there are no records left and Read
	// won't be called again.
	// After Read returns an error the function won't be called again (except if
	// the error is ErrBackoffRetry, as mentioned above).
	// Read can be called concurrently with Ack.
	Read(context.Context) (Record, error)

	// Ack signals to the implementation that the record with the supplied
	// position was successfully processed. This method might be called after
	// the context of Read is already cancelled, since there might be
	// outstanding acks that need to be delivered. When Teardown is called it is
	// guaranteed there won't be any more calls to Ack.
	// Ack can be called concurrently with Read.
	Ack(context.Context, Position) error

	// Teardown signals to the plugin that there will be no more calls to any
	// other function. After Teardown returns, the plugin should be ready for a
	// graceful shutdown.
	Teardown(context.Context) error

	// LifecycleOnCreated is called after Configure and before Open when the
	// connector is run for the first time. This call will be skipped if the
	// connector was already started before. This method can be used to do some
	// initialization that needs to happen only once in the lifetime of a
	// connector (e.g. create a logical replication slot). Anything that the
	// connector creates in this method is considered to be owned by this
	// connector and should be cleaned up in LifecycleOnDeleted.
	LifecycleOnCreated(ctx context.Context, config map[string]string) error

	// LifecycleOnUpdated is called after Configure and before Open when the
	// connector configuration has changed since the last run. This call will be
	// skipped if the connector configuration did not change. It can be used to
	// update anything that was initialized in LifecycleOnCreated, in case the
	// configuration change affects it.
	LifecycleOnUpdated(ctx context.Context, configBefore, configAfter map[string]string) error

	// LifecycleOnDeleted is called when the connector was deleted. It will be
	// the only method that is called in that case. This method can be used to
	// clean up anything that was initialized in LifecycleOnCreated.
	LifecycleOnDeleted(ctx context.Context, config map[string]string) error

	// contains filtered or unexported methods
}
Source fetches records from third-party resources and sends them to Conduit. All implementations must embed UnimplementedSource for forward compatibility.
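The shutdown behaviour required of Read (keep returning buffered records after the context is cancelled, then return the context error) can be sketched with a buffered channel. This is a reduced illustration: source and demo are hypothetical, records are plain strings, and the goroutine that would fill the buffer from the source system is left out.

```go
package main

import (
	"context"
	"fmt"
)

// source is a hypothetical source whose records arrive on a buffered channel.
type source struct {
	buffer chan string
}

// Read blocks until a record is available or the context is cancelled. After
// cancellation it still drains buffered records and only then returns the
// context error to signal a graceful stop.
func (s *source) Read(ctx context.Context) (string, error) {
	select {
	case rec := <-s.buffer:
		return rec, nil
	case <-ctx.Done():
		// select picks randomly when both cases are ready, so check the
		// buffer once more before giving up.
		select {
		case rec := <-s.buffer:
			return rec, nil
		default:
			return "", ctx.Err()
		}
	}
}

// demo buffers two records, cancels the context and reads until Read fails.
func demo() ([]string, error) {
	s := &source{buffer: make(chan string, 2)}
	s.buffer <- "a"
	s.buffer <- "b"

	ctx, cancel := context.WithCancel(context.Background())
	cancel()

	var got []string
	for {
		rec, err := s.Read(ctx)
		if err != nil {
			return got, err
		}
		got = append(got, rec)
	}
}

func main() {
	fmt.Println(demo()) // [a b] context canceled
}
```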
func SourceWithMiddleware ¶ added in v0.3.0
func SourceWithMiddleware(d Source, middleware ...SourceMiddleware) Source
SourceWithMiddleware wraps the source into the supplied middleware.
type SourceMiddleware ¶ added in v0.3.0
SourceMiddleware wraps a Source and adds functionality to it.
func DefaultSourceMiddleware ¶ added in v0.3.0
func DefaultSourceMiddleware() []SourceMiddleware
DefaultSourceMiddleware returns a slice of middleware that should be added to all sources unless there's a good reason not to.
type SourceUtil ¶ added in v0.3.0
type SourceUtil struct{}
SourceUtil provides utility methods for implementing a source. Use it by calling Util.Source.*.
func (SourceUtil) NewRecordCreate ¶ added in v0.3.0
func (SourceUtil) NewRecordCreate( position Position, metadata Metadata, key Data, payload Data, ) Record
NewRecordCreate can be used to instantiate a record with OperationCreate.
func (SourceUtil) NewRecordDelete ¶ added in v0.3.0
func (SourceUtil) NewRecordDelete( position Position, metadata Metadata, key Data, ) Record
NewRecordDelete can be used to instantiate a record with OperationDelete.
func (SourceUtil) NewRecordSnapshot ¶ added in v0.3.0
func (SourceUtil) NewRecordSnapshot( position Position, metadata Metadata, key Data, payload Data, ) Record
NewRecordSnapshot can be used to instantiate a record with OperationSnapshot.
func (SourceUtil) NewRecordUpdate ¶ added in v0.3.0
func (SourceUtil) NewRecordUpdate( position Position, metadata Metadata, key Data, payloadBefore Data, payloadAfter Data, ) Record
NewRecordUpdate can be used to instantiate a record with OperationUpdate.
type Specification ¶
type Specification struct {
	// Name is the name of the plugin.
	Name string
	// Summary is a brief description of the plugin and what it does. Try not to
	// exceed 200 characters.
	Summary string
	// Description is a longer-form area appropriate for README-like text
	// that the author can provide for explaining the behavior of the connector
	// or specific parameters.
	Description string
	// Version string. Should be prepended with `v` like Go, e.g. `v1.54.3`.
	Version string
	// Author declares the entity that created or maintains this plugin.
	Author string
}
Specification contains general information regarding the plugin like its name and what it does.
type StructuredData ¶
type StructuredData map[string]interface{}
StructuredData contains data in form of a map with string keys and arbitrary values.
func (StructuredData) Bytes ¶
func (d StructuredData) Bytes() []byte
Bytes returns the JSON encoding of the map.
type TemplateRecordFormatter ¶ added in v0.5.0
type TemplateRecordFormatter struct {
// contains filtered or unexported fields
}
TemplateRecordFormatter is a RecordFormatter that formats a record using a Go template.
func (TemplateRecordFormatter) Configure ¶ added in v0.5.0
func (e TemplateRecordFormatter) Configure(tmpl string) (RecordFormatter, error)
func (TemplateRecordFormatter) Format ¶ added in v0.5.0
func (e TemplateRecordFormatter) Format(r Record) ([]byte, error)
func (TemplateRecordFormatter) Name ¶ added in v0.5.0
func (e TemplateRecordFormatter) Name() string
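Formatting a record with a Go template can be sketched with the standard library's text/template package. Which template package and which data the SDK exposes to the template are not stated here, so Record, its fields and formatWithTemplate are assumptions made for this sketch.

```go
package main

import (
	"bytes"
	"fmt"
	"text/template"
)

// Record is a hypothetical, reduced stand-in for sdk.Record.
type Record struct {
	Key     string
	Payload string
}

// formatWithTemplate parses tmplText and executes it against the record.
func formatWithTemplate(tmplText string, r Record) ([]byte, error) {
	tmpl, err := template.New("record").Parse(tmplText)
	if err != nil {
		return nil, fmt.Errorf("invalid template: %w", err)
	}
	var buf bytes.Buffer
	if err := tmpl.Execute(&buf, r); err != nil {
		return nil, fmt.Errorf("failed to execute template: %w", err)
	}
	return buf.Bytes(), nil
}

func main() {
	out, err := formatWithTemplate("{{.Key}}={{.Payload}}", Record{Key: "id-1", Payload: "hello"})
	fmt.Println(string(out), err) // id-1=hello <nil>
}
```

A formatter used on the hot path would parse the template once in Configure and reuse it in Format, rather than parsing on every call as this sketch does.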
type UnimplementedDestination ¶
type UnimplementedDestination struct{}
UnimplementedDestination should be embedded to have forward compatible implementations.
func (UnimplementedDestination) Configure ¶
Configure needs to be overridden in the actual implementation.
func (UnimplementedDestination) LifecycleOnCreated ¶ added in v0.6.0
LifecycleOnCreated won't do anything by default.
func (UnimplementedDestination) LifecycleOnDeleted ¶ added in v0.6.0
LifecycleOnDeleted won't do anything by default.
func (UnimplementedDestination) LifecycleOnUpdated ¶ added in v0.6.0
func (UnimplementedDestination) LifecycleOnUpdated(context.Context, map[string]string, map[string]string) error
LifecycleOnUpdated won't do anything by default.
func (UnimplementedDestination) Open ¶
func (UnimplementedDestination) Open(context.Context) error
Open needs to be overridden in the actual implementation.
func (UnimplementedDestination) Parameters ¶ added in v0.3.0
func (UnimplementedDestination) Parameters() map[string]Parameter
Parameters needs to be overridden in the actual implementation.
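Why embedding gives forward compatibility can be shown with a reduced, self-contained sketch: methods added to the interface later get a default from the embedded struct, so existing connectors keep compiling. The interface below is a cut-down stand-in, and mustEmbedUnimplementedDestination and errNotImplemented are hypothetical names (the docs only say the interface contains unexported methods).

```go
package main

import (
	"errors"
	"fmt"
)

var errNotImplemented = errors.New("not implemented")

// Destination is a reduced stand-in for sdk.Destination. The unexported
// method can only be satisfied by embedding UnimplementedDestination.
type Destination interface {
	Write(records []string) (int, error)
	LifecycleOnCreated() error
	mustEmbedUnimplementedDestination()
}

// UnimplementedDestination supplies defaults for every method.
type UnimplementedDestination struct{}

func (UnimplementedDestination) Write([]string) (int, error) { return 0, errNotImplemented }
func (UnimplementedDestination) LifecycleOnCreated() error   { return nil }
func (UnimplementedDestination) mustEmbedUnimplementedDestination() {}

// myDestination overrides only Write. LifecycleOnCreated, and any method the
// interface gains later, falls back to the embedded default.
type myDestination struct {
	UnimplementedDestination
	count int
}

func (d *myDestination) Write(records []string) (int, error) {
	d.count += len(records)
	return len(records), nil
}

// Compile-time check that the embedding satisfies the interface.
var _ Destination = (*myDestination)(nil)

func main() {
	var d Destination = &myDestination{}
	n, err := d.Write([]string{"a", "b"})
	fmt.Println(n, err, d.LifecycleOnCreated()) // 2 <nil> <nil>
}
```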
type UnimplementedSource ¶
type UnimplementedSource struct{}
UnimplementedSource should be embedded to have forward compatible implementations.
func (UnimplementedSource) Ack ¶
func (UnimplementedSource) Ack(context.Context, Position) error
Ack should be overridden if acks need to be forwarded to the source, otherwise it is optional.
func (UnimplementedSource) Configure ¶
Configure needs to be overridden in the actual implementation.
func (UnimplementedSource) LifecycleOnCreated ¶ added in v0.6.0
LifecycleOnCreated won't do anything by default.
func (UnimplementedSource) LifecycleOnDeleted ¶ added in v0.6.0
LifecycleOnDeleted won't do anything by default.
func (UnimplementedSource) LifecycleOnUpdated ¶ added in v0.6.0
func (UnimplementedSource) LifecycleOnUpdated(context.Context, map[string]string, map[string]string) error
LifecycleOnUpdated won't do anything by default.
func (UnimplementedSource) Open ¶
func (UnimplementedSource) Open(context.Context, Position) error
Open needs to be overridden in the actual implementation.
func (UnimplementedSource) Parameters ¶ added in v0.3.0
func (UnimplementedSource) Parameters() map[string]Parameter
Parameters needs to be overridden in the actual implementation.
type Validation ¶ added in v0.4.0
type Validation interface {
// contains filtered or unexported methods
}
type ValidationExclusion ¶ added in v0.4.0
type ValidationExclusion struct {
List []string
}
type ValidationGreaterThan ¶ added in v0.4.0
type ValidationGreaterThan struct {
Value float64
}
type ValidationInclusion ¶ added in v0.4.0
type ValidationInclusion struct {
List []string
}
type ValidationLessThan ¶ added in v0.4.0
type ValidationLessThan struct {
Value float64
}
type ValidationRegex ¶ added in v0.4.0
type ValidationRequired ¶ added in v0.4.0
type ValidationRequired struct{}
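How such validations might be applied to a raw string value can be sketched as follows. The Validation interface's methods are unexported and not shown in these docs, so the validate method, the lowercase type names, validateAll, and the choice to skip empty values in non-required checks are all assumptions of this sketch.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
)

// validation is a hypothetical stand-in for the Validation interface.
type validation interface {
	validate(value string) error
}

// required mirrors ValidationRequired.
type required struct{}

func (required) validate(v string) error {
	if v == "" {
		return errors.New("value is required")
	}
	return nil
}

// greaterThan mirrors ValidationGreaterThan.
type greaterThan struct{ Value float64 }

func (g greaterThan) validate(v string) error {
	if v == "" {
		return nil // presence is the job of required (assumption)
	}
	f, err := strconv.ParseFloat(v, 64)
	if err != nil {
		return fmt.Errorf("%q is not a number: %w", v, err)
	}
	if f <= g.Value {
		return fmt.Errorf("%v must be greater than %v", f, g.Value)
	}
	return nil
}

// inclusion mirrors ValidationInclusion.
type inclusion struct{ List []string }

func (in inclusion) validate(v string) error {
	if v == "" {
		return nil
	}
	for _, allowed := range in.List {
		if v == allowed {
			return nil
		}
	}
	return fmt.Errorf("%q must be one of %v", v, in.List)
}

// validateAll runs every validation and joins the failures into one error.
func validateAll(value string, validations ...validation) error {
	var errs []error
	for _, v := range validations {
		if err := v.validate(value); err != nil {
			errs = append(errs, err)
		}
	}
	return errors.Join(errs...)
}

func main() {
	fmt.Println(validateAll("5", required{}, greaterThan{Value: 0})) // <nil>
	fmt.Println(validateAll("avro", inclusion{List: []string{"json"}}))
}
```

errors.Join requires Go 1.20 or newer and returns nil when no validation failed.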
Directories ¶
Path | Synopsis |
---|---|
cmd
|
|
Package kafkaconnect contains utility functions and structures for processing Kafka Connect compatible data. |