Commit

fmt

comphead committed Jan 31, 2025
1 parent fc11163 commit ce91967
Showing 2 changed files with 12 additions and 16 deletions.
native/core/src/execution/planner.rs (5 changes: 1 addition & 4 deletions)
@@ -104,7 +104,6 @@ use datafusion_common::{
     tree_node::{Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeRewriter},
     JoinType as DFJoinType, ScalarValue,
 };
-use datafusion_execution::object_store::ObjectStoreUrl;
 use datafusion_expr::type_coercion::other::get_coerce_type_for_case_expression;
 use datafusion_expr::{
     AggregateUDF, ReturnTypeArgs, ScalarUDF, WindowFrame, WindowFrameBound, WindowFrameUnits,
@@ -1157,7 +1156,7 @@ impl PhysicalPlanner {

         // By default, local FS object store registered
         // if `hdfs` feature enabled then HDFS file object store registered
-        register_object_store(Arc::clone(&self.session_ctx))?;
+        let object_store_url = register_object_store(Arc::clone(&self.session_ctx))?;

         // Generate file groups
         let mut file_groups: Vec<Vec<PartitionedFile>> =
@@ -1216,8 +1215,6 @@

         // TODO: I think we can remove partition_count in the future, but leave for testing.
         assert_eq!(file_groups.len(), partition_count);
-
-        let object_store_url = ObjectStoreUrl::parse("hdfs://namenode:9000").unwrap();
         let partition_fields: Vec<Field> = partition_schema
             .fields()
             .iter()
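Why the URL now flows out of register_object_store rather than being re-parsed in the planner: DataFusion resolves a scan's object store through the URL it was registered under, so registration and lookup have to agree on one ObjectStoreUrl. A minimal standalone sketch of that register-then-resolve pairing for the local-FS case (illustration only, not code from this commit; register_and_resolve is a hypothetical name):

    use std::sync::Arc;

    use datafusion::error::Result;
    use datafusion::execution::object_store::ObjectStoreUrl;
    use datafusion::prelude::SessionContext;
    use object_store::local::LocalFileSystem;

    fn register_and_resolve() -> Result<()> {
        let ctx = SessionContext::new();

        // Register the local store under "file://" and keep the URL around.
        let url = ObjectStoreUrl::parse("file://")?;
        ctx.runtime_env()
            .register_object_store(url.as_ref(), Arc::new(LocalFileSystem::new()));

        // A scan built with the same URL resolves to the registered store;
        // a mismatched URL (e.g. the previously hard-coded hdfs:// one in a
        // local build) would fail this lookup.
        let _store = ctx.runtime_env().object_store(&url)?;
        Ok(())
    }

That mismatch is what the planner change avoids: before this commit, a default build registered a store under file:// but built the scan against a hard-coded hdfs:// URL.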
native/core/src/parquet/parquet_support.rs (23 changes: 11 additions & 12 deletions)
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.

+use crate::execution::operators::ExecutionError;
 use arrow::{
     array::{
         cast::AsArray,
@@ -35,13 +36,12 @@ use arrow_array::builder::StringBuilder;
 use arrow_array::{DictionaryArray, StringArray, StructArray};
 use arrow_schema::DataType;
 use chrono::{NaiveDate, NaiveDateTime, TimeZone, Timelike};
+use datafusion::execution::object_store::ObjectStoreUrl;
+use datafusion::prelude::SessionContext;
 use datafusion_comet_spark_expr::utils::array_with_timezone;
 use datafusion_comet_spark_expr::{timezone, EvalMode, SparkError, SparkResult};
 use datafusion_common::{cast::as_generic_string_array, Result as DataFusionResult, ScalarValue};
 use datafusion_expr::ColumnarValue;
-// use datafusion_physical_expr::PhysicalExpr;
-use crate::execution::operators::ExecutionError;
-use datafusion::prelude::SessionContext;
 use num::{
     cast::AsPrimitive, integer::div_floor, traits::CheckedNeg, CheckedSub, Integer, Num,
     ToPrimitive,
@@ -50,7 +50,6 @@ use regex::Regex;
 use std::collections::HashMap;
 use std::str::FromStr;
 use std::{fmt::Debug, hash::Hash, num::Wrapping, sync::Arc};
-use url::Url;

 static TIMESTAMP_FORMAT: Option<&str> = Some("%Y-%m-%d %H:%M:%S%.f");

@@ -1867,29 +1866,29 @@ fn trim_end(s: &str) -> &str {
 #[cfg(not(feature = "hdfs"))]
 pub(crate) fn register_object_store(
     session_context: Arc<SessionContext>,
-) -> Result<(), ExecutionError> {
+) -> Result<ObjectStoreUrl, ExecutionError> {
     let object_store = object_store::local::LocalFileSystem::new();
-    let url = Url::try_from("file://").unwrap();
+    let url = ObjectStoreUrl::parse("file://").unwrap();
     session_context
         .runtime_env()
-        .register_object_store(&url, Arc::new(object_store));
-    Ok(())
+        .register_object_store((&url).as_ref(), Arc::new(object_store));
+    Ok(url)
 }

 #[cfg(feature = "hdfs")]
 pub(crate) fn register_object_store(
     session_context: Arc<SessionContext>,
-) -> Result<(), ExecutionError> {
+) -> Result<ObjectStoreUrl, ExecutionError> {
     // TODO: read the namenode configuration from file schema or from spark.defaultFS
-    let url = Url::try_from("hdfs://namenode:9000").unwrap();
+    let url = ObjectStoreUrl::parse("hdfs://namenode:9000")?;
     if let Some(object_store) =
         datafusion_objectstore_hdfs::object_store::hdfs::HadoopFileSystem::new((&url).as_ref())
     {
         session_context
             .runtime_env()
-            .register_object_store(&url, Arc::new(object_store));
+            .register_object_store((&url).as_ref(), Arc::new(object_store));

-        return Ok(());
+        return Ok(url);
     }

     Err(ExecutionError::GeneralError(format!(
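A test one might add for the new contract (a sketch, not part of this commit): in a default, non-hdfs build, register_object_store registers the local filesystem store and returns the file:// URL it used, which the runtime env can then resolve again.

    #[cfg(all(test, not(feature = "hdfs")))]
    mod register_object_store_url_tests {
        use super::*;

        #[test]
        fn returns_registered_url() {
            let ctx = Arc::new(SessionContext::new());
            let url = register_object_store(Arc::clone(&ctx))
                .expect("local registration should not fail");
            // url::Url normalizes "file://" to "file:///".
            assert!(url.as_str().starts_with("file://"));
            // The store registered under that URL is resolvable again.
            assert!(ctx.runtime_env().object_store(&url).is_ok());
        }
    }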
