Skip to content

Commit

Permalink
[issue_689][taier-data-develop] merge master fixes #689
Browse files Browse the repository at this point in the history
  • Loading branch information
vainhope committed Aug 8, 2022
1 parent d412691 commit 9ce9886
Showing 1 changed file with 22 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,7 @@ public Map<String, Object> readyForSyncImmediatelyJob(Task task, Long tenantId,
Map<String, Object> actionParam = Maps.newHashMap();
try {
List<DevelopTaskParam> taskParamsToReplace = developTaskParamService.getTaskParam(task.getId());
List<DevelopTaskParamShade> developTaskParamShades = developTaskParamService.convertShade(taskParamsToReplace);
readyForTaskStartTrigger(actionParam, tenantId, task, developTaskParamShades);
addConfPropAndParseJob(actionParam, tenantId, task, taskParamsToReplace);
String name = "run_sync_task_" + task.getName() + "_" + System.currentTimeMillis();
actionParam.put("taskSourceId", task.getId());
actionParam.put("taskType", EScheduleJobType.SYNC.getVal());
Expand Down Expand Up @@ -146,4 +145,25 @@ public String replaceSyncParallelism(String taskParams, int parallelism) throws
return sb.toString();
}


public void addConfPropAndParseJob(Map<String, Object> actionParam, Long tenantId, Task task, List<DevelopTaskParam> taskParamsToReplace) throws Exception {
String sql = task.getSqlText() == null ? "" : task.getSqlText();
String taskParams = task.getTaskParams();
JSONObject syncJob = JSON.parseObject(task.getSqlText());
taskParams = replaceSyncParallelism(taskParams, parseSyncChannel(syncJob));

String job = syncJob.getString("job");
// 向导模式根据job中的sourceId填充数据源信息,保证每次运行取到最新的连接信息
job = datasourceService.setJobDataSourceInfo(job, tenantId, syncJob.getIntValue("createModel"));

developTaskParamService.checkParams(developTaskParamService.checkSyncJobParams(job), taskParamsToReplace);

JSONObject confProp = new JSONObject();
taskDirtyDataManageService.buildTaskDirtyDataManageArgs(task.getTaskType(), task.getId(), confProp);
actionParam.put("job", job);
actionParam.put("sqlText", sql);
actionParam.put("taskParams", taskParams);
actionParam.put("confProp", confProp.toJSONString());
}

}

0 comments on commit 9ce9886

Please sign in to comment.