rpmjp/portfolio
rpmjp/projects/communityshield/populate_rollups.py
Completed: May – August 2025

CommunityShield

ML-powered crime pattern explorer for Chicago. 8.5M rows, 4 XGBoost models with SHAP explanations, beat-level heatmap, and an honest methodology page about what the data can and cannot tell you.

Python 3.12 · FastAPI · PostgreSQL 16 · PostGIS · XGBoost · SHAP · React 19 · MapLibre GL
Languages
TypeScript 52.4%
Python 41.8%
CSS 3.2%
Other 2.6%
populate_rollups.py
"""Populate beat_rollups from the crimes table.

One big aggregation query. Idempotent: clears prior rollups for the city
first, then rebuilds them in a single INSERT ... SELECT.

This is the script that materializes the 7.8M rollup rows the heatmap
endpoint reads from. Running time is 1-3 minutes against 8.5M source rows
because Postgres can do the GROUP BY entirely in-memory — the working set
for the aggregation is small once it's keyed by (beat, year, month, hour,
day_of_week, primary_type).

The alternative — incremental upserts on every ingest — was considered and
rejected: a single full rebuild every 24 hours is operationally simpler,
impossible to get wrong, and runs in less time than the ingest itself.
"""
from __future__ import annotations

import argparse
import time
import uuid

import psycopg

from app.config import get_settings


def get_city_id(conn: psycopg.Connection, slug: str) -> uuid.UUID:
    """Return the ``id`` of the ``cities`` row whose ``slug`` matches.

    Args:
        conn: Open database connection used to run the lookup.
        slug: City slug to search for (e.g. ``"chicago"``).

    Returns:
        The first column of the matching row, i.e. the city's ``id``.

    Raises:
        RuntimeError: If no row is returned for ``slug``.
    """
    lookup_sql = "SELECT id FROM cities WHERE slug = %s"
    with conn.cursor() as cursor:
        cursor.execute(lookup_sql, (slug,))
        found = cursor.fetchone()
    if found:
        return found[0]
    raise RuntimeError(f"City '{slug}' not found.")


def populate(city_slug: str = "chicago") -> None:
    """Rebuild the ``beat_rollups`` rows for one city from ``crimes``.

    Resolves the city's id from its slug, deletes that city's existing
    rollups, and re-inserts them with a single ``INSERT ... SELECT``
    aggregation. The DELETE and INSERT are issued on the same connection and
    committed together after the INSERT, so re-running the script replaces
    the rollups rather than duplicating them.

    Progress and the final inserted-row count are printed to stdout.

    Args:
        city_slug: Slug of the city to rebuild. Defaults to ``"chicago"``.

    Raises:
        RuntimeError: If ``get_city_id`` finds no city for ``city_slug``.
    """
    settings = get_settings()
    # SQLAlchemy URL has the +psycopg driver suffix; psycopg expects the bare URL.
    db_url = settings.database_url.replace("postgresql+psycopg://", "postgresql://")

    with psycopg.connect(db_url) as conn:
        city_id = get_city_id(conn, city_slug)
        print(f"City '{city_slug}': {city_id}")

        with conn.cursor() as cur:
            # Idempotency: clear this city's rollups before rebuilding.
            # Scoped by city_id so other cities aren't affected.
            print("Clearing existing rollups for this city...")
            cur.execute("DELETE FROM beat_rollups WHERE city_id = %s", (city_id,))

            print("Aggregating crimes into beat_rollups (this takes 1-3 minutes)...")
            # perf_counter is monotonic, so the reported duration cannot be
            # skewed by wall-clock adjustments during a multi-minute query.
            start = time.perf_counter()

            # The aggregation. Notes on each piece:
            #   - EXTRACT(ISODOW) yields 1..7 (Mon..Sun); subtracting 1 gives
            #     0..6 (Mon..Sun), which is what the frontend filter uses.
            #     (Plain DOW is also 0..6 but starts on Sunday.)
            #   - count(*) FILTER (WHERE ...) avoids needing separate queries
            #     for arrest_count and domestic_count — one pass, one row out.
            #   - GROUP BY lists every dimension the heatmap endpoint can
            #     filter on, and repeats the derived expressions instead of
            #     their aliases. PostgreSQL resolves a bare GROUP BY name as
            #     an input column before an output alias, so grouping by
            #     "month"/"hour"/"day_of_week" would change meaning (or fail)
            #     if crimes ever gained columns with those names.
            cur.execute(
                """
                INSERT INTO beat_rollups (
                    id, city_id, beat_number, year, month, hour, day_of_week,
                    primary_type, incident_count, arrest_count, domestic_count
                )
                SELECT
                    gen_random_uuid(),
                    city_id,
                    beat,
                    year,
                    EXTRACT(MONTH FROM occurred_at)::int AS month,
                    EXTRACT(HOUR FROM occurred_at)::int AS hour,
                    EXTRACT(ISODOW FROM occurred_at)::int - 1 AS day_of_week,
                    primary_type,
                    count(*) AS incident_count,
                    count(*) FILTER (WHERE arrest = true) AS arrest_count,
                    count(*) FILTER (WHERE domestic = true) AS domestic_count
                FROM crimes
                WHERE city_id = %s
                  AND beat IS NOT NULL
                  AND occurred_at IS NOT NULL
                  AND year IS NOT NULL
                GROUP BY
                    city_id,
                    beat,
                    year,
                    EXTRACT(MONTH FROM occurred_at)::int,
                    EXTRACT(HOUR FROM occurred_at)::int,
                    EXTRACT(ISODOW FROM occurred_at)::int - 1,
                    primary_type
                """,
                (city_id,),
            )
            # rowcount after INSERT ... SELECT is the number of rows inserted.
            inserted = cur.rowcount
            conn.commit()

            elapsed = time.perf_counter() - start
            print(f"Inserted {inserted:,} rollup rows in {elapsed:.1f}s")


def main() -> None:
    """Parse the command line and rebuild rollups for the chosen city.

    Accepts a single optional ``--city`` argument (default ``"chicago"``)
    and passes its value to ``populate``.
    """
    cli = argparse.ArgumentParser()
    cli.add_argument("--city", type=str, default="chicago")
    options = cli.parse_args()
    populate(options.city)


# Run the CLI entry point only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()