Files
dictia-public/tests/test_totp_mfa.py
Allison aa269c5bc0 feat(auth): B-2.5 TOTP MFA + recovery codes (Fernet-encrypted secret)
Adds TOTP-based two-factor authentication (RFC 6238) with 10 single-use
recovery codes. Secret is encrypted at rest with a Fernet key derived
deterministically from app SECRET_KEY (SHA-256 -> urlsafe-base64); the raw
base32 secret never lives in the database. Recovery codes are bcrypt-hashed
and consumed atomically (single-use, removed from the JSON list on match).

Routes:
- GET /2fa/setup: generate fresh secret + QR + 10 recovery codes; cache
  pending state in session, render auth/totp_setup.html with inline QR
  data URL and the 10 codes shown ONCE.
- POST /2fa/setup: verify the user-submitted 6-digit code against the
  pending secret; on success persist encrypted secret + hashes and flip
  totp_enabled=True. On invalid code re-render same QR (don't rotate),
  preserving the user's authenticator scan.
- GET /2fa/verify: second factor during login; reads pending_totp_user_id
  from session and renders auth/totp_verify.html (TOTP code input +
  collapsed recovery code form, with X codes restants notice).
- POST /2fa/verify: accepts EITHER a 6-digit TOTP code OR a recovery code;
  on success finalises login_user (preserving remember-me intent + next
  URL captured at the password step), audits success/failure.
- POST /2fa/disable: requires password re-auth; nullifies the 3 TOTP fields.

Login gate (src/api/auth.py /login): after password+email-verification
checks but BEFORE login_user, if user.totp_enabled set
session['pending_totp_user_id'] / pending_totp_remember /
pending_totp_next and 302 -> /2fa/verify. OAuth/SSO/magic-link paths are
intentionally NOT gated in B-2.5 (deferred — IdP handles its own MFA).

Schema:
- New JSON column User.totp_recovery_codes (nullable) added via
  add_column_if_not_exists in src/init_db.py (no Alembic, follows existing
  pattern).
- Re-uses B-2.1 columns totp_secret_encrypted (VARCHAR 255) and
  totp_enabled (BOOLEAN); both already migrated.

Compatibility audit overrides honoured:
- Service layer at src/auth/totp.py (NOT a new src/auth_extended/ pkg).
- Templates at templates/auth/totp_setup.html and templates/auth/totp_verify.html
  extending marketing/base.html with brand tokens + WCAG patterns
  (focus-visible, role=alert, aria-required, autocomplete=one-time-code,
  inputmode=numeric).
- account.html integration deferred to a polish task — admins access
  /2fa/setup directly for now.

Tests (21, all green via Windows manual driver):
- Service layer: encrypt/decrypt round-trip, key-mismatch rejection, secret
  validity, code verification (current/wrong/non-digit), recovery codes
  (10 pairs, 1:1 bcrypt mapping, single-use consumption, unknown rejection),
  set/disable user TOTP fields.
- Routes: login redirect-to-/2fa/verify when totp_enabled, direct login
  when disabled, /2fa/verify with correct/wrong TOTP, recovery code consume,
  redirect-to-login when no pending session, /2fa/setup GET creates pending,
  POST with valid code enables MFA, POST with invalid code keeps pending +
  returns 400, /2fa/disable wrong/correct password.

Regression check: prior 21 OAuth+magic-link, 16 email-service, and 9
signup-Loi-25 tests all still pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-28 00:08:40 -04:00

543 lines
22 KiB
Python

"""Tests for B-2.5 — TOTP MFA + recovery codes.
Covers:
- Encrypt/decrypt round-trip (Fernet derived from SECRET_KEY).
- Decrypt rejects ciphertext from a different SECRET_KEY.
- generate_totp_secret produces a valid base32 secret (RFC 6238).
- verify_totp_code accepts current window, rejects wrong / non-digit codes.
- generate_recovery_codes returns 10 display + 10 hashes that map 1:1.
- consume_recovery_code is single-use; rejects unknown.
- set_user_totp persists encrypted (not raw) secret + flips flag.
- disable_user_totp clears all 3 fields.
- Login route gates on totp_enabled; bypasses gate when disabled.
- /2fa/verify accepts correct TOTP code, rejects wrong, accepts recovery.
- /2fa/verify redirects to /login when no pending session.
- /2fa/setup creates pending session on GET, persists on valid POST,
keeps pending on invalid POST.
- /2fa/disable requires correct password.
Note: pytest cannot collect this file on Windows native because src/init_db.py
imports `fcntl` (POSIX-only). Use tests/_run_totp_mfa_windows.py.
"""
import os
import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
os.environ.setdefault('SQLALCHEMY_DATABASE_URI', 'sqlite:///:memory:')
os.environ.setdefault('SECRET_KEY', 'test-secret-key-totp')
os.environ.setdefault('ENABLE_EMAIL_VERIFICATION', 'false')
os.environ.setdefault('REQUIRE_EMAIL_VERIFICATION', 'false')
os.environ.setdefault('RATELIMIT_ENABLED', 'false')
import pyotp # noqa: E402
from src.app import app, db, bcrypt # noqa: E402
from src.models.user import User # noqa: E402
from src.auth import totp as totp_module # noqa: E402
def _disable_csrf():
app.config['WTF_CSRF_ENABLED'] = False
def _make_user(email='totp@example.qc.ca', password='Password!123', totp_enabled=False,
username=None, totp_secret_encrypted=None, totp_recovery_codes=None):
"""Create a User row with bcrypt-hashed password. Returns the User."""
hashed = bcrypt.generate_password_hash(password).decode('utf-8')
u = User(
username=username or email.split('@', 1)[0][:20],
email=email,
password=hashed,
email_verified=True,
totp_enabled=totp_enabled,
totp_secret_encrypted=totp_secret_encrypted,
totp_recovery_codes=totp_recovery_codes,
)
db.session.add(u)
db.session.commit()
return u
# ----------------------------------------------------------------------
# 1. Service layer — encryption round-trip
# ----------------------------------------------------------------------
def test_encrypt_decrypt_round_trips():
"""encrypt_totp_secret → decrypt_totp_secret returns the original base32."""
with app.app_context():
secret = pyotp.random_base32()
ct = totp_module.encrypt_totp_secret(secret)
assert ct != secret # ciphertext is NOT the raw secret
assert totp_module.decrypt_totp_secret(ct) == secret
def test_decrypt_with_different_secret_key_fails():
"""Fernet rejects a token encrypted with a different SECRET_KEY."""
with app.app_context():
secret = pyotp.random_base32()
ct = totp_module.encrypt_totp_secret(secret)
# Swap SECRET_KEY and confirm decryption fails
original_key = app.config['SECRET_KEY']
try:
app.config['SECRET_KEY'] = 'totally-different-secret-key'
with app.app_context():
try:
totp_module.decrypt_totp_secret(ct)
raise AssertionError('Expected ValueError on key mismatch')
except ValueError:
pass
finally:
app.config['SECRET_KEY'] = original_key
def test_generate_totp_secret_is_valid_base32_160_bit():
"""generate_totp_secret returns a base32 string accepted by pyotp.TOTP."""
secret = totp_module.generate_totp_secret()
assert isinstance(secret, str)
assert len(secret) >= 16 # pyotp default is 32 chars (160 bits)
# If invalid, .now() would raise
code = pyotp.TOTP(secret).now()
assert len(code) == 6
assert code.isdigit()
# ----------------------------------------------------------------------
# 2. Service layer — verify_totp_code
# ----------------------------------------------------------------------
def test_verify_totp_code_accepts_current_window():
"""A code generated by pyotp.TOTP(secret).now() is accepted."""
with app.app_context():
secret = totp_module.generate_totp_secret()
code = pyotp.TOTP(secret).now()
assert totp_module.verify_totp_code(secret, code) is True
def test_verify_totp_code_rejects_wrong_code():
"""The literal '000000' is (almost certainly) not the current code."""
with app.app_context():
secret = totp_module.generate_totp_secret()
current = pyotp.TOTP(secret).now()
wrong = '000000' if current != '000000' else '111111'
assert totp_module.verify_totp_code(secret, wrong) is False
def test_verify_totp_code_rejects_non_digit():
"""Non-digit input is rejected before reaching pyotp."""
secret = totp_module.generate_totp_secret()
assert totp_module.verify_totp_code(secret, 'abcdef') is False
assert totp_module.verify_totp_code(secret, '') is False
assert totp_module.verify_totp_code(secret, '12345') is False # too short
assert totp_module.verify_totp_code(secret, '1234567') is False # too long
# ----------------------------------------------------------------------
# 3. Service layer — recovery codes
# ----------------------------------------------------------------------
def test_generate_recovery_codes_returns_10_pairs():
"""generate_recovery_codes returns 10 displays + 10 hashes that map 1:1."""
with app.app_context():
display, hashed = totp_module.generate_recovery_codes()
assert len(display) == 10
assert len(hashed) == 10
# Each display follows XXXXX-XXXXX
for d in display:
assert len(d) == 11
assert d[5] == '-'
# Each display matches its hash; mismatches don't match
for d, h in zip(display, hashed):
assert bcrypt.check_password_hash(h, d) is True
# And a wrong code never matches
assert bcrypt.check_password_hash(hashed[0], 'WRONG-CODE0') is False
def test_consume_recovery_code_succeeds_once():
"""Consuming a code twice: 2nd attempt fails; list shrinks by 1."""
with app.app_context():
db.create_all()
try:
user = _make_user(email='consume1@example.qc.ca')
display, hashed = totp_module.generate_recovery_codes()
user.totp_recovery_codes = hashed
user.totp_enabled = True
db.session.commit()
assert len(user.totp_recovery_codes) == 10
assert totp_module.consume_recovery_code(user, display[0]) is True
assert len(user.totp_recovery_codes) == 9
# Second attempt with the same code: rejected (single-use)
assert totp_module.consume_recovery_code(user, display[0]) is False
assert len(user.totp_recovery_codes) == 9
finally:
db.session.rollback()
db.drop_all()
def test_consume_recovery_code_rejects_unknown():
"""Submitting a code that isn't in the list returns False; no DB write."""
with app.app_context():
db.create_all()
try:
user = _make_user(email='consume2@example.qc.ca')
_, hashed = totp_module.generate_recovery_codes()
user.totp_recovery_codes = hashed
user.totp_enabled = True
db.session.commit()
assert totp_module.consume_recovery_code(user, 'NOPE1-NOPE2') is False
assert len(user.totp_recovery_codes) == 10
finally:
db.session.rollback()
db.drop_all()
# ----------------------------------------------------------------------
# 4. Service layer — set / disable user TOTP
# ----------------------------------------------------------------------
def test_set_user_totp_persists_encrypted_not_raw():
"""set_user_totp encrypts secret, stores hashes, flips totp_enabled True."""
with app.app_context():
db.create_all()
try:
user = _make_user(email='setuser@example.qc.ca')
secret = totp_module.generate_totp_secret()
display, hashed = totp_module.generate_recovery_codes()
totp_module.set_user_totp(user, secret, hashed)
db.session.refresh(user)
assert user.totp_enabled is True
assert user.totp_secret_encrypted is not None
assert user.totp_secret_encrypted != secret # encrypted, not raw
assert totp_module.decrypt_totp_secret(user.totp_secret_encrypted) == secret
assert len(user.totp_recovery_codes) == 10
finally:
db.session.rollback()
db.drop_all()
def test_disable_user_totp_clears_all_fields():
"""disable_user_totp nullifies the 3 TOTP-related fields."""
with app.app_context():
db.create_all()
try:
user = _make_user(email='disable@example.qc.ca')
secret = totp_module.generate_totp_secret()
_, hashed = totp_module.generate_recovery_codes()
totp_module.set_user_totp(user, secret, hashed)
assert user.totp_enabled is True
totp_module.disable_user_totp(user)
db.session.refresh(user)
assert user.totp_enabled is False
assert user.totp_secret_encrypted is None
assert user.totp_recovery_codes is None
finally:
db.session.rollback()
db.drop_all()
# ----------------------------------------------------------------------
# 5. Login route — TOTP gate
# ----------------------------------------------------------------------
def test_login_redirects_to_totp_verify_when_mfa_enabled():
"""Password OK + totp_enabled → 302 to /2fa/verify; pending_totp_user_id set."""
with app.app_context():
_disable_csrf()
db.create_all()
try:
password = 'CorrectHorseBattery!42'
secret = totp_module.generate_totp_secret()
user = _make_user(
email='gate@example.qc.ca', password=password,
totp_enabled=True,
totp_secret_encrypted=totp_module.encrypt_totp_secret(secret),
totp_recovery_codes=[],
)
with app.test_client() as client:
resp = client.post('/login', data={
'email': 'gate@example.qc.ca',
'password': password,
'remember': 'y',
})
assert resp.status_code == 302
assert '/2fa/verify' in resp.headers['Location']
with client.session_transaction() as sess:
assert sess.get('pending_totp_user_id') == user.id
assert sess.get('pending_totp_remember') is True
# NOT yet logged in
assert '_user_id' not in sess
finally:
db.session.rollback()
db.drop_all()
def test_login_logs_in_directly_when_mfa_disabled():
"""Password OK + totp_enabled=False → 302 to /; no pending_totp_user_id."""
with app.app_context():
_disable_csrf()
db.create_all()
try:
password = 'CorrectHorseBattery!42'
user = _make_user(
email='nomfa@example.qc.ca', password=password, totp_enabled=False,
)
with app.test_client() as client:
resp = client.post('/login', data={
'email': 'nomfa@example.qc.ca',
'password': password,
})
assert resp.status_code == 302
# Logged in directly
with client.session_transaction() as sess:
assert 'pending_totp_user_id' not in sess
assert sess.get('_user_id') == str(user.id)
finally:
db.session.rollback()
db.drop_all()
# ----------------------------------------------------------------------
# 6. /2fa/verify — second factor route
# ----------------------------------------------------------------------
def test_totp_verify_login_logs_in_with_correct_code():
"""POST /2fa/verify with correct TOTP code logs the user in."""
with app.app_context():
_disable_csrf()
db.create_all()
try:
secret = totp_module.generate_totp_secret()
user = _make_user(
email='verifyok@example.qc.ca',
totp_enabled=True,
totp_secret_encrypted=totp_module.encrypt_totp_secret(secret),
totp_recovery_codes=[],
)
code = pyotp.TOTP(secret).now()
with app.test_client() as client:
with client.session_transaction() as sess:
sess['pending_totp_user_id'] = user.id
sess['pending_totp_remember'] = False
resp = client.post('/2fa/verify', data={'code': code})
assert resp.status_code == 302
# Cleared pending + logged in
with client.session_transaction() as sess:
assert 'pending_totp_user_id' not in sess
assert 'pending_totp_remember' not in sess
assert sess.get('_user_id') == str(user.id)
finally:
db.session.rollback()
db.drop_all()
def test_totp_verify_login_rejects_wrong_code():
"""POST /2fa/verify with '000000' → 400; not logged in."""
with app.app_context():
_disable_csrf()
db.create_all()
try:
secret = totp_module.generate_totp_secret()
user = _make_user(
email='verifybad@example.qc.ca',
totp_enabled=True,
totp_secret_encrypted=totp_module.encrypt_totp_secret(secret),
totp_recovery_codes=[],
)
current = pyotp.TOTP(secret).now()
wrong = '000000' if current != '000000' else '111111'
with app.test_client() as client:
with client.session_transaction() as sess:
sess['pending_totp_user_id'] = user.id
resp = client.post('/2fa/verify', data={'code': wrong})
assert resp.status_code == 400
with client.session_transaction() as sess:
assert '_user_id' not in sess
# Pending session preserved so user can retry
assert sess.get('pending_totp_user_id') == user.id
finally:
db.session.rollback()
db.drop_all()
def test_totp_verify_login_accepts_recovery_code():
"""POST /2fa/verify with a recovery code logs in + consumes the code."""
with app.app_context():
_disable_csrf()
db.create_all()
try:
secret = totp_module.generate_totp_secret()
display, hashed = totp_module.generate_recovery_codes()
user = _make_user(
email='recovery@example.qc.ca',
totp_enabled=True,
totp_secret_encrypted=totp_module.encrypt_totp_secret(secret),
totp_recovery_codes=hashed,
)
with app.test_client() as client:
with client.session_transaction() as sess:
sess['pending_totp_user_id'] = user.id
resp = client.post('/2fa/verify', data={'recovery_code': display[0]})
assert resp.status_code == 302
with client.session_transaction() as sess:
assert sess.get('_user_id') == str(user.id)
# Recovery code consumed
db.session.refresh(user)
assert len(user.totp_recovery_codes) == 9
finally:
db.session.rollback()
db.drop_all()
def test_totp_verify_login_redirects_to_login_without_pending_session():
"""GET /2fa/verify with no pending session → 302 to /login."""
with app.app_context():
_disable_csrf()
db.create_all()
try:
with app.test_client() as client:
resp = client.get('/2fa/verify')
assert resp.status_code == 302
assert '/login' in resp.headers['Location']
finally:
db.session.rollback()
db.drop_all()
# ----------------------------------------------------------------------
# 7. /2fa/setup — enrollment route
# ----------------------------------------------------------------------
def test_totp_setup_get_creates_pending_session():
"""GET /2fa/setup as logged-in user shows QR and primes session."""
with app.app_context():
_disable_csrf()
db.create_all()
try:
user = _make_user(email='setupget@example.qc.ca')
with app.test_client() as client:
with client.session_transaction() as sess:
sess['_user_id'] = str(user.id)
sess['_fresh'] = True
resp = client.get('/2fa/setup')
assert resp.status_code == 200
body = resp.data.decode('utf-8')
assert 'data:image/png;base64,' in body
with client.session_transaction() as sess:
assert sess.get('totp_pending_secret') is not None
assert len(sess.get('totp_pending_recovery_hashes')) == 10
assert len(sess.get('totp_pending_display_codes')) == 10
finally:
db.session.rollback()
db.drop_all()
def test_totp_setup_post_with_valid_code_enables_mfa():
"""POST with a code matching the pending secret enables MFA + clears session."""
with app.app_context():
_disable_csrf()
db.create_all()
try:
user = _make_user(email='setupok@example.qc.ca')
secret = totp_module.generate_totp_secret()
display, hashed = totp_module.generate_recovery_codes()
with app.test_client() as client:
with client.session_transaction() as sess:
sess['_user_id'] = str(user.id)
sess['_fresh'] = True
sess['totp_pending_secret'] = secret
sess['totp_pending_recovery_hashes'] = hashed
sess['totp_pending_display_codes'] = display
code = pyotp.TOTP(secret).now()
resp = client.post('/2fa/setup', data={'code': code})
assert resp.status_code == 302
# User enrolled
db.session.refresh(user)
assert user.totp_enabled is True
assert user.totp_secret_encrypted is not None
assert totp_module.decrypt_totp_secret(user.totp_secret_encrypted) == secret
# Pending session cleared
with client.session_transaction() as sess:
assert 'totp_pending_secret' not in sess
assert 'totp_pending_recovery_hashes' not in sess
assert 'totp_pending_display_codes' not in sess
finally:
db.session.rollback()
db.drop_all()
def test_totp_setup_post_with_invalid_code_returns_400_keeps_pending():
"""POST with wrong code → 400; user not yet enabled; pending preserved."""
with app.app_context():
_disable_csrf()
db.create_all()
try:
user = _make_user(email='setupbad@example.qc.ca')
secret = totp_module.generate_totp_secret()
display, hashed = totp_module.generate_recovery_codes()
with app.test_client() as client:
with client.session_transaction() as sess:
sess['_user_id'] = str(user.id)
sess['_fresh'] = True
sess['totp_pending_secret'] = secret
sess['totp_pending_recovery_hashes'] = hashed
sess['totp_pending_display_codes'] = display
current = pyotp.TOTP(secret).now()
wrong = '000000' if current != '000000' else '111111'
resp = client.post('/2fa/setup', data={'code': wrong})
assert resp.status_code == 400
db.session.refresh(user)
assert user.totp_enabled is False
assert user.totp_secret_encrypted is None
# Pending session preserved so user can retry without scanning a new QR
with client.session_transaction() as sess:
assert sess.get('totp_pending_secret') == secret
finally:
db.session.rollback()
db.drop_all()
# ----------------------------------------------------------------------
# 8. /2fa/disable — password re-auth required
# ----------------------------------------------------------------------
def test_totp_disable_requires_password():
"""Wrong password → flash + still enabled. Correct password → disabled."""
with app.app_context():
_disable_csrf()
db.create_all()
try:
password = 'CorrectHorseBattery!42'
secret = totp_module.generate_totp_secret()
_, hashed = totp_module.generate_recovery_codes()
user = _make_user(
email='disable@example.qc.ca', password=password,
totp_enabled=True,
totp_secret_encrypted=totp_module.encrypt_totp_secret(secret),
totp_recovery_codes=hashed,
)
with app.test_client() as client:
with client.session_transaction() as sess:
sess['_user_id'] = str(user.id)
sess['_fresh'] = True
# Wrong password → still enabled
resp = client.post('/2fa/disable', data={'password': 'wrong-password'})
assert resp.status_code == 302 # redirect to /account
db.session.refresh(user)
assert user.totp_enabled is True
assert user.totp_secret_encrypted is not None
# Correct password → disabled
resp = client.post('/2fa/disable', data={'password': password})
assert resp.status_code == 302
db.session.refresh(user)
assert user.totp_enabled is False
assert user.totp_secret_encrypted is None
assert user.totp_recovery_codes is None
finally:
db.session.rollback()
db.drop_all()