From 05edd4cf234a397743d0c40365c8230178589898 Mon Sep 17 00:00:00 2001 From: "feihu.wang" Date: Tue, 26 Apr 2022 10:20:10 +0800 Subject: [PATCH 01/10] opts. --- flink-web-console/flink-admin.sql | 1 + .../main/resources/templates/cdcjob/edit.ftl | 18 +++++++++++++----- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/flink-web-console/flink-admin.sql b/flink-web-console/flink-admin.sql index 14a42b6..44ca527 100644 --- a/flink-web-console/flink-admin.sql +++ b/flink-web-console/flink-admin.sql @@ -166,6 +166,7 @@ DROP TABLE IF EXISTS `job`; CREATE TABLE `job` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `job_name` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL, + `job_type` tinyint(4) NULL DEFAULT 1, `jar_id` int(11) NULL DEFAULT NULL, `entry_class` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL, `args` varchar(1024) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL, diff --git a/flink-web-console/src/main/resources/templates/cdcjob/edit.ftl b/flink-web-console/src/main/resources/templates/cdcjob/edit.ftl index 80d416f..0ebb587 100644 --- a/flink-web-console/src/main/resources/templates/cdcjob/edit.ftl +++ b/flink-web-console/src/main/resources/templates/cdcjob/edit.ftl @@ -71,7 +71,7 @@ - + @@ -120,7 +120,7 @@ checked="checked" style="margin-left: 13px"> 删除
- +
@@ -249,6 +249,14 @@ changeJobType($("input[name=jobType]:checked").val()); }); + $("#insertTips").mouseover(function (){ + $("#insertSql").text(" INSERT INTO dbName_dbId_tableName ( id,field1,field2,fieldN) SELECT id,field1,field2,fieldN FROM dbName_dbId_tableName"); + }); + + $("#insertTips").mouseout(function (){ + $("#insertSql").text(""); + }); + function changeJobType(jobType) { if(jobType == 1) { $("#jobType1Insert").show(); @@ -449,7 +457,7 @@ } }, error: function (response) { - alert("链接服务器失败"); + alert("操作失败"); } }); } @@ -481,7 +489,7 @@ } }, error: function (response) { - alert("链接服务器失败"); + alert("操作失败"); } }); } @@ -510,7 +518,7 @@ } }, error: function (response) { - alert("链接服务器失败"); + alert("操作失败"); } }); } -- Gitee From 36bc035bd93e5058463740d2f7d5a9899c6cae5f Mon Sep 17 00:00:00 2001 From: huangming Date: Thu, 5 May 2022 13:46:05 +0800 Subject: [PATCH 02/10] =?UTF-8?q?=E6=8F=90=E7=A4=BA=E4=BF=A1=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- flink-web-console/src/main/resources/templates/cdcjob/index.ftl | 2 +- flink-web-console/src/main/resources/templates/job/index.ftl | 2 +- flink-web-console/src/main/resources/templates/sqljob/index.ftl | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/flink-web-console/src/main/resources/templates/cdcjob/index.ftl b/flink-web-console/src/main/resources/templates/cdcjob/index.ftl index 663d432..c13c054 100644 --- a/flink-web-console/src/main/resources/templates/cdcjob/index.ftl +++ b/flink-web-console/src/main/resources/templates/cdcjob/index.ftl @@ -391,7 +391,7 @@ }, error: function (response) { $("#loadingModal").modal('hide'); - alert("链接服务器失败"); + alert(response.msg); } }); } diff --git a/flink-web-console/src/main/resources/templates/job/index.ftl b/flink-web-console/src/main/resources/templates/job/index.ftl index ee8e732..40c6ffd 100644 --- a/flink-web-console/src/main/resources/templates/job/index.ftl +++ b/flink-web-console/src/main/resources/templates/job/index.ftl @@ -456,7 +456,7 @@ }, error: function (response) { $("#loadingModal").modal('hide'); - alert("连接服务器失败",function(){ + alert(response.msg,function(){ location.reload(); }); } diff --git a/flink-web-console/src/main/resources/templates/sqljob/index.ftl b/flink-web-console/src/main/resources/templates/sqljob/index.ftl index c94472f..ae009f8 100644 --- a/flink-web-console/src/main/resources/templates/sqljob/index.ftl +++ b/flink-web-console/src/main/resources/templates/sqljob/index.ftl @@ -422,7 +422,7 @@ }, error: function (response) { $("#loadingModal").modal('hide'); - alert("连接服务器失败",function(){ + alert(response.msg,function(){ location.reload(); }); } -- Gitee From 13c4a0b82d829f67756cb0b0afe29343ab2ca652 Mon Sep 17 00:00:00 2001 From: huangming Date: Thu, 5 May 2022 15:24:55 +0800 Subject: [PATCH 03/10] =?UTF-8?q?=E6=8E=A5=E5=85=A5=E5=85=AC=E5=8F=B8?= =?UTF-8?q?=E5=86=85=E9=83=A8=E8=B4=A6=E5=8F=B7=E7=B3=BB=E7=BB=9F=E7=99=BB?= =?UTF-8?q?=E5=BD=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../inf/flink/controller/AdminController.java | 7 +++- .../cn/chongho/inf/flink/model/AdminUser.java | 21 ++++++++-- .../cn/chongho/inf/flink/restapi/BmsApi.java | 42 +++++++++++++++++++ .../inf/flink/service/AdminUserService.java | 9 ++++ .../service/impl/AdminUserServiceImpl.java | 32 +++++++++++++- 5 files changed, 106 insertions(+), 5 deletions(-) create mode 100644 flink-web-console/src/main/java/cn/chongho/inf/flink/restapi/BmsApi.java diff --git a/flink-web-console/src/main/java/cn/chongho/inf/flink/controller/AdminController.java b/flink-web-console/src/main/java/cn/chongho/inf/flink/controller/AdminController.java index 6859250..435dfa0 100644 --- a/flink-web-console/src/main/java/cn/chongho/inf/flink/controller/AdminController.java +++ b/flink-web-console/src/main/java/cn/chongho/inf/flink/controller/AdminController.java @@ -12,6 +12,7 @@ package cn.chongho.inf.flink.controller; import cn.chongho.inf.flink.model.*; import cn.chongho.inf.flink.service.*; +import cn.chongho.inf.flink.utils.StringUtils; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; @@ -75,7 +76,11 @@ public class AdminController { @PostMapping(value = "/admin/login") public String login(@RequestParam(defaultValue = "") String name, @RequestParam(defaultValue = "") String pass, ModelMap map, HttpSession session){ - AdminUser user = adminUserService.login(name,pass); + if(StringUtils.isEmpty(pass)){ + map.put("error","用户名或密码错误"); + return "login"; + } + AdminUser user = adminUserService.loginByBms(name,pass); if(user == null){ map.put("error","用户名或密码错误"); return "login"; diff --git a/flink-web-console/src/main/java/cn/chongho/inf/flink/model/AdminUser.java b/flink-web-console/src/main/java/cn/chongho/inf/flink/model/AdminUser.java index 2b1f38a..4bd541f 100644 --- a/flink-web-console/src/main/java/cn/chongho/inf/flink/model/AdminUser.java +++ b/flink-web-console/src/main/java/cn/chongho/inf/flink/model/AdminUser.java @@ -2,9 +2,7 @@ package cn.chongho.inf.flink.model; import lombok.Data; -import javax.persistence.Id; -import javax.persistence.Table; -import javax.persistence.Transient; +import javax.persistence.*; import java.util.Date; /** @@ -19,6 +17,8 @@ public class AdminUser { private Integer tenantid; + private String account; + private String name; private String psw; @@ -41,4 +41,19 @@ public class AdminUser { private Date updatetime; + public static AdminUser createByBmsUser(String account, String userName){ + AdminUser adminUser = new AdminUser(); + adminUser.setAccount(account); + adminUser.setName(userName); + adminUser.setPsw(""); + adminUser.setEmail(""); + adminUser.setCreator(-1); + adminUser.setUpdateuser(-1); + adminUser.setFlag(1); + adminUser.setCreatetime(new Date()); + adminUser.setUpdatetime(new Date()); + adminUser.setLogintime(new Date()); + adminUser.setTenantid(0); + return adminUser; + } } diff --git a/flink-web-console/src/main/java/cn/chongho/inf/flink/restapi/BmsApi.java b/flink-web-console/src/main/java/cn/chongho/inf/flink/restapi/BmsApi.java new file mode 100644 index 0000000..722ac63 --- /dev/null +++ b/flink-web-console/src/main/java/cn/chongho/inf/flink/restapi/BmsApi.java @@ -0,0 +1,42 @@ +package cn.chongho.inf.flink.restapi; + +import com.alibaba.fastjson.JSONObject; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.http.HttpEntity; +import org.springframework.http.HttpHeaders; +import org.springframework.http.MediaType; +import org.springframework.stereotype.Service; +import org.springframework.web.client.RestTemplate; + +/** + * @author ming + */ + +@Service +@Slf4j +public class BmsApi { + + @Value("${bms.url}") + private String bmsUrl; + + @Autowired + private RestTemplate restTemplate; + + public JSONObject getUserByBms(String account, String password){ + JSONObject jsonObject = new JSONObject(); + jsonObject.put("account", account); + jsonObject.put("password", password); + jsonObject.put("loginType", "password"); + HttpHeaders headers = new HttpHeaders(); + //定义请求参数类型,这里用json所以是MediaType.APPLICATION_JSON + headers.setContentType(MediaType.APPLICATION_JSON); + HttpEntity formEntity = new HttpEntity<>(jsonObject.toJSONString(), headers); + JSONObject userJsonObject = restTemplate.postForObject(bmsUrl + "/oauth/token", formEntity, JSONObject.class); + if(userJsonObject.getBoolean("success")){ + return userJsonObject.getJSONObject("data"); + } + return null; + } +} diff --git a/flink-web-console/src/main/java/cn/chongho/inf/flink/service/AdminUserService.java b/flink-web-console/src/main/java/cn/chongho/inf/flink/service/AdminUserService.java index a9eb223..9042773 100644 --- a/flink-web-console/src/main/java/cn/chongho/inf/flink/service/AdminUserService.java +++ b/flink-web-console/src/main/java/cn/chongho/inf/flink/service/AdminUserService.java @@ -33,6 +33,15 @@ public interface AdminUserService { * @return */ public AdminUser login(String name, String pwd); + + /** + * 后台用户登录 + * @param account 用户账号 + * @param pwd 密码 + * @return + */ + public AdminUser loginByBms(String account, String pwd); + /** * 根据id查询系统用户 * @param id diff --git a/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/AdminUserServiceImpl.java b/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/AdminUserServiceImpl.java index 97d07ef..e13babd 100644 --- a/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/AdminUserServiceImpl.java +++ b/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/AdminUserServiceImpl.java @@ -3,8 +3,10 @@ package cn.chongho.inf.flink.service.impl; import cn.chongho.inf.flink.mapper.AdminUserMapper; import cn.chongho.inf.flink.mapper.UserRoleMapper; import cn.chongho.inf.flink.model.*; +import cn.chongho.inf.flink.restapi.BmsApi; import cn.chongho.inf.flink.service.AdminUserService; import cn.chongho.inf.flink.utils.MD5Util; +import com.alibaba.fastjson.JSONObject; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -26,6 +28,9 @@ public class AdminUserServiceImpl implements AdminUserService { @Autowired private UserRoleMapper userRoleMapper; + @Autowired + private BmsApi bmsApi; + @Override public List select(int page, int pageSize, String query) { return mapper.selectByQuery((page - 1) * pageSize,pageSize,"%"+query+"%"); @@ -39,11 +44,35 @@ public class AdminUserServiceImpl implements AdminUserService { @Override public AdminUser login(String name, String pwd) { AdminUser temp = new AdminUser(); - temp.setName(name); + temp.setAccount(name); temp.setPsw(MD5Util.MD5(pwd)); return mapper.selectOne(temp); } + @Override + @Transactional(rollbackFor = Exception.class) + public AdminUser loginByBms(String account, String pwd) { + JSONObject userByBms = bmsApi.getUserByBms(account, pwd); + if(userByBms != null){ + String bmsAccount = userByBms.getString("account"); + String userName = userByBms.getString("userName"); + AdminUser temp = new AdminUser(); + temp.setAccount(bmsAccount); + AdminUser adminUser = mapper.selectOne(temp); + if(adminUser == null){ + AdminUser newAdminUser = AdminUser.createByBmsUser(account, userName); + mapper.insert(newAdminUser); + temp.setAccount(bmsAccount); + adminUser = mapper.selectOne(temp); + updateRoleMenu("2,", adminUser.getId(), -1); + return adminUser; + } + return adminUser; + }else{ + return login(account, pwd); + } + } + @Override public AdminUser selectById(int id) { return mapper.selectByPrimaryKey(id); @@ -52,6 +81,7 @@ public class AdminUserServiceImpl implements AdminUserService { @Override public boolean insert(AdminUser user) { user.setPsw(MD5Util.MD5(user.getPsw())); + user.setAccount(user.getName()); return mapper.insertSelective(user) > 0; } -- Gitee From 52928639f3c18fd83e49d00aa8a787d6278634f9 Mon Sep 17 00:00:00 2001 From: huangming Date: Thu, 5 May 2022 15:28:40 +0800 Subject: [PATCH 04/10] =?UTF-8?q?=E7=99=BB=E5=BD=95=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../inf/flink/service/impl/AdminUserServiceImpl.java | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/AdminUserServiceImpl.java b/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/AdminUserServiceImpl.java index e13babd..386816f 100644 --- a/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/AdminUserServiceImpl.java +++ b/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/AdminUserServiceImpl.java @@ -7,6 +7,7 @@ import cn.chongho.inf.flink.restapi.BmsApi; import cn.chongho.inf.flink.service.AdminUserService; import cn.chongho.inf.flink.utils.MD5Util; import com.alibaba.fastjson.JSONObject; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -21,6 +22,7 @@ import java.util.stream.Collectors; * @since 2022-02-14 */ @Service +@Slf4j public class AdminUserServiceImpl implements AdminUserService { @Autowired @@ -52,7 +54,13 @@ public class AdminUserServiceImpl implements AdminUserService { @Override @Transactional(rollbackFor = Exception.class) public AdminUser loginByBms(String account, String pwd) { - JSONObject userByBms = bmsApi.getUserByBms(account, pwd); + JSONObject userByBms = null; + try { + userByBms = bmsApi.getUserByBms(account, pwd); + }catch (Exception e){ + log.error("loginByBms error", e); + } + if(userByBms != null){ String bmsAccount = userByBms.getString("account"); String userName = userByBms.getString("userName"); -- Gitee From 29c215bb6b4f430a8884c6be5a133cc8eabddcd5 Mon Sep 17 00:00:00 2001 From: "feihu.wang" Date: Thu, 5 May 2022 17:00:44 +0800 Subject: [PATCH 05/10] =?UTF-8?q?feats:=E9=9B=86=E6=88=90=E7=AC=AC?= =?UTF-8?q?=E4=B8=89=E6=96=B9=E7=99=BB=E5=BD=95=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../inf/flink/controller/AdminController.java | 2 +- .../cn/chongho/inf/flink/model/AdminUser.java | 2 +- .../restapi/{BmsApi.java => LoginApi.java} | 11 ++--- .../inf/flink/service/AdminUserService.java | 4 +- .../service/impl/AdminUserServiceImpl.java | 42 ++++++++++++------- .../src/main/resources/application-local.yml | 3 ++ 6 files changed, 39 insertions(+), 25 deletions(-) rename flink-web-console/src/main/java/cn/chongho/inf/flink/restapi/{BmsApi.java => LoginApi.java} (80%) diff --git a/flink-web-console/src/main/java/cn/chongho/inf/flink/controller/AdminController.java b/flink-web-console/src/main/java/cn/chongho/inf/flink/controller/AdminController.java index 435dfa0..a0a527e 100644 --- a/flink-web-console/src/main/java/cn/chongho/inf/flink/controller/AdminController.java +++ b/flink-web-console/src/main/java/cn/chongho/inf/flink/controller/AdminController.java @@ -80,7 +80,7 @@ public class AdminController { map.put("error","用户名或密码错误"); return "login"; } - AdminUser user = adminUserService.loginByBms(name,pass); + AdminUser user = adminUserService.login(name,pass); if(user == null){ map.put("error","用户名或密码错误"); return "login"; diff --git a/flink-web-console/src/main/java/cn/chongho/inf/flink/model/AdminUser.java b/flink-web-console/src/main/java/cn/chongho/inf/flink/model/AdminUser.java index 4bd541f..a9004de 100644 --- a/flink-web-console/src/main/java/cn/chongho/inf/flink/model/AdminUser.java +++ b/flink-web-console/src/main/java/cn/chongho/inf/flink/model/AdminUser.java @@ -41,7 +41,7 @@ public class AdminUser { private Date updatetime; - public static AdminUser createByBmsUser(String account, String userName){ + public static AdminUser createByUserInfo(String account, String userName){ AdminUser adminUser = new AdminUser(); adminUser.setAccount(account); adminUser.setName(userName); diff --git a/flink-web-console/src/main/java/cn/chongho/inf/flink/restapi/BmsApi.java b/flink-web-console/src/main/java/cn/chongho/inf/flink/restapi/LoginApi.java similarity index 80% rename from flink-web-console/src/main/java/cn/chongho/inf/flink/restapi/BmsApi.java rename to flink-web-console/src/main/java/cn/chongho/inf/flink/restapi/LoginApi.java index 722ac63..8e249b8 100644 --- a/flink-web-console/src/main/java/cn/chongho/inf/flink/restapi/BmsApi.java +++ b/flink-web-console/src/main/java/cn/chongho/inf/flink/restapi/LoginApi.java @@ -11,20 +11,21 @@ import org.springframework.stereotype.Service; import org.springframework.web.client.RestTemplate; /** + * 第三方系统登录。 * @author ming */ @Service @Slf4j -public class BmsApi { +public class LoginApi { - @Value("${bms.url}") - private String bmsUrl; + @Value("${sys.login.url}") + private String loginUrl; @Autowired private RestTemplate restTemplate; - public JSONObject getUserByBms(String account, String password){ + public JSONObject getUser(String account, String password){ JSONObject jsonObject = new JSONObject(); jsonObject.put("account", account); jsonObject.put("password", password); @@ -33,7 +34,7 @@ public class BmsApi { //定义请求参数类型,这里用json所以是MediaType.APPLICATION_JSON headers.setContentType(MediaType.APPLICATION_JSON); HttpEntity formEntity = new HttpEntity<>(jsonObject.toJSONString(), headers); - JSONObject userJsonObject = restTemplate.postForObject(bmsUrl + "/oauth/token", formEntity, JSONObject.class); + JSONObject userJsonObject = restTemplate.postForObject(loginUrl + "/oauth/token", formEntity, JSONObject.class); if(userJsonObject.getBoolean("success")){ return userJsonObject.getJSONObject("data"); } diff --git a/flink-web-console/src/main/java/cn/chongho/inf/flink/service/AdminUserService.java b/flink-web-console/src/main/java/cn/chongho/inf/flink/service/AdminUserService.java index 9042773..085cec7 100644 --- a/flink-web-console/src/main/java/cn/chongho/inf/flink/service/AdminUserService.java +++ b/flink-web-console/src/main/java/cn/chongho/inf/flink/service/AdminUserService.java @@ -35,12 +35,12 @@ public interface AdminUserService { public AdminUser login(String name, String pwd); /** - * 后台用户登录 + * 第三方登录 * @param account 用户账号 * @param pwd 密码 * @return */ - public AdminUser loginByBms(String account, String pwd); + public AdminUser loginBy3Thd(String account, String pwd); /** * 根据id查询系统用户 diff --git a/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/AdminUserServiceImpl.java b/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/AdminUserServiceImpl.java index 386816f..eef5bcd 100644 --- a/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/AdminUserServiceImpl.java +++ b/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/AdminUserServiceImpl.java @@ -3,12 +3,13 @@ package cn.chongho.inf.flink.service.impl; import cn.chongho.inf.flink.mapper.AdminUserMapper; import cn.chongho.inf.flink.mapper.UserRoleMapper; import cn.chongho.inf.flink.model.*; -import cn.chongho.inf.flink.restapi.BmsApi; +import cn.chongho.inf.flink.restapi.LoginApi; import cn.chongho.inf.flink.service.AdminUserService; import cn.chongho.inf.flink.utils.MD5Util; import com.alibaba.fastjson.JSONObject; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import tk.mybatis.mapper.entity.Example; @@ -31,7 +32,10 @@ public class AdminUserServiceImpl implements AdminUserService { private UserRoleMapper userRoleMapper; @Autowired - private BmsApi bmsApi; + private LoginApi loginApi; + + @Value("sys.login.enable") + private boolean enableThirdLogin; @Override public List select(int page, int pageSize, String query) { @@ -45,32 +49,36 @@ public class AdminUserServiceImpl implements AdminUserService { @Override public AdminUser login(String name, String pwd) { - AdminUser temp = new AdminUser(); - temp.setAccount(name); - temp.setPsw(MD5Util.MD5(pwd)); - return mapper.selectOne(temp); + AdminUser user = new AdminUser(); + if (enableThirdLogin) { + return loginBy3Thd(name, pwd); + } else { + user.setAccount(name); + user.setPsw(MD5Util.MD5(pwd)); + return mapper.selectOne(user); + } } @Override @Transactional(rollbackFor = Exception.class) - public AdminUser loginByBms(String account, String pwd) { - JSONObject userByBms = null; + public AdminUser loginBy3Thd(String account, String pwd) { + JSONObject userInfo = null; try { - userByBms = bmsApi.getUserByBms(account, pwd); + userInfo = loginApi.getUser(account, pwd); }catch (Exception e){ - log.error("loginByBms error", e); + log.error("loginByApi error", e); } - if(userByBms != null){ - String bmsAccount = userByBms.getString("account"); - String userName = userByBms.getString("userName"); + if(userInfo != null){ + String accountName = userInfo.getString("account"); + String userName = userInfo.getString("userName"); AdminUser temp = new AdminUser(); - temp.setAccount(bmsAccount); + temp.setAccount(accountName); AdminUser adminUser = mapper.selectOne(temp); if(adminUser == null){ - AdminUser newAdminUser = AdminUser.createByBmsUser(account, userName); + AdminUser newAdminUser = AdminUser.createByUserInfo(account, userName); mapper.insert(newAdminUser); - temp.setAccount(bmsAccount); + temp.setAccount(accountName); adminUser = mapper.selectOne(temp); updateRoleMenu("2,", adminUser.getId(), -1); return adminUser; @@ -166,4 +174,6 @@ public class AdminUserServiceImpl implements AdminUserService { return WebResult.error("旧密码错误"); } } + + private } diff --git a/flink-web-console/src/main/resources/application-local.yml b/flink-web-console/src/main/resources/application-local.yml index 70327b0..3ffec89 100644 --- a/flink-web-console/src/main/resources/application-local.yml +++ b/flink-web-console/src/main/resources/application-local.yml @@ -42,6 +42,9 @@ sys: offset: 88888888 upload: jar-path: E:\\tmp\\ + login: + enable: false # 是否开启第三方登录 + url: #监控推送用于告警 alert: -- Gitee From 721eefc02c0ee3fd1c08e6eaaf2feaaab7de2d41 Mon Sep 17 00:00:00 2001 From: "feihu.wang" Date: Thu, 5 May 2022 18:24:06 +0800 Subject: [PATCH 06/10] feats:state backend. --- .../java/cn/chongho/inf/flink/job/FlinkCdcSqlTemplateJob.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/flink-cdc-job/src/main/java/cn/chongho/inf/flink/job/FlinkCdcSqlTemplateJob.java b/flink-cdc-job/src/main/java/cn/chongho/inf/flink/job/FlinkCdcSqlTemplateJob.java index f69a039..692d1de 100644 --- a/flink-cdc-job/src/main/java/cn/chongho/inf/flink/job/FlinkCdcSqlTemplateJob.java +++ b/flink-cdc-job/src/main/java/cn/chongho/inf/flink/job/FlinkCdcSqlTemplateJob.java @@ -2,6 +2,7 @@ package cn.chongho.inf.flink.job; import org.apache.commons.lang.StringUtils; import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.runtime.state.hashmap.HashMapStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.SqlDialect; @@ -31,6 +32,9 @@ public class FlinkCdcSqlTemplateJob { env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); + env.setStateBackend(new HashMapStateBackend()); + env.getCheckpointConfig().setCheckpointStorage(parameterTool.get("checkpoint.path", "file:///tmp/flink/checkpoints")); + StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); -- Gitee From 7f088e1f63cb7897de4d500d69c4a6348fd94788 Mon Sep 17 00:00:00 2001 From: "feihu.wang" Date: Mon, 18 Jul 2022 11:33:34 +0800 Subject: [PATCH 07/10] opts. --- .../inf/flink/config/MybatisConfiguration.java | 1 + .../chongho/inf/flink/config/RestTemplateConfig.java | 11 +++-------- .../cn/chongho/inf/flink/constants/Constant.java | 9 ++++++++- .../inf/flink/controller/AdminController.java | 3 +-- .../cn/chongho/inf/flink/mapper/AdminUserMapper.java | 12 ++++++++++++ .../java/cn/chongho/inf/flink/restapi/JobApi.java | 4 ++-- .../cn/chongho/inf/flink/service/ClusterService.java | 6 +++++- .../inf/flink/service/impl/CdcSqlServiceImpl.java | 12 +++++++++--- .../inf/flink/service/impl/ClusterServiceImpl.java | 2 +- .../inf/flink/service/impl/JobServiceImpl.java | 5 +---- .../inf/flink/service/impl/MenuServiceImpl.java | 4 +++- .../inf/flink/service/impl/RoleServiceImpl.java | 2 +- .../java/cn/chongho/inf/flink/utils/MD5Util.java | 4 ++-- 13 files changed, 49 insertions(+), 26 deletions(-) diff --git a/flink-web-console/src/main/java/cn/chongho/inf/flink/config/MybatisConfiguration.java b/flink-web-console/src/main/java/cn/chongho/inf/flink/config/MybatisConfiguration.java index 055a405..5c25803 100644 --- a/flink-web-console/src/main/java/cn/chongho/inf/flink/config/MybatisConfiguration.java +++ b/flink-web-console/src/main/java/cn/chongho/inf/flink/config/MybatisConfiguration.java @@ -80,6 +80,7 @@ public class MybatisConfiguration implements TransactionManagementConfigurer { } @Bean + @Override public PlatformTransactionManager annotationDrivenTransactionManager() { logger.info("事物配置"); return new DataSourceTransactionManager(dataSource); diff --git a/flink-web-console/src/main/java/cn/chongho/inf/flink/config/RestTemplateConfig.java b/flink-web-console/src/main/java/cn/chongho/inf/flink/config/RestTemplateConfig.java index 4a64c33..420ef3f 100644 --- a/flink-web-console/src/main/java/cn/chongho/inf/flink/config/RestTemplateConfig.java +++ b/flink-web-console/src/main/java/cn/chongho/inf/flink/config/RestTemplateConfig.java @@ -24,9 +24,6 @@ public class RestTemplateConfig { restTemplate.setErrorHandler(new DefaultResponseErrorHandler(){ @Override public void handleError(ClientHttpResponse response) throws IOException { -// if(response.getStatusCode() != HttpStatus.INTERNAL_SERVER_ERROR) { -// super.handleError(response); -// }else{ StringBuilder sb = new StringBuilder(); String line; BufferedReader br = new BufferedReader(new InputStreamReader(response.getBody())); @@ -34,7 +31,7 @@ public class RestTemplateConfig { sb.append(line); } throw new RestClientException(sb.toString()); -// } + } }); return restTemplate; @@ -46,9 +43,7 @@ public class RestTemplateConfig { restTemplate.setErrorHandler(new DefaultResponseErrorHandler(){ @Override public void handleError(ClientHttpResponse response) throws IOException { -// if(response.getStatusCode() != HttpStatus.INTERNAL_SERVER_ERROR) { -// super.handleError(response); -// }else{ + StringBuilder sb = new StringBuilder(); String line; BufferedReader br = new BufferedReader(new InputStreamReader(response.getBody())); @@ -56,7 +51,7 @@ public class RestTemplateConfig { sb.append(line); } throw new RestClientException(sb.toString()); -// } + } }); return restTemplate; diff --git a/flink-web-console/src/main/java/cn/chongho/inf/flink/constants/Constant.java b/flink-web-console/src/main/java/cn/chongho/inf/flink/constants/Constant.java index 0bcec09..bade439 100644 --- a/flink-web-console/src/main/java/cn/chongho/inf/flink/constants/Constant.java +++ b/flink-web-console/src/main/java/cn/chongho/inf/flink/constants/Constant.java @@ -40,10 +40,14 @@ public class Constant { ENABLE; } + /** + * 数据库类型。 + */ public enum DbType { MYSQL, ORACLE, ELASTICSEARCH, + HOLO, ; } @@ -208,12 +212,15 @@ public class Constant { } /** - * 数据权限,数据类型 + * 数据权限,数据类型。 */ public enum DataType { JOB(1), CDCJOB(2); + /** + * 任务类型. + */ private Integer value; public Integer getValue() { diff --git a/flink-web-console/src/main/java/cn/chongho/inf/flink/controller/AdminController.java b/flink-web-console/src/main/java/cn/chongho/inf/flink/controller/AdminController.java index a0a527e..7b4411e 100644 --- a/flink-web-console/src/main/java/cn/chongho/inf/flink/controller/AdminController.java +++ b/flink-web-console/src/main/java/cn/chongho/inf/flink/controller/AdminController.java @@ -54,8 +54,7 @@ public class AdminController { List menus = menuService.selectByUser(user.getId()); map.put("treeMenu",menus); - map.put("flinkList", clusterService.getColonyInfo()); -// map.put("flinkList", flinkList); + map.put("flinkList", clusterService.getClusterInfo()); return "index"; } diff --git a/flink-web-console/src/main/java/cn/chongho/inf/flink/mapper/AdminUserMapper.java b/flink-web-console/src/main/java/cn/chongho/inf/flink/mapper/AdminUserMapper.java index aaf7947..6b8d07f 100644 --- a/flink-web-console/src/main/java/cn/chongho/inf/flink/mapper/AdminUserMapper.java +++ b/flink-web-console/src/main/java/cn/chongho/inf/flink/mapper/AdminUserMapper.java @@ -9,6 +9,13 @@ import java.util.List; public interface AdminUserMapper extends Mapper { + /** + * 用户查询. + * @param begin + * @param pageSize + * @param query + * @return + */ @Select({ "select", "u.id, u.name, u.email, u.creator, u.flag, u.logintime,c.name createuser", @@ -27,6 +34,11 @@ public interface AdminUserMapper extends Mapper { List selectByQuery(@Param("begin") int begin, @Param("pagesize") int pageSize, @Param("query") String query); + /** + * count for page. + * @param name + * @return + */ @Select({"SELECT COUNT(id) FROM users WHERE name LIKE #{name}"}) int selectCountByName(String name); diff --git a/flink-web-console/src/main/java/cn/chongho/inf/flink/restapi/JobApi.java b/flink-web-console/src/main/java/cn/chongho/inf/flink/restapi/JobApi.java index 8f386bc..a1a4777 100644 --- a/flink-web-console/src/main/java/cn/chongho/inf/flink/restapi/JobApi.java +++ b/flink-web-console/src/main/java/cn/chongho/inf/flink/restapi/JobApi.java @@ -60,7 +60,7 @@ public class JobApi { params.put("target-directory", savepointPath); log.info("rest req:{}", url); HttpHeaders headers = new HttpHeaders(); - headers.setContentType(MediaType.APPLICATION_JSON_UTF8); + headers.setContentType(MediaType.APPLICATION_JSON); HttpEntity entity = new HttpEntity<>(JSON.toJSONString(params), headers); JSONObject triggerObj = restTemplate.postForObject(url, entity, JSONObject.class); return triggerObj.getString("request-id"); @@ -88,7 +88,7 @@ public class JobApi { Map params = new HashMap(); params.put("targetDirectory", savepointPath); HttpHeaders headers = new HttpHeaders(); - headers.setContentType(MediaType.APPLICATION_JSON_UTF8); + headers.setContentType(MediaType.APPLICATION_JSON); HttpEntity entity = new HttpEntity<>(JSON.toJSONString(params), headers); JSONObject triggerObj = restTemplate.postForObject(url, entity, JSONObject.class); return triggerObj.getString("request-id"); diff --git a/flink-web-console/src/main/java/cn/chongho/inf/flink/service/ClusterService.java b/flink-web-console/src/main/java/cn/chongho/inf/flink/service/ClusterService.java index de31c09..4c3842e 100644 --- a/flink-web-console/src/main/java/cn/chongho/inf/flink/service/ClusterService.java +++ b/flink-web-console/src/main/java/cn/chongho/inf/flink/service/ClusterService.java @@ -52,5 +52,9 @@ public interface ClusterService { */ List getAllCluster(); - List> getColonyInfo(); + /** + * 获取集群状态信息. + * @return + */ + List> getClusterInfo(); } diff --git a/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/CdcSqlServiceImpl.java b/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/CdcSqlServiceImpl.java index 7816ae3..15abc48 100644 --- a/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/CdcSqlServiceImpl.java +++ b/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/CdcSqlServiceImpl.java @@ -12,6 +12,7 @@ import cn.chongho.inf.flink.utils.StringUtils; import com.alibaba.fastjson.JSON; import cn.chongho.inf.flink.model.*; import cn.chongho.inf.flink.model.connector.ElasticSearchConfig; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; @@ -26,6 +27,7 @@ import java.util.stream.Collectors; * @author ming */ @Service +@Slf4j public class CdcSqlServiceImpl { @Value("${sys.des.secretkey}") @@ -119,6 +121,8 @@ public class CdcSqlServiceImpl { break; case ELASTICSEARCH: break; + default: + log.warn("unsupported db type:{}", dbType); } return sqlBuilder.toString(); } @@ -175,6 +179,8 @@ public class CdcSqlServiceImpl { elasticSearchConfig.setIndex(table.getTableName()); sqlBuilder.append(elasticSearchConfig.doConfigToSql()); break; + case HOLO: + default: break; } @@ -240,6 +246,9 @@ public class CdcSqlServiceImpl { flinkDataType = elasticsearchDataType.getFlinkDataType(); tableSql.append(flinkDataType).append(","); break; + default: + log.warn("unsupported db type:{}", dbType); + break; } }); if(StringUtils.isEmpty(primaryColumn)){ @@ -283,9 +292,6 @@ public class CdcSqlServiceImpl { StringBuilder insertSql = new StringBuilder(); -// String targetColumn = columnAssociationList.stream().map(ColumnAssociation::getTargetColumnName).collect(Collectors.joining(",")); -// String sourceColumn = columnAssociationList.stream().map(ColumnAssociation::getSourceColumnName).collect(Collectors.joining(",")); - StringBuilder targetColumnBuilder = new StringBuilder(); StringBuilder sourceColumnBuilder = new StringBuilder(); for(ColumnAssociation columnAssociation : columnAssociationList){ diff --git a/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/ClusterServiceImpl.java b/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/ClusterServiceImpl.java index 5bdaf03..2a3c830 100644 --- a/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/ClusterServiceImpl.java +++ b/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/ClusterServiceImpl.java @@ -98,7 +98,7 @@ public class ClusterServiceImpl implements ClusterService { } @Override - public List> getColonyInfo() { + public List> getClusterInfo() { long thisTime = System.currentTimeMillis(); long seconds = TimeUnit.MILLISECONDS.toSeconds(thisTime - lastTimestamp); if(flinkListCache != null && seconds < 60L){ diff --git a/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/JobServiceImpl.java b/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/JobServiceImpl.java index 66af065..e941d33 100644 --- a/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/JobServiceImpl.java +++ b/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/JobServiceImpl.java @@ -126,10 +126,7 @@ public class JobServiceImpl implements JobService { public boolean update(Job job) { int ret = 0; job.setUpdateTime(new Date()); - /* Job jobByEntryClass = jobMapper.findJobByEntryClass(job.getEntryClass()); - if(!(jobByEntryClass == null || jobByEntryClass.getId().equals(job.getId()))){ - return false; - }*/ + job.setParallelism(job.getParallelism() == null ? 1 : job.getParallelism()); if (job.getId() != null) { Job dbJob = jobMapper.selectByPrimaryKey(job.getId()); diff --git a/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/MenuServiceImpl.java b/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/MenuServiceImpl.java index 7f88543..7953765 100644 --- a/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/MenuServiceImpl.java +++ b/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/MenuServiceImpl.java @@ -22,6 +22,7 @@ public class MenuServiceImpl implements MenuService { @Autowired private MenuMapper mapper; + @Override public List selectAll() { HashMap> map = new HashMap>(); List menus = mapper.selectByExample(null); @@ -86,6 +87,7 @@ public class MenuServiceImpl implements MenuService { return mapper.selectAuthorities(userId); } + @Override public boolean addMenu(Menu menu) { return mapper.insertSelective(menu) > 0; } @@ -96,7 +98,7 @@ public class MenuServiceImpl implements MenuService { } @Override - @Transactional + @Transactional(rollbackFor = Throwable.class) public boolean delete(int id) { mapper.deleteByPrimaryKey(id); return true; diff --git a/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/RoleServiceImpl.java b/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/RoleServiceImpl.java index a238d3e..37efeb9 100644 --- a/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/RoleServiceImpl.java +++ b/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/RoleServiceImpl.java @@ -51,7 +51,7 @@ public class RoleServiceImpl implements RoleService { return mapper.updateByPrimaryKeySelective(role) > 0; } - @Transactional + @Transactional(rollbackFor = Throwable.class) @Override public boolean delete(String id) { String[] ids = id.split(","); diff --git a/flink-web-console/src/main/java/cn/chongho/inf/flink/utils/MD5Util.java b/flink-web-console/src/main/java/cn/chongho/inf/flink/utils/MD5Util.java index a12c73c..9caa7c5 100644 --- a/flink-web-console/src/main/java/cn/chongho/inf/flink/utils/MD5Util.java +++ b/flink-web-console/src/main/java/cn/chongho/inf/flink/utils/MD5Util.java @@ -14,7 +14,7 @@ public class MD5Util { } public final static String MD5(byte[] btInput) { - char hexDigits[] = { '0', '1', '2', '3', '4', + char [] hexDigits = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'}; try { @@ -22,7 +22,7 @@ public class MD5Util { mdInst.update(btInput); byte[] md = mdInst.digest(); int j = md.length; - char str[] = new char[j * 2]; + char [] str = new char[j * 2]; int k = 0; for (int i = 0; i < j; i++) { byte byte0 = md[i]; -- Gitee From 32ee0f49ca6792e65eb8f1cd610a1f54c7d3be3b Mon Sep 17 00:00:00 2001 From: "feihu.wang" Date: Mon, 18 Jul 2022 11:38:20 +0800 Subject: [PATCH 08/10] opts. --- .../java/cn/chongho/inf/flink/service/impl/MenuServiceImpl.java | 2 +- .../java/cn/chongho/inf/flink/service/impl/RoleServiceImpl.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/MenuServiceImpl.java b/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/MenuServiceImpl.java index 7953765..5537eef 100644 --- a/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/MenuServiceImpl.java +++ b/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/MenuServiceImpl.java @@ -98,7 +98,7 @@ public class MenuServiceImpl implements MenuService { } @Override - @Transactional(rollbackFor = Throwable.class) + @Transactional(rollbackFor = RuntimeException.class) public boolean delete(int id) { mapper.deleteByPrimaryKey(id); return true; diff --git a/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/RoleServiceImpl.java b/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/RoleServiceImpl.java index 37efeb9..873b057 100644 --- a/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/RoleServiceImpl.java +++ b/flink-web-console/src/main/java/cn/chongho/inf/flink/service/impl/RoleServiceImpl.java @@ -51,7 +51,7 @@ public class RoleServiceImpl implements RoleService { return mapper.updateByPrimaryKeySelective(role) > 0; } - @Transactional(rollbackFor = Throwable.class) + @Transactional(rollbackFor = RuntimeException.class) @Override public boolean delete(String id) { String[] ids = id.split(","); -- Gitee From c925b4a9178878545ee74760b7ec49e349fc23c5 Mon Sep 17 00:00:00 2001 From: "feihu.wang" Date: Mon, 18 Jul 2022 12:05:35 +0800 Subject: [PATCH 09/10] opts. --- flink-web-console/src/main/resources/mapper/JobMapper.xml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/flink-web-console/src/main/resources/mapper/JobMapper.xml b/flink-web-console/src/main/resources/mapper/JobMapper.xml index 8de628e..1c80235 100644 --- a/flink-web-console/src/main/resources/mapper/JobMapper.xml +++ b/flink-web-console/src/main/resources/mapper/JobMapper.xml @@ -64,6 +64,9 @@ AND status = #{job.status} + + AND job_type = #{job.jobType} + -- Gitee From b98f983a647bed39870a788c4dd0dba22cbfe657 Mon Sep 17 00:00:00 2001 From: "feihu.wang" Date: Mon, 18 Jul 2022 14:18:08 +0800 Subject: [PATCH 10/10] add column to job table. --- flink-web-console/flink-admin.sql | 1 + 1 file changed, 1 insertion(+) diff --git a/flink-web-console/flink-admin.sql b/flink-web-console/flink-admin.sql index 44ca527..3e55be6 100644 --- a/flink-web-console/flink-admin.sql +++ b/flink-web-console/flink-admin.sql @@ -174,6 +174,7 @@ CREATE TABLE `job` ( `target_db_id` bigint(20) NULL DEFAULT NULL COMMENT '落库数据库', `savepoint_path` varchar(256) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL, `job_id` varchar(256) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL, + `job_type` tinyint(4) NULL DEFAULT 1, `status` int(11) NULL DEFAULT 0, `enable_flag` tinyint(4) NULL DEFAULT 1, `flink_colony_id` bigint(20) NULL DEFAULT NULL COMMENT '任务运行集群id', -- Gitee