worker

package
v1.9.19 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 9, 2024 License: Apache-2.0 Imports: 15 Imported by: 3

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Predefined data source types. Each entry pairs a type name and a file
// suffix with the constructor (New) that builds the matching DataSource.
var (
	// DataSourceTypeSql is the "sql" type; files use the "sql" suffix.
	DataSourceTypeSql = &DataSourceType{
		Name:       "sql",
		FileSuffix: "sql",
		New:        NewDataSourceSql,
	}
	// DataSourceTypeExcel is the "excel" type; files use the "xlsx" suffix.
	DataSourceTypeExcel = &DataSourceType{
		Name:       "excel",
		FileSuffix: "xlsx",
		New:        NewDataSourceExcel,
	}
	// DataSourceTypeText is the "text" type; files use the "txt" suffix.
	DataSourceTypeText = &DataSourceType{
		Name:       "text",
		FileSuffix: "txt",
		New:        NewDataSourceText,
	}
	// DataSourceTypeCsv is the "csv" type; files use the "csv" suffix.
	DataSourceTypeCsv = &DataSourceType{
		Name:       "csv",
		FileSuffix: "csv",
		New:        NewDataSourceCsv,
	}

	// DataSourceTypeList holds every predefined data source type declared
	// above, in declaration order.
	DataSourceTypeList = []*DataSourceType{
		DataSourceTypeSql,
		DataSourceTypeExcel,
		DataSourceTypeText,
		DataSourceTypeCsv,
	}
)

Functions

func ClearTask added in v1.2.7

func ClearTask(taskId string)

func ColumnsSelect

func ColumnsSelect(db *sql.DB, dia dialect.Dialect, param *dialect.ParamModel, ownerName string, tableName string, ignoreError bool) (list []*dialect.ColumnModel, err error)

func DoExec added in v1.0.3

func DoExec(db *sql.DB, sqlInfo string, args []interface{}) (result sql.Result, err error)

func DoExecs added in v1.0.5

func DoExecs(db *sql.DB, sqlList []string, argsList [][]interface{}) (resultList []sql.Result, errSql string, errArgs []interface{}, err error)

func DoOwnerExecs added in v1.7.2

func DoOwnerExecs(dia dialect.Dialect, db *sql.DB, ownerName string, sqlList []string, argsList [][]interface{}) (resultList []sql.Result, errSql string, errArgs []interface{}, err error)

func DoQuery

func DoQuery(db *sql.DB, sqlInfo string, args []interface{}) (list []map[string]interface{}, err error)

func DoQueryCount added in v1.0.8

func DoQueryCount(db *sql.DB, sqlInfo string, args []interface{}) (count int, err error)

func DoQueryOne added in v1.0.9

func DoQueryOne(db *sql.DB, sqlInfo string, args []interface{}) (data map[string]interface{}, err error)

func DoQueryPage added in v1.0.9

func DoQueryPage(db *sql.DB, dia dialect.Dialect, sqlInfo string, args []interface{}, page *Page) (list []map[string]interface{}, err error)

func DoQueryPageStructs added in v1.0.9

func DoQueryPageStructs(db *sql.DB, dia dialect.Dialect, sqlInfo string, args []interface{}, page *Page, list interface{}) (err error)

func DoQueryStruct added in v1.0.9

func DoQueryStruct(db *sql.DB, sqlInfo string, args []interface{}, str interface{}) (find bool, err error)

func DoQueryStructs added in v1.0.9

func DoQueryStructs(db *sql.DB, sqlInfo string, args []interface{}, list interface{}) (err error)

func DoQueryWithColumnTypes added in v1.0.4

func DoQueryWithColumnTypes(db *sql.DB, sqlInfo string, args []interface{}) (columns []string, columnTypes []*sql.ColumnType, list []map[string]interface{}, err error)

func ExecByPrepare added in v1.9.16

func ExecByPrepare(prepare prepareFunc, ctx context.Context, sqlInfo string, sqlArgs ...interface{}) (result sql.Result, err error)

func GetColumnNames added in v1.0.3

func GetColumnNames(columnList []*dialect.ColumnModel) (columnNames []string)

func GetListStructType added in v1.0.9

func GetListStructType(list interface{}) reflect.Type

func GetSqlValue

func GetSqlValue(columnType *sql.ColumnType, data interface{}) (value interface{})

func GetSqlValueCache

func GetSqlValueCache(columnTypes []*sql.ColumnType) (cache []interface{})

func GetTime added in v1.0.3

func GetTime(time time.Time) int64

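GetTime 获取指定时间的时间戳 (returns the timestamp of the given time argument, not of the current time; use NowTime for the current timestamp)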

func IndexesSelect

func IndexesSelect(db *sql.DB, dia dialect.Dialect, param *dialect.ParamModel, ownerName string, tableName string, ignoreError bool) (list []*dialect.IndexModel, err error)

func NewTaskExec added in v1.0.9

func NewTaskExec(db *sql.DB, dia dialect.Dialect, newDb func(ownerName string) (db *sql.DB, err error), taskExecParam *TaskExecParam) (res *taskExec)

func NewTaskExport added in v1.0.3

func NewTaskExport(db *sql.DB, dia dialect.Dialect, targetDialect dialect.Dialect, taskExportParam *TaskExportParam) (res *taskExport)

func NewTaskImport added in v1.0.3

func NewTaskImport(db *sql.DB, dia dialect.Dialect, newDb func(owner *TaskImportOwner) (db *sql.DB, err error), taskImportParam *TaskImportParam) (res *taskImport)

func NewTaskSync added in v1.0.3

func NewTaskSync(sourceDB *sql.DB, sourceDialect dialect.Dialect, targetDb *sql.DB, targetDialect dialect.Dialect, newDb func(owner *TaskSyncOwner) (db *sql.DB, err error), taskSyncParam *TaskSyncParam) (res *taskSync)

func Now added in v1.0.3

func Now() time.Time

Now 获取当前时间

func NowTime added in v1.0.3

func NowTime() int64

NowTime 获取当前时间戳

func OwnerCover added in v1.0.3

func OwnerCover(db *sql.DB, dia dialect.Dialect, param *dialect.ParamModel, owner *dialect.OwnerModel) (success bool, err error)

OwnerCover 库或表所属者 覆盖,如果 库或表所属者 已经存在,则删除后 再创建

func OwnerCreate added in v1.0.3

func OwnerCreate(db *sql.DB, dia dialect.Dialect, param *dialect.ParamModel, owner *dialect.OwnerModel) (created bool, err error)

func OwnerDelete added in v1.0.3

func OwnerDelete(db *sql.DB, dia dialect.Dialect, param *dialect.ParamModel, ownerName string) (deleted bool, err error)

func OwnerSelect added in v1.0.3

func OwnerSelect(db *sql.DB, dia dialect.Dialect, param *dialect.ParamModel, ownerName string) (one *dialect.OwnerModel, err error)

func OwnersSelect added in v1.0.3

func OwnersSelect(db *sql.DB, dia dialect.Dialect, param *dialect.ParamModel) (list []*dialect.OwnerModel, err error)

func PathExists added in v1.0.3

func PathExists(path string) (bool, error)

func PathIsDir added in v1.2.6

func PathIsDir(path string) (bool, error)

func PrimaryKeysSelect

func PrimaryKeysSelect(db *sql.DB, dia dialect.Dialect, param *dialect.ParamModel, ownerName string, tableName string, ignoreError bool) (list []*dialect.PrimaryKeyModel, err error)

func SetStructColumnValues added in v1.0.9

func SetStructColumnValues(columnValueMap map[string]interface{}, strValue reflect.Value)

func SplitArrayMap added in v1.0.3

func SplitArrayMap(arr []map[string]interface{}, num int) [][]map[string]interface{}

SplitArrayMap 分割数组,根据传入的数组和分割大小,将数组分割为大小等于指定大小的多个数组,如果不够分,则最后一个数组元素小于其他数组

func StopTask added in v1.0.3

func StopTask(taskId string)

func TableCover added in v1.0.3

func TableCover(db *sql.DB, dia dialect.Dialect, param *dialect.ParamModel, ownerName string, table *dialect.TableModel) (err error)

TableCover 表 覆盖,如果 表 已经存在,则删除后 再创建

func TableCreate added in v1.0.3

func TableCreate(db *sql.DB, dia dialect.Dialect, param *dialect.ParamModel, ownerName string, tableDetail *dialect.TableModel) (err error)

func TableDelete added in v1.0.3

func TableDelete(db *sql.DB, dia dialect.Dialect, param *dialect.ParamModel, ownerName string, tableName string) (err error)

func TableDetail

func TableDetail(db *sql.DB, dia dialect.Dialect, param *dialect.ParamModel, ownerName string, tableName string, ignoreError bool) (table *dialect.TableModel, err error)

func TableSelect added in v1.0.3

func TableSelect(db *sql.DB, dia dialect.Dialect, param *dialect.ParamModel, ownerName string, tableName string, ignoreError bool) (one *dialect.TableModel, err error)

func TableUpdate added in v1.0.3

func TableUpdate(db *sql.DB, oldDia dialect.Dialect, oldTableDetail *dialect.TableModel, newDia dialect.Dialect, newTableDetail *dialect.TableModel) (err error)

func TablesDetail added in v1.1.8

func TablesDetail(db *sql.DB, dia dialect.Dialect, param *dialect.ParamModel, ownerName string, ignoreError bool) (list []*dialect.TableModel, err error)

func TablesSelect

func TablesSelect(db *sql.DB, dia dialect.Dialect, param *dialect.ParamModel, ownerName string) (list []*dialect.TableModel, err error)

Types

type DataSource added in v1.0.3

// DataSource is the interface returned by the NewDataSourceSql,
// NewDataSourceExcel, NewDataSourceText and NewDataSourceCsv constructors.
// It groups a read side (ReadStart / Read / ReadEnd) and a write side
// (WriteStart / WriteHeader / Write / WriteEnd), plus Stop.
//
// NOTE(review): the required call order (e.g. ReadStart before Read) is
// implied by the method names only — confirm against the implementations.
type DataSource interface {
	// Stop presumably requests that an in-progress read or write be
	// aborted — TODO confirm.
	Stop()
	ReadStart() (err error)
	// Read receives the column list and an onRead callback that is handed
	// *DataSourceData values; an error may be returned by either.
	Read(columnList []*dialect.ColumnModel, onRead func(data *DataSourceData) (err error)) (err error)
	ReadEnd() (err error)
	WriteStart() (err error)
	// Write accepts one DataSourceData value.
	Write(data *DataSourceData) (err error)
	WriteEnd() (err error)
	// WriteHeader receives the column list for the data being written.
	WriteHeader(columnList []*dialect.ColumnModel) (err error)
}

func NewDataSourceCsv added in v1.0.3

func NewDataSourceCsv(param *DataSourceParam) (res DataSource)

func NewDataSourceExcel added in v1.0.3

func NewDataSourceExcel(param *DataSourceParam) (res DataSource)

func NewDataSourceSql added in v1.0.3

func NewDataSourceSql(param *DataSourceParam) (res DataSource)

func NewDataSourceText added in v1.0.3

func NewDataSourceText(param *DataSourceParam) (res DataSource)

type DataSourceData added in v1.0.3

// DataSourceData is the unit passed to DataSource.Write and to the onRead
// callback of DataSource.Read. It can carry a SQL string, a data map, or
// both, together with a column list.
type DataSourceData struct {
	// HasSql presumably reports whether Sql is populated — TODO confirm.
	HasSql     bool
	Sql        string
	// HasData presumably reports whether Data is populated — TODO confirm.
	HasData    bool
	// Data maps string keys to values; presumably keyed by column name —
	// verify against callers.
	Data       map[string]interface{}
	ColumnList []*dialect.ColumnModel
}

type DataSourceParam added in v1.0.3

// DataSourceParam is the argument accepted by every NewDataSource*
// constructor. It carries no JSON tags. Its methods GetCsvSeparator,
// GetTextSeparator and GetLinefeed each return a string.
//
// NOTE(review): which fields apply to which data source type is inferred
// from field names only (SheetIndex/SheetName look Excel-specific,
// Separator/Linefeed look text/CSV-specific) — confirm.
type DataSourceParam struct {
	Path       string
	Separator  string
	SheetIndex int
	StartRow   int
	SheetName  string
	Linefeed   string
	TitleList  []string
	Dia        dialect.Dialect
}

func (*DataSourceParam) GetCsvSeparator added in v1.0.3

func (this_ *DataSourceParam) GetCsvSeparator() string

func (*DataSourceParam) GetLinefeed added in v1.0.3

func (this_ *DataSourceParam) GetLinefeed() string

func (*DataSourceParam) GetTextSeparator added in v1.0.3

func (this_ *DataSourceParam) GetTextSeparator() string

type DataSourceType added in v1.0.3

// DataSourceType describes one kind of data source: its name, the file
// suffix it uses, and the constructor that builds a DataSource from a
// DataSourceParam.
//
// Name and FileSuffix are serialized as "name" and "fileSuffix". New is
// excluded from JSON (`json:"-"`) because encoding/json cannot encode func
// values: without the tag, json.Marshal fails for this type and for any
// struct that holds a non-nil *DataSourceType.
type DataSourceType struct {
	Name       string                                               `json:"name"`
	New        func(param *DataSourceParam) (dataSource DataSource) `json:"-"`
	FileSuffix string                                               `json:"fileSuffix"`
}

func GetDataSource added in v1.0.3

func GetDataSource(dataSourceType string) (res *DataSourceType)

type Page added in v1.0.9

// Page carries paging information for DoQueryPage and DoQueryPageStructs.
// All four fields are serialized to JSON under camelCase keys.
//
// NOTE(review): whether PageNo is 0- or 1-based, and whether TotalCount and
// TotalPage are filled in by the query helpers, is not visible here —
// confirm.
type Page struct {
	PageSize   int `json:"pageSize"`
	PageNo     int `json:"pageNo"`
	TotalCount int `json:"totalCount"`
	TotalPage  int `json:"totalPage"`
}

func NewPage added in v1.0.9

func NewPage() *Page

type Task added in v1.0.3

// Task holds the identity, timing, status flags and progress counters of a
// task. It is looked up with GetTask(taskId) and run with its Start method.
// Fields with json tags are serialized under the camelCase keys shown.
type Task struct {
	TaskId     string `json:"taskId"`
	// StartTime, EndTime and UseTime are int64 values; presumably
	// timestamps and an elapsed duration in the unit NowTime uses — TODO
	// confirm.
	StartTime  int64  `json:"startTime"`
	EndTime    int64  `json:"endTime"`
	UseTime    int64  `json:"useTime"`
	Error      string `json:"error"`
	PanicError string `json:"panicError"`
	IsEnd      bool   `json:"isEnd"`
	IsStop     bool   `json:"isStop"`

	// Owner-level counters.
	OwnerCount        int `json:"ownerCount"`
	OwnerSuccessCount int `json:"ownerSuccessCount"`
	OwnerErrorCount   int `json:"ownerErrorCount"`

	// Table-level counters.
	TableCount        int `json:"tableCount"`
	TableSuccessCount int `json:"tableSuccessCount"`
	TableErrorCount   int `json:"tableErrorCount"`

	// Data-level counters.
	DataCount        int `json:"dataCount"`
	DataReadyCount   int `json:"dataReadyCount"`
	DataSuccessCount int `json:"dataSuccessCount"`
	DataErrorCount   int `json:"dataErrorCount"`

	Extend map[string]interface{} `json:"extend"`
	Errors []string               `json:"errors"`

	// Param has no json tag, so encoding/json uses the field name "Param"
	// as its key.
	Param *dialect.ParamModel
	// contains filtered or unexported fields
}

func GetTask added in v1.0.3

func GetTask(taskId string) (task *Task)

func (*Task) Start added in v1.0.3

func (this_ *Task) Start() (err error)

type TaskExecOwner added in v1.0.9

// TaskExecOwner names one owner and lists the tables to operate on for it;
// it is the element type of TaskExecParam.Owners. Both fields are omitted
// from JSON when empty.
type TaskExecOwner struct {
	Name   string           `json:"name,omitempty"`
	Tables []*TaskExecTable `json:"tables,omitempty"`
}

type TaskExecParam added in v1.0.9

// TaskExecParam is the parameter object passed to NewTaskExec.
//
// Owners, BatchNumber and ContinueIsError are serialized to JSON under the
// keys shown. OnProgress is a callback and is excluded from JSON
// (`json:"-"`): encoding/json cannot encode func values, so without the tag
// json.Marshal of this struct always fails. This matches the OnProgress
// fields of TaskExportParam and TaskSyncParam.
type TaskExecParam struct {
	Owners []*TaskExecOwner `json:"owners"`

	// BatchNumber is presumably the number of rows handled per batch —
	// TODO confirm.
	BatchNumber     int  `json:"batchNumber"`
	ContinueIsError bool `json:"continueIsError"`

	// OnProgress receives TaskProgress values.
	OnProgress func(progress *TaskProgress) `json:"-"`
}

type TaskExecTable added in v1.0.9

// TaskExecTable describes one table inside a TaskExecOwner: its name, its
// columns, and the row maps to insert, update and delete.
//
// NOTE(review): UpdateList and UpdateWhereList are presumably parallel
// slices (new values and the matching where-conditions at the same index) —
// confirm against the exec task implementation.
type TaskExecTable struct {
	Name            string                   `json:"name,omitempty"`
	ColumnList      []*dialect.ColumnModel   `json:"columnList,omitempty"`
	InsertList      []map[string]interface{} `json:"insertList"`
	UpdateList      []map[string]interface{} `json:"updateList"`
	UpdateWhereList []map[string]interface{} `json:"updateWhereList"`
	DeleteList      []map[string]interface{} `json:"deleteList"`
}

type TaskExportColumn added in v1.2.6

// TaskExportColumn describes one column of a TaskExportTable by source
// name, target name and a string Value.
//
// NOTE(review): Value is presumably a fixed value or expression used in
// place of the source column — confirm.
type TaskExportColumn struct {
	SourceName string `json:"sourceName"`
	TargetName string `json:"targetName"`
	Value      string `json:"value"`
}

type TaskExportOwner added in v1.0.3

// TaskExportOwner describes one owner in an export task: its source and
// target names, table names to skip, and the tables to export. It is the
// element type of TaskExportParam.Owners.
type TaskExportOwner struct {
	SourceName     string             `json:"sourceName"`
	TargetName     string             `json:"targetName"`
	SkipTableNames []string           `json:"skipTableNames"`
	Tables         []*TaskExportTable `json:"tables"`
}

type TaskExportParam added in v1.0.3

// TaskExportParam is the parameter object passed to NewTaskExport. The data
// fields are serialized to JSON under the keys shown; the two callback
// fields carry `json:"-"` and are therefore excluded from JSON.
type TaskExportParam struct {
	Owners         []*TaskExportOwner `json:"owners"`
	SkipOwnerNames []string           `json:"skipOwnerNames"`

	// DataSourceType selects the output format; Dir is presumably the
	// output directory — TODO confirm.
	DataSourceType  *DataSourceType `json:"dataSourceType"`
	BatchNumber     int             `json:"batchNumber"`
	ExportStruct    bool            `json:"exportStruct"`
	ExportData      bool            `json:"exportData"`
	ExportBatchSql  bool            `json:"exportBatchSql"`
	ErrorContinue   bool            `json:"errorContinue"`
	Dir             string          `json:"dir"`
	AppendOwnerName bool            `json:"appendOwnerName"`

	// NOTE(review): presumably, when IsDataListExport is set, DataList is
	// exported instead of rows read from the database — confirm.
	IsDataListExport bool                     `json:"isDataListExport"`
	DataList         []map[string]interface{} `json:"dataList"`

	MergeIntoOneFile bool `json:"mergeIntoOneFile"`

	// FormatIndexName receives an owner name, table name and index model
	// and returns a string; OnProgress receives TaskProgress values.
	FormatIndexName func(ownerName string, tableName string, index *dialect.IndexModel) string `json:"-"`
	OnProgress      func(progress *TaskProgress)                                               `json:"-"`
}

type TaskExportTable added in v1.0.3

// TaskExportTable describes one table of a TaskExportOwner: its source and
// target names and its column descriptions.
type TaskExportTable struct {
	SourceName string              `json:"sourceName"`
	TargetName string              `json:"targetName"`
	Columns    []*TaskExportColumn `json:"columns"`
}

type TaskImportColumn added in v1.2.6

// TaskImportColumn describes one column of a TaskImportTable by name and a
// string Value.
//
// NOTE(review): the meaning of Value (fixed value vs. expression) is not
// visible here — confirm.
type TaskImportColumn struct {
	Name  string `json:"name"`
	Value string `json:"value"`
}

type TaskImportOwner added in v1.0.3

// TaskImportOwner describes one owner in an import task. It is the element
// type of TaskImportParam.Owners and the argument of the newDb callback
// given to NewTaskImport.
//
// NOTE(review): Password is serialized in plain form under the "password"
// key — take care when logging or persisting the JSON form of this struct.
type TaskImportOwner struct {
	Name           string             `json:"name"`
	Path           string             `json:"path"`
	SkipTableNames []string           `json:"skipTableNames"`
	Tables         []*TaskImportTable `json:"tables"`
	Username       string             `json:"username"`
	Password       string             `json:"password"`
}

type TaskImportParam added in v1.0.3

// TaskImportParam is the parameter object passed to NewTaskImport.
//
// The data fields are serialized to JSON under the keys shown. Both
// callbacks are excluded from JSON (`json:"-"`): encoding/json cannot encode
// func values, so an untagged OnProgress makes json.Marshal of this struct
// always fail. This matches the callback fields of TaskExportParam and
// TaskSyncParam.
type TaskImportParam struct {
	Owners []*TaskImportOwner `json:"owners"`

	DataSourceType        *DataSourceType `json:"dataSourceType"`
	BatchNumber           int             `json:"batchNumber"`
	OwnerCreateIfNotExist bool            `json:"ownerCreateIfNotExist"`
	ErrorContinue         bool            `json:"errorContinue"`

	// FormatIndexName receives an owner name, table name and index model
	// and returns a string; OnProgress receives TaskProgress values.
	FormatIndexName func(ownerName string, tableName string, index *dialect.IndexModel) string `json:"-"`
	OnProgress      func(progress *TaskProgress)                                               `json:"-"`
}

type TaskImportTable added in v1.0.3

// TaskImportTable describes one table of a TaskImportOwner: its name, a
// path, and its column descriptions.
//
// NOTE(review): Path is presumably the file the table's data is read from —
// confirm.
type TaskImportTable struct {
	Name    string              `json:"name"`
	Path    string              `json:"path"`
	Columns []*TaskImportColumn `json:"columns"`
}

type TaskProgress added in v1.0.3

// TaskProgress is the value handed to the OnProgress callbacks of the task
// parameter structs. Title, Infos and Error are serialized to JSON; OnError
// carries `json:"-"` and is therefore excluded.
type TaskProgress struct {
	Title   string          `json:"title"`
	Infos   []string        `json:"infos"`
	Error   string          `json:"error"`
	// OnError is a callback taking an error; who sets and who invokes it is
	// not visible here — TODO confirm.
	OnError func(err error) `json:"-"`
}

type TaskSyncColumn added in v1.3.1

// TaskSyncColumn pairs a source column name with a target column name for
// one column of a TaskSyncTable.
type TaskSyncColumn struct {
	SourceName string `json:"sourceName"`
	TargetName string `json:"targetName"`
}

type TaskSyncOwner added in v1.0.3

// TaskSyncOwner describes one owner in a sync task. It is the element type
// of TaskSyncParam.Owners and the argument of the newDb callback given to
// NewTaskSync.
//
// NOTE(review): Password is serialized in plain form under the "password"
// key — take care when logging or persisting the JSON form of this struct.
type TaskSyncOwner struct {
	SourceName     string           `json:"sourceName"`
	TargetName     string           `json:"targetName"`
	SkipTableNames []string         `json:"skipTableNames"`
	Tables         []*TaskSyncTable `json:"tables"`
	Username       string           `json:"username"`
	Password       string           `json:"password"`
}

type TaskSyncParam added in v1.0.3

// TaskSyncParam is the parameter object passed to NewTaskSync. The data
// fields are serialized to JSON under the keys shown; the two callback
// fields carry `json:"-"` and are therefore excluded from JSON.
type TaskSyncParam struct {
	Owners []*TaskSyncOwner `json:"owners"`

	// BatchNumber is presumably the number of rows handled per batch —
	// TODO confirm.
	BatchNumber           int  `json:"batchNumber"`
	SyncStruct            bool `json:"syncStruct"`
	SyncData              bool `json:"syncData"`
	ErrorContinue         bool `json:"errorContinue"`
	OwnerCreateIfNotExist bool `json:"ownerCreateIfNotExist"`

	// FormatIndexName receives an owner name, table name and index model
	// and returns a string; OnProgress receives TaskProgress values.
	FormatIndexName func(ownerName string, tableName string, index *dialect.IndexModel) string `json:"-"`
	OnProgress      func(progress *TaskProgress)                                               `json:"-"`
}

type TaskSyncTable added in v1.0.3

// TaskSyncTable describes one table of a TaskSyncOwner: its source and
// target names and its column mappings.
type TaskSyncTable struct {
	SourceName string            `json:"sourceName"`
	TargetName string            `json:"targetName"`
	Columns    []*TaskSyncColumn `json:"columns"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL