From 41407d04f19a191acf3eea512537aa13d26c8a5d Mon Sep 17 00:00:00 2001 From: xuanhusuo Date: Fri, 20 Mar 2020 16:39:36 +0800 Subject: [PATCH 1/4] =?UTF-8?q?pipeline=E7=BC=96=E8=BE=91=E9=A1=B5?= =?UTF-8?q?=E6=96=B0=E5=A2=9Ecanal=E4=BD=8D=E7=82=B9=E9=87=8D=E7=BD=AE?= =?UTF-8?q?=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../templates/home/screen/editPipeline.vm | 10 ++++- .../templates/home/screen/pipelineInfo.vm | 3 ++ .../home/module/action/PipelineAction.java | 7 ++++ .../web/home/module/screen/EditPipeline.java | 12 +++++- .../web/home/module/screen/PipelineInfo.java | 12 +++++- .../arbitrate/ArbitrateViewService.java | 8 ++++ .../impl/ArbitrateViewServiceImpl.java | 20 +++++++++- .../model/config/pipeline/Pipeline.java | 38 ++++++++++++++----- 8 files changed, 95 insertions(+), 15 deletions(-) diff --git a/manager/deployer/src/main/resources/webapp/templates/home/screen/editPipeline.vm b/manager/deployer/src/main/resources/webapp/templates/home/screen/editPipeline.vm index 5a6eed836..f2a59f7a9 100644 --- a/manager/deployer/src/main/resources/webapp/templates/home/screen/editPipeline.vm +++ b/manager/deployer/src/main/resources/webapp/templates/home/screen/editPipeline.vm @@ -46,8 +46,7 @@ $control.setTemplate("home:navigation.vm") #editPipelineMessage ($pipelineGroup.name)#editPipelineMessage ($pipelineGroup.formPipelineError) - - + Select机器: +
+ #** diff --git a/manager/deployer/src/main/resources/webapp/templates/home/screen/pipelineInfo.vm b/manager/deployer/src/main/resources/webapp/templates/home/screen/pipelineInfo.vm index 57c74ce1d..135ec1f68 100644 --- a/manager/deployer/src/main/resources/webapp/templates/home/screen/pipelineInfo.vm +++ b/manager/deployer/src/main/resources/webapp/templates/home/screen/pipelineInfo.vm @@ -56,6 +56,9 @@ $control.setTemplate("home:navigation.vm") #set ($canalInfoURL = $homeModule.setTarget("canalList.vm").addQueryData("searchKey", $!pipeline.parameters.destinationName)) Canal名字:$!pipeline.parameters.destinationName + + 当前canal位点:$!pipeline.position + #** 主道消费端ID:$!pipeline.parameters.mainstemClientId diff --git a/manager/web/src/main/java/com/alibaba/otter/manager/web/home/module/action/PipelineAction.java b/manager/web/src/main/java/com/alibaba/otter/manager/web/home/module/action/PipelineAction.java index c25cfcda1..2075495ac 100644 --- a/manager/web/src/main/java/com/alibaba/otter/manager/web/home/module/action/PipelineAction.java +++ b/manager/web/src/main/java/com/alibaba/otter/manager/web/home/module/action/PipelineAction.java @@ -23,6 +23,7 @@ import javax.annotation.Resource; import javax.servlet.http.HttpSession; +import com.alibaba.otter.shared.arbitrate.ArbitrateViewService; import org.apache.commons.lang.ArrayUtils; import com.alibaba.citrus.service.form.CustomErrors; @@ -53,6 +54,9 @@ public class PipelineAction { @Resource(name = "channelService") private ChannelService channelService; + @Resource(name = "arbitrateViewService") + private ArbitrateViewService arbitrateViewService; + public void doAdd(@FormGroup("pipelineInfo") Group pipelineInfo, @FormGroup("pipelineParameterInfo") Group pipelineParameterInfo, @FormField(name = "formPipelineError", group = "pipelineInfo") CustomErrors err, @@ -188,6 +192,9 @@ public void doEdit(@FormGroup("pipelineInfo") Group pipelineInfo, try { pipelineService.modify(pipeline); + // 重置位点 + arbitrateViewService.updateCanalCursor(pipeline.getParameters().getDestinationName(), pipeline.getId().shortValue(), + pipeline.getPosition()); } catch (RepeatConfigureException rce) { err.setMessage("invalidPipelineName"); return; diff --git a/manager/web/src/main/java/com/alibaba/otter/manager/web/home/module/screen/EditPipeline.java b/manager/web/src/main/java/com/alibaba/otter/manager/web/home/module/screen/EditPipeline.java index 23ea1d059..cb7fc15c3 100644 --- a/manager/web/src/main/java/com/alibaba/otter/manager/web/home/module/screen/EditPipeline.java +++ b/manager/web/src/main/java/com/alibaba/otter/manager/web/home/module/screen/EditPipeline.java @@ -26,6 +26,8 @@ import com.alibaba.otter.manager.biz.config.node.NodeService; import com.alibaba.otter.manager.biz.config.pipeline.PipelineService; import com.alibaba.otter.manager.web.common.WebConstant; +import com.alibaba.otter.shared.arbitrate.ArbitrateViewService; +import com.alibaba.otter.shared.arbitrate.model.PositionEventData; import com.alibaba.otter.shared.common.model.config.channel.Channel; import com.alibaba.otter.shared.common.model.config.pipeline.Pipeline; @@ -37,11 +39,13 @@ public class EditPipeline { private NodeService nodeService; @Resource(name = "channelService") private ChannelService channelService; + @Resource + private ArbitrateViewService arbitrateViewService; /** * 找到单个Channel,用于编辑Channel信息界面加载信息 * - * @param channelId + * @param pipelineId * @param context * @throws WebxException */ @@ -53,6 +57,12 @@ public void execute(@Param("pipelineId") Long pipelineId, Context context, Navig } Pipeline pipeline = pipelineService.findById(pipelineId); + // 返回canal当前位点信息 + PositionEventData positionEventData = arbitrateViewService.getCanalCursor(pipeline.getParameters().getDestinationName(), pipeline.getParameters(). + getMainstemClientId()); + if (null != positionEventData) { + pipeline.setPosition(positionEventData.getPosition()); + } context.put("pipeline", pipeline); context.put("nodes", nodeService.listAll()); } diff --git a/manager/web/src/main/java/com/alibaba/otter/manager/web/home/module/screen/PipelineInfo.java b/manager/web/src/main/java/com/alibaba/otter/manager/web/home/module/screen/PipelineInfo.java index c882f68f5..d0661d060 100644 --- a/manager/web/src/main/java/com/alibaba/otter/manager/web/home/module/screen/PipelineInfo.java +++ b/manager/web/src/main/java/com/alibaba/otter/manager/web/home/module/screen/PipelineInfo.java @@ -20,6 +20,8 @@ import com.alibaba.citrus.turbine.Context; import com.alibaba.citrus.turbine.dataresolver.Param; +import com.alibaba.otter.shared.arbitrate.ArbitrateViewService; +import com.alibaba.otter.shared.arbitrate.model.PositionEventData; import com.alibaba.otter.shared.common.model.config.pipeline.Pipeline; import com.alibaba.otter.manager.biz.config.node.NodeService; import com.alibaba.otter.manager.biz.config.pipeline.PipelineService; @@ -32,9 +34,17 @@ public class PipelineInfo { @Resource(name = "nodeService") private NodeService nodeService; + @Resource(name = "arbitrateViewService") + private ArbitrateViewService arbitrateViewService; + public void execute(@Param("pipelineId") Long pipelineId, Context context) throws Exception { Pipeline pipeline = pipelineService.findById(pipelineId); - + // 返回canal当前位点信息 + PositionEventData positionEventData = arbitrateViewService.getCanalCursor(pipeline.getParameters().getDestinationName(), pipeline.getParameters(). + getMainstemClientId()); + if (null != positionEventData) { + pipeline.setPosition(positionEventData.getPosition()); + } context.put("pipeline", pipeline); context.put("nodes", nodeService.listAll()); } diff --git a/shared/arbitrate/src/main/java/com/alibaba/otter/shared/arbitrate/ArbitrateViewService.java b/shared/arbitrate/src/main/java/com/alibaba/otter/shared/arbitrate/ArbitrateViewService.java index b632ecc0b..cbf3b40c4 100644 --- a/shared/arbitrate/src/main/java/com/alibaba/otter/shared/arbitrate/ArbitrateViewService.java +++ b/shared/arbitrate/src/main/java/com/alibaba/otter/shared/arbitrate/ArbitrateViewService.java @@ -64,4 +64,12 @@ public interface ArbitrateViewService { * 删除canal meta信息 */ void removeCanal(String destination); + + /** + * 更新canal位点 + * @param destination + * @param clientId + * @param position + */ + void updateCanalCursor(String destination, short clientId, String position); } diff --git a/shared/arbitrate/src/main/java/com/alibaba/otter/shared/arbitrate/impl/ArbitrateViewServiceImpl.java b/shared/arbitrate/src/main/java/com/alibaba/otter/shared/arbitrate/impl/ArbitrateViewServiceImpl.java index 7c33d98cd..1411dd5dc 100644 --- a/shared/arbitrate/src/main/java/com/alibaba/otter/shared/arbitrate/impl/ArbitrateViewServiceImpl.java +++ b/shared/arbitrate/src/main/java/com/alibaba/otter/shared/arbitrate/impl/ArbitrateViewServiceImpl.java @@ -31,6 +31,8 @@ import org.apache.zookeeper.KeeperException.NoNodeException; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.util.CollectionUtils; import com.alibaba.otter.shared.arbitrate.ArbitrateViewService; @@ -48,6 +50,7 @@ import com.alibaba.otter.shared.common.utils.JsonUtils; import com.alibaba.otter.shared.common.utils.zookeeper.ZkClientx; import com.alibaba.otter.shared.common.utils.zookeeper.ZooKeeperx; +import com.alibaba.fastjson.JSONObject; /** * 查询当前的仲裁器的一些运行状态视图 @@ -56,7 +59,7 @@ * @version 4.0.0 */ public class ArbitrateViewServiceImpl implements ArbitrateViewService { - + private static final Logger LOGGER = LoggerFactory.getLogger(ArbitrateViewServiceImpl.class); private static final String CANAL_PATH = "/otter/canal/destinations/%s"; private static final String CANAL_DATA_PATH = CANAL_PATH + "/%s"; private static final String CANAL_CURSOR_PATH = CANAL_PATH + "/%s/cursor"; @@ -258,4 +261,19 @@ public void removeCanal(String destination) { zookeeper.deleteRecursive(path); } + @Override + public void updateCanalCursor(String destination, short clientId, String position) { + String path = String.format(CANAL_CURSOR_PATH, destination, String.valueOf(clientId)); + try { + IZkConnection connection = zookeeper.getConnection(); + ZooKeeper originZk = ((ZooKeeperx) connection).getZookeeper(); + PositionEventData positionEventData = getCanalCursor(destination, clientId); + LOGGER.info("updateCanalCursor origin positionInfo={}", JSONObject.toJSONString(positionEventData)); + originZk.setData(path, position.getBytes("UTF-8"), -1); + LOGGER.info("updateCanalCursor current positionInfo={}", getCanalCursor(destination, clientId)); + } catch (Exception e) { + LOGGER.error("updateCanalCursor exception", e); + } + } + } diff --git a/shared/common/src/main/java/com/alibaba/otter/shared/common/model/config/pipeline/Pipeline.java b/shared/common/src/main/java/com/alibaba/otter/shared/common/model/config/pipeline/Pipeline.java index dcdbebb35..47c0c4c42 100644 --- a/shared/common/src/main/java/com/alibaba/otter/shared/common/model/config/pipeline/Pipeline.java +++ b/shared/common/src/main/java/com/alibaba/otter/shared/common/model/config/pipeline/Pipeline.java @@ -16,28 +16,38 @@ package com.alibaba.otter.shared.common.model.config.pipeline; -import java.io.Serializable; -import java.util.Date; -import java.util.List; - -import org.apache.commons.lang.builder.ToStringBuilder; - import com.alibaba.otter.shared.common.model.config.data.DataMediaPair; import com.alibaba.otter.shared.common.model.config.node.Node; import com.alibaba.otter.shared.common.utils.OtterToStringStyle; +import org.apache.commons.lang.builder.ToStringBuilder; + +import java.io.Serializable; +import java.util.Date; +import java.util.List; /** * 同步任务数据对象 - * - * @author jianghang 2011-8-31 下午07:35:38 + * + * @author xuanhusuo + * @since 2020年03月20日 */ public class Pipeline implements Serializable { private static final long serialVersionUID = 5055655233043393285L; private Long id; - private Long channelId; // 对应关联的channel唯一标示id + /** + * 对应关联的channel唯一标示id + */ + private Long channelId; private String name; - private String description; // 描述信息 + /** + * 位点 + */ + private String position; + /** + * 描述信息 + */ + private String description; private List selectNodes; private List extractNodes; private List loadNodes; @@ -70,6 +80,14 @@ public void setName(String name) { this.name = name; } + public String getPosition() { + return position; + } + + public void setPosition(String position) { + this.position = position; + } + public String getDescription() { return description; } From 8e22f576a112d46b71e542f53d7973d24d5b10bc Mon Sep 17 00:00:00 2001 From: xuanhusuo Date: Wed, 1 Apr 2020 14:13:00 +0800 Subject: [PATCH 2/4] =?UTF-8?q?pipeline=E7=BC=96=E8=BE=91=E9=A1=B5?= =?UTF-8?q?=E6=96=B0=E5=A2=9Ecanal=E4=BD=8D=E7=82=B9=E9=87=8D=E7=BD=AE?= =?UTF-8?q?=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../deployer/src/main/resources/webapp/WEB-INF/home/form.xml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/manager/deployer/src/main/resources/webapp/WEB-INF/home/form.xml b/manager/deployer/src/main/resources/webapp/WEB-INF/home/form.xml index 37ff205f8..cf9b4a734 100644 --- a/manager/deployer/src/main/resources/webapp/WEB-INF/home/form.xml +++ b/manager/deployer/src/main/resources/webapp/WEB-INF/home/form.xml @@ -106,6 +106,11 @@ + + + ${displayName} 不符合格式({"journalName":"","position":0,"timestamp":0}) + + 必须选择一个以上${displayName} From c4112707614741daf1518b3f6d20879d9fc55213 Mon Sep 17 00:00:00 2001 From: xuanhusuo Date: Fri, 3 Apr 2020 13:34:06 +0800 Subject: [PATCH 3/4] =?UTF-8?q?pipeline=E7=BC=96=E8=BE=91=E9=A1=B5?= =?UTF-8?q?=E6=96=B0=E5=A2=9Ecanal=E4=BD=8D=E7=82=B9=E9=87=8D=E7=BD=AE?= =?UTF-8?q?=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../deployer/src/main/resources/webapp/WEB-INF/home/form.xml | 5 +++++ .../resources/webapp/templates/home/screen/editPipeline.vm | 1 + 2 files changed, 6 insertions(+) diff --git a/manager/deployer/src/main/resources/webapp/WEB-INF/home/form.xml b/manager/deployer/src/main/resources/webapp/WEB-INF/home/form.xml index 37ff205f8..85d1a614f 100644 --- a/manager/deployer/src/main/resources/webapp/WEB-INF/home/form.xml +++ b/manager/deployer/src/main/resources/webapp/WEB-INF/home/form.xml @@ -106,6 +106,11 @@ + + + ${displayName} 格式不正确 + + 必须选择一个以上${displayName} diff --git a/manager/deployer/src/main/resources/webapp/templates/home/screen/editPipeline.vm b/manager/deployer/src/main/resources/webapp/templates/home/screen/editPipeline.vm index f2a59f7a9..0330b1831 100644 --- a/manager/deployer/src/main/resources/webapp/templates/home/screen/editPipeline.vm +++ b/manager/deployer/src/main/resources/webapp/templates/home/screen/editPipeline.vm @@ -167,6 +167,7 @@ $control.setTemplate("home:navigation.vm") Canal位点重置: + 示例:{"@type":"com.alibaba.otter.canal.protocol.position.LogPosition","identity":{"slaveId":-1,"sourceAddress":{"address":"192.168.1.2","port":3306}},"postion":{"gtid":"","included":false,"journalName":"mysql-bin.0","position":0,"serverId":0,"timestamp":0}}
From c202590c4deb944b0f38e0fb07cd9f0b49f2cfbc Mon Sep 17 00:00:00 2001 From: xuanhusuo Date: Fri, 3 Apr 2020 13:39:21 +0800 Subject: [PATCH 4/4] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E7=9B=91=E6=8E=A7?= =?UTF-8?q?=E6=9A=82=E5=81=9C=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../resources/webapp/templates/home/screen/alarmRuleList.vm | 2 +- .../resources/webapp/templates/home/screen/alarmSystemList.vm | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/manager/deployer/src/main/resources/webapp/templates/home/screen/alarmRuleList.vm b/manager/deployer/src/main/resources/webapp/templates/home/screen/alarmRuleList.vm index 28d8bccec..73c4e8340 100644 --- a/manager/deployer/src/main/resources/webapp/templates/home/screen/alarmRuleList.vm +++ b/manager/deployer/src/main/resources/webapp/templates/home/screen/alarmRuleList.vm @@ -167,7 +167,7 @@ $control.setTemplate("home:navigation.vm")
#set ($disableAllURL = $homeModule.setAction("AlarmRuleAction").addQueryData("pipelineId", $pipelineId).addQueryData("status", "disable").addQueryData("eventSubmitDoStatusByPipeline", "true").render()) - 全部暂停 + 全部暂停
#end #end diff --git a/manager/deployer/src/main/resources/webapp/templates/home/screen/alarmSystemList.vm b/manager/deployer/src/main/resources/webapp/templates/home/screen/alarmSystemList.vm index 095b4693a..29835755e 100644 --- a/manager/deployer/src/main/resources/webapp/templates/home/screen/alarmSystemList.vm +++ b/manager/deployer/src/main/resources/webapp/templates/home/screen/alarmSystemList.vm @@ -113,7 +113,7 @@ $control.setTemplate("home:navigation.vm") |恢复 #else #set ($disableURL = $homeModule.setAction("AlarmRuleAction").addQueryData("alarmRuleId", $alarmRule.id).addQueryData("status", "disable").addQueryData("eventSubmitDoStatusSystem", "true").render()) - |暂停 + |暂停 #end #else #set ($enableURL = $homeModule.setAction("AlarmRuleAction").addQueryData("alarmRuleId", $alarmRule.id).addQueryData("status", "enable").addQueryData("pageIndex", $paginator.page).addQueryData("eventSubmitDoStatusSystem", "true").render())