gobatch

package module · v1.0.2 · Published: Mar 24, 2022 · License: MIT

README

GoBatch

GoBatch is a batch processing framework for Go, similar to Spring Batch in Java. If you are familiar with Spring Batch, you will find GoBatch very easy to use.

Architecture

In GoBatch, a Job is divided into multiple Steps, which are executed in sequence.

There are three types of steps (see the builder sketch after this list):

  • Simple Step executes business logic defined in a Handler in a single thread.
  • Chunk Step processes data in chunks. The flow is: read a chunk of data, process it, then write the output. This repeats until there is no more data to read.
  • Partition Step splits a task into multiple sub-tasks, executes the sub-tasks in parallel as sub-steps, and finally aggregates the sub-step results.
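
As a quick orientation, here is a sketch of how each step type is declared with the builder API used throughout this README. mytask, myReader, myProcessor and myWriter are defined in the example below; myItemReader is a hypothetical ItemReader, since partitioning pairs with an ItemReader-based step as described in the "Write a Partition step" section:

func buildSteps() {
	//simple step: a single Handler or function
	simple := gobatch.NewStep("simple_step").Handler(mytask).Build()

	//chunk step: read, process, write in chunks
	chunk := gobatch.NewStep("chunk_step").
		Reader(&myReader{}).Processor(&myProcessor{}).Writer(&myWriter{}).
		ChunkSize(10).Build()

	//partition step: a chunk step split into parallel sub steps
	partition := gobatch.NewStep("partition_step").
		Reader(&myItemReader{}).Partitions(4).Build()

	_ = []gobatch.Step{simple, chunk, partition}
}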

Features

  • Modular construction for batch applications
  • Serial and parallel processing flows, as needed
  • Break points for resuming jobs
  • Built-in file processing components
  • Listeners for job and step execution
  • Easy to extend

Install

go get -u github.com/chararch/gobatch

Use Step

  1. Create or choose a database, e.g. gobatch
  2. Create the tables defined in sql/schema_mysql.sql in that database
  3. Write your gobatch code and run it

Code

Example
import (
	"context"
	"database/sql"
	"fmt"

	"github.com/chararch/gobatch"
	_ "github.com/go-sql-driver/mysql" //register the mysql driver used by sql.Open below
)

// simple task
func mytask() {
	fmt.Println("mytask executed")
}

//reader
type myReader struct {
}
func (r *myReader) Read(chunkCtx *gobatch.ChunkContext) (interface{}, gobatch.BatchError) {
	curr, _ := chunkCtx.StepExecution.StepContext.GetInt("read.num", 0)
	if curr < 100 {
		chunkCtx.StepExecution.StepContext.Put("read.num", curr+1)
		return fmt.Sprintf("value-%v", curr), nil
	}
	return nil, nil
}

//processor
type myProcessor struct {
}
func (r *myProcessor) Process(item interface{}, chunkCtx *gobatch.ChunkContext) (interface{}, gobatch.BatchError) {
	return fmt.Sprintf("processed-%v", item), nil
}

//writer
type myWriter struct {
}
func (r *myWriter) Write(items []interface{}, chunkCtx *gobatch.ChunkContext) gobatch.BatchError {
	fmt.Printf("write: %v\n", items)
	return nil
}

func main() {
	//set db for gobatch to store job&step execution context
	db, err := sql.Open("mysql", "gobatch:gobatch123@tcp(127.0.0.1:3306)/gobatch?charset=utf8&parseTime=true")
	if err != nil {
		panic(err)
	}
	gobatch.SetDB(db)

	//build steps
	step1 := gobatch.NewStep("mytask").Handler(mytask).Build()
	//step2 := gobatch.NewStep("my_step").Handler(&myReader{}, &myProcessor{}, &myWriter{}).Build()
	step2 := gobatch.NewStep("my_step").Reader(&myReader{}).Processor(&myProcessor{}).Writer(&myWriter{}).ChunkSize(10).Build()

	//build job
	job := gobatch.NewJob("my_job").Step(step1, step2).Build()

	//register job to gobatch
	gobatch.Register(job)

	//run
	//gobatch.StartAsync(context.Background(), job.Name(), "")
	gobatch.Start(context.Background(), job.Name(), "")
}

You can find the full code at test/example.go.

Write a Simple step

There are several ways to write the logic of a simple step:

// 1. write a function with one of the following signature
func(execution *StepExecution) BatchError
func(execution *StepExecution)
func() error
func()

// 2. implement the Handler interface
type Handler interface {
	Handle(execution *StepExecution) BatchError
}
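
For instance, here is a function with the *StepExecution signature and an equivalent Handler implementation (a sketch; the names are ours, and the types are qualified with the gobatch package as in the example above):

func logStep(execution *gobatch.StepExecution) gobatch.BatchError {
	fmt.Printf("running step: %s\n", execution.StepName)
	return nil
}

type logHandler struct {
}

func (h *logHandler) Handle(execution *gobatch.StepExecution) gobatch.BatchError {
	fmt.Printf("running step: %s\n", execution.StepName)
	return nil
}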

Once you have written the function or Handler implementation, you can build the step like this:

step1 := gobatch.NewStep("step1").Handler(myfunction).Build()
step2 := gobatch.NewStep("step2").Handler(myHandler).Build()
//or
step1 := gobatch.NewStep("step1", myfunction).Build()
step2 := gobatch.NewStep("step2", myHandler).Build()

Write a Chunk step

To build a chunk step, you should implement the following interfaces; only Reader is required:

type Reader interface {
    //Read each call of Read() returns one data item; when there is no more data, a nil item is returned.
    Read(chunkCtx *ChunkContext) (interface{}, BatchError)
}
type Processor interface {
    //Process process an item from reader and return a result item
    Process(item interface{}, chunkCtx *ChunkContext) (interface{}, BatchError)
}
type Writer interface {
    //Write write items generated by processor in a chunk
    Write(items []interface{}, chunkCtx *ChunkContext) BatchError
}

There is another interface named ItemReader, which you can use instead of Reader:

type ItemReader interface {
    //ReadKeys read all keys of some kind of data
    ReadKeys() ([]interface{}, error)
    //ReadItem read value by one key from ReadKeys result
    ReadItem(key interface{}) (interface{}, error)
}
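
For illustration, a minimal in-memory ItemReader might look like this (a sketch; a real implementation usually reads keys from a database, as the TradeReader example later in this README does):

type numberReader struct {
}

//ReadKeys is called once before the first chunk and returns all keys
func (r *numberReader) ReadKeys() ([]interface{}, error) {
	keys := make([]interface{}, 0, 100)
	for i := 0; i < 100; i++ {
		keys = append(keys, i)
	}
	return keys, nil
}

//ReadItem loads the full data for a single key produced by ReadKeys
func (r *numberReader) ReadItem(key interface{}) (interface{}, error) {
	return fmt.Sprintf("value-%v", key), nil
}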

For convenience, you can implement the following interface along with Reader or Writer to do some initialization or cleanup:

type OpenCloser interface {
	Open(execution *StepExecution) BatchError
	Close(execution *StepExecution) BatchError
}
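
For example, a Writer that maintains a file handle could implement OpenCloser like this (a sketch with a hypothetical output path; it assumes the "os" package is imported, and the error codes follow the constants listed in the documentation below):

type fileOutputWriter struct {
	f *os.File
}

func (w *fileOutputWriter) Open(execution *gobatch.StepExecution) gobatch.BatchError {
	f, err := os.Create("/tmp/output.txt")
	if err != nil {
		return gobatch.NewBatchError(gobatch.ErrCodeGeneral, "open output file err", err)
	}
	w.f = f
	return nil
}

func (w *fileOutputWriter) Write(items []interface{}, chunkCtx *gobatch.ChunkContext) gobatch.BatchError {
	for _, item := range items {
		if _, err := fmt.Fprintln(w.f, item); err != nil {
			return gobatch.NewBatchError(gobatch.ErrCodeGeneral, "write item err", err)
		}
	}
	return nil
}

func (w *fileOutputWriter) Close(execution *gobatch.StepExecution) gobatch.BatchError {
	if err := w.f.Close(); err != nil {
		return gobatch.NewBatchError(gobatch.ErrCodeGeneral, "close output file err", err)
	}
	return nil
}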

You can find a chunk step example under test/example2.

Write a Partition step

You can implement the Partitioner interface to split a step into multiple sub-steps. Optionally, you can implement the Aggregator interface if you want to do some aggregation after all sub-steps have completed:

type Partitioner interface {
	//Partition generate sub step executions from specified step execution and partitions count
	Partition(execution *StepExecution, partitions uint) ([]*StepExecution, BatchError)
	//GetPartitionNames generate sub step names from specified step execution and partitions count
	GetPartitionNames(execution *StepExecution, partitions uint) []string
}

type Aggregator interface {
    //Aggregate aggregate result from all sub step executions
    Aggregate(execution *StepExecution, subExecutions []*StepExecution) BatchError
}
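
As an illustration, a Partitioner might split a fixed ID range evenly across sub-steps. The sketch below only shows the shape of the interface: the range size and the context keys are hypothetical, and whether a sub StepExecution needs more fields than a name and a context is framework-specific, so consult the source before relying on this:

type rangePartitioner struct {
}

func (p *rangePartitioner) Partition(execution *gobatch.StepExecution, partitions uint) ([]*gobatch.StepExecution, gobatch.BatchError) {
	total := 1000 //hypothetical number of records to split
	size := (total + int(partitions) - 1) / int(partitions)
	names := p.GetPartitionNames(execution, partitions)
	subs := make([]*gobatch.StepExecution, 0, partitions)
	for i, name := range names {
		from, to := i*size, (i+1)*size
		if to > total {
			to = total
		}
		ctx := gobatch.NewBatchContext()
		ctx.Put("from", from) //hypothetical context keys read back by the sub step
		ctx.Put("to", to)
		subs = append(subs, &gobatch.StepExecution{StepName: name, StepContext: ctx})
	}
	return subs, nil
}

func (p *rangePartitioner) GetPartitionNames(execution *gobatch.StepExecution, partitions uint) []string {
	names := make([]string, 0, partitions)
	for i := uint(0); i < partitions; i++ {
		names = append(names, fmt.Sprintf("%s-%d", execution.StepName, i))
	}
	return names
}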

If you already have a chunk step with an ItemReader, you can build a partition step simply by specifying the partition count:

    step := gobatch.NewStep("partition_step").Handler(&ChunkHandler{db}).Partitions(10).Build()

Read & Write File

Suppose a file has the following content (fields are separated by '\t'):

trade_1	account_1	cash	1000	normal	2022-02-27 12:12:12
trade_2	account_2	cash	1000	normal	2022-02-27 12:12:12
trade_3	account_3	cash	1000	normal	2022-02-27 12:12:12
……

We want to read the content and insert each record into a database table named 't_trade'. We can do it this way:

type Trade struct {
    TradeNo   string    `order:"0"`
    AccountNo string    `order:"1"`
    Type      string    `order:"2"`
    Amount    float64   `order:"3"`
    TradeTime time.Time `order:"5"`
    Status    string    `order:"4"`
}

var tradeFile = file.FileObjectModel{
    FileStore:     &file.LocalFileSystem{},
    FileName:      "/data/{date,yyyy-MM-dd}/trade.data",
    Type:          file.TSV,
    Encoding:      "utf-8",
    ItemPrototype: &Trade{},
}

type TradeWriter struct {
    db *gorm.DB
}

func (p *TradeWriter) Write(items []interface{}, chunkCtx *gobatch.ChunkContext) gobatch.BatchError {
    models := make([]*Trade, len(items))
    for i, item := range items {
        models[i] = item.(*Trade)
    }
    e := p.db.Table("t_trade").Create(models).Error
    if e != nil {
        return gobatch.NewBatchError(gobatch.ErrCodeDbFail, "save trade into db err", e)
    }
    return nil
}

func buildAndRunJob() {
    //...
    step := gobatch.NewStep("trade_import").ReadFile(tradeFile).Writer(&TradeWriter{db}).Partitions(10).Build()
    //...
    job := gobatch.NewJob("my_job").Step(...,step,...).Build()
    gobatch.Register(job)
    gobatch.Start(context.Background(), job.Name(), "{\"date\":\"20220202\"}")
}
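
The params string passed to Start is a JSON object; its values appear to drive both file name patterns like {date,yyyy-MM-dd} and the JobParams map on the job execution. Inside a step you can read them through the execution (a sketch; the "date" key matches the params above):

func printDateParam(execution *gobatch.StepExecution) gobatch.BatchError {
	if date, ok := execution.JobExecution.JobParams["date"]; ok {
		fmt.Printf("running for date: %v\n", date)
	}
	return nil
}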

Suppose we want to export the data in 't_trade' to a CSV file. We can do it like this:

type Trade struct {
    TradeNo   string    `order:"0" header:"trade_no"`
    AccountNo string    `order:"1" header:"account_no"`
    Type      string    `order:"2" header:"type"`
    Amount    float64   `order:"3" header:"amount"`
    TradeTime time.Time `order:"5" header:"trade_time" format:"2006-01-02_15:04:05"`
    Status    string    `order:"4" header:"status"`
}

var tradeFileCsv = file.FileObjectModel{
    FileStore:     &file.LocalFileSystem{},
    FileName:      "/data/{date,yyyy-MM-dd}/trade_export.csv",
    Type:          file.CSV,
    Encoding:      "utf-8",
    ItemPrototype: &Trade{},
}


type TradeReader struct {
    db *gorm.DB
}

func (h *TradeReader) ReadKeys() ([]interface{}, error) {
    var ids []int64
    h.db.Table("t_trade").Select("id").Find(&ids)
    var result []interface{}
    for _, id := range ids {
        result = append(result, id)
    }
    return result, nil
}

func (h *TradeReader) ReadItem(key interface{}) (interface{}, error) {
    id := int64(0)
    switch r := key.(type) {
    case int64:
        id = r
    case float64:
        id = int64(r)
    default:
        return nil, fmt.Errorf("key type error, type:%T, value:%v", key, key)
    }
    trade := &Trade{}
    result := h.db.Table("t_trade").Find(trade, "id = ?", id)
    if result.Error != nil {
        return nil, result.Error
    }
    return trade, nil
}

func buildAndRunJob() {
    //...
    step := gobatch.NewStep("trade_export").Reader(&TradeReader{db}).WriteFile(tradeFileCsv).Partitions(10).Build()
    //...
}

Listeners

There are different listeners for the lifecycle of job and step execution:

type JobListener interface {
	BeforeJob(execution *JobExecution) BatchError
	AfterJob(execution *JobExecution) BatchError
}

type StepListener interface {
	BeforeStep(execution *StepExecution) BatchError
	AfterStep(execution *StepExecution) BatchError
}

type ChunkListener interface {
	BeforeChunk(context *ChunkContext) BatchError
	AfterChunk(context *ChunkContext) BatchError
	OnError(context *ChunkContext, err BatchError)
}

type PartitionListener interface {
	BeforePartition(execution *StepExecution) BatchError
	AfterPartition(execution *StepExecution, subExecutions []*StepExecution) BatchError
	OnError(execution *StepExecution, err BatchError)
}
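
For example, a simple JobListener that logs the job lifecycle might look like this (a sketch; the type name is ours):

type loggingJobListener struct {
}

func (l *loggingJobListener) BeforeJob(execution *gobatch.JobExecution) gobatch.BatchError {
	fmt.Printf("job %s starting\n", execution.JobName)
	return nil
}

func (l *loggingJobListener) AfterJob(execution *gobatch.JobExecution) gobatch.BatchError {
	fmt.Printf("job %s finished, status: %v\n", execution.JobName, execution.JobStatus)
	return nil
}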

You can specify listeners when building a job:

func buildAndRunJob() {
    //...
    step := gobatch.NewStep("my_step").Handler(handler,...).Listener(listener,...).Build()
    //...
    job := gobatch.NewJob("my_job").Step(step,...).Listener(listener,...).Build()
}

Global Settings

SetDB

GoBatch needs a database to store job and step execution contexts, so you must pass a *sql.DB instance to GoBatch before running any job.

    gobatch.SetDB(sqlDb)

SetTransactionManager

If you want to build a chunk step, you must register a TransactionManager instance with GoBatch. The interface is:

type TransactionManager interface {
	BeginTx() (tx interface{}, err BatchError)
	Commit(tx interface{}) BatchError
	Rollback(tx interface{}) BatchError
}

GoBatch provides a DefaultTxManager. If you have set a DB but have not set a TransactionManager yet, GoBatch creates a DefaultTxManager instance for you.
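
If you need transactional behavior over something other than the registered *sql.DB, you can supply your own implementation. Here is a sketch backed by *sql.DB (which DefaultTxManager already covers, so this is purely illustrative):

type myTxManager struct {
	db *sql.DB
}

func (m *myTxManager) BeginTx() (interface{}, gobatch.BatchError) {
	tx, err := m.db.Begin()
	if err != nil {
		return nil, gobatch.NewBatchError(gobatch.ErrCodeDbFail, "begin tx err", err)
	}
	return tx, nil
}

func (m *myTxManager) Commit(tx interface{}) gobatch.BatchError {
	if err := tx.(*sql.Tx).Commit(); err != nil {
		return gobatch.NewBatchError(gobatch.ErrCodeDbFail, "commit tx err", err)
	}
	return nil
}

func (m *myTxManager) Rollback(tx interface{}) gobatch.BatchError {
	if err := tx.(*sql.Tx).Rollback(); err != nil {
		return gobatch.NewBatchError(gobatch.ErrCodeDbFail, "rollback tx err", err)
	}
	return nil
}

//register it before starting jobs:
//gobatch.SetTransactionManager(&myTxManager{db: db})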

SetMaxRunningJobs & SetMaxRunningSteps

GoBatch uses internal task pools to run jobs and steps, so the numbers of concurrently running jobs and steps are limited by the pool sizes. The defaults are 10 running jobs and 1000 running steps. You can change these settings:

    gobatch.SetMaxRunningJobs(100)
    gobatch.SetMaxRunningSteps(5000)

Documentation

Constants

const (
	//ErrCodeRetry an error indicating the caller should retry
	ErrCodeRetry = "retry"
	//ErrCodeStop an error indicating the job is to be stopped
	ErrCodeStop = "stop"
	//ErrCodeConcurrency an error indicating conflict modification
	ErrCodeConcurrency = "concurrency"
	//ErrCodeDbFail an error indicating database access failed
	ErrCodeDbFail = "db_fail"
	//ErrCodeGeneral general error
	ErrCodeGeneral = "general"
)

const (
	DefaultJobPoolSize      = 10
	DefaultStepTaskPoolSize = 1000
)

task pool

const (
	//ItemReaderKeyList the key of keyList in StepContext
	ItemReaderKeyList = "gobatch.ItemReader.key.list"
	//ItemReaderCurrentIndex the key of current offset of step's keyList in StepContext
	ItemReaderCurrentIndex = "gobatch.ItemReader.current.index"
	//ItemReaderMaxIndex the key of max index of step's keyList in StepContext
	ItemReaderMaxIndex = "gobatch.ItemReader.max.index"
)

const (
	//DefaultChunkSize default number of records to read per chunk
	DefaultChunkSize = 10
	//DefaultPartitions default number of partitions to construct a step
	DefaultPartitions = 1

	//DefaultMinPartitionSize default min number of records to process in a sub step of a partitionStep
	DefaultMinPartitionSize = 1
	//DefaultMaxPartitionSize default max number of records to process in a sub step of a partitionStep
	DefaultMaxPartitionSize = 2147483647
)

Variables

This section is empty.

Functions

func NewJob

func NewJob(name string, steps ...Step) *jobBuilder

NewJob new instance of job builder

func NewStep

func NewStep(name string, handler ...interface{}) *stepBuilder

NewStep initialize a step builder

func Register

func Register(job Job) error

Register register job to gobatch

func Restart

func Restart(ctx context.Context, jobId interface{}) (int64, error)

Restart restart job by job name or job execution id
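
For example, restarting by job name (a sketch; per the doc comment, a job execution id works as well):

func resumeJob() {
	//restart the last failed execution of the job from its break point
	if _, err := gobatch.Restart(context.Background(), "my_job"); err != nil {
		fmt.Println("restart failed:", err)
	}
}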

func RestartAsync

func RestartAsync(ctx context.Context, jobId interface{}) (int64, error)

RestartAsync restart job by job name or job execution id asynchronously

func SetDB

func SetDB(sqlDb *sql.DB)

SetDB register a *sql.DB instance for GoBatch

func SetLogger

func SetLogger(l logs.Logger)

SetLogger set a logger instance for GoBatch

func SetMaxRunningJobs

func SetMaxRunningJobs(size int)

SetMaxRunningJobs set max number of parallel jobs for GoBatch

func SetMaxRunningSteps

func SetMaxRunningSteps(size int)

SetMaxRunningSteps set max number of parallel steps for GoBatch

func SetTransactionManager

func SetTransactionManager(txMgr TransactionManager)

SetTransactionManager register a TransactionManager instance for GoBatch

func Start

func Start(ctx context.Context, jobName string, params string) (int64, error)

Start start job by job name and params

func StartAsync

func StartAsync(ctx context.Context, jobName string, params string) (int64, error)

StartAsync start job by job name and params asynchronously

func Stop

func Stop(ctx context.Context, jobId interface{}) error

Stop stop job by job name or job execution id

func Unregister

func Unregister(job Job)

Unregister unregister job from gobatch

Types

type Aggregator

type Aggregator interface {
	//Aggregate aggregate result from all sub step executions
	Aggregate(execution *StepExecution, subExecutions []*StepExecution) BatchError
}

Aggregator merges results of sub step executions of a chunk step

type BatchContext

type BatchContext struct {
	// contains filtered or unexported fields
}

BatchContext contains properties during a job or step execution

func NewBatchContext

func NewBatchContext() *BatchContext

NewBatchContext new instance

func (*BatchContext) DeepCopy

func (ctx *BatchContext) DeepCopy() *BatchContext

func (*BatchContext) Exists

func (ctx *BatchContext) Exists(key string) bool

func (*BatchContext) Get

func (ctx *BatchContext) Get(key string, def ...interface{}) interface{}

func (*BatchContext) GetBool

func (ctx *BatchContext) GetBool(key string, def ...bool) (bool, error)

func (*BatchContext) GetInt

func (ctx *BatchContext) GetInt(key string, def ...int) (int, error)

func (*BatchContext) GetInt64

func (ctx *BatchContext) GetInt64(key string, def ...int64) (int64, error)

func (*BatchContext) GetString

func (ctx *BatchContext) GetString(key string, def ...string) (string, error)

func (*BatchContext) MarshalJSON

func (ctx *BatchContext) MarshalJSON() ([]byte, error)

func (*BatchContext) Merge

func (ctx *BatchContext) Merge(other *BatchContext)

func (*BatchContext) Put

func (ctx *BatchContext) Put(key string, value interface{})

func (*BatchContext) Remove

func (ctx *BatchContext) Remove(key string)

func (*BatchContext) UnmarshalJSON

func (ctx *BatchContext) UnmarshalJSON(b []byte) error

type BatchError

type BatchError interface {
	//Code code of the error
	Code() string
	//Message readable message of the error
	Message() string
	//Error error interface
	Error() string
	//StackTrace goroutine stack trace
	StackTrace() string
}

BatchError represents an error during GoBatch execution

func NewBatchError

func NewBatchError(code string, msg string, args ...interface{}) BatchError

NewBatchError new instance

type ChunkContext

type ChunkContext struct {
	StepExecution *StepExecution
	Tx            interface{}
	End           bool
}

type ChunkListener

type ChunkListener interface {
	//BeforeChunk execute before start of a chunk in a chunkStep
	BeforeChunk(context *ChunkContext) BatchError
	//AfterChunk execute after end of a chunk in a chunkStep
	AfterChunk(context *ChunkContext) BatchError
	//OnError execute when an error occured during a chunk in a chunkStep
	OnError(context *ChunkContext, err BatchError)
}

ChunkListener chunk listener

type DefaultTxManager

type DefaultTxManager struct {
	// contains filtered or unexported fields
}

DefaultTxManager default TransactionManager implementation

func (*DefaultTxManager) BeginTx

func (tm *DefaultTxManager) BeginTx() (interface{}, BatchError)

BeginTx begin a transaction

func (*DefaultTxManager) Commit

func (tm *DefaultTxManager) Commit(tx interface{}) BatchError

Commit commit a transaction

func (*DefaultTxManager) Rollback

func (tm *DefaultTxManager) Rollback(tx interface{}) BatchError

Rollback rollback a transaction

type FilePath

type FilePath struct {
	NamePattern string
}

FilePath an abstract file path

func (*FilePath) Format

func (f *FilePath) Format(execution *StepExecution) (string, error)

Format generates a real file path by formatting the FilePath according to the given *StepExecution instance

type Future

type Future interface {
	Get() (interface{}, error)
}

Future gets a result in the future

type Handler

type Handler interface {
	//Handle implement handler logic here
	Handle(execution *StepExecution) BatchError
}

Handler is an interface for doing work in a simple step

type ItemReader

type ItemReader interface {
	//ReadKeys read all keys of some kind of data
	ReadKeys() ([]interface{}, error)
	//ReadItem read value by one key from ReadKeys result
	ReadItem(key interface{}) (interface{}, error)
}

ItemReader is for loading a large amount of data from a data source such as a database; it is used in a chunk step. When the step executes, it first loads all data keys by calling ReadKeys() once, then loads the full data key by key in every chunk.

type Job

type Job interface {
	Name() string
	Start(ctx context.Context, execution *JobExecution) BatchError
	Stop(ctx context.Context, execution *JobExecution) BatchError
	GetSteps() []Step
}

Job job interface used by GoBatch

type JobExecution

type JobExecution struct {
	JobExecutionId int64
	JobInstanceId  int64
	JobName        string
	JobParams      map[string]interface{}
	JobStatus      status.BatchStatus
	StepExecutions []*StepExecution
	JobContext     *BatchContext
	CreateTime     time.Time
	StartTime      time.Time
	EndTime        time.Time
	FailError      BatchError
	Version        int64
}

JobExecution represents context of a job execution

func (*JobExecution) AddStepExecution

func (e *JobExecution) AddStepExecution(execution *StepExecution)

AddStepExecution add a step execution in this job

type JobListener

type JobListener interface {
	//BeforeJob execute before job start
	BeforeJob(execution *JobExecution) BatchError
	//AfterJob execute after job end either normally or abnormally
	AfterJob(execution *JobExecution) BatchError
}

JobListener job listener

type OpenCloser

type OpenCloser interface {
	//Open do initialization for Reader or Writer
	Open(execution *StepExecution) BatchError
	//Close do cleanups for Reader or Writer
	Close(execution *StepExecution) BatchError
}

OpenCloser is used to do initialization and cleanup for a Reader or Writer

type PartitionListener

type PartitionListener interface {
	//BeforePartition execute before enter into Partitioner.Partition() in a partitionStep
	BeforePartition(execution *StepExecution) BatchError
	//AfterPartition execute after return from Partitioner.Partition() in a partitionStep
	AfterPartition(execution *StepExecution, subExecutions []*StepExecution) BatchError
	//OnError execute when an error return from Partitioner.Partition() in a partitionStep
	OnError(execution *StepExecution, err BatchError)
}

PartitionListener partition listener

type Partitioner

type Partitioner interface {
	//Partition generate sub step executions from specified step execution and partitions count
	Partition(execution *StepExecution, partitions uint) ([]*StepExecution, BatchError)
	//GetPartitionNames generate sub step names from specified step execution and partitions count
	GetPartitionNames(execution *StepExecution, partitions uint) []string
}

Partitioner splits a step execution into multiple sub executions.

type PartitionerFactory

type PartitionerFactory interface {
	GetPartitioner(minPartitionSize, maxPartitionSize uint) Partitioner
}

PartitionerFactory creates Partitioners; it is usually used by a Reader.

type Processor

type Processor interface {
	//Process process an item from reader and return a result item
	Process(item interface{}, chunkCtx *ChunkContext) (interface{}, BatchError)
}

Processor is for processing data in a chunk step

type Reader

type Reader interface {
	//Read each call of Read() returns one data item; when there is no more data, a nil item is returned.
	//If an error occurs, a nil item and a BatchError are returned
	Read(chunkCtx *ChunkContext) (interface{}, BatchError)
}

Reader is for loading data in a chunk step

type Step

type Step interface {
	Name() string
	Exec(ctx context.Context, execution *StepExecution) BatchError
	// contains filtered or unexported methods
}

Step step interface

type StepExecution

type StepExecution struct {
	StepExecutionId      int64
	StepName             string
	StepStatus           status.BatchStatus
	StepContext          *BatchContext
	StepContextId        int64
	StepExecutionContext *BatchContext
	JobExecution         *JobExecution
	CreateTime           time.Time
	StartTime            time.Time
	EndTime              time.Time
	ReadCount            int64
	WriteCount           int64
	CommitCount          int64
	FilterCount          int64
	ReadSkipCount        int64
	WriteSkipCount       int64
	ProcessSkipCount     int64
	RollbackCount        int64
	FailError            BatchError
	LastUpdated          time.Time
	Version              int64
}

StepExecution represents context of a step execution

type StepListener

type StepListener interface {
	//BeforeStep execute before step start
	BeforeStep(execution *StepExecution) BatchError
	//AfterStep execute after step end either normally or abnormally
	AfterStep(execution *StepExecution) BatchError
}

StepListener step listener

type Task

type Task func(execution *StepExecution) BatchError

Task is a function type for doing work in a simple step

type TransactionManager

type TransactionManager interface {
	BeginTx() (tx interface{}, err BatchError)
	Commit(tx interface{}) BatchError
	Rollback(tx interface{}) BatchError
}

TransactionManager is used by a chunk step to execute each chunk process in a transaction.

func NewTransactionManager

func NewTransactionManager(db *sql.DB) TransactionManager

NewTransactionManager create a TransactionManager instance

type Writer

type Writer interface {
	//Write write items generated by processor in a chunk
	Write(items []interface{}, chunkCtx *ChunkContext) BatchError
}

Writer is for writing data in a chunk step

Directories

Path Synopsis
internal
