必须选择一个以上${displayName}
diff --git a/manager/deployer/src/main/resources/webapp/templates/home/screen/alarmRuleList.vm b/manager/deployer/src/main/resources/webapp/templates/home/screen/alarmRuleList.vm
index 28d8bcce..73c4e834 100644
--- a/manager/deployer/src/main/resources/webapp/templates/home/screen/alarmRuleList.vm
+++ b/manager/deployer/src/main/resources/webapp/templates/home/screen/alarmRuleList.vm
@@ -167,7 +167,7 @@ $control.setTemplate("home:navigation.vm")
#set ($disableAllURL = $homeModule.setAction("AlarmRuleAction").addQueryData("pipelineId", $pipelineId).addQueryData("status", "disable").addQueryData("eventSubmitDoStatusByPipeline", "true").render())
-
全部暂停
+
全部暂停
#end
#end
diff --git a/manager/deployer/src/main/resources/webapp/templates/home/screen/alarmSystemList.vm b/manager/deployer/src/main/resources/webapp/templates/home/screen/alarmSystemList.vm
index 095b4693..29835755 100644
--- a/manager/deployer/src/main/resources/webapp/templates/home/screen/alarmSystemList.vm
+++ b/manager/deployer/src/main/resources/webapp/templates/home/screen/alarmSystemList.vm
@@ -113,7 +113,7 @@ $control.setTemplate("home:navigation.vm")
|恢复
#else
#set ($disableURL = $homeModule.setAction("AlarmRuleAction").addQueryData("alarmRuleId", $alarmRule.id).addQueryData("status", "disable").addQueryData("eventSubmitDoStatusSystem", "true").render())
- |暂停
+ |暂停
#end
#else
#set ($enableURL = $homeModule.setAction("AlarmRuleAction").addQueryData("alarmRuleId", $alarmRule.id).addQueryData("status", "enable").addQueryData("pageIndex", $paginator.page).addQueryData("eventSubmitDoStatusSystem", "true").render())
diff --git a/manager/deployer/src/main/resources/webapp/templates/home/screen/editPipeline.vm b/manager/deployer/src/main/resources/webapp/templates/home/screen/editPipeline.vm
index 5a6eed83..0330b183 100644
--- a/manager/deployer/src/main/resources/webapp/templates/home/screen/editPipeline.vm
+++ b/manager/deployer/src/main/resources/webapp/templates/home/screen/editPipeline.vm
@@ -46,8 +46,7 @@ $control.setTemplate("home:navigation.vm")
#editPipelineMessage ($pipelineGroup.name)#editPipelineMessage ($pipelineGroup.formPipelineError)
-
-
+
Select机器: |
|
+
+
+ Canal位点重置: |
+
+
+ 示例:{"@type":"com.alibaba.otter.canal.protocol.position.LogPosition","identity":{"slaveId":-1,"sourceAddress":{"address":"192.168.1.2","port":3306}},"postion":{"gtid":"","included":false,"journalName":"mysql-bin.0","position":0,"serverId":0,"timestamp":0}}
+
+ |
#**
diff --git a/manager/deployer/src/main/resources/webapp/templates/home/screen/pipelineInfo.vm b/manager/deployer/src/main/resources/webapp/templates/home/screen/pipelineInfo.vm
index 57c74ce1..135ec1f6 100644
--- a/manager/deployer/src/main/resources/webapp/templates/home/screen/pipelineInfo.vm
+++ b/manager/deployer/src/main/resources/webapp/templates/home/screen/pipelineInfo.vm
@@ -56,6 +56,9 @@ $control.setTemplate("home:navigation.vm")
#set ($canalInfoURL = $homeModule.setTarget("canalList.vm").addQueryData("searchKey", $!pipeline.parameters.destinationName))
Canal名字: | $!pipeline.parameters.destinationName |
+
+ 当前canal位点: | $!pipeline.position |
+
#**
主道消费端ID: | $!pipeline.parameters.mainstemClientId |
diff --git a/manager/web/src/main/java/com/alibaba/otter/manager/web/home/module/action/PipelineAction.java b/manager/web/src/main/java/com/alibaba/otter/manager/web/home/module/action/PipelineAction.java
index c25cfcda..2075495a 100644
--- a/manager/web/src/main/java/com/alibaba/otter/manager/web/home/module/action/PipelineAction.java
+++ b/manager/web/src/main/java/com/alibaba/otter/manager/web/home/module/action/PipelineAction.java
@@ -23,6 +23,7 @@
import javax.annotation.Resource;
import javax.servlet.http.HttpSession;
+import com.alibaba.otter.shared.arbitrate.ArbitrateViewService;
import org.apache.commons.lang.ArrayUtils;
import com.alibaba.citrus.service.form.CustomErrors;
@@ -53,6 +54,9 @@ public class PipelineAction {
@Resource(name = "channelService")
private ChannelService channelService;
+ @Resource(name = "arbitrateViewService")
+ private ArbitrateViewService arbitrateViewService;
+
public void doAdd(@FormGroup("pipelineInfo") Group pipelineInfo,
@FormGroup("pipelineParameterInfo") Group pipelineParameterInfo,
@FormField(name = "formPipelineError", group = "pipelineInfo") CustomErrors err,
@@ -188,6 +192,9 @@ public void doEdit(@FormGroup("pipelineInfo") Group pipelineInfo,
try {
pipelineService.modify(pipeline);
+ // 重置位点
+ arbitrateViewService.updateCanalCursor(pipeline.getParameters().getDestinationName(), pipeline.getId().shortValue(),
+ pipeline.getPosition());
} catch (RepeatConfigureException rce) {
err.setMessage("invalidPipelineName");
return;
diff --git a/manager/web/src/main/java/com/alibaba/otter/manager/web/home/module/screen/EditPipeline.java b/manager/web/src/main/java/com/alibaba/otter/manager/web/home/module/screen/EditPipeline.java
index 23ea1d05..cb7fc15c 100644
--- a/manager/web/src/main/java/com/alibaba/otter/manager/web/home/module/screen/EditPipeline.java
+++ b/manager/web/src/main/java/com/alibaba/otter/manager/web/home/module/screen/EditPipeline.java
@@ -26,6 +26,8 @@
import com.alibaba.otter.manager.biz.config.node.NodeService;
import com.alibaba.otter.manager.biz.config.pipeline.PipelineService;
import com.alibaba.otter.manager.web.common.WebConstant;
+import com.alibaba.otter.shared.arbitrate.ArbitrateViewService;
+import com.alibaba.otter.shared.arbitrate.model.PositionEventData;
import com.alibaba.otter.shared.common.model.config.channel.Channel;
import com.alibaba.otter.shared.common.model.config.pipeline.Pipeline;
@@ -37,11 +39,13 @@ public class EditPipeline {
private NodeService nodeService;
@Resource(name = "channelService")
private ChannelService channelService;
+ @Resource
+ private ArbitrateViewService arbitrateViewService;
/**
* 找到单个Channel,用于编辑Channel信息界面加载信息
*
- * @param channelId
+ * @param pipelineId
* @param context
* @throws WebxException
*/
@@ -53,6 +57,12 @@ public void execute(@Param("pipelineId") Long pipelineId, Context context, Navig
}
Pipeline pipeline = pipelineService.findById(pipelineId);
+ // 返回canal当前位点信息
+ PositionEventData positionEventData = arbitrateViewService.getCanalCursor(pipeline.getParameters().getDestinationName(), pipeline.getParameters().
+ getMainstemClientId());
+ if (null != positionEventData) {
+ pipeline.setPosition(positionEventData.getPosition());
+ }
context.put("pipeline", pipeline);
context.put("nodes", nodeService.listAll());
}
diff --git a/manager/web/src/main/java/com/alibaba/otter/manager/web/home/module/screen/PipelineInfo.java b/manager/web/src/main/java/com/alibaba/otter/manager/web/home/module/screen/PipelineInfo.java
index c882f68f..d0661d06 100644
--- a/manager/web/src/main/java/com/alibaba/otter/manager/web/home/module/screen/PipelineInfo.java
+++ b/manager/web/src/main/java/com/alibaba/otter/manager/web/home/module/screen/PipelineInfo.java
@@ -20,6 +20,8 @@
import com.alibaba.citrus.turbine.Context;
import com.alibaba.citrus.turbine.dataresolver.Param;
+import com.alibaba.otter.shared.arbitrate.ArbitrateViewService;
+import com.alibaba.otter.shared.arbitrate.model.PositionEventData;
import com.alibaba.otter.shared.common.model.config.pipeline.Pipeline;
import com.alibaba.otter.manager.biz.config.node.NodeService;
import com.alibaba.otter.manager.biz.config.pipeline.PipelineService;
@@ -32,9 +34,17 @@ public class PipelineInfo {
@Resource(name = "nodeService")
private NodeService nodeService;
+ @Resource(name = "arbitrateViewService")
+ private ArbitrateViewService arbitrateViewService;
+
public void execute(@Param("pipelineId") Long pipelineId, Context context) throws Exception {
Pipeline pipeline = pipelineService.findById(pipelineId);
-
+ // 返回canal当前位点信息
+ PositionEventData positionEventData = arbitrateViewService.getCanalCursor(pipeline.getParameters().getDestinationName(), pipeline.getParameters().
+ getMainstemClientId());
+ if (null != positionEventData) {
+ pipeline.setPosition(positionEventData.getPosition());
+ }
context.put("pipeline", pipeline);
context.put("nodes", nodeService.listAll());
}
diff --git a/shared/arbitrate/src/main/java/com/alibaba/otter/shared/arbitrate/ArbitrateViewService.java b/shared/arbitrate/src/main/java/com/alibaba/otter/shared/arbitrate/ArbitrateViewService.java
index b632ecc0..cbf3b40c 100644
--- a/shared/arbitrate/src/main/java/com/alibaba/otter/shared/arbitrate/ArbitrateViewService.java
+++ b/shared/arbitrate/src/main/java/com/alibaba/otter/shared/arbitrate/ArbitrateViewService.java
@@ -64,4 +64,12 @@ public interface ArbitrateViewService {
* 删除canal meta信息
*/
void removeCanal(String destination);
+
+ /**
+ * 更新canal位点
+ * @param destination
+ * @param clientId
+ * @param position
+ */
+ void updateCanalCursor(String destination, short clientId, String position);
}
diff --git a/shared/arbitrate/src/main/java/com/alibaba/otter/shared/arbitrate/impl/ArbitrateViewServiceImpl.java b/shared/arbitrate/src/main/java/com/alibaba/otter/shared/arbitrate/impl/ArbitrateViewServiceImpl.java
index 7c33d98c..1411dd5d 100644
--- a/shared/arbitrate/src/main/java/com/alibaba/otter/shared/arbitrate/impl/ArbitrateViewServiceImpl.java
+++ b/shared/arbitrate/src/main/java/com/alibaba/otter/shared/arbitrate/impl/ArbitrateViewServiceImpl.java
@@ -31,6 +31,8 @@
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;
import com.alibaba.otter.shared.arbitrate.ArbitrateViewService;
@@ -48,6 +50,7 @@
import com.alibaba.otter.shared.common.utils.JsonUtils;
import com.alibaba.otter.shared.common.utils.zookeeper.ZkClientx;
import com.alibaba.otter.shared.common.utils.zookeeper.ZooKeeperx;
+import com.alibaba.fastjson.JSONObject;
/**
* 查询当前的仲裁器的一些运行状态视图
@@ -56,7 +59,7 @@
* @version 4.0.0
*/
public class ArbitrateViewServiceImpl implements ArbitrateViewService {
-
+ private static final Logger LOGGER = LoggerFactory.getLogger(ArbitrateViewServiceImpl.class);
private static final String CANAL_PATH = "/otter/canal/destinations/%s";
private static final String CANAL_DATA_PATH = CANAL_PATH + "/%s";
private static final String CANAL_CURSOR_PATH = CANAL_PATH + "/%s/cursor";
@@ -258,4 +261,19 @@ public void removeCanal(String destination) {
zookeeper.deleteRecursive(path);
}
+ @Override
+ public void updateCanalCursor(String destination, short clientId, String position) {
+ String path = String.format(CANAL_CURSOR_PATH, destination, String.valueOf(clientId));
+ try {
+ IZkConnection connection = zookeeper.getConnection();
+ ZooKeeper originZk = ((ZooKeeperx) connection).getZookeeper();
+ PositionEventData positionEventData = getCanalCursor(destination, clientId);
+ LOGGER.info("updateCanalCursor origin positionInfo={}", JSONObject.toJSONString(positionEventData));
+ originZk.setData(path, position.getBytes("UTF-8"), -1);
+ LOGGER.info("updateCanalCursor current positionInfo={}", getCanalCursor(destination, clientId));
+ } catch (Exception e) {
+ LOGGER.error("updateCanalCursor exception", e);
+ }
+ }
+
}
diff --git a/shared/common/src/main/java/com/alibaba/otter/shared/common/model/config/pipeline/Pipeline.java b/shared/common/src/main/java/com/alibaba/otter/shared/common/model/config/pipeline/Pipeline.java
index dcdbebb3..47c0c4c4 100644
--- a/shared/common/src/main/java/com/alibaba/otter/shared/common/model/config/pipeline/Pipeline.java
+++ b/shared/common/src/main/java/com/alibaba/otter/shared/common/model/config/pipeline/Pipeline.java
@@ -16,28 +16,38 @@
package com.alibaba.otter.shared.common.model.config.pipeline;
-import java.io.Serializable;
-import java.util.Date;
-import java.util.List;
-
-import org.apache.commons.lang.builder.ToStringBuilder;
-
import com.alibaba.otter.shared.common.model.config.data.DataMediaPair;
import com.alibaba.otter.shared.common.model.config.node.Node;
import com.alibaba.otter.shared.common.utils.OtterToStringStyle;
+import org.apache.commons.lang.builder.ToStringBuilder;
+
+import java.io.Serializable;
+import java.util.Date;
+import java.util.List;
/**
* 同步任务数据对象
- *
- * @author jianghang 2011-8-31 下午07:35:38
+ *
+ * @author xuanhusuo
+ * @since 2020年03月20日
*/
public class Pipeline implements Serializable {
private static final long serialVersionUID = 5055655233043393285L;
private Long id;
- private Long channelId; // 对应关联的channel唯一标示id
+ /**
+ * 对应关联的channel唯一标示id
+ */
+ private Long channelId;
private String name;
- private String description; // 描述信息
+ /**
+ * 位点
+ */
+ private String position;
+ /**
+ * 描述信息
+ */
+ private String description;
private List selectNodes;
private List extractNodes;
private List loadNodes;
@@ -70,6 +80,14 @@ public void setName(String name) {
this.name = name;
}
+ public String getPosition() {
+ return position;
+ }
+
+ public void setPosition(String position) {
+ this.position = position;
+ }
+
public String getDescription() {
return description;
}