Taxi Trips Study - Advanced Pipeline¶
This notebook analyzes taxi trips with multiple enrichments, similar to example [3]. The difference is that we leverage the "custom_function" capability of the enricher module to compute additional metrics from the taxi dataset. See further below.
Data Sources¶
- Yellow NYC Taxis 2015: Sample taxi trip data for NYC.
In [ ]:
Copied!
import urban_mapper as um

# Load the NYC yellow taxi sample straight from the Hugging Face hub,
# declaring which columns hold the pickup coordinates.
data = (
    um.UrbanMapper()
    .loader
    .from_huggingface("oscur/taxisvis1M")
    .with_columns(longitude_column="pickup_longitude", latitude_column="pickup_latitude")
    .load()
)

# Coordinates may arrive as strings; force all four to float in one pass
# instead of repeating the astype line per column.
for _coord_col in (
    "pickup_longitude",
    "pickup_latitude",
    "dropoff_longitude",
    "dropoff_latitude",
):
    data[_coord_col] = data[_coord_col].astype(float)

# Persist a local CSV copy that the pre-processing cell below re-loads.
# index=False prevents pandas from writing a spurious "Unnamed: 0" column
# that the downstream CSVLoader would otherwise pick up.
data.to_csv("./taxisvis1M.csv", index=False)
import urban_mapper as um

# Load the NYC yellow taxi sample straight from the Hugging Face hub,
# declaring which columns hold the pickup coordinates.
data = (
    um.UrbanMapper()
    .loader
    .from_huggingface("oscur/taxisvis1M")
    .with_columns(longitude_column="pickup_longitude", latitude_column="pickup_latitude")
    .load()
)

# Coordinates may arrive as strings; force all four to float in one pass
# instead of repeating the astype line per column.
for _coord_col in (
    "pickup_longitude",
    "pickup_latitude",
    "dropoff_longitude",
    "dropoff_latitude",
):
    data[_coord_col] = data[_coord_col].astype(float)

# Persist a local CSV copy that the pre-processing cell below re-loads.
# index=False prevents pandas from writing a spurious "Unnamed: 0" column
# that the downstream CSVLoader would otherwise pick up.
data.to_csv("./taxisvis1M.csv", index=False)
In [ ]:
Copied!
#####################################################################################
# ⚠️ INFORMATION ABOUT THE CURRENT CELL ⚠️
# Custom aggregation functions used later on in the pipeline. Each takes a
# pandas Series (the values of one street segment's group) and reduces it
# to a single value for the enriched layer.
#####################################################################################
import pandas as pd


def _modal_value(series):
    """Return the most frequent value of *series*, or None when no mode exists."""
    mode = series.mode()
    return mode.iloc[0] if not mode.empty else None


def most_common_payment(series):
    """Most frequent payment type in *series*; None for an empty series."""
    if series.empty:
        return None
    return _modal_value(series)


def _peak_hour(series):
    """Most frequent hour-of-day (0-23) in a datetime-like *series*.

    Non-datetime input is coerced with pd.to_datetime; a ValueError (chained
    to the original error) is raised if coercion fails. Returns None for an
    empty series or when no mode exists.
    """
    if series.empty:
        return None
    if not pd.api.types.is_datetime64_any_dtype(series):
        try:
            series = pd.to_datetime(series)
        except Exception as e:
            # Chain the original exception so the root cause stays visible.
            raise ValueError(f"Could not convert series to datetime: {e}") from e
    return _modal_value(series.dt.hour)


def peak_pickup_hour(series):
    """Peak pickup hour (0-23) for a series of pickup timestamps."""
    return _peak_hour(series)


def peak_dropoff_hour(series):
    """Peak dropoff hour (0-23) for a series of dropoff timestamps."""
    return _peak_hour(series)


def average_trip_distance(series):
    """Mean trip distance, or 0 for an empty series."""
    return series.mean() if not series.empty else 0


def average_fare_amount(series):
    """Mean fare amount, or 0 for an empty series."""
    return series.mean() if not series.empty else 0
#####################################################################################
# ⚠️ INFORMATION ABOUT THE CURRENT CELL ⚠️
# Custom aggregation functions used later on in the pipeline. Each takes a
# pandas Series (the values of one street segment's group) and reduces it
# to a single value for the enriched layer.
#####################################################################################
import pandas as pd


def _modal_value(series):
    """Return the most frequent value of *series*, or None when no mode exists."""
    mode = series.mode()
    return mode.iloc[0] if not mode.empty else None


def most_common_payment(series):
    """Most frequent payment type in *series*; None for an empty series."""
    if series.empty:
        return None
    return _modal_value(series)


def _peak_hour(series):
    """Most frequent hour-of-day (0-23) in a datetime-like *series*.

    Non-datetime input is coerced with pd.to_datetime; a ValueError (chained
    to the original error) is raised if coercion fails. Returns None for an
    empty series or when no mode exists.
    """
    if series.empty:
        return None
    if not pd.api.types.is_datetime64_any_dtype(series):
        try:
            series = pd.to_datetime(series)
        except Exception as e:
            # Chain the original exception so the root cause stays visible.
            raise ValueError(f"Could not convert series to datetime: {e}") from e
    return _modal_value(series.dt.hour)


def peak_pickup_hour(series):
    """Peak pickup hour (0-23) for a series of pickup timestamps."""
    return _peak_hour(series)


def peak_dropoff_hour(series):
    """Peak dropoff hour (0-23) for a series of dropoff timestamps."""
    return _peak_hour(series)


def average_trip_distance(series):
    """Mean trip distance, or 0 for an empty series."""
    return series.mean() if not series.empty else 0


def average_fare_amount(series):
    """Mean fare amount, or 0 for an empty series."""
    return series.mean() if not series.empty else 0
In [ ]:
Copied!
#####################################################################################
# ⚠️ INFORMATION ABOUT THE CURRENT CELL ⚠️
# Some data wrangling is necessary because the raw payment_type column holds
# numeric codes; we do a "manual" load here to create a pre-processed version
# of the dataset with human-readable payment labels.
#####################################################################################
from urban_mapper import CSVLoader
# Define the payment type mapping (numeric TLC code -> label).
# Dictionary found @ https://www.nyc.gov/assets/tlc/downloads/pdf/data_dictionary_trip_records_yellow.pdf
payment_type_mapping = {
1: 'Credit card',
2: 'Cash',
3: 'No charge',
4: 'Dispute',
5: 'Unknown',
6: 'Voided trip'
}
# Load the CSV exported by the first cell.
# NOTE(review): _load_data_from_file is a private CSVLoader method — confirm
# there is no public load() equivalent before relying on it.
file_path = "./taxisvis1M.csv"
df = CSVLoader(file_path, "pickup_latitude", "pickup_longitude")._load_data_from_file()
# Apply the mapping to the payment_type column.
# NOTE: Series.map leaves NaN for any code missing from the mapping.
df['payment_type'] = df['payment_type'].map(payment_type_mapping)
# Optional: Verify the preprocessing
print("Sample of preprocessed payment_type:")
print(df['payment_type'].head())
# Persist as parquet; the pipeline cell below loads this file.
df.to_parquet("./taxisvis1M_preprocessed.parquet")
#####################################################################################
# ⚠️ INFORMATION ABOUT THE CURRENT CELL ⚠️
# Some data wrangling is necessary because the raw payment_type column holds
# numeric codes; we do a "manual" load here to create a pre-processed version
# of the dataset with human-readable payment labels.
#####################################################################################
from urban_mapper import CSVLoader
# Define the payment type mapping (numeric TLC code -> label).
# Dictionary found @ https://www.nyc.gov/assets/tlc/downloads/pdf/data_dictionary_trip_records_yellow.pdf
payment_type_mapping = {
1: 'Credit card',
2: 'Cash',
3: 'No charge',
4: 'Dispute',
5: 'Unknown',
6: 'Voided trip'
}
# Load the CSV exported by the first cell.
# NOTE(review): _load_data_from_file is a private CSVLoader method — confirm
# there is no public load() equivalent before relying on it.
file_path = "./taxisvis1M.csv"
df = CSVLoader(file_path, "pickup_latitude", "pickup_longitude")._load_data_from_file()
# Apply the mapping to the payment_type column.
# NOTE: Series.map leaves NaN for any code missing from the mapping.
df['payment_type'] = df['payment_type'].map(payment_type_mapping)
# Optional: Verify the preprocessing
print("Sample of preprocessed payment_type:")
print(df['payment_type'].head())
# Persist as parquet; the pipeline cell below loads this file.
df.to_parquet("./taxisvis1M_preprocessed.parquet")
In [ ]:
Copied!
import urban_mapper as um
from urban_mapper.pipeline import UrbanPipeline
# Initialise UrbanMapper — the entry point for every builder used below.
mapper = um.UrbanMapper()
# Urban layer: drive-network street segments for Downtown Brooklyn.
# Two with_mapping calls register both trip endpoints, so pickups and
# dropoffs are each snapped to their own segment column.
urban_layer = (
mapper.urban_layer
.with_type("streets_roads")
.from_place("Downtown Brooklyn, New York City, USA", network_type="drive")
.with_mapping(
longitude_column="pickup_longitude",
latitude_column="pickup_latitude",
output_column="pickup_segment"
)
.with_mapping(
longitude_column="dropoff_longitude",
latitude_column="dropoff_latitude",
output_column="dropoff_segment"
)
.build()
)
# Loader: reads the pre-processed parquet written by the wrangling cell above.
loader = (
mapper.loader
.from_file("./taxisvis1M_preprocessed.parquet")
.with_columns(longitude_column="pickup_longitude", latitude_column="pickup_latitude")
.build()
)
# Imputers for missing coordinates — one per endpoint pair.
imputer_pickup = (
mapper.imputer
.with_type("SimpleGeoImputer")
.on_columns("pickup_longitude", "pickup_latitude")
.build()
)
imputer_dropoff = (
mapper.imputer
.with_type("SimpleGeoImputer")
.on_columns("dropoff_longitude", "dropoff_latitude")
.build()
)
# Filter: keep only records within the layer's bounding box.
filter_step = mapper.filter.with_type("BoundingBoxFilter").build()
# Enrichers. count_by tallies rows per segment; aggregate_by applies the
# custom aggregation functions defined in the earlier cell to the
# values_from column, grouped by street segment.
enrich_pickup_count = (
mapper.enricher
.with_data(group_by="pickup_segment")
.count_by(output_column="pickup_count")
.build()
)
enrich_dropoff_count = (
mapper.enricher
.with_data(group_by="dropoff_segment")
.count_by(output_column="dropoff_count")
.build()
)
# Mean fare per pickup segment (custom average_fare_amount).
enrich_avg_fare = (
mapper.enricher
.with_data(group_by="pickup_segment", values_from="fare_amount")
.aggregate_by(method=average_fare_amount, output_column="avg_fare")
.build()
)
# Mean trip distance per pickup segment (custom average_trip_distance).
enrich_avg_distance = (
mapper.enricher
.with_data(group_by="pickup_segment", values_from="trip_distance")
.aggregate_by(method=average_trip_distance, output_column="avg_distance")
.build()
)
# Modal payment label per pickup segment (custom most_common_payment).
enrich_most_common_payment = (
mapper.enricher
.with_data(group_by="pickup_segment", values_from="payment_type")
.aggregate_by(method=most_common_payment, output_column="most_common_payment")
.build()
)
# Modal pickup hour per pickup segment (custom peak_pickup_hour).
enrich_peak_pickup_hour = (
mapper.enricher
.with_data(group_by="pickup_segment", values_from="tpep_pickup_datetime")
.aggregate_by(method=peak_pickup_hour, output_column="peak_pickup_hour")
.build()
)
# Modal dropoff hour per dropoff segment (custom peak_dropoff_hour).
enrich_peak_dropoff_hour = (
mapper.enricher
.with_data(group_by="dropoff_segment", values_from="tpep_dropoff_datetime")
.aggregate_by(method=peak_dropoff_hour, output_column="peak_dropoff_hour")
.build()
)
# Interactive visualiser with a dark basemap theme.
visualiser = (
mapper.visual
.with_type("Interactive")
.with_style({"tiles": "CartoDB dark_matter", "colorbar_text_color": "white"})
.build()
)
# Assemble the pipeline. Steps are listed in execution order; the name of
# each (step_name, step) pair identifies it within the pipeline.
pipeline = UrbanPipeline([
("urban_layer", urban_layer),
("loader", loader),
("impute_pickup", imputer_pickup),
("impute_dropoff", imputer_dropoff),
("filter", filter_step),
("enrich_pickup_count", enrich_pickup_count),
("enrich_dropoff_count", enrich_dropoff_count),
("enrich_avg_fare", enrich_avg_fare),
("enrich_avg_distance", enrich_avg_distance),
("enrich_most_common_payment", enrich_most_common_payment),
("enrich_peak_pickup_hour", enrich_peak_pickup_hour),
("enrich_peak_dropoff_hour", enrich_peak_dropoff_hour),
("visualiser", visualiser)
])
import urban_mapper as um
from urban_mapper.pipeline import UrbanPipeline
# Initialise UrbanMapper — the entry point for every builder used below.
mapper = um.UrbanMapper()
# Urban layer: drive-network street segments for Downtown Brooklyn.
# Two with_mapping calls register both trip endpoints, so pickups and
# dropoffs are each snapped to their own segment column.
urban_layer = (
mapper.urban_layer
.with_type("streets_roads")
.from_place("Downtown Brooklyn, New York City, USA", network_type="drive")
.with_mapping(
longitude_column="pickup_longitude",
latitude_column="pickup_latitude",
output_column="pickup_segment"
)
.with_mapping(
longitude_column="dropoff_longitude",
latitude_column="dropoff_latitude",
output_column="dropoff_segment"
)
.build()
)
# Loader: reads the pre-processed parquet written by the wrangling cell above.
loader = (
mapper.loader
.from_file("./taxisvis1M_preprocessed.parquet")
.with_columns(longitude_column="pickup_longitude", latitude_column="pickup_latitude")
.build()
)
# Imputers for missing coordinates — one per endpoint pair.
imputer_pickup = (
mapper.imputer
.with_type("SimpleGeoImputer")
.on_columns("pickup_longitude", "pickup_latitude")
.build()
)
imputer_dropoff = (
mapper.imputer
.with_type("SimpleGeoImputer")
.on_columns("dropoff_longitude", "dropoff_latitude")
.build()
)
# Filter: keep only records within the layer's bounding box.
filter_step = mapper.filter.with_type("BoundingBoxFilter").build()
# Enrichers. count_by tallies rows per segment; aggregate_by applies the
# custom aggregation functions defined in the earlier cell to the
# values_from column, grouped by street segment.
enrich_pickup_count = (
mapper.enricher
.with_data(group_by="pickup_segment")
.count_by(output_column="pickup_count")
.build()
)
enrich_dropoff_count = (
mapper.enricher
.with_data(group_by="dropoff_segment")
.count_by(output_column="dropoff_count")
.build()
)
# Mean fare per pickup segment (custom average_fare_amount).
enrich_avg_fare = (
mapper.enricher
.with_data(group_by="pickup_segment", values_from="fare_amount")
.aggregate_by(method=average_fare_amount, output_column="avg_fare")
.build()
)
# Mean trip distance per pickup segment (custom average_trip_distance).
enrich_avg_distance = (
mapper.enricher
.with_data(group_by="pickup_segment", values_from="trip_distance")
.aggregate_by(method=average_trip_distance, output_column="avg_distance")
.build()
)
# Modal payment label per pickup segment (custom most_common_payment).
enrich_most_common_payment = (
mapper.enricher
.with_data(group_by="pickup_segment", values_from="payment_type")
.aggregate_by(method=most_common_payment, output_column="most_common_payment")
.build()
)
# Modal pickup hour per pickup segment (custom peak_pickup_hour).
enrich_peak_pickup_hour = (
mapper.enricher
.with_data(group_by="pickup_segment", values_from="tpep_pickup_datetime")
.aggregate_by(method=peak_pickup_hour, output_column="peak_pickup_hour")
.build()
)
# Modal dropoff hour per dropoff segment (custom peak_dropoff_hour).
enrich_peak_dropoff_hour = (
mapper.enricher
.with_data(group_by="dropoff_segment", values_from="tpep_dropoff_datetime")
.aggregate_by(method=peak_dropoff_hour, output_column="peak_dropoff_hour")
.build()
)
# Interactive visualiser with a dark basemap theme.
visualiser = (
mapper.visual
.with_type("Interactive")
.with_style({"tiles": "CartoDB dark_matter", "colorbar_text_color": "white"})
.build()
)
# Assemble the pipeline. Steps are listed in execution order; the name of
# each (step_name, step) pair identifies it within the pipeline.
pipeline = UrbanPipeline([
("urban_layer", urban_layer),
("loader", loader),
("impute_pickup", imputer_pickup),
("impute_dropoff", imputer_dropoff),
("filter", filter_step),
("enrich_pickup_count", enrich_pickup_count),
("enrich_dropoff_count", enrich_dropoff_count),
("enrich_avg_fare", enrich_avg_fare),
("enrich_avg_distance", enrich_avg_distance),
("enrich_most_common_payment", enrich_most_common_payment),
("enrich_peak_pickup_hour", enrich_peak_pickup_hour),
("enrich_peak_dropoff_hour", enrich_peak_dropoff_hour),
("visualiser", visualiser)
])
In [ ]:
Copied!
# Execute the pipeline: runs every step registered above in order and
# returns the mapped point data plus the enriched street-segment layer.
mapped_data, enriched_layer = pipeline.compose_transform()
# Execute the pipeline (duplicate render of the line above).
mapped_data, enriched_layer = pipeline.compose_transform()
In [ ]:
Copied!
# Visualise the enriched metrics: each listed column (produced by the
# enrichers above) becomes a selectable layer on the interactive map.
fig = pipeline.visualise([
"pickup_count", "dropoff_count", "avg_fare", "avg_distance",
"most_common_payment", "peak_pickup_hour", "peak_dropoff_hour"
])
fig
# Visualise the enriched metrics (duplicate render of the cell above).
fig = pipeline.visualise([
"pickup_count", "dropoff_count", "avg_fare", "avg_distance",
"most_common_payment", "peak_pickup_hour", "peak_dropoff_hour"
])
fig
In [ ]:
Copied!
# Save the pipeline (dill-serialised) so it can be reloaded and reused later.
pipeline.save("./taxi_advanced_pipeline.dill")
# Save the pipeline (duplicate render of the line above).
pipeline.save("./taxi_advanced_pipeline.dill")
In [ ]:
Copied!
# Export the pipeline to JupyterGIS for collaborative exploration.
# raise_on_existing=False overwrites/skips rather than failing when the
# target .JGIS file already exists.
pipeline.to_jgis(
filepath="taxi_trips.JGIS",
urban_layer_name="Taxi Trips Pickup and Dropoff analysis",
raise_on_existing=False,
)
# Export the pipeline to JupyterGIS (duplicate render of the cell above).
pipeline.to_jgis(
filepath="taxi_trips.JGIS",
urban_layer_name="Taxi Trips Pickup and Dropoff analysis",
raise_on_existing=False,
)
In [ ]:
Copied!
In [ ]:
Copied!