Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remote Settings: store full changesets instead of just records #6517

Merged
merged 1 commit into from
Jan 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 116 additions & 34 deletions components/remote_settings/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,15 @@ impl<C: ApiClient> RemoteSettingsClient<C> {
.get_last_modified_timestamp(&collection_url)?
.unwrap_or(0);
if packaged_data.timestamp > cached_timestamp {
inner
.storage
.set_records(&collection_url, &packaged_data.data)?;
// Remove previously cached data (packaged data does not have tombstones like diff responses do).
inner.storage.empty()?;
// Insert new packaged data.
inner.storage.insert_collection_content(
&collection_url,
&packaged_data.data,
packaged_data.timestamp,
CollectionMetadata::default(),
)?;
return Ok(Some(self.filter_records(packaged_data.data)));
}
}
Expand All @@ -168,9 +174,14 @@ impl<C: ApiClient> RemoteSettingsClient<C> {
(Some(cached_records), _) => Some(self.filter_records(cached_records)),
// Case 3: sync_if_empty=true
(None, true) => {
let records = inner.api_client.get_records(None)?;
inner.storage.set_records(&collection_url, &records)?;
Some(self.filter_records(records))
let changeset = inner.api_client.fetch_changeset(None)?;
inner.storage.insert_collection_content(
&collection_url,
&changeset.changes,
changeset.timestamp,
changeset.metadata,
)?;
Some(self.filter_records(changeset.changes))
}
// Case 4: Nothing to return
(None, false) => None,
Expand All @@ -181,8 +192,13 @@ impl<C: ApiClient> RemoteSettingsClient<C> {
let mut inner = self.inner.lock();
let collection_url = inner.api_client.collection_url();
let mtime = inner.storage.get_last_modified_timestamp(&collection_url)?;
let records = inner.api_client.get_records(mtime)?;
inner.storage.merge_records(&collection_url, &records)
let changeset = inner.api_client.fetch_changeset(mtime)?;
inner.storage.insert_collection_content(
&collection_url,
&changeset.changes,
changeset.timestamp,
changeset.metadata,
)
}

/// Downloads an attachment from [attachment_location]. NOTE: there are no guarantees about a
Expand Down Expand Up @@ -221,7 +237,7 @@ impl<C: ApiClient> RemoteSettingsClient<C> {
}

// Try to download the attachment because neither the storage nor the local data had it
let attachment = inner.api_client.get_attachment(&metadata.location)?;
let attachment = inner.api_client.fetch_attachment(&metadata.location)?;

// Verify downloaded data
if attachment.len() as u64 != metadata.size {
Expand Down Expand Up @@ -284,10 +300,10 @@ pub trait ApiClient {
fn collection_url(&self) -> String;

/// Fetch records from the server
fn get_records(&mut self, timestamp: Option<u64>) -> Result<Vec<RemoteSettingsRecord>>;
fn fetch_changeset(&mut self, timestamp: Option<u64>) -> Result<ChangesetResponse>;

/// Fetch an attachment from the server
fn get_attachment(&mut self, attachment_location: &str) -> Result<Vec<u8>>;
fn fetch_attachment(&mut self, attachment_location: &str) -> Result<Vec<u8>>;

/// Check if this client is pointing to the production server
fn is_prod_server(&self) -> Result<bool>;
Expand Down Expand Up @@ -372,7 +388,7 @@ impl ApiClient for ViaductApiClient {
self.endpoints.collection_url.to_string()
}

fn get_records(&mut self, timestamp: Option<u64>) -> Result<Vec<RemoteSettingsRecord>> {
fn fetch_changeset(&mut self, timestamp: Option<u64>) -> Result<ChangesetResponse> {
let mut url = self.endpoints.changeset_url.clone();
// 0 is used as an arbitrary value for `_expected` because the current implementation does
// not leverage push timestamps or polling from the monitor/changes endpoint. More
Expand All @@ -388,7 +404,7 @@ impl ApiClient for ViaductApiClient {
let resp = self.make_request(url)?;

if resp.is_success() {
Ok(resp.json::<ChangesetResponse>()?.changes)
Ok(resp.json::<ChangesetResponse>()?)
} else {
Err(Error::ResponseError(format!(
"status code: {}",
Expand All @@ -397,7 +413,7 @@ impl ApiClient for ViaductApiClient {
}
}

fn get_attachment(&mut self, attachment_location: &str) -> Result<Vec<u8>> {
fn fetch_attachment(&mut self, attachment_location: &str) -> Result<Vec<u8>> {
let attachments_base_url = match &self.remote_state.attachments_base_url {
Some(attachments_base_url) => attachments_base_url.to_owned(),
None => {
Expand Down Expand Up @@ -706,9 +722,24 @@ struct RecordsResponse {
data: Vec<RemoteSettingsRecord>,
}

#[derive(Deserialize, Serialize)]
struct ChangesetResponse {
#[derive(Clone, Deserialize, Serialize)]
pub struct ChangesetResponse {
changes: Vec<RemoteSettingsRecord>,
timestamp: u64,
metadata: CollectionMetadata,
}

#[derive(Clone, Debug, Default, Deserialize, Serialize, Eq, PartialEq)]
pub struct CollectionMetadata {
pub bucket: String,
pub signature: CollectionSignature,
}

#[derive(Clone, Debug, Default, Deserialize, Serialize, Eq, PartialEq)]
pub struct CollectionSignature {
pub signature: String,
/// X.509 certificate chain Url (x5u)
pub x5u: String,
}

/// A parsed Remote Settings record. Records can contain arbitrary fields, so clients
Expand All @@ -717,6 +748,7 @@ struct ChangesetResponse {
pub struct RemoteSettingsRecord {
pub id: String,
pub last_modified: u64,
/// Tombstone flag (see https://remote-settings.readthedocs.io/en/latest/client-specifications.html#local-state)
#[serde(default)]
pub deleted: bool,
pub attachment: Option<Attachment>,
Expand Down Expand Up @@ -1705,14 +1737,24 @@ mod test_new_client {
attachment: None,
fields: json!({"foo": "bar"}).as_object().unwrap().clone(),
}];
let changeset = ChangesetResponse {
changes: records.clone(),
timestamp: 42,
metadata: CollectionMetadata {
bucket: "main".into(),
signature: CollectionSignature {
signature: "b64sig".into(),
x5u: "http://x5u.com".into(),
},
},
};
api_client.expect_collection_url().returning(|| {
"http://rs.example.com/v1/buckets/main/collections/test-collection".into()
});
api_client.expect_get_records().returning({
let records = records.clone();
api_client.expect_fetch_changeset().returning({
move |timestamp| {
assert_eq!(timestamp, None);
Ok(records.clone())
Ok(changeset.clone())
}
});
api_client.expect_is_prod_server().returning(|| Ok(false));
Expand Down Expand Up @@ -1748,14 +1790,19 @@ mod jexl_tests {
.unwrap()
.clone(),
}];
let changeset = ChangesetResponse {
changes: records.clone(),
timestamp: 42,
metadata: CollectionMetadata::default(),
};
api_client.expect_collection_url().returning(|| {
"http://rs.example.com/v1/buckets/main/collections/test-collection".into()
});
api_client.expect_get_records().returning({
let records = records.clone();
api_client.expect_fetch_changeset().returning({
let changeset = changeset.clone();
move |timestamp| {
assert_eq!(timestamp, None);
Ok(records.clone())
Ok(changeset.clone())
}
});
api_client.expect_is_prod_server().returning(|| Ok(false));
Expand All @@ -1766,9 +1813,11 @@ mod jexl_tests {
};

let mut storage = Storage::new(":memory:".into()).expect("Error creating storage");
let _ = storage.set_records(
let _ = storage.insert_collection_content(
"http://rs.example.com/v1/buckets/main/collections/test-collection",
&records,
42,
CollectionMetadata::default(),
);

let rs_client = RemoteSettingsClient::new_from_parts(
Expand Down Expand Up @@ -1799,14 +1848,19 @@ mod jexl_tests {
.unwrap()
.clone(),
}];
let changeset = ChangesetResponse {
changes: records.clone(),
timestamp: 42,
metadata: CollectionMetadata::default(),
};
api_client.expect_collection_url().returning(|| {
"http://rs.example.com/v1/buckets/main/collections/test-collection".into()
});
api_client.expect_get_records().returning({
let records = records.clone();
api_client.expect_fetch_changeset().returning({
let changeset = changeset.clone();
move |timestamp| {
assert_eq!(timestamp, None);
Ok(records.clone())
Ok(changeset.clone())
}
});
api_client.expect_is_prod_server().returning(|| Ok(false));
Expand All @@ -1817,9 +1871,11 @@ mod jexl_tests {
};

let mut storage = Storage::new(":memory:".into()).expect("Error creating storage");
let _ = storage.set_records(
let _ = storage.insert_collection_content(
"http://rs.example.com/v1/buckets/main/collections/test-collection",
&records,
42,
CollectionMetadata::default(),
);

let rs_client = RemoteSettingsClient::new_from_parts(
Expand Down Expand Up @@ -1902,7 +1958,12 @@ mod cached_data_tests {

let mut api_client = MockApiClient::new();
let mut storage = Storage::new(":memory:".into())?;
storage.set_records(collection_url, &vec![old_record.clone()])?;
storage.insert_collection_content(
collection_url,
&vec![old_record.clone()],
42,
CollectionMetadata::default(),
)?;

api_client
.expect_collection_url()
Expand Down Expand Up @@ -1962,10 +2023,21 @@ mod cached_data_tests {
attachment: None,
fields: serde_json::Map::new(),
}];
let changeset = ChangesetResponse {
changes: expected_records.clone(),
timestamp: 42,
metadata: CollectionMetadata {
bucket: "main".into(),
signature: CollectionSignature {
signature: "b64sig".into(),
x5u: "http://x5u.com".into(),
},
},
};
api_client
.expect_get_records()
.expect_fetch_changeset()
.withf(|timestamp| timestamp.is_none())
.returning(move |_| Ok(expected_records.clone()));
.returning(move |_| Ok(changeset.clone()));

let rs_client =
RemoteSettingsClient::new_from_parts(collection_name.to_string(), storage, api_client);
Expand Down Expand Up @@ -2012,7 +2084,7 @@ mod cached_data_tests {
api_client.expect_is_prod_server().returning(|| Ok(true));

// Since sync_if_empty is false, get_records should not be called
// No need to set expectation for api_client.get_records
// No need to set expectation for api_client.fetch_changeset

let rs_client =
RemoteSettingsClient::new_from_parts(collection_name.to_string(), storage, api_client);
Expand Down Expand Up @@ -2046,7 +2118,12 @@ mod cached_data_tests {
attachment: None,
fields: serde_json::Map::new(),
}];
storage.set_records(&collection_url, &cached_records)?;
storage.insert_collection_content(
&collection_url,
&cached_records,
42,
CollectionMetadata::default(),
)?;

api_client
.expect_collection_url()
Expand Down Expand Up @@ -2082,7 +2159,12 @@ mod cached_data_tests {

// Set up empty cached records
let cached_records: Vec<RemoteSettingsRecord> = vec![];
storage.set_records(&collection_url, &cached_records)?;
storage.insert_collection_content(
&collection_url,
&cached_records,
42,
CollectionMetadata::default(),
)?;

api_client
.expect_collection_url()
Expand Down Expand Up @@ -2210,7 +2292,7 @@ mod test_packaged_metadata {
.returning(move || collection_url.clone());
api_client.expect_is_prod_server().returning(|| Ok(true));
api_client
.expect_get_attachment()
.expect_fetch_attachment()
.returning(move |_| Ok(mock_api_data.clone()));

let rs_client =
Expand Down
15 changes: 12 additions & 3 deletions components/remote_settings/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use sql_support::open_database::{self, ConnectionInitializer};
/// 1. Bump this version.
/// 2. Add a migration from the old version to the new version in
/// [`RemoteSettingsConnectionInitializer::upgrade_from`].
pub const VERSION: u32 = 1;
pub const VERSION: u32 = 2;

/// The current remote settings database schema.
pub const SQL: &str = r#"
Expand All @@ -27,7 +27,7 @@ CREATE TABLE IF NOT EXISTS attachments (
data BLOB NOT NULL);
CREATE TABLE IF NOT EXISTS collection_metadata (
collection_url TEXT PRIMARY KEY,
last_modified INTEGER);
last_modified INTEGER, bucket TEXT, signature TEXT, x5u TEXT);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry you needed to format it like this to make the migration test work.

"#;

/// Initializes an SQLite connection to the Remote Settings database, performing
Expand All @@ -37,7 +37,7 @@ pub struct RemoteSettingsConnectionInitializer;

impl ConnectionInitializer for RemoteSettingsConnectionInitializer {
const NAME: &'static str = "remote_settings";
const END_VERSION: u32 = 1;
const END_VERSION: u32 = 2;

fn prepare(&self, conn: &Connection, _db_empty: bool) -> open_database::Result<()> {
let initial_pragmas = "
Expand All @@ -63,6 +63,15 @@ impl ConnectionInitializer for RemoteSettingsConnectionInitializer {
tx.execute("ALTER TABLE collection_metadata DROP column fetched", ())?;
Ok(())
}
1 => {
tx.execute("ALTER TABLE collection_metadata ADD COLUMN bucket TEXT", ())?;
tx.execute(
"ALTER TABLE collection_metadata ADD COLUMN signature TEXT",
(),
)?;
tx.execute("ALTER TABLE collection_metadata ADD COLUMN x5u TEXT", ())?;
Ok(())
}
_ => Err(open_database::Error::IncompatibleVersion(version)),
}
}
Expand Down
Loading