Recover file store previous state at open #1632

Closed
48 changes: 48 additions & 0 deletions crates/file_store/src/lib.rs
@@ -40,3 +40,51 @@ impl From<io::Error> for FileError {
}

impl std::error::Error for FileError {}

/// An error while opening or creating the file store
#[derive(Debug)]
pub enum StoreError {
    /// Entry iter error
    EntryIter {
        /// Index that caused the error
        index: usize,
        /// Iter error
        iter: IterError,
        /// Amount of bytes read so far
        bytes_read: u64,
    },
    /// IO error, this may mean that the file is too short.
    Io(io::Error),
    /// Magic bytes do not match what is expected.
    InvalidMagicBytes { got: Vec<u8>, expected: Vec<u8> },
}

impl core::fmt::Display for StoreError {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        match self {
            Self::EntryIter {
                index,
                iter,
                bytes_read,
            } => write!(
                f,
                "{}: changeset index={}, bytes read={}",
                iter, index, bytes_read
            ),
            Self::Io(e) => write!(f, "io error trying to read file: {}", e),
            Self::InvalidMagicBytes { got, expected } => write!(
                f,
                "file has invalid magic bytes: expected={:?} got={:?}",
                expected, got,
            ),
        }
    }
}

impl std::error::Error for StoreError {}

impl From<io::Error> for StoreError {
    fn from(value: io::Error) -> Self {
        Self::Io(value)
    }
}
225 changes: 224 additions & 1 deletion crates/file_store/src/store.rs
@@ -1,4 +1,4 @@
-use crate::{bincode_options, EntryIter, FileError, IterError};
+use crate::{bincode_options, EntryIter, FileError, IterError, StoreError};
use bdk_chain::Merge;
use bincode::Options;
use std::{
@@ -92,6 +92,62 @@ where
        })
    }

    /// Open an existing [`Store`], read its content, and return it ready to start receiving new
    /// changesets.
    ///
    /// Use [`create_new`] to create a new `Store`.
    ///
    /// # Errors
    ///
    /// If the prefix bytes of the opened file do not match the provided `magic`, the
    /// [`StoreError::InvalidMagicBytes`] error variant will be returned.
    ///
    /// If there is an error while decoding the stored changesets, the [`StoreError::EntryIter`]
    /// error variant will be returned, carrying the index of the failing changeset and the
    /// underlying error.
    ///
    /// [`create_new`]: Store::create_new
    pub fn reopen<P>(magic: &[u8], file_path: P) -> Result<Self, StoreError>
Contributor

I'm probably missing some background but what's the point of doing this unless you return the data you read?

I think a PR here makes sense because we got rid of the "load" system we had before. Maybe this needs a big re-think including deleting lots of the API here. We probably just want append_changeset and create and load as the API. Does anyone need to actually iter changesets?

@KnowWhoami Sep 30, 2024

I agree that the FileStore has APIs that users likely don't need.

IMO, implementing these APIs in the following fashion could make FileStore much simpler.

Key APIs:

  • append_changeset:
    We should ensure that duplicate changesets are not stored. Currently, users must call aggregate_changesets to remove duplicates, but the store isn't updated with the final unique changeset. Instead, we can make append_changeset automatically append the given changeset, aggregate them to remove duplicates, and then store the final aggregated changeset back into the FileStore.

  • load:
    As suggested by @ValuedMammal, it makes sense to open the FileStore in append mode rather than write mode. This will place the file pointer at the end of the file, avoiding overwriting and making it easier to append new changesets.

Other Considerations:

  • aggregate_changesets:
    While I'm not certain if downstream users will need this API, it seems valuable to keep it exposed for now.

  • Iteration Over Changesets:
    From a user's perspective, iterating changesets is unnecessary. Currently, it's only used to collect changesets for aggregation. We could either make it private or move the iteration logic directly into aggregate_changesets.

By handling changeset duplication within append_changeset, we can eliminate the need for users to manually aggregate changesets or iterate over them.
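
A minimal sketch of the "aggregate on append" idea above, kept independent of bdk_file_store internals; the function name and the in-memory aggregate are illustrative, not code from this PR:

use bdk_chain::Merge;

// Keep one in-memory aggregate: merge every new changeset into it and persist only the
// aggregate, so duplicate entries never accumulate on disk. Serializing and rewriting the
// store file is elided here.
fn append_aggregated<C: Merge>(aggregate: &mut Option<C>, new_changeset: C) {
    match aggregate {
        Some(acc) => acc.merge(new_changeset),
        None => *aggregate = Some(new_changeset),
    }
    // ...then rewrite the store file with this single aggregated entry.
}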

Contributor Author (@nymius)

I agree with your vision.

In this PR I proposed a compromise solution, guided by the principle of stabilizing the API rather than breaking it. I don't know if that's currently something BDK wants to do.


As suggested by @ValuedMammal, it makes sense to open the FileStore in append mode rather than write mode. This will place the file pointer at the end of the file, avoiding overwriting and making it easier to append new changesets.

For example, I discarded the append solution because there seemed to be an expected behavior hidden behind the interaction of two functions:

  1. open not moving the file pointer to the EOF when accessing the file, and
  2. append_changeset setting the file pointer (and length) to the end of the last changeset written.

The interaction of both is what made the append_changeset_truncates_invalid_bytes test pass in the original code, because (1) leaves the file pointer at the end of the MAGIC_BYTES and (2) removes the invalid bytes completely by shortening the file to the end of the last written changeset.
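
A rough sketch of that interaction, written as an assumption about the behavior described rather than the exact crate code (bincode_options is the crate's helper; everything else here is illustrative):

use bincode::Options;
use std::io::{Seek, Write};

// Illustrative only: write one changeset at the current cursor, then truncate the file to
// the end of what was just written, which is what drops any trailing invalid bytes.
fn append_changeset_sketch<C: serde::Serialize>(
    db_file: &mut std::fs::File,
    changeset: &C,
) -> std::io::Result<()> {
    let bytes = bincode_options()
        .serialize(changeset)
        .expect("changeset must serialize");
    db_file.write_all(&bytes)?;
    let end_of_last_changeset = db_file.stream_position()?;
    db_file.set_len(end_of_last_changeset)?;
    Ok(())
}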


I'm probably missing some background but what's the point of doing this unless you return the data you read?

That's also why I don't provide the user with the well-encoded changesets on error, as that would have implied a more complex error structure using generics (to carry the changeset type).
I traded it for the minimum data needed to locate and isolate the failure in the file.

You may find a clearer idea of this in the fail_to_reopen_if_write_is_short test, at this line.


If there is common agreement to do the refactoring above instead of trying to fix the current API, I can close this PR and try to implement the changes in another one.

Contributor

I don't understand why we want to ensure duplicate changesets are not stored. They do no harm and only happen in the case of some kind of programmer error.

There's no issue with stabilizing the API of this crate, I think. We can do what we want with it. To me at least, this reopen makes even less sense because it reads in every entry but doesn't return it, which is a waste.

I agree with @nymius that opening the file with append is a nice idea in theory, but it's actually counterproductive here. There's no magic to opening files with append; it just makes the operating system handle what we can do better ourselves.


I vote for a PR cutting the API down to:

  • create -- rename create_new.
  • load -- does what open does, aggregates the changesets, and leaves the file pointer at the right spot. Errors if the file doesn't exist or the magic bytes are wrong.
  • append -- does what append_changeset does
  • create_or_load -- does what open_or_create_new does

Then update the README to explain that this is the simplest implementation of a bdk-compatible database. It's useful for development but should probably not be used beyond that. No effort has been made to make it backwards compatible, so you have to handle that yourself. Link to the docs on the sqlite feature for what should be used in production.

Member

I think load should essentially return Result<(C, Self), Error> so that we can get rid of .aggregate_changeset. This forces the caller to only read once.

@KnowWhoami Oct 2, 2024

The interaction of both is what made the append_changeset_truncates_invalid_bytes test pass in the original code, because (1) leaves the file pointer at the end of the MAGIC_BYTES and (2) removes the invalid bytes completely by shortening the file to the end of the last written changeset.

Got the point.

I don't understand why we want to ensure duplicate changesets are not stored.

On ensuring this -> we can get rid of iterating over the changesets each time the user wants to get the total changeset.

Instead, we can make append_changeset automatically append the given changeset, aggregate them to remove duplicates, and then store the final aggregated changeset back into the FileStore

Though in my proposed impl -> we have to iterate at the end -> so it doesn't make much difference.

They do no harm and only happen in the case of some kind of programmer error.

I have no idea regarding this, but if that's the case -> then it does not make sense to try to keep a unique set of changesets.

I think load should essentially return Result<(C, Self), Error> so that we can get rid of .aggregate_changeset. This forces the caller to only read once.

Combining the load/open and aggregate_changesets logic does make sense because, IMO, both are correlated: in order to load a wallet, a user has to get the aggregated changeset, for which they have to load the store instance.
This way, we don't need to create another public API and users don't have to manually call a function to get the changeset required for loading the wallet.

Member

On ensuring this -> we can get rid of iterating over the changesets each time the user wants to get the total changeset.

If loading gets too slow, we can just have a method to merge changesets. I.e. overwrite the file with just one entry, the new aggregate changeset. Or aggregate x number of changesets at the end.

It does NOT make sense to have a check that stops duplicate changesets being stored. How are we going to implement this? Have an aggregate changeset of all persisted changes in memory that ensures the new changeset is not a subset of it? That typically doesn't happen, and if it does, it means that there is a problem with the in-memory data structures. The in-memory data structures should only output a changeset because there are changes to it.
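
An illustrative sketch of the compaction idea above (a hypothetical helper, not part of this PR or the crate): rewrite the file so it holds only the magic bytes followed by a single aggregated entry.

use bincode::Options;
use std::io::{Seek, SeekFrom, Write};

// Hypothetical: seek just past the magic bytes, write the aggregate changeset, and cut the
// file off there, replacing every previously stored entry with one.
fn compact_sketch<C: serde::Serialize>(
    db_file: &mut std::fs::File,
    magic_len: usize,
    aggregate: &C,
) -> std::io::Result<()> {
    db_file.seek(SeekFrom::Start(magic_len as u64))?;
    let bytes = bincode_options()
        .serialize(aggregate)
        .expect("aggregate must serialize");
    db_file.write_all(&bytes)?;
    let new_len = db_file.stream_position()?;
    db_file.set_len(new_len)?;
    Ok(())
}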

Contributor Author @nymius Nov 12, 2024

I think load should essentially return Result<(C, Self), Error> so that we can get rid of .aggregate_changeset. This forces the caller to only read once.

I have found an issue with this approach. The (C, Self) return type is hard to adapt to the WalletPersister::initialize trait.

Per the documentation, I don't have any file path available to load the store from, and I must return all data stored in the persister (i.e. return the aggregated changeset):

  • The database schema of the persister (if any), should be initialized...
  • The implementation must return all data currently stored in the persister. If there is no data, return an empty changeset (using [ChangeSet::default()]).
  • some persister implementations may NOT require initialization at all (and not error).
    /// [`persist`]: WalletPersister::persist
    fn initialize(persister: &mut Self) -> Result<ChangeSet, Self::Error>;

file_store's trait implementation currently in master assumes persister (i.e. file_store::Store) has been opened or loaded before reaching the trait method (look how aggregate_changesets() is called without create_new/open ceremony).

#[cfg(feature = "file_store")]
impl WalletPersister for bdk_file_store::Store<ChangeSet> {
    type Error = FileStoreError;

    fn initialize(persister: &mut Self) -> Result<ChangeSet, Self::Error> {
        persister
            .aggregate_changesets()
            .map(Option::unwrap_or_default)
            .map_err(FileStoreError::Load)
    }

Summing up, the constraints for file_store::Store are:

  • Must return the aggregated changeset.
  • Cannot load from the file in WalletPersister::initialize because file_path is not in scope at that point.
  • Including a file_path parameter in WalletPersister::initialize would break the API and force changes to the rusqlite implementation.

Following LLFourn's idea, I propose the following new API for file_store::Store (sketched below):

  • create: rename of create_new
  • load: open file, place file pointer at EOF and return file_store::Store.
  • dump: aggregate changesets and return the aggregated changeset.
  • append: rename of append_changeset
  • load_or_create: Store::create or Store::load.
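
A signatures-only sketch of this proposal; Store and StoreError below are placeholders for illustration, and none of it is implemented in this PR:

use std::path::Path;

// Placeholder types standing in for the real ones.
struct Store<C>(std::marker::PhantomData<C>);
enum StoreError { /* Io, InvalidMagicBytes, EntryIter, ... */ }

impl<C> Store<C> {
    /// rename of `create_new`
    fn create(_magic: &[u8], _path: impl AsRef<Path>) -> Result<Self, StoreError> { todo!() }
    /// open the file and leave the file pointer at EOF, ready to append
    fn load(_magic: &[u8], _path: impl AsRef<Path>) -> Result<Self, StoreError> { todo!() }
    /// aggregate the changesets currently on disk and return the result
    fn dump(&mut self) -> Result<C, StoreError> { todo!() }
    /// rename of `append_changeset`
    fn append(&mut self, _changeset: &C) -> Result<(), StoreError> { todo!() }
    /// `Store::create` or `Store::load`
    fn load_or_create(_magic: &[u8], _path: impl AsRef<Path>) -> Result<Self, StoreError> { todo!() }
}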

Contributor Author (@nymius)

Working on this, I realized we can have both: (C, Store) as the return type of Store::load and a Store::dump method to call in <bdk_file_store::Store<ChangeSet> as WalletPersister>::initialize, so I will keep both.

    where
        P: AsRef<Path>,
    {
        let mut f = OpenOptions::new().read(true).write(true).open(file_path)?;

        let mut magic_buf = vec![0_u8; magic.len()];
        f.read_exact(&mut magic_buf)?;
        if magic_buf != magic {
            return Err(StoreError::InvalidMagicBytes {
                got: magic_buf,
                expected: magic.to_vec(),
            });
        }

        let mut store = Self {
            magic_len: magic.len(),
            db_file: f,
            marker: Default::default(),
        };

        let mut index: usize = 0;
        let mut error: Option<IterError> = None;
        for (idx, next_changeset) in store.iter_changesets().enumerate() {
            if let Err(iter_error) = next_changeset {
                index = idx;
                error = Some(iter_error);
            };
        }

        if let Some(iter) = error {
            return Err(StoreError::EntryIter {
                index,
                iter,
                bytes_read: store.db_file.stream_position()?,
            });
        }

        Ok(store)
    }

    /// Attempt to open existing [`Store`] file; create it if the file is non-existent.
    ///
    /// Internally, this calls either [`open`] or [`create_new`].
@@ -438,4 +494,171 @@ mod test {
assert_eq!(aggregation, exp_aggregation);
}
}

    #[test]
    fn reopen_recovers_state_after_last_write() {
        let temp_dir = tempfile::tempdir().unwrap();
        let file_path = temp_dir.path().join("db_file");

        let changeset1 = TestChangeSet::from(["1".into(), "2".into(), "3".into()]);
        let changeset2 = TestChangeSet::from(["4".into(), "5".into(), "6".into()]);

        {
            // create new db
            let mut db = Store::<TestChangeSet>::create_new(&TEST_MAGIC_BYTES, &file_path)
                .expect("must create");

            // append first changeset to db
            db.append_changeset(&changeset1).expect("must succeed");
        }

        {
            // open db
            let mut db = Store::<TestChangeSet>::reopen(&TEST_MAGIC_BYTES, &file_path)
                .expect("failed to load db");

            // now append the second changeset
            db.append_changeset(&changeset2).expect("must succeed");

            // Retrieve stored changesets from the database
            let stored_changesets = db
                .aggregate_changesets()
                .expect("must succeed")
                .expect("must succeed");

            // expected changeset must be changeset2 + changeset1
            let mut expected_changeset = changeset2.clone();
            expected_changeset.extend(changeset1);

            // Assert that stored_changesets matches expected_changeset but not changeset2
            assert_eq!(stored_changesets, expected_changeset);
            assert_ne!(stored_changesets, changeset2);
        }

        // Open the store again to verify the file pointer position is at the EOF
        let mut db = Store::<TestChangeSet>::reopen(&TEST_MAGIC_BYTES, &file_path)
            .expect("failed to load db");

        // get the current position of the file pointer just after loading the store
        let current_pointer = db.db_file.stream_position().expect("must succeed");

        // get the pointer to the last position of the loaded db
        let expected_pointer = db.db_file.seek(io::SeekFrom::End(0)).expect("must succeed");

        // both should match
        assert_eq!(current_pointer, expected_pointer);
    }

    #[test]
    fn fail_to_reopen_if_write_is_short() {
        let temp_dir = tempfile::tempdir().unwrap();

        let changesets = [
            TestChangeSet::from(["1".into()]),
            TestChangeSet::from(["2".into(), "3".into()]),
            TestChangeSet::from(["4".into(), "5".into(), "6".into()]),
        ];
        let last_changeset = TestChangeSet::from(["7".into(), "8".into(), "9".into()]);
        let last_changeset_bytes = bincode_options().serialize(&last_changeset).unwrap();

        for short_write_len in 1..last_changeset_bytes.len() - 1 {
            let file_path = temp_dir.path().join(format!("{}.dat", short_write_len));

            // simulate creating a file, writing data where the last write is incomplete
            {
                let mut db =
                    Store::<TestChangeSet>::create_new(&TEST_MAGIC_BYTES, &file_path).unwrap();
                for changeset in &changesets {
                    db.append_changeset(changeset).unwrap();
                }
                // this is the incomplete write
                db.db_file
                    .write_all(&last_changeset_bytes[..short_write_len])
                    .unwrap();
            }

            // Reopen file and fail, but recover once file is truncated to valid bytes
            {
                match Store::<TestChangeSet>::reopen(&TEST_MAGIC_BYTES, &file_path) {
                    Err(StoreError::EntryIter {
                        index, bytes_read, ..
                    }) => {
                        // Open file again and truncate file to valid content
                        let mut f = OpenOptions::new()
                            .read(true)
                            .write(true)
                            .open(&file_path)
                            .expect("should open");
                        f.set_len(bytes_read)
                            .expect("should truncate the file length to bytes_read");
                        f.seek(io::SeekFrom::End(0))
                            .expect("should position the file pointer to the new EOF");

                        // Once file is truncated reopen file again
                        let mut db = Store::<TestChangeSet>::reopen(&TEST_MAGIC_BYTES, &file_path)
                            .expect("should not fail now");
                        let exp_aggregation = db
                            .iter_changesets()
                            .take(index)
                            .map(|r| r.expect("must read valid changeset"))
                            .fold(TestChangeSet::default(), |mut acc, v| {
                                Merge::merge(&mut acc, v);
                                acc
                            });

                        assert_eq!(
                            exp_aggregation,
                            changesets
                                .iter()
                                .cloned()
                                .reduce(|mut acc, cs| {
                                    Merge::merge(&mut acc, cs);
                                    acc
                                })
                                .expect("should merge normally"),
                            "should recover all changesets that are written in full",
                        );

                        db.db_file.write_all(&last_changeset_bytes).unwrap();
                    }
                    _ => panic!("reopen must fail to read"),
                }
            }

            // load file again - this time we should successfully read all changesets
            {
                let mut db = Store::<TestChangeSet>::reopen(&TEST_MAGIC_BYTES, &file_path).unwrap();
                let aggregated_changesets = db
                    .aggregate_changesets()
                    .expect("aggregating all changesets should succeed");
                assert_eq!(
                    aggregated_changesets,
                    changesets
                        .iter()
                        .cloned()
                        .chain(core::iter::once(last_changeset.clone()))
                        .reduce(|mut acc, cs| {
                            Merge::merge(&mut acc, cs);
                            acc
                        }),
                    "should recover all changesets",
                );
            }
        }
    }

    #[test]
    #[should_panic(
        expected = "Byte 255 is treated as an extension point; it should not be encoding anything."
    )]
    fn reopen_fails_to_read_if_invalid_bytes() {
        // initial data to write to file (magic bytes + invalid data)
        let mut data = [255_u8; 2000];
        data[..TEST_MAGIC_BYTES_LEN].copy_from_slice(&TEST_MAGIC_BYTES);

        let mut file = NamedTempFile::new().unwrap();
        file.write_all(&data).expect("should write");

        Store::<TestChangeSet>::reopen(&TEST_MAGIC_BYTES, file.path()).unwrap();
    }
}