exportcenter

package module
v1.0.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 11, 2023 License: MIT Imports: 16 Imported by: 0

README

exportcenter

大数据导出中心,生成excel

简介

exportcenter能够简便的帮助大家构建大数据导出系统,exportcenter载入后提供导出任务的自动创建,以及大数据量并发安全写入excel

获取

go get github.com/DanPlayer/exportcenter

使用

初始化
getWd, _ := os.Getwd()
// 开启导出中心
center, err := exportcenter.NewClient(exportcenter.Options{
    Db:           db,                                 // 数据库实例
    QueuePrefix:  "ec_",                              // 队列前缀
    Queue:        redis.Client,                       // 使用redis队列
    SheetMaxRows: 500000,                             // 表格最大接收行数
    PoolMax:      2,                                  // 最大并发池
    GoroutineMax: 30,                                 // 最大协程数
    LogRootPath:  fmt.Sprintf("%s/%s", getWd, "log"), // 日志文件存储地址
    OutTime:      5 * time.Second,
})
if err != nil {
    return
}
创建导出任务
// 创建任务
// 返回值:
// id:任务ID
// keys:任务队列所使用的所有key值,这些key需要用户自己去使用,比如同时向所有队列推入数据,这样的话,导出数据的效率会更高
id, keys, err := center.CreateTask(
    "test",
    "test_name",
    "test_file",
    "测试使用",
    "本地处理的数据",
    "xlsx",
    2,
    exportcenter.ExportOptions{
        Header: []string{
            "header1",
            "header2",
            "header3",
        },
    },
)
导入数据
err := center.PushData(key, datum)
if err != nil {
    return
}
开启任务
center.StartTask(int64(id))
导出表格
err = center.ExportToExcel(int64(id), "./test.xlsx", func(key string) error {
    // 在数据导入表格之前,做你想做的事情
    
    return nil
})
if err != nil {
    return
}
日志生成

LogRootPath 配置后,会将日志自动写入该目录下,并且会根据时间7天来分割日志,保存时间为28天,同一日志最多保存3个,计划将此配置化

案例

Redis案例
func TestRedisTaskExport(t *testing.T) {
	getWd, _ := os.Getwd()
	// 开启导出中心
	center, err := exportcenter.NewClient(exportcenter.Options{
		Db:           db,                                 // 数据库实例
		QueuePrefix:  "ec_",                              // 队列前缀
		Queue:        redis.Client,                       // 使用redis队列
		SheetMaxRows: 500000,                             // 表格最大接收行数
		PoolMax:      2,                                  // 最大并发池
		GoroutineMax: 30,                                 // 最大协程数
		LogRootPath:  fmt.Sprintf("%s/%s", getWd, "log"), // 日志文件存储地址
		OutTime:      5 * time.Second,
	})
	if err != nil {
		return
	}

	id, keys, err := center.CreateTask(
		"test",
		"test_name",
		"test_file",
		"测试使用",
		"本地处理的数据",
		"xlsx",
		2,
		exportcenter.ExportOptions{
			Header: []string{
				"header1",
				"header2",
				"header3",
			},
		},
	)

	for _, key := range keys {
		data := []string{
			"[\"get1\",\"get1\",\"get1\"]",
		}
		for _, datum := range data {
			err := center.PushData(key, datum)
			if err != nil {
				return
			}
		}
	}
	
	center.StartTask(int64(id))

	err = center.ExportToExcel(int64(id), "./test.xlsx", nil)
	if err != nil {
		return
	}
}
RabbitMQ案例
func demo() {
	getWd, _ := os.Getwd()
	// 开启导出中心
	center, err := exportcenter.NewClient(exportcenter.Options{
		Db:           db,                                 // 数据库实例
		QueuePrefix:  "ec_",                              // 队列前缀
		Queue:        rabbitmq.Client,                    // 使用mq队列
		SheetMaxRows: 500000,                             // 表格最大接收行数
		PoolMax:      2,                                  // 最大并发池
		GoroutineMax: 30,                                 // 最大协程数
		LogRootPath:  fmt.Sprintf("%s/%s", getWd, "log"), // 日志文件存储地址
		OutTime:      5 * time.Second,
	})
	if err != nil {
		return
	}

	id, keys, err := center.CreateTask(
		"test",
		"test_name",
		"test_file",
		"测试使用",
		"本地处理的数据",
		"xlsx",
		2,
		exportcenter.ExportOptions{
			Header: []string{
				"header1",
				"header2",
				"header3",
			},
		},
	)

	for _, key := range keys {
		data := []string{
			"[\"get1\",\"get1\",\"get1\"]",
		}
		for _, datum := range data {
			err := center.PushData(key, datum)
			if err != nil {
				return
			}
		}
	}
	
	center.StartTask(int64(id))

	err = center.ExportToExcel(int64(id), "./test.xlsx", func(key string) error {
		// 重新开启消费者
		err := rabbitmq.Client.DeclareConsume(key)
		if err != nil {
			fmt.Println(err)
		}
		return nil
	})
	if err != nil {
		return
	}
}
性能测试

本地使用了mq进行测试,开启了5个队列进行测试,写入150w的数据,导出excel的时间30s左右

Documentation

Index

Constants

This section is empty.

Variables

View Source
var DbClient *gorm.DB

Functions

This section is empty.

Types

type ExportCenter

type ExportCenter struct {
	Db    *gorm.DB
	Queue Queue
	// contains filtered or unexported fields
}

func NewClient

func NewClient(options Options) (*ExportCenter, error)

func (*ExportCenter) CompleteTask

func (ec *ExportCenter) CompleteTask(id, writeNum int64) error

CompleteTask 完成任务

func (*ExportCenter) ConsultTask

func (ec *ExportCenter) ConsultTask(id int64) error

ConsultTask 任务进行中

func (*ExportCenter) CreateTask

func (ec *ExportCenter) CreateTask(key, name, description, source, destination, format string, count int64, options ExportOptions) (uint, []string, error)

CreateTask 创建导出任务

func (*ExportCenter) ExportToExcel

func (ec *ExportCenter) ExportToExcel(id int64, filePath string, before func(key string) error) (err error)

ExportToExcel 导出成excel表格,格式

func (*ExportCenter) FailTask

func (ec *ExportCenter) FailTask(id int64, errNum, writeNum int64) error

FailTask 任务失败

func (*ExportCenter) GetTask

func (ec *ExportCenter) GetTask(id int64) (info Task, err error)

GetTask 获取任务信息

func (*ExportCenter) PopData

func (ec *ExportCenter) PopData(key string) <-chan string

PopData 拉取队列数据

func (*ExportCenter) PushData

func (ec *ExportCenter) PushData(key string, data string) error

PushData 推送导出数据到队列

func (*ExportCenter) StartTask

func (ec *ExportCenter) StartTask(id int64)

StartTask 开启任务

func (*ExportCenter) UpdateTaskDownloadUrl

func (ec *ExportCenter) UpdateTaskDownloadUrl(id int64, url string) error

UpdateTaskDownloadUrl 更新任务文件下载链接

func (*ExportCenter) UpdateTaskErrLogUrl

func (ec *ExportCenter) UpdateTaskErrLogUrl(id int64, url string) error

UpdateTaskErrLogUrl 更新错误日志地址

type ExportOptions

type ExportOptions struct {
	FileName string   `json:"file_name"` // 文件名称
	Header   []string `json:"header"`    // 表头配置
}

ExportOptions 导出选项

type Options

type Options struct {
	Db            *gorm.DB                              // gorm实例
	QueuePrefix   string                                // 队列前缀
	Queue         Queue                                 // 队列配置(必须配置)
	SheetMaxRows  int64                                 // 数据表最大行数,用于生成队列key,可以用不同的队列同时并发写入数据,队列数量由【任务数据量】/【数据表最大行数】计算所得
	PoolMax       int                                   // 协程池最大数量
	GoroutineMax  int                                   // 协程最大数量
	IsUploadCloud bool                                  // 是否上传云端
	Upload        func(filePath string) (string, error) // 上传接口
	LogRootPath   string                                // 日志存储根目录
	OutTime       time.Duration                         // 超时时间
}

Options 配置

type Queue

type Queue interface {
	CreateQueue(ctx context.Context, key string) error       // 创建队列
	Pop(ctx context.Context, key string) <-chan string       // 拉取数据
	Push(ctx context.Context, key string, data string) error // 推送数据
	Destroy(ctx context.Context, key string) error           // 删除队列
}

Queue 队列

type Task

type Task struct {
	gorm.Model
	Name          string       `gorm:"type:varchar(255);comment:'任务名称'"`
	Description   string       `gorm:"type:text;comment:'描述'"`
	Status        int          `gorm:"type:tinyint(1);default:1;comment:'状态 1-待处理、2-处理中、3-已完成、4-失败、5-任务废弃'"`
	ProgressRate  int          `gorm:"type:tinyint(3);default:0;comment:'任务进度1-100'"`
	StartTime     sql.NullTime `gorm:"type:datetime;comment:'任务开始时间'"`
	EndTime       sql.NullTime `gorm:"type:datetime;comment:'任务结束时间'"`
	Source        string       `gorm:"type:varchar(255);comment:'数据源,描述导出数据的来源'"`
	Destination   string       `gorm:"type:varchar(255);comment:'数据目标,描述导出数据的存储位置'"`
	ExportFormat  string       `gorm:"type:varchar(255);comment:'导出格式,如CSV、JSON、XML等'"`
	ExportOptions string       `gorm:"type:text;comment:'导出选项,可存储导出任务的配置信息(可选)'"`
	QueueKey      string       `gorm:"type:varchar(255);comment:'队列key'"`
	CountNum      int64        `gorm:"type:int(11);default:0;comment:'数据总数'"`
	WriteNum      int64        `gorm:"type:int(11);default:0;comment:'已写入数据数量'"`
	ErrNum        int64        `gorm:"type:int(11);default:0;comment:'错误数据数'"`
	ErrLogUrl     string       `gorm:"type:text;comment:'错误日志地址'"`
	DownloadUrl   string       `gorm:"type:text;comment:'文件下载地址'"`
}

Task 任务表 用于记录所有的到处任务以及导出状态

func (*Task) CompleteTaskByID

func (m *Task) CompleteTaskByID(id int64, writeNum int64) error

func (*Task) Create

func (m *Task) Create() error

func (*Task) FailTaskByID

func (m *Task) FailTaskByID(id int64, errNum, writeNum int64) error

func (*Task) FindByID

func (m *Task) FindByID(id int64) (info Task, err error)

func (*Task) UpdateDownloadUrlByID

func (m *Task) UpdateDownloadUrlByID(id int64, url string) error

func (*Task) UpdateErrLogUrlByID

func (m *Task) UpdateErrLogUrlByID(id int64, url string) error

func (*Task) UpdateStatusByID

func (m *Task) UpdateStatusByID(id int64, status TaskStatus) error

type TaskStatus

type TaskStatus int
const (
	TaskStatusWait      TaskStatus = 1
	TaskStatusConsult   TaskStatus = 2
	TaskStatusCompleted TaskStatus = 3
	TaskStatusFail      TaskStatus = 4
	TaskStatusAbandon   TaskStatus = 5
)

func (TaskStatus) ParseInt

func (s TaskStatus) ParseInt() int

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL