Skip to content
This repository was archived by the owner on Apr 29, 2026. It is now read-only.

Commit 7f4570d

Browse files
committed
feat(dispatch): add contextual dispatch logic
1 parent 1af73dd commit 7f4570d

3 files changed

Lines changed: 173 additions & 0 deletions

File tree

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
<?php
2+
3+
namespace Utopia\Swoole\Dispatch;
4+
5+
use Swoole\Server;
6+
use Swoole\Table;
7+
8+
/**
9+
* Dispatch requests to workers according to the HTTP request message
10+
*/
11+
abstract class ContextualDispatch implements DispatchInterface
12+
{
13+
private Table $dispatchMap;
14+
15+
/**
16+
* @param int $dispatcherMapSize This should correspond to the max_connection paramter
17+
* https://wiki.swoole.com/en/#/server/setting?id=max_conn-max_connection
18+
*/
19+
public function __construct(readonly int $dispatcherMapSize)
20+
{
21+
$this->dispatchMap = new Table($dispatcherMapSize);
22+
$this->dispatchMap->column('workerId', Table::TYPE_INT);
23+
$this->dispatchMap->create();
24+
}
25+
26+
/**
27+
* {@inheritdoc}
28+
*/
29+
public function __invoke(Server $server, int $fd, int $type, ?string $data = null): int
30+
{
31+
if ($this->dispatchMap->exists($fd)) {
32+
$workerId = $this->dispatchMap->get($fd, 'workerId');
33+
} else {
34+
$workerId = $this->resolveWorkerId($server, $data);
35+
$this->dispatchMap->set($fd, ['workerId' => $workerId]);
36+
}
37+
if ($type == self::CONNECTION_CLOSE) {
38+
$this->dispatchMap->delete($fd);
39+
}
40+
return $workerId;
41+
}
42+
43+
/**
44+
* Extract request identifying information from a request message
45+
*
46+
* @param Server $server
47+
* @param ?string $data
48+
*
49+
* @return int
50+
*/
51+
abstract protected function resolveWorkerId(Server $server, ?string $data): int;
52+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
<?php
2+
3+
namespace Utopia\Swoole\Dispatch;
4+
5+
use Swoole\Server;
6+
7+
interface DispatchInterface
8+
{
9+
/**
10+
* Connection dispatch type
11+
*
12+
* @link https://www.swoole.co.uk/docs/modules/swoole-server/configuration#dispatch_func
13+
*/
14+
public const CONNECTION_FETCH = 10;
15+
public const CONNECTION_START = 5;
16+
public const CONNECTION_CLOSE = 4;
17+
18+
/**
19+
* Resolve requests to corresponding worker processes
20+
*
21+
* @param Server $server
22+
* @param int $fd Client ID number
23+
* @param int $type Dispatch type
24+
* @param ?string $data Request packet data (0-8180 bytes)
25+
*
26+
* @return int Worker ID number
27+
*/
28+
public function __invoke(Server $server, int $fd, int $type, ?string $data = null): int;
29+
}
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
<?php
2+
3+
namespace Utopia\Swoole\Dispatch;
4+
5+
use Swoole\Server;
6+
7+
abstract class RiskyRequest extends ContextualDispatch
8+
{
9+
/**
10+
* @param int $riskyWorkersPercent 0 - 100
11+
*/
12+
public function __construct(
13+
readonly int $dispatcherMapSize,
14+
private readonly int $riskyWorkersPercent
15+
)
16+
{
17+
parent::__construct($dispatcherMapSize);
18+
}
19+
20+
/**
21+
* @param string $request
22+
* @param string $domain
23+
* @return bool
24+
*/
25+
abstract protected function isRisky(string $request, string $domain): bool;
26+
27+
protected function randomRiskyWorker(int $riskyWorkers, int $totalWorkers): int
28+
{
29+
return rand($riskyWorkers, $totalWorkers - 1);
30+
}
31+
32+
protected function randomSafeWorker(int $riskyWorkers): int
33+
{
34+
return rand(0, $riskyWorkers - 1);
35+
}
36+
37+
protected function resolveWorkerId(Server $server, ?string $data): int
38+
{
39+
$totalWorkers = $server->setting['worker_num'];
40+
41+
// If data is not set, we can send the request to any worker.
42+
// First we try to pick an idle worker, otherwise we randomly pick a worker.
43+
if (empty($data)) {
44+
for ($i = 0; $i < $totalWorkers; $i++) {
45+
if ($server->getWorkerStatus($i) === SWOOLE_WORKER_IDLE) {
46+
return $i;
47+
}
48+
}
49+
return rand(0, $totalWorkers - 1);
50+
}
51+
52+
// Each worker has a numeric ID, starting from 0 and incrementing
53+
// From 0 to $riskyWorkers, we consider safe workers
54+
// From $riskyWorkers to $totalWorkers, we consider risky workers
55+
$riskyWorkers = (int) floor($totalWorkers * $this->riskyWorkersPercent); // Absolute number of risky workers
56+
57+
$headers = explode("\n", strstr($data, "\r\n", true));
58+
$request = $headers[0];
59+
$domain = '';
60+
if (count($headers) > 1) {
61+
$domain = trim(explode('Host: ', $headers[1])[1]);
62+
}
63+
64+
$risky = $this->isRisky($request, $domain);
65+
66+
if ($risky) {
67+
// If risky request, only consider risky workers
68+
for ($j = $riskyWorkers; $j < $totalWorkers; $j++) {
69+
/** Reference https://openswoole.com/docs/modules/swoole-server-getWorkerStatus#description */
70+
if ($server->getWorkerStatus($j) === SWOOLE_WORKER_IDLE) {
71+
// If idle worker found, give to him
72+
return $j;
73+
}
74+
}
75+
76+
// If no idle workers, give to random risky worker
77+
return $this->randomRiskyWorker($riskyWorkers, $totalWorkers);
78+
}
79+
80+
// If safe request, give to any idle worker
81+
// It's fine to pick a risky worker here because it's idle. Idle is never actually risky
82+
for ($i = 0; $i < $totalWorkers; $i++) {
83+
if ($server->getWorkerStatus($i) === SWOOLE_WORKER_IDLE) {
84+
return $i;
85+
}
86+
}
87+
88+
// If no idle worker found, give to a random safe worker
89+
// We avoid risky workers here, as it could be in work - not idle. That's exactly when they are risky.
90+
return $this->randomSafeWorker($riskyWorkers);
91+
}
92+
}

0 commit comments

Comments
 (0)