rpmjp/portfolio
rpmjp/projects/sentinel/score_endpoint.py
CompletedOctober 2025 – January 2026

Sentinel — Fraud Detection Platform

Production-grade fraud operations platform with calibrated LightGBM scoring at 8.5ms, SHAP explainability on every prediction, and $1.23M in modeled net savings from cost-aware threshold tuning.

Python 3.12FastAPILightGBMSHAPPostgreSQL 16React 19TypeScriptTailwind v4
Languages
TypeScript56.7%
Python41.6%
CSS1%
Makefile0.4%
JavaScript0.1%
Mako0.1%
HTML0.1%
score_endpoint.py
"""Scoring endpoints: /score (single) and /score/batch.

Every scored transaction persists both a Transaction row and a Prediction row.
The SHAP explanation lives in JSONB on predictions.explanation so the queue,
feedback, and audit endpoints can reconstruct exactly what the analyst saw.
Tenant scoping is enforced by the AuthContext dependency — every query and
insert below is automatically scoped to ctx.tenant_id.
"""

from __future__ import annotations

import time
import uuid

from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session

from api.db.database import get_db
from api.db.models import ModelVersion, Prediction, Transaction
from api.schemas.scoring import (
    BatchScoreIn,
    BatchScoreOut,
    ScoreOut,
    TopFeatureOut,
    TransactionIn,
)
from api.services.auth import AuthContext, get_current_user
from api.services.model_service import ModelService, ScoredTransaction, get_model_service

router = APIRouter(prefix="/score", tags=["scoring"])


def _active_model_version(db: Session, tenant_id: uuid.UUID) -> ModelVersion:
    """Resolve the current production model for this tenant.

    Each tenant can independently promote a model to production. Scoring fails
    fast with 503 if no production model is registered — better than scoring
    with a stale or staging model and silently producing wrong predictions.
    """
    mv = (
        db.query(ModelVersion)
        .filter(ModelVersion.tenant_id == tenant_id, ModelVersion.stage == "production")
        .order_by(ModelVersion.created_at.desc())
        .first()
    )
    if mv is None:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="No production model registered for this tenant",
        )
    return mv


def _persist(
    db: Session,
    *,
    tenant_id: uuid.UUID,
    model_version_id: uuid.UUID,
    txn_in: TransactionIn,
    scored: ScoredTransaction,
) -> tuple[uuid.UUID, uuid.UUID]:
    """Persist Transaction + Prediction atomically; return their UUIDs.

    threshold_at_scoring is captured per-row so historical decisions remain
    reproducible even after the production threshold is tuned.
    """
    txn = Transaction(
        tenant_id=tenant_id,
        step=txn_in.step,
        type=txn_in.type,
        amount=txn_in.amount,
        name_orig=txn_in.nameOrig,
        old_balance_org=txn_in.oldbalanceOrg,
        name_dest=txn_in.nameDest,
        old_balance_dest=txn_in.oldbalanceDest,
    )
    db.add(txn)
    db.flush()

    pred = Prediction(
        tenant_id=tenant_id,
        transaction_id=txn.id,
        model_version_id=model_version_id,
        score=scored.score,
        risk_band=scored.risk_band,
        threshold_at_scoring=scored.threshold,
        explanation={
            "top_features": [
                {"name": f.name, "value": f.value, "contribution": f.contribution}
                for f in scored.top_features
            ],
        },
        latency_ms=scored.latency_ms,
    )
    db.add(pred)
    db.flush()
    return txn.id, pred.id


@router.post("", response_model=ScoreOut)
async def score_single(
    txn: TransactionIn,
    svc: ModelService = Depends(get_model_service),
    db: Session = Depends(get_db),
    ctx: AuthContext = Depends(get_current_user),
) -> ScoreOut:
    """Score a single transaction; persist Transaction + Prediction; return score + SHAP."""
    mv = _active_model_version(db, ctx.tenant_id)
    scored = svc.score([txn.model_dump()])[0]
    txn_id, pred_id = _persist(
        db, tenant_id=ctx.tenant_id, model_version_id=mv.id, txn_in=txn, scored=scored
    )
    db.commit()
    return ScoreOut(
        transaction_id=txn_id,
        prediction_id=pred_id,
        score=scored.score,
        risk_band=scored.risk_band,
        threshold=scored.threshold,
        top_features=[
            TopFeatureOut(name=f.name, value=f.value, contribution=f.contribution)
            for f in scored.top_features
        ],
        latency_ms=scored.latency_ms,
    )