stitchdb

package module
v0.0.0-...-98ba78d Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 20, 2017 License: LGPL-3.0 Imports: 14 Imported by: 0

README

StitchDB
Build Report Coverage Docs Version

Yet another key value store - StitchDB is an in memory key-value store persisted with an append only log with support for geo-location and time series data. StitchDB is accompanied by StitchQL and StitchServer. Together these projects result in an entire database ecosystem complete with query language and standalone server daemon.

StitchDB's API is inspired by boltdb/bolt and tidwall/buntdband making use of their elegant API design. StitchDB strives to add a feature set that is tailored to a high throughput and less rigidly persistent use case with builtin multidimensional geo-location and time-series data support.

All contributions, ideas, and criticisms are welcome.

Coming Soon

  • StitchQL: A Query Language for StitchDB
    • Interpreted Language of Some Sort
    • Verb-like Syntax Tailored to Geo/Time Use Case
  • StitchServer: An HTTP API Around Stitch DB With Management System
    • DB Viewer/Editor
    • Users/Authentication
    • Performance Monitor/System Info
    • R-Tree Viewer
  • Built in Time Series Support

Goals

StitchDB was born out of a need to replace a legacy timeseries/geolocation package with a more robust real-time solution that could stand alone as a separate service with little work. It needed to have separation of data or buckets, searchable indexes, invalidation, expiration, and custom event callbacks. Additionally, we wanted the operation and code to remain as light weight and manipulable as possible.

Tradeoffs and Consideraitons

  • Fast operation and real-time snapshot of data over hardened and optimized data persistence.
  • Geographical functionality built in.
  • Easily extensible API and feature set.
  • Definable triggers for items, and buckets.
  • Ability for items to be expired and invalidated.
  • Minimal "stop-the-world" db/bucket manager executions.

Documentation

API documentation is available at stitchdb Godoc

The Wiki is full of explanations and examples:

https://github.com/cbergoon/stitchdb/wiki

Usage

There are more complete examples and how-to's in the resources above but to get started all you need to do is install StitchDB.

The dependencies for StitchDB are available by running the following:

go get github.com/cbergoon/btree
go get github.com/tidwall/gjson
go get github.com/dhconnelly/rtreego
go get github.com/juju/errors
go get github.com/cbergoon/StitchDB

Here is some boiler plate code to get started with:

package main

import (
	"fmt"
	"strconv"
	"time"

	"github.com/cbergoon/stitchdb"
)

func main() {

	c, _ := stitchdb.NewConfig(stitchdb.Persist, stitchdb.DirPath("path/to/loc/"), stitchdb.Sync(stitchdb.MNGFREQ), stitchdb.ManageFrequency(1*time.Second), stitchdb.Developer, stitchdb.PerformanceMonitor, stitchdb.BucketFileMultLimit(10))
	s, _ := stitchdb.NewStitchDB(c)

	s.Open()

	opts, _ := stitchdb.NewBucketOptions(stitchdb.BTreeDegree(32), stitchdb.Geo)
	s.CreateBucket("test", opts)

	s.Update("test", func(t *stitchdb.Tx) error {
		t.CreateIndex("user", stitchdb.INT_INDEX)
		for i := 0; i < 10; i++ {
			eopt, _ := stitchdb.NewEntryOptions()
			e, _ := stitchdb.NewEntry("k"+strconv.Itoa(i), "{ \"user\":\""+strconv.Itoa(10-i)+"\", \"coords\": ["+strconv.Itoa(i)+", 3.0]}", true, eopt)
			t.Set(e)
		}
		return nil
	})

	s.View("test", func(t *stitchdb.Tx) error {
		sz, _ := t.Size("")
		fmt.Println("Size: ", sz)

		t.Ascend("", func(e *stitchdb.Entry) bool {
			fmt.Println("Ascend Entries: ", e)
			return true
		})
		rect, _ := stitchdb.NewRect(stitchdb.Point{0.0, 0.0}, []float64{10, 10})
		fmt.Print("Nearest Neighbor: ")
		fmt.Print(t.NearestNeighbor(stitchdb.Point{5.2, 3.0}))
		fmt.Print("\n")
		fmt.Print("Search Within Radius: ")
		fmt.Print(t.SearchWithinRadius(stitchdb.Point{0.0, 0.0}, 5))
		fmt.Print("\n")
		fmt.Print("Search Intersect: ")
		fmt.Print(t.SearchIntersect(rect))
		fmt.Print("\n")
		return nil
	})

	time.Sleep(time.Second * 4)
	s.Close()
}

Then run it with:

go run <filename>.go

Performance

Benchmarks are ran on a MacBook Pro 3.1Ghz, 16GB RAM, SSD

Insertion

Retrieval

StitchDB Ecosystem (Future Work)

  • stitchserver - Builds a HTTP and RPC API layer over StitchDB allowing it to operate as a standalone service.
  • stitchraft - An distributed and consistent service that adds RAFT to stitchserver (work in progress name).
  • stitchql - A query language that interpreter that provides implements a simple language to access/manipulate StitchDB.

License

This project is licensed un the GNU Lesser General Public License. See the LICENSE file.

For license information on included libraries see LICENSE-3RD-PARTY file.

Documentation

Index

Constants

View Source
const (
	//BUCKET_CONFIG_FILE is the main DB AOF
	BUCKET_CONFIG_FILE string = "sbkt.conf"
	//BUCKET_FILE_EXTENSION is the bucket AOF file extension
	BUCKET_FILE_EXTENSION string = ".stitch"
	//BUCKET_TMP_FILE_EXTENSION is the bucket AOF file extension used when replacing file
	BUCKET_TMP_FILE_EXTENSION string = ".stitch.tmp"
)
View Source
const COMPACT_FACTOR int = 10

COMPACT_FACTOR is the Multiplier factor for to determine when to compact log.

View Source
const STITCH_VERSION string = "0.1.0"

Variables

This section is empty.

Functions

func BTreeDegree

func BTreeDegree(degree int) func(*BucketOptions) error

BTreeDegree sets the degree of the trees ued for the bucket.

func BucketFileMultLimit

func BucketFileMultLimit(limit int) func(*Config) error

BucketFileMultLimit sets the file compaction factor.

func Developer

func Developer(c *Config) error

Developer enables developer mode.

func Dims

func Dims(dims int) func(*BucketOptions) error

Dims sets the number of dimensions that will be utilized.

func DirPath

func DirPath(path string) func(*Config) error

DirPath sets the path where the db should be stored.

func ExpireTime

func ExpireTime(time time.Time) func(*EntryOptions) error

ExpireTime sets the time the entry will expire and enables expiration for the entry.

func Geo

func Geo(b *BucketOptions) error

Geo enables geo-location functionality.

func GeoRangeIsInclusive

func GeoRangeIsInclusive(b *BucketOptions) error

GeoRangeIsInclusive enables inclusive range checks.

func GetEntryComparator

func GetEntryComparator() func(obj1, obj2 rtreego.Spatial) bool

GetEntryComparator returns a function that is used by the rtree to compare entries. This function will compare on the key value (k) of the entry as a string. Todo: Maybe make the returned function an option that can be set

func InvalidTime

func InvalidTime(time time.Time) func(*EntryOptions) error

InvalidTime sets the time the entry will invalidate and enables invalidation for the entry.

func ManageFrequency

func ManageFrequency(frequency time.Duration) func(*Config) error

ManageFrequency sets the frequency at which the the db manager should run.

func PerformanceMonitor

func PerformanceMonitor(c *Config) error

PerformanceMonitor enables the performance monitor.

func Persist

func Persist(c *Config) error

Persist enables the db to persist to disk.

func Sync

func Sync(frequency IOFrequency) func(*Config) error

Sync sets the frequency at which the db file should be sync'd.

func System

func System(b *BucketOptions) error

System sets the system option.

func Time

func Time(b *BucketOptions) error

Time enable the time series functionality.

func Tol

func Tol(t float64) func(*EntryOptions) error

Tol sets the tolerance (accuracy) of the geo-location for the entry primarily used to build the rtree.

Types

type Bucket

type Bucket struct {
	// contains filtered or unexported fields
}

Bucket represents a bucket in the database. Think 'table' but for key-value store.

func NewBucketFromStmt

func NewBucketFromStmt(db *StitchDB, stmtParts []string) (*Bucket, error)

NewBucketFromStmt creates a bucket for a given bucket statement.

type BucketOptions

type BucketOptions struct {
	// contains filtered or unexported fields
}

BucketOptions holds bucket metadata.

func NewBucketOptions

func NewBucketOptions(options ...func(*BucketOptions) error) (*BucketOptions, error)

NewBucketOptions creates a new bucket options using the provided option modifiers.

func NewBucketOptionsFromStmt

func NewBucketOptionsFromStmt(stmt []string) (*BucketOptions, error)

NewBucketOptionsFromStmt returns bucket options representing the options portion of the statement. Returns an error if the bucket statement could not be parsed.

type Config

type Config struct {
	// contains filtered or unexported fields
}

Config holds StitchDB metadata.

func NewConfig

func NewConfig(options ...func(*Config) error) (*Config, error)

NewConfig creates a new config using the provided option modifiers.

type Entry

type Entry struct {
	// contains filtered or unexported fields
}

Entry represents an item to be stored in the database.

func NewEntry

func NewEntry(k string, v string, geo bool, options *EntryOptions) (*Entry, error)

NewEntry creates a new entry object with the provided values. Returns an error if the default options failed to create.

func NewEntryFromStmt

func NewEntryFromStmt(stmtParts []string) (*Entry, error)

NewEntryFromStmt parses the statement provided and returns an entry representation. Returns an error if the statement could not be parsed or if the resulting entry could not be created.

func NewEntryWithGeo

func NewEntryWithGeo(k string, v string, options *EntryOptions) (*Entry, error)

NewEntryWithGeo creates a new entry; the entry value is expected to provide a "coords" field in the json provided. Returns an error if the default options failed to create.

func (*Entry) Bounds

func (e *Entry) Bounds() *rtreego.Rect

Bounds is used by rtree. Returns a Rect representation of the specified point using the entry options tolerance.

func (*Entry) EntryDeleteStmt

func (e *Entry) EntryDeleteStmt() []byte

EntryDeleteStmt builds and returns the delete statement for a given entity.

func (*Entry) EntryInsertStmt

func (e *Entry) EntryInsertStmt() []byte

EntryInsertStmt builds and returns the insert statement for a given entity.

func (*Entry) ExpiresAt

func (e *Entry) ExpiresAt() time.Time

ExpiresAt returns the time that an entry will expire.

func (*Entry) GetKey

func (e *Entry) GetKey() string

GetKey returns the key value of the entry.

func (*Entry) GetValue

func (e *Entry) GetValue() string

GetValue returns the value of the entry.

func (*Entry) InvalidatesAt

func (e *Entry) InvalidatesAt() time.Time

InvalidatesAt returns the time that an entry will invalidate.

func (*Entry) IsExpired

func (e *Entry) IsExpired() bool

IsExpired checks if the expire time for an entry has passed.

func (*Entry) IsInvalid

func (e *Entry) IsInvalid() bool

IsInvalid checks if the invalid time for an entry has passed.

func (*Entry) Less

func (e *Entry) Less(than btree.Item, itype interface{}) bool

Less is the comparator provided used to build the indexes over a bucket.

type EntryOptions

type EntryOptions struct {
	// contains filtered or unexported fields
}

EntryOptions represents the configuration for an entry determining how an entry will function within a bucket.

func NewEntryOptions

func NewEntryOptions(options ...func(*EntryOptions) error) (*EntryOptions, error)

NewEntryOptions creates a new entry using the provided option modifiers.

func NewEntryOptionsFromStmt

func NewEntryOptionsFromStmt(stmt []string) (*EntryOptions, error)

NewEntryOptionsFromStmt returns entry options representing the options portion of the statement. Returns an error if the entry statement could not be parsed.

type IOFrequency

type IOFrequency int

IOFrequency represents the frequency in which management operations will be executed.

const (
	//EACH action will take place at each commit/manage cycle
	EACH IOFrequency = iota
	//MNGFREQ action will take place at each manage cycle
	MNGFREQ
	//NONE action will never take place
	NONE
)

type Index

type Index struct {
	// contains filtered or unexported fields
}

Index represents an index for a bucket. Buckets can have multiple indexes but indexes cannot have entries from multiple buckets.

func NewIndex

func NewIndex(ppath string, vtype IndexValueType, bkt *Bucket) (*Index, error)

NewIndex returns an index for the values provided. The index will be initialized but NOT built.

type IndexValueType

type IndexValueType int

IndexValueType represents the type of the field that the index will be built over.

const (
	//STRING_INDEX indicates data sorting should assume string.
	STRING_INDEX IndexValueType = iota
	//UINT_INDEX indicates data sorting should assume uint.
	UINT_INDEX
	//INT_INDEX indicates data sorting should assume int.
	INT_INDEX
	//FLOAT_INDEX indicates data sorting should assume float.
	FLOAT_INDEX
)

type Point

type Point []float64

Point provides an abstraction over the rtreego Point type so that dhconnelly/rtreego does not need to be included from the application using this library.

type RWMode

type RWMode int

RWMode represents the R/W access modifier.

const (
	//MODE_READ Read Only
	MODE_READ RWMode = iota
	//MODE_READ_WRITE Read and Write
	MODE_READ_WRITE
)

type RbCtx

type RbCtx struct {
	// contains filtered or unexported fields
}

RbCtx preserves the state of the tree during a transaction representing the changes made to allow for commits/rollbacks.

type Rect

type Rect struct {
	// contains filtered or unexported fields
}

Rect provides an abstraction over the rtreego Rect type so that dhconnelly/rtreego does not need to be included from the application using this library.

func NewRect

func NewRect(p Point, lengths []float64) (r *Rect, err error)

NewRect returns a new Rect.

type StitchDB

type StitchDB struct {
	// contains filtered or unexported fields
}

StitchDB represents the database object. All operations on the database originate from this object.

func NewStitchDB

func NewStitchDB(config *Config) (*StitchDB, error)

NewStitchDB returns a new StitchDB with the specified configuration. Note: this function only creates the representation of the DB and does not open or start the db.

func (*StitchDB) Close

func (db *StitchDB) Close() error

Close closes each bucket including system, flushes bucket config file, and closes the file. Waits until all bucket managers have exited.

func (*StitchDB) CreateBucket

func (db *StitchDB) CreateBucket(name string, options *BucketOptions) error

CreateBucket creates and opens a new bucket.

func (*StitchDB) DropBucket

func (db *StitchDB) DropBucket(name string) error

DropBucket closes bucket and removes the bucket from the db.

func (*StitchDB) GetConfig

func (db *StitchDB) GetConfig() *Config

GetConfig returns a the configuration for the db.

func (*StitchDB) Open

func (db *StitchDB) Open() error

Open initializes the db for use and starts the manager routine. Open opens/creates the main db append only file, parses the statements within, creates the buckets stored in the file, and opens each bucket. Returns an error if the process was not able to create the directory, failed to read the stitch db

func (*StitchDB) SetConfig

func (db *StitchDB) SetConfig(config *Config)

SetConfig sets the configuration for the db.

func (*StitchDB) Update

func (db *StitchDB) Update(bucket string, f func(t *Tx) error) error

Update creates a read only transaction and passes the open transaction to the provided function. The created transaction will provide read/write access to the bucket specified by the bucket name provided. Returns an error if the db is closed or the bucket is invalid.

func (*StitchDB) View

func (db *StitchDB) View(bucket string, f func(t *Tx) error) error

View creates a read only transaction and passes the open transaction to the provided function. The created transaction will provide read only access to the bucket specified by the bucket name provided. Returns an error if the db is closed or the bucket is invalid.

type SystemEntry

type SystemEntry struct {
	InitialLoadTime time.Time     `json:"InitialLoadTime"`
	StartUpTime     time.Duration `json:"startUpTime"`
	LoadTime        time.Duration `json:"loadTime"`
	BucketCount     int           `json:"bucketCount"`
	BucketList      []string      `json:"bucketList"`
	DbManagerTime   time.Duration `json:"dbManagerTime"`
	Version         string        `json:"version"`
}

type SystemPerformanceEntry

type SystemPerformanceEntry struct {
	Transaction    bool          `json:"transaction"`
	Mode           RWMode        `json:"mode"`
	Bucket         string        `json:"bucket"`
	Commit         bool          `json:"commit"`
	Rollback       bool          `json:"rollback"`
	SyncTime       time.Duration `json:"SyncTime"`
	TxTime         time.Duration `json:"TxTime"`
	ManageTime     time.Duration `json:"manageTime"`
	ManageSynced   bool          `json:"manageSynced"`
	ManageSyncTime time.Duration `json:"manageSyncTime"`
}

type Tx

type Tx struct {
	// contains filtered or unexported fields
}

Tx represents the a transaction including rollback information.

func (*Tx) Ascend

func (t *Tx) Ascend(index string, f func(e *Entry) bool) error

Ascend iterates over the items in the bucket using the specified index for each item calling the provided function f terminating only when there are no more entries in the bucket or the provided function returns false. An empty string represents no index in which case entries will use the default key ordering. Note: only the portion of the entry that the index is built with needs to be populated.

func (*Tx) AscendGreaterOrEqual

func (t *Tx) AscendGreaterOrEqual(index string, pivot *Entry, f func(e *Entry) bool) error

AscendGreaterOrEqual iterates over the items in the bucket using the specified index for each item greater than or equal to the pivot entry calling the provided function f terminating only when there are no more entries in the bucket or the provided function returns false. An empty string represents no index in which case entries will use the default key ordering. Note: only the portion of the entry that the index is built with needs to be populated.

func (*Tx) AscendLessThan

func (t *Tx) AscendLessThan(index string, pivot *Entry, f func(e *Entry) bool) error

AscendLessThan iterates over the items in the bucket using the specified index for each item less than the pivot entry calling the provided function f. Iteration terminates only when there are no more entries less than pivot in the bucket or the provided function returns false. An empty string represents no index in which case entries will use the default key ordering. Note: only the portion of the entry that the index is built with needs to be populated.

func (*Tx) AscendRange

func (t *Tx) AscendRange(index string, greaterOrEqual *Entry, lessThan *Entry, f func(e *Entry) bool) error

AscendRange iterates over the items in the bucket that are greater than or equal to greaterOrEqual and less than lessThan calling the provided function f. Iteration terminates only when there are no more entries in the range or the provided function returns false. An empty string represents no index in which case entries will use the default key ordering. Note: only the portion of the entry that the index is built with needs to be populated.

func (*Tx) CreateIndex

func (t *Tx) CreateIndex(pattern string, vtype IndexValueType) error

CreateIndex builds an index over a field of the value of the entry. The field is identified by pattern and its type is described by vtype. Returns an error if the db or bucket is closed, the index already exists, or if an error occurred while populating the index.

func (*Tx) Delete

func (t *Tx) Delete(e *Entry) (*Entry, error)

Delete removes an entry from the bucket. If an entry is removed returns the removed entry otherwise returns nil. Returns an error if the db or bucket is closed.

func (*Tx) Descend

func (t *Tx) Descend(index string, f func(e *Entry) bool) error

Descend iterates over the items in the bucket using the specified index for each item calling the provided function f terminating only when there are no more entries in the bucket or the provided function returns false. An empty string represents no index in which case entries will use the default key ordering. Note: only the portion of the entry that the index is built with needs to be populated.

func (*Tx) DescendGreaterThan

func (t *Tx) DescendGreaterThan(index string, pivot *Entry, f func(e *Entry) bool) error

DescendGreaterThan iterates over the items in the bucket using the specified index for each item greater than to the pivot entry calling the provided function f terminating only when there are no more entries greater than pivot in the bucket or the provided function returns false. An empty string represents no index in which case entries will use the default key ordering. Note: only the portion of the entry that the index is built with needs to be populated.

func (*Tx) DescendLessOrEqual

func (t *Tx) DescendLessOrEqual(index string, pivot *Entry, f func(e *Entry) bool) error

DescendLessOrEqual iterates over the items in the bucket using the specified index for each item less than the pivot entry calling the provided function f. Iteration terminates only when there are no more entries less than or equal to pivot in the bucket or the provided function returns false. An empty string represents no index in which case entries will use the default key ordering. Note: only the portion of the entry that the index is built with needs to be populated.

func (*Tx) DescendRange

func (t *Tx) DescendRange(index string, lessOrEqual *Entry, greaterThan *Entry, f func(e *Entry) bool) error

DescendRange iterates over the items in the bucket that are less than or equal to lessOrEqual and greater than greaterThan calling the provided function f. Iteration terminates only when there are no more entries in the range or the provided function returns false. An empty string represents no index in which case entries will use the default key ordering. Note: only the portion of the entry that the index is built with needs to be populated.

func (*Tx) DropIndex

func (t *Tx) DropIndex(pattern string) error

DropIndex removes an index specified by pattern. Returns an error if the db or bucket is closed or if the index does not exist.

func (*Tx) Get

func (t *Tx) Get(e *Entry) (*Entry, error)

Get returns an entry from the bucket using the default tree to search (i.e. searches on entry key). Returns nil if the the entry is invalid, expired, or not found in the bucket. Returns an error if the db or bucket is closed.

func (*Tx) Has

func (t *Tx) Has(index string, e *Entry) (bool, error)

Has chacks if an entry exists in the bucket for a given index. An empty string represents no index in which case entries will use the default key ordering.

func (*Tx) Indexes

func (t *Tx) Indexes() ([]string, error)

Indexes returns a slice of strings containing the names (patterns) of all indexes in the bucket.

func (*Tx) Max

func (t *Tx) Max(index string) (*Entry, error)

Max returns the maximum value entry inthe bucket for a given index. An empty string represents no index in which case the entry with the maximum key will be found.

func (*Tx) Min

func (t *Tx) Min(index string) (*Entry, error)

Min returns the minimum value entry inthe bucket for a given index. An empty string represents no index in which case the entry with the minimum key will be found.

func (*Tx) NearestNeighbor

func (t *Tx) NearestNeighbor(pt Point) (*Entry, error)

NearestNeighbor returns the closest neighbor to a given point pt. Returns an error if the bucket is not geo enabled.

func (*Tx) NearestNeighbors

func (t *Tx) NearestNeighbors(k int, pt Point) ([]*Entry, error)

NearestNeighbors returns a slice of the k closest entries to a given point pt. Returns an error if the bucket is not geo enabled.

func (*Tx) SearchIntersect

func (t *Tx) SearchIntersect(rbb *Rect) ([]*Entry, error)

SearchIntersect finds entries of the bucket that fall within the bounds of the provided rectangle. Bucket must be configured for geolocation. Returns a slice containing pointers to the entries that are within the bounds of the rectangle. Returns an error if the bucket is not geo enabled.

func (*Tx) SearchWithinRadius

func (t *Tx) SearchWithinRadius(pt Point, radius float64) ([]*Entry, error)

SearchWithinRadius finds entries that are within an n-dimensional sphere centered at point pt with a radius of radius. This condition is determined by finding the Euclidean distance between the center point of the n-sphere and the point in question. The result is then determined by comparing the radius of the n-sphere and the distance between the two points. If the GeoRangeIsInclusive option is set for the bucket then the point is found the be within the n-sphere if the distance between the two points is less than the specified radius. If the GeoRangeIsInclusive option is not set then the point is found to be within the n-sphere if the distance between the two points is less than or equal to the specified radius. Returns an error if the bucket is not geo enabled.

func (*Tx) Set

func (t *Tx) Set(e *Entry) (*Entry, error)

Set inserts an entry into the bucket. If the key of the entry to insert already exists in the tree the old entry is replaced and returned otherwise returns nil. Returns an error if the transaction is iterating and if the the db or bucket is closed.

func (*Tx) Size

func (t *Tx) Size(index string) (int, error)

Size returns the number of entries in the bucket.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL