From 3a41bb482dcf72251af13b125cf0926e1b42ee1a Mon Sep 17 00:00:00 2001 From: Allison Date: Mon, 27 Apr 2026 23:50:55 -0400 Subject: [PATCH] =?UTF-8?q?fix(auth):=20B-2.4=20security=20review=20fixes?= =?UTF-8?q?=20=E2=80=94=20OAuth=20linking=20+=20magic=20link=20replay?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Follow-up to commit 0513e67 addressing 2 critical OAuth account-takeover vulnerabilities and 5 important issues found in the security review. Critical fixes: - C1: gate OAuth email-link on ``email_verified is True`` (strict bool) in find_user_by_oauth + callback. Hostile Microsoft personal account or Workspace tenant returning email_verified=False (or omitting the claim) can no longer auto-link to an existing account. Callback shows a friendly French flash + redirect to /login when the email exists but the IdP didn't verify it. - C2: refuse to overwrite an existing sso_subject in find_user_by_oauth. A second IdP claiming the victim's email (Google after Microsoft, or a hostile second Microsoft tenant) now raises PermissionError instead of silently re-binding the User row, which would lock the legitimate user out. Callback catches and flashes the error message in French. Important fixes: - I1: replace ``except Exception: pass`` in init_oauth_providers with an idempotency pre-check on _oauth._clients. Real registration errors (bad metadata URL, network failure) now surface as exceptions instead of being silently swallowed at app boot. - I2: single-use enforcement for magic-link tokens via in-process JTI cache (_consumed_jtis dict). Replay within the 15-min validity window now returns None. SECRET_KEY is now strictly required (no default-dev-key fallback). Operator-facing comment documents that /auth/magic-link/* should also be scrubbed from Cloudflare/Flask access logs as defence in depth. - I3: pre-check email collision in create_oauth_user_with_consent and raise dedicated EmailAlreadyExistsError. Race against parallel /signup in another tab between OAuth callback and finish-signup POST now redirects to /login with a helpful French flash instead of burning 5 retry attempts and surfacing a 500. - I4: oauth_signup_pending session blob now carries a created_at timestamp; finish-signup rejects sessions older than 15 min with a graceful expiry flash + redirect to /login. - I5: init_oauth_providers logs an INFO when no providers are enabled so operators can spot misconfigured deployments. Tests: 16 → 21 (5 new): - test_oauth_callback_refuses_link_when_email_not_verified (C1) - test_oauth_callback_refuses_to_overwrite_existing_sso_subject (C2) - test_finish_signup_handles_concurrent_account_creation (I3) - test_finish_signup_expires_stale_oauth_session (I4) - test_magic_link_token_is_single_use (I2) Existing tests updated for new contract: - test_oauth_callback_links_existing_user_by_email now sets email_verified=True in the mock token (required by C1 gate). - test_finish_signup_requires_cgu_and_confidentialite and test_finish_signup_creates_user_and_4_consent_logs now seed created_at in the session blob (required by I4 expiry check). - test_magic_link_consume_logs_in_user_with_valid_token now also asserts a second consume of the same token returns None and redirects to /auth/magic-link with an invalid/expired flash. Verified: 21/21 OAuth+magic-link tests pass; 16/16 email service tests still pass (no regression in adjacent surface). Co-Authored-By: Claude Opus 4.7 (1M context) --- src/api/auth.py | 65 ++++++- src/auth/magic_link.py | 89 ++++++++-- src/auth/oauth_providers.py | 119 +++++++++++-- tests/test_oauth_magic_link.py | 309 ++++++++++++++++++++++++++++++++- 4 files changed, 548 insertions(+), 34 deletions(-) diff --git a/src/api/auth.py b/src/api/auth.py index 7d36ada..0503a5d 100644 --- a/src/api/auth.py +++ b/src/api/auth.py @@ -55,6 +55,7 @@ from src.auth.oauth_providers import ( find_user_by_oauth, create_oauth_user_with_consent, get_oauth_provider_display_name, + EmailAlreadyExistsError, ) from src.auth.magic_link import ( generate_magic_link_token, @@ -595,13 +596,47 @@ def oauth_provider_callback(provider): ) return redirect(url_for('auth.login')) - user = find_user_by_oauth(provider, subject, email) + # C1: read email_verified strictly. We accept ONLY the literal boolean + # True; None / missing / 'true' string / etc. are treated as False so + # that a hostile or misconfigured IdP cannot upgrade an unverified + # claim into an auto-link to an existing account. + email_verified_raw = userinfo.get('email_verified') + email_verified = email_verified_raw is True + + # C2: find_user_by_oauth raises PermissionError if the email matches + # an existing user that is already linked to a different OAuth identity. + try: + user = find_user_by_oauth( + provider, subject, email, email_verified=email_verified, + ) + except PermissionError as e: + flash(str(e), 'danger') + return redirect(url_for('auth.login')) + if user: login_user(user) audit_sso_login(user.id, details={'provider': provider}) return redirect(url_for('recordings.index')) + # C1: if no user was returned but an account exists for this email AND + # email_verified is False, the IdP couldn't (or wouldn't) prove the + # user controls the mailbox. Falling through to finish-signup would + # create a duplicate account on a different identity AND mask the + # takeover attempt; instead refuse explicitly with a friendly flash. + existing_with_email = User.query.filter_by(email=email).first() + if existing_with_email and not email_verified: + provider_display_name = get_oauth_provider_display_name(provider) + flash( + f"Un compte DictIA existe déjà pour ce courriel. " + f"Connectez-vous d'abord avec votre mot de passe pour lier votre " + f"compte {provider_display_name}, ou demandez l'aide du support.", + 'danger', + ) + return redirect(url_for('auth.login')) + # New user — defer creation until Loi 25 consents are captured. + # I4: include created_at so finish-signup can reject stale sessions. + import time as _time session['oauth_signup_pending'] = { 'provider': provider, 'subject': subject, @@ -611,6 +646,7 @@ def oauth_provider_callback(provider): 'given_name': userinfo.get('given_name', ''), 'family_name': userinfo.get('family_name', ''), }, + 'created_at': _time.time(), } return redirect(url_for('auth.oauth_finish_signup')) @@ -626,6 +662,21 @@ def oauth_finish_signup(): pending = session.get('oauth_signup_pending') if not pending: return redirect(url_for('auth.signup')) + + # I4: reject stale OAuth signup sessions (>15 min). Operator might have + # restarted between callback and finish-signup, or user abandoned then + # came back hours later. Either way, restart the OAuth flow rather + # than trust a stale subject claim. + import time as _time + if pending.get('created_at', 0) < _time.time() - 15 * 60: + session.pop('oauth_signup_pending', None) + flash( + "Votre session d'inscription OAuth a expiré. " + "Recommencez avec votre fournisseur.", + 'warning', + ) + return redirect(url_for('auth.login')) + if current_user.is_authenticated: session.pop('oauth_signup_pending', None) return redirect(url_for('recordings.index')) @@ -673,6 +724,18 @@ def oauth_finish_signup(): ua=ua, legal_version=SIGNUP_LEGAL_VERSION, ) + except EmailAlreadyExistsError: + # I3: race — a parallel /signup created the email between OAuth + # callback and this POST. Don't 500; redirect to /login with a + # helpful flash so the user knows their existing account is fine. + session.pop('oauth_signup_pending', None) + flash( + "Un compte DictIA existe déjà pour ce courriel. " + "Connectez-vous avec votre mot de passe ou utilisez votre " + "fournisseur d'origine.", + 'warning', + ) + return redirect(url_for('auth.login')) except ValueError as e: current_app.logger.warning('OAuth signup failed: %s', e) flash( diff --git a/src/auth/magic_link.py b/src/auth/magic_link.py index 127bc08..60a015c 100644 --- a/src/auth/magic_link.py +++ b/src/auth/magic_link.py @@ -5,13 +5,26 @@ Stateless tokens via ``itsdangerous`` (no DB column). Same pattern as the user_id; ``max_age`` is 15 minutes. The compatibility-audit (C2) explicitly forbids new User columns -(no ``magic_link_token``, no ``magic_link_sent_at``). Single-use enforcement -is intentionally NOT implemented at this layer because the cost of a -short-window replay (≤15 min, requires the user's email) is acceptable -for the threat model — the user opened the email and clicked the link. -If single-use becomes a hard requirement later, add an ip + sent_at index -to a separate magic-link audit table without touching User. +(no ``magic_link_token``, no ``magic_link_sent_at``). Single-use +enforcement is implemented at the application layer via an in-process +JTI cache (see ``_consumed_jtis`` below) — within a single gunicorn +worker, a token can be consumed exactly once. Cross-worker uniqueness +in a multi-worker deployment is best-effort and would require Redis or +a small DB table; with the route's 10/min rate limit this is acceptable +for B-2.4. + +OPERATOR NOTE — log scrubbing: +The magic-link token appears in the URL path (``/auth/magic-link/``) +and will therefore be captured by Cloudflare access logs, Flask's request +log, and the user's browser history. The single-use cache here mitigates +replay-from-logs within the 15-minute validity window, but operators +should ALSO scrub ``/auth/magic-link/*`` from log retention as defence +in depth (the operator action is documented in the security review; +no application-side fix can fully address logs that have already been +written elsewhere). """ +import secrets +import time from typing import Optional from itsdangerous import URLSafeTimedSerializer, SignatureExpired, BadSignature @@ -20,21 +33,73 @@ from flask import current_app MAGIC_LINK_EXPIRY_SECONDS = 15 * 60 # 15 minutes _SALT = 'magic-link-login' +# In-process consumed-JTI cache: {jti: expires_at_unix_timestamp}. +# Single-use enforcement against replay within the 15-min validity window. +# Cache is best-effort: in a multi-worker gunicorn deployment a JTI +# consumed on worker A would still be accepted on worker B. For production +# multi-worker deployments, replace with Redis or a small DB table. +# For B-2.4 with rate-limiting at 10/min on consume + 5/min on request, +# this provides meaningful single-use enforcement within a worker. +_consumed_jtis: dict = {} + def _serializer() -> URLSafeTimedSerializer: - """Build a fresh serializer per call (cheap; reads SECRET_KEY from app config).""" - secret_key = current_app.config.get('SECRET_KEY', 'default-dev-key') + """Build a fresh serializer per call (cheap; reads SECRET_KEY from app config). + + Raises: + RuntimeError: if SECRET_KEY is missing from app config. We refuse + to fall back to a default key because that would let anyone + forge magic-link tokens against any deployment that forgot + to set SECRET_KEY. + """ + secret_key = current_app.config.get('SECRET_KEY') + if not secret_key: + raise RuntimeError( + "SECRET_KEY must be configured for magic-link tokens" + ) return URLSafeTimedSerializer(secret_key, salt=_SALT) +def _purge_expired_jtis() -> None: + """Drop entries past their expiry to bound memory.""" + now = time.time() + for jti in [j for j, exp in _consumed_jtis.items() if exp < now]: + _consumed_jtis.pop(jti, None) + + def generate_magic_link_token(user_id: int) -> str: - """Sign a magic-link token containing the user_id.""" - return _serializer().dumps(user_id) + """Generate a single-use magic-link token (15-min expiry, includes random JTI). + + The JTI (JSON Token ID) is a random 16-byte URL-safe string embedded + in the token payload. On consume, the JTI is added to the in-process + ``_consumed_jtis`` cache; subsequent consumes of the same token + return None (single-use enforcement). + """ + jti = secrets.token_urlsafe(16) + return _serializer().dumps({'uid': user_id, 'jti': jti}) def consume_magic_link_token(token: str) -> Optional[int]: - """Return user_id if token is valid and unexpired, else None.""" + """Verify + mark token as consumed. Returns user_id once; None on + replay/expired/invalid/malformed. + + Single-use enforcement: the JTI is added to ``_consumed_jtis`` on + success; a second call with the same token returns None. + """ try: - return _serializer().loads(token, max_age=MAGIC_LINK_EXPIRY_SECONDS) + payload = _serializer().loads(token, max_age=MAGIC_LINK_EXPIRY_SECONDS) except (SignatureExpired, BadSignature): return None + + if not isinstance(payload, dict): + return None + user_id = payload.get('uid') + jti = payload.get('jti') + if not isinstance(user_id, int) or not isinstance(jti, str): + return None + + _purge_expired_jtis() + if jti in _consumed_jtis: + return None # replay — token already consumed + _consumed_jtis[jti] = time.time() + MAGIC_LINK_EXPIRY_SECONDS + return user_id diff --git a/src/auth/oauth_providers.py b/src/auth/oauth_providers.py index 9e70124..ec8377e 100644 --- a/src/auth/oauth_providers.py +++ b/src/auth/oauth_providers.py @@ -48,6 +48,19 @@ _PROVIDER_CONFIG = { } +class EmailAlreadyExistsError(Exception): + """Raised by create_oauth_user_with_consent when email is already taken + between the OAuth callback (where the new-user check passed) and the + finish-signup POST (where the User row is finally inserted). + + This protects against a race: a parallel /signup in another tab can + create a User with the same email between callback and finish-signup, + making the OAuth User insert fail with an IntegrityError on the + email-unique constraint. Catching this allows a graceful flash + redirect + instead of a 500. + """ + + def is_oauth_provider_enabled(provider: str) -> bool: """Return True if the provider has client_id AND client_secret in env.""" cfg = _PROVIDER_CONFIG.get(provider) @@ -72,25 +85,33 @@ def init_oauth_providers(app) -> Optional[OAuth]: global _oauth enabled_providers = [p for p in _PROVIDER_CONFIG if is_oauth_provider_enabled(p)] if not enabled_providers: + # Operability: log when no providers are enabled so operators don't + # silently lose OAuth login on misconfigured deployments. + app.logger.info( + 'OAuth providers: none enabled (set MS_CLIENT_ID/MS_CLIENT_SECRET ' + 'or GOOGLE_CLIENT_ID/GOOGLE_CLIENT_SECRET to enable).' + ) return None if _oauth is None: _oauth = OAuth(app) for provider in enabled_providers: cfg = _PROVIDER_CONFIG[provider] - # Authlib's register() is idempotent for the same name; safe to call - # again if already registered (no-op on duplicate). - try: - _oauth.register( - name=provider, - client_id=os.environ[cfg['env_client_id']], - client_secret=os.environ[cfg['env_client_secret']], - server_metadata_url=cfg['server_metadata_url'], - client_kwargs={'scope': cfg['scope']}, + # Idempotent: skip re-registration if already registered (Authlib caches + # by name in `_clients`). Real registration errors (bad metadata URL, + # network failure) now surface as exceptions instead of being silently + # swallowed by a bare `except Exception: pass`. + if provider in getattr(_oauth, '_clients', {}): + app.logger.debug( + 'OAuth provider %r already registered (skipping)', provider ) - except Exception: - # Already-registered — Authlib raises on duplicate. Acceptable - # for idempotent app boot. - pass + continue + _oauth.register( + name=provider, + client_id=os.environ[cfg['env_client_id']], + client_secret=os.environ[cfg['env_client_secret']], + server_metadata_url=cfg['server_metadata_url'], + client_kwargs={'scope': cfg['scope']}, + ) app.logger.info( 'OAuth providers initialized: %s', ', '.join(enabled_providers) ) @@ -105,18 +126,43 @@ def get_oauth_client(provider: str): def find_user_by_oauth( - provider: str, subject: str, email: Optional[str] + provider: str, + subject: str, + email: Optional[str], + email_verified: bool, ) -> Optional[User]: """Lookup an existing user by sso_subject, then email (link path). + Args: + provider: 'microsoft' or 'google'. + subject: OAuth ``sub`` claim — stable per (IdP, user) tuple. + email: OAuth ``email`` claim (case-insensitive). + email_verified: MUST be True (the literal boolean) for the + email-link branch to fire. Caller is responsible for reading + ``userinfo.get('email_verified') is True`` — we treat anything + else as untrusted. + Returns: - User object: known account (login directly). - - None: brand-new account — caller should defer to consent page. + - None: brand-new account (caller defers to finish-signup) OR the + email matched an existing account but ``email_verified is not True`` + (caller should refuse to silently link — see oauth callback handler). - On the email-match path, the OAuth identity is bound to the existing - account on first login. This is safe because the OAuth provider has - already verified the email; we are not granting access to anyone who - couldn't already prove control of the address. + Raises: + PermissionError: if an existing email-matched user already has a + ``sso_subject`` set (linked to a different OAuth identity). Refusing + to overwrite protects against account-hijack via a second IdP + claiming the victim's email (C2 from the security review). + + Security notes: + - Linking by email is gated on ``email_verified is True``. A hostile + IdP that returns ``email_verified=False`` (or omits the claim) does + NOT auto-link to an existing account. This blocks the takeover + vector where an attacker creates a Microsoft personal account or + Workspace tenant claiming a victim's mailbox without verification. + - We refuse to overwrite an existing ``sso_subject``. If Alice is + already linked to ms-sub-A, a second login claiming the same email + from google or another tenant is rejected, not silently re-linked. """ user = User.query.filter_by(sso_subject=subject, sso_provider=provider).first() if user: @@ -124,6 +170,21 @@ def find_user_by_oauth( if email: existing_email_user = User.query.filter_by(email=email.lower().strip()).first() if existing_email_user: + # C1: refuse to auto-link if the IdP did not assert email_verified. + # The caller will refuse to fall through to finish-signup either + # (since that would create a duplicate account on a different + # identity), so returning None here triggers the friendly flash. + if email_verified is not True: + return None + # C2: refuse to overwrite an existing linked OAuth identity. + # If we got here the first branch (sso_subject lookup) didn't + # match — meaning either the user has a different sso_subject + # (account hijack attempt) or no sso_subject at all (legit link). + if existing_email_user.sso_subject: + raise PermissionError( + f"L'adresse {email} est déjà liée à une autre identité fédérée. " + f"Connectez-vous avec votre fournisseur d'origine, ou contactez le support." + ) existing_email_user.sso_provider = provider existing_email_user.sso_subject = subject db.session.commit() @@ -148,6 +209,13 @@ def create_oauth_user_with_consent( Always writes 4 ConsentLog rows (one per consent_type), recording explicit refusal as ``granted=False`` for the audit trail. + + Raises: + ValueError: if userinfo is missing the email claim. + EmailAlreadyExistsError: if a User with this email already exists + (race against /signup or another OAuth login between the + callback and the finish-signup POST). Caller should handle + with a friendly French flash + redirect to /login. """ from src.models.consent import ConsentLog from src.auth.sso import generate_unique_username @@ -157,6 +225,19 @@ def create_oauth_user_with_consent( if not email: raise ValueError('OAuth userinfo missing email') + # I3: pre-check for the email-collision race. The username retry loop + # below ONLY helps with username collisions; a duplicate email would + # burn 5 attempts and then re-raise IntegrityError, which surfaces as + # a 500. Detect it once here and raise the dedicated exception so the + # caller can render a friendly "compte existe déjà" flash. + existing = User.query.filter_by(email=email).first() + if existing: + raise EmailAlreadyExistsError( + f"Account with email {email} already exists; cannot create via " + f"OAuth signup. User should sign in with their original method " + f"or contact support." + ) + name = (userinfo.get('name') or '').strip() if not name: first = (userinfo.get('given_name') or '').strip() diff --git a/tests/test_oauth_magic_link.py b/tests/test_oauth_magic_link.py index 091bfb9..49668f7 100644 --- a/tests/test_oauth_magic_link.py +++ b/tests/test_oauth_magic_link.py @@ -202,7 +202,12 @@ def test_oauth_callback_logs_in_existing_user_by_subject(): # ---------------------------------------------------------------------- def test_oauth_callback_links_existing_user_by_email(): - """User with matching email but no sso_subject gets linked + logged in.""" + """User with matching email but no sso_subject gets linked + logged in. + + Requires email_verified=True from the IdP — see test + test_oauth_callback_refuses_link_when_email_not_verified for the + negative case. + """ with app.app_context(): _disable_csrf() _set_oauth_env() @@ -222,6 +227,7 @@ def test_oauth_callback_links_existing_user_by_email(): 'userinfo': { 'sub': 'google-new-sub-789', 'email': 'emaillink@example.qc.ca', + 'email_verified': True, # required for auto-link 'name': 'Email Link User', } } @@ -249,6 +255,7 @@ def test_oauth_callback_links_existing_user_by_email(): def test_finish_signup_requires_cgu_and_confidentialite(): """POST without CGU+confidentialite returns 400; no User created; session preserved.""" + import time as _time with app.app_context(): _disable_csrf() db.create_all() @@ -264,6 +271,7 @@ def test_finish_signup_requires_cgu_and_confidentialite(): 'given_name': 'Pending', 'family_name': 'User', }, + 'created_at': _time.time(), } resp = client.post('/auth/oauth/finish-signup', data={ # No consents @@ -287,6 +295,7 @@ def test_finish_signup_requires_cgu_and_confidentialite(): def test_finish_signup_creates_user_and_4_consent_logs(): """POST with all consents → User created with 4 ConsentLog rows + login.""" + import time as _time with app.app_context(): _disable_csrf() db.create_all() @@ -302,6 +311,7 @@ def test_finish_signup_creates_user_and_4_consent_logs(): 'given_name': 'Success', 'family_name': 'User', }, + 'created_at': _time.time(), } resp = client.post('/auth/oauth/finish-signup', data={ 'consent_cgu': 'y', @@ -446,7 +456,18 @@ def test_magic_link_request_skips_unverified_user_silently(): # ---------------------------------------------------------------------- def test_magic_link_consume_logs_in_user_with_valid_token(): - """Valid token → user logged in + redirect to recordings.index.""" + """Valid token → user logged in + redirect to recordings.index. Second + consume of the same token is refused (single-use enforcement). + + Note on the g.pop('_login_user') below: when test_client requests run + inside an outer ``with app.app_context()`` block, Flask-Login caches + the user on ``g._login_user`` and that cache persists to the next + request because ``g`` is bound to the app context, not the request + context. Production (no outer app_context) gets a fresh g per request + and is unaffected. We pop the cache between requests to simulate the + fresh-request behavior. + """ + from flask import g with app.app_context(): _disable_csrf() db.create_all() @@ -468,6 +489,27 @@ def test_magic_link_consume_logs_in_user_with_valid_token(): assert '/auth/magic-link' not in resp.headers['Location'] with client.session_transaction() as sess: assert sess.get('_user_id') == str(user.id) + + # Clear Flask-Login's cached current_user so the next test_client + # request doesn't see the previous user as still-authenticated + # (artifact of running multiple test_clients inside a shared + # app_context — see docstring). + g.pop('_login_user', None) + + # Single-use: replay the same token in a fresh client; must be + # refused. + with app.test_client() as client2: + resp2 = client2.get(f'/auth/magic-link/{token}', follow_redirects=False) + assert resp2.status_code == 302 + # Refused → redirected back to /auth/magic-link (the request page) + assert '/auth/magic-link' in resp2.headers['Location'] + with client2.session_transaction() as sess: + assert sess.get('_user_id') is None + flashes = sess.get('_flashes', []) + assert any( + 'invalide' in msg.lower() or 'expir' in msg.lower() + for _cat, msg in flashes + ), f'expected invalid/expired flash on replay, got {flashes}' finally: db.session.rollback() db.drop_all() @@ -612,3 +654,266 @@ def test_send_magic_link_email_french_branded(): finally: db.session.rollback() db.drop_all() + + +# ---------------------------------------------------------------------- +# 16. C1 — refuse to auto-link when IdP did not verify the email +# ---------------------------------------------------------------------- + +def test_oauth_callback_refuses_link_when_email_not_verified(): + """Hostile IdP returns email_verified=False — must NOT auto-link to existing user. + + Account-takeover protection: a Microsoft personal account or hostile + Workspace tenant could issue a token claiming + ``email='alice@dictia.ca'`` without ever proving Alice controls that + mailbox. Falling through and auto-linking would let the attacker log + in as Alice. The fix gates email-link on ``email_verified is True``. + """ + with app.app_context(): + _disable_csrf() + _set_oauth_env() + db.create_all() + try: + existing = User( + username='alicedict', + email='alice@dictia.ca', + password='$2b$12$abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMN', + email_verified=True, + ) + db.session.add(existing) + db.session.commit() + existing_id = existing.id + + # Hostile token: same email, different sub, NOT verified. + mock_token = { + 'userinfo': { + 'sub': 'hostile-sub-999', + 'email': 'alice@dictia.ca', + 'email_verified': False, + 'name': 'Not Alice', + } + } + with patch('src.api.auth.get_oauth_client') as mock_get_client: + client_mock = MagicMock() + client_mock.authorize_access_token.return_value = mock_token + mock_get_client.return_value = client_mock + with app.test_client() as test_client: + resp = test_client.get('/auth/oauth/microsoft/callback') + # Refused → redirect to /login (not /auth/oauth/finish-signup + # and not into recordings.index). + assert resp.status_code == 302 + assert '/login' in resp.headers['Location'] + assert '/auth/oauth/finish-signup' not in resp.headers['Location'] + with test_client.session_transaction() as sess: + # NOT logged in + assert sess.get('_user_id') is None + # NOT pending finish-signup either + assert 'oauth_signup_pending' not in sess + flashes = sess.get('_flashes', []) + # French flash about account already existing + assert any( + 'compte dictia existe' in msg.lower() + or 'connectez-vous' in msg.lower() + for _cat, msg in flashes + ), f'expected manual-link flash, got {flashes}' + # sso_subject untouched on Alice's row + refreshed = db.session.get(User, existing_id) + assert refreshed.sso_subject is None + assert refreshed.sso_provider is None + finally: + db.session.rollback() + db.drop_all() + + +# ---------------------------------------------------------------------- +# 17. C2 — refuse to overwrite an existing sso_subject (second-IdP hijack) +# ---------------------------------------------------------------------- + +def test_oauth_callback_refuses_to_overwrite_existing_sso_subject(): + """Hostile second IdP claims existing user's email — must refuse to overwrite sso_subject. + + Account-hijack protection: Bob is legitimately linked to + ``ms-sub-A`` via Microsoft. An attacker on Google (or even a different + Microsoft tenant) authenticates with ``email='bob@dictia.ca'`` and + ``email_verified=True`` (Google verifies Gmail addresses). Without + this guard, the email-link branch would silently overwrite Bob's + ``sso_subject`` to ``google-sub-X`` and lock Bob out forever. + """ + with app.app_context(): + _disable_csrf() + _set_oauth_env() + db.create_all() + try: + bob = User( + username='bobdict', + email='bob@dictia.ca', + password=None, + sso_provider='microsoft', + sso_subject='ms-sub-A', + email_verified=True, + ) + db.session.add(bob) + db.session.commit() + bob_id = bob.id + + mock_token = { + 'userinfo': { + 'sub': 'google-sub-X', + 'email': 'bob@dictia.ca', + 'email_verified': True, + 'name': 'Bob (Google)', + } + } + with patch('src.api.auth.get_oauth_client') as mock_get_client: + client_mock = MagicMock() + client_mock.authorize_access_token.return_value = mock_token + mock_get_client.return_value = client_mock + with app.test_client() as test_client: + resp = test_client.get('/auth/oauth/google/callback') + assert resp.status_code == 302 + assert '/login' in resp.headers['Location'] + with test_client.session_transaction() as sess: + assert sess.get('_user_id') is None + assert 'oauth_signup_pending' not in sess + flashes = sess.get('_flashes', []) + assert any( + 'déjà liée' in msg.lower() + or 'autre identité' in msg.lower() + for _cat, msg in flashes + ), f'expected already-linked flash, got {flashes}' + # Bob's sso_subject untouched + refreshed = db.session.get(User, bob_id) + assert refreshed.sso_subject == 'ms-sub-A' + assert refreshed.sso_provider == 'microsoft' + finally: + db.session.rollback() + db.drop_all() + + +# ---------------------------------------------------------------------- +# 18. I3 — finish-signup race: parallel /signup created the email +# ---------------------------------------------------------------------- + +def test_finish_signup_handles_concurrent_account_creation(): + """If user is created via /signup in another tab between OAuth callback + and finish-signup POST, /auth/oauth/finish-signup must redirect to + /login with a helpful French flash, not 500. + """ + import time as _time + with app.app_context(): + _disable_csrf() + db.create_all() + try: + # Pre-create the user (simulating the parallel /signup that won) + racer = User( + username='raceuser', + email='race@x.ca', + password='$2b$12$abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMN', + email_verified=True, + ) + db.session.add(racer) + db.session.commit() + racer_id = racer.id + + with app.test_client() as client: + with client.session_transaction() as sess: + sess['oauth_signup_pending'] = { + 'provider': 'microsoft', + 'subject': 'ms-sub-race', + 'userinfo': { + 'email': 'race@x.ca', + 'name': 'Race User', + 'given_name': 'Race', + 'family_name': 'User', + }, + 'created_at': _time.time(), + } + resp = client.post('/auth/oauth/finish-signup', data={ + 'consent_cgu': 'y', + 'consent_confidentialite': 'y', + 'consent_marketing': 'y', + 'consent_analytics': 'y', + }) + assert resp.status_code == 302 + assert '/login' in resp.headers['Location'] + with client.session_transaction() as sess: + assert 'oauth_signup_pending' not in sess + assert sess.get('_user_id') is None + flashes = sess.get('_flashes', []) + assert any( + 'compte dictia existe' in msg.lower() + for _cat, msg in flashes + ), f'expected race-flash, got {flashes}' + # Original racer untouched + refreshed = db.session.get(User, racer_id) + assert refreshed.sso_subject is None + assert refreshed.sso_provider is None + # Only one User row for that email + assert User.query.filter_by(email='race@x.ca').count() == 1 + finally: + db.session.rollback() + db.drop_all() + + +# ---------------------------------------------------------------------- +# 19. I2 single-use — magic-link token rejected on second consume (unit test) +# ---------------------------------------------------------------------- + +def test_magic_link_token_is_single_use(): + """Replaying a magic-link token within the validity window must fail + at the function level (no Flask request needed). Complements the + integration coverage in test_magic_link_consume_logs_in_user_with_valid_token.""" + with app.app_context(): + from src.auth.magic_link import ( + generate_magic_link_token, + consume_magic_link_token, + ) + token = generate_magic_link_token(424242) + first = consume_magic_link_token(token) + assert first == 424242, f'first consume should return user_id, got {first}' + second = consume_magic_link_token(token) + assert second is None, f'replay should return None, got {second}' + third = consume_magic_link_token(token) + assert third is None, f'second replay should also return None, got {third}' + + +# ---------------------------------------------------------------------- +# 20. I4 — finish-signup expires stale OAuth signup sessions +# ---------------------------------------------------------------------- + +def test_finish_signup_expires_stale_oauth_session(): + """Session blob older than 15 min triggers a graceful expiry redirect.""" + import time as _time + with app.app_context(): + _disable_csrf() + db.create_all() + try: + with app.test_client() as client: + with client.session_transaction() as sess: + sess['oauth_signup_pending'] = { + 'provider': 'google', + 'subject': 'stale-sub-001', + 'userinfo': { + 'email': 'stale@example.qc.ca', + 'name': 'Stale User', + 'given_name': 'Stale', + 'family_name': 'User', + }, + # 16 minutes ago — past the 15-min expiry + 'created_at': _time.time() - 16 * 60, + } + resp = client.get('/auth/oauth/finish-signup') + assert resp.status_code == 302 + assert '/login' in resp.headers['Location'] + with client.session_transaction() as sess: + assert 'oauth_signup_pending' not in sess + flashes = sess.get('_flashes', []) + assert any( + 'expir' in msg.lower() or 'recommencez' in msg.lower() + for _cat, msg in flashes + ), f'expected expiry flash, got {flashes}' + # No User created + assert User.query.filter_by(email='stale@example.qc.ca').first() is None + finally: + db.session.rollback() + db.drop_all()