Skip to content

Commit

Permalink
Optimize interface-level dynamic config
Browse files Browse the repository at this point in the history
  • Loading branch information
Narzisss committed Oct 14, 2024
1 parent 853ed3f commit 92351f2
Show file tree
Hide file tree
Showing 14 changed files with 526 additions and 214 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.alipay.sofa.rpc.client.Cluster;
import com.alipay.sofa.rpc.client.ClusterFactory;
import com.alipay.sofa.rpc.client.ProviderGroup;
import com.alipay.sofa.rpc.common.RpcConstants;
import com.alipay.sofa.rpc.common.SofaConfigs;
import com.alipay.sofa.rpc.common.SofaOptions;
import com.alipay.sofa.rpc.common.utils.CommonUtils;
Expand Down Expand Up @@ -53,6 +54,7 @@
import java.util.concurrent.atomic.AtomicInteger;

import static com.alipay.sofa.rpc.common.RpcConstants.REGISTRY_PROTOCOL_DOMAIN;
import static com.alipay.sofa.common.config.SofaConfigs.getOrDefault;

/**
* Default consumer bootstrap.
Expand Down Expand Up @@ -145,7 +147,8 @@ public T refer() {
// build cluster
cluster = ClusterFactory.getCluster(this);
// build listeners
consumerConfig.setConfigListener(buildConfigListener(this));
ConfigListener configListener = buildConfigListener(this);
consumerConfig.setConfigListener(configListener);
consumerConfig.setProviderInfoListener(buildProviderInfoListener(this));
// init cluster
cluster.init();
Expand All @@ -155,26 +158,23 @@ public T refer() {
proxyIns = (T) ProxyFactory.buildProxy(consumerConfig.getProxy(), consumerConfig.getProxyClass(),
proxyInvoker);

//启用请求级别动态配置
//请求级别动态配置参数
final String dynamicAlias = consumerConfig.getParameter(DynamicConfigKeys.DYNAMIC_ALIAS);
if (StringUtils.isNotBlank(dynamicAlias)) {
final DynamicConfigManager dynamicManager = DynamicConfigManagerFactory.getDynamicManager(
consumerConfig.getAppName(), dynamicAlias);
consumerConfig.getAppName(), dynamicAlias);
dynamicManager.initServiceConfiguration(consumerConfig.getInterfaceId());
}

//启用接口级别动态配置
final String dynamicUrl = consumerConfig.getParameter(DynamicConfigKeys.DYNAMIC_URL);
if (StringUtils.isNotBlank(dynamicUrl)) {
//接口级别动态配置参数
final String dynamicUrl = getOrDefault(DynamicConfigKeys.DYNAMIC_URL);
if ( StringUtils.isNotBlank(dynamicUrl)) {
//启用接口级别动态配置
final DynamicConfigManager dynamicManager = DynamicConfigManagerFactory.getDynamicManagerWithUrl(
consumerConfig.getAppName(), dynamicUrl);

//build listeners for dynamic config
consumerConfig.setConfigListener(buildConfigListener(this,dynamicManager));
dynamicManager.addListener(consumerConfig.getInterfaceId(), configListener);
dynamicManager.initServiceConfiguration(consumerConfig.getInterfaceId(), configListener);
}



} catch (Exception e) {
if (cluster != null) {
cluster.destroy();
Expand Down Expand Up @@ -214,17 +214,6 @@ protected ConfigListener buildConfigListener(ConsumerBootstrap bootstrap) {
return new ConsumerAttributeListener();
}

/**
* Build ConfigListener for consumer bootstrap with dynamic config.
*
* @param bootstrap ConsumerBootstrap
* @param dynamicManager DynamicConfigManager
* @return ConfigListener
*/
protected ConfigListener buildConfigListener(ConsumerBootstrap bootstrap, DynamicConfigManager dynamicManager) {
return new ConsumerAttributeListener(dynamicManager);
}

/**
* Build ProviderInfoListener for consumer bootstrap.
*
Expand Down Expand Up @@ -463,36 +452,35 @@ private class ConsumerAttributeListener implements ConfigListener {

private Map<String, String> newValueMap = new HashMap<>();

ConsumerAttributeListener() {
// 动态配置项
private List<String> dynamicConfigKeys = Arrays.asList(RpcConstants.CONFIG_KEY_TIMEOUT,
RpcConstants.CONFIG_KEY_RETRIES, RpcConstants.CONFIG_KEY_LOADBALANCER);

}

ConsumerAttributeListener(DynamicConfigManager dynamicManager) {
this.initWith(consumerConfig.getInterfaceId(),dynamicManager);
}
ConsumerAttributeListener() {

public void initWith(String key, DynamicConfigManager dynamicManager) {
dynamicManager.addListener(key, this);
// 初始化配置值
String rawConfig = dynamicManager.getConfig(key);
if (!StringUtils.isEmpty(rawConfig)) {
process(new ConfigChangedEvent(key, "sofa-rpc",rawConfig));
}
}

@Override
public void process(ConfigChangedEvent event){
if (event.getChangeType().equals(ConfigChangeType.DELETED)) {
newValueMap = null;
} else {
public void process(ConfigChangedEvent event) {
for (String key : newValueMap.keySet()) {
newValueMap.put(key, "");
}
if (!event.getChangeType().equals(ConfigChangeType.DELETED)) {
// ADDED or MODIFIED
String[] lines = event.getContent().split("\n");
for (String line : lines) {
String[] keyValue = line.split("=", 2);
if (keyValue.length == 2) {
String key = keyValue[0].trim();
String value = keyValue[1].trim();
newValueMap.put(key, value);
for (String dynamicConfigKey : dynamicConfigKeys) {
if (key.equals(dynamicConfigKey) || key.endsWith("." + dynamicConfigKey)) {
newValueMap.put(key, value);
break;
}
}
} else {
LOGGER.warn("Malformed configuration line: {}", line);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@

import com.alipay.sofa.common.config.SofaConfigs;
import com.alipay.sofa.rpc.auth.AuthRuleGroup;
import com.alipay.sofa.rpc.common.utils.StringUtils;
import com.alipay.sofa.rpc.dynamic.*;
import com.alipay.sofa.rpc.ext.Extension;
import com.alipay.sofa.rpc.listener.ConfigListener;
import com.alipay.sofa.rpc.log.Logger;
import com.alipay.sofa.rpc.log.LoggerFactory;
import com.ctrip.framework.apollo.Config;
import com.ctrip.framework.apollo.ConfigChangeListener;
import com.ctrip.framework.apollo.ConfigService;
Expand All @@ -45,60 +44,83 @@
@Extension(value = "apollo", override = true)
public class ApolloDynamicConfigManager extends DynamicConfigManager {

Logger LOGGER = LoggerFactory.getLogger(ApolloDynamicConfigManager.class);

private static final String APOLLO_APPID_KEY = "app.id";

private static final String APOLLO_ADDR_KEY = "apollo.meta";

private static final String APOLLO_CLUSTER_KEY = "apollo.cluster";

private static final String APOLLO_PROTOCOL_PREFIX = "http://";

private Config config;

private final ConcurrentMap<String, ApolloListener> watchListenerMap = new ConcurrentHashMap<>();

protected ApolloDynamicConfigManager(String appName) {
super(appName);
System.setProperty(APOLLO_APPID_KEY, appName);
System.setProperty(APOLLO_ADDR_KEY, APOLLO_PROTOCOL_PREFIX + SofaConfigs.getOrDefault(DynamicConfigKeys.APOLLO_ADDRESS));
config = ConfigService.getConfig(DynamicConfigKeys.DEFAULT_GROUP);
super(appName, SofaConfigs.getOrCustomDefault(DynamicConfigKeys.APOLLO_ADDRESS,""));
if (StringUtils.isNotBlank(appName)) {
System.setProperty(APOLLO_APPID_KEY, appName);
}
if (StringUtils.isNotBlank(getAddress())) {
System.setProperty(APOLLO_ADDR_KEY, APOLLO_PROTOCOL_PREFIX + getAddress());
}
config = ConfigService.getAppConfig();
}

protected ApolloDynamicConfigManager(String appName,String address) {
super(appName);
protected ApolloDynamicConfigManager(String appName, String remainUrl) {
super(appName, remainUrl);
System.setProperty(APOLLO_APPID_KEY, appName);
System.setProperty(APOLLO_ADDR_KEY, APOLLO_PROTOCOL_PREFIX + address);
config = ConfigService.getConfig(DynamicConfigKeys.DEFAULT_GROUP);
System.setProperty(APOLLO_ADDR_KEY, APOLLO_PROTOCOL_PREFIX + getAddress());
String params[] = getParams();
if (params!= null && params.length > 0){
for (String param : params) {
String[] keyValue = param.split("=");
if (keyValue.length == 2) {
if ("cluster".equals(keyValue[0])) {
System.setProperty(APOLLO_CLUSTER_KEY, keyValue[1]);
}
}
}
}
config = ConfigService.getAppConfig();
}

@Override
public void initServiceConfiguration(String service) {
//TODO not now
// TODO 暂不支持
}

@Override
public void initServiceConfiguration(String service, ConfigListener listener) {
String rawConfig = config.getProperty(service, "");
if (StringUtils.isNotBlank(rawConfig)) {
listener.process(new ConfigChangedEvent(service, rawConfig));
}
}

@Override
public String getProviderServiceProperty(String service, String key) {
return config.getProperty(DynamicConfigKeyHelper.buildProviderServiceProKey(service, key),
DynamicHelper.DEFAULT_DYNAMIC_VALUE);
DynamicHelper.DEFAULT_DYNAMIC_VALUE);
}

@Override
public String getConsumerServiceProperty(String service, String key) {
return config.getProperty(DynamicConfigKeyHelper.buildConsumerServiceProKey(service, key),
DynamicHelper.DEFAULT_DYNAMIC_VALUE);
DynamicHelper.DEFAULT_DYNAMIC_VALUE);

}

@Override
public String getProviderMethodProperty(String service, String method, String key) {
return config.getProperty(DynamicConfigKeyHelper.buildProviderMethodProKey(service, method, key),
DynamicHelper.DEFAULT_DYNAMIC_VALUE);
DynamicHelper.DEFAULT_DYNAMIC_VALUE);
}

@Override
public String getConsumerMethodProperty(String service, String method, String key) {
return config.getProperty(DynamicConfigKeyHelper.buildConsumerMethodProKey(service, method, key),
DynamicHelper.DEFAULT_DYNAMIC_VALUE);
DynamicHelper.DEFAULT_DYNAMIC_VALUE);

}

Expand All @@ -109,12 +131,7 @@ public AuthRuleGroup getServiceAuthRule(String service) {
}

@Override
public String getConfig(String key){
return config.getProperty(key, DynamicHelper.DEFAULT_DYNAMIC_VALUE);
}

@Override
public void addListener(String key, ConfigListener listener){
public void addListener(String key, ConfigListener listener) {
ApolloListener apolloListener = watchListenerMap.computeIfAbsent(key, k -> new ApolloListener());
apolloListener.addListener(listener);
config.addChangeListener(apolloListener, Collections.singleton(key));
Expand All @@ -124,19 +141,15 @@ public class ApolloListener implements ConfigChangeListener {

private Set<ConfigListener> listeners = new CopyOnWriteArraySet<>();

ApolloListener() {}
ApolloListener() {
}

@Override
public void onChange(com.ctrip.framework.apollo.model.ConfigChangeEvent changeEvent) {
for (String key : changeEvent.changedKeys()) {
ConfigChange change = changeEvent.getChange(key);
if ("".equals(change.getNewValue())) {
LOGGER.info("an empty rule is received for key: " + key);
return;
}

ConfigChangedEvent event =
new ConfigChangedEvent(key, change.getNamespace(), change.getNewValue(), getChangeType(change));
new ConfigChangedEvent(key, change.getNewValue(), getChangeType(change));
listeners.forEach(listener -> listener.process(event));
}
}
Expand Down
Loading

0 comments on commit 92351f2

Please sign in to comment.