161 lines
6.2 KiB
Python
161 lines
6.2 KiB
Python
#app.checker
|
|
|
|
import asyncio
|
|
import time
|
|
import aiohttp
|
|
|
|
# Substrings whose presence in a response body marks the page as a Cloudflare
# interstitial, challenge, or block page rather than the site's own content.
# Status codes alone are not enough: these pages can be served with HTTP 200.
CLOUDFLARE_FINGERPRINTS = [
    # <title> of the JavaScript challenge page
    "Just a moment",
    # id of the legacy challenge <form>
    "cf-browser-verification",
    # JavaScript variable holding the challenge options
    "cf_chl_opt",
    # <body> class used on block/error pages
    "cf-wrapper",
    # Copy shown during human verification
    "Checking your browser",
    # Body text of the "I'm Under Attack Mode" page
    "Enable JavaScript and cookies to continue",
    # Alternative page title
    "Please Wait... | Cloudflare",
    # Name of the hidden challenge-token field
    "_cf_chl_f_tk",
]
|
|
|
|
|
|
def _detect_cloudflare(body: str) -> bool:
    """Tell whether *body* contains any known Cloudflare fingerprint.

    The match is a plain, case-sensitive substring test against each entry
    of ``CLOUDFLARE_FINGERPRINTS``; the first hit short-circuits the scan.
    """
    for fingerprint in CLOUDFLARE_FINGERPRINTS:
        if fingerprint in body:
            return True
    return False
|
|
|
|
|
|
def _check_keywords(body: str, keywords: list[str]) -> list[str]:
|
|
"""Return any expected keywords that are absent from the body."""
|
|
return [kw for kw in keywords if kw not in body]
|
|
|
|
|
|
class SiteChecker:
    """Probe a site with an HTTP GET and classify it as up, degraded, or down.

    All requests go through the ``aiohttp.ClientSession`` passed to the
    constructor; this class never creates or closes the session itself.
    """

    # A successful response at or above this latency is reported as "degraded".
    SLOW_THRESHOLD_MS = 3000

    # Pause between attempts when a Cloudflare intercept triggers a retry.
    CF_RETRY_DELAY_SECONDS = 2

    # Statuses treated as a struggling (rather than dead) site.
    DEGRADED_STATUSES = frozenset({429, 500, 502, 503, 504})

    def __init__(self, session: aiohttp.ClientSession):
        self.session = session

    async def check(self, site: dict) -> dict:
        """
        Check a site dict with the following keys:
          url               (str)  required
          timeout_seconds   (int)  default 10
          expected_keywords (list) default [] — strings that must appear in body
          max_retries       (int)  default 1 — extra attempts on CF intercept;
                                   negative values are treated as 0

        An ``expected_status`` key may be present in the dict, but this
        method does not read it; status classification is fixed (see
        ``_classify``).

        Returns a result dict:
          http_status, latency_ms, result, error_type, notes, detection_reason

        Raises ``KeyError`` when ``url`` is missing.
        """
        url = site["url"]
        timeout_secs = site.get("timeout_seconds", 10)
        expected_kws = site.get("expected_keywords", [])
        # Clamp so that at least one attempt is always made; with a negative
        # value the loop below would never run and there would be no result.
        max_retries = max(0, site.get("max_retries", 1))

        for attempt in range(max_retries + 1):
            outcome = await self._single_check(url, timeout_secs, expected_kws)

            # Only a Cloudflare intercept is worth retrying, and only while
            # attempts remain; every other outcome is final.
            is_intercept = outcome["detection_reason"] == "cf_intercept"
            if not is_intercept or attempt >= max_retries:
                break
            await asyncio.sleep(self.CF_RETRY_DELAY_SECONDS)

        return outcome

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _failure(reason: str, notes: str) -> dict:
        """Build the result dict for a request that produced no HTTP response."""
        return {
            "http_status": None,
            "latency_ms": None,
            "result": "down",
            "error_type": reason,
            "detection_reason": reason,
            "notes": notes,
        }

    def _classify(
        self,
        status: int,
        latency_ms: int,
        body: str,
        expected_keywords: list[str],
    ) -> dict:
        """Turn a received response into a result dict.

        Order of checks: Cloudflare intercept (regardless of status), then
        HTTP status/latency, then expected keywords.
        """
        # --- Cloudflare intercept check (fires even on HTTP 200) ---
        if _detect_cloudflare(body):
            return {
                "http_status": status,
                "latency_ms": latency_ms,
                "result": "degraded",
                "error_type": "cf_intercept",
                "detection_reason": "cf_intercept",
                "notes": "Cloudflare challenge/block page returned instead of real content",
            }

        # --- HTTP status check ---
        http_ok = 200 <= status < 400
        if http_ok:
            if latency_ms < self.SLOW_THRESHOLD_MS:
                result, detection_reason = "up", None
            else:
                result, detection_reason = "degraded", "slow_response"
        elif status in self.DEGRADED_STATUSES:
            result, detection_reason = "degraded", f"http_{status}"
        else:
            result, detection_reason = "down", f"http_{status}"

        # --- Keyword content check ---
        # Only run on a successful status: an error page will naturally lack
        # the keywords, and reporting "missing_keywords" there would hide the
        # more informative http_<status> reason.
        notes = None
        if http_ok and expected_keywords:
            missing = _check_keywords(body, expected_keywords)
            if missing:
                result = "degraded"
                detection_reason = "missing_keywords"
                notes = f"Expected keywords not found in page: {missing}"

        return {
            "http_status": status,
            "latency_ms": latency_ms,
            "result": result,
            "error_type": detection_reason,
            "detection_reason": detection_reason,
            "notes": notes,
        }

    async def _single_check(
        self,
        url: str,
        timeout_seconds: int,
        expected_keywords: list[str],
    ) -> dict:
        """Perform one GET against *url* and return its result dict.

        ``latency_ms`` is the time until the response object is available,
        measured before the body is read. Redirects are followed. Any
        exception raised while requesting, reading, or classifying the
        response is converted into a "down" result instead of propagating.
        """
        started = time.perf_counter()
        try:
            timeout = aiohttp.ClientTimeout(total=timeout_seconds)
            async with self.session.get(
                url, timeout=timeout, allow_redirects=True
            ) as resp:
                latency_ms = int((time.perf_counter() - started) * 1000)
                body = await resp.text(errors="replace")
                return self._classify(
                    resp.status, latency_ms, body, expected_keywords
                )
        except asyncio.TimeoutError:
            return self._failure("timeout", "Request timed out")
        # ClientSSLError is a subclass of ClientConnectorError, so it has to
        # be handled first or TLS failures get reported as plain connection
        # errors.
        except aiohttp.ClientSSLError:
            return self._failure("ssl", "SSL/TLS handshake error")
        except aiohttp.ClientConnectorError:
            return self._failure("connection", "Could not connect")
        except Exception as e:
            # Some exceptions carry no message; fall back to the class name
            # so the notes are never empty.
            return self._failure("unknown", str(e) or type(e).__name__)