Skip to content

Commit

Permalink
Garbage collect blobs from file system (#571)
Browse files Browse the repository at this point in the history
* Return truthy value showing if purge occurred from purge_blob

* Fix debug message

* Delete blob views from the filesystem

* More logging and improved naming

* Formatting

* Test for deleted document causing blob to be garbage collected

* Include 'relation' and 'relation_list' fields when calculating child documents

* Return immediately from get_child_document_ids when no children found

* Update CHANGELOG

* Improve comments and naming in garbage_collection task

* Add comments to get_child_document_ids store method

* Order query responses

* Use tokio async remove_file

* fmt
  • Loading branch information
sandreae authored Oct 11, 2023
1 parent 3ac8b70 commit 94bc8e9
Show file tree
Hide file tree
Showing 7 changed files with 381 additions and 77 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Introduce `BlobStore` [#484](https://github.com/p2panda/aquadoggo/pull/484)
- Task for automatic garbage collection of unused documents and views [#500](https://github.com/p2panda/aquadoggo/pull/500)
- Blobs directory configuration [#549](https://github.com/p2panda/aquadoggo/pull/549)
- Integrate `Bytes` operation value [554](https://github.com/p2panda/aquadoggo/pull/554/)
- Integrate `Bytes` operation value [#554](https://github.com/p2panda/aquadoggo/pull/554/)
- Implement dependency replication for `blob_v1` and `blob_piece_v1` documents [#514](https://github.com/p2panda/aquadoggo/pull/514)
- Remove deleted/unused blobs from the file system [#571](https://github.com/p2panda/aquadoggo/pull/571)

### Changed

Expand Down
7 changes: 4 additions & 3 deletions aquadoggo/src/db/stores/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,13 +176,14 @@ impl SqlStore {
}

/// Purge blob data from the node _if_ it is not related to from another document.
pub async fn purge_blob(&self, document_id: &DocumentId) -> Result<(), SqlStoreError> {
pub async fn purge_blob(&self, document_id: &DocumentId) -> Result<bool, SqlStoreError> {
// Collect the view id of any existing document views which contain a relation to the blob
// which is the purge target.
let blob_reverse_relations = reverse_relations(&self.pool, document_id, None).await?;

// If there are no documents referring to the blob then we continue with the purge.
if blob_reverse_relations.is_empty() {
let should_purge = blob_reverse_relations.is_empty();
if should_purge {
// Collect the document view ids of all pieces this blob has ever referred to in its
// `pieces`
let blob_piece_ids: Vec<String> = query_scalar(
Expand Down Expand Up @@ -227,7 +228,7 @@ impl SqlStore {
}
}

Ok(())
Ok(should_purge)
}

/// Get ids for all blob documents which are related to from any view of the passed document.
Expand Down
81 changes: 58 additions & 23 deletions aquadoggo/src/db/stores/document.rs
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,8 @@ impl SqlStore {
document_views
WHERE
document_views.document_id = $1
ORDER BY
document_views.document_id
",
)
.bind(document_id.as_str())
Expand All @@ -401,38 +403,67 @@ impl SqlStore {
&self,
document_view_id: &DocumentViewId,
) -> Result<Vec<DocumentId>, DocumentStorageError> {
let document_view_ids: Vec<String> = query_scalar(
// Collect all ids or view ids of children related to from the passed document view.
let children_ids: Vec<String> = query_scalar(
"
SELECT
operation_fields_v1.value
FROM
document_view_fields
LEFT JOIN
operation_fields_v1
ON
document_view_fields.operation_id = operation_fields_v1.operation_id
AND
document_view_fields.name = operation_fields_v1.name
WHERE
operation_fields_v1.field_type IN (
'pinned_relation',
'pinned_relation_list',
'relation',
'relation_list'
)
AND
document_view_fields.document_view_id = $1
",
)
.bind(document_view_id.to_string())
.fetch_all(&self.pool)
.await
.map_err(|err| DocumentStorageError::FatalStorageError(err.to_string()))?;

// If no children were found, return early with an empty vec.
if children_ids.is_empty() {
return Ok(vec![]);
}

let args = children_ids
.iter()
.map(|id| format!("'{id}'"))
.collect::<Vec<String>>()
.join(",");

// Query for any document included in the list of children.
let document_ids: Vec<String> = query_scalar(&format!(
"
SELECT DISTINCT
document_views.document_id
FROM
document_views
WHERE
document_views.document_view_id
IN (
SELECT
operation_fields_v1.value
FROM
document_view_fields
LEFT JOIN
operation_fields_v1
ON
document_view_fields.operation_id = operation_fields_v1.operation_id
AND
document_view_fields.name = operation_fields_v1.name
WHERE
operation_fields_v1.field_type IN ('pinned_relation', 'pinned_relation_list')
AND
document_view_fields.document_view_id = $1
)
document_views.document_view_id IN ({})
OR
document_views.document_id IN ({})
ORDER BY
document_views.document_id ASC
",
)
.bind(document_view_id.to_string())
args, args
))
.fetch_all(&self.pool)
.await
.map_err(|err| DocumentStorageError::FatalStorageError(err.to_string()))?;

Ok(document_view_ids
Ok(document_ids
.iter()
.map(|document_id_str| {
document_id_str
Expand Down Expand Up @@ -503,8 +534,12 @@ impl SqlStore {
) -> Result<bool, DocumentStorageError> {
let document_view_id: Option<String> = query_scalar(
"
SELECT documents.document_view_id FROM documents
WHERE documents.document_view_id = $1
SELECT
documents.document_view_id
FROM
documents
WHERE
documents.document_view_id = $1
",
)
.bind(document_view_id.to_string())
Expand Down
Loading

0 comments on commit 94bc8e9

Please sign in to comment.