LCOV - code coverage report
Current view: top level - co - localNode.cpp (source / functions) Hit Total Coverage
Test: Collage Lines: 606 1195 50.7 %
Date: 2016-12-14 01:26:48 Functions: 75 111 67.6 %

          Line data    Source code
       1             : 
       2             : /* Copyright (c) 2005-2016, Stefan Eilemann <eile@equalizergraphics.com>
       3             :  *                          Cedric Stalder <cedric.stalder@gmail.com>
       4             :  *                          Daniel Nachbaur <danielnachbaur@gmail.com>
       5             :  *
       6             :  * This file is part of Collage <https://github.com/Eyescale/Collage>
       7             :  *
       8             :  * This library is free software; you can redistribute it and/or modify it under
       9             :  * the terms of the GNU Lesser General Public License version 2.1 as published
      10             :  * by the Free Software Foundation.
      11             :  *
      12             :  * This library is distributed in the hope that it will be useful, but WITHOUT
      13             :  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
      14             :  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
      15             :  * details.
      16             :  *
      17             :  * You should have received a copy of the GNU Lesser General Public License
      18             :  * along with this library; if not, write to the Free Software Foundation, Inc.,
      19             :  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
      20             :  */
      21             : 
      22             : #include "localNode.h"
      23             : 
      24             : #include "buffer.h"
      25             : #include "bufferCache.h"
      26             : #include "commandQueue.h"
      27             : #include "connectionDescription.h"
      28             : #include "connectionSet.h"
      29             : #include "customICommand.h"
      30             : #include "dataIStream.h"
      31             : #include "exception.h"
      32             : #include "global.h"
      33             : #include "iCommand.h"
      34             : #include "nodeCommand.h"
      35             : #include "oCommand.h"
      36             : #include "object.h"
      37             : #include "objectICommand.h"
      38             : #include "objectStore.h"
      39             : #include "pipeConnection.h"
      40             : #include "sendToken.h"
      41             : #include "worker.h"
      42             : #include "zeroconf.h"
      43             : 
      44             : #include <lunchbox/clock.h>
      45             : #include <lunchbox/futureFunction.h>
      46             : #include <lunchbox/hash.h>
      47             : #include <lunchbox/launcher.h>
      48             : #include <lunchbox/lockable.h>
      49             : #include <lunchbox/request.h>
      50             : #include <lunchbox/rng.h>
      51             : #include <lunchbox/servus.h>
      52             : #include <lunchbox/sleep.h>
      53             : 
      54             : #include <boost/bind.hpp>
      55             : #include <boost/date_time/posix_time/posix_time.hpp>
      56             : #include <boost/lexical_cast.hpp>
      57             : 
      58             : #include <list>
      59             : #ifdef _MSC_VER
      60             : #  include <direct.h>  // for chdir
      61             : #  define chdir _chdir
      62             : #endif
      63             : 
      64             : namespace co
      65             : {
      66             : namespace
      67             : {
      68          22 : lunchbox::a_int32_t _threadIDs;
      69             : 
      70             : typedef CommandFunc< LocalNode > CmdFunc;
      71             : typedef std::list< ICommand > CommandList;
      72             : typedef lunchbox::RefPtrHash< Connection, NodePtr > ConnectionNodeHash;
      73             : typedef ConnectionNodeHash::const_iterator ConnectionNodeHashCIter;
      74             : typedef ConnectionNodeHash::iterator ConnectionNodeHashIter;
      75             : typedef stde::hash_map< uint128_t, NodePtr > NodeHash;
      76             : typedef NodeHash::const_iterator NodeHashCIter;
      77             : typedef stde::hash_map< uint128_t, LocalNode::PushHandler > HandlerHash;
      78             : typedef HandlerHash::const_iterator HandlerHashCIter;
      79             : typedef std::pair< LocalNode::CommandHandler, CommandQueue* > CommandPair;
      80             : typedef stde::hash_map< uint128_t, CommandPair > CommandHash;
      81             : typedef CommandHash::const_iterator CommandHashCIter;
      82             : typedef lunchbox::FutureFunction< bool > FuturebImpl;
      83             : }
      84             : 
      85             : namespace detail
      86             : {
      87         106 : class ReceiverThread : public lunchbox::Thread
      88             : {
      89             : public:
      90          55 :     explicit ReceiverThread( co::LocalNode* localNode )
      91          55 :         : _localNode( localNode ) {}
      92             : 
      93          50 :     bool init() override
      94             :     {
      95          50 :         const int32_t threadID = ++_threadIDs - 1;
      96         100 :         setName( std::string( "Rcv" ) +
      97         150 :                  boost::lexical_cast< std::string >( threadID ));
      98          50 :         return _localNode->_startCommandThread( threadID );
      99             :     }
     100             : 
     101          50 :     void run() override { _localNode->_runReceiverThread(); }
     102             : 
     103             : private:
     104             :     co::LocalNode* const _localNode;
     105             : };
     106             : 
     107         106 : class CommandThread : public Worker
     108             : {
     109             : public:
     110          55 :     explicit CommandThread( co::LocalNode* localNode )
     111          55 :         : Worker( Global::getCommandQueueLimit( ))
     112             :         , threadID( 0 )
     113          55 :         , _localNode( localNode )
     114          55 :     {}
     115             : 
     116             :     int32_t threadID;
     117             : 
     118             : protected:
     119          50 :     bool init() override
     120             :     {
     121         100 :         setName( std::string( "Cmd" ) +
     122         150 :                  boost::lexical_cast< std::string >( threadID ));
     123          50 :         return true;
     124             :     }
     125             : 
     126      103343 :     bool stopRunning() override { return _localNode->isClosed(); }
     127       51574 :     bool notifyIdle() override { return _localNode->_notifyCommandThreadIdle();}
     128             : 
     129             : private:
     130             :     co::LocalNode* const _localNode;
     131             : };
     132             : 
     133             : class LocalNode
     134             : {
     135             : public:
     136          55 :     LocalNode()
     137          55 :         : smallBuffers( 200 )
     138             :         , bigBuffers( 20 )
     139             :         , sendToken( true )
     140             :         , lastSendToken( 0 )
     141             :         , objectStore( 0 )
     142             :         , receiverThread( 0 )
     143             :         , commandThread( 0 )
     144          55 :         , service( "_collage._tcp" )
     145          55 :     {}
     146             : 
     147          53 :     ~LocalNode()
     148          53 :     {
     149          53 :         LBASSERT( incoming.isEmpty( ));
     150          53 :         LBASSERT( connectionNodes.empty( ));
     151          53 :         LBASSERT( pendingCommands.empty( ));
     152          53 :         LBASSERT( nodes->empty( ));
     153             : 
     154          53 :         delete objectStore;
     155          53 :         objectStore = 0;
     156          53 :         LBASSERT( !commandThread->isRunning( ));
     157          53 :         delete commandThread;
     158          53 :         commandThread = 0;
     159             : 
     160          53 :         LBASSERT( !receiverThread->isRunning( ));
     161          53 :         delete receiverThread;
     162          53 :         receiverThread = 0;
     163          53 :     }
     164             : 
     165         280 :     bool inReceiverThread() const { return receiverThread->isCurrent(); }
     166             : 
     167             :     /** Commands re-scheduled for dispatch. */
     168             :     CommandList pendingCommands;
     169             : 
     170             :     /** The command buffer 'allocator' for small packets */
     171             :     co::BufferCache smallBuffers;
     172             : 
     173             :     /** The command buffer 'allocator' for big packets */
     174             :     co::BufferCache bigBuffers;
     175             : 
     176             :     bool sendToken; //!< send token availability.
     177             :     uint64_t lastSendToken; //!< last used time for timeout detection
     178             :     std::deque< co::ICommand > sendTokenQueue; //!< pending requests
     179             : 
     180             :     /** Manager of distributed object */
     181             :     ObjectStore* objectStore;
     182             : 
     183             :     /** Needed for thread-safety during nodeID-based connect() */
     184             :     lunchbox::Lock connectLock;
     185             : 
     186             :     /** The node for each connection. */
     187             :     ConnectionNodeHash connectionNodes; // read and write: recv only
     188             : 
     189             :     /** The connected nodes. */
     190             :     lunchbox::Lockable< NodeHash, lunchbox::SpinLock > nodes; // r: all, w: recv
     191             : 
     192             :     /** The connection set of all connections from/to this node. */
     193             :     co::ConnectionSet incoming;
     194             : 
     195             :     /** The process-global clock. */
     196             :     lunchbox::Clock clock;
     197             : 
     198             :     /** The registered push handlers. */
     199             :     lunchbox::Lockable< HandlerHash, lunchbox::Lock > pushHandlers;
     200             : 
     201             :     /** The registered custom command handlers. */
     202             :     lunchbox::Lockable< CommandHash, lunchbox::SpinLock > commandHandlers;
     203             : 
     204             :     ReceiverThread* receiverThread;
     205             :     CommandThread* commandThread;
     206             : 
     207             :     lunchbox::Lockable< servus::Servus > service;
     208             : 
     209             :     a_ssize_t counters[ co::LocalNode::COUNTER_ALL ]; //!< Performance counters
     210             : 
     211             :     Strings args; //!< Command line arguments for remote node launch()
     212             : };
     213             : }
     214             : 
     215          55 : LocalNode::LocalNode( const uint32_t type )
     216             :     : Node( type )
     217          55 :     , _impl( new detail::LocalNode )
     218             : {
     219          55 :     _impl->receiverThread = new detail::ReceiverThread( this );
     220          55 :     _impl->commandThread  = new detail::CommandThread( this );
     221          55 :     _impl->objectStore = new ObjectStore( this, _impl->counters );
     222             : 
     223          55 :     CommandQueue* queue = getCommandThreadQueue();
     224          55 :     registerCommand( CMD_NODE_CONNECT,
     225         110 :                      CmdFunc( this, &LocalNode::_cmdConnect ), 0 );
     226          55 :     registerCommand( CMD_NODE_CONNECT_BE,
     227         110 :                      CmdFunc( this, &LocalNode::_cmdConnect ), 0 );
     228          55 :     registerCommand( CMD_NODE_CONNECT_REPLY,
     229         110 :                      CmdFunc( this, &LocalNode::_cmdConnectReply ), 0 );
     230          55 :     registerCommand( CMD_NODE_CONNECT_REPLY_BE,
     231         110 :                      CmdFunc( this, &LocalNode::_cmdConnectReply ), 0 );
     232          55 :     registerCommand( CMD_NODE_ID,
     233         110 :                      CmdFunc( this, &LocalNode::_cmdID ), 0 );
     234          55 :     registerCommand( CMD_NODE_ID_BE,
     235         110 :                      CmdFunc( this, &LocalNode::_cmdID ), 0 );
     236          55 :     registerCommand( CMD_NODE_ACK_REQUEST,
     237         110 :                      CmdFunc( this, &LocalNode::_cmdAckRequest ), 0 );
     238          55 :     registerCommand( CMD_NODE_STOP_RCV,
     239         110 :                      CmdFunc( this, &LocalNode::_cmdStopRcv ), 0 );
     240          55 :     registerCommand( CMD_NODE_STOP_CMD,
     241         110 :                      CmdFunc( this, &LocalNode::_cmdStopCmd ), queue );
     242          55 :     registerCommand( CMD_NODE_SET_AFFINITY_RCV,
     243         110 :                      CmdFunc( this, &LocalNode::_cmdSetAffinity ), 0 );
     244          55 :     registerCommand( CMD_NODE_SET_AFFINITY_CMD,
     245         110 :                      CmdFunc( this, &LocalNode::_cmdSetAffinity ), queue );
     246          55 :     registerCommand( CMD_NODE_CONNECT_ACK,
     247         110 :                      CmdFunc( this, &LocalNode::_cmdConnectAck ), 0 );
     248          55 :     registerCommand( CMD_NODE_DISCONNECT,
     249         110 :                      CmdFunc( this, &LocalNode::_cmdDisconnect ), 0 );
     250          55 :     registerCommand( CMD_NODE_GET_NODE_DATA,
     251         110 :                      CmdFunc( this, &LocalNode::_cmdGetNodeData ), queue );
     252          55 :     registerCommand( CMD_NODE_GET_NODE_DATA_REPLY,
     253         110 :                      CmdFunc( this, &LocalNode::_cmdGetNodeDataReply ), 0 );
     254          55 :     registerCommand( CMD_NODE_ACQUIRE_SEND_TOKEN,
     255         110 :                      CmdFunc( this, &LocalNode::_cmdAcquireSendToken ), queue );
     256          55 :     registerCommand( CMD_NODE_ACQUIRE_SEND_TOKEN_REPLY,
     257         110 :                      CmdFunc( this, &LocalNode::_cmdAcquireSendTokenReply ), 0);
     258          55 :     registerCommand( CMD_NODE_RELEASE_SEND_TOKEN,
     259         110 :                      CmdFunc( this, &LocalNode::_cmdReleaseSendToken ), queue );
     260          55 :     registerCommand( CMD_NODE_ADD_LISTENER,
     261         110 :                      CmdFunc( this, &LocalNode::_cmdAddListener ), 0 );
     262          55 :     registerCommand( CMD_NODE_REMOVE_LISTENER,
     263         110 :                      CmdFunc( this, &LocalNode::_cmdRemoveListener ), 0 );
     264          55 :     registerCommand( CMD_NODE_PING,
     265         110 :                      CmdFunc( this, &LocalNode::_cmdPing ), queue );
     266          55 :     registerCommand( CMD_NODE_PING_REPLY,
     267         110 :                      CmdFunc( this, &LocalNode::_cmdDiscard ), 0 );
     268          55 :     registerCommand( CMD_NODE_COMMAND,
     269         110 :                      CmdFunc( this, &LocalNode::_cmdCommand ), 0 );
     270          55 :     registerCommand( CMD_NODE_ADD_CONNECTION,
     271         110 :                      CmdFunc( this, &LocalNode::_cmdAddConnection ), 0 );
     272          55 : }
     273             : 
     274         148 : LocalNode::~LocalNode( )
     275             : {
     276          53 :     LBASSERT( !hasPendingRequests( ));
     277          53 :     delete _impl;
     278          95 : }
     279             : 
     280           1 : bool LocalNode::initLocal( const int argc, char** argv )
     281             : {
     282           2 :     std::string setupOpts;
     283           1 :     _impl->args.clear();
     284             : 
     285             :     // We do not use getopt_long because it really does not work due to the
     286             :     // following aspects:
     287             :     // - reordering of arguments
     288             :     // - different behavior of GNU and BSD implementations
     289             :     // - incomplete man pages
     290           1 :     for( int i = 1; i < argc; ++i )
     291             :     {
     292           0 :         if( std::string( "--eq-listen" ) == argv[i] ||
     293           0 :             std::string( "--co-listen" ) == argv[i] )
     294             :         {
     295           0 :             if( std::string( "--eq-listen" ) == argv[i] )
     296           0 :                 LBWARN << "Deprecated --eq-listen, use --co-listen"
     297           0 :                        << std::endl;
     298             : 
     299           0 :             if( (i+1)<argc && argv[i+1][0] != '-' )
     300             :             {
     301           0 :                 std::string data = argv[++i];
     302           0 :                 ConnectionDescriptionPtr desc = new ConnectionDescription;
     303           0 :                 desc->port = Global::getDefaultPort();
     304             : 
     305           0 :                 if( desc->fromString( data ))
     306             :                 {
     307           0 :                     addConnectionDescription( desc );
     308           0 :                     LBASSERTINFO( data.empty(), data );
     309             :                 }
     310             :                 else
     311           0 :                     LBWARN << "Ignoring listen option: " << argv[i] <<std::endl;
     312             :             }
     313             :             else
     314             :             {
     315           0 :                 LBWARN << "No argument given to --co-listen!" << std::endl;
     316             :             }
     317             :         }
     318           0 :         else if( std::string( "--co-globals" ) == argv[i] )
     319             :         {
     320           0 :             if( (i+1)<argc && argv[i+1][0] != '-' )
     321             :             {
     322           0 :                 const std::string data = argv[++i];
     323           0 :                 if( !Global::fromString( data ))
     324             :                 {
     325           0 :                     LBWARN << "Invalid global variables string: " << data
     326           0 :                            << ", using default global variables." << std::endl;
     327           0 :                 }
     328             :             }
     329             :             else
     330             :             {
     331           0 :                 LBWARN << "No argument given to --co-globals!" << std::endl;
     332             :             }
     333             :         }
     334           0 :         else if( std::string( "--co-launch" ) == argv[i] )
     335             :         {
     336           0 :             if( i == argc-1 || argv[i+1][0] == '-' )
     337           0 :                 LBTHROW( std::runtime_error( "Missing options to --co-launch"));
     338             : 
     339           0 :             setupOpts = argv[ ++i ];
     340           0 :             if( !deserialize( setupOpts ))
     341           0 :                 LBTHROW( std::runtime_error( "Corrupt --co-launch argument" ));
     342           0 :             LBASSERT( !setupOpts.empty( ));
     343             :         }
     344             :         else
     345           0 :             _impl->args.push_back( argv[ i ]);
     346             :     }
     347             : 
     348           1 :     if( !listen( ))
     349             :     {
     350           0 :         LBWARN << "Can't setup listener(s) on " << *static_cast< Node* >( this )
     351           0 :                << std::endl;
     352           0 :         return false;
     353             :     }
     354             : 
     355           1 :     if( !setupOpts.empty( ))
     356           0 :         return _setupPeer( setupOpts );
     357           1 :     return true;
     358             : }
     359             : 
     360          50 : bool LocalNode::listen()
     361             : {
     362          50 :     LBVERB << "Listener data: " << serialize() << std::endl;
     363          50 :     if( !isClosed() || !_connectSelf( ))
     364           0 :         return false;
     365             : 
     366         100 :     const ConnectionDescriptions& descriptions = getConnectionDescriptions();
     367         291 :     for( ConnectionDescriptionsCIter i = descriptions.begin();
     368         194 :          i != descriptions.end(); ++i )
     369             :     {
     370          94 :         ConnectionDescriptionPtr description = *i;
     371          94 :         ConnectionPtr connection = Connection::create( description );
     372             : 
     373          47 :         if( !connection || !connection->listen( ))
     374             :         {
     375           0 :             LBWARN << "Can't create listener connection: " << description
     376           0 :                    << std::endl;
     377           0 :             return false;
     378             :         }
     379             : 
     380          47 :         _impl->connectionNodes[ connection ] = this;
     381          47 :         if( connection->isMulticast( ))
     382           0 :             _addMulticast( this, connection );
     383             : 
     384          47 :         connection->acceptNB();
     385          47 :         _impl->incoming.addConnection( connection );
     386             : 
     387          94 :         LBVERB << "Added node " << getNodeID() << " using " << connection
     388         141 :                << std::endl;
     389             :     }
     390             : 
     391         100 :     LBVERB << lunchbox::className(this) << " start command and receiver thread "
     392         150 :            << std::endl;
     393             : 
     394          50 :     _setListening();
     395          50 :     _impl->receiverThread->start();
     396             : 
     397          50 :     LBDEBUG << *this << std::endl;
     398          50 :     return true;
     399             : }
     400             : 
     401          49 : bool LocalNode::close()
     402             : {
     403          49 :     if( !isListening() )
     404           0 :         return false;
     405             : 
     406          49 :     send( CMD_NODE_STOP_RCV );
     407             : 
     408          49 :     LBCHECK( _impl->receiverThread->join( ));
     409          49 :     _cleanup();
     410             : 
     411         147 :     LBDEBUG << _impl->incoming.getSize() << " connections open after close"
     412         147 :            << std::endl;
     413             : #ifndef NDEBUG
     414          49 :     const Connections& connections = _impl->incoming.getConnections();
     415         147 :     for( Connections::const_iterator i = connections.begin();
     416          98 :          i != connections.end(); ++i )
     417             :     {
     418           0 :         LBDEBUG << "    " << *i << std::endl;
     419             :     }
     420             : #endif
     421             : 
     422          49 :     LBASSERTINFO( !hasPendingRequests(),
     423             :                   *static_cast< lunchbox::RequestHandler* >( this ));
     424          49 :     return true;
     425             : }
     426             : 
     427           0 : void LocalNode::setAffinity( const int32_t affinity )
     428             : {
     429           0 :     send( CMD_NODE_SET_AFFINITY_RCV ) << affinity;
     430           0 :     send( CMD_NODE_SET_AFFINITY_CMD ) << affinity;
     431             : 
     432           0 :     lunchbox::Thread::setAffinity( affinity );
     433           0 : }
     434             : 
     435          34 : void LocalNode::addConnection( ConnectionPtr connection )
     436             : {
     437          34 :     connection->ref(); // unref in _cmdAddConnection
     438          34 :     send( CMD_NODE_ADD_CONNECTION ) << connection;
     439          34 : }
     440             : 
     441           0 : ConnectionPtr LocalNode::addListener( ConnectionDescriptionPtr desc )
     442             : {
     443           0 :     LBASSERT( isListening( ));
     444             : 
     445           0 :     ConnectionPtr connection = Connection::create( desc );
     446           0 :     if( connection && connection->listen( ))
     447             :     {
     448           0 :         addListener( connection );
     449           0 :         return connection;
     450             :     }
     451             : 
     452           0 :     return 0;
     453             : }
     454             : 
     455           0 : void LocalNode::addListener( ConnectionPtr connection )
     456             : {
     457           0 :     LBASSERT( isListening( ));
     458           0 :     LBASSERT( connection->isListening( ));
     459           0 :     if( !isListening() || !connection->isListening( ))
     460           0 :         return;
     461             : 
     462           0 :     connection->ref( this ); // unref in self handler
     463             : 
     464             :     // Update everybody's description list of me, add the listener to myself in
     465             :     // my handler
     466           0 :     const Nodes& nodes = getNodes();
     467           0 :     for( NodePtr node : nodes )
     468           0 :         node->send( CMD_NODE_ADD_LISTENER )
     469           0 :             << (uint64_t)(connection.get( ))
     470           0 :             << connection->getDescription()->toString();
     471             : }
     472             : 
     473           0 : void LocalNode::removeListeners( const Connections& connections )
     474             : {
     475           0 :     std::vector< lunchbox::Request< void > > requests;
     476           0 :     for( ConnectionsCIter i = connections.begin(); i != connections.end(); ++i )
     477             :     {
     478           0 :         ConnectionPtr connection = *i;
     479           0 :         requests.push_back( _removeListener( connection ));
     480             :     }
     481           0 : }
     482             : 
     483           0 : lunchbox::Request< void > LocalNode::_removeListener( ConnectionPtr conn )
     484             : {
     485           0 :     LBASSERT( isListening( ));
     486           0 :     LBASSERTINFO( !conn->isConnected(), conn );
     487             : 
     488           0 :     conn->ref( this );
     489           0 :     const lunchbox::Request< void > request = registerRequest< void >();
     490             : 
     491           0 :     const Nodes& nodes = getNodes();
     492           0 :     for( NodePtr node : nodes )
     493           0 :         node->send( CMD_NODE_REMOVE_LISTENER )
     494           0 :             << request << conn.get() << conn->getDescription()->toString();
     495           0 :     return request;
     496             : }
     497             : 
     498         152 : void LocalNode::_addConnection( ConnectionPtr connection )
     499             : {
     500         152 :     if( _impl->receiverThread->isRunning() && !_impl->inReceiverThread( ))
     501             :     {
     502          34 :         addConnection( connection );
     503          34 :         return;
     504             :     }
     505             : 
     506         236 :     BufferPtr buffer = _impl->smallBuffers.alloc( COMMAND_ALLOCSIZE );
     507         118 :     connection->recvNB( buffer, COMMAND_MINSIZE );
     508         118 :     _impl->incoming.addConnection( connection );
     509             : }
     510             : 
     511         224 : void LocalNode::_removeConnection( ConnectionPtr connection )
     512             : {
     513         224 :     LBASSERT( connection );
     514             : 
     515         224 :     _impl->incoming.removeConnection( connection );
     516         224 :     connection->resetRecvData();
     517         224 :     if( !connection->isClosed( ))
     518         130 :         connection->close(); // cancel pending IO's
     519         224 : }
     520             : 
     521          49 : void LocalNode::_cleanup()
     522             : {
     523          49 :     LBVERB << "Clean up stopped node" << std::endl;
     524          49 :     LBASSERTINFO( isClosed(), *this );
     525             : 
     526          49 :     if( !_impl->connectionNodes.empty( ))
     527         141 :         LBDEBUG << _impl->connectionNodes.size()
     528         141 :                << " open connections during cleanup" << std::endl;
     529             : #ifndef NDEBUG
     530         288 :     for( ConnectionNodeHashCIter i = _impl->connectionNodes.begin();
     531         192 :          i != _impl->connectionNodes.end(); ++i )
     532             :     {
     533          94 :         NodePtr node = i->second;
     534          47 :         LBDEBUG << "    " << i->first << " : " << node << std::endl;
     535             :     }
     536             : #endif
     537             : 
     538          49 :     _impl->connectionNodes.clear();
     539             : 
     540          49 :     if( !_impl->nodes->empty( ))
     541           6 :         LBDEBUG << _impl->nodes->size() << " nodes connected during cleanup"
     542           6 :                << std::endl;
     543             : 
     544             : #ifndef NDEBUG
     545          51 :     for( NodeHashCIter i = _impl->nodes->begin(); i != _impl->nodes->end(); ++i)
     546             :     {
     547           4 :         NodePtr node = i->second;
     548           2 :         LBDEBUG << "    " << node << std::endl;
     549             :     }
     550             : #endif
     551             : 
     552          49 :     _impl->nodes->clear();
     553          49 : }
     554             : 
     //              : Disconnects the given node and drops this local node's bookkeeping for it:
     //              : its connection and multicast connection (when set) are handed to
     //              : _removeConnection() and erased from connectionNodes, the object store
     //              : removes the node's instance data, the node is erased from the node table
     //              : under a write lock, and notifyDisconnect() is called.
     555         115 : void LocalNode::_closeNode( NodePtr node )
     556             : {
     //              :     Both connections are fetched before _disconnect(); presumably that call
     //              :     resets them on the node - TODO confirm.
     557         230 :     ConnectionPtr connection = node->getConnection();
     558         230 :     ConnectionPtr mcConnection = node->_getMulticast();
     559             : 
     560         115 :     node->_disconnect();
     561             : 
     562         115 :     if( connection )
     563             :     {
     //              :         The primary connection is expected to be registered in connectionNodes.
     564          68 :         LBASSERTINFO( _impl->connectionNodes.find( connection ) !=
     565             :                       _impl->connectionNodes.end(), connection );
     566             : 
     567          68 :         _removeConnection( connection );
     568          68 :         _impl->connectionNodes.erase( connection );
     569             :     }
     570             : 
     571         115 :     if( mcConnection )
     572             :     {
     573           0 :         _removeConnection( mcConnection );
     574           0 :         _impl->connectionNodes.erase( mcConnection );
     575             :     }
     576             : 
     577         115 :     _impl->objectStore->removeInstanceData( node->getNodeID( ));
     578             : 
     //              :     The write lock is held until the end of the function, i.e. also while
     //              :     notifyDisconnect() runs.
     579         230 :     lunchbox::ScopedFastWrite mutex( _impl->nodes );
     580         115 :     _impl->nodes->erase( node->getNodeID( ));
     581         115 :     notifyDisconnect( node );
     582         115 :     LBDEBUG << node << " disconnected from " << *this << std::endl;
     583         115 : }
     584             : 
     //              : Creates and connects a PipeConnection, connects this node to the pipe's
     //              : sibling end, and registers the pipe for this node in connectionNodes, in
     //              : the node table and through _addConnection().
     //              : Returns false when the pipe cannot be connected, true otherwise.
     585          50 : bool LocalNode::_connectSelf()
     586             : {
     587             :     // setup local connection to myself
     588         100 :     PipeConnectionPtr connection = new PipeConnection;
     589          50 :     if( !connection->connect( ))
     590             :     {
     591           0 :         LBERROR << "Could not create local connection to receiver thread."
     592           0 :                 << std::endl;
     593           0 :         return false;
     594             :     }
     595             : 
     596          50 :     Node::_connect( connection->getSibling( ));
     597          50 :     _setClosed(); // reset state after _connect set it to connected
     598             : 
     599             :     // add to connection set
     600          50 :     LBASSERT( connection->getDescription( ));
     601          50 :     LBASSERT( _impl->connectionNodes.find( connection ) ==
     602             :               _impl->connectionNodes.end( ));
     603             : 
     //              :     NOTE(review): the node table is written through .data without taking
     //              :     the lock used elsewhere in this file - presumably safe because no
     //              :     other thread accesses it yet; confirm against the caller.
     604          50 :     _impl->connectionNodes[ connection ] = this;
     605          50 :     _impl->nodes.data[ getNodeID() ] = this;
     606          50 :     _addConnection( connection );
     607             : 
     608         100 :     LBVERB << "Added node " << getNodeID() << " using " << connection
     609         150 :            << std::endl;
     610          50 :     return true;
     611             : }
     612             : 
     //              : Disconnects the given node.
     //              : Returns false when node is null or this node is not listening, and true
     //              : right away when the node is not connected. Otherwise a
     //              : CMD_NODE_DISCONNECT carrying a request is sent through this node's
     //              : send(), the request is awaited, the node is removed from the object
     //              : store and true is returned.
     613           8 : bool LocalNode::disconnect( NodePtr node )
     614             : {
     615           8 :     if( !node || !isListening() )
     616           0 :         return false;
     617             : 
     618           8 :     if( !node->isConnected( ))
     619           0 :         return true;
     620             : 
     //              :     Must not be called from the command thread: request.wait() blocks.
     621           8 :     LBASSERT( !inCommandThread( ));
     622          16 :     lunchbox::Request< void > request = registerRequest< void >( node.get( ));
     623           8 :     send( CMD_NODE_DISCONNECT ) << request;
     624             : 
     625           8 :     request.wait();
     626           8 :     _impl->objectStore->removeNode( node );
     627           8 :     return true;
     628             : }
     629             : 
     //              : Acknowledges the request with the given ID towards node.
     //              : Nothing happens for LB_UNDEFINED_UINT32. When node is this local node the
     //              : request is served directly through serveRequest(); otherwise a
     //              : CMD_NODE_ACK_REQUEST carrying the ID is sent to node.
     630       20001 : void LocalNode::ackRequest( NodePtr node, const uint32_t requestID )
     631             : {
     632       20001 :     if( requestID == LB_UNDEFINED_UINT32 ) // no need to ack operation
     633           0 :         return;
     634             : 
     635       20001 :     if( node == this ) // OPT
     636           0 :         serveRequest( requestID );
     637             :     else
     638       20001 :         node->send( CMD_NODE_ACK_REQUEST ) << requestID;
     639             : }
     640             : 
     //              : Sends a CMD_NODE_PING to peer. Asserts that the caller is not the
     //              : receiver thread.
     641           0 : void LocalNode::ping( NodePtr peer )
     642             : {
     643           0 :     LBASSERT( !_impl->inReceiverThread( ));
     644           0 :     peer->send( CMD_NODE_PING );
     645           0 : }
     646             : 
     //              : Sends a CMD_NODE_PING to every node returned by getNodes( false ) whose
     //              : last receive time lies more than half the global keepalive timeout in
     //              : the past. Returns true when at least one ping was sent.
     //              : Asserts that the caller is not the receiver thread.
     647           0 : bool LocalNode::pingIdleNodes()
     648             : {
     649           0 :     LBASSERT( !_impl->inReceiverThread( ) );
     650           0 :     const int64_t timeout = Global::getKeepaliveTimeout() / 2;
     651           0 :     const Nodes& nodes = getNodes( false );
     652             : 
     653           0 :     bool pinged = false;
     654           0 :     for( NodePtr node : nodes )
     655             :     {
     656           0 :         if( getTime64() - node->getLastReceiveTime() > timeout )
     657             :         {
     658           0 :             LBINFO << " Ping Node: " <<  node->getNodeID() << " last seen "
     659           0 :                    << node->getLastReceiveTime() << std::endl;
     660           0 :             node->send( CMD_NODE_PING );
     661           0 :             pinged = true;
     662             :         }
     663             :     }
     664           0 :     return pinged;
     665             : }
     666             : 
     667             : //----------------------------------------------------------------------
     668             : // Object functionality
     669             : //----------------------------------------------------------------------
     //              : Forwards to ObjectStore::disableInstanceCache() of this node's object store.
     670           0 : void LocalNode::disableInstanceCache()
     671             : {
     672           0 :     _impl->objectStore->disableInstanceCache();
     673           0 : }
     674             : 
     //              : Forwards age unchanged to expireInstanceData() of this node's object store.
     675           0 : void LocalNode::expireInstanceData( const int64_t age )
     676             : {
     677           0 :     _impl->objectStore->expireInstanceData( age );
     678           0 : }
     679             : 
     //              : Forwards to enableSendOnRegister() of this node's object store.
     680           0 : void LocalNode::enableSendOnRegister()
     681             : {
     682           0 :     _impl->objectStore->enableSendOnRegister();
     683           0 : }
     684             : 
     //              : Forwards to disableSendOnRegister() of this node's object store.
     685           0 : void LocalNode::disableSendOnRegister()
     686             : {
     687           0 :     _impl->objectStore->disableSendOnRegister();
     688           0 : }
     689             : 
     //              : Registers object with this node's object store and returns the result of
     //              : ObjectStore::register_().
     690          21 : bool LocalNode::registerObject( Object* object )
     691             : {
     692          21 :     return _impl->objectStore->register_( object );
     693             : }
     694             : 
     //              : Forwards object to deregister() of this node's object store.
     695          21 : void LocalNode::deregisterObject( Object* object )
     696             : {
     697          21 :     _impl->objectStore->deregister( object );
     698          21 : }
     699             : 
     //              : Starts mapping object through ObjectStore::mapNB() and returns a future
     //              : that wraps ObjectStore::mapSync() bound to the resulting request.
     //              : Note the argument order: mapNB() takes ( object, id, version, master ).
     700          40 : f_bool_t LocalNode::mapObject( Object* object, const uint128_t& id,
     701             :                                NodePtr master, const uint128_t& version )
     702             : {
     703          80 :     const uint32_t request = _impl->objectStore->mapNB( object, id, version,
     704          40 :                                                         master );
     705             :     const FuturebImpl::Func& func = boost::bind( &ObjectStore::mapSync,
     706          80 :                                                  _impl->objectStore, request );
     707          80 :     return f_bool_t( new FuturebImpl( func ));
     708             : }
     709             : 
     //              : Overload without a master node: forwards to ObjectStore::mapNB() with 0
     //              : as the master argument and returns its request identifier.
     710           0 : uint32_t LocalNode::mapObjectNB( Object* object, const uint128_t& id,
     711             :                                  const uint128_t& version )
     712             : {
     713           0 :     return _impl->objectStore->mapNB( object, id, version, 0 );
     714             : }
     715             : 
     //              : Overload with an explicit master node: forwards all arguments to
     //              : ObjectStore::mapNB() and returns its request identifier.
     716           2 : uint32_t LocalNode::mapObjectNB( Object* object, const uint128_t& id,
     717             :                                  const uint128_t& version, NodePtr master )
     718             : {
     719           2 :     return _impl->objectStore->mapNB( object, id, version, master );
     720             : }
     721             : 
     722             : 
     //              : Returns the result of ObjectStore::mapSync() for the given request ID.
     723           2 : bool LocalNode::mapObjectSync( const uint32_t requestID )
     724             : {
     725           2 :     return _impl->objectStore->mapSync( requestID );
     726             : }
     727             : 
     //              : Forwards all arguments to ObjectStore::sync() and returns its future.
     728           4 : f_bool_t LocalNode::syncObject( Object* object, const uint128_t& id,
     729             :                                 NodePtr master, const uint32_t instanceID )
     730             : {
     731           4 :     return _impl->objectStore->sync( object, id, master, instanceID );
     732             : }
     733             : 
     //              : Forwards object to unmap() of this node's object store.
     734          42 : void LocalNode::unmapObject( Object* object )
     735             : {
     736          42 :     _impl->objectStore->unmap( object );
     737          42 : }
     738             : 
     //              : Forwards both objects to swap() of this node's object store.
     739           0 : void LocalNode::swapObject( Object* oldObject, Object* newObject )
     740             : {
     741           0 :     _impl->objectStore->swap( oldObject, newObject );
     742           0 : }
     743             : 
     //              : Dispatches a pushed object to the handler registered for groupID.
     //              : The handler table is read under a read lock. A missing handler only
     //              : produces a warning. A second warning is logged when the stream was used
     //              : but still has data left afterwards.
     744           0 : void LocalNode::objectPush( const uint128_t& groupID,
     745             :                             const uint128_t& objectType,
     746             :                             const uint128_t& objectID, DataIStream& istream )
     747             : {
     748           0 :     lunchbox::ScopedRead mutex( _impl->pushHandlers );
     749           0 :     HandlerHashCIter i = _impl->pushHandlers->find( groupID );
     750           0 :     if( i != _impl->pushHandlers->end( ))
     751           0 :         i->second( groupID, objectType, objectID, istream );
     752             :     else
     753           0 :         LBWARN << "No custom handler for push group " << groupID
     754           0 :                << " registered" << std::endl;
     755             : 
     756           0 :     if( istream.wasUsed() && istream.hasData( ))
     757           0 :         LBWARN << "Incomplete Object::push for group " << groupID << " type "
     758           0 :                << objectType << " object " << objectID << std::endl;
     759           0 : }
     760             : 
     //              : Stores handler for groupID under a write lock. operator[] is used, so an
     //              : existing handler for the same group is replaced.
     761           0 : void LocalNode::registerPushHandler( const uint128_t& groupID,
     762             :                                      const PushHandler& handler )
     763             : {
     764           0 :     lunchbox::ScopedWrite mutex( _impl->pushHandlers );
     765           0 :     (*_impl->pushHandlers)[ groupID ] = handler;
     766           0 : }
     767             : 
     //              : Registers func together with queue as the handler for a custom command.
     //              : Returns false, after logging a warning, when a handler for command is
     //              : already present; returns true after inserting the new entry.
     768           2 : bool LocalNode::registerCommandHandler( const uint128_t& command,
     769             :                                         const CommandHandler& func,
     770             :                                         CommandQueue* queue )
     771             : {
     //              :     The lookup and the insert happen under the same write lock.
     772           4 :     lunchbox::ScopedFastWrite mutex( _impl->commandHandlers );
     773           2 :     if( _impl->commandHandlers->find(command) != _impl->commandHandlers->end( ))
     774             :     {
     775           0 :         LBWARN << "Already got a registered handler for custom command "
     776           0 :                << command << std::endl;
     777           0 :         return false;
     778             :     }
     779             : 
     780           4 :     _impl->commandHandlers->insert( std::make_pair( command,
     781           6 :                                                 std::make_pair( func, queue )));
     782           2 :     return true;
     783             : }
     784             : 
     //              : Requests a send token from node by sending CMD_NODE_ACQUIRE_SEND_TOKEN
     //              : and waiting up to Global::getTimeout() for the request.
     //              : Returns a new co::SendToken for node, or 0 after a timeout (the request
     //              : is unregistered in that case).
     //              : Must not be called from the command or receiver thread (asserted).
     785           0 : LocalNode::SendToken LocalNode::acquireSendToken( NodePtr node )
     786             : {
     787           0 :     LBASSERT( !inCommandThread( ));
     788           0 :     LBASSERT( !_impl->inReceiverThread( ));
     789             : 
     790           0 :     lunchbox::Request< void > request = registerRequest< void >();
     791           0 :     node->send( CMD_NODE_ACQUIRE_SEND_TOKEN ) << request;
     792             : 
     793             :     try
     794             :     {
     795           0 :         request.wait( Global::getTimeout( ));
     796             :     }
     797           0 :     catch( const lunchbox::FutureTimeout& )
     798             :     {
     799           0 :         LBERROR << "Timeout while acquiring send token " << request.getID()
     800           0 :                 << std::endl;
     801           0 :         request.unregister();
     802           0 :         return 0;
     803             :     }
     804           0 :     return new co::SendToken( node );
     805             : }
     806             : 
     //              : Calls release() on token when it is set; an empty token is ignored.
     //              : Asserts that the caller is not the receiver thread.
     807           0 : void LocalNode::releaseSendToken( SendToken token )
     808             : {
     809           0 :     LBASSERT( !_impl->inReceiverThread( ));
     810           0 :     if( token )
     811           0 :         token->release();
     812           0 : }
     813             : 
     814             : //----------------------------------------------------------------------
     815             : // Connecting a node
     816             : //----------------------------------------------------------------------
     //              : File-local result codes of the _connect() helpers. The enumerators have
     //              : no explicit values, so CONNECT_OK is 0 and every failure code is
     //              : non-zero: results have to be compared against CONNECT_OK explicitly and
     //              : must not be tested as a boolean.
     817             : namespace
     818             : {
     819             : enum ConnectResult
     820             : {
     821             :     CONNECT_OK,
     822             :     CONNECT_TRY_AGAIN,
     823             :     CONNECT_BAD_STATE,
     824             :     CONNECT_TIMEOUT,
     825             :     CONNECT_UNREACHABLE
     826             : };
     827             : }
     828             : 
     //              : Returns a reachable node with the given identifier, connecting it when
     //              : necessary. The steps, all executed while holding connectLock:
     //              :   1. return an already known, reachable node with that ID;
     //              :   2. try _connect( nodeID, peer ) with each known node as peer;
     //              :   3. try _connectFromZeroconf( nodeID );
     //              :   4. re-read the node list in case the node connected in the meantime.
     //              : Returns 0 and logs a warning when all steps fail.
     829          74 : NodePtr LocalNode::connect( const NodeID& nodeID )
     830             : {
     831          74 :     LBASSERT( nodeID != 0 );
     832          74 :     LBASSERT( isListening( ));
     833             : 
     834             :     // Make sure that only one connection request based on the node identifier
     835             :     // is pending at a given time. Otherwise a node with the same id might be
     836             :     // instantiated twice in _cmdGetNodeDataReply(). The alternative to this
     837             :     // mutex is to register connecting nodes with this local node, and handle
     838             :     // all cases correctly, which is far more complex. Node connections only
     839             :     // happen a lot during initialization, and are therefore not time-critical.
     840         148 :     lunchbox::ScopedWrite mutex( _impl->connectLock );
     841             : 
     842         148 :     Nodes nodes = getNodes();
     843          77 :     for( NodePtr peer : nodes )
     844             :     {
     845          77 :         if( peer->getNodeID() == nodeID && peer->isReachable( )) // early out
     846          74 :             return peer;
     847             :     }
     848             : 
     849           0 :     LBDEBUG << "Connecting node " << nodeID << std::endl;
     850           0 :     for( NodePtr peer : nodes )
     851             :     {
     852           0 :         NodePtr node = _connect( nodeID, peer );
     853           0 :         if( node )
     854           0 :             return node;
     855             :     }
     856             :     {
     857           0 :         NodePtr node = _connectFromZeroconf( nodeID );
     858           0 :         if( node )
     859           0 :             return node;
     860             :     }
     861             :     // check again if node connected by itself by now
     862           0 :     nodes = getNodes();
     863           0 :     for( NodePtr node : nodes )
     864           0 :         if( node->getNodeID() == nodeID && node->isReachable( ))
     865           0 :             return node;
     866             : 
     867           0 :     LBWARN << "Node " << nodeID << " connection failed" << std::endl;
     868           0 :     return 0;
     869             : }
     870             : 
     //              : Tries to connect the node with the given identifier, asking peer for the
     //              : node's data when the node is not yet in the local node table.
     //              : Returns the node when it is or becomes reachable. Returns 0 when peer
     //              : does not know the node, on CONNECT_BAD_STATE or CONNECT_TIMEOUT, or when
     //              : the node is still unreachable after the retry loop.
     871           0 : NodePtr LocalNode::_connect( const NodeID& nodeID, NodePtr peer )
     872             : {
     873           0 :     LBASSERT( nodeID != 0 );
     874             : 
     875           0 :     NodePtr node;
     876             :     {
     877           0 :         lunchbox::ScopedFastRead mutexNodes( _impl->nodes );
     878           0 :         NodeHash::const_iterator i = _impl->nodes->find( nodeID );
     879           0 :         if( i != _impl->nodes->end( ))
     880           0 :             node = i->second;
     881             :     }
     882             : 
     883           0 :     LBASSERT( getNodeID() != nodeID );
     884           0 :     if( !node )
     885             :     {
     //              :         The request result is a raw Node pointer (or 0 when not found).
     886           0 :         lunchbox::Request< void* > request = registerRequest< void* >();
     887           0 :         peer->send( CMD_NODE_GET_NODE_DATA ) << nodeID << request;
     888           0 :         node = reinterpret_cast< Node* >( request.wait( ));
     889           0 :         if( !node )
     890             :         {
     891           0 :             LBINFO << "Node " << nodeID << " not found on " << peer->getNodeID()
     892           0 :                    << std::endl;
     893           0 :             return 0;
     894             :         }
     895           0 :         node->unref( this ); // ref'd before serveRequest()
     896             :     }
     897             : 
     898           0 :     if( node->isReachable( ))
     899           0 :         return node;
     900             : 
     //              :     Pre-decrement: the loop body runs at most 9 times.
     901           0 :     size_t tries = 10;
     902           0 :     while( --tries )
     903             :     {
     904           0 :         switch( _connect( node ))
     905             :         {
     906             :           case CONNECT_OK:
     907           0 :               return node;
     908             :           case CONNECT_TRY_AGAIN:
     909             :           {
     910           0 :               lunchbox::RNG rng;
     911             :               // collision avoidance
     912           0 :               lunchbox::sleep( rng.get< uint8_t >( ));
     913           0 :               break;
     914             :           }
     915             :           case CONNECT_BAD_STATE:
     916           0 :               LBWARN << "Internal connect error" << std::endl;
     917             :               // no break;
     918             :           case CONNECT_TIMEOUT:
     919           0 :               return 0;
     920             : 
     921             :           case CONNECT_UNREACHABLE:
     922           0 :               break; // maybe peer talks to us
     923             :         }
     924             : 
     925           0 :         lunchbox::ScopedFastRead mutexNodes( _impl->nodes );
     926             :         // connect failed - check for simultaneous connect from peer
     927           0 :         NodeHash::const_iterator i = _impl->nodes->find( nodeID );
     928           0 :         if( i != _impl->nodes->end( ))
     929           0 :             node = i->second;
     930             :     }
     931             : 
     932           0 :     return node->isReachable() ? node : 0;
     933             : }
     934             : 
     //              : Looks for nodeID among the instances discovered by the zeroconf service
     //              : and, when found, builds a node from the announced "co_type",
     //              : "co_numPorts" and "co_port<N>" entries and tries to connect it.
     //              : Returns the node when _connect() reports CONNECT_OK. Returns 0 when no
     //              : instance matches, the matching instance announces no "co_type", or no
     //              : connection attempt succeeds.
     935           0 : NodePtr LocalNode::_connectFromZeroconf( const NodeID& nodeID )
     936             : {
     937           0 :     lunchbox::ScopedWrite mutex( _impl->service );
     938             : 
     939             :     const Strings& instances =
     940           0 :         _impl->service->discover( servus::Servus::IF_ALL, 500 );
     941           0 :     for( StringsCIter i = instances.begin(); i != instances.end(); ++i )
     942             :     {
     //              :         The instance name is parsed as a node identifier.
     943           0 :         const std::string& instance = *i;
     944           0 :         const NodeID candidate( instance );
     945           0 :         if( candidate != nodeID )
     946           0 :             continue;
     947             : 
     948           0 :         const std::string& typeStr = _impl->service->get( instance, "co_type" );
     949           0 :         if( typeStr.empty( ))
     950           0 :             return 0;
     951             : 
     952           0 :         std::istringstream in( typeStr );
     953           0 :         uint32_t type = 0;
     954           0 :         in >> type;
     955             : 
     956           0 :         NodePtr node = createNode( type );
     957           0 :         if( !node )
     958             :         {
     959           0 :             LBINFO << "Can't create node of type " << type << std::endl;
     960           0 :             continue;
     961             :         }
     962             : 
     963           0 :         const std::string& numStr = _impl->service->get( instance,
     964           0 :                                                          "co_numPorts" );
     965           0 :         uint32_t num = 0;
     966             : 
     //              :         Reuse the stream for the port count; clear() resets the state left
     //              :         by the previous extraction.
     967           0 :         in.clear();
     968           0 :         in.str( numStr );
     969           0 :         in >> num;
     970           0 :         LBASSERT( num > 0 );
     971           0 :         for( size_t j = 0; j < num; ++j )
     972             :         {
     973           0 :             ConnectionDescriptionPtr desc = new ConnectionDescription;
     974           0 :             std::ostringstream out;
     975           0 :             out << "co_port" << j;
     976             : 
     //              :             fromString() is expected to consume the whole description
     //              :             string, see the assertion after it.
     977           0 :             std::string descStr = _impl->service->get( instance, out.str( ));
     978           0 :             LBASSERT( !descStr.empty( ));
     979           0 :             LBCHECK( desc->fromString( descStr ));
     980           0 :             LBASSERT( descStr.empty( ));
     981           0 :             node->addConnectionDescription( desc );
     982             :         }
     //              :         NOTE(review): the service lock is released here, but after a failed
     //              :         connect the loop continues and may touch _impl->service again
     //              :         without it - confirm whether that is intended.
     983           0 :         mutex.leave();
     //              :         _connect() returns a ConnectResult and CONNECT_OK is 0, so the
     //              :         result must be compared explicitly; testing it as a boolean would
     //              :         return the node only when connecting failed.
     984           0 :         if( _connect( node ) == CONNECT_OK )
     985           0 :             return node;
     986             :     }
     987           0 :     return 0;
     988             : }
     989             : 
     //              : Connects node through its connection descriptions while holding
     //              : connectLock. Returns true only when _connect() reports CONNECT_OK.
     990          34 : bool LocalNode::connect( NodePtr node )
     991             : {
     992          68 :     lunchbox::ScopedWrite mutex( _impl->connectLock );
     993          68 :     return ( _connect( node ) == CONNECT_OK );
     994             : }
     995             : 
     //              : Connects node using its connection descriptions and returns a
     //              : ConnectResult value.
     //              : An already reachable node yields CONNECT_OK. Otherwise the descriptions
     //              : are tried in order, skipping types >= CONNECTIONTYPE_MULTICAST and
     //              : descriptions whose connection cannot be created or connected; the first
     //              : connected one is handed to _connect( node, connection ) and that result
     //              : is returned without trying further descriptions.
     //              : Returns CONNECT_UNREACHABLE when no description yields a connection.
     996          34 : uint32_t LocalNode::_connect( NodePtr node )
     997             : {
     998          34 :     LBASSERTINFO( isListening(), *this );
     999          34 :     if( node->isReachable( ))
    1000           0 :         return CONNECT_OK;
    1001             : 
    1002          34 :     LBASSERT( node->isClosed( ));
    1003          34 :     LBDEBUG << "Connecting " << node << std::endl;
    1004             : 
    1005             :     // try connecting using the given descriptions
    1006          68 :     const ConnectionDescriptions& cds = node->getConnectionDescriptions();
    1007         102 :     for( ConnectionDescriptionsCIter i = cds.begin();
    1008          68 :         i != cds.end(); ++i )
    1009             :     {
    1010          34 :         ConnectionDescriptionPtr description = *i;
    1011          34 :         if( description->type >= CONNECTIONTYPE_MULTICAST )
    1012           0 :             continue; // Don't use multicast for primary connections
    1013             : 
    1014          34 :         ConnectionPtr connection = Connection::create( description );
    1015          34 :         if( !connection || !connection->connect( ))
    1016           0 :             continue;
    1017             : 
    1018          34 :         return _connect( node, connection );
    1019             :     }
    1020             : 
    1021           0 :     LBDEBUG << "Node " << node
    1022           0 :            << " unreachable, all connections failed to connect" <<std::endl;
    1023           0 :     return CONNECT_UNREACHABLE;
    1024             : }
    1025             : 
    //               : Connects node over the given, already established connection.
    //               : Returns true only when _connect() reports CONNECT_OK. Unlike
    //               : connect( NodePtr ), this overload does not take connectLock.
    1026           0 : bool LocalNode::connect( NodePtr node, ConnectionPtr connection )
    1027             : {
    1028           0 :     return ( _connect( node, connection ) == CONNECT_OK );
    1029             : }
    1030             : 
    //               : Performs the connect handshake with node over connection and returns a
    //               : ConnectResult value:
    //               :   CONNECT_BAD_STATE  node is null, this node is not listening, the
    //               :                      connection is not connected or node is not closed;
    //               :   CONNECT_TIMEOUT    the handshake request was not served within 10 s;
    //               :   CONNECT_TRY_AGAIN  the request result is false or node is not
    //               :                      connected afterwards;
    //               :   CONNECT_OK         otherwise.
    //               : The connection is registered with _addConnection() before the command
    //               : is sent.
    1031          34 : uint32_t LocalNode::_connect( NodePtr node, ConnectionPtr connection )
    1032             : {
    //               :     NOTE(review): this assertion dereferences node before the null check
    //               :     below, so a null node fails here first in builds with assertions.
    1033          34 :     LBASSERT( connection );
    1034          34 :     LBASSERT( node->getNodeID() != getNodeID( ));
    1035             : 
    1036          68 :     if( !node || !isListening() || !connection->isConnected() ||
    1037          34 :         !node->isClosed( ))
    1038             :     {
    1039           0 :         return CONNECT_BAD_STATE;
    1040             :     }
    1041             : 
    1042          34 :     _addConnection( connection );
    1043             : 
    1044             :     // send connect command to peer
    1045          68 :     lunchbox::Request< bool > request = registerRequest< bool >( node.get( ));
    //               :     Big-endian builds send the byte-swapped CMD_NODE_CONNECT_BE instead.
    1046             : #ifdef COLLAGE_BIGENDIAN
    1047             :     uint32_t cmd = CMD_NODE_CONNECT_BE;
    1048             :     lunchbox::byteswap( cmd );
    1049             : #else
    1050          34 :     const uint32_t cmd = CMD_NODE_CONNECT;
    1051             : #endif
    //               :     The command carries this node's ID, the request, its type and its
    //               :     serialized description.
    1052          68 :     OCommand( Connections( 1, connection ), cmd )
    1053         102 :         << getNodeID() << request << getType() << serialize();
    1054             : 
    1055          34 :     bool connected = false;
    1056             :     try
    1057             :     {
    1058          34 :         connected = request.wait( 10000 /*ms*/ );
    1059             :     }
    1060           0 :     catch( const lunchbox::FutureTimeout& )
    1061             :     {
    1062           0 :         LBWARN << "Node connection handshake timeout - " << node
    1063           0 :                << " not a Collage node?" << std::endl;
    1064           0 :         request.unregister();
    1065           0 :         return CONNECT_TIMEOUT;
    1066             :     }
    1067             : 
    1068             :     // In simultaneous connect case, depending on the connection type
    1069             :     // (e.g. RDMA), a check on the connection state of the node is required
    1070          34 :     if( !connected || !node->isConnected( ))
    1071           0 :         return CONNECT_TRY_AGAIN;
    1072             : 
    1073          34 :     LBASSERT( node->getNodeID() != 0 );
    1074          34 :     LBASSERTINFO( node->getNodeID() != getNodeID(), getNodeID() );
    1075          34 :     LBDEBUG << node << " connected to " << *(Node*)this << std::endl;
    1076          34 :     return CONNECT_OK;
    1077             : }
    1078             : 
    //               : Connects the master node of the object with the given identifier.
    //               : The master's node ID is looked up in the object store and handed to
    //               : connect( NodeID ). Returns the master when it is set and not closed.
    //               : Returns 0, after a warning, when id is not a UUID, no master node ID is
    //               : found, or the master cannot be connected.
    1079          40 : NodePtr LocalNode::connectObjectMaster( const uint128_t& id )
    1080             : {
    //               :     The assertion catches invalid ids in debug builds; the check below
    //               :     handles them at runtime.
    1081          40 :     LBASSERTINFO( id.isUUID(), id );
    1082          40 :     if( !id.isUUID( ))
    1083             :     {
    1084           0 :         LBWARN << "Invalid object id " << id << std::endl;
    1085           0 :         return 0;
    1086             :     }
    1087             : 
    1088          40 :     const NodeID masterNodeID = _impl->objectStore->findMasterNodeID( id );
    1089          40 :     if( masterNodeID == 0 )
    1090             :     {
    1091           0 :         LBWARN << "Can't find master node for object " << id << std::endl;
    1092           0 :         return 0;
    1093             :     }
    1094             : 
    1095          80 :     NodePtr master = connect( masterNodeID );
    1096          40 :     if( master && !master->isClosed( ))
    1097          40 :         return master;
    1098             : 
    1099           0 :     LBWARN << "Can't connect master node with id " << masterNodeID
    1100           0 :            << " for object " << id << std::endl;
    1101           0 :     return 0;
    1102             : }
    1103             : 
    //               : Builds a launch command line for node from the template command and
    //               : runs it through lunchbox::Launcher::run().
    //               : Two-character tokens in command are substituted:
    //               :   %h  node host name ("127.0.0.1" when empty)
    //               :   %n  node identifier
    //               :   %d  node working directory
    //               :   %q  node launch quote
    //               :   %o  the generated --co-launch / --co-globals options
    //               : Unknown tokens are logged and copied unchanged. When %o does not occur,
    //               : the options are appended to the end of the command.
    //               : Returns true when the launcher reports success, false otherwise.
    1104           0 : bool LocalNode::launch( NodePtr node, const std::string& command )
    1105             : {
    1106           0 :     size_t lastPos = 0;
    1107           0 :     std::string cmd;
    1108             : 
    1109           0 :     const std::string& quote = node->getLaunchQuote();
    1110             :     std::string launchOptions =
    1111           0 :         std::string( " --co-launch " ) + quote + node->serialize() +
    1112           0 :         node->getWorkDir() + CO_SEPARATOR + std::to_string( getType( )) +
    1113           0 :         CO_SEPARATOR + serialize() + quote + " --co-globals " + quote +
    1114           0 :         co::Global::toString() + quote;
    1115             : 
    //               :     Resume the scan behind the two-character token so that the token
    //               :     character is never re-read as the start of another token. find()
    //               :     returns npos for a start position beyond the end of the string.
    1116           0 :     for( size_t percentPos = command.find( '%' );
    1117           0 :          percentPos != std::string::npos;
    1118           0 :          percentPos = command.find( '%', percentPos + 2 ))
    1119             :     {
    1120           0 :         std::string replacement;
    //               :         For a trailing '%' this reads the terminating null character of the
    //               :         const string, which selects the default branch.
    1121           0 :         switch( command[percentPos+1] )
    1122             :         {
    1123             :         case 'h':
    1124           0 :             replacement = node->getHostname();
    1125           0 :             if( replacement.empty( ))
    1126           0 :                 replacement = "127.0.0.1";
    1127           0 :             break;
    1128             : 
    1129             :         case 'n':
    1130           0 :             replacement = node->getNodeID().getString();
    1131           0 :             break;
    1132             : 
    1133             :         case 'd':
    1134           0 :             replacement = node->getWorkDir();
    1135           0 :             break;
    1136             : 
    1137             :         case 'q':
    1138           0 :             replacement = quote;
    1139           0 :             break;
    1140             : 
    //               :         Clearing the options keeps them from being appended a second time
    //               :         after the loop.
    1141             :         case 'o':
    1142           0 :             replacement = launchOptions;
    1143           0 :             launchOptions.clear();
    1144           0 :             break;
    1145             : 
    //               :         The second substr() argument is a length: keep exactly the two
    //               :         characters of the unknown token.
    1146             :         default:
    1147           0 :             LBWARN << "Unknown token " << command[percentPos+1] << std::endl;
    1148           0 :             replacement = command.substr( percentPos, 2 );
    1149             :         }
    1150             : 
    1151           0 :         cmd += command.substr( lastPos, percentPos - lastPos ) + replacement;
    1152           0 :         lastPos = percentPos + 2;
                     :         if( lastPos > command.size( )) // trailing '%': stay in range
                     :             lastPos = command.size();
    1153             :     }
    1154             : 
    1155           0 :     cmd += command.substr( lastPos ) + launchOptions;
    1156             : 
    1157           0 :     LBDEBUG << "Launching: " << cmd << std::endl;
    1158           0 :     if( lunchbox::Launcher::run( cmd ))
    1159           0 :         return true;
    1160             : 
    1161           0 :     LBWARN << "Could not launch node using '" << cmd << "'" << std::endl;
    1162           0 :     return false;
    1163             : }
    1164             : 
    //               : Polls getNode( nodeID ) every 100 ms until the node exists and is
    //               : connected, and returns it. The first lookup always happens (do/while).
    //               : Polling continues while timeout equals LB_TIMEOUT_INDEFINITE cast to
    //               : int32_t, or while the elapsed clock time is below timeout; nullptr is
    //               : returned once neither holds.
    //               : NOTE(review): timeout is compared against clock.getTime64(), so it is
    //               : presumably in milliseconds - confirm.
    1165           0 : NodePtr LocalNode::syncLaunch( const uint128_t& nodeID, const int64_t timeout )
    1166             : {
    1167           0 :     const lunchbox::Clock clock;
    1168             : 
    1169           0 :     do
    1170             :     {
    1171           0 :         co::NodePtr node = getNode( nodeID );
    1172           0 :         if( node && node->isConnected( ))
    1173           0 :             return node;
    1174             : 
    1175           0 :         lunchbox::sleep( 100 /*ms*/ );
    1176             :     }
    1177           0 :     while( timeout == static_cast< int32_t >( LB_TIMEOUT_INDEFINITE ) ||
    1178           0 :            clock.getTime64() < timeout );
    1179             : 
    1180           0 :     return nullptr;
    1181             : }
    1182             : 
    //               : Parses the peer setup options and connects the peer node they describe.
    //               : setupOpts has the form
    //               :   <workDir> CO_SEPARATOR <nodeType> CO_SEPARATOR <serialized node>
    //               : The working directory, when not empty, is entered with chdir(); a
    //               : failure there only logs a warning. A node of the given type is created,
    //               : deserialized from the remainder and connected with connect().
    //               : Returns false when a separator is missing, the node cannot be created
    //               : or the connection fails; true otherwise.
    //               : NOTE(review): std::stoi() throws when nodeType is not numeric; nothing
    //               : visible here catches that.
    1183           0 : bool LocalNode::_setupPeer( const std::string& setupOpts )
    1184             : {
    1185           0 :     size_t nextPos = setupOpts.find( CO_SEPARATOR );
    1186           0 :     if( nextPos == std::string::npos )
    1187             :     {
    1188           0 :         LBERROR << "Could not parse working directory: " << setupOpts
    1189           0 :                 << std::endl;
    1190           0 :         return false;
    1191             :     }
    1192             : 
    1193           0 :     const std::string workDir = setupOpts.substr( 0, nextPos );
    1194           0 :     std::string description = setupOpts.substr( nextPos + 1 );
    1195             : 
    1196           0 :     if( !workDir.empty() && chdir( workDir.c_str( )) == -1 )
    1197           0 :         LBWARN << "Can't change working directory to " << workDir << ": "
    1198           0 :                << lunchbox::sysError << std::endl;
    1199             : 
    1200           0 :     nextPos = description.find( CO_SEPARATOR );
    1201           0 :     if( nextPos == std::string::npos )
    1202             :     {
    1203           0 :         LBERROR << "Could not parse server node type: " << description
    1204           0 :                 << " is left from " << setupOpts << std::endl;
    1205           0 :         return false;
    1206             :     }
    1207             : 
    1208           0 :     const std::string nodeType = description.substr( 0, nextPos );
    1209           0 :     description = description.substr( nextPos + 1 );
    1210             : 
    //               :     createNode() may yield no node (the zeroconf connect path checks for
    //               :     that as well), so guard the dereferences below.
    1211           0 :     co::NodePtr peer = createNode( std::stoi( nodeType ));
                     :     if( !peer )
                     :     {
                     :         LBERROR << "Can't create peer node of type " << nodeType
                     :                 << std::endl;
                     :         return false;
                     :     }
                     : 
    1212           0 :     if( !peer->deserialize( description ))
    1213           0 :         LBWARN << "Can't parse peer data" << std::endl;
    1214             : 
    //               :     deserialize() is expected to consume the whole description.
    1215           0 :     LBASSERTINFO( description.empty(), description );
    1216           0 :     if( !connect( peer ))
    1217             :     {
    1218           0 :         LBERROR << "Can't connect peer node using " << *peer << std::endl;
    1219           0 :         return false;
    1220             :     }
    1221           0 :     return true;
    1222             : }
    1223             : 
    1224          34 : NodePtr LocalNode::createNode( const uint32_t type )
    1225             : {
    1226          34 :     LBASSERTINFO( type == NODETYPE_NODE, type );
    1227          34 :     return new Node( type );
    1228             : }
    1229             : 
    1230           0 : NodePtr LocalNode::getNode( const NodeID& id ) const
    1231             : {
    1232           0 :     lunchbox::ScopedFastRead mutex( _impl->nodes );
    1233           0 :     NodeHash::const_iterator i = _impl->nodes->find( id );
    1234           0 :     if( i == _impl->nodes->end() || !i->second->isReachable( ))
    1235           0 :         return 0;
    1236           0 :     return i->second;
    1237             : }
    1238             : 
    1239         114 : Nodes LocalNode::getNodes( const bool addSelf ) const
    1240             : {
    1241         114 :     Nodes nodes;
    1242         228 :     lunchbox::ScopedFastRead mutex( _impl->nodes );
    1243         334 :     for( NodeHashCIter i = _impl->nodes->begin(); i != _impl->nodes->end(); ++i)
    1244             :     {
    1245         440 :         NodePtr node = i->second;
    1246         220 :         if( node->isReachable() && ( addSelf || node != this ))
    1247         220 :             nodes.push_back( i->second );
    1248             :     }
    1249         228 :     return nodes;
    1250             : }
    1251             : 
    1252         146 : CommandQueue* LocalNode::getCommandThreadQueue()
    1253             : {
    1254         146 :     return _impl->commandThread->getWorkerQueue();
    1255             : }
    1256             : 
    1257           8 : bool LocalNode::inCommandThread() const
    1258             : {
    1259           8 :     return _impl->commandThread->isCurrent();
    1260             : }
    1261             : 
    1262           0 : const Strings& LocalNode::getCommandLine() const
    1263             : {
    1264           0 :     return _impl->args;
    1265             : }
    1266             : 
    1267       72429 : int64_t LocalNode::getTime64() const
    1268             : {
    1269       72429 :     return _impl->clock.getTime64();
    1270             : }
    1271             : 
    1272           0 : ssize_t LocalNode::getCounter( const Counter counter ) const
    1273             : {
    1274           0 :     return _impl->counters[ counter ];
    1275             : }
    1276             : 
    1277         196 : void LocalNode::flushCommands()
    1278             : {
    1279         196 :     _impl->incoming.interrupt();
    1280         196 : }
    1281             : 
    1282             : //----------------------------------------------------------------------
    1283             : // receiver thread functions
    1284             : //----------------------------------------------------------------------
    1285          50 : void LocalNode::_runReceiverThread()
    1286             : {
    1287          50 :     LB_TS_THREAD( _rcvThread );
    1288          50 :     _initService();
    1289             : 
    1290          50 :     int nErrors = 0;
    1291      145152 :     while( isListening( ))
    1292             :     {
    1293       72552 :         const ConnectionSet::Event result = _impl->incoming.select();
    1294       72551 :         switch( result )
    1295             :         {
    1296             :             case ConnectionSet::EVENT_CONNECT:
    1297          34 :                 _handleConnect();
    1298          34 :                 break;
    1299             : 
    1300             :             case ConnectionSet::EVENT_DATA:
    1301       72179 :                 nErrors = 0;
    1302       72179 :                 _handleData();
    1303       72179 :                 break;
    1304             : 
    1305             :             case ConnectionSet::EVENT_DISCONNECT:
    1306             :             case ConnectionSet::EVENT_INVALID_HANDLE:
    1307          34 :                 _handleDisconnect();
    1308          34 :                 break;
    1309             : 
    1310             :             case ConnectionSet::EVENT_TIMEOUT:
    1311           0 :                 LBINFO << "select timeout" << std::endl;
    1312           0 :                 break;
    1313             : 
    1314             :             case ConnectionSet::EVENT_ERROR:
    1315           0 :                 ++nErrors;
    1316           0 :                 LBWARN << "Connection error during select" << std::endl;
    1317           0 :                 if( nErrors > 100 )
    1318             :                 {
    1319           0 :                     LBWARN << "Too many errors in a row, capping connection"
    1320           0 :                            << std::endl;
    1321           0 :                     _handleDisconnect();
    1322             :                 }
    1323           0 :                 break;
    1324             : 
    1325             :             case ConnectionSet::EVENT_SELECT_ERROR:
    1326           0 :                 LBWARN << "Error during select" << std::endl;
    1327           0 :                 ++nErrors;
    1328           0 :                 if( nErrors > 10 )
    1329             :                 {
    1330           0 :                     LBWARN << "Too many errors in a row" << std::endl;
    1331           0 :                     LBUNIMPLEMENTED;
    1332             :                 }
    1333           0 :                 break;
    1334             : 
    1335             :             case ConnectionSet::EVENT_INTERRUPT:
    1336         304 :                 _redispatchCommands();
    1337         304 :                 break;
    1338             : 
    1339             :             default:
    1340           0 :                 LBUNIMPLEMENTED;
    1341             :         }
    1342             :     }
    1343             : 
    1344          49 :     if( !_impl->pendingCommands.empty( ))
    1345           0 :         LBWARN << _impl->pendingCommands.size()
    1346           0 :                << " commands pending while leaving command thread" << std::endl;
    1347             : 
    1348          49 :     _impl->pendingCommands.clear();
    1349          49 :     LBCHECK( _impl->commandThread->join( ));
    1350             : 
    1351          98 :     ConnectionPtr connection = getConnection();
    1352          98 :     PipeConnectionPtr pipe = LBSAFECAST( PipeConnection*, connection.get( ));
    1353          49 :     connection = pipe->getSibling();
    1354          49 :     _removeConnection( connection );
    1355          49 :     _impl->connectionNodes.erase( connection );
    1356          49 :     _disconnect();
    1357             : 
    1358          49 :     const Connections& connections = _impl->incoming.getConnections();
    1359         195 :     while( !connections.empty( ))
    1360             :     {
    1361          73 :         connection = connections.back();
    1362         146 :         NodePtr node = _impl->connectionNodes[ connection ];
    1363             : 
    1364          73 :         if( node )
    1365          73 :             _closeNode( node );
    1366          73 :         _removeConnection( connection );
    1367             :     }
    1368             : 
    1369          49 :     _impl->objectStore->clear();
    1370          49 :     _impl->pendingCommands.clear();
    1371          49 :     _impl->smallBuffers.flush();
    1372          49 :     _impl->bigBuffers.flush();
    1373             : 
    1374         245 :     LBDEBUG << "Leaving receiver thread of " << lunchbox::className( this )
    1375         196 :            << std::endl;
    1376          49 : }
    1377             : 
    1378          34 : void LocalNode::_handleConnect()
    1379             : {
    1380          68 :     ConnectionPtr connection = _impl->incoming.getConnection();
    1381          68 :     ConnectionPtr newConn = connection->acceptSync();
    1382          34 :     connection->acceptNB();
    1383             : 
    1384          34 :     if( newConn )
    1385          34 :         _addConnection( newConn );
    1386             :     else
    1387           0 :         LBINFO << "Received connect event, but accept() failed" << std::endl;
    1388          34 : }
    1389             : 
    1390          34 : void LocalNode::_handleDisconnect()
    1391             : {
    1392          34 :     while( _handleData( )) ; // read remaining data off connection
    1393             : 
    1394          68 :     ConnectionPtr connection = _impl->incoming.getConnection();
    1395          34 :     ConnectionNodeHash::iterator i = _impl->connectionNodes.find( connection );
    1396             : 
    1397          34 :     if( i != _impl->connectionNodes.end( ))
    1398             :     {
    1399          68 :         NodePtr node = i->second;
    1400             : 
    1401          34 :         node->ref(); // extend lifetime to give cmd handler a chance
    1402             : 
    1403             :         // local command dispatching
    1404          68 :         OCommand( this, this, CMD_NODE_REMOVE_NODE )
    1405         102 :                 << node.get() << uint32_t( LB_UNDEFINED_UINT32 );
    1406             : 
    1407          34 :         if( node->getConnection() == connection )
    1408          34 :             _closeNode( node );
    1409           0 :         else if( connection->isMulticast( ))
    1410           0 :             node->_removeMulticast( connection );
    1411             :     }
    1412             : 
    1413          34 :     _removeConnection( connection );
    1414          34 : }
    1415             : 
    1416       72213 : bool LocalNode::_handleData()
    1417             : {
    1418       72213 :     _impl->smallBuffers.compact();
    1419       72213 :     _impl->bigBuffers.compact();
    1420             : 
    1421      144426 :     ConnectionPtr connection = _impl->incoming.getConnection();
    1422       72213 :     LBASSERT( connection );
    1423             : 
    1424      144426 :     BufferPtr buffer = _readHead( connection );
    1425       72213 :     if( !buffer ) // fluke signal
    1426          68 :         return false;
    1427             : 
    1428      144290 :     ICommand command = _setupCommand( connection, buffer );
    1429       72145 :     const bool gotCommand = _readTail( command, buffer, connection );
    1430       72145 :     LBASSERT( gotCommand );
    1431             : 
    1432             :     // start next receive
    1433      144290 :     BufferPtr nextBuffer = _impl->smallBuffers.alloc( COMMAND_ALLOCSIZE );
    1434       72145 :     connection->recvNB( nextBuffer, COMMAND_MINSIZE );
    1435             : 
    1436       72145 :     if( gotCommand )
    1437             :     {
    1438       72145 :         _dispatchCommand( command );
    1439       72145 :         return true;
    1440             :     }
    1441             : 
    1442           0 :     LBERROR << "Incomplete command read: " << command << std::endl;
    1443           0 :     return false;
    1444             : }
    1445             : 
    1446       72213 : BufferPtr LocalNode::_readHead( ConnectionPtr connection )
    1447             : {
    1448      144426 :     BufferPtr buffer;
    1449       72213 :     const bool gotSize = connection->recvSync( buffer, false );
    1450             : 
    1451       72213 :     if( !buffer ) // fluke signal
    1452             :     {
    1453           0 :         LBWARN << "Erronous network event on " << connection->getDescription()
    1454           0 :                << std::endl;
    1455           0 :         _impl->incoming.setDirty();
    1456           0 :         return 0;
    1457             :     }
    1458             : 
    1459       72213 :     if( gotSize )
    1460       72145 :         return buffer;
    1461             : 
    1462             :     // Some systems signal data on dead connections.
    1463          68 :     buffer->setSize( 0 );
    1464          68 :     connection->recvNB( buffer, COMMAND_MINSIZE );
    1465          68 :     return 0;
    1466             : }
    1467             : 
    1468       72145 : ICommand LocalNode::_setupCommand( ConnectionPtr connection,
    1469             :                                    ConstBufferPtr buffer )
    1470             : {
    1471      144290 :     NodePtr node;
    1472       72145 :     ConnectionNodeHashCIter i = _impl->connectionNodes.find( connection );
    1473       72145 :     if( i != _impl->connectionNodes.end( ))
    1474       72077 :         node = i->second;
    1475       72145 :     LBVERB << "Handle data from " << node << std::endl;
    1476             : 
    1477             : #ifdef COLLAGE_BIGENDIAN
    1478             :     const bool swapping = node ? !node->isBigEndian() : false;
    1479             : #else
    1480       72145 :     const bool swapping = node ? node->isBigEndian() : false;
    1481             : #endif
    1482      144290 :     ICommand command( this, node, buffer, swapping );
    1483             : 
    1484       72145 :     if( node )
    1485             :     {
    1486       72077 :         node->_setLastReceive( getTime64( ));
    1487       72077 :         return command;
    1488             :     }
    1489             : 
    1490          68 :     uint32_t cmd = command.getCommand();
    1491             : #ifdef COLLAGE_BIGENDIAN
    1492             :     lunchbox::byteswap( cmd ); // pre-node commands are sent little endian
    1493             : #endif
    1494          68 :     switch( cmd )
    1495             :     {
    1496             :     case CMD_NODE_CONNECT:
    1497             :     case CMD_NODE_CONNECT_REPLY:
    1498             :     case CMD_NODE_ID:
    1499             : #ifdef COLLAGE_BIGENDIAN
    1500             :         command = ICommand( this, node, buffer, true );
    1501             : #endif
    1502          68 :         break;
    1503             : 
    1504             :     case CMD_NODE_CONNECT_BE:
    1505             :     case CMD_NODE_CONNECT_REPLY_BE:
    1506             :     case CMD_NODE_ID_BE:
    1507             : #ifndef COLLAGE_BIGENDIAN
    1508           0 :         command = ICommand( this, node, buffer, true );
    1509             : #endif
    1510           0 :         break;
    1511             : 
    1512             :     default:
    1513           0 :         LBUNIMPLEMENTED;
    1514           0 :         return ICommand();
    1515             :     }
    1516             : 
    1517          68 :     command.setCommand( cmd ); // reset correctly swapped version
    1518          68 :     return command;
    1519             : }
    1520             : 
    1521       72145 : bool LocalNode::_readTail( ICommand& command, BufferPtr buffer,
    1522             :                            ConnectionPtr connection )
    1523             : {
    1524       72145 :     const uint64_t needed = command.getSize();
    1525       72145 :     if( needed <= buffer->getSize( ))
    1526       62134 :         return true;
    1527             : 
    1528       10011 :     if( needed > buffer->getMaxSize( ))
    1529             :     {
    1530           0 :         LBASSERT( needed > COMMAND_ALLOCSIZE );
    1531           0 :         LBASSERTINFO( needed < LB_BIT48,
    1532             :                       "Out-of-sync network stream: " << command << "?" );
    1533             :         // not enough space for remaining data, alloc and copy to new buffer
    1534           0 :         BufferPtr newBuffer = _impl->bigBuffers.alloc( needed );
    1535           0 :         newBuffer->replace( *buffer );
    1536           0 :         buffer = newBuffer;
    1537             : 
    1538           0 :         command = ICommand( this, command.getRemoteNode(), buffer,
    1539           0 :                             command.isSwapping( ));
    1540             :     }
    1541             : 
    1542             :     // read remaining data
    1543       10011 :     connection->recvNB( buffer, command.getSize() - buffer->getSize( ));
    1544       10011 :     return connection->recvSync( buffer );
    1545             : }
    1546             : 
    1547          35 : BufferPtr LocalNode::allocBuffer( const uint64_t size )
    1548             : {
    1549          35 :     LBASSERT( _impl->receiverThread->isStopped() || _impl->inReceiverThread( ));
    1550             :     BufferPtr buffer = size > COMMAND_ALLOCSIZE ?
    1551           0 :         _impl->bigBuffers.alloc( size ) :
    1552          35 :         _impl->smallBuffers.alloc( COMMAND_ALLOCSIZE );
    1553          35 :     return buffer;
    1554             : }
    1555             : 
    1556       72194 : void LocalNode::_dispatchCommand( ICommand& command )
    1557             : {
    1558       72194 :     LBASSERTINFO( command.isValid(), command );
    1559             : 
    1560       72194 :     if( dispatchCommand( command ))
    1561       72194 :         _redispatchCommands();
    1562             :     else
    1563             :     {
    1564           0 :         _redispatchCommands();
    1565           0 :         _impl->pendingCommands.push_back( command );
    1566             :     }
    1567       72194 : }
    1568             : 
    1569       72231 : bool LocalNode::dispatchCommand( ICommand& command )
    1570             : {
    1571       72231 :     LBVERB << "dispatch " << command << " by " << getNodeID() << std::endl;
    1572       72231 :     LBASSERTINFO( command.isValid(), command );
    1573             : 
    1574       72231 :     const uint32_t type = command.getType();
    1575       72231 :     switch( type )
    1576             :     {
    1577             :         case COMMANDTYPE_NODE:
    1578       71726 :             LBCHECK( Dispatcher::dispatchCommand( command ));
    1579       71726 :             return true;
    1580             : 
    1581             :         case COMMANDTYPE_OBJECT:
    1582         505 :             return _impl->objectStore->dispatchObjectCommand( command );
    1583             : 
    1584             :         default:
    1585           0 :             LBABORT( "Unknown command type " << type << " for " << command );
    1586           0 :             return true;
    1587             :     }
    1588             : }
    1589             : 
    1590       72498 : void LocalNode::_redispatchCommands()
    1591             : {
    1592       72498 :     bool changes = true;
    1593       72498 :     while( changes && !_impl->pendingCommands.empty( ))
    1594             :     {
    1595           0 :         changes = false;
    1596             : 
    1597           0 :         for( CommandList::iterator i = _impl->pendingCommands.begin();
    1598           0 :              i != _impl->pendingCommands.end(); ++i )
    1599             :         {
    1600           0 :             ICommand& command = *i;
    1601           0 :             LBASSERT( command.isValid( ));
    1602             : 
    1603           0 :             if( dispatchCommand( command ))
    1604             :             {
    1605           0 :                 _impl->pendingCommands.erase( i );
    1606           0 :                 changes = true;
    1607           0 :                 break;
    1608             :             }
    1609             :         }
    1610             :     }
    1611             : 
    1612             : #ifndef NDEBUG
    1613       72498 :     if( !_impl->pendingCommands.empty( ))
    1614           0 :         LBVERB << _impl->pendingCommands.size() << " undispatched commands"
    1615           0 :                << std::endl;
    1616       72498 :     LBASSERT( _impl->pendingCommands.size() < 200 );
    1617             : #endif
    1618       72498 : }
    1619             : 
    1620          50 : void LocalNode::_initService()
    1621             : {
    1622          97 :     LB_TS_SCOPED( _rcvThread );
    1623          50 :     _impl->service->withdraw(); // go silent during k/v update
    1624             : 
    1625          97 :     const ConnectionDescriptions& descs = getConnectionDescriptions();
    1626          50 :     if( descs.empty( ))
    1627           3 :         return;
    1628             : 
    1629          94 :     std::ostringstream out;
    1630          47 :     out << getType();
    1631          47 :     _impl->service->set( "co_type", out.str( ));
    1632             : 
    1633          47 :     out.str("");
    1634          47 :     out << descs.size();
    1635          47 :     _impl->service->set( "co_numPorts", out.str( ));
    1636             : 
    1637          94 :     for( ConnectionDescriptionsCIter i = descs.begin(); i != descs.end(); ++i )
    1638             :     {
    1639          94 :         ConnectionDescriptionPtr desc = *i;
    1640          47 :         out.str("");
    1641          47 :         out << "co_port" << i - descs.begin();
    1642          47 :         _impl->service->set( out.str(), desc->toString( ));
    1643             :     }
    1644             : 
    1645          47 :     _impl->service->announce( descs.front()->port, getNodeID().getString( ));
    1646             : }
    1647             : 
    1648          49 : void LocalNode::_exitService()
    1649             : {
    1650          49 :     _impl->service->withdraw();
    1651          49 : }
    1652             : 
    1653           1 : Zeroconf LocalNode::getZeroconf()
    1654             : {
    1655           2 :     lunchbox::ScopedWrite mutex( _impl->service );
    1656           1 :     _impl->service->discover( servus::Servus::IF_ALL, 500 );
    1657           2 :     return Zeroconf( _impl->service.data );
    1658             : }
    1659             : 
    1660             : 
    1661             : //----------------------------------------------------------------------
    1662             : // command thread functions
    1663             : //----------------------------------------------------------------------
    1664          50 : bool LocalNode::_startCommandThread( const int32_t threadID )
    1665             : {
    1666          50 :     _impl->commandThread->threadID = threadID;
    1667          50 :     return _impl->commandThread->start();
    1668             : }
    1669             : 
    1670       51574 : bool LocalNode::_notifyCommandThreadIdle()
    1671             : {
    1672       51574 :     return _impl->objectStore->notifyCommandThreadIdle();
    1673             : }
    1674             : 
    1675       20001 : bool LocalNode::_cmdAckRequest( ICommand& command )
    1676             : {
    1677       20001 :     const uint32_t requestID = command.get< uint32_t >();
    1678       20001 :     LBASSERT( requestID != LB_UNDEFINED_UINT32 );
    1679             : 
    1680       20001 :     serveRequest( requestID );
    1681       20001 :     return true;
    1682             : }
    1683             : 
    1684          49 : bool LocalNode::_cmdStopRcv( ICommand& command )
    1685             : {
    1686          49 :     LB_TS_THREAD( _rcvThread );
    1687          49 :     LBASSERT( isListening( ));
    1688             : 
    1689          49 :     _exitService();
    1690          49 :     _setClosing(); // causes rcv thread exit
    1691             : 
    1692          49 :     command.setCommand( CMD_NODE_STOP_CMD ); // causes cmd thread exit
    1693          49 :     _dispatchCommand( command );
    1694          49 :     return true;
    1695             : }
    1696             : 
    1697          49 : bool LocalNode::_cmdStopCmd( ICommand& )
    1698             : {
    1699          49 :     LB_TS_THREAD( _cmdThread );
    1700          49 :     LBASSERTINFO( isClosing(), *this );
    1701             : 
    1702          49 :     _setClosed();
    1703          49 :     return true;
    1704             : }
    1705             : 
    1706           0 : bool LocalNode::_cmdSetAffinity( ICommand& command )
    1707             : {
    1708           0 :     const int32_t affinity = command.get< int32_t >();
    1709             : 
    1710           0 :     lunchbox::Thread::setAffinity( affinity );
    1711           0 :     return true;
    1712             : }
    1713             : 
    1714          34 : bool LocalNode::_cmdConnect( ICommand& command )
    1715             : {
    1716          34 :     LBASSERTINFO( !command.getRemoteNode(), command );
    1717          34 :     LBASSERT( _impl->inReceiverThread( ));
    1718             : 
    1719          34 :     const NodeID& nodeID = command.get< NodeID >();
    1720          34 :     const uint32_t requestID = command.get< uint32_t >();
    1721          34 :     const uint32_t nodeType = command.get< uint32_t >();
    1722          68 :     std::string data = command.get< std::string >();
    1723             : 
    1724          34 :     LBVERB << "handle connect " << command << " req " << requestID << " type "
    1725          34 :            << nodeType << " data " << data << std::endl;
    1726             : 
    1727          68 :     ConnectionPtr connection = _impl->incoming.getConnection();
    1728             : 
    1729          34 :     LBASSERT( connection );
    1730          34 :     LBASSERT( nodeID != getNodeID() );
    1731          34 :     LBASSERT( _impl->connectionNodes.find( connection ) ==
    1732             :               _impl->connectionNodes.end( ));
    1733             : 
    1734          68 :     NodePtr peer;
    1735             : #ifdef COLLAGE_BIGENDIAN
    1736             :     uint32_t cmd = CMD_NODE_CONNECT_REPLY_BE;
    1737             :     lunchbox::byteswap( cmd );
    1738             : #else
    1739          34 :     const uint32_t cmd = CMD_NODE_CONNECT_REPLY;
    1740             : #endif
    1741             : 
    1742             :     // No locking needed, only recv thread modifies
    1743          34 :     NodeHashCIter i = _impl->nodes->find( nodeID );
    1744          34 :     if( i != _impl->nodes->end( ))
    1745             :     {
    1746           0 :         peer = i->second;
    1747           0 :         if( peer->isReachable( ))
    1748             :         {
    1749             :             // Node exists, probably simultaneous connect from peer
    1750           0 :             LBINFO << "Already got node " << nodeID << ", refusing connect"
    1751           0 :                    << std::endl;
    1752             : 
    1753             :             // refuse connection
    1754           0 :             OCommand( Connections( 1, connection ), cmd )
    1755           0 :                 << NodeID() << requestID;
    1756             : 
    1757             :             // NOTE: There is no close() here. The reply command above has to be
    1758             :             // received by the peer first, before closing the connection.
    1759           0 :             _removeConnection( connection );
    1760           0 :             return true;
    1761             :         }
    1762             :     }
    1763             : 
    1764             :     // create and add connected node
    1765          34 :     if( !peer )
    1766          34 :         peer = createNode( nodeType );
    1767          34 :     if( !peer )
    1768             :     {
    1769           0 :         LBDEBUG << "Can't create node of type " << nodeType << ", disconnecting"
    1770           0 :                << std::endl;
    1771             : 
    1772             :         // refuse connection
    1773           0 :         OCommand( Connections( 1, connection ), cmd ) << NodeID() << requestID;
    1774             : 
    1775             :         // NOTE: There is no close() here. The reply command above has to be
    1776             :         // received by the peer first, before closing the connection.
    1777           0 :         _removeConnection( connection );
    1778           0 :         return true;
    1779             :     }
    1780             : 
    1781          34 :     if( !peer->deserialize( data ))
    1782           0 :         LBWARN << "Error during node initialization" << std::endl;
    1783          34 :     LBASSERTINFO( data.empty(), data );
    1784          34 :     LBASSERTINFO( peer->getNodeID() == nodeID,
    1785             :                   peer->getNodeID() << "!=" << nodeID );
    1786          34 :     LBASSERT( peer->getType() == nodeType );
    1787             : 
    1788          34 :     _impl->connectionNodes[ connection ] = peer;
    1789             :     {
    1790          68 :         lunchbox::ScopedFastWrite mutex( _impl->nodes );
    1791          34 :         _impl->nodes.data[ peer->getNodeID() ] = peer;
    1792             :     }
    1793          34 :     LBVERB << "Added node " << nodeID << std::endl;
    1794             : 
    1795             :     // send our information as reply
    1796          68 :     OCommand( Connections( 1, connection ), cmd )
    1797         102 :         << getNodeID() << requestID << getType() << serialize();
    1798             : 
    1799          34 :     return true;
    1800             : }
    1801             : 
    1802          34 : bool LocalNode::_cmdConnectReply( ICommand& command )
    1803             : {
                           // Handles the peer's reply to a connect request. The command must
                           // arrive on a connection that is not yet mapped to a node, and must
                           // be processed in the receiver thread (both asserted below).
                           // Payload, in read order: NodeID, request ID, and - unless the
                           // node ID is 0 - the node type and the serialized node data.
                           // Always returns true.
    1804          34 :     LBASSERT( !command.getRemoteNode( ));
    1805          34 :     LBASSERT( _impl->inReceiverThread( ));
    1806             : 
    1807          68 :     ConnectionPtr connection = _impl->incoming.getConnection();
    1808          34 :     LBASSERT( _impl->connectionNodes.find( connection ) ==
    1809             :               _impl->connectionNodes.end( ));
    1810             : 
    1811          34 :     const NodeID& nodeID = command.get< NodeID >();
    1812          34 :     const uint32_t requestID = command.get< uint32_t >();
    1813             : 
    1814             :     // connection refused
                           // A node ID of 0 is treated as a refusal: drop the connection and
                           // serve the pending request with false.
    1815          34 :     if( nodeID == 0 )
    1816             :     {
    1817           0 :         LBINFO << "Connection refused, node already connected by peer"
    1818           0 :                << std::endl;
    1819             : 
    1820           0 :         _removeConnection( connection );
    1821           0 :         serveRequest( requestID, false );
    1822           0 :         return true;
    1823             :     }
    1824             : 
    1825          34 :     const uint32_t nodeType = command.get< uint32_t >();
    1826          68 :     std::string data = command.get< std::string >();
    1827             : 
    1828          34 :     LBVERB << "handle connect reply " << command << " req " << requestID
    1829          34 :            << " type " << nodeType << " data " << data << std::endl;
    1830             : 
    1831             :     // No locking needed, only recv thread modifies
    1832          34 :     NodeHash::const_iterator i = _impl->nodes->find( nodeID );
    1833          68 :     NodePtr peer;
    1834          34 :     if( i != _impl->nodes->end( ))
    1835           0 :         peer = i->second;
    1836             : 
                           // The node is already known and reachable: give up this new
                           // connection, close the node and serve the request with false.
    1837          34 :     if( peer && peer->isReachable( )) // simultaneous connect
    1838             :     {
    1839           0 :         LBINFO << "Closing simultaneous connection from " << peer << " on "
    1840           0 :                << connection << std::endl;
    1841             : 
    1842           0 :         _removeConnection( connection );
    1843           0 :         _closeNode( peer );
    1844           0 :         serveRequest( requestID, false );
    1845           0 :         return true;
    1846             :     }
    1847             : 
    1848             :     // create and add node
                           // With a defined request ID the node object is taken from the
                           // request's data; otherwise a new node of the announced type is
                           // created.
                           // NOTE(review): reinterpret_cast here vs. static_cast on the same
                           // getRequestData() result in _cmdDisconnect - confirm which one is
                           // intended and make them consistent.
    1849          34 :     if( !peer )
    1850             :     {
    1851          34 :         if( requestID != LB_UNDEFINED_UINT32 )
    1852          34 :             peer = reinterpret_cast< Node* >( getRequestData( requestID ));
    1853             :         else
    1854           0 :             peer = createNode( nodeType );
    1855             :     }
                           // NOTE(review): unlike the two refusal paths above, this failure
                           // path does not call serveRequest( requestID, false ) - confirm that
                           // no caller can be left waiting on requestID.
    1856          34 :     if( !peer )
    1857             :     {
    1858           0 :         LBINFO << "Can't create node of type " << nodeType << ", disconnecting"
    1859           0 :                << std::endl;
    1860           0 :         _removeConnection( connection );
    1861           0 :         return true;
    1862             :     }
    1863             : 
    1864          34 :     LBASSERTINFO( peer->getType() == nodeType,
    1865             :                   peer->getType() << " != " << nodeType );
    1866          34 :     LBASSERT( peer->isClosed( ));
    1867             : 
                           // A deserialize failure is only logged; the connect continues.
    1868          34 :     if( !peer->deserialize( data ))
    1869           0 :         LBWARN << "Error during node initialization" << std::endl;
    1870          34 :     LBASSERT( data.empty( ));
    1871          34 :     LBASSERT( peer->getNodeID() == nodeID );
    1872             : 
    1873             :     // send ACK to peer
    1874             :     // cppcheck-suppress unusedScopedObject
    1875          34 :     OCommand( Connections( 1, connection ), CMD_NODE_CONNECT_ACK );
    1876             : 
                           // Bind the connection to the node and register the node in both
                           // lookup tables; the node table is written under its write lock.
    1877          34 :     peer->_connect( connection );
    1878          34 :     _impl->connectionNodes[ connection ] = peer;
    1879             :     {
    1880          68 :         lunchbox::ScopedFastWrite mutex( _impl->nodes );
    1881          34 :         _impl->nodes.data[ peer->getNodeID() ] = peer;
    1882             :     }
    1883          34 :     _connectMulticast( peer );
    1884          34 :     LBVERB << "Added node " << nodeID << std::endl;
    1885             : 
                           // Serve the pending request with true, then notify about the
                           // newly connected node.
    1886          34 :     serveRequest( requestID, true );
    1887             : 
    1888          34 :     notifyConnect( peer );
    1889          34 :     return true;
    1890             : }
    1891             : 
    1892          34 : bool LocalNode::_cmdConnectAck( ICommand& command )
    1893             : {
                           // Handles the connect acknowledgement. The command must carry a
                           // remote node and be processed in the receiver thread (asserted).
                           // Connects that node to the connection the command arrived on,
                           // calls _connectMulticast for it and notifies about the connect.
                           // Always returns true.
    1894          68 :     NodePtr node = command.getRemoteNode();
    1895          34 :     LBASSERT( node );
    1896          34 :     LBASSERT( _impl->inReceiverThread( ));
    1897          34 :     LBVERB << "handle connect ack" << std::endl;
    1898             : 
    1899          34 :     node->_connect( _impl->incoming.getConnection( ));
    1900          34 :     _connectMulticast( node );
    1901          34 :     notifyConnect( node );
    1902          68 :     return true;
    1903             : }
    1904             : 
    1905           0 : bool LocalNode::_cmdID( ICommand& command )
    1906             : {
                           // Handles an ID command, which announces a node (ID, type and
                           // serialized data) on the connection the command arrived on.
                           // Must run in the receiver thread (asserted). Always returns true.
    1907           0 :     LBASSERT( _impl->inReceiverThread( ));
    1908             : 
    1909           0 :     const NodeID& nodeID = command.get< NodeID >();
    1910           0 :     uint32_t nodeType = command.get< uint32_t >();
    1911           0 :     std::string data = command.get< std::string >();
    1912             : 
                           // The command already has a remote node: only sanity-check it in
                           // debug builds, there is nothing to set up.
    1913           0 :     if( command.getRemoteNode( ))
    1914             :     {
    1915           0 :         LBASSERT( nodeID == command.getRemoteNode()->getNodeID( ));
    1916           0 :         LBASSERT( command.getRemoteNode()->_getMulticast( ));
    1917           0 :         return true;
    1918             :     }
    1919             : 
    1920           0 :     LBDEBUG << "handle ID " << command << " node " << nodeID << std::endl;
    1921             : 
                           // The connection is expected to be multicast and not yet mapped
                           // to any node.
    1922           0 :     ConnectionPtr connection = _impl->incoming.getConnection();
    1923           0 :     LBASSERT( connection->isMulticast( ));
    1924           0 :     LBASSERT( _impl->connectionNodes.find( connection ) ==
    1925             :               _impl->connectionNodes.end( ));
    1926             : 
    1927           0 :     NodePtr node;
    1928           0 :     if( nodeID == getNodeID() ) // 'self' multicast connection
    1929           0 :         node = this;
    1930             :     else
    1931             :     {
    1932             :         // No locking needed, only recv thread writes
    1933           0 :         NodeHash::const_iterator i = _impl->nodes->find( nodeID );
    1934             : 
    1935           0 :         if( i == _impl->nodes->end( ))
    1936             :         {
    1937             :             // unknown node: create and add unconnected node
                                   // NOTE(review): the createNode() result is dereferenced
                                   // without a null check, while _cmdConnectReply and
                                   // _cmdGetNodeDataReply both handle a null result - confirm
                                   // and guard in the real source file.
    1938           0 :             node = createNode( nodeType );
    1939             : 
    1940           0 :             if( !node->deserialize( data ))
    1941           0 :                 LBWARN << "Error during node initialization" << std::endl;
    1942           0 :             LBASSERTINFO( data.empty(), data );
    1943             : 
    1944             :             {
    1945           0 :                 lunchbox::ScopedFastWrite mutex( _impl->nodes );
    1946           0 :                 _impl->nodes.data[ nodeID ] = node;
    1947             :             }
    1948           0 :             LBVERB << "Added node " << nodeID << " with multicast "
    1949           0 :                    << connection << std::endl;
    1950             :         }
    1951             :         else
    1952           0 :             node = i->second;
    1953             :     }
    1954           0 :     LBASSERT( node );
    1955           0 :     LBASSERTINFO( node->getNodeID() == nodeID,
    1956             :                   node->getNodeID() << "!=" << nodeID );
    1957             : 
                           // Attach the multicast connection to the node and record the
                           // connection -> node mapping.
    1958           0 :     _connectMulticast( node, connection );
    1959           0 :     _impl->connectionNodes[ connection ] = node;
    1960           0 :     LBDEBUG << "Added multicast connection " << connection << " from " << nodeID
    1961           0 :            << " to " << getNodeID() << std::endl;
    1962           0 :     return true;
    1963             : }
    1964             : 
    1965           8 : bool LocalNode::_cmdDisconnect( ICommand& command )
    1966             : {
                           // Handles a disconnect command in the receiver thread (asserted).
                           // The command carries a request ID; the node to close is taken
                           // from that request's data. Closes the node, then serves the
                           // request. Always returns true.
    1967           8 :     LBASSERT( _impl->inReceiverThread( ));
    1968             : 
    1969           8 :     const uint32_t requestID = command.get< uint32_t >();
    1970             : 
    1971          16 :     NodePtr node = static_cast<Node*>( getRequestData( requestID ));
    1972           8 :     LBASSERT( node );
    1973             : 
    1974           8 :     _closeNode( node );
    1975           8 :     LBASSERT( node->isClosed( ));
    1976           8 :     serveRequest( requestID );
    1977          16 :     return true;
    1978             : }
    1979             : 
    1980           0 : bool LocalNode::_cmdGetNodeData( ICommand& command )
    1981             : {
                           // Answers a request for the data of the node with the given ID.
                           // Payload, in read order: NodeID, request ID. The reply
                           // (CMD_NODE_GET_NODE_DATA_REPLY) echoes both, followed by the
                           // node type and serialized node data. Always returns true.
    1982           0 :     const NodeID& nodeID = command.get< NodeID >();
    1983           0 :     const uint32_t requestID = command.get< uint32_t >();
    1984             : 
    1985           0 :     LBVERB << "cmd get node data: " << command << " req " << requestID
    1986           0 :            << " nodeID " << nodeID << std::endl;
    1987             : 
    1988           0 :     NodePtr node = getNode( nodeID );
    1989           0 :     NodePtr toNode = command.getRemoteNode();
    1990             : 
                           // If getNode() returned no node, the reply carries
                           // NODETYPE_INVALID and an empty data string.
    1991           0 :     uint32_t nodeType = NODETYPE_INVALID;
    1992           0 :     std::string nodeData;
    1993           0 :     if( node )
    1994             :     {
    1995           0 :         nodeType = node->getType();
    1996           0 :         nodeData = node->serialize();
                               // NOTE(review): "Sent" is logged before the send below.
    1997           0 :         LBDEBUG << "Sent node data '" << nodeData << "' for " << nodeID << " to "
    1998           0 :                << toNode << std::endl;
    1999             :     }
    2000             : 
                           // NOTE(review): toNode is dereferenced without a check - presumably
                           // this command is only dispatched for connected peers; confirm.
    2001           0 :     toNode->send( CMD_NODE_GET_NODE_DATA_REPLY )
    2002           0 :         << nodeID << requestID << nodeType << nodeData;
    2003           0 :     return true;
    2004             : }
    2005             : 
    2006           0 : bool LocalNode::_cmdGetNodeDataReply( ICommand& command )
    2007             : {
                           // Handles the reply to a node data request in the receiver thread
                           // (asserted). Payload, in read order: NodeID, request ID, node
                           // type, serialized node data. The request is served with a raw
                           // node pointer, or with a null pointer if no node is available.
                           // Always returns true.
    2008           0 :     LBASSERT( _impl->inReceiverThread( ));
    2009             : 
    2010           0 :     const NodeID& nodeID = command.get< NodeID >();
    2011           0 :     const uint32_t requestID = command.get< uint32_t >();
    2012           0 :     const uint32_t nodeType = command.get< uint32_t >();
    2013           0 :     std::string nodeData = command.get< std::string >();
    2014             : 
    2015           0 :     LBVERB << "cmd get node data reply: " << command << " req " << requestID
    2016           0 :            << " type " << nodeType << " data " << nodeData << std::endl;
    2017             : 
    2018             :     // No locking needed, only recv thread writes
    2019           0 :     NodeHash::const_iterator i = _impl->nodes->find( nodeID );
    2020           0 :     if( i != _impl->nodes->end( ))
    2021             :     {
    2022             :         // Requested node connected to us in the meantime
    2023           0 :         NodePtr node = i->second;
    2024             : 
                               // An extra reference is taken before the raw pointer is handed
                               // to the request - presumably released by whoever consumes the
                               // request result; confirm against the requesting code.
    2025           0 :         node->ref( this );
    2026           0 :         serveRequest( requestID, node.get( ));
    2027           0 :         return true;
    2028             :     }
    2029             : 
                           // The replying side did not know the node: serve a null pointer.
    2030           0 :     if( nodeType == NODETYPE_INVALID )
    2031             :     {
    2032           0 :         serveRequest( requestID, (void*)0 );
    2033           0 :         return true;
    2034             :     }
    2035             : 
    2036             :     // new node: create and add unconnected node
    2037           0 :     NodePtr node = createNode( nodeType );
    2038           0 :     if( node )
    2039             :     {
                               // NOTE(review): this assert is redundant inside if( node ).
    2040           0 :         LBASSERT( node );
    2041             : 
                               // A deserialize failure is only logged; the node is still
                               // handed to the request.
    2042           0 :         if( !node->deserialize( nodeData ))
    2043           0 :             LBWARN << "Failed to initialize node data" << std::endl;
    2044           0 :         LBASSERT( nodeData.empty( ));
    2045           0 :         node->ref( this );
    2046             :     }
    2047             :     else
    2048           0 :         LBINFO << "Can't create node of type " << nodeType << std::endl;
    2049             : 
                           // node.get() is null here if createNode() failed.
    2050           0 :     serveRequest( requestID, node.get( ));
    2051           0 :     return true;
    2052             : }
    2053             : 
    2054           0 : bool LocalNode::_cmdAcquireSendToken( ICommand& command )
    2055             : {
                           // Handles a request for the send token in the command thread
                           // (asserted). If the token is available it is taken and the
                           // requester is answered with CMD_NODE_ACQUIRE_SEND_TOKEN_REPLY
                           // carrying its request ID. Otherwise the command is queued, unless
                           // the time since the last release exceeds the global timeout.
                           // Always returns true.
    2056           0 :     LBASSERT( inCommandThread( ));
    2057           0 :     if( !_impl->sendToken ) // enqueue command if no token available
    2058             :     {
    2059           0 :         const uint32_t timeout = Global::getTimeout();
                               // Queue while no timeout is configured or the time since
                               // lastSendToken is still within the timeout.
    2060           0 :         if( timeout == LB_TIMEOUT_INDEFINITE ||
    2061           0 :             ( getTime64() - _impl->lastSendToken <= timeout ))
    2062             :         {
    2063           0 :             _impl->sendTokenQueue.push_back( command );
    2064           0 :             return true;
    2065             :         }
    2066             : 
    2067             :         // timeout! - clear old requests
    2068           0 :         _impl->sendTokenQueue.clear();
    2069             :         // 'generate' new token - release is robust
    2070             :     }
    2071             : 
                           // Mark the token as taken and answer the requester.
    2072           0 :     _impl->sendToken = false;
    2073             : 
    2074           0 :     const uint32_t requestID = command.get< uint32_t >();
    2075           0 :     command.getRemoteNode()->send( CMD_NODE_ACQUIRE_SEND_TOKEN_REPLY )
    2076           0 :             << requestID;
    2077           0 :     return true;
    2078             : }
    2079             : 
    2080           0 : bool LocalNode::_cmdAcquireSendTokenReply( ICommand& command )
    2081             : {
                           // Handles the send token reply: reads the request ID from the
                           // command and serves that request. Always returns true.
    2082           0 :     const uint32_t requestID = command.get< uint32_t >();
    2083           0 :     serveRequest( requestID );
    2084           0 :     return true;
    2085             : }
    2086             : 
    2087           0 : bool LocalNode::_cmdReleaseSendToken( ICommand& )
    2088             : {
                           // Handles the release of the send token in the command thread
                           // (asserted). Records the release time, then either marks the token
                           // as available or passes it directly to the oldest queued request.
                           // Always returns true.
    2089           0 :     LBASSERT( inCommandThread( ));
    2090           0 :     _impl->lastSendToken = getTime64();
    2091             : 
    2092           0 :     if( _impl->sendToken )
    2093           0 :         return true; // double release due to timeout
    2094           0 :     if( _impl->sendTokenQueue.empty( ))
    2095             :     {
    2096           0 :         _impl->sendToken = true;
    2097           0 :         return true;
    2098             :     }
    2099             : 
                           // Someone is waiting: sendToken stays false and the first queued
                           // requester is answered with its own request ID.
    2100           0 :     ICommand& request = _impl->sendTokenQueue.front();
    2101             : 
    2102           0 :     const uint32_t requestID = request.get< uint32_t >();
    2103           0 :     request.getRemoteNode()->send( CMD_NODE_ACQUIRE_SEND_TOKEN_REPLY )
    2104           0 :             << requestID;
                           // 'request' refers to the queue's front element, so it is only
                           // removed after its last use above.
    2105           0 :     _impl->sendTokenQueue.pop_front();
    2106           0 :     return true;
    2107             : }
    2108             : 
    2109           0 : bool LocalNode::_cmdAddListener( ICommand& command )
    2110             : {
                           // Handles an add-listener command. Payload, in read order: a
                           // connection pointer transported as uint64_t, and the serialized
                           // connection description. The description is added to the
                           // command's remote node; the connection itself is only set up when
                           // that remote node is this local node. Always returns true.
                           // NOTE(review): the pointer is read via get< uint64_t >() plus
                           // reinterpret_cast here, but via get< Connection* >() in
                           // _cmdRemoveListener - confirm both match what the senders write.
    2111             :     Connection* rawConnection =
    2112           0 :         reinterpret_cast< Connection* >( command.get< uint64_t >( ));
    2113           0 :     std::string data = command.get< std::string >();
    2114             : 
    2115           0 :     ConnectionDescriptionPtr description = new ConnectionDescription( data );
    2116           0 :     command.getRemoteNode()->_addConnectionDescription( description );
    2117             : 
                           // For any other node the raw pointer is not used.
    2118           0 :     if( command.getRemoteNode() != this )
    2119           0 :         return true;
    2120             : 
                           // The smart pointer now holds a reference; the unref() presumably
                           // drops one taken by the sender to keep the connection alive while
                           // the command was in flight - confirm against the sending code.
                           // NOTE(review): LBASSERT( connection ) comes after
                           // connection->unref(), so a null pointer is dereferenced before the
                           // assert can fire. The assert belongs before the unref().
    2121           0 :     ConnectionPtr connection = rawConnection;
    2122           0 :     connection->unref();
    2123           0 :     LBASSERT( connection );
    2124             : 
                           // Map the listener to this node, register multicast connections,
                           // start a non-blocking accept and add it to the incoming set.
    2125           0 :     _impl->connectionNodes[ connection ] = this;
    2126           0 :     if( connection->isMulticast( ))
    2127           0 :         _addMulticast( this, connection );
    2128             : 
    2129           0 :     connection->acceptNB();
    2130           0 :     _impl->incoming.addConnection( connection );
    2131             : 
    2132           0 :     _initService(); // update zeroconf
    2133           0 :     return true;
    2134             : }
    2135             : 
    2136           0 : bool LocalNode::_cmdRemoveListener( ICommand& command )
    2137             : {
                           // Handles a remove-listener command. Payload, in read order:
                           // request ID, raw connection pointer, serialized connection
                           // description. The description is removed from the command's
                           // remote node; the connection is only torn down, and the request
                           // only served, when that remote node is this local node.
                           // Always returns true.
    2138           0 :     const uint32_t requestID = command.get< uint32_t >();
    2139           0 :     Connection* rawConnection = command.get< Connection* >();
    2140           0 :     std::string data = command.get< std::string >();
    2141             : 
    2142           0 :     ConnectionDescriptionPtr description = new ConnectionDescription( data );
    2143           0 :     LBCHECK(
    2144             :           command.getRemoteNode()->_removeConnectionDescription( description ));
    2145             : 
                           // For any other node neither the raw pointer nor the request ID
                           // is used.
    2146           0 :     if( command.getRemoteNode() != this )
    2147           0 :         return true;
    2148             : 
    2149           0 :     _initService(); // update zeroconf
    2150             : 
                           // NOTE(review): as in _cmdAddListener, LBASSERT( connection ) comes
                           // after the pointer has been dereferenced by unref( this ). The
                           // assert belongs before the unref().
    2151           0 :     ConnectionPtr connection = rawConnection;
    2152           0 :     connection->unref( this );
    2153           0 :     LBASSERT( connection );
    2154             : 
    2155           0 :     if( connection->isMulticast( ))
    2156           0 :         _removeMulticast( connection );
    2157             : 
                           // Remove the listener from the incoming set and from the
                           // connection -> node map, then serve the request.
    2158           0 :     _impl->incoming.removeConnection( connection );
    2159           0 :     LBASSERT( _impl->connectionNodes.find( connection ) !=
    2160             :               _impl->connectionNodes.end( ));
    2161           0 :     _impl->connectionNodes.erase( connection );
    2162           0 :     serveRequest( requestID );
    2163           0 :     return true;
    2164             : }
    2165             : 
    2166           0 : bool LocalNode::_cmdPing( ICommand& command )
    2167             : {
                           // Handles a ping in the command thread (asserted) by sending
                           // CMD_NODE_PING_REPLY back to the command's remote node.
                           // Always returns true.
    2168           0 :     LBASSERT( inCommandThread( ));
    2169           0 :     command.getRemoteNode()->send( CMD_NODE_PING_REPLY );
    2170           0 :     return true;
    2171             : }
    2172             : 
    2173           2 : bool LocalNode::_cmdCommand( ICommand& command )
    2174             : {
                           // Dispatches a custom command, identified by the uint128_t ID at
                           // the start of its payload, to the handler registered for that ID.
                           // Returns false if no handler is registered. If the handler was
                           // registered with a queue, the command is pushed to that queue and
                           // true is returned; otherwise the handler's result is returned.
    2175           2 :     const uint128_t& commandID = command.get< uint128_t >();
    2176           4 :     CommandHandler func;
    2177             :     {
                               // The handler table is only read under its read lock; the
                               // handler is copied out so it is invoked after the lock is
                               // released at the end of this scope.
    2178           3 :         lunchbox::ScopedFastRead mutex( _impl->commandHandlers );
    2179           2 :         CommandHashCIter i = _impl->commandHandlers->find( commandID );
    2180           2 :         if( i == _impl->commandHandlers->end( ))
    2181           0 :             return false;
    2182             : 
    2183           2 :         CommandQueue* queue = i->second.second;
    2184           2 :         if( queue )
    2185             :         {
                                   // Queued handlers: the command is redirected to
                                   // _cmdCommandAsync, which looks the handler up again.
    2186           2 :             command.setDispatchFunction( CmdFunc( this,
    2187           1 :                                                 &LocalNode::_cmdCommandAsync ));
    2188           1 :             queue->push( command );
    2189           1 :             return true;
    2190             :         }
    2191             :         // else
    2192             : 
    2193           1 :         func = i->second.first;
    2194             :     }
    2195             : 
    2196           2 :     CustomICommand customCmd( command );
    2197           1 :     return func( customCmd );
    2198             : }
    2199             : 
    2200           1 : bool LocalNode::_cmdCommandAsync( ICommand& command )
    2201             : {
                           // Dispatch function installed by _cmdCommand for commands that
                           // were pushed to a handler's queue. Looks the handler up again by
                           // the command ID and invokes it. Returns the handler's result, or
                           // true if the handler is no longer registered.
    2202           1 :     const uint128_t& commandID = command.get< uint128_t >();
    2203           2 :     CommandHandler func;
    2204             :     {
    2205           2 :         lunchbox::ScopedFastRead mutex( _impl->commandHandlers );
    2206           1 :         CommandHashCIter i = _impl->commandHandlers->find( commandID );
                               // The missing-handler case is asserted and then also handled
                               // explicitly by the check below.
    2207           1 :         LBASSERT( i != _impl->commandHandlers->end( ));
    2208           1 :         if( i ==  _impl->commandHandlers->end( ))
    2209           0 :             return true; // deregistered between dispatch and now
    2210           1 :         func = i->second.first;
    2211             :     }
                           // The handler copy is invoked after the read lock is released.
    2212           2 :     CustomICommand customCmd( command );
    2213           1 :     return func( customCmd );
    2214             : }
    2215             : 
    2216          34 : bool LocalNode::_cmdAddConnection( ICommand& command )
    2217             : {
                           // Handles an add-connection command in the receiver thread
                           // (asserted). Reads a ConnectionPtr from the command, passes it to
                           // _addConnection and then drops one reference on it.
                           // Always returns true.
    2218          34 :     LBASSERT( _impl->inReceiverThread( ));
    2219             : 
    2220          68 :     ConnectionPtr connection = command.get< ConnectionPtr >();
    2221          34 :     _addConnection( connection );
    2222          34 :     connection->unref(); // ref'd by _addConnection
    2223          68 :     return true;
    2224             : }
    2225             : 
    2226          66 : }

Generated by: LCOV version 1.11