Add Azure Blob Storage store
Related to TraceMachina#1542

Implement Azure Blob Storage (Azure Store) for NativeLink.

* **Add Azure Store Implementation**:
  - Create `nativelink-store/src/azure_store.rs` with Azure Blob Storage store implementation.
  - Use the Azure SDK for Rust to interact with Azure Blob Storage.
  - Implement the `StoreDriver` trait for the Azure store.
  - Authenticate via `DefaultAzureCredential` from the Azure SDK for Rust.
  - Implement chunked uploads for large files using `put_block` and `put_block_list` methods.

* **Update Store Factory**:
  - Modify `nativelink-store/src/default_store_factory.rs` to include Azure Store in the store factory function.

* **Add Unit Tests**:
  - Create `nativelink-store/tests/azure_store_test.rs` with unit tests for Azure Store.
  - Test basic CRUD operations, error handling, authentication process, and chunked upload functionality.

* **Update Dependencies**:
  - Modify `nativelink-store/Cargo.toml` to include necessary dependencies for Azure Blob Storage.

* **Update Documentation**:
  - Modify `README.md` to include information about the Azure store and how to configure it.
  - Add information about different types of blobs supported by the Azure store.
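The chunked-upload path above splits the incoming stream into blocks, assigns each block an ID, and commits the list with `put_block_list`. Azure requires all block IDs within a blob to be the same length, so a zero-padded fixed-width counter is one valid scheme. A minimal, SDK-independent sketch (the helper name is hypothetical, not part of this commit):

```rust
// Azure requires every block ID within a blob to be the same length,
// so a zero-padded, fixed-width counter is a simple valid scheme.
fn make_block_id(index: u64) -> String {
    format!("{index:032}")
}

fn main() {
    let ids: Vec<String> = (0..3).map(make_block_id).collect();
    // All IDs share the same length and sort in upload order.
    assert!(ids.iter().all(|id| id.len() == 32));
    assert!(ids.windows(2).all(|w| w[0] < w[1]));
}
```

Because the IDs are numeric strings of equal width, their lexicographic order matches upload order, which keeps the committed block list consistent with the stream.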
vishwamartur committed Dec 23, 2024
1 parent b1df876 commit a045a43
Showing 5 changed files with 550 additions and 0 deletions.
58 changes: 58 additions & 0 deletions README.md
@@ -123,6 +123,64 @@ nix run github:TraceMachina/nativelink ./basic_cas.json5

See the [contribution docs](https://nativelink.com/docs/contribute/nix) for further information.

### Azure Blob Storage Configuration

To configure the Azure Blob Storage (Azure Store) for NativeLink, follow these steps:

1. **Add Azure Store Configuration**:
- Update your configuration file (e.g., `basic_cas.json5`) to include the Azure store configuration. Here is an example configuration:

```json5
{
"stores": {
"AZURE_STORE": {
"azure_store": {
"account_name": "your_account_name",
"account_key": "your_account_key",
"container_name": "your_container_name",
"key_prefix": "your_key_prefix",
"max_retry_buffer_per_request": 5242880
}
}
}
}
```

2. **Set Environment Variables**:
- Ensure that the necessary credentials are set as environment variables or stored securely in a configuration file. For example:

```bash
export AZURE_STORAGE_ACCOUNT="your_account_name"
export AZURE_STORAGE_KEY="your_account_key"
```

3. **Run NativeLink with Azure Store**:
- Start NativeLink with the updated configuration file that includes the Azure store configuration.

```bash
docker run \
-v $(pwd)/basic_cas.json5:/config \
-p 50051:50051 \
ghcr.io/tracemachina/nativelink:v0.5.3 \
config
```
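Both `key_prefix` and `max_retry_buffer_per_request` are optional; when omitted, the store falls back to an empty prefix and a 5 MiB retry buffer. A sketch of that defaulting logic, mirroring the implementation in this commit (the struct is trimmed to just these two fields):

```rust
// Trimmed-down stand-in for the optional fields of AzureSpec.
struct AzureSpec {
    key_prefix: Option<String>,
    max_retry_buffer_per_request: Option<usize>,
}

fn resolve_defaults(spec: &AzureSpec) -> (String, usize) {
    (
        spec.key_prefix.clone().unwrap_or_default(),
        spec.max_retry_buffer_per_request.unwrap_or(5 * 1024 * 1024), // 5 MiB
    )
}

fn main() {
    let spec = AzureSpec { key_prefix: None, max_retry_buffer_per_request: None };
    let (prefix, buf) = resolve_defaults(&spec);
    assert_eq!(prefix, "");
    assert_eq!(buf, 5_242_880); // matches the example config above
}
```

The `5242880` in the example configuration is exactly this default spelled out, so omitting the field is equivalent.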

### Supported Blob Types

The Azure store in NativeLink supports the following types of blobs:

1. **Block Blobs**:
- Used for storing text and binary data, such as documents and media files.
- Supports large file uploads using chunked uploads with `put_block` and `put_block_list` methods.

2. **Append Blobs**:
- Optimized for append operations, making them suitable for logging scenarios.
   - Data can only be appended to the end of the blob; existing blocks cannot be modified or removed.

3. **Page Blobs**:
- Designed for random read/write operations.
- Used for scenarios like virtual hard disk (VHD) files for Azure virtual machines.
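For block blobs, a large payload is uploaded as fixed-size chunks and committed as a block list; the split itself is simple arithmetic. A hedged sketch (the helper name is hypothetical; 5 MiB matches the store's default retry buffer):

```rust
// Split a payload of `total` bytes into chunks of at most `chunk` bytes.
fn chunk_sizes(total: u64, chunk: u64) -> Vec<u64> {
    let mut sizes = Vec::new();
    let mut remaining = total;
    while remaining > 0 {
        let n = remaining.min(chunk);
        sizes.push(n);
        remaining -= n;
    }
    sizes
}

fn main() {
    const MIB: u64 = 1024 * 1024;
    // A 12 MiB upload becomes two full 5 MiB blocks plus a 2 MiB tail.
    assert_eq!(chunk_sizes(12 * MIB, 5 * MIB), vec![5 * MIB, 5 * MIB, 2 * MIB]);
}
```

Each chunk becomes one `put_block` call, and the final `put_block_list` commits them in order.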

## ✍️ Contributors

<a href="https://github.com/tracemachina/nativelink/graphs/contributors" aria-label="View contributors of the NativeLink project on GitHub">
3 changes: 3 additions & 0 deletions nativelink-store/Cargo.toml
@@ -57,6 +57,9 @@ tokio-util = { version = "0.7.13" }
tonic = { version = "0.12.3", features = ["transport", "tls"], default-features = false }
tracing = { version = "0.1.41", default-features = false }
uuid = { version = "1.11.0", default-features = false, features = ["v4", "serde"] }
azure_core = "0.1.0"
azure_identity = "0.1.0"
azure_storage = "0.1.0"

[dev-dependencies]
nativelink-macro = { path = "../nativelink-macro" }
180 changes: 180 additions & 0 deletions nativelink-store/src/azure_store.rs
@@ -0,0 +1,180 @@
use std::pin::Pin;
use std::sync::Arc;

use async_trait::async_trait;
use azure_core::auth::TokenCredential;
use azure_core::prelude::*;
use azure_identity::DefaultAzureCredential;
use azure_storage::blob::prelude::*;
use azure_storage::core::prelude::*;
use bytes::Bytes;
use futures::stream::{StreamExt, TryStreamExt};
use nativelink_config::stores::AzureSpec;
use nativelink_error::{make_err, Code, Error, ResultExt};
use nativelink_metric::MetricsComponent;
use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator};
use nativelink_util::store_trait::{StoreDriver, StoreKey, UploadSizeInfo};

#[derive(MetricsComponent)]
pub struct AzureStore {
container_client: Arc<ContainerClient>,
credential: Arc<dyn TokenCredential>,
#[metric(help = "The container name for the Azure store")]
container_name: String,
#[metric(help = "The key prefix for the Azure store")]
key_prefix: String,
#[metric(help = "The number of bytes to buffer for retrying requests")]
max_retry_buffer_per_request: usize,
}

impl AzureStore {
pub async fn new(spec: &AzureSpec) -> Result<Arc<Self>, Error> {
let credential = Arc::new(DefaultAzureCredential::default());
let container_client = Arc::new(
StorageAccountClient::new_access_key(
&spec.account_name,
&spec.account_key,
)
.as_container_client(&spec.container_name),
);

Ok(Arc::new(Self {
container_client,
credential,
container_name: spec.container_name.clone(),
key_prefix: spec.key_prefix.clone().unwrap_or_default(),
max_retry_buffer_per_request: spec
.max_retry_buffer_per_request
.unwrap_or(5 * 1024 * 1024), // 5MB
}))
}

fn make_azure_path(&self, key: &StoreKey<'_>) -> String {
format!("{}{}", self.key_prefix, key.as_str())
}

async fn has(self: Pin<&Self>, digest: &StoreKey<'_>) -> Result<Option<u64>, Error> {
let blob_client = self
.container_client
.as_blob_client(&self.make_azure_path(digest));

match blob_client.get_properties().await {
Ok(properties) => Ok(Some(properties.blob.properties.content_length)),
Err(e) => match e.kind() {
azure_core::error::ErrorKind::ResourceNotFound => Ok(None),
_ => Err(make_err!(Code::Unavailable, "Azure error: {e:?}")),
},
}
}
}

#[async_trait]
impl StoreDriver for AzureStore {
async fn has_with_results(
self: Pin<&Self>,
keys: &[StoreKey<'_>],
results: &mut [Option<u64>],
) -> Result<(), Error> {
keys.iter()
.zip(results.iter_mut())
.map(|(key, result)| async move {
*result = self.has(key).await?;
Ok::<_, Error>(())
})
.collect::<futures::stream::FuturesUnordered<_>>()
.try_collect()
.await
}

async fn update(
self: Pin<&Self>,
key: StoreKey<'_>,
mut reader: DropCloserReadHalf,
upload_size: UploadSizeInfo,
) -> Result<(), Error> {
let blob_client = self
.container_client
.as_blob_client(&self.make_azure_path(&key));

let mut block_list = Vec::new();
let mut block_id = 0;

loop {
let chunk = reader
.consume(Some(self.max_retry_buffer_per_request))
.await
.err_tip(|| "Failed to read chunk in azure_store")?;
if chunk.is_empty() {
break; // EOF
}

// Azure requires all block IDs within a blob to be the same length,
// so use a fixed-width, zero-padded counter.
let block_id_str = format!("{block_id:032}");
block_list.push(BlockId::new(block_id_str.clone()));

blob_client
.put_block(block_id_str, chunk.clone())
.await
.err_tip(|| "Failed to upload block to Azure Blob Storage")?;

block_id += 1;
}

blob_client
.put_block_list(block_list)
.await
.err_tip(|| "Failed to commit block list to Azure Blob Storage")?;

Ok(())
}

async fn get_part(
self: Pin<&Self>,
key: StoreKey<'_>,
writer: &mut DropCloserWriteHalf,
offset: u64,
length: Option<u64>,
) -> Result<(), Error> {
let blob_client = self
.container_client
.as_blob_client(&self.make_azure_path(&key));

let mut stream = blob_client
.get()
.range(offset..length.map_or(u64::MAX, |l| offset + l))
.stream();

while let Some(chunk) = stream.next().await {
let chunk = chunk.err_tip(|| "Failed to read chunk from Azure Blob Storage")?;
writer
.send(Bytes::from(chunk))
.await
.err_tip(|| "Failed to send chunk to writer in azure_store")?;
}

writer
.send_eof()
.err_tip(|| "Failed to send EOF in azure_store get_part")?;

Ok(())
}

fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver {
self
}

fn as_any(&self) -> &(dyn std::any::Any + Sync + Send + 'static) {
self
}

fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send + 'static> {
self
}
}

default_health_status_indicator!(AzureStore);
2 changes: 2 additions & 0 deletions nativelink-store/src/default_store_factory.rs
@@ -39,6 +39,7 @@ use crate::shard_store::ShardStore;
use crate::size_partitioning_store::SizePartitioningStore;
use crate::store_manager::StoreManager;
use crate::verify_store::VerifyStore;
use crate::azure_store::AzureStore;

type FutureMaybeStore<'a> = Box<dyn Future<Output = Result<Store, Error>> + 'a>;

@@ -97,6 +98,7 @@ pub fn store_factory<'a>(
.await?;
ShardStore::new(spec, stores)?
}
StoreSpec::azure_store(spec) => AzureStore::new(spec).await?,
};

if let Some(health_registry_builder) = maybe_health_registry_builder {
