server

package
v0.0.0-...-e6c7ec5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 21, 2021 License: MIT Imports: 20 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ConsumerManager

type ConsumerManager interface {
	SetOffset(group, topic string, id int64) error
	GetOffset(group, topic string, reqID int64) (id int64, Unlock func(), err error)
}

type Distributor

type Distributor interface {
	GetTopicOwner(topic string) (string, error)
}

type Logger

type Logger interface {
	Errorf(format string, args ...interface{})
	Warnf(format string, args ...interface{})
	Infof(format string, args ...interface{})
	Debugf(format string, args ...interface{})
}

Logger is a handler for log messages of varying severities

type Metrics

type Metrics interface {
	ProduceMsgs(int)
	ConsumeMsgs(int)
}

Metrics allows for custom metric handlers for counting the number of messages and/or batch size

type Option

type Option func(*Server) error

Option represents an optional function argument to NewServer

func WithConsumerManager

func WithConsumerManager(consumerManager ConsumerManager) Option

WithConsumerManager overrides the default consumer manager

func WithDefaultConsumeLimit

func WithDefaultConsumeLimit(n int64) Option

WithDefaultConsumeLimit sets the default consume limit for clients that consume with limit < 0

func WithDefaultQueue

func WithDefaultQueue(dirs []string, cache bool, entries int64) Option

WithDefaultQueue sets the queue

func WithDistributor

func WithDistributor(distributor Distributor) Option

WithDistributor overrides the default distributor

func WithFileQueue

func WithFileQueue(dirs []string, cache bool, entries int64) Option

WithFileQueue sets the queue

func WithLogger

func WithLogger(logger Logger) Option

WithLogger adds a logger to the server

func WithMetrics

func WithMetrics(metrics Metrics) Option

WithMetrics sets the handler for produce and consume metrics

func WithMiddleware

func WithMiddleware(middleware ...func(http.Handler) http.Handler) Option

WithMiddleware adds the given middleware to the endpoints defined in the http router

func WithPublicAddr

func WithPublicAddr(addr string) Option

WithPublicAddr sets the public address of the current server

func WithQueue

func WithQueue(q Queue) Option

WithQueue overrides the default file queue

func WithWebsocketInterval

func WithWebsocketInterval(d time.Duration) Option

WithWebsocketInterval sets the interval between pings for a websocket connection

type Queue

type Queue interface {
	Close() error

	ListTopics(regex *regexp.Regexp) ([]string, error)
	CreateTopic(topic string) error
	DeleteTopic(topic string) error
	ModifyTopic(topic string, request headers.ModifyRequest) (*headers.TopicInfo, error)
	WatchTopics(topics []string) (written, deleted chan string, closer io.Closer, err error)

	Produce(topic string, msgSizes []int64, timestamp uint64, r io.Reader) error
	Consume(group, topic string, id int64, limit int64, w http.ResponseWriter) (int, error)
}

Queue is the interface used by the server to produce and consume messages from distinct categories called topics.

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server is an http server on top of the given queue (defaults to a file based queue)

func NewServer

func NewServer(options ...Option) (*Server, error)

NewServer creates a new server with the given options

func (*Server) Close

func (s *Server) Close() error

Close closes the server and returns any associated errors

func (*Server) HandleConsume

func (s *Server) HandleConsume(w http.ResponseWriter, r *http.Request)

HandleConsume handles requests to the /topics/... endpoints with method == GET. It will retrieve messages from the queue topic

func (*Server) HandleCreateTopic

func (s *Server) HandleCreateTopic(w http.ResponseWriter, r *http.Request)

HandleCreateTopic handles requests to the /topics/... endpoints with method == PUT. It will create a topic if the topic does not exist.

func (*Server) HandleDeleteTopic

func (s *Server) HandleDeleteTopic(w http.ResponseWriter, r *http.Request)

HandleDeleteTopic handles requests to the /topics/... endpoints with method == DELETE. It will delete a topic if the topic exists.

func (*Server) HandleGetAllTopics

func (s *Server) HandleGetAllTopics(w http.ResponseWriter, r *http.Request)

HandleGetAllTopics handles requests to the /topics endpoints with method == GET. It returns all topics currently defined in the queue as either JSON or CSV, depending on the request Content-Type header.

func (*Server) HandleModifyTopic

func (s *Server) HandleModifyTopic(w http.ResponseWriter, r *http.Request)

HandleModifyTopic handles requests to the /topics/... endpoints with method == PATCH. It will modify the topic if the topic exists. This is used to truncate topics by message offset or mod time.

func (*Server) HandleOptions

func (s *Server) HandleOptions(w http.ResponseWriter, r *http.Request)

HandleOptions handles requests to the /topics/... endpoints with method == OPTIONS

func (*Server) HandleProduce

func (s *Server) HandleProduce(w http.ResponseWriter, r *http.Request)

HandleProduce handles requests to the /topics/... endpoints with method == POST. It will add the given messages to the queue topic

func (*Server) HandleWatchTopics

func (s *Server) HandleWatchTopics(w http.ResponseWriter, r *http.Request)

HandleWatchTopics accepts websocket connections and watches the topic files for writes

func (*Server) ServeHTTP

func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request)

type TestLogger

type TestLogger struct {
	TB testing.TB
}

func (TestLogger) Debugf

func (t TestLogger) Debugf(format string, args ...interface{})

func (TestLogger) Errorf

func (t TestLogger) Errorf(format string, args ...interface{})

func (TestLogger) Infof

func (t TestLogger) Infof(format string, args ...interface{})

func (TestLogger) Warnf

func (t TestLogger) Warnf(format string, args ...interface{})

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL