diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-common/pom.xml b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-common/pom.xml
index a4b3b6a187be..7ef3aa006524 100644
--- a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-common/pom.xml
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-common/pom.xml
@@ -25,9 +25,8 @@
     <packaging>jar</packaging>

-
-
+
     <dependencies>
         <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-record</artifactId>
@@ -59,6 +58,10 @@
             <artifactId>nifi-hadoop-utils</artifactId>
             <version>2.0.0-SNAPSHOT</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-dbcp-service-api</artifactId>
+        </dependency>
     </dependencies>
@@ -86,6 +89,21 @@
             <artifactId>iceberg-orc</artifactId>
             <version>${iceberg.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-aws</artifactId>
+            <version>${iceberg.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-azure</artifactId>
+            <version>${iceberg.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-gcp</artifactId>
+            <version>${iceberg.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-client</artifactId>
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/catalog/IcebergCatalogFactory.java b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/catalog/IcebergCatalogFactory.java
index 83f765df4a11..19da644afb0a 100644
--- a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/catalog/IcebergCatalogFactory.java
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/catalog/IcebergCatalogFactory.java
@@ -19,16 +19,25 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.hadoop.HadoopCatalog;
 import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.jdbc.JdbcCatalog;
+import org.apache.iceberg.jdbc.JdbcClientPool;
+import org.apache.nifi.dbcp.DBCPService;
 import org.apache.nifi.services.iceberg.IcebergCatalogProperty;
 import org.apache.nifi.services.iceberg.IcebergCatalogService;

 import java.util.HashMap;
 import java.util.Map;
+import java.util.function.Function;

 import static org.apache.nifi.processors.iceberg.IcebergUtils.getConfigurationFromFiles;
+import static org.apache.nifi.services.iceberg.IcebergCatalogProperty.CATALOG_NAME;
+import static org.apache.nifi.services.iceberg.IcebergCatalogProperty.FILE_IO_IMPLEMENTATION;
+import static org.apache.nifi.services.iceberg.IcebergCatalogProperty.CLIENT_POOL_SERVICE;
 import static org.apache.nifi.services.iceberg.IcebergCatalogProperty.METASTORE_URI;
 import static org.apache.nifi.services.iceberg.IcebergCatalogProperty.WAREHOUSE_LOCATION;
@@ -44,6 +53,7 @@ public Catalog create() {
         return switch (catalogService.getCatalogType()) {
             case HIVE -> initHiveCatalog(catalogService);
             case HADOOP -> initHadoopCatalog(catalogService);
+            case JDBC -> initJdbcCatalog(catalogService);
         };
     }
@@ -55,15 +65,15 @@ private Catalog initHiveCatalog(IcebergCatalogService catalogService) {
             catalog.setConf(configuration);
         }

-        final Map<IcebergCatalogProperty, String> catalogProperties = catalogService.getCatalogProperties();
-        final Map<String, String> properties = new HashMap<>();
+        final Map<IcebergCatalogProperty, Object> catalogProperties = catalogService.getCatalogProperties();
+        final Map<String, String> properties = new HashMap<>();

         if (catalogProperties.containsKey(METASTORE_URI)) {
-            properties.put(CatalogProperties.URI, catalogProperties.get(METASTORE_URI));
+            properties.put(CatalogProperties.URI, (String) catalogProperties.get(METASTORE_URI));
         }

         if (catalogProperties.containsKey(WAREHOUSE_LOCATION)) {
-            properties.put(CatalogProperties.WAREHOUSE_LOCATION, catalogProperties.get(WAREHOUSE_LOCATION));
+            properties.put(CatalogProperties.WAREHOUSE_LOCATION, (String) catalogProperties.get(WAREHOUSE_LOCATION));
         }

         catalog.initialize("hive-catalog", properties);
@@ -71,8 +81,8 @@ private Catalog initHiveCatalog(IcebergCatalogService catalogService) {
     }

     private Catalog initHadoopCatalog(IcebergCatalogService catalogService) {
-        final Map<IcebergCatalogProperty, String> catalogProperties = catalogService.getCatalogProperties();
-        final String warehousePath = catalogProperties.get(WAREHOUSE_LOCATION);
+        final Map<IcebergCatalogProperty, Object> catalogProperties = catalogService.getCatalogProperties();
+        final String warehousePath = (String) catalogProperties.get(WAREHOUSE_LOCATION);

         if (catalogService.getConfigFilePaths() != null) {
             return new HadoopCatalog(getConfigurationFromFiles(catalogService.getConfigFilePaths()), warehousePath);
@@ -80,4 +90,22 @@ private Catalog initHadoopCatalog(IcebergCatalogService catalogService) {
             return new HadoopCatalog(new Configuration(), warehousePath);
         }
     }
+
+    private Catalog initJdbcCatalog(IcebergCatalogService catalogService) {
+        final Map<IcebergCatalogProperty, Object> catalogProperties = catalogService.getCatalogProperties();
+        final Map<String, String> properties = new HashMap<>();
+        properties.put(CatalogProperties.URI, "");
+        properties.put(CatalogProperties.WAREHOUSE_LOCATION, (String) catalogProperties.get(WAREHOUSE_LOCATION));
+
+        final Configuration configuration = getConfigurationFromFiles(catalogService.getConfigFilePaths());
+        final DBCPService dbcpService = (DBCPService) catalogProperties.get(CLIENT_POOL_SERVICE);
+
+        final Function<Map<String, String>, JdbcClientPool> clientPoolBuilder = props -> new IcebergJdbcClientPool(props, dbcpService);
+        final Function<Map<String, String>, FileIO> ioBuilder = props -> CatalogUtil.loadFileIO((String) catalogProperties.get(FILE_IO_IMPLEMENTATION), props, configuration);
+
+        JdbcCatalog catalog = new JdbcCatalog(ioBuilder, clientPoolBuilder, false);
+        catalog.setConf(configuration);
+        catalog.initialize((String) catalogProperties.get(CATALOG_NAME), properties);
+        return catalog;
+    }
 }
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/catalog/IcebergJdbcClientPool.java b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/catalog/IcebergJdbcClientPool.java
new file mode 100644
index 000000000000..1b3fa0f67359
--- /dev/null
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/catalog/IcebergJdbcClientPool.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg.catalog;
+
+import org.apache.iceberg.jdbc.JdbcClientPool;
+import org.apache.nifi.dbcp.DBCPService;
+
+import java.sql.Connection;
+import java.util.Map;
+
+public class IcebergJdbcClientPool extends JdbcClientPool {
+
+    private final DBCPService dbcpService;
+
+    public IcebergJdbcClientPool(Map<String, String> properties, DBCPService dbcpService) {
+        super("", properties);
+        this.dbcpService = dbcpService;
+    }
+
+    @Override
+    protected Connection newClient() {
+        return dbcpService.getConnection();
+    }
+
+}
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/pom.xml b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/pom.xml
index 1ccf8f2c8dd9..e16dd764b815 100644
--- a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/pom.xml
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/pom.xml
@@ -318,6 +318,18 @@
             <artifactId>nifi-kerberos-user-service-api</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-iceberg-services</artifactId>
+            <version>2.0.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-dbcp-service</artifactId>
+            <version>2.0.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
\ No newline at end of file
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/AbstractTestPutIceberg.java b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/AbstractTestPutIceberg.java
new file mode 100644
index 000000000000..cd1c5f37c8be
--- /dev/null
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/AbstractTestPutIceberg.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import org.apache.avro.Schema;
+import org.apache.commons.io.IOUtils;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.types.Types;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.TestRunner;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+import static org.apache.nifi.processors.iceberg.util.IcebergTestUtils.NAMESPACE;
+import static org.apache.nifi.processors.iceberg.util.IcebergTestUtils.RECORD_READER_SERVICE;
+import static org.apache.nifi.processors.iceberg.util.IcebergTestUtils.createTemporaryDirectory;
+
+public class AbstractTestPutIceberg {
+
+    protected static final String TABLE_NAME = "users";
+
+    protected static final TableIdentifier TABLE_IDENTIFIER = TableIdentifier.of(NAMESPACE, TABLE_NAME);
+
+    protected static final org.apache.iceberg.Schema USER_SCHEMA = new org.apache.iceberg.Schema(
+            Types.NestedField.required(1, "id", Types.IntegerType.get()),
+            Types.NestedField.required(2, "name", Types.StringType.get()),
+            Types.NestedField.required(3, "department", Types.StringType.get())
+    );
+
+    protected TestRunner runner;
+    protected PutIceberg processor;
+    protected Catalog catalog;
+    protected String warehousePath;
+    protected static Schema inputSchema;
+
+    protected void initRecordReader() throws InitializationException {
+        final MockRecordParser readerFactory = new MockRecordParser();
+        final RecordSchema recordSchema = AvroTypeUtil.createSchema(inputSchema);
+
+        for (RecordField recordField : recordSchema.getFields()) {
+            readerFactory.addSchemaField(recordField);
+        }
+
+        readerFactory.addRecord(0, "John", "Finance");
+        readerFactory.addRecord(1, "Jill", "Finance");
+        readerFactory.addRecord(2, "James", "Marketing");
+        readerFactory.addRecord(3, "Joana", "Sales");
+
+        runner.addControllerService(RECORD_READER_SERVICE, readerFactory);
+        runner.enableControllerService(readerFactory);
+
+        runner.setProperty(PutIceberg.RECORD_READER, RECORD_READER_SERVICE);
+    }
+
+    @BeforeAll
+    public static void initSchema() throws IOException {
+        final String avroSchema = IOUtils.toString(Files.newInputStream(Paths.get("src/test/resources/user.avsc")), StandardCharsets.UTF_8);
+        inputSchema = new Schema.Parser().parse(avroSchema);
+    }
+
+    @BeforeEach
+    public void setUp() {
+        warehousePath = createTemporaryDirectory().getAbsolutePath();
+        processor = new PutIceberg();
+    }
+
+    @AfterEach
+    public void tearDown() {
+        catalog.dropTable(TABLE_IDENTIFIER);
+    }
+}
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestDataFileActions.java b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestDataFileActions.java
index e3b11a487422..6bdd7dd5f7c9 100644
--- a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestDataFileActions.java
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestDataFileActions.java
@@ -17,25 +17,24 @@
  */
 package org.apache.nifi.processors.iceberg;

+import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.hadoop.HadoopCatalog;
 import org.apache.iceberg.io.TaskWriter;
 import org.apache.iceberg.io.WriteResult;
 import org.apache.iceberg.types.Types;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processors.iceberg.catalog.IcebergCatalogFactory;
-import org.apache.nifi.processors.iceberg.catalog.TestHadoopCatalogService;
 import org.apache.nifi.processors.iceberg.converter.IcebergRecordConverter;
 import org.apache.nifi.processors.iceberg.writer.IcebergTaskWriterFactory;
 import org.apache.nifi.serialization.SimpleRecordSchema;
@@ -52,6 +51,7 @@
 import org.junit.jupiter.api.condition.DisabledOnOs;
 import org.mockito.Mockito;

+import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Paths;
@@ -64,6 +64,7 @@
 import static org.apache.nifi.processors.iceberg.PutIceberg.MAXIMUM_COMMIT_WAIT_TIME;
 import static org.apache.nifi.processors.iceberg.PutIceberg.MINIMUM_COMMIT_WAIT_TIME;
 import static org.apache.nifi.processors.iceberg.PutIceberg.NUMBER_OF_COMMIT_RETRIES;
+import static org.apache.nifi.processors.iceberg.util.IcebergTestUtils.createTemporaryDirectory;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.condition.OS.WINDOWS;
 import static org.mockito.Mockito.doThrow;
@@ -196,10 +197,9 @@ public void testMaxCommitDurationExceeded() {
         verify(appender, times(2)).commit();
     }

-    private Table initCatalog() throws IOException {
-        TestHadoopCatalogService catalogService = new TestHadoopCatalogService();
-        IcebergCatalogFactory catalogFactory = new IcebergCatalogFactory(catalogService);
-        Catalog catalog = catalogFactory.create();
+    private Table initCatalog() {
+        final File warehousePath = createTemporaryDirectory();
+        final HadoopCatalog catalog = new HadoopCatalog(new Configuration(), warehousePath.getAbsolutePath());

         return catalog.createTable(TABLE_IDENTIFIER, ABORT_SCHEMA, PartitionSpec.unpartitioned());
     }
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergCustomValidation.java b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergCustomValidation.java
index d6cf5bb91a25..f1ab1d398328 100644
--- a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergCustomValidation.java
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergCustomValidation.java
@@ -18,17 +18,17 @@
 package org.apache.nifi.processors.iceberg;

 import org.apache.nifi.kerberos.KerberosUserService;
-import org.apache.nifi.processors.iceberg.catalog.TestHiveCatalogService;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.services.iceberg.HiveCatalogService;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;

-import java.util.Collections;
-import java.util.List;
-
+import static org.apache.nifi.services.iceberg.AbstractCatalogService.HADOOP_CONFIGURATION_RESOURCES;
+import static org.apache.nifi.services.iceberg.HiveCatalogService.METASTORE_URI;
+import static org.apache.nifi.services.iceberg.HiveCatalogService.WAREHOUSE_LOCATION;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -58,10 +58,13 @@ private void initRecordReader() throws InitializationException {
         runner.setProperty(PutIceberg.RECORD_READER, RECORD_READER_NAME);
     }

-    private void initCatalogService(List<String> configFilePaths) throws InitializationException {
-        TestHiveCatalogService catalogService = new TestHiveCatalogService.Builder().withConfigFilePaths(configFilePaths).build();
-
+    private void initCatalogService(String configFilePaths) throws InitializationException {
+        final HiveCatalogService catalogService = new HiveCatalogService();
         runner.addControllerService(CATALOG_SERVICE_NAME, catalogService);
+        runner.setProperty(catalogService, METASTORE_URI, "test-metastore");
+        runner.setProperty(catalogService, WAREHOUSE_LOCATION, "test-warehouse");
+        runner.setProperty(catalogService, HADOOP_CONFIGURATION_RESOURCES, configFilePaths);
+
         runner.enableControllerService(catalogService);

         runner.setProperty(PutIceberg.CATALOG, CATALOG_SERVICE_NAME);
@@ -80,7 +83,7 @@ private void initKerberosUserService() throws InitializationException {
     @Test
     public void testCustomValidateWithKerberosSecurityConfigAndWithoutKerberosUserService() throws InitializationException {
         initRecordReader();
-        initCatalogService(Collections.singletonList("src/test/resources/secured-core-site.xml"));
+        initCatalogService("src/test/resources/secured-core-site.xml");

         runner.setProperty(PutIceberg.CATALOG_NAMESPACE, CATALOG_NAMESPACE);
         runner.setProperty(PutIceberg.TABLE_NAME, TABLE_NAME);
@@ -90,7 +93,7 @@ public void testCustomValidateWithKerberosSecurityConfigAndWithoutKerberosUserSe
     @Test
     public void testCustomValidateWithKerberosSecurityConfigAndKerberosUserService() throws InitializationException {
         initRecordReader();
-        initCatalogService(Collections.singletonList("src/test/resources/secured-core-site.xml"));
+        initCatalogService("src/test/resources/secured-core-site.xml");

         initKerberosUserService();
@@ -102,7 +105,7 @@ public void testCustomValidateWithKerberosSecurityConfigAndKerberosUserService()
     @Test
     public void testCustomValidateWithoutKerberosSecurityConfigAndKerberosUserService() throws InitializationException {
         initRecordReader();
-        initCatalogService(Collections.singletonList("src/test/resources/unsecured-core-site.xml"));
+        initCatalogService("src/test/resources/unsecured-core-site.xml");

         runner.setProperty(PutIceberg.CATALOG_NAMESPACE, CATALOG_NAMESPACE);
         runner.setProperty(PutIceberg.TABLE_NAME, TABLE_NAME);
@@ -112,7 +115,7 @@ public void testCustomValidateWithoutKerberosSecurityConfigAndKerberosUserServic
     @Test
     public void testCustomValidateWithoutKerberosSecurityConfigAndWithKerberosUserService() throws InitializationException {
         initRecordReader();
-        initCatalogService(Collections.singletonList("src/test/resources/unsecured-core-site.xml"));
+        initCatalogService("src/test/resources/unsecured-core-site.xml");

         initKerberosUserService();
@@ -124,7 +127,7 @@ public void testCustomValidateWithoutKerberosSecurityConfigAndWithKerberosUserSe
     @Test
     public void testInvalidSnapshotSummaryDynamicProperty() throws InitializationException {
         initRecordReader();
-        initCatalogService(Collections.singletonList("src/test/resources/unsecured-core-site.xml"));
+        initCatalogService("src/test/resources/unsecured-core-site.xml");

         runner.setProperty(PutIceberg.CATALOG_NAMESPACE, CATALOG_NAMESPACE);
         runner.setProperty(PutIceberg.TABLE_NAME, TABLE_NAME);
@@ -136,7 +139,7 @@ public void testInvalidSnapshotSummaryDynamicProperty() throws InitializationExc
     @Test
     public void testValidSnapshotSummaryDynamicProperty() throws InitializationException {
         initRecordReader();
-        initCatalogService(Collections.singletonList("src/test/resources/unsecured-core-site.xml"));
+        initCatalogService("src/test/resources/unsecured-core-site.xml");

         runner.setProperty(PutIceberg.CATALOG_NAMESPACE, CATALOG_NAMESPACE);
         runner.setProperty(PutIceberg.TABLE_NAME, TABLE_NAME);
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHadoopCatalog.java b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHadoopCatalog.java
index 484dfc79ccfe..6d120ea2b558 100644
--- a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHadoopCatalog.java
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHadoopCatalog.java
@@ -24,24 +24,25 @@
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.catalog.Catalog;
-import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.types.Types;
 import org.apache.nifi.avro.AvroTypeUtil;
 import org.apache.nifi.processors.iceberg.catalog.IcebergCatalogFactory;
-import org.apache.nifi.processors.iceberg.catalog.TestHadoopCatalogService;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.serialization.record.MockRecordParser;
 import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.services.iceberg.HadoopCatalogService;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.condition.DisabledOnOs;

+import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
@@ -54,20 +55,21 @@
 import java.util.Map;

 import static org.apache.nifi.processors.iceberg.PutIceberg.ICEBERG_RECORD_COUNT;
+import static org.apache.nifi.processors.iceberg.util.IcebergTestUtils.CATALOG_NAME;
+import static org.apache.nifi.processors.iceberg.util.IcebergTestUtils.CATALOG_SERVICE;
+import static org.apache.nifi.processors.iceberg.util.IcebergTestUtils.NAMESPACE;
+import static org.apache.nifi.processors.iceberg.util.IcebergTestUtils.RECORD_READER_SERVICE;
+import static org.apache.nifi.processors.iceberg.util.IcebergTestUtils.createTemporaryDirectory;
 import static org.apache.nifi.processors.iceberg.util.IcebergTestUtils.validateNumberOfDataFiles;
 import static org.apache.nifi.processors.iceberg.util.IcebergTestUtils.validatePartitionFolders;
+import static org.apache.nifi.services.iceberg.HadoopCatalogService.WAREHOUSE_PATH;
 import static org.junit.jupiter.api.condition.OS.WINDOWS;

 public class TestPutIcebergWithHadoopCatalog {

-    private TestRunner runner;
-    private PutIceberg processor;
-    private Schema inputSchema;
-    private Catalog catalog;
+    protected static final String TABLE_NAME = "date_test";

-    private static final Namespace NAMESPACE = Namespace.of("default");
-
-    private static final TableIdentifier TABLE_IDENTIFIER = TableIdentifier.of(NAMESPACE, "date");
+    private static final TableIdentifier TABLE_IDENTIFIER = TableIdentifier.of(NAMESPACE, TABLE_NAME);

     private static final org.apache.iceberg.Schema DATE_SCHEMA = new org.apache.iceberg.Schema(
             Types.NestedField.required(1, "timeMicros", Types.TimeType.get()),
@@ -75,17 +77,26 @@ public class TestPutIcebergWithHadoopCatalog {
             Types.NestedField.required(3, "date", Types.DateType.get())
     );

-    @BeforeEach
-    public void setUp() throws Exception {
-        String avroSchema = IOUtils.toString(Files.newInputStream(Paths.get("src/test/resources/date.avsc")), StandardCharsets.UTF_8);
+    private TestRunner runner;
+    private PutIceberg processor;
+    private static Schema inputSchema;
+    private Catalog catalog;
+
+
+    @BeforeAll
+    public static void initSchema() throws IOException {
+        final String avroSchema = IOUtils.toString(Files.newInputStream(Paths.get("src/test/resources/date.avsc")), StandardCharsets.UTF_8);
         inputSchema = new Schema.Parser().parse(avroSchema);
+    }

+    @BeforeEach
+    public void setUp() {
         processor = new PutIceberg();
     }

     private void initRecordReader() throws InitializationException {
-        MockRecordParser readerFactory = new MockRecordParser();
-        RecordSchema recordSchema = AvroTypeUtil.createSchema(inputSchema);
+        final MockRecordParser readerFactory = new MockRecordParser();
+        final RecordSchema recordSchema = AvroTypeUtil.createSchema(inputSchema);

         for (RecordField recordField : recordSchema.getFields()) {
             readerFactory.addSchemaField(recordField.getFieldName(), recordField.getDataType().getFieldType(), recordField.isNullable());
@@ -96,27 +107,28 @@ private void initRecordReader() throws InitializationException {
         readerFactory.addRecord(Time.valueOf("15:30:30"), Timestamp.valueOf("2016-01-02 15:30:30.800"), Date.valueOf("2016-01-02"));
         readerFactory.addRecord(Time.valueOf("15:30:30"), Timestamp.valueOf("2017-01-10 15:30:30.800"), Date.valueOf("2017-01-10"));

-        runner.addControllerService("mock-reader-factory", readerFactory);
+        runner.addControllerService(RECORD_READER_SERVICE, readerFactory);
         runner.enableControllerService(readerFactory);

-        runner.setProperty(PutIceberg.RECORD_READER, "mock-reader-factory");
+        runner.setProperty(PutIceberg.RECORD_READER, RECORD_READER_SERVICE);
     }

-    private void initCatalog(PartitionSpec spec, FileFormat fileFormat) throws InitializationException, IOException {
-        TestHadoopCatalogService catalogService = new TestHadoopCatalogService();
-        IcebergCatalogFactory catalogFactory = new IcebergCatalogFactory(catalogService);
+    private void initCatalog(PartitionSpec spec, FileFormat fileFormat) throws InitializationException {
+        final File warehousePath = createTemporaryDirectory();
+        final HadoopCatalogService catalogService = new HadoopCatalogService();
+        runner.addControllerService(CATALOG_SERVICE, catalogService);
+        runner.setProperty(catalogService, WAREHOUSE_PATH, warehousePath.getAbsolutePath());
+        runner.enableControllerService(catalogService);
+
+        final IcebergCatalogFactory catalogFactory = new IcebergCatalogFactory(catalogService);
         catalog = catalogFactory.create();

-        Map<String, String> tableProperties = new HashMap<>();
-        tableProperties.put(TableProperties.FORMAT_VERSION, "2");
+        final Map<String, String> tableProperties = new HashMap<>();
         tableProperties.put(TableProperties.DEFAULT_FILE_FORMAT, fileFormat.name());

         catalog.createTable(TABLE_IDENTIFIER, DATE_SCHEMA, spec, tableProperties);

-        runner.addControllerService("catalog-service", catalogService);
-        runner.enableControllerService(catalogService);
-
-        runner.setProperty(PutIceberg.CATALOG, "catalog-service");
+        runner.setProperty(PutIceberg.CATALOG, CATALOG_SERVICE);
     }

     @DisabledOnOs(WINDOWS)
@@ -129,16 +141,16 @@ public void onTriggerYearTransform() throws Exception {
         runner = TestRunners.newTestRunner(processor);
         initRecordReader();
         initCatalog(spec, FileFormat.PARQUET);
-        runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "default");
-        runner.setProperty(PutIceberg.TABLE_NAME, "date");
+        runner.setProperty(PutIceberg.CATALOG_NAMESPACE, CATALOG_NAME);
+        runner.setProperty(PutIceberg.TABLE_NAME, TABLE_NAME);
         runner.setValidateExpressionUsage(false);
         runner.enqueue(new byte[0]);
         runner.run();

-        Table table = catalog.loadTable(TABLE_IDENTIFIER);
+        final Table table = catalog.loadTable(TABLE_IDENTIFIER);

         runner.assertTransferCount(PutIceberg.REL_SUCCESS, 1);
-        MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutIceberg.REL_SUCCESS).getFirst();
+        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutIceberg.REL_SUCCESS).getFirst();

         Assertions.assertTrue(table.spec().isPartitioned());
         Assertions.assertEquals("4", flowFile.getAttribute(ICEBERG_RECORD_COUNT));
@@ -149,23 +161,23 @@ public void onTriggerYearTransform() throws Exception {
     @DisabledOnOs(WINDOWS)
     @Test
     public void onTriggerMonthTransform() throws Exception {
-        PartitionSpec spec = PartitionSpec.builderFor(DATE_SCHEMA)
+        final PartitionSpec spec = PartitionSpec.builderFor(DATE_SCHEMA)
                 .month("timestampMicros")
                 .build();

         runner = TestRunners.newTestRunner(processor);
         initRecordReader();
         initCatalog(spec, FileFormat.ORC);
-        runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "default");
-        runner.setProperty(PutIceberg.TABLE_NAME, "date");
+        runner.setProperty(PutIceberg.CATALOG_NAMESPACE, CATALOG_NAME);
+        runner.setProperty(PutIceberg.TABLE_NAME, TABLE_NAME);
         runner.setValidateExpressionUsage(false);
         runner.enqueue(new byte[0]);
         runner.run();

-        Table table = catalog.loadTable(TABLE_IDENTIFIER);
+        final Table table = catalog.loadTable(TABLE_IDENTIFIER);

         runner.assertTransferCount(PutIceberg.REL_SUCCESS, 1);
-        MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutIceberg.REL_SUCCESS).getFirst();
+        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutIceberg.REL_SUCCESS).getFirst();

         Assertions.assertTrue(table.spec().isPartitioned());
         Assertions.assertEquals("4", flowFile.getAttribute(ICEBERG_RECORD_COUNT));
@@ -177,23 +189,23 @@ public void onTriggerMonthTransform() throws Exception {
     @DisabledOnOs(WINDOWS)
     @Test
     public void onTriggerDayTransform() throws Exception {
-        PartitionSpec spec = PartitionSpec.builderFor(DATE_SCHEMA)
+        final PartitionSpec spec = PartitionSpec.builderFor(DATE_SCHEMA)
                 .day("timestampMicros")
                 .build();

         runner = TestRunners.newTestRunner(processor);
         initRecordReader();
         initCatalog(spec, FileFormat.AVRO);
-        runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "default");
-        runner.setProperty(PutIceberg.TABLE_NAME, "date");
+        runner.setProperty(PutIceberg.CATALOG_NAMESPACE, CATALOG_NAME);
+        runner.setProperty(PutIceberg.TABLE_NAME, TABLE_NAME);
         runner.setValidateExpressionUsage(false);
         runner.enqueue(new byte[0]);
         runner.run();

-        Table table = catalog.loadTable(TABLE_IDENTIFIER);
+        final Table table = catalog.loadTable(TABLE_IDENTIFIER);

         runner.assertTransferCount(PutIceberg.REL_SUCCESS, 1);
-        MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutIceberg.REL_SUCCESS).getFirst();
+        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutIceberg.REL_SUCCESS).getFirst();

         Assertions.assertTrue(table.spec().isPartitioned());
         Assertions.assertEquals("4", flowFile.getAttribute(ICEBERG_RECORD_COUNT));
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java
index d2b942cf58d7..21ff765c1fd0 100644
--- a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java
@@ -17,41 +17,25 @@
  */
 package org.apache.nifi.processors.iceberg;

-import org.apache.avro.Schema;
-import org.apache.commons.io.IOUtils;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.catalog.Catalog;
-import org.apache.iceberg.catalog.Namespace;
-import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.data.Record;
-import org.apache.iceberg.types.Types;
-import org.apache.nifi.avro.AvroTypeUtil;
 import org.apache.nifi.hive.metastore.ThriftMetastore;
 import org.apache.nifi.processors.iceberg.catalog.IcebergCatalogFactory;
-import org.apache.nifi.processors.iceberg.catalog.TestHiveCatalogService;
 import org.apache.nifi.processors.iceberg.util.IcebergTestUtils;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.serialization.record.MockRecordParser;
-import org.apache.nifi.serialization.record.RecordField;
-import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.services.iceberg.HiveCatalogService;
 import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.condition.DisabledOnOs;
 import org.junit.jupiter.api.extension.RegisterExtension;

 import java.net.URI;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Paths;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -60,130 +44,43 @@
 import static org.apache.nifi.processors.iceberg.PutIceberg.ICEBERG_RECORD_COUNT;
 import static org.apache.nifi.processors.iceberg.PutIceberg.ICEBERG_SNAPSHOT_SUMMARY_FLOWFILE_UUID;
+import static org.apache.nifi.processors.iceberg.util.IcebergTestUtils.CATALOG_NAME;
 import static org.apache.nifi.processors.iceberg.util.IcebergTestUtils.validateData;
 import static org.apache.nifi.processors.iceberg.util.IcebergTestUtils.validateNumberOfDataFiles;
 import static org.apache.nifi.processors.iceberg.util.IcebergTestUtils.validatePartitionFolders;
+import static org.apache.nifi.services.iceberg.HiveCatalogService.METASTORE_URI;
+import static org.apache.nifi.services.iceberg.HiveCatalogService.WAREHOUSE_LOCATION;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.condition.OS.WINDOWS;

 @DisabledOnOs(WINDOWS)
-public class TestPutIcebergWithHiveCatalog {
-
-    private TestRunner runner;
-    private PutIceberg processor;
-    private Schema inputSchema;
-    private Catalog catalog;
+public class TestPutIcebergWithHiveCatalog extends AbstractTestPutIceberg {

     @RegisterExtension
     public static ThriftMetastore metastore = new ThriftMetastore();

-    private static final String CATALOG_NAME = "test_metastore";
-    private static final String TABLE_NAME = "users";
-
-    private static final Namespace NAMESPACE = Namespace.of(CATALOG_NAME);
-
-    private static final TableIdentifier TABLE_IDENTIFIER = TableIdentifier.of(NAMESPACE, TABLE_NAME);
-
-    private static final org.apache.iceberg.Schema USER_SCHEMA = new org.apache.iceberg.Schema(
-            Types.NestedField.required(1, "id", Types.IntegerType.get()),
-            Types.NestedField.required(2, "name", Types.StringType.get()),
-            Types.NestedField.required(3, "department", Types.StringType.get())
-    );
-
-    @BeforeEach
-    public void setUp() throws Exception {
-        String avroSchema = IOUtils.toString(Files.newInputStream(Paths.get("src/test/resources/user.avsc")), StandardCharsets.UTF_8);
-        inputSchema = new Schema.Parser().parse(avroSchema);
-
-        processor = new PutIceberg();
-    }
-
-    @AfterEach
-    public void tearDown() {
-        catalog.dropTable(TABLE_IDENTIFIER);
-    }
-
-    private void initRecordReader() throws InitializationException {
-        MockRecordParser readerFactory = new MockRecordParser();
-        RecordSchema recordSchema = AvroTypeUtil.createSchema(inputSchema);
-
-        for (RecordField recordField : recordSchema.getFields()) {
-            readerFactory.addSchemaField(recordField);
-        }
-
-        readerFactory.addRecord(0, "John", "Finance");
-        readerFactory.addRecord(1, "Jill", "Finance");
-        readerFactory.addRecord(2, "James", "Marketing");
-        readerFactory.addRecord(3, "Joana", "Sales");
-
-        runner.addControllerService("mock-reader-factory", readerFactory);
-        runner.enableControllerService(readerFactory);
-
-        runner.setProperty(PutIceberg.RECORD_READER, "mock-reader-factory");
-    }
-
     private void initCatalog(PartitionSpec spec, FileFormat fileFormat) throws InitializationException {
-        Map<String, String> tableProperties = new HashMap<>();
-        tableProperties.put(TableProperties.FORMAT_VERSION, "2");
+        final Map<String, String> tableProperties = new HashMap<>();
         tableProperties.put(TableProperties.DEFAULT_FILE_FORMAT, fileFormat.name());

-        TestHiveCatalogService catalogService = new TestHiveCatalogService.Builder()
-                .withMetastoreUri(metastore.getThriftConnectionUri())
-                .withWarehouseLocation(metastore.getWarehouseLocation())
-                .build();
+        final HiveCatalogService catalogService = new HiveCatalogService();
+        runner.addControllerService("catalog-service", catalogService);
+        runner.setProperty(catalogService, METASTORE_URI, metastore.getThriftConnectionUri());
+        runner.setProperty(catalogService, WAREHOUSE_LOCATION, warehousePath);
+        runner.enableControllerService(catalogService);

-        IcebergCatalogFactory catalogFactory = new IcebergCatalogFactory(catalogService);
+        final IcebergCatalogFactory catalogFactory = new IcebergCatalogFactory(catalogService);
         catalog = catalogFactory.create();

         catalog.createTable(TABLE_IDENTIFIER, USER_SCHEMA, spec, tableProperties);

-        runner.addControllerService("catalog-service", catalogService);
-        runner.enableControllerService(catalogService);
-
         runner.setProperty(PutIceberg.CATALOG, "catalog-service");
     }

-    @Test
-    public void onTriggerPartitioned() throws Exception {
-        PartitionSpec spec = PartitionSpec.builderFor(USER_SCHEMA)
-                .bucket("department", 3)
-                .build();
-
-        runner = TestRunners.newTestRunner(processor);
-        initRecordReader();
-        initCatalog(spec, FileFormat.AVRO);
-        runner.setProperty(PutIceberg.CATALOG_NAMESPACE, CATALOG_NAME);
-        runner.setProperty(PutIceberg.TABLE_NAME, TABLE_NAME);
-        runner.setValidateExpressionUsage(false);
-        runner.enqueue(new byte[0]);
-        runner.run();
-
-        Table table = catalog.loadTable(TABLE_IDENTIFIER);
-
-        List<Record> expectedRecords = IcebergTestUtils.RecordsBuilder.newInstance(USER_SCHEMA)
-                .add(0, "John", "Finance")
-                .add(1, "Jill", "Finance")
-                .add(2, "James", "Marketing")
-                .add(3, "Joana", "Sales")
-                .build();
-
-        runner.assertTransferCount(PutIceberg.REL_SUCCESS, 1);
-        MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutIceberg.REL_SUCCESS).getFirst();
-
-        String tableLocation = new URI(table.location()).getPath();
-        assertTrue(table.spec().isPartitioned());
-        assertEquals("4", flowFile.getAttribute(ICEBERG_RECORD_COUNT));
-        validateData(table, expectedRecords, 0);
-        validateNumberOfDataFiles(tableLocation, 3);
-        validatePartitionFolders(tableLocation, Arrays.asList(
-                "department_bucket=0", "department_bucket=1", "department_bucket=2"));
-        assertProvenanceEvents();
-    }
-
     @Test
     public void onTriggerIdentityPartitioned() throws Exception {
-        PartitionSpec spec = PartitionSpec.builderFor(USER_SCHEMA)
+        final PartitionSpec spec = PartitionSpec.builderFor(USER_SCHEMA)
                 .identity("department")
                 .build();
@@ -196,9 +93,9 @@ public void onTriggerIdentityPartitioned() throws Exception {
         runner.enqueue(new byte[0]);
         runner.run();

-        Table table = catalog.loadTable(TABLE_IDENTIFIER);
+        final Table table = catalog.loadTable(TABLE_IDENTIFIER);

-        List<Record> expectedRecords = IcebergTestUtils.RecordsBuilder.newInstance(USER_SCHEMA)
+        final List<Record> expectedRecords = IcebergTestUtils.RecordsBuilder.newInstance(USER_SCHEMA)
                 .add(0, "John", "Finance")
                 .add(1, "Jill", "Finance")
                 .add(2, "James", "Marketing")
@@ -206,9 +103,9 @@ public void onTriggerIdentityPartitioned() throws Exception {
                 .build();

         runner.assertTransferCount(PutIceberg.REL_SUCCESS, 1);
-        MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutIceberg.REL_SUCCESS).getFirst();
+        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutIceberg.REL_SUCCESS).getFirst();

-        String tableLocation = new URI(table.location()).getPath();
+        final String tableLocation = new URI(table.location()).getPath();
         assertTrue(table.spec().isPartitioned());
         assertEquals("4", flowFile.getAttribute(ICEBERG_RECORD_COUNT));
         validateData(table, expectedRecords, 0);
@@ -220,7 +117,7 @@ public void onTriggerIdentityPartitioned() throws Exception {

     @Test
     public void onTriggerMultiLevelIdentityPartitioned() throws Exception {
-        PartitionSpec spec = PartitionSpec.builderFor(USER_SCHEMA)
+        final PartitionSpec spec = PartitionSpec.builderFor(USER_SCHEMA)
                 .identity("name")
                 .identity("department")
                 .build();
@@ -234,9 +131,9 @@ public void onTriggerMultiLevelIdentityPartitioned() throws Exception {
         runner.enqueue(new byte[0]);
         runner.run();

-        Table table = catalog.loadTable(TABLE_IDENTIFIER);
+        final Table table = catalog.loadTable(TABLE_IDENTIFIER);

-        List<Record> expectedRecords = IcebergTestUtils.RecordsBuilder.newInstance(USER_SCHEMA)
+        final List<Record> expectedRecords = IcebergTestUtils.RecordsBuilder.newInstance(USER_SCHEMA)
                 .add(0, "John", "Finance")
                 .add(1, "Jill", "Finance")
                 .add(2, "James", "Marketing")
@@ -244,9 +141,9 @@ public void onTriggerMultiLevelIdentityPartitioned() throws Exception {
                 .build();

         runner.assertTransferCount(PutIceberg.REL_SUCCESS, 1);
-        MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutIceberg.REL_SUCCESS).getFirst();
+        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutIceberg.REL_SUCCESS).getFirst();

-        String tableLocation = new URI(table.location()).getPath();
+        final String tableLocation = new URI(table.location()).getPath();
         assertTrue(table.spec().isPartitioned());
         assertEquals("4", flowFile.getAttribute(ICEBERG_RECORD_COUNT));
         validateData(table, expectedRecords, 0);
@@ -269,16 +166,16 @@ public void onTriggerUnPartitioned() throws Exception {
         runner.setProperty(PutIceberg.TABLE_NAME, "${table.name}");
         runner.setProperty(PutIceberg.MAXIMUM_FILE_SIZE, "${max.filesize}");
         runner.setProperty("snapshot-property.additional-summary-property", "test summary property");
-        Map<String, String> attributes = new HashMap<>();
+        final Map<String, String> attributes = new HashMap<>();
         attributes.put("catalog.name", CATALOG_NAME);
         attributes.put("table.name", TABLE_NAME);
         attributes.put("max.filesize", "536870912"); // 512 MB
         runner.enqueue(new byte[0], attributes);
         runner.run();

-        Table table = catalog.loadTable(TABLE_IDENTIFIER);
+        final Table table = catalog.loadTable(TABLE_IDENTIFIER);

-        List<Record> expectedRecords = IcebergTestUtils.RecordsBuilder.newInstance(USER_SCHEMA)
+        final List<Record> expectedRecords = IcebergTestUtils.RecordsBuilder.newInstance(USER_SCHEMA)
                 .add(0, "John", "Finance")
                 .add(1, "Jill", "Finance")
                 .add(2, "James", "Marketing")
@@ -286,7 +183,7 @@ public void onTriggerUnPartitioned() throws Exception {
                 .build();

         runner.assertTransferCount(PutIceberg.REL_SUCCESS, 1);
-        MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutIceberg.REL_SUCCESS).getFirst();
+        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutIceberg.REL_SUCCESS).getFirst();

         assertTrue(table.spec().isUnpartitioned());
         assertEquals("4", flowFile.getAttribute(ICEBERG_RECORD_COUNT));
@@ -305,7 +202,7 @@ private void assertProvenanceEvents() {
     }

     private void assertSnapshotSummaryProperties(Table table, Map<String, String> summaryProperties) {
-        Map<String, String> snapshotSummary = table.currentSnapshot().summary();
+        final Map<String, String> snapshotSummary = table.currentSnapshot().summary();

         assertTrue(snapshotSummary.containsKey(ICEBERG_SNAPSHOT_SUMMARY_FLOWFILE_UUID));
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithJdbcCatalog.java b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithJdbcCatalog.java
new file mode 100644
index 000000000000..c03aff304b29
--- /dev/null
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithJdbcCatalog.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.iceberg;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.jdbc.JdbcCatalog;
+import org.apache.iceberg.jdbc.JdbcClientPool;
+import org.apache.nifi.dbcp.DBCPConnectionPool;
+import org.apache.nifi.processors.iceberg.catalog.IcebergJdbcClientPool;
+import org.apache.nifi.processors.iceberg.util.IcebergTestUtils;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.services.iceberg.JdbcCatalogService;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+
+import java.io.File;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+import static org.apache.nifi.dbcp.utils.DBCPProperties.DATABASE_URL;
+import static org.apache.nifi.dbcp.utils.DBCPProperties.DB_DRIVERNAME;
+import static org.apache.nifi.dbcp.utils.DBCPProperties.DB_PASSWORD;
+import static org.apache.nifi.dbcp.utils.DBCPProperties.DB_USER;
+import static org.apache.nifi.processors.iceberg.PutIceberg.ICEBERG_RECORD_COUNT;
+import static org.apache.nifi.processors.iceberg.util.IcebergTestUtils.CATALOG_NAME;
+import static org.apache.nifi.processors.iceberg.util.IcebergTestUtils.CATALOG_SERVICE;
+import static org.apache.nifi.processors.iceberg.util.IcebergTestUtils.createTemporaryDirectory;
+import static org.apache.nifi.processors.iceberg.util.IcebergTestUtils.getSystemTemporaryDirectory;
+import static org.apache.nifi.processors.iceberg.util.IcebergTestUtils.validateData;
+import static org.apache.nifi.processors.iceberg.util.IcebergTestUtils.validateNumberOfDataFiles;
+import static org.apache.nifi.processors.iceberg.util.IcebergTestUtils.validatePartitionFolders;
+import static org.apache.nifi.services.iceberg.JdbcCatalogService.CONNECTION_POOL;
+import static org.apache.nifi.services.iceberg.JdbcCatalogService.WAREHOUSE_PATH;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.condition.OS.WINDOWS;
+
+@DisabledOnOs(WINDOWS)
+public class TestPutIcebergWithJdbcCatalog extends AbstractTestPutIceberg {
+
+    private static final String DERBY_LOG_PROPERTY = "derby.stream.error.file";
+    private static final String CONNECTION_POOL_SERVICE = "connection-pool-service";
+
+    private static DBCPConnectionPool connectionPool;
+
+    @BeforeAll
+    public static void initConnectionPool() {
+        setDerbyLog();
+        connectionPool = new DBCPConnectionPool();
+    }
+
+    @AfterAll
+    public static void clearDerbyLog() {
+        System.clearProperty(DERBY_LOG_PROPERTY);
+    }
+
+    private void initServices(PartitionSpec spec) throws InitializationException {
+        initDBCPService();
+        initRecordReader();
+        initCatalogService();
+        createTestTable(spec);
+    }
+
+    private void initCatalogService() throws InitializationException {
+        final JdbcCatalogService catalogService = new JdbcCatalogService();
+        runner.addControllerService(CATALOG_SERVICE, catalogService);
+        runner.setProperty(catalogService, CONNECTION_POOL, CONNECTION_POOL_SERVICE);
+        runner.setProperty(catalogService, WAREHOUSE_PATH, warehousePath);
+        runner.enableControllerService(catalogService);
+
+        runner.setProperty(PutIceberg.CATALOG, CATALOG_SERVICE);
+    }
+
+    private void initDBCPService() throws InitializationException {
+        final String url = String.format("jdbc:derby:%s;create=true", createTemporaryDirectory());
+        runner.addControllerService(CONNECTION_POOL_SERVICE, connectionPool);
+        runner.setProperty(connectionPool, DATABASE_URL, url);
+        runner.setProperty(connectionPool, DB_USER, String.class.getSimpleName());
+        runner.setProperty(connectionPool, DB_PASSWORD, String.class.getName());
+        runner.setProperty(connectionPool, DB_DRIVERNAME, "org.apache.derby.jdbc.EmbeddedDriver");
+        runner.enableControllerService(connectionPool);
+    }
+
+    private void createTestTable(PartitionSpec spec) {
+        final Map<String, String> properties = new HashMap<>();
+        properties.put(CatalogProperties.URI, "");
+        properties.put(CatalogProperties.WAREHOUSE_LOCATION, warehousePath);
+
+        final Function<Map<String, String>, JdbcClientPool> clientPoolBuilder = props -> new IcebergJdbcClientPool(props, connectionPool);
+        final Function<Map<String, String>, FileIO> ioBuilder = props -> CatalogUtil.loadFileIO("org.apache.iceberg.hadoop.HadoopFileIO", props, new Configuration());
+
+        catalog = new JdbcCatalog(ioBuilder, clientPoolBuilder, true);
+        catalog.initialize("jdbc-catalog", properties);
+
+        final Map<String, String> tableProperties = new HashMap<>();
+        tableProperties.put(TableProperties.DEFAULT_FILE_FORMAT, FileFormat.PARQUET.name());
+
+        catalog.createTable(TABLE_IDENTIFIER, USER_SCHEMA, spec, tableProperties);
+    }
+
+    public static void setDerbyLog() {
+        final File derbyLog = new File(getSystemTemporaryDirectory(), "derby.log");
+        derbyLog.deleteOnExit();
+        System.setProperty(DERBY_LOG_PROPERTY, derbyLog.getAbsolutePath());
+    }
+
+    @Test
+    public void onTriggerBucketPartitioned() throws Exception {
+        final PartitionSpec spec = PartitionSpec.builderFor(USER_SCHEMA)
+                .bucket("department", 3)
+                .build();
+
+        runner = TestRunners.newTestRunner(processor);
+        runner.setValidateExpressionUsage(false);
+        initServices(spec);
+        runner.setProperty(PutIceberg.CATALOG_NAMESPACE, CATALOG_NAME);
+        runner.setProperty(PutIceberg.TABLE_NAME, TABLE_NAME);
+        runner.enqueue(new byte[0]);
+        runner.run();
+
+        final Table table = catalog.loadTable(TABLE_IDENTIFIER);
+
+        final List<Record> expectedRecords = IcebergTestUtils.RecordsBuilder.newInstance(USER_SCHEMA)
+                .add(0, "John", "Finance")
+                .add(1, "Jill", "Finance")
+                .add(2, "James", "Marketing")
+                .add(3, "Joana", "Sales")
+                .build();
+
+        runner.assertTransferCount(PutIceberg.REL_SUCCESS, 1);
+        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutIceberg.REL_SUCCESS).getFirst();
+
+        final String tableLocation = new URI(table.location()).getPath();
+        assertTrue(table.spec().isPartitioned());
+        assertEquals("4", flowFile.getAttribute(ICEBERG_RECORD_COUNT));
+        validateData(table, expectedRecords, 0);
+        validateNumberOfDataFiles(tableLocation, 3);
+        validatePartitionFolders(tableLocation, Arrays.asList(
+                "department_bucket=0", "department_bucket=1", "department_bucket=2"));
+    }
+}
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/catalog/TestHadoopCatalogService.java b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/catalog/TestHadoopCatalogService.java
deleted file mode 100644
index 9673b894a3ad..000000000000
--- a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/catalog/TestHadoopCatalogService.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.iceberg.catalog;
-
-import org.apache.nifi.controller.AbstractControllerService;
-import org.apache.nifi.services.iceberg.IcebergCatalogProperty;
-import org.apache.nifi.services.iceberg.IcebergCatalogService;
-import org.apache.nifi.services.iceberg.IcebergCatalogType;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static java.nio.file.Files.createTempDirectory;
-
-public class TestHadoopCatalogService extends AbstractControllerService implements IcebergCatalogService {
-
-    private final Map<IcebergCatalogProperty, String> catalogProperties = new HashMap<>();
-
-    public TestHadoopCatalogService() throws IOException {
-        File warehouseLocation = createTempDirectory("metastore").toFile();
-        catalogProperties.put(IcebergCatalogProperty.WAREHOUSE_LOCATION, warehouseLocation.getAbsolutePath());
-    }
-
-    @Override
-    public IcebergCatalogType getCatalogType() {
-        return IcebergCatalogType.HADOOP;
-    }
-
-    @Override
-    public Map<IcebergCatalogProperty, String> getCatalogProperties() {
-        return catalogProperties;
-    }
-
-    @Override
-    public List<String> getConfigFilePaths() {
-        return null;
-    }
-}
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/catalog/TestHiveCatalogService.java b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/catalog/TestHiveCatalogService.java
deleted file mode 100644
index 0cd7f042a842..000000000000
--- a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/catalog/TestHiveCatalogService.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.iceberg.catalog;
-
-import org.apache.nifi.controller.AbstractControllerService;
-import org.apache.nifi.services.iceberg.IcebergCatalogProperty;
-import org.apache.nifi.services.iceberg.IcebergCatalogService;
-import org.apache.nifi.services.iceberg.IcebergCatalogType;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.nifi.services.iceberg.IcebergCatalogProperty.METASTORE_URI;
-import static org.apache.nifi.services.iceberg.IcebergCatalogProperty.WAREHOUSE_LOCATION;
-
-public class TestHiveCatalogService extends AbstractControllerService implements IcebergCatalogService {
-
-    private final List<String> configFilePaths;
-    private final Map<IcebergCatalogProperty, String> catalogProperties;
-
-    public TestHiveCatalogService(Map<IcebergCatalogProperty, String> catalogProperties, List<String> configFilePaths) {
-        this.catalogProperties = catalogProperties;
-        this.configFilePaths = configFilePaths;
-    }
-
-    @Override
-    public IcebergCatalogType getCatalogType() {
-        return IcebergCatalogType.HIVE;
-    }
-
-    @Override
-    public Map<IcebergCatalogProperty, String> getCatalogProperties() {
-        return catalogProperties;
-    }
-
-    @Override
-    public List<String> getConfigFilePaths() {
-        return configFilePaths;
-    }
-
-    public static class Builder {
-        private String metastoreUri;
-        private String warehouseLocation;
-        private List<String> configFilePaths;
-
-        public Builder withMetastoreUri(String metastoreUri) {
-            this.metastoreUri = metastoreUri;
-            return this;
-        }
-
-        public Builder withWarehouseLocation(String warehouseLocation) {
-            this.warehouseLocation = warehouseLocation;
-            return this;
-        }
-
-        public Builder withConfigFilePaths(List<String> configFilePaths) {
-            this.configFilePaths = configFilePaths;
-            return this;
-        }
-
-        public TestHiveCatalogService build() {
-            Map<IcebergCatalogProperty, String> properties = new HashMap<>();
-
-            if (metastoreUri != null) {
-                properties.put(METASTORE_URI, metastoreUri);
-            }
-
-            if (warehouseLocation != null) {
-                properties.put(WAREHOUSE_LOCATION, warehouseLocation);
-            }
-
-            return new TestHiveCatalogService(properties, configFilePaths);
-        }
-    }
-}
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/util/IcebergTestUtils.java b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/util/IcebergTestUtils.java
index 6a248695202e..06918893acac 100644
--- a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/util/IcebergTestUtils.java
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/util/IcebergTestUtils.java
@@ -21,11 +21,13 @@
 import org.apache.commons.lang.Validate;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.IcebergGenerics;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.io.CloseableIterable;

+import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -34,12 +36,19 @@
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
+import java.util.UUID;

 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;

 public class IcebergTestUtils {

+    public static final String RECORD_READER_SERVICE = "record-reader-service";
"catalog-service"; + public static final String CATALOG_NAME = "iceberg_test"; + + public static final Namespace NAMESPACE = Namespace.of(CATALOG_NAME); + /** * Validates whether the table contains the expected records. The results should be sorted by a unique key, so we do not end up with flaky tests. * @@ -99,6 +108,14 @@ public static void validatePartitionFolders(String tableLocation, List p } } + public static String getSystemTemporaryDirectory() { + return System.getProperty("java.io.tmpdir"); + } + + public static File createTemporaryDirectory() { + return Paths.get(getSystemTemporaryDirectory(), UUID.randomUUID().toString()).toFile(); + } + public static class RecordsBuilder { private final List records = new ArrayList<>(); diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogProperty.java b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogProperty.java index f4c55c39c861..1e4987a0a5c5 100644 --- a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogProperty.java +++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogProperty.java @@ -19,11 +19,20 @@ public enum IcebergCatalogProperty { + CATALOG_NAME, METASTORE_URI("hive.metastore.uris"), - WAREHOUSE_LOCATION("hive.metastore.warehouse.dir"); + WAREHOUSE_LOCATION("hive.metastore.warehouse.dir"), + CLIENT_POOL_SERVICE, + FILE_IO_IMPLEMENTATION; + + private static final String EMPTY_STRING = ""; private final String hadoopPropertyName; + IcebergCatalogProperty() { + this.hadoopPropertyName = EMPTY_STRING; + } + IcebergCatalogProperty(String hadoopPropertyName) { this.hadoopPropertyName = hadoopPropertyName; } diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogService.java b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogService.java index 56e595d2e994..065d5290eff1 100644 --- a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogService.java +++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogService.java @@ -29,7 +29,7 @@ public interface IcebergCatalogService extends ControllerService { IcebergCatalogType getCatalogType(); - Map getCatalogProperties(); + Map getCatalogProperties(); List getConfigFilePaths(); } diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogType.java b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogType.java index 4b8640da1d00..8aad41049e33 100644 --- a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogType.java +++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogType.java @@ -19,5 +19,6 @@ public enum IcebergCatalogType { HIVE, - HADOOP + HADOOP, + JDBC } diff --git 
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogProperty.java b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogProperty.java
index f4c55c39c861..1e4987a0a5c5 100644
--- a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogProperty.java
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogProperty.java
@@ -19,11 +19,20 @@
 public enum IcebergCatalogProperty {
 
+    CATALOG_NAME,
     METASTORE_URI("hive.metastore.uris"),
-    WAREHOUSE_LOCATION("hive.metastore.warehouse.dir");
+    WAREHOUSE_LOCATION("hive.metastore.warehouse.dir"),
+    CLIENT_POOL_SERVICE,
+    FILE_IO_IMPLEMENTATION;
+
+    private static final String EMPTY_STRING = "";
 
     private final String hadoopPropertyName;
 
+    IcebergCatalogProperty() {
+        this.hadoopPropertyName = EMPTY_STRING;
+    }
+
     IcebergCatalogProperty(String hadoopPropertyName) {
         this.hadoopPropertyName = hadoopPropertyName;
     }
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogService.java b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogService.java
index 56e595d2e994..065d5290eff1 100644
--- a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogService.java
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogService.java
@@ -29,7 +29,7 @@ public interface IcebergCatalogService extends ControllerService {
 
     IcebergCatalogType getCatalogType();
 
-    Map<IcebergCatalogProperty, String> getCatalogProperties();
+    Map<IcebergCatalogProperty, Object> getCatalogProperties();
 
     List<String> getConfigFilePaths();
 }
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogType.java b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogType.java
index 4b8640da1d00..8aad41049e33 100644
--- a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogType.java
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-services-api/src/main/java/org/apache/nifi/services/iceberg/IcebergCatalogType.java
@@ -19,5 +19,6 @@
 public enum IcebergCatalogType {
 
     HIVE,
-    HADOOP
+    HADOOP,
+    JDBC
 }
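Because only some constants carry a Hadoop property name (the no-arg constants default to an empty string), a consumer can translate the String-valued entries of the widened map into a Hadoop Configuration while skipping service-valued ones. A sketch, assuming the enum's existing getHadoopPropertyName() accessor, which sits outside the hunk shown above:

// Given: org.apache.hadoop.conf.Configuration, java.util.Map, and the services-api types are imported.
static Configuration toHadoopConfiguration(final IcebergCatalogService catalogService) {
    final Configuration conf = new Configuration();
    for (final Map.Entry<IcebergCatalogProperty, Object> entry : catalogService.getCatalogProperties().entrySet()) {
        final String hadoopKey = entry.getKey().getHadoopPropertyName();
        // Only String values map onto Hadoop keys; CLIENT_POOL_SERVICE and the other no-arg constants are skipped.
        if (!hadoopKey.isEmpty() && entry.getValue() instanceof String value) {
            conf.set(hadoopKey, value); // e.g. WAREHOUSE_LOCATION -> hive.metastore.warehouse.dir
        }
    }
    return conf;
}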
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-services/pom.xml b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-services/pom.xml
index e00410723cab..dbc4a5419d47 100644
--- a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-services/pom.xml
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-services/pom.xml
@@ -31,6 +31,10 @@
             <version>2.0.0-SNAPSHOT</version>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-dbcp-service-api</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/AbstractCatalogService.java b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/AbstractCatalogService.java
index f658940087b0..7091143160ee 100644
--- a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/AbstractCatalogService.java
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/AbstractCatalogService.java
@@ -23,6 +23,7 @@
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.xml.processing.parsers.StandardDocumentProvider;
 import org.w3c.dom.Document;
 
@@ -36,17 +37,16 @@
 import java.util.List;
 import java.util.Map;
 
-
 /**
  * Abstract class holding common properties and methods for Catalog Service implementations.
  */
 public abstract class AbstractCatalogService extends AbstractControllerService implements IcebergCatalogService {
 
-    protected Map<IcebergCatalogProperty, String> catalogProperties = new HashMap<>();
+    protected Map<IcebergCatalogProperty, Object> catalogProperties = new HashMap<>();
 
     protected List<String> configFilePaths;
 
-    static final PropertyDescriptor HADOOP_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder()
+    public static final PropertyDescriptor HADOOP_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder()
             .name("hadoop-config-resources")
             .displayName("Hadoop Configuration Resources")
             .description("A file, or comma separated list of files, which contain the Hadoop configuration (core-site.xml, etc.). Without this, default configuration will be used.")
@@ -56,6 +56,15 @@ public abstract class AbstractCatalogService extends AbstractControllerService i
             .dynamicallyModifiesClasspath(true)
             .build();
 
+    public static final PropertyDescriptor WAREHOUSE_PATH = new PropertyDescriptor.Builder()
+            .name("warehouse-path")
+            .displayName("Warehouse Path")
+            .description("Path to the location of the warehouse.")
+            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
     protected List<Document> parseConfigFilePaths(String configFilePaths) {
         List<Document> documentList = new ArrayList<>();
         for (final String configFile : createFilePathList(configFilePaths)) {
@@ -82,7 +91,7 @@ protected List<String> createFilePathList(String configFilePaths) {
     }
 
     @Override
-    public Map<IcebergCatalogProperty, String> getCatalogProperties() {
+    public Map<IcebergCatalogProperty, Object> getCatalogProperties() {
         return catalogProperties;
     }
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/FileIOImplementation.java b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/FileIOImplementation.java
new file mode 100644
index 000000000000..45c3d3cfaa78
--- /dev/null
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/FileIOImplementation.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg;
+
+import org.apache.nifi.components.DescribedValue;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public enum FileIOImplementation implements DescribedValue {
+    HADOOP("org.apache.iceberg.hadoop.HadoopFileIO", "Hadoop File IO"),
+    RESOLVING("org.apache.iceberg.io.ResolvingFileIO", "Resolving File IO"),
+    S3("org.apache.iceberg.aws.s3.S3FileIO", "S3 File IO"),
+    GCS("org.apache.iceberg.gcp.gcs.GCSFileIO", "GCS File IO"),
+    ADLS("org.apache.iceberg.azure.adlsv2.ADLSFileIO", "ADLS File IO");
+
+    private static final Map<String, FileIOImplementation> ENUM_MAP = new HashMap<>();
+
+    static {
+        for (FileIOImplementation strategy : FileIOImplementation.values()) {
+            ENUM_MAP.put(strategy.getValue(), strategy);
+        }
+    }
+
+    private final String value;
+    private final String displayName;
+    private final String description;
+
+    FileIOImplementation(String value, String displayName) {
+        this(value, displayName, null);
+    }
+
+    FileIOImplementation(String value, String displayName, String description) {
+        this.value = value;
+        this.displayName = displayName;
+        this.description = description;
+    }
+
+    @Override
+    public String getValue() {
+        return value;
+    }
+
+    @Override
+    public String getDisplayName() {
+        return displayName;
+    }
+
+    @Override
+    public String getDescription() {
+        return description;
+    }
+}
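As written, ENUM_MAP is populated in the static initializer but never read anywhere in the class; if a reverse lookup from the stored class name is the intent, an accessor along these lines (hypothetical, not part of the patch) would put it to use:

public static FileIOImplementation fromValue(final String value) {
    final FileIOImplementation implementation = ENUM_MAP.get(value); // keyed by fully qualified class name
    if (implementation == null) {
        throw new IllegalArgumentException("Unknown FileIO implementation: " + value);
    }
    return implementation;
}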
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/HadoopCatalogService.java b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/HadoopCatalogService.java
index 26853ffc80ba..42b364d23ff8 100644
--- a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/HadoopCatalogService.java
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/HadoopCatalogService.java
@@ -22,11 +22,7 @@
 import org.apache.nifi.annotation.lifecycle.OnEnabled;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.ConfigurationContext;
-import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.processor.util.StandardValidators;
 
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 
 import static org.apache.nifi.services.iceberg.IcebergCatalogProperty.WAREHOUSE_LOCATION;
@@ -35,19 +31,7 @@
 @CapabilityDescription("Catalog service that can use HDFS or similar file systems that support atomic rename.")
 public class HadoopCatalogService extends AbstractCatalogService {
 
-    static final PropertyDescriptor WAREHOUSE_PATH = new PropertyDescriptor.Builder()
-            .name("warehouse-path")
-            .displayName("Warehouse Path")
-            .description("Path to the location of the warehouse.")
-            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
-            .required(true)
-            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
-            .build();
-
-    private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
-            WAREHOUSE_PATH,
-            HADOOP_CONFIGURATION_RESOURCES
-    ));
+    private static final List<PropertyDescriptor> PROPERTIES = List.of(WAREHOUSE_PATH, HADOOP_CONFIGURATION_RESOURCES);
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/HiveCatalogService.java b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/HiveCatalogService.java
index 2fcd333af004..c421c8c3b212 100644
--- a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/HiveCatalogService.java
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/HiveCatalogService.java
@@ -30,16 +30,14 @@
 import org.w3c.dom.NodeList;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 
 @Tags({"iceberg", "catalog", "service", "metastore", "hive"})
 @CapabilityDescription("Catalog service that connects to a Hive metastore to keep track of Iceberg tables.")
 public class HiveCatalogService extends AbstractCatalogService {
 
-    static final PropertyDescriptor METASTORE_URI = new PropertyDescriptor.Builder()
+    public static final PropertyDescriptor METASTORE_URI = new PropertyDescriptor.Builder()
             .name("hive-metastore-uri")
             .displayName("Hive Metastore URI")
             .description("The URI location(s) for the Hive metastore; note that this is not the location of the Hive Server. The default port for the Hive metastore is 9043.")
@@ -47,7 +45,7 @@
             .addValidator(StandardValidators.URI_LIST_VALIDATOR)
             .build();
 
-    static final PropertyDescriptor WAREHOUSE_LOCATION = new PropertyDescriptor.Builder()
+    public static final PropertyDescriptor WAREHOUSE_LOCATION = new PropertyDescriptor.Builder()
             .name("warehouse-location")
             .displayName("Default Warehouse Location")
             .description("Location of default database for the warehouse. This field sets or overrides the 'hive.metastore.warehouse.dir' configuration property.")
@@ -55,11 +53,7 @@
             .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
             .build();
 
-    private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
-            METASTORE_URI,
-            WAREHOUSE_LOCATION,
-            HADOOP_CONFIGURATION_RESOURCES
-    ));
+    private static final List<PropertyDescriptor> PROPERTIES = List.of(METASTORE_URI, WAREHOUSE_LOCATION, HADOOP_CONFIGURATION_RESOURCES);
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
diff --git a/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/JdbcCatalogService.java b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/JdbcCatalogService.java
new file mode 100644
index 000000000000..9a596aa06f53
--- /dev/null
+++ b/nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-services/src/main/java/org/apache/nifi/services/iceberg/JdbcCatalogService.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.iceberg;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.util.List;
+
+import static org.apache.nifi.services.iceberg.FileIOImplementation.HADOOP;
+import static org.apache.nifi.services.iceberg.IcebergCatalogProperty.FILE_IO_IMPLEMENTATION;
+import static org.apache.nifi.services.iceberg.IcebergCatalogProperty.CLIENT_POOL_SERVICE;
+import static org.apache.nifi.services.iceberg.IcebergCatalogProperty.WAREHOUSE_LOCATION;
+
+@Tags({"iceberg", "catalog", "service", "jdbc"})
+@CapabilityDescription("Catalog service that uses a relational database to manage Iceberg tables through JDBC.")
+public class JdbcCatalogService extends AbstractCatalogService {
+
+    public static final PropertyDescriptor CATALOG_NAME = new PropertyDescriptor.Builder()
+            .name("Catalog Name")
+            .description("Name of the Iceberg catalog.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .defaultValue("jdbc-catalog")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+            .build();
+
+    public static final PropertyDescriptor CONNECTION_POOL = new PropertyDescriptor.Builder()
+            .name("Connection Pool")
+            .description("Specifies the JDBC Connection Pool to use in order to communicate with the Iceberg catalog.")
+            .identifiesControllerService(DBCPService.class)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor FILE_IO_IMPL = new PropertyDescriptor.Builder()
+            .name("File IO Implementation")
+            .description("Specifies the implementation of the FileIO interface to be used. "
+                    + "The provided implementation must be a fully qualified class name.")
+            .required(true)
+            .defaultValue(HADOOP.getValue())
+            .allowableValues(FileIOImplementation.class)
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = List.of(
+            CATALOG_NAME, CONNECTION_POOL, FILE_IO_IMPL, WAREHOUSE_PATH, HADOOP_CONFIGURATION_RESOURCES);
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) {
+        if (context.getProperty(HADOOP_CONFIGURATION_RESOURCES).isSet()) {
+            configFilePaths = createFilePathList(context.getProperty(HADOOP_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue());
+        }
+
+        catalogProperties.put(IcebergCatalogProperty.CATALOG_NAME, context.getProperty(CATALOG_NAME).evaluateAttributeExpressions().getValue());
+        catalogProperties.put(CLIENT_POOL_SERVICE, context.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class));
+        catalogProperties.put(FILE_IO_IMPLEMENTATION, context.getProperty(FILE_IO_IMPL).evaluateAttributeExpressions().getValue());
+        catalogProperties.put(WAREHOUSE_LOCATION, context.getProperty(WAREHOUSE_PATH).evaluateAttributeExpressions().getValue());
+    }
+
+    @Override
+    public IcebergCatalogType getCatalogType() {
+        return IcebergCatalogType.JDBC;
+    }
+
+}
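Wiring the new service in a NiFi mock-framework test might look like the following sketch; the processor under test, the service identifiers, and the DBCPService test double are illustrative, and FILE_IO_IMPL is left at its Hadoop FileIO default:

import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.processors.iceberg.PutIceberg;
import org.apache.nifi.services.iceberg.AbstractCatalogService;
import org.apache.nifi.services.iceberg.JdbcCatalogService;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;

// Inside a JUnit test method; addControllerService declares InitializationException.
final TestRunner runner = TestRunners.newTestRunner(PutIceberg.class);

final DBCPService connectionPool = new SimpleTestDBCPService(); // hypothetical test double backed by an embedded database
runner.addControllerService("connection-pool", connectionPool);
runner.enableControllerService(connectionPool);

final String warehousePath = IcebergTestUtils.createTemporaryDirectory().getAbsolutePath();

final JdbcCatalogService catalogService = new JdbcCatalogService();
runner.addControllerService("catalog-service", catalogService);
runner.setProperty(catalogService, JdbcCatalogService.CATALOG_NAME, "iceberg_test");
runner.setProperty(catalogService, JdbcCatalogService.CONNECTION_POOL, "connection-pool");
runner.setProperty(catalogService, AbstractCatalogService.WAREHOUSE_PATH, warehousePath);
runner.enableControllerService(catalogService);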