compaction

package
v0.0.0-...-7055b2f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 21, 2021 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var DefaultConfig = Config{
	MinLevel:        core.LevelStreaming,
	MaxLevel:        core.LevelStreaming,
	MinSegmentAge:   time.Hour,
	MinSegmentCount: 10,
	MinSegmentSize:  1 * 1024 * 1024 * 1024,
	MaxSegmentCount: 10000,
	MaxSegmentSize:  4 * 1024 * 1024 * 1024,
	BatchSize:       10000,
	Delete:          true,
}

DefaultConfig is the default compaction configuration.

View Source
var (

	// ErrSkipped indicates that compaction was skipped.
	ErrSkipped = errors.New("compaction skipped")
)

Functions

This section is empty.

Types

type Compactor

type Compactor struct {
	// contains filtered or unexported fields
}

Compactor encapsulates segment compaction logic.

func NewCompactor

func NewCompactor(store core.SegmentStore) (*Compactor, error)

NewCompactor creates a new Compactor instance

func (*Compactor) CompactPartition

func (r *Compactor) CompactPartition(ctx context.Context, region, topic string, partition uint32, config Config) error

CompactPartition performs the segment compaction for a single partition.

type Config

type Config struct {
	// Minimum level of segments to include in compaction.
	MinLevel uint32

	// Maximum level of segments to include in compaction.
	MaxLevel uint32

	// Minimum age a segment must be in order to be considered for compaction.
	//
	// A higher value increases the chance segment was replicated to all
	// destinations and combats issues related to S3 eventual consistency model.
	MinSegmentAge time.Duration `required:"false" min:"0ms"`

	// Minimum number of segments required for compaction to run.
	MinSegmentCount int `min:"2"`

	// Minimum byte size of segments required for compaction to run.
	MinSegmentSize uint64 `min:"1"`

	// Maximum number of segments compacted in one run.
	MaxSegmentCount int `min:"2"`

	// Maximum byte size of segments compacted in one run.
	MaxSegmentSize uint64 `min:"1"`

	// Number of segment messages to read/write in each request
	//
	// A higher value usually results in better throughput.
	BatchSize int `min:"1"`

	// Allows to disable the removal of compacted segments.
	//
	// In normal operation, it does not make sense to keep around the compacted
	// segments, but it can be useful during troubleshooting.
	Delete bool
}

Config represents the compaction configuration.

type Controller

type Controller struct {
	// contains filtered or unexported fields
}

Controller represents the compaction controller.

func New

func New(config ControllerConfig) (*Controller, error)

New returns a new compaction controller instance

func (*Controller) Compact

func (c *Controller) Compact()

Compact starts the compaction for current assigned topic partitions that are not already running or scheduled to run.

func (*Controller) Start

func (c *Controller) Start() error

Start will start the controller

func (*Controller) Stop

func (c *Controller) Stop()

Stop will stop the controller

type ControllerConfig

type ControllerConfig struct {
	// Consumer is used to provide group membership functionality used to distribute
	// work across multiple instances. It ensures that only one instance is allowed to
	// process a certain source topic partition at any given moment.
	Consumer core.Factory `required:"true"`

	// SegmentStore provides access to segment contents.
	SegmentStore core.Factory `required:"true"`

	// Unique name that identifies the local region/data center/cloud.
	//
	// Field value is required.
	LocalRegion string `required:"true"`

	// Source Kafka topic names that will be compacted.
	//
	// Will use DefaultConfig if topic config was not set.
	//
	// Field value is required.
	Topics map[string]*Config `required:"true"`

	// Cron expression that determines compaction schedule.
	//
	// If not set, automatic compaction will not be executed and
	// is required to call Compact method to trigger the operation.
	CronSchedule string

	// Time zone location used for cron schedule
	//
	// Default is the system's local time zone.
	CronLocation *time.Location

	// Maximum number of compactions running simultaneously.
	Parallelism int `min:"1"`
}

ControllerConfig represents the compaction controller configuration

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL