PosixThreadSupport.cpp 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409
  1. /*
  2. Bullet Continuous Collision Detection and Physics Library
  3. Copyright (c) 2003-2007 Erwin Coumans http://bulletphysics.com
  4. This software is provided 'as-is', without any express or implied warranty.
  5. In no event will the authors be held liable for any damages arising from the use of this software.
  6. Permission is granted to anyone to use this software for any purpose,
  7. including commercial applications, and to alter it and redistribute it freely,
  8. subject to the following restrictions:
  9. 1. The origin of this software must not be misrepresented; you must not claim that you wrote the original software. If you use this software in a product, an acknowledgment in the product documentation would be appreciated but is not required.
  10. 2. Altered source versions must be plainly marked as such, and must not be misrepresented as being the original software.
  11. 3. This notice may not be removed or altered from any source distribution.
  12. */
  13. #include <stdio.h>
  14. #include "PosixThreadSupport.h"
  15. #ifdef USE_PTHREADS
  16. #include <errno.h>
  17. #include <unistd.h>
  18. #include "SpuCollisionTaskProcess.h"
  19. #include "SpuNarrowPhaseCollisionTask/SpuGatheringCollisionTask.h"
  20. #define checkPThreadFunction(returnValue) \
  21. if(0 != returnValue) { \
  22. printf("PThread problem at line %i in file %s: %i %d\n", __LINE__, __FILE__, returnValue, errno); \
  23. }
  24. // The number of threads should be equal to the number of available cores
  25. // Todo: each worker should be linked to a single core, using SetThreadIdealProcessor.
  26. // PosixThreadSupport helps to initialize/shutdown libspe2, start/stop SPU tasks and communication
  27. // Setup and initialize SPU/CELL/Libspe2
  28. PosixThreadSupport::PosixThreadSupport(ThreadConstructionInfo& threadConstructionInfo)
  29. {
  30. startThreads(threadConstructionInfo);
  31. }
  32. // cleanup/shutdown Libspe2
  33. PosixThreadSupport::~PosixThreadSupport()
  34. {
  35. stopSPU();
  36. }
  37. #if (defined (__APPLE__))
  38. #define NAMED_SEMAPHORES
  39. #endif
  40. // this semaphore will signal, if and how many threads are finished with their work
  41. static sem_t* mainSemaphore=0;
  42. static sem_t* createSem(const char* baseName)
  43. {
  44. static int semCount = 0;
  45. #ifdef NAMED_SEMAPHORES
  46. /// Named semaphore begin
  47. char name[32];
  48. snprintf(name, 32, "/%s-%d-%4.4d", baseName, getpid(), semCount++);
  49. sem_t* tempSem = sem_open(name, O_CREAT, 0600, 0);
  50. if (tempSem != reinterpret_cast<sem_t *>(SEM_FAILED))
  51. {
  52. // printf("Created \"%s\" Semaphore %p\n", name, tempSem);
  53. }
  54. else
  55. {
  56. //printf("Error creating Semaphore %d\n", errno);
  57. exit(-1);
  58. }
  59. /// Named semaphore end
  60. #else
  61. sem_t* tempSem = new sem_t;
  62. checkPThreadFunction(sem_init(tempSem, 0, 0));
  63. #endif
  64. return tempSem;
  65. }
  66. static void destroySem(sem_t* semaphore)
  67. {
  68. #ifdef NAMED_SEMAPHORES
  69. checkPThreadFunction(sem_close(semaphore));
  70. #else
  71. checkPThreadFunction(sem_destroy(semaphore));
  72. delete semaphore;
  73. #endif
  74. }
  75. static void *threadFunction(void *argument)
  76. {
  77. PosixThreadSupport::btSpuStatus* status = (PosixThreadSupport::btSpuStatus*)argument;
  78. while (1)
  79. {
  80. checkPThreadFunction(sem_wait(status->startSemaphore));
  81. void* userPtr = status->m_userPtr;
  82. if (userPtr)
  83. {
  84. btAssert(status->m_status);
  85. status->m_userThreadFunc(userPtr,status->m_lsMemory);
  86. status->m_status = 2;
  87. checkPThreadFunction(sem_post(mainSemaphore));
  88. status->threadUsed++;
  89. } else {
  90. //exit Thread
  91. status->m_status = 3;
  92. checkPThreadFunction(sem_post(mainSemaphore));
  93. printf("Thread with taskId %i exiting\n",status->m_taskId);
  94. break;
  95. }
  96. }
  97. printf("Thread TERMINATED\n");
  98. return 0;
  99. }
  100. ///send messages to SPUs
  101. void PosixThreadSupport::sendRequest(uint32_t uiCommand, ppu_address_t uiArgument0, uint32_t taskId)
  102. {
  103. /// gMidphaseSPU.sendRequest(CMD_GATHER_AND_PROCESS_PAIRLIST, (uint32_t) &taskDesc);
  104. ///we should spawn an SPU task here, and in 'waitForResponse' it should wait for response of the (one of) the first tasks that finished
  105. switch (uiCommand)
  106. {
  107. case CMD_GATHER_AND_PROCESS_PAIRLIST:
  108. {
  109. btSpuStatus& spuStatus = m_activeSpuStatus[taskId];
  110. btAssert(taskId >= 0);
  111. btAssert(taskId < m_activeSpuStatus.size());
  112. spuStatus.m_commandId = uiCommand;
  113. spuStatus.m_status = 1;
  114. spuStatus.m_userPtr = (void*)uiArgument0;
  115. // fire event to start new task
  116. checkPThreadFunction(sem_post(spuStatus.startSemaphore));
  117. break;
  118. }
  119. default:
  120. {
  121. ///not implemented
  122. btAssert(0);
  123. }
  124. };
  125. }
  126. ///check for messages from SPUs
  127. void PosixThreadSupport::waitForResponse(unsigned int *puiArgument0, unsigned int *puiArgument1)
  128. {
  129. ///We should wait for (one of) the first tasks to finish (or other SPU messages), and report its response
  130. ///A possible response can be 'yes, SPU handled it', or 'no, please do a PPU fallback'
  131. btAssert(m_activeSpuStatus.size());
  132. // wait for any of the threads to finish
  133. checkPThreadFunction(sem_wait(mainSemaphore));
  134. // get at least one thread which has finished
  135. size_t last = -1;
  136. for(size_t t=0; t < size_t(m_activeSpuStatus.size()); ++t) {
  137. if(2 == m_activeSpuStatus[t].m_status) {
  138. last = t;
  139. break;
  140. }
  141. }
  142. btSpuStatus& spuStatus = m_activeSpuStatus[last];
  143. btAssert(spuStatus.m_status > 1);
  144. spuStatus.m_status = 0;
  145. // need to find an active spu
  146. btAssert(last >= 0);
  147. *puiArgument0 = spuStatus.m_taskId;
  148. *puiArgument1 = spuStatus.m_status;
  149. }
  150. void PosixThreadSupport::startThreads(ThreadConstructionInfo& threadConstructionInfo)
  151. {
  152. printf("%s creating %i threads.\n", __FUNCTION__, threadConstructionInfo.m_numThreads);
  153. m_activeSpuStatus.resize(threadConstructionInfo.m_numThreads);
  154. mainSemaphore = createSem("main");
  155. //checkPThreadFunction(sem_wait(mainSemaphore));
  156. for (int i=0;i < threadConstructionInfo.m_numThreads;i++)
  157. {
  158. printf("starting thread %d\n",i);
  159. btSpuStatus& spuStatus = m_activeSpuStatus[i];
  160. spuStatus.startSemaphore = createSem("threadLocal");
  161. checkPThreadFunction(pthread_create(&spuStatus.thread, NULL, &threadFunction, (void*)&spuStatus));
  162. spuStatus.m_userPtr=0;
  163. spuStatus.m_taskId = i;
  164. spuStatus.m_commandId = 0;
  165. spuStatus.m_status = 0;
  166. spuStatus.m_lsMemory = threadConstructionInfo.m_lsMemoryFunc();
  167. spuStatus.m_userThreadFunc = threadConstructionInfo.m_userThreadFunc;
  168. spuStatus.threadUsed = 0;
  169. printf("started thread %d \n",i);
  170. }
  171. }
  172. void PosixThreadSupport::startSPU()
  173. {
  174. }
  175. ///tell the task scheduler we are done with the SPU tasks
  176. void PosixThreadSupport::stopSPU()
  177. {
  178. for(size_t t=0; t < size_t(m_activeSpuStatus.size()); ++t)
  179. {
  180. btSpuStatus& spuStatus = m_activeSpuStatus[t];
  181. printf("%s: Thread %i used: %ld\n", __FUNCTION__, int(t), spuStatus.threadUsed);
  182. spuStatus.m_userPtr = 0;
  183. checkPThreadFunction(sem_post(spuStatus.startSemaphore));
  184. checkPThreadFunction(sem_wait(mainSemaphore));
  185. printf("destroy semaphore\n");
  186. destroySem(spuStatus.startSemaphore);
  187. printf("semaphore destroyed\n");
  188. checkPThreadFunction(pthread_join(spuStatus.thread,0));
  189. }
  190. printf("destroy main semaphore\n");
  191. destroySem(mainSemaphore);
  192. printf("main semaphore destroyed\n");
  193. m_activeSpuStatus.clear();
  194. }
  195. class PosixCriticalSection : public btCriticalSection
  196. {
  197. pthread_mutex_t m_mutex;
  198. public:
  199. PosixCriticalSection()
  200. {
  201. pthread_mutex_init(&m_mutex, NULL);
  202. }
  203. virtual ~PosixCriticalSection()
  204. {
  205. pthread_mutex_destroy(&m_mutex);
  206. }
  207. ATTRIBUTE_ALIGNED16(unsigned int mCommonBuff[32]);
  208. virtual unsigned int getSharedParam(int i)
  209. {
  210. return mCommonBuff[i];
  211. }
  212. virtual void setSharedParam(int i,unsigned int p)
  213. {
  214. mCommonBuff[i] = p;
  215. }
  216. virtual void lock()
  217. {
  218. pthread_mutex_lock(&m_mutex);
  219. }
  220. virtual void unlock()
  221. {
  222. pthread_mutex_unlock(&m_mutex);
  223. }
  224. };
  225. #if defined(_POSIX_BARRIERS) && (_POSIX_BARRIERS - 20012L) >= 0
  226. /* OK to use barriers on this platform */
  227. class PosixBarrier : public btBarrier
  228. {
  229. pthread_barrier_t m_barr;
  230. int m_numThreads;
  231. public:
  232. PosixBarrier()
  233. :m_numThreads(0) { }
  234. virtual ~PosixBarrier() {
  235. pthread_barrier_destroy(&m_barr);
  236. }
  237. virtual void sync()
  238. {
  239. int rc = pthread_barrier_wait(&m_barr);
  240. if(rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD)
  241. {
  242. printf("Could not wait on barrier\n");
  243. exit(-1);
  244. }
  245. }
  246. virtual void setMaxCount(int numThreads)
  247. {
  248. int result = pthread_barrier_init(&m_barr, NULL, numThreads);
  249. m_numThreads = numThreads;
  250. btAssert(result==0);
  251. }
  252. virtual int getMaxCount()
  253. {
  254. return m_numThreads;
  255. }
  256. };
  257. #else
  258. /* Not OK to use barriers on this platform - insert alternate code here */
  259. class PosixBarrier : public btBarrier
  260. {
  261. pthread_mutex_t m_mutex;
  262. pthread_cond_t m_cond;
  263. int m_numThreads;
  264. int m_called;
  265. public:
  266. PosixBarrier()
  267. :m_numThreads(0)
  268. {
  269. }
  270. virtual ~PosixBarrier()
  271. {
  272. if (m_numThreads>0)
  273. {
  274. pthread_mutex_destroy(&m_mutex);
  275. pthread_cond_destroy(&m_cond);
  276. }
  277. }
  278. virtual void sync()
  279. {
  280. pthread_mutex_lock(&m_mutex);
  281. m_called++;
  282. if (m_called == m_numThreads) {
  283. m_called = 0;
  284. pthread_cond_broadcast(&m_cond);
  285. } else {
  286. pthread_cond_wait(&m_cond,&m_mutex);
  287. }
  288. pthread_mutex_unlock(&m_mutex);
  289. }
  290. virtual void setMaxCount(int numThreads)
  291. {
  292. if (m_numThreads>0)
  293. {
  294. pthread_mutex_destroy(&m_mutex);
  295. pthread_cond_destroy(&m_cond);
  296. }
  297. m_called = 0;
  298. pthread_mutex_init(&m_mutex,NULL);
  299. pthread_cond_init(&m_cond,NULL);
  300. m_numThreads = numThreads;
  301. }
  302. virtual int getMaxCount()
  303. {
  304. return m_numThreads;
  305. }
  306. };
  307. #endif//_POSIX_BARRIERS
  308. btBarrier* PosixThreadSupport::createBarrier()
  309. {
  310. PosixBarrier* barrier = new PosixBarrier();
  311. barrier->setMaxCount(getNumTasks());
  312. return barrier;
  313. }
  314. btCriticalSection* PosixThreadSupport::createCriticalSection()
  315. {
  316. return new PosixCriticalSection();
  317. }
  318. void PosixThreadSupport::deleteBarrier(btBarrier* barrier)
  319. {
  320. delete barrier;
  321. }
  322. void PosixThreadSupport::deleteCriticalSection(btCriticalSection* cs)
  323. {
  324. delete cs;
  325. }
  326. #endif // USE_PTHREADS