#!/usr/bin/env python3 """ Tests for the Loi 25 audit system. Covers: - audit_access(): adds to session, does NOT commit - audit_login(): commits independently - audit_failed_login(): commits, uses email_hash (not plain email) - _is_recent_duplicate(): deduplication window - get_access_logs() / get_auth_logs(): pagination and filters - Admin API endpoints: /api/admin/audit/status, /access, /auth """ import os import sys import hashlib # Add the parent directory to the path to import app sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) os.environ.setdefault('SQLALCHEMY_DATABASE_URI', 'sqlite:///:memory:') os.environ.setdefault('ENABLE_AUDIT_LOG', 'true') os.environ['ENABLE_AUDIT_LOG'] = 'true' from src.app import app, db from src.models import User from src.models.access_log import AccessLog from src.models.auth_log import AuthLog # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _make_user(username, is_admin=False): user = User(username=username, email=f"{username}@test.local", is_admin=is_admin) user.set_password("TestPass1!") db.session.add(user) db.session.commit() return user def _login_client(client, user): """Push a real Flask-Login session for a user via test request context.""" from flask_login import login_user with client.session_transaction() as sess: pass # Use the test client's post to login with app.test_request_context(): login_user(user) # Directly inject the user_id into the session cookie with client.session_transaction() as sess: sess['_user_id'] = str(user.id) sess['_fresh'] = True # --------------------------------------------------------------------------- # Service-level tests # --------------------------------------------------------------------------- def test_audit_access_no_commit(): """audit_access() adds to session but does NOT commit.""" with app.app_context(): db.create_all() user = _make_user("audit_no_commit") try: initial_count = AccessLog.query.count() from src.services.audit import audit_access with app.test_request_context(): from flask_login import login_user login_user(user) log = audit_access('edit', 'recording', 1, user_id=user.id) # Log object returned but not yet in DB (no commit happened) assert log is not None, "audit_access should return a log object" # The session hasn't been committed, so count is still the same # (SQLite in :memory: — flush to verify it's in session) db.session.flush() assert AccessLog.query.count() == initial_count + 1, "Log should be in session after flush" db.session.rollback() # undo — simulates caller rollback assert AccessLog.query.count() == initial_count, "Log should be gone after rollback" print("✅ audit_access() does not commit") return True finally: db.session.delete(user) db.session.commit() def test_audit_login_commits(): """audit_login() commits its own transaction.""" with app.app_context(): db.create_all() user = _make_user("audit_login_user") try: initial_count = AuthLog.query.count() from src.services.audit import audit_login with app.test_request_context(): audit_login(user.id) # Should be committed — visible in a fresh query assert AuthLog.query.count() == initial_count + 1, "auth log should be committed" log = AuthLog.query.order_by(AuthLog.id.desc()).first() assert log.action == 'login' assert log.user_id == user.id print("✅ audit_login() commits independently") return True finally: AuthLog.query.filter_by(user_id=user.id).delete() db.session.delete(user) db.session.commit() def test_audit_failed_login_uses_email_hash(): """audit_failed_login() stores email_hash, not plain email.""" with app.app_context(): db.create_all() initial_count = AuthLog.query.count() email = "attacker-target@example.com" email_hash = hashlib.sha256(email.lower().encode()).hexdigest()[:16] from src.services.audit import audit_failed_login with app.test_request_context(): audit_failed_login(details={'email_hash': email_hash, 'reason': 'wrong_password'}) assert AuthLog.query.count() == initial_count + 1 log = AuthLog.query.order_by(AuthLog.id.desc()).first() assert log.action == 'failed_login' assert log.details is not None assert 'email_hash' in log.details, "Should store email_hash, not plain email" assert 'email' not in log.details, "Should NOT store plain email" assert log.details['email_hash'] == email_hash # Cleanup db.session.delete(log) db.session.commit() print("✅ audit_failed_login() stores email_hash, not plain email") return True def test_audit_view_deduplication(): """audit_view() on the same resource within 5 min creates only one log entry.""" with app.app_context(): db.create_all() user = _make_user("audit_dedup_user") try: initial_count = AccessLog.query.count() from src.services.audit import audit_view with app.test_request_context(): from flask_login import login_user login_user(user) # First view — should log log1 = audit_view('recording', 42, user_id=user.id) db.session.commit() # Second view within 5 min — should be deduped log2 = audit_view('recording', 42, user_id=user.id) if log2 is not None: db.session.commit() count_after = AccessLog.query.filter_by( user_id=user.id, action='view', resource_type='recording', resource_id=42 ).count() assert count_after == 1, f"Expected 1 log entry, got {count_after} (dedup failed)" print("✅ audit_view() deduplication works (5-min window)") return True finally: AccessLog.query.filter_by(user_id=user.id).delete() db.session.delete(user) db.session.commit() def test_get_access_logs_pagination(): """get_access_logs() returns paginated results.""" with app.app_context(): db.create_all() user = _make_user("audit_pag_user") try: # Create 5 access log entries for i in range(5): log = AccessLog.log_access( action='view', resource_type='recording', resource_id=i, user_id=user.id, status='success', ) db.session.add(log) db.session.commit() from src.services.audit import get_access_logs page1 = get_access_logs(page=1, per_page=3, user_id=user.id) assert page1.total >= 5 assert len(page1.items) == 3 page2 = get_access_logs(page=2, per_page=3, user_id=user.id) assert len(page2.items) >= 2 print("✅ get_access_logs() pagination works") return True finally: AccessLog.query.filter_by(user_id=user.id).delete() db.session.delete(user) db.session.commit() # --------------------------------------------------------------------------- # Admin API endpoint tests # --------------------------------------------------------------------------- def test_audit_status_requires_admin(): """GET /api/admin/audit/status: 401 anon, 403 non-admin, 200 admin.""" with app.app_context(): db.create_all() regular = _make_user("audit_status_regular") admin = _make_user("audit_status_admin", is_admin=True) client = app.test_client() try: # Anonymous — should redirect to login (302) or 401 resp = client.get('/api/admin/audit/status') assert resp.status_code in (401, 302), f"Expected 401/302 for anon, got {resp.status_code}" # Regular user — should get 403 _login_client(client, regular) resp = client.get('/api/admin/audit/status') assert resp.status_code == 403, f"Expected 403 for non-admin, got {resp.status_code}" # Admin — should get 200 _login_client(client, admin) resp = client.get('/api/admin/audit/status') assert resp.status_code == 200, f"Expected 200 for admin, got {resp.status_code}" data = resp.get_json() assert 'enabled' in data print("✅ /api/admin/audit/status access control works") return True finally: db.session.delete(regular) db.session.delete(admin) db.session.commit() def test_audit_access_logs_endpoint(): """GET /api/admin/audit/access returns paginated logs for admin.""" with app.app_context(): db.create_all() admin = _make_user("audit_access_ep_admin", is_admin=True) client = app.test_client() try: _login_client(client, admin) resp = client.get('/api/admin/audit/access?per_page=10') assert resp.status_code == 200, f"Expected 200, got {resp.status_code}" data = resp.get_json() assert 'logs' in data assert 'total' in data assert 'page' in data print("✅ /api/admin/audit/access returns correct structure") return True finally: db.session.delete(admin) db.session.commit() def test_audit_auth_logs_endpoint(): """GET /api/admin/audit/auth returns paginated auth logs for admin.""" with app.app_context(): db.create_all() admin = _make_user("audit_auth_ep_admin", is_admin=True) client = app.test_client() try: _login_client(client, admin) resp = client.get('/api/admin/audit/auth?per_page=10') assert resp.status_code == 200, f"Expected 200, got {resp.status_code}" data = resp.get_json() assert 'logs' in data assert 'total' in data print("✅ /api/admin/audit/auth returns correct structure") return True finally: db.session.delete(admin) db.session.commit() # --------------------------------------------------------------------------- # Runner # --------------------------------------------------------------------------- def main(): print("🚀 Running audit system tests...\n") tests = [ test_audit_access_no_commit, test_audit_login_commits, test_audit_failed_login_uses_email_hash, test_audit_view_deduplication, test_get_access_logs_pagination, test_audit_status_requires_admin, test_audit_access_logs_endpoint, test_audit_auth_logs_endpoint, ] passed = 0 failed = 0 for test in tests: try: result = test() if result: passed += 1 else: print(f"❌ {test.__name__} returned False") failed += 1 except Exception as e: print(f"❌ {test.__name__} raised: {e}") import traceback traceback.print_exc() failed += 1 print(f"\n{'='*40}") print(f"Results: {passed} passed, {failed} failed") print("✅ ALL PASS" if failed == 0 else "❌ SOME FAILED") sys.exit(0 if failed == 0 else 1) if __name__ == "__main__": main()