LCOV - code coverage report
Current view: top level - lunchbox - thread.cpp (source / functions) Hit Total Coverage
Test: lcov2.info Lines: 84 141 59.6 %
Date: 2014-10-01 Functions: 19 24 79.2 %

          Line data    Source code
       1             : 
       2             : /* Copyright (c) 2005-2014, Stefan Eilemann <eile@equalizergraphics.com>
       3             :  *                    2012, Marwan Abdellah <marwan.abdellah@epfl.ch>
       4             :  *               2011-2012, Daniel Nachbaur <danielnachbaur@gmail.com>
       5             :  *
       6             :  * This library is free software; you can redistribute it and/or modify it under
       7             :  * the terms of the GNU Lesser General Public License version 2.1 as published
       8             :  * by the Free Software Foundation.
       9             :  *
      10             :  * This library is distributed in the hope that it will be useful, but WITHOUT
      11             :  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
      12             :  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
      13             :  * details.
      14             :  *
      15             :  * You should have received a copy of the GNU Lesser General Public License
      16             :  * along with this library; if not, write to the Free Software Foundation, Inc.,
      17             :  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
      18             :  */
      19             : 
      20             : #include "thread.h"
      21             : 
      22             : #include "os.h"
      23             : #include "debug.h"
      24             : #include "lock.h"
      25             : #include "log.h"
      26             : #include "monitor.h"
      27             : #include "rng.h"
      28             : #include "scopedMutex.h"
      29             : #include "sleep.h"
      30             : #include "spinLock.h"
      31             : 
      32             : #include <boost/lexical_cast.hpp>
      33             : #include <errno.h>
      34             : #include <pthread.h>
      35             : #include <algorithm>
      36             : #include <map>
      37             : 
      38             : // Experimental Win32 thread pinning
      39             : #ifdef _WIN32
      40             : //#  define LB_WIN32_THREAD_AFFINITY
      41             : #  pragma message ("Thread affinity not supported on WIN32")
      42             : #endif
      43             : 
      44             : #ifdef __linux__
      45             : #  include <sys/prctl.h>
      46             : #endif
      47             : 
      48             : #ifdef LUNCHBOX_USE_HWLOC
      49             : #  include <hwloc.h>
      50             : #endif
      51             : 
      52             : #include "detail/threadID.h"
      53             : 
      54             : namespace lunchbox
      55             : {
      56             : namespace
      57             : {
      58          30 : a_int32_t _threadIDs;
      59             : 
      60             : enum ThreadState //!< The current state of a thread.
      61             : {
      62             :     STATE_STOPPED,
      63             :     STATE_STARTING, // start() in progress
      64             :     STATE_RUNNING,
      65             :     STATE_STOPPING  // child no longer active, join() not yet called
      66             : };
      67             : }
      68             : 
      69             : namespace detail
      70             : {
      71        4179 : class Thread
      72             : {
      73             : public:
      74        4179 :     Thread() : state( STATE_STOPPED ), index( ++_threadIDs ) {}
      75             : 
      76             :     lunchbox::ThreadID id;
      77             :     Monitor< ThreadState > state;
      78             :     int32_t index;
      79             : };
      80             : }
      81             : 
      82        4179 : Thread::Thread()
      83        4179 :     : _impl( new detail::Thread )
      84             : {
      85        4179 : }
      86             : 
      87           0 : Thread::Thread( const Thread& )
      88           0 :     : _impl( new detail::Thread )
      89             : {
      90           0 : }
      91             : 
      92        4179 : Thread::~Thread()
      93             : {
      94        4179 :     delete _impl;
      95        4179 : }
      96             : 
      97         257 : bool Thread::isStopped() const
      98             : {
      99         257 :     return ( _impl->state == STATE_STOPPED );
     100             : }
     101             : 
     102           1 : bool Thread::isRunning() const
     103             : {
     104           1 :     return ( _impl->state == STATE_RUNNING );
     105             : }
     106             : 
     107        3496 : void* Thread::runChild( void* arg )
     108             : {
     109        3496 :     Thread* thread = static_cast<Thread*>(arg);
     110        3496 :     thread->_runChild();
     111           0 :     return 0; // not reached
     112             : }
     113             : 
     114        3496 : void Thread::_runChild()
     115             : {
     116        3496 :     setName( boost::lexical_cast< std::string >( _impl->index ));
     117        3496 :     pinCurrentThread();
     118        3496 :     _impl->id._impl->pthread = pthread_self();
     119             : 
     120        3496 :     if( !init( ))
     121             :     {
     122           5 :         LBWARN << "Thread " << className( this ) << " failed to initialize"
     123           4 :                << std::endl;
     124           1 :         _impl->state = STATE_STOPPED;
     125           1 :         pthread_exit( 0 );
     126             :         LBUNREACHABLE;
     127             :     }
     128             : 
     129        3495 :     _impl->state = STATE_RUNNING;
     130       17474 :     LBINFO << "Thread #" << _impl->index << " type " << className( *this )
     131       13979 :            << " successfully initialized" << std::endl;
     132             : 
     133        3494 :     run();
     134        3202 :     LBVERB << "Thread " << className( this ) << " finished" << std::endl;
     135        3202 :     this->exit();
     136             : 
     137           0 :     LBUNREACHABLE;
     138           0 : }
     139             : 
     140        3496 : bool Thread::start()
     141             : {
     142        3496 :     if( _impl->state != STATE_STOPPED )
     143           0 :         return false;
     144             : 
     145        3496 :     _impl->state = STATE_STARTING;
     146             : 
     147             :     pthread_attr_t attributes;
     148        3496 :     pthread_attr_init( &attributes );
     149        3496 :     pthread_attr_setscope( &attributes, PTHREAD_SCOPE_SYSTEM );
     150             : 
     151        3496 :     int nTries = 10;
     152        6992 :     while( nTries-- )
     153             :     {
     154             :         const int error = pthread_create( &_impl->id._impl->pthread,
     155        3496 :                                           &attributes, runChild, this );
     156        3496 :         if( error == 0 ) // succeeded
     157             :         {
     158        3496 :             LBVERB << "Created pthread " << this << std::endl;
     159        3496 :             break;
     160             :         }
     161           0 :         if( error != EAGAIN || nTries == 0 )
     162             :         {
     163           0 :             LBWARN << "Could not create thread: " << strerror( error )
     164           0 :                    << std::endl;
     165           0 :             return false;
     166             :         }
     167           0 :         sleep( 1 ); // Give EAGAIN some time to recover
     168             :     }
     169             : 
     170             :     // avoid memleak, we don't use pthread_join
     171        3496 :     pthread_detach( _impl->id._impl->pthread );
     172        3496 :     _impl->state.waitNE( STATE_STARTING );
     173        3496 :     return (_impl->state != STATE_STOPPED);
     174             : }
     175             : 
     176        3451 : void Thread::exit()
     177             : {
     178        3451 :     LBASSERTINFO( isCurrent(), "Thread::exit not called from child thread" );
     179        3445 :     LBVERB << "Exiting thread " << className( this ) << std::endl;
     180        3445 :     Log::instance().forceFlush();
     181        3451 :     Log::instance().exit();
     182             : 
     183        3451 :     _impl->state = STATE_STOPPING;
     184        3456 :     pthread_exit( 0 );
     185             :     LBUNREACHABLE;
     186             : }
     187             : 
     188          30 : void Thread::cancel()
     189             : {
     190          30 :     LBASSERTINFO( !isCurrent(), "Thread::cancel called from child thread" );
     191             : 
     192          30 :     LBVERB << "Canceling thread " << className( this ) << std::endl;
     193          30 :     _impl->state = STATE_STOPPING;
     194             : 
     195          30 :     const int error = pthread_cancel( _impl->id._impl->pthread );
     196          30 :     if( error !=  0 )
     197           0 :         LBWARN << "Could not cancel thread: " << strerror( error ) << std::endl;
     198          30 : }
     199             : 
     200        3722 : bool Thread::join()
     201             : {
     202        3722 :     if( _impl->state == STATE_STOPPED )
     203           1 :         return false;
     204        3721 :     if( isCurrent( )) // can't join self
     205         256 :         return false;
     206             : 
     207        3465 :     _impl->state.waitNE( STATE_RUNNING );
     208        3465 :     _impl->state = STATE_STOPPED;
     209             : 
     210        3465 :     LBVERB << "Joined thread " << className( this ) << std::endl;
     211        3465 :     return true;
     212             : }
     213             : 
     214        7197 : bool Thread::isCurrent() const
     215             : {
     216        7197 :     return pthread_equal( pthread_self(), _impl->id._impl->pthread );
     217             : }
     218             : 
     219     2847383 : ThreadID Thread::getSelfThreadID()
     220             : {
     221     2847383 :     ThreadID threadID;
     222     2878202 :     threadID._impl->pthread = pthread_self();
     223     2876876 :     return threadID;
     224             : }
     225             : 
     226   106825392 : void Thread::yield()
     227             : {
     228             : #ifdef _MSC_VER
     229             :     ::Sleep( 0 ); // sleeps thread
     230             :     // or ::SwitchToThread() ? // switches to another waiting thread, if exists
     231             : #elif defined (__APPLE__)
     232             :     ::pthread_yield_np();
     233             : #else
     234   106825392 :     ::sched_yield();
     235             : #endif
     236   123448871 : }
     237             : 
     238        3501 : void Thread::pinCurrentThread()
     239             : {
     240             : #ifdef LB_WIN32_THREAD_AFFINITY
     241             :     static Lock lock;
     242             :     ScopedMutex<> mutex( lock );
     243             : 
     244             :     static DWORD_PTR processMask = 0;
     245             :     static DWORD_PTR processor   = 0;
     246             :     if( processMask == 0 )
     247             :     {
     248             :         // Get available processors
     249             :         DWORD_PTR systemMask;
     250             :         if( GetProcessAffinityMask( GetCurrentProcess(), &processMask,
     251             :             &systemMask ) == 0 )
     252             :         {
     253             :             LBWARN << "Can't get usable processor mask" << std::endl;
     254             :             return;
     255             :         }
     256             :         LBINFO << "Available processors 0x" << hex << processMask << dec <<endl;
     257             : 
     258             :         // Choose random starting processor: Multiple Eq apps on the same node
     259             :         // would otherwise use the same processor for the same thread
     260             :         unsigned nProcessors = 0;
     261             :         for( DWORD_PTR i = 1; i != 0; i <<= 1 )
     262             :         {
     263             :             if( processMask & i )
     264             :                 ++nProcessors;
     265             :         }
     266             :         LBINFO << nProcessors << " available processors" << std::endl;
     267             : 
     268             :         unsigned chance = RNG().get< unsigned >();
     269             :         processor = 1 << (chance % nProcessors);
     270             :         LBINFO << "Starting with processor " << processor << std::endl;
     271             :     }
     272             :     LBASSERT( processMask != 0 );
     273             : 
     274             :     while( true )
     275             :     {
     276             :         processor <<= 1;
     277             :         if( processor == 0 ) // wrap around
     278             :             processor = 1;
     279             : 
     280             :         if( processor & processMask ) // processor is available
     281             :         {
     282             :             if( SetThreadAffinityMask( GetCurrentThread(), processor ) == 0 )
     283             :                 LBWARN << "Can't set thread processor" << std::endl;
     284             :             LBINFO << "Pinned thread to processor 0x" << hex << processor << dec
     285             :                    << std::endl;
     286             :             return;
     287             :         }
     288             :     }
     289             : #endif
     290        3501 : }
     291             : 
     292             : #ifdef _MSC_VER
     293             : #  ifndef MS_VC_EXCEPTION
     294             : #    define MS_VC_EXCEPTION 0x406D1388
     295             : #  endif
     296             : 
     297             : #  pragma pack(push,8)
     298             : typedef struct tagTHREADNAME_INFO
     299             : {
     300             :     DWORD dwType; // Must be 0x1000.
     301             :     LPCSTR szName; // Pointer to name (in user addr space).
     302             :     DWORD dwThreadID; // Thread ID (-1=caller thread).
     303             :     DWORD dwFlags; // Reserved for future use, must be zero.
     304             : } THREADNAME_INFO;
     305             : #  pragma pack(pop)
     306             : static void _setVCName( const char* name )
     307             : {
     308             :     ::Sleep(10);
     309             : 
     310             :     THREADNAME_INFO info;
     311             :     info.dwType = 0x1000;
     312             :     info.szName = name;
     313             :     info.dwThreadID = GetCurrentThreadId();
     314             :     info.dwFlags = 0;
     315             : 
     316             :     __try
     317             :     {
     318             :         RaiseException( MS_VC_EXCEPTION, 0, sizeof(info)/sizeof(ULONG_PTR),
     319             :                         (ULONG_PTR*)&info );
     320             :     }
     321             :     __except(EXCEPTION_EXECUTE_HANDLER)
     322             :     {
     323             :     }
     324             : }
     325             : #endif
     326             : 
     327        3525 : void Thread::setName( const std::string& name )
     328             : {
     329        3525 :     Log::instance().setThreadName( name );
     330             : 
     331             : #ifdef _MSC_VER
     332             : #  ifndef NDEBUG
     333             :     _setVCName( name.c_str( ));
     334             : #  endif
     335             : #elif __MAC_OS_X_VERSION_MIN_REQUIRED >= 1060
     336             :     pthread_setname_np( name.c_str( ));
     337             : #elif defined(__linux__)
     338        3525 :     prctl( PR_SET_NAME, name.c_str(), 0, 0, 0 );
     339             : #else
     340             :     // Not implemented
     341             :     LBVERB << "Thread::setName() not implemented" << std::endl;
     342             : #endif
     343        3525 : }
     344             : 
     345             : #ifdef LUNCHBOX_USE_HWLOC
     346           0 : static hwloc_bitmap_t _getCpuSet( const int32_t affinity,
     347             :                                   hwloc_topology_t topology )
     348             : {
     349           0 :     hwloc_bitmap_t cpuSet = hwloc_bitmap_alloc(); // HWloc CPU set
     350           0 :     hwloc_bitmap_zero( cpuSet ); // Initialize to zeros
     351             : 
     352           0 :     if( affinity >= Thread::CORE )
     353             :     {
     354           0 :         const int32_t coreIndex = affinity - Thread::CORE;
     355           0 :         if( hwloc_get_obj_by_type( topology, HWLOC_OBJ_CORE, coreIndex ) == 0 )
     356             :         {
     357           0 :             LBWARN << "Core " << coreIndex << " does not exist in the topology"
     358           0 :                    << std::endl;
     359           0 :             return cpuSet;
     360             :         }
     361             : 
     362             :         // Getting the core object #coreIndex
     363             :         const hwloc_obj_t coreObj = hwloc_get_obj_by_type( topology,
     364             :                                                            HWLOC_OBJ_CORE,
     365           0 :                                                            coreIndex );
     366             :         // Get the CPU set associated with the specified core
     367           0 :         cpuSet = coreObj->allowed_cpuset;
     368           0 :         return cpuSet;
     369             :     }
     370             : 
     371           0 :     if( affinity == Thread::NONE )
     372           0 :         return cpuSet;
     373             : 
     374             :     // Sets the affinity to a specific CPU or "socket"
     375           0 :     LBASSERT( affinity >= Thread::SOCKET && affinity < Thread::SOCKET_MAX );
     376           0 :     const int32_t socketIndex = affinity - Thread::SOCKET;
     377             : 
     378           0 :     if( hwloc_get_obj_by_type( topology, HWLOC_OBJ_SOCKET, socketIndex ) == 0 )
     379             :     {
     380           0 :         LBWARN << "Socket " << socketIndex << " does not exist in the topology"
     381           0 :                << std::endl;
     382           0 :         return cpuSet;
     383             :     }
     384             : 
     385             :     // Getting the CPU object #cpuIndex (subtree node)
     386             :     const hwloc_obj_t socketObj = hwloc_get_obj_by_type( topology,
     387             :                                                          HWLOC_OBJ_SOCKET,
     388           0 :                                                          socketIndex );
     389             :     // Get the CPU set associated with the specified socket
     390           0 :     hwloc_bitmap_copy( cpuSet, socketObj->allowed_cpuset );
     391           0 :     return cpuSet;
     392             : }
     393             : #endif
     394             : 
     395           0 : void Thread::setAffinity( const int32_t affinity )
     396             : {
     397           0 :     if( affinity == Thread::NONE )
     398           0 :         return;
     399             : 
     400             : #ifdef LUNCHBOX_USE_HWLOC
     401             :     hwloc_topology_t topology;
     402           0 :     hwloc_topology_init( &topology ); // Allocate & initialize the topology
     403           0 :     hwloc_topology_load( topology );  // Perform HW topology detection
     404           0 :     const hwloc_bitmap_t cpuSet = _getCpuSet( affinity, topology );
     405             :     const int result = hwloc_set_cpubind( topology, cpuSet,
     406           0 :                                           HWLOC_CPUBIND_THREAD );
     407             :     char* cpuSetString;
     408           0 :     hwloc_bitmap_asprintf( &cpuSetString, cpuSet );
     409             : 
     410           0 :     if( result == 0 )
     411             :     {
     412           0 :         LBVERB << "Bound to cpu set "  << cpuSetString << std::endl;
     413             :     }
     414             :     else
     415             :     {
     416           0 :         LBWARN << "Error binding to cpu set " << cpuSetString << std::endl;
     417             :     }
     418           0 :     ::free( cpuSetString );
     419           0 :     hwloc_bitmap_free( cpuSet );
     420           0 :     hwloc_topology_destroy( topology );
     421             : 
     422             : #else
     423             :     LBWARN << "Thread::setAffinity not implemented, hwloc library missing"
     424             :            << std::endl;
     425             : #endif
     426             : }
     427             : 
     428           0 : std::ostream& operator << ( std::ostream& os, const Thread::Affinity affinity )
     429             : {
     430           0 :     if( affinity == Thread::NONE )
     431           0 :         return os << "No affinity";
     432           0 :     if( affinity >= Thread::CORE )
     433           0 :         return os << "Core " << affinity - Thread::CORE;
     434             : 
     435           0 :     LBASSERT( affinity >= Thread::SOCKET && affinity < Thread::SOCKET_MAX );
     436           0 :     return os << "Socket " <<  affinity - Thread::SOCKET;
     437             : }
     438             : 
     439             : #if 0
     440             : std::ostream& operator << ( std::ostream& os, const Thread* thread )
     441             : {
     442             :     os << "Thread " << thread->_impl->id << " state "
     443             :        << ( thread->_impl->state == Thread::STATE_STOPPED  ? "stopped"  :
     444             :             thread->_impl->state == Thread::STATE_STARTING ? "starting" :
     445             :             thread->_impl->state == Thread::STATE_RUNNING  ? "running"  :
     446             :             thread->_impl->state == Thread::STATE_STOPPING ? "stopping" :
     447             :             "unknown" );
     448             : 
     449             : #ifdef PTW32_VERSION
     450             :     os << " called from " << pthread_self().p;
     451             : #else
     452             :     os << " called from " << pthread_self();
     453             : #endif
     454             : 
     455             :     return os;
     456             : }
     457             : #endif
     458          90 : }

Generated by: LCOV version 1.10