diff --git a/lib/cron/index.js b/lib/cron/index.js new file mode 100644 index 0000000..ce68010 --- /dev/null +++ b/lib/cron/index.js @@ -0,0 +1,16 @@ +import cron from 'node-cron'; +import { runSync } from './sync_source_docs.js'; +import { log } from '../log.js'; + +export function startCron() { + // Daily at 03:00 local time + cron.schedule('0 3 * * *', async () => { + try { + const n = await runSync(); + log.info({ enqueued: n }, 'cron sync.source_doc complete'); + } catch (e) { + log.error({ err: e }, 'cron sync.source_doc failed'); + } + }); + log.info('cron started'); +} diff --git a/lib/cron/sync_source_docs.js b/lib/cron/sync_source_docs.js new file mode 100644 index 0000000..6a5d45e --- /dev/null +++ b/lib/cron/sync_source_docs.js @@ -0,0 +1,12 @@ +import { pool } from '../db/pool.js'; +import * as queue from '../jobs/queue.js'; + +export async function runSync() { + const { rows } = await pool.query( + `SELECT id FROM source_docs WHERE sync_source = 'url'` + ); + for (const r of rows) { + await queue.enqueue('sync.source_doc', { source_doc_id: r.id }); + } + return rows.length; +} diff --git a/package-lock.json b/package-lock.json index b705c41..83c0a50 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "void-server", - "version": "2.0.0-alpha.2", + "version": "2.0.0-alpha.3", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "void-server", - "version": "2.0.0-alpha.2", + "version": "2.0.0-alpha.3", "dependencies": { "@mozilla/readability": "^0.6.0", "bcrypt": "^6.0.0", @@ -16,6 +16,7 @@ "jsdom": "^29.1.1", "marked": "^18.0.4", "multer": "^2.1.1", + "node-cron": "^3.0.3", "pg": "^8.21.0", "pg-boss": "^10.4.2", "pino": "^10.3.1", @@ -2431,6 +2432,18 @@ "node": "^18 || ^20 || >= 21" } }, + "node_modules/node-cron": { + "version": "3.0.3", + "resolved": "https://registry.npmjs.org/node-cron/-/node-cron-3.0.3.tgz", + "integrity": "sha512-dOal67//nohNgYWb+nWmg5dkFdIwDm8EpeGYMekPMrngV3637lqnX0lbUcCtgibHTz6SEz7DAIjKvKDFYCnO1A==", + "license": "ISC", + "dependencies": { + "uuid": "8.3.2" + }, + "engines": { + "node": ">=6.0.0" + } + }, "node_modules/node-gyp-build": { "version": "4.8.4", "resolved": "https://registry.npmjs.org/node-gyp-build/-/node-gyp-build-4.8.4.tgz", @@ -3509,6 +3522,16 @@ "integrity": "sha512-EPD5q1uXyFxJpCrLnCc1nHnq3gOa6DZBocAIiI2TaSCA7VCJ1UJDMagCzIkXNsUYfD1daK//LTEQ8xiIbrHtcw==", "license": "MIT" }, + "node_modules/uuid": { + "version": "8.3.2", + "resolved": "https://registry.npmjs.org/uuid/-/uuid-8.3.2.tgz", + "integrity": "sha512-+NYs2QeMWy+GWFOEm9xnn6HCDp0l7QBD7ml8zLUmJ+93Q5NF0NocErnwkTkXVFNiX3/fpC6afS8Dhb/gz7R7eg==", + "deprecated": "uuid@10 and below is no longer supported. For ESM codebases, update to uuid@latest. For CommonJS codebases, use uuid@11 (but be aware this version will likely be deprecated in 2028).", + "license": "MIT", + "bin": { + "uuid": "dist/bin/uuid" + } + }, "node_modules/vary": { "version": "1.1.2", "resolved": "https://registry.npmjs.org/vary/-/vary-1.1.2.tgz", diff --git a/package.json b/package.json index 9c3d9c8..a2b955f 100644 --- a/package.json +++ b/package.json @@ -18,6 +18,7 @@ "jsdom": "^29.1.1", "marked": "^18.0.4", "multer": "^2.1.1", + "node-cron": "^3.0.3", "pg": "^8.21.0", "pg-boss": "^10.4.2", "pino": "^10.3.1", diff --git a/server.js b/server.js index a70234b..5c3e9f2 100644 --- a/server.js +++ b/server.js @@ -6,6 +6,7 @@ import { mountApi } from './lib/api/index.js'; import * as queue from './lib/jobs/queue.js'; import { registerWorkers } from './lib/jobs/index.js'; import { router as ingestRouter } from './lib/api/routes/ingest.js'; +import { startCron } from './lib/cron/index.js'; const VERSION = '2.0.0-alpha.3'; @@ -51,6 +52,7 @@ if (import.meta.url === `file://${process.argv[1]}`) { .then(registerWorkers) .then(() => log.info('job queue ready')) .catch(err => log.error({ err }, 'queue boot failed')); + startCron(); app.listen(port, () => log.info({ port }, 'void-server listening')); for (const sig of ['SIGTERM', 'SIGINT']) { process.on(sig, async () => { diff --git a/tests/cron/sync_source_docs.test.js b/tests/cron/sync_source_docs.test.js new file mode 100644 index 0000000..6e53d1b --- /dev/null +++ b/tests/cron/sync_source_docs.test.js @@ -0,0 +1,51 @@ +import { describe, it, expect, beforeEach, afterEach } from 'vitest'; +import { resetDb } from '../helpers/db.js'; +import { migrateUp } from '../../lib/db/migrate.js'; +import { stopBoss } from '../helpers/boss.js'; +import { pool } from '../../lib/db/pool.js'; +import * as queue from '../../lib/jobs/queue.js'; +import { registerWorkers } from '../../lib/jobs/index.js'; +import { runSync } from '../../lib/cron/sync_source_docs.js'; +import * as jobs from '../../lib/db/repos/jobs.js'; + +beforeEach(async () => { + await resetDb(); await migrateUp(); + await queue.start(); await registerWorkers(); +}); +afterEach(async () => { await stopBoss(); }); + +describe('cron/sync_source_docs.runSync', () => { + it('enqueues sync.source_doc for each url-synced row', async () => { + const sp = (await pool.query( + `INSERT INTO spaces(slug, name) VALUES('s','S') RETURNING id` + )).rows[0].id; + const res = (await pool.query( + `INSERT INTO resources(space_id, slug, name, runtime_type) VALUES($1,'r','R','lxc') RETURNING id`, + [sp] + )).rows[0].id; + await pool.query( + `INSERT INTO source_docs(resource_id, name, upstream_url, sync_source) VALUES($1,'doc','https://example.com/r','url')`, + [res] + ); + const enqueued = await runSync(); + expect(enqueued).toBe(1); + const queued = await jobs.list({ name: 'sync.source_doc' }); + expect(queued.length).toBe(1); + }); + + it('skips rows without sync_source=url', async () => { + const sp = (await pool.query( + `INSERT INTO spaces(slug, name) VALUES('s2','S2') RETURNING id` + )).rows[0].id; + const res = (await pool.query( + `INSERT INTO resources(space_id, slug, name, runtime_type) VALUES($1,'r','R','lxc') RETURNING id`, + [sp] + )).rows[0].id; + await pool.query( + `INSERT INTO source_docs(resource_id, name, upstream_url, sync_source) VALUES($1,'doc','https://example.com/r','manual')`, + [res] + ); + const enqueued = await runSync(); + expect(enqueued).toBe(0); + }); +});