endpoints

package
v0.0.0-...-3491b9c Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 28, 2022 License: Apache-2.0 Imports: 8 Imported by: 0

Documentation

Index

Constants

View Source
const (
	EndpointTypeKafka = 1
	EndpointTypeES    = 2
)

Variables

This section is empty.

Functions

This section is empty.

Types

type BulkItem

type BulkItem esutil.BulkIndexerItem

type Config

type Config struct {
	EndpointType  int
	Kafka         KafkaConfig
	Elasticsearch EsConfig
}

type ElasticsearchConfig

type ElasticsearchConfig elasticsearch.Config

type EsBulkIdxCfg

type EsBulkIdxCfg esutil.BulkIndexerConfig

type EsConfig

type EsConfig struct {
	BulkIndexerCfg   EsBulkIdxCfg
	ElasticsearchCfg ElasticsearchConfig

	OnSuccess func(context.Context, esutil.BulkIndexerItem, esutil.BulkIndexerResponseItem)
	OnFailure func(context.Context, esutil.BulkIndexerItem, esutil.BulkIndexerResponseItem, error)
	Hook      func(data *handler.BinlogEvent) (item BulkItem, err error)
}

type EsEndpoint

type EsEndpoint struct {
	Cli *elasticsearch.Client

	OnSuccess func(context.Context, esutil.BulkIndexerItem, esutil.BulkIndexerResponseItem)
	OnFailure func(context.Context, esutil.BulkIndexerItem, esutil.BulkIndexerResponseItem, error)

	Hook func(data *handler.BinlogEvent) (item BulkItem, err error)
	// contains filtered or unexported fields
}

func NewEsEndpoint

func NewEsEndpoint(cfg EsConfig) (*EsEndpoint, error)

func (*EsEndpoint) Add

func (e *EsEndpoint) Add(item BulkItem) error

func (*EsEndpoint) Close

func (e *EsEndpoint) Close() (err error)

func (*EsEndpoint) Send

func (e *EsEndpoint) Send(data *handler.BinlogEvent) error

type IEndpoint

type IEndpoint interface {
	Send(data *handler.BinlogEvent) error
	Close() error
}

func NewEndPoint

func NewEndPoint(cfg Config) (IEndpoint, error)

type KafkaCfg

type KafkaCfg sarama.Config

type KafkaConfig

type KafkaConfig struct {
	Addrs          []string
	Cfg            KafkaCfg
	OnSuccess      func(msg *sarama.ProducerMessage)
	OnFailure      func(err *sarama.ProducerError)
	TopicConfigMap map[string]TopicCfg `json:"topic_config_map"`
}

type KafkaEndPoint

type KafkaEndPoint struct {
	Addrs []string
	Cfg   sarama.Config

	TopicCfgMap map[string]TopicCfg
	// contains filtered or unexported fields
}
var DefaultEndpoint *KafkaEndPoint

func NewKafkaEndpoint

func NewKafkaEndpoint(kafkaCfg KafkaConfig) (*KafkaEndPoint, error)

func (*KafkaEndPoint) Close

func (p *KafkaEndPoint) Close() (err error)

func (*KafkaEndPoint) Init

func (p *KafkaEndPoint) Init() error

func (*KafkaEndPoint) Ping

func (p *KafkaEndPoint) Ping() error

func (*KafkaEndPoint) Send

func (p *KafkaEndPoint) Send(data *handler.BinlogEvent) error

type TopicCfg

type TopicCfg struct {
	Topic string `json:"topic"`
	Shard int    `json:"shard_count"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL