-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathactivities.py
More file actions
351 lines (296 loc) · 11.7 KB
/
activities.py
File metadata and controls
351 lines (296 loc) · 11.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
import json
import logging
from typing import Any
from datetime import datetime, timedelta, timezone
from tc_hivemind_backend.db.qdrant import QdrantSingleton
from tc_hivemind_backend.ingest_qdrant import CustomIngestionPipeline
from temporalio import activity, workflow
from qdrant_client.models import Filter, FieldCondition, MatchValue
from qdrant_client.http import models
from openai import AsyncOpenAI
import re
with workflow.unsafe.imports_passed_through():
from hivemind_summarizer.schema import (
PlatformSummariesActivityInput,
PlatformSummariesRangeActivityInput,
RealTimeSummaryWorkflowInput,
)
def extract_summary_text(node_content: dict[str, Any]) -> str:
    """
    Pull the human-readable summary out of a parsed ``_node_content`` payload.

    Parameters
    ----------
    node_content : dict[str, Any]
        The parsed node_content object.

    Returns
    -------
    str
        The value of the ``"text"`` field, or the fallback string
        ``"Summary text not found"`` when the input is not a dict or has
        no ``"text"`` key.
    """
    fallback = "Summary text not found"
    if not isinstance(node_content, dict):
        return fallback
    # The summary body lives under the "text" key of the node content.
    return node_content.get("text", fallback)
@activity.defn
async def fetch_platform_summaries_by_date(
    input: PlatformSummariesActivityInput,
) -> list[dict[str, Any]] | str:
    """
    Activity that fetches platform summaries for a specific date from Qdrant.

    When ``input.date`` is None, the latest date available in the collection
    is resolved via the ingestion pipeline and used instead.

    Parameters
    ----------
    input : PlatformSummariesActivityInput
        Input object containing date, platform_id, community_id and
        extract_text_only.

    Returns
    -------
    list[dict[str, Any]] | str
        A list of summary payloads for the date, or — when
        ``extract_text_only`` is True — a single newline-joined string.

    Raises
    ------
    ValueError
        If platform_id is missing, or no document date can be resolved
        when ``input.date`` is None.
    """
    date = input.date
    extract_text_only = input.extract_text_only
    community_id = input.community_id

    logging.info("Started fetch_platform_summaries_by_date!")
    if not input.platform_id:
        raise ValueError("Platform id is required but was not provided")

    collection_name = f"{community_id}_{input.platform_id}_summary"
    logging.info(
        f"Fetching summaries for date: {date} from collection: {collection_name}"
    )
    try:
        # Get Qdrant client
        qdrant_client = QdrantSingleton.get_instance().get_client()

        if date is None:
            # pipeline requires a different format for the collection name
            pipeline = CustomIngestionPipeline(
                community_id=community_id,
                collection_name=f"{input.platform_id}_summary",
            )
            # Resolve the latest date available in the collection
            latest_date = pipeline.get_latest_document_date(
                field_name="date", field_schema=models.PayloadSchemaType.DATETIME
            )
            if latest_date is None:
                # Previously this fell through to `.strftime` and raised an
                # opaque AttributeError on an empty collection.
                raise ValueError(
                    f"No documents found in collection {collection_name} "
                    "to resolve the latest date"
                )
            date = latest_date.strftime("%Y-%m-%d")

        # Single search shared by both branches (previously duplicated).
        date_filter = Filter(
            must=[FieldCondition(key="date", match=MatchValue(value=date))]
        )
        search_results = qdrant_client.search(
            collection_name=collection_name,
            # Zero vector: we only care about the payload filter, not similarity.
            query_vector=[0] * 1024,
            query_filter=date_filter,
            limit=100,
            with_payload=True,
            with_vectors=False,
        )

        summaries = []
        for point in search_results:
            # Extract the summary data from each point
            summary_data = point.payload
            # If _node_content is a JSON string, parse it
            if "_node_content" in summary_data and isinstance(
                summary_data["_node_content"], str
            ):
                try:
                    node_content = json.loads(summary_data["_node_content"])
                    if extract_text_only:
                        summary_data = extract_summary_text(node_content)
                    else:
                        summary_data["parsed_content"] = node_content
                        summary_data["summary_text"] = extract_summary_text(
                            node_content
                        )
                except json.JSONDecodeError:
                    logging.warning(
                        f"Failed to parse _node_content as JSON for point with date {date}"
                    )
            summaries.append(summary_data)

        logging.info(
            f"Found {len(summaries)} summaries for date {date} in collection {collection_name}"
        )
        if extract_text_only:
            # Payloads whose _node_content was missing or unparseable are still
            # dicts here; stringify them so the join cannot raise TypeError.
            return "\n".join(
                s if isinstance(s, str) else str(s) for s in summaries
            )
        return summaries
    except Exception as e:
        logging.error(
            f"Error fetching summaries for date {date} from collection {collection_name}: {str(e)}"
        )
        raise
@activity.defn
async def fetch_platform_summaries_by_date_range(
    input: PlatformSummariesRangeActivityInput,
) -> dict[str, list[dict[str, Any] | str]]:
    """
    Activity that fetches summaries for an inclusive range of dates from Qdrant.

    Delegates to ``fetch_platform_summaries_by_date`` once per day in the
    range, sequentially.

    Parameters
    ----------
    input : PlatformSummariesRangeActivityInput
        Input object containing start_date, end_date, platform_id,
        community_id and extract_text_only.

    Returns
    -------
    dict[str, list[dict[str, Any] | str]]
        A dictionary mapping each date ("YYYY-MM-DD") to its summaries —
        a list of payloads, or a joined string when extract_text_only is set.

    Raises
    ------
    ValueError
        If end_date is before start_date or platform_id is not provided.
    """
    start_date = input.start_date
    end_date = input.end_date
    extract_text_only = input.extract_text_only
    platform_id = input.platform_id
    community_id = input.community_id

    if not platform_id:
        # Message kept consistent with fetch_platform_summaries_by_date
        # (previously said "Platform name" while checking platform_id).
        raise ValueError("Platform id is required but was not provided")

    logging.info(f"Fetching summaries for date range: {start_date} to {end_date}.")
    try:
        # Parse the date strings to date objects for range arithmetic
        start = datetime.strptime(start_date, "%Y-%m-%d").date()
        end = datetime.strptime(end_date, "%Y-%m-%d").date()

        # Validate that end_date is not before start_date
        if end < start:
            raise ValueError("End date cannot be before start date")

        # Materialize every date in the inclusive range
        date_range = []
        current = start
        while current <= end:
            date_range.append(current.strftime("%Y-%m-%d"))
            current += timedelta(days=1)

        # Fetch summaries for each date, one day at a time
        result = {}
        for date in date_range:
            date_input = PlatformSummariesActivityInput(
                date=date,
                extract_text_only=extract_text_only,
                platform_id=input.platform_id,
                community_id=community_id,
            )
            result[date] = await fetch_platform_summaries_by_date(date_input)
        return result
    except Exception as e:
        logging.error(
            f"Error fetching summaries for date range {start_date} to {end_date}: {str(e)}"
        )
        raise
@activity.defn
async def fetch_and_summarize_realtime_data(
    input: RealTimeSummaryWorkflowInput,
) -> str:
    """
    Activity that fetches recent data from Qdrant and generates a real-time summary.

    Parameters
    ----------
    input : RealTimeSummaryWorkflowInput
        Input containing period, collection_name or platform_id/community_id,
        and extract_text_only.

    Returns
    -------
    str
        A summarized text of the recent data, or an explanatory message when
        no data/text was found for the period.

    Raises
    ------
    ValueError
        If neither collection_name nor platform_id+community_id is provided,
        or the period string has an unsupported format.
    """
    try:
        # Get Qdrant client
        qdrant_client = QdrantSingleton.get_instance().get_client()

        # Determine collection name: explicit name wins, otherwise derive it
        collection_name = input.collection_name
        if not collection_name and (input.platform_id and input.community_id):
            collection_name = f"{input.community_id}_{input.platform_id}"
        elif not collection_name:
            raise ValueError(
                "Either collection_name or both platform_id and community_id must be provided"
            )

        # Calculate time filter based on period: "Nh" = last N hours,
        # "YYYY-MM-DD" = since midnight UTC of that day, default = last hour.
        now = datetime.now(tz=timezone.utc)
        if input.period:
            if re.match(r"^\d+h$", input.period):
                hours = int(input.period[:-1])
                time_threshold = now - timedelta(hours=hours)
            elif re.match(r"^\d{4}-\d{2}-\d{2}$", input.period):
                time_threshold = datetime.strptime(input.period, "%Y-%m-%d").replace(
                    tzinfo=timezone.utc
                )
            else:
                raise ValueError(
                    "Period must be in format 'Nh' (e.g., '1h', '4h') or 'YYYY-MM-DD'"
                )
        else:
            # Default to last hour if no period specified
            time_threshold = now - timedelta(hours=1)

        # Create filter for the time period
        time_filter = Filter(
            must=[
                FieldCondition(
                    key="createdAt", range=models.Range(gt=time_threshold.timestamp())
                )
            ]
        )

        # Query Qdrant for recent data
        search_results = qdrant_client.search(
            collection_name=collection_name,
            query_vector=[0]
            * 1024,  # Using zero vector since we only care about the filter
            query_filter=time_filter,
            limit=500,  # hard limit in case the data was a lot
            with_payload=True,
            with_vectors=False,
        )
        if not search_results:
            return "No recent data found for the specified period."
        logging.info(f"found {len(search_results)} raw data points!")

        # Extract text content from the results
        texts = []
        for point in search_results:
            if "_node_content" not in point.payload:
                continue
            content = point.payload["_node_content"]
            if isinstance(content, str):
                try:
                    content = json.loads(content)
                except json.JSONDecodeError:
                    pass
            if isinstance(content, dict) and "text" in content:
                # BUG FIX: previously indexed content["metadata"] without
                # checking the key, raising KeyError for metadata-less points.
                metadata = content.get("metadata", {})
                if isinstance(metadata, dict) and "author" in metadata:
                    texts.append(metadata["author"] + ": " + content["text"])
                else:
                    texts.append(content["text"])
        if not texts:
            return "No text content found in the recent data."

        # Combine all texts
        combined_text = "\n".join(texts)
        logging.info("Starting to summarize...")

        # Initialize OpenAI client
        client = AsyncOpenAI()
        # Generate summary using OpenAI
        prompt = (
            "Please provide a concise summary of the following content, focusing on the key points and main themes. "
            f"\n\nContent:\n{combined_text}"
        )
        response = await client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {
                    "role": "system",
                    "content": "You are a helpful assistant that summarizes content concisely and accurately.",
                },
                {"role": "user", "content": prompt},
            ],
            temperature=0.3,
            n=1,
        )
        return response.choices[0].message.content
    except Exception as e:
        logging.error(f"Error in fetch_and_summarize_realtime_data: {str(e)}")
        raise