package bike_share

import "go.gazette.dev/core/examples/bike-share"

Package bike_share implements a Gazette consumer application which processes and serves streaming Citi Bike system data. It indexes a window of recent rides for each bike, serves a simple history API, and detects long graph cycles as they're completed by individual bikes.

Package Files

api.go application.go sql_statements.go

Constants

const CreateTableStmt = "" /* 877 byte string literal not displayed */

CreateTableStmt bootstraps an embedded SQLite database with the "rides" table.

const InsertStmt = "" /* 354 byte string literal not displayed */

InsertStmt inserts a CSV bike-share input record into the "rides" table.

const QueryCycleStmt = "" /* 1622 byte string literal not displayed */

QueryCycleStmt returns a path taken by a bike ID ($1) of at least length $2, such that the path starts and ends at the bike's final station but does not visit it in between, and where the bike is not relocated between rides.
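
The SQL itself is not displayed, so the following is only an in-memory sketch of the condition described above, not the package's query. The Ride type and the findCycle helper are hypothetical, and the sketch assumes that path length is counted in rides.

```go
package main

import "fmt"

// Ride is a hypothetical in-memory stand-in for one row of the "rides" table.
type Ride struct {
	Start, End string
}

// findCycle expects the rides of a single bike, oldest first. It walks
// backward from the last ride and returns the stations of a path which
// starts and ends at the bike's final station, or nil if there is none.
// minLen is the minimum number of rides in the path (an assumption).
func findCycle(rides []Ride, minLen int) []string {
	if len(rides) == 0 {
		return nil
	}
	final := rides[len(rides)-1].End
	path := []string{final}

	for i := len(rides) - 1; i >= 0; i-- {
		// The bike was relocated if this ride ended somewhere other
		// than where the following ride started.
		if i < len(rides)-1 && rides[i].End != rides[i+1].Start {
			return nil
		}
		path = append(path, rides[i].Start)

		// Stopping at the first match going backward guarantees the
		// final station is not visited in between.
		if rides[i].Start == final {
			if len(path)-1 < minLen {
				return nil
			}
			// The path was built newest-first; reverse it.
			for a, b := 0, len(path)-1; a < b; a, b = a+1, b-1 {
				path[a], path[b] = path[b], path[a]
			}
			return path
		}
	}
	return nil
}

func main() {
	rides := []Ride{{"A", "B"}, {"B", "C"}, {"C", "A"}}
	fmt.Println(findCycle(rides, 3)) // prints: [A B C A]
}
```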

const QueryHistoryStmt = `
SELECT uuid, start_time, end_time, start_station_name, end_station_name
FROM rides WHERE bike_id = $1
`

QueryHistoryStmt retrieves the current window of rides of the given bike ($1). It powers the ServeBikeHistory API.

const WindowStmt = "" /* 175 byte string literal not displayed */

WindowStmt windows a bike ID ($1) to the $2 most-recent rides.

type Application

type Application struct {
    runconsumer.BaseConfig
    DBAddr string `long:"postgres" description:"Database connection string" default:"host=/var/run/postgresql" required:"true"`
    // contains filtered or unexported fields
}

Application is a consumer framework application which finds cycles in bike-share data. It implements the consumer.Application interface as well as runconsumer.Application, which extends consumer.Application with configuration parsing and initialization.

func (*Application) ConsumeMessage

func (app *Application) ConsumeMessage(shard consumer.Shard, store consumer.Store, env message.Envelope, pub *message.Publisher) error

ConsumeMessage inserts a CSVRecord of bike-share ride data into the store and queries for a now-completed cycle of the record's bike ID. If matched, it publishes a Cycle event.

func (Application) FinalizeTxn

func (Application) FinalizeTxn(consumer.Shard, consumer.Store, *message.Publisher) error

FinalizeTxn is a no-op for this Application. If the application kept in-memory-only updates while consuming messages, FinalizeTxn would be the time for it to flush them out.

func (*Application) InitApplication

func (app *Application) InitApplication(args runconsumer.InitArgs) (err error)

InitApplication initializes dynamic mappings of bike-share data types to responsible partitions, opens the configured database, prepares DB statements for future use, and registers a simple bike-share history HTTP API.

func (*Application) NewConfig

func (app *Application) NewConfig() runconsumer.Config

NewConfig returns the Config struct of our Application, parse-able with `github.com/jessevdk/go-flags`. In this case our Application's type is also its config.

func (Application) NewMessage

func (Application) NewMessage(*pb.JournalSpec) (message.Message, error)

NewMessage returns an instance of the appropriate message type for decoding from the given journal. For this use-case, we use the provided message.CSVRecord type.

func (*Application) NewStore

func (app *Application) NewStore(_ consumer.Shard, rec *recoverylog.Recorder) (consumer.Store, error)

NewStore instantiates either a SQLStore using the remote DB or, if the Shard has a recovery log, an embedded store_sqlite.Store.

func (*Application) ServeBikeHistory

func (app *Application) ServeBikeHistory(w http.ResponseWriter, r *http.Request)

ServeBikeHistory is an http.HandlerFunc which returns the most recent rides of a bike ID provided via URL parameter. Invoke as:

/api/bikes?id=12345

If the bike ID is served by a non-local shard, ServeBikeHistory will proxy to the appropriate peer.
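
A client might build the request URL as sketched below. The historyURL helper and the base address are hypothetical; only the /api/bikes path and the id parameter come from the documentation above. The response format is not described here, so the sketch stops at the URL.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
)

// historyURL builds the bike history request URL for the given bike ID.
// baseAddr is a hypothetical address of a running consumer process.
func historyURL(baseAddr string, bikeID int) (string, error) {
	u, err := url.Parse(baseAddr)
	if err != nil {
		return "", err
	}
	u.Path = "/api/bikes"
	u.RawQuery = url.Values{"id": {strconv.Itoa(bikeID)}}.Encode()
	return u.String(), nil
}

func main() {
	u, err := historyURL("http://localhost:8080", 12345)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(u) // prints: http://localhost:8080/api/bikes?id=12345
}
```

The resulting URL can be passed to http.Get. Because the handler proxies to the appropriate peer, the request can be sent to any member of the consumer.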

type Cycle

type Cycle struct {
    UUID   message.UUID
    BikeID int
    Steps  []CycleStep
}

Cycle describes a graph cycle completed by a bike.

func (*Cycle) GetUUID

func (c *Cycle) GetUUID() message.UUID

GetUUID returns the Cycle's UUID.

func (*Cycle) NewAcknowledgement

func (c *Cycle) NewAcknowledgement(pb.Journal) message.Message

NewAcknowledgement returns a new, zero-valued Cycle. message.Publisher uses it to build a message which acknowledges Cycle messages previously published.

func (*Cycle) SetUUID

func (c *Cycle) SetUUID(uuid message.UUID)

SetUUID sets the Cycle's UUID. It's called by message.Publisher.
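
The three methods above can be seen working together in the sketch below. To compile without Gazette it uses local stand-ins: UUID, Journal, and the Message interface are assumptions that only mirror the shapes of message.UUID, pb.Journal, and message.Message, and the journal name is made up.

```go
package main

import (
	"fmt"
	"time"
)

// UUID and Journal are local stand-ins (assumptions) for message.UUID
// and pb.Journal, so that this sketch has no external dependencies.
type UUID [16]byte
type Journal string

// Message is a stand-in listing the three methods documented for Cycle.
type Message interface {
	GetUUID() UUID
	SetUUID(UUID)
	NewAcknowledgement(Journal) Message
}

type CycleStep struct {
	Time    time.Time
	Station string
}

type Cycle struct {
	UUID   UUID
	BikeID int
	Steps  []CycleStep
}

func (c *Cycle) GetUUID() UUID                      { return c.UUID }
func (c *Cycle) SetUUID(uuid UUID)                  { c.UUID = uuid }
func (c *Cycle) NewAcknowledgement(Journal) Message { return new(Cycle) }

// Compile-time check that *Cycle satisfies the stand-in interface.
var _ Message = (*Cycle)(nil)

func main() {
	c := &Cycle{BikeID: 12345}
	c.SetUUID(UUID{0x01}) // a publisher would assign the UUID here

	// The acknowledgement starts out zero-valued, with no UUID set.
	ack := c.NewAcknowledgement("hypothetical/journal")
	fmt.Println(c.GetUUID() != UUID{}, ack.GetUUID() == UUID{}) // prints: true true
}
```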

type CycleStep

type CycleStep struct {
    Time    time.Time
    Station string
}

CycleStep is a path step of a bike's graph cycle.

Directories

Path    Synopsis
bike-share

Package bike_share imports 20 packages and is imported by 3 packages. Updated 2019-10-29.