Skip to content

Commit

Permalink
chore: migrate qdrant client to use new API (#7675)
Browse files Browse the repository at this point in the history
* chore: migrate qdrant client to use new API

* nit: better qdrant bin logs

---------

Co-authored-by: Henry Fontanier <[email protected]>
  • Loading branch information
fontanierh and Henry Fontanier authored Sep 25, 2024
1 parent 83f2e76 commit 2aa9b70
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 122 deletions.
131 changes: 63 additions & 68 deletions core/bin/qdrant/create_collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,14 @@ use dust::{
},
utils,
};
use qdrant_client::{client::QdrantClient, qdrant};
use qdrant_client::{
qdrant::{
self, quantization_config::Quantization, CreateCollectionBuilder,
CreateFieldIndexCollectionBuilder, CreateShardKeyBuilder, CreateShardKeyRequestBuilder,
HnswConfigDiffBuilder, OptimizersConfigDiffBuilder, VectorParamsBuilder,
},
Qdrant,
};
use tokio;

#[derive(Parser, Debug)]
Expand All @@ -30,58 +37,48 @@ struct Args {
}

async fn create_indexes_for_collection(
raw_client: &Arc<QdrantClient>,
raw_client: &Arc<Qdrant>,
cluster: &QdrantCluster,
collection_name: &String,
) -> Result<()> {
let _ = raw_client
.create_field_index(
.create_field_index(CreateFieldIndexCollectionBuilder::new(
collection_name,
"document_id_hash",
qdrant::FieldType::Keyword,
None,
None,
)
))
.await?;

let _ = raw_client
.create_field_index(
.create_field_index(CreateFieldIndexCollectionBuilder::new(
collection_name,
"data_source_internal_id",
qdrant::FieldType::Keyword,
None,
None,
)
))
.await?;

let _ = raw_client
.create_field_index(
.create_field_index(CreateFieldIndexCollectionBuilder::new(
collection_name,
"tags",
qdrant::FieldType::Keyword,
None,
None,
)
))
.await?;

let _ = raw_client
.create_field_index(
.create_field_index(CreateFieldIndexCollectionBuilder::new(
collection_name,
"parents",
qdrant::FieldType::Keyword,
None,
None,
)
))
.await?;

let _ = raw_client
.create_field_index(
.create_field_index(CreateFieldIndexCollectionBuilder::new(
collection_name,
"timestamp",
qdrant::FieldType::Integer,
None,
None,
)
))
.await?;

println!(
Expand Down Expand Up @@ -127,43 +124,29 @@ async fn create_qdrant_collection(

// First, we create the collection.
let res = raw_client
.create_collection(&qdrant::CreateCollection {
collection_name: collection_name.clone(),
vectors_config: Some(qdrant::VectorsConfig {
config: Some(qdrant::vectors_config::Config::Params(
qdrant::VectorParams {
size: embedder.embedding_size() as u64,
distance: qdrant::Distance::Cosine.into(),
on_disk: Some(true),
..Default::default()
},
)),
}),
hnsw_config: Some(qdrant::HnswConfigDiff {
payload_m: Some(16),
m: Some(0),
..Default::default()
}),
optimizers_config: Some(qdrant::OptimizersConfigDiff {
memmap_threshold: Some(16384),
..Default::default()
}),
quantization_config: Some(qdrant::QuantizationConfig {
quantization: Some(qdrant::quantization_config::Quantization::Scalar(
qdrant::ScalarQuantization {
r#type: qdrant::QuantizationType::Int8.into(),
quantile: Some(0.99),
always_ram: Some(true),
},
)),
}),
on_disk_payload: Some(true),
sharding_method: Some(qdrant::ShardingMethod::Custom.into()),
shard_number: Some(2),
replication_factor: Some(2),
write_consistency_factor: Some(1),
..Default::default()
})
.create_collection(
CreateCollectionBuilder::new(collection_name.clone())
.vectors_config(
VectorParamsBuilder::new(
embedder.embedding_size() as u64,
qdrant::Distance::Cosine,
)
.distance(qdrant::Distance::Cosine)
.on_disk(true),
)
.hnsw_config(HnswConfigDiffBuilder::default().payload_m(16).m(0))
.optimizers_config(OptimizersConfigDiffBuilder::default().memmap_threshold(16384))
.quantization_config(Quantization::Scalar(qdrant::ScalarQuantization {
r#type: qdrant::QuantizationType::Int8.into(),
quantile: Some(0.99),
always_ram: Some(true),
}))
.on_disk_payload(true)
.sharding_method(qdrant::ShardingMethod::Custom.into())
.shard_number(2)
.replication_factor(2)
.write_consistency_factor(1),
)
.await?;

match res.result {
Expand All @@ -184,14 +167,13 @@ async fn create_qdrant_collection(

let operation_result = raw_client
.create_shard_key(
collection_name.clone(),
&qdrant::shard_key::Key::Keyword(shard_key.clone()),
// No need to pass shards_number and replication_factor; using the ones specified during collection creation.
None,
None,
&[],
CreateShardKeyRequestBuilder::new(collection_name.clone()).request(
CreateShardKeyBuilder::default()
.shard_key(qdrant::shard_key::Key::Keyword(shard_key.clone())),
),
)
.await?;
.await
.map_err(|e| anyhow!("Error creating shard key: {}", e))?;

match operation_result.result {
true => {
Expand All @@ -206,7 +188,15 @@ async fn create_qdrant_collection(
}?;
}

create_indexes_for_collection(&raw_client, &cluster, &collection_name).await?;
create_indexes_for_collection(&raw_client, &cluster, &collection_name)
.await
.map_err(|e| {
anyhow!(
"Error creating indexes for collection {}: {}",
collection_name,
e
)
})?;

Ok(())
}
Expand All @@ -224,7 +214,12 @@ async fn main() -> Result<(), anyhow::Error> {
std::process::exit(1);
}

create_qdrant_collection(args.cluster, args.provider, args.model).await?;
create_qdrant_collection(args.cluster, args.provider, args.model)
.await
.map_err(|e| {
eprintln!("Error creating collection: {}", e);
e
})?;

Ok(())
}
4 changes: 2 additions & 2 deletions core/bin/qdrant/migrate_embedder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ async fn migrate_shadow_embedder(
qdrant::PointStruct::new(
r.id.unwrap(),
r.vectors.unwrap(),
Payload::new_from_hashmap(r.payload),
Payload::from(r.payload),
)
})
.collect::<Vec<_>>();
Expand Down Expand Up @@ -512,7 +512,7 @@ async fn migrate_shadow_embedder(
.iter()
.map(|ci| match embeddings.get(&ci.hash) {
Some(v) => {
let payload = Payload::new_from_hashmap(ci.payload.clone());
let payload = Payload::from(ci.payload.clone());

let ps = qdrant::PointStruct::new(
ci.hash.to_string(),
Expand Down
Loading

0 comments on commit 2aa9b70

Please sign in to comment.