Documentation ¶
Index ¶
- func GetColumnNameFromTag(tag string) string
- func GetModelFields(v reflect.Value) []reflect.StructField
- func InitConnectorWithGroupMap(config *Configuration, groupMaps ...map[string]RegistrationHelper)
- func IsInnerStruct(structField reflect.StructField) bool
- func MaxwellUnmarshal(kMap map[string]interface{}, r interface{}) error
- func StartConnector()
- type CDCSchema
- type CDCType
- type Configuration
- type Connector
- type Consumer
- type ConsumerGroup
- type ConsumerHandler
- type HandlerFunc
- type KafkaConfiguration
- type Message
- type RegistrationHelper
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func GetModelFields ¶
func GetModelFields(v reflect.Value) []reflect.StructField
func InitConnectorWithGroupMap ¶
func InitConnectorWithGroupMap(config *Configuration, groupMaps ...map[string]RegistrationHelper)
InitConnectorWithGroupMap ...
func IsInnerStruct ¶
func IsInnerStruct(structField reflect.StructField) bool
func MaxwellUnmarshal ¶
func MaxwellUnmarshal(kMap map[string]interface{}, r interface{}) error
Types ¶
type Configuration ¶
type Configuration struct {
	// 基础Kafka信息
	KafkaConfig KafkaConfiguration
	// 对应初版中的 TopicPrefix,由于需要支持一个 connector 监控多个库,
	// 故改为 connector server name,database name 当作参数传入
	ServerName string
	// default: 1
	NumberOfConsumers int
	// default: 2.1.1
	KafkaVersion string
	// default: range
	Assignor string
	// only read latest message in topics
	Oldest  bool
	Verbose bool
}
Configuration 专用于构建 connector 的 config
type Connector ¶
type Connector struct {
// contains filtered or unexported fields
}
Connector ...
func (*Connector) AddConsumerGroup ¶
func (c *Connector) AddConsumerGroup(groupName string, cg *ConsumerGroup)
AddConsumerGroup ...
func (*Connector) Register ¶
func (c *Connector) Register(groupName string, dbName string, tableName string, model interface{}, handlers []ConsumerHandler)
Register ...
func (*Connector) RegisterByGroupMap ¶
func (c *Connector) RegisterByGroupMap(groupMap map[string]RegistrationHelper)
RegisterByGroupMap takes a map whose keys are group names (GroupName) and whose values are the corresponding RegistrationHelper entries.
type Consumer ¶
type Consumer struct {
	Typ              reflect.Type
	ConsumerHandlers []ConsumerHandler
	// contains filtered or unexported fields
}
Consumer ...
func (*Consumer) Cleanup ¶
func (c *Consumer) Cleanup(session sarama.ConsumerGroupSession) error
Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (*Consumer) ConsumeClaim ¶
func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
TODO: 改 (to be revised). ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
type ConsumerGroup ¶
type ConsumerGroup struct {
	// 反序列化类型信息
	Typ     reflect.Type
	Brokers []string
	Topic   string
	// 实际消费消息方法
	ConsumerHandlers []ConsumerHandler
	NumOfConsumers   int
	Version          string
	Group            string
	Assignor         string
	Oldest           bool
	Verbose          bool
}
ConsumerGroup ...
func NewConsumerGroup ¶
func NewConsumerGroup( typ reflect.Type, brokers []string, topic string, numOfConsumers int, kafkaVersion string, group string, assignor string, oldest bool, verbose bool, ) ConsumerGroup
NewConsumerGroup ...
func (*ConsumerGroup) AddHandler ¶
func (c *ConsumerGroup) AddHandler(handler ConsumerHandler)
AddHandler ...
type ConsumerHandler ¶
type ConsumerHandler interface {
	Create(after interface{}) error
	Update(before interface{}, after interface{}) error
	Delete(before interface{}) error
}
ConsumerHandler ...
type KafkaConfiguration ¶
KafkaConfiguration ...
type Message ¶
type Message struct {
	After  *json.RawMessage `json:"after"`
	Before *json.RawMessage `json:"before"`
	Op     string           `json:"op"`
}
Message ...
type RegistrationHelper ¶
type RegistrationHelper struct {
	DBName    string
	TableName string
	Model     interface{}
	Handlers  []ConsumerHandler
}
RegistrationHelper ...
Click to show internal directories.
Click to hide internal directories.