Skip to content

Commit

Permalink
[Feature][Zeta] Checkpoint support hdfs ha mode (#4942)
Browse files Browse the repository at this point in the history
  • Loading branch information
Kyofin authored Aug 14, 2023
1 parent 25cdb72 commit f188039
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 1 deletion.
22 changes: 22 additions & 0 deletions docs/en/seatunnel-engine/checkpoint-storage.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,28 @@ seatunnel:
kerberosKeytab: your-kerberos-keytab
```

if HDFS is in HA mode , you can config like this:

```yaml
seatunnel:
engine:
checkpoint:
storage:
type: hdfs
max-retained: 3
plugin-config:
storage.type: hdfs
fs.defaultFS: hdfs://usdp-bing
seatunnel.hadoop.dfs.nameservices: usdp-bing
seatunnel.hadoop.dfs.ha.namenodes.usdp-bing: nn1,nn2
seatunnel.hadoop.dfs.namenode.rpc-address.usdp-bing.nn1: usdp-bing-nn1:8020
seatunnel.hadoop.dfs.namenode.rpc-address.usdp-bing.nn2: usdp-bing-nn2:8020
seatunnel.hadoop.dfs.client.failover.proxy.provider.usdp-bing: org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
```

if HDFS has some other configs in `hdfs-site.xml` or `core-site.xml` , just set HDFS config by using `seatunnel.hadoop.` prefix.

#### LocalFile

```yaml
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ public class HdfsConfiguration extends AbstractConfiguration {

private static final String HDFS_IMPL_KEY = "fs.hdfs.impl";

private static final String SEATUNNEL_HADOOP_PREFIX = "seatunnel.hadoop.";

@Override
public Configuration buildConfiguration(Map<String, String> config)
throws CheckpointStorageException {
Expand All @@ -69,7 +71,15 @@ public Configuration buildConfiguration(Map<String, String> config)
authenticateKerberos(kerberosPrincipal, kerberosKeytabFilePath, hadoopConf);
}
}
// todo support other hdfs optional config keys
// support other hdfs optional config keys
config.entrySet().stream()
.filter(entry -> entry.getKey().startsWith(SEATUNNEL_HADOOP_PREFIX))
.forEach(
entry -> {
String key = entry.getKey().replace(SEATUNNEL_HADOOP_PREFIX, "");
String value = entry.getValue();
hadoopConf.set(key, value);
});
return hadoopConf;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

package org.apache.seatunnel.engine.checkpoint.storage.hdfs;

import org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;

import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;

import java.util.HashMap;
import java.util.Map;

@Disabled(
"HDFS is not available in CI, if you want to run this test, please set up your own HDFS environment")
public class HDFSFileCheckpointTest extends AbstractFileCheckPointTest {

@BeforeAll
public static void setup() throws CheckpointStorageException {
Map<String, String> config = new HashMap<>();
config.put("storage.type", "hdfs");
config.put("fs.defaultFS", "hdfs://usdp-bing");
config.put("seatunnel.hadoop.dfs.nameservices", "usdp-bing");
config.put("seatunnel.hadoop.dfs.ha.namenodes.usdp-bing", "nn1,nn2");
config.put("seatunnel.hadoop.dfs.namenode.rpc-address.usdp-bing.nn1", "usdp-bing-nn1:8020");
config.put("seatunnel.hadoop.dfs.namenode.rpc-address.usdp-bing.nn2", "usdp-bing-nn2:8020");
config.put(
"seatunnel.hadoop.dfs.client.failover.proxy.provider.usdp-bing",
"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
STORAGE = new HdfsStorage(config);
initStorageData();
}
}

0 comments on commit f188039

Please sign in to comment.