From 65953d83f33447e98704233f1bba4136f989dcbd Mon Sep 17 00:00:00 2001 From: Peter Morgan Date: Mon, 26 Aug 2024 11:28:39 +0100 Subject: [PATCH] group coordinator lifecycle --- .../src/coordinator/group/administrator.rs | 313 +++++++++++++++--- 1 file changed, 266 insertions(+), 47 deletions(-) diff --git a/tansu-server/src/coordinator/group/administrator.rs b/tansu-server/src/coordinator/group/administrator.rs index 70b80ae..6bad3db 100644 --- a/tansu-server/src/coordinator/group/administrator.rs +++ b/tansu-server/src/coordinator/group/administrator.rs @@ -31,7 +31,7 @@ use tansu_kafka_sans_io::{ sync_group_request::SyncGroupRequestAssignment, Body, ErrorCode, }; -use tracing::{debug, instrument}; +use tracing::{debug, info, instrument}; use uuid::Uuid; use crate::{Error, Result}; @@ -753,7 +753,6 @@ pub struct Inner { rebalance_timeout_ms: Option, group_instance_id: Option, members: BTreeMap, - member_id: i32, generation_id: i32, state: S, } @@ -765,7 +764,6 @@ impl Default for Inner { rebalance_timeout_ms: None, group_instance_id: None, members: BTreeMap::new(), - member_id: 1_000, generation_id: 0, state: Fresh, } @@ -868,10 +866,9 @@ impl Group for Inner { rebalance_timeout_ms, group_instance_id: None, members, - member_id: self.member_id, state: Forming { - protocol_name: String::from(protocol.name.as_str()), - protocol_type: String::from(protocol_type), + protocol_name: protocol.name.as_str().to_owned(), + protocol_type: protocol_type.to_owned(), leader: Some(member_id.to_owned()), }, } @@ -885,7 +882,7 @@ impl Group for Inner { error_code: ErrorCode::None.into(), generation_id, protocol_name: Some(protocol.name.clone()), - protocol_type: Some(String::from(protocol_type)), + protocol_type: Some(protocol_type.to_owned()), leader: member_id.to_owned(), skip_assignment: Some(false), member_id: member_id.to_owned(), @@ -1137,6 +1134,15 @@ impl Group for Inner { debug!(?member_id, ?self.members); + if self.state.leader.is_none() { + info!( + "{member_id} is leader of: {group_id} in generation: {}", + self.generation_id + ); + + _ = self.state.leader.replace(member_id.to_owned()); + } + let body = { Body::JoinGroupResponse { throttle_time_ms: Some(0), @@ -1286,7 +1292,6 @@ impl Group for Inner { group_instance_id: self.group_instance_id, members: self.members, generation_id: self.generation_id, - member_id: self.member_id, state: Formed { protocol_name: self.state.protocol_name, protocol_type: self.state.protocol_type, @@ -1573,7 +1578,6 @@ impl Group for Inner { rebalance_timeout_ms: self.rebalance_timeout_ms, group_instance_id: self.group_instance_id, members: self.members, - member_id: self.member_id, state: Forming { protocol_type: self.state.protocol_type, protocol_name: self.state.protocol_name, @@ -1590,7 +1594,6 @@ impl Group for Inner { rebalance_timeout_ms: self.rebalance_timeout_ms, group_instance_id: self.group_instance_id, members: self.members, - member_id: self.member_id, state: Forming { protocol_type: self.state.protocol_type, protocol_name: self.state.protocol_name, @@ -1701,7 +1704,6 @@ impl Group for Inner { rebalance_timeout_ms: self.rebalance_timeout_ms, group_instance_id: self.group_instance_id, members: self.members, - member_id: self.member_id, state: Forming { protocol_type: self.state.protocol_type, protocol_name: self.state.protocol_name, @@ -1875,7 +1877,6 @@ impl Group for Inner { rebalance_timeout_ms: self.rebalance_timeout_ms, group_instance_id: self.group_instance_id, members: self.members, - member_id: self.member_id, state: Forming { protocol_type: self.state.protocol_type, protocol_name: self.state.protocol_name, @@ -1898,7 +1899,7 @@ impl Group for Inner { leader: state .leader() .map(|s| s.to_owned()) - .unwrap_or(String::from("")), + .unwrap_or("".to_owned()), skip_assignment: Some(false), member_id: member_id.to_string(), members: Some(members), @@ -1916,7 +1917,6 @@ impl Group for Inner { rebalance_timeout_ms: self.rebalance_timeout_ms, group_instance_id: self.group_instance_id, members: self.members, - member_id: self.member_id, state: Forming { protocol_type: self.state.protocol_type, protocol_name: self.state.protocol_name, @@ -2113,11 +2113,10 @@ impl Group for Inner { }) }; - let state: Wrapper = if members.iter().any(|member| { - let error_code = i16::from(ErrorCode::None); - - member.error_code == error_code - }) { + let state: Wrapper = if members + .iter() + .any(|member| member.error_code == i16::from(ErrorCode::None)) + { let leader = if self.members.contains_key(&self.state.leader) { Some(self.state.leader) } else { @@ -2130,7 +2129,6 @@ impl Group for Inner { rebalance_timeout_ms: self.rebalance_timeout_ms, group_instance_id: self.group_instance_id, members: self.members, - member_id: self.member_id, state: Forming { protocol_type: self.state.protocol_type, protocol_name: self.state.protocol_name, @@ -2650,7 +2648,7 @@ mod tests { } #[test] - fn formed_join() -> Result<()> { + fn lifecycle() -> Result<()> { let _guard = init_tracing()?; let s: Wrapper = Inner::::default().into(); @@ -2706,14 +2704,14 @@ mod tests { protocol_name: None, leader, skip_assignment: Some(false), - members, + members: Some(members), member_id, }, ) => { assert_eq!(error_code, i16::from(ErrorCode::MemberIdRequired)); assert!(leader.is_empty()); assert!(member_id.starts_with(CLIENT_ID)); - assert_eq!(Some(0), members.map(|members| members.len())); + assert_eq!(0, members.len()); let (s, join_response) = s.join( now, @@ -2762,14 +2760,14 @@ mod tests { assert_eq!(1, s.members().len()); - let first_member_assignment_01 = Bytes::from_static(b"assignment_01"); + let s = { + let first_member_assignment_01 = Bytes::from_static(b"assignment_01"); - let assignments = [SyncGroupRequestAssignment { - member_id: first_member_id.clone(), - assignment: first_member_assignment_01.clone(), - }]; + let assignments = [SyncGroupRequestAssignment { + member_id: first_member_id.clone(), + assignment: first_member_assignment_01.clone(), + }]; - let s = { let generation_id = s.generation_id(); let (s, sync_response) = s.sync( @@ -2853,14 +2851,14 @@ mod tests { protocol_name: None, leader, skip_assignment: Some(false), - members, + members: Some(members), member_id, }, ) => { assert_eq!(error_code, i16::from(ErrorCode::MemberIdRequired)); assert!(leader.is_empty()); assert!(member_id.starts_with(CLIENT_ID)); - assert_eq!(Some(0), members.map(|members| members.len())); + assert_eq!(0, members.len()); let previous_generation = s.generation_id(); @@ -3055,20 +3053,9 @@ mod tests { metadata: second_member_sticky_meta.clone(), }, ]; + let unchanged_generation_id = s.generation_id(); - let join_response_expected = Body::JoinGroupResponse { - throttle_time_ms: Some(0), - error_code: ErrorCode::None.into(), - generation_id: s.generation_id(), - protocol_type: Some(PROTOCOL_TYPE.into()), - protocol_name: Some(RANGE.into()), - leader: first_member_id.clone(), - skip_assignment: Some(false), - member_id: second_member_id.clone(), - members: Some([].into()), - }; - - let (s, join_group_response) = s.join( + match s.join( now, Some(CLIENT_ID), GROUP_ID, @@ -3079,11 +3066,34 @@ mod tests { PROTOCOL_TYPE, Some(&protocols), reason, - ); + ) { + ( + s, + Body::JoinGroupResponse { + throttle_time_ms: Some(0), + error_code, + generation_id, + protocol_type: Some(protocol_type), + protocol_name: Some(protocol_name), + leader, + skip_assignment: Some(false), + member_id, + members: Some(members), + }, + ) => { + assert_eq!(i16::from(ErrorCode::None), error_code); + assert_eq!(unchanged_generation_id, generation_id); + assert_eq!(PROTOCOL_TYPE, protocol_type); + assert_eq!(RANGE, protocol_name); + assert_eq!(first_member_id, leader); + assert_eq!(second_member_id, member_id); + assert_eq!(0, members.len()); - assert_eq!(join_response_expected, join_group_response); + s + } - s + otherwise => panic!("{otherwise:?}"), + } }; let second_member_assignment_02 = Bytes::from_static(b"second_member_assignment_02"); @@ -3131,6 +3141,8 @@ mod tests { let s = { let generation_id = s.generation_id(); + let assignments = []; + let (s, sync_response) = s.sync( now, GROUP_ID, @@ -3155,6 +3167,213 @@ mod tests { s }; + let s = { + let generation_id = s.generation_id(); + + match s.heartbeat( + now, + GROUP_ID, + generation_id, + &first_member_id, + group_instance_id, + ) { + ( + s, + Body::HeartbeatResponse { + throttle_time_ms: Some(0), + error_code, + }, + ) => { + assert_eq!(i16::from(ErrorCode::None), error_code); + s + } + + otherwise => panic!("{otherwise:?}"), + } + }; + + let s = { + let generation_id = s.generation_id(); + + match s.heartbeat( + now, + GROUP_ID, + generation_id, + &second_member_id, + group_instance_id, + ) { + ( + s, + Body::HeartbeatResponse { + throttle_time_ms: Some(0), + error_code, + }, + ) => { + assert_eq!(i16::from(ErrorCode::None), error_code); + s + } + + otherwise => panic!("{otherwise:?}"), + } + }; + + let generation_prior_to_leader_leaving = s.generation_id(); + + let s = { + match s.leave( + now, + GROUP_ID, + None, + Some(&[MemberIdentity { + member_id: first_member_id.clone(), + group_instance_id: None, + reason: Some("the consumer is being closed".into()), + }]), + ) { + ( + s, + Body::LeaveGroupResponse { + throttle_time_ms: Some(0), + error_code, + members: Some(members), + }, + ) => { + assert_eq!(i16::from(ErrorCode::None), error_code); + + assert_eq!( + vec![MemberResponse { + member_id: first_member_id.clone(), + group_instance_id: None, + error_code: 0, + }], + members + ); + + s + } + otherwise => panic!("{otherwise:?}"), + } + }; + + let s = { + match s.heartbeat( + now, + GROUP_ID, + generation_prior_to_leader_leaving, + &second_member_id, + group_instance_id, + ) { + ( + s, + Body::HeartbeatResponse { + throttle_time_ms: Some(0), + error_code, + }, + ) => { + assert_eq!(i16::from(ErrorCode::RebalanceInProgress), error_code); + s + } + + otherwise => panic!("{otherwise:?}"), + } + }; + + let s = { + let protocols = [ + JoinGroupRequestProtocol { + name: RANGE.into(), + metadata: second_member_range_meta.clone(), + }, + JoinGroupRequestProtocol { + name: COOPERATIVE_STICKY.into(), + metadata: second_member_sticky_meta.clone(), + }, + ]; + let unchanged_generation_id = s.generation_id(); + + match s.join( + now, + Some(CLIENT_ID), + GROUP_ID, + session_timeout_ms, + rebalance_timeout_ms, + &second_member_id, + group_instance_id, + PROTOCOL_TYPE, + Some(&protocols), + reason, + ) { + ( + s, + Body::JoinGroupResponse { + throttle_time_ms: Some(0), + error_code, + generation_id, + protocol_type: Some(protocol_type), + protocol_name: Some(protocol_name), + leader, + skip_assignment: Some(false), + member_id, + members: Some(members), + }, + ) => { + assert_eq!(i16::from(ErrorCode::None), error_code); + assert_eq!(unchanged_generation_id, generation_id); + assert_eq!(PROTOCOL_TYPE, protocol_type); + assert_eq!(RANGE, protocol_name); + assert_eq!(second_member_id, leader); + assert_eq!(second_member_id, member_id); + assert_eq!(1, members.len()); + + s + } + + otherwise => panic!("{otherwise:?}"), + } + }; + + let s = { + let second_member_assignment_03 = Bytes::from_static(b"second_member_assignment_03"); + + let assignments = [SyncGroupRequestAssignment { + member_id: second_member_id.clone(), + assignment: second_member_assignment_03.clone(), + }]; + + let generation_id = s.generation_id(); + + match s.sync( + now, + GROUP_ID, + generation_id, + &second_member_id, + group_instance_id, + Some(PROTOCOL_TYPE), + Some(RANGE), + Some(&assignments), + ) { + ( + s, + Body::SyncGroupResponse { + throttle_time_ms: Some(0), + error_code, + protocol_type: Some(protocol_type), + protocol_name: Some(protocol_name), + assignment, + }, + ) => { + assert_eq!(i16::from(ErrorCode::None), error_code); + assert_eq!(PROTOCOL_TYPE, protocol_type); + assert_eq!(RANGE, protocol_name); + assert_eq!(second_member_assignment_03, assignment); + + s + } + + otherwise => panic!("{otherwise:?}"), + } + }; + let _ = s; Ok(())