LCOV - code coverage report
Current view: top level - co - node.cpp (source / functions) Hit Total Coverage
Test: Collage Lines: 147 262 56.1 %
Date: 2016-12-14 01:26:48 Functions: 35 50 70.0 %

          Line data    Source code
       1             : 
       2             : /* Copyright (c) 2005-2016, Stefan Eilemann <eile@equalizergraphics.com>
       3             :  *                          Daniel Nachbaur <danielnachbaur@gmail.com>
       4             :  *
       5             :  * This file is part of Collage <https://github.com/Eyescale/Collage>
       6             :  *
       7             :  * This library is free software; you can redistribute it and/or modify it under
       8             :  * the terms of the GNU Lesser General Public License version 2.1 as published
       9             :  * by the Free Software Foundation.
      10             :  *
      11             :  * This library is distributed in the hope that it will be useful, but WITHOUT
      12             :  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
      13             :  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
      14             :  * details.
      15             :  *
      16             :  * You should have received a copy of the GNU Lesser General Public License
      17             :  * along with this library; if not, write to the Free Software Foundation, Inc.,
      18             :  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
      19             :  */
      20             : 
      21             : #include "node.h"
      22             : 
      23             : #include "connectionDescription.h"
      24             : #include "customOCommand.h"
      25             : #include "nodeCommand.h"
      26             : #include "oCommand.h"
      27             : 
      28             : #include <lunchbox/file.h>
      29             : #include <lunchbox/scopedMutex.h>
      30             : #include <lunchbox/spinLock.h>
      31             : 
      32             : #ifndef MAXPATHLEN
      33             : #  define MAXPATHLEN 1024
      34             : #endif
      35             : 
      36             : namespace co
      37             : {
      38             : namespace
      39             : {
      40             : /** The state of the node. */
      41             : enum State
      42             : {
      43             :     STATE_CLOSED,    //!< initial state
      44             :     STATE_CONNECTED, //!< proxy for a remote node, connected
      45             :     STATE_LISTENING, //!< local node, listening
      46             :     STATE_CLOSING    //!< listening, about to close
      47             : };
      48             : 
      49           0 : struct MCData
      50             : {
      51             :     ConnectionPtr connection;
      52             :     NodePtr       node;
      53             : };
      54             : typedef std::vector< MCData > MCDatas;
      55             : }
      56             : 
      57             : namespace detail
      58             : {
      59             : class Node
      60             : {
      61             : public:
      62             :     /** Globally unique node identifier. */
      63             :     NodeID id;
      64             : 
      65             :     const uint32_t type;
      66             : 
      67             :     /** The current state of this node. */
      68             :     State state;
      69             : 
      70             :     /** The connection to this node. */
      71             :     ConnectionPtr outgoing;
      72             : 
      73             :     /** The multicast connection to this node, can be 0. */
      74             :     lunchbox::Lockable< ConnectionPtr > outMulticast;
      75             : 
      76             :     /**
      77             :      * Yet unused multicast connections for this node.
      78             :      *
      79             :      * On the first multicast send usage, the connection is 'primed' by sending
      80             :      * our node identifier to the MC group, removed from this vector and set as
      81             :      * outMulticast.
      82             :      */
      83             :     MCDatas multicasts;
      84             : 
      85             :     /** The host name used to launch the remote node process */
      86             :     std::string hostname;
      87             : 
      88             :     /** The list of descriptions on how this node is reachable. */
      89             :     lunchbox::Lockable< ConnectionDescriptions, lunchbox::SpinLock >
      90             :         connectionDescriptions;
      91             : 
      92             :     /** Last time commands were received */
      93             :     int64_t lastReceive;
      94             : 
      95             :     /** Is a big endian host? */
      96             :     bool bigEndian;
      97             : 
      98         123 :     explicit Node( const uint32_t type_ )
      99         123 :         : id( lunchbox::make_UUID( ))
     100             :         , type( type_ )
     101             :         , state( STATE_CLOSED )
     102             :         , lastReceive ( 0 )
     103             : #ifdef COLLAGE_BIGENDIAN
     104             :         , bigEndian( true )
     105             : #else
     106         123 :         , bigEndian( false )
     107             : #endif
     108         123 :         {}
     109             : 
     110         121 :     ~Node()
     111         121 :     {
     112         121 :         LBASSERT( !outgoing );
     113         121 :         connectionDescriptions->clear();
     114         121 :     }
     115             : };
     116             : }
     117             : 
     118         123 : Node::Node( const uint32_t type )
     119         123 :         : _impl( new detail::Node( type ))
     120             : {
     121         122 :     LBVERB << "New Node @" << (void*)this << " " << _impl->id << std::endl;
     122         122 : }
     123             : 
     124         310 : Node::~Node()
     125             : {
     126         121 :     LBVERB << "Delete Node @" << (void*)this << " " << _impl->id << std::endl;
     127         121 :     delete _impl;
     128         189 : }
     129             : 
     130           0 : bool Node::operator == ( const Node* node ) const
     131             : {
     132           0 :     LBASSERTINFO( _impl->id != node->_impl->id || this == node,
     133             :                   "Two node instances with the same ID found "
     134             :                   << (void*)this << " and " << (void*)node );
     135             : 
     136           0 :     return ( _impl == node->_impl );
     137             : }
     138             : 
     139         638 : ConnectionDescriptions Node::getConnectionDescriptions() const
     140             : {
     141        1276 :     lunchbox::ScopedFastRead mutex( _impl->connectionDescriptions );
     142        1276 :     return _impl->connectionDescriptions.data;
     143             : }
     144             : 
     145         233 : ConnectionPtr Node::getMulticast()
     146             : {
     147         233 :     if( !isReachable( ))
     148           0 :         return 0;
     149             : 
     150         466 :     ConnectionPtr connection = _impl->outMulticast.data;
     151         233 :     if( connection && !connection->isClosed( ))
     152           0 :         return connection;
     153             : 
     154         466 :     lunchbox::ScopedMutex<> mutex( _impl->outMulticast );
     155         233 :     if( _impl->multicasts.empty( ))
     156         233 :         return 0;
     157             : 
     158           0 :     MCData data = _impl->multicasts.back();
     159           0 :     _impl->multicasts.pop_back();
     160           0 :     NodePtr node = data.node;
     161             : 
     162             :     // prime multicast connections on peers
     163           0 :     LBDEBUG << "Announcing id " << node->getNodeID() << " to multicast group "
     164           0 :            << data.connection->getDescription() << std::endl;
     165             : 
     166             : #ifdef COLLAGE_BIGENDIAN
     167             :     uint32_t cmd = CMD_NODE_ID_BE;
     168             :     lunchbox::byteswap( cmd );
     169             : #else
     170           0 :     const uint32_t cmd = CMD_NODE_ID;
     171             : #endif
     172           0 :     OCommand( Connections( 1, data.connection ), cmd )
     173           0 :         << node->getNodeID() << getType() << node->serialize();
     174             : 
     175           0 :     _impl->outMulticast.data = data.connection;
     176           0 :     return data.connection;
     177             : }
     178             : 
     179          81 : void Node::addConnectionDescription( ConnectionDescriptionPtr cd )
     180             : {
     181          81 :     LBASSERTINFO( isClosed(), *this );
     182          81 :     if( !isClosed( ))
     183           0 :         return;
     184          81 :     _addConnectionDescription( cd );
     185             : }
     186             : 
     187          81 : void Node::_addConnectionDescription( ConnectionDescriptionPtr cd )
     188             : {
     189         162 :     lunchbox::ScopedFastWrite mutex( _impl->connectionDescriptions );
     190          81 :     _impl->connectionDescriptions->push_back( cd );
     191          81 : }
     192             : 
     193           0 : bool Node::removeConnectionDescription( ConnectionDescriptionPtr cd )
     194             : {
     195           0 :     LBASSERTINFO( isClosed(), *this );
     196           0 :     if( !isClosed( ))
     197           0 :         return false;
     198           0 :     return _removeConnectionDescription( cd );
     199             : }
     200             : 
     201           0 : bool Node::_removeConnectionDescription( ConnectionDescriptionPtr cd )
     202             : {
     203           0 :     lunchbox::ScopedFastWrite mutex( _impl->connectionDescriptions );
     204             : 
     205             :     // Don't use std::find, RefPtr::operator== compares pointers, not values.
     206           0 :     for( ConnectionDescriptionsIter i = _impl->connectionDescriptions->begin();
     207           0 :          i != _impl->connectionDescriptions->end(); ++i )
     208             :     {
     209           0 :         if( *cd != **i )
     210           0 :             continue;
     211             : 
     212           0 :         _impl->connectionDescriptions->erase( i );
     213           0 :         return true;
     214             :     }
     215           0 :     return false;
     216             : }
     217             : 
     218          68 : std::string Node::serialize() const
     219             : {
     220         136 :     std::ostringstream data;
     221          68 :     data << Version::getMajor() << CO_SEPARATOR << Version::getMinor()
     222         136 :          << CO_SEPARATOR << _impl->id << CO_SEPARATOR << _impl->bigEndian
     223          68 :          << CO_SEPARATOR;
     224             :     {
     225         136 :         lunchbox::ScopedFastRead mutex( _impl->connectionDescriptions );
     226          68 :         data << co::serialize( _impl->connectionDescriptions.data );
     227             :     }
     228         136 :     return data.str();
     229             : }
     230             : 
     231          68 : bool Node::deserialize( std::string& data )
     232             : {
     233          68 :     LBASSERT( _impl->state == STATE_CLOSED );
     234             : 
     235             :     // version check
     236          68 :     int32_t major = 0;
     237          68 :     size_t nextPos = data.find( CO_SEPARATOR );
     238          68 :     if( nextPos == std::string::npos || nextPos == 0 )
     239             :     {
     240           0 :         LBERROR << "Could not parse node major version data" << std::endl;
     241           0 :         return false;
     242             :     }
     243             : 
     244         136 :     std::istringstream is( data.substr( 0, nextPos ));
     245          68 :     data = data.substr( nextPos + 1 );
     246          68 :     is >> major;
     247             : 
     248          68 :     int32_t minor = 0;
     249          68 :     nextPos = data.find( CO_SEPARATOR );
     250          68 :     if( nextPos == std::string::npos || nextPos == 0 )
     251             :     {
     252           0 :         LBERROR << "Could not parse node minor version data" << std::endl;
     253           0 :         return false;
     254             :     }
     255             : 
     256          68 :     is.clear();
     257          68 :     is.str( data.substr( 0, nextPos ));
     258          68 :     data = data.substr( nextPos + 1 );
     259          68 :     is >> minor;
     260             : 
     261          68 :     if( major != Version::getMajor() || minor != Version::getMinor( ))
     262             :     {
     263           0 :         LBWARN << "Protocol mismatch: remote node uses version " << major << '.'
     264           0 :                << minor << ", local node uses " << Version::getMajor() << '.'
     265           0 :                << Version::getMinor() << std::endl;
     266             :     }
     267             : 
     268             :     // node id
     269          68 :     nextPos = data.find( CO_SEPARATOR );
     270          68 :     if( nextPos == std::string::npos || nextPos == 0 )
     271             :     {
     272           0 :         LBERROR << "Could not parse node id data" << std::endl;
     273           0 :         return false;
     274             :     }
     275             : 
     276          68 :     _impl->id = data.substr( 0, nextPos );
     277          68 :     data = data.substr( nextPos + 1 );
     278             : 
     279             :     // endianness
     280          68 :     nextPos = data.find( CO_SEPARATOR );
     281          68 :     if( nextPos == std::string::npos || nextPos == 0 )
     282             :     {
     283           0 :         LBERROR << "Could not parse node endianness data" << std::endl;
     284           0 :         return false;
     285             :     }
     286             : 
     287          68 :     is.clear();
     288          68 :     is.str( data.substr( 0, nextPos ));
     289          68 :     data = data.substr( nextPos + 1 );
     290          68 :     is >> _impl->bigEndian;
     291             : 
     292             :     // Connections data
     293         136 :     lunchbox::ScopedFastWrite mutex( _impl->connectionDescriptions );
     294          68 :     _impl->connectionDescriptions->clear();
     295          68 :     return co::deserialize( data, _impl->connectionDescriptions.data );
     296             : }
     297             : 
     298       72077 : bool Node::isBigEndian() const
     299             : {
     300       72077 :     return _impl->bigEndian;
     301             : }
     302             : 
     303        1343 : bool Node::isReachable() const
     304             : {
     305        1343 :     return isListening() || isConnected();
     306             : }
     307             : 
     308         646 : bool Node::isConnected() const
     309             : {
     310         646 :     return _impl->state == STATE_CONNECTED;
     311             : }
     312             : 
     313      175201 : bool Node::isClosed() const
     314             : {
     315      175201 :     return _impl->state == STATE_CLOSED;
     316             : }
     317             : 
     318          49 : bool Node::isClosing() const
     319             : {
     320          49 :     return _impl->state == STATE_CLOSING;
     321             : }
     322             : 
     323       74328 : bool Node::isListening() const
     324             : {
     325       74328 :     return _impl->state == STATE_LISTENING;
     326             : }
     327             : 
     328         753 : ConnectionPtr Node::getConnection( const bool preferMulticast )
     329             : {
     330        1507 :     ConnectionPtr multicast = preferMulticast ? getMulticast() : 0;
     331        1508 :     return multicast ? multicast : _impl->outgoing;
     332             : }
     333             : 
     334       71476 : ConnectionPtr Node::_getConnection( const bool preferMulticast )
     335             : {
     336      142952 :     ConnectionPtr multicast = preferMulticast ? getMulticast() : 0;
     337       71476 :     if( !isClosed( ))
     338       71476 :         return multicast ? multicast : _impl->outgoing;
     339           0 :     LBUNREACHABLE;
     340           0 :     return 0;
     341             : }
     342             : 
     343         115 : ConnectionPtr Node::_getMulticast() const
     344             : {
     345         115 :     return _impl->outMulticast.data;
     346             : }
     347             : 
     348           0 : void Node::setHostname( const std::string& host )
     349             : {
     350           0 :     _impl->hostname = host;
     351           0 : }
     352             : 
     353           0 : const std::string& Node::getHostname() const
     354             : {
     355           0 :     return _impl->hostname;
     356             : }
     357             : 
     358           0 : std::string Node::getWorkDir() const
     359             : {
     360           0 :     return lunchbox::getWorkDir();
     361             : }
     362             : 
     363           0 : std::string Node::getLaunchQuote() const
     364             : {
     365             : #ifdef WIN32
     366             :     return "\"";
     367             : #else
     368           0 :     return "\'";
     369             : #endif
     370             : }
     371             : 
     372       71474 : OCommand Node::send( const uint32_t cmd, const bool multicast )
     373             : {
     374      142948 :     ConnectionPtr connection = _getConnection( multicast );
     375       71474 :     LBASSERT( connection );
     376      142948 :     return OCommand( Connections( 1, connection ), cmd, COMMANDTYPE_NODE );
     377             : }
     378             : 
     379           2 : CustomOCommand Node::send( const uint128_t& commandID, const bool multicast )
     380             : {
     381           4 :     ConnectionPtr connection = _getConnection( multicast );
     382           2 :     LBASSERT( connection );
     383           4 :     return CustomOCommand( Connections( 1, connection ), commandID );
     384             : }
     385             : 
     386      198868 : const NodeID& Node::getNodeID() const
     387             : {
     388      198868 :     return _impl->id;
     389             : }
     390             : 
     391           0 : int64_t Node::getLastReceiveTime() const
     392             : {
     393           0 :     return _impl->lastReceive;
     394             : }
     395             : 
     396         183 : uint32_t Node::getType() const
     397             : {
     398         183 :     return _impl->type;
     399             : }
     400             : 
     401           0 : void Node::_addMulticast( NodePtr node, ConnectionPtr connection )
     402             : {
     403           0 :     lunchbox::ScopedMutex<> mutex( _impl->outMulticast );
     404           0 :     MCData data;
     405           0 :     data.connection = connection;
     406           0 :     data.node = node;
     407           0 :     _impl->multicasts.push_back( data );
     408           0 : }
     409             : 
     410           0 : void Node::_removeMulticast( ConnectionPtr connection )
     411             : {
     412           0 :     LBASSERT( connection->getDescription()->type >= CONNECTIONTYPE_MULTICAST );
     413             : 
     414           0 :     lunchbox::ScopedMutex<> mutex( _impl->outMulticast );
     415           0 :     if( _impl->outMulticast == connection )
     416           0 :         _impl->outMulticast.data = 0;
     417             :     else
     418             :     {
     419           0 :         for( MCDatas::iterator j = _impl->multicasts.begin();
     420           0 :              j != _impl->multicasts.end(); ++j )
     421             :         {
     422           0 :             if( (*j).connection != connection )
     423           0 :                 continue;
     424             : 
     425           0 :             _impl->multicasts.erase( j );
     426           0 :             return;
     427             :         }
     428             :     }
     429             : }
     430             : 
     431          68 : void Node::_connectMulticast( NodePtr node )
     432             : {
     433         136 :     lunchbox::ScopedMutex<> mutex( _impl->outMulticast );
     434             : 
     435          68 :     if( node->_impl->outMulticast.data.isValid( ))
     436             :         // multicast already connected by previous _cmdID
     437           0 :         return;
     438             : 
     439             :     // Search if the connected node is in the same multicast group as we are
     440         136 :     const ConnectionDescriptions& descriptions = getConnectionDescriptions();
     441         405 :     for( ConnectionDescriptionsCIter i = descriptions.begin();
     442         270 :          i != descriptions.end(); ++i )
     443             :     {
     444          67 :         ConnectionDescriptionPtr description = *i;
     445          67 :         if( description->type < CONNECTIONTYPE_MULTICAST )
     446          67 :             continue;
     447             : 
     448             :         const ConnectionDescriptions& fromDescs =
     449           0 :             node->getConnectionDescriptions();
     450           0 :         for( ConnectionDescriptionsCIter j = fromDescs.begin();
     451           0 :              j != fromDescs.end(); ++j )
     452             :         {
     453           0 :             ConnectionDescriptionPtr fromDescription = *j;
     454           0 :             if( !description->isSameMulticastGroup( fromDescription ))
     455           0 :                 continue;
     456             : 
     457           0 :             LBASSERT( !node->_impl->outMulticast.data );
     458           0 :             LBASSERT( node->_impl->multicasts.empty( ));
     459             : 
     460           0 :             if( _impl->outMulticast->isValid() &&
     461           0 :                 _impl->outMulticast.data->getDescription() == description )
     462             :             {
     463           0 :                 node->_impl->outMulticast.data = _impl->outMulticast.data;
     464           0 :                 LBDEBUG << "Using " << description << " as multicast group for "
     465           0 :                         << node->getNodeID() << std::endl;
     466             :             }
     467             :             // find unused multicast connection to node
     468           0 :             else for( MCDatas::const_iterator k = _impl->multicasts.begin();
     469           0 :                       k != _impl->multicasts.end(); ++k )
     470             :             {
     471           0 :                 const MCData& data = *k;
     472             :                 ConstConnectionDescriptionPtr dataDesc =
     473           0 :                     data.connection->getDescription();
     474           0 :                 if( !description->isSameMulticastGroup( dataDesc ))
     475           0 :                     continue;
     476             : 
     477           0 :                 node->_impl->multicasts.push_back( data );
     478           0 :                 LBDEBUG << "Adding " << dataDesc << " as multicast group for "
     479           0 :                         << node->getNodeID() << std::endl;
     480             :             }
     481             :         }
     482             :     }
     483             : }
     484             : 
     485           0 : void Node::_connectMulticast( NodePtr node, ConnectionPtr connection )
     486             : {
     487           0 :     lunchbox::ScopedMutex<> mutex( _impl->outMulticast );
     488           0 :     MCDatas::iterator i = node->_impl->multicasts.begin();
     489           0 :     for( ; i != node->_impl->multicasts.end(); ++i )
     490             :     {
     491           0 :         if( (*i).connection == connection )
     492           0 :             break;
     493             :     }
     494             : 
     495           0 :     if( node->_impl->outMulticast->isValid( ))
     496             :     {
     497           0 :         if( node->_impl->outMulticast.data == connection )
     498             :         {
     499             :             // nop, connection already used
     500           0 :             LBASSERT( i == node->_impl->multicasts.end( ));
     501             :         }
     502           0 :         else if( i == node->_impl->multicasts.end( ))
     503             :         {
     504             :             // another connection is used as multicast connection, save this
     505           0 :             LBASSERT( isListening( ));
     506           0 :             MCData data;
     507           0 :             data.connection = connection;
     508           0 :             data.node = this;
     509           0 :             _impl->multicasts.push_back( data );
     510             :         }
     511             :         // else nop, already know connection
     512             :     }
     513             :     else
     514             :     {
     515           0 :         node->_impl->outMulticast.data = connection;
     516           0 :         if( i != node->_impl->multicasts.end( ))
     517           0 :             node->_impl->multicasts.erase( i );
     518             :     }
     519           0 : }
     520             : 
     521          50 : void Node::_setListening()
     522             : {
     523          50 :     _impl->state = STATE_LISTENING;
     524          50 : }
     525             : 
     526          49 : void Node::_setClosing()
     527             : {
     528          49 :     _impl->state = STATE_CLOSING;
     529          49 : }
     530             : 
     531          99 : void Node::_setClosed()
     532             : {
     533          99 :     _impl->state = STATE_CLOSED;
     534          99 : }
     535             : 
     536         118 : void Node::_connect( ConnectionPtr connection )
     537             : {
     538         118 :     _impl->outgoing = connection;
     539         118 :     _impl->state = STATE_CONNECTED;
     540         118 : }
     541             : 
     542         164 : void Node::_disconnect()
     543             : {
     544         164 :     _impl->state = STATE_CLOSED;
     545         164 :     _impl->outgoing = 0;
     546         164 :     _impl->outMulticast.data = 0;
     547         164 :     _impl->multicasts.clear();
     548         164 : }
     549             : 
     550       72077 : void Node::_setLastReceive( const int64_t time )
     551             : {
     552       72077 :     _impl->lastReceive = time;
     553       72077 : }
     554             : 
     555         435 : std::ostream& operator << ( std::ostream& os, const State state )
     556             : {
     557             :     os << ( state == STATE_CLOSED ? "closed" :
     558         290 :             state == STATE_CONNECTED ? "connected" :
     559         725 :             state == STATE_LISTENING ? "listening" : "ERROR" );
     560         434 :     return os;
     561             : }
     562             : 
     563         435 : std::ostream& operator << ( std::ostream& os, const Node& node )
     564             : {
     565         435 :     os << "node " << node.getNodeID() << " " << node._impl->state;
     566         869 :     const ConnectionDescriptions& descs = node.getConnectionDescriptions();
     567         862 :     for( ConnectionDescriptionsCIter i = descs.begin(); i != descs.end(); ++i )
     568         427 :         os << ", " << (*i)->toString();
     569         870 :     return os;
     570             : }
     571             : 
     572          66 : }

Generated by: LCOV version 1.11