diff --git a/flink-web-console/flink-admin.sql b/flink-web-console/flink-admin.sql index 8ff5a1dbf9a094b36fc52be47926a46f1424295a..9d19276479b8c57b6a17fb3d2321210efd62b7fa 100644 --- a/flink-web-console/flink-admin.sql +++ b/flink-web-console/flink-admin.sql @@ -86,6 +86,9 @@ CREATE TABLE `cluster` ( `name` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '名称', `url` varchar(300) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '集群地址', `enable_flag` tinyint(4) NULL DEFAULT NULL COMMENT '状态', + `check_point_path` varchar(128) NULL COMMENT 'checkpoint', + `save_point_path` varchar(128) NULL COMMENT 'savepoint', + `create_time` datetime(0) NULL DEFAULT NULL, PRIMARY KEY (`id`) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 3 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic; diff --git a/flink-web-console/src/main/java/cn/chongho/inf/flink/mapper/JobMapper.java b/flink-web-console/src/main/java/cn/chongho/inf/flink/mapper/JobMapper.java index 0b38c05b44f0e98db1048aef2e0ecd653757e152..650d3cf16217635204bb8d60b697de96835fede5 100644 --- a/flink-web-console/src/main/java/cn/chongho/inf/flink/mapper/JobMapper.java +++ b/flink-web-console/src/main/java/cn/chongho/inf/flink/mapper/JobMapper.java @@ -29,7 +29,7 @@ public interface JobMapper extends Mapper { @Select("SELECT job.id ,job.job_id jobId, fc.url flinkColonyUrl" + " FROM job LEFT JOIN cluster fc ON job.flink_colony_id = fc.id " + "WHERE job.enable_flag = 1 AND job.status != 0 ") - List selectAllRuningJob(); + List selectAllRunningJob(); @Select("SELECT id FROM job WHERE job.enable_flag = 1 AND job.entry_class = #{ entryClass } ") diff --git a/flink-web-console/src/main/java/cn/chongho/inf/flink/model/BaseJob.java b/flink-web-console/src/main/java/cn/chongho/inf/flink/model/BaseJob.java index 1fd63d6315ea491d61ecfa98a02ac8e9d886a92c..88e5c7e2fa313480f2e5a528c8e2caf04476ffcb 100644 --- a/flink-web-console/src/main/java/cn/chongho/inf/flink/model/BaseJob.java +++ b/flink-web-console/src/main/java/cn/chongho/inf/flink/model/BaseJob.java @@ -30,11 +30,20 @@ public class BaseJob implements Serializable { private Integer enableFlag; + /** + * 集群ID,@see "cluster.id" + */ private Integer flinkColonyId; + /** + * @see "cluster.name" + */ @Transient private String flinkColonyName; + /** + * 集群 web url, @see "cluster.url" + */ @Transient private String flinkColonyUrl; diff --git a/flink-web-console/src/main/java/cn/chongho/inf/flink/model/Cluster.java b/flink-web-console/src/main/java/cn/chongho/inf/flink/model/Cluster.java index 0b906ef6a43ac8eb8127f20ac5e70ef9cddc08c4..60d3cf78378ace0a41d2926f41bcde1bbdb64216 100644 --- a/flink-web-console/src/main/java/cn/chongho/inf/flink/model/Cluster.java +++ b/flink-web-console/src/main/java/cn/chongho/inf/flink/model/Cluster.java @@ -24,6 +24,16 @@ public class Cluster implements Serializable { private Integer enableFlag; + /** + * 不同的集群可能使用不同的 State Backend机制. + */ + private String checkPointPath; + + /** + * 不同的集群可能使用不同的 State Backend机制. + */ + private String savePointPath; + public Cluster(Integer id) { this.id = id; } diff --git a/flink-web-console/src/main/java/cn/chongho/inf/flink/restapi/JobApi.java b/flink-web-console/src/main/java/cn/chongho/inf/flink/restapi/JobApi.java index a1a4777aee3038d41561c1e16422712b4519cf5e..e764ddbb850783bce3cdd16c102530310dfcf33a 100644 --- a/flink-web-console/src/main/java/cn/chongho/inf/flink/restapi/JobApi.java +++ b/flink-web-console/src/main/java/cn/chongho/inf/flink/restapi/JobApi.java @@ -54,7 +54,7 @@ public class JobApi { * @param jobId {"request-id":"4ff40b1af9aad8e18222a442b705b472"} * @return */ - public String savepoint(String flinkUrl, String jobId) { + public String savepoint(String flinkUrl, String jobId, String savepointPath) { String url = flinkUrl + JOBS_RUL_PREFIX + jobId + "/savepoints"; Map params = new HashMap(); params.put("target-directory", savepointPath); @@ -84,9 +84,13 @@ public class JobApi { * @return */ public String stopJob(String flinkUrl, String jobId) { + return stopJob(flinkUrl, jobId, savepointPath); + } + + public String stopJob(String flinkUrl, String jobId, String checkPointPath) { String url = flinkUrl + JOBS_RUL_PREFIX + jobId + "/stop"; - Map params = new HashMap(); - params.put("targetDirectory", savepointPath); + Map params = new HashMap(63); + params.put("targetDirectory", checkPointPath); HttpHeaders headers = new HttpHeaders(); headers.setContentType(MediaType.APPLICATION_JSON); HttpEntity entity = new HttpEntity<>(JSON.toJSONString(params), headers); diff --git a/flink-web-console/src/main/java/cn/chongho/inf/flink/service/ClusterService.java b/flink-web-console/src/main/java/cn/chongho/inf/flink/service/ClusterService.java index 4c3842e8de7b8cb57b25ebcf2c72776158881c12..9927413d4711d05bde1ac54f0b969a7de215e40a 100644 --- a/flink-web-console/src/main/java/cn/chongho/inf/flink/service/ClusterService.java +++ b/flink-web-console/src/main/java/cn/chongho/inf/flink/service/ClusterService.java @@ -41,10 +41,10 @@ public interface ClusterService { /** * 返回一个 - * @param flinkColonyConfigId + * @param id cluster id * @return */ - Cluster getClusterById(Integer flinkColonyConfigId); + Cluster getClusterById(Integer id); /** * 返回全部 diff --git a/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/CdcJobServiceImpl.java b/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/CdcJobServiceImpl.java index 88d0c3312fbd75dc9f26eabe3b98892b67ec9e72..15df953c0f065db4612d53ea1a14fb887be803a9 100644 --- a/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/CdcJobServiceImpl.java +++ b/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/CdcJobServiceImpl.java @@ -55,6 +55,9 @@ public class CdcJobServiceImpl implements CdcJobService { @Autowired private JobApi jobApi; + @Autowired + private ClusterService clusterService; + @Override public List selectAll(CdcJob cdcJob) { return cdcJobMapper.select(cdcJob); @@ -282,7 +285,8 @@ public class CdcJobServiceImpl implements CdcJobService { dataAuthorityService.checkDataAuthority(cdcJob, Constant.DataType.CDCJOB, loginUserId); - String triggerId = jobApi.savepoint(cdcJob.getFlinkColonyUrl(), cdcJob.getJobId()); + Cluster cluster = clusterService.getClusterById(cdcJob.getFlinkColonyId()); + String triggerId = jobApi.savepoint(cdcJob.getFlinkColonyUrl(), cdcJob.getJobId(), cluster.getSavePointPath()); CheckPointInfo checkPointInfo = new CheckPointInfo(id ,triggerId); checkPointInfo.setJobForm(Constant.CheckJobForm.CDCJOB.getValue()); @@ -302,7 +306,8 @@ public class CdcJobServiceImpl implements CdcJobService { dataAuthorityService.checkDataAuthority(cdcJob, Constant.DataType.CDCJOB, loginUserId); - String triggerId = jobApi.stopJob(cdcJob.getFlinkColonyUrl(), cdcJob.getJobId()); + Cluster cluster = clusterService.getClusterById(cdcJob.getFlinkColonyId()); + String triggerId = jobApi.stopJob(cdcJob.getFlinkColonyUrl(), cdcJob.getJobId(), cluster.getSavePointPath()); CheckPointInfo checkPointInfo = new CheckPointInfo(id ,triggerId); checkPointInfo.setJobForm(Constant.CheckJobForm.CDCJOB.getValue()); diff --git a/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/ClusterServiceImpl.java b/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/ClusterServiceImpl.java index 2a3c830307589c7b90186c3acf07f3e142493cba..61658d64bbf03f68a9ec43d29557163f0d350cd0 100644 --- a/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/ClusterServiceImpl.java +++ b/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/ClusterServiceImpl.java @@ -86,8 +86,8 @@ public class ClusterServiceImpl implements ClusterService { } @Override - public Cluster getClusterById(Integer flinkColonyConfigId) { - return clusterMapper.selectByPrimaryKey(flinkColonyConfigId); + public Cluster getClusterById(Integer id) { + return clusterMapper.selectByPrimaryKey(id); } @Override diff --git a/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/JobServiceImpl.java b/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/JobServiceImpl.java index 05b7f1968644eee79cefea09248742bb60044faf..cacd4266449420131428d9b501d7fb66cb6f7bb7 100644 --- a/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/JobServiceImpl.java +++ b/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/JobServiceImpl.java @@ -2,16 +2,10 @@ package cn.chongho.inf.flink.service.impl; import cn.chongho.inf.flink.constants.Constant; import cn.chongho.inf.flink.mapper.JobMapper; -import cn.chongho.inf.flink.model.BaseJob; -import cn.chongho.inf.flink.model.CheckPointInfo; -import cn.chongho.inf.flink.model.DbSource; -import cn.chongho.inf.flink.model.Job; +import cn.chongho.inf.flink.model.*; import cn.chongho.inf.flink.restapi.JarApi; import cn.chongho.inf.flink.restapi.JobApi; -import cn.chongho.inf.flink.service.CheckPointInfoService; -import cn.chongho.inf.flink.service.DataAuthorityService; -import cn.chongho.inf.flink.service.DbSourceService; -import cn.chongho.inf.flink.service.JobService; +import cn.chongho.inf.flink.service.*; import cn.chongho.inf.flink.service.tasks.SyncSavePointTask; import cn.chongho.inf.flink.utils.DesUtils; import cn.chongho.inf.flink.utils.StringUtils; @@ -80,6 +74,9 @@ public class JobServiceImpl implements JobService { @Autowired private DataAuthorityService dataAuthorityService; + @Autowired + private ClusterService clusterService; + @Override public List selectAll(Job job) { return jobMapper.select(job); @@ -87,7 +84,7 @@ public class JobServiceImpl implements JobService { @Override public List selectEnabledAndRunJob() { - return jobMapper.selectAllRuningJob(); + return jobMapper.selectAllRunningJob(); } @Override @@ -152,6 +149,7 @@ public class JobServiceImpl implements JobService { public boolean runJob(Integer id, Integer loginUserId) { Job job = jobMapper.findJobById(id); dataAuthorityService.checkDataAuthority(job, Constant.DataType.JOB, loginUserId); + Cluster cluster = clusterService.getClusterById(job.getFlinkColonyId()); List argsArray = getDbParameter(job.getTargetDbId()); if(!StringUtils.isEmpty(job.getArgs())){ argsArray.addAll(Arrays.asList(job.getArgs().split(" "))); @@ -161,7 +159,7 @@ public class JobServiceImpl implements JobService { */ if (!argsArray.contains(CHECK_POINT_PATH_KEY)) { argsArray.add(CHECK_POINT_PATH_KEY); - argsArray.add(checkpointPath); + argsArray.add(cluster.getCheckPointPath()); } Map params= new HashMap<>(4); @@ -222,7 +220,11 @@ public class JobServiceImpl implements JobService { public boolean savepoint(Integer id, Integer loginUserId) { Job job = jobMapper.findJobById(id); dataAuthorityService.checkDataAuthority(job, Constant.DataType.JOB, loginUserId); - String triggerId = jobApi.savepoint(job.getFlinkColonyUrl(), job.getJobId()); + + // 不同集群可能使用不同的 state backend + Cluster cluster = clusterService.getClusterById(job.getFlinkColonyId()); + + String triggerId = jobApi.savepoint(job.getFlinkColonyUrl(), job.getJobId(), cluster.getSavePointPath()); CheckPointInfo checkPointInfo = new CheckPointInfo(id ,triggerId); checkPointInfo.setJobForm(Constant.CheckJobForm.JOB.getValue()); @@ -240,7 +242,10 @@ public class JobServiceImpl implements JobService { public boolean stopJob(Integer id, Integer loginUserId){ Job job = jobMapper.findJobById(id); dataAuthorityService.checkDataAuthority(job, Constant.DataType.JOB, loginUserId); - String triggerId = jobApi.stopJob(job.getFlinkColonyUrl(), job.getJobId()); + + // 不同集群可能使用不同的 state backend + Cluster cluster = clusterService.getClusterById(job.getFlinkColonyId()); + String triggerId = jobApi.stopJob(job.getFlinkColonyUrl(), job.getJobId(), cluster.getSavePointPath()); CheckPointInfo checkPointInfo = new CheckPointInfo(id ,triggerId); checkPointInfo.setJobForm(Constant.CheckJobForm.JOB.getValue()); diff --git a/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/SqlJobServiceImpl.java b/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/SqlJobServiceImpl.java index 80a613796d84a373d40bf0c2e010db8760f267dd..db1ec673c84e7df4ac11c64117f5bdb79154438a 100644 --- a/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/SqlJobServiceImpl.java +++ b/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/SqlJobServiceImpl.java @@ -2,10 +2,7 @@ package cn.chongho.inf.flink.service.impl; import cn.chongho.inf.flink.constants.Constant; import cn.chongho.inf.flink.mapper.JobMapper; -import cn.chongho.inf.flink.model.JobRunConfig; -import cn.chongho.inf.flink.model.CheckPointInfo; -import cn.chongho.inf.flink.model.Jar; -import cn.chongho.inf.flink.model.Job; +import cn.chongho.inf.flink.model.*; import cn.chongho.inf.flink.restapi.JarApi; import cn.chongho.inf.flink.restapi.JobApi; import cn.chongho.inf.flink.service.*; @@ -61,6 +58,9 @@ public class SqlJobServiceImpl implements SqlJobService { @Resource private JarService jarService; + @Resource + private ClusterService clusterService; + @Override public List selectAll(Job job) { return jobMapper.select(job); @@ -68,7 +68,7 @@ public class SqlJobServiceImpl implements SqlJobService { @Override public List selectEnabledAndRunJob() { - return jobMapper.selectAllRuningJob(); + return jobMapper.selectAllRunningJob(); } @Override @@ -133,6 +133,8 @@ public class SqlJobServiceImpl implements SqlJobService { Job job = jobMapper.findJobById(id); dataAuthorityService.checkDataAuthority(job, Constant.DataType.JOB, loginUserId); + Cluster cluster = clusterService.getClusterById(job.getFlinkColonyId()); + JobRunConfig jobRunConfig = jobRunConfigService.selectByJobType(Constant.JobConfigType.SQL_JOB.name()); log.info("JobRunConfig{}", JSON.toJSONString(jobRunConfig)); if(jobRunConfig == null){ @@ -148,8 +150,10 @@ public class SqlJobServiceImpl implements SqlJobService { /** * 增加checkpoint路径 */ - argsArray.add(CHECK_POINT_PATH_KEY); - argsArray.add(checkpointPath); + if (!argsArray.contains(CHECK_POINT_PATH_KEY)) { + argsArray.add(CHECK_POINT_PATH_KEY); + argsArray.add(cluster.getCheckPointPath()); + } Map params= new HashMap<>(4); params.put("entryClass" , jobRunConfig.getEntryClass()); @@ -210,7 +214,9 @@ public class SqlJobServiceImpl implements SqlJobService { public boolean savepoint(Integer id, Integer loginUserId) { Job job = jobMapper.findJobById(id); dataAuthorityService.checkDataAuthority(job, Constant.DataType.JOB, loginUserId); - String triggerId = jobApi.savepoint(job.getFlinkColonyUrl(), job.getJobId()); + + Cluster cluster = clusterService.getClusterById(job.getFlinkColonyId()); + String triggerId = jobApi.savepoint(job.getFlinkColonyUrl(), job.getJobId(), cluster.getSavePointPath()); CheckPointInfo checkPointInfo = new CheckPointInfo(id ,triggerId); checkPointInfo.setJobForm(Constant.CheckJobForm.JOB.getValue()); @@ -228,7 +234,9 @@ public class SqlJobServiceImpl implements SqlJobService { public boolean stopJob(Integer id, Integer loginUserId){ Job job = jobMapper.findJobById(id); dataAuthorityService.checkDataAuthority(job, Constant.DataType.JOB, loginUserId); - String triggerId = jobApi.stopJob(job.getFlinkColonyUrl(), job.getJobId()); + + Cluster cluster = clusterService.getClusterById(job.getFlinkColonyId()); + String triggerId = jobApi.stopJob(job.getFlinkColonyUrl(), job.getJobId(), cluster.getSavePointPath()); CheckPointInfo checkPointInfo = new CheckPointInfo(id ,triggerId); checkPointInfo.setJobForm(Constant.CheckJobForm.JOB.getValue()); diff --git a/flink-web-console/src/main/java/cn/chongho/inf/flink/service/tasks/ClusterStatusCheck.java b/flink-web-console/src/main/java/cn/chongho/inf/flink/service/tasks/ClusterStatusCheck.java index ae0212035052a35b511f1dc644986ac6f9edc380..28d28dcee1cb7336dea56ec76a2b3059776ee34a 100644 --- a/flink-web-console/src/main/java/cn/chongho/inf/flink/service/tasks/ClusterStatusCheck.java +++ b/flink-web-console/src/main/java/cn/chongho/inf/flink/service/tasks/ClusterStatusCheck.java @@ -51,29 +51,29 @@ public class ClusterStatusCheck { public void doCheck(){ log.info("sync cluster status... "); - List allFlinkColonyConfig = clusterService.getAllCluster(); + List clusterList = clusterService.getAllCluster(); - for(Cluster flinkColonyConfig : allFlinkColonyConfig){ + for(Cluster cluster : clusterList){ try { - jobApi.getOverviewInfo(flinkColonyConfig.getUrl()); + jobApi.getOverviewInfo(cluster.getUrl()); }catch (Exception e){ - String msg ="flink集群[" + flinkColonyConfig.getName() + "]状态不可用,请注意排查。"; + String msg ="flink集群[" + cluster.getName() + "]状态不可用,请注意排查。"; log.info(msg); DingTalkMsg dingTalkMsg = new DingTalkMsg(); dingTalkMsg.setMessage(msg); dingTalkMsg.setGroupId(robotId); long thisTime = System.currentTimeMillis(); - Long lastTime = pushTime.get(flinkColonyConfig.getId()); + Long lastTime = pushTime.get(cluster.getId()); if(lastTime == null){ if(alertEventService.eventUpload(dingTalkMsg)){ - pushTime.put(flinkColonyConfig.getId(), thisTime); + pushTime.put(cluster.getId(), thisTime); } }else{ long seconds = TimeUnit.MILLISECONDS.toSeconds(thisTime - lastTime); if(seconds > EXPIRE_TIME){ if(alertEventService.eventUpload(dingTalkMsg)){ - pushTime.put(flinkColonyConfig.getId(), thisTime); + pushTime.put(cluster.getId(), thisTime); } } } diff --git a/flink-web-console/src/main/java/cn/chongho/inf/flink/service/tasks/SyncCheckPoint.java b/flink-web-console/src/main/java/cn/chongho/inf/flink/service/tasks/SyncCheckPoint.java index 5e605a6ba40b17073ee6628c224b552ef2185c41..edf32936c61582afee0ac4755dc84afd6a16a0bb 100644 --- a/flink-web-console/src/main/java/cn/chongho/inf/flink/service/tasks/SyncCheckPoint.java +++ b/flink-web-console/src/main/java/cn/chongho/inf/flink/service/tasks/SyncCheckPoint.java @@ -57,10 +57,10 @@ public class SyncCheckPoint { private static final String TRIGGER_TIMESTAMP = "trigger_timestamp"; - @Scheduled(fixedRate = 60*1000L) + @Scheduled(fixedRate = 60 * 1000L) public void doSync() { - Map> checkpointMap = new HashMap<>(); + Map> checkpointMap = new HashMap<>(63); Map> jobPoint = getJobPoint(); if(jobPoint != null){ @@ -151,7 +151,7 @@ public class SyncCheckPoint { } } }catch (Exception e){ - log.error("同步任务checkPoint失败{},{}",job.getJobName(), e.getMessage()); + log.error("sync checkPoint fail:{},{}", job.getJobName(), e.getMessage()); } } return checkpointMap; diff --git a/flink-web-console/src/main/java/cn/chongho/inf/flink/service/tasks/SyncJobStatusTask.java b/flink-web-console/src/main/java/cn/chongho/inf/flink/service/tasks/SyncJobStatusTask.java index bd28010e8557083eb55571578d612fb93a80ce46..de63f34165c22b27d526bb91b7c1826f24e5c8ea 100644 --- a/flink-web-console/src/main/java/cn/chongho/inf/flink/service/tasks/SyncJobStatusTask.java +++ b/flink-web-console/src/main/java/cn/chongho/inf/flink/service/tasks/SyncJobStatusTask.java @@ -76,17 +76,17 @@ public class SyncJobStatusTask { } } - private void doSyncCluster(Cluster flinkColonyConfig){ + private void doSyncCluster(Cluster cluster){ - Integer flinkColonyId = flinkColonyConfig.getId(); + Integer clusterId = cluster.getId(); - Map jobMap = findRunningJob(flinkColonyId).stream().collect(Collectors.toMap(Job::getJobId, Function.identity())); + Map jobMap = findRunningJob(clusterId).stream().collect(Collectors.toMap(Job::getJobId, Function.identity())); - Map cdcJobMap = findRunningCdcJob(flinkColonyId).stream().collect(Collectors.toMap(CdcJob::getJobId, Function.identity())); + Map cdcJobMap = findRunningCdcJob(clusterId).stream().collect(Collectors.toMap(CdcJob::getJobId, Function.identity())); - String flinkColonyUrl = flinkColonyConfig.getUrl(); + String clusterUrl = cluster.getUrl(); - List allJobs = jobApi.getAllJobs(flinkColonyUrl); + List allJobs = jobApi.getAllJobs(clusterUrl); List allJobIdList = new ArrayList<>(allJobs == null ? 0:allJobs.size()); if(allJobs != null){ for(JSONObject jsonObject : allJobs){ @@ -102,7 +102,7 @@ public class SyncJobStatusTask { jobMapper.updateJobStatusByJobId(jid, status); } CompletableFuture.runAsync(() -> sendJobStatusChange(baseJob, jobState)); - cancelJob(baseJob, jobState, flinkColonyUrl); + cancelJob(baseJob, jobState, clusterUrl); }else if(cdcJobMap.containsKey(jid)){ baseJob = cdcJobMap.get(jid); //状态改变 @@ -110,13 +110,13 @@ public class SyncJobStatusTask { cdcJobMapper.updateJobStatusByJobId(jid, status); } CompletableFuture.runAsync(() -> sendJobStatusChange(baseJob, jobState)); - cancelJob(baseJob, jobState, flinkColonyUrl); + cancelJob(baseJob, jobState, clusterUrl); } allJobIdList.add(jsonObject.getString("jid")); } } - updateOtherJobStatus(allJobIdList, flinkColonyId); + updateOtherJobStatus(allJobIdList, clusterId); } @@ -139,28 +139,28 @@ public class SyncJobStatusTask { /** * 查询不到的任务修改状态为取消 * @param allJobIdList - * @param flinkColonyId + * @param clusterId */ - private void updateOtherJobStatus(List allJobIdList, Integer flinkColonyId){ + private void updateOtherJobStatus(List allJobIdList, Integer clusterId){ Example jobExample = new Example(Job.class); Example.Criteria jobCriteria = jobExample.createCriteria(); - jobCriteria.andEqualTo("flinkColonyId", flinkColonyId); + jobCriteria.andEqualTo("flinkColonyId", clusterId); Example cdcJobExample = new Example(CdcJob.class); Example.Criteria cdcJobCriteria = cdcJobExample.createCriteria(); - cdcJobCriteria.andEqualTo("flinkColonyId", flinkColonyId); + cdcJobCriteria.andEqualTo("flinkColonyId", clusterId); if(allJobIdList.isEmpty()){ - jobCriteria.andNotEqualTo("status",Constant.JobState.CANCELED.ordinal()); + jobCriteria.andNotEqualTo("status", Constant.JobState.CANCELED.ordinal()); - cdcJobCriteria.andNotEqualTo("status",Constant.JobState.CANCELED.ordinal()); + cdcJobCriteria.andNotEqualTo("status", Constant.JobState.CANCELED.ordinal()); }else{ - jobCriteria.andNotIn("jobId",allJobIdList); + jobCriteria.andNotIn("jobId", allJobIdList); - cdcJobCriteria.andNotIn("jobId",allJobIdList); + cdcJobCriteria.andNotIn("jobId", allJobIdList); } Job updateJob = new Job(); diff --git a/flink-web-console/src/main/resources/application-local.yml b/flink-web-console/src/main/resources/application-local.yml index 3ffec899ad222e85ba2e08f32c25748db4063432..6b6007c2e5e8df615bfba629edeb4c3b89bf6d2a 100644 --- a/flink-web-console/src/main/resources/application-local.yml +++ b/flink-web-console/src/main/resources/application-local.yml @@ -35,8 +35,8 @@ logging: tk.mybatis: debug sys: url: http://172.16.5.7:8081 - savepoint-path: /home/flink/savepoints - checkpoint-path: /home/flink/checkpoints + savepoint-path: /home/flink/savepoints # 弃用中,改为配置到集群配置中. + checkpoint-path: /home/flink/checkpoints # 弃用中,改为配置到集群配置中. des: secretkey: CF97166B99485FA6E186D6182B814EEE offset: 88888888 diff --git a/flink-web-console/src/main/resources/templates/cluster/edit.ftl b/flink-web-console/src/main/resources/templates/cluster/edit.ftl index 660bf071134822179c7487a0316df8236797482d..a089770486ebae1571904d4a28ad3382b346d782 100644 --- a/flink-web-console/src/main/resources/templates/cluster/edit.ftl +++ b/flink-web-console/src/main/resources/templates/cluster/edit.ftl @@ -26,6 +26,18 @@ +
+ +
+ +
+
+
+ +
+ +
+