Files
OpenCL-CTS/test_conformance/run_conformance.py
Kevin Petit 05a11d8e49 Synchronise with Khronos-private Gitlab branch
The maintenance of the conformance tests is moving to Github.

This commit contains all the changes that have been done in
Gitlab since the first public release of the conformance tests.

Signed-off-by: Kevin Petit kevin.petit@arm.com
2019-03-05 16:24:34 +00:00

358 lines
14 KiB
Python
Executable File

#! /usr/bin/python
#/******************************************************************
#//
#// OpenCL Conformance Tests
#//
#// Copyright: (c) 2008-2009 by Apple Inc. All Rights Reserved.
#//
#******************************************************************/
import os, re, sys, subprocess, time, commands, tempfile, math, string
DEBUG = 0
log_file_name = "opencl_conformance_results_" + time.strftime("%Y-%m-%d_%H-%M", time.localtime())+ ".log"
process_pid = 0
# The amount of time between printing a "." (if no output from test) or ":" (if output)
# to the screen while the tests are running.
seconds_between_status_updates = 60*60*24*7 # effectively never
# Help info
def write_help_info() :
print("run_conformance.py test_list [CL_DEVICE_TYPE(s) to test] [partial-test-names, ...] [log=path/to/log/file/]")
print(" test_list - the .csv file containing the test names and commands to run the tests.")
print(" [partial-test-names, ...] - optional partial strings to select a subset of the tests to run.")
print(" [CL_DEVICE_TYPE(s) to test] - list of CL device types to test, default is CL_DEVICE_TYPE_DEFAULT.")
print(" [log=path/to/log/file/] - provide a path for the test log file, default is in the current directory.")
print(" (Note: spaces are not allowed in the log file path.")
# Get the time formatted nicely
def get_time() :
return time.strftime("%d-%b %H:%M:%S", time.localtime())
# Write text to the screen and the log file
def write_screen_log(text) :
global log_file
print(text)
log_file.write(text+"\n")
# Load the tests from a csv formated file of the form name,command
def get_tests(filename, devices_to_test):
tests = []
if (os.path.exists(filename) == False):
print("FAILED: test_list \"" + filename + "\" does not exist.")
print("")
write_help_info()
sys.exit(-1)
file = open(filename, 'r')
for line in file.readlines():
comment = re.search("^#.*", line)
if (comment):
continue
device_specific_match = re.search("^\s*(.+)\s*,\s*(.+)\s*,\s*(.+)\s*", line)
if (device_specific_match):
if (device_specific_match.group(1) in devices_to_test):
test_path = string.replace(device_specific_match.group(3), '/', os.sep)
test_name = string.replace(device_specific_match.group(2), '/', os.sep)
tests.append((test_name, test_path))
else:
print("Skipping " + device_specific_match.group(2) + " because " + device_specific_match.group(1) + " is not in the list of devices to test.")
continue
match = re.search("^\s*(.+)\s*,\s*(.+)\s*", line)
if (match):
test_path = string.replace(match.group(2), '/', os.sep)
test_name = string.replace(match.group(1), '/', os.sep)
tests.append((test_name, test_path))
return tests
def run_test_checking_output(current_directory, test_dir, log_file):
global process_pid, seconds_between_status_updates
failures_this_run = 0
start_time = time.time()
# Create a temporary file for capturing the output from the test
(output_fd, output_name) = tempfile.mkstemp()
if ( not os.path.exists(output_name)) :
write_screen_log("\n ==> ERROR: could not create temporary file %s ." % output_name)
os.close(output_fd)
return -1
# Execute the test
program_to_run = test_dir_without_args = test_dir.split(None, 1)[0]
if ( os.sep == '\\' ) : program_to_run += ".exe"
if (os.path.exists(current_directory + os.sep + program_to_run)) :
os.chdir(os.path.dirname(current_directory+os.sep+test_dir_without_args) )
try:
if (DEBUG): p = subprocess.Popen("", stderr=subprocess.STDOUT, stdout=subprocess.PIPE, shell=True)
else : p = subprocess.Popen(current_directory + os.sep + test_dir, stderr=output_fd, stdout=output_fd, shell=True)
except OSError:
write_screen_log("\n ==> ERROR: failed to execute test. Failing test. : " + str(OSError))
os.close(output_fd)
return -1
else:
write_screen_log("\n ==> ERROR: test file (" + current_directory + os.sep + program_to_run +") does not exist. Failing test.")
os.close(output_fd)
return -1
# Set the global pid so we can kill it if this is aborted
process_pid = p.pid
# Read one character at a time from the temporary output file while the process is running.
# When we get an end-of-line, look for errors and write the results to the log file.
# This allows us to process the file as it is being produced.
# Keep track of the state for reading
# Whether we are done, if we have more to read, and where in the file we last read
done = False
more_to_read = True
pointer = 0
pointer_at_last_user_update = 0
output_this_run = False
try:
read_output = open(output_name, 'r')
except IOError:
write_screen_log("\n ==> ERROR: could not open output file from test.")
os.close(output_fd)
return -1
line = ""
while (not done or more_to_read):
os.fsync(output_fd)
# Determine if we should display some output
elapsed_time = (time.time() - start_time)
if (elapsed_time > seconds_between_status_updates):
start_time = time.time()
# If we've received output from the test since the last update, display a #
if (pointer != pointer_at_last_user_update):
sys.stdout.write(":")
else:
sys.stdout.write(".")
pointer_at_last_user_update = pointer
sys.stdout.flush()
# Check if we're done
p.poll()
if (not done and p.returncode != None):
if (p.returncode < 0):
if (not output_this_run):
print ""
output_this_run = True
write_screen_log(" ==> ERROR: test killed/crashed: " + str(p.returncode)+ ".")
done = True
# Try reading
try:
read_output.seek(pointer)
char_read = read_output.read(1)
except IOError:
time.sleep(1)
continue
# If we got a full line then process it
if (char_read == "\n"):
# Look for failures and report them as such
match = re.search(".*(FAILED|ERROR).*", line)
if (match):
if (not output_this_run):
print ""
output_this_run = True
print(" ==> " + line.replace('\n',''))
match = re.search(".*FAILED.*", line)
if (match):
failures_this_run = failures_this_run + 1
match = re.search(".*(PASSED).*", line)
if (match):
if (not output_this_run):
print ""
output_this_run = True
print(" " + line.replace('\n',''))
# Write it to the log
log_file.write(" " + line +"\n")
log_file.flush()
line = ""
pointer = pointer + 1
# If we are at the end of the file, then re-open it to get new data
elif (char_read == ""):
more_to_read = False
read_output.close()
time.sleep(1)
try:
os.fsync(output_fd)
read_output = open(output_name, 'r')
# See if there is more to read. This happens if the process ends and we have data left.
read_output.seek(pointer)
if (read_output.read(1) != ""):
more_to_read = True
except IOError:
write_screen_log("\n ==> ERROR: could not reopen output file from test.")
return -1
done = True
else:
line = line + char_read
pointer = pointer + 1
# Now we are done, so write out any remaining data in the file:
# This should only happen if the process exited with an error.
os.fsync(output_fd)
while (read_output.read(1) != ""):
log_file.write(read_output.read(1))
# Return the total number of failures
if (p.returncode == 0 and failures_this_run > 0):
write_screen_log("\n ==> ERROR: Test returned 0, but number of FAILED lines reported is " + str(failures_this_run) +".")
return failures_this_run
return p.returncode
def run_tests(tests) :
global curent_directory
global process_pid
# Run the tests
failures = 0
previous_test = None
test_number = 1
for test in tests:
# Print the name of the test we're running and the time
(test_name, test_dir) = test
if (test_dir != previous_test):
print("========== " + test_dir)
log_file.write("========================================================================================\n")
log_file.write("========================================================================================\n")
log_file.write("(" + get_time() + ") Running Tests: " + test_dir +"\n")
log_file.write("========================================================================================\n")
log_file.write("========================================================================================\n")
previous_test = test_dir
print("("+get_time()+") BEGIN " + test_name.ljust(40) +": "),
log_file.write(" ----------------------------------------------------------------------------------------\n")
log_file.write(" (" + get_time() + ") Running Sub Test: " + test_name + "\n")
log_file.write(" ----------------------------------------------------------------------------------------\n")
log_file.flush()
sys.stdout.flush()
# Run the test
result = 0
start_time = time.time()
try:
process_pid = 0
result = run_test_checking_output(current_directory, test_dir, log_file)
except KeyboardInterrupt:
# Catch an interrupt from the user
write_screen_log("\nFAILED: Execution interrupted. Killing test process, but not aborting full test run.")
os.kill(process_pid, 9)
answer = raw_input("Abort all tests? (y/n)")
if (answer.find("y") != -1):
write_screen_log("\nUser chose to abort all tests.")
log_file.close()
sys.exit(-1)
else:
write_screen_log("\nUser chose to continue with other tests. Reporting this test as failed.")
result = 1
run_time = (time.time() - start_time)
# Move print the finish status
if (result == 0):
print("("+get_time()+") PASSED " + test_name.ljust(40) +": (" + str(int(run_time)).rjust(3) + "s, test " + str(test_number).rjust(3) + os.sep + str(len(tests)) +")"),
else:
print("("+get_time()+") FAILED " + test_name.ljust(40) +": (" + str(int(run_time)).rjust(3) + "s, test " + str(test_number).rjust(3) + os.sep + str(len(tests)) +")"),
test_number = test_number + 1
log_file.write(" ----------------------------------------------------------------------------------------\n")
log_file.flush()
print("")
if (result != 0):
log_file.write(" *******************************************************************************************\n")
log_file.write(" * ("+get_time()+") Test " + test_name + " ==> FAILED: " + str(result)+"\n")
log_file.write(" *******************************************************************************************\n")
failures = failures + 1
else:
log_file.write(" ("+get_time()+") Test " + test_name +" passed in " + str(run_time) + "s\n")
log_file.write(" ----------------------------------------------------------------------------------------\n")
log_file.write("\n")
return failures
# ########################
# Begin OpenCL conformance run script
# ########################
if (len(sys.argv) < 2):
write_help_info()
sys.exit(-1)
current_directory = os.getcwd()
# Open the log file
for arg in sys.argv:
match = re.search("log=(\S+)", arg)
if (match):
log_file_name = match.group(1).rstrip('/') + os.sep + log_file_name
try:
log_file = open(log_file_name, "w")
except IOError:
print "Could not open log file " + log_file_name
# Determine which devices to test
device_types = ["CL_DEVICE_TYPE_DEFAULT", "CL_DEVICE_TYPE_CPU", "CL_DEVICE_TYPE_GPU", "CL_DEVICE_TYPE_ACCELERATOR", "CL_DEVICE_TYPE_ALL"]
devices_to_test = []
for device in device_types:
if device in sys.argv[2:]:
devices_to_test.append(device)
if (len(devices_to_test) == 0):
devices_to_test = ["CL_DEVICE_TYPE_DEFAULT"]
write_screen_log("Testing on: " + str(devices_to_test))
# Get the tests
tests = get_tests(sys.argv[1], devices_to_test)
# If tests are specified on the command line then run just those ones
tests_to_use = []
num_of_patterns_to_match = 0
for arg in sys.argv[2:]:
if arg in device_types:
continue
if re.search("log=(\S+)", arg):
continue
num_of_patterns_to_match = num_of_patterns_to_match + 1
found_it = False
for test in tests:
(test_name, test_dir) = test
if (test_name.find(arg) != -1 or test_dir.find(arg) != -1):
found_it = True
if (test not in tests_to_use):
tests_to_use.append(test)
if (found_it == False):
print("Failed to find a test matching " + arg)
if (len(tests_to_use) == 0):
if (num_of_patterns_to_match > 0):
print("FAILED: Failed to find any tests matching the given command-line options.")
print("")
write_help_info()
sys.exit(-1)
else:
tests = tests_to_use[:]
write_screen_log("Test execution arguments: " + str(sys.argv))
write_screen_log("Logging to file " + log_file_name +".")
write_screen_log("Loaded tests from " + sys.argv[1] + ", total of " + str(len(tests)) + " tests selected to run:")
for (test_name, test_command) in tests:
write_screen_log(test_name.ljust(50) + " (" + test_command +")")
# Run the tests
total_failures = 0
for device_to_test in devices_to_test:
os.environ['CL_DEVICE_TYPE'] = device_to_test
write_screen_log("========================================================================================")
write_screen_log("========================================================================================")
write_screen_log(("Setting CL_DEVICE_TYPE to " + device_to_test).center(90))
write_screen_log("========================================================================================")
write_screen_log("========================================================================================")
failures = run_tests(tests)
write_screen_log("========================================================================================")
if (failures == 0):
write_screen_log(">> TEST on " + device_to_test + " PASSED")
else:
write_screen_log(">> TEST on " + device_to_test + " FAILED (" + str(failures) + " FAILURES)")
write_screen_log("========================================================================================")
total_failures = total_failures + failures
write_screen_log("("+get_time()+") Testing complete. " + str(total_failures) + " failures for " + str(len(tests)) + " tests.")
log_file.close()