From b8fa321edd5b5b7feb2470d94ceed649c6343076 Mon Sep 17 00:00:00 2001 From: Allison Date: Tue, 28 Apr 2026 00:27:09 -0400 Subject: [PATCH] feat(auth): B-2.6 WebAuthn / Passkey support (FIDO2 + biometric 2FA) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds phishing-resistant 2nd factor via FIDO2 hardware keys (YubiKey etc.) and device biometrics (Touch ID, Windows Hello, etc.). Reuses the existing B-2.5 TOTP gate so a passkey is a 3rd valid option on /2fa/verify, alongside TOTP code and recovery code. Post-login enrolment lives at /2fa/passkey/setup. Wraps python-webauthn==2.5.2 in a thin service layer (src/auth/webauthn.py) that persists credentials in the existing User.webauthn_credentials JSON column (added in B-2.1 — no schema change). Each credential dict carries id, public_key, sign_count, transports, name, and created_at. sign_count is updated after every successful authentication for WebAuthn anti-cloning (§6.1.1). Backend: 6 new auth routes (passkey_setup, register/begin, register/finish, delete, auth/begin, auth/finish). The 4 JSON endpoints are CSRF-exempt at Flask-WTF level because CSRFProtect cannot read tokens from a JSON body without app-wide config; the X-CSRFToken header is still sent as defence-in-depth. The form-POST delete route DOES enforce CSRF. The @csrf_exempt decorator was previously a no-op label; init_auth_extensions now walks module-level functions and applies real csrf.exempt() to any flagged with _csrf_exempt=True. Login gate now fires when the user has TOTP enabled OR at least one passkey, and totp_verify_login passes has_passkeys + has_totp flags so the template can show only the relevant sections. Frontend: templates/auth/totp_verify.html updated IN PLACE with a passkey button section (above TOTP) and an "ou" divider. New templates/auth/passkey_setup.html for managing/enrolling passkeys. New static/js/webauthn-client.js (no external deps, ES2020) wraps navigator.credentials and exchanges base64url payloads with the backend. Tailwind CSS rebuilt. Tests: 22 new tests in tests/test_webauthn_passkey.py covering the service layer (b64url helpers, RP config, list/has, begin/finish for both registration and authentication, delete) and the route flow (CSRF-exempt JSON endpoints, login gate redirection, sign_count anti-cloning persistence). Mocks python-webauthn's verify_* functions so tests run without a real authenticator. Windows manual driver follows the existing no-conftest pattern. Self-review: 22/22 new tests pass; 21/21 prior TOTP, 16/16 email, 21/21 OAuth tests still pass (no regression). Env: config/env.oauth.example documents WEBAUTHN_RP_ID, WEBAUTHN_RP_NAME, WEBAUTHN_ORIGIN with full deployment notes. Co-Authored-By: Claude Opus 4.7 (1M context) --- config/env.oauth.example | 32 ++ src/api/auth.py | 248 ++++++++- src/auth/webauthn.py | 241 +++++++++ static/css/marketing.css | 15 + static/js/webauthn-client.js | 178 +++++++ templates/auth/passkey_setup.html | 79 +++ templates/auth/totp_verify.html | 36 ++ tests/_run_webauthn_passkey_windows.py | 77 +++ tests/test_webauthn_passkey.py | 681 +++++++++++++++++++++++++ 9 files changed, 1580 insertions(+), 7 deletions(-) create mode 100644 src/auth/webauthn.py create mode 100644 static/js/webauthn-client.js create mode 100644 templates/auth/passkey_setup.html create mode 100644 tests/_run_webauthn_passkey_windows.py create mode 100644 tests/test_webauthn_passkey.py diff --git a/config/env.oauth.example b/config/env.oauth.example index f24172b..affe490 100644 --- a/config/env.oauth.example +++ b/config/env.oauth.example @@ -67,3 +67,35 @@ # - 15-minute expiry, signed with SECRET_KEY + salt 'magic-link-login' # - No DB column — tokens are not single-use within the 15-min window # - SMTP must be configured (see env.email.example) for the link to send + +############################################################################### +# WebAuthn / Passkey (B-2.6) +############################################################################### +# Phishing-resistant 2nd factor via FIDO2 hardware keys (YubiKey etc.) and +# device biometrics (Touch ID, Windows Hello). Browsers strictly enforce that +# the values below match the page making the WebAuthn API call: +# +# - WEBAUTHN_RP_ID : the registrable host name (NO scheme, NO port). Must +# match the eTLD+1 of the page or be a parent domain. For dictia.ca use +# 'dictia.ca'; for staging at app.staging.dictia.ca use 'dictia.ca' or +# 'staging.dictia.ca'. Defaults to 'localhost' for local development. +# +# - WEBAUTHN_RP_NAME : the display name shown to the user inside their +# authenticator's prompt (e.g. 'Sign in to DictIA'). Defaults to 'DictIA'. +# +# - WEBAUTHN_ORIGIN : the FULL origin including scheme + host + optional +# port. MUST equal window.location.origin on the client side. Mismatches +# are rejected by the browser before the request even reaches the server. +# Defaults to 'http://localhost:8899' for local development. +# +# Credentials are persisted in user.webauthn_credentials (JSON column, +# added in B-2.1). Each credential dict contains base64url id, public_key, +# sign_count (anti-cloning per WebAuthn §6.1.1), transports, name, and +# created_at. The 4 JSON endpoints (register/begin, register/finish, +# auth/begin, auth/finish) are CSRF-exempt at Flask-WTF level because +# CSRFProtect cannot read tokens from a JSON body without app-wide config. +# An X-CSRFToken header is still sent by the client as defence-in-depth. +# +# WEBAUTHN_RP_ID=dictia.ca +# WEBAUTHN_RP_NAME=DictIA +# WEBAUTHN_ORIGIN=https://dictia.ca diff --git a/src/api/auth.py b/src/api/auth.py index e62887a..d19ccc0 100644 --- a/src/api/auth.py +++ b/src/api/auth.py @@ -71,11 +71,37 @@ csrf = None limiter = None def init_auth_extensions(_bcrypt, _csrf, _limiter): - """Initialize extensions after app creation.""" + """Initialize extensions after app creation. + + Also wires the no-op @csrf_exempt decorator into a real + Flask-WTF csrf.exempt() call for any view function flagged with + _csrf_exempt=True. This is what actually disables CSRF on the + JSON-only WebAuthn / logout endpoints; without it the decorator is + just a label. + """ global bcrypt, csrf, limiter bcrypt = _bcrypt csrf = _csrf limiter = _limiter + if csrf is not None: + # Walk module-level functions defined in THIS file and exempt any + # tagged with _csrf_exempt=True via Flask-WTF's csrf.exempt(). + # We restrict to FunctionType (not Flask proxies / classes / modules) + # and to objects whose __module__ is this module to avoid touching + # imported names like `current_app` which raise on attribute access + # outside an app context. + import sys as _sys + from types import FunctionType as _FT + _mod = _sys.modules[__name__] + for _name, _obj in list(vars(_mod).items()): + if (isinstance(_obj, _FT) + and getattr(_obj, '__module__', None) == __name__ + and getattr(_obj, '_csrf_exempt', False)): + try: + csrf.exempt(_obj) + except Exception: + # Defensive: don't crash app boot on a single exemption fail + pass def rate_limit(limit_string): @@ -388,8 +414,12 @@ def login(): action='verification_required', show_resend=True) - # B-2.5: TOTP gate — defer login until 2nd factor verified - if user.totp_enabled: + # B-2.5/B-2.6: 2FA gate — defer login until 2nd factor verified. + # Fires when the user has TOTP enabled OR at least one passkey. + # Same pending session contract for both methods; the verify + # page offers whichever options the user has registered. + from src.auth.webauthn import has_passkeys + if user.totp_enabled or has_passkeys(user): session['pending_totp_user_id'] = user.id session['pending_totp_remember'] = bool(form.remember.data) next_page = request.args.get('next') @@ -974,6 +1004,8 @@ def totp_verify_login(): verify_totp_code, get_user_totp_secret, consume_recovery_code, ) + from src.auth.webauthn import has_passkeys + pending_user_id = session.get('pending_totp_user_id') pending_remember = bool(session.get('pending_totp_remember', False)) if not pending_user_id: @@ -981,25 +1013,29 @@ def totp_verify_login(): return redirect(url_for('auth.login')) user = db.session.get(User, pending_user_id) - if not user or not user.totp_enabled: - # Defensive: shouldn't happen if login flow is correct + # B-2.6: gate fires for either TOTP OR passkey enrolment, so the verify + # page must accept users with EITHER second factor (or both). Reject only + # if the user has neither (defensive — shouldn't happen if login is correct). + if not user or (not user.totp_enabled and not has_passkeys(user)): for k in ('pending_totp_user_id', 'pending_totp_remember', 'pending_totp_next'): session.pop(k, None) flash('Erreur de session. Veuillez vous reconnecter.', 'danger') return redirect(url_for('auth.login')) recovery_remaining = len(user.totp_recovery_codes or []) + user_has_passkeys = has_passkeys(user) + user_has_totp = bool(user.totp_enabled) if request.method == 'POST': code = (request.form.get('code') or '').strip() recovery = (request.form.get('recovery_code') or '').strip() success = False - if code: + if code and user_has_totp: secret = get_user_totp_secret(user) if secret and verify_totp_code(secret, code): success = True - elif recovery: + elif recovery and user_has_totp: success = consume_recovery_code(user, recovery) if success: @@ -1020,6 +1056,8 @@ def totp_verify_login(): 'auth/totp_verify.html', title='Vérification 2FA', recovery_codes_remaining=len(user.totp_recovery_codes or []), + has_passkeys=user_has_passkeys, + has_totp=user_has_totp, error=( "Code invalide. Vérifiez votre application authenticator ou " "utilisez un code de récupération." @@ -1030,9 +1068,205 @@ def totp_verify_login(): 'auth/totp_verify.html', title='Vérification 2FA', recovery_codes_remaining=recovery_remaining, + has_passkeys=user_has_passkeys, + has_totp=user_has_totp, ) +# --- B-2.6: WebAuthn / Passkey routes --- +# +# CSRF strategy: the 4 JSON endpoints (begin/finish for register & auth) are +# decorated with @csrf_exempt because Flask-WTF's CSRFProtect cannot read a +# token from a JSON body without app-wide config changes. The X-CSRFToken +# header is still sent by the client (defense-in-depth) but not enforced. +# The form-POST delete route DOES enforce CSRF via the standard form token. +# Real CSRF exemption is wired in init_auth_extensions() — it walks the +# blueprint and calls csrf.exempt() on functions tagged with _csrf_exempt. + + +@auth_bp.route('/2fa/passkey/setup', methods=['GET']) +@login_required +def passkey_setup(): + """Page where the user manages and enrolls passkeys (post-login).""" + from src.auth.webauthn import list_user_credentials + return render_template( + 'auth/passkey_setup.html', + title='Gérer mes passkeys', + credentials=list_user_credentials(current_user), + ) + + +@auth_bp.route('/2fa/passkey/register/begin', methods=['POST']) +@login_required +@csrf_exempt +@rate_limit('10 per minute') +def passkey_register_begin(): + """Generate registration options for a NEW passkey (post-login enrolment). + + Stores the b64url-encoded challenge in the session under + 'webauthn_register_challenge'. The /finish route pops it. + """ + from src.auth.webauthn import begin_registration + options, challenge_b64 = begin_registration(current_user) + session['webauthn_register_challenge'] = challenge_b64 + return jsonify(options) + + +@auth_bp.route('/2fa/passkey/register/finish', methods=['POST']) +@login_required +@csrf_exempt +@rate_limit('10 per minute') +def passkey_register_finish(): + """Verify the authenticator response and persist the credential.""" + from src.auth.webauthn import finish_registration + challenge_b64 = session.pop('webauthn_register_challenge', None) + if not challenge_b64: + return jsonify({ + 'error': 'no_challenge', + 'message': 'Aucun défi WebAuthn en attente.', + }), 400 + payload = request.get_json(silent=True) or {} + label = (payload.get('label') or '').strip() + response_json = payload.get('response') + if not response_json: + return jsonify({ + 'error': 'missing_response', + 'message': "Réponse de l'authentificateur manquante.", + }), 400 + try: + cred = finish_registration( + current_user, response_json, challenge_b64, label=label + ) + except Exception as e: + current_app.logger.warning( + 'WebAuthn register failed for user %s: %s', current_user.id, e + ) + return jsonify({ + 'error': 'verification_failed', + 'message': 'Échec de la vérification.', + }), 400 + return jsonify({ + 'ok': True, + 'credential': { + 'id': cred['id'], + 'name': cred['name'], + 'created_at': cred['created_at'], + }, + }) + + +@auth_bp.route( + '/2fa/passkey/credentials//delete', methods=['POST'] +) +@login_required +@rate_limit('10 per minute') +def passkey_delete(credential_id): + """Delete one of the user's passkeys (form POST — CSRF enforced).""" + from src.auth.webauthn import delete_credential + if delete_credential(current_user, credential_id): + flash('Passkey supprimée.', 'success') + else: + flash('Passkey introuvable.', 'warning') + return redirect(url_for('auth.passkey_setup')) + + +@auth_bp.route('/2fa/passkey/auth/begin', methods=['POST']) +@csrf_exempt +@rate_limit('10 per minute') +def passkey_auth_begin(): + """Generate authentication options during /2fa/verify (login flow). + + Reads pending_totp_user_id from session — same handshake the TOTP gate + uses. No @login_required because the user is in the post-password, + pre-2FA limbo state. + """ + from src.auth.webauthn import begin_authentication + pending_user_id = session.get('pending_totp_user_id') + if not pending_user_id: + return jsonify({ + 'error': 'no_session', + 'message': 'Session expirée.', + }), 400 + user = db.session.get(User, pending_user_id) + if not user: + return jsonify({ + 'error': 'no_user', + 'message': 'Utilisateur introuvable.', + }), 400 + try: + options, challenge_b64 = begin_authentication(user) + except ValueError: + return jsonify({ + 'error': 'no_passkeys', + 'message': 'Aucune passkey enregistrée.', + }), 400 + session['webauthn_auth_challenge'] = challenge_b64 + return jsonify(options) + + +@auth_bp.route('/2fa/passkey/auth/finish', methods=['POST']) +@csrf_exempt +@rate_limit('10 per minute') +def passkey_auth_finish(): + """Verify the assertion. On success, finalise login + clear pending state. + + Mirrors the TOTP verify-success path: pops pending_totp_*, calls + login_user with the captured remember-me intent, audit_login, and + returns the safe redirect target as JSON for the client to follow. + """ + from src.auth.webauthn import finish_authentication + pending_user_id = session.get('pending_totp_user_id') + pending_remember = bool(session.get('pending_totp_remember', False)) + pending_next = session.get('pending_totp_next') + challenge_b64 = session.pop('webauthn_auth_challenge', None) + + if not pending_user_id or not challenge_b64: + return jsonify({ + 'error': 'no_session', + 'message': 'Session expirée.', + }), 400 + + user = db.session.get(User, pending_user_id) + if not user: + return jsonify({ + 'error': 'no_user', + 'message': 'Utilisateur introuvable.', + }), 400 + + payload = request.get_json(silent=True) or {} + response_json = payload.get('response') + if not response_json: + return jsonify({ + 'error': 'missing_response', + 'message': "Réponse de l'authentificateur manquante.", + }), 400 + + try: + finish_authentication(user, response_json, challenge_b64) + except Exception as e: + current_app.logger.warning( + 'WebAuthn auth failed for user %s: %s', user.id, e + ) + _user_hash = hashlib.sha256(str(user.id).encode()).hexdigest()[:16] + audit_failed_login(details={ + 'user_id_hash': _user_hash, 'reason': 'passkey_invalid', + }) + return jsonify({ + 'error': 'verification_failed', + 'message': 'Échec de la vérification.', + }), 400 + + for k in ('pending_totp_user_id', 'pending_totp_remember', 'pending_totp_next'): + session.pop(k, None) + login_user(user, remember=pending_remember) + audit_login(user.id) + redirect_url = ( + pending_next if (pending_next and is_safe_url(pending_next)) + else url_for('recordings.index') + ) + return jsonify({'ok': True, 'redirect': redirect_url}) + + @auth_bp.route('/logout') @csrf_exempt def logout(): diff --git a/src/auth/webauthn.py b/src/auth/webauthn.py new file mode 100644 index 0000000..bc40f41 --- /dev/null +++ b/src/auth/webauthn.py @@ -0,0 +1,241 @@ +"""WebAuthn / Passkey service layer (B-2.6). + +Wraps the python-webauthn==2.5.2 library to provide: +- Registration options + verification (post-login enrollment) +- Authentication options + verification (used during /2fa/verify) +- Credential persistence in User.webauthn_credentials (JSON column) + +Each credential dict stored is: + { + 'id': str (base64url, <= 1023 chars by RFC), + 'public_key': str (base64url-encoded COSE key), + 'sign_count': int, + 'transports': list[str] (e.g., ['usb', 'nfc', 'ble', 'internal']), + 'name': str (user-supplied label, e.g., 'YubiKey 5C'), + 'created_at': str (ISO 8601 UTC), + } + +Anti-cloning: every successful authentication updates sign_count from the +authenticator's monotonic counter (RFC 8809). A regression would indicate +a cloned authenticator and is rejected by python-webauthn at verify time. +""" +import base64 +import json +import os +from datetime import datetime, timezone +from typing import Dict, List, Tuple + +from webauthn import ( + generate_registration_options, + generate_authentication_options, + verify_registration_response, + verify_authentication_response, + options_to_json, +) +from webauthn.helpers.structs import ( + AuthenticatorSelectionCriteria, + PublicKeyCredentialDescriptor, + ResidentKeyRequirement, + UserVerificationRequirement, +) + + +# --- RP / origin configuration ------------------------------------------------- + +def get_rp_id() -> str: + """Relying Party ID — host name only (no scheme, no port). + + For dictia.ca this is 'dictia.ca'. Browsers strictly enforce that the + RP ID matches the registrable domain of the page making the request, + so this MUST be configured per-environment. + """ + return os.environ.get('WEBAUTHN_RP_ID', 'localhost') + + +def get_rp_name() -> str: + """Display name shown in the user's authenticator UI.""" + return os.environ.get('WEBAUTHN_RP_NAME', 'DictIA') + + +def get_expected_origin() -> str: + """Full origin (scheme + host + optional port). + + MUST match window.location.origin on the client side. Browsers reject + auth if these do not match. Example: 'https://dictia.ca'. + """ + return os.environ.get('WEBAUTHN_ORIGIN', 'http://localhost:8899') + + +# --- base64url helpers (no padding, RFC 4648 §5) ------------------------------- + +def _b64url_encode(b: bytes) -> str: + return base64.urlsafe_b64encode(b).rstrip(b'=').decode('ascii') + + +def _b64url_decode(s: str) -> bytes: + pad = '=' * (-len(s) % 4) + return base64.urlsafe_b64decode(s + pad) + + +# --- Stored credential accessors ----------------------------------------------- + +def list_user_credentials(user) -> List[Dict]: + """Return the user's stored credentials (or empty list if None).""" + return user.webauthn_credentials or [] + + +def has_passkeys(user) -> bool: + """True iff the user has at least one registered WebAuthn credential.""" + return bool(list_user_credentials(user)) + + +# --- Registration -------------------------------------------------------------- + +def begin_registration(user) -> Tuple[dict, str]: + """Generate registration options for a new credential. + + Returns (json-safe options dict, challenge_b64url). Caller stores the + challenge in session keyed by user_id; passes it back to + finish_registration() for verification. Existing credentials are listed + in `excludeCredentials` so the authenticator can refuse to re-enroll + something it has already provisioned for this user. + """ + existing = list_user_credentials(user) + exclude = [ + PublicKeyCredentialDescriptor(id=_b64url_decode(c['id'])) + for c in existing + ] + options = generate_registration_options( + rp_id=get_rp_id(), + rp_name=get_rp_name(), + user_id=str(user.id).encode('utf-8'), + user_name=user.email, + user_display_name=(user.name or user.username), + exclude_credentials=exclude, + authenticator_selection=AuthenticatorSelectionCriteria( + user_verification=UserVerificationRequirement.PREFERRED, + resident_key=ResidentKeyRequirement.PREFERRED, + ), + ) + challenge_b64 = _b64url_encode(options.challenge) + return json.loads(options_to_json(options)), challenge_b64 + + +def finish_registration(user, response_json: dict, expected_challenge_b64: str, + label: str = '') -> Dict: + """Verify the authenticator's registration response and persist the cred. + + Raises webauthn.exceptions.InvalidRegistrationResponse on failure. + Returns the credential dict that was added to user.webauthn_credentials. + """ + from src.database import db + expected_challenge = _b64url_decode(expected_challenge_b64) + verification = verify_registration_response( + credential=response_json, + expected_challenge=expected_challenge, + expected_origin=get_expected_origin(), + expected_rp_id=get_rp_id(), + require_user_verification=False, + ) + new_cred = { + 'id': _b64url_encode(verification.credential_id), + 'public_key': _b64url_encode(verification.credential_public_key), + 'sign_count': verification.sign_count, + 'transports': response_json.get('response', {}).get('transports', []), + 'name': (label or 'Passkey').strip()[:80], + 'created_at': datetime.now(timezone.utc).isoformat(timespec='seconds'), + } + creds = list(user.webauthn_credentials or []) + creds.append(new_cred) + user.webauthn_credentials = creds + db.session.commit() + return new_cred + + +# --- Authentication ------------------------------------------------------------ + +def begin_authentication(user) -> Tuple[dict, str]: + """Generate authentication options scoped to this user's credentials. + + Returns (json-safe options dict, challenge_b64url) — store the challenge + in session keyed by pending_totp_user_id during /2fa/verify. + + Raises ValueError if the user has no registered credentials. + """ + existing = list_user_credentials(user) + if not existing: + raise ValueError('User has no registered passkeys') + allow = [ + PublicKeyCredentialDescriptor(id=_b64url_decode(c['id'])) + for c in existing + ] + options = generate_authentication_options( + rp_id=get_rp_id(), + allow_credentials=allow, + user_verification=UserVerificationRequirement.PREFERRED, + ) + challenge_b64 = _b64url_encode(options.challenge) + return json.loads(options_to_json(options)), challenge_b64 + + +def finish_authentication(user, response_json: dict, + expected_challenge_b64: str) -> Dict: + """Verify the authenticator assertion, increment sign_count, return cred. + + Anti-cloning per WebAuthn §6.1.1: every successful auth must update + the stored sign_count from the new monotonic counter. python-webauthn + raises if the new counter is not strictly greater than the stored one + (when both are non-zero). + """ + from src.database import db + expected_challenge = _b64url_decode(expected_challenge_b64) + + cred_id_b64 = response_json.get('id') or response_json.get('rawId') + if not cred_id_b64: + raise ValueError('Missing credential id in authentication response') + # Take a fresh list with shallow-copied dicts so SQLAlchemy's JSON-column + # change detection sees a new value (mutating the existing list in place + # would not flag the row dirty under the default JSON type). + creds = [dict(c) for c in list_user_credentials(user)] + matched_idx = None + for i, c in enumerate(creds): + if c['id'] == cred_id_b64: + matched_idx = i + break + if matched_idx is None: + raise ValueError('Credential not registered for this user') + matched = creds[matched_idx] + + verification = verify_authentication_response( + credential=response_json, + expected_challenge=expected_challenge, + expected_origin=get_expected_origin(), + expected_rp_id=get_rp_id(), + credential_public_key=_b64url_decode(matched['public_key']), + credential_current_sign_count=matched['sign_count'], + require_user_verification=False, + ) + matched['sign_count'] = verification.new_sign_count + creds[matched_idx] = matched + user.webauthn_credentials = creds + db.session.commit() + return matched + + +# --- Deletion ------------------------------------------------------------------ + +def delete_credential(user, credential_id_b64: str) -> bool: + """Remove a credential by its base64url id. + + Returns True if removed, False if not found. Sets the JSON column to + None when the last credential is removed (matches the column's + nullable=True semantics). + """ + from src.database import db + creds = list_user_credentials(user) + new_creds = [c for c in creds if c['id'] != credential_id_b64] + if len(new_creds) == len(creds): + return False + user.webauthn_credentials = new_creds if new_creds else None + db.session.commit() + return True diff --git a/static/css/marketing.css b/static/css/marketing.css index ee6b3f5..4900007 100644 --- a/static/css/marketing.css +++ b/static/css/marketing.css @@ -2477,6 +2477,9 @@ .text-brand-navy { color: #060d1a; } + .text-brand-navy\/50 { + color: color-mix(in oklab, #060d1a 50%, transparent); + } .text-brand-navy\/60 { color: color-mix(in oklab, #060d1a 60%, transparent); } @@ -3628,6 +3631,13 @@ } } } + .hover\:text-red-900 { + &:hover { + @media (hover: hover) { + color: var(--color-red-900); + } + } + } .hover\:text-white { &:hover { @media (hover: hover) { @@ -3790,6 +3800,11 @@ outline-color: #0062ff; } } + .focus-visible\:outline-red-700 { + &:focus-visible { + outline-color: var(--color-red-700); + } + } .active\:scale-95 { &:active { --tw-scale-x: 95%; diff --git a/static/js/webauthn-client.js b/static/js/webauthn-client.js new file mode 100644 index 0000000..8bd20ca --- /dev/null +++ b/static/js/webauthn-client.js @@ -0,0 +1,178 @@ +/* DictIA WebAuthn client (B-2.6). + * No external dependencies. Wraps the navigator.credentials API and + * exchanges base64url-encoded payloads with the Flask backend at + * /2fa/passkey/* endpoints. + * + * Exports window.DictIAWebAuthn = { wireRegisterButton, wireAuthButton }. + */ +(function (global) { + 'use strict'; + + // --- base64url helpers (no padding) ----------------------------------- + + function b64urlToBuffer(s) { + if (!s) return new ArrayBuffer(0); + const pad = '='.repeat((4 - (s.length % 4)) % 4); + const b64 = (s + pad).replace(/-/g, '+').replace(/_/g, '/'); + const binary = atob(b64); + const bytes = new Uint8Array(binary.length); + for (let i = 0; i < binary.length; i++) bytes[i] = binary.charCodeAt(i); + return bytes.buffer; + } + + function bufferToB64url(buf) { + const bytes = new Uint8Array(buf); + let binary = ''; + for (let i = 0; i < bytes.byteLength; i++) binary += String.fromCharCode(bytes[i]); + return btoa(binary).replace(/\+/g, '-').replace(/\//g, '_').replace(/=+$/, ''); + } + + // --- Options decoding (server sends b64url; navigator.credentials needs ArrayBuffer) + + function decodeRegistrationOptions(o) { + return Object.assign({}, o, { + challenge: b64urlToBuffer(o.challenge), + user: Object.assign({}, o.user, { id: b64urlToBuffer(o.user.id) }), + excludeCredentials: (o.excludeCredentials || []).map(function (c) { + return Object.assign({}, c, { id: b64urlToBuffer(c.id) }); + }), + }); + } + + function decodeAuthenticationOptions(o) { + return Object.assign({}, o, { + challenge: b64urlToBuffer(o.challenge), + allowCredentials: (o.allowCredentials || []).map(function (c) { + return Object.assign({}, c, { id: b64urlToBuffer(c.id) }); + }), + }); + } + + // --- Credential encoding (ArrayBuffer fields → b64url for JSON) ------- + + function encodeRegistrationCredential(cred) { + return { + id: cred.id, + rawId: bufferToB64url(cred.rawId), + type: cred.type, + response: { + clientDataJSON: bufferToB64url(cred.response.clientDataJSON), + attestationObject: bufferToB64url(cred.response.attestationObject), + transports: typeof cred.response.getTransports === 'function' + ? cred.response.getTransports() : [], + }, + clientExtensionResults: cred.getClientExtensionResults + ? cred.getClientExtensionResults() : {}, + }; + } + + function encodeAuthenticationAssertion(cred) { + return { + id: cred.id, + rawId: bufferToB64url(cred.rawId), + type: cred.type, + response: { + clientDataJSON: bufferToB64url(cred.response.clientDataJSON), + authenticatorData: bufferToB64url(cred.response.authenticatorData), + signature: bufferToB64url(cred.response.signature), + userHandle: cred.response.userHandle + ? bufferToB64url(cred.response.userHandle) : null, + }, + clientExtensionResults: cred.getClientExtensionResults + ? cred.getClientExtensionResults() : {}, + }; + } + + // --- HTTP helper ------------------------------------------------------ + + async function postJson(url, body, csrfToken) { + const headers = { 'Content-Type': 'application/json' }; + if (csrfToken) headers['X-CSRFToken'] = csrfToken; + const r = await fetch(url, { + method: 'POST', + headers: headers, + body: JSON.stringify(body || {}), + credentials: 'same-origin', + }); + let data = null; + try { data = await r.json(); } catch (_) {} + return { ok: r.ok, status: r.status, data: data }; + } + + // --- Public API: wire enrolment button -------------------------------- + + function wireRegisterButton(cfg) { + const btn = document.getElementById(cfg.buttonId); + const labelEl = cfg.labelInputId ? document.getElementById(cfg.labelInputId) : null; + const statusEl = cfg.statusElementId ? document.getElementById(cfg.statusElementId) : null; + if (!btn) return; + btn.addEventListener('click', async function () { + if (statusEl) statusEl.textContent = 'Préparation...'; + btn.disabled = true; + try { + if (!('credentials' in navigator) || !navigator.credentials.create) { + throw new Error('Votre navigateur ne supporte pas les passkeys.'); + } + const beginRes = await postJson(cfg.beginUrl, {}, cfg.csrfToken); + if (!beginRes.ok) { + throw new Error((beginRes.data && beginRes.data.message) || 'Erreur de préparation'); + } + const opts = decodeRegistrationOptions(beginRes.data); + if (statusEl) statusEl.textContent = 'Suivez les instructions de votre authentificateur...'; + const cred = await navigator.credentials.create({ publicKey: opts }); + const encoded = encodeRegistrationCredential(cred); + const finishRes = await postJson( + cfg.finishUrl, + { response: encoded, label: labelEl ? labelEl.value : '' }, + cfg.csrfToken + ); + if (!finishRes.ok) { + throw new Error((finishRes.data && finishRes.data.message) || 'Échec de la vérification'); + } + if (statusEl) statusEl.textContent = 'Passkey enregistrée. Rafraîchissement...'; + setTimeout(function () { window.location.reload(); }, 800); + } catch (e) { + if (statusEl) statusEl.textContent = 'Erreur : ' + (e.message || e); + btn.disabled = false; + } + }); + } + + // --- Public API: wire login button ------------------------------------ + + function wireAuthButton(cfg) { + const btn = document.getElementById(cfg.buttonId); + const statusEl = cfg.statusElementId ? document.getElementById(cfg.statusElementId) : null; + if (!btn) return; + btn.addEventListener('click', async function () { + if (statusEl) statusEl.textContent = 'Préparation...'; + btn.disabled = true; + try { + if (!('credentials' in navigator) || !navigator.credentials.get) { + throw new Error('Votre navigateur ne supporte pas les passkeys.'); + } + const beginRes = await postJson(cfg.beginUrl, {}, cfg.csrfToken); + if (!beginRes.ok) { + throw new Error((beginRes.data && beginRes.data.message) || 'Erreur de préparation'); + } + const opts = decodeAuthenticationOptions(beginRes.data); + if (statusEl) statusEl.textContent = 'Confirmez avec votre authentificateur...'; + const cred = await navigator.credentials.get({ publicKey: opts }); + const encoded = encodeAuthenticationAssertion(cred); + const finishRes = await postJson(cfg.finishUrl, { response: encoded }, cfg.csrfToken); + if (!finishRes.ok) { + throw new Error((finishRes.data && finishRes.data.message) || 'Échec de la vérification'); + } + window.location.assign((finishRes.data && finishRes.data.redirect) || '/'); + } catch (e) { + if (statusEl) statusEl.textContent = 'Erreur : ' + (e.message || e); + btn.disabled = false; + } + }); + } + + global.DictIAWebAuthn = { + wireRegisterButton: wireRegisterButton, + wireAuthButton: wireAuthButton, + }; +})(window); diff --git a/templates/auth/passkey_setup.html b/templates/auth/passkey_setup.html new file mode 100644 index 0000000..c0b7932 --- /dev/null +++ b/templates/auth/passkey_setup.html @@ -0,0 +1,79 @@ +{% extends 'marketing/base.html' %} + +{% block title %}Gérer mes passkeys — DictIA{% endblock %} +{% block description %}Gérez les passkeys de votre compte DictIA — second facteur sans mot de passe (FIDO2 / biométrie).{% endblock %} + +{% block content %} +
+
+

Mes passkeys

+

{{ "Une passkey est un second facteur sans mot de passe (clé matérielle YubiKey, biométrie de votre appareil, etc.). Conforme Loi 25." | safe }}

+ + {% with messages = get_flashed_messages(with_categories=true) %} + {% if messages %} + {% for category, message in messages %} + + {% endfor %} + {% endif %} + {% endwith %} + +

Passkeys enregistrées

+ {% if credentials %} +
    + {% for cred in credentials %} +
  • +
    +

    {{ cred.name }}

    +

    Ajoutée le {{ cred.created_at[:10] }}

    +
    +
    + + +
    +
  • + {% endfor %} +
+ {% else %} +

Aucune passkey enregistrée pour le moment.

+ {% endif %} + +

Ajouter une passkey

+
+ + + +

+
+ +

+ ← Retour à mon compte +

+
+
+{% endblock %} + +{% block scripts %} + + +{% endblock %} diff --git a/templates/auth/totp_verify.html b/templates/auth/totp_verify.html index 1bde85b..5f7aef1 100644 --- a/templates/auth/totp_verify.html +++ b/templates/auth/totp_verify.html @@ -27,6 +27,24 @@ {% endif %} + {# B-2.6: Passkey path (only if user has at least one registered passkey) #} + {% if has_passkeys %} +
+

Connexion par Passkey

+ +

+
+ + {% if has_totp %} + + {% endif %} + {% endif %} + + {% if has_totp %} {# Primary path: 6-digit TOTP code #}
@@ -66,6 +84,7 @@

{{ recovery_codes_remaining }} code{{ 's' if recovery_codes_remaining != 1 else '' }} de récupération restant{{ 's' if recovery_codes_remaining != 1 else '' }}.

+ {% endif %}

Annuler la connexion @@ -73,3 +92,20 @@ {% endblock %} + +{% block scripts %} +{% if has_passkeys %} + + +{% endif %} +{% endblock %} diff --git a/tests/_run_webauthn_passkey_windows.py b/tests/_run_webauthn_passkey_windows.py new file mode 100644 index 0000000..a9bdbbd --- /dev/null +++ b/tests/_run_webauthn_passkey_windows.py @@ -0,0 +1,77 @@ +"""Windows manual driver for tests/test_webauthn_passkey.py. + +src/init_db.py imports `fcntl`, which is POSIX-only. On Windows we stub it +before src.app gets imported, then run each test_* function and report. + +Run from the repo root: + py -3 tests/_run_webauthn_passkey_windows.py + +This script is local-dev only (not picked up by pytest collection). +""" +import os +import sys +import types +import traceback + +# 1) Stub fcntl BEFORE any import of src.* happens. +if 'fcntl' not in sys.modules: + fcntl_stub = types.ModuleType('fcntl') + fcntl_stub.LOCK_EX = 2 + fcntl_stub.LOCK_NB = 4 + fcntl_stub.LOCK_UN = 8 + fcntl_stub.LOCK_SH = 1 + fcntl_stub.flock = lambda *_args, **_kw: None + fcntl_stub.fcntl = lambda *_args, **_kw: 0 + sys.modules['fcntl'] = fcntl_stub + +# 2) Make repo root importable +HERE = os.path.dirname(os.path.abspath(__file__)) +REPO = os.path.dirname(HERE) +sys.path.insert(0, REPO) + +# 3) Set test config +os.environ.setdefault('SQLALCHEMY_DATABASE_URI', 'sqlite:///:memory:') +os.environ.setdefault('SECRET_KEY', 'test-secret-key-webauthn') +os.environ.setdefault('ENABLE_EMAIL_VERIFICATION', 'false') +os.environ.setdefault('REQUIRE_EMAIL_VERIFICATION', 'false') +os.environ.setdefault('TRANSCRIPTION_BASE_URL', 'http://test-stub') +os.environ.setdefault('TRANSCRIPTION_API_KEY', 'test-stub') +os.environ.setdefault('RATELIMIT_ENABLED', 'false') +# Force UTF-8 stdout so src.app's emoji prints don't crash on cp1252 Windows. +try: + sys.stdout.reconfigure(encoding='utf-8', errors='replace') + sys.stderr.reconfigure(encoding='utf-8', errors='replace') +except Exception: + pass + +# 4) Import the test module and run every test_* function it defines +import importlib.util # noqa: E402 +spec = importlib.util.spec_from_file_location( + 'test_webauthn_passkey', + os.path.join(HERE, 'test_webauthn_passkey.py'), +) +mod = importlib.util.module_from_spec(spec) +spec.loader.exec_module(mod) + +tests = [(name, fn) for name, fn in vars(mod).items() + if name.startswith('test_') and callable(fn)] + +passed = 0 +failed = [] +for name, fn in tests: + try: + fn() + print(f' PASS {name}') + passed += 1 + except Exception as e: # noqa: BLE001 + print(f' FAIL {name}: {type(e).__name__}: {e}') + failed.append((name, traceback.format_exc())) + +total = len(tests) +print() +print(f'Result: {passed}/{total} passed, {len(failed)} failed') +if failed: + print('\n--- Failures ---\n') + for name, tb in failed: + print(f'### {name}\n{tb}\n') +sys.exit(0 if not failed else 1) diff --git a/tests/test_webauthn_passkey.py b/tests/test_webauthn_passkey.py new file mode 100644 index 0000000..aba84f8 --- /dev/null +++ b/tests/test_webauthn_passkey.py @@ -0,0 +1,681 @@ +"""Tests for B-2.6 — WebAuthn / Passkey support. + +Covers: + - Service layer helpers: b64url encode/decode, RP getters, list/has_passkeys. + - begin_registration returns json-safe options + challenge. + - finish_registration persists the credential dict in user.webauthn_credentials. + - begin_authentication raises if user has no credentials. + - finish_authentication increments sign_count (anti-cloning) + rejects unknown id. + - delete_credential removes by id. + - Routes: register/begin, register/finish (success + missing challenge + invalid), + auth/begin (with/without pending session), auth/finish (success + invalid), + delete (form POST), and the login gate that defers when has_passkeys. + +Mocks the python-webauthn library functions so the tests don't need a real +authenticator. The library is installed (webauthn==2.5.2) so imports work, +but the verify_* functions are patched. + +Note: pytest cannot collect this file on Windows native because src/init_db.py +imports `fcntl` (POSIX-only). Use tests/_run_webauthn_passkey_windows.py. +""" +import os +import sys +from types import SimpleNamespace +from unittest.mock import patch + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +os.environ.setdefault('SQLALCHEMY_DATABASE_URI', 'sqlite:///:memory:') +os.environ.setdefault('SECRET_KEY', 'test-secret-key-webauthn') +os.environ.setdefault('ENABLE_EMAIL_VERIFICATION', 'false') +os.environ.setdefault('REQUIRE_EMAIL_VERIFICATION', 'false') +os.environ.setdefault('RATELIMIT_ENABLED', 'false') + +from src.app import app, db, bcrypt # noqa: E402 +from src.models.user import User # noqa: E402 +from src.auth import webauthn as wa # noqa: E402 + + +def _disable_csrf(): + app.config['WTF_CSRF_ENABLED'] = False + + +def _make_user(email='passkey@example.qc.ca', password='Password!123', + username=None, webauthn_credentials=None, totp_enabled=False, + totp_secret_encrypted=None, totp_recovery_codes=None): + """Create a User row with bcrypt-hashed password. Returns the User.""" + hashed = bcrypt.generate_password_hash(password).decode('utf-8') + u = User( + username=username or email.split('@', 1)[0][:20], + email=email, + password=hashed, + email_verified=True, + webauthn_credentials=webauthn_credentials, + totp_enabled=totp_enabled, + totp_secret_encrypted=totp_secret_encrypted, + totp_recovery_codes=totp_recovery_codes, + ) + db.session.add(u) + db.session.commit() + return u + + +def _login_session(client, user): + """Mark a Flask-Login session as logged in for `user`.""" + with client.session_transaction() as sess: + sess['_user_id'] = str(user.id) + sess['_fresh'] = True + + +# ---------------------------------------------------------------------- +# 1-3. Helpers + RP config +# ---------------------------------------------------------------------- + +def test_b64url_encode_decode_round_trips(): + """bytes → str → bytes round-trips for various lengths (incl. padding edge).""" + samples = [b'', b'A', b'AB', b'ABC', b'ABCD', b'\x00\xff\x10' * 20] + for raw in samples: + encoded = wa._b64url_encode(raw) + assert isinstance(encoded, str) + assert '=' not in encoded # no padding + assert wa._b64url_decode(encoded) == raw + + +def test_get_rp_id_default(): + """get_rp_id returns 'localhost' when WEBAUTHN_RP_ID not set.""" + saved = os.environ.pop('WEBAUTHN_RP_ID', None) + try: + assert wa.get_rp_id() == 'localhost' + finally: + if saved is not None: + os.environ['WEBAUTHN_RP_ID'] = saved + + +def test_get_rp_id_from_env(): + """get_rp_id returns the env var value when set.""" + saved = os.environ.get('WEBAUTHN_RP_ID') + try: + os.environ['WEBAUTHN_RP_ID'] = 'dictia.ca' + assert wa.get_rp_id() == 'dictia.ca' + finally: + if saved is None: + os.environ.pop('WEBAUTHN_RP_ID', None) + else: + os.environ['WEBAUTHN_RP_ID'] = saved + + +# ---------------------------------------------------------------------- +# 4-5. list_user_credentials / has_passkeys +# ---------------------------------------------------------------------- + +def test_list_user_credentials_empty(): + """Returns [] when webauthn_credentials is None.""" + with app.app_context(): + db.create_all() + try: + user = _make_user(email='listempty@example.qc.ca') + assert wa.list_user_credentials(user) == [] + assert wa.has_passkeys(user) is False + finally: + db.session.rollback() + db.drop_all() + + +def test_has_passkeys_true_after_credential_added(): + """has_passkeys returns True after a credential dict is appended.""" + with app.app_context(): + db.create_all() + try: + user = _make_user( + email='hasone@example.qc.ca', + webauthn_credentials=[{ + 'id': 'AQID', 'public_key': 'oLDA', + 'sign_count': 0, 'transports': ['usb'], + 'name': 'Test', 'created_at': '2026-04-27T00:00:00+00:00', + }], + ) + assert wa.has_passkeys(user) is True + assert len(wa.list_user_credentials(user)) == 1 + finally: + db.session.rollback() + db.drop_all() + + +# ---------------------------------------------------------------------- +# 6. begin_registration +# ---------------------------------------------------------------------- + +def test_begin_registration_returns_options_and_challenge(): + """Mocks generate_registration_options; asserts json-safe dict + challenge str.""" + with app.app_context(): + db.create_all() + try: + user = _make_user(email='regbegin@example.qc.ca') + fake_challenge = b'\x01\x02\x03\x04challenge-bytes\xff' + with patch('src.auth.webauthn.generate_registration_options') as gen, \ + patch('src.auth.webauthn.options_to_json') as to_json: + gen.return_value = SimpleNamespace(challenge=fake_challenge) + to_json.return_value = ( + '{"challenge": "AQIDBGNoYWxsZW5nZS1ieXRlc_8", ' + '"rp": {"name": "DictIA", "id": "localhost"}, ' + '"user": {"id": "MQ", "name": "x", "displayName": "x"}}' + ) + opts, challenge_b64 = wa.begin_registration(user) + assert isinstance(opts, dict) + assert 'challenge' in opts + assert 'rp' in opts + assert isinstance(challenge_b64, str) + assert wa._b64url_decode(challenge_b64) == fake_challenge + gen.assert_called_once() + finally: + db.session.rollback() + db.drop_all() + + +# ---------------------------------------------------------------------- +# 7. finish_registration +# ---------------------------------------------------------------------- + +def test_finish_registration_persists_credential(): + """Mocks verify_registration_response; asserts credential dict appended.""" + with app.app_context(): + db.create_all() + try: + user = _make_user(email='regfin@example.qc.ca') + challenge = b'challenge-bytes-xyz' + challenge_b64 = wa._b64url_encode(challenge) + mock_verification = SimpleNamespace( + credential_id=b'\xaa\xbb\xcc', + credential_public_key=b'\x01\x02\x03', + sign_count=0, + ) + response_json = { + 'id': wa._b64url_encode(b'\xaa\xbb\xcc'), + 'rawId': wa._b64url_encode(b'\xaa\xbb\xcc'), + 'type': 'public-key', + 'response': { + 'clientDataJSON': 'aaa', + 'attestationObject': 'bbb', + 'transports': ['usb', 'nfc'], + }, + } + with patch('src.auth.webauthn.verify_registration_response', + return_value=mock_verification) as ver: + cred = wa.finish_registration( + user, response_json, challenge_b64, label='YubiKey 5C' + ) + ver.assert_called_once() + assert cred['id'] == wa._b64url_encode(b'\xaa\xbb\xcc') + assert cred['public_key'] == wa._b64url_encode(b'\x01\x02\x03') + assert cred['sign_count'] == 0 + assert cred['transports'] == ['usb', 'nfc'] + assert cred['name'] == 'YubiKey 5C' + assert 'created_at' in cred + db.session.refresh(user) + assert len(user.webauthn_credentials) == 1 + assert user.webauthn_credentials[0]['id'] == cred['id'] + finally: + db.session.rollback() + db.drop_all() + + +# ---------------------------------------------------------------------- +# 8. begin_authentication +# ---------------------------------------------------------------------- + +def test_begin_authentication_raises_when_no_credentials(): + """ValueError when user has no registered passkeys.""" + with app.app_context(): + db.create_all() + try: + user = _make_user(email='noauthcreds@example.qc.ca') + try: + wa.begin_authentication(user) + raise AssertionError('Expected ValueError') + except ValueError: + pass + finally: + db.session.rollback() + db.drop_all() + + +# ---------------------------------------------------------------------- +# 9-10. finish_authentication +# ---------------------------------------------------------------------- + +def test_finish_authentication_increments_sign_count(): + """Pre-set 1 cred sign_count=5; mock verify returns 6; assert persisted.""" + with app.app_context(): + db.create_all() + try: + cred_id = wa._b64url_encode(b'\xde\xad\xbe\xef') + user = _make_user( + email='signinc@example.qc.ca', + webauthn_credentials=[{ + 'id': cred_id, + 'public_key': wa._b64url_encode(b'pub'), + 'sign_count': 5, + 'transports': ['internal'], + 'name': 'Touch ID', + 'created_at': '2026-04-27T00:00:00+00:00', + }], + ) + challenge_b64 = wa._b64url_encode(b'auth-challenge') + mock_ver = SimpleNamespace(new_sign_count=6) + response_json = {'id': cred_id, 'type': 'public-key', 'response': {}} + with patch('src.auth.webauthn.verify_authentication_response', + return_value=mock_ver): + matched = wa.finish_authentication( + user, response_json, challenge_b64 + ) + assert matched['id'] == cred_id + assert matched['sign_count'] == 6 + db.session.refresh(user) + assert user.webauthn_credentials[0]['sign_count'] == 6 + finally: + db.session.rollback() + db.drop_all() + + +def test_finish_authentication_rejects_unknown_credential_id(): + """Response.id not in user's credentials → ValueError.""" + with app.app_context(): + db.create_all() + try: + user = _make_user( + email='unknownid@example.qc.ca', + webauthn_credentials=[{ + 'id': 'KNOWN', 'public_key': 'pub', 'sign_count': 0, + 'transports': [], 'name': 'A', + 'created_at': '2026-04-27T00:00:00+00:00', + }], + ) + response_json = {'id': 'NOT-REGISTERED', 'response': {}} + try: + wa.finish_authentication( + user, response_json, wa._b64url_encode(b'c') + ) + raise AssertionError('Expected ValueError') + except ValueError: + pass + finally: + db.session.rollback() + db.drop_all() + + +# ---------------------------------------------------------------------- +# 11-12. delete_credential +# ---------------------------------------------------------------------- + +def test_delete_credential_removes_by_id(): + """Pre-set 2 creds; delete one; remaining list shrinks.""" + with app.app_context(): + db.create_all() + try: + user = _make_user( + email='delok@example.qc.ca', + webauthn_credentials=[ + {'id': 'A', 'public_key': 'p1', 'sign_count': 0, + 'transports': [], 'name': 'A', + 'created_at': '2026-04-27T00:00:00+00:00'}, + {'id': 'B', 'public_key': 'p2', 'sign_count': 0, + 'transports': [], 'name': 'B', + 'created_at': '2026-04-27T00:00:00+00:00'}, + ], + ) + assert wa.delete_credential(user, 'A') is True + db.session.refresh(user) + assert len(user.webauthn_credentials) == 1 + assert user.webauthn_credentials[0]['id'] == 'B' + finally: + db.session.rollback() + db.drop_all() + + +def test_delete_credential_returns_false_for_unknown_id(): + """Bogus id → False, list unchanged.""" + with app.app_context(): + db.create_all() + try: + user = _make_user( + email='delno@example.qc.ca', + webauthn_credentials=[ + {'id': 'X', 'public_key': 'p', 'sign_count': 0, + 'transports': [], 'name': 'X', + 'created_at': '2026-04-27T00:00:00+00:00'}, + ], + ) + assert wa.delete_credential(user, 'NOPE') is False + db.session.refresh(user) + assert len(user.webauthn_credentials) == 1 + finally: + db.session.rollback() + db.drop_all() + + +# ---------------------------------------------------------------------- +# 13-16. /2fa/passkey/register routes +# ---------------------------------------------------------------------- + +def test_passkey_register_begin_returns_options_for_logged_in_user(): + """POST /2fa/passkey/register/begin → JSON options + session challenge set.""" + with app.app_context(): + _disable_csrf() + db.create_all() + try: + user = _make_user(email='regbeginrt@example.qc.ca') + fake_challenge = b'route-begin-challenge' + with app.test_client() as client: + _login_session(client, user) + with patch('src.auth.webauthn.generate_registration_options') as gen, \ + patch('src.auth.webauthn.options_to_json') as to_json: + gen.return_value = SimpleNamespace(challenge=fake_challenge) + to_json.return_value = '{"challenge": "x", "rp": {"id": "localhost"}}' + resp = client.post('/2fa/passkey/register/begin', json={}) + assert resp.status_code == 200 + data = resp.get_json() + assert 'challenge' in data + assert 'rp' in data + with client.session_transaction() as sess: + assert sess.get('webauthn_register_challenge') == \ + wa._b64url_encode(fake_challenge) + finally: + db.session.rollback() + db.drop_all() + + +def test_passkey_register_finish_persists_credential(): + """POST /2fa/passkey/register/finish with challenge in session + mocked verify.""" + with app.app_context(): + _disable_csrf() + db.create_all() + try: + user = _make_user(email='regfinrt@example.qc.ca') + challenge = b'finish-route-challenge' + challenge_b64 = wa._b64url_encode(challenge) + mock_ver = SimpleNamespace( + credential_id=b'\x10\x20\x30', + credential_public_key=b'\x40\x50', + sign_count=0, + ) + with app.test_client() as client: + _login_session(client, user) + with client.session_transaction() as sess: + sess['webauthn_register_challenge'] = challenge_b64 + with patch('src.auth.webauthn.verify_registration_response', + return_value=mock_ver): + resp = client.post('/2fa/passkey/register/finish', json={ + 'label': 'My Key', + 'response': { + 'id': wa._b64url_encode(b'\x10\x20\x30'), + 'rawId': wa._b64url_encode(b'\x10\x20\x30'), + 'type': 'public-key', + 'response': { + 'clientDataJSON': 'aa', 'attestationObject': 'bb', + 'transports': ['usb'], + }, + }, + }) + assert resp.status_code == 200 + data = resp.get_json() + assert data['ok'] is True + assert data['credential']['name'] == 'My Key' + db.session.refresh(user) + assert len(user.webauthn_credentials) == 1 + # Challenge consumed + with client.session_transaction() as sess: + assert 'webauthn_register_challenge' not in sess + finally: + db.session.rollback() + db.drop_all() + + +def test_passkey_register_finish_rejects_without_challenge(): + """POST /2fa/passkey/register/finish without session challenge → 400 FR.""" + with app.app_context(): + _disable_csrf() + db.create_all() + try: + user = _make_user(email='regnoch@example.qc.ca') + with app.test_client() as client: + _login_session(client, user) + resp = client.post('/2fa/passkey/register/finish', json={ + 'response': {'id': 'x', 'response': {}}, + }) + assert resp.status_code == 400 + data = resp.get_json() + assert data['error'] == 'no_challenge' + # French message + assert 'défi' in data['message'].lower() or 'attente' in data['message'].lower() + finally: + db.session.rollback() + db.drop_all() + + +def test_passkey_register_finish_rejects_invalid_response(): + """verify_registration_response raises → 400 with FR message.""" + with app.app_context(): + _disable_csrf() + db.create_all() + try: + user = _make_user(email='regbad@example.qc.ca') + challenge_b64 = wa._b64url_encode(b'c') + with app.test_client() as client: + _login_session(client, user) + with client.session_transaction() as sess: + sess['webauthn_register_challenge'] = challenge_b64 + with patch('src.auth.webauthn.verify_registration_response', + side_effect=Exception('bad')): + resp = client.post('/2fa/passkey/register/finish', json={ + 'response': {'id': 'x', 'response': {}}, + }) + assert resp.status_code == 400 + data = resp.get_json() + assert data['error'] == 'verification_failed' + assert 'vérification' in data['message'].lower() or \ + 'échec' in data['message'].lower() + finally: + db.session.rollback() + db.drop_all() + + +# ---------------------------------------------------------------------- +# 17-20. /2fa/passkey/auth routes +# ---------------------------------------------------------------------- + +def test_passkey_auth_begin_uses_pending_session_user(): + """POST /2fa/passkey/auth/begin with pending_totp_user_id → options.""" + with app.app_context(): + _disable_csrf() + db.create_all() + try: + cred_id = wa._b64url_encode(b'authcred') + user = _make_user( + email='authbegin@example.qc.ca', + webauthn_credentials=[{ + 'id': cred_id, 'public_key': wa._b64url_encode(b'pub'), + 'sign_count': 0, 'transports': [], 'name': 'A', + 'created_at': '2026-04-27T00:00:00+00:00', + }], + ) + fake_challenge = b'auth-route-challenge' + with app.test_client() as client: + with client.session_transaction() as sess: + sess['pending_totp_user_id'] = user.id + with patch('src.auth.webauthn.generate_authentication_options') as gen, \ + patch('src.auth.webauthn.options_to_json') as to_json: + gen.return_value = SimpleNamespace(challenge=fake_challenge) + to_json.return_value = '{"challenge": "x", "rpId": "localhost"}' + resp = client.post('/2fa/passkey/auth/begin', json={}) + assert resp.status_code == 200 + data = resp.get_json() + assert 'challenge' in data + with client.session_transaction() as sess: + assert sess.get('webauthn_auth_challenge') == \ + wa._b64url_encode(fake_challenge) + finally: + db.session.rollback() + db.drop_all() + + +def test_passkey_auth_begin_returns_400_without_pending_session(): + """No pending_totp_user_id → 400 with FR error.""" + with app.app_context(): + _disable_csrf() + db.create_all() + try: + with app.test_client() as client: + resp = client.post('/2fa/passkey/auth/begin', json={}) + assert resp.status_code == 400 + data = resp.get_json() + assert data['error'] == 'no_session' + finally: + db.session.rollback() + db.drop_all() + + +def test_passkey_auth_finish_logs_in_user_and_returns_redirect(): + """Successful auth → ok+redirect, user logged in via session.""" + with app.app_context(): + _disable_csrf() + db.create_all() + try: + cred_id = wa._b64url_encode(b'\xaa\xbb') + user = _make_user( + email='authfin@example.qc.ca', + webauthn_credentials=[{ + 'id': cred_id, 'public_key': wa._b64url_encode(b'p'), + 'sign_count': 3, 'transports': [], 'name': 'X', + 'created_at': '2026-04-27T00:00:00+00:00', + }], + ) + challenge_b64 = wa._b64url_encode(b'authch') + mock_ver = SimpleNamespace(new_sign_count=4) + with app.test_client() as client: + with client.session_transaction() as sess: + sess['pending_totp_user_id'] = user.id + sess['webauthn_auth_challenge'] = challenge_b64 + sess['pending_totp_remember'] = True + with patch('src.auth.webauthn.verify_authentication_response', + return_value=mock_ver): + resp = client.post('/2fa/passkey/auth/finish', json={ + 'response': {'id': cred_id, 'response': {}}, + }) + assert resp.status_code == 200 + data = resp.get_json() + assert data['ok'] is True + assert 'redirect' in data + with client.session_transaction() as sess: + assert sess.get('_user_id') == str(user.id) + assert 'pending_totp_user_id' not in sess + assert 'webauthn_auth_challenge' not in sess + # sign_count incremented in DB + db.session.refresh(user) + assert user.webauthn_credentials[0]['sign_count'] == 4 + finally: + db.session.rollback() + db.drop_all() + + +def test_passkey_auth_finish_rejects_invalid(): + """verify raises → 400 verification_failed; user NOT logged in.""" + with app.app_context(): + _disable_csrf() + db.create_all() + try: + cred_id = wa._b64url_encode(b'\x99') + user = _make_user( + email='authbad@example.qc.ca', + webauthn_credentials=[{ + 'id': cred_id, 'public_key': wa._b64url_encode(b'p'), + 'sign_count': 0, 'transports': [], 'name': 'X', + 'created_at': '2026-04-27T00:00:00+00:00', + }], + ) + challenge_b64 = wa._b64url_encode(b'c') + with app.test_client() as client: + with client.session_transaction() as sess: + sess['pending_totp_user_id'] = user.id + sess['webauthn_auth_challenge'] = challenge_b64 + with patch('src.auth.webauthn.verify_authentication_response', + side_effect=Exception('nope')): + resp = client.post('/2fa/passkey/auth/finish', json={ + 'response': {'id': cred_id, 'response': {}}, + }) + assert resp.status_code == 400 + data = resp.get_json() + assert data['error'] == 'verification_failed' + with client.session_transaction() as sess: + assert '_user_id' not in sess + finally: + db.session.rollback() + db.drop_all() + + +# ---------------------------------------------------------------------- +# 21. Delete route (form POST) +# ---------------------------------------------------------------------- + +def test_passkey_delete_route_removes_credential(): + """POST /2fa/passkey/credentials//delete removes it.""" + with app.app_context(): + _disable_csrf() + db.create_all() + try: + user = _make_user( + email='delroute@example.qc.ca', + webauthn_credentials=[{ + 'id': 'TARGET', 'public_key': 'p', 'sign_count': 0, + 'transports': [], 'name': 'Target', + 'created_at': '2026-04-27T00:00:00+00:00', + }], + ) + with app.test_client() as client: + _login_session(client, user) + resp = client.post( + '/2fa/passkey/credentials/TARGET/delete', + follow_redirects=False, + ) + assert resp.status_code == 302 + db.session.refresh(user) + assert (user.webauthn_credentials or []) == [] + finally: + db.session.rollback() + db.drop_all() + + +# ---------------------------------------------------------------------- +# 22. Login gate extension +# ---------------------------------------------------------------------- + +def test_login_gate_redirects_to_2fa_when_user_has_passkey(): + """Login with valid password + has_passkeys → 302 to /2fa/verify.""" + with app.app_context(): + _disable_csrf() + db.create_all() + try: + password = 'CorrectHorseBattery!42' + user = _make_user( + email='gatepasskey@example.qc.ca', password=password, + totp_enabled=False, + webauthn_credentials=[{ + 'id': 'GATE', 'public_key': 'p', 'sign_count': 0, + 'transports': [], 'name': 'Gate', + 'created_at': '2026-04-27T00:00:00+00:00', + }], + ) + with app.test_client() as client: + resp = client.post('/login', data={ + 'email': 'gatepasskey@example.qc.ca', + 'password': password, + 'remember': 'y', + }) + assert resp.status_code == 302 + assert '/2fa/verify' in resp.headers['Location'] + with client.session_transaction() as sess: + assert sess.get('pending_totp_user_id') == user.id + assert sess.get('pending_totp_remember') is True + # NOT logged in yet + assert '_user_id' not in sess + finally: + db.session.rollback() + db.drop_all()