"""Tests for B-2.5 — TOTP MFA + recovery codes.

Covers:
- Encrypt/decrypt round-trip (Fernet derived from SECRET_KEY).
- Decrypt rejects ciphertext from a different SECRET_KEY.
- generate_totp_secret produces a valid base32 secret (RFC 6238).
- verify_totp_code accepts current window, rejects wrong / non-digit codes.
- generate_recovery_codes returns 10 display + 10 hashes that map 1:1.
- consume_recovery_code is single-use; rejects unknown.
- set_user_totp persists encrypted (not raw) secret + flips flag.
- disable_user_totp clears all 3 fields.
- Login route gates on totp_enabled; bypasses gate when disabled.
- /2fa/verify accepts correct TOTP code, rejects wrong, accepts recovery.
- /2fa/verify redirects to /login when no pending session.
- /2fa/setup creates pending session on GET, persists on valid POST,
  keeps pending on invalid POST.
- /2fa/disable requires correct password.

Note: pytest cannot collect this file on Windows native because
src/init_db.py imports `fcntl` (POSIX-only). Use
tests/_run_totp_mfa_windows.py.
"""
import os
import sys

# Put the parent of this file's directory first on sys.path so the
# `src.*` imports below resolve.
_PARENT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, _PARENT_DIR)

# Environment defaults applied before the app import below; values that
# are already set in the environment are left untouched (setdefault).
_ENV_DEFAULTS = (
    ('SQLALCHEMY_DATABASE_URI', 'sqlite:///:memory:'),
    ('SECRET_KEY', 'test-secret-key-totp'),
    ('ENABLE_EMAIL_VERIFICATION', 'false'),
    ('REQUIRE_EMAIL_VERIFICATION', 'false'),
    ('RATELIMIT_ENABLED', 'false'),
)
for _env_name, _env_value in _ENV_DEFAULTS:
    os.environ.setdefault(_env_name, _env_value)

import pyotp  # noqa: E402

from src.app import app, db, bcrypt  # noqa: E402
from src.models.user import User  # noqa: E402
from src.auth import totp as totp_module  # noqa: E402


def _disable_csrf():
    """Switch off WTF CSRF checking in the app config for form POSTs."""
    app.config['WTF_CSRF_ENABLED'] = False


def _make_user(email='totp@example.qc.ca', password='Password!123',
               totp_enabled=False, username=None,
               totp_secret_encrypted=None, totp_recovery_codes=None):
    """Insert and commit a User whose password is bcrypt-hashed.

    When ``username`` is falsy it is derived from the part of ``email``
    before the first '@', truncated to 20 characters. The row is created
    with ``email_verified=True``.

    Returns the committed User instance.
    """
    password_hash = bcrypt.generate_password_hash(password).decode('utf-8')
    if username:
        chosen_username = username
    else:
        chosen_username = email.split('@', 1)[0][:20]
    user = User(
        username=chosen_username,
        email=email,
        password=password_hash,
        email_verified=True,
        totp_enabled=totp_enabled,
        totp_secret_encrypted=totp_secret_encrypted,
        totp_recovery_codes=totp_recovery_codes,
    )
    db.session.add(user)
    db.session.commit()
    return user


# ----------------------------------------------------------------------
# 1. Service layer — encryption round-trip
# ----------------------------------------------------------------------

def test_encrypt_decrypt_round_trips():
    """Encrypting then decrypting a secret gives back the original base32."""
    with app.app_context():
        plain_secret = pyotp.random_base32()
        ciphertext = totp_module.encrypt_totp_secret(plain_secret)
        # The encrypted form must not simply be the raw secret.
        assert ciphertext != plain_secret
        assert totp_module.decrypt_totp_secret(ciphertext) == plain_secret


def test_decrypt_with_different_secret_key_fails():
    """Decrypting under a different SECRET_KEY raises ValueError."""
    with app.app_context():
        plain_secret = pyotp.random_base32()
        ciphertext = totp_module.encrypt_totp_secret(plain_secret)

    # Temporarily swap SECRET_KEY; the finally clause restores it so
    # later tests see the original configuration.
    saved_key = app.config['SECRET_KEY']
    try:
        app.config['SECRET_KEY'] = 'totally-different-secret-key'
        with app.app_context():
            try:
                totp_module.decrypt_totp_secret(ciphertext)
            except ValueError:
                pass
            else:
                raise AssertionError('Expected ValueError on key mismatch')
    finally:
        app.config['SECRET_KEY'] = saved_key


def test_generate_totp_secret_is_valid_base32_160_bit():
    """generate_totp_secret yields a string that pyotp.TOTP can use."""
    generated = totp_module.generate_totp_secret()
    assert isinstance(generated, str)
    assert len(generated) >= 16  # pyotp default is 32 chars (160 bits)
    # An invalid secret would make .now() raise.
    current_code = pyotp.TOTP(generated).now()
    assert len(current_code) == 6
    assert current_code.isdigit()


# ----------------------------------------------------------------------
# 2.
# Service layer — verify_totp_code
# ----------------------------------------------------------------------

def test_verify_totp_code_accepts_current_window():
    """The code pyotp produces for the current time step is accepted."""
    with app.app_context():
        shared_secret = totp_module.generate_totp_secret()
        current_code = pyotp.TOTP(shared_secret).now()
        assert totp_module.verify_totp_code(shared_secret, current_code) is True


def test_verify_totp_code_rejects_wrong_code():
    """A six-digit code that differs from the current one is rejected."""
    with app.app_context():
        shared_secret = totp_module.generate_totp_secret()
        current_code = pyotp.TOTP(shared_secret).now()
        # Use '000000' unless that happens to be the live code right now.
        bad_code = '111111' if current_code == '000000' else '000000'
        assert totp_module.verify_totp_code(shared_secret, bad_code) is False


def test_verify_totp_code_rejects_non_digit():
    """Letters, the empty string and wrong-length digit strings are rejected."""
    shared_secret = totp_module.generate_totp_secret()
    malformed_inputs = (
        'abcdef',
        '',
        '12345',    # too short
        '1234567',  # too long
    )
    for candidate in malformed_inputs:
        assert totp_module.verify_totp_code(shared_secret, candidate) is False


# ----------------------------------------------------------------------
# 3.
# Service layer — recovery codes
# ----------------------------------------------------------------------

def test_generate_recovery_codes_returns_10_pairs():
    """generate_recovery_codes gives 10 display codes and 10 matching hashes."""
    with app.app_context():
        plain_codes, code_hashes = totp_module.generate_recovery_codes()
        assert len(plain_codes) == 10
        assert len(code_hashes) == 10
        # Display format is XXXXX-XXXXX: 11 characters, dash in the middle.
        for plain in plain_codes:
            assert len(plain) == 11
            assert plain[5] == '-'
        # Hashes line up positionally with their display codes.
        for plain, digest in zip(plain_codes, code_hashes):
            assert bcrypt.check_password_hash(digest, plain) is True
        # A code that was never issued does not match.
        assert bcrypt.check_password_hash(code_hashes[0], 'WRONG-CODE0') is False


def test_consume_recovery_code_succeeds_once():
    """A recovery code works once; reuse fails and the list stays at 9."""
    with app.app_context():
        db.create_all()
        try:
            account = _make_user(email='consume1@example.qc.ca')
            plain_codes, code_hashes = totp_module.generate_recovery_codes()
            account.totp_recovery_codes = code_hashes
            account.totp_enabled = True
            db.session.commit()
            assert len(account.totp_recovery_codes) == 10

            first_code = plain_codes[0]
            assert totp_module.consume_recovery_code(account, first_code) is True
            assert len(account.totp_recovery_codes) == 9

            # Same code again: rejected because codes are single-use.
            assert totp_module.consume_recovery_code(account, first_code) is False
            assert len(account.totp_recovery_codes) == 9
        finally:
            db.session.rollback()
            db.drop_all()


def test_consume_recovery_code_rejects_unknown():
    """An unissued code returns False and leaves all 10 hashes in place."""
    with app.app_context():
        db.create_all()
        try:
            account = _make_user(email='consume2@example.qc.ca')
            _, code_hashes = totp_module.generate_recovery_codes()
            account.totp_recovery_codes = code_hashes
            account.totp_enabled = True
            db.session.commit()

            assert totp_module.consume_recovery_code(account, 'NOPE1-NOPE2') is False
            assert len(account.totp_recovery_codes) == 10
        finally:
            db.session.rollback()
            db.drop_all()


# ----------------------------------------------------------------------
# 4.
# Service layer — set / disable user TOTP
# ----------------------------------------------------------------------

def test_set_user_totp_persists_encrypted_not_raw():
    """set_user_totp stores an encrypted secret, the hashes, and enables MFA."""
    with app.app_context():
        db.create_all()
        try:
            account = _make_user(email='setuser@example.qc.ca')
            raw_secret = totp_module.generate_totp_secret()
            _, code_hashes = totp_module.generate_recovery_codes()

            totp_module.set_user_totp(account, raw_secret, code_hashes)
            db.session.refresh(account)

            assert account.totp_enabled is True
            assert account.totp_secret_encrypted is not None
            # Stored value must be the encrypted form, not the raw secret.
            assert account.totp_secret_encrypted != raw_secret
            decrypted = totp_module.decrypt_totp_secret(account.totp_secret_encrypted)
            assert decrypted == raw_secret
            assert len(account.totp_recovery_codes) == 10
        finally:
            db.session.rollback()
            db.drop_all()


def test_disable_user_totp_clears_all_fields():
    """disable_user_totp resets the flag and nulls the secret and codes."""
    with app.app_context():
        db.create_all()
        try:
            account = _make_user(email='disable@example.qc.ca')
            raw_secret = totp_module.generate_totp_secret()
            _, code_hashes = totp_module.generate_recovery_codes()
            totp_module.set_user_totp(account, raw_secret, code_hashes)
            assert account.totp_enabled is True

            totp_module.disable_user_totp(account)
            db.session.refresh(account)

            assert account.totp_enabled is False
            assert account.totp_secret_encrypted is None
            assert account.totp_recovery_codes is None
        finally:
            db.session.rollback()
            db.drop_all()


# ----------------------------------------------------------------------
# 5.
# Login route — TOTP gate
# ----------------------------------------------------------------------

def test_login_redirects_to_totp_verify_when_mfa_enabled():
    """Correct password with MFA on: 302 to /2fa/verify, pending keys set."""
    with app.app_context():
        _disable_csrf()
        db.create_all()
        try:
            password = 'CorrectHorseBattery!42'
            raw_secret = totp_module.generate_totp_secret()
            encrypted_secret = totp_module.encrypt_totp_secret(raw_secret)
            account = _make_user(
                email='gate@example.qc.ca',
                password=password,
                totp_enabled=True,
                totp_secret_encrypted=encrypted_secret,
                totp_recovery_codes=[],
            )
            login_form = {
                'email': 'gate@example.qc.ca',
                'password': password,
                'remember': 'y',
            }
            with app.test_client() as client:
                response = client.post('/login', data=login_form)
                assert response.status_code == 302
                assert '/2fa/verify' in response.headers['Location']
                with client.session_transaction() as stored:
                    assert stored.get('pending_totp_user_id') == account.id
                    assert stored.get('pending_totp_remember') is True
                    # The second factor is still outstanding: no login yet.
                    assert '_user_id' not in stored
        finally:
            db.session.rollback()
            db.drop_all()


def test_login_logs_in_directly_when_mfa_disabled():
    """Correct password with MFA off: 302 and logged in, no pending key."""
    with app.app_context():
        _disable_csrf()
        db.create_all()
        try:
            password = 'CorrectHorseBattery!42'
            account = _make_user(
                email='nomfa@example.qc.ca',
                password=password,
                totp_enabled=False,
            )
            login_form = {
                'email': 'nomfa@example.qc.ca',
                'password': password,
            }
            with app.test_client() as client:
                response = client.post('/login', data=login_form)
                assert response.status_code == 302
                # Session shows a direct login with no TOTP step pending.
                with client.session_transaction() as stored:
                    assert 'pending_totp_user_id' not in stored
                    assert stored.get('_user_id') == str(account.id)
        finally:
            db.session.rollback()
            db.drop_all()


# ----------------------------------------------------------------------
# 6.
# /2fa/verify — second factor route
# ----------------------------------------------------------------------

def test_totp_verify_login_logs_in_with_correct_code():
    """POST /2fa/verify with the current TOTP code completes the login."""
    with app.app_context():
        _disable_csrf()
        db.create_all()
        try:
            raw_secret = totp_module.generate_totp_secret()
            encrypted_secret = totp_module.encrypt_totp_secret(raw_secret)
            account = _make_user(
                email='verifyok@example.qc.ca',
                totp_enabled=True,
                totp_secret_encrypted=encrypted_secret,
                totp_recovery_codes=[],
            )
            valid_code = pyotp.TOTP(raw_secret).now()
            with app.test_client() as client:
                # Seed the session as if the password step had just passed.
                with client.session_transaction() as stored:
                    stored['pending_totp_user_id'] = account.id
                    stored['pending_totp_remember'] = False
                response = client.post('/2fa/verify', data={'code': valid_code})
                assert response.status_code == 302
                # Pending keys are gone and the user is logged in.
                with client.session_transaction() as stored:
                    assert 'pending_totp_user_id' not in stored
                    assert 'pending_totp_remember' not in stored
                    assert stored.get('_user_id') == str(account.id)
        finally:
            db.session.rollback()
            db.drop_all()


def test_totp_verify_login_rejects_wrong_code():
    """POST /2fa/verify with a wrong code returns 400 and does not log in."""
    with app.app_context():
        _disable_csrf()
        db.create_all()
        try:
            raw_secret = totp_module.generate_totp_secret()
            encrypted_secret = totp_module.encrypt_totp_secret(raw_secret)
            account = _make_user(
                email='verifybad@example.qc.ca',
                totp_enabled=True,
                totp_secret_encrypted=encrypted_secret,
                totp_recovery_codes=[],
            )
            live_code = pyotp.TOTP(raw_secret).now()
            # Use '000000' unless that happens to be the live code right now.
            bad_code = '111111' if live_code == '000000' else '000000'
            with app.test_client() as client:
                with client.session_transaction() as stored:
                    stored['pending_totp_user_id'] = account.id
                response = client.post('/2fa/verify', data={'code': bad_code})
                assert response.status_code == 400
                with client.session_transaction() as stored:
                    assert '_user_id' not in stored
                    # The pending key stays so the user can try again.
                    assert stored.get('pending_totp_user_id') == account.id
        finally:
            db.session.rollback()
            db.drop_all()


def test_totp_verify_login_accepts_recovery_code():
    """POST /2fa/verify with a recovery code logs in and consumes the code."""
    with app.app_context():
        _disable_csrf()
        db.create_all()
        try:
            raw_secret = totp_module.generate_totp_secret()
            plain_codes, code_hashes = totp_module.generate_recovery_codes()
            encrypted_secret = totp_module.encrypt_totp_secret(raw_secret)
            account = _make_user(
                email='recovery@example.qc.ca',
                totp_enabled=True,
                totp_secret_encrypted=encrypted_secret,
                totp_recovery_codes=code_hashes,
            )
            with app.test_client() as client:
                with client.session_transaction() as stored:
                    stored['pending_totp_user_id'] = account.id
                response = client.post(
                    '/2fa/verify',
                    data={'recovery_code': plain_codes[0]},
                )
                assert response.status_code == 302
                with client.session_transaction() as stored:
                    assert stored.get('_user_id') == str(account.id)
                # One of the ten stored hashes has been used up.
                db.session.refresh(account)
                assert len(account.totp_recovery_codes) == 9
        finally:
            db.session.rollback()
            db.drop_all()


def test_totp_verify_login_redirects_to_login_without_pending_session():
    """GET /2fa/verify without a pending session answers 302 to /login."""
    with app.app_context():
        _disable_csrf()
        db.create_all()
        try:
            with app.test_client() as client:
                response = client.get('/2fa/verify')
                assert response.status_code == 302
                assert '/login' in response.headers['Location']
        finally:
            db.session.rollback()
            db.drop_all()


# ----------------------------------------------------------------------
# 7.
# /2fa/setup — enrollment route
# ----------------------------------------------------------------------

def test_totp_setup_get_creates_pending_session():
    """GET /2fa/setup while logged in returns a QR image and primes the session."""
    with app.app_context():
        _disable_csrf()
        db.create_all()
        try:
            account = _make_user(email='setupget@example.qc.ca')
            with app.test_client() as client:
                # Mark the client as logged in by seeding the session keys.
                with client.session_transaction() as stored:
                    stored['_user_id'] = str(account.id)
                    stored['_fresh'] = True
                response = client.get('/2fa/setup')
                assert response.status_code == 200
                page = response.data.decode('utf-8')
                # The QR code is embedded as an inline PNG data URI.
                assert 'data:image/png;base64,' in page
                with client.session_transaction() as stored:
                    assert stored.get('totp_pending_secret') is not None
                    assert len(stored.get('totp_pending_recovery_hashes')) == 10
                    assert len(stored.get('totp_pending_display_codes')) == 10
        finally:
            db.session.rollback()
            db.drop_all()


def test_totp_setup_post_with_valid_code_enables_mfa():
    """POST with a code for the pending secret enables MFA and clears the session."""
    with app.app_context():
        _disable_csrf()
        db.create_all()
        try:
            account = _make_user(email='setupok@example.qc.ca')
            raw_secret = totp_module.generate_totp_secret()
            plain_codes, code_hashes = totp_module.generate_recovery_codes()
            with app.test_client() as client:
                with client.session_transaction() as stored:
                    stored['_user_id'] = str(account.id)
                    stored['_fresh'] = True
                    stored['totp_pending_secret'] = raw_secret
                    stored['totp_pending_recovery_hashes'] = code_hashes
                    stored['totp_pending_display_codes'] = plain_codes
                valid_code = pyotp.TOTP(raw_secret).now()
                response = client.post('/2fa/setup', data={'code': valid_code})
                assert response.status_code == 302

                # The user row now carries the enrolled, encrypted secret.
                db.session.refresh(account)
                assert account.totp_enabled is True
                assert account.totp_secret_encrypted is not None
                decrypted = totp_module.decrypt_totp_secret(
                    account.totp_secret_encrypted
                )
                assert decrypted == raw_secret

                # All pending enrollment keys were removed from the session.
                with client.session_transaction() as stored:
                    assert 'totp_pending_secret' not in stored
                    assert 'totp_pending_recovery_hashes' not in stored
                    assert 'totp_pending_display_codes' not in stored
        finally:
            db.session.rollback()
            db.drop_all()


def test_totp_setup_post_with_invalid_code_returns_400_keeps_pending():
    """POST with a wrong code returns 400, leaves MFA off, keeps the pending secret."""
    with app.app_context():
        _disable_csrf()
        db.create_all()
        try:
            account = _make_user(email='setupbad@example.qc.ca')
            raw_secret = totp_module.generate_totp_secret()
            plain_codes, code_hashes = totp_module.generate_recovery_codes()
            with app.test_client() as client:
                with client.session_transaction() as stored:
                    stored['_user_id'] = str(account.id)
                    stored['_fresh'] = True
                    stored['totp_pending_secret'] = raw_secret
                    stored['totp_pending_recovery_hashes'] = code_hashes
                    stored['totp_pending_display_codes'] = plain_codes
                live_code = pyotp.TOTP(raw_secret).now()
                # Use '000000' unless that happens to be the live code right now.
                bad_code = '111111' if live_code == '000000' else '000000'
                response = client.post('/2fa/setup', data={'code': bad_code})
                assert response.status_code == 400

                db.session.refresh(account)
                assert account.totp_enabled is False
                assert account.totp_secret_encrypted is None

                # Same pending secret remains, so no new QR scan is needed.
                with client.session_transaction() as stored:
                    assert stored.get('totp_pending_secret') == raw_secret
        finally:
            db.session.rollback()
            db.drop_all()


# ----------------------------------------------------------------------
# 8. /2fa/disable — password re-auth required
# ----------------------------------------------------------------------

def test_totp_disable_requires_password():
    """A wrong password leaves MFA enabled; the correct password disables it."""
    with app.app_context():
        _disable_csrf()
        db.create_all()
        try:
            password = 'CorrectHorseBattery!42'
            raw_secret = totp_module.generate_totp_secret()
            _, code_hashes = totp_module.generate_recovery_codes()
            encrypted_secret = totp_module.encrypt_totp_secret(raw_secret)
            account = _make_user(
                email='disable@example.qc.ca',
                password=password,
                totp_enabled=True,
                totp_secret_encrypted=encrypted_secret,
                totp_recovery_codes=code_hashes,
            )
            with app.test_client() as client:
                with client.session_transaction() as stored:
                    stored['_user_id'] = str(account.id)
                    stored['_fresh'] = True

                # Wrong password: still a 302, but nothing is cleared.
                response = client.post(
                    '/2fa/disable',
                    data={'password': 'wrong-password'},
                )
                assert response.status_code == 302  # redirect to /account
                db.session.refresh(account)
                assert account.totp_enabled is True
                assert account.totp_secret_encrypted is not None

                # Correct password: all three TOTP fields are reset.
                response = client.post('/2fa/disable', data={'password': password})
                assert response.status_code == 302
                db.session.refresh(account)
                assert account.totp_enabled is False
                assert account.totp_secret_encrypted is None
                assert account.totp_recovery_codes is None
        finally:
            db.session.rollback()
            db.drop_all()