rpmjp/projects/communityshield/populate_rollups.py
Completed · May – August 2025
CommunityShield
ML-powered crime pattern explorer for Chicago. 8.5M rows, 4 XGBoost models with SHAP explanations, beat-level heatmap, and an honest methodology page about what the data can and cannot tell you.
Python 3.12 · FastAPI · PostgreSQL 16 · PostGIS · XGBoost · SHAP · React 19 · MapLibre GL
Languages
TypeScript 52.4%
Python 41.8%
CSS 3.2%
Other 2.6%
populate_rollups.py
"""Populate beat_rollups from the crimes table.

One big aggregation query. Idempotent: clears prior rollups for the city
first, then rebuilds them in a single INSERT ... SELECT.

This is the script that materializes the 7.8M rollup rows the heatmap
endpoint reads from. Running time is 1-3 minutes against 8.5M source rows
because Postgres can do the GROUP BY entirely in-memory — the working set
for the aggregation is small once it's keyed by (beat, year, month, hour,
day_of_week, primary_type).

The alternative — incremental upserts on every ingest — was considered and
rejected: a single full rebuild every 24 hours is operationally simpler,
impossible to get wrong, and runs in less time than the ingest itself.
"""
from __future__ import annotations
import argparse
import time
import uuid
import psycopg
from app.config import get_settings
def get_city_id(conn: psycopg.Connection, slug: str) -> uuid.UUID:
    """Return the primary key of the city identified by *slug*.

    Args:
        conn: Open psycopg connection used to run the lookup.
        slug: Value matched against ``cities.slug``.

    Returns:
        The ``id`` column of the matching ``cities`` row (annotated as a
        UUID; the actual type is whatever the driver returns for that column).

    Raises:
        RuntimeError: If no row in ``cities`` has the given slug.
    """
    with conn.cursor() as cur:
        # Parameterized query: the slug is passed separately, never
        # interpolated into the SQL text.
        cur.execute("SELECT id FROM cities WHERE slug = %s", (slug,))
        row = cur.fetchone()

    # fetchone() returns None when the result set is empty.
    if row is None:
        raise RuntimeError(f"City '{slug}' not found.")
    return row[0]
def populate(city_slug: str = "chicago") -> None:
    """Rebuild the ``beat_rollups`` rows for one city from ``crimes``.

    Deletes the city's existing rollups, re-aggregates them with a single
    ``INSERT ... SELECT``, then commits. Progress is reported via ``print``.

    Args:
        city_slug: Slug of the city to rebuild; resolved against ``cities``.

    Raises:
        RuntimeError: If ``city_slug`` does not match any city.
    """
    settings = get_settings()
    # SQLAlchemy URL has the +psycopg driver suffix; psycopg expects the bare URL.
    db_url = settings.database_url.replace("postgresql+psycopg://", "postgresql://")

    with psycopg.connect(db_url) as conn:
        city_id = get_city_id(conn, city_slug)
        print(f"City '{city_slug}': {city_id}")

        with conn.cursor() as cur:
            # Idempotency: clear this city's rollups before rebuilding.
            # Scoped by city_id so other cities aren't affected.
            # The DELETE and the INSERT below run on the same connection
            # ahead of the single commit; under psycopg's default
            # (non-autocommit) mode that makes them one transaction.
            print("Clearing existing rollups for this city...")
            cur.execute("DELETE FROM beat_rollups WHERE city_id = %s", (city_id,))

            print("Aggregating crimes into beat_rollups (this takes 1-3 minutes)...")
            # perf_counter is monotonic, so the reported duration cannot be
            # skewed by wall-clock adjustments during the run.
            start = time.perf_counter()

            # The aggregation. Notes on each piece:
            #   - EXTRACT(ISODOW) yields 1..7 (Mon..Sun); subtracting 1 gives
            #     the 0..6 (Mon..Sun) range the frontend filter uses.
            #   - count(*) FILTER (WHERE ...) avoids needing separate queries
            #     for arrest_count and domestic_count — one pass, one row out.
            #   - GROUP BY includes every dimension the heatmap endpoint can
            #     filter on. The composite index on beat_rollups matches this
            #     grouping exactly, so reads are O(log n) on the rollup.
            #   - NOTE: month, hour and day_of_week in GROUP BY resolve to the
            #     SELECT aliases only while `crimes` has no columns with those
            #     names; Postgres prefers an input column when a GROUP BY name
            #     is ambiguous.
            cur.execute(
                """
                INSERT INTO beat_rollups (
                    id, city_id, beat_number, year, month, hour, day_of_week,
                    primary_type, incident_count, arrest_count, domestic_count
                )
                SELECT
                    gen_random_uuid(),
                    city_id,
                    beat,
                    year,
                    EXTRACT(MONTH FROM occurred_at)::int AS month,
                    EXTRACT(HOUR FROM occurred_at)::int AS hour,
                    EXTRACT(ISODOW FROM occurred_at)::int - 1 AS day_of_week,
                    primary_type,
                    count(*) AS incident_count,
                    count(*) FILTER (WHERE arrest = true) AS arrest_count,
                    count(*) FILTER (WHERE domestic = true) AS domestic_count
                FROM crimes
                WHERE city_id = %s
                    AND beat IS NOT NULL
                    AND occurred_at IS NOT NULL
                    AND year IS NOT NULL
                GROUP BY city_id, beat, year, month, hour, day_of_week, primary_type
                """,
                (city_id,),
            )
            # For INSERT ... SELECT, rowcount is the number of rows inserted.
            inserted = cur.rowcount

        conn.commit()
        # Measured after the commit so the figure covers the durable write.
        elapsed = time.perf_counter() - start
        print(f"Inserted {inserted:,} rollup rows in {elapsed:.1f}s")
def main() -> None:
    """Parse ``--city`` from the command line and rebuild that city's rollups."""
    parser = argparse.ArgumentParser(
        description="Rebuild beat_rollups for one city from the crimes table.",
    )
    parser.add_argument(
        "--city",
        type=str,
        default="chicago",
        help="Slug of the city to rebuild (default: %(default)s).",
    )
    args = parser.parse_args()
    populate(args.city)
# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()