builder

package
v0.0.43
Published: Jan 18, 2022 License: BSD-3-Clause Imports: 7 Imported by: 0

Documentation

Overview

Package builder implements a partitioning record batch builder.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type HashPartitioner

type HashPartitioner struct {
	// contains filtered or unexported fields
}

Uses FNV-1a (32-bit) to hash record keys.

func (*HashPartitioner) Partition

func (p *HashPartitioner) Partition(key []byte) int32

func (*HashPartitioner) SetNumPartitions

func (p *HashPartitioner) SetNumPartitions(n int32)
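The docs only say that HashPartitioner uses Fnv32a. A minimal self-contained sketch of the likely strategy (hash the key with 32-bit FNV-1a, then reduce modulo the partition count — the modulo step is an assumption, not documented by the package):

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// fnvPartition sketches what a HashPartitioner plausibly does: hash the
// key with 32-bit FNV-1a and take the result modulo the partition count.
// The modulo reduction is an assumption; the package does not document it.
func fnvPartition(key []byte, numPartitions int32) int32 {
	h := fnv.New32a()
	h.Write(key) // Write on a hash.Hash never returns an error
	return int32(h.Sum32() % uint32(numPartitions))
}

func main() {
	// Same key always maps to the same partition in [0, numPartitions).
	fmt.Println(fnvPartition([]byte("user-42"), 8))
}
```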

type NopPartitioner

type NopPartitioner struct{}

Always sets partition to -1.

func (*NopPartitioner) Partition

func (*NopPartitioner) Partition([]byte) int32

func (*NopPartitioner) SetNumPartitions

func (*NopPartitioner) SetNumPartitions(int32)

type Partitioner

type Partitioner interface {
	// Partition must be safe for concurrent use. A nil key is valid.
	// Behavior is undefined if the number of partitions set with
	// SetNumPartitions is <1.
	Partition(key []byte) int32
	// SetNumPartitions does not need to be safe for concurrent use: the
	// intention is that it be called once as part of builder setup.
	// Behavior is undefined on subsequent calls, on calls while the
	// partitioner is in use, or if the argument is <1; the number of
	// partitions should be >0.
	SetNumPartitions(int32)
}
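Any type with these two methods satisfies the interface. As a hypothetical example (not part of the package), a round-robin partitioner that honors the contract: Partition is safe for concurrent use via an atomic counter, and SetNumPartitions is called once during setup:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// RoundRobinPartitioner is a hypothetical Partitioner implementation.
type RoundRobinPartitioner struct {
	n    int32  // number of partitions, set once before use
	next uint32 // atomically incremented on every Partition call
}

// Partition ignores the key (nil is valid per the interface contract)
// and cycles through partitions 0..n-1.
func (p *RoundRobinPartitioner) Partition(key []byte) int32 {
	return int32(atomic.AddUint32(&p.next, 1) % uint32(p.n))
}

// SetNumPartitions is called once as part of setup, before any
// Partition calls, so a plain assignment is sufficient.
func (p *RoundRobinPartitioner) SetNumPartitions(n int32) { p.n = n }

func main() {
	p := &RoundRobinPartitioner{}
	p.SetNumPartitions(3)
	fmt.Println(p.Partition(nil), p.Partition(nil), p.Partition(nil)) // 1 2 0
}
```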

type Record

type Record interface {
	Key() []byte
	Value() []byte
}
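Records only need Key and Value accessors, so a trivial struct suffices. A minimal implementation sketch (the type name is illustrative, not the package's):

```go
package main

import "fmt"

// kv is a minimal Record implementation: any type with Key() and
// Value() accessors returning []byte satisfies the interface.
type kv struct {
	k, v []byte
}

func (r kv) Key() []byte   { return r.k }
func (r kv) Value() []byte { return r.v }

func main() {
	r := kv{k: []byte("id-1"), v: []byte(`{"n":1}`)}
	fmt.Println(string(r.Key()), len(r.Value())) // id-1 7
}
```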

type SequentialBuilder

type SequentialBuilder struct {
	// Compressor must be safe for concurrent use
	Compressor batch.Compressor
	// Each batch will have at least this many records. There is no "max":
	// users can send slices of any size on the input channel, and it is
	// up to them to keep input slices to a sane size.
	MinRecords int
	// Each batch will have uncompressed payload (sum of uncompressed
	// record values) of at least this many bytes. Combined with MinRecords
	// (both have to be true) this determines when to "flush".
	MinUncompressedBytes int
	// Incoming records are collected into sets, the size of which (the
	// number of records in each set) is determined by MinRecords and
	// MinUncompressedBytes. Each of these sets of records must be built
	// into a batch: records need to be serialized into wire format and
	// then compressed.
	//
	// Each set of records is processed by a worker and results in a single
	// producer.Batch. NumWorkers determines the number of workers doing
	// the serialization and compression. This is likely the most expensive
	// part of the whole pipeline (especially with compression enabled), so
	// set it accordingly; values above the number of available cores are
	// not useful. Must be >0.
	NumWorkers int
	// Partitioner to use. Max number of "in flight" batches will be equal
	// to number of partitions plus NumWorkers. See NopPartitioner.
	Partitioner Partitioner
	// contains filtered or unexported fields
}

Builder for record batches. Make sure to set public field values before calling Start. Do not change them after calling Start. Safe for concurrent use.

The builder collects records from its input channel. It groups (partitions) incoming records. When a group (set of records for given partition) reaches thresholds defined by MinRecords and MinUncompressedBytes, it is sent to a worker that marshals the records into a batch and compresses the batch.
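The flush rule above can be sketched as a predicate: a set of records for a partition is handed to a worker only when both thresholds are met. This is a self-contained model of the documented behavior, not the package's code; the record type and constants are stand-ins:

```go
package main

import "fmt"

// record stands in for the package's Record interface.
type record struct{ key, value []byte }

// Stand-in threshold values for illustration.
const (
	minRecords           = 3
	minUncompressedBytes = 16
)

// ready mirrors the documented rule: a set goes to a worker only when
// BOTH MinRecords and MinUncompressedBytes are satisfied.
func ready(set []record) bool {
	total := 0
	for _, r := range set {
		total += len(r.value) // uncompressed payload = sum of value sizes
	}
	return len(set) >= minRecords && total >= minUncompressedBytes
}

func main() {
	set := []record{{value: []byte("aaaaaa")}, {value: []byte("bbbbbb")}}
	fmt.Println(ready(set)) // 2 records, 12 bytes: neither threshold met
	set = append(set, record{value: []byte("cccccc")})
	fmt.Println(ready(set)) // 3 records, 18 bytes: both met
}
```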

func (*SequentialBuilder) Flush

func (b *SequentialBuilder) Flush(d time.Duration)

Flush all batches "older" than d. If the builder is partitioned, Flush looks through the batches of every partition; if it is not partitioned, there is only one batch. Flush takes precedence over MinRecords and MinUncompressedBytes, and passing d=0 immediately flushes all batches. Empty batches are never flushed. Flush does not block: if a flush is already enqueued or in progress, the call returns immediately as a no-op.

func (*SequentialBuilder) Start

func (b *SequentialBuilder) Start(input <-chan []Record) <-chan *producer.Batch

Start building batches. Returns the channel on which workers deliver completed batches; its depth equals the number of workers. When the input channel is closed, the workers drain it, output any remaining batches (even if smaller than MinRecords), and exit, and the output channel is closed. It is more efficient to send multiple records at a time on the input channel, but the size of the input slices is independent of MinRecords (and so open to abuse: you could send a huge input slice; it is up to you to keep slices sane). Empty slices and nil records within slices are silently dropped, so batches returned on the output channel SHOULD always be error free and have >0 records. Call Start only once.
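The channel contract described above (buffered output, drain on input close, then close output) follows a standard Go fan-out/fan-in shape. A stripped-down, self-contained model of that contract — names and types here are illustrative, not the package's API, and each slice just becomes a record count instead of a real producer.Batch:

```go
package main

import (
	"fmt"
	"sync"
)

// start models the Start contract: workers consume slices from input,
// emit one result per non-empty slice, and the output channel is closed
// only after input is closed and fully drained.
func start(input <-chan []string, numWorkers int) <-chan int {
	out := make(chan int, numWorkers) // output depth == number of workers
	var wg sync.WaitGroup
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for recs := range input { // drains input until it is closed
				if len(recs) == 0 {
					continue // empty slices are silently dropped
				}
				out <- len(recs)
			}
		}()
	}
	// Close the output only after every worker has exited.
	go func() { wg.Wait(); close(out) }()
	return out
}

func main() {
	in := make(chan []string)
	out := start(in, 2)
	go func() {
		in <- []string{"a", "b"}
		in <- nil // dropped, produces no output
		in <- []string{"c"}
		close(in)
	}()
	total := 0
	for n := range out { // ranges until out is closed
		total += n
	}
	fmt.Println(total) // 3
}
```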
