Documentation ¶
Overview ¶
Package bufpipe implements a buffered pipe.
Index ¶
- Constants
- type BufferPipe
- func NewBufferPipe(buf []byte, mode int) *BufferPipe
- func (b *BufferPipe) Buffer() []byte
- func (b *BufferPipe) Capacity() int
- func (b *BufferPipe) Close() error
- func (b *BufferPipe) CloseWithError(err error) (errPre error)
- func (b *BufferPipe) Length() int
- func (b *BufferPipe) Mode() int
- func (b *BufferPipe) Pointers() (rdPtr, wrPtr int64)
- func (b *BufferPipe) Read(data []byte) (cnt int, err error)
- func (b *BufferPipe) ReadFrom(rd io.Reader) (cnt int64, err error)
- func (b *BufferPipe) ReadMark(cnt int)
- func (b *BufferPipe) ReadSlices() (bufLo, bufHi []byte, err error)
- func (b *BufferPipe) Reset()
- func (b *BufferPipe) Rollback() int
- func (b *BufferPipe) Write(data []byte) (cnt int, err error)
- func (b *BufferPipe) WriteMark(cnt int)
- func (b *BufferPipe) WriteSlices() (bufLo, bufHi []byte, err error)
- func (b *BufferPipe) WriteTo(wr io.Writer) (cnt int64, err error)
Constants ¶
const (
	Ring   = 1 << iota // Ring buffer vs. linear buffer
	Dual               // Dual access IO vs. mono access IO
	BlockI             // Blocking input vs. polling input
	BlockO             // Blocking output vs. polling output

	// The below flags are the inverse of the ones above. They exist to make it
	// obvious what the inverse is.
	Line  = 0 // Inverse of Ring
	Mono  = 0 // Inverse of Dual
	PollI = 0 // Inverse of BlockI
	PollO = 0 // Inverse of BlockO
)
There are a number of modes of operation that BufferPipe can operate in.
There are 4 different (and mostly orthogonal) flags that can be bitwise ORed together to form the mode of operation. With 4 flags, there are technically 16 possible combinations, although some of them are illogical. All 16 combinations are allowed, even if no sensible programmer should be using them.
The first flag determines the buffer's structure (linear vs. ring). In linear mode, a writer can only write up to the internal buffer length's worth of data. In ring mode, on the other hand, the buffer is treated like a circular buffer and allows indefinite reading and writing.
The second flag determines concurrent access to the pipe (mono vs. dual). In mono access mode, the writer has sole access to the pipe. Only after the Close method is called can a reader start reading data. In dual access mode, readers can read written data the moment it is ready.
The third and fourth flags determine the waiting protocol for writing and reading (polling vs. blocking). In blocking mode, a writer or reader will block until there is available buffer space or valid data to consume. In polling mode, read and write operations return immediately, possibly with an ErrShortWrite or ErrNoProgress error.
Combining Ring with Mono is an illogical combination. Mono access dictates that no reader can drain the pipe until it is closed. However, the benefit of a Ring buffer is that it can circularly write data as a reader consumes the input. Ring with Mono is effectively Line with Mono.
Combining Line with BlockI is an illogical combination. In Line mode, the amount that can be written is fixed and independent of how much is read. Thus, using BlockI in this case will cause the writer to block forever when the buffer is full.
With all illogical combinations removed, there are only 8 logical combinations that programmers should use.
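The count of 8 can be checked by enumerating all 16 flag combinations and discarding the two illogical pairings described above. The sketch below redeclares the four flag constants locally so that it runs without the bufpipe package; isLogical and countLogical are hypothetical helpers written for illustration and are not part of the package.

```go
package main

import "fmt"

// Flag values redeclared from the constants documented above.
const (
	Ring = 1 << iota
	Dual
	BlockI
	BlockO
)

// isLogical reports whether a mode avoids the two illogical pairings
// described above: Ring with Mono, and Line with BlockI.
// Hypothetical helper; not part of bufpipe.
func isLogical(mode int) bool {
	ring := mode&Ring != 0
	dual := mode&Dual != 0
	blockI := mode&BlockI != 0
	if ring && !dual {
		return false // Ring with Mono is effectively Line with Mono
	}
	if !ring && blockI {
		return false // Line with BlockI can block the writer forever
	}
	return true
}

// countLogical counts the logical modes among all 16 flag combinations.
func countLogical() int {
	cnt := 0
	for mode := 0; mode < 16; mode++ {
		if isLogical(mode) {
			cnt++
		}
	}
	return cnt
}

func main() {
	fmt.Println(countLogical()) // Prints 8
}
```

Four of the logical modes are the Ring with Dual variants (any mix of polling and blocking), and the other four are the Line variants with PollI.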
const (
	LineMono  = Line | Mono | PollI | BlockO
	LineDual  = Line | Dual | PollI | BlockO
	RingPoll  = Ring | Dual | PollI | PollO
	RingBlock = Ring | Dual | BlockI | BlockO
)
The most common combinations of flags are predefined with convenient aliases.
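To see which flags each alias sets, a mode value can be decoded back into flag names. The following is a minimal sketch that copies the constants from this page so it runs on its own; modeString is a hypothetical helper, not something bufpipe provides.

```go
package main

import (
	"fmt"
	"strings"
)

// Constants copied from the package documentation.
const (
	Ring = 1 << iota
	Dual
	BlockI
	BlockO

	Line  = 0
	Mono  = 0
	PollI = 0
	PollO = 0
)

const (
	LineMono  = Line | Mono | PollI | BlockO
	LineDual  = Line | Dual | PollI | BlockO
	RingPoll  = Ring | Dual | PollI | PollO
	RingBlock = Ring | Dual | BlockI | BlockO
)

// modeString spells out a mode as four flag names, using the inverse
// name whenever a flag bit is not set. Hypothetical helper.
func modeString(mode int) string {
	pick := func(flag int, set, unset string) string {
		if mode&flag != 0 {
			return set
		}
		return unset
	}
	return strings.Join([]string{
		pick(Ring, "Ring", "Line"),
		pick(Dual, "Dual", "Mono"),
		pick(BlockI, "BlockI", "PollI"),
		pick(BlockO, "BlockO", "PollO"),
	}, "|")
}

func main() {
	for _, mode := range []int{LineMono, LineDual, RingPoll, RingBlock} {
		fmt.Println(modeString(mode))
	}
}
```

Because the inverse flags are all zero, they contribute nothing to the ORed value; they only document intent at the call site.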
Types ¶
type BufferPipe ¶
type BufferPipe struct {
// contains filtered or unexported fields
}
Example (LineDual) ¶
In LineDual mode, the consumer sees produced data immediately as it becomes available. The producer is only allowed to write as much data as the size of the underlying buffer. The amount that can be written is independent of the operation of the consumer.
package main

import (
	"fmt"
	"github.com/dsnet/golib/bufpipe"
	"io"
	"math/rand"
	"sync"
	"time"
)

func randomChars(cnt int, rand *rand.Rand) string {
	data := make([]byte, cnt)
	for idx := range data {
		char := byte(rand.Intn(10 + 26 + 26))
		if char < 10 {
			data[idx] = '0' + char
		} else if char < 10+26 {
			data[idx] = 'A' + char - 10
		} else {
			data[idx] = 'a' + char - 36
		}
	}
	return string(data)
}

func main() {
	// The buffer is small enough such that the producer does hit the limit.
	buffer := bufpipe.NewBufferPipe(make([]byte, 256), bufpipe.LineDual)

	rand := rand.New(rand.NewSource(0))
	group := new(sync.WaitGroup)
	group.Add(2)

	// Producer routine.
	go func() {
		defer group.Done()
		defer buffer.Close()

		buffer.Write([]byte("#### ")) // Write a fake header
		for idx := 0; idx < 8; idx++ {
			data := randomChars(rand.Intn(64), rand) + "\n"

			// So long as the amount of data written has not exceeded the size
			// of the buffer, Write will never fail.
			if _, err := buffer.Write([]byte(data)); err == io.ErrShortWrite {
				break
			}

			time.Sleep(100 * time.Millisecond)
		}
	}()

	// Consumer routine.
	go func() {
		defer group.Done()
		for {
			// Reading can also be done using ReadSlices and ReadMark pairs.
			data, _, err := buffer.ReadSlices()
			if err == io.EOF {
				break
			} else if err != nil {
				panic(err)
			}
			buffer.ReadMark(len(data))
			fmt.Print(string(data))
		}
		fmt.Println()
	}()

	group.Wait()
}
Output:
#### kdUhQzHYs2LjaukXEC292UgLOCAPQTCNAKfc0XMNCUuJbsqiHmm6GJMFck
whxMYR1k
zhMYzktxIv10mIPqBCCwm646E6chwIFZfpX0fjqMu0YKLDhfIMnDq8w9J
fQhkT1qEkJfEI0jtbDnIrEXx6G4xMgXEB6auAyBUjPk2jMSgCMVZf8L1VgJemin
2Quy1C5aA00KbYqawNeuXYTvgeUXGu3zyjMUoEIrOx7
ecE4dY3ZaTrX03xBY
Example (LineMono) ¶
In LineMono mode, the consumer cannot see the written data until the pipe is closed. Thus, it is possible for the producer to go back to the front of the pipe and record the total number of bytes written out. This functionality is useful in cases where a file format's header contains information that is dependent on what is eventually written.
package main

import (
	"fmt"
	"github.com/dsnet/golib/bufpipe"
	"io"
	"math/rand"
	"sync"
	"time"
)

func randomChars(cnt int, rand *rand.Rand) string {
	data := make([]byte, cnt)
	for idx := range data {
		char := byte(rand.Intn(10 + 26 + 26))
		if char < 10 {
			data[idx] = '0' + char
		} else if char < 10+26 {
			data[idx] = 'A' + char - 10
		} else {
			data[idx] = 'a' + char - 36
		}
	}
	return string(data)
}

func main() {
	// The buffer is small enough such that the producer does hit the limit.
	buffer := bufpipe.NewBufferPipe(make([]byte, 256), bufpipe.LineMono)

	rand := rand.New(rand.NewSource(0))
	group := new(sync.WaitGroup)
	group.Add(2)

	// Producer routine.
	go func() {
		defer group.Done()
		defer buffer.Close()

		// In LineMono mode only, it is safe to store a reference to written
		// data and modify it later.
		header, _, err := buffer.WriteSlices()
		if err != nil {
			panic(err)
		}

		totalCnt, _ := buffer.Write([]byte("#### "))
		for idx := 0; idx < 8; idx++ {
			data := randomChars(rand.Intn(64), rand) + "\n"

			// So long as the amount of data written has not exceeded the size
			// of the buffer, Write will never fail.
			cnt, err := buffer.Write([]byte(data))
			totalCnt += cnt
			if err == io.ErrShortWrite {
				break
			}

			time.Sleep(100 * time.Millisecond)
		}

		// Write the header afterwards
		copy(header[:4], fmt.Sprintf("%04d", totalCnt))
	}()

	// Consumer routine.
	go func() {
		defer group.Done()

		// In LineMono mode only, a call to ReadSlices is guaranteed to block
		// until the pipe is closed. All written data will be made available.
		data, _, _ := buffer.ReadSlices()
		buffer.ReadMark(len(data)) // Technically, this is optional

		fmt.Println(string(data))
	}()

	group.Wait()
}
Output:
0256 kdUhQzHYs2LjaukXEC292UgLOCAPQTCNAKfc0XMNCUuJbsqiHmm6GJMFck
whxMYR1k
zhMYzktxIv10mIPqBCCwm646E6chwIFZfpX0fjqMu0YKLDhfIMnDq8w9J
fQhkT1qEkJfEI0jtbDnIrEXx6G4xMgXEB6auAyBUjPk2jMSgCMVZf8L1VgJemin
2Quy1C5aA00KbYqawNeuXYTvgeUXGu3zyjMUoEIrOx7
ecE4dY3ZaTrX03xBY
Example (RingBlock) ¶
In RingBlock mode, the consumer sees produced data immediately as it becomes available. The producer is allowed to write as much data as it wants so long as the consumer continues to read the data in the pipe.
package main

import (
	"fmt"
	"github.com/dsnet/golib/bufpipe"
	"io"
	"math/rand"
	"sync"
	"time"
)

func randomChars(cnt int, rand *rand.Rand) string {
	data := make([]byte, cnt)
	for idx := range data {
		char := byte(rand.Intn(10 + 26 + 26))
		if char < 10 {
			data[idx] = '0' + char
		} else if char < 10+26 {
			data[idx] = 'A' + char - 10
		} else {
			data[idx] = 'a' + char - 36
		}
	}
	return string(data)
}

func main() {
	// Intentionally small buffer to show that data written into the buffer
	// can exceed the size of the buffer itself.
	buffer := bufpipe.NewBufferPipe(make([]byte, 64), bufpipe.RingBlock)

	rand := rand.New(rand.NewSource(0))
	group := new(sync.WaitGroup)
	group.Add(2)

	// Producer routine.
	go func() {
		defer group.Done()
		defer buffer.Close()

		buffer.Write([]byte("#### ")) // Write a fake header
		for idx := 0; idx < 8; idx++ {
			data := randomChars(rand.Intn(64), rand) + "\n"

			// In RingBlock mode, Write blocks until the consumer has freed
			// enough space, so the total written may exceed the buffer size.
			buffer.Write([]byte(data))

			time.Sleep(100 * time.Millisecond)
		}
	}()

	// Consumer routine.
	go func() {
		defer group.Done()

		data := make([]byte, 64)
		for {
			// Reading can also be done using the Read method.
			cnt, err := buffer.Read(data)
			fmt.Print(string(data[:cnt]))
			if err == io.EOF {
				break
			}
		}
		fmt.Println()
	}()

	group.Wait()
}
Output:
#### kdUhQzHYs2LjaukXEC292UgLOCAPQTCNAKfc0XMNCUuJbsqiHmm6GJMFck
whxMYR1k
zhMYzktxIv10mIPqBCCwm646E6chwIFZfpX0fjqMu0YKLDhfIMnDq8w9J
fQhkT1qEkJfEI0jtbDnIrEXx6G4xMgXEB6auAyBUjPk2jMSgCMVZf8L1VgJemin
2Quy1C5aA00KbYqawNeuXYTvgeUXGu3zyjMUoEIrOx7
ecE4dY3ZaTrX03xBYJ04OzomME36yth76CFmg2zTolzKhYByvZ8
FQMuYbcWHLcUu4yL3aBZkwJrbDFUcHpGnBGfbDq4aFlLS5vGOm6mYOjHZll
iP0QQKpKp3cz
func NewBufferPipe ¶
func NewBufferPipe(buf []byte, mode int) *BufferPipe
BufferPipe is similar in operation to io.Pipe and is intended to be the communication channel between producer and consumer routines. There are some specific use cases for BufferPipes over io.Pipe.
First, a writer may need to go back and modify a portion of previously "written" data before allowing the reader to consume it. Second, in performance-sensitive applications where the cost of copying data is noticeable, it is more efficient to read from and write to the internal buffer directly.
See the defined constants for more on the buffer mode of operation.
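For comparison, this is roughly what the same producer and consumer hand-off looks like with the standard library's io.Pipe. The sketch uses only the standard library; pipeRoundTrip is a name made up for this illustration. Since io.Pipe has no internal buffer, each Write blocks until reads have consumed the data, so the producer has no opportunity to go back and patch bytes it has already written.

```go
package main

import (
	"fmt"
	"io"
)

// pipeRoundTrip sends msgs through a standard io.Pipe and returns
// everything the consumer read. Hypothetical helper for illustration.
func pipeRoundTrip(msgs []string) string {
	rd, wr := io.Pipe()

	// Producer routine.
	go func() {
		defer wr.Close() // Signals io.EOF to the reader
		for _, msg := range msgs {
			// Each Write blocks until the reader has consumed the data.
			if _, err := wr.Write([]byte(msg)); err != nil {
				return
			}
		}
	}()

	// Consumer: read until the producer closes its end.
	data, err := io.ReadAll(rd)
	if err != nil {
		panic(err)
	}
	return string(data)
}

func main() {
	fmt.Println(pipeRoundTrip([]string{"#### ", "payload"}))
}
```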
func (*BufferPipe) Buffer ¶
func (b *BufferPipe) Buffer() []byte
The entire internal buffer. Be careful when touching the raw buffer. Line buffers are always guaranteed to be aligned to the front of the slice. Ring buffers use wrap-around logic and could be physically split apart.
func (*BufferPipe) Capacity ¶
func (b *BufferPipe) Capacity() int
The total number of bytes the buffer can store.
func (*BufferPipe) Close ¶
func (b *BufferPipe) Close() error
Close the buffer down.
All write operations have no effect after this, while all read operations will drain remaining data in the buffer. This operation is somewhat similar to how Go channels operate.
Writers should close the buffer to signal end-of-stream to readers.
Readers should only close the buffer in the event of unexpected termination. The mechanism allows readers to inform writers of consumer termination and prevents the producer from potentially being blocked forever.
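The channel analogy mentioned above can be illustrated with a plain buffered channel: after close, receivers still drain whatever was queued and only then observe the end of the stream. This sketch uses only built-in channels and says nothing about how bufpipe is implemented; drainAfterClose is a hypothetical helper. One difference worth keeping in mind is that sending on a closed channel panics, whereas the text above says writes to a closed pipe have no effect.

```go
package main

import "fmt"

// drainAfterClose queues vals on a buffered channel, closes it, and then
// receives until the channel reports that it is closed and empty.
func drainAfterClose(vals []int) []int {
	ch := make(chan int, len(vals))
	for _, v := range vals {
		ch <- v
	}
	close(ch) // No more sends are allowed, but queued values remain readable

	var got []int
	for v := range ch { // The loop ends once the queue has been drained
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(drainAfterClose([]int{1, 2, 3}))
}
```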
func (*BufferPipe) CloseWithError ¶
func (b *BufferPipe) CloseWithError(err error) (errPre error)
Closes the pipe with the given error. This sets the error value for the pipe and returns the previous error value.
func (*BufferPipe) Length ¶
func (b *BufferPipe) Length() int
The number of valid bytes that can be read.
func (*BufferPipe) Pointers ¶
func (b *BufferPipe) Pointers() (rdPtr, wrPtr int64)
The internal pointer values.
func (*BufferPipe) Read ¶
func (b *BufferPipe) Read(data []byte) (cnt int, err error)
Read data from the buffer.
In all modes, the length of the data slice may exceed the capacity of the buffer. The operation will block until all data has been read or until the EOF is hit.
Under Block mode, this method may block forever if there is no producer.
func (*BufferPipe) ReadFrom ¶
func (b *BufferPipe) ReadFrom(rd io.Reader) (cnt int64, err error)
Continually read the contents of the reader and write them to the pipe.
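The ReadFrom signature shown above has the same shape as the method of the standard io.ReaderFrom interface. The sketch below shows how such a method is typically driven, using bytes.Buffer as a stand-in destination so that it runs without the bufpipe package. Whether BufferPipe behaves identically to bytes.Buffer here is an assumption; fill and fillFromString are hypothetical helpers.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

// Compile-time check that the stand-in type satisfies io.ReaderFrom.
var _ io.ReaderFrom = (*bytes.Buffer)(nil)

// fill drains src into dst through ReadFrom and returns the byte count.
func fill(dst io.ReaderFrom, src io.Reader) int64 {
	cnt, err := dst.ReadFrom(src)
	if err != nil {
		panic(err)
	}
	return cnt
}

// fillFromString runs fill with a string source and a bytes.Buffer
// destination, returning the count and the buffered contents.
func fillFromString(s string) (int64, string) {
	var buf bytes.Buffer
	cnt := fill(&buf, strings.NewReader(s))
	return cnt, buf.String()
}

func main() {
	cnt, out := fillFromString("hello")
	fmt.Println(cnt, out)
}
```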
func (*BufferPipe) ReadMark ¶
func (b *BufferPipe) ReadMark(cnt int)
Advances the read pointer.
The amount advanced must be non-negative and must not exceed the combined length of the slices returned by the previous ReadSlices. Calls to Read must not be made between these two calls. Also, another call to ReadMark is invalid until ReadSlices has been called again.
If ReadMark is being used, only one reader routine is allowed.
func (*BufferPipe) ReadSlices ¶
func (b *BufferPipe) ReadSlices() (bufLo, bufHi []byte, err error)
Slices of valid data that can be read. This does not advance the internal read pointer. All of the valid readable data is the logical concatenation of the two slices.
In linear buffers, the first slice obtained is guaranteed to be the entire valid readable buffer slice.
In ring buffers, the first slice obtained may not represent all of the valid buffer data since a slice always represents a contiguous piece of memory. However, the first slice is guaranteed to be non-empty if any data is available.
Under Block mode, this method blocks until there is at least some valid data to read. Mono mode is special in that none of the data is considered ready for reading until the writer closes the pipe.
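To make the two-slice return value concrete, here is a minimal sketch of how the readable region of a ring buffer splits when it wraps past the end of the backing array. It assumes read and write pointers that count total bytes consumed and produced; that bookkeeping, and the readSlices function itself, are assumptions of this sketch rather than bufpipe's actual implementation.

```go
package main

import "fmt"

// readSlices returns the unread region of a ring buffer as two contiguous
// slices whose logical concatenation is the readable data.
// Assumes len(buf) > 0 and 0 <= wrPtr-rdPtr <= len(buf).
func readSlices(buf []byte, rdPtr, wrPtr int64) (bufLo, bufHi []byte) {
	size := int64(len(buf))
	valid := wrPtr - rdPtr // Number of unread bytes
	start := rdPtr % size  // Physical offset of the first unread byte
	end := start + valid
	if end <= size {
		return buf[start:end], nil // No wrap-around; one slice suffices
	}
	return buf[start:], buf[:end-size] // Wraps; the tail sits at the front
}

func main() {
	// An 8-byte ring after writing "0123456789" and reading 6 bytes:
	// bytes '8' and '9' have wrapped around to offsets 0 and 1.
	buf := []byte("89234567")
	bufLo, bufHi := readSlices(buf, 6, 10)
	fmt.Printf("%s %s\n", bufLo, bufHi)
}
```

In the wrapped case the first slice covers offsets 6 and 7 and the second covers offsets 0 and 1, which is why a caller that wants all readable data has to look at both slices.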
func (*BufferPipe) Reset ¶
func (b *BufferPipe) Reset()
Makes the buffer ready for use again by opening the pipe for writing again. The read and write pointers will be reset to zero and errors will be cleared.
func (*BufferPipe) Rollback ¶
func (b *BufferPipe) Rollback() int
Roll back the write pointer and return the number of bytes rolled back. If successful, this effectively makes the valid length zero. In order to prevent race conditions with the reader, this action is only valid in Mono access mode before the pipe is closed.
func (*BufferPipe) Write ¶
func (b *BufferPipe) Write(data []byte) (cnt int, err error)
Write data to the buffer.
In linear buffers, the length of the data slice may not exceed the capacity of the buffer. Otherwise, an ErrShortWrite error will be returned.
In ring buffers, the length of the data slice may exceed the capacity.
Under Block mode, this operation will block until all data has been written. If there is no consumer of the data, then this method may block forever.
func (*BufferPipe) WriteMark ¶
func (b *BufferPipe) WriteMark(cnt int)
Advances the write pointer.
The amount advanced must be non-negative and must not exceed the combined length of the slices returned by the previous WriteSlices. Calls to Write must not be made between these two calls. Also, another call to WriteMark is invalid until WriteSlices has been called again.
If WriteMark is being used, only one writer routine is allowed.
func (*BufferPipe) WriteSlices ¶
func (b *BufferPipe) WriteSlices() (bufLo, bufHi []byte, err error)
Slices of available buffer that can be written to. This does not advance the internal write pointer. All of the available write space is the logical concatenation of the two slices.
In linear buffers, the first slice obtained is guaranteed to be the entire available writable buffer slice.
In LineMono mode only, slices obtained may still be modified even after WriteMark has been called and before Close. This is valid because this mode blocks readers until the buffer has been closed.
In ring buffers, the first slice obtained may not represent all of the available buffer space since a slice always represents a contiguous piece of memory. However, the first slice is guaranteed to be non-empty if any space is available.
In Block mode, this method blocks until there is available space in the buffer to write. In Poll mode, by contrast, it returns empty slices if the buffer is full.
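When writing directly into a pair of slices like these, the payload may have to be split across the two. The following sketch shows one way to do that with the built-in copy; copySplit is a hypothetical helper, and the slices in main are stand-ins carved out of a local array rather than values obtained from a real BufferPipe.

```go
package main

import "fmt"

// copySplit copies data into bufLo first and spills the remainder into
// bufHi. It returns the number of bytes copied, which may be less than
// len(data) if the two slices together are too small.
func copySplit(bufLo, bufHi, data []byte) int {
	cnt := copy(bufLo, data)
	cnt += copy(bufHi, data[cnt:])
	return cnt
}

func main() {
	// Stand-ins for a wrapped writable region: 2 bytes at the end of the
	// backing array followed by 2 bytes at the front.
	buf := make([]byte, 8)
	bufLo, bufHi := buf[6:], buf[:2]

	cnt := copySplit(bufLo, bufHi, []byte("abcde"))
	fmt.Println(cnt, string(buf[6:]), string(buf[:2]))
}
```

With a real pipe, the returned count is the value one would presumably pass to WriteMark to publish the bytes to readers.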