Digital Landscape Traces: Transformation to Parquet Format ¶
Alexander Dunkel, Leibniz Institute of Ecological Urban and Regional Development,
Transformative Capacities & Research Data Centre (IÖR-FDZ)
This notebook performs a critical data transformation step in our analysis pipeline. Its sole purpose is to prepare the raw, full-resolution social media data for high-performance visualization in the next stage. Visualizing tens of millions of points directly from a CSV file is extremely slow. To overcome this, we will convert the data into the columnar Apache Parquet format, which is highly optimized for fast, parallelized read operations with libraries like Dask and Datashader. I have written about this pipeline before here.
The workflow involves several key stages:
- Load Input Data: We will load two datasets created in the previous notebooks: the large CSV file containing 66+ million full-resolution coordinates (latitude, longitude) and their associated user_guid, and the smaller CSV file containing the final user classification (Local, Tourist) for the ~600,000 most active users.
- Merge and Classify: The script will process the large coordinate file in memory-efficient chunks. For each point, it will merge the corresponding user classification. Points from users who were filtered out in the home-location inference step (i.e., those not in the classification file) will be explicitly categorized as "Unclassified".
- Coordinate Projection: The latitude and longitude coordinates (in WGS84, EPSG:4326) will be re-projected into the Web Mercator projection (EPSG:3857). This is the standard coordinate system used by datashader and web mapping services, and performing this transformation now saves significant computation time during the visualization step.
- Export to Parquet: The final, prepared data—containing the projected x and y coordinates and the classification for every point—is written to a single, consolidated Parquet file.
The output of this notebook is a clean, optimized Parquet file that will serve as the direct input for the final visualization notebook (03_visualization.ipynb).
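To make the payoff concrete, here is a minimal sketch of how the next notebook could consume this file with Dask and Datashader (illustrative only; the actual rendering code lives in 03_visualization.ipynb, and the relative path and plot height are assumptions):
# Sketch: lazily load the Parquet output and aggregate points per class with Datashader.
import dask.dataframe as dd
import datashader as ds

ddf = dd.read_parquet("../out/de_classified_points.parquet")
# Make category values known before a categorical aggregation.
ddf = ddf.categorize(columns=['classification'])
cvs = ds.Canvas(plot_width=2400, plot_height=2400)
agg = cvs.points(ddf, 'x', 'y', ds.count_cat('classification'))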
Prerequisites¶
Install selected dependencies that are not included in Carto-Lab Docker.
- We use PyGeoHash for spatial aggregation of coordinates
import sys
pyexec = sys.executable
print(f"Current Kernel {pyexec}")
!../py/modules/base/pkginstall.sh "{pyexec}" pygeohash
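If the helper script is unavailable in your environment, a generic pip fallback (an assumption, not the project's standard setup) is:
# Fallback (assumption): install pygeohash into the current kernel's environment with pip.
!"{pyexec}" -m pip install pygeohash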
Import all dependencies
import os
import sys
from pathlib import Path
import pandas as pd
import dask.dataframe as dd
import pygeohash as pgh
from datashader.utils import lnglat_to_meters
from IPython.display import clear_output
from typing import Optional, List
Activate autoreload of changed python files:
%load_ext autoreload
%autoreload 2
Parameters¶
Define initial parameters that affect processing
OUTPUT = Path.cwd().parents[0] / "out"  # output directory for generated files
WORK_DIR = Path.cwd().parents[0] / "tmp"  # temporary working directory
DATA_FOLDER = Path.cwd().parents[0] / "00_data"  # raw input data folder
COORDS_CSV = DATA_FOLDER / "2025-09-30_DE_coords_user.csv"  # full-resolution coordinates (inside DE)
OUTSIDE_COORDS_CSV = DATA_FOLDER / "2025-10-18_DE_outside_buffer_coords.csv"  # supplementary coordinates outside DE
CLASSIFICATION_CSV = OUTPUT / "user_classification.csv"  # per-user classification from the previous notebook
PARQUET_OUTPUT = OUTPUT / "de_classified_points.parquet"  # main Parquet output (classified points)
PARQUET_OUTSIDE_DE = OUTPUT / "de_outside_background.parquet"  # background Parquet output (outside DE)
CHUNK_SIZE = 5_000_000  # rows per chunk when iterating over the coordinate CSV
GEOHASH_PRECISION = 7  # Geohash precision for spatial aggregation (~153 m x 153 m cells)
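As an optional safeguard (not part of the original pipeline), a quick check that the expected inputs exist fails early with a clear message:
# Optional sanity check: confirm input files exist and the output folder is ready.
for csv_path in (COORDS_CSV, OUTSIDE_COORDS_CSV, CLASSIFICATION_CSV):
    if not csv_path.exists():
        raise FileNotFoundError(
            f"Expected input not found: {csv_path}. Run the preceding notebooks first.")
OUTPUT.mkdir(exist_ok=True)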
Data Loading¶
The core of this step is to combine our large coordinate file with the smaller classification file. We will use Dask to handle the large dataset efficiently. Users who were filtered out in the home-location inference step (e.g., had < 30 user-days) will be explicitly categorized as "Unclassified".
Load the small classification file into pandas, then convert to Dask.
df_class = pd.read_csv(CLASSIFICATION_CSV)
dd_class = dd.from_pandas(df_class, npartitions=1)
print(f"Loaded {len(df_class)} classified users.")
Load the large coordinate file directly into Dask.
dd_coords = dd.read_csv(
COORDS_CSV,
usecols=['user_guid', 'latitude', 'longitude'],
dtype={'latitude': float, 'longitude': float}
)
Merge the two dataframes using a 'left' join. This keeps all coordinates, adding classification where it exists.
dd_merged = dd_coords.merge(dd_class, on='user_guid', how='left')
# Fill missing classifications with our 'Unclassified' category
dd_merged['classification'] = dd_merged['classification'].fillna('Unclassified')
# Categorize the column for memory efficiency and for datashader
dd_merged['classification'] = dd_merged['classification'].astype('category')
dd_merged.head()
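As an optional check of the merge result (this triggers a full pass over the CSV, so it can take a while), the class distribution can be computed from the Dask dataframe:
# Optional: count points per classification category across the full dataset.
class_counts = dd_merged['classification'].value_counts().compute()
print(class_counts)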
Project Coordinates and Save to Parquet¶
Spatial Aggregation for Enhanced Privacy¶
Before creating the final Parquet files, we perform a data generalization step to further enhance user privacy. This is in line with the principle of data minimization. We spatially aggregate the points by snapping them to the center of a predefined grid using the Geohash algorithm.
The key parameter for this process is the Geohash precision. We have chosen a precision of 7, which corresponds to a grid cell size of approximately 153m x 153m.
This level of aggregation was selected to ensure it has little or no discernible
impact on the final country-scale visualization. The final map is rendered at a
width of 2400 pixels, where each pixel represents an area roughly 263 meters wide.
Since our aggregation grid (153m) is significantly finer than the map's pixel resolution,
the density patterns rendered later by datashader will (likely) be perceptually identical to those
from the original data.
This approach allows us to further reduce the granularity of the shared data, which strengthens privacy protection without compromising the scientific validity or reproducibility of the final visual output.
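To illustrate the snapping behavior, here is a minimal sketch with two made-up coordinates a few dozen meters apart: they typically fall into the same precision-7 cell (unless they happen to straddle a cell boundary) and therefore decode to the same centroid.
# Illustration only: two nearby hypothetical points usually share one precision-7 cell.
gh_a = pgh.encode(51.0504, 13.7373, precision=7)
gh_b = pgh.encode(51.0505, 13.7374, precision=7)
print(gh_a, gh_b, gh_a == gh_b)  # often True at the ~153 m cell size
print(pgh.decode(gh_a))          # cell centroid (lat, lon) both points would snap to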
def geohash_reduce(
df: pd.DataFrame,
precision: int,
lat_col: str = 'latitude',
lon_col: str = 'longitude'
) -> pd.DataFrame:
"""
Generalizes coordinates by snapping them to the center of their Geohash cell
while preserving the original number and order of rows.
This function is optimized for performance by creating a mapping of unique
geohashes to their centroids and applying this map back to the original
DataFrame, avoiding row-wise decoding. The original coordinate columns
are overwritten.
Args:
df (pd.DataFrame): The input DataFrame.
precision (int): The desired Geohash precision.
lat_col (str): The name of the latitude column.
lon_col (str): The name of the longitude column.
Returns:
pd.DataFrame: The original DataFrame with the coordinate columns
modified in-place.
"""
# 1. Encode all coordinates to Geohash strings.
# This is the only row-wise operation, and it's necessary.
geohashes = df.apply(
lambda row: pgh.encode(
row[lat_col], row[lon_col], precision=precision),
axis=1
)
# 2. Create an efficient mapping from each unique geohash to its centroid.
# This is the core performance optimization. We only decode each geohash ONCE.
unique_geohashes = geohashes.unique()
centroid_map = {gh: pgh.decode(gh) for gh in unique_geohashes}
# 3. Use the fast .map() method to apply the centroids back to all rows.
# .map() is vectorized and preserves the original order.
# The result is a Series of (latitude, longitude) tuples.
snapped_coords = geohashes.map(centroid_map)
# 4. Unpack the tuples and overwrite the original coordinate columns.
df[lat_col], df[lon_col] = zip(*snapped_coords)
return df
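A brief usage sketch (with hypothetical coordinates) showing that row count and order are preserved while the coordinates are snapped to cell centroids:
# Example (illustrative only): apply geohash_reduce to a tiny DataFrame.
_demo = pd.DataFrame({
    'latitude': [51.0504, 51.0505, 52.5200],
    'longitude': [13.7373, 13.7374, 13.4050],
})
print(geohash_reduce(_demo.copy(), precision=GEOHASH_PRECISION))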
Process coordinates¶
First, process the inside DE core dataset.
%%time
if PARQUET_OUTPUT.exists():
print(
f"Parquet file already exists at {PARQUET_OUTPUT}. Skipping creation.")
print(
"To re-run the conversion, please delete the directory first.")
else:
PARQUET_OUTPUT.mkdir()
print("Loading user classification data...")
df_class = pd.read_csv(CLASSIFICATION_CSV, index_col='user_guid')
print(f"Loaded {len(df_class)} classified users.")
print("Starting chunk-based processing of coordinate CSV...")
iter_csv = pd.read_csv(
COORDS_CSV,
usecols=['user_guid', 'latitude', 'longitude'],
dtype={'latitude': float, 'longitude': float},
iterator=True,
chunksize=CHUNK_SIZE
)
total_rows = 0
for i, chunk in enumerate(iter_csv):
clear_output(wait=True)
print(f"Processing chunk {i+1}...")
# Merge, classify, and project the chunk
chunk_merged = chunk.join(df_class, on='user_guid', how='left')
chunk_merged['classification'] = \
chunk_merged['classification'] \
.fillna('Unclassified') \
.astype('category')
# Call the geohash function to perform the aggregation
chunk_agg = geohash_reduce(
df=chunk_merged,
precision=GEOHASH_PRECISION
)
# Project the aggregated coordinates to Web Mercator
web_mercator_x, web_mercator_y = lnglat_to_meters(
chunk_agg['longitude'],
chunk_agg['latitude'])
df_final_chunk = pd.DataFrame({
'x': web_mercator_x,
'y': web_mercator_y,
'classification': chunk_agg['classification']
})
# Convert final pandas chunk to Dask for writing
dd_final_chunk = dd.from_pandas(df_final_chunk, npartitions=1)
params = {
"write_index":False,
"compression":"SNAPPY"
}
# Write to Parquet, creating or appending as needed
if i == 0:
dd_final_chunk.to_parquet(PARQUET_OUTPUT, **params)
else:
dd_final_chunk.to_parquet(PARQUET_OUTPUT, append=True, **params)
print("\nParquet file creation complete.")
datasize = sum(f.stat().st_size for f in PARQUET_OUTPUT.glob('**/*') if f.is_file())/(1024*1024*1024)
print(
f"Size: {datasize:,.1f} GB")
Then, process the supplementary outside-DE dataset.
%%time
if PARQUET_OUTSIDE_DE.exists():
print(f"\nFile already exists: {PARQUET_OUTSIDE_DE}. Skipping.")
else:
print("\n--- Processing OUTSIDE Germany Data ---")
iter_csv_outside = pd.read_csv(
OUTSIDE_COORDS_CSV, iterator=True, chunksize=CHUNK_SIZE
)
for i, chunk in enumerate(iter_csv_outside):
clear_output(wait=True)
print(f"Processing outside-DE chunk {i+1}...")
# Call the geohash function without classification grouping
chunk_agg = geohash_reduce(
df=chunk,
precision=GEOHASH_PRECISION)
# No classification needed, just project and save to the second Parquet file
web_mercator_x, web_mercator_y = lnglat_to_meters(
chunk_agg['longitude'], chunk_agg['latitude'])
df_final_chunk = pd.DataFrame(
{'x': web_mercator_x, 'y': web_mercator_y})
dd_final_chunk = dd.from_pandas(
df_final_chunk, npartitions=1)
if i == 0:
dd_final_chunk.to_parquet(
PARQUET_OUTSIDE_DE, write_index=False,
compression="SNAPPY")
else:
dd_final_chunk.to_parquet(
PARQUET_OUTSIDE_DE, append=True,
write_index=False, compression="SNAPPY")
print("\n'Outside Germany' Parquet file created.")
datasize = sum(f.stat().st_size for f in PARQUET_OUTSIDE_DE.glob('**/*') if f.is_file())/(1024*1024*1024)
print(
f"Size: {datasize:,.1f} GB")
Create notebook HTML¶
!jupyter nbconvert --to html_toc \
--output-dir=../resources/html/ ./02_parquet.ipynb \
--template=../nbconvert.tpl \
--ExtractOutputPreprocessor.enabled=False >&- 2>&- # create single output file