Skip to content

Commit

Permalink
[ISSUE apache#9044]Supplemental checks that are missing from the Upda…
Browse files Browse the repository at this point in the history
…teSubGroup operation
  • Loading branch information
Willhow-Gao committed Dec 16, 2024
1 parent 93e2689 commit f6cc7d0
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1615,6 +1615,9 @@ private RemotingCommand updateAndCreateSubscriptionGroup(ChannelHandlerContext c

SubscriptionGroupConfig config = RemotingSerializable.decode(request.getBody(), SubscriptionGroupConfig.class);
if (config != null) {
if (!updateSubGroupPreCheck(config, response)) {
return response;
}
this.brokerController.getSubscriptionGroupManager().updateSubscriptionGroupConfig(config);
}

Expand All @@ -1638,15 +1641,18 @@ private RemotingCommand updateAndCreateSubscriptionGroupList(ChannelHandlerConte
final List<SubscriptionGroupConfig> groupConfigList = subscriptionGroupList.getGroupConfigList();

final StringBuilder builder = new StringBuilder();
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
for (SubscriptionGroupConfig config : groupConfigList) {
if (!updateSubGroupPreCheck(config, response)) {
return response;
}
builder.append(config.getGroupName()).append(";");
}
final String groupNames = builder.toString();
LOGGER.info("AdminBrokerProcessor#updateAndCreateSubscriptionGroupList: groupNames: {}, called by {}",
groupNames,
RemotingHelper.parseChannelRemoteAddr(ctx.channel()));

final RemotingCommand response = RemotingCommand.createResponseCommand(null);
try {
this.brokerController.getSubscriptionGroupManager().updateSubscriptionGroupConfigList(groupConfigList);
response.setCode(ResponseCode.SUCCESS);
Expand All @@ -1665,6 +1671,15 @@ private RemotingCommand updateAndCreateSubscriptionGroupList(ChannelHandlerConte
return response;
}

private boolean updateSubGroupPreCheck(SubscriptionGroupConfig config,RemotingCommand resp) {
if (StringUtils.isBlank(config.getGroupName())) {
resp.setCode(ResponseCode.ILLEGAL_ARGUMENT);
resp.setRemark("The subscription group name cannot be empty");
return false;
}
return true;
}

private void initConsumerOffset(String clientHost, String groupName, int mode, TopicConfig topicConfig)
throws ConsumeQueueException {
String topic = topicConfig.getTopicName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,7 @@
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.remoting.protocol.RequestCode;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.remoting.protocol.body.AclInfo;
import org.apache.rocketmq.remoting.protocol.body.CreateTopicListRequestBody;
import org.apache.rocketmq.remoting.protocol.body.GroupList;
import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo;
import org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody;
import org.apache.rocketmq.remoting.protocol.body.QueryCorrectionOffsetBody;
import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody;
import org.apache.rocketmq.remoting.protocol.body.UserInfo;
import org.apache.rocketmq.remoting.protocol.body.*;
import org.apache.rocketmq.remoting.protocol.header.CreateAclRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.CreateTopicRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.CreateUserRequestHeader;
Expand Down Expand Up @@ -633,6 +626,30 @@ public void testUpdateAndCreateSubscriptionGroup() throws RemotingCommandExcepti
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
}

@Test
public void testEmptyNameWhenUpdateSubGroup() throws RemotingCommandException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP, null);
SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
subscriptionGroupConfig.setGroupName("");
request.setBody(JSON.toJSON(subscriptionGroupConfig).toString().getBytes());
RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.ILLEGAL_ARGUMENT);
}

@Test
public void testEmptyNameWhenUpdateSubGroupList() throws RemotingCommandException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP_LIST, null);
SubscriptionGroupList groupList = new SubscriptionGroupList();
List<SubscriptionGroupConfig> list = new ArrayList<>();
SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
subscriptionGroupConfig.setGroupName("");
list.add(subscriptionGroupConfig);
groupList.setGroupConfigList(list);
request.setBody(JSON.toJSON(groupList).toString().getBytes());
RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.ILLEGAL_ARGUMENT);
}

@Test
public void testGetAllSubscriptionGroupInRocksdb() throws Exception {
initRocksdbSubscriptionManager();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ public class ResponseCode extends RemotingSysResponseCode {

public static final int ILLEGAL_OPERATION = 604;

public static final int ILLEGAL_ARGUMENT = 605;

public static final int RPC_UNKNOWN = -1000;
public static final int RPC_ADDR_IS_NULL = -1002;
public static final int RPC_SEND_TO_CHANNEL_FAILED = -1004;
Expand Down

0 comments on commit f6cc7d0

Please sign in to comment.