[improvement](fe and broker) support specify broker to getSplits, check isSplitable, file scan for HMS Multi-catalog (#24830)

I want to use Doris Multi-catalog to accelerate HMS queries. My organization has a custom distributed file system, and we think that wrapping the file-system access differences into a broker (listLocatedFiles, openReader, ...) is an elegant approach.

This PR introduces the HMS catalog property `broker.name`. When this property is set, file-split and query-scan operations are sent to the broker.

Usage: create an HMS catalog bound to a broker:
```
CREATE CATALOG hive_catalog_broker PROPERTIES (
    'type'='hms',
    'hive.metastore.uris' = 'thrift://xxx',
    'broker.name' = 'hdfs_broker'
);
```
When we query this catalog, file-split and query-scan requests are sent to the broker `hdfs_broker`.

More details about this PR:
1. Introduces the HMS catalog property `broker.name`, which names the broker that performs remote-path work. When `broker.name` is set, `enable.self.splitter` must be `true` so that the file-splitting process is executed in FE.
2. Introduces two new interfaces in the broker service:
- `TBrokerIsSplittableResponse isSplittable(1: TBrokerIsSplittableRequest request)`, which invokes the input format's `isSplitable` method.
- `TBrokerListResponse listLocatedFiles(1: TBrokerListPathRequest request)`, which performs `listFiles` / `listLocatedStatus` against the remote file system.
3. Three parts of the overall processing are executed in the broker (sketched below):
- checking whether a path with the specified input format is splittable;
- `listLocatedFiles` on table / partition locations;
- `openReader` for specified file splits.
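
To make the flow concrete, here is a rough sketch of how these pieces compose on the FE side for a broker-bound catalog. It only uses names introduced by this PR, but it is a simplified illustration, not the literal call path in `HiveMetaStoreCache`:

```java
// Sketch: FE-side flow for a broker-bound HMS catalog (simplified).
String bindBrokerName = hmsCatalog.bindBrokerName(); // null unless 'broker.name' is set

// When a broker is bound, getFSIdentity() reports FileSystemType.BROKER and the
// broker name becomes part of the cache key, so broker-bound and direct-access
// file systems never share a cached instance.
RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
        new FileSystemCache.FileSystemCacheKey(
                FileSystemFactory.getFSIdentity(location, bindBrokerName), jobConf, bindBrokerName));

// For a BrokerFileSystem, both calls below are served by the broker process:
boolean splittable = HiveUtil.isSplittable(fs, inputFormat, location, jobConf);
RemoteFiles files = fs.listLocatedFiles(location, true, false); // flags as used at the call sites in this PR
```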

Co-authored-by: chenlinzhong <[email protected]>
WinkerDu and chenlinzhong authored Oct 13, 2023
1 parent ed67d5a commit aa0b74d
Showing 17 changed files with 447 additions and 46 deletions.
8 changes: 8 additions & 0 deletions docs/en/docs/lakehouse/multi-catalog/hive.md

@@ -401,6 +401,14 @@ If the variable `truncate_char_or_varchar_columns` is enabled, when the maximum
 
 The variable defaults to false.
 
+## Access HMS with broker
+
+Add the following setting when creating an HMS catalog; file splitting and scanning for Hive external tables will then be performed by the broker named `test_broker`:
+
+```sql
+"broker.name" = "test_broker"
+```
+
 ## Integrate with Apache Ranger
 
 Apache Ranger is a security framework for monitoring, enabling services, and comprehensive data security access management on the Hadoop platform.
8 changes: 8 additions & 0 deletions docs/zh-CN/docs/lakehouse/multi-catalog/hive.md

@@ -380,6 +380,14 @@ CREATE CATALOG hive PROPERTIES (
 
 This variable defaults to false.
 
+## Access HMS with broker
+
+Add the following setting when creating an HMS catalog; file splitting and scanning for Hive external tables will then be performed by the broker named `test_broker`:
+
+```sql
+"broker.name" = "test_broker"
+```
+
 ## Use Ranger for authorization
 
 Apache Ranger is a security framework for monitoring, enabling services, and comprehensive data security access management on the Hadoop platform.
@@ -593,6 +593,14 @@ public boolean useSelfSplitter() {
         return ret;
     }
 
+    public String bindBrokerName() {
+        Map<String, String> properties = catalogProperty.getProperties();
+        if (properties.containsKey(HMSExternalCatalog.BIND_BROKER_NAME)) {
+            return properties.get(HMSExternalCatalog.BIND_BROKER_NAME);
+        }
+        return null;
+    }
+
     @Override
     public Collection<DatabaseIf<? extends TableIf>> getAllDbs() {
         makeSureInitialized();
@@ -61,6 +61,8 @@ public class HMSExternalCatalog extends ExternalCatalog {
     private long lastSyncedEventId = -1L;
     public static final String ENABLE_SELF_SPLITTER = "enable.self.splitter";
     public static final String FILE_META_CACHE_TTL_SECOND = "file.meta.cache.ttl-second";
+    // broker name for file split and query scan.
+    public static final String BIND_BROKER_NAME = "broker.name";
     private static final String PROP_ALLOW_FALLBACK_TO_SIMPLE_AUTH = "ipc.client.fallback-to-simple-auth-allowed";
 
     // -1 means file cache no ttl set
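
The constraint noted in the PR description — `enable.self.splitter` must be `true` whenever `broker.name` is set — could be enforced with a check along these lines. This is a hypothetical sketch; the actual validation site is not part of this diff:

```java
import java.util.Map;

// Hypothetical property check; the property names come from this PR,
// the method itself is illustrative.
static void validateBrokerBinding(Map<String, String> props) {
    String broker = props.get("broker.name");
    boolean selfSplitter = Boolean.parseBoolean(
            props.getOrDefault("enable.self.splitter", "false"));
    if (broker != null && !selfSplitter) {
        throw new IllegalArgumentException(
                "'broker.name' requires 'enable.self.splitter' = 'true' "
                + "so that file splitting runs in FE and can be delegated to the broker");
    }
}
```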
@@ -365,11 +365,13 @@ private Map<PartitionCacheKey, HivePartition> loadPartitions(Iterable<? extends
     // Get File Status by using FileSystem API.
     private FileCacheValue getFileCache(String location, InputFormat<?, ?> inputFormat,
                                         JobConf jobConf,
-                                        List<String> partitionValues) throws UserException {
+                                        List<String> partitionValues,
+                                        String bindBrokerName) throws UserException {
         FileCacheValue result = new FileCacheValue();
-        result.setSplittable(HiveUtil.isSplittable(inputFormat, new Path(location), jobConf));
         RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
-                new FileSystemCache.FileSystemCacheKey(FileSystemFactory.getFSIdentity(location), jobConf));
+                new FileSystemCache.FileSystemCacheKey(FileSystemFactory.getFSIdentity(
+                        location, bindBrokerName), jobConf, bindBrokerName));
+        result.setSplittable(HiveUtil.isSplittable(fs, inputFormat, location, jobConf));
         try {
             // For Tez engine, it may generate subdirectories for "union" query.
             // So there may be files and directories in the table directory at the same time. eg:
@@ -429,7 +431,8 @@ private FileCacheValue loadFiles(FileCacheKey key) {
         InputFormat<?, ?> inputFormat = HiveUtil.getInputFormat(jobConf, key.inputFormat, false);
         // TODO: This is a temp config, will remove it after the HiveSplitter is stable.
         if (key.useSelfSplitter) {
-            result = getFileCache(finalLocation, inputFormat, jobConf, key.getPartitionValues());
+            result = getFileCache(finalLocation, inputFormat, jobConf,
+                    key.getPartitionValues(), key.bindBrokerName);
         } else {
             InputSplit[] splits;
             String remoteUser = jobConf.get(HdfsResource.HADOOP_USER_NAME);
@@ -508,23 +511,23 @@ public HivePartitionValues getPartitionValues(PartitionValueCacheKey key) {
     }
 
     public List<FileCacheValue> getFilesByPartitionsWithCache(List<HivePartition> partitions,
-                                                              boolean useSelfSplitter) {
-        return getFilesByPartitions(partitions, useSelfSplitter, true);
+                                                              boolean useSelfSplitter, String bindBrokerName) {
+        return getFilesByPartitions(partitions, useSelfSplitter, true, bindBrokerName);
     }
 
     public List<FileCacheValue> getFilesByPartitionsWithoutCache(List<HivePartition> partitions,
-                                                                 boolean useSelfSplitter) {
-        return getFilesByPartitions(partitions, useSelfSplitter, false);
+                                                                 boolean useSelfSplitter, String bindBrokerName) {
+        return getFilesByPartitions(partitions, useSelfSplitter, false, bindBrokerName);
     }
 
     private List<FileCacheValue> getFilesByPartitions(List<HivePartition> partitions,
-                                                      boolean useSelfSplitter, boolean withCache) {
+                                                      boolean useSelfSplitter, boolean withCache, String bindBrokerName) {
         long start = System.currentTimeMillis();
         List<FileCacheKey> keys = partitions.stream().map(p -> {
             FileCacheKey fileCacheKey = p.isDummyPartition()
                     ? FileCacheKey.createDummyCacheKey(p.getDbName(), p.getTblName(), p.getPath(),
-                            p.getInputFormat(), useSelfSplitter)
-                    : new FileCacheKey(p.getPath(), p.getInputFormat(), p.getPartitionValues());
+                            p.getInputFormat(), useSelfSplitter, bindBrokerName)
+                    : new FileCacheKey(p.getPath(), p.getInputFormat(), p.getPartitionValues(), bindBrokerName);
             fileCacheKey.setUseSelfSplitter(useSelfSplitter);
             return fileCacheKey;
         }).collect(Collectors.toList());
@@ -602,7 +605,7 @@ public void invalidateTableCache(String dbName, String tblName) {
             HivePartition partition = partitionCache.getIfPresent(partKey);
             if (partition != null) {
                 fileCacheRef.get().invalidate(new FileCacheKey(partition.getPath(),
-                        null, partition.getPartitionValues()));
+                        null, partition.getPartitionValues(), null));
                 partitionCache.invalidate(partKey);
             }
         }
@@ -620,7 +623,7 @@ public void invalidateTableCache(String dbName, String tblName) {
              * and FE will exit if some network problems occur.
              * */
             FileCacheKey fileCacheKey = FileCacheKey.createDummyCacheKey(
-                    dbName, tblName, null, null, false);
+                    dbName, tblName, null, null, false, null);
             fileCacheRef.get().invalidate(fileCacheKey);
         }
     }
@@ -635,7 +638,7 @@ public void invalidatePartitionCache(String dbName, String tblName, String parti
         HivePartition partition = partitionCache.getIfPresent(partKey);
         if (partition != null) {
             fileCacheRef.get().invalidate(new FileCacheKey(partition.getPath(),
-                    null, partition.getPartitionValues()));
+                    null, partition.getPartitionValues(), null));
             partitionCache.invalidate(partKey);
         }
     }
@@ -781,7 +784,7 @@ public AtomicReference<LoadingCache<FileCacheKey, FileCacheValue>> getFileCacheR
     }
 
     public List<FileCacheValue> getFilesByTransaction(List<HivePartition> partitions, ValidWriteIdList validWriteIds,
-            boolean isFullAcid, long tableId) {
+            boolean isFullAcid, long tableId, String bindBrokerName) {
         List<FileCacheValue> fileCacheValues = Lists.newArrayList();
         String remoteUser = jobConf.get(HdfsResource.HADOOP_USER_NAME);
         try {
@@ -812,7 +815,8 @@ public List<FileCacheValue> getFilesByTransaction(List<HivePartition> partitions
                 String acidVersionPath = new Path(baseOrDeltaPath, "_orc_acid_version").toUri().toString();
                 RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
                         new FileSystemCache.FileSystemCacheKey(
-                                FileSystemFactory.getFSIdentity(baseOrDeltaPath.toUri().toString()), jobConf));
+                                FileSystemFactory.getFSIdentity(baseOrDeltaPath.toUri().toString(),
+                                        bindBrokerName), jobConf, bindBrokerName));
                 Status status = fs.exists(acidVersionPath);
                 if (status != Status.OK) {
                     if (status.getErrCode() == ErrCode.NOT_FOUND) {
@@ -833,7 +837,9 @@ public List<FileCacheValue> getFilesByTransaction(List<HivePartition> partitions
                 for (AcidUtils.ParsedDelta delta : directory.getCurrentDirectories()) {
                     String location = delta.getPath().toString();
                     RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
-                            new FileSystemCache.FileSystemCacheKey(FileSystemFactory.getFSIdentity(location), jobConf));
+                            new FileSystemCache.FileSystemCacheKey(
+                                    FileSystemFactory.getFSIdentity(location, bindBrokerName),
+                                    jobConf, bindBrokerName));
                     RemoteFiles locatedFiles = fs.listLocatedFiles(location, true, false);
                     if (delta.isDeleteDelta()) {
                         List<String> deleteDeltaFileNames = locatedFiles.files().stream().map(f -> f.getName()).filter(
@@ -851,7 +857,9 @@ public List<FileCacheValue> getFilesByTransaction(List<HivePartition> partitions
                 if (directory.getBaseDirectory() != null) {
                     String location = directory.getBaseDirectory().toString();
                     RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
-                            new FileSystemCache.FileSystemCacheKey(FileSystemFactory.getFSIdentity(location), jobConf));
+                            new FileSystemCache.FileSystemCacheKey(
+                                    FileSystemFactory.getFSIdentity(location, bindBrokerName),
+                                    jobConf, bindBrokerName));
                     RemoteFiles locatedFiles = fs.listLocatedFiles(location, true, false);
                     locatedFiles.files().stream().filter(
                             f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX))
@@ -949,6 +957,8 @@ public static class FileCacheKey {
         private String location;
         // not in key
         private String inputFormat;
+        // Broker name for file split and file scan.
+        private String bindBrokerName;
         // Temp variable, use self file splitter or use InputFormat.getSplits.
         // Will remove after self splitter is stable.
         private boolean useSelfSplitter;
@@ -957,16 +967,18 @@ public static class FileCacheKey {
         // partitionValues would be ["part1", "part2"]
         protected List<String> partitionValues;
 
-        public FileCacheKey(String location, String inputFormat, List<String> partitionValues) {
+        public FileCacheKey(String location, String inputFormat, List<String> partitionValues, String bindBrokerName) {
             this.location = location;
             this.inputFormat = inputFormat;
             this.partitionValues = partitionValues == null ? Lists.newArrayList() : partitionValues;
             this.useSelfSplitter = true;
+            this.bindBrokerName = bindBrokerName;
         }
 
         public static FileCacheKey createDummyCacheKey(String dbName, String tblName, String location,
-                String inputFormat, boolean useSelfSplitter) {
-            FileCacheKey fileCacheKey = new FileCacheKey(location, inputFormat, null);
+                String inputFormat, boolean useSelfSplitter,
+                String bindBrokerName) {
+            FileCacheKey fileCacheKey = new FileCacheKey(location, inputFormat, null, bindBrokerName);
             fileCacheKey.dummyKey = dbName + "." + tblName;
             fileCacheKey.useSelfSplitter = useSelfSplitter;
             return fileCacheKey;
@@ -24,6 +24,8 @@
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.fs.FileSystemFactory;
+import org.apache.doris.fs.remote.BrokerFileSystem;
+import org.apache.doris.fs.remote.RemoteFileSystem;
 
 import com.google.common.collect.Lists;
 import org.apache.hadoop.fs.FileSystem;
@@ -184,12 +186,17 @@ private static Type convertHiveTypeToiveDoris(TypeInfo hiveTypeInfo) {
         }
     }
 
-    public static boolean isSplittable(InputFormat<?, ?> inputFormat, Path path, JobConf jobConf) {
+    public static boolean isSplittable(RemoteFileSystem remoteFileSystem, InputFormat<?, ?> inputFormat,
+                                       String location, JobConf jobConf) throws UserException {
+        if (remoteFileSystem instanceof BrokerFileSystem) {
+            return ((BrokerFileSystem) remoteFileSystem)
+                    .isSplittable(location, inputFormat.getClass().getCanonicalName());
+        }
 
         // ORC uses a custom InputFormat but is always splittable
         if (inputFormat.getClass().getSimpleName().equals("OrcInputFormat")) {
             return true;
         }
 
         // use reflection to get isSplitable method on FileInputFormat
         // ATTN: the method name is actually "isSplitable", but the right spelling is "isSplittable"
         Method method = null;
@@ -205,6 +212,7 @@ public static boolean isSplittable(InputFormat<?, ?> inputFormat, Path path, JobConf jobConf) {
         if (method == null) {
             return false;
         }
+        Path path = new Path(location);
         try {
             method.setAccessible(true);
             return (boolean) method.invoke(inputFormat, FileSystemFactory.getNativeByPath(path, jobConf), path);
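
The reflection loop in `isSplittable` exists because `FileInputFormat.isSplitable` is `protected`, so it has to be located by walking up the class hierarchy and made accessible before invocation. A standalone demo of the same technique, with dummy classes instead of Hadoop types:

```java
import java.lang.reflect.Method;

public class IsSplitableViaReflection {
    static class BaseFormat {
        // protected, like FileInputFormat.isSplitable(FileSystem, Path)
        protected boolean isSplitable(String path) {
            return true;
        }
    }

    static class TextLikeFormat extends BaseFormat {
        @Override
        protected boolean isSplitable(String path) {
            return !path.endsWith(".gz"); // compressed files: not splittable
        }
    }

    // Walk up the hierarchy until a declared isSplitable method is found.
    static boolean invokeIsSplitable(Object format, String path) throws Exception {
        Method method = null;
        for (Class<?> clazz = format.getClass(); clazz != null; clazz = clazz.getSuperclass()) {
            try {
                method = clazz.getDeclaredMethod("isSplitable", String.class);
                break;
            } catch (NoSuchMethodException ignored) {
                // keep climbing toward the superclass
            }
        }
        if (method == null) {
            return false;
        }
        method.setAccessible(true); // required: the method is protected
        return (boolean) method.invoke(format, path);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(invokeIsSplitable(new TextLikeFormat(), "part-0.gz"));  // false
        System.out.println(invokeIsSplitable(new TextLikeFormat(), "part-0.txt")); // true
    }
}
```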
17 changes: 13 additions & 4 deletions fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java

@@ -53,7 +53,7 @@ public RemoteFileSystem load(FileSystemCacheKey key) {
     }
 
     private RemoteFileSystem loadFileSystem(FileSystemCacheKey key) {
-        return FileSystemFactory.getByType(key.type, key.conf);
+        return FileSystemFactory.getRemoteFileSystem(key.type, key.conf, key.bindBrokerName);
     }
 
     public RemoteFileSystem getRemoteFileSystem(FileSystemCacheKey key) {
@@ -69,11 +69,13 @@ public static class FileSystemCacheKey {
         // eg: hdfs://nameservices1
         private final String fsIdent;
         private final JobConf conf;
+        private final String bindBrokerName;
 
-        public FileSystemCacheKey(Pair<FileSystemType, String> fs, JobConf conf) {
+        public FileSystemCacheKey(Pair<FileSystemType, String> fs, JobConf conf, String bindBrokerName) {
             this.type = fs.first;
             this.fsIdent = fs.second;
             this.conf = conf;
+            this.bindBrokerName = bindBrokerName;
         }
 
         @Override
@@ -84,14 +86,21 @@ public boolean equals(Object obj) {
             if (!(obj instanceof FileSystemCacheKey)) {
                 return false;
             }
-            return type.equals(((FileSystemCacheKey) obj).type)
+            boolean equalsWithoutBroker = type.equals(((FileSystemCacheKey) obj).type)
                     && fsIdent.equals(((FileSystemCacheKey) obj).fsIdent)
                     && conf == ((FileSystemCacheKey) obj).conf;
+            if (bindBrokerName == null) {
+                return equalsWithoutBroker;
+            }
+            return equalsWithoutBroker && bindBrokerName.equals(((FileSystemCacheKey) obj).bindBrokerName);
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(conf, fsIdent, type);
+            if (bindBrokerName == null) {
+                return Objects.hash(conf, fsIdent, type);
+            }
+            return Objects.hash(conf, fsIdent, type, bindBrokerName);
         }
}
}
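
The reason `bindBrokerName` has to participate in `equals`/`hashCode`: Guava's `LoadingCache` returns the entry of whatever key compares equal, so if the broker name were left out, a broker-bound lookup could silently receive a direct-access file system cached earlier for the same `fsIdent`. A standalone toy showing that failure mode (assumes Guava on the classpath; the key class is illustrative, not Doris code):

```java
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

import java.util.Objects;

public class CacheKeyDemo {
    // Deliberately broken key: `broker` is NOT part of equals/hashCode.
    static final class BadKey {
        final String fsIdent;
        final String broker;

        BadKey(String fsIdent, String broker) {
            this.fsIdent = fsIdent;
            this.broker = broker;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof BadKey && fsIdent.equals(((BadKey) o).fsIdent);
        }

        @Override
        public int hashCode() {
            return Objects.hash(fsIdent);
        }
    }

    public static void main(String[] args) throws Exception {
        LoadingCache<BadKey, String> cache = CacheBuilder.newBuilder().build(
                new CacheLoader<BadKey, String>() {
                    @Override
                    public String load(BadKey key) {
                        return key.broker == null ? "direct-fs" : "broker-fs:" + key.broker;
                    }
                });

        System.out.println(cache.get(new BadKey("hdfs://ns1", null)));          // direct-fs
        // Same fsIdent, different broker -- but the keys compare equal,
        // so the broker-bound request is served the cached direct-fs value:
        System.out.println(cache.get(new BadKey("hdfs://ns1", "hdfs_broker"))); // direct-fs (!)
    }
}
```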
@@ -56,9 +56,11 @@ public static RemoteFileSystem get(String name, StorageBackend.StorageType type,
         }
     }
 
-    public static Pair<FileSystemType, String> getFSIdentity(String location) {
+    public static Pair<FileSystemType, String> getFSIdentity(String location, String bindBrokerName) {
         FileSystemType fsType;
-        if (S3Util.isObjStorage(location)) {
+        if (bindBrokerName != null) {
+            fsType = FileSystemType.BROKER;
+        } else if (S3Util.isObjStorage(location)) {
             if (S3Util.isHdfsOnOssEndpoint(location)) {
                 // if hdfs service is enabled on oss, use hdfs lib to access oss.
                 fsType = FileSystemType.DFS;
@@ -83,7 +85,8 @@ public static Pair<FileSystemType, String> getFSIdentity(String location) {
         return Pair.of(fsType, fsIdent);
     }
 
-    public static RemoteFileSystem getByType(FileSystemType type, Configuration conf) {
+    public static RemoteFileSystem getRemoteFileSystem(FileSystemType type, Configuration conf,
+            String bindBrokerName) {
         Map<String, String> properties = new HashMap<>();
         conf.iterator().forEachRemaining(e -> properties.put(e.getKey(), e.getValue()));
         switch (type) {
@@ -95,6 +98,8 @@ public static RemoteFileSystem getByType(FileSystemType type, Configuration conf) {
                 return new OFSFileSystem(properties);
             case JFS:
                 return new JFSFileSystem(properties);
+            case BROKER:
+                return new BrokerFileSystem(bindBrokerName, properties);
             default:
                 throw new IllegalStateException("Not supported file system type: " + type);
         }
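
A condensed, standalone re-implementation of the selection logic above, to make the broker-first precedence explicit. It is simplified on purpose: the real `getFSIdentity` also distinguishes object storage, HDFS-on-OSS, JuiceFS, and so on:

```java
public class FsIdentityDemo {
    enum FileSystemType { DFS, OFS, JFS, BROKER, FILE }

    // Simplified stand-in for FileSystemFactory.getFSIdentity(location, bindBrokerName):
    // a bound broker always wins; otherwise the URI scheme decides.
    static FileSystemType fsType(String location, String bindBrokerName) {
        if (bindBrokerName != null) {
            return FileSystemType.BROKER; // all remote access goes through the broker
        } else if (location.startsWith("hdfs://")) {
            return FileSystemType.DFS;
        } else if (location.startsWith("ofs://")) {
            return FileSystemType.OFS;
        } else if (location.startsWith("jfs://")) {
            return FileSystemType.JFS;
        }
        return FileSystemType.FILE;
    }

    public static void main(String[] args) {
        System.out.println(fsType("hdfs://ns1/warehouse/t1", null));          // DFS
        System.out.println(fsType("hdfs://ns1/warehouse/t1", "hdfs_broker")); // BROKER
    }
}
```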
@@ -22,5 +22,6 @@ public enum FileSystemType {
     DFS,
     OFS,
     JFS,
+    BROKER,
     FILE
 }