codec

package
v0.0.0-...-c51d11d Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 5, 2023 License: Apache-2.0 Imports: 8 Imported by: 4

Documentation

Index

Constants

View Source
// Len* constants are small fixed integer sizes named after protocol fields.
// Presumably each is the number of bytes the field occupies on the wire and
// is summed by the BytesLength methods — TODO confirm against the encoders,
// which are not visible here.
const (
	LenAbortTransactions                  = 4
	LenAllowAutoTopicCreation             = 1
	LenApiKey                             = 2
	LenApiV0to2                           = 6
	LenApiV3                              = 7
	LenApiVersion                         = 2
	LenArray                              = 4
	LenBaseSequence                       = 4
	LenBatchIndex                         = 4
	LenClusterAuthOperation               = 4
	LenControllerId                       = 4
	LenCoordinatorType                    = 1
	LenCorrId                             = 4
	LenCrc32                              = 4
	LenErrorCode                          = 2
	LenFetchBytes                         = 4
	LenFetchMaxWaitTime                   = 4
	LenFetchSessionId                     = 4
	LenFetchSessionEpoch                  = 4
	LenGenerationId                       = 4
	LenIncludeClusterAuthorizedOperations = 1
	LenIncludeTopicAuthorizedOperations   = 1
	LenIsInternalV1                       = 1
	// NOTE(review): the V9 variant is 4 while the V1 variant is 1; confirm
	// that 4 is intended for a field named "is internal".
	LenIsInternalV9                       = 4
	LenIsolationLevel                     = 1
	LenLastStableOffset                   = 8
	LenLeaderEpoch                        = 4
	LenLeaderId                           = 4
	LenLength                             = 4
	LenMagicByte                          = 1
	LenMessageSize                        = 4
	LenNodeId                             = 4
	LenOffset                             = 8
	LenOffsetDelta                        = 4
	LenPartitionId                        = 4
	LenPort                               = 4
	LenProducerId                         = 8
	LenProducerEpoch                      = 2
	LenRecordAttributes                   = 1
	LenReplicaId                          = 4
	LenRequiredAcks                       = 2
	LenRequireStableOffset                = 1
	// NOTE(review): 8 here, whereas the other timeout-like constants
	// (LenTimeout, LenThrottleTime, LenFetchMaxWaitTime) are 4; verify which
	// field this is used for.
	LenSessionTimeout                     = 8
	LenStartOffset                        = 8
	LenTaggedField                        = 1
	LenThrottleTime                       = 4
	LenTime                               = 8
	LenTimeout                            = 4
	LenTopicAuthOperation                 = 4
	LenTransactionalId                    = 2
)

Variables

View Source
// InvalidProtocolContent is a package-level sentinel error value created with
// errors.New; compare against it with errors.Is. Presumably it is what the
// Decode* functions return for malformed input — confirm in their bodies.
// NOTE(review): Go convention would name this ErrInvalidProtocolContent and
// use a lowercase message; both are left unchanged because they are part of
// the exported API.
var InvalidProtocolContent = errors.New("InvalidProtocolContent")

Functions

func BytesLen

func BytesLen(bytes []byte) int

func CompactArrayLen

func CompactArrayLen(length int) int

func CompactBytesLen

func CompactBytesLen(bytes []byte) int

func CompactNullableBytesLen

func CompactNullableBytesLen(bytes []byte) int

func CompactNullableStrLen

func CompactNullableStrLen(str *string) int

func CompactStrLen

func CompactStrLen(str string) int

func CompactVarintBytesLen

func CompactVarintBytesLen(bytes []byte) int

func ConvertCompactLen

func ConvertCompactLen(compactLen int) int

ConvertCompactLen converts compactLen into realLen.

func FourByteLength

func FourByteLength(bytes []byte) int

func NullableStrLen

func NullableStrLen(str *string) int

func PanicToError

func PanicToError(r any, stack []byte) error

func StrLen

func StrLen(str string) int

Types

type ApiCode

// ApiCode is the 16-bit API key that identifies the kind of a Kafka request.
type ApiCode int16

// API keys, numbered to match the Kafka protocol. The list is contiguous from
// Produce (0) through AlterUserScramCredentials (51). Kafka assigns keys 52-55
// and 58-59 to APIs that this package does not declare, so those slots are
// skipped with the blank identifier to keep the later constants on their
// protocol-defined values (AlterIsr=56, UpdateFeatures=57, DescribeCluster=60,
// DescribeProducers=61). A plain iota run would number them 52-55.
const (
	Produce ApiCode = iota
	Fetch
	ListOffsets
	Metadata
	LeaderAndIsr
	StopReplica
	UpdateMetadata
	ControlledShutdown
	OffsetCommit
	OffsetFetch
	FindCoordinator
	JoinGroup
	Heartbeat
	LeaveGroup
	SyncGroup
	DescribeGroups
	ListGroups
	SaslHandshake
	ApiVersions
	CreateTopics
	DeleteTopics
	DeleteRecords
	InitProducerId
	OffsetForLeaderEpoch
	AddPartitionsToTxn
	AddOffsetsToTxn
	EndTxn
	WriteTxnMarkers
	TxnOffsetCommit
	DescribeAcls
	CreateAcls
	DeleteAcls
	DescribeConfigs
	AlterConfigs
	AlterReplicaLogDirs
	DescribeLogDirs
	SaslAuthenticate
	CreatePartitions
	CreateDelegationToken
	RenewDelegationToken
	ExpireDelegationToken
	DescribeDelegationToken
	DeleteGroups
	ElectLeaders
	IncrementalAlterConfigs
	AlterPartitionReassignments
	ListPartitionReassignments
	OffsetDelete
	DescribeClientQuotas
	AlterClientQuotas
	DescribeUserScramCredentials
	AlterUserScramCredentials
	_ // 52: Vote (not declared here)
	_ // 53: BeginQuorumEpoch (not declared here)
	_ // 54: EndQuorumEpoch (not declared here)
	_ // 55: DescribeQuorum (not declared here)
	AlterIsr
	UpdateFeatures
	_ // 58: Envelope (not declared here)
	_ // 59: FetchSnapshot (not declared here)
	DescribeCluster
	DescribeProducers
)

type ApiReq

// ApiReq is the request type produced by DecodeApiReq. On top of the embedded
// BaseReq header it carries the client's software name and version strings.
type ApiReq struct {
	BaseReq
	ClientSoftwareName    string
	ClientSoftwareVersion string
}

func DecodeApiReq

func DecodeApiReq(bytes []byte, version int16) (apiReq *ApiReq, err error)

func (*ApiReq) Bytes

func (a *ApiReq) Bytes(containLen bool, containApiKeyVersion bool) []byte

func (*ApiReq) BytesLength

func (a *ApiReq) BytesLength(containLen bool, containApiKeyVersion bool) int

type ApiResp

// ApiResp is the response type produced by DecodeApiResp. It holds an error
// code, one ApiRespVersion entry per advertised API, and a throttle time
// (unit not shown here — presumably milliseconds, TODO confirm).
type ApiResp struct {
	BaseResp
	ErrorCode       ErrorCode
	ApiRespVersions []*ApiRespVersion
	ThrottleTime    int
}

func DecodeApiResp

func DecodeApiResp(bytes []byte, version int16) (apiResp *ApiResp, err error)

func (*ApiResp) Bytes

func (a *ApiResp) Bytes(version int16, containLen bool) []byte

func (*ApiResp) BytesLength

func (a *ApiResp) BytesLength(version int16) int

type ApiRespVersion

// ApiRespVersion pairs one API key with a minimum and maximum version number.
// It is the element type of ApiResp.ApiRespVersions.
type ApiRespVersion struct {
	ApiKey     ApiCode
	MinVersion int16
	MaxVersion int16
}

type BaseReq

// BaseReq holds the header fields shared by the top-level request types in
// this package (each of the *Req types that has a Decode*Req function embeds
// it): the API version, the correlation id and the client id.
type BaseReq struct {
	ApiVersion    int16
	CorrelationId int
	ClientId      string
}

type BaseResp

// BaseResp holds the single header field shared by the top-level response
// types in this package, which embed it: the correlation id.
type BaseResp struct {
	CorrelationId int
}

type BrokerMetadata

// BrokerMetadata describes one broker entry of MetadataResp.BrokerMetadataList:
// node id, host, port and an optional rack (nil when absent).
type BrokerMetadata struct {
	NodeId int32
	Host   string
	Port   int
	Rack   *string
}

type EnableMechanism

// EnableMechanism wraps a single SASL mechanism name. It is the element type
// of SaslHandshakeResp.EnableMechanisms.
type EnableMechanism struct {
	SaslMechanism string
}

type ErrorCode

// ErrorCode is the 16-bit error code carried in the response types of this
// package.
type ErrorCode int16

// The constants are numbered sequentially by iota starting from -1:
// UNKNOWN_SERVER_ERROR is -1, NONE is 0, OFFSET_OUT_OF_RANGE is 1, and so on
// through INCONSISTENT_CLUSTER_ID at 104. Because the values depend on
// position, new codes must only ever be appended at the end.
const (
	// UNKNOWN_SERVER_ERROR
	// The server experienced an unexpected error when processing the request.
	UNKNOWN_SERVER_ERROR ErrorCode = iota - 1
	// NONE has the value 0.
	NONE
	// OFFSET_OUT_OF_RANGE
	// The requested offset is not within the range of offsets maintained by the server.
	OFFSET_OUT_OF_RANGE
	// CORRUPT_MESSAGE
	// This message has failed its CRC checksum, exceeds the valid size, has a null key for a compacted topic, or is otherwise corrupt.
	CORRUPT_MESSAGE
	// UNKNOWN_TOPIC_OR_PARTITION
	// This server does not host this topic-partition.
	UNKNOWN_TOPIC_OR_PARTITION
	// INVALID_FETCH_SIZE
	// The requested fetch size is invalid.
	INVALID_FETCH_SIZE
	// LEADER_NOT_AVAILABLE
	// There is no leader for this topic-partition as we are in the middle of a leadership election.
	LEADER_NOT_AVAILABLE
	// NOT_LEADER_OR_FOLLOWER
	// For requests intended only for the leader, this error indicates that the broker is not the current leader.
	// For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition.
	NOT_LEADER_OR_FOLLOWER
	// REQUEST_TIMED_OUT
	// The request timed out.
	REQUEST_TIMED_OUT

	BROKER_NOT_AVAILABLE

	REPLICA_NOT_AVAILABLE

	MESSAGE_TOO_LARGE

	STALE_CONTROLLER_EPOCH

	OFFSET_METADATA_TOO_LARGE

	NETWORK_EXCEPTION

	COORDINATOR_LOAD_IN_PROGRESS

	COORDINATOR_NOT_AVAILABLE

	NOT_COORDINATOR

	INVALID_TOPIC_EXCEPTION

	RECORD_LIST_TOO_LARGE

	NOT_ENOUGH_REPLICAS

	NOT_ENOUGH_REPLICAS_AFTER_APPEND

	INVALID_REQUIRED_ACKS

	ILLEGAL_GENERATION

	INCONSISTENT_GROUP_PROTOCOL

	INVALID_GROUP_ID

	UNKNOWN_MEMBER_ID

	INVALID_SESSION_TIMEOUT

	REBALANCE_IN_PROGRESS

	INVALID_COMMIT_OFFSET_SIZE

	TOPIC_AUTHORIZATION_FAILED

	GROUP_AUTHORIZATION_FAILED

	CLUSTER_AUTHORIZATION_FAILED

	INVALID_TIMESTAMP

	UNSUPPORTED_SASL_MECHANISM

	ILLEGAL_SASL_STATE

	UNSUPPORTED_VERSION

	TOPIC_ALREADY_EXISTS

	INVALID_PARTITIONS

	INVALID_REPLICATION_FACTOR

	INVALID_REPLICA_ASSIGNMENT

	INVALID_CONFIG

	NOT_CONTROLLER

	INVALID_REQUEST

	UNSUPPORTED_FOR_MESSAGE_FORMAT

	POLICY_VIOLATION

	OUT_OF_ORDER_SEQUENCE_NUMBER

	DUPLICATE_SEQUENCE_NUMBER

	INVALID_PRODUCER_EPOCH

	INVALID_TXN_STATE

	INVALID_PRODUCER_ID_MAPPING

	INVALID_TRANSACTION_TIMEOUT

	CONCURRENT_TRANSACTIONS

	TRANSACTION_COORDINATOR_FENCED

	TRANSACTIONAL_ID_AUTHORIZATION_FAILED

	SECURITY_DISABLED

	OPERATION_NOT_ATTEMPTED

	KAFKA_STORAGE_ERROR

	LOG_DIR_NOT_FOUND

	SASL_AUTHENTICATION_FAILED

	UNKNOWN_PRODUCER_ID

	REASSIGNMENT_IN_PROGRESS

	DELEGATION_TOKEN_AUTH_DISABLED

	DELEGATION_TOKEN_NOT_FOUND

	DELEGATION_TOKEN_OWNER_MISMATCH

	DELEGATION_TOKEN_REQUEST_NOT_ALLOWED

	DELEGATION_TOKEN_AUTHORIZATION_FAILED

	DELEGATION_TOKEN_EXPIRED

	INVALID_PRINCIPAL_TYPE

	NON_EMPTY_GROUP

	GROUP_ID_NOT_FOUND

	FETCH_SESSION_ID_NOT_FOUND

	INVALID_FETCH_SESSION_EPOCH

	LISTENER_NOT_FOUND

	TOPIC_DELETION_DISABLED

	FENCED_LEADER_EPOCH

	UNKNOWN_LEADER_EPOCH

	UNSUPPORTED_COMPRESSION_TYPE

	STALE_BROKER_EPOCH

	OFFSET_NOT_AVAILABLE
	// MEMBER_ID_REQUIRED
	// The group member needs to have a valid member id before actually entering a consumer group.
	MEMBER_ID_REQUIRED

	PREFERRED_LEADER_NOT_AVAILABLE

	GROUP_MAX_SIZE_REACHED

	FENCED_INSTANCE_ID

	ELIGIBLE_LEADERS_NOT_AVAILABLE

	ELECTION_NOT_NEEDED

	NO_REASSIGNMENT_IN_PROGRESS

	GROUP_SUBSCRIBED_TO_TOPIC

	INVALID_RECORD

	UNSTABLE_OFFSET_COMMIT

	THROTTLING_QUOTA_EXCEEDED

	PRODUCER_FENCED

	RESOURCE_NOT_FOUND

	DUPLICATE_RESOURCE

	UNACCEPTABLE_CREDENTIAL

	INCONSISTENT_VOTER_SET

	INVALID_UPDATE_VERSION

	FEATURE_UPDATE_FAILED

	PRINCIPAL_DESERIALIZATION_FAILURE

	SNAPSHOT_NOT_FOUND

	POSITION_OUT_OF_RANGE

	UNKNOWN_TOPIC_ID

	DUPLICATE_BROKER_REGISTRATION

	BROKER_ID_NOT_REGISTERED

	INCONSISTENT_TOPIC_ID

	INCONSISTENT_CLUSTER_ID
)

type FetchPartitionReq

// FetchPartitionReq is the per-partition part of a fetch request; it is the
// element type of FetchTopicReq.PartitionReqList.
type FetchPartitionReq struct {
	PartitionId        int
	CurrentLeaderEpoch int32
	FetchOffset        int64
	LastFetchedEpoch   int
	LogStartOffset     int64
	PartitionMaxBytes  int
}

type FetchPartitionResp

// FetchPartitionResp is the per-partition part of a fetch response; it is the
// element type of FetchTopicResp.PartitionRespList. RecordBatch is a pointer
// and so may be nil.
// NOTE(review): AbortedTransactions is a single int64 rather than a list;
// confirm whether it is meant to hold a count/length marker.
type FetchPartitionResp struct {
	PartitionIndex      int
	ErrorCode           ErrorCode
	HighWatermark       int64
	LastStableOffset    int64
	LogStartOffset      int64
	AbortedTransactions int64
	ReplicaId           int32
	RecordBatch         *RecordBatch
}

type FetchReq

// FetchReq is the request type produced by DecodeFetchReq. Besides the
// embedded BaseReq header it carries fetch-wide settings and one FetchTopicReq
// per requested topic.
type FetchReq struct {
	BaseReq
	ReplicaId         int32
	MaxWaitTime       int
	MinBytes          int
	MaxBytes          int
	IsolationLevel    byte
	FetchSessionId    int
	FetchSessionEpoch int32
	TopicReqList      []*FetchTopicReq
}

func DecodeFetchReq

func DecodeFetchReq(bytes []byte, version int16) (fetchReq *FetchReq, err error)

func (*FetchReq) Bytes

func (f *FetchReq) Bytes(containLen bool, containApiKeyVersion bool) []byte

func (*FetchReq) BytesLength

func (f *FetchReq) BytesLength(containLen bool, containApiKeyVersion bool) int

type FetchResp

// FetchResp is the response type produced by DecodeFetchResp and created by
// NewFetchResp. It carries a throttle time, an error code, a session id and
// one FetchTopicResp per topic.
type FetchResp struct {
	BaseResp
	ThrottleTime  int
	ErrorCode     ErrorCode
	SessionId     int
	TopicRespList []*FetchTopicResp
}

func DecodeFetchResp

func DecodeFetchResp(bytes []byte, version int16) (fetchResp *FetchResp, err error)

func NewFetchResp

func NewFetchResp(corrId int) *FetchResp

func (*FetchResp) Bytes

func (f *FetchResp) Bytes(version int16, containLen bool) []byte

func (*FetchResp) BytesLength

func (f *FetchResp) BytesLength(version int16) int

type FetchTopicReq

// FetchTopicReq groups the FetchPartitionReq entries of one topic name.
type FetchTopicReq struct {
	Topic            string
	PartitionReqList []*FetchPartitionReq
}

type FetchTopicResp

// FetchTopicResp groups the FetchPartitionResp entries of one topic name.
type FetchTopicResp struct {
	Topic             string
	PartitionRespList []*FetchPartitionResp
}

type FindCoordinatorReq

// FindCoordinatorReq is the request type produced by DecodeFindCoordinatorReq:
// a lookup key and a one-byte key type on top of the BaseReq header.
type FindCoordinatorReq struct {
	BaseReq
	Key     string
	KeyType byte
}

func DecodeFindCoordinatorReq

func DecodeFindCoordinatorReq(bytes []byte, version int16) (findCoordinatorReq *FindCoordinatorReq, err error)

func (*FindCoordinatorReq) Bytes

func (f *FindCoordinatorReq) Bytes(containLen bool, containApiKeyVersion bool) []byte

func (*FindCoordinatorReq) BytesLength

func (f *FindCoordinatorReq) BytesLength(containLen bool, containApiKeyVersion bool) int

type FindCoordinatorResp

// FindCoordinatorResp is the response type produced by
// DecodeFindCoordinatorResp. It carries an error code with an optional error
// message (nil when absent) and the node id, host and port of a node.
type FindCoordinatorResp struct {
	BaseResp
	ErrorCode    ErrorCode
	ThrottleTime int
	ErrorMessage *string
	NodeId       int32
	Host         string
	Port         int
}

func DecodeFindCoordinatorResp

func DecodeFindCoordinatorResp(bytes []byte, version int16) (fResp *FindCoordinatorResp, err error)

func (*FindCoordinatorResp) Bytes

func (f *FindCoordinatorResp) Bytes(version int16, containLen bool) []byte

func (*FindCoordinatorResp) BytesLength

func (f *FindCoordinatorResp) BytesLength(version int16) int

type GroupAssignment

// GroupAssignment pairs a member id with that member's assignment as raw
// bytes. It is the element type of SyncGroupReq.GroupAssignments.
type GroupAssignment struct {
	MemberId         string
	MemberAssignment []byte
}

type GroupProtocol

// GroupProtocol pairs a protocol name with its metadata as raw bytes. It is
// the element type of JoinGroupReq.GroupProtocols.
type GroupProtocol struct {
	ProtocolName     string
	ProtocolMetadata []byte
}
// Header is a key/value pair with a string key and a raw byte value. It is
// the element type of Record.Headers.
type Header struct {
	Key   string
	Value []byte
}

type HeartbeatReq

// HeartbeatReq is the request type produced by DecodeHeartbeatReq. It
// identifies a group, generation and member; GroupInstanceId is optional
// (nil when absent).
type HeartbeatReq struct {
	BaseReq
	GroupId         string
	GenerationId    int
	MemberId        string
	GroupInstanceId *string
}

func DecodeHeartbeatReq

func DecodeHeartbeatReq(bytes []byte, version int16) (heartBeatReq *HeartbeatReq, err error)

func (*HeartbeatReq) Bytes

func (h *HeartbeatReq) Bytes(containLen bool, containApiKeyVersion bool) []byte

func (*HeartbeatReq) BytesLength

func (h *HeartbeatReq) BytesLength(containLen bool, containApiKeyVersion bool) int

type HeartbeatResp

// HeartbeatResp is the response type produced by DecodeHeartbeatResp: an
// error code and a throttle time on top of the BaseResp header.
type HeartbeatResp struct {
	BaseResp
	ErrorCode    ErrorCode
	ThrottleTime int
}

func DecodeHeartbeatResp

func DecodeHeartbeatResp(bytes []byte, version int16) (resp *HeartbeatResp, err error)

func (*HeartbeatResp) Bytes

func (h *HeartbeatResp) Bytes(version int16, containLen bool) []byte

func (*HeartbeatResp) BytesLength

func (h *HeartbeatResp) BytesLength(version int16) int

type JoinGroupReq

// JoinGroupReq is the request type produced by DecodeJoinGroupReq. It carries
// the group and member identity, session and rebalance timeouts, a protocol
// type and the list of offered GroupProtocol entries. GroupInstanceId is
// optional (nil when absent).
type JoinGroupReq struct {
	BaseReq
	GroupId          string
	SessionTimeout   int
	RebalanceTimeout int
	MemberId         string
	GroupInstanceId  *string
	ProtocolType     string
	GroupProtocols   []*GroupProtocol
}

func DecodeJoinGroupReq

func DecodeJoinGroupReq(bytes []byte, version int16) (joinGroupReq *JoinGroupReq, err error)

func (*JoinGroupReq) Bytes

func (j *JoinGroupReq) Bytes(containLen bool, containApiKeyVersion bool) []byte

func (*JoinGroupReq) BytesLength

func (j *JoinGroupReq) BytesLength(containLen bool, containApiKeyVersion bool) int

type JoinGroupResp

// JoinGroupResp is the response type produced by DecodeJoinGroupResp. It
// carries the generation id, the protocol type (optional, nil when absent) and
// name, the leader's and the receiving member's ids, and a Member list.
type JoinGroupResp struct {
	BaseResp
	ErrorCode    ErrorCode
	ThrottleTime int
	GenerationId int
	ProtocolType *string
	ProtocolName string
	LeaderId     string
	MemberId     string
	Members      []*Member
}

func DecodeJoinGroupResp

func DecodeJoinGroupResp(bytes []byte, version int16) (resp *JoinGroupResp, err error)

func (*JoinGroupResp) Bytes

func (j *JoinGroupResp) Bytes(version int16, containLen bool) []byte

func (*JoinGroupResp) BytesLength

func (j *JoinGroupResp) BytesLength(version int16) int

type LeaveGroupMember

// LeaveGroupMember identifies one member by member id and an optional group
// instance id (nil when absent). It is used by both LeaveGroupReq.Members and
// LeaveGroupResp.Members.
type LeaveGroupMember struct {
	MemberId        string
	GroupInstanceId *string
}

type LeaveGroupReq

// LeaveGroupReq is the request type produced by DecodeLeaveGroupReq: a group
// id and the list of members concerned.
type LeaveGroupReq struct {
	BaseReq
	GroupId string
	Members []*LeaveGroupMember
}

func DecodeLeaveGroupReq

func DecodeLeaveGroupReq(bytes []byte, version int16) (leaveGroupReq *LeaveGroupReq, err error)

func (*LeaveGroupReq) Bytes

func (l *LeaveGroupReq) Bytes(containLen bool, containApiKeyVersion bool) []byte

func (*LeaveGroupReq) BytesLength

func (l *LeaveGroupReq) BytesLength(containLen bool, containApiKeyVersion bool) int

type LeaveGroupResp

// LeaveGroupResp is the response type produced by DecodeLeaveGroupResp.
// NOTE(review): MemberErrorCode is a single value stored next to the Members
// list rather than inside each member; presumably the same code is applied to
// every member when encoding — confirm in Bytes.
type LeaveGroupResp struct {
	BaseResp
	ErrorCode       ErrorCode
	ThrottleTime    int
	Members         []*LeaveGroupMember
	MemberErrorCode ErrorCode
}

func DecodeLeaveGroupResp

func DecodeLeaveGroupResp(bytes []byte, version int16) (resp *LeaveGroupResp, err error)

func (*LeaveGroupResp) Bytes

func (l *LeaveGroupResp) Bytes(version int16, containLen bool) []byte

func (*LeaveGroupResp) BytesLength

func (l *LeaveGroupResp) BytesLength(version int16) int

type ListOffsetsPartition

// ListOffsetsPartition is the per-partition part of a list-offsets request:
// partition id, leader epoch and a 64-bit time value. It is the element type
// of ListOffsetsTopic.PartitionReqList.
type ListOffsetsPartition struct {
	PartitionId int
	LeaderEpoch int32
	Time        int64
}

type ListOffsetsPartitionResp

// ListOffsetsPartitionResp is the per-partition part of a list-offsets
// response: an error code plus the timestamp, offset and leader epoch reported
// for the partition.
type ListOffsetsPartitionResp struct {
	PartitionId int
	ErrorCode   ErrorCode
	Timestamp   int64
	Offset      int64
	LeaderEpoch int32
}

type ListOffsetsReq

// ListOffsetsReq is the request type produced by DecodeListOffsetsReq: a
// replica id, a one-byte isolation level and one ListOffsetsTopic per topic.
type ListOffsetsReq struct {
	BaseReq
	ReplicaId      int32
	IsolationLevel byte
	TopicReqList   []*ListOffsetsTopic
}

func DecodeListOffsetsReq

func DecodeListOffsetsReq(bytes []byte, version int16) (offsetReq *ListOffsetsReq, err error)

func (*ListOffsetsReq) Bytes

func (l *ListOffsetsReq) Bytes(containLen bool, containApiKeyVersion bool) []byte

func (*ListOffsetsReq) BytesLength

func (l *ListOffsetsReq) BytesLength(containLen bool, containApiKeyVersion bool) int

type ListOffsetsResp

// ListOffsetsResp is the response type produced by DecodeListOffsetsResp: an
// error code, a throttle time and one ListOffsetsTopicResp per topic.
type ListOffsetsResp struct {
	BaseResp
	ErrorCode     ErrorCode
	ThrottleTime  int
	TopicRespList []*ListOffsetsTopicResp
}

func DecodeListOffsetsResp

func DecodeListOffsetsResp(bytes []byte, version int16) (resp *ListOffsetsResp, err error)

func (*ListOffsetsResp) Bytes

func (o *ListOffsetsResp) Bytes(version int16, containLen bool) []byte

func (*ListOffsetsResp) BytesLength

func (o *ListOffsetsResp) BytesLength(version int16) int

type ListOffsetsTopic

// ListOffsetsTopic groups the ListOffsetsPartition entries of one topic name.
type ListOffsetsTopic struct {
	Topic            string
	PartitionReqList []*ListOffsetsPartition
}

type ListOffsetsTopicResp

// ListOffsetsTopicResp groups the ListOffsetsPartitionResp entries of one
// topic name.
type ListOffsetsTopicResp struct {
	Topic             string
	PartitionRespList []*ListOffsetsPartitionResp
}

type Member

// Member describes one group member in JoinGroupResp.Members: its member id,
// an optional group instance id (nil when absent) and metadata as raw bytes.
type Member struct {
	MemberId        string
	GroupInstanceId *string
	Metadata        []byte
}

type MetadataReq

// MetadataReq is the request type produced by DecodeMetadataReq: the list of
// topics asked about and three boolean option flags.
type MetadataReq struct {
	BaseReq
	Topics                             []*MetadataTopicReq
	AllowAutoTopicCreation             bool
	IncludeClusterAuthorizedOperations bool
	IncludeTopicAuthorizedOperations   bool
}

func DecodeMetadataReq

func DecodeMetadataReq(bytes []byte, version int16) (metadataReq *MetadataReq, err error)

func (*MetadataReq) Bytes

func (m *MetadataReq) Bytes(containLen bool, containApiKeyVersion bool) []byte

func (*MetadataReq) BytesLength

func (m *MetadataReq) BytesLength(containLen bool, containApiKeyVersion bool) int

type MetadataResp

// MetadataResp is the response type produced by DecodeMetadataResp. It lists
// broker and topic metadata along with the cluster id and controller id.
// NOTE(review): ErrorCode is declared as a plain int16 here, whereas every
// other response in this package uses the ErrorCode type; the type is left
// as is because changing it would alter the exported API.
type MetadataResp struct {
	BaseResp
	ThrottleTime               int
	ErrorCode                  int16
	BrokerMetadataList         []*BrokerMetadata
	ClusterId                  string
	ControllerId               int32
	TopicMetadataList          []*TopicMetadata
	ClusterAuthorizedOperation int
}

func DecodeMetadataResp

func DecodeMetadataResp(bytes []byte, version int16) (metadataResp *MetadataResp, err error)

func (*MetadataResp) Bytes

func (m *MetadataResp) Bytes(version int16, containLen bool) []byte

func (*MetadataResp) BytesLength

func (m *MetadataResp) BytesLength(version int16) int

type MetadataTopicReq

// MetadataTopicReq wraps a single topic name. It is the element type of
// MetadataReq.Topics.
type MetadataTopicReq struct {
	Topic string
}

type OffsetCommitPartitionReq

// OffsetCommitPartitionReq is the per-partition part of an offset-commit
// request: the partition id, the offset, a leader epoch and a metadata string.
type OffsetCommitPartitionReq struct {
	PartitionId int
	Offset      int64
	LeaderEpoch int32
	Metadata    string
}

type OffsetCommitPartitionResp

// OffsetCommitPartitionResp is the per-partition part of an offset-commit
// response: the partition id and its error code.
type OffsetCommitPartitionResp struct {
	PartitionId int
	ErrorCode   ErrorCode
}

type OffsetCommitReq

// OffsetCommitReq is the request type produced by DecodeOffsetCommitReq. It
// identifies the group, generation and member, carries a retention time and
// one OffsetCommitTopicReq per topic. GroupInstanceId is optional (nil when
// absent).
type OffsetCommitReq struct {
	BaseReq
	GroupId         string
	GenerationId    int
	MemberId        string
	RetentionTime   int64
	GroupInstanceId *string
	TopicReqList    []*OffsetCommitTopicReq
}

func DecodeOffsetCommitReq

func DecodeOffsetCommitReq(bytes []byte, version int16) (offsetReq *OffsetCommitReq, err error)

func (*OffsetCommitReq) Bytes

func (o *OffsetCommitReq) Bytes(containLen bool, containApiKeyVersion bool) []byte

func (*OffsetCommitReq) BytesLength

func (o *OffsetCommitReq) BytesLength(containLen bool, containApiKeyVersion bool) int

type OffsetCommitResp

// OffsetCommitResp is the response type produced by DecodeOffsetCommitResp: a
// throttle time and one OffsetCommitTopicResp per topic. It has no top-level
// error code; errors are reported per partition.
type OffsetCommitResp struct {
	BaseResp
	ThrottleTime  int
	TopicRespList []*OffsetCommitTopicResp
}

func DecodeOffsetCommitResp

func DecodeOffsetCommitResp(bytes []byte, version int16) (resp *OffsetCommitResp, err error)

func (*OffsetCommitResp) Bytes

func (o *OffsetCommitResp) Bytes(version int16, containLen bool) []byte

func (*OffsetCommitResp) BytesLength

func (o *OffsetCommitResp) BytesLength(version int16) int

type OffsetCommitTopicReq

// OffsetCommitTopicReq groups the OffsetCommitPartitionReq entries of one
// topic name.
type OffsetCommitTopicReq struct {
	Topic            string
	PartitionReqList []*OffsetCommitPartitionReq
}

type OffsetCommitTopicResp

// OffsetCommitTopicResp groups the OffsetCommitPartitionResp entries of one
// topic name.
type OffsetCommitTopicResp struct {
	Topic             string
	PartitionRespList []*OffsetCommitPartitionResp
}

type OffsetFetchPartitionReq

// OffsetFetchPartitionReq wraps a single partition id. It is the element type
// of OffsetFetchTopicReq.PartitionReqList.
type OffsetFetchPartitionReq struct {
	PartitionId int
}

type OffsetFetchPartitionResp

// OffsetFetchPartitionResp is the per-partition part of an offset-fetch
// response: the offset, leader epoch, optional metadata (nil when absent) and
// an error code.
type OffsetFetchPartitionResp struct {
	PartitionId int
	Offset      int64
	LeaderEpoch int32
	Metadata    *string
	ErrorCode   ErrorCode
}

type OffsetFetchReq

// OffsetFetchReq is the request type produced by DecodeOffsetFetchReq: a group
// id, one OffsetFetchTopicReq per topic and a RequireStableOffset flag.
type OffsetFetchReq struct {
	BaseReq
	GroupId             string
	TopicReqList        []*OffsetFetchTopicReq
	RequireStableOffset bool
}

func DecodeOffsetFetchReq

func DecodeOffsetFetchReq(bytes []byte, version int16) (fetchReq *OffsetFetchReq, err error)

func (*OffsetFetchReq) Bytes

func (o *OffsetFetchReq) Bytes(containLen bool, containApiKeyVersion bool) []byte

func (*OffsetFetchReq) BytesLength

func (o *OffsetFetchReq) BytesLength(containLen bool, containApiKeyVersion bool) int

type OffsetFetchResp

// OffsetFetchResp is the response type produced by DecodeOffsetFetchResp: a
// throttle time, a top-level error code and one OffsetFetchTopicResp per
// topic.
type OffsetFetchResp struct {
	BaseResp
	ThrottleTime  int
	ErrorCode     ErrorCode
	TopicRespList []*OffsetFetchTopicResp
}

func DecodeOffsetFetchResp

func DecodeOffsetFetchResp(bytes []byte, version int16) (resp *OffsetFetchResp, err error)

func (*OffsetFetchResp) Bytes

func (o *OffsetFetchResp) Bytes(version int16, containLen bool) []byte

func (*OffsetFetchResp) BytesLength

func (o *OffsetFetchResp) BytesLength(version int16) int

type OffsetFetchTopicReq

// OffsetFetchTopicReq groups the OffsetFetchPartitionReq entries of one topic
// name.
type OffsetFetchTopicReq struct {
	Topic            string
	PartitionReqList []*OffsetFetchPartitionReq
}

type OffsetFetchTopicResp

// OffsetFetchTopicResp groups the OffsetFetchPartitionResp entries of one
// topic name.
type OffsetFetchTopicResp struct {
	Topic             string
	PartitionRespList []*OffsetFetchPartitionResp
}

type OffsetForLeaderEpochPartitionResp

// OffsetForLeaderEpochPartitionResp is the per-partition part of an
// offset-for-leader-epoch response: an error code, the partition id, a leader
// epoch and an offset.
type OffsetForLeaderEpochPartitionResp struct {
	ErrorCode   ErrorCode
	PartitionId int
	LeaderEpoch int32
	Offset      int64
}

type OffsetForLeaderEpochReq

// OffsetForLeaderEpochReq is the request type produced by
// DecodeOffsetForLeaderEpochReq: a replica id and one
// OffsetLeaderEpochTopicReq per topic.
type OffsetForLeaderEpochReq struct {
	BaseReq
	ReplicaId    int32
	TopicReqList []*OffsetLeaderEpochTopicReq
}

func DecodeOffsetForLeaderEpochReq

func DecodeOffsetForLeaderEpochReq(bytes []byte, version int16) (leaderEpochReq *OffsetForLeaderEpochReq, err error)

func (*OffsetForLeaderEpochReq) Bytes

func (o *OffsetForLeaderEpochReq) Bytes(containLen bool, containApiKeyVersion bool) []byte

func (*OffsetForLeaderEpochReq) BytesLength

func (o *OffsetForLeaderEpochReq) BytesLength(containLen bool, containApiKeyVersion bool) int

type OffsetForLeaderEpochResp

// OffsetForLeaderEpochResp is the response type produced by
// DecodeOffsetForLeaderEpochResp: a throttle time and one
// OffsetForLeaderEpochTopicResp per topic.
type OffsetForLeaderEpochResp struct {
	BaseResp
	ThrottleTime  int
	TopicRespList []*OffsetForLeaderEpochTopicResp
}

func DecodeOffsetForLeaderEpochResp

func DecodeOffsetForLeaderEpochResp(bytes []byte, version int16) (resp *OffsetForLeaderEpochResp, err error)

func (*OffsetForLeaderEpochResp) Bytes

func (o *OffsetForLeaderEpochResp) Bytes(version int16, containLen bool) []byte

func (*OffsetForLeaderEpochResp) BytesLength

func (o *OffsetForLeaderEpochResp) BytesLength(version int16) int

type OffsetForLeaderEpochTopicResp

// OffsetForLeaderEpochTopicResp groups the OffsetForLeaderEpochPartitionResp
// entries of one topic name.
type OffsetForLeaderEpochTopicResp struct {
	Topic             string
	PartitionRespList []*OffsetForLeaderEpochPartitionResp
}

type OffsetLeaderEpochPartitionReq

// OffsetLeaderEpochPartitionReq is the per-partition part of an
// offset-for-leader-epoch request: the partition id plus two epoch values,
// CurrentLeaderEpoch and LeaderEpoch.
type OffsetLeaderEpochPartitionReq struct {
	PartitionId        int
	CurrentLeaderEpoch int32
	LeaderEpoch        int32
}

type OffsetLeaderEpochTopicReq

// OffsetLeaderEpochTopicReq groups the OffsetLeaderEpochPartitionReq entries
// of one topic name.
type OffsetLeaderEpochTopicReq struct {
	Topic            string
	PartitionReqList []*OffsetLeaderEpochPartitionReq
}

type PartitionMetadata

// PartitionMetadata describes one partition inside
// TopicMetadata.PartitionMetadataList: its error code, id, leader id and
// epoch, and three Replica lists (Replicas, CaughtReplicas, OfflineReplicas).
type PartitionMetadata struct {
	ErrorCode       ErrorCode
	PartitionId     int
	LeaderId        int32
	LeaderEpoch     int32
	Replicas        []*Replica
	CaughtReplicas  []*Replica
	OfflineReplicas []*Replica
}

type ProducePartitionReq

// ProducePartitionReq is the per-partition part of a produce request: the
// partition id and a pointer to the RecordBatch to write (may be nil).
type ProducePartitionReq struct {
	PartitionId int
	RecordBatch *RecordBatch
}

type ProducePartitionResp

// ProducePartitionResp is the per-partition part of a produce response: an
// error code, offset, time and log start offset, a list of per-record errors
// and an optional error message (nil when absent).
type ProducePartitionResp struct {
	PartitionId     int
	ErrorCode       ErrorCode
	Offset          int64
	Time            int64
	LogStartOffset  int64
	RecordErrorList []*RecordError
	ErrorMessage    *string
}

type ProduceReq

// ProduceReq is the request type produced by DecodeProduceReq: an optional
// transaction id (nil when absent), the required acks, a timeout and one
// ProduceTopicReq per topic.
// NOTE(review): ClientId is declared here as well as in the embedded BaseReq.
// By Go's selector rules the shallower field wins, so p.ClientId refers to
// this field and BaseReq's copy is reachable only as p.BaseReq.ClientId.
// Confirm which of the two the encoder and decoder actually use.
type ProduceReq struct {
	BaseReq
	ClientId      string
	TransactionId *string
	RequiredAcks  int16
	Timeout       int
	TopicReqList  []*ProduceTopicReq
}

func DecodeProduceReq

func DecodeProduceReq(bytes []byte, version int16) (produceReq *ProduceReq, err error)

func (*ProduceReq) Bytes

func (p *ProduceReq) Bytes(containLen bool, containApiKeyVersion bool) []byte

func (*ProduceReq) BytesLength

func (p *ProduceReq) BytesLength(containLen bool, containApiKeyVersion bool) int

type ProduceResp

// ProduceResp is the response type produced by DecodeProduceResp: one
// ProduceTopicResp per topic and a throttle time. It has no top-level error
// code; errors are reported per partition.
type ProduceResp struct {
	BaseResp
	TopicRespList []*ProduceTopicResp
	ThrottleTime  int
}

func DecodeProduceResp

func DecodeProduceResp(bytes []byte, version int16) (resp *ProduceResp, err error)

func (*ProduceResp) Bytes

func (p *ProduceResp) Bytes(version int16, containLen bool) []byte

func (*ProduceResp) BytesLength

func (p *ProduceResp) BytesLength(version int16) int

type ProduceTopicReq

// ProduceTopicReq groups the ProducePartitionReq entries of one topic name.
type ProduceTopicReq struct {
	Topic            string
	PartitionReqList []*ProducePartitionReq
}

type ProduceTopicResp

// ProduceTopicResp groups the ProducePartitionResp entries of one topic name.
type ProduceTopicResp struct {
	Topic             string
	PartitionRespList []*ProducePartitionResp
}

type Record

// Record is a single entry of RecordBatch.Records, produced by DecodeRecord.
// It holds an attributes byte, a timestamp and an offset that the field names
// describe as relative (presumably to the enclosing batch — TODO confirm), a
// raw key and value, and a list of headers.
type Record struct {
	RecordAttributes  byte
	RelativeTimestamp int64
	RelativeOffset    int
	Key               []byte
	Value             []byte
	Headers           []*Header
}

func DecodeRecord

func DecodeRecord(bytes []byte, version int16) *Record

func (*Record) Bytes

func (r *Record) Bytes() []byte

func (*Record) BytesLength

func (r *Record) BytesLength() int

type RecordBatch

// RecordBatch is a batch of records, produced by DecodeRecordBatch. The first
// group of fields holds the batch's offset, size, leader epoch, magic byte and
// flags; the second group holds the offset delta, timestamps, producer
// identity and the records themselves.
type RecordBatch struct {
	Offset      int64
	MessageSize int
	LeaderEpoch int32
	MagicByte   byte
	Flags       uint16

	LastOffsetDelta int
	FirstTimestamp  int64
	LastTimestamp   int64
	ProducerId      int64
	ProducerEpoch   int16
	BaseSequence    int32
	Records         []*Record
}

func DecodeRecordBatch

func DecodeRecordBatch(bytes []byte, version int16) *RecordBatch

func (*RecordBatch) Bytes

func (r *RecordBatch) Bytes() []byte

func (*RecordBatch) BytesLength

func (r *RecordBatch) BytesLength() int

func (*RecordBatch) RecordBatchMessageLength

func (r *RecordBatch) RecordBatchMessageLength() int

type RecordError

// RecordError pairs a batch index with an optional error message (nil when
// absent). It is the element type of ProducePartitionResp.RecordErrorList.
type RecordError struct {
	BatchIndex             int32
	BatchIndexErrorMessage *string
}

type Replica

// Replica wraps a single replica id. It is the element type of the replica
// lists in PartitionMetadata.
type Replica struct {
	ReplicaId int32
}

type SaslAuthenticateReq

// SaslAuthenticateReq is the request type produced by
// DecodeSaslAuthenticateReq. It exposes the credentials as separate Username
// and Password strings. Password is held in plain text, so avoid logging
// values of this type.
type SaslAuthenticateReq struct {
	BaseReq
	Username string
	Password string
}

func DecodeSaslAuthenticateReq

func DecodeSaslAuthenticateReq(bytes []byte, version int16) (authReq *SaslAuthenticateReq, err error)

func (*SaslAuthenticateReq) Bytes

func (s *SaslAuthenticateReq) Bytes(containLen bool, containApiKeyVersion bool) []byte

func (*SaslAuthenticateReq) BytesLength

func (s *SaslAuthenticateReq) BytesLength(containLen bool, containApiKeyVersion bool) int

type SaslAuthenticateResp

// SaslAuthenticateResp is the response type produced by
// DecodeSaslAuthenticateResp: an error code and message, raw authentication
// bytes and a 64-bit session lifetime.
type SaslAuthenticateResp struct {
	BaseResp
	ErrorCode       ErrorCode
	ErrorMessage    string
	AuthBytes       []byte
	SessionLifetime int64
}

func DecodeSaslAuthenticateResp

func DecodeSaslAuthenticateResp(bytes []byte, version int16) (resp *SaslAuthenticateResp, err error)

func (*SaslAuthenticateResp) Bytes

func (s *SaslAuthenticateResp) Bytes(version int16, containLen bool) []byte

func (*SaslAuthenticateResp) BytesLength

func (s *SaslAuthenticateResp) BytesLength(version int16) int

type SaslHandshakeReq

// SaslHandshakeReq is the request type produced by DecodeSaslHandshakeReq: the
// name of one SASL mechanism on top of the BaseReq header.
type SaslHandshakeReq struct {
	BaseReq
	SaslMechanism string
}

func DecodeSaslHandshakeReq

func DecodeSaslHandshakeReq(bytes []byte, version int16) (saslHandshakeReq *SaslHandshakeReq, err error)

func (*SaslHandshakeReq) Bytes

func (s *SaslHandshakeReq) Bytes(containLen bool, containApiKeyVersion bool) []byte

func (*SaslHandshakeReq) BytesLength

func (s *SaslHandshakeReq) BytesLength(containLen bool, containApiKeyVersion bool) int

type SaslHandshakeResp

// SaslHandshakeResp is the response type produced by DecodeSaslHandshakeResp:
// an error code and a list of EnableMechanism entries.
type SaslHandshakeResp struct {
	BaseResp
	ErrorCode        ErrorCode
	EnableMechanisms []*EnableMechanism
}

func DecodeSaslHandshakeResp

func DecodeSaslHandshakeResp(bytes []byte, version int16) (resp *SaslHandshakeResp, err error)

func (*SaslHandshakeResp) Bytes

func (s *SaslHandshakeResp) Bytes(version int16, containLen bool) []byte

func (*SaslHandshakeResp) BytesLength

func (s *SaslHandshakeResp) BytesLength(version int16) int

type SyncGroupReq

// SyncGroupReq is the request type produced by DecodeSyncGroupReq. It
// identifies the group, generation and member, names the protocol type and
// protocol, and carries one GroupAssignment per member. GroupInstanceId is
// optional (nil when absent).
type SyncGroupReq struct {
	BaseReq
	GroupId          string
	GenerationId     int
	MemberId         string
	GroupInstanceId  *string
	ProtocolType     string
	ProtocolName     string
	GroupAssignments []*GroupAssignment
}

func DecodeSyncGroupReq

func DecodeSyncGroupReq(bytes []byte, version int16) (groupReq *SyncGroupReq, err error)

func (*SyncGroupReq) Bytes

func (s *SyncGroupReq) Bytes(containLen bool, containApiKeyVersion bool) []byte

func (*SyncGroupReq) BytesLength

func (s *SyncGroupReq) BytesLength(containLen bool, containApiKeyVersion bool) int

type SyncGroupResp

// SyncGroupResp is the response type produced by DecodeSyncGroupResp: a
// throttle time, an error code, the protocol type and name, and the receiving
// member's assignment as raw bytes.
type SyncGroupResp struct {
	BaseResp
	ThrottleTime     int
	ErrorCode        ErrorCode
	ProtocolType     string
	ProtocolName     string
	MemberAssignment []byte
}

func DecodeSyncGroupResp

func DecodeSyncGroupResp(bytes []byte, version int16) (resp *SyncGroupResp, err error)

func (*SyncGroupResp) Bytes

func (s *SyncGroupResp) Bytes(version int16, containLen bool) []byte

func (*SyncGroupResp) BytesLength

func (s *SyncGroupResp) BytesLength(version int16) int

type TopicMetadata

// TopicMetadata describes one topic inside MetadataResp.TopicMetadataList: its
// error code, name, an IsInternal flag, the metadata of each partition and an
// integer TopicAuthorizedOperation value.
type TopicMetadata struct {
	ErrorCode                ErrorCode
	Topic                    string
	IsInternal               bool
	PartitionMetadataList    []*PartitionMetadata
	TopicAuthorizedOperation int
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL