Skip to content

Commit

Permalink
runtime: add the source or target collection to parsing errors
Browse files Browse the repository at this point in the history
When these occur, it can be tedious to track down which binding caused
the failure. Adding the source or target collection name is key
debugging info to aide in investigations.
  • Loading branch information
jgraettinger committed Nov 4, 2024
1 parent e973bb1 commit 815f074
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 10 deletions.
18 changes: 13 additions & 5 deletions crates/runtime/src/capture/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,15 +363,23 @@ pub fn recv_connector_captured(
task: &Task,
txn: &mut Transaction,
) -> anyhow::Result<()> {
let response::Captured { binding, doc_json } = captured;
let response::Captured {
binding: binding_index,
doc_json,
} = captured;

let (memtable, alloc, mut doc) = accumulator
.doc_bytes_to_heap_node(doc_json.as_bytes())
.context("couldn't parse captured document as JSON")?;
.with_context(|| {
format!(
"couldn't parse captured document as JSON (target {})",
task.bindings[binding_index as usize].collection_name
)
})?;

let uuid_ptr = &task
.bindings
.get(binding as usize)
.get(binding_index as usize)
.with_context(|| "invalid captured binding {binding}")?
.document_uuid_ptr;

Expand All @@ -380,9 +388,9 @@ pub fn recv_connector_captured(
*node = doc::HeapNode::String(doc::BumpStr::from_str(crate::UUID_PLACEHOLDER, alloc));
}
}
memtable.add(binding, doc, false)?;
memtable.add(binding_index, doc, false)?;

let stats = txn.stats.entry(binding).or_default();
let stats = txn.stats.entry(binding_index).or_default();
stats.0.docs_total += 1;
stats.0.bytes_total += doc_json.len() as u64;

Expand Down
22 changes: 17 additions & 5 deletions crates/runtime/src/materialize/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,10 +212,14 @@ pub fn recv_client_load_or_flush(
}) => {
let binding = &task.bindings[binding_index as usize];

let (memtable, _alloc, doc) =
accumulator
.doc_bytes_to_heap_node(doc_json.as_bytes())
.context("couldn't parse materialized document as JSON")?;
let (memtable, _alloc, doc) = accumulator
.doc_bytes_to_heap_node(doc_json.as_bytes())
.with_context(|| {
format!(
"couldn't parse source document as JSON (source {})",
binding.collection_name
)
})?;

// Encode the binding index and then the packed key as a single Bytes.
buf.put_u32(binding_index);
Expand Down Expand Up @@ -285,6 +289,7 @@ pub async fn recv_connector_acked_or_loaded_or_flushed(
saw_acknowledged: &mut bool,
saw_flush: &mut bool,
saw_flushed: &mut bool,
task: &Task,
txn: &mut Transaction,
wb: &mut rocksdb::WriteBatch,
) -> anyhow::Result<Option<Response>> {
Expand All @@ -297,9 +302,16 @@ pub async fn recv_connector_acked_or_loaded_or_flushed(
}),
..
}) => {
let binding = &task.bindings[binding_index as usize];

let (memtable, _alloc, doc) = accumulator
.doc_bytes_to_heap_node(doc_json.as_bytes())
.context("couldn't parse loaded document as JSON")?;
.with_context(|| {
format!(
"couldn't parse loaded document as JSON (source {})",
binding.collection_name
)
})?;

memtable.add(binding_index, doc, true)?;

Expand Down
1 change: 1 addition & 0 deletions crates/runtime/src/materialize/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ async fn serve_session<L: LogHandler>(
&mut saw_acknowledged,
&mut saw_flush,
&mut saw_flushed,
&task,
&mut txn,
&mut wb,
)
Expand Down

0 comments on commit 815f074

Please sign in to comment.