Skip to content

Commit

Permalink
Reuse statement handles
Browse files Browse the repository at this point in the history
Allows the same query to be prepared multiple times on a single connection without error or performance penalty.
  • Loading branch information
trowski committed Jul 28, 2017
1 parent 65ede1b commit 0c3b706
Show file tree
Hide file tree
Showing 7 changed files with 215 additions and 20 deletions.
2 changes: 2 additions & 0 deletions lib/Executor.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
use Amp\Promise;

interface Executor {
const STATEMENT_NAME_PREFIX = "amp_";

/**
* @param string $sql
*
Expand Down
8 changes: 8 additions & 0 deletions lib/Internal/PqStatementStorage.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
<?php

namespace Amp\Postgres\Internal;

class PqStatementStorage extends StatementStorage {
/** @var \pq\Statement */
public $statement;
}
15 changes: 15 additions & 0 deletions lib/Internal/StatementStorage.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<?php

namespace Amp\Postgres\Internal;

use Amp\Struct;

class StatementStorage {
use Struct;

/** @var \Amp\Promise|null */
public $promise;

/** @var int */
public $count = 1;
}
44 changes: 37 additions & 7 deletions lib/PgSqlExecutor.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use Amp\Emitter;
use Amp\Loop;
use Amp\Promise;
use Amp\Success;
use function Amp\call;

class PgSqlExecutor implements Executor {
Expand Down Expand Up @@ -36,6 +37,9 @@ class PgSqlExecutor implements Executor {
/** @var callable */
private $unlisten;

/** @var \Amp\Postgres\Internal\StatementStorage[] */
private $statements = [];

/**
* Connection constructor.
*
Expand Down Expand Up @@ -199,8 +203,18 @@ private function sendExecute(string $name, array $params): Promise {
});
}

private function sendDeallocate(string $name): Promise {
return $this->query(\sprintf("DEALLOCATE %s", $name));
private function sendDeallocate(string $name) {
\assert(isset($this->statements[$name]), "Named statement not found when deallocating");

$storage = $this->statements[$name];

if (--$storage->count) {
return;
}

unset($this->statements[$name]);

Promise\rethrow($this->query(\sprintf("DEALLOCATE %s", $name)));
}

/**
Expand All @@ -225,11 +239,29 @@ public function execute(string $sql, ...$params): Promise {
* {@inheritdoc}
*/
public function prepare(string $sql): Promise {
return call(function () use ($sql) {
$name = "amphp" . \sha1($sql);
$name = self::STATEMENT_NAME_PREFIX . \sha1($sql);

if (isset($this->statements[$name])) {
$storage = $this->statements[$name];
++$storage->count;

if ($storage->promise) {
return $storage->promise;
}

return new Success(new PgSqlStatement($name, $sql, $this->executeCallback, $this->deallocateCallback));
}

$this->statements[$name] = $storage = new Internal\StatementStorage;

$storage->promise = call(function () use ($name, $sql) {
yield from $this->send("pg_send_prepare", $name, $sql);
return new PgSqlStatement($name, $sql, $this->executeCallback, $this->deallocateCallback);
});
$storage->promise->onResolve(function () use ($storage) {
$storage->promise = null;
});
return $storage->promise;
}

/**
Expand Down Expand Up @@ -274,9 +306,7 @@ public function listen(string $channel): Promise {
* @throws \Error
*/
private function unlisten(string $channel): Promise {
if (!isset($this->listeners[$channel])) {
throw new \Error("Not listening on that channel");
}
\assert(isset($this->listeners[$channel]), "Not listening on that channel");

$emitter = $this->listeners[$channel];
unset($this->listeners[$channel]);
Expand Down
59 changes: 51 additions & 8 deletions lib/PqExecutor.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use Amp\Emitter;
use Amp\Loop;
use Amp\Promise;
use Amp\Success;
use pq;
use function Amp\call;
use function Amp\coroutine;
Expand All @@ -33,6 +34,9 @@ class PqExecutor implements Executor {
/** @var \Amp\Emitter[] */
private $listeners;

/** @var \Amp\Postgres\Internal\PqStatementStorage[] */
private $statements = [];

/** @var callable */
private $send;

Expand All @@ -45,6 +49,9 @@ class PqExecutor implements Executor {
/** @var callable */
private $release;

/** @var callable */
private $deallocate;

/**
* Connection constructor.
*
Expand Down Expand Up @@ -89,6 +96,7 @@ public function __construct(pq\Connection $handle) {
$this->fetch = coroutine($this->callableFromInstanceMethod("fetch"));
$this->unlisten = $this->callableFromInstanceMethod("unlisten");
$this->release = $this->callableFromInstanceMethod("release");
$this->deallocate = $this->callableFromInstanceMethod("deallocate");
}

/**
Expand Down Expand Up @@ -134,10 +142,6 @@ private function send(callable $method, ...$args): \Generator {
$this->deferred = null;
}

if ($handle instanceof pq\Statement) {
return new PqStatement($handle, $this->send);
}

if (!$result instanceof pq\Result) {
throw new FailureException("Unknown query result");
}
Expand All @@ -152,6 +156,10 @@ private function send(callable $method, ...$args): \Generator {
throw new QueryError("Empty query string");

case pq\Result::COMMAND_OK:
if ($handle instanceof pq\Statement) {
return $handle; // Will be wrapped into a PqStatement object.
}

return new PqCommandResult($result);

case pq\Result::TUPLES_OK:
Expand Down Expand Up @@ -208,6 +216,20 @@ private function release() {
$deferred->resolve();
}

private function deallocate(string $name) {
\assert(isset($this->statements[$name]), "Named statement not found when deallocating");

$storage = $this->statements[$name];

if (--$storage->count) {
return;
}

unset($this->statements[$name]);

Promise\rethrow(new Coroutine($this->send([$storage->statement, "deallocateAsync"])));
}

/**
* {@inheritdoc}
*/
Expand All @@ -226,7 +248,30 @@ public function execute(string $sql, ...$params): Promise {
* {@inheritdoc}
*/
public function prepare(string $sql): Promise {
return new Coroutine($this->send([$this->handle, "prepareAsync"], "amphp" . \sha1($sql), $sql));
$name = self::STATEMENT_NAME_PREFIX . \sha1($sql);

if (isset($this->statements[$name])) {
$storage = $this->statements[$name];
++$storage->count;

if ($storage->promise) {
return $storage->promise;
}

return new Success(new PqStatement($storage->statement, $name, $this->send, $this->deallocate));
}

$this->statements[$name] = $storage = new Internal\PqStatementStorage;

$storage->promise = call(function () use ($storage, $name, $sql) {
$statement = yield from $this->send([$this->handle, "prepareAsync"], $name, $sql);
$storage->statement = $statement;
return new PqStatement($statement, $name, $this->send, $this->deallocate);
});
$storage->promise->onResolve(function () use ($storage) {
$storage->promise = null;
});
return $storage->promise;
}

/**
Expand Down Expand Up @@ -277,9 +322,7 @@ static function (string $channel, string $message, int $pid) use ($emitter) {
* @throws \Error
*/
private function unlisten(string $channel): Promise {
if (!isset($this->listeners[$channel])) {
throw new \Error("Not listening on that channel");
}
\assert(isset($this->listeners[$channel]), "Not listening on that channel");

$emitter = $this->listeners[$channel];
unset($this->listeners[$channel]);
Expand Down
14 changes: 12 additions & 2 deletions lib/PqStatement.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,32 @@ class PqStatement implements Statement {
/** @var \pq\Statement */
private $statement;

/** @var string */
private $name;

/** @var callable */
private $execute;

/** @var callable */
private $deallocate;

/**
* @internal
*
* @param \pq\Statement $statement
* @param string $name
* @param callable $execute
* @param callable $deallocate
*/
public function __construct(pq\Statement $statement, callable $execute) {
public function __construct(pq\Statement $statement, string $name, callable $execute, callable $deallocate) {
$this->statement = $statement;
$this->name = $name;
$this->execute = $execute;
$this->deallocate = $deallocate;
}

public function __destruct() {
($this->execute)([$this->statement, "deallocateAsync"]);
($this->deallocate)($this->name);
}

/**
Expand Down
Loading

0 comments on commit 0c3b706

Please sign in to comment.