esdb

Version: v1.0.3
Published: Apr 20, 2023 License: MIT Imports: 11 Imported by: 2

README

Event Stream Database

Immutable storage for timestamped event streams. Inspired by CDB and LevelDB's SSTable file format.

At Customer.io, we process billions of events every month and need to keep all historical events at full resolution in a simple way (distilling them into time-series data isn't an option).

After investigating strategies for maintaining these events, we liked the simple approach of archiving old events into structured flat files: backups and restores are dead simple, and no running process is needed. After getting down and dirty with CDB and LevelDB, we chose to create ESDB as a storage format dedicated to querying event stream data.

WARNING: version 0.0.00000001. I wrote this in one day as a proof of concept. We're planning to use it in production, but there's a lot of testing, fine-tuning, and potentially large changes to do before then.

Example

Let's assume you're tracking website activity.

You would like to store all pageviews, clicks, and purchases for every user of your site. You'd like to quickly scan through the entire history for processing. However, you'd also like to occasionally just retrieve purchase events without the overhead of scanning through all the pageview and click data.

package main

import (
	"encoding/json"
	"fmt"
	"os"

	"github.com/customerio/esdb"
)

type event struct {
	customerId string
	timestamp  int
	eventType  string
	data       map[string]string
}

func main() {
	events := []event{
		event{"1", 1403534919, "page", map[string]string{"url": "http://mysite.com/"}},
		event{"1", 1403534920, "click", map[string]string{"button_text": "Checkout"}},
		event{"1", 1403534921, "page", map[string]string{"url": "http://mysite.com/checkout"}},
		event{"1", 1403534923, "purchase", map[string]string{"total": "42.99"}},
		event{"1", 1403534923, "page", map[string]string{"url": "http://mysite.com/thankyou"}},
		event{"2", 1403534919, "page", map[string]string{"url": "http://mysite.com/"}},
		event{"2", 1403534920, "click", map[string]string{"button_text": "About"}},
		event{"2", 1403534921, "page", map[string]string{"url": "http://mysite.com/about"}},
		event{"3", 1403534919, "page", map[string]string{"url": "http://mysite.com/"}},
		event{"3", 1403534920, "click", map[string]string{"button_text": "About"}},
		event{"3", 1403534921, "page", map[string]string{"url": "http://mysite.com/about"}},
		event{"3", 1403534922, "click", map[string]string{"button_text": "Checkout"}},
		event{"3", 1403534923, "purchase", map[string]string{"total": "126.99"}},
		event{"3", 1403534923, "page", map[string]string{"url": "http://mysite.com/thankyou"}},
	}

	os.MkdirAll("tmp", 0755)

	// In case we've already created the file.
	os.Remove("tmp/activity.esdb")

	writer, err := esdb.New("tmp/activity.esdb")
	if err != nil {
		panic(err)
	}

	for _, e := range events {
		value, err := json.Marshal(e.data)
		if err != nil {
			panic(err)
		}

		err = writer.Add(
			[]byte(e.customerId), // space the event will be stored under.
			value,                // value can be any binary data.
			e.timestamp,          // all events will be stored sorted by this value.
			"",                   // grouping. "" here means no grouping, store sequentially by timestamp.
			map[string]string{
				"type": e.eventType, // We'll define one secondary index on event type.
			},
		)
		if err != nil {
			panic(err)
		}
	}

	err = writer.Write()
	if err != nil {
		panic(err)
	}

	db, err := esdb.Open("tmp/activity.esdb")
	if err != nil {
		panic(err)
	}
	defer db.Close()

	// Stream through all customer 1's activity
	fmt.Println("activity for 1:")
	db.Find([]byte("1")).Scan("", func(event *esdb.Event) bool {
		fmt.Println(string(event.Data))
		return true // continue
	})

	// Stream through all customer 2's activity
	fmt.Println("\nactivity for 2:")
	db.Find([]byte("2")).Scan("", func(event *esdb.Event) bool {
		fmt.Println(string(event.Data))
		return true // continue
	})

	// Just retrieve customer 1's purchases
	fmt.Println("\npurchases for 1:")
	db.Find([]byte("1")).ScanIndex("type", "purchase", func(event *esdb.Event) bool {
		fmt.Println(string(event.Data))
		return true // continue
	})

	// Just retrieve customer 3's clicks
	fmt.Println("\nclicks for 3:")
	db.Find([]byte("3")).ScanIndex("type", "click", func(event *esdb.Event) bool {
		fmt.Println(string(event.Data))
		return true // continue
	})

	// Output:
	// activity for 1:
	// {"total":"42.99"}
	// {"url":"http://mysite.com/thankyou"}
	// {"url":"http://mysite.com/checkout"}
	// {"button_text":"Checkout"}
	// {"url":"http://mysite.com/"}
	//
	// activity for 2:
	// {"url":"http://mysite.com/about"}
	// {"button_text":"About"}
	// {"url":"http://mysite.com/"}
	//
	// purchases for 1:
	// {"total":"42.99"}
	//
	// clicks for 3:
	// {"button_text":"Checkout"}
	// {"button_text":"About"}
}

Goals/Benefits
  1. Fast streaming of events ordered by timestamp.

    Events are stored on disk sequentially, ordered by timestamp (or by an optional grouping, then timestamp). Scanning through a series of ordered events is therefore a sequential read, which is extremely fast!

  2. Secondary indexes for quickly scanning through just a subset of events.

    Scanning through a subset of events is preferable to scanning through the entire event stream, especially when the stream contains many events you aren't currently interested in. ESDB lets you define secondary indexes, so you can quickly retrieve only the events in a given index.

    Secondary indexes are retrieved in timestamp order and can be scanned forward and backward.

  3. Low overhead.

    Event overhead: as little as 3 bytes per event stored, plus 17 bytes for each secondary index the event belongs to. The 17-byte index overhead applies only to the indexes that a particular event is part of.

    File overhead: offset information for the spaces and groupings you create is kept at the end of the file, so it grows with their number. In a reasonably sized file this should be negligible, since event data and per-event overhead are the main drivers of file size.
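The per-event figures in item 3 can be turned into a quick size estimate. The sketch below is plain arithmetic on those quoted numbers: 3 bytes per event, plus 17 bytes for each secondary index the event belongs to. It applies them to the 14 events in the example above, each of which is in the single "type" index. The README gives 3 bytes as a minimum, so treat the result as a lower bound. The file-level offset table is not counted. minEventOverhead and minTotalOverhead are hypothetical helpers, not part of ESDB.

```go
package main

import "fmt"

// Figures quoted in the README: at least 3 bytes per event, plus 17 bytes
// for every secondary index the event is part of.
const (
	baseOverhead  = 3
	indexOverhead = 17
)

// minEventOverhead is a hypothetical helper returning the minimum overhead,
// in bytes, for one event that belongs to numIndexes secondary indexes.
// It excludes the event data itself and the file-level offset table.
func minEventOverhead(numIndexes int) int {
	return baseOverhead + indexOverhead*numIndexes
}

// minTotalOverhead sums the minimum overhead for a batch of events, where
// indexesPerEvent[i] is the number of indexes event i belongs to.
func minTotalOverhead(indexesPerEvent []int) int {
	total := 0
	for _, n := range indexesPerEvent {
		total += minEventOverhead(n)
	}
	return total
}

func main() {
	// The README example stores 14 events, each in the single "type" index.
	perEvent := make([]int, 14)
	for i := range perEvent {
		perEvent[i] = 1
	}

	// No secondary indexes: prints 3.
	fmt.Println(minEventOverhead(0))
	// One secondary index: prints 20.
	fmt.Println(minEventOverhead(1))
	// All 14 example events: prints 280.
	fmt.Println(minTotalOverhead(perEvent))
}
```

An event with no secondary indexes costs only the base 3 bytes. Each additional index the event is placed in adds 17 bytes.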

Format

TODO :(

Benchmarks

TODO :(

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Db

type Db struct {
	// contains filtered or unexported fields
}
Example
package main

import (
	"encoding/json"
	"fmt"
	"os"

	"github.com/customerio/esdb"
)

type event struct {
	customerId string
	timestamp  int
	eventType  string
	data       map[string]string
}

func main() {
	events := []event{
		event{"1", 1403534919, "page", map[string]string{"url": "http://mysite.com/"}},
		event{"1", 1403534920, "click", map[string]string{"button_text": "Checkout"}},
		event{"1", 1403534921, "page", map[string]string{"url": "http://mysite.com/checkout"}},
		event{"1", 1403534923, "purchase", map[string]string{"total": "42.99"}},
		event{"1", 1403534923, "page", map[string]string{"url": "http://mysite.com/thankyou"}},
		event{"2", 1403534919, "page", map[string]string{"url": "http://mysite.com/"}},
		event{"2", 1403534920, "click", map[string]string{"button_text": "About"}},
		event{"2", 1403534921, "page", map[string]string{"url": "http://mysite.com/about"}},
		event{"3", 1403534919, "page", map[string]string{"url": "http://mysite.com/"}},
		event{"3", 1403534920, "click", map[string]string{"button_text": "About"}},
		event{"3", 1403534921, "page", map[string]string{"url": "http://mysite.com/about"}},
		event{"3", 1403534922, "click", map[string]string{"button_text": "Checkout"}},
		event{"3", 1403534923, "purchase", map[string]string{"total": "126.99"}},
		event{"3", 1403534923, "page", map[string]string{"url": "http://mysite.com/thankyou"}},
	}

	os.MkdirAll("tmp", 0755)

	// In case we've already created the file.
	os.Remove("tmp/activity.esdb")

	writer, err := esdb.New("tmp/activity.esdb")
	if err != nil {
		panic(err)
	}

	for _, e := range events {
		value, err := json.Marshal(e.data)
		if err != nil {
			panic(err)
		}

		err = writer.Add(
			[]byte(e.customerId), // space the event will be stored under.
			value,                // value can be any binary data.
			e.timestamp,          // all events will be stored sorted by this value.
			"",                   // grouping. "" here means no grouping, store sequentially by timestamp.
			map[string]string{
				"type": e.eventType, // We'll define one secondary index on event type.
			},
		)
		if err != nil {
			panic(err)
		}
	}

	err = writer.Write()
	if err != nil {
		panic(err)
	}

	db, err := esdb.Open("tmp/activity.esdb")
	if err != nil {
		panic(err)
	}
	defer db.Close()

	// Stream through all customer 1's activity
	fmt.Println("activity for 1:")
	db.Find([]byte("1")).Scan("", func(event *esdb.Event) bool {
		fmt.Println(string(event.Data))
		return true // continue
	})

	// Stream through all customer 2's activity
	fmt.Println("\nactivity for 2:")
	db.Find([]byte("2")).Scan("", func(event *esdb.Event) bool {
		fmt.Println(string(event.Data))
		return true // continue
	})

	// Just retrieve customer 1's purchases
	fmt.Println("\npurchases for 1:")
	db.Find([]byte("1")).ScanIndex("type", "purchase", func(event *esdb.Event) bool {
		fmt.Println(string(event.Data))
		return true // continue
	})

	// Just retrieve customer 3's clicks
	fmt.Println("\nclicks for 3:")
	db.Find([]byte("3")).ScanIndex("type", "click", func(event *esdb.Event) bool {
		fmt.Println(string(event.Data))
		return true // continue
	})
}
Output:

activity for 1:
{"total":"42.99"}
{"url":"http://mysite.com/thankyou"}
{"url":"http://mysite.com/checkout"}
{"button_text":"Checkout"}
{"url":"http://mysite.com/"}

activity for 2:
{"url":"http://mysite.com/about"}
{"button_text":"About"}
{"url":"http://mysite.com/"}

purchases for 1:
{"total":"42.99"}

clicks for 3:
{"button_text":"Checkout"}
{"button_text":"About"}

func Open

func Open(path string) (*Db, error)

Opens a .esdb file for reading.

func (*Db) Close

func (db *Db) Close()

func (*Db) Find

func (db *Db) Find(id []byte) *Space

Finds and returns a space by its id.

func (*Db) Iterate

func (db *Db) Iterate(process func(s *Space) bool) error

Iterates over each space defined in the file, passing it to process.

type Event

type Event struct {
	Data      []byte
	Timestamp int
	// contains filtered or unexported fields
}

type Scanner

type Scanner func(*Event) bool
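The example passes closures that end with "return true // continue". This suggests that a Scanner returning false stops the scan early, although these docs do not spell that out. Assuming that reading, a scanner that keeps only the first n events it is handed might look like the sketch below.

To stay self-contained, the sketch declares local stand-ins for Event and Scanner with the documented exported shape. It also defines a hypothetical scanSlice that imitates a scan over an in-memory slice. With the real package you would write the closure against *esdb.Event and pass it to Space.Scan or Space.ScanIndex instead.

```go
package main

import "fmt"

// Local stand-ins mirroring the documented exported shape of esdb.Event and
// esdb.Scanner, so this sketch compiles without the esdb module.
type Event struct {
	Data      []byte
	Timestamp int
}

type Scanner func(*Event) bool

// firstN returns a Scanner that appends each event's data to *out and returns
// false (assumed to mean "stop") once n events have been collected.
// Assumes n > 0 and that *out starts empty.
func firstN(n int, out *[][]byte) Scanner {
	return func(e *Event) bool {
		*out = append(*out, e.Data)
		return len(*out) < n
	}
}

// scanSlice is a hypothetical stand-in for Space.Scan: it feeds events to the
// scanner in order and stops as soon as the scanner returns false.
func scanSlice(events []Event, scanner Scanner) {
	for i := range events {
		if !scanner(&events[i]) {
			return
		}
	}
}

func main() {
	events := []Event{
		{Data: []byte(`{"total":"42.99"}`), Timestamp: 1403534923},
		{Data: []byte(`{"url":"http://mysite.com/thankyou"}`), Timestamp: 1403534923},
		{Data: []byte(`{"url":"http://mysite.com/checkout"}`), Timestamp: 1403534921},
	}

	var collected [][]byte
	scanSlice(events, firstN(2, &collected))
	for _, d := range collected {
		fmt.Println(string(d))
	}
}
```

Returning a closure keeps the stopping condition and the output buffer together. The same pattern works for any limit or filter you want to apply during a scan.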

type Space

type Space struct {
	Id []byte
	// contains filtered or unexported fields
}

func (*Space) Iterate

func (s *Space) Iterate(process func(g string) bool) error

Iterates over the space's grouping index, passing each grouping to process.

func (*Space) Scan

func (s *Space) Scan(grouping string, scanner Scanner)

func (*Space) ScanIndex

func (s *Space) ScanIndex(name, value string, scanner Scanner)

type Writer

type Writer struct {
	// contains filtered or unexported fields
}

Writer provides an interface for creating a new ESDB file.

func New

func New(path string) (*Writer, error)

Creates a new ESDB database at the given path. If the file already exists, an error will be returned.

func (*Writer) Add

func (w *Writer) Add(spaceId []byte, data []byte, timestamp int, grouping string, indexes map[string]string) error

Adds a new event to the specified space, with grouping and indexes. Events aren't written to the file until writer.Flush(spaceId) or writer.Write() is called.
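The split between Add and Flush/Write implies that the writer buffers a space's events in memory and only lays them out when the space is written. The toy model below is not ESDB's implementation, and the file format is undocumented ("TODO" above).

It only illustrates the layout described under Goals/Benefits. Events are kept in grouping-then-timestamp order, assumed ascending here. A secondary index records, for each index name/value pair, the positions of the matching events, so a scan can skip the rest. spaceBuffer, bufferedEvent, indexKey, add and flush are all hypothetical names.

```go
package main

import (
	"fmt"
	"sort"
)

// bufferedEvent is a hypothetical in-memory record for one Add call.
type bufferedEvent struct {
	data      []byte
	timestamp int
	grouping  string
	indexes   map[string]string
}

// indexKey identifies one secondary index entry, e.g. {"type", "purchase"}.
type indexKey struct {
	name, value string
}

// spaceBuffer is a toy stand-in for the per-space state a writer might keep
// before the space is flushed. It is not ESDB's actual data structure.
type spaceBuffer struct {
	events []bufferedEvent
}

func (b *spaceBuffer) add(data []byte, timestamp int, grouping string, indexes map[string]string) {
	b.events = append(b.events, bufferedEvent{data, timestamp, grouping, indexes})
}

// flush sorts a copy of the buffered events by grouping, then timestamp, and
// builds a secondary index mapping each name/value pair to the positions of
// matching events in that sorted order.
func (b *spaceBuffer) flush() ([]bufferedEvent, map[indexKey][]int) {
	sorted := append([]bufferedEvent(nil), b.events...)
	sort.SliceStable(sorted, func(i, j int) bool {
		if sorted[i].grouping != sorted[j].grouping {
			return sorted[i].grouping < sorted[j].grouping
		}
		return sorted[i].timestamp < sorted[j].timestamp
	})

	index := make(map[indexKey][]int)
	for pos, e := range sorted {
		for name, value := range e.indexes {
			k := indexKey{name, value}
			index[k] = append(index[k], pos)
		}
	}
	return sorted, index
}

func main() {
	var b spaceBuffer
	b.add([]byte(`{"total":"42.99"}`), 1403534923, "", map[string]string{"type": "purchase"})
	b.add([]byte(`{"url":"http://mysite.com/"}`), 1403534919, "", map[string]string{"type": "page"})
	b.add([]byte(`{"button_text":"Checkout"}`), 1403534920, "", map[string]string{"type": "click"})

	sorted, index := b.flush()

	// An index scan visits only the recorded positions, skipping other events.
	for _, pos := range index[indexKey{"type", "purchase"}] {
		fmt.Println(string(sorted[pos].data))
	}
}
```

Because the positions are collected while walking the already-sorted events, each index's entries are themselves in storage order. That is consistent with the README's statement that secondary indexes are retrieved in timestamp order.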

func (*Writer) Flush

func (w *Writer) Flush(spaceId []byte) (err error)

Flush writes an individual space to the file. This prevents any additional events from being added to the space. It may be advantageous to flush spaces individually once you've added all events for that space, as flushing reduces the memory needed to create a new ESDB file.
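Flushing only saves memory if each space's events can be added together. Once a space is flushed, no more events can be added to it. A common way to get this property is to read input that already arrives grouped by space ID, for example a query sorted by customer, and flush whenever the space ID changes.

The stdlib-only sketch below shows that boundary-detection loop. record and addThenFlush are hypothetical names. The add and flush callbacks are where you might wrap writer.Add and writer.Flush(spaceId). If a space ID reappears in a later, separate run, the real writer would presumably reject the extra events, so the input must be grouped.

```go
package main

import (
	"bytes"
	"fmt"
)

// record is a hypothetical input row. Rows are assumed to arrive grouped by
// space ID (all rows for one space are adjacent).
type record struct {
	spaceID   []byte
	timestamp int
	data      []byte
}

// addThenFlush calls add for every row and calls flush once each space's run
// of adjacent rows ends. It stops at the first error from either callback.
func addThenFlush(rows []record, add func(record) error, flush func(spaceID []byte) error) error {
	for i, r := range rows {
		if err := add(r); err != nil {
			return err
		}
		lastRow := i == len(rows)-1
		if lastRow || !bytes.Equal(rows[i+1].spaceID, r.spaceID) {
			if err := flush(r.spaceID); err != nil {
				return err
			}
		}
	}
	return nil
}

func main() {
	rows := []record{
		{[]byte("1"), 1403534919, []byte(`{"url":"http://mysite.com/"}`)},
		{[]byte("1"), 1403534923, []byte(`{"total":"42.99"}`)},
		{[]byte("2"), 1403534919, []byte(`{"url":"http://mysite.com/"}`)},
	}

	err := addThenFlush(rows,
		func(r record) error {
			fmt.Printf("add   space=%s ts=%d\n", r.spaceID, r.timestamp)
			return nil
		},
		func(id []byte) error {
			fmt.Printf("flush space=%s\n", id)
			return nil
		},
	)
	if err != nil {
		panic(err)
	}
}
```

With this loop, only one space's events need to be buffered by the writer at a time. writer.Write() still has to be called at the end to write the space index and close the file.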

func (*Writer) Write

func (w *Writer) Write() (err error)

Write flushes any remaining spaces to the file, writes the index for locating spaces, and closes the file.

Directories

Path Synopsis
The blocks package is responsible for reading and writing potentially compressed binary data in blocks or chunks.