feat(auth): B-2.6 WebAuthn / Passkey support (FIDO2 + biometric 2FA)

Adds phishing-resistant 2nd factor via FIDO2 hardware keys (YubiKey etc.)
and device biometrics (Touch ID, Windows Hello, etc.). Reuses the existing
B-2.5 TOTP gate so a passkey is a 3rd valid option on /2fa/verify, alongside
TOTP code and recovery code. Post-login enrolment lives at /2fa/passkey/setup.

Wraps python-webauthn==2.5.2 in a thin service layer (src/auth/webauthn.py)
that persists credentials in the existing User.webauthn_credentials JSON
column (added in B-2.1 — no schema change). Each credential dict carries
id, public_key, sign_count, transports, name, and created_at. sign_count is
updated after every successful authentication for WebAuthn anti-cloning
(§6.1.1).

Backend: 6 new auth routes (passkey_setup, register/begin, register/finish,
delete, auth/begin, auth/finish). The 4 JSON endpoints are CSRF-exempt at
Flask-WTF level because CSRFProtect cannot read tokens from a JSON body
without app-wide config; the X-CSRFToken header is still sent as
defence-in-depth. The form-POST delete route DOES enforce CSRF. The
@csrf_exempt decorator was previously a no-op label; init_auth_extensions
now walks module-level functions and applies real csrf.exempt() to any
flagged with _csrf_exempt=True.

Login gate now fires when the user has TOTP enabled OR at least one
passkey, and totp_verify_login passes has_passkeys + has_totp flags so the
template can show only the relevant sections.

Frontend: templates/auth/totp_verify.html updated IN PLACE with a passkey
button section (above TOTP) and an "ou" divider. New
templates/auth/passkey_setup.html for managing/enrolling passkeys. New
static/js/webauthn-client.js (no external deps, ES2020) wraps
navigator.credentials and exchanges base64url payloads with the backend.
Tailwind CSS rebuilt.

Tests: 22 new tests in tests/test_webauthn_passkey.py covering the service
layer (b64url helpers, RP config, list/has, begin/finish for both
registration and authentication, delete) and the route flow (CSRF-exempt
JSON endpoints, login gate redirection, sign_count anti-cloning
persistence). Mocks python-webauthn's verify_* functions so tests run
without a real authenticator. Windows manual driver follows the existing
no-conftest pattern.

Self-review: 22/22 new tests pass; 21/21 prior TOTP, 16/16 email,
21/21 OAuth tests still pass (no regression).

Env: config/env.oauth.example documents WEBAUTHN_RP_ID, WEBAUTHN_RP_NAME,
WEBAUTHN_ORIGIN with full deployment notes.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Allison
2026-04-28 00:27:09 -04:00
parent aa269c5bc0
commit b8fa321edd
9 changed files with 1580 additions and 7 deletions

View File

@@ -71,11 +71,37 @@ csrf = None
limiter = None
def init_auth_extensions(_bcrypt, _csrf, _limiter):
"""Initialize extensions after app creation."""
"""Initialize extensions after app creation.
Also wires the no-op @csrf_exempt decorator into a real
Flask-WTF csrf.exempt() call for any view function flagged with
_csrf_exempt=True. This is what actually disables CSRF on the
JSON-only WebAuthn / logout endpoints; without it the decorator is
just a label.
"""
global bcrypt, csrf, limiter
bcrypt = _bcrypt
csrf = _csrf
limiter = _limiter
if csrf is not None:
# Walk module-level functions defined in THIS file and exempt any
# tagged with _csrf_exempt=True via Flask-WTF's csrf.exempt().
# We restrict to FunctionType (not Flask proxies / classes / modules)
# and to objects whose __module__ is this module to avoid touching
# imported names like `current_app` which raise on attribute access
# outside an app context.
import sys as _sys
from types import FunctionType as _FT
_mod = _sys.modules[__name__]
for _name, _obj in list(vars(_mod).items()):
if (isinstance(_obj, _FT)
and getattr(_obj, '__module__', None) == __name__
and getattr(_obj, '_csrf_exempt', False)):
try:
csrf.exempt(_obj)
except Exception:
# Defensive: don't crash app boot on a single exemption fail
pass
def rate_limit(limit_string):
@@ -388,8 +414,12 @@ def login():
action='verification_required',
show_resend=True)
# B-2.5: TOTP gate — defer login until 2nd factor verified
if user.totp_enabled:
# B-2.5/B-2.6: 2FA gate — defer login until 2nd factor verified.
# Fires when the user has TOTP enabled OR at least one passkey.
# Same pending session contract for both methods; the verify
# page offers whichever options the user has registered.
from src.auth.webauthn import has_passkeys
if user.totp_enabled or has_passkeys(user):
session['pending_totp_user_id'] = user.id
session['pending_totp_remember'] = bool(form.remember.data)
next_page = request.args.get('next')
@@ -974,6 +1004,8 @@ def totp_verify_login():
verify_totp_code, get_user_totp_secret, consume_recovery_code,
)
from src.auth.webauthn import has_passkeys
pending_user_id = session.get('pending_totp_user_id')
pending_remember = bool(session.get('pending_totp_remember', False))
if not pending_user_id:
@@ -981,25 +1013,29 @@ def totp_verify_login():
return redirect(url_for('auth.login'))
user = db.session.get(User, pending_user_id)
if not user or not user.totp_enabled:
# Defensive: shouldn't happen if login flow is correct
# B-2.6: gate fires for either TOTP OR passkey enrolment, so the verify
# page must accept users with EITHER second factor (or both). Reject only
# if the user has neither (defensive — shouldn't happen if login is correct).
if not user or (not user.totp_enabled and not has_passkeys(user)):
for k in ('pending_totp_user_id', 'pending_totp_remember', 'pending_totp_next'):
session.pop(k, None)
flash('Erreur de session. Veuillez vous reconnecter.', 'danger')
return redirect(url_for('auth.login'))
recovery_remaining = len(user.totp_recovery_codes or [])
user_has_passkeys = has_passkeys(user)
user_has_totp = bool(user.totp_enabled)
if request.method == 'POST':
code = (request.form.get('code') or '').strip()
recovery = (request.form.get('recovery_code') or '').strip()
success = False
if code:
if code and user_has_totp:
secret = get_user_totp_secret(user)
if secret and verify_totp_code(secret, code):
success = True
elif recovery:
elif recovery and user_has_totp:
success = consume_recovery_code(user, recovery)
if success:
@@ -1020,6 +1056,8 @@ def totp_verify_login():
'auth/totp_verify.html',
title='Vérification 2FA',
recovery_codes_remaining=len(user.totp_recovery_codes or []),
has_passkeys=user_has_passkeys,
has_totp=user_has_totp,
error=(
"Code invalide. Vérifiez votre application authenticator ou "
"utilisez un code de récupération."
@@ -1030,9 +1068,205 @@ def totp_verify_login():
'auth/totp_verify.html',
title='Vérification 2FA',
recovery_codes_remaining=recovery_remaining,
has_passkeys=user_has_passkeys,
has_totp=user_has_totp,
)
# --- B-2.6: WebAuthn / Passkey routes ---
#
# CSRF strategy: the 4 JSON endpoints (begin/finish for register & auth) are
# decorated with @csrf_exempt because Flask-WTF's CSRFProtect cannot read a
# token from a JSON body without app-wide config changes. The X-CSRFToken
# header is still sent by the client (defense-in-depth) but not enforced.
# The form-POST delete route DOES enforce CSRF via the standard form token.
# Real CSRF exemption is wired in init_auth_extensions() — it walks the
# blueprint and calls csrf.exempt() on functions tagged with _csrf_exempt.
@auth_bp.route('/2fa/passkey/setup', methods=['GET'])
@login_required
def passkey_setup():
"""Page where the user manages and enrolls passkeys (post-login)."""
from src.auth.webauthn import list_user_credentials
return render_template(
'auth/passkey_setup.html',
title='Gérer mes passkeys',
credentials=list_user_credentials(current_user),
)
@auth_bp.route('/2fa/passkey/register/begin', methods=['POST'])
@login_required
@csrf_exempt
@rate_limit('10 per minute')
def passkey_register_begin():
"""Generate registration options for a NEW passkey (post-login enrolment).
Stores the b64url-encoded challenge in the session under
'webauthn_register_challenge'. The /finish route pops it.
"""
from src.auth.webauthn import begin_registration
options, challenge_b64 = begin_registration(current_user)
session['webauthn_register_challenge'] = challenge_b64
return jsonify(options)
@auth_bp.route('/2fa/passkey/register/finish', methods=['POST'])
@login_required
@csrf_exempt
@rate_limit('10 per minute')
def passkey_register_finish():
"""Verify the authenticator response and persist the credential."""
from src.auth.webauthn import finish_registration
challenge_b64 = session.pop('webauthn_register_challenge', None)
if not challenge_b64:
return jsonify({
'error': 'no_challenge',
'message': 'Aucun défi WebAuthn en attente.',
}), 400
payload = request.get_json(silent=True) or {}
label = (payload.get('label') or '').strip()
response_json = payload.get('response')
if not response_json:
return jsonify({
'error': 'missing_response',
'message': "Réponse de l'authentificateur manquante.",
}), 400
try:
cred = finish_registration(
current_user, response_json, challenge_b64, label=label
)
except Exception as e:
current_app.logger.warning(
'WebAuthn register failed for user %s: %s', current_user.id, e
)
return jsonify({
'error': 'verification_failed',
'message': 'Échec de la vérification.',
}), 400
return jsonify({
'ok': True,
'credential': {
'id': cred['id'],
'name': cred['name'],
'created_at': cred['created_at'],
},
})
@auth_bp.route(
'/2fa/passkey/credentials/<credential_id>/delete', methods=['POST']
)
@login_required
@rate_limit('10 per minute')
def passkey_delete(credential_id):
"""Delete one of the user's passkeys (form POST — CSRF enforced)."""
from src.auth.webauthn import delete_credential
if delete_credential(current_user, credential_id):
flash('Passkey supprimée.', 'success')
else:
flash('Passkey introuvable.', 'warning')
return redirect(url_for('auth.passkey_setup'))
@auth_bp.route('/2fa/passkey/auth/begin', methods=['POST'])
@csrf_exempt
@rate_limit('10 per minute')
def passkey_auth_begin():
"""Generate authentication options during /2fa/verify (login flow).
Reads pending_totp_user_id from session — same handshake the TOTP gate
uses. No @login_required because the user is in the post-password,
pre-2FA limbo state.
"""
from src.auth.webauthn import begin_authentication
pending_user_id = session.get('pending_totp_user_id')
if not pending_user_id:
return jsonify({
'error': 'no_session',
'message': 'Session expirée.',
}), 400
user = db.session.get(User, pending_user_id)
if not user:
return jsonify({
'error': 'no_user',
'message': 'Utilisateur introuvable.',
}), 400
try:
options, challenge_b64 = begin_authentication(user)
except ValueError:
return jsonify({
'error': 'no_passkeys',
'message': 'Aucune passkey enregistrée.',
}), 400
session['webauthn_auth_challenge'] = challenge_b64
return jsonify(options)
@auth_bp.route('/2fa/passkey/auth/finish', methods=['POST'])
@csrf_exempt
@rate_limit('10 per minute')
def passkey_auth_finish():
"""Verify the assertion. On success, finalise login + clear pending state.
Mirrors the TOTP verify-success path: pops pending_totp_*, calls
login_user with the captured remember-me intent, audit_login, and
returns the safe redirect target as JSON for the client to follow.
"""
from src.auth.webauthn import finish_authentication
pending_user_id = session.get('pending_totp_user_id')
pending_remember = bool(session.get('pending_totp_remember', False))
pending_next = session.get('pending_totp_next')
challenge_b64 = session.pop('webauthn_auth_challenge', None)
if not pending_user_id or not challenge_b64:
return jsonify({
'error': 'no_session',
'message': 'Session expirée.',
}), 400
user = db.session.get(User, pending_user_id)
if not user:
return jsonify({
'error': 'no_user',
'message': 'Utilisateur introuvable.',
}), 400
payload = request.get_json(silent=True) or {}
response_json = payload.get('response')
if not response_json:
return jsonify({
'error': 'missing_response',
'message': "Réponse de l'authentificateur manquante.",
}), 400
try:
finish_authentication(user, response_json, challenge_b64)
except Exception as e:
current_app.logger.warning(
'WebAuthn auth failed for user %s: %s', user.id, e
)
_user_hash = hashlib.sha256(str(user.id).encode()).hexdigest()[:16]
audit_failed_login(details={
'user_id_hash': _user_hash, 'reason': 'passkey_invalid',
})
return jsonify({
'error': 'verification_failed',
'message': 'Échec de la vérification.',
}), 400
for k in ('pending_totp_user_id', 'pending_totp_remember', 'pending_totp_next'):
session.pop(k, None)
login_user(user, remember=pending_remember)
audit_login(user.id)
redirect_url = (
pending_next if (pending_next and is_safe_url(pending_next))
else url_for('recordings.index')
)
return jsonify({'ok': True, 'redirect': redirect_url})
@auth_bp.route('/logout')
@csrf_exempt
def logout():

241
src/auth/webauthn.py Normal file
View File

@@ -0,0 +1,241 @@
"""WebAuthn / Passkey service layer (B-2.6).
Wraps the python-webauthn==2.5.2 library to provide:
- Registration options + verification (post-login enrollment)
- Authentication options + verification (used during /2fa/verify)
- Credential persistence in User.webauthn_credentials (JSON column)
Each credential dict stored is:
{
'id': str (base64url, <= 1023 chars by RFC),
'public_key': str (base64url-encoded COSE key),
'sign_count': int,
'transports': list[str] (e.g., ['usb', 'nfc', 'ble', 'internal']),
'name': str (user-supplied label, e.g., 'YubiKey 5C'),
'created_at': str (ISO 8601 UTC),
}
Anti-cloning: every successful authentication updates sign_count from the
authenticator's monotonic counter (RFC 8809). A regression would indicate
a cloned authenticator and is rejected by python-webauthn at verify time.
"""
import base64
import json
import os
from datetime import datetime, timezone
from typing import Dict, List, Tuple
from webauthn import (
generate_registration_options,
generate_authentication_options,
verify_registration_response,
verify_authentication_response,
options_to_json,
)
from webauthn.helpers.structs import (
AuthenticatorSelectionCriteria,
PublicKeyCredentialDescriptor,
ResidentKeyRequirement,
UserVerificationRequirement,
)
# --- RP / origin configuration -------------------------------------------------
def get_rp_id() -> str:
"""Relying Party ID — host name only (no scheme, no port).
For dictia.ca this is 'dictia.ca'. Browsers strictly enforce that the
RP ID matches the registrable domain of the page making the request,
so this MUST be configured per-environment.
"""
return os.environ.get('WEBAUTHN_RP_ID', 'localhost')
def get_rp_name() -> str:
"""Display name shown in the user's authenticator UI."""
return os.environ.get('WEBAUTHN_RP_NAME', 'DictIA')
def get_expected_origin() -> str:
"""Full origin (scheme + host + optional port).
MUST match window.location.origin on the client side. Browsers reject
auth if these do not match. Example: 'https://dictia.ca'.
"""
return os.environ.get('WEBAUTHN_ORIGIN', 'http://localhost:8899')
# --- base64url helpers (no padding, RFC 4648 §5) -------------------------------
def _b64url_encode(b: bytes) -> str:
return base64.urlsafe_b64encode(b).rstrip(b'=').decode('ascii')
def _b64url_decode(s: str) -> bytes:
pad = '=' * (-len(s) % 4)
return base64.urlsafe_b64decode(s + pad)
# --- Stored credential accessors -----------------------------------------------
def list_user_credentials(user) -> List[Dict]:
"""Return the user's stored credentials (or empty list if None)."""
return user.webauthn_credentials or []
def has_passkeys(user) -> bool:
"""True iff the user has at least one registered WebAuthn credential."""
return bool(list_user_credentials(user))
# --- Registration --------------------------------------------------------------
def begin_registration(user) -> Tuple[dict, str]:
"""Generate registration options for a new credential.
Returns (json-safe options dict, challenge_b64url). Caller stores the
challenge in session keyed by user_id; passes it back to
finish_registration() for verification. Existing credentials are listed
in `excludeCredentials` so the authenticator can refuse to re-enroll
something it has already provisioned for this user.
"""
existing = list_user_credentials(user)
exclude = [
PublicKeyCredentialDescriptor(id=_b64url_decode(c['id']))
for c in existing
]
options = generate_registration_options(
rp_id=get_rp_id(),
rp_name=get_rp_name(),
user_id=str(user.id).encode('utf-8'),
user_name=user.email,
user_display_name=(user.name or user.username),
exclude_credentials=exclude,
authenticator_selection=AuthenticatorSelectionCriteria(
user_verification=UserVerificationRequirement.PREFERRED,
resident_key=ResidentKeyRequirement.PREFERRED,
),
)
challenge_b64 = _b64url_encode(options.challenge)
return json.loads(options_to_json(options)), challenge_b64
def finish_registration(user, response_json: dict, expected_challenge_b64: str,
label: str = '') -> Dict:
"""Verify the authenticator's registration response and persist the cred.
Raises webauthn.exceptions.InvalidRegistrationResponse on failure.
Returns the credential dict that was added to user.webauthn_credentials.
"""
from src.database import db
expected_challenge = _b64url_decode(expected_challenge_b64)
verification = verify_registration_response(
credential=response_json,
expected_challenge=expected_challenge,
expected_origin=get_expected_origin(),
expected_rp_id=get_rp_id(),
require_user_verification=False,
)
new_cred = {
'id': _b64url_encode(verification.credential_id),
'public_key': _b64url_encode(verification.credential_public_key),
'sign_count': verification.sign_count,
'transports': response_json.get('response', {}).get('transports', []),
'name': (label or 'Passkey').strip()[:80],
'created_at': datetime.now(timezone.utc).isoformat(timespec='seconds'),
}
creds = list(user.webauthn_credentials or [])
creds.append(new_cred)
user.webauthn_credentials = creds
db.session.commit()
return new_cred
# --- Authentication ------------------------------------------------------------
def begin_authentication(user) -> Tuple[dict, str]:
"""Generate authentication options scoped to this user's credentials.
Returns (json-safe options dict, challenge_b64url) — store the challenge
in session keyed by pending_totp_user_id during /2fa/verify.
Raises ValueError if the user has no registered credentials.
"""
existing = list_user_credentials(user)
if not existing:
raise ValueError('User has no registered passkeys')
allow = [
PublicKeyCredentialDescriptor(id=_b64url_decode(c['id']))
for c in existing
]
options = generate_authentication_options(
rp_id=get_rp_id(),
allow_credentials=allow,
user_verification=UserVerificationRequirement.PREFERRED,
)
challenge_b64 = _b64url_encode(options.challenge)
return json.loads(options_to_json(options)), challenge_b64
def finish_authentication(user, response_json: dict,
expected_challenge_b64: str) -> Dict:
"""Verify the authenticator assertion, increment sign_count, return cred.
Anti-cloning per WebAuthn §6.1.1: every successful auth must update
the stored sign_count from the new monotonic counter. python-webauthn
raises if the new counter is not strictly greater than the stored one
(when both are non-zero).
"""
from src.database import db
expected_challenge = _b64url_decode(expected_challenge_b64)
cred_id_b64 = response_json.get('id') or response_json.get('rawId')
if not cred_id_b64:
raise ValueError('Missing credential id in authentication response')
# Take a fresh list with shallow-copied dicts so SQLAlchemy's JSON-column
# change detection sees a new value (mutating the existing list in place
# would not flag the row dirty under the default JSON type).
creds = [dict(c) for c in list_user_credentials(user)]
matched_idx = None
for i, c in enumerate(creds):
if c['id'] == cred_id_b64:
matched_idx = i
break
if matched_idx is None:
raise ValueError('Credential not registered for this user')
matched = creds[matched_idx]
verification = verify_authentication_response(
credential=response_json,
expected_challenge=expected_challenge,
expected_origin=get_expected_origin(),
expected_rp_id=get_rp_id(),
credential_public_key=_b64url_decode(matched['public_key']),
credential_current_sign_count=matched['sign_count'],
require_user_verification=False,
)
matched['sign_count'] = verification.new_sign_count
creds[matched_idx] = matched
user.webauthn_credentials = creds
db.session.commit()
return matched
# --- Deletion ------------------------------------------------------------------
def delete_credential(user, credential_id_b64: str) -> bool:
"""Remove a credential by its base64url id.
Returns True if removed, False if not found. Sets the JSON column to
None when the last credential is removed (matches the column's
nullable=True semantics).
"""
from src.database import db
creds = list_user_credentials(user)
new_creds = [c for c in creds if c['id'] != credential_id_b64]
if len(new_creds) == len(creds):
return False
user.webauthn_credentials = new_creds if new_creds else None
db.session.commit()
return True