biglog

package
Published: Oct 27, 2018 License: MPL-2.0 Imports: 14 Imported by: 3

README

BigLog

BigLog is a high-level abstraction on top of os.File designed to store large amounts of data. BigLog is NetLog's core component and can be embedded in any application.

In a log-based queue, logs must be append-only. But most people eventually need to delete data, so instead of a single file we have several "segments". Every segment is just a data file with a blob of bytes written to it, plus a companion index file. Indexes are preallocated at a fixed size every time a segment is created, and memory-mapped. Each entry in the index has the format:

Index entry format
+---------------------------------------------------------------+
| relative offset   |   unix timestamp   |   data file offset   |
+---------------------------------------------------------------+
| (4bytes)[uint32]  |  (4bytes)[uint32]  |   (8bytes)[int64]    |
+---------------------------------------------------------------+

Every segment has a base offset, and the index stores offsets relative to that base, with the first relative offset always being 1. The index can be sparse. The last element of the index is always the NEXT offset to be written, whose timestamp is not set.

Index example
+-----------------------------------+
|    1   |  1456514390   |      0   |  <- first RO, offset 0 in data file
+-----------------------------------+
|    2   |  1456514391   |     32   |  <- RO 2 starts 32 bytes later
+-----------------------------------+
|    4   |  1456514392   |     96   |  <- RO 4 starts 64 bytes later
+-----------------------------------+     RO 3 is embedded somewhere between position 32 and 96
|   11   |           0   |    320   |  <- Next available offset is RO11 which goes at position 320
+-----------------------------------+     (size of the data file)
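
To make the layout concrete, here is a small self-contained sketch that decodes one 16-byte index entry (4 + 4 + 8 bytes) into its three fields. The byte order is an assumption made for illustration; this page does not specify it.

package main

import (
	"encoding/binary"
	"fmt"
	"time"
)

// indexEntry mirrors the 16-byte layout documented above.
type indexEntry struct {
	RelOffset  uint32 // relative offset within the segment
	Timestamp  uint32 // unix timestamp in seconds
	DataOffset int64  // position in the companion data file
}

func decodeEntry(b [16]byte) indexEntry {
	// NOTE: little-endian is an assumption; the README does not state the byte order.
	return indexEntry{
		RelOffset:  binary.LittleEndian.Uint32(b[0:4]),
		Timestamp:  binary.LittleEndian.Uint32(b[4:8]),
		DataOffset: int64(binary.LittleEndian.Uint64(b[8:16])),
	}
}

func main() {
	var raw [16]byte
	binary.LittleEndian.PutUint32(raw[0:4], 1)          // relative offset 1
	binary.LittleEndian.PutUint32(raw[4:8], 1456514390) // timestamp from the example
	binary.LittleEndian.PutUint64(raw[8:16], 0)         // position 0 in the data file
	e := decodeEntry(raw)
	fmt.Println(e.RelOffset, time.Unix(int64(e.Timestamp), 0).UTC(), e.DataOffset)
}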

The segment with the highest base offset is the "hot" segment, the only one which gets writes under the hood, via Write() [io.Writer interface] for a single offset or WriteN() for N offsets. You can create a new hot segment by calling Split(), and discard the oldest one by calling Trim().
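
A minimal sketch of that write-side lifecycle, assuming the NetLog repository's import path (github.com/ninibe/netlog/biglog) and an arbitrary directory and index size:

package main

import (
	"log"

	"github.com/ninibe/netlog/biglog" // assumed import path
)

func main() {
	// Create a biglog whose segments each hold up to 1024 index entries.
	bl, err := biglog.Create("/tmp/mylog", 1024)
	if err != nil {
		log.Fatal(err)
	}
	defer bl.Close()

	// Write() stores one blob as a single offset.
	if _, err := bl.Write([]byte("hello")); err != nil {
		log.Fatal(err)
	}

	// Split() starts a fresh hot segment; Trim() drops the oldest one.
	if err := bl.Split(); err != nil {
		log.Fatal(err)
	}
	if err := bl.Trim(); err != nil {
		log.Fatal(err)
	}
}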

There are two reading primitives: a Reader [io.Reader], which reads over the data files returning byte blobs, and an index reader, which reads index files returning entries. Both are instantiated separately (multiple instances are allowed) and operate independently.

                     +------------------+
                  -> |   index reader   | ->
                     +------------------+
+-----------------------------------+ +-----------------------------------+
|           index file 0            | |           index file 1            |
+-----------------------------------+ +-----------------------------------+
+-----------------------------------+ +-----------------------------------+
|            data file 0            | |            data file 1            |
+-----------------------------------+ +-----------------------------------+
                             +------------------+
                          -> |    data reader   | ->
                             +------------------+

Readers will transparently jump through segments until their buffer is full or EOF is reached. They can be instantiated to start at any given offset that has a specific entry in the index; if an embedded offset is requested, the reader will start at the previous known offset position.

Based on these two readers, BigLog provides two higher-level abstractions, Scanner and Streamer. See the godocs below.
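
For the lower-level primitives, a minimal read loop over the data reader might look like this (the path and buffer size are arbitrary; the import path is assumed):

package main

import (
	"fmt"
	"io"
	"log"

	"github.com/ninibe/netlog/biglog" // assumed import path
)

func main() {
	bl, err := biglog.Open("/tmp/mylog") // path assumed
	if err != nil {
		log.Fatal(err)
	}
	defer bl.Close()

	// Start a data reader at the oldest available offset.
	r, _, err := biglog.NewReader(bl, bl.Oldest())
	if err != nil {
		log.Fatal(err)
	}
	defer r.Close()

	buf := make([]byte, 4096)
	for {
		n, err := r.Read(buf) // jumps across segments transparently
		fmt.Printf("read %d bytes\n", n)
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Fatal(err)
		}
	}
}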

Documentation

Index

Constants

This section is empty.

Variables

var (
	// ErrEmbeddedOffset is returned when the offset is embedded in a batch and cannot be retrieved individually
	ErrEmbeddedOffset = errors.New("biglog: embedded offset")

	// ErrNotFound is returned when the requested offset is not in the log
	ErrNotFound = errors.New("biglog: offset not found")

	// ErrLastSegment is returned when trying to delete the only segment in the biglog.
	// To delete all segments use BigLog.Delete()
	ErrLastSegment = errors.New("biglog: last segment can't be deleted")

	// ErrInvalid is returned when the big log format is not recognized
	ErrInvalid = errors.New("biglog: invalid biglog")

	// ErrBusy is returned when there are active readers or watchers while trying
	// to close/delete the biglog
	ErrBusy = errors.New("biglog: resource busy")
)
var (
	// ErrSegmentFull is returned when the index does not have capacity left.
	ErrSegmentFull = errors.New("biglog: segment full")

	// ErrSegmentBusy is returned when trying to delete a segment that is being read.
	ErrSegmentBusy = errors.New("biglog: segment busy")

	// ErrLoadSegment is returned when segment files could not be loaded, the reason should be logged.
	ErrLoadSegment = errors.New("biglog: failed to load segment")

	// ErrRONotFound is returned when the requested relative offset is not in the segment.
	ErrRONotFound = errors.New("biglog: relative offset not found in segment")

	// ErrROInvalid is returned when the requested offset is out of range.
	ErrROInvalid = errors.New("biglog: invalid relative offset 0 < RO < 4294967295")
)
var ErrEntryTooLong = errors.New("biglog.Scanner: entry too long")

ErrEntryTooLong is returned when the entry is too big to fit in the allowed buffer size.

var ErrInvalidIndexReader = errors.New("biglog: invalid reader - use NewIndexReader")

ErrInvalidIndexReader is returned on reads with a nil or uninitialized index reader.

var ErrInvalidReader = errors.New("biglog: invalid reader - use NewReader")

ErrInvalidReader is returned on reads with a nil or uninitialized reader.

var ErrInvalidScanner = errors.New("biglog: invalid reader - use NewScanner")

ErrInvalidScanner is returned when using an uninitialized scanner.

var ErrNeedMoreBytes = errors.New("biglog: maxBytes too low for any entry")

ErrNeedMoreBytes is returned by the index reader when a single entry does not fit within the requested byte limit. Usually the client should double the limit when possible and request again.

var ErrNeedMoreOffsets = errors.New("biglog: maxOffsets too low for any entry")

ErrNeedMoreOffsets is returned by the index reader when a single entry does not fit within the requested offset limit. Usually the client should double the limit when possible and request again.

var Logger = log.New(os.Stderr, "BIGLOG ", log.LstdFlags)

Logger is the logger instance used by BigLog in case of error.

Functions

This section is empty.

Types

type BigLog

type BigLog struct {
	// contains filtered or unexported fields
}

BigLog is the main structure TODO ...

func Create

func Create(dirPath string, maxIndexEntries int) (*BigLog, error)

Create creates a new biglog. `dirPath` does not include the name. `maxIndexEntries` represents the number of entries that the index can hold; each index entry occupies a fixed size on disk (16 bytes, per the entry format above), so every segment will have a preallocated index file of disk size = maxIndexEntries * entry size. Each write consumes one index entry, independently of how many offsets it contains.

func Open

func Open(dirPath string) (*BigLog, error)

Open loads a BigLog from disk by loading all segments from the index files inside the given directory. The last segment - ordered by the base offset encoded in the file names - is set to be the hot segment of the BigLog. The created BigLog is ready to serve any watchers or readers immediately.

ErrInvalid is returned if there are no index files within dirPath. ErrLoadSegment is returned if a segment can not be loaded.

func (*BigLog) After

func (bl *BigLog) After(t time.Time) (int64, error)

After returns the first offset after a given time.

func (*BigLog) Close

func (bl *BigLog) Close() error

Close frees all resources, rendering the BigLog unusable without touching the data persisted on disk.

func (*BigLog) Delete

func (bl *BigLog) Delete(force bool) (err error)

Delete closes bl and deletes all segments and all files stored on disk.

func (*BigLog) DirPath

func (bl *BigLog) DirPath() string

DirPath returns the absolute path to the folder containing the BigLog's files.

func (*BigLog) Info

func (bl *BigLog) Info() (*Info, error)

Info returns an Info struct with all information about the BigLog.

func (*BigLog) Latest

func (bl *BigLog) Latest() int64

Latest returns latest/highest available offset.

func (*BigLog) Name

func (bl *BigLog) Name() string

Name returns the big log's name, which maps to the directory path that contains the index and data files.

func (*BigLog) Oldest

func (bl *BigLog) Oldest() int64

Oldest returns oldest/lowest available offset.

func (*BigLog) ReadFrom

func (bl *BigLog) ReadFrom(src io.Reader) (written int64, err error)

ReadFrom reads data from src into the currently active segment until EOF or the first error. All read data is indexed as a single entry. Splitting up the BigLog into more segments is handled transparently if the currently active segment is full. It returns the number of bytes written and any error encountered.
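
A small sketch of ingesting a whole file as a single indexed entry via ReadFrom (the file name and paths are hypothetical; the import path is assumed):

package main

import (
	"log"
	"os"

	"github.com/ninibe/netlog/biglog" // assumed import path
)

func main() {
	bl, err := biglog.Create("/tmp/ingest", 1024)
	if err != nil {
		log.Fatal(err)
	}
	defer bl.Close()

	f, err := os.Open("payload.bin") // hypothetical input file
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	// The whole file is indexed as a single entry.
	n, err := bl.ReadFrom(f)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("ingested %d bytes", n)
}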

func (*BigLog) SetOpts

func (bl *BigLog) SetOpts(opts ...Option)

SetOpts sets options after the BigLog has been created.

func (*BigLog) Split

func (bl *BigLog) Split() error

Split creates a new segment in bl's dirPath starting at the highest available offset+1. The new segment has the same size as the old one and becomes the new hot (active) segment.

func (*BigLog) Sync

func (bl *BigLog) Sync() error

Sync flushes all data of the currently active segment to disk.

func (*BigLog) Trim

func (bl *BigLog) Trim() (err error)

Trim removes the oldest segment from the biglog.

func (*BigLog) Write

func (bl *BigLog) Write(b []byte) (written int, err error)

Write writes len(b) bytes from b into the currently active segment of bl as a single entry. Splitting up the BigLog into more segments is handled transparently if the currently active segment is full. It returns the number of bytes written from b (0 <= n <= len(b)) and any error encountered that caused the write to stop early.

func (*BigLog) WriteN

func (bl *BigLog) WriteN(b []byte, n int) (written int, err error)

WriteN writes a batch of n entries from b into the currently active segment. Splitting up the BigLog into more segments is handled transparently if the currently active segment is full. It returns the number of bytes written from b (0 <= n <= len(b)) and any error encountered that caused the write to stop early.
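
For illustration, a sketch that batches three messages into one WriteN call; how the blob is framed so consumers can split it again is up to the application, so the plain concatenation here is illustrative only (import path assumed):

package main

import (
	"bytes"
	"log"

	"github.com/ninibe/netlog/biglog" // assumed import path
)

func main() {
	bl, err := biglog.Create("/tmp/batchlog", 1024)
	if err != nil {
		log.Fatal(err)
	}
	defer bl.Close()

	// Concatenate three messages into one blob and index them as 3 offsets,
	// consuming a single index entry.
	var buf bytes.Buffer
	buf.WriteString("msg-1")
	buf.WriteString("msg-2")
	buf.WriteString("msg-3")
	if _, err := bl.WriteN(buf.Bytes(), 3); err != nil {
		log.Fatal(err)
	}
}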

type Entry

type Entry struct {
	// Timestamp of the entry
	Timestamp time.Time
	// Offset corresponds with the offset of the first message
	Offset int64
	// ODelta is the number of offsets held by this entry
	ODelta int
	// Size is the size of the data mapped by this entry
	Size int
}

Entry holds information about one single entry in the index.

type IndexReader

type IndexReader struct {
	// contains filtered or unexported fields
}

IndexReader keeps the state among separate concurrent reads. IndexReaders handle segment transitions transparently.

func NewIndexReader

func NewIndexReader(bl *BigLog, from int64) (r *IndexReader, ret int64, err error)

NewIndexReader returns an IndexReader that will start reading from a given offset.

func (*IndexReader) Close

func (r *IndexReader) Close() error

Close frees up the segments and renders the reader unusable. It returns a nil error to satisfy io.Closer.

func (*IndexReader) Head

func (r *IndexReader) Head() int64

Head returns the current offset position of the reader.

func (*IndexReader) ReadEntries

func (r *IndexReader) ReadEntries(n int) (entries []*Entry, err error)

ReadEntries reads n entries from the index. This method is useful when scanning single entries one by one; for streaming use, ReadSection is recommended.
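
A sketch of walking the whole index with ReadEntries, assuming io.EOF signals the end as with the data reader (paths and import path assumed):

package main

import (
	"io"
	"log"

	"github.com/ninibe/netlog/biglog" // assumed import path
)

func main() {
	bl, err := biglog.Open("/tmp/mylog") // path assumed
	if err != nil {
		log.Fatal(err)
	}
	defer bl.Close()

	ir, _, err := biglog.NewIndexReader(bl, bl.Oldest())
	if err != nil {
		log.Fatal(err)
	}
	defer ir.Close()

	for {
		entries, err := ir.ReadEntries(100)
		for _, e := range entries {
			log.Printf("offset=%d odelta=%d size=%d ts=%s",
				e.Offset, e.ODelta, e.Size, e.Timestamp)
		}
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Fatal(err)
		}
	}
}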

func (*IndexReader) ReadSection

func (r *IndexReader) ReadSection(maxOffsets, maxBytes int64) (is *IndexSection, err error)

ReadSection reads the section of the index that contains at most maxOffsets offsets or maxBytes bytes. It is less precise than ReadEntries but better suited for streaming, since it does not need to allocate an Entry struct for every entry read from the index.

func (*IndexReader) Seek

func (r *IndexReader) Seek(offset int64, whence int) (ret int64, err error)

Seek implements the io.Seeker interface for an index reader.

type IndexSection

type IndexSection struct {
	// Offset where the section begins, corresponds with the offset of the first message
	Offset int64
	// ODelta is the number of offsets held by the section
	ODelta int64
	// EDelta is the number of index entries held by the section
	EDelta int64
	// Size is the size of the data mapped by this index section
	Size int64
}

IndexSection holds information about a set of entries of an index. This information is often used to "drive" a data reader, since this is more performant than getting a set of entries.

type Info

type Info struct {
	Name         string     `json:"name"`
	Path         string     `json:"path"`
	DiskSize     int64      `json:"disk_size"`
	FirstOffset  int64      `json:"first_offset"`
	LatestOffset int64      `json:"latest_offset"`
	Segments     []*SegInfo `json:"segments"`
	ModTime      time.Time  `json:"mod_time"`
}

Info holds all BigLog metadata.

type Option

type Option func(*BigLog)

Option is the type of function used to set internal parameters.

func BufioWriter

func BufioWriter(size int) Option

The BufioWriter option defines the buffer size to use for writing segments.

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

Reader keeps the state among separate concurrent reads. Readers handle segment transitions transparently.

func NewReader

func NewReader(bl *BigLog, from int64) (r *Reader, ret int64, err error)

NewReader returns a Reader that will start reading from a given offset. The reader implements the io.ReadCloser interface.

func (*Reader) Close

func (r *Reader) Close() error

Close frees up the segments and renders the reader unusable. It returns a nil error to satisfy io.Closer.

func (*Reader) Read

func (r *Reader) Read(b []byte) (n int, err error)

Read reads bytes from the big log; the offset is auto-incremented on every read.

func (*Reader) Seek

func (r *Reader) Seek(offset int64, whence int) (ret int64, err error)

Seek implements the io.Seeker interface for a reader. Only whence=1 (relative to the current position) is not supported, since the data reader has no knowledge of where it is once it starts reading the biglog.

type Scanner

type Scanner struct {
	// contains filtered or unexported fields
}

Scanner facilitates reading a BigLog's content one index entry at a time. Always instantiate via NewScanner. A Scanner is NOT thread-safe; to use it concurrently, your application must protect it with a mutex.

func NewScanner

func NewScanner(bl *BigLog, from int64, opts ...ScannerOption) (s *Scanner, err error)

NewScanner returns a new Scanner starting at `from` offset.

func (*Scanner) Bytes

func (s *Scanner) Bytes() []byte

Bytes returns content of the scanned entry.

func (*Scanner) Close

func (s *Scanner) Close() error

Close implements io.Closer and closes the scanner rendering it unusable.

func (*Scanner) Err

func (s *Scanner) Err() error

Err returns the first non-EOF error that was encountered by the Scanner.

func (*Scanner) ODelta

func (s *Scanner) ODelta() int

ODelta returns the number of offsets included in the scanned entry.

func (*Scanner) Offset

func (s *Scanner) Offset() int64

Offset returns the initial offset of the scanned entry.

func (*Scanner) Scan

func (s *Scanner) Scan() bool

Scan advances the Scanner to the next entry, returning false if there is nothing else to read.
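
A typical loop, in the style of bufio.Scanner (path and import path assumed):

package main

import (
	"fmt"
	"log"

	"github.com/ninibe/netlog/biglog" // assumed import path
)

func main() {
	bl, err := biglog.Open("/tmp/mylog") // path assumed
	if err != nil {
		log.Fatal(err)
	}
	defer bl.Close()

	s, err := biglog.NewScanner(bl, bl.Oldest())
	if err != nil {
		log.Fatal(err)
	}
	defer s.Close()

	for s.Scan() {
		fmt.Printf("offset %d (%d offsets): %q\n", s.Offset(), s.ODelta(), s.Bytes())
	}
	if err := s.Err(); err != nil {
		log.Fatal(err)
	}
}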

type ScannerOption

type ScannerOption func(*Scanner)

ScannerOption is the type of function used to set internal scanner parameters

func MaxBufferSize

func MaxBufferSize(size int) ScannerOption

The MaxBufferSize option defines the maximum buffer size that the scanner is allowed to use. If an entry is larger than the max buffer, scanning fails with ErrEntryTooLong. The default size 0 means no limit.

func UseBuffer

func UseBuffer(buf []byte) ScannerOption

The UseBuffer option lets you bring your own buffer to the scanner. If the buffer is not big enough for an entry, it will be replaced with a bigger one; you can prevent this behaviour by setting MaxBufferSize to len(buf).
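
For example, a sketch combining UseBuffer with MaxBufferSize so the scanner always works within a fixed, caller-owned buffer (sizes, path, and import path are assumptions):

package main

import (
	"log"

	"github.com/ninibe/netlog/biglog" // assumed import path
)

func main() {
	bl, err := biglog.Open("/tmp/mylog") // path assumed
	if err != nil {
		log.Fatal(err)
	}
	defer bl.Close()

	// Fixed 64 KiB caller-owned buffer; entries that don't fit fail
	// with ErrEntryTooLong instead of triggering a reallocation.
	buf := make([]byte, 64*1024)
	s, err := biglog.NewScanner(bl, bl.Oldest(),
		biglog.UseBuffer(buf),
		biglog.MaxBufferSize(len(buf)),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer s.Close()
}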

type SegInfo

type SegInfo struct {
	FirstOffset int64     `json:"first_offset"`
	DiskSize    int64     `json:"disk_size"`
	DataSize    int64     `json:"data_size"`
	ModTime     time.Time `json:"mod_time"`
}

SegInfo contains information about a segment as returned by segment.Info().

type StreamDelta

type StreamDelta struct {
	// contains filtered or unexported fields
}

StreamDelta holds a chunk of data from the BigLog. Metadata can be inspected with the associated methods. StreamDelta implements the io.Reader interface to access the stored data.

func (*StreamDelta) EntryDelta

func (d *StreamDelta) EntryDelta() int64

EntryDelta returns the number of index entries to be streamed.

func (*StreamDelta) Offset

func (d *StreamDelta) Offset() int64

Offset returns the offset of the first message to be streamed.

func (*StreamDelta) OffsetDelta

func (d *StreamDelta) OffsetDelta() int64

OffsetDelta returns the number of offsets to be streamed.

func (*StreamDelta) Read

func (d *StreamDelta) Read(p []byte) (n int, err error)

Read implements the io.Reader interface for this delta.

func (*StreamDelta) Size

func (d *StreamDelta) Size() int64

Size returns the number of bytes to be streamed.

type Streamer

type Streamer struct {
	// contains filtered or unexported fields
}

Streamer reads the BigLog in stream deltas, which underneath are just chunks of the data file with some metadata. Always instantiate with NewStreamer. See the Get and Put methods for usage details.

func NewStreamer

func NewStreamer(bl *BigLog, from int64) (s *Streamer, err error)

NewStreamer returns a new Streamer starting at `from` offset.

func (*Streamer) Get

func (st *Streamer) Get(maxOffsets, maxBytes int64) (delta *StreamDelta, err error)

Get, given a maximum number of offsets and a maximum number of bytes, returns a StreamDelta for the biggest chunk that satisfies the limits, until EOF. If the next entry is too big for either limit, ErrNeedMoreOffsets or ErrNeedMoreBytes is returned. IMPORTANT: the StreamDelta must be "Put" back before a new one can be issued.

func (*Streamer) Put

func (st *Streamer) Put(delta *StreamDelta) (err error)

Put must be called once the StreamDelta has been successfully read, so the reader can advance and a new StreamDelta can be issued.
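
Putting Get and Put together, a sketch of a streaming loop, assuming Get reports io.EOF once the log is exhausted (limits, path, and import path are arbitrary assumptions):

package main

import (
	"io"
	"log"
	"os"

	"github.com/ninibe/netlog/biglog" // assumed import path
)

func main() {
	bl, err := biglog.Open("/tmp/mylog") // path assumed
	if err != nil {
		log.Fatal(err)
	}
	defer bl.Close()

	st, err := biglog.NewStreamer(bl, bl.Oldest())
	if err != nil {
		log.Fatal(err)
	}

	for {
		delta, err := st.Get(1000, 64*1024) // up to 1000 offsets or 64 KiB
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Fatal(err) // ErrNeedMoreOffsets/ErrNeedMoreBytes would be handled here
		}
		if _, err := io.Copy(os.Stdout, delta); err != nil {
			log.Fatal(err)
		}
		if err := st.Put(delta); err != nil { // release before the next Get
			log.Fatal(err)
		}
	}
}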

type Watcher

type Watcher struct {
	// contains filtered or unexported fields
}

Watcher provides a notification channel for changes in a given BigLog.

func NewWatcher

func NewWatcher(bl *BigLog) (wa *Watcher)

NewWatcher creates a new Watcher for the provided BigLog.

func (*Watcher) Close

func (wa *Watcher) Close() error

Close releases the Watcher.

func (*Watcher) Watch

func (wa *Watcher) Watch() <-chan struct{}

Watch returns a channel that gets sent an empty struct when there have been changes in the BigLog since the last time the channel was read.
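
A minimal sketch of reacting to changes via a Watcher (path and import path assumed):

package main

import (
	"log"

	"github.com/ninibe/netlog/biglog" // assumed import path
)

func main() {
	bl, err := biglog.Open("/tmp/mylog") // path assumed
	if err != nil {
		log.Fatal(err)
	}
	defer bl.Close()

	wa := biglog.NewWatcher(bl)
	defer wa.Close()

	// Block until the first change notification; writes from another
	// goroutine or process would trigger this.
	<-wa.Watch()
	log.Println("biglog changed; new data may be available")
}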
