Documentation ¶
Overview ¶
Package kv turns your S3 bucket into a diffable serverless key-value store.
Requirements ¶
- go1.14+
- S3-compatible storage, like minio, Wasabi, or AWS
Example ¶
package main import ( "context" "fmt" "net/http/httptest" "time" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3" "github.com/johannesboyne/gofakes3" "github.com/johannesboyne/gofakes3/backend/s3mem" "github.com/jrhy/mast" "github.com/jrhy/s3db/kv" ) func s3Config(endpoint string) *aws.Config { return &aws.Config{ Credentials: credentials.NewStaticCredentials( "TEST-ACCESSKEYID", "TEST-SECRETACCESSKEY", "", ), Region: aws.String("ca-west-1"), Endpoint: &endpoint, S3ForcePathStyle: aws.Bool(true), } } func setupS3(bucketName string) (*aws.Config, func()) { backend := s3mem.New() faker := gofakes3.New(backend) ts := httptest.NewServer(faker.Server()) s3cfg, closer := s3Config(ts.URL), ts.Close c := s3.New(session.New(s3cfg)) _, err := c.CreateBucket(&s3.CreateBucketInput{ Bucket: &bucketName, }) if err != nil { panic(err) } return s3cfg, closer } func main() { ctx := context.Background() s3cfg, close := setupS3("bucket") defer close() c := s3.New(session.New(s3cfg)) cfg := kv.Config{ Storage: &kv.S3BucketInfo{ EndpointURL: c.Endpoint, BucketName: "bucket", Prefix: "/my-awesome-database", }, KeysLike: "key", ValuesLike: 1234, NodeCache: mast.NewNodeCache(1024), NodeEncryptor: kv.V1NodeEncryptor([]byte("This is a secret passphrase if ever there was one.")), } s, err := kv.Open(ctx, c, cfg, kv.OpenOptions{}, time.Now()) if err != nil { panic(err) } // setting a value err = s.Set(ctx, time.Now(), "hello", 5) if err != nil { panic(err) } // getting a value var v int ok, err := s.Get(ctx, "hello", &v) if err != nil { panic(err) } if !ok { panic("how is that not OK?") } fmt.Printf("hello=%d\n", v) _, err = s.Commit(ctx) if err != nil { panic(err) } fmt.Printf("size %d\n", s.Size()) }
Output: hello=5 size 1
Index ¶
- Constants
- Variables
- func DeleteHistoricVersions(ctx context.Context, s *DB, before time.Time) error
- type Config
- type Cursor
- type DB
- func (d *DB) BranchFactor() uint
- func (s *DB) Cancel()
- func (s *DB) Clone(ctx context.Context) (*DB, error)
- func (s *DB) Commit(ctx context.Context) (*string, error)
- func (d *DB) Cursor(ctx context.Context) (*Cursor, error)
- func (s DB) Diff(ctx context.Context, from *DB, ...) error
- func (s *DB) Get(ctx context.Context, key interface{}, value interface{}) (bool, error)
- func (s DB) Height() int
- func (s DB) IsDirty() bool
- func (s *DB) IsTombstoned(ctx context.Context, key interface{}) (bool, error)
- func (s *DB) RemoveTombstones(ctx context.Context, before time.Time) error
- func (s *DB) Roots() ([]string, error)
- func (s *DB) Set(ctx context.Context, when time.Time, key interface{}, value interface{}) error
- func (s DB) Size() uint64
- func (d *DB) StartDiff(ctx context.Context, other *DB) (*DiffCursor, error)
- func (s *DB) Tombstone(ctx context.Context, when time.Time, key interface{}) error
- func (s *DB) TraceHistory(ctx context.Context, key interface{}, after time.Time, ...) error
- type DiffCursor
- type Encryptor
- type OnConflictMerged
- type OpenOptions
- type S3BucketInfo
- type S3Interface
Examples ¶
Constants ¶
const ( // DefaultBranchFactor is the number of entries that will be stored per S3 object, if // not overridden by Config.BranchFactor. DefaultBranchFactor = 4096 )
Variables ¶
var ErrMACVerificationFailure = errors.New("MAC verification failure")
var ( // ErrReadOnly is the result of an attempt to Set or Commit on a tree that was not opened for writing. ErrReadOnly = errors.New("opened as read-only") )
Functions ¶
func DeleteHistoricVersions ¶
DeleteeHistoricalVersionsAndData deletes storage associated with old versions, and any nodes from those versions that are no longer necessary for reading later versions. Deletion reduces the number of objects in the bucket but means that attempts to Diff(), TraceHistory(), or merge versions before the cutoff will fail.
Types ¶
type Config ¶
type Config struct { // S3 endpoint, bucket name, and database prefix Storage *S3BucketInfo // An optional label that will be added to committed versions' names for easier listing (e.g. "daily-report-") CustomRootPrefix string // An example of a key, to know what concrete type to make when unmarshaling KeysLike interface{} // An example of a value, to know what concrete type to make when unmarshaling ValuesLike interface{} // Sets the entries-per-S3 object for new trees (ignored unless tree is empty) BranchFactor uint // An optional S3-endpoint-scoped cache, instead of re-downloading recently-used tree nodes NodeCache mast.NodeCache // A way to keep your cloud provider out of your nodes NodeEncryptor Encryptor // OnConflictMerged is a callback that will be invoked whenever entries for a key have // different values in different trees that are being merged. It can be used to detect // when a uniqueness constraint has been broken and which keys need fixing. OnConflictMerged // LogFunc is a callback that will be invoked to provide details on potential corruption. LogFunc func(string) CustomMerge func(key interface{}, v1, v2 crdtpub.Value) crdtpub.Value MastNodeFormat string CustomMarshal func(interface{}) ([]byte, error) // CustomUnmarshal, if using registered types, will be invoked // to read tree nodes which are mast.PersistedNode with keys of // KeysLike, and values of crdtpub.Value of ValuesLike. All of // those should be registered before invoking kv.Open(). CustomUnmarshal func([]byte, interface{}) error UnmarshalUsesRegisteredTypes bool }
Config defines how values are stored and (un)marshaled.
type DB ¶
type DB struct {
// contains filtered or unexported fields
}
DB is an Open()ed database.
func Open ¶
func Open(ctx context.Context, S3 S3Interface, cfg Config, opts OpenOptions, when time.Time) (*DB, error)
Open returns a database. 'when' marks the creation time of the new version.
func (*DB) BranchFactor ¶
func (*DB) Cancel ¶
func (s *DB) Cancel()
Cancel indicates you don't want any of the previously-set entries persisted.
func (*DB) Clone ¶
Clone returns an independent database, with the same entries and uncommitted values as its source. Clones don't duplicate nodes that can be shared.
func (DB) Diff ¶
func (s DB) Diff( ctx context.Context, from *DB, f func(key, myValue, fromValue interface{}) (keepGoing bool, err error), ) error
Diff shows the differences from the the given tree to this one, invoking the given callback for each added/removed/changed value.
func (*DB) Get ¶
Get retrieves the value for the given key. value must be a pointer that to which an Config.ValuesLike can be assigned, OR a pointer to a crdt.Value, in which case the CRDT value metadata will be included.
func (DB) Height ¶
Height is the number of nodes between a leaf and the root—the number of nodes that need to be retrieved to do a Get() in the worst case.
func (DB) IsDirty ¶
IsDirty returns true if there are entries in memory that haven't been Commit()ted.
func (*DB) IsTombstoned ¶
IsTombstoned returns true if Tombstone() was invoked and committed since the last RemoveTombstones().
func (*DB) RemoveTombstones ¶
RemoveTombstones gets rid of the old tombstones, in the current version. The time should be chosen such that there are not going to be any merges for affected entries, otherwise merged sets will start new rounds of values for the affected entries. For example, if it would be valid for clients to retry hour-old requests, 'before' should be at least an hour ago.
func (*DB) Set ¶
Set puts a new value in memory. If the database already has a value later than "when", this does nothing. Any new values are buffered in memory until Commit()ted or Cancel()ed.
func (*DB) Tombstone ¶
Tombstone sets a persistent record indicating an entry will no longer be accessible, until RemoveTombstones() is done. If multiple versions have different values or tombstones for a key, they'll eventually get merged into the earliest tombstone. You can set a tombstone even if the entry was never set to a value, just to ensure that any future sets are ineffective.
type DiffCursor ¶ added in v0.1.33
type DiffCursor struct {
*mast.DiffCursor
}
type Encryptor ¶
type Encryptor interface { Encrypt(path string, value []byte) ([]byte, error) Decrypt(path string, value []byte) ([]byte, error) }
Encryptor encrypts bytes with a nonce derived from the given path.
func V1NodeEncryptor ¶
type OnConflictMerged ¶
type OnConflictMerged func(key, v1, v2 interface{}) error
OnConflictMerged is a callback that will be invoked whenever entries for a key have different values in different trees that are being merged. It can be used to detect when a uniqueness constraint has been broken and which keys need fixing.
type OpenOptions ¶
type OpenOptions struct { // ReadOnly means the database should not permit modifications. ReadOnly bool // OnlyVersions is used to see a tree as it looked for a set of historic versions OnlyVersions []string // ForceRebranch is used to change the branch factor of an existing tree, rewriting nodes and roots if necessary ForceRebranch bool }
OpenOptions control how databases are opened.
type S3BucketInfo ¶
type S3BucketInfo struct { // EndpointURL is used to distinguish servers for caching EndpointURL string // BucketName where the node objects are stored BucketName string // Prefix is a key prefix that distinguishes objects for this tree Prefix string }
S3BucketInfo describes where DB objects, including nodes and roots, are stored.
type S3Interface ¶
type S3Interface interface { // à la AWS SDK for S3 DeleteObjectWithContext(ctx aws.Context, input *s3.DeleteObjectInput, opts ...request.Option) (*s3.DeleteObjectOutput, error) // à la AWS SDK for S3 GetObjectWithContext(ctx aws.Context, input *s3.GetObjectInput, opts ...request.Option) (*s3.GetObjectOutput, error) // à la AWS SDK for S3 ListObjectsV2WithContext(ctx aws.Context, input *s3.ListObjectsV2Input, opts ...request.Option) (*s3.ListObjectsV2Output, error) // à la AWS SDK for S3 PutObjectWithContext(ctx aws.Context, input *s3.PutObjectInput, opts ...request.Option) (*s3.PutObjectOutput, error) }
S3Interface is the subset of the AWS SDK for S3 used access compatible buckets