byframe

package module
v4.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 16, 2023 License: MIT Imports: 3 Imported by: 3

README

Overview

It's a low overhead length header format with dynamic header length. So we don't waste resources on the header itself when framing data. The algorithm is based on LEB128.

This lib also contains functions to encode and decode the header, so you have the full flexibility to decide how to use it, such as streaming TCP frames, indexing database, etc.

Format

Each frame has two parts: the header and body.

|     frame     |
| header | body |
Header

Each byte (8 bits) in the header has two parts, "continue" and "fraction":

bit index |    0     | 1 2 3 4 5 6 7 |
sections  | continue |   fraction    |

If the "continue" is 0, the header ends. If the "continue" is 1, then the followed byte should also be part of the header.

Sum all the fractions together, we will get the size of the message.

For example:

|                         frame                              |
|                      header                         | body |
| continue |   fraction    | continue |   fraction    |      |
|    0     | 1 0 0 0 0 0 0 |    1     | 1 1 0 1 0 0 0 | ...  |

So the size of the body is 0b1101000,1000000 bytes.

Documentation

Overview

Package byframe provides a simple framing protocol for streaming data.

Index

Examples

Constants

This section is empty.

Variables

View Source
var (
	// ErrInsufficient ...
	ErrInsufficient = errors.New("[byframe] data is not sufficient to construct the body")

	// ErrHeaderInsufficient ...
	ErrHeaderInsufficient = errors.New("[byframe] data is not sufficient to construct the header")

	// ErrHeaderTooLarge ...
	ErrHeaderTooLarge = errors.New("[byframe] header is too long")
)
View Source
var ErrLimitExceeded = fmt.Errorf("[byframe] exceeded the limit")

ErrLimitExceeded ...

Functions

func Decode

func Decode(data []byte) ([]byte, error)

Decode decode frame into data, decoded bytes and error

func DecodeHeader

func DecodeHeader(raw []byte) (header int, data int)

DecodeHeader decode bytes into header length and data length.

func Encode

func Encode(data []byte) []byte

Encode encode data into frame format

Example
data := byframe.Encode([]byte("test"))

decoded, _ := byframe.Decode(data)

fmt.Println(string(decoded))
Output:

test

func EncodeHeader

func EncodeHeader(l int) (header []byte)

EncodeHeader encode data length into header

Example
header := byframe.EncodeHeader(1000)

headerLen, dataLen := byframe.DecodeHeader(header)

fmt.Println(headerLen, dataLen)
Output:

2 1000

func HeaderLength

func HeaderLength(raw []byte) int

HeaderLength decodes the length of the header. Returns zero if the raw is not sufficient. It will return -1 if the header length is too large.

Types

type Scanner

type Scanner struct {
	// contains filtered or unexported fields
}

Scanner scan frames based on the length header

Example
package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
	"math/rand"
	"strings"

	"github.com/ysmood/byframe/v4"
)

func main() {
	var buf bytes.Buffer

	for i := 0; i < 3; i++ {
		frame := byframe.Encode([]byte(fmt.Sprintf("%d", i)))
		buf.Write(frame)
	}

	s := byframe.NewScanner(&buf)

	for s.Scan() {
		fmt.Println(string(s.Frame()))
	}

}

func (t T) Scanner() {
	frame := byframe.Encode([]byte("test data"))
	r := bytes.NewReader(frame)
	s := byframe.NewScanner(r)

	for s.Scan() {
		t.Eq([]byte("test data"), s.Frame())
	}
}

func (t T) ScannerNext() {
	frame := byframe.Encode([]byte("test data"))
	r := bytes.NewReader(frame)
	s := byframe.NewScanner(r)

	b, err := s.Next()
	t.E(err)
	t.Eq([]byte("test data"), b)

	t.Err(s.Next())
}

func (t T) ScannerMultiFrames() {
	frameA := byframe.Encode([]byte("test a"))
	frameB := byframe.Encode([]byte("test b"))
	r := bytes.NewReader(append(frameA, frameB...))
	s := byframe.NewScanner(r)

	list := [][]byte{}
	for s.Scan() {
		list = append(list, s.Frame())
	}
	t.Eq([][]byte{[]byte("test a"), []byte("test b")}, list)
}

func (t T) ScannerOptions() {
	frame := byframe.Encode([]byte("test data test data"))
	r := bytes.NewReader(frame)
	s := byframe.NewScanner(r).Limit(10)

	for s.Scan() {
		nop()
	}
	t.Eq(s.Err(), byframe.ErrLimitExceeded)
}

func (t T) ScannerLargeHeaderErr() {
	r := bytes.NewReader(bytes.Repeat([]byte{0b1000_0000}, 20))
	s := byframe.NewScanner(r)

	for s.Scan() {
		nop()
	}
	t.Eq(s.Err(), byframe.ErrHeaderTooLarge)
}

type MultiRead struct {
	i            int
	returnedZero int
	frame        []byte
}

// read only one byte each time
func (mr *MultiRead) Read(buf []byte) (int, error) {
	// simulate (0, nil) edge case
	if mr.i == mr.returnedZero {
		mr.i++
		return 0, nil
	}

	copy(buf, mr.frame[mr.i:mr.i+1])
	mr.i++
	if mr.i == len(mr.frame) {
		return 0, io.EOF
	}
	return 1, nil
}

func (t T) ScannerMultiRead() {
	data := []byte(strings.Repeat("test data", 100))
	frame := byframe.Encode(data)

	s := byframe.NewScanner(&MultiRead{frame: frame, returnedZero: 5})

	for s.Scan() {
		t.Eq(data, s.Frame())
	}
}

type ErrRead struct {
}

func (mr *ErrRead) Read([]byte) (int, error) {
	return 0, errors.New("err")
}

func (t T) ScannerReadErr() {
	s := byframe.NewScanner(&ErrRead{})

	hit := false
	for s.Scan() {
		hit = true
	}
	t.False(hit)
	t.Eq(errors.New("err"), s.Err())
}

func (t T) StreamMonkey() {
	list := [][]byte{}
	buf := bytes.NewBuffer(nil)

	for i := 0; i < 1000; i++ {
		data := bytes.Repeat([]byte{1}, rand.Intn(128*1024))
		buf.Write(byframe.Encode(data))
		list = append(list, data)
	}

	s := byframe.NewScanner(buf)

	for s.Scan() {
		item := list[0]
		list = list[1:]
		t.True(bytes.Equal(s.Frame(), item))
	}
}

func nop() {}
Output:

0
1
2

func NewScanner

func NewScanner(r io.Reader) *Scanner

NewScanner just like line scanner

func (*Scanner) Err

func (s *Scanner) Err() error

Err returns error if errors happen while scanning

func (*Scanner) Frame

func (s *Scanner) Frame() []byte

Frame returns the current frame

func (*Scanner) Limit

func (s *Scanner) Limit(size int) *Scanner

Limit of frame size, default is 10MB

func (*Scanner) Next

func (s *Scanner) Next() ([]byte, error)

Next scan next frame, returns error if errors happen while scanning

func (*Scanner) Scan

func (s *Scanner) Scan() bool

Scan scan next frame, returns true to continue the scan

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL