transformer

Version: v0.0.0-...-201dfa5
Published: May 1, 2024 License: Apache-2.0 Imports: 10 Imported by: 1

README

Transformer application: a simple data copy from one datastore to another (e.g. Aerospike to MySQL).

This application was built to provide an end-to-end testing example of an ETL application.

The end-to-end tests provide use cases for testing Aerospike to MySQL backup with and without transformation.

Prerequisites:

Enable SSH login for your user on your machine (on macOS: System Preferences / Sharing / Remote Login)

Install docker service

Download endly

Provide a username and password to log in to your machine.

endly -c=localhost

Verify that the secret file was created

cat ~/.secret/localhost.json

Create the 'mysql' secret credentials; provide root as the username and a non-empty password for the docker mysqladmin

endly -c=mysql

Run reporter webservice workflow

Run the following commands:

git clone https://github.com/viant/endly
cd endly/example/etl/transformer/e2e/

Run the test with the inline workflow:

endly -r=run

To list the workflow tasks:

endly  -t='?'

Troubleshooting

To check your Aerospike sets, run:

docker exec -it mydb4 aql SHOW sets;

To check your MySQL tables, run:

docker exec -it mydb3 mysql show tables;

Documentation

Constants

const (
	// StatusTaskNotRunning represents a terminated task
	StatusTaskNotRunning = iota
	// StatusTaskRunning represents an active copy task
	StatusTaskRunning
)
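
Because the constants are declared with iota, StatusTaskNotRunning evaluates to 0 and StatusTaskRunning to 1. The sketch below mirrors the declaration locally to show how a status code might be mapped to a label; describe is a hypothetical helper and is not part of the package.

```go
package main

import "fmt"

// Local mirror of the documented constants; real code would reference
// them through the transformer package instead.
const (
	// StatusTaskNotRunning represents a terminated task.
	StatusTaskNotRunning = iota
	// StatusTaskRunning represents an active copy task.
	StatusTaskRunning
)

// describe is a hypothetical helper (an assumption, not a package API)
// that turns a status code into a human-readable label.
func describe(code int32) string {
	switch code {
	case StatusTaskRunning:
		return "running"
	case StatusTaskNotRunning:
		return "not running"
	default:
		return "unknown"
	}
}

func main() {
	fmt.Println(StatusTaskNotRunning, StatusTaskRunning) // prints "0 1"
	fmt.Println(describe(StatusTaskRunning))             // prints "running"
}
```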

Variables

var Transformers = make(map[string]Transformer)

Transformers represents transformer registry
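
A registry of this shape is typically filled at start-up and consulted by name, presumably the name carried in CopyRequest.Transformer. The following is a minimal sketch under that assumption: the Transformer type and Transformers map are mirrored locally so the example is self-contained, while upperCaseKeys, lookup, and the "UpperCaseKeys" key are hypothetical names introduced only for illustration.

```go
package main

import (
	"fmt"
	"strings"
)

// Transformer and Transformers mirror the documented declarations.
type Transformer func(source map[string]interface{}) ([]map[string]interface{}, error)

var Transformers = make(map[string]Transformer)

// upperCaseKeys is a hypothetical transformer: it returns a single
// record whose keys are the upper-cased keys of the source record.
func upperCaseKeys(source map[string]interface{}) ([]map[string]interface{}, error) {
	record := make(map[string]interface{}, len(source))
	for key, value := range source {
		record[strings.ToUpper(key)] = value
	}
	return []map[string]interface{}{record}, nil
}

// lookup is a hypothetical helper that resolves a transformer by name
// and reports an error for unregistered names.
func lookup(name string) (Transformer, error) {
	transformer, ok := Transformers[name]
	if !ok {
		return nil, fmt.Errorf("unknown transformer: %q", name)
	}
	return transformer, nil
}

func main() {
	// Register under an assumed name, then resolve and apply it.
	Transformers["UpperCaseKeys"] = upperCaseKeys
	transformer, err := lookup("UpperCaseKeys")
	if err != nil {
		panic(err)
	}
	records, err := transformer(map[string]interface{}{"id": 1})
	fmt.Println(records, err)
}
```

The map is not guarded by a mutex, so in this sketch registration is assumed to finish before any concurrent lookups begin.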

Functions

func Flatten

func Flatten(source map[string]interface{}) ([]map[string]interface{}, error)

Flatten converts map to slice

Types

type BaseResponse

type BaseResponse struct {
	Status    string
	Error     string
	StartTime time.Time
	EndTime   time.Time
}

BaseResponse represents a base response

type Config

type Config struct {
	Port string
}

Config represents transformer config

type CopyRequest

type CopyRequest struct {
	BatchSize   int
	InsertMode  bool
	Source      *DatasetResource
	Destination *DatasetResource
	Transformer string
}

CopyRequest represents a copy request

type CopyResponse

type CopyResponse struct {
	*BaseResponse
	*TaskInfo
}

CopyResponse represents a copy response
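
CopyResponse embeds *BaseResponse and *TaskInfo as pointers, so their fields are promoted (response.Status, response.RecordCount), but both pointers have to be allocated before any promoted field is touched; otherwise the access dereferences nil and panics. A minimal sketch with locally mirrored types follows; newCopyResponse and the "ok" status value are assumptions for illustration, not confirmed package behaviour.

```go
package main

import (
	"fmt"
	"time"
)

// Local mirrors of the documented types.
type BaseResponse struct {
	Status    string
	Error     string
	StartTime time.Time
	EndTime   time.Time
}

type TaskInfo struct {
	StatusCode         int32
	SkippedRecordCount int
	EmptyRecordCount   int
	RecordCount        int
}

type CopyResponse struct {
	*BaseResponse
	*TaskInfo
}

// newCopyResponse is a hypothetical constructor that allocates both
// embedded structs so promoted fields are safe to read and write.
func newCopyResponse() *CopyResponse {
	return &CopyResponse{
		BaseResponse: &BaseResponse{Status: "ok", StartTime: time.Now()},
		TaskInfo:     &TaskInfo{},
	}
}

func main() {
	response := newCopyResponse()
	response.RecordCount += 10    // promoted from *TaskInfo
	response.EndTime = time.Now() // promoted from *BaseResponse
	fmt.Println(response.Status, response.RecordCount) // prints "ok 10"
}
```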

type DatasetResource

type DatasetResource struct {
	DsConfig  *dsc.Config
	Table     string
	PkColumns []string
	Columns   []string
	SQL       string
}

DatasetResource represents a datastore resource

func (*DatasetResource) AsTableDescription

func (r *DatasetResource) AsTableDescription() *dsc.TableDescriptor

AsTableDescription converts the dataset resource to a table descriptor

type KillTaskRequest

type KillTaskRequest struct {
	ID string
}

KillTaskRequest represents a kill task request

type KillTaskResponse

type KillTaskResponse struct {
	*BaseResponse
	Task *Task
}

KillTaskResponse represents kill task response

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server represents http server

func NewServer

func NewServer(config *Config, service Service) (*Server, error)

NewServer creates a new http server

func (*Server) Start

func (s *Server) Start()

Start starts the server

type Service

type Service interface {
	Copy(request *CopyRequest) *CopyResponse

	TaskList(request *TaskListRequest) *TaskListResponse

	KillTask(request *KillTaskRequest) *KillTaskResponse
}

Service represents transformer service

func NewService

func NewService() Service

NewService returns a new transformer service
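
Since Service is an interface, code that depends on it can be exercised against an in-memory stand-in rather than real datastores. The sketch below is one way to do that, not the package's implementation: the types are trimmed local mirrors that keep only the fields used here, and fakeService, the "ok"/"error" status strings, and the task ID format are all assumptions.

```go
package main

import "fmt"

// Trimmed local mirrors of the documented types (only fields used below).
type BaseResponse struct{ Status, Error string }
type TaskInfo struct{ RecordCount int }
type DatasetResource struct{ Table string }
type CopyRequest struct {
	Source      *DatasetResource
	Destination *DatasetResource
	Transformer string
}
type CopyResponse struct {
	*BaseResponse
	*TaskInfo
}
type Task struct{ ID, Table string }
type TaskListRequest struct{ Table string }
type TaskListResponse struct {
	Status string
	Tasks  []*Task
}
type KillTaskRequest struct{ ID string }
type KillTaskResponse struct {
	*BaseResponse
	Task *Task
}

// Service mirrors the documented interface.
type Service interface {
	Copy(request *CopyRequest) *CopyResponse
	TaskList(request *TaskListRequest) *TaskListResponse
	KillTask(request *KillTaskRequest) *KillTaskResponse
}

// fakeService is a hypothetical in-memory Service; it records a task
// per copy request and copies no data.
type fakeService struct{ tasks []*Task }

func (s *fakeService) Copy(request *CopyRequest) *CopyResponse {
	response := &CopyResponse{BaseResponse: &BaseResponse{Status: "ok"}, TaskInfo: &TaskInfo{}}
	if request.Source == nil || request.Destination == nil {
		response.Status = "error"
		response.Error = "source and destination are required"
		return response
	}
	id := fmt.Sprintf("task-%d", len(s.tasks)+1)
	s.tasks = append(s.tasks, &Task{ID: id, Table: request.Destination.Table})
	return response
}

func (s *fakeService) TaskList(request *TaskListRequest) *TaskListResponse {
	response := &TaskListResponse{Status: "ok"}
	for _, task := range s.tasks {
		// An empty table filter matches every task.
		if request.Table == "" || task.Table == request.Table {
			response.Tasks = append(response.Tasks, task)
		}
	}
	return response
}

func (s *fakeService) KillTask(request *KillTaskRequest) *KillTaskResponse {
	response := &KillTaskResponse{BaseResponse: &BaseResponse{Status: "ok"}}
	for _, task := range s.tasks {
		if task.ID == request.ID {
			response.Task = task
			return response
		}
	}
	response.Status = "error"
	response.Error = "task not found: " + request.ID
	return response
}

func main() {
	var service Service = &fakeService{}
	service.Copy(&CopyRequest{
		Source:      &DatasetResource{Table: "users"},
		Destination: &DatasetResource{Table: "users_backup"},
	})
	list := service.TaskList(&TaskListRequest{Table: "users_backup"})
	fmt.Println(list.Status, len(list.Tasks)) // prints "ok 1"
}
```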

type Task

type Task struct {
	ID         string
	Status     string
	StatusCode int32
	Table      string
	Request    interface{}
	*BaseResponse
	*TaskInfo
}

Task represents a task

func (*Task) Expired

func (t *Task) Expired(currentTime time.Time) bool

Expired returns true if task expired

type TaskInfo

type TaskInfo struct {
	StatusCode         int32
	SkippedRecordCount int
	EmptyRecordCount   int
	RecordCount        int
}

TaskInfo represents processed record info

type TaskListRequest

type TaskListRequest struct {
	Table string
}

TaskListRequest represents a task list request

type TaskListResponse

type TaskListResponse struct {
	Status string
	Tasks  []*Task
}

TaskListResponse represents task list response

type Transformer

type Transformer func(source map[string]interface{}) ([]map[string]interface{}, error)

Transformer represents transformer function
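
The signature returns a slice, so a transformer can emit zero, one, or several destination records for a single source record. As a sketch of that one-to-many case, the hypothetical splitTags function below emits one record per element of an assumed "tags" field; the field names "id", "tags", and "tag" are illustrative only.

```go
package main

import "fmt"

// Transformer mirrors the documented type.
type Transformer func(source map[string]interface{}) ([]map[string]interface{}, error)

// splitTags is a hypothetical transformer that emits one output record
// per element of the source record's "tags" field.
func splitTags(source map[string]interface{}) ([]map[string]interface{}, error) {
	rawTags, ok := source["tags"]
	if !ok {
		return nil, nil // no tags: nothing to emit
	}
	tags, ok := rawTags.([]interface{})
	if !ok {
		return nil, fmt.Errorf("tags: expected []interface{}, got %T", rawTags)
	}
	records := make([]map[string]interface{}, 0, len(tags))
	for _, tag := range tags {
		records = append(records, map[string]interface{}{
			"id":  source["id"],
			"tag": tag,
		})
	}
	return records, nil
}

// Compile-time check that splitTags satisfies the Transformer signature.
var _ Transformer = splitTags

func main() {
	records, err := splitTags(map[string]interface{}{
		"id":   1,
		"tags": []interface{}{"a", "b"},
	})
	fmt.Println(len(records), err) // prints "2 <nil>"
}
```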
