Skip to content

Commit fbeccb7

Browse files
abnegateclaude
andcommitted
(refactor): Extract shared HTTP request handling into Handler trait
- Create Handler trait with onRequest, forwardRequest, forwardRawRequest, addTelemetryHeaders, and isValidHostname - Both Swoole and SwooleCoroutine HTTP servers now use the trait - Eliminates ~300 lines of duplicated request-handling logic - Bug fixes in one location now apply to both server implementations Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 06deef7 commit fbeccb7

3 files changed

Lines changed: 389 additions & 756 deletions

File tree

src/Server/HTTP/Handler.php

Lines changed: 385 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,385 @@
1+
<?php
2+
3+
namespace Utopia\Proxy\Server\HTTP;
4+
5+
use Swoole\Coroutine\Channel;
6+
use Swoole\Coroutine\Client as CoroutineClient;
7+
use Swoole\Coroutine\Http\Client as HttpClient;
8+
use Swoole\Http\Request;
9+
use Swoole\Http\Response;
10+
use Utopia\Console;
11+
use Utopia\Proxy\Adapter;
12+
use Utopia\Validator\Hostname;
13+
14+
/**
15+
* Shared HTTP request handling logic used by both
16+
* the event-driven Swoole server and the coroutine server.
17+
*/
18+
trait Handler
19+
{
20+
protected Adapter $adapter;
21+
22+
protected Config $config;
23+
24+
/** @var array<string, Channel> */
25+
protected array $pools = [];
26+
27+
public function onRequest(Request $request, Response $response): void
28+
{
29+
if ($this->config->requestHandler !== null) {
30+
try {
31+
($this->config->requestHandler)($request, $response, $this->adapter);
32+
} catch (\Throwable $e) {
33+
Console::error("Request handler error: {$e->getMessage()}");
34+
$response->status(500);
35+
$response->end('Internal Server Error');
36+
}
37+
38+
return;
39+
}
40+
41+
try {
42+
if ($this->config->directResponse !== null) {
43+
$response->status($this->config->directResponseStatus);
44+
$response->end($this->config->directResponse);
45+
46+
return;
47+
}
48+
49+
$endpoint = is_string($this->config->fixedBackend) ? $this->config->fixedBackend : null;
50+
$result = null;
51+
if ($endpoint === null) {
52+
/** @var array<string, string> $requestHeaders */
53+
$requestHeaders = $request->header ?? [];
54+
$hostname = $requestHeaders['host'] ?? null;
55+
56+
if (!$hostname) {
57+
$response->status(400);
58+
$response->end('Missing Host header');
59+
60+
return;
61+
}
62+
63+
if (!$this->isValidHostname($hostname)) {
64+
$response->status(400);
65+
$response->end('Invalid Host header');
66+
67+
return;
68+
}
69+
70+
$result = $this->adapter->route($hostname);
71+
$endpoint = $result->endpoint;
72+
}
73+
74+
$telemetry = null;
75+
if ($this->config->telemetry && !$this->config->fastPath) {
76+
$telemetry = new Telemetry(
77+
startTime: microtime(true),
78+
result: $result,
79+
);
80+
}
81+
82+
/** @var string $endpoint */
83+
if ($this->config->rawBackend) {
84+
$this->forwardRawRequest($request, $response, $endpoint, $telemetry);
85+
} else {
86+
$this->forwardRequest($request, $response, $endpoint, $telemetry);
87+
}
88+
89+
} catch (\Exception $e) {
90+
Console::error("Proxy error: {$e->getMessage()} in {$e->getFile()}:{$e->getLine()}");
91+
92+
$response->status(503);
93+
$response->header('Content-Type', 'application/json');
94+
$response->end(json_encode([
95+
'error' => 'Service Unavailable',
96+
'message' => 'The requested service is temporarily unavailable',
97+
]));
98+
}
99+
}
100+
101+
protected function forwardRequest(Request $request, Response $response, string $endpoint, ?Telemetry $telemetry = null): void
102+
{
103+
[$host, $port] = Adapter::parseEndpoint($endpoint, 80);
104+
105+
$poolKey = "{$host}:{$port}";
106+
if (!isset($this->pools[$poolKey])) {
107+
$this->pools[$poolKey] = new Channel($this->config->poolSize);
108+
}
109+
$pool = $this->pools[$poolKey];
110+
111+
$isNew = false;
112+
$client = $pool->pop($this->config->poolTimeout);
113+
if (!$client instanceof HttpClient) {
114+
$client = new HttpClient($host, $port);
115+
$client->set([
116+
'timeout' => $this->config->timeout,
117+
'keep_alive' => $this->config->keepAlive,
118+
]);
119+
$isNew = true;
120+
}
121+
122+
if ($this->config->fastPath) {
123+
if ($isNew) {
124+
$client->setHeaders([
125+
'Host' => $port === 80 ? $host : "{$host}:{$port}",
126+
]);
127+
}
128+
} else {
129+
$headers = [];
130+
/** @var array<string, string> $requestHeaders */
131+
$requestHeaders = $request->header ?? [];
132+
foreach ($requestHeaders as $key => $value) {
133+
$lower = strtolower($key);
134+
if ($lower !== 'host' && $lower !== 'connection') {
135+
$headers[$key] = $value;
136+
}
137+
}
138+
$headers['Host'] = $port === 80 ? $host : "{$host}:{$port}";
139+
$client->setHeaders($headers);
140+
if (!empty($request->cookie)) {
141+
/** @var array<string, string> $cookies */
142+
$cookies = $request->cookie;
143+
$client->setCookies($cookies);
144+
}
145+
}
146+
147+
/** @var array<string, string> $requestServer */
148+
$requestServer = $request->server ?? [];
149+
$method = strtoupper($requestServer['request_method'] ?? 'GET');
150+
$path = $requestServer['request_uri'] ?? '/';
151+
$query = $requestServer['query_string'] ?? '';
152+
if ($query !== '') {
153+
$path .= '?'.$query;
154+
}
155+
$body = '';
156+
if ($method !== 'GET' && $method !== 'HEAD') {
157+
$body = $request->getContent() ?: '';
158+
}
159+
160+
switch ($method) {
161+
case 'GET':
162+
$client->get($path);
163+
break;
164+
case 'POST':
165+
$client->post($path, $body);
166+
break;
167+
case 'HEAD':
168+
$client->setMethod($method);
169+
$client->execute($path);
170+
break;
171+
default:
172+
$client->setMethod($method);
173+
if ($body !== '') {
174+
$client->setData($body);
175+
}
176+
$client->execute($path);
177+
break;
178+
}
179+
180+
if (!$this->config->fastPathAssumeOk) {
181+
$response->status($client->statusCode);
182+
}
183+
184+
if (!$this->config->fastPath) {
185+
if (!empty($client->headers)) {
186+
/** @var array<string, string> $responseHeaders */
187+
$responseHeaders = $client->headers;
188+
foreach ($responseHeaders as $key => $value) {
189+
$response->header($key, $value);
190+
}
191+
}
192+
193+
if (!empty($client->set_cookie_headers)) {
194+
/** @var list<string> $cookieHeaders */
195+
$cookieHeaders = $client->set_cookie_headers;
196+
foreach ($cookieHeaders as $cookie) {
197+
$response->header('Set-Cookie', $cookie);
198+
}
199+
}
200+
}
201+
202+
$this->addTelemetryHeaders($response, $telemetry);
203+
204+
$response->end($client->body);
205+
206+
if ($client->connected) {
207+
if (!$pool->push($client, 0.001)) {
208+
$client->close();
209+
}
210+
} else {
211+
$client->close();
212+
}
213+
}
214+
215+
/**
216+
* Raw TCP HTTP forwarder for benchmark-only usage.
217+
*
218+
* Assumptions:
219+
* - Backend replies with Content-Length (no chunked encoding).
220+
* - Only GET/HEAD are supported; other methods fall back to HTTP client.
221+
*/
222+
protected function forwardRawRequest(Request $request, Response $response, string $endpoint, ?Telemetry $telemetry = null): void
223+
{
224+
/** @var array<string, string> $requestServer */
225+
$requestServer = $request->server ?? [];
226+
$method = strtoupper($requestServer['request_method'] ?? 'GET');
227+
if ($method !== 'GET' && $method !== 'HEAD') {
228+
$this->forwardRequest($request, $response, $endpoint, $telemetry);
229+
230+
return;
231+
}
232+
233+
[$host, $port] = Adapter::parseEndpoint($endpoint, 80);
234+
235+
$poolKey = "raw:{$host}:{$port}";
236+
if (!isset($this->pools[$poolKey])) {
237+
$this->pools[$poolKey] = new Channel($this->config->poolSize);
238+
}
239+
$pool = $this->pools[$poolKey];
240+
241+
$client = $pool->pop($this->config->poolTimeout);
242+
if (!$client instanceof CoroutineClient || !$client->isConnected()) {
243+
$client = new CoroutineClient(SWOOLE_SOCK_TCP);
244+
$client->set([
245+
'timeout' => $this->config->timeout,
246+
]);
247+
if (!$client->connect($host, $port, $this->config->connectTimeout)) {
248+
$response->status(502);
249+
$response->end('Bad Gateway');
250+
251+
return;
252+
}
253+
}
254+
255+
$path = $requestServer['request_uri'] ?? '/';
256+
$query = $requestServer['query_string'] ?? '';
257+
if ($query !== '') {
258+
$path .= '?'.$query;
259+
}
260+
$hostHeader = $port === 80 ? $host : "{$host}:{$port}";
261+
$requestLine = $method.' '.$path." HTTP/1.1\r\n".
262+
'Host: '.$hostHeader."\r\n".
263+
"Connection: keep-alive\r\n\r\n";
264+
265+
if ($client->send($requestLine) === false) {
266+
$client->close();
267+
$response->status(502);
268+
$response->end('Bad Gateway');
269+
270+
return;
271+
}
272+
273+
$buffer = '';
274+
while (strpos($buffer, "\r\n\r\n") === false) {
275+
/** @var string|false $chunk */
276+
$chunk = $client->recv(8192);
277+
if ($chunk === '' || $chunk === false) {
278+
$client->close();
279+
$response->status(502);
280+
$response->end('Bad Gateway');
281+
282+
return;
283+
}
284+
$buffer .= $chunk;
285+
}
286+
287+
[$headerPart, $bodyPart] = explode("\r\n\r\n", $buffer, 2);
288+
$contentLength = null;
289+
$statusCode = 200;
290+
$chunked = false;
291+
292+
$lines = explode("\r\n", $headerPart);
293+
if (preg_match('/^HTTP\/\\d+\\.\\d+\\s+(\\d+)/', $lines[0], $matches)) {
294+
$statusCode = (int) $matches[1];
295+
}
296+
$skipHeaders = ['connection', 'keep-alive', 'transfer-encoding', 'content-length'];
297+
for ($i = 1; $i < count($lines); $i++) {
298+
$colonPos = strpos($lines[$i], ':');
299+
if ($colonPos === false) {
300+
continue;
301+
}
302+
$key = substr($lines[$i], 0, $colonPos);
303+
$value = trim(substr($lines[$i], $colonPos + 1));
304+
$lower = strtolower($key);
305+
if ($lower === 'content-length') {
306+
$contentLength = (int) $value;
307+
} elseif ($lower === 'transfer-encoding' && stripos($value, 'chunked') !== false) {
308+
$chunked = true;
309+
}
310+
if (!in_array($lower, $skipHeaders, true)) {
311+
$response->header($key, $value);
312+
}
313+
}
314+
315+
if (!$this->config->rawBackendAssumeOk) {
316+
$response->status($statusCode);
317+
}
318+
319+
if ($chunked || $contentLength === null) {
320+
$response->end($bodyPart);
321+
$client->close();
322+
323+
return;
324+
}
325+
326+
/** @var string $bodyPartStr */
327+
$bodyPartStr = $bodyPart;
328+
$body = $bodyPartStr;
329+
$remaining = $contentLength - strlen($bodyPartStr);
330+
while ($remaining > 0) {
331+
$chunk = $client->recv(min(8192, $remaining));
332+
if ($chunk === '' || $chunk === false) {
333+
$client->close();
334+
$response->status(502);
335+
$response->end('Bad Gateway');
336+
337+
return;
338+
}
339+
/** @var string $chunkStr */
340+
$chunkStr = $chunk;
341+
$body .= $chunkStr;
342+
$remaining -= strlen($chunkStr);
343+
}
344+
345+
$this->addTelemetryHeaders($response, $telemetry);
346+
347+
$response->end($body);
348+
349+
if ($client->isConnected()) {
350+
if (!$pool->push($client, 0.001)) {
351+
$client->close();
352+
}
353+
} else {
354+
$client->close();
355+
}
356+
}
357+
358+
protected function addTelemetryHeaders(Response $response, ?Telemetry $telemetry): void
359+
{
360+
if ($telemetry === null) {
361+
return;
362+
}
363+
364+
$latency = round((microtime(true) - $telemetry->startTime) * 1000, 2);
365+
$response->header('X-Proxy-Latency-Ms', (string) $latency);
366+
367+
if ($telemetry->result !== null) {
368+
$response->header('X-Proxy-Protocol', $telemetry->result->protocol->value);
369+
370+
if (isset($telemetry->result->metadata['cached'])) {
371+
$response->header('X-Proxy-Cache', $telemetry->result->metadata['cached'] ? 'HIT' : 'MISS');
372+
}
373+
}
374+
}
375+
376+
protected function isValidHostname(string $hostname): bool
377+
{
378+
$host = preg_replace('/:\d+$/', '', $hostname);
379+
if ($host === null) {
380+
return false;
381+
}
382+
383+
return (new Hostname())->isValid($host);
384+
}
385+
}

0 commit comments

Comments
 (0)