Skip to content

Commit

Permalink
chore: batch queries for balances and supplies (#12)
Browse files Browse the repository at this point in the history
* fix: batch balance changes

* fix: batch supply changes
  • Loading branch information
rafaelcr authored Jul 5, 2024
1 parent 503a996 commit 98f56bd
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 73 deletions.
3 changes: 2 additions & 1 deletion src/db/cache/db_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::{
pg_insert_balance_changes, pg_insert_ledger_entries, pg_insert_runes,
pg_insert_supply_changes,
},
try_debug,
try_debug, try_info,
};

/// Holds rows that have yet to be inserted into the database.
Expand All @@ -37,6 +37,7 @@ impl DbCache {

/// Insert all data into the DB and clear cache.
pub async fn flush(&mut self, db_tx: &mut Transaction<'_>, ctx: &Context) {
try_info!(ctx, "Flushing cache");
if self.runes.len() > 0 {
try_debug!(ctx, "Flushing {} runes", self.runes.len());
let _ = pg_insert_runes(&self.runes, db_tx, ctx).await;
Expand Down
173 changes: 101 additions & 72 deletions src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,49 +155,71 @@ pub async fn pg_insert_supply_changes(
db_tx: &mut Transaction<'_>,
ctx: &Context,
) -> Result<bool, Error> {
let stmt = db_tx
.prepare(
"WITH previous AS (
SELECT * FROM supply_changes
WHERE rune_id = $1 AND block_height <= $2
ORDER BY block_height DESC LIMIT 1
)
INSERT INTO supply_changes
(rune_id, block_height, minted, total_mints, burned, total_burns, total_operations)
VALUES ($1, $2,
COALESCE((SELECT minted FROM previous), 0) + $3,
COALESCE((SELECT total_mints FROM previous), 0) + $4,
COALESCE((SELECT burned FROM previous), 0) + $5,
COALESCE((SELECT total_burns FROM previous), 0) + $6,
COALESCE((SELECT total_operations FROM previous), 0) + $7)
ON CONFLICT (rune_id, block_height) DO UPDATE SET
minted = EXCLUDED.minted,
total_mints = EXCLUDED.total_mints,
burned = EXCLUDED.burned,
total_burns = EXCLUDED.total_burns,
total_operations = EXCLUDED.total_operations",
)
.await
.expect("Unable to prepare statement");
for row in rows.iter() {
for chunk in rows.chunks(500) {
let mut arg_num = 1;
let mut arg_str = String::new();
let mut params: Vec<&(dyn ToSql + Sync)> = vec![];
for row in chunk.iter() {
arg_str.push_str(
format!(
"(${},${}::numeric,${}::numeric,${}::numeric,${}::numeric,${}::numeric,${}::numeric),",
arg_num,
arg_num + 1,
arg_num + 2,
arg_num + 3,
arg_num + 4,
arg_num + 5,
arg_num + 6
)
.as_str(),
);
arg_num += 7;
params.push(&row.rune_id);
params.push(&row.block_height);
params.push(&row.minted);
params.push(&row.total_mints);
params.push(&row.burned);
params.push(&row.total_burns);
params.push(&row.total_operations);
}
arg_str.pop();
match db_tx
.execute(
&stmt,
&[
&row.rune_id,
&row.block_height,
&row.minted,
&row.total_mints,
&row.burned,
&row.total_burns,
&row.total_operations,
],
.query(
&format!("
WITH changes (rune_id, block_height, minted, total_mints, burned, total_burns, total_operations) AS (VALUES {}),
previous AS (
SELECT DISTINCT ON (rune_id) *
FROM supply_changes
WHERE rune_id IN (SELECT rune_id FROM changes)
ORDER BY rune_id, block_height DESC
),
inserts AS (
SELECT c.rune_id,
c.block_height,
COALESCE(p.minted, 0) + c.minted AS minted,
COALESCE(p.total_mints, 0) + c.total_mints AS total_mints,
COALESCE(p.burned, 0) + c.burned AS burned,
COALESCE(p.total_burns, 0) + c.total_burns AS total_burns,
COALESCE(p.total_operations, 0) + c.total_operations AS total_operations
FROM changes AS c
LEFT JOIN previous AS p ON c.rune_id = p.rune_id
)
INSERT INTO supply_changes (rune_id, block_height, minted, total_mints, burned, total_burns, total_operations)
(SELECT * FROM inserts)
ON CONFLICT (rune_id, block_height) DO UPDATE SET
minted = EXCLUDED.minted,
total_mints = EXCLUDED.total_mints,
burned = EXCLUDED.burned,
total_burns = EXCLUDED.total_burns,
total_operations = EXCLUDED.total_operations
", arg_str),
&params,
)
.await
{
Ok(_) => {}
Err(e) => {
try_error!(ctx, "Error updating rune supply: {:?} {:?}", e, row);
try_error!(ctx, "Error inserting supply changes: {:?}", e);
panic!()
}
};
Expand All @@ -212,50 +234,57 @@ pub async fn pg_insert_balance_changes(
ctx: &Context,
) -> Result<bool, Error> {
let sign = if increase { "+" } else { "-" };
let stmt = db_tx
.prepare(
format!(
"WITH previous AS (
SELECT * FROM balance_changes
WHERE rune_id = $1 AND block_height <= $2 AND address = $3
ORDER BY block_height DESC LIMIT 1
for chunk in rows.chunks(500) {
let mut arg_num = 1;
let mut arg_str = String::new();
let mut params: Vec<&(dyn ToSql + Sync)> = vec![];
for row in chunk.iter() {
arg_str.push_str(
format!(
"(${},${}::numeric,${},${}::numeric,${}::bigint),",
arg_num,
arg_num + 1,
arg_num + 2,
arg_num + 3,
arg_num + 4
)
.as_str(),
);
arg_num += 5;
params.push(&row.rune_id);
params.push(&row.block_height);
params.push(&row.address);
params.push(&row.balance);
params.push(&row.total_operations);
}
arg_str.pop();
match db_tx
.query(
&format!("WITH changes (rune_id, block_height, address, balance, total_operations) AS (VALUES {}),
previous AS (
SELECT DISTINCT ON (rune_id, address) *
FROM balance_changes
WHERE (rune_id, address) IN (SELECT rune_id, address FROM changes)
ORDER BY rune_id, address, block_height DESC
),
inserts AS (
SELECT c.rune_id, c.block_height, c.address, COALESCE(p.balance, 0) {} c.balance AS balance,
COALESCE(p.total_operations, 0) + c.total_operations AS total_operations
FROM changes AS c
LEFT JOIN previous AS p ON c.rune_id = p.rune_id AND c.address = p.address
)
INSERT INTO balance_changes (rune_id, block_height, address, balance, total_operations)
VALUES ($1, $2, $3,
COALESCE((SELECT balance FROM previous), 0) {} $4,
COALESCE((SELECT total_operations FROM previous), 0) + $5)
(SELECT * FROM inserts)
ON CONFLICT (rune_id, block_height, address) DO UPDATE SET
balance = EXCLUDED.balance,
total_operations = EXCLUDED.total_operations",
sign
)
.as_str(),
)
.await
.expect("Unable to prepare statement");
for row in rows.iter() {
match db_tx
.execute(
&stmt,
&[
&row.rune_id,
&row.block_height,
&row.address,
&row.balance,
&row.total_operations,
],
total_operations = EXCLUDED.total_operations", arg_str, sign),
&params,
)
.await
{
Ok(_) => {}
Err(e) => {
try_error!(
ctx,
"Error updating balance (increase={}): {:?} {:?}",
increase,
e,
row
);
try_error!(ctx, "Error inserting balance changes: {:?}", e);
panic!()
}
};
Expand Down

0 comments on commit 98f56bd

Please sign in to comment.