evaluator

package
v1.6.0 Latest
Warning

This package is not in the latest version of its module.

Published: Feb 28, 2023 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Overview

Package evaluator - Group evaluation subsystem. The evaluator subsystem is responsible for fetching group information from the storage subsystem and calculating the group's status from it. It responds to EvaluatorRequest objects that are sent via a channel, and replies with a ConsumerGroupStatus.
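
The request and reply exchange described above can be sketched with plain channels. This is a minimal, self-contained illustration and not the package's code: evaluatorRequest and consumerGroupStatus are hypothetical stand-ins for protocol.EvaluatorRequest and protocol.ConsumerGroupStatus, and their fields are assumptions made for the example.

```go
package main

import "fmt"

// consumerGroupStatus is a hypothetical stand-in for the reply type.
type consumerGroupStatus struct {
	Cluster string
	Group   string
	Status  string
}

// evaluatorRequest is a hypothetical stand-in for the request type. Carrying
// the reply channel inside the request lets each caller get its own answer.
type evaluatorRequest struct {
	Reply   chan *consumerGroupStatus
	Cluster string
	Group   string
}

// serve answers every request on the channel until the channel is closed.
// A real evaluator would consult the storage subsystem here; this stub
// always reports "OK".
func serve(requests <-chan *evaluatorRequest) {
	for req := range requests {
		req.Reply <- &consumerGroupStatus{Cluster: req.Cluster, Group: req.Group, Status: "OK"}
	}
}

// evaluate sends one request and blocks until the reply arrives.
func evaluate(requests chan<- *evaluatorRequest, cluster, group string) *consumerGroupStatus {
	reply := make(chan *consumerGroupStatus)
	requests <- &evaluatorRequest{Reply: reply, Cluster: cluster, Group: group}
	return <-reply
}

func main() {
	requests := make(chan *evaluatorRequest)
	go serve(requests)

	status := evaluate(requests, "testcluster", "testgroup")
	fmt.Println(status.Cluster, status.Group, status.Status)

	close(requests)
}
```

Because the caller supplies the reply channel, several callers can share one request channel without their replies being mixed up.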

Modules

Currently, only one module is provided:

* caching - Evaluate a consumer group and cache the results in memory for a short period of time

Types

type CachingEvaluator

type CachingEvaluator struct {
	// App is a pointer to the application context. This stores the channel to the storage subsystem
	App *protocol.ApplicationContext

	// Log is a logger that has been configured for this module to use. Normally, this means it has been set up with
	// fields that are appropriate to identify this coordinator
	Log *zap.Logger

	RequestChannel chan *protocol.EvaluatorRequest
	// contains filtered or unexported fields
}

CachingEvaluator is an evaluator module that responds to evaluation requests and checks consumer status using the standard Burrow definitions for stall, stop, and lag. The results are stored in an in-memory cache for a configurable amount of time, in order to avoid duplication of work when multiple modules evaluate the same consumer group.

func (*CachingEvaluator) Configure

func (module *CachingEvaluator) Configure(name, configRoot string)

Configure validates the configuration for the module, creates a channel to receive requests on, and sets up the cache. If no expiration time for cache entries is set, a default value of 10 seconds is used. If there is any problem starting the goswarm cache, this func panics.

func (*CachingEvaluator) GetCommunicationChannel

func (module *CachingEvaluator) GetCommunicationChannel() chan *protocol.EvaluatorRequest

GetCommunicationChannel returns the RequestChannel that has been set up for this module.

func (*CachingEvaluator) Start

func (module *CachingEvaluator) Start() error

Start launches the main loop that listens for evaluation requests and returns the results.

func (*CachingEvaluator) Stop

func (module *CachingEvaluator) Stop() error

Stop closes the module's RequestChannel, which also terminates the main loop that responds to requests.
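
The relationship between Start and Stop relies on a standard Go idiom: a loop that ranges over a channel ends when that channel is closed. The sketch below shows the idiom in isolation. The worker and request types are hypothetical, and waiting on a sync.WaitGroup inside stop is an assumption of the sketch rather than a documented behavior of this module.

```go
package main

import (
	"fmt"
	"sync"
)

// request is a hypothetical stand-in for an evaluation request.
type request struct {
	Group string
	Reply chan string
}

// worker is a hypothetical module with a request channel and a main loop.
type worker struct {
	requests chan *request
	running  sync.WaitGroup
}

// start creates the channel and launches the main loop in a goroutine.
func (w *worker) start() {
	w.requests = make(chan *request)
	w.running.Add(1)
	go func() {
		defer w.running.Done()
		// The range loop exits on its own once the channel is closed.
		for req := range w.requests {
			req.Reply <- "evaluated " + req.Group
		}
	}()
}

// stop closes the channel and waits for the main loop to finish.
func (w *worker) stop() {
	close(w.requests)
	w.running.Wait()
}

func main() {
	w := &worker{}
	w.start()

	reply := make(chan string)
	w.requests <- &request{Group: "testgroup", Reply: reply}
	fmt.Println(<-reply)

	w.stop()
	fmt.Println("main loop exited")
}
```

No separate quit signal is needed with this design, but no request may be sent after stop, since sending on a closed channel panics.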

type Coordinator

type Coordinator struct {
	// App is a pointer to the application context. This stores the channel to the storage subsystem
	App *protocol.ApplicationContext

	// Log is a logger that has been configured for this module to use. Normally, this means it has been set up with
	// fields that are appropriate to identify this coordinator
	Log *zap.Logger
	// contains filtered or unexported fields
}

Coordinator manages a single evaluator module (only one module is supported at this time), making sure it is configured, started, and stopped at the appropriate time. It is also responsible for listening to the EvaluatorChannel that is provided in the application context and forwarding those requests to the evaluator module. If no evaluator module has been configured explicitly, the coordinator starts the caching module as a default.

func StorageAndEvaluatorCoordinatorsWithOffsets

func StorageAndEvaluatorCoordinatorsWithOffsets() (*Coordinator, *storage.Coordinator)

StorageAndEvaluatorCoordinatorsWithOffsets sets up a Coordinator with a single caching module defined. In order to do this, it also calls the storage subsystem fixture to get a configured storage.Coordinator with offsets for a test cluster and group. This func should never be called in normal code. It is only provided to facilitate testing by other subsystems.

func (*Coordinator) Configure

func (ec *Coordinator) Configure()

Configure is called to create the configured evaluator module and call its Configure func to validate the configuration and set it up. The coordinator will panic if more than one module is configured, and if no modules have been configured, it will set up a default caching evaluator module. If there are any problems, it is expected that this func will panic with a descriptive error message, as configuration failures are not recoverable errors.
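
The selection rule (zero modules means the caching default, exactly one is used as configured, more than one is fatal) can be sketched as a small function. chooseModule and its panic message are hypothetical; the real coordinator reads its module list from the application configuration, which is not modeled here.

```go
package main

import "fmt"

// chooseModule applies the rule described above to a list of configured
// module names and returns the name of the module to create.
func chooseModule(configured []string) string {
	switch len(configured) {
	case 0:
		// Nothing configured: fall back to the caching module.
		return "caching"
	case 1:
		return configured[0]
	default:
		// Configuration failures are treated as unrecoverable.
		panic("only one evaluator module may be configured")
	}
}

func main() {
	fmt.Println(chooseModule(nil))
	fmt.Println(chooseModule([]string{"mymodule"}))
}
```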

func (*Coordinator) Start

func (ec *Coordinator) Start() error

Start calls the evaluator module's underlying Start func. If the module Start returns an error, this func stops immediately and returns that error to the caller.

We also start a request forwarder goroutine. This listens to the EvaluatorChannel that is provided in the application context that all modules receive, and forwards those requests to the evaluator modules. At the present time, the evaluator only supports one module, so this is a simple "accept and forward".
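
The "accept and forward" step can be sketched as a goroutine that copies requests from one channel to another. This is an illustration under assumptions: request and forward are hypothetical names, and the done channel is added only so the example can shut down cleanly.

```go
package main

import "fmt"

// request is a hypothetical stand-in for an evaluation request.
type request struct {
	Group string
	Reply chan string
}

// forward passes every request received on in to out, and closes done once
// in has been closed and drained.
func forward(in <-chan *request, out chan<- *request, done chan<- struct{}) {
	defer close(done)
	for req := range in {
		out <- req
	}
}

func main() {
	appChannel := make(chan *request)    // shared channel that callers write to
	moduleChannel := make(chan *request) // channel the single module listens on
	done := make(chan struct{})

	go forward(appChannel, moduleChannel, done)

	// A stub module that answers whatever reaches it.
	go func() {
		for req := range moduleChannel {
			req.Reply <- "status for " + req.Group
		}
	}()

	reply := make(chan string)
	appChannel <- &request{Group: "testgroup", Reply: reply}
	fmt.Println(<-reply)

	close(appChannel)
	<-done
	close(moduleChannel)
}
```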

func (*Coordinator) Stop

func (ec *Coordinator) Stop() error

Stop calls the configured evaluator module's underlying Stop func. It is expected that the module Stop will not return until the module has been completely stopped. Although the module's Stop can return an error, this func always returns no error, because a failure during stopping is not a critical failure.

type Module

type Module interface {
	protocol.Module
	GetCommunicationChannel() chan *protocol.EvaluatorRequest
}

Module is responsible for answering requests to evaluate the status of a consumer group. It fetches offset information from the storage subsystem and transforms that into a protocol.ConsumerGroupStatus response. It conforms to the overall protocol.Module interface, but it adds a func to fetch the channel that the module is listening on for requests, so that requests can be forwarded to it by the coordinator.
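
The shape of this interface (a general module interface embedded in a more specific one) can be sketched without the Burrow packages. Everything below is hypothetical: baseModule stands in for protocol.Module, and its method set is an assumption taken from the Configure, Start, and Stop methods listed for CachingEvaluator on this page.

```go
package main

import "fmt"

// request is a hypothetical stand-in for protocol.EvaluatorRequest.
type request struct{ Group string }

// baseModule is a hypothetical stand-in for protocol.Module.
type baseModule interface {
	Configure(name, configRoot string)
	Start() error
	Stop() error
}

// evaluatorModule embeds the base interface and adds channel access, so a
// coordinator can treat it as a generic module and still forward requests.
type evaluatorModule interface {
	baseModule
	GetCommunicationChannel() chan *request
}

// noopEvaluator is a minimal implementation used only for illustration.
type noopEvaluator struct {
	name     string
	requests chan *request
}

func (m *noopEvaluator) Configure(name, configRoot string) {
	m.name = name
	m.requests = make(chan *request)
}

func (m *noopEvaluator) Start() error { return nil }

func (m *noopEvaluator) Stop() error {
	close(m.requests)
	return nil
}

func (m *noopEvaluator) GetCommunicationChannel() chan *request { return m.requests }

// Compile-time check that noopEvaluator satisfies the combined interface.
var _ evaluatorModule = (*noopEvaluator)(nil)

func main() {
	var m evaluatorModule = &noopEvaluator{}
	m.Configure("default", "evaluator.default")
	fmt.Println("channel ready:", m.GetCommunicationChannel() != nil)
	_ = m.Stop()
}
```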
