feat: add experimental remote HDFS support for native DataFusion reader #1359

Open · wants to merge 4 commits into base: main
Changes from 1 commit
5 changes: 4 additions & 1 deletion Makefile
@@ -21,6 +21,9 @@ define spark_jvm_17_extra_args
$(shell ./mvnw help:evaluate -Dexpression=extraJavaTestArgs | grep -v '\[')
endef

# Build optional Comet native features (e.g. hdfs)
FEATURES_ARG := $(shell ! [ -z $(COMET_FEATURES) ] && echo '--features=$(COMET_FEATURES)')

all: core jvm

core:
@@ -95,7 +98,7 @@ release-linux: clean
cd native && RUSTFLAGS="-Ctarget-cpu=native -Ctarget-feature=-prefer-256-bit" cargo build --release
./mvnw install -Prelease -DskipTests $(PROFILES)
release:
cd native && RUSTFLAGS="-Ctarget-cpu=native" cargo build --release
cd native && RUSTFLAGS="$(RUSTFLAGS) -Ctarget-cpu=native" && RUSTFLAGS=$$RUSTFLAGS cargo build --release $(FEATURES_ARG)
./mvnw install -Prelease -DskipTests $(PROFILES)
release-nogit:
cd native && RUSTFLAGS="-Ctarget-cpu=native" cargo build --release
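A brief note on the new `FEATURES_ARG` variable above: when `COMET_FEATURES` is unset it expands to an empty string, and when set it passes the named features through to cargo. A rough sketch of the resulting build command (illustrative only, not the literal recipe):

```shell
# make release COMET_FEATURES=hdfs
# FEATURES_ARG expands to: --features=hdfs
# so the native build step becomes, roughly:
cd native && RUSTFLAGS="-Ctarget-cpu=native" cargo build --release --features=hdfs
```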
76 changes: 76 additions & 0 deletions docs/source/user-guide/datasources.md
@@ -35,3 +35,79 @@ converted into Arrow format, allowing native execution to happen after that.

Comet does not provide native JSON scan, but when `spark.comet.convert.json.enabled` is enabled, data is immediately
converted into Arrow format, allowing native execution to happen after that.

# Supported Storages

## Local
In progress

## HDFS

The Apache DataFusion Comet native reader can scan files from remote HDFS for all [supported formats](#supported-spark-data-sources).

### Using the experimental native DataFusion reader

Unlike the native Comet reader, the DataFusion reader fully supports processing of nested types. This reader is currently experimental.

To build Comet with the native DataFusion reader and remote HDFS support, a JDK must be installed.

Example: to build Comet for `spark-3.4`, provide the JDK path in `JAVA_HOME` and the JRE linker path in `RUSTFLAGS`. The linker path can vary depending on the system; typically the JRE linker ships as part of the installed JDK.

```shell
export JAVA_HOME="/opt/homebrew/opt/openjdk@11"
make release PROFILES="-Pspark-3.4" COMET_FEATURES=hdfs RUSTFLAGS="-L $JAVA_HOME/libexec/openjdk.jdk/Contents/Home/lib/server"
```
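
On Linux the JDK layout differs from the macOS example above; a hypothetical equivalent, assuming an OpenJDK 11 installation under `/usr/lib/jvm` (adjust both paths to your system):

```shell
# Assumed Linux OpenJDK 11 location; libjvm.so typically lives under $JAVA_HOME/lib/server
export JAVA_HOME="/usr/lib/jvm/java-11-openjdk-amd64"
make release PROFILES="-Pspark-3.4" COMET_FEATURES=hdfs RUSTFLAGS="-L $JAVA_HOME/lib/server"
```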

Start Comet with the experimental reader and HDFS support as [described](installation.md/#run-spark-shell-with-comet-enabled),
adding the following parameters:

```shell
--conf spark.comet.scan.impl=native_datafusion \
--conf spark.hadoop.fs.defaultFS="hdfs://namenode:9000" \
--conf spark.hadoop.dfs.client.use.datanode.hostname=true \
--conf dfs.client.use.datanode.hostname=true
```
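
Put together, a full launch might look like the sketch below. The jar path and the base Comet flags are assumptions taken from the installation guide and may differ for your version:

```shell
# $COMET_JAR is assumed to point at the Comet jar produced by the build above
$SPARK_HOME/bin/spark-shell \
    --jars $COMET_JAR \
    --conf spark.plugins=org.apache.spark.CometPlugin \
    --conf spark.comet.enabled=true \
    --conf spark.comet.exec.enabled=true \
    --conf spark.comet.scan.impl=native_datafusion \
    --conf spark.hadoop.fs.defaultFS="hdfs://namenode:9000" \
    --conf spark.hadoop.dfs.client.use.datanode.hostname=true
```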

Query a struct type from remote HDFS:
```shell
spark.read.parquet("hdfs://namenode:9000/user/data").show(false)

root
|-- id: integer (nullable = true)
|-- first_name: string (nullable = true)
|-- personal_info: struct (nullable = true)
| |-- firstName: string (nullable = true)
| |-- lastName: string (nullable = true)
| |-- ageInYears: integer (nullable = true)

25/01/30 16:50:43 INFO core/src/lib.rs: Comet native library version 0.6.0 initialized
== Physical Plan ==
* CometColumnarToRow (2)
+- CometNativeScan: (1)


(1) CometNativeScan:
Output [3]: [id#0, first_name#1, personal_info#4]
Arguments: [id#0, first_name#1, personal_info#4]

(2) CometColumnarToRow [codegen id : 1]
Input [3]: [id#0, first_name#1, personal_info#4]


25/01/30 16:50:44 INFO fs-hdfs-0.1.12/src/hdfs.rs: Connecting to Namenode (hdfs://namenode:9000)
+---+----------+-----------------+
|id |first_name|personal_info |
+---+----------+-----------------+
|2 |Jane |{Jane, Smith, 34}|
|1 |John |{John, Doe, 28} |
+---+----------+-----------------+
```

Verify that the scan shown in the physical plan is `CometNativeScan`.
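
For example, calling `explain()` on the DataFrame in the spark-shell should print a plan containing a `CometNativeScan` node, matching the output above:

```shell
scala> spark.read.parquet("hdfs://namenode:9000/user/data").explain()
== Physical Plan ==
* CometColumnarToRow (2)
+- CometNativeScan: (1)
```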

## S3
In progress
120 changes: 119 additions & 1 deletion native/Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions native/core/Cargo.toml
@@ -77,6 +77,7 @@ datafusion-comet-proto = { workspace = true }
object_store = { workspace = true }
url = { workspace = true }
chrono = { workspace = true }
datafusion-objectstore-hdfs = { git = "https://github.com/comphead/datafusion-objectstore-hdfs", branch = "master", optional = true }
Contributor Author:

@andygrove I'm keeping the updated HDFS object storage in a personal repo for now; let me know if there are any concerns.


[dev-dependencies]
pprof = { version = "0.14.0", features = ["flamegraph"] }
@@ -88,6 +89,7 @@ hex = "0.4.3"

[features]
default = []
hdfs = ["datafusion-objectstore-hdfs"]

[lib]
name = "comet"
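For reference, the new `hdfs` feature can also be enabled directly when building the native crate with cargo (a hypothetical manual invocation; the documented route is `make release COMET_FEATURES=hdfs`, and `RUSTFLAGS` still needs to point at the JVM linker path as described in the docs above):

```shell
# path assumes a Linux-style JDK layout; see the docs example for the macOS equivalent
cd native
RUSTFLAGS="-L $JAVA_HOME/lib/server" cargo build --release --features hdfs
```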
13 changes: 5 additions & 8 deletions native/core/src/execution/planner.rs
@@ -73,7 +73,7 @@ use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctio

use crate::execution::shuffle::CompressionCodec;
use crate::execution::spark_plan::SparkPlan;
use crate::parquet::parquet_support::SparkParquetOptions;
use crate::parquet::parquet_support::{register_object_store, SparkParquetOptions};
use crate::parquet::schema_adapter::SparkSchemaAdapterFactory;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::parquet::ParquetExecBuilder;
@@ -1155,12 +1155,9 @@ impl PhysicalPlanner {
))
});

let object_store = object_store::local::LocalFileSystem::new();
// register the object store with the runtime environment
let url = Url::try_from("file://").unwrap();
self.session_ctx
.runtime_env()
.register_object_store(&url, Arc::new(object_store));
// By default, local FS object store registered
// if `hdfs` feature enabled then HDFS file object store registered
register_object_store(Arc::clone(&self.session_ctx))?;

// Generate file groups
let mut file_groups: Vec<Vec<PartitionedFile>> =
@@ -1220,7 +1217,7 @@
// TODO: I think we can remove partition_count in the future, but leave for testing.
assert_eq!(file_groups.len(), partition_count);

let object_store_url = ObjectStoreUrl::local_filesystem();
let object_store_url = ObjectStoreUrl::parse("hdfs://namenode:9000").unwrap();
Contributor Author:
this will be addressed in #1360

Contributor:
The url should be available as part of the file path passed in. (see line 1178 above)

Contributor Author:
Thanks @parthchandra, it is already fixed.

let partition_fields: Vec<Field> = partition_schema
.fields()
.iter()