Skip to content

Commit

Permalink
Merge pull request #177 from andrewwhitehead/fix/pg-15
Browse files Browse the repository at this point in the history
  • Loading branch information
TimoGlastra authored Sep 15, 2023
2 parents a82cb18 + c24d491 commit 7537214
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 84 deletions.
12 changes: 6 additions & 6 deletions askar-storage/src/any.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ impl<'a> ManageBackend<'a> for &'a str {
let opts = self.into_options()?;
debug!("Open store with options: {:?}", &opts);

match opts.schema.as_ref() {
match opts.scheme.as_ref() {
#[cfg(feature = "postgres")]
"postgres" => {
let opts = postgres::PostgresStoreOptions::new(opts)?;
Expand All @@ -281,7 +281,7 @@ impl<'a> ManageBackend<'a> for &'a str {
_ => Err(err_msg!(
Unsupported,
"Unsupported backend: {}",
&opts.schema
&opts.scheme
)),
}
})
Expand All @@ -298,7 +298,7 @@ impl<'a> ManageBackend<'a> for &'a str {
let opts = self.into_options()?;
debug!("Provision store with options: {:?}", &opts);

match opts.schema.as_ref() {
match opts.scheme.as_ref() {
#[cfg(feature = "postgres")]
"postgres" => {
let opts = postgres::PostgresStoreOptions::new(opts)?;
Expand All @@ -316,7 +316,7 @@ impl<'a> ManageBackend<'a> for &'a str {
_ => Err(err_msg!(
Unsupported,
"Unsupported backend: {}",
&opts.schema
&opts.scheme
)),
}
})
Expand All @@ -327,7 +327,7 @@ impl<'a> ManageBackend<'a> for &'a str {
let opts = self.into_options()?;
debug!("Remove store with options: {:?}", &opts);

match opts.schema.as_ref() {
match opts.scheme.as_ref() {
#[cfg(feature = "postgres")]
"postgres" => {
let opts = postgres::PostgresStoreOptions::new(opts)?;
Expand All @@ -343,7 +343,7 @@ impl<'a> ManageBackend<'a> for &'a str {
_ => Err(err_msg!(
Unsupported,
"Unsupported backend: {}",
&opts.schema
&opts.scheme
)),
}
})
Expand Down
177 changes: 120 additions & 57 deletions askar-storage/src/backend/postgres/provision.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ pub struct PostgresStoreOptions {
pub(crate) admin_uri: String,
pub(crate) host: String,
pub(crate) name: String,
pub(crate) username: String,
pub(crate) schema: Option<String>,
}

impl PostgresStoreOptions {
Expand Down Expand Up @@ -73,8 +75,13 @@ impl PostgresStoreOptions {
} else {
DEFAULT_MIN_CONNECTIONS
};
let schema = opts.query.remove("schema");
let admin_acct = opts.query.remove("admin_account");
let admin_pass = opts.query.remove("admin_password");
let username = match opts.user.as_ref() {
"" => "postgres".to_owned(),
a => a.to_owned(),
};
let uri = opts.clone().into_uri();
if admin_acct.is_some() || admin_pass.is_some() {
if let Some(admin_acct) = admin_acct {
Expand All @@ -90,12 +97,11 @@ impl PostgresStoreOptions {
return Err(err_msg!(Input, "Missing database name"));
}
let name = path[1..].to_string();
if name.find(|c| c == '"' || c == '\0').is_some() {
return Err(err_msg!(
Input,
"Invalid character in database name: '\"' and '\\0' are disallowed"
));
if let Some(schema) = schema.as_ref() {
_validate_ident(schema, "schema")?;
}
_validate_ident(&name, "database")?;
_validate_ident(&username, "username")?;
// admin user selects the default database
opts.path = Cow::Borrowed("/postgres");
Ok(Self {
Expand All @@ -107,6 +113,8 @@ impl PostgresStoreOptions {
admin_uri: opts.into_uri(),
host,
name,
username,
schema,
})
}

Expand All @@ -119,6 +127,10 @@ impl PostgresStoreOptions {
.log_statements(log::LevelFilter::Debug)
.log_slow_statements(log::LevelFilter::Debug, Default::default());
}
if let Some(s) = self.schema.as_ref() {
// NB: schema is a validated identifier
conn_opts = conn_opts.options([("search_path", s)]);
}
PgPoolOptions::default()
.acquire_timeout(self.connect_timeout)
.idle_timeout(self.idle_timeout)
Expand All @@ -136,15 +148,18 @@ impl PostgresStoreOptions {
Err(SqlxError::Database(db_err)) if db_err.code() == Some(Cow::Borrowed("3D000")) => {
// error 3D000 is INVALID CATALOG NAME in postgres,
// this indicates that the database does not exist
let mut admin_conn = PgConnection::connect(self.admin_uri.as_ref()).await?;
// any character except NUL is allowed in an identifier.
// double quotes must be escaped, but we just disallow those
let create_q = format!("CREATE DATABASE \"{}\"", self.name);
match sqlx::query(&create_q)
.persistent(false)
.execute(&mut admin_conn)
let mut admin_conn = PgConnection::connect(self.admin_uri.as_ref())
.await
{
.map_err(err_map!(
Backend,
"Error creating admin connection to database"
))?;
// self.name and self.username are validated identifiers
let create_q = format!(
"CREATE DATABASE \"{}\" OWNER \"{}\"",
self.name, self.username
);
match admin_conn.execute(create_q.as_str()).await {
Ok(_) => (),
Err(SqlxError::Database(db_err))
if db_err.code() == Some(Cow::Borrowed("23505"))
Expand Down Expand Up @@ -181,34 +196,58 @@ impl PostgresStoreOptions {
if recreate {
// remove expected tables
reset_db(&mut txn).await?;
} else if sqlx::query_scalar::<_, i64>(
"SELECT COUNT(*) FROM information_schema.tables
WHERE table_schema='public' AND table_name='config'",
)
.fetch_one(txn.as_mut())
.await?
== 1
{
// proceed to open, will fail if the version doesn't match
return open_db(
conn_pool,
Some(method),
pass_key,
profile,
self.host,
self.name,
)
.await;
} else {
// check for presence of config table
let count = if let Some(schema) = self.schema.as_ref() {
sqlx::query_scalar::<_, i64>(
"SELECT COUNT(*) FROM information_schema.tables
WHERE table_schema=?1 AND table_name='config'",
)
.persistent(false)
.bind(schema)
.fetch_one(txn.as_mut())
.await
.map_err(err_map!(Backend, "Error checking for existing store"))?
} else {
sqlx::query_scalar::<_, i64>(
"SELECT COUNT(*) FROM information_schema.tables
WHERE table_schema=ANY (CURRENT_SCHEMAS(false)) AND table_name='config'",
)
.persistent(false)
.fetch_one(txn.as_mut())
.await
.map_err(err_map!(Backend, "Error checking for existing store"))?
};
if count > 0 {
// proceed to open, will fail if the version doesn't match
return open_db(
conn_pool,
Some(method),
pass_key,
profile,
self.host,
self.name,
)
.await;
}
}
// else: no 'config' table, assume empty database

// no 'config' table, assume empty database

let (profile_key, enc_profile_key, store_key, store_key_ref) = unblock({
let pass_key = pass_key.into_owned();
move || init_keys(method, pass_key)
})
.await?;
let default_profile = profile.unwrap_or_else(random_profile_name);
let profile_id = init_db(txn, &default_profile, store_key_ref, enc_profile_key).await?;
let profile_id = init_db(
txn,
&default_profile,
store_key_ref,
enc_profile_key,
self.schema.as_ref().unwrap_or(&self.username),
)
.await?;
let mut key_cache = KeyCache::new(store_key);
key_cache.add_profile_mut(default_profile.clone(), profile_id, profile_key);

Expand All @@ -235,22 +274,23 @@ impl PostgresStoreOptions {
// this indicates that the database does not exist
Err(err_msg!(NotFound, "The requested database was not found"))
}
Err(e) => Err(e.into()),
Err(err) => Err(err_msg!(Backend, "Error connecting to database pool").with_cause(err)),
}?;
open_db(pool, method, pass_key, profile, self.host, self.name).await
}

/// Remove an existing Postgres store defined by these configuration options
pub async fn remove(self) -> Result<bool, Error> {
let mut admin_conn = PgConnection::connect(self.admin_uri.as_ref()).await?;
let mut admin_conn = PgConnection::connect(self.admin_uri.as_ref())
.await
.map_err(err_map!(
Backend,
"Error creating admin connection to database"
))?;
// any character except NUL is allowed in an identifier.
// double quotes must be escaped, but we just disallow those
let drop_q = format!("DROP DATABASE \"{}\"", self.name);
match sqlx::query(&drop_q)
.persistent(false)
.execute(&mut admin_conn)
.await
{
match admin_conn.execute(drop_q.as_str()).await {
Ok(_) => Ok(true),
Err(SqlxError::Database(db_err)) if db_err.code() == Some(Cow::Borrowed("3D000")) => {
// invalid catalog name is raised if the database does not exist
Expand Down Expand Up @@ -295,25 +335,28 @@ pub(crate) async fn init_db<'t>(
profile_name: &str,
store_key_ref: String,
enc_profile_key: Vec<u8>,
schema: &str,
) -> Result<ProfileId, Error> {
txn.execute(
"
CREATE TABLE config (
format!(r#"
CREATE SCHEMA IF NOT EXISTS "{schema}";
CREATE TABLE "{schema}".config (
name TEXT NOT NULL,
value TEXT,
PRIMARY KEY(name)
);
CREATE TABLE profiles (
CREATE TABLE "{schema}".profiles (
id BIGSERIAL,
name TEXT NOT NULL,
reference TEXT NULL,
profile_key BYTEA NULL,
PRIMARY KEY(id)
);
CREATE UNIQUE INDEX ix_profile_name ON profiles(name);
CREATE UNIQUE INDEX ix_profile_name ON "{schema}".profiles(name);
CREATE TABLE items (
CREATE TABLE "{schema}".items (
id BIGSERIAL,
profile_id BIGINT NOT NULL,
kind SMALLINT NOT NULL,
Expand All @@ -322,27 +365,28 @@ pub(crate) async fn init_db<'t>(
value BYTEA NOT NULL,
expiry TIMESTAMP NULL,
PRIMARY KEY(id),
FOREIGN KEY(profile_id) REFERENCES profiles(id)
FOREIGN KEY(profile_id) REFERENCES "{schema}".profiles(id)
ON DELETE CASCADE ON UPDATE CASCADE
);
CREATE UNIQUE INDEX ix_items_uniq ON items(profile_id, kind, category, name);
CREATE UNIQUE INDEX ix_items_uniq ON "{schema}".items(profile_id, kind, category, name);
CREATE TABLE items_tags (
CREATE TABLE "{schema}".items_tags (
id BIGSERIAL,
item_id BIGINT NOT NULL,
name BYTEA NOT NULL,
value BYTEA NOT NULL,
plaintext SMALLINT NOT NULL,
PRIMARY KEY(id),
FOREIGN KEY(item_id) REFERENCES items(id)
FOREIGN KEY(item_id) REFERENCES "{schema}".items(id)
ON DELETE CASCADE ON UPDATE CASCADE
);
CREATE INDEX ix_items_tags_item_id ON items_tags(item_id);
CREATE INDEX ix_items_tags_name_enc ON items_tags(name, SUBSTR(value, 1, 12)) include (item_id) WHERE plaintext=0;
CREATE INDEX ix_items_tags_name_plain ON items_tags(name, value) include (item_id) WHERE plaintext=1;
",
CREATE INDEX ix_items_tags_item_id ON "{schema}".items_tags(item_id);
CREATE INDEX ix_items_tags_name_enc ON "{schema}".items_tags(name, SUBSTR(value, 1, 12)) INCLUDE (item_id) WHERE plaintext=0;
CREATE INDEX ix_items_tags_name_plain ON "{schema}".items_tags(name, value) INCLUDE (item_id) WHERE plaintext=1;
"#).as_str(),
)
.await?;
.await
.map_err(err_map!(Backend, "Error creating database tables"))?;

sqlx::query(
"INSERT INTO config (name, value) VALUES
Expand All @@ -354,14 +398,16 @@ pub(crate) async fn init_db<'t>(
.bind(profile_name)
.bind(store_key_ref)
.execute(txn.as_mut())
.await?;
.await
.map_err(err_map!(Backend, "Error inserting configuration"))?;

let profile_id =
sqlx::query_scalar("INSERT INTO profiles (name, profile_key) VALUES ($1, $2) RETURNING id")
.bind(profile_name)
.bind(enc_profile_key)
.fetch_one(txn.as_mut())
.await?;
.await
.map_err(err_map!(Backend, "Error inserting default profile"))?;

txn.commit().await?;

Expand Down Expand Up @@ -399,7 +445,8 @@ pub(crate) async fn open_db(
WHERE name IN ('default_profile', 'key', 'version')"#,
)
.fetch_all(conn.as_mut())
.await?;
.await
.map_err(err_map!(Backend, "Error fetching store configuration"))?;
for row in config {
match row.try_get(0)? {
"default_profile" => {
Expand Down Expand Up @@ -453,6 +500,22 @@ pub(crate) async fn open_db(
))
}

/// Validate a postgres identifier.
/// Any character except NUL is allowed in an identifier. Double quotes must be escaped,
/// but we just disallow those instead.
fn _validate_ident(ident: &str, name: &str) -> Result<(), Error> {
if ident.is_empty() {
Err(err_msg!(Input, "{name} identifier is empty"))
} else if ident.find(|c| c == '"' || c == '\0').is_some() {
Err(err_msg!(
Input,
"Invalid character in {name} identifier: '\"' and '\\0' are disallowed"
))
} else {
Ok(())
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
10 changes: 8 additions & 2 deletions askar-storage/src/backend/postgres/test_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,14 @@ impl TestDB {
reset_db(&mut init_txn).await?;

// create tables and add default profile
let profile_id =
init_db(init_txn, &default_profile, store_key_ref, enc_profile_key).await?;
let profile_id = init_db(
init_txn,
&default_profile,
store_key_ref,
enc_profile_key,
&opts.username,
)
.await?;

let mut key_cache = KeyCache::new(store_key);
key_cache.add_profile_mut(default_profile.clone(), profile_id, profile_key);
Expand Down
Loading

0 comments on commit 7537214

Please sign in to comment.