LCOV - code coverage report
Current view: top level - eq - node.cpp (source / functions) Hit Total Coverage
Test: Equalizer Lines: 155 379 40.9 %
Date: 2016-07-30 05:04:55 Functions: 31 56 55.4 %

          Line data    Source code
       1             : 
       2             : /* Copyright (c) 2005-2016, Stefan Eilemann <eile@equalizergraphics.com>
       3             :  *                          Daniel Nachbaur <danielnachbaur@gmail.com>
       4             :  *                          Cedric Stalder<cedric.stalder@gmail.com>
       5             :  *
       6             :  * This library is free software; you can redistribute it and/or modify it under
       7             :  * the terms of the GNU Lesser General Public License version 2.1 as published
       8             :  * by the Free Software Foundation.
       9             :  *
      10             :  * This library is distributed in the hope that it will be useful, but WITHOUT
      11             :  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
      12             :  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
      13             :  * details.
      14             :  *
      15             :  * You should have received a copy of the GNU Lesser General Public License
      16             :  * along with this library; if not, write to the Free Software Foundation, Inc.,
      17             :  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
      18             :  */
      19             : 
      20             : #include "node.h"
      21             : 
      22             : #include "client.h"
      23             : #include "config.h"
      24             : #include "error.h"
      25             : #include "exception.h"
      26             : #include "frameData.h"
      27             : #include "global.h"
      28             : #include "log.h"
      29             : #include "nodeFactory.h"
      30             : #include "nodeStatistics.h"
      31             : #include "pipe.h"
      32             : #include "server.h"
      33             : 
      34             : #include <eq/fabric/commands.h>
      35             : #include <eq/fabric/elementVisitor.h>
      36             : #include <eq/fabric/frameData.h>
      37             : #include <eq/fabric/task.h>
      38             : 
      39             : #include <co/barrier.h>
      40             : #include <co/connection.h>
      41             : #include <co/global.h>
      42             : #include <co/objectICommand.h>
      43             : #include <lunchbox/scopedMutex.h>
      44             : 
      45             : namespace eq
      46             : {
      47             : namespace
      48             : {
      49             : typedef stde::hash_map< uint128_t, co::Barrier* > BarrierHash;
      50             : typedef stde::hash_map< uint128_t, FrameDataPtr > FrameDataHash;
      51             : typedef FrameDataHash::const_iterator FrameDataHashCIter;
      52             : typedef FrameDataHash::iterator FrameDataHashIter;
      53             : 
      54             : enum State
      55             : {
      56             :     STATE_STOPPED,
      57             :     STATE_INITIALIZING,
      58             :     STATE_INIT_FAILED,
      59             :     STATE_RUNNING,
      60             :     STATE_FAILED
      61             : };
      62             : }
      63             : 
      64             : namespace detail
      65             : {
      66             : class TransmitThread : public lunchbox::Thread
      67             : {
      68             : public:
      69          31 :     TransmitThread()
      70          31 :         : _queue( co::Global::getCommandQueueLimit( ))
      71          31 :     {}
      72           1 :     virtual ~TransmitThread() {}
      73             : 
      74          33 :     co::CommandQueue& getQueue() { return _queue; }
      75             : 
      76             : protected:
      77           1 :     bool init() override { setName( "Xmit" ); return true; }
      78             :     void run() override;
      79             : 
      80             : private:
      81             :     co::CommandQueue _queue;
      82             : };
      83             : 
      84           1 : class Node
      85             : {
      86             : public:
      87          31 :     Node()
      88             :         : state( STATE_STOPPED )
      89             :         , finishedFrame( 0 )
      90          31 :         , unlockedFrame( 0 )
      91          31 :     {}
      92             : 
      93             :     /** The configInit/configExit state. */
      94             :     lunchbox::Monitor< State > state;
      95             : 
      96             :     /** The number of the last started frame. */
      97             :     lunchbox::Monitor< uint32_t > currentFrame;
      98             : 
      99             :     /** The number of the last finished frame. */
     100             :     uint32_t finishedFrame;
     101             : 
     102             :     /** The number of the last locally released frame. */
     103             :     uint32_t unlockedFrame;
     104             : 
     105             :     /** All barriers mapped by the node. */
     106             :     lunchbox::Lockable< BarrierHash > barriers;
     107             : 
     108             :     /** All frame datas used by the node during rendering. */
     109             :     lunchbox::Lockable< FrameDataHash > frameDatas;
     110             : 
     111             :     TransmitThread transmitter;
     112             : };
     113             : 
     114             : }
     115             : 
     116             : /** @cond IGNORE */
     117             : typedef co::CommandFunc<Node> NodeFunc;
     118             : typedef fabric::Node< Config, Node, Pipe, NodeVisitor > Super;
     119             : /** @endcond */
     120             : 
     121          31 : Node::Node( Config* parent )
     122             :     : Super( parent )
     123          31 :     , _impl( new detail::Node )
     124             : {
     125          31 : }
     126             : 
     127           3 : Node::~Node()
     128             : {
     129           1 :     LBASSERT( getPipes().empty( ));
     130           1 :     delete _impl;
     131           2 : }
     132             : 
     133           1 : void Node::attach( const uint128_t& id, const uint32_t instanceID )
     134             : {
     135           1 :     Super::attach( id, instanceID );
     136             : 
     137           1 :     co::CommandQueue* queue = getMainThreadQueue();
     138           1 :     co::CommandQueue* commandQ = getCommandThreadQueue();
     139           1 :     co::CommandQueue* transmitQ = getTransmitterQueue();
     140             : 
     141             :     registerCommand( fabric::CMD_NODE_CREATE_PIPE,
     142           1 :                      NodeFunc( this, &Node::_cmdCreatePipe ), queue );
     143             :     registerCommand( fabric::CMD_NODE_DESTROY_PIPE,
     144           1 :                      NodeFunc( this, &Node::_cmdDestroyPipe ), queue );
     145             :     registerCommand( fabric::CMD_NODE_CONFIG_INIT,
     146           1 :                      NodeFunc( this, &Node::_cmdConfigInit ), queue );
     147             :     registerCommand( fabric::CMD_NODE_SET_AFFINITY,
     148           1 :                      NodeFunc( this, &Node::_cmdSetAffinity), transmitQ );
     149             :     registerCommand( fabric::CMD_NODE_CONFIG_EXIT,
     150           1 :                      NodeFunc( this, &Node::_cmdConfigExit ), queue );
     151             :     registerCommand( fabric::CMD_NODE_FRAME_START,
     152           1 :                      NodeFunc( this, &Node::_cmdFrameStart ), queue );
     153             :     registerCommand( fabric::CMD_NODE_FRAME_FINISH,
     154           1 :                      NodeFunc( this, &Node::_cmdFrameFinish ), queue );
     155             :     registerCommand( fabric::CMD_NODE_FRAME_DRAW_FINISH,
     156           1 :                      NodeFunc( this, &Node::_cmdFrameDrawFinish ), queue );
     157             :     registerCommand( fabric::CMD_NODE_FRAME_TASKS_FINISH,
     158           1 :                      NodeFunc( this, &Node::_cmdFrameTasksFinish ), queue );
     159             :     registerCommand( fabric::CMD_NODE_FRAMEDATA_TRANSMIT,
     160           1 :                      NodeFunc( this, &Node::_cmdFrameDataTransmit ), commandQ );
     161             :     registerCommand( fabric::CMD_NODE_FRAMEDATA_READY,
     162           1 :                      NodeFunc( this, &Node::_cmdFrameDataReady ), commandQ );
     163           1 : }
     164             : 
     165           1 : void Node::setDirty( const uint64_t bits )
     166             : {
     167             :     // jump over fabric setDirty to avoid dirty'ing config node list
     168             :     // nodes are individually synced in frame finish
     169           1 :     Object::setDirty( bits );
     170           1 : }
     171             : 
     172           3 : ClientPtr Node::getClient()
     173             : {
     174           3 :     Config* config = getConfig();
     175           3 :     LBASSERT( config );
     176           3 :     return ( config ? config->getClient() : 0 );
     177             : }
     178             : 
     179           5 : ServerPtr Node::getServer()
     180             : {
     181           5 :     Config* config = getConfig();
     182           5 :     LBASSERT( config );
     183           5 :     return ( config ? config->getServer() : 0 );
     184             : }
     185             : 
     186           1 : co::CommandQueue* Node::getMainThreadQueue()
     187             : {
     188           1 :     return getConfig()->getMainThreadQueue();
     189             : }
     190             : 
     191           1 : co::CommandQueue* Node::getCommandThreadQueue()
     192             : {
     193           1 :     return getConfig()->getCommandThreadQueue();
     194             : }
     195             : 
     196          33 : co::CommandQueue* Node::getTransmitterQueue()
     197             : {
     198          33 :     return &_impl->transmitter.getQueue();
     199             : }
     200             : 
     201           0 : uint32_t Node::getCurrentFrame() const
     202             : {
     203           0 :     return _impl->currentFrame.get();
     204             : }
     205             : 
     206           0 : uint32_t Node::getFinishedFrame() const
     207             : {
     208           0 :     return _impl->finishedFrame;
     209             : }
     210             : 
     211           0 : co::Barrier* Node::getBarrier( const co::ObjectVersion& barrier )
     212             : {
     213           0 :     lunchbox::ScopedMutex<> mutex( _impl->barriers );
     214           0 :     co::Barrier* netBarrier = _impl->barriers.data[ barrier.identifier ];
     215             : 
     216           0 :     if( netBarrier )
     217           0 :         netBarrier->sync( barrier.version );
     218             :     else
     219             :     {
     220           0 :         ClientPtr client = getClient();
     221             : 
     222           0 :         netBarrier = new co::Barrier( client, barrier );
     223           0 :         if( !netBarrier->isGood( ))
     224             :         {
     225           0 :             LBCHECK( netBarrier->isGood( ));
     226           0 :             LBWARN << "Could not map swap barrier" << std::endl;
     227           0 :             delete netBarrier;
     228           0 :             return 0;
     229             :         }
     230           0 :         _impl->barriers.data[ barrier.identifier ] = netBarrier;
     231             :     }
     232             : 
     233           0 :     return netBarrier;
     234             : }
     235             : 
     236           0 : FrameDataPtr Node::getFrameData( const co::ObjectVersion& frameDataVersion )
     237             : {
     238           0 :     lunchbox::ScopedWrite mutex( _impl->frameDatas );
     239           0 :     FrameDataPtr data = _impl->frameDatas.data[ frameDataVersion.identifier ];
     240             : 
     241           0 :     if( !data )
     242             :     {
     243           0 :         data = new FrameData;
     244           0 :         data->setID( frameDataVersion.identifier );
     245           0 :         _impl->frameDatas.data[ frameDataVersion.identifier ] = data;
     246             :     }
     247             : 
     248           0 :     LBASSERT( frameDataVersion.version.high() == 0 );
     249           0 :     data->setVersion( frameDataVersion.version.low( ));
     250           0 :     return data;
     251             : }
     252             : 
     253           0 : void Node::releaseFrameData( FrameDataPtr data )
     254             : {
     255           0 :     lunchbox::ScopedWrite mutex( _impl->frameDatas );
     256           0 :     FrameDataHashIter i = _impl->frameDatas->find( data->getID( ));
     257           0 :     LBASSERT( i != _impl->frameDatas->end( ));
     258           0 :     if( i == _impl->frameDatas->end( ))
     259           0 :         return;
     260             : 
     261           0 :     _impl->frameDatas->erase( i );
     262             : }
     263             : 
     264           1 : void Node::waitInitialized() const
     265             : {
     266           1 :     _impl->state.waitGE( STATE_INIT_FAILED );
     267           1 : }
     268             : 
     269           1 : bool Node::isRunning() const
     270             : {
     271           1 :     return _impl->state == STATE_RUNNING;
     272             : }
     273             : 
     274           1 : bool Node::isStopped() const
     275             : {
     276           1 :     return _impl->state == STATE_STOPPED;
     277             : }
     278             : 
     279           1 : bool Node::configInit( const uint128_t& )
     280             : {
     281           1 :     WindowSystem::configInit( this );
     282           1 :     return true;
     283             : }
     284             : 
     285           1 : bool Node::configExit()
     286             : {
     287           1 :     WindowSystem::configExit( this );
     288           1 :     return true;
     289             : }
     290             : 
     291           1 : void Node::_setAffinity()
     292             : {
     293           1 :     const int32_t affinity = getIAttribute( IATTR_HINT_AFFINITY );
     294           1 :     switch( affinity )
     295             :     {
     296             :         case OFF:
     297           0 :             break;
     298             : 
     299             :         case AUTO:
     300             :             // TODO
     301           1 :             LBVERB << "No automatic thread placement for node threads "
     302           1 :                    << std::endl;
     303           1 :             break;
     304             : 
     305             :         default:
     306           0 :             co::LocalNodePtr node = getLocalNode();
     307           0 :             send( node, fabric::CMD_NODE_SET_AFFINITY ) << affinity;
     308             : 
     309           0 :             node->setAffinity( affinity );
     310           0 :             break;
     311             :     }
     312           1 : }
     313             : 
     314           0 : void Node::waitFrameStarted( const uint32_t frameNumber ) const
     315             : {
     316           0 :     _impl->currentFrame.waitGE( frameNumber );
     317           0 : }
     318             : 
     319           0 : void Node::startFrame( const uint32_t frameNumber )
     320             : {
     321           0 :     _impl->currentFrame = frameNumber;
     322           0 : }
     323             : 
     324           0 : void Node::frameFinish( const uint128_t&, const uint32_t frameNumber )
     325             : {
     326           0 :     releaseFrame( frameNumber );
     327           0 : }
     328             : 
     329           0 : void Node::_finishFrame( const uint32_t frameNumber ) const
     330             : {
     331           0 :     const Pipes& pipes = getPipes();
     332           0 :     for( Pipes::const_iterator i = pipes.begin(); i != pipes.end(); ++i )
     333             :     {
     334           0 :         Pipe* pipe = *i;
     335           0 :         LBASSERT( pipe->isThreaded() || pipe->getFinishedFrame()>=frameNumber );
     336             : 
     337           0 :         pipe->waitFrameLocal( frameNumber );
     338           0 :         pipe->waitFrameFinished( frameNumber );
     339             :     }
     340           0 : }
     341             : 
     342           0 : void Node::_frameFinish( const uint128_t& frameID,
     343             :                          const uint32_t frameNumber )
     344             : {
     345           0 :     frameFinish( frameID, frameNumber );
     346           0 :     LBLOG( LOG_TASKS ) << "---- Finished Frame --- " << frameNumber
     347           0 :                        << std::endl;
     348             : 
     349           0 :     if( _impl->unlockedFrame < frameNumber )
     350             :     {
     351           0 :         LBWARN << "Finished frame was not locally unlocked, enforcing unlock"
     352           0 :                << std::endl;
     353           0 :         releaseFrameLocal( frameNumber );
     354             :     }
     355             : 
     356           0 :     if( _impl->finishedFrame < frameNumber )
     357             :     {
     358           0 :         LBWARN << "Finished frame was not released, enforcing unlock"
     359           0 :                << std::endl;
     360           0 :         releaseFrame( frameNumber );
     361             :     }
     362           0 : }
     363             : 
     364           0 : void Node::releaseFrame( const uint32_t frameNumber )
     365             : {
     366           0 :     LBASSERTINFO( _impl->currentFrame >= frameNumber,
     367             :                   "current " << _impl->currentFrame << " release " <<
     368             :                   frameNumber );
     369             : 
     370           0 :     if( _impl->finishedFrame >= frameNumber )
     371           0 :         return;
     372           0 :     _impl->finishedFrame = frameNumber;
     373             : 
     374           0 :     Config* config = getConfig();
     375           0 :     ServerPtr server = config->getServer();
     376           0 :     co::NodePtr node = server.get();
     377           0 :     send( node, fabric::CMD_NODE_FRAME_FINISH_REPLY ) << frameNumber;
     378             : }
     379             : 
     380           0 : void Node::releaseFrameLocal( const uint32_t frameNumber )
     381             : {
     382           0 :     LBASSERT( _impl->unlockedFrame <= frameNumber );
     383           0 :     _impl->unlockedFrame = frameNumber;
     384             : 
     385           0 :     Config* config = getConfig();
     386           0 :     LBASSERT( config->getNodes().size() == 1 );
     387           0 :     LBASSERT( config->getNodes()[0] == this );
     388           0 :     config->releaseFrameLocal( frameNumber );
     389             : 
     390           0 :     LBLOG( LOG_TASKS ) << "---- Unlocked Frame --- " << _impl->unlockedFrame
     391           0 :                        << std::endl;
     392           0 : }
     393             : 
     394           0 : void Node::frameStart( const uint128_t&, const uint32_t frameNumber )
     395             : {
     396           0 :     startFrame( frameNumber ); // unlock pipe threads
     397             : 
     398           0 :     switch( getIAttribute( IATTR_THREAD_MODEL ))
     399             :     {
     400             :         case ASYNC:
     401             :             // Don't wait for pipes to release frame locally, sync not needed
     402           0 :             releaseFrameLocal( frameNumber );
     403           0 :             break;
     404             : 
     405             :         case DRAW_SYNC:  // Sync and release in frameDrawFinish
     406             :         case LOCAL_SYNC: // Sync and release in frameTasksFinish
     407           0 :             break;
     408             : 
     409             :         default:
     410           0 :             LBUNIMPLEMENTED;
     411             :     }
     412           0 : }
     413             : 
     414           0 : void Node::frameDrawFinish( const uint128_t&, const uint32_t frameNumber )
     415             : {
     416           0 :     switch( getIAttribute( IATTR_THREAD_MODEL ))
     417             :     {
     418             :         case ASYNC:      // No sync, release in frameStart
     419             :         case LOCAL_SYNC: // Sync and release in frameTasksFinish
     420           0 :             break;
     421             : 
     422             :         case DRAW_SYNC:
     423             :         {
     424           0 :             const Pipes& pipes = getPipes();
     425           0 :             for( Pipes::const_iterator i = pipes.begin();
     426           0 :                  i != pipes.end(); ++i )
     427             :             {
     428           0 :                 const Pipe* pipe = *i;
     429           0 :                 if( pipe->getTasks() & fabric::TASK_DRAW )
     430           0 :                     pipe->waitFrameLocal( frameNumber );
     431             :             }
     432             : 
     433           0 :             releaseFrameLocal( frameNumber );
     434           0 :             break;
     435             :         }
     436             :         default:
     437           0 :             LBUNIMPLEMENTED;
     438             :     }
     439           0 : }
     440             : 
     441           0 : void Node::frameTasksFinish( const uint128_t&, const uint32_t frameNumber )
     442             : {
     443           0 :     switch( getIAttribute( IATTR_THREAD_MODEL ))
     444             :     {
     445             :         case ASYNC:      // No sync, release in frameStart
     446             :         case DRAW_SYNC:  // Sync and release in frameDrawFinish
     447           0 :             break;
     448             : 
     449             :         case LOCAL_SYNC:
     450             :         {
     451           0 :             const Pipes& pipes = getPipes();
     452           0 :             for( Pipes::const_iterator i = pipes.begin(); i != pipes.end(); ++i)
     453             :             {
     454           0 :                 const Pipe* pipe = *i;
     455           0 :                 if( pipe->getTasks() != fabric::TASK_NONE )
     456           0 :                     pipe->waitFrameLocal( frameNumber );
     457             :             }
     458             : 
     459           0 :             releaseFrameLocal( frameNumber );
     460           0 :             break;
     461             :         }
     462             :         default:
     463           0 :             LBUNIMPLEMENTED;
     464             :     }
     465           0 : }
     466             : 
     467             : 
     468           0 : EventOCommand Node::sendError( const uint32_t error )
     469             : {
     470           0 :     return getConfig()->sendError( Event::NODE_ERROR, Error( error, getID( )));
     471             : }
     472             : 
     473           0 : bool Node::processEvent( const Event& event )
     474             : {
     475           0 :     ConfigEvent configEvent( event );
     476           0 :     getConfig()->sendEvent( configEvent );
     477           0 :     return true;
     478             : }
     479             : 
     480           1 : void Node::_flushObjects()
     481             : {
     482           1 :     ClientPtr client = getClient();
     483             :     {
     484           1 :         lunchbox::ScopedMutex<> mutex( _impl->barriers );
     485           3 :         for( BarrierHash::const_iterator i = _impl->barriers->begin();
     486           2 :              i != _impl->barriers->end(); ++i )
     487             :         {
     488           0 :             delete i->second;
     489             :         }
     490           1 :         _impl->barriers->clear();
     491             :     }
     492             : 
     493           2 :     lunchbox::ScopedMutex<> mutex( _impl->frameDatas );
     494           3 :     for( FrameDataHashCIter i = _impl->frameDatas->begin();
     495           2 :          i != _impl->frameDatas->end(); ++i )
     496             :     {
     497           0 :         FrameDataPtr frameData = i->second;
     498           0 :         frameData->resetPlugins();
     499           0 :         client->unmapObject( frameData.get( ));
     500           0 :     }
     501           2 :     _impl->frameDatas->clear();
     502           1 : }
     503             : 
     504           1 : void detail::TransmitThread::run()
     505             : {
     506             :     while( true )
     507             :     {
     508           1 :         co::ICommand command = _queue.pop();
     509           1 :         if( !command.isValid( ))
     510           2 :             return; // exit thread
     511             : 
     512           0 :         LBCHECK( command( ));
     513           0 :     }
     514             : }
     515             : 
     516          30 : void Node::dirtyClientExit()
     517             : {
     518          30 :     const Pipes& pipes = getPipes();
     519         120 :     for( PipesCIter i = pipes.begin(); i != pipes.end(); ++i )
     520             :     {
     521          90 :         Pipe* pipe = *i;
     522          90 :         pipe->cancelThread();
     523             :     }
     524          30 :     getTransmitterQueue()->push( co::ICommand( )); // wake up to exit
     525          30 :     _impl->transmitter.join();
     526          30 : }
     527             : 
     528             : //---------------------------------------------------------------------------
     529             : // command handlers
     530             : //---------------------------------------------------------------------------
     531           1 : bool Node::_cmdCreatePipe( co::ICommand& cmd )
     532             : {
     533           1 :     LB_TS_THREAD( _nodeThread );
     534           1 :     LBASSERT( _impl->state >= STATE_INIT_FAILED );
     535             : 
     536           1 :     co::ObjectICommand command( cmd );
     537           1 :     const uint128_t& pipeID = command.read< uint128_t >();
     538           1 :     const bool threaded = command.read< bool >();
     539             : 
     540           1 :     LBLOG( LOG_INIT ) << "Create pipe " << command << " id " << pipeID
     541           1 :                       << std::endl;
     542             : 
     543           1 :     Pipe* pipe = Global::getNodeFactory()->createPipe( this );
     544           1 :     if( threaded )
     545           1 :         pipe->startThread();
     546             : 
     547           1 :     Config* config = getConfig();
     548           1 :     LBCHECK( config->mapObject( pipe, pipeID ));
     549           1 :     pipe->notifyMapped();
     550             : 
     551           1 :     return true;
     552             : }
     553             : 
     554           1 : bool Node::_cmdDestroyPipe( co::ICommand& cmd )
     555             : {
     556           1 :     co::ObjectICommand command( cmd );
     557             : 
     558           1 :     LB_TS_THREAD( _nodeThread );
     559           1 :     LBLOG( LOG_INIT ) << "Destroy pipe " << command << std::endl;
     560             : 
     561           1 :     Pipe* pipe = findPipe( command.read< uint128_t >( ));
     562           1 :     LBASSERT( pipe );
     563           1 :     pipe->exitThread();
     564             : 
     565           1 :     const bool stopped = pipe->isStopped();
     566             : 
     567           1 :     Config* config = getConfig();
     568           1 :     config->unmapObject( pipe );
     569           1 :     pipe->send( getServer(), fabric::CMD_PIPE_CONFIG_EXIT_REPLY ) << stopped;
     570           1 :     Global::getNodeFactory()->releasePipe( pipe );
     571             : 
     572           1 :     return true;
     573             : }
     574             : 
     575           1 : bool Node::_cmdConfigInit( co::ICommand& cmd )
     576             : {
     577           1 :     co::ObjectICommand command( cmd );
     578             : 
     579           1 :     LB_TS_THREAD( _nodeThread );
     580           1 :     LBLOG( LOG_INIT ) << "Init node " << command << std::endl;
     581             : 
     582           1 :     _impl->state = STATE_INITIALIZING;
     583             : 
     584           1 :     const uint128_t& initID = command.read< uint128_t >();
     585           1 :     const uint32_t frameNumber = command.read< uint32_t >();
     586             : 
     587           1 :     _impl->currentFrame  = frameNumber;
     588           1 :     _impl->unlockedFrame = frameNumber;
     589           1 :     _impl->finishedFrame = frameNumber;
     590           1 :     _setAffinity();
     591             : 
     592           1 :     _impl->transmitter.start();
     593           1 :     const uint64_t result = configInit( initID );
     594             : 
     595           1 :     if( getIAttribute( IATTR_THREAD_MODEL ) == eq::UNDEFINED )
     596           1 :         setIAttribute( IATTR_THREAD_MODEL, eq::DRAW_SYNC );
     597             : 
     598           1 :     _impl->state = result ? STATE_RUNNING : STATE_INIT_FAILED;
     599             : 
     600           1 :     commit();
     601             :     send( command.getRemoteNode(), fabric::CMD_NODE_CONFIG_INIT_REPLY )
     602           1 :             << result;
     603           1 :     return true;
     604             : }
     605             : 
     606           1 : bool Node::_cmdConfigExit( co::ICommand& cmd )
     607             : {
     608           1 :     co::ObjectICommand command( cmd );
     609             : 
     610           1 :     LB_TS_THREAD( _nodeThread );
     611           1 :     LBLOG( LOG_INIT ) << "Node exit " << command << std::endl;
     612             : 
     613           1 :     const Pipes& pipes = getPipes();
     614           1 :     for( PipesCIter i = pipes.begin(); i != pipes.end(); ++i )
     615             :     {
     616           0 :         Pipe* pipe = *i;
     617           0 :         pipe->waitExited();
     618             :     }
     619             : 
     620           1 :     _impl->state = configExit() ? STATE_STOPPED : STATE_FAILED;
     621           1 :     getTransmitterQueue()->push( co::ICommand( )); // wake up to exit
     622           1 :     _impl->transmitter.join();
     623           1 :     _flushObjects();
     624             : 
     625           1 :     getConfig()->send( getLocalNode(),
     626           2 :                        fabric::CMD_CONFIG_DESTROY_NODE ) << getID();
     627           1 :     return true;
     628             : }
     629             : 
     630           0 : bool Node::_cmdFrameStart( co::ICommand& cmd ) // Command handler: syncs to the versions sent with the command, then starts the given frame. Always returns true.
     631             : {
     632           0 :     LB_TS_THREAD( _nodeThread );
     633             : 
     634           0 :     co::ObjectICommand command( cmd );
     635           0 :     const uint128_t& version = command.read< uint128_t >(); // payload is read in order: version, configVersion, frameID, frameNumber
     636           0 :     const uint128_t& configVersion = command.read< uint128_t >();
     637           0 :     const uint128_t& frameID = command.read< uint128_t >();
     638           0 :     const uint32_t frameNumber = command.read< uint32_t >();
     639             : 
     640           0 :     LBVERB << "handle node frame start " << command << " frame " << frameNumber
     641           0 :            << " id " << frameID << std::endl;
     642             : 
     643           0 :     LBASSERT( _impl->currentFrame == frameNumber-1 ); // the new frame must directly follow the current one
     644             : 
     645           0 :     LBLOG( LOG_TASKS ) << "----- Begin Frame ----- " << frameNumber
     646           0 :                        << std::endl;
     647             : 
     648           0 :     Config* config = getConfig();
     649             : 
     650           0 :     if( configVersion != co::VERSION_INVALID ) // the config is only synced when a valid config version was sent
     651           0 :         config->sync( configVersion );
     652           0 :     sync( version ); // the node itself is synced unconditionally
     653             : 
     654           0 :     config->_frameStart();
     655           0 :     frameStart( frameID, frameNumber );
     656             : 
     657           0 :     LBASSERTINFO( _impl->currentFrame >= frameNumber, // frameStart() is expected to have advanced currentFrame to at least frameNumber
     658             :                   "Node::frameStart() did not start frame " << frameNumber );
     659           0 :     return true;
     660             : }
     661             : 
     662           0 : bool Node::_cmdFrameFinish( co::ICommand& cmd ) // Command handler: finishes the given frame and commits the node. Always returns true.
     663             : {
     664           0 :     LB_TS_THREAD( _nodeThread );
     665             : 
     666           0 :     co::ObjectICommand command( cmd );
     667           0 :     const uint128_t& frameID = command.read< uint128_t >(); // payload is read in order: frameID, frameNumber
     668           0 :     const uint32_t frameNumber = command.read< uint32_t >();
     669             : 
     670           0 :     LBLOG( LOG_TASKS ) << "TASK frame finish " << getName() <<  " " << command
     671           0 :                        << " frame " << frameNumber << " id " << frameID
     672           0 :                        << std::endl;
     673             : 
     674           0 :     _finishFrame( frameNumber ); // internal _finishFrame() runs before _frameFinish()
     675           0 :     _frameFinish( frameID, frameNumber );
     676             : 
     677           0 :     const uint128_t version = commit();
     678           0 :     if( version != co::VERSION_NONE ) // CMD_OBJECT_SYNC is only sent when commit() returned something other than VERSION_NONE
     679           0 :         send( command.getNode(), fabric::CMD_OBJECT_SYNC ); // NOTE(review): uses getNode() where the config-init reply uses getRemoteNode() - confirm this is intended
     680           0 :     return true;
     681             : }
     682             : 
     683           0 : bool Node::_cmdFrameDrawFinish( co::ICommand& cmd ) // Command handler: forwards the draw-finish notification to frameDrawFinish(). Always returns true.
     684             : {
     685           0 :     co::ObjectICommand command( cmd );
     686           0 :     const uint128_t& frameID = command.read< uint128_t >(); // payload is read in order: frameID, frameNumber
     687           0 :     const uint32_t frameNumber = command.read< uint32_t >();
     688             : 
     689           0 :     LBLOG( LOG_TASKS ) << "TASK draw finish " << getName() <<  " " << command
     690           0 :                        << " frame " << frameNumber << " id " << frameID
     691           0 :                        << std::endl;
     692             : 
     693           0 :     frameDrawFinish( frameID, frameNumber );
     694           0 :     return true;
     695             : }
     696             : 
     697           0 : bool Node::_cmdFrameTasksFinish( co::ICommand& cmd ) // Command handler: forwards the tasks-finish notification to frameTasksFinish(). Always returns true.
     698             : {
     699           0 :     co::ObjectICommand command( cmd );
     700           0 :     const uint128_t& frameID = command.read< uint128_t >(); // payload is read in order: frameID, frameNumber
     701           0 :     const uint32_t frameNumber = command.read< uint32_t >();
     702             : 
     703           0 :     LBLOG( LOG_TASKS ) << "TASK tasks finish " << getName() <<  " " << command // NOTE(review): unlike the sibling handlers, the frame number and id are not logged here
     704           0 :                        << std::endl;
     705             : 
     706           0 :     frameTasksFinish( frameID, frameNumber );
     707           0 :     return true;
     708             : }
     709             : 
     710           0 : bool Node::_cmdFrameDataTransmit( co::ICommand& cmd ) // Command handler: adds received image data to the matching frame data. Always returns true.
     711             : {
     712           0 :     co::ObjectICommand command( cmd );
     713             : 
     714             :     const co::ObjectVersion& frameDataVersion = // payload is read in order: frame data version, pvp, zoom, context, buffers, frameNumber, useAlpha
     715           0 :                                             command.read< co::ObjectVersion >();
     716           0 :     const PixelViewport& pvp = command.read< PixelViewport >();
     717           0 :     const Zoom& zoom = command.read< Zoom >();
     718           0 :     const RenderContext& context = command.read< RenderContext >();
     719           0 :     const uint32_t buffers = command.read< uint32_t >();
     720           0 :     const uint32_t frameNumber = command.read< uint32_t >();
     721           0 :     const bool useAlpha = command.read< bool >();
     722             :     const uint8_t* data = reinterpret_cast< const uint8_t* >( // everything left in the command after the fields above is treated as the image payload
     723           0 :                 command.getRemainingBuffer( command.getRemainingBufferSize( )));
     724             : 
     725           0 :     LBLOG( LOG_ASSEMBLY )
     726           0 :         << "received image data for " << frameDataVersion << ", buffers "
     727           0 :         << buffers << " pvp " << pvp << std::endl;
     728             : 
     729           0 :     LBASSERT( pvp.isValid( ));
     730             : 
     731           0 :     FrameDataPtr frameData = getFrameData( frameDataVersion );
     732           0 :     LBASSERT( !frameData->isReady() ); // NOTE(review): frameData is dereferenced without the LBASSERT( frameData ) used in _cmdFrameDataReady - confirm it cannot be null
     733             : 
     734             :     NodeStatistics event( Statistic::NODE_FRAME_DECOMPRESS, this, // scoped statistics object; presumably covers the addImage() call below - confirm
     735           0 :                           frameNumber );
     736             : 
     737             :     // Note on the const_cast: since the PixelData structure stores non-const
     738             :     // pointers, we have to go non-const at some point, even though we do not
     739             :     // modify the data.
     740           0 :     LBCHECK( frameData->addImage( frameDataVersion, pvp, zoom, context, buffers,
     741             :                                   useAlpha, const_cast< uint8_t* >( data )));
     742           0 :     return true;
     743             : }
     744             : 
     745           0 : bool Node::_cmdFrameDataReady( co::ICommand& cmd ) // Command handler: marks the matching frame data as ready using the deserialized fabric::FrameData. Always returns true.
     746             : {
     747           0 :     co::ObjectICommand command( cmd );
     748             : 
     749             :     const co::ObjectVersion& frameDataVersion = // payload is read in order: frame data version, then a serialized fabric::FrameData
     750           0 :                                             command.read< co::ObjectVersion >();
     751           0 :     fabric::FrameData data;
     752           0 :     data.deserialize( command );
     753             : 
     754           0 :     LBLOG( LOG_ASSEMBLY ) << "received ready for " << frameDataVersion
     755           0 :                           << std::endl;
     756           0 :     FrameDataPtr frameData = getFrameData( frameDataVersion );
     757           0 :     LBASSERT( frameData );
     758           0 :     LBASSERT( !frameData->isReady() ); // the frame data must not already be ready
     759           0 :     frameData->setReady( frameDataVersion, data );
     760           0 :     LBASSERT( frameData->isReady() ); // setReady() is expected to flip the ready state
     761           0 :     return true;
     762             : }
     763             : 
     764           0 : bool Node::_cmdSetAffinity( co::ICommand& cmd ) // Command handler: passes the int32_t read from the command to lunchbox::Thread::setAffinity(). Always returns true.
     765             : {
     766           0 :     co::ObjectICommand command( cmd );
     767             : 
     768           0 :     lunchbox::Thread::setAffinity( command.read< int32_t >( )); // the affinity value is the only payload field read
     769           0 :     return true;
     770             : }
     771             : }
     772             : 
     773             : #include <eq/fabric/node.ipp> // template implementation is included here so the instantiations below can be generated in this file
     774             : template class eq::fabric::Node< eq::Config, eq::Node, eq::Pipe, // explicit instantiation of the fabric node template for the eq types
     775             :                                  eq::NodeVisitor >;
     776             : 
     777             : /** @cond IGNORE */
     778             : template EQFABRIC_API std::ostream& eq::fabric::operator << ( std::ostream&, // explicit instantiation of the stream output operator for eq::Super
     779          42 :                                                              const eq::Super& );
     780             : /** @endcond */
     780             : /** @endcond */

Generated by: LCOV version 1.11