Skip to content

Commit

Permalink
Fix #79: surface get() errors (#89)
Browse files Browse the repository at this point in the history
* Fix #79: surface get errors

* Move test with dummy and sync_if_empty

* Add test to assert that get() reads storage by default

* Address @smarnach comments
  • Loading branch information
leplatrem authored Mar 22, 2021
1 parent caa960d commit 996e5e8
Show file tree
Hide file tree
Showing 3 changed files with 176 additions and 108 deletions.
260 changes: 170 additions & 90 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,6 @@ impl std::fmt::Debug for Box<dyn Storage> {
}
}


impl Client {
/// Creates a `ClientBuilder` to configure a `Client`.
pub fn builder() -> ClientBuilder {
Expand Down Expand Up @@ -241,48 +240,45 @@ impl Client {
///
/// # Behaviour
/// * Return local data by default;
/// * If local data is empty or malformed, and if `sync_if_empty` is `true` (*default*),
/// * If local data is empty and if `sync_if_empty` is `true` (*default*),
/// then synchronize the local data with the server and return records, otherwise
/// return an empty list.
/// return an error.
///
/// Note: with the [`DummyStorage`], any call to `.get()` will trigger a synchronization.
///
/// Note: with `sync_if_empty` as `false`, if `.sync()` is never called then `.get()` will
/// always return an empty list.
/// always return an error.
///
/// # Errors
/// If an error occurs while fetching or verifying records, a [`ClientError`] is returned.
pub fn get(&mut self) -> Result<Vec<Record>, ClientError> {
let storage_key = self._storage_key();

debug!("Retrieve from storage with key={:?}", storage_key);
let stored_bytes: Vec<u8> = self
.storage
.retrieve(&storage_key)
// TODO: surface errors. See #79
.unwrap_or_default();
let stored: Option<Collection> = serde_json::from_slice(&stored_bytes).unwrap_or(None);

match stored {
Some(collection) => {
let read_result = self.storage.retrieve(&storage_key);

match read_result {
Ok(stored_bytes) => {
// Deserialize content of storage and surface error if fails.
let stored: Collection = serde_json::from_slice(&stored_bytes).map_err(|err| {
StorageError::ReadError(format!("cannot deserialize collection: {}", err))
})?;
// Verify signature of stored data (*optional*)
if !self.trust_local {
debug!("Verify signature of local data.");
self.verifier.verify(&collection)?;
self.verifier.verify(&stored)?;
}

Ok(collection.records)
Ok(stored.records)
}
None => {
if self.sync_if_empty {
debug!("Synchronize data, without knowning which timestamp to expect.");
let collection = self.sync(None)?;
return Ok(collection.records);
}
// TODO: this empty list should be «qualified». Is it empty because never synced
// or empty on the server too. (see Normandy suitabilities).
debug!("Local data is empty or malformed.");
Ok(Vec::new())
// If storage is empty, go on with sync() (*optional*)
Err(StorageError::KeyNotFound { .. }) if self.sync_if_empty => {
debug!("Synchronize data, without knowning which timestamp to expect.");
let collection = self.sync(None)?;
Ok(collection.records)
}
// Otherwise, surface the error.
Err(err) => Err(err.into()),
}
}

Expand Down Expand Up @@ -359,7 +355,7 @@ impl Client {
debug!("Store collection with key={:?}", storage_key);
let collection_bytes: Vec<u8> = serde_json::to_string(&collection)
.map_err(|err| {
StorageError::WriteError(format!("Cannot serialize collection: {}", err))
StorageError::WriteError(format!("cannot serialize collection: {}", err))
})?
.into();
self.storage.store(&storage_key, collection_bytes)?;
Expand Down Expand Up @@ -390,7 +386,7 @@ fn merge_changes(local_records: Vec<Record>, remote_changes: Vec<KintoObject>) -
#[cfg(test)]
mod tests {
use super::signatures::{SignatureError, Verification};
use super::{Client, Collection, MemoryStorage, Record};
use super::{Client, Collection, DummyStorage, DummyVerifier, MemoryStorage, Record};
use env_logger;
use httpmock::MockServer;
use serde_json::json;
Expand Down Expand Up @@ -424,13 +420,13 @@ mod tests {

#[test]
fn test_default_builder() {
let client = Client::builder()
.collection_name("cid")
.build()
.unwrap();
let client = Client::builder().collection_name("cid").build().unwrap();

// Assert defaults for consumers.
assert!(client.server_url.contains("services.mozilla.com"), client.server_url);
assert!(
client.server_url.contains("services.mozilla.com"),
client.server_url
);
assert_eq!(client.bucket_name, "main");
assert_eq!(client.sync_if_empty, true);
assert_eq!(client.trust_local, true);
Expand All @@ -439,18 +435,151 @@ mod tests {
}

#[test]
fn test_get_empty_storage() {
fn test_get_works_with_dummy_storage() {
init();

let mock_server = MockServer::start();
let mut get_latest_change_mock = mock_server.mock(|when, then| {
when.path("/buckets/monitor/collections/changes/changeset")
.query_param("_expected", "0");
then.body(
r#"{
"metadata": {},
"changes": [{
"id": "not-read",
"last_modified": 555,
"bucket": "main",
"collection": "top-sites"
}],
"timestamp": 555
}"#,
);
});

let mut get_changeset_mock = mock_server.mock(|when, then| {
when.path("/buckets/main/collections/top-sites/changeset")
.query_param("_expected", "555");
then.body(
r#"{
"metadata": {},
"changes": [{
"id": "record-1",
"last_modified": 555,
"foo": "bar"
}],
"timestamp": 555
}"#,
);
});

let mut client = Client::builder()
.server_url(&mock_server.url(""))
.collection_name("top-sites")
.storage(Box::new(DummyStorage {}))
.verifier(Box::new(DummyVerifier {}))
.build()
.unwrap();

let records = client.get().unwrap();
assert_eq!(records.len(), 1);
assert_eq!(records[0]["foo"].as_str().unwrap(), "bar");

get_changeset_mock.assert_hits(1);
get_latest_change_mock.assert_hits(1);

// Calling again will pull from network.
let records_twice = client.get().unwrap();
assert_eq!(records_twice.len(), 1);

get_changeset_mock.assert_hits(2);
get_latest_change_mock.assert_hits(2);

get_changeset_mock.delete();
get_latest_change_mock.delete();
}

#[test]
fn test_get_with_empty_storage() {
init();

let mock_server = MockServer::start();
let mut get_latest_change_mock = mock_server.mock(|when, then| {
when.path("/buckets/monitor/collections/changes/changeset")
.query_param("_expected", "0");
then.body(
r#"{
"metadata": {},
"changes": [{
"id": "not-read",
"last_modified": 888,
"bucket": "main",
"collection": "pocket"
}],
"timestamp": 555
}"#,
);
});

let mut get_changeset_mock = mock_server.mock(|when, then| {
when.path("/buckets/main/collections/pocket/changeset")
.query_param("_expected", "888");
then.body(
r#"{
"metadata": {},
"changes": [{
"id": "record-1",
"last_modified": 888,
"foo": "bar"
}],
"timestamp": 555
}"#,
);
});

let mut client = Client::builder()
.server_url(&mock_server.url(""))
.collection_name("pocket")
.storage(Box::new(MemoryStorage::new()))
.verifier(Box::new(DummyVerifier {}))
.build()
.unwrap();

let records = client.get().unwrap();
assert_eq!(records.len(), 1);
assert_eq!(records[0]["foo"].as_str().unwrap(), "bar");

get_changeset_mock.assert();
get_latest_change_mock.assert();

// Calling again won't pull from network.
let records_twice = client.get().unwrap();
assert_eq!(records_twice.len(), 1);

get_changeset_mock.assert_hits(1);
get_latest_change_mock.assert_hits(1);

get_changeset_mock.delete();
get_latest_change_mock.delete();
}

#[test]
fn test_get_empty_storage_no_sync_if_empty() {
init();
let mock_server = MockServer::start();

let mut client = Client::builder()
.server_url(mock_server.url(""))
.collection_name("url-classifier-skip-urls")
// Explicitly disable sync if empty.
.sync_if_empty(false)
.build()
.unwrap();

assert_eq!(client.get().unwrap().len(), 0);
let err = client.get().unwrap_err();
assert_eq!(
err.to_string(),
"storage I/O error: key could not be found: main/url-classifier-skip-urls:collection"
);
}

#[test]
Expand All @@ -461,13 +590,18 @@ mod tests {
let mut client = Client::builder()
.server_url(mock_server.url(""))
.collection_name("cfr")
.storage(Box::new(MemoryStorage::new()))
.sync_if_empty(false)
.build()
.unwrap();

client.storage.store("main/cfr", b"abc".to_vec()).unwrap();
client
.storage
.store("main/cfr:collection", b"abc".to_vec())
.unwrap();

assert_eq!(client.get().unwrap().len(), 0);
let err = client.get().unwrap_err();
assert_eq!(err.to_string(), "storage I/O error: cannot read from storage: cannot deserialize collection: expected value at line 1 column 1");
}

#[test]
Expand Down Expand Up @@ -576,60 +710,6 @@ mod tests {
get_changeset_mock.delete();
}

#[test]
fn test_get_works_with_dummy_storage() {
init();

let mock_server = MockServer::start();
let mut get_latest_change_mock = mock_server.mock(|when, then| {
when.path("/buckets/monitor/collections/changes/changeset")
.query_param("_expected", "0");
then.body(
r#"{
"metadata": {},
"changes": [{
"id": "not-read",
"last_modified": 555,
"bucket": "main",
"collection": "top-sites"
}],
"timestamp": 555
}"#,
);
});

let mut get_changeset_mock = mock_server.mock(|when, then| {
when.path("/buckets/main/collections/top-sites/changeset")
.query_param("_expected", "555");
then.body(
r#"{
"metadata": {},
"changes": [{
"id": "record-1",
"last_modified": 555,
"foo": "bar"
}],
"timestamp": 555
}"#,
);
});

let mut client = Client::builder()
.server_url(mock_server.url(""))
.collection_name("top-sites")
.build()
.unwrap();

let records = client.get().unwrap();
assert_eq!(records.len(), 1);
assert_eq!(records[0]["foo"].as_str().unwrap(), "bar");

get_changeset_mock.assert();
get_changeset_mock.delete();
get_latest_change_mock.assert();
get_latest_change_mock.delete();
}

#[test]
fn test_sync_pulls_current_timestamp_from_changes_endpoint_if_none() {
init();
Expand Down
20 changes: 5 additions & 15 deletions src/client/kinto_http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,9 @@ pub fn get_latest_change_timestamp(server: &str, bid: &str, cid: &str) -> Result
collection: cid.to_string(),
})?;

let last_modified =
change["last_modified"]
.as_u64()
.ok_or_else(|| KintoError::InvalidChangesetTimestamp(
change["last_modified"].to_string(),
))?;
let last_modified = change["last_modified"].as_u64().ok_or_else(|| {
KintoError::InvalidChangesetTimestamp(change["last_modified"].to_string())
})?;

debug!("{}/{}: last_modified={}", bid, cid, last_modified);

Expand Down Expand Up @@ -117,19 +114,12 @@ pub fn get_changeset(
// See https://docs.kinto-storage.org/en/stable/api/1.x/errors.html#error-responses
let info: ErrorResponse = match response.json() {
Ok(v) => v,
Err(_) => {
return Err(KintoError::UnexpectedResponse {
response,
})
}
Err(_) => return Err(KintoError::UnexpectedResponse { response }),
};

// Error due to the client. The request must be modified.
if response.is_client_error() {
return Err(KintoError::ClientRequestError {
response,
info,
});
return Err(KintoError::ClientRequestError { response, info });
}

if response.is_server_error() {
Expand Down
4 changes: 1 addition & 3 deletions src/client/signatures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,7 @@ pub trait Verification {
debug!("Fetching certificate {}", x5u);
let response = Request::get(Url::parse(&x5u)?).send()?;
if !response.is_success() {
return Err(SignatureError::CertificateDownloadError {
response,
});
return Err(SignatureError::CertificateDownloadError { response });
}

Ok(response.body)
Expand Down

0 comments on commit 996e5e8

Please sign in to comment.