Skip to content
This repository was archived by the owner on Feb 11, 2026. It is now read-only.

Commit 71ddfa1

Browse files
committed
feat(dispatch): add contextual dispatch logic
1 parent 1af73dd commit 71ddfa1

3 files changed

Lines changed: 175 additions & 0 deletions

File tree

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
<?php
2+
3+
namespace Utopia\Swoole\Dispatch;
4+
5+
use Swoole\Server;
6+
use Swoole\Table;
7+
8+
/**
9+
* Dispatch requests to workers according to the HTTP request message
10+
*/
11+
abstract class ContextualDispatch implements DispatchInterface
12+
{
13+
private Table $dispatchMap;
14+
15+
/**
16+
* @param int $dispatcherMapSize This should correspond to the max_connection paramter
17+
* https://wiki.swoole.com/en/#/server/setting?id=max_conn-max_connection
18+
*/
19+
public function __construct(readonly int $dispatcherMapSize)
20+
{
21+
$this->dispatchMap = new Table($dispatcherMapSize);
22+
$this->dispatchMap->column('workerId', Table::TYPE_INT);
23+
$this->dispatchMap->create();
24+
}
25+
26+
/**
27+
* {@inheritdoc}
28+
*/
29+
public function __invoke(Server $server, int $fd, int $type, ?string $data = null): int
30+
{
31+
if ($this->dispatchMap->exists($fd)) {
32+
$workerId = $this->dispatchMap->get($fd, 'workerId');
33+
} else {
34+
$workerId = $this->resolveWorkerId($server, $data);
35+
$this->dispatchMap->set($fd, ['workerId' => $workerId]);
36+
}
37+
if ($type == self::CONNECTION_CLOSE) {
38+
$this->dispatchMap->delete($fd);
39+
}
40+
return $workerId;
41+
}
42+
43+
/**
44+
* Extract request identifying information from a request message
45+
*
46+
* @param Server $server
47+
* @param ?string $data
48+
*
49+
* @return int
50+
*/
51+
abstract protected function resolveWorkerId(Server $server, ?string $data): int;
52+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
<?php
2+
3+
namespace Utopia\Swoole\Dispatch;
4+
5+
use Swoole\Server;
6+
7+
interface DispatchInterface
8+
{
9+
/**
10+
* Connection dispatch type
11+
*
12+
* @link https://www.swoole.co.uk/docs/modules/swoole-server/configuration#dispatch_func
13+
*/
14+
public const CONNECTION_FETCH = 10;
15+
public const CONNECTION_START = 5;
16+
public const CONNECTION_CLOSE = 4;
17+
18+
/**
19+
* Resolve requests to corresponding worker processes
20+
*
21+
* @param Server $server
22+
* @param int $fd Client ID number
23+
* @param int $type Dispatch type
24+
* @param ?string $data Request packet data (0-8180 bytes)
25+
*
26+
* @return int Worker ID number
27+
*/
28+
public function __invoke(Server $server, int $fd, int $type, ?string $data = null): int;
29+
}
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
<?php
2+
3+
namespace Utopia\Swoole\Dispatch;
4+
5+
use Swoole\Server;
6+
7+
abstract class RiskyRequest extends ContextualDispatch
8+
{
9+
/**
10+
* @param float $riskyWorkersPercent Decimal form 0 to 1
11+
*/
12+
public function __construct(
13+
int $dispatcherMapSize,
14+
private readonly float $riskyWorkersPercent
15+
) {
16+
parent::__construct($dispatcherMapSize);
17+
if ($this->riskyWorkersPercent < 0 || $this->riskyWorkersPercent > 1) {
18+
throw new \InvalidArgumentException('riskyWorkersPercent must be >=0 && <=1');
19+
}
20+
}
21+
22+
/**
23+
* @param string $request
24+
* @param string $domain
25+
* @return bool
26+
*/
27+
abstract protected function isRisky(string $request, string $domain): bool;
28+
29+
protected function randomRiskyWorker(int $riskyWorkers, int $totalWorkers): int
30+
{
31+
return rand($riskyWorkers, $totalWorkers - 1);
32+
}
33+
34+
protected function randomSafeWorker(int $riskyWorkers): int
35+
{
36+
return rand(0, $riskyWorkers - 1);
37+
}
38+
39+
protected function resolveWorkerId(Server $server, ?string $data): int
40+
{
41+
$totalWorkers = $server->setting['worker_num'];
42+
43+
// If data is not set, we can send the request to any worker.
44+
// First we try to pick an idle worker, otherwise we randomly pick a worker.
45+
if (empty($data)) {
46+
for ($i = 0; $i < $totalWorkers; $i++) {
47+
if ($server->getWorkerStatus($i) === SWOOLE_WORKER_IDLE) {
48+
return $i;
49+
}
50+
}
51+
return rand(0, $totalWorkers - 1);
52+
}
53+
54+
// Each worker has a numeric ID, starting from 0 and incrementing
55+
// From 0 to $riskyWorkers, we consider safe workers
56+
// From $riskyWorkers to $totalWorkers, we consider risky workers
57+
$riskyWorkers = (int) floor($totalWorkers * $this->riskyWorkersPercent); // Absolute number of risky workers
58+
59+
$headers = explode("\n", strstr($data, "\r\n", true));
60+
$request = $headers[0];
61+
$domain = '';
62+
if (count($headers) > 1) {
63+
$domain = trim(explode('Host: ', $headers[1])[1]);
64+
}
65+
66+
$risky = $this->isRisky($request, $domain);
67+
68+
if ($risky) {
69+
// If risky request, only consider risky workers
70+
for ($j = $riskyWorkers; $j < $totalWorkers; $j++) {
71+
/** Reference https://openswoole.com/docs/modules/swoole-server-getWorkerStatus#description */
72+
if ($server->getWorkerStatus($j) === SWOOLE_WORKER_IDLE) {
73+
// If idle worker found, give to him
74+
return $j;
75+
}
76+
}
77+
78+
// If no idle workers, give to random risky worker
79+
return $this->randomRiskyWorker($riskyWorkers, $totalWorkers);
80+
}
81+
82+
// If safe request, give to any idle worker
83+
// It's fine to pick a risky worker here because it's idle. Idle is never actually risky
84+
for ($i = 0; $i < $totalWorkers; $i++) {
85+
if ($server->getWorkerStatus($i) === SWOOLE_WORKER_IDLE) {
86+
return $i;
87+
}
88+
}
89+
90+
// If no idle worker found, give to a random safe worker
91+
// We avoid risky workers here, as it could be in work - not idle. That's exactly when they are risky.
92+
return $this->randomSafeWorker($riskyWorkers);
93+
}
94+
}

0 commit comments

Comments
 (0)