From e973bb1719d1f998a53157ab84e686e4e3bfc43b Mon Sep 17 00:00:00 2001 From: Johnny Graettinger Date: Mon, 4 Nov 2024 12:38:08 -0600 Subject: [PATCH 1/2] doc: SerPolicy now enforces maximum serialization depth SerPolicy now tracks node depth and omits array items or object properties which would violate a bound on maximum document depth. This is largely motivated by serde_json: in all conditions, we never want to emit a document which cannot be parsed by serde_json. The serde_json limit of 128 is generous so we follow their lead. --- crates/doc/src/ser.rs | 109 +++++++++++++++++++++++------------------- 1 file changed, 61 insertions(+), 48 deletions(-) diff --git a/crates/doc/src/ser.rs b/crates/doc/src/ser.rs index c713811509..3b968bb921 100644 --- a/crates/doc/src/ser.rs +++ b/crates/doc/src/ser.rs @@ -22,6 +22,12 @@ pub struct SerPolicy { } impl SerPolicy { + /// Maximum node depth which SerPolicy will serialize. + /// Properties and array items below this depth will be omitted. + // This depth is effectively constrained by serde_json, which has a generous + // but hard limit. We follow their lead. + const MAX_DEPTH: usize = 126; + pub const fn noop() -> Self { Self { str_truncate_after: usize::MAX, @@ -42,19 +48,19 @@ impl SerPolicy { &'p self, node: &'n N, truncation_indicator: &'s AtomicBool, - ) -> SerNode<'p, 'n, 's, N, Root> { + ) -> SerNode<'p, 'n, 's, N> { SerNode { - _marker: std::marker::PhantomData::, node, + depth: 0, policy: self, truncation_indicator: Some(truncation_indicator), } } - pub fn on<'p, 'n, N: AsNode>(&'p self, node: &'n N) -> SerNode<'p, 'n, 'static, N, Root> { + pub fn on<'p, 'n, N: AsNode>(&'p self, node: &'n N) -> SerNode<'p, 'n, 'static, N> { SerNode { - _marker: std::marker::PhantomData::, node, + depth: 0, policy: self, truncation_indicator: None, } @@ -120,27 +126,11 @@ impl SerPolicy { } } -pub struct Root; -pub struct Nested; -trait DepthIndicator { - fn is_nested() -> bool; -} -impl DepthIndicator for Root { - fn is_nested() -> bool { - false - } -} -impl DepthIndicator for Nested { - fn is_nested() -> bool { - true - } -} - -pub struct SerNode<'p, 'n, 's, N: AsNode, Depth> { - _marker: std::marker::PhantomData, - truncation_indicator: Option<&'s AtomicBool>, +pub struct SerNode<'p, 'n, 's, N: AsNode> { node: &'n N, + depth: usize, policy: &'p SerPolicy, + truncation_indicator: Option<&'s AtomicBool>, } pub struct SerLazy<'p, 'alloc, 'n, N: AsNode> { @@ -149,32 +139,33 @@ pub struct SerLazy<'p, 'alloc, 'n, N: AsNode> { } pub struct SerOwned<'p, 's> { - truncation_indicator: Option<&'s AtomicBool>, node: &'p OwnedNode, policy: &'p SerPolicy, + truncation_indicator: Option<&'s AtomicBool>, } -impl<'p, 'n, 's, N: AsNode, Depth: DepthIndicator> serde::Serialize - for SerNode<'p, 'n, 's, N, Depth> -{ +impl<'p, 'n, 's, N: AsNode> serde::Serialize for SerNode<'p, 'n, 's, N> { fn serialize(&self, serializer: S) -> Result where S: ::serde::Serializer, { match self.node.as_node() { Node::Array(arr) => { + let item_limit = if self.depth < SerPolicy::MAX_DEPTH { + self.policy.array_truncate_after + } else { + 0 + }; if let Some(indicator) = self.truncation_indicator { - if arr.len() > self.policy.array_truncate_after { + if arr.len() > item_limit { indicator.store(true, Ordering::SeqCst); } } - serializer.collect_seq(arr.iter().take(self.policy.array_truncate_after).map(|d| { - SerNode { - _marker: std::marker::PhantomData::, - truncation_indicator: self.truncation_indicator, - node: d, - policy: self.policy, - } + serializer.collect_seq(arr.iter().take(item_limit).map(|d| SerNode { + node: d, + depth: self.depth + 1, + policy: self.policy, + truncation_indicator: self.truncation_indicator, })) } Node::Bool(b) => serializer.serialize_bool(b), @@ -193,10 +184,12 @@ impl<'p, 'n, 's, N: AsNode, Depth: DepthIndicator> serde::Serialize Node::NegInt(n) => serializer.serialize_i64(n), Node::PosInt(n) => serializer.serialize_u64(n), Node::Object(fields) => { - let key_limit = if Depth::is_nested() { + let key_limit = if self.depth == 0 { + usize::MAX + } else if self.depth < SerPolicy::MAX_DEPTH { self.policy.nested_obj_truncate_after } else { - usize::MAX + 0 }; if let Some(indicator) = self.truncation_indicator { if fields.len() > key_limit { @@ -207,10 +200,10 @@ impl<'p, 'n, 's, N: AsNode, Depth: DepthIndicator> serde::Serialize ( field.property(), SerNode { - _marker: std::marker::PhantomData::, - truncation_indicator: self.truncation_indicator.clone(), node: field.value(), + depth: self.depth + 1, policy: self.policy, + truncation_indicator: self.truncation_indicator, }, ) })) @@ -224,7 +217,7 @@ impl<'p, 'n, 's, N: AsNode, Depth: DepthIndicator> serde::Serialize } // SerNode may be packed as a FoundationDB tuple. -impl<'p, 'n, 's, N: AsNode> tuple::TuplePack for SerNode<'p, 'n, 's, N, Root> { +impl<'p, 'n, 's, N: AsNode> tuple::TuplePack for SerNode<'p, 'n, 's, N> { fn pack( &self, w: &mut W, @@ -255,18 +248,18 @@ impl serde::Serialize for SerLazy<'_, '_, '_, N> { { match &self.node { LazyNode::Heap(n) => SerNode { - _marker: std::marker::PhantomData::, - truncation_indicator: None, node: *n, + depth: 0, policy: self.policy, + truncation_indicator: None, } .serialize(serializer), LazyNode::Node(n) => SerNode { - _marker: std::marker::PhantomData::, - truncation_indicator: None, node: *n, + depth: 0, policy: self.policy, + truncation_indicator: None, } .serialize(serializer), } @@ -280,18 +273,18 @@ impl serde::Serialize for SerOwned<'_, '_> { { match &self.node { OwnedNode::Heap(n) => SerNode { - _marker: std::marker::PhantomData::, - truncation_indicator: self.truncation_indicator, node: n.get(), + depth: 0, policy: self.policy, + truncation_indicator: self.truncation_indicator, } .serialize(serializer), OwnedNode::Archived(n) => SerNode { - _marker: std::marker::PhantomData::, - truncation_indicator: self.truncation_indicator, node: n.get(), + depth: 0, policy: self.policy, + truncation_indicator: self.truncation_indicator, } .serialize(serializer), } @@ -439,6 +432,26 @@ mod test { assert_array_len(&result, "/a", 3); } + #[test] + fn test_ser_policy_deep_nesting() { + let (mut arr, mut obj) = (json!(1234), json!(1234)); + for _ in 0..SerPolicy::MAX_DEPTH + 20 { + arr = json!([arr]); + obj = json!({"p": obj}); + } + let input = json!({"arr": arr, "obj": obj}); + + let policy = SerPolicy { + ..SerPolicy::noop() + }; + let indicator = AtomicBool::new(false); + + // Expect that our overly nested `input` document is truncated and + // is able to be parsed by serde_json. + let _result = round_trip_serde(&policy, input, &indicator); + assert!(indicator.load(Ordering::SeqCst)); + } + fn round_trip_serde(policy: &SerPolicy, input: Value, indicator: &AtomicBool) -> Value { assert!( !indicator.load(Ordering::SeqCst), From 815f074ad2ec1356701d2ff8bb82cef7d3d4b36f Mon Sep 17 00:00:00 2001 From: Johnny Graettinger Date: Mon, 4 Nov 2024 12:40:11 -0600 Subject: [PATCH 2/2] runtime: add the source or target collection to parsing errors When these occur, it can be tedious to track down which binding caused the failure. Adding the source or target collection name is key debugging info to aide in investigations. --- crates/runtime/src/capture/protocol.rs | 18 +++++++++++++----- crates/runtime/src/materialize/protocol.rs | 22 +++++++++++++++++----- crates/runtime/src/materialize/serve.rs | 1 + 3 files changed, 31 insertions(+), 10 deletions(-) diff --git a/crates/runtime/src/capture/protocol.rs b/crates/runtime/src/capture/protocol.rs index 1135d49fd5..d443f66162 100644 --- a/crates/runtime/src/capture/protocol.rs +++ b/crates/runtime/src/capture/protocol.rs @@ -363,15 +363,23 @@ pub fn recv_connector_captured( task: &Task, txn: &mut Transaction, ) -> anyhow::Result<()> { - let response::Captured { binding, doc_json } = captured; + let response::Captured { + binding: binding_index, + doc_json, + } = captured; let (memtable, alloc, mut doc) = accumulator .doc_bytes_to_heap_node(doc_json.as_bytes()) - .context("couldn't parse captured document as JSON")?; + .with_context(|| { + format!( + "couldn't parse captured document as JSON (target {})", + task.bindings[binding_index as usize].collection_name + ) + })?; let uuid_ptr = &task .bindings - .get(binding as usize) + .get(binding_index as usize) .with_context(|| "invalid captured binding {binding}")? .document_uuid_ptr; @@ -380,9 +388,9 @@ pub fn recv_connector_captured( *node = doc::HeapNode::String(doc::BumpStr::from_str(crate::UUID_PLACEHOLDER, alloc)); } } - memtable.add(binding, doc, false)?; + memtable.add(binding_index, doc, false)?; - let stats = txn.stats.entry(binding).or_default(); + let stats = txn.stats.entry(binding_index).or_default(); stats.0.docs_total += 1; stats.0.bytes_total += doc_json.len() as u64; diff --git a/crates/runtime/src/materialize/protocol.rs b/crates/runtime/src/materialize/protocol.rs index b41a803dcc..358b418402 100644 --- a/crates/runtime/src/materialize/protocol.rs +++ b/crates/runtime/src/materialize/protocol.rs @@ -212,10 +212,14 @@ pub fn recv_client_load_or_flush( }) => { let binding = &task.bindings[binding_index as usize]; - let (memtable, _alloc, doc) = - accumulator - .doc_bytes_to_heap_node(doc_json.as_bytes()) - .context("couldn't parse materialized document as JSON")?; + let (memtable, _alloc, doc) = accumulator + .doc_bytes_to_heap_node(doc_json.as_bytes()) + .with_context(|| { + format!( + "couldn't parse source document as JSON (source {})", + binding.collection_name + ) + })?; // Encode the binding index and then the packed key as a single Bytes. buf.put_u32(binding_index); @@ -285,6 +289,7 @@ pub async fn recv_connector_acked_or_loaded_or_flushed( saw_acknowledged: &mut bool, saw_flush: &mut bool, saw_flushed: &mut bool, + task: &Task, txn: &mut Transaction, wb: &mut rocksdb::WriteBatch, ) -> anyhow::Result> { @@ -297,9 +302,16 @@ pub async fn recv_connector_acked_or_loaded_or_flushed( }), .. }) => { + let binding = &task.bindings[binding_index as usize]; + let (memtable, _alloc, doc) = accumulator .doc_bytes_to_heap_node(doc_json.as_bytes()) - .context("couldn't parse loaded document as JSON")?; + .with_context(|| { + format!( + "couldn't parse loaded document as JSON (source {})", + binding.collection_name + ) + })?; memtable.add(binding_index, doc, true)?; diff --git a/crates/runtime/src/materialize/serve.rs b/crates/runtime/src/materialize/serve.rs index 2526163e00..c4d86b00c2 100644 --- a/crates/runtime/src/materialize/serve.rs +++ b/crates/runtime/src/materialize/serve.rs @@ -169,6 +169,7 @@ async fn serve_session( &mut saw_acknowledged, &mut saw_flush, &mut saw_flushed, + &task, &mut txn, &mut wb, )