batching

Version: v0.0.0-...-c21ea48
Published: Mar 25, 2024 License: Apache-2.0 Imports: 1 Imported by: 7

README

Batching

If you have any questions, or want to get attention for a PR or issue, please reach out on the #logging-and-metrics channel in the Cloud Foundry Slack.

Package batching implements a generic Batcher. Because the generic Batcher operates on interface{}, it should not be used directly. Instead, specialize it by creating a type that embeds Batcher and accepts a concrete type. See ByteBatcher for an example of such a specialization, and example_test.go for an example of how to use a specialized batcher.
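The specialization pattern described above can be sketched as follows. This is a self-contained illustration, not the package's code: Writer, Batcher, and NewBatcher are simplified stand-ins that only mirror the documented signatures (the stand-in ignores the interval), and StringWriter, StringWriterFunc, StringBatcher, and NewStringBatcher are hypothetical names invented for this example.

```go
package main

import (
	"fmt"
	"time"
)

// Writer and Batcher are simplified stand-ins for the documented types.
// They are NOT the package's implementation.
type Writer interface {
	Write(batch []interface{})
}

type Batcher struct {
	size     int
	interval time.Duration // stored but ignored by this stand-in
	writer   Writer
	batch    []interface{}
}

func NewBatcher(size int, interval time.Duration, writer Writer) *Batcher {
	return &Batcher{size: size, interval: interval, writer: writer}
}

// Write appends to the batch and submits it once it is full.
func (b *Batcher) Write(data interface{}) {
	b.batch = append(b.batch, data)
	if len(b.batch) >= b.size {
		b.writer.Write(b.batch)
		b.batch = nil
	}
}

// StringWriter is a hypothetical typed counterpart to Writer.
type StringWriter interface {
	Write(batch []string)
}

// StringWriterFunc adapts an ordinary function to StringWriter.
type StringWriterFunc func(batch []string)

func (f StringWriterFunc) Write(batch []string) { f(batch) }

// stringAdapter converts a generic batch back into []string.
type stringAdapter struct{ w StringWriter }

func (a stringAdapter) Write(batch []interface{}) {
	out := make([]string, 0, len(batch))
	for _, v := range batch {
		out = append(out, v.(string)) // safe: only StringBatcher.Write feeds the batch
	}
	a.w.Write(out)
}

// StringBatcher embeds *Batcher and exposes a typed Write.
type StringBatcher struct {
	*Batcher
}

func NewStringBatcher(size int, interval time.Duration, w StringWriter) *StringBatcher {
	return &StringBatcher{Batcher: NewBatcher(size, interval, stringAdapter{w})}
}

// Write shadows the embedded Batcher.Write so callers can only pass strings.
func (b *StringBatcher) Write(data string) {
	b.Batcher.Write(data)
}

// collect writes three values through a batcher of size 2 and returns
// the batches the writer received.
func collect() [][]string {
	var got [][]string
	w := StringWriterFunc(func(batch []string) { got = append(got, batch) })
	b := NewStringBatcher(2, time.Second, w)
	b.Write("a")
	b.Write("b")
	b.Write("c") // stays pending: the batch is not full yet
	return got
}

func main() {
	fmt.Println(collect()) // prints "[[a b]]"
}
```

Because the typed Write shadows the embedded one, the type assertion inside the adapter cannot fail for values written through StringBatcher, while any other methods of the embedded Batcher remain available through promotion.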

Documentation

Overview

Package batching provides mechanisms for batching writes of various types. A batcher's methods should be invoked from a single goroutine. It is the responsibility of the caller to invoke Flush on the batcher frequently to flush the current batch out to the writer.

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Batcher

type Batcher struct {
	// contains filtered or unexported fields
}

Batcher accepts messages and invokes the Writer when the batch requirements have been fulfilled (either the batch size has been reached or the interval has lapsed). Batcher should be created with NewBatcher().

func NewBatcher

func NewBatcher(size int, interval time.Duration, writer Writer) *Batcher

NewBatcher creates a new Batcher. It is recommended to use a wrapper type such as NewByteBatcher or NewV2EnvelopeBatcher rather than using this directly.

func (*Batcher) Flush

func (b *Batcher) Flush()

Flush will write a partial batch if there is data and the interval has lapsed. Otherwise it is a NOP. This method should be called frequently to make sure batches do not stick around for long periods of time. As a result, it would be a bad idea to call Flush after an operation that might block for an unspecified amount of time. NOTE: Flush is *not* thread safe and should be called by the same goroutine that calls Write.

func (*Batcher) ForcedFlush

func (b *Batcher) ForcedFlush()

ForcedFlush bypasses the batch interval and batch size checks and writes immediately.
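To make the difference between Flush and ForcedFlush concrete, here is a toy model of the documented semantics. The toyBatcher type and its fields are hypothetical and are not the package's implementation. In particular, skipping empty batches in ForcedFlush is an assumption of this sketch.

```go
package main

import (
	"fmt"
	"time"
)

// toyBatcher is a hypothetical model of the documented behavior, not the
// package's implementation.
type toyBatcher struct {
	interval time.Duration
	lastSent time.Time
	batch    []interface{}
	writes   int // number of times a batch was submitted
}

// Flush submits the batch only if it holds data AND the interval has lapsed.
func (b *toyBatcher) Flush() {
	if len(b.batch) == 0 || time.Since(b.lastSent) < b.interval {
		return // NOP, as documented
	}
	b.ForcedFlush()
}

// ForcedFlush skips the interval and size checks. Skipping empty batches
// is an assumption of this model.
func (b *toyBatcher) ForcedFlush() {
	if len(b.batch) == 0 {
		return
	}
	b.writes++
	b.batch = nil
	b.lastSent = time.Now()
}

// demo returns the write count after Flush and after ForcedFlush.
func demo() (afterFlush, afterForced int) {
	b := &toyBatcher{interval: time.Hour, lastSent: time.Now()}
	b.batch = append(b.batch, "pending")

	b.Flush() // the interval has not lapsed: nothing is written
	afterFlush = b.writes

	b.ForcedFlush() // written regardless of the interval
	afterForced = b.writes
	return afterFlush, afterForced
}

func main() {
	fmt.Println(demo()) // prints "0 1"
}
```

A natural place for ForcedFlush is a shutdown path, where waiting for the interval to lapse would risk leaving buffered data unwritten.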

func (*Batcher) Write

func (b *Batcher) Write(data interface{})

Write stores data to the batch. It will not submit the batch to the writer until either the batch has been filled, or the interval has lapsed. NOTE: Write is *not* thread safe and should be called by the same goroutine that calls Flush.

type ByteBatcher

type ByteBatcher struct {
	*Batcher
}

ByteBatcher batches slices of bytes.

Example
writer := batching.ByteWriterFunc(func(batch [][]byte) {
	for _, data := range batch {
		fmt.Printf("%s\n", data)
	}
})
batcher := batching.NewByteBatcher(100, time.Nanosecond, writer)

dataSource := make(chan []byte)
done := make(chan struct{})
go func() {
	defer close(done)
	for i := 0; i < 3; i++ {
		dataSource <- []byte(fmt.Sprintf("data %d", i))
	}
}()

for {
	// Do a non-blocking read from a data source.
	select {
	case data := <-dataSource:
		// If read succeeds write it out. This will flush if the batch
		// exceeds the batch size.
		batcher.Write(data)
	case <-done:
		return
	default:
		// If read fails make sure to call Flush to ensure data doesn't
		// get stuck in the batch for long periods of time.
		batcher.Flush()
	}
}
Output:

data 0
data 1
data 2

func NewByteBatcher

func NewByteBatcher(size int, interval time.Duration, writer ByteWriter) *ByteBatcher

NewByteBatcher creates a new ByteBatcher.

func (*ByteBatcher) Write

func (b *ByteBatcher) Write(data []byte)

Write stores data to the batch. It will not submit the batch to the writer until either the batch has been filled, or the interval has lapsed. NOTE: Write is *not* thread safe and should be called by the same goroutine that calls Flush.

type ByteWriter

type ByteWriter interface {
	// Write submits the batch.
	Write(batch [][]byte)
}

ByteWriter is used to submit the completed batch of slices of bytes. The batch may be partial if the interval lapsed instead of filling the batch.

type ByteWriterFunc

type ByteWriterFunc func(batch [][]byte)

ByteWriterFunc is an adapter to allow ordinary functions to be a ByteWriter.

func (ByteWriterFunc) Write

func (f ByteWriterFunc) Write(batch [][]byte)

Write implements ByteWriter.
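The adapter follows the same idea as http.HandlerFunc in the standard library: a function type gains a method so that it satisfies an interface. The sketch below redeclares ByteWriter and ByteWriterFunc locally so that it runs on its own. The method body f(batch) is the obvious implementation but is assumed here, and bufferWriter and submit are hypothetical helpers.

```go
package main

import (
	"bytes"
	"fmt"
)

// Local copies of the documented declarations so the sketch is self-contained.
type ByteWriter interface {
	Write(batch [][]byte)
}

type ByteWriterFunc func(batch [][]byte)

// Write calls the underlying function (assumed implementation).
func (f ByteWriterFunc) Write(batch [][]byte) { f(batch) }

// bufferWriter is a hypothetical struct-based ByteWriter that joins each
// element of a batch into a buffer, one per line.
type bufferWriter struct {
	buf bytes.Buffer
}

func (w *bufferWriter) Write(batch [][]byte) {
	for _, data := range batch {
		w.buf.Write(data)
		w.buf.WriteByte('\n')
	}
}

// submit stands in for whatever hands a completed batch to the writer.
func submit(w ByteWriter, batch [][]byte) {
	w.Write(batch)
}

// demo sends the same batch to a struct-based writer and to a closure
// wrapped in ByteWriterFunc.
func demo() (string, int) {
	batch := [][]byte{[]byte("data 0"), []byte("data 1")}

	bw := &bufferWriter{}
	submit(bw, batch)

	total := 0
	submit(ByteWriterFunc(func(batch [][]byte) {
		for _, data := range batch {
			total += len(data)
		}
	}), batch)

	return bw.buf.String(), total
}

func main() {
	joined, total := demo()
	fmt.Printf("%q %d\n", joined, total) // prints "data 0\ndata 1\n" 12
}
```

The function form suits short, stateless writers, while a struct is the better fit when the writer needs to hold a connection, buffer, or other state.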

type Writer

type Writer interface {
	// Write submits the batch.
	Write(batch []interface{})
}

Writer is used to submit the completed batch. The batch may be partial if the interval lapsed instead of filling the batch.

type WriterFunc

type WriterFunc func(batch []interface{})

WriterFunc is an adapter to allow ordinary functions to be a Writer.

func (WriterFunc) Write

func (f WriterFunc) Write(batch []interface{})

Write implements Writer.
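Since a generic Writer receives []interface{}, each element has to be converted back to its concrete type before use. The following sketch shows one defensive way to do that. Writer and WriterFunc are redeclared locally with an assumed method body, and toStrings is a hypothetical helper.

```go
package main

import "fmt"

// Local copies of the documented declarations so the sketch is self-contained.
type Writer interface {
	Write(batch []interface{})
}

type WriterFunc func(batch []interface{})

// Write calls the underlying function (assumed implementation).
func (f WriterFunc) Write(batch []interface{}) { f(batch) }

// toStrings converts a generic batch to strings. Elements of any other
// type are counted and skipped instead of causing a panic.
func toStrings(batch []interface{}) (out []string, skipped int) {
	for _, v := range batch {
		s, ok := v.(string)
		if !ok {
			skipped++
			continue
		}
		out = append(out, s)
	}
	return out, skipped
}

func main() {
	var w Writer = WriterFunc(func(batch []interface{}) {
		lines, skipped := toStrings(batch)
		fmt.Println(lines, skipped)
	})
	w.Write([]interface{}{"a", 42, "b"}) // prints "[a b] 1"
}
```

This per-element checking is the cost of using the generic types directly, and it is one reason a typed wrapper such as ByteBatcher is the recommended entry point.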
