fix(auth): B-2.1 — FK erasure policy, totp_secret_encrypted, validates, docs
- ConsentLog.user_id: nullable=True + ondelete='SET NULL' for Loi 25 art. 28.1
right-to-erasure (audit row survives user deletion, user_id nulled out).
Matches existing pattern in auth_log.py / access_log.py.
- Add ConsentLog.@validates('consent_type') to reject typos at ORM level
(silent typos in audit data are very hard to detect later).
- Rename User.totp_secret -> totp_secret_encrypted (size 64->255 for Fernet
envelope). Self-documenting contract: never assign plaintext to this column.
- init_db.py: drop NOT NULL from totp_enabled migration string for consistency
with every other Boolean column in the file (model-side nullable=False is
sufficient).
- Docs: User class docstring updated to reflect MFA/billing/ordre context;
webauthn_credentials shape documented; version column policy documented.
- Tests: cleaner IntegrityError catch; add survives_user_deletion test
(right-to-erasure); add rejects_invalid_consent_type test (validator).
This commit is contained in:
@@ -286,10 +286,10 @@ def initialize_database(app):
|
|||||||
app.logger.info("Added transcription_initial_prompt column to user table")
|
app.logger.info("Added transcription_initial_prompt column to user table")
|
||||||
|
|
||||||
# === B-2.1: MFA / WebAuthn / Stripe / Loi 25 user fields ===
|
# === B-2.1: MFA / WebAuthn / Stripe / Loi 25 user fields ===
|
||||||
if add_column_if_not_exists(engine, 'user', 'totp_secret', 'VARCHAR(64)'):
|
if add_column_if_not_exists(engine, 'user', 'totp_secret_encrypted', 'VARCHAR(255)'):
|
||||||
app.logger.info("Added totp_secret column to user table")
|
app.logger.info("Added 'totp_secret_encrypted' column to user")
|
||||||
if add_column_if_not_exists(engine, 'user', 'totp_enabled', 'BOOLEAN DEFAULT 0 NOT NULL'):
|
if add_column_if_not_exists(engine, 'user', 'totp_enabled', 'BOOLEAN DEFAULT 0'):
|
||||||
app.logger.info("Added totp_enabled column to user table")
|
app.logger.info("Added 'totp_enabled' column to user")
|
||||||
if add_column_if_not_exists(engine, 'user', 'webauthn_credentials', 'JSON'):
|
if add_column_if_not_exists(engine, 'user', 'webauthn_credentials', 'JSON'):
|
||||||
app.logger.info("Added webauthn_credentials column to user table")
|
app.logger.info("Added webauthn_credentials column to user table")
|
||||||
if add_column_if_not_exists(engine, 'user', 'ordre_pro', 'VARCHAR(50)'):
|
if add_column_if_not_exists(engine, 'user', 'ordre_pro', 'VARCHAR(50)'):
|
||||||
|
|||||||
@@ -6,6 +6,8 @@ explicit and tracé) and art. 3.5 (audit trail).
|
|||||||
"""
|
"""
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
|
||||||
|
from sqlalchemy.orm import validates
|
||||||
|
|
||||||
from src.database import db
|
from src.database import db
|
||||||
|
|
||||||
|
|
||||||
@@ -17,12 +19,25 @@ class ConsentLog(db.Model):
|
|||||||
"""
|
"""
|
||||||
__tablename__ = 'consent_log'
|
__tablename__ = 'consent_log'
|
||||||
|
|
||||||
|
ALLOWED_CONSENT_TYPES = ('cgu', 'confidentialite', 'marketing', 'analytics')
|
||||||
|
|
||||||
id = db.Column(db.Integer, primary_key=True)
|
id = db.Column(db.Integer, primary_key=True)
|
||||||
user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False, index=True)
|
# nullable + ondelete=SET NULL preserves audit trail (LPRPSP art. 3.5) while
|
||||||
|
# supporting right-to-erasure (LPRPSP art. 28.1): on user deletion, the row
|
||||||
|
# survives with user_id=NULL — proof that consent existed without identifying
|
||||||
|
# the data subject. Pattern matches src/models/auth_log.py and access_log.py.
|
||||||
|
user_id = db.Column(
|
||||||
|
db.Integer,
|
||||||
|
db.ForeignKey('user.id', ondelete='SET NULL'),
|
||||||
|
nullable=True,
|
||||||
|
index=True,
|
||||||
|
)
|
||||||
|
|
||||||
# 'cgu', 'confidentialite', 'marketing', 'analytics'
|
# 'cgu', 'confidentialite', 'marketing', 'analytics'
|
||||||
consent_type = db.Column(db.String(50), nullable=False)
|
consent_type = db.Column(db.String(50), nullable=False)
|
||||||
# Version of the document accepted (e.g. '1.0', '2026-04-27')
|
# Version of the legal text accepted. Convention: ISO date 'YYYY-MM-DD' of
|
||||||
|
# the document revision (e.g. '2026-04-27'). B-2.9 will define the canonical
|
||||||
|
# version constants in src/legal/__init__.py — DO NOT hardcode dates here.
|
||||||
version = db.Column(db.String(20), nullable=False)
|
version = db.Column(db.String(20), nullable=False)
|
||||||
|
|
||||||
granted = db.Column(db.Boolean, nullable=False)
|
granted = db.Column(db.Boolean, nullable=False)
|
||||||
@@ -36,6 +51,15 @@ class ConsentLog(db.Model):
|
|||||||
# Backref creates User.consent_logs
|
# Backref creates User.consent_logs
|
||||||
user = db.relationship('User', backref='consent_logs')
|
user = db.relationship('User', backref='consent_logs')
|
||||||
|
|
||||||
|
@validates('consent_type')
|
||||||
|
def _validate_consent_type(self, key, value):
|
||||||
|
if value not in self.ALLOWED_CONSENT_TYPES:
|
||||||
|
raise ValueError(
|
||||||
|
f"Invalid consent_type {value!r}. "
|
||||||
|
f"Must be one of: {self.ALLOWED_CONSENT_TYPES}"
|
||||||
|
)
|
||||||
|
return value
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
action = 'granted' if self.granted else 'revoked'
|
action = 'granted' if self.granted else 'revoked'
|
||||||
return f"<ConsentLog user={self.user_id} type={self.consent_type} v={self.version} {action}>"
|
return f"<ConsentLog user={self.user_id} type={self.consent_type} v={self.version} {action}>"
|
||||||
|
|||||||
@@ -13,7 +13,13 @@ from src.database import db
|
|||||||
|
|
||||||
|
|
||||||
class User(db.Model, UserMixin):
|
class User(db.Model, UserMixin):
|
||||||
"""User model for authentication and profile management."""
|
"""User model — authentication, profile, MFA enrollment, and subscription state.
|
||||||
|
|
||||||
|
Post-B-2.1 columns include MFA (totp_secret_encrypted, totp_enabled,
|
||||||
|
webauthn_credentials), Stripe billing (stripe_customer_id, subscription_status),
|
||||||
|
and ordre professionnel context (ordre_pro, cabinet) used at signup (B-2.2).
|
||||||
|
Consent audit trail in src/models/consent.py via User.consent_logs backref.
|
||||||
|
"""
|
||||||
|
|
||||||
id = db.Column(db.Integer, primary_key=True)
|
id = db.Column(db.Integer, primary_key=True)
|
||||||
username = db.Column(db.String(20), unique=True, nullable=False)
|
username = db.Column(db.String(20), unique=True, nullable=False)
|
||||||
@@ -65,11 +71,14 @@ class User(db.Model, UserMixin):
|
|||||||
transcription_initial_prompt = db.Column(db.Text, nullable=True)
|
transcription_initial_prompt = db.Column(db.Text, nullable=True)
|
||||||
|
|
||||||
# === B-2.1: MFA / WebAuthn / Stripe / Loi 25 fields (Phase 2 backend) ===
|
# === B-2.1: MFA / WebAuthn / Stripe / Loi 25 fields (Phase 2 backend) ===
|
||||||
# TOTP MFA (B-2.5) — chiffré au repos via SECRET_KEY (handled in service layer)
|
# B-2.5 service layer encrypts the base32 secret with SECRET_KEY before storing.
|
||||||
totp_secret = db.Column(db.String(64), nullable=True)
|
# The encrypted blob (Fernet token) is what lives in this column. NEVER assign a
|
||||||
|
# raw base32 secret to this attribute — use the service-layer setter.
|
||||||
|
totp_secret_encrypted = db.Column(db.String(255), nullable=True)
|
||||||
totp_enabled = db.Column(db.Boolean, default=False, nullable=False)
|
totp_enabled = db.Column(db.Boolean, default=False, nullable=False)
|
||||||
|
|
||||||
# WebAuthn / Passkey credentials (B-2.6) — list of credential dicts
|
# WebAuthn / Passkey credentials (B-2.6) — list of credential dicts:
|
||||||
|
# [{'id': str, 'public_key': str, 'sign_count': int, 'transports': list[str]}]
|
||||||
webauthn_credentials = db.Column(db.JSON, nullable=True)
|
webauthn_credentials = db.Column(db.JSON, nullable=True)
|
||||||
|
|
||||||
# Loi 25 + ordre professionnel context (used at signup B-2.2)
|
# Loi 25 + ordre professionnel context (used at signup B-2.2)
|
||||||
|
|||||||
@@ -119,22 +119,34 @@ def test_consent_log_user_backref():
|
|||||||
|
|
||||||
def test_consent_log_requires_ip_and_user_agent():
|
def test_consent_log_requires_ip_and_user_agent():
|
||||||
"""ip_address and user_agent are NOT NULL — required for Loi 25 traceability."""
|
"""ip_address and user_agent are NOT NULL — required for Loi 25 traceability."""
|
||||||
|
from sqlalchemy.exc import IntegrityError
|
||||||
with app.app_context():
|
with app.app_context():
|
||||||
db.create_all()
|
db.create_all()
|
||||||
try:
|
try:
|
||||||
user = _make_user()
|
user = _make_user(username='erica', email='erica@example.com')
|
||||||
# Missing ip_address — should fail at flush
|
|
||||||
log = ConsentLog(
|
# Missing ip_address
|
||||||
|
log_no_ip = ConsentLog(
|
||||||
user_id=user.id, consent_type='cgu', version='1.0',
|
user_id=user.id, consent_type='cgu', version='1.0',
|
||||||
granted=True, user_agent='UA'
|
granted=True, user_agent='UA'
|
||||||
# ip_address intentionally omitted
|
|
||||||
)
|
)
|
||||||
db.session.add(log)
|
db.session.add(log_no_ip)
|
||||||
try:
|
try:
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
raise AssertionError("Expected IntegrityError on missing ip_address")
|
raise AssertionError("Expected IntegrityError on missing ip_address")
|
||||||
except Exception as e:
|
except IntegrityError:
|
||||||
assert 'ip_address' in str(e).lower() or 'NOT NULL' in str(e) or 'integrity' in str(e).lower()
|
db.session.rollback()
|
||||||
|
|
||||||
|
# Missing user_agent
|
||||||
|
log_no_ua = ConsentLog(
|
||||||
|
user_id=user.id, consent_type='cgu', version='1.0',
|
||||||
|
granted=True, ip_address='192.0.2.1'
|
||||||
|
)
|
||||||
|
db.session.add(log_no_ua)
|
||||||
|
try:
|
||||||
|
db.session.commit()
|
||||||
|
raise AssertionError("Expected IntegrityError on missing user_agent")
|
||||||
|
except IntegrityError:
|
||||||
db.session.rollback()
|
db.session.rollback()
|
||||||
finally:
|
finally:
|
||||||
db.session.rollback()
|
db.session.rollback()
|
||||||
@@ -142,12 +154,12 @@ def test_consent_log_requires_ip_and_user_agent():
|
|||||||
|
|
||||||
|
|
||||||
def test_user_has_new_b21_fields():
|
def test_user_has_new_b21_fields():
|
||||||
"""User model gained: totp_secret, totp_enabled, webauthn_credentials, ordre_pro, cabinet, stripe_customer_id, subscription_status."""
|
"""User model gained: totp_secret_encrypted, totp_enabled, webauthn_credentials, ordre_pro, cabinet, stripe_customer_id, subscription_status."""
|
||||||
with app.app_context():
|
with app.app_context():
|
||||||
db.create_all()
|
db.create_all()
|
||||||
try:
|
try:
|
||||||
user = _make_user()
|
user = _make_user()
|
||||||
user.totp_secret = 'JBSWY3DPEHPK3PXP'
|
user.totp_secret_encrypted = 'gAAAAABh-encrypted-fernet-token-placeholder'
|
||||||
user.totp_enabled = True
|
user.totp_enabled = True
|
||||||
user.webauthn_credentials = [{'id': 'cred1', 'public_key': 'abc'}]
|
user.webauthn_credentials = [{'id': 'cred1', 'public_key': 'abc'}]
|
||||||
user.ordre_pro = 'barreau'
|
user.ordre_pro = 'barreau'
|
||||||
@@ -157,7 +169,7 @@ def test_user_has_new_b21_fields():
|
|||||||
db.session.commit()
|
db.session.commit()
|
||||||
|
|
||||||
fetched = User.query.filter_by(username='alice').first()
|
fetched = User.query.filter_by(username='alice').first()
|
||||||
assert fetched.totp_secret == 'JBSWY3DPEHPK3PXP'
|
assert fetched.totp_secret_encrypted == 'gAAAAABh-encrypted-fernet-token-placeholder'
|
||||||
assert fetched.totp_enabled is True
|
assert fetched.totp_enabled is True
|
||||||
assert fetched.webauthn_credentials == [{'id': 'cred1', 'public_key': 'abc'}]
|
assert fetched.webauthn_credentials == [{'id': 'cred1', 'public_key': 'abc'}]
|
||||||
assert fetched.ordre_pro == 'barreau'
|
assert fetched.ordre_pro == 'barreau'
|
||||||
@@ -176,7 +188,7 @@ def test_user_b21_fields_default_to_safe_values():
|
|||||||
try:
|
try:
|
||||||
user = _make_user(username='bob', email='bob@example.com')
|
user = _make_user(username='bob', email='bob@example.com')
|
||||||
assert user.totp_enabled is False, "totp_enabled must default to False (no MFA bypass)"
|
assert user.totp_enabled is False, "totp_enabled must default to False (no MFA bypass)"
|
||||||
assert user.totp_secret is None
|
assert user.totp_secret_encrypted is None
|
||||||
assert user.webauthn_credentials is None
|
assert user.webauthn_credentials is None
|
||||||
assert user.stripe_customer_id is None
|
assert user.stripe_customer_id is None
|
||||||
assert user.subscription_status is None
|
assert user.subscription_status is None
|
||||||
@@ -185,3 +197,54 @@ def test_user_b21_fields_default_to_safe_values():
|
|||||||
finally:
|
finally:
|
||||||
db.session.rollback()
|
db.session.rollback()
|
||||||
db.drop_all()
|
db.drop_all()
|
||||||
|
|
||||||
|
|
||||||
|
def test_consent_log_survives_user_deletion_with_null_user_id():
|
||||||
|
"""Loi 25 art. 28.1 right-to-erasure: deleting a User must NOT delete their
|
||||||
|
consent log rows. The user_id is set to NULL, the audit row survives.
|
||||||
|
"""
|
||||||
|
with app.app_context():
|
||||||
|
db.create_all()
|
||||||
|
try:
|
||||||
|
user = _make_user(username='claire', email='claire@example.com')
|
||||||
|
uid = user.id
|
||||||
|
log = ConsentLog(
|
||||||
|
user_id=uid, consent_type='cgu', version='2026-04-27',
|
||||||
|
granted=True, ip_address='192.0.2.1', user_agent='UA'
|
||||||
|
)
|
||||||
|
db.session.add(log)
|
||||||
|
db.session.commit()
|
||||||
|
log_id = log.id
|
||||||
|
|
||||||
|
db.session.delete(user)
|
||||||
|
db.session.commit()
|
||||||
|
|
||||||
|
# Audit row must survive
|
||||||
|
surviving = ConsentLog.query.get(log_id)
|
||||||
|
assert surviving is not None, "Consent log row must survive user deletion (Loi 25 audit trail)"
|
||||||
|
assert surviving.user_id is None, "user_id must be NULL after user deletion (data minimization)"
|
||||||
|
assert surviving.consent_type == 'cgu'
|
||||||
|
assert surviving.granted is True
|
||||||
|
finally:
|
||||||
|
db.session.rollback()
|
||||||
|
db.drop_all()
|
||||||
|
|
||||||
|
|
||||||
|
def test_consent_log_rejects_invalid_consent_type():
|
||||||
|
"""Typos like 'comfidentialite' must be rejected at ORM level."""
|
||||||
|
with app.app_context():
|
||||||
|
db.create_all()
|
||||||
|
try:
|
||||||
|
user = _make_user(username='diane', email='diane@example.com')
|
||||||
|
try:
|
||||||
|
ConsentLog(
|
||||||
|
user_id=user.id, consent_type='comfidentialite', # typo
|
||||||
|
version='1.0', granted=True,
|
||||||
|
ip_address='192.0.2.1', user_agent='UA'
|
||||||
|
)
|
||||||
|
raise AssertionError("ValueError expected on invalid consent_type")
|
||||||
|
except ValueError as e:
|
||||||
|
assert 'Invalid consent_type' in str(e)
|
||||||
|
finally:
|
||||||
|
db.session.rollback()
|
||||||
|
db.drop_all()
|
||||||
|
|||||||
Reference in New Issue
Block a user