Compare commits
3 Commits
f3203f3b3d
...
9c9db344df
| Author | SHA1 | Date | |
|---|---|---|---|
| 9c9db344df | |||
| a627511467 | |||
| 2a3891936e |
1
.gitignore
vendored
1
.gitignore
vendored
@@ -174,3 +174,4 @@ cython_debug/
|
||||
# PyPI configuration file
|
||||
.pypirc
|
||||
|
||||
*.db
|
||||
115
README.md
115
README.md
@@ -1,2 +1,117 @@
|
||||
# Voter-Uptime-Bot
|
||||
|
||||
|
||||
A Discord bot that monitors the availability of web services and reports uptime statistics directly in your server. Built with `discord.py`, `aiohttp`, and `pydantic-settings`, it goes beyond simple HTTP checks — detecting Cloudflare intercept pages and validating real page content so you know your sites are genuinely reachable, not just returning a 200.
|
||||
|
||||
---
|
||||
|
||||
## Features
|
||||
|
||||
- **Periodic polling** — checks all configured sites every 15 minutes
|
||||
- **Cloudflare intercept detection** — identifies challenge/block pages that return HTTP 200 but serve no real content
|
||||
- **Keyword content validation** — confirms expected strings are present on the loaded page
|
||||
- **Retry logic** — re-attempts checks on transient CF intercepts before marking a site degraded
|
||||
- **Uptime history** — stores all check results in a local SQLite database
|
||||
- **Discord slash commands** — view live status, 24-hour bars, and monthly summaries per site
|
||||
|
||||
---
|
||||
|
||||
## Commands
|
||||
|
||||
| Command | Description |
|
||||
|---|---|
|
||||
| `/uptime now` | Current status of all monitored sites |
|
||||
| `/uptime day <site>` | 24-hour bar chart (🟩🟨🟥) with uptime % |
|
||||
| `/uptime month <site>` | Current month summary, one square per day |
|
||||
| `/uptime summarize` | Monthly uptime % for all sites at once |
|
||||
|
||||
---
|
||||
|
||||
## Project Structure
|
||||
|
||||
```
|
||||
app/
|
||||
├── __init__.py
|
||||
├── bot.py # Discord client, slash commands, polling task
|
||||
├── checker.py # SiteChecker — HTTP, CF detection, keyword validation
|
||||
├── config.py # Pydantic Settings — loads all config from .env
|
||||
├── db.py # SQLite init, insert, and fetch helpers
|
||||
└── utils.py # check_site wrapper, bar rendering, uptime maths
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Configuration
|
||||
|
||||
All configuration is managed via a `.env` file using `pydantic-settings`. Copy `.env.example` to `.env` and fill in your values.
|
||||
|
||||
```env
|
||||
DISCORD_SECRET_KEY=your-bot-token-here
|
||||
DISCORD_CLIENT_ID=123456789
|
||||
DATABASE_PATH=uptime.db
|
||||
|
||||
MONITORED_SITES='[
|
||||
{
|
||||
"name": "MySite",
|
||||
"url": "https://example.com",
|
||||
"timeout_seconds": 10,
|
||||
"expected_keywords": ["Login", "Vote"],
|
||||
"max_retries": 1
|
||||
}
|
||||
]'
|
||||
```
|
||||
|
||||
### Site config fields
|
||||
|
||||
| Field | Required | Default | Description |
|
||||
|---|---|---|---|
|
||||
| `name` | ✅ | — | Display name used in Discord |
|
||||
| `url` | ✅ | — | Full URL to check |
|
||||
| `timeout_seconds` | | `10` | Request timeout |
|
||||
| `expected_status` | | `200` | Expected HTTP status code |
|
||||
| `expected_keywords` | | `[]` | Strings that must appear in the page body |
|
||||
| `max_retries` | | `1` | Extra attempts on CF intercept before marking degraded |
|
||||
|
||||
---
|
||||
|
||||
## How Checks Work
|
||||
|
||||
Each poll goes through three layers:
|
||||
|
||||
1. **HTTP status** — non-2xx/3xx responses are marked degraded or down
|
||||
2. **Cloudflare detection** — response body is scanned for CF challenge fingerprints; a match is marked `degraded` even if status was 200
|
||||
3. **Keyword validation** — any `expected_keywords` that are missing from the body mark the site `degraded`
|
||||
|
||||
Results are stored with a `detection_reason` (`cf_intercept`, `missing_keywords`, `http_503`, `timeout`, etc.) so you can query historical failure causes.
|
||||
|
||||
---
|
||||
|
||||
## Result States
|
||||
|
||||
| State | Emoji | Meaning |
|
||||
|---|---|---|
|
||||
| `up` | 🟩 | HTTP OK, no CF intercept, all keywords present, latency < 3s |
|
||||
| `degraded` | 🟨 | Reachable but CF intercept, missing content, slow, or 5xx |
|
||||
| `down` | 🟥 | Timeout, connection failure, SSL error, or unexpected status |
|
||||
|
||||
---
|
||||
|
||||
## Installation
|
||||
|
||||
```bash
|
||||
pip install -r requirements.txt
|
||||
```
|
||||
|
||||
Run the bot:
|
||||
|
||||
```bash
|
||||
python -m app.bot
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Requirements
|
||||
|
||||
- Python 3.11+
|
||||
- A Discord bot token with the `applications.commands` scope and `bot` scope enabled
|
||||
- The bot must be invited to your server with slash command permissions
|
||||
1
app/__init__.py
Normal file
1
app/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
#app.__init__
|
||||
206
app/bot.py
Normal file
206
app/bot.py
Normal file
@@ -0,0 +1,206 @@
|
||||
#app.bot
|
||||
|
||||
import discord
|
||||
import aiohttp
|
||||
from datetime import datetime, timezone, timedelta
|
||||
from discord import app_commands
|
||||
from discord.ext import tasks
|
||||
|
||||
from .config import settings
|
||||
from .db import init_db, insert_check, fetch_checks_since,fetch_month_checks
|
||||
from .utils import (
|
||||
check_site, summarize_counts, get_site_names,
|
||||
render_bar, compute_uptime, format_detection_reason,
|
||||
)
|
||||
|
||||
TOKEN = settings.discord_secret_key
|
||||
|
||||
# Convert Pydantic models → plain dicts once so the rest of bot.py is unchanged.
|
||||
MONITORED_SITES = [site.to_dict() for site in settings.monitored_sites]
|
||||
|
||||
# SITE_CHOICES stays exactly the same — it reads from MONITORED_SITES:
|
||||
SITE_CHOICES = [
|
||||
app_commands.Choice(name=site["name"], value=site["name"])
|
||||
for site in MONITORED_SITES
|
||||
]
|
||||
|
||||
intents = discord.Intents.default()
|
||||
client = discord.Client(intents=intents)
|
||||
tree = app_commands.CommandTree(client)
|
||||
session: aiohttp.ClientSession | None = None
|
||||
|
||||
@client.event
|
||||
async def on_ready():
|
||||
global session
|
||||
init_db()
|
||||
|
||||
if session is None:
|
||||
session = aiohttp.ClientSession(headers={"User-Agent": "VoteUptimeBot/1.0"})
|
||||
|
||||
if not poll_sites.is_running():
|
||||
poll_sites.start()
|
||||
|
||||
await tree.sync()
|
||||
print(f"Logged in as {client.user}")
|
||||
|
||||
@tasks.loop(minutes=15)
|
||||
async def poll_sites():
|
||||
now = datetime.now(timezone.utc).isoformat()
|
||||
|
||||
for site in MONITORED_SITES:
|
||||
result = await check_site(session, site)
|
||||
# Combine notes + detection_reason into the notes field so the DB
|
||||
# schema doesn't need a migration. error_type already carries reason.
|
||||
notes = result["notes"]
|
||||
if result.get("detection_reason") and not notes:
|
||||
notes = format_detection_reason(result["detection_reason"])
|
||||
|
||||
insert_check(
|
||||
site_name=site["name"],
|
||||
checked_at=now,
|
||||
http_status=result["http_status"],
|
||||
latency_ms=result["latency_ms"],
|
||||
result=result["result"],
|
||||
error_type=result["error_type"], # now carries detection_reason too
|
||||
notes=notes,
|
||||
)
|
||||
|
||||
|
||||
@poll_sites.before_loop
|
||||
async def before_poll_sites():
|
||||
await client.wait_until_ready()
|
||||
|
||||
class SiteNameTransformer(app_commands.Transformer):
|
||||
async def transform(self, interaction: discord.Interaction, value: str) -> str:
|
||||
if value not in get_site_names(MONITORED_SITES):
|
||||
raise app_commands.AppCommandError(f"Unknown site: {value}")
|
||||
return value
|
||||
|
||||
uptime_group = app_commands.Group(name="uptime", description="Uptime tools")
|
||||
tree.add_command(uptime_group)
|
||||
|
||||
|
||||
@uptime_group.command(name="now", description="Show current configured sites")
|
||||
async def uptime_now(interaction: discord.Interaction):
|
||||
lines = []
|
||||
for site in MONITORED_SITES:
|
||||
rows = fetch_checks_since(
|
||||
site["name"], datetime.now(timezone.utc) - timedelta(hours=1)
|
||||
)
|
||||
if not rows:
|
||||
lines.append(f"**{site['name']}**: no recent data")
|
||||
continue
|
||||
|
||||
checked_at, result, http_status, latency_ms, error_type = rows[-1]
|
||||
|
||||
# Emoji prefix for quick scanning
|
||||
emoji = {"up": "🟩", "degraded": "🟨", "down": "🟥"}.get(result, "⬛")
|
||||
|
||||
detail = f"{result.upper()} | status={http_status} | latency={latency_ms}ms"
|
||||
if error_type:
|
||||
detail += f" | reason={format_detection_reason(error_type)}"
|
||||
|
||||
lines.append(f"{emoji} **{site['name']}**: {detail}")
|
||||
|
||||
await interaction.response.send_message("\n".join(lines))
|
||||
|
||||
|
||||
SITE_CHOICES = [
|
||||
app_commands.Choice(name=site["name"], value=site["name"])
|
||||
for site in MONITORED_SITES
|
||||
]
|
||||
|
||||
@uptime_group.command(name="day", description="Last 24 hours in 15-minute bars")
|
||||
@app_commands.describe(site="Site name")
|
||||
@app_commands.choices(site=SITE_CHOICES)
|
||||
async def uptime_day(
|
||||
interaction: discord.Interaction,
|
||||
site: app_commands.Transform[str, SiteNameTransformer],
|
||||
):
|
||||
since = datetime.now(timezone.utc) - timedelta(hours=24)
|
||||
rows = fetch_checks_since(site, since)
|
||||
results = [row[1] for row in rows]
|
||||
|
||||
bar = render_bar(results[-96:]) if results else "⬛"
|
||||
uptime = compute_uptime(results)
|
||||
up, degraded, down = summarize_counts(results)
|
||||
|
||||
msg = (
|
||||
f"**{site}** last 24h\n"
|
||||
f"{bar}\n"
|
||||
f"Uptime: **{uptime:.2f}%**\n"
|
||||
f"Up: {up} | Degraded: {degraded} | Down: {down}"
|
||||
)
|
||||
await interaction.response.send_message(msg)
|
||||
|
||||
|
||||
@uptime_group.command(name="month", description="Current month summary")
|
||||
@app_commands.describe(site="Site name")
|
||||
@app_commands.choices(site=SITE_CHOICES)
|
||||
async def uptime_month(
|
||||
interaction: discord.Interaction,
|
||||
site: app_commands.Transform[str, SiteNameTransformer],
|
||||
):
|
||||
now = datetime.now(timezone.utc)
|
||||
rows = fetch_month_checks(site, now.year, now.month)
|
||||
|
||||
by_day: dict[str, list[str]] = {}
|
||||
for checked_at, result, *_ in rows:
|
||||
day_key = checked_at[:10]
|
||||
by_day.setdefault(day_key, []).append(result)
|
||||
|
||||
day_bars = []
|
||||
for day in sorted(by_day.keys()):
|
||||
pct = compute_uptime(by_day[day])
|
||||
if pct >= 99:
|
||||
day_bars.append("🟩")
|
||||
elif pct >= 95:
|
||||
day_bars.append("🟨")
|
||||
else:
|
||||
day_bars.append("🟥")
|
||||
|
||||
all_results = [row[1] for row in rows]
|
||||
uptime = compute_uptime(all_results)
|
||||
up, degraded, down = summarize_counts(all_results)
|
||||
|
||||
msg = (
|
||||
f"**{site}** {now.year}-{now.month:02d}\n"
|
||||
f"{''.join(day_bars) if day_bars else '⬛'}\n"
|
||||
f"Uptime: **{uptime:.2f}%**\n"
|
||||
f"Up: {up} | Degraded: {degraded} | Down: {down}"
|
||||
)
|
||||
await interaction.response.send_message(msg)
|
||||
|
||||
|
||||
@uptime_group.command(name="summarize", description="Summarize current month for all sites")
|
||||
async def uptime_summarize(interaction: discord.Interaction):
|
||||
now = datetime.now(timezone.utc)
|
||||
lines = [f"**Monthly summary for {now.year}-{now.month:02d}**"]
|
||||
|
||||
for site in MONITORED_SITES:
|
||||
rows = fetch_month_checks(site["name"], now.year, now.month)
|
||||
results = [row[1] for row in rows]
|
||||
uptime = compute_uptime(results)
|
||||
up, degraded, down = summarize_counts(results)
|
||||
lines.append(
|
||||
f"{site['name']}: uptime={uptime:.2f}% | up={up} | degraded={degraded} | down={down}"
|
||||
)
|
||||
|
||||
await interaction.response.send_message("\n".join(lines))
|
||||
|
||||
|
||||
@tree.command(name="incident", description="Placeholder incident review command")
|
||||
async def incident(interaction: discord.Interaction):
|
||||
await interaction.response.send_message("Incident review command placeholder.")
|
||||
|
||||
|
||||
@tree.command(name="hello", description="Say hello")
|
||||
async def hello(interaction: discord.Interaction):
|
||||
await interaction.response.send_message("Hello, world!")
|
||||
|
||||
|
||||
@tree.command(name="add", description="Add two numbers")
|
||||
async def add(interaction: discord.Interaction, a: float, b: float):
|
||||
await interaction.response.send_message(f"Sum: {a + b}")
|
||||
|
||||
client.run(TOKEN)
|
||||
161
app/checker.py
Normal file
161
app/checker.py
Normal file
@@ -0,0 +1,161 @@
|
||||
#app.checker
|
||||
|
||||
import asyncio
|
||||
import time
|
||||
import aiohttp
|
||||
|
||||
# Fingerprints that identify a Cloudflare interstitial/challenge/block page.
|
||||
# These appear in the response body even when the HTTP status is 200.
|
||||
CLOUDFLARE_FINGERPRINTS = [
|
||||
"Just a moment", # JS challenge page <title>
|
||||
"cf-browser-verification", # Legacy challenge <form> id
|
||||
"cf_chl_opt", # Challenge options JS variable
|
||||
"cf-wrapper", # <body> class on block/error pages
|
||||
"Checking your browser", # Human-verification copy
|
||||
"Enable JavaScript and cookies to continue", # CF IUAM page body
|
||||
"Please Wait... | Cloudflare", # Page title variant
|
||||
"_cf_chl_f_tk", # Hidden challenge token field
|
||||
]
|
||||
|
||||
|
||||
def _detect_cloudflare(body: str) -> bool:
|
||||
"""Return True if the response body looks like a Cloudflare intercept page."""
|
||||
return any(fp in body for fp in CLOUDFLARE_FINGERPRINTS)
|
||||
|
||||
|
||||
def _check_keywords(body: str, keywords: list[str]) -> list[str]:
|
||||
"""Return any expected keywords that are absent from the body."""
|
||||
return [kw for kw in keywords if kw not in body]
|
||||
|
||||
|
||||
class SiteChecker:
|
||||
def __init__(self, session: aiohttp.ClientSession):
|
||||
self.session = session
|
||||
|
||||
async def check(self, site: dict) -> dict:
|
||||
"""
|
||||
Check a site dict with the following keys:
|
||||
url (str) required
|
||||
timeout_seconds (int) default 10
|
||||
expected_status (int) default 200
|
||||
expected_keywords (list) default [] — strings that must appear in body
|
||||
max_retries (int) default 1 — extra attempts on CF intercept
|
||||
|
||||
Returns a result dict:
|
||||
http_status, latency_ms, result, error_type, notes, detection_reason
|
||||
"""
|
||||
url = site["url"]
|
||||
timeout_secs = site.get("timeout_seconds", 10)
|
||||
expected_kws = site.get("expected_keywords", [])
|
||||
max_retries = site.get("max_retries", 1)
|
||||
|
||||
last_result = None
|
||||
for attempt in range(max_retries + 1):
|
||||
last_result = await self._single_check(url, timeout_secs, expected_kws)
|
||||
|
||||
# Only retry when we hit a CF intercept and we have attempts left
|
||||
if last_result["detection_reason"] == "cf_intercept" and attempt < max_retries:
|
||||
await asyncio.sleep(2)
|
||||
continue
|
||||
break
|
||||
|
||||
return last_result
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Internal helpers
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
async def _single_check(
|
||||
self,
|
||||
url: str,
|
||||
timeout_seconds: int,
|
||||
expected_keywords: list[str],
|
||||
) -> dict:
|
||||
started = time.perf_counter()
|
||||
try:
|
||||
timeout = aiohttp.ClientTimeout(total=timeout_seconds)
|
||||
async with self.session.get(
|
||||
url, timeout=timeout, allow_redirects=True
|
||||
) as resp:
|
||||
latency_ms = int((time.perf_counter() - started) * 1000)
|
||||
body = await resp.text(errors="replace")
|
||||
|
||||
# --- Cloudflare intercept check (fires even on HTTP 200) ---
|
||||
if _detect_cloudflare(body):
|
||||
return {
|
||||
"http_status": resp.status,
|
||||
"latency_ms": latency_ms,
|
||||
"result": "degraded",
|
||||
"error_type": "cf_intercept",
|
||||
"detection_reason": "cf_intercept",
|
||||
"notes": "Cloudflare challenge/block page returned instead of real content",
|
||||
}
|
||||
|
||||
# --- HTTP status check ---
|
||||
if 200 <= resp.status < 400:
|
||||
result = "up" if latency_ms < 3000 else "degraded"
|
||||
detection_reason = "slow_response" if result == "degraded" else None
|
||||
elif resp.status in {429, 500, 502, 503, 504}:
|
||||
result = "degraded"
|
||||
detection_reason = f"http_{resp.status}"
|
||||
else:
|
||||
result = "down"
|
||||
detection_reason = f"http_{resp.status}"
|
||||
|
||||
# --- Keyword content check (only meaningful when HTTP is OK) ---
|
||||
if result in {"up", "degraded"} and expected_keywords:
|
||||
missing = _check_keywords(body, expected_keywords)
|
||||
if missing:
|
||||
result = "degraded"
|
||||
detection_reason = "missing_keywords"
|
||||
notes = f"Expected keywords not found in page: {missing}"
|
||||
else:
|
||||
notes = None
|
||||
else:
|
||||
notes = None
|
||||
|
||||
return {
|
||||
"http_status": resp.status,
|
||||
"latency_ms": latency_ms,
|
||||
"result": result,
|
||||
"error_type": detection_reason,
|
||||
"detection_reason": detection_reason,
|
||||
"notes": notes,
|
||||
}
|
||||
|
||||
except asyncio.TimeoutError:
|
||||
return {
|
||||
"http_status": None,
|
||||
"latency_ms": None,
|
||||
"result": "down",
|
||||
"error_type": "timeout",
|
||||
"detection_reason": "timeout",
|
||||
"notes": "Request timed out",
|
||||
}
|
||||
except aiohttp.ClientConnectorError:
|
||||
return {
|
||||
"http_status": None,
|
||||
"latency_ms": None,
|
||||
"result": "down",
|
||||
"error_type": "connection",
|
||||
"detection_reason": "connection",
|
||||
"notes": "Could not connect",
|
||||
}
|
||||
except aiohttp.ClientSSLError:
|
||||
return {
|
||||
"http_status": None,
|
||||
"latency_ms": None,
|
||||
"result": "down",
|
||||
"error_type": "ssl",
|
||||
"detection_reason": "ssl",
|
||||
"notes": "SSL/TLS handshake error",
|
||||
}
|
||||
except Exception as e:
|
||||
return {
|
||||
"http_status": None,
|
||||
"latency_ms": None,
|
||||
"result": "down",
|
||||
"error_type": "unknown",
|
||||
"detection_reason": "unknown",
|
||||
"notes": str(e),
|
||||
}
|
||||
53
app/config.py
Normal file
53
app/config.py
Normal file
@@ -0,0 +1,53 @@
|
||||
# app.config
|
||||
|
||||
from pydantic import BaseModel, HttpUrl, field_validator
|
||||
from pydantic_settings import BaseSettings, SettingsConfigDict
|
||||
|
||||
|
||||
class SiteConfig(BaseModel):
|
||||
"""Schema for a single monitored site."""
|
||||
name: str
|
||||
url: HttpUrl
|
||||
timeout_seconds: int = 10
|
||||
expected_status: int = 200
|
||||
expected_keywords: list[str] = []
|
||||
max_retries: int = 1
|
||||
|
||||
@field_validator("timeout_seconds")
|
||||
@classmethod
|
||||
def timeout_must_be_positive(cls, v: int) -> int:
|
||||
if v <= 0:
|
||||
raise ValueError("timeout_seconds must be a positive integer")
|
||||
return v
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
"""Return a plain dict compatible with check_site() in utils.py."""
|
||||
return {
|
||||
"name": self.name,
|
||||
"url": str(self.url),
|
||||
"timeout_seconds": self.timeout_seconds,
|
||||
"expected_status": self.expected_status,
|
||||
"expected_keywords": self.expected_keywords,
|
||||
"max_retries": self.max_retries,
|
||||
}
|
||||
|
||||
|
||||
class Settings(BaseSettings):
|
||||
model_config = SettingsConfigDict(
|
||||
env_file=".env",
|
||||
env_file_encoding="utf-8",
|
||||
)
|
||||
|
||||
# Discord
|
||||
discord_secret_key: str = ""
|
||||
discord_client_id: str = ""
|
||||
discord_client_secret: str = ""
|
||||
|
||||
# Database
|
||||
database_path: str = "uptime.db"
|
||||
|
||||
# Sites — stored as a JSON array string in .env:
|
||||
monitored_sites: list[SiteConfig] = []
|
||||
|
||||
|
||||
settings = Settings()
|
||||
95
app/db.py
Normal file
95
app/db.py
Normal file
@@ -0,0 +1,95 @@
|
||||
#app.db
|
||||
|
||||
import sqlite3
|
||||
from datetime import datetime, timezone, timedelta
|
||||
|
||||
from .config import settings
|
||||
DB_PATH = settings.database_path
|
||||
|
||||
def init_db() -> None:
|
||||
conn = sqlite3.connect(DB_PATH)
|
||||
cur = conn.cursor()
|
||||
|
||||
cur.execute("""
|
||||
CREATE TABLE IF NOT EXISTS site_checks (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
site_name TEXT NOT NULL,
|
||||
checked_at TEXT NOT NULL,
|
||||
http_status INTEGER,
|
||||
latency_ms INTEGER,
|
||||
result TEXT NOT NULL,
|
||||
error_type TEXT,
|
||||
notes TEXT
|
||||
)
|
||||
""")
|
||||
|
||||
cur.execute("""
|
||||
CREATE TABLE IF NOT EXISTS monthly_reports (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
month_key TEXT NOT NULL,
|
||||
site_name TEXT NOT NULL,
|
||||
total_checks INTEGER NOT NULL,
|
||||
up_checks INTEGER NOT NULL,
|
||||
degraded_checks INTEGER NOT NULL,
|
||||
down_checks INTEGER NOT NULL,
|
||||
uptime_percent REAL NOT NULL,
|
||||
created_at TEXT NOT NULL
|
||||
)
|
||||
""")
|
||||
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
|
||||
def insert_check(
|
||||
site_name: str,
|
||||
checked_at: str,
|
||||
http_status: int | None,
|
||||
latency_ms: int | None,
|
||||
result: str,
|
||||
error_type: str | None,
|
||||
notes: str | None,
|
||||
) -> None:
|
||||
conn = sqlite3.connect(DB_PATH)
|
||||
cur = conn.cursor()
|
||||
cur.execute("""
|
||||
INSERT INTO site_checks (
|
||||
site_name, checked_at, http_status, latency_ms, result, error_type, notes
|
||||
) VALUES (?, ?, ?, ?, ?, ?, ?)
|
||||
""", (site_name, checked_at, http_status, latency_ms, result, error_type, notes))
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
|
||||
def fetch_checks_since(site_name: str, since_dt: datetime) -> list[tuple]:
|
||||
conn = sqlite3.connect(DB_PATH)
|
||||
cur = conn.cursor()
|
||||
cur.execute("""
|
||||
SELECT checked_at, result, http_status, latency_ms, error_type
|
||||
FROM site_checks
|
||||
WHERE site_name = ? AND checked_at >= ?
|
||||
ORDER BY checked_at ASC
|
||||
""", (site_name, since_dt.isoformat()))
|
||||
rows = cur.fetchall()
|
||||
conn.close()
|
||||
return rows
|
||||
|
||||
|
||||
def fetch_month_checks(site_name: str, year: int, month: int) -> list[tuple]:
|
||||
start = datetime(year, month, 1, tzinfo=timezone.utc)
|
||||
if month == 12:
|
||||
end = datetime(year + 1, 1, 1, tzinfo=timezone.utc)
|
||||
else:
|
||||
end = datetime(year, month + 1, 1, tzinfo=timezone.utc)
|
||||
|
||||
conn = sqlite3.connect(DB_PATH)
|
||||
cur = conn.cursor()
|
||||
cur.execute("""
|
||||
SELECT checked_at, result, http_status, latency_ms, error_type
|
||||
FROM site_checks
|
||||
WHERE site_name = ? AND checked_at >= ? AND checked_at < ?
|
||||
ORDER BY checked_at ASC
|
||||
""", (site_name, start.isoformat(), end.isoformat()))
|
||||
rows = cur.fetchall()
|
||||
conn.close()
|
||||
return rows
|
||||
88
app/utils.py
Normal file
88
app/utils.py
Normal file
@@ -0,0 +1,88 @@
|
||||
#app.utils
|
||||
|
||||
# check_site is the single entry point used by bot.py.
|
||||
# All HTTP / CF / keyword logic now lives in checker.py (SiteChecker).
|
||||
# This file keeps the pure helper functions for bar rendering, uptime
|
||||
# maths, and site-list utilities.
|
||||
|
||||
import aiohttp
|
||||
from .checker import SiteChecker
|
||||
|
||||
|
||||
async def check_site(session: aiohttp.ClientSession, site: dict) -> dict:
|
||||
"""
|
||||
Thin wrapper so bot.py doesn't need to import SiteChecker directly.
|
||||
|
||||
Accepts a site dict (same shape as MONITORED_SITES entries):
|
||||
name, url, timeout_seconds, expected_status,
|
||||
expected_keywords (optional list[str]),
|
||||
max_retries (optional int, default 1)
|
||||
"""
|
||||
assert session is not None, "aiohttp session must be initialised before checking sites"
|
||||
checker = SiteChecker(session)
|
||||
return await checker.check(site)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Pure helpers — no I/O
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def summarize_counts(results: list[str]) -> tuple[int, int, int]:
|
||||
"""Return (up, degraded, down) counts from a list of result strings."""
|
||||
up = sum(1 for r in results if r == "up")
|
||||
degraded = sum(1 for r in results if r == "degraded")
|
||||
down = sum(1 for r in results if r == "down")
|
||||
return up, degraded, down
|
||||
|
||||
|
||||
def get_site_names(monitored_sites: list[dict]) -> list[str]:
|
||||
return [site["name"] for site in monitored_sites]
|
||||
|
||||
|
||||
def render_bar(results: list[str]) -> str:
|
||||
"""
|
||||
Convert a sequence of result strings into a coloured emoji bar.
|
||||
|
||||
Emoji key:
|
||||
🟩 up 🟨 degraded / cf_intercept / missing_keywords
|
||||
🟥 down ⬛ no data
|
||||
"""
|
||||
mapping = {
|
||||
"up": "🟩",
|
||||
"degraded": "🟨",
|
||||
"down": "🟥",
|
||||
"nodata": "⬛",
|
||||
}
|
||||
return "".join(mapping.get(r, "⬛") for r in results)
|
||||
|
||||
|
||||
def compute_uptime(results: list[str]) -> float:
|
||||
"""
|
||||
Return the percentage of checks that were fully 'up'.
|
||||
'nodata' entries are excluded from the denominator.
|
||||
Returns 0.0 when there are no considered results.
|
||||
"""
|
||||
considered = [r for r in results if r != "nodata"]
|
||||
if not considered:
|
||||
return 0.0
|
||||
good = sum(1 for r in considered if r == "up")
|
||||
return (good / len(considered)) * 100
|
||||
|
||||
|
||||
def format_detection_reason(reason: str | None) -> str:
|
||||
"""Human-readable label for a detection_reason value."""
|
||||
if not reason:
|
||||
return ""
|
||||
labels = {
|
||||
"cf_intercept": "Cloudflare intercept",
|
||||
"missing_keywords": "Content check failed",
|
||||
"slow_response": "Slow response (>3 s)",
|
||||
"timeout": "Request timed out",
|
||||
"connection": "Connection refused",
|
||||
"ssl": "SSL/TLS error",
|
||||
"unknown": "Unknown error",
|
||||
}
|
||||
# http_NNN codes
|
||||
if reason.startswith("http_"):
|
||||
return f"HTTP {reason[5:]}"
|
||||
return labels.get(reason, reason)
|
||||
BIN
requirements.txt
Normal file
BIN
requirements.txt
Normal file
Binary file not shown.
Reference in New Issue
Block a user