feat(auth): B-2.1 ConsentLog model (Loi 25) + User MFA/OAuth/Stripe fields
- New src/models/consent.py — ConsentLog with user_id FK, consent_type
('cgu' | 'confidentialite' | 'marketing' | 'analytics'), version, granted
bool, granted_at/revoked_at timestamps, ip_address (45 chars for IPv6),
user_agent (500 chars). User.consent_logs backref. Audit trail per
LPRPSP art. 14 (consent tracé) + art. 3.5 (journal).
- src/models/user.py: add 7 new columns (totp_secret, totp_enabled DEFAULT 0,
webauthn_credentials JSON, ordre_pro, cabinet, stripe_customer_id,
subscription_status). Do NOT duplicate existing sso_provider/sso_subject/
email_verified/etc. (per compatibility-audit C4).
- src/init_db.py: 7 add_column_if_not_exists() calls for the new User
columns + 2 create_index_if_not_exists() for stripe_customer_id and
subscription_status. NO Alembic — init_db.py pattern matches
compatibility-audit C3.
- src/models/__init__.py: register ConsentLog import.
- tests/test_consent_log.py: 7 tests — grant flow, 4 consent types, revoke
preserves audit trail, User backref, NOT NULL on ip/UA, User.B-2.1 fields
round-trip, defaults safe.
This commit is contained in:
@@ -284,6 +284,31 @@ def initialize_database(app):
|
|||||||
app.logger.info("Added transcription_hotwords column to user table")
|
app.logger.info("Added transcription_hotwords column to user table")
|
||||||
if add_column_if_not_exists(engine, 'user', 'transcription_initial_prompt', 'TEXT'):
|
if add_column_if_not_exists(engine, 'user', 'transcription_initial_prompt', 'TEXT'):
|
||||||
app.logger.info("Added transcription_initial_prompt column to user table")
|
app.logger.info("Added transcription_initial_prompt column to user table")
|
||||||
|
|
||||||
|
# === B-2.1: MFA / WebAuthn / Stripe / Loi 25 user fields ===
|
||||||
|
if add_column_if_not_exists(engine, 'user', 'totp_secret', 'VARCHAR(64)'):
|
||||||
|
app.logger.info("Added totp_secret column to user table")
|
||||||
|
if add_column_if_not_exists(engine, 'user', 'totp_enabled', 'BOOLEAN DEFAULT 0 NOT NULL'):
|
||||||
|
app.logger.info("Added totp_enabled column to user table")
|
||||||
|
if add_column_if_not_exists(engine, 'user', 'webauthn_credentials', 'JSON'):
|
||||||
|
app.logger.info("Added webauthn_credentials column to user table")
|
||||||
|
if add_column_if_not_exists(engine, 'user', 'ordre_pro', 'VARCHAR(50)'):
|
||||||
|
app.logger.info("Added ordre_pro column to user table")
|
||||||
|
if add_column_if_not_exists(engine, 'user', 'cabinet', 'VARCHAR(255)'):
|
||||||
|
app.logger.info("Added cabinet column to user table")
|
||||||
|
if add_column_if_not_exists(engine, 'user', 'stripe_customer_id', 'VARCHAR(120)'):
|
||||||
|
app.logger.info("Added stripe_customer_id column to user table")
|
||||||
|
if add_column_if_not_exists(engine, 'user', 'subscription_status', 'VARCHAR(20)'):
|
||||||
|
app.logger.info("Added subscription_status column to user table")
|
||||||
|
|
||||||
|
# === B-2.1: Indexes on stripe_customer_id and subscription_status ===
|
||||||
|
try:
|
||||||
|
if create_index_if_not_exists(engine, 'idx_user_stripe_customer', 'user', 'stripe_customer_id'):
|
||||||
|
app.logger.info("Created index idx_user_stripe_customer on user.stripe_customer_id")
|
||||||
|
if create_index_if_not_exists(engine, 'idx_user_subscription_status', 'user', 'subscription_status'):
|
||||||
|
app.logger.info("Created index idx_user_subscription_status on user.subscription_status")
|
||||||
|
except Exception as e:
|
||||||
|
app.logger.warning(f"Could not create B-2.1 user indexes: {e}")
|
||||||
if add_column_if_not_exists(engine, 'tag', 'default_hotwords', 'TEXT'):
|
if add_column_if_not_exists(engine, 'tag', 'default_hotwords', 'TEXT'):
|
||||||
app.logger.info("Added default_hotwords column to tag table")
|
app.logger.info("Added default_hotwords column to tag table")
|
||||||
if add_column_if_not_exists(engine, 'tag', 'default_initial_prompt', 'TEXT'):
|
if add_column_if_not_exists(engine, 'tag', 'default_initial_prompt', 'TEXT'):
|
||||||
|
|||||||
@@ -33,6 +33,7 @@ from .push_subscription import PushSubscription
|
|||||||
from .processing_job import ProcessingJob
|
from .processing_job import ProcessingJob
|
||||||
from .token_usage import TokenUsage
|
from .token_usage import TokenUsage
|
||||||
from .transcription_usage import TranscriptionUsage
|
from .transcription_usage import TranscriptionUsage
|
||||||
|
from .consent import ConsentLog
|
||||||
|
|
||||||
# Export all models
|
# Export all models
|
||||||
__all__ = [
|
__all__ = [
|
||||||
@@ -70,4 +71,5 @@ __all__ = [
|
|||||||
'ProcessingJob',
|
'ProcessingJob',
|
||||||
'TokenUsage',
|
'TokenUsage',
|
||||||
'TranscriptionUsage',
|
'TranscriptionUsage',
|
||||||
|
'ConsentLog',
|
||||||
]
|
]
|
||||||
|
|||||||
41
src/models/consent.py
Normal file
41
src/models/consent.py
Normal file
@@ -0,0 +1,41 @@
|
|||||||
|
"""ConsentLog model — Loi 25 audit trail.
|
||||||
|
|
||||||
|
Records every grant/revoke of user consent for: CGU, confidentiality (RPRP),
|
||||||
|
marketing communications, analytics. Required by LPRPSP art. 14 (consent
|
||||||
|
explicit and tracé) and art. 3.5 (audit trail).
|
||||||
|
"""
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
from src.database import db
|
||||||
|
|
||||||
|
|
||||||
|
class ConsentLog(db.Model):
|
||||||
|
"""Journal Loi 25 — traçabilité des consentements utilisateurs.
|
||||||
|
|
||||||
|
One row per (user, consent_type, version) state change. Granting,
|
||||||
|
revoking, and re-granting all create separate rows for the audit trail.
|
||||||
|
"""
|
||||||
|
__tablename__ = 'consent_log'
|
||||||
|
|
||||||
|
id = db.Column(db.Integer, primary_key=True)
|
||||||
|
user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False, index=True)
|
||||||
|
|
||||||
|
# 'cgu', 'confidentialite', 'marketing', 'analytics'
|
||||||
|
consent_type = db.Column(db.String(50), nullable=False)
|
||||||
|
# Version of the document accepted (e.g. '1.0', '2026-04-27')
|
||||||
|
version = db.Column(db.String(20), nullable=False)
|
||||||
|
|
||||||
|
granted = db.Column(db.Boolean, nullable=False)
|
||||||
|
granted_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)
|
||||||
|
revoked_at = db.Column(db.DateTime, nullable=True)
|
||||||
|
|
||||||
|
# Source IP — supports both IPv4 (15 chars) and IPv6 (45 chars)
|
||||||
|
ip_address = db.Column(db.String(45), nullable=False)
|
||||||
|
user_agent = db.Column(db.String(500), nullable=False)
|
||||||
|
|
||||||
|
# Backref creates User.consent_logs
|
||||||
|
user = db.relationship('User', backref='consent_logs')
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
action = 'granted' if self.granted else 'revoked'
|
||||||
|
return f"<ConsentLog user={self.user_id} type={self.consent_type} v={self.version} {action}>"
|
||||||
@@ -9,6 +9,8 @@ from datetime import datetime
|
|||||||
from flask_login import UserMixin
|
from flask_login import UserMixin
|
||||||
from src.database import db
|
from src.database import db
|
||||||
|
|
||||||
|
# ConsentLog backref defined in src/models/consent.py — accessible as User.consent_logs
|
||||||
|
|
||||||
|
|
||||||
class User(db.Model, UserMixin):
|
class User(db.Model, UserMixin):
|
||||||
"""User model for authentication and profile management."""
|
"""User model for authentication and profile management."""
|
||||||
@@ -62,6 +64,23 @@ class User(db.Model, UserMixin):
|
|||||||
transcription_hotwords = db.Column(db.Text, nullable=True)
|
transcription_hotwords = db.Column(db.Text, nullable=True)
|
||||||
transcription_initial_prompt = db.Column(db.Text, nullable=True)
|
transcription_initial_prompt = db.Column(db.Text, nullable=True)
|
||||||
|
|
||||||
|
# === B-2.1: MFA / WebAuthn / Stripe / Loi 25 fields (Phase 2 backend) ===
|
||||||
|
# TOTP MFA (B-2.5) — chiffré au repos via SECRET_KEY (handled in service layer)
|
||||||
|
totp_secret = db.Column(db.String(64), nullable=True)
|
||||||
|
totp_enabled = db.Column(db.Boolean, default=False, nullable=False)
|
||||||
|
|
||||||
|
# WebAuthn / Passkey credentials (B-2.6) — list of credential dicts
|
||||||
|
webauthn_credentials = db.Column(db.JSON, nullable=True)
|
||||||
|
|
||||||
|
# Loi 25 + ordre professionnel context (used at signup B-2.2)
|
||||||
|
ordre_pro = db.Column(db.String(50), nullable=True) # 'barreau', 'cpa', 'chad', etc.
|
||||||
|
cabinet = db.Column(db.String(255), nullable=True)
|
||||||
|
|
||||||
|
# Stripe billing (B-2.7 / B-2.8)
|
||||||
|
stripe_customer_id = db.Column(db.String(120), nullable=True, index=True)
|
||||||
|
# 'trialing' | 'active' | 'past_due' | 'canceled' | 'incomplete' | None
|
||||||
|
subscription_status = db.Column(db.String(20), nullable=True, index=True)
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
return f"User('{self.username}', '{self.email}')"
|
return f"User('{self.username}', '{self.email}')"
|
||||||
|
|
||||||
|
|||||||
187
tests/test_consent_log.py
Normal file
187
tests/test_consent_log.py
Normal file
@@ -0,0 +1,187 @@
|
|||||||
|
"""Tests for the ConsentLog model — B-2.1 Loi 25 audit trail."""
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
from datetime import datetime, timedelta
|
||||||
|
|
||||||
|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||||
|
os.environ.setdefault('SQLALCHEMY_DATABASE_URI', 'sqlite:///:memory:')
|
||||||
|
os.environ.setdefault('SECRET_KEY', 'test-secret-key')
|
||||||
|
|
||||||
|
from src.app import app, db # noqa: E402
|
||||||
|
from src.models.user import User # noqa: E402
|
||||||
|
from src.models.consent import ConsentLog # noqa: E402
|
||||||
|
|
||||||
|
|
||||||
|
def _make_user(username='alice', email='alice@example.com'):
|
||||||
|
user = User(username=username, email=email, password='x' * 60)
|
||||||
|
db.session.add(user)
|
||||||
|
db.session.commit()
|
||||||
|
return user
|
||||||
|
|
||||||
|
|
||||||
|
def test_consent_log_records_cgu_grant():
|
||||||
|
"""Granting CGU consent at signup creates a row with all audit fields."""
|
||||||
|
with app.app_context():
|
||||||
|
db.create_all()
|
||||||
|
try:
|
||||||
|
user = _make_user()
|
||||||
|
log = ConsentLog(
|
||||||
|
user_id=user.id,
|
||||||
|
consent_type='cgu',
|
||||||
|
version='1.0',
|
||||||
|
granted=True,
|
||||||
|
ip_address='192.0.2.1',
|
||||||
|
user_agent='Mozilla/5.0 (Windows NT 10.0)'
|
||||||
|
)
|
||||||
|
db.session.add(log)
|
||||||
|
db.session.commit()
|
||||||
|
assert ConsentLog.query.count() == 1
|
||||||
|
assert log.granted_at is not None
|
||||||
|
assert log.granted_at <= datetime.utcnow()
|
||||||
|
assert log.revoked_at is None
|
||||||
|
finally:
|
||||||
|
db.session.rollback()
|
||||||
|
db.drop_all()
|
||||||
|
|
||||||
|
|
||||||
|
def test_consent_log_supports_4_consent_types():
|
||||||
|
"""All 4 Loi 25 consent types (cgu, confidentialite, marketing, analytics) are valid."""
|
||||||
|
with app.app_context():
|
||||||
|
db.create_all()
|
||||||
|
try:
|
||||||
|
user = _make_user()
|
||||||
|
for consent_type in ('cgu', 'confidentialite', 'marketing', 'analytics'):
|
||||||
|
log = ConsentLog(
|
||||||
|
user_id=user.id,
|
||||||
|
consent_type=consent_type,
|
||||||
|
version='1.0',
|
||||||
|
granted=True,
|
||||||
|
ip_address='192.0.2.1',
|
||||||
|
user_agent='Mozilla/5.0'
|
||||||
|
)
|
||||||
|
db.session.add(log)
|
||||||
|
db.session.commit()
|
||||||
|
types = sorted([l.consent_type for l in ConsentLog.query.all()])
|
||||||
|
assert types == ['analytics', 'cgu', 'confidentialite', 'marketing']
|
||||||
|
finally:
|
||||||
|
db.session.rollback()
|
||||||
|
db.drop_all()
|
||||||
|
|
||||||
|
|
||||||
|
def test_consent_log_revoke_creates_separate_row():
|
||||||
|
"""Revoking later does NOT mutate the grant — both rows persist for audit trail."""
|
||||||
|
with app.app_context():
|
||||||
|
db.create_all()
|
||||||
|
try:
|
||||||
|
user = _make_user()
|
||||||
|
grant = ConsentLog(
|
||||||
|
user_id=user.id, consent_type='marketing', version='1.0',
|
||||||
|
granted=True, ip_address='192.0.2.1', user_agent='UA'
|
||||||
|
)
|
||||||
|
db.session.add(grant)
|
||||||
|
db.session.commit()
|
||||||
|
|
||||||
|
revoke = ConsentLog(
|
||||||
|
user_id=user.id, consent_type='marketing', version='1.0',
|
||||||
|
granted=False, ip_address='192.0.2.1', user_agent='UA',
|
||||||
|
revoked_at=datetime.utcnow()
|
||||||
|
)
|
||||||
|
db.session.add(revoke)
|
||||||
|
db.session.commit()
|
||||||
|
|
||||||
|
rows = ConsentLog.query.filter_by(consent_type='marketing').order_by(ConsentLog.id).all()
|
||||||
|
assert len(rows) == 2, "Grant and revoke must each have their own row (audit trail)"
|
||||||
|
assert rows[0].granted is True and rows[0].revoked_at is None
|
||||||
|
assert rows[1].granted is False and rows[1].revoked_at is not None
|
||||||
|
finally:
|
||||||
|
db.session.rollback()
|
||||||
|
db.drop_all()
|
||||||
|
|
||||||
|
|
||||||
|
def test_consent_log_user_backref():
|
||||||
|
"""User.consent_logs backref returns the user's consent history."""
|
||||||
|
with app.app_context():
|
||||||
|
db.create_all()
|
||||||
|
try:
|
||||||
|
user = _make_user()
|
||||||
|
for ct in ('cgu', 'confidentialite'):
|
||||||
|
db.session.add(ConsentLog(
|
||||||
|
user_id=user.id, consent_type=ct, version='1.0',
|
||||||
|
granted=True, ip_address='192.0.2.1', user_agent='UA'
|
||||||
|
))
|
||||||
|
db.session.commit()
|
||||||
|
assert len(user.consent_logs) == 2
|
||||||
|
assert sorted([l.consent_type for l in user.consent_logs]) == ['cgu', 'confidentialite']
|
||||||
|
finally:
|
||||||
|
db.session.rollback()
|
||||||
|
db.drop_all()
|
||||||
|
|
||||||
|
|
||||||
|
def test_consent_log_requires_ip_and_user_agent():
|
||||||
|
"""ip_address and user_agent are NOT NULL — required for Loi 25 traceability."""
|
||||||
|
with app.app_context():
|
||||||
|
db.create_all()
|
||||||
|
try:
|
||||||
|
user = _make_user()
|
||||||
|
# Missing ip_address — should fail at flush
|
||||||
|
log = ConsentLog(
|
||||||
|
user_id=user.id, consent_type='cgu', version='1.0',
|
||||||
|
granted=True, user_agent='UA'
|
||||||
|
# ip_address intentionally omitted
|
||||||
|
)
|
||||||
|
db.session.add(log)
|
||||||
|
try:
|
||||||
|
db.session.commit()
|
||||||
|
raise AssertionError("Expected IntegrityError on missing ip_address")
|
||||||
|
except Exception as e:
|
||||||
|
assert 'ip_address' in str(e).lower() or 'NOT NULL' in str(e) or 'integrity' in str(e).lower()
|
||||||
|
db.session.rollback()
|
||||||
|
finally:
|
||||||
|
db.session.rollback()
|
||||||
|
db.drop_all()
|
||||||
|
|
||||||
|
|
||||||
|
def test_user_has_new_b21_fields():
|
||||||
|
"""User model gained: totp_secret, totp_enabled, webauthn_credentials, ordre_pro, cabinet, stripe_customer_id, subscription_status."""
|
||||||
|
with app.app_context():
|
||||||
|
db.create_all()
|
||||||
|
try:
|
||||||
|
user = _make_user()
|
||||||
|
user.totp_secret = 'JBSWY3DPEHPK3PXP'
|
||||||
|
user.totp_enabled = True
|
||||||
|
user.webauthn_credentials = [{'id': 'cred1', 'public_key': 'abc'}]
|
||||||
|
user.ordre_pro = 'barreau'
|
||||||
|
user.cabinet = 'Cabinet Pilote A'
|
||||||
|
user.stripe_customer_id = 'cus_TestCustomerId'
|
||||||
|
user.subscription_status = 'active'
|
||||||
|
db.session.commit()
|
||||||
|
|
||||||
|
fetched = User.query.filter_by(username='alice').first()
|
||||||
|
assert fetched.totp_secret == 'JBSWY3DPEHPK3PXP'
|
||||||
|
assert fetched.totp_enabled is True
|
||||||
|
assert fetched.webauthn_credentials == [{'id': 'cred1', 'public_key': 'abc'}]
|
||||||
|
assert fetched.ordre_pro == 'barreau'
|
||||||
|
assert fetched.cabinet == 'Cabinet Pilote A'
|
||||||
|
assert fetched.stripe_customer_id == 'cus_TestCustomerId'
|
||||||
|
assert fetched.subscription_status == 'active'
|
||||||
|
finally:
|
||||||
|
db.session.rollback()
|
||||||
|
db.drop_all()
|
||||||
|
|
||||||
|
|
||||||
|
def test_user_b21_fields_default_to_safe_values():
|
||||||
|
"""New User defaults: totp_enabled=False, others None."""
|
||||||
|
with app.app_context():
|
||||||
|
db.create_all()
|
||||||
|
try:
|
||||||
|
user = _make_user(username='bob', email='bob@example.com')
|
||||||
|
assert user.totp_enabled is False, "totp_enabled must default to False (no MFA bypass)"
|
||||||
|
assert user.totp_secret is None
|
||||||
|
assert user.webauthn_credentials is None
|
||||||
|
assert user.stripe_customer_id is None
|
||||||
|
assert user.subscription_status is None
|
||||||
|
assert user.ordre_pro is None
|
||||||
|
assert user.cabinet is None
|
||||||
|
finally:
|
||||||
|
db.session.rollback()
|
||||||
|
db.drop_all()
|
||||||
Reference in New Issue
Block a user