Replaces the FTS-only /api/search in place. RRF (k=60) fuses the ts_rank and pgvector cosine-distance rankings. The vector branch is silently skipped when Ollama times out or errors, keeping search fast and resilient. Messages have no embeddings in Plan 3, so they participate in the FTS branch only. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
170 lines
6.3 KiB
JavaScript
170 lines
6.3 KiB
JavaScript
import { pool } from '../pool.js';
|
|
import { embedText, padTo } from '../../ai/ollama.js';
|
|
import { log } from '../../log.js';
|
|
|
|
// Hybrid FTS + vector search with reciprocal-rank fusion.
|
|
//
|
|
// - FTS branch: tsvector @@ plainto_tsquery on pages / refs / source_docs /
|
|
// messages, unioned with a `kind` discriminator. Each expression matches
|
|
// the corresponding GIN index in migrations 002 / 003 / 004.
|
|
// - Vector branch: pgvector cosine distance (`<=>`) against each entity's
//   embedding column. The query is embedded via Ollama (nomic-embed-text) and
//   padded to 1024 dimensions. Messages are not embedded in Plan 3, so they
//   are skipped in the vector branch.
|
|
// - Graceful fallback: if Ollama times out or errors, the vector branch is
|
|
// dropped silently and search continues as FTS-only.
|
|
// - RRF fuses the two rank lists with k=60, the canonical constant from Cormack et al. (2009).
|
|
|
|
// Text-search vectors, one per searchable table. Each string is spliced
// verbatim into both the `@@` match and the `ts_rank()` call of its FTS
// branch. Keep every expression textually identical to the matching GIN
// expression index (migrations 002 / 003 / 004): PostgreSQL only uses an
// expression index when the query repeats the indexed expression.

// pages: title plus optional markdown body.
const PAGES_TSV = "to_tsvector('english', p.title || ' ' || coalesce(p.body_md,''))";

// refs: every text field is nullable, so each one is coalesced.
const REFS_TSV = "to_tsvector('english', coalesce(r.title,'') || ' ' || coalesce(r.summary,'') || ' ' || coalesce(r.body_text,''))";

// source_docs: document name plus optional extracted body text.
const SD_TSV = "to_tsvector('english', sd.name || ' ' || coalesce(sd.body_text,''))";

// messages: the raw message body.
const MSG_TSV = "to_tsvector('english', m.body)";
|
|
|
|
// ----- FTS branch (returns flat row list ordered by ts_rank) ----------------
|
|
|
|
/**
 * Builds the per-kind full-text SELECTs that ftsRows unions together.
 *
 * Each SELECT yields (kind, id, space_id, title_or_snippet, rank) and expects
 * to run inside a query that defines the CTE `q(tsq)` and binds $2 to a space
 * uuid or NULL.
 *
 * @param {object} opts
 * @param {string[]|null} opts.kinds - kinds to include; null/undefined means all.
 * @param {boolean} opts.spaceFilterPresent - true when a space filter is active.
 *   Messages carry no space, so they are excluded in that case.
 * @returns {string[]} SQL fragments, possibly empty.
 */
function ftsBranches({ kinds, spaceFilterPresent }) {
  const want = k => !kinds || kinds.includes(k);
  const out = [];

  if (want('page')) {
    out.push(`
    SELECT 'page'::text AS kind, p.id::text, p.space_id, p.title AS title_or_snippet,
           ts_rank(${PAGES_TSV}, q.tsq) AS rank
    FROM pages p, q
    WHERE ${PAGES_TSV} @@ q.tsq
      AND ($2::uuid IS NULL OR p.space_id = $2)`);
  }

  if (want('ref')) {
    out.push(`
    SELECT 'ref'::text, r.id::text, r.space_id,
           coalesce(r.title, r.source_url, '(untitled)'),
           ts_rank(${REFS_TSV}, q.tsq)
    FROM refs r, q
    WHERE ${REFS_TSV} @@ q.tsq
      AND ($2::uuid IS NULL OR r.space_id = $2)`);
  }

  if (want('source_doc')) {
    out.push(`
    SELECT 'source_doc'::text, sd.id::text, res.space_id, sd.name,
           ts_rank(${SD_TSV}, q.tsq)
    FROM source_docs sd JOIN resources res ON res.id = sd.resource_id, q
    WHERE ${SD_TSV} @@ q.tsq
      AND ($2::uuid IS NULL OR res.space_id = $2)`);
  }

  if (want('message') && !spaceFilterPresent) {
    // `$2::uuid IS NULL` is always true here, because the branch is only added
    // without a space filter. It must still appear: the caller always binds
    // three parameters. When this is the only branch, an unreferenced $2 makes
    // PostgreSQL reject the statement with "could not determine data type of
    // parameter $2".
    out.push(`
    SELECT 'message'::text, m.id::text, NULL::uuid, substring(m.body, 1, 200),
           ts_rank(${MSG_TSV}, q.tsq)
    FROM messages m, q
    WHERE ${MSG_TSV} @@ q.tsq
      AND $2::uuid IS NULL`);
  }

  return out;
}
|
|
|
|
/**
 * Runs the full-text branch: unions the per-kind SELECTs from ftsBranches and
 * returns the matches ordered by ts_rank, best first.
 *
 * Bind parameters: $1 = raw query text (parsed with plainto_tsquery),
 * $2 = space uuid or null, $3 = row cap. The cap applies to the unioned result
 * as a whole, not to each kind separately, despite the parameter's name.
 *
 * @returns {Promise<object[]>} rows of (kind, id, space_id, title_or_snippet, rank);
 *   [] without querying the database when no branch applies.
 */
async function ftsRows({ q, space_id, kinds, perBranchLimit }) {
  const hasSpaceFilter = space_id !== null && space_id !== undefined;
  const selects = ftsBranches({ kinds, spaceFilterPresent: hasSpaceFilter });
  if (selects.length === 0) {
    return [];
  }

  const union = selects.join('\n UNION ALL ');
  const sql = `WITH q AS (SELECT plainto_tsquery('english', $1) AS tsq)
    SELECT * FROM (${union}) u
    ORDER BY rank DESC LIMIT $3`;

  const result = await pool.query(sql, [q, space_id, perBranchLimit]);
  return result.rows;
}
|
|
|
|
// ----- Vector branch (runs one ANN query per kind, concats results) --------
|
|
|
|
/**
 * Vector (ANN) branch: one pgvector cosine-distance query per embeddable kind,
 * merged into a single list ordered by ascending distance.
 *
 * @param {object} opts
 * @param {number[]} opts.qvec - query embedding; serialised as a pgvector text
 *   literal ('[x,y,...]') and cast with ::vector.
 * @param {string|null} opts.space_id - optional space filter ($2).
 * @param {string[]|null} opts.kinds - kinds to search; null means all.
 *   'message' is ignored because messages are not embedded in Plan 3.
 * @param {number} opts.perBranchLimit - LIMIT applied to each per-kind query.
 * @returns {Promise<object[]>} rows of (kind, id, space_id, title_or_snippet, dist).
 */
async function vectorRows({ qvec, space_id, kinds, perBranchLimit }) {
  const want = k => !kinds || kinds.includes(k);
  const literal = `[${qvec.join(',')}]`;
  const pending = [];

  if (want('page')) {
    pending.push(pool.query(`
    SELECT 'page'::text AS kind, p.id::text, p.space_id, p.title AS title_or_snippet,
           (p.embedding <=> $1::vector) AS dist
    FROM pages p
    WHERE p.embedding IS NOT NULL
      AND ($2::uuid IS NULL OR p.space_id = $2)
    ORDER BY p.embedding <=> $1::vector LIMIT $3
  `, [literal, space_id, perBranchLimit]));
  }

  if (want('ref')) {
    pending.push(pool.query(`
    SELECT 'ref'::text AS kind, r.id::text, r.space_id,
           coalesce(r.title, r.source_url, '(untitled)') AS title_or_snippet,
           (r.embedding <=> $1::vector) AS dist
    FROM refs r
    WHERE r.embedding IS NOT NULL
      AND ($2::uuid IS NULL OR r.space_id = $2)
    ORDER BY r.embedding <=> $1::vector LIMIT $3
  `, [literal, space_id, perBranchLimit]));
  }

  if (want('source_doc')) {
    pending.push(pool.query(`
    SELECT 'source_doc'::text AS kind, sd.id::text, res.space_id, sd.name AS title_or_snippet,
           (sd.embedding <=> $1::vector) AS dist
    FROM source_docs sd JOIN resources res ON res.id = sd.resource_id
    WHERE sd.embedding IS NOT NULL
      AND ($2::uuid IS NULL OR res.space_id = $2)
    ORDER BY sd.embedding <=> $1::vector LIMIT $3
  `, [literal, space_id, perBranchLimit]));
  }

  // Messages aren't embedded in Plan 3.

  // The per-kind queries are independent, so they run concurrently instead of
  // costing three sequential round trips. Promise.all keeps input order
  // (page, ref, source_doc) and Array#sort is stable, so distance ties resolve
  // exactly as they did when the queries ran one after another.
  const results = await Promise.all(pending);
  return results.flatMap(r => r.rows).sort((a, b) => a.dist - b.dist);
}
|
|
|
|
// ----- RRF fusion ----------------------------------------------------------
|
|
|
|
/**
 * Reciprocal-rank fusion of the FTS and vector result lists.
 *
 * Every row adds 1 / (60 + position) to the score of its `kind:id` key, with
 * position counted from 1 within its own list. A row found by both branches
 * therefore collects two contributions. Display fields (space_id,
 * title_or_snippet) come from the first list the key appears in, so FTS wins
 * over vector. The result is sorted by descending score (stable, so ties keep
 * first-seen order) and then sliced to the requested window.
 *
 * @param {object[]} fts - FTS rows, best first.
 * @param {object[]} vec - vector rows, best first.
 * @param {number} limit - page size.
 * @param {number} offset - number of fused results to skip.
 * @returns {object[]} rows of (kind, id, space_id, title_or_snippet, rank), where
 *   rank is the fused score.
 */
function fuse(fts, vec, limit, offset) {
  const RRF_K = 60;
  const fused = new Map();

  for (const ranked of [fts, vec]) {
    ranked.forEach((row, position) => {
      const key = `${row.kind}:${row.id}`;
      let entry = fused.get(key);
      if (entry === undefined) {
        entry = {
          kind: row.kind,
          id: row.id,
          space_id: row.space_id,
          title_or_snippet: row.title_or_snippet,
          rrf: 0,
        };
        fused.set(key, entry);
      }
      entry.rrf += 1 / (RRF_K + position + 1);
    });
  }

  const ordered = Array.from(fused.values()).sort((x, y) => y.rrf - x.rrf);
  return ordered
    .slice(offset, offset + limit)
    .map(({ rrf, ...fields }) => ({ ...fields, rank: rrf }));
}
|
|
|
|
// ----- Public API ----------------------------------------------------------
|
|
|
|
// Kinds that carry an embedding column; messages are not embedded in Plan 3.
const VECTOR_KINDS = ['page', 'ref', 'source_doc'];

/**
 * Runs the vector branch, resolving to [] instead of rejecting. An Ollama
 * timeout/error or a failing vector query therefore degrades search to
 * FTS-only. The embedding call is skipped entirely when none of the requested
 * kinds is embeddable.
 */
async function vectorRowsOrEmpty({ q, space_id, kinds, perBranchLimit }) {
  if (kinds && !kinds.some(k => VECTOR_KINDS.includes(k))) return [];
  try {
    const raw = await embedText(q, { timeoutMs: 5_000 });
    // `return await` so a rejection from vectorRows is caught below.
    return await vectorRows({
      qvec: padTo(raw, 1024),
      space_id,
      kinds,
      perBranchLimit,
    });
  } catch (e) {
    log.debug({ err: e }, 'search vector branch skipped — falling back to FTS-only');
    return [];
  }
}

/**
 * Hybrid search: full-text and vector branches fused with reciprocal-rank fusion.
 *
 * @param {object} [opts]
 * @param {string} opts.q - user query; non-string or blank queries yield [].
 * @param {string|null} [opts.space_id=null] - restrict results to one space.
 * @param {string[]|null} [opts.kinds=null] - subset of 'page' | 'ref' |
 *   'source_doc' | 'message'; null or [] means all kinds.
 * @param {number} [opts.limit=50] - page size.
 * @param {number} [opts.offset=0] - number of fused results to skip.
 * @returns {Promise<object[]>} fused rows of (kind, id, space_id,
 *   title_or_snippet, rank), highest rank first.
 * @throws when the full-text query fails; vector-branch failures are swallowed.
 */
export async function fts({ q, space_id = null, kinds = null, limit = 50, offset = 0 } = {}) {
  if (typeof q !== 'string' || q.trim() === '') return [];
  const normalizedKinds = Array.isArray(kinds) && kinds.length ? kinds : null;

  // Each branch must supply enough candidates to cover the whole window
  // [0, offset + limit) after fusion. Scaling by `limit` alone returned empty
  // pages for large offsets. Number() keeps the implicit numeric coercion that
  // the previous `limit * 3` performed.
  const perBranchLimit = (Number(offset) + Number(limit)) * 3;

  // Start both branches before awaiting, and await them together. Previously
  // the FTS promise sat unobserved while the embedding call (up to 5 s) was
  // awaited. A database error in that window became an unhandled rejection,
  // which terminates Node >= 15 by default. vectorRowsOrEmpty never rejects.
  const [ftsResult, vec] = await Promise.all([
    ftsRows({ q, space_id, kinds: normalizedKinds, perBranchLimit }),
    vectorRowsOrEmpty({ q, space_id, kinds: normalizedKinds, perBranchLimit }),
  ]);

  return fuse(ftsResult, vec, limit, offset);
}
|
|
|
|
/**
 * Alias of `fts` for call sites that prefer the "hybrid" name. It forwards its
 * argument object unchanged and resolves to the same fused result list.
 */
export async function hybrid(args) {
  const results = await fts(args);
  return results;
}
|