log

package
v0.1.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 31, 2021 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Index

Constants

View Source
const (
	MaxRecordSize  = 1<<31 - 1             // Maximum size of an encoded record
	MaxPayloadSize = MaxRecordSize - 4 - 4 // MaxRecordSize - size - CRC
)

Variables

View Source
var (
	ErrExist      = errors.New("log: already exists")
	ErrNotExist   = errors.New("log: does not exist")
	ErrBadVersion = errors.New("log: bad version")
	ErrCorrupt    = errors.New("log: corrupt")
	ErrOutOfRange = errors.New("log: out of range")
	ErrLagging    = errors.New("log: lagging")
	ErrLocked     = errors.New("log: locked")
	ErrOrphaned   = errors.New("log: orphaned")
	ErrClosed     = errors.New("log: closed")
	ErrTimeout    = errors.New("log: timeout")
)
View Source
var (
	DefaultConfig = Config{
		MaxRecordSize:   1 << 20,
		IndexAfterSize:  1 << 20,
		SegmentMaxCount: -1,
		SegmentMaxSize:  1 << 30,
		SegmentMaxAge:   -1,
		LogMaxCount:     -1,
		LogMaxSize:      -1,
		LogMaxAge:       -1,
	}
)
View Source
var (
	DefaultOptions = Options{
		SyncLock: sync.Mutex{},
	}
)
View Source
var ErrRecordTooLarge = errors.New("log: record too large")

ErrRecordTooLarge is returned when the payload size is too large for the encoded size to stay below the MaxRecordSize hard limit.

Functions

func Delete

func Delete(path string) (err error)

func Restore

func Restore(path string, r io.Reader) (err error)

func Scan

func Scan(path string) (err error)

func Truncate

func Truncate(path string) (err error)

Types

type Config

type Config struct {
	MaxRecordSize   int   // Maximum size of an encoded record.
	IndexAfterSize  int64 // Create an index entry every N bytes.
	SegmentMaxCount int64 // Maximum record count in a segment.
	SegmentMaxSize  int64 // Maximum byte size of a segment.
	SegmentMaxAge   int64 // Maximum age in seconds of a segment.
	LogMaxCount     int64 // Maximum record count in the log.
	LogMaxSize      int64 // Maximum byte size of the log.
	LogMaxAge       int64 // Maximum age in seconds of the log.
}

type Fanin

type Fanin struct {
	// contains filtered or unexported fields
}

func NewFanin

func NewFanin(lw *LogWriter) (f *Fanin)

func (*Fanin) Close

func (f *Fanin) Close() (err error)

func (*Fanin) Flush

func (f *Fanin) Flush() (err error)

func (*Fanin) Write

func (f *Fanin) Write(r *Record) (n int, err error)

type FaninWriter

type FaninWriter struct {
	// contains filtered or unexported fields
}

func NewFaninWriter

func NewFaninWriter(f *Fanin, ioMode recio.IOMode) (fw *FaninWriter)

func (*FaninWriter) Close

func (fw *FaninWriter) Close() (err error)

func (*FaninWriter) Flush

func (fw *FaninWriter) Flush() (err error)

func (*FaninWriter) HandleSync

func (fw *FaninWriter) HandleSync(h SyncHandler)

func (*FaninWriter) Write

func (fw *FaninWriter) Write(r *Record) (n int, err error)

type Log

type Log struct {
	// contains filtered or unexported fields
}

func Create

func Create(path string, config Config, options Options) (l *Log, err error)

func Open

func Open(path string, options Options) (l *Log, err error)

func (*Log) Backup

func (l *Log) Backup(w io.Writer) (err error)

func (*Log) Close

func (l *Log) Close() (err error)

func (*Log) NewReader

func (l *Log) NewReader(bufferSize int, follow bool, ioMode recio.IOMode) (lr *LogReader, err error)

func (*Log) NewWriter

func (l *Log) NewWriter(bufferSize int, ioMode recio.IOMode) (lw *LogWriter, err error)

func (*Log) Stat

func (l *Log) Stat() (stat Stat)

func (*Log) Subscribe

func (l *Log) Subscribe(subscriber chan Stat)

func (*Log) Unsubscribe

func (l *Log) Unsubscribe(subscriber chan Stat)

type LogReader

type LogReader struct {
	// contains filtered or unexported fields
}

func (*LogReader) Close

func (lr *LogReader) Close() (err error)

func (*LogReader) Fill

func (lr *LogReader) Fill() (err error)

func (*LogReader) Read

func (lr *LogReader) Read(r *Record) (n int, err error)

func (*LogReader) Seek

func (lr *LogReader) Seek(position int64, whence Whence) (err error)

func (*LogReader) SetWaitDeadline

func (lr *LogReader) SetWaitDeadline(t time.Time) (err error)

func (*LogReader) Tell

func (lr *LogReader) Tell() (position int64, offset int64)

type LogWriter

type LogWriter struct {
	// contains filtered or unexported fields
}

func (*LogWriter) Close

func (lw *LogWriter) Close() (err error)

func (*LogWriter) Flush

func (lw *LogWriter) Flush() (err error)

func (*LogWriter) HandleSync

func (lw *LogWriter) HandleSync(h SyncHandler)

func (*LogWriter) Tell

func (lw *LogWriter) Tell() (position int64, offset int64)

func (*LogWriter) Write

func (lw *LogWriter) Write(r *Record) (n int, err error)

type Options

type Options struct {
	SyncLock sync.Mutex
}

type Record

type Record []byte

Record implements the encoding and decoding of length-prefixed byte buffers.

Encoded log records are structured as follows.

+----------------+--------------------------------+- - - - - - - - +
|  size (int32)  |      payload (size bytes)      |  CRC (uint32)  |
+----------------+--------------------------------+- - - - - - - - +

Size is a big-endian int32 and encodes the payload length. Payload is a variable length byte buffer. A CRC32-C of the whole record is implicitly appended and checked when using recio atomic readers / writers.

Payload length is limited to 2,147,483,639 bytes (~2GB, max int32 - 8).

DESIGN: Limiting record size to the maximum signed 32-bit integer ensures that records will not overflow Encode and Decode return values, and that record sizes can be dealt with on any platform as a standard int, avoiding cascading typing issues.

Overall, sticking to int avoids awkward and brittle type conversions in client code at the relatively low cost of limiting payload sizes to a little bit less than 2GB.

Using int64 for sizes was rejected in favor of int32 both for performance and data integrity reasons, since no CRC for this kind of block length is as well understood and hardware accelerated as CRC32-C.

func (*Record) Decode

func (r *Record) Decode(p []byte) (n int, err error)

Decode implements the recio.Decoder interface. It decodes the record from the provided byte slice. It fails with err == ErrRecordTooLarge if the payload exceeds MaxPayloadSize. If the record is not decodable, it returns err == ErrInvalidRecord. This method is used by Read to decode records and should not be called directly.

func (*Record) Encode

func (r *Record) Encode(p []byte) (n int, err error)

Encode implements the recio.Encoder interface. It encodes the record to the provided byte slice. It fails with err == ErrRecordTooLarge if the payload exceeds MaxPayloadSize. This method is used by Write to encode records and should not be called directly.

func (*Record) Size

func (r *Record) Size() (size int)

Size returns the record's encoded byte size.

type Stat

type Stat struct {
	StartPosition  int64
	StartOffset    int64
	StartTimestamp int64
	EndPosition    int64
	EndOffset      int64
}

type SyncHandler

type SyncHandler func(syncProgress SyncProgress)

type SyncProgress

type SyncProgress struct {
	Position int64
	Count    int64
}

type Whence

type Whence string
const (
	SeekOrigin  Whence = "origin"  // Seek from the log origin (position 0).
	SeekStart   Whence = "start"   // Seek from the first available record.
	SeekCurrent Whence = "current" // Seek from the current position.
	SeekEnd     Whence = "end"     // Seek from the end of the log.
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL