-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathrequest.py
More file actions
137 lines (113 loc) · 4.71 KB
/
request.py
File metadata and controls
137 lines (113 loc) · 4.71 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
import aiohttp
import aiofiles
import asyncio
from pathlib import Path
from typing import Any, Optional
from ..types import FileInput
from ..models import ModelDefinition
from ..errors import InvalidInputError, ProcessingError, FileTooLargeError, MAX_FILE_SIZE
from .._user_agent import build_user_agent
async def file_input_to_bytes(
input_data: FileInput, session: aiohttp.ClientSession
) -> tuple[bytes, str]:
"""Convert various file input types to bytes asynchronously.
Args:
input_data: The file input (bytes, Path, str, or file-like object)
session: Reusable aiohttp session for URL fetching
Returns:
Tuple of (content bytes, content type)
Raises:
InvalidInputError: If input is invalid or processing fails
"""
if isinstance(input_data, bytes):
return input_data, "application/octet-stream"
if isinstance(input_data, Path):
# Async file reading with aiofiles
try:
async with aiofiles.open(input_data, mode="rb") as f:
content = await f.read()
return content, "application/octet-stream"
except FileNotFoundError:
raise InvalidInputError(f"File not found: {input_data}")
except Exception as e:
raise InvalidInputError(f"Failed to read file {input_data}: {str(e)}")
if isinstance(input_data, str):
# Check if it's a file path
path = Path(input_data)
if path.exists():
try:
async with aiofiles.open(path, mode="rb") as f:
content = await f.read()
return content, "application/octet-stream"
except Exception as e:
raise InvalidInputError(f"Failed to read file {input_data}: {str(e)}")
# Otherwise treat as URL
if not input_data.startswith(("http://", "https://")):
raise InvalidInputError(
f"Input must be a URL (http:// or https://) or existing file path: {input_data}"
)
# Use the provided session instead of creating a new one
async with session.get(input_data) as response:
if not response.ok:
raise InvalidInputError(f"Failed to fetch file from URL: {response.status}")
content = await response.read()
content_type = response.headers.get("Content-Type", "application/octet-stream")
return content, content_type
from ..types import HasRead
if isinstance(input_data, HasRead):
# Sync file-like objects (for backwards compatibility)
content = await asyncio.to_thread(input_data.read)
if isinstance(content, str):
content = content.encode()
return content, "application/octet-stream"
raise InvalidInputError(f"Invalid file input type: {type(input_data)}")
async def send_request(
session: aiohttp.ClientSession,
base_url: str,
api_key: str,
model: ModelDefinition,
inputs: dict[str, Any],
cancel_token: Optional[asyncio.Event] = None,
integration: Optional[str] = None,
) -> bytes:
form_data = aiohttp.FormData()
for key, value in inputs.items():
if value is not None:
if key in ("data", "start", "end"):
content, content_type = await file_input_to_bytes(value, session)
limit = model.max_file_size or MAX_FILE_SIZE
if len(content) > limit:
raise FileTooLargeError(len(content), limit, key)
form_data.add_field(key, content, content_type=content_type)
else:
form_data.add_field(key, str(value))
endpoint = f"{base_url}{model.url_path}"
async def make_request() -> bytes:
async with session.post(
endpoint,
headers={
"X-API-KEY": api_key,
"User-Agent": build_user_agent(integration),
},
data=form_data,
) as response:
if not response.ok:
error_text = await response.text()
raise ProcessingError(f"Processing failed: {response.status} - {error_text}")
return await response.read()
if cancel_token:
request_task = asyncio.create_task(make_request())
cancel_task = asyncio.create_task(cancel_token.wait())
done, pending = await asyncio.wait(
[request_task, cancel_task], return_when=asyncio.FIRST_COMPLETED
)
for task in pending:
task.cancel()
if cancel_task in done:
try:
await request_task
except asyncio.CancelledError:
pass
raise asyncio.CancelledError("Request cancelled by user")
return await request_task
return await make_request()