Skip to content

Commit

Permalink
Minor executor changes and test updates
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed Jun 21, 2017
1 parent f9423d0 commit 24fe594
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 22 deletions.
23 changes: 12 additions & 11 deletions lib/PgSqlExecutor.php
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,16 @@ public function __construct($handle, $socket) {

while ($result = \pg_get_notify($handle, \PGSQL_ASSOC)) {
$channel = $result["message"];
if (isset($listeners[$channel])) {
$notification = new Notification;
$notification->channel = $channel;
$notification->pid = $result["pid"];
$notification->payload = $result["payload"];
$listeners[$channel]->emit($notification);

if (!isset($listeners[$channel])) {
return;
}

$notification = new Notification;
$notification->channel = $channel;
$notification->pid = $result["pid"];
$notification->payload = $result["payload"];
$listeners[$channel]->emit($notification);
}

if ($deferred === null) {
Expand All @@ -76,7 +79,7 @@ public function __construct($handle, $socket) {

$this->await = Loop::onWritable($socket, static function ($watcher) use (&$deferred, $handle) {
$flush = \pg_flush($handle);
if (0 === $flush) {
if ($flush === 0) {
return; // Not finished sending data, listen again.
}

Expand Down Expand Up @@ -231,7 +234,7 @@ public function listen(string $channel): Promise {
throw new QueryError(\sprintf("Already listening on channel '%s'", $channel));
}

$this->listeners[$channel] = $emitter = new Emitter;
$this->listeners[$channel] = $emitter = new Emitter;

try {
yield $this->query(\sprintf("LISTEN %s", $channel));
Expand Down Expand Up @@ -265,9 +268,7 @@ private function unlisten(string $channel): Promise {
}

$promise = $this->query(\sprintf("UNLISTEN %s", $channel));
$promise->onResolve(function () use ($emitter) {
$emitter->complete();
});
$promise->onResolve([$emitter, "complete"]);
return $promise;
}
}
4 changes: 1 addition & 3 deletions lib/PqExecutor.php
Original file line number Diff line number Diff line change
Expand Up @@ -284,9 +284,7 @@ private function unlisten(string $channel): Promise {
}

$promise = new Coroutine($this->send([$this->handle, "unlistenAsync"], $channel));
$promise->onResolve(function () use ($emitter) {
$emitter->complete();
});
$promise->onResolve([$emitter, "complete"]);
return $promise;
}
}
23 changes: 15 additions & 8 deletions test/AbstractConnectionTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use Amp\Postgres\TransactionError;
use Amp\Postgres\TupleResult;
use PHPUnit\Framework\TestCase;
use function Amp\asyncCall;

abstract class AbstractConnectionTest extends TestCase {
/** @var \Amp\Postgres\Connection */
Expand Down Expand Up @@ -191,18 +192,21 @@ public function testListen() {

$this->assertInstanceOf(Listener::class, $listener);

yield $this->connection->query(\sprintf("NOTIFY %s, '%s'", $channel, '0'));
yield $this->connection->query(\sprintf("NOTIFY %s, '%s'", $channel, '1'));
asyncCall(function () use ($channel) {
yield $this->connection->query(\sprintf("NOTIFY %s, '%s'", $channel, '0'));
yield $this->connection->query(\sprintf("NOTIFY %s, '%s'", $channel, '1'));
});

$count = 0;
Loop::defer(function () use (&$count, $listener) {
Loop::delay(100, function () use ($listener) {
$listener->unlisten();
$this->assertSame(2, $count);
});

while (yield $listener->advance()) {
$this->assertSame($listener->getCurrent()->payload, (string) $count++);
}

$this->assertSame(2, $count);
});
}

Expand All @@ -215,18 +219,21 @@ public function testNotify() {
/** @var \Amp\Postgres\Listener $listener */
$listener = yield $this->connection->listen($channel);

yield $this->connection->notify($channel, '0');
yield $this->connection->notify($channel, '1');
asyncCall(function () use ($channel) {
yield $this->connection->notify($channel, '0');
yield $this->connection->notify($channel, '1');
});

$count = 0;
Loop::defer(function () use (&$count, $listener) {
Loop::delay(100, function () use ($listener) {
$listener->unlisten();
$this->assertSame(2, $count);
});

while (yield $listener->advance()) {
$this->assertSame($listener->getCurrent()->payload, (string) $count++);
}

$this->assertSame(2, $count);
});
}

Expand Down

0 comments on commit 24fe594

Please sign in to comment.