LCOV - code coverage report
Current view: top level - co - localNode.cpp (source / functions) Hit Total Coverage
Test: Collage Lines: 592 1174 50.4 %
Date: 2018-01-09 16:37:03 Functions: 75 111 67.6 %

          Line data    Source code
       1             : 
       2             : /* Copyright (c) 2005-2017, Stefan Eilemann <eile@equalizergraphics.com>
       3             :  *                          Cedric Stalder <cedric.stalder@gmail.com>
       4             :  *                          Daniel Nachbaur <danielnachbaur@gmail.com>
       5             :  *
       6             :  * This file is part of Collage <https://github.com/Eyescale/Collage>
       7             :  *
       8             :  * This library is free software; you can redistribute it and/or modify it under
       9             :  * the terms of the GNU Lesser General Public License version 2.1 as published
      10             :  * by the Free Software Foundation.
      11             :  *
      12             :  * This library is distributed in the hope that it will be useful, but WITHOUT
      13             :  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
      14             :  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
      15             :  * details.
      16             :  *
      17             :  * You should have received a copy of the GNU Lesser General Public License
      18             :  * along with this library; if not, write to the Free Software Foundation, Inc.,
      19             :  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
      20             :  */
      21             : 
      22             : #include "localNode.h"
      23             : 
      24             : #include "buffer.h"
      25             : #include "bufferCache.h"
      26             : #include "commandQueue.h"
      27             : #include "connectionDescription.h"
      28             : #include "connectionSet.h"
      29             : #include "customICommand.h"
      30             : #include "dataIStream.h"
      31             : #include "exception.h"
      32             : #include "global.h"
      33             : #include "iCommand.h"
      34             : #include "nodeCommand.h"
      35             : #include "oCommand.h"
      36             : #include "object.h"
      37             : #include "objectICommand.h"
      38             : #include "objectStore.h"
      39             : #include "pipeConnection.h"
      40             : #include "sendToken.h"
      41             : #include "worker.h"
      42             : #include "zeroconf.h"
      43             : 
      44             : #include <lunchbox/clock.h>
      45             : #include <lunchbox/fork.h>
      46             : #include <lunchbox/futureFunction.h>
      47             : #include <lunchbox/hash.h>
      48             : #include <lunchbox/lockable.h>
      49             : #include <lunchbox/request.h>
      50             : #include <lunchbox/rng.h>
      51             : #include <lunchbox/servus.h>
      52             : #include <lunchbox/sleep.h>
      53             : 
      54             : #include <boost/bind.hpp>
      55             : #include <boost/date_time/posix_time/posix_time.hpp>
      56             : #include <boost/lexical_cast.hpp>
      57             : 
      58             : #include <list>
      59             : #ifdef _MSC_VER
      60             : #include <direct.h> // for chdir
      61             : #define chdir _chdir
      62             : #endif
      63             : 
      64             : namespace co
      65             : {
      66             : namespace
      67             : {
      68          21 : lunchbox::a_int32_t _threadIDs;
      69             : 
      70             : typedef CommandFunc<LocalNode> CmdFunc;
      71             : typedef std::list<ICommand> CommandList;
      72             : typedef lunchbox::RefPtrHash<Connection, NodePtr> ConnectionNodeHash;
      73             : typedef ConnectionNodeHash::const_iterator ConnectionNodeHashCIter;
      74             : typedef ConnectionNodeHash::iterator ConnectionNodeHashIter;
      75             : typedef std::unordered_map<uint128_t, NodePtr> NodeHash;
      76             : typedef NodeHash::const_iterator NodeHashCIter;
      77             : typedef std::unordered_map<uint128_t, LocalNode::PushHandler> HandlerHash;
      78             : typedef HandlerHash::const_iterator HandlerHashCIter;
      79             : typedef std::pair<LocalNode::CommandHandler, CommandQueue*> CommandPair;
      80             : typedef std::unordered_map<uint128_t, CommandPair> CommandHash;
      81             : typedef CommandHash::const_iterator CommandHashCIter;
      82             : typedef lunchbox::FutureFunction<bool> FuturebImpl;
      83             : }
      84             : 
      85             : namespace detail
      86             : {
      87         102 : class ReceiverThread : public lunchbox::Thread
      88             : {
      89             : public:
      90          53 :     explicit ReceiverThread(co::LocalNode* localNode)
      91          53 :         : _localNode(localNode)
      92             :     {
      93          53 :     }
      94             : 
      95          48 :     bool init() override
      96             :     {
      97          48 :         const int32_t threadID = ++_threadIDs - 1;
      98          96 :         setName(std::string("Rcv") +
      99         144 :                 boost::lexical_cast<std::string>(threadID));
     100          48 :         return _localNode->_startCommandThread(threadID);
     101             :     }
     102             : 
     103          48 :     void run() override { _localNode->_runReceiverThread(); }
     104             : private:
     105             :     co::LocalNode* const _localNode;
     106             : };
     107             : 
     108         102 : class CommandThread : public Worker
     109             : {
     110             : public:
     111          53 :     explicit CommandThread(co::LocalNode* localNode)
     112          53 :         : Worker(Global::getCommandQueueLimit())
     113             :         , threadID(0)
     114          53 :         , _localNode(localNode)
     115             :     {
     116          53 :     }
     117             : 
     118             :     int32_t threadID;
     119             : 
     120             : protected:
     121          48 :     bool init() override
     122             :     {
     123          96 :         setName(std::string("Cmd") +
     124         144 :                 boost::lexical_cast<std::string>(threadID));
     125          48 :         return true;
     126             :     }
     127             : 
     128      103273 :     bool stopRunning() override { return _localNode->isClosed(); }
     129       51502 :     bool notifyIdle() override
     130             :     {
     131       51502 :         return _localNode->_notifyCommandThreadIdle();
     132             :     }
     133             : 
     134             : private:
     135             :     co::LocalNode* const _localNode;
     136             : };
     137             : 
     138             : class LocalNode
     139             : {
     140             : public:
     141          53 :     LocalNode()
     142          53 :         : smallBuffers(200)
     143             :         , bigBuffers(20)
     144             :         , sendToken(true)
     145             :         , lastSendToken(0)
     146             :         , objectStore(0)
     147             :         , receiverThread(0)
     148             :         , commandThread(0)
     149          53 :         , service("_collage._tcp")
     150             :     {
     151          53 :     }
     152             : 
     153          51 :     ~LocalNode()
     154          51 :     {
     155          51 :         LBASSERT(incoming.isEmpty());
     156          51 :         LBASSERT(connectionNodes.empty());
     157          51 :         LBASSERT(pendingCommands.empty());
     158          51 :         LBASSERT(nodes->empty());
     159             : 
     160          51 :         delete objectStore;
     161          51 :         objectStore = 0;
     162          51 :         LBASSERT(!commandThread->isRunning());
     163          51 :         delete commandThread;
     164          51 :         commandThread = 0;
     165             : 
     166          51 :         LBASSERT(!receiverThread->isRunning());
     167          51 :         delete receiverThread;
     168          51 :         receiverThread = 0;
     169          51 :     }
     170             : 
     171         271 :     bool inReceiverThread() const { return receiverThread->isCurrent(); }
     172             :     /** Commands re-scheduled for dispatch. */
     173             :     CommandList pendingCommands;
     174             : 
     175             :     /** The command buffer 'allocator' for small packets */
     176             :     co::BufferCache smallBuffers;
     177             : 
     178             :     /** The command buffer 'allocator' for big packets */
     179             :     co::BufferCache bigBuffers;
     180             : 
     181             :     bool sendToken;         //!< send token availability.
     182             :     uint64_t lastSendToken; //!< last used time for timeout detection
     183             :     std::deque<co::ICommand> sendTokenQueue; //!< pending requests
     184             : 
     185             :     /** Manager of distributed object */
     186             :     ObjectStore* objectStore;
     187             : 
     188             :     /** Needed for thread-safety during nodeID-based connect() */
     189             :     std::mutex connectLock;
     190             : 
     191             :     /** The node for each connection. */
     192             :     ConnectionNodeHash connectionNodes; // read and write: recv only
     193             : 
     194             :     /** The connected nodes. */
     195             :     lunchbox::Lockable<NodeHash, lunchbox::SpinLock> nodes; // r: all, w: recv
     196             : 
     197             :     /** The connection set of all connections from/to this node. */
     198             :     co::ConnectionSet incoming;
     199             : 
     200             :     /** The process-global clock. */
     201             :     lunchbox::Clock clock;
     202             : 
     203             :     /** The registered push handlers. */
     204             :     lunchbox::Lockable<HandlerHash, std::mutex> pushHandlers;
     205             : 
     206             :     /** The registered custom command handlers. */
     207             :     lunchbox::Lockable<CommandHash, lunchbox::SpinLock> commandHandlers;
     208             : 
     209             :     ReceiverThread* receiverThread;
     210             :     CommandThread* commandThread;
     211             : 
     212             :     lunchbox::Lockable<servus::Servus> service;
     213             : 
     214             :     a_ssize_t counters[co::LocalNode::COUNTER_ALL]; //!< Performance counters
     215             : 
     216             :     Strings args; //!< Command line arguments for remote node launch
     217             : };
     218             : }
     219             : 
     220          53 : LocalNode::LocalNode(const uint32_t type)
     221             :     : Node(type)
     222          53 :     , _impl(new detail::LocalNode)
     223             : {
     224          53 :     _impl->receiverThread = new detail::ReceiverThread(this);
     225          53 :     _impl->commandThread = new detail::CommandThread(this);
     226          53 :     _impl->objectStore = new ObjectStore(this, _impl->counters);
     227             : 
     228          53 :     CommandQueue* queue = getCommandThreadQueue();
     229         106 :     registerCommand(CMD_NODE_CONNECT, CmdFunc(this, &LocalNode::_cmdConnect),
     230          53 :                     0);
     231          53 :     registerCommand(CMD_NODE_CONNECT_REPLY,
     232         106 :                     CmdFunc(this, &LocalNode::_cmdConnectReply), 0);
     233          53 :     registerCommand(CMD_NODE_ID, CmdFunc(this, &LocalNode::_cmdID), 0);
     234          53 :     registerCommand(CMD_NODE_ACK_REQUEST,
     235         106 :                     CmdFunc(this, &LocalNode::_cmdAckRequest), 0);
     236         106 :     registerCommand(CMD_NODE_STOP_RCV, CmdFunc(this, &LocalNode::_cmdStopRcv),
     237          53 :                     0);
     238         106 :     registerCommand(CMD_NODE_STOP_CMD, CmdFunc(this, &LocalNode::_cmdStopCmd),
     239          53 :                     queue);
     240          53 :     registerCommand(CMD_NODE_SET_AFFINITY_RCV,
     241         106 :                     CmdFunc(this, &LocalNode::_cmdSetAffinity), 0);
     242          53 :     registerCommand(CMD_NODE_SET_AFFINITY_CMD,
     243         106 :                     CmdFunc(this, &LocalNode::_cmdSetAffinity), queue);
     244          53 :     registerCommand(CMD_NODE_CONNECT_ACK,
     245         106 :                     CmdFunc(this, &LocalNode::_cmdConnectAck), 0);
     246          53 :     registerCommand(CMD_NODE_DISCONNECT,
     247         106 :                     CmdFunc(this, &LocalNode::_cmdDisconnect), 0);
     248          53 :     registerCommand(CMD_NODE_GET_NODE_DATA,
     249         106 :                     CmdFunc(this, &LocalNode::_cmdGetNodeData), queue);
     250          53 :     registerCommand(CMD_NODE_GET_NODE_DATA_REPLY,
     251         106 :                     CmdFunc(this, &LocalNode::_cmdGetNodeDataReply), 0);
     252          53 :     registerCommand(CMD_NODE_ACQUIRE_SEND_TOKEN,
     253         106 :                     CmdFunc(this, &LocalNode::_cmdAcquireSendToken), queue);
     254          53 :     registerCommand(CMD_NODE_ACQUIRE_SEND_TOKEN_REPLY,
     255         106 :                     CmdFunc(this, &LocalNode::_cmdAcquireSendTokenReply), 0);
     256          53 :     registerCommand(CMD_NODE_RELEASE_SEND_TOKEN,
     257         106 :                     CmdFunc(this, &LocalNode::_cmdReleaseSendToken), queue);
     258          53 :     registerCommand(CMD_NODE_ADD_LISTENER,
     259         106 :                     CmdFunc(this, &LocalNode::_cmdAddListener), 0);
     260          53 :     registerCommand(CMD_NODE_REMOVE_LISTENER,
     261         106 :                     CmdFunc(this, &LocalNode::_cmdRemoveListener), 0);
     262          53 :     registerCommand(CMD_NODE_PING, CmdFunc(this, &LocalNode::_cmdPing), queue);
     263         106 :     registerCommand(CMD_NODE_PING_REPLY, CmdFunc(this, &LocalNode::_cmdDiscard),
     264          53 :                     0);
     265         106 :     registerCommand(CMD_NODE_COMMAND, CmdFunc(this, &LocalNode::_cmdCommand),
     266          53 :                     0);
     267          53 :     registerCommand(CMD_NODE_ADD_CONNECTION,
     268         106 :                     CmdFunc(this, &LocalNode::_cmdAddConnection), 0);
     269          53 : }
     270             : 
     271         142 : LocalNode::~LocalNode()
     272             : {
     273          51 :     LBASSERT(!hasPendingRequests());
     274          51 :     delete _impl;
     275          91 : }
     276             : 
     277           1 : bool LocalNode::initLocal(const int argc, char** argv)
     278             : {
     279           2 :     std::string setupOpts;
     280           1 :     _impl->args.clear();
     281             : 
     282             :     // We do not use getopt_long because it really does not work due to the
     283             :     // following aspects:
     284             :     // - reordering of arguments
     285             :     // - different behavior of GNU and BSD implementations
     286             :     // - incomplete man pages
     287           1 :     for (int i = 1; i < argc; ++i)
     288             :     {
     289           0 :         if (std::string("--eq-listen") == argv[i] ||
     290           0 :             std::string("--co-listen") == argv[i])
     291             :         {
     292           0 :             if (std::string("--eq-listen") == argv[i])
     293           0 :                 LBWARN << "Deprecated --eq-listen, use --co-listen"
     294           0 :                        << std::endl;
     295             : 
     296           0 :             if ((i + 1) < argc && argv[i + 1][0] != '-')
     297             :             {
     298           0 :                 std::string data = argv[++i];
     299           0 :                 ConnectionDescriptionPtr desc = new ConnectionDescription;
     300           0 :                 desc->port = Global::getDefaultPort();
     301             : 
     302           0 :                 if (desc->fromString(data))
     303             :                 {
     304           0 :                     addConnectionDescription(desc);
     305           0 :                     LBASSERTINFO(data.empty(), data);
     306             :                 }
     307             :                 else
     308           0 :                     LBWARN << "Ignoring listen option: " << argv[i]
     309           0 :                            << std::endl;
     310             :             }
     311             :             else
     312             :             {
     313           0 :                 LBWARN << "No argument given to --co-listen!" << std::endl;
     314             :             }
     315             :         }
     316           0 :         else if (std::string("--co-globals") == argv[i])
     317             :         {
     318           0 :             if ((i + 1) < argc && argv[i + 1][0] != '-')
     319             :             {
     320           0 :                 const std::string data = argv[++i];
     321           0 :                 if (!Global::fromString(data))
     322             :                 {
     323           0 :                     LBWARN << "Invalid global variables string: " << data
     324           0 :                            << ", using default global variables." << std::endl;
     325           0 :                 }
     326             :             }
     327             :             else
     328             :             {
     329           0 :                 LBWARN << "No argument given to --co-globals!" << std::endl;
     330             :             }
     331             :         }
     332           0 :         else if (std::string("--co-launch") == argv[i])
     333             :         {
     334           0 :             if (i == argc - 1 || argv[i + 1][0] == '-')
     335           0 :                 LBTHROW(std::runtime_error("Missing options to --co-launch"));
     336             : 
     337           0 :             setupOpts = argv[++i];
     338           0 :             if (!deserialize(setupOpts))
     339           0 :                 LBTHROW(std::runtime_error("Corrupt --co-launch argument"));
     340           0 :             LBASSERT(!setupOpts.empty());
     341             :         }
     342             :         else
     343           0 :             _impl->args.push_back(argv[i]);
     344             :     }
     345             : 
     346           1 :     if (!listen())
     347             :     {
     348           0 :         LBWARN << "Can't setup listener(s) on " << *static_cast<Node*>(this)
     349           0 :                << std::endl;
     350           0 :         return false;
     351             :     }
     352             : 
     353           1 :     if (!setupOpts.empty())
     354           0 :         return _setupPeer(setupOpts);
     355           1 :     return true;
     356             : }
     357             : 
     358          48 : bool LocalNode::listen()
     359             : {
     360          48 :     LBVERB << "Listener data: " << serialize() << std::endl;
     361          48 :     if (!isClosed() || !_connectSelf())
     362           0 :         return false;
     363             : 
     364          96 :     const ConnectionDescriptions& descriptions = getConnectionDescriptions();
     365         279 :     for (ConnectionDescriptionsCIter i = descriptions.begin();
     366         186 :          i != descriptions.end(); ++i)
     367             :     {
     368          90 :         ConnectionDescriptionPtr description = *i;
     369          90 :         ConnectionPtr connection = Connection::create(description);
     370             : 
     371          45 :         if (!connection || !connection->listen())
     372             :         {
     373           0 :             LBWARN << "Can't create listener connection: " << description
     374           0 :                    << std::endl;
     375           0 :             return false;
     376             :         }
     377             : 
     378          45 :         _impl->connectionNodes[connection] = this;
     379          45 :         if (connection->isMulticast())
     380           0 :             _addMulticast(this, connection);
     381             : 
     382          45 :         connection->acceptNB();
     383          45 :         _impl->incoming.addConnection(connection);
     384             : 
     385          90 :         LBVERB << "Added node " << getNodeID() << " using " << connection
     386         135 :                << std::endl;
     387             :     }
     388             : 
     389          96 :     LBVERB << lunchbox::className(this) << " start command and receiver thread "
     390         144 :            << std::endl;
     391             : 
     392          48 :     _setListening();
     393          48 :     _impl->receiverThread->start();
     394             : 
     395          48 :     LBDEBUG << *this << std::endl;
     396          48 :     return true;
     397             : }
     398             : 
     399          47 : bool LocalNode::close()
     400             : {
     401          47 :     if (!isListening())
     402           0 :         return false;
     403             : 
     404          47 :     send(CMD_NODE_STOP_RCV);
     405             : 
     406          47 :     LBCHECK(_impl->receiverThread->join());
     407          47 :     _cleanup();
     408             : 
     409         141 :     LBDEBUG << _impl->incoming.getSize() << " connections open after close"
     410         141 :             << std::endl;
     411             : #ifndef NDEBUG
     412          47 :     const Connections& connections = _impl->incoming.getConnections();
     413         141 :     for (Connections::const_iterator i = connections.begin();
     414          94 :          i != connections.end(); ++i)
     415             :     {
     416           0 :         LBDEBUG << "    " << *i << std::endl;
     417             :     }
     418             : #endif
     419             : 
     420          47 :     LBASSERT(!hasPendingRequests());
     421          47 :     return true;
     422             : }
     423             : 
     424           0 : void LocalNode::setAffinity(const int32_t affinity)
     425             : {
     426           0 :     send(CMD_NODE_SET_AFFINITY_RCV) << affinity;
     427           0 :     send(CMD_NODE_SET_AFFINITY_CMD) << affinity;
     428             : 
     429           0 :     lunchbox::Thread::setAffinity(affinity);
     430           0 : }
     431             : 
     432          33 : void LocalNode::addConnection(ConnectionPtr connection)
     433             : {
     434          33 :     connection->ref(); // unref in _cmdAddConnection
     435          33 :     send(CMD_NODE_ADD_CONNECTION) << connection;
     436          33 : }
     437             : 
     438           0 : ConnectionPtr LocalNode::addListener(ConnectionDescriptionPtr desc)
     439             : {
     440           0 :     LBASSERT(isListening());
     441             : 
     442           0 :     ConnectionPtr connection = Connection::create(desc);
     443           0 :     if (connection && connection->listen())
     444             :     {
     445           0 :         addListener(connection);
     446           0 :         return connection;
     447             :     }
     448             : 
     449           0 :     return 0;
     450             : }
     451             : 
     452           0 : void LocalNode::addListener(ConnectionPtr connection)
     453             : {
     454           0 :     LBASSERT(isListening());
     455           0 :     LBASSERT(connection->isListening());
     456           0 :     if (!isListening() || !connection->isListening())
     457           0 :         return;
     458             : 
     459           0 :     connection->ref(this); // unref in self handler
     460             : 
     461             :     // Update everybody's description list of me, add the listener to myself in
     462             :     // my handler
     463           0 :     const Nodes& nodes = getNodes();
     464           0 :     for (NodePtr node : nodes)
     465           0 :         node->send(CMD_NODE_ADD_LISTENER)
     466           0 :             << (uint64_t)(connection.get())
     467           0 :             << connection->getDescription()->toString();
     468             : }
     469             : 
     470           0 : void LocalNode::removeListeners(const Connections& connections)
     471             : {
     472           0 :     std::vector<lunchbox::Request<void> > requests;
     473           0 :     for (ConnectionsCIter i = connections.begin(); i != connections.end(); ++i)
     474             :     {
     475           0 :         ConnectionPtr connection = *i;
     476           0 :         requests.push_back(_removeListener(connection));
     477             :     }
     478           0 : }
     479             : 
     480           0 : lunchbox::Request<void> LocalNode::_removeListener(ConnectionPtr conn)
     481             : {
     482           0 :     LBASSERT(isListening());
     483           0 :     LBASSERTINFO(!conn->isConnected(), conn);
     484             : 
     485           0 :     conn->ref(this);
     486           0 :     const lunchbox::Request<void> request = registerRequest<void>();
     487             : 
     488           0 :     const Nodes& nodes = getNodes();
     489           0 :     for (NodePtr node : nodes)
     490           0 :         node->send(CMD_NODE_REMOVE_LISTENER)
     491           0 :             << request << conn.get() << conn->getDescription()->toString();
     492           0 :     return request;
     493             : }
     494             : 
     495         147 : void LocalNode::_addConnection(ConnectionPtr connection)
     496             : {
     497         147 :     if (_impl->receiverThread->isRunning() && !_impl->inReceiverThread())
     498             :     {
     499          33 :         addConnection(connection);
     500          33 :         return;
     501             :     }
     502             : 
     503         228 :     BufferPtr buffer = _impl->smallBuffers.alloc(COMMAND_ALLOCSIZE);
     504         114 :     connection->recvNB(buffer, COMMAND_MINSIZE);
     505         114 :     _impl->incoming.addConnection(connection);
     506             : }
     507             : 
     508         217 : void LocalNode::_removeConnection(ConnectionPtr connection)
     509             : {
     510         217 :     LBASSERT(connection);
     511             : 
     512         217 :     _impl->incoming.removeConnection(connection);
     513         217 :     connection->resetRecvData();
     514         217 :     if (!connection->isClosed())
     515         125 :         connection->close(); // cancel pending IO's
     516         217 : }
     517             : 
     518          47 : void LocalNode::_cleanup()
     519             : {
     520          47 :     LBVERB << "Clean up stopped node" << std::endl;
     521          47 :     LBASSERTINFO(isClosed(), *this);
     522             : 
     523          47 :     if (!_impl->connectionNodes.empty())
     524         135 :         LBDEBUG << _impl->connectionNodes.size()
     525         135 :                 << " open connections during cleanup" << std::endl;
     526             : #ifndef NDEBUG
     527         276 :     for (ConnectionNodeHashCIter i = _impl->connectionNodes.begin();
     528         184 :          i != _impl->connectionNodes.end(); ++i)
     529             :     {
     530          90 :         NodePtr node = i->second;
     531          45 :         LBDEBUG << "    " << i->first << " : " << node << std::endl;
     532             :     }
     533             : #endif
     534             : 
     535          47 :     _impl->connectionNodes.clear();
     536             : 
     537          47 :     if (!_impl->nodes->empty())
     538           6 :         LBDEBUG << _impl->nodes->size() << " nodes connected during cleanup"
     539           6 :                 << std::endl;
     540             : 
     541             : #ifndef NDEBUG
     542          49 :     for (NodeHashCIter i = _impl->nodes->begin(); i != _impl->nodes->end(); ++i)
     543             :     {
     544           4 :         NodePtr node = i->second;
     545           2 :         LBDEBUG << "    " << node << std::endl;
     546             :     }
     547             : #endif
     548             : 
     549          47 :     _impl->nodes->clear();
     550          47 : }
     551             : 
     552         111 : void LocalNode::_closeNode(NodePtr node)
     553             : {
     554         222 :     ConnectionPtr connection = node->getConnection();
     555         222 :     ConnectionPtr mcConnection = node->_getMulticast();
     556             : 
     557         111 :     node->_disconnect();
     558             : 
     559         111 :     if (connection)
     560             :     {
     561          66 :         LBASSERTINFO(_impl->connectionNodes.find(connection) !=
     562             :                          _impl->connectionNodes.end(),
     563             :                      connection);
     564             : 
     565          66 :         _removeConnection(connection);
     566          66 :         _impl->connectionNodes.erase(connection);
     567             :     }
     568             : 
     569         111 :     if (mcConnection)
     570             :     {
     571           0 :         _removeConnection(mcConnection);
     572           0 :         _impl->connectionNodes.erase(mcConnection);
     573             :     }
     574             : 
     575         111 :     _impl->objectStore->removeInstanceData(node->getNodeID());
     576             : 
     577         222 :     lunchbox::ScopedFastWrite mutex(_impl->nodes);
     578         111 :     _impl->nodes->erase(node->getNodeID());
     579         111 :     notifyDisconnect(node);
     580         111 :     LBDEBUG << node << " disconnected from " << *this << std::endl;
     581         111 : }
     582             : 
     583          48 : bool LocalNode::_connectSelf()
     584             : {
     585             :     // setup local connection to myself
     586          96 :     PipeConnectionPtr connection = new PipeConnection;
     587          48 :     if (!connection->connect())
     588             :     {
     589           0 :         LBERROR << "Could not create local connection to receiver thread."
     590           0 :                 << std::endl;
     591           0 :         return false;
     592             :     }
     593             : 
     594          48 :     Node::_connect(connection->getSibling());
     595          48 :     _setClosed(); // reset state after _connect set it to connected
     596             : 
     597             :     // add to connection set
     598          48 :     LBASSERT(connection->getDescription());
     599          48 :     LBASSERT(_impl->connectionNodes.find(connection) ==
     600             :              _impl->connectionNodes.end());
     601             : 
     602          48 :     _impl->connectionNodes[connection] = this;
     603          48 :     _impl->nodes.data[getNodeID()] = this;
     604          48 :     _addConnection(connection);
     605             : 
     606          96 :     LBVERB << "Added node " << getNodeID() << " using " << connection
     607         144 :            << std::endl;
     608          48 :     return true;
     609             : }
     610             : 
     611           7 : bool LocalNode::disconnect(NodePtr node)
     612             : {
     613           7 :     if (!node || !isListening())
     614           0 :         return false;
     615             : 
     616           7 :     if (!node->isConnected())
     617           0 :         return true;
     618             : 
     619           7 :     LBASSERT(!inCommandThread());
     620          14 :     lunchbox::Request<void> request = registerRequest<void>(node.get());
     621           7 :     send(CMD_NODE_DISCONNECT) << request;
     622             : 
     623           7 :     request.wait();
     624           7 :     _impl->objectStore->removeNode(node);
     625           7 :     return true;
     626             : }
     627             : 
     628       20001 : void LocalNode::ackRequest(NodePtr node, const uint32_t requestID)
     629             : {
     630       20001 :     if (requestID == LB_UNDEFINED_UINT32) // no need to ack operation
     631           0 :         return;
     632             : 
     633       20001 :     if (node == this) // OPT
     634           0 :         serveRequest(requestID);
     635             :     else
     636       20001 :         node->send(CMD_NODE_ACK_REQUEST) << requestID;
     637             : }
     638             : 
     639           0 : void LocalNode::ping(NodePtr peer)
     640             : {
     641           0 :     LBASSERT(!_impl->inReceiverThread());
     642           0 :     peer->send(CMD_NODE_PING);
     643           0 : }
     644             : 
     645           0 : bool LocalNode::pingIdleNodes()
     646             : {
     647           0 :     LBASSERT(!_impl->inReceiverThread());
     648           0 :     const int64_t timeout = Global::getKeepaliveTimeout() / 2;
     649           0 :     const Nodes& nodes = getNodes(false);
     650             : 
     651           0 :     bool pinged = false;
     652           0 :     for (NodePtr node : nodes)
     653             :     {
     654           0 :         if (getTime64() - node->getLastReceiveTime() > timeout)
     655             :         {
     656           0 :             LBINFO << " Ping Node: " << node->getNodeID() << " last seen "
     657           0 :                    << node->getLastReceiveTime() << std::endl;
     658           0 :             node->send(CMD_NODE_PING);
     659           0 :             pinged = true;
     660             :         }
     661             :     }
     662           0 :     return pinged;
     663             : }
     664             : 
     665             : //----------------------------------------------------------------------
     666             : // Object functionality
     667             : //----------------------------------------------------------------------
     668           0 : void LocalNode::disableInstanceCache()
     669             : {
     670           0 :     _impl->objectStore->disableInstanceCache();
     671           0 : }
     672             : 
     673           0 : void LocalNode::expireInstanceData(const int64_t age)
     674             : {
     675           0 :     _impl->objectStore->expireInstanceData(age);
     676           0 : }
     677             : 
     678           0 : void LocalNode::enableSendOnRegister()
     679             : {
     680           0 :     _impl->objectStore->enableSendOnRegister();
     681           0 : }
     682             : 
     683           0 : void LocalNode::disableSendOnRegister()
     684             : {
     685           0 :     _impl->objectStore->disableSendOnRegister();
     686           0 : }
     687             : 
     688          15 : bool LocalNode::registerObject(Object* object)
     689             : {
     690          15 :     return _impl->objectStore->register_(object);
     691             : }
     692             : 
     693          15 : void LocalNode::deregisterObject(Object* object)
     694             : {
     695          15 :     _impl->objectStore->deregister(object);
     696          15 : }
     697             : 
     698          34 : f_bool_t LocalNode::mapObject(Object* object, const uint128_t& id,
     699             :                               NodePtr master, const uint128_t& version)
     700             : {
     701             :     const uint32_t request =
     702          34 :         _impl->objectStore->mapNB(object, id, version, master);
     703             :     const FuturebImpl::Func& func =
     704          68 :         boost::bind(&ObjectStore::mapSync, _impl->objectStore, request);
     705          68 :     return f_bool_t(new FuturebImpl(func));
     706             : }
     707             : 
     708           0 : uint32_t LocalNode::mapObjectNB(Object* object, const uint128_t& id,
     709             :                                 const uint128_t& version)
     710             : {
     711           0 :     return _impl->objectStore->mapNB(object, id, version, 0);
     712             : }
     713             : 
     714           2 : uint32_t LocalNode::mapObjectNB(Object* object, const uint128_t& id,
     715             :                                 const uint128_t& version, NodePtr master)
     716             : {
     717           2 :     return _impl->objectStore->mapNB(object, id, version, master);
     718             : }
     719             : 
     720           2 : bool LocalNode::mapObjectSync(const uint32_t requestID)
     721             : {
     722           2 :     return _impl->objectStore->mapSync(requestID);
     723             : }
     724             : 
     725           4 : f_bool_t LocalNode::syncObject(Object* object, const uint128_t& id,
     726             :                                NodePtr master, const uint32_t instanceID)
     727             : {
     728           4 :     return _impl->objectStore->sync(object, id, master, instanceID);
     729             : }
     730             : 
     731          36 : void LocalNode::unmapObject(Object* object)
     732             : {
     733          36 :     _impl->objectStore->unmap(object);
     734          36 : }
     735             : 
     736           0 : void LocalNode::swapObject(Object* oldObject, Object* newObject)
     737             : {
     738           0 :     _impl->objectStore->swap(oldObject, newObject);
     739           0 : }
     740             : 
     741           0 : void LocalNode::objectPush(const uint128_t& groupID,
     742             :                            const uint128_t& objectType,
     743             :                            const uint128_t& objectID, DataIStream& istream)
     744             : {
     745           0 :     lunchbox::ScopedRead mutex(_impl->pushHandlers);
     746           0 :     HandlerHashCIter i = _impl->pushHandlers->find(groupID);
     747           0 :     if (i != _impl->pushHandlers->end())
     748           0 :         i->second(groupID, objectType, objectID, istream);
     749             :     else
     750           0 :         LBWARN << "No custom handler for push group " << groupID
     751           0 :                << " registered" << std::endl;
     752             : 
     753           0 :     if (istream.wasUsed() && istream.hasData())
     754           0 :         LBWARN << "Incomplete Object::push for group " << groupID << " type "
     755           0 :                << objectType << " object " << objectID << std::endl;
     756           0 : }
     757             : 
     758           0 : void LocalNode::registerPushHandler(const uint128_t& groupID,
     759             :                                     const PushHandler& handler)
     760             : {
     761           0 :     lunchbox::ScopedWrite mutex(_impl->pushHandlers);
     762           0 :     (*_impl->pushHandlers)[groupID] = handler;
     763           0 : }
     764             : 
     765           2 : bool LocalNode::registerCommandHandler(const uint128_t& command,
     766             :                                        const CommandHandler& func,
     767             :                                        CommandQueue* queue)
     768             : {
     769           4 :     lunchbox::ScopedFastWrite mutex(_impl->commandHandlers);
     770           2 :     if (_impl->commandHandlers->find(command) != _impl->commandHandlers->end())
     771             :     {
     772           0 :         LBWARN << "Already got a registered handler for custom command "
     773           0 :                << command << std::endl;
     774           0 :         return false;
     775             :     }
     776             : 
     777           2 :     _impl->commandHandlers->insert(
     778           4 :         std::make_pair(command, std::make_pair(func, queue)));
     779           2 :     return true;
     780             : }
     781             : 
     782           0 : LocalNode::SendToken LocalNode::acquireSendToken(NodePtr node)
     783             : {
     784           0 :     LBASSERT(!inCommandThread());
     785           0 :     LBASSERT(!_impl->inReceiverThread());
     786             : 
     787           0 :     lunchbox::Request<void> request = registerRequest<void>();
     788           0 :     node->send(CMD_NODE_ACQUIRE_SEND_TOKEN) << request;
     789             : 
     790             :     try
     791             :     {
     792           0 :         request.wait(Global::getTimeout());
     793             :     }
     794           0 :     catch (const lunchbox::FutureTimeout&)
     795             :     {
     796           0 :         LBERROR << "Timeout while acquiring send token " << request.getID()
     797           0 :                 << std::endl;
     798           0 :         request.unregister();
     799           0 :         return 0;
     800             :     }
     801           0 :     return new co::SendToken(node);
     802             : }
     803             : 
     804           0 : void LocalNode::releaseSendToken(SendToken token)
     805             : {
     806           0 :     LBASSERT(!_impl->inReceiverThread());
     807           0 :     if (token)
     808           0 :         token->release();
     809           0 : }
     810             : 
     811             : //----------------------------------------------------------------------
     812             : // Connecting a node
     813             : //----------------------------------------------------------------------
     814             : namespace
     815             : {
     816             : enum ConnectResult
     817             : {
     818             :     CONNECT_OK,
     819             :     CONNECT_TRY_AGAIN,
     820             :     CONNECT_BAD_STATE,
     821             :     CONNECT_TIMEOUT,
     822             :     CONNECT_UNREACHABLE
     823             : };
     824             : }
     825             : 
     826          68 : NodePtr LocalNode::connect(const NodeID& nodeID)
     827             : {
     828          68 :     LBASSERT(nodeID != 0);
     829          68 :     LBASSERT(isListening());
     830             : 
     831             :     // Make sure that only one connection request based on the node identifier
     832             :     // is pending at a given time. Otherwise a node with the same id might be
     833             :     // instantiated twice in _cmdGetNodeDataReply(). The alternative to this
     834             :     // mutex is to register connecting nodes with this local node, and handle
     835             :     // all cases correctly, which is far more complex. Node connections only
     836             :     // happen a lot during initialization, and are therefore not time-critical.
     837         136 :     lunchbox::ScopedWrite mutex(_impl->connectLock);
     838             : 
     839         136 :     Nodes nodes = getNodes();
     840          71 :     for (NodePtr peer : nodes)
     841             :     {
     842          71 :         if (peer->getNodeID() == nodeID && peer->isReachable()) // early out
     843          68 :             return peer;
     844             :     }
     845             : 
     846           0 :     LBDEBUG << "Connecting node " << nodeID << std::endl;
     847           0 :     for (NodePtr peer : nodes)
     848             :     {
     849           0 :         NodePtr node = _connect(nodeID, peer);
     850           0 :         if (node)
     851           0 :             return node;
     852             :     }
     853             :     {
     854           0 :         NodePtr node = _connectFromZeroconf(nodeID);
     855           0 :         if (node)
     856           0 :             return node;
     857             :     }
     858             :     // check again if node connected by itself by now
     859           0 :     nodes = getNodes();
     860           0 :     for (NodePtr node : nodes)
     861           0 :         if (node->getNodeID() == nodeID && node->isReachable())
     862           0 :             return node;
     863             : 
     864           0 :     LBWARN << "Node " << nodeID << " connection failed" << std::endl;
     865           0 :     return 0;
     866             : }
     867             : 
     868           0 : NodePtr LocalNode::_connect(const NodeID& nodeID, NodePtr peer)
     869             : {
     870           0 :     LBASSERT(nodeID != 0);
     871             : 
     872           0 :     NodePtr node;
     873             :     {
     874           0 :         lunchbox::ScopedFastRead mutexNodes(_impl->nodes);
     875           0 :         NodeHash::const_iterator i = _impl->nodes->find(nodeID);
     876           0 :         if (i != _impl->nodes->end())
     877           0 :             node = i->second;
     878             :     }
     879             : 
     880           0 :     LBASSERT(getNodeID() != nodeID);
     881           0 :     if (!node)
     882             :     {
     883           0 :         lunchbox::Request<void*> request = registerRequest<void*>();
     884           0 :         peer->send(CMD_NODE_GET_NODE_DATA) << nodeID << request;
     885           0 :         node = reinterpret_cast<Node*>(request.wait());
     886           0 :         if (!node)
     887             :         {
     888           0 :             LBINFO << "Node " << nodeID << " not found on " << peer->getNodeID()
     889           0 :                    << std::endl;
     890           0 :             return 0;
     891             :         }
     892           0 :         node->unref(this); // ref'd before serveRequest()
     893             :     }
     894             : 
     895           0 :     if (node->isReachable())
     896           0 :         return node;
     897             : 
     898           0 :     size_t tries = 10;
     899           0 :     while (--tries)
     900             :     {
     901           0 :         switch (_connect(node))
     902             :         {
     903             :         case CONNECT_OK:
     904           0 :             return node;
     905             :         case CONNECT_TRY_AGAIN:
     906             :         {
     907           0 :             lunchbox::RNG rng;
     908             :             // collision avoidance
     909           0 :             lunchbox::sleep(rng.get<uint8_t>());
     910           0 :             break;
     911             :         }
     912             :         case CONNECT_BAD_STATE:
     913           0 :             LBWARN << "Internal connect error" << std::endl;
     914             :         // no break;
     915             :         case CONNECT_TIMEOUT:
     916           0 :             return 0;
     917             : 
     918             :         case CONNECT_UNREACHABLE:
     919           0 :             break; // maybe peer talks to us
     920             :         }
     921             : 
     922           0 :         lunchbox::ScopedFastRead mutexNodes(_impl->nodes);
     923             :         // connect failed - check for simultaneous connect from peer
     924           0 :         NodeHash::const_iterator i = _impl->nodes->find(nodeID);
     925           0 :         if (i != _impl->nodes->end())
     926           0 :             node = i->second;
     927             :     }
     928             : 
     929           0 :     return node->isReachable() ? node : 0;
     930             : }
     931             : 
     932           0 : NodePtr LocalNode::_connectFromZeroconf(const NodeID& nodeID)
     933             : {
     934           0 :     NodePtr node;
     935             :     {
     936           0 :         lunchbox::ScopedWrite mutex(_impl->service);
     937             :         const Strings& instances =
     938           0 :             _impl->service->discover(servus::Servus::IF_ALL, 500);
     939             : 
     940           0 :         for (const auto& instance : instances)
     941             :         {
     942           0 :             const NodeID candidate(instance);
     943           0 :             if (candidate != nodeID)
     944           0 :                 continue;
     945             : 
     946             :             const std::string& typeStr =
     947           0 :                 _impl->service->get(instance, "co_type");
     948           0 :             if (typeStr.empty())
     949           0 :                 return 0;
     950             : 
     951           0 :             std::istringstream in(typeStr);
     952           0 :             uint32_t type = 0;
     953           0 :             in >> type;
     954             : 
     955           0 :             node = createNode(type);
     956           0 :             if (!node)
     957             :             {
     958           0 :                 LBINFO << "Can't create node of type " << type << std::endl;
     959           0 :                 continue;
     960             :             }
     961             : 
     962             :             const std::string& numStr =
     963           0 :                 _impl->service->get(instance, "co_numPorts");
     964           0 :             uint32_t num = 0;
     965             : 
     966           0 :             in.clear();
     967           0 :             in.str(numStr);
     968           0 :             in >> num;
     969           0 :             LBASSERT(num > 0);
     970           0 :             for (size_t j = 0; j < num; ++j)
     971             :             {
     972           0 :                 ConnectionDescriptionPtr desc = new ConnectionDescription;
     973           0 :                 std::ostringstream out;
     974           0 :                 out << "co_port" << j;
     975             : 
     976           0 :                 std::string descStr = _impl->service->get(instance, out.str());
     977           0 :                 LBASSERT(!descStr.empty());
     978           0 :                 LBCHECK(desc->fromString(descStr));
     979           0 :                 LBASSERT(descStr.empty());
     980           0 :                 node->addConnectionDescription(desc);
     981             :             }
     982           0 :             break;
     983             :         }
     984             :     }
     985             : 
     986           0 :     if (node && _connect(node))
     987           0 :         return node;
     988           0 :     return nullptr;
     989             : }
     990             : 
     991          33 : bool LocalNode::connect(NodePtr node)
     992             : {
     993          66 :     lunchbox::ScopedWrite mutex(_impl->connectLock);
     994          66 :     return (_connect(node) == CONNECT_OK);
     995             : }
     996             : 
     997          33 : uint32_t LocalNode::_connect(NodePtr node)
     998             : {
     999          33 :     LBASSERTINFO(isListening(), *this);
    1000          33 :     if (node->isReachable())
    1001           0 :         return CONNECT_OK;
    1002             : 
    1003          33 :     LBASSERT(node->isClosed());
    1004          33 :     LBDEBUG << "Connecting " << node << std::endl;
    1005             : 
    1006             :     // try connecting using the given descriptions
    1007          66 :     const ConnectionDescriptions& cds = node->getConnectionDescriptions();
    1008          33 :     for (ConnectionDescriptionsCIter i = cds.begin(); i != cds.end(); ++i)
    1009             :     {
    1010          33 :         ConnectionDescriptionPtr description = *i;
    1011          33 :         if (description->type >= CONNECTIONTYPE_MULTICAST)
    1012           0 :             continue; // Don't use multicast for primary connections
    1013             : 
    1014          33 :         ConnectionPtr connection = Connection::create(description);
    1015          33 :         if (!connection || !connection->connect())
    1016           0 :             continue;
    1017             : 
    1018          33 :         return _connect(node, connection);
    1019             :     }
    1020             : 
    1021           0 :     LBDEBUG << "Node " << node
    1022           0 :             << " unreachable, all connections failed to connect" << std::endl;
    1023           0 :     return CONNECT_UNREACHABLE;
    1024             : }
    1025             : 
    1026           0 : bool LocalNode::connect(NodePtr node, ConnectionPtr connection)
    1027             : {
    1028           0 :     return (_connect(node, connection) == CONNECT_OK);
    1029             : }
    1030             : 
    1031          33 : uint32_t LocalNode::_connect(NodePtr node, ConnectionPtr connection)
    1032             : {
    1033          33 :     LBASSERT(connection);
    1034          33 :     LBASSERT(node->getNodeID() != getNodeID());
    1035             : 
    1036          66 :     if (!node || !isListening() || !connection->isConnected() ||
    1037          33 :         !node->isClosed())
    1038             :     {
    1039           0 :         return CONNECT_BAD_STATE;
    1040             :     }
    1041             : 
    1042          33 :     _addConnection(connection);
    1043             : 
    1044             :     // send connect command to peer
    1045          66 :     lunchbox::Request<bool> request = registerRequest<bool>(node.get());
    1046          33 :     const uint32_t cmd = CMD_NODE_CONNECT;
    1047          66 :     OCommand(Connections(1, connection), cmd) << getNodeID() << request
    1048          99 :                                               << getType() << serialize();
    1049             : 
    1050          33 :     bool connected = false;
    1051             :     try
    1052             :     {
    1053          33 :         connected = request.wait(10000 /*ms*/);
    1054             :     }
    1055           0 :     catch (const lunchbox::FutureTimeout&)
    1056             :     {
    1057           0 :         LBWARN << "Node connection handshake timeout - " << node
    1058           0 :                << " not a Collage node?" << std::endl;
    1059           0 :         request.unregister();
    1060           0 :         return CONNECT_TIMEOUT;
    1061             :     }
    1062             : 
    1063             :     // In simultaneous connect case, depending on the connection type
    1064             :     // (e.g. RDMA), a check on the connection state of the node is required
    1065          33 :     if (!connected || !node->isConnected())
    1066           0 :         return CONNECT_TRY_AGAIN;
    1067             : 
    1068          33 :     LBASSERT(node->getNodeID() != 0);
    1069          33 :     LBASSERTINFO(node->getNodeID() != getNodeID(), getNodeID());
    1070          33 :     LBDEBUG << node << " connected to " << *(Node*)this << std::endl;
    1071          33 :     return CONNECT_OK;
    1072             : }
    1073             : 
    1074          34 : NodePtr LocalNode::connectObjectMaster(const uint128_t& id)
    1075             : {
    1076          34 :     LBASSERTINFO(id.isUUID(), id);
    1077          34 :     if (!id.isUUID())
    1078             :     {
    1079           0 :         LBWARN << "Invalid object id " << id << std::endl;
    1080           0 :         return 0;
    1081             :     }
    1082             : 
    1083          34 :     const NodeID masterNodeID = _impl->objectStore->findMasterNodeID(id);
    1084          34 :     if (masterNodeID == 0)
    1085             :     {
    1086           0 :         LBWARN << "Can't find master node for object " << id << std::endl;
    1087           0 :         return 0;
    1088             :     }
    1089             : 
    1090          68 :     NodePtr master = connect(masterNodeID);
    1091          34 :     if (master && !master->isClosed())
    1092          34 :         return master;
    1093             : 
    1094           0 :     LBWARN << "Can't connect master node with id " << masterNodeID
    1095           0 :            << " for object " << id << std::endl;
    1096           0 :     return 0;
    1097             : }
    1098             : 
    1099           0 : bool LocalNode::launch(NodePtr node, const std::string& command)
    1100             : {
    1101           0 :     size_t lastPos = 0;
    1102           0 :     std::string cmd;
    1103             : 
    1104           0 :     const std::string& quote = node->getLaunchQuote();
    1105             :     std::string launchOptions =
    1106           0 :         std::string(" --co-launch ") + quote + node->serialize() +
    1107           0 :         node->getWorkDir() + CO_SEPARATOR + std::to_string(getType()) +
    1108           0 :         CO_SEPARATOR + serialize() + quote + " --co-globals " + quote +
    1109           0 :         co::Global::toString() + quote;
    1110             : 
    1111           0 :     for (size_t percentPos = command.find('%'); percentPos != std::string::npos;
    1112           0 :          percentPos = command.find('%', percentPos + 1))
    1113             :     {
    1114           0 :         std::string replacement;
    1115           0 :         switch (command[percentPos + 1])
    1116             :         {
    1117             :         case 'h':
    1118           0 :             replacement = node->getHostname();
    1119           0 :             if (replacement.empty())
    1120           0 :                 replacement = "127.0.0.1";
    1121           0 :             break;
    1122             : 
    1123             :         case 'n':
    1124           0 :             replacement = node->getNodeID().getString();
    1125           0 :             break;
    1126             : 
    1127             :         case 'd':
    1128           0 :             replacement = node->getWorkDir();
    1129           0 :             break;
    1130             : 
    1131             :         case 'q':
    1132           0 :             replacement = quote;
    1133           0 :             break;
    1134             : 
    1135             :         case 'o':
    1136           0 :             replacement = launchOptions;
    1137           0 :             launchOptions.clear();
    1138           0 :             break;
    1139             : 
    1140             :         default:
    1141           0 :             LBWARN << "Unknown token " << command[percentPos + 1] << std::endl;
    1142           0 :             replacement = command.substr(percentPos, percentPos + 1);
    1143             :         }
    1144             : 
    1145           0 :         cmd += command.substr(lastPos, percentPos - lastPos) + replacement;
    1146           0 :         lastPos = percentPos + 2;
    1147             :     }
    1148             : 
    1149           0 :     cmd += command.substr(lastPos) + launchOptions;
    1150             : 
    1151           0 :     LBDEBUG << "Launching: " << cmd << std::endl;
    1152           0 :     if (lunchbox::fork(cmd))
    1153           0 :         return true;
    1154             : 
    1155           0 :     LBWARN << "Could not launch node using '" << cmd << "'" << std::endl;
    1156           0 :     return false;
    1157             : }
    1158             : 
    1159           0 : NodePtr LocalNode::syncLaunch(const uint128_t& nodeID, const int64_t timeout)
    1160             : {
    1161           0 :     const lunchbox::Clock clock;
    1162             : 
    1163           0 :     do
    1164             :     {
    1165           0 :         co::NodePtr node = getNode(nodeID);
    1166           0 :         if (node && node->isConnected())
    1167           0 :             return node;
    1168             : 
    1169           0 :         lunchbox::sleep(100 /*ms*/);
    1170           0 :     } while (timeout == static_cast<int32_t>(LB_TIMEOUT_INDEFINITE) ||
    1171           0 :              clock.getTime64() < timeout);
    1172             : 
    1173           0 :     return nullptr;
    1174             : }
    1175             : 
    1176           0 : bool LocalNode::_setupPeer(const std::string& setupOpts)
    1177             : {
    1178           0 :     size_t nextPos = setupOpts.find(CO_SEPARATOR);
    1179           0 :     if (nextPos == std::string::npos)
    1180             :     {
    1181           0 :         LBERROR << "Could not parse working directory: " << setupOpts
    1182           0 :                 << std::endl;
    1183           0 :         return false;
    1184             :     }
    1185             : 
    1186           0 :     const std::string workDir = setupOpts.substr(0, nextPos);
    1187           0 :     std::string description = setupOpts.substr(nextPos + 1);
    1188             : 
    1189           0 :     if (!workDir.empty() && chdir(workDir.c_str()) == -1)
    1190           0 :         LBWARN << "Can't change working directory to " << workDir << ": "
    1191           0 :                << lunchbox::sysError << std::endl;
    1192             : 
    1193           0 :     nextPos = description.find(CO_SEPARATOR);
    1194           0 :     if (nextPos == std::string::npos)
    1195             :     {
    1196           0 :         LBERROR << "Could not parse server node type: " << description
    1197           0 :                 << " is left from " << setupOpts << std::endl;
    1198           0 :         return false;
    1199             :     }
    1200             : 
    1201           0 :     const std::string nodeType = description.substr(0, nextPos);
    1202           0 :     description = description.substr(nextPos + 1);
    1203             : 
    1204           0 :     co::NodePtr peer = createNode(std::stoi(nodeType));
    1205           0 :     if (!peer->deserialize(description))
    1206           0 :         LBWARN << "Can't parse peer data" << std::endl;
    1207             : 
    1208           0 :     LBASSERTINFO(description.empty(), description);
    1209           0 :     if (!connect(peer))
    1210             :     {
    1211           0 :         LBERROR << "Can't connect peer node using " << *peer << std::endl;
    1212           0 :         return false;
    1213             :     }
    1214           0 :     return true;
    1215             : }
    1216             : 
    1217          33 : NodePtr LocalNode::createNode(const uint32_t type)
    1218             : {
    1219          33 :     LBASSERTINFO(type == NODETYPE_NODE, type);
    1220          33 :     return new Node(type);
    1221             : }
    1222             : 
    1223           0 : NodePtr LocalNode::getNode(const NodeID& id) const
    1224             : {
    1225           0 :     lunchbox::ScopedFastRead mutex(_impl->nodes);
    1226           0 :     NodeHash::const_iterator i = _impl->nodes->find(id);
    1227           0 :     if (i == _impl->nodes->end() || !i->second->isReachable())
    1228           0 :         return 0;
    1229           0 :     return i->second;
    1230             : }
    1231             : 
    1232         102 : Nodes LocalNode::getNodes(const bool addSelf) const
    1233             : {
    1234         102 :     Nodes nodes;
    1235         204 :     lunchbox::ScopedFastRead mutex(_impl->nodes);
    1236         298 :     for (NodeHashCIter i = _impl->nodes->begin(); i != _impl->nodes->end(); ++i)
    1237             :     {
    1238         392 :         NodePtr node = i->second;
    1239         196 :         if (node->isReachable() && (addSelf || node != this))
    1240         196 :             nodes.push_back(i->second);
    1241             :     }
    1242         204 :     return nodes;
    1243             : }
    1244             : 
    1245         142 : CommandQueue* LocalNode::getCommandThreadQueue()
    1246             : {
    1247         142 :     return _impl->commandThread->getWorkerQueue();
    1248             : }
    1249             : 
    1250           7 : bool LocalNode::inCommandThread() const
    1251             : {
    1252           7 :     return _impl->commandThread->isCurrent();
    1253             : }
    1254             : 
    1255           0 : const Strings& LocalNode::getCommandLine() const
    1256             : {
    1257           0 :     return _impl->args;
    1258             : }
    1259             : 
    1260       72357 : int64_t LocalNode::getTime64() const
    1261             : {
    1262       72357 :     return _impl->clock.getTime64();
    1263             : }
    1264             : 
    1265           0 : ssize_t LocalNode::getCounter(const Counter counter) const
    1266             : {
    1267           0 :     return _impl->counters[counter];
    1268             : }
    1269             : 
    1270         184 : void LocalNode::flushCommands()
    1271             : {
    1272         184 :     _impl->incoming.interrupt();
    1273         184 : }
    1274             : 
    1275             : //----------------------------------------------------------------------
    1276             : // receiver thread functions
    1277             : //----------------------------------------------------------------------
    1278          48 : void LocalNode::_runReceiverThread()
    1279             : {
    1280          48 :     LB_TS_THREAD(_rcvThread);
    1281          48 :     _initService();
    1282             : 
    1283          48 :     int nErrors = 0;
    1284      144966 :     while (isListening())
    1285             :     {
    1286       72460 :         const ConnectionSet::Event result = _impl->incoming.select();
    1287       72459 :         switch (result)
    1288             :         {
    1289             :         case ConnectionSet::EVENT_CONNECT:
    1290          33 :             _handleConnect();
    1291          33 :             break;
    1292             : 
    1293             :         case ConnectionSet::EVENT_DATA:
    1294       72104 :             nErrors = 0;
    1295       72104 :             _handleData();
    1296       72104 :             break;
    1297             : 
    1298             :         case ConnectionSet::EVENT_DISCONNECT:
    1299             :         case ConnectionSet::EVENT_INVALID_HANDLE:
    1300          33 :             _handleDisconnect();
    1301          33 :             break;
    1302             : 
    1303             :         case ConnectionSet::EVENT_TIMEOUT:
    1304           0 :             LBINFO << "select timeout" << std::endl;
    1305           0 :             break;
    1306             : 
    1307             :         case ConnectionSet::EVENT_ERROR:
    1308           0 :             ++nErrors;
    1309           0 :             LBWARN << "Connection error during select" << std::endl;
    1310           0 :             if (nErrors > 100)
    1311             :             {
    1312           0 :                 LBWARN << "Too many errors in a row, capping connection"
    1313           0 :                        << std::endl;
    1314           0 :                 _handleDisconnect();
    1315             :             }
    1316           0 :             break;
    1317             : 
    1318             :         case ConnectionSet::EVENT_SELECT_ERROR:
    1319           0 :             LBWARN << "Error during select" << std::endl;
    1320           0 :             ++nErrors;
    1321           0 :             if (nErrors > 10)
    1322             :             {
    1323           0 :                 LBWARN << "Too many errors in a row" << std::endl;
    1324           0 :                 LBUNIMPLEMENTED;
    1325             :             }
    1326           0 :             break;
    1327             : 
    1328             :         case ConnectionSet::EVENT_INTERRUPT:
    1329         289 :             _redispatchCommands();
    1330         289 :             break;
    1331             : 
    1332             :         default:
    1333           0 :             LBUNIMPLEMENTED;
    1334             :         }
    1335             :     }
    1336             : 
    1337          47 :     if (!_impl->pendingCommands.empty())
    1338           0 :         LBWARN << _impl->pendingCommands.size()
    1339           0 :                << " commands pending while leaving command thread" << std::endl;
    1340             : 
    1341          47 :     _impl->pendingCommands.clear();
    1342          47 :     LBCHECK(_impl->commandThread->join());
    1343             : 
    1344          94 :     ConnectionPtr connection = getConnection();
    1345          94 :     PipeConnectionPtr pipe = LBSAFECAST(PipeConnection*, connection.get());
    1346          47 :     connection = pipe->getSibling();
    1347          47 :     _removeConnection(connection);
    1348          47 :     _impl->connectionNodes.erase(connection);
    1349          47 :     _disconnect();
    1350             : 
    1351          47 :     const Connections& connections = _impl->incoming.getConnections();
    1352         189 :     while (!connections.empty())
    1353             :     {
    1354          71 :         connection = connections.back();
    1355         142 :         NodePtr node = _impl->connectionNodes[connection];
    1356             : 
    1357          71 :         if (node)
    1358          71 :             _closeNode(node);
    1359          71 :         _removeConnection(connection);
    1360             :     }
    1361             : 
    1362          47 :     _impl->objectStore->clear();
    1363          47 :     _impl->pendingCommands.clear();
    1364          47 :     _impl->smallBuffers.flush();
    1365          47 :     _impl->bigBuffers.flush();
    1366             : 
    1367         235 :     LBDEBUG << "Leaving receiver thread of " << lunchbox::className(this)
    1368         188 :             << std::endl;
    1369          47 : }
    1370             : 
    1371          33 : void LocalNode::_handleConnect()
    1372             : {
    1373          66 :     ConnectionPtr connection = _impl->incoming.getConnection();
    1374          66 :     ConnectionPtr newConn = connection->acceptSync();
    1375          33 :     connection->acceptNB();
    1376             : 
    1377          33 :     if (newConn)
    1378          33 :         _addConnection(newConn);
    1379             :     else
    1380           0 :         LBINFO << "Received connect event, but accept() failed" << std::endl;
    1381          33 : }
    1382             : 
    1383          33 : void LocalNode::_handleDisconnect()
    1384             : {
    1385          33 :     while (_handleData())
    1386             :         ; // read remaining data off connection
    1387             : 
    1388          66 :     ConnectionPtr connection = _impl->incoming.getConnection();
    1389          33 :     ConnectionNodeHash::iterator i = _impl->connectionNodes.find(connection);
    1390             : 
    1391          33 :     if (i != _impl->connectionNodes.end())
    1392             :     {
    1393          66 :         NodePtr node = i->second;
    1394             : 
    1395          33 :         node->ref(); // extend lifetime to give cmd handler a chance
    1396             : 
    1397             :         // local command dispatching
    1398          66 :         OCommand(this, this, CMD_NODE_REMOVE_NODE)
    1399          99 :             << node.get() << uint32_t(LB_UNDEFINED_UINT32);
    1400             : 
    1401          33 :         if (node->getConnection() == connection)
    1402          33 :             _closeNode(node);
    1403           0 :         else if (connection->isMulticast())
    1404           0 :             node->_removeMulticast(connection);
    1405             :     }
    1406             : 
    1407          33 :     _removeConnection(connection);
    1408          33 : }
    1409             : 
    1410       72137 : bool LocalNode::_handleData()
    1411             : {
    1412       72137 :     _impl->smallBuffers.compact();
    1413       72137 :     _impl->bigBuffers.compact();
    1414             : 
    1415      144274 :     ConnectionPtr connection = _impl->incoming.getConnection();
    1416       72137 :     LBASSERT(connection);
    1417             : 
    1418      144274 :     BufferPtr buffer = _readHead(connection);
    1419       72137 :     if (!buffer) // fluke signal
    1420          66 :         return false;
    1421             : 
    1422      144142 :     ICommand command = _setupCommand(connection, buffer);
    1423       72071 :     const bool gotCommand = _readTail(command, buffer, connection);
    1424       72071 :     LBASSERT(gotCommand);
    1425             : 
    1426             :     // start next receive
    1427      144142 :     BufferPtr nextBuffer = _impl->smallBuffers.alloc(COMMAND_ALLOCSIZE);
    1428       72071 :     connection->recvNB(nextBuffer, COMMAND_MINSIZE);
    1429             : 
    1430       72071 :     if (gotCommand)
    1431             :     {
    1432       72071 :         _dispatchCommand(command);
    1433       72071 :         return true;
    1434             :     }
    1435             : 
    1436           0 :     LBERROR << "Incomplete command read: " << command << std::endl;
    1437           0 :     return false;
    1438             : }
    1439             : 
    1440       72137 : BufferPtr LocalNode::_readHead(ConnectionPtr connection)
    1441             : {
    1442      144274 :     BufferPtr buffer;
    1443       72137 :     const bool gotSize = connection->recvSync(buffer, false);
    1444             : 
    1445       72137 :     if (!buffer) // fluke signal
    1446             :     {
    1447           0 :         LBWARN << "Erronous network event on " << connection->getDescription()
    1448           0 :                << std::endl;
    1449           0 :         _impl->incoming.setDirty();
    1450           0 :         return 0;
    1451             :     }
    1452             : 
    1453       72137 :     if (gotSize)
    1454       72071 :         return buffer;
    1455             : 
    1456             :     // Some systems signal data on dead connections.
    1457          66 :     buffer->setSize(0);
    1458          66 :     connection->recvNB(buffer, COMMAND_MINSIZE);
    1459          66 :     return 0;
    1460             : }
    1461             : 
    1462       72071 : ICommand LocalNode::_setupCommand(ConnectionPtr connection,
    1463             :                                   ConstBufferPtr buffer)
    1464             : {
    1465      144142 :     NodePtr node;
    1466       72071 :     ConnectionNodeHashCIter i = _impl->connectionNodes.find(connection);
    1467       72071 :     if (i != _impl->connectionNodes.end())
    1468       72005 :         node = i->second;
    1469       72071 :     LBVERB << "Handle data from " << node << std::endl;
    1470             : 
    1471       72071 :     ICommand command(this, node, buffer);
    1472             : 
    1473       72071 :     if (node)
    1474       72005 :         node->_setLastReceive(getTime64());
    1475             : 
    1476      144142 :     return command;
    1477             : }
    1478             : 
    1479       72071 : bool LocalNode::_readTail(ICommand& command, BufferPtr buffer,
    1480             :                           ConnectionPtr connection)
    1481             : {
    1482       72071 :     const uint64_t needed = command.getSize();
    1483       72071 :     if (needed <= buffer->getSize())
    1484       62060 :         return true;
    1485             : 
    1486       10011 :     if (needed > buffer->getMaxSize())
    1487             :     {
    1488           0 :         LBASSERT(needed > COMMAND_ALLOCSIZE);
    1489           0 :         LBASSERTINFO(needed < LB_BIT48,
    1490             :                      "Out-of-sync network stream: " << command << "?");
    1491             :         // not enough space for remaining data, alloc and copy to new buffer
    1492           0 :         BufferPtr newBuffer = _impl->bigBuffers.alloc(needed);
    1493           0 :         newBuffer->replace(*buffer);
    1494           0 :         buffer = newBuffer;
    1495             : 
    1496           0 :         command = ICommand(this, command.getRemoteNode(), buffer);
    1497             :     }
    1498             : 
    1499             :     // read remaining data
    1500       10011 :     connection->recvNB(buffer, command.getSize() - buffer->getSize());
    1501       10011 :     return connection->recvSync(buffer);
    1502             : }
    1503             : 
    1504          34 : BufferPtr LocalNode::allocBuffer(const uint64_t size)
    1505             : {
    1506          34 :     LBASSERT(_impl->receiverThread->isStopped() || _impl->inReceiverThread());
    1507             :     BufferPtr buffer = size > COMMAND_ALLOCSIZE
    1508           0 :                            ? _impl->bigBuffers.alloc(size)
    1509          34 :                            : _impl->smallBuffers.alloc(COMMAND_ALLOCSIZE);
    1510          34 :     return buffer;
    1511             : }
    1512             : 
    1513       72118 : void LocalNode::_dispatchCommand(ICommand& command)
    1514             : {
    1515       72118 :     LBASSERTINFO(command.isValid(), command);
    1516             : 
    1517       72118 :     if (dispatchCommand(command))
    1518       72118 :         _redispatchCommands();
    1519             :     else
    1520             :     {
    1521           0 :         _redispatchCommands();
    1522           0 :         _impl->pendingCommands.push_back(command);
    1523             :     }
    1524       72118 : }
    1525             : 
    1526       72154 : bool LocalNode::dispatchCommand(ICommand& command)
    1527             : {
    1528       72154 :     LBVERB << "dispatch " << command << " by " << getNodeID() << std::endl;
    1529       72154 :     LBASSERTINFO(command.isValid(), command);
    1530             : 
    1531       72154 :     const uint32_t type = command.getType();
    1532       72154 :     switch (type)
    1533             :     {
    1534             :     case COMMANDTYPE_NODE:
    1535       71649 :         LBCHECK(Dispatcher::dispatchCommand(command));
    1536       71649 :         return true;
    1537             : 
    1538             :     case COMMANDTYPE_OBJECT:
    1539         505 :         return _impl->objectStore->dispatchObjectCommand(command);
    1540             : 
    1541             :     default:
    1542           0 :         LBABORT("Unknown command type " << type << " for " << command);
    1543           0 :         return true;
    1544             :     }
    1545             : }
    1546             : 
    1547       72407 : void LocalNode::_redispatchCommands()
    1548             : {
    1549       72407 :     bool changes = true;
    1550       72407 :     while (changes && !_impl->pendingCommands.empty())
    1551             :     {
    1552           0 :         changes = false;
    1553             : 
    1554           0 :         for (CommandList::iterator i = _impl->pendingCommands.begin();
    1555           0 :              i != _impl->pendingCommands.end(); ++i)
    1556             :         {
    1557           0 :             ICommand& command = *i;
    1558           0 :             LBASSERT(command.isValid());
    1559             : 
    1560           0 :             if (dispatchCommand(command))
    1561             :             {
    1562           0 :                 _impl->pendingCommands.erase(i);
    1563           0 :                 changes = true;
    1564           0 :                 break;
    1565             :             }
    1566             :         }
    1567             :     }
    1568             : 
    1569             : #ifndef NDEBUG
    1570       72407 :     if (!_impl->pendingCommands.empty())
    1571           0 :         LBVERB << _impl->pendingCommands.size() << " undispatched commands"
    1572           0 :                << std::endl;
    1573       72407 :     LBASSERT(_impl->pendingCommands.size() < 200);
    1574             : #endif
    1575       72407 : }
    1576             : 
    1577          48 : void LocalNode::_initService()
    1578             : {
    1579          93 :     LB_TS_SCOPED(_rcvThread);
    1580          48 :     _impl->service->withdraw(); // go silent during k/v update
    1581             : 
    1582          93 :     const ConnectionDescriptions& descs = getConnectionDescriptions();
    1583          48 :     if (descs.empty())
    1584           3 :         return;
    1585             : 
    1586          90 :     std::ostringstream out;
    1587          45 :     out << getType();
    1588          45 :     _impl->service->set("co_type", out.str());
    1589             : 
    1590          45 :     out.str("");
    1591          45 :     out << descs.size();
    1592          45 :     _impl->service->set("co_numPorts", out.str());
    1593             : 
    1594          90 :     for (ConnectionDescriptionsCIter i = descs.begin(); i != descs.end(); ++i)
    1595             :     {
    1596          90 :         ConnectionDescriptionPtr desc = *i;
    1597          45 :         out.str("");
    1598          45 :         out << "co_port" << i - descs.begin();
    1599          45 :         _impl->service->set(out.str(), desc->toString());
    1600             :     }
    1601             : 
    1602          45 :     _impl->service->announce(descs.front()->port, getNodeID().getString());
    1603             : }
    1604             : 
    1605          47 : void LocalNode::_exitService()
    1606             : {
    1607          47 :     _impl->service->withdraw();
    1608          47 : }
    1609             : 
    1610           1 : Zeroconf LocalNode::getZeroconf()
    1611             : {
    1612           2 :     lunchbox::ScopedWrite mutex(_impl->service);
    1613           1 :     _impl->service->discover(servus::Servus::IF_ALL, 500);
    1614           2 :     return Zeroconf(_impl->service.data);
    1615             : }
    1616             : 
    1617             : //----------------------------------------------------------------------
    1618             : // command thread functions
    1619             : //----------------------------------------------------------------------
    1620          48 : bool LocalNode::_startCommandThread(const int32_t threadID)
    1621             : {
    1622          48 :     _impl->commandThread->threadID = threadID;
    1623          48 :     return _impl->commandThread->start();
    1624             : }
    1625             : 
    1626       51502 : bool LocalNode::_notifyCommandThreadIdle()
    1627             : {
    1628       51502 :     return _impl->objectStore->notifyCommandThreadIdle();
    1629             : }
    1630             : 
    1631       20001 : bool LocalNode::_cmdAckRequest(ICommand& command)
    1632             : {
    1633       20001 :     const uint32_t requestID = command.get<uint32_t>();
    1634       20001 :     LBASSERT(requestID != LB_UNDEFINED_UINT32);
    1635             : 
    1636       20001 :     serveRequest(requestID);
    1637       20001 :     return true;
    1638             : }
    1639             : 
    1640          47 : bool LocalNode::_cmdStopRcv(ICommand& command)
    1641             : {
    1642          47 :     LB_TS_THREAD(_rcvThread);
    1643          47 :     LBASSERT(isListening());
    1644             : 
    1645          47 :     _exitService();
    1646          47 :     _setClosing(); // causes rcv thread exit
    1647             : 
    1648          47 :     command.setCommand(CMD_NODE_STOP_CMD); // causes cmd thread exit
    1649          47 :     _dispatchCommand(command);
    1650          47 :     return true;
    1651             : }
    1652             : 
    1653          47 : bool LocalNode::_cmdStopCmd(ICommand&)
    1654             : {
    1655          47 :     LB_TS_THREAD(_cmdThread);
    1656          47 :     LBASSERTINFO(isClosing(), *this);
    1657             : 
    1658          47 :     _setClosed();
    1659          47 :     return true;
    1660             : }
    1661             : 
    1662           0 : bool LocalNode::_cmdSetAffinity(ICommand& command)
    1663             : {
    1664           0 :     const int32_t affinity = command.get<int32_t>();
    1665             : 
    1666           0 :     lunchbox::Thread::setAffinity(affinity);
    1667           0 :     return true;
    1668             : }
    1669             : 
    1670          33 : bool LocalNode::_cmdConnect(ICommand& command)
    1671             : {
    1672          33 :     LBASSERTINFO(!command.getRemoteNode(), command);
    1673          33 :     LBASSERT(_impl->inReceiverThread());
    1674             : 
    1675          33 :     const NodeID& nodeID = command.get<NodeID>();
    1676          33 :     const uint32_t requestID = command.get<uint32_t>();
    1677          33 :     const uint32_t nodeType = command.get<uint32_t>();
    1678          66 :     std::string data = command.get<std::string>();
    1679             : 
    1680          33 :     LBVERB << "handle connect " << command << " req " << requestID << " type "
    1681          33 :            << nodeType << " data " << data << std::endl;
    1682             : 
    1683          66 :     ConnectionPtr connection = _impl->incoming.getConnection();
    1684             : 
    1685          33 :     LBASSERT(connection);
    1686          33 :     LBASSERT(nodeID != getNodeID());
    1687          33 :     LBASSERT(_impl->connectionNodes.find(connection) ==
    1688             :              _impl->connectionNodes.end());
    1689             : 
    1690          66 :     NodePtr peer;
    1691          33 :     const uint32_t cmd = CMD_NODE_CONNECT_REPLY;
    1692             : 
    1693             :     // No locking needed, only recv thread modifies
    1694          33 :     NodeHashCIter i = _impl->nodes->find(nodeID);
    1695          33 :     if (i != _impl->nodes->end())
    1696             :     {
    1697           0 :         peer = i->second;
    1698           0 :         if (peer->isReachable())
    1699             :         {
    1700             :             // Node exists, probably simultaneous connect from peer
    1701           0 :             LBINFO << "Already got node " << nodeID << ", refusing connect"
    1702           0 :                    << std::endl;
    1703             : 
    1704             :             // refuse connection
    1705           0 :             OCommand(Connections(1, connection), cmd) << NodeID() << requestID;
    1706             : 
    1707             :             // NOTE: There is no close() here. The reply command above has to be
    1708             :             // received by the peer first, before closing the connection.
    1709           0 :             _removeConnection(connection);
    1710           0 :             return true;
    1711             :         }
    1712             :     }
    1713             : 
    1714             :     // create and add connected node
    1715          33 :     if (!peer)
    1716          33 :         peer = createNode(nodeType);
    1717          33 :     if (!peer)
    1718             :     {
    1719           0 :         LBDEBUG << "Can't create node of type " << nodeType << ", disconnecting"
    1720           0 :                 << std::endl;
    1721             : 
    1722             :         // refuse connection
    1723           0 :         OCommand(Connections(1, connection), cmd) << NodeID() << requestID;
    1724             : 
    1725             :         // NOTE: There is no close() here. The reply command above has to be
    1726             :         // received by the peer first, before closing the connection.
    1727           0 :         _removeConnection(connection);
    1728           0 :         return true;
    1729             :     }
    1730             : 
    1731          33 :     if (!peer->deserialize(data))
    1732           0 :         LBWARN << "Error during node initialization" << std::endl;
    1733          33 :     LBASSERTINFO(data.empty(), data);
    1734          33 :     LBASSERTINFO(peer->getNodeID() == nodeID, peer->getNodeID() << "!="
    1735             :                                                                 << nodeID);
    1736          33 :     LBASSERT(peer->getType() == nodeType);
    1737             : 
    1738          33 :     _impl->connectionNodes[connection] = peer;
    1739             :     {
    1740          66 :         lunchbox::ScopedFastWrite mutex(_impl->nodes);
    1741          33 :         _impl->nodes.data[peer->getNodeID()] = peer;
    1742             :     }
    1743          33 :     LBVERB << "Added node " << nodeID << std::endl;
    1744             : 
    1745             :     // send our information as reply
    1746          66 :     OCommand(Connections(1, connection), cmd) << getNodeID() << requestID
    1747          99 :                                               << getType() << serialize();
    1748             : 
    1749          33 :     return true;
    1750             : }
    1751             : 
    1752          33 : bool LocalNode::_cmdConnectReply(ICommand& command)
    1753             : {
    1754          33 :     LBASSERT(!command.getRemoteNode());
    1755          33 :     LBASSERT(_impl->inReceiverThread());
    1756             : 
    1757          66 :     ConnectionPtr connection = _impl->incoming.getConnection();
    1758          33 :     LBASSERT(_impl->connectionNodes.find(connection) ==
    1759             :              _impl->connectionNodes.end());
    1760             : 
    1761          33 :     const NodeID& nodeID = command.get<NodeID>();
    1762          33 :     const uint32_t requestID = command.get<uint32_t>();
    1763             : 
    1764             :     // connection refused
    1765          33 :     if (nodeID == 0)
    1766             :     {
    1767           0 :         LBINFO << "Connection refused, node already connected by peer"
    1768           0 :                << std::endl;
    1769             : 
    1770           0 :         _removeConnection(connection);
    1771           0 :         serveRequest(requestID, false);
    1772           0 :         return true;
    1773             :     }
    1774             : 
    1775          33 :     const uint32_t nodeType = command.get<uint32_t>();
    1776          66 :     std::string data = command.get<std::string>();
    1777             : 
    1778          33 :     LBVERB << "handle connect reply " << command << " req " << requestID
    1779          33 :            << " type " << nodeType << " data " << data << std::endl;
    1780             : 
    1781             :     // No locking needed, only recv thread modifies
    1782          33 :     NodeHash::const_iterator i = _impl->nodes->find(nodeID);
    1783          66 :     NodePtr peer;
    1784          33 :     if (i != _impl->nodes->end())
    1785           0 :         peer = i->second;
    1786             : 
    1787          33 :     if (peer && peer->isReachable()) // simultaneous connect
    1788             :     {
    1789           0 :         LBINFO << "Closing simultaneous connection from " << peer << " on "
    1790           0 :                << connection << std::endl;
    1791             : 
    1792           0 :         _removeConnection(connection);
    1793           0 :         _closeNode(peer);
    1794           0 :         serveRequest(requestID, false);
    1795           0 :         return true;
    1796             :     }
    1797             : 
    1798             :     // create and add node
    1799          33 :     if (!peer)
    1800             :     {
    1801          33 :         if (requestID != LB_UNDEFINED_UINT32)
    1802          33 :             peer = reinterpret_cast<Node*>(getRequestData(requestID));
    1803             :         else
    1804           0 :             peer = createNode(nodeType);
    1805             :     }
    1806          33 :     if (!peer)
    1807             :     {
    1808           0 :         LBINFO << "Can't create node of type " << nodeType << ", disconnecting"
    1809           0 :                << std::endl;
    1810           0 :         _removeConnection(connection);
    1811           0 :         return true;
    1812             :     }
    1813             : 
    1814          33 :     LBASSERTINFO(peer->getType() == nodeType, peer->getType() << " != "
    1815             :                                                               << nodeType);
    1816          33 :     LBASSERT(peer->isClosed());
    1817             : 
    1818          33 :     if (!peer->deserialize(data))
    1819           0 :         LBWARN << "Error during node initialization" << std::endl;
    1820          33 :     LBASSERT(data.empty());
    1821          33 :     LBASSERT(peer->getNodeID() == nodeID);
    1822             : 
    1823             :     // send ACK to peer
    1824             :     // cppcheck-suppress unusedScopedObject
    1825          33 :     OCommand(Connections(1, connection), CMD_NODE_CONNECT_ACK);
    1826             : 
    1827          33 :     peer->_connect(connection);
    1828          33 :     _impl->connectionNodes[connection] = peer;
    1829             :     {
    1830          66 :         lunchbox::ScopedFastWrite mutex(_impl->nodes);
    1831          33 :         _impl->nodes.data[peer->getNodeID()] = peer;
    1832             :     }
    1833          33 :     _connectMulticast(peer);
    1834          33 :     LBVERB << "Added node " << nodeID << std::endl;
    1835             : 
    1836          33 :     serveRequest(requestID, true);
    1837             : 
    1838          33 :     notifyConnect(peer);
    1839          33 :     return true;
    1840             : }
    1841             : 
    1842          33 : bool LocalNode::_cmdConnectAck(ICommand& command)
    1843             : {
    1844          66 :     NodePtr node = command.getRemoteNode();
    1845          33 :     LBASSERT(node);
    1846          33 :     LBASSERT(_impl->inReceiverThread());
    1847          33 :     LBVERB << "handle connect ack" << std::endl;
    1848             : 
    1849          33 :     node->_connect(_impl->incoming.getConnection());
    1850          33 :     _connectMulticast(node);
    1851          33 :     notifyConnect(node);
    1852          66 :     return true;
    1853             : }
    1854             : 
    1855           0 : bool LocalNode::_cmdID(ICommand& command)
    1856             : {
    1857           0 :     LBASSERT(_impl->inReceiverThread());
    1858             : 
    1859           0 :     const NodeID& nodeID = command.get<NodeID>();
    1860           0 :     uint32_t nodeType = command.get<uint32_t>();
    1861           0 :     std::string data = command.get<std::string>();
    1862             : 
    1863           0 :     if (command.getRemoteNode())
    1864             :     {
    1865           0 :         LBASSERT(nodeID == command.getRemoteNode()->getNodeID());
    1866           0 :         LBASSERT(command.getRemoteNode()->_getMulticast());
    1867           0 :         return true;
    1868             :     }
    1869             : 
    1870           0 :     LBDEBUG << "handle ID " << command << " node " << nodeID << std::endl;
    1871             : 
    1872           0 :     ConnectionPtr connection = _impl->incoming.getConnection();
    1873           0 :     LBASSERT(connection->isMulticast());
    1874           0 :     LBASSERT(_impl->connectionNodes.find(connection) ==
    1875             :              _impl->connectionNodes.end());
    1876             : 
    1877           0 :     NodePtr node;
    1878           0 :     if (nodeID == getNodeID()) // 'self' multicast connection
    1879           0 :         node = this;
    1880             :     else
    1881             :     {
    1882             :         // No locking needed, only recv thread writes
    1883           0 :         NodeHash::const_iterator i = _impl->nodes->find(nodeID);
    1884             : 
    1885           0 :         if (i == _impl->nodes->end())
    1886             :         {
    1887             :             // unknown node: create and add unconnected node
    1888           0 :             node = createNode(nodeType);
    1889             : 
    1890           0 :             if (!node->deserialize(data))
    1891           0 :                 LBWARN << "Error during node initialization" << std::endl;
    1892           0 :             LBASSERTINFO(data.empty(), data);
    1893             : 
    1894             :             {
    1895           0 :                 lunchbox::ScopedFastWrite mutex(_impl->nodes);
    1896           0 :                 _impl->nodes.data[nodeID] = node;
    1897             :             }
    1898           0 :             LBVERB << "Added node " << nodeID << " with multicast "
    1899           0 :                    << connection << std::endl;
    1900             :         }
    1901             :         else
    1902           0 :             node = i->second;
    1903             :     }
    1904           0 :     LBASSERT(node);
    1905           0 :     LBASSERTINFO(node->getNodeID() == nodeID, node->getNodeID() << "!="
    1906             :                                                                 << nodeID);
    1907             : 
    1908           0 :     _connectMulticast(node, connection);
    1909           0 :     _impl->connectionNodes[connection] = node;
    1910           0 :     LBDEBUG << "Added multicast connection " << connection << " from " << nodeID
    1911           0 :             << " to " << getNodeID() << std::endl;
    1912           0 :     return true;
    1913             : }
    1914             : 
    1915           7 : bool LocalNode::_cmdDisconnect(ICommand& command)
    1916             : {
    1917           7 :     LBASSERT(_impl->inReceiverThread());
    1918             : 
    1919           7 :     const uint32_t requestID = command.get<uint32_t>();
    1920             : 
    1921          14 :     NodePtr node = static_cast<Node*>(getRequestData(requestID));
    1922           7 :     LBASSERT(node);
    1923             : 
    1924           7 :     _closeNode(node);
    1925           7 :     LBASSERT(node->isClosed());
    1926           7 :     serveRequest(requestID);
    1927          14 :     return true;
    1928             : }
    1929             : 
    1930           0 : bool LocalNode::_cmdGetNodeData(ICommand& command)
    1931             : {
    1932           0 :     const NodeID& nodeID = command.get<NodeID>();
    1933           0 :     const uint32_t requestID = command.get<uint32_t>();
    1934             : 
    1935           0 :     LBVERB << "cmd get node data: " << command << " req " << requestID
    1936           0 :            << " nodeID " << nodeID << std::endl;
    1937             : 
    1938           0 :     NodePtr node = getNode(nodeID);
    1939           0 :     NodePtr toNode = command.getRemoteNode();
    1940             : 
    1941           0 :     uint32_t nodeType = NODETYPE_INVALID;
    1942           0 :     std::string nodeData;
    1943           0 :     if (node)
    1944             :     {
    1945           0 :         nodeType = node->getType();
    1946           0 :         nodeData = node->serialize();
    1947           0 :         LBDEBUG << "Sent node data '" << nodeData << "' for " << nodeID
    1948           0 :                 << " to " << toNode << std::endl;
    1949             :     }
    1950             : 
    1951           0 :     toNode->send(CMD_NODE_GET_NODE_DATA_REPLY) << nodeID << requestID
    1952           0 :                                                << nodeType << nodeData;
    1953           0 :     return true;
    1954             : }
    1955             : 
    1956           0 : bool LocalNode::_cmdGetNodeDataReply(ICommand& command)
    1957             : {
    1958           0 :     LBASSERT(_impl->inReceiverThread());
    1959             : 
    1960           0 :     const NodeID& nodeID = command.get<NodeID>();
    1961           0 :     const uint32_t requestID = command.get<uint32_t>();
    1962           0 :     const uint32_t nodeType = command.get<uint32_t>();
    1963           0 :     std::string nodeData = command.get<std::string>();
    1964             : 
    1965           0 :     LBVERB << "cmd get node data reply: " << command << " req " << requestID
    1966           0 :            << " type " << nodeType << " data " << nodeData << std::endl;
    1967             : 
    1968             :     // No locking needed, only recv thread writes
    1969           0 :     NodeHash::const_iterator i = _impl->nodes->find(nodeID);
    1970           0 :     if (i != _impl->nodes->end())
    1971             :     {
    1972             :         // Requested node connected to us in the meantime
    1973           0 :         NodePtr node = i->second;
    1974             : 
    1975           0 :         node->ref(this);
    1976           0 :         serveRequest(requestID, node.get());
    1977           0 :         return true;
    1978             :     }
    1979             : 
    1980           0 :     if (nodeType == NODETYPE_INVALID)
    1981             :     {
    1982           0 :         serveRequest(requestID, (void*)0);
    1983           0 :         return true;
    1984             :     }
    1985             : 
    1986             :     // new node: create and add unconnected node
    1987           0 :     NodePtr node = createNode(nodeType);
    1988           0 :     if (node)
    1989             :     {
    1990           0 :         LBASSERT(node);
    1991             : 
    1992           0 :         if (!node->deserialize(nodeData))
    1993           0 :             LBWARN << "Failed to initialize node data" << std::endl;
    1994           0 :         LBASSERT(nodeData.empty());
    1995           0 :         node->ref(this);
    1996             :     }
    1997             :     else
    1998           0 :         LBINFO << "Can't create node of type " << nodeType << std::endl;
    1999             : 
    2000           0 :     serveRequest(requestID, node.get());
    2001           0 :     return true;
    2002             : }
    2003             : 
    2004           0 : bool LocalNode::_cmdAcquireSendToken(ICommand& command)
    2005             : {
    2006           0 :     LBASSERT(inCommandThread());
    2007           0 :     if (!_impl->sendToken) // enqueue command if no token available
    2008             :     {
    2009           0 :         const uint32_t timeout = Global::getTimeout();
    2010           0 :         if (timeout == LB_TIMEOUT_INDEFINITE ||
    2011           0 :             (getTime64() - _impl->lastSendToken <= timeout))
    2012             :         {
    2013           0 :             _impl->sendTokenQueue.push_back(command);
    2014           0 :             return true;
    2015             :         }
    2016             : 
    2017             :         // timeout! - clear old requests
    2018           0 :         _impl->sendTokenQueue.clear();
    2019             :         // 'generate' new token - release is robust
    2020             :     }
    2021             : 
    2022           0 :     _impl->sendToken = false;
    2023             : 
    2024           0 :     const uint32_t requestID = command.get<uint32_t>();
    2025           0 :     command.getRemoteNode()->send(CMD_NODE_ACQUIRE_SEND_TOKEN_REPLY)
    2026           0 :         << requestID;
    2027           0 :     return true;
    2028             : }
    2029             : 
    2030           0 : bool LocalNode::_cmdAcquireSendTokenReply(ICommand& command)
    2031             : {
    2032           0 :     const uint32_t requestID = command.get<uint32_t>();
    2033           0 :     serveRequest(requestID);
    2034           0 :     return true;
    2035             : }
    2036             : 
    2037           0 : bool LocalNode::_cmdReleaseSendToken(ICommand&)
    2038             : {
    2039           0 :     LBASSERT(inCommandThread());
    2040           0 :     _impl->lastSendToken = getTime64();
    2041             : 
    2042           0 :     if (_impl->sendToken)
    2043           0 :         return true; // double release due to timeout
    2044           0 :     if (_impl->sendTokenQueue.empty())
    2045             :     {
    2046           0 :         _impl->sendToken = true;
    2047           0 :         return true;
    2048             :     }
    2049             : 
    2050           0 :     ICommand& request = _impl->sendTokenQueue.front();
    2051             : 
    2052           0 :     const uint32_t requestID = request.get<uint32_t>();
    2053           0 :     request.getRemoteNode()->send(CMD_NODE_ACQUIRE_SEND_TOKEN_REPLY)
    2054           0 :         << requestID;
    2055           0 :     _impl->sendTokenQueue.pop_front();
    2056           0 :     return true;
    2057             : }
    2058             : 
    2059           0 : bool LocalNode::_cmdAddListener(ICommand& command)
    2060             : {
    2061             :     Connection* rawConnection =
    2062           0 :         reinterpret_cast<Connection*>(command.get<uint64_t>());
    2063           0 :     std::string data = command.get<std::string>();
    2064             : 
    2065           0 :     ConnectionDescriptionPtr description = new ConnectionDescription(data);
    2066           0 :     command.getRemoteNode()->_addConnectionDescription(description);
    2067             : 
    2068           0 :     if (command.getRemoteNode() != this)
    2069           0 :         return true;
    2070             : 
    2071           0 :     ConnectionPtr connection = rawConnection;
    2072           0 :     connection->unref();
    2073           0 :     LBASSERT(connection);
    2074             : 
    2075           0 :     _impl->connectionNodes[connection] = this;
    2076           0 :     if (connection->isMulticast())
    2077           0 :         _addMulticast(this, connection);
    2078             : 
    2079           0 :     connection->acceptNB();
    2080           0 :     _impl->incoming.addConnection(connection);
    2081             : 
    2082           0 :     _initService(); // update zeroconf
    2083           0 :     return true;
    2084             : }
    2085             : 
    2086           0 : bool LocalNode::_cmdRemoveListener(ICommand& command)
    2087             : {
    2088           0 :     const uint32_t requestID = command.get<uint32_t>();
    2089           0 :     Connection* rawConnection = command.get<Connection*>();
    2090           0 :     std::string data = command.get<std::string>();
    2091             : 
    2092           0 :     ConnectionDescriptionPtr description = new ConnectionDescription(data);
    2093           0 :     LBCHECK(command.getRemoteNode()->_removeConnectionDescription(description));
    2094             : 
    2095           0 :     if (command.getRemoteNode() != this)
    2096           0 :         return true;
    2097             : 
    2098           0 :     _initService(); // update zeroconf
    2099             : 
    2100           0 :     ConnectionPtr connection = rawConnection;
    2101           0 :     connection->unref(this);
    2102           0 :     LBASSERT(connection);
    2103             : 
    2104           0 :     if (connection->isMulticast())
    2105           0 :         _removeMulticast(connection);
    2106             : 
    2107           0 :     _impl->incoming.removeConnection(connection);
    2108           0 :     LBASSERT(_impl->connectionNodes.find(connection) !=
    2109             :              _impl->connectionNodes.end());
    2110           0 :     _impl->connectionNodes.erase(connection);
    2111           0 :     serveRequest(requestID);
    2112           0 :     return true;
    2113             : }
    2114             : 
    2115           0 : bool LocalNode::_cmdPing(ICommand& command)
    2116             : {
    2117           0 :     LBASSERT(inCommandThread());
    2118           0 :     command.getRemoteNode()->send(CMD_NODE_PING_REPLY);
    2119           0 :     return true;
    2120             : }
    2121             : 
    2122           2 : bool LocalNode::_cmdCommand(ICommand& command)
    2123             : {
    2124           2 :     const uint128_t& commandID = command.get<uint128_t>();
    2125           4 :     CommandHandler func;
    2126             :     {
    2127           3 :         lunchbox::ScopedFastRead mutex(_impl->commandHandlers);
    2128           2 :         CommandHashCIter i = _impl->commandHandlers->find(commandID);
    2129           2 :         if (i == _impl->commandHandlers->end())
    2130           0 :             return false;
    2131             : 
    2132           2 :         CommandQueue* queue = i->second.second;
    2133           2 :         if (queue)
    2134             :         {
    2135           2 :             command.setDispatchFunction(
    2136           3 :                 CmdFunc(this, &LocalNode::_cmdCommandAsync));
    2137           1 :             queue->push(command);
    2138           1 :             return true;
    2139             :         }
    2140             :         // else
    2141             : 
    2142           1 :         func = i->second.first;
    2143             :     }
    2144             : 
    2145           2 :     CustomICommand customCmd(command);
    2146           1 :     return func(customCmd);
    2147             : }
    2148             : 
    2149           1 : bool LocalNode::_cmdCommandAsync(ICommand& command)
    2150             : {
    2151           1 :     const uint128_t& commandID = command.get<uint128_t>();
    2152           2 :     CommandHandler func;
    2153             :     {
    2154           2 :         lunchbox::ScopedFastRead mutex(_impl->commandHandlers);
    2155           1 :         CommandHashCIter i = _impl->commandHandlers->find(commandID);
    2156           1 :         LBASSERT(i != _impl->commandHandlers->end());
    2157           1 :         if (i == _impl->commandHandlers->end())
    2158           0 :             return true; // deregistered between dispatch and now
    2159           1 :         func = i->second.first;
    2160             :     }
    2161           2 :     CustomICommand customCmd(command);
    2162           1 :     return func(customCmd);
    2163             : }
    2164             : 
    2165          33 : bool LocalNode::_cmdAddConnection(ICommand& command)
    2166             : {
    2167          33 :     LBASSERT(_impl->inReceiverThread());
    2168             : 
    2169          66 :     ConnectionPtr connection = command.get<ConnectionPtr>();
    2170          33 :     _addConnection(connection);
    2171          33 :     connection->unref(); // ref'd by _addConnection
    2172          66 :     return true;
    2173             : }
    2174          63 : }

Generated by: LCOV version 1.11