rpmjp/portfolio
rpmjp/projects/sentinel/auth_dependencies.py
Completed · October 2025 – January 2026

Sentinel — Fraud Detection Platform

Production-grade fraud operations platform with calibrated LightGBM scoring at 8.5ms, SHAP explainability on every prediction, and $1.23M in modeled net savings from cost-aware threshold tuning.

Python 3.12 · FastAPI · LightGBM · SHAP · PostgreSQL 16 · React 19 · TypeScript · Tailwind v4
Languages
TypeScript 56.7%
Python 41.6%
CSS 1%
Makefile 0.4%
JavaScript 0.1%
Mako 0.1%
HTML 0.1%
auth_dependencies.py
"""FastAPI auth dependencies.

Every protected endpoint depends on get_current_user, which decodes the JWT,
loads the user from the database, and returns an AuthContext that downstream
code uses to scope queries by tenant. require_role is a dependency factory
for role-gated endpoints — the check happens before handler code runs, so
there's no path to a protected endpoint that skips the role check.
"""

from __future__ import annotations

import uuid
from dataclasses import dataclass

import jwt
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.orm import Session

from api.db.database import get_db
from api.db.models import User
from api.services.security import decode_access_token

# Security scheme that get_current_user depends on to receive the raw token
# string. tokenUrl names the login route ("/auth/login") — presumably the
# endpoint that issues the access tokens; confirm against the auth router.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")


@dataclass(frozen=True)
class AuthContext:
    """Immutable snapshot of the authenticated caller for one request.

    Built by get_current_user from the loaded user row and handed to
    handlers, so identity, tenant, and role are available without another
    database lookup. The dataclass is frozen: assigning to any field after
    construction raises instead of silently changing the caller's identity.
    """

    # Primary key of the authenticated user.
    user_id: uuid.UUID
    # Tenant the user belongs to; used to scope queries.
    tenant_id: uuid.UUID
    # Role name compared against the allowed roles in require_role.
    role: str
    # Email address copied from the user row.
    email: str


def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: Session = Depends(get_db),
) -> AuthContext:
    """Decode JWT, load user, return AuthContext. 401 on any failure.

    Args:
        token: Raw token string supplied by the ``oauth2_scheme`` dependency.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        An AuthContext populated from the loaded user's ``id``,
        ``tenant_id``, ``role`` and ``email``.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when
            any of the following holds (never a 500):
            - ``decode_access_token`` raises a ``jwt.PyJWTError``
              (e.g. malformed token, bad signature, expired)
            - the ``sub`` claim is missing or empty
            - the ``sub`` claim is not a string that parses as a UUID
            - no user exists with that id
            - the user has been soft-deleted (``deleted_at`` is set)
    """
    # One exception object reused by every failure path so the client cannot
    # tell which check rejected the token.
    cred_exc = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid or expired credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = decode_access_token(token)
    except jwt.PyJWTError as e:
        raise cred_exc from e

    subject = payload.get("sub")
    if not subject:
        raise cred_exc

    # A correctly signed token can still carry a subject that is not a UUID
    # (wrong format, or a non-string JSON value). uuid.UUID raises ValueError
    # for bad strings and AttributeError/TypeError for non-strings; without
    # this guard those would surface as a 500 instead of a 401.
    try:
        user_id = uuid.UUID(subject)
    except (ValueError, TypeError, AttributeError) as e:
        raise cred_exc from e

    user = db.get(User, user_id)
    if user is None or user.deleted_at is not None:
        raise cred_exc

    return AuthContext(
        user_id=user.id,
        tenant_id=user.tenant_id,
        role=user.role,
        email=user.email,
    )


def require_role(*allowed: str):
    """Build a dependency that only admits callers holding an allowed role.

    Args:
        *allowed: Role names that may access the endpoint.

    Returns:
        A callable suitable for ``Depends(...)``. It resolves the caller via
        get_current_user, returns the AuthContext when ``ctx.role`` is one of
        ``allowed``, and otherwise raises HTTPException with status 403.

    Usage:
        @router.patch("/models/{id}/threshold")
        async def update_threshold(
            id: uuid.UUID,
            ctx: AuthContext = Depends(require_role("admin")),
        ):
            ...

    Because the check is wired in as a dependency of the endpoint, it is
    evaluated as part of resolving the handler's arguments rather than
    inside the handler body.
    """
    def _role_guard(ctx: AuthContext = Depends(get_current_user)) -> AuthContext:
        # Happy path first: an allowed role passes the context straight through.
        if ctx.role in allowed:
            return ctx

        # Otherwise reject, telling the caller which roles would have worked.
        detail = f"Requires one of: {', '.join(allowed)}"
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=detail,
        )

    return _role_guard