tidb_cdc

package module
v0.0.0-...-b7a0e97 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 23, 2022 License: MIT Imports: 15 Imported by: 0

README

tidb-cdc

tidb-cdc maxwell 解析

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func GetColumnNameFromTag

func GetColumnNameFromTag(tag string) string

GetColumnNameFromTag ...

func GetModelFields

func GetModelFields(v reflect.Value) []reflect.StructField

func InitConnectorWithGroupMap

func InitConnectorWithGroupMap(config *Configuration, groupMaps ...map[string]RegistrationHelper)

InitConnectorWithGroupMap ...

func IsInnerStruct

func IsInnerStruct(structField reflect.StructField) bool

func MaxwellUnmarshal

func MaxwellUnmarshal(kMap map[string]interface{}, r interface{}) error

func StartConnector

func StartConnector()

StartConnector ...

Types

type CDCSchema

// CDCSchema mirrors the JSON envelope of a single change event, with the keys
// "database", "table", "type", "ts", "data" and "old".
//
// NOTE(review): Data and Old presumably hold the current row values and the
// previous values of changed columns respectively (Maxwell-style output) —
// confirm against the producer.
type CDCSchema struct {
	Database string                 `json:"database"` // database name
	Table    string                 `json:"table"`    // table name
	Type     CDCType                `json:"type"`     // delete, insert, update
	Ts       int                    `json:"ts"`       // event time (unit not visible here — TODO confirm)
	Data     map[string]interface{} `json:"data"`
	Old      map[string]interface{} `json:"old"`
}

type CDCType

// CDCType names the kind of row change carried by a change event.
type CDCType string

// Row-change kinds. Each value is the literal string listed for the "type"
// field of CDCSchema.
const (
	DELETE = CDCType("delete")
	INSERT = CDCType("insert")
	UPDATE = CDCType("update")
)

type Configuration

// Configuration holds the settings used to build a Connector.
type Configuration struct {
	// Basic Kafka connection information.
	KafkaConfig KafkaConfiguration
	// Corresponds to TopicPrefix in the first version. Because one connector
	// must be able to monitor several databases, this became the connector
	// server name, and the database name is passed in as a parameter instead.
	ServerName string
	// default: 1
	NumberOfConsumers int
	// default: 2.1.1
	KafkaVersion string
	// default: range
	Assignor string
	// only read latest message in topics
	// NOTE(review): the field name suggests the opposite (start from the
	// oldest offset when true) — confirm which behavior is implemented.
	Oldest  bool
	Verbose bool
}

Configuration is the config used specifically to build a connector.

type Connector

type Connector struct {
	// contains filtered or unexported fields
}

Connector ...

func NewConnector

func NewConnector(config *Configuration) *Connector

NewConnector ...

func (*Connector) AddConsumerGroup

func (c *Connector) AddConsumerGroup(groupName string, cg *ConsumerGroup)

AddConsumerGroup ...

func (*Connector) Register

func (c *Connector) Register(groupName string, dbName string, tableName string, model interface{}, handlers []ConsumerHandler)

Register ...

func (*Connector) RegisterByGroupMap

func (c *Connector) RegisterByGroupMap(groupMap map[string]RegistrationHelper)

RegisterByGroupMap key: GroupName; value: RegistrationHelper

func (*Connector) Start

func (c *Connector) Start()

Start ...

type Consumer

type Consumer struct {
	Typ              reflect.Type
	ConsumerHandlers []ConsumerHandler
	// contains filtered or unexported fields
}

Consumer ...

func (*Consumer) Cleanup

func (c *Consumer) Cleanup(session sarama.ConsumerGroupSession) error

Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited

func (*Consumer) ConsumeClaim

func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

TODO: revise this method. ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().

func (*Consumer) Setup

func (c *Consumer) Setup(session sarama.ConsumerGroupSession) error

Setup is run at the beginning of a new session, before ConsumeClaim

type ConsumerGroup

// ConsumerGroup carries the settings for one consumer group: the model type,
// the brokers and topic, the handlers, and the consumer options.
type ConsumerGroup struct {
	// Type information used for deserialization.
	Typ     reflect.Type
	Brokers []string
	Topic   string
	// Handlers that actually consume the messages.
	ConsumerHandlers []ConsumerHandler
	NumOfConsumers   int

	Version  string
	Group    string
	Assignor string
	Oldest   bool
	Verbose  bool
}

ConsumerGroup ...

func NewConsumerGroup

func NewConsumerGroup(
	typ reflect.Type,
	brokers []string,
	topic string,
	numOfConsumers int,
	kafkaVersion string,
	group string,
	assignor string,
	oldest bool,
	verbose bool,
) ConsumerGroup

NewConsumerGroup ...

func (*ConsumerGroup) AddHandler

func (c *ConsumerGroup) AddHandler(handler ConsumerHandler)

AddHandler ...

func (*ConsumerGroup) Start

func (c *ConsumerGroup) Start()

Start ...

type ConsumerHandler

// ConsumerHandler is the callback set a registered handler must implement.
// Each method returns an error.
//
// NOTE(review): presumably Create, Update and Delete are invoked for insert,
// update and delete events respectively, with before/after being decoded
// model values — confirm against the consumer implementation.
type ConsumerHandler interface {
	Create(after interface{}) error
	Update(before interface{}, after interface{}) error
	Delete(before interface{}) error
}

ConsumerHandler ...

type HandlerFunc

// HandlerFunc is a function type that receives a before value and an after
// value and returns an error.
type HandlerFunc func(before, after interface{}) error

HandlerFunc ...

type KafkaConfiguration

// KafkaConfiguration groups the Kafka connection settings embedded in
// Configuration as KafkaConfig.
//
// NOTE(review): User and Password are presumably the SASL credentials applied
// when EnableSASL is true — confirm against the connector setup code.
type KafkaConfiguration struct {
	Brokers    []string
	User       string
	Password   string
	EnableSASL bool
}

KafkaConfiguration ...

type Message

// Message is a JSON envelope with the keys "after", "before" and "op".
// After and Before are kept as raw JSON so they can be decoded later.
//
// NOTE(review): the set of valid Op values is not visible here — confirm
// against the producer format.
type Message struct {
	After  *json.RawMessage `json:"after"`
	Before *json.RawMessage `json:"before"`
	Op     string           `json:"op"`
}

Message ...

type RegistrationHelper

// RegistrationHelper bundles the per-table arguments that Connector.Register
// takes besides the group name: database name, table name, model and
// handlers. It is the value type of the map passed to RegisterByGroupMap,
// whose keys are group names.
type RegistrationHelper struct {
	DBName    string
	TableName string
	Model     interface{}
	Handlers  []ConsumerHandler
}

RegistrationHelper ...

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL