serializer

package
v1.0.0-beta.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 22, 2021 License: Apache-2.0 Imports: 7 Imported by: 0

Documentation

Index

Constants

View Source
const (
	OperationCreate = "CREATE"
	OperationUpdate = "UPDATE"
	OperationDelete = "DELETE"
)
View Source
const DefaultMessageBufferSize = 10

Variables

This section is empty.

Functions

This section is empty.

Types

type ExtraMaskInfo

type ExtraMaskInfo struct {
	Masked     bool
	ColumnType string
	DefaultVal string
}

type MaskInfo

type MaskInfo struct {
	Masked bool

	SortCol bool
	DistCol bool

	LengthCol              bool
	MobileCol              bool
	MappingPIICol          bool
	ConditionalNonPIICol   bool
	DependentNonPIICol     bool
	RegexPatternBooleanCol bool
}

type Message

type Message struct {
	SchemaId  int
	Topic     string
	Partition int32
	Offset    int64
	Key       string
	Value     interface{}
	Bytes     int64

	Operation       string
	MaskSchema      map[string]MaskInfo
	ExtraMaskSchema map[string]ExtraMaskInfo
}

type MessageAsyncBatch

type MessageAsyncBatch struct {
	// contains filtered or unexported fields
}

func NewMessageAsyncBatch

func NewMessageAsyncBatch(
	topic string,
	partition int32,
	maxSize int,
	maxBufSize int,
	maxBytesPerBatch *int64,
	processChan chan []*Message,
) *MessageAsyncBatch

func (*MessageAsyncBatch) Flush

func (b *MessageAsyncBatch) Flush(ctx context.Context)

func (*MessageAsyncBatch) Insert

func (b *MessageAsyncBatch) Insert(ctx context.Context, msg *Message)

insert makes the batch and also and flushes to the processor if batchSize >= maxSize

type MessageBatchAsyncProcessor

type MessageBatchAsyncProcessor interface {
	Process(
		wg *sync.WaitGroup,
		session sarama.ConsumerGroupSession,
		processChan <-chan []*Message,
		errChan chan<- error,
	)
}

type MessageBatchSyncProcessor

type MessageBatchSyncProcessor interface {
	Process(session sarama.ConsumerGroupSession, msgBuf []*Message) error
}

type MessageSyncBatch

type MessageSyncBatch struct {
	// contains filtered or unexported fields
}

func NewMessageSyncBatch

func NewMessageSyncBatch(
	topic string,
	partition int32,
	maxSize int,
	maxBufSize int,
	maxBytesPerBatch *int64,
	processor MessageBatchSyncProcessor,
) *MessageSyncBatch

func (*MessageSyncBatch) Insert

func (b *MessageSyncBatch) Insert(msg *Message)

func (*MessageSyncBatch) Process

func (b *MessageSyncBatch) Process(session sarama.ConsumerGroupSession) error

process calls the processor to process the batch

func (*MessageSyncBatch) Size

func (b *MessageSyncBatch) Size() int

func (*MessageSyncBatch) SizeHit

func (b *MessageSyncBatch) SizeHit(batchBytes int64) bool

type Serializer

type Serializer interface {
	Deserialize(message *sarama.ConsumerMessage) (*Message, error)
}

func NewSerializer

func NewSerializer(schemaRegistryURL string) Serializer

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL