drhlib

v0.0.0-...-832dae8
Published: Feb 8, 2021 License: Apache-2.0 Imports: 15 Imported by: 0

README

drhlib

This repo is the Golang implementation of a data replication and migration library.

Getting Started

Prerequisites

Prepare the environment variables with the job information, as shown below:

export JOB_TABLE_NAME=test-table
export SQS_QUEUE_NAME=test-queue
export SRC_BUCKET_NAME=test-src
export SRC_BUCKET_PREFIX=
export SRC_REGION=us-west-2
export DEST_BUCKET_NAME=test-dest
export DEST_BUCKET_PREFIX=
export DEST_REGION=us-west-1
export STORAGE_CLASS=STANDARD
...
export SOURCE_TYPE=Amazon_S3
export MULTIPART_THRESHOLD=10
export CHUNK_SIZE=5
export MAX_THREADS=10

Then run go get

go get github.com/daixba/drhlib

Start Finder

Finder lists the objects in the source and the target, compares them to compute the delta, and sends the delta to the SQS queue.

For example:

package main

import (
	"context"
	"github.com/daixba/drhlib"
)

func main() {
	ctx := context.TODO()
	f := drhlib.InitFinder(ctx)
	f.CompareAndSend()
}

Start Migrator

Migrator receives messages from the SQS queue and replicates each object from the source to the target (get from source, put to target).

For example:

import (
	"context"
	"github.com/daixba/drhlib"
)

func main() {
	ctx := context.TODO()
	// ... To be updated.
}

Documentation

Constants

const (

	// MaxKeys is the maximum number of keys returned per listing request, default is 1000
	MaxKeys int32 = 1000

	// MultipartThreshold is the size threshold (in bytes) that determines whether multipart upload is used.
	// When the object size is greater than or equal to MultipartThreshold, multipart upload is used.
	MultipartThreshold int = 10 * 1024 * 1024

	// ChunkSize is the chunk size (in Bytes) for each part when using multipart upload
	ChunkSize int = 5 * 1024 * 1024

	// MaxParts is the maximum number of parts (10000) for a multipart upload
	MaxParts int = 10000
)
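
To illustrate how these constants interact, here is a minimal sketch of planning an upload. The constants are copied from the block above; `useMultipart` and `planParts` are hypothetical helpers, and growing the chunk size when the default would exceed `MaxParts` is an assumed strategy, not confirmed library behavior. The sketch also assumes a 64-bit `int`.

```go
package main

import "fmt"

// Constants copied from the package documentation.
const (
	MultipartThreshold int = 10 * 1024 * 1024
	ChunkSize          int = 5 * 1024 * 1024
	MaxParts           int = 10000
)

// useMultipart reports whether an object of the given size (in bytes)
// meets the multipart threshold.
func useMultipart(size int) bool {
	return size >= MultipartThreshold
}

// planParts returns a chunk size and the resulting number of parts.
// It starts from ChunkSize and enlarges it only when the object would
// otherwise need more than MaxParts parts (an assumed strategy).
func planParts(size int) (chunkSize, parts int) {
	chunkSize = ChunkSize
	if size > chunkSize*MaxParts {
		chunkSize = (size + MaxParts - 1) / MaxParts // ceiling division
	}
	parts = (size + chunkSize - 1) / chunkSize // ceiling division
	return chunkSize, parts
}

func main() {
	sizes := []int{1 * 1024 * 1024, 10 * 1024 * 1024, 12*1024*1024 + 1}
	for _, size := range sizes {
		cs, n := planParts(size)
		fmt.Printf("size=%d multipart=%t chunk=%d parts=%d\n", size, useMultipart(size), cs, n)
	}
}
```

With the defaults, a 10 MiB object is split into 2 parts, and an object of 12 MiB plus one byte into 3 parts, the last of which is smaller than the others.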

Variables

DefaultConfig is the default config used for replication

Functions

This section is empty.

Types

type Client

type Client interface {
	// GET
	ListObjects(continuationToken *string) ([]JobInfo, error)
	HeadObject(key string)
	GetObject(key string, size, start, chunkSize int, version string) ([]byte, error)

	// PUT
	PutObject(key string, body []byte, storageClass string) string
	CreateMultipartUpload(key string)
	CompleteMultipartUpload(key string)
	UploadPart(key string)
	ListParts(key string)
	ListMultipartUploads(key string)
}

Client is an interface used to communicate with cloud storage services
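
Because Client is an interface, it can be satisfied by a test double. The sketch below copies the interface and `JobInfo` from this page and adds a hypothetical in-memory `memClient`. Several behaviors are assumptions: the continuation token is ignored, the multipart methods are no-ops, and since the meaning of the string returned by `PutObject` is not documented, this fake simply returns the key.

```go
package main

import (
	"fmt"
	"sort"
)

// JobInfo and Client are copied from the package documentation.
type JobInfo struct {
	Key  string
	Size int64
}

type Client interface {
	ListObjects(continuationToken *string) ([]JobInfo, error)
	HeadObject(key string)
	GetObject(key string, size, start, chunkSize int, version string) ([]byte, error)
	PutObject(key string, body []byte, storageClass string) string
	CreateMultipartUpload(key string)
	CompleteMultipartUpload(key string)
	UploadPart(key string)
	ListParts(key string)
	ListMultipartUploads(key string)
}

// memClient is a hypothetical in-memory Client intended for tests only.
type memClient struct{ objects map[string][]byte }

// Compile-time check that memClient satisfies Client.
var _ Client = (*memClient)(nil)

// ListObjects returns every stored object sorted by key; the token is ignored.
func (c *memClient) ListObjects(_ *string) ([]JobInfo, error) {
	out := make([]JobInfo, 0, len(c.objects))
	for k, v := range c.objects {
		out = append(out, JobInfo{Key: k, Size: int64(len(v))})
	}
	sort.Slice(out, func(i, j int) bool { return out[i].Key < out[j].Key })
	return out, nil
}

// GetObject returns up to chunkSize bytes of the object beginning at start.
func (c *memClient) GetObject(key string, _, start, chunkSize int, _ string) ([]byte, error) {
	body, ok := c.objects[key]
	if !ok {
		return nil, fmt.Errorf("no such key: %s", key)
	}
	if start < 0 || start > len(body) || chunkSize < 0 {
		return nil, fmt.Errorf("invalid range: start=%d chunkSize=%d", start, chunkSize)
	}
	end := start + chunkSize
	if end > len(body) {
		end = len(body)
	}
	return body[start:end], nil
}

// PutObject stores a copy of body and returns the key (placeholder return value).
func (c *memClient) PutObject(key string, body []byte, _ string) string {
	c.objects[key] = append([]byte(nil), body...)
	return key
}

// The remaining methods are no-ops in this fake.
func (c *memClient) HeadObject(key string)              {}
func (c *memClient) CreateMultipartUpload(key string)   {}
func (c *memClient) CompleteMultipartUpload(key string) {}
func (c *memClient) UploadPart(key string)              {}
func (c *memClient) ListParts(key string)               {}
func (c *memClient) ListMultipartUploads(key string)    {}

func main() {
	var c Client = &memClient{objects: map[string][]byte{}}
	c.PutObject("a.txt", []byte("hello world"), "STANDARD")
	objs, _ := c.ListObjects(nil)
	chunk, _ := c.GetObject("a.txt", 11, 6, 5, "")
	fmt.Println(objs, string(chunk))
}
```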

type DBService

type DBService struct {
	// contains filtered or unexported fields
}

DBService is a wrapper service used to interact with Amazon DynamoDB

func NewDBService

func NewDBService(ctx context.Context, tableName string) (*DBService, error)

NewDBService is a ...

type Finder

type Finder struct {
	// contains filtered or unexported fields
}

Finder struct

func InitFinder

func InitFinder(ctx context.Context) (f *Finder)

InitFinder creates a new finder

func (*Finder) CompareAndSend

func (f *Finder) CompareAndSend()

CompareAndSend is the main execution function. It compares the source and the target to get the list of delta objects, and then sends the delta to the SQS queue.
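
The comparison step can be sketched as below. This is an illustration only: `JobInfo` is copied from this page, `delta` is a hypothetical helper, and the rule it applies (an object belongs to the delta when its key is missing from the target or its size differs) is an assumption about what "delta" means here.

```go
package main

import "fmt"

// JobInfo is copied from the package documentation.
type JobInfo struct {
	Key  string
	Size int64
}

// delta returns the source objects that are missing from the target or
// whose size differs from the target copy. The comparison rule is an
// assumption, not confirmed library behavior.
func delta(source, target []JobInfo) []JobInfo {
	// Index the target by key for constant-time lookups.
	sizes := make(map[string]int64, len(target))
	for _, t := range target {
		sizes[t.Key] = t.Size
	}
	var out []JobInfo
	for _, s := range source {
		if size, ok := sizes[s.Key]; !ok || size != s.Size {
			out = append(out, s)
		}
	}
	return out
}

func main() {
	source := []JobInfo{{Key: "a.txt", Size: 10}, {Key: "b.txt", Size: 20}, {Key: "c.txt", Size: 30}}
	target := []JobInfo{{Key: "a.txt", Size: 10}, {Key: "b.txt", Size: 25}}
	for _, j := range delta(source, target) {
		fmt.Println(j.Key, j.Size)
	}
}
```

Here `b.txt` is reported because its size differs, and `c.txt` because it is absent from the target.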

type Item

type Item struct {
	ObjectKey, StorageClass, DesBucket, DesKey, Status, Version string
	Size                                                        int32
	StartTime, EndTime                                          int
	ExtraInfo                                                   Metadata
}

Item holds info about the items to be stored in DynamoDB

type JobConfig

type JobConfig struct {
	MaxKeys                                 int32
	ChunkSize, MultipartThreshold, MaxParts int
}

JobConfig is extra config

type JobInfo

type JobInfo struct {
	Key  string
	Size int64
}

JobInfo represents an object to be migrated.

type Message

type Message struct {
	// contains filtered or unexported fields
}

Message is the default message format

type Metadata

type Metadata struct {
	ContentType string
}

Metadata info of object

type Migrator

type Migrator struct {
	// contains filtered or unexported fields
}

Migrator is a struct ..

func (*Migrator) StartMigration

func (m *Migrator) StartMigration()

StartMigration is a function to

type S3Client

type S3Client struct {
	// contains filtered or unexported fields
}

S3Client is an implementation of Client interface for Amazon S3

func NewS3Client

func NewS3Client(ctx context.Context, bucket, prefix, region string) *S3Client

NewS3Client creates an S3Client instance

func (*S3Client) CompleteMultipartUpload

func (c *S3Client) CompleteMultipartUpload(key string)

CompleteMultipartUpload is

func (*S3Client) CreateMultipartUpload

func (c *S3Client) CreateMultipartUpload(key string)

CreateMultipartUpload is

func (*S3Client) GetObject

func (c *S3Client) GetObject(key string, size, start, chunkSize int, version string) ([]byte, error)

GetObject is a function to get (download) object from Amazon S3
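
The `size`, `start`, and `chunkSize` parameters suggest that an object can be fetched one chunk at a time. As a sketch of how such a chunk maps onto an HTTP `Range` header (the mechanism S3 uses for partial reads), the hypothetical `byteRange` helper below clamps the last chunk to the object size. Whether the library builds its ranges this way is an assumption.

```go
package main

import "fmt"

// byteRange returns the HTTP Range header value for the chunk that starts
// at offset start and spans at most chunkSize bytes of an object with the
// given size. It is a hypothetical helper, not part of the library.
func byteRange(size, start, chunkSize int) (string, error) {
	if chunkSize <= 0 || start < 0 || start >= size {
		return "", fmt.Errorf("invalid range: size=%d start=%d chunkSize=%d", size, start, chunkSize)
	}
	end := start + chunkSize
	if end > size {
		end = size // the last chunk may be shorter
	}
	// HTTP byte ranges are inclusive on both ends, hence end-1.
	return fmt.Sprintf("bytes=%d-%d", start, end-1), nil
}

func main() {
	size, chunk := 12, 5
	for start := 0; start < size; start += chunk {
		r, err := byteRange(size, start, chunk)
		if err != nil {
			fmt.Println("error:", err)
			return
		}
		fmt.Println(r)
	}
}
```

For a 12-byte object read in 5-byte chunks, this yields `bytes=0-4`, `bytes=5-9`, and `bytes=10-11`.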

func (*S3Client) HeadObject

func (c *S3Client) HeadObject(key string)

HeadObject is a function to get extra metadata from Amazon S3

func (*S3Client) ListMultipartUploads

func (c *S3Client) ListMultipartUploads(key string)

ListMultipartUploads is

func (*S3Client) ListObjects

func (c *S3Client) ListObjects(continuationToken *string) ([]JobInfo, error)

ListObjects is a function to list objects from Amazon S3

func (*S3Client) ListParts

func (c *S3Client) ListParts(key string)

ListParts is

func (*S3Client) PutObject

func (c *S3Client) PutObject(key string, body []byte, storageClass string) string

PutObject is a function to put (upload) an object to Amazon S3

func (*S3Client) UploadPart

func (c *S3Client) UploadPart(key string)

UploadPart is

type SqsService

type SqsService struct {
	// contains filtered or unexported fields
}

SqsService is a wrapper service used to interact with Amazon SQS

func NewSqsService

func NewSqsService(ctx context.Context, queueName string) (*SqsService, error)

NewSqsService is a helper function used to create a SqsService instance

func (*SqsService) DeleteMessage

func (ss *SqsService) DeleteMessage() (ok bool)

DeleteMessage deletes a message from the queue. It returns true if the message is deleted successfully.

func (*SqsService) IsQueueEmpty

func (ss *SqsService) IsQueueEmpty() bool

IsQueueEmpty is a function to check if the Queue is empty or not

func (*SqsService) ReceiveMessages

func (ss *SqsService) ReceiveMessages()

ReceiveMessages receives multiple messages from the queue in a batch

func (*SqsService) SendMessage

func (ss *SqsService) SendMessage(msg Message)

SendMessage sends one message at a time to the queue

func (*SqsService) SendMessageInBatch

func (ss *SqsService) SendMessageInBatch(msgs []Message)

SendMessageInBatch sends messages to the queue in batches. Each batch can contain up to 10 messages.
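
The 10-message limit means a longer list has to be split before sending. A minimal sketch of that splitting follows; `splitBatches` is a hypothetical helper, and plain strings stand in for `Message` because its fields are unexported.

```go
package main

import "fmt"

// maxBatchSize is the per-batch message limit stated above.
const maxBatchSize = 10

// splitBatches splits msgs into consecutive slices of at most size elements.
// The returned slices share the backing array of msgs.
func splitBatches(msgs []string, size int) [][]string {
	if size <= 0 {
		return nil
	}
	var out [][]string
	for len(msgs) > 0 {
		n := size
		if len(msgs) < n {
			n = len(msgs)
		}
		out = append(out, msgs[:n])
		msgs = msgs[n:]
	}
	return out
}

func main() {
	// Build 23 placeholder messages.
	msgs := make([]string, 23)
	for i := range msgs {
		msgs[i] = fmt.Sprintf("msg-%d", i)
	}
	for i, b := range splitBatches(msgs, maxBatchSize) {
		fmt.Printf("batch %d: %d messages\n", i, len(b))
	}
}
```

With 23 messages this produces three batches of 10, 10, and 3 messages.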
