LCOV - code coverage report
Current view: top level - eq/server - node.cpp (source / functions) Hit Total Coverage
Test: lcov2.info Lines: 211 420 50.2 %
Date: 2014-06-18 Functions: 39 50 78.0 %

          Line data    Source code
       1             : 
       2             : /* Copyright (c) 2005-2013, Stefan Eilemann <eile@equalizergraphics.com>
       3             :  *                    2011, Daniel Nachbaur <danielnachbaur@gmail.com>
       4             :  *                    2010, Cedric Stalder <cedric.stalder@gmail.com>
       5             :  *
       6             :  * This library is free software; you can redistribute it and/or modify it under
       7             :  * the terms of the GNU Lesser General Public License version 2.1 as published
       8             :  * by the Free Software Foundation.
       9             :  *
      10             :  * This library is distributed in the hope that it will be useful, but WITHOUT
      11             :  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
      12             :  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
      13             :  * details.
      14             :  *
      15             :  * You should have received a copy of the GNU Lesser General Public License
      16             :  * along with this library; if not, write to the Free Software Foundation, Inc.,
      17             :  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
      18             :  */
      19             : 
      20             : #include "node.h"
      21             : 
      22             : #include "channel.h"
      23             : #include "config.h"
      24             : #include "global.h"
      25             : #include "log.h"
      26             : #include "nodeFactory.h"
      27             : #include "pipe.h"
      28             : #include "server.h"
      29             : #include "window.h"
      30             : 
      31             : #include <eq/client/event.h>
      32             : #include <eq/client/error.h>
      33             : 
      34             : #include <eq/fabric/commands.h>
      35             : #include <eq/fabric/elementVisitor.h>
      36             : #include <eq/fabric/paths.h>
      37             : 
      38             : #include <co/barrier.h>
      39             : #include <co/global.h>
      40             : #include <co/objectICommand.h>
      41             : 
      42             : #include <lunchbox/clock.h>
      43             : #include <lunchbox/launcher.h>
      44             : #include <lunchbox/os.h>
      45             : #include <lunchbox/sleep.h>
      46             : 
      47             : namespace eq
      48             : {
      49             : namespace server
      50             : {
      51             : typedef fabric::Node< Config, Node, Pipe, NodeVisitor > Super;
      52             : typedef co::CommandFunc<Node> NodeFunc;
      53             : namespace
      54             : {
      55             : #define S_MAKE_ATTR_STRING( attr ) ( std::string("EQ_NODE_") + #attr )
      56          18 : std::string _sAttributeStrings[] = {
      57             :     S_MAKE_ATTR_STRING( SATTR_LAUNCH_COMMAND )
      58           9 : };
      59          18 : std::string _cAttributeStrings[] = {
      60             :     S_MAKE_ATTR_STRING( CATTR_LAUNCH_COMMAND_QUOTE )
      61           9 : };
      62             : }
      63             : 
      64         438 : Node::Node( Config* parent )
      65             :     : Super( parent )
      66             :     , _active( 0 )
      67             :     , _finishedFrame( 0 )
      68             :     , _flushedFrame( 0 )
      69             :     , _state( STATE_STOPPED )
      70         438 :     , _bufferedTasks( new co::BufferConnection )
      71         876 :     , _lastDrawPipe( 0 )
      72             : {
      73         438 :     const Global* global = Global::instance();
      74         876 :     for( int i=0; i < Node::SATTR_LAST; ++i )
      75             :     {
      76         438 :         const SAttribute attr = static_cast< SAttribute >( i );
      77         438 :         setSAttribute( attr, global->getNodeSAttribute( attr ));
      78             :     }
      79         876 :     for( int i=0; i < Node::CATTR_LAST; ++i )
      80             :     {
      81         438 :         const CAttribute attr = static_cast< CAttribute >( i );
      82         438 :         setCAttribute( attr, global->getNodeCAttribute( attr ));
      83             :     }
      84        1752 :     for( int i = 0; i < IATTR_LAST; ++i )
      85             :     {
      86        1314 :         const IAttribute attr = static_cast< IAttribute >( i );
      87        1314 :         setIAttribute( attr, global->getNodeIAttribute( attr ));
      88             :     }
      89         438 : }
      90             : 
      91         874 : Node::~Node()
      92             : {
      93         874 : }
      94             : 
      95          15 : void Node::attach( const uint128_t& id, const uint32_t instanceID )
      96             : {
      97          15 :     Super::attach( id, instanceID );
      98             : 
      99          15 :     co::CommandQueue* cmdQ = getCommandThreadQueue();
     100             :     registerCommand( fabric::CMD_OBJECT_SYNC,
     101          15 :                      NodeFunc( this, &Node::_cmdSync ), cmdQ );
     102             :     registerCommand( fabric::CMD_NODE_CONFIG_INIT_REPLY,
     103          15 :                      NodeFunc( this, &Node::_cmdConfigInitReply ), cmdQ );
     104             :     registerCommand( fabric::CMD_NODE_CONFIG_EXIT_REPLY,
     105          15 :                      NodeFunc( this, &Node::_cmdConfigExitReply ), cmdQ );
     106             :     registerCommand( fabric::CMD_NODE_FRAME_FINISH_REPLY,
     107          15 :                      NodeFunc( this, &Node::_cmdFrameFinishReply ), cmdQ );
     108          15 : }
     109             : 
     110         147 : ServerPtr Node::getServer()
     111             : {
     112         147 :     return getConfig() ? getConfig()->getServer() : 0;
     113             : }
     114             : 
     115           0 : ConstServerPtr Node::getServer() const
     116             : {
     117           0 :     return getConfig() ? getConfig()->getServer() : 0;
     118             : }
     119             : 
     120         109 : co::CommandQueue* Node::getMainThreadQueue()
     121             : {
     122         109 :     return getConfig()->getMainThreadQueue();
     123             : }
     124             : 
     125         147 : co::CommandQueue* Node::getCommandThreadQueue()
     126             : {
     127         147 :     return getConfig()->getCommandThreadQueue();
     128             : }
     129             : 
     130           0 : Channel* Node::getChannel( const ChannelPath& path )
     131             : {
     132           0 :     const Pipes& pipes = getPipes();
     133           0 :     LBASSERT( pipes.size() > path.pipeIndex );
     134             : 
     135           0 :     if( pipes.size() <= path.pipeIndex )
     136           0 :         return 0;
     137             : 
     138           0 :     return pipes[ path.pipeIndex ]->getChannel( path );
     139             : }
     140             : 
     141         348 : void Node::addTasks( const uint32_t tasks )
     142             : {
     143         348 :     setTasks( getTasks() | tasks );
     144         348 : }
     145             : 
     146          22 : void Node::activate()
     147             : {
     148          22 :     ++_active;
     149          22 :     LBLOG( LOG_VIEW ) << "activate: " << _active << std::endl;
     150          22 : }
     151             : 
     152          22 : void Node::deactivate()
     153             : {
     154          22 :     LBASSERT( _active != 0 );
     155          22 :     --_active;
     156          22 :     LBLOG( LOG_VIEW ) << "deactivate: " << _active << std::endl;
     157          22 : };
     158             : 
     159         438 : void Node::setSAttribute( const SAttribute attr, const std::string& value )
     160             : {
     161         438 :     _sAttributes[attr] = value;
     162         438 : }
     163             : 
     164         239 : const std::string& Node::getSAttribute( const SAttribute attr ) const
     165             : {
     166         239 :     return _sAttributes[attr];
     167             : }
     168             : 
     169         356 : const std::string& Node::getSAttributeString( const SAttribute attr )
     170             : {
     171         356 :     return _sAttributeStrings[attr];
     172             : }
     173             : 
     174         438 : void Node::setCAttribute( const CAttribute attr, const char value )
     175             : {
     176         438 :     _cAttributes[attr] = value;
     177         438 : }
     178             : 
     179         239 : char Node::getCAttribute( const CAttribute attr ) const
     180             : {
     181         239 :     return _cAttributes[attr];
     182             : }
     183             : 
     184         356 : const std::string& Node::getCAttributeString( const CAttribute attr )
     185             : {
     186         356 :     return _cAttributeStrings[attr];
     187             : }
     188             : 
     189             : //===========================================================================
     190             : // Operations
     191             : //===========================================================================
     192             : 
     193             : //---------------------------------------------------------------------------
     194             : // launch and connect
     195             : //---------------------------------------------------------------------------
     196             : 
     197             : namespace
     198             : {
     199           0 : static co::NodePtr _createNetNode( Node* node )
     200             : {
     201           0 :     co::NodePtr netNode = new co::Node;
     202             :     const co::ConnectionDescriptions& descriptions =
     203           0 :         node->getConnectionDescriptions();
     204           0 :     for( co::ConnectionDescriptionsCIter i = descriptions.begin();
     205           0 :          i != descriptions.end(); ++i )
     206             :     {
     207           0 :         netNode->addConnectionDescription( new co::ConnectionDescription( **i));
     208             :     }
     209             : 
     210           0 :     return netNode;
     211             : }
     212             : }
     213             : 
     214           8 : bool Node::connect()
     215             : {
     216           8 :     LBASSERT( isActive( ));
     217             : 
     218           8 :     if( _node.isValid( ))
     219           8 :         return _node->isConnected();
     220             : 
     221           0 :     if( !isStopped( ))
     222             :     {
     223           0 :         LBASSERT( _state == STATE_FAILED );
     224           0 :         return true;
     225             :     }
     226             : 
     227           0 :     co::LocalNodePtr localNode = getLocalNode();
     228           0 :     LBASSERT( localNode.isValid( ));
     229             : 
     230           0 :     if( !_node )
     231             :     {
     232           0 :         LBASSERT( !isApplicationNode( ));
     233           0 :         _node = _createNetNode( this );
     234             :     }
     235             :     else
     236             :     {
     237           0 :         LBASSERT( isApplicationNode( ));
     238             :     }
     239             : 
     240           0 :     LBLOG( LOG_INIT ) << "Connecting node" << std::endl;
     241           0 :     if( !localNode->connect( _node ) && !launch( ))
     242             :     {
     243           0 :         LBWARN << "Connection to " << _node->getNodeID() << " failed"
     244           0 :                << std::endl;
     245           0 :         _state = STATE_FAILED;
     246           0 :         _node = 0;
     247           0 :         return false;
     248             :     }
     249             : 
     250           0 :     return true;
     251             : }
     252             : 
     253           0 : bool Node::launch()
     254             : {
     255           0 :     for( co::ConnectionDescriptionsCIter i = _connectionDescriptions.begin();
     256           0 :          _host.empty() && i != _connectionDescriptions.end(); ++i )
     257             :     {
     258           0 :         _host = (*i)->getHostname();
     259           0 :         LBWARN << "No host specified, guessing " << _host << " from " << *i
     260           0 :                << std::endl;
     261             :     }
     262             : 
     263           0 :     if( _launch( _host ))
     264           0 :         return true;
     265             : 
     266             :     // Equalizer 1.0 style: try hostnames from connection descriptions
     267           0 :     for( co::ConnectionDescriptionsCIter i = _connectionDescriptions.begin();
     268           0 :          i != _connectionDescriptions.end(); ++i )
     269             :     {
     270           0 :         const std::string& hostname = (*i)->getHostname();
     271           0 :         if( hostname != _host && _launch( hostname ))
     272           0 :             return true;
     273             :     }
     274             : 
     275           0 :     sendError( ERROR_NODE_LAUNCH ) << _host;
     276           0 :     return false;
     277             : }
     278             : 
     279           8 : bool Node::syncLaunch( const lunchbox::Clock& clock )
     280             : {
     281           8 :     LBASSERT( isActive( ));
     282             : 
     283           8 :     if( !_node )
     284           0 :         return false;
     285             : 
     286           8 :     if( _node->isConnected( ))
     287           8 :         return true;
     288             : 
     289           0 :     LBASSERT( !isApplicationNode( ));
     290           0 :     co::LocalNodePtr localNode = getLocalNode();
     291           0 :     LBASSERT( localNode.isValid( ));
     292             : 
     293           0 :     const int32_t timeOut = getIAttribute( IATTR_LAUNCH_TIMEOUT );
     294             : 
     295             :     while( true )
     296             :     {
     297           0 :         co::NodePtr node = localNode->getNode( _node->getNodeID( ));
     298           0 :         if( node && node->isConnected( ))
     299             :         {
     300           0 :             LBASSERT( _node->getRefCount() == 1 );
     301           0 :             _node = node; // Use co::Node already connected
     302           0 :             return true;
     303             :         }
     304             : 
     305           0 :         lunchbox::sleep( 100 /*ms*/ );
     306           0 :         if( timeOut != static_cast<int32_t>(LB_TIMEOUT_INDEFINITE) &&
     307           0 :             clock.getTime64() > timeOut )
     308             :         {
     309           0 :             LBASSERT( _node->getRefCount() == 1 );
     310           0 :             _node = 0;
     311           0 :             std::ostringstream data;
     312             : 
     313           0 :             for( co::ConnectionDescriptions::const_iterator i =
     314           0 :                      _connectionDescriptions.begin();
     315           0 :                  i != _connectionDescriptions.end(); ++i )
     316             :             {
     317           0 :                 co::ConnectionDescriptionPtr desc = *i;
     318           0 :                 data << desc->getHostname() << ' ';
     319           0 :             }
     320             : 
     321           0 :             sendError( ERROR_NODE_CONNECT ) << _host;
     322           0 :             _state = STATE_FAILED;
     323           0 :             return false;
     324             :         }
     325           0 :     }
     326             : }
     327             : 
     328           0 : bool Node::_launch( const std::string& hostname ) const
     329             : {
     330           0 :     const std::string& command = getSAttribute( SATTR_LAUNCH_COMMAND );
     331           0 :     const size_t commandLen = command.size();
     332             : 
     333           0 :     bool commandFound = false;
     334           0 :     size_t lastPos = 0;
     335           0 :     std::string cmd;
     336             : 
     337           0 :     for( size_t percentPos = command.find( '%' );
     338             :          percentPos != std::string::npos;
     339           0 :          percentPos = command.find( '%', percentPos+1 ))
     340             :     {
     341           0 :         std::ostringstream replacement;
     342           0 :         switch( command[percentPos+1] )
     343             :         {
     344             :             case 'c':
     345             :             {
     346           0 :                 replacement << _createRemoteCommand();
     347           0 :                 commandFound = true;
     348           0 :                 break;
     349             :             }
     350             :             case 'h':
     351             :             {
     352           0 :                 if( hostname.empty( ))
     353           0 :                     replacement << "127.0.0.1";
     354             :                 else
     355           0 :                     replacement << hostname;
     356           0 :                 break;
     357             :             }
     358             :             case 'n':
     359           0 :                 replacement << _node->getNodeID();
     360           0 :                 break;
     361             : 
     362             :             case 'd':
     363           0 :                 replacement << getConfig()->getWorkDir();
     364           0 :                 break;
     365             : 
     366             :             case 'q':
     367           0 :                 replacement << getCAttribute( CATTR_LAUNCH_COMMAND_QUOTE );
     368           0 :                 break;
     369             : 
     370             :             default:
     371           0 :                 LBWARN << "Unknown token " << command[percentPos+1]
     372           0 :                        << std::endl;
     373           0 :                 replacement << '%' << command[percentPos+1];
     374             :         }
     375             : 
     376           0 :         cmd += command.substr( lastPos, percentPos-lastPos );
     377           0 :         if( !replacement.str().empty( ))
     378           0 :             cmd += replacement.str();
     379             : 
     380           0 :         lastPos  = percentPos+2;
     381           0 :     }
     382             : 
     383           0 :     cmd += command.substr( lastPos, commandLen-lastPos );
     384             : 
     385           0 :     if( !commandFound )
     386           0 :         cmd += " " + _createRemoteCommand();
     387             : 
     388           0 :     LBVERB << "Launch command: " << cmd << std::endl;
     389           0 :     if( lunchbox::Launcher::run( cmd ))
     390           0 :         return true;
     391             : 
     392           0 :     LBWARN << "Could not launch node using '" << cmd << "'" << std::endl;
     393           0 :     return false;
     394             : }
     395             : 
     396           0 : std::string Node::_createRemoteCommand() const
     397             : {
     398           0 :     const Config* config = getConfig();
     399           0 :     std::string program = config->getRenderClient();
     400           0 :     if( program.empty( ))
     401             :     {
     402           0 :         LBWARN << "No render client name, auto-launch will fail" << std::endl;
     403           0 :         return std::string();
     404             :     }
     405             : 
     406             :     //----- environment
     407           0 :     std::ostringstream stringStream;
     408           0 :     const char quote = getCAttribute( CATTR_LAUNCH_COMMAND_QUOTE );
     409             : 
     410             : #ifndef WIN32
     411             : #  ifdef Darwin
     412             :     const char libPath[] = "DYLD_LIBRARY_PATH";
     413             : #  else
     414           0 :     const char libPath[] = "LD_LIBRARY_PATH";
     415             : #  endif
     416             : 
     417           0 :     stringStream << "env "; // XXX
     418           0 :     char* env = getenv( libPath );
     419           0 :     if( env )
     420           0 :         stringStream << libPath << "=" << env << " ";
     421             : 
     422           0 :     for( int i=0; environ[i] != 0; ++i )
     423             :     {
     424           0 :         if( strlen( environ[i] ) > 2 &&
     425           0 :             ( strncmp( environ[i], "LB_", 3 ) == 0 ||
     426           0 :               strncmp( environ[i], "CO_", 3 ) == 0 ||
     427           0 :               strncmp( environ[i], "EQ_", 3 ) == 0 ))
     428             :         {
     429           0 :             stringStream << quote << environ[i] << quote << " ";
     430             :         }
     431             :     }
     432             : 
     433           0 :     stringStream << "LB_LOG_LEVEL=" <<lunchbox::Log::getLogLevelString() << " ";
     434           0 :     if( lunchbox::Log::topics != 0 )
     435           0 :         stringStream << "LB_LOG_TOPICS=" <<lunchbox::Log::topics << " ";
     436             : #endif // WIN32
     437             : 
     438             :     //----- program + args
     439           0 :     const std::string& workDir = config->getWorkDir();
     440             : #ifdef WIN32
     441             :     LBASSERT( program.length() > 2 );
     442             :     if( !( program[1] == ':' && (program[2] == '/' || program[2] == '\\' )) &&
     443             :         // !( drive letter and full path present )
     444             :         !( program[0] == '/' || program[0] == '\\' ))
     445             :         // !full path without drive letter
     446             :     {
     447             :         program = workDir + '/' + program; // add workDir to relative path
     448             :     }
     449             : #else
     450           0 :     if( program[0] != '/' )
     451           0 :         program = workDir + '/' + program;
     452             : #endif
     453             : 
     454           0 :     const std::string ownData = getServer()->serialize();
     455           0 :     const std::string remoteData = _node->serialize();
     456           0 :     std::string collageGlobals;
     457           0 :     co::Global::toString( collageGlobals );
     458             : 
     459             :     stringStream
     460           0 :         << quote << program << quote << " -- --eq-client " << quote
     461           0 :         << remoteData << workDir << CO_SEPARATOR << ownData << quote
     462           0 :         << " --co-globals " << quote << collageGlobals << quote;
     463             : 
     464           0 :     return stringStream.str();
     465             : }
     466             : 
     467             : //---------------------------------------------------------------------------
     468             : // init
     469             : //---------------------------------------------------------------------------
     470           8 : void Node::configInit( const uint128_t& initID, const uint32_t frameNumber )
     471             : {
     472           8 :     LBASSERT( _state == STATE_STOPPED );
     473           8 :     _state = STATE_INITIALIZING;
     474             : 
     475           8 :     const Config* config = getConfig();
     476           8 :     _flushedFrame  = config->getFinishedFrame();
     477           8 :     _finishedFrame = config->getFinishedFrame();
     478           8 :     _frameIDs.clear();
     479             : 
     480           8 :     LBLOG( LOG_INIT ) << "Create node" << std::endl;
     481           8 :     getConfig()->send( _node, fabric::CMD_CONFIG_CREATE_NODE ) << getID();
     482             : 
     483           8 :     LBLOG( LOG_INIT ) << "Init node" << std::endl;
     484           8 :     send( fabric::CMD_NODE_CONFIG_INIT ) << initID << frameNumber;
     485           8 : }
     486             : 
     487           8 : bool Node::syncConfigInit()
     488             : {
     489           8 :     LBASSERT( _state == STATE_INITIALIZING || _state == STATE_INIT_SUCCESS ||
     490             :               _state == STATE_INIT_FAILED );
     491             : 
     492           8 :     _state.waitNE( STATE_INITIALIZING );
     493             : 
     494           8 :     if( _state == STATE_INIT_SUCCESS )
     495             :     {
     496           8 :         _state = STATE_RUNNING;
     497           8 :         return true;
     498             :     }
     499             : 
     500           0 :     LBWARN << "Node initialization failed" << std::endl;
     501           0 :     configExit();
     502           0 :     return false;
     503             : }
     504             : 
     505             : //---------------------------------------------------------------------------
     506             : // exit
     507             : //---------------------------------------------------------------------------
     508           8 : void Node::configExit()
     509             : {
     510           8 :     if( _state == STATE_EXITING )
     511           8 :         return;
     512             : 
     513           8 :     LBASSERT( _state == STATE_RUNNING || _state == STATE_INIT_FAILED );
     514           8 :     _state = STATE_EXITING;
     515             : 
     516           8 :     LBLOG( LOG_INIT ) << "Exit node" << std::endl;
     517           8 :     send( fabric::CMD_NODE_CONFIG_EXIT );
     518           8 :     flushSendBuffer();
     519             : }
     520             : 
     521           8 : bool Node::syncConfigExit()
     522             : {
     523           8 :     LBASSERT( _state == STATE_EXITING || _state == STATE_EXIT_SUCCESS ||
     524             :               _state == STATE_EXIT_FAILED );
     525             : 
     526           8 :     _state.waitNE( STATE_EXITING );
     527           8 :     const bool success = ( _state == STATE_EXIT_SUCCESS );
     528           8 :     LBASSERT( success || _state == STATE_EXIT_FAILED );
     529             : 
     530           8 :     _state = isActive() ? STATE_FAILED : STATE_STOPPED;
     531           8 :     setTasks( fabric::TASK_NONE );
     532           8 :     _frameIDs.clear();
     533           8 :     _flushBarriers();
     534           8 :     return success;
     535             : }
     536             : 
     537             : //---------------------------------------------------------------------------
     538             : // update
     539             : //---------------------------------------------------------------------------
     540           5 : void Node::update( const uint128_t& frameID, const uint32_t frameNumber )
     541             : {
     542           5 :     if( !isRunning( ))
     543           5 :         return;
     544             : 
     545           5 :     LBVERB << "Start frame " << frameNumber << std::endl;
     546           5 :     LBASSERT( isActive( ));
     547             : 
     548           5 :     _frameIDs[ frameNumber ] = frameID;
     549             : 
     550           5 :     uint128_t configVersion = co::VERSION_INVALID;
     551           5 :     if( !isApplicationNode( )) // synced in Config::_cmdFrameStart
     552           0 :         configVersion = getConfig()->getVersion();
     553             : 
     554             :     send( fabric::CMD_NODE_FRAME_START )
     555           5 :             << getVersion() << configVersion << frameID << frameNumber;
     556           5 :     LBLOG( LOG_TASKS ) << "TASK node start frame " << std::endl;
     557             : 
     558           5 :     const Pipes& pipes = getPipes();
     559          12 :     for( Pipes::const_iterator i = pipes.begin(); i != pipes.end(); ++i )
     560           7 :         (*i)->update( frameID, frameNumber );
     561             : 
     562           5 :     if( !_lastDrawPipe ) // no FrameDrawFinish sent
     563             :     {
     564           0 :         send( fabric::CMD_NODE_FRAME_DRAW_FINISH ) << frameID << frameNumber;
     565           0 :         LBLOG( LOG_TASKS ) << "TASK node draw finish " << getName() <<  " "
     566           0 :                            << std::endl;
     567             :     }
     568           5 :     _lastDrawPipe = 0;
     569             : 
     570           5 :     send( fabric::CMD_NODE_FRAME_TASKS_FINISH ) << frameID << frameNumber;
     571           5 :     LBLOG( LOG_TASKS ) << "TASK node tasks finish " << std::endl;
     572             : 
     573           5 :     _finish( frameNumber );
     574           5 :     flushSendBuffer();
     575             : }
     576             : 
     577           5 : uint32_t Node::_getFinishLatency() const
     578             : {
     579           5 :     switch( getIAttribute( Node::IATTR_THREAD_MODEL ))
     580             :     {
     581             :         case fabric::UNDEFINED:
     582             :         case fabric::DRAW_SYNC:
     583           5 :             if( getTasks() & fabric::TASK_DRAW )
     584             :             {
     585             :                 // More than one frame latency doesn't make sense, since the
     586             :                 // draw sync for frame+1 does not allow for more
     587           5 :                 const Config* config = getConfig();
     588           5 :                 const uint32_t latency = config->getLatency();
     589             : 
     590           5 :                 return LB_MIN( latency, 1 );
     591             :             }
     592           0 :             break;
     593             : 
     594             :         case fabric::LOCAL_SYNC:
     595           0 :             if( getTasks() != fabric::TASK_NONE )
     596             :                 // local sync enforces no latency
     597           0 :                 return 0;
     598           0 :             break;
     599             : 
     600             :         case fabric::ASYNC:
     601           0 :             break;
     602             :         default:
     603           0 :             LBUNIMPLEMENTED;
     604             :     }
     605             : 
     606           0 :     const Config* config = getConfig();
     607           0 :     return config->getLatency();
     608             : }
     609             : 
     610           5 : void Node::_finish( const uint32_t currentFrame )
     611             : {
     612           5 :     const Pipes& pipes = getPipes();
     613           5 :     for( Pipes::const_iterator i = pipes.begin(); i != pipes.end(); ++i )
     614             :     {
     615           5 :         const Pipe* pipe = *i;
     616           5 :         if( pipe->getIAttribute( Pipe::IATTR_HINT_THREAD ) && pipe->isRunning())
     617             :         {
     618           5 :             const uint32_t latency = _getFinishLatency();
     619           5 :             if( currentFrame > latency )
     620           2 :                 flushFrames( currentFrame - latency );
     621          10 :             return;
     622             :         }
     623             :     }
     624             : 
     625             :     // else only non-threaded pipes, all local tasks are done, send finish now.
     626           0 :     flushFrames( currentFrame );
     627             : }
     628             : 
     629           5 : void Node::flushFrames( const uint32_t frameNumber )
     630             : {
     631           5 :     LBLOG( LOG_TASKS ) << "Flush frames including " << frameNumber << std::endl;
     632             : 
     633          15 :     while( _flushedFrame < frameNumber )
     634             :     {
     635           5 :         ++_flushedFrame;
     636           5 :         _sendFrameFinish( _flushedFrame );
     637             :     }
     638             : 
     639           5 :     flushSendBuffer();
     640           5 : }
     641             : 
     642           5 : void Node::_sendFrameFinish( const uint32_t frameNumber )
     643             : {
     644           5 :     FrameIDHash::iterator i = _frameIDs.find( frameNumber );
     645           5 :     if( i == _frameIDs.end( ))
     646           5 :         return; // finish already send
     647             : 
     648           5 :     send( fabric::CMD_NODE_FRAME_FINISH ) << i->second << frameNumber;
     649           5 :     _frameIDs.erase( i );
     650           5 :     LBLOG( LOG_TASKS ) << "TASK node finish frame " << frameNumber << std::endl;
     651             : }
     652             : 
     653             : //---------------------------------------------------------------------------
     654             : // Barrier cache
     655             : //---------------------------------------------------------------------------
     656           0 : co::Barrier* Node::getBarrier()
     657             : {
     658           0 :     if( _barriers.empty( ))
     659             :     {
     660             :         co::Barrier* barrier = new co::Barrier( getServer(),
     661           0 :                                                 _node->getNodeID( ));
     662           0 :         barrier->setAutoObsolete( getConfig()->getLatency() + 1 );
     663           0 :         return barrier;
     664             :     }
     665             :     // else
     666             : 
     667           0 :     co::Barrier* barrier = _barriers.back();
     668           0 :     _barriers.pop_back();
     669           0 :     barrier->setHeight( 0 );
     670           0 :     return barrier;
     671             : }
     672             : 
     673           0 : void Node::changeLatency( const uint32_t latency )
     674             : {
     675           0 :     for( co::Barriers::const_iterator i = _barriers.begin();
     676           0 :          i != _barriers.end(); ++ i )
     677             :     {
     678           0 :         co::Barrier* barrier = *i;
     679           0 :         barrier->setAutoObsolete( latency + 1 );
     680             :     }
     681           0 : }
     682             : 
     683           0 : void Node::releaseBarrier( co::Barrier* barrier )
     684             : {
     685           0 :     _barriers.push_back( barrier );
     686           0 : }
     687             : 
     688           8 : void Node::_flushBarriers()
     689             : {
     690           8 :     for( co::BarriersCIter i =_barriers.begin(); i != _barriers.end(); ++i )
     691           0 :         delete *i;
     692           8 :     _barriers.clear();
     693           8 : }
     694             : 
     695           0 : bool Node::removeConnectionDescription( co::ConnectionDescriptionPtr cd )
     696             : {
     697             :     // Don't use std::find, RefPtr::operator== compares pointers, not values.
     698           0 :     for(co::ConnectionDescriptions::iterator i=_connectionDescriptions.begin();
     699           0 :         i != _connectionDescriptions.end(); ++i )
     700             :     {
     701           0 :         if( *cd != **i )
     702           0 :             continue;
     703             : 
     704           0 :         _connectionDescriptions.erase( i );
     705           0 :         return true;
     706             :     }
     707           0 :     return false;
     708             : }
     709             : 
     710          31 : co::ObjectOCommand Node::send( const uint32_t cmd )
     711             : {
     712          31 :     return send( cmd, getID( ));
     713             : }
     714             : 
     715         276 : co::ObjectOCommand Node::send( const uint32_t cmd, const uint128_t& id )
     716             : {
     717             :     return co::ObjectOCommand( co::Connections( 1, _bufferedTasks ), cmd,
     718         276 :                                co::COMMANDTYPE_OBJECT, id, CO_INSTANCE_ALL );
     719             : }
     720             : 
     721           0 : EventOCommand Node::sendError( const uint32_t error )
     722             : {
     723           0 :     return getConfig()->sendError( Event::NODE_ERROR, getID(), error );
     724             : }
     725             : 
     726          50 : void Node::flushSendBuffer()
     727             : {
     728          50 :     _bufferedTasks->sendBuffer( _node->getConnection( ));
     729          50 : }
     730             : 
     731             : //===========================================================================
     732             : // command handling
     733             : //===========================================================================
     734           8 : bool Node::_cmdConfigInitReply( co::ICommand& cmd )
     735             : {
     736           8 :     co::ObjectICommand command( cmd );
     737           8 :     LBVERB << "handle configInit reply " << command << std::endl;
     738           8 :     LBASSERT( _state == STATE_INITIALIZING );
     739           8 :     _state = command.read< uint64_t >() ? STATE_INIT_SUCCESS : STATE_INIT_FAILED;
     740             : 
     741           8 :     return true;
     742             : }
     743             : 
     744           8 : bool Node::_cmdConfigExitReply( co::ICommand& cmd )
     745             : {
     746           8 :     co::ObjectICommand command( cmd );
     747           8 :     LBVERB << "handle configExit reply " << command << std::endl;
     748           8 :     LBASSERT( _state == STATE_EXITING );
     749             : 
     750           8 :     _state = command.read< bool >() ? STATE_EXIT_SUCCESS : STATE_EXIT_FAILED;
     751           8 :     return true;
     752             : }
     753             : 
     754           5 : bool Node::_cmdFrameFinishReply( co::ICommand& cmd )
     755             : {
     756           5 :     co::ObjectICommand command( cmd );
     757           5 :     LBVERB << "handle frame finish reply " << command << std::endl;
     758             : 
     759           5 :     const uint32_t frameNumber = command.read< uint32_t >();
     760             : 
     761           5 :     _finishedFrame = frameNumber;
     762           5 :     getConfig()->notifyNodeFrameFinished( frameNumber );
     763             : 
     764           5 :     return true;
     765             : }
     766             : 
     767         239 : void Node::output( std::ostream& os ) const
     768             : {
     769         239 :     if( !_host.empty( ))
     770         129 :         os << "host     \"" << _host << '"' << std::endl;
     771             : 
     772         239 :     const co::ConnectionDescriptions& descriptions = _connectionDescriptions;
     773        1230 :     for( co::ConnectionDescriptions::const_iterator i = descriptions.begin();
     774         820 :          i != descriptions.end(); ++i )
     775             :     {
     776         171 :         co::ConnectionDescriptionPtr desc = *i;
     777         171 :         os << *desc;
     778         171 :     }
     779             : 
     780         239 :     bool attrPrinted   = false;
     781             : 
     782         956 :     for( Node::SAttribute i = static_cast<Node::SAttribute>( 0 );
     783         478 :          i < Node::SATTR_LAST;
     784             :          i = static_cast<Node::SAttribute>( static_cast<uint32_t>( i )+1))
     785             :     {
     786         239 :         const std::string& value = getSAttribute( i );
     787         239 :         if( value == Global::instance()->getNodeSAttribute( i ))
     788         239 :             continue;
     789             : 
     790           0 :         if( !attrPrinted )
     791             :         {
     792           0 :             os << std::endl << "attributes" << std::endl;
     793           0 :             os << "{" << std::endl << lunchbox::indent;
     794           0 :             attrPrinted = true;
     795             :         }
     796             : 
     797             :         os << ( i==Node::SATTR_LAUNCH_COMMAND ? "launch_command       " :
     798           0 :                 "ERROR" )
     799           0 :            << "\"" << value << "\"" << std::endl;
     800             :     }
     801             : 
     802         956 :     for( Node::CAttribute i = static_cast<Node::CAttribute>( 0 );
     803         478 :          i < Node::CATTR_LAST;
     804             :          i = static_cast<Node::CAttribute>( static_cast<uint32_t>( i )+1))
     805             :     {
     806         239 :         const char value = getCAttribute( i );
     807         239 :         if( value == Global::instance()->getNodeCAttribute( i ))
     808         239 :             continue;
     809             : 
     810           0 :         if( !attrPrinted )
     811             :         {
     812           0 :             os << std::endl << "attributes" << std::endl;
     813           0 :             os << "{" << std::endl << lunchbox::indent;
     814           0 :             attrPrinted = true;
     815             :         }
     816             : 
     817             :         os << ( i==Node::CATTR_LAUNCH_COMMAND_QUOTE ? "launch_command_quote " :
     818           0 :                 "ERROR" )
     819           0 :            << "'" << value << "'" << std::endl;
     820             :     }
     821             : 
     822        1912 :     for( Node::IAttribute i = static_cast< Node::IAttribute>( 0 );
     823         956 :          i < Node::IATTR_LAST;
     824             :          i = static_cast< Node::IAttribute >( static_cast<uint32_t>( i )+1))
     825             :     {
     826         717 :         const int value = getIAttribute( i );
     827         717 :         if( value == Global::instance()->getNodeIAttribute( i ))
     828         717 :             continue;
     829             : 
     830           0 :         if( !attrPrinted )
     831             :         {
     832           0 :             os << std::endl << "attributes" << std::endl;
     833           0 :             os << "{" << std::endl << lunchbox::indent;
     834           0 :             attrPrinted = true;
     835             :         }
     836             : 
     837             :         os << ( i== Node::IATTR_LAUNCH_TIMEOUT ? "launch_timeout       " :
     838             :                 i== Node::IATTR_THREAD_MODEL   ? "thread_model         " :
     839             :                 i== Node::IATTR_HINT_AFFINITY  ? "hint_affinity        " :
     840           0 :                 "ERROR" )
     841           0 :            << static_cast< fabric::IAttribute >( value ) << std::endl;
     842             :     }
     843             : 
     844         239 :     if( attrPrinted )
     845           0 :         os << lunchbox::exdent << "}" << std::endl << std::endl;
     846         239 : }
     847             : 
     848             : }
     849             : }
     850             : 
     851             : #include "../fabric/node.ipp"
     852             : template class eq::fabric::Node< eq::server::Config, eq::server::Node,
     853             :                                  eq::server::Pipe, eq::server::NodeVisitor >;
     854             : /** @cond IGNORE */
     855             : template std::ostream& eq::fabric::operator << ( std::ostream&,
     856          27 :                                                  const eq::server::Super& );
     857             : /** @endcond */

Generated by: LCOV version 1.10