From c784e03be52ebf6474c653262033f7f79ddf82be Mon Sep 17 00:00:00 2001 From: castano Date: Thu, 3 May 2012 17:04:53 +0000 Subject: [PATCH] Try to increase robustness of thread pool. --- src/nvthread/Atomic.h | 23 +++++++++++++++++++++-- src/nvthread/Thread.cpp | 4 ++++ src/nvthread/ThreadPool.cpp | 17 +++++++++++------ 3 files changed, 36 insertions(+), 8 deletions(-) diff --git a/src/nvthread/Atomic.h b/src/nvthread/Atomic.h index 93455a9..9da7db6 100644 --- a/src/nvthread/Atomic.h +++ b/src/nvthread/Atomic.h @@ -48,7 +48,6 @@ namespace nv { nvDebugCheck((intptr_t(ptr) & 3) == 0); #if POSH_CPU_X86 || POSH_CPU_X86_64 - nvCompilerReadBarrier(); uint32 ret = *ptr; // on x86, loads are Acquire nvCompilerReadBarrier(); return ret; @@ -73,7 +72,6 @@ namespace nv { nvDebugCheck((intptr_t(&value) & 3) == 0); #if POSH_CPU_X86 || POSH_CPU_X86_64 - nvCompilerWriteBarrier(); *ptr = value; // on x86, stores are Release nvCompilerWriteBarrier(); #elif POSH_CPU_STRONGARM @@ -87,6 +85,27 @@ namespace nv { } + template <typename T> + inline void storeReleasePointer(volatile T * pTo, T from) + { + NV_COMPILER_CHECK(sizeof(T) == sizeof(intptr_t)); + nvDebugCheck((((intptr_t)pTo) % sizeof(intptr_t)) == 0); + nvDebugCheck((((intptr_t)&from) % sizeof(intptr_t)) == 0); + nvCompilerWriteBarrier(); + *pTo = from; // on x86, stores are Release + } + + template <typename T> + inline T loadAcquirePointer(volatile T * ptr) + { + NV_COMPILER_CHECK(sizeof(T) == sizeof(intptr_t)); + nvDebugCheck((((intptr_t)ptr) % sizeof(intptr_t)) == 0); + T ret = *ptr; // on x86, loads are Acquire + nvCompilerReadBarrier(); + return ret; + } + + // Atomics. @@ Assuming sequential memory order? 
#if NV_CC_MSVC diff --git a/src/nvthread/Thread.cpp b/src/nvthread/Thread.cpp index 68dc0c5..e0f8390 100644 --- a/src/nvthread/Thread.cpp +++ b/src/nvthread/Thread.cpp @@ -129,6 +129,10 @@ bool Thread::isRunning () const DWORD result = WaitForMultipleObjects(count, handles, TRUE, INFINITE); + for (uint i = 0; i < count; i++) { + CloseHandle (threads->p->thread); + threads->p->thread = 0; + } delete [] handles; #else*/ diff --git a/src/nvthread/ThreadPool.cpp b/src/nvthread/ThreadPool.cpp index af111f1..daa2837 100644 --- a/src/nvthread/ThreadPool.cpp +++ b/src/nvthread/ThreadPool.cpp @@ -3,6 +3,7 @@ #include "ThreadPool.h" #include "Mutex.h" #include "Thread.h" +#include "Atomic.h" #include "nvcore/Utils.h" @@ -49,17 +50,19 @@ AutoPtr<ThreadPool> s_pool; /*static*/ void ThreadPool::workerFunc(void * arg) { - uint i = toU32((uintptr_t)arg); // This is OK, because workerCount should always be <<< 2^32 + uint i = toU32((uintptr_t)arg); // This is OK, because workerCount should always be much smaller than 2^32 while(true) { s_pool->startEvents[i].wait(); - if (s_pool->func == NULL) { - return; // @@ should we post finish event anyway? + nv::ThreadFunc * func = loadAcquirePointer(&s_pool->func); + + if (func == NULL) { + return; } - s_pool->func(s_pool->arg); + func(s_pool->arg); s_pool->finishEvents[i].post(); } @@ -102,11 +105,13 @@ void ThreadPool::start(ThreadFunc * func, void * arg) wait(); // Set our desired function. - this->func = func; - this->arg = arg; + storeReleasePointer(&this->func, func); + storeReleasePointer(&this->arg, arg); allIdle = false; + nvCompilerWriteBarrier(); + // Resume threads. Event::post(startEvents, workerCount); }