batch

package
v0.0.30
Published: Jan 18, 2022 License: BSD-3-Clause Imports: 11 Imported by: 2

Documentation

Overview

Package batch implements functions for building, marshaling, and unmarshaling Kafka record batches.

Producing

When producing messages, call NewBuilder and Add records to it. Then call Builder.Build and pass the returned Batch to the producer. Drop the reference to the Builder when you are done with it, so that the references it holds to the added records are released.

Fetching ("consuming")

A successful fetch result contains a RecordSet. Call its Batches method to get byte slices containing the individual batches, and Unmarshal each batch individually. To get individual records, call Batch.Records and then record.Unmarshal. Passing batches around is much more efficient than passing individual records, so defer record unmarshaling until the very end.

Constants

const (
	TimestampCreate    = 0b0000
	TimestampLogAppend = 0b1000
)
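
The two constants line up with bit 3 of the batch Attributes field, which in the Kafka record batch format carries the timestamp type; the low three bits carry the compression codec. The following is a minimal sketch of how those bits can be read. The mask names and helper functions are hypothetical and are not part of this package; the package's own TimestampType and CompressionType methods may be implemented differently.

```go
package main

import "fmt"

// Values copied from the package constants.
const (
	TimestampCreate    = 0b0000
	TimestampLogAppend = 0b1000
)

// Hypothetical masks, following the Kafka record batch attribute layout:
// bits 0-2 hold the compression codec, bit 3 holds the timestamp type.
const (
	compressionMask = 0b0111
	timestampMask   = 0b1000
)

// compressionType extracts the compression codec id from the attributes.
func compressionType(attributes int16) int16 { return attributes & compressionMask }

// timestampType extracts the timestamp type bit from the attributes.
func timestampType(attributes int16) int16 { return attributes & timestampMask }

func main() {
	var attrs int16 = 0b1001 // codec 1 (gzip) with log-append timestamps
	fmt.Println(compressionType(attrs))                     // 1
	fmt.Println(timestampType(attrs) == TimestampLogAppend) // true
}
```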

Variables

var (
	ErrEmpty     = errors.New("empty batch")
	ErrNilRecord = errors.New("nil record in batch")
)
var (
	CorruptedBatchError   = errors.New("batch crc does not match bytes")
	UnsupportedMagicError = errors.New("magic value is not 2")
)

Functions

This section is empty.

Types

type Batch

type Batch struct {
	BaseOffset           int64
	BatchLengthBytes     int32
	PartitionLeaderEpoch int32
	Magic                int8 // this should be =2
	Crc                  uint32
	Attributes           int16
	LastOffsetDelta      int32 // NumRecords-1 // TODO: is this always true?
	FirstTimestamp       int64 // ms since epoch
	MaxTimestamp         int64 // ms since epoch
	ProducerId           int64 // for transactions only see KIP-360
	ProducerEpoch        int16 // for transactions only see KIP-360
	BaseSequence         int32
	NumRecords           int32 // LastOffsetDelta+1
	//
	MarshaledRecords []byte `wire:"omit" json:"-"`
}

Batch defines a Kafka record batch in wire format. Not safe for concurrent use.

func Unmarshal

func Unmarshal(b []byte) (*Batch, error)

Unmarshal the batch. On error the returned batch is nil. An error most likely means that the CRC check failed. In that case there is no way to tell how many records the batch contained, and therefore no way to adjust offsets accordingly.
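
To illustrate the CRC failure mode, here is a minimal sketch of the check defined by the Kafka record batch format (magic 2): a CRC-32C (Castagnoli) checksum stored at byte 17 and computed over everything from the Attributes field (byte 21) to the end of the batch. The function and variable names are hypothetical, and this is not necessarily how Unmarshal is implemented in this package.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
	"hash/crc32"
)

// errCorrupted is a local stand-in for the package's CorruptedBatchError.
var errCorrupted = errors.New("batch crc does not match bytes")

const (
	crcOffset  = 17 // 8 (base offset) + 4 (batch length) + 4 (leader epoch) + 1 (magic)
	dataOffset = 21 // first byte covered by the CRC (start of Attributes)
)

var castagnoli = crc32.MakeTable(crc32.Castagnoli)

// sealCrc computes the checksum of the covered bytes and stores it in the header.
func sealCrc(b []byte) {
	binary.BigEndian.PutUint32(b[crcOffset:], crc32.Checksum(b[dataOffset:], castagnoli))
}

// checkCrc reports errCorrupted if the stored checksum does not match the bytes.
func checkCrc(b []byte) error {
	if len(b) < dataOffset {
		return errors.New("batch too short")
	}
	want := binary.BigEndian.Uint32(b[crcOffset:dataOffset])
	if crc32.Checksum(b[dataOffset:], castagnoli) != want {
		return errCorrupted
	}
	return nil
}

func main() {
	b := make([]byte, 61) // a v2 batch header with no records is 61 bytes
	b[16] = 2             // magic
	sealCrc(b)
	fmt.Println(checkCrc(b)) // <nil>
	b[len(b)-1] ^= 0xff      // flip bits in the NumRecords field
	fmt.Println(checkCrc(b)) // batch crc does not match bytes
}
```

Note that the record count sits inside the checksummed region, which is why a failed check leaves the number of records unknown.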

func (*Batch) Compress added in v0.0.5

func (batch *Batch) Compress(c Compressor) error

Compress batch records with the supplied compressor. Mutates the batch on success only. Call before Marshal. Not idempotent (on success).

func (*Batch) CompressionType

func (batch *Batch) CompressionType() int16

func (*Batch) Decompress added in v0.0.5

func (batch *Batch) Decompress(d Decompressor) error

Decompress the batch with the supplied decompressor. Mutates the batch. Call after Unmarshal and before Records. Not idempotent.

func (*Batch) LastOffset added in v0.0.3

func (batch *Batch) LastOffset() int64
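
LastOffset is undocumented. Given the field comments on Batch (LastOffsetDelta is NumRecords-1), a plausible reading is that it returns BaseOffset plus LastOffsetDelta. The sketch below works through that arithmetic under this assumption; batchHeader and its method are hypothetical stand-ins, not the package's types.

```go
package main

import "fmt"

// batchHeader is a hypothetical stand-in holding only the two fields needed here.
type batchHeader struct {
	BaseOffset      int64
	LastOffsetDelta int32
}

// lastOffset assumes the last record's offset is BaseOffset + LastOffsetDelta.
func (h batchHeader) lastOffset() int64 {
	return h.BaseOffset + int64(h.LastOffsetDelta)
}

func main() {
	// Five records occupying offsets 100 through 104.
	h := batchHeader{BaseOffset: 100, LastOffsetDelta: 4}
	fmt.Println(h.lastOffset())     // 104
	fmt.Println(h.lastOffset() + 1) // 105, the offset to fetch from next
}
```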

func (*Batch) Marshal

func (batch *Batch) Marshal() RecordSet

Marshal the batch header and append the marshaled records. If you want the batch to be compressed, call Compress before Marshal. Mutates the batch Crc.

func (*Batch) Records

func (batch *Batch) Records() [][]byte

Records retrieves individual records from the batch. If the batch records are compressed, you must call Decompress first.

func (*Batch) TimestampType

func (batch *Batch) TimestampType() int16

type Builder

type Builder struct {
	// contains filtered or unexported fields
}

Builder is used for building record batches. The builder imposes no limit on the number of records; that is up to the user. Not safe for concurrent use.

func NewBuilder

func NewBuilder(now time.Time) *Builder

func (*Builder) Add

func (b *Builder) Add(records ...*record.Record)

Add records to the batch. References to added records are not released on call to Build. This means you can add more records and call Build again. Don't know why you would want to, but you can.

func (*Builder) AddStrings

func (b *Builder) AddStrings(values ...string) *Builder

func (*Builder) Build

func (b *Builder) Build(now time.Time) (*Batch, error)

Build a record batch (marshal individual records and set batch metadata). Call this after adding records to the batch. Returns ErrEmpty if the batch has no records. Returns ErrNilRecord if any of the records is nil. Marshaled records are not compressed (call Batch.Compress for that). Batch FirstTimestamp is set to the time the builder was created (the time passed to NewBuilder), and MaxTimestamp is set to the time passed to Build. Within the batch, each record's TimestampDelta is 0, meaning that all records will appear to have been produced at the time the builder was created (TODO: change? how?). Idempotent.

func (*Builder) NumRecords

func (b *Builder) NumRecords() int

NumRecords returns the number of records that have been added to the builder.

type Compressor

type Compressor interface {
	Compress([]byte) ([]byte, error)
	Type() int16
}
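
As an illustration, a gzip-backed implementation of this interface could look like the sketch below. The interface is copied from above so the example compiles on its own; gzipCompressor is a hypothetical name. Returning 1 from Type assumes that Type reports the Kafka compression codec id (1 is gzip in the Kafka protocol), which the documentation here does not state explicitly.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
)

// Compressor is copied from the package documentation.
type Compressor interface {
	Compress([]byte) ([]byte, error)
	Type() int16
}

// gzipCompressor is a hypothetical Compressor backed by compress/gzip.
type gzipCompressor struct{}

// Compress returns the gzip-compressed form of b.
func (gzipCompressor) Compress(b []byte) ([]byte, error) {
	var buf bytes.Buffer
	w := gzip.NewWriter(&buf)
	if _, err := w.Write(b); err != nil {
		return nil, err
	}
	// Close flushes remaining data and writes the gzip footer.
	if err := w.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// Type returns 1, the gzip codec id in the Kafka protocol (assumed meaning of Type).
func (gzipCompressor) Type() int16 { return 1 }

func main() {
	var c Compressor = gzipCompressor{}
	out, err := c.Compress(bytes.Repeat([]byte("kafka "), 100))
	if err != nil {
		panic(err)
	}
	fmt.Println("compressed bytes:", len(out), "codec:", c.Type())
}
```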

type Decompressor

type Decompressor interface {
	Decompress([]byte) ([]byte, error)
	Type() int16
}
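
The matching decompression side could be sketched as follows. Again the interface is copied from above, gzipDecompressor and gzipBytes are hypothetical names, and the codec id 1 returned by Type assumes that Type reports the Kafka compression codec.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

// Decompressor is copied from the package documentation.
type Decompressor interface {
	Decompress([]byte) ([]byte, error)
	Type() int16
}

// gzipDecompressor is a hypothetical Decompressor backed by compress/gzip.
type gzipDecompressor struct{}

// Decompress returns the uncompressed form of gzip-compressed b.
func (gzipDecompressor) Decompress(b []byte) ([]byte, error) {
	r, err := gzip.NewReader(bytes.NewReader(b))
	if err != nil {
		return nil, err
	}
	defer r.Close()
	return io.ReadAll(r)
}

// Type returns 1, the gzip codec id in the Kafka protocol (assumed meaning of Type).
func (gzipDecompressor) Type() int16 { return 1 }

// gzipBytes is a helper used only to produce input for the demonstration.
func gzipBytes(b []byte) []byte {
	var buf bytes.Buffer
	w := gzip.NewWriter(&buf)
	w.Write(b) // writes to a bytes.Buffer do not fail
	w.Close()
	return buf.Bytes()
}

func main() {
	var d Decompressor = gzipDecompressor{}
	out, err := d.Decompress(gzipBytes([]byte("record bytes")))
	fmt.Println(string(out), err) // record bytes <nil>
}
```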

type RecordSet

type RecordSet []byte

RecordSet is composed of one or more record batches. Fetch API calls respond with record sets. The byte representation of a record set with only one record batch is identical to that of the record batch.

func (RecordSet) Batches added in v0.0.3

func (b RecordSet) Batches() [][]byte

Batches returns the batches in the record set. Because Kafka limits response byte sizes, the last record batch in the set may be truncated (bytes will be missing from the end). In that case the last batch is discarded.
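
The splitting and truncation handling can be sketched as follows, based on the Kafka wire format: each batch starts with an 8-byte base offset followed by a 4-byte big-endian length that counts the bytes after the length field. The function names are hypothetical and the package's actual implementation may differ.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

const prefixLen = 12 // 8-byte base offset + 4-byte batch length

// splitBatches returns the complete batches in set and drops a truncated tail.
func splitBatches(set []byte) [][]byte {
	var batches [][]byte
	for len(set) >= prefixLen {
		n := int(int32(binary.BigEndian.Uint32(set[8:prefixLen])))
		end := prefixLen + n
		if n < 0 || end > len(set) {
			break // truncated or malformed: discard the remainder
		}
		batches = append(batches, set[:end])
		set = set[end:]
	}
	return batches
}

// frame builds a fake batch: a length prefix followed by n zero bytes.
func frame(n int) []byte {
	b := make([]byte, prefixLen+n)
	binary.BigEndian.PutUint32(b[8:], uint32(n))
	return b
}

func main() {
	set := append(frame(5), frame(7)...)
	fmt.Println(len(splitBatches(set)))              // 2
	fmt.Println(len(splitBatches(set[:len(set)-3]))) // 1, truncated last batch is dropped
}
```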
