aof

package
v0.0.0-...-052ef2a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 29, 2023 License: MIT Imports: 22 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// MagicTail Little-Endian = [170 36 117 84 99 156 155 65]
	// After each write the MagicTail is appended to the end.
	MagicTail = uint64(4727544184288126122)
	// MagicCheckpoint Little-Endian = [44 219 31 242 165 172 120 248]
	MagicCheckpoint = uint64(17904250147343162156)
)

Variables

View Source
var (
	ErrShrink         = errors.New("shrink prohibited")
	ErrIsDirectory    = errors.New("path is a directory")
	ErrCorrupted      = errors.New("corrupted")
	ErrEmptyFile      = errors.New("eof mode on empty file")
	ErrFileIsReadOnly = errors.New("file is read-only")
	ErrReadPermission = errors.New("file has no read permission")
)
View Source
var (
	SizeNowDefault    = pageSize
	SizeUpperDefault  = int64(1024 * 1024 * 16) // 16MB
	GrowthStepDefault = pageSize
)
View Source
var (
	RecoveryDefault = Recovery{
		Magic: Magic{
			Tail:       MagicTail,
			Checkpoint: MagicCheckpoint,
		},
		Func: RecoverWithMagic,
	}
	RecoveryReadOnly = Recovery{}
)
View Source
var (
	ErrAppendFuncNil = errors.New("append func nil")
)
View Source
var (
	ErrWouldBlock = errors.New("would block")
)
View Source
var (
	OpenFile = Geometry{}
)

Functions

This section is empty.

Types

type AOF

type AOF struct {
	// contains filtered or unexported fields
}

AOF is a single-producer multiple-consumer memory-mapped append-only file. The same mapping is shared among any number of consumers. Each instance has a single mmap for its lifetime. The underlying file is then extended (by truncating it to a larger size) in increments. A write blocks when it would go past the current file size while a truncation is in progress. Reads never block.

In order to minimize truncation blocking, the Manager can schedule AOF truncation to keep up with the writing pace.

func Open

func Open(name string, geometry Geometry, recovery Recovery) (aof *AOF, err error)

func (*AOF) Append

func (aof *AOF) Append(reserve int64, appendFn AppendFunc) error

func (*AOF) AppendNonBlocking

func (aof *AOF) AppendNonBlocking(reserve int64, appendFn AppendFunc) error

func (*AOF) Checkpoint

func (aof *AOF) Checkpoint() (int64, error)

func (*AOF) Close

func (aof *AOF) Close() error

func (*AOF) Finish

func (aof *AOF) Finish() (err error)

func (*AOF) Flush

func (aof *AOF) Flush() error

func (*AOF) IsAnonymous

func (aof *AOF) IsAnonymous() bool

func (*AOF) Subscribe

func (aof *AOF) Subscribe(
	c Consumer,
) (*Tailer, error)

func (*AOF) SubscribeInterval

func (aof *AOF) SubscribeInterval(
	interval time.Duration,
	c Consumer,
) (*Tailer, error)

func (*AOF) SubscribeIntervalOn

func (aof *AOF) SubscribeIntervalOn(
	r *reactor.Reactor,
	interval time.Duration,
	c Consumer,
) (*Tailer, error)

func (*AOF) SubscribeOn

func (aof *AOF) SubscribeOn(
	r *reactor.Reactor,
	c Consumer,
) (*Tailer, error)

func (*AOF) Sync

func (aof *AOF) Sync() error

func (*AOF) Wake

func (aof *AOF) Wake() error

func (*AOF) Write

func (aof *AOF) Write(b []byte) (int, error)

func (*AOF) WriteNonBlocking

func (aof *AOF) WriteNonBlocking(b []byte) (int, error)

type AppendEvent

type AppendEvent struct {
	Begin int64
	End   int64
	Tail  []byte
	// contains filtered or unexported fields
}

func (*AppendEvent) File

func (a *AppendEvent) File() []byte

type AppendFunc

type AppendFunc func(event AppendEvent) (int64, error)

type ClosedEvent

type ClosedEvent struct {
	Tailer *Tailer
	Reason any
}

type Consumer

type Consumer interface {
	PollRead(event ReadEvent) (int64, error)

	PollReadClosed(reason error)
}

type ErrorFunc

type ErrorFunc func(err error)

type FileState

type FileState int32
const (
	FileStateOpening FileState = 0
	FileStateOpened  FileState = 1
	FileStateEOF     FileState = 2
	FileStateClosing FileState = 3
	FileStateClosed  FileState = 4
)

type FileStats

type FileStats struct {
	// contains filtered or unexported fields
}

type Geometry

type Geometry struct {
	SizeNow    int64
	SizeUpper  int64
	GrowthStep int64
	PageSize   int64
	Create     bool
}

func CreateFile

func CreateFile() *Geometry

func (*Geometry) Next

func (g *Geometry) Next(size int64) int64

func (*Geometry) Validate

func (g *Geometry) Validate()

func (*Geometry) With

func (g *Geometry) With(sizeNow, sizeUpper, growthStep int64) *Geometry

func (*Geometry) WithSizeNow

func (g *Geometry) WithSizeNow(sizeNow int64) *Geometry

type Magic

type Magic struct {
	// Tail is the magic number that marks the tail of the file
	Tail uint64
	// Checkpoint is the magic number that marks the end of a chunk.
	// During recovery if the Magic Tail is not found, it will search
	// for the last Checkpoint
	Checkpoint uint64
}

Magic provides magic numbers for Tail and Checkpoint.

func (*Magic) IsDisabled

func (m *Magic) IsDisabled() bool

func (*Magic) IsEnabled

func (m *Magic) IsEnabled() bool

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

func NewManager

func NewManager(dir string, writeMode, readMode os.FileMode) (*Manager, error)

func (*Manager) Close

func (m *Manager) Close() error

func (*Manager) Open

func (m *Manager) Open(name string, geometry Geometry, recovery Recovery) (aof *AOF, err error)

func (*Manager) OpenAnonymous

func (m *Manager) OpenAnonymous(name string, size int64) (*AOF, error)

func (*Manager) Stats

func (m *Manager) Stats() Stats

type ReadEvent

type ReadEvent struct {
	Time       int64
	Tailer     *Tailer
	Begin, End int64
	Tail       []byte

	FileState FileState
	EOF       bool
	Reason    reactor.PollReason
	// contains filtered or unexported fields
}

func (*ReadEvent) CheckGID

func (re *ReadEvent) CheckGID() bool

func (*ReadEvent) Contents

func (re *ReadEvent) Contents() []byte

Contents returns the contents of the entire AOF.

type Recovery

type Recovery struct {
	Magic Magic
	Func  RecoveryFunc
	// contains filtered or unexported fields
}

Recovery recovers an existing file on open by finding the tail. Since files are mapped and truncated according to the Geometry, the region between the logical tail and the file-system's tail will be zero-filled. Furthermore, a crash may leave a partial write behind. Using Magic numbers is the first line of defense for identifying possible corruption. Since this package does not define file formats, it is encouraged that the file format be able to recover to the last known commit point when the Magic number is not found.

func (*Recovery) Clone

func (r *Recovery) Clone() Recovery

func (*Recovery) Err

func (r *Recovery) Err() error

func (*Recovery) Result

func (r *Recovery) Result() RecoveryResult

func (*Recovery) Tail

func (r *Recovery) Tail() int64

type RecoveryFunc

type RecoveryFunc func(
	fileSize int64,
	data []byte,
	magic Magic,
) (result RecoveryResult)

type RecoveryKind

type RecoveryKind int
const (
	Empty      RecoveryKind = 0 // Presumably: the file was empty, so there was nothing to recover (TODO confirm)
	Corrupted  RecoveryKind = 1 // Presumably: no valid Magic value was found and the file is considered corrupted (TODO confirm)
	Tail       RecoveryKind = 2 // The Magic value was found at the tail
	Checkpoint RecoveryKind = 3 // The Magic Checkpoint value was found at the tail
	Panic      RecoveryKind = 4 // Presumably: the recovery function panicked (TODO confirm)
)

type RecoveryResult

type RecoveryResult struct {
	Magic      Magic
	Outcome    RecoveryKind
	FileSize   int64
	Checkpoint int64
	Tail       int64
	Err        error
}

func RecoverWithMagic

func RecoverWithMagic(fileSize int64, data []byte, magic Magic) (result RecoveryResult)

RecoverWithMagic finds the last magic tail or, if the magic tail is not found, the last checkpoint.

type Stats

type Stats struct {
	Creates               Counter
	CreatesDur            TimeCounter
	Opens                 Counter
	OpensDur              TimeCounter
	OpenErrors            Counter
	OpenErrorsDur         TimeCounter
	OpenFileCount         Counter
	OpenFileDur           TimeCounter
	OpenFileErrors        Counter
	OpenFileErrorsDur     TimeCounter
	Closes                Counter
	CloseDur              TimeCounter
	ActiveMaps            Counter
	ActiveFileSize        Counter
	ActiveMappedMemory    Counter
	ActiveAnonymousMemory Counter
	LifetimeMemory        Counter
	Flushes               Counter
	FlushesDur            TimeCounter
	FlushErrors           Counter
	FlushErrorsDur        TimeCounter
	Finishes              Counter
	FinishesDur           TimeCounter
	FinishErrors          Counter
	FinishErrorsDur       TimeCounter
	Syncs                 Counter
	SyncsDur              TimeCounter
	SyncErrors            Counter
	SyncErrorsDur         TimeCounter
	Maps                  Counter
	MapsDur               TimeCounter
	MapErrors             Counter
	MapErrorsDur          TimeCounter
	Unmaps                Counter
	UnmapsDur             TimeCounter
	UnmapErrors           Counter
	UnmapErrorsDur        TimeCounter
	Finalizes             Counter
	FinalizesDur          TimeCounter
	Truncates             Counter
	TruncatesDur          TimeCounter
	TruncateErrors        Counter
	TruncateErrorsDur     TimeCounter
	Chmods                Counter
	ChmodsDur             TimeCounter
	ChmodErrors           Counter
	ChmodErrorsDur        TimeCounter
}

type Tailer

type Tailer struct {
	reactor.TaskProvider
	// contains filtered or unexported fields
}

func (*Tailer) Close

func (t *Tailer) Close() error

func (*Tailer) Poll

func (t *Tailer) Poll(ctx reactor.Context) error

func (*Tailer) State

func (t *Tailer) State() TailerState

type TailerState

type TailerState int32
const (
	TailerStart   TailerState = 1 // Tailer was recently spawned and is waiting for its first start Dequeue
	TailerReading TailerState = 2 // Tailer is currently reading and is not at the tail yet
	TailerTail    TailerState = 3 // Tailer has started and is now at the tail
	TailerEOF     TailerState = 4 // Tailer's parent AOF is finished; the tailer goes from the tail to the Checkpoint state
	TailerClosing TailerState = 5 // Tailer is waiting for next Dequeue to close
	TailerClosed  TailerState = 6 // Tailer is now safe to delete
)

func (*TailerState) Load

func (t *TailerState) Load() TailerState

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL