LCOV - code coverage report
Current view: top level - eq - pipe.cpp (source / functions) Hit Total Coverage
Test: Equalizer Lines: 246 584 42.1 %
Date: 2016-07-30 05:04:55 Functions: 50 94 53.2 %

          Line data    Source code
       1             : 
       2             : /* Copyright (c) 2005-2015, Stefan Eilemann <eile@equalizergraphics.com>
       3             :  *                          Daniel Nachbaur <danielnachbaur@gmail.com>
       4             :  *                          Cedric Stalder <cedric.stalder@gmail.com>
       5             :  *
       6             :  * This library is free software; you can redistribute it and/or modify it under
       7             :  * the terms of the GNU Lesser General Public License version 2.1 as published
       8             :  * by the Free Software Foundation.
       9             :  *
      10             :  * This library is distributed in the hope that it will be useful, but WITHOUT
      11             :  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
      12             :  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
      13             :  * details.
      14             :  *
      15             :  * You should have received a copy of the GNU Lesser General Public License
      16             :  * along with this library; if not, write to the Free Software Foundation, Inc.,
      17             :  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
      18             :  */
      19             : 
      20             : #include "pipe.h"
      21             : 
      22             : // must be included before any header defining Bool
      23             : #ifdef EQ_QT_USED
      24             : #  include "qt/window.h"
      25             : #  include <QThread>
      26             : #endif
      27             : 
      28             : #include "channel.h"
      29             : #include "client.h"
      30             : #include "config.h"
      31             : #include "exception.h"
      32             : #include "frame.h"
      33             : #include "frameData.h"
      34             : #include "global.h"
      35             : #include "log.h"
      36             : #include "node.h"
      37             : #include "nodeFactory.h"
      38             : #include "pipeStatistics.h"
      39             : #include "server.h"
      40             : #include "view.h"
      41             : #include "window.h"
      42             : 
      43             : #include "messagePump.h"
      44             : #include "systemPipe.h"
      45             : 
      46             : #include "computeContext.h"
      47             : #ifdef EQUALIZER_USE_CUDA
      48             : #  include "cudaContext.h"
      49             : #endif
      50             : 
      51             : #include <eq/fabric/commands.h>
      52             : #include <eq/fabric/elementVisitor.h>
      53             : #include <eq/fabric/leafVisitor.h>
      54             : #include <eq/fabric/task.h>
      55             : 
      56             : #include <co/global.h>
      57             : #include <co/objectICommand.h>
      58             : #include <co/queueSlave.h>
      59             : #include <co/worker.h>
      60             : #include <boost/lexical_cast.hpp>
      61             : #include <sstream>
      62             : 
      63             : #ifdef EQUALIZER_USE_HWLOC_GL
      64             : #  include <hwloc.h>
      65             : #  include <hwloc/gl.h>
      66             : #endif
      67             : 
      68             : #ifdef EQUALIZER_USE_QT5WIDGETS
      69             : #  include <QGuiApplication>
      70             : #  include <QRegularExpression>
      71             : #endif
      72             : 
      73             : namespace eq
      74             : {
      75             : /** @cond IGNORE */
      76             : typedef fabric::Pipe< Node, Pipe, Window, PipeVisitor > Super;
      77             : typedef co::CommandFunc<Pipe> PipeFunc;
      78             : /** @endcond */
      79             : 
      80             : namespace
      81             : {
      82             : enum State
      83             : {
      84             :     STATE_MAPPED,
      85             :     STATE_INITIALIZING,
      86             :     STATE_RUNNING,
      87             :     STATE_STOPPING, // must come after running
      88             :     STATE_STOPPED, // must come after running
      89             :     STATE_FAILED
      90             : };
      91             : 
      92             : typedef stde::hash_map< uint128_t, Frame* > FrameHash;
      93             : typedef stde::hash_map< uint128_t, FrameDataPtr > FrameDataHash;
      94             : typedef stde::hash_map< uint128_t, View* > ViewHash;
      95             : typedef stde::hash_map< uint128_t, co::QueueSlave* > QueueHash;
      96             : typedef FrameHash::const_iterator FrameHashCIter;
      97             : typedef FrameDataHash::const_iterator FrameDataHashCIter;
      98             : typedef ViewHash::const_iterator ViewHashCIter;
      99             : typedef ViewHash::iterator ViewHashIter;
     100             : typedef QueueHash::const_iterator QueueHashCIter;
     101             : }
     102             : 
     103             : namespace detail
     104             : {
     105             : 
     106           2 : class RenderThread : public eq::Worker
     107             : {
     108             : public:
     109           1 :     explicit RenderThread( eq::Pipe* pipe )
     110             :         : eq::Worker( co::Global::getCommandQueueLimit( ))
     111           1 :         , _pipe( pipe )
     112           1 :     {}
     113             : 
     114             : protected:
     115           1 :     bool init() override
     116             :     {
     117           2 :         setName( std::string( "Draw" ) +
     118           2 :               boost::lexical_cast< std::string >( _pipe->getPath().pipeIndex ));
     119           1 :         return true;
     120             :     }
     121             : 
     122             :     void run() override;
     123          20 :     bool stopRunning() override { return !_pipe; }
     124             : 
     125             : private:
     126             :     eq::Pipe* _pipe;
     127             :     friend class eq::Pipe;
     128             : };
     129             : 
     130             : 
     131             : /** Asynchronous, per-pipe readback thread. */
     132           1 : class TransferThread : public co::Worker
     133             : {
     134             : public:
     135          91 :     explicit TransferThread( const uint32_t index )
     136             :         : co::Worker( co::Global::getCommandQueueLimit( ))
     137             :         , _index( index )
     138             :         , _qThread( nullptr )
     139          91 :         , _stop( false )
     140          91 :     {}
     141             : 
     142           0 :     bool init() override
     143             :     {
     144           0 :         if( !co::Worker::init( ))
     145           0 :             return false;
     146           0 :         setName( std::string( "Tfer" ) +
     147           0 :                  boost::lexical_cast< std::string >( _index ));
     148             : #ifdef EQ_QT_USED
     149           0 :         _qThread = QThread::currentThread();
     150             : #endif
     151           0 :         return true;
     152             :     }
     153             : 
     154           0 :     bool stopRunning() override { return _stop; }
     155           0 :     void postStop() { _stop = true; }
     156             : 
     157           0 :     QThread* getQThread() { return _qThread; }
     158             : 
     159             : private:
     160             :     uint32_t _index;
     161             :     QThread* _qThread;
     162             :     bool _stop; // thread will exit if this is true
     163             : };
     164             : 
     165             : class Pipe
     166             : {
     167             : public:
     168          91 :     explicit Pipe( const uint32_t index )
     169             :         : systemPipe( 0 )
     170             : #ifdef AGL
     171             :         , windowSystem( "AGL" )
     172             : #elif GLX
     173             :         , windowSystem( "GLX" )
     174             : #elif WGL
     175             :         , windowSystem( "WGL" )
     176             : #elif EQ_QT_USED
     177             :         , windowSystem( "Qt" )
     178             : #endif
     179             :         , state( STATE_STOPPED )
     180             :         , currentFrame( 0 )
     181             :         , frameTime( 0 )
     182             :         , thread( 0 )
     183             :         , transferThread( index )
     184          91 :         , computeContext( 0 )
     185          91 :     {}
     186             : 
     187           1 :     ~Pipe()
     188           1 :     {
     189           1 :         delete thread;
     190           1 :         thread = 0;
     191           1 :     }
     192             : 
     193             :     /** Window-system specific functions class */
     194             :     SystemPipe* systemPipe;
     195             : 
     196             :     /** The current window system. */
     197             :     WindowSystem windowSystem;
     198             : 
     199             :     /** The configInit/configExit state. */
     200             :     lunchbox::Monitor< State > state;
     201             : 
     202             :     /** The last started frame. */
     203             :     uint32_t currentFrame;
     204             : 
     205             :     /** The number of the last finished frame. */
     206             :     lunchbox::Monitor< uint32_t > finishedFrame;
     207             : 
     208             :     /** The number of the last locally unlocked frame. */
     209             :     lunchbox::Monitor< uint32_t > unlockedFrame;
     210             : 
     211             :     /** The running per-frame statistic clocks. */
     212             :     std::deque< int64_t > frameTimes;
     213             :     lunchbox::Lock frameTimeMutex;
     214             : 
     215             :     /** The base time for the currently active frame. */
     216             :     int64_t frameTime;
     217             : 
     218             :     /** All assembly frames used by the pipe during rendering. */
     219             :     FrameHash frames;
     220             : 
     221             :     /** All output frame datas used by the pipe during rendering. */
     222             :     FrameDataHash outputFrameDatas;
     223             : 
     224             :     /** All input frame datas used by the pipe during rendering. */
     225             :     FrameDataHash inputFrameDatas;
     226             : 
     227             :     /** All views used by the pipe's channels during rendering. */
     228             :     ViewHash views;
     229             : 
     230             :     /** All queues used by the pipe's channels during rendering. */
     231             :     QueueHash queues;
     232             : 
     233             :     /** The pipe thread. */
     234             :     RenderThread* thread;
     235             : 
     236             :     detail::TransferThread transferThread;
     237             : 
     238             :     /** GPU Computing context */
     239             :     ComputeContext *computeContext;
     240             : };
     241             : 
     242           1 : void RenderThread::run()
     243             : {
     244           1 :     LB_TS_THREAD( _pipe->_pipeThread );
     245           1 :     LBDEBUG << "Entered pipe thread" << std::endl;
     246             : 
     247           1 :     eq::Pipe* pipe = _pipe; // _pipe gets cleared on exit
     248           1 :     pipe->_impl->state.waitEQ( STATE_MAPPED );
     249           1 :     pipe->_impl->windowSystem = pipe->selectWindowSystem();
     250           1 :     pipe->_setupCommandQueue();
     251           1 :     pipe->_setupAffinity();
     252             : 
     253           1 :     Worker::run();
     254             : 
     255           1 :     pipe->_exitCommandQueue();
     256           1 : }
     257             : }
     258             : 
     259          91 : Pipe::Pipe( Node* parent )
     260             :         : Super( parent )
     261          91 :         , _impl( new detail::Pipe( getPath().pipeIndex ))
     262             : {
     263          91 : }
     264             : 
     265           3 : Pipe::~Pipe()
     266             : {
     267           1 :     LBASSERT( getWindows().empty( ));
     268           1 :     delete _impl;
     269           2 : }
     270             : 
     271           9 : Config* Pipe::getConfig()
     272             : {
     273           9 :     Node* node = getNode();
     274           9 :     LBASSERT( node );
     275           9 :     return ( node ? node->getConfig() : 0);
     276             : }
     277             : 
     278           0 : const Config* Pipe::getConfig() const
     279             : {
     280           0 :     const Node* node = getNode();
     281           0 :     LBASSERT( node );
     282           0 :     return ( node ? node->getConfig() : 0);
     283             : }
     284             : 
     285           2 : ClientPtr Pipe::getClient()
     286             : {
     287           2 :     Node* node = getNode();
     288           2 :     LBASSERT( node );
     289           2 :     return ( node ? node->getClient() : 0);
     290             : }
     291             : 
     292           4 : ServerPtr Pipe::getServer()
     293             : {
     294           4 :     Node* node = getNode();
     295           4 :     LBASSERT( node );
     296           4 :     return ( node ? node->getServer() : 0);
     297             : }
     298             : 
     299           1 : void Pipe::attach( const uint128_t& id, const uint32_t instanceID )
     300             : {
     301           1 :     Super::attach( id, instanceID );
     302             : 
     303           1 :     co::CommandQueue* queue = getPipeThreadQueue();
     304           1 :     co::CommandQueue* transferQ = getTransferThreadQueue();
     305             : 
     306             :     registerCommand( fabric::CMD_PIPE_CONFIG_INIT,
     307           1 :                      PipeFunc( this, &Pipe::_cmdConfigInit ), queue );
     308             :     registerCommand( fabric::CMD_PIPE_CONFIG_EXIT,
     309           1 :                      PipeFunc( this, &Pipe::_cmdConfigExit ), queue );
     310             :     registerCommand( fabric::CMD_PIPE_CREATE_WINDOW,
     311           1 :                      PipeFunc( this, &Pipe::_cmdCreateWindow ), queue );
     312             :     registerCommand( fabric::CMD_PIPE_DESTROY_WINDOW,
     313           1 :                      PipeFunc( this, &Pipe::_cmdDestroyWindow ), queue );
     314             :     registerCommand( fabric::CMD_PIPE_FRAME_START,
     315           1 :                      PipeFunc( this, &Pipe::_cmdFrameStart ), queue );
     316             :     registerCommand( fabric::CMD_PIPE_FRAME_FINISH,
     317           1 :                      PipeFunc( this, &Pipe::_cmdFrameFinish ), queue );
     318             :     registerCommand( fabric::CMD_PIPE_FRAME_DRAW_FINISH,
     319           1 :                      PipeFunc( this, &Pipe::_cmdFrameDrawFinish ), queue );
     320             :     registerCommand( fabric::CMD_PIPE_FRAME_START_CLOCK,
     321           1 :                      PipeFunc( this, &Pipe::_cmdFrameStartClock ), 0 );
     322             :     registerCommand( fabric::CMD_PIPE_EXIT_THREAD,
     323           1 :                      PipeFunc( this, &Pipe::_cmdExitThread ), queue );
     324             :     registerCommand( fabric::CMD_PIPE_DETACH_VIEW,
     325           1 :                      PipeFunc( this, &Pipe::_cmdDetachView ), queue );
     326             :     registerCommand( fabric::CMD_PIPE_EXIT_TRANSFER_THREAD,
     327             :                      PipeFunc( this, &Pipe::_cmdExitTransferThread ),
     328           1 :                      transferQ );
     329           1 : }
     330             : 
     331        1084 : void Pipe::setDirty( const uint64_t bits )
     332             : {
     333             :     // jump over fabric setDirty to avoid dirty'ing node pipes list
     334             :     // pipes are individually synced in frame finish for thread-safety
     335        1084 :     Object::setDirty( bits );
     336        1084 : }
     337             : 
     338           0 : bool Pipe::isWindowSystemAvailable( const std::string& name ) const
     339             : {
     340           0 :     bool available = false;
     341           0 :     if( name != "Qt" )
     342             :     {
     343             : #ifdef AGL
     344             :         available = name == "AGL";
     345             : #elif GLX
     346           0 :         available = name == "GLX";
     347             : #elif WGL
     348             :         available = name == "WGL";
     349             : #endif
     350             :     }
     351             : 
     352             : #ifdef EQ_QT_USED
     353           0 :     if( name != "Qt" )
     354           0 :         return available;
     355             : 
     356             :     // Qt can only use ports that refer to QScreens available to the
     357             :     // QApplication. In Windows and Mac all physical displays are
     358             :     // queriable using platform dependent functions (no idea about virtual
     359             :     // displays like Xming or XQuartz), so we will assume that if Equalizer
     360             :     // was built with Qt support, then all devices can be used by Qt.
     361             :     // (How to choose the right screen for a given device it's a different
     362             :     // story)
     363             : #  ifdef AGL
     364             :     available = true;
     365             : #  elif WGL
     366             :     available = true;
     367             : #  else
     368             :     // For X it's different. Qt can only use screens that are part of the
     369             :     // display server referred by the DISPLAY environmental variable (in
     370             :     // particular its value at the moment of the creation of the QApplication)
     371             : #    ifdef __APPLE__
     372             :     // In MAC this is simpler, because there can only be one display server.
     373             :     return true;
     374             : #    else
     375             :     // There's no way to infer the X server number from the Qt API except
     376             :     // QScreen names, but only under certain conditions. In regular desktop
     377             :     // usage QScreen::name gives a name related to what xrandr prints, so it's
     378             :     // not usable. The names seems to follow the X naming convention
     379             :     // (e.g. host:0.0) when XRANDR is not available (e.g. Xnest) or doesn't
     380             :     // provide information about where the displays are connected (e.g.
     381             :     // headless or Xvnc). These are corner cases so we cannot rely on QScreen.
     382             :     // Therefore, we will infer which port is Qt using directly from DISPLAY.
     383             : 
     384             :     // Is the port is undefined that means that the default server must
     385             :     // be used. This should match the Qt display server.
     386           0 :     if( getPort() == LB_UNDEFINED_UINT32 )
     387           0 :         return true;
     388             : 
     389             :     QGuiApplication* app =
     390           0 :         dynamic_cast< QGuiApplication* >( QCoreApplication::instance( ));
     391           0 :     if( !app || !app->primaryScreen() )
     392           0 :         return false; // Qt won't be able to access anything anyway
     393             : 
     394           0 :     QRegularExpression regex( "^[a-z]*\\:([0-9]+)(\\.[0-9]+)?$" );
     395             :     QRegularExpressionMatch match =
     396           0 :         regex.match( getenv( "DISPLAY" ) ? getenv( "DISPLAY" ) : "" );
     397           0 :     available = match.captured( 1 ) == QString::number( getPort(), 10 );
     398             : #    endif
     399             : #  endif
     400             : #endif
     401             : 
     402           0 :     return available;
     403             : }
     404             : 
     405           1 : WindowSystem Pipe::selectWindowSystem() const
     406             : {
     407             : #ifdef AGL
     408             :     return WindowSystem( "AGL" );
     409             : #elif GLX
     410           1 :     return WindowSystem( "GLX" );
     411             : #elif WGL
     412             :     return WindowSystem( "WGL" );
     413             : #elif EQ_QT_USED
     414             :     if( !isWindowSystemAvailable( "Qt" ))
     415             :     {
     416             :         // Throwing because there's no reasonable alternative.
     417             :         std::stringstream msg;
     418             :         msg << "Cannot choose windowing system for pipe at port " << getPort()
     419             :             << ". Qt was set as the default, but it cannot be used. In X"
     420             :                " based systems this means that the value of DISPLAY taken by"
     421             :                " Qt refers to a different display server." << std::endl;
     422             :         LBTHROW( std::runtime_error( msg.str( )));
     423             :     }
     424             :     return WindowSystem( "Qt" );
     425             : #endif
     426             : }
     427             : 
     428           1 : void Pipe::_setupCommandQueue()
     429             : {
     430           3 :     LBDEBUG << "Set up pipe message pump for " << _impl->windowSystem
     431           3 :            << std::endl;
     432             : 
     433           1 :     Config* config = getConfig();
     434           1 :     config->setupMessagePump( this );
     435             : 
     436           1 :     if( !_impl->thread ) // Non-threaded pipes have no pipe thread message pump
     437           1 :         return;
     438             : 
     439           1 :     CommandQueue* queue = _impl->thread->getWorkerQueue();
     440           1 :     LBASSERT( queue );
     441           1 :     LBASSERT( !queue->getMessagePump( ));
     442             : 
     443           1 :     Global::enterCarbon();
     444           1 :     MessagePump* pump = createMessagePump();
     445           1 :     if( pump )
     446           1 :         pump->dispatchAll(); // initializes _impl->receiverQueue
     447             : 
     448           1 :     queue->setMessagePump( pump );
     449           1 :     Global::leaveCarbon();
     450             : }
     451             : 
     452           1 : int32_t Pipe::_getAutoAffinity() const
     453             : {
     454             : #ifdef EQUALIZER_USE_HWLOC_GL
     455             :     uint32_t port = getPort();
     456             :     uint32_t device = getDevice();
     457             : 
     458             :     if( port == LB_UNDEFINED_UINT32 && device == LB_UNDEFINED_UINT32 )
     459             :         return lunchbox::Thread::NONE;
     460             : 
     461             :     if( port == LB_UNDEFINED_UINT32 )
     462             :         port = 0;
     463             :     if( device == LB_UNDEFINED_UINT32 )
     464             :         device = 0;
     465             : 
     466             :     hwloc_topology_t topology;
     467             :     if( hwloc_topology_init( &topology ) < 0 )
     468             :     {
     469             :         LBINFO << "Automatic pipe thread placement failed: "
     470             :                << "hwloc_topology_init() failed" << std::endl;
     471             :         return lunchbox::Thread::NONE;
     472             :     }
     473             : 
     474             :     // Load I/O devices, bridges and their relevant info
     475             :     const unsigned long loading_flags = HWLOC_TOPOLOGY_FLAG_IO_BRIDGES |
     476             :                                         HWLOC_TOPOLOGY_FLAG_IO_DEVICES;
     477             :     if( hwloc_topology_set_flags( topology, loading_flags ) < 0 )
     478             :     {
     479             :         LBINFO << "Automatic pipe thread placement failed: "
     480             :                << "hwloc_topology_set_flags() failed" << std::endl;
     481             :         hwloc_topology_destroy( topology );
     482             :         return lunchbox::Thread::NONE;
     483             :     }
     484             : 
     485             :     if( hwloc_topology_load( topology ) < 0 )
     486             :     {
     487             :         LBINFO << "Automatic pipe thread placement failed: "
     488             :                << "hwloc_topology_load() failed" << std::endl;
     489             :         hwloc_topology_destroy( topology );
     490             :         return lunchbox::Thread::NONE;
     491             :     }
     492             : 
     493             :     const hwloc_obj_t osdev =
     494             :         hwloc_gl_get_display_osdev_by_port_device( topology,
     495             :                                                    int( port ), int( device ));
     496             :     if( !osdev )
     497             :     {
     498             :         LBINFO << "Automatic pipe thread placement failed: GPU not found"
     499             :                << std::endl;
     500             :         hwloc_topology_destroy( topology );
     501             :         return lunchbox::Thread::NONE;
     502             :     }
     503             : 
     504             :     const hwloc_obj_t pcidev = osdev->parent;
     505             :     const hwloc_obj_t parent = hwloc_get_non_io_ancestor_obj( topology, pcidev);
     506             :     const int numCpus =
     507             :         hwloc_get_nbobjs_inside_cpuset_by_type( topology, parent->cpuset,
     508             :                                                 HWLOC_OBJ_SOCKET );
     509             :     if( numCpus != 1 )
     510             :     {
     511             :         LBINFO << "Automatic pipe thread placement failed: GPU attached to "
     512             :                << numCpus << " processors?" << std::endl;
     513             :         hwloc_topology_destroy( topology );
     514             :         return lunchbox::Thread::NONE;
     515             :     }
     516             : 
     517             :     const hwloc_obj_t cpuObj =
     518             :         hwloc_get_obj_inside_cpuset_by_type( topology, parent->cpuset,
     519             :                                              HWLOC_OBJ_SOCKET, 0 );
     520             :     if( cpuObj == 0 )
     521             :     {
     522             :         LBINFO << "Automatic pipe thread placement failed: "
     523             :                << "hwloc_get_obj_inside_cpuset_by_type() failed" << std::endl;
     524             :         hwloc_topology_destroy( topology );
     525             :         return lunchbox::Thread::NONE;
     526             :     }
     527             : 
     528             :     const int cpuIndex = cpuObj->logical_index;
     529             :     hwloc_topology_destroy( topology );
     530             :     return cpuIndex + lunchbox::Thread::SOCKET;
     531             : #else
     532           3 :     LBDEBUG << "Automatic thread placement not supported, no hwloc GL support"
     533           3 :             << std::endl;
     534             : #endif
     535           1 :     return lunchbox::Thread::NONE;
     536             : }
     537             : 
     538           1 : void Pipe::_setupAffinity()
     539             : {
     540           1 :     const int32_t affinity = getIAttribute( IATTR_HINT_AFFINITY );
     541           1 :     switch( affinity )
     542             :     {
     543             :         case AUTO:
     544           1 :             lunchbox::Thread::setAffinity( _getAutoAffinity( ));
     545           1 :             break;
     546             : 
     547             :         case OFF:
     548             :         default:
     549           0 :             lunchbox::Thread::setAffinity( affinity );
     550           0 :             break;
     551             :     }
     552           1 : }
     553             : 
     554           1 : void Pipe::_exitCommandQueue()
     555             : {
     556             :     // Non-threaded pipes have no pipe thread message pump
     557           1 :     if( !_impl->thread )
     558           1 :         return;
     559             : 
     560           1 :     CommandQueue* queue = _impl->thread->getWorkerQueue();
     561           1 :     LBASSERT( queue );
     562             : 
     563           1 :     MessagePump* pump = queue->getMessagePump();
     564           1 :     queue->setMessagePump( 0 );
     565           1 :     delete pump;
     566             : }
     567             : 
     568           1 : MessagePump* Pipe::createMessagePump()
     569             : {
     570           1 :     return _impl->windowSystem.createMessagePump();
     571             : }
     572             : 
     573           0 : MessagePump* Pipe::getMessagePump()
     574             : {
     575           0 :     LB_TS_THREAD( _pipeThread );
     576           0 :     if( !_impl->thread )
     577           0 :         return 0;
     578             : 
     579           0 :     CommandQueue* queue = _impl->thread->getWorkerQueue();
     580           0 :     return queue->getMessagePump();
     581             : }
     582             : 
     583           3 : co::CommandQueue* Pipe::getPipeThreadQueue()
     584             : {
     585           3 :     if( _impl->thread )
     586           3 :         return _impl->thread->getWorkerQueue();
     587             : 
     588           0 :     return getNode()->getMainThreadQueue();
     589             : }
     590             : 
     591           2 : co::CommandQueue* Pipe::getTransferThreadQueue()
     592             : {
     593           2 :     return _impl->transferThread.getWorkerQueue();
     594             : }
     595             : 
     596           1 : co::CommandQueue* Pipe::getMainThreadQueue()
     597             : {
     598           1 :     return getServer()->getMainThreadQueue();
     599             : }
     600             : 
     601           1 : co::CommandQueue* Pipe::getCommandThreadQueue()
     602             : {
     603           1 :     return getServer()->getCommandThreadQueue();
     604             : }
     605             : 
     606           0 : Frame* Pipe::getFrame( const co::ObjectVersion& frameVersion, const Eye eye,
     607             :                        const bool isOutput )
     608             : {
     609           0 :     LB_TS_THREAD( _pipeThread );
     610           0 :     Frame* frame = _impl->frames[ frameVersion.identifier ];
     611             : 
     612           0 :     if( !frame )
     613             :     {
     614           0 :         ClientPtr client = getClient();
     615           0 :         frame = new Frame();
     616             : 
     617           0 :         LBCHECK( client->mapObject( frame, frameVersion ));
     618           0 :         _impl->frames[ frameVersion.identifier ] = frame;
     619             :     }
     620             :     else
     621           0 :         frame->sync( frameVersion.version );
     622             : 
     623           0 :     const co::ObjectVersion& dataVersion = frame->getDataVersion( eye );
     624           0 :     LBLOG( LOG_ASSEMBLY ) << "Use " << dataVersion << std::endl;
     625             : 
     626           0 :     FrameDataPtr frameData = getNode()->getFrameData( dataVersion );
     627           0 :     LBASSERT( frameData );
     628             : 
     629           0 :     if( isOutput )
     630             :     {
     631           0 :         if( !frameData->isAttached() )
     632             :         {
     633           0 :             ClientPtr client = getClient();
     634           0 :             LBCHECK( client->mapObject( frameData.get(), dataVersion ));
     635             :         }
     636           0 :         else if( frameData->getVersion() < dataVersion.version )
     637           0 :             frameData->sync( dataVersion.version );
     638             : 
     639           0 :         _impl->outputFrameDatas[ dataVersion.identifier ] = frameData;
     640             :     }
     641             :     else
     642           0 :         _impl->inputFrameDatas[ dataVersion.identifier ] = frameData;
     643             : 
     644           0 :     frame->setFrameData( frameData );
     645           0 :     return frame;
     646             : }
     647             : 
     648           0 : void Pipe::flushFrames( util::ObjectManager& om )
     649             : {
     650           0 :     LB_TS_THREAD( _pipeThread );
     651           0 :     ClientPtr client = getClient();
     652           0 :     for( FrameHashCIter i = _impl->frames.begin(); i !=_impl->frames.end(); ++i)
     653             :     {
     654           0 :         Frame* frame = i->second;
     655           0 :         frame->setFrameData( 0 ); // datas are flushed below
     656           0 :         client->unmapObject( frame );
     657           0 :         delete frame;
     658             :     }
     659           0 :     _impl->frames.clear();
     660             : 
     661           0 :     for( FrameDataHashCIter i = _impl->inputFrameDatas.begin();
     662           0 :          i != _impl->inputFrameDatas.end(); ++i )
     663             :     {
     664           0 :         FrameDataPtr data = i->second;
     665           0 :         data->deleteGLObjects( om );
     666           0 :     }
     667           0 :     _impl->inputFrameDatas.clear();
     668             : 
     669           0 :     for( FrameDataHashCIter i = _impl->outputFrameDatas.begin();
     670           0 :          i != _impl->outputFrameDatas.end(); ++i )
     671             :     {
     672           0 :         FrameDataPtr data = i->second;
     673           0 :         data->resetPlugins();
     674           0 :         data->deleteGLObjects( om );
     675           0 :         client->unmapObject( data.get( ));
     676           0 :         getNode()->releaseFrameData( data );
     677           0 :     }
     678           0 :     _impl->outputFrameDatas.clear();
     679           0 : }
     680             : 
     681           0 : co::QueueSlave* Pipe::getQueue( const uint128_t& queueID )
     682             : {
     683           0 :     LB_TS_THREAD( _pipeThread );
     684           0 :     if( queueID == 0 )
     685           0 :         return 0;
     686             : 
     687           0 :     co::QueueSlave* queue = _impl->queues[ queueID ];
     688           0 :     if( !queue )
     689             :     {
     690           0 :         queue = new co::QueueSlave;
     691           0 :         ClientPtr client = getClient();
     692           0 :         LBCHECK( client->mapObject( queue, queueID ));
     693             : 
     694           0 :         _impl->queues[ queueID ] = queue;
     695             :     }
     696             : 
     697           0 :     return queue;
     698             : }
     699             : 
     700           1 : void Pipe::_flushQueues()
     701             : {
     702           1 :     LB_TS_THREAD( _pipeThread );
     703           1 :     ClientPtr client = getClient();
     704             : 
     705           1 :     for( QueueHashCIter i = _impl->queues.begin(); i !=_impl->queues.end(); ++i)
     706             :     {
     707           0 :         co::QueueSlave* queue = i->second;
     708           0 :         client->unmapObject( queue );
     709           0 :         delete queue;
     710             :     }
     711           1 :     _impl->queues.clear();
     712           1 : }
     713             : 
     714           0 : const View* Pipe::getView( const co::ObjectVersion& viewVersion ) const
     715             : {
     716             :     // Yie-ha: we want to have a const-interface to get a view on the render
     717             :     //         clients, but view mapping is by definition non-const.
     718           0 :     return const_cast< Pipe* >( this )->getView( viewVersion );
     719             : }
     720             : 
     721           0 : View* Pipe::getView( const co::ObjectVersion& viewVersion )
     722             : {
     723           0 :     LB_TS_THREAD( _pipeThread );
     724           0 :     if( viewVersion.identifier == 0 )
     725           0 :         return 0;
     726             : 
     727           0 :     View* view = _impl->views[ viewVersion.identifier ];
     728           0 :     if( !view )
     729             :     {
     730           0 :         NodeFactory* nodeFactory = Global::getNodeFactory();
     731           0 :         view = nodeFactory->createView( 0 );
     732           0 :         LBASSERT( view );
     733           0 :         view->_pipe = this;
     734           0 :         ClientPtr client = getClient();
     735           0 :         LBCHECK( client->mapObject( view, viewVersion ));
     736             : 
     737           0 :         _impl->views[ viewVersion.identifier ] = view;
     738             :     }
     739             : 
     740           0 :     view->sync( viewVersion.version );
     741           0 :     return view;
     742             : }
     743             : 
     744           0 : void Pipe::_releaseViews()
     745             : {
     746           0 :     LB_TS_THREAD( _pipeThread );
     747           0 :     for( bool changed = true; changed; )
     748             :     {
     749           0 :         changed = false;
     750           0 :         for( ViewHashIter i =_impl->views.begin(); i !=_impl->views.end(); ++i )
     751             :         {
     752           0 :             View* view = i->second;
     753           0 :             view->commit();
     754           0 :             if( view->getVersion() + 20 > view->getHeadVersion( ))
     755           0 :                 continue;
     756             : 
     757             :             // release unused view to avoid memory leaks due to deltas piling up
     758           0 :             view->_pipe = 0;
     759             : 
     760           0 :             ClientPtr client = getClient();
     761           0 :             client->unmapObject( view );
     762           0 :             _impl->views.erase( i );
     763             : 
     764           0 :             NodeFactory* nodeFactory = Global::getNodeFactory();
     765           0 :             nodeFactory->releaseView( view );
     766             : 
     767           0 :             changed = true;
     768           0 :             break;
     769           0 :         }
     770             :     }
     771           0 : }
     772             : 
     773           1 : void Pipe::_flushViews()
     774             : {
     775           1 :     LB_TS_THREAD( _pipeThread );
     776           1 :     NodeFactory* nodeFactory = Global::getNodeFactory();
     777           1 :     ClientPtr client = getClient();
     778             : 
     779           1 :     for( ViewHashCIter i = _impl->views.begin(); i != _impl->views.end(); ++i )
     780             :     {
     781           0 :         View* view = i->second;
     782             : 
     783           0 :         client->unmapObject( view );
     784           0 :         view->_pipe = 0;
     785           0 :         nodeFactory->releaseView( view );
     786             :     }
     787           1 :     _impl->views.clear();
     788           1 : }
     789             : 
     790           1 : void Pipe::startThread()
     791             : {
     792           1 :     _impl->thread = new detail::RenderThread( this );
     793           1 :     _impl->thread->start();
     794           1 : }
     795             : 
     796           1 : void Pipe::exitThread()
     797             : {
     798           1 :     _stopTransferThread();
     799             : 
     800           1 :     if( !_impl->thread )
     801           1 :         return;
     802             : 
     803           1 :     send( getLocalNode(), fabric::CMD_PIPE_EXIT_THREAD );
     804             : 
     805           1 :     _impl->thread->join();
     806           1 :     delete _impl->thread;
     807           1 :     _impl->thread = 0;
     808             : }
     809             : 
     810          90 : void Pipe::cancelThread()
     811             : {
     812          90 :     _stopTransferThread();
     813             : 
     814          90 :     if( !_impl->thread )
     815         180 :         return;
     816             : 
     817             :     // local command dispatching
     818             :     co::ObjectOCommand( this, getLocalNode(), fabric::CMD_PIPE_EXIT_THREAD,
     819           0 :                         co::COMMANDTYPE_OBJECT, getID(), CO_INSTANCE_ALL );
     820             : }
     821             : 
     822           0 : void Pipe::waitExited() const
     823             : {
     824           0 :     _impl->state.waitGE( STATE_STOPPED );
     825           0 : }
     826             : 
     827           1 : bool Pipe::isRunning() const
     828             : {
     829           1 :     return (_impl->state == STATE_RUNNING);
     830             : }
     831             : 
     832           1 : bool Pipe::isStopped() const
     833             : {
     834           1 :     return (_impl->state == STATE_STOPPED);
     835             : }
     836             : 
     837           1 : void Pipe::notifyMapped()
     838             : {
     839           1 :     LBASSERT( _impl->state == STATE_STOPPED );
     840           1 :     _impl->state = STATE_MAPPED;
     841           1 : }
     842             : 
     843             : namespace
     844             : {
     845           0 : class WaitFinishedVisitor : public PipeVisitor
     846             : {
     847             : public:
     848           0 :     WaitFinishedVisitor( const uint32_t frame, MessagePump* pump )
     849           0 :         : _frame( frame ), _pump( pump ) {}
     850             : 
     851           0 :     virtual VisitorResult visit( Channel* channel )
     852             :     {
     853           0 :         while( !channel->waitFrameFinished( _frame, 100 ))
     854             :         {
     855             :             // process potential pending Qt slots
     856           0 :             if( _pump )
     857           0 :                 _pump->dispatchAll();
     858             :         }
     859             : 
     860           0 :         return TRAVERSE_CONTINUE;
     861             :     }
     862             : 
     863             : private:
     864             :     const uint32_t _frame;
     865             :     MessagePump* _pump;
     866             : };
     867             : }
     868             : 
     869           0 : void Pipe::waitFrameFinished( const uint32_t frameNumber )
     870             : {
     871           0 :     MessagePump* pump = getConfig()->getMessagePump();
     872           0 :     while( !_impl->finishedFrame.timedWaitGE( frameNumber, 100 ))
     873             :     {
     874             :         // process potential pending Qt slots
     875           0 :         if( pump )
     876           0 :             pump->dispatchAll();
     877             :     }
     878             : 
     879           0 :     WaitFinishedVisitor waiter( frameNumber, pump );
     880           0 :     accept( waiter );
     881           0 : }
     882             : 
     883           0 : void Pipe::waitFrameLocal( const uint32_t frameNumber ) const
     884             : {
     885           0 :     _impl->unlockedFrame.waitGE( frameNumber );
     886           0 : }
     887             : 
     888           0 : uint32_t Pipe::getCurrentFrame() const
     889             : {
     890           0 :     LB_TS_THREAD( _pipeThread );
     891           0 :     return _impl->currentFrame;
     892             : }
     893             : 
     894           0 : uint32_t Pipe::getFinishedFrame() const
     895             : {
     896           0 :     return _impl->finishedFrame.get();
     897             : }
     898             : 
     899           1 : WindowSystem Pipe::getWindowSystem() const
     900             : {
     901           1 :     return _impl->windowSystem;
     902             : }
     903             : 
     904           1 : EventOCommand Pipe::sendError( const uint32_t error )
     905             : {
     906           1 :     return getConfig()->sendError( Event::PIPE_ERROR, Error( error, getID( )));
     907             : }
     908             : 
     909           0 : bool Pipe::processEvent( const Event& event )
     910             : {
     911           0 :     ConfigEvent configEvent( event );
     912           0 :     getConfig()->sendEvent( configEvent );
     913           0 :     return true;
     914             : }
     915             : 
     916             : //---------------------------------------------------------------------------
     917             : // pipe-thread methods
     918             : //---------------------------------------------------------------------------
     919           1 : bool Pipe::configInit( const uint128_t& initID )
     920             : {
     921           1 :     LB_TS_THREAD( _pipeThread );
     922             : 
     923           1 :     LBASSERT( !_impl->systemPipe );
     924             : 
     925           1 :     if ( !configInitSystemPipe( initID ))
     926           1 :         return false;
     927             : 
     928             :     // -------------------------------------------------------------------------
     929           0 :     LBASSERT(!_impl->computeContext);
     930             : 
     931             :     // for now we only support CUDA
     932             : #ifdef EQUALIZER_USE_CUDA
     933             :     if( getIAttribute( IATTR_HINT_CUDA_GL_INTEROP ) == eq::ON )
     934             :     {
     935             :         LBDEBUG << "Initializing CUDAContext" << std::endl;
     936             :         ComputeContext* computeCtx = new CUDAContext( this );
     937             : 
     938             :         if( !computeCtx->configInit() )
     939             :         {
     940             :             LBWARN << "GPU Computing context initialization failed "
     941             :                    << std::endl;
     942             :             delete computeCtx;
     943             :             return false;
     944             :         }
     945             :         setComputeContext( computeCtx );
     946             :     }
     947             : #endif
     948             : 
     949           0 :     return true;
     950             : }
     951             : 
     952           1 : bool Pipe::configInitSystemPipe( const uint128_t& )
     953             : {
     954           1 :     SystemPipe* systemPipe = _impl->windowSystem.createPipe( this );
     955           1 :     LBASSERT( systemPipe );
     956             : 
     957           1 :     if( !systemPipe->configInit( ))
     958             :     {
     959           1 :         LBERROR << "System pipe context initialization failed" << std::endl;
     960           1 :         delete systemPipe;
     961           1 :         return false;
     962             :     }
     963             : 
     964           0 :     setSystemPipe( systemPipe );
     965           0 :     return true;
     966             : }
     967             : 
     968           1 : bool Pipe::configExit()
     969             : {
     970           1 :     LB_TS_THREAD( _pipeThread );
     971             : 
     972           1 :     if( _impl->computeContext )
     973             :     {
     974           0 :         _impl->computeContext->configExit();
     975           0 :         delete _impl->computeContext;
     976           0 :         _impl->computeContext = 0;
     977             :     }
     978             : 
     979           1 :     if( _impl->systemPipe )
     980             :     {
     981           0 :         _impl->systemPipe->configExit( );
     982           0 :         delete _impl->systemPipe;
     983           0 :         _impl->systemPipe = 0;
     984             :     }
     985           1 :     return true;
     986             : }
     987             : 
     988             : 
     989           0 : void Pipe::frameStart( const uint128_t&, const uint32_t frameNumber )
     990             : {
     991           0 :     LB_TS_THREAD( _pipeThread );
     992             : 
     993           0 :     const Node* node = getNode();
     994           0 :     switch( node->getIAttribute( Node::IATTR_THREAD_MODEL ))
     995             :     {
     996             :         case ASYNC:      // No sync, release immediately
     997           0 :             releaseFrameLocal( frameNumber );
     998           0 :             break;
     999             : 
    1000             :         case DRAW_SYNC:  // Sync, release in frameDrawFinish
    1001             :         case LOCAL_SYNC: // Sync, release in frameFinish
    1002           0 :             node->waitFrameStarted( frameNumber );
    1003           0 :             break;
    1004             : 
    1005             :         default:
    1006           0 :             LBUNIMPLEMENTED;
    1007             :     }
    1008             : 
    1009           0 :     startFrame( frameNumber );
    1010           0 : }
    1011             : 
    1012           0 : void Pipe::frameDrawFinish( const uint128_t&, const uint32_t frameNumber )
    1013             : {
    1014           0 :     const Node* node = getNode();
    1015           0 :     switch( node->getIAttribute( Node::IATTR_THREAD_MODEL ))
    1016             :     {
    1017             :         case ASYNC:      // released in frameStart
    1018           0 :             break;
    1019             : 
    1020             :         case DRAW_SYNC:  // release
    1021           0 :             releaseFrameLocal( frameNumber );
    1022           0 :             break;
    1023             : 
    1024             :         case LOCAL_SYNC: // release in frameFinish
    1025           0 :             break;
    1026             : 
    1027             :         default:
    1028           0 :             LBUNIMPLEMENTED;
    1029             :     }
    1030           0 : }
    1031             : 
    1032           0 : void Pipe::frameFinish( const uint128_t&, const uint32_t frameNumber )
    1033             : {
    1034           0 :     const Node* node = getNode();
    1035           0 :     switch( node->getIAttribute( Node::IATTR_THREAD_MODEL ))
    1036             :     {
    1037             :         case ASYNC:      // released in frameStart
    1038           0 :             break;
    1039             : 
    1040             :         case DRAW_SYNC:  // released in frameDrawFinish
    1041           0 :             break;
    1042             : 
    1043             :         case LOCAL_SYNC: // release
    1044           0 :             releaseFrameLocal( frameNumber );
    1045           0 :             break;
    1046             : 
    1047             :         default:
    1048           0 :             LBUNIMPLEMENTED;
    1049             :     }
    1050             : 
    1051             :     // Global release
    1052           0 :     releaseFrame( frameNumber );
    1053           0 : }
    1054             : 
    1055           0 : void Pipe::startFrame( const uint32_t frameNumber )
    1056             : {
    1057           0 :     LB_TS_THREAD( _pipeThread );
    1058           0 :     _impl->currentFrame = frameNumber;
    1059           0 :     LBLOG( LOG_TASKS ) << "---- Started Frame ---- "<< frameNumber << std::endl;
    1060           0 : }
    1061             : 
    1062           0 : void Pipe::releaseFrame( const uint32_t frameNumber )
    1063             : {
    1064           0 :     LB_TS_THREAD( _pipeThread );
    1065           0 :     _impl->finishedFrame = frameNumber;
    1066           0 :     LBLOG( LOG_TASKS ) << "---- Finished Frame --- "<< frameNumber << std::endl;
    1067           0 : }
    1068             : 
    1069           0 : void Pipe::releaseFrameLocal( const uint32_t frameNumber )
    1070             : {
    1071           0 :     LB_TS_THREAD( _pipeThread );
    1072           0 :     LBASSERTINFO( _impl->unlockedFrame + 1 == frameNumber,
    1073             :                   _impl->unlockedFrame << ", " << frameNumber );
    1074             : 
    1075           0 :     _impl->unlockedFrame = frameNumber;
    1076           0 :     LBLOG( LOG_TASKS ) << "---- Unlocked Frame --- "
    1077           0 :                        << _impl->unlockedFrame.get() << std::endl;
    1078           0 : }
    1079             : 
    1080           0 : bool Pipe::startTransferThread()
    1081             : {
    1082           0 :     if( _impl->transferThread.isRunning( ))
    1083           0 :         return true;
    1084             : 
    1085           0 :     return _impl->transferThread.start();
    1086             : }
    1087             : 
    1088           0 : QThread* Pipe::getTransferQThread()
    1089             : {
    1090           0 :     return _impl->transferThread.getQThread();
    1091             : }
    1092             : 
    1093           1 : bool Pipe::hasTransferThread() const
    1094             : {
    1095           1 :     return _impl->transferThread.isRunning();
    1096             : }
    1097             : 
    1098          91 : void Pipe::_stopTransferThread()
    1099             : {
    1100          91 :     if( _impl->transferThread.isStopped( ))
    1101         182 :         return;
    1102             : 
    1103           0 :     send( getLocalNode(), fabric::CMD_PIPE_EXIT_TRANSFER_THREAD );
    1104           0 :     _impl->transferThread.join();
    1105             : }
    1106             : 
    1107           0 : void Pipe::setSystemPipe( SystemPipe* pipe )
    1108             : {
    1109           0 :     _impl->systemPipe = pipe;
    1110           0 : }
    1111             : 
    1112           0 : SystemPipe* Pipe::getSystemPipe()
    1113             : {
    1114           0 :     return _impl->systemPipe;
    1115             : }
    1116             : 
    1117           0 : const SystemPipe* Pipe::getSystemPipe() const
    1118             : {
    1119           0 :     return _impl->systemPipe;
    1120             : }
    1121             : 
    1122           0 : void Pipe::setComputeContext( ComputeContext* ctx )
    1123             : {
    1124           0 :     _impl->computeContext = ctx;
    1125           0 : }
    1126             : 
    1127           0 : const ComputeContext* Pipe::getComputeContext() const
    1128             : {
    1129           0 :     return _impl->computeContext;
    1130             : }
    1131             : 
    1132           0 : ComputeContext* Pipe::getComputeContext()
    1133             : {
    1134           0 :     return _impl->computeContext;
    1135             : }
    1136             : 
    1137             : //---------------------------------------------------------------------------
    1138             : // command handlers
    1139             : //---------------------------------------------------------------------------
    1140           1 : bool Pipe::_cmdCreateWindow( co::ICommand& cmd )
    1141             : {
    1142           1 :     co::ObjectICommand command( cmd );
    1143           1 :     const uint128_t& windowID = command.read< uint128_t >();
    1144             : 
    1145           1 :     LBLOG( LOG_INIT ) << "Create window " << command << " id " << windowID
    1146           1 :                       << std::endl;
    1147             : 
    1148           1 :     Window* window = Global::getNodeFactory()->createWindow( this );
    1149           1 :     window->init(); // not in ctor, virtual method
    1150             : 
    1151           1 :     Config* config = getConfig();
    1152           1 :     LBCHECK( config->mapObject( window, windowID ));
    1153             : 
    1154           1 :     return true;
    1155             : }
    1156             : 
    1157           1 : bool Pipe::_cmdDestroyWindow( co::ICommand& cmd )
    1158             : {
    1159           1 :     co::ObjectICommand command( cmd );
    1160             : 
    1161           1 :     LBLOG( LOG_INIT ) << "Destroy window " << command << std::endl;
    1162             : 
    1163           1 :     Window* window = _findWindow( command.read< uint128_t >( ));
    1164           1 :     LBASSERT( window );
    1165             : 
    1166             :     // re-set shared windows accordingly
    1167           1 :     Window* newSharedWindow = 0;
    1168           1 :     const Windows& windows = getWindows();
    1169           2 :     for( Windows::const_iterator i = windows.begin(); i != windows.end(); ++i )
    1170             :     {
    1171           1 :         Window* candidate = *i;
    1172             : 
    1173           1 :         if( candidate == window )
    1174           1 :             continue; // ignore
    1175             : 
    1176           0 :         if( candidate->getSharedContextWindow() == window )
    1177             :         {
    1178           0 :             if( newSharedWindow )
    1179           0 :                 candidate->setSharedContextWindow( newSharedWindow );
    1180             :             else
    1181             :             {
    1182           0 :                 newSharedWindow = candidate;
    1183           0 :                 newSharedWindow->setSharedContextWindow( candidate );
    1184             :             }
    1185             :         }
    1186             : 
    1187           0 :         LBASSERT( candidate->getSharedContextWindow() != window );
    1188             :     }
    1189             : 
    1190           1 :     const bool stopped = window->isStopped();
    1191             :     window->send( getServer(),
    1192           1 :                   fabric::CMD_WINDOW_CONFIG_EXIT_REPLY ) << stopped;
    1193             : 
    1194           1 :     Config* config = getConfig();
    1195           1 :     config->unmapObject( window );
    1196           1 :     Global::getNodeFactory()->releaseWindow( window );
    1197             : 
    1198           1 :     return true;
    1199             : }
    1200             : 
    1201           1 : bool Pipe::_cmdConfigInit( co::ICommand& cmd )
    1202             : {
    1203           1 :     LB_TS_THREAD( _pipeThread );
    1204             : 
    1205           1 :     co::ObjectICommand command( cmd );
    1206           1 :     const uint128_t& initID = command.read< uint128_t >();
    1207           1 :     const uint32_t frameNumber = command.read< uint32_t >();
    1208             : 
    1209           1 :     LBLOG( LOG_INIT ) << "Init pipe " << command << " init id " << initID
    1210           1 :                       << " frame " << frameNumber << std::endl;
    1211             : 
    1212           1 :     if( !isThreaded( ))
    1213             :     {
    1214           0 :         _impl->windowSystem = selectWindowSystem();
    1215           0 :         _setupCommandQueue();
    1216             :     }
    1217             : 
    1218           1 :     Node* node = getNode();
    1219           1 :     LBASSERT( node );
    1220           1 :     node->waitInitialized();
    1221             : 
    1222           1 :     bool result = false;
    1223           1 :     if( node->isRunning( ))
    1224             :     {
    1225           1 :         _impl->currentFrame  = frameNumber;
    1226           1 :         _impl->finishedFrame = frameNumber;
    1227           1 :         _impl->unlockedFrame = frameNumber;
    1228           1 :         _impl->state = STATE_INITIALIZING;
    1229             : 
    1230           1 :         result = configInit( initID );
    1231             : 
    1232           1 :         if( result )
    1233           0 :             _impl->state = STATE_RUNNING;
    1234             :     }
    1235             :     else
    1236           0 :         sendError( ERROR_PIPE_NODE_NOTRUNNING );
    1237             : 
    1238           1 :     LBLOG( LOG_INIT ) << "TASK pipe config init reply result " << result
    1239           1 :                       << std::endl;
    1240             : 
    1241           1 :     commit();
    1242             :     send( command.getRemoteNode(), fabric::CMD_PIPE_CONFIG_INIT_REPLY )
    1243           1 :             << result;
    1244           1 :     return true;
    1245             : }
    1246             : 
    1247           1 : bool Pipe::_cmdConfigExit( co::ICommand& cmd )
    1248             : {
    1249           1 :     co::ObjectICommand command( cmd );
    1250             : 
    1251           1 :     LB_TS_THREAD( _pipeThread );
    1252           1 :     LBLOG( LOG_INIT ) << "TASK pipe config exit " << command << std::endl;
    1253             : 
    1254           1 :     _impl->state = STATE_STOPPING; // needed in View::detach (from _flushViews)
    1255             : 
    1256             :     // send before node gets a chance to send its destroy command
    1257           1 :     getNode()->send( getLocalNode(), fabric::CMD_NODE_DESTROY_PIPE ) << getID();
    1258             : 
    1259             :     // Flush views before exit since they are created after init
    1260             :     // - application may need initialized pipe to exit
    1261             :     // - configExit can't access views since all channels are gone already
    1262           1 :     _flushViews();
    1263           1 :     _flushQueues();
    1264           1 :     _impl->state = configExit() ? STATE_STOPPED : STATE_FAILED;
    1265           1 :     return true;
    1266             : }
    1267             : 
    1268           1 : bool Pipe::_cmdExitThread( co::ICommand& )
    1269             : {
    1270           1 :     LBASSERT( _impl->thread );
    1271           1 :     _impl->thread->_pipe = 0;
    1272           1 :     return true;
    1273             : }
    1274             : 
    1275           0 : bool Pipe::_cmdExitTransferThread( co::ICommand& )
    1276             : {
    1277           0 :     _impl->transferThread.postStop();
    1278           0 :     return true;
    1279             : }
    1280             : 
    1281           0 : bool Pipe::_cmdFrameStartClock( co::ICommand& )
    1282             : {
    1283           0 :     LBVERB << "start frame clock" << std::endl;
    1284           0 :     _impl->frameTimeMutex.set();
    1285           0 :     _impl->frameTimes.push_back( getConfig()->getTime( ));
    1286           0 :     _impl->frameTimeMutex.unset();
    1287           0 :     return true;
    1288             : }
    1289             : 
    1290           0 : bool Pipe::_cmdFrameStart( co::ICommand& cmd )
    1291             : {
    1292           0 :     LB_TS_THREAD( _pipeThread );
    1293             : 
    1294           0 :     co::ObjectICommand command( cmd );
    1295           0 :     const uint128_t& version = command.read< uint128_t >();
    1296           0 :     const uint128_t& frameID = command.read< uint128_t >();
    1297           0 :     const uint32_t frameNumber = command.read< uint32_t >();
    1298             : 
    1299           0 :     LBVERB << "handle pipe frame start " << command << " frame " << frameNumber
    1300           0 :            << " id " << frameID << std::endl;
    1301             : 
    1302           0 :     LBLOG( LOG_TASKS ) << "---- TASK start frame ---- frame " << frameNumber
    1303           0 :                        << " id " << frameID << std::endl;
    1304           0 :     sync( version );
    1305           0 :     const int64_t lastFrameTime = _impl->frameTime;
    1306             : 
    1307           0 :     _impl->frameTimeMutex.set();
    1308           0 :     LBASSERT( !_impl->frameTimes.empty( ));
    1309             : 
    1310           0 :     _impl->frameTime = _impl->frameTimes.front();
    1311           0 :     _impl->frameTimes.pop_front();
    1312           0 :     _impl->frameTimeMutex.unset();
    1313             : 
    1314           0 :     if( lastFrameTime > 0 )
    1315             :     {
    1316           0 :         PipeStatistics waitEvent( Statistic::PIPE_IDLE, this );
    1317             :         waitEvent.event.data.statistic.idleTime =
    1318           0 :             _impl->thread ? _impl->thread->getWorkerQueue()->resetWaitTime() :0;
    1319             :         waitEvent.event.data.statistic.totalTime =
    1320           0 :             LB_MAX( _impl->frameTime - lastFrameTime, 1 ); // avoid SIGFPE
    1321             :     }
    1322             : 
    1323           0 :     LBASSERTINFO( _impl->currentFrame + 1 == frameNumber,
    1324             :                   "current " <<_impl->currentFrame << " start " << frameNumber);
    1325             : 
    1326           0 :     frameStart( frameID, frameNumber );
    1327           0 :     return true;
    1328             : }
    1329             : 
    1330           0 : bool Pipe::_cmdFrameFinish( co::ICommand& cmd )
    1331             : {
    1332           0 :     LB_TS_THREAD( _pipeThread );
    1333             : 
    1334           0 :     co::ObjectICommand command( cmd );
    1335           0 :     const uint128_t& frameID = command.read< uint128_t >();
    1336           0 :     const uint32_t frameNumber = command.read< uint32_t >();
    1337             : 
    1338           0 :     LBLOG( LOG_TASKS ) << "---- TASK finish frame --- " << command << " frame "
    1339           0 :                        << frameNumber << " id " << frameID << std::endl;
    1340             : 
    1341           0 :     LBASSERTINFO( _impl->currentFrame >= frameNumber,
    1342             :                   "current " <<_impl->currentFrame << " finish " <<frameNumber);
    1343             : 
    1344           0 :     frameFinish( frameID, frameNumber );
    1345             : 
    1346           0 :     LBASSERTINFO( _impl->finishedFrame >= frameNumber,
    1347             :                   "Pipe::frameFinish() did not release frame " << frameNumber );
    1348             : 
    1349           0 :     if( _impl->unlockedFrame < frameNumber )
    1350             :     {
    1351           0 :         LBWARN << "Finished frame was not locally unlocked, enforcing unlock"
    1352           0 :                << std::endl << "    unlocked " << _impl->unlockedFrame.get()
    1353           0 :                << " done " << frameNumber << std::endl;
    1354           0 :         releaseFrameLocal( frameNumber );
    1355             :     }
    1356             : 
    1357           0 :     if( _impl->finishedFrame < frameNumber )
    1358             :     {
    1359           0 :         LBWARN << "Finished frame was not released, enforcing unlock"
    1360           0 :                << std::endl;
    1361           0 :         releaseFrame( frameNumber );
    1362             :     }
    1363             : 
    1364           0 :     _releaseViews();
    1365             : 
    1366           0 :     const uint128_t version = commit();
    1367           0 :     if( version != co::VERSION_NONE )
    1368           0 :         send( command.getRemoteNode(), fabric::CMD_OBJECT_SYNC );
    1369           0 :     return true;
    1370             : }
    1371             : 
    1372           0 : bool Pipe::_cmdFrameDrawFinish( co::ICommand& cmd )
    1373             : {
    1374           0 :     LB_TS_THREAD( _pipeThread );
    1375             : 
    1376           0 :     co::ObjectICommand command( cmd );
    1377           0 :     const uint128_t& frameID = command.read< uint128_t >();
    1378           0 :     const uint32_t frameNumber = command.read< uint32_t >();
    1379             : 
    1380           0 :     LBLOG( LOG_TASKS ) << "TASK draw finish " << getName()
    1381           0 :                        << " frame " << frameNumber << " id " << frameID
    1382           0 :                        << std::endl;
    1383             : 
    1384           0 :     frameDrawFinish( frameID, frameNumber );
    1385           0 :     return true;
    1386             : }
    1387             : 
    1388           0 : bool Pipe::_cmdDetachView( co::ICommand& cmd )
    1389             : {
    1390           0 :     co::ObjectICommand command( cmd );
    1391             : 
    1392           0 :     LB_TS_THREAD( _pipeThread );
    1393             : 
    1394           0 :     ViewHash::iterator i = _impl->views.find( command.read< uint128_t >( ));
    1395           0 :     if( i != _impl->views.end( ))
    1396             :     {
    1397           0 :         View* view = i->second;
    1398           0 :         _impl->views.erase( i );
    1399             : 
    1400           0 :         NodeFactory* nodeFactory = Global::getNodeFactory();
    1401           0 :         nodeFactory->releaseView( view );
    1402             :     }
    1403           0 :     return true;
    1404             : }
    1405             : 
    1406             : }
    1407             : 
    1408             : #include <eq/fabric/pipe.ipp>
    1409             : template class eq::fabric::Pipe< eq::Node, eq::Pipe, eq::Window,
    1410             :                                  eq::PipeVisitor >;
    1411             : 
    1412             : /** @cond IGNORE */
    1413             : template EQFABRIC_API std::ostream& eq::fabric::operator << ( std::ostream&,
    1414          42 :                                                 const eq::Super& );
    1415             : /** @endcond */

Generated by: LCOV version 1.11