From bbe014b94d061846c6d348d5fe4d301f3b9af0f9 Mon Sep 17 00:00:00 2001 From: phernandez Date: Mon, 13 Apr 2026 16:52:10 -0500 Subject: [PATCH 1/3] type checking and removing extra spans in api flow --- .../api/v2/routers/knowledge_router.py | 677 ++++++++---------- src/basic_memory/models/knowledge.py | 6 +- src/basic_memory/services/entity_service.py | 2 +- .../api/v2/test_knowledge_router_telemetry.py | 47 +- 4 files changed, 297 insertions(+), 435 deletions(-) diff --git a/src/basic_memory/api/v2/routers/knowledge_router.py b/src/basic_memory/api/v2/routers/knowledge_router.py index ba25aa7f..2cad89b8 100644 --- a/src/basic_memory/api/v2/routers/knowledge_router.py +++ b/src/basic_memory/api/v2/routers/knowledge_router.py @@ -75,34 +75,40 @@ async def get_graph( Returns a flat node/edge structure optimized for rendering with graph libraries. Only includes resolved relations (where to_id is not null). """ - logger.info("API v2 request: get_graph") + with telemetry.operation( + "api.request.knowledge.get_graph", + entrypoint="api", + domain="knowledge", + action="get_graph", + ): + logger.info("API v2 request: get_graph") - # Fetch all entities for this project - entities = await entity_repository.find_all(use_load_options=False) - nodes = [ - GraphNode( - external_id=entity.external_id, - title=entity.title, - note_type=entity.note_type, - file_path=entity.file_path, - ) - for entity in entities - ] - - # Fetch all resolved relations (to_id is not null) with eager-loaded entities - relations = await relation_repository.find_all() - edges = [ - GraphEdge( - from_id=relation.from_entity.external_id, - to_id=relation.to_entity.external_id, - relation_type=relation.relation_type, - ) - for relation in relations - if relation.to_entity is not None - ] + # Fetch all entities for this project + entities = await entity_repository.find_all(use_load_options=False) + nodes = [ + GraphNode( + external_id=entity.external_id, + title=entity.title, + note_type=entity.note_type, + file_path=entity.file_path, + ) + for entity in entities + ] + + # Fetch all resolved relations (to_id is not null) with eager-loaded entities + relations = await relation_repository.find_all() + edges = [ + GraphEdge( + from_id=relation.from_entity.external_id, + to_id=relation.to_entity.external_id, + relation_type=relation.relation_type, + ) + for relation in relations + if relation.to_entity is not None + ] - logger.info(f"API v2 response: graph with {len(nodes)} nodes and {len(edges)} edges") - return GraphResponse(nodes=nodes, edges=edges) + logger.info(f"API v2 response: graph with {len(nodes)} nodes and {len(edges)} edges") + return GraphResponse(nodes=nodes, edges=edges) ## Resolution endpoint @@ -151,25 +157,13 @@ async def resolve_identifier( ): logger.info(f"API v2 request: resolve_identifier for '{data.identifier}'") - with telemetry.scope( - "api.knowledge.resolve_entity.lookup_entity", - domain="knowledge", - action="resolve_entity", - phase="lookup_entity", - ): - entity = await entity_repository.get_by_external_id(data.identifier) + entity = await entity_repository.get_by_external_id(data.identifier) resolution_method = "external_id" if entity else "search" if not entity: - with telemetry.scope( - "api.knowledge.resolve_entity.resolve_link", - domain="knowledge", - action="resolve_entity", - phase="resolve_link", - ): - entity = await link_resolver.resolve_link( - data.identifier, source_path=data.source_path, strict=data.strict - ) + entity = await link_resolver.resolve_link( + data.identifier, source_path=data.source_path, strict=data.strict + ) if entity: if entity.permalink == data.identifier: resolution_method = "permalink" @@ -183,20 +177,14 @@ async def resolve_identifier( if not entity: raise HTTPException(status_code=404, detail=f"Entity not found: '{data.identifier}'") - with telemetry.scope( - "api.knowledge.resolve_entity.shape_response", - domain="knowledge", - action="resolve_entity", - phase="shape_response", - ): - result = EntityResolveResponse( - external_id=entity.external_id, - entity_id=entity.id, - permalink=entity.permalink, - file_path=entity.file_path, - title=entity.title, - resolution_method=resolution_method, - ) + result = EntityResolveResponse( + external_id=entity.external_id, + entity_id=entity.id, + permalink=entity.permalink, + file_path=entity.file_path, + title=entity.title, + resolution_method=resolution_method, + ) logger.debug( f"API v2 response: resolved '{data.identifier}' to external_id={result.external_id} via {resolution_method}" @@ -236,25 +224,13 @@ async def get_entity_by_id( ): logger.info(f"API v2 request: get_entity_by_id entity_id={entity_id}") - with telemetry.scope( - "api.knowledge.get_entity.load_entity", - domain="knowledge", - action="get_entity", - phase="load_entity", - ): - entity = await entity_repository.get_by_external_id(entity_id) + entity = await entity_repository.get_by_external_id(entity_id) if not entity: raise HTTPException( status_code=404, detail=f"Entity with external_id '{entity_id}' not found" ) - with telemetry.scope( - "api.knowledge.get_entity.shape_response", - domain="knowledge", - action="get_entity", - phase="shape_response", - ): - result = EntityResponseV2.model_validate(entity) + result = EntityResponseV2.model_validate(entity) logger.info(f"API v2 response: external_id={entity_id}, title='{result.title}'") return result @@ -297,75 +273,42 @@ async def create_entity( "API v2 request", endpoint="create_entity", note_type=data.note_type, title=data.title ) - with telemetry.scope( - "api.knowledge.create_entity.write_entity", - domain="knowledge", - action="create_entity", - phase="write_entity", - fast=fast, - ): - if fast: - entity = await entity_service.fast_write_entity(data) - written_content = None - search_content = None - else: - write_result = await entity_service.create_entity_with_content(data) - entity = write_result.entity - written_content = write_result.content - search_content = write_result.search_content + if fast: + entity = await entity_service.fast_write_entity(data) + written_content = None + search_content = None + else: + write_result = await entity_service.create_entity_with_content(data) + entity = write_result.entity + written_content = write_result.content + search_content = write_result.search_content if fast: - with telemetry.scope( - "api.knowledge.create_entity.enqueue_reindex", - domain="knowledge", - action="create_entity", - phase="enqueue_reindex", - fast=fast, - ): - task_scheduler.schedule( - "reindex_entity", - entity_id=entity.id, - project_id=project_id, - ) + task_scheduler.schedule( + "reindex_entity", + entity_id=entity.id, + project_id=project_id, + ) else: - with telemetry.scope( - "api.knowledge.create_entity.search_index", - domain="knowledge", - action="create_entity", - phase="search_index", - ): - await search_service.index_entity(entity, content=search_content) - with telemetry.scope( - "api.knowledge.create_entity.vector_sync", - domain="knowledge", - action="create_entity", - phase="vector_sync", - ): - _schedule_vector_sync_if_enabled( - task_scheduler=task_scheduler, - app_config=app_config, - entity_id=entity.id, - project_id=project_id, - ) + await search_service.index_entity(entity, content=search_content) + _schedule_vector_sync_if_enabled( + task_scheduler=task_scheduler, + app_config=app_config, + entity_id=entity.id, + project_id=project_id, + ) result = EntityResponseV2.model_validate(entity) if fast: result = result.model_copy(update={"observations": [], "relations": []}) - with telemetry.scope( - "api.knowledge.create_entity.read_content", - domain="knowledge", - action="create_entity", - phase="read_content", - source="file" if fast else "memory", - ): - if fast: - content = await file_service.read_file_content(entity.file_path) - else: - # Non-fast writes already captured the markdown in memory. Reuse it here - # instead of re-reading the file; format_on_save is the one config that can - # still make the persisted file diverge because write_file only returns a checksum. - content = written_content + if fast: + content = await file_service.read_file_content(entity.file_path) + else: + # Non-fast writes already captured the markdown in memory. Reuse it here + # instead of re-reading the file; format_on_save is the one config that can + # still make the persisted file diverge because write_file only returns a checksum. + content = written_content result = result.model_copy(update={"content": content}) logger.info( @@ -415,106 +358,67 @@ async def update_entity_by_id( ): logger.info(f"API v2 request: update_entity_by_id entity_id={entity_id}") - with telemetry.scope( - "api.knowledge.update_entity.load_entity", - domain="knowledge", - action="update_entity", - phase="load_entity", - ): - existing = await entity_repository.get_by_external_id(entity_id) + existing = await entity_repository.get_by_external_id(entity_id) created = existing is None - with telemetry.scope( - "api.knowledge.update_entity.write_entity", - domain="knowledge", - action="update_entity", - phase="write_entity", - fast=fast, - ): - if fast: - entity = await entity_service.fast_write_entity(data, external_id=entity_id) - written_content = None - search_content = None - response.status_code = 200 if existing else 201 + if fast: + entity = await entity_service.fast_write_entity(data, external_id=entity_id) + written_content = None + search_content = None + response.status_code = 200 if existing else 201 + else: + if existing: + write_result = await entity_service.update_entity_with_content(existing, data) + entity = write_result.entity + written_content = write_result.content + search_content = write_result.search_content + response.status_code = 200 else: - if existing: - write_result = await entity_service.update_entity_with_content(existing, data) - entity = write_result.entity - written_content = write_result.content - search_content = write_result.search_content - response.status_code = 200 - else: - write_result = await entity_service.create_entity_with_content(data) - entity = write_result.entity - written_content = write_result.content - search_content = write_result.search_content - if entity.external_id != entity_id: - entity = await entity_repository.update( - entity.id, - {"external_id": entity_id}, + write_result = await entity_service.create_entity_with_content(data) + entity = write_result.entity + written_content = write_result.content + search_content = write_result.search_content + if entity.external_id != entity_id: + entity = await entity_repository.update( + entity.id, + {"external_id": entity_id}, + ) + # external_id fixup only changes the DB row. The file content is unchanged, + # so the markdown captured during the write remains valid downstream. + if not entity: + raise HTTPException( + status_code=404, + detail=f"Entity with external_id '{entity_id}' not found", ) - # external_id fixup only changes the DB row. The file content is unchanged, - # so the markdown captured during the write remains valid downstream. - if not entity: - raise HTTPException( - status_code=404, - detail=f"Entity with external_id '{entity_id}' not found", - ) - response.status_code = 201 + response.status_code = 201 if fast: - with telemetry.scope( - "api.knowledge.update_entity.enqueue_reindex", - domain="knowledge", - action="update_entity", - phase="enqueue_reindex", - fast=fast, - ): - task_scheduler.schedule( - "reindex_entity", - entity_id=entity.id, - project_id=project_id, - resolve_relations=created, - ) + task_scheduler.schedule( + "reindex_entity", + entity_id=entity.id, + project_id=project_id, + resolve_relations=created, + ) else: - with telemetry.scope( - "api.knowledge.update_entity.search_index", - domain="knowledge", - action="update_entity", - phase="search_index", - ): - await search_service.index_entity(entity, content=search_content) - with telemetry.scope( - "api.knowledge.update_entity.vector_sync", - domain="knowledge", - action="update_entity", - phase="vector_sync", - ): - _schedule_vector_sync_if_enabled( - task_scheduler=task_scheduler, - app_config=app_config, - entity_id=entity.id, - project_id=project_id, - ) + await search_service.index_entity(entity, content=search_content) + _schedule_vector_sync_if_enabled( + task_scheduler=task_scheduler, + app_config=app_config, + entity_id=entity.id, + project_id=project_id, + ) result = EntityResponseV2.model_validate(entity) if fast: result = result.model_copy(update={"observations": [], "relations": []}) - with telemetry.scope( - "api.knowledge.update_entity.read_content", - domain="knowledge", - action="update_entity", - phase="read_content", - source="file" if fast else "memory", - ): - if fast: - content = await file_service.read_file_content(entity.file_path) - else: - # Non-fast writes already captured the markdown in memory. Reuse it here - # instead of re-reading the file; format_on_save is the one config that can - # still make the persisted file diverge because write_file only returns a checksum. - content = written_content + if fast: + content = await file_service.read_file_content(entity.file_path) + else: + # Non-fast writes already captured the markdown in memory. Reuse it here + # instead of re-reading the file; format_on_save is the one config that can + # still make the persisted file diverge because write_file only returns a checksum. + content = written_content result = result.model_copy(update={"content": content}) logger.info( @@ -563,103 +467,64 @@ async def edit_entity_by_id( f"API v2 request: edit_entity_by_id entity_id={entity_id}, operation='{data.operation}'" ) - with telemetry.scope( - "api.knowledge.edit_entity.load_entity", - domain="knowledge", - action="edit_entity", - phase="load_entity", - ): - entity = await entity_repository.get_by_external_id(entity_id) + entity = await entity_repository.get_by_external_id(entity_id) if not entity: # pragma: no cover raise HTTPException( status_code=404, detail=f"Entity with external_id '{entity_id}' not found" ) try: - with telemetry.scope( - "api.knowledge.edit_entity.write_entity", - domain="knowledge", - action="edit_entity", - phase="write_entity", - fast=fast, - ): - if fast: - updated_entity = await entity_service.fast_edit_entity( - entity=entity, - operation=data.operation, - content=data.content, - section=data.section, - find_text=data.find_text, - expected_replacements=data.expected_replacements, - ) - written_content = None - search_content = None - else: - identifier = entity.permalink or entity.file_path - write_result = await entity_service.edit_entity_with_content( - identifier=identifier, - operation=data.operation, - content=data.content, - section=data.section, - find_text=data.find_text, - expected_replacements=data.expected_replacements, - ) - updated_entity = write_result.entity - written_content = write_result.content - search_content = write_result.search_content + if fast: + updated_entity = await entity_service.fast_edit_entity( + entity=entity, + operation=data.operation, + content=data.content, + section=data.section, + find_text=data.find_text, + expected_replacements=data.expected_replacements, + ) + written_content = None + search_content = None + else: + identifier = entity.permalink or entity.file_path + write_result = await entity_service.edit_entity_with_content( + identifier=identifier, + operation=data.operation, + content=data.content, + section=data.section, + find_text=data.find_text, + expected_replacements=data.expected_replacements, + ) + updated_entity = write_result.entity + written_content = write_result.content + search_content = write_result.search_content if fast: - with telemetry.scope( - "api.knowledge.edit_entity.enqueue_reindex", - domain="knowledge", - action="edit_entity", - phase="enqueue_reindex", - fast=fast, - ): - task_scheduler.schedule( - "reindex_entity", - entity_id=updated_entity.id, - project_id=project_id, - ) + task_scheduler.schedule( + "reindex_entity", + entity_id=updated_entity.id, + project_id=project_id, + ) else: - with telemetry.scope( - "api.knowledge.edit_entity.search_index", - domain="knowledge", - action="edit_entity", - phase="search_index", - ): - await search_service.index_entity(updated_entity, content=search_content) - with telemetry.scope( - "api.knowledge.edit_entity.vector_sync", - domain="knowledge", - action="edit_entity", - phase="vector_sync", - ): - _schedule_vector_sync_if_enabled( - task_scheduler=task_scheduler, - app_config=app_config, - entity_id=updated_entity.id, - project_id=project_id, - ) + await search_service.index_entity(updated_entity, content=search_content) + _schedule_vector_sync_if_enabled( + task_scheduler=task_scheduler, + app_config=app_config, + entity_id=updated_entity.id, + project_id=project_id, + ) result = EntityResponseV2.model_validate(updated_entity) if fast: result = result.model_copy(update={"observations": [], "relations": []}) - with telemetry.scope( - "api.knowledge.edit_entity.read_content", - domain="knowledge", - action="edit_entity", - phase="read_content", - source="file" if fast else "memory", - ): - if fast: - content = await file_service.read_file_content(updated_entity.file_path) - else: - # Non-fast writes already captured the markdown in memory. Reuse it here - # instead of re-reading the file; format_on_save is the one config that can - # still make the persisted file diverge because write_file only returns a checksum. - content = written_content + if fast: + content = await file_service.read_file_content(updated_entity.file_path) + else: + # Non-fast writes already captured the markdown in memory. Reuse it here + # instead of re-reading the file; format_on_save is the one config that can + # still make the persisted file diverge because write_file only returns a checksum. + content = written_content result = result.model_copy(update={"content": content}) logger.info( @@ -695,23 +560,29 @@ async def delete_entity_by_id( Note: Returns deleted=False if entity doesn't exist (idempotent) """ - logger.info(f"API v2 request: delete_entity_by_id entity_id={entity_id}") + with telemetry.operation( + "api.request.knowledge.delete_entity", + entrypoint="api", + domain="knowledge", + action="delete_entity", + ): + logger.info(f"API v2 request: delete_entity_by_id entity_id={entity_id}") - entity = await entity_repository.get_by_external_id(entity_id) - if entity is None: - logger.info(f"API v2 response: external_id={entity_id} not found, deleted=False") - return DeleteEntitiesResponse(deleted=False) + entity = await entity_repository.get_by_external_id(entity_id) + if entity is None: + logger.info(f"API v2 response: external_id={entity_id} not found, deleted=False") + return DeleteEntitiesResponse(deleted=False) - # Delete the entity using internal ID - deleted = await entity_service.delete_entity(entity.id) + # Delete the entity using internal ID + deleted = await entity_service.delete_entity(entity.id) - # Remove from search index if search service available - if search_service: - background_tasks.add_task(search_service.handle_delete, entity) # pragma: no cover + # Remove from search index if search service available + if search_service: + background_tasks.add_task(search_service.handle_delete, entity) # pragma: no cover - logger.info(f"API v2 response: external_id={entity_id}, deleted={deleted}") + logger.info(f"API v2 response: external_id={entity_id}, deleted={deleted}") - return DeleteEntitiesResponse(deleted=deleted) + return DeleteEntitiesResponse(deleted=deleted) ## Move endpoint @@ -743,48 +614,58 @@ async def move_entity( Returns: Updated entity with new file path """ - logger.info( - f"API v2 request: move_entity entity_id={entity_id}, destination='{data.destination_path}'" - ) - - try: - # First, get the entity by external_id to verify it exists - entity = await entity_repository.get_by_external_id(entity_id) - if not entity: # pragma: no cover - raise HTTPException( - status_code=404, detail=f"Entity with external_id '{entity_id}' not found" - ) - - # Move the entity using its current file path as identifier - moved_entity = await entity_service.move_entity( - identifier=entity.file_path, # Use file path for resolution - destination_path=data.destination_path, - project_config=project_config, - app_config=app_config, + with telemetry.operation( + "api.request.knowledge.move_entity", + entrypoint="api", + domain="knowledge", + action="move_entity", + ): + logger.info( + f"API v2 request: move_entity entity_id={entity_id}, destination='{data.destination_path}'" ) - # Reindex at new location - reindexed_entity = await entity_service.link_resolver.resolve_link(data.destination_path) - if reindexed_entity: - await search_service.index_entity(reindexed_entity) - _schedule_vector_sync_if_enabled( - task_scheduler=task_scheduler, + try: + # First, get the entity by external_id to verify it exists + entity = await entity_repository.get_by_external_id(entity_id) + if not entity: # pragma: no cover + raise HTTPException( + status_code=404, detail=f"Entity with external_id '{entity_id}' not found" + ) + + # Move the entity using its current file path as identifier + moved_entity = await entity_service.move_entity( + identifier=entity.file_path, # Use file path for resolution + destination_path=data.destination_path, + project_config=project_config, app_config=app_config, - entity_id=reindexed_entity.id, - project_id=project_id, ) - result = EntityResponseV2.model_validate(moved_entity) + # Reindex at new location + reindexed_entity = await entity_service.link_resolver.resolve_link( + data.destination_path + ) + if reindexed_entity: + await search_service.index_entity(reindexed_entity) + _schedule_vector_sync_if_enabled( + task_scheduler=task_scheduler, + app_config=app_config, + entity_id=reindexed_entity.id, + project_id=project_id, + ) + + result = EntityResponseV2.model_validate(moved_entity) - logger.info(f"API v2 response: moved external_id={entity_id} to '{data.destination_path}'") + logger.info( + f"API v2 response: moved external_id={entity_id} to '{data.destination_path}'" + ) - return result + return result - except HTTPException: # pragma: no cover - raise # pragma: no cover - except Exception as e: - logger.error(f"Error moving entity: {e}") - raise HTTPException(status_code=400, detail=str(e)) + except HTTPException: # pragma: no cover + raise # pragma: no cover + except Exception as e: + logger.error(f"Error moving entity: {e}") + raise HTTPException(status_code=400, detail=str(e)) ## Move directory endpoint @@ -814,40 +695,46 @@ async def move_directory( Returns: DirectoryMoveResult with counts and details of moved files """ - logger.info( - f"API v2 request: move_directory source='{data.source_directory}', destination='{data.destination_directory}'" - ) - - try: - # Move the directory using the service - result = await entity_service.move_directory( - source_directory=data.source_directory, - destination_directory=data.destination_directory, - project_config=project_config, - app_config=app_config, + with telemetry.operation( + "api.request.knowledge.move_directory", + entrypoint="api", + domain="knowledge", + action="move_directory", + ): + logger.info( + f"API v2 request: move_directory source='{data.source_directory}', destination='{data.destination_directory}'" ) - # Reindex moved entities - for file_path in result.moved_files: - entity = await entity_service.link_resolver.resolve_link(file_path) - if entity: - await search_service.index_entity(entity) - _schedule_vector_sync_if_enabled( - task_scheduler=task_scheduler, - app_config=app_config, - entity_id=entity.id, - project_id=project_id, - ) + try: + # Move the directory using the service + result = await entity_service.move_directory( + source_directory=data.source_directory, + destination_directory=data.destination_directory, + project_config=project_config, + app_config=app_config, + ) - logger.info( - f"API v2 response: move_directory " - f"total={result.total_files}, success={result.successful_moves}, failed={result.failed_moves}" - ) - return result + # Reindex moved entities + for file_path in result.moved_files: + entity = await entity_service.link_resolver.resolve_link(file_path) + if entity: + await search_service.index_entity(entity) + _schedule_vector_sync_if_enabled( + task_scheduler=task_scheduler, + app_config=app_config, + entity_id=entity.id, + project_id=project_id, + ) + + logger.info( + f"API v2 response: move_directory " + f"total={result.total_files}, success={result.successful_moves}, failed={result.failed_moves}" + ) + return result - except Exception as e: - logger.error(f"Error moving directory: {e}") - raise HTTPException(status_code=400, detail=str(e)) + except Exception as e: + logger.error(f"Error moving directory: {e}") + raise HTTPException(status_code=400, detail=str(e)) ## Delete directory endpoint @@ -872,20 +759,26 @@ async def delete_directory( Returns: DirectoryDeleteResult with counts and details of deleted files """ - logger.info(f"API v2 request: delete_directory directory='{data.directory}'") + with telemetry.operation( + "api.request.knowledge.delete_directory", + entrypoint="api", + domain="knowledge", + action="delete_directory", + ): + logger.info(f"API v2 request: delete_directory directory='{data.directory}'") - try: - # Delete the directory using the service - result = await entity_service.delete_directory( - directory=data.directory, - ) + try: + # Delete the directory using the service + result = await entity_service.delete_directory( + directory=data.directory, + ) - logger.info( - f"API v2 response: delete_directory " - f"total={result.total_files}, success={result.successful_deletes}, failed={result.failed_deletes}" - ) - return result + logger.info( + f"API v2 response: delete_directory " + f"total={result.total_files}, success={result.successful_deletes}, failed={result.failed_deletes}" + ) + return result - except Exception as e: - logger.error(f"Error deleting directory: {e}") - raise HTTPException(status_code=400, detail=str(e)) + except Exception as e: + logger.error(f"Error deleting directory: {e}") + raise HTTPException(status_code=400, detail=str(e)) diff --git a/src/basic_memory/models/knowledge.py b/src/basic_memory/models/knowledge.py index 0a8eabf3..98d290a1 100644 --- a/src/basic_memory/models/knowledge.py +++ b/src/basic_memory/models/knowledge.py @@ -62,7 +62,7 @@ class Entity(Base): ) # Core identity - id: Mapped[int] = mapped_column(Integer, primary_key=True) + id: Mapped[int] = mapped_column(Integer, primary_key=True) # pyright: ignore [reportIncompatibleVariableOverride] # External UUID for API references - stable identifier that won't change external_id: Mapped[str] = mapped_column(String, unique=True, default=lambda: str(uuid.uuid4())) title: Mapped[str] = mapped_column(String) @@ -229,7 +229,7 @@ class Observation(Base): Index("ix_observation_category", "category"), # Add category index ) - id: Mapped[int] = mapped_column(Integer, primary_key=True) + id: Mapped[int] = mapped_column(Integer, primary_key=True) # pyright: ignore [reportIncompatibleVariableOverride] project_id: Mapped[int] = mapped_column(Integer, ForeignKey("project.id"), index=True) entity_id: Mapped[int] = mapped_column(Integer, ForeignKey("entity.id", ondelete="CASCADE")) content: Mapped[str] = mapped_column(Text) @@ -276,7 +276,7 @@ class Relation(Base): Index("ix_relation_to_id", "to_id"), ) - id: Mapped[int] = mapped_column(Integer, primary_key=True) + id: Mapped[int] = mapped_column(Integer, primary_key=True) # pyright: ignore [reportIncompatibleVariableOverride] project_id: Mapped[int] = mapped_column(Integer, ForeignKey("project.id"), index=True) from_id: Mapped[int] = mapped_column(Integer, ForeignKey("entity.id", ondelete="CASCADE")) to_id: Mapped[Optional[int]] = mapped_column( diff --git a/src/basic_memory/services/entity_service.py b/src/basic_memory/services/entity_service.py index 51e9876a..aa59979e 100644 --- a/src/basic_memory/services/entity_service.py +++ b/src/basic_memory/services/entity_service.py @@ -1021,7 +1021,7 @@ async def update_entity_relations( target_entity: Optional[Entity] = None if not isinstance(resolved, Exception): # Type narrowing: resolved is Optional[Entity] here, not Exception - target_entity = resolved + target_entity = resolved # pyright: ignore [reportAssignmentType] # if the target is found, store the id target_id = target_entity.id if target_entity else None diff --git a/tests/api/v2/test_knowledge_router_telemetry.py b/tests/api/v2/test_knowledge_router_telemetry.py index 05e5ee2a..771463de 100644 --- a/tests/api/v2/test_knowledge_router_telemetry.py +++ b/tests/api/v2/test_knowledge_router_telemetry.py @@ -48,14 +48,12 @@ def _fake_entity(*, external_id: str = "entity-123", file_path: str = "notes/tes ) -def _assert_names_in_order(names: list[str], expected: list[str]) -> None: - cursor = 0 - for expected_name in expected: - cursor = names.index(expected_name, cursor) + 1 +def _assert_only_root_span(spans: list[tuple[str, dict]], expected_name: str) -> None: + assert [name for name, _ in spans] == [expected_name] @pytest.mark.asyncio -async def test_create_entity_emits_root_and_nested_spans(monkeypatch) -> None: +async def test_create_entity_emits_only_root_span(monkeypatch) -> None: spans, fake_span = _capture_spans() monkeypatch.setattr(knowledge_router_module.telemetry, "span", fake_span) @@ -104,20 +102,11 @@ async def read_file_content(self, path): ) assert result.content == response_content - _assert_names_in_order( - [name for name, _ in spans], - [ - "api.request.knowledge.create_entity", - "api.knowledge.create_entity.write_entity", - "api.knowledge.create_entity.search_index", - "api.knowledge.create_entity.vector_sync", - "api.knowledge.create_entity.read_content", - ], - ) + _assert_only_root_span(spans, "api.request.knowledge.create_entity") @pytest.mark.asyncio -async def test_update_entity_emits_root_and_nested_spans(monkeypatch) -> None: +async def test_update_entity_emits_only_root_span(monkeypatch) -> None: spans, fake_span = _capture_spans() monkeypatch.setattr(knowledge_router_module.telemetry, "span", fake_span) @@ -172,21 +161,11 @@ async def read_file_content(self, path): ) assert result.content == response_content - _assert_names_in_order( - [name for name, _ in spans], - [ - "api.request.knowledge.update_entity", - "api.knowledge.update_entity.load_entity", - "api.knowledge.update_entity.write_entity", - "api.knowledge.update_entity.search_index", - "api.knowledge.update_entity.vector_sync", - "api.knowledge.update_entity.read_content", - ], - ) + _assert_only_root_span(spans, "api.request.knowledge.update_entity") @pytest.mark.asyncio -async def test_edit_entity_emits_root_and_nested_spans(monkeypatch) -> None: +async def test_edit_entity_emits_only_root_span(monkeypatch) -> None: spans, fake_span = _capture_spans() monkeypatch.setattr(knowledge_router_module.telemetry, "span", fake_span) @@ -233,14 +212,4 @@ async def read_file_content(self, path): ) assert result.content == response_content - _assert_names_in_order( - [name for name, _ in spans], - [ - "api.request.knowledge.edit_entity", - "api.knowledge.edit_entity.load_entity", - "api.knowledge.edit_entity.write_entity", - "api.knowledge.edit_entity.search_index", - "api.knowledge.edit_entity.vector_sync", - "api.knowledge.edit_entity.read_content", - ], - ) + _assert_only_root_span(spans, "api.request.knowledge.edit_entity") From 51717931216d6a1b717f0a6066b405a28cbbe83a Mon Sep 17 00:00:00 2001 From: phernandez Date: Mon, 13 Apr 2026 18:54:05 -0500 Subject: [PATCH 2/3] refactor(core): simplify note write flow Signed-off-by: phernandez --- .../api/v2/routers/knowledge_router.py | 230 +--- src/basic_memory/cli/commands/doctor.py | 2 +- src/basic_memory/deps/services.py | 25 - src/basic_memory/mcp/clients/knowledge.py | 17 +- src/basic_memory/mcp/tools/edit_note.py | 8 +- src/basic_memory/mcp/tools/write_note.py | 4 +- src/basic_memory/services/entity_service.py | 1086 +++++++---------- tests/api/v2/test_knowledge_router.py | 33 +- .../api/v2/test_knowledge_router_telemetry.py | 23 +- tests/mcp/clients/test_clients.py | 61 + tests/services/test_entity_service.py | 149 --- tests/services/test_entity_service_prepare.py | 130 ++ .../services/test_entity_service_telemetry.py | 80 +- .../services/test_task_scheduler_semantic.py | 62 +- .../test_upsert_entity_optimization.py | 90 -- 15 files changed, 754 insertions(+), 1246 deletions(-) create mode 100644 tests/services/test_entity_service_prepare.py diff --git a/src/basic_memory/api/v2/routers/knowledge_router.py b/src/basic_memory/api/v2/routers/knowledge_router.py index 2cad89b8..e674cbb3 100644 --- a/src/basic_memory/api/v2/routers/knowledge_router.py +++ b/src/basic_memory/api/v2/routers/knowledge_router.py @@ -10,7 +10,7 @@ - Simplified caching strategies """ -from fastapi import APIRouter, HTTPException, BackgroundTasks, Depends, Response, Path, Query +from fastapi import APIRouter, HTTPException, Response, Path from loguru import logger from basic_memory import telemetry @@ -24,7 +24,6 @@ RelationRepositoryV2ExternalDep, ProjectExternalIdPathDep, TaskSchedulerDep, - FileServiceV2ExternalDep, ) from basic_memory.schemas import DeleteEntitiesResponse from basic_memory.schemas.base import Entity @@ -243,21 +242,15 @@ async def get_entity_by_id( async def create_entity( project_id: ProjectExternalIdPathDep, data: Entity, - background_tasks: BackgroundTasks, entity_service: EntityServiceV2ExternalDep, search_service: SearchServiceV2ExternalDep, task_scheduler: TaskSchedulerDep, - file_service: FileServiceV2ExternalDep, app_config: AppConfigDep, - fast: bool = Query( - True, description="If true, write quickly and defer indexing to background tasks." - ), ) -> EntityResponseV2: """Create a new entity. Args: data: Entity data to create - fast: If True, defer indexing to background tasks Returns: Created entity with generated external_id (UUID) and file content @@ -267,49 +260,26 @@ async def create_entity( entrypoint="api", domain="knowledge", action="create_entity", - fast=fast, ): logger.info( "API v2 request", endpoint="create_entity", note_type=data.note_type, title=data.title ) - if fast: - entity = await entity_service.fast_write_entity(data) - written_content = None - search_content = None - else: - write_result = await entity_service.create_entity_with_content(data) - entity = write_result.entity - written_content = write_result.content - search_content = write_result.search_content - - if fast: - task_scheduler.schedule( - "reindex_entity", - entity_id=entity.id, - project_id=project_id, - ) - else: - await search_service.index_entity(entity, content=search_content) - _schedule_vector_sync_if_enabled( - task_scheduler=task_scheduler, - app_config=app_config, - entity_id=entity.id, - project_id=project_id, - ) + # Note writes are now internally consistent before the response returns. We only leave + # truly derived work, like semantic vectors, on the async scheduler. + write_result = await entity_service.create_entity_with_content(data) + entity = write_result.entity + await search_service.index_entity(entity, content=write_result.search_content) + _schedule_vector_sync_if_enabled( + task_scheduler=task_scheduler, + app_config=app_config, + entity_id=entity.id, + project_id=project_id, + ) result = EntityResponseV2.model_validate(entity) - if fast: - result = result.model_copy(update={"observations": [], "relations": []}) - - if fast: - content = await file_service.read_file_content(entity.file_path) - else: - # Non-fast writes already captured the markdown in memory. Reuse it here - # instead of re-reading the file; format_on_save is the one config that can - # still make the persisted file diverge because write_file only returns a checksum. - content = written_content - result = result.model_copy(update={"content": content}) + # The write service already returns the canonical markdown accepted for this request. + result = result.model_copy(update={"content": write_result.content}) logger.info( f"API v2 response: endpoint='create_entity' external_id={entity.external_id}, title={result.title}, permalink={result.permalink}, status_code=201" @@ -324,18 +294,13 @@ async def create_entity( async def update_entity_by_id( data: Entity, response: Response, - background_tasks: BackgroundTasks, project_id: ProjectExternalIdPathDep, entity_service: EntityServiceV2ExternalDep, search_service: SearchServiceV2ExternalDep, entity_repository: EntityRepositoryV2ExternalDep, task_scheduler: TaskSchedulerDep, - file_service: FileServiceV2ExternalDep, app_config: AppConfigDep, entity_id: str = Path(..., description="Entity external ID (UUID)"), - fast: bool = Query( - True, description="If true, write quickly and defer indexing to background tasks." - ), ) -> EntityResponseV2: """Update an entity by external ID. @@ -344,7 +309,6 @@ async def update_entity_by_id( Args: entity_id: External ID (UUID string) data: Updated entity data - fast: If True, defer indexing to background tasks Returns: Updated entity with file content @@ -354,72 +318,43 @@ async def update_entity_by_id( entrypoint="api", domain="knowledge", action="update_entity", - fast=fast, ): logger.info(f"API v2 request: update_entity_by_id entity_id={entity_id}") existing = await entity_repository.get_by_external_id(entity_id) created = existing is None - if fast: - entity = await entity_service.fast_write_entity(data, external_id=entity_id) - written_content = None - search_content = None - response.status_code = 200 if existing else 201 + if existing: + write_result = await entity_service.update_entity_with_content(existing, data) + entity = write_result.entity + response.status_code = 200 else: - if existing: - write_result = await entity_service.update_entity_with_content(existing, data) - entity = write_result.entity - written_content = write_result.content - search_content = write_result.search_content - response.status_code = 200 - else: - write_result = await entity_service.create_entity_with_content(data) - entity = write_result.entity - written_content = write_result.content - search_content = write_result.search_content - if entity.external_id != entity_id: - entity = await entity_repository.update( - entity.id, - {"external_id": entity_id}, + write_result = await entity_service.create_entity_with_content(data) + entity = write_result.entity + if entity.external_id != entity_id: + entity = await entity_repository.update( + entity.id, + {"external_id": entity_id}, + ) + # external_id fixup only changes the DB row. The file content is unchanged, + # so the markdown captured during the write remains valid downstream. + if not entity: + raise HTTPException( + status_code=404, + detail=f"Entity with external_id '{entity_id}' not found", ) - # external_id fixup only changes the DB row. The file content is unchanged, - # so the markdown captured during the write remains valid downstream. - if not entity: - raise HTTPException( - status_code=404, - detail=f"Entity with external_id '{entity_id}' not found", - ) - response.status_code = 201 - - if fast: - task_scheduler.schedule( - "reindex_entity", - entity_id=entity.id, - project_id=project_id, - resolve_relations=created, - ) - else: - await search_service.index_entity(entity, content=search_content) - _schedule_vector_sync_if_enabled( - task_scheduler=task_scheduler, - app_config=app_config, - entity_id=entity.id, - project_id=project_id, - ) + response.status_code = 201 - result = EntityResponseV2.model_validate(entity) - if fast: - result = result.model_copy(update={"observations": [], "relations": []}) + await search_service.index_entity(entity, content=write_result.search_content) + _schedule_vector_sync_if_enabled( + task_scheduler=task_scheduler, + app_config=app_config, + entity_id=entity.id, + project_id=project_id, + ) - if fast: - content = await file_service.read_file_content(entity.file_path) - else: - # Non-fast writes already captured the markdown in memory. Reuse it here - # instead of re-reading the file; format_on_save is the one config that can - # still make the persisted file diverge because write_file only returns a checksum. - content = written_content - result = result.model_copy(update={"content": content}) + result = EntityResponseV2.model_validate(entity) + result = result.model_copy(update={"content": write_result.content}) logger.info( f"API v2 response: external_id={entity_id}, created={created}, status_code={response.status_code}" @@ -430,25 +365,19 @@ async def update_entity_by_id( @router.patch("/entities/{entity_id}", response_model=EntityResponseV2) async def edit_entity_by_id( data: EditEntityRequest, - background_tasks: BackgroundTasks, project_id: ProjectExternalIdPathDep, entity_service: EntityServiceV2ExternalDep, search_service: SearchServiceV2ExternalDep, entity_repository: EntityRepositoryV2ExternalDep, task_scheduler: TaskSchedulerDep, - file_service: FileServiceV2ExternalDep, app_config: AppConfigDep, entity_id: str = Path(..., description="Entity external ID (UUID)"), - fast: bool = Query( - True, description="If true, write quickly and defer indexing to background tasks." - ), ) -> EntityResponseV2: """Edit an existing entity by external ID using operations like append, prepend, etc. Args: entity_id: External ID (UUID string) data: Edit operation details - fast: If True, defer indexing to background tasks Returns: Updated entity with file content @@ -461,7 +390,6 @@ async def edit_entity_by_id( entrypoint="api", domain="knowledge", action="edit_entity", - fast=fast, ): logger.info( f"API v2 request: edit_entity_by_id entity_id={entity_id}, operation='{data.operation}'" @@ -474,58 +402,26 @@ async def edit_entity_by_id( ) try: - if fast: - updated_entity = await entity_service.fast_edit_entity( - entity=entity, - operation=data.operation, - content=data.content, - section=data.section, - find_text=data.find_text, - expected_replacements=data.expected_replacements, - ) - written_content = None - search_content = None - else: - identifier = entity.permalink or entity.file_path - write_result = await entity_service.edit_entity_with_content( - identifier=identifier, - operation=data.operation, - content=data.content, - section=data.section, - find_text=data.find_text, - expected_replacements=data.expected_replacements, - ) - updated_entity = write_result.entity - written_content = write_result.content - search_content = write_result.search_content - - if fast: - task_scheduler.schedule( - "reindex_entity", - entity_id=updated_entity.id, - project_id=project_id, - ) - else: - await search_service.index_entity(updated_entity, content=search_content) - _schedule_vector_sync_if_enabled( - task_scheduler=task_scheduler, - app_config=app_config, - entity_id=updated_entity.id, - project_id=project_id, - ) + identifier = entity.permalink or entity.file_path + write_result = await entity_service.edit_entity_with_content( + identifier=identifier, + operation=data.operation, + content=data.content, + section=data.section, + find_text=data.find_text, + expected_replacements=data.expected_replacements, + ) + updated_entity = write_result.entity + await search_service.index_entity(updated_entity, content=write_result.search_content) + _schedule_vector_sync_if_enabled( + task_scheduler=task_scheduler, + app_config=app_config, + entity_id=updated_entity.id, + project_id=project_id, + ) result = EntityResponseV2.model_validate(updated_entity) - if fast: - result = result.model_copy(update={"observations": [], "relations": []}) - - if fast: - content = await file_service.read_file_content(updated_entity.file_path) - else: - # Non-fast writes already captured the markdown in memory. Reuse it here - # instead of re-reading the file; format_on_save is the one config that can - # still make the persisted file diverge because write_file only returns a checksum. - content = written_content - result = result.model_copy(update={"content": content}) + result = result.model_copy(update={"content": write_result.content}) logger.info( f"API v2 response: external_id={entity_id}, operation='{data.operation}', status_code=200" @@ -543,12 +439,10 @@ async def edit_entity_by_id( @router.delete("/entities/{entity_id}", response_model=DeleteEntitiesResponse) async def delete_entity_by_id( - background_tasks: BackgroundTasks, project_id: ProjectExternalIdPathDep, entity_service: EntityServiceV2ExternalDep, entity_repository: EntityRepositoryV2ExternalDep, entity_id: str = Path(..., description="Entity external ID (UUID)"), - search_service=Depends(lambda: None), # Optional for now ) -> DeleteEntitiesResponse: """Delete an entity by external ID. @@ -576,10 +470,6 @@ async def delete_entity_by_id( # Delete the entity using internal ID deleted = await entity_service.delete_entity(entity.id) - # Remove from search index if search service available - if search_service: - background_tasks.add_task(search_service.handle_delete, entity) # pragma: no cover - logger.info(f"API v2 response: external_id={entity_id}, deleted={deleted}") return DeleteEntitiesResponse(deleted=deleted) @@ -591,7 +481,6 @@ async def delete_entity_by_id( @router.put("/entities/{entity_id}/move", response_model=EntityResponseV2) async def move_entity( data: MoveEntityRequestV2, - background_tasks: BackgroundTasks, project_id: ProjectExternalIdPathDep, entity_service: EntityServiceV2ExternalDep, entity_repository: EntityRepositoryV2ExternalDep, @@ -674,7 +563,6 @@ async def move_entity( @router.post("/move-directory", response_model=DirectoryMoveResult) async def move_directory( data: MoveDirectoryRequestV2, - background_tasks: BackgroundTasks, project_id: ProjectExternalIdPathDep, entity_service: EntityServiceV2ExternalDep, project_config: ProjectConfigV2ExternalDep, diff --git a/src/basic_memory/cli/commands/doctor.py b/src/basic_memory/cli/commands/doctor.py index d148f0dc..defcabe2 100644 --- a/src/basic_memory/cli/commands/doctor.py +++ b/src/basic_memory/cli/commands/doctor.py @@ -69,7 +69,7 @@ async def run_doctor() -> None: content=f"# {api_note_title}\n\n- [note] API to file check", entity_metadata={"tags": ["doctor"]}, ) - api_result = await knowledge_client.create_entity(api_note.model_dump(), fast=False) + api_result = await knowledge_client.create_entity(api_note.model_dump()) api_file = project_path / api_result.file_path if not api_file.exists(): diff --git a/src/basic_memory/deps/services.py b/src/basic_memory/deps/services.py index e7366b6e..f482d4cf 100644 --- a/src/basic_memory/deps/services.py +++ b/src/basic_memory/deps/services.py @@ -492,7 +492,6 @@ def schedule(self, task_name: str, **payload: Any) -> None: async def get_task_scheduler( - entity_service: EntityServiceV2ExternalDep, sync_service: SyncServiceV2ExternalDep, search_service: SearchServiceV2ExternalDep, project_config: ProjectConfigV2ExternalDep, @@ -500,28 +499,6 @@ async def get_task_scheduler( ) -> TaskScheduler: """Create a scheduler that maps task specs to coroutines.""" - scheduler: LocalTaskScheduler | None = None - - async def _reindex_entity( - entity_id: int, - resolve_relations: bool = False, - **_: Any, - ) -> None: - await entity_service.reindex_entity(entity_id) - # Trigger: caller requests relation resolution - # Why: resolve forward references created before the entity existed - # Outcome: updates unresolved relations pointing to this entity - if resolve_relations: - await sync_service.resolve_relations(entity_id=entity_id) - # Trigger: semantic search enabled in local config. - # Why: vector chunks are derived and should refresh after canonical reindex completes. - # Outcome: schedules out-of-band vector sync without extending write latency. - if app_config.semantic_search_enabled and scheduler is not None: - scheduler.schedule("sync_entity_vectors", entity_id=entity_id) - - async def _resolve_relations(entity_id: int, **_: Any) -> None: - await sync_service.resolve_relations(entity_id=entity_id) - async def _sync_entity_vectors(entity_id: int, **_: Any) -> None: await search_service.sync_entity_vectors(entity_id) @@ -537,8 +514,6 @@ async def _reindex_project(**_: Any) -> None: scheduler = LocalTaskScheduler( { - "reindex_entity": _reindex_entity, - "resolve_relations": _resolve_relations, "sync_entity_vectors": _sync_entity_vectors, "sync_project": _sync_project, "reindex_project": _reindex_project, diff --git a/src/basic_memory/mcp/clients/knowledge.py b/src/basic_memory/mcp/clients/knowledge.py index cb705fe0..c9fd5e57 100644 --- a/src/basic_memory/mcp/clients/knowledge.py +++ b/src/basic_memory/mcp/clients/knowledge.py @@ -44,9 +44,7 @@ def __init__(self, http_client: AsyncClient, project_id: str): # --- Entity CRUD Operations --- - async def create_entity( - self, entity_data: dict[str, Any], *, fast: bool | None = None - ) -> EntityResponse: + async def create_entity(self, entity_data: dict[str, Any]) -> EntityResponse: """Create a new entity. Args: @@ -58,18 +56,15 @@ async def create_entity( Raises: ToolError: If the request fails """ - params = {"fast": fast} if fast is not None else None with telemetry.scope( "mcp.client.knowledge.create_entity", client_name="knowledge", operation="create_entity", - fast=fast, ): response = await call_post( self.http_client, f"{self._base_path}/entities", json=entity_data, - params=params, client_name="knowledge", operation="create_entity", path_template="/v2/projects/{project_id}/knowledge/entities", @@ -80,8 +75,6 @@ async def update_entity( self, entity_id: str, entity_data: dict[str, Any], - *, - fast: bool | None = None, ) -> EntityResponse: """Update an existing entity (full replacement). @@ -95,18 +88,15 @@ async def update_entity( Raises: ToolError: If the request fails """ - params = {"fast": fast} if fast is not None else None with telemetry.scope( "mcp.client.knowledge.update_entity", client_name="knowledge", operation="update_entity", - fast=fast, ): response = await call_put( self.http_client, f"{self._base_path}/entities/{entity_id}", json=entity_data, - params=params, client_name="knowledge", operation="update_entity", path_template="/v2/projects/{project_id}/knowledge/entities/{entity_id}", @@ -143,8 +133,6 @@ async def patch_entity( self, entity_id: str, patch_data: dict[str, Any], - *, - fast: bool | None = None, ) -> EntityResponse: """Partially update an entity. @@ -158,18 +146,15 @@ async def patch_entity( Raises: ToolError: If the request fails """ - params = {"fast": fast} if fast is not None else None with telemetry.scope( "mcp.client.knowledge.patch_entity", client_name="knowledge", operation="patch_entity", - fast=fast, ): response = await call_patch( self.http_client, f"{self._base_path}/entities/{entity_id}", json=patch_data, - params=params, client_name="knowledge", operation="patch_entity", path_template="/v2/projects/{project_id}/knowledge/entities/{entity_id}", diff --git a/src/basic_memory/mcp/tools/edit_note.py b/src/basic_memory/mcp/tools/edit_note.py index c0bb3c46..d9a623bd 100644 --- a/src/basic_memory/mcp/tools/edit_note.py +++ b/src/basic_memory/mcp/tools/edit_note.py @@ -374,9 +374,7 @@ async def edit_note( directory=directory, operation=operation, ) - result = await knowledge_client.create_entity( - entity.model_dump(), fast=False - ) + result = await knowledge_client.create_entity(entity.model_dump()) file_created = True else: # find_replace/replace_section require existing content — re-raise @@ -399,9 +397,7 @@ async def edit_note( edit_data["expected_replacements"] = str(effective_replacements) # Call the PATCH endpoint - result = await knowledge_client.patch_entity( - entity_id, edit_data, fast=False - ) + result = await knowledge_client.patch_entity(entity_id, edit_data) # --- Format response --- # result is always set: either by create_entity (auto-create) or patch_entity (edit) diff --git a/src/basic_memory/mcp/tools/write_note.py b/src/basic_memory/mcp/tools/write_note.py index 2ec997e9..885175eb 100644 --- a/src/basic_memory/mcp/tools/write_note.py +++ b/src/basic_memory/mcp/tools/write_note.py @@ -222,7 +222,7 @@ async def write_note( logger.debug(f"Attempting to create entity permalink={entity.permalink}") action = "Created" # Default to created try: - result = await knowledge_client.create_entity(entity.model_dump(), fast=False) + result = await knowledge_client.create_entity(entity.model_dump()) action = "Created" except Exception as e: # If creation failed due to conflict (already exists), try to update @@ -260,7 +260,7 @@ async def write_note( ) # pragma: no cover entity_id = await knowledge_client.resolve_entity(entity.permalink) result = await knowledge_client.update_entity( - entity_id, entity.model_dump(), fast=False + entity_id, entity.model_dump() ) action = "Updated" except Exception as update_error: # pragma: no cover diff --git a/src/basic_memory/services/entity_service.py b/src/basic_memory/services/entity_service.py index aa59979e..6ad312ab 100644 --- a/src/basic_memory/services/entity_service.py +++ b/src/basic_memory/services/entity_service.py @@ -1,17 +1,16 @@ """Service for managing entities in the database.""" from collections.abc import Callable +from copy import deepcopy from dataclasses import dataclass -from datetime import datetime from pathlib import Path -from typing import List, Optional, Sequence, Tuple, Union +from typing import Any, List, Optional, Sequence, Tuple, Union import frontmatter import yaml from loguru import logger from sqlalchemy.exc import IntegrityError -from basic_memory import telemetry from basic_memory.config import ProjectConfig, BasicMemoryConfig from basic_memory.file_utils import ( has_frontmatter, @@ -60,6 +59,31 @@ class EntityWriteResult: search_content: str +@dataclass(frozen=True) +class PreparedEntityWrite: + """Accepted note state before any persistence side effects happen. + + Prepare methods return this object after all note semantics have been resolved, + but before any file writes or database mutations occur. + + Attributes: + file_path: Canonical note path implied by the request. + markdown_content: Full markdown to persist, including frontmatter. + search_content: Frontmatter-stripped content for inline FTS indexing. + entity_fields: Entity row values that mirror the accepted markdown state. + Keys are ``title``, ``note_type``, ``file_path``, ``content_type``, + ``entity_metadata``, and ``permalink``. + entity_markdown: Parsed markdown reused by the local write path to update + entities, observations, and relations without reparsing a second time. + """ + + file_path: Path + markdown_content: str + search_content: str + entity_fields: dict[str, Any] + entity_markdown: EntityMarkdown + + def _frontmatter_permalink(value: object) -> str | None: """Return an explicit frontmatter permalink only when YAML parsed a real string.""" return value if isinstance(value, str) and value else None @@ -237,543 +261,445 @@ def _build_frontmatter_markdown( relations=[], ) - async def create_or_update_entity(self, schema: EntitySchema) -> Tuple[EntityModel, bool]: - """Create new entity or update existing one. - Returns: (entity, is_new) where is_new is True if a new entity was created - """ - logger.debug( - f"Creating or updating entity: {schema.file_path}, permalink: {schema.permalink}" - ) + def _coerce_schema_input(self, schema: EntitySchema | EntityModel) -> EntitySchema: + """Normalize legacy Entity-like inputs into the schema shape prepare methods expect.""" + if isinstance(schema, EntitySchema): + return schema - # Try to find existing entity using strict resolution (no fuzzy search) - # This prevents incorrectly matching similar file paths like "Node A.md" and "Node C.md" - existing = await self.link_resolver.resolve_link( - schema.file_path, - strict=True, - load_relations=False, + # create_or_update_entity historically tolerated callers passing an ORM entity that had + # been annotated with ad-hoc content. Preserve that compatibility at the wrapper boundary + # so the prepare layer itself can stay strict and schema-focused. + directory = Path(schema.file_path).parent.as_posix() + normalized = EntitySchema( + title=schema.title, + content=getattr(schema, "content", None), + directory="" if directory == "." else directory, + note_type=schema.note_type, + entity_metadata=schema.entity_metadata, + content_type=schema.content_type, ) - if not existing and schema.permalink: - existing = await self.link_resolver.resolve_link( - schema.permalink, - strict=True, - load_relations=False, - ) + normalized._permalink = schema.permalink + return normalized - if existing: - logger.debug(f"Found existing entity: {existing.file_path}") - return await self.update_entity(existing, schema), False - else: - # Create new entity - return await self.create_entity(schema), True + def _sync_prepared_schema_state( + self, + source_schema: EntitySchema | EntityModel, + prepared: PreparedEntityWrite, + ) -> None: + """Preserve the legacy side effect where write helpers populate the caller's schema.""" + if not isinstance(source_schema, EntitySchema): + return + + # Older service flows mutated the request schema with the resolved permalink and any + # frontmatter-derived note type. Several callers and tests still rely on that behavior + # after create/update returns. + source_schema.title = prepared.entity_fields["title"] + source_schema.note_type = prepared.entity_fields["note_type"] + source_schema.content_type = prepared.entity_fields["content_type"] + source_schema.entity_metadata = prepared.entity_fields["entity_metadata"] - async def create_entity(self, schema: EntitySchema) -> EntityModel: - """Create a new entity and write to filesystem.""" - return (await self.create_entity_with_content(schema)).entity + if self.app_config and self.app_config.disable_permalinks: + source_schema._permalink = "" + else: + source_schema._permalink = prepared.entity_fields["permalink"] - async def create_entity_with_content(self, schema: EntitySchema) -> EntityWriteResult: - """Create a new entity and return both the entity row and written markdown.""" - logger.debug(f"Creating entity: {schema.title}") + def _apply_schema_frontmatter_overrides(self, schema: EntitySchema) -> EntityMarkdown | None: + """Apply schema content frontmatter overrides and return permalink resolution metadata.""" + if not schema.content or not has_frontmatter(schema.content): + return None - # Get file path and ensure it's a Path object - file_path = Path(schema.file_path) + # Trigger: callers supply markdown that already contains frontmatter. + # Why: user-authored frontmatter is part of accepted note semantics, not a persistence detail. + # Outcome: note_type/permalink derivation happens before any write path decides how to persist. + content_frontmatter = parse_frontmatter(schema.content) - if await self.file_service.exists(file_path): - raise EntityAlreadyExistsError( - f"file for entity {schema.directory}/{schema.title} already exists: {file_path}" - ) + if "type" in content_frontmatter: + schema.note_type = _coerce_to_string(content_frontmatter["type"]) - # Parse content frontmatter to check for user-specified permalink and note_type - content_markdown = None - if schema.content and has_frontmatter(schema.content): - content_frontmatter = parse_frontmatter(schema.content) + if "permalink" not in content_frontmatter: + return None - # If content has type, use it to override the schema note_type - if "type" in content_frontmatter: - schema.note_type = content_frontmatter["type"] + content_permalink = _frontmatter_permalink(content_frontmatter["permalink"]) + if content_permalink is None: + return None - if "permalink" in content_frontmatter: - content_permalink = _frontmatter_permalink(content_frontmatter["permalink"]) - if content_permalink is not None: - content_markdown = self._build_frontmatter_markdown( - schema.title, - schema.note_type, - content_permalink, - ) + return self._build_frontmatter_markdown( + schema.title, + schema.note_type, + content_permalink, + ) - # Get unique permalink (prioritizing content frontmatter) unless disabled + async def _resolve_schema_permalink( + self, + schema: EntitySchema, + *, + file_path: Path, + current_permalink: str | None = None, + content_markdown: EntityMarkdown | None = None, + skip_conflict_check: bool = False, + ) -> str | None: + """Resolve the canonical permalink for a create/update write.""" if self.app_config and self.app_config.disable_permalinks: - schema._permalink = "" - else: - with telemetry.scope( - "entity_service.create.resolve_permalink", - domain="entity_service", - action="create", - phase="resolve_permalink", - ): - permalink = await self.resolve_permalink(file_path, content_markdown) - schema._permalink = permalink - - post = await schema_to_markdown(schema) - - final_content = dump_frontmatter(post) - with telemetry.scope( - "entity_service.create.write_file", - domain="entity_service", - action="create", - phase="write_file", - ): - checksum = await self.file_service.write_file(file_path, final_content) - - with telemetry.scope( - "entity_service.create.parse_markdown", - domain="entity_service", - action="create", - phase="parse_markdown", - ): - entity_markdown = await self.entity_parser.parse_markdown_content( - file_path=file_path, - content=final_content, - ) - - with telemetry.scope( - "entity_service.create.upsert_entity", - domain="entity_service", - action="create", - phase="upsert_entity", - ): - entity = await self.upsert_entity_from_markdown(file_path, entity_markdown, is_new=True) - - with telemetry.scope( - "entity_service.create.update_checksum", - domain="entity_service", - action="create", - phase="update_checksum", - ): - updated = await self.repository.update(entity.id, {"checksum": checksum}) - if not updated: # pragma: no cover - raise ValueError(f"Failed to update entity checksum after create: {entity.id}") - return EntityWriteResult( - entity=updated, - content=final_content, - search_content=remove_frontmatter(final_content), + if current_permalink is None: + schema._permalink = "" + return None + schema._permalink = current_permalink + return current_permalink + + if current_permalink and not (content_markdown and content_markdown.frontmatter.permalink): + schema._permalink = current_permalink + return current_permalink + + resolved_permalink = await self.resolve_permalink( + file_path, + content_markdown, + skip_conflict_check=skip_conflict_check, ) + schema._permalink = resolved_permalink + return resolved_permalink - async def update_entity(self, entity: EntityModel, schema: EntitySchema) -> EntityModel: - """Update an entity's content and metadata.""" - return (await self.update_entity_with_content(entity, schema)).entity + def _build_entity_fields( + self, + *, + file_path: Path, + title: str, + note_type: str, + content_type: str, + metadata: dict[str, Any] | None, + permalink: str | None, + ) -> dict[str, Any]: + """Build the entity row data that mirrors accepted markdown state.""" + normalized_metadata = normalize_frontmatter_metadata(metadata or {}) + entity_metadata = {k: v for k, v in normalized_metadata.items() if v is not None} + return { + "title": title, + "note_type": note_type, + "file_path": file_path.as_posix(), + "content_type": content_type, + "entity_metadata": entity_metadata or None, + "permalink": permalink, + } - async def update_entity_with_content( - self, entity: EntityModel, schema: EntitySchema - ) -> EntityWriteResult: - """Update an entity and return both the entity row and written markdown.""" - logger.debug( - f"Updating entity with permalink: {entity.permalink} content-type: {schema.content_type}" + async def _build_prepared_write( + self, + *, + file_path: Path, + markdown_content: str, + entity_fields: dict[str, Any], + ) -> PreparedEntityWrite: + """Parse accepted markdown once so all persistence paths share the same state.""" + # Trigger: both local and cloud-style callers need the exact same accepted markdown. + # Why: parsing twice creates opportunities for drift between "what we accepted" and + # "what we indexed/persisted". + # Outcome: callers carry one prepared object through file writes, DB writes, and indexing. + entity_markdown = await self.entity_parser.parse_markdown_content( + file_path=file_path, + content=markdown_content, + ) + return PreparedEntityWrite( + file_path=file_path, + markdown_content=markdown_content, + search_content=remove_frontmatter(markdown_content), + entity_fields=entity_fields, + entity_markdown=entity_markdown, ) - # Convert file path string to Path - file_path = Path(entity.file_path) - - with telemetry.scope( - "entity_service.update.read_file", - domain="entity_service", - action="update", - phase="read_file", - ): - existing_content = await self.file_service.read_file_content(file_path) - with telemetry.scope( - "entity_service.update.parse_markdown", - domain="entity_service", - action="update", - phase="parse_markdown", - ): - existing_markdown = await self.entity_parser.parse_markdown_content( - file_path=file_path, - content=existing_content, - ) - - # Parse content frontmatter to check for user-specified permalink and note_type - content_markdown = None - if schema.content and has_frontmatter(schema.content): - content_frontmatter = parse_frontmatter(schema.content) - - # If content has type, use it to override the schema note_type - if "type" in content_frontmatter: - schema.note_type = content_frontmatter["type"] - - if "permalink" in content_frontmatter: - content_permalink = _frontmatter_permalink(content_frontmatter["permalink"]) - if content_permalink is not None: - content_markdown = self._build_frontmatter_markdown( - schema.title, - schema.note_type, - content_permalink, - ) - - # Check if we need to update the permalink based on content frontmatter (unless disabled) - new_permalink = entity.permalink # Default to existing - if self.app_config and not self.app_config.disable_permalinks: - if content_markdown and content_markdown.frontmatter.permalink: - # Resolve permalink with the new content frontmatter - with telemetry.scope( - "entity_service.update.resolve_permalink", - domain="entity_service", - action="update", - phase="resolve_permalink", - ): - resolved_permalink = await self.resolve_permalink(file_path, content_markdown) - if resolved_permalink != entity.permalink: - new_permalink = resolved_permalink - # Update the schema to use the new permalink - schema._permalink = new_permalink - - # Create post with new content from schema - post = await schema_to_markdown(schema) - - # Merge new metadata with existing metadata - existing_markdown.frontmatter.metadata.update(post.metadata) - - # Always ensure the permalink in the metadata is the canonical one from the database. - # The schema_to_markdown call above uses EntitySchema.permalink which computes a - # non-prefixed permalink (e.g., "test/note"). The metadata merge on the previous line - # would overwrite the project-prefixed permalink (e.g., "project/test/note") stored - # in the existing file. Setting it unconditionally preserves the correct value. - existing_markdown.frontmatter.metadata["permalink"] = new_permalink - - # Create a new post with merged metadata. - # Avoid **metadata unpacking — user frontmatter may contain reserved keys - # like 'content' or 'handler' that conflict with Post.__init__ (cloud#375). - merged_post = frontmatter.Post(post.content) - merged_post.metadata.update(existing_markdown.frontmatter.metadata) - - final_content = dump_frontmatter(merged_post) - with telemetry.scope( - "entity_service.update.write_file", - domain="entity_service", - action="update", - phase="write_file", - ): - checksum = await self.file_service.write_file(file_path, final_content) - - with telemetry.scope( - "entity_service.update.parse_markdown", - domain="entity_service", - action="update", - phase="parse_markdown", - ): - entity_markdown = await self.entity_parser.parse_markdown_content( - file_path=file_path, - content=final_content, - ) + async def prepare_create_entity_content( + self, + schema: EntitySchema, + *, + check_storage_exists: bool = True, + skip_conflict_check: bool = False, + ) -> PreparedEntityWrite: + """Derive accepted markdown and entity fields for a new note. + + This is a public prepare step: it resolves frontmatter overrides, + permalink semantics, and the final markdown body, but it does not write + files or mutate database rows. + + Storage touch points: + - When ``check_storage_exists`` is ``True`` (the default), this method + calls ``file_service.exists(file_path)`` and raises + ``EntityAlreadyExistsError`` if the target already exists. + - When ``check_storage_exists`` is ``False``, callers opt into DB-first + acceptance and must perform any external storage conflict handling + themselves. + """ + # Work on a copy so prepare methods are pure from the caller's perspective. + # The router/service layer still receives the same accepted result, but we avoid mutating + # the original schema instance in surprising ways. + schema = schema.model_copy(deep=True) + file_path = Path(schema.file_path) - with telemetry.scope( - "entity_service.update.upsert_entity", - domain="entity_service", - action="update", - phase="upsert_entity", - ): - entity = await self.upsert_entity_from_markdown( - file_path, entity_markdown, is_new=False + if check_storage_exists and await self.file_service.exists(file_path): + raise EntityAlreadyExistsError( + f"file for entity {schema.directory}/{schema.title} already exists: {file_path}" ) - with telemetry.scope( - "entity_service.update.update_checksum", - domain="entity_service", - action="update", - phase="update_checksum", - ): - entity = await self.repository.update(entity.id, {"checksum": checksum}) - if not entity: # pragma: no cover - raise ValueError(f"Failed to update entity checksum after update: {file_path}") - - return EntityWriteResult( - entity=entity, - content=final_content, - search_content=remove_frontmatter(final_content), + content_markdown = self._apply_schema_frontmatter_overrides(schema) + permalink = await self._resolve_schema_permalink( + schema, + file_path=file_path, + content_markdown=content_markdown, + skip_conflict_check=skip_conflict_check, ) - async def fast_write_entity( - self, - schema: EntitySchema, - external_id: Optional[str] = None, - ) -> EntityModel: - """Write file and upsert a minimal entity row for fast responses.""" - logger.debug( - "Fast-writing entity", + # Build the final markdown once here. Local mode will write it immediately; cloud mode can + # store it in note_content first and materialize later without re-deriving anything. + post = await schema_to_markdown(schema) + markdown_content = dump_frontmatter(post) + entity_fields = self._build_entity_fields( + file_path=file_path, title=schema.title, - external_id=external_id, + note_type=schema.note_type, content_type=schema.content_type, + metadata=post.metadata, + permalink=permalink, + ) + return await self._build_prepared_write( + file_path=file_path, + markdown_content=markdown_content, + entity_fields=entity_fields, ) - # --- Identity & File Path --- - with telemetry.scope( - "entity_service.fast_write.resolve_entity", - domain="entity_service", - action="fast_write", - phase="resolve_entity", - ): - existing = ( - await self.repository.get_by_external_id(external_id) if external_id else None - ) - - # Trigger: external_id already exists - # Why: avoid duplicate entities when title-derived paths change - # Outcome: update in-place and keep the existing file path - file_path = Path(existing.file_path) if existing else Path(schema.file_path) - - if not existing and await self.file_service.exists(file_path): - raise EntityAlreadyExistsError( - f"file for entity {schema.directory}/{schema.title} already exists: {file_path}" - ) + async def prepare_update_entity_content( + self, + entity: EntityModel, + schema: EntitySchema, + existing_content: str, + *, + skip_conflict_check: bool = False, + ) -> PreparedEntityWrite: + """Derive accepted markdown and entity fields for a full note replacement. - # --- Frontmatter Overrides --- - content_markdown = None - if schema.content and has_frontmatter(schema.content): - content_frontmatter = parse_frontmatter(schema.content) + This method does not read or write storage on its own. The caller must + supply ``existing_content`` for the current note body because full updates + preserve unrecognized frontmatter keys from that explicit base content. + No database rows are mutated here. + """ + schema = schema.model_copy(deep=True) + file_path = Path(entity.file_path) + existing_markdown = await self.entity_parser.parse_markdown_content( + file_path=file_path, + content=existing_content, + ) - if "type" in content_frontmatter: - schema.note_type = content_frontmatter["type"] + content_markdown = self._apply_schema_frontmatter_overrides(schema) + resolved_permalink = await self._resolve_schema_permalink( + schema, + file_path=file_path, + current_permalink=entity.permalink, + content_markdown=content_markdown, + skip_conflict_check=skip_conflict_check, + ) - if "permalink" in content_frontmatter: - content_permalink = _frontmatter_permalink(content_frontmatter["permalink"]) - if content_permalink is not None: - content_markdown = self._build_frontmatter_markdown( - schema.title, - schema.note_type, - content_permalink, - ) + post = await schema_to_markdown(schema) - # --- Permalink Resolution --- - if self.app_config and self.app_config.disable_permalinks: - schema._permalink = "" - else: - if existing and not (content_markdown and content_markdown.frontmatter.permalink): - with telemetry.scope( - "entity_service.fast_write.resolve_permalink", - domain="entity_service", - action="fast_write", - phase="resolve_permalink", - ): - schema._permalink = existing.permalink or await self.resolve_permalink( - file_path, skip_conflict_check=True - ) - else: - with telemetry.scope( - "entity_service.fast_write.resolve_permalink", - domain="entity_service", - action="fast_write", - phase="resolve_permalink", - ): - schema._permalink = await self.resolve_permalink( - file_path, content_markdown, skip_conflict_check=True - ) + # Full updates preserve unrecognized frontmatter keys from the existing note. + # That keeps Basic Memory's write semantics stable for hand-authored metadata while still + # letting the incoming schema replace the fields it explicitly owns. + merged_metadata = deepcopy(existing_markdown.frontmatter.metadata) + merged_metadata.update(post.metadata) + merged_metadata["permalink"] = resolved_permalink - post = await schema_to_markdown(schema) - final_content = dump_frontmatter(post) - with telemetry.scope( - "entity_service.fast_write.write_file", - domain="entity_service", - action="fast_write", - phase="write_file", - ): - checksum = await self.file_service.write_file(file_path, final_content) - - # --- Minimal DB Upsert --- - metadata = normalize_frontmatter_metadata(post.metadata or {}) - entity_metadata = {k: v for k, v in metadata.items() if v is not None} - update_data = { - "title": schema.title, - "note_type": schema.note_type, - "file_path": file_path.as_posix(), - "content_type": schema.content_type, - "entity_metadata": entity_metadata or None, - "permalink": schema.permalink, - "checksum": checksum, - "updated_at": datetime.now().astimezone(), - } + merged_post = frontmatter.Post(post.content) + merged_post.metadata.update(merged_metadata) - user_id = self.get_user_id() + markdown_content = dump_frontmatter(merged_post) + entity_fields = self._build_entity_fields( + file_path=file_path, + title=schema.title, + note_type=schema.note_type, + content_type=schema.content_type, + metadata=merged_post.metadata, + permalink=resolved_permalink, + ) + return await self._build_prepared_write( + file_path=file_path, + markdown_content=markdown_content, + entity_fields=entity_fields, + ) - if existing: - # Preserve existing created_by; only update last_updated_by - if user_id is not None: - update_data["last_updated_by"] = user_id - with telemetry.scope( - "entity_service.fast_write.upsert_entity", - domain="entity_service", - action="fast_write", - phase="upsert_entity", - ): - updated = await self.repository.update(existing.id, update_data) - if not updated: - raise ValueError(f"Failed to update entity in database: {existing.id}") - return updated - - create_data = dict(update_data) - if external_id is not None: - create_data["external_id"] = external_id - if user_id is not None: - create_data["created_by"] = user_id - create_data["last_updated_by"] = user_id - with telemetry.scope( - "entity_service.fast_write.upsert_entity", - domain="entity_service", - action="fast_write", - phase="upsert_entity", - ): - return await self.repository.create(create_data) - - async def fast_edit_entity( + async def prepare_edit_entity_content( self, entity: EntityModel, + current_content: str, + *, operation: str, content: str, section: Optional[str] = None, find_text: Optional[str] = None, expected_replacements: int = 1, - ) -> EntityModel: - """Edit an entity quickly and defer full indexing to background.""" - logger.debug(f"Fast editing entity: {entity.external_id}, operation: {operation}") + skip_conflict_check: bool = False, + ) -> PreparedEntityWrite: + """Derive accepted markdown and entity fields for an edit request. + This method operates only on the caller-provided ``current_content``. It + does not read files, write files, or mutate database rows. That makes the + edit base explicit so higher layers can reject stale content instead of + silently editing whichever storage copy happens to be newest. + """ file_path = Path(entity.file_path) - with telemetry.scope( - "entity_service.fast_edit.read_file", - domain="entity_service", - action="fast_edit", - phase="read_file", - ): - current_content, _ = await self.file_service.read_file(file_path) - with telemetry.scope( - "entity_service.fast_edit.apply_operation", - domain="entity_service", - action="fast_edit", - phase="apply_operation", - ): - new_content = self.apply_edit_operation( - current_content, operation, content, section, find_text, expected_replacements - ) - with telemetry.scope( - "entity_service.fast_edit.write_file", - domain="entity_service", - action="fast_edit", - phase="write_file", - ): - checksum = await self.file_service.write_file(file_path, new_content) - - # --- Frontmatter Overrides --- - update_data = { - "checksum": checksum, - "updated_at": datetime.now().astimezone(), - } - user_id = self.get_user_id() - if user_id is not None: - update_data["last_updated_by"] = user_id + # Edits are intentionally based on explicit caller-supplied content. That makes stale-base + # handling visible to the caller instead of quietly reading whatever persistence layer + # happens to be newest. + markdown_content = self.apply_edit_operation( + current_content, + operation, + content, + section, + find_text, + expected_replacements, + ) + + title = entity.title + note_type = entity.note_type + permalink = entity.permalink + metadata = entity.entity_metadata - content_markdown = None - if has_frontmatter(new_content): - content_frontmatter = parse_frontmatter(new_content) + if has_frontmatter(markdown_content): + content_frontmatter = parse_frontmatter(markdown_content) - # Coerce to string — YAML may parse these as lists (cloud#376) if "title" in content_frontmatter: - update_data["title"] = _coerce_to_string(content_frontmatter["title"]) + title = _coerce_to_string(content_frontmatter["title"]) if "type" in content_frontmatter: - update_data["note_type"] = _coerce_to_string(content_frontmatter["type"]) + note_type = _coerce_to_string(content_frontmatter["type"]) - if "permalink" in content_frontmatter: + if self.app_config and self.app_config.disable_permalinks: + permalink = entity.permalink + elif "permalink" in content_frontmatter: content_permalink = _frontmatter_permalink(content_frontmatter["permalink"]) if content_permalink is not None: content_markdown = self._build_frontmatter_markdown( - _coerce_to_string(update_data.get("title", entity.title)), - _coerce_to_string(update_data.get("note_type", entity.note_type)), + title, + note_type, content_permalink, ) + permalink = await self.resolve_permalink( + file_path, + content_markdown, + skip_conflict_check=skip_conflict_check, + ) - metadata = normalize_frontmatter_metadata(content_frontmatter or {}) - update_data["entity_metadata"] = {k: v for k, v in metadata.items() if v is not None} + normalized_metadata = normalize_frontmatter_metadata(content_frontmatter or {}) + metadata = {k: v for k, v in normalized_metadata.items() if v is not None} or None - # --- Permalink Resolution --- - if self.app_config and self.app_config.disable_permalinks: - update_data["permalink"] = None - elif content_markdown and content_markdown.frontmatter.permalink: - with telemetry.scope( - "entity_service.fast_edit.resolve_permalink", - domain="entity_service", - action="fast_edit", - phase="resolve_permalink", - ): - update_data["permalink"] = await self.resolve_permalink( - file_path, content_markdown, skip_conflict_check=True - ) + entity_fields = self._build_entity_fields( + file_path=file_path, + title=title, + note_type=note_type, + content_type=entity.content_type, + metadata=metadata, + permalink=permalink, + ) + return await self._build_prepared_write( + file_path=file_path, + markdown_content=markdown_content, + entity_fields=entity_fields, + ) - with telemetry.scope( - "entity_service.fast_edit.update_entity", - domain="entity_service", - action="fast_edit", - phase="update_entity", - ): - updated = await self.repository.update(entity.id, update_data) - if not updated: - raise ValueError(f"Failed to update entity in database: {entity.id}") - return updated - - async def reindex_entity(self, entity_id: int) -> None: - """Parse file content and rebuild observations/relations/search for an entity.""" - with telemetry.scope( - "entity_service.reindex.load_entity", - domain="entity_service", - action="reindex", - phase="load_entity", - ): - entity = await self.repository.find_by_id(entity_id) - if not entity: - raise EntityNotFoundError(f"Entity not found: {entity_id}") + async def create_or_update_entity(self, schema: EntitySchema) -> Tuple[EntityModel, bool]: + """Create new entity or update existing one. + Returns: (entity, is_new) where is_new is True if a new entity was created + """ + logger.debug( + f"Creating or updating entity: {schema.file_path}, permalink: {schema.permalink}" + ) - file_path = Path(entity.file_path) - with telemetry.scope( - "entity_service.reindex.read_file", - domain="entity_service", - action="reindex", - phase="read_file", - ): - content = await self.file_service.read_file_content(file_path) - with telemetry.scope( - "entity_service.reindex.parse_markdown", - domain="entity_service", - action="reindex", - phase="parse_markdown", - ): - entity_markdown = await self.entity_parser.parse_markdown_content( - file_path=file_path, - content=content, + # Try to find existing entity using strict resolution (no fuzzy search) + # This prevents incorrectly matching similar file paths like "Node A.md" and "Node C.md" + existing = await self.link_resolver.resolve_link( + schema.file_path, + strict=True, + load_relations=False, + ) + if not existing and schema.permalink: + existing = await self.link_resolver.resolve_link( + schema.permalink, + strict=True, + load_relations=False, ) - with telemetry.scope( - "entity_service.reindex.upsert_entity", - domain="entity_service", - action="reindex", - phase="upsert_entity", - ): - updated = await self.upsert_entity_from_markdown( - file_path, entity_markdown, is_new=False - ) - with telemetry.scope( - "entity_service.reindex.update_checksum", - domain="entity_service", - action="reindex", - phase="update_checksum", - ): - checksum = await self.file_service.compute_checksum(file_path) - updated = await self.repository.update(updated.id, {"checksum": checksum}) - if not updated: - raise ValueError(f"Failed to update entity in database: {entity.id}") - - if self.search_service: - with telemetry.scope( - "entity_service.reindex.search_index", - domain="entity_service", - action="reindex", - phase="search_index", - ): - await self.search_service.index_entity_data(updated, content=content) + if existing: + logger.debug(f"Found existing entity: {existing.file_path}") + return await self.update_entity(existing, self._coerce_schema_input(schema)), False + else: + # Create new entity + return await self.create_entity(self._coerce_schema_input(schema)), True + + async def create_entity(self, schema: EntitySchema) -> EntityModel: + """Create a new entity and write to filesystem.""" + return (await self.create_entity_with_content(schema)).entity + + async def create_entity_with_content(self, schema: EntitySchema) -> EntityWriteResult: + """Create a new entity and return both the entity row and written markdown.""" + logger.debug(f"Creating entity: {schema.title}") + # --- Prepare Accepted State --- + # Derive the canonical markdown/entity fields before touching the filesystem. + prepared = await self.prepare_create_entity_content(schema) + self._sync_prepared_schema_state(schema, prepared) + # --- Persist File, Then Indexable DB State --- + # Local mode still writes the file immediately; the prepare object keeps semantics separate + # from that persistence step. + checksum = await self.file_service.write_file(prepared.file_path, prepared.markdown_content) + entity = await self.upsert_entity_from_markdown( + prepared.file_path, + prepared.entity_markdown, + is_new=True, + ) + updated = await self.repository.update(entity.id, {"checksum": checksum}) + if not updated: # pragma: no cover + raise ValueError(f"Failed to update entity checksum after create: {entity.id}") + return EntityWriteResult( + entity=updated, + content=prepared.markdown_content, + search_content=prepared.search_content, + ) + + async def update_entity(self, entity: EntityModel, schema: EntitySchema) -> EntityModel: + """Update an entity's content and metadata.""" + return ( + await self.update_entity_with_content(entity, self._coerce_schema_input(schema)) + ).entity + + async def update_entity_with_content( + self, entity: EntityModel, schema: EntitySchema + ) -> EntityWriteResult: + """Update an entity and return both the entity row and written markdown.""" + schema = self._coerce_schema_input(schema) + logger.debug( + f"Updating entity with permalink: {entity.permalink} content-type: {schema.content_type}" + ) + + # --- Read Current File State --- + # Full replacements merge with existing frontmatter, so local mode still needs the current + # file contents as input to the prepare step. + existing_content = await self.file_service.read_file_content(entity.file_path) + prepared = await self.prepare_update_entity_content( + entity, + schema, + existing_content, + ) + self._sync_prepared_schema_state(schema, prepared) + # --- Persist Prepared State --- + checksum = await self.file_service.write_file( + prepared.file_path, + prepared.markdown_content, + ) + entity = await self.upsert_entity_from_markdown( + prepared.file_path, + prepared.entity_markdown, + is_new=False, + ) + entity = await self.repository.update(entity.id, {"checksum": checksum}) + if not entity: # pragma: no cover + raise ValueError(f"Failed to update entity checksum after update: {prepared.file_path}") + + return EntityWriteResult( + entity=entity, + content=prepared.markdown_content, + search_content=prepared.search_content, + ) async def delete_entity(self, permalink_or_id: str | int) -> bool: """Delete entity and its file.""" @@ -890,22 +816,10 @@ async def update_entity_and_observations( """ logger.debug(f"Updating entity and observations: {file_path}") - with telemetry.scope( - "upsert.update.fetch_entity", - domain="entity_service", - action="upsert", - phase="fetch_entity", - ): - db_entity = await self.repository.get_by_file_path(file_path.as_posix()) + db_entity = await self.repository.get_by_file_path(file_path.as_posix()) # Clear observations for entity - with telemetry.scope( - "upsert.update.delete_observations", - domain="entity_service", - action="upsert", - phase="delete_observations", - ): - await self.observation_repository.delete_by_fields(entity_id=db_entity.id) + await self.observation_repository.delete_by_fields(entity_id=db_entity.id) # add new observations observations = [ @@ -919,14 +833,7 @@ async def update_entity_and_observations( ) for obs in markdown.observations ] - with telemetry.scope( - "upsert.update.insert_observations", - domain="entity_service", - action="upsert", - phase="insert_observations", - count=len(observations), - ): - await self.observation_repository.add_all(observations) + await self.observation_repository.add_all(observations) # update values from markdown db_entity = entity_model_from_markdown(file_path, markdown, db_entity) @@ -940,16 +847,10 @@ async def update_entity_and_observations( db_entity.last_updated_by = user_id # update entity - with telemetry.scope( - "upsert.update.save_entity", - domain="entity_service", - action="upsert", - phase="save_entity", - ): - return await self.repository.update( - db_entity.id, - db_entity, - ) + return await self.repository.update( + db_entity.id, + db_entity, + ) async def upsert_entity_from_markdown( self, @@ -980,13 +881,7 @@ async def update_entity_relations( logger.debug(f"Updating relations for entity: {entity.file_path}") # Clear existing relations first - with telemetry.scope( - "upsert.relations.delete_existing", - domain="entity_service", - action="upsert", - phase="delete_relations", - ): - await self.relation_repository.delete_outgoing_relations_from_entity(entity_id) + await self.relation_repository.delete_outgoing_relations_from_entity(entity_id) # Batch resolve all relation targets in parallel if markdown.relations: @@ -1005,14 +900,7 @@ async def update_entity_relations( ] # Execute all lookups in parallel - with telemetry.scope( - "upsert.relations.resolve_links", - domain="entity_service", - action="upsert", - phase="resolve_links", - count=len(lookup_tasks), - ): - resolved_entities = await asyncio.gather(*lookup_tasks, return_exceptions=True) + resolved_entities = await asyncio.gather(*lookup_tasks, return_exceptions=True) # Process results and create relation records relations_to_add = [] @@ -1041,36 +929,23 @@ async def update_entity_relations( # Batch insert all relations if relations_to_add: - with telemetry.scope( - "upsert.relations.insert_relations", - domain="entity_service", - action="upsert", - phase="insert_relations", - count=len(relations_to_add), - ): - try: - await self.relation_repository.add_all(relations_to_add) - except IntegrityError: - # Some relations might be duplicates - fall back to individual inserts - logger.debug("Batch relation insert failed, trying individual inserts") - for relation in relations_to_add: - try: - await self.relation_repository.add(relation) - except IntegrityError: - # Unique constraint violation - relation already exists - logger.debug( - f"Skipping duplicate relation {relation.relation_type} from {entity.permalink}" - ) - continue + try: + await self.relation_repository.add_all(relations_to_add) + except IntegrityError: + # Some relations might be duplicates - fall back to individual inserts + logger.debug("Batch relation insert failed, trying individual inserts") + for relation in relations_to_add: + try: + await self.relation_repository.add(relation) + except IntegrityError: + # Unique constraint violation - relation already exists + logger.debug( + f"Skipping duplicate relation {relation.relation_type} from {entity.permalink}" + ) + continue # Reload entity with relations via PK lookup (faster than get_by_file_path string match) - with telemetry.scope( - "upsert.relations.reload_entity", - domain="entity_service", - action="upsert", - phase="reload_entity", - ): - reloaded = await self.repository.find_by_ids([entity_id]) + reloaded = await self.repository.find_by_ids([entity_id]) return reloaded[0] async def edit_entity( @@ -1122,82 +997,51 @@ async def edit_entity_with_content( """Edit an entity and return both the entity row and written markdown.""" logger.debug(f"Editing entity: {identifier}, operation: {operation}") - with telemetry.scope( - "entity_service.edit.resolve_entity", - domain="entity_service", - action="edit", - phase="resolve_entity", - ): - entity = await self.link_resolver.resolve_link( - identifier, - strict=True, - load_relations=False, - ) + entity = await self.link_resolver.resolve_link( + identifier, + strict=True, + load_relations=False, + ) if not entity: raise EntityNotFoundError(f"Entity not found: {identifier}") file_path = Path(entity.file_path) - with telemetry.scope( - "entity_service.edit.read_file", - domain="entity_service", - action="edit", - phase="read_file", - ): - current_content, _ = await self.file_service.read_file(file_path) - - with telemetry.scope( - "entity_service.edit.apply_operation", - domain="entity_service", - action="edit", - phase="apply_operation", - ): - new_content = self.apply_edit_operation( - current_content, operation, content, section, find_text, expected_replacements - ) + current_content, _ = await self.file_service.read_file(file_path) + # --- Prepare Against Explicit Base Content --- + # The edit operation is the semantic step; file/DB writes below are just persistence of that + # accepted result. + prepared = await self.prepare_edit_entity_content( + entity, + current_content, + operation=operation, + content=content, + section=section, + find_text=find_text, + expected_replacements=expected_replacements, + ) - with telemetry.scope( - "entity_service.edit.write_file", - domain="entity_service", - action="edit", - phase="write_file", - ): - checksum = await self.file_service.write_file(file_path, new_content) - - with telemetry.scope( - "entity_service.edit.parse_markdown", - domain="entity_service", - action="edit", - phase="parse_markdown", - ): - entity_markdown = await self.entity_parser.parse_markdown_content( - file_path=file_path, - content=new_content, - ) + checksum = await self.file_service.write_file( + file_path, + prepared.markdown_content, + ) - with telemetry.scope( - "entity_service.edit.upsert_entity", - domain="entity_service", - action="edit", - phase="upsert_entity", - ): - entity = await self.upsert_entity_from_markdown( - file_path, entity_markdown, is_new=False - ) + # --- Rebuild Structured Knowledge State --- + # Non-fast edits remain fully synchronous locally: once the file write succeeds, we refresh + # observations, relations, and checksum in the same request. + entity = await self.upsert_entity_from_markdown( + file_path, + prepared.entity_markdown, + is_new=False, + ) - with telemetry.scope( - "entity_service.edit.update_checksum", - domain="entity_service", - action="edit", - phase="update_checksum", - ): - entity = await self.repository.update(entity.id, {"checksum": checksum}) + entity = await self.repository.update(entity.id, {"checksum": checksum}) if not entity: # pragma: no cover raise ValueError(f"Failed to update entity checksum after edit: {file_path}") return EntityWriteResult( entity=entity, - content=new_content, - search_content=remove_frontmatter(new_content), + content=prepared.markdown_content, + search_content=prepared.search_content, ) def apply_edit_operation( diff --git a/tests/api/v2/test_knowledge_router.py b/tests/api/v2/test_knowledge_router.py index d6121a80..3733623e 100644 --- a/tests/api/v2/test_knowledge_router.py +++ b/tests/api/v2/test_knowledge_router.py @@ -375,10 +375,10 @@ async def test_update_entity_by_id( @pytest.mark.asyncio -async def test_update_entity_by_id_fast_does_not_duplicate( +async def test_update_entity_by_id_does_not_duplicate( client: AsyncClient, v2_project_url, entity_repository ): - """Fast PUT updates the existing external_id without creating duplicates.""" + """PUT updates the existing external_id without creating duplicates.""" create_data = { "title": "07 - Get Started", "directory": "docs", @@ -405,10 +405,10 @@ async def test_update_entity_by_id_fast_does_not_duplicate( @pytest.mark.asyncio -async def test_put_entity_fast_returns_minimal_row( +async def test_put_entity_with_fast_param_returns_fully_indexed_row( client: AsyncClient, v2_project_url, entity_repository ): - """Fast PUT returns a minimal row and persists the external_id immediately.""" + """PUT ignores the legacy fast param and still returns a fully indexed row.""" external_id = str(uuid.uuid4()) update_data = { "title": "FastPutEntity", @@ -431,18 +431,19 @@ async def test_put_entity_fast_returns_minimal_row( assert response.status_code == 201 created_entity = EntityResponseV2.model_validate(response.json()) assert created_entity.external_id == external_id - assert created_entity.observations == [] - assert created_entity.relations == [] + assert len(created_entity.observations) == 1 + assert len(created_entity.relations) == 1 db_entity = await entity_repository.get_by_external_id(external_id) assert db_entity is not None @pytest.mark.asyncio -async def test_fast_create_schedules_reindex_task( - client: AsyncClient, v2_project_url, task_scheduler_spy +async def test_create_with_fast_param_does_not_schedule_reindex_task( + client: AsyncClient, v2_project_url, task_scheduler_spy, app_config ): - """Fast create should enqueue a background reindex task.""" + """Legacy fast=true should not resurrect the removed reindex note-write path.""" + app_config.semantic_search_enabled = False start_count = len(task_scheduler_spy) response = await client.post( f"{v2_project_url}/knowledge/entities", @@ -454,18 +455,14 @@ async def test_fast_create_schedules_reindex_task( params={"fast": True}, ) assert response.status_code == 200 - assert len(task_scheduler_spy) == start_count + 1 - created_entity = EntityResponseV2.model_validate(response.json()) - scheduled = task_scheduler_spy[-1] - assert scheduled["task_name"] == "reindex_entity" - assert scheduled["payload"]["entity_id"] == created_entity.id + assert len(task_scheduler_spy) == start_count @pytest.mark.asyncio -async def test_non_fast_create_schedules_vector_sync_when_semantic_enabled( +async def test_create_schedules_vector_sync_when_semantic_enabled( client: AsyncClient, v2_project_url, task_scheduler_spy, app_config ): - """Non-fast create should schedule vector sync when semantic mode is enabled.""" + """Create should schedule vector sync when semantic mode is enabled.""" app_config.semantic_search_enabled = True start_count = len(task_scheduler_spy) @@ -488,10 +485,10 @@ async def test_non_fast_create_schedules_vector_sync_when_semantic_enabled( @pytest.mark.asyncio -async def test_non_fast_create_skips_vector_sync_when_semantic_disabled( +async def test_create_skips_vector_sync_when_semantic_disabled( client: AsyncClient, v2_project_url, task_scheduler_spy, app_config ): - """Non-fast create should not schedule vector sync when semantic mode is disabled.""" + """Create should not schedule vector sync when semantic mode is disabled.""" app_config.semantic_search_enabled = False start_count = len(task_scheduler_spy) diff --git a/tests/api/v2/test_knowledge_router_telemetry.py b/tests/api/v2/test_knowledge_router_telemetry.py index 771463de..a203a2a7 100644 --- a/tests/api/v2/test_knowledge_router_telemetry.py +++ b/tests/api/v2/test_knowledge_router_telemetry.py @@ -9,7 +9,7 @@ from typing import Any, cast import pytest -from fastapi import BackgroundTasks, Response +from fastapi import Response from basic_memory.schemas.base import Entity from basic_memory.schemas.request import EditEntityRequest @@ -79,10 +79,6 @@ class FakeTaskScheduler: def schedule(self, *args, **kwargs): return None - class FakeFileService: - async def read_file_content(self, path): - raise AssertionError("non-fast create should not re-read file content") - result = await knowledge_router_module.create_entity( project_id=123, data=Entity( @@ -92,13 +88,10 @@ async def read_file_content(self, path): content_type="text/markdown", content="telemetry content", ), - background_tasks=BackgroundTasks(), entity_service=cast(Any, FakeEntityService()), search_service=cast(Any, FakeSearchService()), task_scheduler=FakeTaskScheduler(), - file_service=cast(Any, FakeFileService()), app_config=cast(Any, SimpleNamespace(semantic_search_enabled=False)), - fast=False, ) assert result.content == response_content @@ -134,10 +127,6 @@ class FakeTaskScheduler: def schedule(self, *args, **kwargs): return None - class FakeFileService: - async def read_file_content(self, path): - raise AssertionError("non-fast update should not re-read file content") - response = Response() result = await knowledge_router_module.update_entity_by_id( data=Entity( @@ -148,16 +137,13 @@ async def read_file_content(self, path): content="updated telemetry content", ), response=response, - background_tasks=BackgroundTasks(), project_id=123, entity_service=cast(Any, FakeEntityService()), search_service=cast(Any, FakeSearchService()), entity_repository=cast(Any, FakeEntityRepository()), task_scheduler=FakeTaskScheduler(), - file_service=cast(Any, FakeFileService()), app_config=cast(Any, SimpleNamespace(semantic_search_enabled=False)), entity_id=entity.external_id, - fast=False, ) assert result.content == response_content @@ -193,22 +179,15 @@ class FakeTaskScheduler: def schedule(self, *args, **kwargs): return None - class FakeFileService: - async def read_file_content(self, path): - raise AssertionError("non-fast edit should not re-read file content") - result = await knowledge_router_module.edit_entity_by_id( data=EditEntityRequest(operation="append", content="edited telemetry content"), - background_tasks=BackgroundTasks(), project_id=123, entity_service=cast(Any, FakeEntityService()), search_service=cast(Any, FakeSearchService()), entity_repository=cast(Any, FakeEntityRepository()), task_scheduler=FakeTaskScheduler(), - file_service=cast(Any, FakeFileService()), app_config=cast(Any, SimpleNamespace(semantic_search_enabled=False)), entity_id=entity.external_id, - fast=False, ) assert result.content == response_content diff --git a/tests/mcp/clients/test_clients.py b/tests/mcp/clients/test_clients.py index 6d74d58e..aa2e7514 100644 --- a/tests/mcp/clients/test_clients.py +++ b/tests/mcp/clients/test_clients.py @@ -44,6 +44,7 @@ async def test_create_entity(self, monkeypatch): async def mock_call_post(client, url, **kwargs): assert "/v2/projects/proj-123/knowledge/entities" in url + assert kwargs.get("params") is None return mock_response monkeypatch.setattr(knowledge_mod, "call_post", mock_call_post) @@ -53,6 +54,66 @@ async def mock_call_post(client, url, **kwargs): result = await client.create_entity({"title": "Test"}) assert result.title == "Test" + @pytest.mark.asyncio + async def test_update_entity(self, monkeypatch): + """Test update_entity calls correct endpoint without fast query params.""" + from basic_memory.mcp.clients import knowledge as knowledge_mod + + mock_response = MagicMock() + mock_response.json.return_value = { + "permalink": "test", + "title": "Test", + "file_path": "test.md", + "note_type": "note", + "content_type": "text/markdown", + "observations": [], + "relations": [], + "created_at": "2024-01-01T00:00:00", + "updated_at": "2024-01-01T00:00:00", + } + + async def mock_call_put(client, url, **kwargs): + assert "/v2/projects/proj-123/knowledge/entities/entity-123" in url + assert kwargs.get("params") is None + return mock_response + + monkeypatch.setattr(knowledge_mod, "call_put", mock_call_put) + + mock_http = MagicMock() + client = KnowledgeClient(mock_http, "proj-123") + result = await client.update_entity("entity-123", {"title": "Test"}) + assert result.title == "Test" + + @pytest.mark.asyncio + async def test_patch_entity(self, monkeypatch): + """Test patch_entity calls correct endpoint without fast query params.""" + from basic_memory.mcp.clients import knowledge as knowledge_mod + + mock_response = MagicMock() + mock_response.json.return_value = { + "permalink": "test", + "title": "Test", + "file_path": "test.md", + "note_type": "note", + "content_type": "text/markdown", + "observations": [], + "relations": [], + "created_at": "2024-01-01T00:00:00", + "updated_at": "2024-01-01T00:00:00", + } + + async def mock_call_patch(client, url, **kwargs): + assert "/v2/projects/proj-123/knowledge/entities/entity-123" in url + assert kwargs.get("params") is None + return mock_response + + monkeypatch.setattr(knowledge_mod, "call_patch", mock_call_patch) + + mock_http = MagicMock() + client = KnowledgeClient(mock_http, "proj-123") + result = await client.patch_entity("entity-123", {"operation": "append"}) + assert result.title == "Test" + @pytest.mark.asyncio async def test_resolve_entity(self, monkeypatch): """Test resolve_entity returns external_id.""" diff --git a/tests/services/test_entity_service.py b/tests/services/test_entity_service.py index 1a97ab82..3fd450c0 100644 --- a/tests/services/test_entity_service.py +++ b/tests/services/test_entity_service.py @@ -514,69 +514,6 @@ async def test_update_note_entity_content(entity_service: EntityService, file_se assert metadata.get("status") == "draft" -@pytest.mark.asyncio -async def test_fast_write_and_reindex_entity( - entity_repository: EntityRepository, - observation_repository, - relation_repository, - entity_parser: EntityParser, - file_service: FileService, - link_resolver, - search_service: SearchService, - app_config: BasicMemoryConfig, -): - """Fast write should defer observations/relations until reindex.""" - service = EntityService( - entity_repository=entity_repository, - observation_repository=observation_repository, - relation_repository=relation_repository, - entity_parser=entity_parser, - file_service=file_service, - link_resolver=link_resolver, - search_service=search_service, - app_config=app_config, - ) - - schema = EntitySchema( - title="Reindex Target", - directory="test", - note_type="note", - content=dedent(""" - # Reindex Target - - - [note] Deferred observation - - relates_to [[Other Entity]] - """).strip(), - ) - external_id = str(uuid.uuid4()) - fast_entity = await service.fast_write_entity(schema, external_id=external_id) - - assert fast_entity.external_id == external_id - assert len(fast_entity.observations) == 0 - assert len(fast_entity.relations) == 0 - - await service.reindex_entity(fast_entity.id) - reindexed = await entity_repository.get_by_external_id(external_id) - - assert reindexed is not None - assert len(reindexed.observations) == 1 - assert len(reindexed.relations) == 1 - - -@pytest.mark.asyncio -async def test_fast_write_entity_generates_external_id(entity_service: EntityService): - """Fast write should generate an external_id when one is not provided.""" - title = f"Fast Write {uuid.uuid4()}" - schema = EntitySchema( - title=title, - directory="test", - note_type="note", - ) - - fast_entity = await entity_service.fast_write_entity(schema) - assert fast_entity.external_id - - @pytest.mark.asyncio async def test_create_or_update_new(entity_service: EntityService, file_service: FileService): """Should create a new entity.""" @@ -2499,92 +2436,6 @@ async def test_update_preserves_created_by(entity_service: EntityService): assert updated.last_updated_by == editor_id # updated -@pytest.mark.asyncio -async def test_fast_write_entity_sets_user_tracking(entity_service: EntityService): - """fast_write_entity sets created_by and last_updated_by on create.""" - user_id = str(uuid.uuid4()) - entity_service.get_user_id = lambda: user_id - - schema = EntitySchema( - title="Fast Write Tracked", - directory="test", - entity_type="note", - ) - entity = await entity_service.fast_write_entity(schema, external_id=str(uuid.uuid4())) - assert entity.created_by == user_id - assert entity.last_updated_by == user_id - - -@pytest.mark.asyncio -async def test_fast_write_entity_update_preserves_created_by(entity_service: EntityService): - """fast_write_entity update path preserves created_by, sets last_updated_by.""" - creator_id = str(uuid.uuid4()) - editor_id = str(uuid.uuid4()) - external_id = str(uuid.uuid4()) - - # Create - entity_service.get_user_id = lambda: creator_id - schema = EntitySchema( - title="Fast Write Update", - directory="test", - entity_type="note", - ) - entity = await entity_service.fast_write_entity(schema, external_id=external_id) - assert entity.created_by == creator_id - - # Update (same external_id triggers update path) - entity_service.get_user_id = lambda: editor_id - update_schema = EntitySchema( - title="Fast Write Update", - directory="test", - entity_type="note", - content="Updated", - ) - updated = await entity_service.fast_write_entity(update_schema, external_id=external_id) - assert updated.created_by == creator_id # preserved - assert updated.last_updated_by == editor_id # updated - - -@pytest.mark.asyncio -async def test_fast_edit_entity_sets_last_updated_by(entity_service: EntityService): - """fast_edit_entity sets last_updated_by on edit.""" - creator_id = str(uuid.uuid4()) - editor_id = str(uuid.uuid4()) - - # Create entity first - entity_service.get_user_id = lambda: creator_id - schema = EntitySchema( - title="Fast Edit Tracked", - directory="test", - entity_type="note", - content="Original content", - ) - entity = await entity_service.fast_write_entity(schema, external_id=str(uuid.uuid4())) - - # Edit as different user - entity_service.get_user_id = lambda: editor_id - edited = await entity_service.fast_edit_entity( - entity=entity, - operation="append", - content="\nAppended content", - ) - assert edited.created_by == creator_id # preserved - assert edited.last_updated_by == editor_id # updated - - -@pytest.mark.asyncio -async def test_fast_write_entity_null_user_id(entity_service: EntityService): - """fast_write_entity with default get_user_id (None) leaves tracking fields null.""" - schema = EntitySchema( - title="No User Tracking", - directory="test", - entity_type="note", - ) - entity = await entity_service.fast_write_entity(schema, external_id=str(uuid.uuid4())) - assert entity.created_by is None - assert entity.last_updated_by is None - - # --- Concurrent Delete Resilience --- diff --git a/tests/services/test_entity_service_prepare.py b/tests/services/test_entity_service_prepare.py new file mode 100644 index 00000000..d2a8cbb3 --- /dev/null +++ b/tests/services/test_entity_service_prepare.py @@ -0,0 +1,130 @@ +"""Parity tests for prepare-first entity write semantics.""" + +from __future__ import annotations + +import pytest + +from basic_memory.file_utils import parse_frontmatter +from basic_memory.schemas import Entity as EntitySchema + + +@pytest.mark.asyncio +async def test_prepare_create_entity_content_matches_create_entity_with_content( + entity_service, +) -> None: + schema = EntitySchema( + title="Prepared Create", + directory="notes", + note_type="note", + content="---\nstatus: draft\npermalink: prepared/create\n---\nCreate body", + ) + + prepared = await entity_service.prepare_create_entity_content(schema) + result = await entity_service.create_entity_with_content(schema) + + assert prepared.file_path.as_posix() == result.entity.file_path + assert prepared.markdown_content == result.content + assert prepared.search_content == result.search_content + assert prepared.entity_fields["title"] == result.entity.title + assert prepared.entity_fields["note_type"] == result.entity.note_type + assert prepared.entity_fields["permalink"] == result.entity.permalink + assert prepared.entity_fields["entity_metadata"] == result.entity.entity_metadata + + +@pytest.mark.asyncio +async def test_prepare_create_entity_content_can_skip_storage_existence_check( + entity_service, +) -> None: + async def fail_if_called(*args, **kwargs): + raise AssertionError("file_service.exists should not be called") + + entity_service.file_service.exists = fail_if_called + + prepared = await entity_service.prepare_create_entity_content( + EntitySchema( + title="Prepared Create No HEAD", + directory="notes", + note_type="note", + content="Create body", + ), + check_storage_exists=False, + ) + + assert prepared.file_path.as_posix() == "notes/Prepared Create No HEAD.md" + assert prepared.entity_fields["title"] == "Prepared Create No HEAD" + + +@pytest.mark.asyncio +async def test_prepare_update_entity_content_matches_update_entity_with_content( + entity_service, + file_service, +) -> None: + created = await entity_service.create_entity( + EntitySchema( + title="Prepared Update", + directory="notes", + note_type="note", + content="---\nstatus: draft\nowner: alice\n---\nOriginal body", + ) + ) + + existing_content = await file_service.read_file_content(created.file_path) + update_schema = EntitySchema( + title="Prepared Update", + directory="notes", + note_type="note", + content="---\nstatus: published\nreviewed_by: bob\n---\nUpdated body", + ) + + prepared = await entity_service.prepare_update_entity_content( + created, + update_schema, + existing_content, + ) + result = await entity_service.update_entity_with_content(created, update_schema) + prepared_frontmatter = parse_frontmatter(prepared.markdown_content) + + assert prepared.markdown_content == result.content + assert prepared.search_content == result.search_content + assert prepared.entity_fields["title"] == result.entity.title + assert prepared.entity_fields["note_type"] == result.entity.note_type + assert prepared.entity_fields["permalink"] == result.entity.permalink + assert prepared_frontmatter["owner"] == "alice" + assert prepared_frontmatter["status"] == "published" + assert prepared_frontmatter["reviewed_by"] == "bob" + + +@pytest.mark.asyncio +async def test_prepare_edit_entity_content_matches_edit_entity_with_content( + entity_service, + file_service, +) -> None: + created = await entity_service.create_entity( + EntitySchema( + title="Prepared Edit", + directory="notes", + note_type="note", + content="Before edit", + ) + ) + + current_content = await file_service.read_file_content(created.file_path) + prepared = await entity_service.prepare_edit_entity_content( + created, + current_content, + operation="find_replace", + content="After edit", + find_text="Before edit", + ) + result = await entity_service.edit_entity_with_content( + identifier=created.permalink, + operation="find_replace", + content="After edit", + find_text="Before edit", + ) + + assert prepared.markdown_content == result.content + assert prepared.search_content == result.search_content + assert prepared.entity_fields["title"] == result.entity.title + assert prepared.entity_fields["note_type"] == result.entity.note_type + assert prepared.entity_fields["permalink"] == result.entity.permalink diff --git a/tests/services/test_entity_service_telemetry.py b/tests/services/test_entity_service_telemetry.py index e13dfa44..696c6f38 100644 --- a/tests/services/test_entity_service_telemetry.py +++ b/tests/services/test_entity_service_telemetry.py @@ -1,4 +1,4 @@ -"""Telemetry coverage for entity service write/edit/reindex paths.""" +"""Telemetry coverage for the lower-level file spans used by entity service.""" from __future__ import annotations @@ -9,7 +9,7 @@ from basic_memory.schemas import Entity as EntitySchema -entity_service_module = importlib.import_module("basic_memory.services.entity_service") +telemetry_module = importlib.import_module("basic_memory.telemetry") def _capture_spans(): @@ -23,16 +23,10 @@ def fake_span(name: str, **attrs): return spans, fake_span -def _assert_names_in_order(names: list[str], expected: list[str]) -> None: - cursor = 0 - for expected_name in expected: - cursor = names.index(expected_name, cursor) + 1 - - @pytest.mark.asyncio -async def test_create_entity_emits_expected_phase_spans(entity_service, monkeypatch) -> None: +async def test_create_entity_emits_file_write_span(entity_service, monkeypatch) -> None: spans, fake_span = _capture_spans() - monkeypatch.setattr(entity_service_module.telemetry, "span", fake_span) + monkeypatch.setattr(telemetry_module, "span", fake_span) schema = EntitySchema( title="Telemetry Create", @@ -45,22 +39,11 @@ async def test_create_entity_emits_expected_phase_spans(entity_service, monkeypa entity = await entity_service.create_entity(schema) assert entity.title == "Telemetry Create" - span_names = [name for name, _ in spans] - _assert_names_in_order( - span_names, - [ - "entity_service.create.resolve_permalink", - "entity_service.create.write_file", - "file_service.write", - "entity_service.create.parse_markdown", - "entity_service.create.upsert_entity", - "entity_service.create.update_checksum", - ], - ) + assert [name for name, _ in spans] == ["file_service.write"] @pytest.mark.asyncio -async def test_edit_entity_emits_expected_phase_spans(entity_service, monkeypatch) -> None: +async def test_edit_entity_emits_file_read_and_write_spans(entity_service, monkeypatch) -> None: created = await entity_service.create_entity( EntitySchema( title="Telemetry Edit", @@ -72,7 +55,7 @@ async def test_edit_entity_emits_expected_phase_spans(entity_service, monkeypatc ) spans, fake_span = _capture_spans() - monkeypatch.setattr(entity_service_module.telemetry, "span", fake_span) + monkeypatch.setattr(telemetry_module, "span", fake_span) updated = await entity_service.edit_entity( created.file_path, @@ -81,51 +64,4 @@ async def test_edit_entity_emits_expected_phase_spans(entity_service, monkeypatc ) assert updated.id == created.id - span_names = [name for name, _ in spans] - _assert_names_in_order( - span_names, - [ - "entity_service.edit.resolve_entity", - "entity_service.edit.read_file", - "file_service.read", - "entity_service.edit.apply_operation", - "entity_service.edit.write_file", - "file_service.write", - "entity_service.edit.parse_markdown", - "entity_service.edit.upsert_entity", - "entity_service.edit.update_checksum", - ], - ) - - -@pytest.mark.asyncio -async def test_reindex_entity_emits_expected_phase_spans(entity_service, monkeypatch) -> None: - created = await entity_service.create_entity( - EntitySchema( - title="Telemetry Reindex", - directory="notes", - note_type="note", - content_type="text/markdown", - content="Reindex telemetry content", - ) - ) - - spans, fake_span = _capture_spans() - monkeypatch.setattr(entity_service_module.telemetry, "span", fake_span) - - await entity_service.reindex_entity(created.id) - - span_names = [name for name, _ in spans] - _assert_names_in_order( - span_names, - [ - "entity_service.reindex.load_entity", - "entity_service.reindex.read_file", - "file_service.read_content", - "entity_service.reindex.parse_markdown", - "entity_service.reindex.upsert_entity", - "entity_service.reindex.update_checksum", - ], - ) - if entity_service.search_service is not None: - assert "entity_service.reindex.search_index" in span_names + assert [name for name, _ in spans] == ["file_service.read", "file_service.write"] diff --git a/tests/services/test_task_scheduler_semantic.py b/tests/services/test_task_scheduler_semantic.py index 6dd0233c..d6fd6ce4 100644 --- a/tests/services/test_task_scheduler_semantic.py +++ b/tests/services/test_task_scheduler_semantic.py @@ -1,4 +1,4 @@ -"""Task scheduler semantic indexing tests.""" +"""Task scheduler tests for derived async work.""" import asyncio from pathlib import Path @@ -10,14 +10,6 @@ from basic_memory.deps.services import get_task_scheduler -class StubEntityService: - def __init__(self) -> None: - self.reindexed: list[int] = [] - - async def reindex_entity(self, entity_id: int) -> None: - self.reindexed.append(entity_id) - - class StubSyncService: def __init__(self) -> None: self.resolved: list[int] = [] @@ -43,9 +35,8 @@ async def reindex_all(self) -> None: @pytest.mark.asyncio -async def test_reindex_entity_task_chains_vector_sync_when_semantic_enabled(tmp_path): - """Reindex task should enqueue vector sync when semantic mode is enabled.""" - entity_service = StubEntityService() +async def test_sync_entity_vectors_task_maps_to_search_service(tmp_path): + """Explicit sync_entity_vectors task should call SearchService sync method.""" sync_service = StubSyncService() search_service = StubSearchService() app_config = BasicMemoryConfig( @@ -57,7 +48,6 @@ async def test_reindex_entity_task_chains_vector_sync_when_semantic_enabled(tmp_ project_config = ProjectConfig(name="test-project", home=tmp_path) scheduler = await get_task_scheduler( - entity_service=cast(Any, entity_service), sync_service=cast(Any, sync_service), search_service=cast(Any, search_service), project_config=project_config, @@ -65,47 +55,15 @@ async def test_reindex_entity_task_chains_vector_sync_when_semantic_enabled(tmp_ ) # Enable background tasks for this test — uses stubs, no real DB race risk cast(Any, scheduler)._test_mode = False - scheduler.schedule("reindex_entity", entity_id=42) - await asyncio.sleep(0.05) - - assert entity_service.reindexed == [42] - assert search_service.vector_synced == [42] - - -@pytest.mark.asyncio -async def test_reindex_entity_task_skips_vector_sync_when_semantic_disabled(tmp_path): - """Reindex task should not enqueue vector sync when semantic mode is disabled.""" - entity_service = StubEntityService() - sync_service = StubSyncService() - search_service = StubSearchService() - app_config = BasicMemoryConfig( - env="test", - projects={"test-project": str(tmp_path)}, - default_project="test-project", - semantic_search_enabled=False, - ) - project_config = ProjectConfig(name="test-project", home=tmp_path) - - scheduler = await get_task_scheduler( - entity_service=cast(Any, entity_service), - sync_service=cast(Any, sync_service), - search_service=cast(Any, search_service), - project_config=project_config, - app_config=app_config, - ) - # Enable background tasks for this test — uses stubs, no real DB race risk - cast(Any, scheduler)._test_mode = False - scheduler.schedule("reindex_entity", entity_id=42) + scheduler.schedule("sync_entity_vectors", entity_id=7) await asyncio.sleep(0.05) - assert entity_service.reindexed == [42] - assert search_service.vector_synced == [] + assert search_service.vector_synced == [7] @pytest.mark.asyncio -async def test_sync_entity_vectors_task_maps_to_search_service(tmp_path): - """Explicit sync_entity_vectors task should call SearchService sync method.""" - entity_service = StubEntityService() +async def test_sync_project_task_maps_to_sync_service(tmp_path): + """Explicit sync_project task should call SyncService sync method.""" sync_service = StubSyncService() search_service = StubSearchService() app_config = BasicMemoryConfig( @@ -117,15 +75,13 @@ async def test_sync_entity_vectors_task_maps_to_search_service(tmp_path): project_config = ProjectConfig(name="test-project", home=tmp_path) scheduler = await get_task_scheduler( - entity_service=cast(Any, entity_service), sync_service=cast(Any, sync_service), search_service=cast(Any, search_service), project_config=project_config, app_config=app_config, ) - # Enable background tasks for this test — uses stubs, no real DB race risk cast(Any, scheduler)._test_mode = False - scheduler.schedule("sync_entity_vectors", entity_id=7) + scheduler.schedule("sync_project", force_full=True) await asyncio.sleep(0.05) - assert search_service.vector_synced == [7] + assert sync_service.synced == [(str(tmp_path), "test-project", True)] diff --git a/tests/services/test_upsert_entity_optimization.py b/tests/services/test_upsert_entity_optimization.py index b6e0499d..e50d3f97 100644 --- a/tests/services/test_upsert_entity_optimization.py +++ b/tests/services/test_upsert_entity_optimization.py @@ -9,8 +9,6 @@ from __future__ import annotations -import importlib -from contextlib import contextmanager from datetime import datetime, timezone from pathlib import Path from types import SimpleNamespace @@ -27,9 +25,6 @@ from basic_memory.schemas import Entity as EntitySchema from basic_memory.services.entity_service import EntityService -entity_service_module = importlib.import_module("basic_memory.services.entity_service") - - # --- Helpers --- @@ -48,17 +43,6 @@ def _make_markdown( ) -def _capture_spans(): - spans: list[tuple[str, dict]] = [] - - @contextmanager - def fake_span(name: str, **attrs): - spans.append((name, attrs)) - yield - - return spans, fake_span - - # --- Optimization 1: No redundant get_by_file_path in update_entity_relations --- @@ -167,80 +151,6 @@ async def fake_resolve_link(link_text: str, **kwargs): ] -# --- Telemetry sub-spans --- - - -@pytest.mark.asyncio -async def test_upsert_update_emits_sub_spans(entity_service: EntityService, monkeypatch): - """upsert_entity_from_markdown (update path) should emit sub-spans for each DB phase.""" - entity = await entity_service.create_entity( - EntitySchema( - title="Span Test", - directory="notes", - note_type="note", - content="# Span Test\n\n## Observations\n- [fact] original", - ) - ) - - spans, fake_span = _capture_spans() - monkeypatch.setattr(entity_service_module.telemetry, "span", fake_span) - - markdown = _make_markdown( - title="Span Test", - observations=[MarkdownObservation(content="updated", category="fact")], - ) - await entity_service.upsert_entity_from_markdown(Path(entity.file_path), markdown, is_new=False) - - span_names = [name for name, _ in spans] - - # update_entity_and_observations sub-spans - assert "upsert.update.fetch_entity" in span_names - assert "upsert.update.delete_observations" in span_names - assert "upsert.update.insert_observations" in span_names - assert "upsert.update.save_entity" in span_names - - # update_entity_relations sub-spans - assert "upsert.relations.delete_existing" in span_names - assert "upsert.relations.reload_entity" in span_names - - -@pytest.mark.asyncio -async def test_upsert_with_relations_emits_resolve_and_insert_spans( - entity_service: EntityService, monkeypatch -): - """When relations exist, resolve_links and insert_relations spans should be emitted.""" - # Create two entities so the relation can resolve - await entity_service.create_entity( - EntitySchema( - title="Target Entity", - directory="notes", - note_type="note", - content="# Target Entity", - ) - ) - source = await entity_service.create_entity( - EntitySchema( - title="Source Entity", - directory="notes", - note_type="note", - content="# Source Entity", - ) - ) - - spans, fake_span = _capture_spans() - monkeypatch.setattr(entity_service_module.telemetry, "span", fake_span) - - markdown = _make_markdown( - title="Source Entity", - relations=[MarkdownRelation(type="links_to", target="Target Entity")], - ) - await entity_service.upsert_entity_from_markdown(Path(source.file_path), markdown, is_new=False) - - span_names = [name for name, _ in spans] - assert "upsert.relations.resolve_links" in span_names - assert "upsert.relations.insert_relations" in span_names - - @pytest.mark.asyncio async def test_upsert_with_relations_uses_lightweight_exact_resolution( entity_service: EntityService, monkeypatch From cc68cebd0f02571bda81758625f431a7f665979b Mon Sep 17 00:00:00 2001 From: phernandez Date: Mon, 13 Apr 2026 19:13:15 -0500 Subject: [PATCH 3/3] fix(core): fail prepend on malformed frontmatter Signed-off-by: phernandez --- src/basic_memory/services/entity_service.py | 45 +++++------ tests/services/test_entity_service_prepare.py | 81 ++++++++++++++++++- 2 files changed, 101 insertions(+), 25 deletions(-) diff --git a/src/basic_memory/services/entity_service.py b/src/basic_memory/services/entity_service.py index 6ad312ab..3df531c4 100644 --- a/src/basic_memory/services/entity_service.py +++ b/src/basic_memory/services/entity_service.py @@ -1262,33 +1262,30 @@ def insert_relative_to_section( def _prepend_after_frontmatter(self, current_content: str, content: str) -> str: """Prepend content after frontmatter, preserving frontmatter structure.""" - # Check if file has frontmatter + # Trigger: the note starts with frontmatter delimiters. + # Why: prepend must preserve the existing YAML block and insert content into the body, + # not silently rewrite malformed metadata into a corrupted accepted note state. + # Outcome: valid frontmatter is preserved, and malformed frontmatter fails fast. if has_frontmatter(current_content): - try: - # Parse and separate frontmatter from body - frontmatter_data = parse_frontmatter(current_content) - body_content = remove_frontmatter(current_content) - - # Prepend content to the body - if content and not content.endswith("\n"): - new_body = content + "\n" + body_content - else: - new_body = content + body_content + # Parse and separate frontmatter from body. Parse errors are intentional caller-visible + # failures so prepare_edit_entity_content can reject unsafe accepted writes. + frontmatter_data = parse_frontmatter(current_content) + body_content = remove_frontmatter(current_content) + + # Prepend content to the body + if content and not content.endswith("\n"): + new_body = content + "\n" + body_content + else: + new_body = content + body_content - # Reconstruct file with frontmatter + prepended body - yaml_fm = yaml.dump(frontmatter_data, sort_keys=False, allow_unicode=True) - return f"---\n{yaml_fm}---\n\n{new_body.strip()}" + # Reconstruct file with frontmatter + prepended body + yaml_fm = yaml.dump(frontmatter_data, sort_keys=False, allow_unicode=True) + return f"---\n{yaml_fm}---\n\n{new_body.strip()}" - except Exception as e: # pragma: no cover - logger.warning( - f"Failed to parse frontmatter during prepend: {e}" - ) # pragma: no cover - # Fall back to simple prepend if frontmatter parsing fails # pragma: no cover - - # No frontmatter or parsing failed - do simple prepend # pragma: no cover - if content and not content.endswith("\n"): # pragma: no cover - return content + "\n" + current_content # pragma: no cover - return content + current_content # pragma: no cover + # No frontmatter means prepend is a plain text edit. + if content and not content.endswith("\n"): + return content + "\n" + current_content + return content + current_content async def move_entity( self, diff --git a/tests/services/test_entity_service_prepare.py b/tests/services/test_entity_service_prepare.py index d2a8cbb3..4cbf1353 100644 --- a/tests/services/test_entity_service_prepare.py +++ b/tests/services/test_entity_service_prepare.py @@ -4,7 +4,7 @@ import pytest -from basic_memory.file_utils import parse_frontmatter +from basic_memory.file_utils import ParseError, parse_frontmatter, remove_frontmatter from basic_memory.schemas import Entity as EntitySchema @@ -128,3 +128,82 @@ async def test_prepare_edit_entity_content_matches_edit_entity_with_content( assert prepared.entity_fields["title"] == result.entity.title assert prepared.entity_fields["note_type"] == result.entity.note_type assert prepared.entity_fields["permalink"] == result.entity.permalink + + +@pytest.mark.asyncio +async def test_prepare_edit_entity_content_prepend_preserves_valid_frontmatter( + entity_service, + file_service, +) -> None: + created = await entity_service.create_entity( + EntitySchema( + title="Prepared Prepend Frontmatter", + directory="notes", + note_type="note", + content="---\nstatus: draft\ntags:\n - one\n---\nOriginal body", + ) + ) + + current_content = await file_service.read_file_content(created.file_path) + prepared = await entity_service.prepare_edit_entity_content( + created, + current_content, + operation="prepend", + content="Prepended line", + ) + + assert parse_frontmatter(prepared.markdown_content) == { + "title": "Prepared Prepend Frontmatter", + "type": "note", + "status": "draft", + "tags": ["one"], + "permalink": created.permalink, + } + assert remove_frontmatter(prepared.markdown_content) == "Prepended line\nOriginal body" + + +@pytest.mark.asyncio +async def test_prepare_edit_entity_content_prepend_fails_for_malformed_frontmatter( + entity_service, +) -> None: + created = await entity_service.create_entity( + EntitySchema( + title="Prepared Prepend Parse Error", + directory="notes", + note_type="note", + content="Original body", + ) + ) + + malformed_content = "---\nstatus: [draft\n---\nOriginal body" + + with pytest.raises(ParseError, match="Invalid YAML in frontmatter"): + await entity_service.prepare_edit_entity_content( + created, + malformed_content, + operation="prepend", + content="Prepended line", + ) + + +@pytest.mark.asyncio +async def test_prepare_edit_entity_content_prepend_without_frontmatter_uses_simple_prepend( + entity_service, +) -> None: + created = await entity_service.create_entity( + EntitySchema( + title="Prepared Prepend Simple", + directory="notes", + note_type="note", + content="Original body", + ) + ) + + prepared = await entity_service.prepare_edit_entity_content( + created, + "Original body", + operation="prepend", + content="Prepended line", + ) + + assert prepared.markdown_content == "Prepended line\nOriginal body"