Compare commits
4 Commits
f3203f3b3d
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 09c8df405c | |||
| 9c9db344df | |||
| a627511467 | |||
| 2a3891936e |
1
.gitignore
vendored
1
.gitignore
vendored
@@ -174,3 +174,4 @@ cython_debug/
|
||||
# PyPI configuration file
|
||||
.pypirc
|
||||
|
||||
*.db
|
||||
125
README.md
125
README.md
@@ -1,2 +1,127 @@
|
||||
# Voter-Uptime-Bot
|
||||
|
||||
|
||||
A Discord bot that monitors the availability of web services and reports uptime statistics directly in your server. Built with `discord.py`, `aiohttp`, and `pydantic-settings`, it goes beyond simple HTTP checks — detecting Cloudflare intercept pages and validating real page content so you know your sites are genuinely reachable, not just returning a 200.
|
||||
|
||||
---
|
||||
|
||||
## Features
|
||||
|
||||
- **Periodic polling** — checks all configured sites every 15 minutes
|
||||
- **Cloudflare intercept detection** — identifies challenge/block pages that return HTTP 200 but serve no real content
|
||||
- **Keyword content validation** — confirms expected strings are present on the loaded page
|
||||
- **Retry logic** — re-attempts checks on transient CF intercepts before marking a site degraded
|
||||
- **Uptime history** — stores all check results in a local SQLite database
|
||||
- **Discord slash commands** — view live status, 24-hour bars, and monthly summaries per site
|
||||
|
||||
---
|
||||
|
||||
## Commands
|
||||
|
||||
| Command | Description |
|
||||
|---|---|
|
||||
| `/uptime now` | Current status of all monitored sites |
|
||||
| `/uptime day <site>` | 24-hour bar chart (🟩🟨🟥) with uptime % |
|
||||
| `/uptime month <site>` | Current month summary, one square per day |
|
||||
| `/uptime summarize` | Monthly uptime % for all sites at once |
|
||||
|
||||
---
|
||||
|
||||
## Project Structure
|
||||
|
||||
```
|
||||
app/
|
||||
├── __init__.py
|
||||
├── bot.py # Discord client, slash commands, polling task
|
||||
├── checker.py # SiteChecker — HTTP, CF detection, keyword validation
|
||||
├── config.py # Pydantic Settings — loads all config from .env
|
||||
├── db.py # SQLite init, insert, and fetch helpers
|
||||
└── utils.py # check_site wrapper, bar rendering, uptime maths
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Configuration
|
||||
|
||||
All configuration is managed via a `.env` file using `pydantic-settings`. Copy `.env.example` to `.env` and fill in your values.
|
||||
|
||||
```env
|
||||
DISCORD_SECRET_KEY=your-bot-token-here
|
||||
DISCORD_CLIENT_ID=123456789
|
||||
DATABASE_PATH=uptime.db
|
||||
|
||||
# How often to poll all sites, in minutes (default: 15)
|
||||
POLL_INTERVAL_MINUTES=15
|
||||
|
||||
# Discord channel ID to post alerts in. Set to 0 to disable alerts entirely.
|
||||
ALERT_CHANNEL_ID=1493840872146600036
|
||||
|
||||
# Minimum minutes between repeat incident alerts for the same site.
|
||||
# Recoveries always bypass this cooldown.
|
||||
ALERT_COOLDOWN_MINUTES=30
|
||||
|
||||
MONITORED_SITES='[
|
||||
{
|
||||
"name": "MySite",
|
||||
"url": "https://example.com",
|
||||
"timeout_seconds": 10,
|
||||
"expected_keywords": ["Login", "Vote"],
|
||||
"max_retries": 1
|
||||
}
|
||||
]'
|
||||
```
|
||||
|
||||
### Site config fields
|
||||
|
||||
| Field | Required | Default | Description |
|
||||
|---|---|---|---|
|
||||
| `name` | ✅ | — | Display name used in Discord |
|
||||
| `url` | ✅ | — | Full URL to check |
|
||||
| `timeout_seconds` | | `10` | Request timeout |
|
||||
| `expected_status` | | `200` | Expected HTTP status code |
|
||||
| `expected_keywords` | | `[]` | Strings that must appear in the page body |
|
||||
| `max_retries` | | `1` | Extra attempts on CF intercept before marking degraded |
|
||||
|
||||
---
|
||||
|
||||
## How Checks Work
|
||||
|
||||
Each poll goes through three layers:
|
||||
|
||||
1. **HTTP status** — non-2xx/3xx responses are marked degraded or down
|
||||
2. **Cloudflare detection** — response body is scanned for CF challenge fingerprints; a match is marked `degraded` even if status was 200
|
||||
3. **Keyword validation** — any `expected_keywords` that are missing from the body mark the site `degraded`
|
||||
|
||||
Results are stored with a `detection_reason` (`cf_intercept`, `missing_keywords`, `http_503`, `timeout`, etc.) so you can query historical failure causes.
|
||||
|
||||
---
|
||||
|
||||
## Result States
|
||||
|
||||
| State | Emoji | Meaning |
|
||||
|---|---|---|
|
||||
| `up` | 🟩 | HTTP OK, no CF intercept, all keywords present, latency < 3s |
|
||||
| `degraded` | 🟨 | Reachable but CF intercept, missing content, slow, or 5xx |
|
||||
| `down` | 🟥 | Timeout, connection failure, SSL error, or unexpected status |
|
||||
|
||||
---
|
||||
|
||||
## Installation
|
||||
|
||||
```bash
|
||||
pip install -r requirements.txt
|
||||
```
|
||||
|
||||
Run the bot:
|
||||
|
||||
```bash
|
||||
python -m app.bot
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Requirements
|
||||
|
||||
- Python 3.11+
|
||||
- A Discord bot token with the `applications.commands` scope and `bot` scope enabled
|
||||
- The bot must be invited to your server with slash command permissions
|
||||
1
app/__init__.py
Normal file
1
app/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
#app.__init__
|
||||
346
app/bot.py
Normal file
346
app/bot.py
Normal file
@@ -0,0 +1,346 @@
|
||||
#app.bot
|
||||
|
||||
import discord
|
||||
import aiohttp
|
||||
from datetime import datetime, timezone, timedelta
|
||||
from discord import app_commands
|
||||
from discord.ext import tasks
|
||||
|
||||
from .config import settings
|
||||
from .db import init_db, insert_check, fetch_checks_since,fetch_month_checks
|
||||
from .utils import (
|
||||
check_site, summarize_counts, get_site_names, render_bar, compute_uptime, format_detection_reason
|
||||
)
|
||||
|
||||
TOKEN = settings.discord_secret_key
|
||||
|
||||
# Convert Pydantic models → plain dicts once so the rest of bot.py is unchanged.
|
||||
MONITORED_SITES = [site.to_dict() for site in settings.monitored_sites]
|
||||
|
||||
# SITE_CHOICES stays exactly the same — it reads from MONITORED_SITES:
|
||||
SITE_CHOICES = [
|
||||
app_commands.Choice(name=site["name"], value=site["name"])
|
||||
for site in MONITORED_SITES
|
||||
]
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# State tracking — populated on startup, updated each poll
|
||||
# ---------------------------------------------------------------------------
|
||||
# Holds the last known result string per site name: {"Site1": "up", ...}
|
||||
last_known_state: dict[str, str] = {}
|
||||
|
||||
# Tracks when we last sent an alert per site to enforce the cooldown
|
||||
last_alerted_at: dict[str, datetime] = {}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Alert helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# Transitions that warrant an alert
|
||||
ALERT_TRANSITIONS = {
|
||||
("up", "down"),
|
||||
("up", "degraded"),
|
||||
("degraded", "down"),
|
||||
("down", "up"), # recovery
|
||||
("degraded", "up"), # recovery
|
||||
}
|
||||
|
||||
INCIDENT_EMOJI = {
|
||||
"down": "🟥",
|
||||
"degraded": "🟨",
|
||||
"up": "🟩",
|
||||
}
|
||||
|
||||
|
||||
async def maybe_send_alert(
|
||||
site_name: str,
|
||||
prev: str,
|
||||
curr: str,
|
||||
result: dict,
|
||||
now: datetime,
|
||||
) -> None:
|
||||
"""Send an alert to the alerts channel if conditions are met."""
|
||||
|
||||
# Alerts disabled
|
||||
if not settings.alert_channel_id:
|
||||
return
|
||||
|
||||
# Not a transition we care about
|
||||
if (prev, curr) not in ALERT_TRANSITIONS:
|
||||
return
|
||||
|
||||
is_recovery = curr == "up"
|
||||
|
||||
# Cooldown — recoveries always bypass so you always know when a site is back
|
||||
if not is_recovery:
|
||||
last = last_alerted_at.get(site_name)
|
||||
cooldown = timedelta(minutes=settings.alert_cooldown_minutes)
|
||||
if last and (now - last) < cooldown:
|
||||
return
|
||||
|
||||
last_alerted_at[site_name] = now
|
||||
|
||||
channel = client.get_channel(settings.alert_channel_id)
|
||||
if channel is None:
|
||||
print(f"Alert channel {settings.alert_channel_id} not found.")
|
||||
return
|
||||
|
||||
await channel.send(_build_alert_message(site_name, prev, curr, result, now))
|
||||
|
||||
|
||||
def _build_alert_message(
|
||||
site_name: str,
|
||||
prev: str,
|
||||
curr: str,
|
||||
result: dict,
|
||||
now: datetime,
|
||||
) -> str:
|
||||
is_recovery = curr == "up"
|
||||
emoji = INCIDENT_EMOJI.get(curr, "⬛")
|
||||
title = "RECOVERED" if is_recovery else curr.upper()
|
||||
timestamp = now.strftime("%H:%M UTC")
|
||||
|
||||
lines = [
|
||||
f"{emoji} **{title}** | {site_name}",
|
||||
f"Status: `{prev}` → `{curr}`",
|
||||
]
|
||||
|
||||
if is_recovery:
|
||||
if result.get("latency_ms") is not None:
|
||||
lines.append(f"Latency: {result['latency_ms']}ms")
|
||||
# Approximate downtime from DB
|
||||
downtime = _approximate_downtime(site_name, now)
|
||||
if downtime:
|
||||
lines.append(f"Downtime: ~{downtime}")
|
||||
else:
|
||||
reason = format_detection_reason(result.get("detection_reason"))
|
||||
if reason:
|
||||
lines.append(f"Reason: {reason}")
|
||||
if result.get("http_status"):
|
||||
lines.append(f"HTTP status: {result['http_status']}")
|
||||
|
||||
lines.append(f"Checked: {timestamp}")
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def _approximate_downtime(site_name: str, now: datetime) -> str | None:
|
||||
"""
|
||||
Walk back through recent checks to find the last 'up' row and return
|
||||
a human-readable duration string, e.g. '~45 min' or '~2 hr 10 min'.
|
||||
"""
|
||||
rows = fetch_checks_since(site_name, now - timedelta(days=1))
|
||||
last_up_at = None
|
||||
for checked_at, result, *_ in reversed(rows):
|
||||
if result == "up":
|
||||
last_up_at = datetime.fromisoformat(checked_at)
|
||||
break
|
||||
|
||||
if last_up_at is None:
|
||||
return None
|
||||
|
||||
delta = now - last_up_at
|
||||
total_minutes = int(delta.total_seconds() // 60)
|
||||
if total_minutes < 60:
|
||||
return f"{total_minutes} min"
|
||||
hours, mins = divmod(total_minutes, 60)
|
||||
return f"{hours} hr {mins} min" if mins else f"{hours} hr"
|
||||
|
||||
|
||||
intents = discord.Intents.default()
|
||||
client = discord.Client(intents=intents)
|
||||
tree = app_commands.CommandTree(client)
|
||||
session: aiohttp.ClientSession | None = None
|
||||
|
||||
@client.event
|
||||
async def on_ready():
|
||||
global session
|
||||
init_db()
|
||||
|
||||
if session is None:
|
||||
session = aiohttp.ClientSession(headers={"User-Agent": "VoteUptimeBot/1.0"})
|
||||
|
||||
# Seed state from the most recent DB row per site so we don't false-alert
|
||||
# on restart.
|
||||
for site in MONITORED_SITES:
|
||||
rows = fetch_checks_since(
|
||||
site["name"], datetime.now(timezone.utc) - timedelta(hours=1)
|
||||
)
|
||||
if rows:
|
||||
last_known_state[site["name"]] = rows[-1][1] # result column
|
||||
|
||||
if not poll_sites.is_running():
|
||||
poll_sites.start()
|
||||
|
||||
await tree.sync()
|
||||
print(f"Logged in as {client.user}")
|
||||
|
||||
@tasks.loop(minutes=settings.poll_interval_minutes)
|
||||
async def poll_sites():
|
||||
now = datetime.now(timezone.utc)
|
||||
now_iso = now.isoformat()
|
||||
|
||||
for site in MONITORED_SITES:
|
||||
result = await check_site(session, site)
|
||||
site_name = site["name"]
|
||||
new_state = result["result"]
|
||||
|
||||
notes = result["notes"]
|
||||
if result.get("detection_reason") and not notes:
|
||||
notes = format_detection_reason(result["detection_reason"])
|
||||
|
||||
insert_check(
|
||||
site_name=site_name,
|
||||
checked_at=now_iso,
|
||||
http_status=result["http_status"],
|
||||
latency_ms=result["latency_ms"],
|
||||
result=new_state,
|
||||
error_type=result["error_type"],
|
||||
notes=notes,
|
||||
)
|
||||
|
||||
# --- Alert logic ---
|
||||
prev_state = last_known_state.get(site_name)
|
||||
last_known_state[site_name] = new_state
|
||||
|
||||
if prev_state is not None and prev_state != new_state:
|
||||
await maybe_send_alert(site_name, prev_state, new_state, result, now)
|
||||
|
||||
@poll_sites.before_loop
|
||||
async def before_poll_sites():
|
||||
await client.wait_until_ready()
|
||||
|
||||
class SiteNameTransformer(app_commands.Transformer):
|
||||
async def transform(self, interaction: discord.Interaction, value: str) -> str:
|
||||
if value not in get_site_names(MONITORED_SITES):
|
||||
raise app_commands.AppCommandError(f"Unknown site: {value}")
|
||||
return value
|
||||
|
||||
uptime_group = app_commands.Group(name="uptime", description="Uptime tools")
|
||||
tree.add_command(uptime_group)
|
||||
|
||||
|
||||
@uptime_group.command(name="now", description="Show current configured sites")
|
||||
async def uptime_now(interaction: discord.Interaction):
|
||||
lines = []
|
||||
for site in MONITORED_SITES:
|
||||
rows = fetch_checks_since(
|
||||
site["name"], datetime.now(timezone.utc) - timedelta(hours=1)
|
||||
)
|
||||
if not rows:
|
||||
lines.append(f"**{site['name']}**: no recent data")
|
||||
continue
|
||||
|
||||
checked_at, result, http_status, latency_ms, error_type = rows[-1]
|
||||
|
||||
# Emoji prefix for quick scanning
|
||||
emoji = {"up": "🟩", "degraded": "🟨", "down": "🟥"}.get(result, "⬛")
|
||||
|
||||
detail = f"{result.upper()} | status={http_status} | latency={latency_ms}ms"
|
||||
if error_type:
|
||||
detail += f" | reason={format_detection_reason(error_type)}"
|
||||
|
||||
lines.append(f"{emoji} **{site['name']}**: {detail}")
|
||||
|
||||
await interaction.response.send_message("\n".join(lines))
|
||||
|
||||
|
||||
SITE_CHOICES = [
|
||||
app_commands.Choice(name=site["name"], value=site["name"])
|
||||
for site in MONITORED_SITES
|
||||
]
|
||||
|
||||
@uptime_group.command(name="day", description="Last 24 hours in 15-minute bars")
|
||||
@app_commands.describe(site="Site name")
|
||||
@app_commands.choices(site=SITE_CHOICES)
|
||||
async def uptime_day(
|
||||
interaction: discord.Interaction,
|
||||
site: app_commands.Transform[str, SiteNameTransformer],
|
||||
):
|
||||
since = datetime.now(timezone.utc) - timedelta(hours=24)
|
||||
rows = fetch_checks_since(site, since)
|
||||
results = [row[1] for row in rows]
|
||||
|
||||
bar = render_bar(results[-96:]) if results else "⬛"
|
||||
uptime = compute_uptime(results)
|
||||
up, degraded, down = summarize_counts(results)
|
||||
|
||||
msg = (
|
||||
f"**{site}** last 24h\n"
|
||||
f"{bar}\n"
|
||||
f"Uptime: **{uptime:.2f}%**\n"
|
||||
f"Up: {up} | Degraded: {degraded} | Down: {down}"
|
||||
)
|
||||
await interaction.response.send_message(msg)
|
||||
|
||||
|
||||
@uptime_group.command(name="month", description="Current month summary")
|
||||
@app_commands.describe(site="Site name")
|
||||
@app_commands.choices(site=SITE_CHOICES)
|
||||
async def uptime_month(
|
||||
interaction: discord.Interaction,
|
||||
site: app_commands.Transform[str, SiteNameTransformer],
|
||||
):
|
||||
now = datetime.now(timezone.utc)
|
||||
rows = fetch_month_checks(site, now.year, now.month)
|
||||
|
||||
by_day: dict[str, list[str]] = {}
|
||||
for checked_at, result, *_ in rows:
|
||||
day_key = checked_at[:10]
|
||||
by_day.setdefault(day_key, []).append(result)
|
||||
|
||||
day_bars = []
|
||||
for day in sorted(by_day.keys()):
|
||||
pct = compute_uptime(by_day[day])
|
||||
if pct >= 99:
|
||||
day_bars.append("🟩")
|
||||
elif pct >= 95:
|
||||
day_bars.append("🟨")
|
||||
else:
|
||||
day_bars.append("🟥")
|
||||
|
||||
all_results = [row[1] for row in rows]
|
||||
uptime = compute_uptime(all_results)
|
||||
up, degraded, down = summarize_counts(all_results)
|
||||
|
||||
msg = (
|
||||
f"**{site}** {now.year}-{now.month:02d}\n"
|
||||
f"{''.join(day_bars) if day_bars else '⬛'}\n"
|
||||
f"Uptime: **{uptime:.2f}%**\n"
|
||||
f"Up: {up} | Degraded: {degraded} | Down: {down}"
|
||||
)
|
||||
await interaction.response.send_message(msg)
|
||||
|
||||
|
||||
@uptime_group.command(name="summarize", description="Summarize current month for all sites")
|
||||
async def uptime_summarize(interaction: discord.Interaction):
|
||||
now = datetime.now(timezone.utc)
|
||||
lines = [f"**Monthly summary for {now.year}-{now.month:02d}**"]
|
||||
|
||||
for site in MONITORED_SITES:
|
||||
rows = fetch_month_checks(site["name"], now.year, now.month)
|
||||
results = [row[1] for row in rows]
|
||||
uptime = compute_uptime(results)
|
||||
up, degraded, down = summarize_counts(results)
|
||||
lines.append(
|
||||
f"{site['name']}: uptime={uptime:.2f}% | up={up} | degraded={degraded} | down={down}"
|
||||
)
|
||||
|
||||
await interaction.response.send_message("\n".join(lines))
|
||||
|
||||
|
||||
@tree.command(name="incident", description="Placeholder incident review command")
|
||||
async def incident(interaction: discord.Interaction):
|
||||
await interaction.response.send_message("Incident review command placeholder.")
|
||||
|
||||
|
||||
@tree.command(name="hello", description="Say hello")
|
||||
async def hello(interaction: discord.Interaction):
|
||||
await interaction.response.send_message("Hello, world!")
|
||||
|
||||
|
||||
@tree.command(name="add", description="Add two numbers")
|
||||
async def add(interaction: discord.Interaction, a: float, b: float):
|
||||
await interaction.response.send_message(f"Sum: {a + b}")
|
||||
|
||||
client.run(TOKEN)
|
||||
161
app/checker.py
Normal file
161
app/checker.py
Normal file
@@ -0,0 +1,161 @@
|
||||
#app.checker
|
||||
|
||||
import asyncio
|
||||
import time
|
||||
import aiohttp
|
||||
|
||||
# Fingerprints that identify a Cloudflare interstitial/challenge/block page.
|
||||
# These appear in the response body even when the HTTP status is 200.
|
||||
CLOUDFLARE_FINGERPRINTS = [
|
||||
"Just a moment", # JS challenge page <title>
|
||||
"cf-browser-verification", # Legacy challenge <form> id
|
||||
"cf_chl_opt", # Challenge options JS variable
|
||||
"cf-wrapper", # <body> class on block/error pages
|
||||
"Checking your browser", # Human-verification copy
|
||||
"Enable JavaScript and cookies to continue", # CF IUAM page body
|
||||
"Please Wait... | Cloudflare", # Page title variant
|
||||
"_cf_chl_f_tk", # Hidden challenge token field
|
||||
]
|
||||
|
||||
|
||||
def _detect_cloudflare(body: str) -> bool:
|
||||
"""Return True if the response body looks like a Cloudflare intercept page."""
|
||||
return any(fp in body for fp in CLOUDFLARE_FINGERPRINTS)
|
||||
|
||||
|
||||
def _check_keywords(body: str, keywords: list[str]) -> list[str]:
|
||||
"""Return any expected keywords that are absent from the body."""
|
||||
return [kw for kw in keywords if kw not in body]
|
||||
|
||||
|
||||
class SiteChecker:
|
||||
def __init__(self, session: aiohttp.ClientSession):
|
||||
self.session = session
|
||||
|
||||
async def check(self, site: dict) -> dict:
|
||||
"""
|
||||
Check a site dict with the following keys:
|
||||
url (str) required
|
||||
timeout_seconds (int) default 10
|
||||
expected_status (int) default 200
|
||||
expected_keywords (list) default [] — strings that must appear in body
|
||||
max_retries (int) default 1 — extra attempts on CF intercept
|
||||
|
||||
Returns a result dict:
|
||||
http_status, latency_ms, result, error_type, notes, detection_reason
|
||||
"""
|
||||
url = site["url"]
|
||||
timeout_secs = site.get("timeout_seconds", 10)
|
||||
expected_kws = site.get("expected_keywords", [])
|
||||
max_retries = site.get("max_retries", 1)
|
||||
|
||||
last_result = None
|
||||
for attempt in range(max_retries + 1):
|
||||
last_result = await self._single_check(url, timeout_secs, expected_kws)
|
||||
|
||||
# Only retry when we hit a CF intercept and we have attempts left
|
||||
if last_result["detection_reason"] == "cf_intercept" and attempt < max_retries:
|
||||
await asyncio.sleep(2)
|
||||
continue
|
||||
break
|
||||
|
||||
return last_result
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Internal helpers
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
async def _single_check(
|
||||
self,
|
||||
url: str,
|
||||
timeout_seconds: int,
|
||||
expected_keywords: list[str],
|
||||
) -> dict:
|
||||
started = time.perf_counter()
|
||||
try:
|
||||
timeout = aiohttp.ClientTimeout(total=timeout_seconds)
|
||||
async with self.session.get(
|
||||
url, timeout=timeout, allow_redirects=True
|
||||
) as resp:
|
||||
latency_ms = int((time.perf_counter() - started) * 1000)
|
||||
body = await resp.text(errors="replace")
|
||||
|
||||
# --- Cloudflare intercept check (fires even on HTTP 200) ---
|
||||
if _detect_cloudflare(body):
|
||||
return {
|
||||
"http_status": resp.status,
|
||||
"latency_ms": latency_ms,
|
||||
"result": "degraded",
|
||||
"error_type": "cf_intercept",
|
||||
"detection_reason": "cf_intercept",
|
||||
"notes": "Cloudflare challenge/block page returned instead of real content",
|
||||
}
|
||||
|
||||
# --- HTTP status check ---
|
||||
if 200 <= resp.status < 400:
|
||||
result = "up" if latency_ms < 3000 else "degraded"
|
||||
detection_reason = "slow_response" if result == "degraded" else None
|
||||
elif resp.status in {429, 500, 502, 503, 504}:
|
||||
result = "degraded"
|
||||
detection_reason = f"http_{resp.status}"
|
||||
else:
|
||||
result = "down"
|
||||
detection_reason = f"http_{resp.status}"
|
||||
|
||||
# --- Keyword content check (only meaningful when HTTP is OK) ---
|
||||
if result in {"up", "degraded"} and expected_keywords:
|
||||
missing = _check_keywords(body, expected_keywords)
|
||||
if missing:
|
||||
result = "degraded"
|
||||
detection_reason = "missing_keywords"
|
||||
notes = f"Expected keywords not found in page: {missing}"
|
||||
else:
|
||||
notes = None
|
||||
else:
|
||||
notes = None
|
||||
|
||||
return {
|
||||
"http_status": resp.status,
|
||||
"latency_ms": latency_ms,
|
||||
"result": result,
|
||||
"error_type": detection_reason,
|
||||
"detection_reason": detection_reason,
|
||||
"notes": notes,
|
||||
}
|
||||
|
||||
except asyncio.TimeoutError:
|
||||
return {
|
||||
"http_status": None,
|
||||
"latency_ms": None,
|
||||
"result": "down",
|
||||
"error_type": "timeout",
|
||||
"detection_reason": "timeout",
|
||||
"notes": "Request timed out",
|
||||
}
|
||||
except aiohttp.ClientConnectorError:
|
||||
return {
|
||||
"http_status": None,
|
||||
"latency_ms": None,
|
||||
"result": "down",
|
||||
"error_type": "connection",
|
||||
"detection_reason": "connection",
|
||||
"notes": "Could not connect",
|
||||
}
|
||||
except aiohttp.ClientSSLError:
|
||||
return {
|
||||
"http_status": None,
|
||||
"latency_ms": None,
|
||||
"result": "down",
|
||||
"error_type": "ssl",
|
||||
"detection_reason": "ssl",
|
||||
"notes": "SSL/TLS handshake error",
|
||||
}
|
||||
except Exception as e:
|
||||
return {
|
||||
"http_status": None,
|
||||
"latency_ms": None,
|
||||
"result": "down",
|
||||
"error_type": "unknown",
|
||||
"detection_reason": "unknown",
|
||||
"notes": str(e),
|
||||
}
|
||||
67
app/config.py
Normal file
67
app/config.py
Normal file
@@ -0,0 +1,67 @@
|
||||
# app.config
|
||||
|
||||
from pydantic import BaseModel, HttpUrl, field_validator
|
||||
from pydantic_settings import BaseSettings, SettingsConfigDict
|
||||
|
||||
|
||||
class SiteConfig(BaseModel):
|
||||
"""Schema for a single monitored site."""
|
||||
name: str
|
||||
url: HttpUrl
|
||||
timeout_seconds: int = 10
|
||||
expected_status: int = 200
|
||||
expected_keywords: list[str] = []
|
||||
max_retries: int = 1
|
||||
|
||||
@field_validator("timeout_seconds")
|
||||
@classmethod
|
||||
def timeout_must_be_positive(cls, v: int) -> int:
|
||||
if v <= 0:
|
||||
raise ValueError("timeout_seconds must be a positive integer")
|
||||
return v
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
"""Return a plain dict compatible with check_site() in utils.py."""
|
||||
return {
|
||||
"name": self.name,
|
||||
"url": str(self.url),
|
||||
"timeout_seconds": self.timeout_seconds,
|
||||
"expected_status": self.expected_status,
|
||||
"expected_keywords": self.expected_keywords,
|
||||
"max_retries": self.max_retries,
|
||||
}
|
||||
|
||||
|
||||
class Settings(BaseSettings):
|
||||
model_config = SettingsConfigDict(
|
||||
env_file=".env",
|
||||
env_file_encoding="utf-8",
|
||||
)
|
||||
|
||||
# Discord
|
||||
discord_secret_key: str
|
||||
discord_client_id: str = ""
|
||||
discord_client_secret: str = ""
|
||||
|
||||
# Database
|
||||
database_path: str = "uptime.db"
|
||||
|
||||
# Polling — how often to check all sites (minimum 1 minute)
|
||||
poll_interval_minutes: int = 15
|
||||
|
||||
# Alerts — set alert_channel_id to 0 to disable
|
||||
alert_channel_id: int = 0
|
||||
alert_cooldown_minutes: int = 30
|
||||
|
||||
# Sites — stored as a JSON array string in .env:
|
||||
monitored_sites: list[SiteConfig] = []
|
||||
|
||||
@field_validator("poll_interval_minutes")
|
||||
@classmethod
|
||||
def poll_interval_must_be_positive(cls, v: int) -> int:
|
||||
if v < 1:
|
||||
raise ValueError("poll_interval_minutes must be at least 1")
|
||||
return v
|
||||
|
||||
|
||||
settings = Settings()
|
||||
95
app/db.py
Normal file
95
app/db.py
Normal file
@@ -0,0 +1,95 @@
|
||||
#app.db
|
||||
|
||||
import sqlite3
|
||||
from datetime import datetime, timezone, timedelta
|
||||
|
||||
from .config import settings
|
||||
DB_PATH = settings.database_path
|
||||
|
||||
def init_db() -> None:
|
||||
conn = sqlite3.connect(DB_PATH)
|
||||
cur = conn.cursor()
|
||||
|
||||
cur.execute("""
|
||||
CREATE TABLE IF NOT EXISTS site_checks (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
site_name TEXT NOT NULL,
|
||||
checked_at TEXT NOT NULL,
|
||||
http_status INTEGER,
|
||||
latency_ms INTEGER,
|
||||
result TEXT NOT NULL,
|
||||
error_type TEXT,
|
||||
notes TEXT
|
||||
)
|
||||
""")
|
||||
|
||||
cur.execute("""
|
||||
CREATE TABLE IF NOT EXISTS monthly_reports (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
month_key TEXT NOT NULL,
|
||||
site_name TEXT NOT NULL,
|
||||
total_checks INTEGER NOT NULL,
|
||||
up_checks INTEGER NOT NULL,
|
||||
degraded_checks INTEGER NOT NULL,
|
||||
down_checks INTEGER NOT NULL,
|
||||
uptime_percent REAL NOT NULL,
|
||||
created_at TEXT NOT NULL
|
||||
)
|
||||
""")
|
||||
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
|
||||
def insert_check(
|
||||
site_name: str,
|
||||
checked_at: str,
|
||||
http_status: int | None,
|
||||
latency_ms: int | None,
|
||||
result: str,
|
||||
error_type: str | None,
|
||||
notes: str | None,
|
||||
) -> None:
|
||||
conn = sqlite3.connect(DB_PATH)
|
||||
cur = conn.cursor()
|
||||
cur.execute("""
|
||||
INSERT INTO site_checks (
|
||||
site_name, checked_at, http_status, latency_ms, result, error_type, notes
|
||||
) VALUES (?, ?, ?, ?, ?, ?, ?)
|
||||
""", (site_name, checked_at, http_status, latency_ms, result, error_type, notes))
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
|
||||
def fetch_checks_since(site_name: str, since_dt: datetime) -> list[tuple]:
|
||||
conn = sqlite3.connect(DB_PATH)
|
||||
cur = conn.cursor()
|
||||
cur.execute("""
|
||||
SELECT checked_at, result, http_status, latency_ms, error_type
|
||||
FROM site_checks
|
||||
WHERE site_name = ? AND checked_at >= ?
|
||||
ORDER BY checked_at ASC
|
||||
""", (site_name, since_dt.isoformat()))
|
||||
rows = cur.fetchall()
|
||||
conn.close()
|
||||
return rows
|
||||
|
||||
|
||||
def fetch_month_checks(site_name: str, year: int, month: int) -> list[tuple]:
|
||||
start = datetime(year, month, 1, tzinfo=timezone.utc)
|
||||
if month == 12:
|
||||
end = datetime(year + 1, 1, 1, tzinfo=timezone.utc)
|
||||
else:
|
||||
end = datetime(year, month + 1, 1, tzinfo=timezone.utc)
|
||||
|
||||
conn = sqlite3.connect(DB_PATH)
|
||||
cur = conn.cursor()
|
||||
cur.execute("""
|
||||
SELECT checked_at, result, http_status, latency_ms, error_type
|
||||
FROM site_checks
|
||||
WHERE site_name = ? AND checked_at >= ? AND checked_at < ?
|
||||
ORDER BY checked_at ASC
|
||||
""", (site_name, start.isoformat(), end.isoformat()))
|
||||
rows = cur.fetchall()
|
||||
conn.close()
|
||||
return rows
|
||||
88
app/utils.py
Normal file
88
app/utils.py
Normal file
@@ -0,0 +1,88 @@
|
||||
#app.utils
|
||||
|
||||
# check_site is the single entry point used by bot.py.
|
||||
# All HTTP / CF / keyword logic now lives in checker.py (SiteChecker).
|
||||
# This file keeps the pure helper functions for bar rendering, uptime
|
||||
# maths, and site-list utilities.
|
||||
|
||||
import aiohttp
|
||||
from .checker import SiteChecker
|
||||
|
||||
|
||||
async def check_site(session: aiohttp.ClientSession, site: dict) -> dict:
|
||||
"""
|
||||
Thin wrapper so bot.py doesn't need to import SiteChecker directly.
|
||||
|
||||
Accepts a site dict (same shape as MONITORED_SITES entries):
|
||||
name, url, timeout_seconds, expected_status,
|
||||
expected_keywords (optional list[str]),
|
||||
max_retries (optional int, default 1)
|
||||
"""
|
||||
assert session is not None, "aiohttp session must be initialised before checking sites"
|
||||
checker = SiteChecker(session)
|
||||
return await checker.check(site)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Pure helpers — no I/O
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def summarize_counts(results: list[str]) -> tuple[int, int, int]:
|
||||
"""Return (up, degraded, down) counts from a list of result strings."""
|
||||
up = sum(1 for r in results if r == "up")
|
||||
degraded = sum(1 for r in results if r == "degraded")
|
||||
down = sum(1 for r in results if r == "down")
|
||||
return up, degraded, down
|
||||
|
||||
|
||||
def get_site_names(monitored_sites: list[dict]) -> list[str]:
|
||||
return [site["name"] for site in monitored_sites]
|
||||
|
||||
|
||||
def render_bar(results: list[str]) -> str:
|
||||
"""
|
||||
Convert a sequence of result strings into a coloured emoji bar.
|
||||
|
||||
Emoji key:
|
||||
🟩 up 🟨 degraded / cf_intercept / missing_keywords
|
||||
🟥 down ⬛ no data
|
||||
"""
|
||||
mapping = {
|
||||
"up": "🟩",
|
||||
"degraded": "🟨",
|
||||
"down": "🟥",
|
||||
"nodata": "⬛",
|
||||
}
|
||||
return "".join(mapping.get(r, "⬛") for r in results)
|
||||
|
||||
|
||||
def compute_uptime(results: list[str]) -> float:
|
||||
"""
|
||||
Return the percentage of checks that were fully 'up'.
|
||||
'nodata' entries are excluded from the denominator.
|
||||
Returns 0.0 when there are no considered results.
|
||||
"""
|
||||
considered = [r for r in results if r != "nodata"]
|
||||
if not considered:
|
||||
return 0.0
|
||||
good = sum(1 for r in considered if r == "up")
|
||||
return (good / len(considered)) * 100
|
||||
|
||||
|
||||
def format_detection_reason(reason: str | None) -> str:
|
||||
"""Human-readable label for a detection_reason value."""
|
||||
if not reason:
|
||||
return ""
|
||||
labels = {
|
||||
"cf_intercept": "Cloudflare intercept",
|
||||
"missing_keywords": "Content check failed",
|
||||
"slow_response": "Slow response (>3 s)",
|
||||
"timeout": "Request timed out",
|
||||
"connection": "Connection refused",
|
||||
"ssl": "SSL/TLS error",
|
||||
"unknown": "Unknown error",
|
||||
}
|
||||
# http_NNN codes
|
||||
if reason.startswith("http_"):
|
||||
return f"HTTP {reason[5:]}"
|
||||
return labels.get(reason, reason)
|
||||
BIN
requirements.txt
Normal file
BIN
requirements.txt
Normal file
Binary file not shown.
Reference in New Issue
Block a user