import json import psycopg from psycopg.rows import dict_row from .config import DATABASE_URL def _conn(): return psycopg.connect( DATABASE_URL, autocommit=True, row_factory=dict_row, client_encoding='UTF8' ) def _maybe_json(v): return json.dumps(v) if isinstance(v, (dict, list)) else v def update_ref(ref_id, *, body_text=None, metadata_patch=None): """UPDATE refs ... and emit an audit_log row with actor_kind='worker'.""" with _conn() as conn: before = conn.execute("SELECT * FROM refs WHERE id=%s", (ref_id,)).fetchone() sets, args = [], [] if body_text is not None: sets.append("body_text=%s"); args.append(body_text) if metadata_patch is not None: sets.append("metadata = coalesce(metadata, '{}'::jsonb) || %s::jsonb") args.append(json.dumps(metadata_patch)) if not sets: return before sets.append("updated_at=now()") args.append(ref_id) after = conn.execute( f"UPDATE refs SET {', '.join(sets)} WHERE id=%s RETURNING *", args ).fetchone() diff = {} if body_text is not None and (before or {}).get("body_text") != body_text: diff["body_text"] = {"before_len": len((before or {}).get("body_text") or ""), "after_len": len(body_text)} conn.execute(""" INSERT INTO audit_log(actor_kind, actor_id, entity_type, entity_id, action, diff) VALUES('worker', NULL, 'ref', %s, 'update', %s) """, (ref_id, json.dumps({"kind": "update", "changes": diff}))) return after def create_ref(input_): """INSERT INTO refs(...) RETURNING id; emit an audit_log row.""" fields = ["space_id", "kind", "source_url", "title", "summary", "body_text", "blob_path", "metadata", "source_kind", "external_id", "captured_at"] cols, vals = [], [] for f in fields: if f in input_: cols.append(f) vals.append(_maybe_json(input_[f])) placeholders = ",".join("%s" for _ in cols) sql = f"INSERT INTO refs({','.join(cols)}) VALUES({placeholders}) RETURNING id" with _conn() as conn: ref_id = conn.execute(sql, vals).fetchone()["id"] conn.execute(""" INSERT INTO audit_log(actor_kind, actor_id, entity_type, entity_id, action, diff) VALUES('worker', NULL, 'ref', %s, 'create', %s) """, (ref_id, json.dumps({"kind": "create"}))) return str(ref_id) def get_source_doc(source_doc_id): with _conn() as conn: return conn.execute( "SELECT * FROM source_docs WHERE id=%s", (source_doc_id,) ).fetchone() def update_source_doc(source_doc_id, *, body_text=None, last_synced=None, metadata_patch=None): with _conn() as conn: before = conn.execute( "SELECT * FROM source_docs WHERE id=%s", (source_doc_id,) ).fetchone() sets, args = [], [] if body_text is not None: sets.append("body_text=%s"); args.append(body_text) if last_synced is not None: sets.append("last_synced=%s"); args.append(last_synced) if metadata_patch is not None: sets.append("metadata = coalesce(metadata, '{}'::jsonb) || %s::jsonb") args.append(json.dumps(metadata_patch)) if not sets: return before sets.append("updated_at=now()") args.append(source_doc_id) after = conn.execute( f"UPDATE source_docs SET {', '.join(sets)} WHERE id=%s RETURNING *", args ).fetchone() conn.execute(""" INSERT INTO audit_log(actor_kind, actor_id, entity_type, entity_id, action, diff) VALUES('worker', NULL, 'source_doc', %s, 'update', %s) """, (source_doc_id, json.dumps({"kind": "update", "changes": {"body_text": "updated" if body_text is not None else None}}))) return after