diff --git a/src/api/auth.py b/src/api/auth.py index 0503a5d..e62887a 100644 --- a/src/api/auth.py +++ b/src/api/auth.py @@ -388,6 +388,15 @@ def login(): action='verification_required', show_resend=True) + # B-2.5: TOTP gate — defer login until 2nd factor verified + if user.totp_enabled: + session['pending_totp_user_id'] = user.id + session['pending_totp_remember'] = bool(form.remember.data) + next_page = request.args.get('next') + if next_page and is_safe_url(next_page): + session['pending_totp_next'] = next_page + return redirect(url_for('auth.totp_verify_login')) + login_user(user, remember=form.remember.data) audit_login(user.id) next_page = request.args.get('next') @@ -838,6 +847,192 @@ def magic_link_consume(token): return redirect(url_for('recordings.index')) +# --- B-2.5: TOTP MFA routes --- + + +class TotpVerifyForm(FlaskForm): + """Form definition kept for CSRF coverage / future template binding. + + Routes parse request.form directly (no WTForms validators) because the + same endpoint accepts EITHER `code` (6 digits) OR `recovery_code` + (XXXXX-XXXXX), and WTForms would reject one when the other is sent. + """ + code = StringField('Code à 6 chiffres') + recovery_code = StringField('Code de récupération') + submit = SubmitField('Vérifier') + + +@auth_bp.route('/2fa/setup', methods=['GET', 'POST']) +@login_required +@rate_limit("10 per minute") +def totp_setup(): + """Enroll the current user in TOTP MFA (two-step flow). + + GET: generate a fresh secret + QR + 10 recovery codes; cache the secret + and recovery-code hashes in the session (NOT yet persisted). + POST: verify the user-submitted 6-digit code against the pending secret; + on success, persist the encrypted secret + recovery code hashes and + flip ``totp_enabled`` to True. + + Already-enrolled users are redirected to /account with an info flash so + that this endpoint never silently rotates a working enrollment. + """ + from src.auth.totp import ( + generate_totp_secret, build_provisioning_uri, render_qr_data_url, + verify_totp_code, generate_recovery_codes, set_user_totp, + ) + + if current_user.totp_enabled: + flash('La double authentification est déjà activée pour votre compte.', 'info') + return redirect(url_for('auth.account')) + + if request.method == 'POST': + pending_secret = session.get('totp_pending_secret') + pending_recovery_hashes = session.get('totp_pending_recovery_hashes') + pending_display_codes = session.get('totp_pending_display_codes', []) + if not pending_secret or not pending_recovery_hashes: + flash('Session expirée. Veuillez recommencer la configuration.', 'warning') + return redirect(url_for('auth.totp_setup')) + + code = (request.form.get('code') or '').strip() + if not verify_totp_code(pending_secret, code): + qr = render_qr_data_url( + build_provisioning_uri(pending_secret, current_user.email) + ) + return render_template( + 'auth/totp_setup.html', + title='Configurer la double authentification', + qr_data_url=qr, + secret=pending_secret, + recovery_codes=pending_display_codes, + error="Code invalide. Vérifiez l'horloge de votre appareil et réessayez.", + ), 400 + + # Persist + set_user_totp(current_user, pending_secret, pending_recovery_hashes) + # Clear pending session (one-shot enrollment) + for k in ( + 'totp_pending_secret', + 'totp_pending_recovery_hashes', + 'totp_pending_display_codes', + ): + session.pop(k, None) + flash( + 'Double authentification activée. Conservez vos codes de récupération en lieu sûr.', + 'success', + ) + return redirect(url_for('auth.account')) + + # GET: fresh enrollment — generate secret + recovery codes, cache in session + secret = generate_totp_secret() + display_codes, hashed_codes = generate_recovery_codes() + session['totp_pending_secret'] = secret + session['totp_pending_recovery_hashes'] = hashed_codes + session['totp_pending_display_codes'] = display_codes + qr = render_qr_data_url(build_provisioning_uri(secret, current_user.email)) + return render_template( + 'auth/totp_setup.html', + title='Configurer la double authentification', + qr_data_url=qr, + secret=secret, + recovery_codes=display_codes, + ) + + +@auth_bp.route('/2fa/disable', methods=['POST']) +@login_required +@rate_limit("5 per minute") +def totp_disable(): + """Disable TOTP MFA. Requires re-authentication via current password.""" + from src.auth.totp import disable_user_totp + if not current_user.totp_enabled: + flash("La double authentification n'est pas activée.", 'info') + return redirect(url_for('auth.account')) + password = request.form.get('password', '') + if ( + not current_user.password + or not bcrypt.check_password_hash(current_user.password, password) + ): + flash('Mot de passe incorrect. La double authentification reste activée.', 'danger') + return redirect(url_for('auth.account')) + disable_user_totp(current_user) + flash('Double authentification désactivée.', 'success') + return redirect(url_for('auth.account')) + + +@auth_bp.route('/2fa/verify', methods=['GET', 'POST']) +@rate_limit("5 per minute") +def totp_verify_login(): + """Second factor during login. Reads pending_totp_user_id from session. + + Accepts EITHER a 6-digit TOTP code OR a single-use recovery code. On + success, finalises ``login_user`` (preserving the remember-me intent + captured at the password step) and redirects to the next URL or the + recordings index. + """ + from src.auth.totp import ( + verify_totp_code, get_user_totp_secret, consume_recovery_code, + ) + + pending_user_id = session.get('pending_totp_user_id') + pending_remember = bool(session.get('pending_totp_remember', False)) + if not pending_user_id: + flash('Session de connexion expirée. Veuillez vous reconnecter.', 'warning') + return redirect(url_for('auth.login')) + + user = db.session.get(User, pending_user_id) + if not user or not user.totp_enabled: + # Defensive: shouldn't happen if login flow is correct + for k in ('pending_totp_user_id', 'pending_totp_remember', 'pending_totp_next'): + session.pop(k, None) + flash('Erreur de session. Veuillez vous reconnecter.', 'danger') + return redirect(url_for('auth.login')) + + recovery_remaining = len(user.totp_recovery_codes or []) + + if request.method == 'POST': + code = (request.form.get('code') or '').strip() + recovery = (request.form.get('recovery_code') or '').strip() + + success = False + if code: + secret = get_user_totp_secret(user) + if secret and verify_totp_code(secret, code): + success = True + elif recovery: + success = consume_recovery_code(user, recovery) + + if success: + next_page = session.pop('pending_totp_next', None) + session.pop('pending_totp_user_id', None) + session.pop('pending_totp_remember', None) + login_user(user, remember=pending_remember) + audit_login(user.id) + if next_page and is_safe_url(next_page): + return redirect(next_page) + return redirect(url_for('recordings.index')) + + _user_hash = hashlib.sha256(str(user.id).encode()).hexdigest()[:16] + audit_failed_login( + details={'user_id_hash': _user_hash, 'reason': 'totp_invalid'} + ) + return render_template( + 'auth/totp_verify.html', + title='Vérification 2FA', + recovery_codes_remaining=len(user.totp_recovery_codes or []), + error=( + "Code invalide. Vérifiez votre application authenticator ou " + "utilisez un code de récupération." + ), + ), 400 + + return render_template( + 'auth/totp_verify.html', + title='Vérification 2FA', + recovery_codes_remaining=recovery_remaining, + ) + + @auth_bp.route('/logout') @csrf_exempt def logout(): diff --git a/src/auth/totp.py b/src/auth/totp.py new file mode 100644 index 0000000..b9b81c8 --- /dev/null +++ b/src/auth/totp.py @@ -0,0 +1,184 @@ +"""TOTP MFA service layer (B-2.5). + +Encrypts the base32 TOTP secret with Fernet (SECRET_KEY-derived key) before +DB persistence. NEVER store the raw base32 secret in the database. + +Recovery codes: 10 single-use base32 codes (10 chars each, hyphenated for +readability) generated at TOTP enrollment, displayed ONCE to the user, stored +as bcrypt hashes in User.totp_recovery_codes (JSON list). Each successful +recovery-code login removes that hash from the list. +""" +import base64 +import hashlib +import secrets +from typing import List, Optional, Tuple + +import pyotp +import qrcode +from cryptography.fernet import Fernet, InvalidToken +from flask import current_app + +# 10 single-use recovery codes per enrollment +RECOVERY_CODES_COUNT = 10 +RECOVERY_CODE_LENGTH = 10 # base32 chars per code, formatted as XXXXX-XXXXX + + +def _fernet() -> Fernet: + """Derive a Fernet key from app SECRET_KEY (deterministic, single-key). + + Uses SHA-256 of SECRET_KEY → urlsafe-base64 (32 bytes) so the same + SECRET_KEY always produces the same Fernet key. Single-key design (no + rotation): rotating SECRET_KEY invalidates ALL stored TOTP secrets, + forcing every user to re-enroll. Acceptable for MVP; revisit when we + have key-rotation infra. + """ + secret_key = current_app.config.get('SECRET_KEY') + if not secret_key: + raise RuntimeError('SECRET_KEY must be configured to use TOTP encryption') + if isinstance(secret_key, str): + secret_key = secret_key.encode('utf-8') + derived = hashlib.sha256(secret_key).digest() + return Fernet(base64.urlsafe_b64encode(derived)) + + +def encrypt_totp_secret(plaintext_base32: str) -> str: + """Encrypt a base32 TOTP secret. Returns the Fernet token as a string.""" + if not plaintext_base32 or not isinstance(plaintext_base32, str): + raise ValueError('encrypt_totp_secret requires a non-empty base32 string') + return _fernet().encrypt(plaintext_base32.encode('ascii')).decode('ascii') + + +def decrypt_totp_secret(ciphertext: str) -> str: + """Decrypt a Fernet-encrypted TOTP secret. Returns the base32 string. + + Raises ValueError on bad token (key mismatch, tampered, malformed). + """ + if not ciphertext: + raise ValueError('decrypt_totp_secret requires a non-empty ciphertext') + try: + return _fernet().decrypt(ciphertext.encode('ascii')).decode('ascii') + except InvalidToken as e: + raise ValueError(f'Invalid TOTP ciphertext (key mismatch?): {e}') from e + + +def generate_totp_secret() -> str: + """Generate a fresh base32 TOTP secret (160-bit, RFC 6238 recommended).""" + return pyotp.random_base32() + + +def build_provisioning_uri(secret_base32: str, account_email: str) -> str: + """Return the otpauth:// URI for QR encoding (RFC 6238).""" + return pyotp.TOTP(secret_base32).provisioning_uri( + name=account_email, issuer_name='DictIA' + ) + + +def render_qr_data_url(provisioning_uri: str) -> str: + """Render the URI as a base64 PNG data URL for inline display in HTML.""" + import io + img = qrcode.make(provisioning_uri) + buf = io.BytesIO() + img.save(buf, format='PNG') + return 'data:image/png;base64,' + base64.b64encode(buf.getvalue()).decode('ascii') + + +def verify_totp_code(secret_base32: str, code: str) -> bool: + """Verify a 6-digit TOTP code with a 1-window tolerance (current ±30s). + + Rejects non-digit / wrong-length input early to avoid leaking timing. + """ + if not code or not isinstance(code, str): + return False + code = code.strip() + if len(code) != 6 or not code.isdigit(): + return False + return pyotp.TOTP(secret_base32).verify(code, valid_window=1) + + +def _get_bcrypt(): + """Resolve the Flask-Bcrypt extension instance (handles missing context).""" + bcrypt = ( + current_app.extensions.get('flask-bcrypt') + or current_app.extensions.get('bcrypt') + ) + if bcrypt is None: + # Fall back to the global bcrypt initialised by init_auth_extensions() + from src.api.auth import bcrypt as _b + bcrypt = _b + if bcrypt is None: + raise RuntimeError('Flask-Bcrypt extension is not initialised') + return bcrypt + + +def generate_recovery_codes() -> Tuple[List[str], List[str]]: + """Generate 10 fresh recovery codes. + + Returns (display_codes, hashed_codes_for_storage): + - display_codes: human-readable XXXXX-XXXXX format, shown to user ONCE + - hashed_codes_for_storage: bcrypt-style hashes — store these in + User.totp_recovery_codes (JSON list). + """ + bcrypt = _get_bcrypt() + display_codes: List[str] = [] + hashed_codes: List[str] = [] + for _ in range(RECOVERY_CODES_COUNT): + # 5+5 base32 chars hyphenated for readability (XXXXX-XXXXX) + raw = base64.b32encode(secrets.token_bytes(7)).decode('ascii')[:RECOVERY_CODE_LENGTH] + display = f'{raw[:5]}-{raw[5:]}' + display_codes.append(display) + # Hash the hyphenated form (what the user will type back) with bcrypt + hashed = bcrypt.generate_password_hash(display).decode('ascii') + hashed_codes.append(hashed) + return display_codes, hashed_codes + + +def consume_recovery_code(user, candidate: str) -> bool: + """Check `candidate` against the user's stored recovery code hashes. + + On match: removes that hash from the list and commits. Returns True. + On mismatch: returns False, no DB write. Single-use: a code that + matched once will not match again. + """ + from src.database import db + + bcrypt = _get_bcrypt() + if not candidate or not user.totp_recovery_codes: + return False + candidate = candidate.strip().upper() + remaining: List[str] = [] + matched = False + for h in user.totp_recovery_codes: + if not matched and bcrypt.check_password_hash(h, candidate): + matched = True + # Drop this hash from the stored list (single-use) + continue + remaining.append(h) + if matched: + user.totp_recovery_codes = remaining + db.session.commit() + return matched + + +def set_user_totp(user, secret_base32: str, recovery_code_hashes: List[str]) -> None: + """Persist the encrypted secret + recovery codes; mark MFA enabled.""" + from src.database import db + user.totp_secret_encrypted = encrypt_totp_secret(secret_base32) + user.totp_recovery_codes = recovery_code_hashes + user.totp_enabled = True + db.session.commit() + + +def disable_user_totp(user) -> None: + """Disable MFA: clear encrypted secret, recovery codes, and the flag.""" + from src.database import db + user.totp_secret_encrypted = None + user.totp_recovery_codes = None + user.totp_enabled = False + db.session.commit() + + +def get_user_totp_secret(user) -> Optional[str]: + """Return the decrypted base32 secret, or None if MFA not enrolled.""" + if not user.totp_secret_encrypted: + return None + return decrypt_totp_secret(user.totp_secret_encrypted) diff --git a/src/init_db.py b/src/init_db.py index 6c0f5e9..19bc7f3 100644 --- a/src/init_db.py +++ b/src/init_db.py @@ -292,6 +292,9 @@ def initialize_database(app): app.logger.info("Added 'totp_enabled' column to user") if add_column_if_not_exists(engine, 'user', 'webauthn_credentials', 'JSON'): app.logger.info("Added webauthn_credentials column to user table") + # B-2.5: 10 single-use bcrypt-hashed recovery codes for TOTP MFA + if add_column_if_not_exists(engine, 'user', 'totp_recovery_codes', 'JSON'): + app.logger.info("Added totp_recovery_codes column to user table") if add_column_if_not_exists(engine, 'user', 'ordre_pro', 'VARCHAR(50)'): app.logger.info("Added ordre_pro column to user table") if add_column_if_not_exists(engine, 'user', 'cabinet', 'VARCHAR(255)'): diff --git a/src/models/user.py b/src/models/user.py index c9b6fa9..d8bc02b 100644 --- a/src/models/user.py +++ b/src/models/user.py @@ -81,6 +81,9 @@ class User(db.Model, UserMixin): # [{'id': str, 'public_key': str, 'sign_count': int, 'transports': list[str]}] webauthn_credentials = db.Column(db.JSON, nullable=True) + # B-2.5: 10 single-use recovery codes (bcrypt-hashed). Cleared when MFA disabled. + totp_recovery_codes = db.Column(db.JSON, nullable=True) + # Loi 25 + ordre professionnel context (used at signup B-2.2) ordre_pro = db.Column(db.String(50), nullable=True) # 'barreau', 'cpa', 'chad', etc. cabinet = db.Column(db.String(255), nullable=True) diff --git a/static/css/marketing.css b/static/css/marketing.css index 425b054..ee6b3f5 100644 --- a/static/css/marketing.css +++ b/static/css/marketing.css @@ -127,6 +127,7 @@ --tracking-tight: -0.025em; --tracking-wide: 0.025em; --tracking-wider: 0.05em; + --tracking-widest: 0.1em; --leading-snug: 1.375; --leading-relaxed: 1.625; --radius-md: 0.375rem; @@ -756,6 +757,9 @@ .h-32 { height: calc(var(--spacing) * 32); } + .h-48 { + height: calc(var(--spacing) * 48); + } .h-64 { height: calc(var(--spacing) * 64); } @@ -2358,6 +2362,10 @@ --tw-tracking: var(--tracking-wider); letter-spacing: var(--tracking-wider); } + .tracking-widest { + --tw-tracking: var(--tracking-widest); + letter-spacing: var(--tracking-widest); + } .break-words { overflow-wrap: break-word; } @@ -2436,6 +2444,12 @@ .text-amber-900 { color: var(--color-amber-900); } + .text-amber-900\/90 { + color: color-mix(in srgb, oklch(41.4% 0.112 45.904) 90%, transparent); + @supports (color: color-mix(in lab, red, red)) { + color: color-mix(in oklab, var(--color-amber-900) 90%, transparent); + } + } .text-blue-400 { color: var(--color-blue-400); } @@ -2454,6 +2468,9 @@ .text-blue-900 { color: var(--color-blue-900); } + .text-brand-b1 { + color: #0062ff; + } .text-brand-b3 { color: #00c896; } @@ -3255,6 +3272,13 @@ } } } + .hover\:bg-brand-navy2 { + &:hover { + @media (hover: hover) { + background-color: #0b1525; + } + } + } .hover\:bg-emerald-700 { &:hover { @media (hover: hover) { @@ -3541,6 +3565,13 @@ } } } + .hover\:text-brand-b1 { + &:hover { + @media (hover: hover) { + color: #0062ff; + } + } + } .hover\:text-brand-navy { &:hover { @media (hover: hover) { @@ -4163,6 +4194,11 @@ min-height: 12rem; } } + .md\:w-48 { + @media (width >= 48rem) { + width: calc(var(--spacing) * 48); + } + } .md\:grid-cols-2 { @media (width >= 48rem) { grid-template-columns: repeat(2, minmax(0, 1fr)); diff --git a/templates/auth/totp_setup.html b/templates/auth/totp_setup.html new file mode 100644 index 0000000..a5ffcb6 --- /dev/null +++ b/templates/auth/totp_setup.html @@ -0,0 +1,93 @@ +{% extends 'marketing/base.html' %} + +{% block title %}Configurer la double authentification — DictIA{% endblock %} +{% block description %}Activez la double authentification (TOTP) sur votre compte DictIA pour protéger vos données conformément aux exigences Loi 25.{% endblock %} + +{% block content %} +
+
+

Configurer la double authentification

+

{{ "La double authentification (2FA) ajoute une seconde étape lors de la connexion, en plus de votre mot de passe. Une exigence forte recommandée pour les comptes traitant des données confidentielles (Loi 25)." | safe }}

+ + {% with messages = get_flashed_messages(with_categories=true) %} + {% if messages %} + {% for category, message in messages %} + + {% endfor %} + {% endif %} + {% endwith %} + + {% if error %} + + {% endif %} + +
    +
  1. +

    1. Installez une application d'authentification

    +

    Sur votre téléphone, installez par exemple Google Authenticator, Microsoft Authenticator, Authy ou 1Password.

    +
  2. + +
  3. +

    2. Scannez le code QR

    +
    +
    + Code QR pour configurer DictIA dans votre application authenticator +
    +
    +

    Pointez l'appareil photo de votre application authenticator vers ce code QR.

    +

    Vous ne pouvez pas scanner ?
    Saisissez la clé manuellement :

    + {{ secret }} +
    +
    +
  4. + +
  5. +

    3. Conservez vos codes de récupération

    + +
    {% for c in recovery_codes %}{{ c }}
    +{% endfor %}
    + +
  6. + +
  7. +

    4. Confirmez avec un code à 6 chiffres

    +

    Entrez le code à 6 chiffres affiché actuellement dans votre application authenticator pour valider l'installation.

    + +
    + + +
    + + +
    + + +
    +
  8. +
+ +

+ ← Annuler et retourner au compte +

+
+
+{% endblock %} diff --git a/templates/auth/totp_verify.html b/templates/auth/totp_verify.html new file mode 100644 index 0000000..1bde85b --- /dev/null +++ b/templates/auth/totp_verify.html @@ -0,0 +1,75 @@ +{% extends 'marketing/base.html' %} + +{% block title %}Vérification 2FA — DictIA{% endblock %} +{% block description %}Saisissez votre code à 6 chiffres pour terminer la connexion à votre compte DictIA.{% endblock %} + +{% block content %} +
+
+

Vérification en deux étapes

+

Entrez le code à 6 chiffres affiché dans votre application authenticator pour terminer la connexion.

+ + {% with messages = get_flashed_messages(with_categories=true) %} + {% if messages %} + {% for category, message in messages %} + + {% endfor %} + {% endif %} + {% endwith %} + + {% if error %} + + {% endif %} + + {# Primary path: 6-digit TOTP code #} +
+ + +
+ + +
+ + +
+ + {# Secondary path: recovery code (collapsed by default for clarity) #} +
+ + Pas accès à votre application authenticator ? Utiliser un code de récupération + +
+ +
+ + +

Format : 5 caractères + tiret + 5 caractères. Chaque code est à usage unique.

+
+ +

{{ recovery_codes_remaining }} code{{ 's' if recovery_codes_remaining != 1 else '' }} de récupération restant{{ 's' if recovery_codes_remaining != 1 else '' }}.

+
+
+ +

+ Annuler la connexion +

+
+
+{% endblock %} diff --git a/tests/_run_totp_mfa_windows.py b/tests/_run_totp_mfa_windows.py new file mode 100644 index 0000000..776b234 --- /dev/null +++ b/tests/_run_totp_mfa_windows.py @@ -0,0 +1,77 @@ +"""Windows manual driver for tests/test_totp_mfa.py. + +src/init_db.py imports `fcntl`, which is POSIX-only. On Windows we stub it +before src.app gets imported, then run each test_* function and report. + +Run from the repo root: + py -3 tests/_run_totp_mfa_windows.py + +This script is local-dev only (not picked up by pytest collection). +""" +import os +import sys +import types +import traceback + +# 1) Stub fcntl BEFORE any import of src.* happens. +if 'fcntl' not in sys.modules: + fcntl_stub = types.ModuleType('fcntl') + fcntl_stub.LOCK_EX = 2 + fcntl_stub.LOCK_NB = 4 + fcntl_stub.LOCK_UN = 8 + fcntl_stub.LOCK_SH = 1 + fcntl_stub.flock = lambda *_args, **_kw: None + fcntl_stub.fcntl = lambda *_args, **_kw: 0 + sys.modules['fcntl'] = fcntl_stub + +# 2) Make repo root importable +HERE = os.path.dirname(os.path.abspath(__file__)) +REPO = os.path.dirname(HERE) +sys.path.insert(0, REPO) + +# 3) Set test config +os.environ.setdefault('SQLALCHEMY_DATABASE_URI', 'sqlite:///:memory:') +os.environ.setdefault('SECRET_KEY', 'test-secret-key-totp') +os.environ.setdefault('ENABLE_EMAIL_VERIFICATION', 'false') +os.environ.setdefault('REQUIRE_EMAIL_VERIFICATION', 'false') +os.environ.setdefault('TRANSCRIPTION_BASE_URL', 'http://test-stub') +os.environ.setdefault('TRANSCRIPTION_API_KEY', 'test-stub') +os.environ.setdefault('RATELIMIT_ENABLED', 'false') +# Force UTF-8 stdout so src.app's emoji prints don't crash on cp1252 Windows. +try: + sys.stdout.reconfigure(encoding='utf-8', errors='replace') + sys.stderr.reconfigure(encoding='utf-8', errors='replace') +except Exception: + pass + +# 4) Import the test module and run every test_* function it defines +import importlib.util # noqa: E402 +spec = importlib.util.spec_from_file_location( + 'test_totp_mfa', + os.path.join(HERE, 'test_totp_mfa.py'), +) +mod = importlib.util.module_from_spec(spec) +spec.loader.exec_module(mod) + +tests = [(name, fn) for name, fn in vars(mod).items() + if name.startswith('test_') and callable(fn)] + +passed = 0 +failed = [] +for name, fn in tests: + try: + fn() + print(f' PASS {name}') + passed += 1 + except Exception as e: # noqa: BLE001 + print(f' FAIL {name}: {type(e).__name__}: {e}') + failed.append((name, traceback.format_exc())) + +total = len(tests) +print() +print(f'Result: {passed}/{total} passed, {len(failed)} failed') +if failed: + print('\n--- Failures ---\n') + for name, tb in failed: + print(f'### {name}\n{tb}\n') +sys.exit(0 if not failed else 1) diff --git a/tests/test_totp_mfa.py b/tests/test_totp_mfa.py new file mode 100644 index 0000000..d005675 --- /dev/null +++ b/tests/test_totp_mfa.py @@ -0,0 +1,542 @@ +"""Tests for B-2.5 — TOTP MFA + recovery codes. + +Covers: + - Encrypt/decrypt round-trip (Fernet derived from SECRET_KEY). + - Decrypt rejects ciphertext from a different SECRET_KEY. + - generate_totp_secret produces a valid base32 secret (RFC 6238). + - verify_totp_code accepts current window, rejects wrong / non-digit codes. + - generate_recovery_codes returns 10 display + 10 hashes that map 1:1. + - consume_recovery_code is single-use; rejects unknown. + - set_user_totp persists encrypted (not raw) secret + flips flag. + - disable_user_totp clears all 3 fields. + - Login route gates on totp_enabled; bypasses gate when disabled. + - /2fa/verify accepts correct TOTP code, rejects wrong, accepts recovery. + - /2fa/verify redirects to /login when no pending session. + - /2fa/setup creates pending session on GET, persists on valid POST, + keeps pending on invalid POST. + - /2fa/disable requires correct password. + +Note: pytest cannot collect this file on Windows native because src/init_db.py +imports `fcntl` (POSIX-only). Use tests/_run_totp_mfa_windows.py. +""" +import os +import sys + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +os.environ.setdefault('SQLALCHEMY_DATABASE_URI', 'sqlite:///:memory:') +os.environ.setdefault('SECRET_KEY', 'test-secret-key-totp') +os.environ.setdefault('ENABLE_EMAIL_VERIFICATION', 'false') +os.environ.setdefault('REQUIRE_EMAIL_VERIFICATION', 'false') +os.environ.setdefault('RATELIMIT_ENABLED', 'false') + +import pyotp # noqa: E402 + +from src.app import app, db, bcrypt # noqa: E402 +from src.models.user import User # noqa: E402 +from src.auth import totp as totp_module # noqa: E402 + + +def _disable_csrf(): + app.config['WTF_CSRF_ENABLED'] = False + + +def _make_user(email='totp@example.qc.ca', password='Password!123', totp_enabled=False, + username=None, totp_secret_encrypted=None, totp_recovery_codes=None): + """Create a User row with bcrypt-hashed password. Returns the User.""" + hashed = bcrypt.generate_password_hash(password).decode('utf-8') + u = User( + username=username or email.split('@', 1)[0][:20], + email=email, + password=hashed, + email_verified=True, + totp_enabled=totp_enabled, + totp_secret_encrypted=totp_secret_encrypted, + totp_recovery_codes=totp_recovery_codes, + ) + db.session.add(u) + db.session.commit() + return u + + +# ---------------------------------------------------------------------- +# 1. Service layer — encryption round-trip +# ---------------------------------------------------------------------- + +def test_encrypt_decrypt_round_trips(): + """encrypt_totp_secret → decrypt_totp_secret returns the original base32.""" + with app.app_context(): + secret = pyotp.random_base32() + ct = totp_module.encrypt_totp_secret(secret) + assert ct != secret # ciphertext is NOT the raw secret + assert totp_module.decrypt_totp_secret(ct) == secret + + +def test_decrypt_with_different_secret_key_fails(): + """Fernet rejects a token encrypted with a different SECRET_KEY.""" + with app.app_context(): + secret = pyotp.random_base32() + ct = totp_module.encrypt_totp_secret(secret) + # Swap SECRET_KEY and confirm decryption fails + original_key = app.config['SECRET_KEY'] + try: + app.config['SECRET_KEY'] = 'totally-different-secret-key' + with app.app_context(): + try: + totp_module.decrypt_totp_secret(ct) + raise AssertionError('Expected ValueError on key mismatch') + except ValueError: + pass + finally: + app.config['SECRET_KEY'] = original_key + + +def test_generate_totp_secret_is_valid_base32_160_bit(): + """generate_totp_secret returns a base32 string accepted by pyotp.TOTP.""" + secret = totp_module.generate_totp_secret() + assert isinstance(secret, str) + assert len(secret) >= 16 # pyotp default is 32 chars (160 bits) + # If invalid, .now() would raise + code = pyotp.TOTP(secret).now() + assert len(code) == 6 + assert code.isdigit() + + +# ---------------------------------------------------------------------- +# 2. Service layer — verify_totp_code +# ---------------------------------------------------------------------- + +def test_verify_totp_code_accepts_current_window(): + """A code generated by pyotp.TOTP(secret).now() is accepted.""" + with app.app_context(): + secret = totp_module.generate_totp_secret() + code = pyotp.TOTP(secret).now() + assert totp_module.verify_totp_code(secret, code) is True + + +def test_verify_totp_code_rejects_wrong_code(): + """The literal '000000' is (almost certainly) not the current code.""" + with app.app_context(): + secret = totp_module.generate_totp_secret() + current = pyotp.TOTP(secret).now() + wrong = '000000' if current != '000000' else '111111' + assert totp_module.verify_totp_code(secret, wrong) is False + + +def test_verify_totp_code_rejects_non_digit(): + """Non-digit input is rejected before reaching pyotp.""" + secret = totp_module.generate_totp_secret() + assert totp_module.verify_totp_code(secret, 'abcdef') is False + assert totp_module.verify_totp_code(secret, '') is False + assert totp_module.verify_totp_code(secret, '12345') is False # too short + assert totp_module.verify_totp_code(secret, '1234567') is False # too long + + +# ---------------------------------------------------------------------- +# 3. Service layer — recovery codes +# ---------------------------------------------------------------------- + +def test_generate_recovery_codes_returns_10_pairs(): + """generate_recovery_codes returns 10 displays + 10 hashes that map 1:1.""" + with app.app_context(): + display, hashed = totp_module.generate_recovery_codes() + assert len(display) == 10 + assert len(hashed) == 10 + # Each display follows XXXXX-XXXXX + for d in display: + assert len(d) == 11 + assert d[5] == '-' + # Each display matches its hash; mismatches don't match + for d, h in zip(display, hashed): + assert bcrypt.check_password_hash(h, d) is True + # And a wrong code never matches + assert bcrypt.check_password_hash(hashed[0], 'WRONG-CODE0') is False + + +def test_consume_recovery_code_succeeds_once(): + """Consuming a code twice: 2nd attempt fails; list shrinks by 1.""" + with app.app_context(): + db.create_all() + try: + user = _make_user(email='consume1@example.qc.ca') + display, hashed = totp_module.generate_recovery_codes() + user.totp_recovery_codes = hashed + user.totp_enabled = True + db.session.commit() + + assert len(user.totp_recovery_codes) == 10 + assert totp_module.consume_recovery_code(user, display[0]) is True + assert len(user.totp_recovery_codes) == 9 + # Second attempt with the same code: rejected (single-use) + assert totp_module.consume_recovery_code(user, display[0]) is False + assert len(user.totp_recovery_codes) == 9 + finally: + db.session.rollback() + db.drop_all() + + +def test_consume_recovery_code_rejects_unknown(): + """Submitting a code that isn't in the list returns False; no DB write.""" + with app.app_context(): + db.create_all() + try: + user = _make_user(email='consume2@example.qc.ca') + _, hashed = totp_module.generate_recovery_codes() + user.totp_recovery_codes = hashed + user.totp_enabled = True + db.session.commit() + + assert totp_module.consume_recovery_code(user, 'NOPE1-NOPE2') is False + assert len(user.totp_recovery_codes) == 10 + finally: + db.session.rollback() + db.drop_all() + + +# ---------------------------------------------------------------------- +# 4. Service layer — set / disable user TOTP +# ---------------------------------------------------------------------- + +def test_set_user_totp_persists_encrypted_not_raw(): + """set_user_totp encrypts secret, stores hashes, flips totp_enabled True.""" + with app.app_context(): + db.create_all() + try: + user = _make_user(email='setuser@example.qc.ca') + secret = totp_module.generate_totp_secret() + display, hashed = totp_module.generate_recovery_codes() + totp_module.set_user_totp(user, secret, hashed) + + db.session.refresh(user) + assert user.totp_enabled is True + assert user.totp_secret_encrypted is not None + assert user.totp_secret_encrypted != secret # encrypted, not raw + assert totp_module.decrypt_totp_secret(user.totp_secret_encrypted) == secret + assert len(user.totp_recovery_codes) == 10 + finally: + db.session.rollback() + db.drop_all() + + +def test_disable_user_totp_clears_all_fields(): + """disable_user_totp nullifies the 3 TOTP-related fields.""" + with app.app_context(): + db.create_all() + try: + user = _make_user(email='disable@example.qc.ca') + secret = totp_module.generate_totp_secret() + _, hashed = totp_module.generate_recovery_codes() + totp_module.set_user_totp(user, secret, hashed) + assert user.totp_enabled is True + + totp_module.disable_user_totp(user) + db.session.refresh(user) + assert user.totp_enabled is False + assert user.totp_secret_encrypted is None + assert user.totp_recovery_codes is None + finally: + db.session.rollback() + db.drop_all() + + +# ---------------------------------------------------------------------- +# 5. Login route — TOTP gate +# ---------------------------------------------------------------------- + +def test_login_redirects_to_totp_verify_when_mfa_enabled(): + """Password OK + totp_enabled → 302 to /2fa/verify; pending_totp_user_id set.""" + with app.app_context(): + _disable_csrf() + db.create_all() + try: + password = 'CorrectHorseBattery!42' + secret = totp_module.generate_totp_secret() + user = _make_user( + email='gate@example.qc.ca', password=password, + totp_enabled=True, + totp_secret_encrypted=totp_module.encrypt_totp_secret(secret), + totp_recovery_codes=[], + ) + with app.test_client() as client: + resp = client.post('/login', data={ + 'email': 'gate@example.qc.ca', + 'password': password, + 'remember': 'y', + }) + assert resp.status_code == 302 + assert '/2fa/verify' in resp.headers['Location'] + with client.session_transaction() as sess: + assert sess.get('pending_totp_user_id') == user.id + assert sess.get('pending_totp_remember') is True + # NOT yet logged in + assert '_user_id' not in sess + finally: + db.session.rollback() + db.drop_all() + + +def test_login_logs_in_directly_when_mfa_disabled(): + """Password OK + totp_enabled=False → 302 to /; no pending_totp_user_id.""" + with app.app_context(): + _disable_csrf() + db.create_all() + try: + password = 'CorrectHorseBattery!42' + user = _make_user( + email='nomfa@example.qc.ca', password=password, totp_enabled=False, + ) + with app.test_client() as client: + resp = client.post('/login', data={ + 'email': 'nomfa@example.qc.ca', + 'password': password, + }) + assert resp.status_code == 302 + # Logged in directly + with client.session_transaction() as sess: + assert 'pending_totp_user_id' not in sess + assert sess.get('_user_id') == str(user.id) + finally: + db.session.rollback() + db.drop_all() + + +# ---------------------------------------------------------------------- +# 6. /2fa/verify — second factor route +# ---------------------------------------------------------------------- + +def test_totp_verify_login_logs_in_with_correct_code(): + """POST /2fa/verify with correct TOTP code logs the user in.""" + with app.app_context(): + _disable_csrf() + db.create_all() + try: + secret = totp_module.generate_totp_secret() + user = _make_user( + email='verifyok@example.qc.ca', + totp_enabled=True, + totp_secret_encrypted=totp_module.encrypt_totp_secret(secret), + totp_recovery_codes=[], + ) + code = pyotp.TOTP(secret).now() + with app.test_client() as client: + with client.session_transaction() as sess: + sess['pending_totp_user_id'] = user.id + sess['pending_totp_remember'] = False + resp = client.post('/2fa/verify', data={'code': code}) + assert resp.status_code == 302 + # Cleared pending + logged in + with client.session_transaction() as sess: + assert 'pending_totp_user_id' not in sess + assert 'pending_totp_remember' not in sess + assert sess.get('_user_id') == str(user.id) + finally: + db.session.rollback() + db.drop_all() + + +def test_totp_verify_login_rejects_wrong_code(): + """POST /2fa/verify with '000000' → 400; not logged in.""" + with app.app_context(): + _disable_csrf() + db.create_all() + try: + secret = totp_module.generate_totp_secret() + user = _make_user( + email='verifybad@example.qc.ca', + totp_enabled=True, + totp_secret_encrypted=totp_module.encrypt_totp_secret(secret), + totp_recovery_codes=[], + ) + current = pyotp.TOTP(secret).now() + wrong = '000000' if current != '000000' else '111111' + with app.test_client() as client: + with client.session_transaction() as sess: + sess['pending_totp_user_id'] = user.id + resp = client.post('/2fa/verify', data={'code': wrong}) + assert resp.status_code == 400 + with client.session_transaction() as sess: + assert '_user_id' not in sess + # Pending session preserved so user can retry + assert sess.get('pending_totp_user_id') == user.id + finally: + db.session.rollback() + db.drop_all() + + +def test_totp_verify_login_accepts_recovery_code(): + """POST /2fa/verify with a recovery code logs in + consumes the code.""" + with app.app_context(): + _disable_csrf() + db.create_all() + try: + secret = totp_module.generate_totp_secret() + display, hashed = totp_module.generate_recovery_codes() + user = _make_user( + email='recovery@example.qc.ca', + totp_enabled=True, + totp_secret_encrypted=totp_module.encrypt_totp_secret(secret), + totp_recovery_codes=hashed, + ) + with app.test_client() as client: + with client.session_transaction() as sess: + sess['pending_totp_user_id'] = user.id + resp = client.post('/2fa/verify', data={'recovery_code': display[0]}) + assert resp.status_code == 302 + with client.session_transaction() as sess: + assert sess.get('_user_id') == str(user.id) + # Recovery code consumed + db.session.refresh(user) + assert len(user.totp_recovery_codes) == 9 + finally: + db.session.rollback() + db.drop_all() + + +def test_totp_verify_login_redirects_to_login_without_pending_session(): + """GET /2fa/verify with no pending session → 302 to /login.""" + with app.app_context(): + _disable_csrf() + db.create_all() + try: + with app.test_client() as client: + resp = client.get('/2fa/verify') + assert resp.status_code == 302 + assert '/login' in resp.headers['Location'] + finally: + db.session.rollback() + db.drop_all() + + +# ---------------------------------------------------------------------- +# 7. /2fa/setup — enrollment route +# ---------------------------------------------------------------------- + +def test_totp_setup_get_creates_pending_session(): + """GET /2fa/setup as logged-in user shows QR and primes session.""" + with app.app_context(): + _disable_csrf() + db.create_all() + try: + user = _make_user(email='setupget@example.qc.ca') + with app.test_client() as client: + with client.session_transaction() as sess: + sess['_user_id'] = str(user.id) + sess['_fresh'] = True + resp = client.get('/2fa/setup') + assert resp.status_code == 200 + body = resp.data.decode('utf-8') + assert 'data:image/png;base64,' in body + with client.session_transaction() as sess: + assert sess.get('totp_pending_secret') is not None + assert len(sess.get('totp_pending_recovery_hashes')) == 10 + assert len(sess.get('totp_pending_display_codes')) == 10 + finally: + db.session.rollback() + db.drop_all() + + +def test_totp_setup_post_with_valid_code_enables_mfa(): + """POST with a code matching the pending secret enables MFA + clears session.""" + with app.app_context(): + _disable_csrf() + db.create_all() + try: + user = _make_user(email='setupok@example.qc.ca') + secret = totp_module.generate_totp_secret() + display, hashed = totp_module.generate_recovery_codes() + with app.test_client() as client: + with client.session_transaction() as sess: + sess['_user_id'] = str(user.id) + sess['_fresh'] = True + sess['totp_pending_secret'] = secret + sess['totp_pending_recovery_hashes'] = hashed + sess['totp_pending_display_codes'] = display + code = pyotp.TOTP(secret).now() + resp = client.post('/2fa/setup', data={'code': code}) + assert resp.status_code == 302 + # User enrolled + db.session.refresh(user) + assert user.totp_enabled is True + assert user.totp_secret_encrypted is not None + assert totp_module.decrypt_totp_secret(user.totp_secret_encrypted) == secret + # Pending session cleared + with client.session_transaction() as sess: + assert 'totp_pending_secret' not in sess + assert 'totp_pending_recovery_hashes' not in sess + assert 'totp_pending_display_codes' not in sess + finally: + db.session.rollback() + db.drop_all() + + +def test_totp_setup_post_with_invalid_code_returns_400_keeps_pending(): + """POST with wrong code → 400; user not yet enabled; pending preserved.""" + with app.app_context(): + _disable_csrf() + db.create_all() + try: + user = _make_user(email='setupbad@example.qc.ca') + secret = totp_module.generate_totp_secret() + display, hashed = totp_module.generate_recovery_codes() + with app.test_client() as client: + with client.session_transaction() as sess: + sess['_user_id'] = str(user.id) + sess['_fresh'] = True + sess['totp_pending_secret'] = secret + sess['totp_pending_recovery_hashes'] = hashed + sess['totp_pending_display_codes'] = display + current = pyotp.TOTP(secret).now() + wrong = '000000' if current != '000000' else '111111' + resp = client.post('/2fa/setup', data={'code': wrong}) + assert resp.status_code == 400 + db.session.refresh(user) + assert user.totp_enabled is False + assert user.totp_secret_encrypted is None + # Pending session preserved so user can retry without scanning a new QR + with client.session_transaction() as sess: + assert sess.get('totp_pending_secret') == secret + finally: + db.session.rollback() + db.drop_all() + + +# ---------------------------------------------------------------------- +# 8. /2fa/disable — password re-auth required +# ---------------------------------------------------------------------- + +def test_totp_disable_requires_password(): + """Wrong password → flash + still enabled. Correct password → disabled.""" + with app.app_context(): + _disable_csrf() + db.create_all() + try: + password = 'CorrectHorseBattery!42' + secret = totp_module.generate_totp_secret() + _, hashed = totp_module.generate_recovery_codes() + user = _make_user( + email='disable@example.qc.ca', password=password, + totp_enabled=True, + totp_secret_encrypted=totp_module.encrypt_totp_secret(secret), + totp_recovery_codes=hashed, + ) + with app.test_client() as client: + with client.session_transaction() as sess: + sess['_user_id'] = str(user.id) + sess['_fresh'] = True + + # Wrong password → still enabled + resp = client.post('/2fa/disable', data={'password': 'wrong-password'}) + assert resp.status_code == 302 # redirect to /account + db.session.refresh(user) + assert user.totp_enabled is True + assert user.totp_secret_encrypted is not None + + # Correct password → disabled + resp = client.post('/2fa/disable', data={'password': password}) + assert resp.status_code == 302 + db.session.refresh(user) + assert user.totp_enabled is False + assert user.totp_secret_encrypted is None + assert user.totp_recovery_codes is None + finally: + db.session.rollback() + db.drop_all()