Merge changes from the witness.

This commit is contained in:
castano
2011-09-27 17:48:46 +00:00
parent 9c0658edca
commit 3c0ab2d3f3
47 changed files with 1811 additions and 186 deletions

View File

@ -0,0 +1,26 @@
# Build script for the nvthreads library target.
PROJECT(nvthreads)
# Files compiled into the library.
# NOTE(review): this list names nvthreads.h, SpinWaiter.* and ThreadLocalStorage.*;
# confirm those files exist and that every source meant to be built is listed.
SET(THREADS_SRCS
nvthreads.h
Mutex.h Mutex.cpp
SpinWaiter.h SpinWaiter.cpp
Thread.h Thread.cpp
ThreadLocalStorage.h ThreadLocalStorage.cpp)
INCLUDE_DIRECTORIES(${CMAKE_CURRENT_SOURCE_DIR})
# targets
# NOTE(review): confirm that the macro spellings NVTHREADS_EXPORTS / NVTHREADS_SHARED
# match the ones tested by the library's linkage header.
ADD_DEFINITIONS(-DNVTHREADS_EXPORTS)
# Build a shared library when NVTHREADS_SHARED is set, otherwise use the default library type.
IF(NVTHREADS_SHARED)
ADD_LIBRARY(nvthreads SHARED ${THREADS_SRCS})
ELSE(NVTHREADS_SHARED)
ADD_LIBRARY(nvthreads ${THREADS_SRCS})
ENDIF(NVTHREADS_SHARED)
TARGET_LINK_LIBRARIES(nvthreads ${LIBS} nvcore)
# Install layout: executables/DLLs in bin, shared libraries in lib, static archives in lib/static.
INSTALL(TARGETS nvthreads
RUNTIME DESTINATION bin
LIBRARY DESTINATION lib
ARCHIVE DESTINATION lib/static)

52
src/nvthread/Event.cpp Normal file
View File

@ -0,0 +1,52 @@
// This code is in the public domain -- castano@gmail.com
#include "Event.h"
#if NV_OS_WIN32
#include "Win32.h"
#elif NV_OS_UNIX
#include <pthread.h>
#endif
using namespace nv;
#if NV_OS_WIN32
// Win32 implementation state of an Event.
struct Event::Private {
HANDLE handle; // Handle returned by CreateEvent; closed in the destructor.
};
// Creates the underlying Win32 event object.
// CreateEvent arguments, in order: default security attributes, auto-reset
// (manual reset = FALSE), initially non-signaled, unnamed.
Event::Event()
    : m(new Private)
{
    m->handle = CreateEvent(NULL, FALSE, FALSE, NULL);
}
// Releases the Win32 event handle owned by this object.
Event::~Event()
{
    CloseHandle(m->handle);
}
// Signals the event so that a thread blocked in wait() can proceed.
void Event::post()
{
    SetEvent(m->handle);
}
// Blocks the calling thread, with no timeout, until the event is signaled.
// The event is created as auto-reset, so a successful wait clears it again.
void Event::wait()
{
    WaitForSingleObject(m->handle, INFINITE);
}
// Signals each of the 'count' events in the array, in index order.
/*static*/ void Event::post(Event * events, uint count)
{
    Event * const end = events + count;
    for (Event * e = events; e != end; ++e) {
        e->post();
    }
}
// Waits for every one of the 'count' events in the array, one after another,
// and returns once all of them have been signaled.
/*static*/ void Event::wait(Event * events, uint count)
{
    // @@ Use wait for multiple objects?
    Event * const end = events + count;
    for (Event * e = events; e != end; ++e) {
        e->wait();
    }
}
#elif NV_OS_UNIX
// @@ The Unix implementation of Event has not been written yet.
#endif

34
src/nvthread/Event.h Normal file
View File

@ -0,0 +1,34 @@
// This code is in the public domain -- castano@gmail.com
#pragma once
#ifndef NV_THREAD_EVENT_H
#define NV_THREAD_EVENT_H
#include "nvthread.h"
#include "nvcore/Ptr.h"
namespace nv
{
// Signalling primitive: one thread posts the event, another waits for it.
// This is intended to be used by a single waiter thread.
// Platform state is hidden behind the Private struct defined in the .cpp file.
class NVTHREAD_CLASS Event
{
NV_FORBID_COPY(Event);
public:
Event();
~Event();
// Signal the event.
void post();
// Block until the event has been posted.
void wait(); // Wait resets the event.
// Array helpers: apply post() / wait() to each of the 'count' events in turn.
static void post(Event * events, uint count);
static void wait(Event * events, uint count);
private:
struct Private;
AutoPtr<Private> m; // Platform-specific implementation data.
};
} // nv namespace
#endif // NV_THREAD_EVENT_H

89
src/nvthread/Mutex.cpp Normal file
View File

@ -0,0 +1,89 @@
// This code is in the public domain -- castano@gmail.com
#include "Mutex.h"
#if NV_OS_WIN32
#include "Win32.h"
#elif NV_OS_UNIX
#include <pthread.h>
#include <errno.h> // EBUSY
#endif // NV_OS
using namespace nv;
#if NV_OS_WIN32
// Win32 implementation state of a Mutex: a critical section object.
struct Mutex::Private {
CRITICAL_SECTION mutex;
};
// Allocates the private state and initializes its critical section.
Mutex::Mutex ()
    : m(new Private)
{
    InitializeCriticalSection(&m->mutex);
}
// Releases the resources held by the critical section.
Mutex::~Mutex ()
{
    DeleteCriticalSection(&m->mutex);
}
// Acquires the mutex, blocking until it becomes available.
void Mutex::lock()
{
    EnterCriticalSection(&m->mutex);
}
// Attempts to acquire the mutex without blocking.
// Returns true when the mutex was acquired, false otherwise.
bool Mutex::tryLock()
{
    if (TryEnterCriticalSection(&m->mutex)) {
        return true;
    }
    return false;
}
// Releases the mutex previously acquired with lock() or tryLock().
void Mutex::unlock()
{
    LeaveCriticalSection(&m->mutex);
}
#elif NV_OS_UNIX
// pthread implementation state of a Mutex.
struct Mutex::Private {
pthread_mutex_t mutex;
};
// Allocates the private state and initializes the pthread mutex with the
// default attributes (NULL attribute pointer).
Mutex::Mutex ()
    : m(new Private)
{
    const int status = pthread_mutex_init(&m->mutex , NULL);
    nvDebugCheck(status == 0);
}
// Destroys the pthread mutex; a failure is only reported through the debug check.
Mutex::~Mutex ()
{
    const int status = pthread_mutex_destroy(&m->mutex);
    nvDebugCheck(status == 0);
}
// Acquires the mutex, blocking until it becomes available.
void Mutex::lock()
{
    const int status = pthread_mutex_lock(&m->mutex);
    nvDebugCheck(status == 0);
}
// Attempts to acquire the mutex without blocking.
// Returns true when the mutex was acquired. EBUSY (already locked) is the only
// failure the debug check accepts.
bool Mutex::tryLock()
{
    const int status = pthread_mutex_trylock(&m->mutex);
    nvDebugCheck(status == 0 || status == EBUSY);

    const bool acquired = (status == 0);
    return acquired;
}
// Releases the mutex previously acquired with lock() or tryLock().
void Mutex::unlock()
{
    const int status = pthread_mutex_unlock(&m->mutex);
    nvDebugCheck(status == 0);
}
#endif // NV_OS

47
src/nvthread/Mutex.h Normal file
View File

@ -0,0 +1,47 @@
// This code is in the public domain -- castano@gmail.com
#pragma once
#ifndef NV_THREAD_MUTEX_H
#define NV_THREAD_MUTEX_H
#include "nvthread.h"
#include "nvcore/Ptr.h"
namespace nv
{
// Mutual exclusion lock. The platform object (a Win32 critical section or a
// pthread mutex) is hidden behind the Private struct defined in the .cpp file.
class NVTHREAD_CLASS Mutex
{
NV_FORBID_COPY(Mutex);
public:
Mutex ();
~Mutex ();
// Acquire the mutex, blocking until it is available.
void lock();
// Try to acquire the mutex without blocking; returns true on success.
bool tryLock();
// Release the mutex.
void unlock();
private:
struct Private;
AutoPtr<Private> m; // Platform-specific implementation data.
};
// Scoped lock guard that works with any type M providing lock() and unlock().
// The constructor acquires the mutex and the destructor releases it, so the
// mutex is held for exactly the lifetime of the guard object.
template <class M>
class Lock
{
    NV_FORBID_COPY(Lock);

public:
    Lock (M & m)
        : m_mutex (m)
    {
        m_mutex.lock();
    }

    ~Lock ()
    {
        m_mutex.unlock();
    }

private:
    M & m_mutex; // Borrowed reference; the mutex must outlive the guard.
};
} // nv namespace
#endif // NV_THREAD_MUTEX_H

View File

@ -0,0 +1,61 @@
// This code is in the public domain -- Ignacio Castaño <castano@gmail.com>
#include "ParallelFor.h"
#include "Thread.h"
#include "Atomic.h"
#include "ThreadPool.h"
using namespace nv;
#define ENABLE_PARALLEL_FOR 1
void worker(void * arg) {
ParallelFor * owner = (ParallelFor *)arg;
while(true) {
// Consume one element at a time. @@ Might be more efficient to have custom grain.
uint i = atomicIncrement(&owner->idx);
if (i > owner->count) {
break;
}
owner->task(owner->context, i - 1);
}
}
// Binds the task function and its user context.
// All remaining members get defined values here: 'pool' would otherwise be left
// uninitialized when ENABLE_PARALLEL_FOR is 0, and 'count'/'idx' until the first run().
// When ENABLE_PARALLEL_FOR is set, the thread pool is acquired here and held
// until the destructor releases it.
ParallelFor::ParallelFor(ForTask * task, void * context)
    : task(task)
    , context(context)
    , pool(NULL)
    , count(0)
    , idx(0)
{
#if ENABLE_PARALLEL_FOR
    pool = ThreadPool::acquire();
#endif
}
// Hands the thread pool acquired in the constructor back to ThreadPool.
// Nothing is done when ENABLE_PARALLEL_FOR is disabled.
ParallelFor::~ParallelFor()
{
#if ENABLE_PARALLEL_FOR
    ThreadPool::release(pool);
#endif
}
// Invokes the task once for every index in [0, count) and returns when all
// invocations have finished.
// With ENABLE_PARALLEL_FOR the indices are handed out to the pool threads through
// the shared counter 'idx'; otherwise they are processed in order on the calling thread.
void ParallelFor::run(uint count)
{
#if ENABLE_PARALLEL_FOR
    storeRelease(&this->count, count);

    // Init atomic counter to zero.
    storeRelease(&idx, 0);

    // Start threads.
    pool->start(worker, this);

    // Wait for all threads to complete.
    pool->wait();

    // Every element must have been claimed by the time the workers are done.
    nvDebugCheck(idx >= count);
#else
    // Unsigned index so that the comparison against 'count' is not signed/unsigned.
    for (uint i = 0; i < count; i++) {
        task(context, (int)i);
    }
#endif
}

View File

@ -0,0 +1,38 @@
// This code is in the public domain -- Ignacio Castaño <castano@gmail.com>
#pragma once
#ifndef NV_THREAD_PARALLELFOR_H
#define NV_THREAD_PARALLELFOR_H
#include "nvthread.h"
//#include "Atomic.h" // atomic<uint>
namespace nv
{
class Thread;
class ThreadPool;
typedef void ForTask(void * context, int id);
// Runs a task function over a range of indices, using the shared thread pool.
struct ParallelFor {
// Stores the task and its context; the thread pool is acquired here as well.
ParallelFor(ForTask * task, void * context);
// Releases the thread pool.
~ParallelFor();
// Calls task(context, i) for every i in [0, count) and returns when all calls are done.
void run(uint count);
// Invariant:
ForTask * task; // Function invoked for each index.
void * context; // Opaque pointer forwarded to every task invocation.
ThreadPool * pool; // Pool obtained from ThreadPool::acquire().
//uint workerCount; // @@ Move to thread pool.
//Thread * workers;
// State:
uint count; // Number of elements of the current run.
/*atomic<uint>*/ uint idx; // Shared counter the workers increment atomically to claim elements.
};
} // nv namespace
#endif // NV_THREAD_PARALLELFOR_H

136
src/nvthread/Thread.cpp Normal file
View File

@ -0,0 +1,136 @@
// This code is in the public domain -- castano@gmail.com
#include "Thread.h"
#if NV_OS_WIN32
#include "Win32.h"
#elif NV_OS_UNIX
#include <pthread.h>
#include <unistd.h> // usleep
#endif
using namespace nv;
// Platform-specific state of a Thread.
struct Thread::Private
{
// Native thread handle/identifier; the code uses 0 (NULL) to mean "no thread".
#if NV_OS_WIN32
HANDLE thread;
#elif NV_OS_UNIX
pthread_t thread;
#endif
// NOTE(review): these two fields are not referenced by the code visible here;
// start() and threadFunc use the public Thread::func / Thread::arg members instead.
ThreadFunc * func;
void * arg;
};
#if NV_OS_WIN32
// Win32 thread entry point. 'arg' is the Thread object passed to CreateThread;
// runs the user function with its argument and reports exit code 0.
unsigned long __stdcall threadFunc(void * arg)
{
    Thread * const self = (Thread *)arg;
    (*self->func)(self->arg);
    return 0;
}
#elif NV_OS_UNIX
extern "C" void * threadFunc(void * arg) {
Thread * thread = (Thread *)arg;
thread->func(thread->arg);
pthread_exit(0);
}
#endif
// Creates a thread object that is not running yet.
// Every field starts in a defined state: no native thread, no entry function
// and no argument, rather than indeterminate values until start() is called.
Thread::Thread()
    : p(new Private)
    , func(NULL)
    , arg(NULL)
{
    p->thread = 0;
    p->func = NULL;
    p->arg = NULL;
}
// A Thread must not be destroyed while it still holds a native thread:
// wait() resets the handle to 0, which is what this debug check verifies.
Thread::~Thread()
{
    nvDebugCheck(p->thread == 0);
}
// Launches a native thread that calls func(arg).
// The entry point and its argument are stored on this object first, because the
// native entry routine receives 'this' and reads them from there.
void Thread::start(ThreadFunc * func, void * arg)
{
    this->arg = arg;
    this->func = func;

#if NV_OS_WIN32
    p->thread = CreateThread(NULL, 0, threadFunc, this, 0, NULL);
    //p->thread = (HANDLE)_beginthreadex (0, 0, threadFunc, this, 0, NULL); // @@ So that we can call CRT functions...
    nvDebugCheck(p->thread != NULL);
#elif NV_OS_UNIX
    const int status = pthread_create(&p->thread, NULL, threadFunc, this);
    nvDebugCheck(status == 0);
#endif
}
// Blocks until the native thread has finished, then releases it and marks this
// object as not running (handle reset to 0 / NULL).
void Thread::wait()
{
#if NV_OS_WIN32
    const DWORD waitStatus = WaitForSingleObject (p->thread, INFINITE);
    nvCheck (waitStatus == WAIT_OBJECT_0);

    const BOOL closed = CloseHandle (p->thread);
    p->thread = NULL;
    nvCheck (closed);
#elif NV_OS_UNIX
    const int status = pthread_join(p->thread, NULL);
    p->thread = 0;
    nvDebugCheck(status == 0);
#endif
}
// Returns true while this object holds a native thread, i.e. between start()
// and the completion of wait(). It only inspects the stored handle; it does not
// ask the OS whether the thread function has already returned.
bool Thread::isRunning () const
{
#if NV_OS_WIN32
    const bool hasThread = (p->thread != NULL);
    return hasThread;
#elif NV_OS_UNIX
    const bool hasThread = (p->thread != 0);
    return hasThread;
#endif
}
// Busy-waits for 'count' loop iterations without yielding the processor.
// The counter is volatile so that each iteration performs an observable access;
// a plain empty loop has no side effects and an optimizer may remove it entirely.
/*static*/ void Thread::spinWait(uint count)
{
    for (volatile uint i = 0; i < count; i = i + 1) {
    }
}
// Gives up the remainder of the calling thread's time slice so that another
// ready thread can run.
/*static*/ void Thread::yield()
{
#if NV_OS_WIN32
    SwitchToThread();
#elif NV_OS_UNIX
    const int status = sched_yield();
    nvDebugCheck(status == 0);
#endif
}
// Suspends the calling thread for approximately 'ms' milliseconds.
/*static*/ void Thread::sleep(uint ms)
{
#if NV_OS_WIN32
    Sleep(ms);
#elif NV_OS_UNIX
    // usleep() is allowed to fail for intervals of one second or more, and
    // 1000 * ms overflows for large values. Sleep the whole seconds with
    // sleep() and only the sub-second remainder with usleep().
    const uint seconds = ms / 1000;
    if (seconds != 0) {
        ::sleep(seconds); // Global POSIX sleep(), not this member function.
    }
    usleep(1000 * (ms % 1000));
#endif
}
// Waits for each of the 'count' threads in the array to finish, in index order.
// @@ On Win32 a single WaitForMultipleObjects call (limited to MAXIMUM_WAIT_OBJECTS
// handles) could wait on all thread handles at once; it is unclear whether that
// has any advantage, so the portable loop is used on every platform.
/*static*/ void Thread::wait(Thread * threads, uint count)
{
    Thread * const end = threads + count;
    for (Thread * t = threads; t != end; ++t) {
        t->wait();
    }
}

46
src/nvthread/Thread.h Normal file
View File

@ -0,0 +1,46 @@
// This code is in the public domain -- castano@gmail.com
#pragma once
#ifndef NV_THREAD_THREAD_H
#define NV_THREAD_THREAD_H
#include "nvthread.h"
#include "nvcore/Ptr.h"
namespace nv
{
typedef void ThreadFunc(void * arg);
// Wrapper around a native thread (Win32 thread or pthread).
class NVTHREAD_CLASS Thread
{
NV_FORBID_COPY(Thread);
public:
Thread();
~Thread();
// Launch a native thread that calls func(arg).
void start(ThreadFunc * func, void * arg);
// Block until the thread has finished and release the native thread.
void wait();
// True between start() and the completion of wait().
bool isRunning() const;
// Busy-loop for 'count' iterations.
static void spinWait(uint count);
// Let another thread run.
static void yield();
// Suspend the calling thread for 'ms' milliseconds.
static void sleep(uint ms);
// Wait for each of the 'count' threads in the array.
static void wait(Thread * threads, uint count);
private:
struct Private;
AutoPtr<Private> p; // Platform-specific implementation data.
public:
// Entry point and argument set by start(); public because the native entry
// routine in Thread.cpp reads them through the Thread pointer it receives.
ThreadFunc * func;
void * arg;
};
} // nv namespace
#endif // NV_THREAD_THREAD_H

121
src/nvthread/ThreadPool.cpp Normal file
View File

@ -0,0 +1,121 @@
// This code is in the public domain -- castano@gmail.com
#include "ThreadPool.h"
#include "Mutex.h"
#include "Thread.h"
// Most of the time it's not necessary to protect the thread pool, but if it doesn't add a significant overhead, then it'd be safer to do it.
#define PROTECT_THREAD_POOL 1
using namespace nv;
// Guards exclusive use of the pool: locked in ThreadPool::acquire() and
// unlocked in ThreadPool::release().
#if PROTECT_THREAD_POOL
Mutex s_pool_mutex;
#endif
// The single shared pool. It is assigned by the ThreadPool constructor and read
// by the worker threads.
AutoPtr<ThreadPool> s_pool;
// Returns the shared thread pool, creating it on first use.
// With PROTECT_THREAD_POOL enabled the pool mutex is taken here and stays locked
// until the matching release() call, so only one client uses the pool at a time.
/*static*/ ThreadPool * ThreadPool::acquire()
{
#if PROTECT_THREAD_POOL
    s_pool_mutex.lock();    // @@ If same thread tries to lock twice, this should assert.
#endif

    if (s_pool == NULL) {
        // The constructor stores the new object in s_pool by itself.
        ThreadPool * created = new ThreadPool;
        nvDebugCheck(s_pool == created);
    }

    return s_pool.ptr();
}
// Returns the pool obtained from acquire(). Blocks until the workers are idle
// again and then, with PROTECT_THREAD_POOL enabled, unlocks the pool mutex.
/*static*/ void ThreadPool::release(ThreadPool * pool)
{
    nvDebugCheck(pool == s_pool);

    // Make sure the threads of the pool are idle.
    s_pool->wait();

#if PROTECT_THREAD_POOL
    s_pool_mutex.unlock();
#endif
}
/*static*/ void ThreadPool::workerFunc(void * arg) {
uint i = (uint)arg;
while(true)
{
s_pool->startEvents[i].wait();
if (s_pool->func == NULL) {
return; // @@ should we post finish event anyway?
}
s_pool->func(s_pool->arg);
s_pool->finishEvents[i].post();
}
}
// Creates one worker thread, one start event and one finish event per hardware
// thread. The workers start immediately and block on their start events.
ThreadPool::ThreadPool()
{
    s_pool = this; // Worker threads need this to be initialized before they start.

    // No job is pending yet; give these members defined values before any
    // worker thread exists.
    func = NULL;
    arg = NULL;
    allIdle = true;

    workerCount = nv::hardwareThreadCount();

    workers = new Thread[workerCount];
    startEvents = new Event[workerCount];
    finishEvents = new Event[workerCount];

    for (uint i = 0; i < workerCount; i++) {
        // The worker index travels in the pointer value; widen it through size_t
        // so the integer-to-pointer cast is between types of the same size.
        workers[i].start(workerFunc, (void *)(size_t)i);
    }
}
// Shuts the pool down: wakes every worker with a NULL function (their exit
// signal), joins the threads and frees the per-worker arrays.
ThreadPool::~ThreadPool()
{
    // Set threads to terminate.
    start(NULL, NULL);

    // Wait until threads actually exit.
    Thread::wait(workers, workerCount);

    delete [] workers;
    delete [] startEvents;
    delete [] finishEvents;
}
// Makes every worker run func(arg) once. Any job still in flight is waited for
// first; the call returns as soon as the workers have been woken up.
void ThreadPool::start(ThreadFunc * func, void * arg)
{
    // Wait until threads are idle.
    wait();

    // Set our desired function.
    this->arg = arg;
    this->func = func;
    allIdle = false;

    // Resume threads.
    Event::post(startEvents, workerCount);
}
// Blocks until every worker has signaled completion of the current job.
// Returns immediately when the pool is already idle.
void ThreadPool::wait()
{
    if (allIdle) {
        return;
    }

    // Wait for threads to complete.
    Event::wait(finishEvents, workerCount);
    allIdle = true;
}

49
src/nvthread/ThreadPool.h Normal file
View File

@ -0,0 +1,49 @@
// This code is in the public domain -- castano@gmail.com
#pragma once
#ifndef NV_THREAD_THREADPOOL_H
#define NV_THREAD_THREADPOOL_H
#include "nvthread.h"
#include "Event.h"
#include "Thread.h"
namespace nv {
class Thread;
class Event;
// Fixed set of worker threads that all run the same function on request.
class ThreadPool {
NV_FORBID_COPY(ThreadPool);
public:
// Obtain / give back the shared pool instance.
static ThreadPool * acquire();
static void release(ThreadPool *);
ThreadPool();
~ThreadPool();
// Make every worker run func(arg) once.
void start(ThreadFunc * func, void * arg);
// Block until all workers have finished the current function.
void wait();
private:
// Entry point of each worker thread; 'arg' encodes the worker index.
static void workerFunc(void * arg);
uint workerCount; // Number of worker threads.
Thread * workers; // Array of workerCount threads.
Event * startEvents; // One per worker; posted to wake the worker.
Event * finishEvents; // One per worker; posted by the worker when it is done.
uint allIdle; // Used as a boolean: non-zero when no job is in flight.
// Current function:
ThreadFunc * func;
void * arg;
};
} // namespace nv
#endif // NV_THREAD_THREADPOOL_H

9
src/nvthread/Win32.h Normal file
View File

@ -0,0 +1,9 @@
// This code is in the public domain -- castano@gmail.com
// Never include this from a header file.
#define WIN32_LEAN_AND_MEAN
#define VC_EXTRALEAN
#define _WIN32_WINNT 0x0400 // for SwitchToThread, TryEnterCriticalSection
#include <windows.h>
//#include <process.h> // for _beginthreadex

51
src/nvthread/nvthread.cpp Normal file
View File

@ -0,0 +1,51 @@
#include "nvthread.h"
#include "Thread.h"
#if NV_OS_WIN32
#define WIN32_LEAN_AND_MEAN
#define VC_EXTRALEAN
#include <windows.h>
#elif NV_OS_LINUX
#include <unistd.h> // sysconf
#elif NV_OS_DARWIN || NV_OS_FREEBSD
#include <sys/types.h>
#include <sys/sysctl.h> // sysctl
#endif
using namespace nv;
// Find the number of cores in the system.
// Based on: http://stackoverflow.com/questions/150355/programmatically-find-the-number-of-cores-on-a-machine
// @@ Distinguish between logical and physical cores?
//
// Returns the processor count reported by the operating system. Falls back to 1
// when the platform is not recognized or the query fails, so the result is
// never zero or a wrapped-around negative value on those paths.
uint nv::hardwareThreadCount() {
#if NV_OS_WIN32
    SYSTEM_INFO sysinfo;
    GetSystemInfo( &sysinfo );
    return sysinfo.dwNumberOfProcessors;
#elif NV_OS_XBOX
    return 3; // or 6?
#elif NV_OS_LINUX // Linux, Solaris, & AIX
    // sysconf() returns -1 on error; do not let that turn into a huge unsigned count.
    const long online = sysconf(_SC_NPROCESSORS_ONLN);
    if (online < 1) {
        return 1; // Assume single core.
    }
    return (uint)online;
#elif NV_OS_DARWIN || NV_OS_FREEBSD
    int numCPU = 0; // Stays 0 if sysctl() fails without writing a result.
    int mib[2];
    size_t len = sizeof(numCPU);

    // set the mib for hw.ncpu
    mib[0] = CTL_HW;

#ifdef HW_AVAILCPU
    // Prefer the number of available CPUs where the system defines this selector.
    mib[1] = HW_AVAILCPU;
    if (sysctl(mib, 2, &numCPU, &len, NULL, 0) == 0 && numCPU >= 1) {
        return (uint)numCPU;
    }
#endif

    // Fall back to the total number of CPUs.
    mib[1] = HW_NCPU;
    numCPU = 0;
    len = sizeof(numCPU);
    if (sysctl(mib, 2, &numCPU, &len, NULL, 0) != 0 || numCPU < 1) {
        return 1; // Assume single core.
    }
    return (uint)numCPU;
#else
    return 1; // Assume single core.
#endif
}

83
src/nvthread/nvthread.h Normal file
View File

@ -0,0 +1,83 @@
// This code is in the public domain -- castanyo@yahoo.es
#pragma once
#ifndef NV_THREAD_H
#define NV_THREAD_H
#include "nvcore/nvcore.h"
// Function linkage
// NVTHREAD_API decorates exported functions and NVTHREAD_CLASS exported classes.
// When building as a shared library, NVTHREAD_EXPORTS selects export (defined
// while compiling the library itself) versus import (client code).
#if NVTHREAD_SHARED
#ifdef NVTHREAD_EXPORTS
#define NVTHREAD_API DLL_EXPORT
#define NVTHREAD_CLASS DLL_EXPORT_CLASS
#else
#define NVTHREAD_API DLL_IMPORT
#define NVTHREAD_CLASS DLL_IMPORT
#endif
#else // NVTHREAD_SHARED
#define NVTHREAD_API
#define NVTHREAD_CLASS
#endif // NVTHREAD_SHARED
// Compiler barriers.
// See: http://en.wikipedia.org/wiki/Memory_ordering
//
// Usage on every compiler: call syntax followed by a semicolon, e.g.
//     nvCompilerReadWriteBarrier();
// These only stop the compiler from reordering memory accesses across the
// barrier; they are not CPU memory fences.
#if NV_CC_MSVC
#include <intrin.h>
#pragma intrinsic(_WriteBarrier)
#define nvCompilerWriteBarrier _WriteBarrier
#pragma intrinsic(_ReadWriteBarrier)
#define nvCompilerReadWriteBarrier _ReadWriteBarrier
#if _MSC_VER >= 1400 // ReadBarrier is VC2005
#pragma intrinsic(_ReadBarrier)
#define nvCompilerReadBarrier _ReadBarrier
#else
#define nvCompilerReadBarrier _ReadWriteBarrier
#endif
#elif NV_CC_GNUC
// No trailing semicolon in the expansion: the caller supplies it, as with the
// MSVC intrinsics, so 'if (x) nvCompilerReadWriteBarrier(); else ...' stays valid.
#define nvCompilerReadWriteBarrier() asm volatile("" ::: "memory")
#define nvCompilerWriteBarrier nvCompilerReadWriteBarrier
#define nvCompilerReadBarrier nvCompilerReadWriteBarrier
#endif // NV_CC_MSVC
// @@ Memory barriers / fences.
// @@ Atomics.
/* Wrap this up:
#define YieldProcessor() __asm { rep nop }
#define YieldProcessor _mm_pause
#define YieldProcessor __yield
BOOL WINAPI SwitchToThread(void);
*/
// Free functions of the thread library.
namespace nv
{
// Number of hardware threads (processors) reported by the system.
// Reentrant.
uint hardwareThreadCount();
// Not thread-safe. Use from main thread only.
// NOTE(review): no definitions of these three functions are visible here --
// confirm they are implemented before relying on them.
void initWorkers();
void shutWorkers();
void setWorkerFunction(void * func);
} // nv namespace
#endif // NV_THREAD_H