Skip to content

Commit 08647ea

Browse files
Merge pull request #261 from haristku/feature/enhance_monitor_and_processor
enhanced monitor, process commands
2 parents 16356ea + 470e5e1 commit 08647ea

3 files changed

Lines changed: 221 additions & 34 deletions

File tree

Commands/Monitor.php

Lines changed: 126 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,9 @@ class Monitor extends ConsoleCommand
1919
protected function configure()
2020
{
2121
$this->setName('queuedtracking:monitor');
22-
$this->setDescription('Shows and updates the current state of the queue every 2 seconds.');
22+
$this->setDescription("Shows and updates the current state of the queue every 2 seconds.\n Key ,=first page, .=last page, 0-9=move to page section, arrow LEFT=prev page, RIGHT=next page, UP=next 10 pages, DOWN=prev 10 pages, q=quit");
2323
$this->addRequiredValueOption('iterations', null, 'If set, will limit the number of monitoring iterations done.');
24+
$this->addRequiredValueOption('perpage', 'p', 'Number of queue worker displayed per page.', 16);
2425
}
2526

2627
/**
@@ -36,6 +37,9 @@ protected function doExecute(): int
3637
$systemCheck->checkRedisIsInstalled();
3738
}
3839

40+
$output->write(str_repeat("\r\n", 100));
41+
$output->write("\e[".(100)."A");
42+
3943
$iterations = $this->getIterationsFromArg();
4044
if ($iterations !== null) {
4145
$output->writeln("<info>Only running " . $iterations . " iterations.</info>");
@@ -58,34 +62,118 @@ protected function doExecute(): int
5862
$output->writeln('The command <comment>./console queuedtracking:process</comment> has to be executed to process request sets within queue');
5963
}
6064

61-
$output->writeln(sprintf('Up to %d workers will be used', $manager->getNumberOfAvailableQueues()));
62-
$output->writeln(sprintf('Processor will start once there are at least %s request sets in the queue',
65+
$output->writeln(sprintf('Up to <info>%d</> workers will be used', $manager->getNumberOfAvailableQueues()));
66+
$output->writeln(sprintf('Processor will start once there are at least <info>%s</> request sets in the queue',
6367
$manager->getNumberOfRequestsToProcessAtSameTime()));
6468
$iterationCount = 0;
69+
70+
$qCurrentPage = 1;
71+
$qCount = count($queues);
72+
$qPerPAge = min(max($this->getPerPageFromArg(), 1), $qCount);
73+
$qPageCount = ceil($qCount / $qPerPAge);
74+
75+
readline_callback_handler_install('', function() {});
76+
stream_set_blocking (STDIN, false);
77+
78+
$output->writeln(str_repeat("-", 30));
79+
$output->writeln("<fg=black;bg=white;options=bold>".str_pad(" Q INDEX", 10).str_pad(" | REQUEST SETS", 20)."</>");
80+
$output->writeln(str_repeat("-", 30));
81+
82+
$lastStatsTimer = microtime(true) - 2;
83+
$lastSumInQueue = false;
84+
$diffSumInQueue = 0;
85+
$keyPressed = "";
86+
87+
$output->write(str_repeat("\r\n", $qPerPAge + 5));
6588

6689
while (1) {
67-
$memory = $backend->getMemoryStats(); // I know this will only work with redis currently as it is not defined in backend interface etc. needs to be refactored once we add another backend
90+
if (microtime(true) - $lastStatsTimer >= 2 || $keyPressed != "")
91+
{
92+
$output->write("\e[".($qPerPAge + 5)."A");
93+
94+
$qCurrentPage = min(max($qCurrentPage, 1), $qPageCount);
95+
$memory = $backend->getMemoryStats(); // I know this will only work with redis currently as it is not defined in backend interface etc. needs to be refactored once we add another backend
96+
97+
$sumInQueue = 0;
98+
foreach ($queues as $sumQ) {
99+
$sumInQueue += $sumQ->getNumberOfRequestSetsInQueue();
100+
}
101+
102+
if ($lastSumInQueue !== false) {
103+
$diffSumInQueue = $lastSumInQueue - $sumInQueue;
104+
$diffRps = round($diffSumInQueue / (microtime(true) - $lastStatsTimer), 2);
105+
$diffSumInQueue = $diffSumInQueue < 0 ? "<fg=red;options=bold>".abs($diffRps)."</>" : "<fg=green;options=bold>{$diffRps}</>";
106+
}
107+
108+
$numInQueue = 0;
109+
for ($idxPage = 0; $idxPage < $qPerPAge; $idxPage++) {
110+
$idx = ($qCurrentPage - 1) * $qPerPAge + $idxPage;
111+
if (isset($queues[$idx])) {
112+
$q = $queues[$idx]->getNumberOfRequestSetsInQueue();
113+
$numInQueue += (int)$q;
114+
$output->writeln(str_pad($idx, 10, " ", STR_PAD_LEFT)." | ".str_pad(number_format($q), 16, " ", STR_PAD_LEFT));
115+
} else {
116+
$output->writeln(str_pad("", 10)." | ".str_pad("", 16));
117+
}
118+
}
119+
120+
$output->writeln(str_repeat("-", 30));
121+
$output->writeln("<fg=black;bg=white;options=bold>".str_pad(" ".($qCount)." Q", 10)." | ".str_pad(number_format($sumInQueue)." R", 16)."</>");
122+
$output->writeln(str_repeat("-", 30));
123+
$output->writeln(sprintf(
124+
"Q [%s-%s] | <info>page %s/%s</> | <comment>press (0-9.,q) or arrow(L,R,U,D)</> | diff/sec %s \n".
125+
"%s used memory (%s peak). <info>%d</> workers active.".str_repeat(" ", 15),
126+
($idx - $qPerPAge + 1),
127+
$idx, $qCurrentPage, $qPageCount, $diffSumInQueue,
128+
$memory['used_memory_human'] ?? 'Unknown',
129+
$memory['used_memory_peak_human'] ?? 'Unknown',
130+
$lock->getNumberOfAcquiredLocks()
131+
));
132+
133+
if (!is_null($iterations)) {
134+
$iterationCount += 1;
135+
if ($iterationCount >= $iterations) {
136+
break;
137+
}
138+
}
68139

69-
$numInQueue = array();
70-
foreach ($queues as $queue) {
71-
$numInQueue[] = $queue->getNumberOfRequestSetsInQueue();
140+
$lastSumInQueue = $sumInQueue;
141+
$lastStatsTimer = microtime(true);
72142
}
73143

74-
$message = sprintf('%s (%s) request sets left in queue. %s used memory (%s peak). %d workers active. ',
75-
array_sum($numInQueue),
76-
implode('+', $numInQueue),
77-
$memory['used_memory_human'] ?? 'Unknown',
78-
$memory['used_memory_peak_human'] ?? 'unknown',
79-
$lock->getNumberOfAcquiredLocks());
80-
$output->write("\x0D");
81-
$output->write($message);
82-
if (!is_null($iterations)) {
83-
$iterationCount += 1;
84-
if ($iterationCount >= $iterations) {
85-
break;
144+
$keyStroke = stream_get_contents(STDIN, 3);
145+
$keyPressed = strlen($keyStroke) == 3 ? $keyStroke[2] : (strlen($keyStroke) > 0 ? $keyStroke[0] : "");
146+
if ($keyPressed != "" and in_array($keyPressed, array(".", ",", "0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "A", "B", "C", "D", "q"))) {
147+
switch ($keyPressed) {
148+
case "0": case "1": case "2": case "3": case "4":
149+
case "5": case "6": case "7": case "8": case "9":
150+
$keyPressed = $keyPressed != "0" ? $keyPressed : "10";
151+
$qCurrentPage = floor(($qCurrentPage - 0.1) / 10) * 10 + (int)$keyPressed; break;
152+
case "C":
153+
$qCurrentPage++;
154+
break;
155+
case "D":
156+
$qCurrentPage--;
157+
break;
158+
case "A":
159+
$qCurrentPage += 10;
160+
break;
161+
case "B":
162+
$qCurrentPage -= 10;
163+
break;
164+
case ",":
165+
$qCurrentPage = 1;
166+
break;
167+
case ".":
168+
$qCurrentPage = $qPageCount;
169+
break;
170+
case "q":
171+
$output->writeln('');
172+
die;
86173
}
87174
}
88-
sleep(2);
175+
176+
usleep(5000);
89177
}
90178

91179
return self::SUCCESS;
@@ -112,4 +200,22 @@ private function getIterationsFromArg()
112200
return $iterations;
113201
}
114202

203+
/**
204+
* Loads the `perpage` argument from the commands arguments.
205+
*
206+
* @return int|null
207+
*/
208+
private function getPerPageFromArg()
209+
{
210+
$perPage = $this->getInput()->getOption('perpage');
211+
if (!is_numeric($perPage)) {
212+
throw new \Exception('perpage needs to be numeric');
213+
} else {
214+
$perPage = (int)$perPage;
215+
if ($perPage <= 0) {
216+
throw new \Exception('perpage needs to be a non-zero positive number');
217+
}
218+
}
219+
return $perPage;
220+
}
115221
}

Commands/Process.php

Lines changed: 72 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ protected function configure()
2626
$this->setName('queuedtracking:process');
2727
$this->addRequiredValueOption('queue-id', null, 'If set, will only work on that specific queue. For example "0" or "1" (if there are multiple queues). Not recommended when only one worker is in use. If for example 4 workers are in use, you may want to use 0, 1, 2, or 3.');
2828
$this->addRequiredValueOption('force-num-requests-process-at-once', null, 'If defined, it overwrites the setting of how many requests will be picked out of the queue and processed at once. Must be a number which is >= 1. By default, the configured value from the settings will be used. This can be useful for example if you want to process every single request within the queue. If otherwise a batch size of say 100 is configured, then there may be otherwise 99 requests left in the queue. It can be also useful for testing purposes.');
29+
$this->addRequiredValueOption('cycle', 'c', 'The proccess will automatically loop for "n" cycle time(s), set "0" to infinite.', 1);
30+
$this->addRequiredValueOption('sleep', 's', 'Take a nap for "n" second(s) before recycle, minimum is 1 second.', 1);
31+
$this->addRequiredValueOption('delay', 'd', 'Delay before finished', 0);
2932
$this->setDescription('Processes all queued tracking requests in case there are enough requests in the queue and in case they are not already in process by another script. To keep track of the queue use the <comment>--verbose</comment> option or execute the <comment>queuedtracking:monitor</comment> command.');
3033
}
3134

@@ -76,29 +79,84 @@ protected function doExecute(): int
7679
throw new \Exception('Number of requests to process must be a number and at least 1');
7780
}
7881

79-
$output->writeln("<info>Starting to process request sets, this can take a while</info>");
80-
8182
register_shutdown_function(function () use ($queueManager) {
8283
$queueManager->unlock();
8384
});
8485

85-
$startTime = microtime(true);
86-
$processor = new Processor($queueManager);
87-
$processor->setNumberOfMaxBatchesToProcess(500);
88-
$tracker = $processor->process();
8986

90-
$neededTime = (microtime(true) - $startTime);
91-
$numRequestsTracked = $tracker->getCountOfLoggedRequests();
92-
$requestsPerSecond = $this->getNumberOfRequestsPerSecond($numRequestsTracked, $neededTime);
9387

94-
Piwik::postEvent('Tracker.end');
88+
$numberOfProcessCycle = $input->getOption('cycle');
89+
if (!is_numeric($numberOfProcessCycle)) {
90+
throw new \Exception('"cycle" needs to be numeric');
91+
}
92+
$numberOfProcessCycle = (int)$numberOfProcessCycle;
93+
$infiniteCycle = $numberOfProcessCycle == 0;
94+
95+
$delayedBeforeFinish = (int)$input->getOption('delay');
96+
97+
$napster = max(1, $input->getOption('sleep'));
98+
if (!is_numeric($napster)) {
99+
throw new \Exception('"nap" needs to be numeric');
100+
}
101+
$napster = (int)$napster;
102+
103+
$lastTimeGotMoreThanZeroTrackedReq = microtime(true);
104+
$originalNumberOfRequestsToProcessAtSameTime = $queueManager->getNumberOfRequestsToProcessAtSameTime();
105+
106+
while ($numberOfProcessCycle > 0 || $infiniteCycle) {
107+
$wipingOutQueue = false;
108+
if (microtime(true) - $lastTimeGotMoreThanZeroTrackedReq > 10) {
109+
$queueManager->setNumberOfRequestsToProcessAtSameTime(1);
110+
$wipingOutQueue = true;
111+
$lastTimeGotMoreThanZeroTrackedReq = microtime(true);
112+
}
113+
114+
if ($wipingOutQueue) {
115+
$output->writeln("<fg=red;bg=white;options=bold> TRYING TO WIPE OUT THE QUEUE </>");
116+
}
117+
$output->writeln("<info>Starting to process request sets, this can take a while</info>");
118+
119+
$startTime = microtime(true);
120+
$processor = new Processor($queueManager);
121+
$processor->setNumberOfMaxBatchesToProcess(500);
122+
$tracker = $processor->process();
123+
124+
$neededTime = (microtime(true) - $startTime);
125+
$numRequestsTracked = $tracker->getCountOfLoggedRequests();
126+
$requestsPerSecond = $this->getNumberOfRequestsPerSecond($numRequestsTracked, $neededTime);
127+
128+
$this->writeSuccessMessage(
129+
array(sprintf('This worker finished queue processing with %sreq/s (%s requests in %02.2f seconds)', $requestsPerSecond, $numRequestsTracked, $neededTime))
130+
);
131+
Piwik::postEvent('Tracker.end');
132+
133+
if ($numRequestsTracked > 0) {
134+
$lastTimeGotMoreThanZeroTrackedReq = microtime(true);
135+
}
136+
137+
if (!$infiniteCycle) {
138+
$numberOfProcessCycle--;
139+
}
140+
if ($numberOfProcessCycle > 0 || $infiniteCycle) {
141+
$cTogo = $infiniteCycle ? "infinite" : $numberOfProcessCycle;
142+
$output->writeln("===========================================================================");
143+
$output->writeln("<comment>Taking a nap for {$napster} second(s), before re-running the process. <info>({$cTogo})</info> cyle(s) to go.</comment>");
144+
$output->writeln("===========================================================================");
145+
sleep($napster);
146+
}
147+
148+
if ($wipingOutQueue) {
149+
$queueManager->setNumberOfRequestsToProcessAtSameTime($originalNumberOfRequestsToProcessAtSameTime);
150+
}
151+
}
95152

153+
// Piwik::postEvent('Tracker.end');
96154
$trackerEnvironment->destroy();
97155

98-
$this->writeSuccessMessage(
99-
array(sprintf('This worker finished queue processing with %sreq/s (%s requests in %02.2f seconds)', $requestsPerSecond, $numRequestsTracked, $neededTime))
100-
);
101-
156+
if ($delayedBeforeFinish > 0) {
157+
sleep($delayedBeforeFinish);
158+
}
159+
102160
return self::SUCCESS;
103161
}
104162

docs/faq.md

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ requests using the [Piwik console](http://developer.piwik.org/guides/piwik-on-th
3232
* Disable the setting "Process during tracking request" in the Piwik UI under "Settings => Plugin Settings"
3333
* Setup a cronjob that executes the command `./console queuedtracking:process` for instance every minute
3434
* That's it
35+
* Or, if you have __"non WINDOWS OS"__ you can use the [Supervisor](http://supervisord.org/) as a cron alternative.
3536

3637
The `queuedtracking:process` command will make sure to process all queued tracking requests whenever possible and the
3738
command will exit as soon as there are not enough requests queued anymore. That's why you should setup a cronjob to start
@@ -43,6 +44,28 @@ Example crontab entry that starts the processor every minute:
4344

4445
`* * * * * cd /piwik && ./console queuedtracking:process >/dev/null 2>&1`
4546

47+
Example Supervisor entry that will start 16 processors/workers with 10 loop cycle times and auto restart:
48+
49+
```ini
50+
[program:matomo]
51+
directory=/path/to/your/matomo
52+
command=/path/to/your/php /path/to/your/matomo/console queuedtracking:process --queue-id=%(process_num)s -c 10 -s 2 -d 5
53+
process_name=queuedtracking-%(process_num)s
54+
55+
#change the number according to how many worker(s) you have
56+
numprocs=16
57+
58+
numprocs_start=0
59+
stopsignal=TERM
60+
autostart=true
61+
autorestart=true
62+
stopwaitsecs=120
63+
#priority=1000
64+
stdout_logfile=/dev/null
65+
stdout_logfile_maxbytes=0
66+
redirect_stderr=true
67+
```
68+
4669
__Can I keep track of the state of the queue?__
4770

4871
Yes, you can. Just execute the command `./console queuedtracking:monitor`. This will show the current state of the queue. To exit this command you can for example press `CTRL + C` key at the same time.

0 commit comments

Comments
 (0)