controllers

package
v0.2.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 12, 2023 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Index

Constants

View Source
const (
	CLUSTER_NETWORK_NAME = "kafka-cluster"
	ZOOKEEPER_PORT       = "2181"
	KAFKA_BROKER_PORT    = "9092"
	KAFKA_CLIENT_PORT    = "9093"
	KAFKA_BROKER_ID      = "1"
	ZOOKEEPER_IMAGE      = "confluentinc/cp-zookeeper:5.5.2"
	KAFKA_IMAGE          = "confluentinc/cp-kafka:5.5.2"
)
View Source
const (
	CleanupPolicy                        = "cleanup.policy"
	CompressionType                      = "compression.type"
	DeleteRetentionMs                    = "delete.retention.ms"
	FileDeleteDelayMs                    = "file.delete.delay.ms"
	FlushMessages                        = "flush.messages"
	FlushMs                              = "flush.ms"
	FollowerReplicationThrottledReplicas = "follower.replication.throttled.replicas"
	IndexIntervalBytes                   = "index.interval.bytes"
	LeaderReplicationThrottledReplicas   = "leader.replication.throttled.replicas"
	MaxMessageBytes                      = "max.message.bytes"
	MessageDownconversionEnable          = "message.downconversion.enable"
	MessageFormatVersion                 = "message.format.version"
	MessageTimestampDifferenceMaxMs      = "message.timestamp.difference.max.ms"
	MessageTimestampType                 = "message.timestamp.type"
	MinCleanableDirtyRatio               = "min.cleanable.dirty.ratio"
	MinCompactionLagMs                   = "min.compaction.lag.ms"
	MaxCompactionLagMs                   = "max.compaction.lag.ms"
	MinInsyncReplicas                    = "min.insync.replicas"
	Preallocate                          = "preallocate"
	RetentionBytes                       = "retention.bytes"
	RetentionMs                          = "retention.ms"
	SegmentBytes                         = "segment.bytes"
	SegmentIndexBytes                    = "segment.index.bytes"
	SegmentJitterMs                      = "segment.jitter.ms"
	SegmentMs                            = "segment.ms"
	UncleanLeaderElectionEnable          = "unclean.leader.election.enable"
)

Variables

View Source
var KafkaTopicConfigTestData = map[string][]KafkaTopicConfigHolder{
	CleanupPolicy: {
		{
			infrav1beta1.CleanupPolicyCompact, infrav1beta1.CleanupPolicyCompact, func(v interface{}) *infrav1beta1.KafkaTopicConfig {
				vv := v.(string)
				return &infrav1beta1.KafkaTopicConfig{
					CleanupPolicy: &vv,
				}
			},
		},
	},
	CompressionType: {
		{
			infrav1beta1.CompressionTypeSnappy, infrav1beta1.CompressionTypeSnappy, func(v interface{}) *infrav1beta1.KafkaTopicConfig {
				vv := v.(string)
				return &infrav1beta1.KafkaTopicConfig{
					CompressionType: &vv,
				}
			},
		},
	},
	DeleteRetentionMs: {
		{
			int64(60000), "60000", func(v interface{}) *infrav1beta1.KafkaTopicConfig {
				vv := v.(int64)
				return &infrav1beta1.KafkaTopicConfig{
					DeleteRetentionMs: &vv,
				}
			},
		},
	},
	FileDeleteDelayMs: {
		{
			int64(1000), "1000", func(v interface{}) *infrav1beta1.KafkaTopicConfig {
				vv := v.(int64)
				return &infrav1beta1.KafkaTopicConfig{
					FileDeleteDelayMs: &vv,
				}
			},
		},
	},
	FlushMessages: {
		{
			int64(5), "5", func(v interface{}) *infrav1beta1.KafkaTopicConfig {
				vv := v.(int64)
				return &infrav1beta1.KafkaTopicConfig{
					FlushMessages: &vv,
				}
			},
		},
	},
	FlushMs: {
		{
			int64(666), "666", func(v interface{}) *infrav1beta1.KafkaTopicConfig {
				vv := v.(int64)
				return &infrav1beta1.KafkaTopicConfig{
					FlushMs: &vv,
				}
			},
		},
	},
	FollowerReplicationThrottledReplicas: {
		{
			"*", "*", func(v interface{}) *infrav1beta1.KafkaTopicConfig {
				vv := v.(string)
				return &infrav1beta1.KafkaTopicConfig{
					FollowerReplicationThrottledReplicas: &vv,
				}
			},
		},
	},
	IndexIntervalBytes: {
		{
			int64(2048), "2048", func(v interface{}) *infrav1beta1.KafkaTopicConfig {
				vv := v.(int64)
				return &infrav1beta1.KafkaTopicConfig{
					IndexIntervalBytes: &vv,
				}
			},
		},
	},
	LeaderReplicationThrottledReplicas: {
		{
			"*", "*", func(v interface{}) *infrav1beta1.KafkaTopicConfig {
				vv := v.(string)
				return &infrav1beta1.KafkaTopicConfig{
					LeaderReplicationThrottledReplicas: &vv,
				}
			},
		},
	},
	MaxMessageBytes: {
		{
			int64(999999), "999999", func(v interface{}) *infrav1beta1.KafkaTopicConfig {
				vv := v.(int64)
				return &infrav1beta1.KafkaTopicConfig{
					MaxMessageBytes: &vv,
				}
			},
		},
	},
	MessageDownconversionEnable: {
		{
			true, "true", func(v interface{}) *infrav1beta1.KafkaTopicConfig {
				vv := v.(bool)
				return &infrav1beta1.KafkaTopicConfig{
					MessageDownconversionEnable: &vv,
				}
			},
		},
	},
	MessageFormatVersion: {
		{
			"0.10.0", "0.10.0", func(v interface{}) *infrav1beta1.KafkaTopicConfig {
				vv := v.(string)
				return &infrav1beta1.KafkaTopicConfig{
					MessageFormatVersion: &vv,
				}
			},
		},
	},
	MessageTimestampDifferenceMaxMs: {
		{
			int64(10), "10", func(v interface{}) *infrav1beta1.KafkaTopicConfig {
				vv := v.(int64)
				return &infrav1beta1.KafkaTopicConfig{
					MessageTimestampDifferenceMaxMs: &vv,
				}
			},
		},
	},
	MessageTimestampType: {
		{
			"LogAppendTime", "LogAppendTime", func(v interface{}) *infrav1beta1.KafkaTopicConfig {
				vv := v.(string)
				return &infrav1beta1.KafkaTopicConfig{
					MessageTimestampType: &vv,
				}
			},
		},
	},
	MinCleanableDirtyRatio: {
		{
			minCleanableDirtyRatio, "0.9", func(v interface{}) *infrav1beta1.KafkaTopicConfig {
				vv := v.(resource.Quantity)
				return &infrav1beta1.KafkaTopicConfig{
					MinCleanableDirtyRatio: &vv,
				}
			},
		},
	},
	MinCompactionLagMs: {
		{
			int64(10000), "10000", func(v interface{}) *infrav1beta1.KafkaTopicConfig {
				vv := v.(int64)
				return &infrav1beta1.KafkaTopicConfig{
					MinCompactionLagMs: &vv,
				}
			},
		},
	},
	MaxCompactionLagMs: {
		{
			int64(10000), "10000", func(v interface{}) *infrav1beta1.KafkaTopicConfig {
				vv := v.(int64)
				return &infrav1beta1.KafkaTopicConfig{
					MaxCompactionLagMs: &vv,
				}
			},
		},
	},
	MinInsyncReplicas: {
		{
			int64(2), "2", func(v interface{}) *infrav1beta1.KafkaTopicConfig {
				vv := v.(int64)
				return &infrav1beta1.KafkaTopicConfig{
					MinInsyncReplicas: &vv,
				}
			},
		},
	},
	Preallocate: {
		{
			true, "true", func(v interface{}) *infrav1beta1.KafkaTopicConfig {
				vv := v.(bool)
				return &infrav1beta1.KafkaTopicConfig{
					Preallocate: &vv,
				}
			},
		},
	},
	RetentionBytes: {
		{
			int64(1000000), "1000000", func(v interface{}) *infrav1beta1.KafkaTopicConfig {
				vv := v.(int64)
				return &infrav1beta1.KafkaTopicConfig{
					RetentionBytes: &vv,
				}
			},
		},
	},
	RetentionMs: {
		{
			int64(1000), "1000", func(v interface{}) *infrav1beta1.KafkaTopicConfig {
				vv := v.(int64)
				return &infrav1beta1.KafkaTopicConfig{
					RetentionMs: &vv,
				}
			},
		},
	},
	SegmentBytes: {
		{
			int64(500000), "500000", func(v interface{}) *infrav1beta1.KafkaTopicConfig {
				vv := v.(int64)
				return &infrav1beta1.KafkaTopicConfig{
					SegmentBytes: &vv,
				}
			},
		},
	},
	SegmentIndexBytes: {
		{
			int64(250000), "250000", func(v interface{}) *infrav1beta1.KafkaTopicConfig {
				vv := v.(int64)
				return &infrav1beta1.KafkaTopicConfig{
					SegmentIndexBytes: &vv,
				}
			},
		},
	},
	SegmentJitterMs: {
		{
			int64(1000), "1000", func(v interface{}) *infrav1beta1.KafkaTopicConfig {
				vv := v.(int64)
				return &infrav1beta1.KafkaTopicConfig{
					SegmentJitterMs: &vv,
				}
			},
		},
	},
	SegmentMs: {
		{
			int64(2000), "2000", func(v interface{}) *infrav1beta1.KafkaTopicConfig {
				vv := v.(int64)
				return &infrav1beta1.KafkaTopicConfig{
					SegmentMs: &vv,
				}
			},
		},
	},
	UncleanLeaderElectionEnable: {
		{
			true, "true", func(v interface{}) *infrav1beta1.KafkaTopicConfig {
				vv := v.(bool)
				return &infrav1beta1.KafkaTopicConfig{
					UncleanLeaderElectionEnable: &vv,
				}
			},
		},
	},
}

KafkaTopicConfigTestData

KafkaTopicConfigTestData holds testing data for each topic configuration option.

It is a map of slices, where each entry in the map represents one configuration option, and entries in slices are possible tests variations. To add a new test for existing configuration option, simply create new entry in slice for appropriate config option To add a completely new config option, add a new key with map (value is a slice of KafkaTopicConfigHolder) To change existing tests, simply change it :)

Functions

func TranslateKafkaTopicV1Beta1

func TranslateKafkaTopicV1Beta1(topic v1beta1.KafkaTopic) *kafka.Topic

NOTE: kafka internal config values are intentionally not put in api.v1beta1 package, in order not to couple our API with kafka internal values. If kafka internals change between versions, we can create new translations without touching our API

Types

type KafkaTopicConfigHolder

type KafkaTopicConfigHolder struct {
	// contains filtered or unexported fields
}

KafkaTopicConfigHolder

KafkaTopicConfigHolder holds testing data.

valueToSet : new value to set for config option in test expectedValue : value that is expected as result of the test createKafkaTopicObjectF : function that creates KafkaTopic k8s object from given config value input

type KafkaTopicReconciler

type KafkaTopicReconciler struct {
	client.Client
	Log         logr.Logger
	Scheme      *runtime.Scheme
	Recorder    record.EventRecorder
	KafkaClient kafka.KafkaClient
}

KafkaTopicReconciler reconciles a KafkaTopic object

func (*KafkaTopicReconciler) Reconcile

func (r *KafkaTopicReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error)

func (*KafkaTopicReconciler) SetupWithManager

func (r *KafkaTopicReconciler) SetupWithManager(mgr ctrl.Manager, opts KafkaTopicReconcilerOptions) error

type KafkaTopicReconcilerOptions

type KafkaTopicReconcilerOptions struct {
	MaxConcurrentReconciles int
}

type TestingKafkaCluster

type TestingKafkaCluster struct {
	// contains filtered or unexported fields
}

func NewTestingKafkaCluster

func NewTestingKafkaCluster() (*TestingKafkaCluster, error)

func (*TestingKafkaCluster) GetKafkaHost

func (kc *TestingKafkaCluster) GetKafkaHost() (string, error)

func (*TestingKafkaCluster) IsAlive

func (kc *TestingKafkaCluster) IsAlive() (bool, error)

func (*TestingKafkaCluster) StartCluster

func (kc *TestingKafkaCluster) StartCluster() error

func (*TestingKafkaCluster) StopCluster

func (kc *TestingKafkaCluster) StopCluster() error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL