diff --git a/CHANGELOG.md b/CHANGELOG.md
index 432887990..eca96d49e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -14,8 +14,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - Introduce `BlobStore` [#484](https://github.com/p2panda/aquadoggo/pull/484)
 - Task for automatic garbage collection of unused documents and views [#500](https://github.com/p2panda/aquadoggo/pull/500)
 - Blobs directory configuration [#549](https://github.com/p2panda/aquadoggo/pull/549)
-- Integrate `Bytes` operation value [554](https://github.com/p2panda/aquadoggo/pull/554/)
+- Integrate `Bytes` operation value [#554](https://github.com/p2panda/aquadoggo/pull/554/)
 - Implement dependency replication for `blob_v1` and `blob_piece_v1` documents [#514](https://github.com/p2panda/aquadoggo/pull/514)
+- Remove deleted/unused blobs from the file system [#571](https://github.com/p2panda/aquadoggo/pull/571)
 
 ### Changed
 
diff --git a/aquadoggo/src/db/stores/blob.rs b/aquadoggo/src/db/stores/blob.rs
index ce6ccbbc3..a1e7304d6 100644
--- a/aquadoggo/src/db/stores/blob.rs
+++ b/aquadoggo/src/db/stores/blob.rs
@@ -176,13 +176,14 @@ impl SqlStore {
     }
 
     /// Purge blob data from the node _if_ it is not related to from another document.
-    pub async fn purge_blob(&self, document_id: &DocumentId) -> Result<(), SqlStoreError> {
+    pub async fn purge_blob(&self, document_id: &DocumentId) -> Result<bool, SqlStoreError> {
         // Collect the view id of any existing document views which contain a relation to the blob
         // which is the purge target.
         let blob_reverse_relations = reverse_relations(&self.pool, document_id, None).await?;
 
         // If there are no documents referring to the blob then we continue with the purge.
-        if blob_reverse_relations.is_empty() {
+        let should_purge = blob_reverse_relations.is_empty();
+        if should_purge {
             // Collect the document view ids of all pieces this blob has ever referred to in its
             // `pieces`
             let blob_piece_ids: Vec<String> = query_scalar(
@@ -227,7 +228,7 @@ impl SqlStore {
             }
         }
 
-        Ok(())
+        Ok(should_purge)
     }
 
     /// Get ids for all blob documents which are related to from any view of the passed document.
diff --git a/aquadoggo/src/db/stores/document.rs b/aquadoggo/src/db/stores/document.rs
index 260596770..a219b9783 100644
--- a/aquadoggo/src/db/stores/document.rs
+++ b/aquadoggo/src/db/stores/document.rs
@@ -379,6 +379,8 @@ impl SqlStore {
             document_views
             WHERE
                 document_views.document_id = $1
+            ORDER BY
+                document_views.document_id
             ",
     )
     .bind(document_id.as_str())
@@ -401,38 +403,67 @@ impl SqlStore {
         &self,
         document_view_id: &DocumentViewId,
     ) -> Result<Vec<DocumentId>, DocumentStorageError> {
-        let document_view_ids: Vec<String> = query_scalar(
+        // Collect all ids or view ids of children related to from the passed document view.
+        let children_ids: Vec<String> = query_scalar(
+            "
+            SELECT
+                operation_fields_v1.value
+            FROM
+                document_view_fields
+            LEFT JOIN
+                operation_fields_v1
+            ON
+                document_view_fields.operation_id = operation_fields_v1.operation_id
+            AND
+                document_view_fields.name = operation_fields_v1.name
+            WHERE
+                operation_fields_v1.field_type IN (
+                    'pinned_relation',
+                    'pinned_relation_list',
+                    'relation',
+                    'relation_list'
+                )
+            AND
+                document_view_fields.document_view_id = $1
+            ",
+        )
+        .bind(document_view_id.to_string())
+        .fetch_all(&self.pool)
+        .await
+        .map_err(|err| DocumentStorageError::FatalStorageError(err.to_string()))?;
+
+        // If no children were found return now already with an empty vec.
+        if children_ids.is_empty() {
+            return Ok(vec![]);
+        }
+
+        let args = children_ids
+            .iter()
+            .map(|id| format!("'{id}'"))
+            .collect::<Vec<String>>()
+            .join(",");
+
+        // Query for any document included in the list of children.
+        let document_ids: Vec<String> = query_scalar(&format!(
             "
             SELECT DISTINCT
                 document_views.document_id
             FROM
                 document_views
             WHERE
-                document_views.document_view_id
-                IN (
-                    SELECT
-                        operation_fields_v1.value
-                    FROM
-                        document_view_fields
-                    LEFT JOIN
-                        operation_fields_v1
-                    ON
-                        document_view_fields.operation_id = operation_fields_v1.operation_id
-                    AND
-                        document_view_fields.name = operation_fields_v1.name
-                    WHERE
-                        operation_fields_v1.field_type IN ('pinned_relation', 'pinned_relation_list')
-                    AND
-                        document_view_fields.document_view_id = $1
-                )
+                document_views.document_view_id IN ({})
+            OR
+                document_views.document_id IN ({})
+            ORDER BY
+                document_views.document_id ASC
             ",
-        )
-        .bind(document_view_id.to_string())
+            args, args
+        ))
         .fetch_all(&self.pool)
         .await
         .map_err(|err| DocumentStorageError::FatalStorageError(err.to_string()))?;
 
-        Ok(document_view_ids
+        Ok(document_ids
             .iter()
             .map(|document_id_str| {
                 document_id_str
@@ -503,8 +534,12 @@ impl SqlStore {
     ) -> Result<bool, DocumentStorageError> {
         let document_view_id: Option<String> = query_scalar(
             "
-            SELECT documents.document_view_id FROM documents
-            WHERE documents.document_view_id = $1
+            SELECT
+                documents.document_view_id
+            FROM
+                documents
+            WHERE
+                documents.document_view_id = $1
             ",
         )
         .bind(document_view_id.to_string())
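The reworked `get_child_document_ids` query above is assembled with `format!` because the number of child ids is only known at runtime. As a rough illustration of what ends up being executed, with two made-up child ids (in practice these are hex-encoded hashes):

```rust
// Illustration only: hypothetical child ids as they would come out of the
// first query above.
let children_ids = vec!["0020aaaa".to_string(), "0020bbbb".to_string()];

let args = children_ids
    .iter()
    .map(|id| format!("'{id}'"))
    .collect::<Vec<String>>()
    .join(",");

// args == "'0020aaaa','0020bbbb'", so the assembled statement contains:
//
//   WHERE document_views.document_view_id IN ('0020aaaa','0020bbbb')
//   OR document_views.document_id IN ('0020aaaa','0020bbbb')
//
// Children referenced by `relation` / `relation_list` fields are matched on
// the document id column, children referenced by the pinned variants on the
// document view id column.
```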
diff --git a/aquadoggo/src/materializer/tasks/garbage_collection.rs b/aquadoggo/src/materializer/tasks/garbage_collection.rs
index 688785e8c..0e109652e 100644
--- a/aquadoggo/src/materializer/tasks/garbage_collection.rs
+++ b/aquadoggo/src/materializer/tasks/garbage_collection.rs
@@ -1,5 +1,7 @@
 // SPDX-License-Identifier: AGPL-3.0-or-later
 
+use tokio::fs::remove_file;
+
 use log::debug;
 use p2panda_rs::document::DocumentViewId;
 use p2panda_rs::operation::traits::AsOperation;
@@ -19,11 +21,13 @@ pub async fn garbage_collection_task(context: Context, input: TaskInput) -> Task
         // This task is concerned with a document which may now have dangling views. We want
         // to check for this and delete any views which are no longer needed.
         debug!(
-            "Prune document views for document: {}",
+            "Garbage collect views for document: {}",
            document_id.display()
        );
 
         // Collect the ids of all views for this document.
+        //
+        // Does not include the current view of a deleted document.
         let all_document_view_ids: Vec<DocumentViewId> = context
             .store
             .get_all_document_view_ids(&document_id)
@@ -35,9 +39,15 @@ pub async fn garbage_collection_task(context: Context, input: TaskInput) -> Task
         //
         // Deletes on "document_views" cascade to "document_view_fields" so rows there are also removed
         // from the database.
-        let mut all_effected_child_relations = vec![];
-        let mut deleted_views_count = 0;
+        //
+        // During iteration we collect ids for all effected child documents, deleted views and
+        // remaining views.
+        let mut effected_child_documents = vec![];
+        let mut deleted_views = vec![];
+        let mut remaining_views = vec![];
         for document_view_id in &all_document_view_ids {
+            debug!("Handling view with id: {}", document_view_id);
+
             // Check if this is the current view of its document. This will still return true
             // if the document in question is deleted.
             let is_current_view = context
@@ -46,19 +56,32 @@ pub async fn garbage_collection_task(context: Context, input: TaskInput) -> Task
                 .await
                 .map_err(|err| TaskError::Critical(err.to_string()))?;
 
-            let mut effected_child_relations = vec![];
+            debug!("Current view: {}", is_current_view);
+
+            let mut child_relations = vec![];
             let mut view_deleted = false;
 
+            // Skip this step if this is the current view as this shouldn't be garbage
+            // collected in any case (blobs are an exception which we deal with below).
             if !is_current_view {
-                // Before attempting to delete this view we need to fetch the ids of any child documents
-                // which might have views that could become unpinned as a result of this delete. These
-                // will be returned if the deletion is successful.
-                effected_child_relations = context
+                // Before attempting to delete this view we need to fetch the ids of any child
+                // documents which might have views which would become unpinned as a result of
+                // this delete. New garbage collection tasks will be issued for each document if
+                // the deletion of this view is successful.
+                //
+                // We include children referred to by "regular" relation here as well in order
+                // to correctly issue garbage collection tasks for any documents (for example
+                // `blob` documents) which should be purged entirely if no relation to them exists.
+                child_relations = context
                     .store
                     .get_child_document_ids(document_view_id)
                     .await
                     .map_err(|err| TaskError::Critical(err.to_string()))?;
 
+                for document_id in child_relations.iter() {
+                    debug!("Child relation: {}", document_id);
+                }
+
                 // Attempt to delete the view. If it is pinned from an existing view the deletion will
                 // not go ahead.
                 view_deleted = context
@@ -68,43 +91,70 @@ pub async fn garbage_collection_task(context: Context, input: TaskInput) -> Task
                     .map_err(|err| TaskError::Critical(err.to_string()))?;
             }
 
-            // If the view was deleted then push the effected children to the return array
             if view_deleted {
                 debug!("Deleted view: {}", document_view_id);
-                deleted_views_count += 1;
-                all_effected_child_relations.extend(effected_child_relations);
+                deleted_views.push(document_view_id);
+                effected_child_documents.extend(child_relations);
             } else {
                 debug!("Did not delete view: {}", document_view_id);
+                remaining_views.push(document_view_id);
             }
         }
 
-        // If the number of deleted views equals the total existing views (minus one for the
-        // current view), then there is a chance this became completely detached. In this case
-        // we should check if this document is a blob document and then try to purge it.
-        if all_document_view_ids.len() - 1 == deleted_views_count {
-            let operation = context
+        // Retrieve the schema id for this document.
+        let operation = context
+            .store
+            .get_operation(&document_id.as_str().parse().unwrap())
+            .await
+            .map_err(|err| TaskError::Failure(err.to_string()))?
+            .expect("Operation exists in store");
+
+        let is_blob = matches!(operation.schema_id(), SchemaId::Blob(1));
+
+        // If the number of remaining views is equal to one (the current view) and this is a
+        // blob document then we should attempt to purge the blob completely from the store
+        // and filesystem.
+        if remaining_views.len() == 1 && is_blob {
+            // Attempt to purge the blob and all its pieces. This only succeeds if no document
+            // refers to the blob document by either a relation or pinned relation.
+            let purge_success = context
                 .store
-                .get_operation(&document_id.as_str().parse().unwrap())
+                .purge_blob(&document_id)
                 .await
-                .map_err(|err| TaskError::Failure(err.to_string()))?
- .expect("Operation exists in store"); + .map_err(|err| TaskError::Failure(err.to_string()))?; - if let SchemaId::Blob(_) = operation.schema_id() { - // Purge the blob and all its pieces. This only succeeds if no document refers - // to the blob document by either a relation or pinned relation. - context - .store - .purge_blob(&document_id) + // If the purging succeeded add the current view to the deleted views array. + if purge_success { + debug!("Purged blob from the database: {}", document_id); + + // Push the blobs current view id to the deleted views array. + // + // Unwrap as we checked length above. + let current_view_id = remaining_views.pop().unwrap(); + deleted_views.push(current_view_id); + } + } + + // We now remove all deleted blob views from the filesystem. + if is_blob { + for view_id in deleted_views { + // Delete this blob view from the filesystem also. + let blob_view_path = context.config.blobs_base_path.join(view_id.to_string()); + remove_file(blob_view_path.clone()) .await - .map_err(|err| TaskError::Failure(err.to_string()))?; + .map_err(|err| TaskError::Critical(err.to_string()))?; + debug!("Deleted blob view from filesystem: {}", view_id); } } // We compose some more prune tasks based on the effected documents returned above. - let next_tasks: Vec> = all_effected_child_relations + let next_tasks: Vec> = effected_child_documents .iter() .map(|document_id| { - debug!("Issue prune task for document: {document_id:#?}"); + debug!( + "Dispatch garbage_collection task for document: {}", + document_id.display() + ); Task::new( "garbage_collection", TaskInput::DocumentId(document_id.to_owned()), @@ -124,6 +174,8 @@ pub async fn garbage_collection_task(context: Context, input: TaskInput) -> Task #[cfg(test)] mod tests { + use std::fs; + use p2panda_rs::document::DocumentId; use p2panda_rs::identity::KeyPair; use p2panda_rs::schema::SchemaId; @@ -131,17 +183,18 @@ mod tests { use p2panda_rs::test_utils::fixtures::{key_pair, random_document_view_id}; use rstest::rstest; - use crate::materializer::tasks::garbage_collection_task; + use crate::materializer::tasks::{blob_task, garbage_collection_task}; use crate::materializer::{Task, TaskInput}; use crate::test_utils::{ - add_blob, add_schema_and_documents, assert_query, test_runner, update_document, TestNode, + add_blob, add_schema_and_documents, assert_query, delete_document, test_runner, + update_document, TestNode, }; #[rstest] fn e2e_pruning(key_pair: KeyPair) { test_runner(|mut node: TestNode| async move { // Publish some documents which we will later point relations at. - let (child_schema, child_document_view_ids) = add_schema_and_documents( + let (child_schema, mut child_document_view_ids) = add_schema_and_documents( &mut node, "schema_for_child", vec![ @@ -152,6 +205,8 @@ mod tests { ) .await; + child_document_view_ids.sort(); + // Create some parent documents which contain a pinned relation list pointing to the // children created above. 
let (parent_schema, parent_document_view_ids) = add_schema_and_documents( @@ -303,7 +358,6 @@ mod tests { next_tasks, child_document_view_ids .iter() - .rev() .map(|document_view_id| { let document_id: DocumentId = document_view_id.to_string().parse().unwrap(); Task::new("garbage_collection", TaskInput::DocumentId(document_id)) @@ -374,7 +428,7 @@ mod tests { } #[rstest] - fn purges_blobs(key_pair: KeyPair) { + fn purges_blob_from_store(key_pair: KeyPair) { test_runner(|mut node: TestNode| async move { // Publish a blob let blob_document_view = add_blob( @@ -396,7 +450,15 @@ mod tests { .unwrap(); assert!(blob.is_some()); - // Run a garbage collection task for the blob document + // Run a blob task which persists the blob to the filesystem. + let _next_tasks = blob_task( + node.context.clone(), + TaskInput::DocumentViewId(blob_document_view.clone()), + ) + .await + .unwrap(); + + // Run a garbage collection task for the blob document. let next_tasks = garbage_collection_task( node.context.clone(), TaskInput::DocumentId(blob_document_id.clone()), @@ -427,6 +489,54 @@ mod tests { }); } + #[rstest] + fn purges_blob_from_filesystem(key_pair: KeyPair) { + test_runner(|mut node: TestNode| async move { + // Publish a blob + let blob_document_view = add_blob( + &mut node, + "Hello World!".as_bytes(), + 6, + "text/plain", + &key_pair, + ) + .await; + + let blob_document_id: DocumentId = blob_document_view.to_string().parse().unwrap(); + + // Run a blob task which persists the blob to the filesystem. + let _next_tasks = blob_task( + node.context.clone(), + TaskInput::DocumentViewId(blob_document_view.clone()), + ) + .await + .unwrap(); + + let blob_view_path = node + .context + .config + .blobs_base_path + .join(blob_document_view.to_string()); + + let result = fs::read(blob_view_path.clone()); + assert!(result.is_ok()); + + // Run a garbage collection task for the blob document. + let next_tasks = garbage_collection_task( + node.context.clone(), + TaskInput::DocumentId(blob_document_id.clone()), + ) + .await + .unwrap(); + + // It shouldn't return any new tasks + assert!(next_tasks.is_none()); + + let result = fs::read(blob_view_path); + assert!(result.is_err()); + }); + } + #[rstest] fn purges_newly_detached_blobs(key_pair: KeyPair) { test_runner(|mut node: TestNode| async move { @@ -435,6 +545,14 @@ mod tests { let blob_view_id = add_blob(&mut node, &blob_data, 6, "text/plain", &key_pair).await; let blob_document_id: DocumentId = blob_view_id.to_string().parse().unwrap(); + // Run a blob task which persists the blob to the filesystem. + let _next_tasks = blob_task( + node.context.clone(), + TaskInput::DocumentViewId(blob_view_id.clone()), + ) + .await + .unwrap(); + // Relate to the blob from a new document let (schema, documents_pinning_blob) = add_schema_and_documents( &mut node, @@ -476,7 +594,7 @@ mod tests { // No new tasks issued assert!(next_tasks.is_none()); - // The blob has correctly been purged + // The blob has correctly been purged from the store let blob = node .context .store @@ -500,6 +618,108 @@ mod tests { 0, ) .await; + + // And it no longer exists on the file system. 
+ let blob_view_path = node + .context + .config + .blobs_base_path + .join(blob_view_id.to_string()); + + let result = fs::read(blob_view_path.clone()); + assert!(result.is_err()); + }) + } + + #[rstest] + fn purges_blob_of_deleted_document(key_pair: KeyPair) { + test_runner(|mut node: TestNode| async move { + // Create a blob document + let blob_data = "Hello, World!".as_bytes(); + let blob_view_id = add_blob(&mut node, &blob_data, 6, "text/plain", &key_pair).await; + let blob_document_id: DocumentId = blob_view_id.to_string().parse().unwrap(); + + // Run a blob task which persists the blob to the filesystem. + let _next_tasks = blob_task( + node.context.clone(), + TaskInput::DocumentViewId(blob_view_id.clone()), + ) + .await + .unwrap(); + + // Relate to the blob from a new document. + let (schema, documents_pinning_blob) = add_schema_and_documents( + &mut node, + "img", + vec![vec![( + "blob", + blob_view_id.clone().into(), + Some(SchemaId::Blob(1)), + )]], + &key_pair, + ) + .await; + + // Now delete the document. This means the previously created blob is now "dangling". + delete_document( + &mut node, + schema.id(), + &documents_pinning_blob[0].clone(), + &key_pair, + ) + .await; + + // Run a task for the parent document + let document_id: DocumentId = documents_pinning_blob[0].to_string().parse().unwrap(); + let next_tasks = + garbage_collection_task(node.context.clone(), TaskInput::DocumentId(document_id)) + .await + .unwrap() + .unwrap(); + + // It issues one new task which is for the blob document + assert_eq!(next_tasks.len(), 1); + let next_tasks = + garbage_collection_task(node.context.clone(), next_tasks[0].input().to_owned()) + .await + .unwrap(); + // No new tasks issued + assert!(next_tasks.is_none()); + + // The blob has correctly been purged from the store + let blob = node + .context + .store + .get_blob(&blob_document_id) + .await + .unwrap(); + + assert!(blob.is_none()); + + // And all expected rows deleted from the database. + assert_query( + &node, + "SELECT operation_id FROM operations_v1 WHERE schema_id = 'blob_v1'", + 0, + ) + .await; + assert_query(&node, "SELECT log_id FROM logs WHERE schema = 'blob_v1'", 1).await; + assert_query( + &node, + "SELECT document_id FROM documents WHERE schema_id = 'blob_v1'", + 0, + ) + .await; + + // And it no longer exists on the file system. + let blob_view_path = node + .context + .config + .blobs_base_path + .join(blob_view_id.to_string()); + + let result = fs::read(blob_view_path.clone()); + assert!(result.is_err()); }) } @@ -511,6 +731,14 @@ mod tests { let blob_view_id = add_blob(&mut node, &blob_data, 6, "text/plain", &key_pair).await; let blob_document_id: DocumentId = blob_view_id.to_string().parse().unwrap(); + // Run a blob task which persists the blob to the filesystem. + let _next_tasks = blob_task( + node.context.clone(), + TaskInput::DocumentViewId(blob_view_id.clone()), + ) + .await + .unwrap(); + // Relate to the blob from a new document. let (schema, documents_pinning_blob) = add_schema_and_documents( &mut node, @@ -574,6 +802,16 @@ mod tests { .unwrap(); assert!(blob.is_some()); + + // And it should still be on the file system. 
+ let blob_view_path = node + .context + .config + .blobs_base_path + .join(blob_view_id.to_string()); + + let result = fs::read(blob_view_path.clone()); + assert!(result.is_ok()); }) } diff --git a/aquadoggo/src/materializer/tasks/reduce.rs b/aquadoggo/src/materializer/tasks/reduce.rs index e86e91fd9..7d077c8a4 100644 --- a/aquadoggo/src/materializer/tasks/reduce.rs +++ b/aquadoggo/src/materializer/tasks/reduce.rs @@ -254,7 +254,7 @@ async fn reduce_document + WithPublicKey>( if document.is_deleted() || document.is_edited() { debug!( - "Dispatch prune task for document with id: {}", + "Dispatch garbage collection task for document with id: {}", document.id() ); diff --git a/aquadoggo/src/test_utils/mod.rs b/aquadoggo/src/test_utils/mod.rs index 6ccd10816..acfe52dae 100644 --- a/aquadoggo/src/test_utils/mod.rs +++ b/aquadoggo/src/test_utils/mod.rs @@ -12,7 +12,7 @@ pub use config::TestConfiguration; pub use db::{drop_database, initialize_db, initialize_sqlite_db}; pub use helpers::{doggo_fields, doggo_schema, generate_key_pairs, schema_from_fields}; pub use node::{ - add_blob, add_document, add_schema, add_schema_and_documents, assert_query, + add_blob, add_document, add_schema, add_schema_and_documents, assert_query, delete_document, populate_and_materialize, populate_store, populate_store_config, update_blob, update_document, PopulateStoreConfig, TestNode, }; diff --git a/aquadoggo/src/test_utils/node.rs b/aquadoggo/src/test_utils/node.rs index 535a2137b..5f0a9c7cc 100644 --- a/aquadoggo/src/test_utils/node.rs +++ b/aquadoggo/src/test_utils/node.rs @@ -308,23 +308,52 @@ pub async fn update_document( .expect("Can get document id"); let input = TaskInput::DocumentId(document_id); - let next_tasks = reduce_task(node.context.clone(), input.clone()) + let _next_tasks = reduce_task(node.context.clone(), input.clone()) .await .expect("Reduce document"); - // Run dependency tasks - if let Some(tasks) = next_tasks { - // We only want to issue dependency tasks. - let dependency_tasks = tasks - .iter() - .filter(|task| task.worker_name() == "dependency"); + DocumentViewId::from(entry_signed.hash()) +} + +/// Helper method for deleting documents. +pub async fn delete_document( + node: &mut TestNode, + schema_id: &SchemaId, + previous: &DocumentViewId, + key_pair: &KeyPair, +) -> DocumentViewId { + // Get requested schema from store. + let schema = node + .context + .schema_provider + .get(schema_id) + .await + .expect("Schema not found"); + + // Build, publish and reduce an update operation for document. + let delete_op = OperationBuilder::new(schema.id()) + .action(OperationAction::Delete) + .previous(previous) + .build() + .expect("Build operation"); + + let (entry_signed, _) = send_to_store(&node.context.store, &delete_op, &schema, key_pair) + .await + .expect("Publish UPDATE operation"); + + let document_id = node + .context + .store + .get_document_id_by_operation_id(&OperationId::from(entry_signed.hash())) + .await + .expect("No db errors") + .expect("Can get document id"); + + let input = TaskInput::DocumentId(document_id); + let _next_tasks = reduce_task(node.context.clone(), input.clone()) + .await + .expect("Reduce document"); - for task in dependency_tasks { - dependency_task(node.context.clone(), task.input().to_owned()) - .await - .expect("Run dependency task"); - } - } DocumentViewId::from(entry_signed.hash()) }
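Taken together, the task changes above amount to the flow sketched below. This is an illustrative outline only: it assumes the surrounding task variables (`context`, `document_id`, `all_document_view_ids`, `is_blob`), the per-view delete call is abbreviated into a hypothetical `delete_view_if_unpinned` helper whose real name is not part of this diff, and error handling is simplified.

```rust
// Sketch of the garbage collection flow after this patch (simplified, not the
// literal task code).
let mut deleted_views = vec![];
let mut remaining_views = vec![];

// 1. Try to delete every view of the document; the current view is never
//    deleted in this step.
for view_id in &all_document_view_ids {
    if delete_view_if_unpinned(&context, view_id).await? {
        deleted_views.push(view_id.clone());
    } else {
        remaining_views.push(view_id.clone());
    }
}

// 2. Blobs only: if nothing but the current view remains, try to purge the
//    whole blob document. `purge_blob` only returns `true` when no other
//    document relates to it any more.
if is_blob && remaining_views.len() == 1 && context.store.purge_blob(&document_id).await? {
    deleted_views.push(remaining_views.pop().unwrap());
}

// 3. Remove the files that were materialised for every deleted blob view
//    from disk.
if is_blob {
    for view_id in &deleted_views {
        remove_file(context.config.blobs_base_path.join(view_id.to_string())).await?;
    }
}
```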