Files
Voter-Uptime-Bot/app/checker.py
2026-04-12 21:27:34 -07:00

161 lines
6.2 KiB
Python

# app.checker: async HTTP site checks with Cloudflare intercept detection.
import asyncio
import time
import aiohttp
# Substrings whose presence in a response body marks it as a Cloudflare
# interstitial, challenge, or block page. The body has to be inspected because
# such pages can arrive with HTTP 200, so the status code alone is not enough.
CLOUDFLARE_FINGERPRINTS = [
    "Just a moment",  # <title> of the JS challenge page
    "cf-browser-verification",  # id of the legacy challenge <form>
    "cf_chl_opt",  # JS variable holding the challenge options
    "cf-wrapper",  # <body> class used on block/error pages
    "Checking your browser",  # human-verification copy
    "Enable JavaScript and cookies to continue",  # body text of the CF IUAM page
    "Please Wait... | Cloudflare",  # alternative page title
    "_cf_chl_f_tk",  # hidden field carrying the challenge token
]
def _detect_cloudflare(body: str) -> bool:
    """Report whether *body* contains any known Cloudflare intercept marker.

    The match is a plain, case-sensitive substring test against every entry
    of ``CLOUDFLARE_FINGERPRINTS``; the first hit ends the scan.
    """
    for marker in CLOUDFLARE_FINGERPRINTS:
        if marker in body:
            return True
    return False
def _check_keywords(body: str, keywords: list[str]) -> list[str]:
"""Return any expected keywords that are absent from the body."""
return [kw for kw in keywords if kw not in body]
class SiteChecker:
    """Probe sites over HTTP and classify each one as up, degraded, or down.

    The checker uses a session supplied by the caller and never closes it.
    """

    # A 2xx/3xx response whose headers take at least this long is "degraded".
    SLOW_THRESHOLD_MS = 3000
    # Pause between attempts when a Cloudflare intercept triggers a retry.
    RETRY_DELAY_SECONDS = 2
    # Error statuses reported as "degraded" rather than "down".
    DEGRADED_STATUSES = frozenset({429, 500, 502, 503, 504})

    def __init__(self, session: "aiohttp.ClientSession"):
        """Store the HTTP session used for every request."""
        self.session = session

    async def check(self, site: dict) -> dict:
        """Check one site, retrying only when Cloudflare intercepts the request.

        Args:
            site: Mapping describing the target. Keys read here:
                url (str): required.
                timeout_seconds (int): total request timeout; default 10.
                expected_keywords (list[str]): strings that must appear in
                    the body; default [].
                max_retries (int): extra attempts made after a Cloudflare
                    intercept; default 1. ``None`` falls back to the default
                    and negative values are treated as 0.
                An ``expected_status`` key may be present but is not
                consulted; statuses are classified by ``_classify``.

        Returns:
            A dict with the keys ``http_status``, ``latency_ms``, ``result``,
            ``error_type``, ``detection_reason`` and ``notes``, describing
            the last attempt made.

        Raises:
            KeyError: If *site* has no ``"url"`` entry.
        """
        url = site["url"]
        timeout_secs = site.get("timeout_seconds", 10)
        expected_kws = site.get("expected_keywords", [])
        max_retries = site.get("max_retries")
        if max_retries is None:
            max_retries = 1
        # Clamp so that at least one attempt always runs; an empty attempt
        # loop would otherwise hand None back to the caller.
        max_retries = max(0, max_retries)

        outcome = await self._single_check(url, timeout_secs, expected_kws)
        for _ in range(max_retries):
            # Only a Cloudflare intercept is worth retrying.
            if outcome["detection_reason"] != "cf_intercept":
                break
            await asyncio.sleep(self.RETRY_DELAY_SECONDS)
            outcome = await self._single_check(url, timeout_secs, expected_kws)
        return outcome

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    async def _single_check(
        self,
        url: str,
        timeout_seconds: int,
        expected_keywords: list[str],
    ) -> dict:
        """Perform one GET request and translate the outcome into a result dict.

        Latency is measured up to the point the response object is available,
        i.e. before the body is read. Any exception is converted into a
        ``"down"`` result instead of propagating.
        """
        started = time.perf_counter()
        try:
            timeout = aiohttp.ClientTimeout(total=timeout_seconds)
            async with self.session.get(
                url, timeout=timeout, allow_redirects=True
            ) as resp:
                latency_ms = int((time.perf_counter() - started) * 1000)
                body = await resp.text(errors="replace")
                return self._classify(
                    resp.status, latency_ms, body, expected_keywords
                )
        except asyncio.TimeoutError:
            return self._make_result(
                None, None, "down", "timeout", "Request timed out"
            )
        except aiohttp.ClientSSLError:
            # Must precede ClientConnectorError: ClientSSLError is one of its
            # subclasses, so the broader handler would otherwise swallow it.
            return self._make_result(
                None, None, "down", "ssl", "SSL/TLS handshake error"
            )
        except aiohttp.ClientConnectorError:
            return self._make_result(
                None, None, "down", "connection", "Could not connect"
            )
        except Exception as e:
            # Boundary handler: a probe must always yield a result. Some
            # exceptions stringify to "", so fall back to the class name.
            return self._make_result(
                None, None, "down", "unknown", str(e) or type(e).__name__
            )

    def _classify(
        self,
        status: int,
        latency_ms: int,
        body: str,
        expected_keywords: list[str],
    ) -> dict:
        """Turn a received response into a result dict.

        Order of checks: Cloudflare intercept (regardless of status), then
        the HTTP status, then expected keywords for 2xx/3xx responses.
        """
        # --- Cloudflare intercept check (fires even on HTTP 200) ---
        if _detect_cloudflare(body):
            return self._make_result(
                status,
                latency_ms,
                "degraded",
                "cf_intercept",
                "Cloudflare challenge/block page returned instead of real content",
            )

        # --- HTTP status check ---
        status_ok = 200 <= status < 400
        if status_ok:
            slow = latency_ms >= self.SLOW_THRESHOLD_MS
            result = "degraded" if slow else "up"
            reason = "slow_response" if slow else None
        elif status in self.DEGRADED_STATUSES:
            result, reason = "degraded", f"http_{status}"
        else:
            result, reason = "down", f"http_{status}"

        # --- Keyword content check (only meaningful when HTTP is OK) ---
        # Restricted to 2xx/3xx so that an error page lacking the keywords
        # keeps its http_<status> reason instead of "missing_keywords".
        notes = None
        if status_ok and expected_keywords:
            missing = _check_keywords(body, expected_keywords)
            if missing:
                result = "degraded"
                reason = "missing_keywords"
                notes = f"Expected keywords not found in page: {missing}"

        return self._make_result(status, latency_ms, result, reason, notes)

    @staticmethod
    def _make_result(http_status, latency_ms, result, reason, notes) -> dict:
        """Build the result dict; *reason* fills both error_type and detection_reason."""
        return {
            "http_status": http_status,
            "latency_ms": latency_ms,
            "result": result,
            "error_type": reason,
            "detection_reason": reason,
            "notes": notes,
        }