Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: make privatelink.endpoint accept json array #20154

Merged
merged 7 commits into from
Jan 23, 2025
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 136 additions & 4 deletions src/connector/src/source/kafka/private_link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use crate::error::ConnectorResult;
use crate::source::kafka::{KAFKA_PROPS_BROKER_KEY, KAFKA_PROPS_BROKER_KEY_ALIAS};

pub const PRIVATELINK_ENDPOINT_KEY: &str = "privatelink.endpoint";
tabVersion marked this conversation as resolved.
Show resolved Hide resolved
const PRIVATELINK_ENDPOINT_HOST_KEY: &str = "host";

#[derive(Debug)]
pub(super) enum PrivateLinkContextRole {
Expand Down Expand Up @@ -135,10 +136,18 @@ pub fn insert_privatelink_broker_rewrite_map(
}

if let Some(endpoint) = privatelink_endpoint {
for (link, broker) in link_targets.iter().zip_eq_fast(broker_addrs.into_iter()) {
// rewrite the broker address to endpoint:port
broker_rewrite_map.insert(broker.to_owned(), format!("{}:{}", &endpoint, link.port));
}
// new syntax: endpoint can either be a string or a json array of strings
// if it is a string, rewrite all broker addresses to the same endpoint
// eg. privatelink.endpoint='some_url' ==> broker1:9092 -> some_url:9092, broker2:9093 -> some_url:9093
// if it is a json array, rewrite each broker address to the corresponding endpoint
// eg. privatelink.endpoint = '[{"host": "aaaa"}, {"host": "bbbb"}, {"host": "cccc"}]'
// ==> broker1:9092 -> aaaa:9092, broker2:9093 -> bbbb:9093, broker3:9094 -> cccc:9094
handle_privatelink_endpoint(
&endpoint,
&mut broker_rewrite_map,
&link_targets,
&broker_addrs,
)?;
} else {
if svc.is_none() {
bail!("Privatelink endpoint not found.");
Expand Down Expand Up @@ -166,3 +175,126 @@ pub fn insert_privatelink_broker_rewrite_map(
with_options.insert(PRIVATE_LINK_BROKER_REWRITE_MAP_KEY.to_owned(), json);
Ok(())
}

fn handle_privatelink_endpoint(
endpoint: &str,
broker_rewrite_map: &mut HashMap<String, String>,
link_targets: &[AwsPrivateLinkItem],
broker_addrs: &[&str],
) -> ConnectorResult<()> {
let endpoint = if let Ok(json) = serde_json::from_str::<serde_json::Value>(endpoint) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here we introduce a potential risk is if the user input an illegal json, which cannot be parsed, we will see it as a string, instead of returning error. The case seems acceptable to me because he cannot connect to the broker using the wrong endpoint addr and he will find out later.

json
} else {
serde_json::Value::String(endpoint.to_owned())
};
if matches!(endpoint, serde_json::Value::String(_)) {
let endpoint = endpoint.as_str().unwrap();
for (link, broker) in link_targets.iter().zip_eq_fast(broker_addrs.iter()) {
// rewrite the broker address to endpoint:port
broker_rewrite_map.insert(broker.to_string(), format!("{}:{}", endpoint, link.port));
}
} else if matches!(endpoint, serde_json::Value::Array(_)) {
let endpoint_list = endpoint.as_array().unwrap();
for ((link, broker), endpoint) in link_targets
.iter()
.zip_eq_fast(broker_addrs.iter())
.zip_eq_fast(endpoint_list.iter())
{
let host = endpoint.get(PRIVATELINK_ENDPOINT_HOST_KEY).ok_or_else(|| {
anyhow!(
"privatelink.endpoint's item does not contain key `{}`: {}",
PRIVATELINK_ENDPOINT_HOST_KEY,
endpoint
)
})?;
// rewrite the broker address to endpoint:port
broker_rewrite_map.insert(
broker.to_string(),
format!("{}:{}", host.as_str().unwrap(), link.port),
tabVersion marked this conversation as resolved.
Show resolved Hide resolved
);
}
} else {
unreachable!()
tabVersion marked this conversation as resolved.
Show resolved Hide resolved
}

Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Runs `handle_privatelink_endpoint` against a fresh map and returns that map on success.
    fn rewrite(
        endpoint: &str,
        link_targets: &[AwsPrivateLinkItem],
        broker_addrs: &[&str],
    ) -> ConnectorResult<HashMap<String, String>> {
        let mut broker_rewrite_map = HashMap::new();
        handle_privatelink_endpoint(
            endpoint,
            &mut broker_rewrite_map,
            link_targets,
            broker_addrs,
        )?;
        Ok(broker_rewrite_map)
    }

    #[test]
    fn test_handle_privatelink_endpoint() {
        // Builds a link target that only differs by its port.
        let target = |port| AwsPrivateLinkItem { az_id: None, port };

        // example 1: raw string, every broker is mapped onto the same host
        let two_targets = vec![target(9092), target(9093)];
        let two_brokers = vec!["broker1:9092", "broker2:9093"];
        let rewritten = rewrite("some_url", &two_targets, &two_brokers).unwrap();

        let expected = [
            ("broker1:9092", "some_url:9092"),
            ("broker2:9093", "some_url:9093"),
        ];
        assert_eq!(rewritten.len(), 2);
        for &(broker, address) in &expected {
            assert_eq!(rewritten[broker], address);
        }

        // example 2: json array, brokers are paired with hosts by position
        let three_targets = vec![target(9092), target(9093), target(9094)];
        let three_brokers = vec!["broker1:9092", "broker2:9093", "broker3:9094"];
        let rewritten = rewrite(
            r#"[{"host": "aaaa"}, {"host": "bbbb"}, {"host": "cccc"}]"#,
            &three_targets,
            &three_brokers,
        )
        .unwrap();

        let expected = [
            ("broker1:9092", "aaaa:9092"),
            ("broker2:9093", "bbbb:9093"),
            ("broker3:9094", "cccc:9094"),
        ];
        assert_eq!(rewritten.len(), 3);
        for &(broker, address) in &expected {
            assert_eq!(rewritten[broker], address);
        }

        // no `host` in the json array: the first offending item is reported
        let err = rewrite(
            r#"[{"somekey_1": "aaaa"}, {"somekey_2": "bbbb"}, {"somekey_3": "cccc"}]"#,
            &three_targets,
            &three_brokers,
        )
        .unwrap_err();
        assert_eq!(
            err.to_string(),
            "privatelink.endpoint's item does not contain key `host`: {\"somekey_1\":\"aaaa\"}"
        );
    }
}
Loading