Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

image-rs: fix image layer ordering #404

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 65 additions & 23 deletions image-rs/src/pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use anyhow::{anyhow, bail, Result};
use futures_util::stream::{self, StreamExt, TryStreamExt};
use oci_distribution::manifest::{OciDescriptor, OciImageManifest};
use oci_distribution::{secrets::RegistryAuth, Client, Reference};
use std::collections::BTreeMap;
use std::convert::TryFrom;
use std::path::{Path, PathBuf};
use std::sync::Arc;
Expand Down Expand Up @@ -92,35 +93,32 @@ impl<'a> PullClient<'a> {
decrypt_config: &Option<&str>,
meta_store: Arc<Mutex<MetaStore>>,
) -> Result<Vec<LayerMeta>> {
let layer_metas = stream::iter(layer_descs)
let meta_store = &meta_store;
let layer_metas: Vec<(usize, LayerMeta)> = stream::iter(layer_descs)
.enumerate()
.map(|(i, layer)| {
let client = &self.client;
let reference = &self.reference;
let ms = meta_store.clone();

async move {
let layer_reader = client
.async_pull_blob(reference, &layer.digest)
.await
.map_err(|e| anyhow!("failed to async pull blob {}", e.to_string()))?;

self.async_handle_layer(
layer,
diff_ids[i].clone(),
decrypt_config,
layer_reader,
ms,
)
.map(|(i, layer)| async move {
let layer_reader = self
.client
.async_pull_blob(&self.reference, &layer.digest)
.await
.map_err(|e| anyhow!("failed to handle layer: {:?}", e))
}
.map_err(|e| anyhow!("failed to async pull blob {}", e.to_string()))?;
self.async_handle_layer(
layer,
diff_ids[i].clone(),
decrypt_config,
layer_reader,
meta_store.clone(),
)
.await
.map_err(|e| anyhow!("failed to handle layer: {:?}", e))
.map(|layer_meta| (i, layer_meta))
})
.buffer_unordered(self.max_concurrent_download)
.try_collect()
.await?;

Ok(layer_metas)
let meta_map: BTreeMap<usize, _> = layer_metas.into_iter().collect();
let sorted_layer_metas = meta_map.into_values().collect();
Ok(sorted_layer_metas)
}

async fn async_handle_layer(
Expand Down Expand Up @@ -215,6 +213,50 @@ mod tests {

use test_utils::{assert_result, assert_retry};

#[ignore]
#[tokio::test]
async fn image_layer_order() {
let image_url =
"nginx@sha256:9700d098d545f9d2ee0660dfb155fe64f4447720a0a763a93f2cf08997227279";
let tempdir = tempfile::tempdir().unwrap();
let image = Reference::try_from(image_url.to_string()).expect("create reference failed");
let mut client = PullClient::new(
image,
tempdir.path(),
&RegistryAuth::Anonymous,
DEFAULT_MAX_CONCURRENT_DOWNLOAD,
)
.unwrap();
let (image_manifest, _image_digest, image_config) = client.pull_manifest().await.unwrap();

let image_config = ImageConfiguration::from_reader(image_config.as_bytes()).unwrap();
let diff_ids = image_config.rootfs().diff_ids();

// retry 3 times w/ timeout
for i in 0..3 {
let wait = std::time::Duration::from_secs(i * 2);
tokio::time::sleep(wait).await;

let result = client
.async_pull_layers(
image_manifest.layers.clone(),
diff_ids,
&None,
Arc::new(Mutex::new(MetaStore::default())),
)
.await;
if let Ok(layer_metas) = result {
let digests: Vec<String> = layer_metas
.iter()
.map(|l| l.uncompressed_digest.clone())
.collect();
assert_eq!(&digests, diff_ids, "hashes should be in same order");
return;
}
}
panic!("failed to pull layers");
}

#[tokio::test]
async fn test_async_pull_client() {
let oci_images = [
Expand Down
Loading