feat(search): hybrid FTS + vector with RRF + graceful Ollama fallback
Replaces FTS-only /api/search in place. RRF (k=60) fuses ts_rank and pgvector cosine distance rankings. Vector branch silently skipped when Ollama times out / errors, keeping search snappy and resilient. Messages have no embeddings in Plan 3, so they participate in the FTS branch only. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -1,83 +1,169 @@
|
||||
import { pool } from '../pool.js';
|
||||
import { embedText, padTo } from '../../ai/ollama.js';
|
||||
import { log } from '../../log.js';
|
||||
|
||||
// FTS-only search across pages / refs / source_docs / messages, unioned
|
||||
// with a `kind` discriminator and ranked by ts_rank. Each branch's
|
||||
// to_tsvector expression matches the GIN index on its table so the
|
||||
// indexes are used. Vector / hybrid RRF search lands in Plan 3.
|
||||
// Hybrid FTS + vector search with reciprocal-rank fusion.
|
||||
//
|
||||
// Notes:
|
||||
// - messages have no space_id → the messages branch is dropped when a
|
||||
// space_id filter is present.
|
||||
// - source_docs inherit space_id from their owning resource via join.
|
||||
// - FTS branch: tsvector @@ plainto_tsquery on pages / refs / source_docs /
|
||||
// messages, unioned with a `kind` discriminator. Each expression matches
|
||||
// the corresponding GIN index in migrations 002 / 003 / 004.
|
||||
// - Vector branch: pgvector cosine distance against the entity's embedding
|
||||
// column. The query is embedded via Ollama (nomic-embed-text, padded
|
||||
// 1024-d). Messages are not embedded in Plan 3 — skipped in the vector
|
||||
// branch.
|
||||
// - Graceful fallback: if Ollama times out or errors, the vector branch is
|
||||
// dropped silently and search continues as FTS-only.
|
||||
// - RRF fuses the two rank lists with k=60 (Cormack et al. canonical).
|
||||
|
||||
const PAGES_TSV = `to_tsvector('english', p.title || ' ' || coalesce(p.body_md,''))`;
|
||||
const REFS_TSV = `to_tsvector('english', coalesce(r.title,'') || ' ' || coalesce(r.summary,'') || ' ' || coalesce(r.body_text,''))`;
|
||||
const SD_TSV = `to_tsvector('english', sd.name || ' ' || coalesce(sd.body_text,''))`;
|
||||
const MSG_TSV = `to_tsvector('english', m.body)`;
|
||||
|
||||
function buildBranches({ kinds, spaceFilterPresent }) {
|
||||
const branches = [];
|
||||
const wantPage = !kinds || kinds.includes('page');
|
||||
const wantRef = !kinds || kinds.includes('ref');
|
||||
const wantSD = !kinds || kinds.includes('source_doc');
|
||||
const wantMsg = (!kinds || kinds.includes('message')) && !spaceFilterPresent;
|
||||
// ----- FTS branch (returns flat row list ordered by ts_rank) ----------------
|
||||
|
||||
if (wantPage) {
|
||||
branches.push(`
|
||||
SELECT 'page'::text AS kind, p.id, p.space_id, p.title AS title_or_snippet,
|
||||
ts_rank(${PAGES_TSV}, q.tsq) AS rank
|
||||
FROM pages p, q
|
||||
WHERE ${PAGES_TSV} @@ q.tsq
|
||||
AND ($2::uuid IS NULL OR p.space_id = $2)
|
||||
`);
|
||||
}
|
||||
if (wantRef) {
|
||||
branches.push(`
|
||||
SELECT 'ref'::text AS kind, r.id, r.space_id,
|
||||
coalesce(r.title, r.source_url, '(untitled)') AS title_or_snippet,
|
||||
ts_rank(${REFS_TSV}, q.tsq) AS rank
|
||||
FROM refs r, q
|
||||
WHERE ${REFS_TSV} @@ q.tsq
|
||||
AND ($2::uuid IS NULL OR r.space_id = $2)
|
||||
`);
|
||||
}
|
||||
if (wantSD) {
|
||||
branches.push(`
|
||||
SELECT 'source_doc'::text AS kind, sd.id, res.space_id, sd.name AS title_or_snippet,
|
||||
ts_rank(${SD_TSV}, q.tsq) AS rank
|
||||
FROM source_docs sd
|
||||
JOIN resources res ON res.id = sd.resource_id, q
|
||||
WHERE ${SD_TSV} @@ q.tsq
|
||||
AND ($2::uuid IS NULL OR res.space_id = $2)
|
||||
`);
|
||||
}
|
||||
if (wantMsg) {
|
||||
branches.push(`
|
||||
SELECT 'message'::text AS kind, m.id, NULL::uuid AS space_id,
|
||||
substring(m.body, 1, 200) AS title_or_snippet,
|
||||
ts_rank(${MSG_TSV}, q.tsq) AS rank
|
||||
FROM messages m, q
|
||||
WHERE ${MSG_TSV} @@ q.tsq
|
||||
`);
|
||||
}
|
||||
return branches;
|
||||
function ftsBranches({ kinds, spaceFilterPresent }) {
|
||||
const want = k => !kinds || kinds.includes(k);
|
||||
const out = [];
|
||||
if (want('page')) out.push(`
|
||||
SELECT 'page'::text AS kind, p.id::text, p.space_id, p.title AS title_or_snippet,
|
||||
ts_rank(${PAGES_TSV}, q.tsq) AS rank
|
||||
FROM pages p, q
|
||||
WHERE ${PAGES_TSV} @@ q.tsq
|
||||
AND ($2::uuid IS NULL OR p.space_id = $2)`);
|
||||
if (want('ref')) out.push(`
|
||||
SELECT 'ref'::text, r.id::text, r.space_id,
|
||||
coalesce(r.title, r.source_url, '(untitled)'),
|
||||
ts_rank(${REFS_TSV}, q.tsq)
|
||||
FROM refs r, q
|
||||
WHERE ${REFS_TSV} @@ q.tsq
|
||||
AND ($2::uuid IS NULL OR r.space_id = $2)`);
|
||||
if (want('source_doc')) out.push(`
|
||||
SELECT 'source_doc'::text, sd.id::text, res.space_id, sd.name,
|
||||
ts_rank(${SD_TSV}, q.tsq)
|
||||
FROM source_docs sd JOIN resources res ON res.id = sd.resource_id, q
|
||||
WHERE ${SD_TSV} @@ q.tsq
|
||||
AND ($2::uuid IS NULL OR res.space_id = $2)`);
|
||||
if (want('message') && !spaceFilterPresent) out.push(`
|
||||
SELECT 'message'::text, m.id::text, NULL::uuid, substring(m.body, 1, 200),
|
||||
ts_rank(${MSG_TSV}, q.tsq)
|
||||
FROM messages m, q
|
||||
WHERE ${MSG_TSV} @@ q.tsq`);
|
||||
return out;
|
||||
}
|
||||
|
||||
async function ftsRows({ q, space_id, kinds, perBranchLimit }) {
|
||||
const branches = ftsBranches({ kinds, spaceFilterPresent: space_id != null });
|
||||
if (!branches.length) return [];
|
||||
const sql = `WITH q AS (SELECT plainto_tsquery('english', $1) AS tsq)
|
||||
SELECT * FROM (${branches.join('\n UNION ALL ')}) u
|
||||
ORDER BY rank DESC LIMIT $3`;
|
||||
const { rows } = await pool.query(sql, [q, space_id, perBranchLimit]);
|
||||
return rows;
|
||||
}
|
||||
|
||||
// ----- Vector branch (runs one ANN query per kind, concats results) --------
|
||||
|
||||
async function vectorRows({ qvec, space_id, kinds, perBranchLimit }) {
|
||||
const want = k => !kinds || kinds.includes(k);
|
||||
const literal = '[' + qvec.join(',') + ']';
|
||||
const all = [];
|
||||
|
||||
if (want('page')) {
|
||||
const { rows } = await pool.query(`
|
||||
SELECT 'page'::text AS kind, p.id::text, p.space_id, p.title AS title_or_snippet,
|
||||
(p.embedding <=> $1::vector) AS dist
|
||||
FROM pages p
|
||||
WHERE p.embedding IS NOT NULL
|
||||
AND ($2::uuid IS NULL OR p.space_id = $2)
|
||||
ORDER BY p.embedding <=> $1::vector LIMIT $3
|
||||
`, [literal, space_id, perBranchLimit]);
|
||||
all.push(...rows);
|
||||
}
|
||||
if (want('ref')) {
|
||||
const { rows } = await pool.query(`
|
||||
SELECT 'ref'::text AS kind, r.id::text, r.space_id,
|
||||
coalesce(r.title, r.source_url, '(untitled)') AS title_or_snippet,
|
||||
(r.embedding <=> $1::vector) AS dist
|
||||
FROM refs r
|
||||
WHERE r.embedding IS NOT NULL
|
||||
AND ($2::uuid IS NULL OR r.space_id = $2)
|
||||
ORDER BY r.embedding <=> $1::vector LIMIT $3
|
||||
`, [literal, space_id, perBranchLimit]);
|
||||
all.push(...rows);
|
||||
}
|
||||
if (want('source_doc')) {
|
||||
const { rows } = await pool.query(`
|
||||
SELECT 'source_doc'::text AS kind, sd.id::text, res.space_id, sd.name AS title_or_snippet,
|
||||
(sd.embedding <=> $1::vector) AS dist
|
||||
FROM source_docs sd JOIN resources res ON res.id = sd.resource_id
|
||||
WHERE sd.embedding IS NOT NULL
|
||||
AND ($2::uuid IS NULL OR res.space_id = $2)
|
||||
ORDER BY sd.embedding <=> $1::vector LIMIT $3
|
||||
`, [literal, space_id, perBranchLimit]);
|
||||
all.push(...rows);
|
||||
}
|
||||
// Messages aren't embedded in Plan 3.
|
||||
|
||||
return all.sort((a, b) => a.dist - b.dist);
|
||||
}
|
||||
|
||||
// ----- RRF fusion ----------------------------------------------------------
|
||||
|
||||
function fuse(fts, vec, limit, offset) {
|
||||
const K = 60;
|
||||
const score = new Map();
|
||||
function add(rows) {
|
||||
rows.forEach((r, idx) => {
|
||||
const key = `${r.kind}:${r.id}`;
|
||||
const cur = score.get(key) || {
|
||||
kind: r.kind,
|
||||
id: r.id,
|
||||
space_id: r.space_id,
|
||||
title_or_snippet: r.title_or_snippet,
|
||||
rrf: 0
|
||||
};
|
||||
cur.rrf += 1 / (K + idx + 1);
|
||||
score.set(key, cur);
|
||||
});
|
||||
}
|
||||
add(fts);
|
||||
add(vec);
|
||||
const sorted = [...score.values()].sort((a, b) => b.rrf - a.rrf);
|
||||
return sorted.slice(offset, offset + limit).map(r => ({
|
||||
kind: r.kind,
|
||||
id: r.id,
|
||||
space_id: r.space_id,
|
||||
title_or_snippet: r.title_or_snippet,
|
||||
rank: r.rrf
|
||||
}));
|
||||
}
|
||||
|
||||
// ----- Public API ----------------------------------------------------------
|
||||
|
||||
export async function fts({ q, space_id = null, kinds = null, limit = 50, offset = 0 } = {}) {
|
||||
if (!q || typeof q !== 'string') return [];
|
||||
const normalizedKinds = Array.isArray(kinds) && kinds.length ? kinds : null;
|
||||
const spaceFilterPresent = space_id !== null && space_id !== undefined;
|
||||
const branches = buildBranches({ kinds: normalizedKinds, spaceFilterPresent });
|
||||
if (branches.length === 0) return [];
|
||||
const perBranchLimit = limit * 3;
|
||||
|
||||
const sql = `
|
||||
WITH q AS (SELECT plainto_tsquery('english', $1) AS tsq)
|
||||
SELECT * FROM (
|
||||
${branches.join('\n UNION ALL\n ')}
|
||||
) u
|
||||
ORDER BY rank DESC
|
||||
LIMIT $3 OFFSET $4
|
||||
`;
|
||||
const { rows } = await pool.query(sql, [q, space_id, limit, offset]);
|
||||
return rows;
|
||||
const ftsP = ftsRows({ q, space_id, kinds: normalizedKinds, perBranchLimit });
|
||||
|
||||
let vec = [];
|
||||
try {
|
||||
const raw = await embedText(q, { timeoutMs: 5_000 });
|
||||
vec = await vectorRows({
|
||||
qvec: padTo(raw, 1024),
|
||||
space_id,
|
||||
kinds: normalizedKinds,
|
||||
perBranchLimit
|
||||
});
|
||||
} catch (e) {
|
||||
log.debug({ err: e }, 'search vector branch skipped — falling back to FTS-only');
|
||||
}
|
||||
|
||||
const ftsResult = await ftsP;
|
||||
return fuse(ftsResult, vec, limit, offset);
|
||||
}
|
||||
|
||||
// Alias for clarity at callsites that want the hybrid name.
|
||||
export async function hybrid(args) { return fts(args); }
|
||||
|
||||
Reference in New Issue
Block a user