keyvaluestore

package module
v0.0.0-...-02ed867 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 4, 2023 License: MIT Imports: 3 Imported by: 2

README

keyvaluestore Documentation

This package provides an interface with a Redis-like API and implementations for multiple key-value store backends. It supports profiling, caching, batching, eventual consistency, and transactions. It has implementations for:

  • Redis, which is ideal for dev environments due to its low overhead.
  • DynamoDB, which is ideal in production due to its incredible scalability.
  • FoundationDB, which is ideal in production when you want control over the hardware or when DynamoDB's latency isn't good enough.
  • An in-memory store, which is ideal for unit tests as it can be trivially destroyed and created.

This project originated at the AAF, where it was used in production until the company went bankrupt. I (@ccbrown) believe it's the best way to use DynamoDB in Go applications, so I'm continuing to maintain it and use it for other projects.

Examples

Let's assume you have a struct representing your persistence layer like so:

type Store struct {
    backend keyvaluestore.Backend
}

See the section below on backends for details on how to initialize the backend field.

Storing an Object
func (s *Store) AddUser(user *model.User) error {
    serialized, err := json.Marshal(user)
    if err != nil {
        return err
    }
    return s.backend.Set("user:" + string(user.Id), serialized)
}

This is the simplest way to store an object: Serialize it (JSON or MessagePack works well), then use Set to store it. Alternatively, you could just implement BinaryMarshaler on your objects and skip the serialization step here.

Getting an Object

Building off of the previous example, if you have a user's id, you can retrieve them like so:

func (s *Store) GetUserById(id model.Id) (*model.User, error) {
    serialized, err := s.backend.Get("user:" + string(id))
    if serialized == nil {
        return nil, err
    }
    var user *model.User
    if err := json.Unmarshal([]byte(*serialized), &user); err != nil {
        return nil, err
    }
    return user, nil
}
Storing an Object, Part 2

The first example has two big problems:

  1. Users aren't accessible unless you have their id and there's no way to enumerate them.
  2. It doesn't enforce uniqueness constraints for usernames or email addresses. (Let's assume all our users have usernames and email addresses.)

The first problem can be most easily solved with sorted sets: Simply add all users to a sorted set, which can be easily enumerated later. The second problem requires the use of transactions. In this case, the type of transaction we want is an atomic write operation.

var ErrEmailAddressInUse = fmt.Errorf("email address in use")
var ErrUsernameInUse = fmt.Errorf("username in use")

func (s *Store) AddUser(user *model.User) error {
    serialized, err := json.Marshal(user)
    if err != nil {
        return err
    }

    tx := s.backend.AtomicWrite()
    tx.Set("user:" + string(user.Id), serialized)
    tx.ZHAdd("usernames", user.Username, user.Id, 0.0)
    usernameSet := tx.SetNX("user_by_username:"+user.Username, user.Id)
    tx.SetNX("user_by_email_address:"+user.EmailAddress, user.Id)

    if didCommit, err := tx.Exec(); err != nil {
        return err
    } else if didCommit {
        return nil
    } else if usernameSet.ConditionalFailed() {
        return ErrUsernameInUse
    }
    return ErrEmailAddressInUse
}

This implementation now covers all the bases:

  • We can enumerate users, sorted by username, by iterating over the "usernames" set.
  • If the username is already taken, the transaction will be aborted and the function will return ErrUsernameInUse.
  • If the email address is already taken, the transaction will be aborted and the function will return ErrEmailAddressInUse.
  • We also have the ability to look up users by their username or email address.
Getting Multiple Objects

In many scenarios, you'll want to fetch more than one user at once. If you made one round-trip to the backend per user, this would be very slow. To efficiently fetch multiple objects or perform multiple operations, you can use batching:

func (s *Store) GetUsersByIds(ids ...model.Id) ([]*model.User, error) {
    batch := s.backend.Batch()
    gets := make([]keyvaluestore.GetResult, len(ids))
    for i, id := range ids {
        gets[i] = batch.Get("user:" + string(id))
    }
    if err := batch.Exec(); err != nil {
        return nil, err
    }

    users := make([]*model.User, 0, len(gets))
    for _, get := range gets {
        if v, _ := get.Result(); v != nil {
            var user *model.User
            if err := json.Unmarshal([]byte(*v), &user); err != nil {
                return nil, err
            }
            users = append(users, user)
        }
    }
    return users, nil
}

Backends

Memory

In-memory backends are ideal for unit tests as you can create and destroy thousands of them quickly and cheaply:

backend := memorystore.NewBackend()
backend.Set("foo", "bar")
Redis

Redis backends are ideal for dev environments as they're lightweight and easy to spin up and tear down:

backend := &redisstore.Backend{
    Client: redis.NewClient(&redis.Options{
        Addr: "127.0.0.1:6379",
    }),
}
backend.Set("foo", "bar")
DynamoDB

DynamoDB is ideal for production in AWS as it's easy to set up and maintain and scales incredibly well. It uses a single-table design, so the schema is minimal and fixed regardless of your data.

When you create your DynamoDB table, you'll need to give it the following schema:

DynamoDBTable:
  Type: AWS::DynamoDB::Table
  Properties:
    AttributeDefinitions:
      - AttributeName: hk
        AttributeType: B
      - AttributeName: rk
        AttributeType: B
      - AttributeName: rk2
        AttributeType: B
    KeySchema:
      - AttributeName: hk
        KeyType: HASH
      - AttributeName: rk
        KeyType: RANGE
    LocalSecondaryIndexes:
      - IndexName: rk2
        KeySchema:
          - AttributeName: hk
            KeyType: HASH
          - AttributeName: rk2
            KeyType: RANGE
        Projection:
          ProjectionType: ALL
    BillingMode: PAY_PER_REQUEST

If you're using CloudFormation, you can just copy/paste that into your template.

Then you can connect to it like so:

session := session.Must(session.NewSession())
backend := &KeyValueStore{
    backend: &dynamodbstore.Backend{
        Client: dynamodb.New(session),
        TableName: cfg.TableName,
    },
}
backend.Set("foo", "bar")

You can also create the backend using a DAX client for improved performance.

Documentation

Index

Constants

View Source
const MaxAtomicWriteOperations = 25

DynamoDB can't do more than 25 operations in an atomic write so all backends should enforce this limit.

Variables

This section is empty.

Functions

func IsAtomicWriteConflict

func IsAtomicWriteConflict(err error) bool

IsAtomicWriteConflict returns true when an atomic write fails due to contention (but not due to a failed conditional). For example, in DynamoDB this error happens when a transaction fails due to a TransactionConflict.

func ToString

func ToString(v interface{}) *string

Types

type AtomicWriteConflictError

type AtomicWriteConflictError struct {
	Err error
}

AtomicWriteConflictError happens when an atomic write fails due to contention (but not due to a failed conditional). For example, in DynamoDB this error happens when a transaction fails due to a TransactionConflict.

func (*AtomicWriteConflictError) Error

func (e *AtomicWriteConflictError) Error() string

func (*AtomicWriteConflictError) Unwrap

func (e *AtomicWriteConflictError) Unwrap() error

type AtomicWriteOperation

type AtomicWriteOperation interface {
	// Sets a key. No conditionals are applied.
	Set(key string, value interface{}) AtomicWriteResult

	// Sets a key. The atomic write operation will be aborted if the key already exists.
	SetNX(key string, value interface{}) AtomicWriteResult

	// Sets a key. The atomic write operation will be aborted if the key does not already exist.
	SetXX(key string, value interface{}) AtomicWriteResult

	// Sets a key. The atomic write operation will be aborted if the key does not exist or does not
	// have the given value.
	SetEQ(key string, value, oldValue interface{}) AtomicWriteResult

	// Deletes a key. No conditionals are applied.
	Delete(key string) AtomicWriteResult

	// Deletes a key. The atomic write operation will be aborted if the key does not exist.
	DeleteXX(key string) AtomicWriteResult

	// Increments the number with the given key by some number. If the key doesn't exist, it's set
	// to the given number instead. No conditionals are applied.
	NIncrBy(key string, n int64) AtomicWriteResult

	// Add to or create a sorted set. The size of the member may be limited by some backends (for
	// example, DynamoDB limits it to approximately 1024 bytes). No conditionals are applied.
	ZAdd(key string, member interface{}, score float64) AtomicWriteResult

	// Adds a member to a sorted set. The atomic write operation will be aborted if the member
	// already exists in the set.
	ZAddNX(key string, member interface{}, score float64) AtomicWriteResult

	// Removes a member from a sorted set. No conditionals are applied.
	ZRem(key string, member interface{}) AtomicWriteResult

	// Add to or create a sorted hash. A sorted hash is like a cross between a hash and sorted set.
	// It uses a field name instead of the member for the purposes of identifying and
	// lexicographically sorting members.
	//
	// With DynamoDB, the field is limited to approximately 1024 bytes while the member is not.
	//
	// No conditionals are applied.
	ZHAdd(key, field string, member interface{}, score float64) AtomicWriteResult

	// Removes a member from a sorted hash. No conditionals are applied.
	ZHRem(key, field string) AtomicWriteResult

	// Adds a member to a set. No conditionals are applied.
	SAdd(key string, member interface{}, members ...interface{}) AtomicWriteResult

	// Removes a member from a set. No conditionals are applied.
	SRem(key string, member interface{}, members ...interface{}) AtomicWriteResult

	// Sets one or more fields of the hash at the given key. No conditionals are applied.
	HSet(key, field string, value interface{}, fields ...KeyValue) AtomicWriteResult

	// Sets one or more fields of the hash at the given key. The atomic write operation will be
	// aborted if the field already exists.
	HSetNX(key, field string, value interface{}) AtomicWriteResult

	// Deletes one or more fields of the hash at the given key. No conditionals are applied.
	HDel(key, field string, fields ...string) AtomicWriteResult

	// Executes the operation. If a condition failed, returns false.
	Exec() (bool, error)
}

type AtomicWriteResult

type AtomicWriteResult interface {
	// Returns true if the transaction failed due to this operation's conditional failing.
	ConditionalFailed() bool
}

type Backend

type Backend interface {
	// Batch allows you to batch up simple operations for better performance potential. Use this
	// only for possible performance benefits. Read isolation is implementation-defined and other
	// properties such as atomicity should not be assumed.
	Batch() BatchOperation

	// AtomicWrite executes up to 25 write operations atomically, failing entirely if any
	// conditional operations (e.g. SetNX) are not executed.
	AtomicWrite() AtomicWriteOperation

	Delete(key string) (success bool, err error)
	Get(key string) (*string, error)
	Set(key string, value interface{}) error

	// Set if the key already exists.
	SetXX(key string, value interface{}) (bool, error)

	// Set if the key doesn't exist.
	SetNX(key string, value interface{}) (bool, error)

	// Set if the key exists and its value is equal to the given one.
	SetEQ(key string, value, oldValue interface{}) (success bool, err error)

	// Increments the number with the given key by some number. If the key doesn't exist, it's set
	// to the given number instead. To get the current value, you can pass 0 as n.
	NIncrBy(key string, n int64) (int64, error)

	// Add to or create a set. Sets are ideal for small sizes, but have implementation-dependent
	// size limitations (400KB for DynamoDB). For large or unbounded sets, use ZAdd instead.
	SAdd(key string, member interface{}, members ...interface{}) error

	// Remove from a set.
	SRem(key string, member interface{}, members ...interface{}) error

	// Get members of a set.
	SMembers(key string) ([]string, error)

	// Sets one or more fields of the hash at the given key. If no hash exists at the key, a new one
	// is created. Hashes are ideal for small sizes, but have implementation-dependent size
	// limitations (400KB for DynamoDB). For large or unbounded sets, use something else.
	HSet(key, field string, value interface{}, fields ...KeyValue) error

	// Deletes one or more fields of the hash at the given key.
	HDel(key, field string, fields ...string) error

	// Gets a field of the hash at the given key or nil if the hash or field does not exist.
	HGet(key, field string) (*string, error)

	// Gets all fields of the hash at the given key.
	HGetAll(key string) (map[string]string, error)

	// Add to or create a sorted set. The size of the member may be limited by some backends (for
	// example, DynamoDB limits it to approximately 1024 bytes).
	ZAdd(key string, member interface{}, score float64) error

	// Gets the score for a member added via ZAdd.
	ZScore(key string, member interface{}) (*float64, error)

	// Remove from a sorted set.
	ZRem(key string, member interface{}) error

	// Increment a score in a sorted set or set the score if the member doesn't exist.
	ZIncrBy(key string, member interface{}, n float64) (float64, error)

	// Get members of a sorted set by ascending score.
	ZRangeByScore(key string, min, max float64, limit int) ([]string, error)

	// Get members (and their scores) of a sorted set by ascending score.
	ZRangeByScoreWithScores(key string, min, max float64, limit int) (ScoredMembers, error)

	// Get members of a sorted set by descending score.
	ZRevRangeByScore(key string, min, max float64, limit int) ([]string, error)

	// Get members (and their scores) of a sorted set by descending score.
	ZRevRangeByScoreWithScores(key string, min, max float64, limit int) (ScoredMembers, error)

	// Gets the number of members with scores between min and max, inclusive. This method can get
	// somewhat expensive on DynamoDB as it is not a constant-time operation.
	ZCount(key string, min, max float64) (int, error)

	// Gets the number of members between min and max. All members of the set must have been added
	// with a zero score. min and max must begin with '(' or '[' to indicate exclusive or inclusive.
	// Alternatively, min can be "-" and max can be "+" to represent infinities. This method can get
	// somewhat expensive on DynamoDB as it is not a constant-time operation.
	ZLexCount(key string, min, max string) (int, error)

	// Get members of a sorted set by lexicographical order. All members of the set must have been
	// added with a zero score. min and max must begin with '(' or '[' to indicate exclusive or
	// inclusive. Alternatively, min can be "-" and max can be "+" to represent infinities.
	ZRangeByLex(key string, min, max string, limit int) ([]string, error)

	// Get members of a sorted set by reverse lexicographical order. All members of the set must
	// have been added with a zero score. min and max must begin with '(' or '[' to indicate
	// exclusive or inclusive. Alternatively, min can be "-" and max can be "+" to represent
	// infinities.
	ZRevRangeByLex(key string, min, max string, limit int) ([]string, error)

	// Add to or create a sorted hash. A sorted hash is like a cross between a hash and sorted set.
	// It uses a field name instead of the member for the purposes of identifying and
	// lexicographically sorting members.
	//
	// With DynamoDB, the field is limited to approximately 1024 bytes while the member is limited
	// to approximately 200KB.
	ZHAdd(key, field string, member interface{}, score float64) error

	// Gets a field of the sorted hash at the given key or nil if the hash or field does not exist.
	ZHGet(key, field string) (*string, error)

	// Remove from a sorted hash.
	ZHRem(key, field string) error

	// Get members of a sorted hash by ascending score.
	ZHRangeByScore(key string, min, max float64, limit int) ([]string, error)

	// Get members (and their scores) of a sorted hash by ascending score.
	ZHRangeByScoreWithScores(key string, min, max float64, limit int) (ScoredMembers, error)

	// Get members of a sorted hash by descending score.
	ZHRevRangeByScore(key string, min, max float64, limit int) ([]string, error)

	// Get members (and their scores) of a sorted hash by descending score.
	ZHRevRangeByScoreWithScores(key string, min, max float64, limit int) (ScoredMembers, error)

	// Get members of a sorted hash by their fields' lexicographical order. All members of the set
	// must have been added with a zero score. min and max must begin with '(' or '[' to indicate
	// exclusive or inclusive. Alternatively, min can be "-" and max can be "+" to represent
	// infinities.
	ZHRangeByLex(key string, min, max string, limit int) ([]string, error)

	// Get members of a sorted hash by their fields' reverse lexicographical order. All members of
	// the set must have been added with a zero score. min and max must begin with '(' or '[' to
	// indicate exclusive or inclusive. Alternatively, min can be "-" and max can be "+" to
	// represent infinities.
	ZHRevRangeByLex(key string, min, max string, limit int) ([]string, error)

	// For performance improvements you may wish to enable eventually consistent reads for backends
	// that support it.
	WithEventuallyConsistentReads() Backend

	// Some backends support metrics via profilers. See the Profiler interfaces in the specific
	// implementation packages.
	WithProfiler(profiler interface{}) Backend

	// If the backend wraps another (e.g. a read cache that wraps a redis backend), this returns the
	// wrapped backend.
	Unwrap() Backend
}

type BatchOperation

type BatchOperation interface {
	Get(key string) GetResult
	Delete(key string) ErrorResult
	Set(key string, value interface{}) ErrorResult
	SMembers(key string) SMembersResult
	SAdd(key string, member interface{}, members ...interface{}) ErrorResult
	SRem(key string, member interface{}, members ...interface{}) ErrorResult
	ZAdd(key string, member interface{}, score float64) ErrorResult
	ZRem(key string, member interface{}) ErrorResult
	ZScore(key string, member interface{}) ZScoreResult
	ZHGet(key, field string) GetResult

	Exec() error
}

type ErrorResult

type ErrorResult interface {
	Result() error
}

type FallbackBatchOperation

type FallbackBatchOperation struct {
	Backend Backend
	// contains filtered or unexported fields
}

FallbackBatchOperation provides a suitable fallback for stores that don't supported optimized batching.

func (*FallbackBatchOperation) Delete

func (op *FallbackBatchOperation) Delete(key string) ErrorResult

func (*FallbackBatchOperation) Exec

func (op *FallbackBatchOperation) Exec() error

func (*FallbackBatchOperation) Get

func (*FallbackBatchOperation) SAdd

func (op *FallbackBatchOperation) SAdd(key string, member interface{}, members ...interface{}) ErrorResult

func (*FallbackBatchOperation) SMembers

func (op *FallbackBatchOperation) SMembers(key string) SMembersResult

func (*FallbackBatchOperation) SRem

func (op *FallbackBatchOperation) SRem(key string, member interface{}, members ...interface{}) ErrorResult

func (*FallbackBatchOperation) Set

func (op *FallbackBatchOperation) Set(key string, value interface{}) ErrorResult

func (*FallbackBatchOperation) ZAdd

func (op *FallbackBatchOperation) ZAdd(key string, member interface{}, score float64) ErrorResult

func (*FallbackBatchOperation) ZHGet

func (op *FallbackBatchOperation) ZHGet(key, field string) GetResult

func (*FallbackBatchOperation) ZRem

func (op *FallbackBatchOperation) ZRem(key string, member interface{}) ErrorResult

func (*FallbackBatchOperation) ZScore

func (op *FallbackBatchOperation) ZScore(key string, member interface{}) ZScoreResult

type GetResult

type GetResult interface {
	Result() (*string, error)
}

type KeyValue

type KeyValue struct {
	Key   string
	Value interface{}
}

type SMembersResult

type SMembersResult interface {
	Result() ([]string, error)
}

type ScoredMember

type ScoredMember struct {
	Score float64
	Value string
}

type ScoredMembers

type ScoredMembers []*ScoredMember

func (ScoredMembers) Values

func (m ScoredMembers) Values() []string

type ZScoreResult

type ZScoreResult interface {
	Result() (*float64, error)
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL