"""Tests for B-2.4 — OAuth Microsoft 365 + Google + magic-link login. Covers: - is_oauth_provider_enabled() reads MS/GOOGLE env vars correctly. - /auth/oauth//login redirects to /login when provider disabled. - /auth/oauth//callback for new user → /auth/oauth/finish-signup with session['oauth_signup_pending'] populated. - /auth/oauth//callback for existing user (sso_subject) → logs in. - /auth/oauth//callback for existing user (email match) → links. - /auth/oauth/finish-signup requires CGU + confidentialité consents. - /auth/oauth/finish-signup creates User + 4 ConsentLog rows on success. - /auth/magic-link returns generic flash for unknown email (anti-enumeration). - /auth/magic-link sends email for known verified user. - /auth/magic-link skips unverified users silently (anti-enumeration). - /auth/magic-link/ logs in user with valid token. - /auth/magic-link/ rejects expired tokens. - /auth/magic-link/ rejects tampered tokens. - /auth/magic-link/ rejects unverified users. Note: pytest cannot collect this file on Windows native because src/init_db.py imports `fcntl` (POSIX-only). Use tests/_run_oauth_magic_link_windows.py. """ import os import sys from unittest.mock import patch, MagicMock sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) os.environ.setdefault('SQLALCHEMY_DATABASE_URI', 'sqlite:///:memory:') os.environ.setdefault('SECRET_KEY', 'test-secret-key-oauth') os.environ.setdefault('ENABLE_EMAIL_VERIFICATION', 'false') os.environ.setdefault('REQUIRE_EMAIL_VERIFICATION', 'false') # Pre-set OAuth env vars so init_oauth_providers registers clients at app boot. 
os.environ.setdefault('MS_CLIENT_ID', 'test-ms-client-id')
os.environ.setdefault('MS_CLIENT_SECRET', 'test-ms-client-secret')
os.environ.setdefault('GOOGLE_CLIENT_ID', 'test-google-client-id')
os.environ.setdefault('GOOGLE_CLIENT_SECRET', 'test-google-client-secret')

from src.app import app, db  # noqa: E402
from src.models.user import User  # noqa: E402
from src.models.consent import ConsentLog  # noqa: E402


def _disable_csrf():
    """Set WTF_CSRF_ENABLED to False on the shared app config."""
    app.config['WTF_CSRF_ENABLED'] = False


def _set_oauth_env():
    """Ensure OAuth env vars are set (some tests clear them)."""
    os.environ.update({
        'MS_CLIENT_ID': 'test-ms-client-id',
        'MS_CLIENT_SECRET': 'test-ms-client-secret',
        'GOOGLE_CLIENT_ID': 'test-google-client-id',
        'GOOGLE_CLIENT_SECRET': 'test-google-client-secret',
    })


def _clear_oauth_env():
    """Remove the four OAuth env vars; names that are already absent are ignored."""
    oauth_var_names = (
        'MS_CLIENT_ID',
        'MS_CLIENT_SECRET',
        'GOOGLE_CLIENT_ID',
        'GOOGLE_CLIENT_SECRET',
    )
    for var_name in oauth_var_names:
        os.environ.pop(var_name, None)


# ----------------------------------------------------------------------
# 1. is_oauth_provider_enabled — env var detection
# ----------------------------------------------------------------------

def test_oauth_provider_enabled_when_env_vars_present():
    """Microsoft and Google both report enabled once their ID + SECRET vars are set."""
    _set_oauth_env()
    from src.auth.oauth_providers import is_oauth_provider_enabled
    for provider_name in ('microsoft', 'google'):
        assert is_oauth_provider_enabled(provider_name) is True


def test_oauth_provider_disabled_when_env_vars_missing():
    """With the OAuth vars removed, known and unknown providers all report disabled."""
    _clear_oauth_env()
    try:
        from src.auth.oauth_providers import is_oauth_provider_enabled
        for provider_name in ('microsoft', 'google', 'unknown'):
            assert is_oauth_provider_enabled(provider_name) is False
    finally:
        # Put the vars back so later tests see the providers as enabled.
        _set_oauth_env()


# ----------------------------------------------------------------------
# 2.
# /auth/oauth/{provider}/login — disabled provider
# ----------------------------------------------------------------------

def test_oauth_login_redirects_when_provider_disabled():
    """GET /auth/oauth/microsoft/login without env vars → 302 to /login + French flash."""
    with app.app_context():
        _disable_csrf()
        db.create_all()
        try:
            # Clear the env inside the try so the finally below always restores
            # it; a failure between clearing and the try would otherwise leave
            # the providers disabled for every later test.
            _clear_oauth_env()
            client = app.test_client()
            resp = client.get('/auth/oauth/microsoft/login', follow_redirects=False)
            assert resp.status_code == 302
            assert '/login' in resp.headers['Location']
            # The redirect is NOT followed: the flash is read straight from the
            # session, where it is still queued under '_flashes'.
            with client.session_transaction() as sess:
                flashes = sess.get('_flashes', [])
                assert any("n'est pas activée" in msg for _cat, msg in flashes), (
                    f'expected French flash about provider disabled, got: {flashes}'
                )
        finally:
            # Restore the env first so it happens even if DB teardown raises.
            _set_oauth_env()
            db.session.rollback()
            db.drop_all()


# ----------------------------------------------------------------------
# 3. /auth/oauth/{provider}/callback — new user → finish-signup
# ----------------------------------------------------------------------

def test_oauth_callback_creates_session_for_new_user():
    """Callback for a NEW user stores userinfo in session + redirects to finish-signup."""
    with app.app_context():
        _disable_csrf()
        _set_oauth_env()
        db.create_all()
        try:
            mock_token = {
                'userinfo': {
                    'sub': 'ms-sub-new-123',
                    'email': 'newuser@example.qc.ca',
                    'name': 'New User',
                    'given_name': 'New',
                    'family_name': 'User',
                }
            }
            # Replace the OAuth client lookup so no real provider is contacted;
            # the fake client hands back mock_token from authorize_access_token().
            with patch('src.api.auth.get_oauth_client') as mock_get_client:
                client_mock = MagicMock()
                client_mock.authorize_access_token.return_value = mock_token
                mock_get_client.return_value = client_mock
                with app.test_client() as test_client:
                    resp = test_client.get('/auth/oauth/microsoft/callback')
                    assert resp.status_code == 302, (
                        f'Expected 302, got {resp.status_code} body={resp.data[:200]!r}'
                    )
                    assert '/auth/oauth/finish-signup' in resp.headers['Location']
                    with test_client.session_transaction() as sess:
                        assert 'oauth_signup_pending' in sess
                        pending = sess['oauth_signup_pending']
                        assert pending['provider'] == 'microsoft'
                        assert pending['subject'] == 'ms-sub-new-123'
                        assert pending['userinfo']['email'] == 'newuser@example.qc.ca'
                        assert pending['userinfo']['name'] == 'New User'
                    # No User row created yet
                    assert User.query.filter_by(email='newuser@example.qc.ca').first() is None
        finally:
            db.session.rollback()
            db.drop_all()


# ----------------------------------------------------------------------
# 4. /auth/oauth/{provider}/callback — existing user by sso_subject
# ----------------------------------------------------------------------

def test_oauth_callback_logs_in_existing_user_by_subject():
    """User with matching sso_subject is logged in directly without consent flow."""
    with app.app_context():
        _disable_csrf()
        _set_oauth_env()
        db.create_all()
        try:
            existing = User(
                username='existing1',
                email='existing@example.qc.ca',
                password=None,
                sso_provider='microsoft',
                sso_subject='ms-sub-existing-456',
                email_verified=True,
            )
            db.session.add(existing)
            db.session.commit()
            # Same 'sub' as the stored sso_subject above.
            mock_token = {
                'userinfo': {
                    'sub': 'ms-sub-existing-456',
                    'email': 'existing@example.qc.ca',
                    'name': 'Existing User',
                }
            }
            with patch('src.api.auth.get_oauth_client') as mock_get_client:
                client_mock = MagicMock()
                client_mock.authorize_access_token.return_value = mock_token
                mock_get_client.return_value = client_mock
                with app.test_client() as test_client:
                    resp = test_client.get('/auth/oauth/microsoft/callback')
                    assert resp.status_code == 302
                    assert '/auth/oauth/finish-signup' not in resp.headers['Location']
                    with test_client.session_transaction() as sess:
                        # User is logged in
                        assert '_user_id' in sess
                        assert sess['_user_id'] == str(existing.id)
                        assert 'oauth_signup_pending' not in sess
        finally:
            db.session.rollback()
            db.drop_all()


# ----------------------------------------------------------------------
# 5.
# /auth/oauth/{provider}/callback — existing user by email
# ----------------------------------------------------------------------

def test_oauth_callback_links_existing_user_by_email():
    """An account matched only by email gets sso_provider/sso_subject set and is logged in."""
    with app.app_context():
        _disable_csrf()
        _set_oauth_env()
        db.create_all()
        try:
            # Password-based account with no SSO identity attached yet.
            account = User(
                username='emaillink',
                email='emaillink@example.qc.ca',
                password='$2b$12$abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMN',
                email_verified=True,
            )
            db.session.add(account)
            db.session.commit()
            account_id = account.id

            userinfo = {
                'sub': 'google-new-sub-789',
                'email': 'emaillink@example.qc.ca',
                'name': 'Email Link User',
            }
            token_payload = {'userinfo': userinfo}

            with patch('src.api.auth.get_oauth_client') as get_client_patch:
                fake_oauth_client = MagicMock()
                fake_oauth_client.authorize_access_token.return_value = token_payload
                get_client_patch.return_value = fake_oauth_client

                with app.test_client() as http:
                    response = http.get('/auth/oauth/google/callback')
                    assert response.status_code == 302

                    # Re-read the row to see the columns written by the callback.
                    linked = db.session.get(User, account_id)
                    assert linked.sso_subject == 'google-new-sub-789'
                    assert linked.sso_provider == 'google'

                    with http.session_transaction() as session_data:
                        assert session_data.get('_user_id') == str(account_id)
        finally:
            db.session.rollback()
            db.drop_all()


# ----------------------------------------------------------------------
# 6.
# /auth/oauth/finish-signup — required consents
# ----------------------------------------------------------------------

def test_finish_signup_requires_cgu_and_confidentialite():
    """POST with no consent fields → 400, no User row, pending signup kept in session."""
    pending_signup = {
        'provider': 'microsoft',
        'subject': 'ms-sub-pending-001',
        'userinfo': {
            'email': 'pending@example.qc.ca',
            'name': 'Pending User',
            'given_name': 'Pending',
            'family_name': 'User',
        },
    }
    with app.app_context():
        _disable_csrf()
        db.create_all()
        try:
            with app.test_client() as http:
                # Seed the session as if the OAuth callback had just run.
                with http.session_transaction() as session_data:
                    session_data['oauth_signup_pending'] = pending_signup

                # Empty form: neither required consent is ticked.
                response = http.post('/auth/oauth/finish-signup', data={})
                assert response.status_code == 400

                page = response.data.decode('utf-8').lower()
                assert "conditions d'utilisation" in page or 'cgu' in page
                assert 'confidentialit' in page

                created = User.query.filter_by(email='pending@example.qc.ca').first()
                assert created is None

                # Session still has the pending entry (so user can retry)
                with http.session_transaction() as session_data:
                    assert 'oauth_signup_pending' in session_data
        finally:
            db.session.rollback()
            db.drop_all()


# ----------------------------------------------------------------------
# 7.
# /auth/oauth/finish-signup — success path
# ----------------------------------------------------------------------

def test_finish_signup_creates_user_and_4_consent_logs():
    """POST with required consents → User row, 4 ConsentLog rows, session logged in."""
    pending_signup = {
        'provider': 'google',
        'subject': 'google-sub-success-002',
        'userinfo': {
            'email': 'success@example.qc.ca',
            'name': 'Success User',
            'given_name': 'Success',
            'family_name': 'User',
        },
    }
    form_fields = {
        'consent_cgu': 'y',
        'consent_confidentialite': 'y',
        'consent_marketing': 'y',
        # analytics is left out on purpose → expected to be recorded as False
    }
    request_headers = {'CF-Connecting-IP': '198.51.100.7', 'User-Agent': 'TestUA/1.0'}
    expected_grants = {
        'cgu': True,
        'confidentialite': True,
        'marketing': True,
        'analytics': False,
    }
    with app.app_context():
        _disable_csrf()
        db.create_all()
        try:
            with app.test_client() as http:
                with http.session_transaction() as session_data:
                    session_data['oauth_signup_pending'] = pending_signup

                response = http.post(
                    '/auth/oauth/finish-signup',
                    data=form_fields,
                    headers=request_headers,
                )
                assert response.status_code == 302

                created = User.query.filter_by(email='success@example.qc.ca').first()
                assert created is not None
                assert created.sso_provider == 'google'
                assert created.sso_subject == 'google-sub-success-002'
                assert created.password is None
                assert created.email_verified is True
                assert created.name == 'Success User'

                consent_rows = ConsentLog.query.filter_by(user_id=created.id).all()
                assert len(consent_rows) == 4
                grants_by_type = {row.consent_type: row.granted for row in consent_rows}
                assert grants_by_type == expected_grants
                # Every row carries the client IP and user agent sent in the headers.
                assert all(row.ip_address == '198.51.100.7' for row in consent_rows)
                assert all(row.user_agent == 'TestUA/1.0' for row in consent_rows)

                # Session cleaned + logged in
                with http.session_transaction() as session_data:
                    assert 'oauth_signup_pending' not in session_data
                    assert session_data.get('_user_id') == str(created.id)
        finally:
            db.session.rollback()
            db.drop_all()


# ----------------------------------------------------------------------
# 8.
# /auth/magic-link — generic flash for unknown email
# ----------------------------------------------------------------------

def test_magic_link_request_returns_generic_message_for_unknown_email():
    """Unknown email → generic flash; no email sent; no error leak."""
    with app.app_context():
        _disable_csrf()
        db.create_all()
        try:
            # NOTE(review): is_smtp_configured is not patched here, so which
            # branch the route takes depends on the ambient SMTP config — confirm
            # the "no email sent" check is exercised with SMTP enabled.
            with patch('src.services.email._send_email') as mock_send:
                with app.test_client() as client:
                    resp = client.post('/auth/magic-link', data={'email': 'doesnotexist@example.qc.ca'})
                    assert resp.status_code in (200, 302)
                    assert mock_send.call_count == 0
                    body = resp.data.decode('utf-8').lower()
                    # Generic French message — no leak
                    assert 'doesnotexist@example.qc.ca' in body  # display the email
                    # No leak language like "introuvable" / "n'existe pas"
                    assert 'introuvable' not in body
                    assert "n'existe pas" not in body
        finally:
            db.session.rollback()
            db.drop_all()


# ----------------------------------------------------------------------
# 9. /auth/magic-link — sends email for known verified user
# ----------------------------------------------------------------------

def test_magic_link_request_sends_email_for_known_verified_user():
    """Known + verified user triggers _send_email exactly once with the magic URL."""
    with app.app_context():
        _disable_csrf()
        db.create_all()
        try:
            user = User(
                username='magicverified',
                email='magic@example.qc.ca',
                password='$2b$12$abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMN',
                email_verified=True,
            )
            db.session.add(user)
            db.session.commit()
            # Force SMTP-configured branch for the test (both the route gate AND
            # the inner gate inside src.services.email.send_magic_link_email).
            with patch('src.api.auth.is_smtp_configured', return_value=True), \
                    patch('src.services.email.is_smtp_configured', return_value=True), \
                    patch('src.services.email._send_email') as mock_send:
                mock_send.return_value = True
                with app.test_client() as client:
                    resp = client.post('/auth/magic-link', data={'email': 'magic@example.qc.ca'})
                    assert resp.status_code in (200, 302)
                    assert mock_send.call_count == 1
                    # Email body should contain the magic URL
                    args, _kwargs = mock_send.call_args
                    # _send_email(to_email, subject, html_body, text_body);
                    # the 4-way unpack also checks exactly four positionals.
                    to_email, subject, html_body, _text_body = args
                    assert to_email == 'magic@example.qc.ca'
                    assert 'DictIA' in subject
                    assert '/auth/magic-link/' in html_body
        finally:
            db.session.rollback()
            db.drop_all()


# ----------------------------------------------------------------------
# 10. /auth/magic-link — skips unverified users silently
# ----------------------------------------------------------------------

def test_magic_link_request_skips_unverified_user_silently():
    """Unverified user → email NOT sent, and the response leaks no 'unverified' wording."""
    with app.app_context():
        _disable_csrf()
        db.create_all()
        try:
            user = User(
                username='magicunverif',
                email='unverif@example.qc.ca',
                password='$2b$12$abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMN',
                email_verified=False,
            )
            db.session.add(user)
            db.session.commit()
            # Open BOTH SMTP gates (route + src.services.email). With only the
            # route gate open, an erroneous send attempt could be stopped by
            # the inner gate before _send_email, and call_count == 0 would pass
            # without proving the route skipped the unverified user.
            with patch('src.api.auth.is_smtp_configured', return_value=True), \
                    patch('src.services.email.is_smtp_configured', return_value=True), \
                    patch('src.services.email._send_email') as mock_send:
                with app.test_client() as client:
                    resp = client.post('/auth/magic-link', data={'email': 'unverif@example.qc.ca'})
                    assert resp.status_code in (200, 302)
                    assert mock_send.call_count == 0
                    body = resp.data.decode('utf-8').lower()
                    # No leak: same message as for unknown emails
                    assert 'non vérifié' not in body
                    assert 'unverified' not in body
        finally:
            db.session.rollback()
            db.drop_all()


# ----------------------------------------------------------------------
# 11.
# /auth/magic-link/{token} — valid token logs in
# ----------------------------------------------------------------------

def test_magic_link_consume_logs_in_user_with_valid_token():
    """Valid token → 302 away from /auth/magic-link with the user's id in the session."""
    with app.app_context():
        _disable_csrf()
        db.create_all()
        try:
            account = User(
                username='consumeok',
                email='consumeok@example.qc.ca',
                password='$2b$12$abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMN',
                email_verified=True,
            )
            db.session.add(account)
            db.session.commit()

            from src.auth.magic_link import generate_magic_link_token
            magic_token = generate_magic_link_token(account.id)
            consume_url = f'/auth/magic-link/{magic_token}'

            with app.test_client() as http:
                response = http.get(consume_url, follow_redirects=False)
                assert response.status_code == 302
                # Success must not bounce back to the magic-link request page.
                assert '/auth/magic-link' not in response.headers['Location']
                with http.session_transaction() as session_data:
                    assert session_data.get('_user_id') == str(account.id)
        finally:
            db.session.rollback()
            db.drop_all()


# ----------------------------------------------------------------------
# 12.
# /auth/magic-link/{token} — expired token rejected
# ----------------------------------------------------------------------

def test_magic_link_consume_rejects_expired_token():
    """SignatureExpired on load → invalid/expired flash, redirect to /auth/magic-link, no login."""
    with app.app_context():
        _disable_csrf()
        db.create_all()
        try:
            account = User(
                username='expired',
                email='expired@example.qc.ca',
                password='x' * 60,
                email_verified=True,
            )
            db.session.add(account)
            db.session.commit()

            from itsdangerous import SignatureExpired
            # Force expiry by making the serializer's loads() raise, rather than
            # waiting for a real token to age out.
            with patch('src.auth.magic_link._serializer') as serializer_factory:
                expired_serializer = MagicMock()
                expired_serializer.loads.side_effect = SignatureExpired('expired')
                serializer_factory.return_value = expired_serializer

                with app.test_client() as http:
                    response = http.get('/auth/magic-link/dummy-token', follow_redirects=False)
                    assert response.status_code == 302
                    assert '/auth/magic-link' in response.headers['Location']
                    with http.session_transaction() as session_data:
                        flashes = session_data.get('_flashes', [])
                        rejected = any(
                            'invalide' in text.lower() or 'expir' in text.lower()
                            for _category, text in flashes
                        )
                        assert rejected, f'expected expiry flash, got {flashes}'
                        assert session_data.get('_user_id') is None
        finally:
            db.session.rollback()
            db.drop_all()


# ----------------------------------------------------------------------
# 13.
# /auth/magic-link/{token} — tampered token rejected
# ----------------------------------------------------------------------

def test_magic_link_consume_rejects_invalid_signature():
    """Garbage token → invalid/expired flash, redirect to /auth/magic-link, no login."""
    with app.app_context():
        _disable_csrf()
        db.create_all()
        try:
            with app.test_client() as http:
                response = http.get('/auth/magic-link/garbage.token.value', follow_redirects=False)
                assert response.status_code == 302
                assert '/auth/magic-link' in response.headers['Location']
                with http.session_transaction() as session_data:
                    flashes = session_data.get('_flashes', [])
                    rejected = any(
                        'invalide' in text.lower() or 'expir' in text.lower()
                        for _category, text in flashes
                    )
                    assert rejected, f'expected invalid flash, got {flashes}'
                    assert session_data.get('_user_id') is None
        finally:
            db.session.rollback()
            db.drop_all()


# ----------------------------------------------------------------------
# 14. /auth/magic-link/{token} — unverified user rejected
# ----------------------------------------------------------------------

def test_magic_link_consume_rejects_unverified_user():
    """Genuine token for an unverified user → same invalid/expired flash, no login."""
    with app.app_context():
        _disable_csrf()
        db.create_all()
        try:
            account = User(
                username='unverifconsume',
                email='unverifconsume@example.qc.ca',
                password='x' * 60,
                email_verified=False,
            )
            db.session.add(account)
            db.session.commit()

            from src.auth.magic_link import generate_magic_link_token
            magic_token = generate_magic_link_token(account.id)
            consume_url = f'/auth/magic-link/{magic_token}'

            with app.test_client() as http:
                response = http.get(consume_url, follow_redirects=False)
                assert response.status_code == 302
                assert '/auth/magic-link' in response.headers['Location']
                with http.session_transaction() as session_data:
                    assert session_data.get('_user_id') is None
                    flashes = session_data.get('_flashes', [])
                    # Flash should look the same as the bad-signature path
                    rejected = any(
                        'invalide' in text.lower() or 'expir' in text.lower()
                        for _category, text in flashes
                    )
                    assert rejected
        finally:
            db.session.rollback()
            db.drop_all()


#
---------------------------------------------------------------------- # 15. send_magic_link_email helper produces French DictIA-branded email # ---------------------------------------------------------------------- def test_send_magic_link_email_french_branded(): """send_magic_link_email() produces French + DictIA-branded HTML body.""" with app.app_context(): _disable_csrf() db.create_all() try: user = User( username='emailtest', email='emailtest@example.qc.ca', password='x' * 60, name='Marie