13 changes: 7 additions & 6 deletions apps/memos-local-plugin/core/capture/ALGORITHMS.md
@@ -78,7 +78,8 @@ priority once reward arrives.
## V7 §3.2 batched variant — `batch-scorer.ts`

The per-step path (`reflection-synth.ts` + `alpha-scorer.ts`) issues 2N
-LLM calls per N-step episode. `batch-scorer.ts` collapses them into ONE:
+LLM calls per N-step episode. `batch-scorer.ts` collapses up to
+`batchThreshold` steps into one call:

```
inputs = [{idx, state, action, outcome, reflection, synth_allowed}, …]
@@ -91,8 +92,8 @@ Dispatch (in `capture.ts`):
| `cfg.batchMode` | `cfg.batchThreshold` | behavior |
|-------------------|----------------------|----------|
| `per_step` | (ignored) | legacy: 2N calls |
-| `per_episode` | (ignored) | always batch |
-| `auto` (default) | `12` | batch when `N ≤ 12`; else per-step |
+| `per_episode` | chunk size | batch when `N ≤ threshold`; else chunk-batch |
+| `auto` (default) | `12` | batch when `N ≤ 12`; else chunk-batch |

The dispatcher also refuses to batch when no LLM is wired — same fallback
path as missing-LLM in per-step mode.
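
As a rough illustration of the chunk-batch behavior in the table above, the sketch below maps a config and step count to a plan and estimates the best-case number of LLM calls. `planFor`, `estimateLlmCalls`, and `DispatchConfig` are hypothetical names for this example, not functions from `capture.ts`; the 2N per-step figure and the default threshold of 12 come from the text above, and the zero-call count for a missing LLM is an assumption.

```typescript
// Hypothetical sketch of the dispatch table; names are illustrative only.
type BatchMode = "per_step" | "per_episode" | "auto";
type Plan = "per_step" | "batch" | "chunk_batch";

interface DispatchConfig {
  batchMode: BatchMode;
  batchThreshold: number;
}

function planFor(cfg: DispatchConfig, stepCount: number, hasLlm: boolean): Plan {
  // No LLM wired, empty episode, or legacy mode: stay on the per-step path.
  if (!hasLlm || stepCount === 0 || cfg.batchMode === "per_step") return "per_step";
  const threshold = Math.max(1, cfg.batchThreshold);
  return stepCount <= threshold ? "batch" : "chunk_batch";
}

// Best-case call count: assumes no batch or chunk falls back to per-step.
function estimateLlmCalls(cfg: DispatchConfig, stepCount: number, hasLlm: boolean): number {
  const plan = planFor(cfg, stepCount, hasLlm);
  // Assumption: without an LLM the per-step path issues no LLM calls at all.
  if (plan === "per_step") return hasLlm ? 2 * stepCount : 0;
  if (plan === "batch") return 1;
  return Math.ceil(stepCount / Math.max(1, cfg.batchThreshold));
}

const auto: DispatchConfig = { batchMode: "auto", batchThreshold: 12 };
console.log(planFor(auto, 10, true), estimateLlmCalls(auto, 10, true)); // batch 1
console.log(planFor(auto, 30, true), estimateLlmCalls(auto, 30, true)); // chunk_batch 3
console.log(estimateLlmCalls({ ...auto, batchMode: "per_step" }, 30, true)); // 60
```

For a 30-step episode this is 3 chunk calls instead of 60 per-step calls, provided every chunk validates.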
@@ -107,15 +108,15 @@ Failure handling:

 - LLM throws / facade gives up after `malformedRetries=1` → capture
   catches in `runBatchScoring`, surfaces a `{stage: "batch"}` warning,
-  and the per-step path runs as a fallback.
+  and the per-step path runs as a fallback for that chunk.
 - Validator rejects on length mismatch, missing/non-numeric `alpha`,
   non-boolean `usable`, non-string `reflection_text`. Same fallback.

 Bookkeeping (`CaptureResult.llmCalls`):

-- `batchedReflection`: 0 or 1 per episode (1 on a successful batch).
+- `batchedReflection`: number of successful batch/chunk calls.
 - `reflectionSynth` / `alphaScoring`: only nonzero when the per-step path
-  ran (either selected directly, or as fallback after a batch failure).
+  ran (either selected directly, or as fallback after a chunk failure).

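The validator rules listed under "Failure handling" could be sketched roughly as follows. This is not the `validateBatchPayload` from `batch-scorer.ts`: the payload shape (a plain array with one entry per input step) and the name `validateBatchSketch` are assumptions made for this example; only the four rejection rules come from the text.

```typescript
// Hypothetical payload shape: one entry per input step, in input order.
interface BatchItem {
  alpha: number;
  usable: boolean;
  reflection_text: string;
}

// Throws on the first violation so the caller can fall back to per-step.
function validateBatchSketch(v: unknown, expected: number): void {
  if (!Array.isArray(v)) throw new Error("batch payload is not an array");
  if (v.length !== expected) {
    throw new Error(`length mismatch: got ${v.length}, expected ${expected}`);
  }
  v.forEach((item: unknown, i: number) => {
    if (typeof item !== "object" || item === null) {
      throw new Error(`item ${i} is not an object`);
    }
    const rec = item as Record<string, unknown>;
    if (typeof rec.alpha !== "number" || !Number.isFinite(rec.alpha)) {
      throw new Error(`item ${i}: alpha missing or non-numeric`);
    }
    if (typeof rec.usable !== "boolean") {
      throw new Error(`item ${i}: usable is not a boolean`);
    }
    if (typeof rec.reflection_text !== "string") {
      throw new Error(`item ${i}: reflection_text is not a string`);
    }
  });
}

const good: BatchItem[] = [{ alpha: 0.5, usable: true, reflection_text: "ok" }];
validateBatchSketch(good, 1); // passes silently
```

Throwing instead of returning a boolean fits a `validate` hook whose failure triggers a retry and then the per-step fallback.
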
Stable prompt fingerprint:

21 changes: 16 additions & 5 deletions apps/memos-local-plugin/core/capture/batch-scorer.ts
@@ -16,11 +16,12 @@
* `transferability` axes benefit directly.
*
* Trade-offs (encoded in capture.ts dispatch):
- * - Prompt grows linearly with N steps. Capped via `batchThreshold`;
- *   long episodes degrade to the per-step path automatically.
- * - One bad output value forces a single batched retry instead of N
- *   isolated retries — but the facade already does `malformedRetries`
- *   for us, and on hard failure capture.ts falls back to per-step.
+ * - Prompt grows linearly with N steps. Each call is capped at
+ *   `batchThreshold`; long episodes run as several bounded chunks.
+ * - One bad chunk forces a single batched retry for that chunk instead
+ *   of N isolated retries — but the facade already does
+ *   `malformedRetries` for us, and on hard failure capture.ts falls
+ *   back to per-step for that chunk only.
*
* Wire format ↔ prompt:
* Send `{ host_context?, task_context?, steps: [{idx, state, action, outcome, reflection, synth_allowed}] }`.
@@ -170,6 +171,7 @@ export async function batchScoreReflections(
validate: (v) => validateBatchPayload(v, inputs.length),
malformedRetries: 1,
temperature: 0,
+maxTokens: batchMaxTokens(inputs.length),
},
);

@@ -321,6 +323,15 @@ function validateBatchPayload(v: unknown, expected: number): void {
}
}

+function batchMaxTokens(stepCount: number): number {
+  // Batch output scales with step count; keep a per-step budget but cap below
+  // the 16k range that triggered avoidable reasoning spend on mimo replay.
+  const perStepOutputBudget = 512;
+  const baseBudget = 768;
+  const ceiling = 8_192;
+  return Math.min(ceiling, baseBudget + Math.max(1, stepCount) * perStepOutputBudget);
+}

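To make the output budget concrete, the sketch below copies the three constants from `batchMaxTokens` and evaluates the formula at a few step counts. The name `sketchBatchMaxTokens` is only for this example; the values follow from 768 + 512 × n, capped at 8,192.

```typescript
// Same arithmetic as batchMaxTokens above, renamed to mark it as a sketch.
function sketchBatchMaxTokens(stepCount: number): number {
  const perStepOutputBudget = 512;
  const baseBudget = 768;
  const ceiling = 8_192;
  return Math.min(ceiling, baseBudget + Math.max(1, stepCount) * perStepOutputBudget);
}

console.log(sketchBatchMaxTokens(0));  // 1280 (step count clamped to 1)
console.log(sketchBatchMaxTokens(1));  // 1280
console.log(sketchBatchMaxTokens(12)); // 6912 (default batchThreshold)
console.log(sketchBatchMaxTokens(14)); // 7936
console.log(sketchBatchMaxTokens(15)); // 8192 (768 + 7680 = 8448, capped)
```

With the default threshold of 12 the ceiling is never reached. It starts to bind at 15 steps per call, so a `batchThreshold` above 14 would leave less than 512 output tokens per step.
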
function lastToolOutcome(step: NormalizedStep, max: number): string {
const last = step.toolCalls[step.toolCalls.length - 1];
if (!last) return "(assistant-only step)";
75 changes: 56 additions & 19 deletions apps/memos-local-plugin/core/capture/capture.ts
@@ -435,14 +435,14 @@ export function createCaptureRunner(deps: CaptureDeps): CaptureRunner {
}

// Batch reflection + α across every step of the now-closed
-// episode. Falls back to per-step scoring when over the threshold
-// or when batching fails / no LLM is wired. The reflect pass uses
+// episode. Long episodes are chunk-batched at `batchThreshold`;
+// failed chunks fall back to per-step scoring. The reflect pass uses
// `reflectLlm` (skill-evolver model when configured) for higher
// quality reflections; per-turn lite capture still uses `llm`.
const reflectStart = now();
const rLlm = deps.reflectLlm ?? deps.llm;
-const useBatch = shouldBatch(deps.cfg, normalized.length, rLlm !== null);
-const contextEnabled = contextModeFor(deps.cfg, useBatch, normalized.length);
+const scoringPlan = planScoring(deps.cfg, normalized.length, rLlm !== null);
+const contextEnabled = contextModeFor(deps.cfg, scoringPlan, normalized.length);
const taskSummary = contextEnabled.includeTask
? buildTaskReflectionSummary(input.episode, normalized, deps.cfg.taskContextMaxChars)
: null;
@@ -453,18 +453,24 @@ export function createCaptureRunner(deps: CaptureDeps): CaptureRunner {
episodeId: input.episode.id,
sessionId: input.episode.sessionId,
steps: normalized.length,
-mode: useBatch ? "batch" : contextEnabled.includeDownstream ? "per_step_downstream" : "per_step",
+mode: scoringPlan === "per_step" && contextEnabled.includeDownstream ? "per_step_downstream" : scoringPlan,
+chunks: scoringPlan === "chunk_batch"
+  ? Math.ceil(normalized.length / Math.max(1, deps.cfg.batchThreshold))
+  : undefined,
reflectionContextMode: deps.cfg.reflectionContextMode,
downstreamPreview: contextEnabled.includeDownstream,
provider: rLlm?.provider ?? "none",
model: rLlm?.model ?? "none",
taskSummary: taskSummary ? taskSummary.slice(0, 240) : null,
});
let scored: ScoredStep[] = [];
-if (useBatch) {
+if (scoringPlan === "batch") {
scored = await runBatchScoring(normalized, rLlm!, deps, warnings, llmCalls, input.episode.id, taskSummary);
}
-if (!useBatch || scored.length === 0) {
+if (scoringPlan === "chunk_batch") {
+  scored = await runChunkedBatchScoring(normalized, rLlm!, deps, warnings, llmCalls, input.episode.id, taskSummary);
+}
+if (scoringPlan === "per_step" || scored.length === 0) {
scored = await runPerStepScoring(
normalized,
rLlm,
@@ -1018,30 +1024,30 @@ export function createCaptureRunner(deps: CaptureDeps): CaptureRunner {
// ─── helpers ────────────────────────────────────────────────────────────────

/**
- * Decide whether to use the batched reflection+α path.
+ * Decide which reflection+α path to use.
*
* `per_step` → never (legacy path).
- * `per_episode` → always, when an LLM is available.
- * `auto` → batch when step count fits inside `batchThreshold`.
+ * `per_episode` → batch up to threshold, then chunk-batch.
+ * `auto` → batch up to threshold, then chunk-batch.
*/
-function shouldBatch(cfg: CaptureConfig, stepCount: number, hasLlm: boolean): boolean {
-  if (!hasLlm) return false;
-  if (stepCount === 0) return false;
-  if (cfg.batchMode === "per_step") return false;
-  if (cfg.batchMode === "per_episode") return true;
-  // "auto"
-  return stepCount <= cfg.batchThreshold;
+type ScoringPlan = "per_step" | "batch" | "chunk_batch";
+
+function planScoring(cfg: CaptureConfig, stepCount: number, hasLlm: boolean): ScoringPlan {
+  if (!hasLlm) return "per_step";
+  if (stepCount === 0) return "per_step";
+  if (cfg.batchMode === "per_step") return "per_step";
+  return stepCount <= Math.max(1, cfg.batchThreshold) ? "batch" : "chunk_batch";
}

function contextModeFor(
cfg: CaptureConfig,
-useBatch: boolean,
+scoringPlan: ScoringPlan,
stepCount: number,
): { includeTask: boolean; includeDownstream: boolean } {
const mode = cfg.reflectionContextMode;
const includeTask = mode === "task" || mode === "task_downstream";
const wantsDownstream = mode === "downstream" || mode === "task_downstream";
-const longPerStep = !useBatch && stepCount > cfg.batchThreshold;
+const longPerStep = scoringPlan === "per_step" && stepCount > cfg.batchThreshold;
const includeDownstream =
wantsDownstream &&
cfg.longEpisodeReflectMode === "per_step_downstream" &&
@@ -1101,6 +1107,37 @@ async function runBatchScoring(
}
}

+async function runChunkedBatchScoring(
+  normalized: NormalizedStep[],
+  llm: LlmClient,
+  deps: CaptureDeps,
+  warnings: CaptureResult["warnings"],
+  llmCalls: { reflectionSynth: number; alphaScoring: number; batchedReflection: number },
+  episodeId: string,
+  taskSummary: string | null,
+): Promise<ScoredStep[]> {
+  const chunkSize = Math.max(1, deps.cfg.batchThreshold);
+  const chunks: NormalizedStep[][] = [];
+  for (let start = 0; start < normalized.length; start += chunkSize) {
+    chunks.push(normalized.slice(start, start + chunkSize));
+  }
+  const concurrency = Math.max(1, deps.cfg.llmConcurrency);
+  const scoredChunks = await runConcurrently(chunks, concurrency, async (chunk): Promise<ScoredStep[]> => {
+    const scored = await runBatchScoring(chunk, llm, deps, warnings, llmCalls, episodeId, taskSummary);
+    if (scored.length > 0) return scored;
+    return runPerStepScoring(
+      chunk,
+      llm,
+      deps,
+      warnings,
+      llmCalls,
+      episodeId,
+      buildReflectionContexts(chunk, taskSummary, chunk.map(() => [])),
+    );
+  });
+  return scoredChunks.flat();
+}

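`runChunkedBatchScoring` relies on a `runConcurrently` helper that this diff does not show. The sketch below is one possible shape for the chunk-then-map pattern, assuming the helper preserves input order and caps in-flight work. `chunkBy`, `mapWithLimit`, and `demo` are hypothetical names, and the real helper may behave differently.

```typescript
// Hypothetical helpers; `runConcurrently` in capture.ts may differ.
function chunkBy<T>(items: readonly T[], size: number): T[][] {
  const chunkSize = Math.max(1, size);
  const chunks: T[][] = [];
  for (let start = 0; start < items.length; start += chunkSize) {
    chunks.push(items.slice(start, start + chunkSize));
  }
  return chunks;
}

// Runs `worker` over `items` with at most `limit` calls in flight and
// returns results in input order, regardless of completion order.
async function mapWithLimit<T, R>(
  items: readonly T[],
  limit: number,
  worker: (item: T, index: number) => Promise<R>,
): Promise<R[]> {
  const results: R[] = new Array(items.length);
  let next = 0;
  async function lane(): Promise<void> {
    while (next < items.length) {
      const index = next++; // claimed synchronously, so lanes never share an index
      results[index] = await worker(items[index], index);
    }
  }
  const laneCount = Math.min(Math.max(1, limit), items.length);
  await Promise.all(Array.from({ length: laneCount }, () => lane()));
  return results;
}

async function demo(): Promise<number[]> {
  const steps = Array.from({ length: 30 }, (_, i) => i);
  const chunks = chunkBy(steps, 12); // chunk sizes: 12, 12, 6
  const scored = await mapWithLimit(chunks, 2, async (chunk) => {
    // Stand-in for one batch call; a failed chunk would fall back here.
    return chunk.map((idx) => idx);
  });
  return scored.flat();
}
```

Writing each result into its input slot keeps the flattened output aligned with the original step order even when a later chunk finishes first.
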
async function runPerStepScoring(
normalized: NormalizedStep[],
llm: LlmClient | null,
4 changes: 2 additions & 2 deletions apps/memos-local-plugin/tests/helpers/fake-llm.ts
@@ -20,7 +20,7 @@ export interface FakeLlmScript {
complete?: Record<string, string | ((input: unknown) => string | Promise<string>)>;
completeJson?: Record<
string,
-unknown | ((input: unknown) => unknown | Promise<unknown>)
+unknown | ((input: unknown, opts?: unknown) => unknown | Promise<unknown>)
>;
/** Override the served-by identifier. */
servedBy?: LlmProviderName | "host_fallback";
@@ -64,7 +64,7 @@ export function fakeLlm(script: FakeLlmScript = {}): LlmClient {
throw new Error(`fakeLlm: no completeJson mock for op="${op}"`);
}
const value = (typeof entry === "function"
-? await (entry as (x: unknown) => unknown)(input)
+? await (entry as (x: unknown, o?: unknown) => unknown)(input, opts)
: entry) as T;
if (o?.validate) o.validate(value);
return {
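
The widened mock signature lets a test inspect the call options, for example to check that a `maxTokens` budget was forwarded. The sketch below is a stripped-down, hypothetical stand-in for `fakeLlm`, not the helper itself: `miniFake`, the `CallOpts` shape, and the op name `"capture.batch"` are assumptions made for this example.

```typescript
// Hypothetical stand-in for the test helper; not the real fakeLlm.
interface CallOpts {
  op: string;
  maxTokens?: number;
  validate?: (v: unknown) => void;
}

type JsonMock = unknown | ((input: unknown, opts?: unknown) => unknown | Promise<unknown>);

function miniFake(script: Record<string, JsonMock>) {
  return {
    async completeJson<T>(input: unknown, opts: CallOpts): Promise<T> {
      const entry = script[opts.op];
      if (entry === undefined) {
        throw new Error(`miniFake: no completeJson mock for op="${opts.op}"`);
      }
      // Function mocks receive both the input and the call options.
      const value = (typeof entry === "function"
        ? await (entry as (x: unknown, o?: unknown) => unknown)(input, opts)
        : entry) as T;
      if (opts.validate) opts.validate(value);
      return value;
    },
  };
}

async function demoOpts(): Promise<number | undefined> {
  let seenMaxTokens: number | undefined;
  const llm = miniFake({
    // The op name is made up for this sketch.
    "capture.batch": (_input: unknown, opts?: unknown) => {
      seenMaxTokens = (opts as CallOpts | undefined)?.maxTokens;
      return [];
    },
  });
  await llm.completeJson<unknown[]>({ steps: [] }, { op: "capture.batch", maxTokens: 1280 });
  return seenMaxTokens;
}
```

Keeping `opts` optional in the mock type means existing single-argument mocks continue to type-check unchanged.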