From 237a370f01b7307f7586d3a9ab4b237f9893c69b Mon Sep 17 00:00:00 2001
From: hanshuai03 <14693422+hanshuai03@user.noreply.gitee.com>
Date: Tue, 16 Dec 2025 17:27:35 +0800
Subject: [PATCH 1/2] =?UTF-8?q?msu=E4=B8=BB=E5=BA=93=E6=9C=8D=E5=8A=A1?=
=?UTF-8?q?=E5=8C=96=E6=94=B9=E9=80=A0?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
caf-msu/caf-msu-database/pom.xml | 4 +
.../msu/client/discovery/DbDiscoveryImpl.java | 57 ++++++++--
.../caf/msu/client/health/DbHealthCheck.java | 107 ++++++++++++++----
.../msu/client/register/DbRegisterImpl.java | 88 +++++++++++---
.../edp/caf/msu/common/DBMsuServiceImpl.java | 46 +++++++-
.../GspAppServerRpcPersistence.java | 22 ++++
.../persistence/GspSuRpcPersistence.java | 23 ++++
7 files changed, 301 insertions(+), 46 deletions(-)
create mode 100644 caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/common/domain/persistence/GspAppServerRpcPersistence.java
create mode 100644 caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/common/domain/persistence/GspSuRpcPersistence.java
diff --git a/caf-msu/caf-msu-database/pom.xml b/caf-msu/caf-msu-database/pom.xml
index 787d3a3e6..c90ee1756 100644
--- a/caf-msu/caf-msu-database/pom.xml
+++ b/caf-msu/caf-msu-database/pom.xml
@@ -27,6 +27,10 @@
io.iec.edp
caf-data-jpa
+
+ io.iec.edp
+ caf-rpc-client
+
diff --git a/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/client/discovery/DbDiscoveryImpl.java b/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/client/discovery/DbDiscoveryImpl.java
index 4b2cd6af9..f970beb52 100644
--- a/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/client/discovery/DbDiscoveryImpl.java
+++ b/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/client/discovery/DbDiscoveryImpl.java
@@ -16,6 +16,7 @@
package io.iec.edp.caf.msu.client.discovery;
+import io.iec.edp.caf.commons.runtime.CafEnvironment;
import io.iec.edp.caf.commons.utils.SpringBeanUtils;
import io.iec.edp.caf.datasource.CAFDataSourceSelector;
import io.iec.edp.caf.msu.api.client.ServiceDiscovery;
@@ -26,8 +27,11 @@ import io.iec.edp.caf.msu.client.health.DbHealthSetting;
import io.iec.edp.caf.msu.common.domain.converter.GspAppConverter;
import io.iec.edp.caf.msu.common.domain.entity.GspAppServerEntity;
import io.iec.edp.caf.msu.common.domain.entity.GspSuEntity;
+import io.iec.edp.caf.msu.common.domain.persistence.GspAppServerRpcPersistence;
+import io.iec.edp.caf.msu.common.domain.persistence.GspSuRpcPersistence;
import io.iec.edp.caf.msu.common.domain.repository.AppServerRepository;
import io.iec.edp.caf.msu.common.domain.repository.SuRepository;
+import io.iec.edp.caf.rpc.client.RpcClassHolder;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
@@ -47,9 +51,16 @@ public class DbDiscoveryImpl implements ServiceDiscovery {
private AppServerRepository appRepo;
private SuRepository suRepo;
- public DbDiscoveryImpl(AppServerRepository appRepo, SuRepository suRepo) {
+ private GspAppServerRpcPersistence appServerRpcPersistence;
+
+ private GspSuRpcPersistence suRpcPersistence;
+
+ private final RpcClassHolder rpcClassHolder;
+
+ public DbDiscoveryImpl(AppServerRepository appRepo, SuRepository suRepo, RpcClassHolder rpcClassHolder) {
this.appRepo = appRepo;
this.suRepo = suRepo;
+ this.rpcClassHolder = rpcClassHolder;
}
@Override
@@ -98,7 +109,12 @@ public class DbDiscoveryImpl implements ServiceDiscovery {
CAFDataSourceSelector.selectMaster();
//读取GSPSuInstance
- List allSuEntity = this.suRepo.findAll();
+ List allSuEntity = new ArrayList<>();
+ if (CafEnvironment.enablePrimaryDataService()) {
+ allSuEntity = getSuRpcPersistence().findAll();
+ } else {
+ allSuEntity = this.suRepo.findAll();
+ }
if (allSuEntity.size() > 0) {
infos = new ArrayList<>();
for (GspSuEntity entity : allSuEntity) {
@@ -131,9 +147,17 @@ public class DbDiscoveryImpl implements ServiceDiscovery {
//设置主库
CAFDataSourceSelector.selectMaster();
- List entitys = this.appRepo.findBySuName(su.toLowerCase());
- if (entitys == null || entitys.size() == 0) {
- entitys = this.appRepo.findBySuName(su);
+ List entitys = new ArrayList<>();
+ if (CafEnvironment.enablePrimaryDataService()) {
+ entitys = getAppServerRpcPersistence().findBySuName(su.toLowerCase());
+ if (entitys == null || entitys.size() == 0) {
+ entitys = getAppServerRpcPersistence().findBySuName(su);
+ }
+ } else {
+ entitys = this.appRepo.findBySuName(su.toLowerCase());
+ if (entitys == null || entitys.size() == 0) {
+ entitys = this.appRepo.findBySuName(su);
+ }
}
for (GspAppServerEntity entity : entitys) {
@@ -165,11 +189,17 @@ public class DbDiscoveryImpl implements ServiceDiscovery {
if (apps == null || apps.size() == 0)
return null;
- Date currentTimestamp = this.appRepo.getCurrentTimestamp();
+ Date currentTimestamp = new Date();
+ if (CafEnvironment.enablePrimaryDataService()) {
+ currentTimestamp = getAppServerRpcPersistence().getCurrentTimestamp();
+ } else {
+ currentTimestamp = this.appRepo.getCurrentTimestamp();
+ }
int beatPeriod = SpringBeanUtils.getBean(DbHealthSetting.class).getBeatPeriod();
List latest = new ArrayList<>();
+ Date finalCurrentTimestamp = currentTimestamp;
apps.forEach(app -> {
- if ((currentTimestamp.getTime() - Date.from(app.getBeatTime().toInstant()).getTime()) / 1000 <= beatPeriod) {
+ if ((finalCurrentTimestamp.getTime() - Date.from(app.getBeatTime().toInstant()).getTime()) / 1000 <= beatPeriod) {
latest.add(app);
}
});
@@ -184,4 +214,17 @@ public class DbDiscoveryImpl implements ServiceDiscovery {
return apps.get(index);
}
+ private GspAppServerRpcPersistence getAppServerRpcPersistence(){
+ if(this.appServerRpcPersistence==null){
+ this.appServerRpcPersistence = rpcClassHolder.getRpcClass("sys", GspAppServerRpcPersistence.class);
+ }
+ return this.appServerRpcPersistence;
+ }
+
+ private GspSuRpcPersistence getSuRpcPersistence(){
+ if(this.suRpcPersistence==null){
+ this.suRpcPersistence = rpcClassHolder.getRpcClass("sys", GspSuRpcPersistence.class);
+ }
+ return this.suRpcPersistence;
+ }
}
diff --git a/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/client/health/DbHealthCheck.java b/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/client/health/DbHealthCheck.java
index f4a26c586..c62d5b089 100644
--- a/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/client/health/DbHealthCheck.java
+++ b/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/client/health/DbHealthCheck.java
@@ -16,6 +16,7 @@
package io.iec.edp.caf.msu.client.health;
+import io.iec.edp.caf.commons.runtime.CafEnvironment;
import io.iec.edp.caf.commons.transaction.JpaTransaction;
import io.iec.edp.caf.commons.transaction.TransactionPropagation;
import io.iec.edp.caf.commons.utils.SpringBeanUtils;
@@ -25,8 +26,11 @@ import io.iec.edp.caf.lock.service.api.api.DistributedLockFactory;
import io.iec.edp.caf.msu.api.entity.ServiceUnitInfo;
import io.iec.edp.caf.msu.common.domain.entity.GspAppServerEntity;
import io.iec.edp.caf.msu.common.domain.entity.GspSuEntity;
+import io.iec.edp.caf.msu.common.domain.persistence.GspAppServerRpcPersistence;
+import io.iec.edp.caf.msu.common.domain.persistence.GspSuRpcPersistence;
import io.iec.edp.caf.msu.common.domain.repository.AppServerRepository;
import io.iec.edp.caf.msu.common.domain.repository.SuRepository;
+import io.iec.edp.caf.rpc.client.RpcClassHolder;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
@@ -64,7 +68,13 @@ public class DbHealthCheck {
//任务
private ScheduledExecutorService executorService;
- public DbHealthCheck(AppServerRepository appRepo, SuRepository suRepo, List appEntities, Map> maps) {
+ private GspAppServerRpcPersistence appServerRpcPersistence;
+
+ private GspSuRpcPersistence suRpcPersistence;
+
+ private final RpcClassHolder rpcClassHolder;
+
+ public DbHealthCheck(AppServerRepository appRepo, SuRepository suRepo, List appEntities, Map> maps, RpcClassHolder rpcClassHolder) {
this.appRepo = appRepo;
this.suRepo = suRepo;
this.instances = appEntities;
@@ -76,6 +86,7 @@ public class DbHealthCheck {
thread.setName("DbHealthCheck");
return thread;
});
+ this.rpcClassHolder = rpcClassHolder;
}
//检测
@@ -146,7 +157,12 @@ public class DbHealthCheck {
*/
private void doHealthyCheck() {
//数据库时间
- Date currentTimestamp = appRepo.getCurrentTimestamp();
+ Date currentTimestamp = new Date();
+ if (CafEnvironment.enablePrimaryDataService()) {
+ currentTimestamp = getAppServerRpcPersistence().getCurrentTimestamp();
+ } else {
+ currentTimestamp = appRepo.getCurrentTimestamp();
+ }
if (currentTimestamp == null) {
currentTimestamp = new Date();
} else {
@@ -155,7 +171,12 @@ public class DbHealthCheck {
//数据库时间转换为OffsetDateTime
OffsetDateTime beatTime = currentTimestamp.toInstant().atZone(ZoneOffset.systemDefault()).toOffsetDateTime();
//所有app实例
- List allInstances = appRepo.findAll();
+ List allInstances = new ArrayList<>();
+ if (CafEnvironment.enablePrimaryDataService()) {
+ allInstances = getAppServerRpcPersistence().findAll();
+ } else {
+ allInstances = appRepo.findAll();
+ }
//遍历实例内部的实例
for (GspAppServerEntity inner : DbHealthCheck.this.instances) {
@@ -173,7 +194,11 @@ public class DbHealthCheck {
//存在
appInstance.setBeatTime(beatTime);
//appServerEntity.setHealthy(true); 暂不使用
- appRepo.save(appInstance);
+ if (CafEnvironment.enablePrimaryDataService()) {
+ getAppServerRpcPersistence().save(appInstance);
+ } else {
+ appRepo.save(appInstance);
+ }
if (log.isInfoEnabled()) log.info("[{}] beats", serviceName);
@@ -181,13 +206,21 @@ public class DbHealthCheck {
allInstances.remove(appInstance);
} else {
//清理su信息、实例信息
- suRepo.deleteByApp(serviceName); //此处是一重保险,避免非优雅停机导致数据未清理
+ if (CafEnvironment.enablePrimaryDataService()) {
+ getSuRpcPersistence().deleteByApp(serviceName);
+ } else {
+ suRepo.deleteByApp(serviceName); //此处是一重保险,避免非优雅停机导致数据未清理
+ }
//注册实例信息、su信息
//注册实例信息
inner.setBeatTime(beatTime);
//appEntity.setHealthy(true); 暂不使用
- appRepo.save(inner);
+ if (CafEnvironment.enablePrimaryDataService()) {
+ getAppServerRpcPersistence().save(inner);
+ } else {
+ appRepo.save(inner);
+ }
//注册su信息
List suInfos = DbHealthCheck.this.maps.get(serviceName);
if (suInfos != null && suInfos.size() > 0) {
@@ -197,15 +230,28 @@ public class DbHealthCheck {
for (ServiceUnitInfo su : suInfos) {
if (su != null) {
String suName = su.getName().toLowerCase();
- if (suRepo.countByAppAndSu(serviceName, suName) == 0) {
- GspSuEntity suEntity = new GspSuEntity();
- suEntity.setId(UUID.randomUUID().toString());
- suEntity.setApp(serviceName);
- suEntity.setSu(suName);
- suRepo.save(suEntity);
-
- if (log.isInfoEnabled())
- log.info("Success to register su [{}]", suName);
+ if (CafEnvironment.enablePrimaryDataService()) {
+ if (getSuRpcPersistence().countByAppAndSu(serviceName, suName) == 0) {
+ GspSuEntity suEntity = new GspSuEntity();
+ suEntity.setId(UUID.randomUUID().toString());
+ suEntity.setApp(serviceName);
+ suEntity.setSu(suName);
+ getSuRpcPersistence().save(suEntity);
+
+ if (log.isInfoEnabled())
+ log.info("Success to register su [{}]", suName);
+ }
+ } else {
+ if (suRepo.countByAppAndSu(serviceName, suName) == 0) {
+ GspSuEntity suEntity = new GspSuEntity();
+ suEntity.setId(UUID.randomUUID().toString());
+ suEntity.setApp(serviceName);
+ suEntity.setSu(suName);
+ suRepo.save(suEntity);
+
+ if (log.isInfoEnabled())
+ log.info("Success to register su [{}]", suName);
+ }
}
}
}
@@ -228,13 +274,23 @@ public class DbHealthCheck {
OffsetDateTime dateTime = entity.getBeatTime();
if (dateTime == null) {
//脏数据
- suRepo.deleteByApp(entity.getAppName()); //此处是一重保险,避免非优雅停机导致数据未清理
- appRepo.deleteByAppName(entity.getAppName());
+ if (CafEnvironment.enablePrimaryDataService()) {
+ getSuRpcPersistence().deleteByApp(entity.getAppName());
+ getAppServerRpcPersistence().deleteByAppName(entity.getAppName());
+ } else {
+ suRepo.deleteByApp(entity.getAppName()); //此处是一重保险,避免非优雅停机导致数据未清理
+ appRepo.deleteByAppName(entity.getAppName());
+ }
} else {
long beat = Date.from(dateTime.toInstant()).getTime() / 1000;
if (current - beat > setting.getRemovePeriod()) {
- suRepo.deleteByApp(entity.getAppName()); //此处是一重保险,避免非优雅停机导致数据未清理
- appRepo.deleteByAppName(entity.getAppName());
+ if (CafEnvironment.enablePrimaryDataService()) {
+ getSuRpcPersistence().deleteByApp(entity.getAppName());
+ getAppServerRpcPersistence().deleteByAppName(entity.getAppName());
+ } else {
+ suRepo.deleteByApp(entity.getAppName()); //此处是一重保险,避免非优雅停机导致数据未清理
+ appRepo.deleteByAppName(entity.getAppName());
+ }
if (log.isInfoEnabled()) {
log.info("Remove [{}], URL [{}]", entity.getAppName(), entity.getAppUrl());
@@ -264,4 +320,17 @@ public class DbHealthCheck {
}
}
+ private GspAppServerRpcPersistence getAppServerRpcPersistence(){
+ if(this.appServerRpcPersistence==null){
+ this.appServerRpcPersistence = rpcClassHolder.getRpcClass("sys", GspAppServerRpcPersistence.class);
+ }
+ return this.appServerRpcPersistence;
+ }
+
+ private GspSuRpcPersistence getSuRpcPersistence(){
+ if(this.suRpcPersistence==null){
+ this.suRpcPersistence = rpcClassHolder.getRpcClass("sys", GspSuRpcPersistence.class);
+ }
+ return this.suRpcPersistence;
+ }
}
diff --git a/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/client/register/DbRegisterImpl.java b/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/client/register/DbRegisterImpl.java
index 86a843bf1..793828e88 100644
--- a/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/client/register/DbRegisterImpl.java
+++ b/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/client/register/DbRegisterImpl.java
@@ -28,11 +28,15 @@ import io.iec.edp.caf.msu.api.entity.MsuProperties;
import io.iec.edp.caf.msu.api.entity.ServiceUnitInfo;
import io.iec.edp.caf.msu.api.entity.ServiceUnitRegisterInfo;
import io.iec.edp.caf.msu.client.health.DbHealthCheck;
+import io.iec.edp.caf.msu.common.domain.converter.GspSuConverter;
import io.iec.edp.caf.msu.common.domain.entity.GspAppServerEntity;
import io.iec.edp.caf.msu.common.domain.entity.GspSuEntity;
+import io.iec.edp.caf.msu.common.domain.persistence.GspAppServerRpcPersistence;
+import io.iec.edp.caf.msu.common.domain.persistence.GspSuRpcPersistence;
import io.iec.edp.caf.msu.common.domain.repository.AppServerRepository;
import io.iec.edp.caf.msu.common.domain.repository.SuRepository;
import io.iec.edp.caf.msu.common.utils.NetUtil;
+import io.iec.edp.caf.rpc.client.RpcClassHolder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.env.Environment;
@@ -61,11 +65,18 @@ public class DbRegisterImpl implements ServiceRegistry {
private DbHealthCheck dbHealthCheck;
- public DbRegisterImpl(AppServerRepository appRepo, SuRepository suRepo, ServiceUnitAwareService suAware) {
+ private GspAppServerRpcPersistence appServerRpcPersistence;
+
+ private GspSuRpcPersistence suRpcPersistence;
+
+ private final RpcClassHolder rpcClassHolder;
+
+ public DbRegisterImpl(AppServerRepository appRepo, SuRepository suRepo, ServiceUnitAwareService suAware, RpcClassHolder rpcClassHolder) {
this.appRepo = appRepo;
this.suRepo = suRepo;
this.suAware = suAware;
this.enableSSL = enableSSL();
+ this.rpcClassHolder = rpcClassHolder;
}
@Override
@@ -86,9 +97,17 @@ public class DbRegisterImpl implements ServiceRegistry {
//依次注销su、实例信息
List suNames = this.suAware.getEnabledServiceUnits();
for (String suName : suNames) {
- this.suRepo.deleteByAppAndSu(appName, suName.toLowerCase());
+ if (CafEnvironment.enablePrimaryDataService()) {
+ getSuRpcPersistence().deleteByAppAndSu(appName, suName.toLowerCase());
+ } else {
+ this.suRepo.deleteByAppAndSu(appName, suName.toLowerCase());
+ }
+ }
+ if (CafEnvironment.enablePrimaryDataService()) {
+ getAppServerRpcPersistence().deleteByAppName(appName);
+ } else {
+ this.appRepo.deleteByAppName(appName);
}
- this.appRepo.deleteByAppName(appName);
log.info("ServiceCenter(DataBase) Unregister service [{}]", appName);
}
@@ -125,7 +144,7 @@ public class DbRegisterImpl implements ServiceRegistry {
if (this.dbHealthCheck != null) {
this.dbHealthCheck.stop();
}
- this.dbHealthCheck = new DbHealthCheck(appRepo, suRepo, appEntities, appSuInfoMap);
+ this.dbHealthCheck = new DbHealthCheck(appRepo, suRepo, appEntities, appSuInfoMap, rpcClassHolder);
this.dbHealthCheck.start();
}
@@ -143,8 +162,13 @@ public class DbRegisterImpl implements ServiceRegistry {
log.info("ServiceCenter(DataBase) Start to register su of service [{}]", appName);
//清理su信息、实例信息
- this.suRepo.deleteByApp(appName); //此处是一重保险,避免非优雅停机导致数据未清理
- this.appRepo.deleteByAppName(appName);
+ if (CafEnvironment.enablePrimaryDataService()) {
+ getSuRpcPersistence().deleteByApp(appName);
+ getAppServerRpcPersistence().deleteByAppName(appName);
+ } else {
+ this.suRepo.deleteByApp(appName); //此处是一重保险,避免非优雅停机导致数据未清理
+ this.appRepo.deleteByAppName(appName);
+ }
//注册实例信息、su信息
//注册实例信息
@@ -158,7 +182,12 @@ public class DbRegisterImpl implements ServiceRegistry {
appEntity.setAppName(appName);
appEntity.setAppUrl(url);
//appEntity.setHealthy(true); 暂不使用
- Date currentTimestamp = this.appRepo.getCurrentTimestamp();
+ Date currentTimestamp = new Date();
+ if (CafEnvironment.enablePrimaryDataService()) {
+ currentTimestamp = getAppServerRpcPersistence().getCurrentTimestamp();
+ } else {
+ currentTimestamp = this.appRepo.getCurrentTimestamp();
+ }
OffsetDateTime dateTime = null;
if (currentTimestamp != null) {
currentTimestamp.setTime(currentTimestamp.getTime());
@@ -166,19 +195,34 @@ public class DbRegisterImpl implements ServiceRegistry {
}
appEntity.setBeatTime(dateTime);
appEntity.setBasePath(CafEnvironment.getBaseUrlPath());
- this.appRepo.save(appEntity);
+ if (CafEnvironment.enablePrimaryDataService()) {
+ getAppServerRpcPersistence().save(appEntity);
+ } else {
+ this.appRepo.save(appEntity);
+ }
//注册su信息
List suInfos = registerInfo.getServiceUnitInfo();
for (ServiceUnitInfo su : suInfos) {
if (su != null) {
String suName = su.getName().toLowerCase();
- if (this.suRepo.countByAppAndSu(appName, suName) == 0) {
- GspSuEntity suEntity = new GspSuEntity();
- suEntity.setId(UUID.randomUUID().toString());
- suEntity.setApp(appName);
- suEntity.setSu(suName);
- this.suRepo.save(suEntity);
- log.info("ServiceCenter(DataBase) Success to register su [{}]", suName);
+ if (CafEnvironment.enablePrimaryDataService()) {
+ if (getSuRpcPersistence().countByAppAndSu(appName, suName) == 0) {
+ GspSuEntity suEntity = new GspSuEntity();
+ suEntity.setId(UUID.randomUUID().toString());
+ suEntity.setApp(appName);
+ suEntity.setSu(suName);
+ getSuRpcPersistence().save(suEntity);
+ log.info("ServiceCenter(DataBase) Success to register su [{}]", suName);
+ }
+ } else {
+ if (this.suRepo.countByAppAndSu(appName, suName) == 0) {
+ GspSuEntity suEntity = new GspSuEntity();
+ suEntity.setId(UUID.randomUUID().toString());
+ suEntity.setApp(appName);
+ suEntity.setSu(suName);
+ this.suRepo.save(suEntity);
+ log.info("ServiceCenter(DataBase) Success to register su [{}]", suName);
+ }
}
}
}
@@ -237,4 +281,18 @@ public class DbRegisterImpl implements ServiceRegistry {
}
return addressFormat;
}
+
+ private GspAppServerRpcPersistence getAppServerRpcPersistence(){
+ if(this.appServerRpcPersistence==null){
+ this.appServerRpcPersistence = rpcClassHolder.getRpcClass("sys", GspAppServerRpcPersistence.class);
+ }
+ return this.appServerRpcPersistence;
+ }
+
+ private GspSuRpcPersistence getSuRpcPersistence(){
+ if(this.suRpcPersistence==null){
+ this.suRpcPersistence = rpcClassHolder.getRpcClass("sys", GspSuRpcPersistence.class);
+ }
+ return this.suRpcPersistence;
+ }
}
diff --git a/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/common/DBMsuServiceImpl.java b/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/common/DBMsuServiceImpl.java
index 940c5e4a5..31dd96a0e 100644
--- a/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/common/DBMsuServiceImpl.java
+++ b/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/common/DBMsuServiceImpl.java
@@ -1,5 +1,6 @@
package io.iec.edp.caf.msu.common;
+import io.iec.edp.caf.commons.runtime.CafEnvironment;
import io.iec.edp.caf.datasource.CAFDataSourceSelector;
import io.iec.edp.caf.msu.api.ServiceUnitAwareService;
import io.iec.edp.caf.msu.api.client.ServiceDiscovery;
@@ -8,8 +9,11 @@ import io.iec.edp.caf.msu.api.entity.GspSuInstance;
import io.iec.edp.caf.msu.common.domain.converter.GspAppConverter;
import io.iec.edp.caf.msu.common.domain.converter.GspSuConverter;
import io.iec.edp.caf.msu.common.domain.entity.GspAppServerEntity;
+import io.iec.edp.caf.msu.common.domain.persistence.GspAppServerRpcPersistence;
+import io.iec.edp.caf.msu.common.domain.persistence.GspSuRpcPersistence;
import io.iec.edp.caf.msu.common.domain.repository.AppServerRepository;
import io.iec.edp.caf.msu.common.domain.repository.SuRepository;
+import io.iec.edp.caf.rpc.client.RpcClassHolder;
import java.util.ArrayList;
import java.util.List;
@@ -25,11 +29,18 @@ public class DBMsuServiceImpl extends MsuServiceImpl {
private SuRepository suRepo;
- public DBMsuServiceImpl(ServiceUnitAwareService suAware, ServiceDiscovery serviceDiscovery, AppServerRepository appRepo, SuRepository suRepo) {
+ private GspAppServerRpcPersistence appServerRpcPersistence;
+
+ private GspSuRpcPersistence suRpcPersistence;
+
+ private final RpcClassHolder rpcClassHolder;
+
+ public DBMsuServiceImpl(ServiceUnitAwareService suAware, ServiceDiscovery serviceDiscovery, AppServerRepository appRepo, SuRepository suRepo, RpcClassHolder rpcClassHolder) {
super(suAware, serviceDiscovery);
this.appRepo = appRepo;
this.suRepo = suRepo;
+ this.rpcClassHolder = rpcClassHolder;
}
//根据Su名称获取appServer信息
@@ -37,9 +48,17 @@ public class DBMsuServiceImpl extends MsuServiceImpl {
try {
CAFDataSourceSelector.selectMaster();
- List apps = this.appRepo.findBySuName(suName.toLowerCase());
- if (apps == null || apps.size() == 0) {
- apps = this.appRepo.findBySuName(suName);
+ List apps = new ArrayList<>();
+ if (CafEnvironment.enablePrimaryDataService()) {
+ apps = getAppServerRpcPersistence().findBySuName(suName.toLowerCase());
+ if (apps == null || apps.size() == 0) {
+ apps = getAppServerRpcPersistence().findBySuName(suName);
+ }
+ } else {
+ apps = this.appRepo.findBySuName(suName.toLowerCase());
+ if (apps == null || apps.size() == 0) {
+ apps = this.appRepo.findBySuName(suName);
+ }
}
List instances = new ArrayList<>();
@@ -60,10 +79,27 @@ public class DBMsuServiceImpl extends MsuServiceImpl {
try {
CAFDataSourceSelector.selectMaster();
- return GspSuConverter.convertToInstance(this.suRepo.findById(suName).get());
+ if (CafEnvironment.enablePrimaryDataService()) {
+ return GspSuConverter.convertToInstance(getSuRpcPersistence().findById(suName));
+ } else {
+ return GspSuConverter.convertToInstance(this.suRepo.findById(suName).get());
+ }
} finally {
CAFDataSourceSelector.reset();
}
}
+ private GspAppServerRpcPersistence getAppServerRpcPersistence(){
+ if(this.appServerRpcPersistence==null){
+ this.appServerRpcPersistence = rpcClassHolder.getRpcClass("sys", GspAppServerRpcPersistence.class);
+ }
+ return this.appServerRpcPersistence;
+ }
+
+ private GspSuRpcPersistence getSuRpcPersistence(){
+ if(this.suRpcPersistence==null){
+ this.suRpcPersistence = rpcClassHolder.getRpcClass("sys", GspSuRpcPersistence.class);
+ }
+ return this.suRpcPersistence;
+ }
}
diff --git a/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/common/domain/persistence/GspAppServerRpcPersistence.java b/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/common/domain/persistence/GspAppServerRpcPersistence.java
new file mode 100644
index 000000000..7c7afcb5c
--- /dev/null
+++ b/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/common/domain/persistence/GspAppServerRpcPersistence.java
@@ -0,0 +1,22 @@
+package io.iec.edp.caf.msu.common.domain.persistence;
+
+import io.iec.edp.caf.msu.common.domain.entity.GspAppServerEntity;
+import io.iec.edp.caf.rpc.api.annotation.GspServiceBundle;
+import io.iec.edp.caf.rpc.api.annotation.RpcParam;
+
+import java.util.Date;
+import java.util.List;
+
+@GspServiceBundle(applicationName = "runtime", serviceUnitName= "sys")
+public interface GspAppServerRpcPersistence {
+
+ List findBySuName(@RpcParam(paramName = "suName") String suName);
+
+ void deleteByAppName(@RpcParam(paramName = "appName") String appName);
+
+ Date getCurrentTimestamp();
+
+ void save(@RpcParam(paramName = "appEntity") GspAppServerEntity appEntity);
+
+ List findAll();
+}
diff --git a/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/common/domain/persistence/GspSuRpcPersistence.java b/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/common/domain/persistence/GspSuRpcPersistence.java
new file mode 100644
index 000000000..0cb5c397c
--- /dev/null
+++ b/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/common/domain/persistence/GspSuRpcPersistence.java
@@ -0,0 +1,23 @@
+package io.iec.edp.caf.msu.common.domain.persistence;
+
+import io.iec.edp.caf.msu.common.domain.entity.GspSuEntity;
+import io.iec.edp.caf.rpc.api.annotation.GspServiceBundle;
+import io.iec.edp.caf.rpc.api.annotation.RpcParam;
+
+import java.util.List;
+
+@GspServiceBundle(applicationName = "runtime", serviceUnitName= "sys")
+public interface GspSuRpcPersistence {
+
+ GspSuEntity findById(@RpcParam(paramName = "suName") String suName);
+
+ void deleteByAppAndSu(@RpcParam(paramName = "appName") String appName, @RpcParam(paramName = "suName") String suName);
+
+ void deleteByApp(@RpcParam(paramName = "appName") String appName);
+
+ int countByAppAndSu(@RpcParam(paramName = "appName") String appName, @RpcParam(paramName = "suName") String suName);
+
+ void save(@RpcParam(paramName = "entity") GspSuEntity entity);
+
+ List findAll();
+}
--
Gitee
From 573f59daf3c7693793e3dec5769549fcb3f8acc8 Mon Sep 17 00:00:00 2001
From: hanshuai03 <14693422+hanshuai03@user.noreply.gitee.com>
Date: Mon, 22 Dec 2025 14:19:35 +0800
Subject: [PATCH 2/2] =?UTF-8?q?msu=E4=B8=BB=E5=BA=93=E6=9C=8D=E5=8A=A1?=
=?UTF-8?q?=E5=8C=96=E6=94=B9=E9=80=A0?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
caf-msu/caf-msu-database/pom.xml | 4 -
.../msu/client/discovery/DbDiscoveryImpl.java | 62 +++-------
.../caf/msu/client/health/DbHealthCheck.java | 109 ++++--------------
.../msu/client/register/DbRegisterImpl.java | 91 +++------------
.../edp/caf/msu/common/DBMsuServiceImpl.java | 51 ++------
.../GspAppServerRpcPersistence.java | 22 ----
.../persistence/GspSuRpcPersistence.java | 23 ----
7 files changed, 64 insertions(+), 298 deletions(-)
delete mode 100644 caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/common/domain/persistence/GspAppServerRpcPersistence.java
delete mode 100644 caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/common/domain/persistence/GspSuRpcPersistence.java
diff --git a/caf-msu/caf-msu-database/pom.xml b/caf-msu/caf-msu-database/pom.xml
index c90ee1756..787d3a3e6 100644
--- a/caf-msu/caf-msu-database/pom.xml
+++ b/caf-msu/caf-msu-database/pom.xml
@@ -27,10 +27,6 @@
io.iec.edp
caf-data-jpa
-
- io.iec.edp
- caf-rpc-client
-
diff --git a/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/client/discovery/DbDiscoveryImpl.java b/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/client/discovery/DbDiscoveryImpl.java
index f970beb52..8bfcea009 100644
--- a/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/client/discovery/DbDiscoveryImpl.java
+++ b/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/client/discovery/DbDiscoveryImpl.java
@@ -27,11 +27,8 @@ import io.iec.edp.caf.msu.client.health.DbHealthSetting;
import io.iec.edp.caf.msu.common.domain.converter.GspAppConverter;
import io.iec.edp.caf.msu.common.domain.entity.GspAppServerEntity;
import io.iec.edp.caf.msu.common.domain.entity.GspSuEntity;
-import io.iec.edp.caf.msu.common.domain.persistence.GspAppServerRpcPersistence;
-import io.iec.edp.caf.msu.common.domain.persistence.GspSuRpcPersistence;
import io.iec.edp.caf.msu.common.domain.repository.AppServerRepository;
import io.iec.edp.caf.msu.common.domain.repository.SuRepository;
-import io.iec.edp.caf.rpc.client.RpcClassHolder;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
@@ -51,16 +48,9 @@ public class DbDiscoveryImpl implements ServiceDiscovery {
private AppServerRepository appRepo;
private SuRepository suRepo;
- private GspAppServerRpcPersistence appServerRpcPersistence;
-
- private GspSuRpcPersistence suRpcPersistence;
-
- private final RpcClassHolder rpcClassHolder;
-
- public DbDiscoveryImpl(AppServerRepository appRepo, SuRepository suRepo, RpcClassHolder rpcClassHolder) {
+ public DbDiscoveryImpl(AppServerRepository appRepo, SuRepository suRepo) {
this.appRepo = appRepo;
this.suRepo = suRepo;
- this.rpcClassHolder = rpcClassHolder;
}
@Override
@@ -102,6 +92,9 @@ public class DbDiscoveryImpl implements ServiceDiscovery {
*/
@Override
public List getEnabledServiceUnitInfo() {
+ if (CafEnvironment.enablePrimaryDataService()) {
+ throw new RuntimeException("The Primary-Data-Service mode does not support using the database as a service registry. Please switch to Nacos or other alternatives.");
+ }
List infos = null;
try {
@@ -109,12 +102,7 @@ public class DbDiscoveryImpl implements ServiceDiscovery {
CAFDataSourceSelector.selectMaster();
//读取GSPSuInstance
- List allSuEntity = new ArrayList<>();
- if (CafEnvironment.enablePrimaryDataService()) {
- allSuEntity = getSuRpcPersistence().findAll();
- } else {
- allSuEntity = this.suRepo.findAll();
- }
+ List allSuEntity = this.suRepo.findAll();
if (allSuEntity.size() > 0) {
infos = new ArrayList<>();
for (GspSuEntity entity : allSuEntity) {
@@ -141,23 +129,18 @@ public class DbDiscoveryImpl implements ServiceDiscovery {
* @return 所有实例
*/
private List findAppsBySu(String su) {
+ if (CafEnvironment.enablePrimaryDataService()) {
+ throw new RuntimeException("The Primary-Data-Service mode does not support using the database as a service registry. Please switch to Nacos or other alternatives.");
+ }
List apps = new ArrayList<>();
try {
//设置主库
CAFDataSourceSelector.selectMaster();
- List entitys = new ArrayList<>();
- if (CafEnvironment.enablePrimaryDataService()) {
- entitys = getAppServerRpcPersistence().findBySuName(su.toLowerCase());
- if (entitys == null || entitys.size() == 0) {
- entitys = getAppServerRpcPersistence().findBySuName(su);
- }
- } else {
- entitys = this.appRepo.findBySuName(su.toLowerCase());
- if (entitys == null || entitys.size() == 0) {
- entitys = this.appRepo.findBySuName(su);
- }
+ List entitys = this.appRepo.findBySuName(su.toLowerCase());
+ if (entitys == null || entitys.size() == 0) {
+ entitys = this.appRepo.findBySuName(su);
}
for (GspAppServerEntity entity : entitys) {
@@ -189,17 +172,11 @@ public class DbDiscoveryImpl implements ServiceDiscovery {
if (apps == null || apps.size() == 0)
return null;
- Date currentTimestamp = new Date();
- if (CafEnvironment.enablePrimaryDataService()) {
- currentTimestamp = getAppServerRpcPersistence().getCurrentTimestamp();
- } else {
- currentTimestamp = this.appRepo.getCurrentTimestamp();
- }
+ Date currentTimestamp = this.appRepo.getCurrentTimestamp();
int beatPeriod = SpringBeanUtils.getBean(DbHealthSetting.class).getBeatPeriod();
List latest = new ArrayList<>();
- Date finalCurrentTimestamp = currentTimestamp;
apps.forEach(app -> {
- if ((finalCurrentTimestamp.getTime() - Date.from(app.getBeatTime().toInstant()).getTime()) / 1000 <= beatPeriod) {
+ if ((currentTimestamp.getTime() - Date.from(app.getBeatTime().toInstant()).getTime()) / 1000 <= beatPeriod) {
latest.add(app);
}
});
@@ -214,17 +191,4 @@ public class DbDiscoveryImpl implements ServiceDiscovery {
return apps.get(index);
}
- private GspAppServerRpcPersistence getAppServerRpcPersistence(){
- if(this.appServerRpcPersistence==null){
- this.appServerRpcPersistence = rpcClassHolder.getRpcClass("sys", GspAppServerRpcPersistence.class);
- }
- return this.appServerRpcPersistence;
- }
-
- private GspSuRpcPersistence getSuRpcPersistence(){
- if(this.suRpcPersistence==null){
- this.suRpcPersistence = rpcClassHolder.getRpcClass("sys", GspSuRpcPersistence.class);
- }
- return this.suRpcPersistence;
- }
}
diff --git a/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/client/health/DbHealthCheck.java b/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/client/health/DbHealthCheck.java
index c62d5b089..7a4fbd968 100644
--- a/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/client/health/DbHealthCheck.java
+++ b/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/client/health/DbHealthCheck.java
@@ -26,11 +26,8 @@ import io.iec.edp.caf.lock.service.api.api.DistributedLockFactory;
import io.iec.edp.caf.msu.api.entity.ServiceUnitInfo;
import io.iec.edp.caf.msu.common.domain.entity.GspAppServerEntity;
import io.iec.edp.caf.msu.common.domain.entity.GspSuEntity;
-import io.iec.edp.caf.msu.common.domain.persistence.GspAppServerRpcPersistence;
-import io.iec.edp.caf.msu.common.domain.persistence.GspSuRpcPersistence;
import io.iec.edp.caf.msu.common.domain.repository.AppServerRepository;
import io.iec.edp.caf.msu.common.domain.repository.SuRepository;
-import io.iec.edp.caf.rpc.client.RpcClassHolder;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
@@ -68,13 +65,7 @@ public class DbHealthCheck {
//任务
private ScheduledExecutorService executorService;
- private GspAppServerRpcPersistence appServerRpcPersistence;
-
- private GspSuRpcPersistence suRpcPersistence;
-
- private final RpcClassHolder rpcClassHolder;
-
- public DbHealthCheck(AppServerRepository appRepo, SuRepository suRepo, List appEntities, Map> maps, RpcClassHolder rpcClassHolder) {
+ public DbHealthCheck(AppServerRepository appRepo, SuRepository suRepo, List appEntities, Map> maps) {
this.appRepo = appRepo;
this.suRepo = suRepo;
this.instances = appEntities;
@@ -86,7 +77,6 @@ public class DbHealthCheck {
thread.setName("DbHealthCheck");
return thread;
});
- this.rpcClassHolder = rpcClassHolder;
}
//检测
@@ -120,6 +110,9 @@ public class DbHealthCheck {
// 健康检测
private void doCheck() {
+ if (CafEnvironment.enablePrimaryDataService()) {
+ throw new RuntimeException("The Primary-Data-Service mode does not support using the database as a service registry. Please switch to Nacos or other alternatives.");
+ }
try {
//设置主库
CAFDataSourceSelector.selectMaster();
@@ -157,12 +150,7 @@ public class DbHealthCheck {
*/
private void doHealthyCheck() {
//数据库时间
- Date currentTimestamp = new Date();
- if (CafEnvironment.enablePrimaryDataService()) {
- currentTimestamp = getAppServerRpcPersistence().getCurrentTimestamp();
- } else {
- currentTimestamp = appRepo.getCurrentTimestamp();
- }
+ Date currentTimestamp = appRepo.getCurrentTimestamp();
if (currentTimestamp == null) {
currentTimestamp = new Date();
} else {
@@ -171,12 +159,7 @@ public class DbHealthCheck {
//数据库时间转换为OffsetDateTime
OffsetDateTime beatTime = currentTimestamp.toInstant().atZone(ZoneOffset.systemDefault()).toOffsetDateTime();
//所有app实例
- List allInstances = new ArrayList<>();
- if (CafEnvironment.enablePrimaryDataService()) {
- allInstances = getAppServerRpcPersistence().findAll();
- } else {
- allInstances = appRepo.findAll();
- }
+ List allInstances = appRepo.findAll();
//遍历实例内部的实例
for (GspAppServerEntity inner : DbHealthCheck.this.instances) {
@@ -194,11 +177,7 @@ public class DbHealthCheck {
//存在
appInstance.setBeatTime(beatTime);
//appServerEntity.setHealthy(true); 暂不使用
- if (CafEnvironment.enablePrimaryDataService()) {
- getAppServerRpcPersistence().save(appInstance);
- } else {
- appRepo.save(appInstance);
- }
+ appRepo.save(appInstance);
if (log.isInfoEnabled()) log.info("[{}] beats", serviceName);
@@ -206,21 +185,13 @@ public class DbHealthCheck {
allInstances.remove(appInstance);
} else {
//清理su信息、实例信息
- if (CafEnvironment.enablePrimaryDataService()) {
- getSuRpcPersistence().deleteByApp(serviceName);
- } else {
- suRepo.deleteByApp(serviceName); //此处是一重保险,避免非优雅停机导致数据未清理
- }
+ suRepo.deleteByApp(serviceName); //此处是一重保险,避免非优雅停机导致数据未清理
//注册实例信息、su信息
//注册实例信息
inner.setBeatTime(beatTime);
//appEntity.setHealthy(true); 暂不使用
- if (CafEnvironment.enablePrimaryDataService()) {
- getAppServerRpcPersistence().save(inner);
- } else {
- appRepo.save(inner);
- }
+ appRepo.save(inner);
//注册su信息
List suInfos = DbHealthCheck.this.maps.get(serviceName);
if (suInfos != null && suInfos.size() > 0) {
@@ -230,28 +201,15 @@ public class DbHealthCheck {
for (ServiceUnitInfo su : suInfos) {
if (su != null) {
String suName = su.getName().toLowerCase();
- if (CafEnvironment.enablePrimaryDataService()) {
- if (getSuRpcPersistence().countByAppAndSu(serviceName, suName) == 0) {
- GspSuEntity suEntity = new GspSuEntity();
- suEntity.setId(UUID.randomUUID().toString());
- suEntity.setApp(serviceName);
- suEntity.setSu(suName);
- getSuRpcPersistence().save(suEntity);
-
- if (log.isInfoEnabled())
- log.info("Success to register su [{}]", suName);
- }
- } else {
- if (suRepo.countByAppAndSu(serviceName, suName) == 0) {
- GspSuEntity suEntity = new GspSuEntity();
- suEntity.setId(UUID.randomUUID().toString());
- suEntity.setApp(serviceName);
- suEntity.setSu(suName);
- suRepo.save(suEntity);
-
- if (log.isInfoEnabled())
- log.info("Success to register su [{}]", suName);
- }
+ if (suRepo.countByAppAndSu(serviceName, suName) == 0) {
+ GspSuEntity suEntity = new GspSuEntity();
+ suEntity.setId(UUID.randomUUID().toString());
+ suEntity.setApp(serviceName);
+ suEntity.setSu(suName);
+ suRepo.save(suEntity);
+
+ if (log.isInfoEnabled())
+ log.info("Success to register su [{}]", suName);
}
}
}
@@ -274,23 +232,13 @@ public class DbHealthCheck {
OffsetDateTime dateTime = entity.getBeatTime();
if (dateTime == null) {
//脏数据
- if (CafEnvironment.enablePrimaryDataService()) {
- getSuRpcPersistence().deleteByApp(entity.getAppName());
- getAppServerRpcPersistence().deleteByAppName(entity.getAppName());
- } else {
- suRepo.deleteByApp(entity.getAppName()); //此处是一重保险,避免非优雅停机导致数据未清理
- appRepo.deleteByAppName(entity.getAppName());
- }
+ suRepo.deleteByApp(entity.getAppName()); //此处是一重保险,避免非优雅停机导致数据未清理
+ appRepo.deleteByAppName(entity.getAppName());
} else {
long beat = Date.from(dateTime.toInstant()).getTime() / 1000;
if (current - beat > setting.getRemovePeriod()) {
- if (CafEnvironment.enablePrimaryDataService()) {
- getSuRpcPersistence().deleteByApp(entity.getAppName());
- getAppServerRpcPersistence().deleteByAppName(entity.getAppName());
- } else {
- suRepo.deleteByApp(entity.getAppName()); //此处是一重保险,避免非优雅停机导致数据未清理
- appRepo.deleteByAppName(entity.getAppName());
- }
+ suRepo.deleteByApp(entity.getAppName()); //此处是一重保险,避免非优雅停机导致数据未清理
+ appRepo.deleteByAppName(entity.getAppName());
if (log.isInfoEnabled()) {
log.info("Remove [{}], URL [{}]", entity.getAppName(), entity.getAppUrl());
@@ -320,17 +268,4 @@ public class DbHealthCheck {
}
}
- private GspAppServerRpcPersistence getAppServerRpcPersistence(){
- if(this.appServerRpcPersistence==null){
- this.appServerRpcPersistence = rpcClassHolder.getRpcClass("sys", GspAppServerRpcPersistence.class);
- }
- return this.appServerRpcPersistence;
- }
-
- private GspSuRpcPersistence getSuRpcPersistence(){
- if(this.suRpcPersistence==null){
- this.suRpcPersistence = rpcClassHolder.getRpcClass("sys", GspSuRpcPersistence.class);
- }
- return this.suRpcPersistence;
- }
}
diff --git a/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/client/register/DbRegisterImpl.java b/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/client/register/DbRegisterImpl.java
index 793828e88..238c789fd 100644
--- a/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/client/register/DbRegisterImpl.java
+++ b/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/client/register/DbRegisterImpl.java
@@ -28,15 +28,11 @@ import io.iec.edp.caf.msu.api.entity.MsuProperties;
import io.iec.edp.caf.msu.api.entity.ServiceUnitInfo;
import io.iec.edp.caf.msu.api.entity.ServiceUnitRegisterInfo;
import io.iec.edp.caf.msu.client.health.DbHealthCheck;
-import io.iec.edp.caf.msu.common.domain.converter.GspSuConverter;
import io.iec.edp.caf.msu.common.domain.entity.GspAppServerEntity;
import io.iec.edp.caf.msu.common.domain.entity.GspSuEntity;
-import io.iec.edp.caf.msu.common.domain.persistence.GspAppServerRpcPersistence;
-import io.iec.edp.caf.msu.common.domain.persistence.GspSuRpcPersistence;
import io.iec.edp.caf.msu.common.domain.repository.AppServerRepository;
import io.iec.edp.caf.msu.common.domain.repository.SuRepository;
import io.iec.edp.caf.msu.common.utils.NetUtil;
-import io.iec.edp.caf.rpc.client.RpcClassHolder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.env.Environment;
@@ -65,18 +61,11 @@ public class DbRegisterImpl implements ServiceRegistry {
private DbHealthCheck dbHealthCheck;
- private GspAppServerRpcPersistence appServerRpcPersistence;
-
- private GspSuRpcPersistence suRpcPersistence;
-
- private final RpcClassHolder rpcClassHolder;
-
- public DbRegisterImpl(AppServerRepository appRepo, SuRepository suRepo, ServiceUnitAwareService suAware, RpcClassHolder rpcClassHolder) {
+ public DbRegisterImpl(AppServerRepository appRepo, SuRepository suRepo, ServiceUnitAwareService suAware) {
this.appRepo = appRepo;
this.suRepo = suRepo;
this.suAware = suAware;
this.enableSSL = enableSSL();
- this.rpcClassHolder = rpcClassHolder;
}
@Override
@@ -97,17 +86,9 @@ public class DbRegisterImpl implements ServiceRegistry {
//依次注销su、实例信息
List suNames = this.suAware.getEnabledServiceUnits();
for (String suName : suNames) {
- if (CafEnvironment.enablePrimaryDataService()) {
- getSuRpcPersistence().deleteByAppAndSu(appName, suName.toLowerCase());
- } else {
- this.suRepo.deleteByAppAndSu(appName, suName.toLowerCase());
- }
- }
- if (CafEnvironment.enablePrimaryDataService()) {
- getAppServerRpcPersistence().deleteByAppName(appName);
- } else {
- this.appRepo.deleteByAppName(appName);
+ this.suRepo.deleteByAppAndSu(appName, suName.toLowerCase());
}
+ this.appRepo.deleteByAppName(appName);
log.info("ServiceCenter(DataBase) Unregister service [{}]", appName);
}
@@ -118,6 +99,9 @@ public class DbRegisterImpl implements ServiceRegistry {
@Override
public Boolean register(ServiceUnitRegisterInfo registerInfo, String ip, Integer port) {
+ if (CafEnvironment.enablePrimaryDataService()) {
+ throw new RuntimeException("The Primary-Data-Service mode does not support using the database as a service registry. Please switch to Nacos or other alternatives.");
+ }
try {
//设置主库
CAFDataSourceSelector.selectMaster();
@@ -144,7 +128,7 @@ public class DbRegisterImpl implements ServiceRegistry {
if (this.dbHealthCheck != null) {
this.dbHealthCheck.stop();
}
- this.dbHealthCheck = new DbHealthCheck(appRepo, suRepo, appEntities, appSuInfoMap, rpcClassHolder);
+ this.dbHealthCheck = new DbHealthCheck(appRepo, suRepo, appEntities, appSuInfoMap);
this.dbHealthCheck.start();
}
@@ -162,13 +146,8 @@ public class DbRegisterImpl implements ServiceRegistry {
log.info("ServiceCenter(DataBase) Start to register su of service [{}]", appName);
//清理su信息、实例信息
- if (CafEnvironment.enablePrimaryDataService()) {
- getSuRpcPersistence().deleteByApp(appName);
- getAppServerRpcPersistence().deleteByAppName(appName);
- } else {
- this.suRepo.deleteByApp(appName); //此处是一重保险,避免非优雅停机导致数据未清理
- this.appRepo.deleteByAppName(appName);
- }
+ this.suRepo.deleteByApp(appName); //此处是一重保险,避免非优雅停机导致数据未清理
+ this.appRepo.deleteByAppName(appName);
//注册实例信息、su信息
//注册实例信息
@@ -182,12 +161,7 @@ public class DbRegisterImpl implements ServiceRegistry {
appEntity.setAppName(appName);
appEntity.setAppUrl(url);
//appEntity.setHealthy(true); 暂不使用
- Date currentTimestamp = new Date();
- if (CafEnvironment.enablePrimaryDataService()) {
- currentTimestamp = getAppServerRpcPersistence().getCurrentTimestamp();
- } else {
- currentTimestamp = this.appRepo.getCurrentTimestamp();
- }
+ Date currentTimestamp = this.appRepo.getCurrentTimestamp();
OffsetDateTime dateTime = null;
if (currentTimestamp != null) {
currentTimestamp.setTime(currentTimestamp.getTime());
@@ -195,34 +169,19 @@ public class DbRegisterImpl implements ServiceRegistry {
}
appEntity.setBeatTime(dateTime);
appEntity.setBasePath(CafEnvironment.getBaseUrlPath());
- if (CafEnvironment.enablePrimaryDataService()) {
- getAppServerRpcPersistence().save(appEntity);
- } else {
- this.appRepo.save(appEntity);
- }
+ this.appRepo.save(appEntity);
//注册su信息
List suInfos = registerInfo.getServiceUnitInfo();
for (ServiceUnitInfo su : suInfos) {
if (su != null) {
String suName = su.getName().toLowerCase();
- if (CafEnvironment.enablePrimaryDataService()) {
- if (getSuRpcPersistence().countByAppAndSu(appName, suName) == 0) {
- GspSuEntity suEntity = new GspSuEntity();
- suEntity.setId(UUID.randomUUID().toString());
- suEntity.setApp(appName);
- suEntity.setSu(suName);
- getSuRpcPersistence().save(suEntity);
- log.info("ServiceCenter(DataBase) Success to register su [{}]", suName);
- }
- } else {
- if (this.suRepo.countByAppAndSu(appName, suName) == 0) {
- GspSuEntity suEntity = new GspSuEntity();
- suEntity.setId(UUID.randomUUID().toString());
- suEntity.setApp(appName);
- suEntity.setSu(suName);
- this.suRepo.save(suEntity);
- log.info("ServiceCenter(DataBase) Success to register su [{}]", suName);
- }
+ if (this.suRepo.countByAppAndSu(appName, suName) == 0) {
+ GspSuEntity suEntity = new GspSuEntity();
+ suEntity.setId(UUID.randomUUID().toString());
+ suEntity.setApp(appName);
+ suEntity.setSu(suName);
+ this.suRepo.save(suEntity);
+ log.info("ServiceCenter(DataBase) Success to register su [{}]", suName);
}
}
}
@@ -281,18 +240,4 @@ public class DbRegisterImpl implements ServiceRegistry {
}
return addressFormat;
}
-
- private GspAppServerRpcPersistence getAppServerRpcPersistence(){
- if(this.appServerRpcPersistence==null){
- this.appServerRpcPersistence = rpcClassHolder.getRpcClass("sys", GspAppServerRpcPersistence.class);
- }
- return this.appServerRpcPersistence;
- }
-
- private GspSuRpcPersistence getSuRpcPersistence(){
- if(this.suRpcPersistence==null){
- this.suRpcPersistence = rpcClassHolder.getRpcClass("sys", GspSuRpcPersistence.class);
- }
- return this.suRpcPersistence;
- }
}
diff --git a/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/common/DBMsuServiceImpl.java b/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/common/DBMsuServiceImpl.java
index 31dd96a0e..59dd3c29f 100644
--- a/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/common/DBMsuServiceImpl.java
+++ b/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/common/DBMsuServiceImpl.java
@@ -9,11 +9,8 @@ import io.iec.edp.caf.msu.api.entity.GspSuInstance;
import io.iec.edp.caf.msu.common.domain.converter.GspAppConverter;
import io.iec.edp.caf.msu.common.domain.converter.GspSuConverter;
import io.iec.edp.caf.msu.common.domain.entity.GspAppServerEntity;
-import io.iec.edp.caf.msu.common.domain.persistence.GspAppServerRpcPersistence;
-import io.iec.edp.caf.msu.common.domain.persistence.GspSuRpcPersistence;
import io.iec.edp.caf.msu.common.domain.repository.AppServerRepository;
import io.iec.edp.caf.msu.common.domain.repository.SuRepository;
-import io.iec.edp.caf.rpc.client.RpcClassHolder;
import java.util.ArrayList;
import java.util.List;
@@ -29,36 +26,24 @@ public class DBMsuServiceImpl extends MsuServiceImpl {
private SuRepository suRepo;
- private GspAppServerRpcPersistence appServerRpcPersistence;
-
- private GspSuRpcPersistence suRpcPersistence;
-
- private final RpcClassHolder rpcClassHolder;
-
- public DBMsuServiceImpl(ServiceUnitAwareService suAware, ServiceDiscovery serviceDiscovery, AppServerRepository appRepo, SuRepository suRepo, RpcClassHolder rpcClassHolder) {
+ public DBMsuServiceImpl(ServiceUnitAwareService suAware, ServiceDiscovery serviceDiscovery, AppServerRepository appRepo, SuRepository suRepo) {
super(suAware, serviceDiscovery);
this.appRepo = appRepo;
this.suRepo = suRepo;
- this.rpcClassHolder = rpcClassHolder;
}
//根据Su名称获取appServer信息
public List getGspAppServerInfoBySu(String suName) {
+ if (CafEnvironment.enablePrimaryDataService()) {
+ throw new RuntimeException("The Primary-Data-Service mode does not support using the database as a service registry. Please switch to Nacos or other alternatives.");
+ }
try {
CAFDataSourceSelector.selectMaster();
- List apps = new ArrayList<>();
- if (CafEnvironment.enablePrimaryDataService()) {
- apps = getAppServerRpcPersistence().findBySuName(suName.toLowerCase());
- if (apps == null || apps.size() == 0) {
- apps = getAppServerRpcPersistence().findBySuName(suName);
- }
- } else {
- apps = this.appRepo.findBySuName(suName.toLowerCase());
- if (apps == null || apps.size() == 0) {
- apps = this.appRepo.findBySuName(suName);
- }
+ List apps = this.appRepo.findBySuName(suName.toLowerCase());
+ if (apps == null || apps.size() == 0) {
+ apps = this.appRepo.findBySuName(suName);
}
List instances = new ArrayList<>();
@@ -76,30 +61,16 @@ public class DBMsuServiceImpl extends MsuServiceImpl {
//获取一个Su的信息
public GspSuInstance getGspSuInfo(String suName) {
+ if (CafEnvironment.enablePrimaryDataService()) {
+ throw new RuntimeException("The Primary-Data-Service mode does not support using the database as a service registry. Please switch to Nacos or other alternatives.");
+ }
try {
CAFDataSourceSelector.selectMaster();
- if (CafEnvironment.enablePrimaryDataService()) {
- return GspSuConverter.convertToInstance(getSuRpcPersistence().findById(suName));
- } else {
- return GspSuConverter.convertToInstance(this.suRepo.findById(suName).get());
- }
+ return GspSuConverter.convertToInstance(this.suRepo.findById(suName).get());
} finally {
CAFDataSourceSelector.reset();
}
}
- private GspAppServerRpcPersistence getAppServerRpcPersistence(){
- if(this.appServerRpcPersistence==null){
- this.appServerRpcPersistence = rpcClassHolder.getRpcClass("sys", GspAppServerRpcPersistence.class);
- }
- return this.appServerRpcPersistence;
- }
-
- private GspSuRpcPersistence getSuRpcPersistence(){
- if(this.suRpcPersistence==null){
- this.suRpcPersistence = rpcClassHolder.getRpcClass("sys", GspSuRpcPersistence.class);
- }
- return this.suRpcPersistence;
- }
}
diff --git a/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/common/domain/persistence/GspAppServerRpcPersistence.java b/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/common/domain/persistence/GspAppServerRpcPersistence.java
deleted file mode 100644
index 7c7afcb5c..000000000
--- a/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/common/domain/persistence/GspAppServerRpcPersistence.java
+++ /dev/null
@@ -1,22 +0,0 @@
-package io.iec.edp.caf.msu.common.domain.persistence;
-
-import io.iec.edp.caf.msu.common.domain.entity.GspAppServerEntity;
-import io.iec.edp.caf.rpc.api.annotation.GspServiceBundle;
-import io.iec.edp.caf.rpc.api.annotation.RpcParam;
-
-import java.util.Date;
-import java.util.List;
-
-@GspServiceBundle(applicationName = "runtime", serviceUnitName= "sys")
-public interface GspAppServerRpcPersistence {
-
- List findBySuName(@RpcParam(paramName = "suName") String suName);
-
- void deleteByAppName(@RpcParam(paramName = "appName") String appName);
-
- Date getCurrentTimestamp();
-
- void save(@RpcParam(paramName = "appEntity") GspAppServerEntity appEntity);
-
- List findAll();
-}
diff --git a/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/common/domain/persistence/GspSuRpcPersistence.java b/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/common/domain/persistence/GspSuRpcPersistence.java
deleted file mode 100644
index 0cb5c397c..000000000
--- a/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/common/domain/persistence/GspSuRpcPersistence.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package io.iec.edp.caf.msu.common.domain.persistence;
-
-import io.iec.edp.caf.msu.common.domain.entity.GspSuEntity;
-import io.iec.edp.caf.rpc.api.annotation.GspServiceBundle;
-import io.iec.edp.caf.rpc.api.annotation.RpcParam;
-
-import java.util.List;
-
-@GspServiceBundle(applicationName = "runtime", serviceUnitName= "sys")
-public interface GspSuRpcPersistence {
-
- GspSuEntity findById(@RpcParam(paramName = "suName") String suName);
-
- void deleteByAppAndSu(@RpcParam(paramName = "appName") String appName, @RpcParam(paramName = "suName") String suName);
-
- void deleteByApp(@RpcParam(paramName = "appName") String appName);
-
- int countByAppAndSu(@RpcParam(paramName = "appName") String appName, @RpcParam(paramName = "suName") String suName);
-
- void save(@RpcParam(paramName = "entity") GspSuEntity entity);
-
- List findAll();
-}
--
Gitee