kafka

package
v1.2.0
Published: Mar 28, 2024 License: Apache-2.0 Imports: 18 Imported by: 2

README

Venom - Executor Kafka

Step to read from or write to a Kafka topic. Messages can optionally be encoded with an Avro schema.

Input

In your YAML file, you can use:

  - addrs mandatory
  - withTLS optional
  - withSASL optional
  - withSASLHandshaked optional
  - withAVRO optional - whether this step expects messages to be Avro-encoded. Note: if you use it for the consumer, you must use it for the producer too.
  - user optional
  - password optional
  - kafka_version optional, default is 0.10.2.0
  - insecure_tls optional, allows self-signed certificates when using TLS

  - clientType mandatory: producer or consumer

  # for consumer client type:
  - groupID mandatory
  - topics mandatory
  - timeout optional
  - messageLimit optional
  - initialOffset optional - newest or oldest; Sarama default is newest
  - markOffset optional
  - waitFor optional - wait X seconds before returning the consumed messages from the topic
  - keyFilter optional - perform filtering per key

  # for producer client type:
  - messages
  - messages.topic - Topic to post the message to
  - messages.headers - Headers for the message (optional)
  - messages.value - Value of the message
  - messages.valueFile - File to read the message value from
  - messages.avroSchemaFile - Avro schema file. The content of messages.value or messages.valueFile must be encodable with that schema. If no schema file is provided, the latest available schema version is retrieved from the schema registry using the Topic Name strategy, that is, with ${topicName}-value as the subject.
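
The schema lookup rule described for messages.avroSchemaFile can be sketched in a few lines of Go. This is only an illustration of the rule as stated above, not the executor's code: the message type and the schemaSource helper are hypothetical names introduced here.

```go
package main

import "fmt"

// message is a hypothetical, reduced stand-in for a producer message.
type message struct {
	Topic          string
	AvroSchemaFile string
}

// schemaSource reports where the Avro schema for msg would come from:
// the schema file when one is given, otherwise the schema registry
// subject built with the Topic Name strategy (${topicName}-value).
func schemaSource(msg message) string {
	if msg.AvroSchemaFile != "" {
		return "file:" + msg.AvroSchemaFile
	}
	return "registry:" + msg.Topic + "-value"
}

func main() {
	fmt.Println(schemaSource(message{Topic: "test-topic", AvroSchemaFile: "kafka/schemas/message.avsc"}))
	fmt.Println(schemaSource(message{Topic: "test-topic"}))
}
```

With the two messages above, the first resolves to the schema file and the second to the test-topic-value subject.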

Example without Avro:

name: My Kafka testsuite
version: "2"
testcases:
- name: Kafka test
  description: Test Kafka
  steps:
  - type: kafka
    clientType: producer
    withSASL: true
    withTLS: true
    user: "{{.kafkaUser}}"
    password: "{{.kafkaPwd}}"
    addrs:
      - "{{.kafkaHost}}:{{.kafkaPort}}"
    messages:
    - topic: test-topic
      value: '{"hello":"bar"}'
  - type: kafka
    clientType: consumer
    withTLS: true
    withSASL: true
    user: "{{.kafkaUser}}"
    password: "{{.kafkaPwd}}"
    markOffset: true
    initialOffset: oldest
    messageLimit: 1
    groupID: venom
    addrs:
      - "{{.kafkaHost}}:{{.kafkaPort}}"
    topics:
      - test-topic
    assertions:
    - result.messagesjson.messagesjson0.value.hello ShouldEqual bar
    - result.messages.__len__ ShouldEqual 1
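
The assertion paths above (result.messagesjson.messagesjson0.value.hello, result.messages.__len__) address a flattened view of the step result. The sketch below only reproduces the key shape seen in those assertions; the flatten function is hypothetical and is not Venom's actual implementation.

```go
package main

import (
	"fmt"
	"strings"
)

// flatten walks v and stores every leaf in out under a dotted, lowercased key.
// Slice elements are named after their parent key plus their index
// (messagesjson0, messagesjson1, ...) and the slice length is stored
// under "<key>.__len__". This naming is an assumption based on the
// assertions shown in the example.
func flatten(prefix string, v interface{}, out map[string]interface{}) {
	switch t := v.(type) {
	case map[string]interface{}:
		for k, child := range t {
			flatten(prefix+"."+strings.ToLower(k), child, out)
		}
	case []interface{}:
		out[prefix+".__len__"] = len(t)
		name := prefix[strings.LastIndex(prefix, ".")+1:]
		for i, child := range t {
			flatten(fmt.Sprintf("%s.%s%d", prefix, name, i), child, out)
		}
	default:
		out[prefix] = t
	}
}

func main() {
	result := map[string]interface{}{
		"messagesJSON": []interface{}{
			map[string]interface{}{"value": map[string]interface{}{"hello": "bar"}},
		},
	}
	out := map[string]interface{}{}
	flatten("result", result, out)
	fmt.Println(out["result.messagesjson.messagesjson0.value.hello"]) // bar
	fmt.Println(out["result.messagesjson.__len__"])                   // 1
}
```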

Example with Avro:

name: My Kafka testsuite
version: "2"
testcases:
- name: Kafka test
  description: Test Kafka
  steps:
  - type: kafka
    clientType: producer
    withSASL: true
    withTLS: true
    user: "{{.kafkaUser}}"
    password: "{{.kafkaPwd}}"
    addrs:
      - "{{.kafkaHost}}:{{.kafkaPort}}"
    messages:
    - topic: test-topic
      valueFile: "kafka/values/message2.json"
      avroSchemaFile: "kafka/schemas/message.avsc"
    - topic: test-topic
      valueFile: "kafka/values/message3.json"
  - type: kafka
    clientType: consumer
    withTLS: true
    withSASL: true
    user: "{{.kafkaUser}}"
    password: "{{.kafkaPwd}}"
    markOffset: true
    initialOffset: oldest
    messageLimit: 2
    groupID: venom
    addrs:
      - "{{.kafkaHost}}:{{.kafkaPort}}"
    topics:
      - test-topic
    assertions:
    - result.messagesjson.messagesjson0.value.id ShouldEqual 1
    - result.messagesjson.messagesjson0.value.message ShouldEqual "Some test"
    - result.messagesjson.messagesjson1.value.id ShouldEqual 2
    - result.messages.__len__ ShouldEqual 2

Documentation

Index

Constants

const (
	// Name of executor
	Name = "kafka"
)

Variables

This section is empty.

Functions

func Convert2Avro added in v1.0.0

func Convert2Avro(value []byte, schema string) ([]byte, error)

Convert2Avro converts value to Avro-encoded binary using the given schema.

func ConvertFromAvro added in v1.0.0

func ConvertFromAvro(binary []byte, schema string) (string, error)

ConvertFromAvro converts Avro-encoded binary to a string using the given schema.

func CreateMessage added in v1.0.0

func CreateMessage(message []byte, schemaID int) ([]byte, error)

CreateMessage converts an Avro-encoded message into one that can be sent to Kafka.

func GetMessageAvroID added in v1.0.0

func GetMessageAvroID(messageValue []byte) ([]byte, int)

GetMessageAvroID tries to extract the Avro schema ID from an encoded message.
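
The signatures of CreateMessage and GetMessageAvroID suggest that the schema ID travels inside the message bytes. The package's implementation is not shown here, so the following is a minimal sketch assuming the common Confluent wire format (one zero magic byte, then the schema ID as a 4-byte big-endian integer, then the Avro payload). The frame and unframe helpers are hypothetical names, not part of this package.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// magicByte marks a schema-registry framed message (assumed wire format).
const magicByte = 0x0

// frame prepends the 5-byte header (magic byte + big-endian schema ID)
// to an Avro-encoded payload.
func frame(payload []byte, schemaID int) []byte {
	out := make([]byte, 5, 5+len(payload))
	out[0] = magicByte
	binary.BigEndian.PutUint32(out[1:5], uint32(schemaID))
	return append(out, payload...)
}

// unframe splits a framed message into its Avro payload and schema ID.
func unframe(msg []byte) ([]byte, int, error) {
	if len(msg) < 5 || msg[0] != magicByte {
		return nil, 0, errors.New("not a schema-registry framed message")
	}
	return msg[5:], int(binary.BigEndian.Uint32(msg[1:5])), nil
}

func main() {
	framed := frame([]byte("avro-bytes"), 42)
	payload, id, err := unframe(framed)
	fmt.Println(string(payload), id, err) // avro-bytes 42 <nil>
}
```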

func New

func New() venom.Executor

New returns a new Executor

Types

type Executor

type Executor struct {
	Addrs []string `json:"addrs,omitempty" yaml:"addrs,omitempty"`
	// Registry schema address
	SchemaRegistryAddr string `json:"schema_registry_addr,omitempty" yaml:"schemaRegistryAddr,omitempty"`
	WithAVRO           bool   `json:"with_avro,omitempty" yaml:"withAVRO,omitempty"`
	WithTLS            bool   `json:"with_tls,omitempty" yaml:"withTLS,omitempty"`
	WithSASL           bool   `json:"with_sasl,omitempty" yaml:"withSASL,omitempty"`
	WithSASLHandshaked bool   `json:"with_sasl_handshaked,omitempty" yaml:"withSASLHandshaked,omitempty"`
	User               string `json:"user,omitempty" yaml:"user,omitempty"`
	Password           string `json:"password,omitempty" yaml:"password,omitempty"`

	// TLS Config
	InsecureTLS bool `json:"insecure_tls,omitempty" yaml:"insecure_tls,omitempty"`

	// ClientType must be "consumer" or "producer"
	ClientType string `json:"client_type,omitempty" yaml:"clientType,omitempty"`

	// Used when ClientType is consumer
	GroupID string   `json:"group_id,omitempty" yaml:"groupID,omitempty"`
	Topics  []string `json:"topics,omitempty" yaml:"topics,omitempty"`
	// Timeout is the timeout for reading messages, in seconds. Default: 5
	Timeout int `json:"timeout,omitempty" yaml:"timeout,omitempty"`
	// WaitFor represents the time for reading messages without marking the test as failure.
	WaitFor int `json:"wait_for,omitempty" yaml:"waitFor,omitempty"`
	// MessageLimit is the maximum number of messages to read. Once the limit is reached, the consumer stops reading
	MessageLimit int `json:"message_limit,omitempty" yaml:"messageLimit,omitempty"`
	// InitialOffset represents the initial offset for the consumer. Possible values: newest, oldest. Default: newest
	InitialOffset string `json:"initial_offset,omitempty" yaml:"initialOffset,omitempty"`
	// MarkOffset marks the offset when consuming messages
	MarkOffset bool `json:"mark_offset,omitempty" yaml:"markOffset,omitempty"`

	// KeyFilter determines the key to filter from
	KeyFilter string `json:"key_filter,omitempty" yaml:"keyFilter,omitempty"`

	// Only JSON and Avro are currently supported
	ConsumerEncoding string `json:"consumer_encoding,omitempty" yaml:"consumerEncoding,omitempty"`

	// Used when ClientType is producer
	// Messages represents the messages sent by the producer
	Messages []Message `json:"messages,omitempty" yaml:"messages,omitempty"`

	// MessagesFile is a file containing the messages sent by the producer (the messages field is then ignored)
	MessagesFile string `json:"messages_file,omitempty" yaml:"messages_file,omitempty"`

	// Kafka version, default is 0.10.2.0
	KafkaVersion string `json:"kafka_version,omitempty" yaml:"kafka_version,omitempty"`
	// contains filtered or unexported fields
}

Executor represents a Kafka test step executor.

func (Executor) GetDefaultAssertions

func (Executor) GetDefaultAssertions() *venom.StepAssertions

GetDefaultAssertions returns the default assertions for the kafka executor.

func (Executor) Run

func (Executor) Run(ctx context.Context, step venom.TestStep) (interface{}, error)

Run executes a TestStep of type kafka.

func (Executor) ZeroValueResult

func (Executor) ZeroValueResult() interface{}

ZeroValueResult returns an empty implementation of this executor's result.

type Message

type Message struct {
	Topic          string            `json:"topic" yaml:"topic"`
	Key            string            `json:"key" yaml:"key"`
	Headers        map[string]string `json:"headers,omitempty" yaml:"headers,omitempty"`
	Value          string            `json:"value,omitempty" yaml:"value,omitempty"`
	ValueFile      string            `json:"valueFile,omitempty" yaml:"valueFile,omitempty"`
	AvroSchemaFile string            `json:"avroSchemaFile,omitempty" yaml:"avroSchemaFile,omitempty"`
}

Message represents the object sent to or received from Kafka.

type MessageJSON

type MessageJSON struct {
	Topic string
	Key   interface{}
	Value interface{}
}

MessageJSON represents the object sent to or received from Kafka.

type Result

type Result struct {
	TimeSeconds  float64       `json:"timeseconds,omitempty" yaml:"timeSeconds,omitempty"`
	Messages     []Message     `json:"messages,omitempty" yaml:"messages,omitempty"`
	MessagesJSON []interface{} `json:"messagesjson,omitempty" yaml:"messagesJSON,omitempty"`
	Err          string        `json:"err" yaml:"error"`
}

Result represents a step result.

type SchemaRegistry added in v1.0.0

type SchemaRegistry interface {
	GetSchemaByID(id int) (string, error)
	RegisterNewSchema(subject, schema string) (int, error)
	GetLatestSchema(subject string) (int, string, error)
}

SchemaRegistry is the interface implemented by schema registry clients.
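
Because SchemaRegistry is an interface, code that depends on it can be exercised without a running registry. The following is a minimal in-memory sketch. The interface is copied locally so the example is self-contained, memRegistry is a hypothetical type, and the assumption that the int returned by GetLatestSchema is the schema ID is not confirmed by this page.

```go
package main

import "fmt"

// SchemaRegistry mirrors the interface documented above.
type SchemaRegistry interface {
	GetSchemaByID(id int) (string, error)
	RegisterNewSchema(subject, schema string) (int, error)
	GetLatestSchema(subject string) (int, string, error)
}

// memRegistry is a hypothetical in-memory registry for tests.
type memRegistry struct {
	schemas []string       // schema with ID n is stored at schemas[n-1]
	latest  map[string]int // subject -> ID of its latest schema
}

// Compile-time check that memRegistry satisfies the interface.
var _ SchemaRegistry = (*memRegistry)(nil)

func newMemRegistry() *memRegistry {
	return &memRegistry{latest: map[string]int{}}
}

func (r *memRegistry) RegisterNewSchema(subject, schema string) (int, error) {
	r.schemas = append(r.schemas, schema)
	id := len(r.schemas)
	r.latest[subject] = id
	return id, nil
}

func (r *memRegistry) GetSchemaByID(id int) (string, error) {
	if id < 1 || id > len(r.schemas) {
		return "", fmt.Errorf("schema %d not found", id)
	}
	return r.schemas[id-1], nil
}

func (r *memRegistry) GetLatestSchema(subject string) (int, string, error) {
	id, ok := r.latest[subject]
	if !ok {
		return 0, "", fmt.Errorf("subject %q not found", subject)
	}
	return id, r.schemas[id-1], nil
}

func main() {
	reg := newMemRegistry()
	id, _ := reg.RegisterNewSchema("test-topic-value", `{"type":"string"}`)
	latestID, schema, err := reg.GetLatestSchema("test-topic-value")
	fmt.Println(id, latestID, schema, err) // 1 1 {"type":"string"} <nil>
}
```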

func NewSchemaRegistry added in v1.0.0

func NewSchemaRegistry(schemaRegistryHost string) (SchemaRegistry, error)

NewSchemaRegistry creates a new SchemaRegistry for the given schema registry host.

func NewWithClient added in v1.0.0

func NewWithClient(schemaRegistryHost string, httpClient *http.Client) (SchemaRegistry, error)

NewWithClient creates a new SchemaRegistry that uses the given HTTP client.
