etl

package
v0.0.0-...-0afe5ed Latest
Warning

This package is not in the latest version of its module.
Published: Nov 6, 2023 License: MIT Imports: 11 Imported by: 0

Documentation

Overview

Package etl contains the ETL state machine for translating the Boston food violations file into a Postgres database.

This implementation is almost 5x faster than the original once the original is translated from sqlite to Postgres; against the sqlite original, the two are still on par. The speedup is probably due more to batching updates than to the concurrency aspects.
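As a rough illustration of the batching idea, the sketch below splits a slice of rows into fixed-size groups so that each group can be written in one round trip. The helper name batches and the batch size are assumptions made for this example; they are not taken from the package.

```go
package main

import "fmt"

// batches splits rows into consecutive slices of at most size elements.
// The sub-slices share the backing array of rows; nothing is copied.
// (Hypothetical helper, not part of package etl.)
func batches[T any](rows []T, size int) [][]T {
	if size <= 0 {
		return nil
	}
	var out [][]T
	for size < len(rows) {
		// The full slice expression caps capacity so a later append
		// on one batch cannot overwrite the next batch.
		out = append(out, rows[:size:size])
		rows = rows[size:]
	}
	if len(rows) > 0 {
		out = append(out, rows)
	}
	return out
}

func main() {
	rows := []int{1, 2, 3, 4, 5, 6, 7}
	for i, b := range batches(rows, 3) {
		fmt.Printf("batch %d: %d rows\n", i, len(b))
	}
}
```

Each batch could then travel through the pipeline as one request, which keeps the number of database writes proportional to the number of batches rather than the number of rows.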

The major speed inhibitors are how fast we can read from disk and how fast we can write to Postgres. This implementation shines when your ETL needs to query other data sources and you want to extend the structure in a uniform way, or when the data arrives from a network source that can send at speed and is written to a data store that can accept writes at speed (for example, a very fast distributed data store). It is a little overkill for this tiny ETL file.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewRequest

func NewRequest(ctx context.Context, data Data) stagedpipe.Request[Data]

NewRequest returns a new stagedpipe.Request object for use in the Pipelines. By convention we always have a NewRequest() function. If NewRequest can return an error, we also include a MustNewRequest().

Types

type Data

type Data struct {
	// Tx is the transaction to use for the database update.
	Tx pgx.Tx
	// TxMutex protects Transaction writes.
	TxMutex *sync.Mutex
	// Rows is the input transformed into a []Row representation.
	Rows []Row
}

Data is the data that is passed through the pipeline.

type Row

type Row struct {
	Time       time.Time `db:"time"`
	ViolDTTM   string    `csv:"violdttm" db:"-"`
	Business   string    `csv:"businessname" db:"business_name"`
	LicStatus  string    `csv:"licstatus" db:"license_status"`
	Result     string    `csv:"result" db:"result"`
	ViolDesc   string    `csv:"violdesc" db:"description"`
	ViolStatus string    `csv:"violstatus" db:"status"`
	ViolLevel  string    `csv:"viollevel" db:"-"`
	Comments   string    `csv:"comments" db:"comments"`
	Address    string    `csv:"address" db:"address"`
	City       string    `csv:"city" db:"city"`
	Zip        string    `csv:"zip" db:"zip"`
	Level      int       `db:"level"`
}

Row represents a row in the input CSV file. Differences from original:

Renamed some fields to follow Go naming conventions, e.g. Violdttm became ViolDTTM
Re-ordered the struct to get a minor space savings (which in this case probably won't matter)

func (Row) DeString

func (r Row) DeString() (Row, error)

DeString converts certain Row fields from strings into concrete types: ViolDTTM into Time and ViolLevel into Level.
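A conversion of this kind might look like the sketch below. The timestamp layout and the star encoding of the violation level ("*", "**", "***") are assumptions made for this example; the page does not state the actual formats, and parseTime and parseLevel are hypothetical helpers rather than part of the package.

```go
package main

import (
	"fmt"
	"strings"
	"time"
)

// layout is an assumed timestamp format; the real file may differ.
const layout = "2006-01-02 15:04:05"

// parseTime converts a violdttm-style string into a time.Time.
func parseTime(s string) (time.Time, error) {
	return time.Parse(layout, strings.TrimSpace(s))
}

// parseLevel maps an assumed star encoding ("*", "**", "***") to 1, 2 or 3.
func parseLevel(s string) (int, error) {
	s = strings.TrimSpace(s)
	// Reject empty input, more than three characters, or any non-star character.
	if s == "" || len(s) > 3 || strings.Trim(s, "*") != "" {
		return 0, fmt.Errorf("unknown violation level %q", s)
	}
	return len(s), nil
}

func main() {
	ts, err := parseTime("2016-05-04 13:05:00")
	if err != nil {
		fmt.Println("time error:", err)
		return
	}
	lvl, err := parseLevel("**")
	if err != nil {
		fmt.Println("level error:", err)
		return
	}
	fmt.Println(ts.Year(), lvl)
}
```

Returning an error instead of a zero value lets a pipeline stage decide whether a malformed row should fail the whole request.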

func (Row) InsertQuery

func (r Row) InsertQuery() (query string, args []any)

InsertQuery generates the query and arguments needed to insert this Row into the database.
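One way to produce such a (query, args) pair is sketched below, using Postgres-style $1..$n placeholders. The function name insertQuery, the table name violations, and the chosen columns are assumptions made for this example; the real method may build its query differently.

```go
package main

import (
	"fmt"
	"strings"
)

// insertQuery builds a parameterized INSERT with $1..$n placeholders.
// table and cols must be trusted identifiers; only vals are sent as arguments.
// (Hypothetical helper, not the package's InsertQuery method.)
func insertQuery(table string, cols []string, vals []any) (string, []any, error) {
	if len(cols) == 0 || len(cols) != len(vals) {
		return "", nil, fmt.Errorf("got %d columns and %d values", len(cols), len(vals))
	}
	ph := make([]string, len(cols))
	for i := range cols {
		ph[i] = fmt.Sprintf("$%d", i+1)
	}
	q := fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s)",
		table, strings.Join(cols, ", "), strings.Join(ph, ", "))
	return q, vals, nil
}

func main() {
	q, args, err := insertQuery("violations",
		[]string{"business_name", "level"}, []any{"Cafe", 2})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(q, len(args))
}
```

Keeping the values out of the query text and in the argument list avoids quoting problems in free-text fields such as comments.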

type SM

type SM struct{}

SM implements stagedpipe.StateMachine. It holds all our states for the pipeline.

func NewSM

func NewSM() (*SM, error)

NewSM creates a new stagedpipe.StateMachine from SM. db is used to create a transaction.

func (*SM) Close

func (s *SM) Close()

Close implements stagedpipe.StateMachine.Close(). It shuts down resources in the StateMachine that are no longer needed. This is only safe after all entries have been processed.

func (*SM) Start

func (s *SM) Start(req stagedpipe.Request[Data]) stagedpipe.Request[Data]

Start is the entry point (first stage) of the pipeline. In this case it simply converts Row fields that are represented as strings into more concrete types such as int and time.Time.

func (*SM) WriteDB

func (s *SM) WriteDB(req stagedpipe.Request[Data]) stagedpipe.Request[Data]

WriteDB takes the rows in the data and writes them via our transaction to the database.
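The two-stage flow described by Start and WriteDB can be pictured with the sketch below. It deliberately uses local stand-in types (request, stage, sm) instead of the stagedpipe package, so the field names and the run loop are assumptions made for illustration and not that library's API.

```go
package main

import (
	"errors"
	"fmt"
)

// request and stage are local stand-ins, not the stagedpipe types.
type request struct {
	rows []string
	err  error
	next stage // nil means the pipeline is finished
}

type stage func(request) request

// sm is a stand-in state machine that counts the rows it "writes".
type sm struct{ written int }

// start validates the request and routes it to writeDB.
func (s *sm) start(req request) request {
	if len(req.rows) == 0 {
		req.err = errors.New("no rows")
		req.next = nil
		return req
	}
	req.next = s.writeDB
	return req
}

// writeDB is the last stage; here it only counts rows.
func (s *sm) writeDB(req request) request {
	s.written += len(req.rows)
	req.next = nil
	return req
}

// run follows next pointers until a stage ends the pipeline or sets an error.
func run(first stage, req request) request {
	req.next = first
	for req.next != nil && req.err == nil {
		req = req.next(req)
	}
	return req
}

func main() {
	s := &sm{}
	out := run(s.start, request{rows: []string{"a", "b"}})
	fmt.Println(out.err, s.written)
}
```

Because each stage only decides which stage comes next, adding a step such as a lookup against another data source means adding one method and changing one assignment.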
