# LLMScore_java

**Repository Path**: rf818/llmscore_java

## Basic Information

- **Project Name**: LLMScore_java
- **Description**: No description available
- **Primary Language**: Unknown
- **License**: Not specified
- **Default Branch**: main
- **Homepage**: None
- **GVP Project**: No

## Statistics

- **Stars**: 0
- **Forks**: 0
- **Created**: 2024-10-17
- **Last Updated**: 2024-12-04

## Categories & Tags

**Categories**: Uncategorized
**Tags**: None

## README

# Project Overview: Java Backend

> Portal 🚪: [Python LLM side of the LLM Score project](https://gitee.com/rf818/llmscore_python)

This project is the Java backend of an LLM-based scoring system. Its main responsibilities are: exposing a Spring Boot API that parses uploaded Excel sheets, invoking the `llmScore-message` microservice to send each newly uploaded record to the LLM side one message at a time, and consuming the scoring results that the LLM side pushes back over Kafka.

![Architecture diagram](readme_files/大模型智能质检.jpg)

# Quick Start

- **Pull Docker images**: `mysql:latest`, `bitnami/kafka:3.8`, `nacos/nacos-server:latest`.
- **Start Nacos, MySQL, and Kafka**: run the `cleanup_start_docker_compose.sh` script, which clears and recreates the local volume directories for Nacos, MySQL, and Kafka, then starts the containers defined in `docker-compose.yml`.
- **Start Excel processing**: launch `LlmScoreExcelApplication` in the `llmScore-excel` module; it exposes the `localhost:8080/llm_score/v1/upload_excel` endpoint.
- **Start message processing**: launch `LlmScoreMessageApplication` in the `llmScore-message` module; it registers itself as the `send_message` service using Spring Cloud + Nacos.

# Features

Tech stack: Spring Boot, Spring Cloud, Nacos, MySQL, Kafka, Docker, MyBatis-Plus

## llmScore-excel module

### `CodeGenerator.java`

Uses MyBatis-Plus to generate the entity classes for the configured database tables.

---

### `service/impl/RecordsServiceImpl.java`: parses the uploaded Excel file, persists new rows, and calls the message microservice

- Reads the uploaded Excel sheet page by page and logs how many rows were read.

```java
@Transactional(rollbackFor = Exception.class)
@Override
public Integer uploadExcel(MultipartFile file) {
    AtomicReference<Integer> dataListSize = new AtomicReference<>();
    try {
        EasyExcel.read(file.getInputStream(), ExcelTables.class, new PageReadListener<ExcelTables>(dataList -> {
            log.info("读取到{}条数据", dataList.size());
            ThoseNotInDbWillAdd(dataList, dataListSize);
        })).sheet().doRead();
    } catch (IOException e) {
        throw HttpException.badRequest("上传文件格式有误!");
    }
    if (dataListSize.get() == null) {
        throw HttpException.badRequest("本次成功读取新录音记录为0");
    }
    return dataListSize.get();
}
```

- The `ThoseNotInDbWillAdd` method converts each Excel row into a Java entity, uses a MyBatis-Plus query wrapper to find the rows that are new (i.e. not yet in the database), saves them in a batch, and then calls the `send_message` microservice via Spring Cloud to forward the new records to the LLM side. The whole flow runs inside a Spring transaction, so if any exception occurs the changes are rolled back and the file must be uploaded again.

```java
private void ThoseNotInDbWillAdd(List<ExcelTables> dataList, AtomicReference<Integer> dataListSize) {
    // Map each Excel row to a Records entity
    List<Records> dataToRecordsList = dataList.stream()
            .map(data -> dozerBeanMapper.map(data, Records.class))
            .collect(Collectors.toList());
    // Look up the records that already exist in the database
    QueryWrapper<Records> wrapper = new QueryWrapper<>();
    wrapper.lambda().in(Records::getRecordName,
            dataToRecordsList.stream().map(Records::getRecordName).collect(Collectors.toList()));
    List<Records> recordsInDb = this.list(wrapper);
    List<String> nameOfRecordsInDb = recordsInDb.stream()
            .map(Records::getRecordName)
            .collect(Collectors.toList());
    // Keep only the rows that are not in the database yet
    List<Records> newdataList = dataToRecordsList.stream()
            .filter(data -> !nameOfRecordsInDb.contains(data.getRecordName()))
            .collect(Collectors.toList());
    if (!newdataList.isEmpty()) {
        this.saveBatch(newdataList);
        dataListSize.set(newdataList.size());
        // Hand the new records to the message microservice for sending
        this.restTemplate.postForObject(sendMessageUrl, newdataList, Response.class);
    }
}
```

---

## llmScore-message module

### `service/impl/SendMessageService.java`: message-sending microservice

Communicates over Kafka: the target topic is injected from configuration and the records are sent one by one. Success and failure are both recorded in the logs.

```java
@Override
public Integer sendMessage(List<Records> newdataList) {
    // The partition is left null so that Kafka assigns one itself
    newdataList.forEach(this::sendMessageByKafka);
    return newdataList.size();
}

private void sendMessageByKafka(Records newdata) {
    ProducerRecord<String, Records> producerRecord = new ProducerRecord<>(
            topic, null, System.currentTimeMillis(),
            String.valueOf(newdata.hashCode()), newdata);
    ListenableFuture<SendResult<String, Records>> future = kafkaTemplate.send(producerRecord);
    // Log success and failure separately via callbacks
    future.addCallback(
            result -> {
                assert result != null;
                log.info("生产者成功发送消息到topic:{} partition:{}的消息",
                        result.getRecordMetadata().topic(),
                        result.getRecordMetadata().partition());
            },
            ex -> log.error("生产者发送消息失败,原因:{}", ex.getMessage()));
}
```
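The code above assumes a `KafkaTemplate<String, Records>` whose values are serialized as JSON (matching the `StringJsonMessageConverter` configured in the next section). The repository's actual producer wiring is not shown; the following is a minimal sketch of one way it could look, where the class name `KafkaProducerConfig`, the `localhost:9092` broker address, and the choice of Spring Kafka's `JsonSerializer` are assumptions.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

@Configuration
public class KafkaProducerConfig {

    // Assumed broker address; in this project it would come from the
    // Spring Boot configuration rather than a hard-coded constant.
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    @Bean
    public ProducerFactory<String, Records> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        // String keys (the record's hashCode as text), JSON-serialized values
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, Records> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
```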
### `config/KafkaConfig.java` and `TopicAdministrator`: read the Spring Boot configuration and register topics

- Reads the Kafka topics from the configuration and builds the matching JSON message converter and `NewTopic` objects.

```java
@Configuration
@ConfigurationProperties(prefix = "kafka")
@Data
class KafkaConfig {
    private List<Topic> topics;

    /**
     * JSON message converter
     */
    @Bean
    public RecordMessageConverter jsonConverter() {
        return new StringJsonMessageConverter();
    }

    @Data
    static class Topic {
        String name;
        Integer numPartitions = 1;
        Short replicationFactor = (short) 1;

        NewTopic toNewTopic() {
            return new NewTopic(this.name, this.numPartitions, this.replicationFactor);
        }
    }
}
```

- Registers every configured topic as a `NewTopic` bean.

```java
@Configuration
@Slf4j
public class TopicAdministrator {
    private final KafkaConfig configurations;
    private final GenericWebApplicationContext context;

    public TopicAdministrator(KafkaConfig configurations, GenericWebApplicationContext genericContext) {
        this.configurations = configurations;
        this.context = genericContext;
    }

    @PostConstruct
    public void init() {
        // Register one NewTopic bean per configured topic
        configurations.getTopics().forEach(t -> {
            context.registerBean(t.name, NewTopic.class, t::toNewTopic);
            log.info(String.format("构建主题:-> %s", t));
        });
    }
}
```

---

### `service/impl/ReceiveService.java`: Kafka message consumption

- Registers a Kafka consumer listener that deserializes each message into a Java object and updates the corresponding database row.

```java
@Override
@KafkaListener(topics = {"${kafka.topic.receive}"}, groupId = "javaBackEnd")
public void receiveMessage(ConsumerRecord<String, String> msg) {
    try {
        if (msg.value().contains("mark")) {
            MarkContent markContent = objectMapper.readValue(msg.value(), MarkContent.class);
            // TODO: persist this into the corresponding column later
            log.info("topic:{} partition:{}的消息 -> {}\n无法转化为JSON\n", msg.topic(), msg.partition(), markContent.toString());
        } else {
            // Build the generic type LLMOutput<List<ScoreReason>> for Jackson
            TypeFactory typeFactory = objectMapper.getTypeFactory();
            CollectionType collectionType = typeFactory.constructCollectionType(List.class, LLMOutput.ScoreReason.class);
            JavaType constructedParametricType = typeFactory.constructParametricType(LLMOutput.class, collectionType);
            LLMOutput<List<LLMOutput.ScoreReason>> llmOutput = objectMapper.readValue(msg.value(), constructedParametricType);
            Records records = llmOutputToRecords(llmOutput);
            // Update the record's status in the database
            this.updateById(records);
            log.info("消费topic:{} partition:{}的消息 -> {}\n更新数据->{}", msg.topic(), msg.partition(), llmOutput, records);
        }
    } catch (JsonProcessingException e) {
        e.printStackTrace();
    }
}
```
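For orientation, here is a hypothetical sketch of the DTO shapes that `receiveMessage` deserializes into. Only the generic structure, `LLMOutput` parameterized by `List<ScoreReason>`, is implied by the listener code above; every field below is an invented placeholder, not the repository's real definition.

```java
import java.util.List;

import lombok.Data;

// Hypothetical sketch only: the generic shape matches what
// constructParametricType(LLMOutput.class, List<ScoreReason>) expects,
// but all field names here are illustrative placeholders.
@Data
public class LLMOutput<T> {
    private Long recordId;      // placeholder: which Records row was scored
    private Integer totalScore; // placeholder: aggregate score from the LLM
    private T scoreReason;      // bound to List<ScoreReason> by the consumer

    @Data
    public static class ScoreReason {
        private String dimension; // placeholder: scoring dimension
        private Integer score;    // placeholder: score for this dimension
        private String reason;    // placeholder: the LLM's justification
    }
}
```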