Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

3.3.9 - Statistic migration #2145

Merged
merged 3 commits into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions lib/Storage/PdoStorageService.php
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,11 @@ public function exists($sql, $params, $connection = 'default', $reconnect = fals
$this->close($connection);
}

return $exists;
if ($exists) {
return true;
} else {
return false;
}
} catch (\PDOException $PDOException) {
// Throw if we're not expected to reconnect.
if (!$reconnect) {
Expand Down Expand Up @@ -329,7 +333,6 @@ public function select($sql, $params, $connection = 'default', $reconnect = fals
$this->close($connection);
}
return $records;

} catch (\PDOException $PDOException) {
// Throw if we're not expected to reconnect.
if (!$reconnect) {
Expand Down
78 changes: 31 additions & 47 deletions lib/XTR/StatsMigrationTask.php
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
<?php
/**
* Copyright (C) 2020 Xibo Signage Ltd
/*
* Copyright (C) 2023 Xibo Signage Ltd
*
* Xibo - Digital Signage - http://www.xibo.org.uk
* Xibo - Digital Signage - https://xibosignage.com
*
* This file is part of Xibo.
*
Expand All @@ -21,6 +21,7 @@
*/

namespace Xibo\XTR;

use Carbon\Carbon;
use Xibo\Entity\Task;
use Xibo\Factory\DisplayFactory;
Expand Down Expand Up @@ -94,7 +95,6 @@ public function migrateStats()
}

if ($options['killSwitch'] == 0) {

// Stat Archive Task
$this->archiveTask = $this->taskFactory->getByClass('\Xibo\XTR\\StatsArchiveTask');

Expand All @@ -107,21 +107,19 @@ public function migrateStats()
$timeSeriesStore = $this->timeSeriesStore->getEngine();

if ($timeSeriesStore == 'mongodb') {

// If no records in both the stat and stat_archive then disable the task
$statSql = $this->store->getConnection()->prepare('SELECT statId FROM stat LIMIT 1');
$statSql->execute();

$statArchiveSqlCount = 0;
if ( $this->archiveExist === true) {
if ($this->archiveExist) {
/** @noinspection SqlResolve */
$statArchiveSql = $this->store->getConnection()->prepare('SELECT statId FROM stat_archive LIMIT 1');
$statArchiveSql->execute();
$statArchiveSqlCount = $statArchiveSql->rowCount();
}

if(($statSql->rowCount() == 0) && ($statArchiveSqlCount == 0)) {

if (($statSql->rowCount() == 0) && ($statArchiveSqlCount == 0)) {
$this->runMessage = '## Stat migration to Mongo' . PHP_EOL ;
$this->appendRunMessage('- Both stat_archive and stat is empty. '. PHP_EOL);

Expand All @@ -138,10 +136,8 @@ public function migrateStats()
$this->log->debug('Enabling Stats Archive Task.');
}
} else {

// if any of the two tables contain any records
$this->quitMigrationTaskOrDisableStatArchiveTask();

}

$this->moveStatsToMongoDb($options);
Expand All @@ -151,15 +147,13 @@ public function migrateStats()
// and there isn't a stat_archive table, then it should disable itself and not run again
// (work is considered to be done at that point).
else {

if ($this->archiveExist == true) {
if ($this->archiveExist) {
$this->runMessage = '## Moving from stat_archive to stat (MySQL)' . PHP_EOL;

$this->quitMigrationTaskOrDisableStatArchiveTask();

// Start migration
$this->moveStatsFromStatArchiveToStatMysql($options);

} else {
// Disable the task

Expand Down Expand Up @@ -214,7 +208,6 @@ public function moveStatsFromStatArchiveToStatMysql($options)

// End of records
if ($this->checkEndOfRecords($recordCount, $fileName) === true) {

$this->appendRunMessage(PHP_EOL. '# End of records.' . PHP_EOL. '- Dropping stat_archive.');
$this->log->debug('End of records in stat_archive (migration to MYSQL). Dropping table.');

Expand Down Expand Up @@ -250,7 +243,6 @@ public function moveStatsFromStatArchiveToStatMysql($options)

$statIgnoredCount = 0;
foreach ($stats->fetchAll() as $stat) {

$watermark = $stat['statId'];

$columns = 'type, statDate, scheduleId, displayId, campaignId, layoutId, mediaId, widgetId, `start`, `end`, tag, duration, `count`';
Expand All @@ -260,7 +252,7 @@ public function moveStatsFromStatArchiveToStatMysql($options)
if (($stat['type'] != 'event') && ($stat['layoutId'] != null)) {
try {
// Search the campaignId in the temp array first to reduce query in layouthistory
if (array_key_exists($stat['layoutId'], $temp) ) {
if (array_key_exists($stat['layoutId'], $temp)) {
$campaignId = $temp[$stat['layoutId']];
} else {
$campaignId = $this->layoutFactory->getCampaignIdFromLayoutHistory($stat['layoutId']);
Expand Down Expand Up @@ -294,7 +286,6 @@ public function moveStatsFromStatArchiveToStatMysql($options)
// Do the insert
$this->store->insert('INSERT INTO `stat` (' . $columns . ') VALUES (' . $values . ')', $params);
$this->store->commitIfNecessary();

}

if ($statIgnoredCount > 0) {
Expand All @@ -319,12 +310,13 @@ public function moveStatsToMongoDb($options)

// Migration from stat_archive table to Mongo
// After migration delete only stat_archive
if ($this->archiveExist == true) {
if ($this->archiveExist) {
$this->migrationStatArchiveToMongo($options);
}
}

function migrationStatToMongo($options) {
private function migrationStatToMongo($options)
{

$this->appendRunMessage('## Moving from stat to Mongo');

Expand All @@ -351,7 +343,6 @@ function migrationStatToMongo($options) {

// End of records
if ($this->checkEndOfRecords($recordCount, $fileName) === true) {

$this->appendRunMessage(PHP_EOL. '# End of records.' . PHP_EOL. '- Truncating and Optimising stat.');
$this->log->debug('End of records in stat table. Truncate and Optimise.');

Expand All @@ -376,7 +367,6 @@ function migrationStatToMongo($options) {

$statCount = 0;
foreach ($stats->fetchAll() as $stat) {

// We get the watermark now because if we skip the records our watermark will never reach 0
$watermark = $stat['statId'];

Expand Down Expand Up @@ -426,7 +416,8 @@ function migrationStatToMongo($options) {
}
}

function migrationStatArchiveToMongo($options) {
private function migrationStatArchiveToMongo($options)
{

$this->appendRunMessage(PHP_EOL. '## Moving from stat_archive to Mongo');
$fileName = $this->config->getSetting('LIBRARY_LOCATION') . '.watermark_stat_archive_mongo.txt';
Expand Down Expand Up @@ -457,7 +448,6 @@ function migrationStatArchiveToMongo($options) {

// End of records
if ($this->checkEndOfRecords($recordCount, $fileName) === true) {

$this->appendRunMessage(PHP_EOL. '# End of records.' . PHP_EOL. '- Dropping stat_archive.');
$this->log->debug('End of records in stat_archive (migration to Mongo). Dropping table.');

Expand All @@ -480,7 +470,6 @@ function migrationStatArchiveToMongo($options) {
$statIgnoredCount = 0;
$statCount = 0;
foreach ($stats->fetchAll() as $stat) {

// We get the watermark now because if we skip the records our watermark will never reach 0
$watermark = $stat['statId'];

Expand All @@ -499,7 +488,7 @@ function migrationStatArchiveToMongo($options) {
if (($stat['type'] != 'event') && ($stat['layoutId'] != null)) {
try {
// Search the campaignId in the temp array first to reduce query in layouthistory
if (array_key_exists($stat['layoutId'], $temp) ) {
if (array_key_exists($stat['layoutId'], $temp)) {
$campaignId = $temp[$stat['layoutId']];
} else {
$campaignId = $this->layoutFactory->getCampaignIdFromLayoutHistory($stat['layoutId']);
Expand Down Expand Up @@ -534,7 +523,6 @@ function migrationStatArchiveToMongo($options) {
// Add stats in store $this->stats
$this->timeSeriesStore->addStat($entry);
$statCount++;

}

if ($statIgnoredCount > 0) {
Expand All @@ -551,8 +539,7 @@ function migrationStatArchiveToMongo($options) {

// Give Mongo time to recover
if ($watermark > 0) {

if($statCount > 0 ) {
if ($statCount > 0) {
$this->appendRunMessage('- '. $count. ' rows processed. ' . $statCount. ' rows migrated');
$this->log->debug('Mongo stats migration from stat_archive. '.$count.' rows effected, sleeping.');
}
Expand All @@ -562,17 +549,15 @@ function migrationStatArchiveToMongo($options) {
}

// Get low watermark from file
function getWatermarkFromFile($fileName, $tableName) {
private function getWatermarkFromFile($fileName, $tableName)
{

if (file_exists($fileName)) {

$file = fopen($fileName, 'r');
$line = fgets($file);
fclose($file);
$watermark = (int) $line;

} else {

// Save mysql low watermark in file if .watermark.txt file is not found
/** @noinspection SqlResolve */
$statId = $this->store->select('SELECT MAX(statId) as statId FROM '.$tableName, []);
Expand All @@ -591,9 +576,10 @@ function getWatermarkFromFile($fileName, $tableName) {
}

// Check if end of records
function checkEndOfRecords($recordCount, $fileName) {
private function checkEndOfRecords($recordCount, $fileName)
{

if($recordCount == 0) {
if ($recordCount == 0) {
// No records in stat, save watermark in file
$watermark = -1;

Expand All @@ -608,10 +594,10 @@ function checkEndOfRecords($recordCount, $fileName) {
}

// Check loop limits
function checkLoopLimits($numberOfLoops, $optionsNumberOfLoops, $fileName, $watermark) {

if($numberOfLoops == $optionsNumberOfLoops) {
private function checkLoopLimits($numberOfLoops, $optionsNumberOfLoops, $fileName, $watermark)
{

if ($numberOfLoops == $optionsNumberOfLoops) {
// Save watermark in file
$watermark = $watermark - 1;
$this->log->debug(' Loop reached limit. Watermark is now '.$watermark);
Expand All @@ -627,7 +613,8 @@ function checkLoopLimits($numberOfLoops, $optionsNumberOfLoops, $fileName, $wate
}

// Disable the task
function disableTask() {
private function disableTask()
{

$this->appendRunMessage('# Disabling task.');
$this->log->debug('Disabling task.');
Expand All @@ -641,7 +628,8 @@ function disableTask() {
}

// Disable the task
function quitMigrationTaskOrDisableStatArchiveTask() {
private function quitMigrationTaskOrDisableStatArchiveTask()
{

// Quit the migration task if stat archive task is running
if ($this->archiveTask->status == Task::$STATUS_RUNNING) {
Expand All @@ -664,32 +652,28 @@ function quitMigrationTaskOrDisableStatArchiveTask() {
}

// Cahce/Get display
function getDisplay($displayId) {
private function getDisplay($displayId)
{

// Get display if in memory
if (array_key_exists($displayId, $this->displays)) {
$display = $this->displays[$displayId];
} else if (array_key_exists($displayId, $this->displaysNotFound)) {
} elseif (array_key_exists($displayId, $this->displaysNotFound)) {
// Display not found
return false;

} else {

try {
$display = $this->displayFactory->getById($displayId);

// Cache display
$this->displays[$displayId] = $display;

} catch (NotFoundException $error) {

// Cache display not found
$this->displaysNotFound[$displayId] = $displayId;
return false;
}
}

return $display;

}
}
}