Loaders

What is the loader module?

The loader module is responsible for loading geospatial data into UrbanMapper. It provides a unified interface for loading various data formats, including shapefiles, parquet, and CSV files with geospatial information.

UrbanMapper steps support multiple datasets. You can create one loader instance per dataset, combine them in a single dictionary with suitable keys, and use that dictionary in your pipeline, as sketched below.
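
For instance, a minimal sketch of that pattern, assuming two placeholder input files and arbitrary dictionary keys (the keys expected by a given pipeline step depend on your configuration):

```python
from urban_mapper.modules.loader import CSVLoader, ShapefileLoader

# One loader per dataset; file names and dictionary keys are placeholders.
taxi_loader = CSVLoader(
    "taxi_trips.csv",
    latitude_column="pickup_lat",
    longitude_column="pickup_lng",
)
zones_loader = ShapefileLoader(file_path="taxi_zones.shp")

# Combine the loaders under descriptive keys and pass the dictionary
# to the pipeline step that accepts multiple datasets.
loaders = {
    "taxi_trips": taxi_loader,
    "taxi_zones": zones_loader,
}
```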

In the meantime, we recommend looking through the Loader examples for a more hands-on introduction to the Loader module and its usage.

Documentation Under Alpha Construction

This documentation is in its early stages and still being developed. The API may therefore change, and some parts might be incomplete or inaccurate.

Use at your own risk, and please report anything you find that seems incorrect or outdated.

Open An Issue!

LoaderBase

Bases: ABC

Base Class For Loaders.

This abstract class defines the common interface that all loader implementations must implement. Loaders are responsible for reading spatial data from various file formats and converting them to GeoDataFrames data structure. They handle coordinate system transformations and validation of required spatial columns.

Attributes:

file_path (Path): Path to the file to load.
latitude_column (str): Name of the column containing latitude values.
longitude_column (str): Name of the column containing longitude values.
coordinate_reference_system (str): The coordinate reference system to use. Default: EPSG:4326.
additional_loader_parameters (Dict[str, Any]): Additional parameters specific to the loader implementation. Consider this as kwargs.
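
As a rough illustration of this contract, here is a minimal, hypothetical subclass sketch; the GeoJSONLoader name and its internals are illustrative only, while the constructor and the two abstract methods follow the interface documented above:

```python
from typing import Any

import geopandas as gpd

from urban_mapper.modules.loader.abc_loader import LoaderBase


class GeoJSONLoader(LoaderBase):
    """Hypothetical sketch: read a GeoJSON file into a GeoDataFrame."""

    def _load_data_from_file(self) -> gpd.GeoDataFrame:
        # geopandas reads GeoJSON directly; align the CRS with the configured one.
        gdf = gpd.read_file(self.file_path)
        if gdf.crs is None:
            gdf = gdf.set_crs(self.coordinate_reference_system)
        elif gdf.crs.to_string() != self.coordinate_reference_system:
            gdf = gdf.to_crs(self.coordinate_reference_system)
        return gdf

    def preview(self, format: str = "ascii") -> Any:
        if format == "ascii":
            return f"Loader: GeoJSONLoader\n  File: {self.file_path}\n"
        elif format == "json":
            return {"loader": "GeoJSONLoader", "file": str(self.file_path)}
        raise ValueError(f"Unsupported format: {format}")
```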

Source code in src/urban_mapper/modules/loader/abc_loader.py
@beartype
class LoaderBase(ABC):
    """Base Class For `Loaders`.

    This abstract class defines the common interface that all loader implementations
    **must implement**. `Loaders` are responsible for reading spatial data from various
    file formats and converting them to `GeoDataFrames` data structure. They handle coordinate system
    transformations and validation of required spatial columns.

    Attributes:
        file_path (Path): Path to the file to load.
        latitude_column (str): Name of the column containing latitude values.
        longitude_column (str): Name of the column containing longitude values.
        coordinate_reference_system (str): The coordinate reference system to use. Default: `EPSG:4326`.
        additional_loader_parameters (Dict[str, Any]): Additional parameters specific to the loader implementation. Consider this as `kwargs`.
    """

    def __init__(
        self,
        file_path: Union[str, Path],
        latitude_column: Optional[str] = None,
        longitude_column: Optional[str] = None,
        coordinate_reference_system: str = DEFAULT_CRS,
        **additional_loader_parameters: Any,
    ) -> None:
        self.file_path: Path = Path(file_path)
        self.latitude_column: str = latitude_column or ""
        self.longitude_column: str = longitude_column or ""
        self.coordinate_reference_system: str = coordinate_reference_system
        self.additional_loader_parameters: Dict[str, Any] = additional_loader_parameters

    @abstractmethod
    def _load_data_from_file(self) -> Any:
        """Internal implementation method for loading data from a file.

        This method is called by `load_data_from_file()` after validation is performed.

        !!! warning "Method Not Implemented"
            This method must be implemented by subclasses. It should contain the logic
            for reading the file and converting it to a `GeoDataFrame`.

        Returns:
            A `GeoDataFrame` containing the loaded spatial data (except for the
            Raster Loader, for which two loaders exist: one that returns a `GeoDataFrame`
            and one that returns the data as a 3D NumPy array).

        Raises:
            ValueError: If required columns are missing or the file format is invalid.
            FileNotFoundError: If the file does not exist.
        """
        ...

    @file_exists("file_path")
    @ensure_coordinate_reference_system
    def load_data_from_file(self) -> gpd.GeoDataFrame:
        """Load spatial data from a file.

        This is the main public method for using `loaders`. It performs validation
        on the inputs before delegating to the implementation-specific `_load_data_from_file` method.
        It also ensures the file exists and that the coordinate reference system is properly set.

        Returns:
            A `GeoDataFrame` containing the loaded spatial data.

        Raises:
            FileNotFoundError: If the file does not exist.
            ValueError: If required columns are missing or the file format is invalid.

        Examples:
            >>> from urban_mapper.modules.loader import CSVLoader
            >>> loader = CSVLoader("taxi_data.csv", latitude_column="pickup_lat", longitude_column="pickup_lng")
            >>> gdf = loader.load_data_from_file()
        """
        loaded_file = self._load_data_from_file()

        if self.additional_loader_parameters.get("map_columns") is not None:
            loaded_file = loaded_file.rename(
                columns=self.additional_loader_parameters["map_columns"]
            )

        return loaded_file

    @abstractmethod
    def preview(self, format: str = "ascii") -> Any:
        """Generate a preview of the instance's `loader`.

        Creates a summary representation of the loader for quick inspection during UrbanMapper's analysis workflow.

        !!! warning "Method Not Implemented"
            This method must be implemented by subclasses. It should provide a preview
            of the loader's configuration and data. Make sure to support all formats.

        Args:
            format: The output format for the preview. Options include:

                - [x] `ascii`: Text-based format for terminal display
                - [x] `json`: JSON-formatted data for programmatic use

        Returns:
            A representation of the `loader` in the requested format.
            Return type varies based on the format parameter.

        Raises:
            ValueError: If an unsupported format is requested.
        """
        pass

load_data_from_file()

Load spatial data from a file.

This is the main public method for using loaders. It performs validation on the inputs before delegating to the implementation-specific _load_data_from_file method. It also ensures the file exists and that the coordinate reference system is properly set.

Returns:

GeoDataFrame: A GeoDataFrame containing the loaded spatial data.

Raises:

FileNotFoundError: If the file does not exist.
ValueError: If required columns are missing or the file format is invalid.

Examples:

>>> from urban_mapper.modules.loader import CSVLoader
>>> loader = CSVLoader("taxi_data.csv", latitude_column="pickup_lat", longitude_column="pickup_lng")
>>> gdf = loader.load_data_from_file()
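
Beyond the basic call shown above, the source below also applies an optional map_columns renaming step passed through additional_loader_parameters; a small sketch, with placeholder column names:

```python
from urban_mapper.modules.loader import CSVLoader

# map_columns is forwarded via additional_loader_parameters and applied
# as a rename after the file is read (see the source below).
# Column names here are placeholders.
loader = CSVLoader(
    "taxi_data.csv",
    latitude_column="pickup_lat",
    longitude_column="pickup_lng",
    map_columns={"pickup_datetime": "timestamp"},
)
gdf = loader.load_data_from_file()  # 'pickup_datetime' is renamed to 'timestamp'
```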
Source code in src/urban_mapper/modules/loader/abc_loader.py
@file_exists("file_path")
@ensure_coordinate_reference_system
def load_data_from_file(self) -> gpd.GeoDataFrame:
    """Load spatial data from a file.

    This is the main public method for using `loaders`. It performs validation
    on the inputs before delegating to the implementation-specific `_load_data_from_file` method.
    It also ensures the file exists and that the coordinate reference system is properly set.

    Returns:
        A `GeoDataFrame` containing the loaded spatial data.

    Raises:
        FileNotFoundError: If the file does not exist.
        ValueError: If required columns are missing or the file format is invalid.

    Examples:
        >>> from urban_mapper.modules.loader import CSVLoader
        >>> loader = CSVLoader("taxi_data.csv", latitude_column="pickup_lat", longitude_column="pickup_lng")
        >>> gdf = loader.load_data_from_file()
    """
    loaded_file = self._load_data_from_file()

    if self.additional_loader_parameters.get("map_columns") is not None:
        loaded_file = loaded_file.rename(
            columns=self.additional_loader_parameters["map_columns"]
        )

    return loaded_file

_load_data_from_file() abstractmethod

Internal implementation method for loading data from a file.

This method is called by load_data_from_file() after validation is performed.

Method Not Implemented

This method must be implemented by subclasses. It should contain the logic for reading the file and converting it to a GeoDataFrame.

Returns:

Any: A GeoDataFrame containing the loaded spatial data (except for the Raster Loader, for which two loaders exist: one that returns a GeoDataFrame and one that returns the data as a 3D NumPy array).

Raises:

ValueError: If required columns are missing or the file format is invalid.
FileNotFoundError: If the file does not exist.

Source code in src/urban_mapper/modules/loader/abc_loader.py
@abstractmethod
def _load_data_from_file(self) -> Any:
    """Internal implementation method for loading data from a file.

    This method is called by `load_data_from_file()` after validation is performed.

    !!! warning "Method Not Implemented"
        This method must be implemented by subclasses. It should contain the logic
        for reading the file and converting it to a `GeoDataFrame`.

    Returns:
        A `GeoDataFrame` containing the loaded spatial data (except for the
        Raster Loader, for which two loaders exist: one that returns a `GeoDataFrame`
        and one that returns the data as a 3D NumPy array).

    Raises:
        ValueError: If required columns are missing or the file format is invalid.
        FileNotFoundError: If the file does not exist.
    """
    ...

preview(format='ascii') abstractmethod

Generate a preview of the instance's loader.

Creates a summary representation of the loader for quick inspection during UrbanMapper's analysis workflow.

Method Not Implemented

This method must be implemented by subclasses. It should provide a preview of the loader's configuration and data. Make sure to support all formats.

Parameters:

format (str, default 'ascii'): The output format for the preview. Options include:

  • ascii: Text-based format for terminal display
  • json: JSON-formatted data for programmatic use

Returns:

Any: A representation of the loader in the requested format. Return type varies based on the format parameter.

Raises:

ValueError: If an unsupported format is requested.

Source code in src/urban_mapper/modules/loader/abc_loader.py
@abstractmethod
def preview(self, format: str = "ascii") -> Any:
    """Generate a preview of the instance's `loader`.

    Creates a summary representation of the loader for quick inspection during UrbanMapper's analysis workflow.

    !!! warning "Method Not Implemented"
        This method must be implemented by subclasses. It should provide a preview
        of the loader's configuration and data. Make sure to support all formats.

    Args:
        format: The output format for the preview. Options include:

            - [x] `ascii`: Text-based format for terminal display
            - [x] `json`: JSON-formatted data for programmatic use

    Returns:
        A representation of the `loader` in the requested format.
        Return type varies based on the format parameter.

    Raises:
        ValueError: If an unsupported format is requested.
    """
    pass

CSVLoader

Bases: LoaderBase

Loader for CSV files containing spatial data.

This loader reads data from CSV (or other delimiter-separated) files and converts them to GeoDataFrames with point geometries. It requires latitude and longitude columns to create point geometries for each row.

Attributes:

file_path (Path): Path to the CSV file to load.
latitude_column (str): Name of the column containing latitude values.
longitude_column (str): Name of the column containing longitude values.
coordinate_reference_system (str): The coordinate reference system to use. Default: EPSG:4326
separator (str): The delimiter character used in the CSV file. Default: ","
encoding (str): The character encoding of the CSV file. Default: "utf-8"

Examples:

>>> from urban_mapper.modules.loader import CSVLoader
>>>
>>> # Basic usage
>>> loader = CSVLoader(
...     file_path="taxi_trips.csv",
...     latitude_column="pickup_lat",
...     longitude_column="pickup_lng"
... )
>>> gdf = loader.load_data_from_file()
>>>
>>> # With custom separator and encoding
>>> loader = CSVLoader(
...     file_path="custom_data.csv",
...     latitude_column="lat",
...     longitude_column="lng",
...     separator=";",
...     encoding="latin-1"
... )
>>> gdf = loader.load_data_from_file()
Source code in src/urban_mapper/modules/loader/loaders/csv_loader.py
@beartype
class CSVLoader(LoaderBase):
    """Loader for `CSV` files containing spatial data.

    This loader reads data from `CSV` (or other delimiter-separated) files and
    converts them to `GeoDataFrames` with point geometries. It requires latitude
    and longitude columns to create point geometries for each row.

    Attributes:
        file_path (Path): Path to the `CSV` file to load.
        latitude_column (str): Name of the column containing latitude values.
        longitude_column (str): Name of the column containing longitude values.
        coordinate_reference_system (str): The coordinate reference system to use. Default: `EPSG:4326`
        separator (str): The delimiter character used in the CSV file. Default: `","`
        encoding (str): The character encoding of the CSV file. Default: `"utf-8"`

    Examples:
        >>> from urban_mapper.modules.loader import CSVLoader
        >>>
        >>> # Basic usage
        >>> loader = CSVLoader(
        ...     file_path="taxi_trips.csv",
        ...     latitude_column="pickup_lat",
        ...     longitude_column="pickup_lng"
        ... )
        >>> gdf = loader.load_data_from_file()
        >>>
        >>> # With custom separator and encoding
        >>> loader = CSVLoader(
        ...     file_path="custom_data.csv",
        ...     latitude_column="lat",
        ...     longitude_column="lng",
        ...     separator=";",
        ...     encoding="latin-1"
        ... )
        >>> gdf = loader.load_data_from_file()
    """

    def __init__(
        self,
        file_path: Union[str, Path],
        latitude_column: Optional[str] = None,
        longitude_column: Optional[str] = None,
        coordinate_reference_system: str = DEFAULT_CRS,
        separator: str = ",",
        encoding: str = "utf-8",
        **additional_loader_parameters: Any,
    ) -> None:
        super().__init__(
            file_path=file_path,
            latitude_column=latitude_column,
            longitude_column=longitude_column,
            coordinate_reference_system=coordinate_reference_system,
            **additional_loader_parameters,
        )
        self.separator = separator
        self.encoding = encoding

    @require_attributes(["latitude_column", "longitude_column"])
    def _load_data_from_file(self) -> gpd.GeoDataFrame:
        """Load data from a CSV file and convert it to a `GeoDataFrame`.

        This method reads a `CSV` file using pandas, validates the latitude and
        longitude columns, and converts the data to a `GeoDataFrame` with point
        geometries using the specified coordinate reference system.

        Returns:
            A `GeoDataFrame` containing the loaded data with point geometries
            created from the latitude and longitude columns.

        Raises:
            ValueError: If latitude_column or longitude_column is None.
            ValueError: If the specified columns are not found in the CSV file.
            pd.errors.ParserError: If the CSV file cannot be parsed.
            UnicodeDecodeError: If the file encoding is incorrect.
        """
        dataframe = pd.read_csv(
            self.file_path, sep=self.separator, encoding=self.encoding
        )

        if self.latitude_column not in dataframe.columns:
            raise ValueError(
                f"Column '{self.latitude_column}' not found in the CSV file."
            )
        if self.longitude_column not in dataframe.columns:
            raise ValueError(
                f"Column '{self.longitude_column}' not found in the CSV file."
            )

        dataframe[self.latitude_column] = pd.to_numeric(
            dataframe[self.latitude_column], errors="coerce"
        )
        dataframe[self.longitude_column] = pd.to_numeric(
            dataframe[self.longitude_column], errors="coerce"
        )

        geodataframe = gpd.GeoDataFrame(
            dataframe,
            geometry=gpd.points_from_xy(
                dataframe[self.longitude_column],
                dataframe[self.latitude_column],
            ),
            crs=self.coordinate_reference_system,
        )
        return geodataframe

    def preview(self, format: str = "ascii") -> Any:
        """Generate a preview of this `CSV` loader.

        Creates a summary representation of the loader for quick inspection.

        Args:
            format: The output format for the preview. Options include:

                - [x] "ascii": Text-based format for terminal display
                - [x] "json": JSON-formatted data for programmatic use

        Returns:
            A string or dictionary representing the loader, depending on the format.

        Raises:
            ValueError: If an unsupported format is requested.
        """
        if format == "ascii":
            return (
                f"Loader: CSVLoader\n"
                f"  File: {self.file_path}\n"
                f"  Latitude Column: {self.latitude_column}\n"
                f"  Longitude Column: {self.longitude_column}\n"
                f"  Separator: {self.separator}\n"
                f"  Encoding: {self.encoding}\n"
                f"  CRS: {self.coordinate_reference_system}\n"
                f"  Additional params: {self.additional_loader_parameters}\n"
            )
        elif format == "json":
            return {
                "loader": "CSVLoader",
                "file": self.file_path,
                "latitude_column": self.latitude_column,
                "longitude_column": self.longitude_column,
                "separator": self.separator,
                "encoding": self.encoding,
                "crs": self.coordinate_reference_system,
                "additional_params": self.additional_loader_parameters,
            }
        else:
            raise ValueError(f"Unsupported format: {format}")

_load_data_from_file()

Load data from a CSV file and convert it to a GeoDataFrame.

This method reads a CSV file using pandas, validates the latitude and longitude columns, and converts the data to a GeoDataFrame with point geometries using the specified coordinate reference system.

Returns:

GeoDataFrame: A GeoDataFrame containing the loaded data with point geometries created from the latitude and longitude columns.

Raises:

ValueError: If latitude_column or longitude_column is None.
ValueError: If the specified columns are not found in the CSV file.
pd.errors.ParserError: If the CSV file cannot be parsed.
UnicodeDecodeError: If the file encoding is incorrect.

Source code in src/urban_mapper/modules/loader/loaders/csv_loader.py
@require_attributes(["latitude_column", "longitude_column"])
def _load_data_from_file(self) -> gpd.GeoDataFrame:
    """Load data from a CSV file and convert it to a `GeoDataFrame`.

    This method reads a `CSV` file using pandas, validates the latitude and
    longitude columns, and converts the data to a `GeoDataFrame` with point
    geometries using the specified coordinate reference system.

    Returns:
        A `GeoDataFrame` containing the loaded data with point geometries
        created from the latitude and longitude columns.

    Raises:
        ValueError: If latitude_column or longitude_column is None.
        ValueError: If the specified columns are not found in the CSV file.
        pd.errors.ParserError: If the CSV file cannot be parsed.
        UnicodeDecodeError: If the file encoding is incorrect.
    """
    dataframe = pd.read_csv(
        self.file_path, sep=self.separator, encoding=self.encoding
    )

    if self.latitude_column not in dataframe.columns:
        raise ValueError(
            f"Column '{self.latitude_column}' not found in the CSV file."
        )
    if self.longitude_column not in dataframe.columns:
        raise ValueError(
            f"Column '{self.longitude_column}' not found in the CSV file."
        )

    dataframe[self.latitude_column] = pd.to_numeric(
        dataframe[self.latitude_column], errors="coerce"
    )
    dataframe[self.longitude_column] = pd.to_numeric(
        dataframe[self.longitude_column], errors="coerce"
    )

    geodataframe = gpd.GeoDataFrame(
        dataframe,
        geometry=gpd.points_from_xy(
            dataframe[self.longitude_column],
            dataframe[self.latitude_column],
        ),
        crs=self.coordinate_reference_system,
    )
    return geodataframe

preview(format='ascii')

Generate a preview of this CSV loader.

Creates a summary representation of the loader for quick inspection.

Parameters:

format (str, default 'ascii'): The output format for the preview. Options include:

  • "ascii": Text-based format for terminal display
  • "json": JSON-formatted data for programmatic use

Returns:

Any: A string or dictionary representing the loader, depending on the format.

Raises:

ValueError: If an unsupported format is requested.
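
A quick usage sketch of preview (file and column names are placeholders; the output keys follow the source below):

```python
from urban_mapper.modules.loader import CSVLoader

loader = CSVLoader(
    file_path="taxi_trips.csv",
    latitude_column="pickup_lat",
    longitude_column="pickup_lng",
)

# Text summary for the terminal (format defaults to "ascii").
print(loader.preview())

# Dictionary form for programmatic use.
summary = loader.preview(format="json")
print(summary["latitude_column"])  # -> "pickup_lat"
```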

Source code in src/urban_mapper/modules/loader/loaders/csv_loader.py
def preview(self, format: str = "ascii") -> Any:
    """Generate a preview of this `CSV` loader.

    Creates a summary representation of the loader for quick inspection.

    Args:
        format: The output format for the preview. Options include:

            - [x] "ascii": Text-based format for terminal display
            - [x] "json": JSON-formatted data for programmatic use

    Returns:
        A string or dictionary representing the loader, depending on the format.

    Raises:
        ValueError: If an unsupported format is requested.
    """
    if format == "ascii":
        return (
            f"Loader: CSVLoader\n"
            f"  File: {self.file_path}\n"
            f"  Latitude Column: {self.latitude_column}\n"
            f"  Longitude Column: {self.longitude_column}\n"
            f"  Separator: {self.separator}\n"
            f"  Encoding: {self.encoding}\n"
            f"  CRS: {self.coordinate_reference_system}\n"
            f"  Additional params: {self.additional_loader_parameters}\n"
        )
    elif format == "json":
        return {
            "loader": "CSVLoader",
            "file": self.file_path,
            "latitude_column": self.latitude_column,
            "longitude_column": self.longitude_column,
            "separator": self.separator,
            "encoding": self.encoding,
            "crs": self.coordinate_reference_system,
            "additional_params": self.additional_loader_parameters,
        }
    else:
        raise ValueError(f"Unsupported format: {format}")

ParquetLoader

Bases: LoaderBase

Loader for Parquet files containing spatial data.

This loader reads data from Parquet files and converts them to GeoDataFrames with point geometries. It requires latitude and longitude columns to create point geometries for each row.

Attributes:

file_path (Union[str, Path]): Path to the Parquet file to load.
latitude_column (Optional[str]): Name of the column containing latitude values. Default: None
longitude_column (Optional[str]): Name of the column containing longitude values. Default: None
coordinate_reference_system (str): The coordinate reference system to use. Default: EPSG:4326
engine (str): The engine to use for reading Parquet files. Default: "pyarrow"
columns (Optional[list[str]]): List of columns to read from the Parquet file. Default: None, which reads all columns.

Examples:

>>> from urban_mapper.modules.loader import ParquetLoader
>>>
>>> # Basic usage
>>> loader = ParquetLoader(
...     file_path="data.parquet",
...     latitude_column="lat",
...     longitude_column="lon"
... )
>>> gdf = loader.load_data_from_file()
>>>
>>> # With custom columns and engine
>>> loader = ParquetLoader(
...     file_path="data.parquet",
...     latitude_column="latitude",
...     longitude_column="longitude",
...     engine="fastparquet",
...     columns=["latitude", "longitude", "value"]
... )
>>> gdf = loader.load_data_from_file()
Source code in src/urban_mapper/modules/loader/loaders/parquet_loader.py
@beartype
class ParquetLoader(LoaderBase):
    """Loader for `Parquet` files containing spatial data.

    This loader reads data from `Parquet` files and converts them to `GeoDataFrames`
    with point geometries. It requires latitude and longitude columns to create
    point geometries for each row.

    Attributes:
        file_path (Union[str, Path]): Path to the Parquet file to load.
        latitude_column (Optional[str]): Name of the column containing latitude values. Default: `None`
        longitude_column (Optional[str]): Name of the column containing longitude values. Default: `None`
        coordinate_reference_system (str): The coordinate reference system to use. Default: `EPSG:4326`
        engine (str): The engine to use for reading Parquet files. Default: `"pyarrow"`
        columns (Optional[list[str]]): List of columns to read from the Parquet file. Default: `None`, which reads all columns.

    Examples:
        >>> from urban_mapper.modules.loader import ParquetLoader
        >>>
        >>> # Basic usage
        >>> loader = ParquetLoader(
        ...     file_path="data.parquet",
        ...     latitude_column="lat",
        ...     longitude_column="lon"
        ... )
        >>> gdf = loader.load_data_from_file()
        >>>
        >>> # With custom columns and engine
        >>> loader = ParquetLoader(
        ...     file_path="data.parquet",
        ...     latitude_column="latitude",
        ...     longitude_column="longitude",
        ...     engine="fastparquet",
        ...     columns=["latitude", "longitude", "value"]
        ... )
        >>> gdf = loader.load_data_from_file()
    """

    def __init__(
        self,
        file_path: Union[str, Path],
        latitude_column: Optional[str] = None,
        longitude_column: Optional[str] = None,
        coordinate_reference_system: str = DEFAULT_CRS,
        engine: str = "pyarrow",
        columns: Optional[list[str]] = None,
        **additional_loader_parameters: Any,
    ) -> None:
        super().__init__(
            file_path=file_path,
            latitude_column=latitude_column,
            longitude_column=longitude_column,
            coordinate_reference_system=coordinate_reference_system,
            **additional_loader_parameters,
        )
        self.engine = engine
        self.columns = columns

    @require_attributes(["latitude_column", "longitude_column"])
    def _load_data_from_file(self) -> gpd.GeoDataFrame:
        """Load data from a `Parquet` file and convert it to a `GeoDataFrame`.

        This method reads a `Parquet` file using `pandas`, validates the latitude and
        longitude columns, and converts the data to a `GeoDataFrame` with point
        geometries using the specified coordinate reference system.

        Returns:
            A `GeoDataFrame` containing the loaded data with point geometries
            created from the latitude and longitude columns.

        Raises:
            ValueError: If `latitude_column` or `longitude_column` is `None`.
            ValueError: If the specified latitude or longitude columns are not found in the Parquet file.
            IOError: If the Parquet file cannot be read.
        """
        dataframe = pd.read_parquet(
            self.file_path,
            engine=self.engine,
            columns=self.columns,
        )

        if self.latitude_column not in dataframe.columns:
            raise ValueError(
                f"Column '{self.latitude_column}' not found in the Parquet file."
            )
        if self.longitude_column not in dataframe.columns:
            raise ValueError(
                f"Column '{self.longitude_column}' not found in the Parquet file."
            )

        dataframe[self.latitude_column] = pd.to_numeric(
            dataframe[self.latitude_column], errors="coerce"
        )
        dataframe[self.longitude_column] = pd.to_numeric(
            dataframe[self.longitude_column], errors="coerce"
        )

        geodataframe = gpd.GeoDataFrame(
            dataframe,
            geometry=gpd.points_from_xy(
                dataframe[self.longitude_column],
                dataframe[self.latitude_column],
            ),
            crs=self.coordinate_reference_system,
        )
        return geodataframe

    def preview(self, format: str = "ascii") -> Any:
        """Generate a preview of this `Parquet` loader.

        Creates a summary representation of the loader for quick inspection.

        Args:
            format: The output format for the preview. Options include:

                - [x] "ascii": Text-based format for terminal display
                - [x] "json": JSON-formatted data for programmatic use

        Returns:
            A string or dictionary representing the loader, depending on the format.

        Raises:
            ValueError: If an unsupported format is requested.
        """
        cols = self.columns if self.columns else "All columns"

        if format == "ascii":
            return (
                f"Loader: ParquetLoader\n"
                f"  File: {self.file_path}\n"
                f"  Latitude Column: {self.latitude_column}\n"
                f"  Longitude Column: {self.longitude_column}\n"
                f"  Engine: {self.engine}\n"
                f"  Columns: {cols}\n"
                f"  CRS: {self.coordinate_reference_system}\n"
                f"  Additional params: {self.additional_loader_parameters}\n"
            )
        elif format == "json":
            return {
                "loader": "ParquetLoader",
                "file": self.file_path,
                "latitude_column": self.latitude_column,
                "longitude_column": self.longitude_column,
                "engine": self.engine,
                "columns": cols,
                "coordinate_reference_system": self.coordinate_reference_system,
                "additional_params": self.additional_loader_parameters,
            }
        else:
            raise ValueError(f"Unsupported format '{format}'")

_load_data_from_file()

Load data from a Parquet file and convert it to a GeoDataFrame.

This method reads a Parquet file using pandas, validates the latitude and longitude columns, and converts the data to a GeoDataFrame with point geometries using the specified coordinate reference system.

Returns:

GeoDataFrame: A GeoDataFrame containing the loaded data with point geometries created from the latitude and longitude columns.

Raises:

ValueError: If latitude_column or longitude_column is None.
ValueError: If the specified latitude or longitude columns are not found in the Parquet file.
IOError: If the Parquet file cannot be read.

Source code in src/urban_mapper/modules/loader/loaders/parquet_loader.py
@require_attributes(["latitude_column", "longitude_column"])
def _load_data_from_file(self) -> gpd.GeoDataFrame:
    """Load data from a `Parquet` file and convert it to a `GeoDataFrame`.

    This method reads a `Parquet` file using `pandas`, validates the latitude and
    longitude columns, and converts the data to a `GeoDataFrame` with point
    geometries using the specified coordinate reference system.

    Returns:
        A `GeoDataFrame` containing the loaded data with point geometries
        created from the latitude and longitude columns.

    Raises:
        ValueError: If `latitude_column` or `longitude_column` is `None`.
        ValueError: If the specified latitude or longitude columns are not found in the Parquet file.
        IOError: If the Parquet file cannot be read.
    """
    dataframe = pd.read_parquet(
        self.file_path,
        engine=self.engine,
        columns=self.columns,
    )

    if self.latitude_column not in dataframe.columns:
        raise ValueError(
            f"Column '{self.latitude_column}' not found in the Parquet file."
        )
    if self.longitude_column not in dataframe.columns:
        raise ValueError(
            f"Column '{self.longitude_column}' not found in the Parquet file."
        )

    dataframe[self.latitude_column] = pd.to_numeric(
        dataframe[self.latitude_column], errors="coerce"
    )
    dataframe[self.longitude_column] = pd.to_numeric(
        dataframe[self.longitude_column], errors="coerce"
    )

    geodataframe = gpd.GeoDataFrame(
        dataframe,
        geometry=gpd.points_from_xy(
            dataframe[self.longitude_column],
            dataframe[self.latitude_column],
        ),
        crs=self.coordinate_reference_system,
    )
    return geodataframe

preview(format='ascii')

Generate a preview of this Parquet loader.

Creates a summary representation of the loader for quick inspection.

Parameters:

format (str, default 'ascii'): The output format for the preview. Options include:

  • "ascii": Text-based format for terminal display
  • "json": JSON-formatted data for programmatic use

Returns:

Any: A string or dictionary representing the loader, depending on the format.

Raises:

ValueError: If an unsupported format is requested.

Source code in src/urban_mapper/modules/loader/loaders/parquet_loader.py
def preview(self, format: str = "ascii") -> Any:
    """Generate a preview of this `Parquet` loader.

    Creates a summary representation of the loader for quick inspection.

    Args:
        format: The output format for the preview. Options include:

            - [x] "ascii": Text-based format for terminal display
            - [x] "json": JSON-formatted data for programmatic use

    Returns:
        A string or dictionary representing the loader, depending on the format.

    Raises:
        ValueError: If an unsupported format is requested.
    """
    cols = self.columns if self.columns else "All columns"

    if format == "ascii":
        return (
            f"Loader: ParquetLoader\n"
            f"  File: {self.file_path}\n"
            f"  Latitude Column: {self.latitude_column}\n"
            f"  Longitude Column: {self.longitude_column}\n"
            f"  Engine: {self.engine}\n"
            f"  Columns: {cols}\n"
            f"  CRS: {self.coordinate_reference_system}\n"
            f"  Additional params: {self.additional_loader_parameters}\n"
        )
    elif format == "json":
        return {
            "loader": "ParquetLoader",
            "file": self.file_path,
            "latitude_column": self.latitude_column,
            "longitude_column": self.longitude_column,
            "engine": self.engine,
            "columns": cols,
            "coordinate_reference_system": self.coordinate_reference_system,
            "additional_params": self.additional_loader_parameters,
        }
    else:
        raise ValueError(f"Unsupported format '{format}'")

ShapefileLoader

Bases: LoaderBase

Loader for shapefiles containing spatial data.

This loader reads data from shapefiles and returns a GeoDataFrame. Shapefiles inherently contain geometry information, so explicit latitude and longitude columns are not required. However, if specified, they can be used; otherwise, representative points are generated.

Representative points are a simplified representation of the geometry, which can be useful for visualisations or when the geometry is complex. The loader will automatically create temporary columns for latitude and longitude if they are not provided or if the specified columns contain only NaN values.
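
For example, loading a polygon shapefile without specifying coordinate columns produces the generated temporary columns; a small sketch with a placeholder file name:

```python
from urban_mapper.modules.loader import ShapefileLoader

# No latitude/longitude columns given, so the loader derives them
# from representative points of each geometry.
loader = ShapefileLoader(file_path="neighbourhoods.shp")
gdf = loader.load_data_from_file()

print(loader.latitude_column)   # -> "temporary_latitude"
print(loader.longitude_column)  # -> "temporary_longitude"
print(gdf[["temporary_latitude", "temporary_longitude"]].head())
```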

Attributes:

file_path (Union[str, Path]): Path to the shapefile to load.
latitude_column (Optional[str]): Name of the column containing latitude values. If not provided or empty, a temporary latitude column is generated from representative points. Default: None
longitude_column (Optional[str]): Name of the column containing longitude values. If not provided or empty, a temporary longitude column is generated from representative points. Default: None
coordinate_reference_system (str): The coordinate reference system to use. Default: EPSG:4326

Examples:

>>> from urban_mapper.modules.loader import ShapefileLoader
>>>
>>> # Basic usage
>>> loader = ShapefileLoader(
...     file_path="data.shp"
... )
>>> gdf = loader.load_data_from_file()
>>>
>>> # With specified latitude and longitude columns
>>> loader = ShapefileLoader(
...     file_path="data.shp",
...     latitude_column="lat",
...     longitude_column="lon"
... )
>>> gdf = loader.load_data_from_file()
Source code in src/urban_mapper/modules/loader/loaders/shapefile_loader.py
@beartype
class ShapefileLoader(LoaderBase):
    """Loader for `shapefiles` containing spatial data.

    This loader reads data from `shapefiles` and returns a `GeoDataFrame`. Shapefiles
    inherently contain geometry information, so explicit latitude and longitude
    columns are not required. However, if specified, they can be used; otherwise,
    `representative points` are generated.

    `Representative points` are a simplified representation of the geometry, which can be
    useful for visualisations or when the geometry is complex. The loader will
    automatically create temporary columns for latitude and longitude if they are not
    provided or if the specified columns contain only `NaN` values.

    Attributes:
        file_path (Union[str, Path]): Path to the `shapefile` to load.
        latitude_column (Optional[str]): Name of the column containing latitude values. If not provided or empty,
            a temporary latitude column is generated from representative points. Default: `None`
        longitude_column (Optional[str]): Name of the column containing longitude values. If not provided or empty,
            a temporary longitude column is generated from representative points. Default: `None`
        coordinate_reference_system (str): The coordinate reference system to use. Default: `EPSG:4326`

    Examples:
        >>> from urban_mapper.modules.loader import ShapefileLoader
        >>>
        >>> # Basic usage
        >>> loader = ShapefileLoader(
        ...     file_path="data.shp"
        ... )
        >>> gdf = loader.load_data_from_file()
        >>>
        >>> # With specified latitude and longitude columns
        >>> loader = ShapefileLoader(
        ...     file_path="data.shp",
        ...     latitude_column="lat",
        ...     longitude_column="lon"
        ... )
        >>> gdf = loader.load_data_from_file()
    """

    def _load_data_from_file(self) -> gpd.GeoDataFrame:
        """Load data from a shapefile and return a `GeoDataFrame`.

        This method reads a `shapefile` using geopandas, ensures it has a geometry column,
        reprojects it to the specified `CRS` if necessary, and handles latitude and
        longitude columns. If latitude and longitude columns are not provided or are
        empty, it generates temporary columns using `representative points` of the geometries.

        Returns:
            A `GeoDataFrame` containing the loaded data with geometries and
            latitude/longitude columns as specified or generated.

        Raises:
            ValueError: If no geometry column is found in the shapefile.
            Exception: If the shapefile cannot be read (e.g., file not found or invalid format).
        """
        gdf = gpd.read_file(self.file_path)

        if "geometry" not in gdf.columns:
            raise ValueError(
                "No geometry column found in shapefile. "
                "Standard shapefile format requires a geometry column."
            )

        if gdf.crs.to_string() != self.coordinate_reference_system:
            gdf = gdf.to_crs(self.coordinate_reference_system)

        if (
            not self.latitude_column
            or not self.longitude_column
            or gdf[self.latitude_column].isna().all()
            or gdf[self.longitude_column].isna().all()
        ):
            gdf["representative_points"] = gdf.geometry.representative_point()
            gdf["temporary_longitude"] = gdf["representative_points"].x
            gdf["temporary_latitude"] = gdf["representative_points"].y
            self.latitude_column = "temporary_latitude"
            self.longitude_column = "temporary_longitude"

        return gdf

    def preview(self, format: str = "ascii") -> Any:
        """Generate a preview of this `CSV` loader.

        Creates a summary representation of the loader for quick inspection.

        Args:
            format: The output format for the preview. Options include:

                - [x] "ascii": Text-based format for terminal display
                - [x] "json": JSON-formatted data for programmatic use

        Returns:
            A string or dictionary representing the loader, depending on the format.

        Raises:
            ValueError: If an unsupported format is requested.
        """
        lat_col = self.latitude_column or "temporary_latitude (generated)"
        lon_col = self.longitude_column or "temporary_longitude (generated)"

        if format == "ascii":
            return (
                f"Loader: ShapefileLoader\n"
                f"  File: {self.file_path}\n"
                f"  Latitude Column: {lat_col}\n"
                f"  Longitude Column: {lon_col}\n"
                f"  CRS: {self.coordinate_reference_system}\n"
                f"  Additional params: {self.additional_loader_parameters}\n"
            )
        elif format == "json":
            return {
                "loader": "ShapefileLoader",
                "file": self.file_path,
                "latitude_column": lat_col,
                "longitude_column": lon_col,
                "crs": self.coordinate_reference_system,
                "additional_params": self.additional_loader_parameters,
            }
        else:
            raise ValueError(f"Unsupported format: {format}")

_load_data_from_file()

Load data from a shapefile and return a GeoDataFrame.

This method reads a shapefile using geopandas, ensures it has a geometry column, reprojects it to the specified CRS if necessary, and handles latitude and longitude columns. If latitude and longitude columns are not provided or are empty, it generates temporary columns using representative points of the geometries.

Returns:

GeoDataFrame: A GeoDataFrame containing the loaded data with geometries and latitude/longitude columns as specified or generated.

Raises:

ValueError: If no geometry column is found in the shapefile.
Exception: If the shapefile cannot be read (e.g., file not found or invalid format).

Source code in src/urban_mapper/modules/loader/loaders/shapefile_loader.py
def _load_data_from_file(self) -> gpd.GeoDataFrame:
    """Load data from a shapefile and return a `GeoDataFrame`.

    This method reads a `shapefile` using geopandas, ensures it has a geometry column,
    reprojects it to the specified `CRS` if necessary, and handles latitude and
    longitude columns. If latitude and longitude columns are not provided or are
    empty, it generates temporary columns using `representative points` of the geometries.

    Returns:
        A `GeoDataFrame` containing the loaded data with geometries and
        latitude/longitude columns as specified or generated.

    Raises:
        ValueError: If no geometry column is found in the shapefile.
        Exception: If the shapefile cannot be read (e.g., file not found or invalid format).
    """
    gdf = gpd.read_file(self.file_path)

    if "geometry" not in gdf.columns:
        raise ValueError(
            "No geometry column found in shapefile. "
            "Standard shapefile format requires a geometry column."
        )

    if gdf.crs.to_string() != self.coordinate_reference_system:
        gdf = gdf.to_crs(self.coordinate_reference_system)

    if (
        not self.latitude_column
        or not self.longitude_column
        or gdf[self.latitude_column].isna().all()
        or gdf[self.longitude_column].isna().all()
    ):
        gdf["representative_points"] = gdf.geometry.representative_point()
        gdf["temporary_longitude"] = gdf["representative_points"].x
        gdf["temporary_latitude"] = gdf["representative_points"].y
        self.latitude_column = "temporary_latitude"
        self.longitude_column = "temporary_longitude"

    return gdf

preview(format='ascii')

Generate a preview of this shapefile loader.

Creates a summary representation of the loader for quick inspection.

Parameters:

format (str, default 'ascii'): The output format for the preview. Options include:

  • "ascii": Text-based format for terminal display
  • "json": JSON-formatted data for programmatic use

Returns:

Any: A string or dictionary representing the loader, depending on the format.

Raises:

ValueError: If an unsupported format is requested.

Source code in src/urban_mapper/modules/loader/loaders/shapefile_loader.py
def preview(self, format: str = "ascii") -> Any:
    """Generate a preview of this `CSV` loader.

    Creates a summary representation of the loader for quick inspection.

    Args:
        format: The output format for the preview. Options include:

            - [x] "ascii": Text-based format for terminal display
            - [x] "json": JSON-formatted data for programmatic use

    Returns:
        A string or dictionary representing the loader, depending on the format.

    Raises:
        ValueError: If an unsupported format is requested.
    """
    lat_col = self.latitude_column or "temporary_latitude (generated)"
    lon_col = self.longitude_column or "temporary_longitude (generated)"

    if format == "ascii":
        return (
            f"Loader: ShapefileLoader\n"
            f"  File: {self.file_path}\n"
            f"  Latitude Column: {lat_col}\n"
            f"  Longitude Column: {lon_col}\n"
            f"  CRS: {self.coordinate_reference_system}\n"
            f"  Additional params: {self.additional_loader_parameters}\n"
        )
    elif format == "json":
        return {
            "loader": "ShapefileLoader",
            "file": self.file_path,
            "latitude_column": lat_col,
            "longitude_column": lon_col,
            "crs": self.coordinate_reference_system,
            "additional_params": self.additional_loader_parameters,
        }
    else:
        raise ValueError(f"Unsupported format: {format}")

LoaderFactory

Factory class for creating and configuring data loaders.

This class implements a fluent chaining methods-based interface for creating and configuring data loaders.

The factory manages the details of loader instantiation, coordinate reference system conversion, column mapping, and other data loading concerns, providing a consistent interface regardless of the underlying data source.

Attributes:

source_type (Optional[str]): The type of data source ("file" or "dataframe").
source_data (Optional[Union[str, DataFrame, GeoDataFrame]]): The actual data source (file path or dataframe).
latitude_column (Optional[str]): The name of the column containing latitude values.
longitude_column (Optional[str]): The name of the column containing longitude values.
crs (str): The coordinate reference system to use for the loaded data.
_instance (Optional[LoaderBase]): The underlying loader instance (internal use only).
_preview (Optional[dict]): Preview configuration (internal use only).

Examples:

>>> from urban_mapper import UrbanMapper
>>> 
>>> # Initialise UrbanMapper
>>> mapper = UrbanMapper()
>>> 
>>> # Load data from a CSV file with coordinate columns
>>> gdf = (
...         mapper.loader\
...         .from_file("your_file_path.csv")\
...         .with_columns(longitude_column="lon", latitude_column="lat")\
...         .load()
...     )
>>>
>>> # Load data from a GeoDataFrame
>>> import geopandas as gpd
>>> existing_data = gpd.read_file("data/some_shapefile.shp")
>>> gdf = mapper.loader.from_dataframe(existing_data).load() # Concise inline manner
Source code in src/urban_mapper/modules/loader/loader_factory.py
@beartype
class LoaderFactory:
    """Factory class for creating and configuring data loaders.

    This class implements a fluent chaining methods-based interface for creating and configuring data loaders.

    The factory manages the details of `loader instantiation`, `coordinate reference system`
    conversion, `column mapping`, and other data loading concerns, providing a consistent
    interface regardless of the underlying data source.

    Attributes:
        source_type: The type of data source ("file" or "dataframe").
        source_data: The actual data source (file path or dataframe).
        latitude_column: The name of the column containing latitude values.
        longitude_column: The name of the column containing longitude values.
        crs: The coordinate reference system to use for the loaded data.
        _instance: The underlying loader instance (internal use only).
        _preview: Preview configuration (internal use only).

    Examples:
        >>> from urban_mapper import UrbanMapper
        >>> 
        >>> # Initialise UrbanMapper
        >>> mapper = UrbanMapper()
        >>> 
        >>> # Load data from a CSV file with coordinate columns
        >>> gdf = (
        ...         mapper.loader\\
        ...         .from_file("your_file_path.csv")\\
        ...         .with_columns(longitude_column="lon", latitude_column="lat")\\
        ...         .load()
        ...     )
        >>>
        >>> # Load data from a GeoDataFrame
        >>> import geopandas as gpd
        >>> existing_data = gpd.read_file("data/some_shapefile.shp")
        >>> gdf = mapper.loader.from_dataframe(existing_data).load() # Concise inline manner
    """

    def __init__(self):
        self.source_type: Optional[str] = None
        self.source_data: Optional[Union[str, pd.DataFrame, gpd.GeoDataFrame]] = None
        self.latitude_column: Optional[str] = None
        self.longitude_column: Optional[str] = None
        self.map_columns: Optional[Dict[str, str]] = None
        self.crs: str = DEFAULT_CRS
        self._instance: Optional[LoaderBase] = None
        self._preview: Optional[dict] = None
        self.options = {}

    @reset_attributes_before(
        ["source_type", "source_data", "latitude_column", "longitude_column"]
    )
    def from_file(self, file_path: str) -> "LoaderFactory":
        """Configure the factory to load data from a file.

        This method sets up the factory to load data from a file path. The file format
        is determined by the file extension. Supported formats include `CSV`, `shapefile`,
        and `Parquet`. 

        Args:
            file_path: Path to the data file to load.

        Returns:
            The LoaderFactory instance for method chaining.

        Examples:
            >>> loader = mapper.loader.from_file("data/points.csv")
            >>> # Next steps would typically be to call with_columns() and load()
        """
        self.source_type = "file"
        self.latitude_column = None
        self.longitude_column = None
        self.map_columns = None
        self.source_data = file_path
        logger.log(
            "DEBUG_LOW",
            f"FROM_FILE: Initialised LoaderFactory with file_path={file_path}",
        )
        return self

    def from_dataframe(
        self, dataframe: Union[pd.DataFrame, gpd.GeoDataFrame]
    ) -> "LoaderFactory":
        """Configure the factory to load data from an existing dataframe.

        This method sets up the factory to load data from a pandas `DataFrame` or
        geopandas `GeoDataFrame`. For `DataFrames` without geometry, you will need
        to call `with_columns()` to specify the latitude and longitude columns.

        Args:
            dataframe: The pandas DataFrame or geopandas GeoDataFrame to load.

        Returns:
            The LoaderFactory instance for method chaining.

        Examples:
            >>> import pandas as pd
            >>> df = pd.read_csv("data/points.csv")
            >>> loader = mapper.loader.from_dataframe(df)
            >>> # For regular DataFrames, you must specify coordinate columns:
            >>> loader.with_columns(longitude_column="lon", latitude_column="lat")
        """
        self.source_type = "dataframe"
        self.source_data = dataframe
        self.latitude_column = "None"
        self.longitude_column = "None"
        self.map_columns = "None"
        logger.log(
            "DEBUG_LOW",
            f"FROM_DATAFRAME: Initialised LoaderFactory with dataframe={dataframe}",
        )
        return self

    def _build_dataset_dict(self, limit: Optional[int] = None):
        all_datasets = [
            dataset.id
            for dataset in (
                huggingface_hub.list_datasets(limit=limit)
                if limit
                else huggingface_hub.list_datasets()
            )
        ]
        dataset_dict = defaultdict(list)
        for dataset_id in all_datasets:
            if "/" in dataset_id:
                repo_name, dataset_name = dataset_id.split("/", 1)
                dataset_dict[repo_name].append(dataset_name)
        return dataset_dict

    def from_huggingface(
        self,
        repo_id: str,
        number_of_rows: Optional[int] = None,
        streaming: Optional[bool] = False,
        debug_limit_list_datasets: Optional[int] = None,
    ) -> "LoaderFactory":
        """
        Load a dataset from `Hugging Face's Hub` using the `datasets` library.

        !!! info "What Are Hugging Face Datasets?"
            πŸ€— **Hugging Face Datasets** is your gateway to a vast collection of datasets tailored for various application domains
            such as urban computing. In a nutshell, this library simplifies data access, letting you load datasets
            with a single line of code.

            **How to Find and Use Datasets**: Head to the [Hugging Face Datasets Hub](https://huggingface.co/datasets),
            where you can search anything you like (e.g., "PLUTO" for NYC buildings information).

            For `from_huggingface`, you need the `repo_id` of the dataset you want to load. A `repo_id` follows the
            `<namespace>/<dataset_name>` format and appears both on the dataset card and in the dataset's URL.
            For example, the dataset at `https://huggingface.co/datasets/oscur/pluto` has the `repo_id` `oscur/pluto`.
            The `namespace` is the organisation or user who published the dataset, and the `dataset_name` is the
            specific dataset name. In this case, `oscur` is the namespace and `pluto` is the dataset name.

        !!! success "OSCUR: Pioneering Urban Science"
            🌍 **OSCUR** (Open-Source Cyberinfrastructure for Urban Computing) integrates tools for data exploration,
            analytics, and machine learning, all while fostering a collaborative community to advance urban science.

            All datasets used by any of the initiatives under OSCUR are open-source and available on Hugging Face
            Datasets Hub. As `UrbanMapper` is one of the initiatives under OSCUR, all datasets throughout our examples
            and case studies are available under the `oscur` namespace.

            Feel free to explore our datasets, at [https://huggingface.co/oscur](https://huggingface.co/oscur).

            Load them easily:
            ```python
            loader = mapper.loader.from_huggingface("oscur/taxisvis1M")
            ```

            Dive deeper at [oscur.org](https://oscur.org/) for other open-source initiatives and tools.

        !!! warning "Potential Errors Explained"
            Mistakes happenβ€”here’s what might go wrong and how we help:

            If `repo_id` is invalid, a `ValueError` pops up with smart suggestions powered by
            [TheFuzz](https://github.com/seatgeek/thefuzz), a fuzzy matching library. We compare your input to
            existing datasets and offer the closest matches:

            - **No Slash (e.g., `plutoo`)**: Assumes it is a dataset name and suggests the closest matching full `repo_id`s (e.g., `oscur/pluto`).
            - **Bad Namespace (e.g., `oscurq/pluto`)**: If the namespace doesn’t exist, we suggest similar ones (e.g., `oscur`).
            - **Bad Dataset Name (e.g., `oscur/plutoo`)**: If the namespace is valid but the dataset isn’t, we suggest close matches.

            Errors come with contextβ€”like available datasets in a namespaceβ€”so you can fix it fast.

        Args:
            repo_id (str): The dataset repository ID on Hugging Face.
            number_of_rows (Optional[int]): Number of rows to load. Defaults to None.
            streaming (Optional[bool]): Whether to use streaming mode. Defaults to False.
            debug_limit_list_datasets (Optional[int]): Limit on datasets fetched for error handling. Defaults to None.

        Returns:
            LoaderFactory: The updated LoaderFactory instance for method chaining.

        Raises:
            ValueError: If the dataset cannot be loaded due to an invalid `repo_id` or other issues.

        Examples:
            >>> # Load a full dataset
            >>> loader = mapper.loader.from_huggingface("oscur/pluto")
            >>> gdf = loader.load()
            >>> print(gdf.head())  # Next steps: analyze or visualize the data

            >>> # Load 500 rows with streaming (i.e. without loading the entire dataset)
            >>> loader = mapper.loader.from_huggingface("oscur/NYC_311", number_of_rows=500, streaming=True)
            >>> gdf = loader.load()
            >>> print(gdf.head())  # Next steps: process the loaded subset

            >>> # Load 1000 rows without streaming
            >>> loader = mapper.loader.from_huggingface("oscur/taxisvis1M", number_of_rows=1000)
            >>> gdf = loader.load()
            >>> print(gdf.head())  # Next steps: explore the sliced data

            >>> # Handle typo in namespace
            >>> try:
            ...     loader = mapper.loader.from_huggingface("oscurq/pluto")
            ... except ValueError as e:
            ...     print(e)
            ValueError: The repository 'oscurq' does not exist on Hugging Face. Maybe you meant one of these:
            - oscur (similarity: 90%)
            - XXX (similarity: 85%)

            >>> # Handle typo in dataset name
            >>> try:
            ...     loader = mapper.loader.from_huggingface("oscur/plutoo")
            ... except ValueError as e:
            ...     print(e)
            ValueError: The dataset 'plutoo' does not exist in repository 'oscur'. Maybe you meant one of these:
            - oscur/pluto (similarity: 90%)
            - XXX (similarity: 80%)

            >>> # Handle input without namespace
            >>> try:
            ...     loader = mapper.loader.from_huggingface("plutoo")
            ... except ValueError as e:
            ...     print(e)
            ValueError: The dataset 'plutoo' does not exist on Hugging Face. Maybe you meant one of these:
            - oscur/pluto (similarity: 90%)
            - XXX (similarity: 85%)

        """
        self.source_type = "huggingface"
        try:
            if number_of_rows:
                if streaming:
                    # Use streaming mode to fetch only the required rows
                    dataset = datasets.load_dataset(
                        repo_id, split="train", streaming=True
                    )
                    limited_rows = list(islice(dataset, number_of_rows))
                    self.source_data = pd.DataFrame(limited_rows)
                    logger.log(
                        "DEBUG_LOW",
                        f"Loaded {number_of_rows} rows in streaming mode from {repo_id}.",
                    )
                else:
                    # Use slicing with split for non-streaming mode
                    dataset = datasets.load_dataset(
                        repo_id, split=f"train[:{number_of_rows}]"
                    )
                    self.source_data = pd.DataFrame(dataset)
                    logger.log(
                        "DEBUG_LOW", f"Loaded {number_of_rows} rows from {repo_id}."
                    )
            else:
                dataset = datasets.load_dataset(repo_id, split="train")
                self.source_data = pd.DataFrame(dataset)
                logger.log("DEBUG_LOW", f"Loaded dataset {repo_id}.")

        except datasets.exceptions.DatasetNotFoundError as e:
            dataset_dict = self._build_dataset_dict(limit=debug_limit_list_datasets)
            if "/" not in repo_id:
                all_datasets = [
                    f"{repo}/{ds}"
                    for repo, ds_list in dataset_dict.items()
                    for ds in ds_list
                ]
                matches = process.extract(
                    repo_id,
                    all_datasets,
                    processor=lambda x: x.split("/")[-1] if "/" in x else x,
                )
                filtered_matches = [
                    (match, score) for match, score in matches if score > 80
                ]
                top_matches = filtered_matches[:10]
                suggestions = [
                    f"{match} (similarity: {score}%)" for match, score in top_matches
                ]
                suggestion_text = (
                    " Maybe you meant one of these:\n" + "\n".join(suggestions)
                    if suggestions
                    else ""
                )
                raise ValueError(
                    f"The dataset '{repo_id}' does not exist on Hugging Face. "
                    f"Please verify the dataset ID.{suggestion_text}"
                ) from e
            else:
                repo_name, dataset_name = repo_id.split("/", 1)
                if repo_name not in dataset_dict:
                    all_repos = list(dataset_dict.keys())
                    matches = process.extract(repo_name, all_repos, limit=1000)
                    filtered_matches = [
                        (match, score) for match, score in matches if score > 80
                    ]
                    top_matches = filtered_matches[:10]
                    suggestions = [
                        f"{match} (similarity: {score}%)"
                        for match, score in top_matches
                    ]
                    suggestion_text = (
                        " Maybe you meant one of these:\n" + "\n".join(suggestions)
                        if suggestions
                        else ""
                    )
                    raise ValueError(
                        f"The repository '{repo_name}' does not exist on Hugging Face. "
                        f"Please verify the repository name.{suggestion_text}"
                    ) from e
                else:
                    available_datasets = dataset_dict[repo_name]
                    matches = process.extract(
                        dataset_name, available_datasets, limit=None
                    )
                    filtered_matches = [
                        (match, score) for match, score in matches if score > 80
                    ]
                    top_matches = filtered_matches[:10]
                    suggestions = [
                        f"{repo_name}/{match} (similarity: {score}%)"
                        for match, score in top_matches
                    ]
                    suggestion_text = (
                        " Maybe you meant one of these:\n" + "\n".join(suggestions)
                        if suggestions
                        else ""
                    )
                    raise ValueError(
                        f"The dataset '{dataset_name}' does not exist in repository '{repo_name}'. "
                        f"Available datasets: {', '.join(available_datasets)}.{suggestion_text}"
                    ) from e

        except Exception as e:
            raise ValueError(f"Error loading dataset '{repo_id}': {str(e)}") from e

        self.latitude_column = "None"
        self.longitude_column = "None"
        self.map_columns = "None"
        logger.log(
            "DEBUG_LOW",
            f"FROM_HUGGINGFACE: Loaded dataset {repo_id} with "
            f"{'all rows' if number_of_rows is None else number_of_rows} rows "
            f"{'(streaming mode)' if streaming else '(non-streaming mode)'}.",
        )
        return self

    def with_columns(
        self,
        longitude_column: str,
        latitude_column: str,
    ) -> "LoaderFactory":
        """Specify the latitude and longitude columns in the data source.

        This method configures which columns in the data source contain the latitude
        and longitude coordinates. This is required for `CSV` and `Parquet` files, as well
        as for `pandas DataFrames` without geometry.

        Args:
            longitude_column: Name of the column containing longitude values.
            latitude_column: Name of the column containing latitude values.

        Returns:
            The LoaderFactory instance for method chaining.

        Examples:
            >>> loader = mapper.loader.from_file("data/points.csv")\
            ...     .with_columns(longitude_column="lon", latitude_column="lat")
        """
        self.latitude_column = latitude_column
        self.longitude_column = longitude_column
        logger.log(
            "DEBUG_LOW",
            f"WITH_COLUMNS: Initialised LoaderFactory "
            f"with latitude_column={latitude_column} and longitude_column={longitude_column}",
        )
        return self

    def with_crs(self, crs: str = DEFAULT_CRS) -> "LoaderFactory":
        """Specify the coordinate reference system for the loaded data.

        This method configures the `coordinate reference system (CRS)` to use for the loaded
        data. If the source data already has a `CRS`, it will be converted to the specified `CRS`.

        Args:
            crs: The coordinate reference system to use, in any format accepted by geopandas
                (default: `EPSG:4326`, which is standard `WGS84` coordinates).

        Returns:
            The LoaderFactory instance for method chaining.

        Examples:
            >>> loader = mapper.loader.from_file("data/points.csv")\
            ...     .with_columns(longitude_column="lon", latitude_column="lat")\
            ...     .with_crs("EPSG:3857")  # Use Web Mercator projection
        """
        self.crs = crs
        logger.log(
            "DEBUG_LOW",
            f"WITH_CRS: Initialised LoaderFactory with crs={crs}",
        )
        return self

    def with_map(
        self,
        map_columns: Dict[str, str],
    ) -> "LoaderFactory":
        """Specify a mapping from source to target column names.

        This method configures which columns in the data source should be renamed.

        Args:
            map_columns: Dictionary mapping source column names (keys) to target column names (values).

        Returns:
            The LoaderFactory instance for method chaining.

        Examples:
            >>> loader = mapper.loader.from_file("data/points.csv")\
            ...     .with_map(map_columns={"long": "longitude", "lat": "latitude"})
        """
        self.map_columns = map_columns
        logger.log(
            "DEBUG_LOW",
            f"WITH_MAP: Initialised LoaderFactory with map_columns={map_columns}",
        )
        return self

    def with_options(self, **options) -> "LoaderFactory":
        """
        Set additional key-value options to configure loader behavior.

        This method allows you to specify arbitrary configuration options, such as block size, resolution, or other loader parameters. These options will be forwarded to the loader upon instantiation.

        Args:
            **options: Arbitrary keyword arguments representing loader configuration options.

        Returns:
            The LoaderFactory instance for method chaining.

        Examples:
            >>> loader = mapper.loader.from_file("data/raster.tif")\
            ...     .with_options(block_size=10, use_polygons=True)
        """
        self.options.update(options)
        logger.log(
            "DEBUG_LOW",
            f"WITH_OPTIONS: Updated LoaderFactory with options={options}",
        )
        return self


    def _load_from_file(self, coordinate_reference_system: str):
        file_path: str = self.source_data
        file_ext = Path(file_path).suffix.lower()
        loader_class = FILE_LOADER_FACTORY[file_ext]["class"]
        self._instance = loader_class(
            file_path,
            latitude_column=self.latitude_column,
            longitude_column=self.longitude_column,
            coordinate_reference_system=coordinate_reference_system,
            map_columns=self.map_columns,
            **self.options
        )

        return self._instance._load_data_from_file()

    def _load_from_dataframe(
        self, coordinate_reference_system: str
    ) -> gpd.GeoDataFrame:
        input_dataframe: Union[pd.DataFrame, gpd.GeoDataFrame] = self.source_data
        if isinstance(input_dataframe, gpd.GeoDataFrame):
            geo_dataframe: gpd.GeoDataFrame = input_dataframe.copy()
        else:
            geo_dataframe = gpd.GeoDataFrame(
                input_dataframe,
                geometry=gpd.points_from_xy(
                    input_dataframe[self.longitude_column],
                    input_dataframe[self.latitude_column],
                ),
                crs=coordinate_reference_system,
            )
        if geo_dataframe.crs is None:
            geo_dataframe.set_crs(coordinate_reference_system, inplace=True)
        elif geo_dataframe.crs.to_string() != coordinate_reference_system:
            geo_dataframe = geo_dataframe.to_crs(coordinate_reference_system)
        if self.map_columns is not None and self.map_columns != "None":
            geo_dataframe = geo_dataframe.rename(columns=self.map_columns)

        return geo_dataframe

    @require_attributes(["source_type", "source_data"])
    def load(self, coordinate_reference_system: str = DEFAULT_CRS):
        """Load the data and return it as a `GeoDataFrame` or raster object.

        This method loads the data from the configured source and returns it as a
        geopandas `GeoDataFrame`. It handles the details of loading from different
        source types and formats.

        Args:
            coordinate_reference_system: The coordinate reference system to use for the
                loaded data (default: "EPSG:4326", which is standard WGS84 coordinates).

        Returns:
            A GeoDataFrame containing the loaded data.

        Raises:
            ValueError: If the source type is invalid, the file format is unsupported,
                or required parameters (like latitude/longitude columns) are missing.

        Examples:
            >>> # Load CSV data
            >>> gdf = mapper.loader.from_file("data/points.csv")\
            ...     .with_columns(longitude_column="lon", latitude_column="lat")\
            ...     .load()
            >>> 
            >>> # Load shapefile data
            >>> gdf = mapper.loader.from_file("data/boundaries.shp").load()
        """
        if self.source_type == "file":
            file_ext = Path(self.source_data).suffix.lower()
            if file_ext not in FILE_LOADER_FACTORY:
                raise ValueError(f"Unsupported file format: {file_ext}")
            loader_info = FILE_LOADER_FACTORY[file_ext]
            if loader_info["requires_columns"] and (
                self.latitude_column is None or self.longitude_column is None
            ):
                raise ValueError(
                    f"Loader for {file_ext} requires latitude and longitude columns. Call with_columns() first."
                )
            loaded_data = self._load_from_file(coordinate_reference_system)
            if self._preview is not None:
                self.preview(format=self._preview["format"])
            return loaded_data 
        elif self.source_type == "dataframe":
            if self.latitude_column == "None" or self.longitude_column == "None":
                raise ValueError(
                    "DataFrame loading requires latitude and longitude columns. Call with_columns() with valid column names."
                )
            loaded_data = self._load_from_dataframe(coordinate_reference_system)
            if self._preview is not None:
                logger.log(
                    "DEBUG_LOW",
                    "Note: Preview is not supported for DataFrame sources.",
                )
            return loaded_data
        elif self.source_type == "huggingface":
            if self.latitude_column == "None" or self.longitude_column == "None":
                raise ValueError(
                    "Hugging Face dataset loading requires latitude and longitude columns. "
                    "Call with_columns() with valid column names."
                )
            loaded_data = self._load_from_dataframe(coordinate_reference_system)
            if self._preview is not None:
                logger.log(
                    "DEBUG_LOW",
                    "Note: Preview is not supported for DataFrame sources.",
                )
            return loaded_data
        else:
            raise ValueError("Invalid source type.")

    def build(self) -> LoaderBase:
        """Build and return a `loader` instance without loading the data.

        This method creates and returns a loader instance without immediately loading
        the data. It is primarily intended for use in the `UrbanPipeline`, where the
        actual loading is deferred until pipeline execution.

        Returns:
            A LoaderBase instance configured to load the data when needed.

        Raises:
            ValueError: If the source type is not supported, the file format is unsupported,
                or required parameters (like latitude/longitude columns) are missing.

        Note:
            For most use cases outside of pipelines, using load() is preferred as it
            directly returns the loaded data.

        Examples:
            >>> # Creating a pipeline component
            >>> loader = mapper.loader.from_file("data/points.csv")\
            ...     .with_columns(longitude_column="lon", latitude_column="lat")\
            ...     .build()
            >>> step_loader_for_pipeline = ("My Loader", loader) # Add this in the list of steps in the `UrbanPipeline`.
        """
        logger.log(
            "DEBUG_MID",
            "WARNING: build() should only be used in UrbanPipeline. "
            "In other cases, using .load() is a better option.",
        )
        if self.source_type != "file":
            raise ValueError("Build only supports file sources for now.")
        file_ext = Path(self.source_data).suffix.lower()
        if file_ext not in FILE_LOADER_FACTORY:
            raise ValueError(f"Unsupported file format: {file_ext}")
        loader_info = FILE_LOADER_FACTORY[file_ext]
        loader_class = loader_info["class"]
        requires_columns = loader_info["requires_columns"]
        if requires_columns and (
            self.latitude_column is None or self.longitude_column is None
        ):
            raise ValueError(
                f"Loader for {file_ext} requires latitude and longitude columns. Call with_columns() first."
            )
        self._instance = loader_class(
            file_path=self.source_data,
            latitude_column=self.latitude_column,
            longitude_column=self.longitude_column,
            coordinate_reference_system=self.crs,
            map_columns=self.map_columns,
        )
        if self._preview is not None:
            self.preview(format=self._preview["format"])
        return self._instance

    def preview(self, format="ascii") -> None:
        """Display a preview of the `loader` configuration and settings.

        This method generates and displays a preview of the `loader`, showing its
        `configuration`, `settings`, and `other metadata`. The preview can be displayed
        in different formats.

        Args:
            format: The format to display the preview in (default: "ascii").

                - [x] "ascii": Text-based format for terminal display
                - [x] "json": JSON-formatted data for programmatic use

        Raises:
            ValueError: If an unsupported format is specified.

        Note:
            This method requires a loader instance to be available. Call load()
            or build() first to create an instance.

        Examples:
            >>> loader = mapper.loader.from_file("data/points.csv")\
            ...     .with_columns(longitude_column="lon", latitude_column="lat")
            >>> # Preview after loading data
            >>> loader.load()
            >>> loader.preview()
            >>> # Or JSON format
            >>> loader.preview(format="json")
        """
        if self._instance is None:
            logger.log(
                "DEBUG_LOW",
                "No loader instance available to preview. Call load() first.",
            )
            return

        if hasattr(self._instance, "preview"):
            preview_data = self._instance.preview(format=format)
            if format == "ascii":
                print(preview_data)
            elif format == "json":
                print(json.dumps(preview_data, indent=2))
            else:
                raise ValueError(f"Unsupported format '{format}'.")
        else:
            logger.log("DEBUG_LOW", "Preview not supported for this loader's instance.")

    def with_preview(self, format="ascii") -> "LoaderFactory":
        """Configure the factory to display a preview after loading or building.

        This method configures the factory to automatically display a preview after
        loading data with `load()` or building a loader with `build()`. It's a convenient
        way to inspect the loader configuration and the loaded data.

        Args:
            format: The format to display the preview in (default: "ascii").

                - [x] "ascii": Text-based format for terminal display
                - [x] "json": JSON-formatted data for programmatic use

        Returns:
            The LoaderFactory instance for method chaining.

        Examples:
            >>> # Auto-preview after loading
            >>> gdf = mapper.loader.from_file("data/points.csv")\
            ...     .with_columns(longitude_column="lon", latitude_column="lat")\
            ...     .with_preview(format="json")\
            ...     .load()
        """
        self._preview = {
            "format": format,
        }
        return self

from_file(file_path)

Configure the factory to load data from a file.

This method sets up the factory to load data from a file path. The file format is determined by the file extension. Supported formats include CSV, shapefile, and Parquet.

Parameters:

Name Type Description Default
file_path str

Path to the data file to load.

required

Returns:

Type Description
LoaderFactory

The LoaderFactory instance for method chaining.

Examples:

>>> loader = mapper.loader.from_file("data/points.csv")
>>> # Next steps would typically be to call with_columns() and load()
Source code in src/urban_mapper/modules/loader/loader_factory.py
@reset_attributes_before(
    ["source_type", "source_data", "latitude_column", "longitude_column"]
)
def from_file(self, file_path: str) -> "LoaderFactory":
    """Configure the factory to load data from a file.

    This method sets up the factory to load data from a file path. The file format
    is determined by the file extension. Supported formats include `CSV`, `shapefile`,
    and `Parquet`. 

    Args:
        file_path: Path to the data file to load.

    Returns:
        The LoaderFactory instance for method chaining.

    Examples:
        >>> loader = mapper.loader.from_file("data/points.csv")
        >>> # Next steps would typically be to call with_columns() and load()
    """
    self.source_type = "file"
    self.latitude_column = None
    self.longitude_column = None
    self.map_columns = None
    self.source_data = file_path
    logger.log(
        "DEBUG_LOW",
        f"FROM_FILE: Initialised LoaderFactory with file_path={file_path}",
    )
    return self

from_dataframe(dataframe)

Configure the factory to load data from an existing dataframe.

This method sets up the factory to load data from a pandas DataFrame or geopandas GeoDataFrame. For DataFrames without geometry, you will need to call with_columns() to specify the latitude and longitude columns.

Parameters:

Name Type Description Default
dataframe Union[DataFrame, GeoDataFrame]

The pandas DataFrame or geopandas GeoDataFrame to load.

required

Returns:

Type Description
LoaderFactory

The LoaderFactory instance for method chaining.

Examples:

>>> import pandas as pd
>>> df = pd.read_csv("data/points.csv")
>>> loader = mapper.loader.from_dataframe(df)
>>> # For regular DataFrames, you must specify coordinate columns:
>>> loader.with_columns(longitude_column="lon", latitude_column="lat")
Source code in src/urban_mapper/modules/loader/loader_factory.py
def from_dataframe(
    self, dataframe: Union[pd.DataFrame, gpd.GeoDataFrame]
) -> "LoaderFactory":
    """Configure the factory to load data from an existing dataframe.

    This method sets up the factory to load data from a pandas `DataFrame` or
    geopandas `GeoDataFrame`. For `DataFrames` without geometry, you will need
    to call `with_columns()` to specify the latitude and longitude columns.

    Args:
        dataframe: The pandas DataFrame or geopandas GeoDataFrame to load.

    Returns:
        The LoaderFactory instance for method chaining.

    Examples:
        >>> import pandas as pd
        >>> df = pd.read_csv("data/points.csv")
        >>> loader = mapper.loader.from_dataframe(df)
        >>> # For regular DataFrames, you must specify coordinate columns:
        >>> loader.with_columns(longitude_column="lon", latitude_column="lat")
    """
    self.source_type = "dataframe"
    self.source_data = dataframe
    self.latitude_column = "None"
    self.longitude_column = "None"
    self.map_columns = "None"
    logger.log(
        "DEBUG_LOW",
        f"FROM_DATAFRAME: Initialised LoaderFactory with dataframe={dataframe}",
    )
    return self
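
The sketch below ties the pieces together end to end: it builds a small pandas DataFrame in memory, hands it to the factory, declares the coordinate columns, and loads it as a GeoDataFrame. It assumes mapper is an UrbanMapper instance exposing mapper.loader, as in the examples above; the data and column names are placeholders.

import pandas as pd

# Hypothetical in-memory data with plain longitude/latitude columns.
df = pd.DataFrame(
    {
        "lon": [-73.9857, -73.9680],
        "lat": [40.7484, 40.7851],
        "name": ["Empire State Building", "Central Park"],
    }
)

# from_dataframe() -> with_columns() -> load() turns the DataFrame into a
# GeoDataFrame with one point geometry per row.
gdf = (
    mapper.loader.from_dataframe(df)
    .with_columns(longitude_column="lon", latitude_column="lat")
    .load()
)
print(gdf.geometry.head())  # POINT geometries built from lon/lat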

from_huggingface(repo_id, number_of_rows=None, streaming=False, debug_limit_list_datasets=None)

Load a dataset from Hugging Face's Hub using the datasets library.

What Are Hugging Face Datasets?

πŸ€— Hugging Face Datasets is your gateway to a vast collection of datasets tailored for various application domains such as urban computing. In a nutshell, this library simplifies data access, letting you load datasets with a single line of code.

How to Find and Use Datasets: Head to the Hugging Face Datasets Hub, where you can search anything you like (e.g., "PLUTO" for NYC buildings information).

For from_huggingface, you need the repo_id of the dataset you want to load. A repo_id follows the <namespace>/<dataset_name> format and appears both on the dataset card and in the dataset's URL. For example, the dataset at https://huggingface.co/datasets/oscur/pluto has the repo_id oscur/pluto. The namespace is the organisation or user who published the dataset, and the dataset_name is the specific dataset name. In this case, oscur is the namespace and pluto is the dataset name.

OSCUR: Pioneering Urban Science

🌍 OSCUR (Open-Source Cyberinfrastructure for Urban Computing) integrates tools for data exploration, analytics, and machine learning, all while fostering a collaborative community to advance urban science.

All datasets used by any of the initiatives under OSCUR are open-source and available on Hugging Face Datasets Hub. As UrbanMapper is one of the initiatives under OSCUR, all datasets throughout our examples and case studies are available under the oscur namespace.

Feel free to explore our datasets, at https://huggingface.co/oscur.

Load them easily:

loader = mapper.loader.from_huggingface("oscur/taxisvis1M")

Dive deeper at oscur.org for other open-source initiatives and tools.

Potential Errors Explained

Mistakes happenβ€”here’s what might go wrong and how we help:

If repo_id is invalid, a ValueError pops up with smart suggestions powered by TheFuzz, a fuzzy matching library. We compare your input to existing datasets and offer the closest matches:

  • No Slash (e.g., plutoo): Assumes it is a dataset name and suggests the closest matching full repo_ids (e.g., oscur/pluto).
  • Bad Namespace (e.g., oscurq/pluto): If the namespace doesn’t exist, we suggest similar ones (e.g., oscur).
  • Bad Dataset Name (e.g., oscur/plutoo): If the namespace is valid but the dataset isn’t, we suggest close matches.

Errors come with contextβ€”like available datasets in a namespaceβ€”so you can fix it fast.

Parameters:

Name Type Description Default
repo_id str

The dataset repository ID on Hugging Face.

required
number_of_rows Optional[int]

Number of rows to load. Defaults to None.

None
streaming Optional[bool]

Whether to use streaming mode. Defaults to False.

False
debug_limit_list_datasets Optional[int]

Limit on datasets fetched for error handling. Defaults to None.

None

Returns:

Name Type Description
LoaderFactory LoaderFactory

The updated LoaderFactory instance for method chaining.

Raises:

Type Description
ValueError

If the dataset cannot be loaded due to an invalid repo_id or other issues.

Examples:

>>> # Load a full dataset
>>> loader = mapper.loader.from_huggingface("oscur/pluto")
>>> gdf = loader.load()
>>> print(gdf.head())  # Next steps: analyze or visualize the data
>>> # Load 500 rows with streaming (i.e. without loading the entire dataset)
>>> loader = mapper.loader.from_huggingface("oscur/NYC_311", number_of_rows=500, streaming=True)
>>> gdf = loader.load()
>>> print(gdf.head())  # Next steps: process the loaded subset
>>> # Load 1000 rows without streaming
>>> loader = mapper.loader.from_huggingface("oscur/taxisvis1M", number_of_rows=1000)
>>> gdf = loader.load()
>>> print(gdf.head())  # Next steps: explore the sliced data
>>> # Handle typo in namespace
>>> try:
...     loader = mapper.loader.from_huggingface("oscurq/pluto")
... except ValueError as e:
...     print(e)
ValueError: The repository 'oscurq' does not exist on Hugging Face. Maybe you meant one of these:
- oscur (similarity: 90%)
- XXX (similarity: 85%)
>>> # Handle typo in dataset name
>>> try:
...     loader = mapper.loader.from_huggingface("oscur/plutoo")
... except ValueError as e:
...     print(e)
ValueError: The dataset 'plutoo' does not exist in repository 'oscur'. Maybe you meant one of these:
- oscur/pluto (similarity: 90%)
- XXX (similarity: 80%)
>>> # Handle input without namespace
>>> try:
...     loader = mapper.loader.from_huggingface("plutoo")
... except ValueError as e:
...     print(e)
ValueError: The dataset 'plutoo' does not exist on Hugging Face. Maybe you meant one of these:
- oscur/pluto (similarity: 90%)
- XXX (similarity: 85%)
Source code in src/urban_mapper/modules/loader/loader_factory.py
def from_huggingface(
    self,
    repo_id: str,
    number_of_rows: Optional[int] = None,
    streaming: Optional[bool] = False,
    debug_limit_list_datasets: Optional[int] = None,
) -> "LoaderFactory":
    """
    Load a dataset from `Hugging Face's Hub` using the `datasets` library.

    !!! info "What Are Hugging Face Datasets?"
        πŸ€— **Hugging Face Datasets** is your gateway to a vast collection of datasets tailored for various application domains
        such as urban computing. In a nutshell, this library simplifies data access, letting you load datasets
        with a single line of code.

        **How to Find and Use Datasets**: Head to the [Hugging Face Datasets Hub](https://huggingface.co/datasets),
        where you can search anything you like (e.g., "PLUTO" for NYC buildings information).

        For `from_huggingface`, you need the `repo_id` of the dataset you want to load. A `repo_id` follows the
        `<namespace>/<dataset_name>` format and appears both on the dataset card and in the dataset's URL.
        For example, the dataset at `https://huggingface.co/datasets/oscur/pluto` has the `repo_id` `oscur/pluto`.
        The `namespace` is the organisation or user who published the dataset, and the `dataset_name` is the
        specific dataset name. In this case, `oscur` is the namespace and `pluto` is the dataset name.

    !!! success "OSCUR: Pioneering Urban Science"
        🌍 **OSCUR** (Open-Source Cyberinfrastructure for Urban Computing) integrates tools for data exploration,
        analytics, and machine learning, all while fostering a collaborative community to advance urban science.

        All datasets used by any of the initiatives under OSCUR are open-source and available on Hugging Face
        Datasets Hub. As `UrbanMapper` is one of the initiatives under OSCUR, all datasets throughout our examples
        and case studies are available under the `oscur` namespace.

        Feel free to explore our datasets, at [https://huggingface.co/oscur](https://huggingface.co/oscur).

        Load them easily:
        ```python
        loader = mapper.loader.from_huggingface("oscur/taxisvis1M")
        ```

        Dive deeper at [oscur.org](https://oscur.org/) for other open-source initiatives and tools.

    !!! warning "Potential Errors Explained"
        Mistakes happenβ€”here’s what might go wrong and how we help:

        If `repo_id` is invalid, a `ValueError` pops up with smart suggestions powered by
        [TheFuzz](https://github.com/seatgeek/thefuzz), a fuzzy matching library. We compare your input to
        existing datasets and offer the closest matches:

        - **No Slash (e.g., `plutoo`)**: Assumes it is a dataset name and suggests the closest matching full `repo_id`s (e.g., `oscur/pluto`).
        - **Bad Namespace (e.g., `oscurq/pluto`)**: If the namespace doesn’t exist, we suggest similar ones (e.g., `oscur`).
        - **Bad Dataset Name (e.g., `oscur/plutoo`)**: If the namespace is valid but the dataset isn’t, we suggest close matches.

        Errors come with contextβ€”like available datasets in a namespaceβ€”so you can fix it fast.

    Args:
        repo_id (str): The dataset repository ID on Hugging Face.
        number_of_rows (Optional[int]): Number of rows to load. Defaults to None.
        streaming (Optional[bool]): Whether to use streaming mode. Defaults to False.
        debug_limit_list_datasets (Optional[int]): Limit on datasets fetched for error handling. Defaults to None.

    Returns:
        LoaderFactory: The updated LoaderFactory instance for method chaining.

    Raises:
        ValueError: If the dataset cannot be loaded due to an invalid `repo_id` or other issues.

    Examples:
        >>> # Load a full dataset
        >>> loader = mapper.loader.from_huggingface("oscur/pluto")
        >>> gdf = loader.load()
        >>> print(gdf.head())  # Next steps: analyze or visualize the data

        >>> # Load 500 rows with streaming (i.e. without loading the entire dataset)
        >>> loader = mapper.loader.from_huggingface("oscur/NYC_311", number_of_rows=500, streaming=True)
        >>> gdf = loader.load()
        >>> print(gdf.head())  # Next steps: process the loaded subset

        >>> # Load 1000 rows without streaming
        >>> loader = mapper.loader.from_huggingface("oscur/taxisvis1M", number_of_rows=1000)
        >>> gdf = loader.load()
        >>> print(gdf.head())  # Next steps: explore the sliced data

        >>> # Handle typo in namespace
        >>> try:
        ...     loader = mapper.loader.from_huggingface("oscurq/pluto")
        ... except ValueError as e:
        ...     print(e)
        ValueError: The repository 'oscurq' does not exist on Hugging Face. Maybe you meant one of these:
        - oscur (similarity: 90%)
        - XXX (similarity: 85%)

        >>> # Handle typo in dataset name
        >>> try:
        ...     loader = mapper.loader.from_huggingface("oscur/plutoo")
        ... except ValueError as e:
        ...     print(e)
        ValueError: The dataset 'plutoo' does not exist in repository 'oscur'. Maybe you meant one of these:
        - oscur/pluto (similarity: 90%)
        - XXX (similarity: 80%)

        >>> # Handle input without namespace
        >>> try:
        ...     loader = mapper.loader.from_huggingface("plutoo")
        ... except ValueError as e:
        ...     print(e)
        ValueError: The dataset 'plutoo' does not exist on Hugging Face. Maybe you meant one of these:
        - oscur/pluto (similarity: 90%)
        - XXX (similarity: 85%)

    """
    self.source_type = "huggingface"
    try:
        if number_of_rows:
            if streaming:
                # Use streaming mode to fetch only the required rows
                dataset = datasets.load_dataset(
                    repo_id, split="train", streaming=True
                )
                limited_rows = list(islice(dataset, number_of_rows))
                self.source_data = pd.DataFrame(limited_rows)
                logger.log(
                    "DEBUG_LOW",
                    f"Loaded {number_of_rows} rows in streaming mode from {repo_id}.",
                )
            else:
                # Use slicing with split for non-streaming mode
                dataset = datasets.load_dataset(
                    repo_id, split=f"train[:{number_of_rows}]"
                )
                self.source_data = pd.DataFrame(dataset)
                logger.log(
                    "DEBUG_LOW", f"Loaded {number_of_rows} rows from {repo_id}."
                )
        else:
            dataset = datasets.load_dataset(repo_id, split="train")
            self.source_data = pd.DataFrame(dataset)
            logger.log("DEBUG_LOW", f"Loaded dataset {repo_id}.")

    except datasets.exceptions.DatasetNotFoundError as e:
        dataset_dict = self._build_dataset_dict(limit=debug_limit_list_datasets)
        if "/" not in repo_id:
            all_datasets = [
                f"{repo}/{ds}"
                for repo, ds_list in dataset_dict.items()
                for ds in ds_list
            ]
            matches = process.extract(
                repo_id,
                all_datasets,
                processor=lambda x: x.split("/")[-1] if "/" in x else x,
            )
            filtered_matches = [
                (match, score) for match, score in matches if score > 80
            ]
            top_matches = filtered_matches[:10]
            suggestions = [
                f"{match} (similarity: {score}%)" for match, score in top_matches
            ]
            suggestion_text = (
                " Maybe you meant one of these:\n" + "\n".join(suggestions)
                if suggestions
                else ""
            )
            raise ValueError(
                f"The dataset '{repo_id}' does not exist on Hugging Face. "
                f"Please verify the dataset ID.{suggestion_text}"
            ) from e
        else:
            repo_name, dataset_name = repo_id.split("/", 1)
            if repo_name not in dataset_dict:
                all_repos = list(dataset_dict.keys())
                matches = process.extract(repo_name, all_repos, limit=1000)
                filtered_matches = [
                    (match, score) for match, score in matches if score > 80
                ]
                top_matches = filtered_matches[:10]
                suggestions = [
                    f"{match} (similarity: {score}%)"
                    for match, score in top_matches
                ]
                suggestion_text = (
                    " Maybe you meant one of these:\n" + "\n".join(suggestions)
                    if suggestions
                    else ""
                )
                raise ValueError(
                    f"The repository '{repo_name}' does not exist on Hugging Face. "
                    f"Please verify the repository name.{suggestion_text}"
                ) from e
            else:
                available_datasets = dataset_dict[repo_name]
                matches = process.extract(
                    dataset_name, available_datasets, limit=None
                )
                filtered_matches = [
                    (match, score) for match, score in matches if score > 80
                ]
                top_matches = filtered_matches[:10]
                suggestions = [
                    f"{repo_name}/{match} (similarity: {score}%)"
                    for match, score in top_matches
                ]
                suggestion_text = (
                    " Maybe you meant one of these:\n" + "\n".join(suggestions)
                    if suggestions
                    else ""
                )
                raise ValueError(
                    f"The dataset '{dataset_name}' does not exist in repository '{repo_name}'. "
                    f"Available datasets: {', '.join(available_datasets)}.{suggestion_text}"
                ) from e

    except Exception as e:
        raise ValueError(f"Error loading dataset '{repo_id}': {str(e)}") from e

    self.latitude_column = "None"
    self.longitude_column = "None"
    self.map_columns = "None"
    logger.log(
        "DEBUG_LOW",
        f"FROM_HUGGINGFACE: Loaded dataset {repo_id} with "
        f"{'all rows' if number_of_rows is None else number_of_rows} rows "
        f"{'(streaming mode)' if streaming else '(non-streaming mode)'}.",
    )
    return self
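
Because load() requires latitude and longitude columns for Hugging Face sources, a typical flow declares them with with_columns() before loading. A minimal sketch, assuming the dataset exposes pickup_longitude and pickup_latitude columns (hypothetical names used for illustration only):

# Stream a small slice of a Hub dataset, declare its coordinate columns,
# then load it as a GeoDataFrame. The column names are assumptions.
gdf = (
    mapper.loader.from_huggingface("oscur/taxisvis1M", number_of_rows=500, streaming=True)
    .with_columns(longitude_column="pickup_longitude", latitude_column="pickup_latitude")
    .load()
)
print(gdf.shape)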

with_columns(longitude_column, latitude_column)

Specify the latitude and longitude columns in the data source.

This method configures which columns in the data source contain the latitude and longitude coordinates. This is required for CSV and Parquet files, as well as for pandas DataFrames without geometry.

Parameters:

Name Type Description Default
longitude_column str

Name of the column containing longitude values.

required
latitude_column str

Name of the column containing latitude values.

required

Returns:

Type Description
LoaderFactory

The LoaderFactory instance for method chaining.

Examples:

>>> loader = mapper.loader.from_file("data/points.csv")\
...     .with_columns(longitude_column="lon", latitude_column="lat")
Source code in src/urban_mapper/modules/loader/loader_factory.py
def with_columns(
    self,
    longitude_column: str,
    latitude_column: str,
) -> "LoaderFactory":
    """Specify the latitude and longitude columns in the data source.

    This method configures which columns in the data source contain the latitude
    and longitude coordinates. This is required for `CSV` and `Parquet` files, as well
    as for `pandas DataFrames` without geometry.

    Args:
        longitude_column: Name of the column containing longitude values.
        latitude_column: Name of the column containing latitude values.

    Returns:
        The LoaderFactory instance for method chaining.

    Examples:
        >>> loader = mapper.loader.from_file("data/points.csv")\
        ...     .with_columns(longitude_column="lon", latitude_column="lat")
    """
    self.latitude_column = latitude_column
    self.longitude_column = longitude_column
    logger.log(
        "DEBUG_LOW",
        f"WITH_COLUMNS: Initialised LoaderFactory "
        f"with latitude_column={latitude_column} and longitude_column={longitude_column}",
    )
    return self

with_crs(crs=DEFAULT_CRS)

Specify the coordinate reference system for the loaded data.

This method configures the coordinate reference system (CRS) to use for the loaded data. If the source data already has a CRS, it will be converted to the specified CRS.

Parameters:

Name Type Description Default
crs str

The coordinate reference system to use, in any format accepted by geopandas (default: EPSG:4326, which is standard WGS84 coordinates).

DEFAULT_CRS

Returns:

Type Description
LoaderFactory

The LoaderFactory instance for method chaining.

Examples:

>>> loader = mapper.loader.from_file("data/points.csv")\
...     .with_columns(longitude_column="lon", latitude_column="lat")\
...     .with_crs("EPSG:3857")  # Use Web Mercator projection
Source code in src/urban_mapper/modules/loader/loader_factory.py
def with_crs(self, crs: str = DEFAULT_CRS) -> "LoaderFactory":
    """Specify the coordinate reference system for the loaded data.

    This method configures the `coordinate reference system (CRS)` to use for the loaded
    data. If the source data already has a `CRS`, it will be converted to the specified `CRS`.

    Args:
        crs: The coordinate reference system to use, in any format accepted by geopandas
            (default: `EPSG:4326`, which is standard `WGS84` coordinates).

    Returns:
        The LoaderFactory instance for method chaining.

    Examples:
        >>> loader = mapper.loader.from_file("data/points.csv")\
        ...     .with_columns(longitude_column="lon", latitude_column="lat")\
        ...     .with_crs("EPSG:3857")  # Use Web Mercator projection
    """
    self.crs = crs
    logger.log(
        "DEBUG_LOW",
        f"WITH_CRS: Initialised LoaderFactory with crs={crs}",
    )
    return self
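
Note that in the source shown here, the CRS configured with with_crs() is consumed when a loader is built via build(), while load() takes the target CRS as its own argument. A minimal sketch of both paths; the file path and column names are placeholders.

# Path 1: configure the CRS on the factory and build a loader (e.g. for a pipeline).
loader = (
    mapper.loader.from_file("data/points.csv")
    .with_columns(longitude_column="lon", latitude_column="lat")
    .with_crs("EPSG:3857")
    .build()
)

# Path 2: pass the target CRS directly when loading eagerly.
gdf = (
    mapper.loader.from_file("data/points.csv")
    .with_columns(longitude_column="lon", latitude_column="lat")
    .load("EPSG:3857")
)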

with_preview(format='ascii')

Configure the factory to display a preview after loading or building.

This method configures the factory to automatically display a preview after loading data with load() or building a loader with build(). It's a convenient way to inspect the loader configuration and the loaded data.

Parameters:

Name Type Description Default
format

The format to display the preview in (default: "ascii").

  • "ascii": Text-based format for terminal display
  • "json": JSON-formatted data for programmatic use
'ascii'

Returns:

Type Description
LoaderFactory

The LoaderFactory instance for method chaining.

Examples:

>>> # Auto-preview after loading
>>> gdf = mapper.loader.from_file("data/points.csv")\
...     .with_columns(longitude_column="lon", latitude_column="lat")\
...     .with_preview(format="json")\
...     .load()
Source code in src/urban_mapper/modules/loader/loader_factory.py
def with_preview(self, format="ascii") -> "LoaderFactory":
    """Configure the factory to display a preview after loading or building.

    This method configures the factory to automatically display a preview after
    loading data with `load()` or building a loader with `build()`. It's a convenient
    way to inspect the loader configuration and the loaded data.

    Args:
        format: The format to display the preview in (default: "ascii").

            - [x] "ascii": Text-based format for terminal display
            - [x] "json": JSON-formatted data for programmatic use

    Returns:
        The LoaderFactory instance for method chaining.

    Examples:
        >>> # Auto-preview after loading
        >>> gdf = mapper.loader.from_file("data/points.csv")\
        ...     .with_columns(longitude_column="lon", latitude_column="lat")\
        ...     .with_preview(format="json")\
        ...     .load()
    """
    self._preview = {
        "format": format,
    }
    return self

load(coordinate_reference_system=DEFAULT_CRS)

Load the data and return it as a GeoDataFrame or raster object.

This method loads the data from the configured source and returns it as a geopandas GeoDataFrame. It handles the details of loading from different source types and formats.

Parameters:

Name Type Description Default
coordinate_reference_system str

The coordinate reference system to use for the loaded data (default: "EPSG:4326", which is standard WGS84 coordinates).

DEFAULT_CRS

Returns:

Type Description

A GeoDataFrame containing the loaded data.

Raises:

Type Description
ValueError

If the source type is invalid, the file format is unsupported, or required parameters (like latitude/longitude columns) are missing.

Examples:

>>> # Load CSV data
>>> gdf = mapper.loader.from_file("data/points.csv")\
...     .with_columns(longitude_column="lon", latitude_column="lat")\
...     .load()
>>> 
>>> # Load shapefile data
>>> gdf = mapper.loader.from_file("data/boundaries.shp").load()
Source code in src/urban_mapper/modules/loader/loader_factory.py
@require_attributes(["source_type", "source_data"])
def load(self, coordinate_reference_system: str = DEFAULT_CRS):
    """Load the data and return it as a `GeoDataFrame` or raster object.

    This method loads the data from the configured source and returns it as a
    geopandas `GeoDataFrame`. It handles the details of loading from different
    source types and formats.

    Args:
        coordinate_reference_system: The coordinate reference system to use for the
            loaded data (default: "EPSG:4326", which is standard WGS84 coordinates).

    Returns:
        A GeoDataFrame containing the loaded data.

    Raises:
        ValueError: If the source type is invalid, the file format is unsupported,
            or required parameters (like latitude/longitude columns) are missing.

    Examples:
        >>> # Load CSV data
        >>> gdf = mapper.loader.from_file("data/points.csv")\
        ...     .with_columns(longitude_column="lon", latitude_column="lat")\
        ...     .load()
        >>> 
        >>> # Load shapefile data
        >>> gdf = mapper.loader.from_file("data/boundaries.shp").load()
    """
    if self.source_type == "file":
        file_ext = Path(self.source_data).suffix.lower()
        if file_ext not in FILE_LOADER_FACTORY:
            raise ValueError(f"Unsupported file format: {file_ext}")
        loader_info = FILE_LOADER_FACTORY[file_ext]
        if loader_info["requires_columns"] and (
            self.latitude_column is None or self.longitude_column is None
        ):
            raise ValueError(
                f"Loader for {file_ext} requires latitude and longitude columns. Call with_columns() first."
            )
        loaded_data = self._load_from_file(coordinate_reference_system)
        if self._preview is not None:
            self.preview(format=self._preview["format"])
        return loaded_data 
    elif self.source_type == "dataframe":
        if self.latitude_column == "None" or self.longitude_column == "None":
            raise ValueError(
                "DataFrame loading requires latitude and longitude columns. Call with_columns() with valid column names."
            )
        loaded_data = self._load_from_dataframe(coordinate_reference_system)
        if self._preview is not None:
            logger.log(
                "DEBUG_LOW",
                "Note: Preview is not supported for DataFrame sources.",
            )
        return loaded_data
    elif self.source_type == "huggingface":
        if self.latitude_column == "None" or self.longitude_column == "None":
            raise ValueError(
                "Hugging Face dataset loading requires latitude and longitude columns. "
                "Call with_columns() with valid column names."
            )
        loaded_data = self._load_from_dataframe(coordinate_reference_system)
        if self._preview is not None:
            logger.log(
                "DEBUG_LOW",
                "Note: Preview is not supported for DataFrame sources.",
            )
        return loaded_data
    else:
        raise ValueError("Invalid source type.")
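
A short sketch of the guard rails described above: a coordinate-based format such as CSV raises a ValueError when the coordinate columns were never declared, while a shapefile, which carries its own geometry, loads directly. Paths are placeholders.

# CSV needs explicit coordinate columns; skipping with_columns() raises.
try:
    mapper.loader.from_file("data/points.csv").load()
except ValueError as err:
    print(err)  # "... requires latitude and longitude columns. Call with_columns() first."

# Shapefiles carry their own geometry, so no with_columns() call is needed.
boundaries = mapper.loader.from_file("data/boundaries.shp").load()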

build()

Build and return a loader instance without loading the data.

This method creates and returns a loader instance without immediately loading the data. It is primarily intended for use in the UrbanPipeline, where the actual loading is deferred until pipeline execution.

Returns:

Type Description
LoaderBase

A LoaderBase instance configured to load the data when needed.

Raises:

Type Description
ValueError

If the source type is not supported, the file format is unsupported, or required parameters (like latitude/longitude columns) are missing.

Note

For most use cases outside of pipelines, using load() is preferred as it directly returns the loaded data.

Examples:

>>> # Creating a pipeline component
>>> loader = mapper.loader.from_file("data/points.csv")\
...     .with_columns(longitude_column="lon", latitude_column="lat")\
...     .build()
>>> step_loader_for_pipeline = ("My Loader", loader) # Add this in the list of steps in the `UrbanPipeline`.
Source code in src/urban_mapper/modules/loader/loader_factory.py
def build(self) -> LoaderBase:
    """Build and return a `loader` instance without loading the data.

    This method creates and returns a loader instance without immediately loading
    the data. It is primarily intended for use in the `UrbanPipeline`, where the
    actual loading is deferred until pipeline execution.

    Returns:
        A LoaderBase instance configured to load the data when needed.

    Raises:
        ValueError: If the source type is not supported, the file format is unsupported,
            or required parameters (like latitude/longitude columns) are missing.

    Note:
        For most use cases outside of pipelines, using load() is preferred as it
        directly returns the loaded data.

    Examples:
        >>> # Creating a pipeline component
        >>> loader = mapper.loader.from_file("data/points.csv")\
        ...     .with_columns(longitude_column="lon", latitude_column="lat")\
        ...     .build()
        >>> step_loader_for_pipeline = ("My Loader", loader) # Add this in the list of steps in the `UrbanPipeline`.
    """
    logger.log(
        "DEBUG_MID",
        "WARNING: build() should only be used in UrbanPipeline. "
        "In other cases, using .load() is a better option.",
    )
    if self.source_type != "file":
        raise ValueError("Build only supports file sources for now.")
    file_ext = Path(self.source_data).suffix.lower()
    if file_ext not in FILE_LOADER_FACTORY:
        raise ValueError(f"Unsupported file format: {file_ext}")
    loader_info = FILE_LOADER_FACTORY[file_ext]
    loader_class = loader_info["class"]
    requires_columns = loader_info["requires_columns"]
    if requires_columns and (
        self.latitude_column is None or self.longitude_column is None
    ):
        raise ValueError(
            f"Loader for {file_ext} requires latitude and longitude columns. Call with_columns() first."
        )
    self._instance = loader_class(
        file_path=self.source_data,
        latitude_column=self.latitude_column,
        longitude_column=self.longitude_column,
        coordinate_reference_system=self.crs,
        map_columns=self.map_columns,
    )
    if self._preview is not None:
        self.preview(format=self._preview["format"])
    return self._instance
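
Note that the implementation above rejects anything other than file sources. A small sketch of that failure mode, again assuming a from_dataframe() entry point (the method name is an assumption):

>>> try:
...     mapper.loader.from_dataframe(df)\
...         .with_columns(longitude_column="lon", latitude_column="lat")\
...         .build()
... except ValueError as err:
...     print(err)
Build only supports file sources for now.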

preview(format='ascii')

Display a preview of the loader configuration and settings.

This method generates and displays a preview of the loader, showing its configuration, settings, and other metadata. The preview can be displayed in different formats.

Parameters:

Name Type Description Default
format

The format to display the preview in (default: "ascii").

  • "ascii": Text-based format for terminal display
  • "json": JSON-formatted data for programmatic use
'ascii'

Raises:

Type Description
ValueError

If an unsupported format is specified.

Note

This method requires a loader instance to be available. Call load() or build() first to create an instance.

Examples:

>>> loader = mapper.loader.from_file("data/points.csv")\
...     .with_columns(longitude_column="lon", latitude_column="lat")
>>> # Preview after loading data
>>> loader.load()
>>> loader.preview()
>>> # Or JSON format
>>> loader.preview(format="json")
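
Based on the implementation below, calling preview() before an instance exists only emits a debug log entry; after build() or load(), it prints the instance's metadata. A short sketch (the file path is illustrative):

>>> loader = mapper.loader.from_file("data/points.csv")\
...     .with_columns(longitude_column="lon", latitude_column="lat")
>>> loader.preview()                 # no instance yet: only a debug log entry, nothing printed
>>> loader.build()                   # creates the underlying loader instance
>>> loader.preview(format="json")    # prints the loader's metadata as indented JSON
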
Source code in src/urban_mapper/modules/loader/loader_factory.py
def preview(self, format="ascii") -> None:
    """Display a preview of the `loader` configuration and settings.

    This method generates and displays a preview of the `loader`, showing its
    `configuration`, `settings`, and `other metadata`. The preview can be displayed
    in different formats.

    Args:
        format: The format to display the preview in (default: "ascii").

            - [x] "ascii": Text-based format for terminal display
            - [x] "json": JSON-formatted data for programmatic use

    Raises:
        ValueError: If an unsupported format is specified.

    Note:
        This method requires a loader instance to be available. Call load()
        or build() first to create an instance.

    Examples:
        >>> loader = mapper.loader.from_file("data/points.csv")\
        ...     .with_columns(longitude_column="lon", latitude_column="lat")
        >>> # Preview after loading data
        >>> loader.load()
        >>> loader.preview()
        >>> # Or JSON format
        >>> loader.preview(format="json")
    """
    if self._instance is None:
        logger.log(
            "DEBUG_LOW",
            "No loader instance available to preview. Call load() first.",
        )
        return

    if hasattr(self._instance, "preview"):
        preview_data = self._instance.preview(format=format)
        if format == "ascii":
            print(preview_data)
        elif format == "json":
            print(json.dumps(preview_data, indent=2))
        else:
            raise ValueError(f"Unsupported format '{format}'.")
    else:
        logger.log("DEBUG_LOW", "Preview not supported for this loader's instance.")