Documentation ¶
Overview ¶
Package etl contains the ETL state machine for translating the Boston food violations file into a Postgres database.
This implementation is almost 5x faster than the original when the original is ported from SQLite to Postgres; against the original running on SQLite, the two are still roughly on par. The speedup is probably due more to batching updates than to the concurrency aspects.
The major speed inhibitors are how fast we can read from disk and how fast we can write to Postgres. This implementation shines when your ETL needs to query other data sources and you want to expand the structure in a uniform way, or when the data comes from a network source that can send at speed and your writes go to a data store that can accept them at speed (for example, a very fast distributed data store). It is a little overkill for this tiny ETL file.
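Since the overview attributes most of the speedup to batching, here is a minimal sketch of splitting input rows into fixed-size batches before handing them to a pipeline. The function name `batches` and the batch size are assumptions for illustration; the package's actual batching code is not shown in this documentation.

```go
package main

import "fmt"

// batches splits rows into consecutive slices of at most size elements.
// The returned slices share the backing array of rows; nothing is copied.
// (Hypothetical helper, not part of package etl.)
func batches[T any](rows []T, size int) [][]T {
	if size <= 0 {
		return nil
	}
	var out [][]T
	for len(rows) > size {
		// The three-index slice caps capacity so that appending to one
		// batch cannot overwrite the start of the next.
		out = append(out, rows[:size:size])
		rows = rows[size:]
	}
	if len(rows) > 0 {
		out = append(out, rows)
	}
	return out
}

func main() {
	for _, b := range batches([]int{1, 2, 3, 4, 5}, 2) {
		fmt.Println(b)
	}
}
```

Each batch would then become the Rows of one request, so a single database round trip covers many rows instead of one.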
Functions ¶
func NewRequest ¶
NewRequest returns a new stagedpipe.Request object for use in the Pipelines. By convention we always have a NewRequest() function. If NewRequest can return an error, we also include a MustNewRequest().
Types ¶
type Data ¶
type Data struct {
	// Tx is the transaction to use for the database update.
	Tx pgx.Tx
	// TxMutex protects Transaction writes.
	TxMutex *sync.Mutex
	// Rows is the input transformed into a []Row representation.
	Rows []Row
}
Data is the data that is passed through the pipeline.
type Row ¶
type Row struct {
	Time       time.Time `db:"time"`
	ViolDTTM   string    `csv:"violdttm" db:"-"`
	Business   string    `csv:"businessname" db:"business_name"`
	LicStatus  string    `csv:"licstatus" db:"license_status"`
	Result     string    `csv:"result" db:"result"`
	ViolDesc   string    `csv:"violdesc" db:"description"`
	ViolStatus string    `csv:"violstatus" db:"status"`
	ViolLevel  string    `csv:"viollevel" db:"-"`
	Comments   string    `csv:"comments" db:"comments"`
	Address    string    `csv:"address" db:"address"`
	City       string    `csv:"city" db:"city"`
	Zip        string    `csv:"zip" db:"zip"`
	Level      int       `db:"level"`
}
Row represents a row in the input CSV file. Differences from original:
  - Renamed some fields to follow Go naming conventions, e.g. Violdttm became ViolDTTM.
  - Re-ordered the struct to get a minor space savings (which in this case probably won't matter).
func (Row) DeString ¶
DeString converts certain string-typed Row fields into more concrete types: ViolDTTM into Time and ViolLevel into Level.
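The conversion DeString describes might look roughly like the sketch below. The real method's signature is not shown in this documentation, so `deString`, the trimmed-down `row` type, the timestamp layout, and the idea that the violation level is encoded as a run of asterisks are all assumptions made for illustration.

```go
package main

import (
	"fmt"
	"strings"
	"time"
)

// violLayout is an ASSUMED timestamp layout for the violdttm column; the
// real file may use a different one.
const violLayout = "2006-01-02 15:04:05"

// row is a hypothetical, trimmed-down stand-in for Row that keeps only the
// fields involved in the conversion.
type row struct {
	Time      time.Time
	ViolDTTM  string
	ViolLevel string
	Level     int
}

// deString returns a copy of r with Time and Level filled in from the string
// fields. It ASSUMES ViolLevel encodes severity as "*", "**", "***", and so on.
func deString(r row) (row, error) {
	t, err := time.Parse(violLayout, strings.TrimSpace(r.ViolDTTM))
	if err != nil {
		return r, fmt.Errorf("parsing violdttm %q: %w", r.ViolDTTM, err)
	}
	r.Time = t

	lvl := strings.TrimSpace(r.ViolLevel)
	// Reject empty values and anything containing characters other than '*'.
	if lvl == "" || strings.Trim(lvl, "*") != "" {
		return r, fmt.Errorf("unexpected viollevel %q", r.ViolLevel)
	}
	r.Level = len(lvl)
	return r, nil
}

func main() {
	out, err := deString(row{ViolDTTM: "2012-04-03 02:00:00", ViolLevel: "**"})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(out.Time.Year(), out.Level)
}
```

Returning a modified copy matches a value receiver such as (Row); a pointer receiver that mutates in place would be an equally reasonable design.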
func (Row) InsertQuery ¶
InsertQuery generates the query and arguments needed to insert this Row into the database.
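One way to generate such a query is to walk the struct's `db` tags and skip fields tagged `-`. The sketch below does this with reflection on a shortened stand-in type. The function `insertQuery`, the `row` type, and the table name passed by the caller are hypothetical; the actual InsertQuery may well be hand-written rather than reflective.

```go
package main

import (
	"fmt"
	"reflect"
	"strings"
)

// row is a shortened, hypothetical stand-in for Row.
type row struct {
	ViolDTTM string `db:"-"`
	Business string `db:"business_name"`
	Result   string `db:"result"`
	Level    int    `db:"level"`
}

// insertQuery builds an INSERT statement with Postgres-style positional
// placeholders ($1, $2, ...) from the struct's db tags. Fields that are
// untagged or tagged "-" are skipped. The table name is a parameter because
// the real one is not given in the documentation.
func insertQuery(table string, r row) (string, []any) {
	v := reflect.ValueOf(r)
	t := v.Type()

	var cols, marks []string
	var args []any
	for i := 0; i < t.NumField(); i++ {
		col := t.Field(i).Tag.Get("db")
		if col == "" || col == "-" {
			continue
		}
		cols = append(cols, col)
		args = append(args, v.Field(i).Interface())
		marks = append(marks, fmt.Sprintf("$%d", len(args)))
	}
	q := fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s)",
		table, strings.Join(cols, ", "), strings.Join(marks, ", "))
	return q, args
}

func main() {
	q, args := insertQuery("violations", row{Business: "Cafe", Result: "Fail", Level: 2})
	fmt.Println(q)
	fmt.Println(args)
}
```

Keeping values in the argument slice rather than formatting them into the SQL string lets the driver handle quoting and avoids injection problems.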
type SM ¶
type SM struct{}
SM implements stagedpipe.StateMachine. It holds all our states for the pipeline.
func NewSM ¶
NewSM creates a new stagedpipe.StateMachine from SM. db is used to create a transaction.
func (*SM) Close ¶
func (s *SM) Close()
Close implements stagedpipe.StateMachine.Close(). It shuts down resources in the StateMachine that are no longer needed. This is only safe after all entries have been processed.
func (*SM) Start ¶
func (s *SM) Start(req stagedpipe.Request[Data]) stagedpipe.Request[Data]
Start is the entry point (first stage) of the pipeline. In this case it simply converts Row fields that are represented as strings into more concrete types such as int and time.Time.
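The general shape of a staged state machine, where each stage returns the request along with the stage to run next, can be sketched as follows. The `request`, `stage`, and `run` names here are simplified local stand-ins written for this example; they are not the stagedpipe library's types, and the real library adds concurrency, contexts, and more.

```go
package main

import (
	"fmt"
	"strconv"
)

// request and stage are simplified, HYPOTHETICAL stand-ins for a generic
// request type and stage function type.
type request[T any] struct {
	Data T
	Err  error
	Next stage[T]
}

type stage[T any] func(request[T]) request[T]

// data is a toy payload: raw strings in, converted ints out.
type data struct {
	raw    []string
	levels []int
}

type sm struct{}

// start converts the string inputs, then points the request at the next stage.
func (s *sm) start(req request[data]) request[data] {
	for _, r := range req.Data.raw {
		n, err := strconv.Atoi(r)
		if err != nil {
			req.Err = err // Next stays nil, so processing stops here.
			return req
		}
		req.Data.levels = append(req.Data.levels, n)
	}
	req.Next = s.write
	return req
}

// write is the last stage; it leaves Next nil so the run loop ends.
func (s *sm) write(req request[data]) request[data] {
	fmt.Println("writing", req.Data.levels)
	return req
}

// run drives a request from the first stage until no next stage is set or an
// error has been recorded.
func run[T any](first stage[T], req request[T]) request[T] {
	req.Next = first
	for req.Next != nil && req.Err == nil {
		next := req.Next
		req.Next = nil
		req = next(req)
	}
	return req
}

func main() {
	s := &sm{}
	out := run[data](s.start, request[data]{Data: data{raw: []string{"1", "3"}}})
	fmt.Println(out.Err)
}
```

Because every stage has the same signature, adding a stage (for example, one that queries another data source) only means writing one more method and pointing the preceding stage at it.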
func (*SM) WriteDB ¶
func (s *SM) WriteDB(req stagedpipe.Request[Data]) stagedpipe.Request[Data]
WriteDB takes the rows in the data and writes them to the database via our transaction.
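Because several requests can share one transaction, the writes need to be serialized, which is what the TxMutex field on Data is described as protecting. The sketch below shows that locking pattern against a fake transaction so it runs without a database. The `execer` interface, `fakeTx`, `writeRows`, `runDemo`, and the SQL text are all hypothetical and do not reproduce the pgx API or the package's actual code.

```go
package main

import (
	"fmt"
	"sync"
)

// execer is a HYPOTHETICAL, minimal stand-in for a transaction type.
type execer interface {
	Exec(query string, args ...any) error
}

// fakeTx records statements instead of talking to a database.
type fakeTx struct{ stmts []string }

func (f *fakeTx) Exec(query string, args ...any) error {
	f.stmts = append(f.stmts, fmt.Sprintf("%s %v", query, args))
	return nil
}

// writeRows sends every row of one batch while holding mu, so batches from
// concurrently running stages never interleave on the shared transaction.
func writeRows(tx execer, mu *sync.Mutex, rows [][]any) error {
	mu.Lock()
	defer mu.Unlock()
	for _, args := range rows {
		// The statement and table name are illustrative assumptions.
		const q = "INSERT INTO violations (business_name, level) VALUES ($1, $2)"
		if err := tx.Exec(q, args...); err != nil {
			return err
		}
	}
	return nil
}

// runDemo writes one two-row batch from each of n goroutines and returns the
// number of statements the fake transaction recorded.
func runDemo(n int) int {
	tx, mu := &fakeTx{}, &sync.Mutex{}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			batch := [][]any{{"cafe", i}, {"diner", i}}
			if err := writeRows(tx, mu, batch); err != nil {
				fmt.Println("write failed:", err)
			}
		}(i)
	}
	wg.Wait()
	return len(tx.stmts)
}

func main() {
	fmt.Println(runDemo(4), "statements recorded")
}
```

Holding the lock for the whole batch, rather than per row, keeps each batch contiguous and reduces lock traffic, at the cost of blocking other writers for longer.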