From 237a370f01b7307f7586d3a9ab4b237f9893c69b Mon Sep 17 00:00:00 2001 From: hanshuai03 <14693422+hanshuai03@user.noreply.gitee.com> Date: Tue, 16 Dec 2025 17:27:35 +0800 Subject: [PATCH 1/2] =?UTF-8?q?msu=E4=B8=BB=E5=BA=93=E6=9C=8D=E5=8A=A1?= =?UTF-8?q?=E5=8C=96=E6=94=B9=E9=80=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- caf-msu/caf-msu-database/pom.xml | 4 + .../msu/client/discovery/DbDiscoveryImpl.java | 57 ++++++++-- .../caf/msu/client/health/DbHealthCheck.java | 107 ++++++++++++++---- .../msu/client/register/DbRegisterImpl.java | 88 +++++++++++--- .../edp/caf/msu/common/DBMsuServiceImpl.java | 46 +++++++- .../GspAppServerRpcPersistence.java | 22 ++++ .../persistence/GspSuRpcPersistence.java | 23 ++++ 7 files changed, 301 insertions(+), 46 deletions(-) create mode 100644 caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/common/domain/persistence/GspAppServerRpcPersistence.java create mode 100644 caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/common/domain/persistence/GspSuRpcPersistence.java diff --git a/caf-msu/caf-msu-database/pom.xml b/caf-msu/caf-msu-database/pom.xml index 787d3a3e6..c90ee1756 100644 --- a/caf-msu/caf-msu-database/pom.xml +++ b/caf-msu/caf-msu-database/pom.xml @@ -27,6 +27,10 @@ io.iec.edp caf-data-jpa + + io.iec.edp + caf-rpc-client + diff --git a/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/client/discovery/DbDiscoveryImpl.java b/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/client/discovery/DbDiscoveryImpl.java index 4b2cd6af9..f970beb52 100644 --- a/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/client/discovery/DbDiscoveryImpl.java +++ b/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/client/discovery/DbDiscoveryImpl.java @@ -16,6 +16,7 @@ package io.iec.edp.caf.msu.client.discovery; +import io.iec.edp.caf.commons.runtime.CafEnvironment; import io.iec.edp.caf.commons.utils.SpringBeanUtils; import io.iec.edp.caf.datasource.CAFDataSourceSelector; import io.iec.edp.caf.msu.api.client.ServiceDiscovery; @@ -26,8 +27,11 @@ import io.iec.edp.caf.msu.client.health.DbHealthSetting; import io.iec.edp.caf.msu.common.domain.converter.GspAppConverter; import io.iec.edp.caf.msu.common.domain.entity.GspAppServerEntity; import io.iec.edp.caf.msu.common.domain.entity.GspSuEntity; +import io.iec.edp.caf.msu.common.domain.persistence.GspAppServerRpcPersistence; +import io.iec.edp.caf.msu.common.domain.persistence.GspSuRpcPersistence; import io.iec.edp.caf.msu.common.domain.repository.AppServerRepository; import io.iec.edp.caf.msu.common.domain.repository.SuRepository; +import io.iec.edp.caf.rpc.client.RpcClassHolder; import lombok.extern.slf4j.Slf4j; import java.util.ArrayList; @@ -47,9 +51,16 @@ public class DbDiscoveryImpl implements ServiceDiscovery { private AppServerRepository appRepo; private SuRepository suRepo; - public DbDiscoveryImpl(AppServerRepository appRepo, SuRepository suRepo) { + private GspAppServerRpcPersistence appServerRpcPersistence; + + private GspSuRpcPersistence suRpcPersistence; + + private final RpcClassHolder rpcClassHolder; + + public DbDiscoveryImpl(AppServerRepository appRepo, SuRepository suRepo, RpcClassHolder rpcClassHolder) { this.appRepo = appRepo; this.suRepo = suRepo; + this.rpcClassHolder = rpcClassHolder; } @Override @@ -98,7 +109,12 @@ public class DbDiscoveryImpl implements ServiceDiscovery { CAFDataSourceSelector.selectMaster(); //读取GSPSuInstance - List allSuEntity = this.suRepo.findAll(); + List allSuEntity = new ArrayList<>(); + if (CafEnvironment.enablePrimaryDataService()) { + allSuEntity = getSuRpcPersistence().findAll(); + } else { + allSuEntity = this.suRepo.findAll(); + } if (allSuEntity.size() > 0) { infos = new ArrayList<>(); for (GspSuEntity entity : allSuEntity) { @@ -131,9 +147,17 @@ public class DbDiscoveryImpl implements ServiceDiscovery { //设置主库 CAFDataSourceSelector.selectMaster(); - List entitys = this.appRepo.findBySuName(su.toLowerCase()); - if (entitys == null || entitys.size() == 0) { - entitys = this.appRepo.findBySuName(su); + List entitys = new ArrayList<>(); + if (CafEnvironment.enablePrimaryDataService()) { + entitys = getAppServerRpcPersistence().findBySuName(su.toLowerCase()); + if (entitys == null || entitys.size() == 0) { + entitys = getAppServerRpcPersistence().findBySuName(su); + } + } else { + entitys = this.appRepo.findBySuName(su.toLowerCase()); + if (entitys == null || entitys.size() == 0) { + entitys = this.appRepo.findBySuName(su); + } } for (GspAppServerEntity entity : entitys) { @@ -165,11 +189,17 @@ public class DbDiscoveryImpl implements ServiceDiscovery { if (apps == null || apps.size() == 0) return null; - Date currentTimestamp = this.appRepo.getCurrentTimestamp(); + Date currentTimestamp = new Date(); + if (CafEnvironment.enablePrimaryDataService()) { + currentTimestamp = getAppServerRpcPersistence().getCurrentTimestamp(); + } else { + currentTimestamp = this.appRepo.getCurrentTimestamp(); + } int beatPeriod = SpringBeanUtils.getBean(DbHealthSetting.class).getBeatPeriod(); List latest = new ArrayList<>(); + Date finalCurrentTimestamp = currentTimestamp; apps.forEach(app -> { - if ((currentTimestamp.getTime() - Date.from(app.getBeatTime().toInstant()).getTime()) / 1000 <= beatPeriod) { + if ((finalCurrentTimestamp.getTime() - Date.from(app.getBeatTime().toInstant()).getTime()) / 1000 <= beatPeriod) { latest.add(app); } }); @@ -184,4 +214,17 @@ public class DbDiscoveryImpl implements ServiceDiscovery { return apps.get(index); } + private GspAppServerRpcPersistence getAppServerRpcPersistence(){ + if(this.appServerRpcPersistence==null){ + this.appServerRpcPersistence = rpcClassHolder.getRpcClass("sys", GspAppServerRpcPersistence.class); + } + return this.appServerRpcPersistence; + } + + private GspSuRpcPersistence getSuRpcPersistence(){ + if(this.suRpcPersistence==null){ + this.suRpcPersistence = rpcClassHolder.getRpcClass("sys", GspSuRpcPersistence.class); + } + return this.suRpcPersistence; + } } diff --git a/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/client/health/DbHealthCheck.java b/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/client/health/DbHealthCheck.java index f4a26c586..c62d5b089 100644 --- a/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/client/health/DbHealthCheck.java +++ b/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/client/health/DbHealthCheck.java @@ -16,6 +16,7 @@ package io.iec.edp.caf.msu.client.health; +import io.iec.edp.caf.commons.runtime.CafEnvironment; import io.iec.edp.caf.commons.transaction.JpaTransaction; import io.iec.edp.caf.commons.transaction.TransactionPropagation; import io.iec.edp.caf.commons.utils.SpringBeanUtils; @@ -25,8 +26,11 @@ import io.iec.edp.caf.lock.service.api.api.DistributedLockFactory; import io.iec.edp.caf.msu.api.entity.ServiceUnitInfo; import io.iec.edp.caf.msu.common.domain.entity.GspAppServerEntity; import io.iec.edp.caf.msu.common.domain.entity.GspSuEntity; +import io.iec.edp.caf.msu.common.domain.persistence.GspAppServerRpcPersistence; +import io.iec.edp.caf.msu.common.domain.persistence.GspSuRpcPersistence; import io.iec.edp.caf.msu.common.domain.repository.AppServerRepository; import io.iec.edp.caf.msu.common.domain.repository.SuRepository; +import io.iec.edp.caf.rpc.client.RpcClassHolder; import lombok.extern.slf4j.Slf4j; import java.io.IOException; @@ -64,7 +68,13 @@ public class DbHealthCheck { //任务 private ScheduledExecutorService executorService; - public DbHealthCheck(AppServerRepository appRepo, SuRepository suRepo, List appEntities, Map> maps) { + private GspAppServerRpcPersistence appServerRpcPersistence; + + private GspSuRpcPersistence suRpcPersistence; + + private final RpcClassHolder rpcClassHolder; + + public DbHealthCheck(AppServerRepository appRepo, SuRepository suRepo, List appEntities, Map> maps, RpcClassHolder rpcClassHolder) { this.appRepo = appRepo; this.suRepo = suRepo; this.instances = appEntities; @@ -76,6 +86,7 @@ public class DbHealthCheck { thread.setName("DbHealthCheck"); return thread; }); + this.rpcClassHolder = rpcClassHolder; } //检测 @@ -146,7 +157,12 @@ public class DbHealthCheck { */ private void doHealthyCheck() { //数据库时间 - Date currentTimestamp = appRepo.getCurrentTimestamp(); + Date currentTimestamp = new Date(); + if (CafEnvironment.enablePrimaryDataService()) { + currentTimestamp = getAppServerRpcPersistence().getCurrentTimestamp(); + } else { + currentTimestamp = appRepo.getCurrentTimestamp(); + } if (currentTimestamp == null) { currentTimestamp = new Date(); } else { @@ -155,7 +171,12 @@ public class DbHealthCheck { //数据库时间转换为OffsetDateTime OffsetDateTime beatTime = currentTimestamp.toInstant().atZone(ZoneOffset.systemDefault()).toOffsetDateTime(); //所有app实例 - List allInstances = appRepo.findAll(); + List allInstances = new ArrayList<>(); + if (CafEnvironment.enablePrimaryDataService()) { + allInstances = getAppServerRpcPersistence().findAll(); + } else { + allInstances = appRepo.findAll(); + } //遍历实例内部的实例 for (GspAppServerEntity inner : DbHealthCheck.this.instances) { @@ -173,7 +194,11 @@ public class DbHealthCheck { //存在 appInstance.setBeatTime(beatTime); //appServerEntity.setHealthy(true); 暂不使用 - appRepo.save(appInstance); + if (CafEnvironment.enablePrimaryDataService()) { + getAppServerRpcPersistence().save(appInstance); + } else { + appRepo.save(appInstance); + } if (log.isInfoEnabled()) log.info("[{}] beats", serviceName); @@ -181,13 +206,21 @@ public class DbHealthCheck { allInstances.remove(appInstance); } else { //清理su信息、实例信息 - suRepo.deleteByApp(serviceName); //此处是一重保险,避免非优雅停机导致数据未清理 + if (CafEnvironment.enablePrimaryDataService()) { + getSuRpcPersistence().deleteByApp(serviceName); + } else { + suRepo.deleteByApp(serviceName); //此处是一重保险,避免非优雅停机导致数据未清理 + } //注册实例信息、su信息 //注册实例信息 inner.setBeatTime(beatTime); //appEntity.setHealthy(true); 暂不使用 - appRepo.save(inner); + if (CafEnvironment.enablePrimaryDataService()) { + getAppServerRpcPersistence().save(inner); + } else { + appRepo.save(inner); + } //注册su信息 List suInfos = DbHealthCheck.this.maps.get(serviceName); if (suInfos != null && suInfos.size() > 0) { @@ -197,15 +230,28 @@ public class DbHealthCheck { for (ServiceUnitInfo su : suInfos) { if (su != null) { String suName = su.getName().toLowerCase(); - if (suRepo.countByAppAndSu(serviceName, suName) == 0) { - GspSuEntity suEntity = new GspSuEntity(); - suEntity.setId(UUID.randomUUID().toString()); - suEntity.setApp(serviceName); - suEntity.setSu(suName); - suRepo.save(suEntity); - - if (log.isInfoEnabled()) - log.info("Success to register su [{}]", suName); + if (CafEnvironment.enablePrimaryDataService()) { + if (getSuRpcPersistence().countByAppAndSu(serviceName, suName) == 0) { + GspSuEntity suEntity = new GspSuEntity(); + suEntity.setId(UUID.randomUUID().toString()); + suEntity.setApp(serviceName); + suEntity.setSu(suName); + getSuRpcPersistence().save(suEntity); + + if (log.isInfoEnabled()) + log.info("Success to register su [{}]", suName); + } + } else { + if (suRepo.countByAppAndSu(serviceName, suName) == 0) { + GspSuEntity suEntity = new GspSuEntity(); + suEntity.setId(UUID.randomUUID().toString()); + suEntity.setApp(serviceName); + suEntity.setSu(suName); + suRepo.save(suEntity); + + if (log.isInfoEnabled()) + log.info("Success to register su [{}]", suName); + } } } } @@ -228,13 +274,23 @@ public class DbHealthCheck { OffsetDateTime dateTime = entity.getBeatTime(); if (dateTime == null) { //脏数据 - suRepo.deleteByApp(entity.getAppName()); //此处是一重保险,避免非优雅停机导致数据未清理 - appRepo.deleteByAppName(entity.getAppName()); + if (CafEnvironment.enablePrimaryDataService()) { + getSuRpcPersistence().deleteByApp(entity.getAppName()); + getAppServerRpcPersistence().deleteByAppName(entity.getAppName()); + } else { + suRepo.deleteByApp(entity.getAppName()); //此处是一重保险,避免非优雅停机导致数据未清理 + appRepo.deleteByAppName(entity.getAppName()); + } } else { long beat = Date.from(dateTime.toInstant()).getTime() / 1000; if (current - beat > setting.getRemovePeriod()) { - suRepo.deleteByApp(entity.getAppName()); //此处是一重保险,避免非优雅停机导致数据未清理 - appRepo.deleteByAppName(entity.getAppName()); + if (CafEnvironment.enablePrimaryDataService()) { + getSuRpcPersistence().deleteByApp(entity.getAppName()); + getAppServerRpcPersistence().deleteByAppName(entity.getAppName()); + } else { + suRepo.deleteByApp(entity.getAppName()); //此处是一重保险,避免非优雅停机导致数据未清理 + appRepo.deleteByAppName(entity.getAppName()); + } if (log.isInfoEnabled()) { log.info("Remove [{}], URL [{}]", entity.getAppName(), entity.getAppUrl()); @@ -264,4 +320,17 @@ public class DbHealthCheck { } } + private GspAppServerRpcPersistence getAppServerRpcPersistence(){ + if(this.appServerRpcPersistence==null){ + this.appServerRpcPersistence = rpcClassHolder.getRpcClass("sys", GspAppServerRpcPersistence.class); + } + return this.appServerRpcPersistence; + } + + private GspSuRpcPersistence getSuRpcPersistence(){ + if(this.suRpcPersistence==null){ + this.suRpcPersistence = rpcClassHolder.getRpcClass("sys", GspSuRpcPersistence.class); + } + return this.suRpcPersistence; + } } diff --git a/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/client/register/DbRegisterImpl.java b/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/client/register/DbRegisterImpl.java index 86a843bf1..793828e88 100644 --- a/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/client/register/DbRegisterImpl.java +++ b/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/client/register/DbRegisterImpl.java @@ -28,11 +28,15 @@ import io.iec.edp.caf.msu.api.entity.MsuProperties; import io.iec.edp.caf.msu.api.entity.ServiceUnitInfo; import io.iec.edp.caf.msu.api.entity.ServiceUnitRegisterInfo; import io.iec.edp.caf.msu.client.health.DbHealthCheck; +import io.iec.edp.caf.msu.common.domain.converter.GspSuConverter; import io.iec.edp.caf.msu.common.domain.entity.GspAppServerEntity; import io.iec.edp.caf.msu.common.domain.entity.GspSuEntity; +import io.iec.edp.caf.msu.common.domain.persistence.GspAppServerRpcPersistence; +import io.iec.edp.caf.msu.common.domain.persistence.GspSuRpcPersistence; import io.iec.edp.caf.msu.common.domain.repository.AppServerRepository; import io.iec.edp.caf.msu.common.domain.repository.SuRepository; import io.iec.edp.caf.msu.common.utils.NetUtil; +import io.iec.edp.caf.rpc.client.RpcClassHolder; import lombok.extern.slf4j.Slf4j; import org.springframework.core.env.Environment; @@ -61,11 +65,18 @@ public class DbRegisterImpl implements ServiceRegistry { private DbHealthCheck dbHealthCheck; - public DbRegisterImpl(AppServerRepository appRepo, SuRepository suRepo, ServiceUnitAwareService suAware) { + private GspAppServerRpcPersistence appServerRpcPersistence; + + private GspSuRpcPersistence suRpcPersistence; + + private final RpcClassHolder rpcClassHolder; + + public DbRegisterImpl(AppServerRepository appRepo, SuRepository suRepo, ServiceUnitAwareService suAware, RpcClassHolder rpcClassHolder) { this.appRepo = appRepo; this.suRepo = suRepo; this.suAware = suAware; this.enableSSL = enableSSL(); + this.rpcClassHolder = rpcClassHolder; } @Override @@ -86,9 +97,17 @@ public class DbRegisterImpl implements ServiceRegistry { //依次注销su、实例信息 List suNames = this.suAware.getEnabledServiceUnits(); for (String suName : suNames) { - this.suRepo.deleteByAppAndSu(appName, suName.toLowerCase()); + if (CafEnvironment.enablePrimaryDataService()) { + getSuRpcPersistence().deleteByAppAndSu(appName, suName.toLowerCase()); + } else { + this.suRepo.deleteByAppAndSu(appName, suName.toLowerCase()); + } + } + if (CafEnvironment.enablePrimaryDataService()) { + getAppServerRpcPersistence().deleteByAppName(appName); + } else { + this.appRepo.deleteByAppName(appName); } - this.appRepo.deleteByAppName(appName); log.info("ServiceCenter(DataBase) Unregister service [{}]", appName); } @@ -125,7 +144,7 @@ public class DbRegisterImpl implements ServiceRegistry { if (this.dbHealthCheck != null) { this.dbHealthCheck.stop(); } - this.dbHealthCheck = new DbHealthCheck(appRepo, suRepo, appEntities, appSuInfoMap); + this.dbHealthCheck = new DbHealthCheck(appRepo, suRepo, appEntities, appSuInfoMap, rpcClassHolder); this.dbHealthCheck.start(); } @@ -143,8 +162,13 @@ public class DbRegisterImpl implements ServiceRegistry { log.info("ServiceCenter(DataBase) Start to register su of service [{}]", appName); //清理su信息、实例信息 - this.suRepo.deleteByApp(appName); //此处是一重保险,避免非优雅停机导致数据未清理 - this.appRepo.deleteByAppName(appName); + if (CafEnvironment.enablePrimaryDataService()) { + getSuRpcPersistence().deleteByApp(appName); + getAppServerRpcPersistence().deleteByAppName(appName); + } else { + this.suRepo.deleteByApp(appName); //此处是一重保险,避免非优雅停机导致数据未清理 + this.appRepo.deleteByAppName(appName); + } //注册实例信息、su信息 //注册实例信息 @@ -158,7 +182,12 @@ public class DbRegisterImpl implements ServiceRegistry { appEntity.setAppName(appName); appEntity.setAppUrl(url); //appEntity.setHealthy(true); 暂不使用 - Date currentTimestamp = this.appRepo.getCurrentTimestamp(); + Date currentTimestamp = new Date(); + if (CafEnvironment.enablePrimaryDataService()) { + currentTimestamp = getAppServerRpcPersistence().getCurrentTimestamp(); + } else { + currentTimestamp = this.appRepo.getCurrentTimestamp(); + } OffsetDateTime dateTime = null; if (currentTimestamp != null) { currentTimestamp.setTime(currentTimestamp.getTime()); @@ -166,19 +195,34 @@ public class DbRegisterImpl implements ServiceRegistry { } appEntity.setBeatTime(dateTime); appEntity.setBasePath(CafEnvironment.getBaseUrlPath()); - this.appRepo.save(appEntity); + if (CafEnvironment.enablePrimaryDataService()) { + getAppServerRpcPersistence().save(appEntity); + } else { + this.appRepo.save(appEntity); + } //注册su信息 List suInfos = registerInfo.getServiceUnitInfo(); for (ServiceUnitInfo su : suInfos) { if (su != null) { String suName = su.getName().toLowerCase(); - if (this.suRepo.countByAppAndSu(appName, suName) == 0) { - GspSuEntity suEntity = new GspSuEntity(); - suEntity.setId(UUID.randomUUID().toString()); - suEntity.setApp(appName); - suEntity.setSu(suName); - this.suRepo.save(suEntity); - log.info("ServiceCenter(DataBase) Success to register su [{}]", suName); + if (CafEnvironment.enablePrimaryDataService()) { + if (getSuRpcPersistence().countByAppAndSu(appName, suName) == 0) { + GspSuEntity suEntity = new GspSuEntity(); + suEntity.setId(UUID.randomUUID().toString()); + suEntity.setApp(appName); + suEntity.setSu(suName); + getSuRpcPersistence().save(suEntity); + log.info("ServiceCenter(DataBase) Success to register su [{}]", suName); + } + } else { + if (this.suRepo.countByAppAndSu(appName, suName) == 0) { + GspSuEntity suEntity = new GspSuEntity(); + suEntity.setId(UUID.randomUUID().toString()); + suEntity.setApp(appName); + suEntity.setSu(suName); + this.suRepo.save(suEntity); + log.info("ServiceCenter(DataBase) Success to register su [{}]", suName); + } } } } @@ -237,4 +281,18 @@ public class DbRegisterImpl implements ServiceRegistry { } return addressFormat; } + + private GspAppServerRpcPersistence getAppServerRpcPersistence(){ + if(this.appServerRpcPersistence==null){ + this.appServerRpcPersistence = rpcClassHolder.getRpcClass("sys", GspAppServerRpcPersistence.class); + } + return this.appServerRpcPersistence; + } + + private GspSuRpcPersistence getSuRpcPersistence(){ + if(this.suRpcPersistence==null){ + this.suRpcPersistence = rpcClassHolder.getRpcClass("sys", GspSuRpcPersistence.class); + } + return this.suRpcPersistence; + } } diff --git a/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/common/DBMsuServiceImpl.java b/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/common/DBMsuServiceImpl.java index 940c5e4a5..31dd96a0e 100644 --- a/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/common/DBMsuServiceImpl.java +++ b/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/common/DBMsuServiceImpl.java @@ -1,5 +1,6 @@ package io.iec.edp.caf.msu.common; +import io.iec.edp.caf.commons.runtime.CafEnvironment; import io.iec.edp.caf.datasource.CAFDataSourceSelector; import io.iec.edp.caf.msu.api.ServiceUnitAwareService; import io.iec.edp.caf.msu.api.client.ServiceDiscovery; @@ -8,8 +9,11 @@ import io.iec.edp.caf.msu.api.entity.GspSuInstance; import io.iec.edp.caf.msu.common.domain.converter.GspAppConverter; import io.iec.edp.caf.msu.common.domain.converter.GspSuConverter; import io.iec.edp.caf.msu.common.domain.entity.GspAppServerEntity; +import io.iec.edp.caf.msu.common.domain.persistence.GspAppServerRpcPersistence; +import io.iec.edp.caf.msu.common.domain.persistence.GspSuRpcPersistence; import io.iec.edp.caf.msu.common.domain.repository.AppServerRepository; import io.iec.edp.caf.msu.common.domain.repository.SuRepository; +import io.iec.edp.caf.rpc.client.RpcClassHolder; import java.util.ArrayList; import java.util.List; @@ -25,11 +29,18 @@ public class DBMsuServiceImpl extends MsuServiceImpl { private SuRepository suRepo; - public DBMsuServiceImpl(ServiceUnitAwareService suAware, ServiceDiscovery serviceDiscovery, AppServerRepository appRepo, SuRepository suRepo) { + private GspAppServerRpcPersistence appServerRpcPersistence; + + private GspSuRpcPersistence suRpcPersistence; + + private final RpcClassHolder rpcClassHolder; + + public DBMsuServiceImpl(ServiceUnitAwareService suAware, ServiceDiscovery serviceDiscovery, AppServerRepository appRepo, SuRepository suRepo, RpcClassHolder rpcClassHolder) { super(suAware, serviceDiscovery); this.appRepo = appRepo; this.suRepo = suRepo; + this.rpcClassHolder = rpcClassHolder; } //根据Su名称获取appServer信息 @@ -37,9 +48,17 @@ public class DBMsuServiceImpl extends MsuServiceImpl { try { CAFDataSourceSelector.selectMaster(); - List apps = this.appRepo.findBySuName(suName.toLowerCase()); - if (apps == null || apps.size() == 0) { - apps = this.appRepo.findBySuName(suName); + List apps = new ArrayList<>(); + if (CafEnvironment.enablePrimaryDataService()) { + apps = getAppServerRpcPersistence().findBySuName(suName.toLowerCase()); + if (apps == null || apps.size() == 0) { + apps = getAppServerRpcPersistence().findBySuName(suName); + } + } else { + apps = this.appRepo.findBySuName(suName.toLowerCase()); + if (apps == null || apps.size() == 0) { + apps = this.appRepo.findBySuName(suName); + } } List instances = new ArrayList<>(); @@ -60,10 +79,27 @@ public class DBMsuServiceImpl extends MsuServiceImpl { try { CAFDataSourceSelector.selectMaster(); - return GspSuConverter.convertToInstance(this.suRepo.findById(suName).get()); + if (CafEnvironment.enablePrimaryDataService()) { + return GspSuConverter.convertToInstance(getSuRpcPersistence().findById(suName)); + } else { + return GspSuConverter.convertToInstance(this.suRepo.findById(suName).get()); + } } finally { CAFDataSourceSelector.reset(); } } + private GspAppServerRpcPersistence getAppServerRpcPersistence(){ + if(this.appServerRpcPersistence==null){ + this.appServerRpcPersistence = rpcClassHolder.getRpcClass("sys", GspAppServerRpcPersistence.class); + } + return this.appServerRpcPersistence; + } + + private GspSuRpcPersistence getSuRpcPersistence(){ + if(this.suRpcPersistence==null){ + this.suRpcPersistence = rpcClassHolder.getRpcClass("sys", GspSuRpcPersistence.class); + } + return this.suRpcPersistence; + } } diff --git a/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/common/domain/persistence/GspAppServerRpcPersistence.java b/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/common/domain/persistence/GspAppServerRpcPersistence.java new file mode 100644 index 000000000..7c7afcb5c --- /dev/null +++ b/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/common/domain/persistence/GspAppServerRpcPersistence.java @@ -0,0 +1,22 @@ +package io.iec.edp.caf.msu.common.domain.persistence; + +import io.iec.edp.caf.msu.common.domain.entity.GspAppServerEntity; +import io.iec.edp.caf.rpc.api.annotation.GspServiceBundle; +import io.iec.edp.caf.rpc.api.annotation.RpcParam; + +import java.util.Date; +import java.util.List; + +@GspServiceBundle(applicationName = "runtime", serviceUnitName= "sys") +public interface GspAppServerRpcPersistence { + + List findBySuName(@RpcParam(paramName = "suName") String suName); + + void deleteByAppName(@RpcParam(paramName = "appName") String appName); + + Date getCurrentTimestamp(); + + void save(@RpcParam(paramName = "appEntity") GspAppServerEntity appEntity); + + List findAll(); +} diff --git a/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/common/domain/persistence/GspSuRpcPersistence.java b/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/common/domain/persistence/GspSuRpcPersistence.java new file mode 100644 index 000000000..0cb5c397c --- /dev/null +++ b/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/common/domain/persistence/GspSuRpcPersistence.java @@ -0,0 +1,23 @@ +package io.iec.edp.caf.msu.common.domain.persistence; + +import io.iec.edp.caf.msu.common.domain.entity.GspSuEntity; +import io.iec.edp.caf.rpc.api.annotation.GspServiceBundle; +import io.iec.edp.caf.rpc.api.annotation.RpcParam; + +import java.util.List; + +@GspServiceBundle(applicationName = "runtime", serviceUnitName= "sys") +public interface GspSuRpcPersistence { + + GspSuEntity findById(@RpcParam(paramName = "suName") String suName); + + void deleteByAppAndSu(@RpcParam(paramName = "appName") String appName, @RpcParam(paramName = "suName") String suName); + + void deleteByApp(@RpcParam(paramName = "appName") String appName); + + int countByAppAndSu(@RpcParam(paramName = "appName") String appName, @RpcParam(paramName = "suName") String suName); + + void save(@RpcParam(paramName = "entity") GspSuEntity entity); + + List findAll(); +} -- Gitee From 573f59daf3c7693793e3dec5769549fcb3f8acc8 Mon Sep 17 00:00:00 2001 From: hanshuai03 <14693422+hanshuai03@user.noreply.gitee.com> Date: Mon, 22 Dec 2025 14:19:35 +0800 Subject: [PATCH 2/2] =?UTF-8?q?msu=E4=B8=BB=E5=BA=93=E6=9C=8D=E5=8A=A1?= =?UTF-8?q?=E5=8C=96=E6=94=B9=E9=80=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- caf-msu/caf-msu-database/pom.xml | 4 - .../msu/client/discovery/DbDiscoveryImpl.java | 62 +++------- .../caf/msu/client/health/DbHealthCheck.java | 109 ++++-------------- .../msu/client/register/DbRegisterImpl.java | 91 +++------------ .../edp/caf/msu/common/DBMsuServiceImpl.java | 51 ++------ .../GspAppServerRpcPersistence.java | 22 ---- .../persistence/GspSuRpcPersistence.java | 23 ---- 7 files changed, 64 insertions(+), 298 deletions(-) delete mode 100644 caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/common/domain/persistence/GspAppServerRpcPersistence.java delete mode 100644 caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/common/domain/persistence/GspSuRpcPersistence.java diff --git a/caf-msu/caf-msu-database/pom.xml b/caf-msu/caf-msu-database/pom.xml index c90ee1756..787d3a3e6 100644 --- a/caf-msu/caf-msu-database/pom.xml +++ b/caf-msu/caf-msu-database/pom.xml @@ -27,10 +27,6 @@ io.iec.edp caf-data-jpa - - io.iec.edp - caf-rpc-client - diff --git a/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/client/discovery/DbDiscoveryImpl.java b/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/client/discovery/DbDiscoveryImpl.java index f970beb52..8bfcea009 100644 --- a/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/client/discovery/DbDiscoveryImpl.java +++ b/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/client/discovery/DbDiscoveryImpl.java @@ -27,11 +27,8 @@ import io.iec.edp.caf.msu.client.health.DbHealthSetting; import io.iec.edp.caf.msu.common.domain.converter.GspAppConverter; import io.iec.edp.caf.msu.common.domain.entity.GspAppServerEntity; import io.iec.edp.caf.msu.common.domain.entity.GspSuEntity; -import io.iec.edp.caf.msu.common.domain.persistence.GspAppServerRpcPersistence; -import io.iec.edp.caf.msu.common.domain.persistence.GspSuRpcPersistence; import io.iec.edp.caf.msu.common.domain.repository.AppServerRepository; import io.iec.edp.caf.msu.common.domain.repository.SuRepository; -import io.iec.edp.caf.rpc.client.RpcClassHolder; import lombok.extern.slf4j.Slf4j; import java.util.ArrayList; @@ -51,16 +48,9 @@ public class DbDiscoveryImpl implements ServiceDiscovery { private AppServerRepository appRepo; private SuRepository suRepo; - private GspAppServerRpcPersistence appServerRpcPersistence; - - private GspSuRpcPersistence suRpcPersistence; - - private final RpcClassHolder rpcClassHolder; - - public DbDiscoveryImpl(AppServerRepository appRepo, SuRepository suRepo, RpcClassHolder rpcClassHolder) { + public DbDiscoveryImpl(AppServerRepository appRepo, SuRepository suRepo) { this.appRepo = appRepo; this.suRepo = suRepo; - this.rpcClassHolder = rpcClassHolder; } @Override @@ -102,6 +92,9 @@ public class DbDiscoveryImpl implements ServiceDiscovery { */ @Override public List getEnabledServiceUnitInfo() { + if (CafEnvironment.enablePrimaryDataService()) { + throw new RuntimeException("The Primary-Data-Service mode does not support using the database as a service registry. Please switch to Nacos or other alternatives."); + } List infos = null; try { @@ -109,12 +102,7 @@ public class DbDiscoveryImpl implements ServiceDiscovery { CAFDataSourceSelector.selectMaster(); //读取GSPSuInstance - List allSuEntity = new ArrayList<>(); - if (CafEnvironment.enablePrimaryDataService()) { - allSuEntity = getSuRpcPersistence().findAll(); - } else { - allSuEntity = this.suRepo.findAll(); - } + List allSuEntity = this.suRepo.findAll(); if (allSuEntity.size() > 0) { infos = new ArrayList<>(); for (GspSuEntity entity : allSuEntity) { @@ -141,23 +129,18 @@ public class DbDiscoveryImpl implements ServiceDiscovery { * @return 所有实例 */ private List findAppsBySu(String su) { + if (CafEnvironment.enablePrimaryDataService()) { + throw new RuntimeException("The Primary-Data-Service mode does not support using the database as a service registry. Please switch to Nacos or other alternatives."); + } List apps = new ArrayList<>(); try { //设置主库 CAFDataSourceSelector.selectMaster(); - List entitys = new ArrayList<>(); - if (CafEnvironment.enablePrimaryDataService()) { - entitys = getAppServerRpcPersistence().findBySuName(su.toLowerCase()); - if (entitys == null || entitys.size() == 0) { - entitys = getAppServerRpcPersistence().findBySuName(su); - } - } else { - entitys = this.appRepo.findBySuName(su.toLowerCase()); - if (entitys == null || entitys.size() == 0) { - entitys = this.appRepo.findBySuName(su); - } + List entitys = this.appRepo.findBySuName(su.toLowerCase()); + if (entitys == null || entitys.size() == 0) { + entitys = this.appRepo.findBySuName(su); } for (GspAppServerEntity entity : entitys) { @@ -189,17 +172,11 @@ public class DbDiscoveryImpl implements ServiceDiscovery { if (apps == null || apps.size() == 0) return null; - Date currentTimestamp = new Date(); - if (CafEnvironment.enablePrimaryDataService()) { - currentTimestamp = getAppServerRpcPersistence().getCurrentTimestamp(); - } else { - currentTimestamp = this.appRepo.getCurrentTimestamp(); - } + Date currentTimestamp = this.appRepo.getCurrentTimestamp(); int beatPeriod = SpringBeanUtils.getBean(DbHealthSetting.class).getBeatPeriod(); List latest = new ArrayList<>(); - Date finalCurrentTimestamp = currentTimestamp; apps.forEach(app -> { - if ((finalCurrentTimestamp.getTime() - Date.from(app.getBeatTime().toInstant()).getTime()) / 1000 <= beatPeriod) { + if ((currentTimestamp.getTime() - Date.from(app.getBeatTime().toInstant()).getTime()) / 1000 <= beatPeriod) { latest.add(app); } }); @@ -214,17 +191,4 @@ public class DbDiscoveryImpl implements ServiceDiscovery { return apps.get(index); } - private GspAppServerRpcPersistence getAppServerRpcPersistence(){ - if(this.appServerRpcPersistence==null){ - this.appServerRpcPersistence = rpcClassHolder.getRpcClass("sys", GspAppServerRpcPersistence.class); - } - return this.appServerRpcPersistence; - } - - private GspSuRpcPersistence getSuRpcPersistence(){ - if(this.suRpcPersistence==null){ - this.suRpcPersistence = rpcClassHolder.getRpcClass("sys", GspSuRpcPersistence.class); - } - return this.suRpcPersistence; - } } diff --git a/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/client/health/DbHealthCheck.java b/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/client/health/DbHealthCheck.java index c62d5b089..7a4fbd968 100644 --- a/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/client/health/DbHealthCheck.java +++ b/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/client/health/DbHealthCheck.java @@ -26,11 +26,8 @@ import io.iec.edp.caf.lock.service.api.api.DistributedLockFactory; import io.iec.edp.caf.msu.api.entity.ServiceUnitInfo; import io.iec.edp.caf.msu.common.domain.entity.GspAppServerEntity; import io.iec.edp.caf.msu.common.domain.entity.GspSuEntity; -import io.iec.edp.caf.msu.common.domain.persistence.GspAppServerRpcPersistence; -import io.iec.edp.caf.msu.common.domain.persistence.GspSuRpcPersistence; import io.iec.edp.caf.msu.common.domain.repository.AppServerRepository; import io.iec.edp.caf.msu.common.domain.repository.SuRepository; -import io.iec.edp.caf.rpc.client.RpcClassHolder; import lombok.extern.slf4j.Slf4j; import java.io.IOException; @@ -68,13 +65,7 @@ public class DbHealthCheck { //任务 private ScheduledExecutorService executorService; - private GspAppServerRpcPersistence appServerRpcPersistence; - - private GspSuRpcPersistence suRpcPersistence; - - private final RpcClassHolder rpcClassHolder; - - public DbHealthCheck(AppServerRepository appRepo, SuRepository suRepo, List appEntities, Map> maps, RpcClassHolder rpcClassHolder) { + public DbHealthCheck(AppServerRepository appRepo, SuRepository suRepo, List appEntities, Map> maps) { this.appRepo = appRepo; this.suRepo = suRepo; this.instances = appEntities; @@ -86,7 +77,6 @@ public class DbHealthCheck { thread.setName("DbHealthCheck"); return thread; }); - this.rpcClassHolder = rpcClassHolder; } //检测 @@ -120,6 +110,9 @@ public class DbHealthCheck { // 健康检测 private void doCheck() { + if (CafEnvironment.enablePrimaryDataService()) { + throw new RuntimeException("The Primary-Data-Service mode does not support using the database as a service registry. Please switch to Nacos or other alternatives."); + } try { //设置主库 CAFDataSourceSelector.selectMaster(); @@ -157,12 +150,7 @@ public class DbHealthCheck { */ private void doHealthyCheck() { //数据库时间 - Date currentTimestamp = new Date(); - if (CafEnvironment.enablePrimaryDataService()) { - currentTimestamp = getAppServerRpcPersistence().getCurrentTimestamp(); - } else { - currentTimestamp = appRepo.getCurrentTimestamp(); - } + Date currentTimestamp = appRepo.getCurrentTimestamp(); if (currentTimestamp == null) { currentTimestamp = new Date(); } else { @@ -171,12 +159,7 @@ public class DbHealthCheck { //数据库时间转换为OffsetDateTime OffsetDateTime beatTime = currentTimestamp.toInstant().atZone(ZoneOffset.systemDefault()).toOffsetDateTime(); //所有app实例 - List allInstances = new ArrayList<>(); - if (CafEnvironment.enablePrimaryDataService()) { - allInstances = getAppServerRpcPersistence().findAll(); - } else { - allInstances = appRepo.findAll(); - } + List allInstances = appRepo.findAll(); //遍历实例内部的实例 for (GspAppServerEntity inner : DbHealthCheck.this.instances) { @@ -194,11 +177,7 @@ public class DbHealthCheck { //存在 appInstance.setBeatTime(beatTime); //appServerEntity.setHealthy(true); 暂不使用 - if (CafEnvironment.enablePrimaryDataService()) { - getAppServerRpcPersistence().save(appInstance); - } else { - appRepo.save(appInstance); - } + appRepo.save(appInstance); if (log.isInfoEnabled()) log.info("[{}] beats", serviceName); @@ -206,21 +185,13 @@ public class DbHealthCheck { allInstances.remove(appInstance); } else { //清理su信息、实例信息 - if (CafEnvironment.enablePrimaryDataService()) { - getSuRpcPersistence().deleteByApp(serviceName); - } else { - suRepo.deleteByApp(serviceName); //此处是一重保险,避免非优雅停机导致数据未清理 - } + suRepo.deleteByApp(serviceName); //此处是一重保险,避免非优雅停机导致数据未清理 //注册实例信息、su信息 //注册实例信息 inner.setBeatTime(beatTime); //appEntity.setHealthy(true); 暂不使用 - if (CafEnvironment.enablePrimaryDataService()) { - getAppServerRpcPersistence().save(inner); - } else { - appRepo.save(inner); - } + appRepo.save(inner); //注册su信息 List suInfos = DbHealthCheck.this.maps.get(serviceName); if (suInfos != null && suInfos.size() > 0) { @@ -230,28 +201,15 @@ public class DbHealthCheck { for (ServiceUnitInfo su : suInfos) { if (su != null) { String suName = su.getName().toLowerCase(); - if (CafEnvironment.enablePrimaryDataService()) { - if (getSuRpcPersistence().countByAppAndSu(serviceName, suName) == 0) { - GspSuEntity suEntity = new GspSuEntity(); - suEntity.setId(UUID.randomUUID().toString()); - suEntity.setApp(serviceName); - suEntity.setSu(suName); - getSuRpcPersistence().save(suEntity); - - if (log.isInfoEnabled()) - log.info("Success to register su [{}]", suName); - } - } else { - if (suRepo.countByAppAndSu(serviceName, suName) == 0) { - GspSuEntity suEntity = new GspSuEntity(); - suEntity.setId(UUID.randomUUID().toString()); - suEntity.setApp(serviceName); - suEntity.setSu(suName); - suRepo.save(suEntity); - - if (log.isInfoEnabled()) - log.info("Success to register su [{}]", suName); - } + if (suRepo.countByAppAndSu(serviceName, suName) == 0) { + GspSuEntity suEntity = new GspSuEntity(); + suEntity.setId(UUID.randomUUID().toString()); + suEntity.setApp(serviceName); + suEntity.setSu(suName); + suRepo.save(suEntity); + + if (log.isInfoEnabled()) + log.info("Success to register su [{}]", suName); } } } @@ -274,23 +232,13 @@ public class DbHealthCheck { OffsetDateTime dateTime = entity.getBeatTime(); if (dateTime == null) { //脏数据 - if (CafEnvironment.enablePrimaryDataService()) { - getSuRpcPersistence().deleteByApp(entity.getAppName()); - getAppServerRpcPersistence().deleteByAppName(entity.getAppName()); - } else { - suRepo.deleteByApp(entity.getAppName()); //此处是一重保险,避免非优雅停机导致数据未清理 - appRepo.deleteByAppName(entity.getAppName()); - } + suRepo.deleteByApp(entity.getAppName()); //此处是一重保险,避免非优雅停机导致数据未清理 + appRepo.deleteByAppName(entity.getAppName()); } else { long beat = Date.from(dateTime.toInstant()).getTime() / 1000; if (current - beat > setting.getRemovePeriod()) { - if (CafEnvironment.enablePrimaryDataService()) { - getSuRpcPersistence().deleteByApp(entity.getAppName()); - getAppServerRpcPersistence().deleteByAppName(entity.getAppName()); - } else { - suRepo.deleteByApp(entity.getAppName()); //此处是一重保险,避免非优雅停机导致数据未清理 - appRepo.deleteByAppName(entity.getAppName()); - } + suRepo.deleteByApp(entity.getAppName()); //此处是一重保险,避免非优雅停机导致数据未清理 + appRepo.deleteByAppName(entity.getAppName()); if (log.isInfoEnabled()) { log.info("Remove [{}], URL [{}]", entity.getAppName(), entity.getAppUrl()); @@ -320,17 +268,4 @@ public class DbHealthCheck { } } - private GspAppServerRpcPersistence getAppServerRpcPersistence(){ - if(this.appServerRpcPersistence==null){ - this.appServerRpcPersistence = rpcClassHolder.getRpcClass("sys", GspAppServerRpcPersistence.class); - } - return this.appServerRpcPersistence; - } - - private GspSuRpcPersistence getSuRpcPersistence(){ - if(this.suRpcPersistence==null){ - this.suRpcPersistence = rpcClassHolder.getRpcClass("sys", GspSuRpcPersistence.class); - } - return this.suRpcPersistence; - } } diff --git a/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/client/register/DbRegisterImpl.java b/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/client/register/DbRegisterImpl.java index 793828e88..238c789fd 100644 --- a/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/client/register/DbRegisterImpl.java +++ b/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/client/register/DbRegisterImpl.java @@ -28,15 +28,11 @@ import io.iec.edp.caf.msu.api.entity.MsuProperties; import io.iec.edp.caf.msu.api.entity.ServiceUnitInfo; import io.iec.edp.caf.msu.api.entity.ServiceUnitRegisterInfo; import io.iec.edp.caf.msu.client.health.DbHealthCheck; -import io.iec.edp.caf.msu.common.domain.converter.GspSuConverter; import io.iec.edp.caf.msu.common.domain.entity.GspAppServerEntity; import io.iec.edp.caf.msu.common.domain.entity.GspSuEntity; -import io.iec.edp.caf.msu.common.domain.persistence.GspAppServerRpcPersistence; -import io.iec.edp.caf.msu.common.domain.persistence.GspSuRpcPersistence; import io.iec.edp.caf.msu.common.domain.repository.AppServerRepository; import io.iec.edp.caf.msu.common.domain.repository.SuRepository; import io.iec.edp.caf.msu.common.utils.NetUtil; -import io.iec.edp.caf.rpc.client.RpcClassHolder; import lombok.extern.slf4j.Slf4j; import org.springframework.core.env.Environment; @@ -65,18 +61,11 @@ public class DbRegisterImpl implements ServiceRegistry { private DbHealthCheck dbHealthCheck; - private GspAppServerRpcPersistence appServerRpcPersistence; - - private GspSuRpcPersistence suRpcPersistence; - - private final RpcClassHolder rpcClassHolder; - - public DbRegisterImpl(AppServerRepository appRepo, SuRepository suRepo, ServiceUnitAwareService suAware, RpcClassHolder rpcClassHolder) { + public DbRegisterImpl(AppServerRepository appRepo, SuRepository suRepo, ServiceUnitAwareService suAware) { this.appRepo = appRepo; this.suRepo = suRepo; this.suAware = suAware; this.enableSSL = enableSSL(); - this.rpcClassHolder = rpcClassHolder; } @Override @@ -97,17 +86,9 @@ public class DbRegisterImpl implements ServiceRegistry { //依次注销su、实例信息 List suNames = this.suAware.getEnabledServiceUnits(); for (String suName : suNames) { - if (CafEnvironment.enablePrimaryDataService()) { - getSuRpcPersistence().deleteByAppAndSu(appName, suName.toLowerCase()); - } else { - this.suRepo.deleteByAppAndSu(appName, suName.toLowerCase()); - } - } - if (CafEnvironment.enablePrimaryDataService()) { - getAppServerRpcPersistence().deleteByAppName(appName); - } else { - this.appRepo.deleteByAppName(appName); + this.suRepo.deleteByAppAndSu(appName, suName.toLowerCase()); } + this.appRepo.deleteByAppName(appName); log.info("ServiceCenter(DataBase) Unregister service [{}]", appName); } @@ -118,6 +99,9 @@ public class DbRegisterImpl implements ServiceRegistry { @Override public Boolean register(ServiceUnitRegisterInfo registerInfo, String ip, Integer port) { + if (CafEnvironment.enablePrimaryDataService()) { + throw new RuntimeException("The Primary-Data-Service mode does not support using the database as a service registry. Please switch to Nacos or other alternatives."); + } try { //设置主库 CAFDataSourceSelector.selectMaster(); @@ -144,7 +128,7 @@ public class DbRegisterImpl implements ServiceRegistry { if (this.dbHealthCheck != null) { this.dbHealthCheck.stop(); } - this.dbHealthCheck = new DbHealthCheck(appRepo, suRepo, appEntities, appSuInfoMap, rpcClassHolder); + this.dbHealthCheck = new DbHealthCheck(appRepo, suRepo, appEntities, appSuInfoMap); this.dbHealthCheck.start(); } @@ -162,13 +146,8 @@ public class DbRegisterImpl implements ServiceRegistry { log.info("ServiceCenter(DataBase) Start to register su of service [{}]", appName); //清理su信息、实例信息 - if (CafEnvironment.enablePrimaryDataService()) { - getSuRpcPersistence().deleteByApp(appName); - getAppServerRpcPersistence().deleteByAppName(appName); - } else { - this.suRepo.deleteByApp(appName); //此处是一重保险,避免非优雅停机导致数据未清理 - this.appRepo.deleteByAppName(appName); - } + this.suRepo.deleteByApp(appName); //此处是一重保险,避免非优雅停机导致数据未清理 + this.appRepo.deleteByAppName(appName); //注册实例信息、su信息 //注册实例信息 @@ -182,12 +161,7 @@ public class DbRegisterImpl implements ServiceRegistry { appEntity.setAppName(appName); appEntity.setAppUrl(url); //appEntity.setHealthy(true); 暂不使用 - Date currentTimestamp = new Date(); - if (CafEnvironment.enablePrimaryDataService()) { - currentTimestamp = getAppServerRpcPersistence().getCurrentTimestamp(); - } else { - currentTimestamp = this.appRepo.getCurrentTimestamp(); - } + Date currentTimestamp = this.appRepo.getCurrentTimestamp(); OffsetDateTime dateTime = null; if (currentTimestamp != null) { currentTimestamp.setTime(currentTimestamp.getTime()); @@ -195,34 +169,19 @@ public class DbRegisterImpl implements ServiceRegistry { } appEntity.setBeatTime(dateTime); appEntity.setBasePath(CafEnvironment.getBaseUrlPath()); - if (CafEnvironment.enablePrimaryDataService()) { - getAppServerRpcPersistence().save(appEntity); - } else { - this.appRepo.save(appEntity); - } + this.appRepo.save(appEntity); //注册su信息 List suInfos = registerInfo.getServiceUnitInfo(); for (ServiceUnitInfo su : suInfos) { if (su != null) { String suName = su.getName().toLowerCase(); - if (CafEnvironment.enablePrimaryDataService()) { - if (getSuRpcPersistence().countByAppAndSu(appName, suName) == 0) { - GspSuEntity suEntity = new GspSuEntity(); - suEntity.setId(UUID.randomUUID().toString()); - suEntity.setApp(appName); - suEntity.setSu(suName); - getSuRpcPersistence().save(suEntity); - log.info("ServiceCenter(DataBase) Success to register su [{}]", suName); - } - } else { - if (this.suRepo.countByAppAndSu(appName, suName) == 0) { - GspSuEntity suEntity = new GspSuEntity(); - suEntity.setId(UUID.randomUUID().toString()); - suEntity.setApp(appName); - suEntity.setSu(suName); - this.suRepo.save(suEntity); - log.info("ServiceCenter(DataBase) Success to register su [{}]", suName); - } + if (this.suRepo.countByAppAndSu(appName, suName) == 0) { + GspSuEntity suEntity = new GspSuEntity(); + suEntity.setId(UUID.randomUUID().toString()); + suEntity.setApp(appName); + suEntity.setSu(suName); + this.suRepo.save(suEntity); + log.info("ServiceCenter(DataBase) Success to register su [{}]", suName); } } } @@ -281,18 +240,4 @@ public class DbRegisterImpl implements ServiceRegistry { } return addressFormat; } - - private GspAppServerRpcPersistence getAppServerRpcPersistence(){ - if(this.appServerRpcPersistence==null){ - this.appServerRpcPersistence = rpcClassHolder.getRpcClass("sys", GspAppServerRpcPersistence.class); - } - return this.appServerRpcPersistence; - } - - private GspSuRpcPersistence getSuRpcPersistence(){ - if(this.suRpcPersistence==null){ - this.suRpcPersistence = rpcClassHolder.getRpcClass("sys", GspSuRpcPersistence.class); - } - return this.suRpcPersistence; - } } diff --git a/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/common/DBMsuServiceImpl.java b/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/common/DBMsuServiceImpl.java index 31dd96a0e..59dd3c29f 100644 --- a/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/common/DBMsuServiceImpl.java +++ b/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/common/DBMsuServiceImpl.java @@ -9,11 +9,8 @@ import io.iec.edp.caf.msu.api.entity.GspSuInstance; import io.iec.edp.caf.msu.common.domain.converter.GspAppConverter; import io.iec.edp.caf.msu.common.domain.converter.GspSuConverter; import io.iec.edp.caf.msu.common.domain.entity.GspAppServerEntity; -import io.iec.edp.caf.msu.common.domain.persistence.GspAppServerRpcPersistence; -import io.iec.edp.caf.msu.common.domain.persistence.GspSuRpcPersistence; import io.iec.edp.caf.msu.common.domain.repository.AppServerRepository; import io.iec.edp.caf.msu.common.domain.repository.SuRepository; -import io.iec.edp.caf.rpc.client.RpcClassHolder; import java.util.ArrayList; import java.util.List; @@ -29,36 +26,24 @@ public class DBMsuServiceImpl extends MsuServiceImpl { private SuRepository suRepo; - private GspAppServerRpcPersistence appServerRpcPersistence; - - private GspSuRpcPersistence suRpcPersistence; - - private final RpcClassHolder rpcClassHolder; - - public DBMsuServiceImpl(ServiceUnitAwareService suAware, ServiceDiscovery serviceDiscovery, AppServerRepository appRepo, SuRepository suRepo, RpcClassHolder rpcClassHolder) { + public DBMsuServiceImpl(ServiceUnitAwareService suAware, ServiceDiscovery serviceDiscovery, AppServerRepository appRepo, SuRepository suRepo) { super(suAware, serviceDiscovery); this.appRepo = appRepo; this.suRepo = suRepo; - this.rpcClassHolder = rpcClassHolder; } //根据Su名称获取appServer信息 public List getGspAppServerInfoBySu(String suName) { + if (CafEnvironment.enablePrimaryDataService()) { + throw new RuntimeException("The Primary-Data-Service mode does not support using the database as a service registry. Please switch to Nacos or other alternatives."); + } try { CAFDataSourceSelector.selectMaster(); - List apps = new ArrayList<>(); - if (CafEnvironment.enablePrimaryDataService()) { - apps = getAppServerRpcPersistence().findBySuName(suName.toLowerCase()); - if (apps == null || apps.size() == 0) { - apps = getAppServerRpcPersistence().findBySuName(suName); - } - } else { - apps = this.appRepo.findBySuName(suName.toLowerCase()); - if (apps == null || apps.size() == 0) { - apps = this.appRepo.findBySuName(suName); - } + List apps = this.appRepo.findBySuName(suName.toLowerCase()); + if (apps == null || apps.size() == 0) { + apps = this.appRepo.findBySuName(suName); } List instances = new ArrayList<>(); @@ -76,30 +61,16 @@ public class DBMsuServiceImpl extends MsuServiceImpl { //获取一个Su的信息 public GspSuInstance getGspSuInfo(String suName) { + if (CafEnvironment.enablePrimaryDataService()) { + throw new RuntimeException("The Primary-Data-Service mode does not support using the database as a service registry. Please switch to Nacos or other alternatives."); + } try { CAFDataSourceSelector.selectMaster(); - if (CafEnvironment.enablePrimaryDataService()) { - return GspSuConverter.convertToInstance(getSuRpcPersistence().findById(suName)); - } else { - return GspSuConverter.convertToInstance(this.suRepo.findById(suName).get()); - } + return GspSuConverter.convertToInstance(this.suRepo.findById(suName).get()); } finally { CAFDataSourceSelector.reset(); } } - private GspAppServerRpcPersistence getAppServerRpcPersistence(){ - if(this.appServerRpcPersistence==null){ - this.appServerRpcPersistence = rpcClassHolder.getRpcClass("sys", GspAppServerRpcPersistence.class); - } - return this.appServerRpcPersistence; - } - - private GspSuRpcPersistence getSuRpcPersistence(){ - if(this.suRpcPersistence==null){ - this.suRpcPersistence = rpcClassHolder.getRpcClass("sys", GspSuRpcPersistence.class); - } - return this.suRpcPersistence; - } } diff --git a/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/common/domain/persistence/GspAppServerRpcPersistence.java b/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/common/domain/persistence/GspAppServerRpcPersistence.java deleted file mode 100644 index 7c7afcb5c..000000000 --- a/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/common/domain/persistence/GspAppServerRpcPersistence.java +++ /dev/null @@ -1,22 +0,0 @@ -package io.iec.edp.caf.msu.common.domain.persistence; - -import io.iec.edp.caf.msu.common.domain.entity.GspAppServerEntity; -import io.iec.edp.caf.rpc.api.annotation.GspServiceBundle; -import io.iec.edp.caf.rpc.api.annotation.RpcParam; - -import java.util.Date; -import java.util.List; - -@GspServiceBundle(applicationName = "runtime", serviceUnitName= "sys") -public interface GspAppServerRpcPersistence { - - List findBySuName(@RpcParam(paramName = "suName") String suName); - - void deleteByAppName(@RpcParam(paramName = "appName") String appName); - - Date getCurrentTimestamp(); - - void save(@RpcParam(paramName = "appEntity") GspAppServerEntity appEntity); - - List findAll(); -} diff --git a/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/common/domain/persistence/GspSuRpcPersistence.java b/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/common/domain/persistence/GspSuRpcPersistence.java deleted file mode 100644 index 0cb5c397c..000000000 --- a/caf-msu/caf-msu-database/src/main/java/io/iec/edp/caf/msu/common/domain/persistence/GspSuRpcPersistence.java +++ /dev/null @@ -1,23 +0,0 @@ -package io.iec.edp.caf.msu.common.domain.persistence; - -import io.iec.edp.caf.msu.common.domain.entity.GspSuEntity; -import io.iec.edp.caf.rpc.api.annotation.GspServiceBundle; -import io.iec.edp.caf.rpc.api.annotation.RpcParam; - -import java.util.List; - -@GspServiceBundle(applicationName = "runtime", serviceUnitName= "sys") -public interface GspSuRpcPersistence { - - GspSuEntity findById(@RpcParam(paramName = "suName") String suName); - - void deleteByAppAndSu(@RpcParam(paramName = "appName") String appName, @RpcParam(paramName = "suName") String suName); - - void deleteByApp(@RpcParam(paramName = "appName") String appName); - - int countByAppAndSu(@RpcParam(paramName = "appName") String appName, @RpcParam(paramName = "suName") String suName); - - void save(@RpcParam(paramName = "entity") GspSuEntity entity); - - List findAll(); -} -- Gitee