-
Notifications
You must be signed in to change notification settings - Fork 13
Expand file tree
/
Copy pathrepository.py
More file actions
146 lines (122 loc) · 4.48 KB
/
repository.py
File metadata and controls
146 lines (122 loc) · 4.48 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
import json
import sqlite3
from uuid import uuid4
from .models.document import Document
from .settings import Settings
class Repository:
    """SQLite-backed storage for documents, their chunks and sentences.

    Every statement runs on the connection passed to the constructor.
    Multi-statement writes are executed inside a single transaction that is
    committed on success and rolled back when any statement fails.
    """

    def __init__(self, conn: sqlite3.Connection, settings: Settings):
        self._conn = conn
        self._settings = settings

    def add_document(self, document: Document) -> str:
        """Insert a document together with its chunks and sentences.

        Args:
            document: Object providing ``hash()``, ``content``, ``uri``,
                ``metadata`` (JSON-serialisable) and ``chunks``. Each chunk
                provides ``content``, ``embedding`` and ``sentences``; each
                sentence provides ``content``, ``embedding``,
                ``start_offset`` and ``end_offset``.

        Returns:
            The freshly generated UUID string used as the document id.

        Raises:
            sqlite3.Error: If any insert fails. Nothing from this call is
                persisted in that case.
        """
        document_id = str(uuid4())
        # The connection context manager commits on success and rolls back on
        # any exception, so a failure half-way through cannot leave an
        # orphaned document row or a partial set of chunks pending.
        with self._conn:
            cursor = self._conn.cursor()
            cursor.execute(
                "INSERT INTO documents "
                "(id, hash, content, uri, metadata, created_at, updated_at) "
                "VALUES (?, ?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)",
                (
                    document_id,
                    document.hash(),
                    document.content,
                    document.uri,
                    json.dumps(document.metadata),
                ),
            )
            for chunk in document.chunks:
                cursor.execute(
                    "INSERT INTO chunks (document_id, content, embedding) "
                    "VALUES (?, ?, ?)",
                    (document_id, chunk.content, chunk.embedding),
                )
                # The full-text row shares the chunk's rowid so both tables
                # can be joined and cleaned up by the same key.
                chunk_id = cursor.lastrowid
                cursor.execute(
                    "INSERT INTO chunks_fts (rowid, content) VALUES (?, ?)",
                    (chunk_id, chunk.content),
                )
                cursor.executemany(
                    "INSERT INTO sentences "
                    "(id, chunk_id, content, embedding, start_offset, end_offset) "
                    "VALUES (?, ?, ?, ?, ?, ?)",
                    (
                        (
                            str(uuid4()),
                            chunk_id,
                            sentence.content,
                            sentence.embedding,
                            sentence.start_offset,
                            sentence.end_offset,
                        )
                        for sentence in chunk.sentences
                    ),
                )
        return document_id

    def list_documents(self) -> list[Document]:
        """Return every stored document (id, content, uri and metadata)."""
        cursor = self._conn.cursor()
        cursor.execute("SELECT id, content, uri, metadata FROM documents")
        return [
            Document(
                id=doc_id,
                content=content,
                uri=uri,
                metadata=json.loads(metadata),
            )
            for doc_id, content, uri, metadata in cursor.fetchall()
        ]

    def find_document_by_id_or_uri(self, identifier: str) -> Document | None:
        """Return the first document whose id or uri equals ``identifier``.

        Returns:
            The matching document including ``created_at``, or ``None`` when
            no row matches.
        """
        cursor = self._conn.cursor()
        cursor.execute(
            "SELECT id, content, uri, metadata, created_at "
            "FROM documents WHERE id = ? OR uri = ?",
            (identifier, identifier),
        )
        row = cursor.fetchone()
        if row is None:
            return None
        doc_id, content, uri, metadata, created_at = row
        return Document(
            id=doc_id,
            content=content,
            uri=uri,
            metadata=json.loads(metadata),
            created_at=created_at,
        )

    def document_exists_by_hash(self, hash: str) -> bool:
        """Return True when a document with the given hash is stored."""
        cursor = self._conn.cursor()
        cursor.execute("SELECT 1 FROM documents WHERE hash = ?", (hash,))
        return cursor.fetchone() is not None

    def remove_document(self, document_id: str) -> bool:
        """Delete a document and all rows derived from it.

        Args:
            document_id: Id of the document to delete.

        Returns:
            ``False`` when no such document exists (nothing is modified),
            ``True`` once the document, its chunks, their full-text rows and
            their sentences have been deleted.

        Raises:
            sqlite3.Error: If a delete fails. The deletion is rolled back as
                a whole in that case.
        """
        cursor = self._conn.cursor()
        # Existence check that does not depend on the connection's
        # row_factory: it works for plain tuples and sqlite3.Row alike.
        cursor.execute("SELECT 1 FROM documents WHERE id = ?", (document_id,))
        if cursor.fetchone() is None:
            return False

        # Children are removed before the chunks they are looked up through.
        with self._conn:
            cursor.execute(
                """
                DELETE FROM sentences
                WHERE chunk_id IN (
                    SELECT id FROM chunks WHERE document_id = ?
                )
                """,
                (document_id,),
            )
            cursor.execute(
                """
                DELETE FROM chunks_fts
                WHERE rowid IN (
                    SELECT rowid FROM chunks WHERE document_id = ?
                )
                """,
                (document_id,),
            )
            cursor.execute(
                "DELETE FROM chunks WHERE document_id = ?", (document_id,)
            )
            cursor.execute("DELETE FROM documents WHERE id = ?", (document_id,))
        return True