kafka: github.com/optiopay/kafka/proto

package proto

import "github.com/optiopay/kafka/proto"

Package proto provides an implementation of the Kafka binary protocol.

Index

Package Files

doc.go errors.go messages.go serialization.go snappy.go utils.go

Constants

const (
    KafkaV0 int16 = iota
    KafkaV1
    KafkaV2
    KafkaV3
    KafkaV4
    KafkaV5
)
const (
    ProduceReqKind          = 0
    FetchReqKind            = 1
    OffsetReqKind           = 2
    MetadataReqKind         = 3
    OffsetCommitReqKind     = 8
    OffsetFetchReqKind      = 9
    ConsumerMetadataReqKind = 10
    APIVersionsReqKind      = 18
    CreateTopicsReqKind     = 19
)
const (

    // receive the latest offset (i.e. the offset of the next message that
    // will be appended)
    OffsetReqTimeLatest = -1

    // receive the earliest available offset. Note that because offsets are
    // pulled in descending order, asking for the earliest offset will always
    // return a single element.
    OffsetReqTimeEarliest = -2

    // Server will not send any response.
    RequiredAcksNone = 0

    // Server will block until the message is committed by all in sync replicas
    // before sending a response.
    RequiredAcksAll = -1

    // Server will wait until the data is written to the local log before
    // sending a response.
    RequiredAcksLocal = 1
)
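
A minimal sketch of how these constants combine with the request types defined below; the broker-facing plumbing is omitted and the topic name is a placeholder:

    package main

    import (
        "fmt"

        "github.com/optiopay/kafka/proto"
    )

    func main() {
        // Ask for the latest offset of a single partition. ReplicaID is -1
        // for ordinary clients; MaxOffsets is honored by KafkaV0 only.
        req := &proto.OffsetReq{
            ReplicaID: -1,
            Topics: []proto.OffsetReqTopic{{
                Name: "my-topic", // placeholder
                Partitions: []proto.OffsetReqPartition{{
                    ID:         0,
                    TimeMs:     proto.OffsetReqTimeLatest,
                    MaxOffsets: 1,
                }},
            }},
        }
        b, err := req.Bytes()
        if err != nil {
            panic(err)
        }
        fmt.Printf("offset request: %d bytes\n", len(b))
    }
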
const (
    CorrelationTypeGroup       int8 = 0
    CorrelationTypeTransaction      = 1
)

Variables

var (
    ErrUnknown                                 = &KafkaError{-1, "unknown error"}
    ErrOffsetOutOfRange                        = &KafkaError{1, "offset out of range"}
    ErrInvalidMessage                          = &KafkaError{2, "invalid message"}
    ErrUnknownTopicOrPartition                 = &KafkaError{3, "unknown topic or partition"}
    ErrInvalidMessageSize                      = &KafkaError{4, "invalid message size"}
    ErrLeaderNotAvailable                      = &KafkaError{5, "leader not available"}
    ErrNotLeaderForPartition                   = &KafkaError{6, "not leader for partition"}
    ErrRequestTimeout                          = &KafkaError{7, "request timed out"}
    ErrBrokerNotAvailable                      = &KafkaError{8, "broker not available"}
    ErrReplicaNotAvailable                     = &KafkaError{9, "replica not available"}
    ErrMessageSizeTooLarge                     = &KafkaError{10, "message size too large"}
    ErrScaleControllerEpoch                    = &KafkaError{11, "scale controller epoch"}
    ErrOffsetMetadataTooLarge                  = &KafkaError{12, "offset metadata too large"}
    ErrNetwork                                 = &KafkaError{13, "server disconnected before response was received"}
    ErrOffsetLoadInProgress                    = &KafkaError{14, "offsets load in progress"}
    ErrNoCoordinator                           = &KafkaError{15, "consumer coordinator not available"}
    ErrNotCoordinator                          = &KafkaError{16, "not coordinator for consumer"}
    ErrInvalidTopic                            = &KafkaError{17, "operation on an invalid topic"}
    ErrRecordListTooLarge                      = &KafkaError{18, "message batch larger than the configured segment size"}
    ErrNotEnoughReplicas                       = &KafkaError{19, "not enough in-sync replicas"}
    ErrNotEnoughReplicasAfterAppend            = &KafkaError{20, "messages are written to the log, but to fewer in-sync replicas than required"}
    ErrInvalidRequiredAcks                     = &KafkaError{21, "invalid value for required acks"}
    ErrIllegalGeneration                       = &KafkaError{22, "consumer generation id is not valid"}
    ErrInconsistentPartitionAssignmentStrategy = &KafkaError{23, "partition assignment strategy does not match that of the group"}
    ErrUnknownParititonAssignmentStrategy      = &KafkaError{24, "partition assignment strategy is unknown to the broker"}
    ErrUnknownConsumerID                       = &KafkaError{25, "coordinator is not aware of this consumer"}
    ErrInvalidSessionTimeout                   = &KafkaError{26, "invalid session timeout"}
    ErrRebalanceInProgress                     = &KafkaError{27, "group is rebalancing, so a rejoin is needed"}
    ErrInvalidCommitOffsetSize                 = &KafkaError{28, "offset data size is not valid"}
    ErrTopicAuthorizationFailed                = &KafkaError{29, "topic authorization failed"}
    ErrGroupAuthorizationFailed                = &KafkaError{30, "group authorization failed"}
    ErrClusterAuthorizationFailed              = &KafkaError{31, "cluster authorization failed"}
    ErrInvalidTimeStamp                        = &KafkaError{32, "timestamp of the message is out of acceptable range"}
    ErrUnsupportedSaslMechanism                = &KafkaError{33, "The broker does not support the requested SASL mechanism."}
    ErrIllegalSaslState                        = &KafkaError{34, "Request is not valid given the current SASL state."}
    ErrUnsupportedVersion                      = &KafkaError{35, "The version of API is not supported."}
    ErrTopicAlreadyExists                      = &KafkaError{36, "Topic with this name already exists."}
    ErrInvalidPartitions                       = &KafkaError{37, "Number of partitions is invalid."}
    ErrInvalidReplicationFactor                = &KafkaError{38, "Replication-factor is invalid."}
    ErrInvalidReplicaAssignment                = &KafkaError{39, "Replica assignment is invalid."}
    ErrInvalidConfig                           = &KafkaError{40, "Configuration is invalid."}
    ErrNotController                           = &KafkaError{41, "This is not the correct controller for this cluster."}
    ErrInvalidRequest                          = &KafkaError{42, "" /* 173 byte string literal not displayed */}
    ErrUnsupportedForMessageFormat             = &KafkaError{43, "The message format version on the broker does not support the request."}
    ErrPolicyViolation                         = &KafkaError{44, "Request parameters do not satisfy the configured policy."}
    ErrOutOfOrderSequenceNumber                = &KafkaError{45, "The broker received an out of order sequence number"}
    ErrDuplicateSequenceNumber                 = &KafkaError{46, "The broker received a duplicate sequence number"}
    ErrInvalidProducerEpoch                    = &KafkaError{47, "" /* 178 byte string literal not displayed */}
    ErrInvalidTxnState                         = &KafkaError{48, "The producer attempted a transactional operation in an invalid state"}
    ErrInvalidProducerIdMapping                = &KafkaError{49, "The producer attempted to use a producer id which is not currently assigned to its transactional id"}
    ErrInvalidTransactionTimeout               = &KafkaError{50, "The transaction timeout is larger than the maximum value allowed by the broker (as configured by transaction.max.timeout.ms)."}
    ErrConcurrentTransactions                  = &KafkaError{51, "The producer attempted to update a transaction while another concurrent operation on the same transaction was ongoing"}
    ErrTransactionCoordinatorFenced            = &KafkaError{52, "Indicates that the transaction coordinator sending a WriteTxnMarker is no longer the current coordinator for a given producer"}
    ErrTransactionalIdAuthorizationFailed      = &KafkaError{53, "Transactional Id authorization failed"}
    ErrSecurityDisabled                        = &KafkaError{54, "Security features are disabled."}
    ErrOperationNotAttempted                   = &KafkaError{55, "" /* 186 byte string literal not displayed */}
    ErrKafkaStorageError                       = &KafkaError{56, "Disk error when trying to access log file on the disk."}
    ErrLogDirNotFound                          = &KafkaError{57, "The user-specified log directory is not found in the broker config."}
    ErrSaslAuthenticationFailed                = &KafkaError{58, "SASL Authentication failed."}
    ErrUnknownProducerId                       = &KafkaError{59, "" /* 411 byte string literal not displayed */}
    ErrReassignmentInProgress                  = &KafkaError{60, "A partition reassignment is in progress"}
    ErrDelegationTokenAuthDisabled             = &KafkaError{61, "Delegation Token feature is not enabled."}
    ErrDelegationTokenNotFound                 = &KafkaError{62, "Delegation Token is not found on server."}
    ErrDelegationTokenOwnerMismatch            = &KafkaError{63, "Specified Principal is not valid Owner/Renewer."}
    ErrDelegationTokenRequestNotAllowed        = &KafkaError{64, "Delegation Token requests are not allowed on PLAINTEXT/1-way SSL channels and on delegation token authenticated channels."}
    ErrDelegationTokenAuthorizationFailed      = &KafkaError{65, "Delegation Token authorization failed."}
    ErrDelegationTokenExpired                  = &KafkaError{66, "Delegation Token is expired."}
    ErrInvalidPrincipalType                    = &KafkaError{67, "Supplied principalType is not supported"}
    ErrNonEmptyGroup                           = &KafkaError{68, "The group is not empty"}
    ErrGroupIdNotFound                         = &KafkaError{69, "The group id does not exist"}
    ErrFetchSessionIdNotFound                  = &KafkaError{70, "The fetch session ID was not found"}
    ErrInvalidFetchSessionEpoch                = &KafkaError{71, "The fetch session epoch is invalid"}
)
var ErrInvalidArrayLen = errors.New("invalid array length")
var ErrNotEnoughData = errors.New("not enough data")
var SupportedByDriver = map[int16]SupportedVersion{
    ProduceReqKind:          SupportedVersion{MinVersion: KafkaV0, MaxVersion: KafkaV2},
    FetchReqKind:            SupportedVersion{MinVersion: KafkaV0, MaxVersion: KafkaV5},
    OffsetReqKind:           SupportedVersion{MinVersion: KafkaV0, MaxVersion: KafkaV2},
    MetadataReqKind:         SupportedVersion{MinVersion: KafkaV0, MaxVersion: KafkaV5},
    OffsetCommitReqKind:     SupportedVersion{MinVersion: KafkaV0, MaxVersion: KafkaV3},
    OffsetFetchReqKind:      SupportedVersion{MinVersion: KafkaV0, MaxVersion: KafkaV3},
    ConsumerMetadataReqKind: SupportedVersion{MinVersion: KafkaV0, MaxVersion: KafkaV1},
    APIVersionsReqKind:      SupportedVersion{MinVersion: KafkaV0, MaxVersion: KafkaV1},
}
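
A client can consult this map before choosing a request version; a minimal sketch:

    package main

    import (
        "fmt"

        "github.com/optiopay/kafka/proto"
    )

    func main() {
        // Look up the version range the driver supports for Fetch requests.
        v := proto.SupportedByDriver[proto.FetchReqKind]
        fmt.Printf("Fetch supported: v%d through v%d\n", v.MinVersion, v.MaxVersion)
    }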

func ComputeCrc

func ComputeCrc(m *Message, compression Compression) uint32

ComputeCrc returns the CRC32 checksum of the given message content.
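
A minimal sketch computing the checksum of an uncompressed message:

    package main

    import (
        "fmt"

        "github.com/optiopay/kafka/proto"
    )

    func main() {
        msg := &proto.Message{Key: []byte("key"), Value: []byte("value")}
        // The checksum covers the message content as serialized on the wire.
        fmt.Printf("crc32: %#x\n", proto.ComputeCrc(msg, proto.CompressionNone))
    }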

func ConfigureParser

func ConfigureParser(c ParserConfig) error

ConfigureParser configures the parser. It must be called prior to parsing any messages, as the structure is currently not prepared for concurrent access.
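
A minimal sketch, run once at startup before any fetch response is parsed:

    package main

    import "github.com/optiopay/kafka/proto"

    func main() {
        // Opt in to the faster MessageSet parsing mode described under
        // ParserConfig below; safe only before parsing begins.
        if err := proto.ConfigureParser(proto.ParserConfig{
            SimplifiedMessageSetParsing: true,
        }); err != nil {
            panic(err)
        }
    }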

func NewDecoder

func NewDecoder(r io.Reader) *decoder

func NewEncoder

func NewEncoder(w io.Writer) *encoder

func ReadReq

func ReadReq(r io.Reader) (requestKind int16, b []byte, err error)

ReadReq returns the request kind ID and the byte representation of the whole message, in wire protocol format.
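
A broker-side sketch: read one request off a connection and dispatch on its kind. The listener setup is an assumption, and the raw bytes are assumed to be re-parsable from the start, mirroring ReadResp below:

    package main

    import (
        "bytes"
        "fmt"
        "net"

        "github.com/optiopay/kafka/proto"
    )

    func main() {
        ln, err := net.Listen("tcp", "localhost:9092") // placeholder address
        if err != nil {
            panic(err)
        }
        conn, err := ln.Accept()
        if err != nil {
            panic(err)
        }
        defer conn.Close()

        kind, raw, err := proto.ReadReq(conn)
        if err != nil {
            panic(err)
        }
        // Dispatch on the request kind and re-parse the captured bytes.
        if kind == proto.FetchReqKind {
            req, err := proto.ReadFetchReq(bytes.NewReader(raw))
            if err != nil {
                panic(err)
            }
            fmt.Println("fetch request from client:", req.GetClientID())
        }
    }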

func ReadResp

func ReadResp(r io.Reader) (correlationID int32, b []byte, err error)

ReadResp returns the message correlation ID and the byte representation of the whole message read from the given stream, in wire protocol format and including the 4 bytes of the message size itself. The byte representation returned by ReadResp can be parsed by all response readers to transform it into a specialized response structure.
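
A client-side sketch pairing ReadResp with one of the versioned readers; the connection is assumed to have a metadata response pending (the request side is omitted):

    package main

    import (
        "bytes"
        "fmt"
        "net"

        "github.com/optiopay/kafka/proto"
    )

    func main() {
        conn, err := net.Dial("tcp", "localhost:9092") // placeholder address
        if err != nil {
            panic(err)
        }
        defer conn.Close()

        correlationID, raw, err := proto.ReadResp(conn)
        if err != nil {
            panic(err)
        }
        // raw holds the complete wire message, so a response reader can
        // consume it again from the start.
        resp, err := proto.ReadVersionedMetadataResp(bytes.NewReader(raw), proto.KafkaV0)
        if err != nil {
            panic(err)
        }
        fmt.Println(correlationID, "brokers:", len(resp.Brokers))
    }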

func SetCorrelationID

func SetCorrelationID(header *RequestHeader, correlationID int32)

func SetVersion

func SetVersion(header *RequestHeader, version int16)
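
SetCorrelationID and SetVersion stamp the unexported header fields before a request is written out; a minimal sketch (topic and client ID are placeholders):

    package main

    import (
        "bytes"
        "fmt"

        "github.com/optiopay/kafka/proto"
    )

    func main() {
        req := &proto.MetadataReq{Topics: []string{"my-topic"}}
        proto.SetVersion(&req.RequestHeader, proto.KafkaV1)
        proto.SetCorrelationID(&req.RequestHeader, 42)
        req.SetClientID("example-client")

        var buf bytes.Buffer
        if _, err := req.WriteTo(&buf); err != nil {
            panic(err)
        }
        fmt.Printf("wire message: %d bytes\n", buf.Len())
    }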

type APIVersionsReq

type APIVersionsReq struct {
    RequestHeader
}

func ReadAPIVersionsReq

func ReadAPIVersionsReq(r io.Reader) (*APIVersionsReq, error)

func (*APIVersionsReq) Bytes

func (r *APIVersionsReq) Bytes() ([]byte, error)

func (APIVersionsReq) Kind

func (r APIVersionsReq) Kind() int16

func (*APIVersionsReq) WriteTo

func (r *APIVersionsReq) WriteTo(w io.Writer) (int64, error)

type APIVersionsResp

type APIVersionsResp struct {
    Version       int16
    CorrelationID int32
    APIVersions   []SupportedVersion
    ThrottleTime  time.Duration
}

func ReadAPIVersionsResp

func ReadAPIVersionsResp(r io.Reader) (*APIVersionsResp, error)

func ReadVersionedAPIVersionsResp

func ReadVersionedAPIVersionsResp(r io.Reader, version int16) (*APIVersionsResp, error)

func (*APIVersionsResp) Bytes

func (r *APIVersionsResp) Bytes() ([]byte, error)

type Compression

type Compression int8
const (
    CompressionNone   Compression = 0
    CompressionGzip   Compression = 1
    CompressionSnappy Compression = 2
)

type ConfigEntry

type ConfigEntry struct {
    ConfigName  string
    ConfigValue string
}

type ConsumerMetadataReq

type ConsumerMetadataReq struct {
    RequestHeader
    ConsumerGroup   string
    CoordinatorType int8 // >= KafkaV1
}

func ReadConsumerMetadataReq

func ReadConsumerMetadataReq(r io.Reader) (*ConsumerMetadataReq, error)

func (*ConsumerMetadataReq) Bytes

func (r *ConsumerMetadataReq) Bytes() ([]byte, error)

func (ConsumerMetadataReq) Kind

func (r ConsumerMetadataReq) Kind() int16

func (*ConsumerMetadataReq) WriteTo

func (r *ConsumerMetadataReq) WriteTo(w io.Writer) (int64, error)

type ConsumerMetadataResp

type ConsumerMetadataResp struct {
    Version         int16
    CorrelationID   int32
    ThrottleTime    time.Duration // >= KafkaV1
    Err             error
    ErrMsg          string // >= KafkaV1
    CoordinatorID   int32
    CoordinatorHost string
    CoordinatorPort int32
}

func ReadConsumerMetadataResp

func ReadConsumerMetadataResp(r io.Reader) (*ConsumerMetadataResp, error)

func ReadVersionedConsumerMetadataResp

func ReadVersionedConsumerMetadataResp(r io.Reader, version int16) (*ConsumerMetadataResp, error)

func (*ConsumerMetadataResp) Bytes

func (r *ConsumerMetadataResp) Bytes() ([]byte, error)

type CreateTopicsReq

type CreateTopicsReq struct {
    RequestHeader
    CreateTopicsRequests []TopicInfo
    Timeout              time.Duration
    ValidateOnly         bool
}
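
A minimal sketch building a create-topics request (names and sizing are placeholders); ValidateOnly asks the broker to check the request without creating anything:

    package main

    import (
        "fmt"
        "time"

        "github.com/optiopay/kafka/proto"
    )

    func main() {
        req := &proto.CreateTopicsReq{
            Timeout: 10 * time.Second,
            CreateTopicsRequests: []proto.TopicInfo{{
                Topic:             "my-topic",
                NumPartitions:     3,
                ReplicationFactor: 2,
                ConfigEntries: []proto.ConfigEntry{
                    {ConfigName: "retention.ms", ConfigValue: "86400000"},
                },
            }},
            ValidateOnly: true, // dry run: validate without creating
        }
        b, err := req.Bytes()
        if err != nil {
            panic(err)
        }
        fmt.Printf("create-topics request: %d bytes\n", len(b))
    }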

func ReadCreateTopicsReq

func ReadCreateTopicsReq(r io.Reader) (*CreateTopicsReq, error)

func (*CreateTopicsReq) Bytes

func (r *CreateTopicsReq) Bytes() ([]byte, error)

func (CreateTopicsReq) Kind

func (r CreateTopicsReq) Kind() int16

func (*CreateTopicsReq) WriteTo

func (r *CreateTopicsReq) WriteTo(w io.Writer) (int64, error)

type CreateTopicsResp

type CreateTopicsResp struct {
    Version       int16
    CorrelationID int32
    TopicErrors   []TopicError
    ThrottleTime  time.Duration // >= KafkaV2
}

func ReadCreateTopicsResp

func ReadCreateTopicsResp(r io.Reader) (*CreateTopicsResp, error)

func ReadVersionedCreateTopicsResp

func ReadVersionedCreateTopicsResp(r io.Reader, version int16) (*CreateTopicsResp, error)

func (*CreateTopicsResp) Bytes

func (r *CreateTopicsResp) Bytes() ([]byte, error)

type FetchReq

type FetchReq struct {
    RequestHeader
    ReplicaID      int32
    MaxWaitTime    time.Duration
    MinBytes       int32
    MaxBytes       int32 // >= KafkaV3
    IsolationLevel int8  // >= KafkaV4

    Topics []FetchReqTopic
}
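
A minimal sketch of a fetch request for one partition; the topic is a placeholder and the version-gated fields are left at their zero values:

    package main

    import (
        "fmt"
        "time"

        "github.com/optiopay/kafka/proto"
    )

    func main() {
        req := &proto.FetchReq{
            ReplicaID:   -1, // ordinary consumers send -1
            MaxWaitTime: 500 * time.Millisecond,
            MinBytes:    1,
            Topics: []proto.FetchReqTopic{{
                Name: "my-topic",
                Partitions: []proto.FetchReqPartition{{
                    ID:          0,
                    FetchOffset: 0,
                    MaxBytes:    1 << 20, // per-partition cap
                }},
            }},
        }
        b, err := req.Bytes()
        if err != nil {
            panic(err)
        }
        fmt.Printf("fetch request: %d bytes\n", len(b))
    }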

func ReadFetchReq

func ReadFetchReq(r io.Reader) (*FetchReq, error)

func (*FetchReq) Bytes

func (r *FetchReq) Bytes() ([]byte, error)

func (FetchReq) Kind

func (r FetchReq) Kind() int16

func (*FetchReq) WriteTo

func (r *FetchReq) WriteTo(w io.Writer) (int64, error)

type FetchReqPartition

type FetchReqPartition struct {
    ID             int32
    FetchOffset    int64
    LogStartOffset int64 // >= KafkaV5
    MaxBytes       int32
}

type FetchReqTopic

type FetchReqTopic struct {
    Name       string
    Partitions []FetchReqPartition
}

type FetchResp

type FetchResp struct {
    Version       int16
    CorrelationID int32
    ThrottleTime  time.Duration
    Topics        []FetchRespTopic
}

func ReadFetchResp

func ReadFetchResp(r io.Reader) (*FetchResp, error)

func ReadVersionedFetchResp

func ReadVersionedFetchResp(r io.Reader, version int16) (*FetchResp, error)

func (*FetchResp) Bytes

func (r *FetchResp) Bytes() ([]byte, error)

type FetchRespAbortedTransaction

type FetchRespAbortedTransaction struct {
    ProducerID  int64
    FirstOffset int64
}

type FetchRespPartition

type FetchRespPartition struct {
    ID                  int32
    Err                 error
    TipOffset           int64
    LastStableOffset    int64
    LogStartOffset      int64
    AbortedTransactions []FetchRespAbortedTransaction
    Messages            []*Message
    MessageVersion      MessageVersion
    RecordBatches       []*RecordBatch
}

type FetchRespTopic

type FetchRespTopic struct {
    Name       string
    Partitions []FetchRespPartition
}

type KafkaError

type KafkaError struct {
    // contains filtered or unexported fields
}

func (*KafkaError) Errno

func (err *KafkaError) Errno() int

func (*KafkaError) Error

func (err *KafkaError) Error() string
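
Since the exported Err* values are singletons, response errors can be compared directly, or unpacked for the raw Kafka error code; a minimal sketch:

    package main

    import (
        "fmt"

        "github.com/optiopay/kafka/proto"
    )

    func inspect(err error) {
        // Direct comparison against the exported error values.
        if err == proto.ErrNotLeaderForPartition {
            fmt.Println("stale leadership; refresh metadata and retry")
            return
        }
        // Any KafkaError exposes the raw Kafka error code via Errno.
        if kerr, ok := err.(*proto.KafkaError); ok {
            fmt.Printf("kafka error %d: %s\n", kerr.Errno(), kerr)
        }
    }

    func main() {
        inspect(proto.ErrNotLeaderForPartition)
        inspect(proto.ErrOffsetOutOfRange)
    }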

type Message

type Message struct {
    Key       []byte
    Value     []byte
    Offset    int64  // set when fetching and after successful producing
    Crc       uint32 // set when fetching, ignored when producing
    Topic     string // set when fetching, ignored when producing
    Partition int32  // set when fetching, ignored when producing
    TipOffset int64  // set when fetching, ignored when producing
}

Message represents a single entry of a message set.
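
When producing, only Key and Value need to be set; the remaining fields are filled in on the fetch path. A minimal sketch wrapping one message in a produce request (topic and partition are placeholders):

    package main

    import (
        "fmt"
        "time"

        "github.com/optiopay/kafka/proto"
    )

    func main() {
        req := &proto.ProduceReq{
            Compression:  proto.CompressionNone,
            RequiredAcks: proto.RequiredAcksAll,
            Timeout:      5 * time.Second,
            Topics: []proto.ProduceReqTopic{{
                Name: "my-topic",
                Partitions: []proto.ProduceReqPartition{{
                    ID: 0,
                    Messages: []*proto.Message{
                        {Key: []byte("k"), Value: []byte("hello")},
                    },
                }},
            }},
        }
        b, err := req.Bytes()
        if err != nil {
            panic(err)
        }
        fmt.Printf("produce request: %d bytes\n", len(b))
    }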

type MessageVersion

type MessageVersion int8

MessageVersion defines which message format is used in a particular Produce request or Fetch response. MessageV0 and MessageV1 indicate use of a MessageSet; MessageV2 indicates use of a RecordBatch. See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets

const MessageV0 MessageVersion = 0
const MessageV1 MessageVersion = 1
const MessageV2 MessageVersion = 2

type MetadataReq

type MetadataReq struct {
    RequestHeader
    Topics                 []string
    AllowAutoTopicCreation bool // >= KafkaV4 only
}

func ReadMetadataReq

func ReadMetadataReq(r io.Reader) (*MetadataReq, error)

func (*MetadataReq) Bytes

func (r *MetadataReq) Bytes() ([]byte, error)

func (MetadataReq) Kind

func (r MetadataReq) Kind() int16

func (*MetadataReq) WriteTo

func (r *MetadataReq) WriteTo(w io.Writer) (int64, error)

type MetadataResp

type MetadataResp struct {
    Version       int16
    CorrelationID int32
    ThrottleTime  time.Duration // >= KafkaV3
    Brokers       []MetadataRespBroker
    ClusterID     string // >= KafkaV2
    ControllerID  int32  // >= KafkaV1
    Topics        []MetadataRespTopic
}

func ReadMetadataResp

func ReadMetadataResp(r io.Reader) (*MetadataResp, error)

func ReadVersionedMetadataResp

func ReadVersionedMetadataResp(r io.Reader, version int16) (*MetadataResp, error)

func (*MetadataResp) Bytes

func (r *MetadataResp) Bytes() ([]byte, error)

type MetadataRespBroker

type MetadataRespBroker struct {
    NodeID int32
    Host   string
    Port   int32
    Rack   string // >= KafkaV1
}

type MetadataRespPartition

type MetadataRespPartition struct {
    Err             error
    ID              int32
    Leader          int32
    Replicas        []int32
    Isrs            []int32
    OfflineReplicas []int32
}

type MetadataRespTopic

type MetadataRespTopic struct {
    Name       string
    Err        error
    IsInternal bool // >= KafkaV1
    Partitions []MetadataRespPartition
}

type OffsetCommitReq

type OffsetCommitReq struct {
    RequestHeader
    ConsumerGroup     string
    GroupGenerationID int32  // >= KafkaV1 only
    MemberID          string // >= KafkaV1 only
    RetentionTime     int64  // >= KafkaV2 only
    Topics            []OffsetCommitReqTopic
}

func ReadOffsetCommitReq

func ReadOffsetCommitReq(r io.Reader) (*OffsetCommitReq, error)

func (*OffsetCommitReq) Bytes

func (r *OffsetCommitReq) Bytes() ([]byte, error)

func (OffsetCommitReq) Kind

func (r OffsetCommitReq) Kind() int16

func (*OffsetCommitReq) WriteTo

func (r *OffsetCommitReq) WriteTo(w io.Writer) (int64, error)

type OffsetCommitReqPartition

type OffsetCommitReqPartition struct {
    ID        int32
    Offset    int64
    TimeStamp time.Time // == KafkaV1 only
    Metadata  string
}

type OffsetCommitReqTopic

type OffsetCommitReqTopic struct {
    Name       string
    Partitions []OffsetCommitReqPartition
}

type OffsetCommitResp

type OffsetCommitResp struct {
    Version       int16
    CorrelationID int32
    ThrottleTime  time.Duration // >= KafkaV3 only
    Topics        []OffsetCommitRespTopic
}

func ReadOffsetCommitResp

func ReadOffsetCommitResp(r io.Reader) (*OffsetCommitResp, error)

func ReadVersionedOffsetCommitResp

func ReadVersionedOffsetCommitResp(r io.Reader, version int16) (*OffsetCommitResp, error)

func (*OffsetCommitResp) Bytes

func (r *OffsetCommitResp) Bytes() ([]byte, error)

type OffsetCommitRespPartition

type OffsetCommitRespPartition struct {
    ID  int32
    Err error
}

type OffsetCommitRespTopic

type OffsetCommitRespTopic struct {
    Name       string
    Partitions []OffsetCommitRespPartition
}

type OffsetFetchReq

type OffsetFetchReq struct {
    RequestHeader
    ConsumerGroup string
    Topics        []OffsetFetchReqTopic
}

func ReadOffsetFetchReq

func ReadOffsetFetchReq(r io.Reader) (*OffsetFetchReq, error)

func (*OffsetFetchReq) Bytes

func (r *OffsetFetchReq) Bytes() ([]byte, error)

func (OffsetFetchReq) Kind

func (r OffsetFetchReq) Kind() int16

func (*OffsetFetchReq) WriteTo

func (r *OffsetFetchReq) WriteTo(w io.Writer) (int64, error)

type OffsetFetchReqTopic

type OffsetFetchReqTopic struct {
    Name       string
    Partitions []int32
}

type OffsetFetchResp

type OffsetFetchResp struct {
    Version       int16
    CorrelationID int32
    ThrottleTime  time.Duration // >= KafkaV3
    Topics        []OffsetFetchRespTopic
    Err           error // >= KafkaV2
}

func ReadOffsetFetchResp

func ReadOffsetFetchResp(r io.Reader) (*OffsetFetchResp, error)

func ReadVersionedOffsetFetchResp

func ReadVersionedOffsetFetchResp(r io.Reader, version int16) (*OffsetFetchResp, error)

func (*OffsetFetchResp) Bytes

func (r *OffsetFetchResp) Bytes() ([]byte, error)

type OffsetFetchRespPartition

type OffsetFetchRespPartition struct {
    ID       int32
    Offset   int64
    Metadata string
    Err      error
}

type OffsetFetchRespTopic

type OffsetFetchRespTopic struct {
    Name       string
    Partitions []OffsetFetchRespPartition
}

type OffsetReq

type OffsetReq struct {
    RequestHeader
    ReplicaID      int32
    IsolationLevel int8
    Topics         []OffsetReqTopic
}

func ReadOffsetReq

func ReadOffsetReq(r io.Reader) (*OffsetReq, error)

func (*OffsetReq) Bytes

func (r *OffsetReq) Bytes() ([]byte, error)

func (OffsetReq) Kind

func (r OffsetReq) Kind() int16

func (*OffsetReq) WriteTo

func (r *OffsetReq) WriteTo(w io.Writer) (int64, error)

type OffsetReqPartition

type OffsetReqPartition struct {
    ID         int32
    TimeMs     int64 // cannot be time.Time because of negative values
    MaxOffsets int32 // == KafkaV0 only
}

type OffsetReqTopic

type OffsetReqTopic struct {
    Name       string
    Partitions []OffsetReqPartition
}

type OffsetResp

type OffsetResp struct {
    Version       int16
    CorrelationID int32
    ThrottleTime  time.Duration
    Topics        []OffsetRespTopic
}

func ReadOffsetResp

func ReadOffsetResp(r io.Reader) (*OffsetResp, error)

func ReadVersionedOffsetResp

func ReadVersionedOffsetResp(r io.Reader, version int16) (*OffsetResp, error)

func (*OffsetResp) Bytes

func (r *OffsetResp) Bytes() ([]byte, error)

type OffsetRespPartition

type OffsetRespPartition struct {
    ID        int32
    Err       error
    TimeStamp time.Time // >= KafkaV1 only
    Offsets   []int64   // used in KafkaV0
}

type OffsetRespTopic

type OffsetRespTopic struct {
    Name       string
    Partitions []OffsetRespPartition
}

type ParserConfig

type ParserConfig struct {
    // SimplifiedMessageSetParsing enables a simplified version of the
    // MessageSet parser which will not split a MessageSet into slices of
    // Message structures. Instead, the entire MessageSet is read over as a
    // whole. This mode improves parsing speed due to reduced memory reads,
    // at the cost of not providing access to the message payload after
    // parsing.
    SimplifiedMessageSetParsing bool
}

ParserConfig is optional configuration for the parser. It can be set via ConfigureParser.

type ProduceReq

type ProduceReq struct {
    RequestHeader
    Compression     Compression // only used when sending ProduceReqs
    TransactionalID string
    RequiredAcks    int16
    Timeout         time.Duration
    Topics          []ProduceReqTopic
}

func ReadProduceReq

func ReadProduceReq(r io.Reader) (*ProduceReq, error)

func (*ProduceReq) Bytes

func (r *ProduceReq) Bytes() ([]byte, error)

func (ProduceReq) Kind

func (r ProduceReq) Kind() int16

func (*ProduceReq) WriteTo

func (r *ProduceReq) WriteTo(w io.Writer) (int64, error)

type ProduceReqPartition

type ProduceReqPartition struct {
    ID       int32
    Messages []*Message
}

type ProduceReqTopic

type ProduceReqTopic struct {
    Name       string
    Partitions []ProduceReqPartition
}

type ProduceResp

type ProduceResp struct {
    Version       int16
    CorrelationID int32
    Topics        []ProduceRespTopic
    ThrottleTime  time.Duration
}

func ReadProduceResp

func ReadProduceResp(r io.Reader) (*ProduceResp, error)

func ReadVersionedProduceResp

func ReadVersionedProduceResp(r io.Reader, version int16) (*ProduceResp, error)

func (*ProduceResp) Bytes

func (r *ProduceResp) Bytes() ([]byte, error)

type ProduceRespPartition

type ProduceRespPartition struct {
    ID            int32
    Err           error
    Offset        int64
    LogAppendTime int64
}

type ProduceRespTopic

type ProduceRespTopic struct {
    Name       string
    Partitions []ProduceRespPartition
}

type Record

type Record struct {
    Length         int64
    Attributes     int8
    TimestampDelta int64
    OffsetDelta    int64
    Key            []byte
    Value          []byte
    Headers        []RecordHeader
}

type RecordBatch

type RecordBatch struct {
    FirstOffset          int64
    Length               int32
    PartitionLeaderEpoch int32
    Magic                int8
    CRC                  int32
    Attributes           int16
    LastOffsetDelta      int32
    FirstTimestamp       int64
    MaxTimestamp         int64
    ProducerId           int64
    ProducerEpoch        int16
    FirstSequence        int32
    Records              []*Record
}

func (*RecordBatch) Compression

func (rb *RecordBatch) Compression() Compression

type RecordHeader

type RecordHeader struct {
    Key   string
    Value []byte
}

type ReplicaAssignment

type ReplicaAssignment struct {
    Partition int32
    Replicas  []int32
}

type Request

type Request interface {
    Kind() int16
    GetHeader() *RequestHeader
    GetVersion() int16
    GetCorrelationID() int32
    GetClientID() string
    SetClientID(clientID string)
    io.WriterTo
    Bytes() ([]byte, error)
}
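
Every request type in this package satisfies Request, so header stamping and serialization can be written once; a hypothetical helper:

    package main

    import (
        "fmt"

        "github.com/optiopay/kafka/proto"
    )

    // marshal is a hypothetical helper: it stamps the header of any request
    // and returns its wire representation.
    func marshal(r proto.Request, correlationID int32, version int16) ([]byte, error) {
        proto.SetVersion(r.GetHeader(), version)
        proto.SetCorrelationID(r.GetHeader(), correlationID)
        return r.Bytes()
    }

    func main() {
        b, err := marshal(&proto.APIVersionsReq{}, 1, proto.KafkaV0)
        if err != nil {
            panic(err)
        }
        fmt.Printf("api-versions request: %d bytes\n", len(b))
    }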

type RequestHeader

type RequestHeader struct {
    ClientID string
    // contains filtered or unexported fields
}

func (*RequestHeader) GetClientID

func (h *RequestHeader) GetClientID() string

func (*RequestHeader) GetCorrelationID

func (h *RequestHeader) GetCorrelationID() int32

func (*RequestHeader) GetHeader

func (h *RequestHeader) GetHeader() *RequestHeader

func (*RequestHeader) GetVersion

func (h *RequestHeader) GetVersion() int16

func (*RequestHeader) SetClientID

func (h *RequestHeader) SetClientID(clientID string)

type SupportedVersion

type SupportedVersion struct {
    APIKey     int16
    MinVersion int16
    MaxVersion int16
}

type TopicError

type TopicError struct {
    Topic        string
    ErrorCode    int16
    ErrorMessage string // >= KafkaV1
    Err          error
}

type TopicInfo

type TopicInfo struct {
    Topic              string
    NumPartitions      int32
    ReplicationFactor  int16
    ReplicaAssignments []ReplicaAssignment
    ConfigEntries      []ConfigEntry
}

Package proto imports 12 packages and is imported by 42 packages. Updated 2019-05-23.