toywal

package module
v0.0.0-...-3218583 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 18, 2024 License: MIT Imports: 12 Imported by: 0

README

WAL

Go Reference

wal 机制是一种用于持久化操作保证数据可靠性的机制。当数据保存在内存中的时候,需要使用 wal 机制来保证数据可靠性。

由于 wal 通过 append-only 顺序写入来持久化操作,使得 wal 具有较高写入性能。

当出现故障的时候,系统可以通过加载 wal 中的操作来恢复内存中的数据。在很多场景中都会用到 wal,例如 LSM Tree 维护内存表,InnoDB 的 redo log。

golang 实现 wal

将 wal 分为 wal、segment、block 以及 chunk。一个 chunk 表示一条记录,包括 header。一个 block 大小默认是 32kb。

golang 操作底层文件:

s.fd.Write(data)

s.fd.ReadAt(data, offset)
  • 写入文件的时候,先写入 bytebufferpool,然后再写入 buffer 中的内容写入 disk。
  • 读取文件的时候,读入 []byte 中,再使用 binary 库中的函数进行解析

Documentation

Index

Constants

View Source
const (
	B  = 1
	KB = 1024 * B
	MB = 1024 * KB
	GB = 1024 * MB
)
View Source
const (
	CHUNK_HEADER_SIZE = 7 * B   // chunk header is CRC(4B), length(2B), type(1B)
	BLOCK_SIZE        = 32 * KB // block size is 32KB
	FILE_MODE_PERM    = 0644    // mode: -rx-r--r--
	MAX_LEN           = binary.MaxVarintLen32*3 + binary.MaxVarintLen64
)
View Source
const (
	INITIAL_SEGMENT_ID = 0
)

Variables

View Source
var (
	ErrClosed = errors.New("the segment file is closed")
	ErrCRC    = errors.New("invalid crc, the data may be corrupted")
)
View Source
var DefaultOptions = Options{
	DirPath:           os.TempDir(),
	SegmentSize:       GB,
	SegmentFileSuffix: ".seg",
	BlockCacheSize:    32 * KB * 10,
	Sync:              false,
	BytesPerSync:      0,
}

Functions

func NewBlockAndHeader

func NewBlockAndHeader() interface{}

func SegmentFileName

func SegmentFileName(dirPath string, fileSuffix string, id uint32) string

Types

type BlockAndHeader

type BlockAndHeader struct {
	// contains filtered or unexported fields
}

type BlockID

type BlockID = uint32

type ChunkPosition

type ChunkPosition struct {
	SegmentId   SegmentID
	BlockId     BlockID
	ChunkOffset int64
	ChunkSize   uint32
}

func DecodeChunkPosition

func DecodeChunkPosition(buf []byte) *ChunkPosition

func (*ChunkPosition) Encode

func (cp *ChunkPosition) Encode() []byte

func (*ChunkPosition) EncodeFixedSize

func (cp *ChunkPosition) EncodeFixedSize() []byte

type ChunkType

type ChunkType = byte
const (
	ChunkTypeFull ChunkType = iota
	ChunkTypeFirst
	ChunkTypeMiddle
	ChunkTypeLast
)

type Options

type Options struct {
	DirPath string

	SegmentSize       uint64
	SegmentFileSuffix string

	BlockCacheSize uint64

	Sync         bool
	BytesPerSync uint32
}

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

func (*Reader) CurrentChunkPosition

func (r *Reader) CurrentChunkPosition() *ChunkPosition

func (*Reader) CurrentSegmentId

func (r *Reader) CurrentSegmentId() SegmentID

func (*Reader) Next

func (r *Reader) Next() ([]byte, *ChunkPosition, error)

func (*Reader) SkipCurrentSegment

func (r *Reader) SkipCurrentSegment()

type Segment

type Segment struct {
	// contains filtered or unexported fields
}

func OpenSegmentFile

func OpenSegmentFile(dirPath string, fileSuffix string, id uint32, cache *lru.Cache[uint64, []byte]) (*Segment, error)

func (*Segment) Close

func (s *Segment) Close() error

func (*Segment) NewSegmentReader

func (s *Segment) NewSegmentReader() *SegmentReader

func (*Segment) Read

func (s *Segment) Read(blockID BlockID, chunkOffset int64) ([]byte, error)

func (*Segment) Remove

func (s *Segment) Remove() error

func (*Segment) Size

func (s *Segment) Size() uint64

func (*Segment) Sync

func (s *Segment) Sync() error

func (*Segment) Write

func (s *Segment) Write(data []byte) (pos *ChunkPosition, err error)

func (*Segment) WriteAll

func (s *Segment) WriteAll(data [][]byte) (positions []*ChunkPosition, err error)

type SegmentID

type SegmentID = uint32

type SegmentReader

type SegmentReader struct {
	// contains filtered or unexported fields
}

func (*SegmentReader) Next

func (r *SegmentReader) Next() ([]byte, *ChunkPosition, error)

type WAL

type WAL struct {
	// contains filtered or unexported fields
}

func New

func New(options *Options) (*WAL, error)

func (*WAL) Close

func (wal *WAL) Close() error

func (*WAL) Delete

func (wal *WAL) Delete() error

func (*WAL) GetActiveSegmentID

func (wal *WAL) GetActiveSegmentID() SegmentID

func (*WAL) IsEmpty

func (wal *WAL) IsEmpty() bool

func (*WAL) NewReader

func (wal *WAL) NewReader() *Reader

func (*WAL) NewReaderWithMax

func (wal *WAL) NewReaderWithMax(sid SegmentID) *Reader

func (*WAL) NewReaderWithStart

func (wal *WAL) NewReaderWithStart(pos *ChunkPosition) (*Reader, error)

func (*WAL) Read

func (wal *WAL) Read(pos *ChunkPosition) ([]byte, error)

func (*WAL) Sync

func (wal *WAL) Sync() error

func (*WAL) Write

func (wal *WAL) Write(data []byte) (*ChunkPosition, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL