feat(auth): B-2.5 TOTP MFA + recovery codes (Fernet-encrypted secret)

Adds TOTP-based two-factor authentication (RFC 6238) with 10 single-use
recovery codes. Secret is encrypted at rest with a Fernet key derived
deterministically from app SECRET_KEY (SHA-256 -> urlsafe-base64); the raw
base32 secret never lives in the database. Recovery codes are bcrypt-hashed
and consumed atomically (single-use, removed from the JSON list on match).

Routes:
- GET /2fa/setup: generate fresh secret + QR + 10 recovery codes; cache
  pending state in session, render auth/totp_setup.html with inline QR
  data URL and the 10 codes shown ONCE.
- POST /2fa/setup: verify the user-submitted 6-digit code against the
  pending secret; on success persist encrypted secret + hashes and flip
  totp_enabled=True. On invalid code re-render same QR (don't rotate),
  preserving the user's authenticator scan.
- GET /2fa/verify: second factor during login; reads pending_totp_user_id
  from session and renders auth/totp_verify.html (TOTP code input +
  collapsed recovery code form, with X codes restants notice).
- POST /2fa/verify: accepts EITHER a 6-digit TOTP code OR a recovery code;
  on success finalises login_user (preserving remember-me intent + next
  URL captured at the password step), audits success/failure.
- POST /2fa/disable: requires password re-auth; nullifies the 3 TOTP fields.

Login gate (src/api/auth.py /login): after password+email-verification
checks but BEFORE login_user, if user.totp_enabled set
session['pending_totp_user_id'] / pending_totp_remember /
pending_totp_next and 302 -> /2fa/verify. OAuth/SSO/magic-link paths are
intentionally NOT gated in B-2.5 (deferred — IdP handles its own MFA).

Schema:
- New JSON column User.totp_recovery_codes (nullable) added via
  add_column_if_not_exists in src/init_db.py (no Alembic, follows existing
  pattern).
- Re-uses B-2.1 columns totp_secret_encrypted (VARCHAR 255) and
  totp_enabled (BOOLEAN); both already migrated.

Compatibility audit overrides honoured:
- Service layer at src/auth/totp.py (NOT a new src/auth_extended/ pkg).
- Templates at templates/auth/totp_setup.html and templates/auth/totp_verify.html
  extending marketing/base.html with brand tokens + WCAG patterns
  (focus-visible, role=alert, aria-required, autocomplete=one-time-code,
  inputmode=numeric).
- account.html integration deferred to a polish task — admins access
  /2fa/setup directly for now.

Tests (21, all green via Windows manual driver):
- Service layer: encrypt/decrypt round-trip, key-mismatch rejection, secret
  validity, code verification (current/wrong/non-digit), recovery codes
  (10 pairs, 1:1 bcrypt mapping, single-use consumption, unknown rejection),
  set/disable user TOTP fields.
- Routes: login redirect-to-/2fa/verify when totp_enabled, direct login
  when disabled, /2fa/verify with correct/wrong TOTP, recovery code consume,
  redirect-to-login when no pending session, /2fa/setup GET creates pending,
  POST with valid code enables MFA, POST with invalid code keeps pending +
  returns 400, /2fa/disable wrong/correct password.

Regression check: prior 21 OAuth+magic-link, 16 email-service, and 9
signup-Loi-25 tests all still pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Allison
2026-04-28 00:08:40 -04:00
parent 3a41bb482d
commit aa269c5bc0
9 changed files with 1208 additions and 0 deletions

View File

@@ -388,6 +388,15 @@ def login():
action='verification_required',
show_resend=True)
# B-2.5: TOTP gate — defer login until 2nd factor verified
if user.totp_enabled:
session['pending_totp_user_id'] = user.id
session['pending_totp_remember'] = bool(form.remember.data)
next_page = request.args.get('next')
if next_page and is_safe_url(next_page):
session['pending_totp_next'] = next_page
return redirect(url_for('auth.totp_verify_login'))
login_user(user, remember=form.remember.data)
audit_login(user.id)
next_page = request.args.get('next')
@@ -838,6 +847,192 @@ def magic_link_consume(token):
return redirect(url_for('recordings.index'))
# --- B-2.5: TOTP MFA routes ---
class TotpVerifyForm(FlaskForm):
"""Form definition kept for CSRF coverage / future template binding.
Routes parse request.form directly (no WTForms validators) because the
same endpoint accepts EITHER `code` (6 digits) OR `recovery_code`
(XXXXX-XXXXX), and WTForms would reject one when the other is sent.
"""
code = StringField('Code à 6 chiffres')
recovery_code = StringField('Code de récupération')
submit = SubmitField('Vérifier')
@auth_bp.route('/2fa/setup', methods=['GET', 'POST'])
@login_required
@rate_limit("10 per minute")
def totp_setup():
"""Enroll the current user in TOTP MFA (two-step flow).
GET: generate a fresh secret + QR + 10 recovery codes; cache the secret
and recovery-code hashes in the session (NOT yet persisted).
POST: verify the user-submitted 6-digit code against the pending secret;
on success, persist the encrypted secret + recovery code hashes and
flip ``totp_enabled`` to True.
Already-enrolled users are redirected to /account with an info flash so
that this endpoint never silently rotates a working enrollment.
"""
from src.auth.totp import (
generate_totp_secret, build_provisioning_uri, render_qr_data_url,
verify_totp_code, generate_recovery_codes, set_user_totp,
)
if current_user.totp_enabled:
flash('La double authentification est déjà activée pour votre compte.', 'info')
return redirect(url_for('auth.account'))
if request.method == 'POST':
pending_secret = session.get('totp_pending_secret')
pending_recovery_hashes = session.get('totp_pending_recovery_hashes')
pending_display_codes = session.get('totp_pending_display_codes', [])
if not pending_secret or not pending_recovery_hashes:
flash('Session expirée. Veuillez recommencer la configuration.', 'warning')
return redirect(url_for('auth.totp_setup'))
code = (request.form.get('code') or '').strip()
if not verify_totp_code(pending_secret, code):
qr = render_qr_data_url(
build_provisioning_uri(pending_secret, current_user.email)
)
return render_template(
'auth/totp_setup.html',
title='Configurer la double authentification',
qr_data_url=qr,
secret=pending_secret,
recovery_codes=pending_display_codes,
error="Code invalide. Vérifiez l'horloge de votre appareil et réessayez.",
), 400
# Persist
set_user_totp(current_user, pending_secret, pending_recovery_hashes)
# Clear pending session (one-shot enrollment)
for k in (
'totp_pending_secret',
'totp_pending_recovery_hashes',
'totp_pending_display_codes',
):
session.pop(k, None)
flash(
'Double authentification activée. Conservez vos codes de récupération en lieu sûr.',
'success',
)
return redirect(url_for('auth.account'))
# GET: fresh enrollment — generate secret + recovery codes, cache in session
secret = generate_totp_secret()
display_codes, hashed_codes = generate_recovery_codes()
session['totp_pending_secret'] = secret
session['totp_pending_recovery_hashes'] = hashed_codes
session['totp_pending_display_codes'] = display_codes
qr = render_qr_data_url(build_provisioning_uri(secret, current_user.email))
return render_template(
'auth/totp_setup.html',
title='Configurer la double authentification',
qr_data_url=qr,
secret=secret,
recovery_codes=display_codes,
)
@auth_bp.route('/2fa/disable', methods=['POST'])
@login_required
@rate_limit("5 per minute")
def totp_disable():
"""Disable TOTP MFA. Requires re-authentication via current password."""
from src.auth.totp import disable_user_totp
if not current_user.totp_enabled:
flash("La double authentification n'est pas activée.", 'info')
return redirect(url_for('auth.account'))
password = request.form.get('password', '')
if (
not current_user.password
or not bcrypt.check_password_hash(current_user.password, password)
):
flash('Mot de passe incorrect. La double authentification reste activée.', 'danger')
return redirect(url_for('auth.account'))
disable_user_totp(current_user)
flash('Double authentification désactivée.', 'success')
return redirect(url_for('auth.account'))
@auth_bp.route('/2fa/verify', methods=['GET', 'POST'])
@rate_limit("5 per minute")
def totp_verify_login():
"""Second factor during login. Reads pending_totp_user_id from session.
Accepts EITHER a 6-digit TOTP code OR a single-use recovery code. On
success, finalises ``login_user`` (preserving the remember-me intent
captured at the password step) and redirects to the next URL or the
recordings index.
"""
from src.auth.totp import (
verify_totp_code, get_user_totp_secret, consume_recovery_code,
)
pending_user_id = session.get('pending_totp_user_id')
pending_remember = bool(session.get('pending_totp_remember', False))
if not pending_user_id:
flash('Session de connexion expirée. Veuillez vous reconnecter.', 'warning')
return redirect(url_for('auth.login'))
user = db.session.get(User, pending_user_id)
if not user or not user.totp_enabled:
# Defensive: shouldn't happen if login flow is correct
for k in ('pending_totp_user_id', 'pending_totp_remember', 'pending_totp_next'):
session.pop(k, None)
flash('Erreur de session. Veuillez vous reconnecter.', 'danger')
return redirect(url_for('auth.login'))
recovery_remaining = len(user.totp_recovery_codes or [])
if request.method == 'POST':
code = (request.form.get('code') or '').strip()
recovery = (request.form.get('recovery_code') or '').strip()
success = False
if code:
secret = get_user_totp_secret(user)
if secret and verify_totp_code(secret, code):
success = True
elif recovery:
success = consume_recovery_code(user, recovery)
if success:
next_page = session.pop('pending_totp_next', None)
session.pop('pending_totp_user_id', None)
session.pop('pending_totp_remember', None)
login_user(user, remember=pending_remember)
audit_login(user.id)
if next_page and is_safe_url(next_page):
return redirect(next_page)
return redirect(url_for('recordings.index'))
_user_hash = hashlib.sha256(str(user.id).encode()).hexdigest()[:16]
audit_failed_login(
details={'user_id_hash': _user_hash, 'reason': 'totp_invalid'}
)
return render_template(
'auth/totp_verify.html',
title='Vérification 2FA',
recovery_codes_remaining=len(user.totp_recovery_codes or []),
error=(
"Code invalide. Vérifiez votre application authenticator ou "
"utilisez un code de récupération."
),
), 400
return render_template(
'auth/totp_verify.html',
title='Vérification 2FA',
recovery_codes_remaining=recovery_remaining,
)
@auth_bp.route('/logout')
@csrf_exempt
def logout():

184
src/auth/totp.py Normal file
View File

@@ -0,0 +1,184 @@
"""TOTP MFA service layer (B-2.5).
Encrypts the base32 TOTP secret with Fernet (SECRET_KEY-derived key) before
DB persistence. NEVER store the raw base32 secret in the database.
Recovery codes: 10 single-use base32 codes (10 chars each, hyphenated for
readability) generated at TOTP enrollment, displayed ONCE to the user, stored
as bcrypt hashes in User.totp_recovery_codes (JSON list). Each successful
recovery-code login removes that hash from the list.
"""
import base64
import hashlib
import secrets
from typing import List, Optional, Tuple
import pyotp
import qrcode
from cryptography.fernet import Fernet, InvalidToken
from flask import current_app
# 10 single-use recovery codes per enrollment
RECOVERY_CODES_COUNT = 10
RECOVERY_CODE_LENGTH = 10 # base32 chars per code, formatted as XXXXX-XXXXX
def _fernet() -> Fernet:
"""Derive a Fernet key from app SECRET_KEY (deterministic, single-key).
Uses SHA-256 of SECRET_KEY → urlsafe-base64 (32 bytes) so the same
SECRET_KEY always produces the same Fernet key. Single-key design (no
rotation): rotating SECRET_KEY invalidates ALL stored TOTP secrets,
forcing every user to re-enroll. Acceptable for MVP; revisit when we
have key-rotation infra.
"""
secret_key = current_app.config.get('SECRET_KEY')
if not secret_key:
raise RuntimeError('SECRET_KEY must be configured to use TOTP encryption')
if isinstance(secret_key, str):
secret_key = secret_key.encode('utf-8')
derived = hashlib.sha256(secret_key).digest()
return Fernet(base64.urlsafe_b64encode(derived))
def encrypt_totp_secret(plaintext_base32: str) -> str:
"""Encrypt a base32 TOTP secret. Returns the Fernet token as a string."""
if not plaintext_base32 or not isinstance(plaintext_base32, str):
raise ValueError('encrypt_totp_secret requires a non-empty base32 string')
return _fernet().encrypt(plaintext_base32.encode('ascii')).decode('ascii')
def decrypt_totp_secret(ciphertext: str) -> str:
"""Decrypt a Fernet-encrypted TOTP secret. Returns the base32 string.
Raises ValueError on bad token (key mismatch, tampered, malformed).
"""
if not ciphertext:
raise ValueError('decrypt_totp_secret requires a non-empty ciphertext')
try:
return _fernet().decrypt(ciphertext.encode('ascii')).decode('ascii')
except InvalidToken as e:
raise ValueError(f'Invalid TOTP ciphertext (key mismatch?): {e}') from e
def generate_totp_secret() -> str:
"""Generate a fresh base32 TOTP secret (160-bit, RFC 6238 recommended)."""
return pyotp.random_base32()
def build_provisioning_uri(secret_base32: str, account_email: str) -> str:
"""Return the otpauth:// URI for QR encoding (RFC 6238)."""
return pyotp.TOTP(secret_base32).provisioning_uri(
name=account_email, issuer_name='DictIA'
)
def render_qr_data_url(provisioning_uri: str) -> str:
"""Render the URI as a base64 PNG data URL for inline display in HTML."""
import io
img = qrcode.make(provisioning_uri)
buf = io.BytesIO()
img.save(buf, format='PNG')
return 'data:image/png;base64,' + base64.b64encode(buf.getvalue()).decode('ascii')
def verify_totp_code(secret_base32: str, code: str) -> bool:
"""Verify a 6-digit TOTP code with a 1-window tolerance (current ±30s).
Rejects non-digit / wrong-length input early to avoid leaking timing.
"""
if not code or not isinstance(code, str):
return False
code = code.strip()
if len(code) != 6 or not code.isdigit():
return False
return pyotp.TOTP(secret_base32).verify(code, valid_window=1)
def _get_bcrypt():
"""Resolve the Flask-Bcrypt extension instance (handles missing context)."""
bcrypt = (
current_app.extensions.get('flask-bcrypt')
or current_app.extensions.get('bcrypt')
)
if bcrypt is None:
# Fall back to the global bcrypt initialised by init_auth_extensions()
from src.api.auth import bcrypt as _b
bcrypt = _b
if bcrypt is None:
raise RuntimeError('Flask-Bcrypt extension is not initialised')
return bcrypt
def generate_recovery_codes() -> Tuple[List[str], List[str]]:
"""Generate 10 fresh recovery codes.
Returns (display_codes, hashed_codes_for_storage):
- display_codes: human-readable XXXXX-XXXXX format, shown to user ONCE
- hashed_codes_for_storage: bcrypt-style hashes — store these in
User.totp_recovery_codes (JSON list).
"""
bcrypt = _get_bcrypt()
display_codes: List[str] = []
hashed_codes: List[str] = []
for _ in range(RECOVERY_CODES_COUNT):
# 5+5 base32 chars hyphenated for readability (XXXXX-XXXXX)
raw = base64.b32encode(secrets.token_bytes(7)).decode('ascii')[:RECOVERY_CODE_LENGTH]
display = f'{raw[:5]}-{raw[5:]}'
display_codes.append(display)
# Hash the hyphenated form (what the user will type back) with bcrypt
hashed = bcrypt.generate_password_hash(display).decode('ascii')
hashed_codes.append(hashed)
return display_codes, hashed_codes
def consume_recovery_code(user, candidate: str) -> bool:
"""Check `candidate` against the user's stored recovery code hashes.
On match: removes that hash from the list and commits. Returns True.
On mismatch: returns False, no DB write. Single-use: a code that
matched once will not match again.
"""
from src.database import db
bcrypt = _get_bcrypt()
if not candidate or not user.totp_recovery_codes:
return False
candidate = candidate.strip().upper()
remaining: List[str] = []
matched = False
for h in user.totp_recovery_codes:
if not matched and bcrypt.check_password_hash(h, candidate):
matched = True
# Drop this hash from the stored list (single-use)
continue
remaining.append(h)
if matched:
user.totp_recovery_codes = remaining
db.session.commit()
return matched
def set_user_totp(user, secret_base32: str, recovery_code_hashes: List[str]) -> None:
"""Persist the encrypted secret + recovery codes; mark MFA enabled."""
from src.database import db
user.totp_secret_encrypted = encrypt_totp_secret(secret_base32)
user.totp_recovery_codes = recovery_code_hashes
user.totp_enabled = True
db.session.commit()
def disable_user_totp(user) -> None:
"""Disable MFA: clear encrypted secret, recovery codes, and the flag."""
from src.database import db
user.totp_secret_encrypted = None
user.totp_recovery_codes = None
user.totp_enabled = False
db.session.commit()
def get_user_totp_secret(user) -> Optional[str]:
"""Return the decrypted base32 secret, or None if MFA not enrolled."""
if not user.totp_secret_encrypted:
return None
return decrypt_totp_secret(user.totp_secret_encrypted)

View File

@@ -292,6 +292,9 @@ def initialize_database(app):
app.logger.info("Added 'totp_enabled' column to user")
if add_column_if_not_exists(engine, 'user', 'webauthn_credentials', 'JSON'):
app.logger.info("Added webauthn_credentials column to user table")
# B-2.5: 10 single-use bcrypt-hashed recovery codes for TOTP MFA
if add_column_if_not_exists(engine, 'user', 'totp_recovery_codes', 'JSON'):
app.logger.info("Added totp_recovery_codes column to user table")
if add_column_if_not_exists(engine, 'user', 'ordre_pro', 'VARCHAR(50)'):
app.logger.info("Added ordre_pro column to user table")
if add_column_if_not_exists(engine, 'user', 'cabinet', 'VARCHAR(255)'):

View File

@@ -81,6 +81,9 @@ class User(db.Model, UserMixin):
# [{'id': str, 'public_key': str, 'sign_count': int, 'transports': list[str]}]
webauthn_credentials = db.Column(db.JSON, nullable=True)
# B-2.5: 10 single-use recovery codes (bcrypt-hashed). Cleared when MFA disabled.
totp_recovery_codes = db.Column(db.JSON, nullable=True)
# Loi 25 + ordre professionnel context (used at signup B-2.2)
ordre_pro = db.Column(db.String(50), nullable=True) # 'barreau', 'cpa', 'chad', etc.
cabinet = db.Column(db.String(255), nullable=True)