proxy

package module
v0.0.0-...-fd3e866 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 4, 2018 License: BSD-3-Clause Imports: 13 Imported by: 1

README

kafka-proxy · CircleCI Status GitHub license PRs Welcome

kafka-proxy - a simple wrapper around the Confluent Kafka REST Proxy, written in Go

Currently wraps producing Avro-encoded messages and consuming many kinds of messages.

Documentation

Index

Constants

View Source
// Consumer formats and offset-reset positions understood by this package.
const (
	// JSON is the JSON-formatted consumer.
	JSON = Format("json")
	// Binary is the binary-formatted consumer.
	Binary = Format("binary")
	// Avro is the Avro-formatted consumer.
	Avro = Format("avro")

	// Smallest is the offset that is oldest.
	// It is an untyped string constant, so it can be assigned to an Offset
	// or to a plain string.
	Smallest = "smallest"
	// Largest is the offset that is newest.
	// It is an untyped string constant, so it can be assigned to an Offset
	// or to a plain string.
	Largest = "largest"
)
View Source
// Names of the environment variables that configure the functional tests.
const (
	// TestURLEnvVar is the environment variable for the URL to run functional tests against.
	TestURLEnvVar = "KAFKA_PROXY_TEST_URL"
	// TestTopicEnvVar is the environment variable for the topic to run functional tests against.
	TestTopicEnvVar = "KAFKA_PROXY_TEST_TOPIC"
	// TestRequiredEnvVar is the environment variable that, if set to true, will make tests fail.
	// NOTE(review): presumably "fail instead of skip" when the functional
	// test environment is unavailable; confirm against HandleFunctionalTestError.
	TestRequiredEnvVar = "KAFKA_PROXY_TEST_REQUIRED"
)

Variables

This section is empty.

Functions

func ConsumeEndpointRequest

func ConsumeEndpointRequest(consumer *ConsumerEndpoint, topic string, format Format) (*http.Request, error)

ConsumeEndpointRequest will create a request for consumerURL/topics/<topic>

func ConsumeRequest

func ConsumeRequest(baseURL string, topic string, partition int32, offset int64, count int, format Format) (*http.Request, error)

ConsumeRequest builds the request for the /topics/<topic>/partitions/<partition>/messages route

func CreateConsumerRequest

func CreateConsumerRequest(baseURL string, group string, request *ConsumerRequest) (*http.Request, error)

CreateConsumerRequest will create a request for the /consumers/<group> endpoint

func DeleteConsumerEndpoint

func DeleteConsumerEndpoint(client HTTPClient, endpoint *ConsumerEndpoint) error

DeleteConsumerEndpoint will delete a previously created consumer endpoint

func GetFunctionalTestTopic

func GetFunctionalTestTopic(t *testing.T) string

GetFunctionalTestTopic returns the configured test topic

func GetFunctionalTestURL

func GetFunctionalTestURL(t *testing.T) string

GetFunctionalTestURL skips, fails, or returns the config variable passed in

func HandleFunctionalTestError

func HandleFunctionalTestError(t testing.TB, err error)

HandleFunctionalTestError will skip or fail based on whether KAFKA_PROXY_TEST_REQUIRED (TestRequiredEnvVar) is set

func IsFunctionalTestRequired

func IsFunctionalTestRequired() bool

IsFunctionalTestRequired returns whether KAFKA_PROXY_TEST_REQUIRED (TestRequiredEnvVar) is set

func ProduceRequest

func ProduceRequest(baseURL string, topic string, format Format, message *ProducerMessage) (*http.Request, error)

ProduceRequest creates the request for POST /topics/<topic> endpoint

Types

type ConsumerEndpoint

// ConsumerEndpoint is returned on a create.
type ConsumerEndpoint struct {
	// InstanceID is decoded from the "instance_id" JSON field.
	// Presumably the identifier of the created consumer instance; confirm against the REST proxy docs.
	InstanceID string `json:"instance_id"`
	// BaseURI is decoded from the "base_uri" JSON field.
	// Presumably the URI that later requests for this consumer are built on; confirm against callers.
	BaseURI    string `json:"base_uri"`
}

ConsumerEndpoint is returned on a create

func CreateConsumer

func CreateConsumer(client HTTPClient, baseURL string, group string, request *ConsumerRequest) (resp *ConsumerEndpoint, err error)

CreateConsumer will create a consumer on the proxy for later use

type ConsumerRequest

// ConsumerRequest is the meta information needed to create a consumer.
type ConsumerRequest struct {
	// Format is sent as the "format" JSON field (one of json, binary or avro).
	Format     Format `json:"format"`
	// Offset is sent as the "auto.offset.reset" JSON field (either smallest or largest).
	Offset     Offset `json:"auto.offset.reset"`
	// AutoCommit is sent as the "auto.commit.enable" JSON field.
	// NOTE(review): this is a string, not a bool; presumably "true"/"false", confirm.
	AutoCommit string `json:"auto.commit.enable"`
	// Name is sent as the "name" JSON field and is omitted when empty.
	Name       string `json:"name,omitempty"`
}

ConsumerRequest is the meta information needed to create a consumer

func NewConsumerRequest

func NewConsumerRequest(format Format, offset Offset) *ConsumerRequest

NewConsumerRequest creates a consumer request for the provided format and offset

type Format

// Format is one of json, binary or avro.
type Format string

Format is one of json, binary or avro

type HTTPClient

// HTTPClient is any client that can do an HTTP request.
// The standard library's *http.Client satisfies this interface.
type HTTPClient interface {
	// Do sends the request and returns the response or an error.
	Do(request *http.Request) (*http.Response, error)
}

HTTPClient is any client that can do a http request

type Message

// Message is a single kafka message.
type Message struct {
	// Key is the "key" JSON field, kept as raw JSON so decoding is left to the caller.
	Key       json.RawMessage `json:"key"`
	// Value is the "value" JSON field, kept as raw JSON so decoding is left to the caller.
	Value     json.RawMessage `json:"value"`
	// Partition is the "partition" JSON field.
	Partition int32           `json:"partition"`
	// Offset is the "offset" JSON field.
	Offset    int64           `json:"offset"`
}

Message is a single kafka message

func Consume

func Consume(client HTTPClient, baseURL string, topic string, partition int32, offset int64, count int, format Format) ([]*Message, error)

Consume takes a kafka location and consumes messages for it

func ConsumeEndpoint

func ConsumeEndpoint(client HTTPClient, endpoint *ConsumerEndpoint, topic string, format Format) ([]*Message, error)

ConsumeEndpoint will get the next messages off of the previously created consumer endpoint. The format must match the format the consumer was created with.

func WaitFor

func WaitFor(client HTTPClient, endpoint *ConsumerEndpoint, topic string, format Format, count int, timeout time.Duration) ([]*Message, error)

WaitFor will return all the messages up to the timeout or count

type Offset

// Offset is either smallest or largest.
type Offset string

Offset is either smallest or largest

type ProducerMessage

// ProducerMessage is the wrapper for the data to the kafka rest proxy producer endpoint.
type ProducerMessage struct {
	// KeySchema is sent as "key_schema" and is omitted when empty.
	KeySchema     string            `json:"key_schema,omitempty"`
	// KeySchemaID is sent as "key_schema_id" and is omitted when zero.
	KeySchemaID   int               `json:"key_schema_id,omitempty"`
	// ValueSchema is sent as "value_schema" and is omitted when empty.
	ValueSchema   string            `json:"value_schema,omitempty"` //either value schema or value schema id must be provided for avro messages
	// ValueSchemaID is sent as "value_schema_id" and is omitted when zero.
	ValueSchemaID int               `json:"value_schema_id,omitempty"`
	// Records is sent as "records"; a nil slice is encoded as JSON null.
	Records       []*ProducerRecord `json:"records"`
}

ProducerMessage is the wrapper for the data to the kafka rest proxy producer endpoint

type ProducerOffsets

// ProducerOffsets are the resulting offsets for a produced set of messages.
type ProducerOffsets struct {
	// Partition is the "partition" JSON field.
	Partition int32  `json:"partition"`
	// Offset is the "offset" JSON field.
	Offset    int64  `json:"offset"`
	// ErrorCode is the "error_code" JSON field.
	// Presumably zero/absent on success; confirm against the REST proxy docs.
	ErrorCode int64  `json:"error_code"`
	// Error is the "error" JSON field.
	// Presumably empty on success; confirm against the REST proxy docs.
	Error     string `json:"error"`
}

ProducerOffsets are the resulting offsets for a produced set of messages

type ProducerRecord

// ProducerRecord is an individual message to be produced on kafka.
type ProducerRecord struct {
	// Key is sent as "key" and is omitted when empty.
	Key       json.RawMessage `json:"key,omitempty"`
	// Value is sent as "value"; it is always present in the encoded JSON
	// (a nil json.RawMessage is encoded as null).
	Value     json.RawMessage `json:"value"`
	// Partition is sent as "partition".
	// NOTE(review): because of omitempty, a Partition of 0 is left out of the
	// JSON, so partition 0 cannot be requested explicitly through this field.
	Partition int32           `json:"partition,omitempty"`
}

ProducerRecord is an individual message to be produced on kafka

type ProducerResponse

// ProducerResponse is the response the kafka rest proxy returns on message production.
type ProducerResponse struct {
	// KeySchemaID is the "key_schema_id" JSON field.
	KeySchemaID   int                `json:"key_schema_id"`
	// ValueSchemaID is the "value_schema_id" JSON field.
	ValueSchemaID int                `json:"value_schema_id"`
	// Offsets is the "offsets" JSON field.
	// Presumably one entry per produced record; confirm against the REST proxy docs.
	Offsets       []*ProducerOffsets `json:"offsets"`
}

ProducerResponse is the response the kafka rest proxy returns on message production

func Produce

func Produce(client HTTPClient, baseURL string, topic string, message *ProducerMessage, format Format) (*ProducerResponse, error)

Produce will publish the message to the topic

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL