KafkaSinkCluster: route *PartitionReassignments (#1811)
rukai authored Nov 14, 2024
1 parent bab7b97 commit 0310cbf
Showing 4 changed files with 158 additions and 6 deletions.
53 changes: 51 additions & 2 deletions shotover-proxy/tests/kafka_int_tests/test_cases.rs
@@ -4,8 +4,8 @@ use test_helpers::{
connection::kafka::{
Acl, AclOperation, AclPermissionType, AlterConfig, ConfigEntry, ConsumerConfig,
ExpectedResponse, IsolationLevel, KafkaAdmin, KafkaConnectionBuilder, KafkaConsumer,
KafkaDriver, KafkaProducer, ListOffsetsResultInfo, NewPartition, NewTopic,
OffsetAndMetadata, OffsetSpec, Record, RecordsToDelete, ResourcePatternType,
KafkaDriver, KafkaProducer, ListOffsetsResultInfo, NewPartition, NewPartitionReassignment,
NewTopic, OffsetAndMetadata, OffsetSpec, Record, RecordsToDelete, ResourcePatternType,
ResourceSpecifier, ResourceType, TopicPartition,
},
docker_compose::DockerCompose,
@@ -1653,6 +1653,54 @@ async fn list_transactions(connection_builder: &KafkaConnectionBuilder) {
assert_eq!(actual_results, expected_results);
}

async fn create_and_list_partition_reassignments(connection_builder: &KafkaConnectionBuilder) {
let admin = connection_builder.connect_admin().await;
admin
.alter_partition_reassignments(HashMap::from([(
TopicPartition {
topic_name: "partitions1".to_owned(),
partition: 0,
},
NewPartitionReassignment {
replica_broker_ids: vec![0],
},
)]))
.await;

let actual_results = admin.list_partition_reassignments().await;

if actual_results.is_empty() {
// If too much time passes between requesting the reassignment and listing it,
// the reassignment might have already completed, so there is nothing to list.
// In that case return early to skip the assertions.
//
// The assertions should still run sometimes, so they are worth keeping around.
// And at the very least we know that the messages are sent/received successfully.
return;
}

assert_eq!(actual_results.len(), 1);
let reassignment = actual_results
.get(&TopicPartition {
topic_name: "partitions1".to_owned(),
partition: 0,
})
.unwrap();
let expected_adding_replica_broker_ids: &[i32] = if reassignment.replica_broker_ids == [0] {
// The original broker is randomly assigned.
// If it happens to be broker 0, matching the new broker we requested,
// then adding_replica_broker_ids will be empty
&[]
} else {
// otherwise it contains the new broker we requested
&[0]
};
assert_eq!(
reassignment.adding_replica_broker_ids,
expected_adding_replica_broker_ids
);
}

async fn cluster_test_suite_base(connection_builder: &KafkaConnectionBuilder) {
let admin = connection_builder.connect_admin().await;
admin
@@ -1679,6 +1727,7 @@ pub async fn tests_requiring_all_shotover_nodes(connection_builder: &KafkaConnec
if let KafkaConnectionBuilder::Java(_) = connection_builder {
list_groups(connection_builder).await;
list_transactions(connection_builder).await;
create_and_list_partition_reassignments(connection_builder).await;
}
}

4 changes: 3 additions & 1 deletion shotover/src/transforms/kafka/sink_cluster/mod.rs
@@ -1072,7 +1072,9 @@ The connection to the client has been closed."
| RequestBody::CreatePartitions(_)
| RequestBody::DeleteTopics(_)
| RequestBody::CreateAcls(_)
| RequestBody::ApiVersions(_),
| RequestBody::ApiVersions(_)
| RequestBody::AlterPartitionReassignments(_)
| RequestBody::ListPartitionReassignments(_),
..
})) => self.route_to_random_broker(request),

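For context, the two new request variants join the arm that already sends CreatePartitions, DeleteTopics, CreateAcls, and ApiVersions requests to a random broker. The snippet below is a minimal, self-contained sketch of what picking a random destination broker can look like; it is not shotover's actual route_to_random_broker implementation, and the BrokerId alias, the up_brokers parameter, and the use of the rand crate are assumptions made purely for illustration.

use rand::seq::SliceRandom;

// Hypothetical broker id type; shotover's real type may differ.
type BrokerId = i32;

// Sketch only: pick any currently-reachable broker as the destination
// for requests that the transform routes to an arbitrary broker.
fn pick_random_broker(up_brokers: &[BrokerId], rng: &mut impl rand::Rng) -> Option<BrokerId> {
    up_brokers.choose(rng).copied()
}

For example, pick_random_broker(&[0, 1, 2], &mut rand::thread_rng()) returns any one of the three ids, mirroring the routing behaviour the match arm above selects for these requests.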
70 changes: 67 additions & 3 deletions test-helpers/src/connection/kafka/java.rs
@@ -1,8 +1,8 @@
use super::{
Acl, AclOperation, AclPermissionType, AlterConfig, ConsumerConfig, ExpectedResponse,
ListOffsetsResultInfo, NewPartition, NewTopic, OffsetAndMetadata, OffsetSpec, ProduceResult,
Record, RecordsToDelete, ResourcePatternType, ResourceSpecifier, ResourceType,
TopicDescription, TopicPartition, TopicPartitionInfo,
ListOffsetsResultInfo, NewPartition, NewPartitionReassignment, NewTopic, OffsetAndMetadata,
OffsetSpec, PartitionReassignment, ProduceResult, Record, RecordsToDelete, ResourcePatternType,
ResourceSpecifier, ResourceType, TopicDescription, TopicPartition, TopicPartitionInfo,
};
use crate::connection::java::{map_iterator, Jvm, Value};
use anyhow::Result;
@@ -799,6 +799,70 @@ impl KafkaAdminJava {
.map(|_| ())
}

pub async fn alter_partition_reassignments(
&self,
reassignments: HashMap<TopicPartition, NewPartitionReassignment>,
) {
let reassignments_java: Vec<_> = reassignments
.into_iter()
.map(|(topic_partition, reassignment)| {
(
topic_partition_to_java(&self.jvm, &topic_partition),
self.jvm.call_static(
"java.util.Optional",
"of",
vec![self.jvm.construct(
"org.apache.kafka.clients.admin.NewPartitionReassignment",
vec![self.jvm.new_list(
"java.lang.Integer",
reassignment
.replica_broker_ids
.into_iter()
.map(|x| self.jvm.new_int_object(x))
.collect(),
)],
)],
),
)
})
.collect();
let reassignments_java = self.jvm.new_map(reassignments_java);
self.admin
.call("alterPartitionReassignments", vec![reassignments_java])
.call_async("all", vec![])
.await;
}

pub async fn list_partition_reassignments(
&self,
) -> HashMap<TopicPartition, PartitionReassignment> {
let java_results = self
.admin
.call("listPartitionReassignments", vec![])
.call_async("reassignments", vec![])
.await;

map_iterator(java_results)
.map(|(topic_partition, partition_reassignment)| {
(topic_partition_to_rust(topic_partition), {
let partition_reassignment = partition_reassignment
.cast("org.apache.kafka.clients.admin.PartitionReassignment");
PartitionReassignment {
adding_replica_broker_ids: partition_reassignment
.call("addingReplicas", vec![])
.into_rust(),
removing_replica_broker_ids: partition_reassignment
.call("removingReplicas", vec![])
.into_rust(),
replica_broker_ids: partition_reassignment
.call("replicas", vec![])
.into_rust(),
}
})
})
.collect()
}

pub async fn create_acls(&self, acls: Vec<Acl>) {
let resource_type = self
.jvm
37 changes: 37 additions & 0 deletions test-helpers/src/connection/kafka/mod.rs
@@ -517,6 +517,31 @@ impl KafkaAdmin {
}
}

pub async fn alter_partition_reassignments(
&self,
reassignments: HashMap<TopicPartition, NewPartitionReassignment>,
) {
match self {
#[cfg(feature = "kafka-cpp-driver-tests")]
Self::Cpp(_) => {
panic!("rdkafka-rs driver does not support alter_partition_reassignments")
}
Self::Java(java) => java.alter_partition_reassignments(reassignments).await,
}
}

pub async fn list_partition_reassignments(
&self,
) -> HashMap<TopicPartition, PartitionReassignment> {
match self {
#[cfg(feature = "kafka-cpp-driver-tests")]
Self::Cpp(_) => {
panic!("rdkafka-rs driver does not support list_partition_reassignments")
}
Self::Java(java) => java.list_partition_reassignments().await,
}
}

pub async fn create_partitions(&self, partitions: &[NewPartition<'_>]) {
match self {
#[cfg(feature = "kafka-cpp-driver-tests")]
@@ -712,3 +737,15 @@ pub struct RecordsToDelete {
/// If -1 is given delete all records regardless of offset.
pub delete_before_offset: i64,
}

#[derive(PartialEq, Debug)]
pub struct PartitionReassignment {
pub adding_replica_broker_ids: Vec<i32>,
pub removing_replica_broker_ids: Vec<i32>,
pub replica_broker_ids: Vec<i32>,
}

#[derive(PartialEq, Debug)]
pub struct NewPartitionReassignment {
pub replica_broker_ids: Vec<i32>,
}
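Taken together, a condensed usage sketch of the new test-helper surface (this mirrors the create_and_list_partition_reassignments integration test above; it assumes an async context with HashMap and the test-helper types in scope, and a Java-driver connection, since the Cpp arms panic for these calls):

let admin = connection_builder.connect_admin().await;
admin
    .alter_partition_reassignments(HashMap::from([(
        TopicPartition {
            topic_name: "partitions1".to_owned(),
            partition: 0,
        },
        NewPartitionReassignment {
            replica_broker_ids: vec![0],
        },
    )]))
    .await;
// May legitimately be empty if the reassignment already completed.
let in_flight = admin.list_partition_reassignments().await;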
