rpmjp/projects/communityshield/heatmap_endpoint.py
Completed · May – August 2025
CommunityShield
ML-powered crime pattern explorer for Chicago. 8.5M rows, 4 XGBoost models with SHAP explanations, beat-level heatmap, and an honest methodology page about what the data can and cannot tell you.
Python 3.12 · FastAPI · PostgreSQL 16 · PostGIS · XGBoost · SHAP · React 19 · MapLibre GL
Languages
TypeScript 52.4%
Python 41.8%
CSS 3.2%
Other 2.6%
heatmap_endpoint.py
"""Heatmap data endpoint.

Returns aggregated incident counts per beat with optional filters.
Backed by the pre-aggregated beat_rollups table for performance.

The critical design decision is here: this endpoint NEVER touches the raw
crimes table. Every query hits beat_rollups, which is keyed by
(city_id, beat_number, year, month, hour, day_of_week, primary_type) and
backed by a composite covering index. Typical query latency: ~15 ms across
8.5M underlying rows.
"""
from __future__ import annotations
from typing import Optional
from fastapi import APIRouter, HTTPException, Query
from sqlalchemy import text
from app.db import SessionLocal
from app.schemas.heatmap import (
BeatHeatmapCell,
CrimeTypeOption,
HeatmapResponse,
)
# Router that groups the heatmap routes: constructed with the "/heatmap" path
# prefix and the "heatmap" tag.
router = APIRouter(prefix="/heatmap", tags=["heatmap"])
@router.get("", response_model=HeatmapResponse)
def get_heatmap(
    city_slug: str = Query("chicago"),
    year: Optional[int] = Query(None, description="Filter to a specific year"),
    year_from: Optional[int] = Query(None),
    year_to: Optional[int] = Query(None),
    hour_min: int = Query(0, ge=0, le=23),
    hour_max: int = Query(23, ge=0, le=23),
    primary_type: Optional[str] = Query(None, description="e.g. THEFT, BATTERY"),
) -> HeatmapResponse:
    """Return per-beat aggregated counts for the given filters.

    Fast: uses ix_beat_rollups_heatmap_query composite index. ~15ms typical.

    The query reads only ``beat_rollups`` joined to ``cities``; every filter
    value is passed as a bound parameter, never interpolated into the SQL.

    Args:
        city_slug: Slug matched against ``cities.slug``.
        year: Exact year to filter on. Mutually exclusive with
            ``year_from`` / ``year_to``.
        year_from: Inclusive lower bound on the year (optional).
        year_to: Inclusive upper bound on the year (optional).
        hour_min: First hour of the window (0-23).
        hour_max: Last hour of the window (0-23). When it is smaller than
            ``hour_min`` the window wraps past midnight (e.g. 22 -> 5).
        primary_type: Exact ``primary_type`` value to filter on (optional).

    Returns:
        A ``HeatmapResponse`` echoing the applied filters, one cell per beat
        ordered by beat number, the total incident count, and the largest
        single-beat incident count. When nothing matches, ``beats`` is empty
        and both totals are 0.

    Raises:
        HTTPException: 400 when ``year`` is combined with ``year_from`` or
            ``year_to``.
    """
    if year is not None and (year_from is not None or year_to is not None):
        raise HTTPException(400, "Use either 'year' or 'year_from/year_to', not both")

    # Echoed back to the client; built once so the empty and non-empty
    # responses can never drift apart.
    filters = {
        "year": year,
        "year_from": year_from,
        "year_to": year_to,
        "hour_min": hour_min,
        "hour_max": hour_max,
        "primary_type": primary_type,
    }

    # Wrap-around hour windows (e.g. 22 -> 5 for overnight crime queries)
    # need the OR form; same-day windows use BETWEEN.
    if hour_min <= hour_max:
        hour_clause = "br.hour BETWEEN :hour_min AND :hour_max"
    else:
        hour_clause = "(br.hour >= :hour_min OR br.hour <= :hour_max)"

    where_clauses = ["c.slug = :city_slug", hour_clause]
    params: dict = {
        "city_slug": city_slug,
        "hour_min": hour_min,
        "hour_max": hour_max,
    }

    if year is not None:
        where_clauses.append("br.year = :year")
        params["year"] = year
    else:
        # Either bound may be supplied on its own (open-ended range).
        if year_from is not None:
            where_clauses.append("br.year >= :year_from")
            params["year_from"] = year_from
        if year_to is not None:
            where_clauses.append("br.year <= :year_to")
            params["year_to"] = year_to

    if primary_type is not None:
        where_clauses.append("br.primary_type = :primary_type")
        params["primary_type"] = primary_type

    # Only fixed clause fragments are interpolated here; user-supplied values
    # travel exclusively through the bound parameters in ``params``.
    sql = text(f"""
        SELECT
            br.beat_number,
            SUM(br.incident_count)::int AS incident_count,
            SUM(br.arrest_count)::int AS arrest_count,
            SUM(br.domestic_count)::int AS domestic_count
        FROM beat_rollups br
        JOIN cities c ON c.id = br.city_id
        WHERE {' AND '.join(where_clauses)}
        GROUP BY br.beat_number
        ORDER BY br.beat_number
    """)

    with SessionLocal() as session:
        rows = session.execute(sql, params).fetchall()

    beats = [
        BeatHeatmapCell(
            beat_number=row.beat_number,
            incident_count=row.incident_count,
            arrest_count=row.arrest_count,
            domestic_count=row.domestic_count,
        )
        for row in rows
    ]

    # sum() of nothing is 0 and max(..., default=0) covers the empty case, so
    # an empty result set needs no separate return path.
    return HeatmapResponse(
        city_slug=city_slug,
        filters=filters,
        beats=beats,
        total_incidents=sum(cell.incident_count for cell in beats),
        max_beat_incidents=max((cell.incident_count for cell in beats), default=0),
    )