Skip to content

Commit

Permalink
Merge pull request #1037 from enisoc/php
Browse files Browse the repository at this point in the history
PHP: Implement streaming execute, split query, etc.
  • Loading branch information
enisoc committed Aug 25, 2015
2 parents 427bd4e + 31d2418 commit 20afe43
Show file tree
Hide file tree
Showing 8 changed files with 583 additions and 218 deletions.
95 changes: 7 additions & 88 deletions go/vt/vtgate/bsonp3vtgateconn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,21 +211,7 @@ func (conn *vtgateConn) ExecuteBatchKeyspaceIds(ctx context.Context, queries []p
}

func (conn *vtgateConn) StreamExecute(ctx context.Context, query string, bindVars map[string]interface{}, tabletType topopb.TabletType) (<-chan *mproto.QueryResult, vtgateconn.ErrFunc, error) {
req := &pb.StreamExecuteRequest{
CallerId: callerid.EffectiveCallerIDFromContext(ctx),
Query: tproto.BoundQueryToProto3(query, bindVars),
TabletType: tabletType,
}
sr := make(chan *pb.StreamExecuteResponse, 10)
c := conn.rpcConn.StreamGo("VTGateP3.StreamExecute", req, sr)
srr := make(chan streamResult)
go func() {
for v := range sr {
srr <- streamResult{qr: v.Result, err: v.Error}
}
close(srr)
}()
return sendStreamResults(c, srr)
return conn.StreamExecute2(ctx, query, bindVars, tabletType)
}

func (conn *vtgateConn) StreamExecute2(ctx context.Context, query string, bindVars map[string]interface{}, tabletType topopb.TabletType) (<-chan *mproto.QueryResult, vtgateconn.ErrFunc, error) {
Expand All @@ -247,23 +233,7 @@ func (conn *vtgateConn) StreamExecute2(ctx context.Context, query string, bindVa
}

func (conn *vtgateConn) StreamExecuteShards(ctx context.Context, query string, keyspace string, shards []string, bindVars map[string]interface{}, tabletType topopb.TabletType) (<-chan *mproto.QueryResult, vtgateconn.ErrFunc, error) {
req := &pb.StreamExecuteShardsRequest{
CallerId: callerid.EffectiveCallerIDFromContext(ctx),
Query: tproto.BoundQueryToProto3(query, bindVars),
Keyspace: keyspace,
Shards: shards,
TabletType: tabletType,
}
sr := make(chan *pb.StreamExecuteShardsResponse, 10)
c := conn.rpcConn.StreamGo("VTGateP3.StreamExecuteShards", req, sr)
srr := make(chan streamResult)
go func() {
for v := range sr {
srr <- streamResult{qr: v.Result, err: v.Error}
}
close(srr)
}()
return sendStreamResults(c, srr)
return conn.StreamExecuteShards2(ctx, query, keyspace, shards, bindVars, tabletType)
}

func (conn *vtgateConn) StreamExecuteShards2(ctx context.Context, query string, keyspace string, shards []string, bindVars map[string]interface{}, tabletType topopb.TabletType) (<-chan *mproto.QueryResult, vtgateconn.ErrFunc, error) {
Expand All @@ -287,23 +257,7 @@ func (conn *vtgateConn) StreamExecuteShards2(ctx context.Context, query string,
}

func (conn *vtgateConn) StreamExecuteKeyRanges(ctx context.Context, query string, keyspace string, keyRanges []*topopb.KeyRange, bindVars map[string]interface{}, tabletType topopb.TabletType) (<-chan *mproto.QueryResult, vtgateconn.ErrFunc, error) {
req := &pb.StreamExecuteKeyRangesRequest{
CallerId: callerid.EffectiveCallerIDFromContext(ctx),
Query: tproto.BoundQueryToProto3(query, bindVars),
Keyspace: keyspace,
KeyRanges: keyRanges,
TabletType: tabletType,
}
sr := make(chan *pb.StreamExecuteKeyRangesResponse, 10)
c := conn.rpcConn.StreamGo("VTGateP3.StreamExecuteKeyRanges", req, sr)
srr := make(chan streamResult)
go func() {
for v := range sr {
srr <- streamResult{qr: v.Result, err: v.Error}
}
close(srr)
}()
return sendStreamResults(c, srr)
return conn.StreamExecuteKeyRanges2(ctx, query, keyspace, keyRanges, bindVars, tabletType)
}

func (conn *vtgateConn) StreamExecuteKeyRanges2(ctx context.Context, query string, keyspace string, keyRanges []*topopb.KeyRange, bindVars map[string]interface{}, tabletType topopb.TabletType) (<-chan *mproto.QueryResult, vtgateconn.ErrFunc, error) {
Expand All @@ -327,23 +281,7 @@ func (conn *vtgateConn) StreamExecuteKeyRanges2(ctx context.Context, query strin
}

func (conn *vtgateConn) StreamExecuteKeyspaceIds(ctx context.Context, query string, keyspace string, keyspaceIds [][]byte, bindVars map[string]interface{}, tabletType topopb.TabletType) (<-chan *mproto.QueryResult, vtgateconn.ErrFunc, error) {
req := &pb.StreamExecuteKeyspaceIdsRequest{
CallerId: callerid.EffectiveCallerIDFromContext(ctx),
Query: tproto.BoundQueryToProto3(query, bindVars),
Keyspace: keyspace,
KeyspaceIds: keyspaceIds,
TabletType: tabletType,
}
sr := make(chan *pb.StreamExecuteKeyspaceIdsResponse, 10)
c := conn.rpcConn.StreamGo("VTGateP3.StreamExecuteKeyspaceIds", req, sr)
srr := make(chan streamResult)
go func() {
for v := range sr {
srr <- streamResult{qr: v.Result, err: v.Error}
}
close(srr)
}()
return sendStreamResults(c, srr)
return conn.StreamExecuteKeyspaceIds2(ctx, query, keyspace, keyspaceIds, bindVars, tabletType)
}

func (conn *vtgateConn) StreamExecuteKeyspaceIds2(ctx context.Context, query string, keyspace string, keyspaceIds [][]byte, bindVars map[string]interface{}, tabletType topopb.TabletType) (<-chan *mproto.QueryResult, vtgateconn.ErrFunc, error) {
Expand Down Expand Up @@ -402,34 +340,15 @@ func sendStreamResults(c *rpcplus.Call, sr chan streamResult) (<-chan *mproto.Qu
}

func (conn *vtgateConn) Begin(ctx context.Context) (interface{}, error) {
request := &pb.BeginRequest{
CallerId: callerid.EffectiveCallerIDFromContext(ctx),
}
response := &pb.BeginResponse{}
if err := conn.rpcConn.Call(ctx, "VTGateP3.Begin", request, response); err != nil {
return nil, err
}
return response.Session, nil
return conn.Begin2(ctx)
}

func (conn *vtgateConn) Commit(ctx context.Context, session interface{}) error {
s := session.(*pb.Session)
request := &pb.CommitRequest{
CallerId: callerid.EffectiveCallerIDFromContext(ctx),
Session: s,
}
response := &pb.CommitResponse{}
return conn.rpcConn.Call(ctx, "VTGateP3.Commit", request, response)
return conn.Commit2(ctx, session)
}

func (conn *vtgateConn) Rollback(ctx context.Context, session interface{}) error {
s := session.(*pb.Session)
request := &pb.RollbackRequest{
CallerId: callerid.EffectiveCallerIDFromContext(ctx),
Session: s,
}
response := &pb.RollbackResponse{}
return conn.rpcConn.Call(ctx, "VTGateP3.Rollback", request, response)
return conn.Rollback2(ctx, session)
}

func (conn *vtgateConn) Begin2(ctx context.Context) (interface{}, error) {
Expand Down
107 changes: 0 additions & 107 deletions go/vt/vtgate/bsonp3vtgateservice/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,24 +216,6 @@ func (vtg *VTGateP3) ExecuteBatchKeyspaceIds(ctx context.Context, request *pb.Ex
return vtgErr
}

// StreamExecute is the RPC version of vtgateservice.VTGateService method
func (vtg *VTGateP3) StreamExecute(ctx context.Context, request *pb.StreamExecuteRequest, sendReply func(interface{}) error) (err error) {
defer vtg.server.HandlePanic(&err)
ctx = callerid.NewContext(ctx,
request.CallerId,
callerid.NewImmediateCallerID("gorpc client"))
return vtg.server.StreamExecute(ctx,
string(request.Query.Sql),
tproto.Proto3ToBindVariables(request.Query.BindVariables),
request.TabletType,
func(reply *proto.QueryResult) error {
return sendReply(&pb.StreamExecuteResponse{
Error: vtgate.VtGateErrorToVtRPCError(nil, reply.Error),
Result: mproto.QueryResultToProto3(reply.Result),
})
})
}

// StreamExecute2 is the RPC version of vtgateservice.VTGateService method
func (vtg *VTGateP3) StreamExecute2(ctx context.Context, request *pb.StreamExecuteRequest, sendReply func(interface{}) error) (err error) {
defer vtg.server.HandlePanic(&err)
Expand Down Expand Up @@ -267,26 +249,6 @@ func (vtg *VTGateP3) StreamExecute2(ctx context.Context, request *pb.StreamExecu
return vtgErr
}

// StreamExecuteShards is the RPC version of vtgateservice.VTGateService method
func (vtg *VTGateP3) StreamExecuteShards(ctx context.Context, request *pb.StreamExecuteShardsRequest, sendReply func(interface{}) error) (err error) {
defer vtg.server.HandlePanic(&err)
ctx = callerid.NewContext(ctx,
request.CallerId,
callerid.NewImmediateCallerID("gorpc client"))
return vtg.server.StreamExecuteShards(ctx,
string(request.Query.Sql),
tproto.Proto3ToBindVariables(request.Query.BindVariables),
request.Keyspace,
request.Shards,
request.TabletType,
func(reply *proto.QueryResult) error {
return sendReply(&pb.StreamExecuteShardsResponse{
Error: vtgate.VtGateErrorToVtRPCError(nil, reply.Error),
Result: mproto.QueryResultToProto3(reply.Result),
})
})
}

// StreamExecuteShards2 is the RPC version of vtgateservice.VTGateService method
func (vtg *VTGateP3) StreamExecuteShards2(ctx context.Context, request *pb.StreamExecuteShardsRequest, sendReply func(interface{}) error) (err error) {
defer vtg.server.HandlePanic(&err)
Expand Down Expand Up @@ -322,27 +284,6 @@ func (vtg *VTGateP3) StreamExecuteShards2(ctx context.Context, request *pb.Strea
return vtgErr
}

// StreamExecuteKeyspaceIds is the RPC version of
// vtgateservice.VTGateService method
func (vtg *VTGateP3) StreamExecuteKeyspaceIds(ctx context.Context, request *pb.StreamExecuteKeyspaceIdsRequest, sendReply func(interface{}) error) (err error) {
defer vtg.server.HandlePanic(&err)
ctx = callerid.NewContext(ctx,
request.CallerId,
callerid.NewImmediateCallerID("gorpc client"))
return vtg.server.StreamExecuteKeyspaceIds(ctx,
string(request.Query.Sql),
tproto.Proto3ToBindVariables(request.Query.BindVariables),
request.Keyspace,
key.ProtoToKeyspaceIds(request.KeyspaceIds),
request.TabletType,
func(reply *proto.QueryResult) error {
return sendReply(&pb.StreamExecuteKeyspaceIdsResponse{
Error: vtgate.VtGateErrorToVtRPCError(nil, reply.Error),
Result: mproto.QueryResultToProto3(reply.Result),
})
})
}

// StreamExecuteKeyspaceIds2 is the RPC version of
// vtgateservice.VTGateService method
func (vtg *VTGateP3) StreamExecuteKeyspaceIds2(ctx context.Context, request *pb.StreamExecuteKeyspaceIdsRequest, sendReply func(interface{}) error) (err error) {
Expand Down Expand Up @@ -379,27 +320,6 @@ func (vtg *VTGateP3) StreamExecuteKeyspaceIds2(ctx context.Context, request *pb.
return vtgErr
}

// StreamExecuteKeyRanges is the RPC version of
// vtgateservice.VTGateService method
func (vtg *VTGateP3) StreamExecuteKeyRanges(ctx context.Context, request *pb.StreamExecuteKeyRangesRequest, sendReply func(interface{}) error) (err error) {
defer vtg.server.HandlePanic(&err)
ctx = callerid.NewContext(ctx,
request.CallerId,
callerid.NewImmediateCallerID("gorpc client"))
return vtg.server.StreamExecuteKeyRanges(ctx,
string(request.Query.Sql),
tproto.Proto3ToBindVariables(request.Query.BindVariables),
request.Keyspace,
key.ProtoToKeyRanges(request.KeyRanges),
request.TabletType,
func(reply *proto.QueryResult) error {
return sendReply(&pb.StreamExecuteKeyRangesResponse{
Error: vtgate.VtGateErrorToVtRPCError(nil, reply.Error),
Result: mproto.QueryResultToProto3(reply.Result),
})
})
}

// StreamExecuteKeyRanges2 is the RPC version of
// vtgateservice.VTGateService method
func (vtg *VTGateP3) StreamExecuteKeyRanges2(ctx context.Context, request *pb.StreamExecuteKeyRangesRequest, sendReply func(interface{}) error) (err error) {
Expand Down Expand Up @@ -436,33 +356,6 @@ func (vtg *VTGateP3) StreamExecuteKeyRanges2(ctx context.Context, request *pb.St
return vtgErr
}

// Begin is the RPC version of vtgateservice.VTGateService method
func (vtg *VTGateP3) Begin(ctx context.Context, request *pb.BeginRequest, response *pb.BeginResponse) (err error) {
defer vtg.server.HandlePanic(&err)
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(*rpcTimeout))
defer cancel()
session := &proto.Session{}
err = vtg.server.Begin(ctx, session)
response.Session = proto.SessionToProto(session)
return
}

// Commit is the RPC version of vtgateservice.VTGateService method
func (vtg *VTGateP3) Commit(ctx context.Context, request *pb.CommitRequest, response *pb.CommitResponse) (err error) {
defer vtg.server.HandlePanic(&err)
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(*rpcTimeout))
defer cancel()
return vtg.server.Commit(ctx, proto.ProtoToSession(request.Session))
}

// Rollback is the RPC version of vtgateservice.VTGateService method
func (vtg *VTGateP3) Rollback(ctx context.Context, request *pb.RollbackRequest, response *pb.RollbackResponse) (err error) {
defer vtg.server.HandlePanic(&err)
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(*rpcTimeout))
defer cancel()
return vtg.server.Rollback(ctx, proto.ProtoToSession(request.Session))
}

// Begin2 is the RPC version of vtgateservice.VTGateService method
func (vtg *VTGateP3) Begin2(ctx context.Context, request *pb.BeginRequest, response *pb.BeginResponse) (err error) {
defer vtg.server.HandlePanic(&err)
Expand Down
1 change: 1 addition & 0 deletions php/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ To run the tests, first install PHPUnit:
``` sh
$ wget https://phar.phpunit.de/phpunit.phar
$ mv phpunit.phar $VTROOT/bin/phpunit
$ chmod +x $VTROOT/bin/phpunit
```

Then run the tests like this:
Expand Down
24 changes: 21 additions & 3 deletions php/src/GoRpcClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ abstract class GoRpcClient {
abstract protected function sendRequest(VTContext $ctx, GoRpcRequest $req);

abstract protected function readResponse(VTContext $ctx);
private $in_stream_call = FALSE;

public function dial(VTContext $ctx, $addr, $path) {
// Connect to $addr.
Expand All @@ -93,6 +94,10 @@ public function close() {
}

public function call(VTContext $ctx, $method, $request) {
if ($this->in_stream_call) {
throw new Exception("GoRpcClient: can't make another call until results of streaming call are completely fetched.");
}

$req = new GoRpcRequest($this->nextSeq(), $method, $request);
$this->sendRequest($ctx, $req);

Expand All @@ -106,18 +111,31 @@ public function call(VTContext $ctx, $method, $request) {
}

public function streamCall(VTContext $ctx, $method, $request) {
if ($this->in_stream_call) {
throw new Exception("GoRpcClient: can't make another call until results of streaming call are completely fetched.");
}

$req = new GoRpcRequest($this->nextSeq(), $method, $request);
$this->sendRequest($ctx, $req);
$this->in_stream_call = TRUE;
}

public function streamNext(VTContext $ctx) {
if (! $this->in_stream_call) {
throw new Exception("GoRpcClient: streamNext called when not in streaming call.");
}

$resp = $this->readResponse($ctx);
if ($resp->seq() != $this->seq)
if ($resp->seq() != $this->seq) {
throw new GoRpcException("$method: request sequence mismatch");
if ($resp->isEOS())
}
if ($resp->isEOS()) {
$this->in_stream_call = FALSE;
return FALSE;
if ($resp->error())
}
if ($resp->error()) {
throw new GoRpcRemoteError("$method: " . $resp->error());
}
return $resp;
}

Expand Down
Loading

0 comments on commit 20afe43

Please sign in to comment.