Skip to content

Commit

Permalink
fmt, group admin returns Some([]) members rather than None
Browse files Browse the repository at this point in the history
  • Loading branch information
shortishly committed Aug 26, 2024
1 parent 714512c commit 90748a3
Show file tree
Hide file tree
Showing 22 changed files with 163 additions and 195 deletions.
10 changes: 2 additions & 8 deletions tansu-kafka-sans-io/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,7 @@ fn kind(
} else if f.kind().is_primitive() {
let t = f.kind().type_name();

if f.nullable().is_none()
&& f.versions()
.is_mandatory(parent.map(Field::versions))
{
if f.nullable().is_none() && f.versions().is_mandatory(parent.map(Field::versions)) {
quote! {
#t
}
Expand All @@ -185,10 +182,7 @@ fn kind(
}
} else {
let t = f.kind().type_name();
if f.nullable().is_none()
&& f.versions()
.is_mandatory(parent.map(Field::versions))
{
if f.nullable().is_none() && f.versions().is_mandatory(parent.map(Field::versions)) {
if parent.is_none() && dependencies.contains(&t) {
quote! {
#module::#t
Expand Down
20 changes: 6 additions & 14 deletions tansu-kafka-sans-io/tests/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1432,13 +1432,9 @@ fn fetch_request_v16_001() -> Result<()> {
topics: Some(
[FetchTopic {
topic: None,
topic_id: Some(
[
246, 177, 3, 16, 190, 12, 74, 195, 190, 197, 130, 25, 106, 235,
221, 30
]
.into()
),
topic_id: Some([
246, 177, 3, 16, 190, 12, 74, 195, 190, 197, 130, 25, 106, 235, 221, 30
]),
partitions: Some(
[FetchPartition {
partition: 0,
Expand Down Expand Up @@ -1954,13 +1950,9 @@ fn fetch_response_v16_001() -> Result<()> {
responses: Some(
[FetchableTopicResponse {
topic: None,
topic_id: Some(
[
28, 205, 172, 195, 142, 19, 71, 71, 182, 128, 13, 18, 65, 142, 210,
222
]
.into()
),
topic_id: Some([
28, 205, 172, 195, 142, 19, 71, 71, 182, 128, 13, 18, 65, 142, 210, 222
]),
partitions: Some(
[PartitionData {
partition_index: 0,
Expand Down
18 changes: 6 additions & 12 deletions tansu-kafka-sans-io/tests/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1565,12 +1565,9 @@ fn fetch_response_v16_001() -> Result<()> {
responses: Some(
[FetchableTopicResponse {
topic: None,
topic_id: Some(
[
28, 205, 172, 195, 142, 19, 71, 71, 182, 128, 13, 18, 65, 142, 210, 222,
]
.into(),
),
topic_id: Some([
28, 205, 172, 195, 142, 19, 71, 71, 182, 128, 13, 18, 65, 142, 210, 222,
]),
partitions: Some(
[PartitionData {
partition_index: 0,
Expand Down Expand Up @@ -1665,12 +1662,9 @@ fn fetch_response_v16_002() -> Result<()> {
responses: Some(
[FetchableTopicResponse {
topic: None,
topic_id: Some(
[
28, 205, 172, 195, 142, 19, 71, 71, 182, 128, 13, 18, 65, 142, 210, 222,
]
.into(),
),
topic_id: Some([
28, 205, 172, 195, 142, 19, 71, 71, 182, 128, 13, 18, 65, 142, 210, 222,
]),
partitions: Some(
[PartitionData {
partition_index: 0,
Expand Down
4 changes: 2 additions & 2 deletions tansu-proxy/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use tansu_kafka_sans_io as _;
use thiserror as _;
use clap::Parser;
use tansu_kafka_sans_io as _;
use tansu_proxy::Result;
use thiserror as _;
use tokio::task::JoinSet;
use tracing::{debug, Level};
use tracing_subscriber::{filter::Targets, fmt::format::FmtSpan, prelude::*};
Expand Down
8 changes: 4 additions & 4 deletions tansu-raft/examples/in-memory-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,16 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use bytes::{BufMut, Bytes, BytesMut};
use clap::Arg;
use clap::ArgAction;
use clap::Command;
use std::{
collections::{BTreeMap, BTreeSet},
io::Cursor,
ops::RangeFrom,
time::Duration,
};
use clap::Command;
use clap::Arg;
use clap::ArgAction;
use bytes::{BufMut, Bytes, BytesMut};
use tansu_raft::{
blocking::{persistent::PersistentManager, PersistentState, ProvidePersistentState},
log_key_for_index,
Expand Down
5 changes: 4 additions & 1 deletion tansu-raft/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,10 @@ impl<R, S, T, U> SituationBuilder<R, S, T, U> {
}
}

pub(crate) fn server(self, server: Box<dyn Server>) -> SituationBuilder<R, Box<dyn Server>, T, U> {
pub(crate) fn server(
self,
server: Box<dyn Server>,
) -> SituationBuilder<R, Box<dyn Server>, T, U> {
SituationBuilder {
persistent_state: self.persistent_state,
server,
Expand Down
5 changes: 1 addition & 4 deletions tansu-raft/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,7 @@ struct Entry {

impl Entry {
fn size_of() -> usize {
size_of::<u64>()
+ size_of::<u64>()
+ size_of::<u64>()
+ size_of::<u8>()
size_of::<u64>() + size_of::<u64>() + size_of::<u64>() + size_of::<u8>()
}
}

Expand Down
2 changes: 1 addition & 1 deletion tansu-raft/src/leader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ mod tests {
let mut leader = Leader::default();

ni.iter().cloned().for_each(|(node, index)| {
_ = leader.next_index.insert(node, index);
_ = leader.next_index.insert(node, index);
});

assert_eq!(
Expand Down
6 changes: 4 additions & 2 deletions tansu-raft/tests/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ impl Applicator {
index: Index,
callback: impl Fn(&u32) + Send + Sync + 'static,
) -> Result<()> {
_ = self.when_applied
_ = self
.when_applied
.lock()
.map(|mut when_applied| when_applied.insert(index, Box::new(callback)))?;
Ok(())
Expand All @@ -78,7 +79,8 @@ impl ApplyState for Applicator {
.inspect(|state| _ = dbg!(state))
.unwrap();

_ = self.when_applied
_ = self
.when_applied
.lock()
.map(|mut context| context.remove(&index).map(|callback| callback(&state)))?;

Expand Down
88 changes: 32 additions & 56 deletions tansu-server/src/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,10 @@ pub struct Broker {
}

impl Broker {
pub fn new<'a>(
#[allow(clippy::too_many_arguments)]
pub fn new(
node_id: i32,
cluster_id: &'a str,
cluster_id: &str,
context: Raft,
applicator: Applicator,
listener: Url,
Expand Down Expand Up @@ -115,7 +116,7 @@ impl Broker {
self.when_applied(Body::BrokerRegistrationRequest {
broker_id: self.node_id,
cluster_id: self.cluster_id.clone(),
incarnation_id: self.incarnation_id.as_bytes().clone(),
incarnation_id: *self.incarnation_id.as_bytes(),
listeners: Some(vec![Listener {
name: "broker".into(),
host: self.listener.host_str().unwrap_or("localhost").to_owned(),
Expand Down Expand Up @@ -181,10 +182,10 @@ impl Broker {
_ = stream.read_exact(&mut request[4..]).await?;
debug!(?request);

let mut response = self.process_request(&request).await?;
let response = self.process_request(&request).await?;
debug!(?response);

stream.write_all(&mut response).await?;
stream.write_all(&response).await?;
}
}

Expand All @@ -204,7 +205,7 @@ impl Broker {
} => {
debug!(?api_key, ?api_version, ?correlation_id);
let body = self
.response_for(client_id.as_ref().map(|s| s.as_str()), body, correlation_id)
.response_for(client_id.as_deref(), body, correlation_id)
.await?;
debug!(?body, ?correlation_id);
Frame::response(
Expand All @@ -224,7 +225,7 @@ impl Broker {
debug!(?self, ?body);

let id = Uuid::new_v4();
let request = Request::new(id.clone(), body);
let request = Request::new(id, body);
let command = &request as &dyn Command;

let json = serde_json::to_string(command)?;
Expand Down Expand Up @@ -255,12 +256,8 @@ impl Broker {
} => {
let api_versions = ApiVersionsRequest;
Ok(api_versions.response(
client_software_name
.as_ref()
.map(|client_software_name| client_software_name.as_str()),
client_software_version
.as_ref()
.map(|client_software_version| client_software_version.as_str()),
client_software_name.as_deref(),
client_software_version.as_deref(),
))
}

Expand Down Expand Up @@ -290,7 +287,7 @@ impl Broker {
} => {
let describe_configs = DescribeConfigsRequest;
Ok(describe_configs.response(
resources.as_ref().map(|resources| resources.as_slice()),
resources.as_deref(),
include_synonyms,
include_documentation,
))
Expand All @@ -313,7 +310,7 @@ impl Broker {
min_bytes,
max_bytes,
isolation_level,
topics.as_ref().map(|topics| topics.as_slice()),
topics.as_deref(),
&state,
)
.await
Expand All @@ -327,11 +324,9 @@ impl Broker {
let find_coordinator = FindCoordinatorRequest;

Ok(find_coordinator.response(
key.as_ref().map(|key| key.as_str()),
key.as_deref(),
key_type,
coordinator_keys
.as_ref()
.map(|coordinator_keys| coordinator_keys.as_slice()),
coordinator_keys.as_deref(),
self.node_id,
&self.listener,
))
Expand All @@ -356,9 +351,7 @@ impl Broker {
&group_id,
generation_id,
&member_id,
group_instance_id
.as_ref()
.map(|group_instance_id| group_instance_id.as_str()),
group_instance_id.as_deref(),
)
}

Expand All @@ -370,9 +363,7 @@ impl Broker {
} => {
let init_producer_id = InitProducerIdRequest;
Ok(init_producer_id.response(
transactional_id
.as_ref()
.map(|transaction_id| transaction_id.as_str()),
transactional_id.as_deref(),
transaction_timeout_ms,
producer_id,
producer_epoch,
Expand All @@ -399,12 +390,10 @@ impl Broker {
session_timeout_ms,
rebalance_timeout_ms,
&member_id,
group_instance_id
.as_ref()
.map(|group_instance_id| group_instance_id.as_str()),
group_instance_id.as_deref(),
&protocol_type,
protocols.as_ref().map(|protocols| protocols.as_slice()),
reason.as_ref().map(|reason| reason.as_str()),
protocols.as_deref(),
reason.as_deref(),
)
}

Expand All @@ -417,25 +406,20 @@ impl Broker {
groups: self.groups.clone(),
};

leave.response(
&group_id,
member_id.as_ref().map(|member_id| member_id.as_str()),
members.as_ref().map(|members| members.as_slice()),
)
leave.response(&group_id, member_id.as_deref(), members.as_deref())
}

Body::ListPartitionReassignmentsRequest { topics, .. } => {
let state = self.applicator.with_current_state().await;
let list_partition_reassignments = ListPartitionReassignmentsRequest;
Ok(list_partition_reassignments
.response(topics.as_ref().map(|topics| topics.as_slice()), &state))
Ok(list_partition_reassignments.response(topics.as_deref(), &state))
}

Body::MetadataRequest { topics, .. } => {
let controller_id = Some(self.node_id);
let state = self.applicator.with_current_state().await;

let topics = topics.as_ref().map(|topics| topics.as_slice());
let topics = topics.as_deref();

let request = MetadataRequest;
Ok(request.response(controller_id, topics, &state))
Expand All @@ -456,10 +440,10 @@ impl Broker {
offset_commit.response(
&group_id,
generation_id_or_member_epoch,
member_id.as_ref().map(|s| s.as_str()),
group_instance_id.as_ref().map(|s| s.as_str()),
member_id.as_deref(),
group_instance_id.as_deref(),
retention_time_ms,
topics.as_ref().map(|topics| topics.as_slice()),
topics.as_deref(),
)
}

Expand All @@ -474,9 +458,9 @@ impl Broker {
};

offset_fetch.response(
group_id.as_ref().map(|group_id| group_id.as_str()),
topics.as_ref().map(|topics| topics.as_slice()),
groups.as_ref().map(|groups| groups.as_slice()),
group_id.as_deref(),
topics.as_deref(),
groups.as_deref(),
require_stable,
)
}
Expand Down Expand Up @@ -518,18 +502,10 @@ impl Broker {
&group_id,
generation_id,
&member_id,
group_instance_id
.as_ref()
.map(|group_instance_id| group_instance_id.as_str()),
protocol_type
.as_ref()
.map(|protocol_type| protocol_type.as_str()),
protocol_name
.as_ref()
.map(|protocol_name| protocol_name.as_str()),
assignments
.as_ref()
.map(|assignments| assignments.as_slice()),
group_instance_id.as_deref(),
protocol_type.as_deref(),
protocol_name.as_deref(),
assignments.as_deref(),
)
}

Expand Down
Loading

0 comments on commit 90748a3

Please sign in to comment.