bufit

package module
v1.2.3 Latest
Warning

This package is not in the latest version of its module.

Published: Nov 23, 2021 License: MIT Imports: 4 Imported by: 0

README

Bufit


Usage

A moving buffer which supports multiple concurrent readers.

This buffer shares a single in-memory buffer among multiple readers, each of which can Read from it independently.


package main

import (
  "io"
  "os"
  "sync"
  "time"

  "github.com/djherbis/bufit"
)

func main() {
  // Start a new buffer
  buf := bufit.New()

  // Create two readers
  r1, r2 := buf.NextReader(), buf.NextReader()

  // Broadcast a message
  io.WriteString(buf, "Hello World\n")

  // Wait
  var grp sync.WaitGroup
  grp.Add(4)

  // Read fast
  go func() {
    defer grp.Done()
    io.Copy(os.Stdout, r1) // "Hello World\n"
  }()

  // Read slow
  go func() {
    defer grp.Done()
    io.CopyN(os.Stdout, r2, 5) // "Hello"
    <-time.After(time.Second)
    io.Copy(os.Stdout, r2) // "World\n"
  }()

  // Both readers will read the entire buffer! The slow reader
  // won't block the fast one from reading ahead either.

  // Late reader
  // Since this reader joins after all existing readers have read "Hello",
  // "Hello" has already been cleared from the buffer. This reader will only see
  // " World\n" and beyond.
  go func() {
    defer grp.Done()
    <-time.After(500 * time.Millisecond)
    r3 := buf.NextReader()
    io.Copy(os.Stdout, r3) // "World\n"
  }()

  // Short Reader
  // **Important!** if your reader isn't going to read until the buffer is empty
  // you'll need to call Close() when you are done with it to tell the buffer
  // it's done reading data.
  go func() {
    defer grp.Done()
    r4 := buf.NextReader()
    io.CopyN(os.Stdout, r4, 5) // "Hello"
    r4.Close()                 // tell the buffer you're done reading
  }()

  // **Important!** close the buffer so that readers can return io.EOF
  buf.Close()

  grp.Wait()
}

Installation

go get github.com/djherbis/bufit

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Buffer

type Buffer struct {
	// contains filtered or unexported fields
}

Buffer is used to provide multiple readers with access to a shared buffer. Readers may join or leave at any time; however, a joining reader will only see what's currently in the buffer onwards. Data is evicted from the buffer once all active readers have read that section.

Example
// Start a new buffer
buf := New()

// Create two readers
r1, r2 := buf.NextReader(), buf.NextReader()

// Broadcast a message
io.WriteString(buf, "Hello World\n")

// Wait
var grp sync.WaitGroup
grp.Add(4)

// Read fast
go func() {
	defer grp.Done()
	io.Copy(os.Stdout, r1) // "Hello World\n"
}()

// Read slow
go func() {
	defer grp.Done()
	<-time.After(100 * time.Millisecond)
	io.CopyN(os.Stdout, r2, 5) // "Hello"
	<-time.After(time.Second)
	io.Copy(os.Stdout, r2) // "World\n"
}()

// Both readers will read the entire buffer! The slow reader
// won't block the fast one from reading ahead either.

// Late reader
// Since this reader joins after all existing readers have read "Hello",
// "Hello" has already been cleared from the buffer. This reader will only see
// " World\n" and beyond.
go func() {
	defer grp.Done()
	<-time.After(500 * time.Millisecond)
	r3 := buf.NextReader()
	io.Copy(os.Stdout, r3) // "World\n"
}()

// Short Reader
// **Important!** if your reader isn't going to read until the buffer is empty
// you'll need to call Close() when you are done with it to tell the buffer
// it's done reading data.
go func() {
	defer grp.Done()
	<-time.After(100 * time.Millisecond)
	r4 := buf.NextReader()
	io.CopyN(os.Stdout, r4, 5) // "Hello"
	r4.Close()                 // tell the buffer you're done reading
}()

// **Important!** close the buffer so that readers can return io.EOF
buf.Close()

grp.Wait()
Output:

Hello World
HelloHelloHello World
 World
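
The eviction rule described for Buffer can be modeled with the standard library alone. The following is a minimal sketch, not bufit's implementation: sharedBuf and its methods are hypothetical names, it is single-goroutine only, and it assumes a joining reader starts at the oldest byte still buffered.

```go
package main

import "fmt"

// sharedBuf is a toy, single-goroutine model of a buffer shared by several
// readers. It is NOT bufit's implementation; every name here is hypothetical.
type sharedBuf struct {
	base    int         // absolute offset of data[0]
	data    []byte      // bytes that have not been evicted yet
	readers map[int]int // reader id -> absolute read offset
}

// write appends p to the buffer.
func (b *sharedBuf) write(p string) { b.data = append(b.data, p...) }

// join registers a reader at the oldest byte still buffered.
func (b *sharedBuf) join(id int) { b.readers[id] = b.base }

// read returns up to n bytes for a joined reader, then evicts whatever
// every active reader has already consumed.
func (b *sharedBuf) read(id, n int) string {
	start := b.readers[id] - b.base
	end := start + n
	if end > len(b.data) {
		end = len(b.data)
	}
	out := string(b.data[start:end])
	b.readers[id] = b.base + end
	b.evict()
	return out
}

// evict drops the prefix that lies before the slowest active reader.
func (b *sharedBuf) evict() {
	if len(b.readers) == 0 {
		return
	}
	low := b.base + len(b.data)
	for _, off := range b.readers {
		if off < low {
			low = off
		}
	}
	b.data = b.data[low-b.base:]
	b.base = low
}

func main() {
	b := &sharedBuf{readers: map[int]int{}}
	b.join(1)
	b.join(2)
	b.write("Hello World\n")
	fmt.Printf("%q\n", b.read(1, 12)) // fast reader: "Hello World\n"
	fmt.Printf("%q\n", b.read(2, 5))  // slow reader: "Hello"
	b.join(3)                         // joins after "Hello" was evicted
	fmt.Printf("%q\n", b.read(3, 12)) // late reader: " World\n"
}
```

The late reader only sees " World\n" because both earlier readers had moved past "Hello" before it joined.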

func New

func New() *Buffer

New creates and returns a new Buffer

func NewBuffer

func NewBuffer(w Writer) *Buffer

NewBuffer creates and returns a new Buffer backed by the passed Writer

func NewCapped added in v1.1.0

func NewCapped(cap int) *Buffer

NewCapped creates a new in-memory Buffer whose Write() call blocks to prevent Len() from exceeding the passed capacity
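
The blocking behaviour of a capped Write can be illustrated with a small standard-library model. This is a sketch under stated assumptions, not bufit's code: cappedBuf, newCappedBuf and transfer are hypothetical names, there is a single reader, and splitting a large Write into pieces that fit is a choice made for this sketch.

```go
package main

import (
	"fmt"
	"sync"
)

// cappedBuf is a hypothetical toy type, not part of bufit.
type cappedBuf struct {
	mu    sync.Mutex
	cond  *sync.Cond
	data  []byte
	limit int // must be > 0
}

func newCappedBuf(limit int) *cappedBuf {
	b := &cappedBuf{limit: limit}
	b.cond = sync.NewCond(&b.mu)
	return b
}

// Write blocks whenever the buffer is full, so len(b.data) never exceeds limit.
func (b *cappedBuf) Write(p []byte) (int, error) {
	b.mu.Lock()
	defer b.mu.Unlock()
	written := 0
	for len(p) > 0 {
		for len(b.data) >= b.limit {
			b.cond.Wait() // wait for the reader to free space
		}
		room := b.limit - len(b.data)
		if room > len(p) {
			room = len(p)
		}
		b.data = append(b.data, p[:room]...)
		p = p[room:]
		written += room
		b.cond.Broadcast() // wake a reader waiting for data
	}
	return written, nil
}

// Read blocks until data is available, drains up to len(p) bytes,
// and wakes a writer that is waiting for space.
func (b *cappedBuf) Read(p []byte) (int, error) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for len(b.data) == 0 {
		b.cond.Wait()
	}
	n := copy(p, b.data)
	b.data = b.data[n:]
	b.cond.Broadcast()
	return n, nil
}

// transfer pushes msg through a buffer capped at limit and returns what the
// reader received plus the size of the largest single Read.
func transfer(msg string, limit int) (string, int) {
	b := newCappedBuf(limit)
	go b.Write([]byte(msg)) // blocks repeatedly while the buffer is full

	var got []byte
	largest := 0
	chunk := make([]byte, len(msg))
	for len(got) < len(msg) {
		n, _ := b.Read(chunk)
		if n > largest {
			largest = n
		}
		got = append(got, chunk[:n]...)
	}
	return string(got), largest
}

func main() {
	got, largest := transfer("Hello World\n", 4)
	fmt.Printf("received %q, largest single read: %d bytes (cap 4)\n", got, largest)
}
```

Because the model never holds more than limit bytes, no single Read can return more than limit bytes, while the full message still arrives in order.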

func NewCappedBuffer added in v1.1.0

func NewCappedBuffer(w Writer, cap int) *Buffer

NewCappedBuffer creates a new Buffer whose Write() call blocks to prevent Len() from exceeding the passed capacity

func (*Buffer) Close

func (b *Buffer) Close() error

Close marks the buffer as complete. Readers will return io.EOF instead of blocking when they reach the end of the buffer.

func (*Buffer) Keep added in v1.2.0

func (b *Buffer) Keep(keep int)

Keep sets the minimum number of bytes to keep in the buffer even if all other current readers have read those bytes. This allows new readers to join slightly behind. Keep is safe to call concurrently with other methods. Fewer than keep bytes may be in the buffer if fewer than keep bytes have been written since keep was set. If this buffer has a cap, it is invalid to call this method with keep >= cap, since the buffer would never be able to write new bytes once it reached the cap.
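
One way to picture how Keep interacts with eviction is as a second limit on how far the front of the buffer may advance. The helper below is a hypothetical illustration, not a bufit function; it assumes absolute byte offsets and that, with no readers at all, only keep holds data back.

```go
package main

import "fmt"

// evictable reports how many leading bytes a buffer holding the absolute
// range [base, end) could drop. Hypothetical helper, not part of bufit.
func evictable(base, end, keep int, readerOffsets []int) int {
	// Never evict past the slowest active reader.
	limit := end
	for _, off := range readerOffsets {
		if off < limit {
			limit = off
		}
	}
	// Never evict into the last keep bytes.
	if k := end - keep; k < limit {
		limit = k
	}
	if limit <= base {
		return 0
	}
	return limit - base
}

func main() {
	fmt.Println(evictable(0, 12, 0, []int{12, 12})) // 12: everything was read
	fmt.Println(evictable(0, 12, 6, []int{12, 12})) // 6: last 6 bytes are kept
	fmt.Println(evictable(0, 12, 6, []int{5, 12}))  // 5: slow reader holds more
	fmt.Println(evictable(0, 4, 6, []int{4}))       // 0: fewer than keep bytes written
}
```

In the second case a reader joining afterwards would still find the last 6 bytes, which is the "join slightly behind" behaviour described above.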

func (*Buffer) Len added in v1.1.0

func (b *Buffer) Len() int

Len returns the current size of the buffer. This is safe to call concurrently with all other methods.

func (*Buffer) NextReader

func (b *Buffer) NextReader() io.ReadCloser

NextReader returns a new io.ReadCloser for this shared buffer. Read/Close are safe to call concurrently with the buffer's Write/Close methods. Read calls will block if the Buffer is not Closed and contains no data. Note that the returned reader sees all data that is currently in the buffer; data is only dropped from the buffer once all active readers point to locations in the buffer after that section.

func (*Buffer) NextReaderFromNow added in v1.1.0

func (b *Buffer) NextReaderFromNow() io.ReadCloser

NextReaderFromNow returns a new io.ReadCloser for this shared buffer. Unlike NextReader(), this reader will only see writes which occur after this reader is returned even if there is other data in the buffer. In other words, this reader points to the end of the buffer.

func (*Buffer) NumReaders added in v1.1.0

func (b *Buffer) NumReaders() int

NumReaders returns the number of readers returned by NextReader() which have not called Reader.Close(). This method is safe to call concurrently with all methods.

func (*Buffer) OnLastReaderClose added in v1.1.0

func (b *Buffer) OnLastReaderClose(runOnLastClose func() error)

OnLastReaderClose registers the passed callback to be run after any call to Reader.Close() which drops NumReaders() to 0. This method is safe to call concurrently with all other methods and Reader methods; however, the callback is only guaranteed to run if this registration completes before the Reader.Close call that would trigger it.
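
The ordering caveat above can be seen in a small reference-counting model. This is a sketch only: readerSet and its methods are hypothetical names, not bufit's internals, and returning the callback's error from closeReader is an assumption made for the sketch.

```go
package main

import (
	"fmt"
	"sync"
)

// readerSet is a hypothetical toy that counts open readers and runs a
// callback when the count drops to zero. It is not bufit's implementation.
type readerSet struct {
	mu     sync.Mutex
	open   int
	onLast func() error
}

// add registers one more open reader.
func (s *readerSet) add() {
	s.mu.Lock()
	s.open++
	s.mu.Unlock()
}

// onLastClose stores the callback to run when the last reader closes.
func (s *readerSet) onLastClose(f func() error) {
	s.mu.Lock()
	s.onLast = f
	s.mu.Unlock()
}

// closeReader removes one reader and runs the callback, outside the lock,
// if that close dropped the count to zero.
func (s *readerSet) closeReader() error {
	s.mu.Lock()
	s.open--
	last := s.open == 0
	cb := s.onLast
	s.mu.Unlock()
	if last && cb != nil {
		return cb()
	}
	return nil
}

func main() {
	calls := 0
	count := func() error { calls++; return nil }

	var s readerSet
	s.add()
	s.add()
	s.onLastClose(count)
	s.closeReader()
	fmt.Println(calls) // 0: one reader is still open
	s.closeReader()
	fmt.Println(calls) // 1: the last reader closed

	var late readerSet
	late.add()
	late.closeReader()
	late.onLastClose(count) // registered after the last close
	fmt.Println(calls)      // 1: the late registration never fired
}
```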

func (*Buffer) Write

func (b *Buffer) Write(p []byte) (int, error)

Write appends the given data to the buffer. All active readers will see this write.

type Reader

type Reader interface {

	// Len returns the unread # of bytes in this Reader
	Len() int

	// Discard drops the next n bytes from the Reader, as if it were Read()
	// it returns the # of bytes actually dropped. It may return io.EOF
	// if all remaining bytes have been discarded.
	Discard(int) (int, error)

	// Read bytes into the provided buffer.
	io.Reader
}

Reader provides an io.Reader whose methods MUST be concurrent-safe with the Write method of the Writer from which it was generated. It also MUST be safe for concurrent calls to Writer.Discard for bytes which have already been read by this Reader.

type Writer

type Writer interface {

	// Len returns the # of bytes buffered for Readers
	Len() int

	// Discard drops the next n buffered bytes. It returns the actual number of
	// bytes dropped and may return io.EOF if all remaining bytes have been
	// discarded. Discard must be concurrent-safe with method calls
	// on generated Readers when discarding bytes that have been read
	// by all Readers.
	Discard(int) (int, error)

	// NextReader returns a Reader which reads a "snapshot" of the current written bytes
	// (excluding discarded bytes). The Reader should work independently of the Writer
	// and be concurrent-safe with the Write method on the Writer.
	NextReader() Reader

	// Write writes the given bytes into the Writer's underlying buffer. They
	// become available for reading via NextReader(), which grabs a snapshot
	// of the currently written bytes.
	io.Writer
}

Writer accepts bytes and generates Readers that consume those bytes. The methods of generated Readers must be concurrent-safe with the Write method.

Example
buf := newWriter(make([]byte, 0, 10))
io.Copy(os.Stdout, buf)
io.Copy(os.Stdout, io.NewSectionReader(*&buf, 0, 100))

io.WriteString(buf, "Hello ")
r := io.NewSectionReader(*&buf, 0, int64(buf.Len()))
io.CopyN(os.Stdout, r, 5)
io.CopyN(os.Stdout, buf, 5)
io.WriteString(buf, "World")
r = io.NewSectionReader(*&buf, 0, int64(buf.Len()))
io.CopyN(os.Stdout, r, 6)

io.WriteString(buf, "abcdefg")
io.Copy(os.Stdout, buf)
io.Copy(os.Stdout, buf)

io.WriteString(buf, "Hello World")
r = io.NewSectionReader(*&buf, 0, int64(buf.Len()))
io.CopyN(os.Stdout, r, 5)
io.CopyN(os.Stdout, buf, 4)

io.WriteString(buf, "abcdefg")
io.Copy(os.Stdout, buf)
io.Copy(os.Stdout, buf)
Output:

HelloHello World WorldabcdefgHelloHello Worldabcdefg

func NewMemoryWriter added in v1.2.2

func NewMemoryWriter(p []byte) Writer

NewMemoryWriter returns a new Writer for use with NewBuffer that internally stores bytes in a []byte. The given []byte will be the initial buffer used. It's a good idea to preallocate the normal size of your buffer here, e.g. NewMemoryWriter(make([]byte, 0, CAPACITY)), where CAPACITY is large enough for Keep() plus the number of buffered Write chunks you expect to hold in the buffer at any given time. This avoids swapping the internal buffer out for a larger one as capacity needs grow, which means fewer large, locking copies of the internal buffer.
