332 lines
12 KiB
Python
332 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Tests for the Loi 25 audit system.
|
|
|
|
Covers:
|
|
- audit_access(): adds to session, does NOT commit
|
|
- audit_login(): commits independently
|
|
- audit_failed_login(): commits, uses email_hash (not plain email)
|
|
- _is_recent_duplicate(): deduplication window
|
|
- get_access_logs() / get_auth_logs(): pagination and filters
|
|
- Admin API endpoints: /api/admin/audit/status, /access, /auth
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import hashlib
|
|
|
|
# Add the parent directory to the path to import app
|
|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
|
|
os.environ.setdefault('SQLALCHEMY_DATABASE_URI', 'sqlite:///:memory:')
|
|
os.environ.setdefault('ENABLE_AUDIT_LOG', 'true')
|
|
os.environ['ENABLE_AUDIT_LOG'] = 'true'
|
|
|
|
from src.app import app, db
|
|
from src.models import User
|
|
from src.models.access_log import AccessLog
|
|
from src.models.auth_log import AuthLog
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _make_user(username, is_admin=False):
|
|
user = User(username=username, email=f"{username}@test.local", is_admin=is_admin)
|
|
user.set_password("TestPass1!")
|
|
db.session.add(user)
|
|
db.session.commit()
|
|
return user
|
|
|
|
|
|
def _login_client(client, user):
|
|
"""Push a real Flask-Login session for a user via test request context."""
|
|
from flask_login import login_user
|
|
with client.session_transaction() as sess:
|
|
pass
|
|
# Use the test client's post to login
|
|
with app.test_request_context():
|
|
login_user(user)
|
|
# Directly inject the user_id into the session cookie
|
|
with client.session_transaction() as sess:
|
|
sess['_user_id'] = str(user.id)
|
|
sess['_fresh'] = True
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Service-level tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_audit_access_no_commit():
|
|
"""audit_access() adds to session but does NOT commit."""
|
|
with app.app_context():
|
|
db.create_all()
|
|
user = _make_user("audit_no_commit")
|
|
try:
|
|
initial_count = AccessLog.query.count()
|
|
|
|
from src.services.audit import audit_access
|
|
with app.test_request_context():
|
|
from flask_login import login_user
|
|
login_user(user)
|
|
log = audit_access('edit', 'recording', 1, user_id=user.id)
|
|
|
|
# Log object returned but not yet in DB (no commit happened)
|
|
assert log is not None, "audit_access should return a log object"
|
|
# The session hasn't been committed, so count is still the same
|
|
# (SQLite in :memory: — flush to verify it's in session)
|
|
db.session.flush()
|
|
assert AccessLog.query.count() == initial_count + 1, "Log should be in session after flush"
|
|
db.session.rollback() # undo — simulates caller rollback
|
|
assert AccessLog.query.count() == initial_count, "Log should be gone after rollback"
|
|
|
|
print("✅ audit_access() does not commit")
|
|
return True
|
|
finally:
|
|
db.session.delete(user)
|
|
db.session.commit()
|
|
|
|
|
|
def test_audit_login_commits():
|
|
"""audit_login() commits its own transaction."""
|
|
with app.app_context():
|
|
db.create_all()
|
|
user = _make_user("audit_login_user")
|
|
try:
|
|
initial_count = AuthLog.query.count()
|
|
|
|
from src.services.audit import audit_login
|
|
with app.test_request_context():
|
|
audit_login(user.id)
|
|
|
|
# Should be committed — visible in a fresh query
|
|
assert AuthLog.query.count() == initial_count + 1, "auth log should be committed"
|
|
log = AuthLog.query.order_by(AuthLog.id.desc()).first()
|
|
assert log.action == 'login'
|
|
assert log.user_id == user.id
|
|
|
|
print("✅ audit_login() commits independently")
|
|
return True
|
|
finally:
|
|
AuthLog.query.filter_by(user_id=user.id).delete()
|
|
db.session.delete(user)
|
|
db.session.commit()
|
|
|
|
|
|
def test_audit_failed_login_uses_email_hash():
|
|
"""audit_failed_login() stores email_hash, not plain email."""
|
|
with app.app_context():
|
|
db.create_all()
|
|
initial_count = AuthLog.query.count()
|
|
email = "attacker-target@example.com"
|
|
email_hash = hashlib.sha256(email.lower().encode()).hexdigest()[:16]
|
|
|
|
from src.services.audit import audit_failed_login
|
|
with app.test_request_context():
|
|
audit_failed_login(details={'email_hash': email_hash, 'reason': 'wrong_password'})
|
|
|
|
assert AuthLog.query.count() == initial_count + 1
|
|
log = AuthLog.query.order_by(AuthLog.id.desc()).first()
|
|
assert log.action == 'failed_login'
|
|
assert log.details is not None
|
|
assert 'email_hash' in log.details, "Should store email_hash, not plain email"
|
|
assert 'email' not in log.details, "Should NOT store plain email"
|
|
assert log.details['email_hash'] == email_hash
|
|
|
|
# Cleanup
|
|
db.session.delete(log)
|
|
db.session.commit()
|
|
|
|
print("✅ audit_failed_login() stores email_hash, not plain email")
|
|
return True
|
|
|
|
|
|
def test_audit_view_deduplication():
|
|
"""audit_view() on the same resource within 5 min creates only one log entry."""
|
|
with app.app_context():
|
|
db.create_all()
|
|
user = _make_user("audit_dedup_user")
|
|
try:
|
|
initial_count = AccessLog.query.count()
|
|
|
|
from src.services.audit import audit_view
|
|
with app.test_request_context():
|
|
from flask_login import login_user
|
|
login_user(user)
|
|
# First view — should log
|
|
log1 = audit_view('recording', 42, user_id=user.id)
|
|
db.session.commit()
|
|
# Second view within 5 min — should be deduped
|
|
log2 = audit_view('recording', 42, user_id=user.id)
|
|
if log2 is not None:
|
|
db.session.commit()
|
|
|
|
count_after = AccessLog.query.filter_by(
|
|
user_id=user.id, action='view', resource_type='recording', resource_id=42
|
|
).count()
|
|
assert count_after == 1, f"Expected 1 log entry, got {count_after} (dedup failed)"
|
|
|
|
print("✅ audit_view() deduplication works (5-min window)")
|
|
return True
|
|
finally:
|
|
AccessLog.query.filter_by(user_id=user.id).delete()
|
|
db.session.delete(user)
|
|
db.session.commit()
|
|
|
|
|
|
def test_get_access_logs_pagination():
|
|
"""get_access_logs() returns paginated results."""
|
|
with app.app_context():
|
|
db.create_all()
|
|
user = _make_user("audit_pag_user")
|
|
try:
|
|
# Create 5 access log entries
|
|
for i in range(5):
|
|
log = AccessLog.log_access(
|
|
action='view', resource_type='recording', resource_id=i,
|
|
user_id=user.id, status='success',
|
|
)
|
|
db.session.add(log)
|
|
db.session.commit()
|
|
|
|
from src.services.audit import get_access_logs
|
|
page1 = get_access_logs(page=1, per_page=3, user_id=user.id)
|
|
assert page1.total >= 5
|
|
assert len(page1.items) == 3
|
|
|
|
page2 = get_access_logs(page=2, per_page=3, user_id=user.id)
|
|
assert len(page2.items) >= 2
|
|
|
|
print("✅ get_access_logs() pagination works")
|
|
return True
|
|
finally:
|
|
AccessLog.query.filter_by(user_id=user.id).delete()
|
|
db.session.delete(user)
|
|
db.session.commit()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Admin API endpoint tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_audit_status_requires_admin():
|
|
"""GET /api/admin/audit/status: 401 anon, 403 non-admin, 200 admin."""
|
|
with app.app_context():
|
|
db.create_all()
|
|
regular = _make_user("audit_status_regular")
|
|
admin = _make_user("audit_status_admin", is_admin=True)
|
|
client = app.test_client()
|
|
|
|
try:
|
|
# Anonymous — should redirect to login (302) or 401
|
|
resp = client.get('/api/admin/audit/status')
|
|
assert resp.status_code in (401, 302), f"Expected 401/302 for anon, got {resp.status_code}"
|
|
|
|
# Regular user — should get 403
|
|
_login_client(client, regular)
|
|
resp = client.get('/api/admin/audit/status')
|
|
assert resp.status_code == 403, f"Expected 403 for non-admin, got {resp.status_code}"
|
|
|
|
# Admin — should get 200
|
|
_login_client(client, admin)
|
|
resp = client.get('/api/admin/audit/status')
|
|
assert resp.status_code == 200, f"Expected 200 for admin, got {resp.status_code}"
|
|
data = resp.get_json()
|
|
assert 'enabled' in data
|
|
|
|
print("✅ /api/admin/audit/status access control works")
|
|
return True
|
|
finally:
|
|
db.session.delete(regular)
|
|
db.session.delete(admin)
|
|
db.session.commit()
|
|
|
|
|
|
def test_audit_access_logs_endpoint():
|
|
"""GET /api/admin/audit/access returns paginated logs for admin."""
|
|
with app.app_context():
|
|
db.create_all()
|
|
admin = _make_user("audit_access_ep_admin", is_admin=True)
|
|
client = app.test_client()
|
|
|
|
try:
|
|
_login_client(client, admin)
|
|
resp = client.get('/api/admin/audit/access?per_page=10')
|
|
assert resp.status_code == 200, f"Expected 200, got {resp.status_code}"
|
|
data = resp.get_json()
|
|
assert 'logs' in data
|
|
assert 'total' in data
|
|
assert 'page' in data
|
|
|
|
print("✅ /api/admin/audit/access returns correct structure")
|
|
return True
|
|
finally:
|
|
db.session.delete(admin)
|
|
db.session.commit()
|
|
|
|
|
|
def test_audit_auth_logs_endpoint():
|
|
"""GET /api/admin/audit/auth returns paginated auth logs for admin."""
|
|
with app.app_context():
|
|
db.create_all()
|
|
admin = _make_user("audit_auth_ep_admin", is_admin=True)
|
|
client = app.test_client()
|
|
|
|
try:
|
|
_login_client(client, admin)
|
|
resp = client.get('/api/admin/audit/auth?per_page=10')
|
|
assert resp.status_code == 200, f"Expected 200, got {resp.status_code}"
|
|
data = resp.get_json()
|
|
assert 'logs' in data
|
|
assert 'total' in data
|
|
|
|
print("✅ /api/admin/audit/auth returns correct structure")
|
|
return True
|
|
finally:
|
|
db.session.delete(admin)
|
|
db.session.commit()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Runner
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def main():
|
|
print("🚀 Running audit system tests...\n")
|
|
tests = [
|
|
test_audit_access_no_commit,
|
|
test_audit_login_commits,
|
|
test_audit_failed_login_uses_email_hash,
|
|
test_audit_view_deduplication,
|
|
test_get_access_logs_pagination,
|
|
test_audit_status_requires_admin,
|
|
test_audit_access_logs_endpoint,
|
|
test_audit_auth_logs_endpoint,
|
|
]
|
|
|
|
passed = 0
|
|
failed = 0
|
|
for test in tests:
|
|
try:
|
|
result = test()
|
|
if result:
|
|
passed += 1
|
|
else:
|
|
print(f"❌ {test.__name__} returned False")
|
|
failed += 1
|
|
except Exception as e:
|
|
print(f"❌ {test.__name__} raised: {e}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
failed += 1
|
|
|
|
print(f"\n{'='*40}")
|
|
print(f"Results: {passed} passed, {failed} failed")
|
|
print("✅ ALL PASS" if failed == 0 else "❌ SOME FAILED")
|
|
sys.exit(0 if failed == 0 else 1)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|