From 1a73e015b8c764a39d2a03a5a58bf26a78638986 Mon Sep 17 00:00:00 2001
From: yx9o
Date: Wed, 12 Jun 2024 09:10:33 +0800
Subject: [PATCH] [ISSUE #8276] Merge duplicate code in DefaultMQProducer
constructor (#8277)
---
.../client/producer/DefaultMQProducer.java | 42 ++++-----
.../producer/DefaultMQProducerTest.java | 92 ++++++++++++++++---
2 files changed, 99 insertions(+), 35 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
index 5304887e380..4fd038663b5 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -16,13 +16,6 @@
*/
package org.apache.rocketmq.client.producer;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.ExecutorService;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.Validators;
@@ -46,11 +39,19 @@
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
-import org.apache.rocketmq.logging.org.slf4j.Logger;
-import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutorService;
/**
* This class is the entry point for applications intending to send messages.
@@ -210,9 +211,7 @@ public DefaultMQProducer(RPCHook rpcHook) {
* @param producerGroup Producer group, see the name-sake field.
*/
public DefaultMQProducer(final String producerGroup) {
- this.producerGroup = producerGroup;
- defaultMQProducerImpl = new DefaultMQProducerImpl(this, null);
- produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this);
+ this(producerGroup, (RPCHook) null);
}
/**
@@ -222,10 +221,7 @@ public DefaultMQProducer(final String producerGroup) {
* @param rpcHook RPC hook to execute per each remoting command execution.
*/
public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) {
- this.producerGroup = producerGroup;
- this.rpcHook = rpcHook;
- defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
- produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this);
+ this(producerGroup, rpcHook, null);
}
/**
@@ -237,8 +233,7 @@ public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) {
*/
public DefaultMQProducer(final String producerGroup, RPCHook rpcHook,
final List topics) {
- this(producerGroup, rpcHook);
- this.topics = topics;
+ this(producerGroup, rpcHook, topics, false, null);
}
/**
@@ -264,9 +259,7 @@ public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace, fin
*/
public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace,
final String customizedTraceTopic) {
- this(producerGroup, rpcHook);
- this.enableTrace = enableMsgTrace;
- this.traceTopic = customizedTraceTopic;
+ this(producerGroup, rpcHook, null, enableMsgTrace, customizedTraceTopic);
}
/**
@@ -282,8 +275,13 @@ public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean en
*/
public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, final List topics,
boolean enableMsgTrace, final String customizedTraceTopic) {
- this(producerGroup, rpcHook, enableMsgTrace, customizedTraceTopic);
+ this.producerGroup = producerGroup;
+ this.rpcHook = rpcHook;
this.topics = topics;
+ this.enableTrace = enableMsgTrace;
+ this.traceTopic = customizedTraceTopic;
+ defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
+ produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this);
}
/**
diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
index d4153c7cd97..7e1fad62477 100644
--- a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
@@ -16,19 +16,6 @@
*/
package org.apache.rocketmq.client.producer;
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
@@ -41,9 +28,11 @@
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
+import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader;
@@ -58,13 +47,33 @@
import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.nullable;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
@@ -596,4 +605,61 @@ public void run() {
}
return assertionErrors[0];
}
+
+ @Test
+ public void assertCreateDefaultMQProducer() {
+ String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis();
+ DefaultMQProducer producer1 = new DefaultMQProducer(producerGroupTemp);
+ assertNotNull(producer1);
+ assertEquals(producerGroupTemp, producer1.getProducerGroup());
+ assertNotNull(producer1.getDefaultMQProducerImpl());
+ assertTrue(producer1.getTotalBatchMaxBytes() > 0);
+ assertTrue(producer1.getBatchMaxBytes() > 0);
+ assertTrue(producer1.getBatchMaxDelayMs() > 0);
+ assertNull(producer1.getTopics());
+ assertFalse(producer1.isEnableTrace());
+ assertTrue(UtilAll.isBlank(producer1.getTraceTopic()));
+ DefaultMQProducer producer2 = new DefaultMQProducer(producerGroupTemp, mock(RPCHook.class));
+ assertNotNull(producer2);
+ assertEquals(producerGroupTemp, producer2.getProducerGroup());
+ assertNotNull(producer2.getDefaultMQProducerImpl());
+ assertTrue(producer2.getTotalBatchMaxBytes() > 0);
+ assertTrue(producer2.getBatchMaxBytes() > 0);
+ assertTrue(producer2.getBatchMaxDelayMs() > 0);
+ assertNull(producer2.getTopics());
+ assertFalse(producer2.isEnableTrace());
+ assertTrue(UtilAll.isBlank(producer2.getTraceTopic()));
+ DefaultMQProducer producer3 = new DefaultMQProducer(producerGroupTemp, mock(RPCHook.class), Collections.singletonList("custom_topic"));
+ assertNotNull(producer3);
+ assertEquals(producerGroupTemp, producer3.getProducerGroup());
+ assertNotNull(producer3.getDefaultMQProducerImpl());
+ assertTrue(producer3.getTotalBatchMaxBytes() > 0);
+ assertTrue(producer3.getBatchMaxBytes() > 0);
+ assertTrue(producer3.getBatchMaxDelayMs() > 0);
+ assertNotNull(producer3.getTopics());
+ assertEquals(1, producer3.getTopics().size());
+ assertFalse(producer3.isEnableTrace());
+ assertTrue(UtilAll.isBlank(producer3.getTraceTopic()));
+ DefaultMQProducer producer4 = new DefaultMQProducer(producerGroupTemp, mock(RPCHook.class), true, "custom_trace_topic");
+ assertNotNull(producer4);
+ assertEquals(producerGroupTemp, producer4.getProducerGroup());
+ assertNotNull(producer4.getDefaultMQProducerImpl());
+ assertTrue(producer4.getTotalBatchMaxBytes() > 0);
+ assertTrue(producer4.getBatchMaxBytes() > 0);
+ assertTrue(producer4.getBatchMaxDelayMs() > 0);
+ assertNull(producer4.getTopics());
+ assertTrue(producer4.isEnableTrace());
+ assertEquals("custom_trace_topic", producer4.getTraceTopic());
+ DefaultMQProducer producer5 = new DefaultMQProducer(producerGroupTemp, mock(RPCHook.class), Collections.singletonList("custom_topic"), true, "custom_trace_topic");
+ assertNotNull(producer5);
+ assertEquals(producerGroupTemp, producer5.getProducerGroup());
+ assertNotNull(producer5.getDefaultMQProducerImpl());
+ assertTrue(producer5.getTotalBatchMaxBytes() > 0);
+ assertTrue(producer5.getBatchMaxBytes() > 0);
+ assertTrue(producer5.getBatchMaxDelayMs() > 0);
+ assertNotNull(producer5.getTopics());
+ assertEquals(1, producer5.getTopics().size());
+ assertTrue(producer5.isEnableTrace());
+ assertEquals("custom_trace_topic", producer5.getTraceTopic());
+ }
}