"""Magic link login (B-2.4). Stateless tokens via ``itsdangerous`` (no DB column). Same pattern as ``src/services/email.py:generate_verification_token`` — token contains the user_id; ``max_age`` is 15 minutes. The compatibility-audit (C2) explicitly forbids new User columns (no ``magic_link_token``, no ``magic_link_sent_at``). Single-use enforcement is implemented at the application layer via an in-process JTI cache (see ``_consumed_jtis`` below) — within a single gunicorn worker, a token can be consumed exactly once. Cross-worker uniqueness in a multi-worker deployment is best-effort and would require Redis or a small DB table; with the route's 10/min rate limit this is acceptable for B-2.4. OPERATOR NOTE — log scrubbing: The magic-link token appears in the URL path (``/auth/magic-link/``) and will therefore be captured by Cloudflare access logs, Flask's request log, and the user's browser history. The single-use cache here mitigates replay-from-logs within the 15-minute validity window, but operators should ALSO scrub ``/auth/magic-link/*`` from log retention as defence in depth (the operator action is documented in the security review; no application-side fix can fully address logs that have already been written elsewhere). """ import secrets import time from typing import Optional from itsdangerous import URLSafeTimedSerializer, SignatureExpired, BadSignature from flask import current_app MAGIC_LINK_EXPIRY_SECONDS = 15 * 60 # 15 minutes _SALT = 'magic-link-login' # In-process consumed-JTI cache: {jti: expires_at_unix_timestamp}. # Single-use enforcement against replay within the 15-min validity window. # Cache is best-effort: in a multi-worker gunicorn deployment a JTI # consumed on worker A would still be accepted on worker B. For production # multi-worker deployments, replace with Redis or a small DB table. # For B-2.4 with rate-limiting at 10/min on consume + 5/min on request, # this provides meaningful single-use enforcement within a worker. _consumed_jtis: dict = {} def _serializer() -> URLSafeTimedSerializer: """Build a fresh serializer per call (cheap; reads SECRET_KEY from app config). Raises: RuntimeError: if SECRET_KEY is missing from app config. We refuse to fall back to a default key because that would let anyone forge magic-link tokens against any deployment that forgot to set SECRET_KEY. """ secret_key = current_app.config.get('SECRET_KEY') if not secret_key: raise RuntimeError( "SECRET_KEY must be configured for magic-link tokens" ) return URLSafeTimedSerializer(secret_key, salt=_SALT) def _purge_expired_jtis() -> None: """Drop entries past their expiry to bound memory.""" now = time.time() for jti in [j for j, exp in _consumed_jtis.items() if exp < now]: _consumed_jtis.pop(jti, None) def generate_magic_link_token(user_id: int) -> str: """Generate a single-use magic-link token (15-min expiry, includes random JTI). The JTI (JSON Token ID) is a random 16-byte URL-safe string embedded in the token payload. On consume, the JTI is added to the in-process ``_consumed_jtis`` cache; subsequent consumes of the same token return None (single-use enforcement). """ jti = secrets.token_urlsafe(16) return _serializer().dumps({'uid': user_id, 'jti': jti}) def consume_magic_link_token(token: str) -> Optional[int]: """Verify + mark token as consumed. Returns user_id once; None on replay/expired/invalid/malformed. Single-use enforcement: the JTI is added to ``_consumed_jtis`` on success; a second call with the same token returns None. """ try: payload = _serializer().loads(token, max_age=MAGIC_LINK_EXPIRY_SECONDS) except (SignatureExpired, BadSignature): return None if not isinstance(payload, dict): return None user_id = payload.get('uid') jti = payload.get('jti') if not isinstance(user_id, int) or not isinstance(jti, str): return None _purge_expired_jtis() if jti in _consumed_jtis: return None # replay — token already consumed _consumed_jtis[jti] = time.time() + MAGIC_LINK_EXPIRY_SECONDS return user_id