Skip to content

Commit

Permalink
feat(core): Implement list with deleted and versions for oss (#5527)
Browse files Browse the repository at this point in the history
Co-authored-by: Xuanwo <[email protected]>
  • Loading branch information
hoslo and Xuanwo authored Jan 13, 2025
1 parent 5ae1a76 commit f0b3aa3
Show file tree
Hide file tree
Showing 5 changed files with 297 additions and 11 deletions.
37 changes: 37 additions & 0 deletions .github/services/oss/oss_with_versioning/action.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

name: oss_with_versioning
description: "Behavior test for OSS with bucket versioning. This service is sponsored by @datafuse_labs."

runs:
using: "composite"
steps:
- name: Setup
uses: 1password/load-secrets-action@v1
with:
export-env: true
env:
OPENDAL_OSS_BUCKET: op://services/oss/versioning_bucket
OPENDAL_OSS_ENDPOINT: op://services/oss/endpoint
OPENDAL_OSS_ACCESS_KEY_ID: op://services/oss/access_key_id
OPENDAL_OSS_ACCESS_KEY_SECRET: op://services/oss/access_key_secret

- name: Add extra settings
shell: bash
run: |
echo "OPENDAL_OSS_ENABLE_VERSIONING=true" >> $GITHUB_ENV
45 changes: 34 additions & 11 deletions core/src/services/oss/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use reqsign::AliyunOssSigner;
use super::core::*;
use super::delete::OssDeleter;
use super::error::parse_error;
use super::lister::OssLister;
use super::lister::{OssLister, OssListers, OssObjectVersionsLister};
use super::writer::OssWriter;
use super::writer::OssWriters;
use crate::raw::*;
Expand Down Expand Up @@ -97,6 +97,13 @@ impl OssBuilder {
self
}

/// Set bucket versioning status for this backend
pub fn enable_versioning(mut self, enabled: bool) -> Self {
self.config.enable_versioning = enabled;

self
}

/// Set an endpoint for generating presigned urls.
///
/// You can offer a public endpoint like <https://oss-cn-beijing.aliyuncs.com> to return a presinged url for
Expand Down Expand Up @@ -408,6 +415,7 @@ impl Builder for OssBuilder {
host,
presign_endpoint,
allow_anonymous: self.config.allow_anonymous,
enable_versioning: self.config.enable_versioning,
signer,
loader,
client,
Expand All @@ -428,7 +436,7 @@ pub struct OssBackend {
impl Access for OssBackend {
type Reader = HttpBody;
type Writer = OssWriters;
type Lister = oio::PageLister<OssLister>;
type Lister = OssListers;
type Deleter = oio::BatchDeleter<OssDeleter>;
type BlockingReader = ();
type BlockingWriter = ();
Expand All @@ -449,16 +457,19 @@ impl Access for OssBackend {
stat_has_content_type: true,
stat_has_content_encoding: true,
stat_has_content_range: true,
stat_with_version: self.core.enable_versioning,
stat_has_etag: true,
stat_has_content_md5: true,
stat_has_last_modified: true,
stat_has_content_disposition: true,
stat_has_user_metadata: true,
stat_has_version: true,

read: true,

read_with_if_match: true,
read_with_if_none_match: true,
read_with_version: self.core.enable_versioning,
read_with_if_modified_since: true,
read_with_if_unmodified_since: true,

Expand All @@ -470,7 +481,7 @@ impl Access for OssBackend {
write_with_content_type: true,
write_with_content_disposition: true,
// TODO: set this to false while version has been enabled.
write_with_if_not_exists: true,
write_with_if_not_exists: !self.core.enable_versioning,

// The min multipart size of OSS is 100 KiB.
//
Expand All @@ -487,6 +498,7 @@ impl Access for OssBackend {
write_with_user_metadata: true,

delete: true,
delete_with_version: self.core.enable_versioning,
delete_max_size: Some(self.core.delete_max_size),

copy: true,
Expand All @@ -497,6 +509,8 @@ impl Access for OssBackend {
list_with_recursive: true,
list_has_etag: true,
list_has_content_md5: true,
list_with_versions: self.core.enable_versioning,
list_with_deleted: self.core.enable_versioning,
list_has_content_length: true,
list_has_last_modified: true,

Expand Down Expand Up @@ -574,14 +588,23 @@ impl Access for OssBackend {
}

async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
let l = OssLister::new(
self.core.clone(),
path,
args.recursive(),
args.limit(),
args.start_after(),
);
Ok((RpList::default(), oio::PageLister::new(l)))
let l = if args.versions() || args.deleted() {
TwoWays::Two(oio::PageLister::new(OssObjectVersionsLister::new(
self.core.clone(),
path,
args,
)))
} else {
TwoWays::One(oio::PageLister::new(OssLister::new(
self.core.clone(),
path,
args.recursive(),
args.limit(),
args.start_after(),
)))
};

Ok((RpList::default(), l))
}

async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
Expand Down
3 changes: 3 additions & 0 deletions core/src/services/oss/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ pub struct OssConfig {
/// Bucket for oss.
pub bucket: String,

/// is bucket versioning enabled for this bucket
pub enable_versioning: bool,

// OSS features
/// Server side encryption for oss.
pub server_side_encryption: Option<String>,
Expand Down
84 changes: 84 additions & 0 deletions core/src/services/oss/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ pub struct OssCore {
pub endpoint: String,
pub presign_endpoint: String,
pub allow_anonymous: bool,
pub enable_versioning: bool,

pub server_side_encryption: Option<HeaderValue>,
pub server_side_encryption_key_id: Option<HeaderValue>,
Expand Down Expand Up @@ -520,6 +521,50 @@ impl OssCore {
self.send(req).await
}

pub async fn oss_list_object_versions(
&self,
prefix: &str,
delimiter: &str,
limit: Option<usize>,
key_marker: &str,
version_id_marker: &str,
) -> Result<Response<Buffer>> {
let p = build_abs_path(&self.root, prefix);

let mut url = format!("{}?versions", self.endpoint);
if !p.is_empty() {
write!(url, "&prefix={}", percent_encode_path(p.as_str()))
.expect("write into string must succeed");
}
if !delimiter.is_empty() {
write!(url, "&delimiter={}", delimiter).expect("write into string must succeed");
}

if let Some(limit) = limit {
write!(url, "&max-keys={}", limit).expect("write into string must succeed");
}
if !key_marker.is_empty() {
write!(url, "&key-marker={}", percent_encode_path(key_marker))
.expect("write into string must succeed");
}
if !version_id_marker.is_empty() {
write!(
url,
"&version-id-marker={}",
percent_encode_path(version_id_marker)
)
.expect("write into string must succeed");
}

let mut req = Request::get(&url)
.body(Buffer::new())
.map_err(new_request_build_error)?;

self.sign(&mut req).await?;

self.send(req).await
}

pub async fn oss_delete_object(&self, path: &str, args: &OpDelete) -> Result<Response<Buffer>> {
let mut req = self.oss_delete_object_request(path, args)?;
self.sign(&mut req).await?;
Expand Down Expand Up @@ -768,6 +813,45 @@ pub struct CommonPrefix {
pub prefix: String,
}

#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct OutputCommonPrefix {
pub prefix: String,
}

/// Output of ListObjectVersions
#[derive(Default, Debug, Deserialize)]
#[serde(default, rename_all = "PascalCase")]
pub struct ListObjectVersionsOutput {
pub is_truncated: Option<bool>,
pub next_key_marker: Option<String>,
pub next_version_id_marker: Option<String>,
pub common_prefixes: Vec<OutputCommonPrefix>,
pub version: Vec<ListObjectVersionsOutputVersion>,
pub delete_marker: Vec<ListObjectVersionsOutputDeleteMarker>,
}

#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct ListObjectVersionsOutputVersion {
pub key: String,
pub version_id: String,
pub is_latest: bool,
pub size: u64,
pub last_modified: String,
#[serde(rename = "ETag")]
pub etag: Option<String>,
}

#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct ListObjectVersionsOutputDeleteMarker {
pub key: String,
pub version_id: String,
pub is_latest: bool,
pub last_modified: String,
}

#[cfg(test)]
mod tests {
use bytes::Buf;
Expand Down
Loading

0 comments on commit f0b3aa3

Please sign in to comment.