LCOV - code coverage report
Current view: top level - co - localNode.cpp (source / functions) Hit Total Coverage
Test: Collage Lines: 584 1166 50.1 %
Date: 2016-07-14 01:20:24 Functions: 75 110 68.2 %

          Line data    Source code
       1             : 
       2             : /* Copyright (c) 2005-2016, Stefan Eilemann <eile@equalizergraphics.com>
       3             :  *                          Cedric Stalder <cedric.stalder@gmail.com>
       4             :  *                          Daniel Nachbaur <danielnachbaur@gmail.com>
       5             :  *
       6             :  * This file is part of Collage <https://github.com/Eyescale/Collage>
       7             :  *
       8             :  * This library is free software; you can redistribute it and/or modify it under
       9             :  * the terms of the GNU Lesser General Public License version 2.1 as published
      10             :  * by the Free Software Foundation.
      11             :  *
      12             :  * This library is distributed in the hope that it will be useful, but WITHOUT
      13             :  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
      14             :  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
      15             :  * details.
      16             :  *
      17             :  * You should have received a copy of the GNU Lesser General Public License
      18             :  * along with this library; if not, write to the Free Software Foundation, Inc.,
      19             :  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
      20             :  */
      21             : 
      22             : #include "localNode.h"
      23             : 
      24             : #include "buffer.h"
      25             : #include "bufferCache.h"
      26             : #include "commandQueue.h"
      27             : #include "connectionDescription.h"
      28             : #include "connectionSet.h"
      29             : #include "customICommand.h"
      30             : #include "dataIStream.h"
      31             : #include "exception.h"
      32             : #include "global.h"
      33             : #include "iCommand.h"
      34             : #include "nodeCommand.h"
      35             : #include "oCommand.h"
      36             : #include "object.h"
      37             : #include "objectICommand.h"
      38             : #include "objectStore.h"
      39             : #include "pipeConnection.h"
      40             : #include "sendToken.h"
      41             : #include "worker.h"
      42             : #include "zeroconf.h"
      43             : 
      44             : #include <lunchbox/clock.h>
      45             : #include <lunchbox/futureFunction.h>
      46             : #include <lunchbox/hash.h>
      47             : #include <lunchbox/launcher.h>
      48             : #include <lunchbox/lockable.h>
      49             : #include <lunchbox/request.h>
      50             : #include <lunchbox/rng.h>
      51             : #include <lunchbox/servus.h>
      52             : #include <lunchbox/sleep.h>
      53             : 
      54             : #include <boost/bind.hpp>
      55             : #include <boost/date_time/posix_time/posix_time.hpp>
      56             : #include <boost/lexical_cast.hpp>
      57             : 
      58             : #include <list>
      59             : #ifdef _MSC_VER
      60             : #  include <direct.h>  // for chdir
      61             : #  define chdir _chdir
      62             : #endif
      63             : 
      64             : namespace co
      65             : {
      /** File-local state and container shorthands used by the classes in this file. */
      66             : namespace
      67             : {
      68          22 : lunchbox::a_int32_t _threadIDs; // counter read and incremented by ReceiverThread::init() to number threads
      69             : 
      70             : typedef CommandFunc< LocalNode > CmdFunc; // command handler built from a LocalNode and one of its member functions
      71             : typedef std::list< ICommand > CommandList; // type of detail::LocalNode::pendingCommands
      72             : typedef lunchbox::RefPtrHash< Connection, NodePtr > ConnectionNodeHash; // connection -> node
      73             : typedef ConnectionNodeHash::const_iterator ConnectionNodeHashCIter;
      74             : typedef ConnectionNodeHash::iterator ConnectionNodeHashIter;
      75             : typedef stde::hash_map< uint128_t, NodePtr > NodeHash; // 128 bit key (node identifier in _connectSelf()) -> node
      76             : typedef NodeHash::const_iterator NodeHashCIter;
      77             : typedef stde::hash_map< uint128_t, LocalNode::PushHandler > HandlerHash; // 128 bit key -> push handler
      78             : typedef HandlerHash::const_iterator HandlerHashCIter;
      79             : typedef std::pair< LocalNode::CommandHandler, CommandQueue* > CommandPair; // handler and the queue it was registered with
      80             : typedef stde::hash_map< uint128_t, CommandPair > CommandHash; // 128 bit key -> custom command handler pair
      81             : typedef CommandHash::const_iterator CommandHashCIter;
      82             : typedef lunchbox::FutureFunction< bool > FuturebImpl;
      83             : }
      84             : 
      85             : namespace detail
      86             : {
      /**
       * Thread object which runs the receiver loop of a co::LocalNode.
       *
       * init() takes the next value of the file-local _threadIDs counter, names
       * the thread "Rcv<id>" and hands the same id to
       * LocalNode::_startCommandThread(), whose result decides whether init()
       * succeeds. run() only forwards to LocalNode::_runReceiverThread().
       *
       * The node pointer is stored as-is; this class never deletes it.
       */
      87         106 : class ReceiverThread : public lunchbox::Thread
      88             : {
      89             : public:
      90          55 :     explicit ReceiverThread( co::LocalNode* localNode )
      91          55 :         : _localNode( localNode ) {}
      92             : 
      93          50 :     bool init() override
      94             :     {
      95          50 :         const int32_t threadID = ++_threadIDs - 1; // value the counter had before this increment
      96         100 :         setName( std::string( "Rcv" ) +
      97          50 :                  boost::lexical_cast< std::string >( threadID ));
      98          50 :         return _localNode->_startCommandThread( threadID );
      99             :     }
     100             : 
     101          50 :     void run() override { _localNode->_runReceiverThread(); }
     102             : 
     103             : private:
     104             :     co::LocalNode* const _localNode; // node whose receiver loop is run; not deleted here
     105             : };
     106             : 
     /**
      * Worker-derived thread object used by detail::LocalNode as command thread.
      *
      * The Worker base is constructed with Global::getCommandQueueLimit().
      * init() names the thread "Cmd<threadID>", stopRunning() reports the
      * node's isClosed() state and notifyIdle() forwards to
      * LocalNode::_notifyCommandThreadIdle().
      *
      * The node pointer is stored as-is; this class never deletes it.
      */
     107         106 : class CommandThread : public Worker
     108             : {
     109             : public:
     110          55 :     explicit CommandThread( co::LocalNode* localNode )
     111             :         : Worker( Global::getCommandQueueLimit( ))
     112             :         , threadID( 0 )
     113          55 :         , _localNode( localNode )
     114          55 :     {}
     115             : 
     116             :     int32_t threadID; // public and writable; read by init(). NOTE(review): presumably set by LocalNode::_startCommandThread() - confirm
     117             : 
     118             : protected:
     119          50 :     bool init() override
     120             :     {
     121         100 :         setName( std::string( "Cmd" ) +
     122          50 :                  boost::lexical_cast< std::string >( threadID ));
     123          50 :         return true; // never fails
     124             :     }
     125             : 
     126      103339 :     bool stopRunning() override { return _localNode->isClosed(); }
     127       51608 :     bool notifyIdle() override { return _localNode->_notifyCommandThreadIdle();}
     128             : 
     129             : private:
     130             :     co::LocalNode* const _localNode; // node queried by stopRunning()/notifyIdle(); not deleted here
     131             : };
     132             : 
     /**
      * Private implementation data of co::LocalNode (held there as _impl).
      *
      * The constructor only sets initial values: objectStore, receiverThread
      * and commandThread start as null pointers and are assigned by the
      * co::LocalNode constructor. The destructor deletes these three objects
      * after asserting that the containers are empty and both threads have
      * stopped.
      *
      * NOTE(review): counters is not part of the initializer list; its initial
      * values depend on a_ssize_t's default construction - confirm.
      */
     133             : class LocalNode
     134             : {
     135             : public:
     136          55 :     LocalNode()
     137             :         : smallBuffers( 200 )
     138             :         , bigBuffers( 20 )
     139             :         , sendToken( true )
     140             :         , lastSendToken( 0 )
     141             :         , objectStore( 0 )
     142             :         , receiverThread( 0 )
     143             :         , commandThread( 0 )
     144          55 :         , service( "_collage._tcp" )
     145          55 :     {}
     146             : 
     147          53 :     ~LocalNode()
     148          53 :     {
     149          53 :         LBASSERT( incoming.isEmpty( ));
     150          53 :         LBASSERT( connectionNodes.empty( ));
     151          53 :         LBASSERT( pendingCommands.empty( ));
     152          53 :         LBASSERT( nodes->empty( ));
     153             : 
     154          53 :         delete objectStore;
     155          53 :         objectStore = 0;
     156          53 :         LBASSERT( !commandThread->isRunning( ));
     157          53 :         delete commandThread;
     158          53 :         commandThread = 0;
     159             : 
     160          53 :         LBASSERT( !receiverThread->isRunning( ));
     161          53 :         delete receiverThread;
     162          53 :         receiverThread = 0;
     163          53 :     }
     164             : 
     /** @return the result of receiverThread->isCurrent(). */
     165         280 :     bool inReceiverThread() const { return receiverThread->isCurrent(); }
     166             : 
     167             :     /** Commands re-scheduled for dispatch. */
     168             :     CommandList pendingCommands;
     169             : 
     170             :     /** The command buffer 'allocator' for small packets */
     171             :     co::BufferCache smallBuffers;
     172             : 
     173             :     /** The command buffer 'allocator' for big packets */
     174             :     co::BufferCache bigBuffers;
     175             : 
     176             :     bool sendToken; //!< send token availability.
     177             :     uint64_t lastSendToken; //!< last used time for timeout detection
     178             :     std::deque< co::ICommand > sendTokenQueue; //!< pending requests
     179             : 
     180             :     /** Manager of distributed object */
     181             :     ObjectStore* objectStore;
     182             : 
     183             :     /** Needed for thread-safety during nodeID-based connect() */
     184             :     lunchbox::Lock connectLock;
     185             : 
     186             :     /** The node for each connection. */
     187             :     ConnectionNodeHash connectionNodes; // read and write: recv only
     188             : 
     189             :     /** The connected nodes. */
     190             :     lunchbox::Lockable< NodeHash, lunchbox::SpinLock > nodes; // r: all, w: recv
     191             : 
     192             :     /** The connection set of all connections from/to this node. */
     193             :     co::ConnectionSet incoming;
     194             : 
     195             :     /** The process-global clock. */
     196             :     lunchbox::Clock clock;
     197             : 
     198             :     /** The registered push handlers. */
     199             :     lunchbox::Lockable< HandlerHash, lunchbox::Lock > pushHandlers;
     200             : 
     201             :     /** The registered custom command handlers. */
     202             :     lunchbox::Lockable< CommandHash, lunchbox::SpinLock > commandHandlers;
     203             : 
     204             :     ReceiverThread* receiverThread; // deleted in the destructor
     205             :     CommandThread* commandThread; // deleted in the destructor
     206             : 
     207             :     lunchbox::Lockable< servus::Servus > service; // constructed with the name "_collage._tcp"
     208             : 
     209             :     // Performance counters:
     210             :     a_ssize_t counters[ co::LocalNode::COUNTER_ALL ];
     211             : };
     212             : }
     213             : 
     /**
      * Constructs the node and registers the handlers of the built-in node
      * commands.
      *
      * Allocates the private implementation, the receiver and command thread
      * objects (neither is started here) and the object store, which receives
      * the implementation's counters array.
      *
      * Each registerCommand() call binds a command id to a member function.
      * The last argument is either the command thread queue obtained from
      * getCommandThreadQueue() or 0.
      * NOTE(review): a queue of 0 presumably means that the handler is invoked
      * directly by the dispatching thread - confirm in registerCommand().
      *
      * @param type the node type, forwarded to the Node base class.
      */
     214          55 : LocalNode::LocalNode( const uint32_t type )
     215             :     : Node( type )
     216          55 :     , _impl( new detail::LocalNode )
     217             : {
     218          55 :     _impl->receiverThread = new detail::ReceiverThread( this );
     219          55 :     _impl->commandThread  = new detail::CommandThread( this );
     220          55 :     _impl->objectStore = new ObjectStore( this, _impl->counters );
     221             : 
     222          55 :     CommandQueue* queue = getCommandThreadQueue();
     // The *_BE command ids below share the handler of their plain counterpart.
     223             :     registerCommand( CMD_NODE_CONNECT,
     224          55 :                      CmdFunc( this, &LocalNode::_cmdConnect ), 0 );
     225             :     registerCommand( CMD_NODE_CONNECT_BE,
     226          55 :                      CmdFunc( this, &LocalNode::_cmdConnect ), 0 );
     227             :     registerCommand( CMD_NODE_CONNECT_REPLY,
     228          55 :                      CmdFunc( this, &LocalNode::_cmdConnectReply ), 0 );
     229             :     registerCommand( CMD_NODE_CONNECT_REPLY_BE,
     230          55 :                      CmdFunc( this, &LocalNode::_cmdConnectReply ), 0 );
     231             :     registerCommand( CMD_NODE_ID,
     232          55 :                      CmdFunc( this, &LocalNode::_cmdID ), 0 );
     233             :     registerCommand( CMD_NODE_ID_BE,
     234          55 :                      CmdFunc( this, &LocalNode::_cmdID ), 0 );
     235             :     registerCommand( CMD_NODE_ACK_REQUEST,
     236          55 :                      CmdFunc( this, &LocalNode::_cmdAckRequest ), 0 );
     237             :     registerCommand( CMD_NODE_STOP_RCV,
     238          55 :                      CmdFunc( this, &LocalNode::_cmdStopRcv ), 0 );
     239             :     registerCommand( CMD_NODE_STOP_CMD,
     240          55 :                      CmdFunc( this, &LocalNode::_cmdStopCmd ), queue );
     // Both affinity commands use the same handler, once without and once with the queue.
     241             :     registerCommand( CMD_NODE_SET_AFFINITY_RCV,
     242          55 :                      CmdFunc( this, &LocalNode::_cmdSetAffinity ), 0 );
     243             :     registerCommand( CMD_NODE_SET_AFFINITY_CMD,
     244          55 :                      CmdFunc( this, &LocalNode::_cmdSetAffinity ), queue );
     245             :     registerCommand( CMD_NODE_CONNECT_ACK,
     246          55 :                      CmdFunc( this, &LocalNode::_cmdConnectAck ), 0 );
     247             :     registerCommand( CMD_NODE_DISCONNECT,
     248          55 :                      CmdFunc( this, &LocalNode::_cmdDisconnect ), 0 );
     249             :     registerCommand( CMD_NODE_GET_NODE_DATA,
     250          55 :                      CmdFunc( this, &LocalNode::_cmdGetNodeData ), queue );
     251             :     registerCommand( CMD_NODE_GET_NODE_DATA_REPLY,
     252          55 :                      CmdFunc( this, &LocalNode::_cmdGetNodeDataReply ), 0 );
     253             :     registerCommand( CMD_NODE_ACQUIRE_SEND_TOKEN,
     254          55 :                      CmdFunc( this, &LocalNode::_cmdAcquireSendToken ), queue );
     255             :     registerCommand( CMD_NODE_ACQUIRE_SEND_TOKEN_REPLY,
     256          55 :                      CmdFunc( this, &LocalNode::_cmdAcquireSendTokenReply ), 0);
     257             :     registerCommand( CMD_NODE_RELEASE_SEND_TOKEN,
     258          55 :                      CmdFunc( this, &LocalNode::_cmdReleaseSendToken ), queue );
     259             :     registerCommand( CMD_NODE_ADD_LISTENER,
     260          55 :                      CmdFunc( this, &LocalNode::_cmdAddListener ), 0 );
     261             :     registerCommand( CMD_NODE_REMOVE_LISTENER,
     262          55 :                      CmdFunc( this, &LocalNode::_cmdRemoveListener ), 0 );
     263             :     registerCommand( CMD_NODE_PING,
     264          55 :                      CmdFunc( this, &LocalNode::_cmdPing ), queue );
     // Ping replies are handed to _cmdDiscard.
     265             :     registerCommand( CMD_NODE_PING_REPLY,
     266          55 :                      CmdFunc( this, &LocalNode::_cmdDiscard ), 0 );
     267             :     registerCommand( CMD_NODE_COMMAND,
     268          55 :                      CmdFunc( this, &LocalNode::_cmdCommand ), 0 );
     269             :     registerCommand( CMD_NODE_ADD_CONNECTION,
     270          55 :                      CmdFunc( this, &LocalNode::_cmdAddConnection ), 0 );
     271          55 : }
     272             : 
     /**
      * Destroys the node: asserts that no requests are pending and deletes the
      * private implementation.
      */
     273         148 : LocalNode::~LocalNode( )
     274             : {
     275          53 :     LBASSERT( !hasPendingRequests( ));
     276          53 :     delete _impl;
     277          95 : }
     278             : 
     /**
      * Parses the node's command line options, starts listening and, if launch
      * data was given, sets up the peer.
      *
      * Recognized options; each one uses the following argument as its value,
      * which must exist and must not start with '-':
      *  - --co-listen, --eq-listen (deprecated, logs a warning): parsed into a
      *    ConnectionDescription whose port is preset to Global::getDefaultPort()
      *    and added through addConnectionDescription().
      *  - --co-globals: handed to Global::fromString().
      *  - --co-launch: handed to deserialize() and, after listen() succeeded,
      *    to _setupPeer().
      * A missing or unparsable value of --co-listen and --co-globals is only
      * logged as a warning. Other arguments are ignored.
      *
      * @param argc the number of entries in argv.
      * @param argv the arguments; argv[0] is not inspected.
      * @return false if listen() fails; otherwise the result of _setupPeer()
      *         when --co-launch was given, else true.
      * @throw std::runtime_error (raised through LBTHROW) if --co-launch has no
      *        value or deserialize() rejects it.
      */
     279           1 : bool LocalNode::initLocal( const int argc, char** argv )
     280             : {
     281           1 :     std::string setupOpts; // stays empty unless --co-launch is given
     282             : 
     283             :     // We do not use getopt_long because it really does not work due to the
     284             :     // following aspects:
     285             :     // - reordering of arguments
     286             :     // - different behavior of GNU and BSD implementations
     287             :     // - incomplete man pages
     288           1 :     for( int i=1; i<argc; ++i )
     289             :     {
     290           0 :         if( std::string( "--eq-listen" ) == argv[i] )
     291           0 :             LBWARN << "Deprecated --eq-listen, use --co-listen" << std::endl;
     292           0 :         if( std::string( "--eq-listen" ) == argv[i] ||
     293           0 :             std::string( "--co-listen" ) == argv[i] )
     294             :         {
     295           0 :             if( (i+1)<argc && argv[i+1][0] != '-' )
     296             :             {
     297           0 :                 std::string data = argv[++i]; // consumes the option value
     298           0 :                 ConnectionDescriptionPtr desc = new ConnectionDescription;
     299           0 :                 desc->port = Global::getDefaultPort();
     300             : 
     301           0 :                 if( desc->fromString( data ))
     302             :                 {
     303           0 :                     addConnectionDescription( desc );
     304           0 :                     LBASSERTINFO( data.empty(), data ); // nothing is expected to remain in data after fromString()
     305             :                 }
     306             :                 else
     307           0 :                     LBWARN << "Ignoring listen option: " << argv[i] <<std::endl; // i already refers to the value
     308             :             }
     309             :             else
     310             :             {
     311           0 :                 LBWARN << "No argument given to --co-listen!" << std::endl;
     312             :             }
     313             :         }
     314           0 :         else if( std::string( "--co-globals" ) == argv[i] )
     315             :         {
     316           0 :             if( (i+1)<argc && argv[i+1][0] != '-' )
     317             :             {
     318           0 :                 const std::string data = argv[++i];
     319           0 :                 if( !Global::fromString( data ))
     320             :                 {
     321           0 :                     LBWARN << "Invalid global variables string: " << data
     322           0 :                            << ", using default global variables." << std::endl;
     323           0 :                 }
     324             :             }
     325             :             else
     326             :             {
     327           0 :                 LBWARN << "No argument given to --co-globals!" << std::endl;
     328             :             }
     329             :         }
     330           0 :         else if( std::string( "--co-launch" ) == argv[i] )
     331             :         {
     332           0 :             if( i == argc-1 || argv[i+1][0] == '-' )
     333           0 :                 LBTHROW( std::runtime_error( "Missing options to --co-launch"));
     334             : 
     335           0 :             setupOpts = argv[ ++i ];
     336           0 :             if( !deserialize( setupOpts ))
     337           0 :                 LBTHROW( std::runtime_error( "Corrupt --co-launch argument" ));
     338           0 :             LBASSERT( !setupOpts.empty( )); // _setupPeer() below is only reached with a non-empty string
     339             :         }
     340             :     }
     341             : 
     342           1 :     if( !listen( ))
     343             :     {
     344           0 :         LBWARN << "Can't setup listener(s) on " << *static_cast< Node* >( this )
     345           0 :                << std::endl;
     346           0 :         return false;
     347             :     }
     348             : 
     349           1 :     if( !setupOpts.empty( ))
     350           0 :         return _setupPeer( setupOpts );
     351           1 :     return true;
     352             : }
     353             : 
     /**
      * Opens a listening connection for each of the node's connection
      * descriptions and starts the receiver thread.
      *
      * The node first connects to itself through _connectSelf(). Each created
      * listener is registered for this node in the connection table, put into
      * non-blocking accept mode and added to the incoming connection set.
      * Multicast listeners are additionally passed to _addMulticast().
      *
      * NOTE(review): when a listener cannot be created the function returns
      * right away; the self connection and the listeners added before are not
      * removed in this function.
      *
      * @return false if the node is not in the closed state, if
      *         _connectSelf() fails or if a listener cannot be created or
      *         cannot listen; true otherwise.
      */
     354          50 : bool LocalNode::listen()
     355             : {
     356          50 :     LBVERB << "Listener data: " << serialize() << std::endl;
     357          50 :     if( !isClosed() || !_connectSelf( ))
     358           0 :         return false;
     359             : 
     360          50 :     const ConnectionDescriptions& descriptions = getConnectionDescriptions();
     361         291 :     for( ConnectionDescriptionsCIter i = descriptions.begin();
     362         194 :          i != descriptions.end(); ++i )
     363             :     {
     364          47 :         ConnectionDescriptionPtr description = *i;
     365          94 :         ConnectionPtr connection = Connection::create( description );
     366             : 
     367          47 :         if( !connection || !connection->listen( ))
     368             :         {
     369           0 :             LBWARN << "Can't create listener connection: " << description
     370           0 :                    << std::endl;
     371           0 :             return false;
     372             :         }
     373             : 
     374          47 :         _impl->connectionNodes[ connection ] = this; // listeners are mapped to this node itself
     375          47 :         if( connection->isMulticast( ))
     376           0 :             _addMulticast( this, connection );
     377             : 
     378          47 :         connection->acceptNB();
     379          47 :         _impl->incoming.addConnection( connection );
     380             : 
     381          94 :         LBVERB << "Added node " << getNodeID() << " using " << connection
     382         141 :                << std::endl;
     383          47 :     }
     384             : 
     385         100 :     LBVERB << lunchbox::className(this) << " start command and receiver thread "
     386         150 :            << std::endl;
     387             : 
     388          50 :     _setListening(); // state change happens before the receiver thread is started
     389          50 :     _impl->receiverThread->start();
     390             : 
     391          50 :     LBDEBUG << *this << std::endl;
     392          50 :     return true;
     393             : }
     394             : 
     /**
      * Stops a listening node.
      *
      * Sends CMD_NODE_STOP_RCV through this node's send(), waits for the
      * receiver thread to be joined and then calls _cleanup(). In builds
      * without NDEBUG the connections still contained in the incoming
      * connection set are logged.
      *
      * @return false if the node is not listening, true otherwise.
      */
     395          49 : bool LocalNode::close()
     396             : {
     397          49 :     if( !isListening() )
     398           0 :         return false;
     399             : 
     400          49 :     send( CMD_NODE_STOP_RCV );
     401             : 
     402          49 :     LBCHECK( _impl->receiverThread->join( ));
     403          49 :     _cleanup();
     404             : 
     405         147 :     LBDEBUG << _impl->incoming.getSize() << " connections open after close"
     406         147 :            << std::endl;
     407             : #ifndef NDEBUG
     408          49 :     const Connections& connections = _impl->incoming.getConnections();
     409         147 :     for( Connections::const_iterator i = connections.begin();
     410          98 :          i != connections.end(); ++i )
     411             :     {
     412           0 :         LBDEBUG << "    " << *i << std::endl;
     413             :     }
     414             : #endif
     415             : 
     416          49 :     LBASSERTINFO( !hasPendingRequests(),
     417             :                   *static_cast< lunchbox::RequestHandler* >( this ));
     418          49 :     return true;
     419             : }
     420             : 
     /**
      * Requests the given thread affinity for the node's threads.
      *
      * Sends CMD_NODE_SET_AFFINITY_RCV and CMD_NODE_SET_AFFINITY_CMD, each
      * carrying the affinity value, and then calls
      * lunchbox::Thread::setAffinity() directly.
      * NOTE(review): the direct call presumably applies to the calling
      * thread - confirm against lunchbox::Thread.
      *
      * @param affinity the affinity value, passed on unchanged.
      */
     421           0 : void LocalNode::setAffinity( const int32_t affinity )
     422             : {
     423           0 :     send( CMD_NODE_SET_AFFINITY_RCV ) << affinity;
     424           0 :     send( CMD_NODE_SET_AFFINITY_CMD ) << affinity;
     425             : 
     426           0 :     lunchbox::Thread::setAffinity( affinity );
     427           0 : }
     428             : 
     /**
      * Hands a connection to the node by sending it with a
      * CMD_NODE_ADD_CONNECTION command.
      *
      * A reference is taken on the connection before it is sent; the matching
      * unref is done by the command handler, see the comment on ref().
      *
      * @param connection the connection to add; dereferenced without a null
      *                   check.
      */
     429          34 : void LocalNode::addConnection( ConnectionPtr connection )
     430             : {
     431          34 :     connection->ref(); // unref in _cmdAddConnection
     432          34 :     send( CMD_NODE_ADD_CONNECTION ) << connection;
     433          34 : }
     434             : 
     /**
      * Creates a listening connection from a description and adds it as a
      * listener of this node.
      *
      * @param desc the description passed to Connection::create().
      * @return the new connection, or a null pointer if the connection could
      *         not be created or could not listen.
      */
     435           0 : ConnectionPtr LocalNode::addListener( ConnectionDescriptionPtr desc )
     436             : {
     437           0 :     LBASSERT( isListening( ));
     438             : 
     439           0 :     ConnectionPtr connection = Connection::create( desc );
     440           0 :     if( connection && connection->listen( ))
     441             :     {
     442           0 :         addListener( connection ); // the ConnectionPtr overload
     443           0 :         return connection;
     444             :     }
     445             : 
     446           0 :     return 0;
     447             : }
     448             : 
     /**
      * Announces an already listening connection as a new listener of this
      * node.
      *
      * Does nothing if this node or the connection is not listening.
      * Otherwise a reference is taken on the connection and a
      * CMD_NODE_ADD_LISTENER command, carrying the connection's address as a
      * 64 bit integer and its description string, is sent to every node
      * returned by getNodes().
      *
      * @param connection the listening connection.
      */
     449           0 : void LocalNode::addListener( ConnectionPtr connection )
     450             : {
     451           0 :     LBASSERT( isListening( ));
     452           0 :     LBASSERT( connection->isListening( ));
     453           0 :     if( !isListening() || !connection->isListening( )) // same conditions as the asserts, checked again as a runtime guard
     454           0 :         return;
     455             : 
     456           0 :     connection->ref( this ); // unref in self handler
     457             : 
     458             :     // Update everybody's description list of me, add the listener to myself in
     459             :     // my handler
     460           0 :     const Nodes& nodes = getNodes();
     461           0 :     for( NodePtr node : nodes )
     462             :         node->send( CMD_NODE_ADD_LISTENER )
     463           0 :             << (uint64_t)(connection.get( ))
     464           0 :             << connection->getDescription()->toString();
     465             : }
     466             : 
     /**
      * Removes several listeners.
      *
      * Calls _removeListener() for each connection and keeps the returned
      * requests in a local vector until the function returns.
      * NOTE(review): presumably the destruction of the lunchbox::Request
      * objects waits for their completion - confirm against lunchbox.
      *
      * @param connections the listening connections to remove.
      */
     467           0 : void LocalNode::removeListeners( const Connections& connections )
     468             : {
     469           0 :     std::vector< lunchbox::Request< void > > requests;
     470           0 :     for( ConnectionsCIter i = connections.begin(); i != connections.end(); ++i )
     471             :     {
     472           0 :         ConnectionPtr connection = *i;
     473           0 :         requests.push_back( _removeListener( connection ));
     474           0 :     }
     475           0 : }
     476             : 
     /**
      * Starts the removal of one listener.
      *
      * Takes a reference on the connection, registers a request and sends a
      * CMD_NODE_REMOVE_LISTENER command, carrying the request, the connection
      * pointer and the connection's description string, to every node
      * returned by getNodes().
      *
      * @param conn the listener to remove; asserted to be not connected.
      * @return the registered request.
      */
     477           0 : lunchbox::Request< void > LocalNode::_removeListener( ConnectionPtr conn )
     478             : {
     479           0 :     LBASSERT( isListening( ));
     480           0 :     LBASSERTINFO( !conn->isConnected(), conn );
     481             : 
     482           0 :     conn->ref( this );
     483           0 :     const lunchbox::Request< void > request = registerRequest< void >();
     484             : 
     485           0 :     const Nodes& nodes = getNodes();
     486           0 :     for( NodePtr node : nodes )
     487             :         node->send( CMD_NODE_REMOVE_LISTENER )
     488           0 :             << request << conn.get() << conn->getDescription()->toString();
     489           0 :     return request;
     490             : }
     491             : 
     /**
      * Adds a connection to the incoming connection set.
      *
      * If the receiver thread is running and the caller is another thread,
      * the work is handed over through addConnection(), which sends a command.
      * Otherwise a buffer of COMMAND_ALLOCSIZE is taken from the small buffer
      * cache, a non-blocking receive of COMMAND_MINSIZE bytes is started on
      * the connection and the connection is added to the set.
      *
      * @param connection the connection to add.
      */
     492         152 : void LocalNode::_addConnection( ConnectionPtr connection )
     493             : {
     494         152 :     if( _impl->receiverThread->isRunning() && !_impl->inReceiverThread( ))
     495             :     {
     496          34 :         addConnection( connection );
     497         186 :         return;
     498             :     }
     499             : 
     500         118 :     BufferPtr buffer = _impl->smallBuffers.alloc( COMMAND_ALLOCSIZE );
     501         118 :     connection->recvNB( buffer, COMMAND_MINSIZE );
     502         118 :     _impl->incoming.addConnection( connection );
     503             : }
     504             : 
     /**
      * Removes a connection from the incoming connection set, resets its
      * receive data and closes it unless it is already closed.
      *
      * @param connection the connection to remove; asserted to be non-null.
      */
     505         224 : void LocalNode::_removeConnection( ConnectionPtr connection )
     506             : {
     507         224 :     LBASSERT( connection );
     508             : 
     509         224 :     _impl->incoming.removeConnection( connection );
     510         224 :     connection->resetRecvData();
     511         224 :     if( !connection->isClosed( ))
     512         130 :         connection->close(); // cancel pending IO's
     513         224 : }
     514             : 
     /**
      * Clears the connection table and the node table of a closed node.
      *
      * Entries which are still present are reported on the debug log before
      * they are dropped; the per-entry listing is compiled only without
      * NDEBUG. The node is asserted to be in the closed state.
      */
     515          49 : void LocalNode::_cleanup()
     516             : {
     517          49 :     LBVERB << "Clean up stopped node" << std::endl;
     518          49 :     LBASSERTINFO( isClosed(), *this );
     519             : 
     520          49 :     if( !_impl->connectionNodes.empty( ))
     521         141 :         LBDEBUG << _impl->connectionNodes.size()
     522         141 :                << " open connections during cleanup" << std::endl;
     523             : #ifndef NDEBUG
     524         288 :     for( ConnectionNodeHashCIter i = _impl->connectionNodes.begin();
     525         192 :          i != _impl->connectionNodes.end(); ++i )
     526             :     {
     527          47 :         NodePtr node = i->second;
     528          47 :         LBDEBUG << "    " << i->first << " : " << node << std::endl;
     529          47 :     }
     530             : #endif
     531             : 
     532          49 :     _impl->connectionNodes.clear();
     533             : 
     534          49 :     if( !_impl->nodes->empty( ))
     535           6 :         LBDEBUG << _impl->nodes->size() << " nodes connected during cleanup"
     536           6 :                << std::endl;
     537             : 
     538             : #ifndef NDEBUG
     539          51 :     for( NodeHashCIter i = _impl->nodes->begin(); i != _impl->nodes->end(); ++i)
     540             :     {
     541           2 :         NodePtr node = i->second;
     542           2 :         LBDEBUG << "    " << node << std::endl;
     543           2 :     }
     544             : #endif
     545             : 
     546          49 :     _impl->nodes->clear();
     547          49 : }
     548             : 
     /**
      * Disconnects a node and removes it from this node's bookkeeping.
      *
      * The node's connection and multicast connection are fetched before
      * node->_disconnect() is called. Each non-null connection is then taken
      * out of the incoming set through _removeConnection() and erased from
      * the connection table. Afterwards the object store drops the instance
      * data of the node, the node is erased from the node table and
      * notifyDisconnect() is called.
      *
      * @param node the node to close.
      */
     549         115 : void LocalNode::_closeNode( NodePtr node )
     550             : {
     551         115 :     ConnectionPtr connection = node->getConnection();
     552         230 :     ConnectionPtr mcConnection = node->_getMulticast();
     553             : 
     554         115 :     node->_disconnect();
     555             : 
     556         115 :     if( connection )
     557             :     {
     558          68 :         LBASSERTINFO( _impl->connectionNodes.find( connection ) !=
     559             :                       _impl->connectionNodes.end(), connection );
     560             : 
     561          68 :         _removeConnection( connection );
     562          68 :         _impl->connectionNodes.erase( connection );
     563             :     }
     564             : 
     565         115 :     if( mcConnection )
     566             :     {
     567           0 :         _removeConnection( mcConnection );
     568           0 :         _impl->connectionNodes.erase( mcConnection );
     569             :     }
     570             : 
     571         115 :     _impl->objectStore->removeInstanceData( node->getNodeID( ));
     572             : 
     573         230 :     lunchbox::ScopedFastWrite mutex( _impl->nodes ); // scoped object, alive until the function returns, i.e. also during notifyDisconnect()
     574         115 :     _impl->nodes->erase( node->getNodeID( ));
     575         115 :     notifyDisconnect( node );
     576         230 :     LBDEBUG << node << " disconnected from " << *this << std::endl;
     577         115 : }
     578             : 
     /**
      * Sets up the pipe connection from this node to itself.
      *
      * A new PipeConnection is connected; its sibling end is given to
      * Node::_connect() and the node state is set back to closed afterwards.
      * The pipe connection itself is registered for this node in the
      * connection table, the node is entered into the node table under its
      * own node identifier, and the connection is passed to _addConnection().
      *
      * @return false if the pipe connection cannot be connected, true
      *         otherwise.
      */
     579          50 : bool LocalNode::_connectSelf()
     580             : {
     581             :     // setup local connection to myself
     582          50 :     PipeConnectionPtr connection = new PipeConnection;
     583          50 :     if( !connection->connect( ))
     584             :     {
     585           0 :         LBERROR << "Could not create local connection to receiver thread."
     586           0 :                 << std::endl;
     587           0 :         return false;
     588             :     }
     589             : 
     590          50 :     Node::_connect( connection->getSibling( ));
     591          50 :     _setClosed(); // reset state after _connect set it to connected
     592             : 
     593             :     // add to connection set
     594          50 :     LBASSERT( connection->getDescription( ));
     595          50 :     LBASSERT( _impl->connectionNodes.find( connection ) ==
     596             :               _impl->connectionNodes.end( ));
     597             : 
     598          50 :     _impl->connectionNodes[ connection ] = this;
     599          50 :     _impl->nodes.data[ getNodeID() ] = this; // accesses the Lockable's data member directly; no lock object is created here
     600          50 :     _addConnection( connection );
     601             : 
     602         100 :     LBVERB << "Added node " << getNodeID() << " using " << connection
     603         150 :            << std::endl;
     604          50 :     return true;
     605             : }
     606             : 
     607           8 : bool LocalNode::disconnect( NodePtr node )
     608             : {
     609           8 :     if( !node || !isListening() )
     610           0 :         return false;
     611             : 
     612           8 :     if( !node->isConnected( ))
     613           0 :         return true;
     614             : 
     615           8 :     LBASSERT( !inCommandThread( ));
     616           8 :     lunchbox::Request< void > request = registerRequest< void >( node.get( ));
     617           8 :     send( CMD_NODE_DISCONNECT ) << request;
     618             : 
     619           8 :     request.wait();
     620           8 :     _impl->objectStore->removeNode( node );
     621           8 :     return true;
     622             : }
     623             : 
     624       20001 : void LocalNode::ackRequest( NodePtr node, const uint32_t requestID )
     625             : {
     626       20001 :     if( requestID == LB_UNDEFINED_UINT32 ) // no need to ack operation
     627       20001 :         return;
     628             : 
     629       20001 :     if( node == this ) // OPT
     630           0 :         serveRequest( requestID );
     631             :     else
     632       20001 :         node->send( CMD_NODE_ACK_REQUEST ) << requestID;
     633             : }
     634             : 
     635           0 : void LocalNode::ping( NodePtr peer )
     636             : {
     637           0 :     LBASSERT( !_impl->inReceiverThread( ));
     638           0 :     peer->send( CMD_NODE_PING );
     639           0 : }
     640             : 
     641           0 : bool LocalNode::pingIdleNodes()
     642             : {
     643           0 :     LBASSERT( !_impl->inReceiverThread( ) );
     644           0 :     const int64_t timeout = Global::getKeepaliveTimeout() / 2;
     645           0 :     const Nodes& nodes = getNodes( false );
     646             : 
     647           0 :     bool pinged = false;
     648           0 :     for( NodePtr node : nodes )
     649             :     {
     650           0 :         if( getTime64() - node->getLastReceiveTime() > timeout )
     651             :         {
     652           0 :             LBINFO << " Ping Node: " <<  node->getNodeID() << " last seen "
     653           0 :                    << node->getLastReceiveTime() << std::endl;
     654           0 :             node->send( CMD_NODE_PING );
     655           0 :             pinged = true;
     656             :         }
     657           0 :     }
     658           0 :     return pinged;
     659             : }
     660             : 
     661             : //----------------------------------------------------------------------
     662             : // Object functionality
     663             : //----------------------------------------------------------------------
     664           0 : void LocalNode::disableInstanceCache()
     665             : {
     666           0 :     _impl->objectStore->disableInstanceCache();
     667           0 : }
     668             : 
     669           0 : void LocalNode::expireInstanceData( const int64_t age )
     670             : {
     671           0 :     _impl->objectStore->expireInstanceData( age );
     672           0 : }
     673             : 
     674           0 : void LocalNode::enableSendOnRegister()
     675             : {
     676           0 :     _impl->objectStore->enableSendOnRegister();
     677           0 : }
     678             : 
     679           0 : void LocalNode::disableSendOnRegister()
     680             : {
     681           0 :     _impl->objectStore->disableSendOnRegister();
     682           0 : }
     683             : 
     684          21 : bool LocalNode::registerObject( Object* object )
     685             : {
     686          21 :     return _impl->objectStore->register_( object );
     687             : }
     688             : 
     689          21 : void LocalNode::deregisterObject( Object* object )
     690             : {
     691          21 :     _impl->objectStore->deregister( object );
     692          21 : }
     693             : 
     694          40 : f_bool_t LocalNode::mapObject( Object* object, const uint128_t& id,
     695             :                                NodePtr master, const uint128_t& version )
     696             : {
     697             :     const uint32_t request = _impl->objectStore->mapNB( object, id, version,
     698          40 :                                                         master );
     699             :     const FuturebImpl::Func& func = boost::bind( &ObjectStore::mapSync,
     700          40 :                                                  _impl->objectStore, request );
     701          40 :     return f_bool_t( new FuturebImpl( func ));
     702             : }
     703             : 
     704           0 : uint32_t LocalNode::mapObjectNB( Object* object, const uint128_t& id,
     705             :                                  const uint128_t& version )
     706             : {
     707           0 :     return _impl->objectStore->mapNB( object, id, version, 0 );
     708             : }
     709             : 
     710           2 : uint32_t LocalNode::mapObjectNB( Object* object, const uint128_t& id,
     711             :                                  const uint128_t& version, NodePtr master )
     712             : {
     713           2 :     return _impl->objectStore->mapNB( object, id, version, master );
     714             : }
     715             : 
     716             : 
     717           2 : bool LocalNode::mapObjectSync( const uint32_t requestID )
     718             : {
     719           2 :     return _impl->objectStore->mapSync( requestID );
     720             : }
     721             : 
     722           4 : f_bool_t LocalNode::syncObject( Object* object, const uint128_t& id,
     723             :                                 NodePtr master, const uint32_t instanceID )
     724             : {
     725           4 :     return _impl->objectStore->sync( object, id, master, instanceID );
     726             : }
     727             : 
     728          42 : void LocalNode::unmapObject( Object* object )
     729             : {
     730          42 :     _impl->objectStore->unmap( object );
     731          42 : }
     732             : 
     733           0 : void LocalNode::swapObject( Object* oldObject, Object* newObject )
     734             : {
     735           0 :     _impl->objectStore->swap( oldObject, newObject );
     736           0 : }
     737             : 
     738           0 : void LocalNode::objectPush( const uint128_t& groupID,
     739             :                             const uint128_t& objectType,
     740             :                             const uint128_t& objectID, DataIStream& istream )
     741             : {
     742           0 :     lunchbox::ScopedRead mutex( _impl->pushHandlers );
     743           0 :     HandlerHashCIter i = _impl->pushHandlers->find( groupID );
     744           0 :     if( i != _impl->pushHandlers->end( ))
     745           0 :         i->second( groupID, objectType, objectID, istream );
     746             :     else
     747           0 :         LBWARN << "No custom handler for push group " << groupID
     748           0 :                << " registered" << std::endl;
     749             : 
     750           0 :     if( istream.wasUsed() && istream.hasData( ))
     751           0 :         LBWARN << "Incomplete Object::push for group " << groupID << " type "
     752           0 :                << objectType << " object " << objectID << std::endl;
     753           0 : }
     754             : 
     755           0 : void LocalNode::registerPushHandler( const uint128_t& groupID,
     756             :                                      const PushHandler& handler )
     757             : {
     758           0 :     lunchbox::ScopedWrite mutex( _impl->pushHandlers );
     759           0 :     (*_impl->pushHandlers)[ groupID ] = handler;
     760           0 : }
     761             : 
     762           2 : bool LocalNode::registerCommandHandler( const uint128_t& command,
     763             :                                         const CommandHandler& func,
     764             :                                         CommandQueue* queue )
     765             : {
     766           2 :     lunchbox::ScopedFastWrite mutex( _impl->commandHandlers );
     767           2 :     if( _impl->commandHandlers->find(command) != _impl->commandHandlers->end( ))
     768             :     {
     769           0 :         LBWARN << "Already got a registered handler for custom command "
     770           0 :                << command << std::endl;
     771           0 :         return false;
     772             :     }
     773             : 
     774             :     _impl->commandHandlers->insert( std::make_pair( command,
     775           2 :                                                 std::make_pair( func, queue )));
     776           2 :     return true;
     777             : }
     778             : 
     779           0 : LocalNode::SendToken LocalNode::acquireSendToken( NodePtr node )
     780             : {
     781           0 :     LBASSERT( !inCommandThread( ));
     782           0 :     LBASSERT( !_impl->inReceiverThread( ));
     783             : 
     784           0 :     lunchbox::Request< void > request = registerRequest< void >();
     785           0 :     node->send( CMD_NODE_ACQUIRE_SEND_TOKEN ) << request;
     786             : 
     787             :     try
     788             :     {
     789           0 :         request.wait( Global::getTimeout( ));
     790             :     }
     791           0 :     catch( const lunchbox::FutureTimeout& )
     792             :     {
     793           0 :         LBERROR << "Timeout while acquiring send token " << request.getID()
     794           0 :                 << std::endl;
     795           0 :         request.unregister();
     796           0 :         return 0;
     797             :     }
     798           0 :     return new co::SendToken( node );
     799             : }
     800             : 
     801           0 : void LocalNode::releaseSendToken( SendToken token )
     802             : {
     803           0 :     LBASSERT( !_impl->inReceiverThread( ));
     804           0 :     if( token )
     805           0 :         token->release();
     806           0 : }
     807             : 
     808             : //----------------------------------------------------------------------
     809             : // Connecting a node
     810             : //----------------------------------------------------------------------
     811             : namespace
     812             : {
     813             : enum ConnectResult
     814             : {
     815             :     CONNECT_OK,
     816             :     CONNECT_TRY_AGAIN,
     817             :     CONNECT_BAD_STATE,
     818             :     CONNECT_TIMEOUT,
     819             :     CONNECT_UNREACHABLE
     820             : };
     821             : }
     822             : 
     823          73 : NodePtr LocalNode::connect( const NodeID& nodeID )
     824             : {
     825          73 :     LBASSERT( nodeID != 0 );
     826          74 :     LBASSERT( isListening( ));
     827             : 
     828             :     // Make sure that only one connection request based on the node identifier
     829             :     // is pending at a given time. Otherwise a node with the same id might be
     830             :     // instantiated twice in _cmdGetNodeDataReply(). The alternative to this
     831             :     // mutex is to register connecting nodes with this local node, and handle
     832             :     // all cases correctly, which is far more complex. Node connections only
     833             :     // happen a lot during initialization, and are therefore not time-critical.
     834          74 :     lunchbox::ScopedWrite mutex( _impl->connectLock );
     835             : 
     836         148 :     Nodes nodes = getNodes();
     837          77 :     for( NodePtr peer : nodes )
     838             :     {
     839          77 :         if( peer->getNodeID() == nodeID && peer->isReachable( )) // early out
     840          74 :             return peer;
     841           3 :     }
     842             : 
     843           0 :     LBDEBUG << "Connecting node " << nodeID << std::endl;
     844           0 :     for( NodePtr peer : nodes )
     845             :     {
     846           0 :         NodePtr node = _connect( nodeID, peer );
     847           0 :         if( node )
     848           0 :             return node;
     849           0 :     }
     850             :     {
     851           0 :         NodePtr node = _connectFromZeroconf( nodeID );
     852           0 :         if( node )
     853           0 :             return node;
     854             :     }
     855             :     // check again if node connected by itself by now
     856           0 :     nodes = getNodes();
     857           0 :     for( NodePtr node : nodes )
     858           0 :         if( node->getNodeID() == nodeID && node->isReachable( ))
     859           0 :             return node;
     860             : 
     861           0 :     LBWARN << "Node " << nodeID << " connection failed" << std::endl;
     862          74 :     return 0;
     863             : }
     864             : 
     865           0 : NodePtr LocalNode::_connect( const NodeID& nodeID, NodePtr peer )
     866             : {
     867           0 :     LBASSERT( nodeID != 0 );
     868             : 
     869           0 :     NodePtr node;
     870             :     {
     871           0 :         lunchbox::ScopedFastRead mutexNodes( _impl->nodes );
     872           0 :         NodeHash::const_iterator i = _impl->nodes->find( nodeID );
     873           0 :         if( i != _impl->nodes->end( ))
     874           0 :             node = i->second;
     875             :     }
     876             : 
     877           0 :     LBASSERT( getNodeID() != nodeID );
     878           0 :     if( !node )
     879             :     {
     880           0 :         lunchbox::Request< void* > request = registerRequest< void* >();
     881           0 :         peer->send( CMD_NODE_GET_NODE_DATA ) << nodeID << request;
     882           0 :         node = reinterpret_cast< Node* >( request.wait( ));
     883           0 :         if( !node )
     884             :         {
     885           0 :             LBINFO << "Node " << nodeID << " not found on " << peer->getNodeID()
     886           0 :                    << std::endl;
     887           0 :             return 0;
     888             :         }
     889           0 :         node->unref( this ); // ref'd before serveRequest()
     890             :     }
     891             : 
     892           0 :     if( node->isReachable( ))
     893           0 :         return node;
     894             : 
     895           0 :     size_t tries = 10;
     896           0 :     while( --tries )
     897             :     {
     898           0 :         switch( _connect( node ))
     899             :         {
     900             :           case CONNECT_OK:
     901           0 :               return node;
     902             :           case CONNECT_TRY_AGAIN:
     903             :           {
     904           0 :               lunchbox::RNG rng;
     905             :               // collision avoidance
     906           0 :               lunchbox::sleep( rng.get< uint8_t >( ));
     907           0 :               break;
     908             :           }
     909             :           case CONNECT_BAD_STATE:
     910           0 :               LBWARN << "Internal connect error" << std::endl;
     911             :               // no break;
     912             :           case CONNECT_TIMEOUT:
     913           0 :               return 0;
     914             : 
     915             :           case CONNECT_UNREACHABLE:
     916           0 :               break; // maybe peer talks to us
     917             :         }
     918             : 
     919           0 :         lunchbox::ScopedFastRead mutexNodes( _impl->nodes );
     920             :         // connect failed - check for simultaneous connect from peer
     921           0 :         NodeHash::const_iterator i = _impl->nodes->find( nodeID );
     922           0 :         if( i != _impl->nodes->end( ))
     923           0 :             node = i->second;
     924           0 :     }
     925             : 
     926           0 :     return node->isReachable() ? node : 0;
     927             : }
     928             : 
     929           0 : NodePtr LocalNode::_connectFromZeroconf( const NodeID& nodeID )
     930             : {
     931           0 :     lunchbox::ScopedWrite mutex( _impl->service );
     932             : 
     933             :     const Strings& instances =
     934           0 :         _impl->service->discover( servus::Servus::IF_ALL, 500 );
     935           0 :     for( StringsCIter i = instances.begin(); i != instances.end(); ++i )
     936             :     {
     937           0 :         const std::string& instance = *i;
     938           0 :         const NodeID candidate( instance );
     939           0 :         if( candidate != nodeID )
     940           0 :             continue;
     941             : 
     942           0 :         const std::string& typeStr = _impl->service->get( instance, "co_type" );
     943           0 :         if( typeStr.empty( ))
     944           0 :             return 0;
     945             : 
     946           0 :         std::istringstream in( typeStr );
     947           0 :         uint32_t type = 0;
     948           0 :         in >> type;
     949             : 
     950           0 :         NodePtr node = createNode( type );
     951           0 :         if( !node )
     952             :         {
     953           0 :             LBINFO << "Can't create node of type " << type << std::endl;
     954           0 :             continue;
     955             :         }
     956             : 
     957             :         const std::string& numStr = _impl->service->get( instance,
     958           0 :                                                          "co_numPorts" );
     959           0 :         uint32_t num = 0;
     960             : 
     961           0 :         in.clear();
     962           0 :         in.str( numStr );
     963           0 :         in >> num;
     964           0 :         LBASSERT( num > 0 );
     965           0 :         for( size_t j = 0; j < num; ++j )
     966             :         {
     967           0 :             ConnectionDescriptionPtr desc = new ConnectionDescription;
     968           0 :             std::ostringstream out;
     969           0 :             out << "co_port" << j;
     970             : 
     971           0 :             std::string descStr = _impl->service->get( instance, out.str( ));
     972           0 :             LBASSERT( !descStr.empty( ));
     973           0 :             LBCHECK( desc->fromString( descStr ));
     974           0 :             LBASSERT( descStr.empty( ));
     975           0 :             node->addConnectionDescription( desc );
     976           0 :         }
     977           0 :         mutex.leave();
     978           0 :         if( _connect( node ))
     979           0 :             return node;
     980           0 :     }
     981           0 :     return 0;
     982             : }
     983             : 
     984          34 : bool LocalNode::connect( NodePtr node )
     985             : {
     986          34 :     lunchbox::ScopedWrite mutex( _impl->connectLock );
     987          34 :     return ( _connect( node ) == CONNECT_OK );
     988             : }
     989             : 
     990          34 : uint32_t LocalNode::_connect( NodePtr node )
     991             : {
     992          34 :     LBASSERTINFO( isListening(), *this );
     993          34 :     if( node->isReachable( ))
     994           0 :         return CONNECT_OK;
     995             : 
     996          34 :     LBASSERT( node->isClosed( ));
     997          34 :     LBDEBUG << "Connecting " << node << std::endl;
     998             : 
     999             :     // try connecting using the given descriptions
    1000          34 :     const ConnectionDescriptions& cds = node->getConnectionDescriptions();
    1001         102 :     for( ConnectionDescriptionsCIter i = cds.begin();
    1002          68 :         i != cds.end(); ++i )
    1003             :     {
    1004          34 :         ConnectionDescriptionPtr description = *i;
    1005          34 :         if( description->type >= CONNECTIONTYPE_MULTICAST )
    1006           0 :             continue; // Don't use multicast for primary connections
    1007             : 
    1008          34 :         ConnectionPtr connection = Connection::create( description );
    1009          34 :         if( !connection || !connection->connect( ))
    1010           0 :             continue;
    1011             : 
    1012          34 :         return _connect( node, connection );
    1013           0 :     }
    1014             : 
    1015           0 :     LBDEBUG << "Node " << node
    1016           0 :            << " unreachable, all connections failed to connect" <<std::endl;
    1017           0 :     return CONNECT_UNREACHABLE;
    1018             : }
    1019             : 
    1020           0 : bool LocalNode::connect( NodePtr node, ConnectionPtr connection )
    1021             : {
    1022           0 :     return ( _connect( node, connection ) == CONNECT_OK );
    1023             : }
    1024             : 
    1025          34 : uint32_t LocalNode::_connect( NodePtr node, ConnectionPtr connection )
    1026             : {
    1027          34 :     LBASSERT( connection );
    1028          34 :     LBASSERT( node->getNodeID() != getNodeID( ));
    1029             : 
    1030          68 :     if( !node || !isListening() || !connection->isConnected() ||
    1031          34 :         !node->isClosed( ))
    1032             :     {
    1033           0 :         return CONNECT_BAD_STATE;
    1034             :     }
    1035             : 
    1036          34 :     _addConnection( connection );
    1037             : 
    1038             :     // send connect command to peer
    1039          34 :     lunchbox::Request< bool > request = registerRequest< bool >( node.get( ));
    1040             : #ifdef COLLAGE_BIGENDIAN
    1041             :     uint32_t cmd = CMD_NODE_CONNECT_BE;
    1042             :     lunchbox::byteswap( cmd );
    1043             : #else
    1044          34 :     const uint32_t cmd = CMD_NODE_CONNECT;
    1045             : #endif
    1046             :     OCommand( Connections( 1, connection ), cmd )
    1047          34 :         << getNodeID() << request << getType() << serialize();
    1048             : 
    1049          34 :     bool connected = false;
    1050             :     try
    1051             :     {
    1052          34 :         connected = request.wait( 10000 /*ms*/ );
    1053             :     }
    1054           0 :     catch( const lunchbox::FutureTimeout& )
    1055             :     {
    1056           0 :         LBWARN << "Node connection handshake timeout - " << node
    1057           0 :                << " not a Collage node?" << std::endl;
    1058           0 :         request.unregister();
    1059           0 :         return CONNECT_TIMEOUT;
    1060             :     }
    1061             : 
    1062             :     // In simultaneous connect case, depending on the connection type
    1063             :     // (e.g. RDMA), a check on the connection state of the node is required
    1064          34 :     if( !connected || !node->isConnected( ))
    1065           0 :         return CONNECT_TRY_AGAIN;
    1066             : 
    1067          34 :     LBASSERT( node->getNodeID() != 0 );
    1068          34 :     LBASSERTINFO( node->getNodeID() != getNodeID(), getNodeID() );
    1069          34 :     LBDEBUG << node << " connected to " << *(Node*)this << std::endl;
    1070          34 :     return CONNECT_OK;
    1071             : }
    1072             : 
    1073          40 : NodePtr LocalNode::connectObjectMaster( const uint128_t& id )
    1074             : {
    1075          40 :     LBASSERTINFO( id.isUUID(), id );
    1076          40 :     if( !id.isUUID( ))
    1077             :     {
    1078           0 :         LBWARN << "Invalid object id " << id << std::endl;
    1079           0 :         return 0;
    1080             :     }
    1081             : 
    1082          40 :     const NodeID masterNodeID = _impl->objectStore->findMasterNodeID( id );
    1083          40 :     if( masterNodeID == 0 )
    1084             :     {
    1085           0 :         LBWARN << "Can't find master node for object " << id << std::endl;
    1086           0 :         return 0;
    1087             :     }
    1088             : 
    1089          40 :     NodePtr master = connect( masterNodeID );
    1090          40 :     if( master && !master->isClosed( ))
    1091          40 :         return master;
    1092             : 
    1093           0 :     LBWARN << "Can't connect master node with id " << masterNodeID
    1094           0 :            << " for object " << id << std::endl;
    1095           0 :     return 0;
    1096             : }
    1097             : 
    1098           0 : bool LocalNode::launch( NodePtr node, const std::string& command )
    1099             : {
    1100           0 :     size_t lastPos = 0;
    1101           0 :     std::string cmd;
    1102             : 
    1103           0 :     const std::string& quote = node->getLaunchQuote();
    1104             :     std::string launchOptions =
    1105           0 :         std::string( " --co-launch " ) + quote + node->serialize() +
    1106           0 :         node->getWorkDir() + CO_SEPARATOR + std::to_string( getType( )) +
    1107           0 :         CO_SEPARATOR + serialize() + quote +
    1108           0 :         " --co-globals " + quote + co::Global::toString() + quote;
    1109             : 
    1110           0 :     for( size_t percentPos = command.find( '%' );
    1111             :          percentPos != std::string::npos;
    1112           0 :          percentPos = command.find( '%', percentPos + 1 ))
    1113             :     {
    1114           0 :         std::string replacement;
    1115           0 :         switch( command[percentPos+1] )
    1116             :         {
    1117             :         case 'h':
    1118           0 :             replacement = node->getHostname();
    1119           0 :             if( replacement.empty( ))
    1120           0 :                 replacement = "127.0.0.1";
    1121           0 :             break;
    1122             : 
    1123             :         case 'n':
    1124           0 :             replacement = node->getNodeID().getString();
    1125           0 :             break;
    1126             : 
    1127             :         case 'd':
    1128           0 :             replacement = node->getWorkDir();
    1129           0 :             break;
    1130             : 
    1131             :         case 'q':
    1132           0 :             replacement = quote;
    1133           0 :             break;
    1134             : 
    1135             :         case 'o':
    1136           0 :             replacement = launchOptions;
    1137           0 :             launchOptions.clear();
    1138           0 :             break;
    1139             : 
    1140             :         default:
    1141           0 :             LBWARN << "Unknown token " << command[percentPos+1] << std::endl;
    1142           0 :             replacement = command.substr( percentPos, percentPos+1 );
    1143             :         }
    1144             : 
    1145           0 :         cmd += command.substr( lastPos, percentPos-lastPos ) + replacement;
    1146           0 :         lastPos = percentPos + 2;
    1147           0 :     }
    1148             : 
    1149           0 :     cmd += command.substr( lastPos ) + launchOptions;
    1150             : 
    1151           0 :     LBDEBUG << "Launching: " << cmd << std::endl;
    1152           0 :     if( lunchbox::Launcher::run( cmd ))
    1153           0 :         return true;
    1154             : 
    1155           0 :     LBWARN << "Could not launch node using '" << cmd << "'" << std::endl;
    1156           0 :     return false;
    1157             : }
    1158             : 
    1159           0 : NodePtr LocalNode::syncLaunch( const uint128_t& nodeID, const int64_t timeout )
    1160             : {
    1161           0 :     const lunchbox::Clock clock;
    1162             : 
    1163           0 :     do
    1164             :     {
    1165           0 :         co::NodePtr node = getNode( nodeID );
    1166           0 :         if( node && node->isConnected( ))
    1167           0 :             return node;
    1168             : 
    1169           0 :         lunchbox::sleep( 100 /*ms*/ );
    1170             :     }
    1171           0 :     while( timeout == static_cast< int32_t >( LB_TIMEOUT_INDEFINITE ) ||
    1172           0 :            clock.getTime64() < timeout );
    1173             : 
    1174           0 :     return nullptr;
    1175             : }
    1176             : 
    // Parses the peer setup string and connects to the peer it describes.
    //
    // setupOpts is split at CO_SEPARATOR into three parts:
    //   <working directory> SEP <node type> SEP <serialized peer data>
    // The working directory (if not empty) is entered with chdir(), a node of
    // the given type is created, fed with the serialized data and connected.
    //
    // Returns false when one of the two separators is missing or when
    // connect() fails; true otherwise. A failing chdir() or deserialize() only
    // produces a warning and does not abort the setup.
    // NOTE(review): std::stoi() throws on a non-numeric node type, and the
    // result of createNode() is dereferenced without a null check although
    // _cmdConnect() treats a null result as possible -- confirm both are
    // acceptable here.
    1177           0 : bool LocalNode::_setupPeer( const std::string& setupOpts )
    1178             : {
    1179           0 :     size_t nextPos = setupOpts.find( CO_SEPARATOR );
    1180           0 :     if( nextPos == std::string::npos )
    1181             :     {
    1182           0 :         LBERROR << "Could not parse working directory: " << setupOpts
    1183           0 :                 << std::endl;
    1184           0 :         return false;
    1185             :     }
    1186             : 
                           // First field: working directory, remainder: type + peer data
    1187           0 :     const std::string workDir = setupOpts.substr( 0, nextPos );
    1188           0 :     std::string description = setupOpts.substr( nextPos + 1 );
    1189             : 
    1190           0 :     if( !workDir.empty() && chdir( workDir.c_str( )) == -1 )
    1191           0 :         LBWARN << "Can't change working directory to " << workDir << ": "
    1192           0 :                << lunchbox::sysError << std::endl;
    1193             : 
    1194           0 :     nextPos = description.find( CO_SEPARATOR );
    1195           0 :     if( nextPos == std::string::npos )
    1196             :     {
    1197           0 :         LBERROR << "Could not parse server node type: " << description
    1198           0 :                 << " is left from " << setupOpts << std::endl;
    1199           0 :         return false;
    1200             :     }
    1201             : 
                           // Second field: numeric node type, remainder: serialized peer
    1202           0 :     const std::string nodeType = description.substr( 0, nextPos );
    1203           0 :     description = description.substr( nextPos + 1 );
    1204             : 
    1205           0 :     co::NodePtr peer = createNode( std::stoi( nodeType ));
    1206           0 :     if( !peer->deserialize( description ))
    1207           0 :         LBWARN << "Can't parse peer data" << std::endl;
    1208             : 
                           // deserialize() is expected to have consumed the whole string
    1209           0 :     LBASSERTINFO( description.empty(), description );
    1210           0 :     if( !connect( peer ))
    1211             :     {
    1212           0 :         LBERROR << "Can't connect peer node using " << *peer << std::endl;
    1213           0 :         return false;
    1214             :     }
    1215           0 :     return true;
    1216             : }
    1217             : 
    // Creates a new Node instance of the given type and returns it as NodePtr.
    // Asserts that the type is NODETYPE_NODE, the only type this implementation
    // is prepared to instantiate.
    1218          34 : NodePtr LocalNode::createNode( const uint32_t type )
    1219             : {
    1220          34 :     LBASSERTINFO( type == NODETYPE_NODE, type );
    1221          34 :     return new Node( type );
    1222             : }
    1223             : 
    // Looks up a node by identifier in the node table.
    // The table is read while holding a ScopedFastRead on _impl->nodes.
    // Returns a null pointer when the identifier is unknown or when the stored
    // node does not report isReachable(); otherwise returns the stored node.
    1224           0 : NodePtr LocalNode::getNode( const NodeID& id ) const
    1225             : {
    1226           0 :     lunchbox::ScopedFastRead mutex( _impl->nodes );
    1227           0 :     NodeHash::const_iterator i = _impl->nodes->find( id );
    1228           0 :     if( i == _impl->nodes->end() || !i->second->isReachable( ))
    1229           0 :         return 0;
    1230           0 :     return i->second;
    1231             : }
    1232             : 
    // Returns all reachable nodes from the node table.
    // The table is iterated while holding a ScopedFastRead on _impl->nodes.
    // An entry equal to this local node is only included when addSelf is true.
    1233         114 : Nodes LocalNode::getNodes( const bool addSelf ) const
    1234             : {
    1235         114 :     Nodes nodes;
    1236         228 :     lunchbox::ScopedFastRead mutex( _impl->nodes );
    1237         334 :     for( NodeHashCIter i = _impl->nodes->begin(); i != _impl->nodes->end(); ++i)
    1238             :     {
    1239         220 :         NodePtr node = i->second;
    1240         220 :         if( node->isReachable() && ( addSelf || node != this ))
    1241         220 :             nodes.push_back( i->second );
    1242         220 :     }
    1243         228 :     return nodes;
    1244             : }
    1245             : 
    // Returns the worker queue of the command thread, as provided by
    // _impl->commandThread->getWorkerQueue().
    1246         146 : CommandQueue* LocalNode::getCommandThreadQueue()
    1247             : {
    1248         146 :     return _impl->commandThread->getWorkerQueue();
    1249             : }
    1250             : 
    // Returns true when the calling thread is the command thread, i.e. when
    // _impl->commandThread->isCurrent() holds.
    1251           8 : bool LocalNode::inCommandThread() const
    1252             : {
    1253           8 :     return _impl->commandThread->isCurrent();
    1254             : }
    1255             : 
    // Returns the current reading of this node's internal clock
    // (_impl->clock.getTime64()).
    // NOTE(review): presumably milliseconds since the clock was created or
    // reset -- confirm against lunchbox::Clock.
    1256       72428 : int64_t LocalNode::getTime64() const
    1257             : {
    1258       72428 :     return _impl->clock.getTime64();
    1259             : }
    1260             : 
    // Returns the current value of the given counter from _impl->counters.
    // The counter value is used as index without a range check.
    1261           0 : ssize_t LocalNode::getCounter( const Counter counter ) const
    1262             : {
    1263           0 :     return _impl->counters[ counter ];
    1264             : }
    1265             : 
    // Interrupts the incoming connection set. The receiver thread handles the
    // resulting EVENT_INTERRUPT in _runReceiverThread() by calling
    // _redispatchCommands(), i.e. pending commands get another dispatch attempt.
    1266         196 : void LocalNode::flushCommands()
    1267             : {
    1268         196 :     _impl->incoming.interrupt();
    1269         196 : }
    1270             : 
    1271             : //----------------------------------------------------------------------
    1272             : // receiver thread functions
    1273             : //----------------------------------------------------------------------
    // Main function of the receiver thread.
    //
    // Announces the node (_initService()), then selects on the incoming
    // connection set for as long as the node is listening and handles each
    // event. After the loop it shuts down: pending commands are dropped, the
    // command thread is joined, all connections and their nodes are closed and
    // the object store and buffer caches are emptied.
    // NOTE(review): the warning about pending commands mentions the "command
    // thread" although it is emitted by the receiver thread.
    1274          50 : void LocalNode::_runReceiverThread()
    1275             : {
    1276          50 :     LB_TS_THREAD( _rcvThread );
    1277          50 :     _initService();
    1278             : 
                           // Consecutive error events; only reset by a data event
    1279          50 :     int nErrors = 0;
    1280       72653 :     while( isListening( ))
    1281             :     {
    1282       72554 :         const ConnectionSet::Event result = _impl->incoming.select();
    1283       72553 :         switch( result )
    1284             :         {
    1285             :             case ConnectionSet::EVENT_CONNECT:
    1286          34 :                 _handleConnect();
    1287          34 :                 break;
    1288             : 
    1289             :             case ConnectionSet::EVENT_DATA:
    1290       72179 :                 nErrors = 0;
    1291       72179 :                 _handleData();
    1292       72179 :                 break;
    1293             : 
    1294             :             case ConnectionSet::EVENT_DISCONNECT:
    1295             :             case ConnectionSet::EVENT_INVALID_HANDLE:
    1296          34 :                 _handleDisconnect();
    1297          34 :                 break;
    1298             : 
    1299             :             case ConnectionSet::EVENT_TIMEOUT:
    1300           0 :                 LBINFO << "select timeout" << std::endl;
    1301           0 :                 break;
    1302             : 
    1303             :             case ConnectionSet::EVENT_ERROR:
                                   // Drop the connection after more than 100 errors without
                                   // intervening data. The counter is not reset here, so
                                   // further errors keep triggering the disconnect.
    1304           0 :                 ++nErrors;
    1305           0 :                 LBWARN << "Connection error during select" << std::endl;
    1306           0 :                 if( nErrors > 100 )
    1307             :                 {
    1308           0 :                     LBWARN << "Too many errors in a row, capping connection"
    1309           0 :                            << std::endl;
    1310           0 :                     _handleDisconnect();
    1311             :                 }
    1312           0 :                 break;
    1313             : 
    1314             :             case ConnectionSet::EVENT_SELECT_ERROR:
                                   // Shares the counter with EVENT_ERROR; no recovery is
                                   // implemented once more than 10 errors accumulated.
    1315           0 :                 LBWARN << "Error during select" << std::endl;
    1316           0 :                 ++nErrors;
    1317           0 :                 if( nErrors > 10 )
    1318             :                 {
    1319           0 :                     LBWARN << "Too many errors in a row" << std::endl;
    1320           0 :                     LBUNIMPLEMENTED;
    1321             :                 }
    1322           0 :                 break;
    1323             : 
    1324             :             case ConnectionSet::EVENT_INTERRUPT:
                                   // Raised by flushCommands(): retry pending commands
    1325         306 :                 _redispatchCommands();
    1326         306 :                 break;
    1327             : 
    1328             :             default:
    1329           0 :                 LBUNIMPLEMENTED;
    1330             :         }
    1331             :     }
    1332             : 
                           // Shutdown: commands which could not be dispatched are discarded
    1333          49 :     if( !_impl->pendingCommands.empty( ))
    1334           0 :         LBWARN << _impl->pendingCommands.size()
    1335           0 :                << " commands pending while leaving command thread" << std::endl;
    1336             : 
    1337          49 :     _impl->pendingCommands.clear();
    1338          49 :     LBCHECK( _impl->commandThread->join( ));
    1339             : 
                           // Remove the sibling end of this node's own pipe connection
                           // before disconnecting the node itself.
    1340          49 :     ConnectionPtr connection = getConnection();
    1341          98 :     PipeConnectionPtr pipe = LBSAFECAST( PipeConnection*, connection.get( ));
    1342          49 :     connection = pipe->getSibling();
    1343          49 :     _removeConnection( connection );
    1344          49 :     _impl->connectionNodes.erase( connection );
    1345          49 :     _disconnect();
    1346             : 
                           // Close whatever is still in the connection set. 'connections' is
                           // a reference; presumably _removeConnection() takes the entry out
                           // of the set, which is what terminates this loop -- confirm.
    1347          49 :     const Connections& connections = _impl->incoming.getConnections();
    1348         171 :     while( !connections.empty( ))
    1349             :     {
    1350          73 :         connection = connections.back();
    1351          73 :         NodePtr node = _impl->connectionNodes[ connection ];
    1352             : 
    1353          73 :         if( node )
    1354          73 :             _closeNode( node );
    1355          73 :         _removeConnection( connection );
    1356          73 :     }
    1357             : 
    1358          49 :     _impl->objectStore->clear();
    1359          49 :     _impl->pendingCommands.clear();
    1360          49 :     _impl->smallBuffers.flush();
    1361          49 :     _impl->bigBuffers.flush();
    1362             : 
    1363         245 :     LBDEBUG << "Leaving receiver thread of " << lunchbox::className( this )
    1364         245 :            << std::endl;
    1365          49 : }
    1366             : 
    // Handles a connect event on the connection currently selected by the
    // incoming set: finishes the pending accept with acceptSync(), immediately
    // starts the next one with acceptNB(), and registers the accepted
    // connection through _addConnection(). A failed accept is only logged.
    1367          34 : void LocalNode::_handleConnect()
    1368             : {
    1369          34 :     ConnectionPtr connection = _impl->incoming.getConnection();
    1370          68 :     ConnectionPtr newConn = connection->acceptSync();
    1371          34 :     connection->acceptNB();
    1372             : 
    1373          34 :     if( newConn )
    1374          34 :         _addConnection( newConn );
    1375             :     else
    1376          34 :         LBINFO << "Received connect event, but accept() failed" << std::endl;
    1377          34 : }
    1378             : 
    // Handles the loss of the connection currently selected by the incoming set.
    //
    // Remaining commands are read first by calling _handleData() until it
    // returns false. If a node is registered for the connection, a local
    // CMD_NODE_REMOVE_NODE command is sent to this node, and the node is either
    // closed (when the lost connection is its main connection) or has the
    // multicast connection removed. Finally the connection itself is removed.
    1379          34 : void LocalNode::_handleDisconnect()
    1380             : {
    1381          34 :     while( _handleData( )) ; // read remaining data off connection
    1382             : 
    1383          34 :     ConnectionPtr connection = _impl->incoming.getConnection();
    1384          34 :     ConnectionNodeHash::iterator i = _impl->connectionNodes.find( connection );
    1385             : 
    1386          34 :     if( i != _impl->connectionNodes.end( ))
    1387             :     {
    1388          34 :         NodePtr node = i->second;
    1389             : 
                               // This manual reference is not released in this function;
                               // presumably the CMD_NODE_REMOVE_NODE handler does -- confirm.
    1390          34 :         node->ref(); // extend lifetime to give cmd handler a chance
    1391             : 
    1392             :         // local command dispatching
    1393             :         OCommand( this, this, CMD_NODE_REMOVE_NODE )
    1394          34 :                 << node.get() << uint32_t( LB_UNDEFINED_UINT32 );
    1395             : 
    1396          34 :         if( node->getConnection() == connection )
    1397          34 :             _closeNode( node );
    1398           0 :         else if( connection->isMulticast( ))
    1399           0 :             node->_removeMulticast( connection );
    1400             :     }
    1401             : 
    1402          34 :     _removeConnection( connection );
    1403          34 : }
    1404             : 
    // Reads one command from the connection currently selected by the incoming
    // set and dispatches it.
    //
    // Both buffer caches are compacted first. The command head is read with
    // _readHead(), wrapped by _setupCommand() and completed by _readTail(). The
    // next non-blocking receive is started before the command is dispatched.
    //
    // Returns true when a complete command was handed to _dispatchCommand(),
    // false when no head was available or the command could not be completed.
    1405       72213 : bool LocalNode::_handleData()
    1406             : {
    1407       72213 :     _impl->smallBuffers.compact();
    1408       72213 :     _impl->bigBuffers.compact();
    1409             : 
    1410       72213 :     ConnectionPtr connection = _impl->incoming.getConnection();
    1411       72213 :     LBASSERT( connection );
    1412             : 
    1413      144426 :     BufferPtr buffer = _readHead( connection );
    1414       72213 :     if( !buffer ) // fluke signal
    1415          68 :         return false;
    1416             : 
    1417      144290 :     ICommand command = _setupCommand( connection, buffer );
    1418       72145 :     const bool gotCommand = _readTail( command, buffer, connection );
    1419       72145 :     LBASSERT( gotCommand );
    1420             : 
    1421             :     // start next receive
    1422      144290 :     BufferPtr nextBuffer = _impl->smallBuffers.alloc( COMMAND_ALLOCSIZE );
    1423       72145 :     connection->recvNB( nextBuffer, COMMAND_MINSIZE );
    1424             : 
    1425       72145 :     if( gotCommand )
    1426             :     {
    1427       72145 :         _dispatchCommand( command );
    1428       72145 :         return true;
    1429             :     }
    1430             : 
    1431           0 :     LBERROR << "Incomplete command read: " << command << std::endl;
    1432       72213 :     return false;
    1433             : }
    1434             : 
    // Finishes the pending receive on the connection and returns the buffer
    // holding the head of the next command.
    //
    // Returns a null buffer in two cases: recvSync() delivered no buffer at all
    // (the incoming set is then marked dirty), or it delivered a buffer without
    // data. In the second case the buffer is emptied and reused for a new
    // non-blocking receive of COMMAND_MINSIZE bytes.
    // NOTE(review): the meaning of the 'false' argument to recvSync() is not
    // visible here -- check the Connection API.
    1435       72213 : BufferPtr LocalNode::_readHead( ConnectionPtr connection )
    1436             : {
    1437       72213 :     BufferPtr buffer;
    1438       72213 :     const bool gotSize = connection->recvSync( buffer, false );
    1439             : 
    1440       72213 :     if( !buffer ) // fluke signal
    1441             :     {
    1442           0 :         LBWARN << "Erronous network event on " << connection->getDescription()
    1443           0 :                << std::endl;
    1444           0 :         _impl->incoming.setDirty();
    1445           0 :         return 0;
    1446             :     }
    1447             : 
    1448       72213 :     if( gotSize )
    1449       72145 :         return buffer;
    1450             : 
    1451             :     // Some systems signal data on dead connections.
    1452          68 :     buffer->setSize( 0 );
    1453          68 :     connection->recvNB( buffer, COMMAND_MINSIZE );
    1454          68 :     return 0;
    1455             : }
    1456             : 
    // Wraps a received buffer into an ICommand with the correct byte order.
    //
    // For a connection with a registered node, swapping is enabled when the
    // node's endianness differs from the local one (COLLAGE_BIGENDIAN), and the
    // node's last-receive time is updated.
    //
    // Without a registered node only the connect, connect-reply and node-id
    // commands are accepted, either in their little endian or in their _BE
    // form. The command is rebuilt with swapping enabled when the form does not
    // match the local byte order, and the command id is stored back in local
    // byte order. Any other command yields LBUNIMPLEMENTED and a
    // default-constructed ICommand.
    1457       72145 : ICommand LocalNode::_setupCommand( ConnectionPtr connection,
    1458             :                                    ConstBufferPtr buffer )
    1459             : {
    1460       72145 :     NodePtr node;
    1461       72145 :     ConnectionNodeHashCIter i = _impl->connectionNodes.find( connection );
    1462       72145 :     if( i != _impl->connectionNodes.end( ))
    1463       72077 :         node = i->second;
    1464       72145 :     LBVERB << "Handle data from " << node << std::endl;
    1465             : 
    1466             : #ifdef COLLAGE_BIGENDIAN
    1467             :     const bool swapping = node ? !node->isBigEndian() : false;
    1468             : #else
    1469       72145 :     const bool swapping = node ? node->isBigEndian() : false;
    1470             : #endif
    1471      144290 :     ICommand command( this, node, buffer, swapping );
    1472             : 
    1473       72145 :     if( node )
    1474             :     {
    1475       72077 :         node->_setLastReceive( getTime64( ));
    1476       72077 :         return command;
    1477             :     }
    1478             : 
                           // No node yet: the command id decides the sender's byte order
    1479          68 :     uint32_t cmd = command.getCommand();
    1480             : #ifdef COLLAGE_BIGENDIAN
    1481             :     lunchbox::byteswap( cmd ); // pre-node commands are sent little endian
    1482             : #endif
    1483          68 :     switch( cmd )
    1484             :     {
    1485             :     case CMD_NODE_CONNECT:
    1486             :     case CMD_NODE_CONNECT_REPLY:
    1487             :     case CMD_NODE_ID:
    1488             : #ifdef COLLAGE_BIGENDIAN
    1489             :         command = ICommand( this, node, buffer, true );
    1490             : #endif
    1491          68 :         break;
    1492             : 
    1493             :     case CMD_NODE_CONNECT_BE:
    1494             :     case CMD_NODE_CONNECT_REPLY_BE:
    1495             :     case CMD_NODE_ID_BE:
    1496             : #ifndef COLLAGE_BIGENDIAN
    1497           0 :         command = ICommand( this, node, buffer, true );
    1498             : #endif
    1499           0 :         break;
    1500             : 
    1501             :     default:
    1502           0 :         LBUNIMPLEMENTED;
    1503           0 :         return ICommand();
    1504             :     }
    1505             : 
    1506          68 :     command.setCommand( cmd ); // reset correctly swapped version
    1507       72213 :     return command;
    1508             : }
    1509             : 
    // Reads the part of the command which did not arrive with its head.
    //
    // Returns true immediately when the buffer already holds command.getSize()
    // bytes. When the command does not fit into the buffer, a buffer from the
    // big-buffer cache takes over the data received so far and 'command' is
    // rebuilt on top of it. The missing bytes are then received synchronously;
    // the result of that receive is returned.
    //
    // 'buffer' is passed by value: after a reallocation the caller's pointer
    // still refers to the old buffer, only 'command' refers to the new one.
    1510       72145 : bool LocalNode::_readTail( ICommand& command, BufferPtr buffer,
    1511             :                            ConnectionPtr connection )
    1512             : {
    1513       72145 :     const uint64_t needed = command.getSize();
    1514       72145 :     if( needed <= buffer->getSize( ))
    1515       62134 :         return true;
    1516             : 
    1517       10011 :     if( needed > buffer->getMaxSize( ))
    1518             :     {
    1519           0 :         LBASSERT( needed > COMMAND_ALLOCSIZE );
    1520           0 :         LBASSERTINFO( needed < LB_BIT48,
    1521             :                       "Out-of-sync network stream: " << command << "?" );
    1522             :         // not enough space for remaining data, alloc and copy to new buffer
    1523           0 :         BufferPtr newBuffer = _impl->bigBuffers.alloc( needed );
    1524           0 :         newBuffer->replace( *buffer );
    1525           0 :         buffer = newBuffer;
    1526             : 
    1527           0 :         command = ICommand( this, command.getRemoteNode(), buffer,
    1528           0 :                             command.isSwapping( ));
    1529             :     }
    1530             : 
    1531             :     // read remaining data
    1532       10011 :     connection->recvNB( buffer, command.getSize() - buffer->getSize( ));
    1533       10011 :     return connection->recvSync( buffer );
    1534             : }
    1535             : 
    // Allocates a command buffer from the node's buffer caches.
    // Requests larger than COMMAND_ALLOCSIZE are served from the big-buffer
    // cache with the requested size; all others get a COMMAND_ALLOCSIZE buffer
    // from the small-buffer cache. Asserts that the caller is the receiver
    // thread or that the receiver thread is stopped.
    1536          35 : BufferPtr LocalNode::allocBuffer( const uint64_t size )
    1537             : {
    1538          35 :     LBASSERT( _impl->receiverThread->isStopped() || _impl->inReceiverThread( ));
    1539             :     BufferPtr buffer = size > COMMAND_ALLOCSIZE ?
    1540             :         _impl->bigBuffers.alloc( size ) :
    1541          35 :         _impl->smallBuffers.alloc( COMMAND_ALLOCSIZE );
    1542          35 :     return buffer;
    1543             : }
    1544             : 
    // Dispatches a command and retries the pending ones afterwards.
    // A command which dispatchCommand() does not accept is appended to
    // _impl->pendingCommands; it is appended after the retry pass so it is not
    // attempted a second time right away.
    1545       72194 : void LocalNode::_dispatchCommand( ICommand& command )
    1546             : {
    1547       72194 :     LBASSERTINFO( command.isValid(), command );
    1548             : 
    1549       72194 :     if( dispatchCommand( command ))
    1550       72194 :         _redispatchCommands();
    1551             :     else
    1552             :     {
    1553           0 :         _redispatchCommands();
    1554           0 :         _impl->pendingCommands.push_back( command );
    1555             :     }
    1556       72194 : }
    1557             : 
    // Routes a command according to its type.
    // COMMANDTYPE_NODE commands go to Dispatcher::dispatchCommand() and always
    // yield true; COMMANDTYPE_OBJECT commands go to the object store, whose
    // result is returned. Any other type aborts via LBABORT.
    // Returns false when the command could not be dispatched yet.
    1558       72231 : bool LocalNode::dispatchCommand( ICommand& command )
    1559             : {
    1560       72231 :     LBVERB << "dispatch " << command << " by " << getNodeID() << std::endl;
    1561       72231 :     LBASSERTINFO( command.isValid(), command );
    1562             : 
    1563       72231 :     const uint32_t type = command.getType();
    1564       72231 :     switch( type )
    1565             :     {
    1566             :         case COMMANDTYPE_NODE:
    1567       71726 :             LBCHECK( Dispatcher::dispatchCommand( command ));
    1568       71726 :             return true;
    1569             : 
    1570             :         case COMMANDTYPE_OBJECT:
    1571         505 :             return _impl->objectStore->dispatchObjectCommand( command );
    1572             : 
    1573             :         default:
    1574           0 :             LBABORT( "Unknown command type " << type << " for " << command );
    1575           0 :             return true;
    1576             :     }
    1577             : }
    1578             : 
    // Retries the dispatch of all commands in _impl->pendingCommands.
    //
    // The list is scanned from the front; the first command which dispatches
    // is erased and the scan restarts, because erasing ends the validity of the
    // iterator in use. The function returns when the list is empty or a
    // complete scan dispatched nothing. Builds without NDEBUG additionally log
    // the number of leftover commands and assert that it stays below 200.
    1579       72499 : void LocalNode::_redispatchCommands()
    1580             : {
    1581       72499 :     bool changes = true;
    1582      144998 :     while( changes && !_impl->pendingCommands.empty( ))
    1583             :     {
    1584           0 :         changes = false;
    1585             : 
    1586           0 :         for( CommandList::iterator i = _impl->pendingCommands.begin();
    1587           0 :              i != _impl->pendingCommands.end(); ++i )
    1588             :         {
    1589           0 :             ICommand& command = *i;
    1590           0 :             LBASSERT( command.isValid( ));
    1591             : 
    1592           0 :             if( dispatchCommand( command ))
    1593             :             {
    1594           0 :                 _impl->pendingCommands.erase( i );
    1595           0 :                 changes = true;
    1596           0 :                 break;
    1597             :             }
    1598             :         }
    1599             :     }
    1600             : 
    1601             : #ifndef NDEBUG
    1602       72499 :     if( !_impl->pendingCommands.empty( ))
    1603           0 :         LBVERB << _impl->pendingCommands.size() << " undispatched commands"
    1604           0 :                << std::endl;
    1605       72499 :     LBASSERT( _impl->pendingCommands.size() < 200 );
    1606             : #endif
    1607       72500 : }
    1608             : 
    // Publishes this node through the discovery service.
    //
    // The current announcement is withdrawn first. When the node has no
    // connection descriptions nothing further happens. Otherwise these keys
    // are set: "co_type" (node type), "co_numPorts" (number of descriptions) and
    // one "co_port<index>" per description holding its string form. The
    // service is then announced with the port of the first description and the
    // node identifier as name.
    1609          50 : void LocalNode::_initService()
    1610             : {
    1611          50 :     LB_TS_SCOPED( _rcvThread );
    1612          50 :     _impl->service->withdraw(); // go silent during k/v update
    1613             : 
    1614          97 :     const ConnectionDescriptions& descs = getConnectionDescriptions();
    1615          50 :     if( descs.empty( ))
    1616          53 :         return;
    1617             : 
                           // One stream is reused for all values; str("") empties it
    1618          94 :     std::ostringstream out;
    1619          47 :     out << getType();
    1620          47 :     _impl->service->set( "co_type", out.str( ));
    1621             : 
    1622          47 :     out.str("");
    1623          47 :     out << descs.size();
    1624          47 :     _impl->service->set( "co_numPorts", out.str( ));
    1625             : 
    1626          94 :     for( ConnectionDescriptionsCIter i = descs.begin(); i != descs.end(); ++i )
    1627             :     {
    1628          47 :         ConnectionDescriptionPtr desc = *i;
    1629          47 :         out.str("");
    1630          47 :         out << "co_port" << i - descs.begin();
    1631          47 :         _impl->service->set( out.str(), desc->toString( ));
    1632          47 :     }
    1633             : 
    1634          94 :     _impl->service->announce( descs.front()->port, getNodeID().getString( ));
    1635             : }
    1636             : 
    // Withdraws this node's announcement from the discovery service.
    1637          49 : void LocalNode::_exitService()
    1638             : {
    1639          49 :     _impl->service->withdraw();
    1640          49 : }
    1641             : 
    // Runs a discovery on all interfaces (servus::Servus::IF_ALL) while holding
    // a write lock on the service, and returns a Zeroconf object built from the
    // service data.
    // NOTE(review): 500 is presumably the browse time in milliseconds --
    // confirm against servus::Servus::discover().
    1642           1 : Zeroconf LocalNode::getZeroconf()
    1643             : {
    1644           1 :     lunchbox::ScopedWrite mutex( _impl->service );
    1645           1 :     _impl->service->discover( servus::Servus::IF_ALL, 500 );
    1646           1 :     return Zeroconf( _impl->service.data );
    1647             : }
    1648             : 
    1649             : 
    1650             : //----------------------------------------------------------------------
    1651             : // command thread functions
    1652             : //----------------------------------------------------------------------
    // Stores threadID in the command thread object and starts the thread.
    // Returns the result of start().
    1653          50 : bool LocalNode::_startCommandThread( const int32_t threadID )
    1654             : {
    1655          50 :     _impl->commandThread->threadID = threadID;
    1656          50 :     return _impl->commandThread->start();
    1657             : }
    1658             : 
    // Forwards the idle notification of the command thread to the object store
    // and returns the object store's result.
    1659       51608 : bool LocalNode::_notifyCommandThreadIdle()
    1660             : {
    1661       51608 :     return _impl->objectStore->notifyCommandThreadIdle();
    1662             : }
    1663             : 
    // Command handler: reads a request identifier from the command and serves
    // that request. The identifier must not be LB_UNDEFINED_UINT32 (asserted).
    // Always returns true.
    1664       20001 : bool LocalNode::_cmdAckRequest( ICommand& command )
    1665             : {
    1666       20001 :     const uint32_t requestID = command.get< uint32_t >();
    1667       20001 :     LBASSERT( requestID != LB_UNDEFINED_UINT32 );
    1668             : 
    1669       20001 :     serveRequest( requestID );
    1670       20001 :     return true;
    1671             : }
    1672             : 
    // Command handler running in the receiver thread: begins the shutdown.
    // The discovery announcement is withdrawn and the node is put into the
    // closing state. The same command object is then relabelled to
    // CMD_NODE_STOP_CMD and dispatched again to stop the command thread.
    // Always returns true.
    1673          49 : bool LocalNode::_cmdStopRcv( ICommand& command )
    1674             : {
    1675          49 :     LB_TS_THREAD( _rcvThread );
    1676          49 :     LBASSERT( isListening( ));
    1677             : 
    1678          49 :     _exitService();
    1679          49 :     _setClosing(); // causes rcv thread exit
    1680             : 
    1681          49 :     command.setCommand( CMD_NODE_STOP_CMD ); // causes cmd thread exit
    1682          49 :     _dispatchCommand( command );
    1683          49 :     return true;
    1684             : }
    1685             : 
    // Command handler running in the command thread: completes the shutdown by
    // moving the node from the closing state (asserted) to the closed state.
    // The command itself carries no data which is read here.
    // Always returns true.
    1686          49 : bool LocalNode::_cmdStopCmd( ICommand& )
    1687             : {
    1688          49 :     LB_TS_THREAD( _cmdThread );
    1689          49 :     LBASSERTINFO( isClosing(), *this );
    1690             : 
    1691          49 :     _setClosed();
    1692          49 :     return true;
    1693             : }
    1694             : 
    // Command handler: reads an affinity value from the command and passes it
    // to lunchbox::Thread::setAffinity(). Always returns true.
    // NOTE(review): presumably this affects the thread executing the handler --
    // confirm against lunchbox::Thread.
    1695           0 : bool LocalNode::_cmdSetAffinity( ICommand& command )
    1696             : {
    1697           0 :     const int32_t affinity = command.get< int32_t >();
    1698             : 
    1699           0 :     lunchbox::Thread::setAffinity( affinity );
    1700           0 :     return true;
    1701             : }
    1702             : 
    // Command handler for a connect request from a not yet known peer; runs in
    // the receiver thread.
    //
    // The command carries the peer's node id, the request id of the peer, the
    // node type and the serialized node data. The request is refused -- by
    // replying with a default-constructed NodeID and removing the connection
    // -- when a reachable node with that id already exists or when no node of
    // the requested type can be created. Otherwise the peer node (a known but
    // unreachable node is reused) is initialized from the data and registered
    // for the connection and in the node table, and this node's id, type and
    // serialized data are sent back.
    //
    // The reply uses CMD_NODE_CONNECT_REPLY, or the byte-swapped _BE variant
    // on big endian builds. Always returns true.
    1703          34 : bool LocalNode::_cmdConnect( ICommand& command )
    1704             : {
    1705          34 :     LBASSERTINFO( !command.getRemoteNode(), command );
    1706          34 :     LBASSERT( _impl->inReceiverThread( ));
    1707             : 
                           // The read order has to match the order used by the sender
    1708          34 :     const NodeID& nodeID = command.get< NodeID >();
    1709          34 :     const uint32_t requestID = command.get< uint32_t >();
    1710          34 :     const uint32_t nodeType = command.get< uint32_t >();
    1711          34 :     std::string data = command.get< std::string >();
    1712             : 
    1713          34 :     LBVERB << "handle connect " << command << " req " << requestID << " type "
    1714          34 :            << nodeType << " data " << data << std::endl;
    1715             : 
    1716          68 :     ConnectionPtr connection = _impl->incoming.getConnection();
    1717             : 
    1718          34 :     LBASSERT( connection );
    1719          34 :     LBASSERT( nodeID != getNodeID() );
    1720          34 :     LBASSERT( _impl->connectionNodes.find( connection ) ==
    1721             :               _impl->connectionNodes.end( ));
    1722             : 
    1723          68 :     NodePtr peer;
    1724             : #ifdef COLLAGE_BIGENDIAN
    1725             :     uint32_t cmd = CMD_NODE_CONNECT_REPLY_BE;
    1726             :     lunchbox::byteswap( cmd );
    1727             : #else
    1728          34 :     const uint32_t cmd = CMD_NODE_CONNECT_REPLY;
    1729             : #endif
    1730             : 
    1731             :     // No locking needed, only recv thread modifies
    1732          34 :     NodeHashCIter i = _impl->nodes->find( nodeID );
    1733          34 :     if( i != _impl->nodes->end( ))
    1734             :     {
    1735           0 :         peer = i->second;
    1736           0 :         if( peer->isReachable( ))
    1737             :         {
    1738             :             // Node exists, probably simultaneous connect from peer
    1739           0 :             LBINFO << "Already got node " << nodeID << ", refusing connect"
    1740           0 :                    << std::endl;
    1741             : 
    1742             :             // refuse connection
    1743             :             OCommand( Connections( 1, connection ), cmd )
    1744           0 :                 << NodeID() << requestID;
    1745             : 
    1746             :             // NOTE: There is no close() here. The reply command above has to be
    1747             :             // received by the peer first, before closing the connection.
    1748           0 :             _removeConnection( connection );
    1749           0 :             return true;
    1750             :         }
    1751             :     }
    1752             : 
    1753             :     // create and add connected node
    1754          34 :     if( !peer )
    1755          34 :         peer = createNode( nodeType );
    1756          34 :     if( !peer )
    1757             :     {
    1758           0 :         LBDEBUG << "Can't create node of type " << nodeType << ", disconnecting"
    1759           0 :                << std::endl;
    1760             : 
    1761             :         // refuse connection
    1762           0 :         OCommand( Connections( 1, connection ), cmd ) << NodeID() << requestID;
    1763             : 
    1764             :         // NOTE: There is no close() here. The reply command above has to be
    1765             :         // received by the peer first, before closing the connection.
    1766           0 :         _removeConnection( connection );
    1767           0 :         return true;
    1768             :     }
    1769             : 
                           // A failed deserialization is only logged; the peer is added anyway
    1770          34 :     if( !peer->deserialize( data ))
    1771           0 :         LBWARN << "Error during node initialization" << std::endl;
    1772          34 :     LBASSERTINFO( data.empty(), data );
    1773          34 :     LBASSERTINFO( peer->getNodeID() == nodeID,
    1774             :                   peer->getNodeID() << "!=" << nodeID );
    1775          34 :     LBASSERT( peer->getType() == nodeType );
    1776             : 
    1777          34 :     _impl->connectionNodes[ connection ] = peer;
    1778             :     {
                               // The node table is read by other threads, hence the write lock
    1779          34 :         lunchbox::ScopedFastWrite mutex( _impl->nodes );
    1780          34 :         _impl->nodes.data[ peer->getNodeID() ] = peer;
    1781             :     }
    1782          34 :     LBVERB << "Added node " << nodeID << std::endl;
    1783             : 
    1784             :     // send our information as reply
    1785             :     OCommand( Connections( 1, connection ), cmd )
    1786          34 :         << getNodeID() << requestID << getType() << serialize();
    1787             : 
    1788          68 :     return true;
    1789             : }
    1790             : 
    1791          34 : bool LocalNode::_cmdConnectReply( ICommand& command )
    1792             : {
    1793          34 :     LBASSERT( !command.getRemoteNode( ));
    1794          34 :     LBASSERT( _impl->inReceiverThread( ));
    1795             : 
    1796          34 :     ConnectionPtr connection = _impl->incoming.getConnection();
    1797          34 :     LBASSERT( _impl->connectionNodes.find( connection ) ==
    1798             :               _impl->connectionNodes.end( ));
    1799             : 
    1800          34 :     const NodeID& nodeID = command.get< NodeID >();
    1801          34 :     const uint32_t requestID = command.get< uint32_t >();
    1802             : 
    1803             :     // connection refused
    1804          34 :     if( nodeID == 0 )
    1805             :     {
    1806           0 :         LBINFO << "Connection refused, node already connected by peer"
    1807           0 :                << std::endl;
    1808             : 
    1809           0 :         _removeConnection( connection );
    1810           0 :         serveRequest( requestID, false );
    1811           0 :         return true;
    1812             :     }
    1813             : 
    1814          34 :     const uint32_t nodeType = command.get< uint32_t >();
    1815          68 :     std::string data = command.get< std::string >();
    1816             : 
    1817          34 :     LBVERB << "handle connect reply " << command << " req " << requestID
    1818          34 :            << " type " << nodeType << " data " << data << std::endl;
    1819             : 
    1820             :     // No locking needed, only recv thread modifies
    1821          34 :     NodeHash::const_iterator i = _impl->nodes->find( nodeID );
    1822          68 :     NodePtr peer;
    1823          34 :     if( i != _impl->nodes->end( ))
    1824           0 :         peer = i->second;
    1825             : 
    1826          34 :     if( peer && peer->isReachable( )) // simultaneous connect
    1827             :     {
    1828           0 :         LBINFO << "Closing simultaneous connection from " << peer << " on "
    1829           0 :                << connection << std::endl;
    1830             : 
    1831           0 :         _removeConnection( connection );
    1832           0 :         _closeNode( peer );
    1833           0 :         serveRequest( requestID, false );
    1834           0 :         return true;
    1835             :     }
    1836             : 
    1837             :     // create and add node
    1838          34 :     if( !peer )
    1839             :     {
    1840          34 :         if( requestID != LB_UNDEFINED_UINT32 )
    1841          34 :             peer = reinterpret_cast< Node* >( getRequestData( requestID ));
    1842             :         else
    1843           0 :             peer = createNode( nodeType );
    1844             :     }
    1845          34 :     if( !peer )
    1846             :     {
    1847           0 :         LBINFO << "Can't create node of type " << nodeType << ", disconnecting"
    1848           0 :                << std::endl;
    1849           0 :         _removeConnection( connection );
    1850           0 :         return true;
    1851             :     }
    1852             : 
    1853          34 :     LBASSERTINFO( peer->getType() == nodeType,
    1854             :                   peer->getType() << " != " << nodeType );
    1855          34 :     LBASSERT( peer->isClosed( ));
    1856             : 
    1857          34 :     if( !peer->deserialize( data ))
    1858           0 :         LBWARN << "Error during node initialization" << std::endl;
    1859          34 :     LBASSERT( data.empty( ));
    1860          34 :     LBASSERT( peer->getNodeID() == nodeID );
    1861             : 
    1862             :     // send ACK to peer
    1863             :     // cppcheck-suppress unusedScopedObject
    1864          34 :     OCommand( Connections( 1, connection ), CMD_NODE_CONNECT_ACK );
    1865             : 
    1866          34 :     peer->_connect( connection );
    1867          34 :     _impl->connectionNodes[ connection ] = peer;
    1868             :     {
    1869          34 :         lunchbox::ScopedFastWrite mutex( _impl->nodes );
    1870          34 :         _impl->nodes.data[ peer->getNodeID() ] = peer;
    1871             :     }
    1872          34 :     _connectMulticast( peer );
    1873          34 :     LBVERB << "Added node " << nodeID << std::endl;
    1874             : 
    1875          34 :     serveRequest( requestID, true );
    1876             : 
    1877          34 :     notifyConnect( peer );
    1878          68 :     return true;
    1879             : }
    1880             : 
    1881          34 : bool LocalNode::_cmdConnectAck( ICommand& command )
    1882             : {
    1883          34 :     NodePtr node = command.getRemoteNode();
    1884          34 :     LBASSERT( node );
    1885          34 :     LBASSERT( _impl->inReceiverThread( ));
    1886          34 :     LBVERB << "handle connect ack" << std::endl;
    1887             : 
    1888          34 :     node->_connect( _impl->incoming.getConnection( ));
    1889          34 :     _connectMulticast( node );
    1890          34 :     notifyConnect( node );
    1891          34 :     return true;
    1892             : }
    1893             : 
    1894           0 : bool LocalNode::_cmdID( ICommand& command )
    1895             : {
    1896           0 :     LBASSERT( _impl->inReceiverThread( ));
    1897             : 
    1898           0 :     const NodeID& nodeID = command.get< NodeID >();
    1899           0 :     uint32_t nodeType = command.get< uint32_t >();
    1900           0 :     std::string data = command.get< std::string >();
    1901             : 
    1902           0 :     if( command.getRemoteNode( ))
    1903             :     {
    1904           0 :         LBASSERT( nodeID == command.getRemoteNode()->getNodeID( ));
    1905           0 :         LBASSERT( command.getRemoteNode()->_getMulticast( ));
    1906           0 :         return true;
    1907             :     }
    1908             : 
    1909           0 :     LBDEBUG << "handle ID " << command << " node " << nodeID << std::endl;
    1910             : 
    1911           0 :     ConnectionPtr connection = _impl->incoming.getConnection();
    1912           0 :     LBASSERT( connection->isMulticast( ));
    1913           0 :     LBASSERT( _impl->connectionNodes.find( connection ) ==
    1914             :               _impl->connectionNodes.end( ));
    1915             : 
    1916           0 :     NodePtr node;
    1917           0 :     if( nodeID == getNodeID() ) // 'self' multicast connection
    1918           0 :         node = this;
    1919             :     else
    1920             :     {
    1921             :         // No locking needed, only recv thread writes
    1922           0 :         NodeHash::const_iterator i = _impl->nodes->find( nodeID );
    1923             : 
    1924           0 :         if( i == _impl->nodes->end( ))
    1925             :         {
    1926             :             // unknown node: create and add unconnected node
    1927           0 :             node = createNode( nodeType );
    1928             : 
    1929           0 :             if( !node->deserialize( data ))
    1930           0 :                 LBWARN << "Error during node initialization" << std::endl;
    1931           0 :             LBASSERTINFO( data.empty(), data );
    1932             : 
    1933             :             {
    1934           0 :                 lunchbox::ScopedFastWrite mutex( _impl->nodes );
    1935           0 :                 _impl->nodes.data[ nodeID ] = node;
    1936             :             }
    1937           0 :             LBVERB << "Added node " << nodeID << " with multicast "
    1938           0 :                    << connection << std::endl;
    1939             :         }
    1940             :         else
    1941           0 :             node = i->second;
    1942             :     }
    1943           0 :     LBASSERT( node );
    1944           0 :     LBASSERTINFO( node->getNodeID() == nodeID,
    1945             :                   node->getNodeID() << "!=" << nodeID );
    1946             : 
    1947           0 :     _connectMulticast( node, connection );
    1948           0 :     _impl->connectionNodes[ connection ] = node;
    1949           0 :     LBDEBUG << "Added multicast connection " << connection << " from " << nodeID
    1950           0 :            << " to " << getNodeID() << std::endl;
    1951           0 :     return true;
    1952             : }
    1953             : 
    1954           8 : bool LocalNode::_cmdDisconnect( ICommand& command )
    1955             : {
    1956           8 :     LBASSERT( _impl->inReceiverThread( ));
    1957             : 
    1958           8 :     const uint32_t requestID = command.get< uint32_t >();
    1959             : 
    1960           8 :     NodePtr node = static_cast<Node*>( getRequestData( requestID ));
    1961           8 :     LBASSERT( node );
    1962             : 
    1963           8 :     _closeNode( node );
    1964           8 :     LBASSERT( node->isClosed( ));
    1965           8 :     serveRequest( requestID );
    1966           8 :     return true;
    1967             : }
    1968             : 
    1969           0 : bool LocalNode::_cmdGetNodeData( ICommand& command )
    1970             : {
    1971           0 :     const NodeID& nodeID = command.get< NodeID >();
    1972           0 :     const uint32_t requestID = command.get< uint32_t >();
    1973             : 
    1974           0 :     LBVERB << "cmd get node data: " << command << " req " << requestID
    1975           0 :            << " nodeID " << nodeID << std::endl;
    1976             : 
    1977           0 :     NodePtr node = getNode( nodeID );
    1978           0 :     NodePtr toNode = command.getRemoteNode();
    1979             : 
    1980           0 :     uint32_t nodeType = NODETYPE_INVALID;
    1981           0 :     std::string nodeData;
    1982           0 :     if( node )
    1983             :     {
    1984           0 :         nodeType = node->getType();
    1985           0 :         nodeData = node->serialize();
    1986           0 :         LBDEBUG << "Sent node data '" << nodeData << "' for " << nodeID << " to "
    1987           0 :                << toNode << std::endl;
    1988             :     }
    1989             : 
    1990             :     toNode->send( CMD_NODE_GET_NODE_DATA_REPLY )
    1991           0 :         << nodeID << requestID << nodeType << nodeData;
    1992           0 :     return true;
    1993             : }
    1994             : 
    1995           0 : bool LocalNode::_cmdGetNodeDataReply( ICommand& command )
    1996             : {
    1997           0 :     LBASSERT( _impl->inReceiverThread( ));
    1998             : 
    1999           0 :     const NodeID& nodeID = command.get< NodeID >();
    2000           0 :     const uint32_t requestID = command.get< uint32_t >();
    2001           0 :     const uint32_t nodeType = command.get< uint32_t >();
    2002           0 :     std::string nodeData = command.get< std::string >();
    2003             : 
    2004           0 :     LBVERB << "cmd get node data reply: " << command << " req " << requestID
    2005           0 :            << " type " << nodeType << " data " << nodeData << std::endl;
    2006             : 
    2007             :     // No locking needed, only recv thread writes
    2008           0 :     NodeHash::const_iterator i = _impl->nodes->find( nodeID );
    2009           0 :     if( i != _impl->nodes->end( ))
    2010             :     {
    2011             :         // Requested node connected to us in the meantime
    2012           0 :         NodePtr node = i->second;
    2013             : 
    2014           0 :         node->ref( this );
    2015           0 :         serveRequest( requestID, node.get( ));
    2016           0 :         return true;
    2017             :     }
    2018             : 
    2019           0 :     if( nodeType == NODETYPE_INVALID )
    2020             :     {
    2021           0 :         serveRequest( requestID, (void*)0 );
    2022           0 :         return true;
    2023             :     }
    2024             : 
    2025             :     // new node: create and add unconnected node
    2026           0 :     NodePtr node = createNode( nodeType );
    2027           0 :     if( node )
    2028             :     {
    2029           0 :         LBASSERT( node );
    2030             : 
    2031           0 :         if( !node->deserialize( nodeData ))
    2032           0 :             LBWARN << "Failed to initialize node data" << std::endl;
    2033           0 :         LBASSERT( nodeData.empty( ));
    2034           0 :         node->ref( this );
    2035             :     }
    2036             :     else
    2037           0 :         LBINFO << "Can't create node of type " << nodeType << std::endl;
    2038             : 
    2039           0 :     serveRequest( requestID, node.get( ));
    2040           0 :     return true;
    2041             : }
    2042             : 
    2043           0 : bool LocalNode::_cmdAcquireSendToken( ICommand& command )
    2044             : {
    2045           0 :     LBASSERT( inCommandThread( ));
    2046           0 :     if( !_impl->sendToken ) // enqueue command if no token available
    2047             :     {
    2048           0 :         const uint32_t timeout = Global::getTimeout();
    2049           0 :         if( timeout == LB_TIMEOUT_INDEFINITE ||
    2050           0 :             ( getTime64() - _impl->lastSendToken <= timeout ))
    2051             :         {
    2052           0 :             _impl->sendTokenQueue.push_back( command );
    2053           0 :             return true;
    2054             :         }
    2055             : 
    2056             :         // timeout! - clear old requests
    2057           0 :         _impl->sendTokenQueue.clear();
    2058             :         // 'generate' new token - release is robust
    2059             :     }
    2060             : 
    2061           0 :     _impl->sendToken = false;
    2062             : 
    2063           0 :     const uint32_t requestID = command.get< uint32_t >();
    2064           0 :     command.getRemoteNode()->send( CMD_NODE_ACQUIRE_SEND_TOKEN_REPLY )
    2065           0 :             << requestID;
    2066           0 :     return true;
    2067             : }
    2068             : 
    2069           0 : bool LocalNode::_cmdAcquireSendTokenReply( ICommand& command )
    2070             : {
    2071           0 :     const uint32_t requestID = command.get< uint32_t >();
    2072           0 :     serveRequest( requestID );
    2073           0 :     return true;
    2074             : }
    2075             : 
    2076           0 : bool LocalNode::_cmdReleaseSendToken( ICommand& )
    2077             : {
    2078           0 :     LBASSERT( inCommandThread( ));
    2079           0 :     _impl->lastSendToken = getTime64();
    2080             : 
    2081           0 :     if( _impl->sendToken )
    2082           0 :         return true; // double release due to timeout
    2083           0 :     if( _impl->sendTokenQueue.empty( ))
    2084             :     {
    2085           0 :         _impl->sendToken = true;
    2086           0 :         return true;
    2087             :     }
    2088             : 
    2089           0 :     ICommand& request = _impl->sendTokenQueue.front();
    2090             : 
    2091           0 :     const uint32_t requestID = request.get< uint32_t >();
    2092           0 :     request.getRemoteNode()->send( CMD_NODE_ACQUIRE_SEND_TOKEN_REPLY )
    2093           0 :             << requestID;
    2094           0 :     _impl->sendTokenQueue.pop_front();
    2095           0 :     return true;
    2096             : }
    2097             : 
    2098           0 : bool LocalNode::_cmdAddListener( ICommand& command )
    2099             : {
    2100             :     Connection* rawConnection =
    2101           0 :         reinterpret_cast< Connection* >( command.get< uint64_t >( ));
    2102           0 :     std::string data = command.get< std::string >();
    2103             : 
    2104           0 :     ConnectionDescriptionPtr description = new ConnectionDescription( data );
    2105           0 :     command.getRemoteNode()->_addConnectionDescription( description );
    2106             : 
    2107           0 :     if( command.getRemoteNode() != this )
    2108           0 :         return true;
    2109             : 
    2110           0 :     ConnectionPtr connection = rawConnection;
    2111           0 :     connection->unref();
    2112           0 :     LBASSERT( connection );
    2113             : 
    2114           0 :     _impl->connectionNodes[ connection ] = this;
    2115           0 :     if( connection->isMulticast( ))
    2116           0 :         _addMulticast( this, connection );
    2117             : 
    2118           0 :     connection->acceptNB();
    2119           0 :     _impl->incoming.addConnection( connection );
    2120             : 
    2121           0 :     _initService(); // update zeroconf
    2122           0 :     return true;
    2123             : }
    2124             : 
    2125           0 : bool LocalNode::_cmdRemoveListener( ICommand& command )
    2126             : {
    2127           0 :     const uint32_t requestID = command.get< uint32_t >();
    2128           0 :     Connection* rawConnection = command.get< Connection* >();
    2129           0 :     std::string data = command.get< std::string >();
    2130             : 
    2131           0 :     ConnectionDescriptionPtr description = new ConnectionDescription( data );
    2132           0 :     LBCHECK(
    2133             :           command.getRemoteNode()->_removeConnectionDescription( description ));
    2134             : 
    2135           0 :     if( command.getRemoteNode() != this )
    2136           0 :         return true;
    2137             : 
    2138           0 :     _initService(); // update zeroconf
    2139             : 
    2140           0 :     ConnectionPtr connection = rawConnection;
    2141           0 :     connection->unref( this );
    2142           0 :     LBASSERT( connection );
    2143             : 
    2144           0 :     if( connection->isMulticast( ))
    2145           0 :         _removeMulticast( connection );
    2146             : 
    2147           0 :     _impl->incoming.removeConnection( connection );
    2148           0 :     LBASSERT( _impl->connectionNodes.find( connection ) !=
    2149             :               _impl->connectionNodes.end( ));
    2150           0 :     _impl->connectionNodes.erase( connection );
    2151           0 :     serveRequest( requestID );
    2152           0 :     return true;
    2153             : }
    2154             : 
    2155           0 : bool LocalNode::_cmdPing( ICommand& command )
    2156             : {
    2157           0 :     LBASSERT( inCommandThread( ));
    2158           0 :     command.getRemoteNode()->send( CMD_NODE_PING_REPLY );
    2159           0 :     return true;
    2160             : }
    2161             : 
    2162           2 : bool LocalNode::_cmdCommand( ICommand& command )
    2163             : {
    2164           2 :     const uint128_t& commandID = command.get< uint128_t >();
    2165           2 :     CommandHandler func;
    2166             :     {
    2167           2 :         lunchbox::ScopedFastRead mutex( _impl->commandHandlers );
    2168           2 :         CommandHashCIter i = _impl->commandHandlers->find( commandID );
    2169           2 :         if( i == _impl->commandHandlers->end( ))
    2170           0 :             return false;
    2171             : 
    2172           2 :         CommandQueue* queue = i->second.second;
    2173           2 :         if( queue )
    2174             :         {
    2175             :             command.setDispatchFunction( CmdFunc( this,
    2176           1 :                                                 &LocalNode::_cmdCommandAsync ));
    2177           1 :             queue->push( command );
    2178           1 :             return true;
    2179             :         }
    2180             :         // else
    2181             : 
    2182           1 :         func = i->second.first;
    2183             :     }
    2184             : 
    2185           2 :     CustomICommand customCmd( command );
    2186           3 :     return func( customCmd );
    2187             : }
    2188             : 
    2189           1 : bool LocalNode::_cmdCommandAsync( ICommand& command )
    2190             : {
    2191           1 :     const uint128_t& commandID = command.get< uint128_t >();
    2192           1 :     CommandHandler func;
    2193             :     {
    2194           1 :         lunchbox::ScopedFastRead mutex( _impl->commandHandlers );
    2195           1 :         CommandHashCIter i = _impl->commandHandlers->find( commandID );
    2196           1 :         LBASSERT( i != _impl->commandHandlers->end( ));
    2197           1 :         if( i ==  _impl->commandHandlers->end( ))
    2198           0 :             return true; // deregistered between dispatch and now
    2199           1 :         func = i->second.first;
    2200             :     }
    2201           2 :     CustomICommand customCmd( command );
    2202           2 :     return func( customCmd );
    2203             : }
    2204             : 
    2205          34 : bool LocalNode::_cmdAddConnection( ICommand& command )
    2206             : {
    2207          34 :     LBASSERT( _impl->inReceiverThread( ));
    2208             : 
    2209          34 :     ConnectionPtr connection = command.get< ConnectionPtr >();
    2210          34 :     _addConnection( connection );
    2211          34 :     connection->unref(); // ref'd by _addConnection
    2212          34 :     return true;
    2213             : }
    2214             : 
    2215          66 : }

Generated by: LCOV version 1.11