diff --git a/hwcompatible/client.py b/hwcompatible/client.py index 49e823f6f4bd400e57fc250c56a8fe8eb5250cb7..c94d8a96db1c7bb48ec5bc608d789974ce272743 100755 --- a/hwcompatible/client.py +++ b/hwcompatible/client.py @@ -64,7 +64,6 @@ class Client: 'Accept': 'text/plain' } try: - # print(url) req = Request(url, data=data, headers=headers) res = urlopen(req) if res.code != 200: @@ -81,3 +80,4 @@ if __name__ == '__main__': import sys file_name = sys.argv[1] c.upload(file_name) + diff --git a/hwcompatible/command.py b/hwcompatible/command.py index ce78e291db0f2ad234ef4a8484c3fe09e7f0ca79..3eefafb94e0de0612cc8f77766c42d5bc70813ba 100755 --- a/hwcompatible/command.py +++ b/hwcompatible/command.py @@ -4,7 +4,7 @@ # Copyright (c) 2020 Huawei Technologies Co., Ltd. # oec-hardware is licensed under the Mulan PSL v2. # You can use this software according to the terms and conditions of the Mulan PSL v2. -# You may obtain a copy of Mulan PSL v2 at: +# You may ob tain a copy of Mulan PSL v2 at: # http://license.coscl.org.cn/MulanPSL2 # THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR @@ -80,7 +80,6 @@ class Command: if self.errors and len(self.errors) > 0: self.print_errors() - # raise CertCommandError(self, "has output on stderr") def run_quiet(self): """quiet after running command""" @@ -95,7 +94,7 @@ class Command: def print_output(self): """ - 结果显示 + Result display :return: """ if self.output: @@ -106,7 +105,7 @@ class Command: def print_errors(self): """ - 页面显示错误信息 + Print error messages on model :return: """ if self.errors: @@ -117,7 +116,7 @@ class Command: def pid(self): """ - 获取管道pid值 + Get pipe pid :return: """ if self.pipe: @@ -125,7 +124,7 @@ class Command: def readline(self): """ - 按行读取输出信息 + Read line to get messages :return """ if self.pipe: @@ -133,7 +132,7 @@ class Command: def read(self): """ - 执行命令,并读取结果 + Execute command and get results :return: """ self.pipe = subprocess.Popen(self.command, shell=True, @@ -173,28 +172,30 @@ class Command: # otherwise raise CertCommandError(self, "no match for regular " - "expression %s" % self.regex) + "expression %s" % self.regex) def _get_str_multi_line(self, result, pattern, return_list): - if self.output: - for line in self.output: - if self.regex_group: - match = pattern.match(line) - if match: - if self.regex_group: - if return_list: - result.append(match.group(self.regex_group)) - else: - return match.group(self.regex_group) + if self.output == None: + return None + + for line in self.output: + if self.regex_group: + match = pattern.match(line) + if match and self.regex_group: + if return_list: + result.append(match.group(self.regex_group)) + else: + return match.group(self.regex_group) + else: + # otherwise, return the matching line + match = pattern.search(line) + if match == None: + continue + if return_list: + result.append(match.group()) else: - # otherwise, return the matching line - match = pattern.search(line) - if match: - if return_list: - result.append(match.group()) - else: - return match.group() - return result + return match.group() + return result def _get_str(self, regex=None, regex_group=None, single_line=True, return_list=False): @@ -219,23 +220,19 @@ class Command: def get_str(self, regex=None, regex_group=None, single_line=True, return_list=False, ignore_errors=False): - """获取命令执行结果中匹配的值""" + """Get matching value in results""" result = self._get_str(regex, regex_group, single_line, return_list) if not ignore_errors: if self.returncode != 0: self.print_output() self.print_errors() raise CertCommandError(self, "returned %d" % self.returncode) - - # if self.errors and len(self.errors) > 0: - # raise CertCommandError(self, "has output on stderr") - return result class CertCommandError(Exception): """ - Cert command error handling + Cert command error handling """ def __init__(self, command, message): Exception.__init__(self) @@ -252,3 +249,4 @@ class CertCommandError(Exception): def _set_message(self, value): self.__message = value message = property(_get_message, _set_message) + diff --git a/hwcompatible/commandUI.py b/hwcompatible/commandUI.py index 877e0ae28d9425d98e8f6b6069f779a2707445a9..d2cfcd3993dc10d247a3f31ef5c00e0efdc2ba49 100755 --- a/hwcompatible/commandUI.py +++ b/hwcompatible/commandUI.py @@ -14,12 +14,13 @@ import sys import readline - +from .constants import SAMEASYES, YES, SAMEASNO, NO class CommandUI: """ Command user interface selection """ + def __init__(self, echoResponses=False): self.echo = echoResponses @@ -86,10 +87,6 @@ class CommandUI: :param question: :return: """ - YES = "y" - SAMEASYES = ["y", "yes"] - NO = "n" - SAMEASNO = ["n", "no"] while True: reply = self.prompt(question, (YES, NO)) if reply.lower() in SAMEASYES: @@ -127,3 +124,4 @@ class CommandUI: "following: %s" % " | ".join(choices)) finally: readline.set_startup_hook() + diff --git a/hwcompatible/compatibility.py b/hwcompatible/compatibility.py index a4c27aa0dd441b76770d5177ad0220e0dcb4ecc4..49414b316d40a1097c984946a8e52c9f81504837 100755 --- a/hwcompatible/compatibility.py +++ b/hwcompatible/compatibility.py @@ -27,7 +27,7 @@ from .commandUI import CommandUI from .job import Job from .reboot import Reboot from .client import Client - +from .constants import * class EulerCertification(): """ @@ -59,9 +59,6 @@ class EulerCertification(): oec_devices = certdevice.get_devices() self.devices = DeviceDocument(CertEnv.devicefile, oec_devices) self.devices.save() - - # test_factory format example: [{"name":"nvme", "device":device, - # "run":True, "status":"PASS", "reboot":False}] test_factory = self.get_tests(oec_devices) self.update_factory(test_factory) if not self.choose_tests(): @@ -216,8 +213,6 @@ class EulerCertification(): :param devices: :return: """ - nodevice = ["cpufreq", "memory", "clock", "profiler", "system", - "stress", "kdump", "perf", "acpi", "watchdog"] sort_devices = self.sort_tests(devices) empty_device = Device() test_factory = list() @@ -233,19 +228,19 @@ class EulerCertification(): if sort_devices.get(testname): for device in sort_devices[testname]: test = dict() - test["name"] = testname - test["device"] = device - test["run"] = True - test["status"] = "NotRun" - test["reboot"] = False + test[NAME] = testname + test[DEVICE] = device + test[RUN] = True + test[STATUS] = NOTRUN + test[REBOOT] = False test_factory.append(test) - elif testname in nodevice: + elif testname in NODEVICE: test = dict() - test["name"] = testname - test["device"] = empty_device - test["run"] = True - test["status"] = "NotRun" - test["reboot"] = False + test[NAME] = testname + test[DEVICE] = empty_device + test[RUN] = True + test[STATUS] = NOTRUN + test[REBOOT] = False test_factory.append(test) return test_factory @@ -258,28 +253,28 @@ class EulerCertification(): sort_devices = dict() empty_device = Device() for device in devices: - if device.get_property("SUBSYSTEM") == "usb" and \ + if device.get_property("SUBSYSTEM") == USB and \ device.get_property("ID_VENDOR_FROM_DATABASE") == \ "Linux Foundation" and \ ("2." in device.get_property("ID_MODEL_FROM_DATABASE") or "3." in device.get_property("ID_MODEL_FROM_DATABASE")): - sort_devices["usb"] = [empty_device] + sort_devices[USB] = [empty_device] continue if device.get_property("PCI_CLASS") == "30000" or \ device.get_property("PCI_CLASS") == "38000": - sort_devices["video"] = [device] + sort_devices[VIDEO] = [device] continue - if (device.get_property("DEVTYPE") == "disk" and + if (device.get_property("DEVTYPE") == DISK and not device.get_property("ID_TYPE")) or \ - device.get_property("ID_TYPE") == "disk": - if "nvme" in device.get_property("DEVPATH"): - sort_devices["disk"] = [empty_device] + device.get_property("ID_TYPE") == DISK: + if NVME in device.get_property("DEVPATH"): + sort_devices[DISK] = [empty_device] try: - sort_devices["nvme"].extend([device]) + sort_devices[NVME].extend([device]) except KeyError: - sort_devices["nvme"] = [device] + sort_devices[NVME] = [device] elif "/host" in device.get_property("DEVPATH"): - sort_devices["disk"] = [empty_device] + sort_devices[DISK] = [empty_device] if device.get_property("SUBSYSTEM") == "net" and \ device.get_property("INTERFACE"): interface = device.get_property("INTERFACE") @@ -293,35 +288,33 @@ class EulerCertification(): sort_devices["infiniband"].extend([device]) except KeyError: sort_devices["infiniband"] = [device] - elif interface in line and "ethernet" in line: + elif interface in line and ETHERNET in line: try: - sort_devices["ethernet"].extend([device]) + sort_devices[ETHERNET].extend([device]) except KeyError: - sort_devices["ethernet"] = [device] - elif interface in line and "wifi" in line: + sort_devices[ETHERNET] = [device] + elif interface in line and WIFI in line: try: - sort_devices["wlan"].extend([device]) + sort_devices[WLAN].extend([device]) except KeyError: - sort_devices["wlan"] = [device] + sort_devices[WLAN] = [device] else: break continue if device.get_property("ID_CDROM") == "1": - types = ["DVD_RW", "DVD_PLUS_RW", "DVD_R", "DVD_PLUS_R", "DVD", - "BD_RE", "BD_R", "BD", "CD_RW", "CD_R", "CD"] - for dev_type in types: + for dev_type in CDTYPES: if device.get_property("ID_CDROM_" + dev_type) == "1": try: - sort_devices["cdrom"].extend([device]) + sort_devices[CDROM].extend([device]) except KeyError: - sort_devices["cdrom"] = [device] + sort_devices[CDROM] = [device] break - if device.get_property("SUBSYSTEM") == "ipmi": - sort_devices["ipmi"] = [empty_device] + if device.get_property("SUBSYSTEM") == IPMI: + sort_devices[IPMI] = [empty_device] try: Command("dmidecode").get_str("IPMI Device Information", single_line=False) - sort_devices["ipmi"] = [empty_device] + sort_devices[IPMI] = [empty_device] except: pass @@ -334,10 +327,10 @@ class EulerCertification(): """ while True: for test in self.test_factory: - if test["name"] == "system": - test["run"] = True - if test["status"] == "PASS": - test["status"] = "Force" + if test[NAME] == SYSTEM: + test[RUN] = True + if test[STATUS] == PASS: + test[STATUS] = FORCE os.system("clear") print("Select tests to run:") @@ -350,11 +343,11 @@ class EulerCertification(): return False if reply in ["n", "none"]: for test in self.test_factory: - test["run"] = False + test[RUN] = False continue if reply in ["a", "all"]: for test in self.test_factory: - test["run"] = True + test[RUN] = True continue num_lst = reply.split(" ") @@ -365,8 +358,8 @@ class EulerCertification(): continue if 0 < num <= len(self.test_factory): - self.test_factory[num - 1]["run"] = not \ - self.test_factory[num - 1]["run"] + self.test_factory[num - 1][RUN] = not \ + self.test_factory[num - 1][RUN] continue def show_tests(self): @@ -375,29 +368,29 @@ class EulerCertification(): :return: """ print("\033[1;35m" + "No.".ljust(4) + "Run-Now?".ljust(10) - + "Status".ljust(8) + "Class".ljust(14) + "Device\033[0m") + + STATUS.ljust(8) + "Class".ljust(14) + "Device\033[0m") num = 0 for test in self.test_factory: - name = test["name"] - if name == "system": - test["run"] = True - if test["status"] == "PASS": - test["status"] = "Force" - - status = test["status"] - device = test["device"].get_name() + name = test[NAME] + if name == SYSTEM: + test[RUN] = True + if test[STATUS] == PASS: + test[STATUS] = FORCE + + status = test[STATUS] + device = test[DEVICE].get_name() run = "no" - if test["run"] is True: + if test[RUN] is True: run = "yes" num = num + 1 - if status == "PASS": + if status == PASS: print("%-6d" % num + run.ljust(8) + "\033[0;32mPASS \033[0m" + name.ljust(14) + "%s" % device) elif status == "FAIL": print("%-6d" % num + run.ljust(8) + "\033[0;31mFAIL \033[0m" + name.ljust(14) + "%s" % device) - elif status == "Force": + elif status == FORCE: print("%-6d" % num + run.ljust(8) + "\033[0;33mForce \033[0m" + name.ljust(14) + "%s" % device) else: @@ -410,10 +403,10 @@ class EulerCertification(): :return: """ for test in self.test_factory: - if test["status"] == "PASS": - test["run"] = False + if test[STATUS] == PASS: + test[RUN] = False else: - test["run"] = True + test[RUN] = True os.system("clear") print("These tests are recommended to " "complete the compatibility test:") @@ -438,7 +431,7 @@ class EulerCertification(): if len(self.test_factory) == 0: return False for test in self.test_factory: - if test["status"] != "PASS": + if test[STATUS] != PASS: return False return True @@ -454,14 +447,14 @@ class EulerCertification(): for test in self.test_factory: if not self.search_factory(test, test_factory): self.test_factory.remove(test) - print("delete %s test %s" % (test["name"], - test["device"].get_name())) + print("delete %s test %s" % (test[NAME], + test[DEVICE].get_name())) for test in test_factory: if not self.search_factory(test, self.test_factory): self.test_factory.append(test) - print("add %s test %s" % (test["name"], - test["device"].get_name())) - self.test_factory.sort(key=lambda k: k["name"]) + print("add %s test %s" % (test[NAME], + test[DEVICE].get_name())) + self.test_factory.sort(key=lambda k: k[NAME]) FactoryDocument(CertEnv.factoryfile, self.test_factory).save() def search_factory(self, obj_test, test_factory): @@ -472,7 +465,8 @@ class EulerCertification(): :return: """ for test in test_factory: - if test["name"] == obj_test["name"] and \ - test["device"].path == obj_test["device"].path: + if test[NAME] == obj_test[NAME] and \ + test[DEVICE].path == obj_test[DEVICE].path: return True return False + diff --git a/hwcompatible/constants.py b/hwcompatible/constants.py new file mode 100644 index 0000000000000000000000000000000000000000..3fd4842a7c8a9605623cf09b98df49283366ece5 --- /dev/null +++ b/hwcompatible/constants.py @@ -0,0 +1,56 @@ +# -*- encoding=utf-8 -*- +""" +# ********************************************************************************** +# Copyright (c) Huawei Technologies Co., Ltd. 2021-2021. All rights reserved. +# [oecp] is licensed under the Mulan PSL v1. +# You can use this software according to the terms and conditions of the Mulan PSL v1. +# You may obtain a copy of Mulan PSL v1 at: +# http://license.coscl.org.cn/MulanPSL +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR +# PURPOSE. +# See the Mulan PSL v1 for more details. +# Author: meitingli +# Create: 2021-03-01 +# Description: save constants objects +# ********************************************************************************** +""" + +YES = "y" +SAMEASYES = ["y", "yes"] +NO = "n" +SAMEASNO = ["n", "no"] +TEST = "test" +PASS = "PASS" +FAIL = "FAIL" +FORCE = "Force" +RUN = "run" +NOTRUN = "NotRun" + +OS = "OS" +KERNEL = "kernel" +ID = "ID" +PRODUCTURL = "Product URL" +SERVER = "server" +NAME = "name" +DEVICE = "device" +STATUS = "status" +REBOOT= "reboot" +SYSTEM = "system" + +NODEVICE = ["cpufreq", "memory", "clock", "profiler", "system", + "stress", "kdump", "perf", "acpi", "watchdog"] +CDTYPES = ["DVD_RW", "DVD_PLUS_RW", "DVD_R", "DVD_PLUS_R", "DVD", + "BD_RE", "BD_R", "BD", "CD_RW", "CD_R", "CD"] +SUBSYSTEM = "SUBSYSTEM" +PCI_CLASS = "PCI_CLASS" +USB = "usb" +VIDEO = "video" +DISK = "disk" +NVME = "nvme" +ETHERNET = "ethernet" +WIFI = "wifi" +WLAN = "wlan" +CDROM = "cdrom" +IPMI = "ipmi" + diff --git a/hwcompatible/document.py b/hwcompatible/document.py index 25a4514fa85c40e9225cd20b61a40bf3518d7cc1..fd24315d3939a574e7b0bfef75f034bab14c100d 100755 --- a/hwcompatible/document.py +++ b/hwcompatible/document.py @@ -20,12 +20,13 @@ from .command import Command from .device import Device from .sysinfo import SysInfo from .env import CertEnv - +from .constants import * class Document(): """ Read and write documents """ + def __init__(self, filename, document=''): self.document = document self.filename = filename @@ -60,8 +61,8 @@ class CertDocument(Document): """ get hardware and release information """ + def __init__(self, filename, document=''): - # super(CertDocument, self).__init__(filename, document) self.document = dict() self.filename = filename if not document: @@ -72,7 +73,6 @@ class CertDocument(Document): def new(self): """ new document object - :return: """ try: pipe = Command("/usr/sbin/dmidecode -t 1") @@ -82,11 +82,12 @@ class CertDocument(Document): line = pipe.readline() if line: property_right = line.split(":", 1) - if len(property_right) == 2: - key = property_right[0].strip() - value = property_right[1].strip() - if key in ["Manufacturer", "Product Name", "Version"]: - self.document[key] = value + if len(property_right) != 2: + continue + key = property_right[0].strip() + value = property_right[1].strip() + if key in ["Manufacturer", "Product Name", "Version"]: + self.document[key] = value else: break except Exception as concrete_error: @@ -94,11 +95,13 @@ class CertDocument(Document): print(concrete_error) sysinfo = SysInfo(CertEnv.releasefile) - self.document["OS"] = sysinfo.product + " " + sysinfo.get_version() - self.document["kernel"] = sysinfo.kernel - self.document["ID"] = CommandUI().prompt("Please provide your Compatibility Test ID:") - self.document["Product URL"] = CommandUI().prompt("Please provide your Product URL:") - self.document["server"] = CommandUI().prompt("Please provide the Compatibility Test " + self.document[OS] = sysinfo.product + " " + sysinfo.get_version() + self.document[KERNEL] = sysinfo.kernel + self.document[ID] = CommandUI().prompt( + "Please provide your Compatibility Test ID:") + self.document[PRODUCTURL] = CommandUI().prompt( + "Please provide your Product URL:") + self.document[SERVER] = CommandUI().prompt("Please provide the Compatibility Test " "Server (Hostname or Ipaddr):") def get_hardware(self): @@ -106,45 +109,45 @@ class CertDocument(Document): Get hardware information """ return self.document["Manufacturer"] + " " + self.document["Product Name"] + " " \ - + self.document["Version"] + + self.document["Version"] def get_os(self): """ Get os information """ - return self.document["OS"] + return self.document[OS] def get_server(self): """ Get server information """ - return self.document["server"] + return self.document[SERVER] def get_url(self): """ Get url """ - return self.document["Product URL"] + return self.document[PRODUCTURL] def get_certify(self): """ Get certify """ - return self.document["ID"] + return self.document[ID] def get_kernel(self): """ Get kernel information """ - return self.document["kernel"] + return self.document[KERNEL] class DeviceDocument(Document): """ get device document """ + def __init__(self, filename, devices=''): - # super(DeviceDocument, self).__init__(filename, devices) self.filename = filename self.document = list() if not devices: @@ -160,7 +163,6 @@ class FactoryDocument(Document): """ def __init__(self, filename, factory=''): - # super(FactoryDocument, self).__init__(filename, factory) self.document = list() self.filename = filename if not factory: @@ -168,10 +170,10 @@ class FactoryDocument(Document): else: for member in factory: element = dict() - element["name"] = member["name"] - element["device"] = member["device"].properties - element["run"] = member["run"] - element["status"] = member["status"] + element[NAME] = member[NAME] + element[DEVICE] = member[DEVICE].properties + element[RUN] = member[RUN] + element[STATUS] = member[STATUS] self.document.append(element) def get_factory(self): @@ -182,11 +184,11 @@ class FactoryDocument(Document): factory = list() for element in self.document: test = dict() - device = Device(element["device"]) - test["device"] = device - test["name"] = element["name"] - test["run"] = element["run"] - test["status"] = element["status"] + device = Device(element[DEVICE]) + test[DEVICE] = device + test[NAME] = element[NAME] + test[RUN] = element[RUN] + test[STATUS] = element[STATUS] factory.append(test) return factory @@ -195,6 +197,7 @@ class ConfigFile: """ Get parameters from configuration file """ + def __init__(self, filename): self.filename = filename self.parameters = dict() @@ -277,3 +280,4 @@ class ConfigFile: for line in self.config: fp_info.write(line) fp_info.close() + diff --git a/hwcompatible/job.py b/hwcompatible/job.py index 5630b7048a9592feb14c4d31d42d0302251bb372..0a69585e2966ba4650dd3504aae290fc4a82f0fe 100755 --- a/hwcompatible/job.py +++ b/hwcompatible/job.py @@ -26,6 +26,7 @@ from .command import Command, CertCommandError from .commandUI import CommandUI from .log import Logger from .reboot import Reboot +from .constants import * class Job(): @@ -88,7 +89,7 @@ class Job(): except ImportError: ct = type if isinstance(test_class, ct) and issubclass(test_class, Test): - if "test" not in dir(test_class): + if TEST not in dir(test_class): continue if (subtests_filter and not subtests_filter in dir(test_class)): continue @@ -111,18 +112,18 @@ class Job(): self.test_suite = [] for test in self.test_factory: if test["run"]: - testclass = self.discover(test["name"], subtests_filter) + testclass = self.discover(test[NAME], subtests_filter) if testclass: testcase = dict() - testcase["test"] = testclass - testcase["name"] = test["name"] - testcase["device"] = test["device"] - testcase["status"] = "FAIL" + testcase[TEST] = testclass + testcase[NAME] = test[NAME] + testcase[DEVICE] = test[DEVICE] + testcase[STATUS] = FAIL self.test_suite.append(testcase) else: if not subtests_filter: - test["status"] = "FAIL" - print("not found %s" % test["name"]) + test[STATUS] = FAIL + print("not found %s" % test[NAME]) if not len(self.test_suite): print("No test found") @@ -134,7 +135,7 @@ class Job(): """ required_rpms = [] for tests in self.test_suite: - for pkg in tests["test"].requirements: + for pkg in tests[TEST].requirements: try: Command("rpm -q " + pkg).run_quiet() except CertCommandError: @@ -160,25 +161,25 @@ class Job(): :param subtests_filter: :return: """ - name = testcase["name"] - if testcase["device"].get_name(): - name = testcase["name"] + "-" + testcase["device"].get_name() + name = testcase[NAME] + if testcase[DEVICE].get_name(): + name = testcase[NAME] + "-" + testcase[DEVICE].get_name() logname = name + ".log" reboot = None test = None logger = None try: - test = testcase["test"] + test = testcase[TEST] logger = Logger(logname, self.job_id, sys.stdout, sys.stderr) logger.start() if subtests_filter: return_code = getattr(test, subtests_filter)() else: print("---- start to run test %s ----" % name) - args = argparse.Namespace(device=testcase["device"], logdir=logger.log.dir) + args = argparse.Namespace(device=testcase[DEVICE], logdir=logger.log.dir) test.setup(args) if test.reboot: - reboot = Reboot(testcase["name"], self, test.rebootup) + reboot = Reboot(testcase[NAME], self, test.rebootup) return_code = False if reboot.setup(): return_code = test.test() @@ -206,12 +207,12 @@ class Job(): print("No test to run.") return - self.test_suite.sort(key=lambda k: k["test"].pri) + self.test_suite.sort(key=lambda k: k[TEST].pri) for testcase in self.test_suite: if self._run_test(testcase, subtests_filter): - testcase["status"] = "PASS" + testcase[STATUS] = PASS else: - testcase["status"] = "FAIL" + testcase[STATUS] = FAIL def run(self): """ @@ -237,10 +238,10 @@ class Job(): print("------------- Summary -------------") for test in self.test_factory: if test["run"]: - name = test["name"] - if test["device"].get_name(): - name = test["name"] + "-" + test["device"].get_name() - if test["status"] == "PASS": + name = test[NAME] + if test[DEVICE].get_name(): + name = test[NAME] + "-" + test[DEVICE].get_name() + if test[STATUS] == PASS: print(name.ljust(33) + "\033[0;32mPASS\033[0m") else: print(name.ljust(33) + "\033[0;31mFAIL\033[0m") @@ -253,6 +254,7 @@ class Job(): """ for test in self.test_factory: for testcase in self.test_suite: - if test["name"] == testcase["name"] and test["device"].path == \ - testcase["device"].path: - test["status"] = testcase["status"] + if test[NAME] == testcase[NAME] and test[DEVICE].path == \ + testcase[DEVICE].path: + test[STATUS] = testcase[STATUS] + diff --git a/hwcompatible/reboot.py b/hwcompatible/reboot.py index 4f733765a5c6a7cb011d04732b99b6b110146b5b..4705d36b3b937e319b4f7ebf4a5bedd3c4b14f3a 100755 --- a/hwcompatible/reboot.py +++ b/hwcompatible/reboot.py @@ -19,12 +19,14 @@ import datetime from .document import Document, FactoryDocument from .env import CertEnv from .command import Command, CertCommandError +from .constants import * class Reboot: """ Special for restart tasks, so that the test can be continued after the machine is restarted """ + def __init__(self, testname, job, rebootup): self.testname = testname self.rebootup = rebootup @@ -40,8 +42,8 @@ class Reboot: return for test in self.job.test_factory: - if test["run"] and self.testname == test["name"]: - test["reboot"] = False + if test[RUN] and self.testname == test[NAME]: + test[REBOOT] = False Command("rm -rf %s" % CertEnv.rebootfile).run(ignore_errors=True) Command("systemctl disable oech").run(ignore_errors=True) @@ -57,8 +59,8 @@ class Reboot: self.job.save_result() for test in self.job.test_factory: - if test["run"] and self.testname == test["name"]: - test["reboot"] = True + if test[RUN] and self.testname == test[NAME]: + test[REBOOT] = True test["status"] = "FAIL" if not FactoryDocument(CertEnv.factoryfile, self.job.test_factory).save(): print("Error: save testfactory doc fail before reboot.") @@ -66,7 +68,7 @@ class Reboot: self.reboot["job_id"] = self.job.job_id self.reboot["time"] = datetime.datetime.now().strftime("%Y%m%d%H%M%S") - self.reboot["test"] = self.testname + self.reboot[TEST] = self.testname self.reboot["rebootup"] = self.rebootup if not Document(CertEnv.rebootfile, self.reboot).save(): print("Error: save reboot doc fail.") @@ -92,11 +94,12 @@ class Reboot: return False try: - self.testname = doc.document["test"] + self.testname = doc.document[TEST] self.reboot = doc.document self.job.job_id = self.reboot["job_id"] self.job.subtests_filter = self.reboot["rebootup"] - time_reboot = datetime.datetime.strptime(self.reboot["time"], "%Y%m%d%H%M%S") + time_reboot = datetime.datetime.strptime( + self.reboot["time"], "%Y%m%d%H%M%S") except Exception: print("Error: reboot file format not as expect.") return False @@ -104,9 +107,11 @@ class Reboot: time_now = datetime.datetime.now() time_delta = (time_now - time_reboot).seconds cmd = Command("last reboot -s '%s seconds ago'" % time_delta) - reboot_list = cmd.get_str("^reboot .*$", single_line=False, return_list=True) + reboot_list = cmd.get_str( + "^reboot .*$", single_line=False, return_list=True) if len(reboot_list) != 1: print("Errot:reboot times check fail.") return False return True + diff --git a/hwcompatible/test.py b/hwcompatible/test.py index 60e45bcecdb1bfed3d6738f942ffd18c6f7bf967..62354a5ce5d926d3375dd5f4b26e50d0bd6cb3b4 100755 --- a/hwcompatible/test.py +++ b/hwcompatible/test.py @@ -11,9 +11,7 @@ # PURPOSE. # See the Mulan PSL v2 for more details. # Create: 2020-04-01 - -"""Test set template""" - +# Desc: Test template class Test: """