readers

package
v0.0.0-...-48e7155 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 1, 2024 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func GetReaderConfigConstructor

func GetReaderConfigConstructor(name string) func() interface{}

GetReaderConfigConstructor 获取读取器配置构造函数

func NewHttpReaderConfigFunc

func NewHttpReaderConfigFunc() interface{}

func NewKafkaReaderConfigFunc

func NewKafkaReaderConfigFunc() interface{}

func SetReaderConfigConstructor

func SetReaderConfigConstructor(name string, fn func() interface{})

SetReaderConfigConstructor 注册读取器配置构造函数

func SetReaderConstructor

func SetReaderConstructor(name string, fn ReaderConstructor)

SetReaderConstructor 注册读取器构造函数

Types

type HttpReader

type HttpReader struct {
	ReaderBase
	// contains filtered or unexported fields
}

func (*HttpReader) Close

func (h *HttpReader) Close() error

func (*HttpReader) Complete

func (h *HttpReader) Complete(params *types.BinlogParams) error

func (*HttpReader) Read

func (h *HttpReader) Read() (*types.BinlogParams, error)

type HttpReaderConfig

type HttpReaderConfig struct {
	Listen       string        `json:"listen" yaml:"listen"`                                                  // 监听端口
	PushPath     string        `json:"push_path" yaml:"push_path"`                                            // 请求接收路径
	PreParamsLen int32         `json:"pre_params_len,omitempty" yaml:"pre_params_len,omitempty" default:"10"` // params 管道缓冲长度
	PushTimeout  time.Duration `json:"push_timeout,omitempty" yaml:"push_timeout,omitempty" default:"1s"`     // 超时时间
}

func (*HttpReaderConfig) Equal

func (h *HttpReaderConfig) Equal(config ReaderConfig) bool

func (*HttpReaderConfig) GetUniqueId

func (h *HttpReaderConfig) GetUniqueId() string

type KafkaReader

type KafkaReader struct {
	ReaderBase
	// contains filtered or unexported fields
}

func (*KafkaReader) Close

func (k *KafkaReader) Close() error

func (*KafkaReader) Complete

func (k *KafkaReader) Complete(params *types.BinlogParams) error

func (*KafkaReader) Read

func (k *KafkaReader) Read() (*types.BinlogParams, error)

type KafkaReaderConfig

type KafkaReaderConfig struct {
	Brokers        []string      `json:"brokers" yaml:"brokers" default:"localhost:9092"`
	Username       string        `json:"username,omitempty" yaml:"username,omitempty"`
	Password       string        `json:"password,omitempty" yaml:"password,omitempty"`
	Group          string        `json:"group" yaml:"group" default:"test"`
	Topic          string        `json:"topic" yaml:"topic"`
	Partition      int           `json:"partition" yaml:"partition"`
	MinBytes       int           `json:"min_bytes,omitempty" yaml:"min_bytes,omitempty" default:"10240"`
	MaxBytes       int           `json:"max_bytes,omitempty" yaml:"max_bytes,omitempty" default:"10485760"`
	StartOffset    int64         `json:"start_offset,omitempty" yaml:"start_offset,omitempty" default:"-1"`
	MaxWait        time.Duration `json:"max_wait,omitempty" yaml:"max_wait,omitempty" default:"1s"`
	CommitInterval time.Duration `json:"commit_interval,omitempty" yaml:"commit_interval,omitempty" default:"1s"`
	QueueCapacity  int           `json:"queue_capacity,omitempty" yaml:"queue_capacity,omitempty" default:"1000"`
}

func (*KafkaReaderConfig) Equal

func (k *KafkaReaderConfig) Equal(config ReaderConfig) bool

func (*KafkaReaderConfig) GetUniqueId

func (k *KafkaReaderConfig) GetUniqueId() string

type Reader

type Reader interface {
	Read() (*types.BinlogParams, error)
	Complete(params *types.BinlogParams) error
	GetConfig() ReaderConfig
	GetCtx() context.Context
	Close() error
}

func NewHttpReaderFunc

func NewHttpReaderFunc(conf ReaderConfig, wg *sync.WaitGroup, ctx context.Context) (Reader, error)

func NewKafkaReaderFunc

func NewKafkaReaderFunc(conf ReaderConfig, wg *sync.WaitGroup, parent context.Context) (Reader, error)

type ReaderBase

type ReaderBase struct {
	// contains filtered or unexported fields
}

func NewReaderBase

func NewReaderBase(conf ReaderConfig, parent context.Context) ReaderBase

func (*ReaderBase) FirstClose

func (r *ReaderBase) FirstClose() bool

func (*ReaderBase) GetConfig

func (r *ReaderBase) GetConfig() ReaderConfig

func (*ReaderBase) GetCtx

func (r *ReaderBase) GetCtx() context.Context

type ReaderConfig

type ReaderConfig interface {
	GetUniqueId() string
	Equal(ReaderConfig) bool
}

type ReaderConfigByType

type ReaderConfigByType struct {
	Type   string       `json:"type"`
	Config ReaderConfig `json:"-"`
}

ReaderConfigByType 读取器配置携带type参数 json 反序列化时通过type获取具体的类型,再进行实例化

func (*ReaderConfigByType) UnmarshalJSON

func (r *ReaderConfigByType) UnmarshalJSON(bytes []byte) error

UnmarshalJSON 根据type 获取到具体到 config 结构体,并重新序列化赋值

type ReaderConstructor

type ReaderConstructor func(ReaderConfig, *sync.WaitGroup, context.Context) (Reader, error)

func GetReaderConstructor

func GetReaderConstructor(name string) ReaderConstructor

GetReaderConstructor 获取读取器构造函数

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL