Skip to content

Commit

Permalink
php: Implement streaming execute.
Browse files Browse the repository at this point in the history
  • Loading branch information
enisoc committed Aug 25, 2015
1 parent 7285186 commit 31d2418
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 3 deletions.
24 changes: 21 additions & 3 deletions php/src/GoRpcClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ abstract class GoRpcClient {
abstract protected function sendRequest(VTContext $ctx, GoRpcRequest $req);

abstract protected function readResponse(VTContext $ctx);
private $in_stream_call = FALSE;

public function dial(VTContext $ctx, $addr, $path) {
// Connect to $addr.
Expand All @@ -93,6 +94,10 @@ public function close() {
}

public function call(VTContext $ctx, $method, $request) {
if ($this->in_stream_call) {
throw new Exception("GoRpcClient: can't make another call until results of streaming call are completely fetched.");
}

$req = new GoRpcRequest($this->nextSeq(), $method, $request);
$this->sendRequest($ctx, $req);

Expand All @@ -106,18 +111,31 @@ public function call(VTContext $ctx, $method, $request) {
}

public function streamCall(VTContext $ctx, $method, $request) {
if ($this->in_stream_call) {
throw new Exception("GoRpcClient: can't make another call until results of streaming call are completely fetched.");
}

$req = new GoRpcRequest($this->nextSeq(), $method, $request);
$this->sendRequest($ctx, $req);
$this->in_stream_call = TRUE;
}

public function streamNext(VTContext $ctx) {
if (! $this->in_stream_call) {
throw new Exception("GoRpcClient: streamNext called when not in streaming call.");
}

$resp = $this->readResponse($ctx);
if ($resp->seq() != $this->seq)
if ($resp->seq() != $this->seq) {
throw new GoRpcException("$method: request sequence mismatch");
if ($resp->isEOS())
}
if ($resp->isEOS()) {
$this->in_stream_call = FALSE;
return FALSE;
if ($resp->error())
}
if ($resp->error()) {
throw new GoRpcRemoteError("$method: " . $resp->error());
}
return $resp;
}

Expand Down
74 changes: 74 additions & 0 deletions php/src/VTGateConn.php
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,18 @@ private function callExecuteBatch(VTContext $ctx, $queries, $tablet_type, $as_tr
return $results;
}

private function callStreamExecute(VTContext $ctx, $query, array $bind_vars, $tablet_type, $method, $req = array()) {
$req['Query'] = VTBoundQuery::buildBsonP3($query, $bind_vars);
$req['TabletType'] = $tablet_type;
if ($ctx->getCallerId()) {
$req['CallerId'] = $ctx->getCallerId()->toBsonP3();
}

$this->client->streamCall($ctx, $method, $req);

return new VTStreamResults($ctx, $this->client);
}

public function execute(VTContext $ctx, $query, array $bind_vars, $tablet_type) {
return $this->callExecute($ctx, $query, $bind_vars, $tablet_type, 'VTGateP3.Execute');
}
Expand Down Expand Up @@ -214,6 +226,31 @@ public function executeBatchKeyspaceIds(VTContext $ctx, array $bound_keyspace_id
return $this->callExecuteBatch($ctx, VTBoundKeyspaceIdQuery::buildBsonP3Array($bound_keyspace_id_queries), $tablet_type, $as_transaction, 'VTGateP3.ExecuteBatchKeyspaceIds');
}

public function streamExecute(VTContext $ctx, $query, array $bind_vars, $tablet_type) {
return $this->callStreamExecute($ctx, $query, $bind_vars, $tablet_type, 'VTGateP3.StreamExecute2');
}

public function streamExecuteShards(VTContext $ctx, $query, $keyspace, array $shards, array $bind_vars, $tablet_type) {
return $this->callStreamExecute($ctx, $query, $bind_vars, $tablet_type, 'VTGateP3.StreamExecuteShards2', array(
'Keyspace' => $keyspace,
'Shards' => $shards
));
}

public function streamExecuteKeyspaceIds(VTContext $ctx, $query, $keyspace, array $keyspace_ids, array $bind_vars, $tablet_type) {
return $this->callStreamExecute($ctx, $query, $bind_vars, $tablet_type, 'VTGateP3.StreamExecuteKeyspaceIds2', array(
'Keyspace' => $keyspace,
'KeyspaceIds' => VTKeyspaceId::buildBsonP3Array($keyspace_ids)
));
}

public function streamExecuteKeyRanges(VTContext $ctx, $query, $keyspace, array $key_ranges, array $bind_vars, $tablet_type) {
return $this->callStreamExecute($ctx, $query, $bind_vars, $tablet_type, 'VTGateP3.StreamExecuteKeyRanges2', array(
'Keyspace' => $keyspace,
'KeyRanges' => VTKeyRange::buildBsonP3Array($key_ranges)
));
}

public function begin(VTContext $ctx) {
$req = array();
if ($ctx->getCallerId()) {
Expand Down Expand Up @@ -263,3 +300,40 @@ public function close() {
$this->client->close();
}
}

class VTStreamResults {
private $ctx;
private $client;

public function __construct($ctx, $client) {
$this->ctx = $ctx;
$this->client = $client;
}

/**
* fetch reads and returns the next VTQueryResult from the stream.
*
* If there are no more results and the stream has finished successfully,
* it returns FALSE.
*/
public function fetch() {
$resp = $this->client->streamNext($this->ctx);
if ($resp === FALSE) {
return FALSE;
}
VTProto::checkError($resp->reply);
return VTQueryResult::fromBsonP3($resp->reply['Result']);
}

/**
* fetchAll calls fetch in a loop until it returns FALSE, and then returns the
* results as an array.
*/
public function fetchAll() {
$results = array();
while (($result = $this->fetch()) !== FALSE) {
$results[] = $result;
}
return $results;
}
}
43 changes: 43 additions & 0 deletions php/tests/VTGateConnTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,49 @@ public function testEchoExecute() {
$this->assertEquals('true', $echo['asTransaction']);
}

public function testEchoStreamExecute() {
$ctx = $this->ctx;
$conn = $this->conn;

$sr = $conn->streamExecute($ctx, self::$ECHO_QUERY, self::$BIND_VARS, self::$TABLET_TYPE);
$results = $sr->fetchAll();
$echo = self::getEcho($results[0]);
$this->assertEquals(self::$CALLER_ID_ECHO, $echo['callerId']);
$this->assertEquals(self::$ECHO_QUERY, $echo['query']);
$this->assertEquals(self::$BIND_VARS_ECHO, $echo['bindVars']);
$this->assertEquals(self::$TABLET_TYPE_ECHO, $echo['tabletType']);

$sr = $conn->streamExecuteShards($ctx, self::$ECHO_QUERY, self::$KEYSPACE, self::$SHARDS, self::$BIND_VARS, self::$TABLET_TYPE);
$results = $sr->fetchAll();
$echo = self::getEcho($results[0]);
$this->assertEquals(self::$CALLER_ID_ECHO, $echo['callerId']);
$this->assertEquals(self::$ECHO_QUERY, $echo['query']);
$this->assertEquals(self::$KEYSPACE, $echo['keyspace']);
$this->assertEquals(self::$SHARDS_ECHO, $echo['shards']);
$this->assertEquals(self::$BIND_VARS_ECHO, $echo['bindVars']);
$this->assertEquals(self::$TABLET_TYPE_ECHO, $echo['tabletType']);

$sr = $conn->streamExecuteKeyspaceIds($ctx, self::$ECHO_QUERY, self::$KEYSPACE, self::$KEYSPACE_IDS, self::$BIND_VARS, self::$TABLET_TYPE);
$results = $sr->fetchAll();
$echo = self::getEcho($results[0]);
$this->assertEquals(self::$CALLER_ID_ECHO, $echo['callerId']);
$this->assertEquals(self::$ECHO_QUERY, $echo['query']);
$this->assertEquals(self::$KEYSPACE, $echo['keyspace']);
$this->assertEquals(self::$KEYSPACE_IDS_ECHO, $echo['keyspaceIds']);
$this->assertEquals(self::$BIND_VARS_ECHO, $echo['bindVars']);
$this->assertEquals(self::$TABLET_TYPE_ECHO, $echo['tabletType']);

$sr = $conn->streamExecuteKeyRanges($ctx, self::$ECHO_QUERY, self::$KEYSPACE, self::$KEY_RANGES, self::$BIND_VARS, self::$TABLET_TYPE);
$results = $sr->fetchAll();
$echo = self::getEcho($results[0]);
$this->assertEquals(self::$CALLER_ID_ECHO, $echo['callerId']);
$this->assertEquals(self::$ECHO_QUERY, $echo['query']);
$this->assertEquals(self::$KEYSPACE, $echo['keyspace']);
$this->assertEquals(self::$KEY_RANGES_ECHO, $echo['keyRanges']);
$this->assertEquals(self::$BIND_VARS_ECHO, $echo['bindVars']);
$this->assertEquals(self::$TABLET_TYPE_ECHO, $echo['tabletType']);
}

public function testEchoTransactionExecute() {
$ctx = $this->ctx;
$conn = $this->conn;
Expand Down

0 comments on commit 31d2418

Please sign in to comment.