feat(speedtest): full speedtest-tracker-style automation (2.9.0)
Switch worker to the Ookla CLI (jitter, packet loss, server, ISP, shareable result URL, bytes). Migration 028 enriches speedtest_results + adds a generic app_settings store. New /speedtest page: KPIs, throughput + latency charts, window stats, configurable schedule (reschedulable cron) & low-speed alert threshold, history table. SV card gains ping/jitter + a link through to the page. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
@@ -1,11 +1,39 @@
|
||||
import { Router } from 'express';
|
||||
import { z } from 'zod';
|
||||
import { asyncWrap } from '../errors.js';
|
||||
import { requireOwner } from '../cap.js';
|
||||
import { validate } from '../validate.js';
|
||||
import * as repo from '../../db/repos/speedtest.js';
|
||||
import * as settings from '../../db/repos/app_settings.js';
|
||||
import { enqueue } from '../../jobs/queue.js';
|
||||
import { setSpeedtestSchedule } from '../../cron/index.js';
|
||||
export const router = Router();
|
||||
router.get('/history', asyncWrap(async (_req, res) => res.json(await repo.history(30))));
|
||||
router.post('/run', requireOwner, asyncWrap(async (_req, res) => {
|
||||
const id = await enqueue('speedtest', {});
|
||||
res.status(202).json({ enqueued: id });
|
||||
|
||||
const DEFAULT_CFG = { interval_min: 60, threshold_down_mbps: 0 };
|
||||
async function getCfg() { return { ...DEFAULT_CFG, ...(await settings.get('speedtest', {})) }; }
|
||||
|
||||
router.get('/history', asyncWrap(async (req, res) =>
|
||||
res.json(await repo.history(Math.min(500, Number(req.query.limit) || 30)))));
|
||||
|
||||
router.get('/results', asyncWrap(async (req, res) =>
|
||||
res.json(await repo.range(Math.min(2160, Number(req.query.hours) || 168), 2000))));
|
||||
|
||||
router.get('/latest', asyncWrap(async (_req, res) => res.json(await repo.latest())));
|
||||
|
||||
router.get('/stats', asyncWrap(async (req, res) =>
|
||||
res.json(await repo.stats(Math.min(2160, Number(req.query.hours) || 24)))));
|
||||
|
||||
router.get('/config', asyncWrap(async (_req, res) => res.json(await getCfg())));
|
||||
|
||||
const cfgBody = z.object({
|
||||
interval_min: z.number().int().min(5).max(1440),
|
||||
threshold_down_mbps: z.number().min(0).max(100000).default(0)
|
||||
});
|
||||
router.put('/config', requireOwner, validate({ body: cfgBody }), asyncWrap(async (req, res) => {
|
||||
const cfg = await settings.set('speedtest', req.body);
|
||||
setSpeedtestSchedule(cfg.interval_min);
|
||||
res.json(cfg);
|
||||
}));
|
||||
|
||||
router.post('/run', requireOwner, asyncWrap(async (_req, res) =>
|
||||
res.status(202).json({ enqueued: await enqueue('speedtest', {}) })));
|
||||
|
||||
@@ -6,6 +6,26 @@ import { checkAll } from '../health/checker.js';
|
||||
import * as statusRepo from '../db/repos/service_status.js';
|
||||
import * as services from '../db/repos/monitored_services.js';
|
||||
import { runDeviceScanCycle } from '../infra/scan_cycle.js';
|
||||
import * as settings from '../db/repos/app_settings.js';
|
||||
|
||||
// Speedtest runs on a user-configurable interval (PUT /api/speedtest/config →
|
||||
// setSpeedtestSchedule). Held module-level so it can be stopped + rescheduled.
|
||||
let speedtestTask = null;
|
||||
function speedtestExpr(min) {
|
||||
if (min < 60) return `*/${min} * * * *`;
|
||||
if (min % 60 === 0) { const h = min / 60; return h >= 24 ? '0 2 * * *' : `0 */${h} * * *`; }
|
||||
return '0 * * * *';
|
||||
}
|
||||
export function setSpeedtestSchedule(min) {
|
||||
const m = Math.max(5, Math.min(1440, Number(min) || 60));
|
||||
if (speedtestTask) { speedtestTask.stop(); speedtestTask = null; }
|
||||
const expr = speedtestExpr(m);
|
||||
speedtestTask = cron.schedule(expr, async () => {
|
||||
try { await enqueue('speedtest', {}); log.info({ expr }, 'cron speedtest enqueued'); }
|
||||
catch (e) { log.error({ err: e }, 'cron speedtest failed'); }
|
||||
});
|
||||
log.info({ expr, min: m }, 'speedtest schedule set');
|
||||
}
|
||||
|
||||
export function startCron() {
|
||||
// Daily at 03:00 local time
|
||||
@@ -18,11 +38,10 @@ export function startCron() {
|
||||
}
|
||||
});
|
||||
|
||||
// Hourly speedtest
|
||||
cron.schedule('0 * * * *', async () => {
|
||||
try { await enqueue('speedtest', {}); log.info('cron speedtest enqueued'); }
|
||||
catch (e) { log.error({ err: e }, 'cron speedtest failed'); }
|
||||
});
|
||||
// Speedtest — interval from the saved config (default 60 min), reschedulable.
|
||||
settings.get('speedtest', {})
|
||||
.then(cfg => setSpeedtestSchedule(cfg?.interval_min || 60))
|
||||
.catch(e => { log.error({ err: e }, 'speedtest schedule init failed'); setSpeedtestSchedule(60); });
|
||||
|
||||
// Health checks every minute. NOTE: this runs checkAll() inline; the same
|
||||
// probe+upsert logic is also exposed on-demand via the `health.check` pg-boss
|
||||
|
||||
22
lib/db/migrations/028_speedtest_metrics.sql
Normal file
22
lib/db/migrations/028_speedtest_metrics.sql
Normal file
@@ -0,0 +1,22 @@
|
||||
-- 028_speedtest_metrics.sql
|
||||
-- Enrich speedtest results with the full Ookla metric set + a generic settings
|
||||
-- store (reused by the speedtest schedule and, later, theming).
|
||||
ALTER TABLE speedtest_results ALTER COLUMN down_mbps DROP NOT NULL;
|
||||
ALTER TABLE speedtest_results ALTER COLUMN up_mbps DROP NOT NULL;
|
||||
ALTER TABLE speedtest_results
|
||||
ADD COLUMN IF NOT EXISTS jitter_ms numeric,
|
||||
ADD COLUMN IF NOT EXISTS packet_loss numeric,
|
||||
ADD COLUMN IF NOT EXISTS server_name text,
|
||||
ADD COLUMN IF NOT EXISTS server_id text,
|
||||
ADD COLUMN IF NOT EXISTS isp text,
|
||||
ADD COLUMN IF NOT EXISTS result_url text,
|
||||
ADD COLUMN IF NOT EXISTS down_bytes bigint,
|
||||
ADD COLUMN IF NOT EXISTS up_bytes bigint,
|
||||
ADD COLUMN IF NOT EXISTS ok boolean NOT NULL DEFAULT true,
|
||||
ADD COLUMN IF NOT EXISTS error text;
|
||||
|
||||
CREATE TABLE IF NOT EXISTS app_settings (
|
||||
key text PRIMARY KEY,
|
||||
value jsonb NOT NULL DEFAULT '{}'::jsonb,
|
||||
updated_at timestamptz NOT NULL DEFAULT now()
|
||||
);
|
||||
17
lib/db/repos/app_settings.js
Normal file
17
lib/db/repos/app_settings.js
Normal file
@@ -0,0 +1,17 @@
|
||||
import { pool } from '../pool.js';
|
||||
|
||||
// Generic owner-scoped key→jsonb settings store. Used by the speedtest schedule
|
||||
// and (later) the theming panel. Keep values small + JSON-serialisable.
|
||||
export async function get(key, fallback = null) {
|
||||
const { rows } = await pool.query(`SELECT value FROM app_settings WHERE key = $1`, [key]);
|
||||
return rows[0] ? rows[0].value : fallback;
|
||||
}
|
||||
|
||||
export async function set(key, value) {
|
||||
const { rows } = await pool.query(
|
||||
`INSERT INTO app_settings (key, value, updated_at) VALUES ($1, $2::jsonb, now())
|
||||
ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value, updated_at = now()
|
||||
RETURNING value`,
|
||||
[key, JSON.stringify(value)]);
|
||||
return rows[0].value;
|
||||
}
|
||||
@@ -1,12 +1,51 @@
|
||||
import { pool } from '../pool.js';
|
||||
export async function record({ down_mbps, up_mbps, ping_ms = null }) {
|
||||
|
||||
export async function record(r = {}) {
|
||||
const { rows } = await pool.query(
|
||||
`INSERT INTO speedtest_results (down_mbps, up_mbps, ping_ms) VALUES ($1,$2,$3) RETURNING *`,
|
||||
[down_mbps, up_mbps, ping_ms]);
|
||||
`INSERT INTO speedtest_results
|
||||
(down_mbps, up_mbps, ping_ms, jitter_ms, packet_loss, server_name, server_id,
|
||||
isp, result_url, down_bytes, up_bytes, ok, error)
|
||||
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13) RETURNING *`,
|
||||
[r.down_mbps ?? null, r.up_mbps ?? null, r.ping_ms ?? null, r.jitter_ms ?? null,
|
||||
r.packet_loss ?? null, r.server_name ?? null, r.server_id ?? null, r.isp ?? null,
|
||||
r.result_url ?? null, r.down_bytes ?? null, r.up_bytes ?? null, r.ok ?? true, r.error ?? null]);
|
||||
return rows[0];
|
||||
}
|
||||
|
||||
export async function history(limit = 30) {
|
||||
const { rows } = await pool.query(
|
||||
`SELECT * FROM speedtest_results ORDER BY ran_at DESC LIMIT $1`, [limit]);
|
||||
return rows;
|
||||
}
|
||||
|
||||
// Rows within the last N hours (ascending for charting), capped.
|
||||
export async function range(hours = 168, limit = 1000) {
|
||||
const { rows } = await pool.query(
|
||||
`SELECT * FROM (
|
||||
SELECT * FROM speedtest_results
|
||||
WHERE ran_at >= now() - ($1 || ' hours')::interval
|
||||
ORDER BY ran_at DESC LIMIT $2
|
||||
) t ORDER BY ran_at ASC`, [hours, limit]);
|
||||
return rows;
|
||||
}
|
||||
|
||||
export async function latest() {
|
||||
const { rows } = await pool.query(
|
||||
`SELECT * FROM speedtest_results WHERE ok ORDER BY ran_at DESC LIMIT 1`);
|
||||
return rows[0] || null;
|
||||
}
|
||||
|
||||
export async function stats(hours = 24) {
|
||||
const { rows } = await pool.query(
|
||||
`SELECT count(*) FILTER (WHERE ok) AS n,
|
||||
count(*) FILTER (WHERE NOT ok) AS failures,
|
||||
avg(down_mbps) FILTER (WHERE ok) AS avg_down,
|
||||
min(down_mbps) FILTER (WHERE ok) AS min_down,
|
||||
max(down_mbps) FILTER (WHERE ok) AS max_down,
|
||||
avg(up_mbps) FILTER (WHERE ok) AS avg_up,
|
||||
avg(ping_ms) FILTER (WHERE ok) AS avg_ping,
|
||||
max(ping_ms) FILTER (WHERE ok) AS max_ping
|
||||
FROM speedtest_results
|
||||
WHERE ran_at >= now() - ($1 || ' hours')::interval`, [hours]);
|
||||
return rows[0];
|
||||
}
|
||||
|
||||
@@ -6,18 +6,42 @@ const pexec = promisify(execFile);
|
||||
|
||||
export const NAME = 'speedtest';
|
||||
|
||||
// Default runner uses speedtest-cli --json (bits/s → Mbps). Swap binary/flags
|
||||
// here if the box has the Ookla `speedtest -f json` CLI instead.
|
||||
async function defaultRunner() {
|
||||
const { stdout } = await pexec('speedtest-cli', ['--json'], { timeout: 120000 });
|
||||
// Ookla CLI gives the full metric set (jitter, packet loss, server, ISP,
|
||||
// shareable result URL). Override the binary via SPEEDTEST_BIN if needed.
|
||||
const OOKLA_BIN = process.env.SPEEDTEST_BIN || 'ookla-speedtest';
|
||||
|
||||
async function ooklaRunner() {
|
||||
const { stdout } = await pexec(OOKLA_BIN,
|
||||
['-f', 'json', '--accept-license', '--accept-gdpr'], { timeout: 120000 });
|
||||
const j = JSON.parse(stdout);
|
||||
return { down_mbps: j.download / 1e6, up_mbps: j.upload / 1e6, ping_ms: j.ping };
|
||||
const mbps = bw => (Number(bw) || 0) * 8 / 1e6; // Ookla bandwidth is bytes/s
|
||||
return {
|
||||
down_mbps: mbps(j.download?.bandwidth),
|
||||
up_mbps: mbps(j.upload?.bandwidth),
|
||||
ping_ms: j.ping?.latency ?? null,
|
||||
jitter_ms: j.ping?.jitter ?? null,
|
||||
packet_loss: j.packetLoss ?? null,
|
||||
server_name: j.server ? [j.server.name, j.server.location].filter(Boolean).join(' · ') : null,
|
||||
server_id: j.server?.id != null ? String(j.server.id) : null,
|
||||
isp: j.isp ?? null,
|
||||
result_url: j.result?.url ?? null,
|
||||
down_bytes: j.download?.bytes ?? null,
|
||||
up_bytes: j.upload?.bytes ?? null,
|
||||
ok: true
|
||||
};
|
||||
}
|
||||
let runner = defaultRunner;
|
||||
let runner = ooklaRunner;
|
||||
export function _setRunner(fn) { runner = fn; }
|
||||
|
||||
export async function handler(_job) {
|
||||
const r = await runner();
|
||||
await repo.record(r);
|
||||
log.info(r, 'speedtest recorded');
|
||||
try {
|
||||
const r = await runner();
|
||||
const saved = await repo.record(r);
|
||||
log.info({ down: r.down_mbps, up: r.up_mbps, ping: r.ping_ms }, 'speedtest recorded');
|
||||
return saved;
|
||||
} catch (e) {
|
||||
await repo.record({ ok: false, error: String(e?.message || e).slice(0, 300) });
|
||||
log.error({ err: e }, 'speedtest failed');
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user