Line data Source code
1 :
2 : /* Copyright (c) 2005-2014, Stefan Eilemann <eile@equalizergraphics.com>
3 : * 2012, Marwan Abdellah <marwan.abdellah@epfl.ch>
4 : * 2011-2012, Daniel Nachbaur <danielnachbaur@gmail.com>
5 : *
6 : * This library is free software; you can redistribute it and/or modify it under
7 : * the terms of the GNU Lesser General Public License version 2.1 as published
8 : * by the Free Software Foundation.
9 : *
10 : * This library is distributed in the hope that it will be useful, but WITHOUT
11 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
13 : * details.
14 : *
15 : * You should have received a copy of the GNU Lesser General Public License
16 : * along with this library; if not, write to the Free Software Foundation, Inc.,
17 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 : */
19 :
20 : #include "thread.h"
21 :
22 : #include "os.h"
23 : #include "debug.h"
24 : #include "lock.h"
25 : #include "log.h"
26 : #include "monitor.h"
27 : #include "rng.h"
28 : #include "scopedMutex.h"
29 : #include "sleep.h"
30 : #include "spinLock.h"
31 :
32 : #include <boost/lexical_cast.hpp>
33 : #include <errno.h>
34 : #include <pthread.h>
35 : #include <algorithm>
36 : #include <map>
37 :
38 : // Experimental Win32 thread pinning
39 : #ifdef _WIN32
40 : //# define LB_WIN32_THREAD_AFFINITY
41 : # pragma message ("Thread affinity not supported on WIN32")
42 : #endif
43 :
44 : #ifdef __linux__
45 : # include <sys/prctl.h>
46 : #endif
47 :
48 : #ifdef LUNCHBOX_USE_HWLOC
49 : # include <hwloc.h>
50 : #endif
51 :
52 : #include "detail/threadID.h"
53 :
54 : namespace lunchbox
55 : {
56 : namespace
57 : {
58 30 : a_int32_t _threadIDs;
59 :
60 : enum ThreadState //!< The current state of a thread.
61 : {
62 : STATE_STOPPED,
63 : STATE_STARTING, // start() in progress
64 : STATE_RUNNING,
65 : STATE_STOPPING // child no longer active, join() not yet called
66 : };
67 : }
68 :
69 : namespace detail
70 : {
71 4179 : class Thread
72 : {
73 : public:
74 4179 : Thread() : state( STATE_STOPPED ), index( ++_threadIDs ) {}
75 :
76 : lunchbox::ThreadID id;
77 : Monitor< ThreadState > state;
78 : int32_t index;
79 : };
80 : }
81 :
82 4179 : Thread::Thread()
83 4179 : : _impl( new detail::Thread )
84 : {
85 4179 : }
86 :
87 0 : Thread::Thread( const Thread& )
88 0 : : _impl( new detail::Thread )
89 : {
90 0 : }
91 :
92 4179 : Thread::~Thread()
93 : {
94 4179 : delete _impl;
95 4179 : }
96 :
97 257 : bool Thread::isStopped() const
98 : {
99 257 : return ( _impl->state == STATE_STOPPED );
100 : }
101 :
102 1 : bool Thread::isRunning() const
103 : {
104 1 : return ( _impl->state == STATE_RUNNING );
105 : }
106 :
107 3496 : void* Thread::runChild( void* arg )
108 : {
109 3496 : Thread* thread = static_cast<Thread*>(arg);
110 3496 : thread->_runChild();
111 0 : return 0; // not reached
112 : }
113 :
114 3496 : void Thread::_runChild()
115 : {
116 3496 : setName( boost::lexical_cast< std::string >( _impl->index ));
117 3496 : pinCurrentThread();
118 3496 : _impl->id._impl->pthread = pthread_self();
119 :
120 3496 : if( !init( ))
121 : {
122 5 : LBWARN << "Thread " << className( this ) << " failed to initialize"
123 4 : << std::endl;
124 1 : _impl->state = STATE_STOPPED;
125 1 : pthread_exit( 0 );
126 : LBUNREACHABLE;
127 : }
128 :
129 3495 : _impl->state = STATE_RUNNING;
130 17474 : LBINFO << "Thread #" << _impl->index << " type " << className( *this )
131 13979 : << " successfully initialized" << std::endl;
132 :
133 3494 : run();
134 3202 : LBVERB << "Thread " << className( this ) << " finished" << std::endl;
135 3202 : this->exit();
136 :
137 0 : LBUNREACHABLE;
138 0 : }
139 :
140 3496 : bool Thread::start()
141 : {
142 3496 : if( _impl->state != STATE_STOPPED )
143 0 : return false;
144 :
145 3496 : _impl->state = STATE_STARTING;
146 :
147 : pthread_attr_t attributes;
148 3496 : pthread_attr_init( &attributes );
149 3496 : pthread_attr_setscope( &attributes, PTHREAD_SCOPE_SYSTEM );
150 :
151 3496 : int nTries = 10;
152 6992 : while( nTries-- )
153 : {
154 : const int error = pthread_create( &_impl->id._impl->pthread,
155 3496 : &attributes, runChild, this );
156 3496 : if( error == 0 ) // succeeded
157 : {
158 3496 : LBVERB << "Created pthread " << this << std::endl;
159 3496 : break;
160 : }
161 0 : if( error != EAGAIN || nTries == 0 )
162 : {
163 0 : LBWARN << "Could not create thread: " << strerror( error )
164 0 : << std::endl;
165 0 : return false;
166 : }
167 0 : sleep( 1 ); // Give EAGAIN some time to recover
168 : }
169 :
170 : // avoid memleak, we don't use pthread_join
171 3496 : pthread_detach( _impl->id._impl->pthread );
172 3496 : _impl->state.waitNE( STATE_STARTING );
173 3496 : return (_impl->state != STATE_STOPPED);
174 : }
175 :
176 3451 : void Thread::exit()
177 : {
178 3451 : LBASSERTINFO( isCurrent(), "Thread::exit not called from child thread" );
179 3445 : LBVERB << "Exiting thread " << className( this ) << std::endl;
180 3445 : Log::instance().forceFlush();
181 3451 : Log::instance().exit();
182 :
183 3451 : _impl->state = STATE_STOPPING;
184 3456 : pthread_exit( 0 );
185 : LBUNREACHABLE;
186 : }
187 :
188 30 : void Thread::cancel()
189 : {
190 30 : LBASSERTINFO( !isCurrent(), "Thread::cancel called from child thread" );
191 :
192 30 : LBVERB << "Canceling thread " << className( this ) << std::endl;
193 30 : _impl->state = STATE_STOPPING;
194 :
195 30 : const int error = pthread_cancel( _impl->id._impl->pthread );
196 30 : if( error != 0 )
197 0 : LBWARN << "Could not cancel thread: " << strerror( error ) << std::endl;
198 30 : }
199 :
200 3722 : bool Thread::join()
201 : {
202 3722 : if( _impl->state == STATE_STOPPED )
203 1 : return false;
204 3721 : if( isCurrent( )) // can't join self
205 256 : return false;
206 :
207 3465 : _impl->state.waitNE( STATE_RUNNING );
208 3465 : _impl->state = STATE_STOPPED;
209 :
210 3465 : LBVERB << "Joined thread " << className( this ) << std::endl;
211 3465 : return true;
212 : }
213 :
214 7197 : bool Thread::isCurrent() const
215 : {
216 7197 : return pthread_equal( pthread_self(), _impl->id._impl->pthread );
217 : }
218 :
219 2847383 : ThreadID Thread::getSelfThreadID()
220 : {
221 2847383 : ThreadID threadID;
222 2878202 : threadID._impl->pthread = pthread_self();
223 2876876 : return threadID;
224 : }
225 :
226 106825392 : void Thread::yield()
227 : {
228 : #ifdef _MSC_VER
229 : ::Sleep( 0 ); // sleeps thread
230 : // or ::SwitchToThread() ? // switches to another waiting thread, if exists
231 : #elif defined (__APPLE__)
232 : ::pthread_yield_np();
233 : #else
234 106825392 : ::sched_yield();
235 : #endif
236 123448871 : }
237 :
238 3501 : void Thread::pinCurrentThread()
239 : {
240 : #ifdef LB_WIN32_THREAD_AFFINITY
241 : static Lock lock;
242 : ScopedMutex<> mutex( lock );
243 :
244 : static DWORD_PTR processMask = 0;
245 : static DWORD_PTR processor = 0;
246 : if( processMask == 0 )
247 : {
248 : // Get available processors
249 : DWORD_PTR systemMask;
250 : if( GetProcessAffinityMask( GetCurrentProcess(), &processMask,
251 : &systemMask ) == 0 )
252 : {
253 : LBWARN << "Can't get usable processor mask" << std::endl;
254 : return;
255 : }
256 : LBINFO << "Available processors 0x" << hex << processMask << dec <<endl;
257 :
258 : // Choose random starting processor: Multiple Eq apps on the same node
259 : // would otherwise use the same processor for the same thread
260 : unsigned nProcessors = 0;
261 : for( DWORD_PTR i = 1; i != 0; i <<= 1 )
262 : {
263 : if( processMask & i )
264 : ++nProcessors;
265 : }
266 : LBINFO << nProcessors << " available processors" << std::endl;
267 :
268 : unsigned chance = RNG().get< unsigned >();
269 : processor = 1 << (chance % nProcessors);
270 : LBINFO << "Starting with processor " << processor << std::endl;
271 : }
272 : LBASSERT( processMask != 0 );
273 :
274 : while( true )
275 : {
276 : processor <<= 1;
277 : if( processor == 0 ) // wrap around
278 : processor = 1;
279 :
280 : if( processor & processMask ) // processor is available
281 : {
282 : if( SetThreadAffinityMask( GetCurrentThread(), processor ) == 0 )
283 : LBWARN << "Can't set thread processor" << std::endl;
284 : LBINFO << "Pinned thread to processor 0x" << hex << processor << dec
285 : << std::endl;
286 : return;
287 : }
288 : }
289 : #endif
290 3501 : }
291 :
#ifdef _MSC_VER
#  ifndef MS_VC_EXCEPTION
#    define MS_VC_EXCEPTION 0x406D1388
#  endif

#  pragma pack(push,8)
/** Argument block passed along with the MS_VC_EXCEPTION exception. */
typedef struct tagTHREADNAME_INFO
{
    DWORD dwType; // Must be 0x1000.
    LPCSTR szName; // Pointer to name (in user addr space).
    DWORD dwThreadID; // Thread ID (-1=caller thread).
    DWORD dwFlags; // Reserved for future use, must be zero.
} THREADNAME_INFO;
#  pragma pack(pop)

/**
 * Announce the name of the calling thread by raising MS_VC_EXCEPTION.
 *
 * The exception is swallowed locally by the structured exception handler.
 *
 * @param name the zero-terminated thread name.
 */
static void _setVCName( const char* name )
{
    ::Sleep(10);

    THREADNAME_INFO info;
    info.dwFlags = 0;
    info.dwThreadID = GetCurrentThreadId();
    info.szName = name;
    info.dwType = 0x1000;

    // The exception arguments are the info block, counted in ULONG_PTR units
    const DWORD nArguments = sizeof( info ) / sizeof( ULONG_PTR );

    __try
    {
        RaiseException( MS_VC_EXCEPTION, 0, nArguments,
                        reinterpret_cast< ULONG_PTR* >( &info ));
    }
    __except( EXCEPTION_EXECUTE_HANDLER )
    {
    }
}
#endif
326 :
327 3525 : void Thread::setName( const std::string& name )
328 : {
329 3525 : Log::instance().setThreadName( name );
330 :
331 : #ifdef _MSC_VER
332 : # ifndef NDEBUG
333 : _setVCName( name.c_str( ));
334 : # endif
335 : #elif __MAC_OS_X_VERSION_MIN_REQUIRED >= 1060
336 : pthread_setname_np( name.c_str( ));
337 : #elif defined(__linux__)
338 3525 : prctl( PR_SET_NAME, name.c_str(), 0, 0, 0 );
339 : #else
340 : // Not implemented
341 : LBVERB << "Thread::setName() not implemented" << std::endl;
342 : #endif
343 3525 : }
344 :
345 : #ifdef LUNCHBOX_USE_HWLOC
346 0 : static hwloc_bitmap_t _getCpuSet( const int32_t affinity,
347 : hwloc_topology_t topology )
348 : {
349 0 : hwloc_bitmap_t cpuSet = hwloc_bitmap_alloc(); // HWloc CPU set
350 0 : hwloc_bitmap_zero( cpuSet ); // Initialize to zeros
351 :
352 0 : if( affinity >= Thread::CORE )
353 : {
354 0 : const int32_t coreIndex = affinity - Thread::CORE;
355 0 : if( hwloc_get_obj_by_type( topology, HWLOC_OBJ_CORE, coreIndex ) == 0 )
356 : {
357 0 : LBWARN << "Core " << coreIndex << " does not exist in the topology"
358 0 : << std::endl;
359 0 : return cpuSet;
360 : }
361 :
362 : // Getting the core object #coreIndex
363 : const hwloc_obj_t coreObj = hwloc_get_obj_by_type( topology,
364 : HWLOC_OBJ_CORE,
365 0 : coreIndex );
366 : // Get the CPU set associated with the specified core
367 0 : cpuSet = coreObj->allowed_cpuset;
368 0 : return cpuSet;
369 : }
370 :
371 0 : if( affinity == Thread::NONE )
372 0 : return cpuSet;
373 :
374 : // Sets the affinity to a specific CPU or "socket"
375 0 : LBASSERT( affinity >= Thread::SOCKET && affinity < Thread::SOCKET_MAX );
376 0 : const int32_t socketIndex = affinity - Thread::SOCKET;
377 :
378 0 : if( hwloc_get_obj_by_type( topology, HWLOC_OBJ_SOCKET, socketIndex ) == 0 )
379 : {
380 0 : LBWARN << "Socket " << socketIndex << " does not exist in the topology"
381 0 : << std::endl;
382 0 : return cpuSet;
383 : }
384 :
385 : // Getting the CPU object #cpuIndex (subtree node)
386 : const hwloc_obj_t socketObj = hwloc_get_obj_by_type( topology,
387 : HWLOC_OBJ_SOCKET,
388 0 : socketIndex );
389 : // Get the CPU set associated with the specified socket
390 0 : hwloc_bitmap_copy( cpuSet, socketObj->allowed_cpuset );
391 0 : return cpuSet;
392 : }
393 : #endif
394 :
395 0 : void Thread::setAffinity( const int32_t affinity )
396 : {
397 0 : if( affinity == Thread::NONE )
398 0 : return;
399 :
400 : #ifdef LUNCHBOX_USE_HWLOC
401 : hwloc_topology_t topology;
402 0 : hwloc_topology_init( &topology ); // Allocate & initialize the topology
403 0 : hwloc_topology_load( topology ); // Perform HW topology detection
404 0 : const hwloc_bitmap_t cpuSet = _getCpuSet( affinity, topology );
405 : const int result = hwloc_set_cpubind( topology, cpuSet,
406 0 : HWLOC_CPUBIND_THREAD );
407 : char* cpuSetString;
408 0 : hwloc_bitmap_asprintf( &cpuSetString, cpuSet );
409 :
410 0 : if( result == 0 )
411 : {
412 0 : LBVERB << "Bound to cpu set " << cpuSetString << std::endl;
413 : }
414 : else
415 : {
416 0 : LBWARN << "Error binding to cpu set " << cpuSetString << std::endl;
417 : }
418 0 : ::free( cpuSetString );
419 0 : hwloc_bitmap_free( cpuSet );
420 0 : hwloc_topology_destroy( topology );
421 :
422 : #else
423 : LBWARN << "Thread::setAffinity not implemented, hwloc library missing"
424 : << std::endl;
425 : #endif
426 : }
427 :
428 0 : std::ostream& operator << ( std::ostream& os, const Thread::Affinity affinity )
429 : {
430 0 : if( affinity == Thread::NONE )
431 0 : return os << "No affinity";
432 0 : if( affinity >= Thread::CORE )
433 0 : return os << "Core " << affinity - Thread::CORE;
434 :
435 0 : LBASSERT( affinity >= Thread::SOCKET && affinity < Thread::SOCKET_MAX );
436 0 : return os << "Socket " << affinity - Thread::SOCKET;
437 : }
438 :
#if 0
// Disabled debug output operator. The state enumerators are referenced
// unqualified, matching the file-local ThreadState enum used by the rest of
// this file.
// NOTE(review): accesses thread->_impl, re-enabling presumably requires access
// to the private members of Thread - confirm against the header.
std::ostream& operator << ( std::ostream& os, const Thread* thread )
{
    os << "Thread " << thread->_impl->id << " state "
       << ( thread->_impl->state == STATE_STOPPED  ? "stopped" :
            thread->_impl->state == STATE_STARTING ? "starting" :
            thread->_impl->state == STATE_RUNNING  ? "running" :
            thread->_impl->state == STATE_STOPPING ? "stopping" :
                                                     "unknown" );

#ifdef PTW32_VERSION
    os << " called from " << pthread_self().p;
#else
    os << " called from " << pthread_self();
#endif

    return os;
}
#endif
458 90 : }
|