dynastore

v1.2.0
Published: Sep 18, 2023 License: Apache-2.0 Imports: 17 Imported by: 0

README

dynastore

This is a small K/V library written in Go, which uses AWS DynamoDB as the data store.

It supports create, read, update and delete (CRUD) for key/value pairs.

Usage

The following example illustrates CRUD with optimistic locking for create, update and delete. To ensure changes are atomic, a version attribute is stored with the record in DynamoDB.

	awsCfg := &aws.Config{}

	client := dynastore.New(awsCfg)

	tbl := client.Table("CRMTable")

	customersPart := tbl.Partition("customers")

	// the payload of the record contains any data which isn't a part of the sort key
	newCustomer := &Customer{Name: "welcome"}

	// a local index which uses created is provided in the extra fields, this is used to list records by creation date
	indexFields := &Index{Created: time.Now().Format(time.RFC3339)}

	// create a record using atomic put, this will ensure the key is unique with any collision triggering an error
	created, kv, err := customersPart.AtomicPut("01FCFSDXQ8EYFCNMEA7C2WJG74", dynastore.WriteWithString(newCustomer.ToJson()), dynastore.WriteWithFields(indexFields.ToFields()))
	if err != nil {
		log.Fatalf("failed to put: %s", err)
	}

	log.Printf("created: %v, id: %s, name: %s, version: %d", created, kv.Partition, kv.Key, kv.Version)

	// read back the records
	page, err := customersPart.ListPage("", dynastore.ReadWithLocalIndex("idx_created", "created"), dynastore.ReadWithLimit(100))
	if err != nil {
		log.Fatalf("failed to list: %s", err)
	}

	log.Printf("found records count: %d", len(page.Keys))

	// update the record
	newCustomer.Status = "enabled"

	// perform an atomic update, ensuring that the record hasn't changed version in the time between create and update
	// this uses optimistic locking via a version attribute stored with the record in dynamodb
	created, kv, err = customersPart.AtomicPut("01FCFSDXQ8EYFCNMEA7C2WJG74", dynastore.WriteWithString(newCustomer.ToJson()), dynastore.WriteWithPreviousKV(kv))
	if err != nil {
		log.Fatalf("failed to put: %s", err)
	}

	log.Printf("created: %v, id: %s, name: %s version: %d", created, kv.Partition, kv.Key, kv.Version)

	// perform an atomic delete of the record, this again uses optimistic locking via a version attribute stored with the record in dynamodb
	deleted, err := customersPart.AtomicDelete("01FCFSDXQ8EYFCNMEA7C2WJG74", kv)
	if err != nil {
		log.Fatalf("failed to delete: %s", err)
	}

	log.Printf("deleted: %v, id: %s, name: %s version: %d", deleted, kv.Partition, kv.Key, kv.Version)
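The optimistic locking used in the example above can be modelled without DynamoDB. The following is a minimal in-memory sketch, not the library's implementation: `memStore`, `record` and `atomicPut` are hypothetical names, and the two error values are local stand-ins for the sentinel errors dynastore documents. A write succeeds only when the caller's view of the version still matches what is stored.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Local stand-ins for the sentinel errors documented by dynastore.
var (
	errKeyExists   = errors.New("key already exists in table")
	errKeyModified = errors.New("key has been modified")
)

// record is a hypothetical in-memory equivalent of a versioned item.
type record struct {
	Value   string
	Version int64
}

// memStore models a single partition guarded by a mutex.
type memStore struct {
	mu    sync.Mutex
	items map[string]record
}

// atomicPut creates the key when previous is nil, otherwise it updates the
// key only if the stored version still matches previous.Version.
func (s *memStore) atomicPut(key, value string, previous *record) (record, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	current, found := s.items[key]
	switch {
	case previous == nil && found:
		return record{}, errKeyExists
	case previous != nil && (!found || current.Version != previous.Version):
		return record{}, errKeyModified
	}

	// The version starts at 1 on create and increases by one per update.
	next := record{Value: value, Version: current.Version + 1}
	s.items[key] = next
	return next, nil
}

func main() {
	store := &memStore{items: map[string]record{}}

	first, _ := store.atomicPut("customer/1", "welcome", nil)
	second, _ := store.atomicPut("customer/1", "enabled", &first)
	// Reusing the stale first record is rejected because the version moved on.
	_, err := store.atomicPut("customer/1", "stale write", &first)

	fmt.Println(first.Version, second.Version, errors.Is(err, errKeyModified))
}
```

In DynamoDB itself the same check would typically be expressed as a condition on the write rather than a mutex, so the comparison and the update happen as one operation on the server.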

What is the problem?

The main problems I am trying to solve with this package are:

  1. Enable users of the API to store and coordinate work across resources, using multiple Lambda functions and containers running in a range of services.
  2. Provide a solid and simple storage API which can be used no matter how small your project is.
  3. Keep this API simple, while also reducing the operational work for this service by using AWS services.

Why DynamoDB?

DynamoDB is used for storage in a range of Amazon-provided APIs and libraries, so I am not the first to do this; see References. This service also satisfies the requirement to be easy to start with, as it is a managed service with no EC2 instances or patching required.

Cost?

I am currently working on some testing around this, but with a bit of tuning you can keep the read/write load very low. This package is specifically designed as a starting point, while ensuring there is a clear abstraction between the underlying services and your code.

To manage cost, I would recommend you set alarms on the read/write metrics. Start with on-demand capacity, but you will probably want to switch to specific read/write limits for production.

References

Prior work in this space:

This borrows a lot of ideas, tests and a subset of the API from https://github.com/abronan/valkeyrie.

Updates to the original API are based on a great blog post by @davecheney https://dave.cheney.net/2014/10/17/functional-options-for-friendly-apis

License

This code was authored by Mark Wolfe and is licensed under the Apache 2.0 license.

Documentation

Overview

dynastore offers a simple storage abstraction for AWS DynamoDB

This library encourages developers to build services which store a number of closely related entities in the same table. For more information on this pattern, see https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/bp-modeling-nosql-B.html.

This works by creating a DynamoDB table and using the hash key as the partition name, then storing the entity identifier in the range key. This entity identifier can be prefixed with a path to enable storing of hierarchies like a filesystem.

To set up a session, configure a table and partition, then retrieve a record:

session := dynastore.New()
kv := session.Table("agents").Partition("users")

key := "user/123"
_, err := kv.Get(key)
if err != nil {
    if err == dynastore.ErrKeyNotFound {
        log.Printf("not found: %s", key)
    }
}
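The key layout described in this overview (partition name in the hash key, a path-like entity identifier in the range key) can be pictured with a small in-memory model. This is only a sketch under stated assumptions: `item` and `listPrefix` are hypothetical names, and the prefix match stands in for whatever key condition the library sends to DynamoDB.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// item mirrors the two key attributes: the partition name is the hash key
// and the entity identifier (optionally path-prefixed) is the range key.
type item struct {
	Partition string
	SortKey   string
}

// listPrefix returns the sort keys in one partition that begin with prefix,
// in ascending order.
func listPrefix(items []item, partition, prefix string) []string {
	var keys []string
	for _, it := range items {
		if it.Partition == partition && strings.HasPrefix(it.SortKey, prefix) {
			keys = append(keys, it.SortKey)
		}
	}
	sort.Strings(keys)
	return keys
}

func main() {
	items := []item{
		{Partition: "users", SortKey: "user/123"},
		{Partition: "users", SortKey: "user/123/sessions/a"},
		{Partition: "users", SortKey: "team/9"},
		{Partition: "agents", SortKey: "user/123"},
	}

	// Only keys under "user/123" in the "users" partition are returned.
	fmt.Println(listPrefix(items, "users", "user/123"))
}
```

Because every entity under a path shares a prefix, a single prefix listing retrieves a parent together with its children, much like listing a directory.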

Index

Constants

const (
	// DefaultPartitionKeyAttribute this is the default partition key attribute name used throughout dynastore
	DefaultPartitionKeyAttribute = "id"

	// DefaultSortKeyAttribute this is the default sort key attribute name used throughout dynastore
	DefaultSortKeyAttribute = "name"
)
const (
	OperationNameKey contextKey = 1 + iota
)

Variables

var (
	// ErrKeyNotFound record not found in the table
	ErrKeyNotFound = errors.New("key not found in table")

	// ErrKeyExists record already exists in table
	ErrKeyExists = errors.New("key already exists in table")

	// ErrKeyModified record has been modified, this probably means someone beat you to the change/lock
	ErrKeyModified = errors.New("key has been modified")

	// ErrReservedField put contained a field in the write options which was reserved
	ErrReservedField = errors.New("fields contained reserved attribute name")

	// ErrIndexNotSupported dynamodb get operations don't support specifying an index
	ErrIndexNotSupported = errors.New("indexes not supported for this operation")
)
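Sentinel errors like these are usually handled by branching on the error value. The sketch below uses local stand-in variables rather than importing dynastore, and the suggested next steps in `describe` (a hypothetical helper) are illustrative rather than behaviour the library prescribes. It uses `errors.Is` so that errors wrapped with `%w` still match.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins with the same messages as the documented sentinel errors.
var (
	errKeyNotFound = errors.New("key not found in table")
	errKeyExists   = errors.New("key already exists in table")
	errKeyModified = errors.New("key has been modified")
)

// describe maps an error from a store call to a suggested next step.
func describe(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, errKeyNotFound):
		return "create the record"
	case errors.Is(err, errKeyExists):
		return "read the existing record"
	case errors.Is(err, errKeyModified):
		return "re-read the record and retry"
	default:
		return "fail"
	}
}

func main() {
	// Wrapping keeps the sentinel reachable through errors.Is.
	wrapped := fmt.Errorf("update customer: %w", errKeyModified)
	fmt.Println(describe(wrapped))
}
```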

Functions

func MarshalStruct

func MarshalStruct(in interface{}) (*dynamodb.AttributeValue, error)

MarshalStruct this helper method marshals a struct into an *dynamodb.AttributeValue which contains a map in the format required to provide to WriteWithAttributeValue.

func OperationName added in v1.1.0

func OperationName(ctx context.Context) string

OperationName extracts the name of the operation being handled in the given context. If it is not known, it returns ("").
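A lookup like this is commonly built on an unexported context key type. The following sketch shows that pattern in isolation; `operationNameKey`, `withOperationName` and `operationName` are local, hypothetical names, and how dynastore actually populates the value is not shown in this documentation.

```go
package main

import (
	"context"
	"fmt"
)

// contextKey is unexported so other packages cannot collide with the key.
type contextKey int

const operationNameKey contextKey = 1 + iota

// withOperationName returns a child context carrying the operation name.
func withOperationName(ctx context.Context, name string) context.Context {
	return context.WithValue(ctx, operationNameKey, name)
}

// operationName returns the stored name, or "" when none was set.
func operationName(ctx context.Context) string {
	name, _ := ctx.Value(operationNameKey).(string)
	return name
}

// demo returns the name read from a populated and from an empty context.
func demo() (string, string) {
	ctx := withOperationName(context.Background(), "PutItem")
	return operationName(ctx), operationName(context.Background())
}

func main() {
	known, unknown := demo()
	fmt.Printf("%q %q\n", known, unknown)
}
```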

func UnmarshalStruct

func UnmarshalStruct(val *dynamodb.AttributeValue, out interface{}) error

UnmarshalStruct this helper method un-marshals a struct from an *dynamodb.AttributeValue returned by KVPair.AttributeValue.

Types

type DynaPartition added in v1.1.0

type DynaPartition struct {
	// contains filtered or unexported fields
}

DynaPartition store which is backed by AWS DynamoDB

func (*DynaPartition) AtomicDelete added in v1.1.0

func (ddb *DynaPartition) AtomicDelete(sortKey string, previous *KVPair) (bool, error)

AtomicDelete delete of a single value

This supports two different operations:

  * if previous is supplied assert it exists with the version supplied
  * if previous is nil then assert that the key doesn't exist

func (*DynaPartition) AtomicDeleteWithContext added in v1.1.0

func (ddb *DynaPartition) AtomicDeleteWithContext(ctx context.Context, sortKey string, previous *KVPair) (bool, error)

AtomicDeleteWithContext delete of a single value

This supports two different operations:

  * if previous is supplied assert it exists with the version supplied
  * if previous is nil then assert that the key doesn't exist

FIXME: should the second case just return false, nil?

func (*DynaPartition) AtomicPut added in v1.1.0

func (ddb *DynaPartition) AtomicPut(sortKey string, options ...WriteOption) (bool, *KVPair, error)

AtomicPut Atomic CAS operation on a single value.

func (*DynaPartition) AtomicPutWithContext added in v1.1.0

func (ddb *DynaPartition) AtomicPutWithContext(ctx context.Context, sortKey string, options ...WriteOption) (bool, *KVPair, error)

AtomicPutWithContext Atomic CAS operation on a single value.

func (*DynaPartition) Delete added in v1.1.0

func (ddb *DynaPartition) Delete(sortKey string) error

Delete the value at the specified key

func (*DynaPartition) DeleteWithContext added in v1.1.0

func (ddb *DynaPartition) DeleteWithContext(ctx context.Context, sortKey string) error

Delete the value at the specified key

func (*DynaPartition) Exists added in v1.1.0

func (ddb *DynaPartition) Exists(sortKey string, options ...ReadOption) (bool, error)

Exists if a sort key exists in the store

func (*DynaPartition) ExistsWithContext added in v1.1.0

func (ddb *DynaPartition) ExistsWithContext(ctx context.Context, sortKey string, options ...ReadOption) (bool, error)

Exists if a sort key exists in the store

func (*DynaPartition) Get added in v1.1.0

func (ddb *DynaPartition) Get(sortKey string, options ...ReadOption) (*KVPair, error)

Get a value given its sort key

func (*DynaPartition) GetPartitionName added in v1.1.0

func (ddb *DynaPartition) GetPartitionName() string

func (*DynaPartition) GetTableName added in v1.1.0

func (ddb *DynaPartition) GetTableName() string

func (*DynaPartition) GetWithContext added in v1.1.0

func (ddb *DynaPartition) GetWithContext(ctx context.Context, sortKey string, options ...ReadOption) (*KVPair, error)

Get a value given its key

func (*DynaPartition) List deprecated added in v1.1.0

func (ddb *DynaPartition) List(prefix string, options ...ReadOption) ([]*KVPair, error)

List the content of a given prefix

Deprecated: This function attempts to list all records using a deadline / timeout which turned out to be a bad idea, use ListPage

func (*DynaPartition) ListPage added in v1.1.0

func (ddb *DynaPartition) ListPage(prefix string, options ...ReadOption) (*KVPairPage, error)

List the content of a given prefix

func (*DynaPartition) ListPageWithContext added in v1.1.0

func (ddb *DynaPartition) ListPageWithContext(ctx context.Context, prefix string, options ...ReadOption) (*KVPairPage, error)

List the content of a given prefix

func (*DynaPartition) ListWithContext deprecated added in v1.1.0

func (ddb *DynaPartition) ListWithContext(ctx context.Context, prefix string, options ...ReadOption) ([]*KVPair, error)

List the content of a given prefix

Deprecated: This function attempts to list all records using a deadline / timeout which turned out to be a bad idea, use ListPageWithContext

func (*DynaPartition) Put added in v1.1.0

func (ddb *DynaPartition) Put(hashKey string, options ...WriteOption) error

Put a value at the specified key

func (*DynaPartition) PutWithContext added in v1.1.0

func (ddb *DynaPartition) PutWithContext(ctx context.Context, hashKey string, options ...WriteOption) error

Put a value at the specified key

type DynaSession added in v1.1.0

type DynaSession struct {
	*dynamodb.DynamoDB
	// contains filtered or unexported fields
}

func New

func New(cfgs ...*aws.Config) *DynaSession

New construct a DynamoDB backed store with default session / service

func NewWithClient added in v1.2.0

func NewWithClient(dynamoSvc *dynamodb.DynamoDB, storeHooks *StoreHooks) *DynaSession

func NewWithOptions added in v1.1.0

func NewWithOptions(awscfg *aws.Config, options ...SessionOption) *DynaSession

NewWithOptions construct a DynamoDB backed store with the supplied session options

func (*DynaSession) Table added in v1.1.0

func (ds *DynaSession) Table(tableName string) *DynaTable

type DynaTable added in v1.2.0

type DynaTable struct {
	// contains filtered or unexported fields
}

func (*DynaTable) AtomicDeleteWithContext added in v1.2.0

func (dt *DynaTable) AtomicDeleteWithContext(ctx context.Context, partitionKey, sortKey string, previous *KVPair) (bool, error)

AtomicDeleteWithContext delete of a single value

This supports two different operations:

  * if previous is supplied assert it exists with the version supplied
  * if previous is nil then assert that the key doesn't exist

FIXME: should the second case just return false, nil?

func (*DynaTable) AtomicPutWithContext added in v1.2.0

func (dt *DynaTable) AtomicPutWithContext(ctx context.Context, partitionKey, sortKey string, options ...WriteOption) (bool, *KVPair, error)

AtomicPutWithContext Atomic CAS operation on a single value.

func (*DynaTable) DeleteWithContext added in v1.2.0

func (dt *DynaTable) DeleteWithContext(ctx context.Context, partitionKey, sortKey string) error

DeleteWithContext the value at the specified key

func (*DynaTable) ExistsWithContext added in v1.2.0

func (dt *DynaTable) ExistsWithContext(ctx context.Context, partitionKey, sortKey string, options ...ReadOption) (bool, error)

ExistsWithContext if a sort key exists in the store

This operation uses the DynamoDB get operation which doesn't support index read options

func (*DynaTable) GetTableName added in v1.2.0

func (dt *DynaTable) GetTableName() string

func (*DynaTable) GetWithContext added in v1.2.0

func (dt *DynaTable) GetWithContext(ctx context.Context, partitionKey, sortKey string, options ...ReadOption) (*KVPair, error)

GetWithContext a value given its key

This operation uses the DynamoDB get operation which doesn't support index read options

func (*DynaTable) ListPageWithContext added in v1.2.0

func (dt *DynaTable) ListPageWithContext(ctx context.Context, partitionKey, prefix string, options ...ReadOption) (*KVPairPage, error)

ListPageWithContext the content of a given prefix

func (*DynaTable) Partition added in v1.2.0

func (dt *DynaTable) Partition(partition string) Partition

func (*DynaTable) PutWithContext added in v1.2.0

func (dt *DynaTable) PutWithContext(ctx context.Context, partitionKey, hashKey string, options ...WriteOption) error

Put a value at the specified key

type KVPair

type KVPair struct {
	Partition string `dynamodbav:"id"`
	Key       string `dynamodbav:"name"`
	Version   int64  `dynamodbav:"version"`
	Expires   int64  `dynamodbav:"expires"`
	// contains filtered or unexported fields
}

KVPair represents {Key, Value, Version} tuple, internally this uses a *dynamodb.AttributeValue which can be used to store strings, slices or structs

func DecodeItem

func DecodeItem(item map[string]*dynamodb.AttributeValue) (*KVPair, error)

DecodeItem decode a DDB attribute value into a KVPair

func (*KVPair) BytesValue

func (kv *KVPair) BytesValue() []byte

BytesValue use the attribute to return a slice of bytes, a nil will be returned if it is empty or nil

func (*KVPair) DecodeFields added in v1.1.0

func (kv *KVPair) DecodeFields(out interface{}) error

DecodeFields decode the extra fields, which are typically index attributes, stored in the DynamoDB record using dynamodbattribute

func (*KVPair) DecodeValue

func (kv *KVPair) DecodeValue(out interface{}) error

DecodeValue decode using dynamodbattribute

func (*KVPair) StringValue added in v1.0.3

func (kv *KVPair) StringValue() string

StringValue use the attribute to return a string, an empty string will be returned if it is empty or nil

type KVPairPage

type KVPairPage struct {
	Keys    []*KVPair `json:"keys"`
	LastKey string    `json:"last_key"`
}

KVPairPage provides a page of keys with next token to enable paging
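Paging with a next token usually takes the form of a loop that feeds the last key of one page in as the start key of the next. The sketch below models that loop over a sorted in-memory slice. `page`, `listPage` and `listAll` are hypothetical stand-ins, and treating an empty `LastKey` as "no more pages" is an assumption of this sketch rather than documented behaviour.

```go
package main

import (
	"fmt"
	"sort"
)

// page is a hypothetical stand-in for KVPairPage holding only key names.
type page struct {
	Keys    []string
	LastKey string
}

// listPage returns up to limit keys that sort after the exclusive startKey.
// LastKey is set only when more keys remain. limit must be greater than zero.
func listPage(sorted []string, startKey string, limit int) page {
	from := 0
	if startKey != "" {
		from = sort.SearchStrings(sorted, startKey)
		if from < len(sorted) && sorted[from] == startKey {
			from++ // the start key itself is excluded
		}
	}
	to := from + limit
	if to >= len(sorted) {
		return page{Keys: sorted[from:]}
	}
	return page{Keys: sorted[from:to], LastKey: sorted[to-1]}
}

// listAll follows LastKey until a page reports that nothing remains.
func listAll(sorted []string, limit int) []string {
	var all []string
	startKey := ""
	for {
		p := listPage(sorted, startKey, limit)
		all = append(all, p.Keys...)
		if p.LastKey == "" {
			return all
		}
		startKey = p.LastKey
	}
}

func main() {
	keys := []string{"a", "b", "c", "d", "e"}
	fmt.Println(listAll(keys, 2))
}
```

With the real API the equivalent loop would presumably pass the previous page's `LastKey` to `ReadWithStartKey` on the next `ListPage` call.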

type Partition

type Partition interface {
	GetPartitionName() string

	Put(sortKey string, options ...WriteOption) error

	PutWithContext(ctx context.Context, sortKey string, options ...WriteOption) error

	Get(key string, options ...ReadOption) (*KVPair, error)

	GetWithContext(ctx context.Context, sortKey string, options ...ReadOption) (*KVPair, error)

	List(prefix string, options ...ReadOption) ([]*KVPair, error)

	ListWithContext(ctx context.Context, prefix string, options ...ReadOption) ([]*KVPair, error)

	ListPage(prefix string, options ...ReadOption) (*KVPairPage, error)

	ListPageWithContext(ctx context.Context, prefix string, options ...ReadOption) (*KVPairPage, error)

	Delete(sortKey string) error

	DeleteWithContext(ctx context.Context, sortKey string) error

	Exists(sortKey string, options ...ReadOption) (bool, error)

	ExistsWithContext(ctx context.Context, sortKey string, options ...ReadOption) (bool, error)

	AtomicPut(sortKey string, options ...WriteOption) (bool, *KVPair, error)

	AtomicPutWithContext(ctx context.Context, sortKey string, options ...WriteOption) (bool, *KVPair, error)

	AtomicDelete(sortKey string, previous *KVPair) (bool, error)

	AtomicDeleteWithContext(ctx context.Context, sortKey string, previous *KVPair) (bool, error)
}

Partition a partition represents a grouping of data within a DynamoDB table.

type ReadOption

type ReadOption func(opts *ReadOptions)

ReadOption assign various settings to the read options

func ReadConsistentDisable

func ReadConsistentDisable() ReadOption

ReadConsistentDisable disable consistent reads
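Disabling consistent reads has a cost implication in DynamoDB: for a single-item read, a strongly consistent read consumes one read capacity unit per 4 KB of item size (rounded up), while an eventually consistent read consumes half of that. The helper below is a rough estimator for single-item reads only; `readUnits` is a hypothetical name and not part of dynastore.

```go
package main

import "fmt"

// readBlockBytes is the 4 KB block size DynamoDB uses to meter reads.
const readBlockBytes = 4 * 1024

// readUnits estimates the read capacity units consumed by reading one item
// of itemBytes, for strongly consistent or eventually consistent reads.
func readUnits(itemBytes int, consistent bool) float64 {
	// Round the item size up to whole 4 KB blocks.
	blocks := (itemBytes + readBlockBytes - 1) / readBlockBytes
	if consistent {
		return float64(blocks)
	}
	return float64(blocks) / 2
}

func main() {
	// A 6 KB item spans two blocks.
	fmt.Println(readUnits(6*1024, true), readUnits(6*1024, false))
}
```

Since the read options enable consistent reads by default, passing `ReadConsistentDisable()` on read-heavy paths that tolerate slightly stale data is one of the tuning options for keeping read load low.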

func ReadScanIndexForwardDisable added in v1.2.0

func ReadScanIndexForwardDisable() ReadOption

ReadScanIndexForwardDisable if this is disabled DynamoDB reads the results in reverse order by sort key value (DESCENDING ORDER)

func ReadWithGlobalIndex added in v1.1.0

func ReadWithGlobalIndex(name, partitionKeyAttribute, sortKeyAttribute string) ReadOption

ReadWithGlobalIndex perform a read using a global index with the given name and the name of the partition and sort key attributes.

func ReadWithLimit

func ReadWithLimit(limit int64) ReadOption

ReadWithLimit read a list of records with the limit provided this will apply to list operations only.

func ReadWithLocalIndex added in v1.1.0

func ReadWithLocalIndex(name, sortKeyAttribute string) ReadOption

ReadWithLocalIndex perform a read using a local index with the given name and the name of the sort key attribute.

func ReadWithStartKey

func ReadWithStartKey(key string) ReadOption

ReadWithStartKey read a list of records with the exclusive start key provided this will apply to list operations only.

type ReadOptions

type ReadOptions struct {
	// contains filtered or unexported fields
}

ReadOptions contains optional request parameters

func NewReadOptions

func NewReadOptions(opts ...ReadOption) *ReadOptions

NewReadOptions create read options, assign defaults, then accept overrides. The read consistent flag is enabled by default.

func (*ReadOptions) Append added in v1.0.1

func (ro *ReadOptions) Append(opts ...ReadOption)

Append append more options which supports conditional addition
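The read options follow the functional options pattern: defaults are assigned first, then each option function mutates the options struct. The sketch below rebuilds that shape with local types to show how defaults, overrides and conditional `Append` calls interact. The field names inside `readOptions` are assumptions, since the real fields are unexported.

```go
package main

import "fmt"

// readOptions is a hypothetical stand-in for the unexported ReadOptions fields.
type readOptions struct {
	consistent bool
	limit      int64
}

// readOption mutates readOptions, mirroring the documented ReadOption type.
type readOption func(opts *readOptions)

func readConsistentDisable() readOption {
	return func(opts *readOptions) { opts.consistent = false }
}

func readWithLimit(limit int64) readOption {
	return func(opts *readOptions) { opts.limit = limit }
}

// newReadOptions assigns defaults (consistent reads on) then applies overrides.
func newReadOptions(opts ...readOption) *readOptions {
	ro := &readOptions{consistent: true}
	ro.Append(opts...)
	return ro
}

// Append applies further options, which allows conditional additions.
func (ro *readOptions) Append(opts ...readOption) {
	for _, opt := range opts {
		opt(ro)
	}
}

func main() {
	ro := newReadOptions(readWithLimit(100))

	staleReadsAllowed := true
	if staleReadsAllowed {
		ro.Append(readConsistentDisable())
	}

	fmt.Println(ro.consistent, ro.limit)
}
```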

type Session

type Session interface {
	dynamodbiface.DynamoDBAPI

	// Table returns a table
	Table(tableName string) Table
}

Session represents the backend K/V storage using one or more DynamoDB tables containing partitions. This primarily holds the AWS Session settings and configuration, and enables direct access to DynamoDB.

type SessionOption added in v1.1.0

type SessionOption func(opts *SessionOptions)

SessionOption assign various settings to the session options

func SessionWithAWSHooks added in v1.1.0

func SessionWithAWSHooks(storeHooks *StoreHooks) SessionOption

SessionWithAWSHooks hooks invoked while using this session

type SessionOptions added in v1.1.0

type SessionOptions struct {
	// contains filtered or unexported fields
}

SessionOptions contains optional request parameters

func NewSessionOptions added in v1.1.0

func NewSessionOptions(opts ...SessionOption) *SessionOptions

NewSessionOptions create session options, assign defaults then accept overrides

type StoreHooks added in v1.1.0

type StoreHooks struct {
	// RequestBuilt will be invoked prior to dispatching the request to the AWS SDK
	RequestBuilt func(ctx context.Context, params interface{}) context.Context
}

StoreHooks is a container for callbacks that can instrument the datastore
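To make the hook contract concrete, here is a small sketch of how a store might invoke `RequestBuilt` and how a caller could use it to tag the context. The `storeHooks` struct copies the documented field, while `dispatch`, `traceKey` and `fakePutInput` are hypothetical and only model the call site, which in the real library sits just before the request is handed to the AWS SDK.

```go
package main

import (
	"context"
	"fmt"
)

// storeHooks copies the documented shape of StoreHooks.
type storeHooks struct {
	RequestBuilt func(ctx context.Context, params interface{}) context.Context
}

// traceKey is a private context key used by the example hook.
type traceKey struct{}

// fakePutInput is a hypothetical request type standing in for an SDK input.
type fakePutInput struct{ TableName string }

// dispatch models a store invoking the hook before sending a request.
func dispatch(ctx context.Context, hooks *storeHooks, params interface{}) context.Context {
	if hooks != nil && hooks.RequestBuilt != nil {
		ctx = hooks.RequestBuilt(ctx, params)
	}
	// A real store would now pass ctx and params to the AWS SDK.
	return ctx
}

// demo installs a hook that labels the context with the request it saw.
func demo() string {
	hooks := &storeHooks{
		RequestBuilt: func(ctx context.Context, params interface{}) context.Context {
			label := "unknown"
			if in, ok := params.(*fakePutInput); ok {
				label = "put:" + in.TableName
			}
			return context.WithValue(ctx, traceKey{}, label)
		},
	}
	ctx := dispatch(context.Background(), hooks, &fakePutInput{TableName: "CRMTable"})
	label, _ := ctx.Value(traceKey{}).(string)
	return label
}

func main() {
	fmt.Println(demo())
}
```

Returning a context from the hook lets instrumentation such as tracing attach data that travels with the request.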

type Table

type Table interface {
	GetTableName() string

	Partition(partitionName string) Partition

	PutWithContext(ctx context.Context, partitionKey, sortKey string, options ...WriteOption) error

	GetWithContext(ctx context.Context, partitionKey, sortKey string, options ...ReadOption) (*KVPair, error)

	ListPageWithContext(ctx context.Context, partitionKey, prefix string, options ...ReadOption) (*KVPairPage, error)

	DeleteWithContext(ctx context.Context, partitionKey, sortKey string) error

	ExistsWithContext(ctx context.Context, partitionKey, sortKey string, options ...ReadOption) (bool, error)

	AtomicPutWithContext(ctx context.Context, partitionKey, sortKey string, options ...WriteOption) (bool, *KVPair, error)

	AtomicDeleteWithContext(ctx context.Context, partitionKey, sortKey string, previous *KVPair) (bool, error)
}

Table represents a table in DynamoDB, this is where you store all your partitioned data for a given model.

type WriteOption

type WriteOption func(opts *WriteOptions)

WriteOption assign various settings to the write options

func WriteWithBytes

func WriteWithBytes(val []byte) WriteOption

WriteWithBytes encode raw data using base64 and assign this value to the key which is written
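Base64 turns arbitrary bytes into text that can be stored safely as a string value. The round trip below uses the standard alphabet from `encoding/base64`; which alphabet dynastore uses internally is not stated here, so treat the choice of `StdEncoding` as an assumption of this sketch. `encode` and `decode` are hypothetical helper names.

```go
package main

import (
	"encoding/base64"
	"fmt"
)

// encode converts raw bytes into base64 text using the standard alphabet.
func encode(val []byte) string {
	return base64.StdEncoding.EncodeToString(val)
}

// decode reverses encode and reports malformed input as an error.
func decode(s string) ([]byte, error) {
	return base64.StdEncoding.DecodeString(s)
}

// roundTrip reports whether val survives an encode and decode cycle.
func roundTrip(val []byte) bool {
	out, err := decode(encode(val))
	return err == nil && string(out) == string(val)
}

func main() {
	raw := []byte{0x00, 0xff, 0x10}
	fmt.Println(encode(raw), roundTrip(raw))
}
```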

func WriteWithFields added in v1.1.0

func WriteWithFields(fields map[string]string) WriteOption

WriteWithFields assign fields to the top level record, this is used to assign attributes used in indexes
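When a field such as a creation timestamp is used as the sort key of an index, its string form has to sort in the intended order. RFC 3339 timestamps sort chronologically as plain strings only when they share one offset, so normalising to UTC before formatting is a sensible precaution. The sketch below demonstrates this with hypothetical helpers; `createdField` simply builds the `map[string]string` shape that `WriteWithFields` accepts.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// createdField formats t in UTC so that string order matches time order.
func createdField(t time.Time) map[string]string {
	return map[string]string{"created": t.UTC().Format(time.RFC3339)}
}

// sortedCreated returns the created values in lexicographic order.
func sortedCreated(times []time.Time) []string {
	vals := make([]string, 0, len(times))
	for _, t := range times {
		vals = append(vals, createdField(t)["created"])
	}
	sort.Strings(vals)
	return vals
}

// demo mixes time zones: the later instant is expressed with a +02:00 offset.
func demo() []string {
	base := time.Date(2023, time.September, 18, 9, 0, 0, 0, time.UTC)
	plus2 := time.FixedZone("plus2", 2*60*60)
	return sortedCreated([]time.Time{base.Add(time.Hour).In(plus2), base})
}

func main() {
	fmt.Println(demo())
}
```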

func WriteWithNoExpires

func WriteWithNoExpires() WriteOption

WriteWithNoExpires time to live (TTL) is not set, so the key never expires

func WriteWithPreviousKV

func WriteWithPreviousKV(previous *KVPair) WriteOption

WriteWithPreviousKV previous KV which will be checked prior to update

func WriteWithString added in v1.0.2

func WriteWithString(val string) WriteOption

WriteWithString assign this value to the key which is written

func WriteWithTTL

func WriteWithTTL(ttl time.Duration) WriteOption

WriteWithTTL assign a time to live (TTL) to the key which is written
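DynamoDB's TTL feature expects a numeric attribute holding an expiry time in Unix epoch seconds, which fits the `Expires int64` field on `KVPair`. The sketch below shows the arithmetic under two assumptions that are not confirmed by this documentation: that the expiry is computed as now plus the TTL, and that a zero value means "never expires". `expiresAt` and `expired` are hypothetical helpers.

```go
package main

import (
	"fmt"
	"time"
)

// expiresAt converts a TTL into an absolute expiry in Unix epoch seconds.
func expiresAt(now time.Time, ttl time.Duration) int64 {
	return now.Add(ttl).Unix()
}

// expired reports whether an expiry has passed. Zero is treated as no expiry,
// which is an assumption of this sketch.
func expired(now time.Time, expires int64) bool {
	return expires != 0 && now.Unix() >= expires
}

// demo uses a fixed clock so the result is deterministic.
func demo() (int64, bool, bool) {
	now := time.Unix(1700000000, 0)
	expires := expiresAt(now, 90*time.Second)
	return expires, expired(now, expires), expired(now.Add(2*time.Minute), expires)
}

func main() {
	fmt.Println(demo())
}
```

DynamoDB removes expired items in the background rather than at the exact expiry time, so readers that care about precise expiry often apply a check like `expired` themselves.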

type WriteOptions

type WriteOptions struct {
	// contains filtered or unexported fields
}

WriteOptions contains optional request parameters

func NewWriteOptions

func NewWriteOptions(opts ...WriteOption) *WriteOptions

NewWriteOptions create write options, assign defaults then accept overrides

func (*WriteOptions) Append added in v1.0.1

func (wo *WriteOptions) Append(opts ...WriteOption)

Append append more options which supports conditional addition
