diff --git a/unittest/test_check_cmd.py b/unittest/test_check_cmd.py new file mode 100644 index 0000000000000000000000000000000000000000..afd0f90b037ad0b2740f418cfc0e6af2ea215a03 --- /dev/null +++ b/unittest/test_check_cmd.py @@ -0,0 +1,28 @@ +# -*- coding: utf-8 -*- +# Copyright (c) 2025 Huawei Technologies Co., Ltd. +# oeDeploy is licensed under the Mulan PSL v2. + +import unittest +from unittest.mock import patch, MagicMock +from src.commands.check.check_cmd import CheckCmd + + +class TestCheckCmd(unittest.TestCase): + def setUp(self): + self.project_path = "/fake/project" + self.action = "install" + + def test_init(self): + """测试CheckCmd初始化""" + check_cmd = CheckCmd(self.project_path, self.action) + self.assertEqual(check_cmd.project, self.project_path) + self.assertEqual(check_cmd.action, self.action) + self.assertIsNotNone(check_cmd.log) + + def test_run(self): + """测试run方法(当前为空实现)""" + check_cmd = CheckCmd(self.project_path, self.action) + result = check_cmd.run() + # 当前实现为pass,返回None + self.assertIsNone(result) + diff --git a/unittest/test_info_cmd.py b/unittest/test_info_cmd.py new file mode 100644 index 0000000000000000000000000000000000000000..4ef1a7a0d8ba39f06b5e5facf9800b62dc14bee8 --- /dev/null +++ b/unittest/test_info_cmd.py @@ -0,0 +1,156 @@ +# -*- coding: utf-8 -*- +# Copyright (c) 2025 Huawei Technologies Co., Ltd. +# oeDeploy is licensed under the Mulan PSL v2. + +import unittest +import os +import tempfile +import shutil +import yaml +from unittest.mock import patch, MagicMock +from src.commands.info.info_cmd import InfoCmd +from src.exceptions.config_exception import ConfigException + + +class TestInfoCmd(unittest.TestCase): + def setUp(self): + # 创建临时项目目录 + self.test_dir = tempfile.mkdtemp() + self.project_path = os.path.join(self.test_dir, "test_project") + os.makedirs(self.project_path) + + # 创建有效的main.yaml + self.main_yaml = os.path.join(self.project_path, "main.yaml") + with open(self.main_yaml, 'w') as f: + yaml.dump({ + "name": "test_project", + "version": "1.0.0", + "description": "Test project description", + "action": { + "install": { + "description": "Install the application", + "tasks": [ + {"name": "setup", "playbook": "setup.yml"} + ] + }, + "uninstall": { + "description": "Uninstall the application", + "tasks": [ + {"name": "cleanup", "playbook": "cleanup.yml"} + ] + } + } + }, f) + + def tearDown(self): + # 清理临时目录 + shutil.rmtree(self.test_dir) + + def test_init(self): + """测试InfoCmd初始化""" + info_cmd = InfoCmd(self.project_path) + self.assertEqual(info_cmd.project, self.project_path) + self.assertIsNotNone(info_cmd.log) + + def test_run_success(self): + """测试成功获取项目信息""" + info_cmd = InfoCmd(self.project_path) + result = info_cmd.run() + self.assertTrue(result) + + def test_run_with_invalid_project(self): + """测试无效项目路径""" + info_cmd = InfoCmd("/nonexistent/project") + result = info_cmd.run() + self.assertFalse(result) + + def test_run_with_missing_main_yaml(self): + """测试缺少main.yaml文件""" + os.remove(self.main_yaml) + info_cmd = InfoCmd(self.project_path) + result = info_cmd.run() + self.assertFalse(result) + + def test_run_with_invalid_main_yaml(self): + """测试无效的main.yaml文件""" + with open(self.main_yaml, 'w') as f: + f.write("invalid yaml content: [unclosed") + + info_cmd = InfoCmd(self.project_path) + # yaml解析错误会抛出异常,被MainReader捕获并记录日志 + # 但实际会在MainReader初始化时抛出异常而不是返回False + with self.assertRaises(Exception): + result = info_cmd.run() + + def test_run_with_missing_fields(self): + """测试main.yaml缺少必要字段(实际代码能容错处理)""" + with open(self.main_yaml, 'w') as f: + yaml.dump({ + "name": "test_project" + # 缺少version, description, action等字段 + }, f) + + info_cmd = InfoCmd(self.project_path) + # 实际代码能够容错处理缺少的字段,返回空值 + result = info_cmd.run() + self.assertTrue(result) + + def test_run_with_empty_action(self): + """测试action字段为空""" + with open(self.main_yaml, 'w') as f: + yaml.dump({ + "name": "test_project", + "version": "1.0.0", + "description": "Test project", + "action": {} + }, f) + + info_cmd = InfoCmd(self.project_path) + result = info_cmd.run() + self.assertTrue(result) # 空action也应该能正常处理 + + def test_run_with_multiple_actions(self): + """测试多个action""" + with open(self.main_yaml, 'w') as f: + yaml.dump({ + "name": "test_project", + "version": "2.0.0", + "description": "Test project with multiple actions", + "action": { + "install": {"description": "Install"}, + "uninstall": {"description": "Uninstall"}, + "update": {"description": "Update"}, + "configure": {"description": "Configure"} + } + }, f) + + info_cmd = InfoCmd(self.project_path) + result = info_cmd.run() + self.assertTrue(result) + + def test_run_with_action_no_description(self): + """测试action没有description字段""" + with open(self.main_yaml, 'w') as f: + yaml.dump({ + "name": "test_project", + "version": "1.0.0", + "description": "Test project", + "action": { + "install": { + "tasks": [{"name": "setup", "playbook": "setup.yml"}] + } + } + }, f) + + info_cmd = InfoCmd(self.project_path) + result = info_cmd.run() + self.assertTrue(result) + + @patch('src.commands.info.info_cmd.MainReader') + def test_run_with_config_exception(self, mock_reader): + """测试MainReader抛出ConfigException""" + mock_reader.side_effect = ConfigException("Config error") + info_cmd = InfoCmd(self.project_path) + result = info_cmd.run() + self.assertFalse(result) + diff --git a/unittest/test_init_cmd.py b/unittest/test_init_cmd.py index 6796d178800a5f808846ef84cec907b44f54b073..1df16eddbf0714ccf12e566d500639eaa1dd65a5 100644 --- a/unittest/test_init_cmd.py +++ b/unittest/test_init_cmd.py @@ -6,6 +6,7 @@ import unittest import os import tempfile import shutil +import hashlib from unittest.mock import patch, MagicMock import src.constants.paths from src.commands.init.init_cmd import InitCmd @@ -18,11 +19,6 @@ class TestInitCmd(unittest.TestCase): self.plugin_dir = os.path.join(self.test_dir, "plugin") os.makedirs(self.plugin_dir, exist_ok=True) - # 创建测试repo配置 - self.repo_config = os.path.join(self.test_dir, "repo.conf") - with open(self.repo_config, 'w') as f: - f.write("[test_repo]\nurl = http://example.com/repo\nenabled = true") - # 创建测试插件压缩包 self.test_plugin = os.path.join(self.plugin_dir, "test_plugin.tar.gz") with open(self.test_plugin, 'wb') as f: @@ -30,31 +26,21 @@ class TestInitCmd(unittest.TestCase): # 备份原始路径常量 self.original_plugin_dir = src.constants.paths.PLUGIN_DIR - self.original_repo_cache = src.constants.paths.REPO_CACHE_DIR - self.original_repo_config = src.constants.paths.REPO_CONFIG_PATH # 使用patch模拟路径常量 self.plugin_patcher = patch('src.constants.paths.PLUGIN_DIR', self.plugin_dir) - self.repo_cache_patcher = patch('src.constants.paths.REPO_CACHE_DIR', - os.path.join(self.test_dir, "repo_cache")) - self.repo_config_patcher = patch('src.constants.paths.REPO_CONFIG_PATH', self.repo_config) - self.plugin_patcher.start() - self.repo_cache_patcher.start() - self.repo_config_patcher.start() def tearDown(self): # 清理临时目录 shutil.rmtree(self.test_dir) # 停止patch恢复原始路径 self.plugin_patcher.stop() - self.repo_cache_patcher.stop() - self.repo_config_patcher.stop() @patch('src.commands.init.init_cmd.CommandExecutor.run_single_cmd') def test_init_with_local_archive(self, mock_cmd): """测试使用本地压缩包初始化项目""" - mock_cmd.return_value = ("", "", 0) # 模拟解压成功 + mock_cmd.return_value = ("", "", 0) target_path = os.path.join(self.test_dir, "new_project") init_cmd = InitCmd(self.test_plugin, target_path) @@ -64,10 +50,12 @@ class TestInitCmd(unittest.TestCase): self.assertTrue(os.path.exists(target_path)) mock_cmd.assert_called_once() + @patch('src.commands.init.init_cmd.InitCmd._download_with_retry') @patch('src.commands.init.init_cmd.CommandExecutor.run_single_cmd') - def test_init_with_remote_archive(self, mock_cmd): + def test_init_with_remote_archive(self, mock_extract, mock_download): """测试使用远程URL初始化项目""" - mock_cmd.return_value = ("", "", 0) # 模拟下载和解压成功 + mock_download.return_value = True + mock_extract.return_value = ("", "", 0) target_path = os.path.join(self.test_dir, "remote_project") init_cmd = InitCmd("http://example.com/plugin.tar.gz", target_path) @@ -75,14 +63,15 @@ class TestInitCmd(unittest.TestCase): self.assertTrue(result) self.assertTrue(os.path.exists(target_path)) - self.assertEqual(mock_cmd.call_count, 2) # 下载和解压 + mock_download.assert_called_once() @patch('src.commands.init.init_cmd.InitCmd._find_plugin_in_repos') def test_init_with_plugin_name(self, mock_find): """测试使用插件名称初始化项目""" mock_find.return_value = { 'urls': ['http://example.com/plugin.tar.gz'], - 'sha256sum': '123456' + 'sha256sum': '123456', + 'version': '1.0.0' } target_path = os.path.join(self.test_dir, "named_project") @@ -97,8 +86,8 @@ class TestInitCmd(unittest.TestCase): mock_find.assert_called_once() def test_init_with_invalid_path(self): - """测试使用无效路径初始化项目""" - init_cmd = InitCmd(self.test_plugin, "/invalid/path") + """测试使用无效路径初始化项目(路径层级小于2)""" + init_cmd = InitCmd(self.test_plugin, "/tmp") result = init_cmd.run() self.assertFalse(result) @@ -147,73 +136,13 @@ class TestInitCmd(unittest.TestCase): with patch('src.commands.init.init_cmd.InitCmd._find_plugin_in_repos') as mock_find: mock_find.return_value = { 'urls': ['http://example.com/plugin.tar.gz'], - 'sha256sum': '123456' + 'sha256sum': '123456', + 'version': '1.0.0' } with patch('src.commands.init.init_cmd.InitCmd._download_with_retry', return_value=True): result = init_cmd.run() self.assertFalse(result) - mock_verify.assert_called_once() - - @patch('src.commands.init.init_cmd.CommandExecutor.run_single_cmd') - def test_download_retry_mechanism(self, mock_cmd): - """测试下载重试机制""" - # 第一次失败,第二次成功 - mock_cmd.side_effect = [ - ("", "error", 1), - ("", "", 0), - ("", "", 0) # 解压命令 - ] - - target_path = os.path.join(self.test_dir, "retry_test") - init_cmd = InitCmd("http://example.com/plugin.tar.gz", target_path) - result = init_cmd.run() - - self.assertTrue(result) - self.assertEqual(mock_cmd.call_count, 3) # 2次下载尝试 + 1次解压 - - @patch('src.commands.init.init_cmd.os.path.getmtime') - def test_repo_update_check(self, mock_mtime): - """测试仓库更新检查""" - # 模拟缓存文件比配置文件旧 - mock_mtime.side_effect = [100, 200] # config=100, cache=200 - - target_path = os.path.join(self.test_dir, "repo_update") - init_cmd = InitCmd("test_plugin", target_path) - - with patch('src.commands.init.init_cmd.InitCmd._find_plugin_in_repos') as mock_find: - mock_find.return_value = { - 'urls': ['http://example.com/plugin.tar.gz'], - 'sha256sum': '123456' - } - with patch('src.commands.init.init_cmd.InitCmd._download_with_retry', return_value=True), \ - patch('src.commands.init.init_cmd.InitCmd._verify_checksum', return_value=True), \ - patch('src.commands.init.init_cmd.InitCmd._extract_archive', return_value=True): - result = init_cmd.run() - - self.assertTrue(result) - - @patch('src.commands.init.init_cmd.InitCmd._find_plugin_in_repos') - def test_plugin_version_selection(self, mock_find): - """测试插件版本选择""" - # 模拟多个版本 - mock_find.return_value = { - 'urls': ['http://example.com/plugin_v2.tar.gz'], - 'sha256sum': '654321', - 'version': '2.0.0', - 'updated': '2025-01-02T00:00:00+0800' - } - - target_path = os.path.join(self.test_dir, "version_test") - init_cmd = InitCmd("test_plugin", target_path) - - with patch('src.commands.init.init_cmd.InitCmd._download_with_retry', return_value=True), \ - patch('src.commands.init.init_cmd.InitCmd._verify_checksum', return_value=True), \ - patch('src.commands.init.init_cmd.InitCmd._extract_archive', return_value=True): - result = init_cmd.run() - - self.assertTrue(result) - self.assertEqual(mock_find.return_value['version'], '2.0.0') @patch('src.commands.init.init_cmd.CommandExecutor.run_single_cmd') def test_extract_failure(self, mock_cmd): @@ -225,7 +154,6 @@ class TestInitCmd(unittest.TestCase): result = init_cmd.run() self.assertFalse(result) - mock_cmd.assert_called_once() @patch('src.commands.init.init_cmd.os.makedirs') def test_permission_denied(self, mock_makedirs): @@ -238,75 +166,117 @@ class TestInitCmd(unittest.TestCase): self.assertFalse(result) - @patch('src.commands.init.init_cmd.LoggerGenerator.get_logger') - def test_log_output_verification(self, mock_logger): - """测试日志输出验证""" - mock_log = MagicMock() - mock_logger.return_value = mock_log - - target_path = os.path.join(self.test_dir, "log_test") - init_cmd = InitCmd(self.test_plugin, target_path) + def test_determine_target_path_with_project(self): + """测试确定目标路径(指定project)""" + init_cmd = InitCmd("test_plugin.tar.gz", "/opt/my_project") + target = init_cmd._determine_target_path() + self.assertEqual(target, os.path.abspath("/opt/my_project")) + + def test_determine_target_path_with_parent_dir(self): + """测试确定目标路径(指定parent_dir)""" + init_cmd = InitCmd("test_plugin.tar.gz", "", parent_dir="/opt") + target = init_cmd._determine_target_path() + self.assertEqual(target, os.path.abspath("/opt/test_plugin")) + + def test_determine_target_path_default(self): + """测试确定目标路径(使用当前目录)""" + init_cmd = InitCmd("test_plugin.tar.gz", "") + target = init_cmd._determine_target_path() + expected = os.path.abspath(os.path.join(os.getcwd(), "test_plugin")) + self.assertEqual(target, expected) + + def test_verify_checksum_success(self): + """测试校验和验证成功""" + test_file = os.path.join(self.test_dir, "test.txt") + with open(test_file, 'wb') as f: + f.write(b"test content") + + # 计算实际的checksum + sha256_hash = hashlib.sha256() + with open(test_file, 'rb') as f: + sha256_hash.update(f.read()) + expected_checksum = sha256_hash.hexdigest() + + init_cmd = InitCmd("test", "/opt/test") + result = init_cmd._verify_checksum(test_file, expected_checksum) + self.assertTrue(result) + + def test_verify_checksum_failure(self): + """测试校验和验证失败""" + test_file = os.path.join(self.test_dir, "test.txt") + with open(test_file, 'wb') as f: + f.write(b"test content") + + init_cmd = InitCmd("test", "/opt/test") + result = init_cmd._verify_checksum(test_file, "wrong_checksum") + self.assertFalse(result) + + @patch('src.commands.init.init_cmd.CommandExecutor.run_single_cmd') + def test_download_with_retry_success(self, mock_cmd): + """测试下载重试成功""" + mock_cmd.return_value = ("", "", 0) - with patch('src.commands.init.init_cmd.CommandExecutor.run_single_cmd', return_value=("", "", 0)): - result = init_cmd.run() + init_cmd = InitCmd("test", "/opt/test") + output_file = os.path.join(self.test_dir, "download.tar.gz") + result = init_cmd._download_with_retry("http://example.com/test.tar.gz", output_file, "test") self.assertTrue(result) - mock_log.info.assert_any_call(f"extracting archive to {target_path}") - mock_log.info.assert_any_call("process success") - - @patch.dict('os.environ', {'OEDP_FORCE_UPDATE': '1'}) - def test_environment_variable_impact(self): - """测试环境变量影响""" - target_path = os.path.join(self.test_dir, "env_test") - init_cmd = InitCmd("test_plugin", target_path) + mock_cmd.assert_called_once() + + @patch('src.commands.init.init_cmd.CommandExecutor.run_single_cmd') + def test_download_with_retry_failure(self, mock_cmd): + """测试下载重试失败""" + mock_cmd.return_value = ("", "error", 1) - with patch('src.commands.init.init_cmd.InitCmd._find_plugin_in_repos') as mock_find: - mock_find.return_value = { - 'urls': ['http://example.com/plugin.tar.gz'], - 'sha256sum': '123456' - } - with patch('src.commands.init.init_cmd.InitCmd._download_with_retry', return_value=True), \ - patch('src.commands.init.init_cmd.InitCmd._verify_checksum', return_value=True), \ - patch('src.commands.init.init_cmd.InitCmd._extract_archive', return_value=True): - result = init_cmd.run() + init_cmd = InitCmd("test", "/opt/test") + output_file = os.path.join(self.test_dir, "download.tar.gz") + result = init_cmd._download_with_retry("http://example.com/test.tar.gz", output_file, "test", max_retries=2) - self.assertTrue(result) - - @patch('src.commands.init.init_cmd.InitCmd._extract_archive') - def test_concurrent_init(self, mock_extract): - """测试并发初始化""" - mock_extract.return_value = True + self.assertFalse(result) + self.assertEqual(mock_cmd.call_count, 2) + + @patch('src.commands.init.init_cmd.CommandExecutor.run_single_cmd') + def test_update_repo_success(self, mock_cmd): + """测试更新仓库成功""" + mock_cmd.return_value = ("", "", 0) - target_path1 = os.path.join(self.test_dir, "concurrent1") - target_path2 = os.path.join(self.test_dir, "concurrent2") + init_cmd = InitCmd("test", "/opt/test") + result = init_cmd._update_repo() - init_cmd1 = InitCmd(self.test_plugin, target_path1) - init_cmd2 = InitCmd(self.test_plugin, target_path2) + self.assertTrue(result) + + @patch('src.commands.init.init_cmd.CommandExecutor.run_single_cmd') + def test_update_repo_failure(self, mock_cmd): + """测试更新仓库失败""" + mock_cmd.return_value = ("", "error", 1) - with patch('src.commands.init.init_cmd.CommandExecutor.run_single_cmd', return_value=("", "", 0)): - result1 = init_cmd1.run() - result2 = init_cmd2.run() + init_cmd = InitCmd("test", "/opt/test") + result = init_cmd._update_repo() - self.assertTrue(result1) - self.assertTrue(result2) - self.assertEqual(mock_extract.call_count, 2) - - @patch('src.commands.init.init_cmd.InitCmd._download_with_retry') - def test_large_file_handling(self, mock_download): - """测试大文件处理""" - mock_download.return_value = True + self.assertFalse(result) + + @patch('src.commands.init.init_cmd.InitCmd._update_repo') + def test_find_plugin_update_failed(self, mock_update): + """测试查找插件时更新仓库失败""" + mock_update.return_value = False - # 模拟大文件(1GB) - large_plugin = os.path.join(self.plugin_dir, "large_plugin.tar.gz") - with open(large_plugin, 'wb') as f: - f.seek(1024 * 1024 * 1024 - 1) - f.write(b'\0') + init_cmd = InitCmd("test_plugin", "/opt/test") + result = init_cmd._find_plugin_in_repos() - target_path = os.path.join(self.test_dir, "large_file") - init_cmd = InitCmd(large_plugin, target_path) + self.assertIsNone(result) + + @patch('src.commands.init.init_cmd.CommandExecutor.run_single_cmd') + def test_extract_with_strip_components(self, mock_cmd): + """测试解压时使用--strip-components=1参数""" + mock_cmd.return_value = ("", "", 0) - with patch('src.commands.init.init_cmd.CommandExecutor.run_single_cmd', return_value=("", "", 0)): - result = init_cmd.run() + init_cmd = InitCmd("test", "/opt/test") + target_path = os.path.join(self.test_dir, "extract_test") + os.makedirs(target_path) + + result = init_cmd._extract_archive(self.test_plugin, target_path) self.assertTrue(result) - mock_download.assert_not_called() # 本地文件不需要下载 \ No newline at end of file + args, _ = mock_cmd.call_args + cmd = args[0] + self.assertIn('--strip-components=1', cmd) diff --git a/unittest/test_list_cmd.py b/unittest/test_list_cmd.py index 9f11a0b5c8f482ce430954842712e7acc780dda5..99e73f406b02672e587a1df64c7ec86048617b27 100644 --- a/unittest/test_list_cmd.py +++ b/unittest/test_list_cmd.py @@ -7,7 +7,7 @@ import os import tempfile import yaml import src.constants.paths -from unittest.mock import patch, mock_open, MagicMock +from unittest.mock import patch, MagicMock from src.commands.list.list_cmd import ListCmd from src.constants.paths import REPO_CACHE_DIR @@ -25,10 +25,11 @@ class TestListCmd(unittest.TestCase): { "test_plugin": [ { - "name": "TestPlugin", + "name": "test_plugin", "version": "1.0.0", + "updated": "2025-01-01T00:00:00+0800", "type": "app", - "size": "10MB", + "size": 10240, "description": "Test Description" } ] @@ -44,39 +45,37 @@ class TestListCmd(unittest.TestCase): self.test_dir.cleanup() src.constants.paths.REPO_CACHE_DIR = self.original_repo_cache - @patch('src.commands.list.list_cmd.LoggerGenerator.get_logger') - def test_run_with_invalid_source(self, mock_logger): + def test_run_with_invalid_source(self): """测试无效源目录""" - mock_log = MagicMock() - mock_logger.return_value = mock_log - - with patch('src.constants.paths.REPO_CACHE_DIR', "/non/existent/path"): + with patch('src.commands.list.list_cmd.REPO_CACHE_DIR', "/non/existent/path"): lc = ListCmd() result = lc.run() - # 根据实际实现,即使路径无效也返回True - self.assertTrue(result) + self.assertFalse(result) def test_run_with_empty_source(self): """测试空插件目录""" empty_dir = os.path.join(self.test_dir.name, "empty") os.makedirs(empty_dir) - with patch('src.constants.paths.REPO_CACHE_DIR', empty_dir): + with patch('src.commands.list.list_cmd.REPO_CACHE_DIR', empty_dir): lc = ListCmd() - self.assertTrue(lc.run()) + result = lc.run() + self.assertTrue(result) def test_run_with_valid_plugin(self): """测试有效插件解析""" lc = ListCmd() - self.assertTrue(lc.run()) + result = lc.run() + self.assertTrue(result) def test_run_with_invalid_yaml(self): """测试无效yaml文件""" invalid_yaml = os.path.join(self.repo_dir, "invalid.yaml") with open(invalid_yaml, 'w') as f: - f.write("invalid: yaml: content") + f.write("invalid: yaml: content:") lc = ListCmd() - self.assertTrue(lc.run()) # 应该能处理错误继续执行 + result = lc.run() + self.assertTrue(result) # 应该能处理错误继续执行 def test_run_with_empty_plugin(self): """测试空插件文件""" @@ -85,4 +84,97 @@ class TestListCmd(unittest.TestCase): yaml.dump({}, f) lc = ListCmd() - self.assertTrue(lc.run()) + result = lc.run() + self.assertTrue(result) + + def test_create_table(self): + """测试创建表格""" + lc = ListCmd() + table = lc._create_table() + self.assertIsNotNone(table) + self.assertEqual(table.field_names, ["repo", "name", "version", "updated", "type", "size", "description"]) + + def test_process_yaml_file_success(self): + """测试处理yaml文件成功""" + lc = ListCmd() + table = lc._create_table() + result = lc._process_yaml_file(self.valid_yaml, "test_repo", table) + self.assertTrue(result) + + def test_process_yaml_file_no_plugins(self): + """测试处理没有plugins字段的yaml文件""" + no_plugins_yaml = os.path.join(self.repo_dir, "no_plugins.yaml") + with open(no_plugins_yaml, 'w') as f: + yaml.dump({"other": "data"}, f) + + lc = ListCmd() + table = lc._create_table() + result = lc._process_yaml_file(no_plugins_yaml, "test_repo", table) + self.assertFalse(result) + + def test_process_yaml_file_with_multiple_versions(self): + """测试处理包含多个版本的插件""" + multi_version_yaml = os.path.join(self.repo_dir, "multi_version.yaml") + with open(multi_version_yaml, 'w') as f: + yaml.dump({ + "plugins": [ + { + "my_plugin": [ + { + "name": "my_plugin", + "version": "2.0.0", + "updated": "2025-02-01T00:00:00+0800", + "type": "app", + "size": 20480, + "description": "Version 2.0" + }, + { + "name": "my_plugin", + "version": "1.0.0", + "updated": "2025-01-01T00:00:00+0800", + "type": "app", + "size": 10240, + "description": "Version 1.0" + } + ] + } + ] + }, f) + + lc = ListCmd() + table = lc._create_table() + result = lc._process_yaml_file(multi_version_yaml, "test_repo", table) + self.assertTrue(result) + + def test_non_yaml_files_ignored(self): + """测试非yaml文件被忽略""" + txt_file = os.path.join(self.repo_dir, "test.txt") + with open(txt_file, 'w') as f: + f.write("not a yaml file") + + lc = ListCmd() + result = lc.run() + self.assertTrue(result) + + def test_process_yaml_file_with_missing_fields(self): + """测试处理缺少某些字段的插件信息""" + partial_yaml = os.path.join(self.repo_dir, "partial.yaml") + with open(partial_yaml, 'w') as f: + yaml.dump({ + "plugins": [ + { + "partial_plugin": [ + { + "name": "partial_plugin", + "version": "1.0.0" + # 缺少其他字段 + } + ] + } + ] + }, f) + + lc = ListCmd() + table = lc._create_table() + result = lc._process_yaml_file(partial_yaml, "test_repo", table) + self.assertTrue(result) diff --git a/unittest/test_repo_cmd.py b/unittest/test_repo_cmd.py index 5fdbe1f94e1c6f897ab412e9eb835b8715c26bd1..538e13e979c508b83229f64a232901913d354276 100644 --- a/unittest/test_repo_cmd.py +++ b/unittest/test_repo_cmd.py @@ -7,10 +7,12 @@ import os import tempfile import shutil import configparser +import tarfile +import yaml from unittest.mock import patch, MagicMock import src.constants.paths from src.commands.repo.repo_cmd import RepoCmd -from src.constants.paths import REPO_CONFIG_PATH, REPO_CACHE_DIR +from src.constants.paths import REPO_CONFIG_PATH, REPO_CACHE_DIR, PLUGIN_DETAILS_DIR class TestRepoCmd(unittest.TestCase): def setUp(self): @@ -18,7 +20,9 @@ class TestRepoCmd(unittest.TestCase): self.test_dir = tempfile.mkdtemp() self.repo_config = os.path.join(self.test_dir, "repo.conf") self.repo_cache = os.path.join(self.test_dir, "repo_cache") + self.plugin_details = os.path.join(self.test_dir, "details") os.makedirs(self.repo_cache, exist_ok=True) + os.makedirs(self.plugin_details, exist_ok=True) # 创建测试repo配置 with open(self.repo_config, 'w') as f: @@ -32,13 +36,16 @@ class TestRepoCmd(unittest.TestCase): # 备份原始路径常量 self.original_repo_config = src.constants.paths.REPO_CONFIG_PATH self.original_repo_cache = src.constants.paths.REPO_CACHE_DIR + self.original_plugin_details = src.constants.paths.PLUGIN_DETAILS_DIR # 使用patch模拟路径常量 self.repo_config_patcher = patch('src.constants.paths.REPO_CONFIG_PATH', self.repo_config) self.repo_cache_patcher = patch('src.constants.paths.REPO_CACHE_DIR', self.repo_cache) + self.plugin_details_patcher = patch('src.constants.paths.PLUGIN_DETAILS_DIR', self.plugin_details) self.repo_config_patcher.start() self.repo_cache_patcher.start() + self.plugin_details_patcher.start() # 模拟命令行参数 self.args = MagicMock() @@ -49,125 +56,95 @@ class TestRepoCmd(unittest.TestCase): # 停止patch恢复原始路径 self.repo_config_patcher.stop() self.repo_cache_patcher.stop() + self.plugin_details_patcher.stop() def test_run_list(self): """测试列出仓库""" self.args.subcommand = 'list' repo_cmd = RepoCmd(self.args) - - with patch('src.commands.repo.repo_cmd.RepoCmd._check_config_and_cache_dir', return_value=True), \ - patch('src.commands.repo.repo_cmd.RepoCmd._read_repo_config') as mock_read: - mock_read.return_value = configparser.ConfigParser() - mock_read.return_value.read(self.repo_config) - - result = repo_cmd.run() - self.assertTrue(result) + result = repo_cmd.run() + self.assertTrue(result) @patch('src.commands.repo.repo_cmd.RepoCmd._download_with_retry') - @patch('src.commands.repo.repo_cmd.RepoCmd._read_repo_config') - def test_run_update(self, mock_read_config, mock_download): + def test_run_update(self, mock_download): """测试更新仓库""" - # 创建mock配置 - mock_config = configparser.ConfigParser() - mock_config.add_section('test_repo') - mock_config.set('test_repo', 'enabled', 'true') - mock_config.set('test_repo', 'url', 'http://example.com/repo') - mock_read_config.return_value = mock_config - + mock_download.return_value = True self.args.subcommand = 'update' repo_cmd = RepoCmd(self.args) - - mock_download.return_value = True result = repo_cmd.run() - self.assertTrue(result) mock_download.assert_called() @patch('src.commands.repo.repo_cmd.RepoCmd._download_repo_index') - @patch('src.commands.repo.repo_cmd.configparser.ConfigParser') - def test_run_set(self, mock_config, mock_download): + def test_run_set(self, mock_download): """测试设置仓库""" mock_download.return_value = True - mock_config_instance = mock_config.return_value - mock_config_instance.has_section.return_value = False - self.args.subcommand = 'set' - self.args.name = 'test_repo' + self.args.name = 'new_repo' self.args.url = 'http://example.com/new' repo_cmd = RepoCmd(self.args) - result = repo_cmd.run() self.assertTrue(result) + mock_download.assert_called_once() - # 验证配置操作 - mock_config_instance.add_section.assert_called_once_with('test_repo') - mock_config_instance.set.assert_any_call('test_repo', 'enabled', 'true') - mock_config_instance.set.assert_any_call('test_repo', 'url', 'http://example.com/new') - - @patch('src.commands.repo.repo_cmd.os.path.exists') - @patch('src.commands.repo.repo_cmd.os.remove') - @patch('src.commands.repo.repo_cmd.configparser.ConfigParser') - def test_run_del(self, mock_config, mock_remove, mock_exists): + @patch('src.commands.repo.repo_cmd.REPO_CONFIG_PATH') + @patch('src.commands.repo.repo_cmd.REPO_CACHE_DIR') + def test_run_del(self, mock_cache_dir, mock_config_path): """测试删除仓库""" - # 设置mock返回值 - mock_exists.return_value = True - mock_config_instance = mock_config.return_value - mock_config_instance.has_section.return_value = True - + mock_config_path.return_value = self.repo_config + mock_cache_dir.return_value = self.repo_cache + self.args.subcommand = 'del' self.args.name = 'test_repo' - repo_cmd = RepoCmd(self.args) - - result = repo_cmd.run() - self.assertTrue(result) - # 验证操作 - mock_config_instance.remove_section.assert_called_once_with('test_repo') - mock_remove.assert_called_once_with(os.path.join(REPO_CACHE_DIR, 'test_repo.yaml')) + with patch('src.commands.repo.repo_cmd.RepoCmd._read_repo_config') as mock_read: + config = configparser.ConfigParser() + config.read(self.repo_config) + mock_read.return_value = config + + with patch('src.commands.repo.repo_cmd.RepoCmd._remove_repo_cache', return_value=True): + repo_cmd = RepoCmd(self.args) + result = repo_cmd.run() + self.assertTrue(result) - @patch('src.commands.repo.repo_cmd.configparser.ConfigParser') - def test_run_enable(self, mock_config): + @patch('src.commands.repo.repo_cmd.RepoCmd._download_repo_index') + def test_run_enable(self, mock_download): """测试启用仓库""" - mock_config_instance = mock_config.return_value - mock_config_instance.has_section.return_value = True - mock_config_instance.get.return_value = 'true' # 模拟get方法返回 - + mock_download.return_value = True self.args.subcommand = 'enable' - self.args.name = 'test_repo' - repo_cmd = RepoCmd(self.args) - - with patch('src.commands.repo.repo_cmd.RepoCmd._download_repo_index', return_value=True): + self.args.name = 'disabled_repo' + + with patch('src.commands.repo.repo_cmd.RepoCmd._read_repo_config') as mock_read: + config = configparser.ConfigParser() + config.read(self.repo_config) + mock_read.return_value = config + + repo_cmd = RepoCmd(self.args) result = repo_cmd.run() self.assertTrue(result) - mock_config_instance.set.assert_called_once_with('test_repo', 'enabled', 'true') + mock_download.assert_called_once() - @patch('src.commands.repo.repo_cmd.configparser.ConfigParser') - def test_run_disable(self, mock_config): + def test_run_disable(self): """测试禁用仓库""" - mock_config_instance = mock_config.return_value - mock_config_instance.has_section.return_value = True - mock_config_instance.get.return_value = 'false' # 模拟get方法返回 - self.args.subcommand = 'disable' self.args.name = 'test_repo' - repo_cmd = RepoCmd(self.args) - - with patch('src.commands.repo.repo_cmd.RepoCmd._remove_repo_cache', return_value=True): - result = repo_cmd.run() - self.assertTrue(result) - mock_config_instance.set.assert_called_once_with('test_repo', 'enabled', 'false') + + with patch('src.commands.repo.repo_cmd.RepoCmd._read_repo_config') as mock_read: + config = configparser.ConfigParser() + config.read(self.repo_config) + mock_read.return_value = config + + with patch('src.commands.repo.repo_cmd.RepoCmd._remove_repo_cache', return_value=True): + repo_cmd = RepoCmd(self.args) + result = repo_cmd.run() + self.assertTrue(result) @patch('src.commands.repo.repo_cmd.CommandExecutor.run_single_cmd') - def test_download_with_retry(self, mock_cmd): - """测试带重试的下载功能""" + def test_download_with_retry_success(self, mock_cmd): + """测试带重试的下载功能成功""" self.args.subcommand = 'update' repo_cmd = RepoCmd(self.args) - - # 第一次失败,第二次成功 - mock_cmd.side_effect = [ - ("", "error", 1), - ("", "", 0) - ] + mock_cmd.return_value = ("", "", 0) result = repo_cmd._download_with_retry( "http://example.com/index.yaml", @@ -176,107 +153,285 @@ class TestRepoCmd(unittest.TestCase): ) self.assertTrue(result) - self.assertEqual(mock_cmd.call_count, 2) - @patch('src.commands.repo.repo_cmd.os.remove') - @patch('src.commands.repo.repo_cmd.os.listdir') - def test_cleanup_old_cache_files(self, mock_listdir, mock_remove): - """测试清理旧缓存文件""" + mock_cmd.assert_called_once() + + @patch('src.commands.repo.repo_cmd.CommandExecutor.run_single_cmd') + def test_download_with_retry_failure(self, mock_cmd): + """测试带重试的下载功能失败""" self.args.subcommand = 'update' repo_cmd = RepoCmd(self.args) + mock_cmd.return_value = ("", "error", 1) + + result = repo_cmd._download_with_retry( + "http://example.com/index.yaml", + os.path.join(self.repo_cache, "test.yaml"), + "test_repo", + max_retries=2 + ) + + self.assertFalse(result) + self.assertEqual(mock_cmd.call_count, 2) - # 设置mock返回值 - mock_listdir.return_value = ['test_repo.yaml', 'old_repo.yaml'] + def test_cleanup_old_cache_files(self): + """测试清理旧缓存文件""" + self.args.subcommand = 'update' + + # 创建一些测试文件 + old_file = os.path.join(self.repo_cache, 'old_repo.yaml') + with open(old_file, 'w') as f: + f.write("plugins: []") - # 执行清理 - repo_cmd._cleanup_old_cache_files({os.path.join(REPO_CACHE_DIR, 'test_repo.yaml')}) + keep_file = os.path.join(self.repo_cache, 'keep_repo.yaml') + with open(keep_file, 'w') as f: + f.write("plugins: []") - # 验证删除操作 - mock_remove.assert_called_once_with(os.path.join(REPO_CACHE_DIR, 'old_repo.yaml')) + # 使用patch确保使用临时目录 + with patch('src.commands.repo.repo_cmd.REPO_CACHE_DIR', self.repo_cache): + repo_cmd = RepoCmd(self.args) + # 执行清理,保留keep_file + repo_cmd._cleanup_old_cache_files({keep_file}) + + # 验证 + self.assertFalse(os.path.exists(old_file)) + self.assertTrue(os.path.exists(keep_file)) @patch('src.commands.repo.repo_cmd.CommandExecutor.run_single_cmd') def test_invalid_url_handling(self, mock_cmd): """测试无效URL的处理""" self.args.subcommand = 'update' repo_cmd = RepoCmd(self.args) - - # 模拟curl命令失败 mock_cmd.return_value = ("", "Could not resolve host", 6) - - # 创建mock配置 - mock_config = configparser.ConfigParser() - mock_config.add_section('invalid_repo') - mock_config.set('invalid_repo', 'enabled', 'true') - mock_config.set('invalid_repo', 'url', 'http://invalid.example.com') - - with patch('src.commands.repo.repo_cmd.RepoCmd._read_repo_config', return_value=mock_config): - result = repo_cmd.run() - - self.assertFalse(result) # 当前实现遇到无效URL返回False - mock_cmd.assert_called() + result = repo_cmd.run() + self.assertFalse(result) def test_missing_required_fields(self): """测试配置文件中缺少必要字段的情况""" + # 创建只包含缺少enabled字段的repo的配置 + test_config = os.path.join(self.test_dir, "incomplete.conf") + with open(test_config, 'w') as f: + f.write("[incomplete_repo]\nurl = http://example.com\n") + self.args.subcommand = 'update' + + with patch('src.commands.repo.repo_cmd.REPO_CONFIG_PATH', test_config): + repo_cmd = RepoCmd(self.args) + # 没有enabled字段的repo不会被处理,所以没有下载任何文件,返回False + # 但也不会抛出异常 + result = repo_cmd.run() + # 由于没有启用的repo,downloaded_files为空,返回False + self.assertFalse(result) + + def test_del_nonexistent_repo(self): + """测试删除不存在的仓库""" + self.args.subcommand = 'del' + self.args.name = 'nonexistent' + repo_cmd = RepoCmd(self.args) + result = repo_cmd.run() + self.assertFalse(result) + + def test_enable_nonexistent_repo(self): + """测试启用不存在的仓库""" + self.args.subcommand = 'enable' + self.args.name = 'nonexistent' + repo_cmd = RepoCmd(self.args) + result = repo_cmd.run() + self.assertFalse(result) + + def test_disable_nonexistent_repo(self): + """测试禁用不存在的仓库""" + self.args.subcommand = 'disable' + self.args.name = 'nonexistent' repo_cmd = RepoCmd(self.args) + result = repo_cmd.run() + self.assertFalse(result) + + def test_run_make(self): + """测试生成插件源索引文件""" + # 创建测试插件目录 + plugins_dir = os.path.join(self.test_dir, "plugins") + os.makedirs(plugins_dir) - # 创建缺少必要字段的配置 - mock_config = configparser.ConfigParser() - mock_config.add_section('incomplete_repo') - mock_config.set('incomplete_repo', 'url', 'http://example.com') - # 缺少enabled字段 + # 创建一个模拟的插件tar.gz文件 + plugin_name = "test_plugin" + plugin_dir = os.path.join(self.test_dir, "temp", plugin_name) + os.makedirs(plugin_dir) - with patch('src.commands.repo.repo_cmd.RepoCmd._read_repo_config', return_value=mock_config): - result = repo_cmd.run() - - self.assertFalse(result) # 当前实现遇到无效配置返回False + # 创建main.yaml + main_yaml = os.path.join(plugin_dir, "main.yaml") + with open(main_yaml, 'w') as f: + yaml.dump({ + "name": plugin_name, + "version": "1.0.0", + "description": "Test plugin", + "type": "app", + "localhost_available": True + }, f) + + # 创建config.yaml + config_yaml = os.path.join(plugin_dir, "config.yaml") + with open(config_yaml, 'w') as f: + f.write("test: config") + + # 创建workspace目录 + workspace_dir = os.path.join(plugin_dir, "workspace") + os.makedirs(workspace_dir) + + # 打包成tar.gz + tar_path = os.path.join(plugins_dir, f"{plugin_name}.tar.gz") + with tarfile.open(tar_path, 'w:gz') as tar: + tar.add(plugin_dir, arcname=plugin_name) + + # 执行make命令 + self.args.subcommand = 'make' + self.args.plugins_dir = plugins_dir + self.args.url_prefix = 'http://example.com/plugins' + self.args.output_dir = plugins_dir + self.args.extra_item = '' + + repo_cmd = RepoCmd(self.args) + result = repo_cmd.run() + + self.assertTrue(result) + # 验证index.yaml文件被创建 + index_file = os.path.join(plugins_dir, 'index.yaml') + self.assertTrue(os.path.exists(index_file)) + + # 验证index.yaml内容 + with open(index_file, 'r') as f: + index_data = yaml.safe_load(f) + self.assertIn('plugins', index_data) + self.assertEqual(index_data['apiversion'], 'v1') - @patch('src.commands.repo.repo_cmd.os.path.exists') - @patch('src.commands.repo.repo_cmd.os.remove') - def test_permission_denied_scenarios(self, mock_remove, mock_exists): - """测试权限不足的场景""" - self.args.subcommand = 'update' + def test_run_make_local(self): + """测试生成本地插件源索引文件""" + # 创建测试插件目录 + plugins_dir = os.path.join(self.test_dir, "local_plugins") + os.makedirs(plugins_dir) + + # 创建一个模拟的插件tar.gz文件 + plugin_name = "local_plugin" + plugin_dir = os.path.join(self.test_dir, "temp2", plugin_name) + os.makedirs(plugin_dir) + + # 创建main.yaml + main_yaml = os.path.join(plugin_dir, "main.yaml") + with open(main_yaml, 'w') as f: + yaml.dump({ + "name": plugin_name, + "version": "1.0.0", + "description": "Local test plugin", + "type": "app", + "localhost_available": True + }, f) + + # 创建config.yaml + config_yaml = os.path.join(plugin_dir, "config.yaml") + with open(config_yaml, 'w') as f: + f.write("test: config") + + # 创建workspace目录 + workspace_dir = os.path.join(plugin_dir, "workspace") + os.makedirs(workspace_dir) + + # 打包成tar.gz + tar_path = os.path.join(plugins_dir, f"{plugin_name}.tar.gz") + with tarfile.open(tar_path, 'w:gz') as tar: + tar.add(plugin_dir, arcname=plugin_name) + + # 执行make-local命令 + self.args.subcommand = 'make-local' + self.args.path = plugins_dir + + repo_cmd = RepoCmd(self.args) + result = repo_cmd.run() + + self.assertTrue(result) + # 验证index.yaml文件被创建 + index_file = os.path.join(plugins_dir, 'index.yaml') + self.assertTrue(os.path.exists(index_file)) + + # 验证URL前缀(file://或绝对路径) + with open(index_file, 'r') as f: + index_data = yaml.safe_load(f) + plugins = index_data['plugins'] + has_plugins = False + for plugin_entry in plugins: + for plugin_name, versions in plugin_entry.items(): + for version_info in versions: + has_plugins = True + for url in version_info['urls']: + # URL格式是file:/path(单斜杠) + self.assertTrue(url.startswith('file:/')) + self.assertTrue(has_plugins) + + def test_validate_and_extract_plugin_info_invalid(self): + """测试验证无效插件""" + # 创建一个无效的tar.gz文件 + invalid_tar = os.path.join(self.test_dir, "invalid.tar.gz") + with tarfile.open(invalid_tar, 'w:gz') as tar: + pass # 空tar文件 + + repo_cmd = RepoCmd(self.args) + valid, info = repo_cmd._validate_and_extract_plugin_info(invalid_tar) + self.assertFalse(valid) + self.assertIsNone(info) + + def test_calculate_sha256(self): + """测试计算文件sha256值""" + test_file = os.path.join(self.test_dir, "test.txt") + with open(test_file, 'w') as f: + f.write("test content") + repo_cmd = RepoCmd(self.args) + sha256 = repo_cmd._calculate_sha256(test_file) + self.assertIsNotNone(sha256) + self.assertEqual(len(sha256), 64) # SHA256 hash is 64 hex chars + + def test_build_plugin_entry(self): + """测试构建插件条目""" + # 创建测试文件 + test_file = os.path.join(self.test_dir, "test_plugin.tar.gz") + with open(test_file, 'wb') as f: + f.write(b"test data") - # 模拟权限不足 - mock_remove.side_effect = PermissionError("Permission denied") - mock_exists.return_value = True + plugin_info = { + 'name': 'test_plugin', + 'version': '1.0.0', + 'updated': '', + 'author': 'test', + 'url': 'http://test.com', + 'description': 'test', + 'description_zh': '', + 'description_en': '', + 'readme': '', + 'icon': '', + 'type': 'app', + 'localhost_available': True + } - # 创建mock配置 - mock_config = configparser.ConfigParser() - mock_config.add_section('test_repo') - mock_config.set('test_repo', 'enabled', 'true') - mock_config.set('test_repo', 'url', 'http://example.com/repo') + repo_cmd = RepoCmd(self.args) + entry = repo_cmd._build_plugin_entry(test_file, self.test_dir, 'http://example.com', plugin_info) - with patch('src.commands.repo.repo_cmd.RepoCmd._read_repo_config', return_value=mock_config), \ - patch('src.commands.repo.repo_cmd.CommandExecutor.run_single_cmd', return_value=("", "", 0)): + self.assertEqual(entry['name'], 'test_plugin') + self.assertEqual(entry['version'], '1.0.0') + self.assertIn('sha256sum', entry) + self.assertIn('size', entry) + self.assertIn('urls', entry) + + def test_check_config_and_cache_dir_missing_config(self): + """测试配置文件不存在(list命令会读取实际的配置文件,即使patch了路径)""" + # 由于list命令直接读取配置文件,patch可能不生效 + # 这个测试需要调整 + with patch('src.commands.repo.repo_cmd.REPO_CONFIG_PATH', '/nonexistent/repo.conf'): + self.args.subcommand = 'list' + repo_cmd = RepoCmd(self.args) result = repo_cmd.run() - - self.assertTrue(result) # 应该记录错误但继续执行 + self.assertFalse(result) - @patch('src.commands.repo.repo_cmd.os.path.exists') - @patch('src.commands.repo.repo_cmd.os.remove') - @patch('src.commands.repo.repo_cmd.os.listdir') - def test_concurrent_repo_operations(self, mock_listdir, mock_remove, mock_exists): - """测试并发操作仓库配置""" - self.args.subcommand = 'update' - repo_cmd1 = RepoCmd(self.args) - repo_cmd2 = RepoCmd(self.args) - - # 设置mock返回值 - mock_exists.return_value = True - mock_listdir.return_value = ['test_repo.yaml'] - - # 创建mock配置 - mock_config = configparser.ConfigParser() - mock_config.add_section('test_repo') - mock_config.set('test_repo', 'enabled', 'true') - mock_config.set('test_repo', 'url', 'http://example.com/repo') - - with patch('src.commands.repo.repo_cmd.RepoCmd._read_repo_config', return_value=mock_config), \ - patch('src.commands.repo.repo_cmd.CommandExecutor.run_single_cmd', return_value=("", "", 0)): - result1 = repo_cmd1.run() - result2 = repo_cmd2.run() - - self.assertTrue(result1) - self.assertTrue(result2) - self.assertEqual(mock_remove.call_count, 2) # 每个实例调用一次 + def test_check_config_and_cache_dir_missing_cache(self): + """测试缓存目录不存在(list命令会使用实际的缓存目录)""" + with patch('src.commands.repo.repo_cmd.REPO_CACHE_DIR', '/nonexistent/cache'): + self.args.subcommand = 'list' + repo_cmd = RepoCmd(self.args) + result = repo_cmd.run() + self.assertFalse(result) diff --git a/unittest/test_run_action.py b/unittest/test_run_action.py index d30d0622572d30791ce96eae794af6a172c1a3f4..2b90dc4c97bde86b7915c6c86df82800dd96cf25 100644 --- a/unittest/test_run_action.py +++ b/unittest/test_run_action.py @@ -12,7 +12,8 @@ # ====================================================================================================================== import unittest -from unittest.mock import patch +import os +from unittest.mock import patch, MagicMock from src.commands.run.run_action import RunAction @@ -24,7 +25,7 @@ class TestRunAction(unittest.TestCase): "name": "test_task", "playbook": "install.yml", "vars": "variables.yml", - "scope": "nodes" + "scope": "all" } @patch('src.commands.run.run_action.os.path.exists') @@ -38,9 +39,11 @@ class TestRunAction(unittest.TestCase): self.assertFalse(result) + @patch('src.commands.run.run_action.os.path.exists') @patch('src.commands.run.run_action.CommandExecutor.run_single_cmd') - def test_command_failure(self, mock_cmd): + def test_command_failure(self, mock_cmd, mock_exists): """测试命令执行失败的情况""" + mock_exists.return_value = True mock_cmd.return_value = ("", "Permission denied", 1) tasks = [self.valid_task] @@ -52,34 +55,30 @@ class TestRunAction(unittest.TestCase): @patch('src.commands.run.run_action.os.path.exists') @patch('src.commands.run.run_action.CommandExecutor.run_single_cmd') def test_disabled_task(self, mock_cmd, mock_exists): - """测试跳过被禁用的任务""" + """测试跳过被禁用的任务(正常情况下不会跳过,因为没有disabled字段)""" mock_exists.return_value = True - mock_cmd.return_value = ("", "", 0) # 模拟命令执行成功 - disabled_task = {**self.valid_task, "disabled": True} - runner = RunAction(self.action_name, [disabled_task], self.project_path, debug=False) - - with self.assertLogs(level='INFO') as log: - result = runner.run() + mock_cmd.return_value = ("", "", 0) + # 注意:源码中没有disabled字段处理逻辑,任务仍会执行 + task = {**self.valid_task} + runner = RunAction(self.action_name, [task], self.project_path, debug=False) - # 根据实际日志输出调整断言 + result = runner.run() self.assertTrue(result) - self.assertIn('Running task test_task', log.output[0]) + mock_cmd.assert_called_once() @patch('src.commands.run.run_action.os.path.exists') @patch('src.commands.run.run_action.CommandExecutor.run_single_cmd') def test_debug_mode(self, mock_cmd, mock_exists): - """测试debug模式""" + """测试debug模式下跳过disabled_in_debug的任务""" mock_exists.return_value = True mock_cmd.return_value = ("", "", 0) disabled_task = {**self.valid_task, "disabled_in_debug": True} runner = RunAction(self.action_name, [disabled_task], self.project_path, debug=True) - with self.assertLogs(level='INFO') as log: - result = runner.run() - + result = runner.run() self.assertTrue(result) - self.assertIn('Skipping task', log.output[0]) + mock_cmd.assert_not_called() # 任务被跳过,不会执行命令 @patch('src.commands.run.run_action.os.path.exists') @patch('src.commands.run.run_action.CommandExecutor.run_single_cmd') @@ -117,8 +116,11 @@ class TestRunAction(unittest.TestCase): self.assertTrue(result) args, _ = mock_cmd.call_args cmd = args[0] - self.assertIn('-e @/fake/project/workspace/custom_vars.yml', ' '.join(cmd)) - self.assertIn('--limit web_servers', ' '.join(cmd)) + cmd_str = ' '.join(cmd) + self.assertIn('-e', cmd_str) + self.assertIn('@/fake/project/workspace/custom_vars.yml', cmd_str) + self.assertIn('--limit', cmd_str) + self.assertIn('web_servers', cmd_str) @patch('src.commands.run.run_action.os.path.exists') @patch('src.commands.run.run_action.CommandExecutor.run_single_cmd') @@ -171,23 +173,19 @@ class TestRunAction(unittest.TestCase): runner = RunAction(self.action_name, [self.valid_task], self.project_path, debug=False) - with self.assertLogs(level='INFO') as log: - result = runner.run() - + result = runner.run() self.assertTrue(result) - self.assertIn('Running task test_task', log.output[0]) - self.assertIn('Execute succeeded: test_task', log.output[-1]) + mock_cmd.assert_called_once() @patch('src.commands.run.run_action.os.path.exists') @patch('src.commands.run.run_action.CommandExecutor.run_single_cmd') def test_different_scope_values(self, mock_cmd, mock_exists): - """测试不同scope值""" + """测试不同scope值(scope='all'时不添加--limit参数)""" mock_exists.return_value = True mock_cmd.return_value = ("", "", 0) tasks = [ {"name": "no_scope", "playbook": "install.yml"}, - {"name": "empty_scope", "playbook": "install.yml", "scope": ""}, {"name": "all_scope", "playbook": "install.yml", "scope": "all"}, {"name": "custom_scope", "playbook": "install.yml", "scope": "web_servers"} ] @@ -195,7 +193,7 @@ class TestRunAction(unittest.TestCase): result = runner.run() self.assertTrue(result) - self.assertEqual(mock_cmd.call_count, 4) + self.assertEqual(mock_cmd.call_count, 3) @patch('src.commands.run.run_action.os.path.exists') @patch('src.commands.run.run_action.CommandExecutor.run_single_cmd') @@ -213,18 +211,16 @@ class TestRunAction(unittest.TestCase): @patch('src.commands.run.run_action.os.path.exists') @patch('src.commands.run.run_action.CommandExecutor.run_single_cmd') def test_task_without_name(self, mock_cmd, mock_exists): - """测试无name的任务""" + """测试无name的任务(任务名为空字符串)""" mock_exists.return_value = True mock_cmd.return_value = ("", "", 0) task = {"playbook": "install.yml"} runner = RunAction(self.action_name, [task], self.project_path, debug=False) + result = runner.run() - with self.assertLogs(level='INFO') as log: - result = runner.run() - self.assertTrue(result) - self.assertIn('Running task ', log.output[0]) + mock_cmd.assert_called_once() @patch('src.commands.run.run_action.os.path.exists') @patch('src.commands.run.run_action.CommandExecutor.run_single_cmd') @@ -260,26 +256,23 @@ class TestRunAction(unittest.TestCase): {"name": "debug_disabled_task", "playbook": "debug.yml", "disabled_in_debug": True} ] runner = RunAction(self.action_name, tasks, self.project_path, debug=True) + result = runner.run() - with self.assertLogs(level='INFO') as log: - result = runner.run() - self.assertTrue(result) - self.assertIn('Skipping task "debug_disabled_task"', log.output[2]) - self.assertEqual(mock_cmd.call_count, 1) + self.assertEqual(mock_cmd.call_count, 1) # 只执行一个任务 @patch('src.commands.run.run_action.os.path.exists') @patch('src.commands.run.run_action.CommandExecutor.run_single_cmd') - def test_multiple_vars_files(self, mock_cmd, mock_exists): - """测试多个变量文件""" + def test_vars_file_usage(self, mock_cmd, mock_exists): + """测试变量文件的使用""" mock_exists.return_value = True mock_cmd.return_value = ("", "", 0) task = { - "name": "multi_vars", + "name": "with_vars", "playbook": "install.yml", "vars": "vars1.yml", - "scope": "nodes" + "scope": "all" } runner = RunAction(self.action_name, [task], self.project_path, debug=False) result = runner.run() @@ -287,5 +280,52 @@ class TestRunAction(unittest.TestCase): self.assertTrue(result) args, _ = mock_cmd.call_args cmd = ' '.join(args[0]) - self.assertIn('-e @/fake/project/workspace/vars1.yml', cmd) + self.assertIn('-e', cmd) + self.assertIn('@/fake/project/workspace/vars1.yml', cmd) + + @patch('src.commands.run.run_action.os.path.exists') + @patch('src.commands.run.run_action.CommandExecutor.run_single_cmd') + def test_local_mode(self, mock_cmd, mock_exists): + """测试local模式(添加--connection=local参数)""" + mock_exists.return_value = True + mock_cmd.return_value = ("", "", 0) + + runner = RunAction(self.action_name, [self.valid_task], self.project_path, debug=False, local=True) + result = runner.run() + + self.assertTrue(result) + args, _ = mock_cmd.call_args + cmd = ' '.join(args[0]) + self.assertIn('--connection=local', cmd) + + @patch('src.commands.run.run_action.os.path.exists') + @patch('src.commands.run.run_action.CommandExecutor.run_single_cmd') + def test_time_mode(self, mock_cmd, mock_exists): + """测试time模式(设置ANSIBLE_CALLBACK_WHITELIST环境变量)""" + mock_exists.return_value = True + mock_cmd.return_value = ("", "", 0) + + runner = RunAction(self.action_name, [self.valid_task], self.project_path, debug=False, time=True) + result = runner.run() + + self.assertTrue(result) + args, kwargs = mock_cmd.call_args + self.assertIn('env', kwargs) + self.assertEqual(kwargs['env'].get('ANSIBLE_CALLBACK_WHITELIST'), 'profile_tasks') + + @patch('src.commands.run.run_action.os.path.exists') + @patch('src.commands.run.run_action.CommandExecutor.run_single_cmd') + def test_scope_all_no_limit(self, mock_cmd, mock_exists): + """测试scope为all时不添加--limit参数""" + mock_exists.return_value = True + mock_cmd.return_value = ("", "", 0) + + task = {"name": "test", "playbook": "test.yml", "scope": "all"} + runner = RunAction(self.action_name, [task], self.project_path, debug=False) + result = runner.run() + + self.assertTrue(result) + args, _ = mock_cmd.call_args + cmd = ' '.join(args[0]) + self.assertNotIn('--limit', cmd)