dekaf

Version: v1.0.0
Published: Nov 17, 2021 License: MIT Imports: 11 Imported by: 0

README

DEKAF

This is a Kafka emulator with just enough functionality to serve as a data provider to a handful of systems. It may not be compatible with all Kafka clients. The API version it supports is roughly equivalent to Kafka 0.10.

It is heavily based on this repo: https://github.com/travisjeffery/jocko

It is designed to be used as a library and there are several examples.

See the examples in the _examples directory.

Documentation

Index

Constants

This section is empty.

Variables

var ClusterID = "dekafclusterid"

ClusterID we will use when talking to clients.

Functions

This section is empty.

Types

type Config

type Config struct {
	// The Host we should tell Kafka clients to connect to.
	Host string
	// The Port we should tell Kafka clients to connect to.
	Port int32
	// The maximum number of messages we will provide per topic.
	// Defaults to 10 if not set.
	MaxMessagesPerTopic int
	// How long to wait for messages from the provider.
	// The config value takes precedence, followed by the client request time.
	// If neither is set, the default is 5 seconds.
	MessageWaitDeadline time.Duration
	// Debug dumps message request/response.
	Debug bool
}

Config defines the handler configuration.

type Context

type Context struct {
	// contains filtered or unexported fields
}

Context holds a Kafka request/response pair for processing.

func (*Context) Deadline

func (ctx *Context) Deadline() (deadline time.Time, ok bool)

func (*Context) Done

func (ctx *Context) Done() <-chan struct{}

func (*Context) Err

func (ctx *Context) Err() error

func (*Context) Header

func (c *Context) Header() *protocol.RequestHeader

func (*Context) Request

func (ctx *Context) Request() interface{}

func (*Context) Response

func (ctx *Context) Response() interface{}

func (*Context) String

func (ctx *Context) String() string

func (*Context) Value

func (ctx *Context) Value(key interface{}) interface{}

type Handler

type Handler struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Handler configuration.

func NewHandler

func NewHandler(config Config) (*Handler, error)

func (*Handler) AddTopic

func (h *Handler) AddTopic(name string, mp MessageProvider)

AddTopic adds a new topic to the server and registers the MessageProvider with that topic.

func (*Handler) Run

func (h *Handler) Run(ctx context.Context, requests <-chan *Context, responses chan<- *Context)

Run starts a loop that handles requests and sends back responses.

func (*Handler) Shutdown

func (h *Handler) Shutdown() error

Shutdown the handler.

type MessageProvider

type MessageProvider func(ctx context.Context, startOffset int64) (int64, []byte, error)

A MessageProvider function provides messages for a topic. The handler requests a message at startOffset, and the MessageProvider returns the message offset, the payload, and an error. If there are no more messages, return io.EOF as the error. The function may block until the provided context.Context is cancelled, in which case it should return io.EOF.

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server is used to handle the TCP connections, decode requests, defer to the handler, and encode the responses.

func NewServer

func NewServer(ctx context.Context, listen string, handler *Handler) (*Server, error)

NewServer creates a server using the passed handler.

func (*Server) Addr

func (s *Server) Addr() net.Addr

Addr returns the address on which the Server is listening.

func (*Server) Shutdown

func (s *Server) Shutdown() error

Shutdown closes the service.

Directories

Path Synopsis
_examples
