Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remote Settings: store full changesets instead of just records #6517

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 116 additions & 34 deletions components/remote_settings/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,15 @@ impl<C: ApiClient> RemoteSettingsClient<C> {
.get_last_modified_timestamp(&collection_url)?
.unwrap_or(0);
if packaged_data.timestamp > cached_timestamp {
inner
.storage
.set_records(&collection_url, &packaged_data.data)?;
// Remove previously cached data (packaged data does not have tombstones like diff responses do).
inner.storage.empty()?;
// Insert new packaged data.
inner.storage.insert_collection_content(
&collection_url,
&packaged_data.data,
packaged_data.timestamp,
CollectionMetadata::default(),
)?;
return Ok(Some(self.filter_records(packaged_data.data)));
}
}
Expand All @@ -168,9 +174,14 @@ impl<C: ApiClient> RemoteSettingsClient<C> {
(Some(cached_records), _) => Some(self.filter_records(cached_records)),
// Case 3: sync_if_empty=true
(None, true) => {
let records = inner.api_client.get_records(None)?;
inner.storage.set_records(&collection_url, &records)?;
Some(self.filter_records(records))
let changeset = inner.api_client.fetch_changeset(None)?;
inner.storage.insert_collection_content(
&collection_url,
&changeset.changes,
changeset.timestamp,
changeset.metadata,
)?;
Some(self.filter_records(changeset.changes))
}
// Case 4: Nothing to return
(None, false) => None,
Expand All @@ -181,8 +192,13 @@ impl<C: ApiClient> RemoteSettingsClient<C> {
let mut inner = self.inner.lock();
let collection_url = inner.api_client.collection_url();
let mtime = inner.storage.get_last_modified_timestamp(&collection_url)?;
let records = inner.api_client.get_records(mtime)?;
inner.storage.merge_records(&collection_url, &records)
let changeset = inner.api_client.fetch_changeset(mtime)?;
inner.storage.insert_collection_content(
&collection_url,
&changeset.changes,
changeset.timestamp,
changeset.metadata,
)
}

/// Downloads an attachment from [attachment_location]. NOTE: there are no guarantees about a
Expand Down Expand Up @@ -221,7 +237,7 @@ impl<C: ApiClient> RemoteSettingsClient<C> {
}

// Try to download the attachment because neither the storage nor the local data had it
let attachment = inner.api_client.get_attachment(&metadata.location)?;
let attachment = inner.api_client.fetch_attachment(&metadata.location)?;

// Verify downloaded data
if attachment.len() as u64 != metadata.size {
Expand Down Expand Up @@ -284,10 +300,10 @@ pub trait ApiClient {
fn collection_url(&self) -> String;

/// Fetch records from the server
fn get_records(&mut self, timestamp: Option<u64>) -> Result<Vec<RemoteSettingsRecord>>;
fn fetch_changeset(&mut self, timestamp: Option<u64>) -> Result<ChangesetResponse>;

/// Fetch an attachment from the server
fn get_attachment(&mut self, attachment_location: &str) -> Result<Vec<u8>>;
fn fetch_attachment(&mut self, attachment_location: &str) -> Result<Vec<u8>>;

/// Check if this client is pointing to the production server
fn is_prod_server(&self) -> Result<bool>;
Expand Down Expand Up @@ -372,7 +388,7 @@ impl ApiClient for ViaductApiClient {
self.endpoints.collection_url.to_string()
}

fn get_records(&mut self, timestamp: Option<u64>) -> Result<Vec<RemoteSettingsRecord>> {
fn fetch_changeset(&mut self, timestamp: Option<u64>) -> Result<ChangesetResponse> {
let mut url = self.endpoints.changeset_url.clone();
// 0 is used as an arbitrary value for `_expected` because the current implementation does
// not leverage push timestamps or polling from the monitor/changes endpoint. More
Expand All @@ -388,7 +404,7 @@ impl ApiClient for ViaductApiClient {
let resp = self.make_request(url)?;

if resp.is_success() {
Ok(resp.json::<ChangesetResponse>()?.changes)
Ok(resp.json::<ChangesetResponse>()?)
} else {
Err(Error::ResponseError(format!(
"status code: {}",
Expand All @@ -397,7 +413,7 @@ impl ApiClient for ViaductApiClient {
}
}

fn get_attachment(&mut self, attachment_location: &str) -> Result<Vec<u8>> {
fn fetch_attachment(&mut self, attachment_location: &str) -> Result<Vec<u8>> {
let attachments_base_url = match &self.remote_state.attachments_base_url {
Some(attachments_base_url) => attachments_base_url.to_owned(),
None => {
Expand Down Expand Up @@ -706,9 +722,24 @@ struct RecordsResponse {
data: Vec<RemoteSettingsRecord>,
}

#[derive(Deserialize, Serialize)]
struct ChangesetResponse {
#[derive(Clone, Deserialize, Serialize)]
pub struct ChangesetResponse {
changes: Vec<RemoteSettingsRecord>,
timestamp: u64,
metadata: CollectionMetadata,
}

#[derive(Clone, Debug, Default, Deserialize, Serialize, Eq, PartialEq)]
pub struct CollectionMetadata {
pub bucket: String,
pub signature: CollectionSignature,
}

#[derive(Clone, Debug, Default, Deserialize, Serialize, Eq, PartialEq)]
pub struct CollectionSignature {
pub signature: String,
/// X.509 certificate chain Url (x5u)
pub x5u: String,
}

/// A parsed Remote Settings record. Records can contain arbitrary fields, so clients
Expand All @@ -717,6 +748,7 @@ struct ChangesetResponse {
pub struct RemoteSettingsRecord {
pub id: String,
pub last_modified: u64,
/// Tombstone flag (see https://remote-settings.readthedocs.io/en/latest/client-specifications.html#local-state)
#[serde(default)]
pub deleted: bool,
pub attachment: Option<Attachment>,
Expand Down Expand Up @@ -1705,14 +1737,24 @@ mod test_new_client {
attachment: None,
fields: json!({"foo": "bar"}).as_object().unwrap().clone(),
}];
let changeset = ChangesetResponse {
changes: records.clone(),
timestamp: 42,
metadata: CollectionMetadata {
bucket: "main".into(),
signature: CollectionSignature {
signature: "b64sig".into(),
x5u: "http://x5u.com".into(),
},
},
};
api_client.expect_collection_url().returning(|| {
"http://rs.example.com/v1/buckets/main/collections/test-collection".into()
});
api_client.expect_get_records().returning({
let records = records.clone();
api_client.expect_fetch_changeset().returning({
move |timestamp| {
assert_eq!(timestamp, None);
Ok(records.clone())
Ok(changeset.clone())
}
});
api_client.expect_is_prod_server().returning(|| Ok(false));
Expand Down Expand Up @@ -1748,14 +1790,19 @@ mod jexl_tests {
.unwrap()
.clone(),
}];
let changeset = ChangesetResponse {
changes: records.clone(),
timestamp: 42,
metadata: CollectionMetadata::default(),
};
api_client.expect_collection_url().returning(|| {
"http://rs.example.com/v1/buckets/main/collections/test-collection".into()
});
api_client.expect_get_records().returning({
let records = records.clone();
api_client.expect_fetch_changeset().returning({
let changeset = changeset.clone();
move |timestamp| {
assert_eq!(timestamp, None);
Ok(records.clone())
Ok(changeset.clone())
}
});
api_client.expect_is_prod_server().returning(|| Ok(false));
Expand All @@ -1766,9 +1813,11 @@ mod jexl_tests {
};

let mut storage = Storage::new(":memory:".into()).expect("Error creating storage");
let _ = storage.set_records(
let _ = storage.insert_collection_content(
"http://rs.example.com/v1/buckets/main/collections/test-collection",
&records,
42,
CollectionMetadata::default(),
);

let rs_client = RemoteSettingsClient::new_from_parts(
Expand Down Expand Up @@ -1799,14 +1848,19 @@ mod jexl_tests {
.unwrap()
.clone(),
}];
let changeset = ChangesetResponse {
changes: records.clone(),
timestamp: 42,
metadata: CollectionMetadata::default(),
};
api_client.expect_collection_url().returning(|| {
"http://rs.example.com/v1/buckets/main/collections/test-collection".into()
});
api_client.expect_get_records().returning({
let records = records.clone();
api_client.expect_fetch_changeset().returning({
let changeset = changeset.clone();
move |timestamp| {
assert_eq!(timestamp, None);
Ok(records.clone())
Ok(changeset.clone())
}
});
api_client.expect_is_prod_server().returning(|| Ok(false));
Expand All @@ -1817,9 +1871,11 @@ mod jexl_tests {
};

let mut storage = Storage::new(":memory:".into()).expect("Error creating storage");
let _ = storage.set_records(
let _ = storage.insert_collection_content(
"http://rs.example.com/v1/buckets/main/collections/test-collection",
&records,
42,
CollectionMetadata::default(),
);

let rs_client = RemoteSettingsClient::new_from_parts(
Expand Down Expand Up @@ -1902,7 +1958,12 @@ mod cached_data_tests {

let mut api_client = MockApiClient::new();
let mut storage = Storage::new(":memory:".into())?;
storage.set_records(collection_url, &vec![old_record.clone()])?;
storage.insert_collection_content(
collection_url,
&vec![old_record.clone()],
42,
CollectionMetadata::default(),
)?;

api_client
.expect_collection_url()
Expand Down Expand Up @@ -1962,10 +2023,21 @@ mod cached_data_tests {
attachment: None,
fields: serde_json::Map::new(),
}];
let changeset = ChangesetResponse {
changes: expected_records.clone(),
timestamp: 42,
metadata: CollectionMetadata {
bucket: "main".into(),
signature: CollectionSignature {
signature: "b64sig".into(),
x5u: "http://x5u.com".into(),
},
},
};
api_client
.expect_get_records()
.expect_fetch_changeset()
.withf(|timestamp| timestamp.is_none())
.returning(move |_| Ok(expected_records.clone()));
.returning(move |_| Ok(changeset.clone()));

let rs_client =
RemoteSettingsClient::new_from_parts(collection_name.to_string(), storage, api_client);
Expand Down Expand Up @@ -2012,7 +2084,7 @@ mod cached_data_tests {
api_client.expect_is_prod_server().returning(|| Ok(true));

// Since sync_if_empty is false, get_records should not be called
// No need to set expectation for api_client.get_records
// No need to set expectation for api_client.fetch_changeset

let rs_client =
RemoteSettingsClient::new_from_parts(collection_name.to_string(), storage, api_client);
Expand Down Expand Up @@ -2046,7 +2118,12 @@ mod cached_data_tests {
attachment: None,
fields: serde_json::Map::new(),
}];
storage.set_records(&collection_url, &cached_records)?;
storage.insert_collection_content(
&collection_url,
&cached_records,
42,
CollectionMetadata::default(),
)?;

api_client
.expect_collection_url()
Expand Down Expand Up @@ -2082,7 +2159,12 @@ mod cached_data_tests {

// Set up empty cached records
let cached_records: Vec<RemoteSettingsRecord> = vec![];
storage.set_records(&collection_url, &cached_records)?;
storage.insert_collection_content(
&collection_url,
&cached_records,
42,
CollectionMetadata::default(),
)?;

api_client
.expect_collection_url()
Expand Down Expand Up @@ -2210,7 +2292,7 @@ mod test_packaged_metadata {
.returning(move || collection_url.clone());
api_client.expect_is_prod_server().returning(|| Ok(true));
api_client
.expect_get_attachment()
.expect_fetch_attachment()
.returning(move |_| Ok(mock_api_data.clone()));

let rs_client =
Expand Down
Loading