feat: initial Operator::from_uri implementation #5482

Open
wants to merge 24 commits into
base: main
24 commits
f2267e6
feat: initial OperatorRegistry implementation
jorgehermo9 Dec 30, 2024
f581316
feat: continue with operator registry implementation
jorgehermo9 Dec 30, 2024
b67be38
feat: register enabled services & set global registry
jorgehermo9 Dec 30, 2024
b8c4fb1
feat: glue together Operator and global OperatorRegistry
jorgehermo9 Dec 30, 2024
a50c058
feat: glue together Operator and global OperatorRegistry
jorgehermo9 Dec 30, 2024
539e6ea
feat: implement Configurator::from_uri
jorgehermo9 Dec 30, 2024
3aba98b
test: add small doctest to Operator::from_uri
jorgehermo9 Dec 30, 2024
5979898
chore: add comment
jorgehermo9 Dec 30, 2024
6eed396
chore: remove changes
jorgehermo9 Dec 30, 2024
0e582ee
chore: remove changes
jorgehermo9 Dec 30, 2024
9d730c0
chore: add license header
jorgehermo9 Dec 30, 2024
976e60c
chore: fix typo
jorgehermo9 Dec 30, 2024
a4478a9
fix: clippy lint
jorgehermo9 Dec 30, 2024
a4e3866
retrigger ci
jorgehermo9 Dec 30, 2024
443c11d
feat: drop usage of the url crate
jorgehermo9 Jan 5, 2025
e7215b4
chore: remove TODO
jorgehermo9 Jan 5, 2025
b6dd1dc
feat: wrap OperatorRegistry in Arc Mutex
jorgehermo9 Jan 5, 2025
e887d24
feat: Initialize operator registry in new method
jorgehermo9 Jan 5, 2025
41790a5
chore: remove todo
jorgehermo9 Jan 5, 2025
27d65d9
Merge branch 'main' into feature/operator-from-uri
jorgehermo9 Jan 5, 2025
3b80f70
feat: move service registration to builder methods
jorgehermo9 Jan 5, 2025
c5d1c9c
chore: fix comment typo
jorgehermo9 Jan 5, 2025
e53cdd7
chore: fix comment typo
jorgehermo9 Jan 5, 2025
79091ac
chore: add comment
jorgehermo9 Jan 5, 2025
1 change: 1 addition & 0 deletions core/src/raw/http_util/mod.rs
@@ -55,6 +55,7 @@ pub use header::parse_prefixed_headers;
mod uri;
pub use uri::percent_decode_path;
pub use uri::percent_encode_path;
pub use uri::query_pairs;

mod error;
pub use error::new_request_build_error;
19 changes: 19 additions & 0 deletions core/src/raw/http_util/uri.rs
@@ -58,6 +58,25 @@ pub fn percent_decode_path(path: &str) -> String {
}
}

/// query_pairs parses a query string of `key=value` pairs separated by `&` into a vector of key-value pairs.
/// It also percent-decodes both keys and values.
///
/// Note that `?` is not allowed in the query string; if included, it is treated as part of the first key.
pub fn query_pairs(query: &str) -> Vec<(String, String)> {
query
.split('&')
.filter_map(|pair| {
let mut iter = pair.splitn(2, '=');
// TODO: should we silently ignore invalid pairs and filter them out without the user noticing?
// or should we return an error in the whole `query_pairs` function so the caller knows it failed?
let key = iter.next()?;
let value = iter.next().unwrap_or("");
Some((key, value))
})
.map(|(key, value)| (percent_decode_path(key), percent_decode_path(value)))
.collect()
}

#[cfg(test)]
mod tests {
use super::*;
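To make the behavior of `query_pairs` concrete, here is a minimal standalone sketch. It is not the code from this PR: `percent_decode` is a hypothetical stand-in for `percent_decode_path`, written with the standard library only, and it may differ from the real helper on malformed input. The sketch also makes one detail visible: splitting always yields at least one item, so a pair without `=` becomes a key with an empty value instead of being filtered out, and an empty query yields a single empty pair.

```rust
/// Convert one ASCII hex digit to its numeric value.
fn hex_value(b: u8) -> Option<u8> {
    match b {
        b'0'..=b'9' => Some(b - b'0'),
        b'a'..=b'f' => Some(b - b'a' + 10),
        b'A'..=b'F' => Some(b - b'A' + 10),
        _ => None,
    }
}

/// Hypothetical stand-in for `percent_decode_path`: decodes `%XX` escapes
/// and keeps invalid or truncated escapes verbatim.
fn percent_decode(input: &str) -> String {
    let bytes = input.as_bytes();
    let mut out = Vec::with_capacity(bytes.len());
    let mut i = 0;
    while i < bytes.len() {
        if bytes[i] == b'%' && i + 2 < bytes.len() {
            if let (Some(hi), Some(lo)) = (hex_value(bytes[i + 1]), hex_value(bytes[i + 2])) {
                out.push(hi * 16 + lo);
                i += 3;
                continue;
            }
        }
        out.push(bytes[i]);
        i += 1;
    }
    String::from_utf8_lossy(&out).into_owned()
}

/// Split on `&`, then on the first `=`; a missing value becomes "".
fn query_pairs(query: &str) -> Vec<(String, String)> {
    query
        .split('&')
        .map(|pair| {
            let (key, value) = pair.split_once('=').unwrap_or((pair, ""));
            (percent_decode(key), percent_decode(value))
        })
        .collect()
}
```

If invalid pairs should be rejected rather than kept, the function would need to return a `Result` so the caller can see the failure, which is the question raised in the TODO.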
37 changes: 37 additions & 0 deletions core/src/types/builder.rs
@@ -17,11 +17,13 @@

use std::fmt::Debug;

use http::Uri;
use serde::de::DeserializeOwned;
use serde::Serialize;

use crate::raw::*;
use crate::*;
use types::operator::{OperatorFactory, OperatorRegistry};

/// Builder is used to set up underlying services.
///
@@ -56,6 +58,15 @@ pub trait Builder: Default + 'static {

/// Consume the accessor builder to build a service.
fn build(self) -> Result<impl Access>;

/// Register this builder in the given registry.
Contributor Author

Is this doc-comment adequate?

fn register_in_registry(registry: &mut OperatorRegistry) {
let operator_factory: OperatorFactory = |uri, options| {
Contributor Author

jorgehermo9 Jan 5, 2025

I had to add the type annotation for operator_factory because the compiler could not infer it correctly:

error[E0308]: mismatched types
  --> src/types/builder.rs:68:55
   |
68 |         registry.register(Self::SCHEME.into_static(), operator_factory)
   |                                                       ^^^^^^^^^^^^^^^^ one type is more general than the other
   |
   = note: expected fn pointer `for<'a> fn(&'a _, HashMap<_, _>) -> std::result::Result<_, _>`
              found fn pointer `fn(&_, HashMap<_, _>) -> std::result::Result<_, _>`

For more information about this error, try `rustc --explain E0308`.
warning: `opendal` (lib) generated 2 warnings
error: could not compile `opendal` (lib) due to 1 previous error; 2 warnings emitted
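As far as I understand it, the error comes from closure signature inference: without an expected type, the closure's reference parameter is inferred with one specific lifetime, while the fn-pointer alias is generic over it (`for<'a> fn(&'a _, ...)`). The sketch below reproduces the working shape with standard-library types only. `Factory` and `Registry` are hypothetical stand-ins for `OperatorFactory` and `OperatorRegistry`, and the factory returns a `String` instead of an `Operator`.

```rust
use std::collections::HashMap;

// Hypothetical stand-in for `OperatorFactory`.
type Factory = fn(&str, HashMap<String, String>) -> Result<String, String>;

// Hypothetical stand-in for `OperatorRegistry`.
#[derive(Default)]
struct Registry {
    factories: HashMap<String, Factory>,
}

impl Registry {
    fn register(&mut self, scheme: &str, factory: Factory) {
        self.factories.insert(scheme.to_string(), factory);
    }
}

fn register_demo(registry: &mut Registry) {
    // The annotation gives the closure an expected signature that is generic
    // over the lifetime of `uri`, so it coerces to the fn-pointer alias.
    let factory: Factory = |uri, options| {
        if uri.is_empty() {
            return Err("uri is empty".to_string());
        }
        Ok(format!("{} ({} options)", uri, options.len()))
    };
    registry.register("demo", factory);
}
```

Writing the factory as a named `fn` item would avoid the annotation as well, at the cost of a little more boilerplate.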

let builder = Self::Config::from_uri(uri, options)?.into_builder();
Ok(Operator::new(builder)?.finish())
};
registry.register(Self::SCHEME.into_static(), operator_factory)
Contributor Author

As a default implementation, we simply register the associated SCHEME with this builder. Is this ok?

}
}

/// Dummy implementation of builder
@@ -137,6 +148,32 @@ pub trait Configurator: Serialize + DeserializeOwned + Debug + 'static {
})
}

/// TODO: document this.
fn from_uri(uri: &str, options: impl IntoIterator<Item = (String, String)>) -> Result<Self> {
let parsed_uri = uri.parse::<Uri>().map_err(|err| {
Error::new(ErrorKind::ConfigInvalid, "uri is invalid")
.with_context("uri", uri)
.set_source(err)
})?;

// TODO: I have some doubts about this default implementation.
// It was inspired by https://github.com/apache/opendal/blob/52c96bb8e8cb4d024ccab1f415c4756447c726dd/bin/ofs/src/main.rs#L60
// Parameters should be specified in the URI's query string. Example: 'fs://?root=<directory>'
// This is very similar to https://github.com/apache/opendal/blob/52c96bb8e8cb4d024ccab1f415c4756447c726dd/bin/ofs/README.md?plain=1#L45
let query_pairs = parsed_uri.query().map(query_pairs).unwrap_or_default();

// TODO: should we log a warning if the query_pairs vector is empty?

// TODO: we are not interpreting the host or path components of the URI, so
// the `let op = Operator::from_uri("fs:///tmp/test", vec![])?;` statement from the RFC won't work.
// Instead, we should use `let op = Operator::from_uri("fs://?root=/tmp/test", vec![])?;` as done
// in `ofs`. Should the `fs` service override this default implementation if it wants to use the host or path?

let merged_options = query_pairs.into_iter().chain(options);

Self::from_iter(merged_options)
}

/// Convert this configuration into a service builder.
fn into_builder(self) -> Self::Builder;
}
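The default `from_uri` chains the explicit `options` after the query pairs. A small sketch of what that ordering implies, assuming the merged iterator ends up collected into a map (whether `Configurator::from_iter` does exactly this is an assumption here): on a duplicate key, the explicit option overwrites the value from the URI. `merge_options` is a hypothetical helper, not part of this PR.

```rust
use std::collections::HashMap;

/// Merge URI query pairs with explicit options into one map.
/// Options are chained last, so they overwrite query pairs with the same key.
fn merge_options(
    query_pairs: Vec<(String, String)>,
    options: impl IntoIterator<Item = (String, String)>,
) -> HashMap<String, String> {
    query_pairs.into_iter().chain(options).collect()
}
```

If the opposite precedence is wanted (the URI wins over explicit options), swapping the two sides of `chain` would be enough.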
23 changes: 23 additions & 0 deletions core/src/types/operator/builder.rs
@@ -21,6 +21,8 @@ use std::sync::Arc;
use crate::layers::*;
use crate::raw::*;
use crate::*;
// TODO: is this import path idiomatic to the project?
use super::registry::GLOBAL_OPERATOR_REGISTRY;

/// # Operator build API
///
@@ -95,6 +97,27 @@ impl Operator {
Ok(OperatorBuilder::new(acc))
}

/// TODO: document this.
///
/// TODO: improve these examples
/// TODO: this test does not work. It always outputs Ok, because `test` is defined but never called
/// # Examples
/// ```
/// # use anyhow::Result;
/// use opendal::Operator;
///
/// fn test() -> Result<()> {
/// Operator::from_uri("fs://?root=/tmp/test", vec![])?;
/// Ok(())
/// }
/// ```
pub fn from_uri(
jorgehermo9 marked this conversation as resolved.
uri: &str,
options: impl IntoIterator<Item = (String, String)>,
) -> Result<Self> {
GLOBAL_OPERATOR_REGISTRY.with(|registry| registry.parse(uri, options))
}

/// Create a new operator from given iterator in static dispatch.
///
/// # Notes
8 changes: 8 additions & 0 deletions core/src/types/operator/mod.rs
@@ -32,3 +32,11 @@ pub use metadata::OperatorInfo;

pub mod operator_functions;
pub mod operator_futures;

// TODO: should we make the registry module public or export the OperatorFactory and OperatorRegistry
// types directly?
mod registry;
// TODO: these re-exports are reported as unused. How can we expose them as public API?
pub use registry::OperatorFactory;
pub use registry::OperatorRegistry;
pub use registry::GLOBAL_OPERATOR_REGISTRY;
227 changes: 227 additions & 0 deletions core/src/types/operator/registry.rs
@@ -0,0 +1,227 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::cell::LazyCell;
Contributor Author

I added this file inside the crate::types::operator::registry module. Is it ok? I thought about adding a crate::types::operator_registry, but it seemed better this way.

use std::collections::HashMap;
use std::sync::{Arc, Mutex};

use http::Uri;

use crate::services::*;
use crate::*;

// TODO: thread local or use LazyLock instead? this way the access is lock-free
// TODO: should we expose the `GLOBAL_OPERATOR_REGISTRY` as public API at `crate::types::operator::GLOBAL_OPERATOR_REGISTRY`?
thread_local! {
Contributor Author

What is the preferred way of having a global static variable such as this?

I prefer to have it thread_local so there is no need for a LazyLock; we can use LazyCell instead (LazyCell is lock-free, whereas LazyLock synchronizes access across threads).

pub static GLOBAL_OPERATOR_REGISTRY: LazyCell<OperatorRegistry> = LazyCell::new(OperatorRegistry::new);
}
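One consequence of the `thread_local!` choice is worth spelling out: each thread gets its own registry instance, so a registration made on one thread is not visible on another. The sketch below contrasts the two options using plain sets of scheme names. All names in it are hypothetical, and it uses a bare `RefCell` for the thread-local because `thread_local!` already initializes lazily on first access.

```rust
use std::cell::RefCell;
use std::collections::HashSet;
use std::sync::{LazyLock, Mutex};
use std::thread;

thread_local! {
    // One independent set per thread.
    static LOCAL_SCHEMES: RefCell<HashSet<String>> = RefCell::new(HashSet::new());
}

// One set for the whole process, guarded by a mutex.
static SHARED_SCHEMES: LazyLock<Mutex<HashSet<String>>> =
    LazyLock::new(|| Mutex::new(HashSet::new()));

fn register_local(scheme: &str) {
    LOCAL_SCHEMES.with(|schemes| {
        schemes.borrow_mut().insert(scheme.to_string());
    });
}

fn register_shared(scheme: &str) {
    SHARED_SCHEMES
        .lock()
        .expect("poisoned lock")
        .insert(scheme.to_string());
}

fn is_registered_local(scheme: &str) -> bool {
    LOCAL_SCHEMES.with(|schemes| schemes.borrow().contains(scheme))
}

fn is_registered_shared(scheme: &str) -> bool {
    SHARED_SCHEMES
        .lock()
        .expect("poisoned lock")
        .contains(scheme)
}

/// Register `scheme` in both registries from a freshly spawned thread.
fn register_on_worker_thread(scheme: &'static str) {
    thread::spawn(move || {
        register_local(scheme);
        register_shared(scheme);
    })
    .join()
    .expect("worker thread panicked");
}
```

After `register_on_worker_thread("fs")`, the calling thread sees `fs` in the shared set but not in its own thread-local set. If users are expected to register custom schemes once at startup and then call `Operator::from_uri` from worker threads, that difference probably matters more than the cost of the lock.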

// To reduce boilerplate, should this function return a `Builder` instead of an `Operator`?
pub type OperatorFactory = fn(&str, HashMap<String, String>) -> Result<Operator>;

// TODO: the default implementation should return an empty registry? Or it should return the initialized
// registry with all the services that are enabled? If so, should we include an `OperatorRegistry::empty` method
// that allows users to create an empty registry?
#[derive(Clone, Debug, Default)]
pub struct OperatorRegistry {
registry: Arc<Mutex<HashMap<String, OperatorFactory>>>,
}

impl OperatorRegistry {
pub fn new() -> Self {
Member

Let's initialize the registry here directly.

Contributor Author

jorgehermo9 Jan 5, 2025

Ok, done in e887d24

Derived a Default implementation for OperatorRegistry that relies on HashMap::default, so users can get an empty OperatorRegistry if they need it.

Or should the Default implementation include all the registered services, like OperatorRegistry::new? If so, should we include an OperatorRegistry::empty so users can get an empty OperatorRegistry?

let mut registry = Self::default();
// TODO: is this correct? Should we have a `Builder::enabled()` method that returns the set of enabled service builders,
// similar to `Scheme::Enabled()`?
// Or a `Scheme::associated_builder` that, given a scheme, returns the associated builder? The problem with this
// is that `Scheme` variants are not gated behind a feature gate, but the associated builder is. As a workaround

// TODO: it seems too error-prone to keep this list updated manually. Should we have a macro that generates it?
// It could be something like:
//
// ```rust
// apply_for_all_services!{
// $service::register_in_registry(&mut registry);
// }
// ```
// The `apply_for_all_services` macro would gate every statement behind the corresponding feature gate.
// This does not seem to be the place where we should have a "list of enabled services".
#[cfg(feature = "services-aliyun-drive")]
Member

I believe every service should have its own registration functions. There are two reasons:

  • The service may have different schemes for registration. For example, S3 registers as s3, s3a, minio, r2, and so on, while GCS registers as gcs, gs, and so forth.
  • It provides a registration function that simplifies integration with OpenDAL for external users.

Contributor Author

jorgehermo9 Jan 5, 2025

Ok, going to move the registration to each service Builder.

something like

impl Builder for S3Builder {

  fn register_in_registry(registry: &mut OperatorRegistry) {
   registry.register("s3", s3_builder);
   registry.register("r2", r2_builder);
  }
}

I wonder whether s3 and r2 (or s3a, minio, etc.) are built differently. If so, we can use a different OperatorFactory for each, such as s3_builder, r2_builder, etc.

Contributor Author

jorgehermo9 Jan 5, 2025

Implemented that in 3b80f70. Do you prefer it that way?

Please note that I used a default Builder::register_in_registry for all services.

For example, S3 registers as s3, s3a, minio, r2, and so on, while GCS registers as gcs, gs, and so forth.

This can be addressed in follow-up PRs (as discussed here), and then S3Builder should override the default Builder::register_in_registry implementation to register the custom schemes/factories if needed.
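A rough sketch of how that override could look, using standard-library types only. `Service`, `Registry`, and `Factory` are hypothetical stand-ins for `Builder`, `OperatorRegistry`, and `OperatorFactory`; the factories return a label instead of an `Operator`, and the alias list is illustrative rather than a decision about which schemes S3 should claim.

```rust
use std::collections::HashMap;

// Hypothetical stand-in for `OperatorFactory`.
type Factory = fn(&str) -> String;

// Hypothetical stand-in for `OperatorRegistry`.
#[derive(Default)]
struct Registry {
    factories: HashMap<String, Factory>,
}

impl Registry {
    fn register(&mut self, scheme: &str, factory: Factory) {
        self.factories.insert(scheme.to_string(), factory);
    }
}

// Hypothetical stand-in for the `Builder` trait.
trait Service {
    const SCHEME: &'static str;

    /// Stands in for the per-service factory logic.
    fn build(uri: &str) -> String;

    /// Default: register only the primary scheme.
    fn register_in_registry(registry: &mut Registry) {
        registry.register(Self::SCHEME, Self::build);
    }
}

struct Fs;

impl Service for Fs {
    const SCHEME: &'static str = "fs";

    fn build(uri: &str) -> String {
        format!("fs operator for {}", uri)
    }
}

struct S3;

impl Service for S3 {
    const SCHEME: &'static str = "s3";

    fn build(uri: &str) -> String {
        format!("s3 operator for {}", uri)
    }

    /// Override: register the primary scheme plus its aliases.
    fn register_in_registry(registry: &mut Registry) {
        for scheme in ["s3", "s3a", "minio", "r2"] {
            registry.register(scheme, Self::build);
        }
    }
}
```

Here every alias shares one factory. If an alias needs different defaults, it could be registered with its own factory function instead.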

AliyunDrive::register_in_registry(&mut registry);
#[cfg(feature = "services-atomicserver")]
Atomicserver::register_in_registry(&mut registry);
#[cfg(feature = "services-alluxio")]
Alluxio::register_in_registry(&mut registry);
#[cfg(feature = "services-azblob")]
Azblob::register_in_registry(&mut registry);
#[cfg(feature = "services-azdls")]
Azdls::register_in_registry(&mut registry);
#[cfg(feature = "services-azfile")]
Azfile::register_in_registry(&mut registry);
#[cfg(feature = "services-b2")]
B2::register_in_registry(&mut registry);
#[cfg(feature = "services-cacache")]
Cacache::register_in_registry(&mut registry);
#[cfg(feature = "services-cos")]
Cos::register_in_registry(&mut registry);
#[cfg(feature = "services-compfs")]
Compfs::register_in_registry(&mut registry);
#[cfg(feature = "services-dashmap")]
Dashmap::register_in_registry(&mut registry);
#[cfg(feature = "services-dropbox")]
Dropbox::register_in_registry(&mut registry);
#[cfg(feature = "services-etcd")]
Etcd::register_in_registry(&mut registry);
#[cfg(feature = "services-foundationdb")]
Foundationdb::register_in_registry(&mut registry);
#[cfg(feature = "services-fs")]
Fs::register_in_registry(&mut registry);
#[cfg(feature = "services-ftp")]
Ftp::register_in_registry(&mut registry);
#[cfg(feature = "services-gcs")]
Gcs::register_in_registry(&mut registry);
#[cfg(feature = "services-ghac")]
Ghac::register_in_registry(&mut registry);
#[cfg(feature = "services-hdfs")]
Hdfs::register_in_registry(&mut registry);
#[cfg(feature = "services-http")]
Http::register_in_registry(&mut registry);
#[cfg(feature = "services-huggingface")]
Huggingface::register_in_registry(&mut registry);
#[cfg(feature = "services-ipfs")]
Ipfs::register_in_registry(&mut registry);
#[cfg(feature = "services-ipmfs")]
Ipmfs::register_in_registry(&mut registry);
#[cfg(feature = "services-icloud")]
Icloud::register_in_registry(&mut registry);
#[cfg(feature = "services-libsql")]
Libsql::register_in_registry(&mut registry);
#[cfg(feature = "services-memcached")]
Memcached::register_in_registry(&mut registry);
#[cfg(feature = "services-memory")]
Memory::register_in_registry(&mut registry);
#[cfg(feature = "services-mini-moka")]
MiniMoka::register_in_registry(&mut registry);
#[cfg(feature = "services-moka")]
Moka::register_in_registry(&mut registry);
#[cfg(feature = "services-monoiofs")]
Monoiofs::register_in_registry(&mut registry);
#[cfg(feature = "services-mysql")]
Mysql::register_in_registry(&mut registry);
#[cfg(feature = "services-obs")]
Obs::register_in_registry(&mut registry);
#[cfg(feature = "services-onedrive")]
Onedrive::register_in_registry(&mut registry);
#[cfg(feature = "services-postgresql")]
Postgresql::register_in_registry(&mut registry);
#[cfg(feature = "services-gdrive")]
Gdrive::register_in_registry(&mut registry);
#[cfg(feature = "services-oss")]
Oss::register_in_registry(&mut registry);
#[cfg(feature = "services-persy")]
Persy::register_in_registry(&mut registry);
#[cfg(feature = "services-redis")]
Redis::register_in_registry(&mut registry);
#[cfg(feature = "services-rocksdb")]
Rocksdb::register_in_registry(&mut registry);
#[cfg(feature = "services-s3")]
S3::register_in_registry(&mut registry);
#[cfg(feature = "services-seafile")]
Seafile::register_in_registry(&mut registry);
#[cfg(feature = "services-upyun")]
Upyun::register_in_registry(&mut registry);
#[cfg(feature = "services-yandex-disk")]
YandexDisk::register_in_registry(&mut registry);
#[cfg(feature = "services-pcloud")]
Pcloud::register_in_registry(&mut registry);
#[cfg(feature = "services-sftp")]
Sftp::register_in_registry(&mut registry);
#[cfg(feature = "services-sled")]
Sled::register_in_registry(&mut registry);
#[cfg(feature = "services-sqlite")]
Sqlite::register_in_registry(&mut registry);
#[cfg(feature = "services-supabase")]
Supabase::register_in_registry(&mut registry);
#[cfg(feature = "services-swift")]
Swift::register_in_registry(&mut registry);
#[cfg(feature = "services-tikv")]
Tikv::register_in_registry(&mut registry);
#[cfg(feature = "services-vercel-artifacts")]
VercelArtifacts::register_in_registry(&mut registry);
#[cfg(feature = "services-vercel-blob")]
VercelBlob::register_in_registry(&mut registry);
#[cfg(feature = "services-webdav")]
Webdav::register_in_registry(&mut registry);
#[cfg(feature = "services-webhdfs")]
Webhdfs::register_in_registry(&mut registry);
#[cfg(feature = "services-redb")]
Redb::register_in_registry(&mut registry);
#[cfg(feature = "services-mongodb")]
Mongodb::register_in_registry(&mut registry);
#[cfg(feature = "services-hdfs-native")]
HdfsNative::register_in_registry(&mut registry);
#[cfg(feature = "services-surrealdb")]
Surrealdb::register_in_registry(&mut registry);
#[cfg(feature = "services-lakefs")]
Lakefs::register_in_registry(&mut registry);
#[cfg(feature = "services-nebula-graph")]
NebulaGraph::register_in_registry(&mut registry);

registry
}
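The macro idea from the TODO above could look roughly like the sketch below. It is only an illustration under stated assumptions: `register_services!`, `Service`, and `Registry` are hypothetical names, and the gates use `all()` (always true) and `any()` (always false) so the snippet compiles without Cargo features. A real list would use gates such as `feature = "services-fs"`.

```rust
use std::collections::HashSet;

// Hypothetical stand-in for `OperatorRegistry`; it only records scheme names.
#[derive(Default)]
struct Registry {
    schemes: HashSet<&'static str>,
}

// Hypothetical stand-in for the `Builder` trait.
trait Service {
    const SCHEME: &'static str;

    fn register_in_registry(registry: &mut Registry) {
        registry.schemes.insert(Self::SCHEME);
    }
}

struct Fs;
impl Service for Fs {
    const SCHEME: &'static str = "fs";
}

struct Memory;
impl Service for Memory {
    const SCHEME: &'static str = "memory";
}

struct Disabled;
impl Service for Disabled {
    const SCHEME: &'static str = "disabled";
}

/// Expands to one `#[cfg(...)]`-gated registration statement per entry.
macro_rules! register_services {
    ($registry:expr; $( $gate:meta => $service:ty ),+ $(,)?) => {
        $(
            #[cfg($gate)]
            <$service as Service>::register_in_registry($registry);
        )+
    };
}

fn enabled_registry() -> Registry {
    let mut registry = Registry::default();
    register_services! {
        &mut registry;
        all() => Fs,
        all() => Memory,
        // Gated out at compile time, like a service whose feature is off.
        any() => Disabled,
    }
    registry
}
```

This keeps each gate and its service on a single line, but the list itself still has to be maintained by hand, so it only partly addresses the concern in the TODO.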

pub fn register(&mut self, scheme: &str, factory: OperatorFactory) {
// TODO: should we receive a `&str` or a `String`? we are cloning it anyway
self.registry
.lock()
.expect("poisoned lock")
.insert(scheme.to_string(), factory);
}

pub fn parse(
&self,
uri: &str,
options: impl IntoIterator<Item = (String, String)>,
) -> Result<Operator> {
// TODO: we use the `http::Uri` struct here instead of `url::Url`, because
// it is the same type used in the `Configurator::from_uri` method.
let parsed_uri = uri.parse::<Uri>().map_err(|err| {
Error::new(ErrorKind::ConfigInvalid, "uri is invalid")
.with_context("uri", uri)
.set_source(err)
})?;

let scheme = parsed_uri.scheme_str().ok_or_else(|| {
Error::new(ErrorKind::ConfigInvalid, "uri is missing scheme").with_context("uri", uri)
})?;

let registry_lock = self.registry.lock().expect("poisoned lock");
let factory = registry_lock.get(scheme).ok_or_else(|| {
Error::new(
ErrorKind::ConfigInvalid,
"could not find any operator factory for the given scheme",
)
.with_context("uri", uri)
.with_context("scheme", scheme)
})?;

// TODO: should `OperatorFactory` receive `IntoIterator<Item = (String, String)>` instead of `HashMap<String, String>`?
// However, `impl Trait` in type aliases is unstable and is also not allowed in fn pointers.
let options = options.into_iter().collect();

factory(uri, options)
}
}
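For reference, the lookup flow in `parse` can be sketched with the standard library alone. This is a simplified model, not the PR's code: scheme extraction uses `split_once("://")` instead of `http::Uri`, errors are plain strings, and `Registry`, `Factory`, and `memory_factory` are hypothetical names. One deliberate difference: because `fn` pointers are `Copy`, the sketch copies the factory out of the map so the lock is released before the factory runs.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Hypothetical stand-in for `OperatorFactory`.
type Factory = fn(&str, HashMap<String, String>) -> Result<String, String>;

// Hypothetical stand-in for `OperatorRegistry`.
#[derive(Clone, Default)]
struct Registry {
    factories: Arc<Mutex<HashMap<String, Factory>>>,
}

impl Registry {
    /// With interior mutability, `&self` is enough to register.
    fn register(&self, scheme: &str, factory: Factory) {
        self.factories
            .lock()
            .expect("poisoned lock")
            .insert(scheme.to_string(), factory);
    }

    fn parse(
        &self,
        uri: &str,
        options: impl IntoIterator<Item = (String, String)>,
    ) -> Result<String, String> {
        // Simplified scheme extraction; the PR uses `http::Uri` here.
        let (scheme, _rest) = uri
            .split_once("://")
            .ok_or_else(|| format!("uri is missing scheme: {}", uri))?;

        // The guard is a temporary, so the lock is released at the end of
        // this statement, before the factory is called.
        let factory = self
            .factories
            .lock()
            .expect("poisoned lock")
            .get(scheme)
            .copied()
            .ok_or_else(|| format!("no operator factory for scheme: {}", scheme))?;

        factory(uri, options.into_iter().collect())
    }
}

fn memory_factory(uri: &str, options: HashMap<String, String>) -> Result<String, String> {
    Ok(format!("{} with {} option(s)", uri, options.len()))
}
```

Releasing the lock early means a factory that itself touches the registry would not deadlock on the `Mutex`. Whether any factory needs to do that is an open question.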