Digital Landscape Traces: Clustering ¶
Alexander Dunkel, Leibniz Institute of Ecological Urban and Regional Development,
Transformative Capacities & Research Data Centre (IÖR-FDZ)
This notebook continues the analysis of social media activity to determine user home locations. It takes the privacy-preserving, HLL-aggregated dataset of worldwide user activity generated in the previous notebook and performs the final home location inference for approximately 6 million users (who were active in Germany at least once).
The primary challenge at this stage is processing the multi-gigabyte dataset efficiently without loading it into memory. This notebook implements a scalable, resumable batch processing workflow with the following key stages:
- Memory-Efficient Data Streaming: The 2.5 GB sorted CSV file is read using a custom Python generator. This function streams the data chunk by chunk, yielding the complete location history for one user at a time, keeping memory usage minimal.
- Spatial Aggregation by Country: For each user, instead of computationally intensive point clustering, we perform an efficient spatial join. Each of the user's location points is associated with the country polygon it falls within.
- Cardinality Estimation with HyperLogLog: The core of the analysis is to determine where a user is most active. Rather than simply counting posts, we aggregate the userday_hll objects for each country. By performing a hll_union operation via a high-performance PostgreSQL backend, we can accurately estimate the total number of distinct active days a user has spent in each country.
- Home Location Inference and Export: The country with the highest estimated user-day cardinality is designated as that user's most probable home. This result (user_guid, country) is immediately appended to a final output CSV. The script is designed to be resumable, allowing it to be stopped and restarted without losing progress.
The final output is a simple mapping of each user to their inferred home country, which will serve as the basis for the subsequent "local" vs. "tourist" classification.
Preparations¶
import os, sys
import csv
from pathlib import Path
import psycopg2
import geopandas as gp
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.ticker as mticker
import seaborn as sns
from typing import Generator, Optional, Tuple
from IPython.display import clear_output, display
from pyproj import Transformer
module_path = str(Path.cwd().parents[0] / "py")
if module_path not in sys.path:
sys.path.append(module_path)
from modules.base import tools, hll
from modules.base.hll import union_hll, cardinality_hll
%load_ext autoreload
%autoreload 2
OUTPUT = Path.cwd().parents[0] / "out" # output directory for figures (etc.)
WORK_DIR = Path.cwd().parents[0] / "tmp" # Working directory
DATA_FOLDER = Path.cwd().parents[0] / "00_data" # Static Data Folder
OUTPUT.mkdir(exist_ok=True)
(OUTPUT / "figures").mkdir(exist_ok=True)
(OUTPUT / "svg").mkdir(exist_ok=True)
WORK_DIR.mkdir(exist_ok=True)
Plot styling
plt.style.use('default')
CRS_WGS = "epsg:4326" # WGS1984
CRS_PROJ = "esri:54009" # Mollweide
Define Transformer ahead of time with xy-order of coordinates
PROJ_TRANSFORMER = Transformer.from_crs(
CRS_WGS, CRS_PROJ, always_xy=True)
Set global font
plt.rcParams['font.family'] = 'serif'
plt.rcParams['font.serif'] = ['Times New Roman'] + plt.rcParams['font.serif']
Load User Location Data¶
We begin by loading the user location data, which was pre-aggregated and approximated (HLL) and exported to a CSV in the previous notebook. This dataset is too large to load into memory at once, so we will only preview its structure here.
Data Structure:
- user_guid: A unique identifier for each user.
- latitude, longitude: The coordinates of an aggregated location.
- userday_hll: A binary HyperLogLog (HLL) object. It efficiently represents the set of distinct days the user was active at this specific coordinate.
This file is sorted by user_guid, which is a critical precondition for the efficient streaming process that follows.
DE_USERDAYS = DATA_FOLDER / "2025-09-19_userdays_DE_HLL.csv"
Preview CSV
chunks = pd.read_csv(DE_USERDAYS, chunksize=20)
df = next(chunks)
df.head(20)
%%time
data_files = {
"DE_USERDAYS":DE_USERDAYS,
}
tools.display_file_stats(data_files)
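Because the streaming approach below depends on the file being sorted by user_guid, here is an optional sanity check, added for illustration and not part of the original workflow, that verifies the sort order chunk by chunk without loading the full file into memory.
def verify_sorted_by_user(filepath: Path, chunksize: int = 500_000) -> bool:
    """Return True if the CSV is sorted by 'user_guid' (checked chunk-wise)."""
    last_seen = None
    for chunk in pd.read_csv(filepath, usecols=["user_guid"], chunksize=chunksize):
        guids = chunk["user_guid"]
        # order must hold within each chunk ...
        if not guids.is_monotonic_increasing:
            return False
        # ... and across chunk boundaries
        if last_seen is not None and guids.iloc[0] < last_seen:
            return False
        last_seen = guids.iloc[-1]
    return True

# verify_sorted_by_user(DE_USERDAYS)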
Streaming and Processing User Data¶
To handle the multi-gigabyte CSV file without running out of memory, we'll process it as a stream. The following function reads the file chunk by chunk and yields a complete DataFrame for one user at a time. This allows us to analyze each user's entire location history individually.
def stream_user_locations(
filepath: Path, chunksize: int = 100_000
) -> Generator[pd.DataFrame, None, None]:
"""
Reads a large CSV sorted by 'user_guid' and yields a complete
DataFrame for one user at a time. Handles multiple user
boundaries within a single chunk.
"""
user_chunks = []
current_user_id = None
# Use pd.read_csv with chunksize to create an iterator
chunk_iterator = pd.read_csv(
filepath, chunksize=chunksize)
for chunk in chunk_iterator:
if current_user_id is None:
# Initialize with the first user ID from the very first chunk
current_user_id = chunk[
"user_guid"].iloc[0]
# Use a while loop to process the current chunk until it's empty
# This handles chunks that contain data for multiple users
while not chunk.empty:
# Find the row index where the user_guid changes
last_idx = chunk["user_guid"] \
.searchsorted(current_user_id, side="right")
if last_idx < len(chunk):
# Boundary is within this chunk.
# 1. Take the part for the current user and append it
user_part = chunk.iloc[:last_idx]
user_chunks.append(user_part)
# 2. Yield the complete data for this user
yield pd.concat(user_chunks)
# 3. The rest of the chunk belongs to the next user(s)
chunk = chunk.iloc[last_idx:]
# 4. Reset for the next user
user_chunks = []
current_user_id = chunk["user_guid"].iloc[0]
else:
# This entire remaining chunk belongs to the current user.
user_chunks.append(chunk)
# Break the while loop to get the next chunk from the file
break
# After the loop, yield the data for the very last user in the file
if user_chunks:
yield pd.concat(user_chunks)
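To illustrate how the generator handles user boundaries, here is a small self-contained toy example (added for illustration, not part of the original workflow): with a tiny chunksize, a single chunk spans several users, and the generator still yields one complete DataFrame per user.
import io
_toy_csv = io.StringIO(
    "user_guid,latitude,longitude,userday_hll\n"
    "a,50.0,8.0,hll1\n"
    "a,50.1,8.1,hll2\n"
    "b,51.0,9.0,hll3\n"
    "c,52.0,10.0,hll4\n"
    "c,52.1,10.1,hll5\n")
# chunksize=2 forces user boundaries both within and across chunks
for toy_user_df in stream_user_locations(_toy_csv, chunksize=2):
    print(toy_user_df["user_guid"].iloc[0], len(toy_user_df))
# should print: a 2, b 1, c 2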
Test on the first user
user_generator = stream_user_locations(DE_USERDAYS)
df = next(user_generator)
len(df)
df.head(10)
--> The first user in our dataset has 5 aggregated locations. As a test case, we will visualize these points on a map.
userday_locations = gp.GeoDataFrame(
df, geometry=gp.points_from_xy(df.longitude, df.latitude), crs=CRS_WGS)
userday_locations.to_crs(CRS_PROJ, inplace=True)
Get worldmap
world = tools.get_shapes("world", shape_dir=OUTPUT / "shapes")
world.to_crs(CRS_PROJ, inplace=True)
OBSERVATION_COLOR = "red"
Define a reusable plotting function
def plot_map(
        gdf: gp.GeoDataFrame, x_lim: Optional[Tuple[float, float]] = None,
        y_lim: Optional[Tuple[float, float]] = None,
        world: gp.GeoDataFrame = None, observation_color: str = OBSERVATION_COLOR,
        fig_size: Optional[Tuple[int, int]] = None,
        return_ax: bool = False):
    """Plot a point map with an optional world overlay and axis limits"""
    if fig_size is None:
        fig_size = (6, 9)
    fig, ax = plt.subplots(1, 1, figsize=fig_size)
    gdf.plot(
        ax=ax,
        markersize=.1,
        facecolor=observation_color,
        edgecolor=None,
        alpha=0.9)
    if world is not None:
        world.plot(
            ax=ax, color='none', edgecolor='black', linewidth=0.3)
    if x_lim is not None and y_lim is not None:
        ax.set_xlim(*x_lim)
        ax.set_ylim(*y_lim)
    ax.axis('off')
    fig.tight_layout()
    if return_ax:
        return fig, ax
    fig.show()
plot
plot_map(
userday_locations, world=world)
This user was obviously only active in Germany. Let's zoom in to Europe:
bbox_europe = -25, 35, 35, 59
# convert to Mollweide
minx, miny = PROJ_TRANSFORMER.transform(
bbox_europe[0], bbox_europe[1])
maxx, maxy = PROJ_TRANSFORMER.transform(
bbox_europe[2], bbox_europe[3])
plot_map(
userday_locations, world=world, x_lim=(minx, maxx), y_lim=(miny, maxy))
Let's see if we can find a user with more userday locations by looping through the huge CSV one user at a time, until we find one with at least 500 locations (stopping after 100 users at most).
for i, user_df in enumerate(user_generator):
user_id = user_df['user_guid'].iloc[0]
print(f"Processing user {i+1}: {user_id} with {len(user_df)} locations...")
# For demonstration, let's just process the first 100 users (max) and stop
if i >= 100 or (len(user_df) >= 500):
break
userday_locations = gp.GeoDataFrame(
user_df, geometry=gp.points_from_xy(user_df.longitude, user_df.latitude), crs=CRS_WGS)
userday_locations.to_crs(CRS_PROJ, inplace=True)
plot_map(
userday_locations, world=world)
This user was obviously very active in both the UK and Japan. To find out which country has more userdays, we need to aggregate userdays per country: an HLL union lets us estimate the number of distinct userdays in each country. Let's look at the world geometry to see the aggregation base:
world.head()
Connect to HLL Calculation Backend¶
While HLL operations can be performed in Python, they are significantly slower than optimized database implementations. To speed up cardinality estimation, we connect to a local PostgreSQL instance with the citus-hll extension installed. This database acts as a high-performance "calculation backend" for two key operations:
- hll_union_agg(): To merge all of a user's HLL objects for a given country.
- hll_cardinality(): To estimate the number of distinct days from the final unioned HLL.
db_user = "postgres"
db_pass = os.getenv('POSTGRES_PASSWORD')
db_host = "hlldb"
db_port = "5432"
db_name = "hlldb"
db_connection_hll = psycopg2.connect(
host=db_host,
port=db_port,
dbname=db_name,
user=db_user,
password=db_pass
)
db_conn_hll = tools.DbConn(db_connection_hll)
cur_hll = db_connection_hll.cursor()
cur_hll.execute("SELECT 1;")
print(cur_hll.statusmessage)
Simplify query access:
db_conn = tools.DbConn(db_connection_hll)
db_conn.query("SELECT 1;")
Determine Home Location by Country Aggregation¶
For our selected user, we will now determine their most likely home country. The core logic is to spatially join the user's location points with country polygons, and then aggregate the userday_hll data for each country to find which one contains the most activity.
joined_gdf = gp.sjoin(
userday_locations,
world, how="inner", predicate='within')
joined_gdf.head()
Group by country and collect HLLs. This creates a Series where the index is the country and the value is a list of HLL strings.
country_hll_groups = joined_gdf.groupby('country')['userday_hll'].apply(list)
country_hll_groups[:2]
Union all HLLs for each country
unioned_hlls_per_country = country_hll_groups.apply(
lambda hll_list: union_hll(hll_list, cur_hll)
)
unioned_hlls_per_country[:2]
Calculate the cardinality estimate of each resulting unioned HLL
userdays_per_country = unioned_hlls_per_country.apply(
lambda final_hll: cardinality_hll(final_hll, cur_hll)
)
userdays_per_country[:2]
Let's display the results on a choropleth world map. A critical first step is to dissolve the country geometries. The original world shapefile contains multiple polygons for countries with islands or exclaves (e.g., UK, France). Dissolving merges these into a single geometry per country, ensuring a clean spatial join and correct visualization. A small buffer operation (world.buffer(0.0001)) is also applied; this is a fix to prevent tiny overlapping shapes (see my explanation here).
world['geometry'] = world.buffer(0.0001)
world_dissolved = world.dissolve(by='country')
userdays_per_country.rename("estimated_userdays", inplace=True)
Create map
map_data = world_dissolved.join(userdays_per_country)
map_data["estimated_userdays"] = map_data["estimated_userdays"].fillna(0)
display(map_data[map_data['estimated_userdays'] > 0])
We can already see that UK is likely this user's home location, with 918 total userdays of activity.
def choropleth_map(df: pd.DataFrame):
fig, ax = plt.subplots(1, 1, figsize=(15, 10))
df.plot(
column="estimated_userdays",
cmap="Reds",
linewidth=0.5,
ax=ax,
edgecolor="0.6",
legend=True,
legend_kwds={
"label": "Estimated Number of User-Days",
"orientation": "horizontal",
"shrink": 0.6,
"pad": 0.01
},
missing_kwds={
"color": "lightgrey",
"label": "No Activity"
}
)
    user_id = user_df['user_guid'].iloc[0]
    ax.set_title(f"User-Day Activity per Country for User {user_id}", fontsize=16)
ax.set_axis_off()
plt.show()
choropleth_map(map_data)
We can extract this by sorting by the userday estimate in descending order.
map_data_sorted = map_data.sort_values(by="estimated_userdays", ascending=False)
print(f"\n=> The most likely home country for this user is: {map_data_sorted.index[0]}")
Batch Processing the Entire Dataset¶
Now that the logic for a single user is established, we will apply it to the entire dataset. The following script is designed to be robust and resumable:
- It reads an output CSV to identify and skip users that have already been processed.
- It streams users one by one to keep memory usage low.
- It appends each new result to the CSV immediately, ensuring progress is saved continuously.
This allows the job to be stopped and restarted at any time without losing work.
def determine_home_country(
user_df: pd.DataFrame,
world_gdf: gp.GeoDataFrame,
db_cursor: psycopg2.extensions.cursor
) -> Optional[str]:
"""
Processes a single user's DataFrame to determine their most likely home country.
Args:
user_df: The DataFrame containing all locations for a single user.
world_gdf: The pre-dissolved world GeoDataFrame.
db_cursor: An active psycopg2 database cursor.
Returns:
The name of the most likely home country as a string, or None if no locations match.
"""
# 1. Convert user's posts to a GeoDataFrame
user_locations_proj = gp.GeoDataFrame(
user_df,
geometry=gp.points_from_xy(user_df.longitude, user_df.latitude),
crs=CRS_WGS
).to_crs(world_gdf.crs)
# 2. Spatial Join
joined_gdf = gp.sjoin(user_locations_proj, world_gdf, how="inner", predicate='within')
# If user has no locations within any country polygons, return None
if joined_gdf.empty:
return None
# 3. Group by country and collect HLLs
country_hll_groups = joined_gdf.groupby('country')['userday_hll'].apply(list)
    # 4. Union the HLLs per country, then estimate total user-days via cardinality
    unioned_hlls_per_country = country_hll_groups.apply(
        lambda hlls: union_hll(hlls, db_cursor)
    )
    userdays_per_country = unioned_hlls_per_country.apply(
        lambda unioned: cardinality_hll(unioned, db_cursor)
    )
    # 5. Determine home country by finding the max estimated user-day count
    if userdays_per_country.empty:
        return None
return userdays_per_country.idxmax()
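As a quick optional check (added for illustration, not part of the original workflow), the function can be applied to the test user explored above; it should return the same country as the manual per-country aggregation (the UK for this test user).
# Optional sanity check against the manual workflow above
determine_home_country(user_df, world_dissolved, cur_hll)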
Reset/load base data
OUTPUT_CSV = OUTPUT / "user_home_locations.csv"
world = tools.get_shapes("world", shape_dir=OUTPUT / "shapes")
world.to_crs(CRS_PROJ, inplace=True)
world['geometry'] = world.buffer(0.0001)
world_dissolved = world.dissolve(by='country')
Optionally resume from previous run
def check_output() -> set:
    """Return the set of user_guids already present in the output CSV (if any)."""
    if not OUTPUT_CSV.exists():
        return set()
    df_processed = pd.read_csv(OUTPUT_CSV)
    processed_users = set(df_processed['user_guid'])
    print(f"Found {len(processed_users)} users in existing output file. These will be skipped.")
    return processed_users
processed_users = set()
try:
    processed_users = check_output()
except pd.errors.EmptyDataError:
    print("Output file exists but is empty. Starting from scratch.")
The final loop. This opens the output file in append mode. The with statement ensures it's properly closed.
if OUTPUT_CSV.exists():
print("Skipping existing output file. Remove it to process again")
else:
with open(OUTPUT_CSV, 'a', newline='', encoding='utf-8') as f:
# Create a CSV writer object
writer = csv.writer(f)
# Write the header only if the file is new (i.e., was just created)
if not processed_users:
writer.writerow(['user_guid', 'country'])
# Create the user data streamer
user_generator = stream_user_locations(DE_USERDAYS)
print("\nStarting user processing...")
# Loop through every user in the huge input file
for i, user_df in enumerate(user_generator):
user_id = user_df['user_guid'].iloc[0]
if user_id in processed_users:
# Skip to the next user
continue
home_country = determine_home_country(
user_df, world_dissolved, cur_hll)
if home_country:
writer.writerow([user_id, home_country])
if (i + 1) % 100 == 0:
clear_output(wait=True)
display(f"Processed {i + 1} users...")
print(f"Processing complete. Results are in {OUTPUT_CSV}")
Running the Batch Process as a Standalone Script¶
For a long-running task like this, it is recommended to execute the core logic as a Python script within a persistent terminal session. This ensures the process continues to run even if the connection to JupyterLab is lost or the user logs out.
Using the Jupytext Paired Script
This notebook is configured with the Jupytext extension to automatically sync its content to a paired Python file at ../py/_01_clustering.py. The logic of pairing is configured in ../jupytext.toml. To ensure the script is runnable, exploratory cells intended only for interactive use in Jupyter have been tagged with active-ipynb. Jupytext is set up to exclude these tagged cells from the final Python script, leaving only the necessary code for the batch process.
Execution Steps
To run the script, connect to the Carto-Lab instance via shell and use a terminal multiplexer like byobu to create a detachable session.
# 1. Start a new persistent session
byobu
# 2. Open a shell inside the running jupyterlab container
docker compose exec jupyterlab /bin/bash
# 3. Navigate to the directory containing the paired Python script
cd /home/jovyan/work/digital_traces_map/py
# 4. Activate the correct Conda environment
conda activate worker_env
# 5. Run the script.
python _01_clustering.py
Once the script is running, you can safely disconnect from the byobu session by pressing F6. To check on the process later, reconnect to the server and run byobu to re-attach to your session.
Verify the output CSV¶
Once the script is finished, check the CSV again with:
processed_users = check_output()
These ~600k users exclude the 90% of users active in DE who did not have enough userday activity. Attempting to infer a home location for the 5.6 million casual users would have produced extremely noisy, unreliable, and often incorrect results, so excluding them is a benefit here. By filtering them out, we ensure that when our script assigns a home country to a user, it is based on a substantial amount of evidence. This significantly increases the confidence and validity of our final ~600k results.
Classify Tourists and Locals¶
The final step in this workflow is to classify users as either "Locals" or "Tourists" based on their inferred home location. The classification rule is straightforward: if a user's home is determined to be Germany, they are classified as a Local; otherwise, they are a Tourist.
This simple binary classification is a deliberate choice. While more nuanced methods (e.g., based on spatial distance or fine-grained clustering) may be applicable in other contexts, this robust, high-level scheme is the most appropriate starting point given the coarse, country-level granularity of our home location inference.
The code infrastructure established in this and the preceding notebook provides a solid foundation for future extensions, such as implementing a more detailed classification based on distance measures or sub-national clustering.
INPUT_ORIGIN_CSV = OUTPUT_CSV
OUTPUT_CLASSIFICATION_CSV = OUTPUT / "user_classification.csv"
Load the CSV
df_homes = pd.read_csv(
INPUT_ORIGIN_CSV,
header=0, # Explicitly state the first row (index 0) is the header.
skiprows=[1] # Skip the second row (index 1), which is a duplicate header accidentally added in our above script.
)
df_homes.head()
Default classification as Tourist, then select the subset with Germany as origin and classify as Local.
df_homes['classification'] = 'Tourist'
df_homes.loc[
df_homes['country'] == 'Germany',
'classification'] = 'Local'
Write to new output file.
usecols = ["user_guid", "classification"]
df_homes.to_csv(OUTPUT_CLASSIFICATION_CSV, index=False, columns=usecols)
We can also visualize the distribution of Locals vs. Tourists in our dataset.
classification_counts = df_homes['classification'].value_counts()
print("Distribution of Users:")
print(classification_counts)
ratio_value = classification_counts.iloc[0] / classification_counts.iloc[1]
larger_group_name = classification_counts.index[0]
smaller_group_name = classification_counts.index[1]
print(f"The ratio of {larger_group_name}s to {smaller_group_name}s is {ratio_value:.1f} to 1.")
This matches what other publications have reported as well: a roughly 1:2 to 1:3 distribution of Locals vs. Tourists in Geo-Social Media data.
plt.figure(figsize=(9, 4))
sns.set_theme(style="white")
plt.rcParams['font.family'] = 'serif'
plt.rcParams['font.serif'] = ['Times New Roman'] + plt.rcParams['font.serif']
plt.rcParams.update({
"axes.labelsize": 14,
"axes.titlesize": 16,
"xtick.labelsize": 12,
"ytick.labelsize": 12,
"legend.fontsize": 12
})
order = df_homes['classification'].value_counts().sort_values().index
ax = sns.countplot(
x='classification',
data=df_homes,
order=order,
hue='classification',
palette=sns.color_palette("vlag", n_colors=2)[::-1], # reversed colors
legend=False
)
ax.yaxis.set_major_formatter(
mticker.FuncFormatter(lambda x, pos: f'{x:,.0f}'))
fig = ax.get_figure()
# ax.set_title(
# 'Distribution of Locals vs. Tourists in Germany Dataset',
# fontsize=16)
ax.set_xlabel('User Classification', fontsize=12)
ax.set_ylabel('Number of Users', fontsize=12)
for p in ax.patches:
ax.annotate(
f'{p.get_height():,.0f}',
(p.get_x() + p.get_width() / 2., p.get_height()),
ha='center', va='center',
xytext=(0, 9), textcoords='offset points',
fontsize=16, fontweight='bold')
sns.despine()
plt.show()
tools.save_fig(fig, output=OUTPUT, name="barplot_locals_tourists_de")
Create notebook HTML¶
!jupyter nbconvert --to html_toc \
--output-dir=../resources/html/ ./01_clustering.ipynb \
--template=../nbconvert.tpl \
--ExtractOutputPreprocessor.enabled=False >&- 2>&-