LCOV - code coverage report
Current view: top level - co - node.cpp (source / functions) Hit Total Coverage
Test: Collage Lines: 149 255 58.4 %
Date: 2015-11-03 13:48:53 Functions: 35 46 76.1 %

          Line data    Source code
       1             : 
       2             : /* Copyright (c) 2005-2014, Stefan Eilemann <eile@equalizergraphics.com>
       3             :  *                    2012, Daniel Nachbaur <danielnachbaur@gmail.com>
       4             :  *
       5             :  * This file is part of Collage <https://github.com/Eyescale/Collage>
       6             :  *
       7             :  * This library is free software; you can redistribute it and/or modify it under
       8             :  * the terms of the GNU Lesser General Public License version 2.1 as published
       9             :  * by the Free Software Foundation.
      10             :  *
      11             :  * This library is distributed in the hope that it will be useful, but WITHOUT
      12             :  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
      13             :  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
      14             :  * details.
      15             :  *
      16             :  * You should have received a copy of the GNU Lesser General Public License
      17             :  * along with this library; if not, write to the Free Software Foundation, Inc.,
      18             :  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
      19             :  */
      20             : 
      21             : #include "node.h"
      22             : 
      23             : #include "connectionDescription.h"
      24             : #include "customOCommand.h"
      25             : #include "nodeCommand.h"
      26             : #include "oCommand.h"
      27             : 
      28             : #include <lunchbox/scopedMutex.h>
      29             : #include <lunchbox/spinLock.h>
      30             : 
      31             : namespace co
      32             : {
      33             : namespace
      34             : {
      35             : /** The state of the node. */
      36             : enum State
      37             : {
      38             :     STATE_CLOSED,    //!< initial state
      39             :     STATE_CONNECTED, //!< proxy for a remote node, connected
      40             :     STATE_LISTENING, //!< local node, listening
      41             :     STATE_CLOSING    //!< listening, about to close
      42             : };
      43             : 
      44           0 : struct MCData
      45             : {
      46             :     ConnectionPtr connection;
      47             :     NodePtr       node;
      48             : };
      49             : typedef std::vector< MCData > MCDatas;
      50             : }
      51             : 
      52             : namespace detail
      53             : {
      54             : class Node
      55             : {
      56             : public:
      57             :     /** Globally unique node identifier. */
      58             :     NodeID id;
      59             : 
      60             :     const uint32_t type;
      61             : 
      62             :     /** The current state of this node. */
      63             :     State state;
      64             : 
      65             :     /** The connection to this node. */
      66             :     ConnectionPtr outgoing;
      67             : 
      68             :     /** The multicast connection to this node, can be 0. */
      69             :     lunchbox::Lockable< ConnectionPtr > outMulticast;
      70             : 
      71             :     /**
      72             :      * Yet unused multicast connections for this node.
      73             :      *
      74             :      * On the first multicast send usage, the connection is 'primed' by sending
      75             :      * our node identifier to the MC group, removed from this vector and set as
      76             :      * outMulticast.
      77             :      */
      78             :     MCDatas multicasts;
      79             : 
      80             :     /** The list of descriptions on how this node is reachable. */
      81             :     lunchbox::Lockable< ConnectionDescriptions, lunchbox::SpinLock >
      82             :         connectionDescriptions;
      83             : 
      84             :     /** Last time commands were received */
      85             :     int64_t lastReceive;
      86             : 
      87             :     /** Is a big endian host? */
      88             :     bool bigEndian;
      89             : 
      90         116 :     explicit Node( const uint32_t type_ )
      91             :         : id( lunchbox::make_UUID( ))
      92             :         , type( type_ )
      93             :         , state( STATE_CLOSED )
      94             :         , lastReceive ( 0 )
      95             : #ifdef COMMON_BIGENDIAN
      96             :         , bigEndian( true )
      97             : #else
      98         116 :         , bigEndian( false )
      99             : #endif
     100         117 :         {}
     101             : 
     102         115 :     ~Node()
     103         115 :     {
     104         115 :         LBASSERT( !outgoing );
     105         115 :         connectionDescriptions->clear();
     106         115 :     }
     107             : };
     108             : }
     109             : 
     110         116 : Node::Node( const uint32_t type )
     111         116 :         : _impl( new detail::Node( type ))
     112             : {
     113         117 :     LBVERB << "New Node @" << (void*)this << " " << _impl->id << std::endl;
     114         117 : }
     115             : 
     116         296 : Node::~Node()
     117             : {
     118         115 :     LBVERB << "Delete Node @" << (void*)this << " " << _impl->id << std::endl;
     119         115 :     delete _impl;
     120         181 : }
     121             : 
     122           0 : bool Node::operator == ( const Node* node ) const
     123             : {
     124           0 :     LBASSERTINFO( _impl->id != node->_impl->id || this == node,
     125             :                   "Two node instances with the same ID found "
     126             :                   << (void*)this << " and " << (void*)node );
     127             : 
     128           0 :     return ( _impl == node->_impl );
     129             : }
     130             : 
     131         606 : ConnectionDescriptions Node::getConnectionDescriptions() const
     132             : {
     133         606 :     lunchbox::ScopedFastRead mutex( _impl->connectionDescriptions );
     134         606 :     return _impl->connectionDescriptions.data;
     135             : }
     136             : 
     137         231 : ConnectionPtr Node::getMulticast()
     138             : {
     139         231 :     if( !isReachable( ))
     140           0 :         return 0;
     141             : 
     142         231 :     ConnectionPtr connection = _impl->outMulticast.data;
     143         231 :     if( connection && !connection->isClosed( ))
     144           0 :         return connection;
     145             : 
     146         462 :     lunchbox::ScopedMutex<> mutex( _impl->outMulticast );
     147         231 :     if( _impl->multicasts.empty( ))
     148         231 :         return 0;
     149             : 
     150           0 :     MCData data = _impl->multicasts.back();
     151           0 :     _impl->multicasts.pop_back();
     152           0 :     NodePtr node = data.node;
     153             : 
     154             :     // prime multicast connections on peers
     155           0 :     LBINFO << "Announcing id " << node->getNodeID() << " to multicast group "
     156           0 :            << data.connection->getDescription() << std::endl;
     157             : 
     158             : #ifdef COMMON_BIGENDIAN
     159             :     uint32_t cmd = CMD_NODE_ID_BE;
     160             :     lunchbox::byteswap( cmd );
     161             : #else
     162           0 :     const uint32_t cmd = CMD_NODE_ID;
     163             : #endif
     164             :     OCommand( Connections( 1, data.connection ), cmd )
     165           0 :         << node->getNodeID() << getType() << node->serialize();
     166             : 
     167           0 :     _impl->outMulticast.data = data.connection;
     168         231 :     return data.connection;
     169             : }
     170             : 
     171          77 : void Node::addConnectionDescription( ConnectionDescriptionPtr cd )
     172             : {
     173          77 :     LBASSERTINFO( isClosed(), *this );
     174          77 :     if( !isClosed( ))
     175          77 :         return;
     176          77 :     _addConnectionDescription( cd );
     177             : }
     178             : 
     179          77 : void Node::_addConnectionDescription( ConnectionDescriptionPtr cd )
     180             : {
     181          77 :     lunchbox::ScopedFastWrite mutex( _impl->connectionDescriptions );
     182          77 :     _impl->connectionDescriptions->push_back( cd );
     183          77 : }
     184             : 
     185           0 : bool Node::removeConnectionDescription( ConnectionDescriptionPtr cd )
     186             : {
     187           0 :     LBASSERTINFO( isClosed(), *this );
     188           0 :     if( !isClosed( ))
     189           0 :         return false;
     190           0 :     return _removeConnectionDescription( cd );
     191             : }
     192             : 
     193           0 : bool Node::_removeConnectionDescription( ConnectionDescriptionPtr cd )
     194             : {
     195           0 :     lunchbox::ScopedFastWrite mutex( _impl->connectionDescriptions );
     196             : 
     197             :     // Don't use std::find, RefPtr::operator== compares pointers, not values.
     198           0 :     for( ConnectionDescriptionsIter i = _impl->connectionDescriptions->begin();
     199           0 :          i != _impl->connectionDescriptions->end(); ++i )
     200             :     {
     201           0 :         if( *cd != **i )
     202           0 :             continue;
     203             : 
     204           0 :         _impl->connectionDescriptions->erase( i );
     205           0 :         return true;
     206             :     }
     207           0 :     return false;
     208             : }
     209             : 
     210          66 : std::string Node::serialize() const
     211             : {
     212          66 :     std::ostringstream data;
     213          66 :     data << Version::getMajor() << CO_SEPARATOR << Version::getMinor()
     214         132 :          << CO_SEPARATOR << _impl->id << CO_SEPARATOR << _impl->bigEndian
     215          66 :          << CO_SEPARATOR;
     216             :     {
     217          66 :         lunchbox::ScopedFastRead mutex( _impl->connectionDescriptions );
     218          66 :         data << co::serialize( _impl->connectionDescriptions.data );
     219             :     }
     220          66 :     return data.str();
     221             : }
     222             : 
     223          66 : bool Node::deserialize( std::string& data )
     224             : {
     225          66 :     LBASSERT( _impl->state == STATE_CLOSED );
     226             : 
     227             :     // version check
     228          66 :     int32_t major = 0;
     229          66 :     size_t nextPos = data.find( CO_SEPARATOR );
     230          66 :     if( nextPos == std::string::npos || nextPos == 0 )
     231             :     {
     232           0 :         LBERROR << "Could not parse node major version data" << std::endl;
     233           0 :         return false;
     234             :     }
     235             : 
     236          66 :     std::istringstream is( data.substr( 0, nextPos ));
     237          66 :     data = data.substr( nextPos + 1 );
     238          66 :     is >> major;
     239             : 
     240          66 :     int32_t minor = 0;
     241          66 :     nextPos = data.find( CO_SEPARATOR );
     242          66 :     if( nextPos == std::string::npos || nextPos == 0 )
     243             :     {
     244           0 :         LBERROR << "Could not parse node minor version data" << std::endl;
     245           0 :         return false;
     246             :     }
     247             : 
     248          66 :     is.clear();
     249          66 :     is.str( data.substr( 0, nextPos ));
     250          66 :     data = data.substr( nextPos + 1 );
     251          66 :     is >> minor;
     252             : 
     253          66 :     if( major != Version::getMajor() || minor != Version::getMinor( ))
     254             :     {
     255           0 :         LBWARN << "Protocol mismatch: remote node uses version " << major << '.'
     256           0 :                << minor << ", local node uses " << Version::getMajor() << '.'
     257           0 :                << Version::getMinor() << std::endl;
     258             :     }
     259             : 
     260             :     // node id
     261          66 :     nextPos = data.find( CO_SEPARATOR );
     262          66 :     if( nextPos == std::string::npos || nextPos == 0 )
     263             :     {
     264           0 :         LBERROR << "Could not parse node id data" << std::endl;
     265           0 :         return false;
     266             :     }
     267             : 
     268          66 :     _impl->id = data.substr( 0, nextPos );
     269          66 :     data = data.substr( nextPos + 1 );
     270             : 
     271             :     // endianness
     272          66 :     nextPos = data.find( CO_SEPARATOR );
     273          66 :     if( nextPos == std::string::npos || nextPos == 0 )
     274             :     {
     275           0 :         LBERROR << "Could not parse node endianness data" << std::endl;
     276           0 :         return false;
     277             :     }
     278             : 
     279          66 :     is.clear();
     280          66 :     is.str( data.substr( 0, nextPos ));
     281          66 :     data = data.substr( nextPos + 1 );
     282          66 :     is >> _impl->bigEndian;
     283             : 
     284             :     // Connections data
     285         132 :     lunchbox::ScopedFastWrite mutex( _impl->connectionDescriptions );
     286          66 :     _impl->connectionDescriptions->clear();
     287         132 :     return co::deserialize( data, _impl->connectionDescriptions.data );
     288             : }
     289             : 
     290       72058 : bool Node::isBigEndian() const
     291             : {
     292       72058 :     return _impl->bigEndian;
     293             : }
     294             : 
     295        1333 : bool Node::isReachable() const
     296             : {
     297        1333 :     return isListening() || isConnected();
     298             : }
     299             : 
     300         636 : bool Node::isConnected() const
     301             : {
     302         636 :     return _impl->state == STATE_CONNECTED;
     303             : }
     304             : 
     305      174727 : bool Node::isClosed() const
     306             : {
     307      174727 :     return _impl->state == STATE_CLOSED;
     308             : }
     309             : 
     310          45 : bool Node::isClosing() const
     311             : {
     312          45 :     return _impl->state == STATE_CLOSING;
     313             : }
     314             : 
     315       74269 : bool Node::isListening() const
     316             : {
     317       74269 :     return _impl->state == STATE_LISTENING;
     318             : }
     319             : 
     320         744 : ConnectionPtr Node::getConnection( const bool preferMulticast )
     321             : {
     322         744 :     ConnectionPtr multicast = preferMulticast ? getMulticast() : 0;
     323         744 :     return multicast ? multicast : _impl->outgoing;
     324             : }
     325             : 
     326       71459 : ConnectionPtr Node::_getConnection( const bool preferMulticast )
     327             : {
     328       71459 :     ConnectionPtr multicast = preferMulticast ? getMulticast() : 0;
     329       71459 :     if( !isClosed( ))
     330       71459 :         return multicast ? multicast : _impl->outgoing;
     331           0 :     LBUNREACHABLE;
     332           0 :     return 0;
     333             : }
     334             : 
     335         110 : ConnectionPtr Node::_getMulticast() const
     336             : {
     337         110 :     return _impl->outMulticast.data;
     338             : }
     339             : 
     340       71457 : OCommand Node::send( const uint32_t cmd, const bool multicast )
     341             : {
     342       71457 :     ConnectionPtr connection = _getConnection( multicast );
     343       71457 :     LBASSERT( connection );
     344       71457 :     return OCommand( Connections( 1, connection ), cmd, COMMANDTYPE_NODE );
     345             : }
     346             : 
     347           2 : CustomOCommand Node::send( const uint128_t& commandID, const bool multicast )
     348             : {
     349           2 :     ConnectionPtr connection = _getConnection( multicast );
     350           2 :     LBASSERT( connection );
     351           2 :     return CustomOCommand( Connections( 1, connection ), commandID );
     352             : }
     353             : 
     354      164100 : const NodeID& Node::getNodeID() const
     355             : {
     356      164100 :     return _impl->id;
     357             : }
     358             : 
     359           0 : int64_t Node::getLastReceiveTime() const
     360             : {
     361           0 :     return _impl->lastReceive;
     362             : }
     363             : 
     364         176 : uint32_t Node::getType() const
     365             : {
     366         176 :     return _impl->type;
     367             : }
     368             : 
     369           0 : void Node::_addMulticast( NodePtr node, ConnectionPtr connection )
     370             : {
     371           0 :     lunchbox::ScopedMutex<> mutex( _impl->outMulticast );
     372           0 :     MCData data;
     373           0 :     data.connection = connection;
     374           0 :     data.node = node;
     375           0 :     _impl->multicasts.push_back( data );
     376           0 : }
     377             : 
     378           0 : void Node::_removeMulticast( ConnectionPtr connection )
     379             : {
     380           0 :     LBASSERT( connection->getDescription()->type >= CONNECTIONTYPE_MULTICAST );
     381             : 
     382           0 :     lunchbox::ScopedMutex<> mutex( _impl->outMulticast );
     383           0 :     if( _impl->outMulticast == connection )
     384           0 :         _impl->outMulticast.data = 0;
     385             :     else
     386             :     {
     387           0 :         for( MCDatas::iterator j = _impl->multicasts.begin();
     388           0 :              j != _impl->multicasts.end(); ++j )
     389             :         {
     390           0 :             if( (*j).connection != connection )
     391           0 :                 continue;
     392             : 
     393           0 :             _impl->multicasts.erase( j );
     394           0 :             return;
     395             :         }
     396           0 :     }
     397             : }
     398             : 
     399          66 : void Node::_connectMulticast( NodePtr node )
     400             : {
     401          66 :     lunchbox::ScopedMutex<> mutex( _impl->outMulticast );
     402             : 
     403          66 :     if( node->_impl->outMulticast.data.isValid( ))
     404             :         // multicast already connected by previous _cmdID
     405          66 :         return;
     406             : 
     407             :     // Search if the connected node is in the same multicast group as we are
     408         132 :     const ConnectionDescriptions& descriptions = getConnectionDescriptions();
     409         393 :     for( ConnectionDescriptionsCIter i = descriptions.begin();
     410         262 :          i != descriptions.end(); ++i )
     411             :     {
     412          65 :         ConnectionDescriptionPtr description = *i;
     413          65 :         if( description->type < CONNECTIONTYPE_MULTICAST )
     414          65 :             continue;
     415             : 
     416             :         const ConnectionDescriptions& fromDescs =
     417           0 :             node->getConnectionDescriptions();
     418           0 :         for( ConnectionDescriptionsCIter j = fromDescs.begin();
     419           0 :              j != fromDescs.end(); ++j )
     420             :         {
     421           0 :             ConnectionDescriptionPtr fromDescription = *j;
     422           0 :             if( !description->isSameMulticastGroup( fromDescription ))
     423           0 :                 continue;
     424             : 
     425           0 :             LBASSERT( !node->_impl->outMulticast.data );
     426           0 :             LBASSERT( node->_impl->multicasts.empty( ));
     427             : 
     428           0 :             if( _impl->outMulticast->isValid() &&
     429           0 :                 _impl->outMulticast.data->getDescription() == description )
     430             :             {
     431           0 :                 node->_impl->outMulticast.data = _impl->outMulticast.data;
     432           0 :                 LBINFO << "Using " << description << " as multicast group for "
     433           0 :                        << node->getNodeID() << std::endl;
     434             :             }
     435             :             // find unused multicast connection to node
     436           0 :             else for( MCDatas::const_iterator k = _impl->multicasts.begin();
     437           0 :                       k != _impl->multicasts.end(); ++k )
     438             :             {
     439           0 :                 const MCData& data = *k;
     440             :                 ConstConnectionDescriptionPtr dataDesc =
     441           0 :                     data.connection->getDescription();
     442           0 :                 if( !description->isSameMulticastGroup( dataDesc ))
     443           0 :                     continue;
     444             : 
     445           0 :                 node->_impl->multicasts.push_back( data );
     446           0 :                 LBINFO << "Adding " << dataDesc << " as multicast group for "
     447           0 :                        << node->getNodeID() << std::endl;
     448           0 :             }
     449           0 :         }
     450          66 :     }
     451             : }
     452             : 
     453           0 : void Node::_connectMulticast( NodePtr node, ConnectionPtr connection )
     454             : {
     455           0 :     lunchbox::ScopedMutex<> mutex( _impl->outMulticast );
     456           0 :     MCDatas::iterator i = node->_impl->multicasts.begin();
     457           0 :     for( ; i != node->_impl->multicasts.end(); ++i )
     458             :     {
     459           0 :         if( (*i).connection == connection )
     460           0 :             break;
     461             :     }
     462             : 
     463           0 :     if( node->_impl->outMulticast->isValid( ))
     464             :     {
     465           0 :         if( node->_impl->outMulticast.data == connection )
     466             :         {
     467             :             // nop, connection already used
     468           0 :             LBASSERT( i == node->_impl->multicasts.end( ));
     469             :         }
     470           0 :         else if( i == node->_impl->multicasts.end( ))
     471             :         {
     472             :             // another connection is used as multicast connection, save this
     473           0 :             LBASSERT( isListening( ));
     474           0 :             MCData data;
     475           0 :             data.connection = connection;
     476           0 :             data.node = this;
     477           0 :             _impl->multicasts.push_back( data );
     478             :         }
     479             :         // else nop, already know connection
     480             :     }
     481             :     else
     482             :     {
     483           0 :         node->_impl->outMulticast.data = connection;
     484           0 :         if( i != node->_impl->multicasts.end( ))
     485           0 :             node->_impl->multicasts.erase( i );
     486           0 :     }
     487           0 : }
     488             : 
     489          46 : void Node::_setListening()
     490             : {
     491          46 :     _impl->state = STATE_LISTENING;
     492          46 : }
     493             : 
     494          45 : void Node::_setClosing()
     495             : {
     496          45 :     _impl->state = STATE_CLOSING;
     497          45 : }
     498             : 
     499          91 : void Node::_setClosed()
     500             : {
     501          91 :     _impl->state = STATE_CLOSED;
     502          91 : }
     503             : 
     504         112 : void Node::_connect( ConnectionPtr connection )
     505             : {
     506         112 :     _impl->outgoing = connection;
     507         112 :     _impl->state = STATE_CONNECTED;
     508         112 : }
     509             : 
     510         155 : void Node::_disconnect()
     511             : {
     512         155 :     _impl->state = STATE_CLOSED;
     513         155 :     _impl->outgoing = 0;
     514         155 :     _impl->outMulticast.data = 0;
     515         155 :     _impl->multicasts.clear();
     516         155 : }
     517             : 
     518       72058 : void Node::_setLastReceive( const int64_t time )
     519             : {
     520       72058 :     _impl->lastReceive = time;
     521       72058 : }
     522             : 
     523         414 : std::ostream& operator << ( std::ostream& os, const State state )
     524             : {
     525             :     os << ( state == STATE_CLOSED ? "closed" :
     526             :             state == STATE_CONNECTED ? "connected" :
     527         414 :             state == STATE_LISTENING ? "listening" : "ERROR" );
     528         414 :     return os;
     529             : }
     530             : 
     531         414 : std::ostream& operator << ( std::ostream& os, const Node& node )
     532             : {
     533         414 :     os << "node " << node.getNodeID() << " " << node._impl->state;
     534         414 :     const ConnectionDescriptions& descs = node.getConnectionDescriptions();
     535         822 :     for( ConnectionDescriptionsCIter i = descs.begin(); i != descs.end(); ++i )
     536         408 :         os << ", " << (*i)->toString();
     537         414 :     return os;
     538             : }
     539             : 
     540          63 : }

Generated by: LCOV version 1.11