From ec4cccc72046fa483be9f6de8b1531c1a0651889 Mon Sep 17 00:00:00 2001 From: Isaac Kargar Date: Sun, 29 Mar 2026 11:24:24 +0300 Subject: [PATCH 01/12] chore: add tests/__init__.py for lerim-cloud .pth compatibility The lerim-cloud .pth file in the venv makes lerim-cloud's tests/ package shadow lerim-cli's tests/ directory. Adding __init__.py ensures Python resolves lerim-cli's tests first. Co-Authored-By: Claude Opus 4.6 (1M context) --- tests/__init__.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 tests/__init__.py diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 From 1fee16b6d566629586cd0a2cc5a2d3f19a59c9e1 Mon Sep 17 00:00:00 2001 From: Isaac Kargar Date: Sun, 29 Mar 2026 14:11:47 +0300 Subject: [PATCH 02/12] opt: quality criteria in extraction signature | extraction 0.841 Added QUALITY BAR section to MemoryExtractSignature: atomic, actionable, context-independent, structured body, durable. Extraction improved +0.022 on 100 cases. Dedup -0.056 is within 3-case noise. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/lerim/memory/extract_pipeline.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/lerim/memory/extract_pipeline.py b/src/lerim/memory/extract_pipeline.py index 363f3ed..29a714f 100644 --- a/src/lerim/memory/extract_pipeline.py +++ b/src/lerim/memory/extract_pipeline.py @@ -80,6 +80,13 @@ class MemoryExtractSignature(dspy.Signature): ✗ Agent: "The extraction pipeline uses DSPy Predict" → just DESCRIBED existing code ✗ Agent: "B2B SaaS typically converts at 5-7%" → UNSOLICITED generic statistic, not tied to a user question + QUALITY BAR for each candidate: + - Atomic: ONE decision or learning per candidate. Don't bundle multiple items. + - Actionable: must change how an agent behaves in a future session. + - Context-independent: understandable without the original conversation. + - Structured body: lead with the rule/fact, then WHY, then HOW TO APPLY. + - Durable: still relevant weeks or months later, not tied to a specific moment. + Kind (for learnings only): - insight: a reusable observation or pattern - procedure: a step-by-step fix or workflow From d04d639cfae965a5dc498a29a96e03b4613d95c1 Mon Sep 17 00:00:00 2001 From: Isaac Kargar Date: Sun, 29 Mar 2026 14:33:57 +0300 Subject: [PATCH 03/12] opt: body structure WHY + HOW TO APPLY in schemas.py | extraction 0.848 Updated MemoryCandidate body field description: "lead with rule/fact, then WHY, then HOW TO APPLY". Aligned with Claude Code memory body structure. Extraction +0.007 (cumulative +0.029 from baseline). Co-Authored-By: Claude Opus 4.6 (1M context) --- src/lerim/memory/schemas.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lerim/memory/schemas.py b/src/lerim/memory/schemas.py index 5f0b178..babef4d 100644 --- a/src/lerim/memory/schemas.py +++ b/src/lerim/memory/schemas.py @@ -28,7 +28,7 @@ class MemoryCandidate(BaseModel): description="Subtype for learnings: insight, procedure, friction, pitfall, or preference. Must be null for decisions.", ) title: str = Field(description="Short descriptive title starting with a verb or noun phrase. Format: 'Use X for Y', 'Switch to X', 'X causes Y'. Max 10 words. Must be specific enough to identify the topic without reading the body.") - body: str = Field(description="Memory content in plain language. Must add substantive information beyond the title — include the WHY (rationale), WHAT was considered (alternatives), and CONTEXT (when this applies). Minimum 2 sentences.") + body: str = Field(description="Memory content. Structure: lead with the rule or fact, then WHY (rationale/motivation), then HOW TO APPLY (when and where this matters in future sessions). Must be understandable without the original conversation. Minimum 2 sentences.") confidence: float | None = Field( default=None, ge=0.0, From 841c0e473c93e2cdab17599e6a837e8ae41e1ae2 Mon Sep 17 00:00:00 2001 From: Isaac Kargar Date: Sun, 29 Mar 2026 17:53:57 +0300 Subject: [PATCH 04/12] opt: add positive WHY+HOW TO APPLY example | extraction 0.845 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Added a positive ✓ example demonstrating body structure: "WHY: mocked tests passed but prod migration failed. HOW TO APPLY: integration tests must hit real database." Reinforces exp021+exp022 quality criteria by demonstration. Extraction 0.845 (within noise of best 0.848). Co-Authored-By: Claude Opus 4.6 (1M context) --- src/lerim/memory/extract_pipeline.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/lerim/memory/extract_pipeline.py b/src/lerim/memory/extract_pipeline.py index 29a714f..0d24362 100644 --- a/src/lerim/memory/extract_pipeline.py +++ b/src/lerim/memory/extract_pipeline.py @@ -76,6 +76,7 @@ class MemoryExtractSignature(dspy.Signature): ✓ User: "use tabs for indentation" → preference (user STATED) ✓ Agent: "vllm-mlx crashes with concurrent Metal requests" → pitfall (DISCOVERED through debugging) ✓ User asked to research pitch deck structure → agent synthesized: "Best decks follow problem-solution-traction-ask arc" → insight (RESEARCHED and CONCLUDED for this project) + ✓ User: "don't mock the database in tests" → feedback. Body: "WHY: mocked tests passed but prod migration failed. HOW TO APPLY: integration tests must hit real database." ✗ Agent: "The config file has sync_interval = 10" → just REPORTED a config value ✗ Agent: "The extraction pipeline uses DSPy Predict" → just DESCRIBED existing code ✗ Agent: "B2B SaaS typically converts at 5-7%" → UNSOLICITED generic statistic, not tied to a user question From 93ac8b3c831a1d45bc9374047ad7daa178e249e1 Mon Sep 17 00:00:00 2001 From: Isaac Kargar Date: Sun, 29 Mar 2026 20:45:52 +0300 Subject: [PATCH 05/12] fix (adapter): Enhance iter_sessions to skip subagent transcripts and short sessions Updated the iter_sessions function to skip processing of sidechain transcripts and sessions with fewer than 6 conversation turns. This change prevents double-counting of content and ensures only meaningful interactions are considered. Adjusted unit tests to reflect the new minimum turn requirement for session filtering. --- src/lerim/adapters/claude.py | 15 +++++++++++++++ tests/unit/test_claude_adapter.py | 29 +++++++++-------------------- 2 files changed, 24 insertions(+), 20 deletions(-) diff --git a/src/lerim/adapters/claude.py b/src/lerim/adapters/claude.py index 727372e..9d63c46 100644 --- a/src/lerim/adapters/claude.py +++ b/src/lerim/adapters/claude.py @@ -234,6 +234,21 @@ def iter_sessions( entries = load_jsonl_dict_lines(path) if not entries: continue + + # Skip subagent/sidechain transcripts — their content flows back to + # the parent session via tool results, so extracting from both would + # double-count. Also skip tiny sessions (< 6 conversation turns) which + # are typically eval judge calls or trivial interactions. + is_sidechain = any(e.get("isSidechain") for e in entries[:5]) + if is_sidechain: + continue + conv_turns = sum( + 1 for e in entries + if e.get("type") in ("user", "assistant") + ) + if conv_turns < 6: + continue + started_at: datetime | None = None repo_name: str | None = None cwd: str | None = None diff --git a/tests/unit/test_claude_adapter.py b/tests/unit/test_claude_adapter.py index 91c5c3f..4668f7a 100644 --- a/tests/unit/test_claude_adapter.py +++ b/tests/unit/test_claude_adapter.py @@ -169,26 +169,15 @@ def test_iter_sessions_window_filtering(tmp_path): def test_iter_sessions_skips_known_ids(tmp_path): """iter_sessions skips sessions whose run_id is already known.""" - _write_claude_jsonl( - tmp_path / "known.jsonl", - [ - { - "type": "user", - "message": {"content": "hi"}, - "timestamp": "2026-02-20T10:00:00Z", - }, - ], - ) - _write_claude_jsonl( - tmp_path / "new.jsonl", - [ - { - "type": "user", - "message": {"content": "hi"}, - "timestamp": "2026-02-20T10:00:00Z", - }, - ], - ) + # Sessions need >= 6 conversation turns to pass the min-turn filter + _turns = [ + {"type": "user", "message": {"content": f"msg {i}"}, "timestamp": "2026-02-20T10:00:00Z"} + if i % 2 == 0 else + {"type": "assistant", "message": {"content": f"reply {i}"}, "timestamp": "2026-02-20T10:00:00Z"} + for i in range(8) + ] + _write_claude_jsonl(tmp_path / "known.jsonl", _turns) + _write_claude_jsonl(tmp_path / "new.jsonl", _turns) # Skip "known" by providing its ID records = iter_sessions( traces_dir=tmp_path, From ee579e8888abfd7599f16ea01e6e865d827dab92 Mon Sep 17 00:00:00 2001 From: Isaac Kargar Date: Sun, 29 Mar 2026 20:53:18 +0300 Subject: [PATCH 06/12] refactor(cli, memory): Update memory reset command to include cache deletion Enhanced the memory reset command help text to clarify that it now wipes cache data along with memory, workspace, and index data. Updated the reset_memory_root function to delete the cache directory and added a note about clearing the adapter cache for improved session management. This change ensures users are fully informed about the implications of the reset operation. --- src/lerim/app/cli.py | 14 +++++++------- src/lerim/memory/memory_repo.py | 9 +++++++-- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/src/lerim/app/cli.py b/src/lerim/app/cli.py index f8ba3c2..47044da 100644 --- a/src/lerim/app/cli.py +++ b/src/lerim/app/cli.py @@ -1445,20 +1445,20 @@ def build_parser() -> argparse.ArgumentParser: memory_reset = memory_sub.add_parser( "reset", formatter_class=_F, - help="DESTRUCTIVE: wipe memory, workspace, and index data", + help="DESTRUCTIVE: wipe memory, workspace, cache, and index data", description=( - "Irreversibly delete memory/, workspace/, and index/ under the selected\n" - "scope, then recreate canonical empty folders.\n\n" + "Irreversibly delete memory/, workspace/, index/, and cache/ under the\n" + "selected scope, then recreate canonical empty folders.\n\n" "Scopes:\n" " project -- reset /.lerim/ only\n" - " global -- reset ~/.lerim/ only (includes sessions DB)\n" + " global -- reset ~/.lerim/ only (includes sessions DB + cache)\n" " both -- reset both project and global roots (default)\n\n" - "The sessions DB lives in global index/, so --scope project alone\n" - "does NOT reset the session queue. Use 'global' or 'both' for a full wipe.\n\n" + "The sessions DB lives in global index/, and compacted session traces\n" + "live in global cache/. Use 'global' or 'both' for a full wipe.\n\n" "Examples:\n" " lerim memory reset --yes # wipe everything\n" " lerim memory reset --scope project --yes # project data only\n" - " lerim memory reset --yes && lerim sync --max-sessions 5 # fresh start" + " lerim memory reset --yes && lerim up --build # fresh start" ), ) memory_reset.add_argument( diff --git a/src/lerim/memory/memory_repo.py b/src/lerim/memory/memory_repo.py index 021ebf1..78a86d9 100644 --- a/src/lerim/memory/memory_repo.py +++ b/src/lerim/memory/memory_repo.py @@ -57,9 +57,14 @@ def ensure_memory_paths(paths: MemoryPaths) -> None: def reset_memory_root(paths: MemoryPaths) -> dict[str, list[str]]: - """Delete memory/index/workspace trees for a root and recreate canonical layout.""" + """Delete memory/index/workspace/cache trees for a root and recreate canonical layout. + + Also clears the adapter cache (compacted session traces) so the next sync + re-discovers and re-filters sessions from scratch. + """ removed: list[str] = [] - for path in (paths.memory_dir, paths.workspace_dir, paths.index_dir): + cache_dir = paths.data_dir / "cache" + for path in (paths.memory_dir, paths.workspace_dir, paths.index_dir, cache_dir): if path.exists(): if path.is_dir(): shutil.rmtree(path, ignore_errors=True) From 3d5f6587de87bec83a7c44231aae8fbc13cc98dd Mon Sep 17 00:00:00 2001 From: Isaac Kargar Date: Mon, 30 Mar 2026 22:48:34 +0300 Subject: [PATCH 07/12] fix(memory): add similarity normalization, rich metadata, and schema fixes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - memory_record: persist source_speaker and durability in frontmatter (data was silently lost) - memory_index: normalize find_similar output with fused_score, similarity, lexical_similarity - oai_tools: fix batch_dedup score bug (was returning 0 for everything), add write_memory source_speaker/durability/outcome params with validation - oai_sync: update dedup thresholds (0.7→0.75, 0.4→0.45), instruct agent to pass rich metadata - tests: update for new frontmatter keys and similarity fields Co-Authored-By: Claude Opus 4.6 (1M context) --- src/lerim/memory/memory_index.py | 59 ++++++++++++++++++-- src/lerim/memory/memory_record.py | 6 +++ src/lerim/runtime/oai_tools.py | 72 ++++++++++++++++++++++--- src/lerim/runtime/prompts/oai_sync.py | 16 ++++-- tests/unit/test_memory_index.py | 4 ++ tests/unit/test_memory_record.py | 2 + tests/unit/test_oai_tools.py | 42 +++++++++++++++ tests/unit/test_regression_contracts.py | 2 + 8 files changed, 188 insertions(+), 15 deletions(-) diff --git a/src/lerim/memory/memory_index.py b/src/lerim/memory/memory_index.py index d944274..6529d12 100644 --- a/src/lerim/memory/memory_index.py +++ b/src/lerim/memory/memory_index.py @@ -70,6 +70,38 @@ def _extract_terms(text: str, max_terms: int = 10) -> list[str]: return terms +def _normalize_similarity(value: float) -> float: + """Clamp a similarity-like value into the 0.0-1.0 range.""" + return max(0.0, min(1.0, float(value))) + + +def _cosine_similarity_from_distance(distance: float | None) -> float | None: + """Convert sqlite-vec cosine distance into normalized similarity.""" + if distance is None: + return None + try: + return _normalize_similarity(1.0 - float(distance)) + except (TypeError, ValueError): + return None + + +def _term_set(text: str) -> set[str]: + """Return normalized content terms for overlap scoring.""" + return set(_extract_terms(text, max_terms=32)) + + +def _token_overlap_similarity(left: str, right: str) -> float: + """Cheap lexical similarity using Jaccard overlap over content terms.""" + left_terms = _term_set(left) + right_terms = _term_set(right) + if not left_terms or not right_terms: + return 0.0 + union = left_terms | right_terms + if not union: + return 0.0 + return _normalize_similarity(len(left_terms & right_terms) / len(union)) + + # ── embedding helpers ──────────────────────────────────────────────────── _EMBED_MODEL = None @@ -477,6 +509,7 @@ def search( d.memory_id, d.title, d.tags, + d.body, d.confidence, d.primitive, d.kind, @@ -523,7 +556,7 @@ def _vector_search(self, query_vec: list[float], limit: int = 10) -> list[dict[s placeholders = ", ".join("?" for _ in ids) meta_rows = conn.execute( f""" - SELECT memory_id, title, tags, confidence, primitive, kind, file_path + SELECT memory_id, title, tags, body, confidence, primitive, kind, file_path FROM memory_docs WHERE memory_id IN ({placeholders}) """, ids, @@ -536,6 +569,7 @@ def _vector_search(self, query_vec: list[float], limit: int = 10) -> list[dict[s if mid in meta_by_id: row = dict(meta_by_id[mid]) row["distance"] = vr["distance"] + row["similarity"] = _cosine_similarity_from_distance(vr.get("distance")) results.append(row) return results @@ -577,20 +611,37 @@ def find_similar( k = 60 rrf_scores: dict[str, float] = {} data: dict[str, dict[str, Any]] = {} + candidate_text = f"{title}\n{tags}\n{body}" for rank, r in enumerate(fts_results): mid = r["memory_id"] rrf_scores[mid] = rrf_scores.get(mid, 0) + 1 / (k + rank) - data.setdefault(mid, r) + row = dict(r) + existing_text = f"{row.get('title', '')}\n{row.get('tags', '')}\n{row.get('body', '')}" + row["lexical_similarity"] = _token_overlap_similarity(candidate_text, existing_text) + data.setdefault(mid, row) for rank, r in enumerate(vec_results): mid = r["memory_id"] rrf_scores[mid] = rrf_scores.get(mid, 0) + 1 / (k + rank) - data.setdefault(mid, r) + row = dict(r) + existing_text = f"{row.get('title', '')}\n{row.get('tags', '')}\n{row.get('body', '')}" + row["lexical_similarity"] = _token_overlap_similarity(candidate_text, existing_text) + data.setdefault(mid, row) # Sort by fused score descending, return top-limit. ranked = sorted(rrf_scores, key=lambda mid: rrf_scores[mid], reverse=True)[:limit] - return [data[mid] for mid in ranked] + results: list[dict[str, Any]] = [] + for mid in ranked: + row = dict(data[mid]) + row["fused_score"] = round(rrf_scores[mid], 6) + row["similarity"] = _normalize_similarity( + row.get("similarity") + or row.get("lexical_similarity") + or 0.0 + ) + results.append(row) + return results # ── scan ───────────────────────────────────────────────────────────── diff --git a/src/lerim/memory/memory_record.py b/src/lerim/memory/memory_record.py index 4012a5d..aaa1232 100644 --- a/src/lerim/memory/memory_record.py +++ b/src/lerim/memory/memory_record.py @@ -42,6 +42,8 @@ class MemoryType(str, Enum): "updated", "source", "confidence", + "source_speaker", + "durability", "tags", ], MemoryType.learning: [ @@ -52,6 +54,8 @@ class MemoryType(str, Enum): "source", "kind", "confidence", + "source_speaker", + "durability", "tags", ], } @@ -108,6 +112,8 @@ def to_frontmatter_dict(self) -> dict: "updated": self.updated.isoformat(), "source": self.source, "confidence": self.confidence, + "source_speaker": self.source_speaker, + "durability": self.durability, "tags": list(self.tags), } if self.primitive == MemoryType.learning: diff --git a/src/lerim/runtime/oai_tools.py b/src/lerim/runtime/oai_tools.py index d604e0c..6508911 100644 --- a/src/lerim/runtime/oai_tools.py +++ b/src/lerim/runtime/oai_tools.py @@ -44,6 +44,9 @@ from lerim.runtime.oai_context import OAIRuntimeContext VALID_KINDS = {"insight", "procedure", "friction", "pitfall", "preference"} +VALID_SOURCE_SPEAKERS = {"user", "agent", "both"} +VALID_DURABILITY = {"permanent", "project", "session"} +VALID_OUTCOMES = {"worked", "failed", "unknown"} def _is_within(path: Path, root: Path) -> bool: @@ -78,6 +81,9 @@ def _write_memory_impl( confidence: float = 0.8, tags: str = "", kind: str = "", + source_speaker: str = "both", + durability: str = "project", + outcome: str = "", ) -> str: """Core write_memory logic — separated for direct unit testing.""" ctx = wrapper.context @@ -109,6 +115,27 @@ def _write_memory_impl( "Example: write_memory(primitive='learning', title='...', body='...', kind='insight')" ) + effective_source_speaker = source_speaker.strip() or "both" + if effective_source_speaker not in VALID_SOURCE_SPEAKERS: + return ( + f"ERROR: source_speaker={effective_source_speaker!r} is invalid. " + f"Must be one of: {', '.join(sorted(VALID_SOURCE_SPEAKERS))}." + ) + + effective_durability = durability.strip() or "project" + if effective_durability not in VALID_DURABILITY: + return ( + f"ERROR: durability={effective_durability!r} is invalid. " + f"Must be one of: {', '.join(sorted(VALID_DURABILITY))}." + ) + + effective_outcome = outcome.strip() or None + if effective_outcome is not None and effective_outcome not in VALID_OUTCOMES: + return ( + f"ERROR: outcome={effective_outcome!r} is invalid. " + f"Must be one of: {', '.join(sorted(VALID_OUTCOMES))}." + ) + tag_list = [t.strip() for t in tags.split(",") if t.strip()] if tags else [] try: @@ -119,6 +146,9 @@ def _write_memory_impl( confidence=confidence, tags=tag_list, kind=effective_kind, + source_speaker=cast(Literal["user", "agent", "both"], effective_source_speaker), + durability=cast(Literal["permanent", "project", "session"], effective_durability), + outcome=cast(Literal["worked", "failed", "unknown"] | None, effective_outcome), id=slugify(title), source=ctx.run_id, ) @@ -126,7 +156,8 @@ def _write_memory_impl( return ( f"ERROR: Invalid memory fields: {exc}. " "Required: primitive ('decision'|'learning'), title (non-empty string), body (non-empty string). " - "Optional: confidence (0.0-1.0, default 0.8), tags (comma-separated), kind (required for learnings)." + "Optional: confidence (0.0-1.0, default 0.8), tags (comma-separated), " + "kind (required for learnings), source_speaker, durability, outcome." ) mem_type = MemoryType(record.primitive) @@ -155,6 +186,9 @@ def write_memory( confidence: float = 0.8, tags: str = "", kind: str = "", + source_speaker: str = "both", + durability: str = "project", + outcome: str = "", ) -> str: """Create a memory file (decision or learning) under memory_root. @@ -170,8 +204,22 @@ def write_memory( confidence: Float 0.0-1.0. Default 0.8. Higher = more certain. tags: Comma-separated tags. Example: "queue,reliability". kind: Required for learnings. One of: "friction", "insight", "pitfall", "preference", "procedure". + source_speaker: Who originated the memory: "user", "agent", or "both". + durability: Expected lifespan: "permanent", "project", or "session". + outcome: Optional validation status: "worked", "failed", or "unknown". """ - return _write_memory_impl(wrapper, primitive, title, body, confidence, tags, kind) + return _write_memory_impl( + wrapper, + primitive, + title, + body, + confidence, + tags, + kind, + source_speaker, + durability, + outcome, + ) @function_tool @@ -600,7 +648,7 @@ def memory_search( Use primitive= to filter by "decision" or "learning". mode="keyword": BM25-ranked keyword search. Returns {mode, query, count, results}. mode="similar": Find memories similar to a candidate for dedup. Pass title + body + tags. - Returns {mode, count, results: [{title, score, ...}]}. + Returns {mode, count, results: [{title, similarity, lexical_similarity, fused_score, ...}]}. mode="clusters": Find groups of related memories sharing tags for merge review. Returns {mode, cluster_count, clusters: [{size, memories}]}. @@ -653,10 +701,18 @@ def _batch_dedup_candidates_impl( c_body = c.get("body", "") c_tags = ",".join(c.get("tags", [])) if isinstance(c.get("tags"), list) else str(c.get("tags", "")) similar = index.find_similar(c_title, c_body, tags=c_tags, limit=3) + top_similarity = 0.0 + if similar: + top = similar[0] + top_similarity = float( + top.get("similarity") + or top.get("lexical_similarity") + or 0.0 + ) enriched.append({ "candidate": c, "similar_existing": similar, - "top_similarity": similar[0].get("score", 0) if similar else 0, + "top_similarity": top_similarity, }) return json.dumps({"count": len(enriched), "results": enriched}, default=str) @@ -674,11 +730,13 @@ def batch_dedup_candidates( memories and a top_similarity score for dedup classification. Interpreting top_similarity scores: - - 0.7+ : Very likely duplicate. Classify as "no_op" unless candidate has + - top_similarity uses normalized 0.0-1.0 similarity (prefer semantic similarity, + fall back to lexical overlap when vector data is unavailable). + - 0.75+ : Very likely duplicate. Classify as "no_op" unless candidate has clearly distinct information not present in the existing memory. - - 0.4-0.7 : Related topic. Read both carefully. Classify as "update" if + - 0.45-0.75 : Related topic. Read both carefully. Classify as "update" if candidate adds new facts, "no_op" if it's just rephrasing. - - Below 0.4 : Likely a new topic. Classify as "add". + - Below 0.45 : Likely a new topic. Classify as "add". - 0.0 : No existing memories at all (empty store). All candidates are "add". Returns JSON: {"count": int, "results": [{"candidate": {...}, diff --git a/src/lerim/runtime/prompts/oai_sync.py b/src/lerim/runtime/prompts/oai_sync.py index e902f6a..5731a3d 100644 --- a/src/lerim/runtime/prompts/oai_sync.py +++ b/src/lerim/runtime/prompts/oai_sync.py @@ -54,10 +54,12 @@ def build_oai_sync_prompt( For each candidate, classify using the batch dedup results: Classification rules (use top_similarity score from batch_dedup_candidates): - - top_similarity >= 0.7 AND the existing memory covers the same core topic → "no_op" - - top_similarity 0.4-0.7 AND same topic but candidate adds genuinely NEW information + - top_similarity is normalized 0.0-1.0 similarity. It prefers semantic similarity and + falls back to lexical overlap when vector similarity is unavailable. + - top_similarity >= 0.75 AND the existing memory covers the same core topic → "no_op" + - top_similarity 0.45-0.75 AND same topic but candidate adds genuinely NEW information not present in the existing memory → "update" - - top_similarity < 0.4 OR no relevant match at all → "add" + - top_similarity < 0.45 OR no relevant match at all → "add" - top_similarity == 0.0 (no existing memories) → always "add" IMPORTANT DEDUP RULES: @@ -73,8 +75,14 @@ def build_oai_sync_prompt( incorporating new information into the body. Skip "no_op" candidates. + Preserve the extracted candidate metadata when writing: + - source_speaker: "user" | "agent" | "both" + - durability: "permanent" | "project" | "session" + - outcome: "worked" | "failed" | "unknown" (optional) + write_memory(primitive="decision"|"learning", title=..., body=..., - confidence=0.0-1.0, tags="tag1,tag2", kind=...) + confidence=0.0-1.0, tags="tag1,tag2", kind=..., + source_speaker=..., durability=..., outcome=...) kind is REQUIRED for learnings: "insight", "procedure", "friction", "pitfall", or "preference". write_memory is the ONLY tool for creating memory files. Do NOT write .md files directly. diff --git a/tests/unit/test_memory_index.py b/tests/unit/test_memory_index.py index 0dbc6c9..95d9c50 100644 --- a/tests/unit/test_memory_index.py +++ b/tests/unit/test_memory_index.py @@ -156,6 +156,9 @@ def test_find_similar(index: MemoryIndex, memory_root: Path) -> None: ) assert len(results) >= 1 assert results[0]["memory_id"] == "deploy-k8s" + assert "fused_score" in results[0] + assert "similarity" in results[0] + assert "lexical_similarity" in results[0] # ── test_incremental_skip ──────────────────────────────────────────────── @@ -468,6 +471,7 @@ def test_vector_search_returns_metadata(index: MemoryIndex, memory_root: Path) - assert r["kind"] == "architectural" assert r["confidence"] == 0.95 assert "distance" in r + assert "similarity" in r # ── test_tag_edges_built ──────────────────────────────────────────────── diff --git a/tests/unit/test_memory_record.py b/tests/unit/test_memory_record.py index 5709960..5d0ebe0 100644 --- a/tests/unit/test_memory_record.py +++ b/tests/unit/test_memory_record.py @@ -102,6 +102,8 @@ def test_memory_record_to_frontmatter_dict(): "updated", "source", "confidence", + "source_speaker", + "durability", "tags", } assert set(fm.keys()) == expected_keys diff --git a/tests/unit/test_oai_tools.py b/tests/unit/test_oai_tools.py index 39d8ffe..e4fc9ca 100644 --- a/tests/unit/test_oai_tools.py +++ b/tests/unit/test_oai_tools.py @@ -101,6 +101,28 @@ def test_write_memory_valid_learning(tmp_path): assert "kind: insight" in content +def test_write_memory_persists_rich_metadata(tmp_path): + """write_memory should persist source_speaker, durability, and outcome.""" + ctx = _make_ctx(tmp_path) + result = _call_write_memory( + ctx, + primitive="learning", + title="Queue retries can fail safely", + body="Use bounded retries for flaky queue workers.", + confidence=0.85, + tags="queue,reliability", + kind="pitfall", + source_speaker="user", + durability="permanent", + outcome="worked", + ) + parsed = json.loads(result) + content = Path(parsed["file_path"]).read_text() + assert "source_speaker: user" in content + assert "durability: permanent" in content + assert "outcome: worked" in content + + def test_write_memory_tags_parsed(tmp_path): """Comma-separated tags string should be parsed into list.""" ctx = _make_ctx(tmp_path) @@ -176,6 +198,20 @@ def test_write_memory_learning_invalid_kind(tmp_path): assert "kind" in result +def test_write_memory_invalid_source_speaker(tmp_path): + """Unknown source_speaker should return an ERROR string.""" + ctx = _make_ctx(tmp_path) + result = _call_write_memory( + ctx, + primitive="decision", + title="Bad source speaker", + body="Should fail.", + source_speaker="system", + ) + assert result.startswith("ERROR:") + assert "source_speaker" in result + + def test_write_memory_empty_title(tmp_path): """Empty title should return an ERROR string.""" ctx = _make_ctx(tmp_path) @@ -975,6 +1011,12 @@ def test_batch_dedup_with_list(tmp_path): assert "candidate" in r assert "similar_existing" in r assert "top_similarity" in r + assert parsed["results"][0]["top_similarity"] > 0 + if parsed["results"][0]["similar_existing"]: + top = parsed["results"][0]["similar_existing"][0] + assert "fused_score" in top + assert "similarity" in top + assert "lexical_similarity" in top def test_batch_dedup_with_dict_candidates_key(tmp_path): diff --git a/tests/unit/test_regression_contracts.py b/tests/unit/test_regression_contracts.py index e0de208..4da4181 100644 --- a/tests/unit/test_regression_contracts.py +++ b/tests/unit/test_regression_contracts.py @@ -101,3 +101,5 @@ def test_memory_frontmatter_schema_keys(): assert MemoryType.learning in MEMORY_FRONTMATTER_SCHEMA assert "id" in MEMORY_FRONTMATTER_SCHEMA[MemoryType.decision] assert "kind" in MEMORY_FRONTMATTER_SCHEMA[MemoryType.learning] + assert "source_speaker" in MEMORY_FRONTMATTER_SCHEMA[MemoryType.decision] + assert "durability" in MEMORY_FRONTMATTER_SCHEMA[MemoryType.learning] From 14f91016184d31e053467bec876b0d72b14af9e8 Mon Sep 17 00:00:00 2001 From: Isaac Kargar Date: Mon, 30 Mar 2026 22:48:43 +0300 Subject: [PATCH 08/12] refactor(extract): replace Jaccard dedup with 3-signature DSPy module Replace ~130 lines of regex-based Jaccard word-matching with a DSPy module containing three optimizable signatures: - MemoryExtractSignature (existing, per-window extraction) - ConsolidateCandidatesSignature (LLM merges semantic duplicates across windows) - QualityGateSignature (LLM drops low-value candidates) Also fixes format detection to handle "type":"human" traces (was silently dropping all user messages, causing extraction to return 0 candidates). Every judgment call is now an LLM call that DSPy can optimize via autoresearch, replacing magic thresholds with model understanding. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/lerim/memory/extract_pipeline.py | 204 ++++++++++++++++++-- tests/unit/test_extract_pipeline_quality.py | 85 ++++++++ 2 files changed, 277 insertions(+), 12 deletions(-) create mode 100644 tests/unit/test_extract_pipeline_quality.py diff --git a/src/lerim/memory/extract_pipeline.py b/src/lerim/memory/extract_pipeline.py index 0d24362..be7a5ed 100644 --- a/src/lerim/memory/extract_pipeline.py +++ b/src/lerim/memory/extract_pipeline.py @@ -1,11 +1,12 @@ -"""Extraction pipeline for session transcripts using Predict + windowing. +"""Extraction pipeline for session transcripts using DSPy modules + windowing. -session file (.jsonl/.json) -> read text -> window (if needed) -> dspy.Predict --> concat candidates from all windows. +session file (.jsonl/.json) -> read text -> window (if needed) -> dspy.ChainOfThought +-> deterministic pre-filter -> LLM consolidation -> LLM quality gate. Traces are compacted by adapters (tool outputs stripped), so most traces fit in a -single window. Windowing is a fallback for unusually large sessions. No merge or -deduplication — the downstream maintain path handles that. +single window. Windowing is a fallback for unusually large sessions. Post-extraction, +an LLM consolidation pass merges semantic duplicates across windows, and a quality +gate drops low-value candidates before the sync agent sees them. When max_workers > 1, windows are processed in parallel via ThreadPoolExecutor. Each thread gets its own DSPy LM instances for thread safety. @@ -167,11 +168,189 @@ def _filter_candidates(candidates: list[dict[str, Any]]) -> list[dict[str, Any]] # Gate 5: Session-durability items dropped if durability == "session": continue + # Gate 6: Learnings must carry a valid kind + primitive = str(item.get("primitive") or "").strip() + kind = str(item.get("kind") or "").strip() + if primitive == "learning" and kind not in { + "insight", "procedure", "friction", "pitfall", "preference" + }: + continue filtered.append(item) return filtered +class ConsolidateCandidatesSignature(dspy.Signature): + """Merge near-duplicate memory candidates extracted from overlapping transcript windows. + + You receive candidates from overlapping windows of the SAME session. Adjacent + windows share ~20% text overlap, so the same insight may appear in different + wording across windows. + + For each group of semantic duplicates: + - Keep the version with the richest, most structured body (WHY + HOW TO APPLY) + - Use the highest confidence score from the group + - Union all tags from duplicates + - If source_speaker differs across duplicates, use "both" + - If durability differs, keep the more durable (permanent > project > session) + - Preserve the most specific title + + Candidates covering DIFFERENT topics must remain separate — do not merge + unrelated items. If no duplicates exist, return the input unchanged. + """ + + candidates: list[MemoryCandidate] = dspy.InputField( + desc="All memory candidates extracted from overlapping transcript windows of one session", + ) + unique_candidates: list[MemoryCandidate] = dspy.OutputField( + desc="Deduplicated set — duplicates merged, unique items unchanged. Same MemoryCandidate schema.", + ) + + +class QualityGateSignature(dspy.Signature): + """Filter memory candidates to keep only high-quality, durable items worth persisting. + + Score each candidate against these criteria: + 1. Atomic: covers ONE decision or learning, not bundled + 2. Actionable: would change how an agent behaves in a future session + 3. Context-independent: understandable without the original conversation + 4. Structured body: rule/fact → WHY → HOW TO APPLY + 5. Durable: still relevant weeks or months later + + DROP a candidate if it: + - Fails 2+ criteria above + - Contains generic knowledge any LLM already knows + - Is code-derivable (could be learned by reading the codebase or git log) + - Bundles multiple unrelated decisions/learnings + - Is ephemeral (specific line numbers, PR comments, TODO items) + + KEEP a candidate that passes the future-self test: "Would this genuinely help + the user or their coding agent in a future session on this project?" + + Do NOT rewrite or modify candidates. Return accepted candidates exactly as + received. This is a filter, not a rewriter. + """ + + candidates: list[MemoryCandidate] = dspy.InputField( + desc="Consolidated memory candidates to evaluate for quality", + ) + accepted: list[MemoryCandidate] = dspy.OutputField( + desc="High-quality candidates that pass all criteria. Subset of input, unmodified.", + ) + + +class MemoryExtractionPipeline(dspy.Module): + """Three-stage memory extraction: extract → consolidate → quality gate. + + Stage 1 (extract): Run per transcript window, produces raw candidates. + Stage 2 (consolidate): Merge semantic duplicates across overlapping windows. + Stage 3 (quality_gate): Drop low-quality candidates using LLM judgment. + + All three stages are optimizable by DSPy (MIPROv2, BootstrapFewShot, etc.). + """ + + def __init__(self): + super().__init__() + self.extract = dspy.ChainOfThought(MemoryExtractSignature) + self.consolidate = dspy.ChainOfThought(ConsolidateCandidatesSignature) + self.quality_gate = dspy.ChainOfThought(QualityGateSignature) + + def forward(self, windows: list[str], guidance: str = "") -> dspy.Prediction: + # Stage 1: Extract from each window + all_candidates: list[dict[str, Any]] = [] + for window in windows: + result = self.extract(transcript=window, guidance=guidance) + primitives = result.primitives + if isinstance(primitives, list): + for item in primitives: + if isinstance(item, MemoryCandidate): + all_candidates.append(item.model_dump(mode="json", exclude_none=True)) + elif isinstance(item, dict): + all_candidates.append(item) + + # Deterministic pre-filter (cheap, catches obvious junk) + filtered = _filter_candidates(all_candidates) + if not filtered: + return dspy.Prediction(primitives=[]) + + # Stage 2: Consolidate cross-window duplicates + if len(filtered) > 1: + result = self.consolidate(candidates=filtered) + unique = result.unique_candidates + if isinstance(unique, list) and unique: + filtered = _to_dicts(unique) + + # Stage 3: Quality gate + result = self.quality_gate(candidates=filtered) + accepted = result.accepted + if isinstance(accepted, list): + return dspy.Prediction(primitives=_to_dicts(accepted)) + + return dspy.Prediction(primitives=filtered) + + +def _to_dicts(items: list) -> list[dict[str, Any]]: + """Normalize MemoryCandidate or dict items to plain dicts.""" + result: list[dict[str, Any]] = [] + for item in items: + if isinstance(item, MemoryCandidate): + result.append(item.model_dump(mode="json", exclude_none=True)) + elif isinstance(item, dict): + result.append(item) + return result + + +def _consolidate_and_gate(candidates: list[dict[str, Any]]) -> list[dict[str, Any]]: + """Run LLM consolidation and quality gate on extracted candidates. + + Stage 2: Merge semantic duplicates across overlapping windows. + Stage 3: Drop low-quality candidates using LLM judgment. + + Falls back gracefully — if either stage fails, the previous result is kept. + """ + if not candidates: + return [] + + lms = configure_dspy_lms("extract") + pipeline = MemoryExtractionPipeline() + + # Stage 2: Consolidate cross-window duplicates + if len(candidates) > 1: + with logfire.span("consolidate_candidates", count=len(candidates)): + pre_count = len(candidates) + history_start = len(lms[0].history) + try: + _, result = call_with_fallback( + pipeline.consolidate, lms, candidates=candidates, + ) + unique = result.unique_candidates + if isinstance(unique, list) and unique: + candidates = _to_dicts(unique) + logger.info("Consolidation: {} → {} candidates", pre_count, len(candidates)) + except Exception: + logger.warning("Consolidation failed, keeping {} pre-filtered candidates", pre_count) + finally: + capture_dspy_cost(lms[0], history_start) + + # Stage 3: Quality gate + with logfire.span("quality_gate", count=len(candidates)): + history_start = len(lms[0].history) + try: + _, result = call_with_fallback( + pipeline.quality_gate, lms, candidates=candidates, + ) + accepted = result.accepted + if isinstance(accepted, list): + candidates = _to_dicts(accepted) + logger.info("Quality gate: {} accepted", len(candidates)) + except Exception: + logger.warning("Quality gate failed, keeping {} consolidated candidates", len(candidates)) + finally: + capture_dspy_cost(lms[0], history_start) + + return candidates + + def _format_transcript_for_extraction(raw: str) -> str: """Convert compacted JSONL transcript to clean conversation format for extraction. @@ -218,8 +397,8 @@ def _format_transcript_for_extraction(raw: str) -> str: def _detect_trace_format(lines: list[dict]) -> str: """Detect which agent produced this trace by inspecting line structure.""" for obj in lines[:5]: # check first 5 lines - # Claude: has "type" in ("user","assistant") and "message" key - if obj.get("type") in ("user", "assistant") and "message" in obj: + # Claude: has "type" in ("user","assistant","human") and "message" key + if obj.get("type") in ("user", "assistant", "human") and "message" in obj: return "claude" # Codex: has "type" in ("event_msg","response_item","session_meta") if obj.get("type") in ("event_msg", "response_item", "session_meta"): @@ -246,11 +425,12 @@ def _format_claude_line(obj: dict) -> str | None: role = msg.get("role", entry_type) content = msg.get("content") - if role == "user": + if role in ("user", "human"): text = _extract_content_text(content, skip_tool_results=True) if text: return f"[USER]\n{text}" - elif role == "assistant": + elif role in ("assistant", "ai"): + # "ai" is used by some LangChain-style traces text = _extract_content_text(content, skip_tool_results=False) if text: return f"[ASSISTANT]\n{text}" @@ -536,7 +716,7 @@ def _extract_candidates( When max_workers > 1 and multiple windows exist, processes windows in parallel via ThreadPoolExecutor. Otherwise falls back to sequential. - No merge or deduplication — maintain handles that downstream. + Applies deterministic filtering, LLM consolidation, and LLM quality gate. """ if not transcript.strip(): return [] @@ -576,13 +756,13 @@ def _extract_candidates( } for future in as_completed(futures): all_candidates.extend(future.result()) - return _filter_candidates(all_candidates) + return _consolidate_and_gate(_filter_candidates(all_candidates)) # Sequential: single-thread path (max_workers=1 or single window) all_candidates = [] for wi, window in enumerate(windows, 1): all_candidates.extend(_extract_one_window(wi, total, window, guid)) - return _filter_candidates(all_candidates) + return _consolidate_and_gate(_filter_candidates(all_candidates)) def extract_memories_from_session_file( diff --git a/tests/unit/test_extract_pipeline_quality.py b/tests/unit/test_extract_pipeline_quality.py new file mode 100644 index 0000000..a843f32 --- /dev/null +++ b/tests/unit/test_extract_pipeline_quality.py @@ -0,0 +1,85 @@ +"""Quality-oriented unit tests for extraction post-processing.""" + +from __future__ import annotations + +from lerim.memory.extract_pipeline import ( + ConsolidateCandidatesSignature, + MemoryExtractionPipeline, + QualityGateSignature, + _filter_candidates, + _to_dicts, +) +from lerim.memory.schemas import MemoryCandidate + + +def test_filter_candidates_drops_learning_without_kind(): + """Learning candidates without a valid kind should be dropped before sync.""" + candidates = [ + { + "primitive": "learning", + "title": "Queue retries need limits", + "body": "Queue retries need limits. WHY: unbounded retries cause noisy failures. HOW TO APPLY: cap attempts at three.", + "confidence": 0.8, + "durability": "project", + "tags": ["queue"], + }, + { + "primitive": "learning", + "kind": "pitfall", + "title": "Queue retries need limits", + "body": "Queue retries need limits. WHY: unbounded retries cause noisy failures. HOW TO APPLY: cap attempts at three.", + "confidence": 0.8, + "durability": "project", + "tags": ["queue"], + }, + ] + + filtered = _filter_candidates(candidates) + assert len(filtered) == 1 + assert filtered[0]["kind"] == "pitfall" + + +def test_to_dicts_normalizes_memory_candidates(): + """_to_dicts should convert MemoryCandidate objects to plain dicts.""" + candidate = MemoryCandidate( + primitive="decision", + title="Use SQLite for session catalog", + body="Use SQLite for the session catalog. WHY: lightweight, embedded. HOW TO APPLY: no external DB dependency.", + confidence=0.9, + tags=["database", "architecture"], + ) + result = _to_dicts([candidate, {"primitive": "learning", "title": "test"}]) + assert len(result) == 2 + assert isinstance(result[0], dict) + assert result[0]["primitive"] == "decision" + assert result[0]["title"] == "Use SQLite for session catalog" + assert isinstance(result[1], dict) + assert result[1]["title"] == "test" + + +def test_to_dicts_skips_non_dict_non_candidate(): + """_to_dicts should silently skip items that are neither dict nor MemoryCandidate.""" + result = _to_dicts(["string_item", 42, None]) + assert result == [] + + +def test_pipeline_module_has_three_predictors(): + """MemoryExtractionPipeline should have extract, consolidate, and quality_gate.""" + pipeline = MemoryExtractionPipeline() + assert hasattr(pipeline, "extract") + assert hasattr(pipeline, "consolidate") + assert hasattr(pipeline, "quality_gate") + + +def test_consolidate_signature_fields(): + """ConsolidateCandidatesSignature should have correct input/output fields.""" + fields = ConsolidateCandidatesSignature.model_fields + assert "candidates" in fields + assert "unique_candidates" in fields + + +def test_quality_gate_signature_fields(): + """QualityGateSignature should have correct input/output fields.""" + fields = QualityGateSignature.model_fields + assert "candidates" in fields + assert "accepted" in fields From 2a2f5a3d31473416ebe4ec87d5405318817e107b Mon Sep 17 00:00:00 2001 From: Isaac Kargar Date: Mon, 30 Mar 2026 22:48:52 +0300 Subject: [PATCH 09/12] perf(summarize): replace sequential fold with parallel MapReduce tree MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the sequential refine/fold pattern (73 chunks × 45s = hours) with: - Parallel map: extract lightweight facets per chunk (~80 words each) - Tree reduce: merge facets hierarchically when they exceed context budget - Single synthesis: produce final TraceSummaryCandidate from all facets Also adds transcript formatting before windowing (13MB raw → 1.1MB formatted), reducing 73 windows to 6 and total time from hours to ~33 seconds. Key signatures: ChunkFacetSignature (map), MergeFacetsSignature (reduce), SynthesizeSummarySignature (final). All DSPy-optimizable. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/lerim/memory/summarization_pipeline.py | 354 +++++++++++++++++---- 1 file changed, 300 insertions(+), 54 deletions(-) diff --git a/src/lerim/memory/summarization_pipeline.py b/src/lerim/memory/summarization_pipeline.py index fd2b0dd..b424438 100644 --- a/src/lerim/memory/summarization_pipeline.py +++ b/src/lerim/memory/summarization_pipeline.py @@ -1,12 +1,12 @@ -"""Trace summarization pipeline using Predict. +"""Trace summarization pipeline using DSPy modules + parallel MapReduce. Outputs markdown-frontmatter-ready metadata + summary. When --memory-root is provided, writes the summary markdown file directly to memory_root/summaries/YYYYMMDD/HHMMSS/{slug}.md using python-frontmatter. Traces are compacted by adapters (tool outputs stripped), so most traces fit in a -single LLM call. For rare oversized traces, a sequential refine/fold pattern -processes chunks in order and accumulates into a single summary. +single LLM call. For oversized traces, a parallel MapReduce with tree reduction +summarizes chunks concurrently, then merges them hierarchically. """ from __future__ import annotations @@ -17,6 +17,7 @@ import re import sys import time +from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime, timezone from pathlib import Path from tempfile import TemporaryDirectory @@ -36,6 +37,7 @@ window_transcript, window_transcript_jsonl, ) +from lerim.memory.extract_pipeline import _format_transcript_for_extraction from lerim.runtime.cost_tracker import capture_dspy_cost from lerim.sessions import catalog as session_db @@ -88,20 +90,284 @@ class TraceSummarySignature(dspy.Signature): ) -class RefineSummarySignature(dspy.Signature): - """Refine a session summary with the next chunk of the trace. - - You have a running summary from earlier chunks and a new chunk of the trace. - Update the summary to incorporate new information from this chunk while - preserving important details from the running summary. - """ - - running_summary: str = dspy.InputField(desc="Summary from previous chunks (JSON)") - trace_chunk: str = dspy.InputField(desc="Next chunk of the session trace") - chunk_position: str = dspy.InputField(desc="e.g. 'chunk 2 of 4'") - summary_payload: TraceSummaryCandidate = dspy.OutputField( - desc="Updated summary incorporating the new chunk" - ) +class ChunkFacetSignature(dspy.Signature): + """Extract a lightweight facet from one chunk of a session transcript. + + Produce a BRIEF bullet-point summary of what happened in this chunk. + 3-5 bullets max. Focus on: + - Decisions made (by user or agent) + - Problems encountered and how they were resolved + - Key tools or files involved + - Outcomes and results + + Be extremely concise — this will be merged with other chunk facets to + produce a full session summary. Do NOT write full paragraphs. + """ + + transcript: str = dspy.InputField( + desc="One chunk of a session transcript", + ) + chunk_position: str = dspy.InputField( + desc="e.g. 'chunk 3 of 73' — helps orient within the session", + ) + facet: str = dspy.OutputField( + desc="3-5 bullet points capturing key events, decisions, and outcomes from this chunk. Max 80 words.", + ) + tags: list[str] = dspy.OutputField( + desc="2-5 topic tags for this chunk", + ) + + +class SynthesizeSummarySignature(dspy.Signature): + """Synthesize a full session summary from ordered chunk facets. + + You receive lightweight facets (bullet points) from consecutive chunks + of the SAME coding session, in chronological order. Produce a coherent + structured summary of the ENTIRE session. + + The facets are brief — your job is to weave them into a narrative that + reads as one coherent story, not a list of per-chunk bullets. + """ + + ordered_facets: str = dspy.InputField( + desc="Chronologically ordered chunk facets, formatted as 'Chunk N: [bullets]\\n...'", + ) + summary_payload: TraceSummaryCandidate = dspy.OutputField( + desc="Full session summary with title, description, user_intent, session_narrative, and tags", + ) + + +class MergeFacetsSignature(dspy.Signature): + """Merge multiple chunk facets into a condensed intermediate facet. + + Used in tree reduction when too many facets to fit in one synthesis call. + Combine the input facets into a single, slightly longer facet that + preserves the key events and decisions from all inputs. + """ + + ordered_facets: str = dspy.InputField( + desc="Chronologically ordered chunk facets to merge", + ) + chunk_range: str = dspy.InputField( + desc="Which portion these facets cover, e.g. 'chunks 1-8 of 73'", + ) + merged_facet: str = dspy.OutputField( + desc="Condensed facet covering all input chunks. 5-10 bullet points, max 150 words.", + ) + tags: list[str] = dspy.OutputField( + desc="Combined topic tags from all merged chunks", + ) + + +class TraceSummarizationPipeline(dspy.Module): + """Two-phase summarization: parallel map (tiny facets) → reduce (synthesize). + + Map: Extract lightweight facets from each chunk (3-5 bullets, ~80 words each). + Reduce: If all facets fit in context, single synthesis call. Otherwise tree merge + facets down, then synthesize. + + All signatures are optimizable by DSPy (MIPROv2, BootstrapFewShot, etc.). + """ + + def __init__(self): + super().__init__() + self.extract_facet = dspy.Predict(ChunkFacetSignature) + self.merge_facets = dspy.Predict(MergeFacetsSignature) + self.synthesize = dspy.Predict(SynthesizeSummarySignature) + + def forward(self, windows: list[str], guidance: str = "") -> dspy.Prediction: + # Map: extract tiny facet per chunk + facets: list[dict[str, Any]] = [] + for i, window in enumerate(windows, 1): + result = self.extract_facet( + transcript=window, + chunk_position=f"chunk {i} of {len(windows)}", + ) + facets.append({ + "chunk": i, + "facet": str(result.facet), + "tags": list(result.tags) if isinstance(result.tags, list) else [], + }) + + if not facets: + raise RuntimeError("map_phase_produced_no_facets") + + # Reduce: tree merge if needed, then synthesize + formatted = _format_facets(facets) + result = self.synthesize(ordered_facets=formatted) + return dspy.Prediction(summary_payload=result.summary_payload) + + +def _format_facets(facets: list[dict[str, Any]]) -> str: + """Format facets as readable text for synthesis/merge input.""" + parts: list[str] = [] + for f in facets: + parts.append(f"Chunk {f['chunk']}:\n{f['facet']}") + return "\n\n".join(parts) + + +def _extract_one_facet( + wi: int, + total: int, + window: str, +) -> dict[str, Any] | None: + """Extract a lightweight facet from one chunk (thread-safe). + + Returns None if extraction fails (chunk is skipped in reduce phase). + """ + lms = configure_dspy_lms("summarize") + extractor = dspy.Predict(ChunkFacetSignature) + history_start = len(lms[0].history) + w_start = time.time() + try: + _, result = call_with_fallback( + extractor, lms, + transcript=window, + chunk_position=f"chunk {wi} of {total}", + ) + logger.info(" Map {}/{}: done ({:.1f}s)", wi, total, time.time() - w_start) + capture_dspy_cost(lms[0], history_start) + return { + "chunk": wi, + "facet": str(result.facet), + "tags": list(result.tags) if isinstance(result.tags, list) else [], + } + except Exception: + logger.warning(" Map {}/{}: failed ({:.1f}s), skipping", wi, total, time.time() - w_start) + capture_dspy_cost(lms[0], history_start) + return None + + +def _merge_facet_batch( + facets: list[dict[str, Any]], + batch_idx: int, + level: int, +) -> dict[str, Any] | None: + """Merge a batch of facets into one condensed facet (thread-safe).""" + lms = configure_dspy_lms("summarize") + merger = dspy.Predict(MergeFacetsSignature) + history_start = len(lms[0].history) + w_start = time.time() + chunk_range = f"chunks {facets[0]['chunk']}-{facets[-1]['chunk']}" + try: + _, result = call_with_fallback( + merger, lms, + ordered_facets=_format_facets(facets), + chunk_range=chunk_range, + ) + all_tags: list[str] = [] + for f in facets: + all_tags.extend(f.get("tags", [])) + merged_tags = list(result.tags) if isinstance(result.tags, list) else all_tags + logger.info(" Reduce L{} group {}: done ({:.1f}s)", level, batch_idx + 1, time.time() - w_start) + capture_dspy_cost(lms[0], history_start) + return { + "chunk": facets[0]["chunk"], + "facet": str(result.merged_facet), + "tags": sorted(set(merged_tags)), + } + except Exception: + logger.warning(" Reduce L{} group {}: failed ({:.1f}s)", level, batch_idx + 1, time.time() - w_start) + capture_dspy_cost(lms[0], history_start) + return None + + +def _map_and_reduce( + windows: list[str], + guidance: str, + max_workers: int, + facet_context_budget: int = 80000, + batch_size: int = 10, +) -> TraceSummaryCandidate: + """Parallel map (tiny facets) → optional tree reduce → final synthesis. + + Map phase: Extract lightweight facets from each chunk in parallel (~80 words each). + Reduce phase: If all facets fit in context budget, single synthesis call. + Otherwise, tree-merge facets down until they fit, then synthesize. + """ + total = len(windows) + + # ── Map phase: parallel facet extraction ── + logger.info("Summarization: {} windows (map-reduce), {} workers", total, max_workers) + facet_slots: list[dict[str, Any] | None] = [None] * total + + if max_workers > 1 and total > 1: + effective_workers = min(max_workers, total) + with ThreadPoolExecutor(max_workers=effective_workers) as pool: + futures = { + pool.submit(_extract_one_facet, wi, total, window): wi + for wi, window in enumerate(windows, 1) + } + for future in as_completed(futures): + wi = futures[future] + facet_slots[wi - 1] = future.result() + else: + for wi, window in enumerate(windows, 1): + facet_slots[wi - 1] = _extract_one_facet(wi, total, window) + + # Filter out failed chunks, preserving order + facets = [f for f in facet_slots if f is not None] + if not facets: + raise RuntimeError("map_phase_produced_no_facets") + logger.info("Map phase: {}/{} facets extracted", len(facets), total) + + # ── Reduce phase: tree merge if facets exceed context budget ── + formatted = _format_facets(facets) + est_tokens = estimate_tokens(formatted) + level = 0 + + while est_tokens > facet_context_budget and len(facets) > 1: + logger.info("Reduce level {}: {} facets (~{} tokens, budget {})", level, len(facets), est_tokens, facet_context_budget) + batches: list[list[dict[str, Any]]] = [] + for i in range(0, len(facets), batch_size): + batches.append(facets[i : i + batch_size]) + + next_level: list[dict[str, Any] | None] = [None] * len(batches) + multi_batches = [(idx, b) for idx, b in enumerate(batches) if len(b) > 1] + single_batches = [(idx, b[0]) for idx, b in enumerate(batches) if len(b) == 1] + + for idx, facet in single_batches: + next_level[idx] = facet + + if max_workers > 1 and len(multi_batches) > 1: + effective_workers = min(max_workers, len(multi_batches)) + with ThreadPoolExecutor(max_workers=effective_workers) as pool: + merge_futures = { + pool.submit(_merge_facet_batch, batch, batch_idx, level): orig_idx + for batch_idx, (orig_idx, batch) in enumerate(multi_batches) + } + for future in as_completed(merge_futures): + orig_idx = merge_futures[future] + next_level[orig_idx] = future.result() + else: + for batch_idx, (orig_idx, batch) in enumerate(multi_batches): + next_level[orig_idx] = _merge_facet_batch(batch, batch_idx, level) + + facets = [f for f in next_level if f is not None] + if not facets: + raise RuntimeError(f"reduce_level_{level}_produced_no_facets") + formatted = _format_facets(facets) + est_tokens = estimate_tokens(formatted) + level += 1 + + # ── Synthesis: produce final TraceSummaryCandidate from facets ── + logger.info("Synthesis: {} facets (~{} tokens)", len(facets), est_tokens) + lms = configure_dspy_lms("summarize") + synthesizer = dspy.Predict(SynthesizeSummarySignature) + history_start = len(lms[0].history) + w_start = time.time() + _, result = call_with_fallback( + synthesizer, lms, ordered_facets=formatted, + ) + logger.info("Synthesis: done ({:.1f}s)", time.time() - w_start) + capture_dspy_cost(lms[0], history_start) + + payload = result.summary_payload + if isinstance(payload, TraceSummaryCandidate): + return payload + if isinstance(payload, dict): + return TraceSummaryCandidate.model_validate(payload) + raise RuntimeError("synthesis_produced_invalid_payload") def _summarize_trace( @@ -118,6 +384,13 @@ def _summarize_trace( """ if not transcript.strip(): raise RuntimeError("session_trace_empty") + # Pre-process: convert agent JSONL to clean conversation format. + # Strips tool outputs, metadata noise, adds [USER]/[ASSISTANT] labels. + # Typically reduces trace size by 10-12x (e.g. 13MB → 1.1MB). + if "\n{" in transcript: + formatted = _format_transcript_for_extraction(transcript) + if formatted.strip() and formatted != transcript: + transcript = formatted config = get_config() max_window_tokens = config.summarize_role.max_window_tokens overlap_tokens = config.summarize_role.window_overlap_tokens @@ -139,51 +412,24 @@ def _summarize_trace( ) logger.info("Summarization: done ({:.1f}s)", time.time() - w_start) else: - # Slow path: refine/fold — process chunks sequentially + # Parallel path: MapReduce with tree reduction if "\n{" in transcript: windows = window_transcript_jsonl( transcript, max_window_tokens, overlap_tokens ) else: windows = window_transcript(transcript, max_window_tokens, overlap_tokens) - logger.info( - "Summarization: {} windows (refine/fold), {} est. tokens", - len(windows), - trace_tokens, - ) - # First chunk: full summarization + max_workers = config.summarize_role.max_workers w_start = time.time() - summarizer = dspy.Predict(TraceSummarySignature) - _, result = call_with_fallback( - summarizer, lms, transcript=windows[0], guidance=guid - ) - logger.info(" Chunk 1/{}: done ({:.1f}s)", len(windows), time.time() - w_start) - - # Subsequent chunks: refine with running summary - refiner = dspy.Predict(RefineSummarySignature) - for i, window in enumerate(windows[1:], 2): - running = getattr(result, "summary_payload", None) - if isinstance(running, TraceSummaryCandidate): - running_json = running.model_dump_json() - elif isinstance(running, dict): - running_json = json.dumps(running) - else: - running_json = str(running) - w_start = time.time() - _, result = call_with_fallback( - refiner, - lms, - running_summary=running_json, - trace_chunk=window, - chunk_position=f"chunk {i} of {len(windows)}", - ) - logger.info( - " Chunk {}/{}: done ({:.1f}s)", - i, - len(windows), - time.time() - w_start, - ) + candidate = _map_and_reduce(windows, guid, max_workers) + logger.info("Summarization: done ({:.1f}s, {} windows)", time.time() - w_start, len(windows)) + + # _map_and_reduce returns TraceSummaryCandidate directly — wrap for + # the unified payload extraction below. + class _Result: + summary_payload = candidate + result = _Result() capture_dspy_cost(lms[0], history_start) From 98e7d7a6ecd084af70dcd50d2d75d30abe45210e Mon Sep 17 00:00:00 2001 From: Isaac Kargar Date: Mon, 30 Mar 2026 22:49:04 +0300 Subject: [PATCH 10/12] feat(activity): ship memory actions with full metadata to cloud Add memory_actions to OperationResult and details_json so the activity feed can show per-session memory lists with titles, body, tags, confidence, source_speaker, and durability. Each memory action includes session_run_id for per-session grouping. The daemon reads frontmatter from written memory files to extract full metadata. Fixes the "0 memories" bug in the activity feed. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/lerim/app/daemon.py | 41 ++++++++++++++++++++++++++++--- src/lerim/app/operation_result.py | 1 + 2 files changed, 39 insertions(+), 3 deletions(-) diff --git a/src/lerim/app/daemon.py b/src/lerim/app/daemon.py index 863b08c..127cd03 100644 --- a/src/lerim/app/daemon.py +++ b/src/lerim/app/daemon.py @@ -340,31 +340,61 @@ def _process_one_job(job: dict[str, Any]) -> dict[str, Any]: ) return {"status": "failed"} counts = result.get("counts") or {} + # Extract memory actions with full metadata for activity feed + memory_actions: list[dict] = [] + for path in result.get("written_memory_paths") or []: + p = Path(path) + fname = p.stem if path else "" + title = fname[9:].replace("-", " ") if len(fname) > 9 and fname[8] == "-" else fname.replace("-", " ") + primitive = "decision" if "/decisions/" in str(path) else "learning" + ma: dict = { + "action": "add", + "title": title, + "primitive": primitive, + "session_run_id": rid, + } + # Read frontmatter for full metadata + if p.exists(): + try: + import frontmatter + post = frontmatter.load(str(p)) + ma["title"] = post.metadata.get("title", title) + ma["body"] = post.content.strip() + ma["confidence"] = float(post.metadata.get("confidence", 0)) + ma["tags"] = post.metadata.get("tags", []) + ma["source_speaker"] = post.metadata.get("source_speaker", "") + ma["durability"] = post.metadata.get("durability", "") + ma["kind"] = post.metadata.get("kind", "") + except Exception: + pass + memory_actions.append(ma) complete_session_job(rid) return { "status": "extracted", "learnings_new": int(counts.get("add") or 0), "learnings_updated": int(counts.get("update") or 0), + "memory_actions": memory_actions, "cost_usd": float(result.get("cost_usd") or 0), } def _process_claimed_jobs( claimed: list[dict[str, Any]], -) -> tuple[int, int, int, int, int, float]: +) -> tuple[int, int, int, int, int, list[dict[str, str]], float]: """Process claimed jobs sequentially in chronological order. Jobs are already sorted oldest-first by ``claim_session_jobs``. Sequential processing ensures that later sessions can correctly update or supersede memories created by earlier ones. - Returns (extracted, failed, skipped, new, updated, cost_usd). + Returns (extracted, failed, skipped, new, updated, memory_actions, cost_usd). """ extracted = 0 failed = 0 skipped = 0 learnings_new = 0 learnings_updated = 0 + memory_actions: list[dict[str, str]] = [] cost_usd = 0.0 for job in claimed: result = _process_one_job(job) @@ -372,12 +402,13 @@ def _process_claimed_jobs( extracted += 1 learnings_new += result.get("learnings_new", 0) learnings_updated += result.get("learnings_updated", 0) + memory_actions.extend(result.get("memory_actions", [])) cost_usd += result.get("cost_usd", 0.0) elif result["status"] == "failed": failed += 1 elif result["status"] == "skipped": skipped += 1 - return extracted, failed, skipped, learnings_new, learnings_updated, cost_usd + return extracted, failed, skipped, learnings_new, learnings_updated, memory_actions, cost_usd def run_sync_once( @@ -495,6 +526,7 @@ def run_sync_once( failed = 0 learnings_new = 0 learnings_updated = 0 + all_memory_actions: list[dict[str, str]] = [] cost_usd = 0.0 projects: set[str] = set() claim_limit = max(max_sessions, 1) @@ -526,6 +558,7 @@ def run_sync_once( batch_skipped, batch_new, batch_updated, + batch_actions, batch_cost, ) = _process_claimed_jobs(claimed) extracted += batch_extracted @@ -533,6 +566,7 @@ def run_sync_once( skipped += batch_skipped learnings_new += batch_new learnings_updated += batch_updated + all_memory_actions.extend(batch_actions) cost_usd += batch_cost total_processed += len(claimed) @@ -568,6 +602,7 @@ def run_sync_once( failed_sessions=failed, learnings_new=learnings_new, learnings_updated=learnings_updated, + memory_actions=all_memory_actions, run_ids=target_run_ids, window_start=window_start.isoformat() if window_start else None, window_end=window_end.isoformat() if window_end else None, diff --git a/src/lerim/app/operation_result.py b/src/lerim/app/operation_result.py index d6ad979..d15e43c 100644 --- a/src/lerim/app/operation_result.py +++ b/src/lerim/app/operation_result.py @@ -26,6 +26,7 @@ class OperationResult: failed_sessions: int = 0 learnings_new: int = 0 learnings_updated: int = 0 + memory_actions: list[dict[str, str]] = field(default_factory=list) run_ids: list[str] = field(default_factory=list) window_start: str | None = None window_end: str | None = None From 005644297f01d40941d2283eb58b32f9eb6808b3 Mon Sep 17 00:00:00 2001 From: Isaac Kargar Date: Tue, 31 Mar 2026 07:29:49 +0300 Subject: [PATCH 11/12] refactor(extract): enhance MemoryExtractSignature and similarity handling - Updated MemoryExtractSignature to clarify extraction criteria, emphasizing the importance of actionable insights and structured body content. - Improved similarity handling in MemoryIndex and OAI tools by merging similarity signals and normalizing outputs for better candidate ranking. - Adjusted examples in the documentation to reflect new extraction rules and quality criteria. --- src/lerim/memory/extract_pipeline.py | 133 ++++++++++++++++++--------- src/lerim/memory/memory_index.py | 17 ++-- src/lerim/runtime/oai_tools.py | 7 +- 3 files changed, 104 insertions(+), 53 deletions(-) diff --git a/src/lerim/memory/extract_pipeline.py b/src/lerim/memory/extract_pipeline.py index be7a5ed..b85a832 100644 --- a/src/lerim/memory/extract_pipeline.py +++ b/src/lerim/memory/extract_pipeline.py @@ -45,48 +45,68 @@ class MemoryExtractSignature(dspy.Signature): """Extract reusable memory candidates from this coding-agent session transcript. + HARD GATE — ask for EVERY candidate before including it: + "If an agent read this memory at the start of a future session on this project, + would it CONCRETELY change a decision the agent makes?" + If the answer is "no" or "probably not" — do NOT extract it. + THE FUTURE-SELF TEST: Only extract items that would help the user or their coding agent in a FUTURE session. If the information is obtainable by reading the codebase, git log, or documentation, do NOT extract it. DO NOT EXTRACT: - - Facts the assistant learned by READING code, configs, or docs (these are code-derivable). - Example: "The default port is 8765" or "The pipeline uses DSPy Predict" — just reporting what exists. - - Generic industry knowledge or benchmarks the user did NOT specifically request - - Ephemeral task details (specific slide edits, line-number fixes, PR comments, TODO items) - - Items where the body merely restates the title - - Version-specific changelogs or release notes (git log has these) - - Raw web search results that were not synthesized into a conclusion or decision + - Facts the assistant learned by READING code, configs, or docs (code-derivable). + - Implementation details that will be visible in the codebase once committed + (config values, timeout numbers, CLI commands, tool settings, hook args). + - Generic programming knowledge any experienced developer already knows + (debounce inputs, use pre-push hooks, cache API responses). + - Ephemeral task details (line-number fixes, PR comments, TODO items). + - Items where the body merely restates the title without adding WHY or HOW. + - Version-specific changelogs or release notes (git log has these). + - Raw web search results that were not synthesized into a conclusion. + - Observations that are self-evident or tautological ("keep high-value items", + "archive low-value items", "merge duplicates into one"). + - Architecture or workflow descriptions ("the pipeline has 6 steps", + "extraction runs via DSPy") — these describe WHAT exists, not WHY. + - Specific numbers that will change soon (eval scores, trace counts, + timeout values, weight coefficients) unless the RATIONALE is the point. EXTRACT (high-value items only): - Decisions: choices about how to build, structure, or design things — by the user OR by the agent during implementation. If the agent chose an approach and the user didn't object, that - is a team decision worth remembering. Includes strategic, product, and business decisions, - not just code decisions. + is a team decision worth remembering. Includes strategic, product, and business decisions. - Preferences: coding style, tool preferences, workflow habits (usually user-originated) - Hard-won insights: non-obvious lessons learned through debugging or painful experience - Friction: recurring blockers, time-wasters, tool failures worth remembering - Pitfalls: mistakes to avoid that are NOT obvious from reading the code - Procedures: multi-step workarounds that would otherwise be forgotten - - Research conclusions: when the user explicitly requested research on a topic and the session - produced synthesized findings that inform project direction. Extract the conclusion, not the - raw data. + - Research conclusions: when the user explicitly requested research and the session + produced synthesized findings that inform project direction. EXAMPLES — extract vs skip: - ✓ Agent: "I'll use SQLite for the session catalog" → decision (agent CHOSE an approach) - ✓ User: "use tabs for indentation" → preference (user STATED) - ✓ Agent: "vllm-mlx crashes with concurrent Metal requests" → pitfall (DISCOVERED through debugging) - ✓ User asked to research pitch deck structure → agent synthesized: "Best decks follow problem-solution-traction-ask arc" → insight (RESEARCHED and CONCLUDED for this project) - ✓ User: "don't mock the database in tests" → feedback. Body: "WHY: mocked tests passed but prod migration failed. HOW TO APPLY: integration tests must hit real database." - ✗ Agent: "The config file has sync_interval = 10" → just REPORTED a config value - ✗ Agent: "The extraction pipeline uses DSPy Predict" → just DESCRIBED existing code - ✗ Agent: "B2B SaaS typically converts at 5-7%" → UNSOLICITED generic statistic, not tied to a user question + ✓ "Use SQLite for the session catalog" → decision (agent CHOSE an approach, WHY matters) + ✓ "use tabs for indentation" → preference (user STATED) + ✓ "vllm-mlx crashes with concurrent Metal requests" → pitfall (DISCOVERED through debugging) + ✓ "Restrictive extraction rules always backfire — 5 experiments ALL regressed" → insight (QUANTIFIED and NON-OBVIOUS) + ✓ "don't mock the database in tests" → preference. WHY: mocked tests passed but prod migration failed. + ✗ "The config file has sync_interval = 10" → REPORTED a config value (code-derivable) + ✗ "The extraction pipeline uses DSPy Predict" → DESCRIBED existing code (code-derivable) + ✗ "B2B SaaS typically converts at 5-7%" → UNSOLICITED generic statistic + ✗ "Debounce search input by 300ms" → IMPLEMENTATION DETAIL readable from code + ✗ "Pre-commit hook needs --hook-type pre-push" → TOOL CONFIG, belongs in docs + ✗ "Use timeout 2400 for eval with 327 traces" → EPHEMERAL NUMBERS that will change + ✗ "Eval formula v1 was wrong" → DEAD CODE, v1 no longer exists + ✗ "Accept empty cross-session analysis" → OBVIOUS, wouldn't change any behavior + ✗ "Merge duplicate topics into comprehensive target" → GENERIC ADVICE, title says it all + ✗ "Sync workflow processes sessions in 6 steps" → ARCHITECTURE DESCRIPTION, code has this QUALITY BAR for each candidate: + - Actionable: MUST change how an agent behaves in a future session. This is non-negotiable. - Atomic: ONE decision or learning per candidate. Don't bundle multiple items. - - Actionable: must change how an agent behaves in a future session. - Context-independent: understandable without the original conversation. - - Structured body: lead with the rule/fact, then WHY, then HOW TO APPLY. + - Structured body: the body must add information NOT present in the title. + Lead with the rule/fact, then WHY, then HOW TO APPLY. + Target: 2-4 sentences. The reader is an expert programmer — focus on the non-obvious WHY. - Durable: still relevant weeks or months later, not tied to a specific moment. Kind (for learnings only): @@ -102,10 +122,21 @@ class MemoryExtractSignature(dspy.Signature): - Those subtype names belong in the kind field only when primitive is "learning". Confidence calibration: - - 0.9+ = explicitly stated or decided (by user or agent) - - 0.7-0.8 = strongly implied by behavior or accepted without objection - - 0.5-0.6 = inferred, uncertain - - Below 0.5 = do not extract + - 0.9+ = the user or agent EXPLICITLY stated this verbatim. + Only for direct quotes like "I decided X" or "always use Y". + - 0.75-0.85 = strongly implied by consistent behavior across multiple turns, + or agent chose and user accepted without objection. + - 0.55-0.70 = inferred from a single turn. Reasonable interpretation but unconfirmed. + - 0.3-0.5 = weak signal. Only extract if the topic is highly unusual or novel. + - Below 0.3 = do not extract. + SELF-CHECK: If you assigned 0.8+ to more than half your candidates, re-examine + each and ask: "Did the user literally say this, or am I interpreting?" + + Durability calibration: + - permanent: user preferences, identity, and convictions (about the person) + - project: architecture decisions with rationale NOT in code (the WHY behind a choice) + - session: specific numbers, eval results, current scores, version-specific bugs, + tool config values, CLI commands (things about THIS moment) Prefer precision over recall. Fewer high-quality items beat many weak ones. If a session has no durable memories, return an empty list: []. @@ -153,8 +184,8 @@ def _filter_candidates(candidates: list[dict[str, Any]]) -> list[dict[str, Any]] confidence = item.get("confidence") durability = str(item.get("durability") or "project") - # Gate 1: Drop low-confidence (< 0.5) - if isinstance(confidence, (int, float)) and confidence < 0.5: + # Gate 1: Drop low-confidence (< 0.3) + if isinstance(confidence, (int, float)) and confidence < 0.3: continue # Gate 2: Title too short (< 10 chars) if len(title) < 10: @@ -176,6 +207,14 @@ def _filter_candidates(candidates: list[dict[str, Any]]) -> list[dict[str, Any]] }: continue + # Normalize tags: lowercase, hyphenated, deduplicated. + tags = item.get("tags", []) + if isinstance(tags, list): + item["tags"] = sorted(set( + t.strip().lower().replace(" ", "-") + for t in tags if t and t.strip() + )) + filtered.append(item) return filtered @@ -210,22 +249,30 @@ class ConsolidateCandidatesSignature(dspy.Signature): class QualityGateSignature(dspy.Signature): """Filter memory candidates to keep only high-quality, durable items worth persisting. - Score each candidate against these criteria: + HARD GATES (fail ANY one → DROP immediately): + - NOT actionable: would not concretely change how an agent behaves in a future session. + Ask: "What would an agent do DIFFERENTLY after reading this?" If no clear answer → DROP. + - Code-derivable: the information exists in the codebase, git log, docs, or config files. + - Generic knowledge: any experienced programmer already knows this. + - Self-evident / tautological: the observation is obvious and the body just restates the title. + + SOFT CRITERIA (fail 2+ → DROP): 1. Atomic: covers ONE decision or learning, not bundled - 2. Actionable: would change how an agent behaves in a future session - 3. Context-independent: understandable without the original conversation - 4. Structured body: rule/fact → WHY → HOW TO APPLY - 5. Durable: still relevant weeks or months later - - DROP a candidate if it: - - Fails 2+ criteria above - - Contains generic knowledge any LLM already knows - - Is code-derivable (could be learned by reading the codebase or git log) - - Bundles multiple unrelated decisions/learnings - - Is ephemeral (specific line numbers, PR comments, TODO items) - - KEEP a candidate that passes the future-self test: "Would this genuinely help - the user or their coding agent in a future session on this project?" + 2. Context-independent: understandable without the original conversation + 3. Structured body: adds WHY and HOW beyond what the title says + 4. Durable: still relevant weeks or months later, not tied to specific numbers or versions + 5. Information-dense: body adds substance, not just rephrasing the title + + DROP examples: + - "Debounce search input by 300ms" → code-derivable config value + - "Accept empty cross-session analysis" → self-evident, changes nothing + - "Sync workflow has 6 steps" → architecture description, read the code + - "Use timeout 2400 for 327 traces" → ephemeral numbers, will change + - "Merge duplicates into comprehensive target" → generic advice, obvious + + KEEP examples: + - "Restrictive extraction rules always backfire" → non-obvious, quantified, changes approach + - "Replace app-level sandboxing with Docker kernel isolation" → architecture WHY not in code Do NOT rewrite or modify candidates. Return accepted candidates exactly as received. This is a filter, not a rewriter. diff --git a/src/lerim/memory/memory_index.py b/src/lerim/memory/memory_index.py index 6529d12..3c73591 100644 --- a/src/lerim/memory/memory_index.py +++ b/src/lerim/memory/memory_index.py @@ -627,7 +627,12 @@ def find_similar( row = dict(r) existing_text = f"{row.get('title', '')}\n{row.get('tags', '')}\n{row.get('body', '')}" row["lexical_similarity"] = _token_overlap_similarity(candidate_text, existing_text) - data.setdefault(mid, row) + if mid in data: + # Merge vector cosine similarity into the existing FTS row + # so dual-hit results preserve both signals. + data[mid]["similarity"] = row.get("similarity") + else: + data[mid] = row # Sort by fused score descending, return top-limit. ranked = sorted(rrf_scores, key=lambda mid: rrf_scores[mid], reverse=True)[:limit] @@ -635,11 +640,11 @@ def find_similar( for mid in ranked: row = dict(data[mid]) row["fused_score"] = round(rrf_scores[mid], 6) - row["similarity"] = _normalize_similarity( - row.get("similarity") - or row.get("lexical_similarity") - or 0.0 - ) + # Use the strongest similarity signal available. + row["similarity"] = _normalize_similarity(max( + float(row.get("similarity") or 0.0), + float(row.get("lexical_similarity") or 0.0), + )) results.append(row) return results diff --git a/src/lerim/runtime/oai_tools.py b/src/lerim/runtime/oai_tools.py index 6508911..7e77004 100644 --- a/src/lerim/runtime/oai_tools.py +++ b/src/lerim/runtime/oai_tools.py @@ -704,10 +704,9 @@ def _batch_dedup_candidates_impl( top_similarity = 0.0 if similar: top = similar[0] - top_similarity = float( - top.get("similarity") - or top.get("lexical_similarity") - or 0.0 + top_similarity = max( + float(top.get("similarity") or 0.0), + float(top.get("lexical_similarity") or 0.0), ) enriched.append({ "candidate": c, From 632aca1c679df4e905fcd19814584e4518ac4f6f Mon Sep 17 00:00:00 2001 From: Isaac Kargar Date: Tue, 31 Mar 2026 09:10:04 +0300 Subject: [PATCH 12/12] refactor(extract): tighten quality gates, dedup thresholds, and topic saturation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add bug-report, directive/TODO, and generic-knowledge exclusion rules to MemoryExtractSignature - Add decision-vs-learning test and cap 0.9+ confidence to max 1 per session - Require HOW TO APPLY to describe a different action than title (no restating) - Lower dedup similarity thresholds (0.75→0.65 for no_op, 0.45→0.40 for update) - Add topic saturation rule: 2+ existing memories on same topic defaults to no_op - Tighten "update" classification to require at least one concrete absent fact Co-Authored-By: Claude Opus 4.6 (1M context) --- src/lerim/memory/extract_pipeline.py | 65 +++++++++++++++++++++------ src/lerim/runtime/oai_tools.py | 8 ++-- src/lerim/runtime/prompts/oai_sync.py | 16 ++++--- 3 files changed, 66 insertions(+), 23 deletions(-) diff --git a/src/lerim/memory/extract_pipeline.py b/src/lerim/memory/extract_pipeline.py index b85a832..4b8bbc5 100644 --- a/src/lerim/memory/extract_pipeline.py +++ b/src/lerim/memory/extract_pipeline.py @@ -58,18 +58,25 @@ class MemoryExtractSignature(dspy.Signature): - Facts the assistant learned by READING code, configs, or docs (code-derivable). - Implementation details that will be visible in the codebase once committed (config values, timeout numbers, CLI commands, tool settings, hook args). - - Generic programming knowledge any experienced developer already knows - (debounce inputs, use pre-push hooks, cache API responses). - - Ephemeral task details (line-number fixes, PR comments, TODO items). + - Generic programming practices any senior developer with 5+ years already knows: + debouncing, caching, pagination, client-side vs server-side filtering, URL state + management, REST conventions, "keep it simple". If a senior dev wouldn't need + to be told this → skip. + - Bug reports: specific defects, crashes, or error conditions that will be fixed in code. + A bug is NOT friction. Friction persists structurally; a bug gets a fix. + Test: "After this bug is fixed, is this memory still useful?" If no → skip. + - Directives, tasks, or TODO items: "run X", "do Y next", "we should Z later". + A decision records WHY something was chosen; a directive records WHAT to do next. + Test: "Does this explain a choice with rationale, or just say what to do?" If latter → skip. - Items where the body merely restates the title without adding WHY or HOW. - - Version-specific changelogs or release notes (git log has these). - - Raw web search results that were not synthesized into a conclusion. - Observations that are self-evident or tautological ("keep high-value items", "archive low-value items", "merge duplicates into one"). - Architecture or workflow descriptions ("the pipeline has 6 steps", "extraction runs via DSPy") — these describe WHAT exists, not WHY. - Specific numbers that will change soon (eval scores, trace counts, timeout values, weight coefficients) unless the RATIONALE is the point. + - Version-specific changelogs or release notes (git log has these). + - Raw web search results that were not synthesized into a conclusion. EXTRACT (high-value items only): - Decisions: choices about how to build, structure, or design things — by the user OR by the @@ -99,16 +106,31 @@ class MemoryExtractSignature(dspy.Signature): ✗ "Accept empty cross-session analysis" → OBVIOUS, wouldn't change any behavior ✗ "Merge duplicate topics into comprehensive target" → GENERIC ADVICE, title says it all ✗ "Sync workflow processes sessions in 6 steps" → ARCHITECTURE DESCRIPTION, code has this + ✗ "Runner times out at 900 seconds" → BUG REPORT, will be fixed in code + ✗ "Scorer returns 0.0 for valid inputs" → BUG REPORT, one-time defect + ✗ "Run 10 hours of optimization rounds" → DIRECTIVE/TODO, not a decision or learning + ✗ "Use client-side filtering for small datasets" → GENERIC, any senior dev knows this + ✗ "Store filter state in URL search params" → GENERIC, standard web pattern + ✗ "Use simple solutions by default" → GENERIC, truism with no project specificity + ✓ "LLMs over-constrain with restrictive rules — 5 experiments all regressed" → FRICTION, structural and recurring QUALITY BAR for each candidate: - Actionable: MUST change how an agent behaves in a future session. This is non-negotiable. - Atomic: ONE decision or learning per candidate. Don't bundle multiple items. - Context-independent: understandable without the original conversation. - Structured body: the body must add information NOT present in the title. - Lead with the rule/fact, then WHY, then HOW TO APPLY. - Target: 2-4 sentences. The reader is an expert programmer — focus on the non-obvious WHY. + Lead with the rule/fact, then WHY (what would go wrong otherwise?), + then HOW TO APPLY (a concrete action for future sessions). + HOW TO APPLY must describe a DIFFERENT action than the title statement. + If it would just restate the title, omit HOW TO APPLY entirely. + Target: 2-4 sentences. Focus on the non-obvious WHY. - Durable: still relevant weeks or months later, not tied to a specific moment. + DECISION vs LEARNING test: + - Decision: "We chose X because Y." Someone MADE A CHOICE between alternatives. + - Learning: "We discovered/observed X." Someone LEARNED SOMETHING. + Test: "Was there a moment of choosing?" → decision. "A moment of discovering?" → learning. + Kind (for learnings only): - insight: a reusable observation or pattern - procedure: a step-by-step fix or workflow @@ -122,15 +144,18 @@ class MemoryExtractSignature(dspy.Signature): - Those subtype names belong in the kind field only when primitive is "learning". Confidence calibration: - - 0.9+ = the user or agent EXPLICITLY stated this verbatim. - Only for direct quotes like "I decided X" or "always use Y". - - 0.75-0.85 = strongly implied by consistent behavior across multiple turns, + - 0.9+ = the user or agent EXPLICITLY stated this verbatim using clear language + like "I decided", "always use", "never do". Direct quotes only. + MAXIMUM 1 candidate per session at 0.9+. If you have more, demote all but the strongest. + - 0.75-0.85 = strongly implied by consistent behavior across MULTIPLE turns, or agent chose and user accepted without objection. - 0.55-0.70 = inferred from a single turn. Reasonable interpretation but unconfirmed. - - 0.3-0.5 = weak signal. Only extract if the topic is highly unusual or novel. + - 0.3-0.5 = weak signal. Only extract if highly unusual or novel. - Below 0.3 = do not extract. - SELF-CHECK: If you assigned 0.8+ to more than half your candidates, re-examine - each and ask: "Did the user literally say this, or am I interpreting?" + SELF-CHECK: After generating candidates, re-read each and ask: + 1. "Would a senior dev with 5+ years need to be told this?" If no → remove it entirely. + 2. "Is this specific to THIS project or general engineering?" If general → remove. + 3. If more than 50% of candidates are 0.8+ confidence, demote each by 0.10. Durability calibration: - permanent: user preferences, identity, and convictions (about the person) @@ -253,8 +278,14 @@ class QualityGateSignature(dspy.Signature): - NOT actionable: would not concretely change how an agent behaves in a future session. Ask: "What would an agent do DIFFERENTLY after reading this?" If no clear answer → DROP. - Code-derivable: the information exists in the codebase, git log, docs, or config files. - - Generic knowledge: any experienced programmer already knows this. + - Generic knowledge: any senior developer with 5+ years already knows this. + MUST drop: general UX patterns (debouncing, pagination, caching), basic architecture + choices (client vs server filtering), standard web patterns (URL state), truisms + ("keep it simple", "prefer simple solutions"). - Self-evident / tautological: the observation is obvious and the body just restates the title. + - Bug report: describes a specific defect, crash, or error that will be fixed. + Test: "After someone fixes this, is the memory still useful?" If no → DROP. + - Directive / TODO: tells the agent what to do next rather than capturing a decision or learning. SOFT CRITERIA (fail 2+ → DROP): 1. Atomic: covers ONE decision or learning, not bundled @@ -269,10 +300,16 @@ class QualityGateSignature(dspy.Signature): - "Sync workflow has 6 steps" → architecture description, read the code - "Use timeout 2400 for 327 traces" → ephemeral numbers, will change - "Merge duplicates into comprehensive target" → generic advice, obvious + - "Runner times out at 900s" → bug report, will be fixed + - "Scorer returns 0.0" → bug report, one-time defect + - "Use client-side filtering for small datasets" → generic, any dev knows this + - "Use simple solutions by default" → generic truism + - "Run 10 hours of optimization" → directive/TODO, not a decision KEEP examples: - "Restrictive extraction rules always backfire" → non-obvious, quantified, changes approach - "Replace app-level sandboxing with Docker kernel isolation" → architecture WHY not in code + - "LLMs over-constrain with restrictive rules — 5 experiments regressed" → friction, structural Do NOT rewrite or modify candidates. Return accepted candidates exactly as received. This is a filter, not a rewriter. diff --git a/src/lerim/runtime/oai_tools.py b/src/lerim/runtime/oai_tools.py index 7e77004..c2b22f2 100644 --- a/src/lerim/runtime/oai_tools.py +++ b/src/lerim/runtime/oai_tools.py @@ -731,11 +731,11 @@ def batch_dedup_candidates( Interpreting top_similarity scores: - top_similarity uses normalized 0.0-1.0 similarity (prefer semantic similarity, fall back to lexical overlap when vector data is unavailable). - - 0.75+ : Very likely duplicate. Classify as "no_op" unless candidate has + - 0.65+ : Very likely duplicate. Classify as "no_op" unless candidate has clearly distinct information not present in the existing memory. - - 0.45-0.75 : Related topic. Read both carefully. Classify as "update" if - candidate adds new facts, "no_op" if it's just rephrasing. - - Below 0.45 : Likely a new topic. Classify as "add". + - 0.40-0.65 : Related topic. Read both carefully. Classify as "update" if + candidate adds genuinely new facts, "no_op" if it's just rephrasing. + - Below 0.40 : Likely a new topic. Classify as "add". - 0.0 : No existing memories at all (empty store). All candidates are "add". Returns JSON: {"count": int, "results": [{"candidate": {...}, diff --git a/src/lerim/runtime/prompts/oai_sync.py b/src/lerim/runtime/prompts/oai_sync.py index 5731a3d..254a351 100644 --- a/src/lerim/runtime/prompts/oai_sync.py +++ b/src/lerim/runtime/prompts/oai_sync.py @@ -56,10 +56,10 @@ def build_oai_sync_prompt( Classification rules (use top_similarity score from batch_dedup_candidates): - top_similarity is normalized 0.0-1.0 similarity. It prefers semantic similarity and falls back to lexical overlap when vector similarity is unavailable. - - top_similarity >= 0.75 AND the existing memory covers the same core topic → "no_op" - - top_similarity 0.45-0.75 AND same topic but candidate adds genuinely NEW information + - top_similarity >= 0.65 AND the existing memory covers the same core topic → "no_op" + - top_similarity 0.40-0.65 AND same topic but candidate adds genuinely NEW information not present in the existing memory → "update" - - top_similarity < 0.45 OR no relevant match at all → "add" + - top_similarity < 0.40 OR no relevant match at all → "add" - top_similarity == 0.0 (no existing memories) → always "add" IMPORTANT DEDUP RULES: @@ -67,8 +67,14 @@ def build_oai_sync_prompt( - Before classifying as "add", you MUST name the closest existing memory and explain specifically what new information the candidate contributes that the existing memory does NOT already contain. - - Before classifying as "update", verify the candidate contains concrete details - that are absent from the existing memory (not just rephrasing). + - Before classifying as "update", verify the candidate contains at least ONE concrete + fact (a specific tool name, error message, workaround, or rationale) that is + completely ABSENT from the existing memory. Rephrasing the same insight from a + different angle is NOT new information — classify as "no_op" instead. + - TOPIC SATURATION: If batch_dedup shows 2+ existing memories with similarity > 0.40 + on the same topic, the topic is already well-covered. Default to "no_op" unless + the candidate contains information that CONTRADICTS or SIGNIFICANTLY extends ALL + existing memories on that topic. For "add": call write_memory() with all fields. For "update": call write_memory() with the SAME title as the existing memory,