Skip to content

Commit

Permalink
Merge pull request #6284 from mozilla/mergify/bp/release-v128/pr-6267
Browse files Browse the repository at this point in the history
Bug 1900837 - Speed up re-ingestion  (backport #6267)
  • Loading branch information
DonalMe authored Jun 27, 2024
2 parents fe09137 + d3f0c84 commit 0e4777f
Show file tree
Hide file tree
Showing 6 changed files with 201 additions and 21 deletions.
2 changes: 1 addition & 1 deletion components/suggest/benches/benchmark_all.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ pub fn ingest_single_provider(c: &mut Criterion) {
// 100s to run which feels like too long. `ingest-amp-mobile` would also take around 50s.
group.sample_size(10);
for (name, benchmark) in ingest::all_benchmarks() {
group.bench_function(format!("ingest-{name}"), |b| {
group.bench_function(name.to_string(), |b| {
b.iter_batched(
|| benchmark.generate_input(),
|input| benchmark.benchmarked_code(input),
Expand Down
87 changes: 74 additions & 13 deletions components/suggest/src/benchmarks/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ pub struct IngestBenchmark {
temp_dir: tempfile::TempDir,
client: RemoteSettingsBenchmarkClient,
record_type: SuggestRecordType,
reingest: bool,
}

impl IngestBenchmark {
pub fn new(record_type: SuggestRecordType) -> Self {
pub fn new(record_type: SuggestRecordType, reingest: bool) -> Self {
let temp_dir = tempfile::tempdir().unwrap();
let store = SuggestStoreInner::new(
temp_dir.path().join("warmup.sqlite"),
Expand All @@ -33,6 +34,7 @@ impl IngestBenchmark {
client: RemoteSettingsBenchmarkClient::from(store.into_settings_client()),
temp_dir,
record_type,
reingest,
}
}
}
Expand All @@ -52,6 +54,9 @@ impl BenchmarkWithInput for IngestBenchmark {
));
let store = SuggestStoreInner::new(data_path, self.client.clone());
store.ensure_db_initialized();
if self.reingest {
store.force_reingest(self.record_type);
}
InputType(store)
}

Expand All @@ -64,23 +69,77 @@ impl BenchmarkWithInput for IngestBenchmark {
/// Get IngestBenchmark instances for all record types
pub fn all_benchmarks() -> Vec<(&'static str, IngestBenchmark)> {
vec![
("icon", IngestBenchmark::new(SuggestRecordType::Icon)),
(
"amp-wikipedia",
IngestBenchmark::new(SuggestRecordType::AmpWikipedia),
"ingest-icon",
IngestBenchmark::new(SuggestRecordType::Icon, false),
),
(
"ingest-amp-wikipedia",
IngestBenchmark::new(SuggestRecordType::AmpWikipedia, false),
),
(
"ingest-amo",
IngestBenchmark::new(SuggestRecordType::Amo, false),
),
(
"ingest-pocket",
IngestBenchmark::new(SuggestRecordType::Pocket, false),
),
(
"ingest-yelp",
IngestBenchmark::new(SuggestRecordType::Yelp, false),
),
(
"ingest-mdn",
IngestBenchmark::new(SuggestRecordType::Mdn, false),
),
(
"ingest-weather",
IngestBenchmark::new(SuggestRecordType::Weather, false),
),
(
"ingest-global-config",
IngestBenchmark::new(SuggestRecordType::GlobalConfig, false),
),
(
"ingest-amp-mobile",
IngestBenchmark::new(SuggestRecordType::AmpMobile, false),
),
(
"ingest-again-icon",
IngestBenchmark::new(SuggestRecordType::Icon, true),
),
(
"ingest-again-amp-wikipedia",
IngestBenchmark::new(SuggestRecordType::AmpWikipedia, true),
),
(
"ingest-again-amo",
IngestBenchmark::new(SuggestRecordType::Amo, true),
),
(
"ingest-again-pocket",
IngestBenchmark::new(SuggestRecordType::Pocket, true),
),
(
"ingest-again-yelp",
IngestBenchmark::new(SuggestRecordType::Yelp, true),
),
(
"ingest-again-mdn",
IngestBenchmark::new(SuggestRecordType::Mdn, true),
),
(
"ingest-again-weather",
IngestBenchmark::new(SuggestRecordType::Weather, true),
),
("amo", IngestBenchmark::new(SuggestRecordType::Amo)),
("pocket", IngestBenchmark::new(SuggestRecordType::Pocket)),
("yelp", IngestBenchmark::new(SuggestRecordType::Yelp)),
("mdn", IngestBenchmark::new(SuggestRecordType::Mdn)),
("weather", IngestBenchmark::new(SuggestRecordType::Weather)),
(
"global-config",
IngestBenchmark::new(SuggestRecordType::GlobalConfig),
"ingest-again-global-config",
IngestBenchmark::new(SuggestRecordType::GlobalConfig, true),
),
(
"amp-mobile",
IngestBenchmark::new(SuggestRecordType::AmpMobile),
"ingest-again-amp-mobile",
IngestBenchmark::new(SuggestRecordType::AmpMobile, true),
),
]
}
Expand All @@ -95,6 +154,7 @@ pub fn print_debug_ingestion_sizes() {
.ingest(SuggestIngestionConstraints::default())
.unwrap();
let table_row_counts = store.table_row_counts();
let db_size = store.db_size();
let client = store.into_settings_client();
let total_attachment_size: usize = client
.get_records_responses
Expand All @@ -112,6 +172,7 @@ pub fn print_debug_ingestion_sizes() {
"Total attachment size: {}kb",
(total_attachment_size + 500) / 1000
);
println!("Total database size: {}kb", (db_size + 500) / 1000);
println!();
println!("Database table row counts");
println!("-------------------------");
Expand Down
24 changes: 24 additions & 0 deletions components/suggest/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -945,6 +945,18 @@ impl<'a> SuggestDao<'a> {
/// Deletes all suggestions associated with a Remote Settings record from
/// the database.
pub fn drop_suggestions(&mut self, record_id: &SuggestRecordId) -> Result<()> {
self.conn.execute_cached(
"DELETE FROM keywords WHERE suggestion_id IN (SELECT id from suggestions WHERE record_id = :record_id)",
named_params! { ":record_id": record_id.as_str() },
)?;
self.conn.execute_cached(
"DELETE FROM full_keywords WHERE suggestion_id IN (SELECT id from suggestions WHERE record_id = :record_id)",
named_params! { ":record_id": record_id.as_str() },
)?;
self.conn.execute_cached(
"DELETE FROM prefix_keywords WHERE suggestion_id IN (SELECT id from suggestions WHERE record_id = :record_id)",
named_params! { ":record_id": record_id.as_str() },
)?;
self.conn.execute_cached(
"DELETE FROM suggestions WHERE record_id = :record_id",
named_params! { ":record_id": record_id.as_str() },
Expand Down Expand Up @@ -1000,6 +1012,18 @@ impl<'a> SuggestDao<'a> {
Ok(())
}

/// Deletes the row stored under `key` from the `meta` table.
///
/// Only compiled when the `benchmark_api` feature is enabled; at the moment the benchmarks
/// are the only caller.
#[cfg(feature = "benchmark_api")]
pub fn clear_meta(&mut self, key: &str) -> Result<()> {
    let sql = "DELETE FROM meta WHERE key = :key";
    self.conn
        .execute_cached(sql, named_params! { ":key": key })?;
    Ok(())
}

/// Updates the last ingest timestamp if the given last modified time is
/// newer than the existing one recorded.
pub fn put_last_ingest_if_newer(
Expand Down
49 changes: 43 additions & 6 deletions components/suggest/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use sql_support::open_database::{self, ConnectionInitializer};
/// [`SuggestConnectionInitializer::upgrade_from`].
/// a. If suggestions should be re-ingested after the migration, call `clear_database()` inside
/// the migration.
pub const VERSION: u32 = 19;
pub const VERSION: u32 = 20;

/// The current Suggest database schema.
pub const SQL: &str = "
Expand All @@ -26,17 +26,16 @@ CREATE TABLE meta(
CREATE TABLE keywords(
keyword TEXT NOT NULL,
suggestion_id INTEGER NOT NULL REFERENCES suggestions(id) ON DELETE CASCADE,
full_keyword_id INTEGER NULL REFERENCES full_keywords(id) ON DELETE SET NULL,
suggestion_id INTEGER NOT NULL,
full_keyword_id INTEGER NULL,
rank INTEGER NOT NULL,
PRIMARY KEY (keyword, suggestion_id)
) WITHOUT ROWID;
-- full keywords are what we display to the user when a (partial) keyword matches
-- The FK to suggestion_id makes it so full keywords get deleted when the parent suggestion is deleted.
CREATE TABLE full_keywords(
id INTEGER PRIMARY KEY,
suggestion_id INTEGER NOT NULL REFERENCES suggestions(id) ON DELETE CASCADE,
suggestion_id INTEGER NOT NULL,
full_keyword TEXT NOT NULL
);
Expand All @@ -45,7 +44,7 @@ CREATE TABLE prefix_keywords(
keyword_suffix TEXT NOT NULL DEFAULT '',
confidence INTEGER NOT NULL DEFAULT 0,
rank INTEGER NOT NULL,
suggestion_id INTEGER NOT NULL REFERENCES suggestions(id) ON DELETE CASCADE,
suggestion_id INTEGER NOT NULL,
PRIMARY KEY (keyword_prefix, keyword_suffix, suggestion_id)
) WITHOUT ROWID;
Expand Down Expand Up @@ -193,6 +192,41 @@ CREATE TABLE IF NOT EXISTS dismissed_suggestions (
)?;
Ok(())
}
19 => {
// Clear the database since we're going to be dropping the keywords table and
// re-creating it
clear_database(tx)?;
tx.execute_batch(
"
-- Recreate the various keywords table to drop the foreign keys.
DROP TABLE keywords;
DROP TABLE full_keywords;
DROP TABLE prefix_keywords;
CREATE TABLE keywords(
keyword TEXT NOT NULL,
suggestion_id INTEGER NOT NULL,
full_keyword_id INTEGER NULL,
rank INTEGER NOT NULL,
PRIMARY KEY (keyword, suggestion_id)
) WITHOUT ROWID;
CREATE TABLE full_keywords(
id INTEGER PRIMARY KEY,
suggestion_id INTEGER NOT NULL,
full_keyword TEXT NOT NULL
);
CREATE TABLE prefix_keywords(
keyword_prefix TEXT NOT NULL,
keyword_suffix TEXT NOT NULL DEFAULT '',
confidence INTEGER NOT NULL DEFAULT 0,
rank INTEGER NOT NULL,
suggestion_id INTEGER NOT NULL,
PRIMARY KEY (keyword_prefix, keyword_suffix, suggestion_id)
) WITHOUT ROWID;
CREATE UNIQUE INDEX keywords_suggestion_id_rank ON keywords(suggestion_id, rank);
",
)?;
Ok(())
}
_ => Err(open_database::Error::IncompatibleVersion(version)),
}
}
Expand All @@ -203,6 +237,9 @@ pub fn clear_database(db: &Connection) -> rusqlite::Result<()> {
db.execute_batch(
"
DELETE FROM meta;
DELETE FROM keywords;
DELETE FROM full_keywords;
DELETE FROM prefix_keywords;
DELETE FROM suggestions;
DELETE FROM icons;
DELETE FROM yelp_subjects;
Expand Down
20 changes: 20 additions & 0 deletions components/suggest/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -577,10 +577,21 @@ where
self.dbs().unwrap();
}

/// Puts the store into a state where the next ingestion of `ingest_record_type` has to
/// process every record again.
///
/// Panics if the database connections cannot be obtained or the write fails.
pub fn force_reingest(&self, ingest_record_type: SuggestRecordType) {
    // Step 1: run a complete ingestion so the tables are populated.
    self.benchmark_ingest_records_by_type(ingest_record_type);

    // Step 2: forget the last-ingest marker for this record type, so the following
    // ingestion can't skip anything.
    self.dbs()
        .unwrap()
        .writer
        .write(|dao| {
            let meta_key = ingest_record_type.last_ingest_meta_key();
            dao.clear_meta(meta_key.as_str())
        })
        .unwrap();
}

pub fn benchmark_ingest_records_by_type(&self, ingest_record_type: SuggestRecordType) {
let writer = &self.dbs().unwrap().writer;
writer
.write(|dao| {
dao.clear_meta(ingest_record_type.last_ingest_meta_key().as_str())?;
self.ingest_records_by_type(
ingest_record_type,
dao,
Expand Down Expand Up @@ -615,6 +626,15 @@ where
table_names_with_counts.sort_by(|a, b| (b.1.cmp(&a.1)));
table_names_with_counts
}

/// Returns the size of the database, computed as SQLite's page size multiplied by its page
/// count.
///
/// Panics if the database connections cannot be obtained or the query fails.
pub fn db_size(&self) -> usize {
    use sql_support::ConnExt;

    const SIZE_QUERY: &str =
        "SELECT page_size * page_count FROM pragma_page_count(), pragma_page_size()";

    // The measurement is taken through the reader connection.
    let read_db = &self.dbs().unwrap().reader;
    let locked = read_db.conn.lock();
    locked.query_one(SIZE_QUERY).unwrap()
}
}

/// Holds a store's open connections to the Suggest database.
Expand Down
40 changes: 39 additions & 1 deletion components/support/sql/src/open_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,7 @@ pub mod test_utils {
pub fn assert_schema_matches_new_database(&self) {
let db = self.open();
let new_db = open_memory_database(&self.connection_initializer).unwrap();

let table_names = get_table_names(&db);
let new_db_table_names = get_table_names(&new_db);
let extra_tables = Vec::from_iter(table_names.difference(&new_db_table_names));
Expand All @@ -433,14 +434,31 @@ pub mod test_utils {
if !new_db_extra_tables.is_empty() {
panic!("Extra tables only present in new database: {new_db_extra_tables:?}");
}

for table_name in table_names {
assert_eq!(
get_table_sql(&db, &table_name),
get_table_sql(&new_db, &table_name),
"sql differs for table: {table_name}",
);
}

let index_names = get_index_names(&db);
let new_db_index_names = get_index_names(&new_db);
let extra_index = Vec::from_iter(index_names.difference(&new_db_index_names));
if !extra_index.is_empty() {
panic!("Extra indexes not present in new database: {extra_index:?}");
}
let new_db_extra_index = Vec::from_iter(new_db_index_names.difference(&index_names));
if !new_db_extra_index.is_empty() {
panic!("Extra indexes only present in new database: {new_db_extra_index:?}");
}
for index_name in index_names {
assert_eq!(
get_index_sql(&db, &index_name),
get_index_sql(&new_db, &index_name),
"sql differs for index: {index_name}",
);
}
}

pub fn open(&self) -> Connection {
Expand All @@ -467,6 +485,26 @@ pub mod test_utils {
)
.unwrap()
}

/// Collects the name of every `sqlite_master` entry whose type is `index`.
///
/// Panics if the query fails.
fn get_index_names(conn: &Connection) -> HashSet<String> {
    let names: Vec<String> = conn
        .query_rows_and_then(
            "SELECT name FROM sqlite_master WHERE type='index'",
            (),
            |row| row.get(0),
        )
        .unwrap();
    names.into_iter().collect()
}

/// Fetches the SQL text recorded in `sqlite_master` for the index called `index_name`.
///
/// SQLite stores a NULL `sql` value for the internal indexes it creates automatically to
/// back `UNIQUE` and `PRIMARY KEY` constraints. Those are reported as an empty string, so
/// comparing two schemas that contain such indexes doesn't panic here.
///
/// Panics if the lookup itself fails.
fn get_index_sql(conn: &Connection, index_name: &str) -> String {
    conn.query_row_and_then(
        "SELECT sql FROM sqlite_master WHERE name = ? AND type='index'",
        (&index_name,),
        // Read as `Option` because the `sql` column is nullable for automatic indexes.
        |row| row.get::<_, Option<String>>(0),
    )
    .unwrap()
    .unwrap_or_default()
}
}

#[cfg(test)]
Expand Down

0 comments on commit 0e4777f

Please sign in to comment.