codec

package
v0.0.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 25, 2021 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Index

Constants

View Source
// Fixed integer sizes, one per named protocol field. Judging by the "Len"
// prefix these are presumably the number of bytes each fixed-width field
// occupies when encoded — TODO confirm against the encoders that use them.
// NOTE(review): LenIsInternal is 4 while TopicMetadata.IsInternal is a bool;
// verify that this matches what is actually written on the wire.
const (
	LenCorrId               = 4
	LenErrorCode            = 2
	LenArray                = 4
	LenApiV0                = 6
	LenApiV3                = 7
	LenThrottleTime         = 4
	LenFetchSessionId       = 4
	LenPartitionId          = 4
	LenOffset               = 8
	LenLastStableOffset     = 8
	LenStartOffset          = 8
	LenAbortTransactions    = 4
	LenReplicaId            = 4
	LenMessageSize          = 4
	LenNodeId               = 4
	LenPort                 = 4
	LenGenerationId         = 4
	LenTime                 = 8
	LenLeaderId             = 4
	LenLeaderEpoch          = 4
	LenControllerId         = 4
	LenIsInternal           = 4
	LenTopicAuthOperation   = 4
	LenClusterAuthOperation = 4
	LenRecordAttributes     = 1
	LenMagicByte            = 1
	LenCrc32                = 4
	LenOffsetDelta          = 4
	LenProducerId           = 8
	LenProducerEpoch        = 2
	LenBaseSequence         = 4
	LenSessionTimeout       = 8
)
View Source
// LenTaggedField is the size constant for a tagged-field section; presumably
// the single byte taken by an empty tagged-field list — TODO confirm.
const LenTaggedField = 1

Variables

This section is empty.

Functions

func BytesLen

func BytesLen(bytes []byte) int

func CompactBytesLen

func CompactBytesLen(bytes []byte) int

func CompactNullableStrLen

func CompactNullableStrLen(str *string) int

func CompactStrLen

func CompactStrLen(str string) int

func StrLen

func StrLen(str string) int

Types

type ApiReq

// ApiReq is the request type returned by DecodeApiReq. It embeds BaseReq and
// adds the name and version string of the client software.
type ApiReq struct {
	BaseReq
	ClientSoftwareName    string
	ClientSoftwareVersion string
}

func DecodeApiReq

func DecodeApiReq(bytes []byte, version int16) (apiReq *ApiReq, err error)

type ApiRespVersion

// ApiRespVersion pairs an API key (api.Code) with a minimum and a maximum
// version number. ApiResponse.ApiRespVersions holds a list of these entries.
type ApiRespVersion struct {
	ApiKey     api.Code
	MinVersion int16
	MaxVersion int16
}

type ApiResponse

// ApiResponse is the response type created by NewApiVersionResp. It embeds
// BaseResp and carries an error code, the list of per-API version ranges and
// a throttle time (unit not visible here; presumably milliseconds — confirm).
type ApiResponse struct {
	BaseResp
	ErrorCode       int16
	ApiRespVersions []*ApiRespVersion
	ThrottleTime    int
}

func NewApiVersionResp

func NewApiVersionResp(corrId int) *ApiResponse

func (*ApiResponse) Bytes

func (a *ApiResponse) Bytes(version int16) []byte

func (*ApiResponse) BytesLength

func (a *ApiResponse) BytesLength(version int16) int

type BaseReq

// BaseReq holds the fields shared by the top-level request types of this
// package (ApiReq, FetchReq, JoinGroupReq, ...), which embed it: the
// correlation id and the client id.
type BaseReq struct {
	CorrelationId int
	ClientId      string
}

type BaseResp

// BaseResp holds the correlation id and is embedded by the top-level response
// types of this package (ApiResponse, FetchResp, JoinGroupResp, ...).
type BaseResp struct {
	CorrelationId int
}

type BrokerMetadata

// BrokerMetadata describes one broker entry of MetadataResp.BrokerMetadataList:
// its node id, host, port and rack. Rack is a pointer, so it can be nil
// (presumably meaning "no rack" — confirm against the encoder).
type BrokerMetadata struct {
	NodeId int
	Host   string
	Port   int
	Rack   *string
}

type EnableMechanism

// EnableMechanism wraps a single SASL mechanism name. SaslHandshakeResp holds
// a list of these in its EnableMechanisms field.
type EnableMechanism struct {
	SaslMechanism string
}

type FetchPartitionReq

// FetchPartitionReq is the per-partition entry of FetchTopicReq.FetchPartitions.
// It carries the partition id, the current leader epoch, the fetch offset, the
// last fetched epoch, the log start offset and the maximum bytes for the
// partition.
type FetchPartitionReq struct {
	PartitionId        int
	CurrentLeaderEpoch int
	FetchOffset        int64
	LastFetchedEpoch   int
	LogStartOffset     int64
	PartitionMaxBytes  int
}

type FetchPartitionResp

// FetchPartitionResp is the per-partition entry of
// FetchTopicResp.PartitionDataList. It carries the partition index, an error
// code, the high watermark, last stable offset and log start offset, and a
// pointer to the RecordBatch returned for the partition.
// NOTE(review): AbortedTransactions and ReplicaData are plain int64 values
// here; how they map onto the wire format is not visible — confirm.
type FetchPartitionResp struct {
	PartitionIndex      int
	ErrorCode           int16
	HighWatermark       int64
	LastStableOffset    int64
	LogStartOffset      int64
	AbortedTransactions int64
	ReplicaData         int64
	RecordBatch         *RecordBatch
}

type FetchReq

// FetchReq is the request type returned by DecodeFetchReq. It embeds BaseReq
// and carries the replica id, wait/size limits (MaxWaitTime, MinBytes,
// MaxBytes), the isolation level, the fetch session id and epoch, and the
// list of topics to fetch.
type FetchReq struct {
	BaseReq
	ReplicaId         int
	MaxWaitTime       int
	MinBytes          int
	MaxBytes          int
	IsolationLevel    byte
	FetchSessionId    int
	FetchSessionEpoch int
	FetchTopics       []*FetchTopicReq
}

func DecodeFetchReq

func DecodeFetchReq(bytes []byte, version int16) (fetchReq *FetchReq, err error)

type FetchResp

// FetchResp is the response type created by NewFetchResp. It embeds BaseResp
// and carries a throttle time, an error code, a session id and the per-topic
// responses.
type FetchResp struct {
	BaseResp
	ThrottleTime   int
	ErrorCode      int16
	SessionId      int
	TopicResponses []*FetchTopicResp
}

func NewFetchResp

func NewFetchResp(corrId int) *FetchResp

func (*FetchResp) Bytes

func (f *FetchResp) Bytes(version int16) []byte

func (*FetchResp) BytesLength

func (f *FetchResp) BytesLength(version int16) int

type FetchTopicReq

// FetchTopicReq is one topic entry of FetchReq.FetchTopics: the topic name
// together with the partitions requested for it.
type FetchTopicReq struct {
	Topic           string
	FetchPartitions []*FetchPartitionReq
}

type FetchTopicResp

// FetchTopicResp is one topic entry of FetchResp.TopicResponses: the topic
// name together with the per-partition response data.
type FetchTopicResp struct {
	Topic             string
	PartitionDataList []*FetchPartitionResp
}

type FindCoordinatorReq

// FindCoordinatorReq is the request type returned by DecodeFindCoordinatorReq.
// It embeds BaseReq and carries a key and a one-byte key type (the meaning of
// the KeyType values is not visible here — confirm against the decoder).
type FindCoordinatorReq struct {
	BaseReq
	Key     string
	KeyType byte
}

func DecodeFindCoordinatorReq

func DecodeFindCoordinatorReq(bytes []byte, version int16) (findCoordinatorReq *FindCoordinatorReq, err error)

type FindCoordinatorResp

// FindCoordinatorResp is the response type created by NewFindCoordinatorResp.
// It embeds BaseResp and carries an error code, a throttle time, an optional
// error message (pointer, may be nil) and the node id, host and port of the
// coordinator being reported.
type FindCoordinatorResp struct {
	BaseResp
	ErrorCode    int16
	ThrottleTime int
	ErrorMessage *string
	NodeId       int
	Host         string
	Port         int
}

func NewFindCoordinatorResp

func NewFindCoordinatorResp(corrId int, config *KafkaProtocolConfig) *FindCoordinatorResp

func (*FindCoordinatorResp) Bytes

func (f *FindCoordinatorResp) Bytes() []byte

func (*FindCoordinatorResp) BytesLength

func (f *FindCoordinatorResp) BytesLength() int

type GroupAssignment

// GroupAssignment is one entry of SyncGroupReq.GroupAssignments: a member id
// and the assignment for that member. The assignment is held as a Go string
// even though the existing field comment labels it COMPACT_BYTES.
type GroupAssignment struct {
	MemberId string
	// COMPACT_BYTES
	MemberAssignment string
}

type GroupProtocol

// GroupProtocol is one entry of JoinGroupReq.GroupProtocols: a protocol name
// and its metadata, both held as strings.
type GroupProtocol struct {
	ProtocolName     string
	ProtocolMetadata string
}

type HeartBeatReq

// HeartBeatReq is the request type returned by DecodeHeartbeatReq. It embeds
// BaseReq and carries the group id, generation id, member id and an optional
// group instance id (pointer, may be nil).
type HeartBeatReq struct {
	BaseReq
	GroupId         string
	GenerationId    int
	MemberId        string
	GroupInstanceId *string
}

func DecodeHeartbeatReq

func DecodeHeartbeatReq(bytes []byte, version int16) (heartBeatReq *HeartBeatReq, err error)

type HeartBeatResp

// HeartBeatResp is the response type created by NewHeartBeatResp. It embeds
// BaseResp and carries an error code and a throttle time.
type HeartBeatResp struct {
	BaseResp
	ErrorCode    int16
	ThrottleTime int
}

func NewHeartBeatResp

func NewHeartBeatResp(corrId int) *HeartBeatResp

func (*HeartBeatResp) Bytes

func (h *HeartBeatResp) Bytes() []byte

func (*HeartBeatResp) BytesLength

func (h *HeartBeatResp) BytesLength() int

type JoinGroupReq

// JoinGroupReq is the request type returned by DecodeJoinGroupReq. It embeds
// BaseReq and carries the group id, session and rebalance timeouts, the member
// id, an optional group instance id (pointer, may be nil), the protocol type
// and the list of group protocols offered.
type JoinGroupReq struct {
	BaseReq
	GroupId          string
	SessionTimeout   int
	RebalanceTimeout int
	MemberId         string
	GroupInstanceId  *string
	ProtocolType     string
	GroupProtocols   []*GroupProtocol
}

func DecodeJoinGroupReq

func DecodeJoinGroupReq(bytes []byte, version int16) (joinGroupReq *JoinGroupReq, err error)

type JoinGroupResp

// JoinGroupResp is the response type created by NewJoinGroupResp and
// ErrorJoinGroupResp. It embeds BaseResp and carries an error code, a throttle
// time, the generation id, an optional protocol type (pointer, may be nil),
// the protocol name, the leader id, the member id and the list of members.
type JoinGroupResp struct {
	BaseResp
	ErrorCode    int16
	ThrottleTime int
	GenerationId int
	ProtocolType *string
	ProtocolName string
	LeaderId     string
	MemberId     string
	Members      []*Member
}

func ErrorJoinGroupResp

func ErrorJoinGroupResp(corrId int, errorCode int16) *JoinGroupResp

func NewJoinGroupResp

func NewJoinGroupResp(corrId int) *JoinGroupResp

func (*JoinGroupResp) Bytes

func (j *JoinGroupResp) Bytes(version int16) []byte

func (*JoinGroupResp) BytesLength

func (j *JoinGroupResp) BytesLength(version int16) int

type KafkaProtocolConfig

// KafkaProtocolConfig is the configuration passed to NewFindCoordinatorResp
// and NewMetadataResp. It holds the cluster id, the advertised host and port,
// a NeedSasl flag and a MaxConn limit (presumably whether SASL is required and
// the maximum number of connections — confirm with the code that reads them).
type KafkaProtocolConfig struct {
	ClusterId     string
	AdvertiseHost string
	AdvertisePort int
	NeedSasl      bool
	MaxConn       int32
}

type LeaveGroupMember

// LeaveGroupMember identifies one member in LeaveGroupReq.Members and
// LeaveGroupResp.Members by member id and an optional group instance id
// (pointer, may be nil).
type LeaveGroupMember struct {
	MemberId        string
	GroupInstanceId *string
}

type LeaveGroupReq

// LeaveGroupReq is the request type returned by DecodeLeaveGroupReq. It embeds
// BaseReq and carries the group id and the list of members.
type LeaveGroupReq struct {
	BaseReq
	GroupId string
	Members []*LeaveGroupMember
}

func DecodeLeaveGroupReq

func DecodeLeaveGroupReq(bytes []byte, version int16) (leaveGroupReq *LeaveGroupReq, err error)

type LeaveGroupResp

// LeaveGroupResp is the response type created by NewLeaveGroupResp. It embeds
// BaseResp and carries an error code, a throttle time and the list of members.
// MemberErrorCode is a single value on the response rather than a field of
// each member (presumably applied to every member when encoding — confirm).
type LeaveGroupResp struct {
	BaseResp
	ErrorCode       int16
	ThrottleTime    int
	Members         []*LeaveGroupMember
	MemberErrorCode int16
}

func NewLeaveGroupResp

func NewLeaveGroupResp(corrId int) *LeaveGroupResp

func (*LeaveGroupResp) Bytes

func (l *LeaveGroupResp) Bytes() []byte

func (*LeaveGroupResp) BytesLength

func (l *LeaveGroupResp) BytesLength() int

type ListOffsetPartition

// ListOffsetPartition is the per-partition entry of
// ListOffsetTopic.ListOffsetPartitions: partition id, leader epoch and a
// 64-bit Time value.
type ListOffsetPartition struct {
	PartitionId int
	LeaderEpoch int
	Time        int64
}

type ListOffsetPartitionResp

// ListOffsetPartitionResp is the per-partition entry of
// ListOffsetTopicResp.ListOffsetPartitions: partition id, error code,
// timestamp, offset and leader epoch.
type ListOffsetPartitionResp struct {
	PartitionId int
	ErrorCode   int16
	Timestamp   int64
	Offset      int64
	LeaderEpoch int
}

type ListOffsetReq

// ListOffsetReq is the request type returned by DecodeListOffsetReq. It embeds
// BaseReq and carries the replica id, the isolation level and the list of
// topics. ReplicaId is an int32 here, whereas FetchReq.ReplicaId is an int.
type ListOffsetReq struct {
	BaseReq
	ReplicaId      int32
	IsolationLevel byte
	OffsetTopics   []*ListOffsetTopic
}

func DecodeListOffsetReq

func DecodeListOffsetReq(bytes []byte, version int16) (offsetReq *ListOffsetReq, err error)

type ListOffsetResp

// ListOffsetResp is the response type created by NewListOffsetResp. It embeds
// BaseResp and carries an error code, a throttle time and the per-topic
// results.
type ListOffsetResp struct {
	BaseResp
	ErrorCode    int16
	ThrottleTime int
	OffsetTopics []*ListOffsetTopicResp
}

func NewListOffsetResp

func NewListOffsetResp(corrId int) *ListOffsetResp

func (*ListOffsetResp) Bytes

func (o *ListOffsetResp) Bytes(version int16) []byte

func (*ListOffsetResp) BytesLength

func (o *ListOffsetResp) BytesLength(version int16) int

type ListOffsetTopic

// ListOffsetTopic is one topic entry of ListOffsetReq.OffsetTopics: the topic
// name together with the partitions queried for it.
type ListOffsetTopic struct {
	Topic                string
	ListOffsetPartitions []*ListOffsetPartition
}

type ListOffsetTopicResp

// ListOffsetTopicResp is one topic entry of ListOffsetResp.OffsetTopics: the
// topic name together with the per-partition results.
type ListOffsetTopicResp struct {
	Topic                string
	ListOffsetPartitions []*ListOffsetPartitionResp
}

type Member

// Member is one entry of JoinGroupResp.Members: a member id, an optional
// group instance id (pointer, may be nil) and the member's metadata held as a
// string.
type Member struct {
	MemberId        string
	GroupInstanceId *string
	Metadata        string
}

type MetadataReq

// MetadataReq is the request type returned by DecodeMetadataTopicReq. It
// embeds BaseReq and carries the list of topics plus three boolean flags:
// AllowAutoTopicCreation, IncludeClusterAuthorizedOperations and
// IncludeTopicAuthorizedOperations.
type MetadataReq struct {
	BaseReq
	Topics                             []*MetadataTopicReq
	AllowAutoTopicCreation             bool
	IncludeClusterAuthorizedOperations bool
	IncludeTopicAuthorizedOperations   bool
}

func DecodeMetadataTopicReq

func DecodeMetadataTopicReq(bytes []byte, version int16) (metadataReq *MetadataReq, err error)

type MetadataResp

// MetadataResp is the response type created by NewMetadataResp. It embeds
// BaseResp and carries a throttle time, an error code, the list of brokers,
// the cluster id, the controller id, the list of topic metadata and a
// cluster-authorized-operation value.
type MetadataResp struct {
	BaseResp
	ThrottleTime               int
	ErrorCode                  int16
	BrokerMetadataList         []*BrokerMetadata
	ClusterId                  string
	ControllerId               int
	TopicMetadataList          []*TopicMetadata
	ClusterAuthorizedOperation int
}

func NewMetadataResp

func NewMetadataResp(corrId int, config *KafkaProtocolConfig, topicName string, errorCode int16) *MetadataResp

func (*MetadataResp) Bytes

func (m *MetadataResp) Bytes() []byte

func (*MetadataResp) BytesLength

func (m *MetadataResp) BytesLength() int

type MetadataTopicReq

// MetadataTopicReq is one entry of MetadataReq.Topics and holds a single topic
// name.
type MetadataTopicReq struct {
	Topic string
}

type OffsetCommitPartitionReq

// OffsetCommitPartitionReq is the per-partition entry of
// OffsetCommitTopicReq.OffsetPartitions: partition id, the offset, the leader
// epoch and a metadata string.
type OffsetCommitPartitionReq struct {
	PartitionId int
	Offset      int64
	LeaderEpoch int
	Metadata    string
}

type OffsetCommitPartitionResp

// OffsetCommitPartitionResp is the per-partition entry of
// OffsetCommitTopicResp.Partitions: partition id and error code.
type OffsetCommitPartitionResp struct {
	PartitionId int
	ErrorCode   int16
}

type OffsetCommitReq

// OffsetCommitReq is the request type returned by DecodeOffsetCommitReq. It
// embeds BaseReq and carries the group id, generation id, member id, an
// optional group instance id (pointer, may be nil) and the per-topic commit
// entries.
type OffsetCommitReq struct {
	BaseReq
	GroupId                  string
	GenerationId             int
	MemberId                 string
	GroupInstanceId          *string
	OffsetCommitTopicReqList []*OffsetCommitTopicReq
}

func DecodeOffsetCommitReq

func DecodeOffsetCommitReq(bytes []byte, version int16) (offsetReq *OffsetCommitReq, err error)

type OffsetCommitResp

// OffsetCommitResp is the response type created by NewOffsetCommitResp. It
// embeds BaseResp and carries a throttle time and the per-topic results. It
// has no top-level error code; error codes sit on each partition entry.
type OffsetCommitResp struct {
	BaseResp
	ThrottleTime int
	Topics       []*OffsetCommitTopicResp
}

func NewOffsetCommitResp

func NewOffsetCommitResp(corrId int) *OffsetCommitResp

func (*OffsetCommitResp) Bytes

func (o *OffsetCommitResp) Bytes(version int16) []byte

func (*OffsetCommitResp) BytesLength

func (o *OffsetCommitResp) BytesLength(version int16) int

type OffsetCommitTopicReq

// OffsetCommitTopicReq is one topic entry of
// OffsetCommitReq.OffsetCommitTopicReqList: the topic name together with the
// partitions whose offsets are committed.
type OffsetCommitTopicReq struct {
	Topic            string
	OffsetPartitions []*OffsetCommitPartitionReq
}

type OffsetCommitTopicResp

// OffsetCommitTopicResp is one topic entry of OffsetCommitResp.Topics: the
// topic name together with the per-partition results.
type OffsetCommitTopicResp struct {
	Topic      string
	Partitions []*OffsetCommitPartitionResp
}

type OffsetFetchPartitionReq

// OffsetFetchPartitionReq is the per-partition entry of
// OffsetFetchTopicReq.PartitionReqList and holds only the partition id.
type OffsetFetchPartitionReq struct {
	PartitionId int
}

type OffsetFetchPartitionResp

// OffsetFetchPartitionResp is the per-partition entry of
// OffsetFetchTopicResp.PartitionRespList: partition id, offset, leader epoch,
// optional metadata (pointer, may be nil) and an error code.
type OffsetFetchPartitionResp struct {
	PartitionId int
	Offset      int64
	LeaderEpoch int
	Metadata    *string
	ErrorCode   int16
}

type OffsetFetchReq

// OffsetFetchReq is the request type returned by DecodeOffsetFetchReq. It
// embeds BaseReq and carries the group id, the list of topics and a
// RequireStableOffset flag.
type OffsetFetchReq struct {
	BaseReq
	GroupId             string
	TopicReqList        []*OffsetFetchTopicReq
	RequireStableOffset bool
}

func DecodeOffsetFetchReq

func DecodeOffsetFetchReq(bytes []byte, version int16) (fetchReq *OffsetFetchReq, err error)

type OffsetFetchResp

// OffsetFetchResp is the response type created by NewOffsetFetchResp. It
// embeds BaseResp and carries a throttle time, an error code and the
// per-topic results.
type OffsetFetchResp struct {
	BaseResp
	ThrottleTime  int
	ErrorCode     int16
	TopicRespList []*OffsetFetchTopicResp
}

func NewOffsetFetchResp

func NewOffsetFetchResp(corrId int) *OffsetFetchResp

func (*OffsetFetchResp) Bytes

func (o *OffsetFetchResp) Bytes(version int16) []byte

func (*OffsetFetchResp) BytesLength

func (o *OffsetFetchResp) BytesLength(version int16) int

type OffsetFetchTopicReq

// OffsetFetchTopicReq is one topic entry of OffsetFetchReq.TopicReqList: the
// topic name together with the partitions queried for it.
type OffsetFetchTopicReq struct {
	Topic            string
	PartitionReqList []*OffsetFetchPartitionReq
}

type OffsetFetchTopicResp

// OffsetFetchTopicResp is one topic entry of OffsetFetchResp.TopicRespList:
// the topic name together with the per-partition results.
type OffsetFetchTopicResp struct {
	Topic             string
	PartitionRespList []*OffsetFetchPartitionResp
}

type PartitionMetadata

// PartitionMetadata is one entry of TopicMetadata.PartitionMetadataList: an
// error code, the partition id, the leader id and epoch, and three replica
// lists (Replicas, CaughtReplicas, OfflineReplicas). CaughtReplicas presumably
// corresponds to the in-sync replica set — confirm against the encoder.
type PartitionMetadata struct {
	ErrorCode       int16
	PartitionId     int
	LeaderId        int
	LeaderEpoch     int
	Replicas        []*Replica
	CaughtReplicas  []*Replica
	OfflineReplicas []*Replica
}

type Record

// Record is one entry of RecordBatch.Records. It carries an attributes byte, a
// timestamp and an offset that are named as relative values (presumably
// relative to the enclosing batch — confirm), plus key, value and headers.
// Key and Headers are byte slices while Value is a string.
type Record struct {
	RecordAttributes  byte
	RelativeTimestamp int64
	RelativeOffset    int
	Key               []byte
	Value             string
	Headers           []byte
}

func (*Record) Bytes

func (r *Record) Bytes() []byte

func (*Record) BytesLength

func (r *Record) BytesLength() int

type RecordBatch

// RecordBatch is the batch of records referenced by
// FetchPartitionResp.RecordBatch. It carries the batch header values (offset,
// message size, leader epoch, magic byte, flags, last offset delta, first and
// last timestamps, producer id and epoch, base sequence) followed by the
// records themselves.
type RecordBatch struct {
	Offset      int64
	MessageSize int
	LeaderEpoch int
	MagicByte   byte
	// 8-bit flag byte
	// NOTE(review): the field is declared uint16 (16 bits), which does not
	// match the "8-bit" wording above — confirm which one is intended.
	Flags uint16

	LastOffsetDelta int
	FirstTimestamp  int64
	LastTimestamp   int64
	ProducerId      int64
	ProducerEpoch   int16
	BaseSequence    int
	Records         []*Record
}

func (*RecordBatch) Bytes

func (r *RecordBatch) Bytes() []byte

func (*RecordBatch) BytesLength

func (r *RecordBatch) BytesLength() int

type Replica

// Replica wraps a single replica id. PartitionMetadata uses slices of Replica
// for its replica lists.
type Replica struct {
	ReplicaId int
}

type SaslAuthenticateReq

// SaslAuthenticateReq is the request type returned by
// DecodeSaslHandshakeAuthReq. It embeds BaseReq and carries the username and
// password as plain strings.
type SaslAuthenticateReq struct {
	BaseReq
	Username string
	Password string
}

func DecodeSaslHandshakeAuthReq

func DecodeSaslHandshakeAuthReq(bytes []byte, version int16) (authReq *SaslAuthenticateReq, err error)

type SaslAuthenticateResp

// SaslAuthenticateResp is the response type created by
// NewSaslHandshakeAuthResp. It embeds BaseResp and carries an error code, an
// error message, the authentication bytes and a 64-bit session lifetime.
type SaslAuthenticateResp struct {
	BaseResp
	ErrorCode       int16
	ErrorMessage    string
	AuthBytes       []byte
	SessionLifetime int64
}

func NewSaslHandshakeAuthResp

func NewSaslHandshakeAuthResp(corrId int) *SaslAuthenticateResp

func (*SaslAuthenticateResp) Bytes

func (s *SaslAuthenticateResp) Bytes(version int16) []byte

Bytes converts the response into a byte array. Tagged fields are not implemented yet.

func (*SaslAuthenticateResp) BytesLength

func (s *SaslAuthenticateResp) BytesLength(version int16) int

type SaslHandshakeReq

// SaslHandshakeReq is the request type returned by DecodeSaslHandshakeReq. It
// embeds BaseReq and carries the name of the SASL mechanism.
type SaslHandshakeReq struct {
	BaseReq
	SaslMechanism string
}

func DecodeSaslHandshakeReq

func DecodeSaslHandshakeReq(bytes []byte, version int16) (saslHandshakeReq *SaslHandshakeReq, err error)

DecodeSaslHandshakeReq decodes a SaslHandshakeReq from the given bytes for the given version.

type SaslHandshakeResp

// SaslHandshakeResp is the response type created by NewSaslHandshakeResp. It
// embeds BaseResp and carries an error code and the list of SASL mechanisms.
type SaslHandshakeResp struct {
	BaseResp
	ErrorCode        int16
	EnableMechanisms []*EnableMechanism
}

func NewSaslHandshakeResp

func NewSaslHandshakeResp(corrId int) *SaslHandshakeResp

func (*SaslHandshakeResp) Bytes

func (s *SaslHandshakeResp) Bytes() []byte

Bytes converts the response into a byte array.

func (*SaslHandshakeResp) BytesLength

func (s *SaslHandshakeResp) BytesLength() int

type SyncGroupReq

// SyncGroupReq is the request type returned by DecodeSyncGroupReq. It embeds
// BaseReq and carries the group id, generation id, member id, an optional
// group instance id (pointer, may be nil), the protocol type and name, and
// the list of per-member assignments.
type SyncGroupReq struct {
	BaseReq
	GroupId          string
	GenerationId     int
	MemberId         string
	GroupInstanceId  *string
	ProtocolType     string
	ProtocolName     string
	GroupAssignments []*GroupAssignment
}

func DecodeSyncGroupReq

func DecodeSyncGroupReq(bytes []byte, version int16) (groupReq *SyncGroupReq, err error)

type SyncGroupResp

// SyncGroupResp is the response type created by NewSyncGroupResp. It embeds
// BaseResp and carries a throttle time, an error code, the protocol type and
// name, and the member assignment held as a string.
type SyncGroupResp struct {
	BaseResp
	ThrottleTime     int
	ErrorCode        int16
	ProtocolType     string
	ProtocolName     string
	MemberAssignment string
}

func NewSyncGroupResp

func NewSyncGroupResp(corrId int) *SyncGroupResp

func (*SyncGroupResp) Bytes

func (s *SyncGroupResp) Bytes(version int16) []byte

func (*SyncGroupResp) BytesLength

func (s *SyncGroupResp) BytesLength(version int16) int

type TopicMetadata

// TopicMetadata is one entry of MetadataResp.TopicMetadataList: an error code,
// the topic name, an IsInternal flag, the per-partition metadata and a
// topic-authorized-operation value.
type TopicMetadata struct {
	ErrorCode                int16
	Topic                    string
	IsInternal               bool
	PartitionMetadataList    []*PartitionMetadata
	TopicAuthorizedOperation int
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL