[NFC] Fix whitespace issues in run_conformance.py (#1491)

Fix whitespace issues and remove superfluous parens in the
run_conformance.py script.  This addresses 288 out of the 415
issues reported by pylint.

Signed-off-by: Stuart Brady <stuart.brady@arm.com>
This commit is contained in:
Stuart Brady
2022-09-07 17:28:29 +01:00
committed by GitHub
parent 388944c01c
commit fec9d9a238

View File

@@ -8,7 +8,15 @@
#// #//
#******************************************************************/ #******************************************************************/
import os, re, sys, subprocess, time, commands, tempfile, math, string import os
import re
import sys
import subprocess
import time
import commands
import tempfile
import math
import string
DEBUG = 0 DEBUG = 0
@@ -19,6 +27,7 @@ process_pid = 0
# to the screen while the tests are running. # to the screen while the tests are running.
seconds_between_status_updates = 60 * 60 * 24 * 7 # effectively never seconds_between_status_updates = 60 * 60 * 24 * 7 # effectively never
# Help info # Help info
def write_help_info(): def write_help_info():
print("run_conformance.py test_list [CL_DEVICE_TYPE(s) to test] [partial-test-names, ...] [log=path/to/log/file/]") print("run_conformance.py test_list [CL_DEVICE_TYPE(s) to test] [partial-test-names, ...] [log=path/to/log/file/]")
@@ -33,16 +42,18 @@ def write_help_info() :
def get_time(): def get_time():
return time.strftime("%d-%b %H:%M:%S", time.localtime()) return time.strftime("%d-%b %H:%M:%S", time.localtime())
# Write text to the screen and the log file # Write text to the screen and the log file
def write_screen_log(text): def write_screen_log(text):
global log_file global log_file
print(text) print(text)
log_file.write(text + "\n") log_file.write(text + "\n")
# Load the tests from a csv formated file of the form name,command # Load the tests from a csv formated file of the form name,command
def get_tests(filename, devices_to_test): def get_tests(filename, devices_to_test):
tests = [] tests = []
if (os.path.exists(filename) == False): if os.path.exists(filename) == False:
print("FAILED: test_list \"" + filename + "\" does not exist.") print("FAILED: test_list \"" + filename + "\" does not exist.")
print("") print("")
write_help_info() write_help_info()
@@ -50,11 +61,11 @@ def get_tests(filename, devices_to_test):
file = open(filename, 'r') file = open(filename, 'r')
for line in file.readlines(): for line in file.readlines():
comment = re.search("^#.*", line) comment = re.search("^#.*", line)
if (comment): if comment:
continue continue
device_specific_match = re.search("^\s*(.+?)\s*,\s*(.+?)\s*,\s*(.+?)\s*$", line) device_specific_match = re.search("^\s*(.+?)\s*,\s*(.+?)\s*,\s*(.+?)\s*$", line)
if (device_specific_match): if device_specific_match:
if (device_specific_match.group(1) in devices_to_test): if device_specific_match.group(1) in devices_to_test:
test_path = string.replace(device_specific_match.group(3), '/', os.sep) test_path = string.replace(device_specific_match.group(3), '/', os.sep)
test_name = string.replace(device_specific_match.group(2), '/', os.sep) test_name = string.replace(device_specific_match.group(2), '/', os.sep)
tests.append((test_name, test_path)) tests.append((test_name, test_path))
@@ -62,7 +73,7 @@ def get_tests(filename, devices_to_test):
print("Skipping " + device_specific_match.group(2) + " because " + device_specific_match.group(1) + " is not in the list of devices to test.") print("Skipping " + device_specific_match.group(2) + " because " + device_specific_match.group(1) + " is not in the list of devices to test.")
continue continue
match = re.search("^\s*(.+?)\s*,\s*(.+?)\s*$", line) match = re.search("^\s*(.+?)\s*,\s*(.+?)\s*$", line)
if (match): if match:
test_path = string.replace(match.group(2), '/', os.sep) test_path = string.replace(match.group(2), '/', os.sep)
test_name = string.replace(match.group(1), '/', os.sep) test_name = string.replace(match.group(1), '/', os.sep)
tests.append((test_name, test_path)) tests.append((test_name, test_path))
@@ -75,17 +86,18 @@ def run_test_checking_output(current_directory, test_dir, log_file):
start_time = time.time() start_time = time.time()
# Create a temporary file for capturing the output from the test # Create a temporary file for capturing the output from the test
(output_fd, output_name) = tempfile.mkstemp() (output_fd, output_name) = tempfile.mkstemp()
if ( not os.path.exists(output_name)) : if not os.path.exists(output_name):
write_screen_log("\n ==> ERROR: could not create temporary file %s ." % output_name) write_screen_log("\n ==> ERROR: could not create temporary file %s ." % output_name)
os.close(output_fd) os.close(output_fd)
return -1 return -1
# Execute the test # Execute the test
program_to_run = test_dir_without_args = test_dir.split(None, 1)[0] program_to_run = test_dir_without_args = test_dir.split(None, 1)[0]
if ( os.sep == '\\' ) : program_to_run += ".exe" if os.sep == '\\':
if (os.path.exists(current_directory + os.sep + program_to_run)) : program_to_run += ".exe"
if os.path.exists(current_directory + os.sep + program_to_run):
os.chdir(os.path.dirname(current_directory + os.sep + test_dir_without_args)) os.chdir(os.path.dirname(current_directory + os.sep + test_dir_without_args))
try: try:
if (DEBUG): p = subprocess.Popen("", stderr=subprocess.STDOUT, stdout=subprocess.PIPE, shell=True) if DEBUG: p = subprocess.Popen("", stderr=subprocess.STDOUT, stdout=subprocess.PIPE, shell=True)
else: p = subprocess.Popen(current_directory + os.sep + test_dir, stderr=output_fd, stdout=output_fd, shell=True) else: p = subprocess.Popen(current_directory + os.sep + test_dir, stderr=output_fd, stdout=output_fd, shell=True)
except OSError: except OSError:
write_screen_log("\n ==> ERROR: failed to execute test. Failing test. : " + str(OSError)) write_screen_log("\n ==> ERROR: failed to execute test. Failing test. : " + str(OSError))
@@ -114,14 +126,14 @@ def run_test_checking_output(current_directory, test_dir, log_file):
os.close(output_fd) os.close(output_fd)
return -1 return -1
line = "" line = ""
while (not done or more_to_read): while not done or more_to_read:
os.fsync(output_fd) os.fsync(output_fd)
# Determine if we should display some output # Determine if we should display some output
elapsed_time = (time.time() - start_time) elapsed_time = (time.time() - start_time)
if (elapsed_time > seconds_between_status_updates): if elapsed_time > seconds_between_status_updates:
start_time = time.time() start_time = time.time()
# If we've received output from the test since the last update, display a # # If we've received output from the test since the last update, display a #
if (pointer != pointer_at_last_user_update): if pointer != pointer_at_last_user_update:
sys.stdout.write(":") sys.stdout.write(":")
else: else:
sys.stdout.write(".") sys.stdout.write(".")
@@ -129,9 +141,9 @@ def run_test_checking_output(current_directory, test_dir, log_file):
sys.stdout.flush() sys.stdout.flush()
# Check if we're done # Check if we're done
p.poll() p.poll()
if (not done and p.returncode != None): if not done and p.returncode != None:
if (p.returncode < 0): if p.returncode < 0:
if (not output_this_run): if not output_this_run:
print "" print ""
output_this_run = True output_this_run = True
write_screen_log(" ==> ERROR: test killed/crashed: " + str(p.returncode) + ".") write_screen_log(" ==> ERROR: test killed/crashed: " + str(p.returncode) + ".")
@@ -144,20 +156,20 @@ def run_test_checking_output(current_directory, test_dir, log_file):
time.sleep(1) time.sleep(1)
continue continue
# If we got a full line then process it # If we got a full line then process it
if (char_read == "\n"): if char_read == "\n":
# Look for failures and report them as such # Look for failures and report them as such
match = re.search(".*(FAILED|ERROR).*", line) match = re.search(".*(FAILED|ERROR).*", line)
if (match): if match:
if (not output_this_run): if not output_this_run:
print "" print ""
output_this_run = True output_this_run = True
print(" ==> " + line.replace('\n', '')) print(" ==> " + line.replace('\n', ''))
match = re.search(".*FAILED.*", line) match = re.search(".*FAILED.*", line)
if (match): if match:
failures_this_run = failures_this_run + 1 failures_this_run = failures_this_run + 1
match = re.search(".*(PASSED).*", line) match = re.search(".*(PASSED).*", line)
if (match): if match:
if (not output_this_run): if not output_this_run:
print "" print ""
output_this_run = True output_this_run = True
print(" " + line.replace('\n', '')) print(" " + line.replace('\n', ''))
@@ -167,7 +179,7 @@ def run_test_checking_output(current_directory, test_dir, log_file):
line = "" line = ""
pointer = pointer + 1 pointer = pointer + 1
# If we are at the end of the file, then re-open it to get new data # If we are at the end of the file, then re-open it to get new data
elif (char_read == ""): elif char_read == "":
more_to_read = False more_to_read = False
read_output.close() read_output.close()
time.sleep(1) time.sleep(1)
@@ -176,7 +188,7 @@ def run_test_checking_output(current_directory, test_dir, log_file):
read_output = open(output_name, 'r') read_output = open(output_name, 'r')
# See if there is more to read. This happens if the process ends and we have data left. # See if there is more to read. This happens if the process ends and we have data left.
read_output.seek(pointer) read_output.seek(pointer)
if (read_output.read(1) != ""): if read_output.read(1) != "":
more_to_read = True more_to_read = True
except IOError: except IOError:
write_screen_log("\n ==> ERROR: could not reopen output file from test.") write_screen_log("\n ==> ERROR: could not reopen output file from test.")
@@ -188,7 +200,7 @@ def run_test_checking_output(current_directory, test_dir, log_file):
# Now we are done, so write out any remaining data in the file: # Now we are done, so write out any remaining data in the file:
# This should only happen if the process exited with an error. # This should only happen if the process exited with an error.
os.fsync(output_fd) os.fsync(output_fd)
while (read_output.read(1) != ""): while read_output.read(1) != "":
log_file.write(read_output.read(1)) log_file.write(read_output.read(1))
# Return the total number of failures # Return the total number of failures
if (p.returncode == 0 and failures_this_run > 0): if (p.returncode == 0 and failures_this_run > 0):
@@ -207,7 +219,7 @@ def run_tests(tests) :
for test in tests: for test in tests:
# Print the name of the test we're running and the time # Print the name of the test we're running and the time
(test_name, test_dir) = test (test_name, test_dir) = test
if (test_dir != previous_test): if test_dir != previous_test:
print("========== " + test_dir) print("========== " + test_dir)
log_file.write("========================================================================================\n") log_file.write("========================================================================================\n")
log_file.write("========================================================================================\n") log_file.write("========================================================================================\n")
@@ -233,7 +245,7 @@ def run_tests(tests) :
write_screen_log("\nFAILED: Execution interrupted. Killing test process, but not aborting full test run.") write_screen_log("\nFAILED: Execution interrupted. Killing test process, but not aborting full test run.")
os.kill(process_pid, 9) os.kill(process_pid, 9)
answer = raw_input("Abort all tests? (y/n)") answer = raw_input("Abort all tests? (y/n)")
if (answer.find("y") != -1): if answer.find("y") != -1:
write_screen_log("\nUser chose to abort all tests.") write_screen_log("\nUser chose to abort all tests.")
log_file.close() log_file.close()
sys.exit(-1) sys.exit(-1)
@@ -243,7 +255,7 @@ def run_tests(tests) :
run_time = (time.time() - start_time) run_time = (time.time() - start_time)
# Move print the finish status # Move print the finish status
if (result == 0): if result == 0:
print("(" + get_time() + ") PASSED " + test_name.ljust(40) + ": (" + str(int(run_time)).rjust(3) + "s, test " + str(test_number).rjust(3) + os.sep + str(len(tests)) + ")"), print("(" + get_time() + ") PASSED " + test_name.ljust(40) + ": (" + str(int(run_time)).rjust(3) + "s, test " + str(test_number).rjust(3) + os.sep + str(len(tests)) + ")"),
else: else:
print("(" + get_time() + ") FAILED " + test_name.ljust(40) + ": (" + str(int(run_time)).rjust(3) + "s, test " + str(test_number).rjust(3) + os.sep + str(len(tests)) + ")"), print("(" + get_time() + ") FAILED " + test_name.ljust(40) + ": (" + str(int(run_time)).rjust(3) + "s, test " + str(test_number).rjust(3) + os.sep + str(len(tests)) + ")"),
@@ -253,7 +265,7 @@ def run_tests(tests) :
log_file.flush() log_file.flush()
print("") print("")
if (result != 0): if result != 0:
log_file.write(" *******************************************************************************************\n") log_file.write(" *******************************************************************************************\n")
log_file.write(" * (" + get_time() + ") Test " + test_name + " ==> FAILED: " + str(result) + "\n") log_file.write(" * (" + get_time() + ") Test " + test_name + " ==> FAILED: " + str(result) + "\n")
log_file.write(" *******************************************************************************************\n") log_file.write(" *******************************************************************************************\n")
@@ -266,23 +278,19 @@ def run_tests(tests) :
return failures return failures
# ######################## # ########################
# Begin OpenCL conformance run script # Begin OpenCL conformance run script
# ######################## # ########################
if (len(sys.argv) < 2): if len(sys.argv) < 2:
write_help_info() write_help_info()
sys.exit(-1) sys.exit(-1)
current_directory = os.getcwd() current_directory = os.getcwd()
# Open the log file # Open the log file
for arg in sys.argv: for arg in sys.argv:
match = re.search("log=(\S+)", arg) match = re.search("log=(\S+)", arg)
if (match): if match:
log_file_name = match.group(1).rstrip('/') + os.sep + log_file_name log_file_name = match.group(1).rstrip('/') + os.sep + log_file_name
try: try:
log_file = open(log_file_name, "w") log_file = open(log_file_name, "w")
@@ -295,7 +303,7 @@ devices_to_test = []
for device in device_types: for device in device_types:
if device in sys.argv[2:]: if device in sys.argv[2:]:
devices_to_test.append(device) devices_to_test.append(device)
if (len(devices_to_test) == 0): if len(devices_to_test) == 0:
devices_to_test = ["CL_DEVICE_TYPE_DEFAULT"] devices_to_test = ["CL_DEVICE_TYPE_DEFAULT"]
write_screen_log("Testing on: " + str(devices_to_test)) write_screen_log("Testing on: " + str(devices_to_test))
@@ -316,12 +324,12 @@ for arg in sys.argv[2:]:
(test_name, test_dir) = test (test_name, test_dir) = test
if (test_name.find(arg) != -1 or test_dir.find(arg) != -1): if (test_name.find(arg) != -1 or test_dir.find(arg) != -1):
found_it = True found_it = True
if (test not in tests_to_use): if test not in tests_to_use:
tests_to_use.append(test) tests_to_use.append(test)
if (found_it == False): if found_it == False:
print("Failed to find a test matching " + arg) print("Failed to find a test matching " + arg)
if (len(tests_to_use) == 0): if len(tests_to_use) == 0:
if (num_of_patterns_to_match > 0): if num_of_patterns_to_match > 0:
print("FAILED: Failed to find any tests matching the given command-line options.") print("FAILED: Failed to find any tests matching the given command-line options.")
print("") print("")
write_help_info() write_help_info()
@@ -346,7 +354,7 @@ for device_to_test in devices_to_test:
write_screen_log("========================================================================================") write_screen_log("========================================================================================")
failures = run_tests(tests) failures = run_tests(tests)
write_screen_log("========================================================================================") write_screen_log("========================================================================================")
if (failures == 0): if failures == 0:
write_screen_log(">> TEST on " + device_to_test + " PASSED") write_screen_log(">> TEST on " + device_to_test + " PASSED")
else: else:
write_screen_log(">> TEST on " + device_to_test + " FAILED (" + str(failures) + " FAILURES)") write_screen_log(">> TEST on " + device_to_test + " FAILED (" + str(failures) + " FAILURES)")