Reddit query using pmaw + praw ¶
Alexander Dunkel, Leibniz Institute of Ecological Urban and Regional Development,
Transformative Capacities & Research Data Centre (IÖR-FDZ)
Publication:
Dunkel, A., Burghardt, D. (2024). Assessing perceived landscape change from opportunistic spatio-temporal occurrence data. Land 2024
This is part 2 of the Reddit API notebooks. In the first notebook (02_reddit_api.html), we used the official Reddit API. Since the official API is limited to the latest 1000 entries, we use the Pushshift.io API here to retrieve all posts for a given subreddit:
- Pushshift Reddit API Documentation
- pmaw - Multithread
- praw (the original Reddit API) is used to collect additional attributes
Prepare environment¶
We import the dependencies required to run this notebook in the cells below.
import os
import time
import sys
import logging
import json
import calendar
import pprint
import datetime as dt
from pathlib import Path
from typing import List, Tuple, Dict, Optional, Set
from IPython.display import clear_output, display, HTML, Markdown
module_path = str(Path.cwd().parents[0] / "py")
if module_path not in sys.path:
    sys.path.append(module_path)
from modules.base.tools import display_file
Activate autoreload of changed python files:
%load_ext autoreload
%autoreload 2
Parameters¶
Define initial parameters that affect processing
WORK_DIR = Path.cwd().parents[0] / "tmp" # Working directory
OUTPUT = Path.cwd().parents[0] / "out" # Define path to output directory (figures etc.)
WORK_DIR.mkdir(exist_ok=True)
OUTPUT.mkdir(exist_ok=True)
Environment setup¶
Install pmaw from pip to the existing praw env.
%%bash
if [ ! -d "/envs/praw/lib/python3.9/site-packages/pmaw" ]; then
    /envs/praw/bin/python -m pip install pmaw > /dev/null 2>&1
else
    echo "Already installed."
fi
# link
if [ ! -d "/root/.local/share/jupyter/kernels/praw_env" ]; then
    echo "Linking environment to jupyter"
    /envs/praw/bin/python -m ipykernel install --user --name=praw_env
else
    echo "Already linked."
fi
Pushshift API¶
from pmaw import PushshiftAPI
API = PushshiftAPI()
api.praw can be used to enrich the Pushshift items retrieved with metadata directly from Reddit:
from dotenv import load_dotenv
load_dotenv(
    Path.cwd().parents[0] / '.env', override=True)
CLIENT_ID = os.getenv("CLIENT_ID")
CLIENT_SECRET = os.getenv("CLIENT_SECRET")
USER_AGENT = os.getenv("USER_AGENT")
REFRESH_TOKEN = os.getenv("REFRESH_TOKEN")
import praw
REDDIT = praw.Reddit(
    client_id=CLIENT_ID,
    client_secret=CLIENT_SECRET,
    user_agent=USER_AGENT,
    refresh_token=REFRESH_TOKEN
)
API.praw = REDDIT
Search submissions¶
before = int(dt.datetime(2021,4,11,0,0).timestamp())
after = int(dt.datetime(2021,1,1,0,0).timestamp())
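The two bounds above are epoch seconds built from naive datetime objects, which Python interprets in the local timezone of the machine running the notebook. If reproducible bounds across machines matter, one option is to attach UTC explicitly. The following is a minimal sketch of that variant; the names before_utc and after_utc are illustrative and do not appear in the notebook.

```python
import datetime as dt

# Attach UTC explicitly so the epoch values do not depend on the local timezone
before_utc = int(dt.datetime(2021, 4, 11, 0, 0, tzinfo=dt.timezone.utc).timestamp())
after_utc = int(dt.datetime(2021, 1, 1, 0, 0, tzinfo=dt.timezone.utc).timestamp())

# Number of whole days covered by the query range
days_covered = (before_utc - after_utc) // 86400
print(after_utc, before_utc, days_covered)
```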
We will query the following subreddits (based on Wikipedia's list, sorted by visitors descending):
- r/yosemite
- r/yellowstone
- r/GlacierNationalPark
- r/ZionNP, r/ZionNationalPark/
- r/GSMNP (Great Smoky Mountains National Park)
- r/grandcanyon, r/GrandCanyonNP, r/Grandcanyonhiking
- r/RockyMountain
- r/acadianationalpark
- r/GrandTetonNatlPark, r/GrandTeton
- r/IndianaDunes
- r/JoshuaTree
- r/OlympicNationalPark
- (r/CuyahogaFalls)
- (r/HotSprings)
- r/BryceCanyon
- r/archesnationalpark, r/arches
- r/NewRiverGorgeNP
- r/Mount_Rainier
- r/shenandoah, r/ShenandoahPark
- r/CapitolReefNP, r/capitolreef
- r/DeathValleyNP, r/deathvalley
- r/Sequoia, r/KingsCanyon
- r/Everglades
- r/Canyonlands
- r/haleakala
- r/CraterLake
- r/PetrifiedForest
- r/BigBendNationalPark (deprecated), r/BigBend, BigBendTX
- r/MammothCave
- r/Redwoodnationalpark
- r/KenaiPeninsula
- r/lassenvolcanic
- r/CarlsbadCavernsNP
- r/PinnaclesNP, r/Pinnacles
- r/virginislands
- r/GreatBasinStories
- r/glacier
- r/isleroyale
- r/northcascades
- r/AmericanSamoa
* Crossed entries have been excluded due to too little content (submissions < 50)
** Entries in parentheses are alternative subreddits, or subreddits referring to popular landscape features within National Parks
SUBREDDIT = "RockyMountain"
Get the creation date and number of members using praw (the original Reddit API):
sub_yosemite = REDDIT.subreddit(SUBREDDIT)
print(f'Created: {dt.datetime.fromtimestamp(sub_yosemite.created_utc):%Y-%m-%d}')
print(f'Subscribers ("Members"): {sub_yosemite.subscribers}')
(OUTPUT / SUBREDDIT).mkdir(exist_ok=True)
%%time
all_submissions = list(API.search_submissions(subreddit=SUBREDDIT, until=before, since=after))
print(f'{len(all_submissions)}')
Store data to json¶
Select fields to store below.
SUBMISSION_FIELDS = (
    'id', 'created_utc', 'author_flair_text', 'author', 'is_original_content', 'is_self',
    'link_flair_text', 'name', 'num_comments', 'permalink', 'media', 'over_18', 'score',
    'selftext', 'title', 'total_awards_received', 'url', 'view_count', 'thumbnail')
def get_new_filename(name: Path) -> Path:
    """Append a two-digit counter to the file stem until the name is unused"""
    counter = 0
    while Path(name.parent / f'{name.stem}_{counter:02}{name.suffix}').exists():
        counter += 1
    return Path(name.parent / f'{name.stem}_{counter:02}{name.suffix}')
def write_json(
        items: List[Dict[str, str]], name: str, output: Path = OUTPUT,
        submission_fields: Tuple[str, ...] = SUBMISSION_FIELDS, subreddit: str = SUBREDDIT):
    """Filter attributes and write list of dictionaries as json dump to disk"""
    list_of_items = []
    for submission in items:
        # the author is stored as str, all other fields are kept as-is
        sub_dict = {
            field:str(submission[field]) if field == 'author' \
            else submission[field] for field in submission_fields}
        sub_dict['subreddit'] = subreddit
        list_of_items.append(sub_dict)
    filename = output / subreddit / name
    if filename.exists():
        # do not overwrite results from earlier runs
        filename = get_new_filename(filename)
    with open(filename, 'w') as f:
        json.dump(list_of_items, f)
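The counter-based naming used above can be tried in isolation. The following is a minimal sketch in a temporary directory, assuming the same `_00`, `_01`, ... suffix scheme; next_free_name is a hypothetical stand-in written for this example, not the notebook's own function.

```python
import tempfile
from pathlib import Path


def next_free_name(name: Path) -> Path:
    """Return the first '<stem>_<NN><suffix>' path that does not exist yet."""
    counter = 0
    while True:
        candidate = name.parent / f"{name.stem}_{counter:02}{name.suffix}"
        if not candidate.exists():
            return candidate
        counter += 1


with tempfile.TemporaryDirectory() as tmp:
    base = Path(tmp) / "reddit_example.json"
    first = next_free_name(base)
    first.touch()  # simulate a file written by an earlier run
    second = next_free_name(base)
    names = (first.name, second.name)

print(names)
```

Because existing files are never overwritten, re-running a query for the same time window leaves the earlier result on disk, so duplicates may need to be removed in a later processing step.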
Loop through all months¶
Let's define a few functions to automate looping through months and storing data to json on disk.
We first need a few methods to catch the Pushshift API RuntimeError, in case not all Pushshift shards are active. In these cases, we want to wait for some time and query again, or ask the user how to proceed.
class WaitingException(Exception):
    pass
def get_submissions(
        subreddit: str, before: int, after: int, api: PushshiftAPI) -> List[Dict[str, str]]:
    """Retrieve submissions from PushshiftAPI, given a subreddit and a before and after date"""
    return list(api.search_submissions(subreddit=subreddit, until=before, since=after))
def query(
        subreddit: str, before: int, after: int,
        api: PushshiftAPI, num_retries: int = 3) -> List[Dict[str, str]]:
    """Query pushshift API until valid query returned,
    wait if RuntimeError occurs (not all shards active)
    """
    for ix in range(num_retries):
        try:
            return get_submissions(
                subreddit=subreddit, before=before, after=after, api=api)
        except RuntimeError:
            if ix < (num_retries - 1):
                print(
                    f"Waiting for all PushShift shards "
                    f"to become active again (try {ix+1} of {num_retries})")
                if ix == 0:
                    print("Sleeping for 10 Minutes")
                    time.sleep(600)
                elif ix == 1:
                    print("Sleeping for 2 Hours")
                    time.sleep(7200)
                elif ix == 2:
                    # only reached if num_retries > 3
                    print("Sleeping for 24 Hours")
                    time.sleep(86400)
            else:
                raise WaitingException()
def query_pushshift(
        subreddit: str, before: int, after: int,
        api: PushshiftAPI, num_retries: int = 3) -> Optional[List[Dict[str, str]]]:
    """Query pushshift API until valid query returned or user explicitly cancels query

    Returns None if the user cancels.
    """
    while True:
        # as long as either valid query returned or user cancels
        try:
            return query(
                subreddit=subreddit, before=before, after=after,
                api=api, num_retries=num_retries)
        except WaitingException:
            print(
                f"Waited {num_retries} times and still "
                f"not all PushShift shards are active.")
            while True:
                # as long as user enters yes/no
                pick = input("Continue waiting? (y/n)").lower()
                if pick in ('yes', 'y'):
                    print("Repeating query...")
                    # leave the prompt loop, the outer loop repeats the query
                    break
                elif pick in ('no', 'n'):
                    print("Canceling query..")
                    return None
                else:
                    print("Select either yes (y) or no (n).")
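The wait-and-retry pattern above can also be written with the wait times passed in as a schedule, which makes it testable without sleeping. This is a minimal sketch under that assumption; retry_with_schedule, RetriesExhausted and the flaky stand-in function are hypothetical names for illustration and are not part of pmaw or of this notebook.

```python
import time
from typing import Callable, Sequence, TypeVar

T = TypeVar("T")


class RetriesExhausted(Exception):
    """Raised when every attempt failed (hypothetical name)."""


def retry_with_schedule(
        func: Callable[[], T],
        waits: Sequence[float] = (600, 7200),
        sleep: Callable[[float], None] = time.sleep) -> T:
    """Call func; on RuntimeError wait waits[i] seconds and try again.

    Makes len(waits) + 1 attempts in total.
    """
    for attempt in range(len(waits) + 1):
        try:
            return func()
        except RuntimeError:
            if attempt == len(waits):
                # no wait time left: give up
                raise RetriesExhausted() from None
            sleep(waits[attempt])


# Stand-in for an API call that fails twice before succeeding
calls = {"n": 0}


def flaky() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("not all shards active")
    return "ok"


slept = []  # record the requested waits instead of actually sleeping
result = retry_with_schedule(flaky, waits=(1, 2), sleep=slept.append)
print(result, slept)
```

Injecting the sleep function keeps the example fast; in a real run the default time.sleep would be used.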
def query_time(api: PushshiftAPI = API, subreddit: str = SUBREDDIT,
               start_year: int = 2010, end_year: int = 2023,
               start_month: int = 1, end_month: int = 12, reddit: praw.Reddit = REDDIT) -> int:
    """Given a start and end date, loop through years/months
    in reverse order and store results to disk.
    Returns total queried count (int) per subreddit.
    """
    total_queried = 0
    zero_received = 0
    query_delta = 4 # weeks
    _exit = False
    created_date = dt.datetime.fromtimestamp(
        reddit.subreddit(subreddit).created_utc)
    min_date = dt.datetime(start_year, start_month, 1, 0, 0)
    if min_date < created_date:
        # check if min date is below creation of subreddit
        min_date = created_date
        print(
            f'[{subreddit}] Limited to lowest available date for this subreddit: '
            f'{min_date.year}-{min_date.month}')
    _, max_day = calendar.monthrange(end_year, end_month)
    max_date = dt.datetime(end_year, end_month, max_day, 0, 0)
    query_max_date = max_date
    query_min_date = max_date - dt.timedelta(weeks=4)
    while query_min_date >= min_date and not _exit:
        if query_min_date == min_date:
            # last round
            _exit = True
        # limit query by time period, day accuracy, 1 day overlap buffer
        before = int(
            (dt.datetime(
                query_max_date.year, query_max_date.month, query_max_date.day, 0, 0) + \
            dt.timedelta(days=1)).timestamp())
        after = int(
            dt.datetime(
                query_min_date.year, query_min_date.month, query_min_date.day, 0, 0).timestamp())
        print(
            f'Querying between {query_min_date.year}-{query_min_date.month}-{query_min_date.day} (min) '
            f'and {query_max_date.year}-{query_max_date.month}-{query_max_date.day} (max)')
        all_submissions = query_pushshift(
            subreddit=subreddit, before=before, after=after, api=api, num_retries=3)
        if all_submissions is None:
            # query was cancelled: stop here, there is nothing to count or write
            break
        count = len(all_submissions)
        total_queried += count
        # clear jupyter output
        clear_output(wait=True)
        print(
            f'[{subreddit}] Retrieved {count} submissions between '
            f'{query_min_date.year}-{query_min_date.month}-{query_min_date.day} '
            f'and {query_max_date.year}-{query_max_date.month}-{query_max_date.day}. '
            f'Total queried: {total_queried:,}')
        if count == 0:
            zero_received += 1
            if zero_received > 12:
                # if none received for more than 12 queries in a row,
                # exit early
                print(f'Exiting early at {query_min_date.year}-{query_min_date.month}')
                _exit = True
        else:
            zero_received = 0
        if count > 0 and count < 100:
            ratio = int(100/count)
            # increase timespan queried, based on counts retrieved;
            # each single increase is capped at 52 weeks (about 12 months)
            query_delta += min(ratio*4, 52)
        # query_delta can only go up, but not down
        # write data
        write_json(
            items=all_submissions,
            name=f'reddit_{subreddit}_{query_min_date.year}-{query_min_date.month}-{query_min_date.day}.json',
            subreddit=subreddit)
        # update query_min_date and query_max_date
        query_max_date = query_min_date
        query_min_date = query_max_date - dt.timedelta(weeks=query_delta)
        if query_min_date < min_date:
            query_min_date = min_date
    return total_queried
SUBREDDIT = 'GrandTetonNatlPark'
(OUTPUT / SUBREDDIT).mkdir(exist_ok=True)
query_time(start_year=2010, start_month=1, end_year=2023, end_month=4, subreddit=SUBREDDIT, reddit=REDDIT)
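The core of the loop above is a walk backwards through time in fixed-width windows, clamped at the earliest allowed date. The following is a minimal sketch of that idea alone, with a constant window width and no API calls; iter_windows is a hypothetical helper written for this example, and it omits the adaptive widening and the one-day overlap buffer used in query_time.

```python
import datetime as dt
from typing import Iterator, Tuple


def iter_windows(
        min_date: dt.datetime, max_date: dt.datetime,
        weeks: int = 4) -> Iterator[Tuple[dt.datetime, dt.datetime]]:
    """Yield (window_start, window_end) pairs from max_date back to min_date."""
    window_end = max_date
    while window_end > min_date:
        # never step past the earliest allowed date
        window_start = max(window_end - dt.timedelta(weeks=weeks), min_date)
        yield window_start, window_end
        window_end = window_start


windows = list(iter_windows(
    min_date=dt.datetime(2023, 1, 1), max_date=dt.datetime(2023, 4, 30), weeks=4))
for start, end in windows:
    print(f'{start:%Y-%m-%d} to {end:%Y-%m-%d}')
```

The last window is shorter than four weeks because it is clamped to the minimum date.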
Retrieve comments for submissions¶
We can use the original Reddit API to get comments for submissions, since most submissions contain fewer than 1000 comments.
Have a look at the praw comments docs.
Test this for the submission with id (a2s0cb), which has 18 comments.
submission = REDDIT.submission("a2s0cb")
submission.comments.replace_more(limit=None)
Have a look at the available attributes for comments:
display(Markdown(
    f'<details><summary>Click</summary>\n\n```\n\n'
    f'{pprint.pformat(vars(submission.comments[0]), indent=4)}\n\n```\n\n</details>'))
Filter for selected comment fields
COMMENTS_FIELDS = (
    'id', 'created_utc', 'author_flair_text', 'author', 'is_submitter',
    'name', 'parent_id', 'permalink', 'score',
    'body', 'total_awards_received', 'ups', 'downs', 'likes')
Traverse stored json and retrieve all comments¶
The last step is to loop through the stored json for all submissions to retrieve all comments.
from prawcore.exceptions import ServerError, Forbidden, RequestException
def write_list(list_of_items: List[Dict[str, str]], output: Path):
    """Write list of json items as dump to disk"""
    if not list_of_items:
        return
    filename = output / 'reddit_comments.json'
    if filename.exists():
        filename = get_new_filename(filename)
    with open(filename, 'w') as f:
        json.dump(list_of_items, f)
    print(f'Wrote {len(list_of_items)} comments to {filename.name}')
def filter_comments_json(
        items: List[Dict[str, str]], submission_id: str,
        list_of_items: List[Dict[str, str]], comments_fields: Tuple[str, ...] = COMMENTS_FIELDS):
    """Filter attributes of dictionaries as json per 1000 batch"""
    if not items:
        return
    for comment in items:
        # initialize actual values from lazy fields API
        comment = vars(comment)
        # the author is converted to str, all other fields are kept as-is
        sub_dict = {
            field:str(comment[field]) if field == 'author' else \
            comment[field] for field in comments_fields}
        sub_dict['submission_id'] = submission_id
        list_of_items.append(sub_dict)
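The field filtering above can be tried without network access by using a stand-in object in place of a praw comment. This is a minimal sketch under that assumption; pick_fields, FakeAuthor and the reduced FIELDS tuple are hypothetical and only mimic the shape of the data, they are not praw classes.

```python
from types import SimpleNamespace
from typing import Any, Dict, Tuple

FIELDS: Tuple[str, ...] = ("id", "author", "body", "score")


class FakeAuthor:
    """Stand-in for an author object that is converted with str()."""

    def __init__(self, name: str):
        self.name = name

    def __str__(self) -> str:
        return self.name


def pick_fields(obj: Any, fields: Tuple[str, ...] = FIELDS) -> Dict[str, Any]:
    """Keep only the selected attributes; the author is stored as str."""
    attrs = vars(obj)
    return {
        field: str(attrs[field]) if field == "author" else attrs[field]
        for field in fields}


comment = SimpleNamespace(
    id="abc123", author=FakeAuthor("some_user"),
    body="Nice view!", score=7, _internal="dropped")
record = pick_fields(comment)
print(record)
```

Converting the author to a string matters because the resulting dictionary is later passed to json.dump, which cannot serialize arbitrary objects.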
def query_comments(submission_id: str, reddit: praw.Reddit, num_retries: int = 3):
    """Query a single submission for all comments

    Returns None if the submission is skipped.
    """
    for ix in range(num_retries):
        try:
            submission = reddit.submission(submission_id)
            submission.comments.replace_more(limit=None)
            return submission.comments.list()
        except (ServerError, RequestException) as e:
            # exception instances have no __name__, use the class name
            print(f"Received {type(e).__name__}")
            if ix < (num_retries - 1):
                print(
                    f"Waiting for the Reddit API to become responsive again "
                    f"(try {ix+1} of {num_retries})")
                if ix == 0:
                    print("Sleeping for 1 Minutes")
                    time.sleep(60)
                elif ix == 1:
                    print("Sleeping for 10 Minutes")
                    time.sleep(600)
                elif ix == 2:
                    # only reached if num_retries > 3
                    print("Sleeping for 1 Hours")
                    time.sleep(3600)
            else:
                raise WaitingException()
        except Forbidden:
            if ix < (num_retries - 1):
                print(
                    f"Received a Forbidden Exception. "
                    f"(try {ix+1} of {num_retries})")
                if ix == 0:
                    print("Trying one more time after 1 Minute.. ")
                    time.sleep(60)
                else:
                    print("Skipping entry.. ")
                    return None
def get_all_comments(
        submission_id: str, list_of_items: List[Dict[str, str]],
        output: Path, total_ct: int, perc: str, reddit: praw.Reddit) -> int:
    """Get all comments for submission"""
    all_comments = query_comments(submission_id=submission_id, reddit=reddit)
    if all_comments is None:
        return 0
    filter_comments_json(
        items=all_comments, submission_id=submission_id, list_of_items=list_of_items)
    if len(list_of_items) > 1000:
        # flush the batch to disk and start a new one
        write_list(list_of_items=list_of_items, output=output)
        list_of_items.clear()
    clear_output(wait=True)
    comments_count = len(all_comments)
    print(
        f'Retrieved {comments_count} comments for {submission_id}. '
        f'Total comments queried: {total_ct+comments_count:,} - {perc} files.', end='\r')
    return comments_count
See if some submissions have already been processed
SUBREDDIT = "GlacierNationalPark"
processed_file = OUTPUT / SUBREDDIT / "00_processed_submissions.txt"
already_processed = set()
if processed_file.exists():
    with open(processed_file, "r") as f:
        already_processed = set(line.strip() for line in f)
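The processed-ids file acts as a simple checkpoint: ids are appended as soon as a submission is done, and read back into a set on the next start. The following is a minimal sketch of that round trip in a temporary directory; load_processed and mark_processed are hypothetical helper names, and the ids are made up for the example.

```python
import tempfile
from pathlib import Path
from typing import Set


def load_processed(path: Path) -> Set[str]:
    """Read one id per line; a missing file means nothing was processed yet."""
    if not path.exists():
        return set()
    with open(path, "r") as f:
        return {line.strip() for line in f if line.strip()}


def mark_processed(path: Path, item_id: str, seen: Set[str]) -> None:
    """Append the id to the log right away, so an interrupted run can resume."""
    with open(path, "a") as f:
        f.write(f"{item_id}\n")
    seen.add(item_id)


with tempfile.TemporaryDirectory() as tmp:
    log = Path(tmp) / "00_processed_submissions.txt"
    seen = load_processed(log)       # first run: empty set
    for sub_id in ("a1", "b2"):
        mark_processed(log, sub_id, seen)
    resumed = load_processed(log)    # simulated restart

print(sorted(resumed))
```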
Loop through all submissions
def get_comments_subreddit(
        already_processed: Set[str], subreddit: str = SUBREDDIT, output: Path = OUTPUT, reddit: praw.Reddit = REDDIT):
    """Parse a list of submissions (json), stored in a folder for a subreddit,
    and retrieve comments as json from Reddit's original API"""
    list_of_items = []
    total_ct = 0
    output_comments = output / subreddit / "comments"
    output_comments.mkdir(exist_ok=True)
    start_with_ix = 0
    files = list(reversed(sorted((output / subreddit).glob("*.json"))))
    print(f"Processing {len(files)} json files for subreddit {subreddit}")
    skipped = 0
    for ix, json_file in enumerate(files):
        if ix < start_with_ix:
            continue
        with open(json_file, 'r') as f:
            submissions = json.load(f)
        if len(submissions) == 0:
            continue
        perc = f'{ix} of {len(files)}'
        for submission_json in submissions:
            sub_id = submission_json['id']
            if sub_id in already_processed:
                skipped += 1
                continue
            if skipped:
                print(f'Skipped {skipped} submission ids that have already been processed')
                skipped = 0
            total_ct += get_all_comments(sub_id, list_of_items, output_comments, total_ct, perc, reddit)
            # record the id immediately, so the run can be resumed
            with open(output / subreddit / "00_processed_submissions.txt", "a") as cfile:
                cfile.write(f'{sub_id}\n')
            already_processed.add(sub_id)
        print(f'\nFinished {json_file.name}')
    print('Writing remaining')
    write_list(list_of_items=list_of_items, output=output_comments)
    print(f'Finished retrieving all comments for {subreddit}')
get_comments_subreddit(
    already_processed=already_processed, subreddit=SUBREDDIT, output=OUTPUT)
Make this notebook executable via cli¶
Create a list of all park reddits to query
PARKS_SUBREDDITS: List[str] = [
    'GrandTetonNatlPark', 'GrandTeton', 'JoshuaTree', 'OlympicNationalPark',
    'CuyahogaFalls', 'HotSprings', 'BryceCanyon', 'archesnationalpark', 'arches',
    'NewRiverGorgeNP', 'Mount_Rainier', 'shenandoah', 'ShenandoahPark', 'CapitolReefNP',
    'DeathValleyNP', 'deathvalley', 'Sequoia', 'Everglades', 'Canyonlands', 'haleakala',
    'CraterLake', 'BigBendNationalPark', 'BigBend', 'MammothCave', 'Redwoodnationalpark',
    'KenaiPeninsula', 'lassenvolcanic', 'CarlsbadCavernsNP', 'PinnaclesNP', 'virginislands',
    'GreatBasinStories', 'glacier', 'isleroyale', 'northcascades', 'AmericanSamoa']
Create a python script and import methods from this notebook. All variables and methods in cells not tagged with active-ipynb will be loaded and available.
Query for submissions:
display_file(Path.cwd().parents[0] / 'py' / 'get_all_submissions.py')
Query for comments:
display_file(Path.cwd().parents[0] / 'py' / 'get_all_comments.py')
To run:
- cd into the py/ directory
- activate the praw env (e.g. conda activate praw/)
- python get_all_submissions.py
- python get_all_comments.py
Create a zip file with all retrieved jsons¶
from modules.base.tools import zip_dir
%%time
from datetime import date
today = str(date.today())
zip_file = OUTPUT / SUBREDDIT / f'{today}_{SUBREDDIT}_submissions_all.zip'
if not zip_file.exists():
    zip_dir(OUTPUT / SUBREDDIT, zip_file)
zip_file = OUTPUT / SUBREDDIT / f'{today}_{SUBREDDIT}_comments_all.zip'
if not zip_file.exists():
    zip_dir(OUTPUT / SUBREDDIT / "comments", zip_file)
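The zip_dir helper is imported from the notebook's own modules.base.tools and its implementation is not shown here. As a rough idea of what such a step can look like, the following is a minimal sketch using only the standard library; zip_json_files is a hypothetical function that may differ from zip_dir in which files it selects and how it names archive members.

```python
import tempfile
import zipfile
from pathlib import Path


def zip_json_files(src_dir: Path, zip_path: Path) -> int:
    """Write all *.json files directly inside src_dir to zip_path; return the file count."""
    count = 0
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
        for json_file in sorted(src_dir.glob("*.json")):
            # store files flat, without the parent folder
            zf.write(json_file, arcname=json_file.name)
            count += 1
    return count


with tempfile.TemporaryDirectory() as tmp:
    src = Path(tmp) / "sub"
    src.mkdir()
    (src / "a.json").write_text("[]")
    (src / "b.json").write_text("[]")
    archive = Path(tmp) / "sub_all.zip"
    n_files = zip_json_files(src, archive)
    with zipfile.ZipFile(archive) as zf:
        archived = sorted(zf.namelist())

print(n_files, archived)
```

In this sketch the archive is written outside the folder being zipped, which avoids picking up earlier archives on a second run.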
Create notebook HTML¶
!jupyter nbconvert --to html_toc \
--output-dir=../resources/html/ ./03_reddit_pmaw.ipynb \
--template=../nbconvert.tpl \
--ExtractOutputPreprocessor.enabled=False >&- 2>&-