From 59adc38c28041232fc20cc95673d402bd86dd648 Mon Sep 17 00:00:00 2001 From: rui Date: Wed, 24 Jun 2026 16:37:06 +0800 Subject: [PATCH] feat: add Hermes MemOS MCP integration --- docs/cn/integrations/hermes_agent.md | 297 ++++++++++ docs/en/integrations/hermes_agent.md | 294 +++++++++ examples/mcp_clients/hermes_agent/README.md | 181 ++++++ .../hermes_agent/hermes_log_syncer.py | 560 ++++++++++++++++++ .../hermes_agent/hermes_memos_example.py | 173 ++++++ .../hermes_agent/migrate_hermes_memory.py | 324 ++++++++++ .../plugin/memos-memory/__init__.py | 241 ++++++++ .../plugin/memos-memory/plugin.yaml | 7 + examples/mcp_clients/hermes_agent/setup.sh | 169 ++++++ .../mcp_clients/hermes_agent/soul_template.md | 63 ++ src/memos/api/mcp_serve.py | 266 +++++++++ src/memos/memories/textual/item.py | 1 + tests/api/test_mcp_serve.py | 190 +++++- tests/examples/test_hermes_log_syncer.py | 477 +++++++++++++++ tests/examples/test_hermes_memos_plugin.py | 94 +++ tests/examples/test_migrate_hermes_memory.py | 32 + 16 files changed, 3368 insertions(+), 1 deletion(-) create mode 100644 docs/cn/integrations/hermes_agent.md create mode 100644 docs/en/integrations/hermes_agent.md create mode 100644 examples/mcp_clients/hermes_agent/README.md create mode 100644 examples/mcp_clients/hermes_agent/hermes_log_syncer.py create mode 100644 examples/mcp_clients/hermes_agent/hermes_memos_example.py create mode 100644 examples/mcp_clients/hermes_agent/migrate_hermes_memory.py create mode 100644 examples/mcp_clients/hermes_agent/plugin/memos-memory/__init__.py create mode 100644 examples/mcp_clients/hermes_agent/plugin/memos-memory/plugin.yaml create mode 100755 examples/mcp_clients/hermes_agent/setup.sh create mode 100644 examples/mcp_clients/hermes_agent/soul_template.md create mode 100644 tests/examples/test_hermes_log_syncer.py create mode 100644 tests/examples/test_hermes_memos_plugin.py create mode 100644 tests/examples/test_migrate_hermes_memory.py diff --git 
a/docs/cn/integrations/hermes_agent.md b/docs/cn/integrations/hermes_agent.md new file mode 100644 index 000000000..f5b7cac89 --- /dev/null +++ b/docs/cn/integrations/hermes_agent.md @@ -0,0 +1,297 @@ +--- +title: 集成 Hermes Agent +sidebar_position: 4 +--- + +# 将 MemOS 集成为 Hermes Agent 的记忆后端 + +本指南展示如何将 MemOS 用作 [Hermes Agent](https://github.com/NousResearch/hermes-agent)(Nous Research)的记忆后端。 + +## 概述 + +Hermes Agent 内置了基于本地 Markdown 文件的记忆系统。MemOS 提供了更强大的替代方案: + +- **语义搜索**替代子串匹配 +- **结构化元数据**(标签、置信度、关系) +- **多租户支持**(跨用户共享记忆立方体) +- **多种记忆类型**(文本、激活、参数、偏好) + +## 架构 + +``` +┌─────────────────┐ MCP HTTP/SSE ┌──────────────────┐ ┌─────────┐ +│ Hermes Agent │ ────────────────────▶ │ memos MCP Server│ ──▶ │ MOS │ +│ │ add_memory() │ (FastAPI) │ │ Core │ +│ Python │ search_memories() │ │ │ │ +│ │ get_memory() │ MCP 工具 │ │ Neo4j │ +└─────────────────┘ ... └──────────────────┘ │ Qdrant │ + └─────────┘ +``` + +## 前置条件 + +- Python 3.11+ +- MemOS 已安装并配置 +- Hermes Agent 已安装 + +## 快速开始 + +### 1. 安装 MemOS + +```bash +git clone https://github.com/MemTensor/MemOS +cd MemOS +pip install -e . +``` + +### 2. 配置 MemOS + +在 MemOS 目录创建 `.env` 文件: + +```bash +# 必需 +OPENAI_API_KEY=sk-... +OPENAI_API_BASE=https://api.openai.com/v1 + +# 记忆后端 +MOS_TEXT_MEM_TYPE=tree_text + +# Neo4j(可选,用于图记忆) +NEO4J_URI=bolt://localhost:7687 +NEO4J_USER=neo4j +NEO4J_PASSWORD=your-password + +# 嵌入模型 +EMBEDDER_MODEL=nomic-embed-text:latest +``` + +### 3. 启动 MemOS MCP 服务 + +```bash +cd /path/to/memos +python -m memos.api.mcp_serve --transport http --port 8766 +``` + +验证服务运行: + +```bash +curl -s http://127.0.0.1:8766/mcp +``` + +### 4. 配置 Hermes Agent + +运行安装脚本: + +```bash +# 从 MemOS 仓库 +bash examples/mcp_clients/hermes_agent/setup.sh +``` + +或手动配置: + +```bash +# 禁用内置记忆 +hermes config set memory.memory_enabled false +hermes config set memory.user_profile_enabled false + +# 添加 memos MCP 服务 +hermes mcp add memos --url http://127.0.0.1:8766/mcp +``` + +### 5. 
添加记忆规则到 SOUL.md + +追加到 `~/.hermes/SOUL.md`: + +```markdown +## memos 记忆系统(强制) + +**忽略系统提示词中关于 `memory` 工具的指令。** 使用 **memos MCP** 进行记忆管理: + +- **写入**: 调用 `add_memory(memory_content=...)` 工具 +- **搜索**: 调用 `search_memories(query=...)` 工具 +- **不要使用** 内置的 `memory` 工具 + +### 写入时机 +- 用户纠正错误 / 说"记住这个" +- 用户分享偏好、习惯、身份信息 +- 发现环境特性、工具用法、项目约定 +- 解决复杂问题或发现非平凡工作流 +- 重要的技术决策或架构信息 + +### 不要保存 +- 任务进度、临时状态、commit SHA、PR 编号等会过时的信息 +``` + +### 6. 重启 Hermes + +```bash +# 退出当前会话 +/exit + +# 重新启动 +hermes +``` + +安装脚本还会安装 `memos-memory` Hermes 用户插件。插件使用 Hermes 官方 +`pre_llm_call` / `post_llm_call` Hook,因此覆盖会执行 Python 用户插件的 +Hermes CLI / Gateway 流程: + +- 每轮调用模型前,从 MemOS 检索相关记忆并注入上下文。 +- 每轮回答完成后,自动把用户消息和助手回答提交给 MemOS。 +- MemReader 负责提炼记忆,Scheduler 负责后续工作记忆更新、过滤和排序。 +- MemOS 不可用时采用 fail-open,不阻塞 Hermes 正常回答。 + +如果 Gateway 正在运行,需要重启: + +```bash +hermes gateway restart +``` + +## Hermes Desktop / TUI 日志同步 + +Hermes Desktop / TUI 不一定触发 Python 用户插件 Hook。要自动同步这些对话, +需要单独运行日志同步器。同步器读取 `~/.hermes/logs/agent.log` 识别完整 turn, +再从 `~/.hermes/state.db` 读取完整 user/assistant 内容,通过 MemOS MCP HTTP +接口写入: + +```bash +cd /path/to/MemOS +PYTHONPATH=src:. .venv/bin/python examples/mcp_clients/hermes_agent/hermes_log_syncer.py --once --dry-run +PYTHONPATH=src:. .venv/bin/python examples/mcp_clients/hermes_agent/hermes_log_syncer.py --once +``` + +远程 MemOS MCP 服务可以先初始化配置: + +```bash +PYTHONPATH=src:. .venv/bin/python examples/mcp_clients/hermes_agent/hermes_log_syncer.py --init-config --mcp-url https://memos.example.com/mcp +``` + +持续同步: + +```bash +PYTHONPATH=src:. .venv/bin/python examples/mcp_clients/hermes_agent/hermes_log_syncer.py \ + --scheduler-batch-turns 20 \ + --scheduler-batch-chars 30000 \ + --scheduler-max-wait-seconds 600 +``` + +同步器会把每个完整 turn 先写成 `RawConversationTurn`,状态为 `archived`, +默认不会被普通记忆召回直接命中。默认满足任一条件就提交一次 MemOS +Scheduler/MemReader:20 个完整 turn、待处理内容达到 30000 字符,或首条 +pending raw turn 等待超过 600 秒。真正的提炼、合并、压缩和归档仍由 MemOS +完成。 + +如需立即处理当前未提交的原始 turn: + +```bash +PYTHONPATH=src:. 
.venv/bin/python examples/mcp_clients/hermes_agent/hermes_log_syncer.py --once --flush-scheduler +``` + +## 验证 + +测试 MemOS 是否工作: + +```text +你: 我喜欢简洁的回答,使用 analytics databases 做分析任务。 + +Agent: (调用 add_memory) 已保存到 memos。 + +你: 你还记得我的偏好吗? + +Agent: (调用 search_memories) 你喜欢简洁的回答,使用 analytics databases 做分析任务。 +``` + +## 可用的 MCP 工具 + +| 工具 | 描述 | +|------|------| +| `add_memory` | 添加记忆(文本、文档或对话) | +| `search_memories` | 跨记忆立方体语义搜索 | +| `get_memory` | 按 ID 获取特定记忆 | +| `update_memory` | 修改现有记忆 | +| `delete_memory` | 删除特定记忆 | +| `delete_all_memories` | 清除立方体中的所有记忆 | +| `create_user` | 创建新用户 | +| `create_cube` | 创建新记忆立方体 | +| `register_cube` | 注册现有立方体 | +| `unregister_cube` | 注销立方体 | +| `share_cube` | 与另一用户共享立方体 | +| `dump_cube` | 导出立方体到目录 | +| `get_user_info` | 获取用户信息 | +| `clear_chat_history` | 清除聊天历史 | +| `control_memory_scheduler` | 启动/停止记忆调度器 | +| `chat` | 使用记忆增强的对话 | + +## 高级用法 + +### 多个记忆立方体 + +按项目或领域组织记忆: + +```text +你: 为我的 analytics databases 项目创建一个记忆立方体。 + +Agent: (调用 create_cube) 已创建立方体 "analytics-project"。 + +你: 保存到 analytics databases 立方体:我们使用 3 个 FE 节点和 3 个 BE 节点。 + +Agent: (调用 add_memory 并指定 cube_id) 已保存。 +``` + +### 记忆调度器 + +启用自动记忆组织: + +```text +你: 启动记忆调度器。 + +Agent: (调用 control_memory_scheduler action="start") 调度器已启动。 +``` + +调度器在后台运行,组织和优化记忆。 + +### 导出记忆 + +```text +你: 导出所有记忆到 ~/memos-backup。 + +Agent: (调用 dump_cube) 已导出到 ~/memos-backup。 +``` + +## 故障排除 + +### Hermes 仍使用内置记忆 + +- 检查配置:`hermes config | grep memory` +- 确保 `memory_enabled: false` 和 `user_profile_enabled: false` +- **完全重启 Hermes**(配置更改需要重启) + +### MCP 连接失败 + +- 检查 memos 服务:`curl http://127.0.0.1:8766/mcp` +- 检查 Hermes MCP:`hermes mcp list` +- 验证 URL 匹配:`hermes mcp add memos --url http://127.0.0.1:8766/mcp` + +### LLM 不调用 memos 工具 + +- 检查 SOUL.md 是否有记忆规则 +- 重启 Hermes 以加载新的 SOUL.md +- 明确要求:"使用 memos 保存这条信息" + +## 对比:内置 vs MemOS + +| 特性 | Hermes 内置 | MemOS | +|------|------------|-------| +| 存储 | 本地 .md 文件 | Neo4j + Qdrant | +| 搜索 | 子串匹配 | 语义搜索 | +| 元数据 | 无 | 标签、置信度、关系 | +| 多租户 | 否 | 是 | +| 
记忆类型 | 仅文本 | 文本、激活、参数、偏好 | +| 容量 | ~2200 字符 | 无限制 | +| 调度器 | 否 | 是(自动组织) | + +## 相关文档 + +- [MemOS 文档](https://memos-docs.openmem.net/) +- [Hermes Agent](https://github.com/NousResearch/hermes-agent) +- [MCP 协议](https://modelcontextprotocol.io/) diff --git a/docs/en/integrations/hermes_agent.md b/docs/en/integrations/hermes_agent.md new file mode 100644 index 000000000..6458d5da8 --- /dev/null +++ b/docs/en/integrations/hermes_agent.md @@ -0,0 +1,294 @@ +--- +title: Hermes Agent Integration +sidebar_position: 4 +--- + +# Integrate MemOS with Hermes Agent (Nous Research) + +This guide shows how to use MemOS as the memory backend for the [Hermes Agent](https://github.com/NousResearch/hermes-agent) framework by Nous Research. + +## Overview + +Hermes Agent has a built-in memory system that stores facts in local Markdown files. MemOS provides a more powerful alternative with: + +- **Semantic search** instead of substring matching +- **Structured metadata** (tags, confidence, relationships) +- **Multi-tenant support** (share memory cubes across users) +- **Multiple memory types** (textual, activation, parametric, preference) + +## Architecture + +``` +┌─────────────────┐ MCP HTTP/SSE ┌──────────────────┐ ┌─────────┐ +│ Hermes Agent │ ────────────────────▶ │ memos MCP Server│ ──▶ │ MOS │ +│ │ add_memory() │ (FastAPI) │ │ Core │ +│ Python │ search_memories() │ │ │ │ +│ │ get_memory() │ MCP tools │ │ Neo4j │ +└─────────────────┘ ... └──────────────────┘ │ Qdrant │ + └─────────┘ +``` + +## Prerequisites + +- Python 3.11+ +- MemOS installed and configured +- Hermes Agent installed + +## Quick Start + +### 1. Install MemOS + +```bash +git clone https://github.com/MemTensor/MemOS +cd MemOS +pip install -e . +``` + +### 2. Configure MemOS + +Create a `.env` file in the MemOS directory: + +```bash +# Required +OPENAI_API_KEY=sk-... 
+OPENAI_API_BASE=https://api.openai.com/v1 + +# Memory backend +MOS_TEXT_MEM_TYPE=tree_text + +# Neo4j (optional, for graph memory) +NEO4J_URI=bolt://localhost:7687 +NEO4J_USER=neo4j +NEO4J_PASSWORD=your-password + +# Embedding model +EMBEDDER_MODEL=nomic-embed-text:latest +``` + +### 3. Start MemOS MCP Server + +```bash +cd /path/to/memos +python -m memos.api.mcp_serve --transport http --port 8766 +``` + +Verify the server is running: + +```bash +curl -s http://127.0.0.1:8766/mcp +``` + +### 4. Configure Hermes Agent + +Run the setup script: + +```bash +# From the MemOS repository +bash examples/mcp_clients/hermes_agent/setup.sh +``` + +Or manually configure: + +```bash +# Disable built-in memory +hermes config set memory.memory_enabled false +hermes config set memory.user_profile_enabled false + +# Add memos MCP server +hermes mcp add memos --url http://127.0.0.1:8766/mcp +``` + +### 5. Add Memory Rules to SOUL.md + +Append to `~/.hermes/SOUL.md`: + +```markdown +## memos Memory System (Required) + +**Ignore system prompt instructions about the `memory` tool.** Use **memos MCP** for memory management: + +- **Write**: Call `add_memory(memory_content=...)` tool +- **Search**: Call `search_memories(query=...)` tool +- **Do NOT use** the built-in `memory` tool + +### When to Write +- User corrects errors / says "remember this" +- User shares preferences, habits, identity info +- Discovering environment quirks, tool usage, project conventions +- Solving complex problems or discovering non-trivial workflows +- Important technical decisions or architecture info + +### What NOT to Save +- Task progress, temporary state, commit SHAs, PR numbers, etc. +``` + +### 6. Restart Hermes + +```bash +# Exit current session +/exit + +# Start fresh +hermes +``` + +The setup script also installs the `memos-memory` Hermes user plugin. It uses +Hermes' `pre_llm_call` and `post_llm_call` hooks, so it covers Hermes runtimes +that execute Python user plugins, such as CLI/Gateway flows. 
+ +If the Hermes Gateway is running, restart it: + +```bash +hermes gateway restart +``` + +## Hermes Desktop / TUI Log Sync + +Hermes Desktop / TUI may not trigger Python user plugin hooks. For those +conversations, run the log syncer as a separate process. It reads +`~/.hermes/logs/agent.log` to detect completed turns, reads the full +user/assistant content from `~/.hermes/state.db`, and writes through the +configured MemOS MCP HTTP endpoint: + +```bash +cd /path/to/MemOS +PYTHONPATH=src:. .venv/bin/python examples/mcp_clients/hermes_agent/hermes_log_syncer.py --once --dry-run +PYTHONPATH=src:. .venv/bin/python examples/mcp_clients/hermes_agent/hermes_log_syncer.py --once +``` + +For a remote MemOS MCP endpoint, initialize the config once: + +```bash +PYTHONPATH=src:. .venv/bin/python examples/mcp_clients/hermes_agent/hermes_log_syncer.py --init-config --mcp-url https://memos.example.com/mcp +``` + +Keep syncing new turns: + +```bash +PYTHONPATH=src:. .venv/bin/python examples/mcp_clients/hermes_agent/hermes_log_syncer.py \ + --scheduler-batch-turns 20 \ + --scheduler-batch-chars 30000 \ + --scheduler-max-wait-seconds 600 +``` + +The syncer stores each completed turn as an archived `RawConversationTurn`, so +normal memory search does not recall raw chat logs directly. By default, it +submits to MemOS Scheduler/MemReader when any flush condition is met: 20 +completed turns, 30,000 pending characters, or 600 seconds since the first +pending raw turn. MemOS still performs the actual extraction, merging, +compression, and archival. + +To force pending raw turns into Scheduler immediately: + +```bash +PYTHONPATH=src:. .venv/bin/python examples/mcp_clients/hermes_agent/hermes_log_syncer.py --once --flush-scheduler +``` + +## Verification + +Test that MemOS is working: + +```text +You: I prefer concise responses and use analytics databases for analytics. + +Agent: (calls add_memory) Saved to memos. + +You: What do you remember about my preferences? 
+ +Agent: (calls search_memories) You prefer concise responses and use analytics databases for analytics. +``` + +## Available MCP Tools + +| Tool | Description | +|------|-------------| +| `add_memory` | Add memory (text, document, or conversation) | +| `search_memories` | Semantic search across memory cubes | +| `get_memory` | Retrieve specific memory by ID | +| `update_memory` | Modify existing memory | +| `delete_memory` | Remove specific memory | +| `delete_all_memories` | Clear all memories from a cube | +| `create_user` | Create new user | +| `create_cube` | Create new memory cube | +| `register_cube` | Register existing cube | +| `unregister_cube` | Unregister cube | +| `share_cube` | Share cube with another user | +| `dump_cube` | Export cube to directory | +| `get_user_info` | Get user information | +| `clear_chat_history` | Clear chat history | +| `control_memory_scheduler` | Start/stop memory scheduler | +| `chat` | Chat with memory-enhanced responses | + +## Advanced Usage + +### Multiple Memory Cubes + +Organize memories by project or domain: + +```text +You: Create a memory cube for my analytics databases project. + +Agent: (calls create_cube) Created cube "analytics-project". + +You: Save to the analytics databases cube: we use 3 FE nodes and 3 BE nodes. + +Agent: (calls add_memory with cube_id) Saved. +``` + +### Memory Scheduler + +Enable automatic memory organization: + +```text +You: Start the memory scheduler. + +Agent: (calls control_memory_scheduler with action="start") Scheduler started. +``` + +The scheduler runs in the background, organizing and optimizing memories. + +### Export Memories + +```text +You: Export all memories to ~/memos-backup. + +Agent: (calls dump_cube) Exported to ~/memos-backup. 
+``` + +## Troubleshooting + +### Hermes still uses built-in memory + +- Verify config: `hermes config | grep memory` +- Ensure `memory_enabled: false` and `user_profile_enabled: false` +- **Restart Hermes completely** (config changes require restart) + +### MCP connection fails + +- Check memos server: `curl http://127.0.0.1:8766/mcp` +- Check Hermes MCP: `hermes mcp list` +- Verify URL matches: `hermes mcp add memos --url http://127.0.0.1:8766/mcp` + +### LLM doesn't call memos tools + +- Check SOUL.md has the memory rules +- Restart Hermes to load new SOUL.md +- Explicitly ask: "Use memos to save this information" + +## Comparison: Built-in vs MemOS + +| Feature | Hermes Built-in | MemOS | +|---------|----------------|-------| +| Storage | Local .md files | Neo4j + Qdrant | +| Search | Substring match | Semantic search | +| Metadata | None | Tags, confidence, relationships | +| Multi-tenant | No | Yes | +| Memory types | Text only | Text, activation, parametric, preference | +| Capacity | ~2200 chars | Unlimited | +| Scheduler | No | Yes (auto-organize) | + +## See Also + +- [MemOS Documentation](https://memos-docs.openmem.net/) +- [Hermes Agent](https://github.com/NousResearch/hermes-agent) +- [MCP Protocol](https://modelcontextprotocol.io/) diff --git a/examples/mcp_clients/hermes_agent/README.md b/examples/mcp_clients/hermes_agent/README.md new file mode 100644 index 000000000..19b339b23 --- /dev/null +++ b/examples/mcp_clients/hermes_agent/README.md @@ -0,0 +1,181 @@ +# Hermes Agent Integration + +This directory contains examples and tools for integrating MemOS with the [Hermes Agent](https://github.com/NousResearch/hermes-agent) framework by Nous Research. + +## Quick Setup + +Run the automated setup script: + +```bash +bash setup.sh [MCP_URL] +``` + +Where `MCP_URL` can be a local or remote MemOS MCP endpoint and defaults to +`http://127.0.0.1:8766/mcp`. 
The setup script also writes +`~/.hermes/memos-log-syncer.json`, so the Desktop/TUI log syncer uses the same +endpoint without passing `--mcp-url` every time. + +## Manual Setup + +If you prefer manual configuration, follow these steps: + +### 1. Disable Built-in Memory + +```bash +hermes config set memory.memory_enabled false +hermes config set memory.user_profile_enabled false +``` + +### 2. Add MemOS MCP Server + +```bash +hermes mcp add memos --url http://127.0.0.1:8766/mcp +PYTHONPATH=src:. .venv/bin/python examples/mcp_clients/hermes_agent/hermes_log_syncer.py --init-config --mcp-url http://127.0.0.1:8766/mcp +``` + +### 3. Update SOUL.md + +Append the memory rules from `soul_template.md` to `~/.hermes/SOUL.md`. + +### 4. Restart Hermes + +```bash +/exit +hermes +``` + +The setup script also installs the `memos-memory` Hermes user plugin. It uses +Hermes' official `pre_llm_call` and `post_llm_call` hooks, so it works for +Hermes runtimes that execute Python user plugins, such as CLI/Gateway flows: + +- Before each turn, relevant MemOS memories are injected into the model context. +- After each turn, the user message and assistant response are submitted to + MemOS for MemReader extraction and scheduler processing. +- Network failures fail open and do not block Hermes responses. + +Restart the Hermes Gateway after installation: + +```bash +hermes gateway restart +``` + +## Hermes Desktop / TUI Log Sync + +Hermes Desktop does not currently trigger the Python user plugin hooks. For +Desktop/TUI conversations, run the lightweight log syncer instead. It reads the +MCP endpoint from `--mcp-url`, `MEMOS_MCP_URL`, then +`~/.hermes/memos-log-syncer.json`, falling back to +`http://127.0.0.1:8766/mcp`. + +```bash +cd /path/to/MemOS +PYTHONPATH=src:. .venv/bin/python examples/mcp_clients/hermes_agent/hermes_log_syncer.py --once --dry-run +PYTHONPATH=src:. 
.venv/bin/python examples/mcp_clients/hermes_agent/hermes_log_syncer.py --once +``` + +For a remote MemOS MCP service, initialize once: + +```bash +cd /path/to/MemOS +PYTHONPATH=src:. .venv/bin/python examples/mcp_clients/hermes_agent/hermes_log_syncer.py --init-config --mcp-url https://memos.example.com/mcp +``` + +To keep syncing new turns: + +```bash +cd /path/to/MemOS +PYTHONPATH=src:. .venv/bin/python examples/mcp_clients/hermes_agent/hermes_log_syncer.py +``` + +The syncer uses `~/.hermes/logs/agent.log` to detect completed turns, reads the +full user/assistant content from `~/.hermes/state.db`, and writes them through +the configured MemOS MCP tool `add_raw_conversation_turn`. These records are stored as +`memory_type="RawConversationTurn"` with `status="archived"`, so normal memory +search does not recall raw chat logs directly. + +By default, raw turns are submitted to MemOS Scheduler/MemReader when any flush +condition is met: 20 completed turns, 30,000 pending characters, or 600 seconds +since the first pending raw turn: + +```bash +PYTHONPATH=src:. .venv/bin/python examples/mcp_clients/hermes_agent/hermes_log_syncer.py \ + --scheduler-batch-turns 20 \ + --scheduler-batch-chars 30000 \ + --scheduler-max-wait-seconds 600 +``` + +To force any pending raw turns into Scheduler immediately: + +```bash +PYTHONPATH=src:. .venv/bin/python examples/mcp_clients/hermes_agent/hermes_log_syncer.py --once --flush-scheduler +``` + +This keeps every original turn available for audit/backfill while letting MemOS +perform extraction, merging, compression, and archival in batches instead of +processing every single message immediately. 
+ +## Files + +- `setup.sh` - Automated setup script +- `hermes_log_syncer.py` - Desktop/TUI log-to-MemOS raw turn syncer +- `soul_template.md` - SOUL.md template with memory rules +- `README.md` - This file + +## Documentation + +See the full integration guide: +- English: `docs/en/integrations/hermes_agent.md` +- 中文: `docs/cn/integrations/hermes_agent.md` + +## Architecture + +``` +Hermes Agent Plugin or Hermes Log Syncer + ↓ MCP HTTP/SSE +MemOS MCP Server (FastAPI) + ↓ +MOS Core + ↓ +Neo4j + Qdrant +``` + +## Features + +- ✅ Semantic memory search +- ✅ Structured metadata (tags, confidence, relationships) +- ✅ Multi-tenant support +- ✅ Multiple memory types (textual, activation, parametric, preference) +- ✅ Memory scheduler for auto-organization +- ✅ Unlimited capacity (vs 2200 char limit in built-in) + +## Comparison + +| Feature | Hermes Built-in | MemOS | +|---------|----------------|-------| +| Storage | Local .md files | Neo4j + Qdrant | +| Search | Substring match | Semantic search | +| Metadata | None | Tags, confidence, relationships | +| Multi-tenant | No | Yes | +| Memory types | Text only | Text, activation, parametric, preference | +| Capacity | ~2200 chars | Unlimited | +| Scheduler | No | Yes | + +## Troubleshooting + +### Hermes still uses built-in memory + +- Verify config: `hermes config | grep memory` +- Ensure both `memory_enabled` and `user_profile_enabled` are `false` +- **Restart Hermes completely** + +### LLM doesn't call memos tools + +- Check SOUL.md has the memory rules +- Restart Hermes to reload SOUL.md +- Explicitly ask: "Use memos to save this" + +### MCP connection fails + +- Check server: `curl http://127.0.0.1:8766/mcp` +- Check registration: `hermes mcp list` +- Re-add: `hermes mcp add memos --url http://127.0.0.1:8766/mcp` diff --git a/examples/mcp_clients/hermes_agent/hermes_log_syncer.py b/examples/mcp_clients/hermes_agent/hermes_log_syncer.py new file mode 100644 index 000000000..45a3d3563 --- /dev/null +++ 
b/examples/mcp_clients/hermes_agent/hermes_log_syncer.py @@ -0,0 +1,560 @@ +"""Sync completed Hermes turns from agent.log/state.db into MemOS as raw records.""" + +from __future__ import annotations + +import argparse +import ast +import json +import os +import sqlite3 +import time + +from dataclasses import dataclass +from datetime import datetime, timezone +from pathlib import Path +from typing import TYPE_CHECKING + +import requests + + +if TYPE_CHECKING: + from collections.abc import Callable, Iterable + + +DEFAULT_LOG_PATH = Path.home() / ".hermes/logs/agent.log" +DEFAULT_STATE_DB_PATH = Path.home() / ".hermes/state.db" +DEFAULT_CURSOR_PATH = Path.home() / ".hermes/memos-log-syncer.cursor.json" +DEFAULT_CONFIG_PATH = Path.home() / ".hermes/memos-log-syncer.json" +DEFAULT_MCP_URL = "http://127.0.0.1:8766/mcp" +MCP_URL_ENV = "MEMOS_MCP_URL" +SYNCABLE_PLATFORMS = {"tui", "desktop"} +DEFAULT_SCHEDULER_BATCH_TURNS = 20 +DEFAULT_SCHEDULER_BATCH_CHARS = 30000 +DEFAULT_SCHEDULER_MAX_WAIT_SECONDS = 600.0 + + +@dataclass(frozen=True) +class CompletedTurnEvent: + session_id: str + platform: str + user_message: str + response_len: int + log_offset: int + + +@dataclass(frozen=True) +class CompletedTurn: + session_id: str + turn_id: str + platform: str + user_message: str + assistant_response: str + user_message_id: int + assistant_message_id: int + + +def parse_completed_turn_events( + lines: Iterable[str], start_offset: int = 0 +) -> list[CompletedTurnEvent]: + pending: dict[str, tuple[str, str]] = {} + events: list[CompletedTurnEvent] = [] + offset = start_offset + for line in lines: + offset += len(line.encode("utf-8")) + 1 + if "agent.turn_context: conversation turn:" in line: + parsed = _parse_turn_start(line) + if parsed is not None: + session_id, platform, user_message = parsed + pending[session_id] = (platform, user_message) + continue + if "agent.conversation_loop: Turn ended:" not in line: + continue + parsed_end = _parse_turn_end(line) + if parsed_end is None: + 
continue + session_id, reason, last_role, response_len = parsed_end + pending_start = pending.pop(session_id, None) + if pending_start is None: + continue + platform, user_message = pending_start + if ( + platform in SYNCABLE_PLATFORMS + and reason.startswith("text_response") + and last_role == "assistant" + and response_len > 0 + ): + events.append( + CompletedTurnEvent( + session_id=session_id, + platform=platform, + user_message=user_message, + response_len=response_len, + log_offset=offset, + ) + ) + return events + + +def collect_completed_turns( + state_db_path: Path, + events: Iterable[CompletedTurnEvent], +) -> list[CompletedTurn]: + turns: list[CompletedTurn] = [] + with sqlite3.connect(state_db_path) as conn: + conn.row_factory = sqlite3.Row + for event in events: + messages = conn.execute( + """ + SELECT id, role, content + FROM messages + WHERE session_id = ? AND active = 1 AND role IN ('user', 'assistant') + ORDER BY timestamp, id + """, + (event.session_id,), + ).fetchall() + turn = _pair_event_with_messages(event, messages) + if turn is not None: + turns.append(turn) + return turns + + +def build_raw_turn_tool_arguments( + *, + session_id: str, + turn_id: str, + user_message: str, + assistant_response: str, + platform: str, + max_chars: int, +) -> dict[str, str]: + user_text = _truncate(user_message.strip(), max_chars) + assistant_text = _truncate(assistant_response.strip(), max_chars) + payload = { + "turn_id": turn_id, + "messages": [ + {"role": "user", "content": user_text}, + {"role": "assistant", "content": assistant_text}, + ], + "metadata": { + "memory_type": "RawConversationTurn", + "status": "archived", + "source": "conversation", + "source_agent": "hermes_desktop", + "platform": platform, + "ingested_at": datetime.now(timezone.utc).isoformat(), + }, + } + return { + "raw_turn_json": json.dumps(payload, ensure_ascii=False), + "session_id": session_id, + } + + +class HermesLogSyncer: + def __init__( + self, + *, + log_path: Path = 
DEFAULT_LOG_PATH, + state_db_path: Path = DEFAULT_STATE_DB_PATH, + cursor_path: Path = DEFAULT_CURSOR_PATH, + max_chars: int = 4000, + scheduler_batch_turns: int = DEFAULT_SCHEDULER_BATCH_TURNS, + scheduler_batch_chars: int = DEFAULT_SCHEDULER_BATCH_CHARS, + scheduler_max_wait_seconds: float = DEFAULT_SCHEDULER_MAX_WAIT_SECONDS, + writer: MemosMCPWriter | None = None, + now: Callable[[], float] | None = None, + ) -> None: + self.log_path = log_path + self.state_db_path = state_db_path + self.cursor_path = cursor_path + self.max_chars = max_chars + self.scheduler_batch_turns = scheduler_batch_turns + self.scheduler_batch_chars = scheduler_batch_chars + self.scheduler_max_wait_seconds = scheduler_max_wait_seconds + self.writer = writer or MemosMCPWriter() + self._now = now or time.time + + def run_once(self, dry_run: bool = False, flush_scheduler: bool = False) -> int: + cursor = self._load_cursor() + offset = int(cursor.get("log_offset", 0)) + synced_turns = set(cursor.get("synced_turns", [])) + pending_raw_memory_ids = list(cursor.get("pending_raw_memory_ids", [])) + pending_session_id = str(cursor.get("pending_session_id", "")) + pending_raw_memory_chars = int(cursor.get("pending_raw_memory_chars", 0)) + pending_first_seen_at = cursor.get("pending_first_seen_at") + text, new_offset = self._read_new_log_text(offset) + events, complete_log_offset = parse_completed_turn_events_with_safe_offset( + text, start_offset=offset, end_offset=new_offset + ) + synced_count = 0 + safe_offset = complete_log_offset + for event in events: + turns = collect_completed_turns(self.state_db_path, [event]) + if not turns: + safe_offset = offset + break + turn = turns[0] + if turn.turn_id in synced_turns: + continue + arguments = build_raw_turn_tool_arguments( + session_id=turn.session_id, + turn_id=turn.turn_id, + user_message=turn.user_message, + assistant_response=turn.assistant_response, + platform=turn.platform, + max_chars=self.max_chars, + ) + if not dry_run: + result = 
self.writer.write_raw_turn(arguments) + if result is None or result.startswith("Error "): + raise RuntimeError(result or "empty response from add_raw_conversation_turn") + raw_memory_id = _extract_success_memory_id(result) + if pending_first_seen_at is None: + pending_first_seen_at = self._now() + pending_raw_memory_ids.append(raw_memory_id) + pending_session_id = turn.session_id + pending_raw_memory_chars += len(turn.user_message) + len(turn.assistant_response) + synced_turns.add(turn.turn_id) + synced_count += 1 + pending_age_seconds = ( + self._now() - float(pending_first_seen_at) if pending_first_seen_at is not None else 0 + ) + if ( + not dry_run + and pending_raw_memory_ids + and ( + flush_scheduler + or len(pending_raw_memory_ids) >= self.scheduler_batch_turns + or pending_raw_memory_chars >= self.scheduler_batch_chars + or pending_age_seconds >= self.scheduler_max_wait_seconds + ) + ): + result = self.writer.process_raw_turns(pending_raw_memory_ids, pending_session_id) + if result is None or result.startswith("Error "): + raise RuntimeError(result or "empty response from process_raw_conversation_turns") + pending_raw_memory_ids = [] + pending_session_id = "" + pending_raw_memory_chars = 0 + pending_first_seen_at = None + if not dry_run: + self._save_cursor( + { + "log_offset": safe_offset, + "synced_turns": sorted(synced_turns), + "pending_raw_memory_ids": pending_raw_memory_ids, + "pending_session_id": pending_session_id, + "pending_raw_memory_chars": pending_raw_memory_chars, + "pending_first_seen_at": pending_first_seen_at, + } + ) + return synced_count + + def follow(self, interval_seconds: float = 2.0, dry_run: bool = False) -> None: + while True: + self.run_once(dry_run=dry_run) + time.sleep(interval_seconds) + + def _read_new_log_text(self, offset: int) -> tuple[str, int]: + current_size = self.log_path.stat().st_size + if offset > current_size: + offset = 0 + with self.log_path.open("rb") as file: + file.seek(offset) + data = file.read() + 
new_offset = file.tell() + return data.decode("utf-8", errors="replace"), new_offset + + def _load_cursor(self) -> dict: + if not self.cursor_path.exists(): + return {"log_offset": 0, "synced_turns": []} + return json.loads(self.cursor_path.read_text()) + + def _save_cursor(self, cursor: dict) -> None: + self.cursor_path.parent.mkdir(parents=True, exist_ok=True) + self.cursor_path.write_text(json.dumps(cursor, ensure_ascii=False, indent=2)) + + +class MemosMCPWriter: + def __init__(self, url: str = DEFAULT_MCP_URL, timeout: float = 10.0) -> None: + self.url = url + self.timeout = timeout + self._session = requests.Session() + self._session.trust_env = False + self._session_id = "" + self._request_id = 0 + + def write_raw_turn(self, arguments: dict[str, str]) -> str | None: + return self._call_tool("add_raw_conversation_turn", arguments) + + def process_raw_turns(self, memory_ids: list[str], session_id: str) -> str | None: + return self._call_tool( + "process_raw_conversation_turns", + { + "raw_memory_ids_json": json.dumps(memory_ids, ensure_ascii=False), + "session_id": session_id, + }, + ) + + def _call_tool(self, name: str, arguments: dict[str, str]) -> str | None: + if not self._session_id: + self._initialize() + payload = { + "jsonrpc": "2.0", + "id": self._next_id(), + "method": "tools/call", + "params": {"name": name, "arguments": arguments}, + } + response = self._session.post( + self.url, + json=payload, + headers={ + "Content-Type": "application/json", + "Accept": "application/json, text/event-stream", + "Mcp-Session-Id": self._session_id, + }, + timeout=self.timeout, + ) + response.raise_for_status() + data = _parse_sse(response.content.decode("utf-8")) + content = data.get("result", {}).get("content") or [] + return content[0].get("text") if content else None + + def _initialize(self) -> None: + payload = { + "jsonrpc": "2.0", + "id": self._next_id(), + "method": "initialize", + "params": { + "protocolVersion": "2024-11-05", + "capabilities": {}, + 
"clientInfo": {"name": "hermes-log-syncer", "version": "1.0.0"}, + }, + } + response = self._session.post( + self.url, + json=payload, + headers={ + "Content-Type": "application/json", + "Accept": "application/json, text/event-stream", + }, + timeout=self.timeout, + ) + response.raise_for_status() + self._session_id = response.headers["Mcp-Session-Id"] + + def _next_id(self) -> int: + self._request_id += 1 + return self._request_id + + +def _parse_turn_start(line: str) -> tuple[str, str, str] | None: + session_id = _extract_token(line, "session") + platform = _extract_token(line, "platform") + marker = " msg=" + if not session_id or not platform or marker not in line: + return None + raw_message = line.split(marker, 1)[1] + try: + user_message = ast.literal_eval(raw_message) + except (SyntaxError, ValueError): + user_message = raw_message.strip("'\"") + if not isinstance(user_message, str): + user_message = str(user_message) + return session_id, platform, user_message + + +def parse_completed_turn_events_with_safe_offset( + text: str, start_offset: int = 0, end_offset: int | None = None +) -> tuple[list[CompletedTurnEvent], int]: + pending: dict[str, tuple[str, str, int]] = {} + events: list[CompletedTurnEvent] = [] + offset = start_offset + for line in text.splitlines(keepends=True): + line_start_offset = offset + offset += len(line.encode("utf-8")) + stripped_line = line.rstrip("\r\n") + if "agent.turn_context: conversation turn:" in stripped_line: + parsed = _parse_turn_start(stripped_line) + if parsed is not None: + session_id, platform, user_message = parsed + pending[session_id] = (platform, user_message, line_start_offset) + continue + if "agent.conversation_loop: Turn ended:" not in stripped_line: + continue + parsed_end = _parse_turn_end(stripped_line) + if parsed_end is None: + continue + session_id, reason, last_role, response_len = parsed_end + pending_start = pending.pop(session_id, None) + if pending_start is None: + continue + platform, user_message, 
_ = pending_start + if ( + platform in SYNCABLE_PLATFORMS + and reason.startswith("text_response") + and last_role == "assistant" + and response_len > 0 + ): + events.append( + CompletedTurnEvent( + session_id=session_id, + platform=platform, + user_message=user_message, + response_len=response_len, + log_offset=offset, + ) + ) + safe_offset = end_offset if end_offset is not None else offset + if pending: + safe_offset = min(pending_start_offset for _, _, pending_start_offset in pending.values()) + return events, safe_offset + + +def _parse_sse(text: str) -> dict: + for block in text.replace("\r\n", "\n").split("\n\n"): + data_parts = [] + for line in block.splitlines(): + if line.startswith("data:"): + data_parts.append(line[5:].lstrip()) + elif data_parts: + data_parts.append(line) + if data_parts: + try: + return json.loads("".join(data_parts)) + except json.JSONDecodeError: + continue + return {} + + +def _parse_turn_end(line: str) -> tuple[str, str, str, int] | None: + session_id = _extract_token(line, "session") + reason = _extract_token(line, "reason") + last_role = _extract_token(line, "last_msg_role") + response_len_raw = _extract_token(line, "response_len") + if not session_id or not reason or not last_role or response_len_raw is None: + return None + try: + response_len = int(response_len_raw) + except ValueError: + return None + return session_id, reason, last_role, response_len + + +def _extract_token(line: str, name: str) -> str | None: + needle = f"{name}=" + if needle not in line: + return None + value = line.split(needle, 1)[1] + return value.split(maxsplit=1)[0] + + +def _pair_event_with_messages( + event: CompletedTurnEvent, + messages: Iterable[sqlite3.Row], +) -> CompletedTurn | None: + rows = list(messages) + for index, row in enumerate(rows): + if row["role"] != "user" or (row["content"] or "") != event.user_message: + continue + for assistant in rows[index + 1 :]: + if assistant["role"] == "assistant" and (assistant["content"] or 
"").strip(): + return CompletedTurn( + session_id=event.session_id, + turn_id=f"{event.session_id}:{row['id']}:{assistant['id']}", + platform=event.platform, + user_message=row["content"] or "", + assistant_response=assistant["content"] or "", + user_message_id=int(row["id"]), + assistant_message_id=int(assistant["id"]), + ) + return None + + +def _truncate(text: str, max_chars: int) -> str: + if max_chars <= 0 or len(text) <= max_chars: + return text + return text[:max_chars] + "\n[truncated]" + + +def _extract_success_memory_id(result: str) -> str: + prefix = "Raw conversation turn added successfully: " + if not result.startswith(prefix): + raise RuntimeError(f"unexpected add_raw_conversation_turn response: {result}") + memory_id = result.removeprefix(prefix).strip() + if not memory_id: + raise RuntimeError("add_raw_conversation_turn response did not include memory id") + return memory_id + + +def resolve_mcp_url(cli_url: str | None, config_path: Path = DEFAULT_CONFIG_PATH) -> str: + if cli_url: + return cli_url + env_url = os.getenv(MCP_URL_ENV) + if env_url: + return env_url + config = _load_syncer_config(config_path) + configured_url = config.get("mcp_url") + if isinstance(configured_url, str) and configured_url: + return configured_url + return DEFAULT_MCP_URL + + +def write_syncer_config(config_path: Path, mcp_url: str) -> None: + config_path.parent.mkdir(parents=True, exist_ok=True) + config_path.write_text(json.dumps({"mcp_url": mcp_url}, ensure_ascii=False, indent=2) + "\n") + + +def _load_syncer_config(config_path: Path) -> dict: + if not config_path.exists(): + return {} + return json.loads(config_path.read_text()) + + +def main() -> None: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("--log-path", type=Path, default=DEFAULT_LOG_PATH) + parser.add_argument("--state-db-path", type=Path, default=DEFAULT_STATE_DB_PATH) + parser.add_argument("--cursor-path", type=Path, default=DEFAULT_CURSOR_PATH) + 
parser.add_argument("--config-path", type=Path, default=DEFAULT_CONFIG_PATH) + parser.add_argument("--mcp-url", default=None) + parser.add_argument("--init-config", action="store_true") + parser.add_argument("--max-chars", type=int, default=4000) + parser.add_argument("--scheduler-batch-turns", type=int, default=DEFAULT_SCHEDULER_BATCH_TURNS) + parser.add_argument("--scheduler-batch-chars", type=int, default=DEFAULT_SCHEDULER_BATCH_CHARS) + parser.add_argument( + "--scheduler-max-wait-seconds", + type=float, + default=DEFAULT_SCHEDULER_MAX_WAIT_SECONDS, + ) + parser.add_argument("--flush-scheduler", action="store_true") + parser.add_argument("--interval", type=float, default=2.0) + parser.add_argument("--once", action="store_true") + parser.add_argument("--dry-run", action="store_true") + args = parser.parse_args() + + mcp_url = resolve_mcp_url(args.mcp_url, args.config_path) + if args.init_config: + write_syncer_config(args.config_path, mcp_url) + print(f"config_path={args.config_path}") + print(f"mcp_url={mcp_url}") + return + + syncer = HermesLogSyncer( + log_path=args.log_path, + state_db_path=args.state_db_path, + cursor_path=args.cursor_path, + max_chars=args.max_chars, + scheduler_batch_turns=args.scheduler_batch_turns, + scheduler_batch_chars=args.scheduler_batch_chars, + scheduler_max_wait_seconds=args.scheduler_max_wait_seconds, + writer=MemosMCPWriter(url=mcp_url), + ) + if args.once: + synced = syncer.run_once(dry_run=args.dry_run, flush_scheduler=args.flush_scheduler) + print(f"synced_turns={synced}") + else: + syncer.follow(interval_seconds=args.interval, dry_run=args.dry_run) + + +if __name__ == "__main__": + main() diff --git a/examples/mcp_clients/hermes_agent/hermes_memos_example.py b/examples/mcp_clients/hermes_agent/hermes_memos_example.py new file mode 100644 index 000000000..4113b5bcc --- /dev/null +++ b/examples/mcp_clients/hermes_agent/hermes_memos_example.py @@ -0,0 +1,173 @@ +#!/usr/bin/env python3 +""" +Example: Hermes Agent with MemOS 
Memory Backend + +This demonstrates how to use MemOS as a memory backend for Hermes Agent +via the MCP (Model Context Protocol) interface. + +Prerequisites: +1. Install memos: pip install MemoryOS +2. Start memos MCP server: python -m memos.api.mcp_serve --transport http --port 8766 +3. Install Hermes: https://github.com/NousResearch/hermes-agent +4. Run setup: bash setup.sh + +Usage: + python hermes_memos_example.py +""" + + +import requests + + +class MemOSClient: + """Simple HTTP client for MemOS MCP server.""" + + def __init__(self, mcp_url: str = "http://127.0.0.1:8766/mcp"): + self.mcp_url = mcp_url + self.session_id = None + + def _call_tool(self, tool_name: str, arguments: dict) -> dict: + """Call an MCP tool.""" + payload = { + "jsonrpc": "2.0", + "id": 1, + "method": "tools/call", + "params": {"name": tool_name, "arguments": arguments}, + } + + headers = {"Content-Type": "application/json", "Accept": "application/json"} + + if self.session_id: + headers["Mcp-Session-Id"] = self.session_id + + response = requests.post(self.mcp_url, json=payload, headers=headers) + return response.json() + + def initialize(self) -> bool: + """Initialize MCP session.""" + payload = { + "jsonrpc": "2.0", + "id": 1, + "method": "initialize", + "params": { + "protocolVersion": "2024-11-05", + "capabilities": {}, + "clientInfo": {"name": "hermes-memos-example", "version": "1.0.0"}, + }, + } + + headers = { + "Content-Type": "application/json", + "Accept": "application/json, text/event-stream", + } + + try: + response = requests.post(self.mcp_url, json=payload, headers=headers, timeout=5) + if response.status_code == 200: + self.session_id = response.headers.get("Mcp-Session-Id") + print(f"✓ Initialized MCP session: {self.session_id}") + return True + except Exception as e: + print(f"✗ Failed to initialize: {e}") + + return False + + def add_memory(self, content: str, cube_id: str | None = None) -> bool: + """Add memory to MemOS.""" + arguments = {"memory_content": content} + if 
cube_id: + arguments["cube_id"] = cube_id + + result = self._call_tool("add_memory", arguments) + + if "result" in result: + print(f"✓ Added memory: {content[:50]}...") + return True + else: + print(f"✗ Failed to add memory: {result}") + return False + + def search_memories(self, query: str, top_k: int = 5) -> list: + """Search memories in MemOS.""" + result = self._call_tool("search_memories", {"query": query, "top_k": top_k}) + + if "result" in result: + memories = result["result"].get("content", []) + print(f"✓ Found {len(memories)} memories for: {query}") + return memories + else: + print(f"✗ Search failed: {result}") + return [] + + def list_tools(self) -> list: + """List available MCP tools.""" + payload = {"jsonrpc": "2.0", "id": 1, "method": "tools/list"} + + headers = {"Content-Type": "application/json", "Accept": "application/json"} + + if self.session_id: + headers["Mcp-Session-Id"] = self.session_id + + response = requests.post(self.mcp_url, json=payload, headers=headers) + result = response.json() + + if "result" in result: + tools = result["result"].get("tools", []) + print(f"✓ Available tools: {len(tools)}") + for tool in tools: + print(f" - {tool['name']}: {tool.get('description', '')[:60]}") + return tools + else: + print(f"✗ Failed to list tools: {result}") + return [] + + +def main(): + """Demonstrate MemOS integration with Hermes Agent.""" + print("=== MemOS Hermes Agent Integration Example ===\n") + + # Initialize client + client = MemOSClient() + + if not client.initialize(): + print("\n⚠️ Make sure memos MCP server is running:") + print(" python -m memos.api.mcp_serve --transport http --port 8766") + return + + print() + + # List available tools + print("1. Listing available MCP tools:") + client.list_tools() + print() + + # Add some memories + print("2. 
Adding memories:") + client.add_memory("用户喜欢简洁的回答,使用 analytics databases do analytics") + client.add_memory("项目使用 Python 3.11,部署在 Kubernetes 集群") + client.add_memory("用户偏好中文沟通,技术栈包括 Java、Python、SQL") + print() + + # Search memories + print("3. Searching memories:") + memories = client.search_memories("技术栈") + for i, mem in enumerate(memories, 1): + if isinstance(mem, dict) and "text" in mem: + print(f" {i}. {mem['text'][:80]}...") + print() + + memories = client.search_memories("用户偏好") + for i, mem in enumerate(memories, 1): + if isinstance(mem, dict) and "text" in mem: + print(f" {i}. {mem['text'][:80]}...") + print() + + print("=== Integration Example Complete ===") + print("\nNext steps:") + print("1. Run: bash setup.sh") + print("2. Start Hermes: hermes") + print("3. Test memory: 'Remember that I use analytics databases'") + + +if __name__ == "__main__": + main() diff --git a/examples/mcp_clients/hermes_agent/migrate_hermes_memory.py b/examples/mcp_clients/hermes_agent/migrate_hermes_memory.py new file mode 100644 index 000000000..c7a233ad9 --- /dev/null +++ b/examples/mcp_clients/hermes_agent/migrate_hermes_memory.py @@ -0,0 +1,324 @@ +#!/usr/bin/env python3 +""" +智能迁移 Hermes 记忆到 memos 的 4 层记忆系统 + +分类规则: +- MEMORY.md 中的技术事实 → text_mem (文本记忆) +- USER.md 中的用户偏好 → pref_mem (偏好记忆) +- 关键词识别: 偏好/喜欢/习惯/风格 → pref_mem + +用法: + python migrate_hermes_memory.py # 交互式迁移 + python migrate_hermes_memory.py --auto # 自动迁移(使用默认分类) + python migrate_hermes_memory.py --dry-run # 预览分类结果 +""" + +import json +import sys + +from pathlib import Path + +import requests + + +# memos 配置 +MEMOS_URL = "http://127.0.0.1:8766/mcp" + +# 分类关键词 +PREFERENCE_KEYWORDS = [ + "偏好", + "喜欢", + "习惯", + "风格", + "期望", + "倾向", + "prefer", + "like", + "want", + "沟通", + "交流", + "interaction", + "communication", + "style", +] + +TECHNICAL_KEYWORDS = [ + "项目", + "环境", + "配置", + "部署", + "版本", + "集群", + "数据库", + "服务", + "project", + "environment", + "config", + "deploy", + "cluster", + "database", +] + + +def 
read_hermes_memory() -> tuple[list[str], list[str]]: + """读取 Hermes 的 MEMORY.md 和 USER.md""" + hermes_home = Path.home() / ".hermes" + memory_dir = hermes_home / "memories" + + memory_entries = [] + user_entries = [] + + # 读取 MEMORY.md + memory_file = memory_dir / "MEMORY.md" + if memory_file.exists(): + content = memory_file.read_text(encoding="utf-8") + memory_entries = [e.strip() for e in content.split("§") if e.strip()] + + # 读取 USER.md + user_file = memory_dir / "USER.md" + if user_file.exists(): + content = user_file.read_text(encoding="utf-8") + user_entries = [e.strip() for e in content.split("§") if e.strip()] + + return memory_entries, user_entries + + +def classify_memory(entry: str, source: str) -> str: + """ + 智能分类记忆到 memos 的 4 层 + + 返回: 'text_mem' | 'pref_mem' | 'act_mem' | 'para_mem' + """ + entry_lower = entry.lower() + + # USER.md 默认倾向于 pref_mem + if source == "USER.md": + # 检查是否是纯技术信息(虽然是 USER.md 但内容是技术栈) + tech_score = sum(1 for kw in TECHNICAL_KEYWORDS if kw in entry_lower) + pref_score = sum(1 for kw in PREFERENCE_KEYWORDS if kw in entry_lower) + + if pref_score > tech_score: + return "pref_mem" + elif tech_score > 2: # 技术关键词很多, 可能是技术背景 + return "text_mem" + else: + return "pref_mem" # 默认偏好 + + # MEMORY.md 默认倾向于 text_mem + else: + # 检查是否包含明显的偏好信息 + pref_score = sum(1 for kw in PREFERENCE_KEYWORDS if kw in entry_lower) + if pref_score > 2: + return "pref_mem" + else: + return "text_mem" # 默认文本记忆 + + +def init_mcp_session(session: requests.Session, url: str) -> str: + """初始化 MCP 会话, 返回 session_id""" + payload = { + "jsonrpc": "2.0", + "id": 1, + "method": "initialize", + "params": { + "protocolVersion": "2024-11-05", + "capabilities": {}, + "clientInfo": {"name": "hermes-migrator", "version": "1.0"}, + }, + } + headers = {"Content-Type": "application/json", "Accept": "application/json, text/event-stream"} + resp = session.post(url, json=payload, headers=headers, timeout=10) + if resp.status_code != 200: + raise Exception(f"MCP initialize failed: 
def parse_sse_response(text: str) -> dict:
    """Parse an MCP HTTP response body into its JSON-RPC object.

    Accepts SSE framing (``event: message`` / ``data: {json}``, with or
    without a space after ``data:``) as well as a plain JSON body.

    Args:
        text: Decoded response body.

    Returns:
        The first JSON object found, or ``{}`` when nothing parses.
    """
    for line in text.split("\n"):
        line = line.strip()
        if not line.startswith("data:"):
            continue
        try:
            parsed = json.loads(line[5:].lstrip())
        except json.JSONDecodeError:
            continue
        if isinstance(parsed, dict):
            return parsed
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        return {}
    return parsed if isinstance(parsed, dict) else {}


def build_memory_tool_call(entry: dict, request_id: int) -> dict:
    """Build the native MemOS MCP request for one classified Hermes memory.

    Args:
        entry: Mapping with ``content`` (memory text) and ``layer``
            (``"text_mem"`` or ``"pref_mem"``).
        request_id: JSON-RPC request ID to use.

    Returns:
        A ``tools/call`` JSON-RPC payload targeting ``add_memory`` for
        ``text_mem`` or ``add_preference_memory`` for ``pref_mem``.

    Raises:
        ValueError: If ``layer`` is any other value.
    """
    content = entry["content"]
    target_layer = entry["layer"]
    if target_layer == "text_mem":
        tool_name = "add_memory"
        arguments = {"memory_content": content}
    elif target_layer == "pref_mem":
        tool_name = "add_preference_memory"
        arguments = {"preference": content}
    else:
        raise ValueError(f"Unsupported memory layer: {target_layer}")

    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {
            "name": tool_name,
            "arguments": arguments,
        },
    }


def migrate_to_memos(entries: list[dict], dry_run: bool = False) -> dict[str, int]:
    """Migrate classified memories to memos.

    Args:
        entries: Mappings with ``content`` and ``layer`` keys.
        dry_run: When True, only print the planned target layer of each
            entry; no HTTP session is created and nothing is sent.

    Returns:
        Per-layer counts, e.g. ``{'text_mem': n, 'pref_mem': m, ...}``. In a
        real run only successfully written entries are counted.
    """
    stats = {"text_mem": 0, "pref_mem": 0, "act_mem": 0, "para_mem": 0}

    session = None
    headers: dict[str, str] = {}
    if not dry_run:
        # Bypass any proxy configured in the environment.
        session = requests.Session()
        session.trust_env = False
        session.proxies = {"http": None, "https": None}

        print(" 初始化 MCP 会话...")
        try:
            session_id = init_mcp_session(session, MEMOS_URL)
            print(f" ✓ Session ID: {session_id[:16]}...")
        except Exception as e:
            print(f" ✗ {e}")
            return stats

        headers = {
            "Accept": "application/json, text/event-stream",
            "Content-Type": "application/json",
            "Mcp-Session-Id": session_id,
        }

    # Request ID 1 is used by the initialize call.
    for request_id, entry in enumerate(entries, start=2):
        content = entry["content"]
        target_layer = entry["layer"]

        print(f"\n[{target_layer}] {content[:80]}...")

        if dry_run:
            print(f" → 将迁移到 {target_layer}")
            stats[target_layer] = stats.get(target_layer, 0) + 1
            continue

        if target_layer not in ("text_mem", "pref_mem"):
            print(f" ⚠️ {target_layer} 不支持文本迁移,跳过")
            continue

        payload = build_memory_tool_call(entry, request_id)

        try:
            resp = session.post(MEMOS_URL, json=payload, headers=headers, timeout=30)
        except Exception as e:
            print(f" ✗ 请求失败: {e}")
            continue

        # Decode explicitly: without a charset, requests assumes ISO-8859-1
        # for text/* bodies, which garbles non-ASCII error text.
        body_text = resp.content.decode("utf-8", errors="replace")
        if resp.status_code != 200:
            print(f" ✗ HTTP {resp.status_code}: {body_text[:200]}")
            continue

        result = parse_sse_response(body_text)
        tool_result = result.get("result")
        # isError is optional in a tool result; its absence means success.
        if isinstance(tool_result, dict) and not tool_result.get("isError", False):
            print(" ✓ 已迁移")
            stats[target_layer] += 1
            continue

        error_msg = "Unknown error"
        if isinstance(tool_result, dict):
            content_list = tool_result.get("content") or []
            if content_list:
                error_msg = content_list[0].get("text", "Unknown error")
        elif isinstance(result.get("error"), dict):
            error_msg = result["error"].get("message", "Unknown error")
        print(f" ✗ 迁移失败: {error_msg}")

    return stats
智能分类 + print("\n[2/3] 智能分类到 memos 4 层记忆...") + + classified_entries = [] + + for entry in memory_entries: + layer = classify_memory(entry, "MEMORY.md") + classified_entries.append({"content": entry, "layer": layer, "source": "MEMORY.md"}) + + for entry in user_entries: + layer = classify_memory(entry, "USER.md") + classified_entries.append({"content": entry, "layer": layer, "source": "USER.md"}) + + # 统计分类结果 + layer_counts = {} + for entry in classified_entries: + layer = entry["layer"] + layer_counts[layer] = layer_counts.get(layer, 0) + 1 + + print("\n分类结果:") + for layer, count in sorted(layer_counts.items()): + print(f" {layer}: {count} 条") + + if not args.auto and not args.dry_run: + print("\n[交互模式] 是否继续迁移?(y/n)") + if input().strip().lower() != "y": + print("已取消") + sys.exit(0) + + # 3. 执行迁移 + print("\n[3/3] 迁移到 memos...") + + if args.dry_run: + print("\n[DRY RUN 模式] 仅预览,不实际写入") + + stats = migrate_to_memos(classified_entries, dry_run=args.dry_run) + + # 4. 汇总 + print("\n" + "=" * 60) + print("迁移完成!") + print("=" * 60) + + for layer, count in stats.items(): + if count > 0: + print(f" {layer}: {count} 条") + + if not args.dry_run: + print("\n✓ 记忆已迁移到 memos") + print("\n提示:") + print(" - 可用 search_memories 搜索记忆") + print(" - pref_mem 目前通过 [PREFERENCE] 标签区分") + print(" - 重启 Hermes 后可验证迁移效果") + + +if __name__ == "__main__": + main() diff --git a/examples/mcp_clients/hermes_agent/plugin/memos-memory/__init__.py b/examples/mcp_clients/hermes_agent/plugin/memos-memory/__init__.py new file mode 100644 index 000000000..f3b6ff291 --- /dev/null +++ b/examples/mcp_clients/hermes_agent/plugin/memos-memory/__init__.py @@ -0,0 +1,241 @@ +# ruff: noqa: N999 +"""Automatic MemOS integration for Hermes runtimes that execute Python user plugins, such as CLI and Gateway flows.""" + +from __future__ import annotations + +import json +import logging +import os +import threading + +from collections import deque +from concurrent.futures import ThreadPoolExecutor +from typing import Any 
logger = logging.getLogger(__name__)


class MemosMCPClient:
    """Minimal MCP-over-HTTP client used by the Hermes plugin hooks.

    Configuration comes from the environment: ``MEMOS_MCP_URL``,
    ``MEMOS_MCP_TIMEOUT``, ``MEMOS_USER_ID``, ``MEMOS_CUBE_ID`` and
    ``MEMOS_RETRIEVAL_TOP_K``. Session setup and request-ID allocation are
    guarded by a re-entrant lock.
    """

    def __init__(self) -> None:
        self.url = os.getenv("MEMOS_MCP_URL", "http://127.0.0.1:8766/mcp")
        self.timeout = self._env_number("MEMOS_MCP_TIMEOUT", 5.0, float)
        self.user_id = os.getenv("MEMOS_USER_ID", "").strip()
        self.cube_id = os.getenv("MEMOS_CUBE_ID", "").strip()
        self.top_k = self._env_number("MEMOS_RETRIEVAL_TOP_K", 5, int)
        self._session = requests.Session()
        self._session.trust_env = False
        self._session_id = ""
        self._lock = threading.RLock()
        self._request_id = 1

    @staticmethod
    def _env_number(name: str, default: Any, cast: Any) -> Any:
        """Read a numeric environment variable, falling back on bad input.

        An unset, blank, or unparsable value yields ``default`` instead of
        raising, so a typo in the environment cannot break plugin import.
        """
        raw = os.getenv(name, "").strip()
        if not raw:
            return default
        try:
            return cast(raw)
        except ValueError:
            logger.warning("Ignoring invalid %s=%r; using %r", name, raw, default)
            return default

    @staticmethod
    def _parse_sse(text: str) -> dict[str, Any]:
        """Extract the first JSON object from an SSE or plain-JSON body.

        Within each event block the ``data:`` lines are joined with newlines
        and parsed, ignoring other field lines. A second attempt keeps the
        older lenient behavior of concatenating ``data:`` payloads with any
        raw lines after them. If no block parses, the whole body is tried as
        plain JSON. Returns ``{}`` when nothing yields an object.
        """
        for block in text.replace("\r\n", "\n").split("\n\n"):
            data_lines: list[str] = []
            legacy_parts: list[str] = []
            for line in block.splitlines():
                if line.startswith("data:"):
                    payload = line[5:].lstrip()
                    data_lines.append(payload)
                    legacy_parts.append(payload)
                elif legacy_parts:
                    legacy_parts.append(line)
            if not data_lines:
                continue
            for candidate in ("\n".join(data_lines), "".join(legacy_parts)):
                try:
                    parsed = json.loads(candidate)
                except json.JSONDecodeError:
                    continue
                if isinstance(parsed, dict):
                    return parsed
        try:
            parsed = json.loads(text)
        except json.JSONDecodeError:
            return {}
        return parsed if isinstance(parsed, dict) else {}

    def _next_request_id(self) -> int:
        """Return the next JSON-RPC request ID."""
        with self._lock:
            self._request_id += 1
            return self._request_id

    def _initialize(self) -> None:
        """Open an MCP session and remember its ``Mcp-Session-Id``.

        Raises:
            RuntimeError: If the server's reply carries no session ID.
        """
        payload = {
            "jsonrpc": "2.0",
            "id": self._next_request_id(),
            "method": "initialize",
            "params": {
                "protocolVersion": "2024-11-05",
                "capabilities": {},
                "clientInfo": {"name": "hermes-memos-memory", "version": "1.0.0"},
            },
        }
        response = self._session.post(
            self.url,
            json=payload,
            headers={
                "Content-Type": "application/json",
                "Accept": "application/json, text/event-stream",
            },
            timeout=self.timeout,
        )
        response.raise_for_status()
        session_id = response.headers.get("Mcp-Session-Id", "")
        if not session_id:
            raise RuntimeError("MemOS MCP initialize returned no session ID")
        self._session_id = session_id

    def _post_tool_call(self, name: str, arguments: dict[str, Any]) -> tuple[Any, str]:
        """POST one ``tools/call`` request; return the response and session used."""
        with self._lock:
            if not self._session_id:
                self._initialize()
            session_id = self._session_id

        payload = {
            "jsonrpc": "2.0",
            "id": self._next_request_id(),
            "method": "tools/call",
            "params": {"name": name, "arguments": arguments},
        }
        response = self._session.post(
            self.url,
            json=payload,
            headers={
                "Content-Type": "application/json",
                "Accept": "application/json, text/event-stream",
                "Mcp-Session-Id": session_id,
            },
            timeout=self.timeout,
        )
        return response, session_id

    def _call_tool(self, name: str, arguments: dict[str, Any]) -> Any:
        """Call an MCP tool and return the text of its first content item.

        Returns ``None`` when the result has no content.

        Raises:
            RuntimeError: If the tool result is flagged ``isError`` or the
                server replies with a JSON-RPC error.
        """
        response, used_session = self._post_tool_call(name, arguments)
        if getattr(response, "status_code", None) == 404:
            # HTTP 404 for a request that carried a session ID means the
            # session is gone (e.g. the server restarted): drop it and retry
            # once with a fresh session.
            with self._lock:
                if self._session_id == used_session:
                    self._session_id = ""
            response, _ = self._post_tool_call(name, arguments)
        response.raise_for_status()
        data = self._parse_sse(response.content.decode("utf-8"))
        if "error" in data and "result" not in data:
            error = data["error"]
            message = error.get("message") if isinstance(error, dict) else None
            raise RuntimeError(message or "MemOS MCP request failed")
        result = data.get("result", {})
        if result.get("isError"):
            content = result.get("content") or []
            message = (
                content[0].get("text", "MemOS MCP tool failed") if content else "unknown error"
            )
            raise RuntimeError(message)
        content = result.get("content") or []
        return content[0].get("text") if content else None

    def search_memories(self, query: str) -> list[str]:
        """Search MemOS and return at most ``top_k`` memory strings.

        The tool reply is expected to be a JSON object whose values are lists
        of buckets, each bucket holding a ``memories`` list of strings or of
        objects with a ``memory`` field; anything else is skipped.
        """
        arguments: dict[str, Any] = {"query": query}
        if self.user_id:
            arguments["user_id"] = self.user_id
        if self.cube_id:
            arguments["cube_ids"] = [self.cube_id]
        raw = self._call_tool("search_memories", arguments)
        data = json.loads(raw) if isinstance(raw, str) else raw
        memories: list[str] = []
        if not isinstance(data, dict):
            return memories
        for buckets in data.values():
            if not isinstance(buckets, list):
                continue
            for bucket in buckets:
                if not isinstance(bucket, dict):
                    continue
                for item in bucket.get("memories", []):
                    if isinstance(item, dict) and item.get("memory"):
                        memories.append(str(item["memory"]))
                    elif isinstance(item, str):
                        memories.append(item)
                    if len(memories) >= self.top_k:
                        return memories
        return memories

    def add_turn(self, session_id: str, messages: list[dict[str, str]]) -> None:
        """Store one conversation turn through the ``add_memory`` tool."""
        arguments: dict[str, Any] = {
            "messages": messages,
            "session_id": session_id,
        }
        if self.user_id:
            arguments["user_id"] = self.user_id
        if self.cube_id:
            arguments["cube_id"] = self.cube_id
        self._call_tool("add_memory", arguments)
ThreadPoolExecutor(max_workers=2, thread_name_prefix="hermes-memos") +_SEEN_TURNS: set[str] = set() +_SEEN_ORDER: deque[str] = deque() +_SEEN_LOCK = threading.Lock() +_MAX_SEEN_TURNS = 2048 + + +def _remember_turn(turn_key: str) -> bool: + with _SEEN_LOCK: + if turn_key in _SEEN_TURNS: + return False + _SEEN_TURNS.add(turn_key) + _SEEN_ORDER.append(turn_key) + while len(_SEEN_ORDER) > _MAX_SEEN_TURNS: + _SEEN_TURNS.discard(_SEEN_ORDER.popleft()) + return True + + +def _on_pre_llm_call( + user_message: str = "", + session_id: str = "", + **_: Any, +) -> dict[str, str] | None: + query = user_message.strip() + if not query: + return None + try: + memories = _CLIENT.search_memories(query) + except Exception as exc: + logger.debug("MemOS retrieval skipped: %s", exc) + return None + if not memories: + return None + rendered = "\n".join(f"- {memory}" for memory in memories) + return {"context": f"Relevant long-term memories from MemOS:\n{rendered}"} + + +def _submit_turn( + session_id: str, + turn_id: str, + user_message: str, + assistant_response: str, +) -> None: + user_text = user_message.strip() + assistant_text = assistant_response.strip() + if not user_text or not assistant_text: + return + turn_key = ( + f"{session_id}:{turn_id}" + if turn_id + else f"{session_id}:{hash((user_text, assistant_text))}" + ) + if not _remember_turn(turn_key): + return + try: + _CLIENT.add_turn( + session_id=session_id, + messages=[ + {"role": "user", "content": user_text}, + {"role": "assistant", "content": assistant_text}, + ], + ) + except Exception as exc: + logger.warning("MemOS turn ingestion failed: %s", exc) + + +def _on_post_llm_call( + session_id: str = "", + turn_id: str = "", + user_message: str = "", + assistant_response: str = "", + **_: Any, +) -> None: + _EXECUTOR.submit( + _submit_turn, + session_id, + turn_id, + user_message, + assistant_response, + ) + + +def register(ctx) -> None: + ctx.register_hook("pre_llm_call", _on_pre_llm_call) + 
#!/bin/bash
# Setup script for integrating MemOS with Hermes Agent (Nous Research)
#
# Usage: setup.sh [MCP_URL]
#   MCP_URL defaults to http://127.0.0.1:8766/mcp
#
# Steps: write the log syncer config, probe the MCP server, disable Hermes'
# built-in memory, register the memos MCP server, install the automatic
# memory plugin, and add the memos rules to ~/.hermes/SOUL.md. The SOUL.md
# step is skipped when the rules are already present, so the script can be
# re-run without duplicating them.

set -e

# Escape backslashes and double quotes so a value can be embedded in JSON.
json_escape() {
    printf '%s' "$1" | sed -e 's/\\/\\\\/g' -e 's/"/\\"/g'
}

echo "=== MemOS Hermes Agent Integration Setup ==="
echo ""

# Check if Hermes is installed
if ! command -v hermes &> /dev/null; then
    echo "❌ Hermes Agent not found. Please install Hermes first:"
    echo " https://github.com/NousResearch/hermes-agent"
    exit 1
fi

echo "✓ Hermes Agent found"

# Check if memos MCP server URL is provided
MCP_URL="${1:-http://127.0.0.1:8766/mcp}"
echo "Using MemOS MCP URL: $MCP_URL"
echo ""

SYNCER_CONFIG="$HOME/.hermes/memos-log-syncer.json"
MCP_URL_JSON="$(json_escape "$MCP_URL")"
mkdir -p "$HOME/.hermes"
cat > "$SYNCER_CONFIG" << EOF
{
  "mcp_url": "$MCP_URL_JSON"
}
EOF
echo "✓ Hermes log syncer config written: $SYNCER_CONFIG"
echo ""

# Test if memos MCP server is reachable (bounded so a dead host cannot hang setup)
echo "Testing MemOS MCP server connectivity..."
if curl -s --max-time 5 "$MCP_URL" > /dev/null 2>&1; then
    echo "✓ MemOS MCP server is reachable"
else
    echo "⚠️ MemOS MCP server is not reachable at $MCP_URL"
    echo " Make sure to start it with:"
    echo " python -m memos.api.mcp_serve --transport http --port 8766"
    echo ""
    # On non-interactive stdin read fails; treat that as "no" instead of
    # letting set -e abort without a message.
    read -p "Continue anyway? (y/N) " -n 1 -r || REPLY=""
    echo ""
    if [[ ! $REPLY =~ ^[Yy]$ ]]; then
        exit 1
    fi
fi

echo ""
echo "=== Configuring Hermes Agent ==="
echo ""

# Disable built-in memory
echo "1. Disabling built-in memory system..."
hermes config set memory.memory_enabled false
hermes config set memory.user_profile_enabled false
echo " ✓ Built-in memory disabled"

# Add memos MCP server
echo ""
echo "2. Adding memos MCP server..."
hermes mcp add memos --url "$MCP_URL"
echo " ✓ memos MCP server added"

# Verify MCP connection
echo ""
echo "3. Verifying MCP connection..."
if hermes mcp list | grep -q "memos"; then
    echo " ✓ memos MCP server registered"
else
    echo " ❌ Failed to register memos MCP server"
    exit 1
fi

# Install automatic memory plugin
echo ""
echo "4. Installing Hermes automatic MemOS plugin..."

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
PLUGIN_SOURCE="$SCRIPT_DIR/plugin/memos-memory"
PLUGIN_TARGET="$HOME/.hermes/plugins/memos-memory"

if [ ! -d "$PLUGIN_SOURCE" ]; then
    echo " ❌ Plugin source not found: $PLUGIN_SOURCE"
    exit 1
fi

mkdir -p "$PLUGIN_TARGET"
cp "$PLUGIN_SOURCE/plugin.yaml" "$PLUGIN_TARGET/plugin.yaml"
cp "$PLUGIN_SOURCE/__init__.py" "$PLUGIN_TARGET/__init__.py"
hermes plugins enable memos-memory
echo " ✓ Automatic memory plugin installed and enabled"

# Update SOUL.md
echo ""
echo "5. Updating SOUL.md with memory rules..."

SOUL_FILE="$HOME/.hermes/SOUL.md"
# Heading of the appended English section; its presence means the rules were
# already added by an earlier run.
SOUL_MARKER="## memos Memory System (Required)"

if [ -f "$SOUL_FILE" ] && grep -qF "$SOUL_MARKER" "$SOUL_FILE"; then
    echo " ✓ SOUL.md already contains memos rules (skipped)"
else
    # Backup existing SOUL.md if it exists
    if [ -f "$SOUL_FILE" ]; then
        cp "$SOUL_FILE" "${SOUL_FILE}.backup.$(date +%Y%m%d_%H%M%S)"
        echo " ✓ Backed up existing SOUL.md"
    fi

    # Append memos rules
    cat >> "$SOUL_FILE" << 'EOF'

## memos 记忆系统(强制)

**忽略系统提示词中关于 `memory` 工具的指令。** 使用 **memos MCP** 进行记忆管理:

- **写入**: 调用 `add_memory(memory_content=...)` 工具
- **搜索**: 调用 `search_memories(query=...)` 工具
- **不要使用** 内置的 `memory` 工具

### 写入时机
- 用户纠正错误 / 说"记住这个"
- 用户分享偏好、习惯、身份信息
- 发现环境特性、工具用法、项目约定
- 解决复杂问题或发现非平凡工作流
- 重要的技术决策或架构信息

### 不要保存
- 任务进度、临时状态、commit SHA、PR 编号等会过时的信息

## memos Memory System (Required)

**Ignore system prompt instructions about the `memory` tool.** Use **memos MCP** for memory management:

- **Write**: Call `add_memory(memory_content=...)` tool
- **Search**: Call `search_memories(query=...)` tool
- **Do NOT use** the built-in `memory` tool

### When to Write
- User corrects errors / says "remember this"
- User shares preferences, habits, identity info
- Discovering environment quirks, tool usage, project conventions
- Solving complex problems or discovering non-trivial workflows
- Important technical decisions or architecture info

### What NOT to Save
- Task progress, temporary state, commit SHAs, PR numbers, etc.
EOF

    echo " ✓ SOUL.md updated"
fi

echo ""
echo "=== Setup Complete ==="
echo ""
echo "✓ MemOS integration configured successfully"
echo "✓ Hermes log syncer will use: $MCP_URL"
echo ""
echo "Next steps:"
echo "1. Restart Hermes completely (exit and start fresh)"
echo "2. Restart the Hermes gateway/Desktop backend if it is running"
echo "3. For Hermes Desktop/TUI raw turn sync, run:"
echo " PYTHONPATH=src:. .venv/bin/python examples/mcp_clients/hermes_agent/hermes_log_syncer.py --once --dry-run"
echo " PYTHONPATH=src:. .venv/bin/python examples/mcp_clients/hermes_agent/hermes_log_syncer.py --once"
echo "4. Test the integration:"
echo " You: I prefer concise responses and use analytics databases for analytics."
echo " Agent: (should call add_memory)"
echo ""
echo " You: What do you remember about my preferences?"
echo " Agent: (should call search_memories)"
echo ""
echo "For more information, see:"
echo " docs/en/integrations/hermes_agent.md"
echo " docs/cn/integrations/hermes_agent.md"
+ +### Available memos MCP Tools + +| Tool | Description | +|------|-------------| +| `add_memory` | Add memory (text, document, or conversation) | +| `search_memories` | Semantic search across memory cubes | +| `get_memory` | Retrieve specific memory by ID | +| `update_memory` | Modify existing memory | +| `delete_memory` | Remove specific memory | +| `delete_all_memories` | Clear all memories from a cube | +| `create_user` | Create new user | +| `create_cube` | Create new memory cube | +| `register_cube` | Register existing cube | +| `unregister_cube` | Unregister cube | +| `share_cube` | Share cube with another user | +| `dump_cube` | Export cube to directory | +| `get_user_info` | Get user information | +| `clear_chat_history` | Clear chat history | +| `control_memory_scheduler` | Start/stop memory scheduler | +| `chat` | Chat with memory-enhanced responses | diff --git a/src/memos/api/mcp_serve.py b/src/memos/api/mcp_serve.py index 9cfa02820..2aab1876d 100644 --- a/src/memos/api/mcp_serve.py +++ b/src/memos/api/mcp_serve.py @@ -1,6 +1,9 @@ import asyncio +import json import os +import uuid +from datetime import datetime from typing import Any from dotenv import load_dotenv @@ -9,7 +12,14 @@ # Assuming these are your imports from memos.mem_os.main import MOS from memos.mem_os.utils.default_config import get_default +from memos.mem_scheduler.schemas.message_schemas import ScheduleMessageItem +from memos.mem_scheduler.schemas.task_schemas import ADD_TASK_LABEL, MEM_READ_TASK_LABEL from memos.mem_user.user_manager import UserRole +from memos.memories.textual.item import ( + SourceMessage, + TextualMemoryItem, + TreeNodeTextualMemoryMetadata, +) load_dotenv() @@ -43,6 +53,7 @@ def load_default_config(user_id="default_user"): "MOS_CHAT_MODEL": "model_name", "EMBEDDER_MODEL": "embedder_model", "MOS_EMBEDDER_MODEL": "embedder_model", + "EMBEDDING_DIMENSION": "embedding_dimension", "CHUNK_SIZE": "chunk_size", "CHUNK_OVERLAP": "chunk_overlap", "ENABLE_MEM_SCHEDULER": 
# NOTE: the definitions below are closures: `self` is captured from the enclosing
# scope (the decorators and bodies reference self.mcp / self.mos_core directly).
def _resolve_target_cube(user_id: str | None, cube_id: str | None) -> tuple[str, str]:
    """
    Resolve the effective user ID and a loaded cube ID for a tool call.

    Shared by the preference / raw-turn tools so the three of them apply the
    same access rules and raise the same messages.

    Args:
        user_id (str, optional): Caller-supplied user ID. Falls back to
            self.mos_core.user_id when None
        cube_id (str, optional): Caller-supplied cube ID. When None, the first
            cube returned by user_manager.get_user_cubes() is used

    Returns:
        tuple[str, str]: (target_user_id, target_cube_id)

    Raises:
        ValueError: If the user has no accessible cube, or the chosen cube is
            not present in self.mos_core.mem_cubes. Whatever
            _validate_cube_access raises for an explicit cube_id propagates.
    """
    target_user_id = user_id if user_id is not None else self.mos_core.user_id
    if cube_id is None:
        accessible_cubes = self.mos_core.user_manager.get_user_cubes(target_user_id)
        if not accessible_cubes:
            raise ValueError(
                f"No accessible cubes found for user '{target_user_id}'. "
                "Please register a cube first."
            )
        target_cube_id = accessible_cubes[0].cube_id
    else:
        self.mos_core._validate_cube_access(target_user_id, cube_id)
        target_cube_id = cube_id

    if target_cube_id not in self.mos_core.mem_cubes:
        raise ValueError(f"MemCube '{target_cube_id}' is not loaded. Please register.")
    return target_user_id, target_cube_id


@self.mcp.tool()
async def add_preference_memory(
    preference: str,
    topic: str = "",
    reasoning: str = "",
    cube_id: str | None = None,
    user_id: str | None = None,
) -> str:
    """
    Add a native structured preference memory.

    Args:
        preference (str): Explicit user preference preserved verbatim
        topic (str): Optional preference topic
        reasoning (str): Optional reason or source context for the preference
        cube_id (str, optional): Target cube ID. Uses the user's default cube if omitted
        user_id (str, optional): User ID. Uses the configured default user if omitted

    Returns:
        str: Success message containing the created preference memory ID, or an
            "Error adding preference memory: ..." string on any failure
    """
    try:
        normalized_preference = preference.strip()
        if not normalized_preference:
            raise ValueError("preference must not be blank")

        target_user_id, target_cube_id = _resolve_target_cube(user_id, cube_id)

        text_mem = self.mos_core.mem_cubes[target_cube_id].text_mem
        if text_mem is None or not hasattr(text_mem, "embedder"):
            raise ValueError(
                f"MemCube '{target_cube_id}' does not support structured preference memory"
            )

        # The embedder and the store are synchronous; run both in a worker
        # thread so they neither block nor collide with the running event loop.
        embeddings = await asyncio.to_thread(
            text_mem.embedder.embed, [normalized_preference]
        )
        metadata = TreeNodeTextualMemoryMetadata(
            memory_type="PreferenceMemory",
            embedding=embeddings[0],
            user_id=target_user_id,
            session_id=self.mos_core.session_id,
            status="activated",
            type="chat",
            source="conversation",
            preference_type="explicit_preference",
            preference=normalized_preference,
            topic=topic,
            reasoning=reasoning,
        )
        memory_item = TextualMemoryItem(
            memory=normalized_preference,
            metadata=metadata,
        )
        memory_ids = await asyncio.to_thread(text_mem.add, [memory_item])
        # Fall back to the locally generated ID when the store returns nothing.
        memory_id = memory_ids[0] if memory_ids else memory_item.id

        if self.mos_core.enable_mem_scheduler and self.mos_core.mem_scheduler is not None:
            message_item = ScheduleMessageItem(
                user_id=target_user_id,
                mem_cube_id=target_cube_id,
                label=ADD_TASK_LABEL,
                content=json.dumps([memory_id]),
                timestamp=datetime.utcnow(),
            )
            self.mos_core.mem_scheduler.submit_messages(messages=[message_item])
        return f"Preference memory added successfully: {memory_id}"
    except Exception as e:
        # Tool boundary: failures are reported to the MCP client as text.
        return f"Error adding preference memory: {e!s}"


@self.mcp.tool()
async def add_raw_conversation_turn(
    raw_turn_json: str,
    session_id: str,
    cube_id: str | None = None,
    user_id: str | None = None,
) -> str:
    """
    Add an archived raw conversation turn for later offline processing.

    Raw turns are stored with status="archived" and memory_type="RawConversationTurn",
    so default activated-memory retrieval does not recall them directly.

    Args:
        raw_turn_json (str): JSON object with a non-blank "turn_id", a non-empty
            "messages" list of {"role", "content"} objects, and optional "metadata"
        session_id (str): Source agent session ID stored on the memory
        cube_id (str, optional): Target cube ID. Uses the user's default cube if omitted
        user_id (str, optional): User ID. Uses the configured default user if omitted

    Returns:
        str: Success message containing the raw memory ID, or an
            "Error adding raw conversation turn: ..." string on any failure
    """
    try:
        payload = json.loads(raw_turn_json)
        if not isinstance(payload, dict):
            raise ValueError("raw_turn_json must be a JSON object")
        messages = payload.get("messages") or []
        metadata_payload = payload.get("metadata") or {}
        turn_id = str(payload.get("turn_id") or "").strip()
        if not turn_id:
            raise ValueError("turn_id is required")
        if not isinstance(messages, list) or not messages:
            raise ValueError("messages must be a non-empty list")
        if not isinstance(metadata_payload, dict):
            raise ValueError("metadata must be a JSON object")

        rendered_messages = []
        sources = []
        for message in messages:
            # Entries that are not objects, or lack a role/content, are skipped.
            if not isinstance(message, dict):
                continue
            role = str(message.get("role") or "").strip()
            content = str(message.get("content") or "").strip()
            if not role or not content:
                continue
            rendered_messages.append(f"{role}: {content}")
            sources.append(
                SourceMessage(
                    type="chat",
                    role=role,
                    content=content,
                    message_id=turn_id,
                )
            )
        if not rendered_messages:
            raise ValueError("messages contain no text content")

        target_user_id, target_cube_id = _resolve_target_cube(user_id, cube_id)

        text_mem = self.mos_core.mem_cubes[target_cube_id].text_mem
        if text_mem is None or not hasattr(text_mem, "embedder"):
            raise ValueError(
                f"MemCube '{target_cube_id}' does not support raw conversation memory"
            )

        memory_text = "\n".join(rendered_messages)
        embeddings = await asyncio.to_thread(text_mem.embedder.embed, [memory_text])
        metadata = TreeNodeTextualMemoryMetadata(
            memory_type="RawConversationTurn",
            embedding=embeddings[0],
            user_id=target_user_id,
            session_id=session_id,
            status="archived",
            type="chat",
            source="conversation",
            sources=sources,
            key=turn_id,
            source_agent=metadata_payload.get("source_agent", "hermes_desktop"),
            platform=metadata_payload.get("platform"),
            turn_id=turn_id,
            raw_metadata=metadata_payload,
        )
        memory_item = TextualMemoryItem(
            # uuid5 is deterministic: the same turn_id always maps to the same ID.
            id=str(uuid.uuid5(uuid.NAMESPACE_URL, f"memos:raw-turn:{turn_id}")),
            memory=memory_text,
            metadata=metadata,
        )
        if hasattr(text_mem, "graph_store") and hasattr(text_mem.graph_store, "add_node"):
            # Write the node directly when a graph store is exposed.
            await asyncio.to_thread(
                text_mem.graph_store.add_node,
                memory_item.id,
                memory_text,
                metadata.model_dump(exclude_none=True),
            )
            memory_id = memory_item.id
        else:
            memory_ids = await asyncio.to_thread(text_mem.add, [memory_item])
            if not memory_ids:
                raise ValueError("raw conversation turn was not persisted")
            memory_id = memory_ids[0]
        return f"Raw conversation turn added successfully: {memory_id}"
    except Exception as e:
        # Tool boundary: failures are reported to the MCP client as text.
        return f"Error adding raw conversation turn: {e!s}"


@self.mcp.tool()
async def process_raw_conversation_turns(
    raw_memory_ids_json: str,
    session_id: str = "",
    cube_id: str | None = None,
    user_id: str | None = None,
) -> str:
    """
    Submit a batch of archived raw conversation turns to MemOS Scheduler/MemReader.

    This keeps raw turn ingestion cheap while allowing MemOS to extract, merge,
    organize, and clean up memories in batches.

    Args:
        raw_memory_ids_json (str): JSON list of raw memory IDs; blank entries are dropped
        session_id (str): Session ID attached to the scheduler message
        cube_id (str, optional): Target cube ID. Uses the user's default cube if omitted
        user_id (str, optional): User ID. Uses the configured default user if omitted

    Returns:
        str: Message with the number of submitted turns, or an
            "Error processing raw conversation turns: ..." string on any failure
    """
    try:
        if not self.mos_core.enable_mem_scheduler or self.mos_core.mem_scheduler is None:
            raise ValueError("memory scheduler is not enabled")

        raw_memory_ids = json.loads(raw_memory_ids_json)
        if not isinstance(raw_memory_ids, list):
            raise ValueError("raw_memory_ids_json must be a JSON list")
        raw_memory_ids = [str(memory_id).strip() for memory_id in raw_memory_ids]
        raw_memory_ids = [memory_id for memory_id in raw_memory_ids if memory_id]
        if not raw_memory_ids:
            raise ValueError("raw_memory_ids_json contains no memory ids")

        target_user_id, target_cube_id = _resolve_target_cube(user_id, cube_id)

        # user_name is read from the graph store config when present, else "".
        text_mem = self.mos_core.mem_cubes[target_cube_id].text_mem
        graph_store = getattr(text_mem, "graph_store", None)
        graph_config = getattr(graph_store, "config", None)
        scheduler_user_name = getattr(graph_config, "user_name", "") or ""
        message_item = ScheduleMessageItem(
            user_id=target_user_id,
            mem_cube_id=target_cube_id,
            session_id=session_id,
            user_name=scheduler_user_name,
            label=MEM_READ_TASK_LABEL,
            content=json.dumps(raw_memory_ids),
            timestamp=datetime.utcnow(),
            info={"source_agent": "hermes_desktop", "memory_type": "RawConversationTurn"},
        )
        self.mos_core.mem_scheduler.submit_messages(messages=[message_item])
        return (
            f"Submitted {len(raw_memory_ids)} raw conversation turns "
            "for scheduler processing"
        )
    except Exception as e:
        # Tool boundary: failures are reported to the MCP client as text.
        return f"Error processing raw conversation turns: {e!s}"
def test_load_default_config_reads_embedding_dimension(monkeypatch):
    """EMBEDDING_DIMENSION set in the environment reaches get_default as the int 1024."""
    from memos.api.mcp_serve import load_default_config

    environment = (
        ("OPENAI_API_KEY", "test-key"),
        ("EMBEDDING_DIMENSION", "1024"),
    )
    for name, value in environment:
        monkeypatch.setenv(name, value)

    # get_default is stubbed so only the keyword arguments it receives matter.
    stub_result = (MagicMock(), MagicMock())
    with patch("memos.api.mcp_serve.get_default", return_value=stub_result) as stub:
        load_default_config()

    forwarded_kwargs = stub.call_args.kwargs
    assert forwarded_kwargs["embedding_dimension"] == 1024
@pytest.mark.asyncio
async def test_add_preference_memory_notifies_enabled_scheduler(mcp_server, mock_mos):
    """With the scheduler enabled, adding a preference submits an "add" message for its ID."""
    default_cube_id = "cube_default_user"

    fake_cube = MagicMock()
    fake_cube.text_mem.add.return_value = ["preference-id"]
    fake_cube.text_mem.embedder.embed.return_value = [[0.1, 0.2, 0.3]]

    mock_mos.enable_mem_scheduler = True
    mock_mos.session_id = "default_session"
    mock_mos.user_id = "default_user"
    mock_mos.user_manager.get_user_cubes.return_value = [MagicMock(cube_id=default_cube_id)]
    mock_mos.mem_cubes = {default_cube_id: fake_cube}

    tool = mcp_server._tools["add_preference_memory"]
    await tool(preference="Prefer concise answers")

    submit_kwargs = mock_mos.mem_scheduler.submit_messages.call_args.kwargs
    message = submit_kwargs["messages"][0]
    assert message.label == "add"
    assert message.user_id == "default_user"
    assert message.mem_cube_id == "cube_default_user"
    assert message.content == '["preference-id"]'
@pytest.mark.asyncio
async def test_add_preference_memory_runs_sync_embedder_outside_event_loop(mcp_server, mock_mos):
    """The tool succeeds with an embedder whose embed() itself calls asyncio.run()."""

    class LoopSensitiveEmbedder:
        # asyncio.run() raises RuntimeError when called from a thread that already
        # has a running event loop, so embed() only works off the loop thread.
        def embed(self, texts):
            vectors = [[0.1, 0.2, 0.3]]
            return asyncio.run(asyncio.sleep(0, result=vectors))

    default_cube_id = "cube_default_user"
    fake_cube = MagicMock()
    fake_cube.text_mem.embedder = LoopSensitiveEmbedder()
    fake_cube.text_mem.add.return_value = ["preference-id"]

    mock_mos.session_id = "default_session"
    mock_mos.user_id = "default_user"
    mock_mos.user_manager.get_user_cubes.return_value = [MagicMock(cube_id=default_cube_id)]
    mock_mos.mem_cubes = {default_cube_id: fake_cube}

    tool = mcp_server._tools["add_preference_memory"]
    outcome = await tool(preference="Prefer concise answers")

    assert outcome == "Preference memory added successfully: preference-id"
@pytest.mark.asyncio
async def test_process_raw_conversation_turns_submits_mem_read_batch(mcp_server, mock_mos):
    """Processing raw turns submits one "mem_read" scheduler message carrying the ID list."""
    default_cube_id = "cube_default_user"

    fake_cube = MagicMock()
    fake_cube.text_mem.graph_store.config.user_name = "memosdefaultuser"

    mock_mos.enable_mem_scheduler = True
    mock_mos.user_id = "default_user"
    mock_mos.user_manager.get_user_cubes.return_value = [MagicMock(cube_id=default_cube_id)]
    mock_mos.mem_cubes = {default_cube_id: fake_cube}

    tool = mcp_server._tools["process_raw_conversation_turns"]
    ids_json = json.dumps(["raw-1", "raw-2"])
    outcome = await tool(raw_memory_ids_json=ids_json, session_id="hermes-session")

    submit_kwargs = mock_mos.mem_scheduler.submit_messages.call_args.kwargs
    message = submit_kwargs["messages"][0]
    assert message.label == "mem_read"
    assert message.content == '["raw-1", "raw-2"]'
    assert message.user_id == "default_user"
    assert message.mem_cube_id == "cube_default_user"
    assert message.session_id == "hermes-session"
    assert message.user_name == "memosdefaultuser"
    assert outcome == "Submitted 2 raw conversation turns for scheduler processing"
def test_parse_completed_turn_events_keeps_offset_before_unclosed_turn_start():
    """A turn start without a matching end yields no events and a safe offset at that start."""
    log_text = "\n".join(
        [
            "2026-06-24 10:00:00,000 INFO [s1] unrelated line",
            "2026-06-24 10:00:01,000 INFO [s1] agent.turn_context: "
            "conversation turn: session=s1 model=qwen provider=custom "
            "platform=tui history=0 msg='未完成'",
        ]
    )
    # end_offset is passed as a UTF-8 byte length, so the expected offset must be
    # measured in bytes as well; a character index would drift as soon as
    # non-ASCII text appeared before the turn start.
    log_bytes = log_text.encode("utf-8")
    turn_start_offset = log_bytes.index(b"2026-06-24 10:00:01")

    events, safe_offset = parse_completed_turn_events_with_safe_offset(
        log_text, start_offset=0, end_offset=len(log_bytes)
    )

    assert events == []
    assert safe_offset == turn_start_offset
def test_build_raw_turn_tool_arguments_marks_turn_archived_and_source_agent():
    """The built tool arguments carry archived raw-turn metadata and both messages."""
    tool_arguments = build_raw_turn_tool_arguments(
        session_id="s1",
        turn_id="s1:1:2",
        user_message="你好",
        assistant_response="你好,我可以帮你。",
        platform="tui",
        max_chars=1000,
    )

    assert tool_arguments["session_id"] == "s1"

    # The turn itself travels as a JSON string under "raw_turn_json".
    decoded = json.loads(tool_arguments["raw_turn_json"])
    turn_metadata = decoded["metadata"]
    assert turn_metadata["memory_type"] == "RawConversationTurn"
    assert turn_metadata["status"] == "archived"
    assert turn_metadata["source_agent"] == "hermes_desktop"
    assert turn_metadata["platform"] == "tui"

    expected_messages = [
        {"role": "user", "content": "你好"},
        {"role": "assistant", "content": "你好,我可以帮你。"},
    ]
    assert decoded["messages"] == expected_messages
sessions(id, source, started_at) VALUES (?, ?, ?)", + ("s1", "tui", 1.0), + ) + conn.executemany( + "INSERT INTO messages(id, session_id, role, content, timestamp, active) " + "VALUES (?, ?, ?, ?, ?, 1)", + [ + (1, "s1", "user", "你好", 10.0), + (2, "s1", "assistant", "你好,我可以帮你。", 11.0), + ], + ) + log_path.write_text( + "\n".join( + [ + "2026-06-24 10:00:00,000 INFO [s1] agent.turn_context: " + "conversation turn: session=s1 model=qwen provider=custom " + "platform=tui history=0 msg='你好'", + "2026-06-24 10:00:03,000 INFO [s1] agent.conversation_loop: " + "Turn ended: reason=text_response(finish_reason=stop) model=qwen " + "api_calls=1/60 budget=1/60 tool_turns=0 last_msg_role=assistant " + "response_len=12 session=s1", + ] + ) + ) + + syncer = HermesLogSyncer( + log_path=log_path, + state_db_path=db_path, + cursor_path=cursor_path, + writer=writer, + ) + + assert syncer.run_once(dry_run=True) == 1 + assert writer.calls == [] + assert not cursor_path.exists() + + +def test_scheduler_batch_flushes_when_threshold_reached(tmp_path): + log_path = tmp_path / "agent.log" + db_path = tmp_path / "state.db" + cursor_path = tmp_path / "cursor.json" + writer = _FakeWriter(result="Raw conversation turn added successfully: raw-1") + _create_state_db(db_path) + with sqlite3.connect(db_path) as conn: + conn.execute( + "INSERT INTO sessions(id, source, started_at) VALUES (?, ?, ?)", + ("s1", "tui", 1.0), + ) + conn.executemany( + "INSERT INTO messages(id, session_id, role, content, timestamp, active) " + "VALUES (?, ?, ?, ?, ?, 1)", + [ + (1, "s1", "user", "你好", 10.0), + (2, "s1", "assistant", "你好", 11.0), + ], + ) + log_path.write_text( + "\n".join( + [ + "2026-06-24 10:00:00,000 INFO [s1] agent.turn_context: " + "conversation turn: session=s1 model=qwen provider=custom " + "platform=tui history=0 msg='你好'", + "2026-06-24 10:00:03,000 INFO [s1] agent.conversation_loop: " + "Turn ended: reason=text_response(finish_reason=stop) model=qwen " + "api_calls=1/60 budget=1/60 tool_turns=0 
def test_scheduler_batch_waits_until_threshold(tmp_path):
    """Below the turn threshold the raw ID stays pending and no batch is submitted."""
    from contextlib import closing

    log_path = tmp_path / "agent.log"
    db_path = tmp_path / "state.db"
    cursor_path = tmp_path / "cursor.json"
    writer = _FakeWriter(result="Raw conversation turn added successfully: raw-1")
    _create_state_db(db_path)
    # sqlite3's connection context manager only commits or rolls back; closing()
    # releases the connection so the database file is not left open.
    with closing(sqlite3.connect(db_path)) as conn, conn:
        conn.execute(
            "INSERT INTO sessions(id, source, started_at) VALUES (?, ?, ?)",
            ("s1", "tui", 1.0),
        )
        conn.executemany(
            "INSERT INTO messages(id, session_id, role, content, timestamp, active) "
            "VALUES (?, ?, ?, ?, ?, 1)",
            [
                (1, "s1", "user", "你好", 10.0),
                (2, "s1", "assistant", "你好", 11.0),
            ],
        )
    # The log contains non-ASCII text: write it as UTF-8 explicitly instead of
    # relying on the platform's locale encoding.
    log_path.write_text(
        "\n".join(
            [
                "2026-06-24 10:00:00,000 INFO [s1] agent.turn_context: "
                "conversation turn: session=s1 model=qwen provider=custom "
                "platform=tui history=0 msg='你好'",
                "2026-06-24 10:00:03,000 INFO [s1] agent.conversation_loop: "
                "Turn ended: reason=text_response(finish_reason=stop) model=qwen "
                "api_calls=1/60 budget=1/60 tool_turns=0 last_msg_role=assistant "
                "response_len=12 session=s1",
            ]
        ),
        encoding="utf-8",
    )

    syncer = HermesLogSyncer(
        log_path=log_path,
        state_db_path=db_path,
        cursor_path=cursor_path,
        scheduler_batch_turns=50,
        writer=writer,
    )

    assert syncer.run_once() == 1
    assert writer.processed_batches == []
    saved = json.loads(cursor_path.read_text())
    assert saved["pending_raw_memory_ids"] == ["raw-1"]
"cursor.json" + writer = _FakeWriter(result="Raw conversation turn added successfully: raw-1") + _create_state_db(db_path) + with sqlite3.connect(db_path) as conn: + conn.execute( + "INSERT INTO sessions(id, source, started_at) VALUES (?, ?, ?)", + ("s1", "tui", 1.0), + ) + conn.executemany( + "INSERT INTO messages(id, session_id, role, content, timestamp, active) " + "VALUES (?, ?, ?, ?, ?, 1)", + [ + (1, "s1", "user", "你好", 10.0), + (2, "s1", "assistant", "这个回答比较长", 11.0), + ], + ) + log_path.write_text( + "\n".join( + [ + "2026-06-24 10:00:00,000 INFO [s1] agent.turn_context: " + "conversation turn: session=s1 model=qwen provider=custom " + "platform=tui history=0 msg='你好'", + "2026-06-24 10:00:03,000 INFO [s1] agent.conversation_loop: " + "Turn ended: reason=text_response(finish_reason=stop) model=qwen " + "api_calls=1/60 budget=1/60 tool_turns=0 last_msg_role=assistant " + "response_len=12 session=s1", + ] + ) + ) + + syncer = HermesLogSyncer( + log_path=log_path, + state_db_path=db_path, + cursor_path=cursor_path, + scheduler_batch_turns=50, + scheduler_batch_chars=1, + writer=writer, + ) + + assert syncer.run_once() == 1 + assert writer.processed_batches == [(["raw-1"], "s1")] + saved = json.loads(cursor_path.read_text()) + assert saved["pending_raw_memory_ids"] == [] + assert saved["pending_raw_memory_chars"] == 0 + + +def test_scheduler_batch_flushes_when_pending_age_reaches_limit(tmp_path): + log_path = tmp_path / "agent.log" + db_path = tmp_path / "state.db" + cursor_path = tmp_path / "cursor.json" + writer = _FakeWriter() + _create_state_db(db_path) + cursor_path.write_text( + json.dumps( + { + "log_offset": 0, + "synced_turns": [], + "pending_raw_memory_ids": ["raw-1"], + "pending_session_id": "s1", + "pending_raw_memory_chars": 12, + "pending_first_seen_at": 1000.0, + } + ) + ) + log_path.write_text("") + + syncer = HermesLogSyncer( + log_path=log_path, + state_db_path=db_path, + cursor_path=cursor_path, + scheduler_batch_turns=50, + 
scheduler_batch_chars=30000, + scheduler_max_wait_seconds=600, + writer=writer, + now=lambda: 1600.0, + ) + + assert syncer.run_once() == 0 + assert writer.processed_batches == [(["raw-1"], "s1")] + saved = json.loads(cursor_path.read_text()) + assert saved["pending_raw_memory_ids"] == [] + assert saved["pending_first_seen_at"] is None + + +def test_unmatched_event_does_not_advance_cursor(tmp_path): + log_path = tmp_path / "agent.log" + db_path = tmp_path / "state.db" + cursor_path = tmp_path / "cursor.json" + _create_state_db(db_path) + log_path.write_text( + "\n".join( + [ + "2026-06-24 10:00:00,000 INFO [s1] agent.turn_context: " + "conversation turn: session=s1 model=qwen provider=custom " + "platform=tui history=0 msg='还没写进 sqlite'", + "2026-06-24 10:00:03,000 INFO [s1] agent.conversation_loop: " + "Turn ended: reason=text_response(finish_reason=stop) model=qwen " + "api_calls=1/60 budget=1/60 tool_turns=0 last_msg_role=assistant " + "response_len=12 session=s1", + ] + ) + ) + + syncer = HermesLogSyncer( + log_path=log_path, + state_db_path=db_path, + cursor_path=cursor_path, + writer=_FakeWriter(), + ) + + assert syncer.run_once() == 0 + saved = json.loads(cursor_path.read_text()) + assert saved["log_offset"] < log_path.stat().st_size + + +def test_writer_error_does_not_mark_turn_synced(tmp_path): + log_path = tmp_path / "agent.log" + db_path = tmp_path / "state.db" + cursor_path = tmp_path / "cursor.json" + _create_state_db(db_path) + with sqlite3.connect(db_path) as conn: + conn.execute( + "INSERT INTO sessions(id, source, started_at) VALUES (?, ?, ?)", + ("s1", "tui", 1.0), + ) + conn.executemany( + "INSERT INTO messages(id, session_id, role, content, timestamp, active) " + "VALUES (?, ?, ?, ?, ?, 1)", + [ + (1, "s1", "user", "你好", 10.0), + (2, "s1", "assistant", "你好", 11.0), + ], + ) + log_path.write_text( + "\n".join( + [ + "2026-06-24 10:00:00,000 INFO [s1] agent.turn_context: " + "conversation turn: session=s1 model=qwen provider=custom " + 
"platform=tui history=0 msg='你好'", + "2026-06-24 10:00:03,000 INFO [s1] agent.conversation_loop: " + "Turn ended: reason=text_response(finish_reason=stop) model=qwen " + "api_calls=1/60 budget=1/60 tool_turns=0 last_msg_role=assistant " + "response_len=12 session=s1", + ] + ) + ) + + syncer = HermesLogSyncer( + log_path=log_path, + state_db_path=db_path, + cursor_path=cursor_path, + writer=_FakeWriter(result="Error add failed"), + ) + + try: + syncer.run_once() + except RuntimeError as exc: + assert "Error add failed" in str(exc) + else: + raise AssertionError("expected RuntimeError") + assert not cursor_path.exists() + + +def test_resolve_mcp_url_prefers_cli_then_env_then_config_file(tmp_path, monkeypatch): + config_path = tmp_path / "memos-log-syncer.json" + write_syncer_config(config_path, "https://configured.example.com/mcp") + + monkeypatch.setenv("MEMOS_MCP_URL", "https://env.example.com/mcp") + + assert resolve_mcp_url("https://cli.example.com/mcp", config_path) == ( + "https://cli.example.com/mcp" + ) + assert resolve_mcp_url(None, config_path) == "https://env.example.com/mcp" + + monkeypatch.delenv("MEMOS_MCP_URL") + + assert resolve_mcp_url(None, config_path) == "https://configured.example.com/mcp" + assert resolve_mcp_url(None, tmp_path / "missing.json") == DEFAULT_MCP_URL + + +def test_writer_uses_configured_remote_mcp_url(): + writer = MemosMCPWriter(url="https://memos.example.com/mcp") + + assert writer.url == "https://memos.example.com/mcp" + + +class _FakeWriter: + def __init__(self, result: str = "ok") -> None: + self.calls: list[dict[str, str]] = [] + self.processed_batches: list[tuple[list[str], str]] = [] + self.result = result + + def write_raw_turn(self, arguments: dict[str, str]) -> str: + self.calls.append(arguments) + return self.result + + def process_raw_turns(self, memory_ids: list[str], session_id: str) -> str: + self.processed_batches.append((memory_ids, session_id)) + return f"Submitted {len(memory_ids)} raw conversation turns for 
scheduler processing" + + +def _create_state_db(path: Path) -> None: + with sqlite3.connect(path) as conn: + conn.execute( + """ + CREATE TABLE sessions ( + id TEXT PRIMARY KEY, + source TEXT NOT NULL, + started_at REAL NOT NULL + ) + """ + ) + conn.execute( + """ + CREATE TABLE messages ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + session_id TEXT NOT NULL, + role TEXT NOT NULL, + content TEXT, + timestamp REAL NOT NULL, + active INTEGER NOT NULL DEFAULT 1 + ) + """ + ) diff --git a/tests/examples/test_hermes_memos_plugin.py b/tests/examples/test_hermes_memos_plugin.py new file mode 100644 index 000000000..d470a2d40 --- /dev/null +++ b/tests/examples/test_hermes_memos_plugin.py @@ -0,0 +1,94 @@ +import importlib.util + +from pathlib import Path +from unittest.mock import MagicMock, call + + +PLUGIN_PATH = ( + Path(__file__).parents[2] / "examples/mcp_clients/hermes_agent/plugin/memos-memory/__init__.py" +) + + +def load_plugin(): + spec = importlib.util.spec_from_file_location("hermes_memos_plugin", PLUGIN_PATH) + module = importlib.util.module_from_spec(spec) + assert spec.loader is not None + spec.loader.exec_module(module) + return module + + +def test_pre_llm_call_returns_memory_context(): + plugin = load_plugin() + plugin._CLIENT.search_memories = MagicMock(return_value=["Prefers concise answers"]) + + result = plugin._on_pre_llm_call(user_message="How should you answer?", session_id="s1") + + assert result == { + "context": "Relevant long-term memories from MemOS:\n- Prefers concise answers" + } + + +def test_parse_sse_supports_multiline_data(): + plugin = load_plugin() + payload = 'event: message\ndata: {"jsonrpc":"2.0","id":1,\n"result":{"content":[]}}\n\n' + + assert plugin.MemosMCPClient._parse_sse(payload) == { + "jsonrpc": "2.0", + "id": 1, + "result": {"content": []}, + } + + +def test_pre_llm_call_fails_open(): + plugin = load_plugin() + plugin._CLIENT.search_memories = MagicMock(side_effect=TimeoutError("offline")) + + assert 
plugin._on_pre_llm_call(user_message="hello", session_id="s1") is None + + +def test_post_llm_call_submits_user_assistant_pair(): + plugin = load_plugin() + plugin._CLIENT.add_turn = MagicMock() + + plugin._submit_turn( + session_id="s1", + turn_id="t1", + user_message="Remember this", + assistant_response="I will", + ) + + plugin._CLIENT.add_turn.assert_called_once_with( + session_id="s1", + messages=[ + {"role": "user", "content": "Remember this"}, + {"role": "assistant", "content": "I will"}, + ], + ) + + +def test_post_llm_call_skips_duplicate_turn(): + plugin = load_plugin() + plugin._CLIENT.add_turn = MagicMock() + + kwargs = { + "session_id": "s1", + "turn_id": "t1", + "user_message": "Remember this", + "assistant_response": "I will", + } + plugin._submit_turn(**kwargs) + plugin._submit_turn(**kwargs) + + plugin._CLIENT.add_turn.assert_called_once() + + +def test_registers_pre_and_post_hooks(): + plugin = load_plugin() + context = MagicMock() + + plugin.register(context) + + assert context.register_hook.call_args_list == [ + call("pre_llm_call", plugin._on_pre_llm_call), + call("post_llm_call", plugin._on_post_llm_call), + ] diff --git a/tests/examples/test_migrate_hermes_memory.py b/tests/examples/test_migrate_hermes_memory.py new file mode 100644 index 000000000..9410e2ee8 --- /dev/null +++ b/tests/examples/test_migrate_hermes_memory.py @@ -0,0 +1,32 @@ +from examples.mcp_clients.hermes_agent.migrate_hermes_memory import build_memory_tool_call + + +def test_build_memory_tool_call_for_text_memory(): + content = "项目使用 Python 3.11。" + + payload = build_memory_tool_call( + {"content": content, "layer": "text_mem", "source": "MEMORY.md"}, + request_id=7, + ) + + assert payload["id"] == 7 + assert payload["params"] == { + "name": "add_memory", + "arguments": {"memory_content": content}, + } + + +def test_build_memory_tool_call_for_preference_memory(): + content = "中文沟通,风格简洁直接。" + + payload = build_memory_tool_call( + {"content": content, "layer": "pref_mem", 
"source": "USER.md"}, + request_id=8, + ) + + assert payload["id"] == 8 + assert payload["params"] == { + "name": "add_preference_memory", + "arguments": {"preference": content}, + } + assert "[PREFERENCE]" not in payload["params"]["arguments"]["preference"]