Skip to content

Commit

Permalink
Allow totalBytesProcessed and cacheHit to be null (#109)
Browse files Browse the repository at this point in the history
* Allow totalBytesProcessed and cacheHit to be null

* Make logic equivalent to preexisting

* Fix tests

* Also get ready for release
  • Loading branch information
wnob authored Aug 17, 2021
1 parent 39e66bd commit 8a14db2
Show file tree
Hide file tree
Showing 7 changed files with 137 additions and 37 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
to a groupId that we have ownership over -->
<groupId>com.github.jonathanswenson</groupId>
<artifactId>bqjdbc</artifactId>
<version>2.3.11-SNAPSHOT</version>
<version>2.3.11</version>
<name>Big Query over JDBC</name>
<description>A simple JDBC driver, to reach Google's BigQuery</description>
<properties>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import java.io.*;
import java.math.BigDecimal;
import java.math.BigInteger;
Expand Down Expand Up @@ -88,11 +89,11 @@ public class BQForwardOnlyResultSet implements java.sql.ResultSet {
/** the ProjectId */
private String projectId;
/** Reference for the Job */
private Job completedJob;
private @Nullable Job completedJob;
/** The total number of bytes processed while creating this ResultSet */
private final long totalBytesProcessed;
private final @Nullable Long totalBytesProcessed;
/** Whether the ResultSet came from BigQuery's cache */
private final boolean cacheHit;
private final @Nullable Boolean cacheHit;
/** Cursor position which goes from -1 to FETCH_SIZE then 0 to FETCH_SIZE
* The -1 is needed because of the while(Result.next() == true) { } iterating method*/
private int Cursor = -1;
Expand All @@ -102,8 +103,8 @@ public class BQForwardOnlyResultSet implements java.sql.ResultSet {
.withZone(ZoneId.of("UTC"));

public BQForwardOnlyResultSet(Bigquery bigquery, String projectId,
Job completedJob, BQStatementRoot bqStatementRoot) throws SQLException {
this(bigquery, projectId, completedJob, bqStatementRoot, null, false, null, 0, false);
@Nullable Job completedJob, BQStatementRoot bqStatementRoot) throws SQLException {
this(bigquery, projectId, completedJob, bqStatementRoot, null, false, null, 0L, false);
}

/**
Expand All @@ -117,10 +118,10 @@ public BQForwardOnlyResultSet(Bigquery bigquery, String projectId,
* @throws SQLException - if we fail to get the results
*/
public BQForwardOnlyResultSet(Bigquery bigquery, String projectId,
Job completedJob, BQStatementRoot bqStatementRoot,
@Nullable Job completedJob, BQStatementRoot bqStatementRoot,
List<TableRow> prefetchedRows, boolean prefetchedAllRows,
TableSchema schema,
long totalBytesProcessed, boolean cacheHit
@Nullable Long totalBytesProcessed, @Nullable Boolean cacheHit
) throws SQLException {
logger.debug("Created forward only resultset TYPE_FORWARD_ONLY");
this.Statementreference = (Statement) bqStatementRoot;
Expand All @@ -138,6 +139,9 @@ public BQForwardOnlyResultSet(Bigquery bigquery, String projectId,
this.cacheHit = cacheHit;
} else {
// initial load
if (completedJob == null) {
throw new BQSQLException("Cannot poll results without a job reference");
}
GetQueryResultsResponse result;
try {
result = BQSupportFuncts.getQueryResultsDivided(bigquery,
Expand All @@ -158,7 +162,7 @@ public BQForwardOnlyResultSet(Bigquery bigquery, String projectId,
fetchPos = fetchPos.add(BigInteger.valueOf(this.rowsofResult.size()));
}
this.totalBytesProcessed = result.getTotalBytesProcessed();
this.cacheHit = Boolean.TRUE.equals(result.getCacheHit()); // coerce Boolean nullable object to boolean primitive
this.cacheHit = result.getCacheHit();
}
}
}
Expand Down Expand Up @@ -1466,6 +1470,9 @@ public boolean next() throws SQLException {
return false;
}

if (completedJob == null) {
throw new BQSQLException("Cannot poll results without a job reference");
}
GetQueryResultsResponse result;
try {
result = BQSupportFuncts.getQueryResultsDivided(bigquery,
Expand Down Expand Up @@ -2729,11 +2736,11 @@ public boolean wasNull() throws SQLException {
return this.wasnull;
}

public long getTotalBytesProcessed() {
public @Nullable Long getTotalBytesProcessed() {
return totalBytesProcessed;
}

public boolean getCacheHit() {
public @Nullable Boolean getCacheHit() {
return cacheHit;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;

import javax.annotation.Nullable;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.ResultSetMetaData;
Expand Down Expand Up @@ -63,10 +64,10 @@ public class BQScrollableResultSet extends ScrollableResultset<Object> implement
private BQStatement Statementreference = null;

/** The total number of bytes processed while creating this ResultSet */
private final long totalBytesProcessed;
private final @Nullable Long totalBytesProcessed;

/** Whether the ResultSet came from BigQuery's cache */
private final boolean cacheHit;
private final @Nullable Boolean cacheHit;

private TableSchema schema;

Expand All @@ -93,7 +94,7 @@ public BQScrollableResultSet(GetQueryResultsResponse bigQueryGetQueryResultRespo
} // Should not happen.
}

public BQScrollableResultSet(List<TableRow> rows, BQStatementRoot bqStatementRoot, TableSchema schema, long totalBytesProcessed, boolean cacheHit) {
public BQScrollableResultSet(List<TableRow> rows, BQStatementRoot bqStatementRoot, TableSchema schema, @Nullable Long totalBytesProcessed, @Nullable Boolean cacheHit) {
logger.debug("Created Scrollable resultset TYPE_SCROLL_INSENSITIVE");
try {
maxFieldSize = bqStatementRoot.getMaxFieldSize();
Expand Down Expand Up @@ -256,11 +257,11 @@ public String getString(int columnIndex) throws SQLException {
}
}

public long getTotalBytesProcessed() {
public @Nullable Long getTotalBytesProcessed() {
return totalBytesProcessed;
}

public boolean getCacheHit() {
public @Nullable Boolean getCacheHit() {
return cacheHit;
}
}
35 changes: 19 additions & 16 deletions src/main/java/net/starschema/clouddb/jdbc/BQStatement.java
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ private ResultSet executeQueryHelper(String querySql, boolean unlimitedBillingBy
}

this.starttime = System.currentTimeMillis();
Job referencedJob;
Job referencedJob = null;
int retries = 0;
boolean jobAlreadyCompleted = false;

Expand All @@ -190,33 +190,30 @@ private ResultSet executeQueryHelper(String querySql, boolean unlimitedBillingBy
(qr.getTotalRows().equals(BigInteger.ZERO) ||
(qr.getRows() != null && qr.getTotalRows().equals(BigInteger.valueOf(qr.getRows().size()))));
// Don't look up the job if we have nothing else we need to do
referencedJob = fetchedAll || this.connection.isClosed() ?
null :
qr.getJobReference() == null ?
null :
this.connection.getBigquery()
.jobs()
.get(projectId, qr.getJobReference().getJobId())
.setLocation(qr.getJobReference().getLocation())
.execute();
if (!(fetchedAll || this.connection.isClosed())) {
if (qr.getJobReference() != null) {
referencedJob =
this.connection.getBigquery()
.jobs()
.get(projectId, qr.getJobReference().getJobId())
.setLocation(qr.getJobReference().getLocation())
.execute();
}
}
if (jobComplete) {
if (resultSetType != ResultSet.TYPE_SCROLL_INSENSITIVE) {
Boolean cacheHit = defaultValueIfNull(qr.getCacheHit(), false);
Long totalBytesProcessed = defaultValueIfNull(qr.getTotalBytesProcessed(), 0L);
List<TableRow> rows = defaultValueIfNull(qr.getRows(), new ArrayList<TableRow>());
TableSchema schema = defaultValueIfNull(qr.getSchema(), new TableSchema());

return new BQForwardOnlyResultSet(
this.connection.getBigquery(),
projectId,
referencedJob, this, rows, fetchedAll, schema, totalBytesProcessed, cacheHit);
referencedJob, this, rows, fetchedAll, schema, qr.getTotalBytesProcessed(), qr.getCacheHit());
} else if (fetchedAll) {
// We can only return scrollable result sets here if we have all the rows: otherwise we'll
// have to go get more below
Boolean cacheHit = defaultValueIfNull(qr.getCacheHit(), false);
Long totalBytesProcessed = defaultValueIfNull(qr.getTotalBytesProcessed(), 0L);
TableSchema schema = defaultValueIfNull(qr.getSchema(), new TableSchema());
return new BQScrollableResultSet(qr.getRows(), this, schema, totalBytesProcessed, cacheHit);
return new BQScrollableResultSet(qr.getRows(), this, schema, qr.getTotalBytesProcessed(), qr.getCacheHit());
}
jobAlreadyCompleted = true;
}
Expand All @@ -235,6 +232,9 @@ private ResultSet executeQueryHelper(String querySql, boolean unlimitedBillingBy
if (jobAlreadyCompleted) {
status = "DONE";
} else {
if (referencedJob == null) {
throw new BQSQLException("Cannot poll results without a job reference");
}
try {
status = BQSupportFuncts.getQueryState(referencedJob,
this.connection.getBigquery(),
Expand All @@ -252,6 +252,9 @@ private ResultSet executeQueryHelper(String querySql, boolean unlimitedBillingBy

if (status.equals("DONE")) {
if (resultSetType == ResultSet.TYPE_SCROLL_INSENSITIVE) {
if (referencedJob == null) {
throw new BQSQLException("Cannot poll results without a job reference");
}
return new BQScrollableResultSet(BQSupportFuncts.getQueryResults(
this.connection.getBigquery(),
projectId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,10 +341,8 @@ public ResultSet executeQuery(String querySql, boolean unlimitedBillingBytes) th
if (defaultValueIfNull(qr.getJobComplete(), false)) {
List<TableRow> rows = defaultValueIfNull(qr.getRows(), new ArrayList<TableRow>());
if (BigInteger.valueOf(rows.size()).equals(qr.getTotalRows())) {
Boolean cacheHit = defaultValueIfNull(qr.getCacheHit(), false);
Long totalBytesProcessed = defaultValueIfNull(qr.getTotalBytesProcessed(), 0L);
TableSchema schema = defaultValueIfNull(qr.getSchema(), new TableSchema());
return new BQScrollableResultSet(rows, this, schema, totalBytesProcessed, cacheHit);
return new BQScrollableResultSet(rows, this, schema, qr.getTotalBytesProcessed(), qr.getCacheHit());
}
jobAlreadyCompleted = true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
*/
package net.starschema.clouddb.jdbc;

import com.google.api.client.testing.http.MockHttpTransport;
import com.google.api.client.testing.http.MockLowLevelHttpRequest;
import com.google.api.client.testing.http.MockLowLevelHttpResponse;
import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import java.io.IOException;
Expand Down Expand Up @@ -594,7 +597,8 @@ public void TestResultSetTotalBytesProcessedCacheHit() {
QueryLoad();
Assert.assertTrue(resultForTest instanceof BQForwardOnlyResultSet);
BQForwardOnlyResultSet results = (BQForwardOnlyResultSet)resultForTest;
Assert.assertEquals(results.getTotalBytesProcessed() == 0, results.getCacheHit());
final Boolean processedNoBytes = new Long(0L).equals(results.getTotalBytesProcessed());
Assert.assertEquals(processedNoBytes, results.getCacheHit());
}

@Test
Expand Down Expand Up @@ -653,4 +657,45 @@ protected long getSyncTimeoutMillis() {
stmt.executeQuery(cleanupSql);
}
}

@Test
public void testHandlesAllNullResponseFields() throws Exception {
try {
mockResponse("{}");
} catch (BQSQLException e) {
Assert.assertTrue(e.getMessage().contains("without a job reference"));
return;
}
throw new AssertionError("Expected graceful failure due to lack of job reference");
}

@Test
public void testHandlesSomeNullResponseFields() throws Exception {
// Make sure we don't get any NPE's due to null values;
mockResponse(
"{ \"jobComplete\": true, "
+ "\"totalRows\": \"0\", "
+ "\"rows\": [] }");
}

private void mockResponse(String jsonResponse) throws Exception {
Properties properties =
BQSupportFuncts.readFromPropFile(
getClass().getResource("/installedaccount.properties").getFile());
String url = BQSupportFuncts.constructUrlFromPropertiesFile(properties, true, null);
// Mock an empty response object.
MockHttpTransport mockTransport =
new MockHttpTransport.Builder()
.setLowLevelHttpResponse(
new MockLowLevelHttpResponse().setContent(jsonResponse))
.build();
BQConnection bq = new BQConnection(url + "&useLegacySql=false", properties, mockTransport);
BQStatement stmt = new BQStatement(properties.getProperty("projectid"), bq, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
String sqlStmt = "SELECT word from publicdata:samples.shakespeare LIMIT 100";

BQForwardOnlyResultSet results = ((BQForwardOnlyResultSet)stmt.executeQuery(sqlStmt));

results.getTotalBytesProcessed();
results.getCacheHit();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,15 @@
*/
package net.starschema.clouddb.jdbc;

import com.google.api.client.testing.http.MockHttpTransport;
import com.google.api.client.testing.http.MockLowLevelHttpRequest;
import com.google.api.client.testing.http.MockLowLevelHttpResponse;

import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;

import junit.framework.Assert;

Expand Down Expand Up @@ -722,7 +727,48 @@ public void TestResultSetRelative() {
public void TestResultSetTotalBytesProcessedCacheHit() {
Assert.assertTrue(Result instanceof BQScrollableResultSet);
BQScrollableResultSet results = (BQScrollableResultSet)Result;
Assert.assertEquals(results.getTotalBytesProcessed() == 0, results.getCacheHit());
final Boolean processedNoBytes = new Long(0L).equals(results.getTotalBytesProcessed());
Assert.assertEquals(processedNoBytes, results.getCacheHit());
}

@Test
public void testHandlesAllNullResponseFields() throws Exception {
try {
mockResponse("{}");
} catch (BQSQLException e) {
Assert.assertTrue(e.getMessage().contains("without a job reference"));
return;
}
throw new AssertionError("Expected graceful failure due to lack of job reference");
}

@Test
public void testHandlesSomeNullResponseFields() throws Exception {
// Make sure we don't get any NPE's due to null values;
mockResponse(
"{ \"jobComplete\": true, "
+ "\"totalRows\": \"0\", "
+ "\"rows\": [] }");
}

private void mockResponse(String jsonResponse) throws Exception {
Properties properties =
BQSupportFuncts.readFromPropFile(
getClass().getResource("/installedaccount.properties").getFile());
String url = BQSupportFuncts.constructUrlFromPropertiesFile(properties, true, null);
// Mock an empty response object.
MockHttpTransport mockTransport =
new MockHttpTransport.Builder()
.setLowLevelHttpResponse(
new MockLowLevelHttpResponse().setContent(jsonResponse))
.build();
BQConnection bq = new BQConnection(url + "&useLegacySql=false", properties, mockTransport);
BQStatement stmt = new BQStatement(properties.getProperty("projectid"), bq, ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
String sqlStmt = "SELECT word from publicdata:samples.shakespeare LIMIT 100";

BQScrollableResultSet results = ((BQScrollableResultSet)stmt.executeQuery(sqlStmt));

results.getTotalBytesProcessed();
results.getCacheHit();
}
}

0 comments on commit 8a14db2

Please sign in to comment.