Digital Landscape Traces: Transformation to Parquet Format

Alexander Dunkel, Leibniz Institute of Ecological Urban and Regional Development,
Transformative Capacities & Research Data Centre (IÖR-FDZ)

Last updated: Nov-04-2025, Carto-Lab Docker Version 1.1.0

This notebook performs a critical data transformation step in our analysis pipeline. Its sole purpose is to prepare the raw, full-resolution social media data for high-performance visualization in the next stage. Visualizing tens of millions of points directly from a CSV file is extremely slow. To overcome this, we will convert the data into the columnar Apache Parquet format, which is highly optimized for fast, parallelized read operations with libraries like Dask and Datashader. I have written about this pipeline before here.

The workflow involves several key stages:

  1. Load Input Data: We will load two datasets created in the previous notebooks: the large CSV file containing 66+ million full-resolution coordinates (latitude, longitude) with their associated user_guid, and the smaller CSV file containing the final user classification (Local, Tourist) for the ~600,000 most active users.
  2. Merge and Classify: The script will process the large coordinate file in memory-efficient chunks. For each point, it will merge the corresponding user classification. Points from users who were filtered out in the home-location inference step (i.e., those not in the classification file) will be explicitly categorized as "Unclassified".
  3. Coordinate Projection: The latitude and longitude coordinates (in WGS84, EPSG:4326) will be re-projected into the Web Mercator projection (EPSG:3857). This is the standard coordinate system used by datashader and web mapping services, and performing this transformation now saves significant computation time during the visualization step (a short sketch of the transform follows this list).
  4. Export to Parquet: The final, prepared data—containing the projected x and y coordinates and the classification for every point—is written to a single, consolidated Parquet file.
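
For illustration, the Web Mercator conversion in step 3 (performed later with datashader's lnglat_to_meters) boils down to the standard spherical Mercator formulas. The snippet below is a minimal sketch with a hypothetical sample coordinate, not code executed in this notebook:

import numpy as np

def lnglat_to_webmercator(longitude, latitude):
    """Convert WGS84 (EPSG:4326) degrees to Web Mercator (EPSG:3857) metres."""
    origin_shift = np.pi * 6378137  # half the Earth's circumference at the equator
    x = longitude * origin_shift / 180.0
    y = np.log(np.tan((90 + latitude) * np.pi / 360.0)) * origin_shift / 180.0
    return x, y

# Hypothetical sample point (roughly central Berlin)
x, y = lnglat_to_webmercator(13.4, 52.5)
print(f"{x:.0f}, {y:.0f}")  # approx. 1.49e6 m east, 6.89e6 m north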

The output of this notebook is a clean, optimized Parquet file that will serve as the direct input for the final visualization notebook (03_visualization.ipynb).
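
For orientation, this is roughly how the Parquet output can be consumed in the next stage. The column names match what we write below; the relative path, canvas dimensions, and aggregation choice are illustrative assumptions, not the final visualization code:

import dask.dataframe as dd
import datashader as ds

# Columnar reads let us load only the columns needed for rendering,
# which is the main speed advantage over re-parsing the CSV.
ddf = dd.read_parquet(
    "../out/de_classified_points.parquet",
    columns=["x", "y", "classification"])

# Dask may read the category column with unknown categories; datashader's
# categorical aggregation needs them to be known.
ddf["classification"] = ddf["classification"].astype("category").cat.as_known()

# Count points per pixel, split by classification (canvas extent/size assumed).
canvas = ds.Canvas(plot_width=2400, plot_height=1600)
agg = canvas.points(ddf, "x", "y", agg=ds.count_cat("classification"))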

Prerequisites

Install selected dependencies that are not included in Carto-Lab Docker.

  • We use PyGeoHash for spatial aggregation of coordinates
In [25]:
import sys
pyexec = sys.executable
print(f"Current Kernel {pyexec}")
!../py/modules/base/pkginstall.sh "{pyexec}" pygeohash
Current Kernel /opt/conda/envs/worker_env/bin/python3.12
pygeohash already installed (version unknown).

Import all dependencies

In [26]:
import os
import sys
from pathlib import Path
import pandas as pd
import dask.dataframe as dd
import pygeohash as pgh
from datashader.utils import lnglat_to_meters
from IPython.display import clear_output
from typing import Optional, List
List of package versions used in this notebook
package   python    geopandas  pandas  datashader  dask      matplotlib
version   3.12.11   1.1.1      2.3.3   0.18.2      2025.9.1  3.10.6

Load dependencies:

Activate autoreload of changed python files:

In [28]:
%load_ext autoreload
%autoreload 2
The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload

Parameters

Define initial parameters that affect processing

In [29]:
OUTPUT = Path.cwd().parents[0] / "out"
WORK_DIR = Path.cwd().parents[0] / "tmp"
DATA_FOLDER = Path.cwd().parents[0] / "00_data"
In [30]:
COORDS_CSV = DATA_FOLDER / "2025-09-30_DE_coords_user.csv"
OUTSIDE_COORDS_CSV = DATA_FOLDER / "2025-10-18_DE_outside_buffer_coords.csv"
CLASSIFICATION_CSV = OUTPUT / "user_classification.csv"
In [31]:
PARQUET_OUTPUT = OUTPUT / "de_classified_points.parquet"
PARQUET_OUTSIDE_DE = OUTPUT / "de_outside_background.parquet"
In [32]:
CHUNK_SIZE = 5000000
In [33]:
GEOHASH_PRECISION = 7

Data Loading

The core of this step is to combine our large coordinate file with the smaller classification file. We will use Dask to handle the large dataset efficiently. Users who were filtered out in the home-location inference step (e.g., had < 30 user-days) will be explicitly categorized as "Unclassified".

Load the small classification file into pandas, then convert to Dask.

In [34]:
df_class = pd.read_csv(CLASSIFICATION_CSV)
dd_class = dd.from_pandas(df_class, npartitions=1)
print(f"Loaded {len(df_class)} classified users.")
Loaded 582851 classified users.

Load the large coordinate file directly into Dask.

In [35]:
dd_coords = dd.read_csv(
    COORDS_CSV,
    usecols=['user_guid', 'latitude', 'longitude'],
    dtype={'latitude': float, 'longitude': float}
)

Merge the two dataframes using a 'left' join. This keeps all coordinates, adding classification where it exists.

In [36]:
dd_merged = dd_coords.merge(dd_class, on='user_guid', how='left')

# Fill missing classifications with our 'Unclassified' category
dd_merged['classification'] = dd_merged['classification'].fillna('Unclassified')

# Categorize the column for memory efficiency and for datashader
dd_merged['classification'] = dd_merged['classification'].astype('category')
In [37]:
dd_merged.head()
Out[37]:
user_guid latitude longitude classification
0 s35OsHREf675JMcqzWo/ErEkRNfiYwqZKY05uxVQf3I 52.502161 13.326692 Tourist
1 nMfjZ4QrdKIF4U/vgVki5QKoE48gm1h+IRJvsUlwsF8 50.935788 6.841120 Tourist
2 vnRT1cxi5btm78R82YjgIdq/8+l7U4Ez4OiB2BStOlA 52.529762 13.401509 Tourist
3 LvaPHXU6qXYhkT68t/T2hiL/xL6iJp/yIjKDYrGlKMg 47.741380 8.724399 Tourist
4 ZuKnXcasjiYNM5dTHXzf8qh2iwbZ/YXjKQfD1DRVmPc 50.543818 7.098820 Tourist

Project Coordinates and Save to Parquet

Spatial Aggregation for Enhanced Privacy

Before creating the final Parquet files, we perform a data generalization step to further enhance user privacy. This is in line with the principle of data minimization. We spatially aggregate the points by snapping them to the center of a predefined grid using the Geohash algorithm.

The key parameter for this process is the Geohash precision. We have chosen a precision of 7, which corresponds to a grid cell size of approximately 153m x 153m.

This level of aggregation was selected to ensure it has little or no discernible impact on the final country-scale visualization. The final map is rendered at a width of 2400 pixels, where each pixel represents an area roughly 263 meters wide. Since our aggregation grid (153m) is significantly finer than the map's pixel resolution, the density patterns rendered later by datashader will (likely) be perceptually identical to those from the original data.

This approach allows us to further reduce the granularity of the shared data, which strengthens privacy protection without compromising the scientific validity or reproducibility of the final visual output.
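
To make this precision tangible, the following sketch (not an executed cell) snaps a hypothetical test coordinate to its precision-7 cell centre with pygeohash and repeats the pixel-size arithmetic from above:

import pygeohash as pgh

lat, lon = 51.0504, 13.7373  # hypothetical test point (central Dresden)
cell = pgh.encode(lat, lon, precision=7)
snapped_lat, snapped_lon = pgh.decode(cell)
print(cell, snapped_lat, snapped_lon)  # every point in this ~153 m cell maps to the same centre

# Pixel-size check from the numbers above: a 2400 px wide map at ~263 m per
# pixel covers about 631 km east-west, so the 153 m grid stays well below
# the pixel resolution.
print(2400 * 263 / 1000)  # ≈ 631 km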

In [38]:
def geohash_reduce(
    df: pd.DataFrame,
    precision: int,
    lat_col: str = 'latitude',
    lon_col: str = 'longitude'
) -> pd.DataFrame:
    """
    Generalizes coordinates by snapping them to the center of their Geohash cell
    while preserving the original number and order of rows.

    This function is optimized for performance by creating a mapping of unique
    geohashes to their centroids and applying this map back to the original
    DataFrame, avoiding row-wise decoding. The original coordinate columns
    are overwritten.

    Args:
        df (pd.DataFrame): The input DataFrame.
        precision (int): The desired Geohash precision.
        lat_col (str): The name of the latitude column.
        lon_col (str): The name of the longitude column.

    Returns:
        pd.DataFrame: The original DataFrame with the coordinate columns
            modified in-place.
    """
    # 1. Encode all coordinates to Geohash strings.
    # This is the only row-wise operation, and it's necessary.
    geohashes = df.apply(
        lambda row: pgh.encode(
            row[lat_col], row[lon_col], precision=precision),
        axis=1
    )

    # 2. Create an efficient mapping from each unique geohash to its centroid.
    # This is the core performance optimization. We only decode each geohash ONCE.
    unique_geohashes = geohashes.unique()
    centroid_map = {gh: pgh.decode(gh) for gh in unique_geohashes}

    # 3. Use the fast .map() method to apply the centroids back to all rows.
    # .map() is vectorized and preserves the original order.
    # The result is a Series of (latitude, longitude) tuples.
    snapped_coords = geohashes.map(centroid_map)

    # 4. Unpack the tuples and overwrite the original coordinate columns.
    df[lat_col], df[lon_col] = zip(*snapped_coords)

    return df
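
A tiny usage sketch (hypothetical coordinates, not an executed cell) that reuses geohash_reduce and GEOHASH_PRECISION from above: nearby points typically collapse onto the same cell centre, while row count and order are preserved.

df_demo = pd.DataFrame({
    'latitude':  [52.5001, 52.5002, 48.1371],
    'longitude': [13.4001, 13.4002, 11.5755],
})
df_snapped = geohash_reduce(df_demo, precision=GEOHASH_PRECISION)
print(df_snapped)  # the first two rows usually share identical snapped coordinates
                   # (same ~153 m cell); the third keeps its own cell centre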

Process coordinates

First, process the inside DE core dataset.

In [39]:
%%time
if PARQUET_OUTPUT.exists():
    print(
        f"Parquet file already exists at {PARQUET_OUTPUT}. Skipping creation.")
    print(
        "To re-run the conversion, please delete the directory first.")
else:
    PARQUET_OUTPUT.mkdir()
    print("Loading user classification data...")
    df_class = pd.read_csv(CLASSIFICATION_CSV, index_col='user_guid')
    print(f"Loaded {len(df_class)} classified users.")
    print("Starting chunk-based processing of coordinate CSV...")
    iter_csv = pd.read_csv(
        COORDS_CSV,
        usecols=['user_guid', 'latitude', 'longitude'],
        dtype={'latitude': float, 'longitude': float},
        iterator=True,
        chunksize=CHUNK_SIZE
    )

    total_rows = 0
    
    for i, chunk in enumerate(iter_csv):
        clear_output(wait=True)
        print(f"Processing chunk {i+1}...")
        
        # Merge, classify, and project the chunk
        chunk_merged = chunk.join(df_class, on='user_guid', how='left')
        chunk_merged['classification'] = \
            chunk_merged['classification'] \
            .fillna('Unclassified') \
            .astype('category')

        # Call the geohash function to perform the aggregation
        chunk_agg = geohash_reduce(
            df=chunk_merged,
            precision=GEOHASH_PRECISION
        )

        # Project the aggregated coordinates to Web Mercator
        web_mercator_x, web_mercator_y = lnglat_to_meters(
            chunk_agg['longitude'],
            chunk_agg['latitude'])
        df_final_chunk = pd.DataFrame({
            'x': web_mercator_x,
            'y': web_mercator_y,
            'classification': chunk_agg['classification']
        })

        # Convert final pandas chunk to Dask for writing
        dd_final_chunk = dd.from_pandas(df_final_chunk, npartitions=1)

        params = {
            "write_index": False,
            "compression": "SNAPPY"
        }
        # Write to Parquet, creating or appending as needed
        if i == 0:
            dd_final_chunk.to_parquet(PARQUET_OUTPUT, **params)
        else:
            dd_final_chunk.to_parquet(PARQUET_OUTPUT, append=True, **params)


    print("\nParquet file creation complete.")
Processing chunk 14...

Parquet file creation complete.
CPU times: user 10min 14s, sys: 16.9 s, total: 10min 31s
Wall time: 10min 31s
In [40]:
datasize = sum(f.stat().st_size for f in PARQUET_OUTPUT.glob('**/*') if f.is_file())/(1024*1024*1024)
print(
    f"Size: {datasize:,.1f} GB")
Size: 0.1 GB

Then, process the outside DE supplementary dataset.

In [41]:
%%time
if PARQUET_OUTSIDE_DE.exists():
    print(f"\nFile already exists: {PARQUET_OUTSIDE_DE}. Skipping.")
else:
    print("\n--- Processing OUTSIDE Germany Data ---")
    iter_csv_outside = pd.read_csv(
        OUTSIDE_COORDS_CSV, iterator=True, chunksize=CHUNK_SIZE
    )
    for i, chunk in enumerate(iter_csv_outside):
        clear_output(wait=True)
        print(f"Processing outside-DE chunk {i+1}...")

        # Call the geohash function without classification grouping
        chunk_agg = geohash_reduce(
            df=chunk,
            precision=GEOHASH_PRECISION)

        # No classification needed, just project and save to the second Parquet file
        web_mercator_x, web_mercator_y = lnglat_to_meters(
            chunk_agg['longitude'], chunk_agg['latitude'])
        df_final_chunk = pd.DataFrame(
            {'x': web_mercator_x, 'y': web_mercator_y})
        dd_final_chunk = dd.from_pandas(
            df_final_chunk, npartitions=1)

        if i == 0:
            dd_final_chunk.to_parquet(
                PARQUET_OUTSIDE_DE, write_index=False, 
                compression="SNAPPY")
        else:
            dd_final_chunk.to_parquet(
                PARQUET_OUTSIDE_DE, append=True, 
                write_index=False, compression="SNAPPY")
    print("\n'Outside Germany' Parquet file created.")
Processing outside-DE chunk 15...

'Outside Germany' Parquet file created.
CPU times: user 9min 40s, sys: 8.94 s, total: 9min 49s
Wall time: 9min 49s
In [42]:
datasize = sum(f.stat().st_size for f in PARQUET_OUTSIDE_DE.glob('**/*') if f.is_file())/(1024*1024*1024)
print(
    f"Size: {datasize:,.1f} GB")
Size: 0.1 GB

Create notebook HTML

In [ ]:
!jupyter nbconvert --to html_toc \
    --output-dir=../resources/html/ ./02_parquet.ipynb \
    --template=../nbconvert.tpl \
    --ExtractOutputPreprocessor.enabled=False >&- 2>&- # create single output file

IOER FDZ Jupyter Base Template v0.13.0