LCOV - code coverage report
Current view: top level - eq/client - pipe.cpp (source / functions) Hit Total Coverage
Test: lcov2.info Lines: 464 559 83.0 %
Date: 2014-06-18 Functions: 80 91 87.9 %

          Line data    Source code
       1             : 
       2             : /* Copyright (c) 2005-2014, Stefan Eilemann <eile@equalizergraphics.com>
       3             :  *               2009-2012, Daniel Nachbaur <danielnachbaur@gmail.com>
       4             :  *                    2010, Cedric Stalder <cedric.stalder@gmail.com>
       5             :  *
       6             :  * This library is free software; you can redistribute it and/or modify it under
       7             :  * the terms of the GNU Lesser General Public License version 2.1 as published
       8             :  * by the Free Software Foundation.
       9             :  *
      10             :  * This library is distributed in the hope that it will be useful, but WITHOUT
      11             :  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
      12             :  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
      13             :  * details.
      14             :  *
      15             :  * You should have received a copy of the GNU Lesser General Public License
      16             :  * along with this library; if not, write to the Free Software Foundation, Inc.,
      17             :  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
      18             :  */
      19             : 
      20             : #include "pipe.h"
      21             : 
      22             : #include "channel.h"
      23             : #include "client.h"
      24             : #include "config.h"
      25             : #include "exception.h"
      26             : #include "frame.h"
      27             : #include "frameData.h"
      28             : #include "global.h"
      29             : #include "log.h"
      30             : #include "node.h"
      31             : #include "nodeFactory.h"
      32             : #include "pipeStatistics.h"
      33             : #include "server.h"
      34             : #include "view.h"
      35             : #include "window.h"
      36             : 
      37             : #include "messagePump.h"
      38             : #include "systemPipe.h"
      39             : 
      40             : #include "computeContext.h"
      41             : #ifdef EQUALIZER_USE_CUDA
      42             : #  include "cudaContext.h"
      43             : #endif
      44             : 
      45             : #include <eq/fabric/commands.h>
      46             : #include <eq/fabric/elementVisitor.h>
      47             : #include <eq/fabric/leafVisitor.h>
      48             : #include <eq/fabric/task.h>
      49             : 
      50             : #include <co/global.h>
      51             : #include <co/objectICommand.h>
      52             : #include <co/queueSlave.h>
      53             : #include <co/worker.h>
      54             : #include <boost/lexical_cast.hpp>
      55             : #include <sstream>
      56             : 
      57             : #ifdef EQUALIZER_USE_HWLOC_GL
      58             : #  include <hwloc.h>
      59             : #  include <hwloc/gl.h>
      60             : #endif
      61             : 
      62             : namespace eq
      63             : {
      64             : /** @cond IGNORE */
      65             : typedef fabric::Pipe< Node, Pipe, Window, PipeVisitor > Super;
      66             : typedef co::CommandFunc<Pipe> PipeFunc;
      67             : /** @endcond */
      68             : 
      69             : namespace
      70             : {
      71             : enum State
      72             : {
      73             :     STATE_MAPPED,
      74             :     STATE_INITIALIZING,
      75             :     STATE_RUNNING,
      76             :     STATE_STOPPING, // must come after running
      77             :     STATE_STOPPED, // must come after running
      78             :     STATE_FAILED
      79             : };
      80             : 
      81             : typedef stde::hash_map< uint128_t, Frame* > FrameHash;
      82             : typedef stde::hash_map< uint128_t, FrameDataPtr > FrameDataHash;
      83             : typedef stde::hash_map< uint128_t, View* > ViewHash;
      84             : typedef stde::hash_map< uint128_t, co::QueueSlave* > QueueHash;
      85             : typedef FrameHash::const_iterator FrameHashCIter;
      86             : typedef FrameDataHash::const_iterator FrameDataHashCIter;
      87             : typedef ViewHash::const_iterator ViewHashCIter;
      88             : typedef ViewHash::iterator ViewHashIter;
      89             : typedef QueueHash::const_iterator QueueHashCIter;
      90             : }
      91             : 
      92             : namespace detail
      93             : {
      94             : 
      95          30 : class RenderThread : public eq::Worker
      96             : {
      97             : public:
      98          15 :     explicit RenderThread( eq::Pipe* pipe )
      99             :         : eq::Worker( co::Global::getCommandQueueLimit( ))
     100          15 :         , _pipe( pipe )
     101          15 :     {}
     102             : 
     103             : protected:
     104          15 :     bool init() override
     105             :     {
     106          30 :         setName( std::string( "Draw" ) +
     107          30 :               boost::lexical_cast< std::string >( _pipe->getPath().pipeIndex ));
     108          15 :         return true;
     109             :     }
     110             : 
     111             :     void run() override;
     112         383 :     bool stopRunning() override { return !_pipe; }
     113             : 
     114             : private:
     115             :     eq::Pipe* _pipe;
     116             :     friend class eq::Pipe;
     117             : };
     118             : 
     119             : 
     120             : /** Asynchronous, per-pipe readback thread. */
     121          15 : class TransferThread : public co::Worker
     122             : {
     123             : public:
     124         105 :     explicit TransferThread( const uint32_t index )
     125             :         : co::Worker( co::Global::getCommandQueueLimit( ))
     126             :         , _index( index )
     127         105 :         , _stop( false )
     128         105 :     {}
     129             : 
     130           2 :     bool init() override
     131             :     {
     132           2 :         if( !co::Worker::init( ))
     133           0 :             return false;
     134           4 :         setName( std::string( "Tfer" ) +
     135           2 :                  boost::lexical_cast< std::string >( _index ));
     136           2 :         return true;
     137             :     }
     138             : 
     139          17 :     bool stopRunning() override { return _stop; }
     140           2 :     void postStop() { _stop = true; }
     141             : 
     142             : private:
     143             :     uint32_t _index;
     144             :     bool _stop; // thread will exit if this is true
     145             : };
     146             : 
     147             : class Pipe
     148             : {
     149             : public:
     150         105 :     explicit Pipe( const uint32_t index )
     151             :         : systemPipe( 0 )
     152             :         , state( STATE_STOPPED )
     153             :         , currentFrame( 0 )
     154             :         , frameTime( 0 )
     155             :         , thread( 0 )
     156             :         , transferThread( index )
     157         105 :         , computeContext( 0 )
     158         105 :     {}
     159             : 
     160          15 :     ~Pipe()
     161          15 :     {
     162          15 :         delete thread;
     163          15 :         thread = 0;
     164          15 :     }
     165             : 
     166             :     /** Window-system specific functions class */
     167             :     SystemPipe* systemPipe;
     168             : 
     169             :     /** The current window system. */
     170             :     WindowSystem windowSystem;
     171             : 
     172             :     /** The configInit/configExit state. */
     173             :     lunchbox::Monitor< State > state;
     174             : 
     175             :     /** The last started frame. */
     176             :     uint32_t currentFrame;
     177             : 
     178             :     /** The number of the last finished frame. */
     179             :     lunchbox::Monitor< uint32_t > finishedFrame;
     180             : 
     181             :     /** The number of the last locally unlocked frame. */
     182             :     lunchbox::Monitor<uint32_t> unlockedFrame;
     183             : 
     184             :     /** The running per-frame statistic clocks. */
     185             :     std::deque< int64_t > frameTimes;
     186             :     lunchbox::Lock frameTimeMutex;
     187             : 
     188             :     /** The base time for the currently active frame. */
     189             :     int64_t frameTime;
     190             : 
     191             :     /** All assembly frames used by the pipe during rendering. */
     192             :     FrameHash frames;
     193             : 
     194             :     /** All output frame datas used by the pipe during rendering. */
     195             :     FrameDataHash outputFrameDatas;
     196             : 
     197             :     /** All input frame datas used by the pipe during rendering. */
     198             :     FrameDataHash inputFrameDatas;
     199             : 
     200             :     /** All views used by the pipe's channels during rendering. */
     201             :     ViewHash views;
     202             : 
     203             :     /** All queues used by the pipe's channels during rendering. */
     204             :     QueueHash queues;
     205             : 
     206             :     /** The pipe thread. */
     207             :     RenderThread* thread;
     208             : 
     209             :     detail::TransferThread transferThread;
     210             : 
     211             :     /** GPU Computing context */
     212             :     ComputeContext *computeContext;
     213             : };
     214             : 
     215          15 : void RenderThread::run()
     216             : {
     217          15 :     LB_TS_THREAD( _pipe->_pipeThread );
     218          15 :     LBINFO << "Entered pipe thread" << std::endl;
     219             : 
     220          15 :     eq::Pipe* pipe = _pipe; // _pipe gets cleared on exit
     221          15 :     pipe->_impl->state.waitEQ( STATE_MAPPED );
     222          15 :     pipe->_impl->windowSystem = pipe->selectWindowSystem();
     223          15 :     pipe->_setupCommandQueue();
     224          15 :     pipe->_setupAffinity();
     225             : 
     226          15 :     Worker::run();
     227             : 
     228          15 :     pipe->_exitCommandQueue();
     229          15 : }
     230             : }
     231             : 
     232         105 : Pipe::Pipe( Node* parent )
     233             :         : Super( parent )
     234         105 :         , _impl( new detail::Pipe( getPath().pipeIndex ))
     235             : {
     236         105 : }
     237             : 
     238          40 : Pipe::~Pipe()
     239             : {
     240          15 :     LBASSERT( getWindows().empty( ));
     241          15 :     delete _impl;
     242          25 : }
     243             : 
     244         252 : Config* Pipe::getConfig()
     245             : {
     246         252 :     Node* node = getNode();
     247         252 :     LBASSERT( node );
     248         252 :     return ( node ? node->getConfig() : 0);
     249             : }
     250             : 
     251           0 : const Config* Pipe::getConfig() const
     252             : {
     253           0 :     const Node* node = getNode();
     254           0 :     LBASSERT( node );
     255           0 :     return ( node ? node->getConfig() : 0);
     256             : }
     257             : 
     258          52 : ClientPtr Pipe::getClient()
     259             : {
     260          52 :     Node* node = getNode();
     261          52 :     LBASSERT( node );
     262          52 :     return ( node ? node->getClient() : 0);
     263             : }
     264             : 
     265          72 : ServerPtr Pipe::getServer()
     266             : {
     267          72 :     Node* node = getNode();
     268          72 :     LBASSERT( node );
     269          72 :     return ( node ? node->getServer() : 0);
     270             : }
     271             : 
     272          15 : void Pipe::attach( const uint128_t& id, const uint32_t instanceID )
     273             : {
     274          15 :     Super::attach( id, instanceID );
     275             : 
     276          15 :     co::CommandQueue* queue = getPipeThreadQueue();
     277          15 :     co::CommandQueue* transferQ = getTransferThreadQueue();
     278             : 
     279             :     registerCommand( fabric::CMD_PIPE_CONFIG_INIT,
     280          15 :                      PipeFunc( this, &Pipe::_cmdConfigInit ), queue );
     281             :     registerCommand( fabric::CMD_PIPE_CONFIG_EXIT,
     282          15 :                      PipeFunc( this, &Pipe::_cmdConfigExit ), queue );
     283             :     registerCommand( fabric::CMD_PIPE_CREATE_WINDOW,
     284          15 :                      PipeFunc( this, &Pipe::_cmdCreateWindow ), queue );
     285             :     registerCommand( fabric::CMD_PIPE_DESTROY_WINDOW,
     286          15 :                      PipeFunc( this, &Pipe::_cmdDestroyWindow ), queue );
     287             :     registerCommand( fabric::CMD_PIPE_FRAME_START,
     288          15 :                      PipeFunc( this, &Pipe::_cmdFrameStart ), queue );
     289             :     registerCommand( fabric::CMD_PIPE_FRAME_FINISH,
     290          15 :                      PipeFunc( this, &Pipe::_cmdFrameFinish ), queue );
     291             :     registerCommand( fabric::CMD_PIPE_FRAME_DRAW_FINISH,
     292          15 :                      PipeFunc( this, &Pipe::_cmdFrameDrawFinish ), queue );
     293             :     registerCommand( fabric::CMD_PIPE_FRAME_START_CLOCK,
     294          15 :                      PipeFunc( this, &Pipe::_cmdFrameStartClock ), 0 );
     295             :     registerCommand( fabric::CMD_PIPE_EXIT_THREAD,
     296          15 :                      PipeFunc( this, &Pipe::_cmdExitThread ), queue );
     297             :     registerCommand( fabric::CMD_PIPE_DETACH_VIEW,
     298          15 :                      PipeFunc( this, &Pipe::_cmdDetachView ), queue );
     299             :     registerCommand( fabric::CMD_PIPE_EXIT_TRANSFER_THREAD,
     300             :                      PipeFunc( this, &Pipe::_cmdExitTransferThread ),
     301          15 :                      transferQ );
     302          15 : }
     303             : 
     304        1255 : void Pipe::setDirty( const uint64_t bits )
     305             : {
     306             :     // jump over fabric setDirty to avoid dirty'ing node pipes list
     307             :     // pipes are individually synced in frame finish for thread-safety
     308        1255 :     Object::setDirty( bits );
     309        1255 : }
     310             : 
     311          15 : WindowSystem Pipe::selectWindowSystem() const
     312             : {
     313             : #ifdef AGL
     314             :     return WindowSystem( "AGL" ); // prefer over GLX
     315             : #else
     316          15 :     return WindowSystem();
     317             : #endif
     318             : }
     319             : 
     320          15 : void Pipe::_setupCommandQueue()
     321             : {
     322          45 :     LBINFO << "Set up pipe message pump for " << _impl->windowSystem
     323          45 :            << std::endl;
     324             : 
     325          15 :     Config* config = getConfig();
     326          15 :     config->setupMessagePump( this );
     327             : 
     328          15 :     if( !_impl->thread ) // Non-threaded pipes have no pipe thread message pump
     329          15 :         return;
     330             : 
     331          15 :     CommandQueue* queue = _impl->thread->getWorkerQueue();
     332          15 :     LBASSERT( queue );
     333          15 :     LBASSERT( !queue->getMessagePump( ));
     334             : 
     335          15 :     Global::enterCarbon();
     336          15 :     MessagePump* pump = createMessagePump();
     337          15 :     if( pump )
     338          15 :         pump->dispatchAll(); // initializes _impl->receiverQueue
     339             : 
     340          15 :     queue->setMessagePump( pump );
     341          15 :     Global::leaveCarbon();
     342             : }
     343             : 
     344          15 : int32_t Pipe::_getAutoAffinity() const
     345             : {
     346             : #ifdef EQUALIZER_USE_HWLOC_GL
     347             :     uint32_t port = getPort();
     348             :     uint32_t device = getDevice();
     349             : 
     350             :     if( port == LB_UNDEFINED_UINT32 && device == LB_UNDEFINED_UINT32 )
     351             :         return lunchbox::Thread::NONE;
     352             : 
     353             :     if( port == LB_UNDEFINED_UINT32 )
     354             :         port = 0;
     355             :     if( device == LB_UNDEFINED_UINT32 )
     356             :         device = 0;
     357             : 
     358             :     hwloc_topology_t topology;
     359             :     if( hwloc_topology_init( &topology ) < 0 )
     360             :     {
     361             :         LBINFO << "Automatic pipe thread placement failed: "
     362             :                << "hwloc_topology_init() failed" << std::endl;
     363             :         return lunchbox::Thread::NONE;
     364             :     }
     365             : 
     366             :     // Load I/O devices, bridges and their relevant info
     367             :     const unsigned long loading_flags = HWLOC_TOPOLOGY_FLAG_IO_BRIDGES |
     368             :                                         HWLOC_TOPOLOGY_FLAG_IO_DEVICES;
     369             :     if( hwloc_topology_set_flags( topology, loading_flags ) < 0 )
     370             :     {
     371             :         LBINFO << "Automatic pipe thread placement failed: "
     372             :                << "hwloc_topology_set_flags() failed" << std::endl;
     373             :         hwloc_topology_destroy( topology );
     374             :         return lunchbox::Thread::NONE;
     375             :     }
     376             : 
     377             :     if( hwloc_topology_load( topology ) < 0 )
     378             :     {
     379             :         LBINFO << "Automatic pipe thread placement failed: "
     380             :                << "hwloc_topology_load() failed" << std::endl;
     381             :         hwloc_topology_destroy( topology );
     382             :         return lunchbox::Thread::NONE;
     383             :     }
     384             : 
     385             :     const hwloc_obj_t osdev =
     386             :         hwloc_gl_get_display_osdev_by_port_device( topology,
     387             :                                                    int( port ), int( device ));
     388             :     if( !osdev )
     389             :     {
     390             :         LBINFO << "Automatic pipe thread placement failed: GPU not found"
     391             :                << std::endl;
     392             :         hwloc_topology_destroy( topology );
     393             :         return lunchbox::Thread::NONE;
     394             :     }
     395             : 
     396             :     const hwloc_obj_t pcidev = osdev->parent;
     397             :     const hwloc_obj_t parent = hwloc_get_non_io_ancestor_obj( topology, pcidev);
     398             :     const int numCpus =
     399             :         hwloc_get_nbobjs_inside_cpuset_by_type( topology, parent->cpuset,
     400             :                                                 HWLOC_OBJ_SOCKET );
     401             :     if( numCpus != 1 )
     402             :     {
     403             :         LBINFO << "Automatic pipe thread placement failed: GPU attached to "
     404             :                << numCpus << " processors?" << std::endl;
     405             :         hwloc_topology_destroy( topology );
     406             :         return lunchbox::Thread::NONE;
     407             :     }
     408             : 
     409             :     const hwloc_obj_t cpuObj =
     410             :         hwloc_get_obj_inside_cpuset_by_type( topology, parent->cpuset,
     411             :                                              HWLOC_OBJ_SOCKET, 0 );
     412             :     if( cpuObj == 0 )
     413             :     {
     414             :         LBINFO << "Automatic pipe thread placement failed: "
     415             :                << "hwloc_get_obj_inside_cpuset_by_type() failed" << std::endl;
     416             :         hwloc_topology_destroy( topology );
     417             :         return lunchbox::Thread::NONE;
     418             :     }
     419             : 
     420             :     const int cpuIndex = cpuObj->logical_index;
     421             :     hwloc_topology_destroy( topology );
     422             :     return cpuIndex + lunchbox::Thread::SOCKET;
     423             : #else
     424          45 :     LBINFO << "Automatic thread placement not supported, no hwloc GL support"
     425          45 :            << std::endl;
     426             : #endif
     427          15 :     return lunchbox::Thread::NONE;
     428             : }
     429             : 
     430          15 : void Pipe::_setupAffinity()
     431             : {
     432          15 :     const int32_t affinity = getIAttribute( IATTR_HINT_AFFINITY );
     433          15 :     switch( affinity )
     434             :     {
     435             :         case AUTO:
     436          15 :             lunchbox::Thread::setAffinity( _getAutoAffinity( ));
     437          15 :             break;
     438             : 
     439             :         case OFF:
     440             :         default:
     441           0 :             lunchbox::Thread::setAffinity( affinity );
     442           0 :             break;
     443             :     }
     444          15 : }
     445             : 
     446          15 : void Pipe::_exitCommandQueue()
     447             : {
     448             :     // Non-threaded pipes have no pipe thread message pump
     449          15 :     if( !_impl->thread )
     450          15 :         return;
     451             : 
     452          15 :     CommandQueue* queue = _impl->thread->getWorkerQueue();
     453          15 :     LBASSERT( queue );
     454             : 
     455          15 :     MessagePump* pump = queue->getMessagePump();
     456          15 :     queue->setMessagePump( 0 );
     457          15 :     delete pump;
     458             : }
     459             : 
     460          15 : MessagePump* Pipe::createMessagePump()
     461             : {
     462          15 :     return _impl->windowSystem.createMessagePump();
     463             : }
     464             : 
     465          15 : MessagePump* Pipe::getMessagePump()
     466             : {
     467          15 :     LB_TS_THREAD( _pipeThread );
     468          15 :     if( !_impl->thread )
     469           0 :         return 0;
     470             : 
     471          15 :     CommandQueue* queue = _impl->thread->getWorkerQueue();
     472          15 :     return queue->getMessagePump();
     473             : }
     474             : 
     475          45 : co::CommandQueue* Pipe::getPipeThreadQueue()
     476             : {
     477          45 :     if( _impl->thread )
     478          45 :         return _impl->thread->getWorkerQueue();
     479             : 
     480           0 :     return getNode()->getMainThreadQueue();
     481             : }
     482             : 
     483          30 : co::CommandQueue* Pipe::getTransferThreadQueue()
     484             : {
     485          30 :     return _impl->transferThread.getWorkerQueue();
     486             : }
     487             : 
     488          15 : co::CommandQueue* Pipe::getMainThreadQueue()
     489             : {
     490          15 :     return getServer()->getMainThreadQueue();
     491             : }
     492             : 
     493          15 : co::CommandQueue* Pipe::getCommandThreadQueue()
     494             : {
     495          15 :     return getServer()->getCommandThreadQueue();
     496             : }
     497             : 
     498           4 : Frame* Pipe::getFrame( const co::ObjectVersion& frameVersion, const Eye eye,
     499             :                        const bool isOutput )
     500             : {
     501           4 :     LB_TS_THREAD( _pipeThread );
     502           4 :     Frame* frame = _impl->frames[ frameVersion.identifier ];
     503             : 
     504           4 :     if( !frame )
     505             :     {
     506           4 :         ClientPtr client = getClient();
     507           4 :         frame = new Frame();
     508             : 
     509           4 :         LBCHECK( client->mapObject( frame, frameVersion ));
     510           4 :         _impl->frames[ frameVersion.identifier ] = frame;
     511             :     }
     512             :     else
     513           0 :         frame->sync( frameVersion.version );
     514             : 
     515           4 :     const co::ObjectVersion& dataVersion = frame->getDataVersion( eye );
     516           4 :     LBLOG( LOG_ASSEMBLY ) << "Use " << dataVersion << std::endl;
     517             : 
     518           4 :     FrameDataPtr frameData = getNode()->getFrameData( dataVersion );
     519           4 :     LBASSERT( frameData );
     520             : 
     521           4 :     if( isOutput )
     522             :     {
     523           2 :         if( !frameData->isAttached() )
     524             :         {
     525           2 :             ClientPtr client = getClient();
     526           2 :             LBCHECK( client->mapObject( frameData.get(), dataVersion ));
     527             :         }
     528           0 :         else if( frameData->getVersion() < dataVersion.version )
     529           0 :             frameData->sync( dataVersion.version );
     530             : 
     531           2 :         _impl->outputFrameDatas[ dataVersion.identifier ] = frameData;
     532             :     }
     533             :     else
     534           2 :         _impl->inputFrameDatas[ dataVersion.identifier ] = frameData;
     535             : 
     536           4 :     frame->setFrameData( frameData );
     537           4 :     return frame;
     538             : }
     539             : 
     540          15 : void Pipe::flushFrames( util::ObjectManager& om )
     541             : {
     542          15 :     LB_TS_THREAD( _pipeThread );
     543          15 :     ClientPtr client = getClient();
     544          19 :     for( FrameHashCIter i = _impl->frames.begin(); i !=_impl->frames.end(); ++i)
     545             :     {
     546           4 :         Frame* frame = i->second;
     547           4 :         frame->setFrameData( 0 ); // datas are flushed below
     548           4 :         client->unmapObject( frame );
     549           4 :         delete frame;
     550             :     }
     551          15 :     _impl->frames.clear();
     552             : 
     553          51 :     for( FrameDataHashCIter i = _impl->inputFrameDatas.begin();
     554          34 :          i != _impl->inputFrameDatas.end(); ++i )
     555             :     {
     556           2 :         FrameDataPtr data = i->second;
     557           2 :         data->deleteGLObjects( om );
     558           2 :     }
     559          15 :     _impl->inputFrameDatas.clear();
     560             : 
     561          51 :     for( FrameDataHashCIter i = _impl->outputFrameDatas.begin();
     562          34 :          i != _impl->outputFrameDatas.end(); ++i )
     563             :     {
     564           2 :         FrameDataPtr data = i->second;
     565           2 :         data->resetPlugins();
     566           2 :         data->deleteGLObjects( om );
     567           2 :         client->unmapObject( data.get( ));
     568           2 :         getNode()->releaseFrameData( data );
     569           2 :     }
     570          15 :     _impl->outputFrameDatas.clear();
     571          15 : }
     572             : 
     573           0 : co::QueueSlave* Pipe::getQueue( const uint128_t& queueID )
     574             : {
     575           0 :     LB_TS_THREAD( _pipeThread );
     576           0 :     if( queueID == 0 )
     577           0 :         return 0;
     578             : 
     579           0 :     co::QueueSlave* queue = _impl->queues[ queueID ];
     580           0 :     if( !queue )
     581             :     {
     582           0 :         queue = new co::QueueSlave;
     583           0 :         ClientPtr client = getClient();
     584           0 :         LBCHECK( client->mapObject( queue, queueID ));
     585             : 
     586           0 :         _impl->queues[ queueID ] = queue;
     587             :     }
     588             : 
     589           0 :     return queue;
     590             : }
     591             : 
     592          15 : void Pipe::_flushQueues()
     593             : {
     594          15 :     LB_TS_THREAD( _pipeThread );
     595          15 :     ClientPtr client = getClient();
     596             : 
     597          15 :     for( QueueHashCIter i = _impl->queues.begin(); i !=_impl->queues.end(); ++i)
     598             :     {
     599           0 :         co::QueueSlave* queue = i->second;
     600           0 :         client->unmapObject( queue );
     601           0 :         delete queue;
     602             :     }
     603          15 :     _impl->queues.clear();
     604          15 : }
     605             : 
     606           1 : const View* Pipe::getView( const co::ObjectVersion& viewVersion ) const
     607             : {
     608             :     // Yie-ha: we want to have a const-interface to get a view on the render
     609             :     //         clients, but view mapping is by definition non-const.
     610           1 :     return const_cast< Pipe* >( this )->getView( viewVersion );
     611             : }
     612             : 
     613           1 : View* Pipe::getView( const co::ObjectVersion& viewVersion ) // Return the cached view for viewVersion, creating and mapping it on first use.
     614             : {
     615           1 :     LB_TS_THREAD( _pipeThread );
     616           1 :     if( viewVersion.identifier == 0 )
     617           0 :         return 0; // a zero identifier yields no view
     618             : 
     619           1 :     View* view = _impl->views[ viewVersion.identifier ];
     620           1 :     if( !view )
     621             :     {
     622           1 :         NodeFactory* nodeFactory = Global::getNodeFactory();
     623           1 :         view = nodeFactory->createView( 0 );
     624           1 :         LBASSERT( view );
     625           1 :         view->_pipe = this; // back-pointer, reset to 0 again when the view is released
     626           1 :         ClientPtr client = getClient();
     627           1 :         LBCHECK( client->mapObject( view, viewVersion ));
     628             : 
     629           1 :         _impl->views[ viewVersion.identifier ] = view; // cache for subsequent lookups
     630             :     }
     631             : 
     632           1 :     view->sync( viewVersion.version ); // synchronize the view to the requested version
     633           1 :     return view;
     634             : }
     635             : 
     636           7 : void Pipe::_releaseViews() // Commit all cached views and release those lagging 20 or more versions behind their head version.
     637             : {
     638           7 :     LB_TS_THREAD( _pipeThread );
     639          21 :     for( bool changed = true; changed; ) // rescan from the start after every erase
     640             :     {
     641           7 :         changed = false;
     642          16 :         for( ViewHashIter i =_impl->views.begin(); i !=_impl->views.end(); ++i )
     643             :         {
     644           1 :             View* view = i->second;
     645           1 :             view->commit();
     646           1 :             if( view->getVersion() + 20 > view->getHeadVersion( ))
     647           1 :                 continue; // less than 20 versions behind head: keep the view
     648             : 
     649             :             // release unused view to avoid memory leaks due to deltas piling up
     650           0 :             view->_pipe = 0;
     651             : 
     652           0 :             ClientPtr client = getClient();
     653           0 :             client->unmapObject( view );
     654           0 :             _impl->views.erase( i );
     655             : 
     656           0 :             NodeFactory* nodeFactory = Global::getNodeFactory();
     657           0 :             nodeFactory->releaseView( view );
     658             : 
     659           0 :             changed = true;
     660           0 :             break; // i was erased above; leave the inner loop and restart the scan
     661           0 :         }
     662             :     }
     663           7 : }
     664             : 
     665          15 : void Pipe::_flushViews() // Unmap and release every cached view, then empty the view table.
     666             : {
     667          15 :     LB_TS_THREAD( _pipeThread );
     668          15 :     NodeFactory* nodeFactory = Global::getNodeFactory();
     669          15 :     ClientPtr client = getClient();
     670             : 
     671          16 :     for( ViewHashCIter i = _impl->views.begin(); i != _impl->views.end(); ++i )
     672             :     {
     673           1 :         View* view = i->second;
     674             : 
     675           1 :         client->unmapObject( view );
     676           1 :         view->_pipe = 0; // clear the back-pointer before handing the view to the factory
     677           1 :         nodeFactory->releaseView( view );
     678             :     }
     679          15 :     _impl->views.clear(); // entries now refer to released views
     680          15 : }
     681             : 
     682          15 : void Pipe::startThread() // Create the render thread for this pipe and start it.
     683             : {
     684          15 :     _impl->thread = new detail::RenderThread( this ); // deleted in exitThread()
     685          15 :     _impl->thread->start();
     686          15 : }
     687             : 
     688          15 : void Pipe::exitThread() // Stop the transfer thread, then ask the render thread to exit, join it and delete it.
     689             : {
     690          15 :     _stopTransferThread();
     691             : 
     692          15 :     if( !_impl->thread )
     693          15 :         return; // no render thread was started
     694             : 
     695          15 :     send( getLocalNode(), fabric::CMD_PIPE_EXIT_THREAD ); // handled by _cmdExitThread()
     696             : 
     697          15 :     _impl->thread->join(); // block until the render thread has terminated
     698          15 :     delete _impl->thread;
     699          15 :     _impl->thread = 0;
     700             : }
     701             : 
     702          90 : void Pipe::cancelThread() // Stop the transfer thread and issue the exit-thread command without joining the render thread.
     703             : {
     704          90 :     _stopTransferThread();
     705             : 
     706          90 :     if( !_impl->thread )
     707         180 :         return; // no render thread to cancel
     708             : 
     709             :     // local command dispatching
     710             :     co::ObjectOCommand( this, getLocalNode(), fabric::CMD_PIPE_EXIT_THREAD, // NOTE(review): unnamed temporary; presumably dispatched when it is destroyed - confirm
     711           0 :                         co::COMMANDTYPE_OBJECT, getID(), CO_INSTANCE_ALL );
     712             : }
     713             : 
     714          15 : void Pipe::waitExited() const // Block until the pipe state is greater than or equal to STATE_STOPPED.
     715             : {
     716          15 :     _impl->state.waitGE( STATE_STOPPED );
     717          15 : }
     718             : 
     719          31 : bool Pipe::isRunning() const // @return true if the pipe state equals STATE_RUNNING.
     720             : {
     721          31 :     return (_impl->state == STATE_RUNNING);
     722             : }
     723             : 
     724          15 : bool Pipe::isStopped() const // @return true if the pipe state equals STATE_STOPPED.
     725             : {
     726          15 :     return (_impl->state == STATE_STOPPED);
     727             : }
     728             : 
     729          15 : void Pipe::notifyMapped() // Move the pipe from STATE_STOPPED to STATE_MAPPED.
     730             : {
     731          15 :     LBASSERT( _impl->state == STATE_STOPPED ); // expected to be called on a stopped pipe only
     732          15 :     _impl->state = STATE_MAPPED;
     733          15 : }
     734             : 
     735             : namespace
     736             : {
     737           7 : class WaitFinishedVisitor : public PipeVisitor // Visitor which waits on each visited channel for a given frame to finish.
     738             : {
     739             : public:
     740           7 :     WaitFinishedVisitor( const uint32_t frame ) : _frame( frame ) {} // @param frame the frame number to wait for
     741             : 
     742           7 :     virtual VisitorResult visit( Channel* channel ) // Wait on the channel, then continue the traversal.
     743             :         {
     744           7 :             channel->waitFrameFinished( _frame );
     745           7 :             return TRAVERSE_CONTINUE;
     746             :         }
     747             : 
     748             : private:
     749             :     const uint32_t _frame; // frame number passed to Channel::waitFrameFinished()
     750             : };
     751             : }
     752             : 
     753           7 : void Pipe::waitFrameFinished( const uint32_t frameNumber ) const // Block until this pipe and the channels visited below have finished frameNumber.
     754             : {
     755           7 :     _impl->finishedFrame.waitGE( frameNumber ); // first wait for the pipe's own finished-frame counter
     756           7 :     WaitFinishedVisitor waiter( frameNumber );
     757           7 :     accept( waiter ); // then wait on every channel the visitor reaches
     758           7 : }
     759             : 
     760          14 : void Pipe::waitFrameLocal( const uint32_t frameNumber ) const // Block until the locally unlocked frame is at least frameNumber.
     761             : {
     762          14 :     _impl->unlockedFrame.waitGE( frameNumber );
     763          14 : }
     764             : 
     765          57 : uint32_t Pipe::getCurrentFrame() const // @return the frame number last stored by startFrame() or _cmdConfigInit().
     766             : {
     767          57 :     LB_TS_THREAD( _pipeThread );
     768          57 :     return _impl->currentFrame;
     769             : }
     770             : 
     771           0 : uint32_t Pipe::getFinishedFrame() const // @return the current value of the finished-frame counter.
     772             : {
     773           0 :     return _impl->finishedFrame.get();
     774             : }
     775             : 
     776          32 : WindowSystem Pipe::getWindowSystem() const // @return a copy of the window system stored for this pipe.
     777             : {
     778          32 :     return _impl->windowSystem;
     779             : }
     780             : 
     781           0 : EventOCommand Pipe::sendError( const uint32_t error ) // Forward error to the config as an Event::PIPE_ERROR originating from this pipe's ID.
     782             : {
     783           0 :     return getConfig()->sendError( Event::PIPE_ERROR, getID(), error );
     784             : }
     785             : 
     786           2 : bool Pipe::processEvent( const Event& event ) // Wrap event in a ConfigEvent and send it through the config; always returns true.
     787             : {
     788           2 :     ConfigEvent configEvent( event );
     789           2 :     getConfig()->sendEvent( configEvent );
     790           2 :     return true;
     791             : }
     792             : 
     793             : //---------------------------------------------------------------------------
     794             : // pipe-thread methods
     795             : //---------------------------------------------------------------------------
     796          15 : bool Pipe::configInit( const uint128_t& initID ) // Initialize the system pipe and, in CUDA builds, optionally a compute context.
     797             : {
     798          15 :     LB_TS_THREAD( _pipeThread );
     799             : 
     800          15 :     LBASSERT( !_impl->systemPipe ); // must not be initialized twice
     801             : 
     802          15 :     if ( !configInitSystemPipe( initID ))
     803           0 :         return false; // @return false if the system pipe could not be initialized
     804             : 
     805             :     // -------------------------------------------------------------------------
     806          15 :     LBASSERT(!_impl->computeContext);
     807             : 
     808             :     // for now we only support CUDA
     809             : #ifdef EQUALIZER_USE_CUDA
     810             :     if( getIAttribute( IATTR_HINT_CUDA_GL_INTEROP ) == eq::ON )
     811             :     {
     812             :         LBINFO << "Initializing CUDAContext" << std::endl;
     813             :         ComputeContext* computeCtx = new CUDAContext( this );
     814             : 
     815             :         if( !computeCtx->configInit() )
     816             :         {
     817             :             LBWARN << "GPU Computing context initialization failed "
     818             :                    << std::endl;
     819             :             delete computeCtx;
     820             :             return false; // NOTE(review): the system pipe set above stays installed on this path
     821             :         }
     822             :         setComputeContext( computeCtx ); // released again in configExit()
     823             :     }
     824             : #endif
     825             : 
     826          15 :     return true;
     827             : }
     828             : 
     829          15 : bool Pipe::configInitSystemPipe( const uint128_t& ) // Create and initialize the SystemPipe from the pipe's window system; the init ID is unused.
     830             : {
     831          15 :     SystemPipe* systemPipe = _impl->windowSystem.createPipe( this );
     832          15 :     LBASSERT( systemPipe );
     833             : 
     834          15 :     if( !systemPipe->configInit( ))
     835             :     {
     836           0 :         LBERROR << "System pipe context initialization failed" << std::endl;
     837           0 :         delete systemPipe; // never installed, so it has to be freed here
     838           0 :         return false;
     839             :     }
     840             : 
     841          15 :     setSystemPipe( systemPipe ); // stored on the pipe; deleted in configExit()
     842          15 :     return true;
     843             : }
     844             : 
     845          15 : bool Pipe::configExit() // Exit and delete the compute context and the system pipe, if set; always returns true.
     846             : {
     847          15 :     LB_TS_THREAD( _pipeThread );
     848             : 
     849          15 :     if( _impl->computeContext )
     850             :     {
     851           0 :         _impl->computeContext->configExit();
     852           0 :         delete _impl->computeContext;
     853           0 :         _impl->computeContext = 0;
     854             :     }
     855             : 
     856          15 :     if( _impl->systemPipe )
     857             :     {
     858          15 :         _impl->systemPipe->configExit( );
     859          15 :         delete _impl->systemPipe;
     860          15 :         _impl->systemPipe = 0; // allows a later configInit() to pass its assertion
     861             :     }
     862          15 :     return true;
     863             : }
     864             : 
     865             : 
     866           7 : void Pipe::frameStart( const uint128_t&, const uint32_t frameNumber ) // Start a frame according to the node's thread model; the frame ID is unused.
     867             : {
     868           7 :     LB_TS_THREAD( _pipeThread );
     869             : 
     870           7 :     const Node* node = getNode();
     871           7 :     switch( node->getIAttribute( Node::IATTR_THREAD_MODEL ))
     872             :     {
     873             :         case ASYNC:      // No sync, release immediately
     874           0 :             releaseFrameLocal( frameNumber );
     875           0 :             break;
     876             : 
     877             :         case DRAW_SYNC:  // Sync, release in frameDrawFinish
     878             :         case LOCAL_SYNC: // Sync, release in frameFinish
     879           7 :             node->waitFrameStarted( frameNumber );
     880           7 :             break;
     881             : 
     882             :         default:
     883           0 :             LBUNIMPLEMENTED;
     884             :     }
     885             : 
     886           7 :     startFrame( frameNumber ); // records frameNumber as the current frame
     887           7 : }
     888             : 
     889           7 : void Pipe::frameDrawFinish( const uint128_t&, const uint32_t frameNumber ) // Release the frame locally if the thread model is DRAW_SYNC; the frame ID is unused.
     890             : {
     891           7 :     const Node* node = getNode();
     892           7 :     switch( node->getIAttribute( Node::IATTR_THREAD_MODEL ))
     893             :     {
     894             :         case ASYNC:      // released in frameStart
     895           0 :             break;
     896             : 
     897             :         case DRAW_SYNC:  // release
     898           7 :             releaseFrameLocal( frameNumber );
     899           7 :             break;
     900             : 
     901             :         case LOCAL_SYNC: // release in frameFinish
     902           0 :             break;
     903             : 
     904             :         default:
     905           0 :             LBUNIMPLEMENTED;
     906             :     }
     907           7 : }
     908             : 
     909           7 : void Pipe::frameFinish( const uint128_t&, const uint32_t frameNumber ) // Release the frame locally for LOCAL_SYNC, then release it globally; the frame ID is unused.
     910             : {
     911           7 :     const Node* node = getNode();
     912           7 :     switch( node->getIAttribute( Node::IATTR_THREAD_MODEL ))
     913             :     {
     914             :         case ASYNC:      // released in frameStart
     915           0 :             break;
     916             : 
     917             :         case DRAW_SYNC:  // released in frameDrawFinish
     918           7 :             break;
     919             : 
     920             :         case LOCAL_SYNC: // release
     921           0 :             releaseFrameLocal( frameNumber );
     922           0 :             break;
     923             : 
     924             :         default:
     925           0 :             LBUNIMPLEMENTED;
     926             :     }
     927             : 
     928             :     // Global release
     929           7 :     releaseFrame( frameNumber );
     930           7 : }
     931             : 
     932           7 : void Pipe::startFrame( const uint32_t frameNumber ) // Record frameNumber as the current frame and log it.
     933             : {
     934           7 :     LB_TS_THREAD( _pipeThread );
     935           7 :     _impl->currentFrame = frameNumber;
     936           7 :     LBLOG( LOG_TASKS ) << "---- Started Frame ---- "<< frameNumber << std::endl;
     937           7 : }
     938             : 
     939           7 : void Pipe::releaseFrame( const uint32_t frameNumber ) // Record frameNumber as finished; waitFrameFinished() waits on this counter.
     940             : {
     941           7 :     LB_TS_THREAD( _pipeThread );
     942           7 :     _impl->finishedFrame = frameNumber;
     943           7 :     LBLOG( LOG_TASKS ) << "---- Finished Frame --- "<< frameNumber << std::endl;
     944           7 : }
     945             : 
     946           7 : void Pipe::releaseFrameLocal( const uint32_t frameNumber ) // Record frameNumber as locally unlocked; waitFrameLocal() waits on this counter.
     947             : {
     948           7 :     LB_TS_THREAD( _pipeThread );
     949           7 :     LBASSERTINFO( _impl->unlockedFrame + 1 == frameNumber,
     950             :                   _impl->unlockedFrame << ", " << frameNumber );
     951             : // The assertion above expects frames to be unlocked one at a time, in order.
     952           7 :     _impl->unlockedFrame = frameNumber;
     953           7 :     LBLOG( LOG_TASKS ) << "---- Unlocked Frame --- "
     954           7 :                        << _impl->unlockedFrame.get() << std::endl;
     955           7 : }
     956             : 
     957           2 : bool Pipe::startTransferThread() // Start the transfer thread unless it is already running.
     958             : {
     959           2 :     if( _impl->transferThread.isRunning( ))
     960           0 :         return true; // already running, nothing to do
     961             : 
     962           2 :     return _impl->transferThread.start(); // @return the result of starting the thread
     963             : }
     964             : 
     965          15 : bool Pipe::hasTransferThread() const // @return true if the transfer thread is currently running.
     966             : {
     967          15 :     return _impl->transferThread.isRunning();
     968             : }
     969             : 
     970         105 : void Pipe::_stopTransferThread() // Ask the transfer thread to exit and join it, unless it is already stopped.
     971             : {
     972         105 :     if( _impl->transferThread.isStopped( ))
     973         208 :         return; // nothing to stop
     974             : 
     975           2 :     send( getLocalNode(), fabric::CMD_PIPE_EXIT_TRANSFER_THREAD ); // handled by _cmdExitTransferThread()
     976           2 :     _impl->transferThread.join(); // block until the transfer thread has terminated
     977             : }
     978             : 
     979          15 : void Pipe::setSystemPipe( SystemPipe* pipe ) // Store pipe as the system pipe; a previously stored pointer is overwritten, not deleted.
     980             : {
     981          15 :     _impl->systemPipe = pipe;
     982          15 : }
     983             : 
     984          17 : SystemPipe* Pipe::getSystemPipe() // @return the stored system pipe, or 0 if none is set.
     985             : {
     986          17 :     return _impl->systemPipe;
     987             : }
     988             : 
     989           0 : const SystemPipe* Pipe::getSystemPipe() const // @return the stored system pipe (const access), or 0 if none is set.
     990             : {
     991           0 :     return _impl->systemPipe;
     992             : }
     993             : 
     994           0 : void Pipe::setComputeContext( ComputeContext* ctx ) // Store ctx as the compute context; a previously stored pointer is overwritten, not deleted.
     995             : {
     996           0 :     _impl->computeContext = ctx;
     997           0 : }
     998             : 
     999           0 : const ComputeContext* Pipe::getComputeContext() const // @return the stored compute context (const access), or 0 if none is set.
    1000             : {
    1001           0 :     return _impl->computeContext;
    1002             : }
    1003             : 
    1004           0 : ComputeContext* Pipe::getComputeContext() // @return the stored compute context, or 0 if none is set.
    1005             : {
    1006           0 :     return _impl->computeContext;
    1007             : }
    1008             : 
    1009             : //---------------------------------------------------------------------------
    1010             : // command handlers
    1011             : //---------------------------------------------------------------------------
    1012          15 : bool Pipe::_cmdCreateWindow( co::ICommand& cmd ) // Command handler: create a window through the node factory and map it to the ID read from cmd.
    1013             : {
    1014          15 :     co::ObjectICommand command( cmd );
    1015          15 :     const uint128_t& windowID = command.read< uint128_t >();
    1016             : 
    1017          15 :     LBLOG( LOG_INIT ) << "Create window " << command << " id " << windowID
    1018          15 :                       << std::endl;
    1019             : 
    1020          15 :     Window* window = Global::getNodeFactory()->createWindow( this );
    1021          15 :     window->init(); // not in ctor, virtual method
    1022             : 
    1023          15 :     Config* config = getConfig();
    1024          15 :     LBCHECK( config->mapObject( window, windowID ));
    1025             : 
    1026          15 :     return true; // always reports the command as handled
    1027             : }
    1028             : 
    1029          15 : bool Pipe::_cmdDestroyWindow( co::ICommand& cmd ) // Command handler: re-target context sharing away from the window, reply to the server, then unmap and release it.
    1030             : {
    1031          15 :     co::ObjectICommand command( cmd );
    1032             : 
    1033          15 :     LBLOG( LOG_INIT ) << "Destroy window " << command << std::endl;
    1034             : 
    1035          15 :     Window* window = _findWindow( command.read< uint128_t >( ));
    1036          15 :     LBASSERT( window );
    1037             : 
    1038             :     // re-set shared windows accordingly
    1039          15 :     Window* newSharedWindow = 0;
    1040          15 :     const Windows& windows = getWindows();
    1041          30 :     for( Windows::const_iterator i = windows.begin(); i != windows.end(); ++i )
    1042             :     {
    1043          15 :         Window* candidate = *i;
    1044             : 
    1045          15 :         if( candidate == window )
    1046          15 :             continue; // ignore
    1047             : 
    1048           0 :         if( candidate->getSharedContextWindow() == window )
    1049             :         {
    1050           0 :             if( newSharedWindow )
    1051           0 :                 candidate->setSharedContextWindow( newSharedWindow ); // later sharers point at the first one found
    1052             :             else
    1053             :             {
    1054           0 :                 newSharedWindow = candidate; // first sharer becomes the new share target...
    1055           0 :                 newSharedWindow->setSharedContextWindow( candidate ); // ...and is set to share with itself
    1056             :             }
    1057             :         }
    1058             : 
    1059           0 :         LBASSERT( candidate->getSharedContextWindow() != window );
    1060             :     }
    1061             : 
    1062          15 :     const bool stopped = window->isStopped(); // sampled before the window is unmapped and released
    1063             :     window->send( getServer(),
    1064          15 :                   fabric::CMD_WINDOW_CONFIG_EXIT_REPLY ) << stopped;
    1065             : 
    1066          15 :     Config* config = getConfig();
    1067          15 :     config->unmapObject( window );
    1068          15 :     Global::getNodeFactory()->releaseWindow( window );
    1069             : 
    1070          15 :     return true;
    1071             : }
    1072             : 
    1073          15 : bool Pipe::_cmdConfigInit( co::ICommand& cmd ) // Command handler: initialize the pipe and send the result back to the sender of cmd.
    1074             : {
    1075          15 :     LB_TS_THREAD( _pipeThread );
    1076             : 
    1077          15 :     co::ObjectICommand command( cmd );
    1078          15 :     const uint128_t& initID = command.read< uint128_t >();
    1079          15 :     const uint32_t frameNumber = command.read< uint32_t >();
    1080             : 
    1081          15 :     LBLOG( LOG_INIT ) << "Init pipe " << command << " init id " << initID
    1082          15 :                       << " frame " << frameNumber << std::endl;
    1083             : 
    1084          15 :     if( !isThreaded( )) // non-threaded pipes select the window system and set up the command queue here
    1085             :     {
    1086           0 :         _impl->windowSystem = selectWindowSystem();
    1087           0 :         _setupCommandQueue();
    1088             :     }
    1089             : 
    1090          15 :     Node* node = getNode();
    1091          15 :     LBASSERT( node );
    1092          15 :     node->waitInitialized();
    1093             : 
    1094          15 :     bool result = false;
    1095          15 :     if( node->isRunning( ))
    1096             :     {
    1097          15 :         _impl->currentFrame  = frameNumber; // all three frame counters start at the frame read from cmd
    1098          15 :         _impl->finishedFrame = frameNumber;
    1099          15 :         _impl->unlockedFrame = frameNumber;
    1100          15 :         _impl->state = STATE_INITIALIZING;
    1101             : 
    1102          15 :         result = configInit( initID );
    1103             : 
    1104          15 :         if( result )
    1105          15 :             _impl->state = STATE_RUNNING; // on failure the state is left at STATE_INITIALIZING
    1106             :     }
    1107             :     else
    1108           0 :         sendError( ERROR_PIPE_NODE_NOTRUNNING );
    1109             : 
    1110          15 :     LBLOG( LOG_INIT ) << "TASK pipe config init reply result " << result
    1111          15 :                       << std::endl;
    1112             : 
    1113          15 :     commit();
    1114             :     send( command.getRemoteNode(), fabric::CMD_PIPE_CONFIG_INIT_REPLY )
    1115          15 :             << result;
    1116          15 :     return true; // handled, independent of the init result
    1117             : }
    1118             : 
    1119          15 : bool Pipe::_cmdConfigExit( co::ICommand& cmd ) // Command handler: flush views and queues, run configExit() and set the final pipe state.
    1120             : {
    1121          15 :     co::ObjectICommand command( cmd );
    1122             : 
    1123          15 :     LB_TS_THREAD( _pipeThread );
    1124          15 :     LBLOG( LOG_INIT ) << "TASK pipe config exit " << command << std::endl;
    1125             : 
    1126          15 :     _impl->state = STATE_STOPPING; // needed in View::detach (from _flushViews)
    1127             : 
    1128             :     // send before node gets a chance to send its destroy command
    1129          15 :     getNode()->send( getLocalNode(), fabric::CMD_NODE_DESTROY_PIPE ) << getID();
    1130             : 
    1131             :     // Flush views before exit since they are created after init
    1132             :     // - application may need initialized pipe to exit
    1133             :     // - configExit can't access views since all channels are gone already
    1134          15 :     _flushViews();
    1135          15 :     _flushQueues();
    1136          15 :     _impl->state = configExit() ? STATE_STOPPED : STATE_FAILED; // waitExited() waits for a state >= STATE_STOPPED
    1137          15 :     return true;
    1138             : }
    1139             : 
    1140          15 : bool Pipe::_cmdExitThread( co::ICommand& ) // Command handler: detach this pipe from its render thread; the command payload is unused.
    1141             : {
    1142          15 :     LBASSERT( _impl->thread );
    1143          15 :     _impl->thread->_pipe = 0; // NOTE(review): presumably the render thread's loop ends once its pipe is 0 - confirm in RenderThread
    1144          15 :     return true;
    1145             : }
    1146             : 
    1147           2 : bool Pipe::_cmdExitTransferThread( co::ICommand& ) // Command handler: post a stop request to the transfer thread; the command payload is unused.
    1148             : {
    1149           2 :     _impl->transferThread.postStop();
    1150           2 :     return true;
    1151             : }
    1152             : 
    1153           7 : bool Pipe::_cmdFrameStartClock( co::ICommand& ) // Command handler: append the current config time to the frame time list.
    1154             : {
    1155           7 :     LBVERB << "start frame clock" << std::endl;
    1156           7 :     _impl->frameTimeMutex.set(); // frameTimes is modified only between set() and unset()
    1157           7 :     _impl->frameTimes.push_back( getConfig()->getTime( )); // consumed from the front in _cmdFrameStart()
    1158           7 :     _impl->frameTimeMutex.unset();
    1159           7 :     return true;
    1160             : }
    1161             : 
    1162           7 : bool Pipe::_cmdFrameStart( co::ICommand& cmd ) // Command handler: sync to the given version, take the next frame time, fill idle statistics and call frameStart().
    1163             : {
    1164           7 :     LB_TS_THREAD( _pipeThread );
    1165             : 
    1166           7 :     co::ObjectICommand command( cmd );
    1167           7 :     const uint128_t& version = command.read< uint128_t >();
    1168           7 :     const uint128_t& frameID = command.read< uint128_t >();
    1169           7 :     const uint32_t frameNumber = command.read< uint32_t >();
    1170             : 
    1171           7 :     LBVERB << "handle pipe frame start " << command << " frame " << frameNumber
    1172           7 :            << " id " << frameID << std::endl;
    1173             : 
    1174           7 :     LBLOG( LOG_TASKS ) << "---- TASK start frame ---- frame " << frameNumber
    1175           7 :                        << " id " << frameID << std::endl;
    1176           7 :     sync( version );
    1177           7 :     const int64_t lastFrameTime = _impl->frameTime; // previous frame's time, kept for the statistics below
    1178             : 
    1179           7 :     _impl->frameTimeMutex.set();
    1180           7 :     LBASSERT( !_impl->frameTimes.empty( )); // _cmdFrameStartClock() must have queued a time first
    1181             : 
    1182           7 :     _impl->frameTime = _impl->frameTimes.front();
    1183           7 :     _impl->frameTimes.pop_front();
    1184           7 :     _impl->frameTimeMutex.unset();
    1185             : 
    1186           7 :     if( lastFrameTime > 0 ) // skipped while no positive previous frame time is stored
    1187             :     {
    1188           2 :         PipeStatistics waitEvent( Statistic::PIPE_IDLE, this ); // NOTE(review): only filled in here; presumably emitted when it goes out of scope - confirm
    1189             :         waitEvent.event.data.statistic.idleTime =
    1190           2 :             _impl->thread ? _impl->thread->getWorkerQueue()->resetWaitTime() :0;
    1191             :         waitEvent.event.data.statistic.totalTime =
    1192           2 :             LB_MAX( _impl->frameTime - lastFrameTime, 1 ); // avoid SIGFPE
    1193             :     }
    1194             : 
    1195           7 :     LBASSERTINFO( _impl->currentFrame + 1 == frameNumber,
    1196             :                   "current " <<_impl->currentFrame << " start " << frameNumber);
    1197             : 
    1198           7 :     frameStart( frameID, frameNumber );
    1199           7 :     return true;
    1200             : }
    1201             : 
    1202           7 : bool Pipe::_cmdFrameFinish( co::ICommand& cmd ) // Command handler: call frameFinish(), enforce missing releases, prune stale views and commit.
    1203             : {
    1204           7 :     LB_TS_THREAD( _pipeThread );
    1205             : 
    1206           7 :     co::ObjectICommand command( cmd );
    1207           7 :     const uint128_t& frameID = command.read< uint128_t >();
    1208           7 :     const uint32_t frameNumber = command.read< uint32_t >();
    1209             : 
    1210           7 :     LBLOG( LOG_TASKS ) << "---- TASK finish frame --- " << command << " frame "
    1211           7 :                        << frameNumber << " id " << frameID << std::endl;
    1212             : 
    1213           7 :     LBASSERTINFO( _impl->currentFrame >= frameNumber,
    1214             :                   "current " <<_impl->currentFrame << " finish " <<frameNumber);
    1215             : 
    1216           7 :     frameFinish( frameID, frameNumber );
    1217             : 
    1218           7 :     LBASSERTINFO( _impl->finishedFrame >= frameNumber,
    1219             :                   "Pipe::frameFinish() did not release frame " << frameNumber );
    1220             : 
    1221           7 :     if( _impl->unlockedFrame < frameNumber ) // frameFinish() did not unlock the frame locally
    1222             :     {
    1223           0 :         LBWARN << "Finished frame was not locally unlocked, enforcing unlock"
    1224           0 :                << std::endl << "    unlocked " << _impl->unlockedFrame.get()
    1225           0 :                << " done " << frameNumber << std::endl;
    1226           0 :         releaseFrameLocal( frameNumber );
    1227             :     }
    1228             : 
    1229           7 :     if( _impl->finishedFrame < frameNumber ) // frameFinish() did not release the frame globally
    1230             :     {
    1231           0 :         LBWARN << "Finished frame was not released, enforcing unlock"
    1232           0 :                << std::endl;
    1233           0 :         releaseFrame( frameNumber );
    1234             :     }
    1235             : 
    1236           7 :     _releaseViews();
    1237             : 
    1238           7 :     const uint128_t version = commit();
    1239           7 :     if( version != co::VERSION_NONE ) // send the sync command only when commit() returned a version
    1240           5 :         send( command.getRemoteNode(), fabric::CMD_OBJECT_SYNC );
    1241           7 :     return true;
    1242             : }
    1243             : 
    1244           7 : bool Pipe::_cmdFrameDrawFinish( co::ICommand& cmd ) // Command handler: read frame ID and number from cmd and call frameDrawFinish().
    1245             : {
    1246           7 :     LB_TS_THREAD( _pipeThread );
    1247             : 
    1248           7 :     co::ObjectICommand command( cmd );
    1249           7 :     const uint128_t& frameID = command.read< uint128_t >();
    1250           7 :     const uint32_t frameNumber = command.read< uint32_t >();
    1251             : 
    1252           7 :     LBLOG( LOG_TASKS ) << "TASK draw finish " << getName()
    1253           0 :                        << " frame " << frameNumber << " id " << frameID
    1254           7 :                        << std::endl;
    1255             : 
    1256           7 :     frameDrawFinish( frameID, frameNumber );
    1257           7 :     return true;
    1258             : }
    1259             : 
    1260           0 : bool Pipe::_cmdDetachView( co::ICommand& cmd ) // Command handler: drop the view with the ID read from cmd from the cache and release it.
    1261             : {
    1262           0 :     co::ObjectICommand command( cmd );
    1263             : 
    1264           0 :     LB_TS_THREAD( _pipeThread );
    1265             : 
    1266           0 :     ViewHash::iterator i = _impl->views.find( command.read< uint128_t >( ));
    1267           0 :     if( i != _impl->views.end( )) // unknown IDs are ignored
    1268             :     {
    1269           0 :         View* view = i->second;
    1270           0 :         _impl->views.erase( i );
    1271             : // NOTE(review): unlike _releaseViews(), no unmapObject() call and no _pipe reset here - confirm this is handled elsewhere.
    1272           0 :         NodeFactory* nodeFactory = Global::getNodeFactory();
    1273           0 :         nodeFactory->releaseView( view );
    1274             :     }
    1275           0 :     return true;
    1276             : }
    1277             : 
    1278             : }
    1279             : 
    1280             : #include "../fabric/pipe.ipp"
    1281             : template class eq::fabric::Pipe< eq::Node, eq::Pipe, eq::Window,
    1282             :                                  eq::PipeVisitor >;
    1283             : 
    1284             : /** @cond IGNORE */
    1285             : template EQFABRIC_API std::ostream& eq::fabric::operator << ( std::ostream&,
    1286          36 :                                                 const eq::Super& );
    1287             : /** @endcond */

Generated by: LCOV version 1.10