Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Postgres provisioning fixes #177

Merged
merged 3 commits into from
Sep 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions askar-storage/src/any.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ impl<'a> ManageBackend<'a> for &'a str {
let opts = self.into_options()?;
debug!("Open store with options: {:?}", &opts);

match opts.schema.as_ref() {
match opts.scheme.as_ref() {
#[cfg(feature = "postgres")]
"postgres" => {
let opts = postgres::PostgresStoreOptions::new(opts)?;
Expand All @@ -281,7 +281,7 @@ impl<'a> ManageBackend<'a> for &'a str {
_ => Err(err_msg!(
Unsupported,
"Unsupported backend: {}",
&opts.schema
&opts.scheme
)),
}
})
Expand All @@ -298,7 +298,7 @@ impl<'a> ManageBackend<'a> for &'a str {
let opts = self.into_options()?;
debug!("Provision store with options: {:?}", &opts);

match opts.schema.as_ref() {
match opts.scheme.as_ref() {
#[cfg(feature = "postgres")]
"postgres" => {
let opts = postgres::PostgresStoreOptions::new(opts)?;
Expand All @@ -316,7 +316,7 @@ impl<'a> ManageBackend<'a> for &'a str {
_ => Err(err_msg!(
Unsupported,
"Unsupported backend: {}",
&opts.schema
&opts.scheme
)),
}
})
Expand All @@ -327,7 +327,7 @@ impl<'a> ManageBackend<'a> for &'a str {
let opts = self.into_options()?;
debug!("Remove store with options: {:?}", &opts);

match opts.schema.as_ref() {
match opts.scheme.as_ref() {
#[cfg(feature = "postgres")]
"postgres" => {
let opts = postgres::PostgresStoreOptions::new(opts)?;
Expand All @@ -343,7 +343,7 @@ impl<'a> ManageBackend<'a> for &'a str {
_ => Err(err_msg!(
Unsupported,
"Unsupported backend: {}",
&opts.schema
&opts.scheme
)),
}
})
Expand Down
177 changes: 120 additions & 57 deletions askar-storage/src/backend/postgres/provision.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ pub struct PostgresStoreOptions {
pub(crate) admin_uri: String,
pub(crate) host: String,
pub(crate) name: String,
pub(crate) username: String,
pub(crate) schema: Option<String>,
}

impl PostgresStoreOptions {
Expand Down Expand Up @@ -73,8 +75,13 @@ impl PostgresStoreOptions {
} else {
DEFAULT_MIN_CONNECTIONS
};
let schema = opts.query.remove("schema");
let admin_acct = opts.query.remove("admin_account");
let admin_pass = opts.query.remove("admin_password");
let username = match opts.user.as_ref() {
"" => "postgres".to_owned(),
a => a.to_owned(),
};
let uri = opts.clone().into_uri();
if admin_acct.is_some() || admin_pass.is_some() {
if let Some(admin_acct) = admin_acct {
Expand All @@ -90,12 +97,11 @@ impl PostgresStoreOptions {
return Err(err_msg!(Input, "Missing database name"));
}
let name = path[1..].to_string();
if name.find(|c| c == '"' || c == '\0').is_some() {
return Err(err_msg!(
Input,
"Invalid character in database name: '\"' and '\\0' are disallowed"
));
if let Some(schema) = schema.as_ref() {
_validate_ident(schema, "schema")?;
}
_validate_ident(&name, "database")?;
_validate_ident(&username, "username")?;
// admin user selects the default database
opts.path = Cow::Borrowed("/postgres");
Ok(Self {
Expand All @@ -107,6 +113,8 @@ impl PostgresStoreOptions {
admin_uri: opts.into_uri(),
host,
name,
username,
schema,
})
}

Expand All @@ -119,6 +127,10 @@ impl PostgresStoreOptions {
.log_statements(log::LevelFilter::Debug)
.log_slow_statements(log::LevelFilter::Debug, Default::default());
}
if let Some(s) = self.schema.as_ref() {
// NB: schema is a validated identifier
conn_opts = conn_opts.options([("search_path", s)]);
}
PgPoolOptions::default()
.acquire_timeout(self.connect_timeout)
.idle_timeout(self.idle_timeout)
Expand All @@ -136,15 +148,18 @@ impl PostgresStoreOptions {
Err(SqlxError::Database(db_err)) if db_err.code() == Some(Cow::Borrowed("3D000")) => {
// error 3D000 is INVALID CATALOG NAME in postgres,
// this indicates that the database does not exist
let mut admin_conn = PgConnection::connect(self.admin_uri.as_ref()).await?;
// any character except NUL is allowed in an identifier.
// double quotes must be escaped, but we just disallow those
let create_q = format!("CREATE DATABASE \"{}\"", self.name);
match sqlx::query(&create_q)
.persistent(false)
.execute(&mut admin_conn)
let mut admin_conn = PgConnection::connect(self.admin_uri.as_ref())
.await
{
.map_err(err_map!(
Backend,
"Error creating admin connection to database"
))?;
// self.name and self.username are validated identifiers
let create_q = format!(
"CREATE DATABASE \"{}\" OWNER \"{}\"",
self.name, self.username
);
match admin_conn.execute(create_q.as_str()).await {
Ok(_) => (),
Err(SqlxError::Database(db_err))
if db_err.code() == Some(Cow::Borrowed("23505"))
Expand Down Expand Up @@ -181,34 +196,58 @@ impl PostgresStoreOptions {
if recreate {
// remove expected tables
reset_db(&mut txn).await?;
} else if sqlx::query_scalar::<_, i64>(
"SELECT COUNT(*) FROM information_schema.tables
WHERE table_schema='public' AND table_name='config'",
)
.fetch_one(txn.as_mut())
.await?
== 1
{
// proceed to open, will fail if the version doesn't match
return open_db(
conn_pool,
Some(method),
pass_key,
profile,
self.host,
self.name,
)
.await;
} else {
// check for presence of config table
let count = if let Some(schema) = self.schema.as_ref() {
sqlx::query_scalar::<_, i64>(
"SELECT COUNT(*) FROM information_schema.tables
WHERE table_schema=?1 AND table_name='config'",
)
.persistent(false)
.bind(schema)
.fetch_one(txn.as_mut())
.await
.map_err(err_map!(Backend, "Error checking for existing store"))?
} else {
sqlx::query_scalar::<_, i64>(
"SELECT COUNT(*) FROM information_schema.tables
WHERE table_schema=ANY (CURRENT_SCHEMAS(false)) AND table_name='config'",
)
.persistent(false)
.fetch_one(txn.as_mut())
.await
.map_err(err_map!(Backend, "Error checking for existing store"))?
};
if count > 0 {
// proceed to open, will fail if the version doesn't match
return open_db(
conn_pool,
Some(method),
pass_key,
profile,
self.host,
self.name,
)
.await;
}
}
// else: no 'config' table, assume empty database

// no 'config' table, assume empty database

let (profile_key, enc_profile_key, store_key, store_key_ref) = unblock({
let pass_key = pass_key.into_owned();
move || init_keys(method, pass_key)
})
.await?;
let default_profile = profile.unwrap_or_else(random_profile_name);
let profile_id = init_db(txn, &default_profile, store_key_ref, enc_profile_key).await?;
let profile_id = init_db(
txn,
&default_profile,
store_key_ref,
enc_profile_key,
self.schema.as_ref().unwrap_or(&self.username),
)
.await?;
let mut key_cache = KeyCache::new(store_key);
key_cache.add_profile_mut(default_profile.clone(), profile_id, profile_key);

Expand All @@ -235,22 +274,23 @@ impl PostgresStoreOptions {
// this indicates that the database does not exist
Err(err_msg!(NotFound, "The requested database was not found"))
}
Err(e) => Err(e.into()),
Err(err) => Err(err_msg!(Backend, "Error connecting to database pool").with_cause(err)),
}?;
open_db(pool, method, pass_key, profile, self.host, self.name).await
}

/// Remove an existing Postgres store defined by these configuration options
pub async fn remove(self) -> Result<bool, Error> {
let mut admin_conn = PgConnection::connect(self.admin_uri.as_ref()).await?;
let mut admin_conn = PgConnection::connect(self.admin_uri.as_ref())
.await
.map_err(err_map!(
Backend,
"Error creating admin connection to database"
))?;
// any character except NUL is allowed in an identifier.
// double quotes must be escaped, but we just disallow those
let drop_q = format!("DROP DATABASE \"{}\"", self.name);
match sqlx::query(&drop_q)
.persistent(false)
.execute(&mut admin_conn)
.await
{
match admin_conn.execute(drop_q.as_str()).await {
Ok(_) => Ok(true),
Err(SqlxError::Database(db_err)) if db_err.code() == Some(Cow::Borrowed("3D000")) => {
// invalid catalog name is raised if the database does not exist
Expand Down Expand Up @@ -295,25 +335,28 @@ pub(crate) async fn init_db<'t>(
profile_name: &str,
store_key_ref: String,
enc_profile_key: Vec<u8>,
schema: &str,
) -> Result<ProfileId, Error> {
txn.execute(
"
CREATE TABLE config (
format!(r#"
CREATE SCHEMA IF NOT EXISTS "{schema}";

CREATE TABLE "{schema}".config (
name TEXT NOT NULL,
value TEXT,
PRIMARY KEY(name)
);

CREATE TABLE profiles (
CREATE TABLE "{schema}".profiles (
id BIGSERIAL,
name TEXT NOT NULL,
reference TEXT NULL,
profile_key BYTEA NULL,
PRIMARY KEY(id)
);
CREATE UNIQUE INDEX ix_profile_name ON profiles(name);
CREATE UNIQUE INDEX ix_profile_name ON "{schema}".profiles(name);

CREATE TABLE items (
CREATE TABLE "{schema}".items (
id BIGSERIAL,
profile_id BIGINT NOT NULL,
kind SMALLINT NOT NULL,
Expand All @@ -322,27 +365,28 @@ pub(crate) async fn init_db<'t>(
value BYTEA NOT NULL,
expiry TIMESTAMP NULL,
PRIMARY KEY(id),
FOREIGN KEY(profile_id) REFERENCES profiles(id)
FOREIGN KEY(profile_id) REFERENCES "{schema}".profiles(id)
ON DELETE CASCADE ON UPDATE CASCADE
);
CREATE UNIQUE INDEX ix_items_uniq ON items(profile_id, kind, category, name);
CREATE UNIQUE INDEX ix_items_uniq ON "{schema}".items(profile_id, kind, category, name);

CREATE TABLE items_tags (
CREATE TABLE "{schema}".items_tags (
id BIGSERIAL,
item_id BIGINT NOT NULL,
name BYTEA NOT NULL,
value BYTEA NOT NULL,
plaintext SMALLINT NOT NULL,
PRIMARY KEY(id),
FOREIGN KEY(item_id) REFERENCES items(id)
FOREIGN KEY(item_id) REFERENCES "{schema}".items(id)
ON DELETE CASCADE ON UPDATE CASCADE
);
CREATE INDEX ix_items_tags_item_id ON items_tags(item_id);
CREATE INDEX ix_items_tags_name_enc ON items_tags(name, SUBSTR(value, 1, 12)) include (item_id) WHERE plaintext=0;
CREATE INDEX ix_items_tags_name_plain ON items_tags(name, value) include (item_id) WHERE plaintext=1;
",
CREATE INDEX ix_items_tags_item_id ON "{schema}".items_tags(item_id);
CREATE INDEX ix_items_tags_name_enc ON "{schema}".items_tags(name, SUBSTR(value, 1, 12)) INCLUDE (item_id) WHERE plaintext=0;
CREATE INDEX ix_items_tags_name_plain ON "{schema}".items_tags(name, value) INCLUDE (item_id) WHERE plaintext=1;
"#).as_str(),
)
.await?;
.await
.map_err(err_map!(Backend, "Error creating database tables"))?;

sqlx::query(
"INSERT INTO config (name, value) VALUES
Expand All @@ -354,14 +398,16 @@ pub(crate) async fn init_db<'t>(
.bind(profile_name)
.bind(store_key_ref)
.execute(txn.as_mut())
.await?;
.await
.map_err(err_map!(Backend, "Error inserting configuration"))?;

let profile_id =
sqlx::query_scalar("INSERT INTO profiles (name, profile_key) VALUES ($1, $2) RETURNING id")
.bind(profile_name)
.bind(enc_profile_key)
.fetch_one(txn.as_mut())
.await?;
.await
.map_err(err_map!(Backend, "Error inserting default profile"))?;

txn.commit().await?;

Expand Down Expand Up @@ -399,7 +445,8 @@ pub(crate) async fn open_db(
WHERE name IN ('default_profile', 'key', 'version')"#,
)
.fetch_all(conn.as_mut())
.await?;
.await
.map_err(err_map!(Backend, "Error fetching store configuration"))?;
for row in config {
match row.try_get(0)? {
"default_profile" => {
Expand Down Expand Up @@ -453,6 +500,22 @@ pub(crate) async fn open_db(
))
}

/// Validate a postgres identifier.
/// Any character except NUL is allowed in an identifier. Double quotes must be escaped,
/// but we just disallow those instead.
fn _validate_ident(ident: &str, name: &str) -> Result<(), Error> {
if ident.is_empty() {
Err(err_msg!(Input, "{name} identifier is empty"))
} else if ident.find(|c| c == '"' || c == '\0').is_some() {
Err(err_msg!(
Input,
"Invalid character in {name} identifier: '\"' and '\\0' are disallowed"
))
} else {
Ok(())
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
10 changes: 8 additions & 2 deletions askar-storage/src/backend/postgres/test_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,14 @@ impl TestDB {
reset_db(&mut init_txn).await?;

// create tables and add default profile
let profile_id =
init_db(init_txn, &default_profile, store_key_ref, enc_profile_key).await?;
let profile_id = init_db(
init_txn,
&default_profile,
store_key_ref,
enc_profile_key,
&opts.username,
)
.await?;

let mut key_cache = KeyCache::new(store_key);
key_cache.add_profile_mut(default_profile.clone(), profile_id, profile_key);
Expand Down
Loading