Urban Pipeline¶
In this notebook, we'll show how to move on from the step-by-step approach of notebooks 1-6
by bundling up all your workflow steps into one neat pipeline.
Data source used:
- PLUTO data from NYC Open Data. https://www.nyc.gov/content/planning/pages/resources/datasets/mappluto-pluto-change
Let’s get started! 🌟
import urban_mapper as um
from urban_mapper.pipeline import UrbanPipeline
mapper = um.UrbanMapper()
# Note: in the documentation's interactive mode, we only query 100,000 records from the dataset.
# Feel free to remove this limit for a more realistic analysis.
data = (
    um.UrbanMapper()
    .loader
    .from_huggingface("oscur/pluto", number_of_rows=100000, streaming=True)
    .with_columns("longitude", "latitude")
    .load()
)
data['longitude'] = data['longitude'].astype(float)
data['latitude'] = data['latitude'].astype(float)
data.to_csv("./pluto.csv")
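The two astype(float) calls above assume every coordinate value parses as a number. As a hedged aside, here is a small pandas-only sketch of a more forgiving conversion; the toy values are made up and are not taken from PLUTO.

```python
import pandas as pd

# Toy frame standing in for the downloaded records (values are made up).
raw = pd.DataFrame(
    {
        "longitude": ["-73.9857", "not available", "-73.9712"],
        "latitude": ["40.7484", "40.7580", None],
    }
)

# astype(float) would raise ValueError on "not available";
# to_numeric with errors="coerce" turns unparseable values into NaN instead.
coords = raw.copy()
for column in ("longitude", "latitude"):
    coords[column] = pd.to_numeric(coords[column], errors="coerce")

print(coords.dtypes)
print(coords.isna().sum())
```

Rows that end up as NaN are then the kind of thing an imputing step would need to deal with later in the workflow.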
What’s the UrbanPipeline All About?¶
The UrbanPipeline class is like the conductor of an orchestra: it brings together all the UrbanMapper steps (loading data, creating layers, imputing missing bits, filtering, enriching, and visualising) and makes them play in harmony. For the ML enthusiasts, it mimics what scikit-learn does with its Pipeline class. You define your steps, pop them into the pipeline, and it handles the rest. It’s brilliant for keeping your workflow tidy and repeatable, and it also makes that workflow shareable and reusable!
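If you have not met the scikit-learn Pipeline that this design mimics, the sketch below shows the idea in scikit-learn itself: named steps given as (name, object) tuples, run in order, and still reachable by name afterwards. It only illustrates the analogy; the toy array is made up and nothing here is UrbanMapper code.

```python
import numpy as np
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

# Each step is a (name, object) tuple, run in the order given.
sk_pipeline = Pipeline(
    [
        ("imputer", SimpleImputer(strategy="mean")),
        ("scaler", StandardScaler()),
    ]
)

X = np.array([[1.0], [np.nan], [3.0]])
X_out = sk_pipeline.fit_transform(X)

# Steps remain addressable by name after the pipeline is built.
fitted_imputer = sk_pipeline.named_steps["imputer"]
print(fitted_imputer.statistics_)  # column means learned by the imputer
print(X_out.ravel())
```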
Setting Up a Simple Pipeline¶
Let’s build a pipeline that does the following:
- Loads PLUTO data from a CSV file.
- Creates a street intersections layer for Manhattan.
- Imputes missing coordinates.
- Filters data to the layer’s bounding box.
- Enriches the layer with average building floors.
- Sets up an interactive map to visualise it all.
We’ll define each step and slot them into our pipeline.
urban_layer = (
    mapper.urban_layer.with_type("streets_intersections")
    .from_place("Manhattan, New York City, USA", network_type="drive")
    # with_mapping is the pipeline equivalent of map_nearest_layer: it configures how
    # UrbanMapper should perform the nearest-layer mapping under the hood.
    .with_mapping(
        longitude_column="longitude",
        latitude_column="latitude",
        output_column="nearest_intersection",
        threshold_distance=50,  # Optional: sets a 50-metre threshold for nearest mapping.
    )
    .build()
)
loader = mapper.loader.from_file("./pluto.csv").with_columns("longitude", "latitude").build()
imputer = mapper.imputer.with_type("SimpleGeoImputer").on_columns("longitude", "latitude").build()
filter_step = mapper.filter.with_type("BoundingBoxFilter").build()
enricher = mapper.enricher.with_data(group_by="nearest_intersection", values_from="numfloors").aggregate_by(method="mean", output_column="avg_floors").build()
visualiser = mapper.visual.with_type("Interactive").with_style({"tiles": "CartoDB dark_matter", "colorbar_text_color": "white"}).build()
# Assemble the pipeline
# Note that a pipeline step is a tuple holding a name and the step itself.
# Later on, when loading or sharing the pipeline, anyone can use `.get_step("step_name")` to get the step, preview it, re-run it, etc.
pipeline = UrbanPipeline(
    [
        ("urban_layer", urban_layer),
        ("loader", loader),
        ("imputer", imputer),
        ("filter", filter_step),
        ("enricher", enricher),
        ("visualiser", visualiser),
    ]
)
# Note that we could write this more concisely, but we spell out the steps for clarity.
# The concise version would look like this (only the urban layer is shown):
# pipeline = UrbanPipeline([
#     ("urban_layer", (
#         mapper.urban_layer
#         .with_type("streets_intersections")
#         .from_place("Downtown Brooklyn, New York City, USA", network_type="drive")
#         .with_mapping(
#             longitude_column="longitude",
#             latitude_column="latitude",
#             output_column="nearest_intersection",
#             threshold_distance=50
#         )
#         .build()
#     )),
#     # Add the other steps here
# ])
# Let's preview our urban pipeline workflow
pipeline.preview()
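To make the mapping and enriching steps less abstract, here is a rough, library-free sketch of what "map each record to its nearest intersection within 50 metres, then average numfloors per intersection" amounts to. It is an illustration under stated assumptions, not UrbanMapper's implementation: the haversine distance, the helper haversine_m, and the toy coordinates are all choices made for this example, and the library's own distance computation may differ.

```python
import numpy as np
import pandas as pd

EARTH_RADIUS_M = 6_371_000.0


def haversine_m(lon1, lat1, lon2, lat2):
    """Great-circle distance in metres between points given in degrees."""
    lon1, lat1, lon2, lat2 = map(np.radians, (lon1, lat1, lon2, lat2))
    a = (
        np.sin((lat2 - lat1) / 2) ** 2
        + np.cos(lat1) * np.cos(lat2) * np.sin((lon2 - lon1) / 2) ** 2
    )
    return 2 * EARTH_RADIUS_M * np.arcsin(np.sqrt(a))


# Made-up intersections and buildings (not real PLUTO records).
intersections = pd.DataFrame(
    {"longitude": [-73.9857, -73.9800], "latitude": [40.7484, 40.7500]},
    index=["A", "B"],
)
buildings = pd.DataFrame(
    {
        "longitude": [-73.9856, -73.9858, -73.9801, -73.9700],
        "latitude": [40.7485, 40.7483, 40.7501, 40.7600],
        "numfloors": [10.0, 20.0, 5.0, 50.0],
    }
)

# Distance matrix: one row per building, one column per intersection.
dist = haversine_m(
    buildings["longitude"].to_numpy()[:, None],
    buildings["latitude"].to_numpy()[:, None],
    intersections["longitude"].to_numpy()[None, :],
    intersections["latitude"].to_numpy()[None, :],
)
nearest_pos = dist.argmin(axis=1)
nearest_dist = dist.min(axis=1)

# Keep a match only when it lies within the 50-metre threshold.
threshold_m = 50
matched = intersections.index.to_numpy()[nearest_pos].astype(object)
matched[nearest_dist > threshold_m] = None
buildings["nearest_intersection"] = matched

# Enrich: mean number of floors per intersection.
avg_floors = (
    buildings.dropna(subset=["nearest_intersection"])
    .groupby("nearest_intersection")["numfloors"]
    .mean()
    .rename("avg_floors")
)
print(avg_floors)
```

The fourth toy building sits more than a kilometre from either intersection, so it gets no match and does not contribute to any average.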
Running the Pipeline¶
Time to put it to work! We’ll use compose_transform to run the entire pipeline in one go: loading, imputing, filtering, mapping, enriching, all sorted. Then, we’ll visualise the results with a snazzy interactive map.
Note that we could also do this in two steps, first calling compose() and then transform(); we combine them into a single call here for simplicity.
# Execute the pipeline
mapped_data, enriched_layer = pipeline.compose_transform()
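The one-call versus two-call choice follows the same pattern as fit_transform versus fit followed by transform in scikit-learn. The sketch below shows that pattern in scikit-learn only, as an analogy; it makes no claim about the exact arguments compose() and transform() take in UrbanPipeline.

```python
import numpy as np
from sklearn.preprocessing import StandardScaler

X = np.array([[1.0], [2.0], [3.0]])

# One call: learn the parameters and apply them straight away.
one_step = StandardScaler().fit_transform(X)

# Two calls: learn first, apply afterwards (possibly several times).
scaler = StandardScaler()
scaler.fit(X)
two_step = scaler.transform(X)

print(np.allclose(one_step, two_step))
```

Splitting the calls is mostly useful when you want to inspect the prepared object before applying it.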
# Show the results
fig = pipeline.visualise(result_columns=["avg_floors"])
# result_columns is basically the columns that will be displayed in the map.
# If you want to display only one column, you can pass a string as well.
fig # Displays an interactive map in your notebook
Saving and Loading Your Pipeline¶
You can save your pipeline to a file and load it back later, so whenever you need it again the saved pipeline does not have to redo its entire workflow. Here, we’ll save it, load it, and ensure it’s ready for further use.
pipeline.save("./my_pipeline.dill")
loaded_pipeline = UrbanPipeline.load("./my_pipeline.dill")
loaded_pipeline.preview()
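The .dill extension suggests the file is written with the dill serialiser, which is an assumption on our part rather than something this notebook states. The round trip itself looks like the standard-library sketch below, where pickle stands in for whatever the library uses and toy_pipeline is a made-up placeholder, not a real UrbanPipeline.

```python
import pickle
import tempfile
from pathlib import Path

# Stand-in for a configured pipeline: a list of (name, settings) steps.
toy_pipeline = [
    ("loader", {"path": "./pluto.csv", "columns": ["longitude", "latitude"]}),
    ("enricher", {"group_by": "nearest_intersection", "method": "mean"}),
]

with tempfile.TemporaryDirectory() as tmp_dir:
    target = Path(tmp_dir) / "toy_pipeline.pkl"
    # Write the object to disk...
    with target.open("wb") as handle:
        pickle.dump(toy_pipeline, handle)
    # ...and read an equal copy back.
    with target.open("rb") as handle:
        restored = pickle.load(handle)

print(restored == toy_pipeline)
```

One thing to keep in mind when sharing such files: unpickling can execute arbitrary code, so only load pipelines from sources you trust.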
Accessing the Enriched Layer for Machine Learning Analysis¶
Now, let’s retrieve the enriched urban layer using .get_layer() for machine learning analysis.
enriched_layer = loaded_pipeline.get_step("urban_layer")
enriched_gdf = enriched_layer.get_layer()
enriched_gdf.head()
Load many different datasets in the same pipeline¶
You can load several datasets in one pipeline. Every dataset must contain the columns referenced in with_data, aggregate_by, etc.
The static visualiser looks for the data_id column in the enriched data and uses it to draw each dataset with a different marker.
urban_layer = (
    mapper.urban_layer.with_type("streets_intersections")
    .from_place("Manhattan, New York City, USA", network_type="drive")
    # with_mapping is the pipeline equivalent of map_nearest_layer: it configures how
    # UrbanMapper should perform the nearest-layer mapping under the hood.
    .with_mapping(
        longitude_column="longitude",
        latitude_column="latitude",
        output_column="nearest_intersection",
        threshold_distance=50,  # Optional: sets a 50-metre threshold for nearest mapping.
    )
    .build()
)
## from_huggingface cannot be used directly in the pipeline, because the only loader method that supports `.build()` is from_file.
## This should change in a future version.
data = um.UrbanMapper().loader.from_huggingface("oscur/pluto", number_of_rows=1000, streaming=True).with_columns("longitude", "latitude").load()
data['longitude'] = data['longitude'].astype(float)
data['latitude'] = data['latitude'].astype(float)
data.to_csv("./pluto.csv")
## Same workaround for the taxi data: download it first, then load it from a CSV file.
data = um.UrbanMapper().loader.from_huggingface("oscur/taxisvis1M", number_of_rows=1000, streaming=True).with_columns("pickup_longitude", "pickup_latitude").load()
data['pickup_longitude'] = data['pickup_longitude'].astype(float)
data['pickup_latitude'] = data['pickup_latitude'].astype(float)
data.to_csv("./taxisvis1M.csv")
loader1 = mapper.loader.from_file("pluto.csv").with_columns("longitude", "latitude").build()
loader2 = mapper.loader.from_file("taxisvis1M.csv").with_columns("pickup_longitude", "pickup_latitude").with_map({"pickup_longitude": "longitude", "pickup_latitude": "latitude"}).build()
# Both the imputer and the filter are applied only to the "taxi_data" dataset (loader2)
imputer = mapper.imputer.with_data("taxi_data").with_type("SimpleGeoImputer").on_columns("longitude", "latitude").build()
filter_step = mapper.filter.with_data("taxi_data").with_type("BoundingBoxFilter").build()
# Each enricher is applied to the dataset named by its data_id
enricher1 = mapper.enricher.with_data(group_by="nearest_intersection", values_from="numfloors", data_id="pluto_data").aggregate_by(method="mean", output_column="avg_floors").build()
enricher2 = mapper.enricher.with_data(group_by="pickup_segment", data_id="taxi_data").count_by(output_column="pickup_count").build()
visualiser = mapper.visual.with_type("Interactive").with_style({"tiles": "CartoDB dark_matter"}).build()
# Assemble the pipeline
# Note that a pipeline step is a tuple holding a name and the step itself.
# When more than one loader is defined, the pipeline creates a dictionary of all the loaded data, keyed by the loader step names.
# Later on, when loading or sharing the pipeline, anyone can use `.get_step("step_name")` to get the step, preview it, re-run it, etc.
pipeline = UrbanPipeline(
    [
        ("urban_layer", urban_layer),
        ("pluto_data", loader1),
        ("taxi_data", loader2),
        ("imputer", imputer),
        ("filter", filter_step),
        ("enricher1", enricher1),
        ("enricher2", enricher2),
        ("visualiser", visualiser),
    ]
)
# Let's preview our urban pipeline workflow
pipeline.preview()
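The "dictionary keyed by loader step names" and the data_id column can be pictured with plain pandas. This is only a conceptual sketch with made-up rows; how UrbanPipeline stores and combines the datasets internally is not shown in this notebook, so treat the concat step as an assumption made for illustration.

```python
import pandas as pd

# Made-up records standing in for the two CSV files, keyed by loader step name.
loaded = {
    "pluto_data": pd.DataFrame(
        {"longitude": [-73.98, -73.97], "latitude": [40.75, 40.76], "numfloors": [10, 20]}
    ),
    # Renaming plays the role of with_map: both datasets end up with the same coordinate columns.
    "taxi_data": pd.DataFrame(
        {"pickup_longitude": [-73.99], "pickup_latitude": [40.74]}
    ).rename(columns={"pickup_longitude": "longitude", "pickup_latitude": "latitude"}),
}

# Stack the datasets and record which one each row came from.
combined = (
    pd.concat(loaded, names=["data_id", "row"])
    .reset_index(level="data_id")
    .reset_index(drop=True)
)
print(combined)
```

Columns that exist in only one dataset, such as numfloors here, come out as NaN for the other, which is why each enricher names the dataset it should read from.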
Proceeding with Machine Learning Analysis¶
With the enriched layer in hand, let’s perform a simple machine learning task. We’ll use K-Means clustering to group street intersections based on the average number of building floors (avg_floors).
Step 1: Prepare the Data¶
Extract the relevant feature from the enriched layer and handle any missing values.
features = enriched_gdf[['avg_floors']]
features = features.dropna()
Step 2: Apply K-Means Clustering¶
Cluster the intersections into 5 groups based on avg_floors.
from sklearn.cluster import KMeans
kmeans = KMeans(n_clusters=5, random_state=42)
kmeans.fit(features)
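# Write labels back by index: rows dropped by dropna() have no label and stay NaN
enriched_gdf.loc[features.index, 'cluster'] = kmeans.labels_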
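One detail worth checking when writing cluster labels back: after dropna(), the feature table can have fewer rows than the layer, and labels_ only has one entry per fitted row. The toy sketch below (made-up values, a plain DataFrame standing in for the enriched layer) shows index-aligned assignment leaving the unfitted row as NaN.

```python
import numpy as np
import pandas as pd
from sklearn.cluster import KMeans

# Toy stand-in for the enriched layer; one intersection has no floor data.
toy = pd.DataFrame({"avg_floors": [2.0, 3.0, np.nan, 30.0, 32.0]})

features = toy[["avg_floors"]].dropna()
model = KMeans(n_clusters=2, n_init=10, random_state=42).fit(features)

# labels_ has one entry per *fitted* row, so assign it through the surviving index.
toy.loc[features.index, "cluster"] = model.labels_
print(toy)
```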
Step 3: Visualise the Clusters¶
Visualise the clusters on a static map using Matplotlib.
import numpy as np
import matplotlib.pyplot as plt
# Extract the cluster centroids (average floors for each cluster) and flatten to 1D
centroids = kmeans.cluster_centers_.flatten()
# Get the indices that would sort the centroids from low to high
sorted_indices = np.argsort(centroids)
# Create a mapping from original cluster labels to new sorted labels
label_mapping = {original: new for new, original in enumerate(sorted_indices)}
# Apply the mapping to the GeoDataFrame to create sorted cluster labels
enriched_gdf['cluster_sorted'] = enriched_gdf['cluster'].map(label_mapping)
# Round the sorted centroids for display in the legend
rounded_centroids = [round(centroids[idx], 1) for idx in sorted_indices]
# Set up the figure and axis for the plot
fig, ax = plt.subplots(figsize=(10, 10))
# Plot the GeoDataFrame using the sorted cluster labels and 'viridis' colormap
scatter = enriched_gdf.plot(column='cluster_sorted', ax=ax, cmap='viridis')
# Label the axes and set the title
ax.set_xlabel('Longitude')
ax.set_ylabel('Latitude')
plt.title('Street Intersections in Manhattan Clustered by Average Building Floors')
# Add a color bar to the plot
cbar = scatter.get_figure().colorbar(scatter.get_children()[0], ax=ax)
# Set the ticks and labels for the color bar based on the sorted centroids
cbar.set_ticks(range(len(sorted_indices)))
cbar.set_ticklabels([f'Avg. Floors: {centroid}' for centroid in rounded_centroids])
# Display the plot
plt.show()
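The relabelling trick in the cell above is easy to get backwards, so here it is in isolation on hypothetical numbers: K-Means assigns cluster ids in no particular order, and the mapping renumbers them so that 0 is the cluster with the lowest centroid.

```python
import numpy as np

# Hypothetical 1-D centroids, indexed by the original cluster label.
centroids = np.array([12.0, 3.0, 40.0])
labels = np.array([0, 1, 2, 1, 0])

# argsort lists the original labels from lowest to highest centroid.
sorted_indices = np.argsort(centroids)  # [1, 0, 2]
label_mapping = {int(original): new for new, original in enumerate(sorted_indices)}

relabelled = np.array([label_mapping[int(label)] for label in labels])
print(label_mapping)
print(relabelled)
```

Original label 1 has the smallest centroid (3.0), so it becomes 0; label 0 becomes 1; label 2 keeps its place.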
Exporting to JupyterGIS (Optional)¶
For collaborative exploration, you can export your pipeline to JupyterGIS. Check out the JupyterGIS documentation for more details.
pipeline.to_jgis(
    filepath="urban_analysis.JGIS",
    urban_layer_name="Manhattan Intersections",
    raise_on_existing=False,
)
Wrapping It Up¶
Smashing job! 🌟 You’ve built and run your first UrbanPipeline, saved it, loaded it back, retrieved the enriched layer, and performed a machine learning analysis. You can now reuse, share, or tweak this workflow as needed!