katamari

package module
v0.0.0-...-d7c4d32 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 18, 2023 License: MIT Imports: 34 Imported by: 7

README

katamari

Test

katamari

Zero configuration data persistence and communication layer.

Web service that behaves like a distributed filesystem in the sense that all routes are open by default, oposite to rails like frameworks where the user must define the routes before being able to interact with them.

Provides a dynamic websocket and restful http service to quickly prototype realtime applications, the interface has no fixed data structure or access regulations by default, to restrict access see: define limitations.

features

  • dynamic routing
  • glob pattern routes
  • patch updates on subscriptions
  • version check on subscriptions (no message on version match)
  • restful CRUD service that reflects interactions to real-time subscriptions
  • storage interfaces for memory only or leveldb and memory
  • filtering and audit middleware
  • auto managed timestamps (created, updated)

quickstart

client

There's a js client.

server

with go installed get the library

go get github.com/benitogf/katamari

create a file main.go

package main

import "github.com/benitogf/katamari"

func main() {
  app := katamari.Server{}
  app.Start("localhost:8800")
  app.WaitClose()
}

run the service:

go run main.go

routes

method description url
GET key list http://{host}:{port}
websocket clock ws://{host}:{port}
POST create/update http://{host}:{port}/{key}
GET read http://{host}:{port}/{key}
DELETE delete http://{host}:{port}/{key}
websocket subscribe ws://{host}:{port}/{key}

control

static routes

Activating this flag will limit the server to process requests defined in read and write filters

app := katamari.Server{}
app.Static = true
filters
  • Write filters will be called before processing a write operation
  • Read filters will be called before sending the results of a read operation
  • if the static flag is enabled only filtered routes will be available
app.WriteFilter("books/*", func(index string, data []byte) ([]byte, error) {
  // returning an error will deny the write
  return data, nil
})
app.ReadFilter("books/taup", func(index string, data []byte) ([]byte, error) {
  // returning an error will deny the read
  return []byte("intercepted"), nil
})
app.DeleteFilter("books/taup", func(key string) (error) {
  // returning an error will deny the read
  return errors.New("can't delete")
})
audit
app.Audit = func(r *http.Request) bool {
  return false // condition to allow access to the resource
}
subscribe events capture
// new subscription event
server.OnSubscribe = func(key string) error {
  log.Println(key)
  // returning an error will deny the subscription
  return nil
}
// closing subscription event
server.OnUnsubscribe = func(key string) {
  log.Println(key)
}
extra routes
// Predefine the router
app.Router = mux.NewRouter()
app.Router.HandleFunc("/test", func(w http.ResponseWriter, r *http.Request) {
  w.Header().Set("Content-Type", "application/json")
  fmt.Fprintf(w, "123")
})
app.Start("localhost:8800")

libraries

Documentation

Index

Examples

Constants

This section is empty.

Variables

View Source
var TEST_DATA = `` /* 11110-byte string literal not displayed */

https://gist.github.com/slaise/9b9d63e0d59e8c8923bbd9d53f5beb61 https://medium.com/geekculture/my-golang-json-evaluation-20a9ca6ef79c

Functions

func NoopFilter

func NoopFilter(index string, data []byte) ([]byte, error)

NoopFilter open noop filter

func NoopHook

func NoopHook(index string) error

NoopHook open noop hook

func StorageGetNRangeTest

func StorageGetNRangeTest(app *Server, t *testing.T, n int)

StorageGetNRangeTest testing storage GetN function

func StorageGetNTest

func StorageGetNTest(app *Server, t *testing.T, n int)

StorageGetNTest testing storage GetN function

func StorageKeysRangeTest

func StorageKeysRangeTest(app *Server, t *testing.T, n int)

StorageKeysRangeTest testing storage GetN function

func StorageListTest

func StorageListTest(app *Server, t *testing.T, testData string)

StorageListTest testing storage function

func StorageObjectTest

func StorageObjectTest(app *Server, t *testing.T)

StorageObjectTest testing storage function

func StorageSetGetDelTest

func StorageSetGetDelTest(db Database, b *testing.B)

StorageSetGetDelTest testing storage function

func StreamBroadcastFilterTest

func StreamBroadcastFilterTest(t *testing.T, app *Server)

StreamBroadcastFilterTest testing stream function

func StreamBroadcastTest

func StreamBroadcastTest(t *testing.T, app *Server)

StreamBroadcastTest testing stream function

func StreamGlobBroadcastTest

func StreamGlobBroadcastTest(t *testing.T, app *Server, n int)

StreamGlobBroadcastTest testing stream function

func StreamItemGlobBroadcastTest

func StreamItemGlobBroadcastTest(t *testing.T, app *Server)

StreamItemGlobBroadcastTest testing stream function

func Time

func Time() string

Time returns a string timestamp

func WatchStorageNoop

func WatchStorageNoop(dataStore Database)

WatchStorageNoop a noop reader of the watch channel

Types

type Apply

type Apply func(key string, data []byte) ([]byte, error)

Apply filter function type for functions will serve as filters key: the key to filter data: the data received or about to be sent returns data: to be stored or sent to the client error: will prevent data to pass the filter

type ApplyDelete

type ApplyDelete func(key string) error

ApplyDelete callback function

type Database

type Database interface {
	Active() bool
	Start(StorageOpt) error
	Close()
	Keys() ([]byte, error)
	KeysRange(path string, from, to int64) ([]string, error)
	Get(key string) ([]byte, error)
	GetN(path string, limit int) ([]objects.Object, error)
	GetNRange(path string, limit int, from, to int64) ([]objects.Object, error)
	Set(key string, data string) (string, error)
	Pivot(key string, data string, created, updated int64) (string, error)
	Del(key string) error
	Clear()
	Watch() StorageChan
}

Database interface to be implemented by storages

Active: returns a boolean with the state of the storage

Start: will attempt to start a storage client

Close: closes the storage client

Keys: returns a list with existing keys in the storage

Get(key): retrieve a value or list of values, the key can include a glob pattern

GetN(path, N): retrieve N list of values matching a glob pattern

Set(key, data): store data under the provided key, key cannot not include glob pattern

Del(key): Delete a key from the storage

Clear: will clear all keys from the storage (used for testing)

Watch: returns a channel that will receive any set or del operation

type MemoryStorage

type MemoryStorage struct {
	// contains filtered or unexported fields
}

MemoryStorage composition of Database interface

func (*MemoryStorage) Active

func (db *MemoryStorage) Active() bool

Active provides access to the status of the storage client

func (*MemoryStorage) Clear

func (db *MemoryStorage) Clear()

Clear all keys in the storage

func (*MemoryStorage) Close

func (db *MemoryStorage) Close()

Close the storage client

func (*MemoryStorage) Del

func (db *MemoryStorage) Del(path string) error

Del a key/pattern value(s)

func (*MemoryStorage) Get

func (db *MemoryStorage) Get(path string) ([]byte, error)

Get a key/pattern related value(s)

func (*MemoryStorage) GetN

func (db *MemoryStorage) GetN(path string, limit int) ([]objects.Object, error)

GetN get last N elements of a path related value(s)

func (*MemoryStorage) GetNRange

func (db *MemoryStorage) GetNRange(path string, limit int, from, to int64) ([]objects.Object, error)

GetNRange get last N elements of a path related value(s)

func (*MemoryStorage) Keys

func (db *MemoryStorage) Keys() ([]byte, error)

Keys list all the keys in the storage

func (*MemoryStorage) KeysRange

func (db *MemoryStorage) KeysRange(path string, from, to int64) ([]string, error)

KeysRange list keys in a path and time range

func (*MemoryStorage) Peek

func (db *MemoryStorage) Peek(key string, now int64) (int64, int64)

Peek a value timestamps

func (*MemoryStorage) Pivot

func (db *MemoryStorage) Pivot(path string, data string, created int64, updated int64) (string, error)

Pivot set entries on pivot instances (force created/updated values)

func (*MemoryStorage) Set

func (db *MemoryStorage) Set(path string, data string) (string, error)

Set a value

func (*MemoryStorage) Start

func (db *MemoryStorage) Start(storageOpt StorageOpt) error

Start the storage client

func (*MemoryStorage) Watch

func (db *MemoryStorage) Watch() StorageChan

Watch the storage set/del events

type Notify

type Notify func(key string)

Notify after a write is done

type Server

type Server struct {
	Router *mux.Router
	Stream stream.Stream

	Pivot           string
	NoBroadcastKeys []string
	DbOpt           interface{}
	Audit           audit
	Workers         int
	ForcePatch      bool
	OnSubscribe     stream.Subscribe
	OnUnsubscribe   stream.Unsubscribe
	OnClose         func()
	Deadline        time.Duration
	AllowedOrigins  []string
	AllowedMethods  []string
	AllowedHeaders  []string
	ExposedHeaders  []string
	Storage         Database
	Address         string

	Silence           bool
	Static            bool
	Tick              time.Duration
	Console           *coat.Console
	Signal            chan os.Signal
	Client            *http.Client
	ReadTimeout       time.Duration
	WriteTimeout      time.Duration
	ReadHeaderTimeout time.Duration
	IdleTimeout       time.Duration
	// contains filtered or unexported fields
}

Server application

Router: can be predefined with routes and passed to be extended

NoBroadcastKeys: array of keys that should not broadcast on changes

DbOpt: options for storage

Audit: function to audit requests

Workers: number of workers to use as readers of the storage->broadcast channel

ForcePatch: flag to force patch operations even if the patch is bigger than the snapshot

OnSubscribe: function to monitor subscribe events

OnUnsubscribe: function to monitor unsubscribe events

OnClose: function that triggers before closing the application

Deadline: time duration of a request before timing out

AllowedOrigins: list of allowed origins for cross domain access, defaults to ["*"]

AllowedMethods: list of allowed methods for cross domain access, defaults to ["GET", "POST", "DELETE", "PUT"]

AllowedHeaders: list of allowed headers for cross domain access, defaults to ["Authorization", "Content-Type"]

ExposedHeaders: list of exposed headers for cross domain access, defaults to nil

Storage: database interdace implementation

Silence: output silence flag

Static: static routing flag

Tick: time interval between ticks on the clock subscription

Signal: os signal channel

Client: http client to make requests

Example
package main

import (
	"github.com/benitogf/katamari"
)

func main() {
	app := katamari.Server{}
	app.Start("localhost:8800")
	app.WaitClose()
}
Output:

func (*Server) Active

func (app *Server) Active() bool

Active check if the server is active

func (*Server) AfterFilter

func (app *Server) AfterFilter(path string, apply Notify)

AfterFilter add a filter that triggers after a successful write

func (*Server) Close

func (app *Server) Close(sig os.Signal)

Close : shutdown the http server and database connection

func (*Server) DeleteFilter

func (app *Server) DeleteFilter(path string, apply ApplyDelete)

DeleteFilter add a filter that runs before sending a read result

func (*Server) OpenFilter

func (app *Server) OpenFilter(name string)

OpenFilter open noop read and write filters

func (*Server) ReadFilter

func (app *Server) ReadFilter(path string, apply Apply)

ReadFilter add a filter that runs before sending a read result

func (*Server) Start

func (app *Server) Start(address string)

Start : initialize and start the http server and database connection

func (*Server) WaitClose

func (app *Server) WaitClose()

WaitClose : Blocks waiting for SIGINT, SIGTERM, SIGKILL, SIGHUP

func (*Server) WriteFilter

func (app *Server) WriteFilter(path string, apply Apply)

WriteFilter add a filter that triggers on write

type Stats

type Stats struct {
	Keys []string `json:"keys"`
}

Stats data structure of global keys

type Storage

type Storage struct {
	Active bool
	Db     Database
}

Storage abstraction of persistent data layer

type StorageChan

type StorageChan chan StorageEvent

StorageChan an operation events channel

type StorageEvent

type StorageEvent struct {
	Key       string
	Operation string
}

StorageEvent an operation event

type StorageOpt

type StorageOpt struct {
	NoBroadcastKeys []string
	DbOpt           interface{}
}

StorageOpt options of the storage instance

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL