rpmjp/projects/sentinel/train_pipeline.py
Completed · October 2025 – January 2026
Sentinel — Fraud Detection Platform
Production-grade fraud operations platform with calibrated LightGBM scoring at 8.5ms, SHAP explainability on every prediction, and $1.23M in modeled net savings from cost-aware threshold tuning.
Python 3.12 · FastAPI · LightGBM · SHAP · PostgreSQL 16 · React 19 · TypeScript · Tailwind v4
Languages
TypeScript 56.7%
Python 41.6%
CSS 1%
Makefile 0.4%
JavaScript 0.1%
Mako 0.1%
HTML 0.1%
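
The per-prediction SHAP explainability mentioned in the description lives in the serving path rather than in the training script below. As a rough, illustrative sketch only (the serving code is not shown here; the names model and X are placeholders for the fitted LGBMClassifier and a feature frame built with the saved feature_names):

import shap

# Tree SHAP on the uncalibrated LightGBM model; each row of the output explains
# one scored transaction in terms of its input features.
explainer = shap.TreeExplainer(model)
shap_values = explainer.shap_values(X)
# Depending on the shap version, a binary classifier may return one array per
# class; if so, index the positive class before rendering the explanation.
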
train_pipeline.py
"""Training entrypoint with early stopping, calibration, and MLflow tracking.
This script trains one model end-to-end. The CLI accepts lightgbm, xgboost, or
logreg, but only the LightGBM path is wired up in this file.
Key design decisions encoded in this file:
- Per-iteration train/val curves logged to MLflow so the threshold tuner UI
can render learning curves and the threshold/cost surface.
- Calibration via isotonic regression on the validation set, wrapped around
a FrozenEstimator so the base model isn't refit. Boosting models produce
well-ranked but poorly-calibrated probabilities; calibration is required
for the cost-aware threshold tuner to be meaningful.
- Test set is logged to MLflow but NEVER printed to the console during
iteration. Peeking at test during model selection is leakage. The test
set gets revealed exactly once at the end of Phase 1, via scripts/final_eval.
- The sender/receiver aggregates ablation is parameterized: --with-aggregates
  includes those aggregate features; the default omits them. Ablation runs
  showed they hurt PR-AUC (likely leakage), so they're off by default.
Usage:
uv run python -m ml.training.train --model lightgbm
uv run python -m ml.training.train --model lightgbm --with-aggregates
"""
from __future__ import annotations
import argparse
import logging
import subprocess
from pathlib import Path
import joblib
import mlflow
import numpy as np
import pandas as pd
from lightgbm import LGBMClassifier, early_stopping, log_evaluation
from sklearn.calibration import CalibratedClassifierCV
from sklearn.frozen import FrozenEstimator
from ml.features.pipeline import prepare
from ml.training.metrics import evaluate
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
log = logging.getLogger(__name__)
DATA_PATH = Path("data/raw/paysim.csv")
MODELS_DIR = Path("models")
EXPERIMENT_NAME = "sentinel-fraud-detection"
TRACKING_URI = "file:./mlruns"
def _git_sha() -> str:
    """Tag every MLflow run with the git commit so artifacts are reproducible."""
    try:
        return subprocess.check_output(
            ["git", "rev-parse", "--short", "HEAD"], text=True
        ).strip()
    except (subprocess.CalledProcessError, FileNotFoundError):
        return "unknown"
def _fit_lightgbm(X_train, y_train, X_val, y_val, scale_pos_weight: float) -> LGBMClassifier:
    """Fit LightGBM with early stopping on validation PR-AUC (average_precision).

    Early stopping on PR-AUC, not ROC-AUC, because PR-AUC is the right metric
    for a 0.13% fraud rate — ROC-AUC is dominated by easy true negatives and
    overstates performance on imbalanced problems.
    """
    model = LGBMClassifier(
        n_estimators=500,
        learning_rate=0.1,
        max_depth=6,
        num_leaves=31,
        min_child_samples=50,
        reg_lambda=1.0,
        scale_pos_weight=scale_pos_weight,
        random_state=42,
        verbose=-1,
        n_jobs=-1,
    )
    model.fit(
        X_train, y_train,
        eval_set=[(X_train, y_train), (X_val, y_val)],
        eval_names=["train", "val"],
        eval_metric="average_precision",
        callbacks=[
            early_stopping(stopping_rounds=100, verbose=True),
            log_evaluation(period=25),
        ],
    )
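    # Sketch of the per-iteration logging promised in the module docstring: push the
    # train/val curves that LightGBM records during fitting into the active MLflow
    # run so the threshold tuner UI can render learning curves. The metric naming
    # here is an assumption, not necessarily the project's exact scheme.
    for split, metric_history in model.evals_result_.items():
        for metric_name, values in metric_history.items():
            for step, value in enumerate(values):
                mlflow.log_metric(f"{split}_{metric_name}", value, step=step)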
    return model
def main() -> None:
    parser = argparse.ArgumentParser()
    parser.add_argument("--model", choices=["lightgbm", "xgboost", "logreg"], default="lightgbm")
    parser.add_argument("--with-aggregates", action="store_true",
                        help="Include sender/receiver aggregates (ablated off by default)")
    args = parser.parse_args()
    if args.model != "lightgbm":
        # The CLI accepts xgboost/logreg, but only the LightGBM path is wired up in
        # this file; fail loudly rather than silently training LightGBM anyway.
        raise NotImplementedError(f"--model {args.model} is not implemented in this entrypoint")
    raw = pd.read_csv(DATA_PATH)
    data = prepare(raw, use_aggregates=args.with_aggregates)
    scale_pos_weight = float((data.y_train == 0).sum() / max((data.y_train == 1).sum(), 1))
    log.info("scale_pos_weight = %.2f (handles 0.13%% fraud rate)", scale_pos_weight)

    mlflow.set_tracking_uri(TRACKING_URI)
    mlflow.set_experiment(EXPERIMENT_NAME)

    with mlflow.start_run(run_name=args.model):
        mlflow.set_tag("git_sha", _git_sha())
        mlflow.log_param("use_aggregates", args.with_aggregates)
        mlflow.log_param("scale_pos_weight", scale_pos_weight)

        log.info("Fitting base model: %s", args.model)
        base_model = _fit_lightgbm(
            data.X_train, data.y_train, data.X_val, data.y_val, scale_pos_weight
        )
        # Calibration: boosting models output ranking-good but probability-bad
        # scores. FrozenEstimator wraps the trained model so the calibrator
        # fits an isotonic mapping on the val set WITHOUT refitting the base.
        # After this step, model.predict_proba returns actual probabilities,
        # which is required for the cost-aware threshold tuner.
        log.info("Calibrating probabilities (isotonic, prefit)")
        model = CalibratedClassifierCV(FrozenEstimator(base_model), method="isotonic")
        model.fit(data.X_val, data.y_val)

        # Validation metrics: shown to me during iteration.
        val_score = model.predict_proba(data.X_val)[:, 1]
        val_report = evaluate(np.asarray(data.y_val), val_score)
        for k, v in val_report.to_dict().items():
            mlflow.log_metric(f"val_{k}", v)
        log.info(
            "Val: ROC-AUC=%.4f PR-AUC=%.4f best_savings=$%.0f @ t=%.2f",
            val_report.roc_auc, val_report.pr_auc,
            val_report.best_net_savings, val_report.best_threshold,
        )

        # Test metrics: logged to MLflow but NEVER printed. Peeking at test
        # during model selection is leakage. The test set is unsealed exactly
        # once at the end of Phase 1, via scripts/final_eval.py.
        test_score = model.predict_proba(data.X_test)[:, 1]
        test_report = evaluate(np.asarray(data.y_test), test_score)
        for k, v in test_report.to_dict().items():
            mlflow.log_metric(f"test_{k}", v)

        MODELS_DIR.mkdir(exist_ok=True)
        artifact_path = MODELS_DIR / f"{args.model}.joblib"
        joblib.dump(
            {"model": model, "feature_names": data.feature_names,
             "best_threshold": val_report.best_threshold},
            artifact_path,
        )
        mlflow.log_artifact(str(artifact_path), artifact_path="model")
        log.info("Saved %s", artifact_path)
if __name__ == "__main__":
    main()