Skip to content

Commit

Permalink
agent: create linked materialization publications
Browse files Browse the repository at this point in the history
Introduces basic support for linked materializations into the agent.  As
captures and materializations are published, the agent will create additional
publications as needed in order to keep materializations in sync with their
`sourceCapture`s.  We always use additional publications when modifying the
materializations so that any automated changes are directly auditable using our
existing mechanisms for that.

The basic explanation for how this works is:
We first identify any captures that may have been modified by the current
publication.  We use those capture names to query for materializations having
any of them as a `sourceCapture`. The agent will then create an additional
publication for each materialization, which updates the bindings of the
materialization to match those of the capture.

The precise meaning of "match" is:

- For each enabled binding in the capture, ensure that there is a corresponding
  enabled binding in the materialization having the same collection name
- For each binding in the materialization, if there is no corresponding enabled
  binding found in the capture, disable it.

Materialization bindings are only ever disabled, never removed, to ensure that
this process is always reversible.

There are no requirements that the user publishing the capture, or the capture
itself, have any capabilities to any of the linked materializations. There _is_
a requirement that the materialization spec have 'read' capability to the
_prefix_ of the capture.  In other words, a materialization with
`sourceCapture: acmeCo/foo/source-s3` must have read capability to all of
`acmeCo/foo/`. This may turn out to be overly broad, but it seems easier to
lift restrictions than to enforce new ones.

The spec expansion logic was itself expanded to pull in any captures that are
named by the `sourceCapture` of any materializations in the draft.  This will
be used in a subsequent commit to validate that the `sourceCapture` exists and
is a capture.
  • Loading branch information
psFried committed Aug 7, 2023
1 parent 02a6af2 commit b55e9c5
Show file tree
Hide file tree
Showing 18 changed files with 1,154 additions and 122 deletions.
20 changes: 20 additions & 0 deletions crates/agent-sql/src/drafts.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,25 @@
use super::{CatalogType, Id, TextJson};

/// Creates a draft for the given user and returns the draft id. A user with
/// the given email must exist, and the email must have been confirmed, or else
/// the insert will fail due to a not-null constraint when inserting into the
/// drafts table.
pub async fn create(
    user_email: &str,
    detail: String,
    txn: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> sqlx::Result<Id> {
    // The subselect resolves the user id; it yields NULL for an unknown or
    // unconfirmed email, which the drafts.user_id not-null constraint rejects.
    sqlx::query!(
        r#"insert into drafts (user_id, detail)
        values ( (select id from auth.users where email = $1 and email_confirmed_at is not null), $2)
        returning id as "id: Id";"#,
        user_email,
        detail
    )
    .fetch_one(txn)
    .await
    .map(|rec| rec.id)
}

pub async fn upsert_spec<S>(
draft_id: Id,
catalog_name: &str,
Expand Down
20 changes: 16 additions & 4 deletions crates/agent-sql/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ pub mod directives;
pub mod discovers;
pub mod drafts;
pub mod evolutions;
pub mod linked_materializations;
pub mod publications;
use serde::{Deserialize, Serialize};
use sqlx::types::Uuid;
use std::fmt::{self, Display};

mod id;
Expand Down Expand Up @@ -35,14 +37,24 @@ impl Display for CatalogType {
}
}

/// Note that the discriminants here align with those in the database type.
#[derive(
Debug, Copy, Clone, PartialEq, Eq, Serialize, Deserialize, sqlx::Type, schemars::JsonSchema,
Debug,
Copy,
Clone,
PartialEq,
Eq,
PartialOrd,
Serialize,
Deserialize,
sqlx::Type,
schemars::JsonSchema,
)]
#[sqlx(type_name = "grant_capability")]
#[sqlx(rename_all = "lowercase")]
#[serde(rename_all = "camelCase")]
pub enum Capability {
Read,
Write,
Admin,
Read = 0x10,
Write = 0x20,
Admin = 0x30,
}
28 changes: 28 additions & 0 deletions crates/agent-sql/src/linked_materializations.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
use crate::{Id, TextJson};
use serde_json::value::RawValue;

/// A live materialization that names some capture via its `sourceCapture`
/// property, as returned by `get_linked_materializations`.
pub struct Row {
    // The materialization's `catalog_name` from `live_specs`.
    pub materialization_name: String,
    // The materialization's model spec (`live_specs.spec`) as raw JSON.
    pub materialization_spec: TextJson<Box<RawValue>>,
    // The `last_pub_id` of the live spec row.
    pub last_pub_id: Id,
}

/// Fetches every live materialization whose `sourceCapture` property names
/// one of the given `capture_names`, returning one `Row` per match.
pub async fn get_linked_materializations(
    txn: &mut sqlx::Transaction<'_, sqlx::Postgres>,
    capture_names: Vec<String>,
) -> sqlx::Result<Vec<Row>> {
    let rows = sqlx::query_as!(
        Row,
        r#"
        select
            catalog_name as materialization_name,
            spec as "materialization_spec!: TextJson<Box<RawValue>>",
            last_pub_id as "last_pub_id!: Id"
        from live_specs
        where spec_type = 'materialization' and spec->>'sourceCapture' = ANY ($1::catalog_name[])
        "#,
        capture_names as Vec<String>
    )
    .fetch_all(txn)
    .await?;
    Ok(rows)
}
30 changes: 30 additions & 0 deletions crates/agent-sql/src/publications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,25 @@ pub async fn create(
Ok(rec.id)
}

/// Enqueues a new publication of the given `draft_id`, attributing it to the
/// user having the given email, and returns the new publication id.
pub async fn create_with_user_email(
    txn: &mut sqlx::Transaction<'_, sqlx::Postgres>,
    user_email: &str,
    draft_id: Id,
    auto_evolve: bool,
    detail: String,
) -> sqlx::Result<Id> {
    // The user id is resolved from the email by subselect. NOTE(review): if no
    // such user exists the subselect yields NULL — presumably rejected by a
    // not-null constraint on publications.user_id; confirm against the schema.
    sqlx::query!(
        r#"insert into publications (user_id, draft_id, auto_evolve, detail)
        values ((select id from auth.users where email = $1), $2, $3, $4) returning id as "id: Id";"#,
        user_email,
        draft_id as Id,
        auto_evolve,
        detail
    )
    .fetch_one(txn)
    .await
    .map(|rec| rec.id)
}

// Row is the dequeued task shape of a draft build & test operation.
#[derive(Debug)]
pub struct Row {
Expand Down Expand Up @@ -326,6 +345,15 @@ pub async fn resolve_expanded_rows(
seeds(id) as (
select id from unnest($1::flowid[]) as id
),
-- Expand seed materializations that specify a sourceCapture to the source capture.
-- We'll use this both for validation and for updating the bindings of the seed materialization.
source_captures(id) as (
select ls_captures.id
from seeds
join live_specs ls_mats on seeds.id = ls_mats.id and ls_mats.spec_type = 'materialization'
join live_specs ls_captures
on ls_mats.spec->>'sourceCapture' = ls_captures.catalog_name and ls_captures.spec_type = 'capture'
),
-- A seed collection expands to captures or materializations which bind it.
bound_captures(id) as (
select e.source_id
Expand Down Expand Up @@ -361,6 +389,8 @@ pub async fn resolve_expanded_rows(
select id from bound_captures
union
select id from bound_materializations
union
select id from source_captures
union
select id from seeds
),
Expand Down
62 changes: 62 additions & 0 deletions crates/agent-sql/tests/expansions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,68 @@ async fn test_capture_expansions() {
.await;
}

/// Verifies expansion of materializations through their `sourceCapture`:
/// a materialization naming a valid capture expands to include that capture,
/// while invalid references (non-existent name, wrong spec type, or a
/// non-string JSON value) are simply not expanded at all.
#[tokio::test]
async fn test_materialization_source_capture_expansions() {
    let mut conn = sqlx::postgres::PgConnection::connect(&FIXED_DATABASE_URL)
        .await
        .expect("connect");

    let mut txn = conn.begin().await.unwrap();

    // Seed one valid capture/collection pair plus materializations covering
    // each flavor of good and bad `sourceCapture` reference.
    sqlx::query(
        r#"
        with specs as (
          insert into live_specs (id, catalog_name, spec, spec_type) values
          ('a100000000000000', 'a/collection', '{}', 'collection'),
          ('a200000000000000', 'a/capture', '{}', 'capture'),
          ('b100000000000000', 'a/happy/materialization', '{"sourceCapture": "a/capture"}', 'materialization'),
          ('b200000000000000', 'a/badref/notreal', '{"sourceCapture": "a/notreal"}', 'materialization'),
          ('b300000000000000', 'a/badref/badspectype', '{"sourceCapture": "a/collection"}', 'materialization'),
          ('b400000000000000', 'a/badref/badjsontype', '{"sourceCapture": 1.234}', 'materialization')
        )
        select 1;
        "#,
    )
    .execute(&mut txn)
    .await
    .unwrap();

    assert_set(
        vec![0xb1],
        vec![0xa2],
        &mut txn,
        "materialization expands to valid sourceCapture",
    )
    .await;

    // If the sourceCapture references something that doesn't exist, was deleted, or isn't a capture, then
    // it simply doesn't get expanded. We handle those scenarios in the validation crate.
    assert_set(
        vec![0xb2],
        vec![],
        &mut txn,
        "materialization with non-existent sourceCapture",
    )
    .await;

    assert_set(
        vec![0xb3],
        vec![],
        &mut txn,
        "materialization with sourceCapture of wrong spec type",
    )
    .await;

    assert_set(
        vec![0xb4],
        vec![],
        &mut txn,
        "materialization with invalid sourceCapture json type",
    )
    .await;
}

#[tokio::test]
async fn test_materialization_expansions() {
let mut conn = sqlx::postgres::PgConnection::connect(&FIXED_DATABASE_URL)
Expand Down
125 changes: 23 additions & 102 deletions crates/agent/src/evolution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ use agent_sql::{
};
use anyhow::Context;
use itertools::Itertools;
use lazy_static::lazy_static;
use regex::Regex;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::{BTreeMap, BTreeSet, HashMap};
Expand Down Expand Up @@ -401,6 +399,13 @@ fn evolve_collection(
format!("updating resource spec of '{mat_name}' binding '{old_collection}'")
})?;
binding.resource = models::RawValue::from_value(&resource_spec);
tracing::debug!(
%mat_name,
%old_collection_name,
%re_create_collection,
?new_collection_name,
new_resource_spec = %resource_spec,
"updated materialization binding");
}
}

Expand All @@ -426,7 +431,7 @@ fn evolve_collection(
}
}

tracing::debug!(?updated_materializations, ?updated_captures, %new_name, old_name=%old_collection_name, "renaming collection in draft");
tracing::debug!(?updated_materializations, ?updated_captures, %re_create_collection, %new_name, old_name=%old_collection_name, "evolved collection in draft");

Ok(EvolvedCollection {
old_name: old_collection.into(),
Expand All @@ -446,38 +451,7 @@ fn has_mat_binding(spec: &models::MaterializationDef, collection: &models::Colle
.any(|b| b.source.collection() == collection)
}

lazy_static! {
static ref NAME_VERSION_RE: Regex = Regex::new(r#".*[_-][vV](\d+)$"#).unwrap();
}

/// Takes an existing name and returns a new name with an incremented version suffix.
/// The name `foo` will become `foo_v2`, and `foo_v2` will become `foo_v3` and so on.
pub fn next_name(current_name: &str) -> String {
// Does the name already have a version suffix?
// We try to work with whatever suffix is already present. This way, if a user
// is starting with a collection like `acmeCo/foo-V3`, they'll end up with
// `acmeCo/foo-V4` instead of `acmeCo/foo_v4`.
if let Some(capture) = NAME_VERSION_RE.captures_iter(current_name).next() {
if let Ok(current_version_num) = capture[1].parse::<u32>() {
// wrapping_add is just to ensure we don't panic if someone passes
// a naughty name with a u32::MAX version.
return format!(
"{}{}",
current_name.strip_suffix(&capture[1]).unwrap(),
// We don't really care what the collection name ends up as if
// the old name is suffixed by "V-${u32::MAX}", as long as we don't panic.
current_version_num.wrapping_add(1)
);
}
}
// We always use an underscore as the separator. This might look a bit
// unseemly if dashes are already used as separators elsewhere in the
// name, but any sort of heuristic for determining whether to use dashes
// or underscores is rife with edge cases and doesn't seem worth the
// complexity.
format!("{current_name}_v2")
}

/// A helper that's specially suited for the purpose of evolutions.
struct ResourceSpecUpdater {
pointers_by_image: HashMap<String, doc::Pointer>,
}
Expand All @@ -495,25 +469,14 @@ impl ResourceSpecUpdater {
if pointers_by_image.contains_key(&conn.image) {
continue;
}
let image = conn.image.as_str();
let Some(colon_idx) = conn.image.find(':') else {
anyhow::bail!("connector image '{image}' is missing a version tag");
};
let image_name = &image[..colon_idx];
let image_tag = &image[colon_idx..];

let schema_json = agent_sql::evolutions::fetch_resource_spec_schema(
image_name.to_owned(),
image_tag.to_owned(),
txn,
)
.await?
.ok_or_else(|| anyhow::anyhow!("no resource spec schema found for image '{image}"))?;
let pointer = Self::pointer_for_schema(schema_json.get())
let image = conn.image.clone();
let schema_json =
crate::resource_configs::fetch_resource_spec_schema(&conn.image, txn).await?;
let pointer = crate::resource_configs::pointer_for_schema(schema_json.get())
.with_context(|| format!("inspecting resource_spec_schema for image '{image}'"))?;

tracing::debug!(%image_name, %image_tag, %pointer, "parsed resource spec schema");
pointers_by_image.insert(image.to_owned(), pointer);
tracing::debug!(%image, %pointer, "parsed resource spec schema");
pointers_by_image.insert(image, pointer);
}
Ok(ResourceSpecUpdater { pointers_by_image })
}
Expand All @@ -528,58 +491,16 @@ impl ResourceSpecUpdater {
resource_spec: &mut Value,
) -> anyhow::Result<()> {
if let Some(pointer) = self.pointers_by_image.get(image_name) {
let Some(existing) = pointer
.query(&*resource_spec)
.and_then(|v| v.as_str()) else {
// Log noisily about this, but it's not clear that it should be an error. It's
// possible, at least in principle, that a connector allows the field to be empty
// and will apply its own default value based on the collection name.
tracing::warn!(%materialization_name, ?new_collection_name, %pointer, "not updating resource spec because there is no existing value at that location");
return Ok(())
};

let new_val = new_collection_name
.map(|n| {
n.rsplit('/')
.next()
.expect("collection name must contain a slash")
.to_owned()
})
.unwrap_or_else(|| next_name(existing).to_owned());

if let Some(prev_val) = pointer.create_value(resource_spec) {
tracing::info!(%prev_val, %new_val, %materialization_name, "updating resource spec");
*prev_val = Value::String(new_val);
} else {
anyhow::bail!("creating x-collection-name JSON location failed");
}
crate::resource_configs::update_materialization_resource_spec(
materialization_name,
resource_spec,
pointer,
new_collection_name,
)
} else {
anyhow::bail!(
Err(anyhow::anyhow!(
"no resource spec x-collection-name location exists for image '{image_name}'"
);
}
Ok(())
}

/// Runs inference on the given schema and searches for a location within the resource spec
/// that bears the `x-collection-name` annotation. Returns the pointer to that location, or an
/// error if no such location exists. Errors from parsing the schema are returned directly.
fn pointer_for_schema(schema_json: &str) -> anyhow::Result<doc::Pointer> {
// While all known connector resource spec schemas are self-contained, we don't
// actually do anything to guarantee that they are. This function may fail in that case.
let schema = doc::validation::build_bundle(schema_json)?;
let mut builder = doc::SchemaIndexBuilder::new();
builder.add(&schema)?;
let index = builder.into_index();
let shape = doc::inference::Shape::infer(&schema, &index);

for (ptr, _, prop_shape, _) in shape.locations() {
if prop_shape.annotations.contains_key("x-collection-name") {
return Ok(doc::Pointer::from_str(&ptr));
}
))
}
Err(anyhow::anyhow!(
"resource spec schema does not contain any location annotated with x-collection-name"
))
}
}
Loading

0 comments on commit b55e9c5

Please sign in to comment.