Pipeline
This notebook demonstrates a streamlined UrbanMapper workflow using the UrbanPipeline class, replicating the step-by-step example with PLUTO data in Downtown Brooklyn. We’ll define all steps upfront, execute them in one go, and visualise the results.
Essentially, this notebook covers the Basics/[7]urban_pipeline.ipynb example.
Data source used:
- PLUTO data from NYC Open Data. https://www.nyc.gov/content/planning/pages/resources/datasets/mappluto-pluto-change
from urban_mapper import UrbanMapper
from urban_mapper.pipeline import UrbanPipeline
# Initialise UrbanMapper
um = UrbanMapper()
# Note: For the documentation's interactive mode, we query only 5000 records from the dataset.
# Remove the `number_of_rows` limit for a more realistic analysis.
data = (
    UrbanMapper()
    .loader
    .from_huggingface("oscur/pluto", number_of_rows=5000, streaming=True)
    .with_columns("longitude", "latitude")
    .load()
)
data['longitude'] = data['longitude'].astype(float)
data['latitude'] = data['latitude'].astype(float)
data.to_csv("pluto.csv")
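Before building the pipeline, it can help to confirm that the saved coordinates parse as numbers. The following is a minimal standard-library sketch, not part of UrbanMapper: the helper name `count_valid_coordinates` and the inline sample rows are hypothetical, and in the notebook you would open `pluto.csv` instead of the in-memory sample.

```python
import csv
import io
import math


def count_valid_coordinates(csv_file, lon_col="longitude", lat_col="latitude"):
    """Return (valid, invalid) row counts for the coordinate columns.

    Hypothetical helper for illustration; not part of UrbanMapper.
    """
    valid = invalid = 0
    for row in csv.DictReader(csv_file):
        try:
            lon = float(row[lon_col])
            lat = float(row[lat_col])
        except (KeyError, TypeError, ValueError):
            # Missing column, short row, blank cell, or non-numeric text.
            invalid += 1
            continue
        # "nan" parses as a float, so it has to be rejected explicitly.
        if math.isnan(lon) or math.isnan(lat):
            invalid += 1
        elif -180 <= lon <= 180 and -90 <= lat <= 90:
            valid += 1
        else:
            invalid += 1
    return valid, invalid


# Made-up sample standing in for pluto.csv. The first, unnamed column mimics
# the index column that DataFrame.to_csv writes by default.
sample = io.StringIO(
    ",longitude,latitude,numfloors\n"
    "0,-73.9857,40.6925,5\n"
    "1,,40.6931,3\n"
    "2,-73.9840,not-a-number,2\n"
    "3,-73.9901,40.6950,12\n"
)
valid, invalid = count_valid_coordinates(sample)
print(f"valid rows: {valid}, invalid rows: {invalid}")
```

To run the same check on the real file, pass `open("pluto.csv", newline="")` instead of `sample`.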
Step 1: Define the Pipeline
Goal: Set up all components of the workflow in a single pipeline.
Input: Configurations for each UrbanMapper module.
Output: An UrbanPipeline object ready to process data.
We define each step (urban layer, loader, imputer, filter, enricher, and visualiser) with its specific role:
- Urban Layer: Street intersections in Downtown Brooklyn.
- Loader: PLUTO data from CSV.
- Imputer: Fills missing coordinates.
- Filter: Trims data to the bounding box.
- Enricher: Adds average floors per intersection.
- Visualiser: Prepares an interactive map.
urban_layer = (
    um.urban_layer.with_type("streets_intersections")
    .from_place("Downtown Brooklyn, New York City, USA", network_type="drive")
    # Recall that `with_mapping` tells `map_nearest_layer` how to map the urban data onto the urban layer.
    .with_mapping(
        longitude_column="longitude",
        latitude_column="latitude",
        output_column="nearest_intersection",
        threshold_distance=50,
    )
    .build()
)

loader = (
    um.loader.from_file("./pluto.csv").with_columns("longitude", "latitude").build()
)

imputer = (
    um.imputer.with_type("SimpleGeoImputer").on_columns("longitude", "latitude").build()
)

filter_step = um.filter.with_type("BoundingBoxFilter").build()

enricher = (
    um.enricher.with_data(group_by="nearest_intersection", values_from="numfloors")
    .aggregate_by(method="mean", output_column="avg_floors")
    .build()
)

visualiser = (
    um.visual.with_type("Interactive")
    .with_style({"tiles": "CartoDB dark_matter", "colorbar_text_color": "white"})
    .build()
)

# Assemble the pipeline: each step is a (name, component) pair, listed in execution order
pipeline = UrbanPipeline(
    [
        ("urban_layer", urban_layer),
        ("loader", loader),
        ("imputer", imputer),
        ("filter", filter_step),
        ("enricher", enricher),
        ("visualiser", visualiser),
    ]
)

# Let's preview the urban pipeline we just created
pipeline.preview()
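To make the mapping and enrichment roles concrete, here is a small pure-Python sketch of the idea: assign each record to its nearest intersection if it lies within a threshold distance, then average `numfloors` per intersection. It is an illustration under stated assumptions, not UrbanMapper's implementation. The coordinates are made up, the helper names are hypothetical, and the sketch assumes the threshold is measured in metres, which the notebook does not state.

```python
import math
from collections import defaultdict
from statistics import mean

EARTH_RADIUS_M = 6_371_000  # mean Earth radius in metres


def haversine_m(lon1, lat1, lon2, lat2):
    """Great-circle distance in metres between two lon/lat points."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlam = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a))


def nearest_intersection(record, intersections, threshold_m):
    """Return the id of the closest intersection within threshold_m, else None."""
    best_id, best_dist = None, float("inf")
    for node_id, (lon, lat) in intersections.items():
        dist = haversine_m(record["longitude"], record["latitude"], lon, lat)
        if dist < best_dist:
            best_id, best_dist = node_id, dist
    return best_id if best_dist <= threshold_m else None


def average_floors(records, intersections, threshold_m=50):
    """Group records by nearest intersection and average their numfloors."""
    floors = defaultdict(list)
    for record in records:
        node_id = nearest_intersection(record, intersections, threshold_m)
        if node_id is not None:  # records beyond the threshold are left unmapped
            floors[node_id].append(record["numfloors"])
    return {node_id: mean(values) for node_id, values in floors.items()}


# Made-up intersections and lots, for illustration only.
intersections = {"A": (-73.9870, 40.6920), "B": (-73.9850, 40.6940)}
records = [
    {"longitude": -73.98702, "latitude": 40.69201, "numfloors": 4},
    {"longitude": -73.98698, "latitude": 40.69199, "numfloors": 6},
    {"longitude": -73.98501, "latitude": 40.69402, "numfloors": 10},
    {"longitude": -73.97000, "latitude": 40.70000, "numfloors": 30},  # far from both
]
avg_floors = average_floors(records, intersections)
print(avg_floors)
```

The last record lies more than a kilometre from either intersection, so it is excluded from the averages rather than being attached to a distant node.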
Step 2: Execute the Pipeline
Goal: Process the data through all defined steps in one operation.
Input: The UrbanPipeline object from Step 1.
Output: A mapped GeoDataFrame and an enriched UrbanLayer with processed data.
The compose_transform method runs the entire workflow (loading data, imputing, filtering, mapping, and enriching) in a single call, so data flows from one step to the next without manual hand-offs.
mapped_data, enriched_layer = pipeline.compose_transform()
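Conceptually, running a list of named steps in one call amounts to threading data through each step in order. The toy class below sketches that pattern in plain Python. `MiniPipeline` and its step functions are hypothetical and far simpler than `UrbanPipeline`, whose internals are not shown in this notebook.

```python
class MiniPipeline:
    """Toy pipeline: runs named steps in order, feeding each output forward.

    Hypothetical illustration; not UrbanPipeline's implementation.
    """

    def __init__(self, steps):
        names = [name for name, _ in steps]
        if len(names) != len(set(names)):
            raise ValueError("step names must be unique")
        self.steps = list(steps)

    def run(self, data):
        for _name, step in self.steps:
            data = step(data)  # the output of one step is the input of the next
        return data


def drop_missing_coordinates(rows):
    """Keep only rows that have both a longitude and a latitude."""
    return [r for r in rows if r["longitude"] is not None and r["latitude"] is not None]


def make_bounding_box_filter(min_lon, min_lat, max_lon, max_lat):
    """Build a step that keeps rows falling inside the given box."""

    def keep_inside(rows):
        return [
            r
            for r in rows
            if min_lon <= r["longitude"] <= max_lon and min_lat <= r["latitude"] <= max_lat
        ]

    return keep_inside


mini_pipeline = MiniPipeline(
    [
        ("clean", drop_missing_coordinates),
        ("filter", make_bounding_box_filter(-73.99, 40.69, -73.98, 40.70)),
    ]
)

# Made-up rows, for illustration only.
rows = [
    {"longitude": -73.985, "latitude": 40.695},  # inside the box
    {"longitude": None, "latitude": 40.695},     # missing longitude
    {"longitude": -73.900, "latitude": 40.695},  # outside the box
]
result = mini_pipeline.run(rows)
print(result)
```

Order matters in this sketch: the bounding-box step compares numbers, so the rows with missing coordinates have to be handled before it runs.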
Step 3: Visualise Results
Goal: Present the enriched data on an interactive map.
Input: The enriched layer from Step 2 and columns to display (avg_floors).
Output: An interactive Folium map showing average floors per intersection.
The pipeline’s visualise method leverages the pre-configured visualiser to generate the map directly from the enriched layer.
fig = pipeline.visualise(["avg_floors"])
fig # Display the interactive map
Step 4: Save and Load Pipeline
Goal: Preserve the pipeline for future use or sharing.
Input: A file path (./my_pipeline.dill) for saving.
Output: A saved pipeline file and a reloaded UrbanPipeline object.
Saving with save and loading with load let you reuse or distribute your workflow without redefining each step.
# Save the pipeline
pipeline.save("./my_pipeline.dill")
# Load it back
loaded_pipeline = UrbanPipeline.load("./my_pipeline.dill")
# Preview the loaded pipeline
loaded_pipeline.preview()
# Visualise with the loaded pipeline
fig = loaded_pipeline.visualise(["avg_floors"])
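The save/load round trip can be pictured with Python's standard `pickle` module. This is only an analogy: the `.dill` extension suggests the pipeline is serialised with the third-party `dill` library, but the notebook does not show how, and the `steps` list and file name below are made up for illustration.

```python
import pickle
import tempfile
from pathlib import Path

# Made-up stand-in for a pipeline definition: plain (name, config) pairs.
steps = [
    ("loader", {"file": "./pluto.csv", "columns": ["longitude", "latitude"]}),
    ("enricher", {"group_by": "nearest_intersection", "method": "mean"}),
]

with tempfile.TemporaryDirectory() as tmp_dir:
    path = Path(tmp_dir) / "my_steps.pkl"

    # Save: serialise the object graph to bytes on disk.
    with path.open("wb") as f:
        pickle.dump(steps, f)

    # Load: rebuild an equal, independent copy from the file.
    with path.open("rb") as f:
        restored = pickle.load(f)

print(restored == steps)
```

As with any pickle-style format, only load files from sources you trust, since unpickling can execute arbitrary code.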
Conclusion
Well done! Using UrbanPipeline, you’ve efficiently processed and visualised PLUTO data with less code than the step-by-step approach. This method shines for its simplicity and reusability. Compare it with the Step-by-Step notebook for a detailed breakdown of each stage!