Line data Source code
1 :
2 : /* Copyright (c) 2005-2014, Stefan Eilemann <eile@equalizergraphics.com>
3 : * 2012, Marwan Abdellah <marwan.abdellah@epfl.ch>
4 : * 2011-2012, Daniel Nachbaur <danielnachbaur@gmail.com>
5 : *
6 : * This library is free software; you can redistribute it and/or modify it under
7 : * the terms of the GNU Lesser General Public License version 2.1 as published
8 : * by the Free Software Foundation.
9 : *
10 : * This library is distributed in the hope that it will be useful, but WITHOUT
11 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
13 : * details.
14 : *
15 : * You should have received a copy of the GNU Lesser General Public License
16 : * along with this library; if not, write to the Free Software Foundation, Inc.,
17 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 : */
19 :
20 : #include "thread.h"
21 :
22 : #include "os.h"
23 : #include "debug.h"
24 : #include "lock.h"
25 : #include "log.h"
26 : #include "monitor.h"
27 : #include "rng.h"
28 : #include "scopedMutex.h"
29 : #include "sleep.h"
30 : #include "spinLock.h"
31 :
32 : #include <boost/lexical_cast.hpp>
33 : #include <errno.h>
34 : #include <pthread.h>
35 : #include <algorithm>
36 : #include <map>
37 : #include <set>
38 :
39 : // Experimental Win32 thread pinning
40 : #ifdef _WIN32
41 : //# define LB_WIN32_THREAD_AFFINITY
42 : # pragma message ("Thread affinity not supported on WIN32")
43 : #endif
44 :
45 : #ifdef __linux__
46 : # include <signal.h>
47 : # include <sys/prctl.h>
48 : #endif
49 :
50 : #ifdef LUNCHBOX_USE_HWLOC
51 : # include <hwloc.h>
52 : #endif
53 :
54 : #include "detail/threadID.h"
55 :
56 : namespace lunchbox
57 : {
58 : namespace
59 : {
60 24 : a_int32_t _threadIDs;
61 :
62 : enum ThreadState //!< The current state of a thread.
63 : {
64 : STATE_STOPPED,
65 : STATE_STARTING, // start() in progress
66 : STATE_RUNNING,
67 : STATE_STOPPING // child no longer active, join() not yet called
68 : };
69 :
70 : #ifdef __linux__
71 : typedef std::set< ThreadID > ThreadIDSet;
72 24 : static Lockable< ThreadIDSet, SpinLock > _threads;
73 0 : void _sigUserHandler( int, siginfo_t*, void* )
74 : {
75 0 : LBERROR << ":" << backtrace << std::endl;
76 0 : }
77 : #endif
78 :
79 : }
80 :
81 : namespace detail
82 : {
83 1640 : class Thread
84 : {
85 : public:
86 1640 : Thread() : state( STATE_STOPPED ), index( ++_threadIDs ) {}
87 :
88 : lunchbox::ThreadID id;
89 : Monitor< ThreadState > state;
90 : int32_t index;
91 : };
92 : }
93 :
94 1640 : Thread::Thread()
95 1640 : : _impl( new detail::Thread )
96 : {
97 1640 : }
98 :
99 0 : Thread::Thread( const Thread& )
100 0 : : _impl( new detail::Thread )
101 : {
102 0 : }
103 :
104 3280 : Thread::~Thread()
105 : {
106 1640 : delete _impl;
107 1640 : }
108 :
109 257 : bool Thread::isStopped() const
110 : {
111 257 : return ( _impl->state == STATE_STOPPED );
112 : }
113 :
114 1 : bool Thread::isRunning() const
115 : {
116 1 : return ( _impl->state == STATE_RUNNING );
117 : }
118 :
119 1664 : void* Thread::runChild( void* arg )
120 : {
121 1664 : Thread* thread = static_cast<Thread*>(arg);
122 1664 : thread->_runChild();
123 0 : return 0; // not reached
124 : }
125 :
126 1664 : void Thread::_runChild()
127 : {
128 1664 : setName( boost::lexical_cast< std::string >( _impl->index ));
129 1664 : _impl->id._impl->pthread = pthread_self();
130 : #ifdef __linux__
131 : {
132 3328 : ScopedFastWrite mutex( _threads );
133 1664 : _threads->insert( _impl->id );
134 : }
135 : // install signal handler to dump thread state for debugging
136 : struct sigaction sa;
137 1664 : ::sigfillset( &sa.sa_mask );
138 1664 : sa.sa_flags = SA_SIGINFO;
139 1664 : sa.sa_sigaction = _sigUserHandler;
140 1664 : ::sigaction( SIGUSR1, &sa, 0 );
141 : #endif
142 :
143 1664 : if( !init( ))
144 : {
145 5 : LBWARN << "Thread " << className( this ) << " failed to initialize"
146 4 : << std::endl;
147 1 : _impl->state = STATE_STOPPED;
148 1 : pthread_exit( 0 );
149 : LBUNREACHABLE;
150 : }
151 :
152 1663 : _impl->state = STATE_RUNNING;
153 9976 : LBDEBUG << "Thread #" << _impl->index << " type " << className( *this )
154 8313 : << " successfully initialized" << std::endl;
155 :
156 1661 : run();
157 1386 : LBVERB << "Thread " << className( this ) << " finished" << std::endl;
158 : #ifdef __linux__
159 : {
160 2772 : ScopedFastWrite mutex( _threads );
161 1386 : _threads->erase( _impl->id );
162 : }
163 : #endif
164 1386 : this->exit();
165 :
166 0 : LBUNREACHABLE;
167 0 : }
168 :
169 1664 : bool Thread::start()
170 : {
171 1664 : if( _impl->state != STATE_STOPPED )
172 0 : return false;
173 :
174 1664 : _impl->state = STATE_STARTING;
175 :
176 : pthread_attr_t attributes;
177 1664 : pthread_attr_init( &attributes );
178 1664 : pthread_attr_setscope( &attributes, PTHREAD_SCOPE_SYSTEM );
179 :
180 1664 : int nTries = 10;
181 1664 : while( nTries-- )
182 : {
183 1664 : const int error = pthread_create( &_impl->id._impl->pthread,
184 1664 : &attributes, runChild, this );
185 1664 : if( error == 0 ) // succeeded
186 : {
187 1664 : LBVERB << "Created pthread " << this << std::endl;
188 1664 : break;
189 : }
190 0 : if( error != EAGAIN || nTries == 0 )
191 : {
192 0 : LBWARN << "Could not create thread: " << strerror( error )
193 0 : << std::endl;
194 0 : return false;
195 : }
196 0 : sleep( 1 ); // Give EAGAIN some time to recover
197 : }
198 :
199 : // avoid memleak, we don't use pthread_join
200 1664 : pthread_detach( _impl->id._impl->pthread );
201 1664 : _impl->state.waitNE( STATE_STARTING );
202 1664 : return (_impl->state != STATE_STOPPED);
203 : }
204 :
205 1642 : void Thread::exit()
206 : {
207 1642 : LBASSERTINFO( isCurrent(), "Thread::exit not called from child thread" );
208 1642 : LBVERB << "Exiting thread " << className( this ) << std::endl;
209 1642 : Log::instance().forceFlush();
210 1642 : Log::instance().exit();
211 :
212 1641 : _impl->state = STATE_STOPPING;
213 1642 : pthread_exit( 0 );
214 : LBUNREACHABLE;
215 : }
216 :
217 21 : void Thread::cancel()
218 : {
219 21 : LBASSERTINFO( !isCurrent(), "Thread::cancel called from child thread" );
220 :
221 21 : LBVERB << "Canceling thread " << className( this ) << std::endl;
222 21 : _impl->state = STATE_STOPPING;
223 :
224 21 : const int error = pthread_cancel( _impl->id._impl->pthread );
225 21 : if( error != 0 )
226 0 : LBWARN << "Could not cancel thread: " << strerror( error ) << std::endl;
227 21 : }
228 :
229 1899 : bool Thread::join()
230 : {
231 1899 : if( _impl->state == STATE_STOPPED )
232 1 : return false;
233 1898 : if( isCurrent( )) // can't join self
234 256 : return false;
235 :
236 1642 : _impl->state.waitNE( STATE_RUNNING );
237 1642 : _impl->state = STATE_STOPPED;
238 :
239 1642 : LBVERB << "Joined thread " << className( this ) << std::endl;
240 1642 : return true;
241 : }
242 :
243 3561 : bool Thread::isCurrent() const
244 : {
245 3561 : return pthread_equal( pthread_self(), _impl->id._impl->pthread );
246 : }
247 :
248 2296948 : ThreadID Thread::getSelfThreadID()
249 : {
250 2296948 : ThreadID threadID;
251 2315459 : threadID._impl->pthread = pthread_self();
252 2315585 : return threadID;
253 : }
254 :
255 1008 : void Thread::yield()
256 : {
257 : #ifdef _MSC_VER
258 : ::Sleep( 0 ); // sleeps thread
259 : // or ::SwitchToThread() ? // switches to another waiting thread, if exists
260 : #elif defined (__APPLE__)
261 : ::pthread_yield_np();
262 : #else
263 1008 : ::sched_yield();
264 : #endif
265 1011 : }
266 :
267 : #ifdef _MSC_VER
268 : # ifndef MS_VC_EXCEPTION
269 : # define MS_VC_EXCEPTION 0x406D1388
270 : # endif
271 :
272 : # pragma pack(push,8)
273 : typedef struct tagTHREADNAME_INFO
274 : {
275 : DWORD dwType; // Must be 0x1000.
276 : LPCSTR szName; // Pointer to name (in user addr space).
277 : DWORD dwThreadID; // Thread ID (-1=caller thread).
278 : DWORD dwFlags; // Reserved for future use, must be zero.
279 : } THREADNAME_INFO;
280 : # pragma pack(pop)
281 : static void _setVCName( const char* name )
282 : {
283 : ::Sleep(10);
284 :
285 : THREADNAME_INFO info;
286 : info.dwType = 0x1000;
287 : info.szName = name;
288 : info.dwThreadID = GetCurrentThreadId();
289 : info.dwFlags = 0;
290 :
291 : __try
292 : {
293 : RaiseException( MS_VC_EXCEPTION, 0, sizeof(info)/sizeof(ULONG_PTR),
294 : (ULONG_PTR*)&info );
295 : }
296 : __except(EXCEPTION_EXECUTE_HANDLER)
297 : {
298 : }
299 : }
300 : #endif
301 :
302 1683 : void Thread::setName( const std::string& name )
303 : {
304 1683 : Log::instance().setThreadName( name );
305 :
306 : #ifdef _MSC_VER
307 : # ifndef NDEBUG
308 : _setVCName( name.c_str( ));
309 : # endif
310 : #elif __MAC_OS_X_VERSION_MIN_REQUIRED >= 1060
311 : pthread_setname_np( name.c_str( ));
312 : #elif defined(__linux__)
313 1683 : prctl( PR_SET_NAME, name.c_str(), 0, 0, 0 );
314 : #else
315 : // Not implemented
316 : LBVERB << "Thread::setName() not implemented" << std::endl;
317 : #endif
318 1683 : }
319 :
320 : #ifdef LUNCHBOX_USE_HWLOC
321 0 : static hwloc_bitmap_t _getCpuSet( const int32_t affinity,
322 : hwloc_topology_t topology )
323 : {
324 0 : hwloc_bitmap_t cpuSet = hwloc_bitmap_alloc(); // HWloc CPU set
325 0 : hwloc_bitmap_zero( cpuSet ); // Initialize to zeros
326 :
327 0 : if( affinity >= Thread::CORE )
328 : {
329 0 : const int32_t coreIndex = affinity - Thread::CORE;
330 0 : if( hwloc_get_obj_by_type( topology, HWLOC_OBJ_CORE, coreIndex ) == 0 )
331 : {
332 0 : LBWARN << "Core " << coreIndex << " does not exist in the topology"
333 0 : << std::endl;
334 0 : return cpuSet;
335 : }
336 :
337 : // Getting the core object #coreIndex
338 0 : const hwloc_obj_t coreObj = hwloc_get_obj_by_type( topology,
339 : HWLOC_OBJ_CORE,
340 0 : coreIndex );
341 : // Get the CPU set associated with the specified core
342 0 : cpuSet = coreObj->allowed_cpuset;
343 0 : return cpuSet;
344 : }
345 :
346 0 : if( affinity == Thread::NONE )
347 0 : return cpuSet;
348 :
349 : // Sets the affinity to a specific CPU or "socket"
350 0 : LBASSERT( affinity >= Thread::SOCKET && affinity < Thread::SOCKET_MAX );
351 0 : const int32_t socketIndex = affinity - Thread::SOCKET;
352 :
353 0 : if( hwloc_get_obj_by_type( topology, HWLOC_OBJ_SOCKET, socketIndex ) == 0 )
354 : {
355 0 : LBWARN << "Socket " << socketIndex << " does not exist in the topology"
356 0 : << std::endl;
357 0 : return cpuSet;
358 : }
359 :
360 : // Getting the CPU object #cpuIndex (subtree node)
361 0 : const hwloc_obj_t socketObj = hwloc_get_obj_by_type( topology,
362 : HWLOC_OBJ_SOCKET,
363 0 : socketIndex );
364 : // Get the CPU set associated with the specified socket
365 0 : hwloc_bitmap_copy( cpuSet, socketObj->allowed_cpuset );
366 0 : return cpuSet;
367 : }
368 : #endif
369 :
370 0 : void Thread::setAffinity( const int32_t affinity )
371 : {
372 0 : if( affinity == Thread::NONE )
373 0 : return;
374 :
375 : #ifdef LUNCHBOX_USE_HWLOC
376 : hwloc_topology_t topology;
377 0 : hwloc_topology_init( &topology ); // Allocate & initialize the topology
378 0 : hwloc_topology_load( topology ); // Perform HW topology detection
379 0 : const hwloc_bitmap_t cpuSet = _getCpuSet( affinity, topology );
380 0 : const int result = hwloc_set_cpubind( topology, cpuSet,
381 0 : HWLOC_CPUBIND_THREAD );
382 : char* cpuSetString;
383 0 : hwloc_bitmap_asprintf( &cpuSetString, cpuSet );
384 :
385 0 : if( result == 0 )
386 : {
387 0 : LBVERB << "Bound to cpu set " << cpuSetString << std::endl;
388 : }
389 : else
390 : {
391 0 : LBWARN << "Error binding to cpu set " << cpuSetString << std::endl;
392 : }
393 0 : ::free( cpuSetString );
394 0 : hwloc_bitmap_free( cpuSet );
395 0 : hwloc_topology_destroy( topology );
396 :
397 : #else
398 : LBWARN << "Thread::setAffinity not implemented, hwloc library missing"
399 : << std::endl;
400 : #endif
401 : }
402 :
403 0 : void Thread::_dumpAll()
404 : {
405 : #ifdef __linux__
406 0 : ScopedFastRead mutex( _threads );
407 0 : for( ThreadIDSet::const_iterator i = _threads.data.begin();
408 0 : i != _threads.data.end(); ++i )
409 : {
410 0 : pthread_kill( i->_impl->pthread, SIGUSR1 );
411 : }
412 : #endif
413 0 : }
414 :
415 0 : std::ostream& operator << ( std::ostream& os, const Thread::Affinity affinity )
416 : {
417 0 : if( affinity == Thread::NONE )
418 0 : return os << "No affinity";
419 0 : if( affinity >= Thread::CORE )
420 0 : return os << "Core " << affinity - Thread::CORE;
421 :
422 0 : LBASSERT( affinity >= Thread::SOCKET && affinity < Thread::SOCKET_MAX );
423 0 : return os << "Socket " << affinity - Thread::SOCKET;
424 : }
425 :
426 : #if 0
427 : std::ostream& operator << ( std::ostream& os, const Thread* thread )
428 : {
429 : os << "Thread " << thread->_impl->id << " state "
430 : << ( thread->_impl->state == Thread::STATE_STOPPED ? "stopped" :
431 : thread->_impl->state == Thread::STATE_STARTING ? "starting" :
432 : thread->_impl->state == Thread::STATE_RUNNING ? "running" :
433 : thread->_impl->state == Thread::STATE_STOPPING ? "stopping" :
434 : "unknown" );
435 :
436 : #ifdef PTW32_VERSION
437 : os << " called from " << pthread_self().p;
438 : #else
439 : os << " called from " << pthread_self();
440 : #endif
441 :
442 : return os;
443 : }
444 : #endif
445 72 : }
|