records

package
v0.0.0-...-fcf41b5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 3, 2022 License: MIT Imports: 12 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// ErrInsufficientData is the package-level error value whose message reports
// that more bytes were expected while decoding a packet.
var ErrInsufficientData = errors.New("kafka record: insufficient data to decode packet, more bytes expected")
View Source
// ErrVarIntOverflow is the package-level error value whose message reports a
// varint overflow.
var ErrVarIntOverflow = errors.New("kafka record: varint overflow")
View Source
// UnsupportedMagic is the package-level error value whose message reports that
// a record batch magic number is unsupported.
// NOTE(review): this name lacks the Err prefix used by ErrInsufficientData and
// ErrVarIntOverflow; renaming it would break existing callers.
var UnsupportedMagic = fmt.Errorf("record batch magic number is unsupported")

Functions

func ComputeAttributes

func ComputeAttributes(codec int16, control, logAppendTime, isTransactional bool) int16

Types

type CompressionCodec

// CompressionCodec is an int8 code naming the compression applied, one of the
// Compression* constants below.
type CompressionCodec int8

// Codec values are spelled out explicitly; they run consecutively from 0 to 4.
const (
	// CompressionNone means no compression.
	CompressionNone CompressionCodec = 0
	// CompressionGZIP means compression using GZIP.
	CompressionGZIP CompressionCodec = 1
	// CompressionSnappy means compression using snappy.
	CompressionSnappy CompressionCodec = 2
	// CompressionLZ4 means compression using LZ4.
	CompressionLZ4 CompressionCodec = 3
	// CompressionZSTD means compression using ZSTD.
	CompressionZSTD CompressionCodec = 4
)

type Record

// Record groups the fields of a single record: an attributes byte, a
// timestamp delta, an offset delta, the key and value payloads, and a list of
// headers.
// NOTE(review): TimeStampDelta and OffsetDelta are presumably relative to the
// enclosing RecordBatch's FirstTimestamp and BaseOffset — confirm against the
// parsing code.
type Record struct {
	Attributes     int8
	TimeStampDelta time.Duration
	OffsetDelta    int64
	Key, Value     []byte
	Headers        []RecordHeader
}

type RecordBatch

// RecordBatch groups the batch-level header fields (offsets, epoch, magic,
// CRC, attributes, timestamps, producer information, base sequence) together
// with the records the batch carries.
// NOTE(review): Attributes is presumably the bit field read by GetCodec,
// IsControl, IsLogAppendTime and IsTransactional — confirm in their bodies.
type RecordBatch struct {
	BaseOffset                   int64
	PartitionLeaderEpoch         int32
	Magic                        int8
	CRC                          int32
	Attributes                   int16
	LastOffsetDelta              int32
	FirstTimestamp, MaxTimestamp time.Time
	ProducerID                   int64
	ProducerEpoch                int16
	BaseSequence                 int32
	Records                      []*Record
}

func ParseRecordBatch

func ParseRecordBatch(recordBatchBytes []byte) (*RecordBatch, error)

func (*RecordBatch) Encode

func (rb *RecordBatch) Encode() ([]byte, error)

func (*RecordBatch) GetCodec

func (rb *RecordBatch) GetCodec() CompressionCodec

func (*RecordBatch) IsControl

func (rb *RecordBatch) IsControl() bool

func (*RecordBatch) IsLogAppendTime

func (rb *RecordBatch) IsLogAppendTime() bool

func (*RecordBatch) IsTransactional

func (rb *RecordBatch) IsTransactional() bool

type RecordDecoder

// RecordDecoder carries the bytes.Buffer used by its decoding methods.
// NOTE(review): the methods presumably consume bytes from Buff — confirm in
// their implementations.
type RecordDecoder struct {
	// Buff is the underlying buffer; it is exported, so callers supply it.
	Buff *bytes.Buffer
}

func (*RecordDecoder) ArrayLength

func (pd *RecordDecoder) ArrayLength() (int32, error)

func (*RecordDecoder) Close

func (pd *RecordDecoder) Close() error

func (*RecordDecoder) Int16

func (pd *RecordDecoder) Int16() (int16, error)

func (*RecordDecoder) Int32

func (pd *RecordDecoder) Int32() (int32, error)

func (*RecordDecoder) Int64

func (pd *RecordDecoder) Int64() (int64, error)

func (*RecordDecoder) Int8

func (pd *RecordDecoder) Int8() (int8, error)

func (*RecordDecoder) Length

func (pd *RecordDecoder) Length() int

func (*RecordDecoder) RawBytes

func (pd *RecordDecoder) RawBytes(size int) []byte

func (*RecordDecoder) VarInt

func (pd *RecordDecoder) VarInt() (int64, error)

func (*RecordDecoder) VarIntBytes

func (pd *RecordDecoder) VarIntBytes() ([]byte, error)

type RecordEncoder

// RecordEncoder carries the bytes.Buffer used by its encoding methods.
// NOTE(review): the methods presumably append to Buff and ToBytes presumably
// returns its contents — confirm in their implementations.
type RecordEncoder struct {
	// Buff is the underlying buffer; it is exported, so callers supply it.
	Buff *bytes.Buffer
}

func (*RecordEncoder) ArrayLength

func (pe *RecordEncoder) ArrayLength(in int)

func (*RecordEncoder) Int16

func (pe *RecordEncoder) Int16(in int16)

func (*RecordEncoder) Int32

func (pe *RecordEncoder) Int32(in int32)

func (*RecordEncoder) Int64

func (pe *RecordEncoder) Int64(in int64)

func (*RecordEncoder) Int8

func (pe *RecordEncoder) Int8(in int8)

func (*RecordEncoder) RawBytes

func (pe *RecordEncoder) RawBytes(in []byte)

func (*RecordEncoder) ToBytes

func (pe *RecordEncoder) ToBytes() []byte

func (*RecordEncoder) VarInt

func (pe *RecordEncoder) VarInt(in int64) int

func (*RecordEncoder) VarIntBytes

func (pe *RecordEncoder) VarIntBytes(in []byte)

type RecordHeader

// RecordHeader is a single key/value pair of raw bytes, used as the element
// type of Record.Headers.
type RecordHeader struct {
	Key, Value []byte
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL