rpmjp/projects/sentinel/score_endpoint.py
Completed: October 2025 – January 2026
Sentinel — Fraud Detection Platform
Production-grade fraud operations platform with calibrated LightGBM scoring at 8.5ms, SHAP explainability on every prediction, and $1.23M in modeled net savings from cost-aware threshold tuning.
Python 3.12 · FastAPI · LightGBM · SHAP · PostgreSQL 16 · React 19 · TypeScript · Tailwind v4
Languages
TypeScript: 56.7%
Python: 41.6%
CSS: 1%
Makefile: 0.4%
JavaScript: 0.1%
Mako: 0.1%
HTML: 0.1%
score_endpoint.py
"""Scoring endpoints: /score (single) and /score/batch.
Every scored transaction persists both a Transaction row and a Prediction row.
The SHAP explanation lives in JSONB on predictions.explanation so the queue,
feedback, and audit endpoints can reconstruct exactly what the analyst saw.
Tenant scoping is enforced by the AuthContext dependency — every query and
insert below is automatically scoped to ctx.tenant_id.
"""
from __future__ import annotations
import time
import uuid
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from api.db.database import get_db
from api.db.models import ModelVersion, Prediction, Transaction
from api.schemas.scoring import (
BatchScoreIn,
BatchScoreOut,
ScoreOut,
TopFeatureOut,
TransactionIn,
)
from api.services.auth import AuthContext, get_current_user
from api.services.model_service import ModelService, ScoredTransaction, get_model_service
router = APIRouter(prefix="/score", tags=["scoring"])
def _active_model_version(db: Session, tenant_id: uuid.UUID) -> ModelVersion:
    """Look up the tenant's current production model.

    Model promotion is per-tenant, so each tenant may point at a different
    production ModelVersion; the most recently created one wins. When none
    is promoted we refuse to score at all (503) rather than silently fall
    back to a staging or stale model and emit wrong predictions.

    Raises:
        HTTPException: 503 when the tenant has no production model.
    """
    candidates = db.query(ModelVersion).filter(
        ModelVersion.tenant_id == tenant_id,
        ModelVersion.stage == "production",
    )
    model = candidates.order_by(ModelVersion.created_at.desc()).first()
    if model is not None:
        return model
    raise HTTPException(
        status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
        detail="No production model registered for this tenant",
    )
def _persist(
    db: Session,
    *,
    tenant_id: uuid.UUID,
    model_version_id: uuid.UUID,
    txn_in: TransactionIn,
    scored: ScoredTransaction,
) -> tuple[uuid.UUID, uuid.UUID]:
    """Write the Transaction and its Prediction; return (txn_id, pred_id).

    Rows are flushed, not committed — the caller owns the transaction
    boundary, so both rows land atomically or not at all. The threshold in
    effect at scoring time is stored per Prediction row so a historical
    decision stays reproducible even after the live threshold is re-tuned.
    """
    transaction = Transaction(
        tenant_id=tenant_id,
        step=txn_in.step,
        type=txn_in.type,
        amount=txn_in.amount,
        name_orig=txn_in.nameOrig,
        old_balance_org=txn_in.oldbalanceOrg,
        name_dest=txn_in.nameDest,
        old_balance_dest=txn_in.oldbalanceDest,
    )
    db.add(transaction)
    db.flush()  # assigns transaction.id for the FK below

    top_features = [
        {"name": feat.name, "value": feat.value, "contribution": feat.contribution}
        for feat in scored.top_features
    ]
    prediction = Prediction(
        tenant_id=tenant_id,
        transaction_id=transaction.id,
        model_version_id=model_version_id,
        score=scored.score,
        risk_band=scored.risk_band,
        threshold_at_scoring=scored.threshold,
        explanation={"top_features": top_features},
        latency_ms=scored.latency_ms,
    )
    db.add(prediction)
    db.flush()  # assigns prediction.id so both UUIDs can be returned
    return transaction.id, prediction.id
@router.post("", response_model=ScoreOut)
async def score_single(
txn: TransactionIn,
svc: ModelService = Depends(get_model_service),
db: Session = Depends(get_db),
ctx: AuthContext = Depends(get_current_user),
) -> ScoreOut:
"""Score a single transaction; persist Transaction + Prediction; return score + SHAP."""
mv = _active_model_version(db, ctx.tenant_id)
scored = svc.score([txn.model_dump()])[0]
txn_id, pred_id = _persist(
db, tenant_id=ctx.tenant_id, model_version_id=mv.id, txn_in=txn, scored=scored
)
db.commit()
return ScoreOut(
transaction_id=txn_id,
prediction_id=pred_id,
score=scored.score,
risk_band=scored.risk_band,
threshold=scored.threshold,
top_features=[
TopFeatureOut(name=f.name, value=f.value, contribution=f.contribution)
for f in scored.top_features
],
latency_ms=scored.latency_ms,
)