bigcsvreader

package module v1.5.0
Published: Mar 25, 2024 License: MIT Imports: 12 Imported by: 1

README

BigCsvReader



Package bigcsvreader offers a multi-threaded approach to reading a large CSV file, in order to improve reading and processing time.
It spawns multiple goroutines, each reading a piece of the file.
Read rows are put into channels, equal in number to the spawned goroutines, so that the processing of those rows can be parallelized as well.

Installation
$ go get github.com/actforgood/bigcsvreader
Example

Please refer to this example.

How it is designed to work

[Diagram: BigCsvReader-HowItWorks]

Benchmarks
go version go1.22.1 darwin/amd64
go test -timeout=15m -benchmem -benchtime=2x -bench . 
goos: darwin
goarch: amd64
pkg: github.com/actforgood/bigcsvreader
cpu: Intel(R) Core(TM) i7-7700HQ CPU @ 2.80GHz
Benchmark50000Rows_50Mb_withBigCsvReader-8                                 2     8076491568 ns/op    61744680 B/op    100269 allocs/op
Benchmark50000Rows_50Mb_withStdGoCsvReaderReadAll-8                        2    65237799108 ns/op    67924264 B/op    100043 allocs/op
Benchmark50000Rows_50Mb_withStdGoCsvReaderReadOneByOneAndReuseRecord-8     2    66750849960 ns/op    57606432 B/op     50020 allocs/op
Benchmark50000Rows_50Mb_withStdGoCsvReaderReadOneByOneProcessParalell-8    2     8184433872 ns/op    61607624 B/op    100040 allocs/op

Benchmarks were run against a file of ~50 MB, with a simulated processing time of 1 ms per row.
bigcsvreader was launched with 8 goroutines.
The other benchmarks use the standard encoding/csv package directly.
As shown, bigcsvreader reads and processes all rows in ~8s.
The standard csv package reads and processes all rows sequentially in ~65s.
Reading with the standard csv package while processing rows in parallel is comparable in timing to bigcsvreader (so that strategy is a good alternative to this package).
The ReadAll API has the disadvantage of keeping all rows in memory.
Reading rows one by one with the ReuseRecord flag set has the advantage of fewer allocations, but at the cost of reading rows sequentially.

Note: it is a coincidence that the parallelized version's timing was roughly equal to the sequential timing divided by the number of started goroutines. You should not take this as a rule.

Below are some process stats captured with the Unix top command while running each benchmark.

Bench                                                                 %CPU   MEM
Benchmark50000Rows_50Mb_withBigCsvReader                              17.3   9652K
Benchmark50000Rows_50Mb_withStdGoCsvReaderReadAll                      5.8   66M
Benchmark50000Rows_50Mb_withStdGoCsvReaderReadOneByOneAndReuseRecord  11.3   6908K

(!) Known issue: This package does not work as expected with multiline columns.

License

This package is released under the MIT license. See LICENSE.

Documentation

Overview

Package bigcsvreader offers a multi-threaded approach to reading a large CSV file, in order to improve reading and processing time. It spawns multiple goroutines, each reading a piece of the file. Read rows are put into channels, equal in number to the spawned goroutines, so that the processing of those rows can be parallelized as well.

Index

Examples

Constants

This section is empty.

Variables

var ErrEmptyFile = errors.New("empty csv file")

ErrEmptyFile is the error returned if the CSV file is empty.

Functions

This section is empty.

Types

type CsvReader

type CsvReader struct {
	// MaxGoroutinesNo is the maximum goroutines to start parsing the CSV file.
	// At least 2048 bytes of file are required to start a new goroutine.
	// Defaults to [runtime.NumCPU].
	MaxGoroutinesNo int
	// FileHasHeader is a flag indicating if file's first row is the header (columns names).
	// If so, the header line is disregarded and not returned as a row.
	// Defaults to false.
	FileHasHeader bool
	// ColumnsCount is the number of columns the CSV file has.
	ColumnsCount int
	// ColumnsDelimiter is the delimiter char between columns. Defaults to comma.
	ColumnsDelimiter rune
	// BufferSize is used internally for [bufio.Reader] size. Has a default value of 4096.
	// If you have lines bigger than this value, increase it to avoid a "buffer full" error.
	BufferSize int
	// Logger can be set to perform some debugging/error logging.
	// Defaults to a no-operation logger (no log is performed).
	// You can enable logging by passing a logger that implements [internal.Logger] contract.
	Logger internal.Logger

	// LazyQuotes is a flag used to allow quotes in an unquoted field and non-doubled quotes
	// in a quoted field.
	LazyQuotes bool
	// contains filtered or unexported fields
}

CsvReader reads async rows from a CSV file. It does that by initializing multiple goroutines, each of them handling a chunk of data from the file.

Example
package main

import (
	"context"
	"fmt"
	"strconv"
	"sync"

	"github.com/actforgood/bigcsvreader"
)

const (
	columnProductID = iota
	columnProductName
	columnProductDescription
	columnProductPrice
	columnProductQty
)

const noOfColumns = 5

type Product struct {
	ID    int
	Name  string
	Desc  string
	Price float64
	Qty   int
}

func main() {
	// initialize the big csv reader
	bigCSV := bigcsvreader.New()
	bigCSV.SetFilePath("testdata/example_products.csv")
	bigCSV.ColumnsCount = noOfColumns
	bigCSV.MaxGoroutinesNo = 16

	ctx, cancelCtx := context.WithCancel(context.Background())
	defer cancelCtx()
	var wg sync.WaitGroup

	// start multi-thread reading
	rowsChans, errsChan := bigCSV.Read(ctx)

	// process rows and errors:

	for i := 0; i < len(rowsChans); i++ {
		wg.Add(1)
		go rowWorker(rowsChans[i], &wg)
	}

	wg.Add(1)
	go errWorker(errsChan, &wg)

	wg.Wait()

}

func rowWorker(rowsChan bigcsvreader.RowsChan, waitGr *sync.WaitGroup) {
	for row := range rowsChan {
		processRow(row)
	}
	waitGr.Done()
}

func errWorker(errsChan bigcsvreader.ErrsChan, waitGr *sync.WaitGroup) {
	for err := range errsChan {
		handleError(err)
	}
	waitGr.Done()
}

// processRow can be used to implement business logic
// like validation / converting to a struct / persisting row into a storage.
func processRow(row []string) {
	id, _ := strconv.Atoi(row[columnProductID])
	price, _ := strconv.ParseFloat(row[columnProductPrice], 64)
	qty, _ := strconv.Atoi(row[columnProductQty])
	name := row[columnProductName]
	desc := row[columnProductDescription]

	product := Product{
		ID:    id,
		Name:  name,
		Desc:  desc,
		Price: price,
		Qty:   qty,
	}

	fmt.Printf("%+v\n", product)
}

// handleError handles the error.
// errors can be fatal, like the file not existing, or row-related, like a given row failing to parse.
func handleError(err error) {
	fmt.Println(err)
}
Output:

{ID:1 Name:Apple iPhone 13 Desc:Lorem ipsum dolor sit amet, consectetur adipiscing elit. Nunc eleifend felis quis magna auctor, ut lacinia eros efficitur. Maecenas mattis dolor a pharetra gravida. Aenean at eros sed metus posuere feugiat in vitae libero. Morbi a diam volutpat, tempor lacus sed, sagittis velit. Donec eget dignissim mauris, sed aliquam ex. Duis eros dolor, vestibulum ac aliquam eget, viverra in enim. Aenean ut turpis quis purus porta lobortis. Etiam sollicitudin lectus vitae velit tincidunt, ut volutpat justo aliquam. Aenean vitae vehicula arcu. Interdum et malesuada fames ac ante ipsum primis in faucibus. Nunc viverra enim nec risus mollis elementum nec dictum ex. Nunc lorem eros, vulputate a rutrum nec, scelerisque non augue. Sed in egestas eros. Quisque felis lorem, vehicula ac venenatis vel, tristique id sapien. Morbi vitae odio eget orci facilisis suscipit. Cras sodales, augue vitae tincidunt tempus, diam turpis volutpat est, vitae fringilla augue leo semper augue. Integer scelerisque tempor mauris, ac posuere sem aenean Price:1025.99 Qty:100}
{ID:2 Name:Samsung Galaxy S22 Desc:Lorem ipsum dolor sit amet, consectetur adipiscing elit. Nunc eleifend felis quis magna auctor, ut lacinia eros efficitur. Maecenas mattis dolor a pharetra gravida. Aenean at eros sed metus posuere feugiat in vitae libero. Morbi a diam volutpat, tempor lacus sed, sagittis velit. Donec eget dignissim mauris, sed aliquam ex. Duis eros dolor, vestibulum ac aliquam eget, viverra in enim. Aenean ut turpis quis purus porta lobortis. Etiam sollicitudin lectus vitae velit tincidunt, ut volutpat justo aliquam. Aenean vitae vehicula arcu. Interdum et malesuada fames ac ante ipsum primis in faucibus. Nunc viverra enim nec risus mollis elementum nec dictum ex. Nunc lorem eros, vulputate a rutrum nec, scelerisque non augue. Sed in egestas eros. Quisque felis lorem, vehicula ac venenatis vel, tristique id sapien. Morbi vitae odio eget orci facilisis suscipit. Cras sodales, augue vitae tincidunt tempus, diam turpis volutpat est, vitae fringilla augue leo semper augue. Integer scelerisque tempor mauris, ac posuere sem aenean Price:400.99 Qty:12}
{ID:3 Name:Apple MacBook Air Desc:Lorem ipsum dolor sit amet, consectetur adipiscing elit. Nunc eleifend felis quis magna auctor, ut lacinia eros efficitur. Maecenas mattis dolor a pharetra gravida. Aenean at eros sed metus posuere feugiat in vitae libero. Morbi a diam volutpat, tempor lacus sed, sagittis velit. Donec eget dignissim mauris, sed aliquam ex. Duis eros dolor, vestibulum ac aliquam eget, viverra in enim. Aenean ut turpis quis purus porta lobortis. Etiam sollicitudin lectus vitae velit tincidunt, ut volutpat justo aliquam. Aenean vitae vehicula arcu. Interdum et malesuada fames ac ante ipsum primis in faucibus. Nunc viverra enim nec risus mollis elementum nec dictum ex. Nunc lorem eros, vulputate a rutrum nec, scelerisque non augue. Sed in egestas eros. Quisque felis lorem, vehicula ac venenatis vel, tristique id sapien. Morbi vitae odio eget orci facilisis suscipit. Cras sodales, augue vitae tincidunt tempus, diam turpis volutpat est, vitae fringilla augue leo semper augue. Integer scelerisque tempor mauris, ac posuere sem aenean Price:700.99 Qty:34}
{ID:4 Name:Lenovo ThinkPad X1 Desc:Lorem ipsum dolor sit amet, consectetur adipiscing elit. Nunc eleifend felis quis magna auctor, ut lacinia eros efficitur. Maecenas mattis dolor a pharetra gravida. Aenean at eros sed metus posuere feugiat in vitae libero. Morbi a diam volutpat, tempor lacus sed, sagittis velit. Donec eget dignissim mauris, sed aliquam ex. Duis eros dolor, vestibulum ac aliquam eget, viverra in enim. Aenean ut turpis quis purus porta lobortis. Etiam sollicitudin lectus vitae velit tincidunt, ut volutpat justo aliquam. Aenean vitae vehicula arcu. Interdum et malesuada fames ac ante ipsum primis in faucibus. Nunc viverra enim nec risus mollis elementum nec dictum ex. Nunc lorem eros, vulputate a rutrum nec, scelerisque non augue. Sed in egestas eros. Quisque felis lorem, vehicula ac venenatis vel, tristique id sapien. Morbi vitae odio eget orci facilisis suscipit. Cras sodales, augue vitae tincidunt tempus, diam turpis volutpat est, vitae fringilla augue leo semper augue. Integer scelerisque tempor mauris, ac posuere sem aenean Price:550.99 Qty:90}
{ID:5 Name:Logitech Mouse G203 Desc:Lorem ipsum dolor sit amet, consectetur adipiscing elit. Nunc eleifend felis quis magna auctor, ut lacinia eros efficitur. Maecenas mattis dolor a pharetra gravida. Aenean at eros sed metus posuere feugiat in vitae libero. Morbi a diam volutpat, tempor lacus sed, sagittis velit. Donec eget dignissim mauris, sed aliquam ex. Duis eros dolor, vestibulum ac aliquam eget, viverra in enim. Aenean ut turpis quis purus porta lobortis. Etiam sollicitudin lectus vitae velit tincidunt, ut volutpat justo aliquam. Aenean vitae vehicula arcu. Interdum et malesuada fames ac ante ipsum primis in faucibus. Nunc viverra enim nec risus mollis elementum nec dictum ex. Nunc lorem eros, vulputate a rutrum nec, scelerisque non augue. Sed in egestas eros. Quisque felis lorem, vehicula ac venenatis vel, tristique id sapien. Morbi vitae odio eget orci facilisis suscipit. Cras sodales, augue vitae tincidunt tempus, diam turpis volutpat est, vitae fringilla augue leo semper augue. Integer scelerisque tempor mauris, ac posuere sem aenean Price:30.5 Qty:35}

func New

func New() *CsvReader

New instantiates a new CsvReader object with some default fields preset.

func (*CsvReader) Read

func (cr *CsvReader) Read(ctx context.Context) ([]RowsChan, ErrsChan)

Read asynchronously extracts CSV rows, with each started goroutine putting them into a RowsChan. Errors that occur during parsing are sent through ErrsChan.

func (*CsvReader) SetFilePath

func (cr *CsvReader) SetFilePath(csvFilePath string)

SetFilePath sets the CSV file path.

type ErrsChan

type ErrsChan <-chan error

ErrsChan is the channel where errors are pushed if something goes wrong during the file read. It has a buffer of 256 entries. Some errors are fatal, like the file not existing; others, like row parsing errors, may occur once per affected row.

type RowsChan

type RowsChan <-chan []string

RowsChan is the channel where read rows are pushed. It has a buffer of 256 entries.

Directories

Path       Synopsis
cmd/pprof  Package main contains an executable for profiling different strategies of reading a CSV.
internal   Package internal contains internal logic.
