diff --git a/crates/doc/src/ser.rs b/crates/doc/src/ser.rs index c713811509..3b968bb921 100644 --- a/crates/doc/src/ser.rs +++ b/crates/doc/src/ser.rs @@ -22,6 +22,12 @@ pub struct SerPolicy { } impl SerPolicy { + /// Maximum node depth which SerPolicy will serialize. + /// Properties and array items below this depth will be omitted. + // This depth is effectively constrained by serde_json, which has a generous + // but hard limit. We follow their lead. + const MAX_DEPTH: usize = 126; + pub const fn noop() -> Self { Self { str_truncate_after: usize::MAX, @@ -42,19 +48,19 @@ impl SerPolicy { &'p self, node: &'n N, truncation_indicator: &'s AtomicBool, - ) -> SerNode<'p, 'n, 's, N, Root> { + ) -> SerNode<'p, 'n, 's, N> { SerNode { - _marker: std::marker::PhantomData::, node, + depth: 0, policy: self, truncation_indicator: Some(truncation_indicator), } } - pub fn on<'p, 'n, N: AsNode>(&'p self, node: &'n N) -> SerNode<'p, 'n, 'static, N, Root> { + pub fn on<'p, 'n, N: AsNode>(&'p self, node: &'n N) -> SerNode<'p, 'n, 'static, N> { SerNode { - _marker: std::marker::PhantomData::, node, + depth: 0, policy: self, truncation_indicator: None, } @@ -120,27 +126,11 @@ impl SerPolicy { } } -pub struct Root; -pub struct Nested; -trait DepthIndicator { - fn is_nested() -> bool; -} -impl DepthIndicator for Root { - fn is_nested() -> bool { - false - } -} -impl DepthIndicator for Nested { - fn is_nested() -> bool { - true - } -} - -pub struct SerNode<'p, 'n, 's, N: AsNode, Depth> { - _marker: std::marker::PhantomData, - truncation_indicator: Option<&'s AtomicBool>, +pub struct SerNode<'p, 'n, 's, N: AsNode> { node: &'n N, + depth: usize, policy: &'p SerPolicy, + truncation_indicator: Option<&'s AtomicBool>, } pub struct SerLazy<'p, 'alloc, 'n, N: AsNode> { @@ -149,32 +139,33 @@ pub struct SerLazy<'p, 'alloc, 'n, N: AsNode> { } pub struct SerOwned<'p, 's> { - truncation_indicator: Option<&'s AtomicBool>, node: &'p OwnedNode, policy: &'p SerPolicy, + truncation_indicator: Option<&'s AtomicBool>, } -impl<'p, 'n, 's, N: AsNode, Depth: DepthIndicator> serde::Serialize - for SerNode<'p, 'n, 's, N, Depth> -{ +impl<'p, 'n, 's, N: AsNode> serde::Serialize for SerNode<'p, 'n, 's, N> { fn serialize(&self, serializer: S) -> Result where S: ::serde::Serializer, { match self.node.as_node() { Node::Array(arr) => { + let item_limit = if self.depth < SerPolicy::MAX_DEPTH { + self.policy.array_truncate_after + } else { + 0 + }; if let Some(indicator) = self.truncation_indicator { - if arr.len() > self.policy.array_truncate_after { + if arr.len() > item_limit { indicator.store(true, Ordering::SeqCst); } } - serializer.collect_seq(arr.iter().take(self.policy.array_truncate_after).map(|d| { - SerNode { - _marker: std::marker::PhantomData::, - truncation_indicator: self.truncation_indicator, - node: d, - policy: self.policy, - } + serializer.collect_seq(arr.iter().take(item_limit).map(|d| SerNode { + node: d, + depth: self.depth + 1, + policy: self.policy, + truncation_indicator: self.truncation_indicator, })) } Node::Bool(b) => serializer.serialize_bool(b), @@ -193,10 +184,12 @@ impl<'p, 'n, 's, N: AsNode, Depth: DepthIndicator> serde::Serialize Node::NegInt(n) => serializer.serialize_i64(n), Node::PosInt(n) => serializer.serialize_u64(n), Node::Object(fields) => { - let key_limit = if Depth::is_nested() { + let key_limit = if self.depth == 0 { + usize::MAX + } else if self.depth < SerPolicy::MAX_DEPTH { self.policy.nested_obj_truncate_after } else { - usize::MAX + 0 }; if let Some(indicator) = self.truncation_indicator { if fields.len() > key_limit { @@ -207,10 +200,10 @@ impl<'p, 'n, 's, N: AsNode, Depth: DepthIndicator> serde::Serialize ( field.property(), SerNode { - _marker: std::marker::PhantomData::, - truncation_indicator: self.truncation_indicator.clone(), node: field.value(), + depth: self.depth + 1, policy: self.policy, + truncation_indicator: self.truncation_indicator, }, ) })) @@ -224,7 +217,7 @@ impl<'p, 'n, 's, N: AsNode, Depth: DepthIndicator> serde::Serialize } // SerNode may be packed as a FoundationDB tuple. -impl<'p, 'n, 's, N: AsNode> tuple::TuplePack for SerNode<'p, 'n, 's, N, Root> { +impl<'p, 'n, 's, N: AsNode> tuple::TuplePack for SerNode<'p, 'n, 's, N> { fn pack( &self, w: &mut W, @@ -255,18 +248,18 @@ impl serde::Serialize for SerLazy<'_, '_, '_, N> { { match &self.node { LazyNode::Heap(n) => SerNode { - _marker: std::marker::PhantomData::, - truncation_indicator: None, node: *n, + depth: 0, policy: self.policy, + truncation_indicator: None, } .serialize(serializer), LazyNode::Node(n) => SerNode { - _marker: std::marker::PhantomData::, - truncation_indicator: None, node: *n, + depth: 0, policy: self.policy, + truncation_indicator: None, } .serialize(serializer), } @@ -280,18 +273,18 @@ impl serde::Serialize for SerOwned<'_, '_> { { match &self.node { OwnedNode::Heap(n) => SerNode { - _marker: std::marker::PhantomData::, - truncation_indicator: self.truncation_indicator, node: n.get(), + depth: 0, policy: self.policy, + truncation_indicator: self.truncation_indicator, } .serialize(serializer), OwnedNode::Archived(n) => SerNode { - _marker: std::marker::PhantomData::, - truncation_indicator: self.truncation_indicator, node: n.get(), + depth: 0, policy: self.policy, + truncation_indicator: self.truncation_indicator, } .serialize(serializer), } @@ -439,6 +432,26 @@ mod test { assert_array_len(&result, "/a", 3); } + #[test] + fn test_ser_policy_deep_nesting() { + let (mut arr, mut obj) = (json!(1234), json!(1234)); + for _ in 0..SerPolicy::MAX_DEPTH + 20 { + arr = json!([arr]); + obj = json!({"p": obj}); + } + let input = json!({"arr": arr, "obj": obj}); + + let policy = SerPolicy { + ..SerPolicy::noop() + }; + let indicator = AtomicBool::new(false); + + // Expect that our overly nested `input` document is truncated and + // is able to be parsed by serde_json. + let _result = round_trip_serde(&policy, input, &indicator); + assert!(indicator.load(Ordering::SeqCst)); + } + fn round_trip_serde(policy: &SerPolicy, input: Value, indicator: &AtomicBool) -> Value { assert!( !indicator.load(Ordering::SeqCst), diff --git a/crates/runtime/src/capture/protocol.rs b/crates/runtime/src/capture/protocol.rs index 1135d49fd5..d443f66162 100644 --- a/crates/runtime/src/capture/protocol.rs +++ b/crates/runtime/src/capture/protocol.rs @@ -363,15 +363,23 @@ pub fn recv_connector_captured( task: &Task, txn: &mut Transaction, ) -> anyhow::Result<()> { - let response::Captured { binding, doc_json } = captured; + let response::Captured { + binding: binding_index, + doc_json, + } = captured; let (memtable, alloc, mut doc) = accumulator .doc_bytes_to_heap_node(doc_json.as_bytes()) - .context("couldn't parse captured document as JSON")?; + .with_context(|| { + format!( + "couldn't parse captured document as JSON (target {})", + task.bindings[binding_index as usize].collection_name + ) + })?; let uuid_ptr = &task .bindings - .get(binding as usize) + .get(binding_index as usize) .with_context(|| "invalid captured binding {binding}")? .document_uuid_ptr; @@ -380,9 +388,9 @@ pub fn recv_connector_captured( *node = doc::HeapNode::String(doc::BumpStr::from_str(crate::UUID_PLACEHOLDER, alloc)); } } - memtable.add(binding, doc, false)?; + memtable.add(binding_index, doc, false)?; - let stats = txn.stats.entry(binding).or_default(); + let stats = txn.stats.entry(binding_index).or_default(); stats.0.docs_total += 1; stats.0.bytes_total += doc_json.len() as u64; diff --git a/crates/runtime/src/materialize/protocol.rs b/crates/runtime/src/materialize/protocol.rs index b41a803dcc..358b418402 100644 --- a/crates/runtime/src/materialize/protocol.rs +++ b/crates/runtime/src/materialize/protocol.rs @@ -212,10 +212,14 @@ pub fn recv_client_load_or_flush( }) => { let binding = &task.bindings[binding_index as usize]; - let (memtable, _alloc, doc) = - accumulator - .doc_bytes_to_heap_node(doc_json.as_bytes()) - .context("couldn't parse materialized document as JSON")?; + let (memtable, _alloc, doc) = accumulator + .doc_bytes_to_heap_node(doc_json.as_bytes()) + .with_context(|| { + format!( + "couldn't parse source document as JSON (source {})", + binding.collection_name + ) + })?; // Encode the binding index and then the packed key as a single Bytes. buf.put_u32(binding_index); @@ -285,6 +289,7 @@ pub async fn recv_connector_acked_or_loaded_or_flushed( saw_acknowledged: &mut bool, saw_flush: &mut bool, saw_flushed: &mut bool, + task: &Task, txn: &mut Transaction, wb: &mut rocksdb::WriteBatch, ) -> anyhow::Result> { @@ -297,9 +302,16 @@ pub async fn recv_connector_acked_or_loaded_or_flushed( }), .. }) => { + let binding = &task.bindings[binding_index as usize]; + let (memtable, _alloc, doc) = accumulator .doc_bytes_to_heap_node(doc_json.as_bytes()) - .context("couldn't parse loaded document as JSON")?; + .with_context(|| { + format!( + "couldn't parse loaded document as JSON (source {})", + binding.collection_name + ) + })?; memtable.add(binding_index, doc, true)?; diff --git a/crates/runtime/src/materialize/serve.rs b/crates/runtime/src/materialize/serve.rs index 2526163e00..c4d86b00c2 100644 --- a/crates/runtime/src/materialize/serve.rs +++ b/crates/runtime/src/materialize/serve.rs @@ -169,6 +169,7 @@ async fn serve_session( &mut saw_acknowledged, &mut saw_flush, &mut saw_flushed, + &task, &mut txn, &mut wb, )