LCOV - code coverage report
Current view: top level - lunchbox - thread.cpp (source / functions) Hit Total Coverage
Test: Lunchbox Lines: 94 160 58.8 %
Date: 2017-08-03 05:21:41 Functions: 18 25 72.0 %

          Line data    Source code
       1             : 
       2             : /* Copyright (c) 2005-2017, Stefan Eilemann <eile@equalizergraphics.com>
       3             :  *                          Marwan Abdellah <marwan.abdellah@epfl.ch>
       4             :  *                          Daniel Nachbaur <danielnachbaur@gmail.com>
       5             :  *
       6             :  * This library is free software; you can redistribute it and/or modify it under
       7             :  * the terms of the GNU Lesser General Public License version 2.1 as published
       8             :  * by the Free Software Foundation.
       9             :  *
      10             :  * This library is distributed in the hope that it will be useful, but WITHOUT
      11             :  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
      12             :  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
      13             :  * details.
      14             :  *
      15             :  * You should have received a copy of the GNU Lesser General Public License
      16             :  * along with this library; if not, write to the Free Software Foundation, Inc.,
      17             :  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
      18             :  */
      19             : 
      20             : #include "thread.h"
      21             : 
      22             : #include "debug.h"
      23             : #include "log.h"
      24             : #include "monitor.h"
      25             : #include "os.h"
      26             : #include "rng.h"
      27             : #include "scopedMutex.h"
      28             : #include "sleep.h"
      29             : #include "spinLock.h"
      30             : 
      31             : #include <algorithm>
      32             : #include <boost/lexical_cast.hpp>
      33             : #include <errno.h>
      34             : #include <map>
      35             : #include <pthread.h>
      36             : #include <set>
      37             : 
      38             : // Experimental Win32 thread pinning
      39             : #ifdef _WIN32
      40             : //#  define LB_WIN32_THREAD_AFFINITY
      41             : #pragma message("Thread affinity not supported on WIN32")
      42             : #endif
      43             : 
      44             : #ifdef __linux__
      45             : #include <signal.h>
      46             : #include <sys/prctl.h>
      47             : #endif
      48             : 
      49             : #ifdef LUNCHBOX_USE_HWLOC
      50             : #include <hwloc.h>
      51             : #endif
      52             : 
      53             : #include "detail/threadID.h"
      54             : 
      55             : namespace lunchbox
      56             : {
      57             : namespace
      58             : {
      59          25 : a_int32_t _threadIDs;
      60             : 
      61             : enum ThreadState //!< The current state of a thread.
      62             : {
      63             :     STATE_STOPPED,
      64             :     STATE_STARTING, // start() in progress
      65             :     STATE_RUNNING,
      66             :     STATE_STOPPING // child no longer active, join() not yet called
      67             : };
      68             : 
      69             : #ifdef __linux__
      70             : typedef std::set<ThreadID> ThreadIDSet;
      71          25 : static Lockable<ThreadIDSet, SpinLock> _threads;
      72           0 : void _sigUserHandler(int, siginfo_t*, void*)
      73             : {
      74           0 :     LBERROR << ":" << backtrace << std::endl;
      75           0 : }
      76             : #endif
      77             : }
      78             : 
      79             : namespace detail
      80             : {
      81        1639 : class Thread
      82             : {
      83             : public:
      84        1639 :     Thread()
      85        1639 :         : state(STATE_STOPPED)
      86        1639 :         , index(++_threadIDs)
      87             :     {
      88        1639 :     }
      89             : 
      90             :     lunchbox::ThreadID id;
      91             :     Monitor<ThreadState> state;
      92             :     int32_t index;
      93             : };
      94             : }
      95             : 
      96        1639 : Thread::Thread()
      97        1639 :     : _impl(new detail::Thread)
      98             : {
      99        1639 : }
     100             : 
     101           0 : Thread::Thread(const Thread&)
     102           0 :     : _impl(new detail::Thread)
     103             : {
     104           0 : }
     105             : 
     106        3278 : Thread::~Thread()
     107             : {
     108        1639 :     delete _impl;
     109        1639 : }
     110             : 
     111         257 : bool Thread::isStopped() const
     112             : {
     113         257 :     return (_impl->state == STATE_STOPPED);
     114             : }
     115             : 
     116           1 : bool Thread::isRunning() const
     117             : {
     118           1 :     return (_impl->state == STATE_RUNNING);
     119             : }
     120             : 
     121        1663 : void* Thread::runChild(void* arg)
     122             : {
     123        1663 :     Thread* thread = static_cast<Thread*>(arg);
     124        1663 :     thread->_runChild();
     125           0 :     return 0; // not reached
     126             : }
     127             : 
     128        1663 : void Thread::_runChild()
     129             : {
     130        1663 :     setName(std::to_string(_impl->index));
     131        1663 :     _impl->id._impl->pthread = pthread_self();
     132             : #ifdef __linux__
     133             :     {
     134        3326 :         ScopedFastWrite mutex(_threads);
     135        1663 :         _threads->insert(_impl->id);
     136             :     }
     137             :     // install signal handler to dump thread state for debugging
     138             :     struct sigaction sa;
     139        1663 :     ::sigfillset(&sa.sa_mask);
     140        1663 :     sa.sa_flags = SA_SIGINFO;
     141        1663 :     sa.sa_sigaction = _sigUserHandler;
     142        1663 :     ::sigaction(SIGUSR1, &sa, 0);
     143             : #endif
     144             : 
     145        1663 :     if (!init())
     146             :     {
     147           5 :         LBWARN << "Thread " << className(this) << " failed to initialize"
     148           4 :                << std::endl;
     149           1 :         _impl->state = STATE_STOPPED;
     150           1 :         pthread_exit(0);
     151             :         LBUNREACHABLE;
     152             :     }
     153             : 
     154        1662 :     _impl->state = STATE_RUNNING;
     155        9969 :     LBDEBUG << "Thread #" << _impl->index << " type " << className(*this)
     156        8307 :             << " successfully initialized" << std::endl;
     157             : 
     158        1659 :     run();
     159        1386 :     LBVERB << "Thread " << className(this) << " finished" << std::endl;
     160             : #ifdef __linux__
     161             :     {
     162        2772 :         ScopedFastWrite mutex(_threads);
     163        1386 :         _threads->erase(_impl->id);
     164             :     }
     165             : #endif
     166        1386 :     this->exit();
     167             : 
     168           0 :     LBUNREACHABLE;
     169           0 : }
     170             : 
     171        1663 : bool Thread::start()
     172             : {
     173        1663 :     if (_impl->state != STATE_STOPPED)
     174           0 :         return false;
     175             : 
     176        1663 :     _impl->state = STATE_STARTING;
     177             : 
     178             :     pthread_attr_t attributes;
     179        1663 :     pthread_attr_init(&attributes);
     180        1663 :     pthread_attr_setscope(&attributes, PTHREAD_SCOPE_SYSTEM);
     181             : 
     182        1663 :     int nTries = 10;
     183        1663 :     while (nTries--)
     184             :     {
     185        1663 :         const int error = pthread_create(&_impl->id._impl->pthread, &attributes,
     186        1663 :                                          runChild, this);
     187        1663 :         if (error == 0) // succeeded
     188             :         {
     189        1663 :             LBVERB << "Created pthread " << this << std::endl;
     190        1663 :             break;
     191             :         }
     192           0 :         if (error != EAGAIN || nTries == 0)
     193             :         {
     194           0 :             LBWARN << "Could not create thread: " << strerror(error)
     195           0 :                    << std::endl;
     196           0 :             return false;
     197             :         }
     198           0 :         sleep(1); // Give EAGAIN some time to recover
     199             :     }
     200        1663 :     _impl->state.waitNE(STATE_STARTING);
     201             : 
     202             :     // avoid memleak, we don't use pthread_join
     203        1663 :     pthread_detach(_impl->id._impl->pthread);
     204        1663 :     return (_impl->state != STATE_STOPPED);
     205             : }
     206             : 
     207        1642 : void Thread::exit()
     208             : {
     209        1642 :     LBASSERTINFO(isCurrent(), "Thread::exit not called from child thread");
     210        1642 :     LBVERB << "Exiting thread " << className(this) << std::endl;
     211        1642 :     Log::instance().forceFlush();
     212        1642 :     Log::instance().exit();
     213             : 
     214        1642 :     _impl->state = STATE_STOPPING;
     215        1642 :     pthread_exit(0);
     216             :     LBUNREACHABLE;
     217             : }
     218             : 
     219          20 : void Thread::cancel()
     220             : {
     221          20 :     LBASSERTINFO(!isCurrent(), "Thread::cancel called from child thread");
     222             : 
     223          20 :     LBVERB << "Canceling thread " << className(this) << std::endl;
     224          20 :     _impl->state = STATE_STOPPING;
     225             : 
     226          20 :     const int error = pthread_cancel(_impl->id._impl->pthread);
     227          20 :     if (error != 0)
     228           0 :         LBWARN << "Could not cancel thread: " << strerror(error) << std::endl;
     229          20 : }
     230             : 
     231        1899 : bool Thread::join()
     232             : {
     233        1899 :     if (_impl->state == STATE_STOPPED)
     234           1 :         return false;
     235        1898 :     if (isCurrent()) // can't join self
     236         256 :         return false;
     237             : 
     238        1642 :     _impl->state.waitNE(STATE_RUNNING);
     239        1642 :     _impl->state = STATE_STOPPED;
     240             : 
     241        1642 :     LBVERB << "Joined thread " << className(this) << std::endl;
     242        1642 :     return true;
     243             : }
     244             : 
     245        3560 : bool Thread::isCurrent() const
     246             : {
     247        3560 :     return pthread_equal(pthread_self(), _impl->id._impl->pthread);
     248             : }
     249             : 
     250     1921421 : ThreadID Thread::getSelfThreadID()
     251             : {
     252     1921421 :     ThreadID threadID;
     253     1934545 :     threadID._impl->pthread = pthread_self();
     254     1934722 :     return threadID;
     255             : }
     256             : 
     257         964 : void Thread::yield()
     258             : {
     259             : #ifdef _MSC_VER
     260             :     ::Sleep(0); // sleeps thread
     261             : // or ::SwitchToThread() ? // switches to another waiting thread, if exists
     262             : #elif defined(__APPLE__)
     263             :     ::pthread_yield_np();
     264             : #else
     265         964 :     ::sched_yield();
     266             : #endif
     267         969 : }
     268             : 
     269             : #ifdef _MSC_VER
     270             : #ifndef MS_VC_EXCEPTION
     271             : #define MS_VC_EXCEPTION 0x406D1388
     272             : #endif
     273             : 
     274             : #pragma pack(push, 8)
     275             : typedef struct tagTHREADNAME_INFO
     276             : {
     277             :     DWORD dwType;     // Must be 0x1000.
     278             :     LPCSTR szName;    // Pointer to name (in user addr space).
     279             :     DWORD dwThreadID; // Thread ID (-1=caller thread).
     280             :     DWORD dwFlags;    // Reserved for future use, must be zero.
     281             : } THREADNAME_INFO;
     282             : #pragma pack(pop)
     283             : static void _setVCName(const char* name)
     284             : {
     285             :     ::Sleep(10);
     286             : 
     287             :     THREADNAME_INFO info;
     288             :     info.dwType = 0x1000;
     289             :     info.szName = name;
     290             :     info.dwThreadID = GetCurrentThreadId();
     291             :     info.dwFlags = 0;
     292             : 
     293             :     __try
     294             :     {
     295             :         RaiseException(MS_VC_EXCEPTION, 0, sizeof(info) / sizeof(ULONG_PTR),
     296             :                        (ULONG_PTR*)&info);
     297             :     }
     298             :     __except (EXCEPTION_EXECUTE_HANDLER)
     299             :     {
     300             :     }
     301             : }
     302             : #endif
     303             : 
     304        1680 : void Thread::setName(const std::string& name)
     305             : {
     306        1680 :     Log::instance().setThreadName(name);
     307             : 
     308             : #ifdef _MSC_VER
     309             : #ifndef NDEBUG
     310             :     _setVCName(name.c_str());
     311             : #endif
     312             : #elif __MAC_OS_X_VERSION_MIN_REQUIRED >= 1060
     313             :     pthread_setname_np(name.c_str());
     314             : #elif defined(__linux__)
     315        1680 :     prctl(PR_SET_NAME, name.c_str(), 0, 0, 0);
     316             : #else
     317             :     // Not implemented
     318             :     LBVERB << "Thread::setName() not implemented" << std::endl;
     319             : #endif
     320        1680 : }
     321             : 
     322             : #ifdef LUNCHBOX_USE_HWLOC
     323           0 : static hwloc_bitmap_t _getCpuSet(const int32_t affinity,
     324             :                                  hwloc_topology_t topology)
     325             : {
     326           0 :     hwloc_bitmap_t cpuSet = hwloc_bitmap_alloc(); // HWloc CPU set
     327           0 :     hwloc_bitmap_zero(cpuSet);                    // Initialize to zeros
     328             : 
     329           0 :     if (affinity >= Thread::CORE)
     330             :     {
     331           0 :         const int32_t coreIndex = affinity - Thread::CORE;
     332           0 :         if (hwloc_get_obj_by_type(topology, HWLOC_OBJ_CORE, coreIndex) == 0)
     333             :         {
     334           0 :             LBWARN << "Core " << coreIndex << " does not exist in the topology"
     335           0 :                    << std::endl;
     336           0 :             return cpuSet;
     337             :         }
     338             : 
     339             :         // Getting the core object #coreIndex
     340             :         const hwloc_obj_t coreObj =
     341           0 :             hwloc_get_obj_by_type(topology, HWLOC_OBJ_CORE, coreIndex);
     342             :         // Get the CPU set associated with the specified core
     343           0 :         cpuSet = coreObj->allowed_cpuset;
     344           0 :         return cpuSet;
     345             :     }
     346             : 
     347           0 :     if (affinity == Thread::NONE)
     348           0 :         return cpuSet;
     349             : 
     350             :     // Sets the affinity to a specific CPU or "socket"
     351           0 :     LBASSERT(affinity >= Thread::SOCKET && affinity < Thread::SOCKET_MAX);
     352           0 :     const int32_t socketIndex = affinity - Thread::SOCKET;
     353             : 
     354           0 :     if (hwloc_get_obj_by_type(topology, HWLOC_OBJ_SOCKET, socketIndex) == 0)
     355             :     {
     356           0 :         LBWARN << "Socket " << socketIndex << " does not exist in the topology"
     357           0 :                << std::endl;
     358           0 :         return cpuSet;
     359             :     }
     360             : 
     361             :     // Getting the CPU object #cpuIndex (subtree node)
     362             :     const hwloc_obj_t socketObj =
     363           0 :         hwloc_get_obj_by_type(topology, HWLOC_OBJ_SOCKET, socketIndex);
     364             :     // Get the CPU set associated with the specified socket
     365           0 :     hwloc_bitmap_copy(cpuSet, socketObj->allowed_cpuset);
     366           0 :     return cpuSet;
     367             : }
     368             : #endif
     369             : 
     370           0 : void Thread::setAffinity(const int32_t affinity)
     371             : {
     372           0 :     if (affinity == Thread::NONE)
     373           0 :         return;
     374             : 
     375             : #ifdef LUNCHBOX_USE_HWLOC
     376             :     hwloc_topology_t topology;
     377           0 :     hwloc_topology_init(&topology); // Allocate & initialize the topology
     378           0 :     hwloc_topology_load(topology);  // Perform HW topology detection
     379           0 :     const hwloc_bitmap_t cpuSet = _getCpuSet(affinity, topology);
     380             :     const int result =
     381           0 :         hwloc_set_cpubind(topology, cpuSet, HWLOC_CPUBIND_THREAD);
     382             :     char* cpuSetString;
     383           0 :     hwloc_bitmap_asprintf(&cpuSetString, cpuSet);
     384             : 
     385           0 :     if (result == 0)
     386             :     {
     387           0 :         LBVERB << "Bound to cpu set " << cpuSetString << std::endl;
     388             :     }
     389             :     else
     390             :     {
     391           0 :         LBWARN << "Error binding to cpu set " << cpuSetString << std::endl;
     392             :     }
     393           0 :     ::free(cpuSetString);
     394           0 :     hwloc_bitmap_free(cpuSet);
     395           0 :     hwloc_topology_destroy(topology);
     396             : 
     397             : #else
     398             :     LBWARN << "Thread::setAffinity not implemented, hwloc library missing"
     399             :            << std::endl;
     400             : #endif
     401             : }
     402             : 
     403           0 : void Thread::_dumpAll()
     404             : {
     405             : #ifdef __linux__
     406           0 :     ScopedFastRead mutex(_threads);
     407           0 :     for (ThreadIDSet::const_iterator i = _threads.data.begin();
     408           0 :          i != _threads.data.end(); ++i)
     409             :     {
     410           0 :         pthread_kill(i->_impl->pthread, SIGUSR1);
     411             :     }
     412             : #endif
     413           0 : }
     414             : 
     415           0 : std::ostream& operator<<(std::ostream& os, const Thread::Affinity affinity)
     416             : {
     417           0 :     if (affinity == Thread::NONE)
     418           0 :         return os << "No affinity";
     419           0 :     if (affinity >= Thread::CORE)
     420           0 :         return os << "Core " << affinity - Thread::CORE;
     421             : 
     422           0 :     LBASSERT(affinity >= Thread::SOCKET && affinity < Thread::SOCKET_MAX);
     423           0 :     return os << "Socket " << affinity - Thread::SOCKET;
     424             : }
     425             : 
     426             : #if 0
     427             : std::ostream& operator << ( std::ostream& os, const Thread* thread )
     428             : {
     429             :     os << "Thread " << thread->_impl->id << " state "
     430             :        << ( thread->_impl->state == Thread::STATE_STOPPED  ? "stopped"  :
     431             :             thread->_impl->state == Thread::STATE_STARTING ? "starting" :
     432             :             thread->_impl->state == Thread::STATE_RUNNING  ? "running"  :
     433             :             thread->_impl->state == Thread::STATE_STOPPING ? "stopping" :
     434             :             "unknown" );
     435             : 
     436             : #ifdef PTW32_VERSION
     437             :     os << " called from " << pthread_self().p;
     438             : #else
     439             :     os << " called from " << pthread_self();
     440             : #endif
     441             : 
     442             :     return os;
     443             : }
     444             : #endif
     445          75 : }

Generated by: LCOV version 1.11