protocol

package
v0.0.0-...-2c05570 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 5, 2017 License: MIT Imports: 6 Imported by: 0

Documentation

Index

Constants

View Source
const (
	ProduceKey            = 0
	FetchKey              = 1
	OffsetsKey            = 2
	MetadataKey           = 3
	LeaderAndISRKey       = 4
	StopReplicaKey        = 5
	UpdateMetadataKey     = 6
	ControlledShutdownKey = 7
	OffsetCommitKey       = 8
	OffsetFetchKey        = 9
	GroupCoordinatorKey   = 10
	JoinGroupKey          = 11
	HeartbeatKey          = 12
	LeaveGroupKey         = 13
	SyncGroupKey          = 14
	DescribeGroupsKey     = 15
	ListGroupsKey         = 16
	SaslHandshakeKey      = 17
	APIVersionsKey        = 18
	CreateTopicsKey       = 19
	DeleteTopicsKey       = 20
)

Protocol API keys. See: https://kafka.apache.org/protocol#protocol_api_keys

Variables

View Source
var (
	ErrUnknown                            = Error{/* contains filtered or unexported fields */}
	ErrNone                               = Error{/* contains filtered or unexported fields */}
	ErrOffsetOutOfRange                   = Error{/* contains filtered or unexported fields */}
	ErrCorruptMessage                     = Error{/* contains filtered or unexported fields */}
	ErrUnknownTopicOrPartition            = Error{/* contains filtered or unexported fields */}
	ErrInvalidFetchSize                   = Error{/* contains filtered or unexported fields */}
	ErrLeaderNotAvailable                 = Error{/* contains filtered or unexported fields */}
	ErrNotLeaderForPartition              = Error{/* contains filtered or unexported fields */}
	ErrRequestTimedOut                    = Error{/* contains filtered or unexported fields */}
	ErrBrokerNotAvailable                 = Error{/* contains filtered or unexported fields */}
	ErrReplicaNotAvailable                = Error{/* contains filtered or unexported fields */}
	ErrMessageTooLarge                    = Error{/* contains filtered or unexported fields */}
	ErrStaleControllerEpoch               = Error{/* contains filtered or unexported fields */}
	ErrOffsetMetadataTooLarge             = Error{/* contains filtered or unexported fields */}
	ErrNetworkException                   = Error{/* contains filtered or unexported fields */}
	ErrCoordinatorLoadInProgress          = Error{/* contains filtered or unexported fields */}
	ErrCoordinatorNotAvailable            = Error{/* contains filtered or unexported fields */}
	ErrNotCoordinator                     = Error{/* contains filtered or unexported fields */}
	ErrInvalidTopicException              = Error{/* contains filtered or unexported fields */}
	ErrRecordListTooLarge                 = Error{/* contains filtered or unexported fields */}
	ErrNotEnoughReplicas                  = Error{/* contains filtered or unexported fields */}
	ErrNotEnoughReplicasAfterAppend       = Error{/* contains filtered or unexported fields */}
	ErrInvalidRequiredAcks                = Error{/* contains filtered or unexported fields */}
	ErrIllegalGeneration                  = Error{/* contains filtered or unexported fields */}
	ErrInconsistentGroupProtocol          = Error{/* contains filtered or unexported fields */}
	ErrInvalidGroupId                     = Error{/* contains filtered or unexported fields */}
	ErrUnknownMemberId                    = Error{/* contains filtered or unexported fields */}
	ErrInvalidSessionTimeout              = Error{/* contains filtered or unexported fields */}
	ErrRebalanceInProgress                = Error{/* contains filtered or unexported fields */}
	ErrInvalidCommitOffsetSize            = Error{/* contains filtered or unexported fields */}
	ErrTopicAuthorizationFailed           = Error{/* contains filtered or unexported fields */}
	ErrGroupAuthorizationFailed           = Error{/* contains filtered or unexported fields */}
	ErrClusterAuthorizationFailed         = Error{/* contains filtered or unexported fields */}
	ErrInvalidTimestamp                   = Error{/* contains filtered or unexported fields */}
	ErrUnsupportedSaslMechanism           = Error{/* contains filtered or unexported fields */}
	ErrIllegalSaslState                   = Error{/* contains filtered or unexported fields */}
	ErrUnsupportedVersion                 = Error{/* contains filtered or unexported fields */}
	ErrTopicAlreadyExists                 = Error{/* contains filtered or unexported fields */}
	ErrInvalidPartitions                  = Error{/* contains filtered or unexported fields */}
	ErrInvalidReplicationFactor           = Error{/* contains filtered or unexported fields */}
	ErrInvalidReplicaAssignment           = Error{/* contains filtered or unexported fields */}
	ErrInvalidConfig                      = Error{/* contains filtered or unexported fields */}
	ErrNotController                      = Error{/* contains filtered or unexported fields */}
	ErrInvalidRequest                     = Error{/* contains filtered or unexported fields */}
	ErrUnsupportedForMessageFormat        = Error{/* contains filtered or unexported fields */}
	ErrPolicyViolation                    = Error{/* contains filtered or unexported fields */}
	ErrOutOfOrderSequenceNumber           = Error{/* contains filtered or unexported fields */}
	ErrDuplicateSequenceNumber            = Error{/* contains filtered or unexported fields */}
	ErrInvalidProducerEpoch               = Error{/* contains filtered or unexported fields */}
	ErrInvalidTxnState                    = Error{/* contains filtered or unexported fields */}
	ErrInvalidProducerIdMapping           = Error{/* contains filtered or unexported fields */}
	ErrInvalidTransactionTimeout          = Error{/* contains filtered or unexported fields */}
	ErrConcurrentTransactions             = Error{/* contains filtered or unexported fields */}
	ErrTransactionCoordinatorFenced       = Error{/* contains filtered or unexported fields */}
	ErrTransactionalIdAuthorizationFailed = Error{/* contains filtered or unexported fields */}
	ErrSecurityDisabled                   = Error{/* contains filtered or unexported fields */}
	ErrOperationNotAttempted              = Error{/* contains filtered or unexported fields */}

	// Errs maps protocol error codes to their corresponding Error values.
	Errs = map[int16]Error{
		-1: ErrUnknown,
		0:  ErrNone,
		1:  ErrOffsetOutOfRange,
		2:  ErrCorruptMessage,
		3:  ErrUnknownTopicOrPartition,
		4:  ErrInvalidFetchSize,
		5:  ErrLeaderNotAvailable,
		6:  ErrNotLeaderForPartition,
		7:  ErrRequestTimedOut,
		8:  ErrBrokerNotAvailable,
		9:  ErrReplicaNotAvailable,
		10: ErrMessageTooLarge,
		11: ErrStaleControllerEpoch,
		12: ErrOffsetMetadataTooLarge,
		13: ErrNetworkException,
		14: ErrCoordinatorLoadInProgress,
		15: ErrCoordinatorNotAvailable,
		16: ErrNotCoordinator,
		17: ErrInvalidTopicException,
		18: ErrRecordListTooLarge,
		19: ErrNotEnoughReplicas,
		20: ErrNotEnoughReplicasAfterAppend,
		21: ErrInvalidRequiredAcks,
		22: ErrIllegalGeneration,
		23: ErrInconsistentGroupProtocol,
		24: ErrInvalidGroupId,
		25: ErrUnknownMemberId,
		26: ErrInvalidSessionTimeout,
		27: ErrRebalanceInProgress,
		28: ErrInvalidCommitOffsetSize,
		29: ErrTopicAuthorizationFailed,
		30: ErrGroupAuthorizationFailed,
		31: ErrClusterAuthorizationFailed,
		32: ErrInvalidTimestamp,
		33: ErrUnsupportedSaslMechanism,
		34: ErrIllegalSaslState,
		35: ErrUnsupportedVersion,
		36: ErrTopicAlreadyExists,
		37: ErrInvalidPartitions,
		38: ErrInvalidReplicationFactor,
		39: ErrInvalidReplicaAssignment,
		40: ErrInvalidConfig,
		41: ErrNotController,
		42: ErrInvalidRequest,
		43: ErrUnsupportedForMessageFormat,
		44: ErrPolicyViolation,
		45: ErrOutOfOrderSequenceNumber,
		46: ErrDuplicateSequenceNumber,
		47: ErrInvalidProducerEpoch,
		48: ErrInvalidTxnState,
		49: ErrInvalidProducerIdMapping,
		50: ErrInvalidTransactionTimeout,
		51: ErrConcurrentTransactions,
		52: ErrTransactionCoordinatorFenced,
		53: ErrTransactionalIdAuthorizationFailed,
		54: ErrSecurityDisabled,
		55: ErrOperationNotAttempted,
	}
)
View Source
var Encoding = binary.BigEndian
View Source
var ErrInsufficientData = errors.New("kafka: insufficient data to decode packet, more bytes expected")
View Source
var ErrInvalidArrayLength = errors.New("kafka: invalid array length")
View Source
var ErrInvalidByteSliceLength = errors.New("invalid byteslice length")
View Source
var ErrInvalidStringLength = errors.New("kafka: invalid string length")

Functions

func Decode

func Decode(b []byte, in Decoder) error

func Encode

func Encode(e Encoder) ([]byte, error)

func Read

func Read(r io.Reader, data interface{}) error

func Size

func Size(v interface{}) int

func Write

func Write(w io.Writer, data interface{}) error

Types

type APIVersion

type APIVersion struct {
	APIKey     int16
	MinVersion int16
	MaxVersion int16
}

type APIVersionsRequest

type APIVersionsRequest struct{}

func (*APIVersionsRequest) Decode

func (c *APIVersionsRequest) Decode(_ PacketDecoder) error

func (*APIVersionsRequest) Encode

func (c *APIVersionsRequest) Encode(_ PacketEncoder) error

func (*APIVersionsRequest) Key

func (c *APIVersionsRequest) Key() int16

func (*APIVersionsRequest) Version

func (c *APIVersionsRequest) Version() int16

type APIVersionsResponse

type APIVersionsResponse struct {
	APIVersions []APIVersion
	ErrorCode   int16
}

func (*APIVersionsResponse) Decode

func (*APIVersionsResponse) Encode

type Body

type Body interface {
	Encoder
	Key() int16
	Version() int16
}

type Broker

type Broker struct {
	NodeID int32
	Host   string
	Port   int32
}

type ByteDecoder

type ByteDecoder struct {
	// contains filtered or unexported fields
}

func NewDecoder

func NewDecoder(b []byte) *ByteDecoder

func (*ByteDecoder) ArrayLength

func (d *ByteDecoder) ArrayLength() (int, error)

func (*ByteDecoder) Bool

func (d *ByteDecoder) Bool() (bool, error)

func (*ByteDecoder) Bytes

func (d *ByteDecoder) Bytes() ([]byte, error)

func (*ByteDecoder) Int16

func (d *ByteDecoder) Int16() (int16, error)

func (*ByteDecoder) Int32

func (d *ByteDecoder) Int32() (int32, error)

func (*ByteDecoder) Int32Array

func (d *ByteDecoder) Int32Array() ([]int32, error)

func (*ByteDecoder) Int64

func (d *ByteDecoder) Int64() (int64, error)

func (*ByteDecoder) Int64Array

func (d *ByteDecoder) Int64Array() ([]int64, error)

func (*ByteDecoder) Int8

func (d *ByteDecoder) Int8() (int8, error)

func (*ByteDecoder) Offset

func (d *ByteDecoder) Offset() int

func (*ByteDecoder) Pop

func (d *ByteDecoder) Pop() error

func (*ByteDecoder) Push

func (d *ByteDecoder) Push(pd PushDecoder) error

func (*ByteDecoder) String

func (d *ByteDecoder) String() (string, error)

func (*ByteDecoder) StringArray

func (d *ByteDecoder) StringArray() ([]string, error)

type ByteEncoder

type ByteEncoder struct {
	// contains filtered or unexported fields
}

func NewByteEncoder

func NewByteEncoder(b []byte) *ByteEncoder

func (*ByteEncoder) Bytes

func (b *ByteEncoder) Bytes() []byte

func (*ByteEncoder) Pop

func (e *ByteEncoder) Pop()

func (*ByteEncoder) Push

func (e *ByteEncoder) Push(pe PushEncoder)

func (*ByteEncoder) PutArrayLength

func (e *ByteEncoder) PutArrayLength(in int) error

func (*ByteEncoder) PutBool

func (e *ByteEncoder) PutBool(in bool)

func (*ByteEncoder) PutBytes

func (e *ByteEncoder) PutBytes(in []byte) error

func (*ByteEncoder) PutInt16

func (e *ByteEncoder) PutInt16(in int16)

func (*ByteEncoder) PutInt32

func (e *ByteEncoder) PutInt32(in int32)

func (*ByteEncoder) PutInt32Array

func (e *ByteEncoder) PutInt32Array(in []int32) error

func (*ByteEncoder) PutInt64

func (e *ByteEncoder) PutInt64(in int64)

func (*ByteEncoder) PutInt64Array

func (e *ByteEncoder) PutInt64Array(in []int64) error

func (*ByteEncoder) PutInt8

func (e *ByteEncoder) PutInt8(in int8)

func (*ByteEncoder) PutRawBytes

func (e *ByteEncoder) PutRawBytes(in []byte) error

func (*ByteEncoder) PutString

func (e *ByteEncoder) PutString(in string) error

func (*ByteEncoder) PutStringArray

func (e *ByteEncoder) PutStringArray(in []string) error

type CRCField

type CRCField struct {
	StartOffset int
}

func (*CRCField) Check

func (f *CRCField) Check(curOffset int, buf []byte) error

func (*CRCField) Fill

func (f *CRCField) Fill(curOffset int, buf []byte) error

func (*CRCField) ReserveSize

func (f *CRCField) ReserveSize() int

func (*CRCField) SaveOffset

func (f *CRCField) SaveOffset(in int)

type Coordinator

type Coordinator struct {
	NodeID int32
	Host   string
	Port   int32
}

type CreateTopicRequest

type CreateTopicRequest struct {
	Topic             string
	NumPartitions     int32
	ReplicationFactor int16
	ReplicaAssignment map[int32][]int32
	Configs           map[string]string
}

type CreateTopicRequests

type CreateTopicRequests struct {
	Requests []*CreateTopicRequest
	Timeout  int32
}

func (*CreateTopicRequests) Decode

func (*CreateTopicRequests) Encode

func (*CreateTopicRequests) Key

func (c *CreateTopicRequests) Key() int16

func (*CreateTopicRequests) Version

func (c *CreateTopicRequests) Version() int16

type CreateTopicsResponse

type CreateTopicsResponse struct {
	TopicErrorCodes []*TopicErrorCode
}

func (*CreateTopicsResponse) Decode

func (*CreateTopicsResponse) Encode

type Data

type Data struct {
	Partition int32
	RecordSet []byte
}

type Decoder

type Decoder interface {
	Decode(d PacketDecoder) error
}

type DeleteTopicsRequest

type DeleteTopicsRequest struct {
	Topics  []string
	Timeout int32
}

func (*DeleteTopicsRequest) Decode

func (c *DeleteTopicsRequest) Decode(d PacketDecoder) (err error)

func (*DeleteTopicsRequest) Encode

func (c *DeleteTopicsRequest) Encode(e PacketEncoder) (err error)

func (*DeleteTopicsRequest) Key

func (c *DeleteTopicsRequest) Key() int16

func (*DeleteTopicsRequest) Version

func (c *DeleteTopicsRequest) Version() int16

type DeleteTopicsResponse

type DeleteTopicsResponse struct {
	TopicErrorCodes []*TopicErrorCode
}

func (*DeleteTopicsResponse) Decode

func (*DeleteTopicsResponse) Encode

type DescribeGroupsRequest

type DescribeGroupsRequest struct {
	GroupIDs []string
}

func (*DescribeGroupsRequest) Decode

func (r *DescribeGroupsRequest) Decode(d PacketDecoder) (err error)

func (*DescribeGroupsRequest) Encode

func (*DescribeGroupsRequest) Key

func (r *DescribeGroupsRequest) Key() int16

func (*DescribeGroupsRequest) Version

func (r *DescribeGroupsRequest) Version() int16

type DescribeGroupsResponse

type DescribeGroupsResponse struct {
	Groups []*Group
}

func (*DescribeGroupsResponse) Decode

func (r *DescribeGroupsResponse) Decode(d PacketDecoder) (err error)

func (*DescribeGroupsResponse) Encode

func (*DescribeGroupsResponse) Key

func (r *DescribeGroupsResponse) Key() int16

func (*DescribeGroupsResponse) Version

func (r *DescribeGroupsResponse) Version() int16

type Encoder

type Encoder interface {
	Encode(e PacketEncoder) error
}

type Error

type Error struct {
	// contains filtered or unexported fields
}

Error represents a protocol error. It lets an error carry its protocol error code and description along with it.

func (Error) Code

func (e Error) Code() int16

func (Error) Error

func (e Error) Error() string

func (Error) String

func (e Error) String() string

func (Error) WithErr

func (e Error) WithErr(err error) Error

type FetchPartition

type FetchPartition struct {
	Partition   int32
	FetchOffset int64
	MaxBytes    int32
}

type FetchPartitionResponse

type FetchPartitionResponse struct {
	Partition     int32
	ErrorCode     int16
	HighWatermark int64
	RecordSet     []byte
}

type FetchRequest

type FetchRequest struct {
	ReplicaID   int32
	MaxWaitTime int32
	MinBytes    int32
	// MaxBytes    int32
	Topics []*FetchTopic
}

func (*FetchRequest) Decode

func (r *FetchRequest) Decode(d PacketDecoder) error

func (*FetchRequest) Encode

func (r *FetchRequest) Encode(e PacketEncoder) error

func (*FetchRequest) Key

func (r *FetchRequest) Key() int16

func (*FetchRequest) Version

func (r *FetchRequest) Version() int16

type FetchResponse

type FetchResponse struct {
	Topic              string
	PartitionResponses []*FetchPartitionResponse
}

type FetchResponses

type FetchResponses struct {
	ThrottleTimeMs int32
	Responses      []*FetchResponse
}

func (*FetchResponses) Decode

func (r *FetchResponses) Decode(d PacketDecoder) error

func (*FetchResponses) Encode

func (r *FetchResponses) Encode(e PacketEncoder) (err error)

type FetchTopic

type FetchTopic struct {
	Topic      string
	Partitions []*FetchPartition
}

type Group

type Group struct {
	ErrorCode    int16
	GroupID      string
	State        string
	ProtocolType string
	Protocol     string
	GroupMembers map[string]*GroupMember
}

func (*Group) Decode

func (r *Group) Decode(d PacketDecoder) (err error)

func (*Group) Encode

func (r *Group) Encode(e PacketEncoder) error

type GroupCoordinatorRequest

type GroupCoordinatorRequest struct {
	GroupID string
}

func (*GroupCoordinatorRequest) Decode

func (r *GroupCoordinatorRequest) Decode(d PacketDecoder) (err error)

func (*GroupCoordinatorRequest) Encode

func (*GroupCoordinatorRequest) Key

func (r *GroupCoordinatorRequest) Key() int16

func (*GroupCoordinatorRequest) Version

func (r *GroupCoordinatorRequest) Version() int16

type GroupCoordinatorResponse

type GroupCoordinatorResponse struct {
	ErrorCode   int16
	Coordinator *Coordinator
}

func (*GroupCoordinatorResponse) Decode

func (r *GroupCoordinatorResponse) Decode(d PacketDecoder) (err error)

func (*GroupCoordinatorResponse) Encode

type GroupMember

type GroupMember struct {
	ClientID              string
	ClientHost            string
	GroupMemberMetadata   []byte
	GroupMemberAssignment []byte
}

func (*GroupMember) Decode

func (r *GroupMember) Decode(d PacketDecoder) (err error)

func (*GroupMember) Encode

func (r *GroupMember) Encode(e PacketEncoder) error

type GroupProtocol

type GroupProtocol struct {
	ProtocolName     string
	ProtocolMetadata []byte
}

type HeartbeatRequest

type HeartbeatRequest struct {
	GroupID           string
	GroupGenerationID int32
	MemberID          string
}

func (*HeartbeatRequest) Decode

func (r *HeartbeatRequest) Decode(d PacketDecoder) (err error)

func (*HeartbeatRequest) Key

func (r *HeartbeatRequest) Key() int16

func (*HeartbeatRequest) Version

func (r *HeartbeatRequest) Version() int16

type HeartbeatResponse

type HeartbeatResponse struct {
	ErrorCode int16
}

func (*HeartbeatResponse) Decode

func (r *HeartbeatResponse) Decode(d PacketDecoder) (err error)

func (*HeartbeatResponse) Encode

func (r *HeartbeatResponse) Encode(e PacketEncoder) error

func (*HeartbeatResponse) Key

func (r *HeartbeatResponse) Key() int16

func (*HeartbeatResponse) Version

func (r *HeartbeatResponse) Version() int16

type JoinGroupRequest

type JoinGroupRequest struct {
	GroupID        string
	SessionTimeout int32
	MemberID       string
	ProtocolType   string
	GroupProtocols []*GroupProtocol
}

func (*JoinGroupRequest) Decode

func (r *JoinGroupRequest) Decode(d PacketDecoder) error

func (*JoinGroupRequest) Encode

func (r *JoinGroupRequest) Encode(e PacketEncoder) error

func (*JoinGroupRequest) Key

func (r *JoinGroupRequest) Key() int16

func (*JoinGroupRequest) Version

func (r *JoinGroupRequest) Version() int16

type JoinGroupResponse

type JoinGroupResponse struct {
	ErrorCode     int16
	GenerationID  int32
	GroupProtocol string
	LeaderID      string
	MemberID      string
	Members       map[string][]byte
}

func (*JoinGroupResponse) Decode

func (r *JoinGroupResponse) Decode(d PacketDecoder) error

func (*JoinGroupResponse) Encode

func (r *JoinGroupResponse) Encode(e PacketEncoder) error

func (*JoinGroupResponse) Key

func (r *JoinGroupResponse) Key() int16

func (*JoinGroupResponse) Version

func (r *JoinGroupResponse) Version() int16

type LeaderAndISRPartition

type LeaderAndISRPartition struct {
	Topic     string
	Partition int32
	ErrorCode int16
}

type LeaderAndISRRequest

type LeaderAndISRRequest struct {
	ControllerID    int32
	ControllerEpoch int32
	PartitionStates []*PartitionState
	LiveLeaders     []*LiveLeader
}

func (*LeaderAndISRRequest) Decode

func (*LeaderAndISRRequest) Encode

func (*LeaderAndISRRequest) Key

func (r *LeaderAndISRRequest) Key() int16

func (*LeaderAndISRRequest) Version

func (r *LeaderAndISRRequest) Version() int16

type LeaderAndISRResponse

type LeaderAndISRResponse struct {
	ErrorCode  int16
	Partitions []*LeaderAndISRPartition
}

func (*LeaderAndISRResponse) Decode

func (*LeaderAndISRResponse) Encode

func (*LeaderAndISRResponse) Key

func (r *LeaderAndISRResponse) Key() int16

func (*LeaderAndISRResponse) Version

func (r *LeaderAndISRResponse) Version() int16

type LeaveGroupRequest

type LeaveGroupRequest struct {
	GroupID  string
	MemberID string
}

func (*LeaveGroupRequest) Decode

func (r *LeaveGroupRequest) Decode(d PacketDecoder) (err error)

func (*LeaveGroupRequest) Encode

func (r *LeaveGroupRequest) Encode(e PacketEncoder) error

func (*LeaveGroupRequest) Key

func (r *LeaveGroupRequest) Key() int16

func (*LeaveGroupRequest) Version

func (r *LeaveGroupRequest) Version() int16

type LeaveGroupResponse

type LeaveGroupResponse struct {
	ErrorCode int16
}

func (*LeaveGroupResponse) Decode

func (r *LeaveGroupResponse) Decode(d PacketDecoder) (err error)

func (*LeaveGroupResponse) Encode

func (r *LeaveGroupResponse) Encode(e PacketEncoder) error

func (*LeaveGroupResponse) Key

func (r *LeaveGroupResponse) Key() int16

func (*LeaveGroupResponse) Version

func (r *LeaveGroupResponse) Version() int16

type LenEncoder

type LenEncoder struct {
	Length int
	// contains filtered or unexported fields
}

func (*LenEncoder) Pop

func (e *LenEncoder) Pop()

func (*LenEncoder) Push

func (e *LenEncoder) Push(pe PushEncoder)

func (*LenEncoder) PutArrayLength

func (e *LenEncoder) PutArrayLength(in int) error

func (*LenEncoder) PutBool

func (e *LenEncoder) PutBool(in bool)

func (*LenEncoder) PutBytes

func (e *LenEncoder) PutBytes(in []byte) error

func (*LenEncoder) PutInt16

func (e *LenEncoder) PutInt16(in int16)

func (*LenEncoder) PutInt32

func (e *LenEncoder) PutInt32(in int32)

func (*LenEncoder) PutInt32Array

func (e *LenEncoder) PutInt32Array(in []int32) error

func (*LenEncoder) PutInt64

func (e *LenEncoder) PutInt64(in int64)

func (*LenEncoder) PutInt64Array

func (e *LenEncoder) PutInt64Array(in []int64) error

func (*LenEncoder) PutInt8

func (e *LenEncoder) PutInt8(in int8)

func (*LenEncoder) PutRawBytes

func (e *LenEncoder) PutRawBytes(in []byte) error

func (*LenEncoder) PutString

func (e *LenEncoder) PutString(in string) error

func (*LenEncoder) PutStringArray

func (e *LenEncoder) PutStringArray(in []string) error

type ListGroupsRequest

type ListGroupsRequest struct {
}

func (*ListGroupsRequest) Decode

func (r *ListGroupsRequest) Decode(d PacketDecoder) (err error)

func (*ListGroupsRequest) Encode

func (r *ListGroupsRequest) Encode(e PacketEncoder) error

func (*ListGroupsRequest) Key

func (r *ListGroupsRequest) Key() int16

func (*ListGroupsRequest) Version

func (r *ListGroupsRequest) Version() int16

type ListGroupsResponse

type ListGroupsResponse struct {
	ErrorCode int16
	Groups    map[string]string
}

func (*ListGroupsResponse) Decode

func (r *ListGroupsResponse) Decode(d PacketDecoder) (err error)

func (*ListGroupsResponse) Encode

func (r *ListGroupsResponse) Encode(e PacketEncoder) error

func (*ListGroupsResponse) Key

func (r *ListGroupsResponse) Key() int16

func (*ListGroupsResponse) Version

func (r *ListGroupsResponse) Version() int16

type LiveLeader

type LiveLeader struct {
	ID   int32
	Host string
	Port int32
}

type Member

type Member struct {
	MemberID       string
	MemberMetadata []byte
}

type Message

type Message struct {
	Timestamp time.Time
	Key       []byte
	Value     []byte
	MagicByte int8
}

func (*Message) Decode

func (m *Message) Decode(d PacketDecoder) error

func (*Message) Encode

func (m *Message) Encode(e PacketEncoder) error

type MessageSet

type MessageSet struct {
	Offset                  int64
	Size                    int32
	Messages                []*Message
	PartialTrailingMessages bool
}

func (*MessageSet) Decode

func (ms *MessageSet) Decode(d PacketDecoder) error

func (*MessageSet) Encode

func (ms *MessageSet) Encode(e PacketEncoder) error

type MetadataRequest

type MetadataRequest struct {
	Topics []string
}

func (*MetadataRequest) Decode

func (r *MetadataRequest) Decode(d PacketDecoder) (err error)

func (*MetadataRequest) Encode

func (r *MetadataRequest) Encode(e PacketEncoder) error

func (*MetadataRequest) Key

func (r *MetadataRequest) Key() int16

func (*MetadataRequest) Version

func (r *MetadataRequest) Version() int16

type MetadataResponse

type MetadataResponse struct {
	Brokers []*Broker
	// unsupported: ClusterID *string
	// unsupported: ControllerID string
	TopicMetadata []*TopicMetadata
}

func (*MetadataResponse) Decode

func (r *MetadataResponse) Decode(d PacketDecoder) error

func (*MetadataResponse) Encode

func (r *MetadataResponse) Encode(e PacketEncoder) (err error)

type OffsetResponse

type OffsetResponse struct {
	Topic              string
	PartitionResponses []*PartitionResponse
}

type OffsetsPartition

type OffsetsPartition struct {
	Partition int32
	Timestamp int64 // -1 to receive latest offset, -2 to receive earliest offset
}

type OffsetsRequest

type OffsetsRequest struct {
	ReplicaID     int32
	Topics        []*OffsetsTopic
	MaxNumOffsets int32
}

func (*OffsetsRequest) Decode

func (r *OffsetsRequest) Decode(d PacketDecoder) error

func (*OffsetsRequest) Encode

func (r *OffsetsRequest) Encode(e PacketEncoder) error

func (*OffsetsRequest) Key

func (r *OffsetsRequest) Key() int16

func (*OffsetsRequest) Version

func (r *OffsetsRequest) Version() int16

type OffsetsResponse

type OffsetsResponse struct {
	Responses []*OffsetResponse
}

func (*OffsetsResponse) Decode

func (r *OffsetsResponse) Decode(d PacketDecoder) error

func (*OffsetsResponse) Encode

func (r *OffsetsResponse) Encode(e PacketEncoder) error

type OffsetsTopic

type OffsetsTopic struct {
	Topic      string
	Partitions []*OffsetsPartition
}

type PacketDecoder

type PacketDecoder interface {
	Bool() (bool, error)
	Int8() (int8, error)
	Int16() (int16, error)
	Int32() (int32, error)
	Int64() (int64, error)
	ArrayLength() (int, error)
	Bytes() ([]byte, error)
	String() (string, error)
	Int32Array() ([]int32, error)
	Int64Array() ([]int64, error)
	StringArray() ([]string, error)
	Push(pd PushDecoder) error
	Pop() error
	// contains filtered or unexported methods
}

type PacketEncoder

type PacketEncoder interface {
	PutBool(in bool)
	PutInt8(in int8)
	PutInt16(in int16)
	PutInt32(in int32)
	PutInt64(in int64)
	PutArrayLength(in int) error
	PutRawBytes(in []byte) error
	PutBytes(in []byte) error
	PutString(in string) error
	PutStringArray(in []string) error
	PutInt32Array(in []int32) error
	PutInt64Array(in []int64) error
	Push(pe PushEncoder)
	Pop()
}

type PartitionMetadata

type PartitionMetadata struct {
	PartitionErrorCode int16
	ParititionID       int32
	Leader             int32
	Replicas           []int32
	ISR                []int32
}

type PartitionResponse

type PartitionResponse struct {
	Partition int32
	ErrorCode int16
	// Timestamp int64
	Offsets []int64
}

type PartitionState

type PartitionState struct {
	Topic           string
	Partition       int32
	ControllerEpoch int32
	Leader          int32
	LeaderEpoch     int32
	ISR             []int32
	ZKVersion       int32
	Replicas        []int32
}

type ProducePartitionResponse

type ProducePartitionResponse struct {
	Partition  int32
	ErrorCode  int16
	BaseOffset int64
	Timestamp  int64
}

type ProduceRequest

type ProduceRequest struct {
	Acks      int16
	Timeout   int32
	TopicData []*TopicData
}

func (*ProduceRequest) Decode

func (r *ProduceRequest) Decode(d PacketDecoder) (err error)

func (*ProduceRequest) Encode

func (r *ProduceRequest) Encode(e PacketEncoder) (err error)

func (*ProduceRequest) Key

func (r *ProduceRequest) Key() int16

func (*ProduceRequest) Version

func (r *ProduceRequest) Version() int16

type ProduceResponse

type ProduceResponse struct {
	Topic              string
	PartitionResponses []*ProducePartitionResponse
}

type ProduceResponses

type ProduceResponses struct {
	Responses      []*ProduceResponse
	ThrottleTimeMs int32
}

func (*ProduceResponses) Decode

func (r *ProduceResponses) Decode(d PacketDecoder) error

func (*ProduceResponses) Encode

func (r *ProduceResponses) Encode(e PacketEncoder) error

func (*ProduceResponses) Version

func (r *ProduceResponses) Version() int16

type PushDecoder

type PushDecoder interface {
	SaveOffset(in int)
	ReserveSize() int
	Fill(curOffset int, buf []byte) error
}

type PushEncoder

type PushEncoder interface {
	SaveOffset(in int)
	ReserveSize() int
	Fill(curOffset int, buf []byte) error
}

type Request

type Request struct {
	CorrelationID int32
	ClientID      string
	Body          Body
}

func (*Request) Encode

func (r *Request) Encode(pe PacketEncoder) (err error)

type RequestHeader

type RequestHeader struct {
	// Size of the request
	Size int32
	// ID of the API (e.g. produce, fetch, metadata)
	APIKey int16
	// Version of the API to use
	APIVersion int16
	// User defined ID to correlate requests between server and client
	CorrelationID int32
	// Client ID string sent with the request (the ID itself, not its size)
	ClientID string
}

func (*RequestHeader) Decode

func (r *RequestHeader) Decode(d PacketDecoder) error

func (*RequestHeader) Encode

func (r *RequestHeader) Encode(e PacketEncoder)

type Response

type Response struct {
	Size          int32
	CorrelationID int32
	Body          ResponseBody
}

func (*Response) Decode

func (r *Response) Decode(pd PacketDecoder) (err error)

func (*Response) Encode

func (r *Response) Encode(pe PacketEncoder) (err error)

type ResponseBody

type ResponseBody interface {
	Encoder
	Decoder
}

type SizeField

type SizeField struct {
	StartOffset int
}

func (*SizeField) Check

func (s *SizeField) Check(curOffset int, buf []byte) error

func (*SizeField) Fill

func (s *SizeField) Fill(curOffset int, buf []byte) error

func (*SizeField) ReserveSize

func (s *SizeField) ReserveSize() int

func (*SizeField) SaveOffset

func (s *SizeField) SaveOffset(in int)

type StopReplicaPartition

type StopReplicaPartition struct {
	Topic     string
	Partition int32
}

type StopReplicaRequest

type StopReplicaRequest struct {
	ControllerID     int32
	ControllerEpoch  int32
	DeletePartitions bool
	Partitions       []*StopReplicaPartition
}

func (*StopReplicaRequest) Decode

func (r *StopReplicaRequest) Decode(d PacketDecoder) (err error)

func (*StopReplicaRequest) Encode

func (r *StopReplicaRequest) Encode(e PacketEncoder) (err error)

func (*StopReplicaRequest) Key

func (r *StopReplicaRequest) Key() int16

func (*StopReplicaRequest) Version

func (r *StopReplicaRequest) Version() int16

type StopReplicaResponse

type StopReplicaResponse struct {
	ErrorCode  int16
	Partitions []*StopReplicaResponsePartition
}

func (*StopReplicaResponse) Decode

func (r *StopReplicaResponse) Decode(d PacketDecoder) (err error)

func (*StopReplicaResponse) Encode

func (r *StopReplicaResponse) Encode(e PacketEncoder) (err error)

type StopReplicaResponsePartition

type StopReplicaResponsePartition struct {
	Topic     string
	Partition int32
	ErrorCode int16
}

type SyncGroupRequest

type SyncGroupRequest struct {
	GroupID          string
	GenerationID     int32
	MemberID         string
	GroupAssignments map[string][]byte
}

func (*SyncGroupRequest) Decode

func (r *SyncGroupRequest) Decode(d PacketDecoder) (err error)

func (*SyncGroupRequest) Encode

func (r *SyncGroupRequest) Encode(e PacketEncoder) error

func (*SyncGroupRequest) Key

func (r *SyncGroupRequest) Key() int16

func (*SyncGroupRequest) Version

func (r *SyncGroupRequest) Version() int16

type SyncGroupResponse

type SyncGroupResponse struct {
	ErrorCode        int16
	MemberAssignment []byte
}

func (*SyncGroupResponse) Decode

func (r *SyncGroupResponse) Decode(d PacketDecoder) (err error)

func (*SyncGroupResponse) Encode

func (r *SyncGroupResponse) Encode(e PacketEncoder) error

func (*SyncGroupResponse) Key

func (r *SyncGroupResponse) Key() int16

func (*SyncGroupResponse) Version

func (r *SyncGroupResponse) Version() int16

type TopicData

type TopicData struct {
	Topic string
	Data  []*Data
}

type TopicErrorCode

type TopicErrorCode struct {
	Topic     string
	ErrorCode int16
}

type TopicMetadata

type TopicMetadata struct {
	TopicErrorCode    int16
	Topic             string
	PartitionMetadata []*PartitionMetadata
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL