LCOV - code coverage report
Current view: top level - eq/client - node.cpp (source / functions) Hit Total Coverage
Test: lcov2.info Lines: 288 377 76.4 %
Date: 2014-06-18 Functions: 48 56 85.7 %

          Line data    Source code
       1             : 
       2             : /* Copyright (c) 2005-2014, Stefan Eilemann <eile@equalizergraphics.com>
       3             :  *                    2012, Daniel Nachbaur <danielnachbaur@gmail.com>
       4             :  *                    2010, Cedric Stalder<cedric.stalder@gmail.com>
       5             :  *
       6             :  * This library is free software; you can redistribute it and/or modify it under
       7             :  * the terms of the GNU Lesser General Public License version 2.1 as published
       8             :  * by the Free Software Foundation.
       9             :  *
      10             :  * This library is distributed in the hope that it will be useful, but WITHOUT
      11             :  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
      12             :  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
      13             :  * details.
      14             :  *
      15             :  * You should have received a copy of the GNU Lesser General Public License
      16             :  * along with this library; if not, write to the Free Software Foundation, Inc.,
      17             :  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
      18             :  */
      19             : 
      20             : #include "node.h"
      21             : 
      22             : #include "client.h"
      23             : #include "config.h"
      24             : #include "error.h"
      25             : #include "exception.h"
      26             : #include "frameData.h"
      27             : #include "global.h"
      28             : #include "log.h"
      29             : #include "nodeFactory.h"
      30             : #include "nodeStatistics.h"
      31             : #include "pipe.h"
      32             : #include "server.h"
      33             : 
      34             : #include <eq/fabric/commands.h>
      35             : #include <eq/fabric/elementVisitor.h>
      36             : #include <eq/fabric/task.h>
      37             : 
      38             : #include <co/barrier.h>
      39             : #include <co/connection.h>
      40             : #include <co/global.h>
      41             : #include <co/objectICommand.h>
      42             : #include <lunchbox/scopedMutex.h>
      43             : 
      44             : namespace eq
      45             : {
      46             : namespace
      47             : {
      48             : typedef stde::hash_map< uint128_t, co::Barrier* > BarrierHash;
      49             : typedef stde::hash_map< uint128_t, FrameDataPtr > FrameDataHash;
      50             : typedef FrameDataHash::const_iterator FrameDataHashCIter;
      51             : typedef FrameDataHash::iterator FrameDataHashIter;
      52             : 
      53             : enum State
      54             : {
      55             :     STATE_STOPPED,
      56             :     STATE_INITIALIZING,
      57             :     STATE_INIT_FAILED,
      58             :     STATE_RUNNING,
      59             :     STATE_FAILED
      60             : };
      61             : }
      62             : 
      63             : namespace detail
      64             : {
      65             : class TransmitThread : public lunchbox::Thread
      66             : {
      67             : public:
      68          38 :     TransmitThread()
      69          38 :         : _queue( co::Global::getCommandQueueLimit( ))
      70          38 :     {}
      71           8 :     virtual ~TransmitThread() {}
      72             : 
      73          61 :     co::CommandQueue& getQueue() { return _queue; }
      74             : 
      75             : protected:
      76           8 :     bool init() override { setName( "Xmit" ); return true; }
      77             :     void run() override;
      78             : 
      79             : private:
      80             :     co::CommandQueue _queue;
      81             : };
      82             : 
      83           8 : class Node
      84             : {
      85             : public:
      86          38 :     Node()
      87             :         : state( STATE_STOPPED )
      88             :         , finishedFrame( 0 )
      89          38 :         , unlockedFrame( 0 )
      90          38 :     {}
      91             : 
      92             :     /** The configInit/configExit state. */
      93             :     lunchbox::Monitor< State > state;
      94             : 
      95             :     /** The number of the last started frame. */
      96             :     lunchbox::Monitor< uint32_t > currentFrame;
      97             : 
      98             :     /** The number of the last finished frame. */
      99             :     uint32_t finishedFrame;
     100             : 
     101             :     /** The number of the last locally released frame. */
     102             :     uint32_t unlockedFrame;
     103             : 
     104             :     /** All barriers mapped by the node. */
     105             :     lunchbox::Lockable< BarrierHash > barriers;
     106             : 
     107             :     /** All frame datas used by the node during rendering. */
     108             :     lunchbox::Lockable< FrameDataHash > frameDatas;
     109             : 
     110             :     TransmitThread transmitter;
     111             : };
     112             : 
     113             : }
     114             : 
     115             : /** @cond IGNORE */
     116             : typedef co::CommandFunc<Node> NodeFunc;
     117             : typedef fabric::Node< Config, Node, Pipe, NodeVisitor > Super;
     118             : /** @endcond */
     119             : 
     120          38 : Node::Node( Config* parent )
     121             :     : Super( parent )
     122          38 :     , _impl( new detail::Node )
     123             : {
     124          38 : }
     125             : 
     126          22 : Node::~Node()
     127             : {
     128           8 :     LBASSERT( getPipes().empty( ));
     129           8 :     delete _impl;
     130          14 : }
     131             : 
     132           8 : void Node::attach( const uint128_t& id, const uint32_t instanceID )
     133             : {
     134           8 :     Super::attach( id, instanceID );
     135             : 
     136           8 :     co::CommandQueue* queue = getMainThreadQueue();
     137           8 :     co::CommandQueue* commandQ = getCommandThreadQueue();
     138           8 :     co::CommandQueue* transmitQ = getTransmitterQueue();
     139             : 
     140             :     registerCommand( fabric::CMD_NODE_CREATE_PIPE,
     141           8 :                      NodeFunc( this, &Node::_cmdCreatePipe ), queue );
     142             :     registerCommand( fabric::CMD_NODE_DESTROY_PIPE,
     143           8 :                      NodeFunc( this, &Node::_cmdDestroyPipe ), queue );
     144             :     registerCommand( fabric::CMD_NODE_CONFIG_INIT,
     145           8 :                      NodeFunc( this, &Node::_cmdConfigInit ), queue );
     146             :     registerCommand( fabric::CMD_NODE_SET_AFFINITY,
     147           8 :                      NodeFunc( this, &Node::_cmdSetAffinity), transmitQ );
     148             :     registerCommand( fabric::CMD_NODE_CONFIG_EXIT,
     149           8 :                      NodeFunc( this, &Node::_cmdConfigExit ), queue );
     150             :     registerCommand( fabric::CMD_NODE_FRAME_START,
     151           8 :                      NodeFunc( this, &Node::_cmdFrameStart ), queue );
     152             :     registerCommand( fabric::CMD_NODE_FRAME_FINISH,
     153           8 :                      NodeFunc( this, &Node::_cmdFrameFinish ), queue );
     154             :     registerCommand( fabric::CMD_NODE_FRAME_DRAW_FINISH,
     155           8 :                      NodeFunc( this, &Node::_cmdFrameDrawFinish ), queue );
     156             :     registerCommand( fabric::CMD_NODE_FRAME_TASKS_FINISH,
     157           8 :                      NodeFunc( this, &Node::_cmdFrameTasksFinish ), queue );
     158             :     registerCommand( fabric::CMD_NODE_FRAMEDATA_TRANSMIT,
     159           8 :                      NodeFunc( this, &Node::_cmdFrameDataTransmit ), commandQ );
     160             :     registerCommand( fabric::CMD_NODE_FRAMEDATA_READY,
     161           8 :                      NodeFunc( this, &Node::_cmdFrameDataReady ), commandQ );
     162           8 : }
     163             : 
     164           8 : void Node::setDirty( const uint64_t bits )
     165             : {
     166             :     // jump over fabric setDirty to avoid dirty'ing config node list
     167             :     // nodes are individually synced in frame finish
     168           8 :     Object::setDirty( bits );
     169           8 : }
     170             : 
     171          60 : ClientPtr Node::getClient()
     172             : {
     173          60 :     Config* config = getConfig();
     174          60 :     LBASSERT( config );
     175          60 :     return ( config ? config->getClient() : 0 );
     176             : }
     177             : 
     178          87 : ServerPtr Node::getServer()
     179             : {
     180          87 :     Config* config = getConfig();
     181          87 :     LBASSERT( config );
     182          87 :     return ( config ? config->getServer() : 0 );
     183             : }
     184             : 
     185           8 : co::CommandQueue* Node::getMainThreadQueue()
     186             : {
     187           8 :     return getConfig()->getMainThreadQueue();
     188             : }
     189             : 
     190           8 : co::CommandQueue* Node::getCommandThreadQueue()
     191             : {
     192           8 :     return getConfig()->getCommandThreadQueue();
     193             : }
     194             : 
     195          61 : co::CommandQueue* Node::getTransmitterQueue()
     196             : {
     197          61 :     return &_impl->transmitter.getQueue();
     198             : }
     199             : 
     200           0 : uint32_t Node::getCurrentFrame() const
     201             : {
     202           0 :     return _impl->currentFrame.get();
     203             : }
     204             : 
     205           6 : uint32_t Node::getFinishedFrame() const
     206             : {
     207           6 :     return _impl->finishedFrame;
     208             : }
     209             : 
     210           0 : co::Barrier* Node::getBarrier( const co::ObjectVersion& barrier )
     211             : {
     212           0 :     lunchbox::ScopedMutex<> mutex( _impl->barriers );
     213           0 :     co::Barrier* netBarrier = _impl->barriers.data[ barrier.identifier ];
     214             : 
     215           0 :     if( netBarrier )
     216           0 :         netBarrier->sync( barrier.version );
     217             :     else
     218             :     {
     219           0 :         ClientPtr client = getClient();
     220             : 
     221           0 :         netBarrier = new co::Barrier( client, barrier );
     222           0 :         if( !netBarrier->isGood( ))
     223             :         {
     224           0 :             LBCHECK( netBarrier->isGood( ));
     225           0 :             LBWARN << "Could not map swap barrier" << std::endl;
     226           0 :             delete netBarrier;
     227           0 :             return 0;
     228             :         }
     229           0 :         _impl->barriers.data[ barrier.identifier ] = netBarrier;
     230             :     }
     231             : 
     232           0 :     return netBarrier;
     233             : }
     234             : 
     235          10 : FrameDataPtr Node::getFrameData( const co::ObjectVersion& frameDataVersion )
     236             : {
     237          10 :     lunchbox::ScopedWrite mutex( _impl->frameDatas );
     238          10 :     FrameDataPtr data = _impl->frameDatas.data[ frameDataVersion.identifier ];
     239             : 
     240          10 :     if( !data )
     241             :     {
     242           2 :         data = new FrameData;
     243           2 :         data->setID( frameDataVersion.identifier );
     244           2 :         _impl->frameDatas.data[ frameDataVersion.identifier ] = data;
     245             :     }
     246             : 
     247          10 :     LBASSERT( frameDataVersion.version.high() == 0 );
     248          10 :     data->setVersion( frameDataVersion.version.low( ));
     249          10 :     return data;
     250             : }
     251             : 
     252           2 : void Node::releaseFrameData( FrameDataPtr data )
     253             : {
     254           2 :     lunchbox::ScopedWrite mutex( _impl->frameDatas );
     255           2 :     FrameDataHashIter i = _impl->frameDatas->find( data->getID( ));
     256           2 :     LBASSERT( i != _impl->frameDatas->end( ));
     257           2 :     if( i == _impl->frameDatas->end( ))
     258           2 :         return;
     259             : 
     260           2 :     _impl->frameDatas->erase( i );
     261             : }
     262             : 
     263          15 : void Node::waitInitialized() const
     264             : {
     265          15 :     _impl->state.waitGE( STATE_INIT_FAILED );
     266          15 : }
     267             : 
     268          15 : bool Node::isRunning() const
     269             : {
     270          15 :     return _impl->state == STATE_RUNNING;
     271             : }
     272             : 
     273           8 : bool Node::isStopped() const
     274             : {
     275           8 :     return _impl->state == STATE_STOPPED;
     276             : }
     277             : 
     278           8 : bool Node::configInit( const uint128_t& )
     279             : {
     280           8 :     WindowSystem::configInit( this );
     281           8 :     return true;
     282             : }
     283             : 
     284           8 : bool Node::configExit()
     285             : {
     286           8 :     WindowSystem::configExit( this );
     287           8 :     return true;
     288             : }
     289             : 
     290           8 : void Node::_setAffinity()
     291             : {
     292           8 :     const int32_t affinity = getIAttribute( IATTR_HINT_AFFINITY );
     293           8 :     switch( affinity )
     294             :     {
     295             :         case OFF:
     296           0 :             break;
     297             : 
     298             :         case AUTO:
     299             :             // TODO
     300           8 :             LBVERB << "No automatic thread placement for node threads "
     301           8 :                    << std::endl;
     302           8 :             break;
     303             : 
     304             :         default:
     305           0 :             co::LocalNodePtr node = getLocalNode();
     306           0 :             send( node, fabric::CMD_NODE_SET_AFFINITY ) << affinity;
     307             : 
     308           0 :             node->setAffinity( affinity );
     309           0 :             break;
     310             :     }
     311           8 : }
     312             : 
     313           7 : void Node::waitFrameStarted( const uint32_t frameNumber ) const
     314             : {
     315           7 :     _impl->currentFrame.waitGE( frameNumber );
     316           7 : }
     317             : 
     318           5 : void Node::startFrame( const uint32_t frameNumber )
     319             : {
     320           5 :     _impl->currentFrame = frameNumber;
     321           5 : }
     322             : 
     323           5 : void Node::frameFinish( const uint128_t&, const uint32_t frameNumber )
     324             : {
     325           5 :     releaseFrame( frameNumber );
     326           5 : }
     327             : 
     328           5 : void Node::_finishFrame( const uint32_t frameNumber ) const
     329             : {
     330           5 :     const Pipes& pipes = getPipes();
     331          12 :     for( Pipes::const_iterator i = pipes.begin(); i != pipes.end(); ++i )
     332             :     {
     333           7 :         const Pipe* pipe = *i;
     334           7 :         LBASSERT( pipe->isThreaded() || pipe->getFinishedFrame()>=frameNumber );
     335             : 
     336           7 :         pipe->waitFrameLocal( frameNumber );
     337           7 :         pipe->waitFrameFinished( frameNumber );
     338             :     }
     339           5 : }
     340             : 
     341           5 : void Node::_frameFinish( const uint128_t& frameID,
     342             :                          const uint32_t frameNumber )
     343             : {
     344           5 :     frameFinish( frameID, frameNumber );
     345           5 :     LBLOG( LOG_TASKS ) << "---- Finished Frame --- " << frameNumber
     346           5 :                        << std::endl;
     347             : 
     348           5 :     if( _impl->unlockedFrame < frameNumber )
     349             :     {
     350           0 :         LBWARN << "Finished frame was not locally unlocked, enforcing unlock"
     351           0 :                << std::endl;
     352           0 :         releaseFrameLocal( frameNumber );
     353             :     }
     354             : 
     355           5 :     if( _impl->finishedFrame < frameNumber )
     356             :     {
     357           0 :         LBWARN << "Finished frame was not released, enforcing unlock"
     358           0 :                << std::endl;
     359           0 :         releaseFrame( frameNumber );
     360             :     }
     361           5 : }
     362             : 
     363           5 : void Node::releaseFrame( const uint32_t frameNumber )
     364             : {
     365           5 :     LBASSERTINFO( _impl->currentFrame >= frameNumber,
     366             :                   "current " << _impl->currentFrame << " release " <<
     367             :                   frameNumber );
     368             : 
     369           5 :     if( _impl->finishedFrame >= frameNumber )
     370           5 :         return;
     371           5 :     _impl->finishedFrame = frameNumber;
     372             : 
     373           5 :     Config* config = getConfig();
     374           5 :     ServerPtr server = config->getServer();
     375          10 :     co::NodePtr node = server.get();
     376          10 :     send( node, fabric::CMD_NODE_FRAME_FINISH_REPLY ) << frameNumber;
     377             : }
     378             : 
     379           5 : void Node::releaseFrameLocal( const uint32_t frameNumber )
     380             : {
     381           5 :     LBASSERT( _impl->unlockedFrame <= frameNumber );
     382           5 :     _impl->unlockedFrame = frameNumber;
     383             : 
     384           5 :     Config* config = getConfig();
     385           5 :     LBASSERT( config->getNodes().size() == 1 );
     386           5 :     LBASSERT( config->getNodes()[0] == this );
     387           5 :     config->releaseFrameLocal( frameNumber );
     388             : 
     389           5 :     LBLOG( LOG_TASKS ) << "---- Unlocked Frame --- " << _impl->unlockedFrame
     390           5 :                        << std::endl;
     391           5 : }
     392             : 
     393           5 : void Node::frameStart( const uint128_t&, const uint32_t frameNumber )
     394             : {
     395           5 :     startFrame( frameNumber ); // unlock pipe threads
     396             : 
     397           5 :     switch( getIAttribute( IATTR_THREAD_MODEL ))
     398             :     {
     399             :         case ASYNC:
     400             :             // Don't wait for pipes to release frame locally, sync not needed
     401           0 :             releaseFrameLocal( frameNumber );
     402           0 :             break;
     403             : 
     404             :         case DRAW_SYNC:  // Sync and release in frameDrawFinish
     405             :         case LOCAL_SYNC: // Sync and release in frameTasksFinish
     406           5 :             break;
     407             : 
     408             :         default:
     409           0 :             LBUNIMPLEMENTED;
     410             :     }
     411           5 : }
     412             : 
     413           5 : void Node::frameDrawFinish( const uint128_t&, const uint32_t frameNumber )
     414             : {
     415           5 :     switch( getIAttribute( IATTR_THREAD_MODEL ))
     416             :     {
     417             :         case ASYNC:      // No sync, release in frameStart
     418             :         case LOCAL_SYNC: // Sync and release in frameTasksFinish
     419           0 :             break;
     420             : 
     421             :         case DRAW_SYNC:
     422             :         {
     423           5 :             const Pipes& pipes = getPipes();
     424          36 :             for( Pipes::const_iterator i = pipes.begin();
     425          24 :                  i != pipes.end(); ++i )
     426             :             {
     427           7 :                 const Pipe* pipe = *i;
     428           7 :                 if( pipe->getTasks() & fabric::TASK_DRAW )
     429           7 :                     pipe->waitFrameLocal( frameNumber );
     430             :             }
     431             : 
     432           5 :             releaseFrameLocal( frameNumber );
     433           5 :             break;
     434             :         }
     435             :         default:
     436           0 :             LBUNIMPLEMENTED;
     437             :     }
     438           5 : }
     439             : 
     440           5 : void Node::frameTasksFinish( const uint128_t&, const uint32_t frameNumber )
     441             : {
     442           5 :     switch( getIAttribute( IATTR_THREAD_MODEL ))
     443             :     {
     444             :         case ASYNC:      // No sync, release in frameStart
     445             :         case DRAW_SYNC:  // Sync and release in frameDrawFinish
     446           5 :             break;
     447             : 
     448             :         case LOCAL_SYNC:
     449             :         {
     450           0 :             const Pipes& pipes = getPipes();
     451           0 :             for( Pipes::const_iterator i = pipes.begin(); i != pipes.end(); ++i)
     452             :             {
     453           0 :                 const Pipe* pipe = *i;
     454           0 :                 if( pipe->getTasks() != fabric::TASK_NONE )
     455           0 :                     pipe->waitFrameLocal( frameNumber );
     456             :             }
     457             : 
     458           0 :             releaseFrameLocal( frameNumber );
     459           0 :             break;
     460             :         }
     461             :         default:
     462           0 :             LBUNIMPLEMENTED;
     463             :     }
     464           5 : }
     465             : 
     466             : 
     467           0 : EventOCommand Node::sendError( const uint32_t error )
     468             : {
     469           0 :     return getConfig()->sendError( Event::NODE_ERROR, getID(), error );
     470             : }
     471             : 
     472           0 : bool Node::processEvent( const Event& event )
     473             : {
     474           0 :     ConfigEvent configEvent( event );
     475           0 :     getConfig()->sendEvent( configEvent );
     476           0 :     return true;
     477             : }
     478             : 
     479           8 : void Node::_flushObjects()
     480             : {
     481           8 :     ClientPtr client = getClient();
     482             :     {
     483           8 :         lunchbox::ScopedMutex<> mutex( _impl->barriers );
     484          24 :         for( BarrierHash::const_iterator i = _impl->barriers->begin();
     485          16 :              i != _impl->barriers->end(); ++i )
     486             :         {
     487           0 :             delete i->second;
     488             :         }
     489           8 :         _impl->barriers->clear();
     490             :     }
     491             : 
     492          16 :     lunchbox::ScopedMutex<> mutex( _impl->frameDatas );
     493          24 :     for( FrameDataHashCIter i = _impl->frameDatas->begin();
     494          16 :          i != _impl->frameDatas->end(); ++i )
     495             :     {
     496           0 :         FrameDataPtr frameData = i->second;
     497           0 :         frameData->resetPlugins();
     498           0 :         client->unmapObject( frameData.get( ));
     499           0 :     }
     500          16 :     _impl->frameDatas->clear();
     501           8 : }
     502             : 
     503          10 : void detail::TransmitThread::run()
     504             : {
     505             :     while( true )
     506             :     {
     507          10 :         co::ICommand command = _queue.pop();
     508          10 :         if( !command.isValid( ))
     509          16 :             return; // exit thread
     510             : 
     511           2 :         LBCHECK( command( ));
     512           2 :     }
     513             : }
     514             : 
     515          30 : void Node::dirtyClientExit()
     516             : {
     517          30 :     const Pipes& pipes = getPipes();
     518         120 :     for( PipesCIter i = pipes.begin(); i != pipes.end(); ++i )
     519             :     {
     520          90 :         Pipe* pipe = *i;
     521          90 :         pipe->cancelThread();
     522             :     }
     523          30 :     getTransmitterQueue()->push( co::ICommand( )); // wake up to exit
     524          30 :     _impl->transmitter.join();
     525          30 : }
     526             : 
     527             : //---------------------------------------------------------------------------
     528             : // command handlers
     529             : //---------------------------------------------------------------------------
     530          15 : bool Node::_cmdCreatePipe( co::ICommand& cmd )
     531             : {
     532          15 :     LB_TS_THREAD( _nodeThread );
     533          15 :     LBASSERT( _impl->state >= STATE_INIT_FAILED );
     534             : 
     535          15 :     co::ObjectICommand command( cmd );
     536          15 :     const uint128_t& pipeID = command.read< uint128_t >();
     537          15 :     const bool threaded = command.read< bool >();
     538             : 
     539          15 :     LBLOG( LOG_INIT ) << "Create pipe " << command << " id " << pipeID
     540          15 :                       << std::endl;
     541             : 
     542          15 :     Pipe* pipe = Global::getNodeFactory()->createPipe( this );
     543          15 :     if( threaded )
     544          15 :         pipe->startThread();
     545             : 
     546          15 :     Config* config = getConfig();
     547          15 :     LBCHECK( config->mapObject( pipe, pipeID ));
     548          15 :     pipe->notifyMapped();
     549             : 
     550          15 :     return true;
     551             : }
     552             : 
     553          15 : bool Node::_cmdDestroyPipe( co::ICommand& cmd )
     554             : {
     555          15 :     co::ObjectICommand command( cmd );
     556             : 
     557          15 :     LB_TS_THREAD( _nodeThread );
     558          15 :     LBLOG( LOG_INIT ) << "Destroy pipe " << command << std::endl;
     559             : 
     560          15 :     Pipe* pipe = findPipe( command.read< uint128_t >( ));
     561          15 :     LBASSERT( pipe );
     562          15 :     pipe->exitThread();
     563             : 
     564          15 :     const bool stopped = pipe->isStopped();
     565             : 
     566          15 :     Config* config = getConfig();
     567          15 :     config->unmapObject( pipe );
     568          15 :     pipe->send( getServer(), fabric::CMD_PIPE_CONFIG_EXIT_REPLY ) << stopped;
     569          15 :     Global::getNodeFactory()->releasePipe( pipe );
     570             : 
     571          15 :     return true;
     572             : }
     573             : 
     574           8 : bool Node::_cmdConfigInit( co::ICommand& cmd )
     575             : {
     576           8 :     co::ObjectICommand command( cmd );
     577             : 
     578           8 :     LB_TS_THREAD( _nodeThread );
     579           8 :     LBLOG( LOG_INIT ) << "Init node " << command << std::endl;
     580             : 
     581           8 :     _impl->state = STATE_INITIALIZING;
     582             : 
     583           8 :     const uint128_t& initID = command.read< uint128_t >();
     584           8 :     const uint32_t frameNumber = command.read< uint32_t >();
     585             : 
     586           8 :     _impl->currentFrame  = frameNumber;
     587           8 :     _impl->unlockedFrame = frameNumber;
     588           8 :     _impl->finishedFrame = frameNumber;
     589           8 :     _setAffinity();
     590             : 
     591           8 :     _impl->transmitter.start();
     592           8 :     const uint64_t result = configInit( initID );
     593             : 
     594           8 :     if( getIAttribute( IATTR_THREAD_MODEL ) == eq::UNDEFINED )
     595           8 :         setIAttribute( IATTR_THREAD_MODEL, eq::DRAW_SYNC );
     596             : 
     597           8 :     _impl->state = result ? STATE_RUNNING : STATE_INIT_FAILED;
     598             : 
     599           8 :     commit();
     600             :     send( command.getRemoteNode(), fabric::CMD_NODE_CONFIG_INIT_REPLY )
     601           8 :             << result;
     602           8 :     return true;
     603             : }
     604             : 
     605           8 : bool Node::_cmdConfigExit( co::ICommand& cmd )
     606             : {
     607           8 :     co::ObjectICommand command( cmd );
     608             : 
     609           8 :     LB_TS_THREAD( _nodeThread );
     610           8 :     LBLOG( LOG_INIT ) << "Node exit " << command << std::endl;
     611             : 
     612           8 :     const Pipes& pipes = getPipes();
     613          23 :     for( PipesCIter i = pipes.begin(); i != pipes.end(); ++i )
     614             :     {
     615          15 :         Pipe* pipe = *i;
     616          15 :         pipe->waitExited();
     617             :     }
     618             : 
     619           8 :     _impl->state = configExit() ? STATE_STOPPED : STATE_FAILED;
     620           8 :     getTransmitterQueue()->push( co::ICommand( )); // wake up to exit
     621           8 :     _impl->transmitter.join();
     622           8 :     _flushObjects();
     623             : 
     624           8 :     getConfig()->send( getLocalNode(),
     625          16 :                        fabric::CMD_CONFIG_DESTROY_NODE ) << getID();
     626           8 :     return true;
     627             : }
     628             : /** Frame start command handler: syncs the config and this node to the received versions, then calls frameStart(). Always returns true. */
     629           5 : bool Node::_cmdFrameStart( co::ICommand& cmd )
     630             : {
     631           5 :     LB_TS_THREAD( _nodeThread );
     632             :     // Read, in this order: node version, config version, frame ID, frame number.
     633           5 :     co::ObjectICommand command( cmd );
     634           5 :     const uint128_t& version = command.read< uint128_t >();
     635           5 :     const uint128_t& configVersion = command.read< uint128_t >();
     636           5 :     const uint128_t& frameID = command.read< uint128_t >();
     637           5 :     const uint32_t frameNumber = command.read< uint32_t >();
     638             : 
     639           5 :     LBVERB << "handle node frame start " << command << " frame " << frameNumber
     640           5 :            << " id " << frameID << std::endl;
     641             :     // The assertion expects the new frame to directly follow _impl->currentFrame.
     642           5 :     LBASSERT( _impl->currentFrame == frameNumber-1 );
     643             : 
     644           5 :     LBLOG( LOG_TASKS ) << "----- Begin Frame ----- " << frameNumber
     645           5 :                        << std::endl;
     646             : 
     647           5 :     Config* config = getConfig();
     648             :     // The config is synced only when the command carried a config version other than co::VERSION_INVALID.
     649           5 :     if( configVersion != co::VERSION_INVALID )
     650           0 :         config->sync( configVersion );
     651           5 :     sync( version );
     652             : 
     653           5 :     config->_frameStart();
     654           5 :     frameStart( frameID, frameNumber );
     655             :     // frameStart() is expected to have advanced _impl->currentFrame to at least frameNumber.
     656           5 :     LBASSERTINFO( _impl->currentFrame >= frameNumber,
     657             :                   "Node::frameStart() did not start frame " << frameNumber );
     658           5 :     return true;
     659             : }
     660             : /** Frame finish command handler: finishes the given frame, commits this node and sends CMD_OBJECT_SYNC if the commit produced a version. Always returns true. */
     661           5 : bool Node::_cmdFrameFinish( co::ICommand& cmd )
     662             : {
     663           5 :     LB_TS_THREAD( _nodeThread );
     664             :     // Read, in this order: frame ID, frame number.
     665           5 :     co::ObjectICommand command( cmd );
     666           5 :     const uint128_t& frameID = command.read< uint128_t >();
     667           5 :     const uint32_t frameNumber = command.read< uint32_t >();
     668             : 
     669           5 :     LBLOG( LOG_TASKS ) << "TASK frame finish " << getName() <<  " " << command
     670           0 :                        << " frame " << frameNumber << " id " << frameID
     671           5 :                        << std::endl;
     672             : 
     673           5 :     _finishFrame( frameNumber );
     674           5 :     _frameFinish( frameID, frameNumber );
     675             :     // CMD_OBJECT_SYNC is sent to command.getNode() only when commit() returns something other than co::VERSION_NONE.
     676           5 :     const uint128_t version = commit();
     677           5 :     if( version != co::VERSION_NONE )
     678           0 :         send( command.getNode(), fabric::CMD_OBJECT_SYNC );
     679           5 :     return true;
     680             : }
     681             : /** Draw finish command handler: reads the frame ID and frame number and forwards them to frameDrawFinish(). Always returns true. */
     682           5 : bool Node::_cmdFrameDrawFinish( co::ICommand& cmd )
     683             : {
     684           5 :     co::ObjectICommand command( cmd );
     685           5 :     const uint128_t& frameID = command.read< uint128_t >();
     686           5 :     const uint32_t frameNumber = command.read< uint32_t >();
     687             : 
     688           5 :     LBLOG( LOG_TASKS ) << "TASK draw finish " << getName() <<  " " << command
     689           0 :                        << " frame " << frameNumber << " id " << frameID
     690           5 :                        << std::endl;
     691             : 
     692           5 :     frameDrawFinish( frameID, frameNumber );
     693           5 :     return true;
     694             : }
     695             : /** Tasks finish command handler: reads the frame ID and frame number and forwards them to frameTasksFinish(). Always returns true. */
     696           5 : bool Node::_cmdFrameTasksFinish( co::ICommand& cmd )
     697             : {
     698           5 :     co::ObjectICommand command( cmd );
     699           5 :     const uint128_t& frameID = command.read< uint128_t >();
     700           5 :     const uint32_t frameNumber = command.read< uint32_t >();
     701             :     // NOTE(review): unlike the frame finish and draw finish handlers, this log line omits the frame number and ID.
     702           5 :     LBLOG( LOG_TASKS ) << "TASK tasks finish " << getName() <<  " " << command
     703           5 :                        << std::endl;
     704             : 
     705           5 :     frameTasksFinish( frameID, frameNumber );
     706           5 :     return true;
     707             : }
     708             : /** Frame data transmit command handler: reads the image description and payload from the command and adds it as an image to the matching frame data. Always returns true. */
     709           0 : bool Node::_cmdFrameDataTransmit( co::ICommand& cmd )
     710             : {
     711           0 :     co::ObjectICommand command( cmd );
     712             :     // Read, in this order: frame data version, pixel viewport, zoom, buffers, frame number, alpha flag.
     713             :     const co::ObjectVersion& frameDataVersion =
     714           0 :                                              command.read< co::ObjectVersion >();
     715           0 :     const PixelViewport& pvp = command.read< PixelViewport >();
     716           0 :     const Zoom& zoom = command.read< Zoom >();
     717           0 :     const uint32_t buffers = command.read< uint32_t >();
     718           0 :     const uint32_t frameNumber = command.read< uint32_t >();
     719           0 :     const bool useAlpha = command.read< bool >();
     720             :     const uint8_t* data = reinterpret_cast< const uint8_t* >( // everything left in the command buffer is the image payload
     721           0 :                 command.getRemainingBuffer( command.getRemainingBufferSize( )));
     722             : 
     723           0 :     LBLOG( LOG_ASSEMBLY )
     724           0 :         << "received image data for " << frameDataVersion << ", buffers "
     725           0 :         << buffers << " pvp " << pvp << std::endl;
     726             : 
     727           0 :     LBASSERT( pvp.isValid( ));
     728             :     // NOTE(review): frameData is dereferenced below without an LBASSERT( frameData ), unlike _cmdFrameDataReady - confirm getFrameData() never returns null here.
     729           0 :     FrameDataPtr frameData = getFrameData( frameDataVersion );
     730           0 :     LBASSERT( !frameData->isReady() );
     731             :     // NOTE(review): the event object presumably records a NODE_FRAME_DECOMPRESS statistic for the rest of this scope - confirm.
     732             :     NodeStatistics event( Statistic::NODE_FRAME_DECOMPRESS, this,
     733           0 :                           frameNumber );
     734             : 
     735             :     // Note on the const_cast: since the PixelData structure stores non-const
     736             :     // pointers, we have to go non-const at some point, even though we do not
     737             :     // modify the data.
     738           0 :     LBCHECK( frameData->addImage( frameDataVersion, pvp, zoom, buffers,
     739             :                                   useAlpha, const_cast< uint8_t* >( data )));
     740           0 :     return true;
     741             : }
     742             : /** Frame data ready command handler: looks up the frame data for the received version and marks it ready with the received data. Always returns true. */
     743           0 : bool Node::_cmdFrameDataReady( co::ICommand& cmd )
     744             : {
     745           0 :     co::ObjectICommand command( cmd );
     746             :     // Read, in this order: frame data version, FrameData::Data.
     747             :     const co::ObjectVersion& frameDataVersion =
     748           0 :                                             command.read< co::ObjectVersion >();
     749           0 :     const FrameData::Data& data = command.read< FrameData::Data >();
     750             : 
     751           0 :     LBLOG( LOG_ASSEMBLY ) << "received ready for " << frameDataVersion
     752           0 :                           << std::endl;
     753           0 :     FrameDataPtr frameData = getFrameData( frameDataVersion );
     754           0 :     LBASSERT( frameData );
     755           0 :     LBASSERT( !frameData->isReady() ); // must not already be ready before setReady()
     756           0 :     frameData->setReady( frameDataVersion, data );
     757           0 :     LBASSERT( frameData->isReady() ); // setReady() is expected to flip the ready state
     758           0 :     return true;
     759             : }
     760             : /** Set affinity command handler: passes the int32_t read from the command to lunchbox::Thread::setAffinity(). Always returns true. */
     761           0 : bool Node::_cmdSetAffinity( co::ICommand& cmd )
     762             : {
     763           0 :     co::ObjectICommand command( cmd );
     764             :     // NOTE(review): setAffinity() presumably applies to the thread executing this handler - confirm.
     765           0 :     lunchbox::Thread::setAffinity( command.read< int32_t >( ));
     766           0 :     return true;
     767             : }
     768             : }
     769             : // Include the fabric::Node template implementation and explicitly instantiate it for eq::Config, eq::Node, eq::Pipe and eq::NodeVisitor.
     770             : #include "../fabric/node.ipp"
     771             : template class eq::fabric::Node< eq::Config, eq::Node, eq::Pipe,
     772             :                                  eq::NodeVisitor >;
     773             : // Explicit instantiation of the stream output operator for eq::Super.
     774             : /** @cond IGNORE */
     775             : template EQFABRIC_API std::ostream& eq::fabric::operator << ( std::ostream&,
     776          36 :                                                              const eq::Super& );
     777             : /** @endcond */

Generated by: LCOV version 1.10