etl.py
import logging
import os
import shutil
from concurrent.futures import ThreadPoolExecutor, as_completed

from llama_index.core import Document
from tc_hivemind_backend.ingest_qdrant import CustomIngestionPipeline

from hivemind_etl.mediawiki.transform_xml import parse_mediawiki_xml
from hivemind_etl.mediawiki.wikiteam_crawler import WikiteamCrawler


class MediawikiETL:
    """Extract a MediaWiki dump, transform pages into Documents, and load them into Qdrant."""
    def __init__(
        self,
        community_id: str,
        namespaces: list[int],
        platform_id: str,
        delete_dump_after_load: bool = True,
    ) -> None:
        self.community_id = community_id
        self.platform_id = platform_id

        # Route crawler traffic through a proxy when PROXY_URL is set
        self.proxy_url = os.getenv("PROXY_URL", "")
        if self.proxy_url:
            logging.info("Proxy is set to be used!")

        self.wikiteam_crawler = WikiteamCrawler(
            community_id, namespaces=namespaces, proxy_url=self.proxy_url
        )

        self.dump_dir = f"dumps/{self.community_id}"
        self.delete_dump_after_load = delete_dump_after_load

    def extract(self, api_url: str, dump_dir: str | None = None) -> None:
        """Crawl the wiki at ``api_url`` and dump its XML under ``dump_dir``."""
        if dump_dir is None:
            dump_dir = self.dump_dir
        else:
            self.dump_dir = dump_dir

        try:
            self.wikiteam_crawler.crawl(api_url, dump_dir)
        except Exception as e:
            logging.error(f"Error crawling {api_url}: {e}")
            logging.warning("Removing incomplete dumped data if available!")
            if os.path.exists(dump_dir):
                shutil.rmtree(dump_dir)
            raise

    def transform(self) -> list[Document]:
        """Parse the dumped XML and convert each page into a llama-index Document."""
        pages = parse_mediawiki_xml(file_dir=self.dump_dir)

        documents: list[Document] = []
        for page in pages:
            try:
                # Use the page id as doc_id (and as ref_doc_id in metadata,
                # needed for newer llama-index versions)
                doc_id = page.page_id
                documents.append(
                    Document(
                        doc_id=doc_id,
                        text=page.revision.text,
                        metadata={
                            "title": page.title,
                            "namespace": page.namespace,
                            "revision_id": page.revision.revision_id,
                            "parent_revision_id": page.revision.parent_revision_id,
                            "timestamp": page.revision.timestamp,
                            "comment": page.revision.comment,
                            "contributor_username": page.revision.contributor.username,
                            "contributor_user_id": page.revision.contributor.user_id,
                            "sha1": page.revision.sha1,
                            "model": page.revision.model,
                            "ref_doc_id": doc_id,
                        },
                        excluded_embed_metadata_keys=[
                            "namespace",
                            "revision_id",
                            "parent_revision_id",
                            "sha1",
                            "model",
                            "contributor_user_id",
                            "comment",
                            "timestamp",
                        ],
                        excluded_llm_metadata_keys=[
                            "namespace",
                            "revision_id",
                            "parent_revision_id",
                            "sha1",
                            "model",
                            "contributor_user_id",
                        ],
                    )
                )
            except Exception as e:
                logging.error(f"Error transforming page {page.page_id}: {e}")

        return documents

    def load(self, documents: list[Document]) -> None:
        """Embed the documents and load them into Qdrant in parallel batches."""
        logging.info(f"Loading {len(documents)} documents into Qdrant!")
        ingestion_pipeline = CustomIngestionPipeline(
            self.community_id, collection_name=self.platform_id
        )

        # Split the documents into batches and process them in parallel
        batch_size = 1000
        batches = [
            documents[i : i + batch_size]
            for i in range(0, len(documents), batch_size)
        ]

        with ThreadPoolExecutor(max_workers=10) as executor:
            # Submit all batch processing tasks
            future_to_batch = {
                executor.submit(ingestion_pipeline.run_pipeline, batch): i
                for i, batch in enumerate(batches)
            }

            # Handle batches as they complete, surfacing any errors
            for future in as_completed(future_to_batch):
                batch_idx = future_to_batch[future]
                try:
                    future.result()  # Raises any exception from the worker thread
                    logging.info(
                        f"Successfully loaded batch {batch_idx + 1} of {len(batches)} into Qdrant!"
                    )
                except Exception as e:
                    logging.error(f"Error processing batch {batch_idx}: {e}")
                    raise  # Re-raise the exception to stop the process

        logging.info(f"Loaded {len(documents)} documents into Qdrant!")

        if self.delete_dump_after_load:
            logging.info(f"Removing dump directory {self.dump_dir}!")
            shutil.rmtree(self.dump_dir)
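

# A minimal usage sketch of the extract -> transform -> load flow above,
# assuming a reachable MediaWiki API endpoint. The URL, ids, and namespace
# values here are illustrative placeholders, not values from the project.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    etl = MediawikiETL(
        community_id="example-community",  # hypothetical community id
        namespaces=[0],  # main namespace only; adjust as needed
        platform_id="example-platform",  # hypothetical Qdrant collection name
    )
    etl.extract(api_url="https://wiki.example.org/w/api.php")  # placeholder URL
    documents = etl.transform()
    etl.load(documents)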