client

package
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 28, 2023 License: MIT Imports: 13 Imported by: 0

README

kaproxy client

install

go get github.com/bitleak/kaproxy

Usage

import "github.com/bitleak/kaproxy/client"

c := client.NewKaproxyClient(host, port, token)

c = c.WithTimeout(5 * time.Second) // optional; returns a *KaproxyClient whose HTTP client uses the specified timeout

Producer
// produce a message; key and value are both []byte
msg := client.Message{key, value}

resp, err := c.Produce(topic, msg)
if err != nil {
    panic(err)
}

// produce a message, message will be hashed to the partition according to the key
resp, err = c.ProduceWithHash(topic, msg)

// produce to specified partition
resp, err = c.ProduceWithPartition(topic, partition, msg)
Consumer
// AtMostOnce semantics
blockingTimeout := 3 * time.Second
resp, err := c.Consume(group, topic, blockingTimeout)

// AtLeastOnce semantics.
// After the user receives a message, if kaproxy has not received an ACK for it, the message is delivered again when the user sends a consume request after the ttr has elapsed.
// This usage is for AtLeastOnce groups only.

ttr := 30 * time.Second
resp, err := c.Consume(group, topic, blockingTimeout, ttr)
if err != nil {
    panic(err)
}

err = c.ACK(group, topic, resp)

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrNoMessage = &APIError{requestErr, "[204]no message in brokers", ""}

Functions

This section is empty.

Types

type APIError

type APIError struct {
	Type              ErrType
	Reason, RequestID string
}

func (*APIError) Error

func (e *APIError) Error() string

type ConsumeResp

type ConsumeResp struct {
	Partition int    `json:"partition"`
	Offset    int64  `json:"offset"`
	Key       []byte `json:"key"`
	Value     []byte `json:"value"`
}

func (*ConsumeResp) UnmarshalJSON

func (c *ConsumeResp) UnmarshalJSON(data []byte) error

type ErrType

type ErrType string

type KaproxyClient

type KaproxyClient struct {
	// contains filtered or unexported fields
}

func NewKaproxyClient

func NewKaproxyClient(host string, port int, token string) *KaproxyClient

func (*KaproxyClient) ACK

func (c *KaproxyClient) ACK(group, topic string, message *ConsumeResp) *APIError

func (*KaproxyClient) BatchProduce

func (c *KaproxyClient) BatchProduce(topic string, messages []Message) (int, *APIError)

func (*KaproxyClient) Consume

func (c *KaproxyClient) Consume(group, topic string, durationArg ...time.Duration) (*ConsumeResp, *APIError)

func (*KaproxyClient) Produce

func (c *KaproxyClient) Produce(topic string, message Message) (*ProduceResp, *APIError)

func (*KaproxyClient) ProduceWithHash

func (c *KaproxyClient) ProduceWithHash(topic string, message Message) (*ProduceResp, *APIError)

func (*KaproxyClient) ProduceWithPartition

func (c *KaproxyClient) ProduceWithPartition(topic string, partition int, message Message) (*ProduceResp, *APIError)

func (*KaproxyClient) WithHttpClient

func (c *KaproxyClient) WithHttpClient(httpCli *http.Client, blockingHttpCli *http.Client) *KaproxyClient

WithHttpClient allows the user to supply custom HTTP clients.

func (*KaproxyClient) WithTimeout

func (c *KaproxyClient) WithTimeout(timeout time.Duration) *KaproxyClient

WithTimeout returns a pointer to a KaproxyClient whose HTTP client's timeout is set to the specified value.

func (*KaproxyClient) WithoutReplicate

func (c *KaproxyClient) WithoutReplicate() *KaproxyClient

For airbus only

type Message

type Message struct {
	Key   []byte
	Value []byte
}

type ProduceResp

type ProduceResp struct {
	Partition int   `json:"partition"`
	Offset    int64 `json:"offset"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL