Skip to content

Commit

Permalink
validation: ensure that materialization sourceCapture exists
Browse files Browse the repository at this point in the history
Introduces a new build validation of materializations, which ensures that the
`sourceCapture` of a materialization exists and is a capture.  This relies on
the updated spec-expansion logic introduced in the previous commit, which
ensures that any captures used as a `sourceCapture` will be included in the
build of the materialization.
  • Loading branch information
psFried committed Aug 7, 2023
1 parent b872105 commit 00d6d58
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 1 deletion.
1 change: 1 addition & 0 deletions crates/validation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ pub async fn validate<C: Connectors, P: ControlPlane>(
let built_materializations = materialization::walk_all_materializations(
build_config,
&built_collections,
captures,
connectors,
&image_inspections,
materializations,
Expand Down
22 changes: 22 additions & 0 deletions crates/validation/src/materialization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::collections::{BTreeMap, HashMap};
pub async fn walk_all_materializations<C: Connectors>(
build_config: &flow::build_api::Config,
built_collections: &[tables::BuiltCollection],
captures: &[tables::Capture],
connectors: &C,
images: &[image::Image],
materializations: &[tables::Materialization],
Expand All @@ -20,6 +21,7 @@ pub async fn walk_all_materializations<C: Connectors>(
let mut materialization_errors = tables::Errors::new();
let validation = walk_materialization_request(
built_collections,
captures,
images,
materialization,
&mut materialization_errors,
Expand Down Expand Up @@ -234,6 +236,7 @@ pub async fn walk_all_materializations<C: Connectors>(

fn walk_materialization_request<'a>(
built_collections: &'a [tables::BuiltCollection],
captures: &'a [tables::Capture],
images: &[image::Image],
materialization: &'a tables::Materialization,
errors: &mut tables::Errors,
Expand All @@ -260,6 +263,25 @@ fn walk_materialization_request<'a>(
errors,
);

if let Some(source_capture) = materialization.spec.source_capture.as_ref() {
// If the materialization specifies a `sourceCapture`, validate that it
// exists. This also covers cases where the `sourceCapture` references
// a collection or materialization.
// Note that we _don't_ do any validation that the bindings actually match.
// It's permissible for the materialization and capture bindings to be different
// here, as we'll handle that scenario by creating a new publication to update
// the materialization bindings.
let _capture = reference::walk_reference(
scope,
"this materialization sourceCapture",
"capture",
&source_capture,
captures,
|c| (&c.capture.as_str(), Scope::new(&c.scope)),
errors,
)?;
}

let (connector_type, config_json, network_ports) = match endpoint {
models::MaterializationEndpoint::Connector(config) => (
flow::materialization_spec::ConnectorType::Image as i32,
Expand Down
1 change: 1 addition & 0 deletions crates/validation/tests/model.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ test://example/webhook-deliveries:
- test://example/int-halve
materializations:
testing/webhook/deliveries:
sourceCapture: testing/s3-source
endpoint:
connector:
image: webhook/connector
Expand Down
50 changes: 50 additions & 0 deletions crates/validation/tests/scenario_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,56 @@ driver:
assert_eq!(0, fully_disabled_mat.spec.bindings.len());
}

#[test]
fn materialization_source_capture_missing() {
// The sourceCapture could be present as either a collection or a materialization,
// and the results would be the same.
let models = r##"
test://example/catalog.yaml:
materializations:
testing/missing-source-capture:
sourceCapture: testing/not-a-real-capture
endpoint: { connector: { image: s3, config: {} }}
bindings: []
storageMappings:
testing/:
stores: [{provider: S3, bucket: a-bucket}]
recovery/:
stores: [{provider: S3, bucket: a-bucket}]
driver:
imageInspections:
s3:
output: '[{"Config": {}}]'
captures: {}
derivations: {}
materializations:
testing/missing-source-capture:
connectorType: IMAGE
config:
image: s3
config: {}
bindings: []
"##;

let tables = run_test(
serde_yaml::from_str(models).unwrap(),
&flow::build_api::Config {
build_id: "missing-source-capture".to_string(),
..Default::default()
},
);

assert!(
tables.errors.len() == 1,
"expected 1 error, got: {:?}",
tables.errors
);
insta::assert_debug_snapshot!(tables.errors);
}

#[test]
fn test_database_round_trip() {
let tables = run_test(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5917,6 +5917,7 @@ All {
scope: test://example/webhook-deliveries#/materializations/testing~1webhook~1deliveries,
materialization: testing/webhook/deliveries,
spec: {
"sourceCapture": "testing/s3-source",
"endpoint": {
"connector": {
"image": "webhook/connector",
Expand Down Expand Up @@ -6064,7 +6065,7 @@ All {
resource: test://example/webhook-deliveries,
content_type: "CATALOG",
content: ".. binary ..",
content_dom: {"import":["test://example/int-string","test://example/int-halve"],"materializations":{"testing/webhook/deliveries":{"bindings":[{"fields":{"exclude":["bit"],"include":{"str":{"pass":"through"}},"recommended":true},"resource":{"fixture":"one"},"source":{"name":"testing/int-string","partitions":{"include":{"bit":[true]}}}},{"fields":{"include":{"Len":{}},"recommended":false},"priority":3,"resource":{"fixture":"two"},"source":"testing/int-halve"}],"endpoint":{"connector":{"config":"webhook-config.yaml","image":"webhook/connector"}}}}},
content_dom: {"import":["test://example/int-string","test://example/int-halve"],"materializations":{"testing/webhook/deliveries":{"bindings":[{"fields":{"exclude":["bit"],"include":{"str":{"pass":"through"}},"recommended":true},"resource":{"fixture":"one"},"source":{"name":"testing/int-string","partitions":{"include":{"bit":[true]}}}},{"fields":{"include":{"Len":{}},"recommended":false},"priority":3,"resource":{"fixture":"two"},"source":"testing/int-halve"}],"endpoint":{"connector":{"config":"webhook-config.yaml","image":"webhook/connector"}},"sourceCapture":"testing/s3-source"}}},
},
],
storage_mappings: [
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
source: crates/validation/tests/scenario_tests.rs
expression: tables.errors
---
[
Error {
scope: test://example/catalog.yaml#/materializations/testing~1missing-source-capture,
error: capture testing/not-a-real-capture, referenced by this materialization sourceCapture, is not defined,
},
]

0 comments on commit 00d6d58

Please sign in to comment.