fix(auth): B-2.4 security review fixes — OAuth linking + magic link replay

Follow-up to commit 0513e67 addressing 2 critical OAuth account-takeover
vulnerabilities and 5 important issues found in the security review.

Critical fixes:
- C1: gate OAuth email-link on ``email_verified is True`` (strict bool)
  in find_user_by_oauth + callback. Hostile Microsoft personal account
  or Workspace tenant returning email_verified=False (or omitting the
  claim) can no longer auto-link to an existing account. Callback shows
  a friendly French flash + redirect to /login when the email exists
  but the IdP didn't verify it.
- C2: refuse to overwrite an existing sso_subject in find_user_by_oauth.
  A second IdP claiming the victim's email (Google after Microsoft, or
  a hostile second Microsoft tenant) now raises PermissionError instead
  of silently re-binding the User row, which would lock the legitimate
  user out. Callback catches and flashes the error message in French.

Important fixes:
- I1: replace ``except Exception: pass`` in init_oauth_providers with an
  idempotency pre-check on _oauth._clients. Real registration errors
  (bad metadata URL, network failure) now surface as exceptions instead
  of being silently swallowed at app boot.
- I2: single-use enforcement for magic-link tokens via in-process JTI
  cache (_consumed_jtis dict). Replay within the 15-min validity window
  now returns None. SECRET_KEY is now strictly required (no
  default-dev-key fallback). Operator-facing comment documents that
  /auth/magic-link/* should also be scrubbed from Cloudflare/Flask
  access logs as defence in depth.
- I3: pre-check email collision in create_oauth_user_with_consent and
  raise dedicated EmailAlreadyExistsError. Race against parallel /signup
  in another tab between OAuth callback and finish-signup POST now
  redirects to /login with a helpful French flash instead of burning 5
  retry attempts and surfacing a 500.
- I4: oauth_signup_pending session blob now carries a created_at
  timestamp; finish-signup rejects sessions older than 15 min with a
  graceful expiry flash + redirect to /login.
- I5: init_oauth_providers logs an INFO when no providers are enabled
  so operators can spot misconfigured deployments.

Tests: 16 → 21 (5 new):
- test_oauth_callback_refuses_link_when_email_not_verified (C1)
- test_oauth_callback_refuses_to_overwrite_existing_sso_subject (C2)
- test_finish_signup_handles_concurrent_account_creation (I3)
- test_finish_signup_expires_stale_oauth_session (I4)
- test_magic_link_token_is_single_use (I2)

Existing tests updated for new contract:
- test_oauth_callback_links_existing_user_by_email now sets
  email_verified=True in the mock token (required by C1 gate).
- test_finish_signup_requires_cgu_and_confidentialite and
  test_finish_signup_creates_user_and_4_consent_logs now seed
  created_at in the session blob (required by I4 expiry check).
- test_magic_link_consume_logs_in_user_with_valid_token now also
  asserts a second consume of the same token returns None and
  redirects to /auth/magic-link with an invalid/expired flash.

Verified: 21/21 OAuth+magic-link tests pass; 16/16 email service tests
still pass (no regression in adjacent surface).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Allison
2026-04-27 23:50:55 -04:00
parent 0513e67838
commit 3a41bb482d
4 changed files with 548 additions and 34 deletions

View File

@@ -55,6 +55,7 @@ from src.auth.oauth_providers import (
find_user_by_oauth, find_user_by_oauth,
create_oauth_user_with_consent, create_oauth_user_with_consent,
get_oauth_provider_display_name, get_oauth_provider_display_name,
EmailAlreadyExistsError,
) )
from src.auth.magic_link import ( from src.auth.magic_link import (
generate_magic_link_token, generate_magic_link_token,
@@ -595,13 +596,47 @@ def oauth_provider_callback(provider):
) )
return redirect(url_for('auth.login')) return redirect(url_for('auth.login'))
user = find_user_by_oauth(provider, subject, email) # C1: read email_verified strictly. We accept ONLY the literal boolean
# True; None / missing / 'true' string / etc. are treated as False so
# that a hostile or misconfigured IdP cannot upgrade an unverified
# claim into an auto-link to an existing account.
email_verified_raw = userinfo.get('email_verified')
email_verified = email_verified_raw is True
# C2: find_user_by_oauth raises PermissionError if the email matches
# an existing user that is already linked to a different OAuth identity.
try:
user = find_user_by_oauth(
provider, subject, email, email_verified=email_verified,
)
except PermissionError as e:
flash(str(e), 'danger')
return redirect(url_for('auth.login'))
if user: if user:
login_user(user) login_user(user)
audit_sso_login(user.id, details={'provider': provider}) audit_sso_login(user.id, details={'provider': provider})
return redirect(url_for('recordings.index')) return redirect(url_for('recordings.index'))
# C1: if no user was returned but an account exists for this email AND
# email_verified is False, the IdP couldn't (or wouldn't) prove the
# user controls the mailbox. Falling through to finish-signup would
# create a duplicate account on a different identity AND mask the
# takeover attempt; instead refuse explicitly with a friendly flash.
existing_with_email = User.query.filter_by(email=email).first()
if existing_with_email and not email_verified:
provider_display_name = get_oauth_provider_display_name(provider)
flash(
f"Un compte DictIA existe déjà pour ce courriel. "
f"Connectez-vous d'abord avec votre mot de passe pour lier votre "
f"compte {provider_display_name}, ou demandez l'aide du support.",
'danger',
)
return redirect(url_for('auth.login'))
# New user — defer creation until Loi 25 consents are captured. # New user — defer creation until Loi 25 consents are captured.
# I4: include created_at so finish-signup can reject stale sessions.
import time as _time
session['oauth_signup_pending'] = { session['oauth_signup_pending'] = {
'provider': provider, 'provider': provider,
'subject': subject, 'subject': subject,
@@ -611,6 +646,7 @@ def oauth_provider_callback(provider):
'given_name': userinfo.get('given_name', ''), 'given_name': userinfo.get('given_name', ''),
'family_name': userinfo.get('family_name', ''), 'family_name': userinfo.get('family_name', ''),
}, },
'created_at': _time.time(),
} }
return redirect(url_for('auth.oauth_finish_signup')) return redirect(url_for('auth.oauth_finish_signup'))
@@ -626,6 +662,21 @@ def oauth_finish_signup():
pending = session.get('oauth_signup_pending') pending = session.get('oauth_signup_pending')
if not pending: if not pending:
return redirect(url_for('auth.signup')) return redirect(url_for('auth.signup'))
# I4: reject stale OAuth signup sessions (>15 min). Operator might have
# restarted between callback and finish-signup, or user abandoned then
# came back hours later. Either way, restart the OAuth flow rather
# than trust a stale subject claim.
import time as _time
if pending.get('created_at', 0) < _time.time() - 15 * 60:
session.pop('oauth_signup_pending', None)
flash(
"Votre session d'inscription OAuth a expiré. "
"Recommencez avec votre fournisseur.",
'warning',
)
return redirect(url_for('auth.login'))
if current_user.is_authenticated: if current_user.is_authenticated:
session.pop('oauth_signup_pending', None) session.pop('oauth_signup_pending', None)
return redirect(url_for('recordings.index')) return redirect(url_for('recordings.index'))
@@ -673,6 +724,18 @@ def oauth_finish_signup():
ua=ua, ua=ua,
legal_version=SIGNUP_LEGAL_VERSION, legal_version=SIGNUP_LEGAL_VERSION,
) )
except EmailAlreadyExistsError:
# I3: race — a parallel /signup created the email between OAuth
# callback and this POST. Don't 500; redirect to /login with a
# helpful flash so the user knows their existing account is fine.
session.pop('oauth_signup_pending', None)
flash(
"Un compte DictIA existe déjà pour ce courriel. "
"Connectez-vous avec votre mot de passe ou utilisez votre "
"fournisseur d'origine.",
'warning',
)
return redirect(url_for('auth.login'))
except ValueError as e: except ValueError as e:
current_app.logger.warning('OAuth signup failed: %s', e) current_app.logger.warning('OAuth signup failed: %s', e)
flash( flash(

View File

@@ -5,13 +5,26 @@ Stateless tokens via ``itsdangerous`` (no DB column). Same pattern as
the user_id; ``max_age`` is 15 minutes. the user_id; ``max_age`` is 15 minutes.
The compatibility-audit (C2) explicitly forbids new User columns The compatibility-audit (C2) explicitly forbids new User columns
(no ``magic_link_token``, no ``magic_link_sent_at``). Single-use enforcement (no ``magic_link_token``, no ``magic_link_sent_at``). Single-use
is intentionally NOT implemented at this layer because the cost of a enforcement is implemented at the application layer via an in-process
short-window replay (≤15 min, requires the user's email) is acceptable JTI cache (see ``_consumed_jtis`` below) — within a single gunicorn
for the threat model — the user opened the email and clicked the link. worker, a token can be consumed exactly once. Cross-worker uniqueness
If single-use becomes a hard requirement later, add an ip + sent_at index in a multi-worker deployment is best-effort and would require Redis or
to a separate magic-link audit table without touching User. a small DB table; with the route's 10/min rate limit this is acceptable
for B-2.4.
OPERATOR NOTE — log scrubbing:
The magic-link token appears in the URL path (``/auth/magic-link/<token>``)
and will therefore be captured by Cloudflare access logs, Flask's request
log, and the user's browser history. The single-use cache here mitigates
replay-from-logs within the 15-minute validity window, but operators
should ALSO scrub ``/auth/magic-link/*`` from log retention as defence
in depth (the operator action is documented in the security review;
no application-side fix can fully address logs that have already been
written elsewhere).
""" """
import secrets
import time
from typing import Optional from typing import Optional
from itsdangerous import URLSafeTimedSerializer, SignatureExpired, BadSignature from itsdangerous import URLSafeTimedSerializer, SignatureExpired, BadSignature
@@ -20,21 +33,73 @@ from flask import current_app
MAGIC_LINK_EXPIRY_SECONDS = 15 * 60 # 15 minutes MAGIC_LINK_EXPIRY_SECONDS = 15 * 60 # 15 minutes
_SALT = 'magic-link-login' _SALT = 'magic-link-login'
# In-process consumed-JTI cache: {jti: expires_at_unix_timestamp}.
# Single-use enforcement against replay within the 15-min validity window.
# Cache is best-effort: in a multi-worker gunicorn deployment a JTI
# consumed on worker A would still be accepted on worker B. For production
# multi-worker deployments, replace with Redis or a small DB table.
# For B-2.4 with rate-limiting at 10/min on consume + 5/min on request,
# this provides meaningful single-use enforcement within a worker.
_consumed_jtis: dict = {}
def _serializer() -> URLSafeTimedSerializer: def _serializer() -> URLSafeTimedSerializer:
"""Build a fresh serializer per call (cheap; reads SECRET_KEY from app config).""" """Build a fresh serializer per call (cheap; reads SECRET_KEY from app config).
secret_key = current_app.config.get('SECRET_KEY', 'default-dev-key')
Raises:
RuntimeError: if SECRET_KEY is missing from app config. We refuse
to fall back to a default key because that would let anyone
forge magic-link tokens against any deployment that forgot
to set SECRET_KEY.
"""
secret_key = current_app.config.get('SECRET_KEY')
if not secret_key:
raise RuntimeError(
"SECRET_KEY must be configured for magic-link tokens"
)
return URLSafeTimedSerializer(secret_key, salt=_SALT) return URLSafeTimedSerializer(secret_key, salt=_SALT)
def _purge_expired_jtis() -> None:
"""Drop entries past their expiry to bound memory."""
now = time.time()
for jti in [j for j, exp in _consumed_jtis.items() if exp < now]:
_consumed_jtis.pop(jti, None)
def generate_magic_link_token(user_id: int) -> str: def generate_magic_link_token(user_id: int) -> str:
"""Sign a magic-link token containing the user_id.""" """Generate a single-use magic-link token (15-min expiry, includes random JTI).
return _serializer().dumps(user_id)
The JTI (JSON Token ID) is a random 16-byte URL-safe string embedded
in the token payload. On consume, the JTI is added to the in-process
``_consumed_jtis`` cache; subsequent consumes of the same token
return None (single-use enforcement).
"""
jti = secrets.token_urlsafe(16)
return _serializer().dumps({'uid': user_id, 'jti': jti})
def consume_magic_link_token(token: str) -> Optional[int]: def consume_magic_link_token(token: str) -> Optional[int]:
"""Return user_id if token is valid and unexpired, else None.""" """Verify + mark token as consumed. Returns user_id once; None on
replay/expired/invalid/malformed.
Single-use enforcement: the JTI is added to ``_consumed_jtis`` on
success; a second call with the same token returns None.
"""
try: try:
return _serializer().loads(token, max_age=MAGIC_LINK_EXPIRY_SECONDS) payload = _serializer().loads(token, max_age=MAGIC_LINK_EXPIRY_SECONDS)
except (SignatureExpired, BadSignature): except (SignatureExpired, BadSignature):
return None return None
if not isinstance(payload, dict):
return None
user_id = payload.get('uid')
jti = payload.get('jti')
if not isinstance(user_id, int) or not isinstance(jti, str):
return None
_purge_expired_jtis()
if jti in _consumed_jtis:
return None # replay — token already consumed
_consumed_jtis[jti] = time.time() + MAGIC_LINK_EXPIRY_SECONDS
return user_id

View File

@@ -48,6 +48,19 @@ _PROVIDER_CONFIG = {
} }
class EmailAlreadyExistsError(Exception):
"""Raised by create_oauth_user_with_consent when email is already taken
between the OAuth callback (where the new-user check passed) and the
finish-signup POST (where the User row is finally inserted).
This protects against a race: a parallel /signup in another tab can
create a User with the same email between callback and finish-signup,
making the OAuth User insert fail with an IntegrityError on the
email-unique constraint. Catching this allows a graceful flash + redirect
instead of a 500.
"""
def is_oauth_provider_enabled(provider: str) -> bool: def is_oauth_provider_enabled(provider: str) -> bool:
"""Return True if the provider has client_id AND client_secret in env.""" """Return True if the provider has client_id AND client_secret in env."""
cfg = _PROVIDER_CONFIG.get(provider) cfg = _PROVIDER_CONFIG.get(provider)
@@ -72,25 +85,33 @@ def init_oauth_providers(app) -> Optional[OAuth]:
global _oauth global _oauth
enabled_providers = [p for p in _PROVIDER_CONFIG if is_oauth_provider_enabled(p)] enabled_providers = [p for p in _PROVIDER_CONFIG if is_oauth_provider_enabled(p)]
if not enabled_providers: if not enabled_providers:
# Operability: log when no providers are enabled so operators don't
# silently lose OAuth login on misconfigured deployments.
app.logger.info(
'OAuth providers: none enabled (set MS_CLIENT_ID/MS_CLIENT_SECRET '
'or GOOGLE_CLIENT_ID/GOOGLE_CLIENT_SECRET to enable).'
)
return None return None
if _oauth is None: if _oauth is None:
_oauth = OAuth(app) _oauth = OAuth(app)
for provider in enabled_providers: for provider in enabled_providers:
cfg = _PROVIDER_CONFIG[provider] cfg = _PROVIDER_CONFIG[provider]
# Authlib's register() is idempotent for the same name; safe to call # Idempotent: skip re-registration if already registered (Authlib caches
# again if already registered (no-op on duplicate). # by name in `_clients`). Real registration errors (bad metadata URL,
try: # network failure) now surface as exceptions instead of being silently
_oauth.register( # swallowed by a bare `except Exception: pass`.
name=provider, if provider in getattr(_oauth, '_clients', {}):
client_id=os.environ[cfg['env_client_id']], app.logger.debug(
client_secret=os.environ[cfg['env_client_secret']], 'OAuth provider %r already registered (skipping)', provider
server_metadata_url=cfg['server_metadata_url'],
client_kwargs={'scope': cfg['scope']},
) )
except Exception: continue
# Already-registered — Authlib raises on duplicate. Acceptable _oauth.register(
# for idempotent app boot. name=provider,
pass client_id=os.environ[cfg['env_client_id']],
client_secret=os.environ[cfg['env_client_secret']],
server_metadata_url=cfg['server_metadata_url'],
client_kwargs={'scope': cfg['scope']},
)
app.logger.info( app.logger.info(
'OAuth providers initialized: %s', ', '.join(enabled_providers) 'OAuth providers initialized: %s', ', '.join(enabled_providers)
) )
@@ -105,18 +126,43 @@ def get_oauth_client(provider: str):
def find_user_by_oauth( def find_user_by_oauth(
provider: str, subject: str, email: Optional[str] provider: str,
subject: str,
email: Optional[str],
email_verified: bool,
) -> Optional[User]: ) -> Optional[User]:
"""Lookup an existing user by sso_subject, then email (link path). """Lookup an existing user by sso_subject, then email (link path).
Args:
provider: 'microsoft' or 'google'.
subject: OAuth ``sub`` claim — stable per (IdP, user) tuple.
email: OAuth ``email`` claim (case-insensitive).
email_verified: MUST be True (the literal boolean) for the
email-link branch to fire. Caller is responsible for reading
``userinfo.get('email_verified') is True`` — we treat anything
else as untrusted.
Returns: Returns:
- User object: known account (login directly). - User object: known account (login directly).
- None: brand-new account caller should defer to consent page. - None: brand-new account (caller defers to finish-signup) OR the
email matched an existing account but ``email_verified is not True``
(caller should refuse to silently link — see oauth callback handler).
On the email-match path, the OAuth identity is bound to the existing Raises:
account on first login. This is safe because the OAuth provider has PermissionError: if an existing email-matched user already has a
already verified the email; we are not granting access to anyone who ``sso_subject`` set (linked to a different OAuth identity). Refusing
couldn't already prove control of the address. to overwrite protects against account-hijack via a second IdP
claiming the victim's email (C2 from the security review).
Security notes:
- Linking by email is gated on ``email_verified is True``. A hostile
IdP that returns ``email_verified=False`` (or omits the claim) does
NOT auto-link to an existing account. This blocks the takeover
vector where an attacker creates a Microsoft personal account or
Workspace tenant claiming a victim's mailbox without verification.
- We refuse to overwrite an existing ``sso_subject``. If Alice is
already linked to ms-sub-A, a second login claiming the same email
from google or another tenant is rejected, not silently re-linked.
""" """
user = User.query.filter_by(sso_subject=subject, sso_provider=provider).first() user = User.query.filter_by(sso_subject=subject, sso_provider=provider).first()
if user: if user:
@@ -124,6 +170,21 @@ def find_user_by_oauth(
if email: if email:
existing_email_user = User.query.filter_by(email=email.lower().strip()).first() existing_email_user = User.query.filter_by(email=email.lower().strip()).first()
if existing_email_user: if existing_email_user:
# C1: refuse to auto-link if the IdP did not assert email_verified.
# The caller will refuse to fall through to finish-signup either
# (since that would create a duplicate account on a different
# identity), so returning None here triggers the friendly flash.
if email_verified is not True:
return None
# C2: refuse to overwrite an existing linked OAuth identity.
# If we got here the first branch (sso_subject lookup) didn't
# match — meaning either the user has a different sso_subject
# (account hijack attempt) or no sso_subject at all (legit link).
if existing_email_user.sso_subject:
raise PermissionError(
f"L'adresse {email} est déjà liée à une autre identité fédérée. "
f"Connectez-vous avec votre fournisseur d'origine, ou contactez le support."
)
existing_email_user.sso_provider = provider existing_email_user.sso_provider = provider
existing_email_user.sso_subject = subject existing_email_user.sso_subject = subject
db.session.commit() db.session.commit()
@@ -148,6 +209,13 @@ def create_oauth_user_with_consent(
Always writes 4 ConsentLog rows (one per consent_type), recording Always writes 4 ConsentLog rows (one per consent_type), recording
explicit refusal as ``granted=False`` for the audit trail. explicit refusal as ``granted=False`` for the audit trail.
Raises:
ValueError: if userinfo is missing the email claim.
EmailAlreadyExistsError: if a User with this email already exists
(race against /signup or another OAuth login between the
callback and the finish-signup POST). Caller should handle
with a friendly French flash + redirect to /login.
""" """
from src.models.consent import ConsentLog from src.models.consent import ConsentLog
from src.auth.sso import generate_unique_username from src.auth.sso import generate_unique_username
@@ -157,6 +225,19 @@ def create_oauth_user_with_consent(
if not email: if not email:
raise ValueError('OAuth userinfo missing email') raise ValueError('OAuth userinfo missing email')
# I3: pre-check for the email-collision race. The username retry loop
# below ONLY helps with username collisions; a duplicate email would
# burn 5 attempts and then re-raise IntegrityError, which surfaces as
# a 500. Detect it once here and raise the dedicated exception so the
# caller can render a friendly "compte existe déjà" flash.
existing = User.query.filter_by(email=email).first()
if existing:
raise EmailAlreadyExistsError(
f"Account with email {email} already exists; cannot create via "
f"OAuth signup. User should sign in with their original method "
f"or contact support."
)
name = (userinfo.get('name') or '').strip() name = (userinfo.get('name') or '').strip()
if not name: if not name:
first = (userinfo.get('given_name') or '').strip() first = (userinfo.get('given_name') or '').strip()

View File

@@ -202,7 +202,12 @@ def test_oauth_callback_logs_in_existing_user_by_subject():
# ---------------------------------------------------------------------- # ----------------------------------------------------------------------
def test_oauth_callback_links_existing_user_by_email(): def test_oauth_callback_links_existing_user_by_email():
"""User with matching email but no sso_subject gets linked + logged in.""" """User with matching email but no sso_subject gets linked + logged in.
Requires email_verified=True from the IdP — see test
test_oauth_callback_refuses_link_when_email_not_verified for the
negative case.
"""
with app.app_context(): with app.app_context():
_disable_csrf() _disable_csrf()
_set_oauth_env() _set_oauth_env()
@@ -222,6 +227,7 @@ def test_oauth_callback_links_existing_user_by_email():
'userinfo': { 'userinfo': {
'sub': 'google-new-sub-789', 'sub': 'google-new-sub-789',
'email': 'emaillink@example.qc.ca', 'email': 'emaillink@example.qc.ca',
'email_verified': True, # required for auto-link
'name': 'Email Link User', 'name': 'Email Link User',
} }
} }
@@ -249,6 +255,7 @@ def test_oauth_callback_links_existing_user_by_email():
def test_finish_signup_requires_cgu_and_confidentialite(): def test_finish_signup_requires_cgu_and_confidentialite():
"""POST without CGU+confidentialite returns 400; no User created; session preserved.""" """POST without CGU+confidentialite returns 400; no User created; session preserved."""
import time as _time
with app.app_context(): with app.app_context():
_disable_csrf() _disable_csrf()
db.create_all() db.create_all()
@@ -264,6 +271,7 @@ def test_finish_signup_requires_cgu_and_confidentialite():
'given_name': 'Pending', 'given_name': 'Pending',
'family_name': 'User', 'family_name': 'User',
}, },
'created_at': _time.time(),
} }
resp = client.post('/auth/oauth/finish-signup', data={ resp = client.post('/auth/oauth/finish-signup', data={
# No consents # No consents
@@ -287,6 +295,7 @@ def test_finish_signup_requires_cgu_and_confidentialite():
def test_finish_signup_creates_user_and_4_consent_logs(): def test_finish_signup_creates_user_and_4_consent_logs():
"""POST with all consents → User created with 4 ConsentLog rows + login.""" """POST with all consents → User created with 4 ConsentLog rows + login."""
import time as _time
with app.app_context(): with app.app_context():
_disable_csrf() _disable_csrf()
db.create_all() db.create_all()
@@ -302,6 +311,7 @@ def test_finish_signup_creates_user_and_4_consent_logs():
'given_name': 'Success', 'given_name': 'Success',
'family_name': 'User', 'family_name': 'User',
}, },
'created_at': _time.time(),
} }
resp = client.post('/auth/oauth/finish-signup', data={ resp = client.post('/auth/oauth/finish-signup', data={
'consent_cgu': 'y', 'consent_cgu': 'y',
@@ -446,7 +456,18 @@ def test_magic_link_request_skips_unverified_user_silently():
# ---------------------------------------------------------------------- # ----------------------------------------------------------------------
def test_magic_link_consume_logs_in_user_with_valid_token(): def test_magic_link_consume_logs_in_user_with_valid_token():
"""Valid token → user logged in + redirect to recordings.index.""" """Valid token → user logged in + redirect to recordings.index. Second
consume of the same token is refused (single-use enforcement).
Note on the g.pop('_login_user') below: when test_client requests run
inside an outer ``with app.app_context()`` block, Flask-Login caches
the user on ``g._login_user`` and that cache persists to the next
request because ``g`` is bound to the app context, not the request
context. Production (no outer app_context) gets a fresh g per request
and is unaffected. We pop the cache between requests to simulate the
fresh-request behavior.
"""
from flask import g
with app.app_context(): with app.app_context():
_disable_csrf() _disable_csrf()
db.create_all() db.create_all()
@@ -468,6 +489,27 @@ def test_magic_link_consume_logs_in_user_with_valid_token():
assert '/auth/magic-link' not in resp.headers['Location'] assert '/auth/magic-link' not in resp.headers['Location']
with client.session_transaction() as sess: with client.session_transaction() as sess:
assert sess.get('_user_id') == str(user.id) assert sess.get('_user_id') == str(user.id)
# Clear Flask-Login's cached current_user so the next test_client
# request doesn't see the previous user as still-authenticated
# (artifact of running multiple test_clients inside a shared
# app_context — see docstring).
g.pop('_login_user', None)
# Single-use: replay the same token in a fresh client; must be
# refused.
with app.test_client() as client2:
resp2 = client2.get(f'/auth/magic-link/{token}', follow_redirects=False)
assert resp2.status_code == 302
# Refused → redirected back to /auth/magic-link (the request page)
assert '/auth/magic-link' in resp2.headers['Location']
with client2.session_transaction() as sess:
assert sess.get('_user_id') is None
flashes = sess.get('_flashes', [])
assert any(
'invalide' in msg.lower() or 'expir' in msg.lower()
for _cat, msg in flashes
), f'expected invalid/expired flash on replay, got {flashes}'
finally: finally:
db.session.rollback() db.session.rollback()
db.drop_all() db.drop_all()
@@ -612,3 +654,266 @@ def test_send_magic_link_email_french_branded():
finally: finally:
db.session.rollback() db.session.rollback()
db.drop_all() db.drop_all()
# ----------------------------------------------------------------------
# 16. C1 — refuse to auto-link when IdP did not verify the email
# ----------------------------------------------------------------------
def test_oauth_callback_refuses_link_when_email_not_verified():
"""Hostile IdP returns email_verified=False — must NOT auto-link to existing user.
Account-takeover protection: a Microsoft personal account or hostile
Workspace tenant could issue a token claiming
``email='alice@dictia.ca'`` without ever proving Alice controls that
mailbox. Falling through and auto-linking would let the attacker log
in as Alice. The fix gates email-link on ``email_verified is True``.
"""
with app.app_context():
_disable_csrf()
_set_oauth_env()
db.create_all()
try:
existing = User(
username='alicedict',
email='alice@dictia.ca',
password='$2b$12$abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMN',
email_verified=True,
)
db.session.add(existing)
db.session.commit()
existing_id = existing.id
# Hostile token: same email, different sub, NOT verified.
mock_token = {
'userinfo': {
'sub': 'hostile-sub-999',
'email': 'alice@dictia.ca',
'email_verified': False,
'name': 'Not Alice',
}
}
with patch('src.api.auth.get_oauth_client') as mock_get_client:
client_mock = MagicMock()
client_mock.authorize_access_token.return_value = mock_token
mock_get_client.return_value = client_mock
with app.test_client() as test_client:
resp = test_client.get('/auth/oauth/microsoft/callback')
# Refused → redirect to /login (not /auth/oauth/finish-signup
# and not into recordings.index).
assert resp.status_code == 302
assert '/login' in resp.headers['Location']
assert '/auth/oauth/finish-signup' not in resp.headers['Location']
with test_client.session_transaction() as sess:
# NOT logged in
assert sess.get('_user_id') is None
# NOT pending finish-signup either
assert 'oauth_signup_pending' not in sess
flashes = sess.get('_flashes', [])
# French flash about account already existing
assert any(
'compte dictia existe' in msg.lower()
or 'connectez-vous' in msg.lower()
for _cat, msg in flashes
), f'expected manual-link flash, got {flashes}'
# sso_subject untouched on Alice's row
refreshed = db.session.get(User, existing_id)
assert refreshed.sso_subject is None
assert refreshed.sso_provider is None
finally:
db.session.rollback()
db.drop_all()
# ----------------------------------------------------------------------
# 17. C2 — refuse to overwrite an existing sso_subject (second-IdP hijack)
# ----------------------------------------------------------------------
def test_oauth_callback_refuses_to_overwrite_existing_sso_subject():
"""Hostile second IdP claims existing user's email — must refuse to overwrite sso_subject.
Account-hijack protection: Bob is legitimately linked to
``ms-sub-A`` via Microsoft. An attacker on Google (or even a different
Microsoft tenant) authenticates with ``email='bob@dictia.ca'`` and
``email_verified=True`` (Google verifies Gmail addresses). Without
this guard, the email-link branch would silently overwrite Bob's
``sso_subject`` to ``google-sub-X`` and lock Bob out forever.
"""
with app.app_context():
_disable_csrf()
_set_oauth_env()
db.create_all()
try:
bob = User(
username='bobdict',
email='bob@dictia.ca',
password=None,
sso_provider='microsoft',
sso_subject='ms-sub-A',
email_verified=True,
)
db.session.add(bob)
db.session.commit()
bob_id = bob.id
mock_token = {
'userinfo': {
'sub': 'google-sub-X',
'email': 'bob@dictia.ca',
'email_verified': True,
'name': 'Bob (Google)',
}
}
with patch('src.api.auth.get_oauth_client') as mock_get_client:
client_mock = MagicMock()
client_mock.authorize_access_token.return_value = mock_token
mock_get_client.return_value = client_mock
with app.test_client() as test_client:
resp = test_client.get('/auth/oauth/google/callback')
assert resp.status_code == 302
assert '/login' in resp.headers['Location']
with test_client.session_transaction() as sess:
assert sess.get('_user_id') is None
assert 'oauth_signup_pending' not in sess
flashes = sess.get('_flashes', [])
assert any(
'déjà liée' in msg.lower()
or 'autre identité' in msg.lower()
for _cat, msg in flashes
), f'expected already-linked flash, got {flashes}'
# Bob's sso_subject untouched
refreshed = db.session.get(User, bob_id)
assert refreshed.sso_subject == 'ms-sub-A'
assert refreshed.sso_provider == 'microsoft'
finally:
db.session.rollback()
db.drop_all()
# ----------------------------------------------------------------------
# 18. I3 — finish-signup race: parallel /signup created the email
# ----------------------------------------------------------------------
def test_finish_signup_handles_concurrent_account_creation():
"""If user is created via /signup in another tab between OAuth callback
and finish-signup POST, /auth/oauth/finish-signup must redirect to
/login with a helpful French flash, not 500.
"""
import time as _time
with app.app_context():
_disable_csrf()
db.create_all()
try:
# Pre-create the user (simulating the parallel /signup that won)
racer = User(
username='raceuser',
email='race@x.ca',
password='$2b$12$abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMN',
email_verified=True,
)
db.session.add(racer)
db.session.commit()
racer_id = racer.id
with app.test_client() as client:
with client.session_transaction() as sess:
sess['oauth_signup_pending'] = {
'provider': 'microsoft',
'subject': 'ms-sub-race',
'userinfo': {
'email': 'race@x.ca',
'name': 'Race User',
'given_name': 'Race',
'family_name': 'User',
},
'created_at': _time.time(),
}
resp = client.post('/auth/oauth/finish-signup', data={
'consent_cgu': 'y',
'consent_confidentialite': 'y',
'consent_marketing': 'y',
'consent_analytics': 'y',
})
assert resp.status_code == 302
assert '/login' in resp.headers['Location']
with client.session_transaction() as sess:
assert 'oauth_signup_pending' not in sess
assert sess.get('_user_id') is None
flashes = sess.get('_flashes', [])
assert any(
'compte dictia existe' in msg.lower()
for _cat, msg in flashes
), f'expected race-flash, got {flashes}'
# Original racer untouched
refreshed = db.session.get(User, racer_id)
assert refreshed.sso_subject is None
assert refreshed.sso_provider is None
# Only one User row for that email
assert User.query.filter_by(email='race@x.ca').count() == 1
finally:
db.session.rollback()
db.drop_all()
# ----------------------------------------------------------------------
# 19. I2 single-use — magic-link token rejected on second consume (unit test)
# ----------------------------------------------------------------------
def test_magic_link_token_is_single_use():
"""Replaying a magic-link token within the validity window must fail
at the function level (no Flask request needed). Complements the
integration coverage in test_magic_link_consume_logs_in_user_with_valid_token."""
with app.app_context():
from src.auth.magic_link import (
generate_magic_link_token,
consume_magic_link_token,
)
token = generate_magic_link_token(424242)
first = consume_magic_link_token(token)
assert first == 424242, f'first consume should return user_id, got {first}'
second = consume_magic_link_token(token)
assert second is None, f'replay should return None, got {second}'
third = consume_magic_link_token(token)
assert third is None, f'second replay should also return None, got {third}'
# ----------------------------------------------------------------------
# 20. I4 — finish-signup expires stale OAuth signup sessions
# ----------------------------------------------------------------------
def test_finish_signup_expires_stale_oauth_session():
"""Session blob older than 15 min triggers a graceful expiry redirect."""
import time as _time
with app.app_context():
_disable_csrf()
db.create_all()
try:
with app.test_client() as client:
with client.session_transaction() as sess:
sess['oauth_signup_pending'] = {
'provider': 'google',
'subject': 'stale-sub-001',
'userinfo': {
'email': 'stale@example.qc.ca',
'name': 'Stale User',
'given_name': 'Stale',
'family_name': 'User',
},
# 16 minutes ago — past the 15-min expiry
'created_at': _time.time() - 16 * 60,
}
resp = client.get('/auth/oauth/finish-signup')
assert resp.status_code == 302
assert '/login' in resp.headers['Location']
with client.session_transaction() as sess:
assert 'oauth_signup_pending' not in sess
flashes = sess.get('_flashes', [])
assert any(
'expir' in msg.lower() or 'recommencez' in msg.lower()
for _cat, msg in flashes
), f'expected expiry flash, got {flashes}'
# No User created
assert User.query.filter_by(email='stale@example.qc.ca').first() is None
finally:
db.session.rollback()
db.drop_all()