Files
dictia-public/tests/test_audit.py

332 lines
12 KiB
Python

#!/usr/bin/env python3
"""
Tests for the Loi 25 audit system.
Covers:
- audit_access(): adds to session, does NOT commit
- audit_login(): commits independently
- audit_failed_login(): commits, uses email_hash (not plain email)
- _is_recent_duplicate(): deduplication window
- get_access_logs() / get_auth_logs(): pagination and filters
- Admin API endpoints: /api/admin/audit/status, /access, /auth
"""
import os
import sys
import hashlib
# Add the parent directory to the path to import app
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
os.environ.setdefault('SQLALCHEMY_DATABASE_URI', 'sqlite:///:memory:')
os.environ.setdefault('ENABLE_AUDIT_LOG', 'true')
os.environ['ENABLE_AUDIT_LOG'] = 'true'
from src.app import app, db
from src.models import User
from src.models.access_log import AccessLog
from src.models.auth_log import AuthLog
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_user(username, is_admin=False):
user = User(username=username, email=f"{username}@test.local", is_admin=is_admin)
user.set_password("TestPass1!")
db.session.add(user)
db.session.commit()
return user
def _login_client(client, user):
"""Push a real Flask-Login session for a user via test request context."""
from flask_login import login_user
with client.session_transaction() as sess:
pass
# Use the test client's post to login
with app.test_request_context():
login_user(user)
# Directly inject the user_id into the session cookie
with client.session_transaction() as sess:
sess['_user_id'] = str(user.id)
sess['_fresh'] = True
# ---------------------------------------------------------------------------
# Service-level tests
# ---------------------------------------------------------------------------
def test_audit_access_no_commit():
"""audit_access() adds to session but does NOT commit."""
with app.app_context():
db.create_all()
user = _make_user("audit_no_commit")
try:
initial_count = AccessLog.query.count()
from src.services.audit import audit_access
with app.test_request_context():
from flask_login import login_user
login_user(user)
log = audit_access('edit', 'recording', 1, user_id=user.id)
# Log object returned but not yet in DB (no commit happened)
assert log is not None, "audit_access should return a log object"
# The session hasn't been committed, so count is still the same
# (SQLite in :memory: — flush to verify it's in session)
db.session.flush()
assert AccessLog.query.count() == initial_count + 1, "Log should be in session after flush"
db.session.rollback() # undo — simulates caller rollback
assert AccessLog.query.count() == initial_count, "Log should be gone after rollback"
print("✅ audit_access() does not commit")
return True
finally:
db.session.delete(user)
db.session.commit()
def test_audit_login_commits():
"""audit_login() commits its own transaction."""
with app.app_context():
db.create_all()
user = _make_user("audit_login_user")
try:
initial_count = AuthLog.query.count()
from src.services.audit import audit_login
with app.test_request_context():
audit_login(user.id)
# Should be committed — visible in a fresh query
assert AuthLog.query.count() == initial_count + 1, "auth log should be committed"
log = AuthLog.query.order_by(AuthLog.id.desc()).first()
assert log.action == 'login'
assert log.user_id == user.id
print("✅ audit_login() commits independently")
return True
finally:
AuthLog.query.filter_by(user_id=user.id).delete()
db.session.delete(user)
db.session.commit()
def test_audit_failed_login_uses_email_hash():
"""audit_failed_login() stores email_hash, not plain email."""
with app.app_context():
db.create_all()
initial_count = AuthLog.query.count()
email = "attacker-target@example.com"
email_hash = hashlib.sha256(email.lower().encode()).hexdigest()[:16]
from src.services.audit import audit_failed_login
with app.test_request_context():
audit_failed_login(details={'email_hash': email_hash, 'reason': 'wrong_password'})
assert AuthLog.query.count() == initial_count + 1
log = AuthLog.query.order_by(AuthLog.id.desc()).first()
assert log.action == 'failed_login'
assert log.details is not None
assert 'email_hash' in log.details, "Should store email_hash, not plain email"
assert 'email' not in log.details, "Should NOT store plain email"
assert log.details['email_hash'] == email_hash
# Cleanup
db.session.delete(log)
db.session.commit()
print("✅ audit_failed_login() stores email_hash, not plain email")
return True
def test_audit_view_deduplication():
"""audit_view() on the same resource within 5 min creates only one log entry."""
with app.app_context():
db.create_all()
user = _make_user("audit_dedup_user")
try:
initial_count = AccessLog.query.count()
from src.services.audit import audit_view
with app.test_request_context():
from flask_login import login_user
login_user(user)
# First view — should log
log1 = audit_view('recording', 42, user_id=user.id)
db.session.commit()
# Second view within 5 min — should be deduped
log2 = audit_view('recording', 42, user_id=user.id)
if log2 is not None:
db.session.commit()
count_after = AccessLog.query.filter_by(
user_id=user.id, action='view', resource_type='recording', resource_id=42
).count()
assert count_after == 1, f"Expected 1 log entry, got {count_after} (dedup failed)"
print("✅ audit_view() deduplication works (5-min window)")
return True
finally:
AccessLog.query.filter_by(user_id=user.id).delete()
db.session.delete(user)
db.session.commit()
def test_get_access_logs_pagination():
"""get_access_logs() returns paginated results."""
with app.app_context():
db.create_all()
user = _make_user("audit_pag_user")
try:
# Create 5 access log entries
for i in range(5):
log = AccessLog.log_access(
action='view', resource_type='recording', resource_id=i,
user_id=user.id, status='success',
)
db.session.add(log)
db.session.commit()
from src.services.audit import get_access_logs
page1 = get_access_logs(page=1, per_page=3, user_id=user.id)
assert page1.total >= 5
assert len(page1.items) == 3
page2 = get_access_logs(page=2, per_page=3, user_id=user.id)
assert len(page2.items) >= 2
print("✅ get_access_logs() pagination works")
return True
finally:
AccessLog.query.filter_by(user_id=user.id).delete()
db.session.delete(user)
db.session.commit()
# ---------------------------------------------------------------------------
# Admin API endpoint tests
# ---------------------------------------------------------------------------
def test_audit_status_requires_admin():
"""GET /api/admin/audit/status: 401 anon, 403 non-admin, 200 admin."""
with app.app_context():
db.create_all()
regular = _make_user("audit_status_regular")
admin = _make_user("audit_status_admin", is_admin=True)
client = app.test_client()
try:
# Anonymous — should redirect to login (302) or 401
resp = client.get('/api/admin/audit/status')
assert resp.status_code in (401, 302), f"Expected 401/302 for anon, got {resp.status_code}"
# Regular user — should get 403
_login_client(client, regular)
resp = client.get('/api/admin/audit/status')
assert resp.status_code == 403, f"Expected 403 for non-admin, got {resp.status_code}"
# Admin — should get 200
_login_client(client, admin)
resp = client.get('/api/admin/audit/status')
assert resp.status_code == 200, f"Expected 200 for admin, got {resp.status_code}"
data = resp.get_json()
assert 'enabled' in data
print("✅ /api/admin/audit/status access control works")
return True
finally:
db.session.delete(regular)
db.session.delete(admin)
db.session.commit()
def test_audit_access_logs_endpoint():
"""GET /api/admin/audit/access returns paginated logs for admin."""
with app.app_context():
db.create_all()
admin = _make_user("audit_access_ep_admin", is_admin=True)
client = app.test_client()
try:
_login_client(client, admin)
resp = client.get('/api/admin/audit/access?per_page=10')
assert resp.status_code == 200, f"Expected 200, got {resp.status_code}"
data = resp.get_json()
assert 'logs' in data
assert 'total' in data
assert 'page' in data
print("✅ /api/admin/audit/access returns correct structure")
return True
finally:
db.session.delete(admin)
db.session.commit()
def test_audit_auth_logs_endpoint():
"""GET /api/admin/audit/auth returns paginated auth logs for admin."""
with app.app_context():
db.create_all()
admin = _make_user("audit_auth_ep_admin", is_admin=True)
client = app.test_client()
try:
_login_client(client, admin)
resp = client.get('/api/admin/audit/auth?per_page=10')
assert resp.status_code == 200, f"Expected 200, got {resp.status_code}"
data = resp.get_json()
assert 'logs' in data
assert 'total' in data
print("✅ /api/admin/audit/auth returns correct structure")
return True
finally:
db.session.delete(admin)
db.session.commit()
# ---------------------------------------------------------------------------
# Runner
# ---------------------------------------------------------------------------
def main():
print("🚀 Running audit system tests...\n")
tests = [
test_audit_access_no_commit,
test_audit_login_commits,
test_audit_failed_login_uses_email_hash,
test_audit_view_deduplication,
test_get_access_logs_pagination,
test_audit_status_requires_admin,
test_audit_access_logs_endpoint,
test_audit_auth_logs_endpoint,
]
passed = 0
failed = 0
for test in tests:
try:
result = test()
if result:
passed += 1
else:
print(f"{test.__name__} returned False")
failed += 1
except Exception as e:
print(f"{test.__name__} raised: {e}")
import traceback
traceback.print_exc()
failed += 1
print(f"\n{'='*40}")
print(f"Results: {passed} passed, {failed} failed")
print("✅ ALL PASS" if failed == 0 else "❌ SOME FAILED")
sys.exit(0 if failed == 0 else 1)
if __name__ == "__main__":
main()