stream

package module
v2.0.0-...-a2e8ab8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 12, 2023 License: MIT Imports: 12 Imported by: 0

README

go-dynamodb-stream-subscriber

Usage

Go channels for streaming DynamoDB updates

package main

import (
	"context"
	"fmt"
	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/dynamodb"
	"github.com/aws/aws-sdk-go-v2/service/dynamodbstreams"
	"github.com/aws/aws-sdk-go-v2/service/dynamodbstreams/types"
	"time"
)

// main demonstrates the package: it subscribes to the DynamoDB stream of a
// table, prints every record and error received on the returned channels for
// 100 seconds, and then shuts the subscriber down.
func main() {
	region := "ap-northeast-2"
	table := "test"

	// A single loaded configuration is shared by both AWS clients below.
	cfg, err := config.LoadDefaultConfig(
		context.Background(),
		config.WithRegion(region),
	)
	if err != nil {
		panic(err)
	}

	// NewSubscriber takes the DynamoDB client first, then the streams client,
	// then the table name.
	subscriber := stream.NewSubscriber(
		dynamodb.NewFromConfig(cfg),
		dynamodbstreams.NewFromConfig(cfg),
		table,
	)
	recordCh, errCh := subscriber.Subscribe()

	// Drain the record channel until it is closed.
	go func() {
		for record := range recordCh {
			fmt.Println("from record channel: ", record)
		}
	}()

	// Drain the error channel until it is closed.
	go func() {
		for err := range errCh {
			fmt.Println("from error channel: ", err)
		}
	}()

	// Let the subscription run for a fixed demo period before stopping it.
	time.Sleep(100 * time.Second)

	subscriber.Shutdown()
}

Deployment

When using this in an actual deployment, be aware that AWS throttles reads on each shard. If you have a large deployment with multiple callers reading from the same shard, AWS may well throttle the calls, resulting in ProvisionedThroughputExceededException and triggering a back-off.

The solution (before actually doing this) may be to have a 1:1 mapping between application and shard, which guarantees that the limit is not hit.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type DynamoService

// DynamoService is the DynamoDB dependency that NewSubscriber accepts as its
// dynamoSvc argument. Any value that provides this DescribeTable method
// satisfies it.
type DynamoService interface {
	// DescribeTable takes a context, a *dynamodb.DescribeTableInput and optional
	// client option functions, and returns a *dynamodb.DescribeTableOutput or an
	// error.
	DescribeTable(ctx context.Context, params *dynamodb.DescribeTableInput, optFns ...func(*dynamodb.Options)) (*dynamodb.DescribeTableOutput, error)
}

type ShardSequence

// ShardSequence pairs a shard ID with a sequence number. Values of this type
// are passed to Subscriber.SetShardSequences and returned by
// Subscriber.ShardSequences.
// NOTE(review): presumably marks a position within a stream shard — confirm
// against the implementation.
type ShardSequence struct {
	// ShardId is the shard identifier.
	ShardId        string
	// SequenceNumber is the sequence number associated with ShardId.
	SequenceNumber string
}

type Subscriber

// Subscriber is created with NewSubscriber and configured through its Set*
// methods. Subscribe returns a record channel and an error channel, and
// Shutdown is the counterpart used to stop the subscriber.
type Subscriber struct {
	// contains filtered or unexported fields
}

func NewSubscriber

func NewSubscriber(
	dynamoSvc DynamoService,
	streamSvc StreamService,
	table string,
) *Subscriber

func (*Subscriber) SetErrorBufferSize

func (r *Subscriber) SetErrorBufferSize(bufferSize int32)

func (*Subscriber) SetMaximumRecords

func (r *Subscriber) SetMaximumRecords(maximumRecords int32)

func (*Subscriber) SetRecordBufferSize

func (r *Subscriber) SetRecordBufferSize(bufferSize int32)

func (*Subscriber) SetShardIteratorInitialInterval

func (r *Subscriber) SetShardIteratorInitialInterval(shardIteratorInitialInterval time.Duration)

func (*Subscriber) SetShardIteratorMaxInterval

func (r *Subscriber) SetShardIteratorMaxInterval(shardIteratorMaxInterval time.Duration)

func (*Subscriber) SetShardIteratorType

func (r *Subscriber) SetShardIteratorType(shardIteratorType types.ShardIteratorType)

func (*Subscriber) SetShardProcessQueueSize

func (r *Subscriber) SetShardProcessQueueSize(shardProcessQueueSize int)

func (*Subscriber) SetShardProcessWorkers

func (r *Subscriber) SetShardProcessWorkers(shardProcessWorkers int)

func (*Subscriber) SetShardSequenceIteratorType

func (r *Subscriber) SetShardSequenceIteratorType(shardSequenceIteratorType types.ShardIteratorType)

func (*Subscriber) SetShardSequences

func (r *Subscriber) SetShardSequences(shardSequences []*ShardSequence)

func (*Subscriber) SetShardUpdateInterval

func (r *Subscriber) SetShardUpdateInterval(shardUpdateInterval time.Duration)

func (*Subscriber) ShardSequences

func (r *Subscriber) ShardSequences() []*ShardSequence

func (*Subscriber) Shutdown

func (r *Subscriber) Shutdown()

func (*Subscriber) Subscribe

func (r *Subscriber) Subscribe() (<-chan *types.Record, <-chan error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL