use gtfs query
hamima-halim committed May 6, 2024
1 parent ee4b72e commit 5b3de86
Showing 1 changed file with 50 additions and 16 deletions.
66 changes: 50 additions & 16 deletions mbta-performance/chalicelib/lamp/ingest.py
@@ -1,15 +1,18 @@
from datetime import date
import io
from typing import Tuple
import requests
import os
import pandas as pd
import requests
from typing import Tuple

from .constants import LAMP_COLUMNS, S3_COLUMNS
from ..date import format_dateint, get_current_service_date
from mbta_gtfs_sqlite import MbtaGtfsArchive
from mbta_gtfs_sqlite.models import (
    StopTime,
)
from sqlalchemy import or_

from .. import parallel
from .. import s3

@@ -19,6 +22,7 @@
S3_BUCKET = "tm-mbta-performance"
# month and day are not zero-padded
S3_KEY_TEMPLATE = "Events-lamp/daily-data/{stop_id}/Year={YYYY}/Month={_M}/Day={_D}/events.csv"
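# local scratch directory where GTFS feeds are downloaded and built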
TEMP_GTFS_LOCAL_PREFIX = ".temp/gtfs-feeds/"

COLUMN_RENAME_MAP = {
"headway_trunk_seconds": "headway_seconds",
@@ -73,16 +77,6 @@ def _process_arrival_departure_times(pq_df: pd.DataFrame) -> pd.DataFrame:
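    # keep rows with a recorded departure time and label them as DEP events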
    dep_df = pq_df[pq_df["dep_time"].notna()]
    dep_df = dep_df.assign(event_type="DEP").rename(columns={"dep_time": "event_time"})

    # calculate gtfs traveltimes
    mbta_gtfs = MbtaGtfsArchive()
    service_date = get_current_service_date()
    feed = mbta_gtfs.get_feed_for_date(service_date)
    feed.download_or_build()
    session = feed.create_sqlite_session()
    stop_times = session.query(StopTime).filter(StopTime.trip_id == arr_df["trip_id"]).all()
    trip_start_times = stop_times.arrival_time.transform("min")
    dep_df["scheduled_tt"] = (stop_times.arrival_time - trip_start_times).dt.seconds

    # these departures are from the previous stop! so set them to the previous stop id
    # find the stop id for the departure whose sequence number precedes the recorded one
    # stop sequences don't necessarily increment by 1 or with a reliable pattern
@@ -133,7 +127,43 @@ def fetch_pq_file_from_remote(service_date: date) -> pd.DataFrame:
    )


def ingest_pq_file(pq_df: pd.DataFrame) -> pd.DataFrame:
def _recalculate_fields_from_gtfs(pq_df: pd.DataFrame, service_date: date):
    # calculate gtfs traveltimes
    mbta_gtfs = MbtaGtfsArchive(TEMP_GTFS_LOCAL_PREFIX)
    feed = mbta_gtfs.get_feed_for_date(service_date)
    feed.download_or_build()
    session = feed.create_sqlite_session()
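    # fetch scheduled stop times for just the trips observed in today's events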
    gtfs_stops = pd.read_sql(
        session.query(
            StopTime.trip_id,
            StopTime.stop_id,
            StopTime.arrival_time,
            # func.min(StopTime.arrival_time).label("trip_start_time")
        )
        .filter(
            # or_() expects individual clauses as arguments, so unpack them
            or_(*[StopTime.trip_id == tid for tid in pq_df["trip_id"].unique()])
            # ).group_by(StopTime.trip_id).statement,
        )
        .statement,
        session.bind,
    )
    # we could do this groupby/min/merge in sql, but let's keep our computations in
    # pandas to stay consistent across services
    trip_start_times = gtfs_stops.groupby("trip_id").arrival_time.transform("min")
    gtfs_stops["scheduled_tt"] = gtfs_stops.arrival_time - trip_start_times
    # merge the scheduled travel times onto the observed events
    # TODO check, hamima: can one conceivably return to a stop_id multiple times in a trip?
    augmented_events = pd.merge(
        pq_df,
        gtfs_stops[["trip_id", "stop_id", "scheduled_tt"]],
        how="left",
        on=["trip_id", "stop_id"],
        suffixes=["", "_gtfs"],
    )
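    # left join: events with no scheduled match are kept, with scheduled_tt as NaN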
    return augmented_events


def ingest_pq_file(pq_df: pd.DataFrame, service_date: date) -> pd.DataFrame:
    """Process and transform columns for the full day's events."""
    pq_df["direction_id"] = pq_df["direction_id"].astype("int16")
    pq_df["service_date"] = pq_df["service_date"].apply(format_dateint)
@@ -144,6 +174,8 @@ def ingest_pq_file(pq_df: pd.DataFrame) -> pd.DataFrame:
    pq_df = pq_df[~pq_df["trip_id"].str.startswith(TRIP_IDS_TO_DROP)]

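    # split into arrival/departure events, then attach scheduled travel times from GTFS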
    processed_daily_events = _process_arrival_departure_times(pq_df)
    processed_daily_events = _recalculate_fields_from_gtfs(processed_daily_events, service_date)

    return processed_daily_events.sort_values(by=["event_time"])


@@ -160,8 +192,8 @@ def upload_to_s3(stop_id_and_events: Tuple[str, pd.DataFrame], service_date: date

    # Upload to s3 as csv
    s3_key = S3_KEY_TEMPLATE.format(stop_id=stop_id, YYYY=service_date.year, _M=service_date.month, _D=service_date.day)
    # _local_save(s3_key, stop_events)
    s3.upload_df_as_csv(S3_BUCKET, s3_key, stop_events)
    _local_save(s3_key, stop_events)
    # s3.upload_df_as_csv(S3_BUCKET, s3_key, stop_events)
    return [stop_id]


@@ -177,12 +209,14 @@ def ingest_today_lamp_data():
        # If we can't fetch the file, we can't process it
        print(e)
        return
    processed_daily_events = ingest_pq_file(pq_df)
    processed_daily_events = ingest_pq_file(pq_df, service_date)

    # split daily events by stop_id and parallel upload to s3
    stop_event_groups = processed_daily_events.groupby("stop_id")
    _parallel_upload(stop_event_groups, service_date)


if __name__ == "__main__":
    # ensure the local scratch directory for GTFS feeds exists
    os.makedirs(TEMP_GTFS_LOCAL_PREFIX, exist_ok=True)
    ingest_today_lamp_data()
