Skip to content

Commit

Permalink
8
Browse files Browse the repository at this point in the history
  • Loading branch information
morningman committed Jan 22, 2025
1 parent d5c0feb commit 01bd487
Show file tree
Hide file tree
Showing 7 changed files with 381 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ public class ConnectionProperties {

protected ConnectionProperties(Map<String, String> origProps) {
this.origProps = origProps;
normalizedAndCheckProps();
}

protected void normalizedAndCheckProps() {
Expand Down Expand Up @@ -65,7 +64,7 @@ protected void normalizedAndCheckProps() {
// Subclass can override this method to load properties from file.
// The return value is the properties loaded from file, not include original properties
protected Map<String, String> loadConfigFromFile(String resourceConfig) {
if (Strings.isNullOrEmpty(resourceConfig)) {
if (Strings.isNullOrEmpty(origProps.get(resourceConfig))) {
return Maps.newHashMap();
}
Configuration conf = ConfigurationUtils.loadConfigurationFromHadoopConfDir(resourceConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@

package org.apache.doris.datasource.property;

import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;

@Retention(RetentionPolicy.RUNTIME)
public @interface ConnectorProperty {
String[] names() default {};
String description() default "";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public class PropertyUtils {
public static List<Field> getConnectorProperties(Class<?> clazz) {
List<Field> fields = Lists.newArrayList();
for (Field field : clazz.getDeclaredFields()) {
field.setAccessible(true);
if (field.isAnnotationPresent(ConnectorProperty.class)) {
// Get annotation of the field
ConnectorProperty connectorProperty = field.getAnnotation(ConnectorProperty.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import lombok.Getter;
import org.apache.paimon.options.Options;

import java.util.Map;
Expand All @@ -30,7 +29,7 @@ public class AliyunDLFProperties extends MetastoreProperties {

@ConnectorProperty(names = {"dlf.access_key", "dlf.catalog.accessKeyId"},
description = "The access key of the Aliyun DLF.")
private String dlfAccessKey = "";
public String dlfAccessKey = "";

@ConnectorProperty(names = {"dlf.secret_key", "dlf.catalog.accessKeySecret"},
description = "The secret key of the Aliyun DLF.")
Expand All @@ -50,6 +49,7 @@ public class AliyunDLFProperties extends MetastoreProperties {
private String dlfUid = "";

@ConnectorProperty(names = {"dlf.access.public", "dlf.catalog.accessPublic"},
required = false,
description = "Enable public access to Aliyun DLF.")
private String dlfAccessPublic = "false";

Expand Down Expand Up @@ -99,37 +99,6 @@ private String getEndpointOrFromRegion(String endpoint, String region, String dl

@Override
protected String getResourceConfigPropName() {
return "dlf.resouce_config";
}

@Getter
public static class OSSConfiguration {
private Map<String, String> conf = Maps.newHashMap();

public OSSConfiguration(AliyunDLFProperties props) {
conf.put("oss.region", getOssRegionFromDlfRegion(props.dlfRegion));
conf.put("oss.endpoint", getOssEndpointFromDlfRegion(props.dlfRegion, props.dlfAccessPublic));
conf.put("oss.access_key", props.dlfAccessKey);
conf.put("oss.secret_key", props.dlfSecretKey);
}

private String getOssRegionFromDlfRegion(String dlfRegion) {
return "oss-" + dlfRegion;
}

private String getOssEndpointFromDlfRegion(String dlfRegion, String dlfAccessPublic) {
if ("true".equalsIgnoreCase(dlfAccessPublic)) {
return "oss-" + dlfRegion + ".aliyuncs.com";
} else {
return "oss-" + dlfRegion + "-internal.aliyuncs.com";
}
}

private String getEndpointOrFromRegion(String endpoint, String region) {
if (!Strings.isNullOrEmpty(endpoint)) {
return endpoint;
}
return "dlf-vpc." + region + ".aliyuncs.com";
}
return "dlf.resource_config";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,71 +52,101 @@ public static MetastoreProperties create(Map<String, String> origProps) {

// 'type'='hms',
// "paimon.catalog.type" = "hms",dlf, filesystem
Type msType = Type.UNKNOWN;
if (origProps.containsKey(METASTORE_TYPE)) {
String type = origProps.get(METASTORE_TYPE);
switch (type) {
case "hms":
return new HMSProperties(origProps);
msType = Type.HMS;
case "glue":
return new AWSGlueProperties(origProps);
msType = Type.GLUE;
case "dlf":
return new AliyunDLFProperties(origProps);
msType = Type.DLF;
case "rest":
return new IcebergRestProperties(origProps);
msType = Type.ICEBERG_REST;
case "dataproc":
return new DataProcProperties(origProps);
msType = Type.DATAPROC;
case "filesystem":
return new FileMetastoreProperties(origProps);
msType = Type.FILE_SYSTEM;
default:
throw new IllegalArgumentException("Unknown metastore type: " + type);
throw new IllegalArgumentException("Unknown 'metastore.type': " + type);
}
} else if (origProps.containsKey("hive.metastore.type")) {
String type = origProps.get("hive.metastore.type");
switch (type) {
case "hms":
return new HMSProperties(origProps);
msType = Type.HMS;
case "glue":
return new AWSGlueProperties(origProps);
msType = Type.GLUE;
case "dlf":
return new AliyunDLFProperties(origProps);
msType = Type.DLF;
default:
throw new IllegalArgumentException("Unknown metastore type: " + type);
throw new IllegalArgumentException("Unknown 'hive.metastore.type': " + type);
}
} else if (origProps.containsKey("iceberg.catalog.type")) {
String type = origProps.get("iceberg.catalog.type");
switch (type) {
case "hms":
return new HMSProperties(origProps);
msType = Type.HMS;
case "glue":
return new AWSGlueProperties(origProps);
msType = Type.GLUE;
case "rest":
return new IcebergRestProperties(origProps);
msType = Type.ICEBERG_REST;
case "hadoop":
return new FileMetastoreProperties(origProps);
msType = Type.FILE_SYSTEM;
default:
throw new IllegalArgumentException("Unknown iceberg catalog type: " + type);
throw new IllegalArgumentException("Unknown 'iceberg.catalog.type': " + type);
}
} else if (origProps.containsKey("paimon.catalog.type")) {
String type = origProps.get("paimon.catalog.type");
switch (type) {
case "hms":
return new HMSProperties(origProps);
msType = Type.HMS;
case "dlf":
return new AliyunDLFProperties(origProps);
msType = Type.DLF;
default:
// default is "filesystem"
return new FileMetastoreProperties(origProps);
msType = Type.FILE_SYSTEM;
}
} else if (origProps.containsKey("type")) {
String type = origProps.get("type");
switch (type) {
case "hms":
return new HMSProperties(origProps);
msType = Type.HMS;
default:
throw new IllegalArgumentException("Unknown metastore type: " + type);
throw new IllegalArgumentException("Unknown metastore 'type': " + type);
}
}
throw new IllegalArgumentException("Can not find metastore type in properties");

return MetastoreProperties.create(msType, origProps);
}

public static MetastoreProperties create(Type type, Map<String, String> origProps) {
MetastoreProperties metastoreProperties;
switch (type) {
case HMS:
metastoreProperties = new HMSProperties(origProps);
break;
case GLUE:
metastoreProperties = new AWSGlueProperties(origProps);
break;
case DLF:
metastoreProperties = new AliyunDLFProperties(origProps);
break;
case ICEBERG_REST:
metastoreProperties = new IcebergRestProperties(origProps);
break;
case DATAPROC:
metastoreProperties = new DataProcProperties(origProps);
break;
case FILE_SYSTEM:
metastoreProperties = new FileMetastoreProperties(origProps);
break;
default:
throw new IllegalArgumentException("Unknown metastore type: " + type);
}
metastoreProperties.normalizedAndCheckProps();
return metastoreProperties;
}

protected MetastoreProperties(Type type, Map<String, String> origProps) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,14 @@ public static List<StorageProperties> create(Map<String, String> origProps) {
throw new RuntimeException("Unsupported native AZURE filesystem");
}

throw new RuntimeException("Unknown storage type");
if (storageProperties.isEmpty()) {
throw new RuntimeException("Unknown storage type");
} else {
for (StorageProperties storageProperty : storageProperties) {
storageProperty.normalizedAndCheckProps();
}
}
return storageProperties;
}

protected StorageProperties(Type type, Map<String, String> origProps) {
Expand All @@ -88,6 +95,7 @@ private static boolean isFsSupport(Map<String, String> origProps, String fsEnabl

protected static boolean checkIdentifierKey(Map<String, String> origProps, List<Field> fields) {
for (Field field : fields) {
field.setAccessible(true);
ConnectorProperty annotation = field.getAnnotation(ConnectorProperty.class);
for (String key : annotation.names()) {
if (origProps.containsKey(key)) {
Expand Down
Loading

0 comments on commit 01bd487

Please sign in to comment.