performance

package
v0.3553.0-7.3.0-0-ce
Warning

This package is not in the latest version of its module.

Published: May 28, 2022 License: Apache-2.0 Imports: 11 Imported by: 0

README

Performance Tests

The performance package contains functionality to orchestrate configurable tests that stress a Kafka cluster. It lets you schedule scenarios, each of which consists of multiple tests.

We use Kafka's Trogdor framework to achieve this and plan to support multiple test types (throughput tests, connection/authentication/rebalance storms, etc.).

Each kind of test has a different type, specified in JSON. Currently supported types:

  • ProgressiveWorkload
  • TailConsume
  • ConnectionStress
  • SustainedConnection

The framework allows you to schedule multiple tests at once with flexible schedules (one after the other, overlapping, etc.).

Definitions

The code divides work into several levels with similar names, which can be confusing. Here is what each level means.

  • Workload (Java) - What is actually running inside Trogdor.
  • Task - This is the smallest unit of work in the Go client. This has a 1:1 mapping to a workload within Trogdor.
  • Step - A group of tasks that all share the same configuration. By default, a step has as many tasks as there are Trogdor agents, but this can be overridden.
  • Fanout - A property of a step that duplicates its tasks linearly. For example, a fanout of 2 doubles the tasks, and a fanout of 3 triples them.
  • Scenario - A scenario is a metadata wrapper around step configurations.
  • Workload (Go) - An overarching configuration that defines all scenario configurations.
General JSON Parameters
  • scenario_name - the name of the scenario
  • schedule (optional) - defines when each test in the scenario should run in relation to the others
    • start_after_begin - a list of tests that must all have begun before the given test starts
    • start_after_end - a list of tests that must all have ended before the given test starts
      • Configuring both start_after_begin and start_after_end for the same test is not supported.
    • start_delay - a duration, such as "1m" or "0s", that should pass before the test starts
    • run_until_end_of - defines the duration of the given test: it runs until the test with the latest end time in the list ends
  • test_definitions - defines the different tests this scenario will consist of
    • test_name - name of the test
    • test_type - defines the type of test we will run. Each test supports different test_parameters
    • test_parameters - custom parameters for this specific test type
Progressive Workload

We define a progressive workload as a continuous series of steps, where each step issues progressively more load on the cluster. Each step consists of multiple Trogdor tasks. We schedule exactly one Trogdor task per Trogdor agent at any given time.

  • workload_type - This currently only supports "Produce".
  • topic_name (optional) - The topic to produce to. The default is derived from the name parameter: [workload_name]-topic.
  • partition_count - The number of partitions the topic should have.
  • step_duration_ms - The duration, in milliseconds, of a single step.
  • start_throughput_mbs - The throughput, in MB/s, we want to start at.
  • end_throughput_mbs - The throughput, in MB/s, we want this test to end at (inclusive).
  • step_cooldown_ms (optional) - The amount of time, in milliseconds, to wait between steps. Only applicable if the start and end throughput are different.
  • throughput_increase_per_step_mbs (optional) - The amount, in MB/s, by which the throughput increases from one step to the next. Only applicable if the start and end throughput are different.
  • message_size_bytes (optional) - An approximation of the message size. The default is 900 bytes.
  • message_padding_bytes (optional) - The number of zero bytes to append to the end of each message as padding so that compression can work. The default is 100 bytes, and this value is not used unless message_size_bytes is also specified.
  • tasks_per_step (optional) - The number of Trogdor tasks to create per step. The default is the number of Trogdor agents.
  • slow_start_per_step_ms (optional) - If specified, each task in a given step will be progressively delayed by this amount, in milliseconds, as a way to ramp up the load. The task numbered N will start delayed by (N-1) * [slow_start_per_step_ms] milliseconds, and its duration will be shortened by the same amount of time. The default is 0, or no ramp up.
Tail Consumer

A tail consumer test consists of multiple consumers subscribed to a topic. They read from the end of the log at all times with no throttling. We schedule exactly one Trogdor ConsumeBench task per Trogdor agent for every consumer group at any one given time.

  • fanout (optional) - Defines the number of consumer groups and sets the number of tasks created to [fanout] * [number of Trogdor agents]. If consumer_group is specified, the test does not create a separate consumer group for each fanout.
  • topics_from_test (optional) - The name of the test that produces to the topics this test's consumers will subscribe to. Exactly one of topics_from_test and topics must be set.
    • The produce test is expected to be defined in the same scenario, the tail consumer to be scheduled to run until the end of that produce test, and the topics field to be left unset.
  • topics (optional) - The topics these consumers will subscribe to. Exactly one of topics and topics_from_test must be set.
  • duration (optional) - The duration, as a Go duration construct, that this test will run.
  • step_messages_per_second (optional) - The number of messages per second this workload will limit itself to. Note: this number is divided among the tasks of each step (fanout). The default is math.MaxInt32.
  • tasks_per_step (optional) - The number of Trogdor tasks to create per step (fanout). The default is the number of Trogdor agents.
  • slow_start_per_step_ms (optional) - If specified, each task in a given step (fanout) will be progressively delayed by this amount, in milliseconds, as a way to ramp up the load. The task numbered N will start delayed by (N-1) * [slow_start_per_step_ms] milliseconds, and its duration will be shortened by the same amount of time. The default is 0, or no ramp up.
  • consumer_group (optional) - Override the generated consumer groups and use this one instead. If specified, fanout does not generate new consumer groups, and all tasks are part of the same one.
Connection Stress

This test rapidly creates and closes connections.

  • duration - The duration, as a Go duration construct, that this test will run.
  • target_connections_per_sec - The number of connections to create and close per second. For best results, this should be a multiple of num_threads.
  • num_threads - The number of threads used per task/fanout. Overall work is split between all available threads.
  • action - The action this test will take. Valid values are:
    • CONNECT - This uses basic Java connection classes to initiate a TCP connection, skipping all Kafka client code.
    • FETCH_METADATA - This uses the Kafka AdminClient to perform a fetch of basic cluster metadata.
  • tasks_per_step (optional) - The number of Trogdor tasks to create per step. The default is the number of Trogdor agents.
  • slow_start_per_step_ms (optional) - If specified, each task in a given step will be progressively delayed by this amount, in milliseconds, as a way to ramp up the load. The task numbered N will start delayed by (N-1) * [slow_start_per_step_ms] milliseconds, and its duration will be shortened by the same amount of time. The default is 0, or no ramp up.
  • fanout - The number of times to duplicate this workload as a new step.
Sustained Connections

This test generates sustained connections against Kafka. It can stress three different components: KafkaConsumer, KafkaProducer, and AdminClient. It tries to use minimal bandwidth per connection to reduce the impact of overhead.

  • duration - The duration, as a Go duration construct, that this test will run.
  • producer_connection_count - The total number of producer connections to maintain per task.
  • consumer_connection_count - The total number of consumer connections to maintain per task.
  • metadata_connection_count - The total number of metadata connections to maintain per task.
  • num_threads - The number of threads used per task/fanout to maintain the above connections.
  • refresh_rate_ms - The interval, in milliseconds, at which every connection is refreshed.
  • topic_name (optional) - The topic that this test will run against. This must be specified if either producer_connection_count or consumer_connection_count is greater than 0.
  • message_size_bytes (optional) - The size, in bytes, for the produce task to use when sending records. The default is 512 bytes.
  • tasks_per_step (optional) - The number of Trogdor tasks to create per step. The default is the number of Trogdor agents.
  • slow_start_per_step_ms (optional) - If specified, each task in a given step will be progressively delayed by this amount, in milliseconds, as a way to ramp up the load. The task numbered N will start delayed by (N-1) * [slow_start_per_step_ms] milliseconds, and its duration will be shortened by the same amount of time. The default is 0, or no ramp up.
  • fanout - The number of times to duplicate this workload as a new step.

Note: If a 1:1:1 connection ratio is used, the brokers do not see as many connections as the test specifies. In testing, the observed count was about 2.4 times lower than the number specified. For example, 333:333:333 connections result in about 999 / 2.4 ≈ 416 connections on the cluster.

Single Test Example

See example.json for a sample configuration. A configuration like

{
  "scenario_name": "ExampleTest",
  "test_definitions": [{
    "test_type": "ProgressiveWorkload",
    "test_name": "test-produce",
    "test_parameters": {
      "workload_type": "Produce",
      "step_duration_ms": 60000,
      "partition_count": 10,
      "step_cooldown_ms": 60000,
      "start_throughput_mbs": 10,
      "end_throughput_mbs": 20,
      "throughput_increase_per_step_mbs": 5,
      "message_size_bytes": 1000
    }
  }]
}

would result in three steps with the following throughputs: 10 MB/s, 15 MB/s, and 20 MB/s. Each step would last one minute, with one minute of downtime between steps. Note that the schedule field is optional. If it is omitted, all tests are scheduled to start at once.

Multi-Test Example
{
  "scenario_name": "TestCPKAFKA",
  "schedule": {
    "A": {},
    "B": {
      "start_delay": "1m",
      "start_after_begin": ["A"]
    },
    "C": {
      "run_until_end_of": ["A"]
    },
    "D": {
      "start_delay": "0s",
      "start_after_begin": ["B"],
      "run_until_end_of": ["B"]
    }
  },
  "test_definitions": [
    {
      "test_type": "ProgressiveWorkload",
      "test_name": "A",
      "test_parameters": {
        "workload_type": "Produce",
        "step_duration_ms": 60000,
        "partition_count": 10,
        "step_cooldown_ms": 60000,
        "start_throughput_mbs": 10,
        "end_throughput_mbs": 20,
        "throughput_increase_per_step_mbs": 5,
        "message_size_bytes": 1000
      }
    },
    {
      "test_type": "ProgressiveWorkload",
      "test_name": "B",
      "test_parameters": {
        "workload_type": "Produce",
        "step_duration_ms": 30000,
        "partition_count": 15,
        "step_cooldown_ms": 1000,
        "start_throughput_mbs": 10,
        "end_throughput_mbs": 20,
        "throughput_increase_per_step_mbs": 5,
        "message_size_bytes": 255
      }
    },
    {
      "test_type": "TailConsume",
      "test_name": "C",
      "test_parameters": {
        "fanout": 2,
        "topics_from_test": "A"
      }
    },
    {
      "test_type": "TailConsume",
      "test_name": "D",
      "test_parameters": {
        "fanout": 2,
        "topics_from_test": "B"
      }
    }
  ]
}

In this example, we have two produce tests. The second test, B, starts one minute after A starts. Test C starts together with A and runs until A ends, and test D starts with B and runs until B ends.

How to Run

Prerequisite: the Trogdor and soak clients Helm charts must be deployed (see cc-services/README.md).

# open a shell into the running soak-clients pod
$ kubectl get pods --all-namespaces | grep clients-cli
soak-tests          cc-soak-clients-clients-cli-76568867b5-bcmdh                 0/1     Running              0          1h
$ kubectl exec -it -n soak-tests cc-soak-clients-clients-cli-76568867b5-bcmdh -- sh

Once inside the pod, create a JSON test configuration and run the tests with it:

vi /mnt/test/test_config.json
export PERFORMANCE_TEST_CONFIG_PATH=/mnt/test/test_config.json
./soak-clients performance-tests

Documentation

Index

Constants

const CONNECTION_STRESS_TEST_TYPE = "ConnectionStress"
const PRODUCE_WORKLOAD_TYPE = "Produce"
const PROGRESSIVE_WORKLOAD_TEST_TYPE = "ProgressiveWorkload"
const SUSTAINED_CONNECTION_TEST_TYPE = "SustainedConnection"
const TAIL_CONSUMER_TEST_TYPE = "TailConsume"

Variables

This section is empty.

Functions

func Run

func Run(testConfigPath string, trogdorCoordinatorHost string, trogdorAgentsCount int, bootstrapServers string)

Types

type ConnectionStress

type ConnectionStress struct {
	Name string

	Duration                common.Duration `json:"duration"`
	TargetConnectionsPerSec int             `json:"target_connections_per_sec"`
	NumThreads              int             `json:"num_threads"`
	Fanout                  int             `json:"fanout"`
	TasksPerStep            int             `json:"tasks_per_step"`
	Action                  string          `json:"action"`
	SlowStartPerStepMs      uint64          `json:"slow_start_per_step_ms"`
	// contains filtered or unexported fields
}

func (*ConnectionStress) CreateTest

func (cs *ConnectionStress) CreateTest(trogdorAgentsCount int, bootstrapServers string) (tasks []trogdor.TaskSpec, err error)

func (*ConnectionStress) GetDuration

func (cs *ConnectionStress) GetDuration() time.Duration

func (*ConnectionStress) GetEndTime

func (cs *ConnectionStress) GetEndTime() (time.Time, error)

func (*ConnectionStress) GetName

func (cs *ConnectionStress) GetName() string

func (*ConnectionStress) GetStartTime

func (cs *ConnectionStress) GetStartTime() (time.Time, error)

func (*ConnectionStress) SetEndTime

func (cs *ConnectionStress) SetEndTime(endTime time.Time)

func (*ConnectionStress) SetStartTime

func (cs *ConnectionStress) SetStartTime(startTime time.Time)

type NotEnoughContextError

type NotEnoughContextError struct {
	// contains filtered or unexported fields
}

NotEnoughContextError indicates that a test needs more than the provided scenario context to be parsed correctly.

func (*NotEnoughContextError) Error

func (nec *NotEnoughContextError) Error() string

type PerformanceTestConfig

type PerformanceTestConfig struct {
	Type       string          `json:"test_type"`
	Name       string          `json:"test_name"`
	Parameters json.RawMessage `json:"test_parameters"`
	// contains filtered or unexported fields
}

PerformanceTestConfig is a generic definition of a test. It is meant to support different types of tests, each of which defines its own set of test_parameters. Each test should implement the SchedulableTest interface.

func (*PerformanceTestConfig) CreateTest

func (ptc *PerformanceTestConfig) CreateTest(trogdorAgentsCount int, bootstrapServers string) ([]trogdor.TaskSpec, error)

func (*PerformanceTestConfig) ParseTest

func (ptc *PerformanceTestConfig) ParseTest(context *ScenarioContext) error

ParseTest() parses the configuration into the concrete test struct. It can return a retriable error of type NotEnoughContextError, which means the test should be parsed again once more context from the scenario is available. Parsing is done only once, if successful.
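The retry behaviour described for ParseTest can be sketched as a loop that re-parses pending tests until a full pass makes no progress. This is a hypothetical illustration of the idea, not ScenarioTestConfig.ParseConfig: parseFunc, needs and parseAll are invented names, and the map of parsed names stands in for ScenarioContext.

```go
package main

import (
	"errors"
	"fmt"
)

// notEnoughContextError plays the role of NotEnoughContextError: the test
// depends on another test that has not been parsed yet.
type notEnoughContextError struct{ missing string }

func (e *notEnoughContextError) Error() string {
	return fmt.Sprintf("needs test %q to be parsed first", e.missing)
}

// parseFunc is a hypothetical stand-in for PerformanceTestConfig.ParseTest:
// it receives the names of the tests parsed so far.
type parseFunc func(parsed map[string]bool) error

// parseAll retries tests that report missing context and stops when a full
// pass makes no progress (for example, a reference to an undefined test).
func parseAll(order []string, tests map[string]parseFunc) error {
	parsed := map[string]bool{}
	pending := append([]string(nil), order...)
	for len(pending) > 0 {
		var next []string
		for _, name := range pending {
			err := tests[name](parsed)
			var nec *notEnoughContextError
			switch {
			case err == nil:
				parsed[name] = true // parsed once, never retried
			case errors.As(err, &nec):
				next = append(next, name) // retriable: try again next pass
			default:
				return fmt.Errorf("test %q: %w", name, err)
			}
		}
		if len(next) == len(pending) {
			return fmt.Errorf("no progress parsing %v", next)
		}
		pending = next
	}
	return nil
}

// needs builds a parseFunc that succeeds only once dep has been parsed.
func needs(dep string) parseFunc {
	return func(parsed map[string]bool) error {
		if dep != "" && !parsed[dep] {
			return &notEnoughContextError{missing: dep}
		}
		return nil
	}
}

func main() {
	// The tail consumer "C" is listed before the produce test "A" it reads from.
	tests := map[string]parseFunc{"C": needs("A"), "A": needs("")}
	fmt.Println(parseAll([]string{"C", "A"}, tests)) // <nil>

	broken := map[string]parseFunc{"C": needs("missing")}
	fmt.Println(parseAll([]string{"C"}, broken) != nil) // true
}
```

Stopping on "no progress" rather than after a fixed number of passes keeps the loop bounded even when a topics_from_test reference can never be satisfied.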

type ScenarioContext

type ScenarioContext struct {
	TestsWithTopics  map[string]TestWithTopics
	SchedulableTests map[string]SchedulableTest
}

ScenarioContext holds the tests that are parsed for this scenario run.

func (*ScenarioContext) AddSchedulableTest

func (sc *ScenarioContext) AddSchedulableTest(st SchedulableTest)

func (*ScenarioContext) AddTestWithTopics

func (sc *ScenarioContext) AddTestWithTopics(twt TestWithTopics)

type ScenarioTestConfig

type ScenarioTestConfig struct {
	Name               string                   `json:"scenario_name"`
	TestDefinitions    []*PerformanceTestConfig `json:"test_definitions"`
	ScheduleDefinition ScheduleDef              `json:"schedule"`
	// contains filtered or unexported fields
}

ScenarioTestConfig is the top-most definition for all the performance tests scheduled to run.

func (*ScenarioTestConfig) CreateSchedules

func (sct *ScenarioTestConfig) CreateSchedules(startTime time.Time) error

CreateSchedules() parses the user-defined scheduling and sets each test's startTime/endTime accordingly.

func (*ScenarioTestConfig) CreateTests

func (sct *ScenarioTestConfig) CreateTests(trogdorAgentsCount int, bootstrapServers string) ([]trogdor.TaskSpec, error)

CreateTests() creates all the Trogdor tasks for each test that is part of this scenario. It requires that the config is parsed and that the schedules are created.

func (*ScenarioTestConfig) ParseConfig

func (sct *ScenarioTestConfig) ParseConfig(configPath string) error

type SchedulableTest

type SchedulableTest interface {
	// CreateTest() should return Trogdor task specifications that compose the whole test.
	// Said tasks should start no earlier than GetStartTime(),
	// should have at least one task that ends at GetEndTime() and none ending later than that.
	CreateTest(trogdorAgentsCount int, bootstrapServers string) ([]trogdor.TaskSpec, error)

	GetName() string

	// returns the duration of the test. If the test is scheduled to run until another test, this method should return 0
	GetDuration() time.Duration
	// returns an error if StartTime is not set
	GetStartTime() (time.Time, error)
	// returns an error if EndTime is not set
	GetEndTime() (time.Time, error)

	SetStartTime(time.Time)
	SetEndTime(time.Time)
}

SchedulableTest is an interface for a test that is schedulable. To be eligible for scheduling, the test should have a known duration or be scheduled to run until a test with a known duration. After the scheduling is determined, the start and end times of the test will be set via the appropriate methods.

type Schedule

type Schedule struct {
	StartDelay      string   `json:"start_delay"`
	StartAfterBegin []string `json:"start_after_begin"`
	StartAfterEnd   []string `json:"start_after_end"`
	RunUntilEndOf   []string `json:"run_until_end_of"`
}

type ScheduleDef

type ScheduleDef map[string]*Schedule

type Step

type Step struct {
	// contains filtered or unexported fields
}

A Step is part of a Workload. It is converted into multiple Trogdor tasks which, in combination, achieve the desired throughput.

type SustainedConnection

type SustainedConnection struct {
	Name string

	Duration                common.Duration `json:"duration"`
	ProducerConnectionCount uint64          `json:"producer_connection_count"`
	ConsumerConnectionCount uint64          `json:"consumer_connection_count"`
	MetadataConnectionCount uint64          `json:"metadata_connection_count"`
	NumThreads              uint64          `json:"num_threads"`
	Fanout                  int             `json:"fanout"`
	TasksPerStep            int             `json:"tasks_per_step"`
	RefreshRateMs           uint64          `json:"refresh_rate_ms"`
	TopicName               string          `json:"topic_name"`
	MessageSizeBytes        uint64          `json:"message_size_bytes"`
	SlowStartPerStepMs      uint64          `json:"slow_start_per_step_ms"`
	// contains filtered or unexported fields
}

func (*SustainedConnection) CreateTest

func (sc *SustainedConnection) CreateTest(trogdorAgentsCount int, bootstrapServers string) (tasks []trogdor.TaskSpec, err error)

func (*SustainedConnection) GetDuration

func (sc *SustainedConnection) GetDuration() time.Duration

func (*SustainedConnection) GetEndTime

func (sc *SustainedConnection) GetEndTime() (time.Time, error)

func (*SustainedConnection) GetName

func (sc *SustainedConnection) GetName() string

func (*SustainedConnection) GetStartTime

func (sc *SustainedConnection) GetStartTime() (time.Time, error)

func (*SustainedConnection) SetEndTime

func (sc *SustainedConnection) SetEndTime(endTime time.Time)

func (*SustainedConnection) SetStartTime

func (sc *SustainedConnection) SetStartTime(startTime time.Time)

type TailConsumer

type TailConsumer struct {
	Name string

	// the ProduceTestName must refer to a test that implements the TestWithTopics interface
	ProduceTestName       string          `json:"topics_from_test"`
	Fanout                int             `json:"fanout"`
	Topics                []string        `json:"topics"`
	Duration              common.Duration `json:"duration"`
	StepMessagesPerSecond uint64          `json:"step_messages_per_second"`
	TasksPerStep          int             `json:"tasks_per_step"`
	SlowStartPerStepMs    uint64          `json:"slow_start_per_step_ms"`
	ConsumerGroup         string          `json:"consumer_group"`
	// contains filtered or unexported fields
}

func (*TailConsumer) CreateTest

func (tc *TailConsumer) CreateTest(trogdorAgentsCount int, bootstrapServers string) (tasks []trogdor.TaskSpec, err error)

func (*TailConsumer) GetDuration

func (tc *TailConsumer) GetDuration() time.Duration

func (*TailConsumer) GetEndTime

func (tc *TailConsumer) GetEndTime() (time.Time, error)

func (*TailConsumer) GetName

func (tc *TailConsumer) GetName() string

func (*TailConsumer) GetStartTime

func (tc *TailConsumer) GetStartTime() (time.Time, error)

func (*TailConsumer) SetEndTime

func (tc *TailConsumer) SetEndTime(endTime time.Time)

func (*TailConsumer) SetStartTime

func (tc *TailConsumer) SetStartTime(startTime time.Time)

type TestWithTopics

type TestWithTopics interface {
	GetName() string
	// TopicNames() should return all the topics this test will use
	TopicNames() []string
}

TestWithTopics is an interface for a test that makes use of topics.

type Workload

type Workload struct {
	Name                  string
	Type                  string  `json:"workload_type"`
	PartitionCount        uint64  `json:"partition_count"`
	StepDurationMs        uint64  `json:"step_duration_ms"`
	StepCooldownMs        uint64  `json:"step_cooldown_ms"`
	StartThroughputMbs    float32 `json:"start_throughput_mbs"`
	EndThroughputMbs      float32 `json:"end_throughput_mbs"`
	ThroughputIncreaseMbs float32 `json:"throughput_increase_per_step_mbs"`
	MessageSizeBytes      uint64  `json:"message_size_bytes"`
	MessagePaddingBytes   uint64  `json:"message_padding_bytes"`
	TasksPerStep          int     `json:"tasks_per_step"`
	SlowStartPerStepMs    uint64  `json:"slow_start_per_step_ms"`
	TopicName             string  `json:"topic_name"`
	// contains filtered or unexported fields
}

func (*Workload) CreateTest

func (workload *Workload) CreateTest(trogdorAgentsCount int, bootstrapServers string) (tasks []trogdor.TaskSpec, err error)

CreateTest returns all the Trogdor tasks that should be run as part of this workload.

func (*Workload) GetDuration

func (workload *Workload) GetDuration() time.Duration

func (*Workload) GetEndTime

func (workload *Workload) GetEndTime() (time.Time, error)

func (*Workload) GetName

func (workload *Workload) GetName() string

func (*Workload) GetStartTime

func (workload *Workload) GetStartTime() (time.Time, error)

func (*Workload) SetEndTime

func (workload *Workload) SetEndTime(endTime time.Time)

func (*Workload) SetStartTime

func (workload *Workload) SetStartTime(startTime time.Time)

func (*Workload) TopicNames

func (workload *Workload) TopicNames() []string
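Per the topic_name description under Progressive Workload, a workload without an explicit topic uses [workload_name]-topic. A sketch of a TopicNames-style helper under that rule, assuming the workload name is the test_name; the topicNames function is hypothetical:

```go
package main

import "fmt"

// topicNames applies the documented default: use topic_name when it is set,
// otherwise derive "<workload name>-topic".
func topicNames(workloadName, topicName string) []string {
	if topicName != "" {
		return []string{topicName}
	}
	return []string{workloadName + "-topic"}
}

func main() {
	fmt.Println(topicNames("test-produce", ""))         // [test-produce-topic]
	fmt.Println(topicNames("test-produce", "my-topic")) // [my-topic]
}
```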
