From 6b21227ff1eb248c788750dc1b9afa3d47bbc5ec Mon Sep 17 00:00:00 2001 From: Jonathan Jefferies Date: Mon, 10 Jul 2017 22:27:57 +0100 Subject: [PATCH 1/3] Better Keep Alive support --- examples/config.php.example | 4 +++- src/Connector.php | 21 +++++++++++-------- src/packet/Connect.php | 35 ++++++++++++++++++++++++-------- src/packet/ConnectionOptions.php | 10 +++++++++ 4 files changed, 52 insertions(+), 18 deletions(-) diff --git a/examples/config.php.example b/examples/config.php.example index 8d249b2..c95b691 100644 --- a/examples/config.php.example +++ b/examples/config.php.example @@ -3,7 +3,9 @@ $config = array( 'server' => 'yourMqttBroker.tld', 'port' => 1883, - 'options' => null, + 'options' => new \oliverlorenz\reactphpmqtt\packet\ConnectionOptions(array( + 'keepAlive' => 120, + )), ); return $config; diff --git a/src/Connector.php b/src/Connector.php index 2400126..4d41a97 100644 --- a/src/Connector.php +++ b/src/Connector.php @@ -76,8 +76,8 @@ public function create( ->then(function (Stream $stream) { return $this->listenForPackets($stream); }) - ->then(function(Stream $stream) { - return $this->keepAlive($stream); + ->then(function(Stream $stream) use ($options) { + return $this->keepAlive($stream, $options->keepAlive); }); } @@ -119,12 +119,16 @@ private function listenForPackets(Stream $stream) return $deferred->promise(); } - private function keepAlive(Stream $stream) + private function keepAlive(Stream $stream, $keepAlive) { - $this->getLoop()->addPeriodicTimer(10, function(Timer $timer) use ($stream) { - $packet = new PingRequest($this->version); - $this->sendPacketToStream($stream, $packet); - }); + if($keepAlive > 0) { + $interval = (int) ($keepAlive / 2); + + $this->getLoop()->addPeriodicTimer($interval, function(Timer $timer) use ($stream) { + $packet = new PingRequest($this->version); + $this->sendPacketToStream($stream, $packet); + }); + } return new FulfilledPromise($stream); } @@ -142,7 +146,8 @@ public function connect(Stream $stream, ConnectionOptions $options) { $options->willTopic, $options->willMessage, $options->willQos, - $options->willRetain + $options->willRetain, + $options->keepAlive ); $message = $packet->get(); echo MessageHelper::getReadableByRawString($message); diff --git a/src/packet/Connect.php b/src/packet/Connect.php index 2591933..6c41af3 100644 --- a/src/packet/Connect.php +++ b/src/packet/Connect.php @@ -39,6 +39,9 @@ class Connect extends ControlPacket { /** @var null */ protected $willRetain; + /** @var int */ + private $keepAlive; + /** * @param Version $version * @param string|null $username @@ -49,6 +52,7 @@ class Connect extends ControlPacket { * @param string|null $willMessage * @param bool|null $willQos * @param null $willRetain + * @param int $keepAlive */ public function __construct( Version $version, @@ -59,7 +63,9 @@ public function __construct( $willTopic = null, $willMessage = null, $willQos = null, - $willRetain = null + $willRetain = null, +// $keepAlive = 0 + $keepAlive = 10 ) { parent::__construct($version); $this->clientId = $clientId; @@ -70,6 +76,7 @@ public function __construct( $this->willMessage = $willMessage; $this->willQos = boolval($willQos); $this->willRetain = $willRetain; + $this->keepAlive = $keepAlive; $this->buildPayload(); } @@ -101,14 +108,24 @@ public static function getControlPacketType() */ protected function getVariableHeader() { - return chr(ControlPacketType::MOST_SIGNIFICANT_BYTE) // byte 1 - . chr(strlen($this->version->getProtocolIdentifierString())) // byte 2 - . $this->version->getProtocolIdentifierString() // byte 3,4,5,6 - . chr($this->version->getProtocolVersion()) // byte 7 - . chr($this->getConnectFlags()) // byte 8 - . chr(0) // byte 9 - . chr(10) // byte 10 - ; + return chr(ControlPacketType::MOST_SIGNIFICANT_BYTE) // byte 1 + . chr(strlen($this->version->getProtocolIdentifierString())) // byte 2 + . $this->version->getProtocolIdentifierString() // byte 3,4,5,6 + . chr($this->version->getProtocolVersion()) // byte 7 + . chr($this->getConnectFlags()) // byte 8 + . $this->getKeepAlive(); // byte 9,10 + } + + /** + * @return string + */ + private function getKeepAlive() + { + $msb = $this->keepAlive >> 8; + $lsb = $this->keepAlive % 256; + + return chr($msb) + . chr($lsb); } /** diff --git a/src/packet/ConnectionOptions.php b/src/packet/ConnectionOptions.php index 1cb1abd..7b3aa3b 100644 --- a/src/packet/ConnectionOptions.php +++ b/src/packet/ConnectionOptions.php @@ -100,6 +100,16 @@ class ConnectionOptions */ public $willRetain = false; + /** + * The Keep Alive is a time interval measured in seconds. + * + * @see http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Keep_Alive + * + * @var int + */ +// public $keepAlive = 0; + public $keepAlive = 10; + /** * ConnectionOptions constructor. * From cdcb0df601067619e378a12967cbf3b907327aad Mon Sep 17 00:00:00 2001 From: Jonathan Jefferies Date: Tue, 11 Jul 2017 21:11:19 +0100 Subject: [PATCH 2/3] Use zero as default Keep Alive --- src/Connector.php | 2 +- src/packet/Connect.php | 3 +-- src/packet/ConnectionOptions.php | 3 +-- 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/src/Connector.php b/src/Connector.php index 4d41a97..fc8bd7f 100644 --- a/src/Connector.php +++ b/src/Connector.php @@ -122,7 +122,7 @@ private function listenForPackets(Stream $stream) private function keepAlive(Stream $stream, $keepAlive) { if($keepAlive > 0) { - $interval = (int) ($keepAlive / 2); + $interval = $keepAlive / 2; $this->getLoop()->addPeriodicTimer($interval, function(Timer $timer) use ($stream) { $packet = new PingRequest($this->version); diff --git a/src/packet/Connect.php b/src/packet/Connect.php index 6c41af3..0b52660 100644 --- a/src/packet/Connect.php +++ b/src/packet/Connect.php @@ -64,8 +64,7 @@ public function __construct( $willMessage = null, $willQos = null, $willRetain = null, -// $keepAlive = 0 - $keepAlive = 10 + $keepAlive = 0 ) { parent::__construct($version); $this->clientId = $clientId; diff --git a/src/packet/ConnectionOptions.php b/src/packet/ConnectionOptions.php index 7b3aa3b..67c4eb3 100644 --- a/src/packet/ConnectionOptions.php +++ b/src/packet/ConnectionOptions.php @@ -107,8 +107,7 @@ class ConnectionOptions * * @var int */ -// public $keepAlive = 0; - public $keepAlive = 10; + public $keepAlive = 0; /** * ConnectionOptions constructor. From 848303a447956850673eaa1dc51fde93d5928e17 Mon Sep 17 00:00:00 2001 From: Jonathan Jefferies Date: Tue, 11 Jul 2017 22:34:23 +0100 Subject: [PATCH 3/3] Expect zero as default Keep Alive --- tests/unit/packet/ConnectTest.php | 37 ++++++++++++++++++++++++------- 1 file changed, 29 insertions(+), 8 deletions(-) diff --git a/tests/unit/packet/ConnectTest.php b/tests/unit/packet/ConnectTest.php index b81e8b0..e5a692d 100644 --- a/tests/unit/packet/ConnectTest.php +++ b/tests/unit/packet/ConnectTest.php @@ -41,7 +41,7 @@ public function testGetHeaderTestVariableHeaderWithoutConnectFlags() chr(4) . // byte 7 chr(0) . // byte 8 chr(0) . // byte 9 - chr(10) // byte 10 + chr(0) // byte 10 ), MessageHelper::getReadableByRawString(substr($packet->get(), 2, 10)) ); @@ -59,7 +59,7 @@ public function testGetHeaderTestVariableHeaderWithConnectFlagsCleanSession() chr(4) . // byte 7 chr(2) . // byte 8 chr(0) . // byte 9 - chr(10) // byte 10 + chr(0) // byte 10 ), MessageHelper::getReadableByRawString(substr($packet->get(), 2, 10)) ); @@ -79,7 +79,7 @@ public function testGetHeaderTestVariableHeaderWithConnectFlagWillFlag() chr(4) . // byte 7 chr(4) . // byte 8 chr(0) . // byte 9 - chr(10) // byte 10 + chr(0) // byte 10 ), MessageHelper::getReadableByRawString(substr($packet->get(), 2, 10)) ); @@ -99,7 +99,7 @@ public function testGetHeaderTestVariableHeaderWithConnectFlagWillRetain() chr(4) . // byte 7 chr(32) . // byte 8 chr(0) . // byte 9 - chr(10) // byte 10 + chr(0) // byte 10 ), MessageHelper::getReadableByRawString(substr($packet->get(), 2, 10)) ); @@ -119,7 +119,7 @@ public function testGetHeaderTestVariableHeaderWithConnectFlagUsername() chr(4) . // byte 7 chr(128) . // byte 8 chr(0) . // byte 9 - chr(10) // byte 10 + chr(0) // byte 10 ), MessageHelper::getReadableByRawString(substr($packet->get(), 2, 10)) ); @@ -139,7 +139,7 @@ public function testGetHeaderTestVariableHeaderWithConnectFlagPassword() chr(4) . // byte 7 chr(64) . // byte 8 chr(0) . // byte 9 - chr(10) // byte 10 + chr(0) // byte 10 ), MessageHelper::getReadableByRawString(substr($packet->get(), 2, 10)) ); @@ -159,7 +159,7 @@ public function testGetHeaderTestVariableHeaderWithConnectFlagWillWillQos() chr(4) . // byte 7 chr(8) . // byte 8 chr(0) . // byte 9 - chr(10) // byte 10 + chr(0) // byte 10 ), MessageHelper::getReadableByRawString(substr($packet->get(), 2, 10)) ); @@ -179,7 +179,28 @@ public function testGetHeaderTestVariableHeaderWithConnectFlagUserNamePasswordCl chr(4) . // byte 7 chr(194) . // byte 8 chr(0) . // byte 9 - chr(10) // byte 10 + chr(0) // byte 10 + ), + MessageHelper::getReadableByRawString(substr($packet->get(), 2, 10)) + ); + } + + public function testBytesNineAndTenOfVariableHeaderAreKeepAlive() + { + $version = new \oliverlorenz\reactphpmqtt\protocol\Version4(); + $packet = new \oliverlorenz\reactphpmqtt\packet\Connect( + $version, null, null, null, true, null, null, null, null, 999 + ); + + $this->assertEquals( + MessageHelper::getReadableByRawString( + chr(0) . // byte 1 + chr(4) . // byte 2 + 'MQTT' . // byte 3,4,5,6 + chr(4) . // byte 7 + chr(2) . // byte 8 + chr(3) . // byte 9 + chr(231) // byte 10 ), MessageHelper::getReadableByRawString(substr($packet->get(), 2, 10)) );