protocol

package
v0.0.0-...-16dfdc2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 3, 2018 License: Apache-2.0, MIT Imports: 6 Imported by: 0

Documentation

Index

Constants

View Source
const (
	ProduceKey              = 0
	FetchKey                = 1
	OffsetsKey              = 2
	MetadataKey             = 3
	LeaderAndISRKey         = 4
	StopReplicaKey          = 5
	UpdateMetadataKey       = 6
	ControlledShutdownKey   = 7
	OffsetCommitKey         = 8
	OffsetFetchKey          = 9
	FindCoordinator         = 10 //TODO. ??
	JoinGroupKey            = 11
	HeartbeatKey            = 12
	LeaveGroupKey           = 13
	SyncGroupKey            = 14
	DescribeGroupsKey       = 15
	ListGroupsKey           = 16
	SaslHandshakeKey        = 17
	APIVersionsKey          = 18
	CreateTopicsKey         = 19
	DeleteTopicsKey         = 20
	DeleteRecordsKey        = 21
	InitProducerIdKey       = 22
	OffsetForLeaderEpochKey = 23
	AddPartitionsToTxnKey   = 24
	AddOffsetsToTxnKey      = 25
	EndTxnKey               = 26
	WriteTxnMarkersKey      = 27
	TxnOffsetCommitKey      = 28
	DescribeAclsKey         = 29
	CreateAclsKey           = 30
	DeleteAclsKey           = 31
	DescribeConfigsKey      = 32
	AlterConfigsKey         = 33
)

Protocol API keys. See: https://kafka.apache.org/protocol#protocol_api_keys

View Source
const (
	ErrUnknown                 = int16(-1)
	ErrNone                    = int16(0)
	ErrUnknownTopicOrPartition = int16(3)
	ErrNotLeaderForPartition   = int16(6)
)

Variables

View Source
var Encoding = binary.BigEndian
View Source
var ErrArrayIsNull = errors.New("array is null")
View Source
var ErrInsufficientData = errors.New("kafka: insufficient data to decode packet, more bytes expected")
View Source
var ErrInvalidArrayLength = errors.New("kafka: invalid array length")
View Source
var ErrInvalidByteSliceLength = errors.New("invalid byteslice length")
View Source
var ErrInvalidStringLength = errors.New("kafka: invalid string length")
View Source
var ErrMagicByte = errors.New("MagicByte is 2,not 1")
View Source
var (
	KeyToString = map[int16]string{
		ProduceKey:              "ProduceKey",
		FetchKey:                "FetchKey",
		OffsetsKey:              "OffsetsKey",
		MetadataKey:             "MetadataKey",
		LeaderAndISRKey:         "LeaderAndISRKey",
		StopReplicaKey:          "StopReplicaKey",
		UpdateMetadataKey:       "UpdateMetadataKey",
		ControlledShutdownKey:   "ControlledShutdownKey",
		OffsetCommitKey:         "ControlledShutdownKey",
		OffsetFetchKey:          "OffsetFetchKey",
		FindCoordinator:         "FindCoordinator",
		JoinGroupKey:            "JoinGroupKey",
		HeartbeatKey:            "HeartbeatKey",
		LeaveGroupKey:           "LeaveGroupKey",
		SyncGroupKey:            "SyncGroupKey",
		DescribeGroupsKey:       "DescribeGroupsKey",
		ListGroupsKey:           "ListGroupsKey",
		SaslHandshakeKey:        "SaslHandshakeKey",
		APIVersionsKey:          "APIVersionsKey",
		CreateTopicsKey:         "CreateTopicsKey",
		DeleteTopicsKey:         "DeleteTopicsKey",
		DeleteRecordsKey:        "DeleteRecordsKey",
		InitProducerIdKey:       "InitProducerIdKey",
		OffsetForLeaderEpochKey: "OffsetForLeaderEpochKey",
		AddPartitionsToTxnKey:   "AddPartitionsToTxnKey",
		AddOffsetsToTxnKey:      "AddOffsetsToTxnKey",
		EndTxnKey:               "EndTxnKey",
		WriteTxnMarkersKey:      "WriteTxnMarkersKey",
		TxnOffsetCommitKey:      "TxnOffsetCommitKey",
		DescribeAclsKey:         "DescribeAclsKey",
		CreateAclsKey:           "CreateAclsKey",
		DeleteAclsKey:           "DeleteAclsKey",
		DescribeConfigsKey:      "DescribeConfigsKey",
		AlterConfigsKey:         "AlterConfigsKey",
	}
)

Functions

func Decode

func Decode(b []byte, in Decoder) error

func Encode

func Encode(e Encoder) ([]byte, error)

func Read

func Read(r io.Reader, data interface{}) error

func Size

func Size(v interface{}) int

func Write

func Write(w io.Writer, data interface{}) error

Types

type APIVersionsResponse

type APIVersionsResponse struct {
	ErrorCode      int16
	ApiVersions    []*ApiVersion
	ThrottleTimeMs int32

	// Save Request Version, use in Encode, which decide Encode func use ThrottleTimeMs or not
	RequestVersion int16
}

func (*APIVersionsResponse) Decode

func (*APIVersionsResponse) Encode

type ApiVersion

type ApiVersion struct {
	ApiKey     int16
	MinVersion int16
	MaxVersion int16
}

type ApiVersionsRequest

type ApiVersionsRequest struct {
}

func (*ApiVersionsRequest) Key

func (r *ApiVersionsRequest) Key() int16

func (*ApiVersionsRequest) Version

func (r *ApiVersionsRequest) Version() int16

type Body

type Body interface {
	Encoder
	Key() int16
	Version() int16
}

type BrokerV0

type BrokerV0 struct {
	NodeID int32
	Host   string
	Port   int32
}

type BrokerV1

type BrokerV1 struct {
	NodeID int32
	Host   string
	Port   int32
	Rack   string
}

type BrokerV2

type BrokerV2 struct {
	NodeID int32
	Host   string
	Port   int32
	Rack   string
}

type ByteDecoder

type ByteDecoder struct {
	// contains filtered or unexported fields
}

func NewDecoder

func NewDecoder(b []byte) *ByteDecoder

func (*ByteDecoder) ArrayLength

func (d *ByteDecoder) ArrayLength() (int, error)

func (*ByteDecoder) Bool

func (d *ByteDecoder) Bool() (bool, error)

func (*ByteDecoder) Bytes

func (d *ByteDecoder) Bytes() ([]byte, error)

func (*ByteDecoder) Int16

func (d *ByteDecoder) Int16() (int16, error)

func (*ByteDecoder) Int32

func (d *ByteDecoder) Int32() (int32, error)

func (*ByteDecoder) Int32Array

func (d *ByteDecoder) Int32Array() ([]int32, error)

func (*ByteDecoder) Int64

func (d *ByteDecoder) Int64() (int64, error)

func (*ByteDecoder) Int64Array

func (d *ByteDecoder) Int64Array() ([]int64, error)

func (*ByteDecoder) Int8

func (d *ByteDecoder) Int8() (int8, error)

func (*ByteDecoder) Offset

func (d *ByteDecoder) Offset() int

func (*ByteDecoder) Pop

func (d *ByteDecoder) Pop() error

func (*ByteDecoder) Push

func (d *ByteDecoder) Push(pd PushDecoder) error

func (*ByteDecoder) String

func (d *ByteDecoder) String() (string, error)

func (*ByteDecoder) StringArray

func (d *ByteDecoder) StringArray() ([]string, error)

type ByteEncoder

type ByteEncoder struct {
	// contains filtered or unexported fields
}

func NewByteEncoder

func NewByteEncoder(b []byte) *ByteEncoder

func (*ByteEncoder) Bytes

func (b *ByteEncoder) Bytes() []byte

func (*ByteEncoder) Pop

func (e *ByteEncoder) Pop()

func (*ByteEncoder) Push

func (e *ByteEncoder) Push(pe PushEncoder)

func (*ByteEncoder) PutArrayLength

func (e *ByteEncoder) PutArrayLength(in int) error

func (*ByteEncoder) PutBool

func (e *ByteEncoder) PutBool(in bool)

func (*ByteEncoder) PutBytes

func (e *ByteEncoder) PutBytes(in []byte) error

func (*ByteEncoder) PutInt16

func (e *ByteEncoder) PutInt16(in int16)

func (*ByteEncoder) PutInt32

func (e *ByteEncoder) PutInt32(in int32)

func (*ByteEncoder) PutInt32Array

func (e *ByteEncoder) PutInt32Array(in []int32) error

func (*ByteEncoder) PutInt64

func (e *ByteEncoder) PutInt64(in int64)

func (*ByteEncoder) PutInt64Array

func (e *ByteEncoder) PutInt64Array(in []int64) error

func (*ByteEncoder) PutInt8

func (e *ByteEncoder) PutInt8(in int8)

func (*ByteEncoder) PutRawBytes

func (e *ByteEncoder) PutRawBytes(in []byte) error

func (*ByteEncoder) PutString

func (e *ByteEncoder) PutString(in string) error

func (*ByteEncoder) PutStringArray

func (e *ByteEncoder) PutStringArray(in []string) error

type CRCField

type CRCField struct {
	StartOffset int
}

func (*CRCField) Check

func (f *CRCField) Check(curOffset int, buf []byte) error

func (*CRCField) Fill

func (f *CRCField) Fill(curOffset int, buf []byte) error

func (*CRCField) ReserveSize

func (f *CRCField) ReserveSize() int

func (*CRCField) SaveOffset

func (f *CRCField) SaveOffset(in int)

type Coordinator

type Coordinator struct {
	NodeID int32
	Host   string
	Port   int32
}

type CreateTopicRequest

type CreateTopicRequest struct {
	Topic             string
	NumPartitions     int32
	ReplicationFactor int16
	ReplicaAssignment map[int32][]int32
	Configs           map[string]string
}

type CreateTopicRequests

type CreateTopicRequests struct {
	Requests []*CreateTopicRequest
	Timeout  int32
}

func (*CreateTopicRequests) Decode

func (*CreateTopicRequests) Encode

func (*CreateTopicRequests) Key

func (c *CreateTopicRequests) Key() int16

func (*CreateTopicRequests) Version

func (c *CreateTopicRequests) Version() int16

type CreateTopicsResponse

type CreateTopicsResponse struct {
	TopicErrorCodes []*TopicErrorCode
}

func (*CreateTopicsResponse) Decode

func (*CreateTopicsResponse) Encode

type Decoder

type Decoder interface {
	Decode(d PacketDecoder) error
}

type DeleteTopicsRequest

type DeleteTopicsRequest struct {
	Topics  []string
	Timeout int32
}

func (*DeleteTopicsRequest) Decode

func (c *DeleteTopicsRequest) Decode(d PacketDecoder) (err error)

func (*DeleteTopicsRequest) Encode

func (c *DeleteTopicsRequest) Encode(e PacketEncoder) (err error)

func (*DeleteTopicsRequest) Key

func (c *DeleteTopicsRequest) Key() int16

func (*DeleteTopicsRequest) Version

func (c *DeleteTopicsRequest) Version() int16

type DeleteTopicsResponse

type DeleteTopicsResponse struct {
	TopicErrorCodes []*TopicErrorCode
}

func (*DeleteTopicsResponse) Decode

func (*DeleteTopicsResponse) Encode

type DescribeGroupsRequest

type DescribeGroupsRequest struct {
	GroupIDs []string
}

func (*DescribeGroupsRequest) Decode

func (r *DescribeGroupsRequest) Decode(d PacketDecoder) (err error)

func (*DescribeGroupsRequest) Encode

func (*DescribeGroupsRequest) Key

func (r *DescribeGroupsRequest) Key() int16

func (*DescribeGroupsRequest) Version

func (r *DescribeGroupsRequest) Version() int16

type DescribeGroupsResponse

type DescribeGroupsResponse struct {
	Groups []*Group
}

func (*DescribeGroupsResponse) Decode

func (r *DescribeGroupsResponse) Decode(d PacketDecoder) (err error)

func (*DescribeGroupsResponse) Encode

func (*DescribeGroupsResponse) Key

func (r *DescribeGroupsResponse) Key() int16

func (*DescribeGroupsResponse) Version

func (r *DescribeGroupsResponse) Version() int16

type Encoder

type Encoder interface {
	Encode(e PacketEncoder) error
}

type FetchMessage

type FetchMessage struct {
	Offset int64
	Record []byte
}

type FetchPartition

type FetchPartition struct {
	Partition   int32
	FetchOffset int64
	MaxBytes    int32
}

type FetchPartitionResponse

type FetchPartitionResponse struct {
	Partition     int32
	ErrorCode     int16
	HighWatermark int64
	Messages      []*FetchMessage
}

type FetchRequest

type FetchRequest struct {
	ReplicaID   int32
	MaxWaitTime int32
	MinBytes    int32
	// MaxBytes    int32
	Topics []*FetchTopic
}

func (*FetchRequest) Decode

func (r *FetchRequest) Decode(d PacketDecoder) error

func (*FetchRequest) Encode

func (r *FetchRequest) Encode(e PacketEncoder) error

func (*FetchRequest) Key

func (r *FetchRequest) Key() int16

func (*FetchRequest) Version

func (r *FetchRequest) Version() int16

type FetchResponse

type FetchResponse struct {
	Topic              string
	PartitionResponses []*FetchPartitionResponse
}

type FetchResponses

type FetchResponses struct {
	ThrottleTimeMs int32
	Responses      []*FetchResponse
	// Temp for msg encode, will not write to client
	APIVersion int16
}

func (*FetchResponses) Decode

func (r *FetchResponses) Decode(d PacketDecoder) error

func (*FetchResponses) Encode

func (r *FetchResponses) Encode(e PacketEncoder) (err error)

func (*FetchResponses) FetchResponseSize

func (r *FetchResponses) FetchResponseSize() int

type FetchTopic

type FetchTopic struct {
	Topic      string
	Partitions []*FetchPartition
}

type Group

type Group struct {
	ErrorCode    int16
	GroupID      string
	State        string
	ProtocolType string
	Protocol     string
	GroupMembers map[string]*GroupMember
}

func (*Group) Decode

func (r *Group) Decode(d PacketDecoder) (err error)

func (*Group) Encode

func (r *Group) Encode(e PacketEncoder) error

type GroupCoordinatorRequest

type GroupCoordinatorRequest struct {
	GroupID string
}

func (*GroupCoordinatorRequest) Decode

func (r *GroupCoordinatorRequest) Decode(d PacketDecoder) (err error)

func (*GroupCoordinatorRequest) Encode

func (*GroupCoordinatorRequest) Key

func (r *GroupCoordinatorRequest) Key() int16

func (*GroupCoordinatorRequest) Version

func (r *GroupCoordinatorRequest) Version() int16

type GroupCoordinatorResponse

type GroupCoordinatorResponse struct {
	ErrorCode   int16
	Coordinator *Coordinator
}

func (*GroupCoordinatorResponse) Decode

func (r *GroupCoordinatorResponse) Decode(d PacketDecoder) (err error)

func (*GroupCoordinatorResponse) Encode

type GroupMember

type GroupMember struct {
	ClientID              string
	ClientHost            string
	GroupMemberMetadata   []byte
	GroupMemberAssignment []byte
}

func (*GroupMember) Decode

func (r *GroupMember) Decode(d PacketDecoder) (err error)

func (*GroupMember) Encode

func (r *GroupMember) Encode(e PacketEncoder) error

type GroupProtocol

type GroupProtocol struct {
	ProtocolName     string
	ProtocolMetadata []byte
}

type HeartbeatRequest

type HeartbeatRequest struct {
	GroupID           string
	GroupGenerationID int32
	MemberID          string
}

func (*HeartbeatRequest) Decode

func (r *HeartbeatRequest) Decode(d PacketDecoder) (err error)

func (*HeartbeatRequest) Key

func (r *HeartbeatRequest) Key() int16

func (*HeartbeatRequest) Version

func (r *HeartbeatRequest) Version() int16

type HeartbeatResponse

type HeartbeatResponse struct {
	ErrorCode int16
}

func (*HeartbeatResponse) Decode

func (r *HeartbeatResponse) Decode(d PacketDecoder) (err error)

func (*HeartbeatResponse) Encode

func (r *HeartbeatResponse) Encode(e PacketEncoder) error

func (*HeartbeatResponse) Key

func (r *HeartbeatResponse) Key() int16

func (*HeartbeatResponse) Version

func (r *HeartbeatResponse) Version() int16

type JoinGroupRequest

type JoinGroupRequest struct {
	GroupID        string
	SessionTimeout int32
	MemberID       string
	ProtocolType   string
	GroupProtocols []*GroupProtocol
}

func (*JoinGroupRequest) Decode

func (r *JoinGroupRequest) Decode(d PacketDecoder) error

func (*JoinGroupRequest) Encode

func (r *JoinGroupRequest) Encode(e PacketEncoder) error

func (*JoinGroupRequest) Key

func (r *JoinGroupRequest) Key() int16

func (*JoinGroupRequest) Version

func (r *JoinGroupRequest) Version() int16

type JoinGroupResponse

type JoinGroupResponse struct {
	ErrorCode     int16
	GenerationID  int32
	GroupProtocol string
	LeaderID      string
	MemberID      string
	Members       map[string][]byte
}

func (*JoinGroupResponse) Decode

func (r *JoinGroupResponse) Decode(d PacketDecoder) error

func (*JoinGroupResponse) Encode

func (r *JoinGroupResponse) Encode(e PacketEncoder) error

func (*JoinGroupResponse) Key

func (r *JoinGroupResponse) Key() int16

func (*JoinGroupResponse) Version

func (r *JoinGroupResponse) Version() int16

type LeaderAndISRPartition

type LeaderAndISRPartition struct {
	Topic     string
	Partition int32
	ErrorCode int16
}

type LeaderAndISRRequest

type LeaderAndISRRequest struct {
	ControllerID    int32
	ControllerEpoch int32
	PartitionStates []*PartitionState
	LiveLeaders     []*LiveLeader
}

func (*LeaderAndISRRequest) Decode

func (*LeaderAndISRRequest) Encode

func (*LeaderAndISRRequest) Key

func (r *LeaderAndISRRequest) Key() int16

func (*LeaderAndISRRequest) Version

func (r *LeaderAndISRRequest) Version() int16

type LeaderAndISRResponse

type LeaderAndISRResponse struct {
	ErrorCode  int16
	Partitions []*LeaderAndISRPartition
}

func (*LeaderAndISRResponse) Decode

func (*LeaderAndISRResponse) Encode

func (*LeaderAndISRResponse) Key

func (r *LeaderAndISRResponse) Key() int16

func (*LeaderAndISRResponse) Version

func (r *LeaderAndISRResponse) Version() int16

type LeaveGroupRequest

type LeaveGroupRequest struct {
	GroupID  string
	MemberID string
}

func (*LeaveGroupRequest) Decode

func (r *LeaveGroupRequest) Decode(d PacketDecoder) (err error)

func (*LeaveGroupRequest) Encode

func (r *LeaveGroupRequest) Encode(e PacketEncoder) error

func (*LeaveGroupRequest) Key

func (r *LeaveGroupRequest) Key() int16

func (*LeaveGroupRequest) Version

func (r *LeaveGroupRequest) Version() int16

type LeaveGroupResponse

type LeaveGroupResponse struct {
	ErrorCode int16
}

func (*LeaveGroupResponse) Decode

func (r *LeaveGroupResponse) Decode(d PacketDecoder) (err error)

func (*LeaveGroupResponse) Encode

func (r *LeaveGroupResponse) Encode(e PacketEncoder) error

func (*LeaveGroupResponse) Key

func (r *LeaveGroupResponse) Key() int16

func (*LeaveGroupResponse) Version

func (r *LeaveGroupResponse) Version() int16

type LenEncoder

type LenEncoder struct {
	Length int
	// contains filtered or unexported fields
}

func (*LenEncoder) Pop

func (e *LenEncoder) Pop()

func (*LenEncoder) Push

func (e *LenEncoder) Push(pe PushEncoder)

func (*LenEncoder) PutArrayLength

func (e *LenEncoder) PutArrayLength(in int) error

func (*LenEncoder) PutBool

func (e *LenEncoder) PutBool(in bool)

func (*LenEncoder) PutBytes

func (e *LenEncoder) PutBytes(in []byte) error

func (*LenEncoder) PutInt16

func (e *LenEncoder) PutInt16(in int16)

func (*LenEncoder) PutInt32

func (e *LenEncoder) PutInt32(in int32)

func (*LenEncoder) PutInt32Array

func (e *LenEncoder) PutInt32Array(in []int32) error

func (*LenEncoder) PutInt64

func (e *LenEncoder) PutInt64(in int64)

func (*LenEncoder) PutInt64Array

func (e *LenEncoder) PutInt64Array(in []int64) error

func (*LenEncoder) PutInt8

func (e *LenEncoder) PutInt8(in int8)

func (*LenEncoder) PutRawBytes

func (e *LenEncoder) PutRawBytes(in []byte) error

func (*LenEncoder) PutString

func (e *LenEncoder) PutString(in string) error

func (*LenEncoder) PutStringArray

func (e *LenEncoder) PutStringArray(in []string) error

type ListGroupsRequest

type ListGroupsRequest struct {
}

func (*ListGroupsRequest) Decode

func (r *ListGroupsRequest) Decode(d PacketDecoder) (err error)

func (*ListGroupsRequest) Encode

func (r *ListGroupsRequest) Encode(e PacketEncoder) error

func (*ListGroupsRequest) Key

func (r *ListGroupsRequest) Key() int16

func (*ListGroupsRequest) Version

func (r *ListGroupsRequest) Version() int16

type ListGroupsResponse

type ListGroupsResponse struct {
	ErrorCode int16
	Groups    map[string]string
}

func (*ListGroupsResponse) Decode

func (r *ListGroupsResponse) Decode(d PacketDecoder) (err error)

func (*ListGroupsResponse) Encode

func (r *ListGroupsResponse) Encode(e PacketEncoder) error

func (*ListGroupsResponse) Key

func (r *ListGroupsResponse) Key() int16

func (*ListGroupsResponse) Version

func (r *ListGroupsResponse) Version() int16

type LiveLeader

type LiveLeader struct {
	ID   int32
	Host string
	Port int32
}

type Member

type Member struct {
	MemberID       string
	MemberMetadata []byte
}

type Message

type Message struct {
	Crc        int32
	MagicByte  int8
	Attributes int8
	Timestamp  int64
	Key        []byte
	Value      []byte
}

func (*Message) Decode

func (m *Message) Decode(d PacketDecoder) error

func (*Message) Encode

func (m *Message) Encode(e PacketEncoder) error

type MessageElement

type MessageElement struct {
	Offset int64
	Size   int32
	*Message
}

type MessageSet

type MessageSet struct {
	Messages                []*MessageElement
	PartialTrailingMessages bool
}

func (*MessageSet) Decode

func (ms *MessageSet) Decode(d PacketDecoder) error

func (*MessageSet) Encode

func (ms *MessageSet) Encode(e PacketEncoder) error

type MetadataRequest

type MetadataRequest struct {
	Topics []string
}

func (*MetadataRequest) Decode

func (r *MetadataRequest) Decode(d PacketDecoder) (err error)

func (*MetadataRequest) Encode

func (r *MetadataRequest) Encode(e PacketEncoder) error

func (*MetadataRequest) Key

func (r *MetadataRequest) Key() int16

func (*MetadataRequest) Version

func (r *MetadataRequest) Version() int16

type MetadataResponseV0

type MetadataResponseV0 struct {
	Brokers       []*BrokerV0
	TopicMetadata []*TopicMetadataV0
}

version 0

func (*MetadataResponseV0) Decode

func (r *MetadataResponseV0) Decode(d PacketDecoder) error

func (*MetadataResponseV0) Encode

func (r *MetadataResponseV0) Encode(e PacketEncoder) (err error)

type MetadataResponseV1

type MetadataResponseV1 struct {
	Brokers       []*BrokerV1
	ControllerID  int32
	TopicMetadata []*TopicMetadataV1
}

version 1

func (*MetadataResponseV1) Decode

func (r *MetadataResponseV1) Decode(d PacketDecoder) error

func (*MetadataResponseV1) Encode

func (r *MetadataResponseV1) Encode(e PacketEncoder) (err error)

type MetadataResponseV2

type MetadataResponseV2 struct {
	Brokers       []*BrokerV2
	ClusterID     string
	ControllerID  int32
	TopicMetadata []*TopicMetadataV2
}

version 2

func (*MetadataResponseV2) Decode

func (r *MetadataResponseV2) Decode(d PacketDecoder) error

func (*MetadataResponseV2) Encode

func (r *MetadataResponseV2) Encode(e PacketEncoder) (err error)

type OffsetResponse

type OffsetResponse struct {
	Topic              string
	PartitionResponses []*PartitionResponse
}

type OffsetsPartition

type OffsetsPartition struct {
	Partition     int32
	Timestamp     int64 // -1 to receive latest offset, -2 to receive earliest offset
	MaxNumOffsets int32
}

type OffsetsRequest

type OffsetsRequest struct {
	ReplicaID int32
	Topics    []*OffsetsTopic
}

func (*OffsetsRequest) Decode

func (r *OffsetsRequest) Decode(d PacketDecoder) error

func (*OffsetsRequest) Encode

func (r *OffsetsRequest) Encode(e PacketEncoder) error

func (*OffsetsRequest) Key

func (r *OffsetsRequest) Key() int16

func (*OffsetsRequest) Version

func (r *OffsetsRequest) Version() int16

type OffsetsResponse

type OffsetsResponse struct {
	Responses []*OffsetResponse
}

func (*OffsetsResponse) Decode

func (r *OffsetsResponse) Decode(d PacketDecoder) error

func (*OffsetsResponse) Encode

func (r *OffsetsResponse) Encode(e PacketEncoder) error

type OffsetsTopic

type OffsetsTopic struct {
	Topic      string
	Partitions []*OffsetsPartition
}

type PacketDecoder

type PacketDecoder interface {
	Bool() (bool, error)
	Int8() (int8, error)
	Int16() (int16, error)
	Int32() (int32, error)
	Int64() (int64, error)
	ArrayLength() (int, error)
	Bytes() ([]byte, error)
	String() (string, error)
	Int32Array() ([]int32, error)
	Int64Array() ([]int64, error)
	StringArray() ([]string, error)
	Push(pd PushDecoder) error
	Pop() error
	// contains filtered or unexported methods
}

type PacketEncoder

type PacketEncoder interface {
	PutBool(in bool)
	PutInt8(in int8)
	PutInt16(in int16)
	PutInt32(in int32)
	PutInt64(in int64)
	PutArrayLength(in int) error
	PutRawBytes(in []byte) error
	PutBytes(in []byte) error
	PutString(in string) error
	PutStringArray(in []string) error
	PutInt32Array(in []int32) error
	PutInt64Array(in []int64) error
	Push(pe PushEncoder)
	Pop()
}

type PartitionData

type PartitionData struct {
	Partition      int32
	MessageSetSize int32
	MessageSet
}

type PartitionMetadata

type PartitionMetadata struct {
	PartitionErrorCode int16
	ParititionID       int32
	Leader             int32
	Replicas           []int32
	ISR                []int32
}

type PartitionResponse

type PartitionResponse struct {
	Partition int32
	ErrorCode int16
	// Timestamp int64
	Offsets []int64
}

type PartitionState

type PartitionState struct {
	Topic           string
	Partition       int32
	ControllerEpoch int32
	Leader          int32
	LeaderEpoch     int32
	ISR             []int32
	ZKVersion       int32
	Replicas        []int32
}

type ProducePartitionResponse

type ProducePartitionResponse struct {
	Partition      int32
	ErrorCode      int16
	BaseOffset     int64
	Timestamp      int64
	OffsetChannels []<-chan interface{} //just for vdl pipeline
}

type ProduceRequest

type ProduceRequest struct {
	Acks      int16
	Timeout   int32
	TopicData []*TopicData
}

func (*ProduceRequest) Decode

func (r *ProduceRequest) Decode(d PacketDecoder) (err error)

version 2 解析一个生产消息的请求,赋值到r中

func (*ProduceRequest) Encode

func (r *ProduceRequest) Encode(e PacketEncoder) (err error)

func (*ProduceRequest) Key

func (r *ProduceRequest) Key() int16

func (*ProduceRequest) Print

func (r *ProduceRequest) Print()

func (*ProduceRequest) Version

func (r *ProduceRequest) Version() int16

type ProduceResponse

type ProduceResponse struct {
	Topic              string
	PartitionResponses []*ProducePartitionResponse
}

type ProduceResponses

type ProduceResponses struct {
	Responses      []*ProduceResponse
	ThrottleTimeMs int32
	// contains filtered or unexported fields
}

func (*ProduceResponses) Decode

func (r *ProduceResponses) Decode(d PacketDecoder) error

func (*ProduceResponses) Encode

func (rs *ProduceResponses) Encode(e PacketEncoder) error

func (*ProduceResponses) SetVersion

func (r *ProduceResponses) SetVersion(v int16)

func (*ProduceResponses) Version

func (r *ProduceResponses) Version() int16

type PushDecoder

type PushDecoder interface {
	SaveOffset(in int)
	ReserveSize() int
	Fill(curOffset int, buf []byte) error
}

type PushEncoder

type PushEncoder interface {
	SaveOffset(in int)
	ReserveSize() int
	Fill(curOffset int, buf []byte) error
}

type Request

type Request struct {
	CorrelationID int32
	ClientID      string
	Body          Body
}

func (*Request) Encode

func (r *Request) Encode(pe PacketEncoder) (err error)

type RequestHeader

type RequestHeader struct {
	// Size of the request
	Size int32
	// ID of the API (e.g. produce, fetch, metadata)
	APIKey int16
	// Version of the API to use
	APIVersion int16
	// User defined ID to correlate requests between server and client
	CorrelationID int32
	// Size of the Client ID
	ClientID string
}

func (*RequestHeader) Decode

func (r *RequestHeader) Decode(d PacketDecoder) error

将PacketDecoder转存到RequestHeader

func (*RequestHeader) Encode

func (r *RequestHeader) Encode(e PacketEncoder)

将RequestHeader写入PacketEncoder

func (*RequestHeader) Print

func (r *RequestHeader) Print()

type Response

type Response struct {
	Size          int32
	CorrelationID int32
	Body          ResponseBody
}

func (*Response) Decode

func (r *Response) Decode(pd PacketDecoder) (err error)

func (*Response) Encode

func (r *Response) Encode(pe PacketEncoder) (err error)

type ResponseBody

type ResponseBody interface {
	Encoder
	Decoder
}

type SizeField

type SizeField struct {
	StartOffset int
}

func (*SizeField) Check

func (s *SizeField) Check(curOffset int, buf []byte) error

func (*SizeField) Fill

func (s *SizeField) Fill(curOffset int, buf []byte) error

set the size from start to curOffset, not include size

func (*SizeField) ReserveSize

func (s *SizeField) ReserveSize() int

func (*SizeField) SaveOffset

func (s *SizeField) SaveOffset(in int)

type StopReplicaPartition

type StopReplicaPartition struct {
	Topic     string
	Partition int32
}

type StopReplicaRequest

type StopReplicaRequest struct {
	ControllerID     int32
	ControllerEpoch  int32
	DeletePartitions bool
	Partitions       []*StopReplicaPartition
}

func (*StopReplicaRequest) Decode

func (r *StopReplicaRequest) Decode(d PacketDecoder) (err error)

func (*StopReplicaRequest) Encode

func (r *StopReplicaRequest) Encode(e PacketEncoder) (err error)

func (*StopReplicaRequest) Key

func (r *StopReplicaRequest) Key() int16

func (*StopReplicaRequest) Version

func (r *StopReplicaRequest) Version() int16

type StopReplicaResponse

type StopReplicaResponse struct {
	ErrorCode  int16
	Partitions []*StopReplicaResponsePartition
}

func (*StopReplicaResponse) Decode

func (r *StopReplicaResponse) Decode(d PacketDecoder) (err error)

func (*StopReplicaResponse) Encode

func (r *StopReplicaResponse) Encode(e PacketEncoder) (err error)

type StopReplicaResponsePartition

type StopReplicaResponsePartition struct {
	Topic     string
	Partition int32
	ErrorCode int16
}

type SyncGroupRequest

type SyncGroupRequest struct {
	GroupID          string
	GenerationID     int32
	MemberID         string
	GroupAssignments map[string][]byte
}

func (*SyncGroupRequest) Decode

func (r *SyncGroupRequest) Decode(d PacketDecoder) (err error)

func (*SyncGroupRequest) Encode

func (r *SyncGroupRequest) Encode(e PacketEncoder) error

func (*SyncGroupRequest) Key

func (r *SyncGroupRequest) Key() int16

func (*SyncGroupRequest) Version

func (r *SyncGroupRequest) Version() int16

type SyncGroupResponse

type SyncGroupResponse struct {
	ErrorCode        int16
	MemberAssignment []byte
}

func (*SyncGroupResponse) Decode

func (r *SyncGroupResponse) Decode(d PacketDecoder) (err error)

func (*SyncGroupResponse) Encode

func (r *SyncGroupResponse) Encode(e PacketEncoder) error

func (*SyncGroupResponse) Key

func (r *SyncGroupResponse) Key() int16

func (*SyncGroupResponse) Version

func (r *SyncGroupResponse) Version() int16

type TopicData

type TopicData struct {
	Topic string
	Data  []*PartitionData
}

type TopicErrorCode

type TopicErrorCode struct {
	Topic     string
	ErrorCode int16
}

type TopicMetadataV0

type TopicMetadataV0 struct {
	TopicErrorCode    int16
	Topic             string
	PartitionMetadata []*PartitionMetadata
}

version 0

type TopicMetadataV1

type TopicMetadataV1 struct {
	TopicErrorCode    int16
	Topic             string
	IsInternal        bool
	PartitionMetadata []*PartitionMetadata
}

version 1

type TopicMetadataV2

type TopicMetadataV2 struct {
	TopicErrorCode    int16
	Topic             string
	IsInternal        bool
	PartitionMetadata []*PartitionMetadata
}

version 2

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL