rpmjp/projects/sentinel/upload_hardening.py
Completed: October 2025 – January 2026
Sentinel — Fraud Detection Platform
Production-grade fraud operations platform with calibrated LightGBM scoring at 8.5ms, SHAP explainability on every prediction, and $1.23M in modeled net savings from cost-aware threshold tuning.
Python 3.12 · FastAPI · LightGBM · SHAP · PostgreSQL 16 · React 19 · TypeScript · Tailwind v4
Languages
TypeScript 56.7%
Python 41.6%
CSS 1%
Makefile 0.4%
JavaScript 0.1%
Mako 0.1%
HTML 0.1%
upload_hardening.py
"""Hardened CSV upload pipeline.
The CSV upload endpoint accepts arbitrary user files and runs them through
the scoring pipeline. That's an attack surface — every CSV is untrusted
input from the network. This module's defenses, in order:
1. Role gate: only senior_analyst or admin can upload.
2. Per-user rate limit: rolling minute + hour windows, in-memory deque.
3. Filename/content-type allowlist: only .csv with CSV-ish content types.
4. Size cap: 5 MB enforced at Nginx (edge) and re-checked here.
5. Schema validation: required columns + alias normalization.
6. Formula injection neutralization: cells starting with = + - @
are prefixed with ' so spreadsheet apps treat them as text.
7. Row cap: 10,000 transactions per upload.
8. Atomic commit: scoring failure mid-batch rolls back the whole upload
and writes a failed audit row.
9. Full audit trail: every attempt logged to upload_audits, success or fail.
"""
from __future__ import annotations
import csv
import io
import time
import uuid
from collections import defaultdict, deque
from typing import Any
from fastapi import APIRouter, Depends, File, HTTPException, UploadFile, status
from pydantic import ValidationError
from sqlalchemy.orm import Session
from api.db.database import get_db
from api.db.models import UploadAudit
from api.routers.scoring import _active_model_version, _persist
from api.schemas.scoring import TransactionIn
from api.services.auth import AuthContext, get_current_user
from api.services.model_service import ModelService, get_model_service
# Router for this module: every route registered on it is served under the
# "/upload" prefix and tagged "upload".
router = APIRouter(prefix="/upload", tags=["upload"])
# --- Defense parameters ------------------------------------------------------

# -- Payload limits --
# Row cap applied while parsing an uploaded CSV.
MAX_ROWS = 10_000
# Largest upload accepted, in bytes (5 * 1024 * 1024).
MAX_UPLOAD_BYTES = 5 * 1024 * 1024
# Number of transactions handed to the scoring service per batch.
CHUNK_SIZE = 100

# -- Rate limits (uploads allowed per rolling window, per user) --
RATE_LIMIT_PER_MINUTE = 30
RATE_LIMIT_PER_HOUR = 200
# In-process attempt log: (tenant_id, user_id) -> timestamps of accepted uploads.
_upload_attempts: defaultdict[tuple[uuid.UUID, uuid.UUID], deque[float]] = defaultdict(deque)

# -- File and schema checks --
# Content types an upload may declare.
ALLOWED_CONTENT_TYPES = {
    "text/csv",
    "application/csv",
    "application/vnd.ms-excel",
    "text/plain",
}
# Header names that must all be present in the CSV.
REQUIRED_COLUMNS = [
    "step",
    "type",
    "amount",
    "nameOrig",
    "oldbalanceOrg",
    "newbalanceOrig",
    "nameDest",
    "oldbalanceDest",
    "newbalanceDest",
]

# -- Cell sanitization --
# Leading characters that mark a cell as a potential spreadsheet formula.
FORMULA_PREFIXES = ("=", "+", "-", "@")
# Free-text columns that get sanitized before validation.
TEXT_COLUMNS = {"type", "nameOrig", "nameDest"}
def _normalize_row(row: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of *row* with its text cells neutralized.

    Why: CSVs we export get opened in Excel/Sheets by analysts. A cell value
    like '=cmd|"/c calc"!A0' executes as a formula when opened. Prefixing
    with a single quote forces the spreadsheet to treat it as a literal
    string, blocking the entire class of attack.

    NUL characters are also removed from text cells; they break some CSV
    readers and have been used as a parser-confusion vector.

    Only the columns named in TEXT_COLUMNS are touched, and only when the
    cell is a ``str``. The input mapping is never mutated.

    Args:
        row: One parsed CSV record, keyed by column name.

    Returns:
        A shallow copy of ``row`` with sanitized text cells.
    """
    # NOTE: this docstring deliberately avoids backslash escapes. In a
    # non-raw docstring an escape such as the NUL escape is interpreted and
    # ends up as a real control character inside ``__doc__``.
    normalized = dict(row)
    for column in TEXT_COLUMNS:
        value = normalized.get(column)
        if not isinstance(value, str):
            continue
        cleaned = value.replace("\x00", "")
        # Leading whitespace is ignored when looking for a formula trigger,
        # so " =1+1" is caught as well; the stored value then drops it.
        stripped = cleaned.lstrip()
        if stripped.startswith(FORMULA_PREFIXES):
            normalized[column] = f"'{stripped}"
        else:
            normalized[column] = cleaned
    return normalized
def _check_rate_limit(ctx: AuthContext) -> None:
    """Enforce the rolling per-minute and per-hour upload limits for a user.

    The attempt log is keyed by ``(tenant_id, user_id)``, so one user's
    uploads never consume another user's allowance, and a single account
    cannot exhaust the budget of its whole organization.

    An upload is recorded only when it is allowed; rejected calls do not
    extend the window.

    Args:
        ctx: Authenticated caller; ``tenant_id`` and ``user_id`` are read.

    Raises:
        HTTPException: 429 when either window is already full.
    """
    current = time.monotonic()
    window = _upload_attempts[(ctx.tenant_id, ctx.user_id)]

    # Entries are appended in time order, so expired ones sit at the left.
    hour_cutoff = current - 3600
    while window and window[0] < hour_cutoff:
        window.popleft()

    minute_cutoff = current - 60
    recent = sum(1 for stamp in window if stamp >= minute_cutoff)

    minute_exhausted = recent >= RATE_LIMIT_PER_MINUTE
    hour_exhausted = len(window) >= RATE_LIMIT_PER_HOUR
    if minute_exhausted or hour_exhausted:
        raise HTTPException(
            status_code=status.HTTP_429_TOO_MANY_REQUESTS,
            detail="Upload rate limit exceeded. Please wait before trying again.",
        )

    window.append(current)
async def _read_limited(file: UploadFile) -> bytes:
    """Return the upload's bytes, refusing anything over MAX_UPLOAD_BYTES.

    One byte more than the cap is requested so that an oversized file can be
    told apart from one that is exactly at the limit without reading the
    rest of it.

    This is defense in depth: the module's design notes say the edge proxy
    applies the same cap, and this check keeps the limit in force if that
    proxy is misconfigured or bypassed.

    Args:
        file: The uploaded file to read.

    Returns:
        The file contents, at most MAX_UPLOAD_BYTES long.

    Raises:
        HTTPException: 413 when more than MAX_UPLOAD_BYTES bytes are available.
    """
    payload = await file.read(MAX_UPLOAD_BYTES + 1)
    if len(payload) <= MAX_UPLOAD_BYTES:
        return payload

    limit_mb = MAX_UPLOAD_BYTES // (1024 * 1024)
    raise HTTPException(
        status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
        detail=f"CSV exceeds {limit_mb} MB upload limit",
    )
def _parse_csv(contents: bytes) -> tuple[list[TransactionIn], list[dict[str, Any]]]:
    """Parse and validate an uploaded CSV.

    Decodes with ``utf-8-sig`` so a leading BOM (as added by Excel exports)
    is stripped transparently. Structural problems fail the whole upload;
    per-row type/value problems are collected so the caller can show
    analysts a structured error report.

    Args:
        contents: Raw bytes of the uploaded file.

    Returns:
        ``(valid_rows, per_row_errors)``. Each error is a dict with ``row``
        (record ordinal, the header being row 1), ``field`` and ``message``.

    Raises:
        HTTPException: 400 when the payload is not UTF-8, has no header row,
            lacks required columns, holds more than MAX_ROWS data rows, or
            cannot be parsed by the ``csv`` module.
    """
    try:
        text = contents.decode("utf-8-sig")
    except UnicodeDecodeError as exc:
        raise HTTPException(status_code=400, detail="CSV must be UTF-8 encoded") from exc

    reader = csv.DictReader(io.StringIO(text))
    transactions: list[TransactionIn] = []
    errors: list[dict[str, Any]] = []

    # The reader is lazy: both the header access and the row iteration can
    # raise csv.Error (e.g. a field over csv.field_size_limit()). The file is
    # untrusted input, so that must be a client error, not an unhandled 500.
    try:
        if not reader.fieldnames:
            raise HTTPException(status_code=400, detail="CSV has no header row")

        missing = [c for c in REQUIRED_COLUMNS if c not in reader.fieldnames]
        if missing:
            raise HTTPException(
                status_code=400,
                detail={"message": "CSV is missing required columns", "missing": missing},
            )

        for data_row, raw in enumerate(reader, start=1):
            # Count every data row, valid or not. Counting only valid rows
            # would let a file of invalid rows bypass the cap and grow the
            # error list without limit.
            if data_row > MAX_ROWS:
                raise HTTPException(
                    status_code=400, detail=f"CSV exceeds {MAX_ROWS:,} row cap"
                )
            try:
                transactions.append(TransactionIn.model_validate(_normalize_row(raw)))
            except ValidationError as exc:
                errors.extend(
                    {
                        # +1 because the header occupies row 1.
                        "row": data_row + 1,
                        "field": ".".join(str(p) for p in err["loc"]),
                        "message": err["msg"],
                    }
                    for err in exc.errors()
                )
    except csv.Error as exc:
        raise HTTPException(
            status_code=400, detail="CSV is malformed and could not be parsed"
        ) from exc

    return transactions, errors
@router.post("/transactions")
async def upload_transactions(
    file: UploadFile = File(...),
    svc: ModelService = Depends(get_model_service),
    db: Session = Depends(get_db),
    ctx: AuthContext = Depends(get_current_user),
) -> dict[str, Any]:
    """Validate, score, and persist an uploaded CSV of transactions.

    Checks run in this order: role gate, per-user rate limit, filename and
    content-type allowlist, size cap, then CSV parsing/validation. Only a
    file with zero validation errors is scored. Rows are scored in batches
    of CHUNK_SIZE and committed once at the end, so a failure part-way
    through leaves nothing persisted.

    Args:
        file: The uploaded CSV.
        svc: Scoring service used to score each batch.
        db: Database session the scored rows are written through.
        ctx: Authenticated caller (role, tenant and user are read).

    Returns:
        ``uploaded`` and ``scored`` (both the number of transactions) plus
        the ``high`` / ``medium`` / ``low`` risk-band counts.

    Raises:
        HTTPException: 403 for an unauthorized role, 429 when rate limited,
            400 for a bad filename, content type or CSV structure, 413 for
            an oversized file, 422 when any row fails validation.
    """
    # NOTE(review): the module docstring promises an audit row in
    # upload_audits for every attempt, but nothing in this handler writes
    # one and the UploadAudit import appears unused. Confirm whether audit
    # logging lives elsewhere.
    # NOTE(review): this is an ``async def`` handler doing synchronous
    # scoring and DB work, which presumably runs on the event loop. Confirm
    # this is acceptable for 10k-row uploads.
    if ctx.role not in {"senior_analyst", "admin"}:
        raise HTTPException(status_code=403, detail="senior analyst or admin role required")
    _check_rate_limit(ctx)

    filename = file.filename or ""
    if not filename.lower().endswith(".csv"):
        raise HTTPException(status_code=400, detail="Upload must be a CSV file")

    if file.content_type:
        # A Content-Type may carry parameters ("text/csv; charset=utf-8").
        # Compare only the media type so legitimate clients are not rejected.
        media_type = file.content_type.split(";", 1)[0].strip().lower()
        if media_type not in ALLOWED_CONTENT_TYPES:
            raise HTTPException(status_code=400, detail="Upload content type must be CSV")

    contents = await _read_limited(file)
    transactions, errors = _parse_csv(contents)
    if errors:
        # Only the first 200 errors are returned to keep the response small.
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail={"message": "CSV validation failed", "errors": errors[:200]},
        )

    mv = _active_model_version(db, ctx.tenant_id)
    counts = {"high": 0, "medium": 0, "low": 0}
    try:
        # Chunked scoring: keeps memory bounded for 10k-row uploads and gives
        # the scoring service natural batch boundaries.
        for start in range(0, len(transactions), CHUNK_SIZE):
            chunk = transactions[start : start + CHUNK_SIZE]
            scored = svc.score([txn.model_dump() for txn in chunk])
            # strict=True: a scorer returning the wrong number of results is
            # an error, not something to truncate silently.
            for txn, score in zip(chunk, scored, strict=True):
                _persist(db, tenant_id=ctx.tenant_id, model_version_id=mv.id,
                         txn_in=txn, scored=score)
                counts[score.risk_band] += 1
            db.flush()
        db.commit()
    except Exception:
        # Discard every flushed-but-uncommitted row explicitly so the upload
        # is all-or-nothing, then let the original error propagate.
        # Assumes _persist does not commit on its own. TODO confirm.
        db.rollback()
        raise

    return {"uploaded": len(transactions), "scored": len(transactions), **counts}