LCOV - code coverage report
Current view: top level - eq - pipe.cpp (source / functions) Hit Total Coverage
Test: Equalizer Lines: 244 570 42.8 %
Date: 2016-09-29 05:02:09 Functions: 50 91 54.9 %

          Line data    Source code
       1             : 
       2             : /* Copyright (c) 2005-2015, Stefan Eilemann <eile@equalizergraphics.com>
       3             :  *                          Daniel Nachbaur <danielnachbaur@gmail.com>
       4             :  *                          Cedric Stalder <cedric.stalder@gmail.com>
       5             :  *
       6             :  * This library is free software; you can redistribute it and/or modify it under
       7             :  * the terms of the GNU Lesser General Public License version 2.1 as published
       8             :  * by the Free Software Foundation.
       9             :  *
      10             :  * This library is distributed in the hope that it will be useful, but WITHOUT
      11             :  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
      12             :  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
      13             :  * details.
      14             :  *
      15             :  * You should have received a copy of the GNU Lesser General Public License
      16             :  * along with this library; if not, write to the Free Software Foundation, Inc.,
      17             :  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
      18             :  */
      19             : 
      20             : #include "pipe.h"
      21             : 
      22             : // must be included before any header defining Bool
      23             : #ifdef EQ_QT_USED
      24             : #  include "qt/window.h"
      25             : #  include <QThread>
      26             : #endif
      27             : 
      28             : #include "channel.h"
      29             : #include "client.h"
      30             : #include "config.h"
      31             : #include "exception.h"
      32             : #include "frame.h"
      33             : #include "frameData.h"
      34             : #include "global.h"
      35             : #include "log.h"
      36             : #include "node.h"
      37             : #include "nodeFactory.h"
      38             : #include "pipeStatistics.h"
      39             : #include "server.h"
      40             : #include "view.h"
      41             : #include "window.h"
      42             : 
      43             : #include "messagePump.h"
      44             : #include "systemPipe.h"
      45             : 
      46             : #include <eq/fabric/commands.h>
      47             : #include <eq/fabric/elementVisitor.h>
      48             : #include <eq/fabric/leafVisitor.h>
      49             : #include <eq/fabric/task.h>
      50             : 
      51             : #include <co/global.h>
      52             : #include <co/objectICommand.h>
      53             : #include <co/queueSlave.h>
      54             : #include <co/worker.h>
      55             : #include <boost/lexical_cast.hpp>
      56             : #include <sstream>
      57             : 
      58             : #ifdef EQUALIZER_USE_HWLOC_GL
      59             : #  include <hwloc.h>
      60             : #  include <hwloc/gl.h>
      61             : #endif
      62             : 
      63             : #ifdef EQUALIZER_USE_QT5WIDGETS
      64             : #  include <QGuiApplication>
      65             : #  include <QRegularExpression>
      66             : #endif
      67             : 
      68             : namespace eq
      69             : {
      70             : /** @cond IGNORE */
      71             : typedef fabric::Pipe< Node, Pipe, Window, PipeVisitor > Super;
      72             : typedef co::CommandFunc<Pipe> PipeFunc;
      73             : /** @endcond */
      74             : 
      75             : namespace
      76             : {
      77             : enum State
      78             : {
      79             :     STATE_MAPPED,
      80             :     STATE_INITIALIZING,
      81             :     STATE_RUNNING,
      82             :     STATE_STOPPING, // must come after running
      83             :     STATE_STOPPED, // must come after running
      84             :     STATE_FAILED
      85             : };
      86             : 
      87             : typedef stde::hash_map< uint128_t, Frame* > FrameHash;
      88             : typedef stde::hash_map< uint128_t, FrameDataPtr > FrameDataHash;
      89             : typedef stde::hash_map< uint128_t, View* > ViewHash;
      90             : typedef stde::hash_map< uint128_t, co::QueueSlave* > QueueHash;
      91             : typedef FrameHash::const_iterator FrameHashCIter;
      92             : typedef FrameDataHash::const_iterator FrameDataHashCIter;
      93             : typedef ViewHash::const_iterator ViewHashCIter;
      94             : typedef ViewHash::iterator ViewHashIter;
      95             : typedef QueueHash::const_iterator QueueHashCIter;
      96             : }
      97             : 
      98             : namespace detail
      99             : {
     100             : 
     101           2 : class RenderThread : public eq::Worker
     102             : {
     103             : public:
     104           1 :     explicit RenderThread( eq::Pipe* pipe )
     105             :         : eq::Worker( co::Global::getCommandQueueLimit( ))
     106           1 :         , _pipe( pipe )
     107           1 :     {}
     108             : 
     109             : protected:
     110           1 :     bool init() override
     111             :     {
     112           2 :         setName( std::string( "Draw" ) +
     113           2 :               boost::lexical_cast< std::string >( _pipe->getPath().pipeIndex ));
     114           1 :         return true;
     115             :     }
     116             : 
     117             :     void run() override;
     118          18 :     bool stopRunning() override { return !_pipe; }
     119             : 
     120             : private:
     121             :     eq::Pipe* _pipe;
     122             :     friend class eq::Pipe;
     123             : };
     124             : 
     125             : 
     126             : /** Asynchronous, per-pipe readback thread. */
     127           1 : class TransferThread : public co::Worker
     128             : {
     129             : public:
     130          91 :     explicit TransferThread( const uint32_t index )
     131             :         : co::Worker( co::Global::getCommandQueueLimit( ))
     132             :         , _index( index )
     133             :         , _qThread( nullptr )
     134          91 :         , _stop( false )
     135          91 :     {}
     136             : 
     137           0 :     bool init() override
     138             :     {
     139           0 :         if( !co::Worker::init( ))
     140           0 :             return false;
     141           0 :         setName( std::string( "Tfer" ) +
     142           0 :                  boost::lexical_cast< std::string >( _index ));
     143             : #ifdef EQ_QT_USED
     144           0 :         _qThread = QThread::currentThread();
     145             : #endif
     146           0 :         return true;
     147             :     }
     148             : 
     149           0 :     bool stopRunning() override { return _stop; }
     150           0 :     void postStop() { _stop = true; }
     151             : 
     152           0 :     QThread* getQThread() { return _qThread; }
     153             : 
     154             : private:
     155             :     uint32_t _index;
     156             :     QThread* _qThread;
     157             :     bool _stop; // thread will exit if this is true
     158             : };
     159             : 
     160             : class Pipe
     161             : {
     162             : public:
     163          91 :     explicit Pipe( const uint32_t index )
     164             :         : systemPipe( 0 )
     165             : #ifdef AGL
     166             :         , windowSystem( "AGL" )
     167             : #elif GLX
     168             :         , windowSystem( "GLX" )
     169             : #elif WGL
     170             :         , windowSystem( "WGL" )
     171             : #elif EQ_QT_USED
     172             :         , windowSystem( "Qt" )
     173             : #endif
     174             :         , state( STATE_STOPPED )
     175             :         , currentFrame( 0 )
     176             :         , frameTime( 0 )
     177             :         , thread( 0 )
     178          91 :         , transferThread( index )
     179          91 :     {}
     180             : 
     181           1 :     ~Pipe()
     182           1 :     {
     183           1 :         delete thread;
     184           1 :         thread = 0;
     185           1 :     }
     186             : 
     187             :     /** Window-system specific functions class */
     188             :     SystemPipe* systemPipe;
     189             : 
     190             :     /** The current window system. */
     191             :     WindowSystem windowSystem;
     192             : 
     193             :     /** The configInit/configExit state. */
     194             :     lunchbox::Monitor< State > state;
     195             : 
     196             :     /** The last started frame. */
     197             :     uint32_t currentFrame;
     198             : 
     199             :     /** The number of the last finished frame. */
     200             :     lunchbox::Monitor< uint32_t > finishedFrame;
     201             : 
     202             :     /** The number of the last locally unlocked frame. */
     203             :     lunchbox::Monitor< uint32_t > unlockedFrame;
     204             : 
     205             :     /** The running per-frame statistic clocks. */
     206             :     std::deque< int64_t > frameTimes;
     207             :     lunchbox::Lock frameTimeMutex;
     208             : 
     209             :     /** The base time for the currently active frame. */
     210             :     int64_t frameTime;
     211             : 
     212             :     /** All assembly frames used by the pipe during rendering. */
     213             :     FrameHash frames;
     214             : 
     215             :     /** All output frame datas used by the pipe during rendering. */
     216             :     FrameDataHash outputFrameDatas;
     217             : 
     218             :     /** All input frame datas used by the pipe during rendering. */
     219             :     FrameDataHash inputFrameDatas;
     220             : 
     221             :     /** All views used by the pipe's channels during rendering. */
     222             :     ViewHash views;
     223             : 
     224             :     /** All queues used by the pipe's channels during rendering. */
     225             :     QueueHash queues;
     226             : 
     227             :     /** The pipe thread. */
     228             :     RenderThread* thread;
     229             : 
     230             :     detail::TransferThread transferThread;
     231             : };
     232             : 
     233           1 : void RenderThread::run()
     234             : {
     235           1 :     LB_TS_THREAD( _pipe->_pipeThread );
     236           1 :     LBDEBUG << "Entered pipe thread" << std::endl;
     237             : 
     238           1 :     eq::Pipe* pipe = _pipe; // _pipe gets cleared on exit
     239           1 :     pipe->_impl->state.waitEQ( STATE_MAPPED );
     240           1 :     pipe->_impl->windowSystem = pipe->selectWindowSystem();
     241           1 :     pipe->_setupCommandQueue();
     242           1 :     pipe->_setupAffinity();
     243             : 
     244           1 :     Worker::run();
     245             : 
     246           1 :     pipe->_exitCommandQueue();
     247           1 : }
     248             : }
     249             : 
     250          91 : Pipe::Pipe( Node* parent )
     251             :         : Super( parent )
     252          91 :         , _impl( new detail::Pipe( getPath().pipeIndex ))
     253             : {
     254          91 : }
     255             : 
     256           3 : Pipe::~Pipe()
     257             : {
     258           1 :     LBASSERT( getWindows().empty( ));
     259           1 :     delete _impl;
     260           2 : }
     261             : 
     262           9 : Config* Pipe::getConfig()
     263             : {
     264           9 :     Node* node = getNode();
     265           9 :     LBASSERT( node );
     266           9 :     return ( node ? node->getConfig() : 0);
     267             : }
     268             : 
     269           0 : const Config* Pipe::getConfig() const
     270             : {
     271           0 :     const Node* node = getNode();
     272           0 :     LBASSERT( node );
     273           0 :     return ( node ? node->getConfig() : 0);
     274             : }
     275             : 
     276           2 : ClientPtr Pipe::getClient()
     277             : {
     278           2 :     Node* node = getNode();
     279           2 :     LBASSERT( node );
     280           2 :     return ( node ? node->getClient() : 0);
     281             : }
     282             : 
     283           4 : ServerPtr Pipe::getServer()
     284             : {
     285           4 :     Node* node = getNode();
     286           4 :     LBASSERT( node );
     287           4 :     return ( node ? node->getServer() : 0);
     288             : }
     289             : 
     290           1 : void Pipe::attach( const uint128_t& id, const uint32_t instanceID )
     291             : {
     292           1 :     Super::attach( id, instanceID );
     293             : 
     294           1 :     co::CommandQueue* queue = getPipeThreadQueue();
     295           1 :     co::CommandQueue* transferQ = getTransferThreadQueue();
     296             : 
     297             :     registerCommand( fabric::CMD_PIPE_CONFIG_INIT,
     298           1 :                      PipeFunc( this, &Pipe::_cmdConfigInit ), queue );
     299             :     registerCommand( fabric::CMD_PIPE_CONFIG_EXIT,
     300           1 :                      PipeFunc( this, &Pipe::_cmdConfigExit ), queue );
     301             :     registerCommand( fabric::CMD_PIPE_CREATE_WINDOW,
     302           1 :                      PipeFunc( this, &Pipe::_cmdCreateWindow ), queue );
     303             :     registerCommand( fabric::CMD_PIPE_DESTROY_WINDOW,
     304           1 :                      PipeFunc( this, &Pipe::_cmdDestroyWindow ), queue );
     305             :     registerCommand( fabric::CMD_PIPE_FRAME_START,
     306           1 :                      PipeFunc( this, &Pipe::_cmdFrameStart ), queue );
     307             :     registerCommand( fabric::CMD_PIPE_FRAME_FINISH,
     308           1 :                      PipeFunc( this, &Pipe::_cmdFrameFinish ), queue );
     309             :     registerCommand( fabric::CMD_PIPE_FRAME_DRAW_FINISH,
     310           1 :                      PipeFunc( this, &Pipe::_cmdFrameDrawFinish ), queue );
     311             :     registerCommand( fabric::CMD_PIPE_FRAME_START_CLOCK,
     312           1 :                      PipeFunc( this, &Pipe::_cmdFrameStartClock ), 0 );
     313             :     registerCommand( fabric::CMD_PIPE_EXIT_THREAD,
     314           1 :                      PipeFunc( this, &Pipe::_cmdExitThread ), queue );
     315             :     registerCommand( fabric::CMD_PIPE_DETACH_VIEW,
     316           1 :                      PipeFunc( this, &Pipe::_cmdDetachView ), queue );
     317             :     registerCommand( fabric::CMD_PIPE_EXIT_TRANSFER_THREAD,
     318             :                      PipeFunc( this, &Pipe::_cmdExitTransferThread ),
     319           1 :                      transferQ );
     320           1 : }
     321             : 
     322        1084 : void Pipe::setDirty( const uint64_t bits )
     323             : {
     324             :     // jump over fabric setDirty to avoid dirty'ing node pipes list
     325             :     // pipes are individually synced in frame finish for thread-safety
     326        1084 :     Object::setDirty( bits );
     327        1084 : }
     328             : 
     329           0 : bool Pipe::isWindowSystemAvailable( const std::string& name ) const
     330             : {
     331           0 :     bool available = false;
     332           0 :     if( name != "Qt" )
     333             :     {
     334             : #ifdef AGL
     335             :         available = name == "AGL";
     336             : #elif GLX
     337           0 :         available = name == "GLX";
     338             : #elif WGL
     339             :         available = name == "WGL";
     340             : #endif
     341             :     }
     342             : 
     343             : #ifdef EQ_QT_USED
     344           0 :     if( name != "Qt" )
     345           0 :         return available;
     346             : 
     347             :     // Qt can only use ports that refer to QScreens available to the
     348             :     // QApplication. In Windows and Mac all physical displays are
     349             :     // queriable using platform dependent functions (no idea about virtual
     350             :     // displays like Xming or XQuartz), so we will assume that if Equalizer
     351             :     // was built with Qt support, then all devices can be used by Qt.
     352             :     // (How to choose the right screen for a given device it's a different
     353             :     // story)
     354             : #  ifdef AGL
     355             :     available = true;
     356             : #  elif WGL
     357             :     available = true;
     358             : #  else
     359             :     // For X it's different. Qt can only use screens that are part of the
     360             :     // display server referred by the DISPLAY environmental variable (in
     361             :     // particular its value at the moment of the creation of the QApplication)
     362             : #    ifdef __APPLE__
     363             :     // In MAC this is simpler, because there can only be one display server.
     364             :     return true;
     365             : #    else
     366             :     // There's no way to infer the X server number from the Qt API except
     367             :     // QScreen names, but only under certain conditions. In regular desktop
     368             :     // usage QScreen::name gives a name related to what xrandr prints, so it's
     369             :     // not usable. The names seems to follow the X naming convention
     370             :     // (e.g. host:0.0) when XRANDR is not available (e.g. Xnest) or doesn't
     371             :     // provide information about where the displays are connected (e.g.
     372             :     // headless or Xvnc). These are corner cases so we cannot rely on QScreen.
     373             :     // Therefore, we will infer which port is Qt using directly from DISPLAY.
     374             : 
     375             :     // Is the port is undefined that means that the default server must
     376             :     // be used. This should match the Qt display server.
     377           0 :     if( getPort() == LB_UNDEFINED_UINT32 )
     378           0 :         return true;
     379             : 
     380             :     QGuiApplication* app =
     381           0 :         dynamic_cast< QGuiApplication* >( QCoreApplication::instance( ));
     382           0 :     if( !app || !app->primaryScreen() )
     383           0 :         return false; // Qt won't be able to access anything anyway
     384             : 
     385           0 :     QRegularExpression regex( "^[a-z]*\\:([0-9]+)(\\.[0-9]+)?$" );
     386             :     QRegularExpressionMatch match =
     387           0 :         regex.match( getenv( "DISPLAY" ) ? getenv( "DISPLAY" ) : "" );
     388           0 :     available = match.captured( 1 ) == QString::number( getPort(), 10 );
     389             : #    endif
     390             : #  endif
     391             : #endif
     392             : 
     393           0 :     return available;
     394             : }
     395             : 
     396           1 : WindowSystem Pipe::selectWindowSystem() const
     397             : {
     398             : #ifdef AGL
     399             :     return WindowSystem( "AGL" );
     400             : #elif GLX
     401           1 :     return WindowSystem( "GLX" );
     402             : #elif WGL
     403             :     return WindowSystem( "WGL" );
     404             : #elif EQ_QT_USED
     405             :     if( !isWindowSystemAvailable( "Qt" ))
     406             :     {
     407             :         // Throwing because there's no reasonable alternative.
     408             :         std::stringstream msg;
     409             :         msg << "Cannot choose windowing system for pipe at port " << getPort()
     410             :             << ". Qt was set as the default, but it cannot be used. In X"
     411             :                " based systems this means that the value of DISPLAY taken by"
     412             :                " Qt refers to a different display server." << std::endl;
     413             :         LBTHROW( std::runtime_error( msg.str( )));
     414             :     }
     415             :     return WindowSystem( "Qt" );
     416             : #endif
     417             : }
     418             : 
     419           1 : void Pipe::_setupCommandQueue()
     420             : {
     421           3 :     LBDEBUG << "Set up pipe message pump for " << _impl->windowSystem
     422           3 :            << std::endl;
     423             : 
     424           1 :     Config* config = getConfig();
     425           1 :     config->setupMessagePump( this );
     426             : 
     427           1 :     if( !_impl->thread ) // Non-threaded pipes have no pipe thread message pump
     428           1 :         return;
     429             : 
     430           1 :     CommandQueue* queue = _impl->thread->getWorkerQueue();
     431           1 :     LBASSERT( queue );
     432           1 :     LBASSERT( !queue->getMessagePump( ));
     433             : 
     434           1 :     Global::enterCarbon();
     435           1 :     MessagePump* pump = createMessagePump();
     436           1 :     if( pump )
     437           1 :         pump->dispatchAll(); // initializes _impl->receiverQueue
     438             : 
     439           1 :     queue->setMessagePump( pump );
     440           1 :     Global::leaveCarbon();
     441             : }
     442             : 
     443           1 : int32_t Pipe::_getAutoAffinity() const
     444             : {
     445             : #ifdef EQUALIZER_USE_HWLOC_GL
     446             :     uint32_t port = getPort();
     447             :     uint32_t device = getDevice();
     448             : 
     449             :     if( port == LB_UNDEFINED_UINT32 && device == LB_UNDEFINED_UINT32 )
     450             :         return lunchbox::Thread::NONE;
     451             : 
     452             :     if( port == LB_UNDEFINED_UINT32 )
     453             :         port = 0;
     454             :     if( device == LB_UNDEFINED_UINT32 )
     455             :         device = 0;
     456             : 
     457             :     hwloc_topology_t topology;
     458             :     if( hwloc_topology_init( &topology ) < 0 )
     459             :     {
     460             :         LBINFO << "Automatic pipe thread placement failed: "
     461             :                << "hwloc_topology_init() failed" << std::endl;
     462             :         return lunchbox::Thread::NONE;
     463             :     }
     464             : 
     465             :     // Load I/O devices, bridges and their relevant info
     466             :     const unsigned long loading_flags = HWLOC_TOPOLOGY_FLAG_IO_BRIDGES |
     467             :                                         HWLOC_TOPOLOGY_FLAG_IO_DEVICES;
     468             :     if( hwloc_topology_set_flags( topology, loading_flags ) < 0 )
     469             :     {
     470             :         LBINFO << "Automatic pipe thread placement failed: "
     471             :                << "hwloc_topology_set_flags() failed" << std::endl;
     472             :         hwloc_topology_destroy( topology );
     473             :         return lunchbox::Thread::NONE;
     474             :     }
     475             : 
     476             :     if( hwloc_topology_load( topology ) < 0 )
     477             :     {
     478             :         LBINFO << "Automatic pipe thread placement failed: "
     479             :                << "hwloc_topology_load() failed" << std::endl;
     480             :         hwloc_topology_destroy( topology );
     481             :         return lunchbox::Thread::NONE;
     482             :     }
     483             : 
     484             :     const hwloc_obj_t osdev =
     485             :         hwloc_gl_get_display_osdev_by_port_device( topology,
     486             :                                                    int( port ), int( device ));
     487             :     if( !osdev )
     488             :     {
     489             :         LBINFO << "Automatic pipe thread placement failed: GPU not found"
     490             :                << std::endl;
     491             :         hwloc_topology_destroy( topology );
     492             :         return lunchbox::Thread::NONE;
     493             :     }
     494             : 
     495             :     const hwloc_obj_t pcidev = osdev->parent;
     496             :     const hwloc_obj_t parent = hwloc_get_non_io_ancestor_obj( topology, pcidev);
     497             :     const int numCpus =
     498             :         hwloc_get_nbobjs_inside_cpuset_by_type( topology, parent->cpuset,
     499             :                                                 HWLOC_OBJ_SOCKET );
     500             :     if( numCpus != 1 )
     501             :     {
     502             :         LBINFO << "Automatic pipe thread placement failed: GPU attached to "
     503             :                << numCpus << " processors?" << std::endl;
     504             :         hwloc_topology_destroy( topology );
     505             :         return lunchbox::Thread::NONE;
     506             :     }
     507             : 
     508             :     const hwloc_obj_t cpuObj =
     509             :         hwloc_get_obj_inside_cpuset_by_type( topology, parent->cpuset,
     510             :                                              HWLOC_OBJ_SOCKET, 0 );
     511             :     if( cpuObj == 0 )
     512             :     {
     513             :         LBINFO << "Automatic pipe thread placement failed: "
     514             :                << "hwloc_get_obj_inside_cpuset_by_type() failed" << std::endl;
     515             :         hwloc_topology_destroy( topology );
     516             :         return lunchbox::Thread::NONE;
     517             :     }
     518             : 
     519             :     const int cpuIndex = cpuObj->logical_index;
     520             :     hwloc_topology_destroy( topology );
     521             :     return cpuIndex + lunchbox::Thread::SOCKET;
     522             : #else
     523           3 :     LBDEBUG << "Automatic thread placement not supported, no hwloc GL support"
     524           3 :             << std::endl;
     525             : #endif
     526           1 :     return lunchbox::Thread::NONE;
     527             : }
     528             : 
     529           1 : void Pipe::_setupAffinity()
     530             : {
     531           1 :     const int32_t affinity = getIAttribute( IATTR_HINT_AFFINITY );
     532           1 :     switch( affinity )
     533             :     {
     534             :         case AUTO:
     535           1 :             lunchbox::Thread::setAffinity( _getAutoAffinity( ));
     536           1 :             break;
     537             : 
     538             :         case OFF:
     539             :         default:
     540           0 :             lunchbox::Thread::setAffinity( affinity );
     541           0 :             break;
     542             :     }
     543           1 : }
     544             : 
     545           1 : void Pipe::_exitCommandQueue()
     546             : {
     547             :     // Non-threaded pipes have no pipe thread message pump
     548           1 :     if( !_impl->thread )
     549           1 :         return;
     550             : 
     551           1 :     CommandQueue* queue = _impl->thread->getWorkerQueue();
     552           1 :     LBASSERT( queue );
     553             : 
     554           1 :     MessagePump* pump = queue->getMessagePump();
     555           1 :     queue->setMessagePump( 0 );
     556           1 :     delete pump;
     557             : }
     558             : 
     559           1 : MessagePump* Pipe::createMessagePump()
     560             : {
     561           1 :     return _impl->windowSystem.createMessagePump();
     562             : }
     563             : 
     564           0 : MessagePump* Pipe::getMessagePump()
     565             : {
     566           0 :     LB_TS_THREAD( _pipeThread );
     567           0 :     if( !_impl->thread )
     568           0 :         return 0;
     569             : 
     570           0 :     CommandQueue* queue = _impl->thread->getWorkerQueue();
     571           0 :     return queue->getMessagePump();
     572             : }
     573             : 
     574           3 : co::CommandQueue* Pipe::getPipeThreadQueue()
     575             : {
     576           3 :     if( _impl->thread )
     577           3 :         return _impl->thread->getWorkerQueue();
     578             : 
     579           0 :     return getNode()->getMainThreadQueue();
     580             : }
     581             : 
     582           2 : co::CommandQueue* Pipe::getTransferThreadQueue()
     583             : {
     584           2 :     return _impl->transferThread.getWorkerQueue();
     585             : }
     586             : 
     587           1 : co::CommandQueue* Pipe::getMainThreadQueue()
     588             : {
     589           1 :     return getServer()->getMainThreadQueue();
     590             : }
     591             : 
     592           1 : co::CommandQueue* Pipe::getCommandThreadQueue()
     593             : {
     594           1 :     return getServer()->getCommandThreadQueue();
     595             : }
     596             : 
     597           0 : Frame* Pipe::getFrame( const co::ObjectVersion& frameVersion, const Eye eye,
     598             :                        const bool isOutput )
     599             : {
     600           0 :     LB_TS_THREAD( _pipeThread );
     601           0 :     Frame* frame = _impl->frames[ frameVersion.identifier ];
     602             : 
     603           0 :     if( !frame )
     604             :     {
     605           0 :         ClientPtr client = getClient();
     606           0 :         frame = new Frame();
     607             : 
     608           0 :         LBCHECK( client->mapObject( frame, frameVersion ));
     609           0 :         _impl->frames[ frameVersion.identifier ] = frame;
     610             :     }
     611             :     else
     612           0 :         frame->sync( frameVersion.version );
     613             : 
     614           0 :     const co::ObjectVersion& dataVersion = frame->getDataVersion( eye );
     615           0 :     LBLOG( LOG_ASSEMBLY ) << "Use " << dataVersion << std::endl;
     616             : 
     617           0 :     FrameDataPtr frameData = getNode()->getFrameData( dataVersion );
     618           0 :     LBASSERT( frameData );
     619             : 
     620           0 :     if( isOutput )
     621             :     {
     622           0 :         if( !frameData->isAttached() )
     623             :         {
     624           0 :             ClientPtr client = getClient();
     625           0 :             LBCHECK( client->mapObject( frameData.get(), dataVersion ));
     626             :         }
     627           0 :         else if( frameData->getVersion() < dataVersion.version )
     628           0 :             frameData->sync( dataVersion.version );
     629             : 
     630           0 :         _impl->outputFrameDatas[ dataVersion.identifier ] = frameData;
     631             :     }
     632             :     else
     633           0 :         _impl->inputFrameDatas[ dataVersion.identifier ] = frameData;
     634             : 
     635           0 :     frame->setFrameData( frameData );
     636           0 :     return frame;
     637             : }
     638             : 
     639           0 : void Pipe::flushFrames( util::ObjectManager& om )
     640             : {
     641           0 :     LB_TS_THREAD( _pipeThread );
     642           0 :     ClientPtr client = getClient();
     643           0 :     for( FrameHashCIter i = _impl->frames.begin(); i !=_impl->frames.end(); ++i)
     644             :     {
     645           0 :         Frame* frame = i->second;
     646           0 :         frame->setFrameData( 0 ); // datas are flushed below
     647           0 :         client->unmapObject( frame );
     648           0 :         delete frame;
     649             :     }
     650           0 :     _impl->frames.clear();
     651             : 
     652           0 :     for( FrameDataHashCIter i = _impl->inputFrameDatas.begin();
     653           0 :          i != _impl->inputFrameDatas.end(); ++i )
     654             :     {
     655           0 :         FrameDataPtr data = i->second;
     656           0 :         data->deleteGLObjects( om );
     657           0 :     }
     658           0 :     _impl->inputFrameDatas.clear();
     659             : 
     660           0 :     for( FrameDataHashCIter i = _impl->outputFrameDatas.begin();
     661           0 :          i != _impl->outputFrameDatas.end(); ++i )
     662             :     {
     663           0 :         FrameDataPtr data = i->second;
     664           0 :         data->resetPlugins();
     665           0 :         data->deleteGLObjects( om );
     666           0 :         client->unmapObject( data.get( ));
     667           0 :         getNode()->releaseFrameData( data );
     668           0 :     }
     669           0 :     _impl->outputFrameDatas.clear();
     670           0 : }
     671             : 
     672           0 : co::QueueSlave* Pipe::getQueue( const uint128_t& queueID )
     673             : {
     674           0 :     LB_TS_THREAD( _pipeThread );
     675           0 :     if( queueID == 0 )
     676           0 :         return 0;
     677             : 
     678           0 :     co::QueueSlave* queue = _impl->queues[ queueID ];
     679           0 :     if( !queue )
     680             :     {
     681           0 :         queue = new co::QueueSlave;
     682           0 :         ClientPtr client = getClient();
     683           0 :         LBCHECK( client->mapObject( queue, queueID ));
     684             : 
     685           0 :         _impl->queues[ queueID ] = queue;
     686             :     }
     687             : 
     688           0 :     return queue;
     689             : }
     690             : 
     691           1 : void Pipe::_flushQueues()
     692             : {
     693           1 :     LB_TS_THREAD( _pipeThread );
     694           1 :     ClientPtr client = getClient();
     695             : 
     696           1 :     for( QueueHashCIter i = _impl->queues.begin(); i !=_impl->queues.end(); ++i)
     697             :     {
     698           0 :         co::QueueSlave* queue = i->second;
     699           0 :         client->unmapObject( queue );
     700           0 :         delete queue;
     701             :     }
     702           1 :     _impl->queues.clear();
     703           1 : }
     704             : 
     705           0 : const View* Pipe::getView( const co::ObjectVersion& viewVersion ) const
     706             : {
     707             :     // Yie-ha: we want to have a const-interface to get a view on the render
     708             :     //         clients, but view mapping is by definition non-const.
     709           0 :     return const_cast< Pipe* >( this )->getView( viewVersion );
     710             : }
     711             : 
     712           0 : View* Pipe::getView( const co::ObjectVersion& viewVersion )
     713             : {
     714           0 :     LB_TS_THREAD( _pipeThread );
     715           0 :     if( viewVersion.identifier == 0 )
     716           0 :         return 0;
     717             : 
     718           0 :     View* view = _impl->views[ viewVersion.identifier ];
     719           0 :     if( !view )
     720             :     {
     721           0 :         NodeFactory* nodeFactory = Global::getNodeFactory();
     722           0 :         view = nodeFactory->createView( 0 );
     723           0 :         LBASSERT( view );
     724           0 :         view->_pipe = this;
     725           0 :         ClientPtr client = getClient();
     726           0 :         LBCHECK( client->mapObject( view, viewVersion ));
     727             : 
     728           0 :         _impl->views[ viewVersion.identifier ] = view;
     729             :     }
     730             : 
     731           0 :     view->sync( viewVersion.version );
     732           0 :     return view;
     733             : }
     734             : 
     735           0 : void Pipe::_releaseViews()
     736             : {
     737           0 :     LB_TS_THREAD( _pipeThread );
     738           0 :     for( bool changed = true; changed; )
     739             :     {
     740           0 :         changed = false;
     741           0 :         for( ViewHashIter i =_impl->views.begin(); i !=_impl->views.end(); ++i )
     742             :         {
     743           0 :             View* view = i->second;
     744           0 :             view->commit();
     745           0 :             if( view->getVersion() + 20 > view->getHeadVersion( ))
     746           0 :                 continue;
     747             : 
     748             :             // release unused view to avoid memory leaks due to deltas piling up
     749           0 :             view->_pipe = 0;
     750             : 
     751           0 :             ClientPtr client = getClient();
     752           0 :             client->unmapObject( view );
     753           0 :             _impl->views.erase( i );
     754             : 
     755           0 :             NodeFactory* nodeFactory = Global::getNodeFactory();
     756           0 :             nodeFactory->releaseView( view );
     757             : 
     758           0 :             changed = true;
     759           0 :             break;
     760           0 :         }
     761             :     }
     762           0 : }
     763             : 
     764           1 : void Pipe::_flushViews()
     765             : {
     766           1 :     LB_TS_THREAD( _pipeThread );
     767           1 :     NodeFactory* nodeFactory = Global::getNodeFactory();
     768           1 :     ClientPtr client = getClient();
     769             : 
     770           1 :     for( ViewHashCIter i = _impl->views.begin(); i != _impl->views.end(); ++i )
     771             :     {
     772           0 :         View* view = i->second;
     773             : 
     774           0 :         client->unmapObject( view );
     775           0 :         view->_pipe = 0;
     776           0 :         nodeFactory->releaseView( view );
     777             :     }
     778           1 :     _impl->views.clear();
     779           1 : }
     780             : 
     781           1 : void Pipe::startThread()
     782             : {
     783           1 :     _impl->thread = new detail::RenderThread( this );
     784           1 :     _impl->thread->start();
     785           1 : }
     786             : 
     787           1 : void Pipe::exitThread()
     788             : {
     789           1 :     _stopTransferThread();
     790             : 
     791           1 :     if( !_impl->thread )
     792           1 :         return;
     793             : 
     794           1 :     send( getLocalNode(), fabric::CMD_PIPE_EXIT_THREAD );
     795             : 
     796           1 :     _impl->thread->join();
     797           1 :     delete _impl->thread;
     798           1 :     _impl->thread = 0;
     799             : }
     800             : 
     801          90 : void Pipe::cancelThread()
     802             : {
     803          90 :     _stopTransferThread();
     804             : 
     805          90 :     if( !_impl->thread )
     806         180 :         return;
     807             : 
     808             :     // local command dispatching
     809             :     co::ObjectOCommand( this, getLocalNode(), fabric::CMD_PIPE_EXIT_THREAD,
     810           0 :                         co::COMMANDTYPE_OBJECT, getID(), CO_INSTANCE_ALL );
     811             : }
     812             : 
     813           0 : void Pipe::waitExited() const
     814             : {
     815           0 :     _impl->state.waitGE( STATE_STOPPED );
     816           0 : }
     817             : 
     818           1 : bool Pipe::isRunning() const
     819             : {
     820           1 :     return (_impl->state == STATE_RUNNING);
     821             : }
     822             : 
     823           1 : bool Pipe::isStopped() const
     824             : {
     825           1 :     return (_impl->state == STATE_STOPPED);
     826             : }
     827             : 
     828           1 : void Pipe::notifyMapped()
     829             : {
     830           1 :     LBASSERT( _impl->state == STATE_STOPPED );
     831           1 :     _impl->state = STATE_MAPPED;
     832           1 : }
     833             : 
     834             : namespace
     835             : {
     836           0 : class WaitFinishedVisitor : public PipeVisitor
     837             : {
     838             : public:
     839           0 :     WaitFinishedVisitor( const uint32_t frame, MessagePump* pump )
     840           0 :         : _frame( frame ), _pump( pump ) {}
     841             : 
     842           0 :     virtual VisitorResult visit( Channel* channel )
     843             :     {
     844           0 :         while( !channel->waitFrameFinished( _frame, 100 ))
     845             :         {
     846             :             // process potential pending Qt slots
     847           0 :             if( _pump )
     848           0 :                 _pump->dispatchAll();
     849             :         }
     850             : 
     851           0 :         return TRAVERSE_CONTINUE;
     852             :     }
     853             : 
     854             : private:
     855             :     const uint32_t _frame;
     856             :     MessagePump* _pump;
     857             : };
     858             : }
     859             : 
     860           0 : void Pipe::waitFrameFinished( const uint32_t frameNumber )
     861             : {
     862           0 :     MessagePump* pump = getConfig()->getMessagePump();
     863           0 :     while( !_impl->finishedFrame.timedWaitGE( frameNumber, 100 ))
     864             :     {
     865             :         // process potential pending Qt slots
     866           0 :         if( pump )
     867           0 :             pump->dispatchAll();
     868             :     }
     869             : 
     870           0 :     WaitFinishedVisitor waiter( frameNumber, pump );
     871           0 :     accept( waiter );
     872           0 : }
     873             : 
     874           0 : void Pipe::waitFrameLocal( const uint32_t frameNumber ) const
     875             : {
     876           0 :     _impl->unlockedFrame.waitGE( frameNumber );
     877           0 : }
     878             : 
     879           0 : uint32_t Pipe::getCurrentFrame() const
     880             : {
     881           0 :     LB_TS_THREAD( _pipeThread );
     882           0 :     return _impl->currentFrame;
     883             : }
     884             : 
     885           0 : uint32_t Pipe::getFinishedFrame() const
     886             : {
     887           0 :     return _impl->finishedFrame.get();
     888             : }
     889             : 
     890           1 : WindowSystem Pipe::getWindowSystem() const
     891             : {
     892           1 :     return _impl->windowSystem;
     893             : }
     894             : 
     895           1 : EventOCommand Pipe::sendError( const uint32_t error )
     896             : {
     897           1 :     return getConfig()->sendError( Event::PIPE_ERROR, Error( error, getID( )));
     898             : }
     899             : 
     900           0 : bool Pipe::processEvent( const Event& event )
     901             : {
     902           0 :     ConfigEvent configEvent( event );
     903           0 :     getConfig()->sendEvent( configEvent );
     904           0 :     return true;
     905             : }
     906             : 
     907             : //---------------------------------------------------------------------------
     908             : // pipe-thread methods
     909             : //---------------------------------------------------------------------------
     910           1 : bool Pipe::configInit( const uint128_t& initID )
     911             : {
     912           1 :     LB_TS_THREAD( _pipeThread );
     913           1 :     LBASSERT( !_impl->systemPipe );
     914           1 :     return configInitSystemPipe( initID );
     915             : }
     916             : 
     917           1 : bool Pipe::configInitSystemPipe( const uint128_t& )
     918             : {
     919           1 :     SystemPipe* systemPipe = _impl->windowSystem.createPipe( this );
     920           1 :     LBASSERT( systemPipe );
     921             : 
     922           1 :     if( !systemPipe->configInit( ))
     923             :     {
     924           1 :         LBERROR << "System pipe context initialization failed" << std::endl;
     925           1 :         delete systemPipe;
     926           1 :         return false;
     927             :     }
     928             : 
     929           0 :     setSystemPipe( systemPipe );
     930           0 :     return true;
     931             : }
     932             : 
     933           1 : bool Pipe::configExit()
     934             : {
     935           1 :     LB_TS_THREAD( _pipeThread );
     936             : 
     937           1 :     if( _impl->systemPipe )
     938             :     {
     939           0 :         _impl->systemPipe->configExit( );
     940           0 :         delete _impl->systemPipe;
     941           0 :         _impl->systemPipe = 0;
     942             :     }
     943           1 :     return true;
     944             : }
     945             : 
     946             : 
     947           0 : void Pipe::frameStart( const uint128_t&, const uint32_t frameNumber )
     948             : {
     949           0 :     LB_TS_THREAD( _pipeThread );
     950             : 
     951           0 :     const Node* node = getNode();
     952           0 :     switch( node->getIAttribute( Node::IATTR_THREAD_MODEL ))
     953             :     {
     954             :         case ASYNC:      // No sync, release immediately
     955           0 :             releaseFrameLocal( frameNumber );
     956           0 :             break;
     957             : 
     958             :         case DRAW_SYNC:  // Sync, release in frameDrawFinish
     959             :         case LOCAL_SYNC: // Sync, release in frameFinish
     960           0 :             node->waitFrameStarted( frameNumber );
     961           0 :             break;
     962             : 
     963             :         default:
     964           0 :             LBUNIMPLEMENTED;
     965             :     }
     966             : 
     967           0 :     startFrame( frameNumber );
     968           0 : }
     969             : 
     970           0 : void Pipe::frameDrawFinish( const uint128_t&, const uint32_t frameNumber )
     971             : {
     972           0 :     const Node* node = getNode();
     973           0 :     switch( node->getIAttribute( Node::IATTR_THREAD_MODEL ))
     974             :     {
     975             :         case ASYNC:      // released in frameStart
     976           0 :             break;
     977             : 
     978             :         case DRAW_SYNC:  // release
     979           0 :             releaseFrameLocal( frameNumber );
     980           0 :             break;
     981             : 
     982             :         case LOCAL_SYNC: // release in frameFinish
     983           0 :             break;
     984             : 
     985             :         default:
     986           0 :             LBUNIMPLEMENTED;
     987             :     }
     988           0 : }
     989             : 
     990           0 : void Pipe::frameFinish( const uint128_t&, const uint32_t frameNumber )
     991             : {
     992           0 :     const Node* node = getNode();
     993           0 :     switch( node->getIAttribute( Node::IATTR_THREAD_MODEL ))
     994             :     {
     995             :         case ASYNC:      // released in frameStart
     996           0 :             break;
     997             : 
     998             :         case DRAW_SYNC:  // released in frameDrawFinish
     999           0 :             break;
    1000             : 
    1001             :         case LOCAL_SYNC: // release
    1002           0 :             releaseFrameLocal( frameNumber );
    1003           0 :             break;
    1004             : 
    1005             :         default:
    1006           0 :             LBUNIMPLEMENTED;
    1007             :     }
    1008             : 
    1009             :     // Global release
    1010           0 :     releaseFrame( frameNumber );
    1011           0 : }
    1012             : 
    1013           0 : void Pipe::startFrame( const uint32_t frameNumber )
    1014             : {
    1015           0 :     LB_TS_THREAD( _pipeThread );
    1016           0 :     _impl->currentFrame = frameNumber;
    1017           0 :     LBLOG( LOG_TASKS ) << "---- Started Frame ---- "<< frameNumber << std::endl;
    1018           0 : }
    1019             : 
    1020           0 : void Pipe::releaseFrame( const uint32_t frameNumber )
    1021             : {
    1022           0 :     LB_TS_THREAD( _pipeThread );
    1023           0 :     _impl->finishedFrame = frameNumber;
    1024           0 :     LBLOG( LOG_TASKS ) << "---- Finished Frame --- "<< frameNumber << std::endl;
    1025           0 : }
    1026             : 
    1027           0 : void Pipe::releaseFrameLocal( const uint32_t frameNumber )
    1028             : {
    1029           0 :     LB_TS_THREAD( _pipeThread );
    1030           0 :     LBASSERTINFO( _impl->unlockedFrame + 1 == frameNumber,
    1031             :                   _impl->unlockedFrame << ", " << frameNumber );
    1032             : 
    1033           0 :     _impl->unlockedFrame = frameNumber;
    1034           0 :     LBLOG( LOG_TASKS ) << "---- Unlocked Frame --- "
    1035           0 :                        << _impl->unlockedFrame.get() << std::endl;
    1036           0 : }
    1037             : 
    1038           0 : bool Pipe::startTransferThread()
    1039             : {
    1040           0 :     if( _impl->transferThread.isRunning( ))
    1041           0 :         return true;
    1042             : 
    1043           0 :     return _impl->transferThread.start();
    1044             : }
    1045             : 
    1046           0 : QThread* Pipe::getTransferQThread()
    1047             : {
    1048           0 :     return _impl->transferThread.getQThread();
    1049             : }
    1050             : 
    1051           1 : bool Pipe::hasTransferThread() const
    1052             : {
    1053           1 :     return _impl->transferThread.isRunning();
    1054             : }
    1055             : 
    1056          91 : void Pipe::_stopTransferThread()
    1057             : {
    1058          91 :     if( _impl->transferThread.isStopped( ))
    1059         182 :         return;
    1060             : 
    1061           0 :     send( getLocalNode(), fabric::CMD_PIPE_EXIT_TRANSFER_THREAD );
    1062           0 :     _impl->transferThread.join();
    1063             : }
    1064             : 
    1065           0 : void Pipe::setSystemPipe( SystemPipe* pipe )
    1066             : {
    1067           0 :     _impl->systemPipe = pipe;
    1068           0 : }
    1069             : 
    1070           0 : SystemPipe* Pipe::getSystemPipe()
    1071             : {
    1072           0 :     return _impl->systemPipe;
    1073             : }
    1074             : 
    1075           0 : const SystemPipe* Pipe::getSystemPipe() const
    1076             : {
    1077           0 :     return _impl->systemPipe;
    1078             : }
    1079             : 
    1080             : //---------------------------------------------------------------------------
    1081             : // command handlers
    1082             : //---------------------------------------------------------------------------
    1083           1 : bool Pipe::_cmdCreateWindow( co::ICommand& cmd )
    1084             : {
    1085           1 :     co::ObjectICommand command( cmd );
    1086           1 :     const uint128_t& windowID = command.read< uint128_t >();
    1087             : 
    1088           1 :     LBLOG( LOG_INIT ) << "Create window " << command << " id " << windowID
    1089           1 :                       << std::endl;
    1090             : 
    1091           1 :     Window* window = Global::getNodeFactory()->createWindow( this );
    1092           1 :     window->init(); // not in ctor, virtual method
    1093             : 
    1094           1 :     Config* config = getConfig();
    1095           1 :     LBCHECK( config->mapObject( window, windowID ));
    1096             : 
    1097           1 :     return true;
    1098             : }
    1099             : 
    1100           1 : bool Pipe::_cmdDestroyWindow( co::ICommand& cmd )
    1101             : {
    1102           1 :     co::ObjectICommand command( cmd );
    1103             : 
    1104           1 :     LBLOG( LOG_INIT ) << "Destroy window " << command << std::endl;
    1105             : 
    1106           1 :     Window* window = _findWindow( command.read< uint128_t >( ));
    1107           1 :     LBASSERT( window );
    1108             : 
    1109             :     // re-set shared windows accordingly
    1110           1 :     Window* newSharedWindow = 0;
    1111           1 :     const Windows& windows = getWindows();
    1112           2 :     for( Windows::const_iterator i = windows.begin(); i != windows.end(); ++i )
    1113             :     {
    1114           1 :         Window* candidate = *i;
    1115             : 
    1116           1 :         if( candidate == window )
    1117           1 :             continue; // ignore
    1118             : 
    1119           0 :         if( candidate->getSharedContextWindow() == window )
    1120             :         {
    1121           0 :             if( newSharedWindow )
    1122           0 :                 candidate->setSharedContextWindow( newSharedWindow );
    1123             :             else
    1124             :             {
    1125           0 :                 newSharedWindow = candidate;
    1126           0 :                 newSharedWindow->setSharedContextWindow( candidate );
    1127             :             }
    1128             :         }
    1129             : 
    1130           0 :         LBASSERT( candidate->getSharedContextWindow() != window );
    1131             :     }
    1132             : 
    1133           1 :     const bool stopped = window->isStopped();
    1134             :     window->send( getServer(),
    1135           1 :                   fabric::CMD_WINDOW_CONFIG_EXIT_REPLY ) << stopped;
    1136             : 
    1137           1 :     Config* config = getConfig();
    1138           1 :     config->unmapObject( window );
    1139           1 :     Global::getNodeFactory()->releaseWindow( window );
    1140             : 
    1141           1 :     return true;
    1142             : }
    1143             : 
    1144           1 : bool Pipe::_cmdConfigInit( co::ICommand& cmd )
    1145             : {
    1146           1 :     LB_TS_THREAD( _pipeThread );
    1147             : 
    1148           1 :     co::ObjectICommand command( cmd );
    1149           1 :     const uint128_t& initID = command.read< uint128_t >();
    1150           1 :     const uint32_t frameNumber = command.read< uint32_t >();
    1151             : 
    1152           1 :     LBLOG( LOG_INIT ) << "Init pipe " << command << " init id " << initID
    1153           1 :                       << " frame " << frameNumber << std::endl;
    1154             : 
    1155           1 :     if( !isThreaded( ))
    1156             :     {
    1157           0 :         _impl->windowSystem = selectWindowSystem();
    1158           0 :         _setupCommandQueue();
    1159             :     }
    1160             : 
    1161           1 :     Node* node = getNode();
    1162           1 :     LBASSERT( node );
    1163           1 :     node->waitInitialized();
    1164             : 
    1165           1 :     bool result = false;
    1166           1 :     if( node->isRunning( ))
    1167             :     {
    1168           1 :         _impl->currentFrame  = frameNumber;
    1169           1 :         _impl->finishedFrame = frameNumber;
    1170           1 :         _impl->unlockedFrame = frameNumber;
    1171           1 :         _impl->state = STATE_INITIALIZING;
    1172             : 
    1173           1 :         result = configInit( initID );
    1174             : 
    1175           1 :         if( result )
    1176           0 :             _impl->state = STATE_RUNNING;
    1177             :     }
    1178             :     else
    1179           0 :         sendError( ERROR_PIPE_NODE_NOTRUNNING );
    1180             : 
    1181           1 :     LBLOG( LOG_INIT ) << "TASK pipe config init reply result " << result
    1182           1 :                       << std::endl;
    1183             : 
    1184           1 :     commit();
    1185             :     send( command.getRemoteNode(), fabric::CMD_PIPE_CONFIG_INIT_REPLY )
    1186           1 :             << result;
    1187           1 :     return true;
    1188             : }
    1189             : 
    1190           1 : bool Pipe::_cmdConfigExit( co::ICommand& cmd )
    1191             : {
    1192           1 :     co::ObjectICommand command( cmd );
    1193             : 
    1194           1 :     LB_TS_THREAD( _pipeThread );
    1195           1 :     LBLOG( LOG_INIT ) << "TASK pipe config exit " << command << std::endl;
    1196             : 
    1197           1 :     _impl->state = STATE_STOPPING; // needed in View::detach (from _flushViews)
    1198             : 
    1199             :     // send before node gets a chance to send its destroy command
    1200           1 :     getNode()->send( getLocalNode(), fabric::CMD_NODE_DESTROY_PIPE ) << getID();
    1201             : 
    1202             :     // Flush views before exit since they are created after init
    1203             :     // - application may need initialized pipe to exit
    1204             :     // - configExit can't access views since all channels are gone already
    1205           1 :     _flushViews();
    1206           1 :     _flushQueues();
    1207           1 :     _impl->state = configExit() ? STATE_STOPPED : STATE_FAILED;
    1208           1 :     return true;
    1209             : }
    1210             : 
    1211           1 : bool Pipe::_cmdExitThread( co::ICommand& )
    1212             : {
    1213           1 :     LBASSERT( _impl->thread );
    1214           1 :     _impl->thread->_pipe = 0;
    1215           1 :     return true;
    1216             : }
    1217             : 
    1218           0 : bool Pipe::_cmdExitTransferThread( co::ICommand& )
    1219             : {
    1220           0 :     _impl->transferThread.postStop();
    1221           0 :     return true;
    1222             : }
    1223             : 
    1224           0 : bool Pipe::_cmdFrameStartClock( co::ICommand& )
    1225             : {
    1226           0 :     LBVERB << "start frame clock" << std::endl;
    1227           0 :     _impl->frameTimeMutex.set();
    1228           0 :     _impl->frameTimes.push_back( getConfig()->getTime( ));
    1229           0 :     _impl->frameTimeMutex.unset();
    1230           0 :     return true;
    1231             : }
    1232             : 
    1233           0 : bool Pipe::_cmdFrameStart( co::ICommand& cmd )
    1234             : {
    1235           0 :     LB_TS_THREAD( _pipeThread );
    1236             : 
    1237           0 :     co::ObjectICommand command( cmd );
    1238           0 :     const uint128_t& version = command.read< uint128_t >();
    1239           0 :     const uint128_t& frameID = command.read< uint128_t >();
    1240           0 :     const uint32_t frameNumber = command.read< uint32_t >();
    1241             : 
    1242           0 :     LBVERB << "handle pipe frame start " << command << " frame " << frameNumber
    1243           0 :            << " id " << frameID << std::endl;
    1244             : 
    1245           0 :     LBLOG( LOG_TASKS ) << "---- TASK start frame ---- frame " << frameNumber
    1246           0 :                        << " id " << frameID << std::endl;
    1247           0 :     sync( version );
    1248           0 :     const int64_t lastFrameTime = _impl->frameTime;
    1249             : 
    1250           0 :     _impl->frameTimeMutex.set();
    1251           0 :     LBASSERT( !_impl->frameTimes.empty( ));
    1252             : 
    1253           0 :     _impl->frameTime = _impl->frameTimes.front();
    1254           0 :     _impl->frameTimes.pop_front();
    1255           0 :     _impl->frameTimeMutex.unset();
    1256             : 
    1257           0 :     if( lastFrameTime > 0 )
    1258             :     {
    1259           0 :         PipeStatistics waitEvent( Statistic::PIPE_IDLE, this );
    1260             :         waitEvent.event.data.statistic.idleTime =
    1261           0 :             _impl->thread ? _impl->thread->getWorkerQueue()->resetWaitTime() :0;
    1262             :         waitEvent.event.data.statistic.totalTime =
    1263           0 :             LB_MAX( _impl->frameTime - lastFrameTime, 1 ); // avoid SIGFPE
    1264             :     }
    1265             : 
    1266           0 :     LBASSERTINFO( _impl->currentFrame + 1 == frameNumber,
    1267             :                   "current " <<_impl->currentFrame << " start " << frameNumber);
    1268             : 
    1269           0 :     frameStart( frameID, frameNumber );
    1270           0 :     return true;
    1271             : }
    1272             : 
    1273           0 : bool Pipe::_cmdFrameFinish( co::ICommand& cmd )
    1274             : {
    1275           0 :     LB_TS_THREAD( _pipeThread );
    1276             : 
    1277           0 :     co::ObjectICommand command( cmd );
    1278           0 :     const uint128_t& frameID = command.read< uint128_t >();
    1279           0 :     const uint32_t frameNumber = command.read< uint32_t >();
    1280             : 
    1281           0 :     LBLOG( LOG_TASKS ) << "---- TASK finish frame --- " << command << " frame "
    1282           0 :                        << frameNumber << " id " << frameID << std::endl;
    1283             : 
    1284           0 :     LBASSERTINFO( _impl->currentFrame >= frameNumber,
    1285             :                   "current " <<_impl->currentFrame << " finish " <<frameNumber);
    1286             : 
    1287           0 :     frameFinish( frameID, frameNumber );
    1288             : 
    1289           0 :     LBASSERTINFO( _impl->finishedFrame >= frameNumber,
    1290             :                   "Pipe::frameFinish() did not release frame " << frameNumber );
    1291             : 
    1292           0 :     if( _impl->unlockedFrame < frameNumber )
    1293             :     {
    1294           0 :         LBWARN << "Finished frame was not locally unlocked, enforcing unlock"
    1295           0 :                << std::endl << "    unlocked " << _impl->unlockedFrame.get()
    1296           0 :                << " done " << frameNumber << std::endl;
    1297           0 :         releaseFrameLocal( frameNumber );
    1298             :     }
    1299             : 
    1300           0 :     if( _impl->finishedFrame < frameNumber )
    1301             :     {
    1302           0 :         LBWARN << "Finished frame was not released, enforcing unlock"
    1303           0 :                << std::endl;
    1304           0 :         releaseFrame( frameNumber );
    1305             :     }
    1306             : 
    1307           0 :     _releaseViews();
    1308             : 
    1309           0 :     const uint128_t version = commit();
    1310           0 :     if( version != co::VERSION_NONE )
    1311           0 :         send( command.getRemoteNode(), fabric::CMD_OBJECT_SYNC );
    1312           0 :     return true;
    1313             : }
    1314             : 
    1315           0 : bool Pipe::_cmdFrameDrawFinish( co::ICommand& cmd )
    1316             : {
    1317           0 :     LB_TS_THREAD( _pipeThread );
    1318             : 
    1319           0 :     co::ObjectICommand command( cmd );
    1320           0 :     const uint128_t& frameID = command.read< uint128_t >();
    1321           0 :     const uint32_t frameNumber = command.read< uint32_t >();
    1322             : 
    1323           0 :     LBLOG( LOG_TASKS ) << "TASK draw finish " << getName()
    1324           0 :                        << " frame " << frameNumber << " id " << frameID
    1325           0 :                        << std::endl;
    1326             : 
    1327           0 :     frameDrawFinish( frameID, frameNumber );
    1328           0 :     return true;
    1329             : }
    1330             : 
    1331           0 : bool Pipe::_cmdDetachView( co::ICommand& cmd )
    1332             : {
    1333           0 :     co::ObjectICommand command( cmd );
    1334             : 
    1335           0 :     LB_TS_THREAD( _pipeThread );
    1336             : 
    1337           0 :     ViewHash::iterator i = _impl->views.find( command.read< uint128_t >( ));
    1338           0 :     if( i != _impl->views.end( ))
    1339             :     {
    1340           0 :         View* view = i->second;
    1341           0 :         _impl->views.erase( i );
    1342             : 
    1343           0 :         NodeFactory* nodeFactory = Global::getNodeFactory();
    1344           0 :         nodeFactory->releaseView( view );
    1345             :     }
    1346           0 :     return true;
    1347             : }
    1348             : 
    1349             : }
    1350             : 
    1351             : #include <eq/fabric/pipe.ipp>
    1352             : template class eq::fabric::Pipe< eq::Node, eq::Pipe, eq::Window,
    1353             :                                  eq::PipeVisitor >;
    1354             : 
    1355             : /** @cond IGNORE */
    1356             : template EQFABRIC_API std::ostream& eq::fabric::operator << ( std::ostream&,
    1357          42 :                                                 const eq::Super& );
    1358             : /** @endcond */

Generated by: LCOV version 1.11