producer

package
v0.5.0
Warning

This package is not in the latest version of its module.

Published: Apr 14, 2023 License: Apache-2.0 Imports: 6 Imported by: 0

Documentation


Constants

const ResponseBodyMaxLength = 512

The only responses with a body are errors, which leaves 511 bytes for the error message.

Variables

var Endianness = binary.BigEndian
var HeaderSize = binarySize(BinaryHeader{})
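As a rough illustration of what these two variables imply, the sketch below computes the encoded size of a header and shows big-endian byte order. The types are local copies of the declarations on this page. binarySize is unexported, so the use of the standard library's binary.Size here is an assumption about its behavior, and headerSize and encodeBodyLength are hypothetical helper names.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// Local stand-ins copied from the type declarations on this page.
type (
	Flags    uint8
	StreamId uint16
	OpCode   uint8
)

type BinaryHeader struct {
	Version    uint8
	Flags      Flags
	StreamId   StreamId
	Op         OpCode
	BodyLength uint32
	Crc        uint32
}

// headerSize is a hypothetical helper. It assumes the package's unexported
// binarySize behaves like binary.Size, which sums the fixed field sizes
// without padding: 1 + 1 + 2 + 1 + 4 + 4 bytes.
func headerSize() int {
	return binary.Size(BinaryHeader{})
}

// encodeBodyLength is a hypothetical helper that writes n in big-endian
// order, most significant byte first.
func encodeBodyLength(n uint32) []byte {
	buf := make([]byte, 4)
	binary.BigEndian.PutUint32(buf, n)
	return buf
}

func main() {
	fmt.Println(headerSize())                  // 13
	fmt.Printf("% x\n", encodeBodyLength(258)) // 00 00 01 02
}
```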

Functions

func WriteHeader

func WriteHeader(w *bytes.Buffer, header *BinaryHeader) error

func WriteString

func WriteString(w *bytes.Buffer, value string) error

Types

type BinaryHeader

type BinaryHeader struct {
	Version    uint8
	Flags      Flags
	StreamId   StreamId
	Op         OpCode
	BodyLength uint32
	Crc        uint32
}

Header for producer messages. The order of the fields defines the serialization format.

type BinaryRequest

type BinaryRequest interface {
	Marshal(w *bytes.Buffer, header *BinaryHeader) error

	ResponseChannels() []chan<- BinaryResponse

	StreamId() StreamId
}

func NewProduceRequest

func NewProduceRequest(streamId StreamId, parts []*ProduceRequestPart) BinaryRequest

type BinaryResponse

type BinaryResponse interface {
	Op() OpCode
}

func NewClientErrorResponse

func NewClientErrorResponse(message string) BinaryResponse

func NewEmptyResponse

func NewEmptyResponse(op OpCode) BinaryResponse

Represents a response without a body.

type ErrorCode

type ErrorCode uint8
const (
	ServerError         ErrorCode = 0
	RoutingError        ErrorCode = 1
	LeaderNotFoundError ErrorCode = 2
	ClientError         ErrorCode = 255
)

type ErrorResponse

type ErrorResponse struct {
	Code    ErrorCode
	Message string
}

func (*ErrorResponse) Op

func (r *ErrorResponse) Op() OpCode

func (*ErrorResponse) ToError

func (r *ErrorResponse) ToError() error

type Flags

type Flags uint8
const (
	WithTimestamp Flags = 0b00000001
)

Flags. Values are fixed numbers (not iota) so that reordering the constants cannot silently change the protocol.
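Since Flags is a bit set stored in a single byte, a flag is typically tested with a bitwise AND. The sketch below uses local copies of the declarations above. The Has method is a hypothetical helper for illustration and is not part of the package.

```go
package main

import "fmt"

// Local copies of the declarations on this page.
type Flags uint8

const WithTimestamp Flags = 0b00000001

// Has is a hypothetical helper: it reports whether every bit in mask
// is also set in f.
func (f Flags) Has(mask Flags) bool {
	return f&mask == mask
}

func main() {
	var none Flags
	withTs := none | WithTimestamp // set the bit

	fmt.Println(none.Has(WithTimestamp))   // false
	fmt.Println(withTs.Has(WithTimestamp)) // true
}
```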

type OpCode

type OpCode uint8
const (
	StartupOp         OpCode = 1
	ReadyOp           OpCode = 2
	ErrorOp           OpCode = 3
	ProduceOp         OpCode = 4
	ProduceResponseOp OpCode = 5
	HeartbeatOp       OpCode = 6
)

Operation codes. Values are fixed numbers (not iota) so that reordering the constants cannot silently change the protocol.
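With explicit values, each constant keeps its wire number regardless of where it appears in the const block. The sketch below copies the constants from this page and adds a hypothetical opName helper (not part of the package) to show how a received byte might be mapped back to a readable name.

```go
package main

import "fmt"

// Local copies of the declarations on this page.
type OpCode uint8

const (
	StartupOp         OpCode = 1
	ReadyOp           OpCode = 2
	ErrorOp           OpCode = 3
	ProduceOp         OpCode = 4
	ProduceResponseOp OpCode = 5
	HeartbeatOp       OpCode = 6
)

// opName is a hypothetical helper that returns a readable name for an
// op code, falling back to the numeric value for unknown codes.
func opName(op OpCode) string {
	switch op {
	case StartupOp:
		return "Startup"
	case ReadyOp:
		return "Ready"
	case ErrorOp:
		return "Error"
	case ProduceOp:
		return "Produce"
	case ProduceResponseOp:
		return "ProduceResponse"
	case HeartbeatOp:
		return "Heartbeat"
	default:
		return fmt.Sprintf("Unknown(%d)", uint8(op))
	}
}

func main() {
	fmt.Println(opName(ProduceOp), uint8(ProduceOp)) // Produce 4
	fmt.Println(opName(OpCode(42)))                  // Unknown(42)
}
```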

type ProduceRequest

type ProduceRequest struct {
	// contains filtered or unexported fields
}

func (*ProduceRequest) Marshal

func (r *ProduceRequest) Marshal(w *bytes.Buffer, header *BinaryHeader) error

func (*ProduceRequest) ResponseChannels

func (r *ProduceRequest) ResponseChannels() []chan<- BinaryResponse

func (*ProduceRequest) StreamId

func (r *ProduceRequest) StreamId() StreamId

type ProduceRequestPart

type ProduceRequestPart struct {
	Topic        string
	Message      FixedLengthReader
	PartitionKey string
	Response     chan BinaryResponse
}

Represents a part of a potential produce request.

func NewProduceRequestPart

func NewProduceRequestPart(
	topic string,
	message FixedLengthReader,
	partitionKey string,
) *ProduceRequestPart

type StreamId

type StreamId uint16
