From 315998511abe3959be21962a696911b43d4d5f59 Mon Sep 17 00:00:00 2001 From: Marco Antognini Date: Thu, 27 May 2021 09:06:13 +0100 Subject: [PATCH] Address data race in ThreadPool (#1265) ThreadSanitizer detects some data race in ThreadPool. They stem from inappropriate usage of volatile which are replaced with std::atomic variables in this patch. This patch focuses on data races identified while running the math_brute_force component. For example, it doesn't fully remove usage of ThreadPool_AtomicAdd from other components of the CTS. Furthermore, thread leaks, most likely because threads are not joined, are not addressed. Signed-off-by: Marco Antognini --- test_common/harness/ThreadPool.cpp | 44 +++++++++++++----------------- 1 file changed, 19 insertions(+), 25 deletions(-) diff --git a/test_common/harness/ThreadPool.cpp b/test_common/harness/ThreadPool.cpp index 5dae1b4a..62798045 100644 --- a/test_common/harness/ThreadPool.cpp +++ b/test_common/harness/ThreadPool.cpp @@ -22,6 +22,8 @@ #if defined(__APPLE__) || defined(__linux__) || defined(_WIN32) // or any other POSIX system +#include + #if defined(_WIN32) #include #if defined(_MSC_VER) @@ -241,7 +243,7 @@ pthread_cond_t cond_var; // Condition variable state. How many iterations on the function left to run, // set to CL_INT_MAX to cause worker threads to exit. Note: this value might // go negative. -volatile cl_int gRunCount = 0; +std::atomic gRunCount{ 0 }; // State that only changes when the threadpool is not working. volatile TPFuncPtr gFunc_ptr = NULL; @@ -261,19 +263,20 @@ pthread_cond_t caller_cond_var; // # of threads intended to be running. Running threads will decrement this // as they discover they've run out of work to do. -volatile cl_int gRunning = 0; +std::atomic gRunning{ 0 }; // The total number of threads launched. -volatile cl_int gThreadCount = 0; +std::atomic gThreadCount{ 0 }; + #ifdef _WIN32 void ThreadPool_WorkerFunc(void *p) #else void *ThreadPool_WorkerFunc(void *p) #endif { - cl_uint threadID = ThreadPool_AtomicAdd((volatile cl_int *)p, 1); - cl_int item = ThreadPool_AtomicAdd(&gRunCount, -1); - // log_info( "ThreadPool_WorkerFunc start: gRunning = %d\n", gRunning ); + auto &tid = *static_cast *>(p); + cl_uint threadID = tid++; + cl_int item = gRunCount--; while (MAX_COUNT > item) { @@ -282,8 +285,6 @@ void *ThreadPool_WorkerFunc(void *p) // check for more work to do if (0 >= item) { - // log_info("Thread %d has run out of work.\n", threadID); - // No work to do. Attempt to block waiting for work #if defined(_WIN32) EnterCriticalSection(cond_lock); @@ -298,9 +299,7 @@ void *ThreadPool_WorkerFunc(void *p) } #endif // !_WIN32 - cl_int remaining = ThreadPool_AtomicAdd(&gRunning, -1); - // log_info("ThreadPool_WorkerFunc: gRunning = %d\n", - // remaining - 1); + cl_int remaining = gRunning--; if (1 == remaining) { // last thread out signal the main thread to wake up #if defined(_WIN32) @@ -350,7 +349,7 @@ void *ThreadPool_WorkerFunc(void *p) #endif // !_WIN32 // try again to get a valid item id - item = ThreadPool_AtomicAdd(&gRunCount, -1); + item = gRunCount--; if (MAX_COUNT <= item) // exit if we are done { #if defined(_WIN32) @@ -362,8 +361,7 @@ void *ThreadPool_WorkerFunc(void *p) } } - ThreadPool_AtomicAdd(&gRunning, 1); - // log_info("Thread %d has found work.\n", threadID); + gRunning++; #if defined(_WIN32) LeaveCriticalSection(cond_lock); @@ -447,12 +445,12 @@ void *ThreadPool_WorkerFunc(void *p) } // get the next item - item = ThreadPool_AtomicAdd(&gRunCount, -1); + item = gRunCount--; } exit: log_info("ThreadPool: thread %d exiting.\n", threadID); - ThreadPool_AtomicAdd(&gThreadCount, -1); + gThreadCount--; #if !defined(_WIN32) return NULL; #endif @@ -487,7 +485,7 @@ void ThreadPool_Init(void) { cl_int i; int err; - volatile cl_uint threadID = 0; + std::atomic threadID{ 0 }; // Check for manual override of multithreading code. We add this for better // debuggability. @@ -624,7 +622,7 @@ void ThreadPool_Init(void) } #endif // !_WIN32 - gRunning = gThreadCount; + gRunning = gThreadCount.load(); // init threads for (i = 0; i < gThreadCount; i++) { @@ -688,10 +686,6 @@ static BOOL CALLBACK _ThreadPool_Init(_PINIT_ONCE InitOnce, PVOID Parameter, void ThreadPool_Exit(void) { -#ifndef _WIN32 - int err; -#endif - int count; gRunCount = CL_INT_MAX; #if defined(__GNUC__) @@ -705,13 +699,13 @@ void ThreadPool_Exit(void) #endif // spin waiting for threads to die - for (count = 0; 0 != gThreadCount && count < 1000; count++) + for (int count = 0; 0 != gThreadCount && count < 1000; count++) { #if defined(_WIN32) _WakeAllConditionVariable(cond_var); Sleep(1); #else // !_WIN32 - if ((err = pthread_cond_broadcast(&cond_var))) + if (int err = pthread_cond_broadcast(&cond_var)) { log_error("Error %d from pthread_cond_broadcast. Unable to wake up " "work threads. ThreadPool_Exit failed.\n", @@ -725,7 +719,7 @@ void ThreadPool_Exit(void) if (gThreadCount) log_error("Error: Thread pool timed out after 1 second with %d threads " "still active.\n", - gThreadCount); + gThreadCount.load()); else log_info("Thread pool exited in a orderly fashion.\n"); }