# Learn Streaming LAB **Repository Path**: robertindie/LearnStreaming ## Basic Information - **Project Name**: Learn Streaming LAB - **Description**: No description available - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: dev - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2025-04-28 - **Last Updated**: 2025-04-29 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # 注意事项 - 请使用Gitee或者Github登录UniteStream,暂不支持账号密码登录 - 请准备python 3.8 或更高版本的python环境 在代码中类似: ```python ################## LAB1 ################################### ########################################################### ``` 的区域,是需要同学们实现和填入的代码。 - 在每次 consumer 处理完消息后,需要调用 `consumer.acknowledge(msg)` 方法对消息进行签收,可参考UniteStream Cloud上的代码示例。 # 实验一:设备数据采集 ![img.png](img.png) ## 实验目的 - 学习使用 Apache Pulsar 和 Python 客户端进行物联网数据采集。 - 实现从模拟数据采集到数据传输的完整流程。 ## 实验步骤 1. **理论及技术调研** - 回顾物联网流计算的相关理论知识。 - 熟悉 Apache Pulsar 的基本概念和使用方法。 - 了解 Pulsar 的 Topic、Producer、Consumer 等核心概念。 2. **构建物联网应用系统** - 软件准备: - 安装 Python 3.8 或更高版本 - 安装 所需要的依赖: `pip install -r requirements.txt` - 环境准备: - 登录 https://cloud.unitestream.com,点击右下角红色的Gitee按钮,使用Gitee登录。 - 点击DataSets标签页(Dataset是Topic的一个逻辑抽象,他们是一一对应的关系,可以将Dataset认为是一个Topic),创建两个Dataset: `sensor-data` 和 `control-data`。进入它们的详情页即可获得token和Topic名称信息。Token值用于连接认证,对于每个用户来说,每个Dataset的token值都是相同的。 - 在 `env.py` 中配置环境变量(TOKEN, SENSOR_TOPIC, CONTROL_TOPIC) - 运行 `python main.py` 后根据提示打开 `http://127.0.0.1:5001` 即可以查看物联网模拟系统 3. **模拟数据采集** - 数据采集程序实现: - 在 `main.py` 中实现 `update_sensor` 接口 - 使用 `sensor_producer` 将数据发送到 `SENSOR_TOPIC` - 数据格式: `{'timestamp': timestamp, 'values': {'temperature': value, 'humidity': value, 'water_level': value}}`, 代码中已经构造了消息 `message`,可直接将这个消息按照示例程序进行发送: `producer.send(json.dumps(message).encode('utf-8'))` 4. **数据处理和展示** - 实现 `process_sensor_data` 函数: - 使用 `sensor_consumer` 从 `SENSOR_TOPIC` 读取数据 - 通过调用 `load_message_data(msg)` 更新 `current_data` 和 `history_data` 数据结构即可供模拟系统读取,其中msg代表从consumer.receive 返回来的消息数据。 - 使用 while 循环进行循环数据读取 - 运行 `python main.py` 后根据提示打开 `http://127.0.0.1:5001` 即可看到数据展示效果 ## 实验要求 - 团队合作:鼓励团队合作,分工明确。 - 文档记录:详细记录实验过程,包括软件代码和遇到的问题。 ## 实验评估 - 系统能否成功采集和传输数据。 - 实验报告的完整性和逻辑性。 --- # 实验二:流计算设备控制 ![img_1.png](img_1.png) ## 实验目的 - 学习处理流数据并进行设备控制。 - 实现从数据处理到生成控制指令的完整流程。 ## 实验步骤 1. **理论及技术调研** - 复习流计算的相关知识。 - 了解设备控制的基本原理和策略。 - 学习 Pulsar 的消费者和生产者模式。 2. **设备管理器实现** - 在`device_manager.py`中已经实现设备控制逻辑,可以根据自己的想法对控制逻辑进行修改 3. **数据处理流程** - 在`device_manager_process.py`中实现函数`process_sensor_data`的数据处理逻辑: - 使用`sensor-consumer` 从 `sensor-data` 主题读取数据 - 调用 `self.device_manager.process(msg)` 获取设备控制状态,返回值是control_message - 将control_message通过 `self.control_producer` 发送到 `control-data` 主题。 4. **控制指令处理** - 实现 main.py 中的 `process_control_data` 函数: - 从 `control_consumer` 读取控制指令 - 通过调用 `update_control_data(msg)` 将控制数据应用到模拟系统 5. **测试** - 运行 `python main.py` 启动主程序 - 运行 `python device_manager_process.py` 启动设备控制程序 - 打开 `http://127.0.0.1:5001` 进行测试 ## 实验要求 - 团队合作和分工明确。 - 详细记录实验过程和代码实现。 ## 实验评估 - 数据处理的准确性和实时性。 - 控制指令的有效性和设备响应。 --- # 实验三:智能助手(可选) ![img_2.png](img_2.png) ## 实验目的 - 集成 DeepSeek 进行数据分析。 - 提升数据分析的智能化水平。 ## 实验步骤 1. **DeepSeek 集成** - 创建 `DeepSeekChat` 类: - 初始化 API 密钥和请求头 - 实现上下文格式化方法 - 实现响应生成方法 2. **数据分析实现** - 实现 `generate_response` 方法: - 格式化传感器数据和设备状态 - 构建提示词 - 调用 DeepSeek API - 处理响应结果 3. **API 集成** - 实现 `/api/chat` 接口 (位于 `main.py` 中的 `chat` 方法): - 接收用户查询 - 获取当前环境状态 - 调用 DeepSeek 生成响应 - 返回分析结果 ## 实现步骤 ## 实验要求 - 记录集成过程和分析结果。 ## 实验评估 - 助手的分析能力和准确性。 - 报告的完整性和可读性。 --- # 实验四:数据精确一次同步(可选) ## 实验目的 - 实现数据精确一次传输,确保数据的完整性。 ## 实验步骤 1. **CSV 生产者实现** - 创建 `CSVProducer` 类: - 初始化 Pulsar 客户端和生产者 - 实现文件读取和消息发送 - 添加序列号和属性信息 2. **CSV 消费者实现** - 创建 `CSVConsumer` 类: - 实现状态管理(使用文件锁) - 实现消息去重 - 实现数据写入 3. **精确一次传输实现** - 生产者端: - 使用序列号标识消息 - 添加文件信息属性 - 消费者端: - 使用文件锁确保原子性 - 维护处理状态 - 实现消息确认机制 ## 实验要求 - 详细记录实验过程。 - 提供代码注释和实现说明。 ## 实验评估 - 数据传输的准确性。 - 文件数据的完整性和一致性。 通过这些实验,学生将能更好地理解和应用物联网流计算技术,增强实际动手能力。