Skip to content

Commit

Permalink
fix: make sure to propagate the response when throttling is enabled (#…
Browse files Browse the repository at this point in the history
…1908)

Change-Id: I690c522aebea03a966155d930bff26042d1bb1f1

Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:
- [ ] Make sure to open an issue as a [bug/issue](https://togithub.com/googleapis/java-bigtable/issues/new/choose) before writing your code!  That way we can discuss the change, evaluate designs, and agree on the general idea
- [ ] Ensure the tests and linter pass
- [ ] Code coverage does not decrease (if any source code was changed)
- [ ] Appropriate docs were updated (if necessary)

Fixes #<issue_number_goes_here> ☕️

If you write sample code, please follow the [samples format](
https://togithub.com/GoogleCloudPlatform/java-docs-samples/blob/main/SAMPLE_FORMAT.md).
  • Loading branch information
igorbernstein2 authored Sep 12, 2023
1 parent 100dcd4 commit f743187
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ protected void onResponseImpl(MutateRowsResponse response) {
Duration.ofSeconds(com.google.protobuf.util.Durations.toSeconds(info.getPeriod())));
}
}
outerObserver.onResponse(response);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,13 @@
import com.google.api.gax.rpc.StreamController;
import com.google.bigtable.v2.MutateRowsRequest;
import com.google.bigtable.v2.MutateRowsResponse;
import com.google.bigtable.v2.Mutation;
import com.google.bigtable.v2.RateLimitInfo;
import com.google.cloud.bigtable.gaxx.testing.FakeStatusCode;
import com.google.protobuf.ByteString;
import com.google.protobuf.Duration;
import com.google.rpc.Code;
import com.google.rpc.Status;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -138,6 +142,46 @@ public void testErrorInfoLowerQPS() throws Exception {
assertThat(newQps).isWithin(0.1).of(oldQps * RateLimitingServerStreamingCallable.MIN_FACTOR);
}

@Test
public void testResponseIsPropagated() {
MutateRowsResponse expectedResponse =
MutateRowsResponse.newBuilder()
.addEntries(
MutateRowsResponse.Entry.newBuilder()
.setIndex(0)
.setStatus(Status.newBuilder().setCode(Code.PERMISSION_DENIED_VALUE)))
.build();
innerCallable =
new MockCallable() {
@Override
public void call(
MutateRowsRequest mutateRowsRequest,
ResponseObserver<MutateRowsResponse> responseObserver,
ApiCallContext apiCallContext) {
responseObserver.onResponse(expectedResponse);
responseObserver.onComplete();
}
};

callableToTest = new RateLimitingServerStreamingCallable(innerCallable);

ResponseObserver<MutateRowsResponse> mockObserver = Mockito.mock(ResponseObserver.class);

MutateRowsRequest req =
MutateRowsRequest.newBuilder()
.addEntries(
MutateRowsRequest.Entry.newBuilder()
.setRowKey(ByteString.copyFromUtf8("k1"))
.addMutations(
Mutation.newBuilder()
.setDeleteFromRow(Mutation.DeleteFromRow.getDefaultInstance())))
.build();

callableToTest.call(req, mockObserver, context);

Mockito.verify(mockObserver, Mockito.times(1)).onResponse(Mockito.eq(expectedResponse));
}

private static class MockResponseObserver implements ResponseObserver<MutateRowsResponse> {

private ResponseObserver<MutateRowsResponse> observer;
Expand Down

0 comments on commit f743187

Please sign in to comment.