-
-
Notifications
You must be signed in to change notification settings - Fork 16
Expand file tree
/
Copy pathviews.py
More file actions
356 lines (305 loc) · 15.5 KB
/
views.py
File metadata and controls
356 lines (305 loc) · 15.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
import json
import logging
import os
import textwrap
import time
from typing import Callable

from django.utils.decorators import method_decorator
from django.views.decorators.csrf import csrf_exempt
from drf_spectacular.utils import extend_schema, inline_serializer
from openai import OpenAI
from rest_framework import serializers as drf_serializers
from rest_framework import status
from rest_framework.permissions import AllowAny
from rest_framework.response import Response
from rest_framework.views import APIView

from ...services.conversions_services import convert_uuids
from ...services.embedding_services import get_closest_embeddings
# Configure logging
logger = logging.getLogger(__name__)
# Price table handed to calculate_cost_metrics by the Assistant view.
# Values are US dollars per one million tokens; the "input" and "output" keys
# are the ones calculate_cost_metrics looks up.
GPT_5_NANO_PRICING_DOLLARS_PER_MILLION_TOKENS = {"input": 0.05, "output": 0.40}
def calculate_cost_metrics(token_usage: dict, pricing: dict) -> dict:
    """
    Calculate cost metrics based on token usage and pricing.

    Args:
        token_usage: Dictionary containing ``input_tokens`` and ``output_tokens``.
            A missing key or a ``None`` value counts as zero tokens, and
            ``None`` in place of the dictionary is treated as "no usage".
        pricing: Dictionary containing ``input`` and ``output`` pricing in
            dollars per million tokens.

    Returns:
        Dictionary containing ``input_cost``, ``output_cost``, and
        ``total_cost`` in USD.

    Raises:
        KeyError: If ``pricing`` lacks the ``input`` or ``output`` key.
    """
    TOKENS_PER_MILLION = 1_000_000

    usage = token_usage or {}
    # ``.get(key, 0)`` would still hand back None when the key exists with a
    # None value; ``or 0`` covers both the absent and the None case.
    input_tokens = usage.get("input_tokens") or 0
    output_tokens = usage.get("output_tokens") or 0

    # Pricing is in dollars per million tokens
    input_cost_dollars = (pricing["input"] / TOKENS_PER_MILLION) * input_tokens
    output_cost_dollars = (pricing["output"] / TOKENS_PER_MILLION) * output_tokens

    return {
        "input_cost": input_cost_dollars,
        "output_cost": output_cost_dollars,
        "total_cost": input_cost_dollars + output_cost_dollars,
    }
# Open AI Cookbook: Handling Function Calls with Reasoning Models
# https://cookbook.openai.com/examples/reasoning_function_calls
def invoke_functions_from_response(
    response, tool_mapping: dict[str, Callable]
) -> list[dict]:
    """Extract all function calls from the response, look up the corresponding tool function(s) and execute them.

    (This would be a good place to handle asynchronous tool calls, or ones that take a while to execute.)
    This returns a list of messages to be added to the conversation history.

    Parameters
    ----------
    response : OpenAI Response
        The response object from OpenAI containing output items that may include function calls
    tool_mapping : dict[str, Callable]
        A dictionary mapping function names (as strings) to their corresponding Python functions.
        Keys should match the function names defined in the tools schema.

    Returns
    -------
    list[dict]
        List of function call output messages formatted for the OpenAI conversation,
        one per ``function_call`` item, in the order the items appear.
        Each message contains:
        - type: "function_call_output"
        - call_id: The unique identifier for the function call
        - output: The result returned by the executed function, or an error message
          when the tool is unknown or raised. Strings and lists are forwarded
          unchanged; any other return value is JSON-encoded to a string.

    Notes
    -----
    Tool failures never propagate: they are logged and reported back to the
    model as the ``output`` text so it can react to them.
    """
    intermediate_messages = []

    for response_item in response.output:
        if response_item.type == "reasoning":
            logger.info("Reasoning step: %s", response_item.summary)
            continue
        if response_item.type != "function_call":
            # Plain messages and other item types need no tool execution.
            continue

        tool_name = response_item.name
        target_tool = tool_mapping.get(tool_name)

        if target_tool is None:
            tool_output = f"ERROR - No tool registered for function call: {tool_name}"
            logger.error(tool_output)
        else:
            try:
                arguments = json.loads(response_item.arguments)
                logger.info(
                    "Invoking tool: %s with arguments: %s", tool_name, arguments
                )
                result = target_tool(**arguments)
                # The output is sent back as text. Strings and lists are left
                # untouched (existing behaviour); anything else is serialised
                # here, inside the try, so that an unserialisable result is
                # reported to the model like any other tool error.
                if isinstance(result, (str, list)):
                    tool_output = result
                else:
                    tool_output = json.dumps(result, default=str)
                logger.info("Tool %s completed successfully", tool_name)
            except Exception as e:
                tool_output = f"Error executing function call: {tool_name}: {e}"
                logger.error(tool_output, exc_info=True)

        intermediate_messages.append(
            {
                "type": "function_call_output",
                "call_id": response_item.call_id,
                "output": tool_output,
            }
        )

    return intermediate_messages
@method_decorator(csrf_exempt, name="dispatch")
class Assistant(APIView):
    """Chat endpoint that answers questions with an OpenAI model plus a document-search tool.

    ``post`` sends the user's message to the OpenAI Responses API, executes any
    ``search_documents`` tool calls the model requests, feeds the results back
    and repeats until the model produces a final answer. Token usage, cost and
    duration of the whole exchange are logged.
    """

    # NOTE(review): the endpoint is open to unauthenticated callers, yet
    # ``request.user`` is passed to the embedding search - confirm that
    # get_closest_embeddings copes with an anonymous user.
    permission_classes = [AllowAny]

    # Upper bound on model -> tool -> model round trips for a single request,
    # so a model that keeps requesting tool calls cannot loop (and bill) forever.
    MAX_TOOL_ROUNDS = 10

    # Prompt texts are dedented so the strings sent to the model do not depend
    # on how deeply this code happens to be indented.
    # NOTE(review): the tool texts speak of "the user's uploaded documents"
    # while INSTRUCTIONS says the user did not upload the sources - confirm
    # which wording is intended.
    TOOL_DESCRIPTION = textwrap.dedent(
        """
        Search the user's uploaded documents for information relevant to answering their question.
        Call this function when you need to find specific information from the user's documents
        to provide an accurate, citation-backed response. Always search before answering questions
        about document content.
        """
    )

    TOOL_PROPERTY_DESCRIPTION = textwrap.dedent(
        """
        A specific search query to find relevant information in the user's documents.
        Use keywords, phrases, or questions related to what the user is asking about.
        Be specific rather than generic - use terms that would appear in the relevant documents.
        """
    )

    # Tool schema advertised to the model; the name must match the key used in
    # the tool mapping built in ``post``.
    TOOLS = [
        {
            "type": "function",
            "name": "search_documents",
            "description": TOOL_DESCRIPTION,
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": TOOL_PROPERTY_DESCRIPTION,
                    }
                },
                "required": ["query"],
            },
        }
    ]

    INSTRUCTIONS = textwrap.dedent(
        """
        You are an AI assistant that helps users find and understand information about bipolar disorder
        from your internal library of bipolar disorder research sources using semantic search.
        IMPORTANT CONTEXT:
        - You have access to a library of sources that the user CANNOT see
        - The user did not upload these sources and doesn't know about them
        - You must explain what information exists in your sources and provide clear references
        TOPIC RESTRICTIONS:
        When a prompt is received that is unrelated to bipolar disorder, mental health treatment,
        or psychiatric medications, respond by saying you are limited to bipolar-specific conversations.
        SEMANTIC SEARCH STRATEGY:
        - Always perform semantic search using the search_documents function when users ask questions
        - Use conceptually related terms and synonyms, not just exact keyword matches
        - Search for the meaning and context of the user's question, not just literal words
        - Consider medical terminology, lay terms, and related conditions when searching
        FUNCTION USAGE:
        - When a user asks about information that might be in your source library, ALWAYS use the search_documents function first
        - Perform semantic searches using concepts, symptoms, treatments, and related terms from the user's question
        - Only provide answers based on information found through your source searches
        RESPONSE FORMAT:
        After gathering information through semantic searches, provide responses that:
        1. Answer the user's question directly using only the found information
        2. Structure responses with clear sections and paragraphs
        3. Explain what information you found in your sources and provide context
        4. Include citations using this exact format: [Name {name}, Page {page_number}]
        5. Only cite information that directly supports your statements
        If no relevant information is found in your source library, clearly state that the information
        is not available in your current sources.
        REMEMBER: You are working with an internal library of bipolar disorder sources that the user
        cannot see. Always search these sources first, explain what you found, and provide proper citations.
        """
    )

    # Keyword arguments shared by every ``client.responses.create`` call.
    MODEL_DEFAULTS = {
        "instructions": INSTRUCTIONS,
        "model": "gpt-5-nano",  # 400,000 token context window
        # A summary of the reasoning performed by the model. This can be useful for debugging and understanding the model's reasoning process.
        "reasoning": {"effort": "low", "summary": None},
        "tools": TOOLS,
    }

    @staticmethod
    def _search_documents(user, query: str) -> str:
        """
        Search through the documents available to ``user`` using semantic similarity.

        Delegates to ``get_closest_embeddings`` and formats every hit as a
        delimited text block (file id, name, page, chunk, similarity, text)
        for the model to read and cite.

        Parameters
        ----------
        user : User
            The requesting user, forwarded to the embedding search
        query : str
            The search query string

        Returns
        -------
        str
            Formatted search results, a "no results" notice, or an error
            description. Failures are logged and returned as text rather than
            raised, so the model can tell the user that the search failed.
        """
        try:
            embeddings_results = get_closest_embeddings(
                user=user, message_data=query.strip()
            )
            embeddings_results = convert_uuids(embeddings_results)
            if not embeddings_results:
                return "No relevant documents found for your query. Please try different search terms or upload documents first."
            # Format results with clear structure and metadata
            prompt_texts = [
                f"[Document {i + 1} - File: {obj['file_id']}, Name: {obj['name']}, Page: {obj['page_number']}, Chunk: {obj['chunk_number']}, Similarity: {1 - obj['distance']:.3f}]\n{obj['text']}\n[End Document {i + 1}]"
                for i, obj in enumerate(embeddings_results)
            ]
            return "\n\n".join(prompt_texts)
        except Exception as e:
            # Keep a server-side trace; the text returned below is all the model sees.
            logger.exception("Document search failed")
            return f"Error searching documents: {str(e)}. Please try again if the issue persists."

    @staticmethod
    def _record_usage(totals: dict, response) -> None:
        """Add the token counts reported on ``response`` (if any) to ``totals`` in place."""
        usage = getattr(response, "usage", None)
        if usage is None:
            return
        totals["input_tokens"] += getattr(usage, "input_tokens", 0) or 0
        totals["output_tokens"] += getattr(usage, "output_tokens", 0) or 0

    @extend_schema(
        request=inline_serializer(name='AssistantRequest', fields={
            'message': drf_serializers.CharField(help_text='User message to send to the assistant'),
            'previous_response_id': drf_serializers.CharField(required=False, allow_null=True, help_text='ID of previous response for conversation continuity'),
        }),
        responses={
            200: inline_serializer(name='AssistantResponse', fields={
                'response_output_text': drf_serializers.CharField(),
                'final_response_id': drf_serializers.CharField(),
            }),
            400: inline_serializer(name='AssistantBadRequest', fields={
                'error': drf_serializers.CharField(),
            }),
            500: inline_serializer(name='AssistantError', fields={
                'error': drf_serializers.CharField(),
            }),
        }
    )
    def post(self, request):
        """Answer one user message, running tool calls until the model is done.

        Request body: ``message`` (required) and ``previous_response_id``
        (optional; continues an earlier conversation).

        Returns 200 with ``response_output_text`` and ``final_response_id``,
        400 when ``message`` is missing or blank, and 500 with a generic error
        for any other failure (including exceeding ``MAX_TOOL_ROUNDS``).
        """
        try:
            message = request.data.get("message")
            if message is None or not str(message).strip():
                # Without this check str(None) would be sent to the model as
                # the literal text "None".
                return Response(
                    {"error": "A non-empty 'message' is required."},
                    status=status.HTTP_400_BAD_REQUEST,
                )
            previous_response_id = request.data.get("previous_response_id")

            user = request.user
            client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))

            def search_documents(query: str) -> str:
                # ``user`` is captured by closure instead of being a keyword
                # parameter, so model-supplied arguments cannot override it.
                return self._search_documents(user, query)

            # Mapping of the tool names we tell the model about and the functions that implement them
            tool_mapping = {"search_documents": search_documents}

            # Track total duration and cost metrics
            start_time = time.time()
            total_token_usage = {"input_tokens": 0, "output_tokens": 0}

            # We fetch a response and then kick off a loop to handle the response
            first_call_kwargs = dict(self.MODEL_DEFAULTS)
            if previous_response_id:
                first_call_kwargs["previous_response_id"] = str(previous_response_id)
            response = client.responses.create(
                input=[{"type": "message", "role": "user", "content": str(message)}],
                **first_call_kwargs,
            )
            self._record_usage(total_token_usage, response)

            # Open AI Cookbook: Handling Function Calls with Reasoning Models
            # https://cookbook.openai.com/examples/reasoning_function_calls
            tool_rounds = 0
            while True:
                function_responses = invoke_functions_from_response(
                    response, tool_mapping=tool_mapping
                )
                if not function_responses:  # We're done reasoning
                    logger.info("Reasoning completed")
                    break
                if tool_rounds >= self.MAX_TOOL_ROUNDS:
                    raise RuntimeError(
                        f"Tool-call loop exceeded {self.MAX_TOOL_ROUNDS} rounds"
                    )
                tool_rounds += 1
                logger.info("More reasoning required, continuing...")
                response = client.responses.create(
                    input=function_responses,
                    previous_response_id=response.id,
                    **self.MODEL_DEFAULTS,
                )
                self._record_usage(total_token_usage, response)

            final_response_output_text = response.output_text
            final_response_id = response.id
            logger.info("Final response: %s", final_response_output_text)

            # Calculate total duration and cost metrics
            total_duration = time.time() - start_time
            cost_metrics = calculate_cost_metrics(
                total_token_usage, GPT_5_NANO_PRICING_DOLLARS_PER_MILLION_TOKENS
            )
            # Log cost and duration metrics
            logger.info(
                f"Request completed: "
                f"Duration: {total_duration:.2f}s, "
                f"Input tokens: {total_token_usage['input_tokens']}, "
                f"Output tokens: {total_token_usage['output_tokens']}, "
                f"Total cost: ${cost_metrics['total_cost']:.6f}"
            )

            return Response(
                {
                    "response_output_text": final_response_output_text,
                    "final_response_id": final_response_id,
                },
                status=status.HTTP_200_OK,
            )
        except Exception as e:
            logger.error(
                f"Unexpected error in Assistant view for user {request.user.id if hasattr(request, 'user') else 'unknown'}: {e}",
                exc_info=True,
            )
            # Details stay in the log; the client only gets a generic message.
            return Response(
                {"error": "An unexpected error occurred. Please try again later."},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR,
            )