LCOV - code coverage report
Current view: top level - co - node.cpp (source / functions) Hit Total Coverage
Test: Collage Lines: 149 264 56.4 %
Date: 2016-07-14 01:20:24 Functions: 35 50 70.0 %

          Line data    Source code
       1             : 
       2             : /* Copyright (c) 2005-2016, Stefan Eilemann <eile@equalizergraphics.com>
       3             :  *                          Daniel Nachbaur <danielnachbaur@gmail.com>
       4             :  *
       5             :  * This file is part of Collage <https://github.com/Eyescale/Collage>
       6             :  *
       7             :  * This library is free software; you can redistribute it and/or modify it under
       8             :  * the terms of the GNU Lesser General Public License version 2.1 as published
       9             :  * by the Free Software Foundation.
      10             :  *
      11             :  * This library is distributed in the hope that it will be useful, but WITHOUT
      12             :  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
      13             :  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
      14             :  * details.
      15             :  *
      16             :  * You should have received a copy of the GNU Lesser General Public License
      17             :  * along with this library; if not, write to the Free Software Foundation, Inc.,
      18             :  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
      19             :  */
      20             : 
      21             : #include "node.h"
      22             : 
      23             : #include "connectionDescription.h"
      24             : #include "customOCommand.h"
      25             : #include "nodeCommand.h"
      26             : #include "oCommand.h"
      27             : 
      28             : #include <lunchbox/scopedMutex.h>
      29             : #include <lunchbox/spinLock.h>
      30             : 
      31             : #ifdef _MSC_VER
      32             : #  include <direct.h>
      33             : #  define getcwd _getcwd
      34             : #endif
      35             : #ifndef MAXPATHLEN
      36             : #  define MAXPATHLEN 1024
      37             : #endif
      38             : 
      39             : namespace co
      40             : {
      41             : namespace
      42             : {
      43             : /** The state of the node. */
      44             : enum State
      45             : {
      46             :     STATE_CLOSED,    //!< initial state
      47             :     STATE_CONNECTED, //!< proxy for a remote node, connected
      48             :     STATE_LISTENING, //!< local node, listening
      49             :     STATE_CLOSING    //!< listening, about to close
      50             : };
      51             : 
      52           0 : struct MCData
      53             : {
      54             :     ConnectionPtr connection;
      55             :     NodePtr       node;
      56             : };
      57             : typedef std::vector< MCData > MCDatas;
      58             : }
      59             : 
      60             : namespace detail
      61             : {
      62             : class Node
      63             : {
      64             : public:
      65             :     /** Globally unique node identifier. */
      66             :     NodeID id;
      67             : 
      68             :     const uint32_t type;
      69             : 
      70             :     /** The current state of this node. */
      71             :     State state;
      72             : 
      73             :     /** The connection to this node. */
      74             :     ConnectionPtr outgoing;
      75             : 
      76             :     /** The multicast connection to this node, can be 0. */
      77             :     lunchbox::Lockable< ConnectionPtr > outMulticast;
      78             : 
      79             :     /**
      80             :      * Yet unused multicast connections for this node.
      81             :      *
      82             :      * On the first multicast send usage, the connection is 'primed' by sending
      83             :      * our node identifier to the MC group, removed from this vector and set as
      84             :      * outMulticast.
      85             :      */
      86             :     MCDatas multicasts;
      87             : 
      88             :     /** The host name used to launch the remote node process */
      89             :     std::string hostname;
      90             : 
      91             :     /** The list of descriptions on how this node is reachable. */
      92             :     lunchbox::Lockable< ConnectionDescriptions, lunchbox::SpinLock >
      93             :         connectionDescriptions;
      94             : 
      95             :     /** Last time commands were received */
      96             :     int64_t lastReceive;
      97             : 
      98             :     /** Is a big endian host? */
      99             :     bool bigEndian;
     100             : 
     101         123 :     explicit Node( const uint32_t type_ )
     102             :         : id( lunchbox::make_UUID( ))
     103             :         , type( type_ )
     104             :         , state( STATE_CLOSED )
     105             :         , lastReceive ( 0 )
     106             : #ifdef COLLAGE_BIGENDIAN
     107             :         , bigEndian( true )
     108             : #else
     109         123 :         , bigEndian( false )
     110             : #endif
     111         123 :         {}
     112             : 
     113         121 :     ~Node()
     114         121 :     {
     115         121 :         LBASSERT( !outgoing );
     116         121 :         connectionDescriptions->clear();
     117         121 :     }
     118             : };
     119             : }
     120             : 
     121         123 : Node::Node( const uint32_t type )
     122         123 :         : _impl( new detail::Node( type ))
     123             : {
     124         123 :     LBVERB << "New Node @" << (void*)this << " " << _impl->id << std::endl;
     125         123 : }
     126             : 
     127         310 : Node::~Node()
     128             : {
     129         121 :     LBVERB << "Delete Node @" << (void*)this << " " << _impl->id << std::endl;
     130         121 :     delete _impl;
     131         189 : }
     132             : 
     133           0 : bool Node::operator == ( const Node* node ) const
     134             : {
     135           0 :     LBASSERTINFO( _impl->id != node->_impl->id || this == node,
     136             :                   "Two node instances with the same ID found "
     137             :                   << (void*)this << " and " << (void*)node );
     138             : 
     139           0 :     return ( _impl == node->_impl );
     140             : }
     141             : 
     142         638 : ConnectionDescriptions Node::getConnectionDescriptions() const
     143             : {
     144         638 :     lunchbox::ScopedFastRead mutex( _impl->connectionDescriptions );
     145         638 :     return _impl->connectionDescriptions.data;
     146             : }
     147             : 
     148         233 : ConnectionPtr Node::getMulticast()
     149             : {
     150         233 :     if( !isReachable( ))
     151           0 :         return 0;
     152             : 
     153         233 :     ConnectionPtr connection = _impl->outMulticast.data;
     154         233 :     if( connection && !connection->isClosed( ))
     155           0 :         return connection;
     156             : 
     157         466 :     lunchbox::ScopedMutex<> mutex( _impl->outMulticast );
     158         233 :     if( _impl->multicasts.empty( ))
     159         233 :         return 0;
     160             : 
     161           0 :     MCData data = _impl->multicasts.back();
     162           0 :     _impl->multicasts.pop_back();
     163           0 :     NodePtr node = data.node;
     164             : 
     165             :     // prime multicast connections on peers
     166           0 :     LBDEBUG << "Announcing id " << node->getNodeID() << " to multicast group "
     167           0 :            << data.connection->getDescription() << std::endl;
     168             : 
     169             : #ifdef COLLAGE_BIGENDIAN
     170             :     uint32_t cmd = CMD_NODE_ID_BE;
     171             :     lunchbox::byteswap( cmd );
     172             : #else
     173           0 :     const uint32_t cmd = CMD_NODE_ID;
     174             : #endif
     175             :     OCommand( Connections( 1, data.connection ), cmd )
     176           0 :         << node->getNodeID() << getType() << node->serialize();
     177             : 
     178           0 :     _impl->outMulticast.data = data.connection;
     179         233 :     return data.connection;
     180             : }
     181             : 
     182          81 : void Node::addConnectionDescription( ConnectionDescriptionPtr cd )
     183             : {
     184          81 :     LBASSERTINFO( isClosed(), *this );
     185          81 :     if( !isClosed( ))
     186          81 :         return;
     187          81 :     _addConnectionDescription( cd );
     188             : }
     189             : 
     190          81 : void Node::_addConnectionDescription( ConnectionDescriptionPtr cd )
     191             : {
     192          81 :     lunchbox::ScopedFastWrite mutex( _impl->connectionDescriptions );
     193          81 :     _impl->connectionDescriptions->push_back( cd );
     194          81 : }
     195             : 
     196           0 : bool Node::removeConnectionDescription( ConnectionDescriptionPtr cd )
     197             : {
     198           0 :     LBASSERTINFO( isClosed(), *this );
     199           0 :     if( !isClosed( ))
     200           0 :         return false;
     201           0 :     return _removeConnectionDescription( cd );
     202             : }
     203             : 
     204           0 : bool Node::_removeConnectionDescription( ConnectionDescriptionPtr cd )
     205             : {
     206           0 :     lunchbox::ScopedFastWrite mutex( _impl->connectionDescriptions );
     207             : 
     208             :     // Don't use std::find, RefPtr::operator== compares pointers, not values.
     209           0 :     for( ConnectionDescriptionsIter i = _impl->connectionDescriptions->begin();
     210           0 :          i != _impl->connectionDescriptions->end(); ++i )
     211             :     {
     212           0 :         if( *cd != **i )
     213           0 :             continue;
     214             : 
     215           0 :         _impl->connectionDescriptions->erase( i );
     216           0 :         return true;
     217             :     }
     218           0 :     return false;
     219             : }
     220             : 
     221          68 : std::string Node::serialize() const
     222             : {
     223          68 :     std::ostringstream data;
     224          68 :     data << Version::getMajor() << CO_SEPARATOR << Version::getMinor()
     225         136 :          << CO_SEPARATOR << _impl->id << CO_SEPARATOR << _impl->bigEndian
     226          68 :          << CO_SEPARATOR;
     227             :     {
     228          68 :         lunchbox::ScopedFastRead mutex( _impl->connectionDescriptions );
     229          68 :         data << co::serialize( _impl->connectionDescriptions.data );
     230             :     }
     231          68 :     return data.str();
     232             : }
     233             : 
     234          68 : bool Node::deserialize( std::string& data )
     235             : {
     236          68 :     LBASSERT( _impl->state == STATE_CLOSED );
     237             : 
     238             :     // version check
     239          68 :     int32_t major = 0;
     240          68 :     size_t nextPos = data.find( CO_SEPARATOR );
     241          68 :     if( nextPos == std::string::npos || nextPos == 0 )
     242             :     {
     243           0 :         LBERROR << "Could not parse node major version data" << std::endl;
     244           0 :         return false;
     245             :     }
     246             : 
     247          68 :     std::istringstream is( data.substr( 0, nextPos ));
     248          68 :     data = data.substr( nextPos + 1 );
     249          68 :     is >> major;
     250             : 
     251          68 :     int32_t minor = 0;
     252          68 :     nextPos = data.find( CO_SEPARATOR );
     253          68 :     if( nextPos == std::string::npos || nextPos == 0 )
     254             :     {
     255           0 :         LBERROR << "Could not parse node minor version data" << std::endl;
     256           0 :         return false;
     257             :     }
     258             : 
     259          68 :     is.clear();
     260          68 :     is.str( data.substr( 0, nextPos ));
     261          68 :     data = data.substr( nextPos + 1 );
     262          68 :     is >> minor;
     263             : 
     264          68 :     if( major != Version::getMajor() || minor != Version::getMinor( ))
     265             :     {
     266           0 :         LBWARN << "Protocol mismatch: remote node uses version " << major << '.'
     267           0 :                << minor << ", local node uses " << Version::getMajor() << '.'
     268           0 :                << Version::getMinor() << std::endl;
     269             :     }
     270             : 
     271             :     // node id
     272          68 :     nextPos = data.find( CO_SEPARATOR );
     273          68 :     if( nextPos == std::string::npos || nextPos == 0 )
     274             :     {
     275           0 :         LBERROR << "Could not parse node id data" << std::endl;
     276           0 :         return false;
     277             :     }
     278             : 
     279          68 :     _impl->id = data.substr( 0, nextPos );
     280          68 :     data = data.substr( nextPos + 1 );
     281             : 
     282             :     // endianness
     283          68 :     nextPos = data.find( CO_SEPARATOR );
     284          68 :     if( nextPos == std::string::npos || nextPos == 0 )
     285             :     {
     286           0 :         LBERROR << "Could not parse node endianness data" << std::endl;
     287           0 :         return false;
     288             :     }
     289             : 
     290          68 :     is.clear();
     291          68 :     is.str( data.substr( 0, nextPos ));
     292          68 :     data = data.substr( nextPos + 1 );
     293          68 :     is >> _impl->bigEndian;
     294             : 
     295             :     // Connections data
     296         136 :     lunchbox::ScopedFastWrite mutex( _impl->connectionDescriptions );
     297          68 :     _impl->connectionDescriptions->clear();
     298         136 :     return co::deserialize( data, _impl->connectionDescriptions.data );
     299             : }
     300             : 
     301       72077 : bool Node::isBigEndian() const
     302             : {
     303       72077 :     return _impl->bigEndian;
     304             : }
     305             : 
     306        1341 : bool Node::isReachable() const
     307             : {
     308        1341 :     return isListening() || isConnected();
     309             : }
     310             : 
     311         646 : bool Node::isConnected() const
     312             : {
     313         646 :     return _impl->state == STATE_CONNECTED;
     314             : }
     315             : 
     316      175185 : bool Node::isClosed() const
     317             : {
     318      175185 :     return _impl->state == STATE_CLOSED;
     319             : }
     320             : 
     321          49 : bool Node::isClosing() const
     322             : {
     323          49 :     return _impl->state == STATE_CLOSING;
     324             : }
     325             : 
     326       74326 : bool Node::isListening() const
     327             : {
     328       74326 :     return _impl->state == STATE_LISTENING;
     329             : }
     330             : 
     331         754 : ConnectionPtr Node::getConnection( const bool preferMulticast )
     332             : {
     333         754 :     ConnectionPtr multicast = preferMulticast ? getMulticast() : 0;
     334         753 :     return multicast ? multicast : _impl->outgoing;
     335             : }
     336             : 
     337       71476 : ConnectionPtr Node::_getConnection( const bool preferMulticast )
     338             : {
     339       71476 :     ConnectionPtr multicast = preferMulticast ? getMulticast() : 0;
     340       71476 :     if( !isClosed( ))
     341       71476 :         return multicast ? multicast : _impl->outgoing;
     342           0 :     LBUNREACHABLE;
     343           0 :     return 0;
     344             : }
     345             : 
     346         115 : ConnectionPtr Node::_getMulticast() const
     347             : {
     348         115 :     return _impl->outMulticast.data;
     349             : }
     350             : 
     351           0 : void Node::setHostname( const std::string& host )
     352             : {
     353           0 :     _impl->hostname = host;
     354           0 : }
     355             : 
     356           0 : const std::string& Node::getHostname() const
     357             : {
     358           0 :     return _impl->hostname;
     359             : }
     360             : 
     361           0 : std::string Node::getWorkDir() const
     362             : {
     363             :     char cwd[MAXPATHLEN];
     364           0 :     return getcwd( cwd, MAXPATHLEN );
     365             : }
     366             : 
     367           0 : std::string Node::getLaunchQuote() const
     368             : {
     369             : #ifdef WIN32
     370             :     return "\"";
     371             : #else
     372           0 :     return "\'";
     373             : #endif
     374             : }
     375             : 
     376       71474 : OCommand Node::send( const uint32_t cmd, const bool multicast )
     377             : {
     378       71474 :     ConnectionPtr connection = _getConnection( multicast );
     379       71474 :     LBASSERT( connection );
     380       71474 :     return OCommand( Connections( 1, connection ), cmd, COMMANDTYPE_NODE );
     381             : }
     382             : 
     383           2 : CustomOCommand Node::send( const uint128_t& commandID, const bool multicast )
     384             : {
     385           2 :     ConnectionPtr connection = _getConnection( multicast );
     386           2 :     LBASSERT( connection );
     387           2 :     return CustomOCommand( Connections( 1, connection ), commandID );
     388             : }
     389             : 
     390      180828 : const NodeID& Node::getNodeID() const
     391             : {
     392      180828 :     return _impl->id;
     393             : }
     394             : 
     395           0 : int64_t Node::getLastReceiveTime() const
     396             : {
     397           0 :     return _impl->lastReceive;
     398             : }
     399             : 
     400         183 : uint32_t Node::getType() const
     401             : {
     402         183 :     return _impl->type;
     403             : }
     404             : 
     405           0 : void Node::_addMulticast( NodePtr node, ConnectionPtr connection )
     406             : {
     407           0 :     lunchbox::ScopedMutex<> mutex( _impl->outMulticast );
     408           0 :     MCData data;
     409           0 :     data.connection = connection;
     410           0 :     data.node = node;
     411           0 :     _impl->multicasts.push_back( data );
     412           0 : }
     413             : 
     414           0 : void Node::_removeMulticast( ConnectionPtr connection )
     415             : {
     416           0 :     LBASSERT( connection->getDescription()->type >= CONNECTIONTYPE_MULTICAST );
     417             : 
     418           0 :     lunchbox::ScopedMutex<> mutex( _impl->outMulticast );
     419           0 :     if( _impl->outMulticast == connection )
     420           0 :         _impl->outMulticast.data = 0;
     421             :     else
     422             :     {
     423           0 :         for( MCDatas::iterator j = _impl->multicasts.begin();
     424           0 :              j != _impl->multicasts.end(); ++j )
     425             :         {
     426           0 :             if( (*j).connection != connection )
     427           0 :                 continue;
     428             : 
     429           0 :             _impl->multicasts.erase( j );
     430           0 :             return;
     431             :         }
     432           0 :     }
     433             : }
     434             : 
     435          68 : void Node::_connectMulticast( NodePtr node )
     436             : {
     437          68 :     lunchbox::ScopedMutex<> mutex( _impl->outMulticast );
     438             : 
     439          68 :     if( node->_impl->outMulticast.data.isValid( ))
     440             :         // multicast already connected by previous _cmdID
     441          68 :         return;
     442             : 
     443             :     // Search if the connected node is in the same multicast group as we are
     444         136 :     const ConnectionDescriptions& descriptions = getConnectionDescriptions();
     445         405 :     for( ConnectionDescriptionsCIter i = descriptions.begin();
     446         270 :          i != descriptions.end(); ++i )
     447             :     {
     448          67 :         ConnectionDescriptionPtr description = *i;
     449          67 :         if( description->type < CONNECTIONTYPE_MULTICAST )
     450          67 :             continue;
     451             : 
     452             :         const ConnectionDescriptions& fromDescs =
     453           0 :             node->getConnectionDescriptions();
     454           0 :         for( ConnectionDescriptionsCIter j = fromDescs.begin();
     455           0 :              j != fromDescs.end(); ++j )
     456             :         {
     457           0 :             ConnectionDescriptionPtr fromDescription = *j;
     458           0 :             if( !description->isSameMulticastGroup( fromDescription ))
     459           0 :                 continue;
     460             : 
     461           0 :             LBASSERT( !node->_impl->outMulticast.data );
     462           0 :             LBASSERT( node->_impl->multicasts.empty( ));
     463             : 
     464           0 :             if( _impl->outMulticast->isValid() &&
     465           0 :                 _impl->outMulticast.data->getDescription() == description )
     466             :             {
     467           0 :                 node->_impl->outMulticast.data = _impl->outMulticast.data;
     468           0 :                 LBDEBUG << "Using " << description << " as multicast group for "
     469           0 :                         << node->getNodeID() << std::endl;
     470             :             }
     471             :             // find unused multicast connection to node
     472           0 :             else for( MCDatas::const_iterator k = _impl->multicasts.begin();
     473           0 :                       k != _impl->multicasts.end(); ++k )
     474             :             {
     475           0 :                 const MCData& data = *k;
     476             :                 ConstConnectionDescriptionPtr dataDesc =
     477           0 :                     data.connection->getDescription();
     478           0 :                 if( !description->isSameMulticastGroup( dataDesc ))
     479           0 :                     continue;
     480             : 
     481           0 :                 node->_impl->multicasts.push_back( data );
     482           0 :                 LBDEBUG << "Adding " << dataDesc << " as multicast group for "
     483           0 :                         << node->getNodeID() << std::endl;
     484           0 :             }
     485           0 :         }
     486          68 :     }
     487             : }
     488             : 
     489           0 : void Node::_connectMulticast( NodePtr node, ConnectionPtr connection )
     490             : {
     491           0 :     lunchbox::ScopedMutex<> mutex( _impl->outMulticast );
     492           0 :     MCDatas::iterator i = node->_impl->multicasts.begin();
     493           0 :     for( ; i != node->_impl->multicasts.end(); ++i )
     494             :     {
     495           0 :         if( (*i).connection == connection )
     496           0 :             break;
     497             :     }
     498             : 
     499           0 :     if( node->_impl->outMulticast->isValid( ))
     500             :     {
     501           0 :         if( node->_impl->outMulticast.data == connection )
     502             :         {
     503             :             // nop, connection already used
     504           0 :             LBASSERT( i == node->_impl->multicasts.end( ));
     505             :         }
     506           0 :         else if( i == node->_impl->multicasts.end( ))
     507             :         {
     508             :             // another connection is used as multicast connection, save this
     509           0 :             LBASSERT( isListening( ));
     510           0 :             MCData data;
     511           0 :             data.connection = connection;
     512           0 :             data.node = this;
     513           0 :             _impl->multicasts.push_back( data );
     514             :         }
     515             :         // else nop, already know connection
     516             :     }
     517             :     else
     518             :     {
     519           0 :         node->_impl->outMulticast.data = connection;
     520           0 :         if( i != node->_impl->multicasts.end( ))
     521           0 :             node->_impl->multicasts.erase( i );
     522           0 :     }
     523           0 : }
     524             : 
     525          50 : void Node::_setListening()
     526             : {
     527          50 :     _impl->state = STATE_LISTENING;
     528          50 : }
     529             : 
     530          49 : void Node::_setClosing()
     531             : {
     532          49 :     _impl->state = STATE_CLOSING;
     533          49 : }
     534             : 
     535          99 : void Node::_setClosed()
     536             : {
     537          99 :     _impl->state = STATE_CLOSED;
     538          99 : }
     539             : 
     540         118 : void Node::_connect( ConnectionPtr connection )
     541             : {
     542         118 :     _impl->outgoing = connection;
     543         118 :     _impl->state = STATE_CONNECTED;
     544         118 : }
     545             : 
     546         164 : void Node::_disconnect()
     547             : {
     548         164 :     _impl->state = STATE_CLOSED;
     549         164 :     _impl->outgoing = 0;
     550         164 :     _impl->outMulticast.data = 0;
     551         164 :     _impl->multicasts.clear();
     552         164 : }
     553             : 
     554       72077 : void Node::_setLastReceive( const int64_t time )
     555             : {
     556       72077 :     _impl->lastReceive = time;
     557       72077 : }
     558             : 
     559         435 : std::ostream& operator << ( std::ostream& os, const State state )
     560             : {
     561             :     os << ( state == STATE_CLOSED ? "closed" :
     562             :             state == STATE_CONNECTED ? "connected" :
     563         435 :             state == STATE_LISTENING ? "listening" : "ERROR" );
     564         435 :     return os;
     565             : }
     566             : 
     567         435 : std::ostream& operator << ( std::ostream& os, const Node& node )
     568             : {
     569         435 :     os << "node " << node.getNodeID() << " " << node._impl->state;
     570         435 :     const ConnectionDescriptions& descs = node.getConnectionDescriptions();
     571         862 :     for( ConnectionDescriptionsCIter i = descs.begin(); i != descs.end(); ++i )
     572         427 :         os << ", " << (*i)->toString();
     573         435 :     return os;
     574             : }
     575             : 
     576          66 : }

Generated by: LCOV version 1.11