iosupport

v0.0.0-...-7a17b1e
Published: Aug 20, 2018 License: MIT Imports: 15 Imported by: 0

README

iosupport

It provides some I/O helpers for Go:

  • Read large files (long lines and large numbers of lines)
  • Parse CSV files according to RFC 4180, but:
    • It does not support \r\n in quoted fields
    • It does not support comments
  • Sort CSV on one or several columns

Usage

In order to start, go get this repository:

$ go get github.com/mdouchement/iosupport

Example

  • Scanner & wc
package main

import (
  "os"

  "github.com/mdouchement/iosupport"
)

func main() {
  // With local filesystem
  file, err := os.Open("my_file.txt")
  check(err)
  defer file.Close()

  // Or with HDFS "github.com/colinmarc/hdfs"
  // client, _ := hdfs.New("localhost:9000")
  // file, _ := client.Open("/iris.csv")

  // See scanner.go for more examples
  sc := iosupport.NewScanner(file)
  sc.EachString(func(line string, err error) {
    check(err)
    println(line)
  })

  // See wc.go for more examples
  wc := iosupport.NewWordCount(file)
  check(wc.Perform()) // Perform returns an error; do not ignore it
  println(wc.Chars)
  println(wc.Words)
  println(wc.Lines)
}

func check(err error) {
  if err != nil {
    panic(err)
  }
}
  • TSV sort
package main

import (
  "os"

  "github.com/mdouchement/iosupport"
)

func main() {
  sc := func() *iosupport.Scanner {
    file, err := os.Open("iris.csv")
    check(err)
    // Or with HDFS "github.com/colinmarc/hdfs"
    // client, _ := hdfs.New("localhost:9000")
    // file, _ := client.Open("/iris.csv")
    return iosupport.NewScanner(file)
  }

  // See tsv_indexer.go for more examples
  // Arguments: scanner factory, then options (header present, separator, fields to sort on)
  indexer := iosupport.NewTsvIndexer(sc, iosupport.HasHeader(), iosupport.Separator(","), iosupport.Fields("col2", "col1"))
  defer indexer.CloseIO()
  err := indexer.Analyze() // creates lines index
  check(err)
  indexer.Sort() // sorts indexed lines
  ofile, err := os.Create("my_sorted.tsv") // os.Open would open the file read-only
  check(err)
  defer ofile.Close()
  check(indexer.Transfer(ofile)) // transfers the input TSV into the sorted output TSV
}

func check(err error) {
  if err != nil {
    panic(err)
  }
}

Tests

  • Installation
$ go get github.com/onsi/ginkgo/ginkgo
$ go get github.com/onsi/gomega
$ go get github.com/golang/mock/gomock

  • Run tests
# One shot
$ ginkgo

# With watch
$ ginkgo watch
  • Generate package test file
$ ginkgo bootstrap # set up a new ginkgo suite
$ ginkgo generate my_file.go # will create a sample test file.  edit this file and add your tests then...
  • Benchmarks
$ go test -run=NONE -bench=ParseFields
  • Generate mocks
# go get github.com/golang/mock/mockgen
$ mockgen -package=iosupport_test -source=storage_service.go -destination=storage_service_mock_test.go

License

MIT

Contributing

  1. Fork it
  2. Create your feature branch (git checkout -b my-new-feature)
  3. Commit your changes (git commit -am 'Add some feature')
  4. Push to the branch (git push origin my-new-feature)
  5. Create new Pull Request

Documentation

Overview

Package iosupport reads and writes large text files. It can count the lines, bytes, characters and words contained in a given file, and it can parse and sort CSV files.

There are many kinds of CSV files; this package supports the format described in RFC 4180 except newlines in quoted fields.

A CSV file contains zero or more records of one or more fields per record. Each record is separated by the newline character. The final record may optionally be followed by a newline character.

field1,field2,field3

White space is considered part of a field.

Carriage returns before newline characters are silently removed.

Fields which start and stop with the quote character " are called quoted-fields. The beginning and ending quote are not part of the field.

The source:

normal string,"quoted-field"

results in the fields

{`normal string`, `quoted-field`}

Within a quoted-field a quote character followed by a second quote character is considered a single quote.

"the ""word"" is true","a ""quoted-field"""

results in

{`the "word" is true`, `a "quoted-field"`}
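The doubled-quote rule above is the same one RFC 4180 describes, so it can be checked against Go's standard encoding/csv reader. This is only a sketch that uses the standard library as a stand-in; it does not call this package's TsvParser, and parseRecord is a hypothetical helper name.

```go
package main

import (
	"encoding/csv"
	"fmt"
	"strings"
)

// parseRecord parses a single CSV line with the standard library reader.
// It is a stand-in used for illustration, not this package's parser.
func parseRecord(line string) ([]string, error) {
	r := csv.NewReader(strings.NewReader(line))
	return r.Read()
}

func main() {
	fields, err := parseRecord(`"the ""word"" is true","a ""quoted-field"""`)
	if err != nil {
		panic(err)
	}
	for _, f := range fields {
		fmt.Println(f) // each doubled quote is reduced to a single quote
	}
}
```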

Constants

const COMPARABLE_SEPARATOR = "\u0000"

COMPARABLE_SEPARATOR defines the separator inserted between indexed fields.

Variables

var (
	// LF -> linefeed
	LF byte = '\n'
	// CR -> carriage return
	CR byte = '\r'
)
var (
	// ErrBareQuote -> bare \" in non-quoted-field
	ErrBareQuote = errors.New("bare \" in non-quoted-field")
	// ErrQuote -> extraneous \" in field
	ErrQuote = errors.New("extraneous \" in field")
)
var CompareFunc = func(i, j TsvLine) bool {
	return i.Comparable < j.Comparable
}
var GetMemoryUsage = func() *HeapMemStat {
	initOnce.Do(initializeMemoryTracking)

	responseChannel := make(chan []TimedMemStats)
	proxyStatsRequestChannel <- responseChannel
	memUsages := TimedMemStatsToHeapMemStats(<-responseChannel)
	l := len(memUsages)
	if l == 0 {
		return &HeapMemStat{}
	} else {
		return memUsages[l-1]
	}
}

GetMemoryUsage returns the latest read memory usage.

Functions

func CountWords

func CountWords(str string) int

CountWords counts all words in the given string.

func TrackMemoryStatistics

func TrackMemoryStatistics(bufferSize int, sampleIntervalMs int64, memStatsRequestChannel <-chan chan []TimedMemStats, quitChannel chan bool)

TrackMemoryStatistics keeps track of runtime.MemStats. It samples memory statistics every [sampleIntervalMs] milliseconds and keeps the last [bufferSize] samples. The caller supplies a request/response channel to get recent memory statistics and a quit channel to stop polling. Parameters:

bufferSize:             number of samples to keep
sampleIntervalMs:       number of milliseconds to wait between polling
memStatsRequestChannel: request/reply channel to report back the most recent N samples
quitChannel:            once closed, all polling stops

func TrimNewline

func TrimNewline(line []byte) []byte

TrimNewline removes newline characters at the end of line.

func UnescapeSeparator

func UnescapeSeparator(separator string) byte

UnescapeSeparator cleans an escaped (composed) separator such as `\t` and returns it as a single byte.
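A separator passed on a command line usually arrives as the two characters `\` and `t` rather than as a real tab. One way to turn it into a byte is strconv.Unquote, as in the sketch below. This is an assumption about how such a helper could work, not the package's code; unescapeSeparator is hypothetical and, unlike UnescapeSeparator, also returns an error.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
)

// unescapeSeparator converts a separator such as `\t` or "," into one byte.
// Hypothetical helper written for illustration.
func unescapeSeparator(sep string) (byte, error) {
	if sep == "" {
		return 0, errors.New("empty separator")
	}
	// Interpret Go-style escapes by unquoting the value as a string literal.
	if s, err := strconv.Unquote(`"` + sep + `"`); err == nil && len(s) == 1 {
		return s[0], nil
	}
	// Fall back to the raw value, for example a lone double quote.
	if len(sep) == 1 {
		return sep[0], nil
	}
	return 0, fmt.Errorf("separator %q is not a single byte", sep)
}

func main() {
	for _, sep := range []string{`\t`, ",", `"`, ";;"} {
		b, err := unescapeSeparator(sep)
		fmt.Printf("%q -> %q, err=%v\n", sep, b, err)
	}
}
```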

Types

type FileReader

type FileReader interface {
	ReadAt(b []byte, off int64) (n int, err error)
	Seek(offset int64, whence int) (ret int64, err error)
	Name() string
	Close() error

	io.Reader
}

FileReader is the interface for files supported by Scanner.

type FileWriter

type FileWriter interface {
	Close() error

	io.Writer
}

FileWriter is the interface for output files, as accepted by TsvIndexer.Transfer.

type HDDStorageService

type HDDStorageService struct {
	// contains filtered or unexported fields
}

An HDDStorageService reads and writes blobs on the filesystem.

func NewHDDStorageService

func NewHDDStorageService(basepath string) *HDDStorageService

NewHDDStorageService instantiates a new HDDStorageService.

func (*HDDStorageService) EraseAll

func (s *HDDStorageService) EraseAll() error

EraseAll removes all stored data.

func (*HDDStorageService) Marshal

func (s *HDDStorageService) Marshal(key string, v interface{}) error

Marshal writes the blob encoding of v to the given key.

func (*HDDStorageService) Unmarshal

func (s *HDDStorageService) Unmarshal(key string, v interface{}) error

Unmarshal parses the blob-encoded data and stores the result in the value pointed to by v.

type HeapMemStat

type HeapMemStat struct {
	TimeMsAgo      int64 // ms ago (negative)
	SysKb          uint64
	HeapSysKb      uint64
	HeapAllocKb    uint64
	HeapIdleKb     uint64
	HeapReleasedKb uint64
}

HeapMemStat represents a snapshot of the heap memory at a given time.
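For orientation, a snapshot with these fields can be derived from runtime.MemStats. The sketch below assumes that the Kb suffix means bytes divided by 1024 and leaves out TimeMsAgo; heapSnapshot and takeSnapshot are hypothetical names.

```go
package main

import (
	"fmt"
	"runtime"
)

// heapSnapshot mirrors the size fields of HeapMemStat (hypothetical type).
type heapSnapshot struct {
	SysKb, HeapSysKb, HeapAllocKb, HeapIdleKb, HeapReleasedKb uint64
}

// takeSnapshot reads the runtime statistics and converts bytes to kilobytes,
// assuming 1 Kb = 1024 bytes.
func takeSnapshot() heapSnapshot {
	var m runtime.MemStats
	runtime.ReadMemStats(&m)
	return heapSnapshot{
		SysKb:          m.Sys / 1024,
		HeapSysKb:      m.HeapSys / 1024,
		HeapAllocKb:    m.HeapAlloc / 1024,
		HeapIdleKb:     m.HeapIdle / 1024,
		HeapReleasedKb: m.HeapReleased / 1024,
	}
}

func main() {
	fmt.Printf("%+v\n", takeSnapshot())
}
```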

func TimedMemStatsToHeapMemStats

func TimedMemStatsToHeapMemStats(timedMemStats []TimedMemStats) []*HeapMemStat

TimedMemStatsToHeapMemStats converts the input slice of TimedMemStats to a slice of HeapMemStat pointers.

type LineIterator

type LineIterator interface {
	// Next returns true if a next element is found.
	Next() bool
	// Value returns the current TsvLine.
	Value() TsvLine
	// Error allows to check if an error has occurred.
	Error() error
}

A LineIterator iterates over TsvLine values.
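The Next/Value/Error shape is typically consumed with a for loop followed by one error check. Here is a small sketch with a slice-backed iterator; row, sliceIterator and collectKeys are hypothetical stand-ins and do not come from the package.

```go
package main

import "fmt"

// row stands in for TsvLine in this sketch.
type row struct {
	Comparable string
	Offset     uint64
	Limit      uint32
}

// sliceIterator walks an in-memory slice using the Next/Value/Error shape.
type sliceIterator struct {
	rows []row
	pos  int
}

func (it *sliceIterator) Next() bool {
	if it.pos >= len(it.rows) {
		return false
	}
	it.pos++
	return true
}

// Value must only be called after Next has returned true.
func (it *sliceIterator) Value() row { return it.rows[it.pos-1] }

// Error is always nil here because reading from a slice cannot fail.
func (it *sliceIterator) Error() error { return nil }

// collectKeys drains the iterator, then checks Error once at the end.
func collectKeys(it *sliceIterator) ([]string, error) {
	var keys []string
	for it.Next() {
		keys = append(keys, it.Value().Comparable)
	}
	return keys, it.Error()
}

func main() {
	it := &sliceIterator{rows: []row{{Comparable: "a"}, {Comparable: "b"}}}
	keys, err := collectKeys(it)
	fmt.Println(keys, err)
}
```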

type MemStorageService

type MemStorageService struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

A MemStorageService reads and writes blobs in memory.

func NewMemStorageService

func NewMemStorageService() *MemStorageService

NewMemStorageService instantiates a new MemStorageService.

func (*MemStorageService) EraseAll

func (s *MemStorageService) EraseAll() error

EraseAll removes all stored data.

func (*MemStorageService) Marshal

func (s *MemStorageService) Marshal(key string, v interface{}) error

Marshal writes the blob encoding of v to the given key.

func (*MemStorageService) Unmarshal

func (s *MemStorageService) Unmarshal(key string, v interface{}) error

Unmarshal parses the blob-encoded data and stores the result in the value pointed to by v.

type Option

type Option func(*Options)

Option is a function used in the Functional Options pattern.
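For readers new to the pattern, this is a generic sketch of functional options. The settings type, the with* setters and the tab default are assumptions made for this example; the package's real Option functions are the ones listed below.

```go
package main

import "fmt"

// settings is a hypothetical configuration struct.
type settings struct {
	header    bool
	separator byte
	fields    []string
}

// option mutates a settings value, like Option does for Options.
type option func(*settings)

func withHeader() option             { return func(s *settings) { s.header = true } }
func withSeparator(b byte) option    { return func(s *settings) { s.separator = b } }
func withFields(f ...string) option  { return func(s *settings) { s.fields = f } }

// newSettings starts from defaults (the tab default is an assumption) and
// applies each setter in order, so later options override earlier ones.
func newSettings(setters ...option) *settings {
	s := &settings{separator: '\t'}
	for _, set := range setters {
		set(s)
	}
	return s
}

func main() {
	s := newSettings(withHeader(), withSeparator(','), withFields("col2", "col1"))
	fmt.Printf("%+v\n", *s)
}
```

The benefit of this design is that new settings can be added later without changing the constructor's signature.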

func DropEmptyIndexedFields

func DropEmptyIndexedFields() Option

DropEmptyIndexedFields removes the lines where the comparable is empty.

func Fields

func Fields(fields ...string) Option

Fields on which the TSV can be sorted.

func HasHeader

func HasHeader() Option

HasHeader declares that a header is present.

func Header

func Header(header bool) Option

Header sets whether a header is present.

func LazyQuotesMode

func LazyQuotesMode() Option

LazyQuotesMode allows lazy quotes in CSV.

func LineThreshold

func LineThreshold(threshold int) Option

LineThreshold defines the number of file seekers: one seeker is used per LineThreshold lines. More seekers increase the Transfer speed during sort. Default: 2500000.

func Separator

func Separator(separator string) Option

Separator of the TSV.

func SkipMalformattedLines

func SkipMalformattedLines() Option

SkipMalformattedLines ignores mal-formatted lines.

func SwapperOpts

func SwapperOpts(limit uint64, basepath string) Option

SwapperOpts configures the memory swapper of the TSV indexer, using the same limit and basepath arguments as NewSwapper.

type Options

type Options struct {
	Header                 bool
	Separator              byte
	Fields                 []string
	DropEmptyIndexedFields bool
	SkipMalformattedLines  bool
	LineThreshold          int
	Swapper                *Swapper
	LazyQuotes             bool
}

Options contains the settings used when processing a TSV.

type ParseError

type ParseError struct {
	Line   int   // Line where the error occurred
	Column int   // Column (rune index) where the error occurred
	Err    error // The actual error
}

A ParseError is returned for parsing errors. The first line is 1. The first column is 0.

func (*ParseError) Error

func (e *ParseError) Error() string

type Scanner

type Scanner struct {
	// contains filtered or unexported fields
}

Scanner holds the state needed for buffered reading of a file.

func NewScanner

func NewScanner(f FileReader) *Scanner

NewScanner instantiates a Scanner.

func (*Scanner) Bytes

func (s *Scanner) Bytes() []byte

Bytes returns the most recent line generated by a call to ScanLine. The underlying array may point to data that will be overwritten by a subsequent call to ScanLine. It does no allocation.

func (*Scanner) EachLine

func (s *Scanner) EachLine(fn func([]byte, error))

EachLine iterates over each line and executes the given function.

func (*Scanner) EachString

func (s *Scanner) EachString(fn func(string, error))

EachString iterates over each line as a string and executes the given function.

func (*Scanner) Err

func (s *Scanner) Err() error

Err returns the first non-EOF error that was encountered by the Scanner.

func (*Scanner) IsLineEmpty

func (s *Scanner) IsLineEmpty() bool

IsLineEmpty reports whether the current line is empty (only when the newline character is not kept).

func (*Scanner) KeepNewlineSequence

func (s *Scanner) KeepNewlineSequence(b bool)

KeepNewlineSequence keeps the newline sequence in read lines.

func (*Scanner) Limit

func (s *Scanner) Limit() uint32

Limit returns the byte length of the current line, including the newline sequence.

func (*Scanner) Line

func (s *Scanner) Line() int

Line returns the index of the current line.

func (*Scanner) NewlineSequence

func (s *Scanner) NewlineSequence() []byte

NewlineSequence returns the line terminator sequence found in the file when newlines are kept.

func (*Scanner) Offset

func (s *Scanner) Offset() uint64

Offset returns the byte offset of the current line.

func (*Scanner) ReadAt

func (s *Scanner) ReadAt(offset int64, limit int) ([]byte, error)

ReadAt reads limit bytes from the Scanner starting at byte offset offset. It returns the bytes read and the error, if any. ReadAt always returns a non-nil error when fewer than limit bytes are read. At end of scanner, that error is io.EOF.

func (*Scanner) Reset

func (s *Scanner) Reset()

Reset seeks to the start of the file and clears the buffer.

func (*Scanner) ScanLine

func (s *Scanner) ScanLine() bool

ScanLine advances the Scanner to the next line, which will then be available through the Bytes or Text method. It returns false when the scan stops, either by reaching the end of the input or an error. After ScanLine returns false, the Err method will return any error that occurred during scanning, except that if it was io.EOF, Err will return nil.

func (*Scanner) Text

func (s *Scanner) Text() string

Text returns the most recent line generated by a call to ScanLine as a newly allocated string holding its bytes.

type StorageService

type StorageService interface {
	// Marshal writes the blob encoding of v to the given key.
	Marshal(key string, v interface{}) error
	// Unmarshal parses the blob-encoded data and stores the result in the value pointed to by v.
	Unmarshal(key string, v interface{}) error
	// EraseAll removes all stored data.
	EraseAll() error
}

A StorageService stores blobs.
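To make the contract concrete, here is a minimal in-memory type with the same three methods. It is a sketch under stated assumptions: memStore is hypothetical, and encoding/json is used as the blob encoding only for simplicity (the package's own services may encode differently).

```go
package main

import (
	"encoding/json"
	"fmt"
	"sync"
)

// memStore keeps encoded blobs in a map guarded by a read/write mutex.
type memStore struct {
	mu    sync.RWMutex
	blobs map[string][]byte
}

func newMemStore() *memStore {
	return &memStore{blobs: make(map[string][]byte)}
}

// Marshal encodes v (as JSON in this sketch) and stores it under key.
func (s *memStore) Marshal(key string, v interface{}) error {
	b, err := json.Marshal(v)
	if err != nil {
		return err
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	s.blobs[key] = b
	return nil
}

// Unmarshal decodes the blob stored under key into the value pointed to by v.
func (s *memStore) Unmarshal(key string, v interface{}) error {
	s.mu.RLock()
	b, ok := s.blobs[key]
	s.mu.RUnlock()
	if !ok {
		return fmt.Errorf("key %q not found", key)
	}
	return json.Unmarshal(b, v)
}

// EraseAll removes all stored data.
func (s *memStore) EraseAll() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.blobs = make(map[string][]byte)
	return nil
}

func main() {
	store := newMemStore()
	if err := store.Marshal("dump-0", []string{"a", "b"}); err != nil {
		panic(err)
	}
	var got []string
	if err := store.Unmarshal("dump-0", &got); err != nil {
		panic(err)
	}
	fmt.Println(got)
}
```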

type Swapper

type Swapper struct {

	// Storage is the handler that stores all the dumps.
	Storage StorageService
	// ChunkSize returns the number of elements per chunk within a dump.
	ChunkSize func(nbOfElements int) int
	// contains filtered or unexported fields
}

A Swapper dumps the current index to the filesystem when the memory limit is reached.

func NewNullSwapper

func NewNullSwapper() *Swapper

NewNullSwapper instantiates a new Swapper without memory limit.

func NewSwapper

func NewSwapper(limit uint64, basepath string) *Swapper

NewSwapper instantiates a new Swapper with a memory limit in bytes.

func (*Swapper) EraseAll

func (s *Swapper) EraseAll() error

EraseAll removes all stored data.

func (*Swapper) HasSwapped

func (s *Swapper) HasSwapped() bool

HasSwapped returns true if there is at least one dump.

func (*Swapper) IsTimeToSwap

func (s *Swapper) IsTimeToSwap(elements TsvLines) bool

IsTimeToSwap returns true when the memory limit is reached.

func (*Swapper) KeepWithoutSwap

func (s *Swapper) KeepWithoutSwap(elements TsvLines)

KeepWithoutSwap is used to track TsvLines from the caller and provide an in memory read iterator. Use this function instead of Swap when you do not want to dump memory.

func (*Swapper) NbOfDumps

func (s *Swapper) NbOfDumps() int

NbOfDumps returns the number of processed dumps.

func (*Swapper) ReadIterator

func (s *Swapper) ReadIterator() LineIterator

ReadIterator returns an iterator on stored dumps.

func (*Swapper) Swap

func (s *Swapper) Swap(elements TsvLines) error

Swap dumps the given TsvLines to the configured StorageService.

type TimedMemStats

type TimedMemStats struct {
	// the unix time (seconds) the stats were polled
	TimeEpochMs int64
	// memory statistics
	MemStats runtime.MemStats
}

TimedMemStats represents a MemStats reading with the time it was taken.

type TsvIndexer

type TsvIndexer struct {
	*Options

	FieldsIndex map[string]int
	Lines       TsvLines
	// contains filtered or unexported fields
}

TsvIndexer holds everything needed to index and sort the columns of a TSV.

func NewTsvIndexer

func NewTsvIndexer(scannerFunc func() *Scanner, setters ...Option) *TsvIndexer

NewTsvIndexer instantiates a new TsvIndexer.

func (*TsvIndexer) Analyze

func (ti *TsvIndexer) Analyze() error

Analyze parses the TSV and generates the indexes.

func (*TsvIndexer) CloseIO

func (ti *TsvIndexer) CloseIO()

CloseIO closes all opened IO.

func (*TsvIndexer) Sort

func (ti *TsvIndexer) Sort()

Sort sorts TsvLine on its comparables.

func (*TsvIndexer) Transfer

func (ti *TsvIndexer) Transfer(output FileWriter) error

Transfer writes sorted TSV into a new file.
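The Analyze, Sort and Transfer steps amount to an index sort: only a small record per row (key, offset, length) is sorted, and rows are then copied by offset. The sketch below shows that idea on an in-memory string with the first column as key. All names are hypothetical, quoting is ignored, and it is not the package's implementation.

```go
package main

import (
	"bytes"
	"fmt"
	"sort"
	"strings"
)

// line mirrors the shape of TsvLine: a sort key plus the byte range of a row.
type line struct {
	key    string
	offset int64
	limit  int
}

// buildIndex records, for every row, its first field and its byte range.
func buildIndex(data string) []line {
	var idx []line
	var off int64
	for _, row := range strings.SplitAfter(data, "\n") {
		if row == "" {
			continue
		}
		key := strings.SplitN(strings.TrimRight(row, "\r\n"), ",", 2)[0]
		idx = append(idx, line{key: key, offset: off, limit: len(row)})
		off += int64(len(row))
	}
	return idx
}

// transfer copies the rows in index order, reading each one by offset.
func transfer(data string, idx []line) (string, error) {
	src := strings.NewReader(data)
	var out bytes.Buffer
	for _, l := range idx {
		buf := make([]byte, l.limit)
		if _, err := src.ReadAt(buf, l.offset); err != nil {
			return "", err
		}
		out.Write(buf)
	}
	return out.String(), nil
}

// sortRows runs the three steps: index, sort the index, transfer.
func sortRows(data string) (string, error) {
	idx := buildIndex(data)
	sort.Slice(idx, func(i, j int) bool { return idx[i].key < idx[j].key })
	return transfer(data, idx)
}

func main() {
	out, err := sortRows("b,2\na,1\nc,3\n")
	if err != nil {
		panic(err)
	}
	fmt.Print(out)
}
```

Because only the index is held in memory, the rows themselves can stay on disk until the final copy.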

type TsvLine

type TsvLine struct {
	Comparable string
	Offset     uint64
	Limit      uint32
}

TsvLine describes the line's details from a TSV.

func (*TsvLine) CodecDecodeSelf

func (x *TsvLine) CodecDecodeSelf(d *codec1978.Decoder)

func (*TsvLine) CodecEncodeSelf

func (x *TsvLine) CodecEncodeSelf(e *codec1978.Encoder)

type TsvLines

type TsvLines []TsvLine

TsvLines is a slice of TsvLine. Its Len, Less and Swap methods satisfy sort.Interface.

func (*TsvLines) CodecDecodeSelf

func (x *TsvLines) CodecDecodeSelf(d *codec1978.Decoder)

func (TsvLines) CodecEncodeSelf

func (x TsvLines) CodecEncodeSelf(e *codec1978.Encoder)

func (TsvLines) Len

func (slice TsvLines) Len() int

func (TsvLines) Less

func (slice TsvLines) Less(i, j int) bool

func (TsvLines) Swap

func (slice TsvLines) Swap(i, j int)

type TsvParser

type TsvParser struct {
	*Scanner

	Separator  byte
	QuoteChar  byte
	LazyQuotes bool // allow lazy quotes
	// contains filtered or unexported fields
}

A TsvParser reads records from a TSV-encoded file. As returned by NewTsvParser, a TsvParser expects input conforming to RFC 4180 (except the warning section).

If LazyQuotes is true, a quote may appear in an unquoted field and a non-doubled quote may appear in a quoted field.

/!\ Warning:

- It does not support `\r\n` in quoted fields.

- It does not support comments.

func NewTsvParser

func NewTsvParser(sc *Scanner, separator byte) *TsvParser

NewTsvParser instantiates a new TsvParser.

func (*TsvParser) Err

func (tp *TsvParser) Err() error

Err returns the first non-EOF error that was encountered by the Scanner.

func (*TsvParser) Reset

func (tp *TsvParser) Reset()

Reset resets the parser and its underlying scanner, freeing the memory.

func (*TsvParser) Row

func (tp *TsvParser) Row() [][]byte

Row returns a slice of fields for the current row.

func (*TsvParser) ScanRow

func (tp *TsvParser) ScanRow() bool

ScanRow advances the TSV parser to the next row.

func (*TsvParser) SyncConfig

func (tp *TsvParser) SyncConfig()

SyncConfig synchronizes the internal configuration to the Separator and QuoteChar attributes.

type WordCount

type WordCount struct {
	Opts  *WordCountOptions // Defines what is counted
	Bytes int               // Bytes counter
	Chars int               // Chars counter
	Words int               // Words counter
	Lines int               // Lines counter
	// contains filtered or unexported fields
}

A WordCount counts bytes, chars, words and lines from a given file.

func NewWordCount

func NewWordCount(f FileReader) *WordCount

NewWordCount instantiates a new WordCount for the given file.

func (*WordCount) Perform

func (wc *WordCount) Perform() error

Perform starts the count.

type WordCountOptions

type WordCountOptions struct {
	CountByte bool
	CountChar bool
	CountWord bool
	CountLine bool
}

WordCountOptions lets you define which things to count.

func NewWordCountOptions

func NewWordCountOptions() *WordCountOptions

NewWordCountOptions instantiates a new WordCountOptions.

Directories

Path Synopsis
examples
