test: Add S3 partition directory verification in integration test #890

Draft · wants to merge 4 commits into base: main
4 changes: 4 additions & 0 deletions crates/integration_tests/Cargo.toml
@@ -33,3 +33,7 @@ iceberg-catalog-rest = { workspace = true }
 iceberg_test_utils = { path = "../test_utils", features = ["tests"] }
 parquet = { workspace = true }
 tokio = { workspace = true }
+aws-sdk-s3 = "1.68.0"
+url = "2.4.0"
28 changes: 23 additions & 5 deletions crates/integration_tests/src/lib.rs
@@ -17,6 +17,7 @@
 
 use std::collections::HashMap;
 
+use aws_sdk_s3 as s3;
 use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY};
 use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig};
 use iceberg_test_utils::docker::DockerCompose;
@@ -27,6 +28,7 @@ const REST_CATALOG_PORT: u16 = 8181;
 pub struct TestFixture {
     pub _docker_compose: DockerCompose,
     pub rest_catalog: RestCatalog,
+    pub s3_client: s3::Client,
 }
 
 pub async fn set_test_fixture(func: &str) -> TestFixture {
@@ -39,15 +41,14 @@ pub async fn set_test_fixture(func: &str) -> TestFixture {
     // Start docker compose
     docker_compose.run();
 
-    let rest_catalog_ip = docker_compose.get_container_ip("rest");
-    let minio_ip = docker_compose.get_container_ip("minio");
-
+    // let rest_catalog_ip = docker_compose.get_container_ip("rest");
+    // let minio_ip = docker_compose.get_container_ip("minio");
     let config = RestCatalogConfig::builder()
-        .uri(format!("http://{}:{}", rest_catalog_ip, REST_CATALOG_PORT))
+        .uri(format!("http://{}:{}", "localhost", REST_CATALOG_PORT))
         .props(HashMap::from([
             (
                 S3_ENDPOINT.to_string(),
-                format!("http://{}:{}", minio_ip, 9000),
+                format!("http://{}:{}", "localhost", 9000),
             ),
             (S3_ACCESS_KEY_ID.to_string(), "admin".to_string()),
             (S3_SECRET_ACCESS_KEY.to_string(), "password".to_string()),
@@ -56,8 +57,25 @@
         .build();
     let rest_catalog = RestCatalog::new(config);
 
+    let s3_config = s3::config::Builder::new()
+        .endpoint_url(format!("http://{}:{}", "localhost", 9000))
+        .region(s3::config::Region::new("us-east-1"))
+        .behavior_version(s3::config::BehaviorVersion::latest())
+        .credentials_provider(s3::config::Credentials::new(
+            "admin",
+            "password",
+            None,
+            None,
+            "test-minio-credentials",
+        ))
+        .force_path_style(true)
+        .build();
+
+    let s3_client = s3::Client::from_conf(s3_config);
+
     TestFixture {
         _docker_compose: docker_compose,
         rest_catalog,
+        s3_client,
     }
 }
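
With `s3_client` exposed on `TestFixture`, integration tests can query MinIO directly. As a minimal sketch (not part of this diff, and assuming the fixture above is wired into a test crate), a smoke test could fail fast if the endpoint, credentials, or path-style settings are wrong; the test name is illustrative:

use iceberg_integration_tests::set_test_fixture;

#[tokio::test]
async fn test_s3_client_smoke() {
    let fixture = set_test_fixture("s3_client_smoke").await;

    // list_buckets() is a cheap round trip that exercises the endpoint URL,
    // the static credentials, and path-style addressing in one call.
    let resp = fixture
        .s3_client
        .list_buckets()
        .send()
        .await
        .expect("failed to list buckets from MinIO");

    println!("MinIO reports {} bucket(s)", resp.buckets().len());
}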
4 changes: 3 additions & 1 deletion crates/integration_tests/testdata/docker-compose.yaml
@@ -32,6 +32,7 @@ services:
       - CATALOG_S3_ENDPOINT=http://minio:9000
     depends_on:
       - minio
+      - mc
     networks:
       rest_bridge:
     ports:
@@ -40,7 +41,7 @@
       - 8181
 
   minio:
-    image: minio/minio:RELEASE.2024-03-07T00-43-48Z
+    image: minio/minio:RELEASE.2024-12-18T13-15-44Z
     environment:
       - MINIO_ROOT_USER=admin
       - MINIO_ROOT_PASSWORD=password
@@ -51,6 +52,7 @@
       rest_bridge:
     ports:
       - 9001:9001
+      - 9000:9000
     expose:
       - 9001
       - 9000
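
Publishing 9000 (in addition to 9001, the MinIO console port) is what lets a host-side S3 client reach MinIO at localhost:9000. A hypothetical connectivity check, assuming docker compose is already up and tokio's net feature is enabled:

use tokio::net::TcpStream;

#[tokio::test]
async fn test_minio_port_mapped() {
    // Fails if the compose file no longer publishes the S3 API port.
    TcpStream::connect("127.0.0.1:9000")
        .await
        .expect("MinIO port 9000 is not reachable on the host");
}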
@@ -37,6 +37,7 @@ use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
 use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation};
 use iceberg_integration_tests::set_test_fixture;
 use parquet::file::properties::WriterProperties;
+use url::Url;
 
 #[tokio::test]
 async fn test_append_partition_data_file() {
@@ -50,12 +51,13 @@ async fn test_append_partition_data_file() {
         ]),
     );
 
+    println!("Creating namespace");
     fixture
         .rest_catalog
         .create_namespace(ns.name(), ns.properties().clone())
         .await
         .unwrap();
-
+    println!("done creating namespace");
     let schema = Schema::builder()
         .with_schema_id(1)
         .with_identifier_field_ids(vec![2])
@@ -163,6 +165,55 @@
     assert_eq!(batches.len(), 1);
     assert_eq!(batches[0], batch);
 
+    // Now verify that the partition directory (e.g. "id=100/") exists in MinIO.
+    // The S3 location can be parsed from the table metadata; it is typically
+    // something like "s3://my-test-bucket/iceberg/rust/t1".
+    let table_location = table.metadata().location().to_string();
+    let table_url = Url::parse(&table_location).expect("failed to parse table location as URL");
+    // Debug prints for partition info
+    println!("Table location: {}", table.metadata().location());
+    println!("Data file location: {:?}", data_file_valid);
+    // The bucket is the "host" part (my-test-bucket); the rest of the path is the prefix.
+    let bucket = table_url
+        .host_str()
+        .expect("no bucket found in table location");
+    // .path() starts with '/', so strip the leading slash.
+    let mut prefix = table_url.path().trim_start_matches('/').to_string();
+    prefix.push_str("/data/id="); // Look in the data/id=<value> directory
+    prefix.push_str(&first_partition_id_value.to_string());
+    prefix.push('/');
+
+    println!(
+        "Looking for objects in bucket '{}' with prefix '{}'",
+        bucket, prefix
+    );
+
+    let list_resp = fixture
+        .s3_client
+        .list_objects_v2()
+        .bucket(bucket)
+        .prefix(&prefix)
+        .send()
+        .await
+        .expect("failed to list objects from MinIO");
+
+    // Ensure there are objects under that prefix.
+    let objects = list_resp.contents();
+    assert!(
+        !objects.is_empty(),
+        "No objects found under prefix {} - partition layout may not be correct.",
+        prefix
+    );
+
+    println!("Found objects under partition prefix '{}':", prefix);
+    for obj in objects {
+        println!(
+            "  -> key={}, size={:?}",
+            obj.key().unwrap_or_default(),
+            obj.size()
+        );
+    }
+
     test_schema_incompatible_partition_type(
         parquet_writer_builder.clone(),
         batch.clone(),
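
For reference, the bucket/prefix derivation used by the new assertion can be exercised in isolation. A standalone sketch with a made-up table location matching the comment above (the helper name and main harness are illustrative only):

use url::Url;

fn bucket_and_partition_prefix(table_location: &str, id: i32) -> (String, String) {
    let url = Url::parse(table_location).expect("invalid table location");
    // The bucket is the URL host; the path (minus its leading '/') is the table prefix.
    let bucket = url.host_str().expect("no bucket in location").to_string();
    // Append the Hive-style partition directory the writer is expected to create.
    let prefix = format!("{}/data/id={}/", url.path().trim_start_matches('/'), id);
    (bucket, prefix)
}

fn main() {
    let (bucket, prefix) =
        bucket_and_partition_prefix("s3://my-test-bucket/iceberg/rust/t1", 100);
    assert_eq!(bucket, "my-test-bucket");
    assert_eq!(prefix, "iceberg/rust/t1/data/id=100/");
    println!("bucket={bucket}, prefix={prefix}");
}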