Skip to content

Commit

Permalink
batch upsert to databases_store (#8283)
Browse files Browse the repository at this point in the history
  • Loading branch information
spolu authored Oct 28, 2024
1 parent b11327b commit 8d4b4ff
Showing 1 changed file with 15 additions and 21 deletions.
36 changes: 15 additions & 21 deletions core/src/databases_store/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,7 @@ impl DatabasesStore for PostgresDatabasesStore {
truncate: bool,
) -> Result<()> {
let pool = self.pool.clone();
let mut c = pool.get().await?;
// Start transaction.
let c = c.transaction().await?;
let c = pool.get().await?;

// Truncate table if required.
if truncate {
Expand All @@ -165,31 +163,27 @@ impl DatabasesStore for PostgresDatabasesStore {
c.execute(&stmt, &[&table_id]).await?;
}

// Prepare insertion/updation statement.
let stmt = c
.prepare(
"INSERT INTO tables_rows
(id, table_id, row_id, created, content)
VALUES (DEFAULT, $1, $2, $3, $4)
ON CONFLICT (table_id, row_id) DO UPDATE
SET content = EXCLUDED.content",
(table_id, row_id, created, content)
SELECT * FROM UNNEST($1::text[], $2::text[], $3::bigint[], $4::text[])
ON CONFLICT (table_id, row_id) DO UPDATE
SET content = EXCLUDED.content",
)
.await?;

for row in rows {
c.execute(
&stmt,
&[
&table_id,
&row.row_id(),
&(utils::now() as i64),
&row.content().to_string(),
],
)
.await?;
}
for chunk in rows.chunks(1024) {
let now = utils::now() as i64;

c.commit().await?;
let table_ids: Vec<&str> = vec![table_id; chunk.len()];
let row_ids: Vec<&str> = chunk.iter().map(|r| r.row_id()).collect();
let createds: Vec<i64> = vec![now; chunk.len()];
let contents: Vec<String> = chunk.iter().map(|r| r.content().to_string()).collect();

c.execute(&stmt, &[&table_ids, &row_ids, &createds, &contents])
.await?;
}

Ok(())
}
Expand Down

0 comments on commit 8d4b4ff

Please sign in to comment.