feat(cron): daily sync.source_doc enqueue
node-cron schedules runSync at 03:00 local time; runSync enqueues sync.source_doc for every source_docs row with sync_source='url'. Started from server.js's CLI gate alongside the job queue. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
16
lib/cron/index.js
Normal file
16
lib/cron/index.js
Normal file
@@ -0,0 +1,16 @@
|
||||
import cron from 'node-cron';
|
||||
import { runSync } from './sync_source_docs.js';
|
||||
import { log } from '../log.js';
|
||||
|
||||
export function startCron() {
|
||||
// Daily at 03:00 local time
|
||||
cron.schedule('0 3 * * *', async () => {
|
||||
try {
|
||||
const n = await runSync();
|
||||
log.info({ enqueued: n }, 'cron sync.source_doc complete');
|
||||
} catch (e) {
|
||||
log.error({ err: e }, 'cron sync.source_doc failed');
|
||||
}
|
||||
});
|
||||
log.info('cron started');
|
||||
}
|
||||
12
lib/cron/sync_source_docs.js
Normal file
12
lib/cron/sync_source_docs.js
Normal file
@@ -0,0 +1,12 @@
|
||||
import { pool } from '../db/pool.js';
|
||||
import * as queue from '../jobs/queue.js';
|
||||
|
||||
export async function runSync() {
|
||||
const { rows } = await pool.query(
|
||||
`SELECT id FROM source_docs WHERE sync_source = 'url'`
|
||||
);
|
||||
for (const r of rows) {
|
||||
await queue.enqueue('sync.source_doc', { source_doc_id: r.id });
|
||||
}
|
||||
return rows.length;
|
||||
}
|
||||
27
package-lock.json
generated
27
package-lock.json
generated
@@ -1,12 +1,12 @@
|
||||
{
|
||||
"name": "void-server",
|
||||
"version": "2.0.0-alpha.2",
|
||||
"version": "2.0.0-alpha.3",
|
||||
"lockfileVersion": 3,
|
||||
"requires": true,
|
||||
"packages": {
|
||||
"": {
|
||||
"name": "void-server",
|
||||
"version": "2.0.0-alpha.2",
|
||||
"version": "2.0.0-alpha.3",
|
||||
"dependencies": {
|
||||
"@mozilla/readability": "^0.6.0",
|
||||
"bcrypt": "^6.0.0",
|
||||
@@ -16,6 +16,7 @@
|
||||
"jsdom": "^29.1.1",
|
||||
"marked": "^18.0.4",
|
||||
"multer": "^2.1.1",
|
||||
"node-cron": "^3.0.3",
|
||||
"pg": "^8.21.0",
|
||||
"pg-boss": "^10.4.2",
|
||||
"pino": "^10.3.1",
|
||||
@@ -2431,6 +2432,18 @@
|
||||
"node": "^18 || ^20 || >= 21"
|
||||
}
|
||||
},
|
||||
"node_modules/node-cron": {
|
||||
"version": "3.0.3",
|
||||
"resolved": "https://registry.npmjs.org/node-cron/-/node-cron-3.0.3.tgz",
|
||||
"integrity": "sha512-dOal67//nohNgYWb+nWmg5dkFdIwDm8EpeGYMekPMrngV3637lqnX0lbUcCtgibHTz6SEz7DAIjKvKDFYCnO1A==",
|
||||
"license": "ISC",
|
||||
"dependencies": {
|
||||
"uuid": "8.3.2"
|
||||
},
|
||||
"engines": {
|
||||
"node": ">=6.0.0"
|
||||
}
|
||||
},
|
||||
"node_modules/node-gyp-build": {
|
||||
"version": "4.8.4",
|
||||
"resolved": "https://registry.npmjs.org/node-gyp-build/-/node-gyp-build-4.8.4.tgz",
|
||||
@@ -3509,6 +3522,16 @@
|
||||
"integrity": "sha512-EPD5q1uXyFxJpCrLnCc1nHnq3gOa6DZBocAIiI2TaSCA7VCJ1UJDMagCzIkXNsUYfD1daK//LTEQ8xiIbrHtcw==",
|
||||
"license": "MIT"
|
||||
},
|
||||
"node_modules/uuid": {
|
||||
"version": "8.3.2",
|
||||
"resolved": "https://registry.npmjs.org/uuid/-/uuid-8.3.2.tgz",
|
||||
"integrity": "sha512-+NYs2QeMWy+GWFOEm9xnn6HCDp0l7QBD7ml8zLUmJ+93Q5NF0NocErnwkTkXVFNiX3/fpC6afS8Dhb/gz7R7eg==",
|
||||
"deprecated": "uuid@10 and below is no longer supported. For ESM codebases, update to uuid@latest. For CommonJS codebases, use uuid@11 (but be aware this version will likely be deprecated in 2028).",
|
||||
"license": "MIT",
|
||||
"bin": {
|
||||
"uuid": "dist/bin/uuid"
|
||||
}
|
||||
},
|
||||
"node_modules/vary": {
|
||||
"version": "1.1.2",
|
||||
"resolved": "https://registry.npmjs.org/vary/-/vary-1.1.2.tgz",
|
||||
|
||||
@@ -18,6 +18,7 @@
|
||||
"jsdom": "^29.1.1",
|
||||
"marked": "^18.0.4",
|
||||
"multer": "^2.1.1",
|
||||
"node-cron": "^3.0.3",
|
||||
"pg": "^8.21.0",
|
||||
"pg-boss": "^10.4.2",
|
||||
"pino": "^10.3.1",
|
||||
|
||||
@@ -6,6 +6,7 @@ import { mountApi } from './lib/api/index.js';
|
||||
import * as queue from './lib/jobs/queue.js';
|
||||
import { registerWorkers } from './lib/jobs/index.js';
|
||||
import { router as ingestRouter } from './lib/api/routes/ingest.js';
|
||||
import { startCron } from './lib/cron/index.js';
|
||||
|
||||
const VERSION = '2.0.0-alpha.3';
|
||||
|
||||
@@ -51,6 +52,7 @@ if (import.meta.url === `file://${process.argv[1]}`) {
|
||||
.then(registerWorkers)
|
||||
.then(() => log.info('job queue ready'))
|
||||
.catch(err => log.error({ err }, 'queue boot failed'));
|
||||
startCron();
|
||||
app.listen(port, () => log.info({ port }, 'void-server listening'));
|
||||
for (const sig of ['SIGTERM', 'SIGINT']) {
|
||||
process.on(sig, async () => {
|
||||
|
||||
51
tests/cron/sync_source_docs.test.js
Normal file
51
tests/cron/sync_source_docs.test.js
Normal file
@@ -0,0 +1,51 @@
|
||||
import { describe, it, expect, beforeEach, afterEach } from 'vitest';
|
||||
import { resetDb } from '../helpers/db.js';
|
||||
import { migrateUp } from '../../lib/db/migrate.js';
|
||||
import { stopBoss } from '../helpers/boss.js';
|
||||
import { pool } from '../../lib/db/pool.js';
|
||||
import * as queue from '../../lib/jobs/queue.js';
|
||||
import { registerWorkers } from '../../lib/jobs/index.js';
|
||||
import { runSync } from '../../lib/cron/sync_source_docs.js';
|
||||
import * as jobs from '../../lib/db/repos/jobs.js';
|
||||
|
||||
beforeEach(async () => {
|
||||
await resetDb(); await migrateUp();
|
||||
await queue.start(); await registerWorkers();
|
||||
});
|
||||
afterEach(async () => { await stopBoss(); });
|
||||
|
||||
describe('cron/sync_source_docs.runSync', () => {
|
||||
it('enqueues sync.source_doc for each url-synced row', async () => {
|
||||
const sp = (await pool.query(
|
||||
`INSERT INTO spaces(slug, name) VALUES('s','S') RETURNING id`
|
||||
)).rows[0].id;
|
||||
const res = (await pool.query(
|
||||
`INSERT INTO resources(space_id, slug, name, runtime_type) VALUES($1,'r','R','lxc') RETURNING id`,
|
||||
[sp]
|
||||
)).rows[0].id;
|
||||
await pool.query(
|
||||
`INSERT INTO source_docs(resource_id, name, upstream_url, sync_source) VALUES($1,'doc','https://example.com/r','url')`,
|
||||
[res]
|
||||
);
|
||||
const enqueued = await runSync();
|
||||
expect(enqueued).toBe(1);
|
||||
const queued = await jobs.list({ name: 'sync.source_doc' });
|
||||
expect(queued.length).toBe(1);
|
||||
});
|
||||
|
||||
it('skips rows without sync_source=url', async () => {
|
||||
const sp = (await pool.query(
|
||||
`INSERT INTO spaces(slug, name) VALUES('s2','S2') RETURNING id`
|
||||
)).rows[0].id;
|
||||
const res = (await pool.query(
|
||||
`INSERT INTO resources(space_id, slug, name, runtime_type) VALUES($1,'r','R','lxc') RETURNING id`,
|
||||
[sp]
|
||||
)).rows[0].id;
|
||||
await pool.query(
|
||||
`INSERT INTO source_docs(resource_id, name, upstream_url, sync_source) VALUES($1,'doc','https://example.com/r','manual')`,
|
||||
[res]
|
||||
);
|
||||
const enqueued = await runSync();
|
||||
expect(enqueued).toBe(0);
|
||||
});
|
||||
});
|
||||
Reference in New Issue
Block a user