consumer

package
v1.0.0-k8s-kafka-proxy... Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 18, 2019 License: MIT Imports: 14 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrNoQueueAddresses = errors.New("no kafka-rest-proxy addresses configured")

Functions

This section is empty.

Types

type AgeingClient

type AgeingClient struct {
	Client *http.Client
	MaxAge time.Duration
	Logger *log.UPPLogger
}

AgeingClient defines an ageing http client for consuming messages

func NewAgeingClient

func NewAgeingClient(client *http.Client, maxAge time.Duration, logger *log.UPPLogger) (AgeingClient, error)

NewAgeingClient returns a new instance of AgeingClient. It guarantees that all required properties are set

func (AgeingClient) StartAgeingProcess

func (c AgeingClient) StartAgeingProcess()

StartAgeingProcess periodically close idle connections according to the MaxAge of an AgeingClient

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer provides methods to consume messages from a kafka proxy

func (*Consumer) ConnectivityCheck

func (c *Consumer) ConnectivityCheck() (string, error)

ConnectivityCheck returns the connection status with the kafka proxy

func (*Consumer) Start

func (c *Consumer) Start()

Start is a method that triggers the consumption of messages from the queue Start is a blocking methode, it will return only when Stop() is called. If you don't want to block start it in a different goroutine.

func (*Consumer) Stop

func (c *Consumer) Stop()

Stop is a methode to stop the consumer

type Message

type Message struct {
	Headers map[string]string
	Body    string
}

Message defines the consumed messages

type MessageConsumer

type MessageConsumer interface {
	Start()
	Stop()
	ConnectivityCheck() (string, error)
}

MessageConsumer is a high level generic interface for consumers.

Start triggers the consumption of messages.

Stop method stops the consumption of messages.

ConnectivityCheck implements the logic to check the current connectivity to the queue. The method should return a message about the status of the connection and an error in case of connectivity failure.

func NewAgeingConsumer

func NewAgeingConsumer(config QueueConfig, handler func(m Message), agingClient AgeingClient, logger *log.UPPLogger) MessageConsumer

NewAgeingConsumer returns a new instance of a Consumer with an AgeingClient

func NewBatchedConsumer

func NewBatchedConsumer(config QueueConfig, handler func(m []Message), client *http.Client, logger *log.UPPLogger) MessageConsumer

NewBatchedConsumer returns a Consumer to manage batches of messages

func NewConsumer

func NewConsumer(config QueueConfig, handler func(m Message), client *http.Client, logger *log.UPPLogger) MessageConsumer

NewConsumer returns a new instance of a Consumer

type QueueConfig

type QueueConfig struct {
	Addrs                []string `json:"address"` //list of queue addresses.
	Group                string   `json:"group"`
	Topic                string   `json:"topic"`
	Queue                string   `json:"queue"` //The name of the queue.
	Offset               string   `json:"offset"`
	BackoffPeriod        int      `json:"backoffPeriod"`
	StreamCount          int      `json:"streamCount"`
	ConcurrentProcessing bool     `json:"concurrentProcessing"`
	AuthorizationKey     string   `json:"authorizationKey"`
	AutoCommitEnable     bool     `json:"autoCommitEnable"`
	NoOfProcessors       int      `json:"noOfProcessors"`
}

QueueConfig represents the configuration of the queue, consumer group and topic the consumer interested about.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL