Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/hadoop catalog support #118

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

FROM azul/zulu-openjdk:17 as builder
FROM azul/zulu-openjdk:17 AS builder

COPY . /app/
WORKDIR /app/
Expand All @@ -29,14 +29,10 @@ RUN \

COPY --from=builder --chown=iceberg:iceberg /app/build/libs/iceberg-rest-image-all.jar /usr/lib/iceberg-rest/iceberg-rest-image-all.jar

ENV CATALOG_CATALOG__IMPL=org.apache.iceberg.jdbc.JdbcCatalog
ENV CATALOG_URI=jdbc:sqlite:file:/tmp/iceberg_rest_mode=memory
ENV CATALOG_JDBC_USER=user
ENV CATALOG_JDBC_PASSWORD=password
ENV REST_PORT=8181

EXPOSE $REST_PORT
USER iceberg:iceberg
ENV LANG en_US.UTF-8
ENV LANG=en_US.UTF-8
WORKDIR /usr/lib/iceberg-rest
CMD ["java", "-jar", "iceberg-rest-image-all.jar"]
26 changes: 24 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,43 @@ repositories {
}

ext {
icebergVersion = '1.6.0'
icebergVersion = '1.6.1'
hadoopVersion = '3.3.6'
}

configurations {
iceberg_core_test { transitive = false }
}

dependencies {
iceberg_core_test "org.apache.iceberg:iceberg-core:${icebergVersion}:tests"
}

task unzipJar(type: Copy) {
from zipTree(configurations.iceberg_core_test.singleFile)
into ("$buildDir/libs/iceberg-tests")
include '**/*.class'
exclude '**/S3ABlockOutputStream.class'
dependsOn configurations.iceberg_core_test
}

dependencies {
implementation "org.apache.iceberg:iceberg-api:${icebergVersion}"
implementation "org.apache.iceberg:iceberg-core:${icebergVersion}"
implementation "org.apache.iceberg:iceberg-core:${icebergVersion}:tests"

implementation files("$buildDir/libs/iceberg-tests") {
builtBy "unzipJar"
}
// implementation "org.apache.iceberg:iceberg-core:${icebergVersion}:tests"
implementation "org.apache.iceberg:iceberg-aws:${icebergVersion}"
implementation "org.apache.iceberg:iceberg-azure:${icebergVersion}"
implementation "org.apache.iceberg:iceberg-gcp:${icebergVersion}"
implementation "org.apache.iceberg:iceberg-bundled-guava:${icebergVersion}"

implementation "org.apache.hadoop:hadoop-common:${hadoopVersion}"
implementation "org.apache.hadoop:hadoop-hdfs-client:${hadoopVersion}"
implementation "org.apache.hadoop:hadoop-main:${hadoopVersion}"
implementation "org.apache.hadoop:hadoop-aws:${hadoopVersion}"

runtimeOnly "org.apache.iceberg:iceberg-aws-bundle:${icebergVersion}"
runtimeOnly "org.apache.iceberg:iceberg-azure-bundle:${icebergVersion}"
Expand Down
59 changes: 37 additions & 22 deletions src/main/java/org/apache/iceberg/rest/RESTCatalogServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,6 @@

package org.apache.iceberg.rest;

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
Expand All @@ -34,13 +28,23 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;

public class RESTCatalogServer {
private static final Logger LOG = LoggerFactory.getLogger(RESTCatalogServer.class);
private static final String CATALOG_ENV_PREFIX = "CATALOG_";
private static final String HADOOP_ENV_PREFIX = "CATALOG_HADOOP_";

private RESTCatalogServer() {}
private RESTCatalogServer() {
}

record CatalogContext(Catalog catalog, Map<String,String> configuration) { }
record CatalogContext(Catalog catalog, Map<String, String> configuration) {
}

private static CatalogContext backendCatalog() throws IOException {
// Translate environment variable to catalog properties
Expand All @@ -49,24 +53,21 @@ private static CatalogContext backendCatalog() throws IOException {
.filter(e -> e.getKey().startsWith(CATALOG_ENV_PREFIX))
.collect(
Collectors.toMap(
e ->
e.getKey()
.replaceFirst(CATALOG_ENV_PREFIX, "")
.replaceAll("__", "-")
.replaceAll("_", ".")
.toLowerCase(Locale.ROOT),
e -> envKeyToPropertyKey(e.getKey(), CATALOG_ENV_PREFIX),
Map.Entry::getValue,
(m1, m2) -> {
throw new IllegalArgumentException("Duplicate key: " + m1);
},
HashMap::new));

// Fallback to a JDBCCatalog impl if one is not set
catalogProperties.putIfAbsent(
CatalogProperties.CATALOG_IMPL, "org.apache.iceberg.jdbc.JdbcCatalog");
catalogProperties.putIfAbsent(
CatalogProperties.URI, "jdbc:sqlite:file:/tmp/iceberg_rest_mode=memory");
catalogProperties.putIfAbsent("jdbc.schema-version", "V1");
if (!catalogProperties.containsKey(CatalogProperties.CATALOG_IMPL)) {
catalogProperties.putIfAbsent(CatalogProperties.CATALOG_IMPL, "org.apache.iceberg.jdbc.JdbcCatalog");
catalogProperties.putIfAbsent(CatalogProperties.URI, "jdbc:sqlite:file:/tmp/iceberg_rest_mode=memory");
catalogProperties.putIfAbsent("jdbc.user", "user");
catalogProperties.putIfAbsent("jdbc.password", "password");
catalogProperties.putIfAbsent("jdbc.schema-version", "V1");
}

// Configure a default location if one is not specified
String warehouseLocation = catalogProperties.get(CatalogProperties.WAREHOUSE_LOCATION);
Expand All @@ -80,8 +81,23 @@ private static CatalogContext backendCatalog() throws IOException {
LOG.info("No warehouse location set. Defaulting to temp location: {}", warehouseLocation);
}

Configuration hadoopConf = new Configuration();

System.getenv().entrySet().stream()
.filter(e -> e.getKey().startsWith(HADOOP_ENV_PREFIX))
.forEach(e -> hadoopConf.set(envKeyToPropertyKey(e.getKey(), HADOOP_ENV_PREFIX), e.getValue()));

LOG.info("Creating catalog with properties: {}", catalogProperties);
return new CatalogContext(CatalogUtil.buildIcebergCatalog("rest_backend", catalogProperties, new Configuration()), catalogProperties);

return new CatalogContext(CatalogUtil.buildIcebergCatalog("rest_backend", catalogProperties, hadoopConf), catalogProperties);
}

/**
 * Converts an environment-variable name into a catalog/Hadoop property key by
 * stripping the given prefix and mapping the remainder: {@code "__"} becomes
 * {@code "-"}, single {@code "_"} becomes {@code "."}, and the result is
 * lower-cased with {@link Locale#ROOT} for locale-independent behavior.
 *
 * <p>Example: {@code envKeyToPropertyKey("CATALOG_S3_ACCESS__KEY__ID", "CATALOG_")}
 * yields {@code "s3.access-key-id"}.
 *
 * @param key    the environment-variable name; must start with {@code prefix}
 *               (callers filter with {@code startsWith(prefix)} before invoking)
 * @param prefix the prefix to strip, e.g. {@code "CATALOG_"}
 * @return the derived property key
 */
private static String envKeyToPropertyKey(String key, String prefix) {
    // substring() strips the prefix literally and only at the start of the key;
    // replaceFirst(prefix, "") would interpret the prefix as a regex and match
    // the first occurrence anywhere in the string.
    return key
        .substring(prefix.length())
        .replace("__", "-") // double underscore first, so it is not split into two dots
        .replace("_", ".")
        .toLowerCase(Locale.ROOT);
}

public static void main(String[] args) throws Exception {
Expand All @@ -98,8 +114,7 @@ public static void main(String[] args) throws Exception {
context.setVirtualHosts(null);
context.setGzipHandler(new GzipHandler());

Server httpServer =
new Server(PropertyUtil.propertyAsInt(System.getenv(), "REST_PORT", 8181));
Server httpServer = new Server(PropertyUtil.propertyAsInt(System.getenv(), "REST_PORT", 8181));
httpServer.setHandler(context);

httpServer.start();
Expand Down