logic

package
v0.0.0-...-6501577 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 22, 2018 License: MIT Imports: 16 Imported by: 0

Documentation

Overview

Package logic provides the business logic for Trimetric. It handles interacting with the database and processing data.

Index

Constants

View Source
const (
	TripUpdatesTopic      = "trip_updates"
	VehiclePositionsTopic = "vehicle_positions"
)

Kafka topics

Variables

This section is empty.

Functions

func ConsumeKafkaTopic

func ConsumeKafkaTopic(ctx context.Context, c ConsumerFunc, topic string, addrs []string) error

ConsumeKafkaTopic reads from a Kafka partition and writes the messages into a ConsumerFunc.

func PollGTFSData

func PollGTFSData(ctx context.Context, ld LoaderDataset, redisPool *redis.Pool, dur time.Duration)

PollGTFSData makes periodic queries to fetch static GTFS data from Trimet. It stores a timestamp of the last query in Redis and will only download if it has been more than 24 hours since the last download.

func ProduceTripUpdates

func ProduceTripUpdates(ctx context.Context, baseURL string, apiKey string, p Producer) error

ProduceTripUpdates makes requests to the Trimet API and sends the results to a Producer.

func ProduceVehiclePositions

func ProduceVehiclePositions(ctx context.Context, p Producer, baseURL, apiKey string, delay time.Duration) error

ProduceVehiclePositions makes requests to the Trimet API and passes the results to a Producer.

Types

type Arrival

type Arrival struct {
	RouteID         string          `json:"route_id"`
	RouteShortName  string          `json:"route_short_name"`
	RouteLongName   string          `json:"route_long_name"`
	RouteType       int             `json:"route_type"`
	RouteColor      string          `json:"route_color"`
	RouteTextColor  string          `json:"route_text_color"`
	TripID          string          `json:"trip_id"`
	StopID          string          `json:"stop_id"`
	Headsign        string          `json:"headsign"`
	ArrivalTime     *trimet.Time    `json:"arrival_time"`
	DepartureTime   *trimet.Time    `json:"departure_time"`
	VehicleID       *string         `json:"vehicle_id"`
	VehicleLabel    *string         `json:"vehicle_label"`
	VehiclePosition trimet.Position `json:"vehicle_position"`
	Date            time.Time       `json:"date"`
}

Arrival describes a single arrival at a stop, combining route, trip, stop, and vehicle information.

type ConsumerFunc

type ConsumerFunc func(ctx context.Context, b []byte) error

ConsumerFunc defines a generic function type to read messages from a Producer.

type KafkaProducer

type KafkaProducer struct {
	// contains filtered or unexported fields
}

KafkaProducer provides a struct with methods that implement a Producer.

func NewKafkaProducer

func NewKafkaProducer(topic string, addrs []string) (*KafkaProducer, error)

NewKafkaProducer returns a new KafkaProducer

func (*KafkaProducer) Close

func (k *KafkaProducer) Close() error

Close wraps a sarama Close method to provide a generic method that satisfies the Producer interface.

func (*KafkaProducer) Produce

func (k *KafkaProducer) Produce(b []byte) error

Produce wraps a sarama Producer to provide a generic method that satisfies the Producer interface.

type LoaderDataset

type LoaderDataset interface {
	LoadGTFSData(baseURL string) error
}

LoaderDataset provides a method to bulk load static GTFS data.

type LoaderSQLDataset

type LoaderSQLDataset struct {
	DB *sql.DB
}

LoaderSQLDataset implements LoaderDataset for a SQL database.

func (*LoaderSQLDataset) LoadCalendarDates

func (ld *LoaderSQLDataset) LoadCalendarDates(tx *sql.Tx, c *trimet.CSV) error

LoadCalendarDates loads calendar_dates.txt.

func (*LoaderSQLDataset) LoadGTFSData

func (ld *LoaderSQLDataset) LoadGTFSData(baseURL string) error

LoadGTFSData downloads the latest static GTFS data from Trimet and updates the GTFS data in the database.

func (*LoaderSQLDataset) LoadRoutes

func (ld *LoaderSQLDataset) LoadRoutes(tx *sql.Tx, c *trimet.CSV) error

LoadRoutes loads routes.txt.

func (*LoaderSQLDataset) LoadServices

func (ld *LoaderSQLDataset) LoadServices(tx *sql.Tx, c *trimet.CSV) error

LoadServices populates the services table with the unique set of service_ids from the calendar_dates.txt file.

func (*LoaderSQLDataset) LoadShapes

func (ld *LoaderSQLDataset) LoadShapes(tx *sql.Tx, c *trimet.CSV) error

LoadShapes loads shapes.txt.

func (*LoaderSQLDataset) LoadStopTimes

func (ld *LoaderSQLDataset) LoadStopTimes(tx *sql.Tx, c *trimet.CSV) error

LoadStopTimes loads stop_times.txt.

func (*LoaderSQLDataset) LoadStops

func (ld *LoaderSQLDataset) LoadStops(tx *sql.Tx, c *trimet.CSV) error

LoadStops loads stops.txt.

func (*LoaderSQLDataset) LoadTrips

func (ld *LoaderSQLDataset) LoadTrips(tx *sql.Tx, c *trimet.CSV) error

LoadTrips loads trips.txt.

type Point

type Point struct {
	Lat          float64 `json:"lat"`
	Lng          float64 `json:"lng"`
	DistTraveled float64 `json:"dist_traveled"`
}

Point represents a single point along a route shape

type Producer

type Producer interface {
	Close() error
	Produce(b []byte) error
}

Producer defines the methods used to produce Kafka messages.

type RouteDataset

type RouteDataset interface {
	FetchRoutes() ([]trimet.Route, error)
}

RouteDataset provides methods to query a database table of Routes.

type RoutePoint

type RoutePoint struct {
	Lat float64 `json:"lat"`
	Lng float64 `json:"lng"`
}

RoutePoint represents a single point along a route

type RouteSQLDataset

type RouteSQLDataset struct {
	DB *sql.DB
}

RouteSQLDataset stores a DB instance and provides access to methods to retrieve routes from the database.

func (*RouteSQLDataset) FetchRoutes

func (sd *RouteSQLDataset) FetchRoutes() ([]trimet.Route, error)

FetchRoutes returns a slice of all routes in the database.

type RouteShape

type RouteShape struct {
	DirectionID int          `json:"direction_id"`
	RouteID     string       `json:"route_id"`
	Color       string       `json:"color"`
	Points      []RoutePoint `json:"points"`
}

RouteShape represents a complete shape line for a given RouteID

type Shape

type Shape struct {
	ID          string  `json:"id"`
	DirectionID int     `json:"direction_id"`
	RouteID     string  `json:"route_id"`
	Point       []Point `json:"point"`
}

Shape represents the shape line for a given route.

type ShapeDataset

type ShapeDataset interface {
	FetchRouteShapes() ([]*RouteShape, error)
	FetchShapes(routeIDS, shapeIDs []string) ([]Shape, error)
	FetchTripShapes(tripIDs []string) (map[string]*TripShape, error)
}

ShapeDataset provides methods to query and update a database table of Shapes

type ShapeSQLDataset

type ShapeSQLDataset struct {
	DB *sql.DB
}

ShapeSQLDataset stores a DB instance and provides access to methods to retrieve and update shapes from the database

func (*ShapeSQLDataset) FetchRouteShapes

func (sd *ShapeSQLDataset) FetchRouteShapes() ([]*RouteShape, error)

FetchRouteShapes returns all shapes for train routes and flattens them to reduce the amount of data.

func (*ShapeSQLDataset) FetchShapes

func (sd *ShapeSQLDataset) FetchShapes(routeIDs, shapeIDs []string) ([]Shape, error)

FetchShapes takes slices of route IDs and shape IDs and returns a slice of shapes.

func (*ShapeSQLDataset) FetchTripShapes

func (sd *ShapeSQLDataset) FetchTripShapes(tripIDs []string) (map[string]*TripShape, error)

FetchTripShapes takes a slice of trip IDs and returns a map that associates each trip ID with its trip shape. This is used client side to render a route line for the specific trip a vehicle is on.

type StopDataset

type StopDataset interface {
	FetchAllStops() ([]StopWithDistance, error)
	FetchWithinDistance(lat, lng, dist string) ([]StopWithDistance, error)
	FetchWithinBox(w, s, e, n string) ([]StopWithDistance, error)
	FetchArrivals(stopIDs []string) ([]Arrival, error)
}

StopDataset provides methods to query and update a database table of Stops

type StopSQLDataset

type StopSQLDataset struct {
	DB *sql.DB
}

StopSQLDataset stores a DB instance and provides access to methods to retrieve and update stops from the database

func (*StopSQLDataset) FetchAllStops

func (sd *StopSQLDataset) FetchAllStops() ([]StopWithDistance, error)

FetchAllStops returns all stops stored in the database.

func (*StopSQLDataset) FetchArrivals

func (sd *StopSQLDataset) FetchArrivals(stopIDs []string) ([]Arrival, error)

FetchArrivals returns a list of arrivals by combining multiple data sets.

func (*StopSQLDataset) FetchWithinBox

func (sd *StopSQLDataset) FetchWithinBox(w, s, e, n string) ([]StopWithDistance, error)

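FetchWithinBox returns the stops located within the bounding box described by its west (w), south (s), east (e), and north (n) edges.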

func (*StopSQLDataset) FetchWithinDistance

func (sd *StopSQLDataset) FetchWithinDistance(lat, lng, dist string) ([]StopWithDistance, error)

FetchWithinDistance takes a point (lat,lng) and finds all stops located within 'dist'. It uses the PostGIS extension to calculate the distance to stops stored in the DB.

type StopWithDistance

type StopWithDistance struct {
	trimet.Stop
	Distance float64 `json:"distance"`
}

StopWithDistance augments the Stop struct with a distance field. Distance is calculated via PostGIS and the result is used to provide a list of stops within a specified distance from a specific point.

type TripShape

type TripShape struct {
	RouteShape
	TripID string `json:"trip_id"`
}

TripShape adds a TripID to a RouteShape in order for it to be associated later in the client.

type TripUpdateSQLDataset

type TripUpdateSQLDataset struct {
	DB *sql.DB
}

TripUpdateSQLDataset wraps a DB instance that is used to store trip update data

func (*TripUpdateSQLDataset) FetchTripUpdates

func (tuds *TripUpdateSQLDataset) FetchTripUpdates() ([]trimet.TripUpdate, error)

FetchTripUpdates returns trip updates from the DB.

func (*TripUpdateSQLDataset) UpdateTripUpdateBytes

func (tuds *TripUpdateSQLDataset) UpdateTripUpdateBytes(ctx context.Context, b []byte) error

UpdateTripUpdateBytes reads bytes and updates the TripUpdates DB

func (*TripUpdateSQLDataset) UpdateTripUpdates

func (tuds *TripUpdateSQLDataset) UpdateTripUpdates(tus []trimet.TripUpdate) error

UpdateTripUpdates updates trip data in the db.

type TripUpdatesDataset

type TripUpdatesDataset interface {
	UpdateTripUpdates(tus []trimet.TripUpdate) error
	UpdateTripUpdateBytes(ctx context.Context, b []byte) error
	FetchTripUpdates() ([]trimet.TripUpdate, error)
}

TripUpdatesDataset provides methods to update and retrieve trip update data

type VehicleDataset

type VehicleDataset interface {
	FetchVehiclePositions(since int) ([]VehiclePositionWithRouteType, error)
	UpsertVehiclePosition(v *trimet.VehiclePosition) error
	UpsertVehiclePositionBytes(ctx context.Context, b []byte) error
}

VehicleDataset provides methods to update and retrieve vehicle data

type VehiclePositionWithRouteType

type VehiclePositionWithRouteType struct {
	trimet.VehiclePosition
	RouteType trimet.RouteType `json:"route_type" msg:"route_type"`
}

VehiclePositionWithRouteType adds routetype to identify the vehicle type

type VehicleSQLDataset

type VehicleSQLDataset struct {
	DB *sql.DB
}

VehicleSQLDataset wraps a DB instance that is used to store vehicle data

func (*VehicleSQLDataset) FetchVehiclePositions

func (vd *VehicleSQLDataset) FetchVehiclePositions(since int) ([]VehiclePositionWithRouteType, error)

FetchVehiclePositions makes a query against the DB and retrieves a list of vehicle data. If IDs are passed in, then vehicle data is restricted to those specific vehicle IDs. Otherwise, all vehicles with a non-expired timestamp are returned.

func (*VehicleSQLDataset) UpsertVehiclePosition

func (vd *VehicleSQLDataset) UpsertVehiclePosition(v *trimet.VehiclePosition) error

UpsertVehiclePosition updates/inserts a vehicle in the DB.

func (*VehicleSQLDataset) UpsertVehiclePositionBytes

func (vd *VehicleSQLDataset) UpsertVehiclePositionBytes(ctx context.Context, b []byte) error

UpsertVehiclePositionBytes decodes bytes into VehiclePositions and updates the DB.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL