Follow-up to commit 0513e67 addressing 2 critical OAuth account-takeover
vulnerabilities and 5 important issues found in the security review.
Critical fixes:
- C1: gate OAuth email-link on ``email_verified is True`` (strict bool)
in find_user_by_oauth + callback. A hostile Microsoft personal account
or Workspace tenant returning email_verified=False (or omitting the
claim) can no longer auto-link to an existing account. The callback
shows a friendly French flash and redirects to /login when the email
exists but the IdP didn't verify it.
- C2: refuse to overwrite an existing sso_subject in find_user_by_oauth.
A second IdP claiming the victim's email (Google after Microsoft, or
a hostile second Microsoft tenant) now raises PermissionError instead
of silently re-binding the User row, which would lock the legitimate
user out. Callback catches and flashes the error message in French.
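For reviewers, the C1 and C2 gates can be pictured with the minimal sketch below. It is an illustration under stated assumptions, not the code in this commit: the in-memory ``User`` dataclass and the ``link_or_refuse`` helper are hypothetical stand-ins for the real model and for find_user_by_oauth, and the real callback additionally distinguishes "email exists but unverified" from "no such user" before choosing a redirect.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class User:
    """Hypothetical in-memory stand-in for the real User model."""
    email: str
    sso_provider: Optional[str] = None
    sso_subject: Optional[str] = None


def link_or_refuse(users, provider, userinfo):
    """Return the matching user, link by verified email, or refuse.

    Returns None when no link may be made. Raises PermissionError when the
    matching row is already bound to a different identity (C2).
    """
    subject = userinfo.get('sub')
    # 1. An exact (provider, subject) match is trusted as-is.
    for user in users:
        if user.sso_provider == provider and user.sso_subject == subject:
            return user
    # 2. C1: strict identity check, so a missing claim, False, or a
    #    truthy string such as 'true' all fail the gate.
    if userinfo.get('email_verified') is not True:
        return None
    email = (userinfo.get('email') or '').lower()
    for user in users:
        if user.email.lower() == email:
            # 3. C2: never re-bind a row that already has an sso_subject.
            if user.sso_subject is not None:
                raise PermissionError('account already linked to another identity')
            user.sso_provider, user.sso_subject = provider, subject
            return user
    return None
```

The ``is not True`` comparison is the point of the "strict bool" wording: a plain truthiness test would accept the string ``'false'``.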
Important fixes:
- I1: replace ``except Exception: pass`` in init_oauth_providers with an
idempotency pre-check on _oauth._clients. Real registration errors
(bad metadata URL, network failure) now surface as exceptions instead
of being silently swallowed at app boot.
- I2: single-use enforcement for magic-link tokens via in-process JTI
cache (_consumed_jtis dict). Replay within the 15-min validity window
now returns None. SECRET_KEY is now strictly required (no
default-dev-key fallback). Operator-facing comment documents that
/auth/magic-link/* should also be scrubbed from Cloudflare/Flask
access logs as defence in depth.
- I3: pre-check email collision in create_oauth_user_with_consent and
raise dedicated EmailAlreadyExistsError. Race against parallel /signup
in another tab between OAuth callback and finish-signup POST now
redirects to /login with a helpful French flash instead of burning 5
retry attempts and surfacing a 500.
- I4: oauth_signup_pending session blob now carries a created_at
timestamp; finish-signup rejects sessions older than 15 min with a
graceful expiry flash + redirect to /login.
- I5: init_oauth_providers logs an INFO when no providers are enabled
so operators can spot misconfigured deployments.
Tests: 16 → 21 (5 new):
- test_oauth_callback_refuses_link_when_email_not_verified (C1)
- test_oauth_callback_refuses_to_overwrite_existing_sso_subject (C2)
- test_finish_signup_handles_concurrent_account_creation (I3)
- test_finish_signup_expires_stale_oauth_session (I4)
- test_magic_link_token_is_single_use (I2)
Existing tests updated for new contract:
- test_oauth_callback_links_existing_user_by_email now sets
email_verified=True in the mock token (required by C1 gate).
- test_finish_signup_requires_cgu_and_confidentialite and
test_finish_signup_creates_user_and_4_consent_logs now seed
created_at in the session blob (required by I4 expiry check).
- test_magic_link_consume_logs_in_user_with_valid_token now also
asserts a second consume of the same token returns None and
redirects to /auth/magic-link with an invalid/expired flash.
Verified: 21/21 OAuth+magic-link tests pass; 16/16 email service tests
still pass (no regression in adjacent surface).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
920 lines
41 KiB
Python
"""Tests for B-2.4: OAuth Microsoft 365 + Google + magic-link login.

Covers:
- is_oauth_provider_enabled() reads MS/GOOGLE env vars correctly.
- /auth/oauth/<provider>/login redirects to /login when provider disabled.
- /auth/oauth/<provider>/callback for new user → /auth/oauth/finish-signup
  with session['oauth_signup_pending'] populated.
- /auth/oauth/<provider>/callback for existing user (sso_subject) → logs in.
- /auth/oauth/<provider>/callback for existing user (email match) → links.
- /auth/oauth/finish-signup requires CGU + confidentialité consents.
- /auth/oauth/finish-signup creates User + 4 ConsentLog rows on success.
- /auth/magic-link returns generic flash for unknown email (anti-enumeration).
- /auth/magic-link sends email for known verified user.
- /auth/magic-link skips unverified users silently (anti-enumeration).
- /auth/magic-link/<token> logs in user with valid token.
- /auth/magic-link/<token> rejects expired tokens.
- /auth/magic-link/<token> rejects tampered tokens.
- /auth/magic-link/<token> rejects unverified users.

Note: pytest cannot collect this file on Windows native because src/init_db.py
imports `fcntl` (POSIX-only). Use tests/_run_oauth_magic_link_windows.py.
"""
import os
import sys
from unittest.mock import patch, MagicMock

sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
os.environ.setdefault('SQLALCHEMY_DATABASE_URI', 'sqlite:///:memory:')
os.environ.setdefault('SECRET_KEY', 'test-secret-key-oauth')
os.environ.setdefault('ENABLE_EMAIL_VERIFICATION', 'false')
os.environ.setdefault('REQUIRE_EMAIL_VERIFICATION', 'false')
# Pre-set OAuth env vars so init_oauth_providers registers clients at app boot.
os.environ.setdefault('MS_CLIENT_ID', 'test-ms-client-id')
os.environ.setdefault('MS_CLIENT_SECRET', 'test-ms-client-secret')
os.environ.setdefault('GOOGLE_CLIENT_ID', 'test-google-client-id')
os.environ.setdefault('GOOGLE_CLIENT_SECRET', 'test-google-client-secret')

from src.app import app, db  # noqa: E402
from src.models.user import User  # noqa: E402
from src.models.consent import ConsentLog  # noqa: E402


def _disable_csrf():
    app.config['WTF_CSRF_ENABLED'] = False


def _set_oauth_env():
    """Ensure OAuth env vars are set (some tests clear them)."""
    os.environ['MS_CLIENT_ID'] = 'test-ms-client-id'
    os.environ['MS_CLIENT_SECRET'] = 'test-ms-client-secret'
    os.environ['GOOGLE_CLIENT_ID'] = 'test-google-client-id'
    os.environ['GOOGLE_CLIENT_SECRET'] = 'test-google-client-secret'


def _clear_oauth_env():
    for k in ('MS_CLIENT_ID', 'MS_CLIENT_SECRET', 'GOOGLE_CLIENT_ID', 'GOOGLE_CLIENT_SECRET'):
        os.environ.pop(k, None)


# ----------------------------------------------------------------------
# 1. is_oauth_provider_enabled: env var detection
# ----------------------------------------------------------------------

def test_oauth_provider_enabled_when_env_vars_present():
    """Microsoft is enabled when MS_CLIENT_ID + MS_CLIENT_SECRET are both set."""
    _set_oauth_env()
    from src.auth.oauth_providers import is_oauth_provider_enabled
    assert is_oauth_provider_enabled('microsoft') is True
    assert is_oauth_provider_enabled('google') is True


def test_oauth_provider_disabled_when_env_vars_missing():
    """Provider is disabled if either CLIENT_ID or CLIENT_SECRET is missing."""
    _clear_oauth_env()
    try:
        from src.auth.oauth_providers import is_oauth_provider_enabled
        assert is_oauth_provider_enabled('microsoft') is False
        assert is_oauth_provider_enabled('google') is False
        assert is_oauth_provider_enabled('unknown') is False
    finally:
        _set_oauth_env()


# ----------------------------------------------------------------------
# 2. /auth/oauth/<provider>/login: disabled provider
# ----------------------------------------------------------------------

def test_oauth_login_redirects_when_provider_disabled():
    """GET /auth/oauth/microsoft/login without env vars → 302 to /login + French flash."""
    with app.app_context():
        _disable_csrf()
        _clear_oauth_env()
        db.create_all()
        try:
            client = app.test_client()
            resp = client.get('/auth/oauth/microsoft/login', follow_redirects=False)
            assert resp.status_code == 302
            assert '/login' in resp.headers['Location']
            # Flash assertion: read the queued flashes from the session
            # (the redirect is not followed).
            with client.session_transaction() as sess:
                flashes = sess.get('_flashes', [])
            assert any("n'est pas activée" in msg for _cat, msg in flashes), (
                f'expected French flash about provider disabled, got: {flashes}'
            )
        finally:
            db.session.rollback()
            db.drop_all()
            _set_oauth_env()


# ----------------------------------------------------------------------
# 3. /auth/oauth/<provider>/callback: new user → finish-signup
# ----------------------------------------------------------------------

def test_oauth_callback_creates_session_for_new_user():
    """Callback for a NEW user stores userinfo in session + redirects to finish-signup."""
    with app.app_context():
        _disable_csrf()
        _set_oauth_env()
        db.create_all()
        try:
            mock_token = {
                'userinfo': {
                    'sub': 'ms-sub-new-123',
                    'email': 'newuser@example.qc.ca',
                    'name': 'New User',
                    'given_name': 'New',
                    'family_name': 'User',
                }
            }
            with patch('src.api.auth.get_oauth_client') as mock_get_client:
                client_mock = MagicMock()
                client_mock.authorize_access_token.return_value = mock_token
                mock_get_client.return_value = client_mock
                with app.test_client() as test_client:
                    resp = test_client.get('/auth/oauth/microsoft/callback')
                    assert resp.status_code == 302, f'Expected 302, got {resp.status_code} body={resp.data[:200]!r}'
                    assert '/auth/oauth/finish-signup' in resp.headers['Location']
                    with test_client.session_transaction() as sess:
                        assert 'oauth_signup_pending' in sess
                        pending = sess['oauth_signup_pending']
                        assert pending['provider'] == 'microsoft'
                        assert pending['subject'] == 'ms-sub-new-123'
                        assert pending['userinfo']['email'] == 'newuser@example.qc.ca'
                        assert pending['userinfo']['name'] == 'New User'
                    # No User row created yet
                    assert User.query.filter_by(email='newuser@example.qc.ca').first() is None
        finally:
            db.session.rollback()
            db.drop_all()


# ----------------------------------------------------------------------
# 4. /auth/oauth/<provider>/callback: existing user by sso_subject
# ----------------------------------------------------------------------

def test_oauth_callback_logs_in_existing_user_by_subject():
    """User with matching sso_subject is logged in directly without consent flow."""
    with app.app_context():
        _disable_csrf()
        _set_oauth_env()
        db.create_all()
        try:
            existing = User(
                username='existing1',
                email='existing@example.qc.ca',
                password=None,
                sso_provider='microsoft',
                sso_subject='ms-sub-existing-456',
                email_verified=True,
            )
            db.session.add(existing)
            db.session.commit()

            mock_token = {
                'userinfo': {
                    'sub': 'ms-sub-existing-456',
                    'email': 'existing@example.qc.ca',
                    'name': 'Existing User',
                }
            }
            with patch('src.api.auth.get_oauth_client') as mock_get_client:
                client_mock = MagicMock()
                client_mock.authorize_access_token.return_value = mock_token
                mock_get_client.return_value = client_mock
                with app.test_client() as test_client:
                    resp = test_client.get('/auth/oauth/microsoft/callback')
                    assert resp.status_code == 302
                    assert '/auth/oauth/finish-signup' not in resp.headers['Location']
                    with test_client.session_transaction() as sess:
                        # User is logged in
                        assert '_user_id' in sess
                        assert sess['_user_id'] == str(existing.id)
                        assert 'oauth_signup_pending' not in sess
        finally:
            db.session.rollback()
            db.drop_all()


# ----------------------------------------------------------------------
# 5. /auth/oauth/<provider>/callback: existing user by email
# ----------------------------------------------------------------------

def test_oauth_callback_links_existing_user_by_email():
    """User with matching email but no sso_subject gets linked + logged in.

    Requires email_verified=True from the IdP; see test
    test_oauth_callback_refuses_link_when_email_not_verified for the
    negative case.
    """
    with app.app_context():
        _disable_csrf()
        _set_oauth_env()
        db.create_all()
        try:
            existing = User(
                username='emaillink',
                email='emaillink@example.qc.ca',
                password='$2b$12$abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMN',
                email_verified=True,
            )
            db.session.add(existing)
            db.session.commit()
            existing_id = existing.id

            mock_token = {
                'userinfo': {
                    'sub': 'google-new-sub-789',
                    'email': 'emaillink@example.qc.ca',
                    'email_verified': True,  # required for auto-link
                    'name': 'Email Link User',
                }
            }
            with patch('src.api.auth.get_oauth_client') as mock_get_client:
                client_mock = MagicMock()
                client_mock.authorize_access_token.return_value = mock_token
                mock_get_client.return_value = client_mock
                with app.test_client() as test_client:
                    resp = test_client.get('/auth/oauth/google/callback')
                    assert resp.status_code == 302
                    # Refresh to read updated columns
                    refreshed = db.session.get(User, existing_id)
                    assert refreshed.sso_subject == 'google-new-sub-789'
                    assert refreshed.sso_provider == 'google'
                    with test_client.session_transaction() as sess:
                        assert sess.get('_user_id') == str(existing_id)
        finally:
            db.session.rollback()
            db.drop_all()


# ----------------------------------------------------------------------
# 6. /auth/oauth/finish-signup: required consents
# ----------------------------------------------------------------------

def test_finish_signup_requires_cgu_and_confidentialite():
    """POST without CGU+confidentialite returns 400; no User created; session preserved."""
    import time as _time
    with app.app_context():
        _disable_csrf()
        db.create_all()
        try:
            with app.test_client() as client:
                with client.session_transaction() as sess:
                    sess['oauth_signup_pending'] = {
                        'provider': 'microsoft',
                        'subject': 'ms-sub-pending-001',
                        'userinfo': {
                            'email': 'pending@example.qc.ca',
                            'name': 'Pending User',
                            'given_name': 'Pending',
                            'family_name': 'User',
                        },
                        'created_at': _time.time(),
                    }
                resp = client.post('/auth/oauth/finish-signup', data={
                    # No consents
                })
                assert resp.status_code == 400
                body = resp.data.decode('utf-8').lower()
                assert "conditions d'utilisation" in body or 'cgu' in body
                assert 'confidentialit' in body
                assert User.query.filter_by(email='pending@example.qc.ca').first() is None
                # Session still has the pending entry (so user can retry)
                with client.session_transaction() as sess:
                    assert 'oauth_signup_pending' in sess
        finally:
            db.session.rollback()
            db.drop_all()


# ----------------------------------------------------------------------
# 7. /auth/oauth/finish-signup: success path
# ----------------------------------------------------------------------

def test_finish_signup_creates_user_and_4_consent_logs():
    """POST with all consents → User created with 4 ConsentLog rows + login."""
    import time as _time
    with app.app_context():
        _disable_csrf()
        db.create_all()
        try:
            with app.test_client() as client:
                with client.session_transaction() as sess:
                    sess['oauth_signup_pending'] = {
                        'provider': 'google',
                        'subject': 'google-sub-success-002',
                        'userinfo': {
                            'email': 'success@example.qc.ca',
                            'name': 'Success User',
                            'given_name': 'Success',
                            'family_name': 'User',
                        },
                        'created_at': _time.time(),
                    }
                resp = client.post('/auth/oauth/finish-signup', data={
                    'consent_cgu': 'y',
                    'consent_confidentialite': 'y',
                    'consent_marketing': 'y',
                    # analytics absent → recorded as False
                }, headers={'CF-Connecting-IP': '198.51.100.7', 'User-Agent': 'TestUA/1.0'})
                assert resp.status_code == 302

                user = User.query.filter_by(email='success@example.qc.ca').first()
                assert user is not None
                assert user.sso_provider == 'google'
                assert user.sso_subject == 'google-sub-success-002'
                assert user.password is None
                assert user.email_verified is True
                assert user.name == 'Success User'

                consents = ConsentLog.query.filter_by(user_id=user.id).all()
                assert len(consents) == 4
                state = {c.consent_type: c.granted for c in consents}
                assert state == {
                    'cgu': True,
                    'confidentialite': True,
                    'marketing': True,
                    'analytics': False,
                }
                assert all(c.ip_address == '198.51.100.7' for c in consents)
                assert all(c.user_agent == 'TestUA/1.0' for c in consents)

                # Session cleaned + logged in
                with client.session_transaction() as sess:
                    assert 'oauth_signup_pending' not in sess
                    assert sess.get('_user_id') == str(user.id)
        finally:
            db.session.rollback()
            db.drop_all()


# ----------------------------------------------------------------------
# 8. /auth/magic-link: generic flash for unknown email
# ----------------------------------------------------------------------

def test_magic_link_request_returns_generic_message_for_unknown_email():
    """Unknown email → generic flash; no email sent; no error leak."""
    with app.app_context():
        _disable_csrf()
        db.create_all()
        try:
            with patch('src.services.email._send_email') as mock_send:
                with app.test_client() as client:
                    resp = client.post('/auth/magic-link', data={'email': 'doesnotexist@example.qc.ca'})
                    assert resp.status_code in (200, 302)
                    assert mock_send.call_count == 0
                    body = resp.data.decode('utf-8').lower()
                    # Generic French message, no leak
                    assert 'doesnotexist@example.qc.ca' in body  # display the email
                    # No leak language like "introuvable" / "n'existe pas"
                    assert 'introuvable' not in body
                    assert "n'existe pas" not in body
        finally:
            db.session.rollback()
            db.drop_all()


# ----------------------------------------------------------------------
# 9. /auth/magic-link: sends email for known verified user
# ----------------------------------------------------------------------

def test_magic_link_request_sends_email_for_known_verified_user():
    """Known + verified user triggers _send_email exactly once with the magic URL."""
    with app.app_context():
        _disable_csrf()
        db.create_all()
        try:
            user = User(
                username='magicverified',
                email='magic@example.qc.ca',
                password='$2b$12$abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMN',
                email_verified=True,
            )
            db.session.add(user)
            db.session.commit()

            # Force SMTP-configured branch for the test (both the route gate AND
            # the inner gate inside src.services.email.send_magic_link_email).
            with patch('src.api.auth.is_smtp_configured', return_value=True), \
                 patch('src.services.email.is_smtp_configured', return_value=True), \
                 patch('src.services.email._send_email') as mock_send:
                mock_send.return_value = True
                with app.test_client() as client:
                    resp = client.post('/auth/magic-link', data={'email': 'magic@example.qc.ca'})
                    assert resp.status_code in (200, 302)
                    assert mock_send.call_count == 1
                    # Email body should contain the magic URL
                    args, _kwargs = mock_send.call_args
                    # _send_email(to_email, subject, html_body, text_body)
                    to_email, subject, html_body, text_body = args
                    assert to_email == 'magic@example.qc.ca'
                    assert 'DictIA' in subject
                    assert '/auth/magic-link/' in html_body
        finally:
            db.session.rollback()
            db.drop_all()


# ----------------------------------------------------------------------
# 10. /auth/magic-link: skips unverified users silently
# ----------------------------------------------------------------------

def test_magic_link_request_skips_unverified_user_silently():
    """Unverified user → email NOT sent, but flash STILL generic (anti-enumeration)."""
    with app.app_context():
        _disable_csrf()
        db.create_all()
        try:
            user = User(
                username='magicunverif',
                email='unverif@example.qc.ca',
                password='$2b$12$abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMN',
                email_verified=False,
            )
            db.session.add(user)
            db.session.commit()

            with patch('src.api.auth.is_smtp_configured', return_value=True), \
                 patch('src.services.email._send_email') as mock_send:
                with app.test_client() as client:
                    resp = client.post('/auth/magic-link', data={'email': 'unverif@example.qc.ca'})
                    assert resp.status_code in (200, 302)
                    assert mock_send.call_count == 0
                    body = resp.data.decode('utf-8').lower()
                    # No leak: same message as for unknown emails
                    assert 'non vérifié' not in body
                    assert 'unverified' not in body
        finally:
            db.session.rollback()
            db.drop_all()


# ----------------------------------------------------------------------
# 11. /auth/magic-link/<token>: valid token logs in
# ----------------------------------------------------------------------

def test_magic_link_consume_logs_in_user_with_valid_token():
    """Valid token → user logged in + redirect to recordings.index. Second
    consume of the same token is refused (single-use enforcement).

    Note on the g.pop('_login_user') below: when test_client requests run
    inside an outer ``with app.app_context()`` block, Flask-Login caches
    the user on ``g._login_user`` and that cache persists to the next
    request because ``g`` is bound to the app context, not the request
    context. Production (no outer app_context) gets a fresh g per request
    and is unaffected. We pop the cache between requests to simulate the
    fresh-request behavior.
    """
    from flask import g
    with app.app_context():
        _disable_csrf()
        db.create_all()
        try:
            user = User(
                username='consumeok',
                email='consumeok@example.qc.ca',
                password='$2b$12$abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMN',
                email_verified=True,
            )
            db.session.add(user)
            db.session.commit()

            from src.auth.magic_link import generate_magic_link_token
            token = generate_magic_link_token(user.id)
            with app.test_client() as client:
                resp = client.get(f'/auth/magic-link/{token}', follow_redirects=False)
                assert resp.status_code == 302
                assert '/auth/magic-link' not in resp.headers['Location']
                with client.session_transaction() as sess:
                    assert sess.get('_user_id') == str(user.id)

            # Clear Flask-Login's cached current_user so the next test_client
            # request doesn't see the previous user as still-authenticated
            # (artifact of running multiple test_clients inside a shared
            # app_context; see docstring).
            g.pop('_login_user', None)

            # Single-use: replay the same token in a fresh client; must be
            # refused.
            with app.test_client() as client2:
                resp2 = client2.get(f'/auth/magic-link/{token}', follow_redirects=False)
                assert resp2.status_code == 302
                # Refused → redirected back to /auth/magic-link (the request page)
                assert '/auth/magic-link' in resp2.headers['Location']
                with client2.session_transaction() as sess:
                    assert sess.get('_user_id') is None
                    flashes = sess.get('_flashes', [])
                    assert any(
                        'invalide' in msg.lower() or 'expir' in msg.lower()
                        for _cat, msg in flashes
                    ), f'expected invalid/expired flash on replay, got {flashes}'
        finally:
            db.session.rollback()
            db.drop_all()


# ----------------------------------------------------------------------
# 12. /auth/magic-link/<token>: expired token rejected
# ----------------------------------------------------------------------

def test_magic_link_consume_rejects_expired_token():
    """Expired token → invalid flash + redirect to /auth/magic-link."""
    with app.app_context():
        _disable_csrf()
        db.create_all()
        try:
            user = User(
                username='expired',
                email='expired@example.qc.ca',
                password='x' * 60,
                email_verified=True,
            )
            db.session.add(user)
            db.session.commit()

            from itsdangerous import SignatureExpired
            with patch('src.auth.magic_link._serializer') as mock_ser:
                inst = MagicMock()
                inst.loads.side_effect = SignatureExpired('expired')
                mock_ser.return_value = inst
                with app.test_client() as client:
                    resp = client.get('/auth/magic-link/dummy-token', follow_redirects=False)
                    assert resp.status_code == 302
                    assert '/auth/magic-link' in resp.headers['Location']
                    with client.session_transaction() as sess:
                        flashes = sess.get('_flashes', [])
                        assert any('invalide' in msg.lower() or 'expir' in msg.lower()
                                   for _cat, msg in flashes), f'expected expiry flash, got {flashes}'
                        assert sess.get('_user_id') is None
        finally:
            db.session.rollback()
            db.drop_all()


# ----------------------------------------------------------------------
# 13. /auth/magic-link/<token>: tampered token rejected
# ----------------------------------------------------------------------

def test_magic_link_consume_rejects_invalid_signature():
    """Garbage token → BadSignature → invalid flash + redirect."""
    with app.app_context():
        _disable_csrf()
        db.create_all()
        try:
            with app.test_client() as client:
                resp = client.get('/auth/magic-link/garbage.token.value', follow_redirects=False)
                assert resp.status_code == 302
                assert '/auth/magic-link' in resp.headers['Location']
                with client.session_transaction() as sess:
                    flashes = sess.get('_flashes', [])
                    assert any('invalide' in msg.lower() or 'expir' in msg.lower()
                               for _cat, msg in flashes), f'expected invalid flash, got {flashes}'
                    assert sess.get('_user_id') is None
        finally:
            db.session.rollback()
            db.drop_all()


# ----------------------------------------------------------------------
# 14. /auth/magic-link/<token>: unverified user rejected
# ----------------------------------------------------------------------

def test_magic_link_consume_rejects_unverified_user():
    """Token for unverified user → invalid flash (no leak that token was valid)."""
    with app.app_context():
        _disable_csrf()
        db.create_all()
        try:
            user = User(
                username='unverifconsume',
                email='unverifconsume@example.qc.ca',
                password='x' * 60,
                email_verified=False,
            )
            db.session.add(user)
            db.session.commit()

            from src.auth.magic_link import generate_magic_link_token
            token = generate_magic_link_token(user.id)
            with app.test_client() as client:
                resp = client.get(f'/auth/magic-link/{token}', follow_redirects=False)
                assert resp.status_code == 302
                assert '/auth/magic-link' in resp.headers['Location']
                with client.session_transaction() as sess:
                    assert sess.get('_user_id') is None
                    flashes = sess.get('_flashes', [])
                    # Flash should look the same as the bad-signature path
                    assert any('invalide' in msg.lower() or 'expir' in msg.lower()
                               for _cat, msg in flashes)
        finally:
            db.session.rollback()
            db.drop_all()


# ----------------------------------------------------------------------
# 15. send_magic_link_email helper produces French DictIA-branded email
# ----------------------------------------------------------------------

def test_send_magic_link_email_french_branded():
    """send_magic_link_email() produces French + DictIA-branded HTML body."""
    with app.app_context():
        _disable_csrf()
        db.create_all()
        try:
            user = User(
                username='emailtest',
                email='emailtest@example.qc.ca',
                password='x' * 60,
                name='Marie <script>',  # XSS canary
                email_verified=True,
            )
            db.session.add(user)
            db.session.commit()

            with patch('src.services.email.is_smtp_configured', return_value=True), \
                 patch('src.services.email._send_email') as mock_send:
                mock_send.return_value = True
                from src.services.email import send_magic_link_email
                ok = send_magic_link_email(user, 'https://example.com/auth/magic-link/abc123')
                assert ok is True
                args, _kwargs = mock_send.call_args
                to_email, subject, html_body, text_body = args
                assert to_email == 'emailtest@example.qc.ca'
                assert 'DictIA' in subject
                assert 'connexion' in subject.lower() or 'lien' in subject.lower()
                assert 'https://example.com/auth/magic-link/abc123' in html_body
                assert 'https://example.com/auth/magic-link/abc123' in text_body
                # XSS canary: raw <script> must NOT appear in HTML body (must be escaped)
                assert '<script>' not in html_body
                assert '&lt;script&gt;' in html_body or 'Marie' in html_body
                # 15 minutes mention
                assert '15' in html_body
        finally:
            db.session.rollback()
            db.drop_all()


# ----------------------------------------------------------------------
# 16. C1: refuse to auto-link when IdP did not verify the email
# ----------------------------------------------------------------------

def test_oauth_callback_refuses_link_when_email_not_verified():
    """Hostile IdP returns email_verified=False: must NOT auto-link to existing user.

    Account-takeover protection: a Microsoft personal account or hostile
    Workspace tenant could issue a token claiming
    ``email='alice@dictia.ca'`` without ever proving Alice controls that
    mailbox. Falling through and auto-linking would let the attacker log
    in as Alice. The fix gates email-link on ``email_verified is True``.
    """
    with app.app_context():
        _disable_csrf()
        _set_oauth_env()
        db.create_all()
        try:
            existing = User(
                username='alicedict',
                email='alice@dictia.ca',
                password='$2b$12$abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMN',
                email_verified=True,
            )
            db.session.add(existing)
            db.session.commit()
            existing_id = existing.id

            # Hostile token: same email, different sub, NOT verified.
            mock_token = {
                'userinfo': {
                    'sub': 'hostile-sub-999',
                    'email': 'alice@dictia.ca',
                    'email_verified': False,
                    'name': 'Not Alice',
                }
            }
            with patch('src.api.auth.get_oauth_client') as mock_get_client:
                client_mock = MagicMock()
                client_mock.authorize_access_token.return_value = mock_token
                mock_get_client.return_value = client_mock
                with app.test_client() as test_client:
                    resp = test_client.get('/auth/oauth/microsoft/callback')
                    # Refused → redirect to /login (not /auth/oauth/finish-signup
                    # and not into recordings.index).
                    assert resp.status_code == 302
                    assert '/login' in resp.headers['Location']
                    assert '/auth/oauth/finish-signup' not in resp.headers['Location']
                    with test_client.session_transaction() as sess:
                        # NOT logged in
                        assert sess.get('_user_id') is None
                        # NOT pending finish-signup either
                        assert 'oauth_signup_pending' not in sess
                        flashes = sess.get('_flashes', [])
                        # French flash about account already existing
                        assert any(
                            'compte dictia existe' in msg.lower()
                            or 'connectez-vous' in msg.lower()
                            for _cat, msg in flashes
                        ), f'expected manual-link flash, got {flashes}'
                    # sso_subject untouched on Alice's row
                    refreshed = db.session.get(User, existing_id)
                    assert refreshed.sso_subject is None
                    assert refreshed.sso_provider is None
        finally:
            db.session.rollback()
            db.drop_all()


# ----------------------------------------------------------------------
|
|
# 17. C2 — refuse to overwrite an existing sso_subject (second-IdP hijack)
|
|
# ----------------------------------------------------------------------
|
|
|
|
def test_oauth_callback_refuses_to_overwrite_existing_sso_subject():
|
|
"""Hostile second IdP claims existing user's email — must refuse to overwrite sso_subject.
|
|
|
|
Account-hijack protection: Bob is legitimately linked to
|
|
``ms-sub-A`` via Microsoft. An attacker on Google (or even a different
|
|
Microsoft tenant) authenticates with ``email='bob@dictia.ca'`` and
|
|
``email_verified=True`` (Google verifies Gmail addresses). Without
|
|
this guard, the email-link branch would silently overwrite Bob's
|
|
``sso_subject`` to ``google-sub-X`` and lock Bob out forever.
|
|
"""
|
|
with app.app_context():
|
|
_disable_csrf()
|
|
_set_oauth_env()
|
|
db.create_all()
|
|
try:
|
|
bob = User(
|
|
username='bobdict',
|
|
email='bob@dictia.ca',
|
|
password=None,
|
|
sso_provider='microsoft',
|
|
sso_subject='ms-sub-A',
|
|
email_verified=True,
|
|
)
|
|
db.session.add(bob)
|
|
db.session.commit()
|
|
bob_id = bob.id
|
|
|
|
mock_token = {
|
|
'userinfo': {
|
|
'sub': 'google-sub-X',
|
|
'email': 'bob@dictia.ca',
|
|
'email_verified': True,
|
|
'name': 'Bob (Google)',
|
|
}
|
|
}
|
|
with patch('src.api.auth.get_oauth_client') as mock_get_client:
|
|
client_mock = MagicMock()
|
|
client_mock.authorize_access_token.return_value = mock_token
|
|
mock_get_client.return_value = client_mock
|
|
with app.test_client() as test_client:
|
|
resp = test_client.get('/auth/oauth/google/callback')
|
|
assert resp.status_code == 302
|
|
assert '/login' in resp.headers['Location']
|
|
with test_client.session_transaction() as sess:
|
|
assert sess.get('_user_id') is None
|
|
assert 'oauth_signup_pending' not in sess
|
|
flashes = sess.get('_flashes', [])
|
|
assert any(
|
|
'déjà liée' in msg.lower()
|
|
or 'autre identité' in msg.lower()
|
|
for _cat, msg in flashes
|
|
), f'expected already-linked flash, got {flashes}'
|
|
# Bob's sso_subject untouched
|
|
refreshed = db.session.get(User, bob_id)
|
|
assert refreshed.sso_subject == 'ms-sub-A'
|
|
assert refreshed.sso_provider == 'microsoft'
|
|
finally:
|
|
db.session.rollback()
|
|
db.drop_all()
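

# Illustrative sketch only, not the project's find_user_by_oauth: a minimal
# model of the two link guards exercised by the C1 and C2 tests, assuming the
# decision depends only on the stored subject and the IdP's claims. The helper
# name _sketch_may_link_by_email and its parameters are hypothetical.
def _sketch_may_link_by_email(stored_subject, incoming_subject, email_verified):
    """Return True when an email match may bind the incoming identity."""
    # C1: strict bool check, so a missing claim (None) or a truthy string
    # such as 'true' does not count as verified.
    if email_verified is not True:
        return False
    # C2: an account already bound to a different subject is never re-bound.
    if stored_subject is not None and stored_subject != incoming_subject:
        raise PermissionError('account already linked to another identity')
    return True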


# ----------------------------------------------------------------------
# 18. I3: finish-signup race: parallel /signup created the email
# ----------------------------------------------------------------------

def test_finish_signup_handles_concurrent_account_creation():
    """If user is created via /signup in another tab between OAuth callback
    and finish-signup POST, /auth/oauth/finish-signup must redirect to
    /login with a helpful French flash, not 500.
    """
    import time as _time
    with app.app_context():
        _disable_csrf()
        db.create_all()
        try:
            # Pre-create the user (simulating the parallel /signup that won)
            racer = User(
                username='raceuser',
                email='race@x.ca',
                password='$2b$12$abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMN',
                email_verified=True,
            )
            db.session.add(racer)
            db.session.commit()
            racer_id = racer.id

            with app.test_client() as client:
                with client.session_transaction() as sess:
                    sess['oauth_signup_pending'] = {
                        'provider': 'microsoft',
                        'subject': 'ms-sub-race',
                        'userinfo': {
                            'email': 'race@x.ca',
                            'name': 'Race User',
                            'given_name': 'Race',
                            'family_name': 'User',
                        },
                        'created_at': _time.time(),
                    }
                resp = client.post('/auth/oauth/finish-signup', data={
                    'consent_cgu': 'y',
                    'consent_confidentialite': 'y',
                    'consent_marketing': 'y',
                    'consent_analytics': 'y',
                })
                assert resp.status_code == 302
                assert '/login' in resp.headers['Location']
                with client.session_transaction() as sess:
                    assert 'oauth_signup_pending' not in sess
                    assert sess.get('_user_id') is None
                    flashes = sess.get('_flashes', [])
                    assert any(
                        'compte dictia existe' in msg.lower()
                        for _cat, msg in flashes
                    ), f'expected race-flash, got {flashes}'
            # Original racer untouched
            refreshed = db.session.get(User, racer_id)
            assert refreshed.sso_subject is None
            assert refreshed.sso_provider is None
            # Only one User row for that email
            assert User.query.filter_by(email='race@x.ca').count() == 1
        finally:
            db.session.rollback()
            db.drop_all()


# ----------------------------------------------------------------------
# 19. I2 single-use: magic-link token rejected on second consume (unit test)
# ----------------------------------------------------------------------

def test_magic_link_token_is_single_use():
    """Replaying a magic-link token within the validity window must fail
    at the function level (no Flask request needed). Complements the
    integration coverage in test_magic_link_consume_logs_in_user_with_valid_token."""
    with app.app_context():
        from src.auth.magic_link import (
            generate_magic_link_token,
            consume_magic_link_token,
        )
        token = generate_magic_link_token(424242)
        first = consume_magic_link_token(token)
        assert first == 424242, f'first consume should return user_id, got {first}'
        second = consume_magic_link_token(token)
        assert second is None, f'replay should return None, got {second}'
        third = consume_magic_link_token(token)
        assert third is None, f'second replay should also return None, got {third}'
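

# Illustrative sketch only, not the code in src.auth.magic_link: one way an
# in-process single-use check could behave, assuming each token carries a
# unique JTI and an absolute expiry. _SketchJtiCache and its method names are
# hypothetical. A per-process dict is not shared across worker processes.
class _SketchJtiCache:
    def __init__(self):
        self._consumed = {}  # jti -> expiry (epoch seconds)

    def consume(self, jti, expires_at, now):
        """Return True on first use of an unexpired JTI, else False."""
        # Drop entries whose tokens have expired; they can no longer replay.
        self._consumed = {j: exp for j, exp in self._consumed.items() if exp > now}
        if now >= expires_at or jti in self._consumed:
            return False
        self._consumed[jti] = expires_at
        return True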


# ----------------------------------------------------------------------
# 20. I4: finish-signup expires stale OAuth signup sessions
# ----------------------------------------------------------------------

def test_finish_signup_expires_stale_oauth_session():
    """Session blob older than 15 min triggers a graceful expiry redirect."""
    import time as _time
    with app.app_context():
        _disable_csrf()
        db.create_all()
        try:
            with app.test_client() as client:
                with client.session_transaction() as sess:
                    sess['oauth_signup_pending'] = {
                        'provider': 'google',
                        'subject': 'stale-sub-001',
                        'userinfo': {
                            'email': 'stale@example.qc.ca',
                            'name': 'Stale User',
                            'given_name': 'Stale',
                            'family_name': 'User',
                        },
                        # 16 minutes ago, past the 15-min expiry
                        'created_at': _time.time() - 16 * 60,
                    }
                resp = client.get('/auth/oauth/finish-signup')
                assert resp.status_code == 302
                assert '/login' in resp.headers['Location']
                with client.session_transaction() as sess:
                    assert 'oauth_signup_pending' not in sess
                    flashes = sess.get('_flashes', [])
                    assert any(
                        'expir' in msg.lower() or 'recommencez' in msg.lower()
                        for _cat, msg in flashes
                    ), f'expected expiry flash, got {flashes}'
            # No User created
            assert User.query.filter_by(email='stale@example.qc.ca').first() is None
        finally:
            db.session.rollback()
            db.drop_all()
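

# Illustrative sketch only, not the project's finish-signup view: the expiry
# rule the I4 test relies on, assuming the pending blob stores created_at as
# epoch seconds and that a missing or malformed timestamp is treated as
# stale. _sketch_pending_is_stale and max_age_seconds are hypothetical names.
def _sketch_pending_is_stale(pending, now, max_age_seconds=15 * 60):
    """Return True when the pending-signup blob should be rejected."""
    created_at = pending.get('created_at')
    # bool is a subclass of int, so exclude it explicitly.
    if isinstance(created_at, bool) or not isinstance(created_at, (int, float)):
        return True
    return now - created_at > max_age_seconds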