feat(billing): B-2.8 Stripe webhook handler (subscription lifecycle + idempotency)
Endpoint: POST /checkout/webhooks/stripe (CSRF-exempt; signature-verified) Handles 5 Stripe events: - checkout.session.completed -> create Subscription, activate user - customer.subscription.updated -> sync status + current_period_end - customer.subscription.deleted -> mark canceled - invoice.payment_succeeded -> recover from past_due if applicable - invoice.payment_failed -> mark past_due Idempotency via WebhookEvent table (Stripe ID dedup) and Subscription unique constraint on stripe_subscription_id (defends against duplicate deliveries with distinct event IDs). User resolution prefers stripe_customer_id (server-set, anti-tamper) over event metadata.dictia_user_id over customer_email (per B-2.7 review note). New tables created via db.create_all(): - subscription (FK user.id ondelete=SET NULL for Loi 25 art. 28.1) - webhook_event (idempotency ledger) CSRF exemption wired via src/billing/exempt_webhook_csrf(csrf) called from src/app.py after billing_bp registration. Tests: 17/17 pass via tests/_run_stripe_webhook_windows.py. Existing 25 B-2.7 + 21 TOTP + 22 WebAuthn + 21 OAuth + 16 email tests unaffected. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -61,8 +61,20 @@
|
|||||||
# - One recurring yearly Price (CAD, = monthly × 12 × 0.85)
|
# - One recurring yearly Price (CAD, = monthly × 12 × 0.85)
|
||||||
# For DictIA 8 and DictIA 16, also create a one-time Price for the setup fee.
|
# For DictIA 8 and DictIA 16, also create a one-time Price for the setup fee.
|
||||||
#
|
#
|
||||||
# 5. Create a webhook endpoint (B-2.8) pointing at https://dictia.ca/webhooks/stripe
|
# 5. Create a webhook endpoint (B-2.8) pointing at
|
||||||
# with at least the events: checkout.session.completed,
|
# https://your-domain.example/checkout/webhooks/stripe
|
||||||
# customer.subscription.created, customer.subscription.updated,
|
# (the route lives under the /checkout/* prefix; CSRF-exempt; signature-
|
||||||
# customer.subscription.deleted, invoice.payment_failed.
|
# verified via STRIPE_WEBHOOK_SECRET below).
|
||||||
# Copy the signing secret into STRIPE_WEBHOOK_SECRET above.
|
#
|
||||||
|
# Subscribe at minimum to these 5 events (the only ones the handler
|
||||||
|
# processes; all others are acknowledged with 200 + ignored):
|
||||||
|
# - checkout.session.completed (creates Subscription row, sets
|
||||||
|
# User.subscription_status='active')
|
||||||
|
# - customer.subscription.updated (status / current_period_end sync)
|
||||||
|
# - customer.subscription.deleted (marks status='canceled')
|
||||||
|
# - invoice.payment_succeeded (renewal touch; recovers past_due)
|
||||||
|
# - invoice.payment_failed (marks status='past_due')
|
||||||
|
#
|
||||||
|
# Copy the signing secret (whsec_...) into STRIPE_WEBHOOK_SECRET above.
|
||||||
|
# Without that secret, the webhook endpoint returns 400 invalid_signature
|
||||||
|
# on every delivery (Stripe will retry for up to 30 days).
|
||||||
|
|||||||
@@ -647,6 +647,12 @@ app.register_blueprint(marketing_bp)
|
|||||||
app.register_blueprint(billing_bp)
|
app.register_blueprint(billing_bp)
|
||||||
app.register_blueprint(legal_bp)
|
app.register_blueprint(legal_bp)
|
||||||
|
|
||||||
|
# B-2.8: CSRF-exempt the Stripe webhook (signature-verified server-to-server).
|
||||||
|
# Must be called AFTER billing_bp is registered (so the view function exists
|
||||||
|
# in app.view_functions) and AFTER csrf is initialized (already done above).
|
||||||
|
from src.billing import exempt_webhook_csrf as _exempt_webhook_csrf
|
||||||
|
_exempt_webhook_csrf(csrf)
|
||||||
|
|
||||||
# Initialize Microsoft + Google OAuth providers (B-2.4) — no-op if env vars absent.
|
# Initialize Microsoft + Google OAuth providers (B-2.4) — no-op if env vars absent.
|
||||||
# Must run AFTER blueprints are registered (Authlib's OAuth object needs to be
|
# Must run AFTER blueprints are registered (Authlib's OAuth object needs to be
|
||||||
# attached to the running app instance).
|
# attached to the running app instance).
|
||||||
|
|||||||
@@ -1,8 +1,8 @@
|
|||||||
"""Billing blueprint - Stripe Checkout, webhook, subscription management.
|
"""Billing blueprint - Stripe Checkout, webhook, subscription management.
|
||||||
|
|
||||||
Mounted at /checkout/* prefix for the customer-facing checkout flow. The
|
Mounted at /checkout/* prefix for the customer-facing checkout flow.
|
||||||
/webhooks/stripe route (added in B-2.8) bypasses the prefix and is also
|
The webhook (B-2.8) is exposed at /checkout/webhooks/stripe and is
|
||||||
csrf-exempted.
|
CSRF-exempted via `exempt_webhook_csrf` (signature-verified instead).
|
||||||
|
|
||||||
Routes added in Tasks B-2.7 (checkout) and B-2.8 (webhook).
|
Routes added in Tasks B-2.7 (checkout) and B-2.8 (webhook).
|
||||||
"""
|
"""
|
||||||
@@ -22,3 +22,15 @@ billing_bp = Blueprint(
|
|||||||
# Import routes to register them on billing_bp. Must come after blueprint
|
# Import routes to register them on billing_bp. Must come after blueprint
|
||||||
# instantiation. Keep the # noqa comments — these guards exist for ruff/flake8.
|
# instantiation. Keep the # noqa comments — these guards exist for ruff/flake8.
|
||||||
from src.billing import routes # noqa: E402, F401
|
from src.billing import routes # noqa: E402, F401
|
||||||
|
from src.billing import webhooks # noqa: E402, F401
|
||||||
|
|
||||||
|
|
||||||
|
def exempt_webhook_csrf(csrf_protect):
|
||||||
|
"""Exempt the Stripe webhook view from CSRF protection.
|
||||||
|
|
||||||
|
Called from app.py after CSRFProtect is initialized. Stripe webhooks have
|
||||||
|
no CSRF token (server-to-server). The `stripe_webhook` view validates
|
||||||
|
Stripe's signature header (`Stripe-Signature` + STRIPE_WEBHOOK_SECRET) instead.
|
||||||
|
"""
|
||||||
|
from src.billing.webhooks import stripe_webhook
|
||||||
|
csrf_protect.exempt(stripe_webhook)
|
||||||
|
|||||||
@@ -5,8 +5,8 @@ URL space (prefix `/checkout`, set on billing_bp):
|
|||||||
- GET /checkout/success?session_id=... → confirmation page (async activation note)
|
- GET /checkout/success?session_id=... → confirmation page (async activation note)
|
||||||
- GET /checkout/cancel → friendly "no charge made" page
|
- GET /checkout/cancel → friendly "no charge made" page
|
||||||
|
|
||||||
The webhook route (B-2.8) is registered separately at /webhooks/stripe outside
|
The webhook route (B-2.8) is registered at /checkout/webhooks/stripe (under
|
||||||
the /checkout prefix and is CSRF-exempt.
|
the same blueprint prefix) and is CSRF-exempt (signature-verified instead).
|
||||||
"""
|
"""
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
|
|||||||
320
src/billing/webhooks.py
Normal file
320
src/billing/webhooks.py
Normal file
@@ -0,0 +1,320 @@
|
|||||||
|
"""Stripe webhook handler (B-2.8) — subscription lifecycle.
|
||||||
|
|
||||||
|
Endpoint: POST /checkout/webhooks/stripe (CSRF-exempt; signature verified)
|
||||||
|
|
||||||
|
Handled events:
|
||||||
|
- checkout.session.completed: create Subscription row, set User.subscription_status
|
||||||
|
- customer.subscription.updated: update status + current_period_end
|
||||||
|
- customer.subscription.deleted: mark status='canceled', clear User.subscription_status
|
||||||
|
- invoice.payment_succeeded: touch updated_at (renewal confirmation)
|
||||||
|
- invoice.payment_failed: set status='past_due'
|
||||||
|
|
||||||
|
All other event types are acknowledged with 200 but ignored.
|
||||||
|
|
||||||
|
Idempotency: every processed event ID is recorded in WebhookEvent.
|
||||||
|
Duplicate deliveries return 200 immediately without re-processing.
|
||||||
|
"""
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
import stripe
|
||||||
|
from flask import jsonify, request
|
||||||
|
|
||||||
|
from src.billing import billing_bp
|
||||||
|
from src.billing.plans import VALID_PERIODS, get_plan
|
||||||
|
from src.billing.stripe_client import is_stripe_configured
|
||||||
|
from src.database import db
|
||||||
|
from src.models import Subscription, User, WebhookEvent
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def get_webhook_secret() -> Optional[str]:
|
||||||
|
"""Return STRIPE_WEBHOOK_SECRET, or None if not configured."""
|
||||||
|
return os.environ.get('STRIPE_WEBHOOK_SECRET')
|
||||||
|
|
||||||
|
|
||||||
|
def is_webhook_configured() -> bool:
|
||||||
|
return bool(get_webhook_secret() and is_stripe_configured())
|
||||||
|
|
||||||
|
|
||||||
|
def _verify_event(payload: bytes, sig_header: str):
|
||||||
|
"""Validate Stripe signature and return the parsed event, or None on failure."""
|
||||||
|
secret = get_webhook_secret()
|
||||||
|
if not secret:
|
||||||
|
logger.error('STRIPE_WEBHOOK_SECRET not set; rejecting webhook')
|
||||||
|
return None
|
||||||
|
try:
|
||||||
|
return stripe.Webhook.construct_event(payload, sig_header, secret)
|
||||||
|
except ValueError:
|
||||||
|
logger.warning('Stripe webhook: invalid JSON payload')
|
||||||
|
return None
|
||||||
|
except stripe.error.SignatureVerificationError:
|
||||||
|
logger.warning('Stripe webhook: signature verification failed')
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def _is_duplicate(event_id: str) -> bool:
|
||||||
|
return WebhookEvent.query.filter_by(stripe_event_id=event_id).first() is not None
|
||||||
|
|
||||||
|
|
||||||
|
def _resolve_user_for_event(event_obj: dict) -> Optional[User]:
|
||||||
|
"""Resolve the DictIA User from a Stripe event object.
|
||||||
|
|
||||||
|
Trust order (anti-tamper per B-2.7 review note):
|
||||||
|
1. Look up by stripe_customer_id on the event object — this is server-set
|
||||||
|
by Stripe at customer creation, not user-controlled.
|
||||||
|
2. Fall back to event metadata 'dictia_user_id', re-validated against DB.
|
||||||
|
3. Fall back to customer_email lookup (last resort, rare for subscriptions).
|
||||||
|
"""
|
||||||
|
cust_id = event_obj.get('customer')
|
||||||
|
if cust_id:
|
||||||
|
user = User.query.filter_by(stripe_customer_id=cust_id).first()
|
||||||
|
if user:
|
||||||
|
return user
|
||||||
|
|
||||||
|
metadata = event_obj.get('metadata') or {}
|
||||||
|
raw_user_id = metadata.get('dictia_user_id')
|
||||||
|
if raw_user_id:
|
||||||
|
try:
|
||||||
|
uid = int(raw_user_id)
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
uid = None
|
||||||
|
if uid is not None:
|
||||||
|
user = db.session.get(User, uid)
|
||||||
|
if user:
|
||||||
|
# Bind stripe_customer_id if missing (defensive)
|
||||||
|
if not user.stripe_customer_id and cust_id:
|
||||||
|
user.stripe_customer_id = cust_id
|
||||||
|
return user
|
||||||
|
|
||||||
|
email = event_obj.get('customer_email')
|
||||||
|
if email:
|
||||||
|
user = User.query.filter_by(email=email.lower().strip()).first()
|
||||||
|
if user and cust_id and not user.stripe_customer_id:
|
||||||
|
user.stripe_customer_id = cust_id
|
||||||
|
return user
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def _resolve_plan_period(event_obj: dict, default_period: str = 'monthly') -> tuple:
|
||||||
|
"""Extract plan_slug and period from event metadata, validating both."""
|
||||||
|
metadata = event_obj.get('metadata') or {}
|
||||||
|
plan_slug = metadata.get('dictia_plan_slug')
|
||||||
|
period = metadata.get('dictia_period', default_period)
|
||||||
|
if get_plan(plan_slug) is None:
|
||||||
|
plan_slug = None # invalid / missing — leave for handler to log
|
||||||
|
if period not in VALID_PERIODS:
|
||||||
|
period = default_period
|
||||||
|
return plan_slug, period
|
||||||
|
|
||||||
|
|
||||||
|
def _ts_to_dt(ts) -> Optional[datetime]:
|
||||||
|
if ts is None:
|
||||||
|
return None
|
||||||
|
try:
|
||||||
|
return datetime.fromtimestamp(int(ts), tz=timezone.utc).replace(tzinfo=None)
|
||||||
|
except (TypeError, ValueError, OSError):
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def _record_event(event, sub_id: Optional[str], cust_id: Optional[str]) -> None:
|
||||||
|
"""Insert a WebhookEvent row marking this event as processed."""
|
||||||
|
db.session.add(WebhookEvent(
|
||||||
|
stripe_event_id=event.id,
|
||||||
|
event_type=event.type,
|
||||||
|
stripe_subscription_id=sub_id,
|
||||||
|
stripe_customer_id=cust_id,
|
||||||
|
))
|
||||||
|
|
||||||
|
|
||||||
|
def _handle_checkout_session_completed(event) -> None:
|
||||||
|
obj = event.data.object # stripe.checkout.Session
|
||||||
|
user = _resolve_user_for_event(obj)
|
||||||
|
sub_id = obj.get('subscription')
|
||||||
|
cust_id = obj.get('customer')
|
||||||
|
plan_slug, period = _resolve_plan_period(obj)
|
||||||
|
|
||||||
|
if not user:
|
||||||
|
logger.warning('checkout.session.completed: no user for cust=%s sub=%s', cust_id, sub_id)
|
||||||
|
_record_event(event, sub_id, cust_id)
|
||||||
|
return
|
||||||
|
if not sub_id:
|
||||||
|
logger.warning('checkout.session.completed: missing subscription id for user %s', user.id)
|
||||||
|
_record_event(event, sub_id, cust_id)
|
||||||
|
return
|
||||||
|
if not plan_slug:
|
||||||
|
logger.warning('checkout.session.completed: missing/invalid plan_slug metadata for sub=%s', sub_id)
|
||||||
|
plan_slug = 'unknown'
|
||||||
|
|
||||||
|
# Look up the existing subscription row (defensive against duplicate webhooks)
|
||||||
|
existing = Subscription.query.filter_by(stripe_subscription_id=sub_id).first()
|
||||||
|
now = datetime.utcnow()
|
||||||
|
if existing:
|
||||||
|
existing.status = 'active'
|
||||||
|
existing.updated_at = now
|
||||||
|
else:
|
||||||
|
# We need current_period_end — pull it from the subscription object
|
||||||
|
# if the event includes it; otherwise leave None and let
|
||||||
|
# customer.subscription.updated fill it in.
|
||||||
|
period_end = None
|
||||||
|
# Fetch the subscription via Stripe API for accurate period_end
|
||||||
|
try:
|
||||||
|
from src.billing.stripe_client import _ensure_configured
|
||||||
|
_ensure_configured()
|
||||||
|
sub_obj = stripe.Subscription.retrieve(sub_id)
|
||||||
|
period_end = _ts_to_dt(sub_obj.get('current_period_end'))
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning('Could not fetch subscription %s for period_end: %s', sub_id, e)
|
||||||
|
db.session.add(Subscription(
|
||||||
|
user_id=user.id,
|
||||||
|
stripe_customer_id=cust_id,
|
||||||
|
stripe_subscription_id=sub_id,
|
||||||
|
plan_slug=plan_slug,
|
||||||
|
period=period,
|
||||||
|
status='active',
|
||||||
|
current_period_end=period_end,
|
||||||
|
created_at=now,
|
||||||
|
updated_at=now,
|
||||||
|
))
|
||||||
|
|
||||||
|
user.subscription_status = 'active'
|
||||||
|
if cust_id and not user.stripe_customer_id:
|
||||||
|
user.stripe_customer_id = cust_id
|
||||||
|
_record_event(event, sub_id, cust_id)
|
||||||
|
|
||||||
|
|
||||||
|
def _handle_subscription_updated(event) -> None:
|
||||||
|
obj = event.data.object # stripe.Subscription
|
||||||
|
sub_id = obj.get('id')
|
||||||
|
cust_id = obj.get('customer')
|
||||||
|
new_status = obj.get('status')
|
||||||
|
period_end = _ts_to_dt(obj.get('current_period_end'))
|
||||||
|
|
||||||
|
sub = Subscription.query.filter_by(stripe_subscription_id=sub_id).first()
|
||||||
|
user = _resolve_user_for_event({'customer': cust_id, 'metadata': obj.get('metadata') or {}})
|
||||||
|
|
||||||
|
now = datetime.utcnow()
|
||||||
|
if sub:
|
||||||
|
if new_status:
|
||||||
|
sub.status = new_status
|
||||||
|
if period_end:
|
||||||
|
sub.current_period_end = period_end
|
||||||
|
sub.updated_at = now
|
||||||
|
else:
|
||||||
|
# Webhook arrived before we created the row (race) — create defensively
|
||||||
|
plan_slug, period = _resolve_plan_period(obj)
|
||||||
|
db.session.add(Subscription(
|
||||||
|
user_id=user.id if user else None,
|
||||||
|
stripe_customer_id=cust_id,
|
||||||
|
stripe_subscription_id=sub_id,
|
||||||
|
plan_slug=plan_slug or 'unknown',
|
||||||
|
period=period,
|
||||||
|
status=new_status or 'unknown',
|
||||||
|
current_period_end=period_end,
|
||||||
|
created_at=now,
|
||||||
|
updated_at=now,
|
||||||
|
))
|
||||||
|
|
||||||
|
if user and new_status:
|
||||||
|
user.subscription_status = new_status
|
||||||
|
_record_event(event, sub_id, cust_id)
|
||||||
|
|
||||||
|
|
||||||
|
def _handle_subscription_deleted(event) -> None:
|
||||||
|
obj = event.data.object
|
||||||
|
sub_id = obj.get('id')
|
||||||
|
cust_id = obj.get('customer')
|
||||||
|
sub = Subscription.query.filter_by(stripe_subscription_id=sub_id).first()
|
||||||
|
user = _resolve_user_for_event({'customer': cust_id, 'metadata': obj.get('metadata') or {}})
|
||||||
|
now = datetime.utcnow()
|
||||||
|
if sub:
|
||||||
|
sub.status = 'canceled'
|
||||||
|
sub.updated_at = now
|
||||||
|
if user:
|
||||||
|
user.subscription_status = 'canceled'
|
||||||
|
_record_event(event, sub_id, cust_id)
|
||||||
|
|
||||||
|
|
||||||
|
def _handle_invoice_payment_succeeded(event) -> None:
|
||||||
|
obj = event.data.object # stripe.Invoice
|
||||||
|
sub_id = obj.get('subscription')
|
||||||
|
cust_id = obj.get('customer')
|
||||||
|
if sub_id:
|
||||||
|
sub = Subscription.query.filter_by(stripe_subscription_id=sub_id).first()
|
||||||
|
if sub:
|
||||||
|
sub.updated_at = datetime.utcnow()
|
||||||
|
if sub.status == 'past_due':
|
||||||
|
sub.status = 'active'
|
||||||
|
user = _resolve_user_for_event({'customer': cust_id, 'metadata': {}})
|
||||||
|
if user:
|
||||||
|
user.subscription_status = 'active'
|
||||||
|
_record_event(event, sub_id, cust_id)
|
||||||
|
|
||||||
|
|
||||||
|
def _handle_invoice_payment_failed(event) -> None:
|
||||||
|
obj = event.data.object
|
||||||
|
sub_id = obj.get('subscription')
|
||||||
|
cust_id = obj.get('customer')
|
||||||
|
if sub_id:
|
||||||
|
sub = Subscription.query.filter_by(stripe_subscription_id=sub_id).first()
|
||||||
|
user = _resolve_user_for_event({'customer': cust_id, 'metadata': {}})
|
||||||
|
if sub:
|
||||||
|
sub.status = 'past_due'
|
||||||
|
sub.updated_at = datetime.utcnow()
|
||||||
|
if user:
|
||||||
|
user.subscription_status = 'past_due'
|
||||||
|
_record_event(event, sub_id, cust_id)
|
||||||
|
|
||||||
|
|
||||||
|
_HANDLERS = {
|
||||||
|
'checkout.session.completed': _handle_checkout_session_completed,
|
||||||
|
'customer.subscription.updated': _handle_subscription_updated,
|
||||||
|
'customer.subscription.deleted': _handle_subscription_deleted,
|
||||||
|
'invoice.payment_succeeded': _handle_invoice_payment_succeeded,
|
||||||
|
'invoice.payment_failed': _handle_invoice_payment_failed,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@billing_bp.route('/webhooks/stripe', methods=['POST'])
|
||||||
|
def stripe_webhook():
|
||||||
|
"""Stripe webhook endpoint. Signature-verified; CSRF-exempt.
|
||||||
|
|
||||||
|
Returns 400 on signature failure (Stripe will retry); 200 otherwise
|
||||||
|
(even for unhandled event types, to acknowledge receipt).
|
||||||
|
"""
|
||||||
|
payload = request.get_data()
|
||||||
|
sig_header = request.headers.get('Stripe-Signature', '')
|
||||||
|
event = _verify_event(payload, sig_header)
|
||||||
|
if event is None:
|
||||||
|
return jsonify({'error': 'invalid_signature'}), 400
|
||||||
|
|
||||||
|
# Idempotency check
|
||||||
|
if _is_duplicate(event.id):
|
||||||
|
logger.info('Stripe webhook: duplicate event %s ignored', event.id)
|
||||||
|
return jsonify({'received': True, 'duplicate': True})
|
||||||
|
|
||||||
|
handler = _HANDLERS.get(event.type)
|
||||||
|
if handler is None:
|
||||||
|
# Unhandled event type — record + ack so Stripe stops retrying
|
||||||
|
_record_event(event, None, None)
|
||||||
|
try:
|
||||||
|
db.session.commit()
|
||||||
|
except Exception:
|
||||||
|
db.session.rollback()
|
||||||
|
return jsonify({'received': True, 'handled': False})
|
||||||
|
|
||||||
|
try:
|
||||||
|
handler(event)
|
||||||
|
db.session.commit()
|
||||||
|
except Exception as e:
|
||||||
|
logger.exception('Stripe webhook: handler for %s failed: %s', event.type, e)
|
||||||
|
db.session.rollback()
|
||||||
|
# Return 500 so Stripe retries — but only for genuine handler failures,
|
||||||
|
# not for malformed/unhandled events
|
||||||
|
return jsonify({'error': 'handler_failed'}), 500
|
||||||
|
|
||||||
|
return jsonify({'received': True})
|
||||||
@@ -34,6 +34,8 @@ from .processing_job import ProcessingJob
|
|||||||
from .token_usage import TokenUsage
|
from .token_usage import TokenUsage
|
||||||
from .transcription_usage import TranscriptionUsage
|
from .transcription_usage import TranscriptionUsage
|
||||||
from .consent import ConsentLog
|
from .consent import ConsentLog
|
||||||
|
from .subscription import Subscription
|
||||||
|
from .webhook_event import WebhookEvent
|
||||||
|
|
||||||
# Export all models
|
# Export all models
|
||||||
__all__ = [
|
__all__ = [
|
||||||
@@ -72,4 +74,7 @@ __all__ = [
|
|||||||
'TokenUsage',
|
'TokenUsage',
|
||||||
'TranscriptionUsage',
|
'TranscriptionUsage',
|
||||||
'ConsentLog',
|
'ConsentLog',
|
||||||
|
# Billing models (B-2.8)
|
||||||
|
'Subscription',
|
||||||
|
'WebhookEvent',
|
||||||
]
|
]
|
||||||
|
|||||||
55
src/models/subscription.py
Normal file
55
src/models/subscription.py
Normal file
@@ -0,0 +1,55 @@
|
|||||||
|
"""DictIA subscription model — Stripe subscription state mirror (B-2.8).
|
||||||
|
|
||||||
|
This table is updated EXCLUSIVELY by the Stripe webhook handler. Never
|
||||||
|
write to it from user-facing routes (Checkout creates the Stripe
|
||||||
|
subscription; webhook reflects its state into our DB).
|
||||||
|
|
||||||
|
Each row corresponds to one Stripe Subscription object. A user can have
|
||||||
|
multiple historical subscriptions (renewed, cancelled, re-subscribed).
|
||||||
|
"""
|
||||||
|
from datetime import datetime
|
||||||
|
from src.database import db
|
||||||
|
|
||||||
|
|
||||||
|
class Subscription(db.Model):
|
||||||
|
"""One row per Stripe Subscription. The active row for a user is the
|
||||||
|
one with status in ('active', 'trialing', 'past_due') ordered by
|
||||||
|
created_at DESC."""
|
||||||
|
__tablename__ = 'subscription'
|
||||||
|
|
||||||
|
id = db.Column(db.Integer, primary_key=True)
|
||||||
|
# Use ondelete=SET NULL so we keep historical billing records even if
|
||||||
|
# the user deletes their account (Loi 25 art. 28.1 right-to-erasure +
|
||||||
|
# accounting/tax retention obligations are reconciled by anonymizing
|
||||||
|
# rather than dropping the row).
|
||||||
|
user_id = db.Column(
|
||||||
|
db.Integer,
|
||||||
|
db.ForeignKey('user.id', ondelete='SET NULL'),
|
||||||
|
nullable=True,
|
||||||
|
index=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
stripe_customer_id = db.Column(db.String(120), nullable=False, index=True)
|
||||||
|
# Stripe subscription ID is unique — UNIQUE constraint also gives natural
|
||||||
|
# dedup against duplicate webhook deliveries of checkout.session.completed
|
||||||
|
stripe_subscription_id = db.Column(db.String(120), unique=True, nullable=False)
|
||||||
|
plan_slug = db.Column(db.String(40), nullable=False)
|
||||||
|
period = db.Column(db.String(10), nullable=False) # 'monthly' | 'yearly'
|
||||||
|
|
||||||
|
# Stripe subscription status: 'trialing' | 'active' | 'past_due' |
|
||||||
|
# 'canceled' | 'incomplete' | 'incomplete_expired' | 'unpaid' | 'paused'
|
||||||
|
status = db.Column(db.String(20), nullable=False, index=True)
|
||||||
|
|
||||||
|
# Period end: when next invoice will be billed (or when access expires
|
||||||
|
# if status='canceled' with cancel_at_period_end=True)
|
||||||
|
current_period_end = db.Column(db.DateTime, nullable=True)
|
||||||
|
|
||||||
|
# When the subscription was first created in Stripe
|
||||||
|
created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)
|
||||||
|
# Last time we received a webhook event updating this subscription
|
||||||
|
updated_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)
|
||||||
|
|
||||||
|
user = db.relationship('User', backref='subscriptions')
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return f'<Subscription {self.stripe_subscription_id} {self.status} {self.plan_slug}/{self.period}>'
|
||||||
28
src/models/webhook_event.py
Normal file
28
src/models/webhook_event.py
Normal file
@@ -0,0 +1,28 @@
|
|||||||
|
"""Stripe webhook event ledger (B-2.8) — for idempotent processing.
|
||||||
|
|
||||||
|
Stripe delivers webhook events at least once. We record the event ID on
|
||||||
|
first successful processing; subsequent deliveries with the same ID are
|
||||||
|
no-op'd. Records are NOT garbage-collected automatically — operations
|
||||||
|
team can prune events older than 30 days if storage becomes a concern
|
||||||
|
(Stripe also has a 30-day delivery retry policy).
|
||||||
|
"""
|
||||||
|
from datetime import datetime
|
||||||
|
from src.database import db
|
||||||
|
|
||||||
|
|
||||||
|
class WebhookEvent(db.Model):
|
||||||
|
"""One row per processed Stripe webhook event."""
|
||||||
|
__tablename__ = 'webhook_event'
|
||||||
|
|
||||||
|
id = db.Column(db.Integer, primary_key=True)
|
||||||
|
# Stripe event ID (`evt_xxx`) — primary dedup key
|
||||||
|
stripe_event_id = db.Column(db.String(80), unique=True, nullable=False, index=True)
|
||||||
|
event_type = db.Column(db.String(80), nullable=False, index=True)
|
||||||
|
processed_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)
|
||||||
|
# Optional: store the related stripe_subscription_id or stripe_customer_id
|
||||||
|
# for fast lookup during incident debugging. Both nullable.
|
||||||
|
stripe_subscription_id = db.Column(db.String(120), nullable=True, index=True)
|
||||||
|
stripe_customer_id = db.Column(db.String(120), nullable=True, index=True)
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return f'<WebhookEvent {self.stripe_event_id} {self.event_type}>'
|
||||||
74
tests/_run_stripe_webhook_windows.py
Normal file
74
tests/_run_stripe_webhook_windows.py
Normal file
@@ -0,0 +1,74 @@
|
|||||||
|
"""Windows manual driver for tests/test_stripe_webhook.py.
|
||||||
|
|
||||||
|
src/init_db.py imports `fcntl`, which is POSIX-only. On Windows we stub it
|
||||||
|
before src.app gets imported, then run each test_* function and report.
|
||||||
|
|
||||||
|
Run from the repo root:
|
||||||
|
py -3 tests/_run_stripe_webhook_windows.py
|
||||||
|
"""
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import types
|
||||||
|
import traceback
|
||||||
|
|
||||||
|
# 1) Stub fcntl BEFORE any import of src.* happens.
|
||||||
|
if 'fcntl' not in sys.modules:
|
||||||
|
fcntl_stub = types.ModuleType('fcntl')
|
||||||
|
fcntl_stub.LOCK_EX = 2
|
||||||
|
fcntl_stub.LOCK_NB = 4
|
||||||
|
fcntl_stub.LOCK_UN = 8
|
||||||
|
fcntl_stub.LOCK_SH = 1
|
||||||
|
fcntl_stub.flock = lambda *_args, **_kw: None
|
||||||
|
fcntl_stub.fcntl = lambda *_args, **_kw: 0
|
||||||
|
sys.modules['fcntl'] = fcntl_stub
|
||||||
|
|
||||||
|
# 2) Make repo root importable
|
||||||
|
HERE = os.path.dirname(os.path.abspath(__file__))
|
||||||
|
REPO = os.path.dirname(HERE)
|
||||||
|
sys.path.insert(0, REPO)
|
||||||
|
|
||||||
|
# 3) Test-friendly env defaults
|
||||||
|
os.environ.setdefault('SQLALCHEMY_DATABASE_URI', 'sqlite:///:memory:')
|
||||||
|
os.environ.setdefault('SECRET_KEY', 'test-secret-key-webhook')
|
||||||
|
os.environ.setdefault('ENABLE_EMAIL_VERIFICATION', 'false')
|
||||||
|
os.environ.setdefault('REQUIRE_EMAIL_VERIFICATION', 'false')
|
||||||
|
os.environ.setdefault('TRANSCRIPTION_BASE_URL', 'http://test-stub')
|
||||||
|
os.environ.setdefault('TRANSCRIPTION_API_KEY', 'test-stub')
|
||||||
|
os.environ.setdefault('RATELIMIT_ENABLED', 'false')
|
||||||
|
try:
|
||||||
|
sys.stdout.reconfigure(encoding='utf-8', errors='replace')
|
||||||
|
sys.stderr.reconfigure(encoding='utf-8', errors='replace')
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# 4) Import the test module and run every test_* function
|
||||||
|
import importlib.util # noqa: E402
|
||||||
|
spec = importlib.util.spec_from_file_location(
|
||||||
|
'test_stripe_webhook',
|
||||||
|
os.path.join(HERE, 'test_stripe_webhook.py'),
|
||||||
|
)
|
||||||
|
mod = importlib.util.module_from_spec(spec)
|
||||||
|
spec.loader.exec_module(mod)
|
||||||
|
|
||||||
|
tests = [(name, fn) for name, fn in vars(mod).items()
|
||||||
|
if name.startswith('test_') and callable(fn)]
|
||||||
|
|
||||||
|
passed = 0
|
||||||
|
failed = []
|
||||||
|
for name, fn in tests:
|
||||||
|
try:
|
||||||
|
fn()
|
||||||
|
print(f' PASS {name}')
|
||||||
|
passed += 1
|
||||||
|
except Exception as e: # noqa: BLE001
|
||||||
|
print(f' FAIL {name}: {type(e).__name__}: {e}')
|
||||||
|
failed.append((name, traceback.format_exc()))
|
||||||
|
|
||||||
|
total = len(tests)
|
||||||
|
print()
|
||||||
|
print(f'Result: {passed}/{total} passed, {len(failed)} failed')
|
||||||
|
if failed:
|
||||||
|
print('\n--- Failures ---\n')
|
||||||
|
for name, tb in failed:
|
||||||
|
print(f'### {name}\n{tb}\n')
|
||||||
|
sys.exit(0 if not failed else 1)
|
||||||
784
tests/test_stripe_webhook.py
Normal file
784
tests/test_stripe_webhook.py
Normal file
@@ -0,0 +1,784 @@
|
|||||||
|
"""Tests for B-2.8 — Stripe Webhook (subscription lifecycle + idempotency).
|
||||||
|
|
||||||
|
Covers:
|
||||||
|
- Signature verification (rejects missing/invalid; honors STRIPE_WEBHOOK_SECRET)
|
||||||
|
- Idempotency via WebhookEvent table
|
||||||
|
- Each event handler:
|
||||||
|
checkout.session.completed
|
||||||
|
customer.subscription.updated
|
||||||
|
customer.subscription.deleted
|
||||||
|
invoice.payment_succeeded
|
||||||
|
invoice.payment_failed
|
||||||
|
- User resolution order (stripe_customer_id → metadata → email)
|
||||||
|
- 500 retry path on handler exceptions
|
||||||
|
|
||||||
|
Mocks `_verify_event` (since real signatures need a real secret) and
|
||||||
|
`stripe.Subscription.retrieve` to avoid real Stripe API calls.
|
||||||
|
|
||||||
|
Note: pytest cannot collect this file on Windows native because src/init_db.py
|
||||||
|
imports `fcntl` (POSIX-only). Use tests/_run_stripe_webhook_windows.py.
|
||||||
|
"""
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
from unittest.mock import patch, MagicMock
|
||||||
|
|
||||||
|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||||
|
os.environ.setdefault('SQLALCHEMY_DATABASE_URI', 'sqlite:///:memory:')
|
||||||
|
os.environ.setdefault('SECRET_KEY', 'test-secret-key-webhook')
|
||||||
|
os.environ.setdefault('ENABLE_EMAIL_VERIFICATION', 'false')
|
||||||
|
os.environ.setdefault('REQUIRE_EMAIL_VERIFICATION', 'false')
|
||||||
|
os.environ.setdefault('RATELIMIT_ENABLED', 'false')
|
||||||
|
|
||||||
|
from src.app import app, db, bcrypt # noqa: E402
|
||||||
|
from src.models.user import User # noqa: E402
|
||||||
|
from src.models.subscription import Subscription # noqa: E402
|
||||||
|
from src.models.webhook_event import WebhookEvent # noqa: E402
|
||||||
|
|
||||||
|
|
||||||
|
_WEBHOOK_ENV_VARS = (
|
||||||
|
'STRIPE_SECRET_KEY',
|
||||||
|
'STRIPE_WEBHOOK_SECRET',
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _clear_webhook_env():
|
||||||
|
for k in _WEBHOOK_ENV_VARS:
|
||||||
|
os.environ.pop(k, None)
|
||||||
|
import stripe
|
||||||
|
stripe.api_key = None
|
||||||
|
|
||||||
|
|
||||||
|
def _disable_csrf():
|
||||||
|
app.config['WTF_CSRF_ENABLED'] = False
|
||||||
|
|
||||||
|
|
||||||
|
def _make_user(email='hookuser@example.qc.ca', password='Password!123',
|
||||||
|
username=None, name='Hook User',
|
||||||
|
stripe_customer_id=None,
|
||||||
|
subscription_status=None):
|
||||||
|
hashed = bcrypt.generate_password_hash(password).decode('utf-8')
|
||||||
|
u = User(
|
||||||
|
username=username or email.split('@', 1)[0][:20],
|
||||||
|
email=email,
|
||||||
|
password=hashed,
|
||||||
|
email_verified=True,
|
||||||
|
name=name,
|
||||||
|
stripe_customer_id=stripe_customer_id,
|
||||||
|
subscription_status=subscription_status,
|
||||||
|
)
|
||||||
|
db.session.add(u)
|
||||||
|
db.session.commit()
|
||||||
|
return u
|
||||||
|
|
||||||
|
|
||||||
|
def _make_event(event_type, obj_data, event_id='evt_test_123'):
|
||||||
|
"""Build a fake Stripe event for testing webhook handlers."""
|
||||||
|
return MagicMock(
|
||||||
|
id=event_id,
|
||||||
|
type=event_type,
|
||||||
|
data=MagicMock(object=obj_data),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _make_checkout_session(customer='cus_test', subscription='sub_test',
|
||||||
|
plan_slug='dictia-cloud', period='monthly',
|
||||||
|
email=None, user_id='1'):
|
||||||
|
return {
|
||||||
|
'id': 'cs_test_abc',
|
||||||
|
'customer': customer,
|
||||||
|
'subscription': subscription,
|
||||||
|
'customer_email': email,
|
||||||
|
'metadata': {
|
||||||
|
'dictia_user_id': user_id,
|
||||||
|
'dictia_plan_slug': plan_slug,
|
||||||
|
'dictia_period': period,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def _make_subscription_obj(sub_id='sub_test', customer='cus_test',
|
||||||
|
status='active', period_end=1730000000,
|
||||||
|
plan_slug='dictia-cloud', period='monthly'):
|
||||||
|
return {
|
||||||
|
'id': sub_id,
|
||||||
|
'customer': customer,
|
||||||
|
'status': status,
|
||||||
|
'current_period_end': period_end,
|
||||||
|
'metadata': {'dictia_plan_slug': plan_slug, 'dictia_period': period},
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def _make_invoice(sub_id='sub_test', customer='cus_test'):
|
||||||
|
return {
|
||||||
|
'id': 'in_test_123',
|
||||||
|
'customer': customer,
|
||||||
|
'subscription': sub_id,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def _post_webhook(client, payload=b'{}', signature='t=1,v1=fake'):
|
||||||
|
return client.post(
|
||||||
|
'/checkout/webhooks/stripe',
|
||||||
|
data=payload,
|
||||||
|
headers={'Stripe-Signature': signature, 'Content-Type': 'application/json'},
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
# 1-3. Signature verification
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
|
||||||
|
def test_webhook_rejects_missing_signature():
|
||||||
|
with app.app_context():
|
||||||
|
_disable_csrf()
|
||||||
|
_clear_webhook_env()
|
||||||
|
os.environ['STRIPE_SECRET_KEY'] = 'sk_test_fake'
|
||||||
|
os.environ['STRIPE_WEBHOOK_SECRET'] = 'whsec_fake'
|
||||||
|
db.create_all()
|
||||||
|
try:
|
||||||
|
with app.test_client() as client:
|
||||||
|
# No Stripe-Signature header
|
||||||
|
resp = client.post(
|
||||||
|
'/checkout/webhooks/stripe',
|
||||||
|
data=b'{}',
|
||||||
|
headers={'Content-Type': 'application/json'},
|
||||||
|
)
|
||||||
|
assert resp.status_code == 400
|
||||||
|
body = resp.get_json()
|
||||||
|
assert body.get('error') == 'invalid_signature'
|
||||||
|
finally:
|
||||||
|
db.session.rollback()
|
||||||
|
db.drop_all()
|
||||||
|
_clear_webhook_env()
|
||||||
|
|
||||||
|
|
||||||
|
def test_webhook_rejects_invalid_signature():
|
||||||
|
with app.app_context():
|
||||||
|
_disable_csrf()
|
||||||
|
_clear_webhook_env()
|
||||||
|
os.environ['STRIPE_SECRET_KEY'] = 'sk_test_fake'
|
||||||
|
os.environ['STRIPE_WEBHOOK_SECRET'] = 'whsec_fake'
|
||||||
|
db.create_all()
|
||||||
|
try:
|
||||||
|
with app.test_client() as client:
|
||||||
|
# Real construct_event will fail on bogus signature
|
||||||
|
resp = _post_webhook(client, payload=b'{"id":"evt_x"}',
|
||||||
|
signature='t=1,v1=bogus')
|
||||||
|
assert resp.status_code == 400
|
||||||
|
assert resp.get_json().get('error') == 'invalid_signature'
|
||||||
|
finally:
|
||||||
|
db.session.rollback()
|
||||||
|
db.drop_all()
|
||||||
|
_clear_webhook_env()
|
||||||
|
|
||||||
|
|
||||||
|
def test_webhook_rejects_when_secret_not_configured():
|
||||||
|
with app.app_context():
|
||||||
|
_disable_csrf()
|
||||||
|
_clear_webhook_env()
|
||||||
|
# NO STRIPE_WEBHOOK_SECRET
|
||||||
|
db.create_all()
|
||||||
|
try:
|
||||||
|
with app.test_client() as client:
|
||||||
|
resp = _post_webhook(client, payload=b'{"id":"evt_x"}')
|
||||||
|
assert resp.status_code == 400
|
||||||
|
assert resp.get_json().get('error') == 'invalid_signature'
|
||||||
|
finally:
|
||||||
|
db.session.rollback()
|
||||||
|
db.drop_all()
|
||||||
|
_clear_webhook_env()
|
||||||
|
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
# 4. Idempotency
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
|
||||||
|
def test_webhook_idempotent_on_duplicate_event_id():
|
||||||
|
with app.app_context():
|
||||||
|
_disable_csrf()
|
||||||
|
_clear_webhook_env()
|
||||||
|
os.environ['STRIPE_SECRET_KEY'] = 'sk_test_fake'
|
||||||
|
os.environ['STRIPE_WEBHOOK_SECRET'] = 'whsec_fake'
|
||||||
|
db.create_all()
|
||||||
|
try:
|
||||||
|
user = _make_user(email='dup@example.qc.ca',
|
||||||
|
stripe_customer_id='cus_dup')
|
||||||
|
event = _make_event(
|
||||||
|
'checkout.session.completed',
|
||||||
|
_make_checkout_session(customer='cus_dup',
|
||||||
|
subscription='sub_dup',
|
||||||
|
user_id=str(user.id)),
|
||||||
|
event_id='evt_dup_1',
|
||||||
|
)
|
||||||
|
with patch('src.billing.webhooks._verify_event') as mock_verify, \
|
||||||
|
patch('src.billing.webhooks.stripe.Subscription.retrieve') as mock_retr:
|
||||||
|
mock_verify.return_value = event
|
||||||
|
mock_retr.return_value = {
|
||||||
|
'current_period_end': 1730000000,
|
||||||
|
}
|
||||||
|
with app.test_client() as client:
|
||||||
|
# First call processes
|
||||||
|
resp1 = _post_webhook(client)
|
||||||
|
assert resp1.status_code == 200
|
||||||
|
assert resp1.get_json().get('received') is True
|
||||||
|
# Second call dedup'd
|
||||||
|
resp2 = _post_webhook(client)
|
||||||
|
assert resp2.status_code == 200
|
||||||
|
body2 = resp2.get_json()
|
||||||
|
assert body2.get('duplicate') is True
|
||||||
|
|
||||||
|
# Only one Subscription row + one WebhookEvent row
|
||||||
|
assert Subscription.query.filter_by(stripe_subscription_id='sub_dup').count() == 1
|
||||||
|
assert WebhookEvent.query.filter_by(stripe_event_id='evt_dup_1').count() == 1
|
||||||
|
finally:
|
||||||
|
db.session.rollback()
|
||||||
|
db.drop_all()
|
||||||
|
_clear_webhook_env()
|
||||||
|
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
# 5. Unhandled event types
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
|
||||||
|
def test_webhook_acks_unhandled_event_type():
|
||||||
|
with app.app_context():
|
||||||
|
_disable_csrf()
|
||||||
|
_clear_webhook_env()
|
||||||
|
os.environ['STRIPE_SECRET_KEY'] = 'sk_test_fake'
|
||||||
|
os.environ['STRIPE_WEBHOOK_SECRET'] = 'whsec_fake'
|
||||||
|
db.create_all()
|
||||||
|
try:
|
||||||
|
event = _make_event('customer.created', {'id': 'cus_new'},
|
||||||
|
event_id='evt_unhandled_1')
|
||||||
|
with patch('src.billing.webhooks._verify_event') as mock_verify:
|
||||||
|
mock_verify.return_value = event
|
||||||
|
with app.test_client() as client:
|
||||||
|
resp = _post_webhook(client)
|
||||||
|
assert resp.status_code == 200
|
||||||
|
body = resp.get_json()
|
||||||
|
assert body.get('received') is True
|
||||||
|
assert body.get('handled') is False
|
||||||
|
# Recorded so Stripe stops retrying
|
||||||
|
assert WebhookEvent.query.filter_by(stripe_event_id='evt_unhandled_1').count() == 1
|
||||||
|
finally:
|
||||||
|
db.session.rollback()
|
||||||
|
db.drop_all()
|
||||||
|
_clear_webhook_env()
|
||||||
|
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
# 6. checkout.session.completed — happy path
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
|
||||||
|
def test_checkout_session_completed_creates_subscription_and_sets_user_status():
|
||||||
|
with app.app_context():
|
||||||
|
_disable_csrf()
|
||||||
|
_clear_webhook_env()
|
||||||
|
os.environ['STRIPE_SECRET_KEY'] = 'sk_test_fake'
|
||||||
|
os.environ['STRIPE_WEBHOOK_SECRET'] = 'whsec_fake'
|
||||||
|
db.create_all()
|
||||||
|
try:
|
||||||
|
user = _make_user(email='create@example.qc.ca',
|
||||||
|
stripe_customer_id='cus_test')
|
||||||
|
event = _make_event(
|
||||||
|
'checkout.session.completed',
|
||||||
|
_make_checkout_session(customer='cus_test',
|
||||||
|
subscription='sub_test',
|
||||||
|
user_id=str(user.id)),
|
||||||
|
event_id='evt_create_1',
|
||||||
|
)
|
||||||
|
with patch('src.billing.webhooks._verify_event') as mock_verify, \
|
||||||
|
patch('src.billing.webhooks.stripe.Subscription.retrieve') as mock_retr:
|
||||||
|
mock_verify.return_value = event
|
||||||
|
mock_retr.return_value = {'current_period_end': 1730000000}
|
||||||
|
with app.test_client() as client:
|
||||||
|
resp = _post_webhook(client)
|
||||||
|
assert resp.status_code == 200
|
||||||
|
sub = Subscription.query.filter_by(stripe_subscription_id='sub_test').first()
|
||||||
|
assert sub is not None
|
||||||
|
assert sub.user_id == user.id
|
||||||
|
assert sub.plan_slug == 'dictia-cloud'
|
||||||
|
assert sub.period == 'monthly'
|
||||||
|
assert sub.status == 'active'
|
||||||
|
assert sub.current_period_end is not None
|
||||||
|
db.session.refresh(user)
|
||||||
|
assert user.subscription_status == 'active'
|
||||||
|
finally:
|
||||||
|
db.session.rollback()
|
||||||
|
db.drop_all()
|
||||||
|
_clear_webhook_env()
|
||||||
|
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
# 7. User resolution by stripe_customer_id
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
|
||||||
|
def test_checkout_session_completed_resolves_user_by_stripe_customer_id():
|
||||||
|
with app.app_context():
|
||||||
|
_disable_csrf()
|
||||||
|
_clear_webhook_env()
|
||||||
|
os.environ['STRIPE_SECRET_KEY'] = 'sk_test_fake'
|
||||||
|
os.environ['STRIPE_WEBHOOK_SECRET'] = 'whsec_fake'
|
||||||
|
db.create_all()
|
||||||
|
try:
|
||||||
|
user = _make_user(email='resolve@example.qc.ca',
|
||||||
|
stripe_customer_id='cus_resolve')
|
||||||
|
session_obj = _make_checkout_session(
|
||||||
|
customer='cus_resolve',
|
||||||
|
subscription='sub_resolve',
|
||||||
|
user_id='',
|
||||||
|
)
|
||||||
|
# No metadata.dictia_user_id
|
||||||
|
session_obj['metadata'].pop('dictia_user_id', None)
|
||||||
|
event = _make_event('checkout.session.completed', session_obj,
|
||||||
|
event_id='evt_resolve_1')
|
||||||
|
with patch('src.billing.webhooks._verify_event') as mock_verify, \
|
||||||
|
patch('src.billing.webhooks.stripe.Subscription.retrieve') as mock_retr:
|
||||||
|
mock_verify.return_value = event
|
||||||
|
mock_retr.return_value = {'current_period_end': 1730000000}
|
||||||
|
with app.test_client() as client:
|
||||||
|
resp = _post_webhook(client)
|
||||||
|
assert resp.status_code == 200
|
||||||
|
sub = Subscription.query.filter_by(stripe_subscription_id='sub_resolve').first()
|
||||||
|
assert sub is not None
|
||||||
|
assert sub.user_id == user.id
|
||||||
|
finally:
|
||||||
|
db.session.rollback()
|
||||||
|
db.drop_all()
|
||||||
|
_clear_webhook_env()
|
||||||
|
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
# 8. User resolution by metadata when customer unknown
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
|
||||||
|
def test_checkout_session_completed_resolves_user_by_metadata_when_customer_unknown():
|
||||||
|
with app.app_context():
|
||||||
|
_disable_csrf()
|
||||||
|
_clear_webhook_env()
|
||||||
|
os.environ['STRIPE_SECRET_KEY'] = 'sk_test_fake'
|
||||||
|
os.environ['STRIPE_WEBHOOK_SECRET'] = 'whsec_fake'
|
||||||
|
db.create_all()
|
||||||
|
try:
|
||||||
|
user = _make_user(email='meta@example.qc.ca',
|
||||||
|
stripe_customer_id=None)
|
||||||
|
event = _make_event(
|
||||||
|
'checkout.session.completed',
|
||||||
|
_make_checkout_session(customer='cus_new',
|
||||||
|
subscription='sub_meta',
|
||||||
|
user_id=str(user.id)),
|
||||||
|
event_id='evt_meta_1',
|
||||||
|
)
|
||||||
|
with patch('src.billing.webhooks._verify_event') as mock_verify, \
|
||||||
|
patch('src.billing.webhooks.stripe.Subscription.retrieve') as mock_retr:
|
||||||
|
mock_verify.return_value = event
|
||||||
|
mock_retr.return_value = {'current_period_end': 1730000000}
|
||||||
|
with app.test_client() as client:
|
||||||
|
resp = _post_webhook(client)
|
||||||
|
assert resp.status_code == 200
|
||||||
|
sub = Subscription.query.filter_by(stripe_subscription_id='sub_meta').first()
|
||||||
|
assert sub is not None
|
||||||
|
assert sub.user_id == user.id
|
||||||
|
db.session.refresh(user)
|
||||||
|
# stripe_customer_id is bound from the event
|
||||||
|
assert user.stripe_customer_id == 'cus_new'
|
||||||
|
finally:
|
||||||
|
db.session.rollback()
|
||||||
|
db.drop_all()
|
||||||
|
_clear_webhook_env()
|
||||||
|
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
# 9. Idempotency via subscription_id uniqueness (different event IDs, same sub)
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
|
||||||
|
def test_checkout_session_completed_idempotent_via_subscription_id_uniqueness():
|
||||||
|
with app.app_context():
|
||||||
|
_disable_csrf()
|
||||||
|
_clear_webhook_env()
|
||||||
|
os.environ['STRIPE_SECRET_KEY'] = 'sk_test_fake'
|
||||||
|
os.environ['STRIPE_WEBHOOK_SECRET'] = 'whsec_fake'
|
||||||
|
db.create_all()
|
||||||
|
try:
|
||||||
|
user = _make_user(email='subdup@example.qc.ca',
|
||||||
|
stripe_customer_id='cus_subdup')
|
||||||
|
session_obj = _make_checkout_session(
|
||||||
|
customer='cus_subdup',
|
||||||
|
subscription='sub_subdup',
|
||||||
|
user_id=str(user.id),
|
||||||
|
)
|
||||||
|
event_a = _make_event('checkout.session.completed', session_obj,
|
||||||
|
event_id='evt_subdup_a')
|
||||||
|
event_b = _make_event('checkout.session.completed', session_obj,
|
||||||
|
event_id='evt_subdup_b')
|
||||||
|
with patch('src.billing.webhooks._verify_event') as mock_verify, \
|
||||||
|
patch('src.billing.webhooks.stripe.Subscription.retrieve') as mock_retr:
|
||||||
|
mock_retr.return_value = {'current_period_end': 1730000000}
|
||||||
|
with app.test_client() as client:
|
||||||
|
mock_verify.return_value = event_a
|
||||||
|
resp1 = _post_webhook(client)
|
||||||
|
assert resp1.status_code == 200
|
||||||
|
mock_verify.return_value = event_b
|
||||||
|
resp2 = _post_webhook(client)
|
||||||
|
assert resp2.status_code == 200
|
||||||
|
|
||||||
|
# Only one Subscription row despite two distinct events
|
||||||
|
assert Subscription.query.filter_by(stripe_subscription_id='sub_subdup').count() == 1
|
||||||
|
# But both events recorded
|
||||||
|
assert WebhookEvent.query.filter_by(stripe_event_id='evt_subdup_a').count() == 1
|
||||||
|
assert WebhookEvent.query.filter_by(stripe_event_id='evt_subdup_b').count() == 1
|
||||||
|
finally:
|
||||||
|
db.session.rollback()
|
||||||
|
db.drop_all()
|
||||||
|
_clear_webhook_env()
|
||||||
|
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
# 10. customer.subscription.updated
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
|
||||||
|
def test_subscription_updated_updates_status_and_period_end():
|
||||||
|
with app.app_context():
|
||||||
|
_disable_csrf()
|
||||||
|
_clear_webhook_env()
|
||||||
|
os.environ['STRIPE_SECRET_KEY'] = 'sk_test_fake'
|
||||||
|
os.environ['STRIPE_WEBHOOK_SECRET'] = 'whsec_fake'
|
||||||
|
db.create_all()
|
||||||
|
try:
|
||||||
|
from datetime import datetime
|
||||||
|
user = _make_user(email='upd@example.qc.ca',
|
||||||
|
stripe_customer_id='cus_upd',
|
||||||
|
subscription_status='active')
|
||||||
|
existing = Subscription(
|
||||||
|
user_id=user.id,
|
||||||
|
stripe_customer_id='cus_upd',
|
||||||
|
stripe_subscription_id='sub_upd',
|
||||||
|
plan_slug='dictia-cloud',
|
||||||
|
period='monthly',
|
||||||
|
status='active',
|
||||||
|
current_period_end=datetime(2025, 1, 1),
|
||||||
|
)
|
||||||
|
db.session.add(existing)
|
||||||
|
db.session.commit()
|
||||||
|
|
||||||
|
event = _make_event(
|
||||||
|
'customer.subscription.updated',
|
||||||
|
_make_subscription_obj(sub_id='sub_upd', customer='cus_upd',
|
||||||
|
status='past_due',
|
||||||
|
period_end=1735689600), # 2025-01-01
|
||||||
|
event_id='evt_upd_1',
|
||||||
|
)
|
||||||
|
with patch('src.billing.webhooks._verify_event') as mock_verify:
|
||||||
|
mock_verify.return_value = event
|
||||||
|
with app.test_client() as client:
|
||||||
|
resp = _post_webhook(client)
|
||||||
|
assert resp.status_code == 200
|
||||||
|
db.session.refresh(existing)
|
||||||
|
assert existing.status == 'past_due'
|
||||||
|
assert existing.current_period_end is not None
|
||||||
|
db.session.refresh(user)
|
||||||
|
assert user.subscription_status == 'past_due'
|
||||||
|
finally:
|
||||||
|
db.session.rollback()
|
||||||
|
db.drop_all()
|
||||||
|
_clear_webhook_env()
|
||||||
|
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
# 11. customer.subscription.updated when row missing
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
|
||||||
|
def test_subscription_updated_creates_row_if_missing():
|
||||||
|
with app.app_context():
|
||||||
|
_disable_csrf()
|
||||||
|
_clear_webhook_env()
|
||||||
|
os.environ['STRIPE_SECRET_KEY'] = 'sk_test_fake'
|
||||||
|
os.environ['STRIPE_WEBHOOK_SECRET'] = 'whsec_fake'
|
||||||
|
db.create_all()
|
||||||
|
try:
|
||||||
|
user = _make_user(email='race@example.qc.ca',
|
||||||
|
stripe_customer_id='cus_race')
|
||||||
|
event = _make_event(
|
||||||
|
'customer.subscription.updated',
|
||||||
|
_make_subscription_obj(sub_id='sub_race', customer='cus_race',
|
||||||
|
status='active'),
|
||||||
|
event_id='evt_race_1',
|
||||||
|
)
|
||||||
|
with patch('src.billing.webhooks._verify_event') as mock_verify:
|
||||||
|
mock_verify.return_value = event
|
||||||
|
with app.test_client() as client:
|
||||||
|
resp = _post_webhook(client)
|
||||||
|
assert resp.status_code == 200
|
||||||
|
sub = Subscription.query.filter_by(stripe_subscription_id='sub_race').first()
|
||||||
|
assert sub is not None
|
||||||
|
assert sub.user_id == user.id
|
||||||
|
assert sub.status == 'active'
|
||||||
|
finally:
|
||||||
|
db.session.rollback()
|
||||||
|
db.drop_all()
|
||||||
|
_clear_webhook_env()
|
||||||
|
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
# 12. customer.subscription.deleted
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
|
||||||
|
def test_subscription_deleted_marks_canceled():
|
||||||
|
with app.app_context():
|
||||||
|
_disable_csrf()
|
||||||
|
_clear_webhook_env()
|
||||||
|
os.environ['STRIPE_SECRET_KEY'] = 'sk_test_fake'
|
||||||
|
os.environ['STRIPE_WEBHOOK_SECRET'] = 'whsec_fake'
|
||||||
|
db.create_all()
|
||||||
|
try:
|
||||||
|
user = _make_user(email='del@example.qc.ca',
|
||||||
|
stripe_customer_id='cus_del',
|
||||||
|
subscription_status='active')
|
||||||
|
existing = Subscription(
|
||||||
|
user_id=user.id,
|
||||||
|
stripe_customer_id='cus_del',
|
||||||
|
stripe_subscription_id='sub_del',
|
||||||
|
plan_slug='dictia-cloud',
|
||||||
|
period='monthly',
|
||||||
|
status='active',
|
||||||
|
)
|
||||||
|
db.session.add(existing)
|
||||||
|
db.session.commit()
|
||||||
|
|
||||||
|
event = _make_event(
|
||||||
|
'customer.subscription.deleted',
|
||||||
|
_make_subscription_obj(sub_id='sub_del', customer='cus_del',
|
||||||
|
status='canceled'),
|
||||||
|
event_id='evt_del_1',
|
||||||
|
)
|
||||||
|
with patch('src.billing.webhooks._verify_event') as mock_verify:
|
||||||
|
mock_verify.return_value = event
|
||||||
|
with app.test_client() as client:
|
||||||
|
resp = _post_webhook(client)
|
||||||
|
assert resp.status_code == 200
|
||||||
|
db.session.refresh(existing)
|
||||||
|
assert existing.status == 'canceled'
|
||||||
|
db.session.refresh(user)
|
||||||
|
assert user.subscription_status == 'canceled'
|
||||||
|
finally:
|
||||||
|
db.session.rollback()
|
||||||
|
db.drop_all()
|
||||||
|
_clear_webhook_env()
|
||||||
|
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
# 13. invoice.payment_succeeded — recovers past_due
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
|
||||||
|
def test_invoice_payment_succeeded_recovers_past_due():
|
||||||
|
with app.app_context():
|
||||||
|
_disable_csrf()
|
||||||
|
_clear_webhook_env()
|
||||||
|
os.environ['STRIPE_SECRET_KEY'] = 'sk_test_fake'
|
||||||
|
os.environ['STRIPE_WEBHOOK_SECRET'] = 'whsec_fake'
|
||||||
|
db.create_all()
|
||||||
|
try:
|
||||||
|
user = _make_user(email='paysucc@example.qc.ca',
|
||||||
|
stripe_customer_id='cus_paysucc',
|
||||||
|
subscription_status='past_due')
|
||||||
|
existing = Subscription(
|
||||||
|
user_id=user.id,
|
||||||
|
stripe_customer_id='cus_paysucc',
|
||||||
|
stripe_subscription_id='sub_paysucc',
|
||||||
|
plan_slug='dictia-cloud',
|
||||||
|
period='monthly',
|
||||||
|
status='past_due',
|
||||||
|
)
|
||||||
|
db.session.add(existing)
|
||||||
|
db.session.commit()
|
||||||
|
|
||||||
|
event = _make_event(
|
||||||
|
'invoice.payment_succeeded',
|
||||||
|
_make_invoice(sub_id='sub_paysucc', customer='cus_paysucc'),
|
||||||
|
event_id='evt_paysucc_1',
|
||||||
|
)
|
||||||
|
with patch('src.billing.webhooks._verify_event') as mock_verify:
|
||||||
|
mock_verify.return_value = event
|
||||||
|
with app.test_client() as client:
|
||||||
|
resp = _post_webhook(client)
|
||||||
|
assert resp.status_code == 200
|
||||||
|
db.session.refresh(existing)
|
||||||
|
assert existing.status == 'active'
|
||||||
|
db.session.refresh(user)
|
||||||
|
assert user.subscription_status == 'active'
|
||||||
|
finally:
|
||||||
|
db.session.rollback()
|
||||||
|
db.drop_all()
|
||||||
|
_clear_webhook_env()
|
||||||
|
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
# 14. invoice.payment_failed
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
|
||||||
|
def test_invoice_payment_failed_marks_past_due():
|
||||||
|
with app.app_context():
|
||||||
|
_disable_csrf()
|
||||||
|
_clear_webhook_env()
|
||||||
|
os.environ['STRIPE_SECRET_KEY'] = 'sk_test_fake'
|
||||||
|
os.environ['STRIPE_WEBHOOK_SECRET'] = 'whsec_fake'
|
||||||
|
db.create_all()
|
||||||
|
try:
|
||||||
|
user = _make_user(email='payfail@example.qc.ca',
|
||||||
|
stripe_customer_id='cus_payfail',
|
||||||
|
subscription_status='active')
|
||||||
|
existing = Subscription(
|
||||||
|
user_id=user.id,
|
||||||
|
stripe_customer_id='cus_payfail',
|
||||||
|
stripe_subscription_id='sub_payfail',
|
||||||
|
plan_slug='dictia-cloud',
|
||||||
|
period='monthly',
|
||||||
|
status='active',
|
||||||
|
)
|
||||||
|
db.session.add(existing)
|
||||||
|
db.session.commit()
|
||||||
|
|
||||||
|
event = _make_event(
|
||||||
|
'invoice.payment_failed',
|
||||||
|
_make_invoice(sub_id='sub_payfail', customer='cus_payfail'),
|
||||||
|
event_id='evt_payfail_1',
|
||||||
|
)
|
||||||
|
with patch('src.billing.webhooks._verify_event') as mock_verify:
|
||||||
|
mock_verify.return_value = event
|
||||||
|
with app.test_client() as client:
|
||||||
|
resp = _post_webhook(client)
|
||||||
|
assert resp.status_code == 200
|
||||||
|
db.session.refresh(existing)
|
||||||
|
assert existing.status == 'past_due'
|
||||||
|
db.session.refresh(user)
|
||||||
|
assert user.subscription_status == 'past_due'
|
||||||
|
finally:
|
||||||
|
db.session.rollback()
|
||||||
|
db.drop_all()
|
||||||
|
_clear_webhook_env()
|
||||||
|
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
# 15. Handler exception → 500 (Stripe retry)
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
|
||||||
|
def test_handler_exception_returns_500_for_stripe_retry():
|
||||||
|
with app.app_context():
|
||||||
|
_disable_csrf()
|
||||||
|
_clear_webhook_env()
|
||||||
|
os.environ['STRIPE_SECRET_KEY'] = 'sk_test_fake'
|
||||||
|
os.environ['STRIPE_WEBHOOK_SECRET'] = 'whsec_fake'
|
||||||
|
db.create_all()
|
||||||
|
try:
|
||||||
|
user = _make_user(email='boom@example.qc.ca',
|
||||||
|
stripe_customer_id='cus_boom')
|
||||||
|
event = _make_event(
|
||||||
|
'checkout.session.completed',
|
||||||
|
_make_checkout_session(customer='cus_boom',
|
||||||
|
subscription='sub_boom',
|
||||||
|
user_id=str(user.id)),
|
||||||
|
event_id='evt_boom_1',
|
||||||
|
)
|
||||||
|
# Patch the dispatch table entry so our raising stub fires inside the
|
||||||
|
# try/except handler block in stripe_webhook(). Patching the module
|
||||||
|
# attribute alone wouldn't help because _HANDLERS captures the original
|
||||||
|
# reference at module load.
|
||||||
|
from src.billing import webhooks as wh_mod
|
||||||
|
|
||||||
|
def _boom(_event):
|
||||||
|
raise RuntimeError('boom')
|
||||||
|
|
||||||
|
with patch.dict(wh_mod._HANDLERS,
|
||||||
|
{'checkout.session.completed': _boom}), \
|
||||||
|
patch('src.billing.webhooks._verify_event') as mock_verify:
|
||||||
|
mock_verify.return_value = event
|
||||||
|
with app.test_client() as client:
|
||||||
|
resp = _post_webhook(client)
|
||||||
|
assert resp.status_code == 500
|
||||||
|
assert resp.get_json().get('error') == 'handler_failed'
|
||||||
|
# No WebhookEvent recorded (rolled back)
|
||||||
|
assert WebhookEvent.query.filter_by(stripe_event_id='evt_boom_1').count() == 0
|
||||||
|
assert Subscription.query.filter_by(stripe_subscription_id='sub_boom').count() == 0
|
||||||
|
finally:
|
||||||
|
db.session.rollback()
|
||||||
|
db.drop_all()
|
||||||
|
_clear_webhook_env()
|
||||||
|
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
# 16. Metadata 'dictia_user_id' invalid → falls through
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
|
||||||
|
def test_resolve_user_metadata_dictia_user_id_invalid_falls_through():
|
||||||
|
with app.app_context():
|
||||||
|
_disable_csrf()
|
||||||
|
_clear_webhook_env()
|
||||||
|
os.environ['STRIPE_SECRET_KEY'] = 'sk_test_fake'
|
||||||
|
os.environ['STRIPE_WEBHOOK_SECRET'] = 'whsec_fake'
|
||||||
|
db.create_all()
|
||||||
|
try:
|
||||||
|
user = _make_user(email='emailfb@example.qc.ca',
|
||||||
|
stripe_customer_id=None)
|
||||||
|
session_obj = _make_checkout_session(
|
||||||
|
customer='cus_unknown',
|
||||||
|
subscription='sub_emailfb',
|
||||||
|
user_id='not-a-number',
|
||||||
|
email='emailfb@example.qc.ca',
|
||||||
|
)
|
||||||
|
event = _make_event('checkout.session.completed', session_obj,
|
||||||
|
event_id='evt_emailfb_1')
|
||||||
|
with patch('src.billing.webhooks._verify_event') as mock_verify, \
|
||||||
|
patch('src.billing.webhooks.stripe.Subscription.retrieve') as mock_retr:
|
||||||
|
mock_verify.return_value = event
|
||||||
|
mock_retr.return_value = {'current_period_end': 1730000000}
|
||||||
|
with app.test_client() as client:
|
||||||
|
resp = _post_webhook(client)
|
||||||
|
assert resp.status_code == 200
|
||||||
|
sub = Subscription.query.filter_by(stripe_subscription_id='sub_emailfb').first()
|
||||||
|
assert sub is not None
|
||||||
|
# Resolved by email
|
||||||
|
assert sub.user_id == user.id
|
||||||
|
finally:
|
||||||
|
db.session.rollback()
|
||||||
|
db.drop_all()
|
||||||
|
_clear_webhook_env()
|
||||||
|
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
# 17. WebhookEvent records subscription_id for audit
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
|
||||||
|
def test_webhook_event_records_subscription_id_for_audit():
|
||||||
|
with app.app_context():
|
||||||
|
_disable_csrf()
|
||||||
|
_clear_webhook_env()
|
||||||
|
os.environ['STRIPE_SECRET_KEY'] = 'sk_test_fake'
|
||||||
|
os.environ['STRIPE_WEBHOOK_SECRET'] = 'whsec_fake'
|
||||||
|
db.create_all()
|
||||||
|
try:
|
||||||
|
user = _make_user(email='audit@example.qc.ca',
|
||||||
|
stripe_customer_id='cus_audit')
|
||||||
|
event = _make_event(
|
||||||
|
'checkout.session.completed',
|
||||||
|
_make_checkout_session(customer='cus_audit',
|
||||||
|
subscription='sub_audit',
|
||||||
|
user_id=str(user.id)),
|
||||||
|
event_id='evt_audit_1',
|
||||||
|
)
|
||||||
|
with patch('src.billing.webhooks._verify_event') as mock_verify, \
|
||||||
|
patch('src.billing.webhooks.stripe.Subscription.retrieve') as mock_retr:
|
||||||
|
mock_verify.return_value = event
|
||||||
|
mock_retr.return_value = {'current_period_end': 1730000000}
|
||||||
|
with app.test_client() as client:
|
||||||
|
resp = _post_webhook(client)
|
||||||
|
assert resp.status_code == 200
|
||||||
|
wh = WebhookEvent.query.filter_by(stripe_event_id='evt_audit_1').first()
|
||||||
|
assert wh is not None
|
||||||
|
assert wh.event_type == 'checkout.session.completed'
|
||||||
|
assert wh.stripe_subscription_id == 'sub_audit'
|
||||||
|
assert wh.stripe_customer_id == 'cus_audit'
|
||||||
|
finally:
|
||||||
|
db.session.rollback()
|
||||||
|
db.drop_all()
|
||||||
|
_clear_webhook_env()
|
||||||
Reference in New Issue
Block a user