feat: 2.0.0-alpha.11 — DB-backed service registry + LAN auto-discovery
- monitored_services table (mig 015) replaces config/services.json (now a boot seed) - owner CRUD over /api/health/services; GET is DB-backed; cron+worker read the DB - discover.lan worker: pure-Node TCP sweep + HTTP-title probe -> disabled 'discovered' candidates (never clobbers curated entries); POST /api/health/discover + GET .../discovered - dashboard: Scan button + Discovered(N) section with one-click promote Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
@@ -1,16 +1,20 @@
|
||||
import { Router } from 'express';
|
||||
import { z } from 'zod';
|
||||
import { asyncWrap } from '../errors.js';
|
||||
import { requireOwner } from '../cap.js';
|
||||
import { load, grouped, iconSlug } from '../../health/registry.js';
|
||||
import { validate } from '../validate.js';
|
||||
import { grouped, iconSlug } from '../../health/registry.js';
|
||||
import * as services from '../../db/repos/monitored_services.js';
|
||||
import * as statusRepo from '../../db/repos/service_status.js';
|
||||
import { enqueue } from '../../jobs/queue.js';
|
||||
|
||||
export const router = Router();
|
||||
|
||||
// GET /services — grouped health band (DB-backed registry + cached status).
|
||||
router.get('/services', asyncWrap(async (_req, res) => {
|
||||
const statuses = Object.fromEntries((await statusRepo.all()).map(s => [s.service_id, s]));
|
||||
const groups = grouped(load()).map(g => {
|
||||
const services = g.services.map(s => {
|
||||
const groups = grouped(await services.listEnabled()).map(g => {
|
||||
const list = g.services.map(s => {
|
||||
const st = statuses[s.id];
|
||||
return {
|
||||
id: s.id, name: s.name, host: s.host, url: s.url, icon: iconSlug(s),
|
||||
@@ -18,13 +22,53 @@ router.get('/services', asyncWrap(async (_req, res) => {
|
||||
detail: st?.detail || null, checked_at: st?.checked_at || null
|
||||
};
|
||||
});
|
||||
return { category: g.category, healthy: services.filter(s => s.status === 'ok').length,
|
||||
total: services.length, services };
|
||||
return { category: g.category, healthy: list.filter(s => s.status === 'ok').length, total: list.length, services: list };
|
||||
});
|
||||
res.json(groups);
|
||||
}));
|
||||
|
||||
router.post('/check', requireOwner, asyncWrap(async (_req, res) => {
|
||||
const id = await enqueue('health.check', {});
|
||||
res.status(202).json({ enqueued: id });
|
||||
// GET /services/discovered — candidates from a LAN scan, awaiting review (owner).
|
||||
router.get('/services/discovered', requireOwner, asyncWrap(async (_req, res) => {
|
||||
res.json((await services.listDiscovered()).map(s => ({ ...s, icon: iconSlug(s) })));
|
||||
}));
|
||||
|
||||
const checkCfg = z.object({ type: z.enum(['http', 'tcp']).optional(), path: z.string().max(200).optional() });
|
||||
const svcBody = z.object({
|
||||
id: z.string().min(1).max(64).regex(/^[a-z0-9-]+$/),
|
||||
name: z.string().min(1).max(120),
|
||||
category: z.enum(['agents', 'infrastructure', 'media', 'other']).default('other'),
|
||||
host: z.string().max(120).optional(),
|
||||
url: z.string().url(),
|
||||
icon: z.string().max(64).optional(),
|
||||
check: checkCfg.optional()
|
||||
});
|
||||
const patchBody = svcBody.omit({ id: true }).partial().extend({ enabled: z.boolean().optional() });
|
||||
const idParam = z.object({ id: z.string().regex(/^[a-z0-9-]+$/) });
|
||||
|
||||
// POST /services — add a manual service (owner).
|
||||
router.post('/services', requireOwner, validate({ body: svcBody }), asyncWrap(async (req, res) => {
|
||||
res.status(201).json(await services.create({ ...req.body, source: 'manual', enabled: true }));
|
||||
}));
|
||||
|
||||
// PATCH /services/:id — edit / enable (promote a discovered candidate) (owner).
|
||||
router.patch('/services/:id', requireOwner, validate({ params: idParam, body: patchBody }), asyncWrap(async (req, res) => {
|
||||
const updated = await services.update(req.params.id, req.body);
|
||||
if (!updated) return res.status(404).json({ error: { code: 'not_found', message: 'service not found' } });
|
||||
res.json(updated);
|
||||
}));
|
||||
|
||||
// DELETE /services/:id — remove (owner).
|
||||
router.delete('/services/:id', requireOwner, validate({ params: idParam }), asyncWrap(async (req, res) => {
|
||||
if (!(await services.remove(req.params.id))) return res.status(404).json({ error: { code: 'not_found' } });
|
||||
res.status(204).end();
|
||||
}));
|
||||
|
||||
// POST /check — immediate health pass (owner).
|
||||
router.post('/check', requireOwner, asyncWrap(async (_req, res) => {
|
||||
res.status(202).json({ enqueued: await enqueue('health.check', {}) });
|
||||
}));
|
||||
|
||||
// POST /discover — kick off a LAN discovery scan (owner).
|
||||
router.post('/discover', requireOwner, asyncWrap(async (_req, res) => {
|
||||
res.status(202).json({ enqueued: await enqueue('discover.lan', {}) });
|
||||
}));
|
||||
|
||||
@@ -2,9 +2,9 @@ import cron from 'node-cron';
|
||||
import { runSync } from './sync_source_docs.js';
|
||||
import { log } from '../log.js';
|
||||
import { enqueue } from '../jobs/queue.js';
|
||||
import { load } from '../health/registry.js';
|
||||
import { checkAll } from '../health/checker.js';
|
||||
import * as statusRepo from '../db/repos/service_status.js';
|
||||
import * as services from '../db/repos/monitored_services.js';
|
||||
|
||||
export function startCron() {
|
||||
// Daily at 03:00 local time
|
||||
@@ -29,7 +29,7 @@ export function startCron() {
|
||||
// Keep the two in sync — both rely on lib/health/checker.js as the source of truth.
|
||||
cron.schedule('*/1 * * * *', async () => {
|
||||
try {
|
||||
const results = await checkAll(load());
|
||||
const results = await checkAll(await services.listEnabled());
|
||||
for (const r of results) await statusRepo.upsert(r);
|
||||
log.info({ n: results.length }, 'health check complete');
|
||||
} catch (e) { log.error({ err: e }, 'health check failed'); }
|
||||
|
||||
20
lib/db/migrations/015_monitored_services.sql
Normal file
20
lib/db/migrations/015_monitored_services.sql
Normal file
@@ -0,0 +1,20 @@
|
||||
-- 015_monitored_services.sql
|
||||
-- DB-backed homelab service registry (replaces the hand-edited config/services.json).
|
||||
-- Instance-wide (NOT space-scoped — these are infra services, not knowledge resources).
|
||||
-- Live status stays in service_status, keyed by service_id = monitored_services.id.
|
||||
CREATE TABLE monitored_services (
|
||||
id text PRIMARY KEY, -- stable slug, e.g. 'gitea'
|
||||
name text NOT NULL,
|
||||
category text NOT NULL DEFAULT 'other',
|
||||
host text,
|
||||
url text NOT NULL,
|
||||
icon text,
|
||||
check_cfg jsonb NOT NULL DEFAULT '{}'::jsonb, -- {type:'http'|'tcp', path?:'/...'}
|
||||
source text NOT NULL DEFAULT 'manual'
|
||||
CHECK (source IN ('manual','discovered')),
|
||||
enabled boolean NOT NULL DEFAULT true,
|
||||
created_at timestamptz NOT NULL DEFAULT now(),
|
||||
updated_at timestamptz NOT NULL DEFAULT now()
|
||||
);
|
||||
-- Discovery reconciliation looks up by url to avoid re-adding an existing service.
|
||||
CREATE INDEX idx_monitored_services_url ON monitored_services (url);
|
||||
85
lib/db/repos/monitored_services.js
Normal file
85
lib/db/repos/monitored_services.js
Normal file
@@ -0,0 +1,85 @@
|
||||
import { pool } from '../pool.js';
|
||||
|
||||
const COLS = 'id, name, category, host, url, icon, check_cfg, source, enabled';
|
||||
|
||||
// Map a DB row to the service shape the registry/checker expect (check_cfg -> check).
|
||||
function toSvc(r) {
|
||||
return {
|
||||
id: r.id, name: r.name, category: r.category, host: r.host, url: r.url,
|
||||
icon: r.icon, check: r.check_cfg || {}, source: r.source, enabled: r.enabled
|
||||
};
|
||||
}
|
||||
|
||||
export async function listEnabled() {
|
||||
const { rows } = await pool.query(
|
||||
`SELECT ${COLS} FROM monitored_services WHERE enabled ORDER BY category, name`);
|
||||
return rows.map(toSvc);
|
||||
}
|
||||
|
||||
export async function all() {
|
||||
const { rows } = await pool.query(
|
||||
`SELECT ${COLS} FROM monitored_services ORDER BY category, name`);
|
||||
return rows.map(toSvc);
|
||||
}
|
||||
|
||||
// Discovered, not-yet-promoted candidates awaiting the owner's review.
|
||||
export async function listDiscovered() {
|
||||
const { rows } = await pool.query(
|
||||
`SELECT ${COLS} FROM monitored_services WHERE source='discovered' AND NOT enabled ORDER BY name`);
|
||||
return rows.map(toSvc);
|
||||
}
|
||||
|
||||
export async function get(id) {
|
||||
const { rows: [r] } = await pool.query(
|
||||
`SELECT ${COLS} FROM monitored_services WHERE id=$1`, [id]);
|
||||
return r ? toSvc(r) : null;
|
||||
}
|
||||
|
||||
export async function count() {
|
||||
const { rows: [r] } = await pool.query(`SELECT count(*)::int AS n FROM monitored_services`);
|
||||
return r.n;
|
||||
}
|
||||
|
||||
export async function create(svc) {
|
||||
const { id, name, category = 'other', host = null, url, icon = null,
|
||||
check = {}, source = 'manual', enabled = true } = svc;
|
||||
const { rows: [r] } = await pool.query(
|
||||
`INSERT INTO monitored_services (id, name, category, host, url, icon, check_cfg, source, enabled)
|
||||
VALUES ($1,$2,$3,$4,$5,$6,$7::jsonb,$8,$9) RETURNING ${COLS}`,
|
||||
[id, name, category, host, url, icon, JSON.stringify(check), source, enabled]);
|
||||
return toSvc(r);
|
||||
}
|
||||
|
||||
const PATCHABLE = ['name', 'category', 'host', 'url', 'icon', 'enabled'];
|
||||
export async function update(id, patch) {
|
||||
const sets = [], vals = [];
|
||||
for (const k of PATCHABLE) {
|
||||
if (patch[k] !== undefined) { vals.push(patch[k]); sets.push(`${k}=$${vals.length}`); }
|
||||
}
|
||||
if (patch.check !== undefined) { vals.push(JSON.stringify(patch.check)); sets.push(`check_cfg=$${vals.length}::jsonb`); }
|
||||
if (!sets.length) return get(id);
|
||||
vals.push(id);
|
||||
const { rows: [r] } = await pool.query(
|
||||
`UPDATE monitored_services SET ${sets.join(', ')}, updated_at=now() WHERE id=$${vals.length} RETURNING ${COLS}`,
|
||||
vals);
|
||||
return r ? toSvc(r) : null;
|
||||
}
|
||||
|
||||
export async function remove(id) {
|
||||
const { rowCount } = await pool.query(`DELETE FROM monitored_services WHERE id=$1`, [id]);
|
||||
return rowCount > 0;
|
||||
}
|
||||
|
||||
// Insert a discovered candidate (disabled, source='discovered') unless a service
|
||||
// with the same id OR url already exists — never clobbers a curated entry.
|
||||
export async function upsertDiscovered(svc) {
|
||||
const { id, name, category = 'other', host = null, url, icon = null, check = {} } = svc;
|
||||
const { rows: [r] } = await pool.query(
|
||||
`INSERT INTO monitored_services (id, name, category, host, url, icon, check_cfg, source, enabled)
|
||||
SELECT $1,$2,$3,$4,$5,$6,$7::jsonb,'discovered',false
|
||||
WHERE NOT EXISTS (SELECT 1 FROM monitored_services WHERE url=$5)
|
||||
ON CONFLICT (id) DO NOTHING
|
||||
RETURNING ${COLS}`,
|
||||
[id, name, category, host, url, icon, JSON.stringify(check)]);
|
||||
return r ? toSvc(r) : null; // null = already existed (skipped)
|
||||
}
|
||||
@@ -1,22 +1,18 @@
|
||||
import { readFileSync } from 'node:fs';
|
||||
import path from 'node:path';
|
||||
import { fileURLToPath } from 'node:url';
|
||||
import * as repo from '../db/repos/monitored_services.js';
|
||||
|
||||
const __dirname = path.dirname(fileURLToPath(import.meta.url));
|
||||
const CONFIG = path.join(__dirname, '../../config/services.json');
|
||||
const SEED_FILE = path.join(__dirname, '../../config/services.json');
|
||||
export const CATEGORY_ORDER = ['agents', 'infrastructure', 'media', 'other'];
|
||||
|
||||
let cache = null;
|
||||
export function load() {
|
||||
if (!cache) cache = JSON.parse(readFileSync(CONFIG, 'utf8'));
|
||||
return cache;
|
||||
}
|
||||
export function _reset() { cache = null; } // tests
|
||||
|
||||
// Icon slug: explicit `icon`, else slugified name. Pure.
|
||||
export function iconSlug(svc) {
|
||||
return (svc.icon || svc.name).toLowerCase().replace(/[^a-z0-9]+/g, '-').replace(/(^-|-$)/g, '');
|
||||
}
|
||||
|
||||
// Group services by category in CATEGORY_ORDER (unknown categories last). Pure.
|
||||
export function grouped(services) {
|
||||
const map = new Map();
|
||||
for (const s of services) {
|
||||
@@ -28,3 +24,18 @@ export function grouped(services) {
|
||||
.filter(c => map.has(c))
|
||||
.map(category => ({ category, services: map.get(category) }));
|
||||
}
|
||||
|
||||
// One-time bootstrap: if the registry table is empty, populate it from the
|
||||
// version-controlled config/services.json seed. Idempotent (no-op once seeded).
|
||||
export async function seedFromConfig() {
|
||||
if ((await repo.count()) > 0) return 0;
|
||||
let seed;
|
||||
try { seed = JSON.parse(readFileSync(SEED_FILE, 'utf8')); }
|
||||
catch { return 0; }
|
||||
let n = 0;
|
||||
for (const s of seed) {
|
||||
try { await repo.create({ ...s, source: 'manual', enabled: true }); n++; }
|
||||
catch { /* skip a bad/duplicate seed row */ }
|
||||
}
|
||||
return n;
|
||||
}
|
||||
|
||||
@@ -6,8 +6,9 @@ import * as embed from './workers/embed.js';
|
||||
import * as karakeep from './workers/karakeep.js';
|
||||
import * as speedtest from './workers/speedtest.js';
|
||||
import * as healthCheck from './workers/health_check.js';
|
||||
import * as discover from './workers/discover.js';
|
||||
|
||||
const WORKERS = [echo, url, blob, embed, karakeep, speedtest, healthCheck];
|
||||
const WORKERS = [echo, url, blob, embed, karakeep, speedtest, healthCheck, discover];
|
||||
|
||||
export async function registerWorkers() {
|
||||
for (const w of WORKERS) {
|
||||
|
||||
72
lib/jobs/workers/discover.js
Normal file
72
lib/jobs/workers/discover.js
Normal file
@@ -0,0 +1,72 @@
|
||||
import net from 'node:net';
|
||||
import * as services from '../../db/repos/monitored_services.js';
|
||||
import { log } from '../../log.js';
|
||||
|
||||
export const NAME = 'discover.lan';
|
||||
|
||||
// Common homelab web/service ports to probe.
|
||||
const PORTS = [80, 81, 443, 2424, 3000, 3001, 5000, 5055, 6767, 6875, 7878, 8000,
|
||||
8006, 8080, 8081, 8096, 8123, 8265, 8384, 8443, 8989, 9000, 9090, 9696, 11434, 19999, 32400, 60072];
|
||||
const HTTPS_PORTS = new Set([443, 8443, 8006]);
|
||||
|
||||
function tcpOpen(host, port, timeoutMs = 350) {
|
||||
return new Promise(resolve => {
|
||||
const sock = net.connect({ host, port });
|
||||
let done = false;
|
||||
const finish = (ok) => { if (!done) { done = true; sock.destroy(); resolve(ok); } };
|
||||
sock.setTimeout(timeoutMs);
|
||||
sock.on('connect', () => finish(true));
|
||||
sock.on('timeout', () => finish(false));
|
||||
sock.on('error', () => finish(false));
|
||||
});
|
||||
}
|
||||
|
||||
async function httpTitle(url) {
|
||||
try {
|
||||
const res = await fetch(url, { redirect: 'manual', signal: AbortSignal.timeout(2500) });
|
||||
let title = '';
|
||||
if (res.status >= 200 && res.status < 400) {
|
||||
const html = await res.text().catch(() => '');
|
||||
const m = html.match(/<title>([^<]{1,80})/i);
|
||||
title = m ? m[1].trim().replace(/\s+/g, ' ') : '';
|
||||
}
|
||||
return { code: res.status, title };
|
||||
} catch { return null; }
|
||||
}
|
||||
|
||||
// Test seam.
|
||||
let _tcp = tcpOpen, _http = httpTitle;
|
||||
export function _setProbes({ tcp, http } = {}) { _tcp = tcp || tcpOpen; _http = http || httpTitle; }
|
||||
|
||||
async function mapPool(items, concurrency, fn) {
|
||||
const out = new Array(items.length);
|
||||
let i = 0;
|
||||
await Promise.all(Array.from({ length: Math.min(concurrency, items.length) }, async () => {
|
||||
while (i < items.length) { const idx = i++; out[idx] = await fn(items[idx]); }
|
||||
}));
|
||||
return out;
|
||||
}
|
||||
|
||||
export async function handler(job) {
|
||||
const subnet = job?.data?.subnet || process.env.DISCOVER_SUBNET || '192.168.1';
|
||||
const targets = [];
|
||||
for (let h = 1; h <= 254; h++) for (const port of PORTS) targets.push({ host: `${subnet}.${h}`, port });
|
||||
|
||||
// 1) TCP sweep → live host:ports
|
||||
const open = (await mapPool(targets, 120, async (t) => (await _tcp(t.host, t.port)) ? t : null)).filter(Boolean);
|
||||
|
||||
// 2) HTTP-probe each, build + upsert discovered candidates (no-clobber in the repo)
|
||||
let added = 0;
|
||||
for (const { host, port } of open) {
|
||||
const scheme = HTTPS_PORTS.has(port) ? 'https' : 'http';
|
||||
const url = `${scheme}://${host}:${port}`;
|
||||
const probe = await _http(url);
|
||||
const name = (probe && probe.title) || `${host}:${port}`;
|
||||
const id = `disc-${host.replace(/\./g, '-')}-${port}`;
|
||||
const check = scheme === 'https' ? { type: 'tcp' } : { type: 'http' };
|
||||
const r = await services.upsertDiscovered({ id, name, category: 'other', host, url, check });
|
||||
if (r) added++;
|
||||
}
|
||||
log.info({ open: open.length, added }, 'lan discovery complete');
|
||||
return { open: open.length, added };
|
||||
}
|
||||
@@ -1,8 +1,8 @@
|
||||
import { load } from '../../health/registry.js';
|
||||
import { checkAll } from '../../health/checker.js';
|
||||
import * as statusRepo from '../../db/repos/service_status.js';
|
||||
import * as services from '../../db/repos/monitored_services.js';
|
||||
export const NAME = 'health.check';
|
||||
export async function handler(_job) {
|
||||
const results = await checkAll(load());
|
||||
const results = await checkAll(await services.listEnabled());
|
||||
for (const r of results) await statusRepo.upsert(r);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user