Skip to content

Commit

Permalink
dekaf: Implement DeletionMode to allow representing deletions as a …
Browse files Browse the repository at this point in the history
…Kafka header instead of a tombstone
  • Loading branch information
jshearer committed Oct 25, 2024
1 parent 8356f99 commit 93ba814
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 24 deletions.
18 changes: 18 additions & 0 deletions crates/dekaf/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,19 @@ use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;

#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, JsonSchema, Copy)]
#[serde(rename_all = "snake_case")]
pub enum DeletionMode {
Default,
Header,
}

impl Default for DeletionMode {
fn default() -> Self {
Self::Default
}
}

/// Configures the behavior of a whole dekaf task
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, JsonSchema)]
pub struct DekafConfig {
Expand All @@ -15,6 +28,11 @@ pub struct DekafConfig {
// #[schemars(extend("secret" = true))]
#[schemars(schema_with = "token_secret")]
pub token: String,
/// How to handle deletion events. "Default" emits them as regular Kafka
/// tombstones with null values, and "Header" emits then as a kafka document
/// with empty string and `_is_deleted` header set to `1`. Setting this value
/// will also cause all other non-deletions to have an `_is_deleted` header of `0`.
pub deletions: DeletionMode,
}

/// Configures a particular binding in a Dekaf-type materialization
Expand Down
12 changes: 9 additions & 3 deletions crates/dekaf/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ mod api_client;
pub use api_client::KafkaApiClient;

use aes_siv::{aead::Aead, Aes256SivAead, KeyInit, KeySizeUser};
use connector::DekafConfig;
use connector::{DekafConfig, DeletionMode};
use flow_client::client::{refresh_authorizations, RefreshToken};
use percent_encoding::{percent_decode_str, utf8_percent_encode};
use serde::{Deserialize, Serialize};
Expand All @@ -42,10 +42,13 @@ pub struct App {
pub client_base: flow_client::Client,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, Copy)]
#[serde(deny_unknown_fields)]
pub struct DeprecatedConfigOptions {
#[serde(default = "bool::<false>")]
pub strict_topic_names: bool,
#[serde(default)]
pub deletions: DeletionMode,
}

pub struct Authenticated {
Expand Down Expand Up @@ -103,7 +106,9 @@ impl App {

let claims = flow_client::client::client_claims(&client)?;

if models::Materialization::regex().is_match(username.as_ref()) {
if models::Materialization::regex().is_match(username.as_ref())
&& !username.starts_with("{")
{
Ok(Authenticated {
client,
access_token: access,
Expand All @@ -119,6 +124,7 @@ impl App {
client,
task_config: DekafConfig {
strict_topic_names: config.strict_topic_names,
deletions: config.deletions,
token: "".to_string(),
},
access_token: access,
Expand Down
61 changes: 47 additions & 14 deletions crates/dekaf/src/read.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
use super::{Collection, Partition};
use crate::connector::DeletionMode;
use anyhow::bail;
use bytes::{Buf, BufMut, BytesMut};
use doc::AsNode;
use futures::StreamExt;
use gazette::journal::{ReadJsonLine, ReadJsonLines};
use gazette::{broker, journal, uuid};
use kafka_protocol::records::{Compression, TimestampType};
use kafka_protocol::{
protocol::StrBytes,
records::{Compression, TimestampType},
};
use lz4_flex::frame::BlockMode;

pub struct Read {
Expand All @@ -31,6 +35,8 @@ pub struct Read {
// Offset before which no documents should be emitted
offset_start: i64,

deletes: DeletionMode,

pub(crate) rewrite_offsets_from: Option<i64>,
}

Expand All @@ -50,6 +56,9 @@ pub enum ReadTarget {
}

const OFFSET_READBACK: i64 = 2 << 25 + 1; // 64mb, single document max size
const DELETION_HEADER: &str = "_is_deleted";
const DELETION_VAL_DELETED: &[u8] = &[1u8];
const DELETION_VAL_NOT_DELETED: &[u8] = &[0u8];

impl Read {
pub fn new(
Expand All @@ -60,6 +69,7 @@ impl Read {
key_schema_id: u32,
value_schema_id: u32,
rewrite_offsets_from: Option<i64>,
deletes: DeletionMode,
) -> Self {
let (not_before_sec, _) = collection.not_before.to_unix();

Expand Down Expand Up @@ -94,6 +104,7 @@ impl Read {

journal_name: partition.spec.name.clone(),
rewrite_offsets_from,
deletes,
offset_start: offset,
}
}
Expand Down Expand Up @@ -257,18 +268,26 @@ impl Read {
};

// Encode the value.
let value = if is_control || is_deletion {
None
} else {
tmp.push(0);
tmp.extend(self.value_schema_id.to_be_bytes());
() = avro::encode(&mut tmp, &self.value_schema, root.get())?;

record_bytes += tmp.len();
buf.extend_from_slice(&tmp);
tmp.clear();
Some(buf.split().freeze())
};
let value =
if is_control || (is_deletion && matches!(self.deletes, DeletionMode::Default)) {
None
} else if is_deletion && matches!(self.deletes, DeletionMode::Header) {
tmp.push(0);

record_bytes += tmp.len();
buf.extend_from_slice(&tmp);
tmp.clear();
Some(buf.split().freeze())
} else {
tmp.push(0);
tmp.extend(self.value_schema_id.to_be_bytes());
() = avro::encode(&mut tmp, &self.value_schema, root.get())?;

record_bytes += tmp.len();
buf.extend_from_slice(&tmp);
tmp.clear();
Some(buf.split().freeze())
};

self.offset = next_offset;

Expand All @@ -293,7 +312,21 @@ impl Read {

records.push(Record {
control: is_control,
headers: Default::default(),
headers: if matches!(self.deletes, DeletionMode::Header) {
let deletion_val = if is_deletion {
DELETION_VAL_DELETED
} else {
DELETION_VAL_NOT_DELETED
};
let mut headers = kafka_protocol::indexmap::IndexMap::new();
headers.insert(
StrBytes::from_static_str(DELETION_HEADER),
Some(bytes::Bytes::from_static(&deletion_val)),
);
headers
} else {
Default::default()
},
key,
offset: kafka_offset,
partition_leader_epoch: 1,
Expand Down
20 changes: 13 additions & 7 deletions crates/dekaf/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,13 +380,17 @@ impl Session {
..
} = request;

let mut client = self
.auth
.as_mut()
.ok_or(anyhow::anyhow!("Session not authenticated"))?
.authenticated_client()
.await?
.clone();
let (mut client, config) = {
let auth = self
.auth
.as_mut()
.ok_or(anyhow::anyhow!("Session not authenticated"))?;

(
auth.authenticated_client().await?.clone(),
auth.task_config.to_owned(),
)
};

let timeout = std::time::Duration::from_millis(max_wait_ms as u64);

Expand Down Expand Up @@ -537,6 +541,7 @@ impl Session {
key_schema_id,
value_schema_id,
Some(partition_request.fetch_offset - 1),
config.deletions,
)
.next_batch(
// Have to read at least 2 docs, as the very last doc
Expand Down Expand Up @@ -564,6 +569,7 @@ impl Session {
key_schema_id,
value_schema_id,
None,
config.deletions,
)
.next_batch(
crate::read::ReadTarget::Bytes(
Expand Down

0 comments on commit 93ba814

Please sign in to comment.