import logging
from typing import Any

from hivemind_etl.website.crawlee_client import CrawleeClient
from llama_index.core import Document
from tc_hivemind_backend.ingest_qdrant import CustomIngestionPipeline


class WebsiteETL:
    def __init__(
        self,
        community_id: str,
        platform_id: str,
    ) -> None:
        """
        Parameters
        -----------
        community_id : str
            the ID of the community whose data is being saved
        platform_id : str
            the ID of the platform the data belongs to

        Note: the vector collection name will be `community_id_platform_id`.
        """
        if not community_id or not isinstance(community_id, str):
            raise ValueError("community_id must be a non-empty string")

        self.community_id = community_id
        self.platform_id = platform_id

        # preparing the ingestion pipeline
        self.ingestion_pipeline = CustomIngestionPipeline(
            self.community_id,
            collection_name=self.platform_id,
            use_cache=False,
        )
    async def extract(
        self,
        urls: list[str],
    ) -> list[dict[str, Any]]:
        """
        Extract raw content from the given URLs.

        Parameters
        -----------
        urls : list[str]
            a list of URLs to crawl

        Returns
        ---------
        extracted_data : list[dict[str, Any]]
            the data crawled from the URLs
        """
        if not urls:
            raise ValueError("No URLs provided for crawling")

        extracted_data = []
        for url in urls:
            # use a fresh crawler client for each URL
            self.crawlee_client = CrawleeClient()
            logging.info(f"Crawling {url} and its routes!")
            data = await self.crawlee_client.crawl(links=[url])
            logging.info(f"Extracted {len(data)} documents for route: {url}")
            extracted_data.extend(data)

        logging.info(f"Extracted {len(extracted_data)} documents in total!")
        if not extracted_data:
            raise ValueError(f"No data extracted from URLs: {urls}")

        return extracted_data
    def transform(self, raw_data: list[dict[str, Any]]) -> list[Document]:
        """
        Transform raw crawled data into llama-index documents.

        Each item is expected to provide `url`, `inner_text`, and `title` keys;
        the URL is used as the document id.

        Parameters
        ------------
        raw_data : list[dict[str, Any]]
            the crawled data

        Returns
        ---------
        documents : list[llama_index.Document]
            list of llama-index documents
        """
        documents: list[Document] = []
        for data in raw_data:
            doc_id = data["url"]
            doc = Document(
                doc_id=doc_id,
                text=data["inner_text"],
                metadata={
                    "title": data["title"],
                    "url": data["url"],
                },
            )
            documents.append(doc)

        return documents
    def load(self, documents: list[Document]) -> None:
        """
        Load the documents into the vector database.

        Parameters
        -------------
        documents : list[llama_index.Document]
            the llama-index documents to be ingested
        """
        # loading data into the db
        self.ingestion_pipeline.run_pipeline(docs=documents)
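

# Usage sketch (illustrative, not part of the original module): chains the three
# ETL steps for a single site. The IDs and URL below are placeholder values, and
# running this assumes the services backing CrawleeClient and
# CustomIngestionPipeline (e.g. the Qdrant instance) are configured.
if __name__ == "__main__":
    import asyncio

    async def main() -> None:
        etl = WebsiteETL(
            community_id="example_community",
            platform_id="example_platform",
        )
        raw_data = await etl.extract(urls=["https://example.com"])
        documents = etl.transform(raw_data)
        etl.load(documents)

    asyncio.run(main())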