Compare commits

...

1 Commits

Author SHA1 Message Date
09c8df405c Bot Version 1.1.0 2026-04-14 22:14:08 -07:00
3 changed files with 179 additions and 15 deletions

View File

@@ -50,6 +50,16 @@ DISCORD_SECRET_KEY=your-bot-token-here
DISCORD_CLIENT_ID=123456789
DATABASE_PATH=uptime.db
# How often to poll all sites, in minutes (default: 15)
POLL_INTERVAL_MINUTES=15
# Discord channel ID to post alerts in. Set to 0 to disable alerts entirely.
ALERT_CHANNEL_ID=1493840872146600036
# Minimum minutes between repeat incident alerts for the same site.
# Recoveries always bypass this cooldown.
ALERT_COOLDOWN_MINUTES=30
MONITORED_SITES='[
{
"name": "MySite",

View File

@@ -9,8 +9,7 @@ from discord.ext import tasks
from .config import settings
from .db import init_db, insert_check, fetch_checks_since,fetch_month_checks
from .utils import (
check_site, summarize_counts, get_site_names,
render_bar, compute_uptime, format_detection_reason,
check_site, summarize_counts, get_site_names, render_bar, compute_uptime, format_detection_reason
)
TOKEN = settings.discord_secret_key
@@ -24,6 +23,130 @@ SITE_CHOICES = [
for site in MONITORED_SITES
]
# ---------------------------------------------------------------------------
# State tracking — populated on startup, updated each poll
# ---------------------------------------------------------------------------
# Holds the last known result string per site name: {"Site1": "up", ...}
last_known_state: dict[str, str] = {}
# Tracks when we last sent an alert per site to enforce the cooldown
last_alerted_at: dict[str, datetime] = {}
# ---------------------------------------------------------------------------
# Alert helpers
# ---------------------------------------------------------------------------
# Transitions that warrant an alert
ALERT_TRANSITIONS = {
("up", "down"),
("up", "degraded"),
("degraded", "down"),
("down", "up"), # recovery
("degraded", "up"), # recovery
}
INCIDENT_EMOJI = {
"down": "🟥",
"degraded": "🟨",
"up": "🟩",
}
async def maybe_send_alert(
site_name: str,
prev: str,
curr: str,
result: dict,
now: datetime,
) -> None:
"""Send an alert to the alerts channel if conditions are met."""
# Alerts disabled
if not settings.alert_channel_id:
return
# Not a transition we care about
if (prev, curr) not in ALERT_TRANSITIONS:
return
is_recovery = curr == "up"
# Cooldown — recoveries always bypass so you always know when a site is back
if not is_recovery:
last = last_alerted_at.get(site_name)
cooldown = timedelta(minutes=settings.alert_cooldown_minutes)
if last and (now - last) < cooldown:
return
last_alerted_at[site_name] = now
channel = client.get_channel(settings.alert_channel_id)
if channel is None:
print(f"Alert channel {settings.alert_channel_id} not found.")
return
await channel.send(_build_alert_message(site_name, prev, curr, result, now))
def _build_alert_message(
site_name: str,
prev: str,
curr: str,
result: dict,
now: datetime,
) -> str:
is_recovery = curr == "up"
emoji = INCIDENT_EMOJI.get(curr, "")
title = "RECOVERED" if is_recovery else curr.upper()
timestamp = now.strftime("%H:%M UTC")
lines = [
f"{emoji} **{title}** | {site_name}",
f"Status: `{prev}` → `{curr}`",
]
if is_recovery:
if result.get("latency_ms") is not None:
lines.append(f"Latency: {result['latency_ms']}ms")
# Approximate downtime from DB
downtime = _approximate_downtime(site_name, now)
if downtime:
lines.append(f"Downtime: ~{downtime}")
else:
reason = format_detection_reason(result.get("detection_reason"))
if reason:
lines.append(f"Reason: {reason}")
if result.get("http_status"):
lines.append(f"HTTP status: {result['http_status']}")
lines.append(f"Checked: {timestamp}")
return "\n".join(lines)
def _approximate_downtime(site_name: str, now: datetime) -> str | None:
"""
Walk back through recent checks to find the last 'up' row and return
a human-readable duration string, e.g. '~45 min' or '~2 hr 10 min'.
"""
rows = fetch_checks_since(site_name, now - timedelta(days=1))
last_up_at = None
for checked_at, result, *_ in reversed(rows):
if result == "up":
last_up_at = datetime.fromisoformat(checked_at)
break
if last_up_at is None:
return None
delta = now - last_up_at
total_minutes = int(delta.total_seconds() // 60)
if total_minutes < 60:
return f"{total_minutes} min"
hours, mins = divmod(total_minutes, 60)
return f"{hours} hr {mins} min" if mins else f"{hours} hr"
intents = discord.Intents.default()
client = discord.Client(intents=intents)
tree = app_commands.CommandTree(client)
@@ -37,34 +160,51 @@ async def on_ready():
if session is None:
session = aiohttp.ClientSession(headers={"User-Agent": "VoteUptimeBot/1.0"})
# Seed state from the most recent DB row per site so we don't false-alert
# on restart.
for site in MONITORED_SITES:
rows = fetch_checks_since(
site["name"], datetime.now(timezone.utc) - timedelta(hours=1)
)
if rows:
last_known_state[site["name"]] = rows[-1][1] # result column
if not poll_sites.is_running():
poll_sites.start()
await tree.sync()
print(f"Logged in as {client.user}")
@tasks.loop(minutes=15)
@tasks.loop(minutes=settings.poll_interval_minutes)
async def poll_sites():
now = datetime.now(timezone.utc).isoformat()
now = datetime.now(timezone.utc)
now_iso = now.isoformat()
for site in MONITORED_SITES:
result = await check_site(session, site)
# Combine notes + detection_reason into the notes field so the DB
# schema doesn't need a migration. error_type already carries reason.
site_name = site["name"]
new_state = result["result"]
notes = result["notes"]
if result.get("detection_reason") and not notes:
notes = format_detection_reason(result["detection_reason"])
insert_check(
site_name=site["name"],
checked_at=now,
site_name=site_name,
checked_at=now_iso,
http_status=result["http_status"],
latency_ms=result["latency_ms"],
result=result["result"],
error_type=result["error_type"], # now carries detection_reason too
result=new_state,
error_type=result["error_type"],
notes=notes,
)
# --- Alert logic ---
prev_state = last_known_state.get(site_name)
last_known_state[site_name] = new_state
if prev_state is not None and prev_state != new_state:
await maybe_send_alert(site_name, prev_state, new_state, result, now)
@poll_sites.before_loop
async def before_poll_sites():

View File

@@ -39,15 +39,29 @@ class Settings(BaseSettings):
)
# Discord
discord_secret_key: str = ""
discord_secret_key: str
discord_client_id: str = ""
discord_client_secret: str = ""
# Database
database_path: str = "uptime.db"
# Polling — how often to check all sites (minimum 1 minute)
poll_interval_minutes: int = 15
# Alerts — set alert_channel_id to 0 to disable
alert_channel_id: int = 0
alert_cooldown_minutes: int = 30
# Sites — stored as a JSON array string in .env:
monitored_sites: list[SiteConfig] = []
@field_validator("poll_interval_minutes")
@classmethod
def poll_interval_must_be_positive(cls, v: int) -> int:
if v < 1:
raise ValueError("poll_interval_minutes must be at least 1")
return v
settings = Settings()