Skip to content

Commit b791eab

Browse files
committed
feat: add simple ingestion workflow and document processing activities
- Introduced IngestionWorkflow for orchestrating document ingestion requests. - Added process_document activity to handle document processing logic. - Created schema for IngestionRequest to define the structure of ingestion requests. - Updated registry and workflows to include new ingestion components.
1 parent a2b8526 commit b791eab

7 files changed

Lines changed: 137 additions & 1 deletion

File tree

hivemind_etl/activities.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@
1313
transform_mediawiki_data,
1414
load_mediawiki_data,
1515
)
16+
from hivemind_etl.simple_ingestion.pipeline import (
17+
process_document,
18+
)
1619

1720
from temporalio import activity
1821

hivemind_etl/simple_ingestion/__init__.py

Whitespace-only changes.
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
from datetime import timedelta

from llama_index.core import Document
from temporalio import activity
from temporalio import workflow
from temporalio.common import RetryPolicy
from temporalio.workflow import execute_activity
from tc_hivemind_backend.ingest_qdrant import CustomIngestionPipeline

from .schema import IngestionRequest

@workflow.defn
class IngestionWorkflow:
    """Temporal workflow that orchestrates a single document ingestion.

    Its only responsibility is to hand the incoming request off to the
    ``process_document`` activity with a bounded retry/timeout policy.
    """

    @workflow.run
    async def run(self, ingestion_request: IngestionRequest) -> None:
        """Run the ingestion workflow for one request.

        Parameters
        ----------
        ingestion_request : IngestionRequest
            Everything the activity needs to process the document:
            community ID, platform ID, text content, and metadata.

        Notes
        -----
        The activity is executed with the following configuration:
        - initial retry interval: 1 second
        - maximum retry interval: 1 minute
        - maximum retry attempts: 3
        - start-to-close timeout: 5 minutes
        """
        await execute_activity(
            process_document,
            ingestion_request,
            retry_policy=RetryPolicy(
                initial_interval=timedelta(seconds=1),
                maximum_interval=timedelta(minutes=1),
                maximum_attempts=3,
            ),
            start_to_close_timeout=timedelta(minutes=5),
        )
49+
50+
51+
# NOTE(review): the original used `@workflow.activity`, which does not exist in
# the Temporal Python SDK (activities are declared with `@activity.defn`) and
# would raise AttributeError at import time.
@activity.defn
async def process_document(
    ingestion_request: IngestionRequest,
) -> None:
    """Ingest a single document into the community's vector collection.

    Builds a llama-index ``Document`` from the request and runs it through
    ``CustomIngestionPipeline`` for embedding and storage.

    Parameters
    ----------
    ingestion_request : IngestionRequest
        The request containing all necessary information for document
        processing, including community ID, platform ID, text content,
        and metadata.
    """
    # Fall back to the documented default collection naming pattern
    # `[communityId]_[platformId]` when no explicit name was supplied.
    if ingestion_request.collectionName is None:
        collection_name = (
            f"{ingestion_request.communityId}_{ingestion_request.platformId}"
        )
    else:
        collection_name = ingestion_request.collectionName

    # Initialize the ingestion pipeline
    pipeline = CustomIngestionPipeline(
        community_id=ingestion_request.communityId,
        collectionName=collection_name,
    )

    document = Document(
        doc_id=ingestion_request.docId,
        text=ingestion_request.text,
        metadata=ingestion_request.metadata,
        # Honor the request's exclusion lists (previously defined on the
        # schema but silently ignored) so these metadata keys are not fed
        # into the embedding model or the LLM.
        excluded_embed_metadata_keys=ingestion_request.excludedEmbedMetadataKeys,
        excluded_llm_metadata_keys=ingestion_request.excludedLlmMetadataKeys,
    )

    pipeline.run_pipeline(docs=[document])
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
from uuid import uuid4

from pydantic import BaseModel, Field


class IngestionRequest(BaseModel):
    """A model representing an ingestion request for document processing.

    Parameters
    ----------
    communityId : str
        The unique identifier of the community.
    platformId : str
        The unique identifier of the platform.
    text : str
        The text content to be processed.
    metadata : dict
        Additional metadata associated with the document.
    docId : str, optional
        Unique identifier for the document. If not provided, a fresh UUID
        is generated per instance.
    excludedEmbedMetadataKeys : list[str], optional
        List of metadata keys to exclude from the embedding process.
        Default is an empty list.
    excludedLlmMetadataKeys : list[str], optional
        List of metadata keys to exclude from LLM processing.
        Default is an empty list.
    collectionName : str | None, optional
        The name of the collection to use for the document.
        Default is `None`, meaning the default pattern of
        `[communityId]_[platformId]` is used.
    """

    communityId: str
    platformId: str
    text: str
    metadata: dict
    # default_factory ensures each instance gets a fresh UUID; the previous
    # plain `str(uuid4())` default was evaluated once at class-definition
    # time, so every request omitting docId shared the same identifier.
    docId: str = Field(default_factory=lambda: str(uuid4()))
    excludedEmbedMetadataKeys: list[str] = Field(default_factory=list)
    excludedLlmMetadataKeys: list[str] = Field(default_factory=list)
    collectionName: str | None = None

hivemind_etl/website/website_etl.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ def __init__(
1919
the community to save its data
2020
platform_id : str
2121
the platform to save its data
22-
22+
2323
Note: the collection name would be `community_id_platform_id`
2424
"""
2525
if not community_id or not isinstance(community_id, str):

registry.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
get_hivemind_mediawiki_platforms,
99
transform_mediawiki_data,
1010
load_mediawiki_data,
11+
process_document,
1112
)
1213
from hivemind_summarizer.activities import (
1314
fetch_platform_summaries_by_date,
@@ -20,6 +21,7 @@
2021
WebsiteIngestionSchedulerWorkflow,
2122
MediaWikiETLWorkflow,
2223
PlatformSummariesWorkflow,
24+
IngestionWorkflow,
2325
)
2426

2527
WORKFLOWS = [
@@ -28,6 +30,7 @@
2830
WebsiteIngestionSchedulerWorkflow,
2931
MediaWikiETLWorkflow,
3032
PlatformSummariesWorkflow,
33+
IngestionWorkflow,
3134
]
3235

3336
ACTIVITIES = [
@@ -43,4 +46,5 @@
4346
fetch_platform_summaries_by_date,
4447
fetch_platform_summaries_by_date_range,
4548
get_platform_name,
49+
process_document,
4650
]

workflows.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@
1010
from hivemind_etl.mediawiki.workflows import (
1111
MediaWikiETLWorkflow,
1212
)
13+
from hivemind_etl.simple_ingestion.pipeline import (
14+
IngestionWorkflow,
15+
)
1316
from hivemind_summarizer.workflows import PlatformSummariesWorkflow
1417

1518
from temporalio import workflow

0 commit comments

Comments
 (0)