Bot Version 1.0.0

This commit is contained in:
2026-04-12 21:27:34 -07:00
parent f3203f3b3d
commit 2a3891936e
6 changed files with 604 additions and 0 deletions

1
app/__init__.py Normal file
View File

@@ -0,0 +1 @@
#app.__init__

206
app/bot.py Normal file
View File

@@ -0,0 +1,206 @@
#app.bot
import discord
import aiohttp
from datetime import datetime, timezone, timedelta
from discord import app_commands
from discord.ext import tasks
from .config import settings
from .db import init_db, insert_check, fetch_checks_since,fetch_month_checks
from .utils import (
check_site, summarize_counts, get_site_names,
render_bar, compute_uptime, format_detection_reason,
)
# Bot token taken from the application settings (see app.config.Settings).
TOKEN = settings.discord_secret_key
# Convert the Pydantic site models into plain dicts once, so the rest of this
# module can use ordinary dict access (site["name"], site["url"], ...).
MONITORED_SITES = [site.to_dict() for site in settings.monitored_sites]
# One slash-command choice per configured site; name and value are both the
# site's configured name.
SITE_CHOICES = [
    app_commands.Choice(name=site["name"], value=site["name"])
    for site in MONITORED_SITES
]
# Discord client with the library's default intents and the command tree that
# holds the slash commands registered below.
intents = discord.Intents.default()
client = discord.Client(intents=intents)
tree = app_commands.CommandTree(client)
# Shared HTTP session used for site checks; stays None until on_ready creates it.
session: aiohttp.ClientSession | None = None
@client.event
async def on_ready():
    """Prepare storage, the HTTP session, polling and slash commands.

    The session and the polling loop are only created/started when they do
    not exist yet, so a repeated ready event reuses the existing ones.
    """
    global session
    init_db()
    if session is None:
        default_headers = {"User-Agent": "VoteUptimeBot/1.0"}
        session = aiohttp.ClientSession(headers=default_headers)
    polling_active = poll_sites.is_running()
    if not polling_active:
        poll_sites.start()
    # Publish the registered application commands.
    await tree.sync()
    print(f"Logged in as {client.user}")
@tasks.loop(minutes=15)
async def poll_sites():
    """Check every monitored site once and store one row per site.

    Runs every 15 minutes. All rows written in one round share the same
    ``checked_at`` timestamp (UTC, ISO-8601), taken when the round starts.

    A failure while checking or storing one site is reported and skipped so
    that the remaining sites are still processed and the loop keeps running;
    an exception escaping this coroutine would otherwise end the task.
    """
    now = datetime.now(timezone.utc).isoformat()
    for site in MONITORED_SITES:
        try:
            result = await check_site(session, site)
            # Combine notes + detection_reason into the notes field so the DB
            # schema doesn't need a migration. error_type already carries the
            # reason code; notes only falls back to its human-readable label.
            notes = result["notes"]
            if result.get("detection_reason") and not notes:
                notes = format_detection_reason(result["detection_reason"])
            insert_check(
                site_name=site["name"],
                checked_at=now,
                http_status=result["http_status"],
                latency_ms=result["latency_ms"],
                result=result["result"],
                error_type=result["error_type"],
                notes=notes,
            )
        except Exception as exc:
            # Boundary handler: keep polling the other sites and future rounds.
            print(f"poll_sites: check for {site.get('name')!r} failed: {exc!r}")
@poll_sites.before_loop
async def before_poll_sites():
    """Hold back the first polling round until the client is ready."""
    await client.wait_until_ready()
class SiteNameTransformer(app_commands.Transformer):
    """Command-argument transformer that only lets configured site names through."""

    async def transform(self, interaction: discord.Interaction, value: str) -> str:
        """Return ``value`` unchanged if it names a monitored site.

        Raises:
            app_commands.AppCommandError: ``value`` is not a configured site name.
        """
        known_names = get_site_names(MONITORED_SITES)
        if value in known_names:
            return value
        raise app_commands.AppCommandError(f"Unknown site: {value}")
# "/uptime" command group; the now/day/month/summarize subcommands below are
# attached to it, and the group itself is registered on the command tree.
uptime_group = app_commands.Group(name="uptime", description="Uptime tools")
tree.add_command(uptime_group)
@uptime_group.command(name="now", description="Show current configured sites")
async def uptime_now(interaction: discord.Interaction):
    """Reply with the latest check from the past hour for every monitored site.

    Sites without a check in that window are listed as "no recent data".
    When no sites are configured at all, a short notice is sent instead of an
    empty message body.
    """
    # One cutoff for the whole reply so every site is judged on the same window.
    since = datetime.now(timezone.utc) - timedelta(hours=1)
    # Emoji prefix for quick scanning
    status_emoji = {"up": "🟩", "degraded": "🟨", "down": "🟥"}
    lines = []
    for site in MONITORED_SITES:
        rows = fetch_checks_since(site["name"], since)
        if not rows:
            lines.append(f"**{site['name']}**: no recent data")
            continue
        # Rows are ordered oldest-first, so the last one is the newest check.
        _checked_at, result, http_status, latency_ms, error_type = rows[-1]
        emoji = status_emoji.get(result, "")
        # Failed requests store no status/latency; show "n/a" rather than "None".
        status_text = http_status if http_status is not None else "n/a"
        latency_text = f"{latency_ms}ms" if latency_ms is not None else "n/a"
        detail = f"{result.upper()} | status={status_text} | latency={latency_text}"
        if error_type:
            detail += f" | reason={format_detection_reason(error_type)}"
        lines.append(f"{emoji} **{site['name']}**: {detail}")
    if not lines:
        lines.append("No sites are configured.")
    await interaction.response.send_message("\n".join(lines))
# NOTE(review): SITE_CHOICES is already assigned near the top of this module
# with the same expression; this second assignment rebuilds an equal list and
# looks redundant — confirm before removing.
SITE_CHOICES = [
    app_commands.Choice(name=site["name"], value=site["name"])
    for site in MONITORED_SITES
]
@uptime_group.command(name="day", description="Last 24 hours in 15-minute bars")
@app_commands.describe(site="Site name")
@app_commands.choices(site=SITE_CHOICES)
async def uptime_day(
    interaction: discord.Interaction,
    site: app_commands.Transform[str, SiteNameTransformer],
):
    """Reply with a status bar and uptime figures for ``site`` over the last 24h.

    The bar shows at most the 96 most recent checks (24h at one check per
    15 minutes). When there are no checks in the window the reply says so
    instead of reporting a misleading 0.00% uptime with an empty bar.
    """
    since = datetime.now(timezone.utc) - timedelta(hours=24)
    rows = fetch_checks_since(site, since)
    results = [row[1] for row in rows]
    if not results:
        await interaction.response.send_message(f"**{site}** last 24h\nno recent data")
        return
    bar = render_bar(results[-96:])
    uptime = compute_uptime(results)
    up, degraded, down = summarize_counts(results)
    msg = (
        f"**{site}** last 24h\n"
        f"{bar}\n"
        f"Uptime: **{uptime:.2f}%**\n"
        f"Up: {up} | Degraded: {degraded} | Down: {down}"
    )
    await interaction.response.send_message(msg)
@uptime_group.command(name="month", description="Current month summary")
@app_commands.describe(site="Site name")
@app_commands.choices(site=SITE_CHOICES)
async def uptime_month(
    interaction: discord.Interaction,
    site: app_commands.Transform[str, SiteNameTransformer],
):
    """Reply with one coloured square per day plus totals for the current month.

    Only days that have at least one stored check get a square:
    🟩 for >= 99% uptime, 🟨 for >= 95%, 🟥 otherwise. When the month has no
    checks at all the reply says so instead of reporting 0.00% uptime.
    """
    now = datetime.now(timezone.utc)
    rows = fetch_month_checks(site, now.year, now.month)
    header = f"**{site}** {now.year}-{now.month:02d}"
    if not rows:
        await interaction.response.send_message(f"{header}\nno data for this month")
        return

    # Group results by calendar day; the first 10 characters of the stored
    # ISO timestamp are the YYYY-MM-DD date.
    by_day: dict[str, list[str]] = {}
    for checked_at, result, *_ in rows:
        by_day.setdefault(checked_at[:10], []).append(result)

    day_bars = []
    for day in sorted(by_day):
        pct = compute_uptime(by_day[day])
        if pct >= 99:
            day_bars.append("🟩")
        elif pct >= 95:
            day_bars.append("🟨")
        else:
            day_bars.append("🟥")

    all_results = [row[1] for row in rows]
    uptime = compute_uptime(all_results)
    up, degraded, down = summarize_counts(all_results)
    msg = (
        f"{header}\n"
        f"{''.join(day_bars)}\n"
        f"Uptime: **{uptime:.2f}%**\n"
        f"Up: {up} | Degraded: {degraded} | Down: {down}"
    )
    await interaction.response.send_message(msg)
@uptime_group.command(name="summarize", description="Summarize current month for all sites")
async def uptime_summarize(interaction: discord.Interaction):
    """Reply with one line of current-month uptime figures per monitored site.

    A site with no stored checks this month is listed as "no data" rather
    than as 0.00% uptime, matching how the "now" command reports missing data.
    """
    now = datetime.now(timezone.utc)
    lines = [f"**Monthly summary for {now.year}-{now.month:02d}**"]
    for site in MONITORED_SITES:
        rows = fetch_month_checks(site["name"], now.year, now.month)
        results = [row[1] for row in rows]
        if not results:
            lines.append(f"{site['name']}: no data")
            continue
        uptime = compute_uptime(results)
        up, degraded, down = summarize_counts(results)
        lines.append(
            f"{site['name']}: uptime={uptime:.2f}% | up={up} | degraded={degraded} | down={down}"
        )
    await interaction.response.send_message("\n".join(lines))
@tree.command(name="incident", description="Placeholder incident review command")
async def incident(interaction: discord.Interaction):
    """Answer with a fixed placeholder text; no incident logic exists yet."""
    reply = "Incident review command placeholder."
    await interaction.response.send_message(reply)
@tree.command(name="hello", description="Say hello")
async def hello(interaction: discord.Interaction):
    """Answer the interaction with a fixed greeting."""
    greeting = "Hello, world!"
    await interaction.response.send_message(greeting)
@tree.command(name="add", description="Add two numbers")
async def add(interaction: discord.Interaction, a: float, b: float):
    """Answer the interaction with the sum of ``a`` and ``b``."""
    total = a + b
    await interaction.response.send_message(f"Sum: {total}")
# Start the Discord client with the configured token.
# NOTE(review): this is a module-level call, so it also executes whenever
# app.bot is imported; there is no `if __name__ == "__main__":` guard —
# confirm how the bot is launched before adding one.
client.run(TOKEN)

161
app/checker.py Normal file
View File

@@ -0,0 +1,161 @@
#app.checker
import asyncio
import time
import aiohttp
# Fingerprints that identify a Cloudflare interstitial/challenge/block page.
# These appear in the response body even when the HTTP status is 200.
CLOUDFLARE_FINGERPRINTS = [
"Just a moment", # JS challenge page <title>
"cf-browser-verification", # Legacy challenge <form> id
"cf_chl_opt", # Challenge options JS variable
"cf-wrapper", # <body> class on block/error pages
"Checking your browser", # Human-verification copy
"Enable JavaScript and cookies to continue", # CF IUAM page body
"Please Wait... | Cloudflare", # Page title variant
"_cf_chl_f_tk", # Hidden challenge token field
]
def _detect_cloudflare(body: str) -> bool:
"""Return True if the response body looks like a Cloudflare intercept page."""
return any(fp in body for fp in CLOUDFLARE_FINGERPRINTS)
def _check_keywords(body: str, keywords: list[str]) -> list[str]:
"""Return any expected keywords that are absent from the body."""
return [kw for kw in keywords if kw not in body]
class SiteChecker:
    """Run HTTP health checks for monitored sites over a shared aiohttp session."""

    def __init__(self, session: aiohttp.ClientSession):
        # Session used for every request made by this checker.
        self.session = session

    async def check(self, site: dict) -> dict:
        """
        Check a site dict with the following keys:
        url (str) required
        timeout_seconds (int) default 10
        expected_status (int) optional — a response with exactly this status
            is treated as healthy even when it lies outside 2xx/3xx
        expected_keywords (list) default [] — strings that must appear in body
        max_retries (int) default 1 — extra attempts on CF intercept
            (negative values are treated as 0)

        Returns a result dict:
        http_status, latency_ms, result, error_type, notes, detection_reason
        """
        url = site["url"]
        timeout_secs = site.get("timeout_seconds", 10)
        expected_kws = site.get("expected_keywords", [])
        expected_status = site.get("expected_status")
        # Clamp so that at least one attempt is always made; otherwise a
        # negative value would skip the loop and return None to the caller.
        max_retries = max(0, site.get("max_retries", 1))

        last_result = None
        for attempt in range(max_retries + 1):
            last_result = await self._single_check(
                url, timeout_secs, expected_kws, expected_status
            )
            # Only retry when we hit a CF intercept and we have attempts left
            if last_result["detection_reason"] == "cf_intercept" and attempt < max_retries:
                await asyncio.sleep(2)
                continue
            break
        return last_result

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _outcome(
        http_status: int | None,
        latency_ms: int | None,
        result: str,
        reason: str | None,
        notes: str | None,
    ) -> dict:
        """Build the result dict; error_type and detection_reason carry the same code."""
        return {
            "http_status": http_status,
            "latency_ms": latency_ms,
            "result": result,
            "error_type": reason,
            "detection_reason": reason,
            "notes": notes,
        }

    async def _single_check(
        self,
        url: str,
        timeout_seconds: int,
        expected_keywords: list[str],
        expected_status: int | None = None,
    ) -> dict:
        """Perform one GET request and classify the response.

        Classification order: Cloudflare intercept page, then HTTP status
        (with a 3000 ms slowness threshold for healthy statuses), then the
        keyword content check. Request failures are mapped to a "down"
        result with reason timeout / ssl / connection / unknown.
        """
        started = time.perf_counter()
        try:
            timeout = aiohttp.ClientTimeout(total=timeout_seconds)
            async with self.session.get(
                url, timeout=timeout, allow_redirects=True
            ) as resp:
                # Latency is taken once the response object is available,
                # before the body is read.
                latency_ms = int((time.perf_counter() - started) * 1000)
                body = await resp.text(errors="replace")
                status = resp.status

                # --- Cloudflare intercept check (fires even on HTTP 200) ---
                if _detect_cloudflare(body):
                    return self._outcome(
                        status,
                        latency_ms,
                        "degraded",
                        "cf_intercept",
                        "Cloudflare challenge/block page returned instead of real content",
                    )

                # --- HTTP status check ---
                status_ok = 200 <= status < 400 or status == expected_status
                if status_ok:
                    if latency_ms < 3000:
                        result, detection_reason = "up", None
                    else:
                        result, detection_reason = "degraded", "slow_response"
                elif status in {429, 500, 502, 503, 504}:
                    result, detection_reason = "degraded", f"http_{status}"
                else:
                    result, detection_reason = "down", f"http_{status}"

                # --- Keyword content check (only meaningful when HTTP is OK) ---
                # Restricted to healthy statuses so that an error page does not
                # replace its http_NNN reason with "missing_keywords".
                notes = None
                if status_ok and expected_keywords:
                    missing = _check_keywords(body, expected_keywords)
                    if missing:
                        result = "degraded"
                        detection_reason = "missing_keywords"
                        notes = f"Expected keywords not found in page: {missing}"

                return self._outcome(status, latency_ms, result, detection_reason, notes)
        except asyncio.TimeoutError:
            return self._outcome(None, None, "down", "timeout", "Request timed out")
        except aiohttp.ClientSSLError:
            # Must come before ClientConnectorError: ClientSSLError derives
            # from it, so the reverse order would never reach this handler.
            return self._outcome(None, None, "down", "ssl", "SSL/TLS handshake error")
        except aiohttp.ClientConnectorError:
            return self._outcome(None, None, "down", "connection", "Could not connect")
        except Exception as e:
            return self._outcome(None, None, "down", "unknown", str(e))

53
app/config.py Normal file
View File

@@ -0,0 +1,53 @@
# app.config
from pydantic import BaseModel, HttpUrl, field_validator
from pydantic_settings import BaseSettings, SettingsConfigDict
class SiteConfig(BaseModel):
    """Schema for a single monitored site."""

    name: str
    url: HttpUrl
    timeout_seconds: int = 10
    expected_status: int = 200
    expected_keywords: list[str] = []
    max_retries: int = 1

    @field_validator("timeout_seconds")
    @classmethod
    def timeout_must_be_positive(cls, v: int) -> int:
        """Reject zero or negative timeouts."""
        if v <= 0:
            raise ValueError("timeout_seconds must be a positive integer")
        return v

    @field_validator("max_retries")
    @classmethod
    def max_retries_must_not_be_negative(cls, v: int) -> int:
        """Reject negative retry counts (0 means a single attempt, no retry)."""
        if v < 0:
            raise ValueError("max_retries must be zero or a positive integer")
        return v

    def to_dict(self) -> dict:
        """Return a plain dict compatible with check_site() in utils.py.

        The URL is converted to ``str`` and the keyword list is copied, so
        changes made to the returned dict do not reach this model.
        """
        return {
            "name": self.name,
            "url": str(self.url),
            "timeout_seconds": self.timeout_seconds,
            "expected_status": self.expected_status,
            "expected_keywords": list(self.expected_keywords),
            "max_retries": self.max_retries,
        }
class Settings(BaseSettings):
    """Application settings; values may be supplied through a UTF-8 `.env` file."""

    model_config = SettingsConfigDict(
        env_file=".env",
        env_file_encoding="utf-8",
    )
    # Discord
    # NOTE(review): all three default to an empty string, so a missing value
    # is not rejected when the settings are loaded.
    discord_secret_key: str = ""
    discord_client_id: str = ""
    discord_client_secret: str = ""
    # Database
    database_path: str = "uptime.db"
    # Sites — stored as a JSON array string in .env:
    monitored_sites: list[SiteConfig] = []


# Module-level settings instance, created once when this module is imported.
settings = Settings()

95
app/db.py Normal file
View File

@@ -0,0 +1,95 @@
#app.db
import sqlite3
from datetime import datetime, timezone, timedelta
from .config import settings
# Path handed to sqlite3.connect() by every function in this module.
DB_PATH = settings.database_path
def init_db() -> None:
    """Create the tables (and lookup index) if they do not exist yet.

    Safe to call repeatedly: every statement uses ``IF NOT EXISTS``.
    The connection is closed even when a statement fails.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        # `with conn` commits on success and rolls back on an exception.
        with conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS site_checks (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    site_name TEXT NOT NULL,
                    checked_at TEXT NOT NULL,
                    http_status INTEGER,
                    latency_ms INTEGER,
                    result TEXT NOT NULL,
                    error_type TEXT,
                    notes TEXT
                )
            """)
            # The read queries in this module filter on site_name and a
            # checked_at range and order by checked_at.
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_site_checks_site_time
                ON site_checks (site_name, checked_at)
            """)
            conn.execute("""
                CREATE TABLE IF NOT EXISTS monthly_reports (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    month_key TEXT NOT NULL,
                    site_name TEXT NOT NULL,
                    total_checks INTEGER NOT NULL,
                    up_checks INTEGER NOT NULL,
                    degraded_checks INTEGER NOT NULL,
                    down_checks INTEGER NOT NULL,
                    uptime_percent REAL NOT NULL,
                    created_at TEXT NOT NULL
                )
            """)
    finally:
        conn.close()
def insert_check(
    site_name: str,
    checked_at: str,
    http_status: int | None,
    latency_ms: int | None,
    result: str,
    error_type: str | None,
    notes: str | None,
) -> None:
    """Store one check result as a row of ``site_checks``.

    ``checked_at`` is stored as given (text). The insert is committed on
    success and rolled back on failure; the connection is closed either way.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        with conn:
            conn.execute(
                """
                INSERT INTO site_checks (
                    site_name, checked_at, http_status, latency_ms, result, error_type, notes
                ) VALUES (?, ?, ?, ?, ?, ?, ?)
                """,
                (site_name, checked_at, http_status, latency_ms, result, error_type, notes),
            )
    finally:
        conn.close()
def fetch_checks_since(site_name: str, since_dt: datetime) -> list[tuple]:
    """Return the checks of ``site_name`` taken at or after ``since_dt``.

    Rows are ``(checked_at, result, http_status, latency_ms, error_type)``
    tuples ordered oldest-first.

    The filter compares ISO-8601 strings, so ``since_dt`` is first expressed
    in UTC: an aware datetime is converted, a naive one is taken to already
    be UTC. The connection is closed even when the query fails.
    """
    if since_dt.tzinfo is None:
        since_utc = since_dt.replace(tzinfo=timezone.utc)
    else:
        since_utc = since_dt.astimezone(timezone.utc)

    conn = sqlite3.connect(DB_PATH)
    try:
        cur = conn.execute(
            """
            SELECT checked_at, result, http_status, latency_ms, error_type
            FROM site_checks
            WHERE site_name = ? AND checked_at >= ?
            ORDER BY checked_at ASC
            """,
            (site_name, since_utc.isoformat()),
        )
        return cur.fetchall()
    finally:
        conn.close()
def fetch_month_checks(site_name: str, year: int, month: int) -> list[tuple]:
    """Return the checks of ``site_name`` that fall in the given UTC month.

    The range is half-open: from the first instant of ``year``-``month`` up
    to, but not including, the first instant of the following month. Rows are
    ``(checked_at, result, http_status, latency_ms, error_type)`` tuples
    ordered oldest-first. The connection is closed even when the query fails.

    Raises:
        ValueError: ``year``/``month`` do not form a valid date.
    """
    start = datetime(year, month, 1, tzinfo=timezone.utc)
    # December rolls over into January of the next year.
    next_year, next_month = (year + 1, 1) if month == 12 else (year, month + 1)
    end = datetime(next_year, next_month, 1, tzinfo=timezone.utc)

    conn = sqlite3.connect(DB_PATH)
    try:
        cur = conn.execute(
            """
            SELECT checked_at, result, http_status, latency_ms, error_type
            FROM site_checks
            WHERE site_name = ? AND checked_at >= ? AND checked_at < ?
            ORDER BY checked_at ASC
            """,
            (site_name, start.isoformat(), end.isoformat()),
        )
        return cur.fetchall()
    finally:
        conn.close()

88
app/utils.py Normal file
View File

@@ -0,0 +1,88 @@
#app.utils
# check_site is the single entry point used by bot.py.
# All HTTP / CF / keyword logic now lives in checker.py (SiteChecker).
# This file keeps the pure helper functions for bar rendering, uptime
# maths, and site-list utilities.
import aiohttp
from .checker import SiteChecker
async def check_site(session: aiohttp.ClientSession, site: dict) -> dict:
    """
    Thin wrapper so bot.py doesn't need to import SiteChecker directly.
    Accepts a site dict (same shape as MONITORED_SITES entries):
    name, url, timeout_seconds, expected_status,
    expected_keywords (optional list[str]),
    max_retries (optional int, default 1)

    Returns the result dict produced by SiteChecker.check().

    Raises:
        RuntimeError: ``session`` is None, i.e. no HTTP session was created yet.
    """
    # Explicit check instead of `assert`, which is removed when Python runs
    # with -O and would let a None session through.
    if session is None:
        raise RuntimeError("aiohttp session must be initialised before checking sites")
    checker = SiteChecker(session)
    return await checker.check(site)
# ---------------------------------------------------------------------------
# Pure helpers — no I/O
# ---------------------------------------------------------------------------
def summarize_counts(results: list[str]) -> tuple[int, int, int]:
    """Tally ``results`` into an ``(up, degraded, down)`` tuple.

    Values other than those three strings are not counted.
    """
    up = degraded = down = 0
    for outcome in results:
        if outcome == "up":
            up += 1
        elif outcome == "degraded":
            degraded += 1
        elif outcome == "down":
            down += 1
    return up, degraded, down
def get_site_names(monitored_sites: list[dict]) -> list[str]:
    """Return the ``"name"`` entry of every site dict, in the given order."""
    names = []
    for entry in monitored_sites:
        names.append(entry["name"])
    return names
def render_bar(results: list[str]) -> str:
    """
    Convert a sequence of result strings into a coloured emoji bar.

    Emoji key:
    🟩 up 🟨 degraded / cf_intercept / missing_keywords
    🟥 down ⬛ no data

    Result strings outside this key contribute nothing to the bar.
    """
    mapping = {
        "up": "🟩",
        "degraded": "🟨",
        "down": "🟥",
        # Render the documented "no data" square instead of an empty string,
        # so gaps stay visible and the bar keeps one cell per entry.
        "nodata": "⬛",
    }
    return "".join(mapping.get(r, "") for r in results)
def compute_uptime(results: list[str]) -> float:
    """
    Percentage of considered checks whose result is exactly 'up'.

    Entries equal to 'nodata' are left out of the calculation entirely.
    With nothing left to consider the function returns 0.0.
    """
    up_count = 0
    total = 0
    for outcome in results:
        if outcome == "nodata":
            continue
        total += 1
        if outcome == "up":
            up_count += 1
    if total == 0:
        return 0.0
    return (up_count / total) * 100
def format_detection_reason(reason: str | None) -> str:
"""Human-readable label for a detection_reason value."""
if not reason:
return ""
labels = {
"cf_intercept": "Cloudflare intercept",
"missing_keywords": "Content check failed",
"slow_response": "Slow response (>3 s)",
"timeout": "Request timed out",
"connection": "Connection refused",
"ssl": "SSL/TLS error",
"unknown": "Unknown error",
}
# http_NNN codes
if reason.startswith("http_"):
return f"HTTP {reason[5:]}"
return labels.get(reason, reason)