From 9b408af70f4e7a62b56830c1c5183ecbbc48f2ff Mon Sep 17 00:00:00 2001 From: "feihu.wang" Date: Mon, 15 Aug 2022 16:03:50 +0800 Subject: [PATCH 1/5] =?UTF-8?q?opts:=E4=BC=98=E5=8C=96=E4=B8=80=E4=BA=9B?= =?UTF-8?q?=E5=8E=86=E5=8F=B2=E5=91=BD=E5=90=8D=E9=97=AE=E9=A2=98.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/tasks/ClusterStatusCheck.java | 14 ++++---- .../service/tasks/SyncJobStatusTask.java | 34 +++++++++---------- 2 files changed, 24 insertions(+), 24 deletions(-) diff --git a/flink-web-console/src/main/java/cn/chongho/inf/flink/service/tasks/ClusterStatusCheck.java b/flink-web-console/src/main/java/cn/chongho/inf/flink/service/tasks/ClusterStatusCheck.java index ae02120..28d28dc 100644 --- a/flink-web-console/src/main/java/cn/chongho/inf/flink/service/tasks/ClusterStatusCheck.java +++ b/flink-web-console/src/main/java/cn/chongho/inf/flink/service/tasks/ClusterStatusCheck.java @@ -51,29 +51,29 @@ public class ClusterStatusCheck { public void doCheck(){ log.info("sync cluster status... "); - List allFlinkColonyConfig = clusterService.getAllCluster(); + List clusterList = clusterService.getAllCluster(); - for(Cluster flinkColonyConfig : allFlinkColonyConfig){ + for(Cluster cluster : clusterList){ try { - jobApi.getOverviewInfo(flinkColonyConfig.getUrl()); + jobApi.getOverviewInfo(cluster.getUrl()); }catch (Exception e){ - String msg ="flink集群[" + flinkColonyConfig.getName() + "]状态不可用,请注意排查。"; + String msg ="flink集群[" + cluster.getName() + "]状态不可用,请注意排查。"; log.info(msg); DingTalkMsg dingTalkMsg = new DingTalkMsg(); dingTalkMsg.setMessage(msg); dingTalkMsg.setGroupId(robotId); long thisTime = System.currentTimeMillis(); - Long lastTime = pushTime.get(flinkColonyConfig.getId()); + Long lastTime = pushTime.get(cluster.getId()); if(lastTime == null){ if(alertEventService.eventUpload(dingTalkMsg)){ - pushTime.put(flinkColonyConfig.getId(), thisTime); + pushTime.put(cluster.getId(), thisTime); } }else{ long seconds = TimeUnit.MILLISECONDS.toSeconds(thisTime - lastTime); if(seconds > EXPIRE_TIME){ if(alertEventService.eventUpload(dingTalkMsg)){ - pushTime.put(flinkColonyConfig.getId(), thisTime); + pushTime.put(cluster.getId(), thisTime); } } } diff --git a/flink-web-console/src/main/java/cn/chongho/inf/flink/service/tasks/SyncJobStatusTask.java b/flink-web-console/src/main/java/cn/chongho/inf/flink/service/tasks/SyncJobStatusTask.java index bd28010..de63f34 100644 --- a/flink-web-console/src/main/java/cn/chongho/inf/flink/service/tasks/SyncJobStatusTask.java +++ b/flink-web-console/src/main/java/cn/chongho/inf/flink/service/tasks/SyncJobStatusTask.java @@ -76,17 +76,17 @@ public class SyncJobStatusTask { } } - private void doSyncCluster(Cluster flinkColonyConfig){ + private void doSyncCluster(Cluster cluster){ - Integer flinkColonyId = flinkColonyConfig.getId(); + Integer clusterId = cluster.getId(); - Map jobMap = findRunningJob(flinkColonyId).stream().collect(Collectors.toMap(Job::getJobId, Function.identity())); + Map jobMap = findRunningJob(clusterId).stream().collect(Collectors.toMap(Job::getJobId, Function.identity())); - Map cdcJobMap = findRunningCdcJob(flinkColonyId).stream().collect(Collectors.toMap(CdcJob::getJobId, Function.identity())); + Map cdcJobMap = findRunningCdcJob(clusterId).stream().collect(Collectors.toMap(CdcJob::getJobId, Function.identity())); - String flinkColonyUrl = flinkColonyConfig.getUrl(); + String clusterUrl = cluster.getUrl(); - List allJobs = jobApi.getAllJobs(flinkColonyUrl); + List allJobs = 
jobApi.getAllJobs(clusterUrl); List allJobIdList = new ArrayList<>(allJobs == null ? 0:allJobs.size()); if(allJobs != null){ for(JSONObject jsonObject : allJobs){ @@ -102,7 +102,7 @@ public class SyncJobStatusTask { jobMapper.updateJobStatusByJobId(jid, status); } CompletableFuture.runAsync(() -> sendJobStatusChange(baseJob, jobState)); - cancelJob(baseJob, jobState, flinkColonyUrl); + cancelJob(baseJob, jobState, clusterUrl); }else if(cdcJobMap.containsKey(jid)){ baseJob = cdcJobMap.get(jid); //状态改变 @@ -110,13 +110,13 @@ public class SyncJobStatusTask { cdcJobMapper.updateJobStatusByJobId(jid, status); } CompletableFuture.runAsync(() -> sendJobStatusChange(baseJob, jobState)); - cancelJob(baseJob, jobState, flinkColonyUrl); + cancelJob(baseJob, jobState, clusterUrl); } allJobIdList.add(jsonObject.getString("jid")); } } - updateOtherJobStatus(allJobIdList, flinkColonyId); + updateOtherJobStatus(allJobIdList, clusterId); } @@ -139,28 +139,28 @@ public class SyncJobStatusTask { /** * 查询不到的任务修改状态为取消 * @param allJobIdList - * @param flinkColonyId + * @param clusterId */ - private void updateOtherJobStatus(List allJobIdList, Integer flinkColonyId){ + private void updateOtherJobStatus(List allJobIdList, Integer clusterId){ Example jobExample = new Example(Job.class); Example.Criteria jobCriteria = jobExample.createCriteria(); - jobCriteria.andEqualTo("flinkColonyId", flinkColonyId); + jobCriteria.andEqualTo("flinkColonyId", clusterId); Example cdcJobExample = new Example(CdcJob.class); Example.Criteria cdcJobCriteria = cdcJobExample.createCriteria(); - cdcJobCriteria.andEqualTo("flinkColonyId", flinkColonyId); + cdcJobCriteria.andEqualTo("flinkColonyId", clusterId); if(allJobIdList.isEmpty()){ - jobCriteria.andNotEqualTo("status",Constant.JobState.CANCELED.ordinal()); + jobCriteria.andNotEqualTo("status", Constant.JobState.CANCELED.ordinal()); - cdcJobCriteria.andNotEqualTo("status",Constant.JobState.CANCELED.ordinal()); + cdcJobCriteria.andNotEqualTo("status", Constant.JobState.CANCELED.ordinal()); }else{ - jobCriteria.andNotIn("jobId",allJobIdList); + jobCriteria.andNotIn("jobId", allJobIdList); - cdcJobCriteria.andNotIn("jobId",allJobIdList); + cdcJobCriteria.andNotIn("jobId", allJobIdList); } Job updateJob = new Job(); -- Gitee From 05111abaed03d1134b5678d44cf36407f521107b Mon Sep 17 00:00:00 2001 From: "feihu.wang" Date: Mon, 15 Aug 2022 16:14:08 +0800 Subject: [PATCH 2/5] opts --- .../cn/chongho/inf/flink/service/tasks/SyncCheckPoint.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/flink-web-console/src/main/java/cn/chongho/inf/flink/service/tasks/SyncCheckPoint.java b/flink-web-console/src/main/java/cn/chongho/inf/flink/service/tasks/SyncCheckPoint.java index 5e605a6..edf3293 100644 --- a/flink-web-console/src/main/java/cn/chongho/inf/flink/service/tasks/SyncCheckPoint.java +++ b/flink-web-console/src/main/java/cn/chongho/inf/flink/service/tasks/SyncCheckPoint.java @@ -57,10 +57,10 @@ public class SyncCheckPoint { private static final String TRIGGER_TIMESTAMP = "trigger_timestamp"; - @Scheduled(fixedRate = 60*1000L) + @Scheduled(fixedRate = 60 * 1000L) public void doSync() { - Map> checkpointMap = new HashMap<>(); + Map> checkpointMap = new HashMap<>(63); Map> jobPoint = getJobPoint(); if(jobPoint != null){ @@ -151,7 +151,7 @@ public class SyncCheckPoint { } } }catch (Exception e){ - log.error("同步任务checkPoint失败{},{}",job.getJobName(), e.getMessage()); + log.error("sync checkPoint fail:{},{}", job.getJobName(), e.getMessage()); } } return checkpointMap; -- Gitee 
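PATCH 1/5 is mostly renames (flinkColonyConfig -> cluster), but the alert path it touches embodies a simple per-cluster throttle: a push timestamp is kept per cluster id, and a new DingTalk alert goes out only when no alert has been recorded yet or the last one is older than EXPIRE_TIME seconds. Below is a minimal self-contained sketch of that pattern; the map type, the EXPIRE_TIME value, and the BooleanSupplier callback are illustrative assumptions — only the pushTime/EXPIRE_TIME names and the eventUpload-style boolean check come from the patch.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.TimeUnit;
    import java.util.function.BooleanSupplier;

    // Minimal sketch of the throttle used in ClusterStatusCheck.doCheck().
    public class AlertThrottle {

        // seconds that must pass before the same cluster may alert again (value is illustrative)
        private static final long EXPIRE_TIME = 30 * 60L;

        // last successful push per cluster id; the real field's type is not shown in the hunk
        private final Map<Integer, Long> pushTime = new ConcurrentHashMap<>();

        public void alertIfDue(Integer clusterId, BooleanSupplier sendAlert) {
            long thisTime = System.currentTimeMillis();
            Long lastTime = pushTime.get(clusterId);
            boolean due = lastTime == null
                    || TimeUnit.MILLISECONDS.toSeconds(thisTime - lastTime) > EXPIRE_TIME;
            // record the push time only if the alert was actually delivered,
            // mirroring the eventUpload(...) check in the patch
            if (due && sendAlert.getAsBoolean()) {
                pushTime.put(clusterId, thisTime);
            }
        }
    }

Inside the check loop, usage would look like alertIfDue(cluster.getId(), () -> alertEventService.eventUpload(dingTalkMsg)).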
From 603351da2fefbd4d31bfb50c33235a26de069949 Mon Sep 17 00:00:00 2001 From: "feihu.wang" Date: Wed, 17 Aug 2022 17:36:43 +0800 Subject: [PATCH 3/5] =?UTF-8?q?feats:=20=E6=94=AF=E6=8C=81=E5=A4=9A?= =?UTF-8?q?=E9=9B=86=E7=BE=A4=EF=BC=8C=E5=A4=9A=E7=A7=8Dcheckpoint?= =?UTF-8?q?=E5=AD=98=E5=82=A8=E6=96=B9=E5=BC=8F=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- flink-web-console/flink-admin.sql | 3 ++ .../chongho/inf/flink/mapper/JobMapper.java | 2 +- .../cn/chongho/inf/flink/model/BaseJob.java | 9 ++++++ .../cn/chongho/inf/flink/model/Cluster.java | 10 +++++++ .../cn/chongho/inf/flink/restapi/JobApi.java | 10 +++++-- .../inf/flink/service/ClusterService.java | 4 +-- .../flink/service/impl/CdcJobServiceImpl.java | 9 ++++-- .../service/impl/ClusterServiceImpl.java | 4 +-- .../flink/service/impl/JobServiceImpl.java | 29 +++++++++++-------- .../flink/service/impl/SqlJobServiceImpl.java | 26 +++++++++++------ .../main/resources/templates/cluster/edit.ftl | 24 ++++++++++++++- .../resources/templates/cluster/index.ftl | 6 +++- .../src/main/resources/templates/index.ftl | 2 +- 13 files changed, 104 insertions(+), 34 deletions(-) diff --git a/flink-web-console/flink-admin.sql b/flink-web-console/flink-admin.sql index 8ff5a1d..9d19276 100644 --- a/flink-web-console/flink-admin.sql +++ b/flink-web-console/flink-admin.sql @@ -86,6 +86,9 @@ CREATE TABLE `cluster` ( `name` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '名称', `url` varchar(300) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '集群地址', `enable_flag` tinyint(4) NULL DEFAULT NULL COMMENT '状态', + `check_point_path` varchar(128) NULL COMMENT 'checkpoint', + `save_point_path` varchar(128) NULL COMMENT 'savepoint', + `create_time` datetime(0) NULL DEFAULT NULL, PRIMARY KEY (`id`) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 3 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic; diff --git a/flink-web-console/src/main/java/cn/chongho/inf/flink/mapper/JobMapper.java b/flink-web-console/src/main/java/cn/chongho/inf/flink/mapper/JobMapper.java index 0b38c05..650d3cf 100644 --- a/flink-web-console/src/main/java/cn/chongho/inf/flink/mapper/JobMapper.java +++ b/flink-web-console/src/main/java/cn/chongho/inf/flink/mapper/JobMapper.java @@ -29,7 +29,7 @@ public interface JobMapper extends Mapper { @Select("SELECT job.id ,job.job_id jobId, fc.url flinkColonyUrl" + " FROM job LEFT JOIN cluster fc ON job.flink_colony_id = fc.id " + "WHERE job.enable_flag = 1 AND job.status != 0 ") - List selectAllRuningJob(); + List selectAllRunningJob(); @Select("SELECT id FROM job WHERE job.enable_flag = 1 AND job.entry_class = #{ entryClass } ") diff --git a/flink-web-console/src/main/java/cn/chongho/inf/flink/model/BaseJob.java b/flink-web-console/src/main/java/cn/chongho/inf/flink/model/BaseJob.java index 1fd63d6..88e5c7e 100644 --- a/flink-web-console/src/main/java/cn/chongho/inf/flink/model/BaseJob.java +++ b/flink-web-console/src/main/java/cn/chongho/inf/flink/model/BaseJob.java @@ -30,11 +30,20 @@ public class BaseJob implements Serializable { private Integer enableFlag; + /** + * 集群ID,@see "cluster.id" + */ private Integer flinkColonyId; + /** + * @see "cluster.name" + */ @Transient private String flinkColonyName; + /** + * 集群 web url, @see "cluster.url" + */ @Transient private String flinkColonyUrl; diff --git a/flink-web-console/src/main/java/cn/chongho/inf/flink/model/Cluster.java 
b/flink-web-console/src/main/java/cn/chongho/inf/flink/model/Cluster.java index 0b906ef..60d3cf7 100644 --- a/flink-web-console/src/main/java/cn/chongho/inf/flink/model/Cluster.java +++ b/flink-web-console/src/main/java/cn/chongho/inf/flink/model/Cluster.java @@ -24,6 +24,16 @@ public class Cluster implements Serializable { private Integer enableFlag; + /** + * 不同的集群可能使用不同的 State Backend机制. + */ + private String checkPointPath; + + /** + * 不同的集群可能使用不同的 State Backend机制. + */ + private String savePointPath; + public Cluster(Integer id) { this.id = id; } diff --git a/flink-web-console/src/main/java/cn/chongho/inf/flink/restapi/JobApi.java b/flink-web-console/src/main/java/cn/chongho/inf/flink/restapi/JobApi.java index a1a4777..e764ddb 100644 --- a/flink-web-console/src/main/java/cn/chongho/inf/flink/restapi/JobApi.java +++ b/flink-web-console/src/main/java/cn/chongho/inf/flink/restapi/JobApi.java @@ -54,7 +54,7 @@ public class JobApi { * @param jobId {"request-id":"4ff40b1af9aad8e18222a442b705b472"} * @return */ - public String savepoint(String flinkUrl, String jobId) { + public String savepoint(String flinkUrl, String jobId, String savepointPath) { String url = flinkUrl + JOBS_RUL_PREFIX + jobId + "/savepoints"; Map params = new HashMap(); params.put("target-directory", savepointPath); @@ -84,9 +84,13 @@ public class JobApi { * @return */ public String stopJob(String flinkUrl, String jobId) { + return stopJob(flinkUrl, jobId, savepointPath); + } + + public String stopJob(String flinkUrl, String jobId, String checkPointPath) { String url = flinkUrl + JOBS_RUL_PREFIX + jobId + "/stop"; - Map params = new HashMap(); - params.put("targetDirectory", savepointPath); + Map params = new HashMap(63); + params.put("targetDirectory", checkPointPath); HttpHeaders headers = new HttpHeaders(); headers.setContentType(MediaType.APPLICATION_JSON); HttpEntity entity = new HttpEntity<>(JSON.toJSONString(params), headers); diff --git a/flink-web-console/src/main/java/cn/chongho/inf/flink/service/ClusterService.java b/flink-web-console/src/main/java/cn/chongho/inf/flink/service/ClusterService.java index 4c3842e..9927413 100644 --- a/flink-web-console/src/main/java/cn/chongho/inf/flink/service/ClusterService.java +++ b/flink-web-console/src/main/java/cn/chongho/inf/flink/service/ClusterService.java @@ -41,10 +41,10 @@ public interface ClusterService { /** * 返回一个 - * @param flinkColonyConfigId + * @param id cluster id * @return */ - Cluster getClusterById(Integer flinkColonyConfigId); + Cluster getClusterById(Integer id); /** * 返回全部 diff --git a/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/CdcJobServiceImpl.java b/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/CdcJobServiceImpl.java index 88d0c33..15df953 100644 --- a/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/CdcJobServiceImpl.java +++ b/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/CdcJobServiceImpl.java @@ -55,6 +55,9 @@ public class CdcJobServiceImpl implements CdcJobService { @Autowired private JobApi jobApi; + @Autowired + private ClusterService clusterService; + @Override public List selectAll(CdcJob cdcJob) { return cdcJobMapper.select(cdcJob); @@ -282,7 +285,8 @@ public class CdcJobServiceImpl implements CdcJobService { dataAuthorityService.checkDataAuthority(cdcJob, Constant.DataType.CDCJOB, loginUserId); - String triggerId = jobApi.savepoint(cdcJob.getFlinkColonyUrl(), cdcJob.getJobId()); + Cluster cluster = clusterService.getClusterById(cdcJob.getFlinkColonyId()); + 
String triggerId = jobApi.savepoint(cdcJob.getFlinkColonyUrl(), cdcJob.getJobId(), cluster.getSavePointPath()); CheckPointInfo checkPointInfo = new CheckPointInfo(id ,triggerId); checkPointInfo.setJobForm(Constant.CheckJobForm.CDCJOB.getValue()); @@ -302,7 +306,8 @@ public class CdcJobServiceImpl implements CdcJobService { dataAuthorityService.checkDataAuthority(cdcJob, Constant.DataType.CDCJOB, loginUserId); - String triggerId = jobApi.stopJob(cdcJob.getFlinkColonyUrl(), cdcJob.getJobId()); + Cluster cluster = clusterService.getClusterById(cdcJob.getFlinkColonyId()); + String triggerId = jobApi.stopJob(cdcJob.getFlinkColonyUrl(), cdcJob.getJobId(), cluster.getSavePointPath()); CheckPointInfo checkPointInfo = new CheckPointInfo(id ,triggerId); checkPointInfo.setJobForm(Constant.CheckJobForm.CDCJOB.getValue()); diff --git a/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/ClusterServiceImpl.java b/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/ClusterServiceImpl.java index 2a3c830..b4d8cea 100644 --- a/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/ClusterServiceImpl.java +++ b/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/ClusterServiceImpl.java @@ -86,8 +86,8 @@ public class ClusterServiceImpl implements ClusterService { } @Override - public Cluster getClusterById(Integer flinkColonyConfigId) { - return clusterMapper.selectByPrimaryKey(flinkColonyConfigId); + public Cluster getClusterById(Integer Id) { + return clusterMapper.selectByPrimaryKey(Id); } @Override diff --git a/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/JobServiceImpl.java b/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/JobServiceImpl.java index 05b7f19..cacd426 100644 --- a/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/JobServiceImpl.java +++ b/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/JobServiceImpl.java @@ -2,16 +2,10 @@ package cn.chongho.inf.flink.service.impl; import cn.chongho.inf.flink.constants.Constant; import cn.chongho.inf.flink.mapper.JobMapper; -import cn.chongho.inf.flink.model.BaseJob; -import cn.chongho.inf.flink.model.CheckPointInfo; -import cn.chongho.inf.flink.model.DbSource; -import cn.chongho.inf.flink.model.Job; +import cn.chongho.inf.flink.model.*; import cn.chongho.inf.flink.restapi.JarApi; import cn.chongho.inf.flink.restapi.JobApi; -import cn.chongho.inf.flink.service.CheckPointInfoService; -import cn.chongho.inf.flink.service.DataAuthorityService; -import cn.chongho.inf.flink.service.DbSourceService; -import cn.chongho.inf.flink.service.JobService; +import cn.chongho.inf.flink.service.*; import cn.chongho.inf.flink.service.tasks.SyncSavePointTask; import cn.chongho.inf.flink.utils.DesUtils; import cn.chongho.inf.flink.utils.StringUtils; @@ -80,6 +74,9 @@ public class JobServiceImpl implements JobService { @Autowired private DataAuthorityService dataAuthorityService; + @Autowired + private ClusterService clusterService; + @Override public List selectAll(Job job) { return jobMapper.select(job); @@ -87,7 +84,7 @@ public class JobServiceImpl implements JobService { @Override public List selectEnabledAndRunJob() { - return jobMapper.selectAllRuningJob(); + return jobMapper.selectAllRunningJob(); } @Override @@ -152,6 +149,7 @@ public class JobServiceImpl implements JobService { public boolean runJob(Integer id, Integer loginUserId) { Job job = jobMapper.findJobById(id); dataAuthorityService.checkDataAuthority(job, Constant.DataType.JOB, 
loginUserId); + Cluster cluster = clusterService.getClusterById(job.getFlinkColonyId()); List argsArray = getDbParameter(job.getTargetDbId()); if(!StringUtils.isEmpty(job.getArgs())){ argsArray.addAll(Arrays.asList(job.getArgs().split(" "))); @@ -161,7 +159,7 @@ public class JobServiceImpl implements JobService { */ if (!argsArray.contains(CHECK_POINT_PATH_KEY)) { argsArray.add(CHECK_POINT_PATH_KEY); - argsArray.add(checkpointPath); + argsArray.add(cluster.getCheckPointPath()); } Map params= new HashMap<>(4); @@ -222,7 +220,11 @@ public class JobServiceImpl implements JobService { public boolean savepoint(Integer id, Integer loginUserId) { Job job = jobMapper.findJobById(id); dataAuthorityService.checkDataAuthority(job, Constant.DataType.JOB, loginUserId); - String triggerId = jobApi.savepoint(job.getFlinkColonyUrl(), job.getJobId()); + + // 不同集群可能使用不同的 state backend + Cluster cluster = clusterService.getClusterById(job.getFlinkColonyId()); + + String triggerId = jobApi.savepoint(job.getFlinkColonyUrl(), job.getJobId(), cluster.getSavePointPath()); CheckPointInfo checkPointInfo = new CheckPointInfo(id ,triggerId); checkPointInfo.setJobForm(Constant.CheckJobForm.JOB.getValue()); @@ -240,7 +242,10 @@ public class JobServiceImpl implements JobService { public boolean stopJob(Integer id, Integer loginUserId){ Job job = jobMapper.findJobById(id); dataAuthorityService.checkDataAuthority(job, Constant.DataType.JOB, loginUserId); - String triggerId = jobApi.stopJob(job.getFlinkColonyUrl(), job.getJobId()); + + // 不同集群可能使用不同的 state backend + Cluster cluster = clusterService.getClusterById(job.getFlinkColonyId()); + String triggerId = jobApi.stopJob(job.getFlinkColonyUrl(), job.getJobId(), cluster.getSavePointPath()); CheckPointInfo checkPointInfo = new CheckPointInfo(id ,triggerId); checkPointInfo.setJobForm(Constant.CheckJobForm.JOB.getValue()); diff --git a/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/SqlJobServiceImpl.java b/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/SqlJobServiceImpl.java index 80a6137..db1ec67 100644 --- a/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/SqlJobServiceImpl.java +++ b/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/SqlJobServiceImpl.java @@ -2,10 +2,7 @@ package cn.chongho.inf.flink.service.impl; import cn.chongho.inf.flink.constants.Constant; import cn.chongho.inf.flink.mapper.JobMapper; -import cn.chongho.inf.flink.model.JobRunConfig; -import cn.chongho.inf.flink.model.CheckPointInfo; -import cn.chongho.inf.flink.model.Jar; -import cn.chongho.inf.flink.model.Job; +import cn.chongho.inf.flink.model.*; import cn.chongho.inf.flink.restapi.JarApi; import cn.chongho.inf.flink.restapi.JobApi; import cn.chongho.inf.flink.service.*; @@ -61,6 +58,9 @@ public class SqlJobServiceImpl implements SqlJobService { @Resource private JarService jarService; + @Resource + private ClusterService clusterService; + @Override public List selectAll(Job job) { return jobMapper.select(job); @@ -68,7 +68,7 @@ public class SqlJobServiceImpl implements SqlJobService { @Override public List selectEnabledAndRunJob() { - return jobMapper.selectAllRuningJob(); + return jobMapper.selectAllRunningJob(); } @Override @@ -133,6 +133,8 @@ public class SqlJobServiceImpl implements SqlJobService { Job job = jobMapper.findJobById(id); dataAuthorityService.checkDataAuthority(job, Constant.DataType.JOB, loginUserId); + Cluster cluster = clusterService.getClusterById(job.getFlinkColonyId()); + JobRunConfig jobRunConfig = 
jobRunConfigService.selectByJobType(Constant.JobConfigType.SQL_JOB.name()); log.info("JobRunConfig{}", JSON.toJSONString(jobRunConfig)); if(jobRunConfig == null){ @@ -148,8 +150,10 @@ public class SqlJobServiceImpl implements SqlJobService { /** * 增加checkpoint路径 */ - argsArray.add(CHECK_POINT_PATH_KEY); - argsArray.add(checkpointPath); + if (!argsArray.contains(CHECK_POINT_PATH_KEY)) { + argsArray.add(CHECK_POINT_PATH_KEY); + argsArray.add(cluster.getCheckPointPath()); + } Map params= new HashMap<>(4); params.put("entryClass" , jobRunConfig.getEntryClass()); @@ -210,7 +214,9 @@ public class SqlJobServiceImpl implements SqlJobService { public boolean savepoint(Integer id, Integer loginUserId) { Job job = jobMapper.findJobById(id); dataAuthorityService.checkDataAuthority(job, Constant.DataType.JOB, loginUserId); - String triggerId = jobApi.savepoint(job.getFlinkColonyUrl(), job.getJobId()); + + Cluster cluster = clusterService.getClusterById(job.getFlinkColonyId()); + String triggerId = jobApi.savepoint(job.getFlinkColonyUrl(), job.getJobId(), cluster.getSavePointPath()); CheckPointInfo checkPointInfo = new CheckPointInfo(id ,triggerId); checkPointInfo.setJobForm(Constant.CheckJobForm.JOB.getValue()); @@ -228,7 +234,9 @@ public class SqlJobServiceImpl implements SqlJobService { public boolean stopJob(Integer id, Integer loginUserId){ Job job = jobMapper.findJobById(id); dataAuthorityService.checkDataAuthority(job, Constant.DataType.JOB, loginUserId); - String triggerId = jobApi.stopJob(job.getFlinkColonyUrl(), job.getJobId()); + + Cluster cluster = clusterService.getClusterById(job.getFlinkColonyId()); + String triggerId = jobApi.stopJob(job.getFlinkColonyUrl(), job.getJobId(), cluster.getSavePointPath()); CheckPointInfo checkPointInfo = new CheckPointInfo(id ,triggerId); checkPointInfo.setJobForm(Constant.CheckJobForm.JOB.getValue()); diff --git a/flink-web-console/src/main/resources/templates/cluster/edit.ftl b/flink-web-console/src/main/resources/templates/cluster/edit.ftl index 660bf07..a089770 100644 --- a/flink-web-console/src/main/resources/templates/cluster/edit.ftl +++ b/flink-web-console/src/main/resources/templates/cluster/edit.ftl @@ -26,6 +26,18 @@ +
[cluster/edit.ftl: added inputs for check_point_path and save_point_path to the cluster edit form; template markup omitted in this excerpt]
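The core of PATCH 3/5 is that savepoint/stop requests now carry a per-cluster target directory: the service layer resolves the job's Cluster via clusterService.getClusterById(job.getFlinkColonyId()) and passes cluster.getSavePointPath() (or getCheckPointPath()) down to JobApi, which forwards it to Flink's REST API. The sketch below shows the resulting three-argument call, assuming Spring's RestTemplate and fastjson as used elsewhere in JobApi; the class name, the RestTemplate wiring, and the response handling are illustrative, not taken from the patch.

    import com.alibaba.fastjson.JSON;
    import java.util.HashMap;
    import java.util.Map;
    import org.springframework.http.HttpEntity;
    import org.springframework.http.HttpHeaders;
    import org.springframework.http.MediaType;
    import org.springframework.web.client.RestTemplate;

    // Sketch of the new three-argument savepoint call: the target directory comes from
    // the Cluster row (save_point_path) instead of a single global savepointPath property.
    public class SavepointClientSketch {

        private static final String JOBS_URL_PREFIX = "/jobs/";

        private final RestTemplate restTemplate = new RestTemplate();

        /** POST {flinkUrl}/jobs/{jobId}/savepoints with the cluster-specific target-directory. */
        public String savepoint(String flinkUrl, String jobId, String savepointPath) {
            String url = flinkUrl + JOBS_URL_PREFIX + jobId + "/savepoints";
            Map<String, String> params = new HashMap<>(2);
            // per-cluster path, e.g. cluster.getSavePointPath()
            params.put("target-directory", savepointPath);
            HttpHeaders headers = new HttpHeaders();
            headers.setContentType(MediaType.APPLICATION_JSON);
            HttpEntity<String> entity = new HttpEntity<>(JSON.toJSONString(params), headers);
            // Flink answers with {"request-id":"..."}; the console keeps that id as the
            // savepoint trigger id (CheckPointInfo.triggerId in the patch)
            return restTemplate.postForObject(url, entity, String.class);
        }
    }

Stop-with-savepoint follows the same shape against /jobs/{jobId}/stop, with the patch passing cluster.getSavePointPath() as the targetDirectory field.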