rpmjp/portfolio
rpmjp/projects/communityshield/heatmap_endpoint.py
Completed · May – August 2025

CommunityShield

ML-powered crime pattern explorer for Chicago. 8.5M rows, 4 XGBoost models with SHAP explanations, beat-level heatmap, and an honest methodology page about what the data can and cannot tell you.

Python 3.12 · FastAPI · PostgreSQL 16 · PostGIS · XGBoost · SHAP · React 19 · MapLibre GL
Languages
TypeScript 52.4%
Python 41.8%
CSS 3.2%
Other 2.6%
heatmap_endpoint.py
"""Heatmap data endpoint.

Returns aggregated incident counts per beat with optional filters.
Backed by the beat_rollups table (pre-aggregated) for performance.

The critical design decision is here: this endpoint NEVER touches the raw
crimes table. Every query hits beat_rollups, which is keyed by
(city_id, beat_number, year, month, hour, day_of_week, primary_type) and
backed by a composite covering index. Typical query latency: ~15ms across
8.5M underlying rows.
"""
from __future__ import annotations

from typing import Optional

from fastapi import APIRouter, HTTPException, Query
from sqlalchemy import text

from app.db import SessionLocal
from app.schemas.heatmap import (
    BeatHeatmapCell,
    CrimeTypeOption,
    HeatmapResponse,
)


# Router for this module: every route registered on it gets the "/heatmap"
# path prefix and is tagged "heatmap".
router = APIRouter(prefix="/heatmap", tags=["heatmap"])


@router.get("", response_model=HeatmapResponse)
def get_heatmap(
    city_slug: str = Query("chicago"),
    year: Optional[int] = Query(None, description="Filter to a specific year"),
    year_from: Optional[int] = Query(None),
    year_to: Optional[int] = Query(None),
    hour_min: int = Query(0, ge=0, le=23),
    hour_max: int = Query(23, ge=0, le=23),
    primary_type: Optional[str] = Query(None, description="e.g. THEFT, BATTERY"),
):
    """Sum beat_rollups counts into one heatmap cell per beat.

    Reads only from beat_rollups (joined to cities to resolve the slug).

    Filters:
        city_slug: matched against cities.slug.
        year: exact year; mutually exclusive with year_from / year_to.
        year_from, year_to: inclusive bounds, each usable on its own.
        hour_min, hour_max: inclusive hour window. When hour_min is greater
            than hour_max the window wraps past midnight (e.g. 22 -> 5).
        primary_type: exact match on the rollup's primary_type.

    Returns:
        HeatmapResponse echoing the filters, with beats ordered by
        beat_number plus the overall incident total and the largest
        single-beat incident count. When nothing matches, beats is empty
        and both figures are 0.

    Raises:
        HTTPException: 400 when 'year' is combined with a year range.
    """
    has_year_range = not (year_from is None and year_to is None)
    if year is not None and has_year_range:
        raise HTTPException(400, "Use either 'year' or 'year_from/year_to', not both")

    # A window such as 22 -> 5 crosses midnight, so it cannot be a single
    # BETWEEN; it becomes "late enough OR early enough" instead.
    if hour_min > hour_max:
        hour_filter = "(br.hour >= :hour_min OR br.hour <= :hour_max)"
    else:
        hour_filter = "br.hour BETWEEN :hour_min AND :hour_max"

    conditions = ["c.slug = :city_slug", hour_filter]
    bind_values: dict = {
        "city_slug": city_slug,
        "hour_min": hour_min,
        "hour_max": hour_max,
    }

    # Optional filters, in the order they are appended to the WHERE clause.
    # The guard above guarantees 'year' and the range bounds are never both
    # set, so a flat pass over the table is safe.
    optional_filters = (
        ("br.year = :year", "year", year),
        ("br.year >= :year_from", "year_from", year_from),
        ("br.year <= :year_to", "year_to", year_to),
        ("br.primary_type = :primary_type", "primary_type", primary_type),
    )
    for condition, bind_name, bind_value in optional_filters:
        if bind_value is None:
            continue
        conditions.append(condition)
        bind_values[bind_name] = bind_value

    # Only fixed clause fragments are interpolated; every user-supplied
    # value travels as a bound parameter.
    statement = text(f"""
        SELECT
            br.beat_number,
            SUM(br.incident_count)::int AS incident_count,
            SUM(br.arrest_count)::int AS arrest_count,
            SUM(br.domestic_count)::int AS domestic_count
        FROM beat_rollups br
        JOIN cities c ON c.id = br.city_id
        WHERE {' AND '.join(conditions)}
        GROUP BY br.beat_number
        ORDER BY br.beat_number
    """)

    with SessionLocal() as session:
        result_rows = session.execute(statement, bind_values).fetchall()

    cells = [
        BeatHeatmapCell(
            beat_number=row.beat_number,
            incident_count=row.incident_count,
            arrest_count=row.arrest_count,
            domestic_count=row.domestic_count,
        )
        for row in result_rows
    ]
    incident_counts = [cell.incident_count for cell in cells]

    # With no matching rows this yields beats=[], total 0 and max 0.
    return HeatmapResponse(
        city_slug=city_slug,
        filters={
            "year": year,
            "year_from": year_from,
            "year_to": year_to,
            "hour_min": hour_min,
            "hour_max": hour_max,
            "primary_type": primary_type,
        },
        beats=cells,
        total_incidents=sum(incident_counts),
        max_beat_incidents=max(incident_counts, default=0),
    )