tracedb

package module
v0.1.0
Published: May 2, 2020 License: AGPL-3.0 Imports: 31 Imported by: 0

README


tracedb: blazing fast time-series database for IoT and real-time messaging applications

tracedb is a blazing fast time-series database for IoT and real-time messaging applications. Access tracedb with pub/sub over TCP or WebSocket using the trace application.

Tracedb can be used for online gaming and mobile apps as it satisfies the requirements for low-latency binary messaging. Tracedb is well suited to time-series applications such as the internet of things and internet-connected devices.

About tracedb

Key characteristics

  • 100% Go
  • Optimized for fast lookups and hyper scale writes
  • Can store larger-than-memory data sets
  • Data is safely written to disk using an accurate, high-performance block sync technique
  • Supports time-to-live on message entry
  • Supports writing to wildcard topics
  • Queried data is returned complete and correct

The tracedb engine includes the following components:

  • Buffer Pool
  • Block Cache
  • Write Ahead Log (WAL)
  • Lookup Trie
  • Writing to timeWindow file
  • Writing to Block Index file
  • Writing to Data file
Writing data to disk

The tracedb engine handles data from the point a put request is received through writing the data to the physical disk. Data is written to tracedb using low-latency binary message entries. Data is compressed and encrypted (if encryption is set) and then written to a WAL for immediate durability. Entries are written to the memdb block cache and become immediately queryable. The memdb block cache is periodically written to disk in the form of blocks.

Write Ahead Log (WAL)

The Write Ahead Log (WAL) retains tracedb data when the db restarts. The WAL ensures data is durable in case of an unexpected failure.

When the tracedb engine receives a put request, the following steps occur:

  • The put request is parsed, packed, and appended to a tinyBatch buffer.
  • The topic is parsed into parts and added to the lookup Trie. The Contract is added as the first part in the lookup Trie.
  • The data is added to the memdb block cache.
  • The tinyBatch is appended to the WAL in cyclic order.
  • The last offset of the topic from the timeWindow block is added to the Trie.
  • Data is written to disk using block sync.
  • The memdb block cache is updated with the free offset. The memdb block cache shrinks if it reaches its target size.
  • When data is successfully written to the WAL, a response confirms the write request was successful.

Block sync writes the timeWindow blocks, index blocks, and data blocks to disk.

When tracedb restarts, the last offset of every topic is loaded into the Trie, the WAL file is read back, and pending writes are applied to the tracedb.

Block Cache

The memdb block cache is an in-memory copy of the entries currently stored in the WAL. The block cache:

  • Organizes entries into shards by contract.
  • Stores keys and offsets in a map.
  • Stores compressed data in data blocks.

Queries to the tracedb merge data from the block cache with data from the files. A query first looks up the topic offset in the lookup Trie. The topic offset is used to traverse the timeWindow blocks and get the entry sequences. Each entry sequence is used to calculate the index block offset; the index block is read from the index file, and the entry information from the index block is then used to read data from the data file and uncompress it. The encryption flag is stored in the first bit of the sequence, so if the data is encrypted it is decrypted as it is read.

Block Sync

To efficiently compact and store data, the tracedb engine groups entry sequences by topic key and then orders those sequences by time; each block keeps an offset to the next field of the previous block, forming a chain in reverse time order.

The index block stores the entry sequence, data block offset, message size, and expiry details. The block offset of the index block is calculated from the entry sequence. The tracedb compresses data and stores it into data blocks. If an entry expires or is deleted, its data offset and size are marked as free and added to the lease blocks so they can be allocated to new requests.

After data is stored safely in files, the WAL is truncated and the block cache free offset is updated to shrink memdb.

Quick Start

To build tracedb from source code, use the go get command.

go get -u github.com/unit-io/tracedb

Usage

tracedb supports Get, Put, and Delete operations. It also supports encryption, batch operations, batch group operations, and writing to wildcard topics. See the complete usage guide for more advanced use cases.
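
As a rough sketch only (it assumes a db opened as shown in the next section and relies on the DB.Batch, Batch.PutEntry, and Batch.Write signatures from the reference documentation below), a managed batch write might look like this:

	err := db.Batch(func(b *tracedb.Batch, completed <-chan struct{}) error {
		// Append entries to the batch; they are written to the DB when the batch commits.
		if err := b.PutEntry(tracedb.NewEntry([]byte("unit8.b.b1"), []byte("msg.b.b1.1"))); err != nil {
			return err
		}
		// Write starts writing the batched entries into the DB.
		return b.Write()
	})
	if err != nil {
		log.Fatal(err)
	}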

Opening a database

To open or create a new database, use the tracedb.Open() function:


	package main

	import (
		"log"

		"github.com/unit-io/tracedb"
	)

	func main() {
		db, err := tracedb.Open("tracedb.example", nil)
		if err != nil {
			log.Fatal(err)
			return
		}	
		defer db.Close()
	}

Writing to a database
Store a message

Use DB.Put() to store a message to a topic, or use DB.PutEntry() to store a message entry to a topic. DB.PutEntry() allows the client to specify the ID and Contract parameters. See the topic isolation section for more detail.


	topic := []byte("unit8.b.b1")
	msg := []byte("msg.b.b1.1")
	db.Put(topic, msg)

	// or

	db.PutEntry(tracedb.NewEntry(topic, msg))

Specify ttl

Specify the ttl parameter on a topic while storing messages to expire them after a specific duration. Note that the DB.Get() and DB.Items() functions do not fetch expired messages.

	topic := []byte("unit8.b.b1?ttl=1h")
	msg := []byte("msg.b.b1.1")
	db.PutEntry(tracedb.NewEntry(topic, msg))

Read messages

Use DB.Get() to read messages from a topic. Use the last parameter to specify a duration or a number of recent messages to read from a topic. For example, "last=1h" gets messages stored in tracedb in the last hour, and "last=100" gets the last 100 messages. Specify the optional Query.Limit parameter to limit the number of messages retrieved from a topic.


	var err error
	var msgs [][]byte
	msgs, err = db.Get(&tracedb.Query{Topic: []byte("unit8.b.b1?last=100")})
	....
	msgs, err = db.Get(&tracedb.Query{Topic: []byte("unit8.b.b1?last=1h"), Limit: 100})

Deleting a message

Deleting a message in tracedb is rare, and it requires additional steps to delete a message from a given topic. Generate a unique message ID using DB.NewID() and use this ID when putting the message into tracedb using DB.PutEntry(). To delete the message, provide the message ID to the DB.DeleteEntry() function.


	messageId := db.NewID()
	err := db.PutEntry(&tracedb.Entry{
		ID:       messageId,
		Topic:    []byte("unit8.b.b1"),
		Payload:  []byte("msg.b.b1.deleting"),
	})
	
	err = db.DeleteEntry(&tracedb.Entry{
		ID:       messageId,
		Topic:    []byte("unit8.b.b1"),
	})

Topic isolation

Topic isolation can be achieved using a Contract while putting messages into tracedb or querying messages from a topic. Use DB.NewContract() to generate a new Contract, and then specify the Contract while putting messages using the DB.PutEntry() method. Use the Contract in a query to get messages from a topic specific to that contract.

	contract, err := db.NewContract()

	messageId := db.NewID()
	err = db.PutEntry(&tracedb.Entry{
		ID:       messageId,
		Topic:    []byte("unit8.b.b1"),
		Payload:  []byte("msg.b.b1.1"),
		Contract: contract,
	})
	....
	msgs, err := db.Get(&tracedb.Query{Topic: []byte("unit8.b.b1?last=1h"), Contract: contract, Limit: 100})

Iterating over items

Use the DB.Items() function, which returns a new instance of ItemIterator. Specify a topic to retrieve values, and use the last parameter to specify a duration or a number of recent messages to retrieve from the topic. For example, "last=1h" retrieves messages stored in tracedb in the last hour, and "last=100" retrieves the last 100 messages:


	func print(topic []byte, db *tracedb.DB) {
		// topic -> "unit8.b.b1?last=1h"
		it, err := db.Items(&tracedb.Query{Topic: topic})
		if err != nil {
			log.Fatal(err)
			return
		}
		for it.First(); it.Valid(); it.Next() {
			err := it.Error()
			if err != nil {
				log.Fatal(err)
				return
			}
			log.Printf("%s %s", it.Item().Topic(), it.Item().Value())
		}
	}

Statistics

tracedb keeps running metrics of the internal operations it performs. To get tracedb metrics, use the DB.Varz() function.


	if varz, err := db.Varz(); err == nil {
		fmt.Printf("%+v\n", varz)
	}

Contributing

If you'd like to contribute, please fork the repository and use a feature branch. Pull requests are welcome.

Licensing

Copyright (c) 2016-2020 Saffat IT Solutions Pvt Ltd. This project is licensed under the GNU Affero General Public License v3.0.

Documentation

Index

Constants

const (

	// MaxTopicLength is the maximum size of a topic in bytes.
	MaxTopicLength = 1 << 16

	// MaxValueLength is the maximum size of a value in bytes.
	MaxValueLength = 1 << 30

	// MaxKeys is the maximum numbers of keys in the DB.
	MaxKeys = math.MaxInt64

	// MaxSeq is the maximum number of seq supported.
	MaxSeq = uint64(1<<56 - 1)
)

Variables

var DefaultBatchOptions = &BatchOptions{
	Order:           0,
	Topic:           nil,
	Contract:        message.MasterContract,
	Encryption:      false,
	AllowDuplicates: false,
}

DefaultBatchOptions contains the default options used when writing batches to the tracedb key-value store.

Functions

func Debug

func Debug(context, msg string)

Debug logs the debug message with tag if it is turned on.

func Fatal

func Fatal(context, msg string, err error)

Fatal logs the fatal error messages.

func Info

func Info(context, action string)

Info logs the action with a tag.

func ParseLevel

func ParseLevel(level string, defaultLevel zerolog.Level) zerolog.Level

ParseLevel parses a string which represents a log level and returns a zerolog.Level.

func ResponseHandler

func ResponseHandler(w http.ResponseWriter, r *http.Request, data []byte)

ResponseHandler handles responses for monitoring routes

Types

type Batch

type Batch struct {
	// contains filtered or unexported fields
}

Batch is a write batch.

func (*Batch) Abort

func (b *Batch) Abort()

Abort performs batch cleanup on batch completion.

func (*Batch) Commit

func (b *Batch) Commit() error

Commit commits changes to the DB. In a batch operation, commit is managed and the client is not allowed to call Commit. On commit completion, the batch operation signals to the client whether the batch was fully committed to the DB.

func (*Batch) Delete

func (b *Batch) Delete(id, topic []byte) error

Delete appends a delete entry to the batch for the given key. It is safe to modify the contents of the argument after Delete returns but not before.

func (*Batch) DeleteEntry

func (b *Batch) DeleteEntry(e *Entry) error

DeleteEntry appends an entry for deletion to the batch for the given key. It is safe to modify the contents of the argument after DeleteEntry returns but not before.

func (*Batch) Len

func (b *Batch) Len() int

Len returns the number of records in the batch.

func (*Batch) Put

func (b *Batch) Put(value []byte) error

Put adds an entry to the batch for the given topic->key/value. The client must provide a Topic in the BatchOptions. It is safe to modify the contents of the argument after Put returns but not before.

func (*Batch) PutEntry

func (b *Batch) PutEntry(e *Entry) error

PutEntry appends entries to the batch for the given topic->key/value pair. It is safe to modify the contents of the argument after PutEntry returns but not before.

func (*Batch) Reset

func (b *Batch) Reset()

Reset resets the batch.

func (*Batch) SetOptions

func (b *Batch) SetOptions(opts *BatchOptions)

SetOptions overrides the default batch options with the options specified by the client program.

func (*Batch) Write

func (b *Batch) Write() error

Write starts writing entries into DB. It returns an error if batch write fails.

type BatchGroup

type BatchGroup struct {
	*DB
	// contains filtered or unexported fields
}

BatchGroup runs multiple batches concurrently without causing conflicts

func (*BatchGroup) Abort

func (g *BatchGroup) Abort()

Abort performs batch cleanup on batch group completion.

func (*BatchGroup) Add

func (g *BatchGroup) Add(fn func(*Batch, <-chan struct{}) error)

Add adds a function to the Group. The function will be executed in its own goroutine when Run is called. Add must be called before Run.

func (*BatchGroup) Run

func (g *BatchGroup) Run() error

Run executes each function registered via Add in its own goroutine. Run blocks until all functions have returned. The first function to return triggers the closure of the channel passed to each function, which should, in turn, return. The return value of the first function to exit is returned to the caller of Run.
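
A minimal sketch of running a batch group, based only on the NewBatchGroup, Add, and Run signatures documented here (the topic names are illustrative):

	g := db.NewBatchGroup()

	g.Add(func(b *tracedb.Batch, stop <-chan struct{}) error {
		if err := b.PutEntry(tracedb.NewEntry([]byte("unit8.b.b1"), []byte("msg.b.b1.1"))); err != nil {
			return err
		}
		return b.Write()
	})

	g.Add(func(b *tracedb.Batch, stop <-chan struct{}) error {
		if err := b.PutEntry(tracedb.NewEntry([]byte("unit8.c.c1"), []byte("msg.c.c1.1"))); err != nil {
			return err
		}
		return b.Write()
	})

	// Run executes each registered batch in its own goroutine and blocks until all return.
	if err := g.Run(); err != nil {
		log.Fatal(err)
	}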

type BatchOptions

type BatchOptions struct {
	// In concurrent batch writes order determines how to handle conflicts
	Order           int8
	Topic           []byte
	Contract        uint32
	Encryption      bool
	AllowDuplicates bool
}

BatchOptions is used to set options for a batch operation.
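
A hedged sketch of setting batch options inside a managed batch, using the Batch.SetOptions and Batch.Put signatures documented above; with Topic set in the options, Put only needs the value:

	err := db.Batch(func(b *tracedb.Batch, completed <-chan struct{}) error {
		// Options apply to subsequent Put calls in this batch.
		b.SetOptions(&tracedb.BatchOptions{
			Topic:           []byte("unit8.b.b1"),
			AllowDuplicates: true,
		})
		if err := b.Put([]byte("msg.b.b1.1")); err != nil {
			return err
		}
		return b.Write()
	})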

type DB

type DB struct {
	// contains filtered or unexported fields
}

DB represents the message storage for topic->keys-values. All DB methods are safe for concurrent use by multiple goroutines.

func Open

func Open(path string, opts *Options) (*DB, error)

Open opens or creates a new DB.

func (*DB) Batch

func (db *DB) Batch(fn func(*Batch, <-chan struct{}) error) error

Batch executes a function within the context of a read-write managed transaction. If no error is returned from the function then the transaction is written. If an error is returned then the entire transaction is rolled back. Any error that is returned from the function or returned from the write is returned from the Batch() method.

Attempting to manually commit or rollback within the function will cause a panic.

func (*DB) Close

func (db *DB) Close() error

Close closes the DB.

func (*DB) Count

func (db *DB) Count() int64

Count returns the number of items in the DB.

func (*DB) Delete

func (db *DB) Delete(id, topic []byte) error

Delete sets entry for deletion. It is safe to modify the contents of the argument after Delete returns but not before.

func (*DB) DeleteEntry

func (db *DB) DeleteEntry(e *Entry) error

DeleteEntry deletes an entry from the DB. You must provide an ID to delete an entry. It is safe to modify the contents of the argument after DeleteEntry returns but not before.

func (*DB) ExpireOldEntries

func (db *DB) ExpireOldEntries()

ExpireOldEntries runs the expirer to delete entries from the DB whose TTL was set and has expired.

func (*DB) FileSize

func (db *DB) FileSize() (int64, error)

FileSize returns the total size of the disk storage used by the DB.

func (*DB) Get

func (db *DB) Get(q *Query) (items [][]byte, err error)

Get returns items matching the query parameter.

func (*DB) HandleVarz

func (db *DB) HandleVarz(w http.ResponseWriter, r *http.Request)

HandleVarz will process HTTP requests for tracedb stats information.
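
Since HandleVarz matches the http.HandlerFunc signature, it can be mounted on a monitoring mux; the listen address below is an assumption, and the "/varz" path follows the Varz documentation:

	// Expose tracedb stats at /varz on the monitoring port.
	http.HandleFunc("/varz", db.HandleVarz)
	log.Fatal(http.ListenAndServe(":8080", nil))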

func (*DB) Items

func (db *DB) Items(q *Query) (*ItemIterator, error)

Items returns a new ItemIterator.

func (*DB) LogSeq

func (db *DB) LogSeq() uint64

func (*DB) NewBatchGroup

func (db *DB) NewBatchGroup() *BatchGroup

NewBatchGroup creates a new group to run multiple batches concurrently without causing conflicts.

func (*DB) NewContract

func (db *DB) NewContract() (uint32, error)

NewContract generates a new Contract.

func (*DB) NewID

func (db *DB) NewID() []byte

NewID generates a new ID that is later used to put or delete an entry.

func (*DB) Put

func (db *DB) Put(topic, value []byte) error

Put puts an entry into the DB using the default Contract. It is safe to modify the contents of the argument after Put returns but not before.

func (*DB) PutEntry

func (db *DB) PutEntry(e *Entry) error

PutEntry puts an entry into the DB. If Contract is not specified, the master Contract is used. It is safe to modify the contents of the argument after PutEntry returns but not before.

func (*DB) Seq

func (db *DB) Seq() uint64

func (*DB) SetEntry

func (db *DB) SetEntry(e *Entry, payload []byte) error

SetEntry sets the payload on the provided entry and puts the entry into the DB. If Contract is not specified, the master Contract is used. It is safe to modify the contents of the argument after SetEntry returns but not before.

func (*DB) Varz

func (db *DB) Varz() (*Varz, error)

Varz returns a Varz struct containing the tracedb information.

type Entry

type Entry struct {
	ID        []byte // The ID of the message
	Topic     []byte // The topic of the message
	Payload   []byte // The payload of the message
	ExpiresAt uint32 // The time expiry of the message
	Contract  uint32 // The contract is used as a salt to hash topic parts and also as a prefix in the message ID
	// contains filtered or unexported fields
}

Entry represents an entry which is stored into DB.

func NewEntry

func NewEntry(topic, payload []byte) *Entry

NewEntry creates a new entry structure from the topic and payload.

func (*Entry) SetContract

func (e *Entry) SetContract(contract uint32) *Entry

func (*Entry) SetID

func (e *Entry) SetID(id []byte) *Entry

func (*Entry) SetPayload

func (e *Entry) SetPayload(payload []byte) *Entry

func (*Entry) SetTTL

func (e *Entry) SetTTL(ttl []byte) *Entry
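
These setters return *Entry, so they can be chained. A minimal sketch; the TTL byte format is an assumption, mirroring the "1h"-style duration used in the "?ttl=1h" topic option:

	contract, err := db.NewContract()
	if err != nil {
		log.Fatal(err)
	}
	entry := tracedb.NewEntry([]byte("unit8.b.b1"), []byte("msg.b.b1.1")).
		SetID(db.NewID()).
		SetContract(contract).
		SetTTL([]byte("1h")) // assumed format, as in the "?ttl=1h" topic option
	if err = db.PutEntry(entry); err != nil {
		log.Fatal(err)
	}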

type Filter

type Filter struct {
	// contains filtered or unexported fields
}

Filter is a bloom filter generator.

func (*Filter) Append

func (f *Filter) Append(h uint64)

Append appends an entry to the bloom filter.

func (*Filter) Size

func (f *Filter) Size() int64

func (*Filter) Test

func (f *Filter) Test(h uint64) bool

Test tests whether an entry is in the bloom filter. It returns false if the entry definitely does not exist; otherwise the entry may exist in the DB.

type Item

type Item struct {
	// contains filtered or unexported fields
}

Item is an item returned by the iterator.

func (*Item) Topic

func (item *Item) Topic() []byte

Topic returns the topic of the current item, or nil if done. The caller should not modify the contents of the returned slice, and its contents may change on the next call to Next.

func (*Item) Value

func (item *Item) Value() []byte

Value returns the value of the current item, or nil if done. The caller should not modify the contents of the returned slice, and its contents may change on the next call to Next.

type ItemIterator

type ItemIterator struct {
	// contains filtered or unexported fields
}

ItemIterator is an iterator over DB topic->key/value pairs. It iterates the items in an unspecified order.

func (*ItemIterator) Error

func (it *ItemIterator) Error() error

Error returns any accumulated error. Exhausting all the key/value pairs is not considered to be an error. A memory iterator cannot encounter errors.

func (*ItemIterator) First

func (it *ItemIterator) First()

First is similar to init. It queries and loads window entries from the trie/timeWindowBucket, or from the summary file if available.

func (*ItemIterator) Item

func (it *ItemIterator) Item() *Item

Item returns pointer to the current item. This item is only valid until it.Next() gets called.

func (*ItemIterator) Next

func (it *ItemIterator) Next()

Next returns the next topic->key/value pair if available, otherwise it returns ErrIterationDone error.

func (*ItemIterator) Release

func (it *ItemIterator) Release()

Release releases associated resources. Release should always succeed and can be called multiple times without causing error.

func (*ItemIterator) Valid

func (it *ItemIterator) Valid() bool

Valid returns false when iteration is done.

type Meter

type Meter struct {
	Metrics    metrics.Metrics
	TimeSeries metrics.TimeSeries
	Gets       metrics.Counter
	Puts       metrics.Counter
	Leased     metrics.Counter
	Syncs      metrics.Counter
	Recovers   metrics.Counter
	Dels       metrics.Counter
	InMsgs     metrics.Counter
	OutMsgs    metrics.Counter
	InBytes    metrics.Counter
	OutBytes   metrics.Counter
}

Meter provides various DB statistics.

func NewMeter

func NewMeter() *Meter

NewMeter provides a meter to capture statistics.

func (*Meter) UnregisterAll

func (m *Meter) UnregisterAll()

UnregisterAll unregisters all metrics from the meter.

type Once

type Once struct {
	// contains filtered or unexported fields
}

Once is an object that will perform exactly one action until Reset is called. See http://golang.org/pkg/sync/#Once

func (*Once) Do

func (o *Once) Do(f func())

Do simulates sync.Once.Do by executing the specified function only once, until Reset is called. See http://golang.org/pkg/sync/#Once

func (*Once) Reset

func (o *Once) Reset()

Reset indicates that the next call to Do should actually be called once again.
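
A small sketch, assuming the zero value of Once is ready to use (as with sync.Once):

	var once tracedb.Once
	once.Do(func() { log.Println("runs the first time") })
	once.Do(func() { /* skipped: Do already ran */ })
	once.Reset()
	once.Do(func() { log.Println("runs again after Reset") })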

type Options

type Options struct {
	// BackgroundSyncInterval sets the amount of time between background fsync() calls.
	//
	// Setting the value to 0 disables the automatic background synchronization.
	// Setting the value to -1 makes the DB call fsync() after every write operation.
	BackgroundSyncInterval time.Duration

	// BackgroundKeyExpiry sets flag to run key expirer
	BackgroundKeyExpiry bool

	// Encryption flag to encrypt keys
	Encryption bool

	// Encryption Key
	EncryptionKey []byte

	// TinyBatchWriteInterval sets the interval at which tiny batches are grouped and written into the db.
	// Setting the value to 0 writes entries into the db immediately.
	TinyBatchWriteInterval time.Duration

	// DefaultQueryLimit limits maximum number of records to fetch if the DB Get or DB Iterator method does not specify a limit.
	DefaultQueryLimit int

	// MaxQueryLimit limits the maximum number of records to fetch if the DB Get or DB Iterator method does not specify a limit or specifies a limit larger than MaxQueryLimit.
	MaxQueryLimit int

	// Size of buffer to use for pooling
	BufferSize int64

	// Size of memory db
	MemdbSize int64

	// Size of write ahead log
	LogSize int64

	// Minimum freeblocks size before free blocks are allocated and reused.
	MinimumFreeBlocksSize int64

	FileSystem fs.FileSystem
}

Options holds the optional DB parameters.
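
A minimal sketch of opening a DB with a few of these options set; the values are illustrative only and assume the time and log packages are imported:

	opts := &tracedb.Options{
		// 0 disables background sync; -1 calls fsync() after every write.
		BackgroundSyncInterval: 15 * time.Second,
		TinyBatchWriteInterval: 100 * time.Millisecond,
		DefaultQueryLimit:      100,
		MaxQueryLimit:          1000,
	}
	db, err := tracedb.Open("tracedb.example", opts)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()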

type Query

type Query struct {
	Topic    []byte // The topic of the message
	Contract uint32 // The contract is used as prefix in the message Id

	Limit int // The maximum number of elements to return.
	// contains filtered or unexported fields
}

Query represents a topic to query and optional contract information.

type Varz

type Varz struct {
	Start    time.Time `json:"start"`
	Now      time.Time `json:"now"`
	Uptime   string    `json:"uptime"`
	Seq      int64     `json:"seq"`
	Count    int64     `json:"count"`
	Blocks   int64     `json:"blocks"`
	Gets     int64     `json:"gets"`
	Puts     int64     `json:"puts"`
	Leased   int64     `json:"leased"`
	Syncs    int64     `json:"syncs"`
	Recovers int64     `json:"recovers"`
	Dels     int64     `json:"Dels"`
	InMsgs   int64     `json:"in_msgs"`
	OutMsgs  int64     `json:"out_msgs"`
	InBytes  int64     `json:"in_bytes"`
	OutBytes int64     `json:"out_bytes"`
	HMean    float64   `json:"hmean"` // Event duration harmonic mean.
	P50      float64   `json:"p50"`   // Event duration nth percentiles ..
	P75      float64   `json:"p75"`
	P95      float64   `json:"p95"`
	P99      float64   `json:"p99"`
	P999     float64   `json:"p999"`
	Long5p   float64   `json:"long_5p"`  // Average of the longest 5% event durations.
	Short5p  float64   `json:"short_5p"` // Average of the shortest 5% event durations.
	Max      float64   `json:"max"`      // Highest event duration.
	Min      float64   `json:"min"`      // Lowest event duration.
	StdDev   float64   `json:"stddev"`   // Standard deviation.

}

Varz outputs tracedb stats on the monitoring port at /varz.

Directories

Path Synopsis
Package hash provides a minimal-memory AnchorHash consistent-hash implementation for Go.
