Pipeline

What is the pipeline tool?

The pipeline tool is a module that allows you to create a sequence of data processing steps, or "pipeline", to transform your urban layer given one or more urban datasets and some user-defined enrichments.

We also highly recommend looking through the Pipeline examples for a more hands-on introduction to the pipeline tool and its usage.

Documentation Under Alpha Construction

This documentation is in its early stages and still being developed. The API may therefore change, and some parts might be incomplete or inaccurate.

Use at your own risk, and please report anything you find that seems incorrect or outdated.

Open An Issue!

UrbanPipeline

Scikit-Learn Inspired Pipeline for Urban Mapper.

Constructs and manages pipelines that integrate various urban mapper components into a cohesive workflow, handling execution order and data flow. Beyond that, you can also save, share, export, and load pipelines, which is great for reproducibility.

Have a look at what a pipeline can look like:

%%{init: { 'theme': 'base', 'themeVariables': { 'primaryColor': '#57068c', 'primaryTextColor': '#fff', 'primaryBorderColor': '#F49BAB', 'lineColor': '#F49BAB', 'secondaryColor': '#9B7EBD', 'tertiaryColor': '#E5D9F2' } }}%%
graph LR
    subgraph "Data Ingestion"
        A["Loaders (1..*)"]
        B["Urban Layer (1)"]
        A -->|Raw data| B
    end
    subgraph "Data Preprocessing"
        direction TB
        C["Imputers (0..*)"]
        D["Filters (0..*)"]
        C -->|Imputed data| D
    end
    subgraph "Data Processing"
        E["Enrichers (1..*)"]
    end
    subgraph "Data Output"
        F["Visualiser (0, 1)"]
    end
    B -->|Spatial data| C
    D -->|Filtered data| E
    E -->|Enriched data| F

Notation: (1) = exactly one instance, (0..*) = zero or more instances, (1..*) = one or more instances, (0, 1) = zero or one instance

Note

Pipelines must be composed before transforming or visualising data. Use compose() or compose_transform().
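The compose-before-transform contract above can be sketched in plain Python. `TinyPipeline` below is a hypothetical stand-in, not the real `UrbanPipeline` API; it only illustrates why `transform()` raises before `compose()` has run, and how `compose_transform()` folds both calls into one:

```python
# Minimal sketch of the compose-before-transform contract.
# `TinyPipeline` is a hypothetical stand-in, not the real UrbanPipeline.

class TinyPipeline:
    def __init__(self, steps):
        self.steps = steps
        self._composed = False

    def compose(self):
        # Validation and setup would happen here in the real pipeline.
        self._composed = True
        return self  # returned for chaining, as UrbanPipeline does

    def transform(self):
        if not self._composed:
            raise ValueError("Pipeline not composed. Call compose() first.")
        return [f"ran {name}" for name, _ in self.steps]

    def compose_transform(self):
        # compose() and transform() in a single call
        return self.compose().transform()

pipe = TinyPipeline([("loader", object()), ("enricher", object())])
try:
    pipe.transform()
except ValueError as err:
    print(err)  # transform() before compose() raises
print(pipe.compose_transform())
```

The same split exists in the real API: call `compose()` once and `transform()` as needed, or use `compose_transform()` when you want both in one step.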

Attributes:

Name Type Description
steps List[Tuple[str, Union[UrbanLayerBase, LoaderBase, GeoImputerBase, GeoFilterBase, EnricherBase, VisualiserBase, Any]]]

List of (name, component) tuples defining pipeline steps.

validator PipelineValidator

Validates step compatibility.

executor PipelineExecutor

Executes the pipeline steps.

Examples:

>>> import urban_mapper as um
>>> from urban_mapper.pipeline import UrbanPipeline
>>> mapper = um.UrbanMapper()
>>> steps = [
...     ("loader", mapper.loader.from_file("taxi_data.csv").with_columns("lng", "lat").build()),
...     ("streets", mapper.urban_layer.with_type("streets_roads").from_place("London, UK").build()),
...     ("count_pickups", mapper.enricher.with_data(group_by="nearest_streets").count_by(output_column="pickup_count").build()),
...     ("visualiser", mapper.visualiser.with_type("InteractiveVisualiser").build())
... ]
>>> pipeline = UrbanPipeline(steps)
>>> data, layer = pipeline.compose_transform()
>>> pipeline.visualise(["pickup_count"])
Source code in src/urban_mapper/pipeline/pipeline.py
@beartype
class UrbanPipeline:
    """`Scikit-Learn` Inspired `Pipeline` for `Urban Mapper`.

    Constructs and manages pipelines integrating various urban mapper components into a cohesive workflow,
    handling execution order and data flow. Beyond that, you can also `save`, `share`, `export`, and
    `load` pipelines, which is great for reproducibility.

    Have a look at what a pipeline can look like:

    <div class="mermaid">
    %%{init: {
    'theme': 'base',
    'themeVariables': {
    'primaryColor': '#57068c',
    'primaryTextColor': '#fff',
    'primaryBorderColor': '#F49BAB',
    'lineColor': '#F49BAB',
    'secondaryColor': '#9B7EBD',
    'tertiaryColor': '#E5D9F2'
      }
    }}%%
    graph LR
        subgraph "Data Ingestion"
            A["Loaders (1..*)"]
            B["Urban Layer (1)"]
            A -->|Raw data| B
        end
        subgraph "Data Preprocessing"
            direction TB
            C["Imputers (0..*)"]
            D["Filters (0..*)"]
            C -->|Imputed data| D
        end
        subgraph "Data Processing"
            E["Enrichers (1..*)"]
        end
        subgraph "Data Output"
            F["Visualiser (0, 1)"]
        end

        B -->|Spatial data| C
        D -->|Filtered data| E
        E -->|Enriched data| F
    </div>

    <p style="text-align: center; font-style: italic;">
      Notation: (1) = exactly one instance, (0..*) = zero or more instances, (1..*) = one or more instances, (0, 1) = zero or one instance
    </p>


    !!! note
        `Pipelines` must be `composed` before `transforming` or `visualising` data.
        Use `compose()` or `compose_transform()`.

    Attributes:
        steps (List[Tuple[str, Union[UrbanLayerBase, LoaderBase, GeoImputerBase, GeoFilterBase, EnricherBase, VisualiserBase, Any]]]):
            List of (name, component) tuples defining pipeline steps.
        validator (PipelineValidator): Validates step compatibility.
        executor (PipelineExecutor): Executes the pipeline steps.

    Examples:
        >>> import urban_mapper as um
        >>> from urban_mapper.pipeline import UrbanPipeline
        >>> mapper = um.UrbanMapper()
        >>> steps = [
        ...     ("loader", mapper.loader.from_file("taxi_data.csv").with_columns("lng", "lat").build()),
        ...     ("streets", mapper.urban_layer.with_type("streets_roads").from_place("London, UK").build()),
        ...     ("count_pickups", mapper.enricher.with_data(group_by="nearest_streets").count_by(output_column="pickup_count").build()),
        ...     ("visualiser", mapper.visualiser.with_type("InteractiveVisualiser").build())
        ... ]
        >>> pipeline = UrbanPipeline(steps)
        >>> data, layer = pipeline.compose_transform()
        >>> pipeline.visualise(["pickup_count"])

    """

    def __init__(
        self,
        steps: Union[
            None,
            List[
                Tuple[
                    str,
                    Union[
                        UrbanLayerBase,
                        LoaderBase,
                        GeoImputerBase,
                        GeoFilterBase,
                        EnricherBase,
                        VisualiserBase,
                        Any,
                    ],
                ]
            ],
        ] = None,
    ) -> None:
        self.steps = steps
        if steps:
            self.validator = PipelineValidator(steps)
            self.executor = PipelineExecutor(steps)

    @require_attributes_not_none("steps")
    @property
    def named_steps(self) -> Bunch:
        """Access steps by name using attribute syntax.

        !!! note "Mimicking the following from Scikit-learn"
            This property allows accessing pipeline steps using attribute-style access.
            For example, `pipeline.named_steps.loader` returns the loader step.

            See more in [named_steps of Sklearn](https://scikit-learn.org/stable/modules/generated/sklearn.pipeline.Pipeline.html#sklearn.pipeline.Pipeline.named_steps)

        Returns:
            Bunch: Object with step names as attributes.

        Raises:
            ValueError: If no steps are defined.

        Examples:
            >>> pipeline.named_steps.loader
        """
        return Bunch(**dict(self.steps))

    @require_attributes_not_none("steps")
    def get_step_names(self) -> List[str]:
        """List all step names in the pipeline.

        Returns:
            List[str]: Names of all steps.

        Raises:
            ValueError: If no steps are defined.

        Examples:
            >>> names = pipeline.get_step_names()
        """
        return [name for name, _ in self.steps]

    @require_attributes_not_none("steps")
    def get_step(self, name: str) -> Any:
        """Retrieve a step by its name.

        Args:
            name: Name of the step to retrieve.

        Returns:
            Any: The step’s component instance.

        Raises:
            KeyError: If step name doesn’t exist.
            ValueError: If no steps are defined.

        Examples:
            >>> loader = pipeline.get_step("loader")
        """
        for step_name, step_instance in self.steps:
            if step_name == name:
                return step_instance
        raise KeyError(f"Step '{name}' not found in pipeline.")

    @require_attributes_not_none("steps")
    def compose(self) -> "UrbanPipeline":
        """Prepare pipeline for execution without transforming.

        Validates and sets up the pipeline for subsequent transformation.

        Returns:
            UrbanPipeline: Self for chaining.

        Raises:
            ValueError: If no steps or steps are invalid.

        Examples:
            >>> pipeline.compose()
        """
        self.executor.compose()
        return self

    @require_attributes_not_none("steps")
    def transform(
        self,
    ) -> Tuple[
        Union[
            Dict[str, gpd.GeoDataFrame],
            gpd.GeoDataFrame,
        ],
        UrbanLayerBase,
    ]:
        """Execute pipeline transformation.

        Returns processed data and enriched urban layer after composition.

        Returns:
            Tuple[Union[Dict[str, gpd.GeoDataFrame], gpd.GeoDataFrame], UrbanLayerBase]: Processed data and urban layer.

        Raises:
            ValueError: If no steps or not composed.

        Examples:
            >>> data, layer = pipeline.transform()
        """
        return self.executor.transform()

    @require_attributes_not_none("steps")
    def compose_transform(
        self,
    ) -> Tuple[
        Union[
            Dict[str, gpd.GeoDataFrame],
            gpd.GeoDataFrame,
        ],
        UrbanLayerBase,
    ]:
        """Compose and transform in one step.

        Combines composition and transformation into a single operation.

        Returns:
            Tuple[Union[Dict[str, gpd.GeoDataFrame], gpd.GeoDataFrame], UrbanLayerBase]: Processed data and urban layer.

        Raises:
            ValueError: If no steps or steps are invalid.

        Examples:
            >>> data, layer = pipeline.compose_transform()
        """
        return self.executor.compose_transform()

    @require_attributes_not_none("steps")
    def visualise(self, result_columns: Union[str, List[str]], **kwargs: Any) -> Any:
        """Visualise pipeline results.

        Displays results using the pipeline’s visualiser.

        Args:
            result_columns: Column(s) to visualise. If more than one is given, a widget is displayed to select which one to visualise.
            **kwargs: Additional arguments for the visualiser.

        Returns:
            Any: Visualisation output, type depends on visualiser.

        Raises:
            ValueError: If no steps, not composed, or no visualiser.

        Examples:
            >>> pipeline.visualise(result_columns="count")
        """
        return self.executor.visualise(result_columns, **kwargs)

    @require_attributes_not_none("steps")
    def save(self, filepath: str) -> None:
        """Save pipeline to a file.

        Serialises the pipeline and its state using dill.

        Explore more about [Dill, here](https://github.com/uqfoundation/dill).

        !!! note "What if I have custom lambda functions in my own script/cell? How is that saved?"
            If you have custom lambda functions, no worries: Dill handles them neatly.
            Note that this can increase the size of the serialised object.

        Args:
            filepath: Path to save file, must end with '.dill'.

        Raises:
            ValueError: If filepath lacks '.dill' or no steps.
            IOError: If file cannot be written.

        Examples:
            >>> pipeline.save("my_pipeline.dill")
        """
        path = Path(filepath)
        if path.suffix != ".dill":
            raise ValueError("Filepath must have '.dill' extension.")
        with open(filepath, "wb") as f:
            dill.dump(self, f)

    @staticmethod
    def load(filepath: str) -> "UrbanPipeline":
        """Load pipeline from a file.

        Deserialises a previously saved pipeline, e.g. one shared via another paper, a friend, or a teammate.

        Args:
            filepath: Path to the saved pipeline file.

        Returns:
            UrbanPipeline: Loaded pipeline instance.

        Raises:
            FileNotFoundError: If file doesn’t exist.
            IOError: If file cannot be read.

        Examples:
            >>> pipeline = um.UrbanPipeline.load("my_pipeline.dill")
        """
        with open(filepath, "rb") as f:
            pipeline = dill.load(f)
        if not pipeline.executor._composed:
            print(
                "WARNING: ",
                "Loaded pipeline has not been composed. Make sure to call compose() "
                "before using methods that require composition.",
            )
        return pipeline

    def __getitem__(self, key: str) -> Any:
        """Access step by name using dictionary syntax.

        Args:
            key: Name of the step.

        Returns:
            Any: Step’s component instance.

        Raises:
            KeyError: If step name doesn’t exist.

        Examples:
            >>> loader = pipeline["loader"]
        """
        return self.get_step(key)

    @require_attributes_not_none("steps")
    def _preview(self, format: str = "ascii") -> Union[dict, str]:
        """Generate a pipeline preview.

        Creates a representation of the pipeline and its steps by calling
        each step's `.preview()` method in cascade.

        Args:
            format: Output format ("ascii" or "json").

        Returns:
            Union[dict, str]: Preview as dictionary or string.
        """
        if format == "json":
            preview_data = {
                "pipeline": {
                    "steps": [
                        {
                            "name": name,
                            "preview": step.preview(format="json")
                            if hasattr(step, "preview")
                            else "No preview available",
                        }
                        for name, step in self.steps
                    ]
                }
            }
            return preview_data
        else:
            preview_lines = ["Urban Pipeline Preview:"]
            for i, (name, step) in enumerate(self.steps, 1):
                if hasattr(step, "preview"):
                    step_preview = step.preview(format="ascii").replace("\n", "\n    ")
                    preview_lines.append(f"Step {i}: {name}\n    {step_preview}")
                else:
                    preview_lines.append(f"Step {i}: {name}\n    No preview available")
            return "\n".join(preview_lines)

    @require_attributes_not_none("steps")
    def preview(self, format: str = "ascii") -> None:
        """Display pipeline preview.

        Prints a summary of the pipeline and its steps by calling
        each step's `.preview()` method in cascade.

        Args:
            format: Output format ("ascii" or "json").

        Raises:
            ValueError: If format is unsupported or no steps.

        Examples:
            >>> pipeline.preview()
        """
        if not self.steps:
            print("No Steps available to preview.")
            return
        preview_data = self._preview(format=format)
        if format == "ascii":
            print(preview_data)
        elif format == "json":
            print(json.dumps(preview_data, indent=2, default=str))
        else:
            raise ValueError(f"Unsupported format '{format}'.")

    @require_attributes_not_none("steps")
    def to_jgis(
        self,
        filepath: str,
        base_maps=None,
        include_urban_layer: bool = True,
        urban_layer_name: str = "Enriched Layer",
        urban_layer_type: Optional[str] = None,
        urban_layer_opacity: float = 1.0,
        additional_layers=None,
        zoom: int = 20,
        raise_on_existing: bool = True,
        **kwargs,
    ) -> None:
        """Export pipeline results to JupyterGIS document.

        !!! question "What is JupyterGIS?"

            JupyterGIS is a library that provides interactive & collaborative mapping capabilities in real time,
            all throughout your Jupyter notebooks' workflow.

            See [their documentation for further details](https://jupytergis.readthedocs.io/en/latest/).

            Creates an interactive map visualisation saved as a `.jgis` file.

        Args:
            filepath: Path to save the .jgis file.
            base_maps: List of base map configurations (default: None).
            include_urban_layer: Include urban layer in output (default: True).
            urban_layer_name: Name for urban layer (default: "Enriched Layer").
            urban_layer_type: Visualisation type (default: None, auto-detected).
            urban_layer_opacity: Layer opacity (default: 1.0).
            additional_layers: Extra layers to include (default: None).
            zoom: Initial map zoom level (default: 20).
            raise_on_existing: Raise error if file exists (default: True).
            **kwargs: Additional visualisation arguments.

        Raises:
            ValueError: If no steps or not composed.
            ImportError: If JupyterGIS isn’t installed.
            FileExistsError: If file exists and raise_on_existing is True.

        Examples:
            >>> pipeline.to_jgis("map.jgis")
        """
        if additional_layers is None:
            additional_layers = []
        if base_maps is None:
            base_maps = [
                {
                    "url": "http://basemaps.cartocdn.com/dark_all/{z}/{x}/{y}.png",
                    "attribution": "© OpenStreetMap contributors",
                    "name": "Base Map",
                    "opacity": 0.9,
                }
            ]
        if GISDocument is None:
            raise ImportError(
                "jupytergis is required for this functionality. "
                "Install it with `uv add jupytergis`."
            )
        if not self.executor._composed:
            raise ValueError("Pipeline not composed. Call compose() first.")

        if filepath and os.path.exists(filepath):
            if raise_on_existing:
                raise FileExistsError(
                    f"File already exists: {filepath}. "
                    f"Set raise_on_existing=False for less strictness or delete the file prior to running `to_jgis()`."
                )
            else:
                path = Path(filepath)
                stem = path.stem
                suffix = path.suffix
                random_str = uuid.uuid4().hex[:8]
                new_stem = f"{stem}_{random_str}"
                new_filepath = path.with_name(f"{new_stem}{suffix}")
                original_filepath = filepath
                filepath = str(new_filepath)
                logger.log(
                    "DEBUG_LOW",
                    f"File exists: {original_filepath}. Using new filename: {filepath}",
                )

        enriched_layer = self.executor.urban_layer.layer
        projection = self.executor.urban_layer.coordinate_reference_system
        bbox = enriched_layer.total_bounds
        extent = [bbox[0], bbox[1], bbox[2], bbox[3]]

        doc = GISDocument(
            path=None,
            projection=projection,
            extent=extent,
            zoom=zoom,
        )

        for bm in base_maps:
            doc.add_raster_layer(
                url=bm["url"],
                name=bm["name"],
                attribution=bm.get("attribution", ""),
                opacity=bm.get("opacity", 1.0),
            )

        if include_urban_layer:
            if urban_layer_type is None:
                geometry_type = enriched_layer.geometry.geom_type.iloc[0]
                if geometry_type in ["Point", "MultiPoint"]:
                    urban_layer_type = "circle"
                elif geometry_type in ["LineString", "MultiLineString"]:
                    urban_layer_type = "line"
                elif geometry_type in ["Polygon", "MultiPolygon"]:
                    urban_layer_type = "fill"
                else:
                    raise ValueError(f"Unsupported geometry type: {geometry_type}")

            enriched_layer = enriched_layer.replace({pd.NaT: None})
            for col in enriched_layer.columns:
                if enriched_layer[col].dtype == "object":
                    enriched_layer[col] = enriched_layer[col].apply(
                        self.serialize_value
                    )

            geojson_data = json.loads(enriched_layer.to_json())
            doc.add_geojson_layer(
                data=geojson_data,
                name=urban_layer_name,
                type=urban_layer_type,
                opacity=urban_layer_opacity,
                **kwargs,
            )

        for layer in additional_layers:
            data = layer["data"]
            if isinstance(data, gpd.GeoDataFrame):
                data = json.loads(data.to_json())
            elif not isinstance(data, dict):
                raise ValueError(
                    "Additional layer 'data' must be a GeoDataFrame or GeoJSON dict."
                )
            layer_type = layer.get("type")
            if layer_type is None:
                features = data["features"]
                if not features:
                    raise ValueError("Empty GeoJSON data in additional layer.")
                geometry_type = features[0]["geometry"]["type"]
                if geometry_type in ["Point", "MultiPoint"]:
                    layer_type = "circle"
                elif geometry_type in ["LineString", "MultiLineString"]:
                    layer_type = "line"
                elif geometry_type in ["Polygon", "MultiPolygon"]:
                    layer_type = "fill"
                else:
                    raise ValueError(f"Unsupported geometry type: {geometry_type}")
            doc.add_geojson_layer(
                data=data,
                name=layer["name"],
                type=layer_type,
                opacity=layer.get("opacity", 1.0),
                **layer.get("kwargs", {}),
            )

        doc.save_as(filepath)

    @staticmethod
    def serialize_value(value):
        if isinstance(value, datetime.datetime) or isinstance(value, pd.Timestamp):
            return value.isoformat()
        return value
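The `serialize_value` helper above exists because `json.dumps` cannot serialise datetime objects, so they are converted to ISO-8601 strings before the GeoJSON export in `to_jgis`. A stdlib-only sketch of that behaviour (the real method also covers `pd.Timestamp`):

```python
# Sketch of the timestamp handling used during GeoJSON export:
# datetimes become ISO-8601 strings so json.dumps can serialise them.
import datetime
import json

def serialize_value(value):
    if isinstance(value, datetime.datetime):
        return value.isoformat()
    return value

row = {"street": "Broadway", "pickup": datetime.datetime(2024, 5, 1, 8, 30)}
clean = {k: serialize_value(v) for k, v in row.items()}
print(json.dumps(clean))  # {"street": "Broadway", "pickup": "2024-05-01T08:30:00"}
```

The example row and column names are illustrative only.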

named_steps property

Access steps by name using attribute syntax.

Mimicking the following from Scikit-learn

This property allows accessing pipeline steps using attribute-style access. For example, pipeline.named_steps.loader returns the loader step.

See more in named_steps of Sklearn

Returns:

Name Type Description
Bunch Bunch

Object with step names as attributes.

Raises:

Type Description
ValueError

If no steps are defined.

Examples:

>>> pipeline.named_steps.loader
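The source builds this property as `Bunch(**dict(self.steps))`, so `pipeline.named_steps.loader`, `pipeline.get_step("loader")`, and `pipeline["loader"]` all resolve to the same component. A minimal Bunch-like stand-in (the real class presumably uses scikit-learn's `Bunch`) shows how attribute access is layered over the `(name, component)` step list:

```python
# Minimal Bunch-like stand-in: a dict whose keys are also readable
# as attributes, mirroring how named_steps wraps the step list.

class Bunch(dict):
    """Dict whose keys double as attributes."""
    def __getattr__(self, key):
        try:
            return self[key]
        except KeyError:
            raise AttributeError(key)

# Hypothetical step list; real components would be loader/layer objects.
steps = [("loader", "csv-loader"), ("streets", "osm-layer")]
named_steps = Bunch(**dict(steps))

print(named_steps.loader)      # csv-loader
print(named_steps["streets"])  # osm-layer
```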

get_step_names()

List all step names in the pipeline.

Returns:

Type Description
List[str]

List[str]: Names of all steps.

Raises:

Type Description
ValueError

If no steps are defined.

Examples:

>>> names = pipeline.get_step_names()
Source code in src/urban_mapper/pipeline/pipeline.py
@require_attributes_not_none("steps")
def get_step_names(self) -> List[str]:
    """List all step names in the pipeline.

    Returns:
        List[str]: Names of all steps.

    Raises:
        ValueError: If no steps are defined.

    Examples:
        >>> names = pipeline.get_step_names()
    """
    return [name for name, _ in self.steps]

get_step(name)

Retrieve a step by its name.

Parameters:

Name Type Description Default
name str

Name of the step to retrieve.

required

Returns:

Name Type Description
Any Any

The step’s component instance.

Raises:

Type Description
KeyError

If step name doesn’t exist.

ValueError

If no steps are defined.

Examples:

>>> loader = pipeline.get_step("loader")
Source code in src/urban_mapper/pipeline/pipeline.py
@require_attributes_not_none("steps")
def get_step(self, name: str) -> Any:
    """Retrieve a step by its name.

    Args:
        name: Name of the step to retrieve.

    Returns:
        Any: The step’s component instance.

    Raises:
        KeyError: If step name doesn’t exist.
        ValueError: If no steps are defined.

    Examples:
        >>> loader = pipeline.get_step("loader")
    """
    for step_name, step_instance in self.steps:
        if step_name == name:
            return step_instance
    raise KeyError(f"Step '{name}' not found in pipeline.")

compose()

Prepare pipeline for execution without transforming.

Validates and sets up the pipeline for subsequent transformation.

Returns:

Name Type Description
UrbanPipeline UrbanPipeline

Self for chaining.

Raises:

Type Description
ValueError

If no steps or steps are invalid.

Examples:

>>> pipeline.compose()
Source code in src/urban_mapper/pipeline/pipeline.py
@require_attributes_not_none("steps")
def compose(self) -> "UrbanPipeline":
    """Prepare pipeline for execution without transforming.

    Validates and sets up the pipeline for subsequent transformation.

    Returns:
        UrbanPipeline: Self for chaining.

    Raises:
        ValueError: If no steps or steps are invalid.

    Examples:
        >>> pipeline.compose()
    """
    self.executor.compose()
    return self

transform()

Execute pipeline transformation.

Returns processed data and enriched urban layer after composition.

Returns:

Type Description
Tuple[Union[Dict[str, GeoDataFrame], GeoDataFrame], UrbanLayerBase]

Tuple[Union[Dict[str, gpd.GeoDataFrame], gpd.GeoDataFrame], UrbanLayerBase]: Processed data and urban layer.

Raises:

Type Description
ValueError

If no steps or not composed.

Examples:

>>> data, layer = pipeline.transform()
Source code in src/urban_mapper/pipeline/pipeline.py
@require_attributes_not_none("steps")
def transform(
    self,
) -> Tuple[
    Union[
        Dict[str, gpd.GeoDataFrame],
        gpd.GeoDataFrame,
    ],
    UrbanLayerBase,
]:
    """Execute pipeline transformation.

    Returns processed data and enriched urban layer after composition.

    Returns:
        Tuple[Union[Dict[str, gpd.GeoDataFrame], gpd.GeoDataFrame], UrbanLayerBase]: Processed data and urban layer.

    Raises:
        ValueError: If no steps or not composed.

    Examples:
        >>> data, layer = pipeline.transform()
    """
    return self.executor.transform()

compose_transform()

Compose and transform in one step.

Combines composition and transformation into a single operation.

Returns:

Type Description
Tuple[Union[Dict[str, GeoDataFrame], GeoDataFrame], UrbanLayerBase]

Tuple[Union[Dict[str, gpd.GeoDataFrame], gpd.GeoDataFrame], UrbanLayerBase]: Processed data and urban layer.

Raises:

Type Description
ValueError

If no steps or steps are invalid.

Examples:

>>> data, layer = pipeline.compose_transform()
Source code in src/urban_mapper/pipeline/pipeline.py
@require_attributes_not_none("steps")
def compose_transform(
    self,
) -> Tuple[
    Union[
        Dict[str, gpd.GeoDataFrame],
        gpd.GeoDataFrame,
    ],
    UrbanLayerBase,
]:
    """Compose and transform in one step.

    Combines composition and transformation into a single operation.

    Returns:
        Tuple[Union[Dict[str, gpd.GeoDataFrame], gpd.GeoDataFrame], UrbanLayerBase]: Processed data and urban layer.

    Raises:
        ValueError: If no steps or steps are invalid.

    Examples:
        >>> data, layer = pipeline.compose_transform()
    """
    return self.executor.compose_transform()

visualise(result_columns, **kwargs)

Visualise pipeline results.

Displays results using the pipeline’s visualiser.

Parameters:

Name Type Description Default
result_columns Union[str, List[str]]

Column(s) to visualise. If more than one is given, a widget is displayed to select which one to visualise.

required
**kwargs Any

Additional arguments for the visualiser.

{}

Returns:

Name Type Description
Any Any

Visualisation output, type depends on visualiser.

Raises:

Type Description
ValueError

If no steps, not composed, or no visualiser.

Examples:

>>> pipeline.visualise(result_columns="count")
Source code in src/urban_mapper/pipeline/pipeline.py
@require_attributes_not_none("steps")
def visualise(self, result_columns: Union[str, List[str]], **kwargs: Any) -> Any:
    """Visualise pipeline results.

    Displays results using the pipeline’s visualiser.

    Args:
        result_columns: Column(s) to visualise. If more than one is given, a widget is displayed to select which one to visualise.
        **kwargs: Additional arguments for the visualiser.

    Returns:
        Any: Visualisation output, type depends on visualiser.

    Raises:
        ValueError: If no steps, not composed, or no visualiser.

    Examples:
        >>> pipeline.visualise(result_columns="count")
    """
    return self.executor.visualise(result_columns, **kwargs)

save(filepath)

Save pipeline to a file.

Serialises the pipeline and its state using dill.

Explore more about Dill, here.

What if I have custom lambda functions in my own script/cell? How is that saved?

If you have custom lambda functions, no worries: Dill handles them neatly. Note that this can increase the size of the serialised object.

Parameters:

Name Type Description Default
filepath str

Path to save file, must end with '.dill'.

required

Raises:

Type Description
ValueError

If filepath lacks '.dill' or no steps.

IOError

If file cannot be written.

Examples:

>>> pipeline.save("my_pipeline.dill")
Source code in src/urban_mapper/pipeline/pipeline.py
@require_attributes_not_none("steps")
def save(self, filepath: str) -> None:
    """Save pipeline to a file.

    Serialises the pipeline and its state using dill.

    Explore more about [Dill, here](https://github.com/uqfoundation/dill).

    !!! note "What if I have custom lambda functions in my own script/cell? How is that saved?"
        If you have custom lambda functions, no worries: Dill handles them neatly.
        Note that serialising them may increase the size of the saved object.

    Args:
        filepath: Path to save file, must end with '.dill'.

    Raises:
        ValueError: If filepath lacks '.dill' or no steps.
        IOError: If file cannot be written.

    Examples:
        >>> pipeline.save("my_pipeline.dill")
    """
    path = Path(filepath)
    if path.suffix != ".dill":
        raise ValueError("Filepath must have '.dill' extension.")
    with open(filepath, "wb") as f:
        dill.dump(self, f)
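To see why `dill` is used here rather than the standard-library `pickle`, note that `pickle` serialises functions by qualified name and therefore cannot handle lambdas, which is exactly the case `dill` covers. A minimal stdlib-only illustration (the `double` lambda is ours, purely for demonstration):

```python
import pickle

# Standard-library pickle serialises functions by qualified name; a lambda's
# name is "<lambda>", which cannot be looked up again, so pickling fails.
double = lambda x: x * 2

try:
    pickle.dumps(double)
    pickled_ok = True
except Exception:  # pickle.PicklingError on CPython
    pickled_ok = False
```

`dill` sidesteps this by serialising the function's bytecode instead of its name, which is why `save()` can persist pipelines containing user-defined lambdas.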

load(filepath) staticmethod

Load pipeline from a file.

Deserialises a previously saved pipeline, whether it comes from another paper, a friend, or a teammate.

Parameters:

Name Type Description Default
filepath str

Path to the saved pipeline file.

required

Returns:

Name Type Description
UrbanPipeline UrbanPipeline

Loaded pipeline instance.

Raises:

Type Description
FileNotFoundError

If file doesn’t exist.

IOError

If file cannot be read.

Examples:

>>> pipeline = um.UrbanPipeline.load("my_pipeline.dill")
Source code in src/urban_mapper/pipeline/pipeline.py
@staticmethod
def load(filepath: str) -> "UrbanPipeline":
    """Load pipeline from a file.

    Deserialises a previously saved pipeline, whether it comes from another paper, a friend, or a teammate.

    Args:
        filepath: Path to the saved pipeline file.

    Returns:
        UrbanPipeline: Loaded pipeline instance.

    Raises:
        FileNotFoundError: If file doesn’t exist.
        IOError: If file cannot be read.

    Examples:
        >>> pipeline = um.UrbanPipeline.load("my_pipeline.dill")
    """
    with open(filepath, "rb") as f:
        pipeline = dill.load(f)
    if not pipeline.executor._composed:
        print(
            "WARNING: ",
            "Loaded pipeline has not been composed. Make sure to call compose() "
            "before using methods that require composition.",
        )
    return pipeline

__getitem__(key)

Access step by name using dictionary syntax.

Parameters:

Name Type Description Default
key str

Name of the step.

required

Returns:

Name Type Description
Any Any

Step’s component instance.

Raises:

Type Description
KeyError

If step name doesn’t exist.

Examples:

>>> loader = pipeline["loader"]
Source code in src/urban_mapper/pipeline/pipeline.py
def __getitem__(self, key: str) -> Any:
    """Access step by name using dictionary syntax.

    Args:
        key: Name of the step.

    Returns:
        Any: Step’s component instance.

    Raises:
        KeyError: If step name doesn’t exist.

    Examples:
        >>> loader = pipeline["loader"]
    """
    return self.get_step(key)
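The dictionary-style access above is a thin wrapper around `get_step`. A minimal sketch of the delegation pattern (the `MiniPipeline` class and its step names are illustrative stand-ins, not part of UrbanMapper's API):

```python
from typing import Any, List, Tuple

class MiniPipeline:
    """Toy stand-in showing the __getitem__ -> get_step delegation."""

    def __init__(self, steps: List[Tuple[str, Any]]) -> None:
        self.steps = steps

    def get_step(self, key: str) -> Any:
        # Steps are (name, component) tuples; look the component up by name.
        for name, component in self.steps:
            if name == key:
                return component
        raise KeyError(f"No step named '{key}'.")

    def __getitem__(self, key: str) -> Any:
        return self.get_step(key)

toy = MiniPipeline([("loader", "csv-loader"), ("streets", "osm-layer")])
loader = toy["loader"]  # same as toy.get_step("loader")
```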

preview(format='ascii')

Display pipeline preview.

Prints a summary of the pipeline and its steps, calling each step's .preview() method in cascade.

Parameters:

Name Type Description Default
format str

Output format ("ascii" or "json").

'ascii'

Raises:

Type Description
ValueError

If format is unsupported or no steps.

Examples:

>>> pipeline.preview()
Source code in src/urban_mapper/pipeline/pipeline.py
@require_attributes_not_none("steps")
def preview(self, format: str = "ascii") -> None:
    """Display pipeline preview.

    Prints a summary of the pipeline and its steps, calling each step's
    `.preview()` method in cascade.

    Args:
        format: Output format ("ascii" or "json").

    Raises:
        ValueError: If format is unsupported or no steps.

    Examples:
        >>> pipeline.preview()
    """
    if not self.steps:
        print("No Steps available to preview.")
        return
    preview_data = self._preview(format=format)
    if format == "ascii":
        print(preview_data)
    elif format == "json":
        print(json.dumps(preview_data, indent=2, default=str))
    else:
        raise ValueError(f"Unsupported format '{format}'.")
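The format dispatch above follows a simple pattern: build the preview once, then branch on the requested output format, raising on anything unsupported. A standalone sketch of that branching (the `summary` dict is illustrative):

```python
import json

def render_preview(preview_data, format: str = "ascii") -> str:
    # Mirrors the ascii/json branching in preview(): unknown formats raise.
    if format == "ascii":
        return str(preview_data)
    elif format == "json":
        return json.dumps(preview_data, indent=2, default=str)
    raise ValueError(f"Unsupported format '{format}'.")

summary = {"steps": ["loader", "streets", "enricher"]}
ascii_out = render_preview(summary)         # plain str() rendering
json_out = render_preview(summary, "json")  # pretty-printed JSON
```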

to_jgis(filepath, base_maps=None, include_urban_layer=True, urban_layer_name='Enriched Layer', urban_layer_type=None, urban_layer_opacity=1.0, additional_layers=None, zoom=20, raise_on_existing=True, **kwargs)

Export pipeline results to JupyterGIS document.

What is JupyterGIS?

JupyterGIS is a library that provides interactive, collaborative mapping capabilities in real time throughout your Jupyter notebook workflow.

See their documentation for further details.

Creates an interactive map visualisation saved as a .jgis file.

Parameters:

Name Type Description Default
filepath str

Path to save the .jgis file.

required
base_maps

List of base map configurations (default: None).

None
include_urban_layer bool

Include urban layer in output (default: True).

True
urban_layer_name str

Name for urban layer (default: "Enriched Layer").

'Enriched Layer'
urban_layer_type Optional[str]

Visualisation type (default: None, auto-detected).

None
urban_layer_opacity float

Layer opacity (default: 1.0).

1.0
additional_layers

Extra layers to include (default: None).

None
zoom int

Initial map zoom level (default: 20).

20
raise_on_existing bool

Raise error if file exists (default: True).

True
**kwargs

Additional visualisation arguments.

{}

Raises:

Type Description
ValueError

If no steps or not composed.

ImportError

If JupyterGIS isn’t installed.

FileExistsError

If file exists and raise_on_existing is True.

Examples:

>>> pipeline.to_jgis("map.jgis")
Source code in src/urban_mapper/pipeline/pipeline.py
@require_attributes_not_none("steps")
def to_jgis(
    self,
    filepath: str,
    base_maps=None,
    include_urban_layer: bool = True,
    urban_layer_name: str = "Enriched Layer",
    urban_layer_type: Optional[str] = None,
    urban_layer_opacity: float = 1.0,
    additional_layers=None,
    zoom: int = 20,
    raise_on_existing: bool = True,
    **kwargs,
) -> None:
    """Export pipeline results to JupyterGIS document.

    !!! question "What is JupyterGIS?"

        JupyterGIS is a library that provides interactive, collaborative mapping
        capabilities in real time throughout your Jupyter notebook workflow.

        See [their documentation for further details](https://jupytergis.readthedocs.io/en/latest/).

        Creates an interactive map visualisation saved as a `.jgis` file.

    Args:
        filepath: Path to save the .jgis file.
        base_maps: List of base map configurations (default: None).
        include_urban_layer: Include urban layer in output (default: True).
        urban_layer_name: Name for urban layer (default: "Enriched Layer").
        urban_layer_type: Visualisation type (default: None, auto-detected).
        urban_layer_opacity: Layer opacity (default: 1.0).
        additional_layers: Extra layers to include (default: None).
        zoom: Initial map zoom level (default: 20).
        raise_on_existing: Raise error if file exists (default: True).
        **kwargs: Additional visualisation arguments.

    Raises:
        ValueError: If no steps or not composed.
        ImportError: If JupyterGIS isn’t installed.
        FileExistsError: If file exists and raise_on_existing is True.

    Examples:
        >>> pipeline.to_jgis("map.jgis")
    """
    if additional_layers is None:
        additional_layers = []
    if base_maps is None:
        base_maps = [
            {
                "url": "http://basemaps.cartocdn.com/dark_all/{z}/{x}/{y}.png",
                "attribution": "© OpenStreetMap contributors",
                "name": "Base Map",
                "opacity": 0.9,
            }
        ]
    if GISDocument is None:
        raise ImportError(
            "jupytergis is required for this functionality. "
            "Install it with `uv add jupytergis`."
        )
    if not self.executor._composed:
        raise ValueError("Pipeline not composed. Call compose() first.")

    if filepath and os.path.exists(filepath):
        if raise_on_existing:
            raise FileExistsError(
                f"File already exists: {filepath}. "
                f"Set raise_on_existing=False for less strictness or delete the file prior to running `to_jgis()`."
            )
        else:
            path = Path(filepath)
            stem = path.stem
            suffix = path.suffix
            random_str = uuid.uuid4().hex[:8]
            new_stem = f"{stem}_{random_str}"
            new_filepath = path.with_name(f"{new_stem}{suffix}")
            original_filepath = filepath
            filepath = str(new_filepath)
            logger.log(
                "DEBUG_LOW",
                f"File exists: {original_filepath}. Using new filename: {filepath}",
            )

    enriched_layer = self.executor.urban_layer.layer
    projection = self.executor.urban_layer.coordinate_reference_system
    bbox = enriched_layer.total_bounds
    extent = [bbox[0], bbox[1], bbox[2], bbox[3]]

    doc = GISDocument(
        path=None,
        projection=projection,
        extent=extent,
        zoom=zoom,
    )

    for bm in base_maps:
        doc.add_raster_layer(
            url=bm["url"],
            name=bm["name"],
            attribution=bm.get("attribution", ""),
            opacity=bm.get("opacity", 1.0),
        )

    if include_urban_layer:
        if urban_layer_type is None:
            geometry_type = enriched_layer.geometry.geom_type.iloc[0]
            if geometry_type in ["Point", "MultiPoint"]:
                urban_layer_type = "circle"
            elif geometry_type in ["LineString", "MultiLineString"]:
                urban_layer_type = "line"
            elif geometry_type in ["Polygon", "MultiPolygon"]:
                urban_layer_type = "fill"
            else:
                raise ValueError(f"Unsupported geometry type: {geometry_type}")

        enriched_layer = enriched_layer.replace({pd.NaT: None})
        for col in enriched_layer.columns:
            if enriched_layer[col].dtype == "object":
                enriched_layer[col] = enriched_layer[col].apply(
                    self.serialize_value
                )

        geojson_data = json.loads(enriched_layer.to_json())
        doc.add_geojson_layer(
            data=geojson_data,
            name=urban_layer_name,
            type=urban_layer_type,
            opacity=urban_layer_opacity,
            **kwargs,
        )

    for layer in additional_layers:
        data = layer["data"]
        if isinstance(data, gpd.GeoDataFrame):
            data = json.loads(data.to_json())
        elif not isinstance(data, dict):
            raise ValueError(
                "Additional layer 'data' must be a GeoDataFrame or GeoJSON dict."
            )
        layer_type = layer.get("type")
        if layer_type is None:
            features = data["features"]
            if not features:
                raise ValueError("Empty GeoJSON data in additional layer.")
            geometry_type = features[0]["geometry"]["type"]
            if geometry_type in ["Point", "MultiPoint"]:
                layer_type = "circle"
            elif geometry_type in ["LineString", "MultiLineString"]:
                layer_type = "line"
            elif geometry_type in ["Polygon", "MultiPolygon"]:
                layer_type = "fill"
            else:
                raise ValueError(f"Unsupported geometry type: {geometry_type}")
        doc.add_geojson_layer(
            data=data,
            name=layer["name"],
            type=layer_type,
            opacity=layer.get("opacity", 1.0),
            **layer.get("kwargs", {}),
        )

    doc.save_as(filepath)
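When `urban_layer_type` is left as `None`, `to_jgis` infers the JupyterGIS layer type from the first geometry in the layer, as shown in the source above. That auto-detection reduces to a small lookup, sketched here stdlib-only (the helper name `infer_layer_type` is ours, not part of UrbanMapper's API):

```python
# Geometry type -> JupyterGIS layer type, matching the auto-detection
# branches in to_jgis().
_GEOMETRY_TO_LAYER_TYPE = {
    "Point": "circle", "MultiPoint": "circle",
    "LineString": "line", "MultiLineString": "line",
    "Polygon": "fill", "MultiPolygon": "fill",
}

def infer_layer_type(geometry_type: str) -> str:
    try:
        return _GEOMETRY_TO_LAYER_TYPE[geometry_type]
    except KeyError:
        raise ValueError(f"Unsupported geometry type: {geometry_type}") from None
```

The same mapping is applied to each entry in `additional_layers` whose `"type"` key is omitted.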

PipelineExecutor

Executor for Pipeline Steps in UrbanMapper Pipeline.

Orchestrates the execution of pipeline steps in a predefined order, managing data loading, processing, and enrichment. As a bonus, it also displays a progress bar during execution.

Attributes:

Name Type Description
steps List[Tuple[str, Union[UrbanLayerBase, LoaderBase, GeoImputerBase, GeoFilterBase, EnricherBase, VisualiserBase, Any]]]

List of (name, component) tuples representing the pipeline steps.

data Optional[GeoDataFrame]

Processed GeoDataFrame, populated after execution.

urban_layer Optional[UrbanLayerBase]

Enriched urban layer instance, set after execution.

_composed bool

Indicates if the pipeline has been composed.

Examples:

>>> import urban_mapper as um
>>> from urban_mapper.pipeline import UrbanPipeline
>>> mapper = um.UrbanMapper()
>>> steps = [
...     ("loader", mapper.loader.from_file("data.csv").with_columns("lon", "lat").build()),
...     ("streets", mapper.urban_layer.with_type("streets_roads").from_place("London, UK").build())
... ]
>>> executor = UrbanPipeline(steps)
>>> executor.compose()
>>> data, layer = executor.transform()
>>> 👆 Hint: You can `compose_transform()` all in one go!
Source code in src/urban_mapper/pipeline/executor.py
@beartype
class PipelineExecutor:
    """Executor for `Pipeline Steps` in `UrbanMapper Pipeline`.

    Orchestrates the execution of pipeline `steps` in a `predefined order`, managing `data loading`,
    `processing`, and `enrichment`. As a bonus, it also displays a progress bar during execution.

    Attributes:
        steps (List[Tuple[str, Union[UrbanLayerBase, LoaderBase, GeoImputerBase, GeoFilterBase, EnricherBase, VisualiserBase, Any]]]):
            List of (name, component) tuples representing the pipeline steps.
        data (Optional[gpd.GeoDataFrame]): Processed GeoDataFrame, populated after execution.
        urban_layer (Optional[UrbanLayerBase]): Enriched urban layer instance, set after execution.
        _composed (bool): Indicates if the pipeline has been composed.

    Examples:
        >>> import urban_mapper as um
        >>> from urban_mapper.pipeline import UrbanPipeline
        >>> mapper = um.UrbanMapper()
        >>> steps = [
        ...     ("loader", mapper.loader.from_file("data.csv").with_columns("lon", "lat").build()),
        ...     ("streets", mapper.urban_layer.with_type("streets_roads").from_place("London, UK").build())
        ... ]
        >>> executor = UrbanPipeline(steps)
        >>> executor.compose()
        >>> data, layer = executor.transform()
        >>> 👆 Hint: You can `compose_transform()` all in one go!
    """

    def __init__(
        self,
        steps: List[
            Tuple[
                str,
                Union[
                    UrbanLayerBase,
                    LoaderBase,
                    GeoImputerBase,
                    GeoFilterBase,
                    EnricherBase,
                    VisualiserBase,
                    Any,
                ],
            ]
        ],
    ) -> None:
        self.steps = steps
        self.data: Optional[Dict[str, gpd.GeoDataFrame]] = None
        self.urban_layer: Optional[UrbanLayerBase] = None
        self._composed: bool = False

    def compose(
        self,
    ) -> None:
        """Compose and Execute Pipeline Steps.

        !!! tip "Steps Execution Order"
            - [x] Load datasets
            - [x] Apply imputers
            - [x] Apply filters
            - [x] Map to urban layer
            - [x] Enrich urban layer

        Raises:
            ValueError: If pipeline is already composed or lacks required steps (loader, urban layer).

        Examples:
            >>> executor.compose()  # Executes all steps with progress updates
        """
        if self._composed:
            raise ValueError(
                "Pipeline already composed. Please re-instantiate your pipeline and its steps."
            )
        urban_layer_step = next(
            (
                (name, step)
                for name, step in self.steps
                if isinstance(step, UrbanLayerBase)
            ),
            None,
        )
        if urban_layer_step is None:
            raise ValueError("Pipeline must include exactly one UrbanLayerBase step.")
        urban_layer_name, urban_layer_instance = urban_layer_step

        num_loaders = sum(isinstance(step, LoaderBase) for _, step in self.steps)
        num_imputers = sum(isinstance(step, GeoImputerBase) for _, step in self.steps)
        num_filters = sum(isinstance(step, GeoFilterBase) for _, step in self.steps)
        num_enrichers = sum(isinstance(step, EnricherBase) for _, step in self.steps)
        total_steps = 1 + num_loaders + num_imputers + num_filters + num_enrichers

        if num_loaders == 0:
            raise ValueError("Pipeline must include exactly one LoaderBase step.")

        with alive_bar(
            total_steps,
            title="Pipeline Progress",
            force_tty=True,
            dual_line=False,
        ) as bar:
            self.data = None if num_loaders == 1 else {}

            for name, step in self.steps:
                if isinstance(step, LoaderBase):
                    bar()
                    bar.title = f"~> Loading: {name}..."

                    if num_loaders == 1:
                        self.data = step.load_data_from_file()
                    else:
                        self.data[name] = step.load_data_from_file()

            for name, step in self.steps:
                if isinstance(step, GeoImputerBase):
                    bar()
                    bar.title = f"~> Applying imputer: {name}..."
                    self.data = step.transform(self.data, urban_layer_instance)

            for name, step in self.steps:
                if isinstance(step, GeoFilterBase):
                    bar()
                    bar.title = f"~> Applying filter: {name}..."
                    self.data = step.transform(self.data, urban_layer_instance)

            bar()
            bar.title = (
                f"~> Let's spatial join the {urban_layer_name} layer with the data..."
            )
            _, mapped_data = urban_layer_instance.map_nearest_layer(self.data)
            self.data = mapped_data

            for name, step in self.steps:
                if isinstance(step, EnricherBase):
                    bar()
                    bar.title = f"~> Applying enricher: {name}..."
                    urban_layer_instance = step.enrich(self.data, urban_layer_instance)

            self.urban_layer = urban_layer_instance
            self._composed = True
            bar()
            bar.title = f"🗺️ Successfully composed pipeline with {total_steps} steps!"

    def transform(
        self,
    ) -> Tuple[
        Union[
            Dict[str, gpd.GeoDataFrame],
            gpd.GeoDataFrame,
        ],
        UrbanLayerBase,
    ]:
        """Retrieve Results of `Pipeline Execution`.

        Returns processed data and enriched urban layer post-composition.

        Returns:
            Tuple[Union[Dict[str, gpd.GeoDataFrame], gpd.GeoDataFrame], UrbanLayerBase]: Processed data and urban layer.

        Raises:
            ValueError: If pipeline hasn’t been composed.

        Examples:
            >>> data, layer = executor.transform()
        """
        if not self._composed:
            raise ValueError("Pipeline not composed. Call compose() first.")
        return self.data, self.urban_layer

    def compose_transform(
        self,
    ) -> Tuple[
        Union[
            Dict[str, gpd.GeoDataFrame],
            gpd.GeoDataFrame,
        ],
        UrbanLayerBase,
    ]:
        """Compose and Transform in One Step.

        Combines compose and transform operations.

        Returns:
            Tuple[Union[Dict[str, gpd.GeoDataFrame], gpd.GeoDataFrame], UrbanLayerBase]: Processed data and urban layer.

        Raises:
            ValueError: If pipeline is already composed or lacks required steps.

        Examples:
            >>> data, layer = executor.compose_transform()
        """
        self.compose()
        return self.transform()

    def visualise(self, result_columns: Union[str, List[str]], **kwargs: Any) -> Any:
        """Visualise Pipeline Results.

        Uses the pipeline’s visualiser to display results based on specified columns.

        !!! note "If no visualiser is defined"
            If no visualiser is defined in the pipeline, a ValueError will be raised.

            Please make sure to include a visualiser step in your pipeline.

        Args:
            result_columns: Column(s) to visualise from the urban layer.
            **kwargs: Additional arguments for the visualiser’s render method.

        Returns:
            Any: Visualisation output, type depends on visualiser.

        Raises:
            ValueError: If pipeline isn’t composed or lacks a visualiser.

        Examples:
            >>> executor.visualise(result_columns="count")
        """
        if not self._composed:
            raise ValueError("Pipeline not composed. Call compose() first.")
        visualiser = next(
            (
                instance
                for _, instance in self.steps
                if isinstance(instance, VisualiserBase)
            ),
            None,
        )
        if not visualiser:
            raise ValueError("No VisualiserBase step defined.")
        return visualiser.render(
            urban_layer_geodataframe=self.urban_layer.layer,
            columns=result_columns,
            **kwargs,
        )
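Note the data-shape contract that `compose()` establishes: with a single loader, `self.data` is one GeoDataFrame; with several, it becomes a dict keyed by step name. A stdlib sketch of that branching (plain dicts stand in for GeoDataFrames; `collect_loaded_data` is our illustrative helper):

```python
# With one loader the executor keeps a single table; with several, a dict
# keyed by step name -- downstream consumers must handle both shapes.
def collect_loaded_data(loaded):
    """loaded: list of (step_name, table) pairs produced by the loaders."""
    if len(loaded) == 1:
        return loaded[0][1]
    return {name: table for name, table in loaded}

single = collect_loaded_data([("taxi", {"rows": 10})])
multi = collect_loaded_data([("taxi", {"rows": 10}), ("bikes", {"rows": 4})])
```

This is why `transform()` is typed as returning `Union[Dict[str, gpd.GeoDataFrame], gpd.GeoDataFrame]`.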

compose()

Compose and Execute Pipeline Steps.

Steps Execution Order

  • Load datasets
  • Apply imputers
  • Apply filters
  • Map to urban layer
  • Enrich urban layer

Raises:

Type Description
ValueError

If pipeline is already composed or lacks required steps (loader, urban layer).

Examples:

>>> executor.compose()  # Executes all steps with progress updates
Source code in src/urban_mapper/pipeline/executor.py
def compose(
    self,
) -> None:
    """Compose and Execute Pipeline Steps.

    !!! tip "Steps Execution Order"
        - [x] Load datasets
        - [x] Apply imputers
        - [x] Apply filters
        - [x] Map to urban layer
        - [x] Enrich urban layer

    Raises:
        ValueError: If pipeline is already composed or lacks required steps (loader, urban layer).

    Examples:
        >>> executor.compose()  # Executes all steps with progress updates
    """
    if self._composed:
        raise ValueError(
            "Pipeline already composed. Please re-instantiate your pipeline and its steps."
        )
    urban_layer_step = next(
        (
            (name, step)
            for name, step in self.steps
            if isinstance(step, UrbanLayerBase)
        ),
        None,
    )
    if urban_layer_step is None:
        raise ValueError("Pipeline must include exactly one UrbanLayerBase step.")
    urban_layer_name, urban_layer_instance = urban_layer_step

    num_loaders = sum(isinstance(step, LoaderBase) for _, step in self.steps)
    num_imputers = sum(isinstance(step, GeoImputerBase) for _, step in self.steps)
    num_filters = sum(isinstance(step, GeoFilterBase) for _, step in self.steps)
    num_enrichers = sum(isinstance(step, EnricherBase) for _, step in self.steps)
    total_steps = 1 + num_loaders + num_imputers + num_filters + num_enrichers

    if num_loaders == 0:
        raise ValueError("Pipeline must include exactly one LoaderBase step.")

    with alive_bar(
        total_steps,
        title="Pipeline Progress",
        force_tty=True,
        dual_line=False,
    ) as bar:
        self.data = None if num_loaders == 1 else {}

        for name, step in self.steps:
            if isinstance(step, LoaderBase):
                bar()
                bar.title = f"~> Loading: {name}..."

                if num_loaders == 1:
                    self.data = step.load_data_from_file()
                else:
                    self.data[name] = step.load_data_from_file()

        for name, step in self.steps:
            if isinstance(step, GeoImputerBase):
                bar()
                bar.title = f"~> Applying imputer: {name}..."
                self.data = step.transform(self.data, urban_layer_instance)

        for name, step in self.steps:
            if isinstance(step, GeoFilterBase):
                bar()
                bar.title = f"~> Applying filter: {name}..."
                self.data = step.transform(self.data, urban_layer_instance)

        bar()
        bar.title = (
            f"~> Let's spatial join the {urban_layer_name} layer with the data..."
        )
        _, mapped_data = urban_layer_instance.map_nearest_layer(self.data)
        self.data = mapped_data

        for name, step in self.steps:
            if isinstance(step, EnricherBase):
                bar()
                bar.title = f"~> Applying enricher: {name}..."
                urban_layer_instance = step.enrich(self.data, urban_layer_instance)

        self.urban_layer = urban_layer_instance
        self._composed = True
        bar()
        bar.title = f"🗺️ Successfully composed pipeline with {total_steps} steps!"

transform()

Retrieve Results of Pipeline Execution.

Returns processed data and enriched urban layer post-composition.

Returns:

Type Description
Tuple[Union[Dict[str, GeoDataFrame], GeoDataFrame], UrbanLayerBase]

Tuple[Union[Dict[str, gpd.GeoDataFrame], gpd.GeoDataFrame], UrbanLayerBase]: Processed data and urban layer.

Raises:

Type Description
ValueError

If pipeline hasn’t been composed.

Examples:

>>> data, layer = executor.transform()
Source code in src/urban_mapper/pipeline/executor.py
def transform(
    self,
) -> Tuple[
    Union[
        Dict[str, gpd.GeoDataFrame],
        gpd.GeoDataFrame,
    ],
    UrbanLayerBase,
]:
    """Retrieve Results of `Pipeline Execution`.

    Returns processed data and enriched urban layer post-composition.

    Returns:
        Tuple[Union[Dict[str, gpd.GeoDataFrame], gpd.GeoDataFrame], UrbanLayerBase]: Processed data and urban layer.

    Raises:
        ValueError: If pipeline hasn’t been composed.

    Examples:
        >>> data, layer = executor.transform()
    """
    if not self._composed:
        raise ValueError("Pipeline not composed. Call compose() first.")
    return self.data, self.urban_layer

compose_transform()

Compose and Transform in One Step.

Combines compose and transform operations.

Returns:

Type Description
Tuple[Union[Dict[str, GeoDataFrame], GeoDataFrame], UrbanLayerBase]

Tuple[Union[Dict[str, gpd.GeoDataFrame], gpd.GeoDataFrame], UrbanLayerBase]: Processed data and urban layer.

Raises:

Type Description
ValueError

If pipeline is already composed or lacks required steps.

Examples:

>>> data, layer = executor.compose_transform()
Source code in src/urban_mapper/pipeline/executor.py
def compose_transform(
    self,
) -> Tuple[
    Union[
        Dict[str, gpd.GeoDataFrame],
        gpd.GeoDataFrame,
    ],
    UrbanLayerBase,
]:
    """Compose and Transform in One Step.

    Combines compose and transform operations.

    Returns:
        Tuple[Union[Dict[str, gpd.GeoDataFrame], gpd.GeoDataFrame], UrbanLayerBase]: Processed data and urban layer.

    Raises:
        ValueError: If pipeline is already composed or lacks required steps.

    Examples:
        >>> data, layer = executor.compose_transform()
    """
    self.compose()
    return self.transform()

visualise(result_columns, **kwargs)

Visualise Pipeline Results.

Uses the pipeline’s visualiser to display results based on specified columns.

If no visualiser is defined

If no visualiser is defined in the pipeline, a ValueError will be raised.

Please make sure to include a visualiser step in your pipeline.

Parameters:

Name Type Description Default
result_columns Union[str, List[str]]

Column(s) to visualise from the urban layer.

required
**kwargs Any

Additional arguments for the visualiser’s render method.

{}

Returns:

Name Type Description
Any Any

Visualisation output, type depends on visualiser.

Raises:

Type Description
ValueError

If pipeline isn’t composed or lacks a visualiser.

Examples:

>>> executor.visualise(result_columns="count")
Source code in src/urban_mapper/pipeline/executor.py
def visualise(self, result_columns: Union[str, List[str]], **kwargs: Any) -> Any:
    """Visualise Pipeline Results.

    Uses the pipeline’s visualiser to display results based on specified columns.

    !!! note "If no visualiser is defined"
        If no visualiser is defined in the pipeline, a ValueError will be raised.

        Please make sure to include a visualiser step in your pipeline.

    Args:
        result_columns: Column(s) to visualise from the urban layer.
        **kwargs: Additional arguments for the visualiser’s render method.

    Returns:
        Any: Visualisation output, type depends on visualiser.

    Raises:
        ValueError: If pipeline isn’t composed or lacks a visualiser.

    Examples:
        >>> executor.visualise(result_columns="count")
    """
    if not self._composed:
        raise ValueError("Pipeline not composed. Call compose() first.")
    visualiser = next(
        (
            instance
            for _, instance in self.steps
            if isinstance(instance, VisualiserBase)
        ),
        None,
    )
    if not visualiser:
        raise ValueError("No VisualiserBase step defined.")
    return visualiser.render(
        urban_layer_geodataframe=self.urban_layer.layer,
        columns=result_columns,
        **kwargs,
    )

PipelineValidator

Validator for Pipeline Steps.

The Stricter The Better!

To avoid side-effects, the validator is strict about the types of components it accepts. The number of components of each type is also strictly enforced.

Schema Key Component Type Class Path Min Max
urban_layer Urban Layer urban_mapper.modules.urban_layer.UrbanLayerBase 1 1
loader Loader urban_mapper.modules.loader.LoaderBase 1 1
geo_imputer Geo Imputer urban_mapper.modules.imputer.GeoImputerBase 0 unlimited
geo_filter Geo Filter urban_mapper.modules.filter.GeoFilterBase 0 unlimited
enricher Enricher urban_mapper.modules.enricher.EnricherBase 1 unlimited
visualiser Visualiser urban_mapper.modules.visualiser.VisualiserBase 0 1

Information About The Table Above

  • Min and Max indicate the allowed number of components of each type in the pipeline.
  • A Min of 1 means the component is required; 0 means it’s optional.
  • unlimited in the Max column means you can include as many instances as needed—great for stacking multiple enrichers or filters to enhance your analysis.

Ensures pipeline steps comply with schema requirements, checking uniqueness, counts, and types.
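The min/max constraints in the table reduce to a count check per component type. A hedged sketch of that check (the `SCHEMA` literal below is illustrative; the real schema comes from `container.pipeline_schema()` and is keyed by component classes, not strings):

```python
from typing import Dict, Optional, Tuple

# (min, max) per schema key; None means "unlimited", as in the table above.
SCHEMA: Dict[str, Tuple[int, Optional[int]]] = {
    "urban_layer": (1, 1),
    "loader": (1, 1),
    "geo_imputer": (0, None),
    "geo_filter": (0, None),
    "enricher": (1, None),
    "visualiser": (0, 1),
}

def check_counts(counts: Dict[str, int]) -> None:
    """Raise ValueError if any component count violates its (min, max) bounds."""
    for key, (lo, hi) in SCHEMA.items():
        n = counts.get(key, 0)
        if n < lo:
            raise ValueError(f"'{key}' requires at least {lo} instance(s), got {n}.")
        if hi is not None and n > hi:
            raise ValueError(f"'{key}' allows at most {hi} instance(s), got {n}.")

check_counts({"urban_layer": 1, "loader": 1, "enricher": 2})  # valid: passes silently
```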

Attributes:

Name Type Description
steps List[Tuple[str, Union[UrbanLayerBase, LoaderBase, GeoImputerBase, GeoFilterBase, EnricherBase, VisualiserBase, Any]]]

List of (name, component) tuples to validate.

pipeline_schema Dict[Type[Any], Dict[str, int]]

Schema defining step requirements.

Examples:

>>> validator = um.PipelineValidator(steps)  # Validation occurs on init
Source code in src/urban_mapper/pipeline/validator.py
@beartype
class PipelineValidator:
    """Validator for Pipeline Steps.

    !!! note "The Stricter The Better!"
        To avoid side-effects, the validator is strict about the types of components
        it accepts. The number of components of each type is also strictly enforced.


        | Schema Key  | Component Type    | Class Path                                    | Min | Max       |
        |-------------|-------------------|-----------------------------------------------|-----|-----------|
        | urban_layer | Urban Layer       | `urban_mapper.modules.urban_layer.UrbanLayerBase` | 1   | 1         |
        | loader      | Loader            | `urban_mapper.modules.loader.LoaderBase`         | 1   | 1         |
        | geo_imputer | Geo Imputer       | `urban_mapper.modules.imputer.GeoImputerBase`    | 0   | unlimited |
        | geo_filter  | Geo Filter        | `urban_mapper.modules.filter.GeoFilterBase`      | 0   | unlimited |
        | enricher    | Enricher          | `urban_mapper.modules.enricher.EnricherBase`     | 1   | unlimited |
        | visualiser  | Visualiser        | `urban_mapper.modules.visualiser.VisualiserBase` | 0   | 1         |

        Information About The Table Above

        - [x] **Min** and **Max** indicate the allowed number of components of each type in the pipeline.
        - [x] A **Min** of `1` means the component is required; `0` means it’s optional.
        - [x] **unlimited** in the Max column means you can include as many instances as needed—great for stacking multiple enrichers or filters to enhance your analysis.

    Ensures pipeline steps comply with schema requirements, checking uniqueness, counts, and types.

    Attributes:
        steps (List[Tuple[str, Union[UrbanLayerBase, LoaderBase, GeoImputerBase, GeoFilterBase, EnricherBase, VisualiserBase, Any]]]):
            List of (name, component) tuples to validate.
        pipeline_schema (Dict[Type[Any], Dict[str, int]]): Schema defining step requirements.

    Examples:
        >>> validator = um.PipelineValidator(steps)  # Validation occurs on init
    """

    def __init__(
        self,
        steps: List[
            Tuple[
                str,
                Union[
                    UrbanLayerBase,
                    LoaderBase,
                    GeoImputerBase,
                    GeoFilterBase,
                    EnricherBase,
                    VisualiserBase,
                    Any,
                ],
            ]
        ],
    ) -> None:
        self.steps = steps
        self.pipeline_schema = container.pipeline_schema()
        self._validate_steps()

    def _validate_steps(self) -> None:
        """Validate pipeline steps against schema.

        Checks `uniqueness of names`, `valid types`, and `count constraints`.

        Raises:
            ValueError: If names are duplicated or counts don’t meet schema.
            TypeError: If step type isn’t valid.
        """
        step_counts: Dict[Type[Any], int] = {
            cls: 0 for cls in self.pipeline_schema.keys()
        }
        unique_names = set()

        for name, instance in self.steps:
            if name in unique_names:
                raise ValueError(
                    f"Duplicate step name '{name}'. Step names must be unique."
                )
            unique_names.add(name)

            cls = instance.__class__
            found = False
            for base_class in self.pipeline_schema.keys():
                if issubclass(cls, base_class):
                    step_counts[base_class] += 1
                    found = True
                    break
            if not found:
                raise TypeError(
                    f"Step '{name}' is not an instance of a valid step class. "
                    f"It is currently of type '{cls.__name__}'. "
                    f"Did you forget to call .build() on this step?"
                )

        for base_class, constraints in self.pipeline_schema.items():
            count = step_counts[base_class]
            min_count = constraints["min"]
            max_count = constraints["max"]
            if count < min_count:
                raise ValueError(
                    f"At least {min_count} {base_class.__name__} step(s) required, got {count}."
                )
            if max_count is not None and count > max_count:
                raise ValueError(
                    f"Only {max_count} {base_class.__name__} step(s) allowed, got {count}."
                )

_validate_steps()

Validate pipeline steps against schema.

Checks uniqueness of names, valid types, and count constraints.

Raises:

| Type | Description |
|------|-------------|
| `ValueError` | If names are duplicated or counts don’t meet schema. |
| `TypeError` | If step type isn’t valid. |

Source code in src/urban_mapper/pipeline/validator.py
def _validate_steps(self) -> None:
    """Validate pipeline steps against schema.

    Checks `uniqueness of names`, `valid types`, and `count constraints`.

    Raises:
        ValueError: If names are duplicated or counts don’t meet schema.
        TypeError: If step type isn’t valid.
    """
    step_counts: Dict[Type[Any], int] = {
        cls: 0 for cls in self.pipeline_schema.keys()
    }
    unique_names = set()

    for name, instance in self.steps:
        if name in unique_names:
            raise ValueError(
                f"Duplicate step name '{name}'. Step names must be unique."
            )
        unique_names.add(name)

        cls = instance.__class__
        found = False
        for base_class in self.pipeline_schema.keys():
            if issubclass(cls, base_class):
                step_counts[base_class] += 1
                found = True
                break
        if not found:
            raise TypeError(
                f"Step '{name}' is not an instance of a valid step class. "
                f"It is currently of type '{cls.__name__}'. "
                f"Did you forget to call .build() on this step?"
            )

    for base_class, constraints in self.pipeline_schema.items():
        count = step_counts[base_class]
        min_count = constraints["min"]
        max_count = constraints["max"]
        if count < min_count:
            raise ValueError(
                f"At least {min_count} {base_class.__name__} step(s) required, got {count}."
            )
        if max_count is not None and count > max_count:
            raise ValueError(
                f"Only {max_count} {base_class.__name__} step(s) allowed, got {count}."
            )
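The uniqueness check at the top of `_validate_steps` can be sketched on its own, with plain objects standing in for real pipeline components (the components themselves are irrelevant to this check):

```python
# Hedged sketch of the duplicate-name check in _validate_steps.
# Real pipelines pass built urban_mapper components; here any object will do.
from typing import Any, List, Tuple


def check_unique_names(steps: List[Tuple[str, Any]]) -> None:
    """Raise ValueError on the first duplicated step name."""
    seen = set()
    for name, _instance in steps:
        if name in seen:
            raise ValueError(
                f"Duplicate step name '{name}'. Step names must be unique."
            )
        seen.add(name)


# Distinct names pass silently; reusing "loader" twice would raise ValueError.
check_unique_names([("loader", object()), ("enricher", object())])
```

Because validation runs in `__init__`, such errors surface as soon as the pipeline is constructed, before any data is loaded or transformed.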
Fabio, Provost Simon