Skip to content

Commit e880dfc

Browse files
RafaelPo and github-actions[bot]
authored and committed
fix: deduplicate batched agent progress summaries (#5049)
This is only needed temporarily until we have Callum's researcher-first view. ## Summary - Batched agents handle K rows per trace, causing the Engine to return K identical summaries with different `row_index` values - Added `dedupe_summaries()` in the MCP layer that collapses these into one entry per unique text with a `row_indices` list - Applied in both `_fetch_summaries` (tools.py) and `_fetch_summaries_rest` (routes.py), covering all consumers: `futuresearch_progress`, `futuresearch_poll`, and the widget HTTP polling endpoint ## Test plan - [x] `uv run pytest tests/ -x` — 351 passed - [ ] Run a rank task with 5+ rows, verify progress summaries show `[Rows 0, 1, 2]` grouping instead of 3 identical lines 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com> Sourced from commit f27dd37c9dc9084b0ab51e336de7486b72d7223f
1 parent dd95c46 commit e880dfc

4 files changed

Lines changed: 67 additions & 24 deletions

File tree

futuresearch-mcp/src/futuresearch_mcp/routes.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,12 @@
1818
from futuresearch_mcp import redis_store
1919
from futuresearch_mcp.config import settings
2020
from futuresearch_mcp.result_store import _sanitize_records
21-
from futuresearch_mcp.tool_helpers import _UI_EXCLUDE, TaskState, _fetch_task_result
21+
from futuresearch_mcp.tool_helpers import (
22+
_UI_EXCLUDE,
23+
TaskState,
24+
_fetch_task_result,
25+
dedupe_summaries,
26+
)
2227

2328
logger = logging.getLogger(__name__)
2429

@@ -97,7 +102,10 @@ async def _fetch_summaries_rest(
97102
)
98103
if resp.status_code == 200:
99104
data = resp.json()
100-
return data.get("summaries") or None, data.get("cursor") or cursor
105+
raw = data.get("summaries") or None
106+
if raw:
107+
raw = dedupe_summaries(raw)
108+
return raw, data.get("cursor") or cursor
101109
except Exception:
102110
logger.debug("Failed to fetch summaries for task %s via REST", task_id)
103111
return None, cursor
@@ -123,9 +131,12 @@ async def _fetch_aggregate_rest(
123131
)
124132
if resp.status_code == 200:
125133
data = resp.json()
134+
micros = data.get("micro_summaries") or None
135+
if micros:
136+
micros = dedupe_summaries(micros)
126137
return (
127138
data.get("aggregate") or None,
128-
data.get("micro_summaries") or None,
139+
micros,
129140
data.get("cursor") or cursor,
130141
)
131142
except Exception:

futuresearch-mcp/src/futuresearch_mcp/templates.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -292,7 +292,7 @@
292292
if(d.aggregate_summary){
293293
const now=new Date();
294294
const ts=now.toLocaleTimeString([],{hour:"2-digit",minute:"2-digit",second:"2-digit"});
295-
const micros=(d.summaries||[]).map(s=>({text:s.summary||String(s),row_index:s.row_index}));
295+
const micros=(d.summaries||[]).map(s=>({text:s.summary||String(s),row_indices:s.row_indices||null,row_index:s.row_index}));
296296
/* only add if aggregate text is new */
297297
const lastAgg=aggHistory.length?aggHistory[aggHistory.length-1].aggregate:"";
298298
if(d.aggregate_summary!==lastAgg){
@@ -303,7 +303,7 @@
303303
/* fallback: no aggregate, just micro-summaries — create a placeholder entry */
304304
const now=new Date();
305305
const ts=now.toLocaleTimeString([],{hour:"2-digit",minute:"2-digit",second:"2-digit"});
306-
const micros=d.summaries.map(s=>({text:s.summary||String(s),row_index:s.row_index}));
306+
const micros=d.summaries.map(s=>({text:s.summary||String(s),row_indices:s.row_indices||null,row_index:s.row_index}));
307307
const fallbackAgg=micros[0]?.text||"Agent activity";
308308
const lastAgg=aggHistory.length?aggHistory[aggHistory.length-1].aggregate:"";
309309
if(fallbackAgg!==lastAgg){
@@ -365,7 +365,9 @@
365365
if(hasMicros){
366366
al+=`<ul class="agg-micros">`;
367367
for(const m of a.micros){
368-
const rowLabel=m.row_index!=null?`<span class="agg-micro-row">Row ${m.row_index+1}</span>`:"";
368+
let rowLabel="";
369+
if(m.row_indices&&m.row_indices.length>1){rowLabel=`<span class="agg-micro-row">Rows ${m.row_indices.map(r=>r+1).join(", ")}</span>`;}
370+
else if(m.row_index!=null){rowLabel=`<span class="agg-micro-row">Row ${m.row_index+1}</span>`;}
369371
al+=`<li>${rowLabel}${esc(m.text)}</li>`;
370372
}
371373
al+=`</ul>`;

futuresearch-mcp/src/futuresearch_mcp/tool_helpers.py

Lines changed: 39 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -291,33 +291,56 @@ async def create_tool_response(
291291

292292

293293
def _format_summary_lines(summaries: list[dict[str, Any]]) -> str:
294-
"""Collapse duplicate summaries from batched agents into grouped lines.
294+
"""Format summaries as text lines with row index prefixes.
295295
296-
One trace handling multiple rows produces the same text per row.
297-
Groups by text and merges row indices: ``[Rows 29, 17] Summarizing...``
296+
Accepts both raw (K duplicates from Engine) and already-deduped
297+
summaries (with ``row_indices`` lists from ``dedupe_summaries``).
298298
"""
299-
grouped: dict[str, list[int]] = {}
300-
grouped_order: list[str] = []
301-
for s in summaries:
302-
text = s["summary"]
303-
row_idx = s.get("row_index")
304-
if text not in grouped:
305-
grouped[text] = []
306-
grouped_order.append(text)
307-
if row_idx is not None:
308-
grouped[text].append(row_idx)
299+
# Dedupe only if input lacks row_indices (raw from Engine)
300+
if summaries and "row_indices" not in summaries[0]:
301+
summaries = dedupe_summaries(summaries)
309302
lines = ""
310-
for text in grouped_order:
311-
rows = grouped[text]
303+
for entry in summaries:
304+
text = entry.get("summary", "")
305+
rows = entry.get("row_indices") or []
312306
if rows:
313307
label = "Row" if len(rows) == 1 else "Rows"
314-
prefix = f"[{label} {', '.join(str(r) for r in sorted(rows))}] "
308+
prefix = f"[{label} {', '.join(str(r) for r in rows)}] "
315309
else:
316310
prefix = ""
317311
lines += f"\n- {prefix}{text}"
318312
return lines
319313

320314

315+
def dedupe_summaries(summaries: list[dict[str, Any]]) -> list[dict[str, Any]]:
316+
"""Collapse duplicate summaries from batched agents into one per unique text.
317+
318+
The Engine returns K identical summaries (one per row) when a batched
319+
agent handles K rows. This merges them into a single entry with a
320+
``row_indices`` list, preserving order.
321+
"""
322+
grouped: dict[str, dict[str, Any]] = {}
323+
order: list[str] = []
324+
for s in summaries:
325+
text = s.get("summary", "")
326+
if text not in grouped:
327+
grouped[text] = {**s, "row_indices": []}
328+
order.append(text)
329+
row_idx = s.get("row_index")
330+
if row_idx is not None:
331+
grouped[text]["row_indices"].append(row_idx)
332+
result = []
333+
for text in order:
334+
entry = grouped[text]
335+
indices = sorted(entry["row_indices"])
336+
entry["row_indices"] = indices or None
337+
entry.pop("row_index", None)
338+
if indices:
339+
entry["row_index"] = indices[0]
340+
result.append(entry)
341+
return result
342+
343+
321344
class TaskState(BaseModel):
322345
"""Parsed progress snapshot from an API status response."""
323346

futuresearch-mcp/src/futuresearch_mcp/tools.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
_get_client,
6767
_record_task_ownership,
6868
create_tool_response,
69+
dedupe_summaries,
6970
log_client_info,
7071
)
7172
from futuresearch_mcp.utils import fetch_csv_from_url, is_url, save_result_to_csv
@@ -869,7 +870,10 @@ async def _fetch_partial_rows(
869870
async def _fetch_summaries(
870871
httpx_client: Any, task_id: str, cursor: str | None
871872
) -> tuple[list[dict[str, Any]] | None, str | None]:
872-
"""Fetch progress summaries. Returns (summaries, updated_cursor)."""
873+
"""Fetch progress summaries, deduplicating batched agent copies.
874+
875+
Returns (summaries, updated_cursor).
876+
"""
873877
try:
874878
query: dict[str, Any] = {}
875879
if cursor:
@@ -881,7 +885,10 @@ async def _fetch_summaries(
881885
)
882886
if resp.status_code == 200:
883887
data = resp.json()
884-
return data.get("summaries") or None, data.get("cursor") or cursor
888+
raw = data.get("summaries") or None
889+
if raw:
890+
raw = dedupe_summaries(raw)
891+
return raw, data.get("cursor") or cursor
885892
logger.warning("summaries returned %s for task %s", resp.status_code, task_id)
886893
except Exception:
887894
logger.debug("Failed to fetch summaries for task %s", task_id)

0 commit comments

Comments (0)