dstransfer

v0.1.2
Published: Mar 7, 2019 License: Apache-2.0 Imports: 14 Imported by: 0

README

dstransfer - simple SQL-based ETL transfer across datastores

Motivation

Traditionally, transferring data between data sources from different vendors involved exporting the data to a text file (e.g. CSV) and then importing it into the destination database. While this may work for most scenarios, representing null values and converting incompatible data types such as DATE/TIMESTAMP can be challenging.

This project provides a simple SQL-based alternative that addresses these concerns. On top of that, it copies data between arbitrary databases/datastores (RDBMS/NoSQL) in a way that is optimized for both memory use and writes. Memory use is reduced by using compacted slices instead of a generic slice of maps, while writes are sped up with batch inserts and concurrent writers.
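The two optimizations described above can be illustrated with a minimal, stdlib-only sketch. It is not the project's implementation: the names compactRows, splitBatches and writeConcurrently are hypothetical, and the write callback merely stands in for a batched INSERT against the destination.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// compactRows (hypothetical) stores the column names once and each record as
// a positional slice, instead of one map[string]interface{} per record.
type compactRows struct {
	columns []string
	rows    [][]interface{}
}

// splitBatches cuts rows into chunks of at most batchSize records.
func splitBatches(rows [][]interface{}, batchSize int) [][][]interface{} {
	if batchSize <= 0 {
		batchSize = 1
	}
	var batches [][][]interface{}
	for start := 0; start < len(rows); start += batchSize {
		end := start + batchSize
		if end > len(rows) {
			end = len(rows)
		}
		batches = append(batches, rows[start:end])
	}
	return batches
}

// writeConcurrently fans batches out to writerThreads goroutines. It returns
// the number of records written successfully and the first error, if any.
func writeConcurrently(data compactRows, batchSize, writerThreads int,
	write func(columns []string, batch [][]interface{}) error) (uint64, error) {
	if writerThreads <= 0 {
		writerThreads = 1
	}
	batches := make(chan [][]interface{})
	var written uint64
	var firstErr error
	var errOnce sync.Once
	var wg sync.WaitGroup
	for i := 0; i < writerThreads; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for batch := range batches {
				if err := write(data.columns, batch); err != nil {
					errOnce.Do(func() { firstErr = err })
					continue // keep draining so the producer never blocks
				}
				atomic.AddUint64(&written, uint64(len(batch)))
			}
		}()
	}
	for _, batch := range splitBatches(data.rows, batchSize) {
		batches <- batch
	}
	close(batches)
	wg.Wait()
	return written, firstErr
}

func main() {
	data := compactRows{columns: []string{"id", "name"}}
	for i := 0; i < 1000; i++ {
		data.rows = append(data.rows, []interface{}{i, fmt.Sprintf("name-%d", i)})
	}
	// The callback is a placeholder for a real batched INSERT.
	written, err := writeConcurrently(data, 256, 4,
		func(columns []string, batch [][]interface{}) error { return nil })
	fmt.Println(written, err)
}
```

The batch size and writer count used in main mirror the BatchSize and WriterThreads values from the sample request in the Usage section.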

Installation

With docker
  • Building app
cd /tmp/ && git clone https://github.com/adrianwit/dstransfer
cd dstransfer
docker build -t adrianwit/dstransfer:0.1.0 .
  • Starting app
cd /tmp/dstransfer/config/ && docker-compose up -d
Standalone
  • Building app

Prerequisites: go 1.11+

go install github.com/adrianwit/dstransfer/dstransfer
  • Starting app
export GOPATH=~/go
$GOPATH/bin/dstransfer -port=8080

Usage

 curl  --header "Content-type: text/json" -d "@transfer.json" -X POST http://localhost:8080/v1/api/transfer
 
 curl http://127.0.0.1:8080/v1/api/tasks
 
 while :; do clear; curl http://127.0.0.1:8080/v1/api/tasks; sleep 2; done

@transfer.json

{

  "BatchSize": 256,
  "WriterThreads": 4,
  "Mode": "insert",

  "Source": {
    "Credentials": "source_mysql",
    "Descriptor": "[username]:[password]@tcp(xxxxx:3306)/[dbname]?parseTime=true",
    "DriverName": "mysql",
    "Parameters": {
      "dbname": "db1"
    },
    "Query": "SELECT * FROM source_table"
  },


  "Dest": {
    "Credentials": "bq",
    "DriverName": "bigquery",
    "Parameters": {
      "datasetId": "db2"
    },
    "Table": "target_table"
  }

}

Credentials

Credentials are stored in ~/.secret/CREDENTIAL_NAME.json using the toolbox/cred/config.go format.

For example:

@source_mysql

{"Username":"root","Password":"dev"}

To generate encrypted credentials, download and install the latest endly and run the following:

endly -c=source_mysql

For BigQuery, use a service account key in JSON format as the credentials.

Supported datastores:

Already imported drivers:

  • mysql
  • postgresql
  • aerospike
  • bigquery
  • mongodb
  • cassandra
  • dynamodb
  • firebase
  • firestore

Supported but not imported drivers (CGO dependency)

  • oracle
  • vertica (via odbc)

Transfer mode

  • insert: uses only INSERT INTO statements (suitable for appending data)
  • persist: determines which records need to be updated or inserted (slower option)
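The difference between the two modes can be shown with a small in-memory model. This is only a conceptual sketch: the record type and both functions are hypothetical, and matching on an ID field is an assumption, since how the tool identifies existing records is not described here.

```go
package main

import "fmt"

// record (hypothetical) is a row identified by ID.
type record struct {
	ID   int
	Name string
}

// insertAll appends every incoming record, as an INSERT-only transfer would.
func insertAll(table, incoming []record) []record {
	return append(table, incoming...)
}

// persistAll overwrites a row whose ID already exists and appends it
// otherwise. The extra lookup is why this mode costs more than plain inserts.
func persistAll(table, incoming []record) []record {
	index := make(map[int]int, len(table))
	for i, row := range table {
		index[row.ID] = i
	}
	for _, row := range incoming {
		if i, ok := index[row.ID]; ok {
			table[i] = row
			continue
		}
		index[row.ID] = len(table)
		table = append(table, row)
	}
	return table
}

func main() {
	existing := []record{{1, "a"}, {2, "b"}}
	incoming := []record{{2, "B"}, {3, "c"}}
	// Each call works on its own copy of the existing rows.
	fmt.Println(len(insertAll(append([]record(nil), existing...), incoming)))  // 4
	fmt.Println(len(persistAll(append([]record(nil), existing...), incoming))) // 3
}
```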

Documentation

Constants

const (
	TransferModeInsert = "insert"
)

Variables

This section is empty.

Functions

func NewRouter

func NewRouter(dummyService *Service) http.Handler

Types

type Dest

type Dest struct {
	*dsc.Config
	Table string
}

Dest represents a transfer destination

func (*Dest) Validate

func (s *Dest) Validate() error

Validate validates the destination

type Router

type Router struct {
	*http.ServeMux
	// contains filtered or unexported fields
}

type Server

type Server struct {
	*http.Server
	// contains filtered or unexported fields
}

func NewServer

func NewServer(service *Service, port int) *Server

NewServer creates a new server

func (*Server) Stop

func (s *Server) Stop()

func (*Server) StopOnSiginals

func (s *Server) StopOnSiginals(siginals ...os.Signal)

type Service

type Service struct {
	// contains filtered or unexported fields
}

func New

func New(interactive bool, callback func(task *TransferTask)) *Service

func (*Service) Task

func (s *Service) Task(id int, writer http.ResponseWriter) *TransferTask

func (*Service) Tasks

func (s *Service) Tasks() *TasksResponse

func (*Service) Transfer

func (s *Service) Transfer(request *TransferRequest) *TransferResponse

type Source

type Source struct {
	*dsc.Config
	Query string
}

Source represents a transfer source

func (*Source) Validate

func (s *Source) Validate() error

Validate validates source

type Tasks

type Tasks []*TransferTask

func (Tasks) Len

func (a Tasks) Len() int

func (Tasks) Less

func (a Tasks) Less(i, j int) bool

func (Tasks) Swap

func (a Tasks) Swap(i, j int)

type TasksResponse

type TasksResponse struct {
	Tasks Tasks
}

TasksResponse represents a tasks response

type TransferRequest

type TransferRequest struct {
	Source        *Source
	Dest          *Dest
	BatchSize     int
	WriterThreads int    `description:"number of writer go routines"`
	Mode          string `description:"supported values: insert or persist"`
	OmitEmpty     bool   `description:"if set set null for any 0 or empty values"`
}

TransferRequest represents transfer request

func (*TransferRequest) Init

func (r *TransferRequest) Init() error

func (*TransferRequest) Validate

func (r *TransferRequest) Validate() error

Validate validates request

type TransferResponse

type TransferResponse struct {
	TaskId int
	Status string
	Error  string
}

TransferResponse represents a transfer response

func (*TransferResponse) SetError

func (r *TransferResponse) SetError(err error)

type TransferTask

type TransferTask struct {
	ID        int
	Request   *TransferRequest
	StartTime time.Time
	EndTime   *time.Time
	Error     string
	Status    string

	ReadCount   int
	WriteCount  uint64
	TimeTakenMs int
	// contains filtered or unexported fields
}

TransferTask represents a transfer task

func NewTransferTask

func NewTransferTask(request *TransferRequest) (*TransferTask, error)

func (*TransferTask) CanEvict

func (t *TransferTask) CanEvict() bool

func (*TransferTask) HasError

func (t *TransferTask) HasError() bool

HasError returns true if an error occurred

func (*TransferTask) IsReading

func (t *TransferTask) IsReading() bool

IsReading returns true if the transfer is reading data from the source

func (*TransferTask) SetError

func (t *TransferTask) SetError(err error)
