bufpipe

Version: v0.0.1
Published: Jan 20, 2021 License: BSD-3-Clause Imports: 2 Imported by: 0

Documentation

Overview

Package bufpipe implements a buffered pipe.

Constants

const (
	Ring   = 1 << iota // Ring buffer vs. linear buffer
	Dual               // Dual access IO vs. mono access IO
	BlockI             // Blocking input vs. polling input
	BlockO             // Blocking output vs. polling output

	// The below flags are the inverse of the ones above. They exist to make it
	// obvious what the inverse is.
	Line  = 0 // Inverse of Ring
	Mono  = 0 // Inverse of Dual
	PollI = 0 // Inverse of BlockI
	PollO = 0 // Inverse of BlockO
)

There are a number of modes of operation that BufferPipe can operate in.

As such, there are 4 different (and mostly orthogonal) flags that can be bitwise ORed together to form the mode of operation. With 4 flags there are 16 possible combinations, some of which are illogical. All 16 combinations are allowed, even though no sensible programmer should use the illogical ones.

The first flag determines the buffer's structure (linear vs. ring). In linear mode, a writer can only write up to the internal buffer length's worth of data. In ring mode, the buffer is treated as a circular buffer and allows indefinite reading and writing.

The second flag determines concurrent access to the pipe (mono vs. dual). In mono access mode, the writer has sole access to the pipe. Only after the Close method is called can a reader start reading data. In dual access mode, readers can read written data the moment it is ready.

The third and fourth flags determine the waiting protocol for reading and writing (polling vs. blocking). In blocking mode, a writer or reader will block until there is available buffer space or valid data to consume. In polling mode, read and write operations return immediately, possibly with an ErrShortWrite or ErrNoProgress error.

Combining Ring with Mono is an illogical combination. Mono access dictates that no reader can drain the pipe until it is closed. However, the benefit of a Ring buffer is that it can circularly write data as a reader consumes the input. Ring with Mono is effectively Line with Mono.

Combining Line with BlockI is an illogical combination. In Line mode, the amount that can be written is fixed and independent of how much is read. Thus, using BlockI in this case will cause the writer to block forever when the buffer is full.

With all illogical combinations removed, there are only 8 logical combinations that programmers should use.
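The count of 8 can be checked by brute force. The following is a minimal sketch that redeclares the four flags locally (with the same values as the constants above) so that it runs without importing the package; the helper names isLogical and countLogical are hypothetical and not part of bufpipe.

```go
package main

import "fmt"

// Local copies of the flag constants listed above, redeclared so that this
// sketch runs on its own.
const (
	Ring   = 1 << iota // 1
	Dual               // 2
	BlockI             // 4
	BlockO             // 8
)

// isLogical (hypothetical helper) reports whether a mode avoids the two
// illogical pairings described above: Ring with Mono, and Line with BlockI.
func isLogical(mode int) bool {
	ring := mode&Ring != 0
	dual := mode&Dual != 0
	blockI := mode&BlockI != 0
	if ring && !dual { // Ring | Mono
		return false
	}
	if !ring && blockI { // Line | BlockI
		return false
	}
	return true
}

// countLogical walks all 16 flag combinations and counts the logical ones.
func countLogical() int {
	n := 0
	for mode := 0; mode < 16; mode++ {
		if isLogical(mode) {
			n++
		}
	}
	return n
}

func main() {
	fmt.Println(countLogical()) // prints 8
}
```

Four of the surviving modes are Ring|Dual with any waiting protocol; the other four are Line with either access mode, PollI, and either output protocol.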

const (
	LineMono  = Line | Mono | PollI | BlockO
	LineDual  = Line | Dual | PollI | BlockO
	RingPoll  = Ring | Dual | PollI | PollO
	RingBlock = Ring | Dual | BlockI | BlockO
)

The most common combinations of flags are predefined with convenient aliases.
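Because the inverse flags are all zero, a mode value only records which of Ring, Dual, BlockI, and BlockO are set; the inverse is detected by the absence of a bit. A small sketch of decoding a mode back into flag names, assuming locally redeclared constants with the values shown above (describe is a hypothetical helper, not a bufpipe function):

```go
package main

import (
	"fmt"
	"strings"
)

// Local copies of the constants and aliases listed above.
const (
	Ring = 1 << iota
	Dual
	BlockI
	BlockO

	Line  = 0
	Mono  = 0
	PollI = 0
	PollO = 0
)

const (
	LineMono  = Line | Mono | PollI | BlockO
	LineDual  = Line | Dual | PollI | BlockO
	RingPoll  = Ring | Dual | PollI | PollO
	RingBlock = Ring | Dual | BlockI | BlockO
)

// describe (hypothetical helper) names each of the four choices in a mode.
// A cleared bit selects the inverse flag, since Line, Mono, PollI, and PollO
// are all zero.
func describe(mode int) string {
	pick := func(flag int, set, unset string) string {
		if mode&flag != 0 {
			return set
		}
		return unset
	}
	return strings.Join([]string{
		pick(Ring, "Ring", "Line"),
		pick(Dual, "Dual", "Mono"),
		pick(BlockI, "BlockI", "PollI"),
		pick(BlockO, "BlockO", "PollO"),
	}, "|")
}

func main() {
	for _, mode := range []int{LineMono, LineDual, RingPoll, RingBlock} {
		fmt.Printf("%2d %s\n", mode, describe(mode))
	}
}
```

One consequence of the zero-valued inverses is that an expression such as mode&Line is always zero, so a Line buffer has to be tested for as mode&Ring == 0.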

Variables

This section is empty.

Functions

This section is empty.

Types

type BufferPipe

type BufferPipe struct {
	// contains filtered or unexported fields
}
Example (LineDual)

In LineDual mode, the consumer sees produced data immediately as it becomes available. The producer is only allowed to write as much data as the size of the underlying buffer. The amount that can be written is independent of the operation of the consumer.

package main

import (
	"fmt"
	"github.com/dsnet/golib/bufpipe"
	"io"
	"math/rand"
	"sync"
	"time"
)

func randomChars(cnt int, rand *rand.Rand) string {
	data := make([]byte, cnt)
	for idx := range data {
		char := byte(rand.Intn(10 + 26 + 26))
		if char < 10 {
			data[idx] = '0' + char
		} else if char < 10+26 {
			data[idx] = 'A' + char - 10
		} else {
			data[idx] = 'a' + char - 36
		}
	}
	return string(data)
}

func main() {
	// The buffer is small enough such that the producer does hit the limit.
	buffer := bufpipe.NewBufferPipe(make([]byte, 256), bufpipe.LineDual)

	rand := rand.New(rand.NewSource(0))
	group := new(sync.WaitGroup)
	group.Add(2)

	// Producer routine.
	go func() {
		defer group.Done()
		defer buffer.Close()

		buffer.Write([]byte("#### ")) // Write a fake header
		for idx := 0; idx < 8; idx++ {
			data := randomChars(rand.Intn(64), rand) + "\n"

			// So long as the amount of data written has not exceeded the size
			// of the buffer, Write will never fail.
			if _, err := buffer.Write([]byte(data)); err == io.ErrShortWrite {
				break
			}

			time.Sleep(100 * time.Millisecond)
		}
	}()

	// Consumer routine.
	go func() {
		defer group.Done()
		for {
			// Reading can also be done using ReadSlices and ReadMark pairs.
			data, _, err := buffer.ReadSlices()
			if err == io.EOF {
				break
			} else if err != nil {
				panic(err)
			}
			buffer.ReadMark(len(data))
			fmt.Print(string(data))
		}
		fmt.Println()
	}()

	group.Wait()

}
Output:

#### kdUhQzHYs2LjaukXEC292UgLOCAPQTCNAKfc0XMNCUuJbsqiHmm6GJMFck
whxMYR1k
zhMYzktxIv10mIPqBCCwm646E6chwIFZfpX0fjqMu0YKLDhfIMnDq8w9J
fQhkT1qEkJfEI0jtbDnIrEXx6G4xMgXEB6auAyBUjPk2jMSgCMVZf8L1VgJemin
2Quy1C5aA00KbYqawNeuXYTvgeUXGu3zyjMUoEIrOx7
ecE4dY3ZaTrX03xBY
Example (LineMono)

In LineMono mode, the consumer cannot see the written data until the pipe is closed. Thus, it is possible for the producer to go back to the front of the pipe and record the total number of bytes written out. This functionality is useful in cases where a file format's header contains information that is dependent on what is eventually written.

package main

import (
	"fmt"
	"github.com/dsnet/golib/bufpipe"
	"io"
	"math/rand"
	"sync"
	"time"
)

func randomChars(cnt int, rand *rand.Rand) string {
	data := make([]byte, cnt)
	for idx := range data {
		char := byte(rand.Intn(10 + 26 + 26))
		if char < 10 {
			data[idx] = '0' + char
		} else if char < 10+26 {
			data[idx] = 'A' + char - 10
		} else {
			data[idx] = 'a' + char - 36
		}
	}
	return string(data)
}

func main() {
	// The buffer is small enough such that the producer does hit the limit.
	buffer := bufpipe.NewBufferPipe(make([]byte, 256), bufpipe.LineMono)

	rand := rand.New(rand.NewSource(0))
	group := new(sync.WaitGroup)
	group.Add(2)

	// Producer routine.
	go func() {
		defer group.Done()
		defer buffer.Close()

		// In LineMono mode only, it is safe to store a reference to written
		// data and modify later.
		header, _, err := buffer.WriteSlices()
		if err != nil {
			panic(err)
		}

		totalCnt, _ := buffer.Write([]byte("#### "))
		for idx := 0; idx < 8; idx++ {
			data := randomChars(rand.Intn(64), rand) + "\n"

			// So long as the amount of data written has not exceeded the size
			// of the buffer, Write will never fail.
			cnt, err := buffer.Write([]byte(data))
			totalCnt += cnt
			if err == io.ErrShortWrite {
				break
			}

			time.Sleep(100 * time.Millisecond)
		}

		// Write the header afterwards
		copy(header[:4], fmt.Sprintf("%04d", totalCnt))
	}()

	// Consumer routine.
	go func() {
		defer group.Done()

		// In LineMono mode only, a call to ReadSlices is guaranteed to block
		// until the pipe is closed. All written data will be made available.
		data, _, _ := buffer.ReadSlices()
		buffer.ReadMark(len(data)) // Technically, this is optional

		fmt.Println(string(data))
	}()

	group.Wait()

}
Output:

0256 kdUhQzHYs2LjaukXEC292UgLOCAPQTCNAKfc0XMNCUuJbsqiHmm6GJMFck
whxMYR1k
zhMYzktxIv10mIPqBCCwm646E6chwIFZfpX0fjqMu0YKLDhfIMnDq8w9J
fQhkT1qEkJfEI0jtbDnIrEXx6G4xMgXEB6auAyBUjPk2jMSgCMVZf8L1VgJemin
2Quy1C5aA00KbYqawNeuXYTvgeUXGu3zyjMUoEIrOx7
ecE4dY3ZaTrX03xBY
Example (RingBlock)

In RingBlock mode, the consumer sees produced data immediately as it becomes available. The producer is allowed to write as much data as it wants so long as the consumer continues to read the data in the pipe.

package main

import (
	"fmt"
	"github.com/dsnet/golib/bufpipe"
	"io"
	"math/rand"
	"sync"
	"time"
)

func randomChars(cnt int, rand *rand.Rand) string {
	data := make([]byte, cnt)
	for idx := range data {
		char := byte(rand.Intn(10 + 26 + 26))
		if char < 10 {
			data[idx] = '0' + char
		} else if char < 10+26 {
			data[idx] = 'A' + char - 10
		} else {
			data[idx] = 'a' + char - 36
		}
	}
	return string(data)
}

func main() {
	// Intentionally small buffer to show that data written into the buffer
	// can exceed the size of the buffer itself.
	buffer := bufpipe.NewBufferPipe(make([]byte, 64), bufpipe.RingBlock)

	rand := rand.New(rand.NewSource(0))
	group := new(sync.WaitGroup)
	group.Add(2)

	// Producer routine.
	go func() {
		defer group.Done()
		defer buffer.Close()

		buffer.Write([]byte("#### ")) // Write a fake header
		for idx := 0; idx < 8; idx++ {
			data := randomChars(rand.Intn(64), rand) + "\n"

			// In RingBlock mode, Write blocks until the consumer has freed
			// enough space, so the total written can exceed the buffer size.
			buffer.Write([]byte(data))

			time.Sleep(100 * time.Millisecond)
		}
	}()

	// Consumer routine.
	go func() {
		defer group.Done()

		data := make([]byte, 64)
		for {
			// Reading can also be done using the Read method.
			cnt, err := buffer.Read(data)
			fmt.Print(string(data[:cnt]))
			if err == io.EOF {
				break
			}
		}
		fmt.Println()
	}()

	group.Wait()

}
Output:

#### kdUhQzHYs2LjaukXEC292UgLOCAPQTCNAKfc0XMNCUuJbsqiHmm6GJMFck
whxMYR1k
zhMYzktxIv10mIPqBCCwm646E6chwIFZfpX0fjqMu0YKLDhfIMnDq8w9J
fQhkT1qEkJfEI0jtbDnIrEXx6G4xMgXEB6auAyBUjPk2jMSgCMVZf8L1VgJemin
2Quy1C5aA00KbYqawNeuXYTvgeUXGu3zyjMUoEIrOx7
ecE4dY3ZaTrX03xBYJ04OzomME36yth76CFmg2zTolzKhYByvZ8
FQMuYbcWHLcUu4yL3aBZkwJrbDFUcHpGnBGfbDq4aFlLS5vGOm6mYOjHZll
iP0QQKpKp3cz

func NewBufferPipe

func NewBufferPipe(buf []byte, mode int) *BufferPipe

BufferPipe is similar in operation to io.Pipe and is intended to be the communication channel between producer and consumer routines. There are some specific use cases for BufferPipes over io.Pipe.

The first is where a writer may need to go back and modify a portion of the past "written" data before allowing the reader to consume it. The second is performance-sensitive applications, where the cost of copying data is noticeable; there it is more efficient to read/write data from/to the internal buffer directly.

See the defined constants for more on the buffer mode of operation.
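For contrast, here is a minimal sketch of the standard library's io.Pipe, which has no internal buffer: each Write blocks until readers have consumed the data, and the bytes are copied straight into the reader's slice. Once Write returns there is nothing left for the writer to revisit, which is the limitation the first use case above addresses. The helper name pipeRoundTrip is hypothetical.

```go
package main

import (
	"fmt"
	"io"
)

// pipeRoundTrip (hypothetical helper) sends msg through an io.Pipe and
// returns what the reader saw.
func pipeRoundTrip(msg string) (string, error) {
	pr, pw := io.Pipe()

	// io.Pipe is unbuffered: Write blocks until the data has been read,
	// so the producer needs its own goroutine.
	go func() {
		_, err := pw.Write([]byte(msg))
		pw.CloseWithError(err) // a nil error makes readers see io.EOF
	}()

	// ReadAll copies the data out of the pipe until EOF.
	data, err := io.ReadAll(pr)
	return string(data), err
}

func main() {
	got, err := pipeRoundTrip("#### payload\n")
	fmt.Printf("%q %v\n", got, err)
}
```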

func (*BufferPipe) Buffer

func (b *BufferPipe) Buffer() []byte

The entire internal buffer. Be careful when touching the raw buffer. Line buffers are always guaranteed to be aligned to the front of the slice. Ring buffers use wrap-around logic and could be physically split apart.

func (*BufferPipe) Capacity

func (b *BufferPipe) Capacity() int

The total number of bytes the buffer can store.

func (*BufferPipe) Close

func (b *BufferPipe) Close() error

Close the buffer down.

All write operations have no effect after this, while all read operations will drain remaining data in the buffer. This operation is somewhat similar to how Go channels operate.

Writers should close the buffer to signal end-of-stream to readers.

Readers should only close the buffer in the event of unexpected termination. The mechanism allows readers to inform writers of consumer termination and prevents the producer from potentially being blocked forever.
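The channel analogy for draining after a close can be made concrete with a short sketch using only built-in channels. It illustrates the drain-after-close behavior and is not a model of BufferPipe itself; note one difference: sending on a closed channel panics, whereas writes to a closed pipe are described above as having no effect. The function name drain is hypothetical.

```go
package main

import "fmt"

// drain (hypothetical helper) closes a buffered channel while values are
// still queued, then reads until the channel is both closed and empty.
func drain() []int {
	ch := make(chan int, 4)
	ch <- 1
	ch <- 2
	close(ch) // the "writer" signals end-of-stream

	var got []int
	for v := range ch { // queued values are still delivered after close
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(drain()) // prints [1 2]
}
```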

func (*BufferPipe) CloseWithError

func (b *BufferPipe) CloseWithError(err error) (errPre error)

Closes the pipe with the given error. This sets the error value for the pipe and returns the previous error value.

func (*BufferPipe) Length

func (b *BufferPipe) Length() int

The number of valid bytes that can be read.

func (*BufferPipe) Mode

func (b *BufferPipe) Mode() int

The BufferPipe mode of operation.

func (*BufferPipe) Pointers

func (b *BufferPipe) Pointers() (rdPtr, wrPtr int64)

The internal pointer values.

func (*BufferPipe) Read

func (b *BufferPipe) Read(data []byte) (cnt int, err error)

Read data from the buffer.

In all modes, the length of the data slice may exceed the capacity of the buffer. The operation will block until all data has been read or until the EOF is hit.

Under Block mode, this method may block forever if there is no producer.

func (*BufferPipe) ReadFrom

func (b *BufferPipe) ReadFrom(rd io.Reader) (cnt int64, err error)

Continually read the contents of the reader and write them to the pipe.

func (*BufferPipe) ReadMark

func (b *BufferPipe) ReadMark(cnt int)

Advances the read pointer.

The amount advanced must be non-negative and no greater than the length of the slices returned by the previous ReadSlices. Read may not be called between these two calls. Also, another call to ReadMark is invalid until ReadSlices has been called again.

If ReadMark is being used, only one reader routine is allowed.

func (*BufferPipe) ReadSlices

func (b *BufferPipe) ReadSlices() (bufLo, bufHi []byte, err error)

Slices of valid data that can be read. This does not advance the internal read pointer. All of the valid readable data is the logical concatenation of the two slices.

In linear buffers, the first slice obtained is guaranteed to be the entire valid readable buffer slice.

In ring buffers, the first slice obtained may not represent all of the valid buffer data, since a slice always represents a contiguous piece of memory. However, the first slice is guaranteed to be non-empty if any valid data is available.

Under the Block mode, this method blocks until there is at least some valid data to read. The Mono mode is special in that none of the data is considered ready for reading until the writer closes the pipe.
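To see why a ring buffer can need two slices, consider the following sketch. It is not the package's implementation; it assumes read and write pointers that only ever increase and are reduced modulo the buffer size when indexing, and readSlices is a hypothetical free function.

```go
package main

import "fmt"

// readSlices (hypothetical, not the package's code) returns the readable
// bytes of a ring buffer as at most two contiguous slices. It assumes
// rdPtr <= wrPtr and wrPtr-rdPtr <= len(buf).
func readSlices(buf []byte, rdPtr, wrPtr int64) (lo, hi []byte) {
	size := int64(len(buf))
	n := wrPtr - rdPtr    // number of valid bytes
	start := rdPtr % size // physical index of the first valid byte
	if start+n <= size {
		return buf[start : start+n], nil // no wrap-around
	}
	// The valid region runs off the end and continues at the front.
	return buf[start:], buf[:start+n-size]
}

func main() {
	// An 8-byte ring after "0123456789" has been written and the first
	// four bytes consumed: '8' and '9' wrapped around to the front.
	buf := []byte("89234567")
	lo, hi := readSlices(buf, 4, 10)
	fmt.Printf("%s %s\n", lo, hi) // prints 4567 89
}
```

The logical concatenation of the two slices, "456789", is the data in the order it was written.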

func (*BufferPipe) Reset

func (b *BufferPipe) Reset()

Makes the buffer ready for use again by opening the pipe for writing again. The read and write pointers will be reset to zero and errors will be cleared.

func (*BufferPipe) Rollback

func (b *BufferPipe) Rollback() int

Roll back the write pointer and return the number of bytes rolled back. If successful, this effectively makes the valid length zero. In order to prevent race conditions with the reader, this action is only valid in Mono access mode before the pipe is closed.

func (*BufferPipe) Write

func (b *BufferPipe) Write(data []byte) (cnt int, err error)

Write data to the buffer.

In linear buffers, the length of the data slice may not exceed the capacity of the buffer. Otherwise, an ErrShortWrite error will be returned.

In ring buffers, the length of the data slice may exceed the capacity.

Under Block mode, this operation will block until all data has been written. If there is no consumer of the data, then this method may block forever.

func (*BufferPipe) WriteMark

func (b *BufferPipe) WriteMark(cnt int)

Advances the write pointer.

The amount advanced must be non-negative and no greater than the length of the slices returned by the previous WriteSlices. Write may not be called between these two calls. Also, another call to WriteMark is invalid until WriteSlices has been called again.

If WriteMark is being used, only one writer routine is allowed.

func (*BufferPipe) WriteSlices

func (b *BufferPipe) WriteSlices() (bufLo, bufHi []byte, err error)

Slices of available buffer that can be written to. This does not advance the internal write pointer. All of the available write space is the logical concatenation of the two slices.

In linear buffers, the first slice obtained is guaranteed to be the entire available writable buffer slice.

In LineMono mode only, slices obtained may still be modified even after WriteMark has been called and before Close. This is valid because this mode blocks readers until the buffer has been closed.

In ring buffers, the first slice obtained may not represent all of the available buffer space, since a slice always represents a contiguous piece of memory. However, the first slice is guaranteed to be non-empty if space is available.

In the Block mode, this method blocks until there is available space in the buffer to write. Poll mode, on the contrary, will return empty slices if the buffer is full.

func (*BufferPipe) WriteTo

func (b *BufferPipe) WriteTo(wr io.Writer) (cnt int64, err error)

Continually read the contents of the pipe and write them to the writer.
