LCOV - code coverage report
Current view: top level - co - node.cpp (source / functions) Hit Total Coverage
Test: Collage Lines: 139 253 54.9 %
Date: 2018-01-09 16:37:03 Functions: 34 49 69.4 %

          Line data    Source code
       1             : 
       2             : /* Copyright (c) 2005-2017, Stefan Eilemann <eile@equalizergraphics.com>
       3             :  *                          Daniel Nachbaur <danielnachbaur@gmail.com>
       4             :  *
       5             :  * This file is part of Collage <https://github.com/Eyescale/Collage>
       6             :  *
       7             :  * This library is free software; you can redistribute it and/or modify it under
       8             :  * the terms of the GNU Lesser General Public License version 2.1 as published
       9             :  * by the Free Software Foundation.
      10             :  *
      11             :  * This library is distributed in the hope that it will be useful, but WITHOUT
      12             :  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
      13             :  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
      14             :  * details.
      15             :  *
      16             :  * You should have received a copy of the GNU Lesser General Public License
      17             :  * along with this library; if not, write to the Free Software Foundation, Inc.,
      18             :  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
      19             :  */
      20             : 
      21             : #include "node.h"
      22             : 
      23             : #include "connectionDescription.h"
      24             : #include "customOCommand.h"
      25             : #include "nodeCommand.h"
      26             : #include "oCommand.h"
      27             : 
      28             : #include <lunchbox/file.h>
      29             : #include <lunchbox/scopedMutex.h>
      30             : #include <lunchbox/spinLock.h>
      31             : 
      32             : #ifndef MAXPATHLEN
      33             : #define MAXPATHLEN 1024
      34             : #endif
      35             : 
      36             : namespace co
      37             : {
      38             : namespace
      39             : {
      40             : /** The state of the node. */
      41             : enum State
      42             : {
      43             :     STATE_CLOSED,    //!< initial state
      44             :     STATE_CONNECTED, //!< proxy for a remote node, connected
      45             :     STATE_LISTENING, //!< local node, listening
      46             :     STATE_CLOSING    //!< listening, about to close
      47             : };
      48             : 
      49           0 : struct MCData
      50             : {
      51             :     ConnectionPtr connection;
      52             :     NodePtr node;
      53             : };
      54             : typedef std::vector<MCData> MCDatas;
      55             : }
      56             : 
      57             : namespace detail
      58             : {
      59             : class Node
      60             : {
      61             : public:
      62             :     /** Globally unique node identifier. */
      63             :     NodeID id;
      64             : 
      65             :     const uint32_t type;
      66             : 
      67             :     /** The current state of this node. */
      68             :     State state;
      69             : 
      70             :     /** The connection to this node. */
      71             :     ConnectionPtr outgoing;
      72             : 
      73             :     /** The multicast connection to this node, can be 0. */
      74             :     lunchbox::Lockable<ConnectionPtr> outMulticast;
      75             : 
      76             :     /**
      77             :      * Yet unused multicast connections for this node.
      78             :      *
      79             :      * On the first multicast send usage, the connection is 'primed' by sending
      80             :      * our node identifier to the MC group, removed from this vector and set as
      81             :      * outMulticast.
      82             :      */
      83             :     MCDatas multicasts;
      84             : 
      85             :     /** The host name used to launch the remote node process */
      86             :     std::string hostname;
      87             : 
      88             :     /** The list of descriptions on how this node is reachable. */
      89             :     lunchbox::Lockable<ConnectionDescriptions, lunchbox::SpinLock>
      90             :         connectionDescriptions;
      91             : 
      92             :     /** Last time commands were received */
      93             :     int64_t lastReceive;
      94             : 
      95         119 :     explicit Node(const uint32_t type_)
      96         119 :         : id(servus::make_UUID())
      97             :         , type(type_)
      98             :         , state(STATE_CLOSED)
      99         119 :         , lastReceive(0)
     100             :     {
     101         119 :     }
     102             : 
     103         117 :     ~Node()
     104         117 :     {
     105         117 :         LBASSERT(!outgoing);
     106         117 :         connectionDescriptions->clear();
     107         117 :     }
     108             : };
     109             : }
     110             : 
     111         119 : Node::Node(const uint32_t type)
     112         119 :     : _impl(new detail::Node(type))
     113             : {
     114         119 :     LBVERB << "New Node @" << (void*)this << " " << _impl->id << std::endl;
     115         119 : }
     116             : 
     117         300 : Node::~Node()
     118             : {
     119         117 :     LBVERB << "Delete Node @" << (void*)this << " " << _impl->id << std::endl;
     120         117 :     delete _impl;
     121         183 : }
     122             : 
     123           0 : bool Node::operator==(const Node* node) const
     124             : {
     125           0 :     LBASSERTINFO(_impl->id != node->_impl->id || this == node,
     126             :                  "Two node instances with the same ID found "
     127             :                      << (void*)this << " and " << (void*)node);
     128             : 
     129           0 :     return (_impl == node->_impl);
     130             : }
     131             : 
     132         616 : ConnectionDescriptions Node::getConnectionDescriptions() const
     133             : {
     134        1232 :     lunchbox::ScopedFastRead mutex(_impl->connectionDescriptions);
     135        1232 :     return _impl->connectionDescriptions.data;
     136             : }
     137             : 
     138         215 : ConnectionPtr Node::getMulticast()
     139             : {
     140         215 :     if (!isReachable())
     141           0 :         return 0;
     142             : 
     143         430 :     ConnectionPtr connection = _impl->outMulticast.data;
     144         215 :     if (connection && !connection->isClosed())
     145           0 :         return connection;
     146             : 
     147         430 :     lunchbox::ScopedWrite mutex(_impl->outMulticast);
     148         215 :     if (_impl->multicasts.empty())
     149         215 :         return 0;
     150             : 
     151           0 :     MCData data = _impl->multicasts.back();
     152           0 :     _impl->multicasts.pop_back();
     153           0 :     NodePtr node = data.node;
     154             : 
     155             :     // prime multicast connections on peers
     156           0 :     LBDEBUG << "Announcing id " << node->getNodeID() << " to multicast group "
     157           0 :             << data.connection->getDescription() << std::endl;
     158             : 
     159           0 :     const uint32_t cmd = CMD_NODE_ID;
     160           0 :     OCommand(Connections(1, data.connection), cmd)
     161           0 :         << node->getNodeID() << getType() << node->serialize();
     162             : 
     163           0 :     _impl->outMulticast.data = data.connection;
     164           0 :     return data.connection;
     165             : }
     166             : 
     167          78 : void Node::addConnectionDescription(ConnectionDescriptionPtr cd)
     168             : {
     169          78 :     LBASSERTINFO(isClosed(), *this);
     170          78 :     if (!isClosed())
     171           0 :         return;
     172          78 :     _addConnectionDescription(cd);
     173             : }
     174             : 
     175          78 : void Node::_addConnectionDescription(ConnectionDescriptionPtr cd)
     176             : {
     177         156 :     lunchbox::ScopedFastWrite mutex(_impl->connectionDescriptions);
     178          78 :     _impl->connectionDescriptions->push_back(cd);
     179          78 : }
     180             : 
     181           0 : bool Node::removeConnectionDescription(ConnectionDescriptionPtr cd)
     182             : {
     183           0 :     LBASSERTINFO(isClosed(), *this);
     184           0 :     if (!isClosed())
     185           0 :         return false;
     186           0 :     return _removeConnectionDescription(cd);
     187             : }
     188             : 
     189           0 : bool Node::_removeConnectionDescription(ConnectionDescriptionPtr cd)
     190             : {
     191           0 :     lunchbox::ScopedFastWrite mutex(_impl->connectionDescriptions);
     192             : 
     193             :     // Don't use std::find, RefPtr::operator== compares pointers, not values.
     194           0 :     for (ConnectionDescriptionsIter i = _impl->connectionDescriptions->begin();
     195           0 :          i != _impl->connectionDescriptions->end(); ++i)
     196             :     {
     197           0 :         if (*cd != **i)
     198           0 :             continue;
     199             : 
     200           0 :         _impl->connectionDescriptions->erase(i);
     201           0 :         return true;
     202             :     }
     203           0 :     return false;
     204             : }
     205             : 
     206          66 : std::string Node::serialize() const
     207             : {
     208         132 :     std::ostringstream data;
     209          66 :     data << Version::getMajor() << CO_SEPARATOR << Version::getMinor()
     210         132 :          << CO_SEPARATOR << _impl->id << CO_SEPARATOR;
     211             :     {
     212         132 :         lunchbox::ScopedFastRead mutex(_impl->connectionDescriptions);
     213          66 :         data << co::serialize(_impl->connectionDescriptions.data);
     214             :     }
     215         132 :     return data.str();
     216             : }
     217             : 
     218          66 : bool Node::deserialize(std::string& data)
     219             : {
     220          66 :     LBASSERT(_impl->state == STATE_CLOSED);
     221             : 
     222             :     // version check
     223          66 :     int32_t major = 0;
     224          66 :     size_t nextPos = data.find(CO_SEPARATOR);
     225          66 :     if (nextPos == std::string::npos || nextPos == 0)
     226             :     {
     227           0 :         LBERROR << "Could not parse node major version data" << std::endl;
     228           0 :         return false;
     229             :     }
     230             : 
     231         132 :     std::istringstream is(data.substr(0, nextPos));
     232          66 :     data = data.substr(nextPos + 1);
     233          66 :     is >> major;
     234             : 
     235          66 :     int32_t minor = 0;
     236          66 :     nextPos = data.find(CO_SEPARATOR);
     237          66 :     if (nextPos == std::string::npos || nextPos == 0)
     238             :     {
     239           0 :         LBERROR << "Could not parse node minor version data" << std::endl;
     240           0 :         return false;
     241             :     }
     242             : 
     243          66 :     is.clear();
     244          66 :     is.str(data.substr(0, nextPos));
     245          66 :     data = data.substr(nextPos + 1);
     246          66 :     is >> minor;
     247             : 
     248          66 :     if (major != Version::getMajor() || minor != Version::getMinor())
     249             :     {
     250           0 :         LBWARN << "Protocol mismatch: remote node uses version " << major << '.'
     251           0 :                << minor << ", local node uses " << Version::getMajor() << '.'
     252           0 :                << Version::getMinor() << std::endl;
     253             :     }
     254             : 
     255             :     // node id
     256          66 :     nextPos = data.find(CO_SEPARATOR);
     257          66 :     if (nextPos == std::string::npos || nextPos == 0)
     258             :     {
     259           0 :         LBERROR << "Could not parse node id data" << std::endl;
     260           0 :         return false;
     261             :     }
     262             : 
     263          66 :     _impl->id = data.substr(0, nextPos);
     264          66 :     data = data.substr(nextPos + 1);
     265             : 
     266             :     // Connections data
     267         132 :     lunchbox::ScopedFastWrite mutex(_impl->connectionDescriptions);
     268          66 :     _impl->connectionDescriptions->clear();
     269          66 :     return co::deserialize(data, _impl->connectionDescriptions.data);
     270             : }
     271             : 
     272        1288 : bool Node::isReachable() const
     273             : {
     274        1288 :     return isListening() || isConnected();
     275             : }
     276             : 
     277         601 : bool Node::isConnected() const
     278             : {
     279         601 :     return _impl->state == STATE_CONNECTED;
     280             : }
     281             : 
     282      175018 : bool Node::isClosed() const
     283             : {
     284      175018 :     return _impl->state == STATE_CLOSED;
     285             : }
     286             : 
     287          47 : bool Node::isClosing() const
     288             : {
     289          47 :     return _impl->state == STATE_CLOSING;
     290             : }
     291             : 
     292       74165 : bool Node::isListening() const
     293             : {
     294       74165 :     return _impl->state == STATE_LISTENING;
     295             : }
     296             : 
     297         741 : ConnectionPtr Node::getConnection(const bool preferMulticast)
     298             : {
     299        1482 :     ConnectionPtr multicast = preferMulticast ? getMulticast() : 0;
     300        1482 :     return multicast ? multicast : _impl->outgoing;
     301             : }
     302             : 
     303       71411 : ConnectionPtr Node::_getConnection(const bool preferMulticast)
     304             : {
     305      142822 :     ConnectionPtr multicast = preferMulticast ? getMulticast() : 0;
     306       71411 :     if (!isClosed())
     307       71411 :         return multicast ? multicast : _impl->outgoing;
     308           0 :     LBUNREACHABLE;
     309           0 :     return 0;
     310             : }
     311             : 
     312         111 : ConnectionPtr Node::_getMulticast() const
     313             : {
     314         111 :     return _impl->outMulticast.data;
     315             : }
     316             : 
     317           0 : void Node::setHostname(const std::string& host)
     318             : {
     319           0 :     _impl->hostname = host;
     320           0 : }
     321             : 
     322           0 : const std::string& Node::getHostname() const
     323             : {
     324           0 :     return _impl->hostname;
     325             : }
     326             : 
     327           0 : std::string Node::getWorkDir() const
     328             : {
     329           0 :     return lunchbox::getWorkDir();
     330             : }
     331             : 
     332           0 : std::string Node::getLaunchQuote() const
     333             : {
     334             : #ifdef WIN32
     335             :     return "\"";
     336             : #else
     337           0 :     return "\'";
     338             : #endif
     339             : }
     340             : 
     341       71409 : OCommand Node::send(const uint32_t cmd, const bool multicast)
     342             : {
     343      142818 :     ConnectionPtr connection = _getConnection(multicast);
     344       71409 :     LBASSERT(connection);
     345      142818 :     return OCommand(Connections(1, connection), cmd, COMMANDTYPE_NODE);
     346             : }
     347             : 
     348           2 : CustomOCommand Node::send(const uint128_t& commandID, const bool multicast)
     349             : {
     350           4 :     ConnectionPtr connection = _getConnection(multicast);
     351           2 :     LBASSERT(connection);
     352           4 :     return CustomOCommand(Connections(1, connection), commandID);
     353             : }
     354             : 
     355      187124 : const NodeID& Node::getNodeID() const
     356             : {
     357      187124 :     return _impl->id;
     358             : }
     359             : 
     360           0 : int64_t Node::getLastReceiveTime() const
     361             : {
     362           0 :     return _impl->lastReceive;
     363             : }
     364             : 
     365         177 : uint32_t Node::getType() const
     366             : {
     367         177 :     return _impl->type;
     368             : }
     369             : 
     370           0 : void Node::_addMulticast(NodePtr node, ConnectionPtr connection)
     371             : {
     372           0 :     lunchbox::ScopedWrite mutex(_impl->outMulticast);
     373           0 :     MCData data;
     374           0 :     data.connection = connection;
     375           0 :     data.node = node;
     376           0 :     _impl->multicasts.push_back(data);
     377           0 : }
     378             : 
     379           0 : void Node::_removeMulticast(ConnectionPtr connection)
     380             : {
     381           0 :     LBASSERT(connection->getDescription()->type >= CONNECTIONTYPE_MULTICAST);
     382             : 
     383           0 :     lunchbox::ScopedWrite mutex(_impl->outMulticast);
     384           0 :     if (_impl->outMulticast == connection)
     385           0 :         _impl->outMulticast.data = 0;
     386             :     else
     387             :     {
     388           0 :         for (MCDatas::iterator j = _impl->multicasts.begin();
     389           0 :              j != _impl->multicasts.end(); ++j)
     390             :         {
     391           0 :             if ((*j).connection != connection)
     392           0 :                 continue;
     393             : 
     394           0 :             _impl->multicasts.erase(j);
     395           0 :             return;
     396             :         }
     397             :     }
     398             : }
     399             : 
     400          66 : void Node::_connectMulticast(NodePtr node)
     401             : {
     402         132 :     lunchbox::ScopedWrite mutex(_impl->outMulticast);
     403             : 
     404          66 :     if (node->_impl->outMulticast.data.isValid())
     405             :         // multicast already connected by previous _cmdID
     406           0 :         return;
     407             : 
     408             :     // Search if the connected node is in the same multicast group as we are
     409         132 :     const ConnectionDescriptions& descriptions = getConnectionDescriptions();
     410         393 :     for (ConnectionDescriptionsCIter i = descriptions.begin();
     411         262 :          i != descriptions.end(); ++i)
     412             :     {
     413          65 :         ConnectionDescriptionPtr description = *i;
     414          65 :         if (description->type < CONNECTIONTYPE_MULTICAST)
     415          65 :             continue;
     416             : 
     417             :         const ConnectionDescriptions& fromDescs =
     418           0 :             node->getConnectionDescriptions();
     419           0 :         for (ConnectionDescriptionsCIter j = fromDescs.begin();
     420           0 :              j != fromDescs.end(); ++j)
     421             :         {
     422           0 :             ConnectionDescriptionPtr fromDescription = *j;
     423           0 :             if (!description->isSameMulticastGroup(fromDescription))
     424           0 :                 continue;
     425             : 
     426           0 :             LBASSERT(!node->_impl->outMulticast.data);
     427           0 :             LBASSERT(node->_impl->multicasts.empty());
     428             : 
     429           0 :             if (_impl->outMulticast->isValid() &&
     430           0 :                 _impl->outMulticast.data->getDescription() == description)
     431             :             {
     432           0 :                 node->_impl->outMulticast.data = _impl->outMulticast.data;
     433           0 :                 LBDEBUG << "Using " << description << " as multicast group for "
     434           0 :                         << node->getNodeID() << std::endl;
     435             :             }
     436             :             // find unused multicast connection to node
     437             :             else
     438           0 :                 for (MCDatas::const_iterator k = _impl->multicasts.begin();
     439           0 :                      k != _impl->multicasts.end(); ++k)
     440             :                 {
     441           0 :                     const MCData& data = *k;
     442             :                     ConstConnectionDescriptionPtr dataDesc =
     443           0 :                         data.connection->getDescription();
     444           0 :                     if (!description->isSameMulticastGroup(dataDesc))
     445           0 :                         continue;
     446             : 
     447           0 :                     node->_impl->multicasts.push_back(data);
     448           0 :                     LBDEBUG << "Adding " << dataDesc
     449           0 :                             << " as multicast group for " << node->getNodeID()
     450           0 :                             << std::endl;
     451             :                 }
     452             :         }
     453             :     }
     454             : }
     455             : 
     456           0 : void Node::_connectMulticast(NodePtr node, ConnectionPtr connection)
     457             : {
     458           0 :     lunchbox::ScopedWrite mutex(_impl->outMulticast);
     459           0 :     MCDatas::iterator i = node->_impl->multicasts.begin();
     460           0 :     for (; i != node->_impl->multicasts.end(); ++i)
     461             :     {
     462           0 :         if ((*i).connection == connection)
     463           0 :             break;
     464             :     }
     465             : 
     466           0 :     if (node->_impl->outMulticast->isValid())
     467             :     {
     468           0 :         if (node->_impl->outMulticast.data == connection)
     469             :         {
     470             :             // nop, connection already used
     471           0 :             LBASSERT(i == node->_impl->multicasts.end());
     472             :         }
     473           0 :         else if (i == node->_impl->multicasts.end())
     474             :         {
     475             :             // another connection is used as multicast connection, save this
     476           0 :             LBASSERT(isListening());
     477           0 :             MCData data;
     478           0 :             data.connection = connection;
     479           0 :             data.node = this;
     480           0 :             _impl->multicasts.push_back(data);
     481             :         }
     482             :         // else nop, already know connection
     483             :     }
     484             :     else
     485             :     {
     486           0 :         node->_impl->outMulticast.data = connection;
     487           0 :         if (i != node->_impl->multicasts.end())
     488           0 :             node->_impl->multicasts.erase(i);
     489             :     }
     490           0 : }
     491             : 
     492          48 : void Node::_setListening()
     493             : {
     494          48 :     _impl->state = STATE_LISTENING;
     495          48 : }
     496             : 
     497          47 : void Node::_setClosing()
     498             : {
     499          47 :     _impl->state = STATE_CLOSING;
     500          47 : }
     501             : 
     502          95 : void Node::_setClosed()
     503             : {
     504          95 :     _impl->state = STATE_CLOSED;
     505          95 : }
     506             : 
     507         114 : void Node::_connect(ConnectionPtr connection)
     508             : {
     509         114 :     _impl->outgoing = connection;
     510         114 :     _impl->state = STATE_CONNECTED;
     511         114 : }
     512             : 
     513         158 : void Node::_disconnect()
     514             : {
     515         158 :     _impl->state = STATE_CLOSED;
     516         158 :     _impl->outgoing = 0;
     517         158 :     _impl->outMulticast.data = 0;
     518         158 :     _impl->multicasts.clear();
     519         158 : }
     520             : 
     521       72005 : void Node::_setLastReceive(const int64_t time)
     522             : {
     523       72005 :     _impl->lastReceive = time;
     524       72005 : }
     525             : 
     526         420 : std::ostream& operator<<(std::ostream& os, const State state)
     527             : {
     528             :     os << (state == STATE_CLOSED ? "closed" : state == STATE_CONNECTED
     529         279 :                                                   ? "connected"
     530             :                                                   : state == STATE_LISTENING
     531         121 :                                                         ? "listening"
     532         578 :                                                         : "ERROR");
     533         420 :     return os;
     534             : }
     535             : 
     536         420 : std::ostream& operator<<(std::ostream& os, const Node& node)
     537             : {
     538         420 :     os << "node " << node.getNodeID() << " " << node._impl->state;
     539         840 :     const ConnectionDescriptions& descs = node.getConnectionDescriptions();
     540         832 :     for (ConnectionDescriptionsCIter i = descs.begin(); i != descs.end(); ++i)
     541         412 :         os << ", " << (*i)->toString();
     542         840 :     return os;
     543             : }
     544          63 : }

Generated by: LCOV version 1.11