Skip to content

Commit

Permalink
blob_v1 and blob_piece_v1 dependency replication (#514)
Browse files Browse the repository at this point in the history
* WIP: collect document dependencies before calculating log heights

* Add some comments

* Complete included document logic for blob_v1 and blob_piece_v1 documents

* Clippy x fmt

* Add comments to test

* Fix comment

* Add another test

* Fixes after rebase

* Remove not needed format

* Remove deprecated store method

* Update blob tests

* Don't query db for log heights when included documents is empty

* fmt

* Update config.toml

* Small grammar fix

* Write blobs in lowercase, explain term and remove newline

* Update CHANGELOG

* Clippy

---------

Co-authored-by: adz <[email protected]>
  • Loading branch information
sandreae and adzialocha authored Sep 22, 2023
1 parent c968e0d commit 27c5069
Show file tree
Hide file tree
Showing 8 changed files with 367 additions and 78 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Task for automatic garbage collection of unused documents and views [#500](https://github.com/p2panda/aquadoggo/pull/500)
- Blobs directory configuration [#549](https://github.com/p2panda/aquadoggo/pull/549)
- Integrate `Bytes` operation value [#554](https://github.com/p2panda/aquadoggo/pull/554/)
- Implement dependency replication for `blob_v1` and `blob_piece_v1` documents [#514](https://github.com/p2panda/aquadoggo/pull/514)

### Changed

Expand Down
52 changes: 52 additions & 0 deletions aquadoggo/src/db/stores/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,58 @@ impl SqlStore {

Ok(())
}

/// Get the ids of all blob documents that any view of the passed document relates to.
///
/// Only documents with schema id `blob_v1` are returned. Relations are collected from
/// *every* view of the given document, so blobs referenced by historic views are
/// included as well.
///
/// # Errors
///
/// Returns `SqlStoreError::Transaction` when the underlying database query fails.
pub async fn get_blob_child_relations(
&self,
document_id: &DocumentId,
) -> Result<Vec<DocumentId>, SqlStoreError> {
// Two-level query:
// 1. Inner query: for all views of the passed document, collect the raw values of
//    every relation-typed field (pinned or unpinned, single or list).
// 2. Outer query: select the ids of `blob_v1` documents whose view id appears
//    among those collected values.
//
// NOTE(review): the outer match compares `document_view_id` against the field
// values — this is exact for `pinned_relation`/`pinned_relation_list`; for
// `relation`/`relation_list` it presumably relies on the stored value also
// matching a view id — TODO confirm against the operation storage format.
let document_ids: Vec<String> = query_scalar(
"
SELECT DISTINCT
document_views.document_id
FROM
document_views
WHERE
document_views.schema_id = 'blob_v1'
AND
document_views.document_view_id
IN (
SELECT
operation_fields_v1.value
FROM
document_view_fields
LEFT JOIN
operation_fields_v1
ON
document_view_fields.operation_id = operation_fields_v1.operation_id
AND
document_view_fields.name = operation_fields_v1.name
LEFT JOIN
document_views
ON
document_view_fields.document_view_id = document_views.document_view_id
WHERE
operation_fields_v1.field_type IN ('pinned_relation', 'pinned_relation_list', 'relation_list', 'relation')
AND
document_views.document_id = $1
)
"
)
.bind(document_id.as_str())
.fetch_all(&self.pool)
.await
.map_err(|err| SqlStoreError::Transaction(err.to_string()))?;

// Parse the raw string ids back into `DocumentId`; values persisted by the store
// are expected to always be valid, so a parse failure here is a bug.
Ok(document_ids
.iter()
.map(|document_id_str| {
document_id_str
.parse::<DocumentId>()
.expect("Document Id's coming from the store should be valid")
})
.collect())
}
}

/// Throws an error when database does not contain all related blob pieces yet.
Expand Down
58 changes: 18 additions & 40 deletions aquadoggo/src/db/stores/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ use std::collections::HashMap;
use std::vec;

use async_trait::async_trait;
use p2panda_rs::document::DocumentId;
use p2panda_rs::entry::traits::{AsEncodedEntry, AsEntry};
use p2panda_rs::entry::{EncodedEntry, Entry, LogId, SeqNum};
use p2panda_rs::hash::Hash;
use p2panda_rs::identity::PublicKey;
use p2panda_rs::operation::EncodedOperation;
use p2panda_rs::schema::SchemaId;
use p2panda_rs::storage_provider::error::EntryStorageError;
use p2panda_rs::storage_provider::traits::EntryStore;
use sqlx::{query, query_as};
Expand Down Expand Up @@ -184,11 +184,23 @@ impl EntryStore for SqlStore {
}

impl SqlStore {
pub async fn get_log_heights(
pub async fn get_document_log_heights(
&self,
schema_id: &SchemaId,
document_ids: &[DocumentId],
) -> Result<Vec<(PublicKey, Vec<(LogId, SeqNum)>)>, EntryStorageError> {
let log_height_rows = query_as::<_, LogHeightRow>(
// If no document ids were passed then don't query the database. Instead return an empty
// vec now already.
if document_ids.is_empty() {
return Ok(vec![]);
}

let document_ids_str: String = document_ids
.iter()
.map(|document_id| format!("'{}'", document_id.as_str()))
.collect::<Vec<String>>()
.join(", ");

let log_height_rows = query_as::<_, LogHeightRow>(&format!(
"
SELECT
entries.public_key,
Expand All @@ -200,14 +212,13 @@ impl SqlStore {
ON entries.log_id = logs.log_id
AND entries.public_key = logs.public_key
WHERE
logs.schema = $1
logs.document IN ({document_ids_str})
GROUP BY
entries.public_key, entries.log_id
ORDER BY
entries.public_key, CAST(entries.log_id AS NUMERIC)
",
)
.bind(schema_id.to_string())
))
.fetch_all(&self.pool)
.await
.map_err(|e| EntryStorageError::Custom(e.to_string()))?;
Expand Down Expand Up @@ -557,39 +568,6 @@ mod tests {
});
}

#[rstest]
// Test that log heights are reported per public key and per log for a populated store.
fn get_log_heights(
// Store populated with 5 authors, each writing 5 entries into each of 5 documents.
#[from(populate_store_config)]
#[with(5, 5, 5)]
config: PopulateStoreConfig,
) {
test_runner(|node: TestNode| async move {
// Populate the store with some entries and operations but DON'T materialise any resulting documents.
let (key_pairs, _) = populate_store(&node.context.store, &config).await;

// Query log heights for the schema used during population.
let log_heights = node
.context
.store
.get_log_heights(config.schema.id())
.await
.unwrap();

// One result entry per populated author.
assert_eq!(log_heights.len(), 5);

for (public_key, author_logs) in log_heights {
// Every reported public key must belong to one of the populating key pairs.
assert!(key_pairs
.iter()
.find(|key_pair| key_pair.public_key() == public_key)
.is_some());
// Each author wrote into 5 logs (one per document)...
assert_eq!(author_logs.len(), 5);

// ...and each log reached sequence number 5.
for (_, seq_num) in author_logs {
assert_eq!(seq_num.as_u64(), 5)
}
}
});
}

#[rstest]
fn get_entries_from(
#[from(populate_store_config)]
Expand Down
2 changes: 1 addition & 1 deletion aquadoggo/src/replication/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::schema::SchemaProvider;
#[derive(Debug, Clone)]
pub struct SyncIngest {
tx: ServiceSender,
schema_provider: SchemaProvider,
pub schema_provider: SchemaProvider,
}

impl SyncIngest {
Expand Down
18 changes: 16 additions & 2 deletions aquadoggo/src/replication/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,14 @@ where
mode: &Mode,
local: bool,
) -> Vec<Message> {
let mut session = Session::new(session_id, target_set, mode, local, SUPPORT_LIVE_MODE);
let mut session = Session::new(
session_id,
target_set,
mode,
local,
SUPPORT_LIVE_MODE,
self.ingest.schema_provider.clone(),
);
let initial_messages = session.initial_messages(&self.store).await;

if let Some(sessions) = self.sessions.get_mut(remote_peer) {
Expand All @@ -111,7 +118,14 @@ where
mode: &Mode,
local: bool,
) {
let session = Session::new(session_id, target_set, mode, local, SUPPORT_LIVE_MODE);
let session = Session::new(
session_id,
target_set,
mode,
local,
SUPPORT_LIVE_MODE,
self.ingest.schema_provider.clone(),
);

if let Some(sessions) = self.sessions.get_mut(remote_peer) {
sessions.push(session);
Expand Down
14 changes: 10 additions & 4 deletions aquadoggo/src/replication/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::replication::traits::Strategy;
use crate::replication::{
LogHeightStrategy, Message, Mode, SchemaIdSet, SetReconciliationStrategy, StrategyResult,
};
use crate::schema::SchemaProvider;

pub type SessionId = u64;

Expand Down Expand Up @@ -55,9 +56,10 @@ impl Session {
mode: &Mode,
local: bool,
live_mode: bool,
schema_provider: SchemaProvider,
) -> Self {
let strategy: Box<dyn Strategy> = match mode {
Mode::LogHeight => Box::new(LogHeightStrategy::new(target_set)),
Mode::LogHeight => Box::new(LogHeightStrategy::new(target_set, schema_provider)),
Mode::SetReconciliation => Box::new(SetReconciliationStrategy::new()),
Mode::Unknown => panic!("Unknown replication mode"),
};
Expand Down Expand Up @@ -199,6 +201,7 @@ mod tests {
&Mode::LogHeight,
true,
false,
node.context.schema_provider.clone(),
);
assert!(!session.is_local_done);
assert!(!session.is_local_live_mode);
Expand All @@ -225,18 +228,20 @@ mod tests {
config: PopulateStoreConfig,
) {
test_runner_with_manager(move |manager: TestNodeManager| async move {
let mut node_a = manager.create().await;
let schema_provider = node_a.context.schema_provider.clone();
populate_and_materialize(&mut node_a, &config).await;

let target_set = SchemaIdSet::new(&vec![config.schema.id().to_owned()]);
let mut session = Session::new(
&INITIAL_SESSION_ID,
&target_set,
&Mode::LogHeight,
true,
false,
schema_provider.clone(),
);

let mut node_a = manager.create().await;
populate_and_materialize(&mut node_a, &config).await;

let response_messages = session
.handle_message(&node_a.context.store, &Message::Have(vec![]))
.await
Expand All @@ -255,6 +260,7 @@ mod tests {
&Mode::LogHeight,
true,
false,
schema_provider.clone(),
);

let node_b: TestNode = manager.create().await;
Expand Down
Loading

0 comments on commit 27c5069

Please sign in to comment.