pgcapture

package
v0.0.59
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 14, 2024 | License: Apache-2.0 | Imports: 18 | Imported by: 0

Documentation

Index

Constants

View Source
const TableRegexOption = "TableRegex"

Variables

View Source
var DefaultErrorFn = func(source source.Change, err error) {}

Functions

func MarshalJSON

func MarshalJSON(m Model) ([]byte, error)

func ModelName

func ModelName(namespace, table string) string

Types

type BounceHandler

type BounceHandler interface {
	Initialize(ctx context.Context, mh ModelAsyncHandlers) error
	Handle(fn ModelAsyncHandlerFunc, checkpoint cursor.Checkpoint, change Change)
}

type Change

type Change struct {
	Op         pb.Change_Operation
	Checkpoint cursor.Checkpoint
	New        interface{}
	Old        interface{}
}

type Consumer

type Consumer struct {
	Source  source.RequeueSource
	Bouncer BounceHandler
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(ctx context.Context, conn *grpc.ClientConn, option ConsumerOption) *Consumer

Deprecated: NewConsumer is deprecated; use NewDBLogConsumer instead, which takes the same parameters and returns the same type.

func NewDBLogConsumer

func NewDBLogConsumer(ctx context.Context, conn *grpc.ClientConn, option ConsumerOption) *Consumer

func NewSimpleConsumer

func NewSimpleConsumer(ctx context.Context, src source.RequeueSource, option ConsumerOption) *Consumer

func (*Consumer) Consume

func (c *Consumer) Consume(mh ModelHandlers) error

func (*Consumer) ConsumeAsync

func (c *Consumer) ConsumeAsync(mh ModelAsyncHandlers) error

func (*Consumer) Stop

func (c *Consumer) Stop()

type ConsumerOption

type ConsumerOption struct {
	URI              string
	TableRegex       string
	DebounceInterval time.Duration
	OnDecodeError    OnDecodeError
}

type DBLogGatewayConsumer

type DBLogGatewayConsumer struct {
	// contains filtered or unexported fields
}

func (*DBLogGatewayConsumer) Capture

func (c *DBLogGatewayConsumer) Capture(cp cursor.Checkpoint) (changes chan source.Change, err error)

func (*DBLogGatewayConsumer) Commit

func (c *DBLogGatewayConsumer) Commit(cp cursor.Checkpoint)

func (*DBLogGatewayConsumer) Error

func (c *DBLogGatewayConsumer) Error() error

func (*DBLogGatewayConsumer) Requeue

func (c *DBLogGatewayConsumer) Requeue(cp cursor.Checkpoint, reason string)

func (*DBLogGatewayConsumer) Stop

func (c *DBLogGatewayConsumer) Stop() error

type DebounceHandler

type DebounceHandler struct {
	Interval time.Duration
	// contains filtered or unexported fields
}

func (*DebounceHandler) Handle

func (b *DebounceHandler) Handle(fn ModelAsyncHandlerFunc, checkpoint cursor.Checkpoint, change Change)

func (*DebounceHandler) Initialize

func (b *DebounceHandler) Initialize(ctx context.Context, mh ModelAsyncHandlers) error

type DebounceModel

type DebounceModel interface {
	Model
	DebounceKey() string
}

type Model

type Model interface {
	TableName() (schema, table string)
}

type ModelAsyncHandlerFunc

type ModelAsyncHandlerFunc func(change Change, done func(err error))

type ModelAsyncHandlers

type ModelAsyncHandlers map[Model]ModelAsyncHandlerFunc

type ModelHandlerFunc

type ModelHandlerFunc func(change Change) error

type ModelHandlers

type ModelHandlers map[Model]ModelHandlerFunc

type NoBounceHandler

type NoBounceHandler struct {
	// contains filtered or unexported fields
}

func (*NoBounceHandler) Handle

func (b *NoBounceHandler) Handle(fn ModelAsyncHandlerFunc, checkpoint cursor.Checkpoint, change Change)

func (*NoBounceHandler) Initialize

func (b *NoBounceHandler) Initialize(ctx context.Context, mh ModelAsyncHandlers) error

type OnDecodeError

type OnDecodeError func(source source.Change, err error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL