ddb

package module
v0.0.0-...-b6a7f6c Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 30, 2024 License: Apache-2.0 Imports: 21 Imported by: 1

README

GoDoc Go Report Card Coverage Status

ddb

ddb is a high level library for accessing DynamoDB.

QuickStart

type Example struct {
  PlayerID string `ddb:"hash"`
  Date     string `ddb:"range"`
}

// example walks through the basic table lifecycle: create the table,
// put a record, read it back by hash and range key, then drop the table.
func example() {
  var (
    ctx       = context.Background()
    s         = session.Must(session.NewSession(aws.NewConfig()))
    api       = dynamodb.New(s)
    tableName = "examples"
    model     = Example{}
    db        = ddb.New(api)
  )

  table := db.MustTable(tableName, model)
  err := table.CreateTableIfNotExists(ctx)
  // handle err ...

  record := Example{
    PlayerID: "abc",
    Date:     "2019-01-01",
  }
  err = table.Put(record).RunWithContext(ctx)
  // handle err ...

  // Get and Range accept interface{}, so the key values are passed
  // directly; the package exports no String helper.
  var got Example
  err = table.Get(record.PlayerID). // with hash key
    Range(record.Date).             // and range key
    ScanWithContext(ctx, &got)
  // handle err ...

  err = table.DeleteTableIfExists(ctx)
  // handle err ...
}

Models

ddb leverages the github.com/aws/aws-sdk-go package for encoding and decoding DynamoDB records to and from structs.

  • Use dynamodbav tag option for encoding information
  • Use ddb tag option to provide metadata about the table
  • Use ; to separate multiple tag options within a tag e.g. a;b;c
Hash Key

Use the hash tag to define the hash (i.e. partition) key.

type Example struct {
  ID string `ddb:"hash"`
}
Range Key

Use the range tag to define the range (i.e. sort) key.

type Example struct {
  ID   string `ddb:"hash"`
  Date string `ddb:"range"`
}
Local Secondary Indexes (LSI)

To set up local secondary indexes, use the following tags:

  • lsi_range:{index-name} define the range (e.g. sort) key of the LSI
  • lsi_range:{index-name},keys_only - same as above, but indicates the LSI should contain KEYS_ONLY
  • lsi:{index-name} include specific attribute within the LSI

In this example, we define a local secondary index with index name, blah, whose range key is Alt that includes Field1.

type Example struct {
  ID     string `ddb:"hash"`
  Date   string `ddb:"range"`
  Alt    string `ddb:"lsi_range:blah"`
  Field1 string `ddb:"lsi:blah"`
  Field2 string
}
Global Secondary Indexes (GSI)

To set up global secondary indexes, use the following tags:

  • gsi_hash:{index-name} define the hash (e.g. partition) key of the GSI
  • gsi_range:{index-name} define the range (e.g. sort) key of the GSI
  • gsi_range:{index-name},keys_only - same as above, but indicates the GSI should contain KEYS_ONLY
  • gsi:{index-name} include specific attribute within the GSI

In this example, we define a global secondary index with index name, blah, whose hash key is VerifiedAt and whose range key is ID.

type Example struct {
  ID         string `ddb:"hash;gsi_range:blah"`
  Date       string `ddb:"range"`
  VerifiedAt int64  `ddb:"gsi_hash:blah"`
}
Using dynamodbav to specify attribute values

This example illustrates using the dynamodbav tag in conjunction with the ddb tag to define the schema. Here the hash key of the table will be set to id (not ID).

type Example struct {
  ID string `ddb:"hash" dynamodbav:"id"`
}

Documentation

Index

Constants

View Source
// Package-level defaults for table billing and provisioned throughput.
// NOTE(review): presumably these apply when WithBillingMode,
// WithReadCapacity or WithWriteCapacity are not supplied — confirm
// against CreateTableIfNotExists.
const (
	// DefaultBillingMode is the dynamodb provisioned billing mode.
	DefaultBillingMode   = dynamodb.BillingModeProvisioned
	// DefaultReadCapacity is the default read capacity value (3).
	DefaultReadCapacity  = int64(3)
	// DefaultWriteCapacity is the default write capacity value (3).
	DefaultWriteCapacity = int64(3)
)
View Source
// String codes identifying categories of error. IsItemNotFoundError is
// documented as matching ErrItemNotFound.
// NOTE(review): the other Is...Error helpers presumably match their
// like-named code, and these are presumably the values returned by
// Error.Code() — confirm.
const (
	// ErrInvalidFieldName is the code "InvalidFieldName".
	ErrInvalidFieldName     = "InvalidFieldName"
	// ErrItemNotFound is the code "ItemNotFound".
	ErrItemNotFound         = "ItemNotFound"
	// ErrMismatchedValueCount is the code "MismatchedValueCount".
	ErrMismatchedValueCount = "MismatchedValueCount"
	// ErrUnableToMarshalItem is the code "UnableToMarshalItem".
	ErrUnableToMarshalItem  = "UnableToMarshalItem"
)

Variables

This section is empty.

Functions

func IsInvalidFieldNameError

func IsInvalidFieldNameError(err error) bool

func IsItemNotFoundError

func IsItemNotFoundError(err error) bool

IsItemNotFoundError returns true if any error in the cause chain contains the code, ErrItemNotFound

func IsMismatchedValueCountError

func IsMismatchedValueCountError(err error) bool

func TableName

func TableName(eventSourceARN string) (string, bool)

TableName returns the table name for a given record

Types

type Change

// Change represents the change performed on a dynamodb item, as carried
// in the "dynamodb" element of a stream Record.
type Change struct {
	// The approximate date and time when the stream record was created, in UNIX
	// epoch time (http://www.epochconverter.com/) format.
	ApproximateCreationDateTime EpochSeconds `json:"ApproximateCreationDateTime,omitempty"`

	// Keys holds the key attributes of the modified dynamodb item
	Keys map[string]*dynamodb.AttributeValue `json:"Keys,omitempty"`

	// NewImage holds dynamodb item AFTER modification
	NewImage map[string]*dynamodb.AttributeValue `json:"NewImage,omitempty"`

	// OldImage holds dynamodb item BEFORE modification
	OldImage map[string]*dynamodb.AttributeValue `json:"OldImage,omitempty"`

	// SequenceNumber of stream record
	SequenceNumber string `json:"SequenceNumber"`

	// SizeBytes contains size of record
	SizeBytes int64 `json:"SizeBytes"`

	// StreamViewType indicates what type of information is being held
	// https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_StreamSpecification.html
	StreamViewType string `json:"StreamViewType"`
}

Change represents the change performed

type ConsumedCapacity

type ConsumedCapacity struct {
	ReadUnits  int64
	WriteUnits int64
	// contains filtered or unexported fields
}

func (*ConsumedCapacity) CapacityUnits

func (c *ConsumedCapacity) CapacityUnits() float64

type DDB

type DDB struct {
	// contains filtered or unexported fields
}

func New

func New(api dynamodbiface.DynamoDBAPI) *DDB

func (*DDB) MustTable

func (d *DDB) MustTable(tableName string, model interface{}) *Table

func (*DDB) Table

func (d *DDB) Table(tableName string, model interface{}) (*Table, error)

func (*DDB) TransactGetItems

func (d *DDB) TransactGetItems(items ...GetTx) error

TransactGetItems allows TransactGetItems to be called without a context

func (*DDB) TransactGetItemsWithContext

func (d *DDB) TransactGetItemsWithContext(ctx context.Context, gets ...GetTx) (err error)

TransactGetItemsWithContext wraps the get operations using a TransactGetItems

func (*DDB) TransactWriteItems

func (d *DDB) TransactWriteItems(items ...WriteTx) (*dynamodb.TransactWriteItemsOutput, error)

func (*DDB) TransactWriteItemsWithContext

func (d *DDB) TransactWriteItemsWithContext(ctx context.Context, items ...WriteTx) (*dynamodb.TransactWriteItemsOutput, error)

TransactWriteItemsWithContext applies the provided operations in a dynamodb transaction. Subject to the limits of TransactWriteItems.

func (*DDB) WithTokenFunc

func (d *DDB) WithTokenFunc(fn func() string) *DDB

WithTokenFunc allows the generator func for dynamodb transactions to be overwritten

func (*DDB) WithTransactAttempts

func (d *DDB) WithTransactAttempts(n int) *DDB

WithTransactAttempts overrides the number of times to attempt a Transact before giving up. Defaults to 4

func (*DDB) WithTransactTimeout

func (d *DDB) WithTransactTimeout(fn func(i int) time.Duration) *DDB

WithTransactTimeout allows the timeout progression to be customized. By default uses exponential backoff e.g. attempt^2 * duration

type Delete

type Delete struct {
	// contains filtered or unexported fields
}

func (*Delete) Condition

func (d *Delete) Condition(expr string, values ...interface{}) *Delete

func (*Delete) ConsumedCapacity

func (d *Delete) ConsumedCapacity(capture *ConsumedCapacity) *Delete

ConsumedCapacity captures consumed capacity to the property provided

func (*Delete) DeleteItemInput

func (d *Delete) DeleteItemInput() (*dynamodb.DeleteItemInput, error)

func (*Delete) Range

func (d *Delete) Range(rangeKey interface{}) *Delete

func (*Delete) ReturnValuesOnConditionCheckFailure

func (d *Delete) ReturnValuesOnConditionCheckFailure(value string) *Delete

Use ReturnValuesOnConditionCheckFailure to get the item attributes if the Delete condition fails. For ReturnValuesOnConditionCheckFailure, the valid values are: NONE and ALL_OLD.

Only used by Tx()

func (*Delete) Run

func (d *Delete) Run() error

func (*Delete) RunWithContext

func (d *Delete) RunWithContext(ctx context.Context) error

func (*Delete) Tx

func (d *Delete) Tx() (*dynamodb.TransactWriteItem, error)

type EpochSeconds

type EpochSeconds int64

EpochSeconds expresses time in unix seconds

func (EpochSeconds) MarshalJSON

func (e EpochSeconds) MarshalJSON() ([]byte, error)

MarshalJSON implements json.Marshaler

func (EpochSeconds) Time

func (e EpochSeconds) Time() time.Time

Time returns time.Time

func (*EpochSeconds) UnmarshalJSON

func (e *EpochSeconds) UnmarshalJSON(data []byte) error

UnmarshalJSON implements json.Unmarshaler

type Error

// Error provides a unified error definition that includes a code and
// message along with an optional original error.
type Error interface {
	error
	// Cause returns the optional original error.
	Cause() error
	// Code returns the error code.
	// NOTE(review): presumably one of the Err... constants — confirm.
	Code() string
	// Keys returns the hash and range key attribute values associated
	// with the error.
	// NOTE(review): either value may presumably be nil — confirm.
	Keys() (hashKey, rangeKey *dynamodb.AttributeValue)
	// Message returns the error message.
	Message() string
	// TableName returns the name of the table associated with the error.
	TableName() string
}

Error provides a unified error definition that includes a code and message along with an optional original error.

type Event

type Event struct {
	// EventSourceARN holds the arn of the resource that generated the record
	EventSourceARN string `json:"eventSourceARN,omitempty"`

	// IsFinalInvokeForWindow - indicates if this is the last invocation for the tumbling window. This only occurs once per window period. [Tumbling Window]
	IsFinalInvokeForWindow bool `json:"isFinalInvokeForWindow,omitempty"`

	// IsWindowTerminatedEarly - a window ends early only if the state exceeds the maximum allowed size of 1 MB [Tumbling Window]
	IsWindowTerminatedEarly bool `json:"isWindowTerminatedEarly,omitempty"`

	// Records contains the modified records in order
	Records []Record `json:"Records"`

	// ShardId uniquely identifies the shard
	ShardId string `json:"shardId,omitempty"`

	// State holds optional tumbling window state [Tumbling Window]
	State json.RawMessage `json:"state,omitempty"`

	// Window holds the endpoints of this window [Tumbling Window]
	Window *Window `json:"window,omitempty"`
}

Event record emitted by dynamodb streams.

Motivation: While github.com/aws/aws-lambda-go is a fantastic library for working with lambda in Go, the dynamodb type defined in the Record cannot be unmarshaled by github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute

type Get

type Get struct {
	// contains filtered or unexported fields
}

func (*Get) ConsistentRead

func (g *Get) ConsistentRead(enabled bool) *Get

func (*Get) ConsumedCapacity

func (g *Get) ConsumedCapacity(capture *ConsumedCapacity) *Get

ConsumedCapacity captures consumed capacity to the property provided

func (*Get) GetItemInput

func (g *Get) GetItemInput() (*dynamodb.GetItemInput, error)

func (*Get) Range

func (g *Get) Range(value interface{}) *Get

func (*Get) Scan

func (g *Get) Scan(v interface{}) error

func (*Get) ScanTx

func (g *Get) ScanTx(v interface{}) GetTx

func (*Get) ScanWithContext

func (g *Get) ScanWithContext(ctx context.Context, v interface{}) error

type GetTx

type GetTx interface {
	// Decode the response from AWS
	Decode(v *dynamodb.ItemResponse) error
	// Tx generates the get input
	Tx() (*dynamodb.TransactGetItem, error)
}

GetTx encapsulates a transactional get operation

type Int64Set

type Int64Set []int64

Int64Set represents an array expressed as a set (rather than as a List, which would be the default).

func (Int64Set) MarshalDynamoDBAttributeValue

func (ii Int64Set) MarshalDynamoDBAttributeValue(item *dynamodb.AttributeValue) error

MarshalDynamoDBAttributeValue implements Marshaler

func (*Int64Set) UnmarshalDynamoDBAttributeValue

func (ii *Int64Set) UnmarshalDynamoDBAttributeValue(item *dynamodb.AttributeValue) error

UnmarshalDynamoDBAttributeValue implements Unmarshaler

type Item

type Item interface {
	// Raw returns the raw value of the element
	Raw() map[string]*dynamodb.AttributeValue

	// Unmarshal the record into the provided interface
	Unmarshal(v interface{}) error
}

Item provides handle to each record that can be unmarshalled

type Put

type Put struct {
	// contains filtered or unexported fields
}

func (*Put) Condition

func (p *Put) Condition(expr string, values ...interface{}) *Put

func (*Put) ConsumedCapacity

func (p *Put) ConsumedCapacity(capture *ConsumedCapacity) *Put

ConsumedCapacity captures consumed capacity to the property provided

func (*Put) PutItemInput

func (p *Put) PutItemInput() (*dynamodb.PutItemInput, error)

func (*Put) ReturnValuesOnConditionCheckFailure

func (p *Put) ReturnValuesOnConditionCheckFailure(value string) *Put

func (*Put) Run

func (p *Put) Run() error

func (*Put) RunWithContext

func (p *Put) RunWithContext(ctx context.Context) error

func (*Put) Tx

func (p *Put) Tx() (*dynamodb.TransactWriteItem, error)

type Query

type Query struct {
	// contains filtered or unexported fields
}

func (*Query) ConsistentRead

func (q *Query) ConsistentRead(enabled bool) *Query

func (*Query) ConsumedCapacity

func (q *Query) ConsumedCapacity(capture *ConsumedCapacity) *Query

ConsumedCapacity captures consumed capacity to the property provided

func (*Query) Each

func (q *Query) Each(fn func(item Item) (bool, error)) error

func (*Query) EachWithContext

func (q *Query) EachWithContext(ctx context.Context, fn func(item Item) (bool, error)) (err error)

func (*Query) Filter

func (q *Query) Filter(expr string, values ...interface{}) *Query

Filter allows for the query to be conditionally filtered

func (*Query) FindAll

func (q *Query) FindAll(v interface{}) error

FindAll returns all records

func (*Query) FindAllWithContext

func (q *Query) FindAllWithContext(ctx context.Context, v interface{}) error

FindAllWithContext returns all records using the context provided

func (*Query) First

func (q *Query) First(v interface{}) error

First binds the first value and returns

func (*Query) FirstWithContext

func (q *Query) FirstWithContext(ctx context.Context, v interface{}) error

FirstWithContext binds the first value and returns

func (*Query) IndexName

func (q *Query) IndexName(indexName string) *Query

func (*Query) KeyCondition

func (q *Query) KeyCondition(expr string, values ...interface{}) *Query

func (*Query) LastEvaluatedKey

func (q *Query) LastEvaluatedKey(lastEvaluatedKey *map[string]*dynamodb.AttributeValue) *Query

LastEvaluatedKey stores the last evaluated key into the provided value

func (*Query) LastEvaluatedToken

func (q *Query) LastEvaluatedToken(lastEvaluatedToken *string) *Query

LastEvaluatedToken stores the last evaluated key as a base64 encoded string

func (*Query) Limit

func (q *Query) Limit(limit int64) *Query

Limit returns at most N elements; 0 indicates return all elements

func (*Query) QueryInput

func (q *Query) QueryInput() (*dynamodb.QueryInput, error)

QueryInput returns the raw dynamodb QueryInput that will be submitted

func (*Query) ScanIndexForward

func (q *Query) ScanIndexForward(enabled bool) *Query

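ScanIndexForward controls the sort key ordering of results: per the DynamoDB Query API, true (the default) returns items in ascending sort key order and false returns them in reverse order. https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_Query.html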

func (*Query) Select

func (q *Query) Select(s string) *Query

Select attributes to return; defaults to dynamodb.SelectAllAttributes

func (*Query) StartKey

func (q *Query) StartKey(startKey map[string]*dynamodb.AttributeValue) *Query

StartKey assigns the continuation key used for query pagination

func (*Query) StartToken

func (q *Query) StartToken(token string) *Query

StartToken encodes start key as a base64 encoded string

type Record

type Record struct {
	// AWSRegion update occurred within
	AWSRegion string `json:"awsRegion"`
	// Change holds the modification to the dynamodb record
	Change Change `json:"dynamodb"`
	// EventID holds a unique identifier for event
	EventID string `json:"eventID"`
	// EventName will be one of INSERT, MODIFY, or REMOVE
	// https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_Record.html
	EventName string `json:"eventName"`
	// EventSource for record.  Will generally be aws:dynamodb
	EventSource string `json:"eventSource"`
	// EventSourceARN holds the arn of the resource that generated the record
	EventSourceARN string `json:"eventSourceARN"`
	// EventVersion number of the stream format
	EventVersion string `json:"eventVersion"`
}

Record holds the metadata for the dynamodb change

type Scan

type Scan struct {
	// contains filtered or unexported fields
}

Scan encapsulates a scan request

func (*Scan) ConsistentRead

func (s *Scan) ConsistentRead(enabled bool) *Scan

ConsistentRead enables or disables consistent reading

func (*Scan) ConsumedCapacity

func (s *Scan) ConsumedCapacity(capture *ConsumedCapacity) *Scan

ConsumedCapacity captures consumed capacity to the property provided

func (*Scan) Debug

func (s *Scan) Debug(w io.Writer) *Scan

Debug dynamodb request

func (*Scan) Each

func (s *Scan) Each(callback func(item Item) (bool, error)) error

Each is identical to EachWithContext except that it does not allow for cancellation via the context.

func (*Scan) EachWithContext

func (s *Scan) EachWithContext(ctx context.Context, callback func(item Item) (bool, error)) error

EachWithContext invokes the callback for each record that matches the scan. So long as the callback returns `true, nil`, the scan will continue. If the callback either returns an error OR false, the scan will stop. The scan will also stop if the context has been canceled.

func (*Scan) Filter

func (s *Scan) Filter(expr string, values ...interface{}) *Scan

Filter allows for the scan record to be conditionally filtered

func (*Scan) First

func (s *Scan) First(v interface{}) error

First returns the first scanned record

func (*Scan) FirstWithContext

func (s *Scan) FirstWithContext(ctx context.Context, v interface{}) error

FirstWithContext returns the first scanned record and allows for cancellation

func (*Scan) IndexName

func (s *Scan) IndexName(indexName string) *Scan

IndexName to scan for

func (*Scan) TotalSegments

func (s *Scan) TotalSegments(n int64) *Scan

TotalSegments allows for the Scan operation to run in parallel. If not set, defaults to 1 segment

type StringSet

type StringSet []string

StringSet represents an array expressed as a set (rather than as a List, which would be the default).

func (StringSet) Contains

func (ss StringSet) Contains(want string) bool

Contains returns true if want is contained in the StringSet

func (StringSet) ContainsRegexp

func (ss StringSet) ContainsRegexp(re *regexp.Regexp) bool

ContainsRegexp returns true if the regexp matches any element of the StringSet

func (StringSet) MarshalDynamoDBAttributeValue

func (ss StringSet) MarshalDynamoDBAttributeValue(item *dynamodb.AttributeValue) error

MarshalDynamoDBAttributeValue implements Marshaler

func (StringSet) StringSlice

func (ss StringSet) StringSlice() []string

StringSlice returns StringSet as []string

func (StringSet) Sub

func (ss StringSet) Sub(that StringSet) StringSet

Sub returns a new StringSet that contains the original StringSet minus the elements contained in the provided StringSet

func (*StringSet) UnmarshalDynamoDBAttributeValue

func (ss *StringSet) UnmarshalDynamoDBAttributeValue(item *dynamodb.AttributeValue) error

UnmarshalDynamoDBAttributeValue implements Unmarshaler

type Table

type Table struct {
	// contains filtered or unexported fields
}

func (*Table) ConsumedCapacity

func (t *Table) ConsumedCapacity() ConsumedCapacity

func (*Table) CreateTableIfNotExists

func (t *Table) CreateTableIfNotExists(ctx context.Context, opts ...TableOption) error

func (*Table) DDB

func (t *Table) DDB() *DDB

func (*Table) Delete

func (t *Table) Delete(hashKey interface{}) *Delete

func (*Table) DeleteTableIfExists

func (t *Table) DeleteTableIfExists(ctx context.Context) error

func (*Table) Get

func (t *Table) Get(hashKey interface{}) *Get

func (*Table) Put

func (t *Table) Put(v interface{}) *Put

func (*Table) Query

func (t *Table) Query(expr string, values ...interface{}) *Query

func (*Table) Scan

func (t *Table) Scan() *Scan

Scan initiates the scan operation

func (*Table) Update

func (t *Table) Update(hashKey interface{}) *Update

type TableIndexOption

type TableIndexOption interface {
	TableOption
}

func WithReadCapacity

func WithReadCapacity(rcap int64) TableIndexOption

func WithWriteCapacity

func WithWriteCapacity(wcap int64) TableIndexOption

type TableOption

type TableOption interface {
	ApplyTable(o *tableOptions)
}

func WithBillingMode

func WithBillingMode(mode string) TableOption

func WithStreamSpecification

func WithStreamSpecification(streamViewType string) TableOption

type Update

type Update struct {
	// contains filtered or unexported fields
}

Update encapsulates the UpdateItem action

func (*Update) Add

func (u *Update) Add(expr string, values ...interface{}) *Update

Add updates a number or a set

func (*Update) Condition

func (u *Update) Condition(expr string, values ...interface{}) *Update

Condition applies a condition to the update. When called multiple times, the conditions will be and-ed with each other.

func (*Update) ConsumedCapacity

func (u *Update) ConsumedCapacity(capture *ConsumedCapacity) *Update

func (*Update) Delete

func (u *Update) Delete(expr string, values ...interface{}) *Update

Delete deletes elements from a set

func (*Update) NewValues

func (u *Update) NewValues(v interface{}) *Update

func (*Update) OldValues

func (u *Update) OldValues(v interface{}) *Update

OldValues captures the old values into the provided value

func (*Update) Range

func (u *Update) Range(rangeKey interface{}) *Update

Range specifies the optional range key for the update

func (*Update) Remove

func (u *Update) Remove(expr string, values ...interface{}) *Update

Remove an attribute from an Item

func (*Update) ReturnValuesOnConditionCheckFailure

func (u *Update) ReturnValuesOnConditionCheckFailure(value string) *Update

func (*Update) Run

func (u *Update) Run() error

func (*Update) RunWithContext

func (u *Update) RunWithContext(ctx context.Context) error

RunWithContext invokes the update command using the provided context

func (*Update) Set

func (u *Update) Set(expr string, values ...interface{}) *Update

func (*Update) Tx

func (u *Update) Tx() (*dynamodb.TransactWriteItem, error)

Tx returns *dynamodb.TransactWriteItem suitable for use in a transaction

func (*Update) UpdateItemInput

func (u *Update) UpdateItemInput() (*dynamodb.UpdateItemInput, error)

type Window

// Window refers to the tumbling window; it holds the window's two endpoints.
// https://aws.amazon.com/blogs/compute/using-aws-lambda-for-streaming-analytics/
type Window struct {
	// Start is the starting endpoint of the window.
	// NOTE(review): presumably a timestamp string — confirm format.
	Start string `json:"start,omitempty"`
	// End is the ending endpoint of the window.
	// NOTE(review): presumably a timestamp string — confirm format.
	End   string `json:"end,omitempty"`
}

Window refers to the tumbling window https://aws.amazon.com/blogs/compute/using-aws-lambda-for-streaming-analytics/

type WriteTx

type WriteTx interface {
	Tx() (*dynamodb.TransactWriteItem, error)
}

WriteTx converts ddb operations into instances of *dynamodb.TransactWriteItem

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL