morebeam: github.com/bramp/morebeam/csvio

package csvio

import "github.com/bramp/morebeam/csvio"

Package csvio contains Apache Beam transforms for reading CSV files with https://github.com/gocarina/gocsv.

TODO:

  * Create a CSV Writer

Code:

// Example of using the csvio package.
package main

import (
    "context"
    "reflect"

    "bramp.net/morebeam/csvio"

    "github.com/apache/beam/sdks/go/pkg/beam"
    "github.com/apache/beam/sdks/go/pkg/beam/log"
    "github.com/apache/beam/sdks/go/pkg/beam/transforms/stats"
    "github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
    "github.com/apache/beam/sdks/go/pkg/beam/x/debug"
)

// Painting represents a single record in the csv file.
type Painting struct {
    Artist  string `csv:"artist"`
    Title   string `csv:"title"`
    Year    int    `csv:"year"`
    NotUsed string `csv:"-"` // Ignored field
}

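// extractFn returns the artist of a painting. It is the DoFn passed to
// beam.ParDo in main.
func extractFn(painting Painting) string {
    return painting.Artist
}

func init() {
    // Register the DoFn and the element type so that runners other than
    // the direct runner can serialize them.
    beam.RegisterFunction(extractFn)
    beam.RegisterType(reflect.TypeOf((*Painting)(nil)).Elem())
}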

func main() {
    beam.Init()

    p, s := beam.NewPipelineWithRoot()

    // Read the CSV file.
    paintings := csvio.Read(s, "testdata/paintings.csv", reflect.TypeOf(Painting{}))

    // Extract just the artist's name.
    artists := beam.ParDo(s, extractFn, paintings)

    // Count the number of paintings by each artist.
    counts := stats.Count(s, artists)
    debug.Print(s, counts)

    ctx := context.Background()
    if err := beamx.Run(ctx, p); err != nil {
        log.Fatalf(ctx, "Failed to execute job: %v", err)
    }
}
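The ParDo(extractFn) and stats.Count steps together produce, for each artist, the number of paintings attributed to that artist. The following is a minimal sketch of that same computation over an in-memory slice, with no Beam involved, so the expected shape of the per-artist counts is easy to see. The countByArtist helper and the sample records are hypothetical. They are not part of csvio, and they are not the contents of testdata/paintings.csv.

```go
package main

import (
	"fmt"
	"sort"
)

// Painting mirrors the struct used in the csvio example.
type Painting struct {
	Artist  string `csv:"artist"`
	Title   string `csv:"title"`
	Year    int    `csv:"year"`
	NotUsed string `csv:"-"`
}

// countByArtist is a hypothetical, non-Beam stand-in for
// ParDo(extractFn) followed by stats.Count: it maps each painting to its
// artist and counts how often each artist occurs.
func countByArtist(paintings []Painting) map[string]int {
	counts := make(map[string]int)
	for _, p := range paintings {
		counts[p.Artist]++
	}
	return counts
}

func main() {
	// Hypothetical sample records.
	paintings := []Painting{
		{Artist: "Artist A", Title: "Title 1", Year: 1900},
		{Artist: "Artist B", Title: "Title 2", Year: 1910},
		{Artist: "Artist A", Title: "Title 3", Year: 1920},
	}
	counts := countByArtist(paintings)

	// Sort the keys so the printed order is deterministic
	// (Go map iteration order is randomized).
	artists := make([]string, 0, len(counts))
	for a := range counts {
		artists = append(artists, a)
	}
	sort.Strings(artists)
	for _, a := range artists {
		fmt.Printf("%s: %d\n", a, counts[a])
	}
}
```

In the real pipeline, the equivalent result is a distributed PCollection of (artist, count) pairs rather than a Go map. Its element order is not guaranteed either.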

Package Files

csvio.go

func Read

func Read(s beam.Scope, glob string, t reflect.Type) beam.PCollection

Read reads every CSV file matching glob and returns each line, decoded into a value of type T, as an element of a PCollection<T>. T is given as reflect.TypeOf(YourType{}), where YourType's fields carry csv tags as described by https://github.com/gocarina/gocsv.

Package csvio imports 9 packages. Updated 2018-12-16.