Line data Source code
1 :
2 : /* Copyright (c) 2005-2017, Stefan Eilemann <eile@equalizergraphics.com>
3 : * Marwan Abdellah <marwan.abdellah@epfl.ch>
4 : * Daniel Nachbaur <danielnachbaur@gmail.com>
5 : *
6 : * This library is free software; you can redistribute it and/or modify it under
7 : * the terms of the GNU Lesser General Public License version 2.1 as published
8 : * by the Free Software Foundation.
9 : *
10 : * This library is distributed in the hope that it will be useful, but WITHOUT
11 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
13 : * details.
14 : *
15 : * You should have received a copy of the GNU Lesser General Public License
16 : * along with this library; if not, write to the Free Software Foundation, Inc.,
17 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 : */
19 :
20 : #include "thread.h"
21 :
22 : #include "debug.h"
23 : #include "log.h"
24 : #include "monitor.h"
25 : #include "os.h"
26 : #include "rng.h"
27 : #include "scopedMutex.h"
28 : #include "sleep.h"
29 : #include "spinLock.h"
30 :
31 : #include <algorithm>
32 : #include <boost/lexical_cast.hpp>
33 : #include <errno.h>
34 : #include <map>
35 : #include <pthread.h>
36 : #include <set>
37 :
38 : // Experimental Win32 thread pinning
39 : #ifdef _WIN32
40 : //# define LB_WIN32_THREAD_AFFINITY
41 : #pragma message("Thread affinity not supported on WIN32")
42 : #endif
43 :
44 : #ifdef __linux__
45 : #include <signal.h>
46 : #include <sys/prctl.h>
47 : #endif
48 :
49 : #ifdef LUNCHBOX_USE_HWLOC
50 : #include <hwloc.h>
51 : #endif
52 :
53 : #include "detail/threadID.h"
54 :
55 : namespace lunchbox
56 : {
57 : namespace
58 : {
59 25 : a_int32_t _threadIDs;
60 :
61 : enum ThreadState //!< The current state of a thread.
62 : {
63 : STATE_STOPPED,
64 : STATE_STARTING, // start() in progress
65 : STATE_RUNNING,
66 : STATE_STOPPING // child no longer active, join() not yet called
67 : };
68 :
69 : #ifdef __linux__
70 : typedef std::set<ThreadID> ThreadIDSet;
71 25 : static Lockable<ThreadIDSet, SpinLock> _threads;
72 0 : void _sigUserHandler(int, siginfo_t*, void*)
73 : {
74 0 : LBERROR << ":" << backtrace << std::endl;
75 0 : }
76 : #endif
77 : }
78 :
79 : namespace detail
80 : {
81 1639 : class Thread
82 : {
83 : public:
84 1639 : Thread()
85 1639 : : state(STATE_STOPPED)
86 1639 : , index(++_threadIDs)
87 : {
88 1639 : }
89 :
90 : lunchbox::ThreadID id;
91 : Monitor<ThreadState> state;
92 : int32_t index;
93 : };
94 : }
95 :
96 1639 : Thread::Thread()
97 1639 : : _impl(new detail::Thread)
98 : {
99 1639 : }
100 :
101 0 : Thread::Thread(const Thread&)
102 0 : : _impl(new detail::Thread)
103 : {
104 0 : }
105 :
106 3278 : Thread::~Thread()
107 : {
108 1639 : delete _impl;
109 1639 : }
110 :
111 257 : bool Thread::isStopped() const
112 : {
113 257 : return (_impl->state == STATE_STOPPED);
114 : }
115 :
116 1 : bool Thread::isRunning() const
117 : {
118 1 : return (_impl->state == STATE_RUNNING);
119 : }
120 :
121 1663 : void* Thread::runChild(void* arg)
122 : {
123 1663 : Thread* thread = static_cast<Thread*>(arg);
124 1663 : thread->_runChild();
125 0 : return 0; // not reached
126 : }
127 :
128 1663 : void Thread::_runChild()
129 : {
130 1663 : setName(std::to_string(_impl->index));
131 1663 : _impl->id._impl->pthread = pthread_self();
132 : #ifdef __linux__
133 : {
134 3326 : ScopedFastWrite mutex(_threads);
135 1663 : _threads->insert(_impl->id);
136 : }
137 : // install signal handler to dump thread state for debugging
138 : struct sigaction sa;
139 1663 : ::sigfillset(&sa.sa_mask);
140 1663 : sa.sa_flags = SA_SIGINFO;
141 1663 : sa.sa_sigaction = _sigUserHandler;
142 1663 : ::sigaction(SIGUSR1, &sa, 0);
143 : #endif
144 :
145 1663 : if (!init())
146 : {
147 5 : LBWARN << "Thread " << className(this) << " failed to initialize"
148 4 : << std::endl;
149 1 : _impl->state = STATE_STOPPED;
150 1 : pthread_exit(0);
151 : LBUNREACHABLE;
152 : }
153 :
154 1662 : _impl->state = STATE_RUNNING;
155 9969 : LBDEBUG << "Thread #" << _impl->index << " type " << className(*this)
156 8307 : << " successfully initialized" << std::endl;
157 :
158 1659 : run();
159 1386 : LBVERB << "Thread " << className(this) << " finished" << std::endl;
160 : #ifdef __linux__
161 : {
162 2772 : ScopedFastWrite mutex(_threads);
163 1386 : _threads->erase(_impl->id);
164 : }
165 : #endif
166 1386 : this->exit();
167 :
168 0 : LBUNREACHABLE;
169 0 : }
170 :
171 1663 : bool Thread::start()
172 : {
173 1663 : if (_impl->state != STATE_STOPPED)
174 0 : return false;
175 :
176 1663 : _impl->state = STATE_STARTING;
177 :
178 : pthread_attr_t attributes;
179 1663 : pthread_attr_init(&attributes);
180 1663 : pthread_attr_setscope(&attributes, PTHREAD_SCOPE_SYSTEM);
181 :
182 1663 : int nTries = 10;
183 1663 : while (nTries--)
184 : {
185 1663 : const int error = pthread_create(&_impl->id._impl->pthread, &attributes,
186 1663 : runChild, this);
187 1663 : if (error == 0) // succeeded
188 : {
189 1663 : LBVERB << "Created pthread " << this << std::endl;
190 1663 : break;
191 : }
192 0 : if (error != EAGAIN || nTries == 0)
193 : {
194 0 : LBWARN << "Could not create thread: " << strerror(error)
195 0 : << std::endl;
196 0 : return false;
197 : }
198 0 : sleep(1); // Give EAGAIN some time to recover
199 : }
200 1663 : _impl->state.waitNE(STATE_STARTING);
201 :
202 : // avoid memleak, we don't use pthread_join
203 1663 : pthread_detach(_impl->id._impl->pthread);
204 1663 : return (_impl->state != STATE_STOPPED);
205 : }
206 :
207 1642 : void Thread::exit()
208 : {
209 1642 : LBASSERTINFO(isCurrent(), "Thread::exit not called from child thread");
210 1642 : LBVERB << "Exiting thread " << className(this) << std::endl;
211 1642 : Log::instance().forceFlush();
212 1642 : Log::instance().exit();
213 :
214 1642 : _impl->state = STATE_STOPPING;
215 1642 : pthread_exit(0);
216 : LBUNREACHABLE;
217 : }
218 :
219 20 : void Thread::cancel()
220 : {
221 20 : LBASSERTINFO(!isCurrent(), "Thread::cancel called from child thread");
222 :
223 20 : LBVERB << "Canceling thread " << className(this) << std::endl;
224 20 : _impl->state = STATE_STOPPING;
225 :
226 20 : const int error = pthread_cancel(_impl->id._impl->pthread);
227 20 : if (error != 0)
228 0 : LBWARN << "Could not cancel thread: " << strerror(error) << std::endl;
229 20 : }
230 :
231 1899 : bool Thread::join()
232 : {
233 1899 : if (_impl->state == STATE_STOPPED)
234 1 : return false;
235 1898 : if (isCurrent()) // can't join self
236 256 : return false;
237 :
238 1642 : _impl->state.waitNE(STATE_RUNNING);
239 1642 : _impl->state = STATE_STOPPED;
240 :
241 1642 : LBVERB << "Joined thread " << className(this) << std::endl;
242 1642 : return true;
243 : }
244 :
245 3560 : bool Thread::isCurrent() const
246 : {
247 3560 : return pthread_equal(pthread_self(), _impl->id._impl->pthread);
248 : }
249 :
250 2063788 : ThreadID Thread::getSelfThreadID()
251 : {
252 2063788 : ThreadID threadID;
253 2119825 : threadID._impl->pthread = pthread_self();
254 2119963 : return threadID;
255 : }
256 :
257 583 : void Thread::yield()
258 : {
259 : #ifdef _MSC_VER
260 : ::Sleep(0); // sleeps thread
261 : // or ::SwitchToThread() ? // switches to another waiting thread, if exists
262 : #elif defined(__APPLE__)
263 : ::pthread_yield_np();
264 : #else
265 583 : ::sched_yield();
266 : #endif
267 582 : }
268 :
269 : #ifdef _MSC_VER
270 : #ifndef MS_VC_EXCEPTION
271 : #define MS_VC_EXCEPTION 0x406D1388
272 : #endif
273 :
274 : #pragma pack(push, 8)
275 : typedef struct tagTHREADNAME_INFO
276 : {
277 : DWORD dwType; // Must be 0x1000.
278 : LPCSTR szName; // Pointer to name (in user addr space).
279 : DWORD dwThreadID; // Thread ID (-1=caller thread).
280 : DWORD dwFlags; // Reserved for future use, must be zero.
281 : } THREADNAME_INFO;
282 : #pragma pack(pop)
283 : static void _setVCName(const char* name)
284 : {
285 : ::Sleep(10);
286 :
287 : THREADNAME_INFO info;
288 : info.dwType = 0x1000;
289 : info.szName = name;
290 : info.dwThreadID = GetCurrentThreadId();
291 : info.dwFlags = 0;
292 :
293 : __try
294 : {
295 : RaiseException(MS_VC_EXCEPTION, 0, sizeof(info) / sizeof(ULONG_PTR),
296 : (ULONG_PTR*)&info);
297 : }
298 : __except (EXCEPTION_EXECUTE_HANDLER)
299 : {
300 : }
301 : }
302 : #endif
303 :
304 1680 : void Thread::setName(const std::string& name)
305 : {
306 1680 : Log::instance().setThreadName(name);
307 :
308 : #ifdef _MSC_VER
309 : #ifndef NDEBUG
310 : _setVCName(name.c_str());
311 : #endif
312 : #elif __MAC_OS_X_VERSION_MIN_REQUIRED >= 1060
313 : pthread_setname_np(name.c_str());
314 : #elif defined(__linux__)
315 1680 : prctl(PR_SET_NAME, name.c_str(), 0, 0, 0);
316 : #else
317 : // Not implemented
318 : LBVERB << "Thread::setName() not implemented" << std::endl;
319 : #endif
320 1680 : }
321 :
322 : #ifdef LUNCHBOX_USE_HWLOC
323 0 : static hwloc_bitmap_t _getCpuSet(const int32_t affinity,
324 : hwloc_topology_t topology)
325 : {
326 0 : hwloc_bitmap_t cpuSet = hwloc_bitmap_alloc(); // HWloc CPU set
327 0 : hwloc_bitmap_zero(cpuSet); // Initialize to zeros
328 :
329 0 : if (affinity >= Thread::CORE)
330 : {
331 0 : const int32_t coreIndex = affinity - Thread::CORE;
332 0 : if (hwloc_get_obj_by_type(topology, HWLOC_OBJ_CORE, coreIndex) == 0)
333 : {
334 0 : LBWARN << "Core " << coreIndex << " does not exist in the topology"
335 0 : << std::endl;
336 0 : return cpuSet;
337 : }
338 :
339 : // Getting the core object #coreIndex
340 : const hwloc_obj_t coreObj =
341 0 : hwloc_get_obj_by_type(topology, HWLOC_OBJ_CORE, coreIndex);
342 : // Get the CPU set associated with the specified core
343 0 : cpuSet = coreObj->allowed_cpuset;
344 0 : return cpuSet;
345 : }
346 :
347 0 : if (affinity == Thread::NONE)
348 0 : return cpuSet;
349 :
350 : // Sets the affinity to a specific CPU or "socket"
351 0 : LBASSERT(affinity >= Thread::SOCKET && affinity < Thread::SOCKET_MAX);
352 0 : const int32_t socketIndex = affinity - Thread::SOCKET;
353 :
354 0 : if (hwloc_get_obj_by_type(topology, HWLOC_OBJ_SOCKET, socketIndex) == 0)
355 : {
356 0 : LBWARN << "Socket " << socketIndex << " does not exist in the topology"
357 0 : << std::endl;
358 0 : return cpuSet;
359 : }
360 :
361 : // Getting the CPU object #cpuIndex (subtree node)
362 : const hwloc_obj_t socketObj =
363 0 : hwloc_get_obj_by_type(topology, HWLOC_OBJ_SOCKET, socketIndex);
364 : // Get the CPU set associated with the specified socket
365 0 : hwloc_bitmap_copy(cpuSet, socketObj->allowed_cpuset);
366 0 : return cpuSet;
367 : }
368 : #endif
369 :
370 0 : void Thread::setAffinity(const int32_t affinity)
371 : {
372 0 : if (affinity == Thread::NONE)
373 0 : return;
374 :
375 : #ifdef LUNCHBOX_USE_HWLOC
376 : hwloc_topology_t topology;
377 0 : hwloc_topology_init(&topology); // Allocate & initialize the topology
378 0 : hwloc_topology_load(topology); // Perform HW topology detection
379 0 : const hwloc_bitmap_t cpuSet = _getCpuSet(affinity, topology);
380 : const int result =
381 0 : hwloc_set_cpubind(topology, cpuSet, HWLOC_CPUBIND_THREAD);
382 : char* cpuSetString;
383 0 : hwloc_bitmap_asprintf(&cpuSetString, cpuSet);
384 :
385 0 : if (result == 0)
386 : {
387 0 : LBVERB << "Bound to cpu set " << cpuSetString << std::endl;
388 : }
389 : else
390 : {
391 0 : LBWARN << "Error binding to cpu set " << cpuSetString << std::endl;
392 : }
393 0 : ::free(cpuSetString);
394 0 : hwloc_bitmap_free(cpuSet);
395 0 : hwloc_topology_destroy(topology);
396 :
397 : #else
398 : LBWARN << "Thread::setAffinity not implemented, hwloc library missing"
399 : << std::endl;
400 : #endif
401 : }
402 :
403 0 : void Thread::_dumpAll()
404 : {
405 : #ifdef __linux__
406 0 : ScopedFastRead mutex(_threads);
407 0 : for (ThreadIDSet::const_iterator i = _threads.data.begin();
408 0 : i != _threads.data.end(); ++i)
409 : {
410 0 : pthread_kill(i->_impl->pthread, SIGUSR1);
411 : }
412 : #endif
413 0 : }
414 :
415 0 : std::ostream& operator<<(std::ostream& os, const Thread::Affinity affinity)
416 : {
417 0 : if (affinity == Thread::NONE)
418 0 : return os << "No affinity";
419 0 : if (affinity >= Thread::CORE)
420 0 : return os << "Core " << affinity - Thread::CORE;
421 :
422 0 : LBASSERT(affinity >= Thread::SOCKET && affinity < Thread::SOCKET_MAX);
423 0 : return os << "Socket " << affinity - Thread::SOCKET;
424 : }
425 :
426 : #if 0
427 : std::ostream& operator << ( std::ostream& os, const Thread* thread )
428 : {
429 : os << "Thread " << thread->_impl->id << " state "
430 : << ( thread->_impl->state == Thread::STATE_STOPPED ? "stopped" :
431 : thread->_impl->state == Thread::STATE_STARTING ? "starting" :
432 : thread->_impl->state == Thread::STATE_RUNNING ? "running" :
433 : thread->_impl->state == Thread::STATE_STOPPING ? "stopping" :
434 : "unknown" );
435 :
436 : #ifdef PTW32_VERSION
437 : os << " called from " << pthread_self().p;
438 : #else
439 : os << " called from " << pthread_self();
440 : #endif
441 :
442 : return os;
443 : }
444 : #endif
445 75 : }
|