Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dekaf: Support reads at a specific offset #1733

Merged
merged 3 commits into from
Oct 23, 2024
Merged

Conversation

jshearer
Copy link
Contributor

@jshearer jshearer commented Oct 23, 2024

This changes the way Dekaf reads from journals in order to support reads issued at a particular document offset. Previously, since the reported document offset is the last inclusive byte of that doc, a new Fetch request issued starting at the requested offset would not include that document. We didn't think this would cause issues as caught-up consumers ought to poll from the next offset, but we were seeing consumers instead try to fetch the last inclusive offset instead.

So this shifts new read requests to read from at most 16mb before the requested offset (the max document size is 16mb). While this will result in higher read latencies as a whole bunch of documents are read through and thrown away, it will allow consumers to fetch any document by the offset it was reported as being at.

Also included:

In order to debug what's happening with infrequent consumer lags, we want to keep track of how many read requests Dekaf is getting, broken down by collection and partition. This tracks that, as well as breaking down by read request type:

  • state => read_pending means the read request came in after a previous request had already fetched the same offset for the topic and partition, and that request is pending.
  • state => collection_not_found and state => partition_not_found are self-evident, though I believe a normal Kafka consumer would have figured out that these topics don't exist during the metadata discovery phase
  • state => new_data_preview_read indicates a fetch request that we flagged as being for a data preview. These requests have special handling and poison the entire connection to only be usable for other data preview reads
  • state => new_regular_read indicates a fetch request for a new topic/partition/offset combination.

Ex:

# TYPE dekaf_fetch_requests counter
dekaf_fetch_requests{topic_name="joseph/dekaf-testing",partition_index="0",state="new_regular_read"} 19
dekaf_fetch_requests{topic_name="joseph/dekaf-testing",partition_index="0",state="read_pending"} 248

This change is Reviewable

@jshearer
Copy link
Contributor Author

Tested this in dekaf-dev, new dataflows are still able to read docs and existing test dataflows still spit out newly written docs without interruption, so I don't think it'll break anything.

Copy link
Member

@jgraettinger jgraettinger left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM % comments

@@ -7,7 +7,10 @@ use gazette::journal::{ReadJsonLine, ReadJsonLines};
use gazette::{broker, journal, uuid};
use kafka_protocol::records::{Compression, TimestampType};
use lz4_flex::frame::BlockMode;
use std::time::{Duration, Instant};
use std::{
cmp::max,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: prefer to use these explicitly. std::time::Duration is not much longer than Duration and clarifies which duration it is -- because chrono and time and sqlx::types::chrono all also implement Duration.

std::cmp::max is similarly short to type and explicit for the reader.

Consider turning off auto-imports if this is happening too implicitly in your workflow.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will make the change. What's your heuristic here? I don't have auto-imports turned on, I manually import these from the rust-analyzer hints. It sounds like you're selecting for some combination of potential for confusion (Duration), and length of the fully-qualified name (std::cmp::max). We obviously shouldn't fully qualify everything, or at least the Rust devs don't think so as they provided use as a feature.

I wonder if I can find a combination of rust-analyzer/rustfmt rules that automate this 🤔

Copy link
Contributor Author

@jshearer jshearer Oct 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Want to acknowledge that I've asked this before and you've answered, so maybe nothing more needs to be said and I just need to put more thought into this, or try to improve the automation:

#765 (comment)

crates/dekaf/src/read.rs Outdated Show resolved Hide resolved
Do this by starting `Read`s up to the max size of one document before the requested offset
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants