From 93ba814e1a348d2a1b85a3f076fbcd2d6e550ca5 Mon Sep 17 00:00:00 2001 From: Joseph Shearer Date: Fri, 25 Oct 2024 11:19:25 -0400 Subject: [PATCH] dekaf: Implement `DeletionMode` to allow representing deletions as a Kafka header instead of a tombstone --- crates/dekaf/src/connector.rs | 18 +++++++++++ crates/dekaf/src/lib.rs | 12 +++++-- crates/dekaf/src/read.rs | 61 +++++++++++++++++++++++++++-------- crates/dekaf/src/session.rs | 20 ++++++++---- 4 files changed, 87 insertions(+), 24 deletions(-) diff --git a/crates/dekaf/src/connector.rs b/crates/dekaf/src/connector.rs index 253bdbb2f9..4edcf44849 100644 --- a/crates/dekaf/src/connector.rs +++ b/crates/dekaf/src/connector.rs @@ -4,6 +4,19 @@ use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, JsonSchema, Copy)] +#[serde(rename_all = "snake_case")] +pub enum DeletionMode { + Default, + Header, +} + +impl Default for DeletionMode { + fn default() -> Self { + Self::Default + } +} + /// Configures the behavior of a whole dekaf task #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, JsonSchema)] pub struct DekafConfig { @@ -15,6 +28,11 @@ pub struct DekafConfig { // #[schemars(extend("secret" = true))] #[schemars(schema_with = "token_secret")] pub token: String, + /// How to handle deletion events. "Default" emits them as regular Kafka + /// tombstones with null values, and "Header" emits them as a Kafka document + /// with empty string and `_is_deleted` header set to `1`. Setting this value + /// will also cause all other non-deletions to have an `_is_deleted` header of `0`. 
+ pub deletions: DeletionMode, } /// Configures a particular binding in a Dekaf-type materialization diff --git a/crates/dekaf/src/lib.rs b/crates/dekaf/src/lib.rs index 3648532a93..069e7c4bc9 100644 --- a/crates/dekaf/src/lib.rs +++ b/crates/dekaf/src/lib.rs @@ -23,7 +23,7 @@ mod api_client; pub use api_client::KafkaApiClient; use aes_siv::{aead::Aead, Aes256SivAead, KeyInit, KeySizeUser}; -use connector::DekafConfig; +use connector::{DekafConfig, DeletionMode}; use flow_client::client::{refresh_authorizations, RefreshToken}; use percent_encoding::{percent_decode_str, utf8_percent_encode}; use serde::{Deserialize, Serialize}; @@ -42,10 +42,13 @@ pub struct App { pub client_base: flow_client::Client, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, Copy)] +#[serde(deny_unknown_fields)] pub struct DeprecatedConfigOptions { #[serde(default = "bool::")] pub strict_topic_names: bool, + #[serde(default)] + pub deletions: DeletionMode, } pub struct Authenticated { @@ -103,7 +106,9 @@ impl App { let claims = flow_client::client::client_claims(&client)?; - if models::Materialization::regex().is_match(username.as_ref()) { + if models::Materialization::regex().is_match(username.as_ref()) + && !username.starts_with("{") + { Ok(Authenticated { client, access_token: access, @@ -119,6 +124,7 @@ impl App { client, task_config: DekafConfig { strict_topic_names: config.strict_topic_names, + deletions: config.deletions, token: "".to_string(), }, access_token: access, diff --git a/crates/dekaf/src/read.rs b/crates/dekaf/src/read.rs index 4b676bf44d..19af1bc5b4 100644 --- a/crates/dekaf/src/read.rs +++ b/crates/dekaf/src/read.rs @@ -1,11 +1,15 @@ use super::{Collection, Partition}; +use crate::connector::DeletionMode; use anyhow::bail; use bytes::{Buf, BufMut, BytesMut}; use doc::AsNode; use futures::StreamExt; use gazette::journal::{ReadJsonLine, ReadJsonLines}; use gazette::{broker, journal, uuid}; -use 
kafka_protocol::records::{Compression, TimestampType}; +use kafka_protocol::{ + protocol::StrBytes, + records::{Compression, TimestampType}, +}; use lz4_flex::frame::BlockMode; pub struct Read { @@ -31,6 +35,8 @@ pub struct Read { // Offset before which no documents should be emitted offset_start: i64, + deletes: DeletionMode, + pub(crate) rewrite_offsets_from: Option, } @@ -50,6 +56,9 @@ pub enum ReadTarget { } const OFFSET_READBACK: i64 = 2 << 25 + 1; // 64mb, single document max size +const DELETION_HEADER: &str = "_is_deleted"; +const DELETION_VAL_DELETED: &[u8] = &[1u8]; +const DELETION_VAL_NOT_DELETED: &[u8] = &[0u8]; impl Read { pub fn new( @@ -60,6 +69,7 @@ impl Read { key_schema_id: u32, value_schema_id: u32, rewrite_offsets_from: Option, + deletes: DeletionMode, ) -> Self { let (not_before_sec, _) = collection.not_before.to_unix(); @@ -94,6 +104,7 @@ impl Read { journal_name: partition.spec.name.clone(), rewrite_offsets_from, + deletes, offset_start: offset, } } @@ -257,18 +268,26 @@ impl Read { }; // Encode the value. 
- let value = if is_control || is_deletion { - None - } else { - tmp.push(0); - tmp.extend(self.value_schema_id.to_be_bytes()); - () = avro::encode(&mut tmp, &self.value_schema, root.get())?; - - record_bytes += tmp.len(); - buf.extend_from_slice(&tmp); - tmp.clear(); - Some(buf.split().freeze()) - }; + let value = + if is_control || (is_deletion && matches!(self.deletes, DeletionMode::Default)) { + None + } else if is_deletion && matches!(self.deletes, DeletionMode::Header) { + tmp.push(0); + + record_bytes += tmp.len(); + buf.extend_from_slice(&tmp); + tmp.clear(); + Some(buf.split().freeze()) + } else { + tmp.push(0); + tmp.extend(self.value_schema_id.to_be_bytes()); + () = avro::encode(&mut tmp, &self.value_schema, root.get())?; + + record_bytes += tmp.len(); + buf.extend_from_slice(&tmp); + tmp.clear(); + Some(buf.split().freeze()) + }; self.offset = next_offset; @@ -293,7 +312,21 @@ impl Read { records.push(Record { control: is_control, - headers: Default::default(), + headers: if matches!(self.deletes, DeletionMode::Header) { + let deletion_val = if is_deletion { + DELETION_VAL_DELETED + } else { + DELETION_VAL_NOT_DELETED + }; + let mut headers = kafka_protocol::indexmap::IndexMap::new(); + headers.insert( + StrBytes::from_static_str(DELETION_HEADER), + Some(bytes::Bytes::from_static(&deletion_val)), + ); + headers + } else { + Default::default() + }, key, offset: kafka_offset, partition_leader_epoch: 1, diff --git a/crates/dekaf/src/session.rs b/crates/dekaf/src/session.rs index 9d6e816560..e83512726d 100644 --- a/crates/dekaf/src/session.rs +++ b/crates/dekaf/src/session.rs @@ -380,13 +380,17 @@ impl Session { .. } = request; - let mut client = self - .auth - .as_mut() - .ok_or(anyhow::anyhow!("Session not authenticated"))? - .authenticated_client() - .await? 
- .clone(); + let (mut client, config) = { + let auth = self + .auth + .as_mut() + .ok_or(anyhow::anyhow!("Session not authenticated"))?; + + ( + auth.authenticated_client().await?.clone(), + auth.task_config.to_owned(), + ) + }; let timeout = std::time::Duration::from_millis(max_wait_ms as u64); @@ -537,6 +541,7 @@ impl Session { key_schema_id, value_schema_id, Some(partition_request.fetch_offset - 1), + config.deletions, ) .next_batch( // Have to read at least 2 docs, as the very last doc @@ -564,6 +569,7 @@ impl Session { key_schema_id, value_schema_id, None, + config.deletions, ) .next_batch( crate::read::ReadTarget::Bytes(