dekaf: Implement DeletionMode to allow representing deletions as a Kafka header instead of a tombstone #1738

Merged 3 commits on Oct 31, 2024
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

20 changes: 14 additions & 6 deletions crates/avro/src/lib.rs
@@ -42,19 +42,27 @@ pub enum Error {
ParseFloat(String, #[source] std::num::ParseFloatError),
}

/// Map a [`doc::Shape`] and key pointers into its equivalent AVRO schema.
pub fn shape_to_avro(
shape: doc::Shape,
key: &[doc::Pointer],
) -> (apache_avro::Schema, apache_avro::Schema) {
(
schema::key_to_avro(key, shape.clone()),
schema::shape_to_avro(json::Location::Root, shape, true),
)
}

/// Map a JSON schema bundle and key pointers into its equivalent AVRO schema.
pub fn json_schema_to_avro(
json_schema: &str,
schema: &str,
key: &[doc::Pointer],
) -> Result<(apache_avro::Schema, apache_avro::Schema), Error> {
let json_schema = doc::validation::build_bundle(json_schema)?;
let json_schema = doc::validation::build_bundle(schema)?;
let validator = doc::Validator::new(json_schema)?;
let shape = doc::Shape::infer(&validator.schemas()[0], validator.schema_index());

Ok((
schema::key_to_avro(key, shape.clone()),
schema::shape_to_avro(json::Location::Root, shape, true),
))
Ok(shape_to_avro(shape, key))
}

/// Encode a document into a binary AVRO representation using the given schema.
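A minimal usage sketch of the refactored entry points: `json_schema_to_avro` now infers a `doc::Shape` and delegates to the new `shape_to_avro`, so callers already holding a `Shape` can skip bundle validation. The JSON schema and key pointer below are illustrative, not taken from the PR:

```rust
// Sketch: derive key and value AVRO schemas from a JSON schema bundle.
// Schema and key are assumptions for illustration.
let json_schema = r#"{
  "type": "object",
  "properties": {
    "id": { "type": "string" },
    "count": { "type": "integer" }
  },
  "required": ["id"]
}"#;
let key = vec![doc::Pointer::from_str("/id")];
let (key_schema, value_schema) = avro::json_schema_to_avro(json_schema, &key)?;
```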
3 changes: 2 additions & 1 deletion crates/dekaf/Cargo.toml
@@ -22,12 +22,12 @@ ops = { path = "../ops" }
proto-flow = { path = "../proto-flow" }
proto-gazette = { path = "../proto-gazette" }
simd-doc = { path = "../simd-doc" }

anyhow = { workspace = true }
axum = { workspace = true }
axum-extra = { workspace = true }
axum-server = { workspace = true }
base64 = { workspace = true }
bumpalo = { workspace = true }
bytes = { workspace = true }
clap = { workspace = true }
crypto-common = { workspace = true }
@@ -37,6 +37,7 @@ hex = { workspace = true }
hexdump = { workspace = true }
itertools = { workspace = true }
kafka-protocol = { workspace = true }
lazy_static = { workspace = true }
lz4_flex = { workspace = true }
md5 = { workspace = true }
metrics = { workspace = true }
24 changes: 24 additions & 0 deletions crates/dekaf/src/connector.rs
@@ -4,6 +4,25 @@ use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;

#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, JsonSchema, Copy)]
#[serde(rename_all = "snake_case")]
pub enum DeletionMode {
// Handles deletions using the regular Kafka upsert envelope, where a deletion
// is represented by a record containing the key that was deleted, and a null value.
Kafka,
// Handles deletions by passing through the full deletion document as it exists
// in the source collection, as well as including a new field `_meta/is_deleted`
// which is defined as the number `1` on deletions, and `0` otherwise.
#[serde(rename = "cdc")]
CDC,
}

impl Default for DeletionMode {
fn default() -> Self {
Self::Kafka
}
}

/// Configures the behavior of a whole dekaf task
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, JsonSchema)]
pub struct DekafConfig {
@@ -15,6 +34,11 @@ pub struct DekafConfig {
// #[schemars(extend("secret" = true))]
#[schemars(schema_with = "token_secret")]
pub token: String,
/// How to handle deletion events. "kafka" emits them as regular Kafka
/// tombstones with null values, and "cdc" emits the full deletion document
/// with a `_meta/is_deleted` field set to `1`. In "cdc" mode, all other
/// non-deletion documents also carry `_meta/is_deleted` set to `0`.
pub deletions: DeletionMode,
}

/// Configures a particular binding in a Dekaf-type materialization
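A quick sketch of the wire format those serde attributes produce. This is a standalone re-declaration of the enum for illustration, not the crate's own definition or tests:

```rust
use serde::{Deserialize, Serialize};

// Sketch: mirrors DeletionMode's serde attributes to show its wire format.
#[derive(Serialize, Deserialize, Debug, PartialEq)]
#[serde(rename_all = "snake_case")]
enum DeletionMode {
    Kafka,
    #[serde(rename = "cdc")]
    Cdc,
}

fn main() {
    // "kafka": classic upsert envelope with null-value tombstones.
    // "cdc": full deletion document plus a /_meta/is_deleted field.
    assert_eq!(serde_json::to_string(&DeletionMode::Kafka).unwrap(), "\"kafka\"");
    assert_eq!(
        serde_json::from_str::<DeletionMode>("\"cdc\"").unwrap(),
        DeletionMode::Cdc
    );
}
```

The explicit `rename = "cdc"` is needed because `rename_all = "snake_case"` would otherwise split the all-caps variant name.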
12 changes: 9 additions & 3 deletions crates/dekaf/src/lib.rs
@@ -23,7 +23,7 @@ mod api_client;
pub use api_client::KafkaApiClient;

use aes_siv::{aead::Aead, Aes256SivAead, KeyInit, KeySizeUser};
use connector::DekafConfig;
use connector::{DekafConfig, DeletionMode};
use flow_client::client::{refresh_authorizations, RefreshToken};
use percent_encoding::{percent_decode_str, utf8_percent_encode};
use serde::{Deserialize, Serialize};
@@ -42,10 +42,13 @@ pub struct App {
pub client_base: flow_client::Client,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, Copy)]
#[serde(deny_unknown_fields)]
pub struct DeprecatedConfigOptions {
#[serde(default = "bool::<false>")]
pub strict_topic_names: bool,
#[serde(default)]
pub deletions: DeletionMode,
}

pub struct Authenticated {
@@ -103,7 +106,9 @@ impl App {

let claims = flow_client::client::client_claims(&client)?;

if models::Materialization::regex().is_match(username.as_ref()) {
if models::Materialization::regex().is_match(username.as_ref())
&& !username.starts_with("{")
{
Ok(Authenticated {
client,
access_token: access,
@@ -119,6 +124,7 @@
client,
client,
task_config: DekafConfig {
strict_topic_names: config.strict_topic_names,
deletions: config.deletions,
token: "".to_string(),
},
access_token: access,
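For context, the new `!username.starts_with("{")` guard keeps JSON-blob usernames out of the materialization-name path: judging by that guard and the surrounding fallback, deprecated tasks encode their whole config as the SASL username. A minimal sketch of that legacy parse, with illustrative field values:

```rust
// Sketch: a deprecated-style username is a JSON config blob rather than
// a task name. These values are assumptions for illustration.
let username = r#"{"strict_topic_names": false, "deletions": "cdc"}"#;
let config: DeprecatedConfigOptions = serde_json::from_str(username)?;
assert!(matches!(config.deletions, DeletionMode::CDC));
assert!(!config.strict_topic_names);
```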
58 changes: 44 additions & 14 deletions crates/dekaf/src/read.rs
@@ -1,11 +1,16 @@
use super::{Collection, Partition};
use anyhow::bail;
use crate::connector::DeletionMode;
use anyhow::{bail, Context};
use bytes::{Buf, BufMut, BytesMut};
use doc::AsNode;
use doc::{heap::ArchivedNode, AsNode, HeapNode, OwnedArchivedNode};
use futures::StreamExt;
use gazette::journal::{ReadJsonLine, ReadJsonLines};
use gazette::{broker, journal, uuid};
use kafka_protocol::records::{Compression, TimestampType};
use kafka_protocol::{
protocol::StrBytes,
records::{Compression, TimestampType},
};
use lazy_static::lazy_static;
use lz4_flex::frame::BlockMode;

pub struct Read {
@@ -31,6 +36,8 @@ pub struct Read {
// Offset before which no documents should be emitted
offset_start: i64,

deletes: DeletionMode,

pub(crate) rewrite_offsets_from: Option<i64>,
}

@@ -49,6 +56,10 @@ pub enum ReadTarget {
Docs(usize),
}

lazy_static! {
static ref DELETION_INDICATOR_PTR: doc::Pointer = doc::Pointer::from_str("/_meta/is_deleted");
}

impl Read {
pub fn new(
client: journal::Client,
@@ -58,6 +69,7 @@
key_schema_id: u32,
value_schema_id: u32,
rewrite_offsets_from: Option<i64>,
deletes: DeletionMode,
) -> Self {
let (not_before_sec, _) = collection.not_before.to_unix();

@@ -91,6 +103,7 @@

journal_name: partition.spec.name.clone(),
rewrite_offsets_from,
deletes,
offset_start: offset,
}
}
@@ -105,6 +118,8 @@ impl Read {
Compression, Record, RecordBatchEncoder, RecordEncodeOptions,
};

let mut alloc = bumpalo::Bump::new();

let mut records: Vec<Record> = Vec::new();
let mut records_bytes: usize = 0;

@@ -254,18 +269,33 @@
};

// Encode the value.
let value = if is_control || is_deletion {
None
} else {
tmp.push(0);
tmp.extend(self.value_schema_id.to_be_bytes());
() = avro::encode(&mut tmp, &self.value_schema, root.get())?;
let value =
if is_control || (is_deletion && matches!(self.deletes, DeletionMode::Kafka)) {
None
} else {
tmp.push(0);
tmp.extend(self.value_schema_id.to_be_bytes());

if matches!(self.deletes, DeletionMode::CDC) {
let mut heap_node = HeapNode::from_node(root.get(), &alloc);
let indicator = DELETION_INDICATOR_PTR
.create_heap_node(&mut heap_node, &alloc)
.context("Unable to add deletion meta indicator")?;

*indicator = HeapNode::PosInt(if is_deletion { 1 } else { 0 });

() = avro::encode(&mut tmp, &self.value_schema, &heap_node)?;

alloc.reset();
} else {
() = avro::encode(&mut tmp, &self.value_schema, root.get())?;
}

record_bytes += tmp.len();
buf.extend_from_slice(&tmp);
tmp.clear();
Some(buf.split().freeze())
};
record_bytes += tmp.len();
buf.extend_from_slice(&tmp);
tmp.clear();
Some(buf.split().freeze())
};

self.offset = next_offset;

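The core of the CDC path is splicing the indicator field into a mutable heap copy of the document before AVRO-encoding it. A standalone sketch of the same technique, assuming the `doc` and `bumpalo` APIs used in the diff above; this is not the crate's actual helper:

```rust
use anyhow::Context;
use doc::{AsNode, HeapNode, Pointer};

// Sketch: copy a document into a bump arena and set /_meta/is_deleted,
// mirroring the CDC branch above.
fn with_deletion_flag<'a, N: AsNode>(
    root: &N,
    is_deleted: bool,
    alloc: &'a bumpalo::Bump,
) -> anyhow::Result<HeapNode<'a>> {
    let ptr = Pointer::from_str("/_meta/is_deleted");
    // Rebuild the (immutable) archived document as a mutable HeapNode.
    let mut heap = HeapNode::from_node(root, alloc);
    // Create (or locate) the node addressed by the pointer.
    let slot = ptr
        .create_heap_node(&mut heap, alloc)
        .context("unable to create /_meta/is_deleted")?;
    *slot = HeapNode::PosInt(if is_deleted { 1 } else { 0 });
    Ok(heap)
}
```

Resetting the arena after each record, as the diff does with `alloc.reset()`, keeps the per-document allocations from accumulating across the batch.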
8 changes: 6 additions & 2 deletions crates/dekaf/src/registry.rs
@@ -73,8 +73,11 @@ async fn get_subject_latest(
axum::extract::Path(subject): axum::extract::Path<String>,
) -> Response {
wrap(async move {
let Authenticated { client, .. } =
app.authenticate(auth.username(), auth.password()).await?;
let Authenticated {
client,
task_config,
..
} = app.authenticate(auth.username(), auth.password()).await?;

let (is_key, collection) = if subject.ends_with("-value") {
(false, &subject[..subject.len() - 6])
@@ -89,6 +92,7 @@
&from_downstream_topic_name(TopicName::from(StrBytes::from_string(
collection.to_string(),
))),
task_config.deletions,
)
.await
.context("failed to fetch collection metadata")?