"""Tests for B-2.6 — WebAuthn / Passkey support. Covers: - Service layer helpers: b64url encode/decode, RP getters, list/has_passkeys. - begin_registration returns json-safe options + challenge. - finish_registration persists the credential dict in user.webauthn_credentials. - begin_authentication raises if user has no credentials. - finish_authentication increments sign_count (anti-cloning) + rejects unknown id. - delete_credential removes by id. - Routes: register/begin, register/finish (success + missing challenge + invalid), auth/begin (with/without pending session), auth/finish (success + invalid), delete (form POST), and the login gate that defers when has_passkeys. Mocks the python-webauthn library functions so the tests don't need a real authenticator. The library is installed (webauthn==2.5.2) so imports work, but the verify_* functions are patched. Note: pytest cannot collect this file on Windows native because src/init_db.py imports `fcntl` (POSIX-only). Use tests/_run_webauthn_passkey_windows.py. """ import os import sys from types import SimpleNamespace from unittest.mock import patch sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) os.environ.setdefault('SQLALCHEMY_DATABASE_URI', 'sqlite:///:memory:') os.environ.setdefault('SECRET_KEY', 'test-secret-key-webauthn') os.environ.setdefault('ENABLE_EMAIL_VERIFICATION', 'false') os.environ.setdefault('REQUIRE_EMAIL_VERIFICATION', 'false') os.environ.setdefault('RATELIMIT_ENABLED', 'false') from src.app import app, db, bcrypt # noqa: E402 from src.models.user import User # noqa: E402 from src.auth import webauthn as wa # noqa: E402 def _disable_csrf(): app.config['WTF_CSRF_ENABLED'] = False def _make_user(email='passkey@example.qc.ca', password='Password!123', username=None, webauthn_credentials=None, totp_enabled=False, totp_secret_encrypted=None, totp_recovery_codes=None): """Create a User row with bcrypt-hashed password. Returns the User.""" hashed = bcrypt.generate_password_hash(password).decode('utf-8') u = User( username=username or email.split('@', 1)[0][:20], email=email, password=hashed, email_verified=True, webauthn_credentials=webauthn_credentials, totp_enabled=totp_enabled, totp_secret_encrypted=totp_secret_encrypted, totp_recovery_codes=totp_recovery_codes, ) db.session.add(u) db.session.commit() return u def _login_session(client, user): """Mark a Flask-Login session as logged in for `user`.""" with client.session_transaction() as sess: sess['_user_id'] = str(user.id) sess['_fresh'] = True # ---------------------------------------------------------------------- # 1-3. Helpers + RP config # ---------------------------------------------------------------------- def test_b64url_encode_decode_round_trips(): """bytes → str → bytes round-trips for various lengths (incl. padding edge).""" samples = [b'', b'A', b'AB', b'ABC', b'ABCD', b'\x00\xff\x10' * 20] for raw in samples: encoded = wa._b64url_encode(raw) assert isinstance(encoded, str) assert '=' not in encoded # no padding assert wa._b64url_decode(encoded) == raw def test_get_rp_id_default(): """get_rp_id returns 'localhost' when WEBAUTHN_RP_ID not set.""" saved = os.environ.pop('WEBAUTHN_RP_ID', None) try: assert wa.get_rp_id() == 'localhost' finally: if saved is not None: os.environ['WEBAUTHN_RP_ID'] = saved def test_get_rp_id_from_env(): """get_rp_id returns the env var value when set.""" saved = os.environ.get('WEBAUTHN_RP_ID') try: os.environ['WEBAUTHN_RP_ID'] = 'dictia.ca' assert wa.get_rp_id() == 'dictia.ca' finally: if saved is None: os.environ.pop('WEBAUTHN_RP_ID', None) else: os.environ['WEBAUTHN_RP_ID'] = saved # ---------------------------------------------------------------------- # 4-5. list_user_credentials / has_passkeys # ---------------------------------------------------------------------- def test_list_user_credentials_empty(): """Returns [] when webauthn_credentials is None.""" with app.app_context(): db.create_all() try: user = _make_user(email='listempty@example.qc.ca') assert wa.list_user_credentials(user) == [] assert wa.has_passkeys(user) is False finally: db.session.rollback() db.drop_all() def test_has_passkeys_true_after_credential_added(): """has_passkeys returns True after a credential dict is appended.""" with app.app_context(): db.create_all() try: user = _make_user( email='hasone@example.qc.ca', webauthn_credentials=[{ 'id': 'AQID', 'public_key': 'oLDA', 'sign_count': 0, 'transports': ['usb'], 'name': 'Test', 'created_at': '2026-04-27T00:00:00+00:00', }], ) assert wa.has_passkeys(user) is True assert len(wa.list_user_credentials(user)) == 1 finally: db.session.rollback() db.drop_all() # ---------------------------------------------------------------------- # 6. begin_registration # ---------------------------------------------------------------------- def test_begin_registration_returns_options_and_challenge(): """Mocks generate_registration_options; asserts json-safe dict + challenge str.""" with app.app_context(): db.create_all() try: user = _make_user(email='regbegin@example.qc.ca') fake_challenge = b'\x01\x02\x03\x04challenge-bytes\xff' with patch('src.auth.webauthn.generate_registration_options') as gen, \ patch('src.auth.webauthn.options_to_json') as to_json: gen.return_value = SimpleNamespace(challenge=fake_challenge) to_json.return_value = ( '{"challenge": "AQIDBGNoYWxsZW5nZS1ieXRlc_8", ' '"rp": {"name": "DictIA", "id": "localhost"}, ' '"user": {"id": "MQ", "name": "x", "displayName": "x"}}' ) opts, challenge_b64 = wa.begin_registration(user) assert isinstance(opts, dict) assert 'challenge' in opts assert 'rp' in opts assert isinstance(challenge_b64, str) assert wa._b64url_decode(challenge_b64) == fake_challenge gen.assert_called_once() finally: db.session.rollback() db.drop_all() # ---------------------------------------------------------------------- # 7. finish_registration # ---------------------------------------------------------------------- def test_finish_registration_persists_credential(): """Mocks verify_registration_response; asserts credential dict appended.""" with app.app_context(): db.create_all() try: user = _make_user(email='regfin@example.qc.ca') challenge = b'challenge-bytes-xyz' challenge_b64 = wa._b64url_encode(challenge) mock_verification = SimpleNamespace( credential_id=b'\xaa\xbb\xcc', credential_public_key=b'\x01\x02\x03', sign_count=0, ) response_json = { 'id': wa._b64url_encode(b'\xaa\xbb\xcc'), 'rawId': wa._b64url_encode(b'\xaa\xbb\xcc'), 'type': 'public-key', 'response': { 'clientDataJSON': 'aaa', 'attestationObject': 'bbb', 'transports': ['usb', 'nfc'], }, } with patch('src.auth.webauthn.verify_registration_response', return_value=mock_verification) as ver: cred = wa.finish_registration( user, response_json, challenge_b64, label='YubiKey 5C' ) ver.assert_called_once() assert cred['id'] == wa._b64url_encode(b'\xaa\xbb\xcc') assert cred['public_key'] == wa._b64url_encode(b'\x01\x02\x03') assert cred['sign_count'] == 0 assert cred['transports'] == ['usb', 'nfc'] assert cred['name'] == 'YubiKey 5C' assert 'created_at' in cred db.session.refresh(user) assert len(user.webauthn_credentials) == 1 assert user.webauthn_credentials[0]['id'] == cred['id'] finally: db.session.rollback() db.drop_all() # ---------------------------------------------------------------------- # 8. begin_authentication # ---------------------------------------------------------------------- def test_begin_authentication_raises_when_no_credentials(): """ValueError when user has no registered passkeys.""" with app.app_context(): db.create_all() try: user = _make_user(email='noauthcreds@example.qc.ca') try: wa.begin_authentication(user) raise AssertionError('Expected ValueError') except ValueError: pass finally: db.session.rollback() db.drop_all() # ---------------------------------------------------------------------- # 9-10. finish_authentication # ---------------------------------------------------------------------- def test_finish_authentication_increments_sign_count(): """Pre-set 1 cred sign_count=5; mock verify returns 6; assert persisted.""" with app.app_context(): db.create_all() try: cred_id = wa._b64url_encode(b'\xde\xad\xbe\xef') user = _make_user( email='signinc@example.qc.ca', webauthn_credentials=[{ 'id': cred_id, 'public_key': wa._b64url_encode(b'pub'), 'sign_count': 5, 'transports': ['internal'], 'name': 'Touch ID', 'created_at': '2026-04-27T00:00:00+00:00', }], ) challenge_b64 = wa._b64url_encode(b'auth-challenge') mock_ver = SimpleNamespace(new_sign_count=6) response_json = {'id': cred_id, 'type': 'public-key', 'response': {}} with patch('src.auth.webauthn.verify_authentication_response', return_value=mock_ver): matched = wa.finish_authentication( user, response_json, challenge_b64 ) assert matched['id'] == cred_id assert matched['sign_count'] == 6 db.session.refresh(user) assert user.webauthn_credentials[0]['sign_count'] == 6 finally: db.session.rollback() db.drop_all() def test_finish_authentication_rejects_unknown_credential_id(): """Response.id not in user's credentials → ValueError.""" with app.app_context(): db.create_all() try: user = _make_user( email='unknownid@example.qc.ca', webauthn_credentials=[{ 'id': 'KNOWN', 'public_key': 'pub', 'sign_count': 0, 'transports': [], 'name': 'A', 'created_at': '2026-04-27T00:00:00+00:00', }], ) response_json = {'id': 'NOT-REGISTERED', 'response': {}} try: wa.finish_authentication( user, response_json, wa._b64url_encode(b'c') ) raise AssertionError('Expected ValueError') except ValueError: pass finally: db.session.rollback() db.drop_all() # ---------------------------------------------------------------------- # 11-12. delete_credential # ---------------------------------------------------------------------- def test_delete_credential_removes_by_id(): """Pre-set 2 creds; delete one; remaining list shrinks.""" with app.app_context(): db.create_all() try: user = _make_user( email='delok@example.qc.ca', webauthn_credentials=[ {'id': 'A', 'public_key': 'p1', 'sign_count': 0, 'transports': [], 'name': 'A', 'created_at': '2026-04-27T00:00:00+00:00'}, {'id': 'B', 'public_key': 'p2', 'sign_count': 0, 'transports': [], 'name': 'B', 'created_at': '2026-04-27T00:00:00+00:00'}, ], ) assert wa.delete_credential(user, 'A') is True db.session.refresh(user) assert len(user.webauthn_credentials) == 1 assert user.webauthn_credentials[0]['id'] == 'B' finally: db.session.rollback() db.drop_all() def test_delete_credential_returns_false_for_unknown_id(): """Bogus id → False, list unchanged.""" with app.app_context(): db.create_all() try: user = _make_user( email='delno@example.qc.ca', webauthn_credentials=[ {'id': 'X', 'public_key': 'p', 'sign_count': 0, 'transports': [], 'name': 'X', 'created_at': '2026-04-27T00:00:00+00:00'}, ], ) assert wa.delete_credential(user, 'NOPE') is False db.session.refresh(user) assert len(user.webauthn_credentials) == 1 finally: db.session.rollback() db.drop_all() # ---------------------------------------------------------------------- # 13-16. /2fa/passkey/register routes # ---------------------------------------------------------------------- def test_passkey_register_begin_returns_options_for_logged_in_user(): """POST /2fa/passkey/register/begin → JSON options + session challenge set.""" with app.app_context(): _disable_csrf() db.create_all() try: user = _make_user(email='regbeginrt@example.qc.ca') fake_challenge = b'route-begin-challenge' with app.test_client() as client: _login_session(client, user) with patch('src.auth.webauthn.generate_registration_options') as gen, \ patch('src.auth.webauthn.options_to_json') as to_json: gen.return_value = SimpleNamespace(challenge=fake_challenge) to_json.return_value = '{"challenge": "x", "rp": {"id": "localhost"}}' resp = client.post('/2fa/passkey/register/begin', json={}) assert resp.status_code == 200 data = resp.get_json() assert 'challenge' in data assert 'rp' in data with client.session_transaction() as sess: assert sess.get('webauthn_register_challenge') == \ wa._b64url_encode(fake_challenge) finally: db.session.rollback() db.drop_all() def test_passkey_register_finish_persists_credential(): """POST /2fa/passkey/register/finish with challenge in session + mocked verify.""" with app.app_context(): _disable_csrf() db.create_all() try: user = _make_user(email='regfinrt@example.qc.ca') challenge = b'finish-route-challenge' challenge_b64 = wa._b64url_encode(challenge) mock_ver = SimpleNamespace( credential_id=b'\x10\x20\x30', credential_public_key=b'\x40\x50', sign_count=0, ) with app.test_client() as client: _login_session(client, user) with client.session_transaction() as sess: sess['webauthn_register_challenge'] = challenge_b64 with patch('src.auth.webauthn.verify_registration_response', return_value=mock_ver): resp = client.post('/2fa/passkey/register/finish', json={ 'label': 'My Key', 'response': { 'id': wa._b64url_encode(b'\x10\x20\x30'), 'rawId': wa._b64url_encode(b'\x10\x20\x30'), 'type': 'public-key', 'response': { 'clientDataJSON': 'aa', 'attestationObject': 'bb', 'transports': ['usb'], }, }, }) assert resp.status_code == 200 data = resp.get_json() assert data['ok'] is True assert data['credential']['name'] == 'My Key' db.session.refresh(user) assert len(user.webauthn_credentials) == 1 # Challenge consumed with client.session_transaction() as sess: assert 'webauthn_register_challenge' not in sess finally: db.session.rollback() db.drop_all() def test_passkey_register_finish_rejects_without_challenge(): """POST /2fa/passkey/register/finish without session challenge → 400 FR.""" with app.app_context(): _disable_csrf() db.create_all() try: user = _make_user(email='regnoch@example.qc.ca') with app.test_client() as client: _login_session(client, user) resp = client.post('/2fa/passkey/register/finish', json={ 'response': {'id': 'x', 'response': {}}, }) assert resp.status_code == 400 data = resp.get_json() assert data['error'] == 'no_challenge' # French message assert 'défi' in data['message'].lower() or 'attente' in data['message'].lower() finally: db.session.rollback() db.drop_all() def test_passkey_register_finish_rejects_invalid_response(): """verify_registration_response raises → 400 with FR message.""" with app.app_context(): _disable_csrf() db.create_all() try: user = _make_user(email='regbad@example.qc.ca') challenge_b64 = wa._b64url_encode(b'c') with app.test_client() as client: _login_session(client, user) with client.session_transaction() as sess: sess['webauthn_register_challenge'] = challenge_b64 with patch('src.auth.webauthn.verify_registration_response', side_effect=Exception('bad')): resp = client.post('/2fa/passkey/register/finish', json={ 'response': {'id': 'x', 'response': {}}, }) assert resp.status_code == 400 data = resp.get_json() assert data['error'] == 'verification_failed' assert 'vérification' in data['message'].lower() or \ 'échec' in data['message'].lower() finally: db.session.rollback() db.drop_all() # ---------------------------------------------------------------------- # 17-20. /2fa/passkey/auth routes # ---------------------------------------------------------------------- def test_passkey_auth_begin_uses_pending_session_user(): """POST /2fa/passkey/auth/begin with pending_totp_user_id → options.""" with app.app_context(): _disable_csrf() db.create_all() try: cred_id = wa._b64url_encode(b'authcred') user = _make_user( email='authbegin@example.qc.ca', webauthn_credentials=[{ 'id': cred_id, 'public_key': wa._b64url_encode(b'pub'), 'sign_count': 0, 'transports': [], 'name': 'A', 'created_at': '2026-04-27T00:00:00+00:00', }], ) fake_challenge = b'auth-route-challenge' with app.test_client() as client: with client.session_transaction() as sess: sess['pending_totp_user_id'] = user.id with patch('src.auth.webauthn.generate_authentication_options') as gen, \ patch('src.auth.webauthn.options_to_json') as to_json: gen.return_value = SimpleNamespace(challenge=fake_challenge) to_json.return_value = '{"challenge": "x", "rpId": "localhost"}' resp = client.post('/2fa/passkey/auth/begin', json={}) assert resp.status_code == 200 data = resp.get_json() assert 'challenge' in data with client.session_transaction() as sess: assert sess.get('webauthn_auth_challenge') == \ wa._b64url_encode(fake_challenge) finally: db.session.rollback() db.drop_all() def test_passkey_auth_begin_returns_400_without_pending_session(): """No pending_totp_user_id → 400 with FR error.""" with app.app_context(): _disable_csrf() db.create_all() try: with app.test_client() as client: resp = client.post('/2fa/passkey/auth/begin', json={}) assert resp.status_code == 400 data = resp.get_json() assert data['error'] == 'no_session' finally: db.session.rollback() db.drop_all() def test_passkey_auth_finish_logs_in_user_and_returns_redirect(): """Successful auth → ok+redirect, user logged in via session.""" with app.app_context(): _disable_csrf() db.create_all() try: cred_id = wa._b64url_encode(b'\xaa\xbb') user = _make_user( email='authfin@example.qc.ca', webauthn_credentials=[{ 'id': cred_id, 'public_key': wa._b64url_encode(b'p'), 'sign_count': 3, 'transports': [], 'name': 'X', 'created_at': '2026-04-27T00:00:00+00:00', }], ) challenge_b64 = wa._b64url_encode(b'authch') mock_ver = SimpleNamespace(new_sign_count=4) with app.test_client() as client: with client.session_transaction() as sess: sess['pending_totp_user_id'] = user.id sess['webauthn_auth_challenge'] = challenge_b64 sess['pending_totp_remember'] = True with patch('src.auth.webauthn.verify_authentication_response', return_value=mock_ver): resp = client.post('/2fa/passkey/auth/finish', json={ 'response': {'id': cred_id, 'response': {}}, }) assert resp.status_code == 200 data = resp.get_json() assert data['ok'] is True assert 'redirect' in data with client.session_transaction() as sess: assert sess.get('_user_id') == str(user.id) assert 'pending_totp_user_id' not in sess assert 'webauthn_auth_challenge' not in sess # sign_count incremented in DB db.session.refresh(user) assert user.webauthn_credentials[0]['sign_count'] == 4 finally: db.session.rollback() db.drop_all() def test_passkey_auth_finish_rejects_invalid(): """verify raises → 400 verification_failed; user NOT logged in.""" with app.app_context(): _disable_csrf() db.create_all() try: cred_id = wa._b64url_encode(b'\x99') user = _make_user( email='authbad@example.qc.ca', webauthn_credentials=[{ 'id': cred_id, 'public_key': wa._b64url_encode(b'p'), 'sign_count': 0, 'transports': [], 'name': 'X', 'created_at': '2026-04-27T00:00:00+00:00', }], ) challenge_b64 = wa._b64url_encode(b'c') with app.test_client() as client: with client.session_transaction() as sess: sess['pending_totp_user_id'] = user.id sess['webauthn_auth_challenge'] = challenge_b64 with patch('src.auth.webauthn.verify_authentication_response', side_effect=Exception('nope')): resp = client.post('/2fa/passkey/auth/finish', json={ 'response': {'id': cred_id, 'response': {}}, }) assert resp.status_code == 400 data = resp.get_json() assert data['error'] == 'verification_failed' with client.session_transaction() as sess: assert '_user_id' not in sess finally: db.session.rollback() db.drop_all() # ---------------------------------------------------------------------- # 21. Delete route (form POST) # ---------------------------------------------------------------------- def test_passkey_delete_route_removes_credential(): """POST /2fa/passkey/credentials//delete removes it.""" with app.app_context(): _disable_csrf() db.create_all() try: user = _make_user( email='delroute@example.qc.ca', webauthn_credentials=[{ 'id': 'TARGET', 'public_key': 'p', 'sign_count': 0, 'transports': [], 'name': 'Target', 'created_at': '2026-04-27T00:00:00+00:00', }], ) with app.test_client() as client: _login_session(client, user) resp = client.post( '/2fa/passkey/credentials/TARGET/delete', follow_redirects=False, ) assert resp.status_code == 302 db.session.refresh(user) assert (user.webauthn_credentials or []) == [] finally: db.session.rollback() db.drop_all() # ---------------------------------------------------------------------- # 22. Login gate extension # ---------------------------------------------------------------------- def test_login_gate_redirects_to_2fa_when_user_has_passkey(): """Login with valid password + has_passkeys → 302 to /2fa/verify.""" with app.app_context(): _disable_csrf() db.create_all() try: password = 'CorrectHorseBattery!42' user = _make_user( email='gatepasskey@example.qc.ca', password=password, totp_enabled=False, webauthn_credentials=[{ 'id': 'GATE', 'public_key': 'p', 'sign_count': 0, 'transports': [], 'name': 'Gate', 'created_at': '2026-04-27T00:00:00+00:00', }], ) with app.test_client() as client: resp = client.post('/login', data={ 'email': 'gatepasskey@example.qc.ca', 'password': password, 'remember': 'y', }) assert resp.status_code == 302 assert '/2fa/verify' in resp.headers['Location'] with client.session_transaction() as sess: assert sess.get('pending_totp_user_id') == user.id assert sess.get('pending_totp_remember') is True # NOT logged in yet assert '_user_id' not in sess finally: db.session.rollback() db.drop_all()