diff --git a/src/main/java/com/rockset/jdbc/RocksetStatement.java b/src/main/java/com/rockset/jdbc/RocksetStatement.java
index ecf3fcd1..8e4dd2b5 100644
--- a/src/main/java/com/rockset/jdbc/RocksetStatement.java
+++ b/src/main/java/com/rockset/jdbc/RocksetStatement.java
@@ -185,6 +185,11 @@ private static String getQueryIdFromQueryResponse(QueryResponse response) {
   protected boolean executeWithParams(String sql, List params) throws SQLException {
     clearCurrentResults();
     checkOpen();
+    final String schema = this.connection.get().getSchema();
+
+    final String sqlWithWorkspace = schema.equals(RocksetConnection.DEFAULT_SCHEMA)
+        ? sql
+        : String.format("OPTION(default_workspace='%s')\n %s", schema, sql);
 
     ResultSet resultSet = null;
     try {
@@ -192,12 +197,12 @@ protected boolean executeWithParams(String sql, List params) thr
       // because rockset queries do not yet have a client-side timeout.
       QueryResponse resp =
           connection()
-              .startQuery(sql, this.fetchSize.get(), params, getStatementSessionProperties());
+              .startQuery(sqlWithWorkspace, this.fetchSize.get(), params, getStatementSessionProperties());
 
       // store resuts in memory
       resultSet =
           new RocksetResultSet(
-              sql,
+              sqlWithWorkspace,
               resp,
               this.maxRows.get(),
               RocksetResultSetPaginationParams.builder()
@@ -210,7 +215,7 @@ protected boolean executeWithParams(String sql, List params) thr
       this.currentResult.set(resultSet);
       return true;
     } catch (RuntimeException e) {
-      String msg = "Error executing query '" + sql + "'" + " error = " + e.getMessage();
+      String msg = "Error executing query '" + sqlWithWorkspace + "'" + " error = " + e.getMessage();
       RocksetDriver.log(msg);
       throw new SQLException(msg, e);
     } catch (Exception e) {
diff --git a/src/test/java/com/rockset/client/TestWorkspace.java b/src/test/java/com/rockset/client/TestWorkspace.java
index 6da8587b..2e333a97 100644
--- a/src/test/java/com/rockset/client/TestWorkspace.java
+++ b/src/test/java/com/rockset/client/TestWorkspace.java
@@ -1,6 +1,8 @@
 package com.rockset.client;
 
 import com.rockset.client.model.*;
+
+import java.sql.Time;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.RandomStringUtils;
@@ -61,7 +63,7 @@ public void testDeleteWorkspace() throws Exception {
     // wait for collection to go away
     Awaitility.await("Waiting for collection to be cleaned up ")
         .atMost(3, TimeUnit.MINUTES)
-        .pollInterval(1, TimeUnit.SECONDS)
+        .pollInterval(1, TimeUnit.SECONDS)
         .until(
             (Callable<Boolean>)
                 () -> {
diff --git a/src/test/java/com/rockset/client/util/RetryHelper.java b/src/test/java/com/rockset/client/util/RetryHelper.java
new file mode 100644
index 00000000..b8a1a30b
--- /dev/null
+++ b/src/test/java/com/rockset/client/util/RetryHelper.java
@@ -0,0 +1,25 @@
+package com.rockset.client.util;
+
+import com.rockset.client.ApiException;
+import org.awaitility.Awaitility;
+import org.hamcrest.Matchers;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+public class RetryHelper {
+  public static <T> T retryOnApiException(Callable<T> request) {
+    return Awaitility
+        .await()
+        .atMost(5, TimeUnit.MINUTES)
+        .pollInterval(1, TimeUnit.SECONDS)
+        .until(() -> {
+          try {
+            return request.call();
+          } catch (ApiException e) {
+            System.out.println("Encountered error, retrying request: " + e.getMessage());
+            return null;
+          }
+        }, Matchers.notNullValue());
+  }
+}
diff --git a/src/test/java/com/rockset/jdbc/TestSchema.java b/src/test/java/com/rockset/jdbc/TestSchema.java
index 5fa90197..e0ae4d08 100644
--- a/src/test/java/com/rockset/jdbc/TestSchema.java
+++ b/src/test/java/com/rockset/jdbc/TestSchema.java
@@ -1,5 +1,6 @@
 package com.rockset.jdbc;
 
+import static com.rockset.client.util.RetryHelper.retryOnApiException;
 import static org.testng.Assert.assertEquals;
 
 import com.fasterxml.jackson.databind.JsonNode;
@@ -8,7 +9,10 @@
 import com.rockset.client.model.Collection;
 import com.rockset.client.model.CreateCollectionRequest;
 import com.rockset.client.model.CreateCollectionResponse;
+import com.rockset.client.model.CreateWorkspaceRequest;
 import com.rockset.client.model.DeleteCollectionResponse;
+import com.rockset.client.model.DeleteWorkspaceResponse;
+import com.rockset.client.model.ListWorkspacesResponse;
 import com.rockset.client.model.QueryRequest;
 import com.rockset.client.model.QueryRequestSql;
 import com.rockset.client.model.QueryResponse;
@@ -27,6 +31,8 @@
 import java.util.List;
 import java.util.Properties;
 import java.util.Set;
+
+import com.rockset.client.model.Workspace;
 import okhttp3.MediaType;
 import okhttp3.MultipartBody;
 import okhttp3.OkHttpClient;
@@ -35,7 +41,9 @@
 import okhttp3.Response;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.testng.Assert;
+import org.testng.annotations.AfterSuite;
 import org.testng.annotations.BeforeSuite;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 //
@@ -59,6 +67,10 @@ public class TestSchema {
 
   final ObjectMapper mapper = new ObjectMapper();
 
+  private static final String TEST_WORKSPACE_PREFIX = "jdbc_test_workspace_";
+
+  private static final String testWorkspace = TEST_WORKSPACE_PREFIX + RandomStringUtils.randomAlphanumeric(8);
+
   @BeforeSuite
   public void setUp() throws Exception {
     String apiKey = System.getenv("ROCKSET_APIKEY");
@@ -79,6 +91,16 @@ public void setUp() throws Exception {
         new RocksetClient(property.getProperty("apiKey"), property.getProperty("apiServer"));
     // Register JDBC driver
     Class.forName(JDBC_DRIVER);
+
+    // Clean up any previous test workspaces in case finalization didn't properly run before
+    // This will also prevent these tests from running concurrently on a single org, but that should be fine
+    deleteAllJDBCTestHarnessWorkspaces();
+  }
+
+  @AfterSuite
+  public void cleanUp() throws Exception {
+    // Ensure any jdbc workspaces are removed
+    deleteAllJDBCTestHarnessWorkspaces();
   }
 
   //
@@ -86,28 +108,29 @@ public void setUp() throws Exception {
   // This call should return one record with with one column
   // TABLE_CATALOG
   //
-  @Test
-  public void testGetTables() throws Exception {
+  @Test(dataProvider = "schemas")
+  public void testGetTables(String schema) throws Exception {
     System.out.println("testGetTables");
 
     // create 3 collections
     int numCollections = 3;
     List<String> colls = generateCollectionNames(numCollections);
-    createCollections(colls);
+    createCollections(colls, schema);
 
     // wait for all leaves to be ready to serve
-    waitCollections(colls);
+    waitCollections(colls, schema);
 
     // now check these 3 collections are returned via the JDBC call
     Connection conn = null;
     Statement stmt = null;
 
     try {
       conn = DriverManager.getConnection(DB_URL, property);
+      conn.setSchema(schema);
       DatabaseMetaData meta = conn.getMetaData();
       ResultSet rs =
           meta.getTables(
-              RocksetConnection.DEFAULT_CATALOG, RocksetConnection.DEFAULT_SCHEMA, "*", null);
+              RocksetConnection.DEFAULT_CATALOG, schema, "*", null);
       int colCatIndex = rs.findColumn("TABLE_CAT");
       int colSchemaIndex = rs.findColumn("TABLE_SCHEM");
       int colNameIndex = rs.findColumn("TABLE_NAME");
@@ -117,7 +140,7 @@ public void testGetTables() throws Exception {
       int found = 0;
       while (rs.next()) {
         String value = rs.getString(colSchemaIndex);
-        Assert.assertTrue(value.equals(RocksetConnection.DEFAULT_SCHEMA));
+        Assert.assertTrue(value.equals(schema));
         value = rs.getString(colCatIndex);
         Assert.assertTrue(value.equals(RocksetConnection.DEFAULT_CATALOG));
         value = rs.getString(colTypeIndex);
@@ -129,12 +152,12 @@ public void testGetTables() throws Exception {
       }
       Assert.assertEquals(numCollections, found);
     } finally {
-      cleanup(colls, stmt, conn);
+      cleanup(colls, stmt, conn, schema);
     }
   }
 
-  @Test
-  public void testDateTimeTimestamp() throws Exception {
+  @Test(dataProvider = "schemas")
+  public void testDateTimeTimestamp(String schema) throws Exception {
     System.out.println("testDateTimeTimestamp");
     Connection conn = null;
     Statement stmt = null;
@@ -146,19 +169,20 @@ public void testDateTimeTimestamp() throws Exception {
 
     try {
       // create collection
-      createCollections(colls);
+      createCollections(colls, schema);
 
       // wait for all leaves to be ready to serve
-      waitCollections(colls);
+      waitCollections(colls, schema);
 
       String csvParams =
           "{ \"csv\": { \"columnNames\": [\"c1\", \"c2\", \"c3\"], "
              + "\"columnTypes\": [\"DATE\", \"TIME\", \"TIMESTAMP\"] } }";
 
-      uploadFile(collectionName, "src/test/resources/basic.csv", csvParams);
-      waitNumberDocs(collectionName, 1);
+      uploadFile(collectionName, "src/test/resources/basic.csv", csvParams, schema);
+      waitNumberDocs(collectionName, 1, schema);
 
       conn = DriverManager.getConnection(DB_URL, property);
+      conn.setSchema(schema);
 
       // Execute a query
       System.out.println("Creating statement 1...");
@@ -177,12 +201,12 @@ public void testDateTimeTimestamp() throws Exception {
         System.out.println("c1: " + c1.toString() + " c2: " + c2.toString() + " c3: " + c3);
       }
     } finally {
-      cleanup(colls, stmt, conn);
+      cleanup(colls, stmt, conn, schema);
     }
   }
 
-  @Test
-  public void testGetColumns() throws Exception {
+  @Test(dataProvider = "schemas")
+  public void testGetColumns(String schema) throws Exception {
     System.out.println("testGetTableColumns");
     Connection conn = null;
     Statement stmt = null;
@@ -194,26 +218,27 @@ public void testGetColumns() throws Exception {
 
     try {
       // create collection
-      createCollections(colls);
+      createCollections(colls, schema);
 
       // wait for all leaves to be ready to serve
-      waitCollections(colls);
+      waitCollections(colls, schema);
 
       // upload one file to collection and wait for it to be visible
-      uploadFile(collectionName, "src/test/resources/basic.json", null);
-      waitNumberDocs(collectionName, 1);
+      uploadFile(collectionName, "src/test/resources/basic.json", null, schema);
+      waitNumberDocs(collectionName, 1, schema);
 
       // there should be 5 columns in this test file
       // a, name, nested, _id, _event_time, _meta
       final int numColumns = 6;
 
       conn = DriverManager.getConnection(DB_URL, property);
+      conn.setSchema(schema);
 
       DatabaseMetaData meta = conn.getMetaData();
       ResultSet rs =
           meta.getColumns(
               RocksetConnection.DEFAULT_CATALOG,
-              RocksetConnection.DEFAULT_SCHEMA,
+              schema,
               collectionName,
               null);
       int colCatIndex = rs.findColumn("TABLE_CAT");
@@ -244,7 +269,7 @@ public void testGetColumns() throws Exception {
       int found = 0;
       while (rs.next()) {
         String value = rs.getString(colSchemaIndex);
-        Assert.assertTrue(value.equals(RocksetConnection.DEFAULT_SCHEMA));
+        Assert.assertTrue(value.equals(schema));
         value = rs.getString(colCatIndex);
         Assert.assertTrue(value.equals(RocksetConnection.DEFAULT_CATALOG));
         value = rs.getString(colNameIndex);
@@ -253,7 +278,7 @@ public void testGetColumns() throws Exception {
       }
       Assert.assertEquals(found, numColumns);
     } finally {
-      cleanup(colls, stmt, conn);
+      cleanup(colls, stmt, conn, schema);
     }
   }
 
@@ -271,8 +296,8 @@ private void validatePagination(Statement stmt, int pageSize, String query, Set<
     Assert.assertEquals(cities.size(), 11);
   }
 
-  @Test
-  public void testPagination() throws Exception {
+  @Test(dataProvider = "schemas")
+  public void testPagination(String schema) throws Exception {
     System.out.println("testPagination");
     Connection conn = null;
     Statement stmt = null;
@@ -286,16 +311,17 @@ public void testPagination() throws Exception {
 
     try {
       // create collection
-      createCollections(colls);
+      createCollections(colls, schema);
 
       // wait for all leaves to be ready to serve
-      waitCollections(colls);
+      waitCollections(colls, schema);
 
       // upload one file to collection and wait for it to be visible
-      uploadFile(collectionName, "src/test/resources/pagination_data.json", null);
-      waitNumberDocs(collectionName, 1);
+      uploadFile(collectionName, "src/test/resources/pagination_data.json", null, schema);
+      waitNumberDocs(collectionName, 1, schema);
 
       conn = DriverManager.getConnection(DB_URL, property);
+      conn.setSchema(schema);
 
       String query = String.format("SELECT city FROM %s", collectionName);
       stmt = conn.createStatement();
@@ -307,17 +333,17 @@ public void testPagination() throws Exception {
     } catch (SQLException e) {
       System.out.println("Exception: " + e);
     } finally {
-      cleanup(colls, stmt, conn);
+      cleanup(colls, stmt, conn, schema);
     }
   }
 
   //
   // Invoked by all unit tests at the end to cleanup its mess
   //
-  private void cleanup(List<String> colls, Statement stmt, Connection conn) {
+  private void cleanup(List<String> colls, Statement stmt, Connection conn, String schema) {
     try {
       if (colls != null) {
-        deleteCollections(colls);
+        deleteCollections(colls, schema);
       }
     } catch (Exception e) {
       // nothing we can do
@@ -353,9 +379,9 @@ private List<String> generateCollectionNames(int num) {
   //
   // Wait for all collections to be ready
   //
-  private void waitCollections(List<String> names) throws Exception {
+  private void waitCollections(List<String> names, String workspace) throws Exception {
     for (String name : names) {
-      String sql = "describe \"" + name + "\";";
+      String sql = "describe " + workspace + "." + name + ";";
       while (true) {
         try {
           QueryRequestSql qs = new QueryRequestSql().query(sql);
@@ -373,10 +399,11 @@
   // Create the list of collections. Fail if any of the collection already exists
   //
-  private void createCollections(List<String> names) throws Exception {
+  private void createCollections(List<String> names, String schema) throws Exception {
+    createWorkspaceIfNonExistent(schema);
     for (String name : names) {
       CreateCollectionRequest request = new CreateCollectionRequest().name(name);
-      CreateCollectionResponse response = testClient.collections.create("commons", request);
+      CreateCollectionResponse response = testClient.collections.create(schema, request);
       Assert.assertEquals(response.getData().getName(), name);
 
       Assert.assertEquals(response.getData().getStatus(), Collection.StatusEnum.CREATED);
@@ -386,10 +413,10 @@
   //
   // Delete all specified collections
   //
-  private void deleteCollections(List<String> names) throws Exception {
+  private void deleteCollections(List<String> names, String schema) throws Exception {
     for (String name : names) {
       DeleteCollectionResponse deleteCollectionResponse =
-          testClient.collections.delete("commons", name);
+          testClient.collections.delete(schema, name);
       Assert.assertEquals(deleteCollectionResponse.getData().getName(), name);
       // Assert.assertEquals(deleteCollectionResponse.getData().getStatus(),
       //     Collection.StatusEnum.DELETED);
@@ -399,7 +426,7 @@ private void deleteCollections(List<String> names) throws Exception {
   //
   // Upload the specified file to the specified collection
   //
-  void uploadFile(String collectionName, String path, String params) throws IOException {
+  void uploadFile(String collectionName, String path, String params, String workspace) throws IOException {
     final File file = new File(path);
     final MediaType mt = MediaType.parse("text/json; charset=utf-8");
 
@@ -419,8 +446,8 @@ void uploadFile(String collectionName, String path, String params) throws IOExce
     // send the file upload request
     String url =
         String.format(
-            "https://%s/v1/orgs/self/ws/commons/collections/%s/uploads",
-            property.getProperty("apiServer"), collectionName);
+            "https://%s/v1/orgs/self/ws/%s/collections/%s/uploads",
+            property.getProperty("apiServer"), workspace, collectionName);
     System.out.println("Uploading test file to " + url);
 
     Request request =
@@ -447,8 +474,8 @@ void uploadFile(String collectionName, String path, String params) throws IOExce
   //
   // Wait for the specified docs to appear in the collection
   //
-  private void waitNumberDocs(String collectionName, int expectedDocs) throws Exception {
-    String sql = "select count(*) from \"" + collectionName + "\";";
+  private void waitNumberDocs(String collectionName, int expectedDocs, String workspace) throws Exception {
+    String sql = "select count(*) from " + workspace + "." + collectionName + ";";
     int found = 0;
     while (found < expectedDocs) {
       try {
@@ -468,4 +495,47 @@ private void waitNumberDocs(String collectionName, int expectedDocs) throws Exce
       Thread.sleep(1000);
     }
   }
+
+  private void createWorkspaceIfNonExistent(String workspace) throws Exception {
+    ListWorkspacesResponse workspaces = testClient.workspaces.list();
+    if (workspaces.getData().stream()
+        .map(Workspace::getName)
+        .noneMatch(ws -> ws.equals(workspace))) {
+      CreateWorkspaceRequest request = new CreateWorkspaceRequest().name(workspace);
+      testClient.workspaces.create(request);
+    }
+  }
+
+  private void deleteAllJDBCTestHarnessWorkspaces() throws Exception {
+    List<Collection> collections = testClient.collections.list().getData();
+
+    for (Collection collection : collections) {
+      if (collection.getWorkspace().startsWith(TEST_WORKSPACE_PREFIX)) {
+        DeleteCollectionResponse resp = retryOnApiException(() -> testClient.collections.delete(collection.getWorkspace(), collection.getName()));
+        System.out.println("Deleted collection " + resp.getData().getName());
+      }
+    }
+
+    Set<String> workspaces = new HashSet<>();
+    testClient.workspaces.list().getData().stream()
+        .map(Workspace::getName)
+        .filter(ws -> ws.startsWith(TEST_WORKSPACE_PREFIX))
+        .forEach(workspaces::add);
+
+    for (String ws : workspaces) {
+      DeleteWorkspaceResponse resp = retryOnApiException(() -> testClient.workspaces.delete(ws));
+      System.out.println("Deleted workspace " + resp.getData().getName());
+      // Collection deletion is async, wait for this to finish before workspace can be deleted
+    }
+  }
+
+  @DataProvider(name = "schemas")
+  public Object[][] schemas() {
+    return new Object[][]{
+      // commons is default
+      {"commons"},
+      {testWorkspace},
+    };
+
+  }
 }
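Reviewer note (not part of the patch): a minimal sketch of how the schema-to-workspace mapping added in RocksetStatement is expected to be driven from client code. Connection.setSchema(), the OPTION(default_workspace='...') rewrite, and the apiKey/apiServer property keys come from the diff; the ReviewerSketch class, the ROCKSET_APISERVER and ROCKSET_JDBC_URL environment variables, and the my_workspace/my_collection names are hypothetical placeholders for illustration.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Properties;

public class ReviewerSketch { // hypothetical driver program, illustrative only
  public static void main(String[] args) throws Exception {
    // Same property keys the test suite passes to DriverManager.getConnection(DB_URL, property).
    Properties props = new Properties();
    props.setProperty("apiKey", System.getenv("ROCKSET_APIKEY"));
    props.setProperty("apiServer", System.getenv("ROCKSET_APISERVER")); // assumed env var

    // Placeholder for the DB_URL constant the tests use (assumed env var).
    String dbUrl = System.getenv("ROCKSET_JDBC_URL");

    try (Connection conn = DriverManager.getConnection(dbUrl, props)) {
      // With the default "commons" schema the SQL is sent to Rockset unchanged.
      // After setSchema, RocksetStatement prefixes each query with
      // OPTION(default_workspace='<schema>'), so unqualified collection names
      // resolve inside that workspace.
      conn.setSchema("my_workspace"); // hypothetical workspace name

      try (Statement stmt = conn.createStatement();
          ResultSet rs = stmt.executeQuery("SELECT COUNT(*) FROM my_collection")) {
        while (rs.next()) {
          System.out.println("count = " + rs.getLong(1));
        }
      }
    }
  }
}

The TestSchema data provider exercises both paths: the default "commons" schema, where the query is sent as written, and a random jdbc_test_workspace_* workspace, where the OPTION prefix is applied.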