Skip to content

Commit

Permalink
Merge pull request #26 from emoncms/low_write
Browse files Browse the repository at this point in the history
Low write
  • Loading branch information
TrystanLea authored Jul 13, 2019
2 parents 1854bef + 87dd9a6 commit 3f3e70c
Show file tree
Hide file tree
Showing 9 changed files with 1,378 additions and 1,278 deletions.
98 changes: 52 additions & 46 deletions demandshaper-module/MQTTRequest.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,56 +25,62 @@ public function __construct($mqtt_server)

public function request($topic,$payload,$result_topic)
{
$this->request = new stdClass();
$this->request->topic = $topic;
$this->request->payload = $payload;

$this->result = new stdClass();
$this->result->topic = $result_topic;
$this->result->payload = false;

$this->client = new Mosquitto\Client();

$this->client->onConnect(function($r, $message){
$this->connect($r, $message);
});
$this->client->onDisconnect(function(){
$this->disconnect();
});
$this->client->onSubscribe(function(){
$this->subscribe();
});
$this->client->onMessage(function($message){
$this->message($message);
});

$this->state = 0; // 0: startfetch
// 1: connected
// 2: subscribed
// 3: complete
try {
$this->request = new stdClass();
$this->request->topic = $topic;
$this->request->payload = $payload;

$this->result = new stdClass();
$this->result->topic = $result_topic;
$this->result->payload = false;

$this->client = new Mosquitto\Client();

$this->client->onConnect(function($r, $message){
$this->connect($r, $message);
});
$this->client->onDisconnect(function(){
$this->disconnect();
});
$this->client->onSubscribe(function(){
$this->subscribe();
});
$this->client->onMessage(function($message){
$this->message($message);
});

$this->state = 0; // 0: startfetch
// 1: connected
// 2: subscribed
// 3: complete

$this->client->setCredentials($this->username,$this->password);
$this->client->connect($this->host, $this->port, 5);

$start = time();
while((time()-$start)<10.0) {
try {
$this->client->loop(10);
} catch (Exception $e) {
if ($this->state) return "error: ".$e;
$this->client->setCredentials($this->username,$this->password);
$this->client->connect($this->host, $this->port, 5);

$start = time();
while((time()-$start)<10.0) {
try {
$this->client->loop(10);
} catch (Exception $e) {
if ($this->state) return "error: ".$e;
}

if ((time()-$start)>=3.0) {
$this->client->disconnect();
}

if ($this->state==3) break;

usleep(50000);
}

if ((time()-$start)>=3.0) {
$this->client->disconnect();
if ($this->result->payload) {
return $this->result->payload;
} else {
return "API Timeout";
}

if ($this->state==3) break;
}

if ($this->result->payload) {
return $this->result->payload;
} else {
return "API Timeout";
} catch (Exception $e) {
return "Mosquitto error";
}
}

Expand Down
184 changes: 126 additions & 58 deletions demandshaper-module/demandshaper_controller.php

Large diffs are not rendered by default.

80 changes: 59 additions & 21 deletions demandshaper-module/demandshaper_model.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ class DemandShaper
{
private $mysqli;
private $redis;
private $log;

public function __construct($mysqli,$redis)
{
$this->log = new EmonLogger(__FILE__);
$this->mysqli = $mysqli;
$this->redis = $redis;
}
Expand All @@ -45,36 +47,58 @@ public function set($userid,$schedules)
{
// Basic validation
$userid = (int) $userid;
$schedules = json_encode($schedules);

$result = $this->mysqli->query("SELECT `userid` FROM demandshaper WHERE `userid`='$userid'");
if ($result->num_rows) {
$stmt = $this->mysqli->prepare("UPDATE demandshaper SET `schedules`=? WHERE `userid`=?");
$stmt->bind_param("si",$schedules,$userid);
if (!$stmt->execute()) {
return array('success'=>false, 'message'=>"Error saving demandshaper settings");
}

$this->redis->set("schedules",$schedules);
return array('success'=>true);

} else {
$stmt = $this->mysqli->prepare("INSERT INTO demandshaper (`userid`,`schedules`) VALUES (?,?)");
$stmt->bind_param("is", $userid,$schedules);
if (!$stmt->execute()) {
return array('success'=>false, 'message'=>"Error saving demandshaper settings");
if ($schedules_old = $this->redis->get("demandshaper:schedules")) {
$schedules_old = json_decode($schedules_old);
}
$this->redis->set("demandshaper:schedules",json_encode($schedules));

// remove runtime settings
$schedules_to_disk = json_decode(json_encode($schedules));
foreach ($schedules_to_disk as $device=>$schedule) {
unset($schedules_to_disk->$device->runtime);
}

// remove runtime settings
$last_schedules_to_disk = $schedules_old;
foreach ($last_schedules_to_disk as $device=>$schedule) {
unset($last_schedules_to_disk->$device->runtime);
}

if (json_encode($schedules_to_disk)!=json_encode($last_schedules_to_disk)) {

$schedules_to_disk = json_encode($schedules_to_disk);

$result = $this->mysqli->query("SELECT `userid` FROM demandshaper WHERE `userid`='$userid'");
if ($result->num_rows) {
$stmt = $this->mysqli->prepare("UPDATE demandshaper SET `schedules`=? WHERE `userid`=?");
$stmt->bind_param("si",$schedules_to_disk,$userid);
if (!$stmt->execute()) {
return array('success'=>false, 'message'=>"Error saving demandshaper settings");
}
$this->log->error("Saved to disk");
return array('success'=>true, 'message'=>"Saved to disk");

} else {
$stmt = $this->mysqli->prepare("INSERT INTO demandshaper (`userid`,`schedules`) VALUES (?,?)");
$stmt->bind_param("is", $userid,$schedules_to_disk);
if (!$stmt->execute()) {
return array('success'=>false, 'message'=>"Error saving demandshaper settings");
}
$this->log->error("Saved to disk");
return array('success'=>true, 'message'=>"Saved to disk");
}
$this->redis->set("schedules",$schedules);
return array('success'=>true);
}
$this->log->info("Saved to redis only");
return array('success'=>true, 'message'=>"Saved to redis only");
}

public function get($userid)
{
$userid = (int) $userid;

// Attempt first to load from cache
$schedulesjson = $this->redis->get("schedules");
$schedulesjson = $this->redis->get("demandshaper:schedules");

if ($schedulesjson) {
$schedules = json_decode($schedulesjson);
Expand All @@ -83,14 +107,28 @@ public function get($userid)
$result = $this->mysqli->query("SELECT schedules FROM demandshaper WHERE `userid`='$userid'");
if ($row = $result->fetch_object()) {
$schedules = json_decode($row->schedules);
$this->redis->set("schedules",json_encode($schedules));
foreach ($schedules as $device=>$schedule) {
$schedules->$device->runtime = new stdClass();
$schedules->$device->runtime->timeleft = 0;
$schedules->$device->runtime->periods = array();
}
$this->redis->set("demandshaper:schedules",json_encode($schedules));
} else {
$schedules = new stdClass();
}
}

if (!$schedules || !is_object($schedules)) $schedules = new stdClass();

foreach ($schedules as $device=>$schedule) {
if (!isset($schedules->$device->runtime)) {
$schedules->$device->runtime = new stdClass();
$schedules->$device->runtime->timeleft = 0;
$schedules->$device->runtime->periods = array();
}
}


return $schedules;
}
}
Loading

0 comments on commit 3f3e70c

Please sign in to comment.