bpool

package module
v0.0.0-...-1643bbf Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 6, 2020 License: MIT Imports: 6 Imported by: 10

README

GoDoc Go Report Card

Buffer pool prevent from excess memory usage and CPU trashing.

Quick Start

To import bpool from source code use go get command.

go get -u github.com/unit-io/bpool

Usage

Use buffer pool for writing incoming requests to buffer such as Put or Batch operations or use buffer pool while writing data to log file (during commit operation). The objective of creating BufferPool library with capacity is to perform initial writes to buffer without backoff until buffer pool reaches its target size. Buffer pool does not discard any Get or Write requests but it add gradual delay to it to limit the memory usage that can used for other operations such writing to log or db sync operations.

Detailed API documentation is available using the godoc.org service.

Make use of the client by importing it in your Go client source code. For example,

import "github.com/unit-io/bpool"

Following code snippet if executed without buffer capacity will consume all system memory and will cause a panic.

	buf := bytes.NewBuffer(make([]byte, 0, 2))

	defer func() {
		if r := recover(); r != nil {
			fmt.Println("panics from blast")
		}
	}()

	for {
		_, err := buf.Write([]byte("create blast"))
		if err != nil {
			fmt.Println(err.Error())
			return
		}
	}

Code snippet to use BufferPool with capacity will limit usage of system memory by adding gradual delay to the requests and will not cause a panic.

  	pool := bpool.NewBufferPool(1<<20, &bpool.Options{MaxElapsedTime: 1 * time.Minute, WriteBackOff: true}) // creates BufferPool of 16MB target size
	buf := pool.Get()
	defer	pool.Put(buf)
	
	for {
		_, err := buf.Write([]byte("create blast"))
		if err != nil {
			fmt.Println(err.Error())
			return
		}
	}

New Buffer Pool

Use bpool.NewBufferPool() method and pass BufferSize parameter to create new buffer pool.

	const (
		BufferSize = 1<<30 // (1GB size)
	)

	pool := bpool.NewBufferPool(BufferSize, nil)

Get Buffer

To get buffer from buffer pool use BufferPool.Get(). When buffer pool reaches its capacity Get method runs with gradual delay to limit system memory usage.

	....
	var buffer *bpool.Buffer
	buffer = pool.Get()

Writing to Buffer

To write to buffer use Buffer.Write() method.

	var scratch [8]byte
	binary.LittleEndian.PutUint64(scratch[0:8], uint64(buffer.Size()))

	b.buffer.Write(scratch[:])
	....

Reading from Buffer

To read buffer use Buffer.Bytes() method. This operation returns underline data slice stored into buffer.

	data := buffer.Bytes()
	...

Put Buffer to Pool

To put buffer to the pool when finished using buffer use BufferPool.Put() method, this operation resets the underline slice. It also resets the buffer pool interval that was used to delay the Get operation if capacity is below the target size.

	pool.Put(buffer)
	...

To reset the underline slice stored to the buffer and continue using the buffer use Buffer.Reset() method instead of using BufferPool.Put() operation.

	buffer.Reset()
	....

Contributing

If you'd like to contribute, please fork the repository and use a feature branch. Pull requests are welcome.

Licensing

This project is licensed under MIT License.

Documentation

Index

Constants

View Source
const (

	// DefaultInitialInterval duration for waiting in the queue due to system memory surge operations
	DefaultInitialInterval = 500 * time.Millisecond
	// DefaultRandomizationFactor sets factor to backoff when buffer pool reaches target size
	DefaultRandomizationFactor = 0.5
	// DefaultMaxElapsedTime sets maximum elapsed time to wait during backoff
	DefaultMaxElapsedTime   = 15 * time.Second
	DefaultBackoffThreshold = 0.7
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Buffer

type Buffer struct {
	sync.RWMutex // Read Write mutex, guards access to internal buffer.
	// contains filtered or unexported fields
}

Buffer managed buffer by BufferPool for optimum memory usage

func (*Buffer) Bytes

func (buf *Buffer) Bytes() []byte

Bytes gets data from internal buffer

func (*Buffer) Extend

func (buf *Buffer) Extend(size int64) (int64, error)

Extend allocates size to the buffer. Must extend buffer before calling buffer.Internal() method

func (*Buffer) Internal

func (buf *Buffer) Internal() []byte

Internal returns underline internal buffer. This method is useful when you need an underline byte slice of a buffer for example while reading chunk from os.File i.e file.ReatAt() method. It is not safe to call buffer.Write() method once you get underline byte slice. You need to perform an external locking if calling buffer.Internal() and buffer.Write() methods.

func (*Buffer) Read

func (buf *Buffer) Read(p []byte) (int, error)

Read reads specific number of bytes from internal buffer

func (*Buffer) ReadAt

func (buf *Buffer) ReadAt(p []byte, off int64) (int, error)

ReadAt read byte of size at the given offset from internal buffer

func (*Buffer) Reset

func (buf *Buffer) Reset() (ok bool)

Reset resets the buffer

func (*Buffer) Size

func (buf *Buffer) Size() int64

Size internal buffer size

func (*Buffer) Slice

func (buf *Buffer) Slice(start int64, end int64) ([]byte, error)

Slice provide the data for start and end offset

func (*Buffer) Write

func (buf *Buffer) Write(p []byte) (int, error)

Write writes to the buffer

func (*Buffer) WriteAt

func (buf *Buffer) WriteAt(p []byte, off int64) (int, error)

WriteAt writes to the buffer at the given offset

type BufferPool

type BufferPool struct {
	// contains filtered or unexported fields
}

BufferPool represents the thread safe buffer pool. All BufferPool methods are safe for concurrent use by multiple goroutines.

func NewBufferPool

func NewBufferPool(size int64, opts *Options) *BufferPool

NewBufferPool creates a new buffer pool.

func (*BufferPool) Backoff

func (pool *BufferPool) Backoff()

Backoff backs off buffer pool if currentInterval is greater than Backoff threshold.

func (*BufferPool) Capacity

func (pool *BufferPool) Capacity() float64

Capacity return the buffer pool capacity in proportion to target size.

func (*BufferPool) Done

func (pool *BufferPool) Done()

Done closes the buffer pool and stops the drain goroutine.

func (*BufferPool) Get

func (pool *BufferPool) Get() (buf *Buffer)

Get returns buffer if any in the pool or creates a new buffer

func (*BufferPool) NewBuffer

func (pool *BufferPool) NewBuffer(buf []byte) *Buffer

NewBuffer returns buffer and initializes it using buf as its initial content.

func (*BufferPool) Put

func (pool *BufferPool) Put(buf *Buffer)

Put resets the buffer and put it to the pool

type Capacity

type Capacity struct {
	sync.RWMutex

	InitialInterval     time.Duration
	RandomizationFactor float64

	MaxElapsedTime time.Duration

	WriteBackOff bool
	// contains filtered or unexported fields
}

Capacity manages the BufferPool capacity to limit excess memory usage.

func (*Capacity) NewTicker

func (cap *Capacity) NewTicker() *time.Timer

NewTicker creates or get ticker from timer pool. It uses backoff duration of the pool for the timer.

func (*Capacity) NextBackOff

func (cap *Capacity) NextBackOff(multiplier float64) time.Duration

NextBackOff calculates the next backoff interval using the formula:

Randomized interval = RetryInterval * (1 ± RandomizationFactor)

func (*Capacity) Reset

func (cap *Capacity) Reset()

Reset the interval back to the initial interval. Reset must be called before using pool.

type Options

type Options struct {
	// Maximum concurrent buffer can get from pool
	MaxPoolSize int

	// The duration for waiting in the queue if buffer pool reaches its target size
	InitialInterval time.Duration

	// RandomizationFactor sets factor to backoff when buffer pool reaches target size
	RandomizationFactor float64

	// MaxElapsedTime sets maximum elapsed time to wait during backoff
	MaxElapsedTime time.Duration

	// WriteBackOff to turn on Backoff for buffer writes
	WriteBackOff bool
}

Options holds the optional BufferPool parameters.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL