kafkake

package module
v0.0.0-...-5d07973 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 15, 2020 License: MIT Imports: 13 Imported by: 0

README

kafkake

Go Report Card

Kafka client

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type CacheState

type CacheState int

type Client

type Client struct {
	State         State
	Exchanger     *Exchanger
	Consumer      *kafka.Consumer
	ConsumerTopic string
	Producer      *kafka.Producer
	ProducerTopic string
}

Client is the interface to interact with a kafka cluster

func NewClient

func NewClient(config ClientConfig, requestTopic, responseTopic string, opts ...ClientOption) (*Client, error)

NewClient creates a new client object with the specified configuration and options

func (*Client) ExchangeLoop

func (c *Client) ExchangeLoop() error

ExchangeLoop starts a loop to indefinitely wait and handle messages on the Kafka cluster

func (*Client) ExchangeMessage

func (c *Client) ExchangeMessage() error

ExchangeMessage waits for a message and sends a message to the kafka cluster with the parameterized Exchanger

func (*Client) ReadMessage

func (c *Client) ReadMessage(duration time.Duration) (*kafka.Message, error)

ReadMessage waits for a message from the Kafka cluster to be received on the current request topic

func (*Client) SendMessage

func (c *Client) SendMessage(msg *kafka.Message) error

SendMessage sends a message from the Kafka cluster to be received on the current request topic

func (*Client) Shutdown

func (c *Client) Shutdown() error

Shutdown safely closes the client

type ClientConfig

type ClientConfig struct {
	BootstrapServers string
	SecurityProtocol string
	SaslMechanisms   string
	SaslUsername     string
	SaslPassword     string
}

ClientConfig defines the parameters to be used by the client

type ClientHandler

type ClientHandler func([]byte) ([]byte, error)

ClientHandler is a function that takes a specified object and creates a response

type ClientOption

type ClientOption func(client *Client)

ClientOption defines possible options that can be used to customize the client

func WithExchanger

func WithExchanger(e *Exchanger) ClientOption

WithExchanger customizes the client to use a specified Exchanger

func WithNewExchanger

func WithNewExchanger(h ClientHandler, reqSchema, respSchema interface{}) ClientOption

WithNewExchanger customizes the client to use a newly specified Exchanger

type Error

type Error struct {
	Message error `json:"message"`
}

Error is the interface for sending errors through the Kafka client

type Exchanger

type Exchanger struct {
	Client         *Client
	Handler        ClientHandler
	RequestSchema  interface{}
	ResponseSchema interface{}
}

Exchanger represents a request-response exchanger, handling requests and responses with the specified schemas

func NewExchanger

func NewExchanger(c *Client, h ClientHandler, reqSchema, respSchema interface{}) *Exchanger

NewExchanger creates a new message exchanger for a client with a custom handler for messages

func (*Exchanger) Handle

func (e *Exchanger) Handle(req *kafka.Message) (*kafka.Message, error)

Handle produces a kafka response message from a kafka request message

type Handler

type Handler func([]byte) ([]byte, error)

type KafkakeConfig

type KafkakeConfig struct {
	BootstrapServers string
	SecurityProtocol string
	SaslMechanisms   string
	SaslUsername     string
	SaslPassword     string
}

type MessageState

type MessageState struct {
	// contains filtered or unexported fields
}

type MessageStatus

type MessageStatus int

MessageState

type Processor

type Processor struct {
	ProcessorConfig
	State         State
	RequestTopic  string
	ResponseTopic string
	Consumer      *kafka.Consumer
	Producers     map[int32]*kafka.Producer
	Handler       Handler
	MessageStates map[string]*MessageState
}

func NewProcessor

func NewProcessor(config ProcessorConfig, requestTopic, responseTopic string, handler Handler) (*Processor, error)

func (*Processor) Shutdown

func (p *Processor) Shutdown() error

Shutdown safely closes the processor's consumer and producers

func (*Processor) Startup

func (p *Processor) Startup(wg *sync.WaitGroup, termChan chan bool) error

Startup starts the processor

type ProcessorConfig

type ProcessorConfig struct {
	BootstrapServers string
	SecurityProtocol string
	SaslMechanisms   string
	SaslUsername     string
	SaslPassword     string
}

type Requestor

type Requestor struct {
	KeyCache      *cache.Cache
	ValueCache    *cache.Cache
	Consumer      *kafka.Consumer
	ConsumerTopic string
	Producer      *kafka.Producer
	ProducerTopic string
}

func NewRequestor

func NewRequestor(config KafkakeConfig, requestTopic, responseTopic string) (*Requestor, error)

func (*Requestor) Collect

func (r *Requestor) Collect(msg *kafka.Message) (*kafka.Message, error)

func (*Requestor) Consume

func (r *Requestor) Consume(duration time.Duration)

func (*Requestor) Request

func (r *Requestor) Request(msg *kafka.Message)

type State

type State int

State represents the current state of the Kafkake client

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL