diff --git a/data/sysagent.service b/data/sysagent.service new file mode 100644 index 0000000000000000000000000000000000000000..4f893444dffb621d71cda5f6c8002d03ceeca099 --- /dev/null +++ b/data/sysagent.service @@ -0,0 +1,14 @@ +[Unit] +After=network.target nss-lookup.target +[Service] +User=root +WorkingDirectory=/usr/lib/sysagent +Environment="PYTHONPATH=/usr/lib/sysagent" +Environment="CONFIG=/etc/sysagent/config.toml" +ExecStart=/usr/bin/python3 apps/main.py +ExecReload=/bin/kill -HUP $MAINPID +Restart=on-failure +RestartSec=10 +LimitNOFILE=infinity +[Install] +WantedBy=multi-user.target \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index bf5fbbf34e61afaeadf40ac96575c87ab95cc878..b5709e9f35f9afd08ba2ac7fedec7b02066cf0e4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,25 +7,25 @@ dependencies = [ "aiofiles==24.1.0", "asyncer==0.0.8", "asyncpg==0.30.0", - "cryptography==44.0.2", + "cryptography==42.0.2", "fastapi==0.115.12", "httpx==0.28.1", "httpx-sse==0.4.0", - "jinja2==3.1.6", - "jionlp==1.5.20", - "jsonschema==4.23.0", + "jinja2==3.1.3", + "jionlp==1.5.22", + "jsonschema==4.21.1", "mcp==1.17.0", "minio==7.2.15", "ollama==0.5.3", "openai==2.3.0", - "pandas==2.2.3", + "pandas==2.2.2", "pgvector==0.4.1", "pillow==10.3.0", "pydantic==2.11.7", "python-jsonpath==1.3.0", "python-magic==0.4.27", "python-multipart==0.0.20", - "pyyaml==6.0.2", + "pyyaml==6.0.1", "rich==14.2.0", "sqlalchemy==2.0.41", "tiktoken==0.9.0", diff --git a/tests/common/test_config.py b/tests/common/test_config.py index 6c77141ca78e662ab12a8ec3c1ae017f3de6fced..dcca74ddb036ace5b86afd236c0b1874db169508 100644 --- a/tests/common/test_config.py +++ b/tests/common/test_config.py @@ -7,67 +7,35 @@ from typing import Any import pytest import toml +from pydantic import ValidationError from pytest_mock import MockerFixture from apps.common.config import Config -from apps.common.singleton import SingletonMeta from apps.schemas.config import ConfigModel MOCK_CONFIG_DATA: dict[str, Any] = { "deploy": {"mode": "local", "cookie": "domain", "data_dir": "/app/data"}, - "login": { - "provider": "authhub", - "settings": { - "host": "http://localhost", - "host_inner": "http://localhost", - "login_api": "http://localhost/api/auth/login", - "app_id": "test_id", - "app_secret": "test_secret", - }, - }, - "embedding": { - "type": "openai", - "endpoint": "http://localhost", - "api_key": "test_key", - "model": "test_model", - }, "rag": {"rag_service": "http://localhost"}, - "fastapi": {"domain": "localhost", "session_ttl": 30, "csrf": False}, + "fastapi": {"domain": "localhost"}, "minio": { "endpoint": "localhost:9000", "access_key": "test_key", "secret_key": "test_secret", "secure": False, }, - "mongodb": { + "postgres": { "host": "localhost", - "port": 27017, + "port": 5432, "user": "test_user", "password": "test_password", "database": "test_db", }, - "llm": { - "key": "test_key", - "endpoint": "http://localhost", - "model": "test_model", - "max_tokens": 8192, - "temperature": 0.7, - }, - "function_call": { - "backend": "test_backend", - "model": "test_model", - "endpoint": "http://localhost", - "api_key": "test_key", - "max_tokens": 8192, - "temperature": 0.7, - }, "security": { "half_key1": "test_key1", "half_key2": "test_key2", "half_key3": "test_key3", "jwt_key": "test_jwt_key", }, - "check": {"enable": False, "words_list": ""}, "extra": {"sql_url": "http://localhost"}, } @@ -75,55 +43,140 @@ MOCK_CONFIG_DATA: dict[str, Any] = { @pytest.fixture(autouse=True) def setup_teardown() -> Generator[None, None, None]: """测试前的准备工作和清理工作""" - # 清除单例实例 - SingletonMeta._instances.clear() + # 保存原始环境变量 + original_config = os.environ.get("CONFIG") + original_prod = os.environ.get("PROD") + # 清除环境变量 if "CONFIG" in os.environ: del os.environ["CONFIG"] if "PROD" in os.environ: del os.environ["PROD"] + yield - # 测试后的清理工作 - SingletonMeta._instances.clear() + # 测试后恢复环境变量 + if original_config is not None: + os.environ["CONFIG"] = original_config + elif "CONFIG" in os.environ: + del os.environ["CONFIG"] + + if original_prod is not None: + os.environ["PROD"] = original_prod + elif "PROD" in os.environ: + del os.environ["PROD"] -def test_init_with_default_config(mocker: MockerFixture) -> None: + +def test_init_with_default_config(mocker: MockerFixture, tmp_path: Path) -> None: """测试使用默认配置文件路径初始化""" - mocker.patch("builtins.open", mocker.mock_open(read_data=toml.dumps(MOCK_CONFIG_DATA))) - config = Config() - assert isinstance(config._config, ConfigModel) - assert config._config.deploy.mode == "local" + # 创建临时配置文件 + config_file = tmp_path / "config.toml" + config_file.write_text(toml.dumps(MOCK_CONFIG_DATA)) + + # Mock默认配置文件路径 + mocker.patch.object(Path, "__truediv__", return_value=config_file) + mocker.patch("toml.load", return_value=MOCK_CONFIG_DATA) + + config = Config.init_config() + + assert isinstance(config, Config) + assert isinstance(config, ConfigModel) + assert config.deploy.mode == "local" + assert config.deploy.data_dir == "/app/data" -def test_init_with_custom_config_path(mocker: MockerFixture) -> None: +def test_init_with_custom_config_path(mocker: MockerFixture, tmp_path: Path) -> None: """测试使用自定义配置文件路径初始化""" - custom_path = "/custom/path/config.toml" - os.environ["CONFIG"] = custom_path + # 创建临时配置文件 + config_file = tmp_path / "custom_config.toml" + config_file.write_text(toml.dumps(MOCK_CONFIG_DATA)) - mocker.patch("builtins.open", mocker.mock_open(read_data=toml.dumps(MOCK_CONFIG_DATA))) - config = Config() - assert isinstance(config._config, ConfigModel) - assert config._config.deploy.mode == "local" + # 设置自定义配置文件路径 + os.environ["CONFIG"] = str(config_file) + mock_load = mocker.patch("toml.load", return_value=MOCK_CONFIG_DATA) -def test_init_with_prod_env(mocker: MockerFixture) -> None: + config = Config.init_config() + + assert isinstance(config, Config) + assert isinstance(config, ConfigModel) + assert config.deploy.mode == "local" + mock_load.assert_called_once_with(str(config_file)) + + +def test_init_with_prod_env(mocker: MockerFixture, tmp_path: Path) -> None: """测试在PROD环境下初始化""" + # 创建临时配置文件 + config_file = tmp_path / "config.toml" + config_file.write_text(toml.dumps(MOCK_CONFIG_DATA)) + + # 设置PROD环境变量 os.environ["PROD"] = "true" + os.environ["CONFIG"] = str(config_file) - mocker.patch("builtins.open", mocker.mock_open(read_data=toml.dumps(MOCK_CONFIG_DATA))) + mocker.patch("toml.load", return_value=MOCK_CONFIG_DATA) mock_unlink = mocker.patch.object(Path, "unlink") - config = Config() - assert isinstance(config._config, ConfigModel) + + config = Config.init_config() + + assert isinstance(config, Config) + assert isinstance(config, ConfigModel) mock_unlink.assert_called_once() -def test_get_config(mocker: MockerFixture) -> None: - """测试获取配置""" - mocker.patch("builtins.open", mocker.mock_open(read_data=toml.dumps(MOCK_CONFIG_DATA))) - config = Config() - config_copy = config.get_config() - assert isinstance(config_copy, ConfigModel) - assert config_copy is not config._config # 确保返回的是深拷贝 +def test_config_immutability(mocker: MockerFixture, tmp_path: Path) -> None: + """测试配置对象的不可变性""" + # 创建临时配置文件 + config_file = tmp_path / "config.toml" + config_file.write_text(toml.dumps(MOCK_CONFIG_DATA)) + + os.environ["CONFIG"] = str(config_file) + mocker.patch("toml.load", return_value=MOCK_CONFIG_DATA) + + config = Config.init_config() + + # 验证配置是frozen的,无法修改 + with pytest.raises((ValidationError, AttributeError)): + config.deploy.mode = "cloud" # type: ignore[misc] + + +def test_config_fields(mocker: MockerFixture, tmp_path: Path) -> None: + """测试配置字段的正确性""" + # 创建临时配置文件 + config_file = tmp_path / "config.toml" + config_file.write_text(toml.dumps(MOCK_CONFIG_DATA)) + + os.environ["CONFIG"] = str(config_file) + mocker.patch("toml.load", return_value=MOCK_CONFIG_DATA) + + config = Config.init_config() + + # 验证所有字段 + assert config.deploy.mode == "local" + assert config.deploy.cookie == "domain" + assert config.deploy.data_dir == "/app/data" + + assert config.rag.rag_service == "http://localhost" + + assert config.fastapi.domain == "localhost" + + assert config.minio.endpoint == "localhost:9000" + assert config.minio.access_key == "test_key" + assert config.minio.secret_key == "test_secret" + assert config.minio.secure is False + + assert config.postgres.host == "localhost" + assert config.postgres.port == MOCK_CONFIG_DATA["postgres"]["port"] + assert config.postgres.user == "test_user" + assert config.postgres.password == "test_password" + assert config.postgres.database == "test_db" + + assert config.security.half_key1 == "test_key1" + assert config.security.half_key2 == "test_key2" + assert config.security.half_key3 == "test_key3" + assert config.security.jwt_key == "test_jwt_key" + + assert config.extra.sql_url == "http://localhost" if __name__ == "__main__": diff --git a/tests/common/test_oidc.py b/tests/common/test_oidc.py deleted file mode 100644 index 53edaf01b6722bb27b1f3919a08045cfaab751ef..0000000000000000000000000000000000000000 --- a/tests/common/test_oidc.py +++ /dev/null @@ -1,76 +0,0 @@ -# Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. - -import os -import unittest -from unittest.mock import patch, MagicMock -from http import HTTPStatus -from fastapi.exceptions import HTTPException -from apps.common.auth import get_oidc_token, get_oidc_user -from apps.common.config import config - - -class TestOidcFunctions(unittest.TestCase): - @patch('apps.auth.oidc.requests.post') - def test_get_oidc_token(self, mock_post): - mock_response = MagicMock() - mock_response.json.return_value = {"access_token": "test_access_token"} - mock_response.status_code = HTTPStatus.OK - mock_post.return_value = mock_response - - token = get_oidc_token("test_code") - - self.assertEqual(token, "test_access_token") - mock_post.assert_called_once_with( - os.getenv("OIDC_TOKEN_URL"), - headers={"Content-Type": "application/x-www-form-urlencoded"}, - data={ - "client_id": os.getenv("OIDC_APP_ID"), - "client_secret": config["OIDC_APP_SECRET"], - "redirect_uri": os.getenv("OIDC_AUTH_CALLBACK_URL"), - "grant_type": os.getenv("OIDC_TOKEN_GRANT_TYPE"), - "code": "test_code" - }, - stream=False, - timeout=10 - ) - - @patch('apps.auth.oidc.requests.get') - def test_get_oidc_user(self, mock_get): - mock_response = MagicMock() - mock_response.json.return_value = {"sub": "test_user_sub", "phone_number": "1234567890"} - mock_response.status_code = HTTPStatus.OK - mock_get.return_value = mock_response - - user_info = get_oidc_user("test_access_token") - - self.assertEqual(user_info, {"user_sub": "test_user_sub", "organization": "openEuler"}) - mock_get.assert_called_once_with( - os.getenv("OIDC_USER_URL"), - headers={"Authorization": "test_access_token"}, - timeout=10 - ) - - @patch('apps.auth.oidc.requests.get') - def test_get_oidc_user_invalid_token(self, mock_get): - mock_response = MagicMock() - mock_response.status_code = HTTPStatus.UNAUTHORIZED - mock_response.json.return_value = {} - mock_get.side_effect = HTTPException( - status_code=HTTPStatus.UNAUTHORIZED - ) - - with self.assertRaises(HTTPException) as cm: - get_oidc_user("test_access_token") - - self.assertEqual(cm.exception.status_code, HTTPStatus.UNAUTHORIZED) - - @patch('apps.auth.oidc.requests.get') - def test_get_oidc_user_empty_token(self, mock_get): - user_info = get_oidc_user("") - - self.assertEqual(user_info, {}) - mock_get.assert_not_called() - - -if __name__ == '__main__': - unittest.main() diff --git a/tests/common/test_security.py b/tests/common/test_security.py index bf6ea4ee92fe96ae25437d54186861059c1c5017..747ed5312fd637c1bcf456fa772290afa5744c21 100644 --- a/tests/common/test_security.py +++ b/tests/common/test_security.py @@ -10,6 +10,7 @@ from apps.common.security import Security def test_encrypt() -> None: + """测试加密功能""" plaintext = "test_plaintext" encrypted_plaintext, secret_dict = Security.encrypt(plaintext) assert isinstance(encrypted_plaintext, str) @@ -17,6 +18,7 @@ def test_encrypt() -> None: def test_decrypt(mocker: MockerFixture) -> None: + """测试解密功能""" encrypted_plaintext = "encrypted_plaintext" secret_dict = { "encrypted_work_key": "encrypted_work_key", diff --git a/tests/routers/test_auth.py b/tests/routers/test_auth.py index 6ab05d0e8d10dae9637019a4d58e2f9ce68c190d..c89c62316e82549563dc9c690fda8c313634c10b 100644 --- a/tests/routers/test_auth.py +++ b/tests/routers/test_auth.py @@ -1,90 +1,179 @@ # Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. import unittest -from unittest.mock import patch, MagicMock +from typing import Any +from unittest.mock import AsyncMock, MagicMock, patch + +from fastapi import FastAPI, status from fastapi.testclient import TestClient -from jwt import encode - -from apps.routers.auth import admin_router - -access_token = encode({"sub": "user_id"}, "secret_key", algorithm="HS256") - - -class TestAuthorizeRouter(unittest.TestCase): - - @patch('apps.routers.authorize.get_oidc_token') - @patch('apps.routers.authorize.get_oidc_user') - @patch('apps.routers.authorize.RedisConnectionPool.get_redis_connection') - @patch('apps.routers.authorize.UserManager.update_userinfo_by_user_sub') - def test_oidc_login_success(self, mock_update_userinfo, mock_get_redis_connection, mock_get_oidc_user, - mock_get_oidc_token): - client = TestClient(admin_router) - mock_update_userinfo.return_value = None - mock_get_oidc_token.return_value = "access_token" - mock_get_oidc_user.return_value = {'user_sub': '123'} - mock_redis = MagicMock() - mock_get_redis_connection.return_value = mock_redis - mock_redis.setex.return_value = None - response = client.get("/authorize/login?code=123") - assert response.status_code == 200 - assert mock_update_userinfo.call_count == 1 - assert mock_redis.setex.call_count == 2 - - @patch('apps.routers.authorize.RedisConnectionPool.get_redis_connection') - def test_oidc_login_fail(self, mock_get_redis_connection): - client = TestClient(admin_router) - mock_redis = MagicMock() - mock_get_redis_connection.return_value = mock_redis - mock_redis.setex.return_value = None - response = client.get("/authorize/login?code=123") - assert response.status_code == 200 +from apps.routers.auth import router + + +class TestAuthRouter(unittest.TestCase): + """测试 auth 路由""" + + def setUp(self) -> None: + """设置测试客户端""" + app = FastAPI() + app.include_router(router) + self.client = TestClient(app) + + @patch("apps.routers.auth._check_user_group") + @patch("apps.routers.auth.UserManager.create_or_update_on_login", new_callable=AsyncMock) + @patch("apps.routers.auth.PersonalTokenManager.update_personal_token", new_callable=AsyncMock) + def test_linux_login_success( + self, mock_update_token: Any, mock_create_user: Any, mock_check_group: Any, + ) -> None: + """测试 Linux 用户登录成功""" + mock_check_group.return_value = True + mock_create_user.return_value = None + mock_update_token.return_value = "test_token_123" + + response = self.client.get("/api/auth/login", headers={"X-Remote-User": "testuser"}) + + assert response.status_code == status.HTTP_200_OK assert response.json() == { - "code": 400, - "err_msg": "OIDC login failed." + "code": status.HTTP_200_OK, + "message": "登录成功", + "result": {"token": "test_token_123"}, } - assert mock_redis.setex.call_count == 0 - - @patch('apps.routers.authorize.RedisConnectionPool.get_redis_connection') - def test_logout(self, mock_get_redis_connection): - client = TestClient(admin_router) - mock_redis = MagicMock() - mock_get_redis_connection.return_value = mock_redis - mock_redis.delete.return_value = None - response = client.get("/authorize/logout", cookies={"_t": access_token}) - assert response.status_code == 200 + mock_check_group.assert_called_once_with("testuser") + mock_create_user.assert_called_once_with("testuser", "testuser") + mock_update_token.assert_called_once_with("testuser") + + def test_linux_login_no_header(self) -> None: + """测试登录时缺少 X-Remote-User header""" + response = self.client.get("/api/auth/login") + + assert response.status_code == status.HTTP_401_UNAUTHORIZED assert response.json() == { - "code": 200, - "message": "success", - "result": {} + "code": status.HTTP_401_UNAUTHORIZED, + "message": "无法获取用户信息", + "result": {}, } - assert mock_redis.delete.call_count == 2 - - @patch('apps.routers.authorize.UserManager.get_revision_number_by_user_sub') - def test_userinfo(self, mock_get_revision_number_by_user_sub): - client = TestClient(admin_router) - mock_get_revision_number_by_user_sub.return_value = "123" - response = client.get("/authorize/user", cookies={"_t": "access_token"}) - assert response.status_code == 200 + + @patch("apps.routers.auth._check_user_group") + def test_linux_login_user_not_in_group(self, mock_check_group: Any) -> None: + """测试用户不在允许的用户组中""" + mock_check_group.return_value = False + + response = self.client.get("/api/auth/login", headers={"X-Remote-User": "testuser"}) + + assert response.status_code == status.HTTP_403_FORBIDDEN assert response.json() == { - "code": 200, - "message": "success", - "result": {"user_sub": "123", "organization": "example", "revision_number": "123"} + "code": status.HTTP_403_FORBIDDEN, + "message": "您没有权限访问此系统", + "result": {}, } + mock_check_group.assert_called_once_with("testuser") + + @patch("apps.routers.auth._check_user_group") + @patch("apps.routers.auth.UserManager.create_or_update_on_login", new_callable=AsyncMock) + @patch("apps.routers.auth.PersonalTokenManager.update_personal_token", new_callable=AsyncMock) + def test_linux_login_token_creation_failed( + self, mock_update_token: Any, mock_create_user: Any, mock_check_group: Any, + ) -> None: + """测试创建 PersonalToken 失败""" + mock_check_group.return_value = True + mock_create_user.return_value = None + mock_update_token.return_value = None + + response = self.client.get("/api/auth/login", headers={"X-Remote-User": "testuser"}) - @patch('apps.routers.authorize.UserManager.update_userinfo_by_user_sub') - def test_update_revision_number(self, mock_update_userinfo_by_user_sub): - client = TestClient(admin_router) - mock_update_userinfo_by_user_sub.return_value = None - response = client.post("/authorize/update_revision_number", json={"revision_num": "123"}, - cookies={"_t": "access_token"}) - assert response.status_code == 200 + assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR assert response.json() == { - "code": 200, - "message": "success", - "result": {"user_sub": "123", "organization": "example", "revision_number": "123"} + "code": status.HTTP_500_INTERNAL_SERVER_ERROR, + "message": "创建Token失败", + "result": {}, } + @patch("apps.routers.auth.is_admin") + @patch("apps.routers.auth.grp.getgrnam") + def test_check_user_group_admin(self, mock_getgrnam: Any, mock_is_admin: Any) -> None: + """测试 _check_user_group - 管理员用户""" + from apps.routers.auth import _check_user_group # noqa: PLC0415 + + mock_is_admin.return_value = True + + result = _check_user_group("root") + + assert result is True + mock_is_admin.assert_called_once_with("root") + mock_getgrnam.assert_not_called() + + @patch("apps.routers.auth.is_admin") + @patch("apps.routers.auth.grp.getgrnam") + def test_check_user_group_oi_member(self, mock_getgrnam: Any, mock_is_admin: Any) -> None: + """测试 _check_user_group - oi 组成员""" + from apps.routers.auth import _check_user_group # noqa: PLC0415 + + mock_is_admin.return_value = False + mock_group = MagicMock() + mock_group.gr_mem = ["testuser", "otheruser"] + mock_getgrnam.return_value = mock_group + + result = _check_user_group("testuser") + + assert result is True + mock_is_admin.assert_called_once_with("testuser") + mock_getgrnam.assert_called_once_with("oi") + + @patch("apps.routers.auth.is_admin") + @patch("apps.routers.auth.grp.getgrnam") + def test_check_user_group_not_authorized(self, mock_getgrnam: Any, mock_is_admin: Any) -> None: + """测试 _check_user_group - 未授权用户""" + from apps.routers.auth import _check_user_group # noqa: PLC0415 + + mock_is_admin.return_value = False + mock_group = MagicMock() + mock_group.gr_mem = ["otheruser"] + mock_getgrnam.return_value = mock_group + + result = _check_user_group("testuser") + + assert result is False + + @patch("apps.routers.auth.is_admin") + @patch("apps.routers.auth.grp.getgrnam") + def test_check_user_group_oi_not_exists(self, mock_getgrnam: Any, mock_is_admin: Any) -> None: + """测试 _check_user_group - oi 组不存在""" + from apps.routers.auth import _check_user_group # noqa: PLC0415 + + mock_is_admin.return_value = False + mock_getgrnam.side_effect = KeyError("oi") + + result = _check_user_group("testuser") + + assert result is False + + @patch("apps.routers.auth.verify_personal_token") + @patch("apps.routers.auth.PersonalTokenManager.update_personal_token", new_callable=AsyncMock) + def test_change_personal_token_success(self, mock_update_token: Any) -> None: + """测试更新 API 密钥成功""" + mock_update_token.return_value = "new_api_key_456" + + response = self.client.post("/api/auth/key") + + assert response.status_code == status.HTTP_200_OK + response_data = response.json() + assert response_data["code"] == status.HTTP_200_OK + assert response_data["message"] == "success" + assert response_data["result"]["apiKey"] == "new_api_key_456" + + @patch("apps.routers.auth.verify_personal_token") + @patch("apps.routers.auth.PersonalTokenManager.update_personal_token", new_callable=AsyncMock) + def test_change_personal_token_failed(self, mock_update_token: Any) -> None: + """测试更新 API 密钥失败""" + mock_update_token.return_value = None + + response = self.client.post("/api/auth/key") + + assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR + response_data = response.json() + assert response_data["code"] == status.HTTP_500_INTERNAL_SERVER_ERROR + assert response_data["message"] == "failed to update personal token" + -if __name__ == '__main__': +if __name__ == "__main__": unittest.main()