Skip to content

Commit

Permalink
tidy up group coordinator into adminstrator
Browse files Browse the repository at this point in the history
  • Loading branch information
shortishly committed Aug 22, 2024
1 parent 2c8ddd6 commit 2c0b242
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 158 deletions.
9 changes: 3 additions & 6 deletions tansu-server/src/coordinator/group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,11 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

pub mod administrator;
pub mod consumer;
pub mod cooperative_sticky;
pub mod range;
pub mod round_robin;
pub mod typestate;

use crate::Result;
use round_robin::{Fresh, Inner, RoundRobin, Wrapper};
use administrator::{Controller, Fresh, Inner, Wrapper};
use std::{collections::BTreeMap, fmt::Debug};
use tansu_kafka_sans_io::{
join_group_request::JoinGroupRequestProtocol,
Expand Down Expand Up @@ -193,7 +190,7 @@ pub struct GroupProvider;
impl ProvideCoordinator for GroupProvider {
fn provide_coordinator(&mut self) -> Result<Box<dyn Coordinator>> {
let wrapper: Wrapper = Inner::<Fresh>::default().into();
Ok(Box::new(RoundRobin::new(wrapper)) as Box<dyn Coordinator>)
Ok(Box::new(Controller::new(wrapper)) as Box<dyn Coordinator>)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,77 @@ use tracing::{debug, instrument};

use crate::{Error, Result};

use super::{typestate::Group, Coordinator};
use super::Coordinator;

pub trait Group: Debug + Send {
type JoinState;
type SyncState;
type HeartbeatState;
type LeaveState;
type OffsetCommitState;
type OffsetFetchState;

fn join(
self,
now: Instant,
group_id: &str,
session_timeout_ms: i32,
rebalance_timeout_ms: Option<i32>,
member_id: &str,
group_instance_id: Option<&str>,
protocol_type: &str,
protocols: Option<&[JoinGroupRequestProtocol]>,
reason: Option<&str>,
) -> (Self::JoinState, Body);

fn sync(
self,
now: Instant,
group_id: &str,
generation_id: i32,
member_id: &str,
group_instance_id: Option<&str>,
protocol_name: Option<&str>,
assignments: Option<&[SyncGroupRequestAssignment]>,
) -> (Self::SyncState, Body);

fn heartbeat(
self,
now: Instant,
group_id: &str,
generation_id: i32,
member_id: &str,
group_instance_id: Option<&str>,
) -> (Self::HeartbeatState, Body);

fn leave(
self,
now: Instant,
group_id: &str,
member_id: Option<&str>,
members: Option<&[MemberIdentity]>,
) -> (Self::LeaveState, Body);

fn offset_commit(
self,
now: Instant,
group_id: &str,
generation_id_or_member_epoch: Option<i32>,
member_id: Option<&str>,
group_instance_id: Option<&str>,
retention_time_ms: Option<i64>,
topics: Option<&[OffsetCommitRequestTopic]>,
) -> (Self::OffsetCommitState, Body);

fn offset_fetch(
self,
now: Instant,
group_id: Option<&str>,
topics: Option<&[OffsetFetchRequestTopic]>,
groups: Option<&[OffsetFetchRequestGroup]>,
require_stable: Option<bool>,
) -> (Self::OffsetFetchState, Body);
}

#[derive(Debug)]
pub enum Wrapper {
Expand Down Expand Up @@ -455,19 +525,19 @@ impl Wrapper {
}

#[derive(Debug)]
pub struct RoundRobin {
pub struct Controller {
wrapper: Option<Wrapper>,
}

impl RoundRobin {
impl Controller {
pub fn new(wrapper: Wrapper) -> Self {
Self {
wrapper: Some(wrapper),
}
}
}

impl Coordinator for RoundRobin {
impl Coordinator for Controller {
fn join(
&mut self,
group_id: &str,
Expand Down Expand Up @@ -636,26 +706,6 @@ pub struct Forming {
leader: Option<String>,
}

// impl From<Syncing> for Forming {
// fn from(value: Syncing) -> Self {
// Self {
// protocol_type: value.protocol_type,
// protocol_name: value.protocol_name,
// leader: Some(value.leader),
// }
// }
// }

// impl From<Formed> for Forming {
// fn from(value: Formed) -> Self {
// Self {
// protocol_type: value.protocol_type,
// protocol_name: value.protocol_name,
// leader: Some(value.leader),
// }
// }
// }

#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct Syncing {
protocol_type: String,
Expand Down
17 changes: 0 additions & 17 deletions tansu-server/src/coordinator/group/cooperative_sticky.rs

This file was deleted.

17 changes: 0 additions & 17 deletions tansu-server/src/coordinator/group/range.rs

This file was deleted.

94 changes: 0 additions & 94 deletions tansu-server/src/coordinator/group/typestate.rs

This file was deleted.

0 comments on commit 2c0b242

Please sign in to comment.