Skip to content

Commit

Permalink
Merge pull request #22 from jjok/keep-alive
Browse files Browse the repository at this point in the history
Keep Alive support
  • Loading branch information
oliverlorenz authored Jul 15, 2017
2 parents 68c22a1 + 848303a commit c511e59
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 26 deletions.
4 changes: 3 additions & 1 deletion examples/config.php.example
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
$config = array(
'server' => 'yourMqttBroker.tld',
'port' => 1883,
'options' => null,
'options' => new \oliverlorenz\reactphpmqtt\packet\ConnectionOptions(array(
'keepAlive' => 120,
)),
);

return $config;
21 changes: 13 additions & 8 deletions src/Connector.php
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ public function create(
->then(function (Stream $stream) {
return $this->listenForPackets($stream);
})
->then(function(Stream $stream) {
return $this->keepAlive($stream);
->then(function(Stream $stream) use ($options) {
return $this->keepAlive($stream, $options->keepAlive);
});
}

Expand Down Expand Up @@ -119,12 +119,16 @@ private function listenForPackets(Stream $stream)
return $deferred->promise();
}

private function keepAlive(Stream $stream)
private function keepAlive(Stream $stream, $keepAlive)
{
$this->getLoop()->addPeriodicTimer(10, function(Timer $timer) use ($stream) {
$packet = new PingRequest($this->version);
$this->sendPacketToStream($stream, $packet);
});
if($keepAlive > 0) {
$interval = $keepAlive / 2;

$this->getLoop()->addPeriodicTimer($interval, function(Timer $timer) use ($stream) {
$packet = new PingRequest($this->version);
$this->sendPacketToStream($stream, $packet);
});
}

return new FulfilledPromise($stream);
}
Expand All @@ -142,7 +146,8 @@ public function connect(Stream $stream, ConnectionOptions $options) {
$options->willTopic,
$options->willMessage,
$options->willQos,
$options->willRetain
$options->willRetain,
$options->keepAlive
);
$message = $packet->get();
echo MessageHelper::getReadableByRawString($message);
Expand Down
34 changes: 25 additions & 9 deletions src/packet/Connect.php
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ class Connect extends ControlPacket {
/** @var null */
protected $willRetain;

/** @var int */
private $keepAlive;

/**
* @param Version $version
* @param string|null $username
Expand All @@ -49,6 +52,7 @@ class Connect extends ControlPacket {
* @param string|null $willMessage
* @param bool|null $willQos
* @param null $willRetain
* @param int $keepAlive
*/
public function __construct(
Version $version,
Expand All @@ -59,7 +63,8 @@ public function __construct(
$willTopic = null,
$willMessage = null,
$willQos = null,
$willRetain = null
$willRetain = null,
$keepAlive = 0
) {
parent::__construct($version);
$this->clientId = $clientId;
Expand All @@ -70,6 +75,7 @@ public function __construct(
$this->willMessage = $willMessage;
$this->willQos = boolval($willQos);
$this->willRetain = $willRetain;
$this->keepAlive = $keepAlive;
$this->buildPayload();
}

Expand Down Expand Up @@ -101,14 +107,24 @@ public static function getControlPacketType()
*/
protected function getVariableHeader()
{
return chr(ControlPacketType::MOST_SIGNIFICANT_BYTE) // byte 1
. chr(strlen($this->version->getProtocolIdentifierString())) // byte 2
. $this->version->getProtocolIdentifierString() // byte 3,4,5,6
. chr($this->version->getProtocolVersion()) // byte 7
. chr($this->getConnectFlags()) // byte 8
. chr(0) // byte 9
. chr(10) // byte 10
;
return chr(ControlPacketType::MOST_SIGNIFICANT_BYTE) // byte 1
. chr(strlen($this->version->getProtocolIdentifierString())) // byte 2
. $this->version->getProtocolIdentifierString() // byte 3,4,5,6
. chr($this->version->getProtocolVersion()) // byte 7
. chr($this->getConnectFlags()) // byte 8
. $this->getKeepAlive(); // byte 9,10
}

/**
* @return string
*/
private function getKeepAlive()
{
$msb = $this->keepAlive >> 8;
$lsb = $this->keepAlive % 256;

return chr($msb)
. chr($lsb);
}

/**
Expand Down
9 changes: 9 additions & 0 deletions src/packet/ConnectionOptions.php
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,15 @@ class ConnectionOptions
*/
public $willRetain = false;

/**
* The Keep Alive is a time interval measured in seconds.
*
* @see http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Keep_Alive
*
* @var int
*/
public $keepAlive = 0;

/**
* ConnectionOptions constructor.
*
Expand Down
37 changes: 29 additions & 8 deletions tests/unit/packet/ConnectTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public function testGetHeaderTestVariableHeaderWithoutConnectFlags()
chr(4) . // byte 7
chr(0) . // byte 8
chr(0) . // byte 9
chr(10) // byte 10
chr(0) // byte 10
),
MessageHelper::getReadableByRawString(substr($packet->get(), 2, 10))
);
Expand All @@ -59,7 +59,7 @@ public function testGetHeaderTestVariableHeaderWithConnectFlagsCleanSession()
chr(4) . // byte 7
chr(2) . // byte 8
chr(0) . // byte 9
chr(10) // byte 10
chr(0) // byte 10
),
MessageHelper::getReadableByRawString(substr($packet->get(), 2, 10))
);
Expand All @@ -80,7 +80,7 @@ public function testGetHeaderTestVariableHeaderWithConnectFlagWillFlag()
chr(4) . // byte 7
chr(4) . // byte 8
chr(0) . // byte 9
chr(10) // byte 10
chr(0) // byte 10
),
MessageHelper::getReadableByRawString(substr($packet->get(), 2, 10))
);
Expand All @@ -101,7 +101,7 @@ public function testGetHeaderTestVariableHeaderWithConnectFlagWillRetain()
chr(4) . // byte 7
chr(32) . // byte 8
chr(0) . // byte 9
chr(10) // byte 10
chr(0) // byte 10
),
MessageHelper::getReadableByRawString(substr($packet->get(), 2, 10))
);
Expand All @@ -122,7 +122,7 @@ public function testGetHeaderTestVariableHeaderWithConnectFlagUsername()
chr(4) . // byte 7
chr(128) . // byte 8
chr(0) . // byte 9
chr(10) // byte 10
chr(0) // byte 10
),
MessageHelper::getReadableByRawString(substr($packet->get(), 2, 10))
);
Expand All @@ -143,7 +143,7 @@ public function testGetHeaderTestVariableHeaderWithConnectFlagPassword()
chr(4) . // byte 7
chr(64) . // byte 8
chr(0) . // byte 9
chr(10) // byte 10
chr(0) // byte 10
),
MessageHelper::getReadableByRawString(substr($packet->get(), 2, 10))
);
Expand All @@ -164,7 +164,7 @@ public function testGetHeaderTestVariableHeaderWithConnectFlagWillWillQos()
chr(4) . // byte 7
chr(8) . // byte 8
chr(0) . // byte 9
chr(10) // byte 10
chr(0) // byte 10
),
MessageHelper::getReadableByRawString(substr($packet->get(), 2, 10))
);
Expand All @@ -185,7 +185,28 @@ public function testGetHeaderTestVariableHeaderWithConnectFlagUserNamePasswordCl
chr(4) . // byte 7
chr(194) . // byte 8
chr(0) . // byte 9
chr(10) // byte 10
chr(0) // byte 10
),
MessageHelper::getReadableByRawString(substr($packet->get(), 2, 10))
);
}

public function testBytesNineAndTenOfVariableHeaderAreKeepAlive()
{
$version = new \oliverlorenz\reactphpmqtt\protocol\Version4();
$packet = new \oliverlorenz\reactphpmqtt\packet\Connect(
$version, null, null, null, true, null, null, null, null, 999
);

$this->assertEquals(
MessageHelper::getReadableByRawString(
chr(0) . // byte 1
chr(4) . // byte 2
'MQTT' . // byte 3,4,5,6
chr(4) . // byte 7
chr(2) . // byte 8
chr(3) . // byte 9
chr(231) // byte 10
),
MessageHelper::getReadableByRawString(substr($packet->get(), 2, 10))
);
Expand Down

0 comments on commit c511e59

Please sign in to comment.