mq_protocol_tests

command
v0.0.0-...-c428d36 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 27, 2024 License: Apache-2.0 Imports: 9 Imported by: 0

README

Integration Framework

Introduction

The Integration Framework is designed to provide a flexible way for contributors to write integration tests for new sinks or MQ protocols. The core of the framework is stored in {ticdc_root}/tests/mq_protocol_tests/framework, and test cases should be stored in {ticdc_root}/tests/mq_protocol_tests/cases. Currently, although the Framework is still under active development, it is capable of helping test Avro support and it is the only officially supported way for developers to run integration tests with Kafka connect.

Quick Start

To create a test case, you need to:

  • create a struct that implements the Task interface,
  • and ask the Environment to run the task in the main function in integration.go. Note that the second step will be automated soon.
// Task represents a single test case
type Task interface {
	Name() string
	GetCDCProfile() *CDCProfile
	Prepare(taskContext *TaskContext) error
	Run(taskContext *TaskContext) error
}

For the time being, if you would like to write a test case for Avro and Canal, it is recommended to write a base case which define the common operations of test, and write construct function, pass canal.SingleTableTask or canal.SingleTableTask as parameters, which execute the necessary setup steps, including creating the Kafka Connect sink and creating the changefeed with appropriate configurations.

Example:

// cases/base_mycase.go
type MyCase struct {
	framework.Task
}

func NewMyCase(task framework) *MyCase{
	return &MyCase{
        Task: task,  
    }   
}

func (c *MyCase) Name() string {
	return "My Case"
}

func (c *MyCase) Run(ctx *framework.TaskContext) error {
	_, err := ctx.Upstream.ExecContext(ctx.Ctx, "create table test (id int primary key, value int)")
	if err != nil {
		return err
	}

	// Get a handle of an existing table
	table := ctx.SQLHelper().GetTable("test")
	// Create an SQL request, send it to the upstream, wait for completion and check the correctness of replication
	err = table.Insert(map[string]interface{}{
		"id":    0,
		"value": 0,
	}).Send().Wait().Check()
	if err != nil {
		return errors.AddStack(err)
	}

	// To wait on a batch of SQL requests, create a slice of Awaitables
	reqs := make([]framework.Awaitable, 0)
	for i := 1; i < 1000; i++ {
		// Only send, do not wait
		req := table.Insert(map[string]interface{}{
			"id":    i,
			"value": i,
		}).Send()
		reqs = append(reqs, req)
	}

	// Wait on SQL requests in batch and check the correctness
	return framework.All(ctx.SQLHelper(), reqs).Wait().Check()
}


// main.go
func main() {
    task := &canal.SingleTableTask{TableName: "test"}
    testCases := []framework.Task{
        tests.NewMyCase(task),
    }
    task := &avro.SingleTableTask{TableName: "test"}
    testCases := []framework.Task{
        tests.NewMyCase(task),
    }
    //run
}

Documentation

The Go Gopher

There is no documentation for this package.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL