diff --git a/all/pom.xml b/all/pom.xml index 8a88e5d43..880e138ff 100644 --- a/all/pom.xml +++ b/all/pom.xml @@ -56,7 +56,7 @@ 1.8 utf-8 1.7.21 - 1.3.2 + 1.3.15 3.29.2-GA 4.1.44.Final 3.5.2 diff --git a/bom/pom.xml b/bom/pom.xml index 224e520fa..5865f10c2 100644 --- a/bom/pom.xml +++ b/bom/pom.xml @@ -54,7 +54,7 @@ 4.13.1 1.6.6 - 1.3.2 + 1.3.15 3.0.8 1.4.1 diff --git a/core-impl/client/src/main/java/com/alipay/sofa/rpc/client/AllConnectConnectionHolder.java b/core-impl/client/src/main/java/com/alipay/sofa/rpc/client/AllConnectConnectionHolder.java index 8f9a2522b..856ded84d 100644 --- a/core-impl/client/src/main/java/com/alipay/sofa/rpc/client/AllConnectConnectionHolder.java +++ b/core-impl/client/src/main/java/com/alipay/sofa/rpc/client/AllConnectConnectionHolder.java @@ -554,13 +554,16 @@ public ClientTransport getAvailableClientTransport(ProviderInfo providerInfo) { transport = uninitializedConnections.get(providerInfo); if (transport != null) { // 未初始化则初始化 - synchronized (this) { + providerLock.lock(); + try { transport = uninitializedConnections.get(providerInfo); if (transport != null) { initClientTransport(consumerConfig.getInterfaceId(), providerInfo, transport); uninitializedConnections.remove(providerInfo); } return getAvailableClientTransport(providerInfo); + } finally { + providerLock.unlock(); } } diff --git a/core-impl/filter/src/main/java/com/alipay/sofa/rpc/filter/BeanIdMatchFilter.java b/core-impl/filter/src/main/java/com/alipay/sofa/rpc/filter/BeanIdMatchFilter.java index c4c5214b1..40b14b715 100644 --- a/core-impl/filter/src/main/java/com/alipay/sofa/rpc/filter/BeanIdMatchFilter.java +++ b/core-impl/filter/src/main/java/com/alipay/sofa/rpc/filter/BeanIdMatchFilter.java @@ -21,6 +21,8 @@ import java.util.ArrayList; import java.util.List; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; /** * 规则id的配置形式:a,b,!c,d @@ -49,18 +51,21 @@ public abstract class BeanIdMatchFilter extends Filter { private List excludeId; private volatile boolean formatComplete; - private final Object formatLock = new Object(); + protected final Lock lock = new ReentrantLock(); @Override public boolean needToLoad(FilterInvoker invoker) { AbstractInterfaceConfig config = invoker.config; String invokerId = config.getId(); if (!formatComplete) { - synchronized (formatLock) { + lock.lock(); + try { if (!formatComplete) { formatId(idRule); formatComplete = true; } + } finally { + lock.unlock(); } } diff --git a/core/api/src/main/java/com/alipay/sofa/rpc/common/threadpool/SofaExecutorFactory.java b/core/api/src/main/java/com/alipay/sofa/rpc/common/threadpool/SofaExecutorFactory.java new file mode 100644 index 000000000..847fe4d39 --- /dev/null +++ b/core/api/src/main/java/com/alipay/sofa/rpc/common/threadpool/SofaExecutorFactory.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.common.threadpool; + +import com.alipay.sofa.rpc.config.ServerConfig; +import com.alipay.sofa.rpc.ext.Extensible; + +import java.util.concurrent.Executor; + +/** + * + * @author junyuan + * @version SofaExecutorFactory.java, v 0.1 2023年12月13日 20:02 junyuan Exp $ + */ +@Extensible +public interface SofaExecutorFactory { + + Executor createExecutor(String namePrefix, ServerConfig serverConfig); +} diff --git a/core/api/src/main/java/com/alipay/sofa/rpc/common/threadpool/ThreadPoolConstant.java b/core/api/src/main/java/com/alipay/sofa/rpc/common/threadpool/ThreadPoolConstant.java new file mode 100644 index 000000000..ef9dd5b94 --- /dev/null +++ b/core/api/src/main/java/com/alipay/sofa/rpc/common/threadpool/ThreadPoolConstant.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.common.threadpool; + +/** + * + * @author junyuan + * @version ThreadPoolConstant.java, v 0.1 2023年12月12日 14:01 junyuan Exp $ + */ +public class ThreadPoolConstant { + public static final String DEFAULT_THREAD_NAME_PREFIX = "SOFA-RPC-DEFAULT"; + public static final String BIZ_THREAD_NAME_PREFIX = "SEV-BOLT-BIZ-"; + +} diff --git a/core/api/src/main/java/com/alipay/sofa/rpc/common/threadpool/extension/CachedThreadPoolFactory.java b/core/api/src/main/java/com/alipay/sofa/rpc/common/threadpool/extension/CachedThreadPoolFactory.java new file mode 100644 index 000000000..06a05223a --- /dev/null +++ b/core/api/src/main/java/com/alipay/sofa/rpc/common/threadpool/extension/CachedThreadPoolFactory.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.common.threadpool.extension; + +import com.alipay.sofa.rpc.common.struct.NamedThreadFactory; +import com.alipay.sofa.rpc.common.threadpool.SofaExecutorFactory; +import com.alipay.sofa.rpc.config.ServerConfig; +import com.alipay.sofa.rpc.ext.Extension; +import com.alipay.sofa.rpc.server.BusinessPool; + +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadPoolExecutor; + +/** + * + * @author junyuan + * @version DefaultThreadPoolFactory.java, v 0.1 2023年12月11日 20:55 junyuan Exp $ + */ +@Extension(value = "cached") +public class CachedThreadPoolFactory implements SofaExecutorFactory { + + @Override + public Executor createExecutor(String namePrefix, ServerConfig serverConfig) { + ThreadPoolExecutor executor = BusinessPool.initPool(serverConfig); + executor.setThreadFactory(new NamedThreadFactory(namePrefix, serverConfig.isDaemon())); + return executor; + } +} diff --git a/core/api/src/main/java/com/alipay/sofa/rpc/common/threadpool/extension/VirtualThreadPoolFactory.java b/core/api/src/main/java/com/alipay/sofa/rpc/common/threadpool/extension/VirtualThreadPoolFactory.java new file mode 100644 index 000000000..38c293af6 --- /dev/null +++ b/core/api/src/main/java/com/alipay/sofa/rpc/common/threadpool/extension/VirtualThreadPoolFactory.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.common.threadpool.extension; + +import com.alipay.sofa.rpc.common.threadpool.SofaExecutorFactory; +import com.alipay.sofa.common.thread.virtual.SofaVirtualThreadFactory; +import com.alipay.sofa.rpc.config.ServerConfig; +import com.alipay.sofa.rpc.ext.Extension; + +import java.util.concurrent.Executor; + +/** + * + * @author junyuan + * @version VirtualThreadPoolFactory.java, v 0.1 2023年12月12日 11:11 junyuan Exp $ + */ +@Extension(value = "virtual") +public class VirtualThreadPoolFactory implements SofaExecutorFactory { + + @Override + public Executor createExecutor(String namePrefix, ServerConfig serverConfig) { + // virtual thread does not support any configs now + return SofaVirtualThreadFactory.ofExecutorService(namePrefix); + } +} diff --git a/core/api/src/main/java/com/alipay/sofa/rpc/config/ServerConfig.java b/core/api/src/main/java/com/alipay/sofa/rpc/config/ServerConfig.java index e48467249..f77f7c818 100644 --- a/core/api/src/main/java/com/alipay/sofa/rpc/config/ServerConfig.java +++ b/core/api/src/main/java/com/alipay/sofa/rpc/config/ServerConfig.java @@ -16,7 +16,9 @@ */ package com.alipay.sofa.rpc.config; +import com.alipay.sofa.common.config.SofaConfigs; import com.alipay.sofa.rpc.common.RpcConstants; +import com.alipay.sofa.rpc.common.config.RpcConfigKeys; import com.alipay.sofa.rpc.common.utils.ExceptionUtils; import com.alipay.sofa.rpc.common.utils.NetUtils; import com.alipay.sofa.rpc.common.utils.StringUtils; @@ -96,7 +98,10 @@ public class ServerConfig extends AbstractIdConfig implements Serializable { /** * 线程池类型 */ - protected String threadPoolType = getStringValue(SERVER_POOL_TYPE); + protected String threadPoolType = SofaConfigs + .getOrCustomDefault( + RpcConfigKeys.SERVER_THREAD_POOL_TYPE /* 优先读取环境变量 */ + , getStringValue(SERVER_POOL_TYPE) /* 兜底读json配置文件 */); /** * 业务线程池大小 diff --git a/core/api/src/main/java/com/alipay/sofa/rpc/dynamic/DynamicConfigManagerFactory.java b/core/api/src/main/java/com/alipay/sofa/rpc/dynamic/DynamicConfigManagerFactory.java index d4c2760c8..d3cee7245 100644 --- a/core/api/src/main/java/com/alipay/sofa/rpc/dynamic/DynamicConfigManagerFactory.java +++ b/core/api/src/main/java/com/alipay/sofa/rpc/dynamic/DynamicConfigManagerFactory.java @@ -25,6 +25,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; /** * @author bystander @@ -37,6 +39,11 @@ public class DynamicConfigManagerFactory { */ private final static ConcurrentMap ALL_DYNAMICS = new ConcurrentHashMap(); + /** + * 类锁 + */ + private final static Lock classLock = new ReentrantLock(); + /** * slf4j Logger for this class */ @@ -49,13 +56,14 @@ public class DynamicConfigManagerFactory { * @param alias 别名 * @return DynamicManager 实现 */ - public static synchronized DynamicConfigManager getDynamicManager(String appName, String alias) { - if (ALL_DYNAMICS.size() > 3) { // 超过3次 是不是配错了? - if (LOGGER.isWarnEnabled()) { - LOGGER.warn("Size of dynamic manager is greater than 3, Please check it!"); - } - } + public static DynamicConfigManager getDynamicManager(String appName, String alias) { + classLock.lock(); try { + if (ALL_DYNAMICS.size() > 3) { // 超过3次 是不是配错了? + if (LOGGER.isWarnEnabled()) { + LOGGER.warn("Size of dynamic manager is greater than 3, Please check it!"); + } + } // 注意:RegistryConfig重写了equals方法,如果多个RegistryConfig属性一样,则认为是一个对象 DynamicConfigManager registry = ALL_DYNAMICS.get(alias); if (registry == null) { @@ -73,6 +81,8 @@ public static synchronized DynamicConfigManager getDynamicManager(String appName } catch (Throwable e) { throw new SofaRpcRuntimeException(LogCodes.getLog(LogCodes.ERROR_LOAD_EXT, "DynamicConfigManager", alias), e); + } finally { + classLock.unlock(); } } diff --git a/core/api/src/main/java/com/alipay/sofa/rpc/server/BusinessPool.java b/core/api/src/main/java/com/alipay/sofa/rpc/server/BusinessPool.java index f391b6b72..ab3581ac4 100644 --- a/core/api/src/main/java/com/alipay/sofa/rpc/server/BusinessPool.java +++ b/core/api/src/main/java/com/alipay/sofa/rpc/server/BusinessPool.java @@ -16,9 +16,12 @@ */ package com.alipay.sofa.rpc.server; +import com.alipay.sofa.rpc.common.threadpool.SofaExecutorFactory; import com.alipay.sofa.rpc.config.ServerConfig; +import com.alipay.sofa.rpc.ext.ExtensionLoaderFactory; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -43,4 +46,11 @@ public static ThreadPoolExecutor initPool(ServerConfig serverConfig) { return new ThreadPoolExecutor(minPoolSize, maxPoolSize, aliveTime, TimeUnit.MILLISECONDS, poolQueue); } + public static Executor initExecutor(String executorName, ServerConfig serverConfig) { + Executor executor = ExtensionLoaderFactory.getExtensionLoader(SofaExecutorFactory.class) + .getExtension(serverConfig.getThreadPoolType()) + .createExecutor(executorName, serverConfig); + return executor; + } + } diff --git a/core/api/src/main/java/com/alipay/sofa/rpc/server/UserThreadPool.java b/core/api/src/main/java/com/alipay/sofa/rpc/server/UserThreadPool.java index 425651b6a..5eb20b539 100644 --- a/core/api/src/main/java/com/alipay/sofa/rpc/server/UserThreadPool.java +++ b/core/api/src/main/java/com/alipay/sofa/rpc/server/UserThreadPool.java @@ -19,9 +19,12 @@ import com.alipay.sofa.rpc.common.struct.NamedThreadFactory; import com.alipay.sofa.rpc.common.utils.ThreadPoolUtils; +import java.util.concurrent.Executor; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; /** * 给用户配置的自定义业务线程池 @@ -87,20 +90,30 @@ public UserThreadPool(String uniqueThreadPoolName) { /** * 线程池 */ + @Deprecated transient volatile ThreadPoolExecutor executor; + transient volatile Executor userExecutor; + + private Lock lock = new ReentrantLock(); /** * 初始化线程池 */ public void init() { - executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MILLISECONDS, + userExecutor = buildExecutor(); + } + + protected Executor buildExecutor() { + ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, + TimeUnit.MILLISECONDS, ThreadPoolUtils.buildQueue(queueSize), new NamedThreadFactory(threadPoolName)); if (allowCoreThreadTimeOut) { - executor.allowCoreThreadTimeOut(true); + threadPoolExecutor.allowCoreThreadTimeOut(true); } if (prestartAllCoreThreads) { - executor.prestartAllCoreThreads(); + threadPoolExecutor.prestartAllCoreThreads(); } + return threadPoolExecutor; } /** @@ -257,14 +270,28 @@ public UserThreadPool setKeepAliveTime(int keepAliveTime) { * * @return the executor */ + @Deprecated public ThreadPoolExecutor getExecutor() { if (executor == null) { - synchronized (this) { - if (executor == null) { + Executor tmp = getUserExecutor(); + if (tmp instanceof ThreadPoolExecutor) { + executor = (ThreadPoolExecutor) tmp; + } + } + return executor; + } + + public Executor getUserExecutor() { + if (userExecutor == null) { + lock.lock(); + try { + if (userExecutor == null) { init(); } + } finally { + lock.unlock(); } } - return executor; + return userExecutor; } } diff --git a/core/api/src/main/java/com/alipay/sofa/rpc/server/UserVirtualThreadPool.java b/core/api/src/main/java/com/alipay/sofa/rpc/server/UserVirtualThreadPool.java new file mode 100644 index 000000000..e8aaeb974 --- /dev/null +++ b/core/api/src/main/java/com/alipay/sofa/rpc/server/UserVirtualThreadPool.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.server; + +import com.alipay.sofa.rpc.common.threadpool.SofaExecutorFactory; +import com.alipay.sofa.rpc.ext.ExtensionLoaderFactory; + +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * + * @author junyuan + * @version UserVirtualThreadPool.java, v 0.1 2023年12月14日 14:17 junyuan Exp $ + */ +public class UserVirtualThreadPool extends UserThreadPool { + private static final AtomicInteger POOL_NAME_COUNTER = new AtomicInteger(0); + + /** + * 线程名字 + * + */ + private String threadPoolName; + + public UserVirtualThreadPool() { + this.threadPoolName = DEFAUT_POOL_NAME + "-" + POOL_NAME_COUNTER.getAndIncrement(); + } + + @Override + protected Executor buildExecutor() { + return ExtensionLoaderFactory.getExtensionLoader(SofaExecutorFactory.class) + .getExtension("virtual").createExecutor(threadPoolName, null); + } +} \ No newline at end of file diff --git a/core/api/src/main/resources/META-INF/services/sofa-rpc/com.alipay.sofa.rpc.common.threadpool.SofaExecutorFactory b/core/api/src/main/resources/META-INF/services/sofa-rpc/com.alipay.sofa.rpc.common.threadpool.SofaExecutorFactory new file mode 100644 index 000000000..aac91cad5 --- /dev/null +++ b/core/api/src/main/resources/META-INF/services/sofa-rpc/com.alipay.sofa.rpc.common.threadpool.SofaExecutorFactory @@ -0,0 +1,2 @@ +cached=com.alipay.sofa.rpc.common.threadpool.extension.CachedThreadPoolFactory +virtual=com.alipay.sofa.rpc.common.threadpool.extension.VirtualThreadPoolFactory diff --git a/core/api/src/test/java/com/alipay/sofa/rpc/common/threadpool/ExecutorFactoryTest.java b/core/api/src/test/java/com/alipay/sofa/rpc/common/threadpool/ExecutorFactoryTest.java new file mode 100644 index 000000000..aabb5636e --- /dev/null +++ b/core/api/src/test/java/com/alipay/sofa/rpc/common/threadpool/ExecutorFactoryTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.common.threadpool; + +import com.alipay.sofa.rpc.config.ServerConfig; +import com.alipay.sofa.rpc.ext.ExtensionLoaderFactory; +import org.junit.Assert; +import org.junit.Test; + +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadPoolExecutor; + +/** + * + * @author junyuan + * @version ExecutorFactoryTest.java, v 0.1 2023年12月15日 10:59 junyuan Exp $ + */ +public class ExecutorFactoryTest { + + @Test + public void testBuildCachedPool() { + ServerConfig serverConfig = new ServerConfig(); + Executor executor = ExtensionLoaderFactory.getExtensionLoader(SofaExecutorFactory.class).getExtension("cached") + .createExecutor("test", serverConfig); + Assert.assertTrue(executor instanceof ThreadPoolExecutor); + + ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor; + Assert.assertEquals(threadPoolExecutor.getCorePoolSize(), serverConfig.getCoreThreads()); + } +} diff --git a/core/api/src/test/java/com/alipay/sofa/rpc/config/UserThreadPoolManagerTest.java b/core/api/src/test/java/com/alipay/sofa/rpc/config/UserThreadPoolManagerTest.java index ddab64d26..810a61f2f 100644 --- a/core/api/src/test/java/com/alipay/sofa/rpc/config/UserThreadPoolManagerTest.java +++ b/core/api/src/test/java/com/alipay/sofa/rpc/config/UserThreadPoolManagerTest.java @@ -17,9 +17,12 @@ package com.alipay.sofa.rpc.config; import com.alipay.sofa.rpc.server.UserThreadPool; +import com.alipay.sofa.rpc.server.UserVirtualThreadPool; import org.junit.Assert; import org.junit.Test; import java.util.Set; +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadPoolExecutor; public class UserThreadPoolManagerTest { @Test @@ -50,4 +53,29 @@ public void getUserThreadPoolMap() { userThreadPoolSet = UserThreadPoolManager.getUserThreadPoolSet(); Assert.assertEquals(4, userThreadPoolSet.size()); } + + @Test + public void userThreadPoolBuildTest() { + UserThreadPool userThreadPool = new UserVirtualThreadPool(); + Object result; + try { + result = userThreadPool.getExecutor(); + } catch (UnsupportedOperationException e) { + // jdk 21 以下, 这里应该抛出 UnsupportedOperationException + return; + } + Assert.assertNull(result); + } + + @Test + public void userThreadPoolCompatibleTest() { + UserThreadPool userThreadPool = new UserThreadPool(); + Object result; + result = userThreadPool.getExecutor(); + Assert.assertNotNull(result); + Executor executorService = userThreadPool.getUserExecutor(); + Assert.assertTrue(executorService instanceof ThreadPoolExecutor); + + Assert.assertEquals(executorService, result); + } } \ No newline at end of file diff --git a/core/common/src/main/java/com/alipay/sofa/rpc/common/config/RpcConfigKeys.java b/core/common/src/main/java/com/alipay/sofa/rpc/common/config/RpcConfigKeys.java index bba581abf..2b4934ce0 100644 --- a/core/common/src/main/java/com/alipay/sofa/rpc/common/config/RpcConfigKeys.java +++ b/core/common/src/main/java/com/alipay/sofa/rpc/common/config/RpcConfigKeys.java @@ -130,4 +130,14 @@ public class RpcConfigKeys { " The default filtering mode is STRICT.You can also set WARN or DISABLE", new String[] { "sofa_rpc_codec_serialize_checkMode" }); + /** + * biz thread pool type + */ + public static ConfigKey SERVER_THREAD_POOL_TYPE = ConfigKey + .build( + "sofa.rpc.server.thread.pool.type", + "cached", + false, + "specify biz thread pool implementation type", + new String[] { "sofa_rpc_server_thread_pool_type" }); } diff --git a/metrics/metrics-lookout/src/main/java/com/alipay/sofa/rpc/event/LookoutSubscriber.java b/metrics/metrics-lookout/src/main/java/com/alipay/sofa/rpc/event/LookoutSubscriber.java index 39b107872..51f292f92 100644 --- a/metrics/metrics-lookout/src/main/java/com/alipay/sofa/rpc/event/LookoutSubscriber.java +++ b/metrics/metrics-lookout/src/main/java/com/alipay/sofa/rpc/event/LookoutSubscriber.java @@ -78,8 +78,10 @@ public void onEvent(Event event) { ServerStartedEvent serverStartedEvent = (ServerStartedEvent) event; - rpcMetrics.collectThreadPool(serverStartedEvent.getServerConfig(), - serverStartedEvent.getThreadPoolExecutor()); + if (serverStartedEvent.getThreadPoolExecutor() != null) { + rpcMetrics.collectThreadPool(serverStartedEvent.getServerConfig(), + serverStartedEvent.getThreadPoolExecutor()); + } } else if (eventClass == ServerStoppedEvent.class) { ServerStoppedEvent serverStartedEvent = (ServerStoppedEvent) event; diff --git a/metrics/metrics-lookout/src/main/java/com/alipay/sofa/rpc/metrics/lookout/RpcLookoutId.java b/metrics/metrics-lookout/src/main/java/com/alipay/sofa/rpc/metrics/lookout/RpcLookoutId.java index f7642adfe..e9d406d78 100644 --- a/metrics/metrics-lookout/src/main/java/com/alipay/sofa/rpc/metrics/lookout/RpcLookoutId.java +++ b/metrics/metrics-lookout/src/main/java/com/alipay/sofa/rpc/metrics/lookout/RpcLookoutId.java @@ -23,6 +23,8 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; /** * @author LiWei.Liangen @@ -36,10 +38,12 @@ public class RpcLookoutId { private final ConcurrentMap serverConfigIds = new ConcurrentHashMap(); private volatile Id consumerConfigId; - private final Object consumerConfigIdLock = new Object(); private volatile Id providerConfigId; - private final Object providerConfigIdLock = new Object(); + + private static final Lock classLock = new ReentrantLock(); + private final Lock consumerConfigIdLock = new ReentrantLock(); + private final Lock providerConfigIdLock = new ReentrantLock(); /** * create consumerId @@ -51,12 +55,15 @@ public Id fetchConsumerStatId(Map tags) { String key = tags.toString(); Id lookoutId = consumerIds.get(key); if (lookoutId == null) { - synchronized (RpcLookoutId.class) { + classLock.lock(); + try { lookoutId = consumerIds.get(key); if (lookoutId == null) { lookoutId = Lookout.registry().createId("rpc.consumer.service.stats", tags); consumerIds.put(key, lookoutId); } + } finally { + classLock.unlock(); } } return lookoutId; @@ -71,12 +78,15 @@ public Id fetchProviderStatId(Map tags) { String key = tags.toString(); Id lookoutId = providerIds.get(key); if (lookoutId == null) { - synchronized (RpcLookoutId.class) { + classLock.lock(); + try { lookoutId = providerIds.get(key); if (lookoutId == null) { lookoutId = Lookout.registry().createId("rpc.provider.service.stats", tags); providerIds.put(key, lookoutId); } + } finally { + classLock.unlock(); } } return lookoutId; @@ -84,10 +94,13 @@ public Id fetchProviderStatId(Map tags) { public Id fetchConsumerSubId() { if (consumerConfigId == null) { - synchronized (consumerConfigIdLock) { + consumerConfigIdLock.lock(); + try { if (consumerConfigId == null) { consumerConfigId = Lookout.registry().createId("rpc.consumer.info.stats"); } + } finally { + consumerConfigIdLock.unlock(); } } return consumerConfigId; @@ -95,10 +108,13 @@ public Id fetchConsumerSubId() { public Id fetchProviderPubId() { if (providerConfigId == null) { - synchronized (providerConfigIdLock) { + providerConfigIdLock.lock(); + try { if (providerConfigId == null) { providerConfigId = Lookout.registry().createId("rpc.provider.info.stats"); } + } finally { + providerConfigIdLock.unlock(); } } return providerConfigId; @@ -127,12 +143,15 @@ public Id fetchServerThreadPoolQueueSizeId(ServerConfig serverConfig) { private Id fetchServerConfigId(String key) { Id lookoutId = serverConfigIds.get(key); if (lookoutId == null) { - synchronized (RpcLookout.class) { + classLock.lock(); + try { lookoutId = serverConfigIds.get(key); if (lookoutId == null) { lookoutId = Lookout.registry().createId(key); serverConfigIds.put(key, lookoutId); } + } finally { + classLock.unlock(); } } return lookoutId; diff --git a/remoting/remoting-bolt/src/main/java/com/alipay/sofa/rpc/server/bolt/BoltServer.java b/remoting/remoting-bolt/src/main/java/com/alipay/sofa/rpc/server/bolt/BoltServer.java index d1b9cec9d..3a054d98c 100644 --- a/remoting/remoting-bolt/src/main/java/com/alipay/sofa/rpc/server/bolt/BoltServer.java +++ b/remoting/remoting-bolt/src/main/java/com/alipay/sofa/rpc/server/bolt/BoltServer.java @@ -20,6 +20,7 @@ import com.alipay.remoting.rpc.RpcServer; import com.alipay.sofa.rpc.common.cache.ReflectCache; import com.alipay.sofa.rpc.common.struct.NamedThreadFactory; +import com.alipay.sofa.rpc.common.threadpool.ThreadPoolConstant; import com.alipay.sofa.rpc.config.ConfigUniqueNameGenerator; import com.alipay.sofa.rpc.config.ProviderConfig; import com.alipay.sofa.rpc.config.ServerConfig; @@ -40,6 +41,8 @@ import java.lang.reflect.Method; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicInteger; @@ -75,8 +78,14 @@ public class BoltServer implements Server { /** * 业务线程池 */ + @Deprecated protected ThreadPoolExecutor bizThreadPool; + /** + * 业务线程池, 也支持非池化的执行器 + */ + protected Executor bizExecutor; + /** * Invoker列表,接口--> Invoker */ @@ -85,20 +94,37 @@ public class BoltServer implements Server { @Override public void init(ServerConfig serverConfig) { this.serverConfig = serverConfig; - // 启动线程池 - bizThreadPool = initThreadPool(serverConfig); + bizExecutor = initExecutor(serverConfig); + if (bizExecutor instanceof ThreadPoolExecutor) { + bizThreadPool = (ThreadPoolExecutor) bizExecutor; + } boltServerProcessor = new BoltServerProcessor(this); } - protected ThreadPoolExecutor initThreadPool(ServerConfig serverConfig) { - ThreadPoolExecutor threadPool = BusinessPool.initPool(serverConfig); - threadPool.setThreadFactory(new NamedThreadFactory( - "SEV-BOLT-BIZ-" + serverConfig.getPort(), serverConfig.isDaemon())); - threadPool.setRejectedExecutionHandler(new SofaRejectedExecutionHandler()); + /** + * 指定类型初始化线程池 + * @param serverConfig + * @return + */ + protected Executor initExecutor(ServerConfig serverConfig) { + Executor executor = BusinessPool.initExecutor( + ThreadPoolConstant.BIZ_THREAD_NAME_PREFIX + serverConfig.getPort(), serverConfig); + if (executor instanceof ThreadPoolExecutor) { + configureThreadPoolExecutor((ThreadPoolExecutor) executor, serverConfig); + } + return executor; + } + + /** + * 针对 ThreadPoolExecutor 进行额外配置 + * @param executor + * @param serverConfig + */ + protected void configureThreadPoolExecutor(ThreadPoolExecutor executor, ServerConfig serverConfig) { + executor.setRejectedExecutionHandler(new SofaRejectedExecutionHandler()); if (serverConfig.isPreStartCore()) { // 初始化核心线程池 - threadPool.prestartAllCoreThreads(); + executor.prestartAllCoreThreads(); } - return threadPool; } @Override @@ -204,28 +230,73 @@ public void destroy() { return; } int stopTimeout = serverConfig.getStopTimeout(); - if (stopTimeout > 0) { // 需要等待结束时间 - AtomicInteger count = boltServerProcessor.processingCount; - // 有正在执行的请求 或者 队列里有请求 - if (count.get() > 0 || bizThreadPool.getQueue().size() > 0) { - long start = RpcRuntimeContext.now(); - if (LOGGER.isInfoEnabled()) { - LOGGER.info("There are {} call in processing and {} call in queue, wait {} ms to end", - count, bizThreadPool.getQueue().size(), stopTimeout); - } - while ((count.get() > 0 || bizThreadPool.getQueue().size() > 0) - && RpcRuntimeContext.now() - start < stopTimeout) { // 等待返回结果 - try { - Thread.sleep(10); - } catch (InterruptedException ignore) { - } + destroyThreadPool(bizExecutor, stopTimeout); + stop(); + } + + /** + * 如果未设置有效的 stopWaitTime, 将直接触发 shutdown + * @param executor + * @param stopWaitTime + */ + private void destroyThreadPool(Executor executor, int stopWaitTime) { + if (stopWaitTime > 0) { + if (executor instanceof ThreadPoolExecutor) { + threadPoolExecutorDestroy((ThreadPoolExecutor) executor, stopWaitTime); + } else if (executor instanceof ExecutorService) { + executorServiceDestroy((ExecutorService) executor, stopWaitTime); + } + } + } + + /** + * 将在 stopWaitTime 时限到期时强制 shutdown + * @param executor + * @param stopWaitTime + */ + private void threadPoolExecutorDestroy(ThreadPoolExecutor executor, int stopWaitTime) { + AtomicInteger count = boltServerProcessor.processingCount; + // 有正在执行的请求 或者 队列里有请求 + if (count.get() > 0 || executor.getQueue().size() > 0) { + long start = RpcRuntimeContext.now(); + if (LOGGER.isInfoEnabled()) { + LOGGER.info("There are {} call in processing and {} call in queue, wait {} ms to end", + count, executor.getQueue().size(), stopWaitTime); + } + while ((count.get() > 0 || executor.getQueue().size() > 0) + && RpcRuntimeContext.now() - start < stopWaitTime) { // 等待返回结果 + try { + Thread.sleep(10); + } catch (InterruptedException ignore) { } - } // 关闭前检查已有请求? + } } + executor.shutdown(); + } - // 关闭线程池 - bizThreadPool.shutdown(); - stop(); + /** + * 针对 ExecutorService, shutdown 后仍然会处理 queue 内任务, 不用判断 queue + * @param executorService + * @param stopWaitTime + */ + private void executorServiceDestroy(ExecutorService executorService, int stopWaitTime) { + AtomicInteger count = boltServerProcessor.processingCount; + // 有正在执行的请求 或者 队列里有请求 + if (count.get() > 0) { + long start = RpcRuntimeContext.now(); + if (LOGGER.isInfoEnabled()) { + LOGGER.info("There are {} call in processing, wait {} ms to end", + count, stopWaitTime); + } + while ((count.get() > 0) + && RpcRuntimeContext.now() - start < stopWaitTime) { // 等待返回结果 + try { + Thread.sleep(10); + } catch (InterruptedException ignore) { + } + } + } + executorService.shutdown(); } @Override @@ -244,10 +315,15 @@ public void destroy(DestroyHook hook) { * * @return 业务线程池 */ + @Deprecated public ThreadPoolExecutor getBizThreadPool() { return bizThreadPool; } + public Executor getBizExecutor() { + return bizExecutor; + } + /** * 找到服务端Invoker * @@ -269,4 +345,13 @@ public void cleanReflectCache(ProviderConfig providerConfig) { ReflectCache.invalidateMethodSigsCache(key); ReflectCache.invalidateOverloadMethodCache(key); } + + @Deprecated + protected ThreadPoolExecutor initThreadPool(ServerConfig serverConfig) { + ThreadPoolExecutor threadPool = BusinessPool.initPool(serverConfig); + threadPool.setThreadFactory(new NamedThreadFactory( + ThreadPoolConstant.BIZ_THREAD_NAME_PREFIX + serverConfig.getPort(), serverConfig.isDaemon())); + configureThreadPoolExecutor(threadPool, serverConfig); + return threadPool; + } } diff --git a/remoting/remoting-bolt/src/main/java/com/alipay/sofa/rpc/server/bolt/BoltServerProcessor.java b/remoting/remoting-bolt/src/main/java/com/alipay/sofa/rpc/server/bolt/BoltServerProcessor.java index e03563614..5939d7683 100644 --- a/remoting/remoting-bolt/src/main/java/com/alipay/sofa/rpc/server/bolt/BoltServerProcessor.java +++ b/remoting/remoting-bolt/src/main/java/com/alipay/sofa/rpc/server/bolt/BoltServerProcessor.java @@ -332,7 +332,7 @@ public String interest() { @Override public Executor getExecutor() { - return boltServer.getBizThreadPool(); + return boltServer.getBizExecutor(); } @Override @@ -361,7 +361,7 @@ public Executor select(String requestClass, Object requestHeader) { if (service != null) { UserThreadPool threadPool = UserThreadPoolManager.getUserThread(service); if (threadPool != null) { - Executor executor = threadPool.getExecutor(); + Executor executor = threadPool.getUserExecutor(); if (executor != null) { // 存在自定义线程池,且不为空 return executor; diff --git a/remoting/remoting-bolt/src/test/java/com/alipay/sofa/rpc/server/bolt/BoltServerTest.java b/remoting/remoting-bolt/src/test/java/com/alipay/sofa/rpc/server/bolt/BoltServerTest.java index 148cd6b00..066aa89ad 100644 --- a/remoting/remoting-bolt/src/test/java/com/alipay/sofa/rpc/server/bolt/BoltServerTest.java +++ b/remoting/remoting-bolt/src/test/java/com/alipay/sofa/rpc/server/bolt/BoltServerTest.java @@ -75,4 +75,29 @@ public void start() throws Exception { server.destroy(); } + @Test + public void threadPoolDestroyTest() { + String host = "127.0.0.1"; + int port = 17702; + ServerConfig serverConfig = new ServerConfig(); + serverConfig.setBoundHost(host); + serverConfig.setPort(port); + serverConfig.setProtocol(RpcConstants.PROTOCOL_TYPE_BOLT); + + BoltServer server = new BoltServer(); + server.init(serverConfig); + server.start(); + Assert.assertTrue(server.started); + Assert.assertTrue(NetUtils.canTelnet(host, port, 1000)); + + server.destroy(); + } + + @Test + public void testDeprecatedInitThreadExecutor() { + BoltServer server = new BoltServer(); + server.initThreadPool(new ServerConfig()); + server.destroy(); + } + } \ No newline at end of file