Address data race in ThreadPool (#1265)

ThreadSanitizer detects some data race in ThreadPool. They stem from
inappropriate usage of volatile which are replaced with std::atomic
variables in this patch.

This patch focuses on data races identified while running the
math_brute_force component. For example, it doesn't fully remove usage
of ThreadPool_AtomicAdd from other components of the CTS. Furthermore,
thread leaks, most likely because threads are not joined, are not
addressed.

Signed-off-by: Marco Antognini <marco.antognini@arm.com>
This commit is contained in:
Marco Antognini
2021-05-27 09:06:13 +01:00
committed by GitHub
parent bd3135dd01
commit 315998511a

View File

@@ -22,6 +22,8 @@
#if defined(__APPLE__) || defined(__linux__) || defined(_WIN32) #if defined(__APPLE__) || defined(__linux__) || defined(_WIN32)
// or any other POSIX system // or any other POSIX system
#include <atomic>
#if defined(_WIN32) #if defined(_WIN32)
#include <windows.h> #include <windows.h>
#if defined(_MSC_VER) #if defined(_MSC_VER)
@@ -241,7 +243,7 @@ pthread_cond_t cond_var;
// Condition variable state. How many iterations on the function left to run, // Condition variable state. How many iterations on the function left to run,
// set to CL_INT_MAX to cause worker threads to exit. Note: this value might // set to CL_INT_MAX to cause worker threads to exit. Note: this value might
// go negative. // go negative.
volatile cl_int gRunCount = 0; std::atomic<cl_int> gRunCount{ 0 };
// State that only changes when the threadpool is not working. // State that only changes when the threadpool is not working.
volatile TPFuncPtr gFunc_ptr = NULL; volatile TPFuncPtr gFunc_ptr = NULL;
@@ -261,19 +263,20 @@ pthread_cond_t caller_cond_var;
// # of threads intended to be running. Running threads will decrement this // # of threads intended to be running. Running threads will decrement this
// as they discover they've run out of work to do. // as they discover they've run out of work to do.
volatile cl_int gRunning = 0; std::atomic<cl_int> gRunning{ 0 };
// The total number of threads launched. // The total number of threads launched.
volatile cl_int gThreadCount = 0; std::atomic<cl_int> gThreadCount{ 0 };
#ifdef _WIN32 #ifdef _WIN32
void ThreadPool_WorkerFunc(void *p) void ThreadPool_WorkerFunc(void *p)
#else #else
void *ThreadPool_WorkerFunc(void *p) void *ThreadPool_WorkerFunc(void *p)
#endif #endif
{ {
cl_uint threadID = ThreadPool_AtomicAdd((volatile cl_int *)p, 1); auto &tid = *static_cast<std::atomic<cl_uint> *>(p);
cl_int item = ThreadPool_AtomicAdd(&gRunCount, -1); cl_uint threadID = tid++;
// log_info( "ThreadPool_WorkerFunc start: gRunning = %d\n", gRunning ); cl_int item = gRunCount--;
while (MAX_COUNT > item) while (MAX_COUNT > item)
{ {
@@ -282,8 +285,6 @@ void *ThreadPool_WorkerFunc(void *p)
// check for more work to do // check for more work to do
if (0 >= item) if (0 >= item)
{ {
// log_info("Thread %d has run out of work.\n", threadID);
// No work to do. Attempt to block waiting for work // No work to do. Attempt to block waiting for work
#if defined(_WIN32) #if defined(_WIN32)
EnterCriticalSection(cond_lock); EnterCriticalSection(cond_lock);
@@ -298,9 +299,7 @@ void *ThreadPool_WorkerFunc(void *p)
} }
#endif // !_WIN32 #endif // !_WIN32
cl_int remaining = ThreadPool_AtomicAdd(&gRunning, -1); cl_int remaining = gRunning--;
// log_info("ThreadPool_WorkerFunc: gRunning = %d\n",
// remaining - 1);
if (1 == remaining) if (1 == remaining)
{ // last thread out signal the main thread to wake up { // last thread out signal the main thread to wake up
#if defined(_WIN32) #if defined(_WIN32)
@@ -350,7 +349,7 @@ void *ThreadPool_WorkerFunc(void *p)
#endif // !_WIN32 #endif // !_WIN32
// try again to get a valid item id // try again to get a valid item id
item = ThreadPool_AtomicAdd(&gRunCount, -1); item = gRunCount--;
if (MAX_COUNT <= item) // exit if we are done if (MAX_COUNT <= item) // exit if we are done
{ {
#if defined(_WIN32) #if defined(_WIN32)
@@ -362,8 +361,7 @@ void *ThreadPool_WorkerFunc(void *p)
} }
} }
ThreadPool_AtomicAdd(&gRunning, 1); gRunning++;
// log_info("Thread %d has found work.\n", threadID);
#if defined(_WIN32) #if defined(_WIN32)
LeaveCriticalSection(cond_lock); LeaveCriticalSection(cond_lock);
@@ -447,12 +445,12 @@ void *ThreadPool_WorkerFunc(void *p)
} }
// get the next item // get the next item
item = ThreadPool_AtomicAdd(&gRunCount, -1); item = gRunCount--;
} }
exit: exit:
log_info("ThreadPool: thread %d exiting.\n", threadID); log_info("ThreadPool: thread %d exiting.\n", threadID);
ThreadPool_AtomicAdd(&gThreadCount, -1); gThreadCount--;
#if !defined(_WIN32) #if !defined(_WIN32)
return NULL; return NULL;
#endif #endif
@@ -487,7 +485,7 @@ void ThreadPool_Init(void)
{ {
cl_int i; cl_int i;
int err; int err;
volatile cl_uint threadID = 0; std::atomic<cl_uint> threadID{ 0 };
// Check for manual override of multithreading code. We add this for better // Check for manual override of multithreading code. We add this for better
// debuggability. // debuggability.
@@ -624,7 +622,7 @@ void ThreadPool_Init(void)
} }
#endif // !_WIN32 #endif // !_WIN32
gRunning = gThreadCount; gRunning = gThreadCount.load();
// init threads // init threads
for (i = 0; i < gThreadCount; i++) for (i = 0; i < gThreadCount; i++)
{ {
@@ -688,10 +686,6 @@ static BOOL CALLBACK _ThreadPool_Init(_PINIT_ONCE InitOnce, PVOID Parameter,
void ThreadPool_Exit(void) void ThreadPool_Exit(void)
{ {
#ifndef _WIN32
int err;
#endif
int count;
gRunCount = CL_INT_MAX; gRunCount = CL_INT_MAX;
#if defined(__GNUC__) #if defined(__GNUC__)
@@ -705,13 +699,13 @@ void ThreadPool_Exit(void)
#endif #endif
// spin waiting for threads to die // spin waiting for threads to die
for (count = 0; 0 != gThreadCount && count < 1000; count++) for (int count = 0; 0 != gThreadCount && count < 1000; count++)
{ {
#if defined(_WIN32) #if defined(_WIN32)
_WakeAllConditionVariable(cond_var); _WakeAllConditionVariable(cond_var);
Sleep(1); Sleep(1);
#else // !_WIN32 #else // !_WIN32
if ((err = pthread_cond_broadcast(&cond_var))) if (int err = pthread_cond_broadcast(&cond_var))
{ {
log_error("Error %d from pthread_cond_broadcast. Unable to wake up " log_error("Error %d from pthread_cond_broadcast. Unable to wake up "
"work threads. ThreadPool_Exit failed.\n", "work threads. ThreadPool_Exit failed.\n",
@@ -725,7 +719,7 @@ void ThreadPool_Exit(void)
if (gThreadCount) if (gThreadCount)
log_error("Error: Thread pool timed out after 1 second with %d threads " log_error("Error: Thread pool timed out after 1 second with %d threads "
"still active.\n", "still active.\n",
gThreadCount); gThreadCount.load());
else else
log_info("Thread pool exited in a orderly fashion.\n"); log_info("Thread pool exited in a orderly fashion.\n");
} }