"""
Notification dispatcher — webhooks and immediate email alerts.

Called from burnin.py when a job reaches a terminal state (passed/failed).
Webhook fires unconditionally when WEBHOOK_URL is set.
Email alerts fire based on smtp_alert_on_fail / smtp_alert_on_pass settings.
"""

import asyncio
import logging

from app.config import settings

log = logging.getLogger(__name__)


async def notify_job_complete(
    job_id: int,
    devname: str,
    serial: str | None,
    model: str | None,
    state: str,
    profile: str,
    operator: str,
    error_text: str | None,
) -> None:
    """Fire all configured notifications for a completed burn-in job.

    A webhook is queued whenever ``settings.webhook_url`` is truthy. An alert
    email is queued when ``settings.smtp_host`` is truthy and the matching
    ``smtp_alert_on_fail`` / ``smtp_alert_on_pass`` flag is enabled for
    ``state``. All queued notifications run concurrently.

    Delivery is best-effort: a failure in one channel is logged and never
    propagated to the caller, and it does not prevent the other channel from
    being attempted.

    Args:
        job_id: Identifier of the burn-in job.
        devname: Device name the job ran against.
        serial: Drive serial number, if known.
        model: Drive model, if known.
        state: Job state; ``"failed"`` and ``"passed"`` are the values that
            can trigger an email alert.
        profile: Burn-in profile name (sent in the webhook payload only).
        operator: Operator name (sent in the webhook payload only).
        error_text: Error description, if any.
    """
    # Channel name -> coroutine. Keeping the name alongside the coroutine lets
    # the failure log say *which* notification broke.
    pending = {}

    if settings.webhook_url:
        pending["webhook"] = _send_webhook({
            "event": f"burnin_{state}",
            "job_id": job_id,
            "devname": devname,
            "serial": serial,
            "model": model,
            "state": state,
            "profile": profile,
            "operator": operator,
            "error_text": error_text,
        })

    if settings.smtp_host:
        should_alert = (
            (state == "failed" and settings.smtp_alert_on_fail)
            or (state == "passed" and settings.smtp_alert_on_pass)
        )
        if should_alert:
            pending["email"] = _send_alert_email(
                job_id, devname, serial, model, state, error_text
            )

    if not pending:
        return

    # return_exceptions=True keeps one failing channel from cancelling the
    # other; failures come back as values in the result list instead.
    results = await asyncio.gather(*pending.values(), return_exceptions=True)

    # gather() preserves input order, so results line up with the dict keys.
    for channel, outcome in zip(pending, results):
        # BaseException rather than Exception: a cancelled child is reported
        # as asyncio.CancelledError, which does not derive from Exception.
        if isinstance(outcome, BaseException):
            log.error(
                "Notification failed: %s",
                # Some exceptions (e.g. timeouts) stringify to "", so fall
                # back to repr() to keep the message informative.
                str(outcome) or repr(outcome),
                exc_info=outcome,
                extra={"job_id": job_id, "devname": devname, "channel": channel},
            )


async def _send_webhook(payload: dict) -> None:
    """POST ``payload`` as JSON to ``settings.webhook_url``.

    Uses a 10 second client timeout. Any transport error, or the error raised
    by ``raise_for_status()`` for an error status response, propagates to the
    caller. A success line is logged only after the status check passes.

    Args:
        payload: JSON-serialisable event body; its ``event`` and ``job_id``
            keys are echoed into the success log record.
    """
    # Imported at call time, so httpx is only needed when a webhook is sent.
    import httpx

    async with httpx.AsyncClient(timeout=10.0) as client:
        response = await client.post(settings.webhook_url, json=payload)
        response.raise_for_status()

    # NOTE(review): the full webhook URL is attached to the log record; if the
    # URL can embed a secret token, consider redacting it — confirm.
    log.info(
        "Webhook sent",
        extra={
            "event": payload.get("event"),
            "job_id": payload.get("job_id"),
            "url": settings.webhook_url,
        },
    )


async def _send_alert_email(
    job_id: int,
    devname: str,
    serial: str | None,
    model: str | None,
    state: str,
    error_text: str | None,
) -> None:
    """Send the alert email for a job by delegating to ``mailer.send_job_alert``.

    All arguments are forwarded positionally and unchanged; any exception
    raised by the mailer propagates to the caller.
    """
    # Imported at call time rather than at module import.
    from app import mailer

    await mailer.send_job_alert(job_id, devname, serial, model, state, error_text)