Skip to content

Commit 4ce2198

Browse files
phernandezclaude
and committed
fix: use upsert to prevent IntegrityError during parallel search indexing
Replace delete-then-insert pattern with INSERT ... ON CONFLICT for PostgreSQL
search index operations. This fixes race conditions where parallel entity
indexing could cause UniqueViolationError on the
uix_search_index_permalink_project constraint.

Changes:
- Add index_item() override in PostgresSearchRepository with upsert
- Update bulk_index_items() to use ON CONFLICT (permalink, project_id)
- Add CREATE_POSTGRES_SEARCH_INDEX_PERMALINK DDL for test fixtures
- Add tests for upsert behavior on duplicate permalinks

Technical note: Use column-based ON CONFLICT syntax instead of ON CONSTRAINT
(which only works for table constraints, not indexes).

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
Signed-off-by: phernandez <paul@basicmachines.co>
1 parent eb7fbaf commit 4ce2198

5 files changed

Lines changed: 200 additions & 11 deletions

File tree

src/basic_memory/models/search.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,15 @@
4848
CREATE INDEX IF NOT EXISTS idx_search_index_metadata_gin ON search_index USING gin(metadata jsonb_path_ops)
4949
""")
5050

51+
# Partial unique index on (permalink, project_id) for non-null permalinks
52+
# This prevents duplicate permalinks per project and is used by upsert operations
53+
# in PostgresSearchRepository to handle race conditions during parallel indexing
54+
CREATE_POSTGRES_SEARCH_INDEX_PERMALINK = DDL("""
55+
CREATE UNIQUE INDEX IF NOT EXISTS uix_search_index_permalink_project
56+
ON search_index (permalink, project_id)
57+
WHERE permalink IS NOT NULL
58+
""")
59+
5160
# Define FTS5 virtual table creation for SQLite only
5261
# This DDL is executed separately for SQLite databases
5362
CREATE_SEARCH_INDEX = DDL("""

src/basic_memory/repository/postgres_search_repository.py

Lines changed: 77 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,11 @@ class PostgresSearchRepository(SearchRepositoryBase):
2424
- GIN indexes for performance
2525
- ts_rank() function for relevance scoring
2626
- JSONB containment operators for metadata search
27+
28+
Note: This implementation uses UPSERT patterns (INSERT ... ON CONFLICT) instead of
29+
delete-then-insert to handle race conditions during parallel entity indexing.
30+
The partial unique index uix_search_index_permalink_project prevents duplicate
31+
permalinks per project.
2732
"""
2833

2934
async def init_search_index(self):
@@ -41,6 +46,63 @@ async def init_search_index(self):
4146
# - CREATE INDEX USING GIN on metadata jsonb_path_ops
4247
pass
4348

49+
async def index_item(self, search_index_row: SearchIndexRow) -> None:
50+
"""Index or update a single item using UPSERT.
51+
52+
Uses INSERT ... ON CONFLICT to handle race conditions during parallel
53+
entity indexing. The partial unique index uix_search_index_permalink_project
54+
on (permalink, project_id) WHERE permalink IS NOT NULL prevents duplicate
55+
permalinks.
56+
57+
For rows with non-null permalinks (entities), conflicts are resolved by
58+
updating the existing row. For rows with null permalinks, no conflict
59+
occurs on this index.
60+
"""
61+
async with db.scoped_session(self.session_maker) as session:
62+
# Serialize JSON for raw SQL
63+
insert_data = search_index_row.to_insert(serialize_json=True)
64+
insert_data["project_id"] = self.project_id
65+
66+
# Use upsert to handle race conditions during parallel indexing
67+
# ON CONFLICT (permalink, project_id) matches the partial unique index
68+
# uix_search_index_permalink_project WHERE permalink IS NOT NULL
69+
# For rows with NULL permalinks, no conflict occurs (partial index doesn't apply)
70+
await session.execute(
71+
text("""
72+
INSERT INTO search_index (
73+
id, title, content_stems, content_snippet, permalink, file_path, type, metadata,
74+
from_id, to_id, relation_type,
75+
entity_id, category,
76+
created_at, updated_at,
77+
project_id
78+
) VALUES (
79+
:id, :title, :content_stems, :content_snippet, :permalink, :file_path, :type, :metadata,
80+
:from_id, :to_id, :relation_type,
81+
:entity_id, :category,
82+
:created_at, :updated_at,
83+
:project_id
84+
)
85+
ON CONFLICT (permalink, project_id) WHERE permalink IS NOT NULL DO UPDATE SET
86+
id = EXCLUDED.id,
87+
title = EXCLUDED.title,
88+
content_stems = EXCLUDED.content_stems,
89+
content_snippet = EXCLUDED.content_snippet,
90+
file_path = EXCLUDED.file_path,
91+
type = EXCLUDED.type,
92+
metadata = EXCLUDED.metadata,
93+
from_id = EXCLUDED.from_id,
94+
to_id = EXCLUDED.to_id,
95+
relation_type = EXCLUDED.relation_type,
96+
entity_id = EXCLUDED.entity_id,
97+
category = EXCLUDED.category,
98+
created_at = EXCLUDED.created_at,
99+
updated_at = EXCLUDED.updated_at
100+
"""),
101+
insert_data,
102+
)
103+
logger.debug(f"indexed row {search_index_row}")
104+
await session.commit()
105+
44106
def _prepare_search_term(self, term: str, is_prefix: bool = True) -> str:
45107
"""Prepare a search term for tsquery format.
46108
@@ -316,10 +378,14 @@ async def search(
316378
async def bulk_index_items(self, search_index_rows: List[SearchIndexRow]) -> None:
317379
"""Index multiple items in a single batch operation using UPSERT.
318380
319-
Uses INSERT ... ON CONFLICT DO UPDATE to handle re-indexing of existing
320-
entities (e.g., during forward reference resolution) without requiring
321-
a separate delete operation. This eliminates race conditions between
322-
delete and insert operations in separate transactions.
381+
Uses INSERT ... ON CONFLICT to handle race conditions during parallel
382+
entity indexing. The partial unique index uix_search_index_permalink_project
383+
on (permalink, project_id) WHERE permalink IS NOT NULL prevents duplicate
384+
permalinks.
385+
386+
For rows with non-null permalinks (entities), conflicts are resolved by
387+
updating the existing row. For rows with null permalinks (observations,
388+
relations), the partial index doesn't apply and they are inserted directly.
323389
324390
Args:
325391
search_index_rows: List of SearchIndexRow objects to index
@@ -338,11 +404,10 @@ async def bulk_index_items(self, search_index_rows: List[SearchIndexRow]) -> Non
338404
insert_data["project_id"] = self.project_id
339405
insert_data_list.append(insert_data)
340406

341-
# Use UPSERT (INSERT ... ON CONFLICT) to handle re-indexing
342-
# Primary key is (id, type, project_id)
343-
# This handles race conditions during forward reference resolution
344-
# where an entity might be re-indexed before the delete commits
345-
# Syntax works for both SQLite 3.24+ and PostgreSQL
407+
# Use upsert to handle race conditions during parallel indexing
408+
# ON CONFLICT (permalink, project_id) matches the partial unique index
409+
# uix_search_index_permalink_project WHERE permalink IS NOT NULL
410+
# For rows with NULL permalinks (observations, relations), no conflict occurs
346411
await session.execute(
347412
text("""
348413
INSERT INTO search_index (
@@ -358,12 +423,13 @@ async def bulk_index_items(self, search_index_rows: List[SearchIndexRow]) -> Non
358423
:created_at, :updated_at,
359424
:project_id
360425
)
361-
ON CONFLICT (id, type, project_id) DO UPDATE SET
426+
ON CONFLICT (permalink, project_id) WHERE permalink IS NOT NULL DO UPDATE SET
427+
id = EXCLUDED.id,
362428
title = EXCLUDED.title,
363429
content_stems = EXCLUDED.content_stems,
364430
content_snippet = EXCLUDED.content_snippet,
365-
permalink = EXCLUDED.permalink,
366431
file_path = EXCLUDED.file_path,
432+
type = EXCLUDED.type,
367433
metadata = EXCLUDED.metadata,
368434
from_id = EXCLUDED.from_id,
369435
to_id = EXCLUDED.to_id,

test-int/conftest.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ async def engine_factory(
125125
CREATE_POSTGRES_SEARCH_INDEX_TABLE,
126126
CREATE_POSTGRES_SEARCH_INDEX_FTS,
127127
CREATE_POSTGRES_SEARCH_INDEX_METADATA,
128+
CREATE_POSTGRES_SEARCH_INDEX_PERMALINK,
128129
)
129130
from basic_memory import db
130131

@@ -160,6 +161,7 @@ async def engine_factory(
160161
await conn.execute(CREATE_POSTGRES_SEARCH_INDEX_TABLE)
161162
await conn.execute(CREATE_POSTGRES_SEARCH_INDEX_FTS)
162163
await conn.execute(CREATE_POSTGRES_SEARCH_INDEX_METADATA)
164+
await conn.execute(CREATE_POSTGRES_SEARCH_INDEX_PERMALINK)
163165

164166
yield engine, session_maker
165167

tests/conftest.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,7 @@ async def engine_factory(
210210
CREATE_POSTGRES_SEARCH_INDEX_TABLE,
211211
CREATE_POSTGRES_SEARCH_INDEX_FTS,
212212
CREATE_POSTGRES_SEARCH_INDEX_METADATA,
213+
CREATE_POSTGRES_SEARCH_INDEX_PERMALINK,
213214
)
214215

215216
# Drop and recreate all tables for test isolation
@@ -223,6 +224,7 @@ async def engine_factory(
223224
await conn.execute(CREATE_POSTGRES_SEARCH_INDEX_TABLE)
224225
await conn.execute(CREATE_POSTGRES_SEARCH_INDEX_FTS)
225226
await conn.execute(CREATE_POSTGRES_SEARCH_INDEX_METADATA)
227+
await conn.execute(CREATE_POSTGRES_SEARCH_INDEX_PERMALINK)
226228

227229
yield engine, session_maker
228230

tests/repository/test_search_repository.py

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,116 @@ async def test_index_item(search_repository, search_entity):
138138
assert results[0].project_id == search_repository.project_id
139139

140140

141+
@pytest.mark.asyncio
142+
async def test_index_item_upsert_on_duplicate_permalink(search_repository, search_entity):
143+
"""Test that indexing the same permalink twice uses upsert instead of failing.
144+
145+
This tests the fix for the race condition where parallel entity indexing
146+
could cause IntegrityError on the unique permalink constraint.
147+
"""
148+
# First insert
149+
search_row1 = SearchIndexRow(
150+
id=search_entity.id,
151+
type=SearchItemType.ENTITY.value,
152+
title="Original Title",
153+
content_stems="original content",
154+
content_snippet="Original content snippet",
155+
permalink=search_entity.permalink,
156+
file_path=search_entity.file_path,
157+
entity_id=search_entity.id,
158+
metadata={"entity_type": search_entity.entity_type},
159+
created_at=search_entity.created_at,
160+
updated_at=search_entity.updated_at,
161+
project_id=search_repository.project_id,
162+
)
163+
await search_repository.index_item(search_row1)
164+
165+
# Verify first insert worked
166+
results = await search_repository.search(search_text="original")
167+
assert len(results) == 1
168+
assert results[0].title == "Original Title"
169+
170+
# Second insert with same permalink but different content (simulates race condition)
171+
# This should NOT raise IntegrityError - it should upsert (update) instead
172+
search_row2 = SearchIndexRow(
173+
id=search_entity.id,
174+
type=SearchItemType.ENTITY.value,
175+
title="Updated Title",
176+
content_stems="updated content",
177+
content_snippet="Updated content snippet",
178+
permalink=search_entity.permalink, # Same permalink!
179+
file_path=search_entity.file_path,
180+
entity_id=search_entity.id,
181+
metadata={"entity_type": search_entity.entity_type},
182+
created_at=search_entity.created_at,
183+
updated_at=search_entity.updated_at,
184+
project_id=search_repository.project_id,
185+
)
186+
# This should succeed without raising IntegrityError
187+
await search_repository.index_item(search_row2)
188+
189+
# Verify the row was updated, not duplicated
190+
results_after = await search_repository.search(search_text="updated")
191+
assert len(results_after) == 1
192+
assert results_after[0].title == "Updated Title"
193+
194+
# Verify old content is gone (was replaced)
195+
results_old = await search_repository.search(search_text="original")
196+
assert len(results_old) == 0
197+
198+
199+
@pytest.mark.asyncio
200+
async def test_bulk_index_items_upsert_on_duplicate_permalink(search_repository, search_entity):
201+
"""Test that bulk_index_items uses upsert for duplicate permalinks.
202+
203+
This tests the fix for race conditions during bulk entity indexing.
204+
"""
205+
# First bulk insert
206+
search_row1 = SearchIndexRow(
207+
id=search_entity.id,
208+
type=SearchItemType.ENTITY.value,
209+
title="Bulk Original Title",
210+
content_stems="bulk original content",
211+
content_snippet="Bulk original content snippet",
212+
permalink=search_entity.permalink,
213+
file_path=search_entity.file_path,
214+
entity_id=search_entity.id,
215+
metadata={"entity_type": search_entity.entity_type},
216+
created_at=search_entity.created_at,
217+
updated_at=search_entity.updated_at,
218+
project_id=search_repository.project_id,
219+
)
220+
await search_repository.bulk_index_items([search_row1])
221+
222+
# Verify first insert worked
223+
results = await search_repository.search(search_text="bulk original")
224+
assert len(results) == 1
225+
assert results[0].title == "Bulk Original Title"
226+
227+
# Second bulk insert with same permalink (simulates race condition)
228+
search_row2 = SearchIndexRow(
229+
id=search_entity.id,
230+
type=SearchItemType.ENTITY.value,
231+
title="Bulk Updated Title",
232+
content_stems="bulk updated content",
233+
content_snippet="Bulk updated content snippet",
234+
permalink=search_entity.permalink, # Same permalink!
235+
file_path=search_entity.file_path,
236+
entity_id=search_entity.id,
237+
metadata={"entity_type": search_entity.entity_type},
238+
created_at=search_entity.created_at,
239+
updated_at=search_entity.updated_at,
240+
project_id=search_repository.project_id,
241+
)
242+
# This should succeed without raising IntegrityError
243+
await search_repository.bulk_index_items([search_row2])
244+
245+
# Verify the row was updated
246+
results_after = await search_repository.search(search_text="bulk updated")
247+
assert len(results_after) == 1
248+
assert results_after[0].title == "Bulk Updated Title"
249+
250+
141251
@pytest.mark.asyncio
142252
async def test_project_isolation(
143253
search_repository, second_project_repository, search_entity, second_entity

0 commit comments

Comments (0)