Skip to content

Commit a1bd89d

Browse files
committed
Merge branch '5.x-dev' into feature/add_redis_cluster_capability
2 parents 374b6ff + 0102f83 commit a1bd89d

15 files changed

Lines changed: 261 additions & 62 deletions

Commands/Monitor.php

Lines changed: 126 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,9 @@ class Monitor extends ConsoleCommand
1919
protected function configure()
2020
{
2121
$this->setName('queuedtracking:monitor');
22-
$this->setDescription('Shows and updates the current state of the queue every 2 seconds.');
22+
$this->setDescription("Shows and updates the current state of the queue every 2 seconds.\n Key ,=first page, .=last page, 0-9=move to page section, arrow LEFT=prev page, RIGHT=next page, UP=next 10 pages, DOWN=prev 10 pages, q=quit");
2323
$this->addRequiredValueOption('iterations', null, 'If set, will limit the number of monitoring iterations done.');
24+
$this->addRequiredValueOption('perpage', 'p', 'Number of queue worker displayed per page.', 16);
2425
}
2526

2627
/**
@@ -36,6 +37,9 @@ protected function doExecute(): int
3637
$systemCheck->checkRedisIsInstalled();
3738
}
3839

40+
$output->write(str_repeat("\r\n", 100));
41+
$output->write("\e[".(100)."A");
42+
3943
$iterations = $this->getIterationsFromArg();
4044
if ($iterations !== null) {
4145
$output->writeln("<info>Only running " . $iterations . " iterations.</info>");
@@ -58,34 +62,118 @@ protected function doExecute(): int
5862
$output->writeln('The command <comment>./console queuedtracking:process</comment> has to be executed to process request sets within queue');
5963
}
6064

61-
$output->writeln(sprintf('Up to %d workers will be used', $manager->getNumberOfAvailableQueues()));
62-
$output->writeln(sprintf('Processor will start once there are at least %s request sets in the queue',
65+
$output->writeln(sprintf('Up to <info>%d</> workers will be used', $manager->getNumberOfAvailableQueues()));
66+
$output->writeln(sprintf('Processor will start once there are at least <info>%s</> request sets in the queue',
6367
$manager->getNumberOfRequestsToProcessAtSameTime()));
6468
$iterationCount = 0;
69+
70+
$qCurrentPage = 1;
71+
$qCount = count($queues);
72+
$qPerPAge = min(max($this->getPerPageFromArg(), 1), $qCount);
73+
$qPageCount = ceil($qCount / $qPerPAge);
74+
75+
readline_callback_handler_install('', function() {});
76+
stream_set_blocking (STDIN, false);
77+
78+
$output->writeln(str_repeat("-", 30));
79+
$output->writeln("<fg=black;bg=white;options=bold>".str_pad(" Q INDEX", 10).str_pad(" | REQUEST SETS", 20)."</>");
80+
$output->writeln(str_repeat("-", 30));
81+
82+
$lastStatsTimer = microtime(true) - 2;
83+
$lastSumInQueue = false;
84+
$diffSumInQueue = 0;
85+
$keyPressed = "";
86+
87+
$output->write(str_repeat("\r\n", $qPerPAge + 5));
6588

6689
while (1) {
67-
$memory = $backend->getMemoryStats(); // I know this will only work with redis currently as it is not defined in backend interface etc. needs to be refactored once we add another backend
90+
if (microtime(true) - $lastStatsTimer >= 2 || $keyPressed != "")
91+
{
92+
$output->write("\e[".($qPerPAge + 5)."A");
93+
94+
$qCurrentPage = min(max($qCurrentPage, 1), $qPageCount);
95+
$memory = $backend->getMemoryStats(); // I know this will only work with redis currently as it is not defined in backend interface etc. needs to be refactored once we add another backend
96+
97+
$sumInQueue = 0;
98+
foreach ($queues as $sumQ) {
99+
$sumInQueue += $sumQ->getNumberOfRequestSetsInQueue();
100+
}
101+
102+
if ($lastSumInQueue !== false) {
103+
$diffSumInQueue = $lastSumInQueue - $sumInQueue;
104+
$diffRps = round($diffSumInQueue / (microtime(true) - $lastStatsTimer), 2);
105+
$diffSumInQueue = $diffSumInQueue < 0 ? "<fg=red;options=bold>".abs($diffRps)."</>" : "<fg=green;options=bold>{$diffRps}</>";
106+
}
107+
108+
$numInQueue = 0;
109+
for ($idxPage = 0; $idxPage < $qPerPAge; $idxPage++) {
110+
$idx = ($qCurrentPage - 1) * $qPerPAge + $idxPage;
111+
if (isset($queues[$idx])) {
112+
$q = $queues[$idx]->getNumberOfRequestSetsInQueue();
113+
$numInQueue += (int)$q;
114+
$output->writeln(str_pad($idx, 10, " ", STR_PAD_LEFT)." | ".str_pad(number_format($q), 16, " ", STR_PAD_LEFT));
115+
} else {
116+
$output->writeln(str_pad("", 10)." | ".str_pad("", 16));
117+
}
118+
}
119+
120+
$output->writeln(str_repeat("-", 30));
121+
$output->writeln("<fg=black;bg=white;options=bold>".str_pad(" ".($qCount)." Q", 10)." | ".str_pad(number_format($sumInQueue)." R", 16)."</>");
122+
$output->writeln(str_repeat("-", 30));
123+
$output->writeln(sprintf(
124+
"Q [%s-%s] | <info>page %s/%s</> | <comment>press (0-9.,q) or arrow(L,R,U,D)</> | diff/sec %s \n".
125+
"%s used memory (%s peak). <info>%d</> workers active.".str_repeat(" ", 15),
126+
($idx - $qPerPAge + 1),
127+
$idx, $qCurrentPage, $qPageCount, $diffSumInQueue,
128+
$memory['used_memory_human'] ?? 'Unknown',
129+
$memory['used_memory_peak_human'] ?? 'Unknown',
130+
$lock->getNumberOfAcquiredLocks()
131+
));
132+
133+
if (!is_null($iterations)) {
134+
$iterationCount += 1;
135+
if ($iterationCount >= $iterations) {
136+
break;
137+
}
138+
}
68139

69-
$numInQueue = array();
70-
foreach ($queues as $queue) {
71-
$numInQueue[] = $queue->getNumberOfRequestSetsInQueue();
140+
$lastSumInQueue = $sumInQueue;
141+
$lastStatsTimer = microtime(true);
72142
}
73143

74-
$message = sprintf('%s (%s) request sets left in queue. %s used memory (%s peak). %d workers active. ',
75-
array_sum($numInQueue),
76-
implode('+', $numInQueue),
77-
$memory['used_memory_human'] ?? 'Unknown',
78-
$memory['used_memory_peak_human'] ?? 'unknown',
79-
$lock->getNumberOfAcquiredLocks());
80-
$output->write("\x0D");
81-
$output->write($message);
82-
if (!is_null($iterations)) {
83-
$iterationCount += 1;
84-
if ($iterationCount >= $iterations) {
85-
break;
144+
$keyStroke = stream_get_contents(STDIN, 3);
145+
$keyPressed = strlen($keyStroke) == 3 ? $keyStroke[2] : (strlen($keyStroke) > 0 ? $keyStroke[0] : "");
146+
if ($keyPressed != "" and in_array($keyPressed, array(".", ",", "0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "A", "B", "C", "D", "q"))) {
147+
switch ($keyPressed) {
148+
case "0": case "1": case "2": case "3": case "4":
149+
case "5": case "6": case "7": case "8": case "9":
150+
$keyPressed = $keyPressed != "0" ? $keyPressed : "10";
151+
$qCurrentPage = floor(($qCurrentPage - 0.1) / 10) * 10 + (int)$keyPressed; break;
152+
case "C":
153+
$qCurrentPage++;
154+
break;
155+
case "D":
156+
$qCurrentPage--;
157+
break;
158+
case "A":
159+
$qCurrentPage += 10;
160+
break;
161+
case "B":
162+
$qCurrentPage -= 10;
163+
break;
164+
case ",":
165+
$qCurrentPage = 1;
166+
break;
167+
case ".":
168+
$qCurrentPage = $qPageCount;
169+
break;
170+
case "q":
171+
$output->writeln('');
172+
die;
86173
}
87174
}
88-
sleep(2);
175+
176+
usleep(5000);
89177
}
90178

91179
return self::SUCCESS;
@@ -112,4 +200,22 @@ private function getIterationsFromArg()
112200
return $iterations;
113201
}
114202

203+
/**
204+
* Loads the `perpage` argument from the commands arguments.
205+
*
206+
* @return int|null
207+
*/
208+
private function getPerPageFromArg()
209+
{
210+
$perPage = $this->getInput()->getOption('perpage');
211+
if (!is_numeric($perPage)) {
212+
throw new \Exception('perpage needs to be numeric');
213+
} else {
214+
$perPage = (int)$perPage;
215+
if ($perPage <= 0) {
216+
throw new \Exception('perpage needs to be a non-zero positive number');
217+
}
218+
}
219+
return $perPage;
220+
}
115221
}

Commands/Process.php

Lines changed: 72 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ protected function configure()
2626
$this->setName('queuedtracking:process');
2727
$this->addRequiredValueOption('queue-id', null, 'If set, will only work on that specific queue. For example "0" or "1" (if there are multiple queues). Not recommended when only one worker is in use. If for example 4 workers are in use, you may want to use 0, 1, 2, or 3.');
2828
$this->addRequiredValueOption('force-num-requests-process-at-once', null, 'If defined, it overwrites the setting of how many requests will be picked out of the queue and processed at once. Must be a number which is >= 1. By default, the configured value from the settings will be used. This can be useful for example if you want to process every single request within the queue. If otherwise a batch size of say 100 is configured, then there may be otherwise 99 requests left in the queue. It can be also useful for testing purposes.');
29+
$this->addRequiredValueOption('cycle', 'c', 'The proccess will automatically loop for "n" cycle time(s), set "0" to infinite.', 1);
30+
$this->addRequiredValueOption('sleep', 's', 'Take a nap for "n" second(s) before recycle, minimum is 1 second.', 1);
31+
$this->addRequiredValueOption('delay', 'd', 'Delay before finished', 0);
2932
$this->setDescription('Processes all queued tracking requests in case there are enough requests in the queue and in case they are not already in process by another script. To keep track of the queue use the <comment>--verbose</comment> option or execute the <comment>queuedtracking:monitor</comment> command.');
3033
}
3134

@@ -76,29 +79,84 @@ protected function doExecute(): int
7679
throw new \Exception('Number of requests to process must be a number and at least 1');
7780
}
7881

79-
$output->writeln("<info>Starting to process request sets, this can take a while</info>");
80-
8182
register_shutdown_function(function () use ($queueManager) {
8283
$queueManager->unlock();
8384
});
8485

85-
$startTime = microtime(true);
86-
$processor = new Processor($queueManager);
87-
$processor->setNumberOfMaxBatchesToProcess(500);
88-
$tracker = $processor->process();
8986

90-
$neededTime = (microtime(true) - $startTime);
91-
$numRequestsTracked = $tracker->getCountOfLoggedRequests();
92-
$requestsPerSecond = $this->getNumberOfRequestsPerSecond($numRequestsTracked, $neededTime);
9387

94-
Piwik::postEvent('Tracker.end');
88+
$numberOfProcessCycle = $input->getOption('cycle');
89+
if (!is_numeric($numberOfProcessCycle)) {
90+
throw new \Exception('"cycle" needs to be numeric');
91+
}
92+
$numberOfProcessCycle = (int)$numberOfProcessCycle;
93+
$infiniteCycle = $numberOfProcessCycle == 0;
94+
95+
$delayedBeforeFinish = (int)$input->getOption('delay');
96+
97+
$napster = max(1, $input->getOption('sleep'));
98+
if (!is_numeric($napster)) {
99+
throw new \Exception('"nap" needs to be numeric');
100+
}
101+
$napster = (int)$napster;
102+
103+
$lastTimeGotMoreThanZeroTrackedReq = microtime(true);
104+
$originalNumberOfRequestsToProcessAtSameTime = $queueManager->getNumberOfRequestsToProcessAtSameTime();
105+
106+
while ($numberOfProcessCycle > 0 || $infiniteCycle) {
107+
$wipingOutQueue = false;
108+
if (microtime(true) - $lastTimeGotMoreThanZeroTrackedReq > 10) {
109+
$queueManager->setNumberOfRequestsToProcessAtSameTime(1);
110+
$wipingOutQueue = true;
111+
$lastTimeGotMoreThanZeroTrackedReq = microtime(true);
112+
}
113+
114+
if ($wipingOutQueue) {
115+
$output->writeln("<fg=red;bg=white;options=bold> TRYING TO WIPE OUT THE QUEUE </>");
116+
}
117+
$output->writeln("<info>Starting to process request sets, this can take a while</info>");
118+
119+
$startTime = microtime(true);
120+
$processor = new Processor($queueManager);
121+
$processor->setNumberOfMaxBatchesToProcess(500);
122+
$tracker = $processor->process();
123+
124+
$neededTime = (microtime(true) - $startTime);
125+
$numRequestsTracked = $tracker->getCountOfLoggedRequests();
126+
$requestsPerSecond = $this->getNumberOfRequestsPerSecond($numRequestsTracked, $neededTime);
127+
128+
$this->writeSuccessMessage(
129+
array(sprintf('This worker finished queue processing with %sreq/s (%s requests in %02.2f seconds)', $requestsPerSecond, $numRequestsTracked, $neededTime))
130+
);
131+
Piwik::postEvent('Tracker.end');
132+
133+
if ($numRequestsTracked > 0) {
134+
$lastTimeGotMoreThanZeroTrackedReq = microtime(true);
135+
}
136+
137+
if (!$infiniteCycle) {
138+
$numberOfProcessCycle--;
139+
}
140+
if ($numberOfProcessCycle > 0 || $infiniteCycle) {
141+
$cTogo = $infiniteCycle ? "infinite" : $numberOfProcessCycle;
142+
$output->writeln("===========================================================================");
143+
$output->writeln("<comment>Taking a nap for {$napster} second(s), before re-running the process. <info>({$cTogo})</info> cyle(s) to go.</comment>");
144+
$output->writeln("===========================================================================");
145+
sleep($napster);
146+
}
147+
148+
if ($wipingOutQueue) {
149+
$queueManager->setNumberOfRequestsToProcessAtSameTime($originalNumberOfRequestsToProcessAtSameTime);
150+
}
151+
}
95152

153+
// Piwik::postEvent('Tracker.end');
96154
$trackerEnvironment->destroy();
97155

98-
$this->writeSuccessMessage(
99-
array(sprintf('This worker finished queue processing with %sreq/s (%s requests in %02.2f seconds)', $requestsPerSecond, $numRequestsTracked, $neededTime))
100-
);
101-
156+
if ($delayedBeforeFinish > 0) {
157+
sleep($delayedBeforeFinish);
158+
}
159+
102160
return self::SUCCESS;
103161
}
104162

Queue/Manager.php

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -213,14 +213,17 @@ private function getVisitorIdFromRequest(Tracker\Request $request)
213213

214214
protected function getQueueIdForVisitor($visitorId)
215215
{
216-
$visitorId = strtolower(substr($visitorId, 0, 1));
217-
218-
if (isset($this->mappingLettersToNumeric[$visitorId])) {
219-
$id = $this->mappingLettersToNumeric[$visitorId];
220-
} else {
221-
$id = ord($visitorId);
216+
$visitorId = strtolower(substr($visitorId, 0, 3));
217+
if (ctype_xdigit($visitorId) === true) {
218+
$id = hexdec($visitorId);
222219
}
223-
220+
else {
221+
$pos1 = ord($visitorId);
222+
$pos2 = isset($visitorId[1]) ? ord($visitorId[1]) : $pos1;
223+
$pos3 = isset($visitorId[2]) ? ord($visitorId[2]) : $pos2;
224+
$id = $pos1 + $pos2 + $pos3;
225+
}
226+
224227
return $id % $this->numQueuesAvailable;
225228
}
226229

SystemSettings.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -214,9 +214,9 @@ private function createNumberOfQueueWorkerSetting()
214214
$field->title = Piwik::translate('QueuedTracking_NumberOfQueueWorkersFieldTitle');
215215
$field->uiControl = FieldConfig::UI_CONTROL_TEXT;
216216
$field->uiControlAttributes = array('size' => 5);
217-
$field->inlineHelp = Piwik::translate('QueuedTracking_NumberOfQueueWorkersFieldHelp') . '</br>';
217+
$field->inlineHelp = Piwik::translate('QueuedTracking_NumberOfQueueWorkersFieldHelpNew') . '</br>';
218218
$this->assignValueIsIntValidator($field);
219-
$field->validators[] = new NumberRange(1, 16);
219+
$field->validators[] = new NumberRange(1, 4096);
220220
});
221221

222222
$this->addSetting($numQueueWorkers);

docs/faq.md

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ requests using the [Piwik console](http://developer.piwik.org/guides/piwik-on-th
3232
* Disable the setting "Process during tracking request" in the Piwik UI under "Settings => Plugin Settings"
3333
* Setup a cronjob that executes the command `./console queuedtracking:process` for instance every minute
3434
* That's it
35+
* Or, if you have __"non WINDOWS OS"__ you can use the [Supervisor](http://supervisord.org/) as a cron alternative.
3536

3637
The `queuedtracking:process` command will make sure to process all queued tracking requests whenever possible and the
3738
command will exit as soon as there are not enough requests queued anymore. That's why you should setup a cronjob to start
@@ -43,6 +44,28 @@ Example crontab entry that starts the processor every minute:
4344

4445
`* * * * * cd /piwik && ./console queuedtracking:process >/dev/null 2>&1`
4546

47+
Example Supervisor entry that will start 16 processors/workers with 10 loop cycle times and auto restart:
48+
49+
```ini
50+
[program:matomo]
51+
directory=/path/to/your/matomo
52+
command=/path/to/your/php /path/to/your/matomo/console queuedtracking:process --queue-id=%(process_num)s -c 10 -s 2 -d 5
53+
process_name=queuedtracking-%(process_num)s
54+
55+
#change the number according to how many worker(s) you have
56+
numprocs=16
57+
58+
numprocs_start=0
59+
stopsignal=TERM
60+
autostart=true
61+
autorestart=true
62+
stopwaitsecs=120
63+
#priority=1000
64+
stdout_logfile=/dev/null
65+
stdout_logfile_maxbytes=0
66+
redirect_stderr=true
67+
```
68+
4669
__Can I keep track of the state of the queue?__
4770

4871
Yes, you can. Just execute the command `./console queuedtracking:monitor`. This will show the current state of the queue. To exit this command you can for example press `CTRL + C` key at the same time.

lang/ca.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
"NumRequestsToProcessFieldHelp": "Defineix quantes sol·licituds es recolliran de la cua i es processaran alhora. Introduïu un número que sigui &gt;= 1.",
1111
"NumRequestsToProcessFieldTitle": "Nombre de sol·licituds que es processen en un lot",
1212
"NumberOfQueueWorkersFieldHelp": "Nombre màxim de treballadors de cua permesos. Accepta un número entre 1 i 16. La millor pràctica és establir el nombre de CPU que voleu que estiguin disponibles per al processament de la cua. Tingueu en compte que heu d'assegurar-vos d'iniciar els treballadors manualment. Us recomanem que no utilitzeu entre 9 i 15 treballadors, sinó que en utilitzeu 8 o 16, ja que és possible que la cua no es distribueixi uniformement en diferents cues.",
13+
"NumberOfQueueWorkersFieldHelpNew": "Nombre màxim de treballadors de cua permesos. Accepta un nombre entre 1 i 4096. La millor pràctica és establir el nombre de CPU que voleu que estiguin disponibles per al processament de la cua. Tingueu en compte que heu d'assegurar-vos d'iniciar els treballadors manualment. Us recomanem que no utilitzeu entre 9 i 15 treballadors, sinó que en feu servir 8 o 16, ja que és possible que la cua no es distribueixi uniformement en diferents cues.",
1314
"NumberOfQueueWorkersFieldTitle": "Nombre de treballadors de la cua",
1415
"ProcessDuringRequestFieldHelp": "Si està activat, processarem totes les sol·licituds d'una cua durant una sol·licitud de seguiment normal quan hi hagi prou sol·licituds a la cua. Això no retardarà la sol·licitud de seguiment. Si està desactivat, heu de configurar un cronjob que executi l'ordre de la consola %1$s./console queuedtracking:process%2$s, per exemple, cada minut per processar la cua.",
1516
"ProcessDuringRequestFieldTitle": "Procés durant la sol·licitud de seguiment",

0 commit comments

Comments
 (0)