#app.utils # check_site is the single entry point used by bot.py. # All HTTP / CF / keyword logic now lives in checker.py (SiteChecker). # This file keeps the pure helper functions for bar rendering, uptime # maths, and site-list utilities. import aiohttp from .checker import SiteChecker async def check_site(session: aiohttp.ClientSession, site: dict) -> dict: """ Thin wrapper so bot.py doesn't need to import SiteChecker directly. Accepts a site dict (same shape as MONITORED_SITES entries): name, url, timeout_seconds, expected_status, expected_keywords (optional list[str]), max_retries (optional int, default 1) """ assert session is not None, "aiohttp session must be initialised before checking sites" checker = SiteChecker(session) return await checker.check(site) # --------------------------------------------------------------------------- # Pure helpers — no I/O # --------------------------------------------------------------------------- def summarize_counts(results: list[str]) -> tuple[int, int, int]: """Return (up, degraded, down) counts from a list of result strings.""" up = sum(1 for r in results if r == "up") degraded = sum(1 for r in results if r == "degraded") down = sum(1 for r in results if r == "down") return up, degraded, down def get_site_names(monitored_sites: list[dict]) -> list[str]: return [site["name"] for site in monitored_sites] def render_bar(results: list[str]) -> str: """ Convert a sequence of result strings into a coloured emoji bar. Emoji key: 🟩 up 🟨 degraded / cf_intercept / missing_keywords 🟥 down ⬛ no data """ mapping = { "up": "🟩", "degraded": "🟨", "down": "🟥", "nodata": "⬛", } return "".join(mapping.get(r, "⬛") for r in results) def compute_uptime(results: list[str]) -> float: """ Return the percentage of checks that were fully 'up'. 'nodata' entries are excluded from the denominator. Returns 0.0 when there are no considered results. """ considered = [r for r in results if r != "nodata"] if not considered: return 0.0 good = sum(1 for r in considered if r == "up") return (good / len(considered)) * 100 def format_detection_reason(reason: str | None) -> str: """Human-readable label for a detection_reason value.""" if not reason: return "" labels = { "cf_intercept": "Cloudflare intercept", "missing_keywords": "Content check failed", "slow_response": "Slow response (>3 s)", "timeout": "Request timed out", "connection": "Connection refused", "ssl": "SSL/TLS error", "unknown": "Unknown error", } # http_NNN codes if reason.startswith("http_"): return f"HTTP {reason[5:]}" return labels.get(reason, reason)