Skip to content

Commit 3cfb2b4

Browse files
TASK: Netlogix.JobQueue.FastRabbit depends on Netlogix.Supervisor
Original: dd0a17b668fa7b1ea93608f936a88d3feb00c164
1 parent d869d24 commit 3cfb2b4

7 files changed

Lines changed: 280 additions & 261 deletions

File tree

Classes/Command/SupervisorCommandController.php

Lines changed: 0 additions & 162 deletions
This file was deleted.

Classes/Job/ConfigurationFactory.php

Lines changed: 87 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -5,96 +5,132 @@
55

66
use Flowpack\JobQueue\Common\Queue\QueueManager;
77
use Neos\Flow\Annotations as Flow;
8+
use Neos\Flow\Configuration\ConfigurationManager;
9+
use Neos\Flow\Core\Booting\Scripts;
10+
use Neos\Utility\ObjectAccess;
11+
use function defined;
12+
use function preg_replace;
13+
use function rtrim;
14+
use function sprintf;
15+
use function strtolower;
16+
use function trim;
817

918
class ConfigurationFactory
1019
{
11-
const __CONFIG_FILE__ = '__CONFIG_FILE__';
12-
const __QUEUE_NAME__ = '__QUEUE_NAME__';
13-
const __JOB_NAME__ = '__JOB_NAME__';
14-
const __CONTEXT__ = '__CONTEXT__';
15-
const __NUMPROCS__ = '__NUMPROCS__';
16-
17-
/**
18-
* @var string
19-
* @Flow\InjectConfiguration(package="Netlogix.JobQueue.FastRabbit", path="supervisor.contextName")
20-
*/
21-
protected $contextName;
22-
2320
/**
2421
* @var string
25-
* @Flow\InjectConfiguration(package="Netlogix.JobQueue.FastRabbit", path="supervisor.programTemplate")
22+
* @Flow\InjectConfiguration(package="Netlogix.JobQueue.FastRabbit", path="supervisor.applicationName")
2623
*/
27-
protected $programTemplate;
24+
protected $applicationName;
2825

2926
/**
30-
* @var string
31-
* @Flow\InjectConfiguration(package="Netlogix.JobQueue.FastRabbit", path="supervisor.groupTemplate")
27+
* @var QueueManager
3228
*/
33-
protected $groupTemplate;
29+
protected $queueManager;
3430

3531
/**
36-
* @var QueueManager
37-
* @Flow\Inject
32+
* @var ConfigurationManager
3833
*/
39-
protected $queueManager;
34+
private $configurationManager;
4035

41-
public function getShortNameForQueueName(string $queueName): string
36+
public function injectQueueManager(QueueManager $queueManager): void
4237
{
43-
return preg_replace('%[^a-z0-9]%iUm', '-', strtolower($queueName));
38+
$this->queueManager = $queueManager;
4439
}
4540

46-
public function getJobNameForQueueName(string $queueName): string
41+
public function injectConfigurationManager(ConfigurationManager $configurationManager): void
4742
{
48-
return $this->getContextName() . '-' . $this->getShortNameForQueueName($queueName);
43+
$this->configurationManager = $configurationManager;
4944
}
5045

51-
public function buildJobConfigurationForQueue(string $queueName): string
46+
/**
47+
* @param string $queueName
48+
* @return array<string, mixed>
49+
* @throws \Flowpack\JobQueue\Common\Exception
50+
* @throws \Neos\Flow\Configuration\Exception\InvalidConfigurationTypeException
51+
* @throws \Neos\Flow\Exception
52+
*/
53+
public function buildJobConfiguration(string $queueName): array
5254
{
53-
$jobName = $this->getJobNameForQueueName($queueName);
54-
$queueSettings = $this->queueManager->getQueueSettings($queueName);
55-
$fastRabbitSettings = $queueSettings['fastRabbit'] ?? [];
56-
$numProcs = (int)($fastRabbitSettings['numProcs'] ?? 1);
57-
58-
$config = $this->programTemplate;
59-
$config = str_replace(self::__CONFIG_FILE__, $this->getJobConfigurationFile($queueName), $config);
60-
$config = str_replace(self::__QUEUE_NAME__, $queueName, $config);
61-
$config = str_replace(self::__JOB_NAME__, $jobName, $config);
62-
$config = str_replace(self::__CONTEXT__, $this->contextName, $config);
63-
$config = str_replace(self::__NUMPROCS__, $numProcs, $config);
64-
65-
return $config;
55+
assert(defined('FLOW_PATH_FLOW'));
56+
assert(defined('FLOW_PATH_TEMPORARY_BASE'));
57+
58+
$flowSettings = (array)$this->configurationManager->getConfiguration(
59+
ConfigurationManager::CONFIGURATION_TYPE_SETTINGS,
60+
'Neos.Flow'
61+
);
62+
63+
$command = Scripts::buildPhpCommand(
64+
$flowSettings
65+
);
66+
$command .= sprintf(
67+
' %s %s --queue=%s',
68+
escapeshellarg(\FLOW_PATH_FLOW . 'Scripts/flow.php'),
69+
escapeshellarg('flowpack.jobqueue.common:job:execute'),
70+
escapeshellarg($queueName)
71+
);
72+
73+
$workerPool = (array)$this->configurationManager->getConfiguration(
74+
ConfigurationManager::CONFIGURATION_TYPE_SETTINGS,
75+
'Netlogix.JobQueue.FastRabbit.supervisor.workerPool'
76+
);
77+
78+
$jobConfig = [
79+
'command' => $command,
80+
'queueSettings' => $this->queueManager->getQueueSettings($queueName),
81+
'cacheConfiguration' => $this->configurationManager->getConfiguration(
82+
ConfigurationManager::CONFIGURATION_TYPE_CACHES,
83+
'FlowPackJobQueueCommon_MessageCache'
84+
),
85+
'queueName' => $queueName,
86+
'temporaryDirectoryBase' => \FLOW_PATH_TEMPORARY_BASE,
87+
'applicationIdentifier' => ObjectAccess::getPropertyPath($flowSettings, 'cache.applicationIdentifier'),
88+
'contextString' => ObjectAccess::getPropertyPath($flowSettings, 'core.context'),
89+
'workerPool' => $workerPool,
90+
];
91+
92+
return $jobConfig;
6693
}
6794

68-
public function buildGroupConfigurationForQueues(string ...$queueNames): string
95+
public function getJobName(string $queueName): string
6996
{
70-
$jobNames = array_map([$this, 'getJobNameForQueueName'], $queueNames);
71-
72-
$programs = $this->groupTemplate;
73-
$programs = str_replace('__PROGRAMS__', join(',', $jobNames), $programs);
74-
$programs = str_replace('__CONTEXT__', $this->contextName, $programs);
75-
76-
return $programs;
97+
return self::removeInvalidCharactersFromSupervisorIdentifier($this->applicationName . '-' . $queueName);
7798
}
7899

79-
public function getJobConfigurationFile(string $queueName): string
100+
public function getJobConfigurationFilePath(string $queueName): string
80101
{
81102
return $this->getJobFilePath($queueName, 'json');
82103
}
83104

84-
public function getJobSupervisorFile(string $queueName): string
105+
public function getJobSupervisorFilePath(string $queueName): string
85106
{
86107
return $this->getJobFilePath($queueName, 'conf');
87108
}
88109

110+
public function getNumberOfProcesses(string $queueName): int
111+
{
112+
$queueSettings = $this->queueManager->getQueueSettings($queueName);
113+
return (int)(ObjectAccess::getPropertyPath($queueSettings, 'fastRabbit.numProcs') ?? 1);
114+
}
115+
89116
protected function getJobFilePath(string $queueName, string $suffix): string
90117
{
91-
$jobName = $this->getJobNameForQueueName($queueName);
118+
assert(defined('FLOW_PATH_CONFIGURATION'));
119+
$jobName = $this->getJobName($queueName);
92120
$pathPrefix = rtrim(FLOW_PATH_CONFIGURATION, '/') . '/Supervisor/';
93121
return sprintf($pathPrefix . 'program-%s.%s', $jobName, $suffix);
94122
}
95123

96-
protected function getContextName(): string
124+
private static function removeInvalidCharactersFromSupervisorIdentifier(string $subject): string
97125
{
98-
return trim(preg_replace('%[^\\pL]+%iUum', '-', $this->contextName), '-');
126+
/**
127+
* \p{xx} A character with the xx unicode property. https://php.net/manual/en/regexp.reference.escape.php
128+
* \pL Every Letter https://php.net/manual/en/regexp.reference.unicode.php
129+
*/
130+
$cleanupPattern = '%[^\\pL\\d]+%iUum';
131+
132+
$subject = (string)preg_replace($cleanupPattern, '-', $subject);
133+
$subject = trim($subject, '-');
134+
return strtolower($subject);
99135
}
100136
}

0 commit comments

Comments
 (0)