diff --git a/crates/validation/src/lib.rs b/crates/validation/src/lib.rs index 151db2e945..49526c259b 100644 --- a/crates/validation/src/lib.rs +++ b/crates/validation/src/lib.rs @@ -234,6 +234,7 @@ pub async fn validate( let built_materializations = materialization::walk_all_materializations( build_config, &built_collections, + captures, connectors, &image_inspections, materializations, diff --git a/crates/validation/src/materialization.rs b/crates/validation/src/materialization.rs index 3021bc6695..80b6370544 100644 --- a/crates/validation/src/materialization.rs +++ b/crates/validation/src/materialization.rs @@ -8,6 +8,7 @@ use std::collections::{BTreeMap, HashMap}; pub async fn walk_all_materializations( build_config: &flow::build_api::Config, built_collections: &[tables::BuiltCollection], + captures: &[tables::Capture], connectors: &C, images: &[image::Image], materializations: &[tables::Materialization], @@ -20,6 +21,7 @@ pub async fn walk_all_materializations( let mut materialization_errors = tables::Errors::new(); let validation = walk_materialization_request( built_collections, + captures, images, materialization, &mut materialization_errors, @@ -234,6 +236,7 @@ pub async fn walk_all_materializations( fn walk_materialization_request<'a>( built_collections: &'a [tables::BuiltCollection], + captures: &'a [tables::Capture], images: &[image::Image], materialization: &'a tables::Materialization, errors: &mut tables::Errors, @@ -260,6 +263,25 @@ fn walk_materialization_request<'a>( errors, ); + if let Some(source_capture) = materialization.spec.source_capture.as_ref() { + // If the materialization specifies a `sourceCapture`, validate that it + // exists. This also covers cases where the `sourceCapture` references + // a collection or materialization. + // Note that we _don't_ do any validation that the bindings actually match. + // It's permissible for the materialization and capture bindings to be different + // here, as we'll handle that scenario by creating a new publication to update + // the materialization bindings. + let _capture = reference::walk_reference( + scope, + "this materialization sourceCapture", + "capture", + &source_capture, + captures, + |c| (&c.capture.as_str(), Scope::new(&c.scope)), + errors, + )?; + } + let (connector_type, config_json, network_ports) = match endpoint { models::MaterializationEndpoint::Connector(config) => ( flow::materialization_spec::ConnectorType::Image as i32, diff --git a/crates/validation/tests/model.yaml b/crates/validation/tests/model.yaml index f1c278968b..796927bc9d 100644 --- a/crates/validation/tests/model.yaml +++ b/crates/validation/tests/model.yaml @@ -109,6 +109,7 @@ test://example/webhook-deliveries: - test://example/int-halve materializations: testing/webhook/deliveries: + sourceCapture: testing/s3-source endpoint: connector: image: webhook/connector diff --git a/crates/validation/tests/scenario_tests.rs b/crates/validation/tests/scenario_tests.rs index fd20de349a..4b00c5c3eb 100644 --- a/crates/validation/tests/scenario_tests.rs +++ b/crates/validation/tests/scenario_tests.rs @@ -250,6 +250,56 @@ driver: assert_eq!(0, fully_disabled_mat.spec.bindings.len()); } +#[test] +fn materialization_source_capture_missing() { + // The sourceCapture could be present as either a collection or a materialization, + // and the results would be the same. + let models = r##" +test://example/catalog.yaml: + materializations: + testing/missing-source-capture: + sourceCapture: testing/not-a-real-capture + endpoint: { connector: { image: s3, config: {} }} + bindings: [] + + storageMappings: + testing/: + stores: [{provider: S3, bucket: a-bucket}] + recovery/: + stores: [{provider: S3, bucket: a-bucket}] + +driver: + imageInspections: + s3: + output: '[{"Config": {}}]' + + captures: {} + derivations: {} + materializations: + testing/missing-source-capture: + connectorType: IMAGE + config: + image: s3 + config: {} + bindings: [] +"##; + + let tables = run_test( + serde_yaml::from_str(models).unwrap(), + &flow::build_api::Config { + build_id: "missing-source-capture".to_string(), + ..Default::default() + }, + ); + + assert!( + tables.errors.len() == 1, + "expected 1 error, got: {:?}", + tables.errors + ); + insta::assert_debug_snapshot!(tables.errors); +} + #[test] fn test_database_round_trip() { let tables = run_test( diff --git a/crates/validation/tests/snapshots/scenario_tests__golden_all_visits.snap b/crates/validation/tests/snapshots/scenario_tests__golden_all_visits.snap index bf6fe2a094..09d9d6acf1 100644 --- a/crates/validation/tests/snapshots/scenario_tests__golden_all_visits.snap +++ b/crates/validation/tests/snapshots/scenario_tests__golden_all_visits.snap @@ -5917,6 +5917,7 @@ All { scope: test://example/webhook-deliveries#/materializations/testing~1webhook~1deliveries, materialization: testing/webhook/deliveries, spec: { + "sourceCapture": "testing/s3-source", "endpoint": { "connector": { "image": "webhook/connector", @@ -6064,7 +6065,7 @@ All { resource: test://example/webhook-deliveries, content_type: "CATALOG", content: ".. binary ..", - content_dom: {"import":["test://example/int-string","test://example/int-halve"],"materializations":{"testing/webhook/deliveries":{"bindings":[{"fields":{"exclude":["bit"],"include":{"str":{"pass":"through"}},"recommended":true},"resource":{"fixture":"one"},"source":{"name":"testing/int-string","partitions":{"include":{"bit":[true]}}}},{"fields":{"include":{"Len":{}},"recommended":false},"priority":3,"resource":{"fixture":"two"},"source":"testing/int-halve"}],"endpoint":{"connector":{"config":"webhook-config.yaml","image":"webhook/connector"}}}}}, + content_dom: {"import":["test://example/int-string","test://example/int-halve"],"materializations":{"testing/webhook/deliveries":{"bindings":[{"fields":{"exclude":["bit"],"include":{"str":{"pass":"through"}},"recommended":true},"resource":{"fixture":"one"},"source":{"name":"testing/int-string","partitions":{"include":{"bit":[true]}}}},{"fields":{"include":{"Len":{}},"recommended":false},"priority":3,"resource":{"fixture":"two"},"source":"testing/int-halve"}],"endpoint":{"connector":{"config":"webhook-config.yaml","image":"webhook/connector"}},"sourceCapture":"testing/s3-source"}}}, }, ], storage_mappings: [ diff --git a/crates/validation/tests/snapshots/scenario_tests__materialization_source_capture_missing.snap b/crates/validation/tests/snapshots/scenario_tests__materialization_source_capture_missing.snap new file mode 100644 index 0000000000..1e21a2dd83 --- /dev/null +++ b/crates/validation/tests/snapshots/scenario_tests__materialization_source_capture_missing.snap @@ -0,0 +1,10 @@ +--- +source: crates/validation/tests/scenario_tests.rs +expression: tables.errors +--- +[ + Error { + scope: test://example/catalog.yaml#/materializations/testing~1missing-source-capture, + error: capture testing/not-a-real-capture, referenced by this materialization sourceCapture, is not defined, + }, +]