Skip to content

Commit

Permalink
[ISSUE apache#8276] Merge duplicate code in DefaultMQProducer constru…
Browse files Browse the repository at this point in the history
…ctor (apache#8277)
  • Loading branch information
yx9o authored Jun 12, 2024
1 parent b96d6b9 commit 1a73e01
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,6 @@
*/
package org.apache.rocketmq.client.producer;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.Validators;
Expand All @@ -46,11 +39,19 @@
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;

/**
* This class is the entry point for applications intending to send messages. </p>
Expand Down Expand Up @@ -210,9 +211,7 @@ public DefaultMQProducer(RPCHook rpcHook) {
* @param producerGroup Producer group, see the name-sake field.
*/
public DefaultMQProducer(final String producerGroup) {
this.producerGroup = producerGroup;
defaultMQProducerImpl = new DefaultMQProducerImpl(this, null);
produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this);
this(producerGroup, (RPCHook) null);
}

/**
Expand All @@ -222,10 +221,7 @@ public DefaultMQProducer(final String producerGroup) {
* @param rpcHook RPC hook to execute per each remoting command execution.
*/
public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) {
this.producerGroup = producerGroup;
this.rpcHook = rpcHook;
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this);
this(producerGroup, rpcHook, null);
}

/**
Expand All @@ -237,8 +233,7 @@ public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) {
*/
public DefaultMQProducer(final String producerGroup, RPCHook rpcHook,
final List<String> topics) {
this(producerGroup, rpcHook);
this.topics = topics;
this(producerGroup, rpcHook, topics, false, null);
}

/**
Expand All @@ -264,9 +259,7 @@ public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace, fin
*/
public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace,
final String customizedTraceTopic) {
this(producerGroup, rpcHook);
this.enableTrace = enableMsgTrace;
this.traceTopic = customizedTraceTopic;
this(producerGroup, rpcHook, null, enableMsgTrace, customizedTraceTopic);
}

/**
Expand All @@ -282,8 +275,13 @@ public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean en
*/
public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, final List<String> topics,
boolean enableMsgTrace, final String customizedTraceTopic) {
this(producerGroup, rpcHook, enableMsgTrace, customizedTraceTopic);
this.producerGroup = producerGroup;
this.rpcHook = rpcHook;
this.topics = topics;
this.enableTrace = enableMsgTrace;
this.traceTopic = customizedTraceTopic;
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,6 @@
*/
package org.apache.rocketmq.client.producer;

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
Expand All @@ -41,9 +28,11 @@
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader;
Expand All @@ -58,13 +47,33 @@
import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner;

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

@RunWith(MockitoJUnitRunner.class)
Expand Down Expand Up @@ -596,4 +605,61 @@ public void run() {
}
return assertionErrors[0];
}

@Test
public void assertCreateDefaultMQProducer() {
String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis();
DefaultMQProducer producer1 = new DefaultMQProducer(producerGroupTemp);
assertNotNull(producer1);
assertEquals(producerGroupTemp, producer1.getProducerGroup());
assertNotNull(producer1.getDefaultMQProducerImpl());
assertTrue(producer1.getTotalBatchMaxBytes() > 0);
assertTrue(producer1.getBatchMaxBytes() > 0);
assertTrue(producer1.getBatchMaxDelayMs() > 0);
assertNull(producer1.getTopics());
assertFalse(producer1.isEnableTrace());
assertTrue(UtilAll.isBlank(producer1.getTraceTopic()));
DefaultMQProducer producer2 = new DefaultMQProducer(producerGroupTemp, mock(RPCHook.class));
assertNotNull(producer2);
assertEquals(producerGroupTemp, producer2.getProducerGroup());
assertNotNull(producer2.getDefaultMQProducerImpl());
assertTrue(producer2.getTotalBatchMaxBytes() > 0);
assertTrue(producer2.getBatchMaxBytes() > 0);
assertTrue(producer2.getBatchMaxDelayMs() > 0);
assertNull(producer2.getTopics());
assertFalse(producer2.isEnableTrace());
assertTrue(UtilAll.isBlank(producer2.getTraceTopic()));
DefaultMQProducer producer3 = new DefaultMQProducer(producerGroupTemp, mock(RPCHook.class), Collections.singletonList("custom_topic"));
assertNotNull(producer3);
assertEquals(producerGroupTemp, producer3.getProducerGroup());
assertNotNull(producer3.getDefaultMQProducerImpl());
assertTrue(producer3.getTotalBatchMaxBytes() > 0);
assertTrue(producer3.getBatchMaxBytes() > 0);
assertTrue(producer3.getBatchMaxDelayMs() > 0);
assertNotNull(producer3.getTopics());
assertEquals(1, producer3.getTopics().size());
assertFalse(producer3.isEnableTrace());
assertTrue(UtilAll.isBlank(producer3.getTraceTopic()));
DefaultMQProducer producer4 = new DefaultMQProducer(producerGroupTemp, mock(RPCHook.class), true, "custom_trace_topic");
assertNotNull(producer4);
assertEquals(producerGroupTemp, producer4.getProducerGroup());
assertNotNull(producer4.getDefaultMQProducerImpl());
assertTrue(producer4.getTotalBatchMaxBytes() > 0);
assertTrue(producer4.getBatchMaxBytes() > 0);
assertTrue(producer4.getBatchMaxDelayMs() > 0);
assertNull(producer4.getTopics());
assertTrue(producer4.isEnableTrace());
assertEquals("custom_trace_topic", producer4.getTraceTopic());
DefaultMQProducer producer5 = new DefaultMQProducer(producerGroupTemp, mock(RPCHook.class), Collections.singletonList("custom_topic"), true, "custom_trace_topic");
assertNotNull(producer5);
assertEquals(producerGroupTemp, producer5.getProducerGroup());
assertNotNull(producer5.getDefaultMQProducerImpl());
assertTrue(producer5.getTotalBatchMaxBytes() > 0);
assertTrue(producer5.getBatchMaxBytes() > 0);
assertTrue(producer5.getBatchMaxDelayMs() > 0);
assertNotNull(producer5.getTopics());
assertEquals(1, producer5.getTopics().size());
assertTrue(producer5.isEnableTrace());
assertEquals("custom_trace_topic", producer5.getTraceTopic());
}
}

0 comments on commit 1a73e01

Please sign in to comment.