diff --git a/tansu-server/src/coordinator/group.rs b/tansu-server/src/coordinator/group.rs index f2edc13..400192e 100644 --- a/tansu-server/src/coordinator/group.rs +++ b/tansu-server/src/coordinator/group.rs @@ -13,14 +13,11 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . +pub mod administrator; pub mod consumer; -pub mod cooperative_sticky; -pub mod range; -pub mod round_robin; -pub mod typestate; use crate::Result; -use round_robin::{Fresh, Inner, RoundRobin, Wrapper}; +use administrator::{Controller, Fresh, Inner, Wrapper}; use std::{collections::BTreeMap, fmt::Debug}; use tansu_kafka_sans_io::{ join_group_request::JoinGroupRequestProtocol, @@ -193,7 +190,7 @@ pub struct GroupProvider; impl ProvideCoordinator for GroupProvider { fn provide_coordinator(&mut self) -> Result> { let wrapper: Wrapper = Inner::::default().into(); - Ok(Box::new(RoundRobin::new(wrapper)) as Box) + Ok(Box::new(Controller::new(wrapper)) as Box) } } diff --git a/tansu-server/src/coordinator/group/round_robin.rs b/tansu-server/src/coordinator/group/administrator.rs similarity index 97% rename from tansu-server/src/coordinator/group/round_robin.rs rename to tansu-server/src/coordinator/group/administrator.rs index bc2c518..4ec797a 100644 --- a/tansu-server/src/coordinator/group/round_robin.rs +++ b/tansu-server/src/coordinator/group/administrator.rs @@ -35,7 +35,77 @@ use tracing::{debug, instrument}; use crate::{Error, Result}; -use super::{typestate::Group, Coordinator}; +use super::Coordinator; + +pub trait Group: Debug + Send { + type JoinState; + type SyncState; + type HeartbeatState; + type LeaveState; + type OffsetCommitState; + type OffsetFetchState; + + fn join( + self, + now: Instant, + group_id: &str, + session_timeout_ms: i32, + rebalance_timeout_ms: Option, + member_id: &str, + group_instance_id: Option<&str>, + protocol_type: &str, + protocols: Option<&[JoinGroupRequestProtocol]>, + reason: Option<&str>, + ) -> (Self::JoinState, Body); + + fn sync( + self, + now: Instant, + group_id: &str, + generation_id: i32, + member_id: &str, + group_instance_id: Option<&str>, + protocol_name: Option<&str>, + assignments: Option<&[SyncGroupRequestAssignment]>, + ) -> (Self::SyncState, Body); + + fn heartbeat( + self, + now: Instant, + group_id: &str, + generation_id: i32, + member_id: &str, + group_instance_id: Option<&str>, + ) -> (Self::HeartbeatState, Body); + + fn leave( + self, + now: Instant, + group_id: &str, + member_id: Option<&str>, + members: Option<&[MemberIdentity]>, + ) -> (Self::LeaveState, Body); + + fn offset_commit( + self, + now: Instant, + group_id: &str, + generation_id_or_member_epoch: Option, + member_id: Option<&str>, + group_instance_id: Option<&str>, + retention_time_ms: Option, + topics: Option<&[OffsetCommitRequestTopic]>, + ) -> (Self::OffsetCommitState, Body); + + fn offset_fetch( + self, + now: Instant, + group_id: Option<&str>, + topics: Option<&[OffsetFetchRequestTopic]>, + groups: Option<&[OffsetFetchRequestGroup]>, + require_stable: Option, + ) -> (Self::OffsetFetchState, Body); +} #[derive(Debug)] pub enum Wrapper { @@ -455,11 +525,11 @@ impl Wrapper { } #[derive(Debug)] -pub struct RoundRobin { +pub struct Controller { wrapper: Option, } -impl RoundRobin { +impl Controller { pub fn new(wrapper: Wrapper) -> Self { Self { wrapper: Some(wrapper), @@ -467,7 +537,7 @@ impl RoundRobin { } } -impl Coordinator for RoundRobin { +impl Coordinator for Controller { fn join( &mut self, group_id: &str, @@ -636,26 +706,6 @@ pub struct Forming { leader: Option, } -// impl From for Forming { -// fn from(value: Syncing) -> Self { -// Self { -// protocol_type: value.protocol_type, -// protocol_name: value.protocol_name, -// leader: Some(value.leader), -// } -// } -// } - -// impl From for Forming { -// fn from(value: Formed) -> Self { -// Self { -// protocol_type: value.protocol_type, -// protocol_name: value.protocol_name, -// leader: Some(value.leader), -// } -// } -// } - #[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct Syncing { protocol_type: String, diff --git a/tansu-server/src/coordinator/group/cooperative_sticky.rs b/tansu-server/src/coordinator/group/cooperative_sticky.rs deleted file mode 100644 index 24cee70..0000000 --- a/tansu-server/src/coordinator/group/cooperative_sticky.rs +++ /dev/null @@ -1,17 +0,0 @@ -// Copyright ⓒ 2024 Peter Morgan -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as -// published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -#[allow(dead_code)] -struct CooperativeSticky; diff --git a/tansu-server/src/coordinator/group/range.rs b/tansu-server/src/coordinator/group/range.rs deleted file mode 100644 index 402d28a..0000000 --- a/tansu-server/src/coordinator/group/range.rs +++ /dev/null @@ -1,17 +0,0 @@ -// Copyright ⓒ 2024 Peter Morgan -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as -// published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -#[allow(dead_code)] -struct Range; diff --git a/tansu-server/src/coordinator/group/typestate.rs b/tansu-server/src/coordinator/group/typestate.rs deleted file mode 100644 index ca48f79..0000000 --- a/tansu-server/src/coordinator/group/typestate.rs +++ /dev/null @@ -1,94 +0,0 @@ -// Copyright ⓒ 2024 Peter Morgan -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as -// published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -use std::{fmt::Debug, time::Instant}; -use tansu_kafka_sans_io::{ - join_group_request::JoinGroupRequestProtocol, - leave_group_request::MemberIdentity, - offset_commit_request::OffsetCommitRequestTopic, - offset_fetch_request::{OffsetFetchRequestGroup, OffsetFetchRequestTopic}, - sync_group_request::SyncGroupRequestAssignment, - Body, -}; - -pub(crate) trait Group: Debug + Send { - type JoinState; - type SyncState; - type HeartbeatState; - type LeaveState; - type OffsetCommitState; - type OffsetFetchState; - - fn join( - self, - now: Instant, - group_id: &str, - session_timeout_ms: i32, - rebalance_timeout_ms: Option, - member_id: &str, - group_instance_id: Option<&str>, - protocol_type: &str, - protocols: Option<&[JoinGroupRequestProtocol]>, - reason: Option<&str>, - ) -> (Self::JoinState, Body); - - fn sync( - self, - now: Instant, - group_id: &str, - generation_id: i32, - member_id: &str, - group_instance_id: Option<&str>, - protocol_name: Option<&str>, - assignments: Option<&[SyncGroupRequestAssignment]>, - ) -> (Self::SyncState, Body); - - fn heartbeat( - self, - now: Instant, - group_id: &str, - generation_id: i32, - member_id: &str, - group_instance_id: Option<&str>, - ) -> (Self::HeartbeatState, Body); - - fn leave( - self, - now: Instant, - group_id: &str, - member_id: Option<&str>, - members: Option<&[MemberIdentity]>, - ) -> (Self::LeaveState, Body); - - fn offset_commit( - self, - now: Instant, - group_id: &str, - generation_id_or_member_epoch: Option, - member_id: Option<&str>, - group_instance_id: Option<&str>, - retention_time_ms: Option, - topics: Option<&[OffsetCommitRequestTopic]>, - ) -> (Self::OffsetCommitState, Body); - - fn offset_fetch( - self, - now: Instant, - group_id: Option<&str>, - topics: Option<&[OffsetFetchRequestTopic]>, - groups: Option<&[OffsetFetchRequestGroup]>, - require_stable: Option, - ) -> (Self::OffsetFetchState, Body); -}