Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support zkAddress updated on flight #774

Merged
merged 1 commit into from
Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions core/src/main/java/com/ctrip/xpipe/config/ConfigKeyListener.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.ctrip.xpipe.config;

/**
* @author lishanglin
* date 2024/3/8
*/
public interface ConfigKeyListener {

void onChange(String key, String val);

}
3 changes: 3 additions & 0 deletions core/src/main/java/com/ctrip/xpipe/config/ZkConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
*/
public interface ZkConfig {

String KEY_ZK_ADDRESS = "zk.address";
String KEY_ZK_NAMESPACE = "zk.namespace";

String getZkConnectionString();

String getZkNameSpace();
Expand Down
10 changes: 2 additions & 8 deletions core/src/main/java/com/ctrip/xpipe/spring/AbstractProfile.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,10 @@


protected ZkClient getZkClient(String zkNameSpace, String zkAddress){

DefaultZkClient zkClient = new DefaultZkClient();

DefaultZkConfig zkConfig = new DefaultZkConfig();
DefaultZkConfig zkConfig = new DefaultZkConfig(zkAddress);

Check warning on line 26 in core/src/main/java/com/ctrip/xpipe/spring/AbstractProfile.java

View check run for this annotation

Codecov / codecov/patch

core/src/main/java/com/ctrip/xpipe/spring/AbstractProfile.java#L26

Added line #L26 was not covered by tests
zkConfig.setZkNameSpace(zkNameSpace);

zkClient.setZkConfig(zkConfig);
zkClient.setZkAddress(zkAddress);
return zkClient;

return new DefaultZkClient(zkConfig);

Check warning on line 28 in core/src/main/java/com/ctrip/xpipe/spring/AbstractProfile.java

View check run for this annotation

Codecov / codecov/patch

core/src/main/java/com/ctrip/xpipe/spring/AbstractProfile.java#L28

Added line #L28 was not covered by tests
}

}
3 changes: 2 additions & 1 deletion core/src/main/java/com/ctrip/xpipe/zk/ZkClient.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package com.ctrip.xpipe.zk;

import com.ctrip.xpipe.config.ConfigKeyListener;
import org.apache.curator.framework.CuratorFramework;

/**
* @author marsqing
*
* Jun 16, 2016 12:05:46 PM
*/
public interface ZkClient{
public interface ZkClient extends ConfigKeyListener {

CuratorFramework get();

Expand Down
8 changes: 6 additions & 2 deletions core/src/main/java/com/ctrip/xpipe/zk/ZkConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@ public interface ZkConfig {
int getZkSessionTimeoutMillis();

int waitForZkConnectedMillis();

CuratorFramework create(String address) throws InterruptedException;

String getZkAddress();

void updateZkAddress(String address);

CuratorFramework create() throws InterruptedException;

}
27 changes: 16 additions & 11 deletions core/src/main/java/com/ctrip/xpipe/zk/impl/DefaultZkClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@
*/
public class DefaultZkClient extends AbstractLifecycle implements ZkClient, TopElement, Lifecycle {

private ZkConfig zkConfig = new DefaultZkConfig();
private ZkConfig zkConfig;

private CuratorFramework client;

private String address;

public DefaultZkClient(ZkConfig zkConfig) {
this.zkConfig = zkConfig;
}

Check warning on line 25 in core/src/main/java/com/ctrip/xpipe/zk/impl/DefaultZkClient.java

View check run for this annotation

Codecov / codecov/patch

core/src/main/java/com/ctrip/xpipe/zk/impl/DefaultZkClient.java#L23-L25

Added lines #L23 - L25 were not covered by tests

protected void doInitialize() throws Exception {

Expand All @@ -29,15 +31,22 @@
@Override
protected void doStart() throws Exception {

logger.info("[doStart]{}", address);
client= zkConfig.create(address);
logger.info("[doStart]{}", this.zkConfig.getZkAddress());
client= zkConfig.create();

Check warning on line 35 in core/src/main/java/com/ctrip/xpipe/zk/impl/DefaultZkClient.java

View check run for this annotation

Codecov / codecov/patch

core/src/main/java/com/ctrip/xpipe/zk/impl/DefaultZkClient.java#L34-L35

Added lines #L34 - L35 were not covered by tests
}


@Override
protected void doStop() throws Exception {
client.close();
}

@Override
public void onChange(String key, String val) {
if (key.equalsIgnoreCase(com.ctrip.xpipe.config.ZkConfig.KEY_ZK_ADDRESS)) {
this.zkConfig.updateZkAddress(val);

Check warning on line 47 in core/src/main/java/com/ctrip/xpipe/zk/impl/DefaultZkClient.java

View check run for this annotation

Codecov / codecov/patch

core/src/main/java/com/ctrip/xpipe/zk/impl/DefaultZkClient.java#L47

Added line #L47 was not covered by tests
}
}

Check warning on line 49 in core/src/main/java/com/ctrip/xpipe/zk/impl/DefaultZkClient.java

View check run for this annotation

Codecov / codecov/patch

core/src/main/java/com/ctrip/xpipe/zk/impl/DefaultZkClient.java#L49

Added line #L49 was not covered by tests

@Override
public CuratorFramework get() {
Expand All @@ -46,16 +55,12 @@

@Override
public void setZkAddress(String address) {
this.address = address;
this.zkConfig.updateZkAddress(address);

Check warning on line 58 in core/src/main/java/com/ctrip/xpipe/zk/impl/DefaultZkClient.java

View check run for this annotation

Codecov / codecov/patch

core/src/main/java/com/ctrip/xpipe/zk/impl/DefaultZkClient.java#L58

Added line #L58 was not covered by tests
}

@Override
public String getZkAddress(){
return this.address;
}

public void setZkConfig(ZkConfig zkConfig) {
this.zkConfig = zkConfig;
return this.zkConfig.getZkAddress();

Check warning on line 63 in core/src/main/java/com/ctrip/xpipe/zk/impl/DefaultZkClient.java

View check run for this annotation

Codecov / codecov/patch

core/src/main/java/com/ctrip/xpipe/zk/impl/DefaultZkClient.java#L63

Added line #L63 was not covered by tests
}

@Override
Expand Down
28 changes: 23 additions & 5 deletions core/src/main/java/com/ctrip/xpipe/zk/impl/DefaultZkConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import com.ctrip.xpipe.api.codec.Codec;
import com.ctrip.xpipe.utils.XpipeThreadFactory;
import com.ctrip.xpipe.zk.ZkConfig;
import org.apache.curator.ensemble.EnsembleProvider;
import org.apache.curator.ensemble.fixed.FixedEnsembleProvider;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.CuratorFrameworkFactory.Builder;
Expand All @@ -27,7 +29,23 @@ public class DefaultZkConfig implements ZkConfig{
private int zkConnectionTimeoutMillis = Integer.parseInt(System.getProperty("ZK.CONN.TIMEOUT", "3000"));
private int zkRetries = 3;
private String zkNameSpace = System.getProperty(KEY_ZK_NAMESPACE, DEFAULT_ZK_NAMESPACE);

private EnsembleProvider ensembleProvider;

public DefaultZkConfig(String address) {
this.ensembleProvider = new FixedEnsembleProvider(address, true);
}

@Override
public void updateZkAddress(String address) {
logger.info("[updateZkAddress] {} -> {}", this.ensembleProvider.getConnectionString(), address);
this.ensembleProvider.setConnectionString(address);
}

@Override
public String getZkAddress() {
return this.ensembleProvider.getConnectionString();
}

@Override
public int getZkConnectionTimeoutMillis() {
return zkConnectionTimeoutMillis;
Expand Down Expand Up @@ -80,18 +98,18 @@ public int waitForZkConnectedMillis() {
}

@Override
public CuratorFramework create(String address) throws InterruptedException {
public CuratorFramework create() throws InterruptedException {

Builder builder = CuratorFrameworkFactory.builder();
builder.connectionTimeoutMs(getZkConnectionTimeoutMillis());
builder.connectString(address);
builder.ensembleProvider(this.ensembleProvider);
builder.maxCloseWaitMs(getZkCloseWaitMillis());
builder.namespace(getZkNamespace());
builder.retryPolicy(new RetryNTimes(getZkRetries(), getSleepMsBetweenRetries()));
builder.sessionTimeoutMs(getZkSessionTimeoutMillis());
builder.threadFactory(XpipeThreadFactory.create("Xpipe-ZK-" + address, true));
builder.threadFactory(XpipeThreadFactory.create("Xpipe-ZK-" + this.ensembleProvider.getConnectionString(), true));

logger.info("[create]{}, {}", Codec.DEFAULT.encode(this), address);
logger.info("[create]{}, {}", Codec.DEFAULT.encode(this), this.ensembleProvider.getConnectionString());
CuratorFramework curatorFramework = builder.build();
curatorFramework.start();
curatorFramework.blockUntilConnected(waitForZkConnectedMillis(), TimeUnit.MILLISECONDS);
Expand Down
16 changes: 10 additions & 6 deletions core/src/main/java/com/ctrip/xpipe/zk/impl/SpringZkClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,22 @@
public class SpringZkClient implements ZkClient{

private ZkConfig zkConfig;
private String zkAddress;
private CuratorFramework curatorFramework;

public SpringZkClient(ZkConfig zkConfig, String zkAddress){
public SpringZkClient(ZkConfig zkConfig){

Check warning on line 21 in core/src/main/java/com/ctrip/xpipe/zk/impl/SpringZkClient.java

View check run for this annotation

Codecov / codecov/patch

core/src/main/java/com/ctrip/xpipe/zk/impl/SpringZkClient.java#L21

Added line #L21 was not covered by tests
this.zkConfig = zkConfig;
this.zkAddress = zkAddress;
}

@Override
public void onChange(String key, String val) {
if (key.equalsIgnoreCase(com.ctrip.xpipe.config.ZkConfig.KEY_ZK_ADDRESS)) {
this.zkConfig.updateZkAddress(val);

Check warning on line 28 in core/src/main/java/com/ctrip/xpipe/zk/impl/SpringZkClient.java

View check run for this annotation

Codecov / codecov/patch

core/src/main/java/com/ctrip/xpipe/zk/impl/SpringZkClient.java#L28

Added line #L28 was not covered by tests
}
}

Check warning on line 30 in core/src/main/java/com/ctrip/xpipe/zk/impl/SpringZkClient.java

View check run for this annotation

Codecov / codecov/patch

core/src/main/java/com/ctrip/xpipe/zk/impl/SpringZkClient.java#L30

Added line #L30 was not covered by tests

@PostConstruct
public void postContruct() throws InterruptedException {
curatorFramework = zkConfig.create(zkAddress);
curatorFramework = zkConfig.create();

Check warning on line 34 in core/src/main/java/com/ctrip/xpipe/zk/impl/SpringZkClient.java

View check run for this annotation

Codecov / codecov/patch

core/src/main/java/com/ctrip/xpipe/zk/impl/SpringZkClient.java#L34

Added line #L34 was not covered by tests
}

@Override
Expand All @@ -37,12 +41,12 @@

@Override
public void setZkAddress(String zkAddress) {
this.zkAddress = zkAddress;
this.zkConfig.updateZkAddress(zkAddress);

Check warning on line 44 in core/src/main/java/com/ctrip/xpipe/zk/impl/SpringZkClient.java

View check run for this annotation

Codecov / codecov/patch

core/src/main/java/com/ctrip/xpipe/zk/impl/SpringZkClient.java#L44

Added line #L44 was not covered by tests
}

@Override
public String getZkAddress() {
return zkAddress;
return this.zkConfig.getZkAddress();

Check warning on line 49 in core/src/main/java/com/ctrip/xpipe/zk/impl/SpringZkClient.java

View check run for this annotation

Codecov / codecov/patch

core/src/main/java/com/ctrip/xpipe/zk/impl/SpringZkClient.java#L49

Added line #L49 was not covered by tests
}

@PreDestroy
Expand Down
17 changes: 12 additions & 5 deletions core/src/main/java/com/ctrip/xpipe/zk/impl/TestZkClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@

private volatile CuratorFramework client;

private ZkConfig zkConfig = new DefaultZkConfig();

public static final String ZK_ADDRESS_KEY = "zkAddress";

private String address = System.getProperty(ZK_ADDRESS_KEY, "127.0.0.1:2181");

private ZkConfig zkConfig = new DefaultZkConfig(address);

protected void doInitialize() throws Exception {

Expand All @@ -38,6 +38,13 @@
client = null;
}
}

@Override
public void onChange(String key, String val) {
if (key.equalsIgnoreCase(com.ctrip.xpipe.config.ZkConfig.KEY_ZK_ADDRESS)) {
this.zkConfig.updateZkAddress(val);

Check warning on line 45 in core/src/main/java/com/ctrip/xpipe/zk/impl/TestZkClient.java

View check run for this annotation

Codecov / codecov/patch

core/src/main/java/com/ctrip/xpipe/zk/impl/TestZkClient.java#L45

Added line #L45 was not covered by tests
}
}

Check warning on line 47 in core/src/main/java/com/ctrip/xpipe/zk/impl/TestZkClient.java

View check run for this annotation

Codecov / codecov/patch

core/src/main/java/com/ctrip/xpipe/zk/impl/TestZkClient.java#L47

Added line #L47 was not covered by tests

@Override
public synchronized CuratorFramework get() {
Expand All @@ -52,7 +59,7 @@
}

try {
client = zkConfig.create(address);
client = zkConfig.create();
return client;
} catch (InterruptedException e) {
logger.error("[get]", e);
Expand All @@ -62,12 +69,12 @@

@Override
public void setZkAddress(String address) {
this.address = address;
this.zkConfig.updateZkAddress(address);
}

@Override
public String getZkAddress(){
return this.address;
return this.zkConfig.getZkAddress();

Check warning on line 77 in core/src/main/java/com/ctrip/xpipe/zk/impl/TestZkClient.java

View check run for this annotation

Codecov / codecov/patch

core/src/main/java/com/ctrip/xpipe/zk/impl/TestZkClient.java#L77

Added line #L77 was not covered by tests
}

public void setClient(CuratorFramework client) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ public class DefaultLeaderElectorTest extends AbstractTest{
public void beforeDefaultLeaderElectorTest() throws InterruptedException{
zkTestServer = startRandomZk();

ZkConfig config = new DefaultZkConfig();
client = config.create(String.format("localhost:%d", zkTestServer.getZkPort()));
ZkConfig config = new DefaultZkConfig(String.format("localhost:%d", zkTestServer.getZkPort()));
client = config.create();

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,27 @@ public class DefaultZkConfigTest {

@Test
public void testGetZkConnectionTimeoutMillisDefaultValue() {
zkConfig = new DefaultZkConfig();
zkConfig = new DefaultZkConfig("localhost:2181");
Assert.assertEquals(3000, zkConfig.getZkConnectionTimeoutMillis());
}

@Test
public void testGetZkConnectionTimeoutMillis() {
System.setProperty("ZK.CONN.TIMEOUT", "1000");
zkConfig = new DefaultZkConfig();
zkConfig = new DefaultZkConfig("localhost:2181");
Assert.assertEquals(1000, zkConfig.getZkConnectionTimeoutMillis());
}

@Test
public void testGetZkSessionTimeoutMillisDefaultValue() {
zkConfig = new DefaultZkConfig();
zkConfig = new DefaultZkConfig("localhost:2181");
Assert.assertEquals(5000, zkConfig.getZkSessionTimeoutMillis());
}

@Test
public void testGetZkSessionTimeoutMillis() {
System.setProperty("ZK.SESSION.TIMEOUT", "1000");
zkConfig = new DefaultZkConfig();
zkConfig = new DefaultZkConfig("localhost:2181");
Assert.assertEquals(1000, zkConfig.getZkSessionTimeoutMillis());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ public class AbstractZkUsageTest extends AbstractTest{
@Before
public void beforeAbstractZkUsageTest() throws InterruptedException{

ZkConfig zkConfig = new DefaultZkConfig();
client = zkConfig.create(zkAddress);
ZkConfig zkConfig = new DefaultZkConfig(zkAddress);
client = zkConfig.create();
}

public CuratorFramework getClient() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import com.ctrip.xpipe.redis.checker.config.CheckerConfig;
import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.DcClusterDelayMarkDown;
import com.ctrip.xpipe.redis.console.config.model.BeaconOrgRoute;
import com.ctrip.xpipe.redis.console.model.KeeperContainerOverloadStandardModel;
import com.ctrip.xpipe.redis.console.util.HickwallMetricInfo;
import com.ctrip.xpipe.redis.core.config.CoreConfig;
import com.ctrip.xpipe.redis.core.meta.QuorumConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.DcClusterDelayMarkDown;
import com.ctrip.xpipe.redis.console.config.ConsoleConfig;
import com.ctrip.xpipe.redis.console.config.model.BeaconOrgRoute;
import com.ctrip.xpipe.redis.console.model.KeeperContainerOverloadStandardModel;
import com.ctrip.xpipe.redis.console.util.HickwallMetricInfo;
import com.ctrip.xpipe.redis.core.config.AbstractCoreConfig;
import com.ctrip.xpipe.redis.core.meta.QuorumConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,17 @@

@Bean
public ZkClient getZkClient(ConsoleConfig consoleConfig) {
return getZkClient(consoleConfig.getZkNameSpace(), consoleConfig.getZkConnectionString());
ZkClient zkClient = getZkClient(consoleConfig.getZkNameSpace(), consoleConfig.getZkConnectionString());
consoleConfig.addListener(zkClient);
return zkClient;

Check warning on line 26 in redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/spring/Production.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/spring/Production.java#L24-L26

Added lines #L24 - L26 were not covered by tests
}

@Override
protected ZkClient getZkClient(String zkNameSpace, String zkAddress) {

DefaultZkConfig zkConfig = new DefaultZkConfig();
DefaultZkConfig zkConfig = new DefaultZkConfig(zkAddress);

Check warning on line 32 in redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/spring/Production.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/spring/Production.java#L32

Added line #L32 was not covered by tests
zkConfig.setZkNameSpace(zkNameSpace);

SpringZkClient springZkClient = new SpringZkClient(zkConfig, zkAddress);
return springZkClient;
return new SpringZkClient(zkConfig);

Check warning on line 34 in redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/spring/Production.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/spring/Production.java#L34

Added line #L34 was not covered by tests
}

@Bean
Expand Down
Loading
Loading