Skip to content

Commit

Permalink
Feat/threadpool ext (#1383)
Browse files Browse the repository at this point in the history
* feat: refine synchronized to reentrant

* feat: support thread pool extension

* add lock

* chore: format

* chore: format

* refactor: extract init method

* refactor: type

* fix: extension default value

* fix: virtual thread constructor with config

* chore: format

* bump sofa common tools to 1.3.15

* chore: add test case

* chore: format

* chore: cr

* fix:cr

* cr

* format lock

* fix

* format

* refactor: method name for get executor

---------

Co-authored-by: Lo1nt <[email protected]>
  • Loading branch information
Lo1nt and Lo1nt authored Jan 9, 2024
1 parent 9faa8b8 commit bd25a4e
Show file tree
Hide file tree
Showing 22 changed files with 522 additions and 57 deletions.
2 changes: 1 addition & 1 deletion all/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
<maven.compiler.target>1.8</maven.compiler.target>
<project.build.sourceEncoding>utf-8</project.build.sourceEncoding>
<slf4j.version>1.7.21</slf4j.version>
<sofa.common.tools.version>1.3.2</sofa.common.tools.version>
<sofa.common.tools.version>1.3.15</sofa.common.tools.version>
<javassist.version>3.29.2-GA</javassist.version>
<netty.version>4.1.44.Final</netty.version>
<hessian.version>3.5.2</hessian.version>
Expand Down
2 changes: 1 addition & 1 deletion bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
<junit.version>4.13.1</junit.version>
<!-- alipay libs -->
<bolt.version>1.6.6</bolt.version>
<sofa.common.tools.version>1.3.2</sofa.common.tools.version>
<sofa.common.tools.version>1.3.15</sofa.common.tools.version>
<tracer.version>3.0.8</tracer.version>
<lookout.version>1.4.1</lookout.version>
<!-- Build args -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -554,13 +554,16 @@ public ClientTransport getAvailableClientTransport(ProviderInfo providerInfo) {
transport = uninitializedConnections.get(providerInfo);
if (transport != null) {
// 未初始化则初始化
synchronized (this) {
providerLock.lock();
try {
transport = uninitializedConnections.get(providerInfo);
if (transport != null) {
initClientTransport(consumerConfig.getInterfaceId(), providerInfo, transport);
uninitializedConnections.remove(providerInfo);
}
return getAvailableClientTransport(providerInfo);
} finally {
providerLock.unlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* 规则id的配置形式:a,b,!c,d
Expand Down Expand Up @@ -49,18 +51,21 @@ public abstract class BeanIdMatchFilter extends Filter {
private List<String> excludeId;

private volatile boolean formatComplete;
private final Object formatLock = new Object();
protected final Lock lock = new ReentrantLock();

@Override
public boolean needToLoad(FilterInvoker invoker) {
AbstractInterfaceConfig config = invoker.config;
String invokerId = config.getId();
if (!formatComplete) {
synchronized (formatLock) {
lock.lock();
try {
if (!formatComplete) {
formatId(idRule);
formatComplete = true;
}
} finally {
lock.unlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.rpc.common.threadpool;

import com.alipay.sofa.rpc.config.ServerConfig;
import com.alipay.sofa.rpc.ext.Extensible;

import java.util.concurrent.Executor;

/**
*
* @author junyuan
* @version SofaExecutorFactory.java, v 0.1 2023年12月13日 20:02 junyuan Exp $
*/
@Extensible
public interface SofaExecutorFactory {

Executor createExecutor(String namePrefix, ServerConfig serverConfig);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.rpc.common.threadpool;

/**
*
* @author junyuan
* @version ThreadPoolConstant.java, v 0.1 2023年12月12日 14:01 junyuan Exp $
*/
public class ThreadPoolConstant {
public static final String DEFAULT_THREAD_NAME_PREFIX = "SOFA-RPC-DEFAULT";
public static final String BIZ_THREAD_NAME_PREFIX = "SEV-BOLT-BIZ-";

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.rpc.common.threadpool.extension;

import com.alipay.sofa.rpc.common.struct.NamedThreadFactory;
import com.alipay.sofa.rpc.common.threadpool.SofaExecutorFactory;
import com.alipay.sofa.rpc.config.ServerConfig;
import com.alipay.sofa.rpc.ext.Extension;
import com.alipay.sofa.rpc.server.BusinessPool;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

/**
*
* @author junyuan
* @version DefaultThreadPoolFactory.java, v 0.1 2023年12月11日 20:55 junyuan Exp $
*/
@Extension(value = "cached")
public class CachedThreadPoolFactory implements SofaExecutorFactory {

@Override
public Executor createExecutor(String namePrefix, ServerConfig serverConfig) {
ThreadPoolExecutor executor = BusinessPool.initPool(serverConfig);
executor.setThreadFactory(new NamedThreadFactory(namePrefix, serverConfig.isDaemon()));
return executor;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.rpc.common.threadpool.extension;

import com.alipay.sofa.rpc.common.threadpool.SofaExecutorFactory;
import com.alipay.sofa.common.thread.virtual.SofaVirtualThreadFactory;
import com.alipay.sofa.rpc.config.ServerConfig;
import com.alipay.sofa.rpc.ext.Extension;

import java.util.concurrent.Executor;

/**
*
* @author junyuan
* @version VirtualThreadPoolFactory.java, v 0.1 2023年12月12日 11:11 junyuan Exp $
*/
@Extension(value = "virtual")
public class VirtualThreadPoolFactory implements SofaExecutorFactory {

@Override
public Executor createExecutor(String namePrefix, ServerConfig serverConfig) {
// virtual thread does not support any configs now
return SofaVirtualThreadFactory.ofExecutorService(namePrefix);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
*/
package com.alipay.sofa.rpc.config;

import com.alipay.sofa.common.config.SofaConfigs;
import com.alipay.sofa.rpc.common.RpcConstants;
import com.alipay.sofa.rpc.common.config.RpcConfigKeys;
import com.alipay.sofa.rpc.common.utils.ExceptionUtils;
import com.alipay.sofa.rpc.common.utils.NetUtils;
import com.alipay.sofa.rpc.common.utils.StringUtils;
Expand Down Expand Up @@ -96,7 +98,10 @@ public class ServerConfig extends AbstractIdConfig implements Serializable {
/**
* 线程池类型
*/
protected String threadPoolType = getStringValue(SERVER_POOL_TYPE);
protected String threadPoolType = SofaConfigs
.getOrCustomDefault(
RpcConfigKeys.SERVER_THREAD_POOL_TYPE /* 优先读取环境变量 */
, getStringValue(SERVER_POOL_TYPE) /* 兜底读json配置文件 */);

/**
* 业务线程池大小
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* @author bystander
Expand All @@ -37,6 +39,11 @@ public class DynamicConfigManagerFactory {
*/
private final static ConcurrentMap<String, DynamicConfigManager> ALL_DYNAMICS = new ConcurrentHashMap<String, DynamicConfigManager>();

/**
* 类锁
*/
private final static Lock classLock = new ReentrantLock();

/**
* slf4j Logger for this class
*/
Expand All @@ -49,13 +56,14 @@ public class DynamicConfigManagerFactory {
* @param alias 别名
* @return DynamicManager 实现
*/
public static synchronized DynamicConfigManager getDynamicManager(String appName, String alias) {
if (ALL_DYNAMICS.size() > 3) { // 超过3次 是不是配错了?
if (LOGGER.isWarnEnabled()) {
LOGGER.warn("Size of dynamic manager is greater than 3, Please check it!");
}
}
public static DynamicConfigManager getDynamicManager(String appName, String alias) {
classLock.lock();
try {
if (ALL_DYNAMICS.size() > 3) { // 超过3次 是不是配错了?
if (LOGGER.isWarnEnabled()) {
LOGGER.warn("Size of dynamic manager is greater than 3, Please check it!");
}
}
// 注意:RegistryConfig重写了equals方法,如果多个RegistryConfig属性一样,则认为是一个对象
DynamicConfigManager registry = ALL_DYNAMICS.get(alias);
if (registry == null) {
Expand All @@ -73,6 +81,8 @@ public static synchronized DynamicConfigManager getDynamicManager(String appName
} catch (Throwable e) {
throw new SofaRpcRuntimeException(LogCodes.getLog(LogCodes.ERROR_LOAD_EXT, "DynamicConfigManager", alias),
e);
} finally {
classLock.unlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@
*/
package com.alipay.sofa.rpc.server;

import com.alipay.sofa.rpc.common.threadpool.SofaExecutorFactory;
import com.alipay.sofa.rpc.config.ServerConfig;
import com.alipay.sofa.rpc.ext.ExtensionLoaderFactory;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
Expand All @@ -43,4 +46,11 @@ public static ThreadPoolExecutor initPool(ServerConfig serverConfig) {
return new ThreadPoolExecutor(minPoolSize, maxPoolSize, aliveTime, TimeUnit.MILLISECONDS, poolQueue);
}

public static Executor initExecutor(String executorName, ServerConfig serverConfig) {
Executor executor = ExtensionLoaderFactory.getExtensionLoader(SofaExecutorFactory.class)
.getExtension(serverConfig.getThreadPoolType())
.createExecutor(executorName, serverConfig);
return executor;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@
import com.alipay.sofa.rpc.common.struct.NamedThreadFactory;
import com.alipay.sofa.rpc.common.utils.ThreadPoolUtils;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* 给用户配置的自定义业务线程池
Expand Down Expand Up @@ -87,20 +90,30 @@ public UserThreadPool(String uniqueThreadPoolName) {
/**
* 线程池
*/
@Deprecated
transient volatile ThreadPoolExecutor executor;
transient volatile Executor userExecutor;

private Lock lock = new ReentrantLock();

/**
* 初始化线程池
*/
public void init() {
executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MILLISECONDS,
userExecutor = buildExecutor();
}

protected Executor buildExecutor() {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime,
TimeUnit.MILLISECONDS,
ThreadPoolUtils.buildQueue(queueSize), new NamedThreadFactory(threadPoolName));
if (allowCoreThreadTimeOut) {
executor.allowCoreThreadTimeOut(true);
threadPoolExecutor.allowCoreThreadTimeOut(true);
}
if (prestartAllCoreThreads) {
executor.prestartAllCoreThreads();
threadPoolExecutor.prestartAllCoreThreads();
}
return threadPoolExecutor;
}

/**
Expand Down Expand Up @@ -257,14 +270,28 @@ public UserThreadPool setKeepAliveTime(int keepAliveTime) {
*
* @return the executor
*/
@Deprecated
public ThreadPoolExecutor getExecutor() {
if (executor == null) {
synchronized (this) {
if (executor == null) {
Executor tmp = getUserExecutor();
if (tmp instanceof ThreadPoolExecutor) {
executor = (ThreadPoolExecutor) tmp;
}
}
return executor;
}

public Executor getUserExecutor() {
if (userExecutor == null) {
lock.lock();
try {
if (userExecutor == null) {
init();
}
} finally {
lock.unlock();
}
}
return executor;
return userExecutor;
}
}
Loading

0 comments on commit bd25a4e

Please sign in to comment.