Documentation ¶
Overview ¶
Package dstream provides facilities for manipulating streams of typed, multivariate data. A Dstream is a data container that holds a rectangular array of data in which the columns are variables and the rows are cases or observations. The dstream framework facilitates processing data of this type in a streaming manner, with a focus on feeding the data into statistical modeling tools.
The data held by a Dstream are retrieved as chunks of contiguous rows. Within each chunk, the data are stored column-wise. A Dstream visits its chunks in order. When processing a Dstream, call Next to advance to the next chunk, then call Get or GetPos to retrieve the data for one column.
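The iteration pattern above can be sketched as follows. This is a fragment, not a complete program: it assumes an existing Dstream value `ds` holding a float64 variable named "x" (both names are hypothetical).

```go
// A minimal sketch of the chunk-iteration pattern, assuming ds is a
// Dstream with a float64 variable named "x".
ds.Reset() // start from the first chunk
for ds.Next() {
	x := ds.Get("x").([]float64) // data for "x" in the current chunk
	for _, v := range x {
		_ = v // process each value
	}
}
ds.Close()
```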
Most operations on Dstreams take the form of a transformation d = f(d), where d is a Dstream. Many transformations are defined in the package, and it is easy to add new transformations. Transformations can be composed to produce complex transformations from simple components. Examples of transformations are Mutate (modify a column in-place) and DropCols (drop one or more columns from the Dstream).
Index ¶
- func CheckValid(data Dstream) bool
- func Describe(data Dstream) map[string]Stats
- func Equal(x, y Dstream) bool
- func EqualReport(x, y Dstream, report bool) bool
- func GetCol(da Dstream, na string) interface{}
- func GetColPos(da Dstream, j int) interface{}
- func NewBCols(path string, chunksize int) *bcols
- func Save(ds Dstream, w io.Writer)
- func VarMap(d Dstream) map[string]interface{}
- func VarPos(d Dstream) map[string]int
- func VarTypes(d Dstream) map[string]string
- type BColsWriter
- type CSVReader
- func (cs *CSVReader) ChunkSize(c int) *CSVReader
- func (cs *CSVReader) Close()
- func (cs *CSVReader) Comma(c rune) *CSVReader
- func (cs *CSVReader) Done() Dstream
- func (cs *CSVReader) Get(na string) interface{}
- func (cs *CSVReader) GetPos(j int) interface{}
- func (cs *CSVReader) HasHeader() *CSVReader
- func (cs *CSVReader) LimitChunk(n int) *CSVReader
- func (cs *CSVReader) Names() []string
- func (cs *CSVReader) Next() bool
- func (cs *CSVReader) NumObs() int
- func (cs *CSVReader) NumVar() int
- func (cs *CSVReader) ParseTime(f func(string) time.Time) *CSVReader
- func (cs *CSVReader) Reset()
- func (cs *CSVReader) SetTypes(types []VarType) *CSVReader
- func (cs *CSVReader) SkipErrors() *CSVReader
- type CSVWriter
- type DataFrame
- type Dstream
- func AddCol(da Dstream, newdat []float64, newname string) Dstream
- func Center(source Dstream, names ...string) Dstream
- func ConcatHorizontal(streams ...Dstream) Dstream
- func ConcatVertical(streams ...Dstream) Dstream
- func Convert(da Dstream, vname string, dtype Dtype) Dstream
- func Dechunk(source Dstream) Dstream
- func DiffChunk(data Dstream, order map[string]int) Dstream
- func DropCols(data Dstream, dropvars ...string) Dstream
- func DropNA(data Dstream) Dstream
- func Filter(data Dstream, f FilterFunc) Dstream
- func Generate(data Dstream, name string, fnc GenerateFunc, dtype Dtype) Dstream
- func LagChunk(data Dstream, lags map[string]int) Dstream
- func Linapply(data Dstream, coeffs [][]float64, basename string) Dstream
- func MaxChunkSize(data Dstream, size int) Dstream
- func MemCopy(data Dstream, reset bool) Dstream
- func Mutate(ds Dstream, name string, f MutateFunc) Dstream
- func NewFromArrays(data [][]interface{}, names []string) Dstream
- func NewFromFlat(data []interface{}, names []string) Dstream
- func NewLoad(r io.Reader) Dstream
- func Regroup(ds Dstream, groupvar string, sortchunks bool) Dstream
- func ReplaceColumn(data Dstream, name string, coldata interface{}) Dstream
- func Segment(data Dstream, vars ...string) Dstream
- func SelectCols(data Dstream, keepvars ...string) Dstream
- func Shallow(data Dstream) Dstream
- type Dtype
- type FilterFunc
- type GenerateFunc
- type Join
- type MutateFunc
- type Stats
- type VarType
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CheckValid ¶
CheckValid runs through the chunks and confirms that the lengths of the slices within each chunk are the same. If CheckValid returns false, the dstream is in a corrupted state. On completion, the dstream is in its initial state.
func EqualReport ¶
EqualReport compares two Dstream values. If they are not equal, further information is written to the standard error stream. Equality here implies that the data values, types, order, and chunk boundaries are all identical.
func GetCol ¶
GetCol returns a copy of the data for one variable. The data are returned as a slice. The column is returned starting with the current chunk; call Reset first to ensure that the column is extracted from the first chunk.
func GetColPos ¶
GetColPos returns a copy of the data for one variable, identified by position. The data are returned as a slice, which is a copy of the underlying data.
func NewBCols ¶
NewBCols takes data stored in a column-wise compressed format under the given directory path, and returns it as a Dstream. 'path' is the directory containing the data, and 'chunksize' is the number of values included in each chunk.
The underlying bcols format is simple. Each column of data is stored in its own file, in binary native format, compressed using either gzip or snappy compression. A file called 'dtypes.json' contains a dictionary mapping variable names to data types.
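The layout described above can be sketched with the standard library. This is an illustration only: the file name "x.bin.gz", the use of little-endian byte order, and the exact dtypes.json keys are assumptions made for the sketch, not the package's documented conventions.

```go
package main

import (
	"compress/gzip"
	"encoding/binary"
	"fmt"
	"os"
	"path/filepath"
)

func main() {
	dir, _ := os.MkdirTemp("", "bcols")
	defer os.RemoveAll(dir)

	// Write one float64 column in binary format, gzip-compressed,
	// the way the bcols layout stores each variable in its own file.
	vals := []float64{1, 2, 3}
	f, _ := os.Create(filepath.Join(dir, "x.bin.gz"))
	z := gzip.NewWriter(f)
	binary.Write(z, binary.LittleEndian, vals)
	z.Close()
	f.Close()

	// dtypes.json maps variable names to data types.
	os.WriteFile(filepath.Join(dir, "dtypes.json"),
		[]byte(`{"x": "float64"}`), 0644)

	// Read the column back to confirm the round trip.
	g, _ := os.Open(filepath.Join(dir, "x.bin.gz"))
	r, _ := gzip.NewReader(g)
	got := make([]float64, 3)
	binary.Read(r, binary.LittleEndian, got)
	fmt.Println(got)
}
```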
Types ¶
type BColsWriter ¶
type BColsWriter struct {
// contains filtered or unexported fields
}
BColsWriter writes a dstream to disk in bcols format.
func NewBColsWriter ¶
func NewBColsWriter(d Dstream) *BColsWriter
NewBColsWriter creates a new BColsWriter that writes the given dstream.
func (*BColsWriter) Path ¶
func (bw *BColsWriter) Path(p string) *BColsWriter
Path sets the location (a directory path) to which the data are written.
type CSVReader ¶
type CSVReader struct {
// contains filtered or unexported fields
}
CSVReader supports reading a Dstream from an io.Reader.
func FromCSV ¶
FromCSV returns a Dstream that reads from a CSV source. Call SetTypes to define the variables to be retrieved. For further configuration, chain calls to the other configuration methods, and finally call Done to produce the Dstream.
Example ¶
data := `Food,Type,Weight,Price
Banana,Fruit,13,9
Cucumber,Vegetable,15,5
Cheese,Dairy,12,35
Lamb,Meat,40,76
`

// Here we read from an in-memory byte buffer,
// but this can be any io.Reader, e.g. a file.
b := bytes.NewBuffer([]byte(data))

types := []VarType{
	{"Food", String},
	{"Type", String},
	{"Weight", Float64},
}

da := FromCSV(b).SetTypes(types).HasHeader().Done()

da.Next() // Always call Next before first call to Get or GetPos

y := da.Get("Type").([]string)
fmt.Printf("%v\n", y)

x := da.Get("Weight").([]float64)
fmt.Printf("%v\n", x)

Output:

[Fruit Vegetable Dairy Meat]
[13 15 12 40]
func (*CSVReader) ChunkSize ¶
ChunkSize sets the size of chunks for this Dstream; it can only be called before reading begins.
func (*CSVReader) Close ¶
func (cs *CSVReader) Close()
Close does nothing and is implemented to satisfy the Dstream interface. If any io.Reader values passed to FromCSV need closing, they should be closed by the caller.
func (*CSVReader) Comma ¶
Comma sets the rune used as the field delimiter for the CSVReader. The default delimiter is a comma.
func (*CSVReader) Done ¶
Done is called when all configuration is complete. After calling Done, the Dstream can be used.
func (*CSVReader) HasHeader ¶
HasHeader indicates that the first row of the data file contains column names. The default behavior is that there is no header.
func (*CSVReader) LimitChunk ¶
LimitChunk sets the number of chunks to read.
func (*CSVReader) NumObs ¶
NumObs returns the number of observations in the dstream. If the dstream has not been fully read, it returns -1.
func (*CSVReader) Reset ¶
func (cs *CSVReader) Reset()
Reset attempts to reset the Dstream that is reading from an io.Reader. This is only possible if the underlying reader is seekable, so Reset panics if the seek cannot be performed.
func (*CSVReader) SetTypes ¶
SetTypes specifies the types of the variables. If the CSV file has a header, these values may appear in any order, and variables not included in types are omitted. If the CSV file does not have a header, then types must match the columns in the file, in the correct order.
func (*CSVReader) SkipErrors ¶
SkipErrors causes lines with unparseable CSV content to be skipped (the csv.ParseError is printed to standard output).
type CSVWriter ¶
type CSVWriter struct {
// contains filtered or unexported fields
}
CSVWriter supports writing a Dstream to an io.Writer in csv format.
func ToCSV ¶
ToCSV writes a Dstream in CSV format. Call SetWriter or Filename to configure the underlying writer, then call additional methods for customization as desired, and finally call Done to complete the writing.
Example ¶
data := `Food,Type,Weight,Price
Banana,Fruit,13,9
Cucumber,Vegetable,15,5
Cheese,Dairy,12,35
Lamb,Meat,40,76
`

b := bytes.NewBuffer([]byte(data))

types := []VarType{
	{"Food", String},
	{"Type", String},
	{"Weight", Float64},
}

da := FromCSV(b).SetTypes(types).HasHeader().Done()

var buf bytes.Buffer
_ = ToCSV(da).SetWriter(&buf).FloatFmt("%.0f").Done()

fmt.Printf("%s\n", buf.String())

Output:

Food,Type,Weight
Banana,Fruit,13
Cucumber,Vegetable,15
Cheese,Dairy,12
Lamb,Meat,40
func (*CSVWriter) FloatFmt ¶
FloatFmt sets the format string to be used when writing float values. This value is ignored for columns specified in a call to the Formats method.
type DataFrame ¶
type DataFrame struct {
// contains filtered or unexported fields
}
DataFrame is an implementation of Dstream based on sharded arrays.
func (*DataFrame) Next ¶
Next advances to the next chunk and returns true if successful. If there are no more chunks, it returns false.
func (*DataFrame) NumObs ¶
NumObs returns the number of observations in the DataFrame, if known. If the number of observations is not known, it returns -1.
type Dstream ¶
type Dstream interface {
	// Next attempts to advance to the next chunk and returns true
	// if successful.
	Next() bool

	// Names returns the variable names.
	Names() []string

	// Get returns the values for one variable in the current
	// chunk, referring to the variable by name.
	Get(string) interface{}

	// GetPos returns the values for one variable in the current
	// chunk, referring to the variable by position.
	GetPos(int) interface{}

	// NumVar returns the number of variables in the data set.
	NumVar() int

	// NumObs returns the number of rows in the data set; it may
	// return -1 if not known.
	NumObs() int

	// Reset sets the provider so that the data are read from the
	// beginning of the dataset.
	Reset()

	// Close frees any resources such as file handles used by the dstream.
	Close()
}
Dstream streams chunks of data to a consumer.
func AddCol ¶
AddCol appends a new column of data to a Dstream. The new data is provided as a single array.
func Center ¶
Center returns a new Dstream in which the given columns have been mean-centered. Currently only works with float64 type data.
func ConcatHorizontal ¶
ConcatHorizontal concatenates a collection of Dstreams horizontally. The column names of all the Dstreams being combined must be distinct.
func ConcatVertical ¶
ConcatVertical concatenates a collection of Dstreams vertically (appending additional observations). The column names and data types of all the Dstreams being combined must be identical.
func Convert ¶
Convert returns a Dstream in which the named variable is converted to the given type.
func DiffChunk ¶
DiffChunk returns a new Dstream in which specified variables are differenced. The differenced values are only computed within a chunk, not across chunk boundaries, and the first d values of each chunk are omitted, where d is the maximum differencing order.
Example ¶
data := `V1,V2,V3,V4
1,2,3,4
1,0,4,5
2,4,5,6
3,0,6,8
3,1,5,9
`

b := bytes.NewBuffer([]byte(data))

types := []VarType{
	{"V1", Float64},
	{"V2", Float64},
	{"V3", Float64},
	{"V4", Float64},
}

da := FromCSV(b).SetTypes(types).HasHeader().Done()
da = DiffChunk(da, map[string]int{"V2": 1, "V4": 2})

for da.Next() {
	y := da.Get("V2$d1")
	fmt.Printf("%v\n", y)
	y = da.Get("V4$d2")
	fmt.Printf("%v\n", y)
}

Output:

[4 -4 1]
[0 1 -1]
func Filter ¶
func Filter(data Dstream, f FilterFunc) Dstream
Filter selects rows from the dstream. A filtering function determines which rows are selected.
Example ¶
data := `V1,V2,V3,V4
1,2,3,4
2,0,4,5
3,4,5,6
4,0,6,7
`

// A filtering function, selects if not equal to 0.
f := func(v map[string]interface{}, b []bool) {
	x := v["V2"].([]float64)
	for i := range x {
		b[i] = x[i] != 0
	}
}

types := []VarType{
	{"V1", Float64},
	{"V2", Float64},
	{"V3", Float64},
	{"V4", Float64},
}

b := bytes.NewBuffer([]byte(data))
da := FromCSV(b).SetTypes(types).HasHeader().Done()
da = Filter(da, f)

da.Next() // Always call Next before first call to Get or GetPos

y := da.Get("V1")
fmt.Printf("%v\n", y)

Output:

[1 3]
func Generate ¶
func Generate(data Dstream, name string, fnc GenerateFunc, dtype Dtype) Dstream
Generate appends a new variable to a Dstream, obtaining its values by applying the given function to the other variables in the Dstream. The new variable must not already exist in the Dstream.
Example ¶
data := `V1,V2,V3,V4
1,2,3,4
1,0,4,5
2,4,5,6
3,0,6,7
`

f := func(v map[string]interface{}, x interface{}) {
	v1 := v["V1"].([]float64)
	v2 := v["V2"].([]float64)
	y := x.([]float64)
	for i := range v1 {
		y[i] = v1[i] + v2[i]
	}
}

b := bytes.NewBuffer([]byte(data))

types := []VarType{
	{"V1", Float64},
	{"V2", Float64},
	{"V3", Float64},
	{"V4", Float64},
}

da := FromCSV(b).SetTypes(types).HasHeader().Done()
da = Generate(da, "V1p2", f, Float64)

for da.Next() {
	y := da.Get("V1p2")
	fmt.Printf("%v\n", y)
}

Output:

[3 1 6 3]
func LagChunk ¶
LagChunk returns a new Dstream in which specified variables are included with lagged values. Lagged values are only computed within a chunk, not across chunk boundaries, and the first m values of each chunk are omitted, where m is the maximum lag value.
Example ¶
data := `1,2,3,4
2,3,4,5
3,4,5,6
4,5,6,7
`

b := bytes.NewBuffer([]byte(data))

types := []VarType{
	{"V1", Float64},
	{"V2", Float64},
	{"V3", Float64},
	{"V4", Float64},
}

da := FromCSV(b).SetTypes(types).Done()
da = LagChunk(da, map[string]int{"V2": 2})

da.Next() // Always call Next before first call to Get or GetPos

y := da.Get("V2[0]")
fmt.Printf("%v\n", y)
y = da.Get("V2[-1]")
fmt.Printf("%v\n", y)

Output:

[4 5]
[3 4]
func Linapply ¶
Linapply adds new variables to Dstream by taking linear combinations of the other variables in the Dstream.
func MaxChunkSize ¶
MaxChunkSize splits the chunks of the input Dstream so that no chunk has more than size rows.
func MemCopy ¶
MemCopy returns a Dstream that copies the provided Dstream into in-memory storage. Pass reset as true to reset the source data before copying.
func Mutate ¶
func Mutate(ds Dstream, name string, f MutateFunc) Dstream
Mutate returns a Dstream in which the variable with the given name is transformed using the given function.
Example ¶
data := `V1,V2,V3,V4
1,2,3,4
2,3,4,5
3,4,5,6
4,5,6,7
`

// A mutating function, scales all values by 2.
timesTwo := func(x interface{}) {
	v := x.([]float64)
	for i := range v {
		v[i] *= 2
	}
}

types := []VarType{
	{"V1", Float64},
	{"V2", Float64},
	{"V3", Float64},
	{"V4", Float64},
}

b := bytes.NewBuffer([]byte(data))
da := FromCSV(b).SetTypes(types).HasHeader().Done()
da = Mutate(da, "V2", timesTwo)

da.Next() // Always call Next before first call to Get or GetPos

y := da.Get("V2")
fmt.Printf("%v\n", y)

Output:

[4 6 8 10]
func NewFromArrays ¶
NewFromArrays creates a Dstream from raw data stored as slices; data[i][j] is the data for the i^th variable in the j^th chunk.
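The indexing convention above can be illustrated with a short sketch. This is a fragment in the style of the package's examples; the variable names "x" and "y" are chosen for illustration.

```go
// data[i][j] is variable i, chunk j: here two variables, each
// split into two chunks of two rows.
data := [][]interface{}{
	{[]float64{1, 2}, []float64{3, 4}},       // variable "x"
	{[]string{"a", "b"}, []string{"c", "d"}}, // variable "y"
}
ds := NewFromArrays(data, []string{"x", "y"})

for ds.Next() {
	fmt.Printf("%v\n", ds.Get("x"))
}
```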
func NewFromFlat ¶
NewFromFlat creates a Dstream from raw data stored as contiguous (flat) arrays. data[i] is the data for the i^th variable; it is a slice of fixed-width values, e.g. a []float64.
func NewLoad ¶
NewLoad returns a dstream that loads data from the given file. The file must have been created using the Save function.
func Regroup ¶
Regroup creates a new Dstream with the same rows as the provided Dstream, but with the chunks defined by the values of a provided id variable. The resulting Dstream has one chunk for each distinct level of the id variable, containing all the rows of the input Dstream with that id value. The id variable must have uint64 type.
Example ¶
data := `V1,V2,V3
1,2,3
3,3,4
2,4,5
2,5,6
5,2,3
0,3,4
1,4,5
5,5,6
`

b := bytes.NewBuffer([]byte(data))

types := []VarType{
	{"V1", Float64},
	{"V2", Float64},
	{"V3", Float64},
}

d := FromCSV(b).SetTypes(types).HasHeader().Done()
d = Convert(d, "V1", Uint64)
d = Regroup(d, "V1", true)

for d.Next() {
	fmt.Printf("%v\n", d.GetPos(0))
	fmt.Printf("%v\n", d.GetPos(1))
	fmt.Printf("%v\n\n", d.GetPos(2))
}

Output:

[0]
[3]
[4]

[1 1]
[2 4]
[3 5]

[2 2]
[4 5]
[5 6]

[3]
[3]
[4]

[5 5]
[2 5]
[3 6]
func ReplaceColumn ¶
ReplaceColumn returns a new Dstream in which the column with the given name is replaced with the given data. The coldata value must be a slice of a valid primitive type (e.g. int, float64, string), and must have length equal to the number of rows of data.
func Segment ¶
Segment restructures the chunks of a Dstream so that chunk boundaries are determined by any change in the consecutive values of a specified set of variables.
Example ¶
data := `V1,V2,V3,V4
1,2,3,4
1,0,4,5
2,4,5,6
3,0,6,7
`

types := []VarType{
	{"V1", Float64},
	{"V2", Float64},
	{"V3", Float64},
	{"V4", Float64},
}

b := bytes.NewBuffer([]byte(data))
da := FromCSV(b).SetTypes(types).HasHeader().Done()
da = Segment(da, "V1")

for da.Next() {
	y := da.Get("V2")
	fmt.Printf("%v\n", y)
}

Output:

[2 0]
[4]
[0]
func SelectCols ¶
SelectCols retains only the given variables in a Dstream.
type FilterFunc ¶
FilterFunc is a filtering function for use with Filter. The first argument holds a map from variable names to data slices. The second argument is a boolean slice that is initialized to all 'true'. The FilterFunc should set elements of the boolean slice to false wherever the corresponding dstream record should be excluded.
type GenerateFunc ¶
type GenerateFunc func(map[string]interface{}, interface{})
GenerateFunc is a function that can be used to generate a new variable from existing variables. The first argument is a map from variable names to data (whose concrete types are slices held as empty interfaces). The second argument is a pre-allocated array (a slice provided as an interface{}) into which the new variable's values are to be written. Note that the destination array is not set to zeros before passing to the generate function.
type Join ¶
type Join struct {
	// A sequence of segmented Dstreams to advance in unison.
	Data []Dstream

	// Status[j] means that the id variable for Data value j is
	// equal to the id variable for Data value 0. Status[0] is
	// not used.
	Status []bool
	// contains filtered or unexported fields
}
Join performs a streaming join on several Dstreams that have been segmented by id variables. If join has type Join, then join.Data[i] is the current chunk of the i^th stream. All streams being joined must have been segmented by an id variable whose values are ascending. The id variable must have type uint64.
A call to the Next method always advances the first stream (Data[0]) by one chunk. The other elements of Data are advanced until their id variable is equal to (if possible) or greater than the id variable of Data[0]. If equality is achieved, the corresponding element of join.Status is set to true. join.Status[0] is always false and has no meaning.
The dstream values to be joined must be segmented so that the id variable is constant within chunks, and increases in numeric value with subsequent calls to the Next method.
Example ¶
data1 := `V1,V2,V3,V4
1,2,3,4
1,3,4,5
3,4,5,6
3,5,6,7
`
data2 := `V1,V2,V3
1,2,3
1,3,4
1,4,5
3,5,6
`
data3 := `V1,V2,V3,V4,V5
2,2,3,5,6
2,3,4,7,5
3,4,5,3,4
4,5,6,2,3
`

names := [][]string{
	{"V1", "V2", "V3", "V4"},
	{"V1", "V2", "V3"},
	{"V1", "V2", "V3", "V4", "V5"},
}

var da []Dstream
for j, data := range []string{data1, data2, data3} {
	b := bytes.NewBuffer([]byte(data))
	var types []VarType
	for _, na := range names[j] {
		types = append(types, VarType{na, Float64})
	}
	d := FromCSV(b).SetTypes(types).HasHeader().Done()
	d = Convert(d, "V1", Uint64)
	d = Segment(d, "V1")
	da = append(da, d)
}

join := NewJoin(da, []string{"V1", "V1", "V1"})

for join.Next() {
	fmt.Printf("%v\n", da[0].GetPos(0))
	if join.Status[1] {
		fmt.Printf("%v\n", da[1].GetPos(0))
	}
	if join.Status[2] {
		fmt.Printf("%v\n\n", da[2].GetPos(0))
	}
}

Output:

[1 1]
[1 1 1]
[3 3]
[3]
[3]
func NewJoin ¶
NewJoin creates a Join of the given Dstreams, using the variables in names as ids. The Dstreams in data must be segmented by these id variables before calling NewJoin.
func (*Join) Next ¶
Next advances to the next chunk. The first dstream, which is contained in join.Data[0], always advances to the next sequential value of its id variable. The other dstreams (join.Data[j] for j > 0) advance until their id variables are equal to or greater than the id variable for the current chunk of join.Data[0]. The status field (join.Status) indicates which dstreams in the join are currently on the same id value as the first dstream (join.Data[0]).
type MutateFunc ¶
type MutateFunc func(interface{})
MutateFunc is a function that can be used to change the values of a variable in-place.
type Stats ¶
type Stats struct {
	// The mean value
	Mean float64

	// The minimum value
	Min float64

	// The maximum value
	Max float64

	// The standard deviation of the values
	SD float64

	// The number of non inf/nan values
	N int

	// The number of NaN values
	NaN int

	// The number of Inf values
	Inf int
}
Stats contains summary statistics for a float64 Dstream variable.
Source Files ¶
- addcol.go
- base.go
- bcols.go
- bcols_gen.go
- center.go
- comparisons.go
- comparisons_gen.go
- concathorizontal.go
- concatvertical.go
- convert.go
- convert_gen.go
- dechunk_gen.go
- describe.go
- diffchunk.go
- doc.go
- drop.go
- dropna.go
- filtercol.go
- filtercol_gen.go
- generate.go
- generate_gen.go
- join.go
- lagchunk.go
- linapply.go
- loadsave_gen.go
- memcopy_gen.go
- mutate.go
- regroup.go
- regroup_gen.go
- replace.go
- replace_gen.go
- segment.go
- segment_gen.go
- select_cols.go
- sizechunk.go
- streamcsv.go
- streamcsv_gen.go
- utils.go
- utils_gen.go
- xform.go