LCOV - code coverage report
Current view: top level - co - localNode.cpp (source / functions) Hit Total Coverage
Test: Collage Lines: 580 1095 53.0 %
Date: 2015-11-03 13:48:53 Functions: 74 107 69.2 %

          Line data    Source code
       1             : 
       2             : /* Copyright (c) 2005-2014, Stefan Eilemann <eile@equalizergraphics.com>
       3             :  *                    2010, Cedric Stalder <cedric.stalder@gmail.com>
       4             :  *               2012-2014, Daniel Nachbaur <danielnachbaur@gmail.com>
       5             :  *
       6             :  * This file is part of Collage <https://github.com/Eyescale/Collage>
       7             :  *
       8             :  * This library is free software; you can redistribute it and/or modify it under
       9             :  * the terms of the GNU Lesser General Public License version 2.1 as published
      10             :  * by the Free Software Foundation.
      11             :  *
      12             :  * This library is distributed in the hope that it will be useful, but WITHOUT
      13             :  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
      14             :  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
      15             :  * details.
      16             :  *
      17             :  * You should have received a copy of the GNU Lesser General Public License
      18             :  * along with this library; if not, write to the Free Software Foundation, Inc.,
      19             :  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
      20             :  */
      21             : 
      22             : #include "localNode.h"
      23             : 
      24             : #include "buffer.h"
      25             : #include "bufferCache.h"
      26             : #include "commandQueue.h"
      27             : #include "connectionDescription.h"
      28             : #include "connectionSet.h"
      29             : #include "customICommand.h"
      30             : #include "dataIStream.h"
      31             : #include "exception.h"
      32             : #include "global.h"
      33             : #include "iCommand.h"
      34             : #include "nodeCommand.h"
      35             : #include "oCommand.h"
      36             : #include "object.h"
      37             : #include "objectICommand.h"
      38             : #include "objectStore.h"
      39             : #include "pipeConnection.h"
      40             : #include "sendToken.h"
      41             : #include "worker.h"
      42             : #include "zeroconf.h"
      43             : 
      44             : #include <lunchbox/clock.h>
      45             : #include <lunchbox/futureFunction.h>
      46             : #include <lunchbox/hash.h>
      47             : #include <lunchbox/lockable.h>
      48             : #include <lunchbox/request.h>
      49             : #include <lunchbox/rng.h>
      50             : #include <lunchbox/servus.h>
      51             : #include <lunchbox/sleep.h>
      52             : 
      53             : #include <boost/bind.hpp>
      54             : #include <boost/date_time/posix_time/posix_time.hpp>
      55             : #include <boost/lexical_cast.hpp>
      56             : 
      57             : #include <list>
      58             : 
      59             : namespace bp = boost::posix_time;
      60             : 
      61             : namespace co
      62             : {
      63             : namespace
      64             : {
      65          21 : lunchbox::a_int32_t _threadIDs;
      66             : 
      67             : typedef CommandFunc< LocalNode > CmdFunc;
      68             : typedef std::list< ICommand > CommandList;
      69             : typedef lunchbox::RefPtrHash< Connection, NodePtr > ConnectionNodeHash;
      70             : typedef ConnectionNodeHash::const_iterator ConnectionNodeHashCIter;
      71             : typedef ConnectionNodeHash::iterator ConnectionNodeHashIter;
      72             : typedef stde::hash_map< uint128_t, NodePtr > NodeHash;
      73             : typedef NodeHash::const_iterator NodeHashCIter;
      74             : typedef stde::hash_map< uint128_t, LocalNode::PushHandler > HandlerHash;
      75             : typedef HandlerHash::const_iterator HandlerHashCIter;
      76             : typedef std::pair< LocalNode::CommandHandler, CommandQueue* > CommandPair;
      77             : typedef stde::hash_map< uint128_t, CommandPair > CommandHash;
      78             : typedef CommandHash::const_iterator CommandHashCIter;
      79             : typedef lunchbox::FutureFunction< bool > FuturebImpl;
      80             : }
      81             : 
      82             : namespace detail
      83             : {
      84          98 : class ReceiverThread : public lunchbox::Thread
      85             : {
      86             : public:
      87          51 :     explicit ReceiverThread( co::LocalNode* localNode )
      88          51 :         : _localNode( localNode ) {}
      89             : 
      90          46 :     bool init() override
      91             :     {
      92          46 :         const int32_t threadID = ++_threadIDs - 1;
      93          92 :         setName( std::string( "Rcv" ) +
      94          46 :                  boost::lexical_cast< std::string >( threadID ));
      95          46 :         return _localNode->_startCommandThread( threadID );
      96             :     }
      97             : 
      98          46 :     void run() override { _localNode->_runReceiverThread(); }
      99             : 
     100             : private:
     101             :     co::LocalNode* const _localNode;
     102             : };
     103             : 
     104          98 : class CommandThread : public Worker
     105             : {
     106             : public:
     107          51 :     explicit CommandThread( co::LocalNode* localNode )
     108             :         : Worker( Global::getCommandQueueLimit( ))
     109             :         , threadID( 0 )
     110          51 :         , _localNode( localNode )
     111          51 :     {}
     112             : 
     113             :     int32_t threadID;
     114             : 
     115             : protected:
     116          46 :     bool init() override
     117             :     {
     118          92 :         setName( std::string( "Cmd" ) +
     119          46 :                  boost::lexical_cast< std::string >( threadID ));
     120          46 :         return true;
     121             :     }
     122             : 
     123      102919 :     bool stopRunning() override { return _localNode->isClosed(); }
     124       51173 :     bool notifyIdle() override { return _localNode->_notifyCommandThreadIdle();}
     125             : 
     126             : private:
     127             :     co::LocalNode* const _localNode;
     128             : };
     129             : 
     130             : class LocalNode
     131             : {
     132             : public:
     133          51 :     LocalNode()
     134             :         : smallBuffers( 200 )
     135             :         , bigBuffers( 20 )
     136             :         , sendToken( true )
     137             :         , lastSendToken( 0 )
     138             :         , objectStore( 0 )
     139             :         , receiverThread( 0 )
     140             :         , commandThread( 0 )
     141          51 :         , service( "_collage._tcp" )
     142          51 :     {}
     143             : 
     144          49 :     ~LocalNode()
     145          49 :     {
     146          49 :         LBASSERT( incoming.isEmpty( ));
     147          49 :         LBASSERT( connectionNodes.empty( ));
     148          49 :         LBASSERT( pendingCommands.empty( ));
     149          49 :         LBASSERT( nodes->empty( ));
     150             : 
     151          49 :         delete objectStore;
     152          49 :         objectStore = 0;
     153          49 :         LBASSERT( !commandThread->isRunning( ));
     154          49 :         delete commandThread;
     155          49 :         commandThread = 0;
     156             : 
     157          49 :         LBASSERT( !receiverThread->isRunning( ));
     158          49 :         delete receiverThread;
     159          49 :         receiverThread = 0;
     160          49 :     }
     161             : 
     162         271 :     bool inReceiverThread() const { return receiverThread->isCurrent(); }
     163             : 
     164             :     /** Commands re-scheduled for dispatch. */
     165             :     CommandList pendingCommands;
     166             : 
     167             :     /** The command buffer 'allocator' for small packets */
     168             :     co::BufferCache smallBuffers;
     169             : 
     170             :     /** The command buffer 'allocator' for big packets */
     171             :     co::BufferCache bigBuffers;
     172             : 
     173             :     bool sendToken; //!< send token availability.
     174             :     uint64_t lastSendToken; //!< last used time for timeout detection
     175             :     std::deque< co::ICommand > sendTokenQueue; //!< pending requests
     176             : 
     177             :     /** Manager of distributed object */
     178             :     ObjectStore* objectStore;
     179             : 
     180             :     /** Needed for thread-safety during nodeID-based connect() */
     181             :     lunchbox::Lock connectLock;
     182             : 
     183             :     /** The node for each connection. */
     184             :     ConnectionNodeHash connectionNodes; // read and write: recv only
     185             : 
     186             :     /** The connected nodes. */
     187             :     lunchbox::Lockable< NodeHash, lunchbox::SpinLock > nodes; // r: all, w: recv
     188             : 
     189             :     /** The connection set of all connections from/to this node. */
     190             :     co::ConnectionSet incoming;
     191             : 
     192             :     /** The process-global clock. */
     193             :     lunchbox::Clock clock;
     194             : 
     195             :     /** The registered push handlers. */
     196             :     lunchbox::Lockable< HandlerHash, lunchbox::Lock > pushHandlers;
     197             : 
     198             :     /** The registered custom command handlers. */
     199             :     lunchbox::Lockable< CommandHash, lunchbox::SpinLock > commandHandlers;
     200             : 
     201             :     ReceiverThread* receiverThread;
     202             :     CommandThread* commandThread;
     203             : 
     204             :     lunchbox::Lockable< lunchbox::Servus > service;
     205             : 
     206             :     // Performance counters:
     207             :     a_ssize_t counters[ co::LocalNode::COUNTER_ALL ];
     208             : };
     209             : }
     210             : 
     211          51 : LocalNode::LocalNode( const uint32_t type )
     212             :         : Node( type )
     213          51 :         , _impl( new detail::LocalNode )
     214             : {
     215          51 :     _impl->receiverThread = new detail::ReceiverThread( this );
     216          51 :     _impl->commandThread  = new detail::CommandThread( this );
     217          51 :     _impl->objectStore = new ObjectStore( this, _impl->counters );
     218             : 
     219          51 :     CommandQueue* queue = getCommandThreadQueue();
     220             :     registerCommand( CMD_NODE_CONNECT,
     221          51 :                      CmdFunc( this, &LocalNode::_cmdConnect ), 0 );
     222             :     registerCommand( CMD_NODE_CONNECT_BE,
     223          51 :                      CmdFunc( this, &LocalNode::_cmdConnect ), 0 );
     224             :     registerCommand( CMD_NODE_CONNECT_REPLY,
     225          51 :                      CmdFunc( this, &LocalNode::_cmdConnectReply ), 0 );
     226             :     registerCommand( CMD_NODE_CONNECT_REPLY_BE,
     227          51 :                      CmdFunc( this, &LocalNode::_cmdConnectReply ), 0 );
     228             :     registerCommand( CMD_NODE_ID,
     229          51 :                      CmdFunc( this, &LocalNode::_cmdID ), 0 );
     230             :     registerCommand( CMD_NODE_ID_BE,
     231          51 :                      CmdFunc( this, &LocalNode::_cmdID ), 0 );
     232             :     registerCommand( CMD_NODE_ACK_REQUEST,
     233          51 :                      CmdFunc( this, &LocalNode::_cmdAckRequest ), 0 );
     234             :     registerCommand( CMD_NODE_STOP_RCV,
     235          51 :                      CmdFunc( this, &LocalNode::_cmdStopRcv ), 0 );
     236             :     registerCommand( CMD_NODE_STOP_CMD,
     237          51 :                      CmdFunc( this, &LocalNode::_cmdStopCmd ), queue );
     238             :     registerCommand( CMD_NODE_SET_AFFINITY_RCV,
     239          51 :                      CmdFunc( this, &LocalNode::_cmdSetAffinity ), 0 );
     240             :     registerCommand( CMD_NODE_SET_AFFINITY_CMD,
     241          51 :                      CmdFunc( this, &LocalNode::_cmdSetAffinity ), queue );
     242             :     registerCommand( CMD_NODE_CONNECT_ACK,
     243          51 :                      CmdFunc( this, &LocalNode::_cmdConnectAck ), 0 );
     244             :     registerCommand( CMD_NODE_DISCONNECT,
     245          51 :                      CmdFunc( this, &LocalNode::_cmdDisconnect ), 0 );
     246             :     registerCommand( CMD_NODE_GET_NODE_DATA,
     247          51 :                      CmdFunc( this, &LocalNode::_cmdGetNodeData ), queue );
     248             :     registerCommand( CMD_NODE_GET_NODE_DATA_REPLY,
     249          51 :                      CmdFunc( this, &LocalNode::_cmdGetNodeDataReply ), 0 );
     250             :     registerCommand( CMD_NODE_ACQUIRE_SEND_TOKEN,
     251          51 :                      CmdFunc( this, &LocalNode::_cmdAcquireSendToken ), queue );
     252             :     registerCommand( CMD_NODE_ACQUIRE_SEND_TOKEN_REPLY,
     253          51 :                      CmdFunc( this, &LocalNode::_cmdAcquireSendTokenReply ), 0);
     254             :     registerCommand( CMD_NODE_RELEASE_SEND_TOKEN,
     255          51 :                      CmdFunc( this, &LocalNode::_cmdReleaseSendToken ), queue );
     256             :     registerCommand( CMD_NODE_ADD_LISTENER,
     257          51 :                      CmdFunc( this, &LocalNode::_cmdAddListener ), 0 );
     258             :     registerCommand( CMD_NODE_REMOVE_LISTENER,
     259          51 :                      CmdFunc( this, &LocalNode::_cmdRemoveListener ), 0 );
     260             :     registerCommand( CMD_NODE_PING,
     261          51 :                      CmdFunc( this, &LocalNode::_cmdPing ), queue );
     262             :     registerCommand( CMD_NODE_PING_REPLY,
     263          51 :                      CmdFunc( this, &LocalNode::_cmdDiscard ), 0 );
     264             :     registerCommand( CMD_NODE_COMMAND,
     265          51 :                      CmdFunc( this, &LocalNode::_cmdCommand ), 0 );
     266             :     registerCommand( CMD_NODE_ADD_CONNECTION,
     267          51 :                      CmdFunc( this, &LocalNode::_cmdAddConnection ), 0 );
     268          51 : }
     269             : 
     270         137 : LocalNode::~LocalNode( )
     271             : {
     272          49 :     LBASSERT( !hasPendingRequests( ));
     273          49 :     delete _impl;
     274          88 : }
     275             : 
     276           1 : bool LocalNode::initLocal( const int argc, char** argv )
     277             : {
     278             :     // We do not use getopt_long because it really does not work due to the
     279             :     // following aspects:
     280             :     // - reordering of arguments
     281             :     // - different behavior of GNU and BSD implementations
     282             :     // - incomplete man pages
     283           1 :     for( int i=1; i<argc; ++i )
     284             :     {
     285           0 :         if( std::string( "--eq-listen" ) == argv[i] )
     286           0 :             LBWARN << "Deprecated --eq-listen, use --co-listen" << std::endl;
     287           0 :         if( std::string( "--eq-listen" ) == argv[i] ||
     288           0 :             std::string( "--co-listen" ) == argv[i] )
     289             :         {
     290           0 :             if( (i+1)<argc && argv[i+1][0] != '-' )
     291             :             {
     292           0 :                 std::string data = argv[++i];
     293           0 :                 ConnectionDescriptionPtr desc = new ConnectionDescription;
     294           0 :                 desc->port = Global::getDefaultPort();
     295             : 
     296           0 :                 if( desc->fromString( data ))
     297             :                 {
     298           0 :                     addConnectionDescription( desc );
     299           0 :                     LBASSERTINFO( data.empty(), data );
     300             :                 }
     301             :                 else
     302           0 :                     LBWARN << "Ignoring listen option: " << argv[i] <<std::endl;
     303             :             }
     304             :             else
     305             :             {
     306           0 :                 LBWARN << "No argument given to --co-listen!" << std::endl;
     307             :             }
     308             :         }
     309           0 :         else if ( std::string( "--co-globals" ) == argv[i] )
     310             :         {
     311           0 :             if( (i+1)<argc && argv[i+1][0] != '-' )
     312             :             {
     313           0 :                 const std::string data = argv[++i];
     314           0 :                 if( !Global::fromString( data ))
     315             :                 {
     316           0 :                     LBWARN << "Invalid global variables string: " << data
     317           0 :                            << ", using default global variables." << std::endl;
     318           0 :                 }
     319             :             }
     320             :             else
     321             :             {
     322           0 :                 LBWARN << "No argument given to --co-globals!" << std::endl;
     323             :             }
     324             :         }
     325             :     }
     326             : 
     327           1 :     if( !listen( ))
     328             :     {
     329           0 :         LBWARN << "Can't setup listener(s) on " << *static_cast< Node* >( this )
     330           0 :                << std::endl;
     331           0 :         return false;
     332             :     }
     333           1 :     return true;
     334             : }
     335             : 
     336          46 : bool LocalNode::listen()
     337             : {
     338          46 :     LBVERB << "Listener data: " << serialize() << std::endl;
     339          46 :     if( !isClosed() || !_connectSelf( ))
     340           0 :         return false;
     341             : 
     342          46 :     const ConnectionDescriptions& descriptions = getConnectionDescriptions();
     343         270 :     for( ConnectionDescriptionsCIter i = descriptions.begin();
     344         180 :          i != descriptions.end(); ++i )
     345             :     {
     346          44 :         ConnectionDescriptionPtr description = *i;
     347          88 :         ConnectionPtr connection = Connection::create( description );
     348             : 
     349          44 :         if( !connection || !connection->listen( ))
     350             :         {
     351           0 :             LBWARN << "Can't create listener connection: " << description
     352           0 :                    << std::endl;
     353           0 :             return false;
     354             :         }
     355             : 
     356          44 :         _impl->connectionNodes[ connection ] = this;
     357          44 :         if( connection->isMulticast( ))
     358           0 :             _addMulticast( this, connection );
     359             : 
     360          44 :         connection->acceptNB();
     361          44 :         _impl->incoming.addConnection( connection );
     362             : 
     363          88 :         LBVERB << "Added node " << getNodeID() << " using " << connection
     364         132 :                << std::endl;
     365          44 :     }
     366             : 
     367          92 :     LBVERB << lunchbox::className(this) << " start command and receiver thread "
     368         138 :            << std::endl;
     369             : 
     370          46 :     _setListening();
     371          46 :     _impl->receiverThread->start();
     372             : 
     373          46 :     LBDEBUG << *this << std::endl;
     374          46 :     return true;
     375             : }
     376             : 
     377          45 : bool LocalNode::close()
     378             : {
     379          45 :     if( !isListening() )
     380           0 :         return false;
     381             : 
     382          45 :     send( CMD_NODE_STOP_RCV );
     383             : 
     384          45 :     LBCHECK( _impl->receiverThread->join( ));
     385          45 :     _cleanup();
     386             : 
     387         135 :     LBDEBUG << _impl->incoming.getSize() << " connections open after close"
     388         135 :            << std::endl;
     389             : #ifndef NDEBUG
     390          45 :     const Connections& connections = _impl->incoming.getConnections();
     391         135 :     for( Connections::const_iterator i = connections.begin();
     392          90 :          i != connections.end(); ++i )
     393             :     {
     394           0 :         LBDEBUG << "    " << *i << std::endl;
     395             :     }
     396             : #endif
     397             : 
     398          45 :     LBASSERTINFO( !hasPendingRequests(),
     399             :                   *static_cast< lunchbox::RequestHandler* >( this ));
     400          45 :     return true;
     401             : }
     402             : 
     403           0 : void LocalNode::setAffinity( const int32_t affinity )
     404             : {
     405           0 :     send( CMD_NODE_SET_AFFINITY_RCV ) << affinity;
     406           0 :     send( CMD_NODE_SET_AFFINITY_CMD ) << affinity;
     407             : 
     408           0 :     lunchbox::Thread::setAffinity( affinity );
     409           0 : }
     410             : 
     411          33 : void LocalNode::addConnection( ConnectionPtr connection )
     412             : {
     413          33 :     connection->ref(); // unref in _cmdAddConnection
     414          33 :     send( CMD_NODE_ADD_CONNECTION ) << connection;
     415          33 : }
     416             : 
     417           0 : ConnectionPtr LocalNode::addListener( ConnectionDescriptionPtr desc )
     418             : {
     419           0 :     LBASSERT( isListening( ));
     420             : 
     421           0 :     ConnectionPtr connection = Connection::create( desc );
     422           0 :     if( connection && connection->listen( ))
     423             :     {
     424           0 :         addListener( connection );
     425           0 :         return connection;
     426             :     }
     427             : 
     428           0 :     return 0;
     429             : }
     430             : 
     431           0 : void LocalNode::addListener( ConnectionPtr connection )
     432             : {
     433           0 :     LBASSERT( isListening( ));
     434           0 :     LBASSERT( connection->isListening( ));
     435           0 :     if( !isListening() || !connection->isListening( ))
     436           0 :         return;
     437             : 
     438           0 :     connection->ref( this ); // unref in self handler
     439             : 
     440             :     // Update everybody's description list of me, add the listener to myself in
     441             :     // my handler
     442           0 :     Nodes nodes;
     443           0 :     getNodes( nodes );
     444             : 
     445           0 :     for( NodesIter i = nodes.begin(); i != nodes.end(); ++i )
     446           0 :         (*i)->send( CMD_NODE_ADD_LISTENER )
     447           0 :             << (uint64_t)(connection.get( ))
     448           0 :             << connection->getDescription()->toString();
     449             : }
     450             : 
     451           0 : void LocalNode::removeListeners( const Connections& connections )
     452             : {
     453           0 :     std::vector< lunchbox::Request< void > > requests;
     454           0 :     for( ConnectionsCIter i = connections.begin(); i != connections.end(); ++i )
     455             :     {
     456           0 :         ConnectionPtr connection = *i;
     457           0 :         requests.push_back( _removeListener( connection ));
     458           0 :     }
     459           0 : }
     460             : 
     461           0 : lunchbox::Request< void > LocalNode::_removeListener( ConnectionPtr conn )
     462             : {
     463           0 :     LBASSERT( isListening( ));
     464           0 :     LBASSERTINFO( !conn->isConnected(), conn );
     465             : 
     466           0 :     conn->ref( this );
     467           0 :     const lunchbox::Request< void > request = registerRequest< void >();
     468           0 :     Nodes nodes;
     469           0 :     getNodes( nodes );
     470             : 
     471           0 :     for( NodesIter i = nodes.begin(); i != nodes.end(); ++i )
     472           0 :         (*i)->send( CMD_NODE_REMOVE_LISTENER ) << request << conn.get()
     473           0 :                                           << conn->getDescription()->toString();
     474           0 :     return request;
     475             : }
     476             : 
     477         145 : void LocalNode::_addConnection( ConnectionPtr connection )
     478             : {
     479         145 :     if( _impl->receiverThread->isRunning() && !_impl->inReceiverThread( ))
     480             :     {
     481          33 :         addConnection( connection );
     482         178 :         return;
     483             :     }
     484             : 
     485         112 :     BufferPtr buffer = _impl->smallBuffers.alloc( COMMAND_ALLOCSIZE );
     486         112 :     connection->recvNB( buffer, COMMAND_MINSIZE );
     487         112 :     _impl->incoming.addConnection( connection );
     488             : }
     489             : 
     490         214 : void LocalNode::_removeConnection( ConnectionPtr connection )
     491             : {
     492         214 :     LBASSERT( connection );
     493             : 
     494         214 :     _impl->incoming.removeConnection( connection );
     495         214 :     connection->resetRecvData();
     496         214 :     if( !connection->isClosed( ))
     497         122 :         connection->close(); // cancel pending IO's
     498         214 : }
     499             : 
     500          45 : void LocalNode::_cleanup()
     501             : {
     502          45 :     LBVERB << "Clean up stopped node" << std::endl;
     503          45 :     LBASSERTINFO( isClosed(), *this );
     504             : 
     505          45 :     if( !_impl->connectionNodes.empty( ))
     506         132 :         LBDEBUG << _impl->connectionNodes.size()
     507         132 :                << " open connections during cleanup" << std::endl;
     508             : #ifndef NDEBUG
     509         267 :     for( ConnectionNodeHashCIter i = _impl->connectionNodes.begin();
     510         178 :          i != _impl->connectionNodes.end(); ++i )
     511             :     {
     512          44 :         NodePtr node = i->second;
     513          44 :         LBDEBUG << "    " << i->first << " : " << node << std::endl;
     514          44 :     }
     515             : #endif
     516             : 
     517          45 :     _impl->connectionNodes.clear();
     518             : 
     519          45 :     if( !_impl->nodes->empty( ))
     520           3 :         LBDEBUG << _impl->nodes->size() << " nodes connected during cleanup"
     521           3 :                << std::endl;
     522             : 
     523             : #ifndef NDEBUG
     524          46 :     for( NodeHashCIter i = _impl->nodes->begin(); i != _impl->nodes->end(); ++i)
     525             :     {
     526           1 :         NodePtr node = i->second;
     527           1 :         LBDEBUG << "    " << node << std::endl;
     528           1 :     }
     529             : #endif
     530             : 
     531          45 :     _impl->nodes->clear();
     532          45 : }
     533             : 
     534         110 : void LocalNode::_closeNode( NodePtr node )
     535             : {
     536         110 :     ConnectionPtr connection = node->getConnection();
     537         220 :     ConnectionPtr mcConnection = node->_getMulticast();
     538             : 
     539         110 :     node->_disconnect();
     540             : 
     541         110 :     if( connection )
     542             :     {
     543          66 :         LBASSERTINFO( _impl->connectionNodes.find( connection ) !=
     544             :                       _impl->connectionNodes.end(), connection );
     545             : 
     546          66 :         _removeConnection( connection );
     547          66 :         _impl->connectionNodes.erase( connection );
     548             :     }
     549             : 
     550         110 :     if( mcConnection )
     551             :     {
     552           0 :         _removeConnection( mcConnection );
     553           0 :         _impl->connectionNodes.erase( mcConnection );
     554             :     }
     555             : 
     556         110 :     _impl->objectStore->removeInstanceData( node->getNodeID( ));
     557             : 
     558         220 :     lunchbox::ScopedFastWrite mutex( _impl->nodes );
     559         110 :     _impl->nodes->erase( node->getNodeID( ));
     560         110 :     notifyDisconnect( node );
     561         220 :     LBDEBUG << node << " disconnected from " << *this << std::endl;
     562         110 : }
     563             : 
     564          46 : bool LocalNode::_connectSelf()
     565             : {
                           // Creates the pipe connection this node uses to talk to itself and
                           // registers it: the pipe's sibling end is handed to Node::_connect(),
                           // the pipe itself is stored in connectionNodes and added through
                           // _addConnection(), and this node is entered into the node table
                           // under its own ID.
                           // Returns false if the pipe could not be connected, true otherwise.
     566             :     // setup local connection to myself
     567          46 :     PipeConnectionPtr connection = new PipeConnection;
     568          46 :     if( !connection->connect( ))
     569             :     {
     570           0 :         LBERROR << "Could not create local connection to receiver thread."
     571           0 :                 << std::endl;
     572           0 :         return false;
     573             :     }
     574             : 
     575          46 :     Node::_connect( connection->getSibling( ));
     576          46 :     _setClosed(); // reset state after _connect set it to connected
     577             : 
     578             :     // add to connection set
     579          46 :     LBASSERT( connection->getDescription( ));
     580          46 :     LBASSERT( _impl->connectionNodes.find( connection ) ==
     581             :               _impl->connectionNodes.end( ));
     582             : 
     583          46 :     _impl->connectionNodes[ connection ] = this;
                           // NOTE(review): writes nodes.data directly instead of going through a
                           // scoped lock as the other accessors in this file do - presumably no
                           // other thread touches the table yet at this point; confirm.
     584          46 :     _impl->nodes.data[ getNodeID() ] = this;
     585          46 :     _addConnection( connection );
     586             : 
     587          92 :     LBVERB << "Added node " << getNodeID() << " using " << connection
     588         138 :            << std::endl;
     589          46 :     return true;
     590             : }
     591             : 
     592           7 : bool LocalNode::disconnect( NodePtr node )
     593             : {
                           // Disconnects the given node and blocks until the request is served.
                           // Returns false for a null node or when this node is not listening,
                           // and true when the node is already disconnected or was disconnected.
     594           7 :     if( !node || !isListening() )
     595           0 :         return false;
     596             : 
     597           7 :     if( !node->isConnected( ))
     598           0 :         return true;
     599             : 
                           // The caller blocks in request.wait() below, hence the assertion that
                           // this is not invoked from the command thread.
     600           7 :     LBASSERT( !inCommandThread( ));
     601           7 :     lunchbox::Request< void > request = registerRequest< void >( node.get( ));
     602           7 :     send( CMD_NODE_DISCONNECT ) << request;
     603             : 
     604           7 :     request.wait();
     605           7 :     _impl->objectStore->removeNode( node );
     606           7 :     return true;
     607             : }
     608             : 
     609       20001 : void LocalNode::ackRequest( NodePtr node, const uint32_t requestID )
     610             : {
                           // Acknowledges requestID towards node. Nothing happens for
                           // LB_UNDEFINED_UINT32. If node is this local node the request is served
                           // directly, otherwise CMD_NODE_ACK_REQUEST carrying the ID is sent.
     611       20001 :     if( requestID == LB_UNDEFINED_UINT32 ) // no need to ack operation
     612       20001 :         return;
     613             : 
     614       20001 :     if( node == this ) // OPT
     615           0 :         serveRequest( requestID );
     616             :     else
     617       20001 :         node->send( CMD_NODE_ACK_REQUEST ) << requestID;
     618             : }
     619             : 
     620           0 : void LocalNode::ping( NodePtr peer )
     621             : {
                           // Sends CMD_NODE_PING to peer; asserted not to run in the receiver
                           // thread.
     622           0 :     LBASSERT( !_impl->inReceiverThread( ));
     623           0 :     peer->send( CMD_NODE_PING );
     624           0 : }
     625             : 
     626           0 : bool LocalNode::pingIdleNodes()
     627             : {
                           // Sends CMD_NODE_PING to every node returned by getNodes( nodes, false )
                           // whose last receive time is older than half the global keepalive
                           // timeout. Returns true if at least one node was pinged.
     628           0 :     LBASSERT( !_impl->inReceiverThread( ) );
     629           0 :     const int64_t timeout = Global::getKeepaliveTimeout() / 2;
     630           0 :     Nodes nodes;
                           // addSelf == false: this local node is left out of the list
     631           0 :     getNodes( nodes, false );
     632             : 
     633           0 :     bool pinged = false;
     634           0 :     for( NodesCIter i = nodes.begin(); i != nodes.end(); ++i )
     635             :     {
     636           0 :         NodePtr node = *i;
     637           0 :         if( getTime64() - node->getLastReceiveTime() > timeout )
     638             :         {
     639           0 :             LBINFO << " Ping Node: " <<  node->getNodeID() << " last seen "
     640           0 :                    << node->getLastReceiveTime() << std::endl;
     641           0 :             node->send( CMD_NODE_PING );
     642           0 :             pinged = true;
     643             :         }
     644           0 :     }
     645           0 :     return pinged;
     646             : }
     647             : 
     648             : //----------------------------------------------------------------------
     649             : // Object functionality
     650             : //----------------------------------------------------------------------
     651           0 : void LocalNode::disableInstanceCache()
     652             : {
                           // Forwards to ObjectStore::disableInstanceCache().
     653           0 :     _impl->objectStore->disableInstanceCache();
     654           0 : }
     655             : 
     656           0 : void LocalNode::expireInstanceData( const int64_t age )
     657             : {
                           // Forwards age unchanged to ObjectStore::expireInstanceData().
     658           0 :     _impl->objectStore->expireInstanceData( age );
     659           0 : }
     660             : 
     661           0 : void LocalNode::enableSendOnRegister()
     662             : {
                           // Forwards to ObjectStore::enableSendOnRegister().
     663           0 :     _impl->objectStore->enableSendOnRegister();
     664           0 : }
     665             : 
     666           0 : void LocalNode::disableSendOnRegister()
     667             : {
                           // Forwards to ObjectStore::disableSendOnRegister().
     668           0 :     _impl->objectStore->disableSendOnRegister();
     669           0 : }
     670             : 
     671          20 : bool LocalNode::registerObject( Object* object )
     672             : {
                           // Registers object with the object store and returns the store's
                           // result.
     673          20 :     return _impl->objectStore->register_( object );
     674             : }
     675             : 
     676          20 : void LocalNode::deregisterObject( Object* object )
     677             : {
                           // Forwards to ObjectStore::deregister().
     678          20 :     _impl->objectStore->deregister( object );
     679          20 : }
     680             : 
     681          39 : f_bool_t LocalNode::mapObject( Object* object, const uint128_t& id,
     682             :                                NodePtr master, const uint128_t& version )
     683             : {
                           // Starts mapping object through ObjectStore::mapNB() and returns a
                           // future wrapping the matching ObjectStore::mapSync() call for the
                           // request id obtained here.
     684             :     const uint32_t request = _impl->objectStore->mapNB( object, id, version,
     685          39 :                                                         master );
                           // Bind mapSync to the store and the request id - presumably invoked
                           // when the returned future is evaluated; confirm in FuturebImpl.
     686             :     const FuturebImpl::Func& func = boost::bind( &ObjectStore::mapSync,
     687          39 :                                                  _impl->objectStore, request );
     688          39 :     return f_bool_t( new FuturebImpl( func ));
     689             : }
     690             : 
     691           0 : uint32_t LocalNode::mapObjectNB( Object* object, const uint128_t& id,
     692             :                                  const uint128_t& version )
     693             : {
                           // Overload without a master node: passes 0 as master to
                           // ObjectStore::mapNB() and returns the request id it yields.
     694           0 :     return _impl->objectStore->mapNB( object, id, version, 0 );
     695             : }
     696             : 
     697           2 : uint32_t LocalNode::mapObjectNB( Object* object, const uint128_t& id,
     698             :                                  const uint128_t& version, NodePtr master )
     699             : {
                           // Overload with an explicit master node: forwards all arguments to
                           // ObjectStore::mapNB() and returns the request id it yields.
     700           2 :     return _impl->objectStore->mapNB( object, id, version, master );
     701             : }
     702             : 
     703             : 
     704           2 : bool LocalNode::mapObjectSync( const uint32_t requestID )
     705             : {
                           // Forwards requestID to ObjectStore::mapSync() and returns its result.
     706           2 :     return _impl->objectStore->mapSync( requestID );
     707             : }
     708             : 
     709           4 : f_bool_t LocalNode::syncObject( Object* object, NodePtr master, const uint128_t& id,
     710             :                                const uint32_t instanceID )
     711             : {
                           // Forwards all arguments to ObjectStore::sync() and returns its future.
     712           4 :     return _impl->objectStore->sync( object, master, id, instanceID );
     713             : }
     714             : 
     715          41 : void LocalNode::unmapObject( Object* object )
     716             : {
                           // Forwards to ObjectStore::unmap().
     717          41 :     _impl->objectStore->unmap( object );
     718          41 : }
     719             : 
     720           0 : void LocalNode::swapObject( Object* oldObject, Object* newObject )
     721             : {
                           // Forwards both objects to ObjectStore::swap().
     722           0 :     _impl->objectStore->swap( oldObject, newObject );
     723           0 : }
     724             : 
     725           0 : void LocalNode::objectPush( const uint128_t& groupID,
     726             :                             const uint128_t& objectType,
     727             :                             const uint128_t& objectID, DataIStream& istream )
     728             : {
                           // Dispatches a pushed object to the handler registered for groupID,
                           // holding a read lock on the handler table during the call. Logs a
                           // warning if no handler is registered for the group.
     729           0 :     lunchbox::ScopedRead mutex( _impl->pushHandlers );
     730           0 :     HandlerHashCIter i = _impl->pushHandlers->find( groupID );
     731           0 :     if( i != _impl->pushHandlers->end( ))
     732           0 :         i->second( groupID, objectType, objectID, istream );
     733             :     else
     734           0 :         LBWARN << "No custom handler for push group " << groupID
     735           0 :                << " registered" << std::endl;
     736             : 
                           // Warn if the stream was read from but still has data left over.
     737           0 :     if( istream.wasUsed() && istream.hasData( ))
     738           0 :         LBWARN << "Incomplete Object::push for group " << groupID << " type "
     739           0 :                << objectType << " object " << objectID << std::endl;
     740           0 : }
     741             : 
     742           0 : void LocalNode::registerPushHandler( const uint128_t& groupID,
     743             :                                      const PushHandler& handler )
     744             : {
                           // Stores handler for groupID under a write lock. The assignment through
                           // operator[] replaces any handler already stored for that group.
     745           0 :     lunchbox::ScopedWrite mutex( _impl->pushHandlers );
     746           0 :     (*_impl->pushHandlers)[ groupID ] = handler;
     747           0 : }
     748             : 
     749           2 : bool LocalNode::registerCommandHandler( const uint128_t& command,
     750             :                                         const CommandHandler& func,
     751             :                                         CommandQueue* queue )
     752             : {
                           // Registers func together with queue as the handler for a custom
                           // command. Unlike registerPushHandler(), an existing entry is never
                           // replaced: a warning is logged and false is returned instead.
                           // Returns true once the new handler has been inserted.
     753           2 :     lunchbox::ScopedFastWrite mutex( _impl->commandHandlers );
     754           2 :     if( _impl->commandHandlers->find(command) != _impl->commandHandlers->end( ))
     755             :     {
     756           0 :         LBWARN << "Already got a registered handler for custom command "
     757           0 :                << command << std::endl;
     758           0 :         return false;
     759             :     }
     760             : 
     761             :     _impl->commandHandlers->insert( std::make_pair( command,
     762           2 :                                                 std::make_pair( func, queue )));
     763           2 :     return true;
     764             : }
     765             : 
     766           0 : LocalNode::SendToken LocalNode::acquireSendToken( NodePtr node )
     767             : {
                           // Requests a send token from node and blocks until it is granted or
                           // Global::getTimeout() expires. Returns a new co::SendToken for node on
                           // success and 0 on timeout. Asserted not to run in the command or
                           // receiver thread.
     768           0 :     LBASSERT( !inCommandThread( ));
     769           0 :     LBASSERT( !_impl->inReceiverThread( ));
     770             : 
     771           0 :     lunchbox::Request< void > request = registerRequest< void >();
     772           0 :     node->send( CMD_NODE_ACQUIRE_SEND_TOKEN ) << request;
     773             : 
     774             :     try
     775             :     {
     776           0 :         request.wait( Global::getTimeout( ));
     777             :     }
     778           0 :     catch( const lunchbox::FutureTimeout& )
     779             :     {
     780           0 :         LBERROR << "Timeout while acquiring send token " << request.getID()
     781           0 :                 << std::endl;
                               // Give up on the pending request before reporting failure.
     782           0 :         request.unregister();
     783           0 :         return 0;
     784             :     }
     785           0 :     return new co::SendToken( node );
     786             : }
     787             : 
     788           0 : void LocalNode::releaseSendToken( SendToken token )
     789             : {
                           // Releases token if it is non-null; a null token is ignored.
                           // Asserted not to run in the receiver thread.
     790           0 :     LBASSERT( !_impl->inReceiverThread( ));
     791           0 :     if( token )
     792           0 :         token->release();
     793           0 : }
     794             : 
     795             : //----------------------------------------------------------------------
     796             : // Connecting a node
     797             : //----------------------------------------------------------------------
     798             : namespace
     799             : {
                       // File-local result codes of the LocalNode::_connect() overloads that
                       // return uint32_t. CONNECT_OK is the first enumerator and therefore 0, so
                       // results have to be compared against CONNECT_OK explicitly; testing a
                       // result for truthiness yields the opposite of success.
     800             : enum ConnectResult
     801             : {
     802             :     CONNECT_OK, // node is reachable / handshake completed
     803             :     CONNECT_TRY_AGAIN, // handshake refused or node not connected afterwards
     804             :     CONNECT_BAD_STATE, // preconditions for the handshake not met
     805             :     CONNECT_TIMEOUT, // no handshake reply within the wait time
     806             :     CONNECT_UNREACHABLE // no connection description could be connected
     807             : };
     808             : }
     809             : 
     810          72 : NodePtr LocalNode::connect( const NodeID& nodeID )
     811             : {
                           // Returns a reachable node with the given ID, connecting it if needed,
                           // or 0 if that fails. Lookup order: already known nodes, then asking
                           // each known node via _connect( nodeID, peer ), then zeroconf discovery,
                           // then a final re-check of the known nodes.
     812          72 :     LBASSERT( nodeID != 0 );
     813          73 :     LBASSERT( isListening( ));
     814             : 
     815             :     // Make sure that only one connection request based on the node identifier
     816             :     // is pending at a given time. Otherwise a node with the same id might be
     817             :     // instantiated twice in _cmdGetNodeDataReply(). The alternative to this
     818             :     // mutex is to register connecting nodes with this local node, and handle
     819             :     // all cases correctly, which is far more complex. Node connections only
     820             :     // happen a lot during initialization, and are therefore not time-critical.
     821          73 :     lunchbox::ScopedWrite mutex( _impl->connectLock );
     822             : 
     823         146 :     Nodes nodes;
     824          73 :     getNodes( nodes );
     825             : 
     826          76 :     for( NodesCIter i = nodes.begin(); i != nodes.end(); ++i )
     827             :     {
     828          76 :         NodePtr peer = *i;
     829          76 :         if( peer->getNodeID() == nodeID && peer->isReachable( )) // early out
     830          73 :             return peer;
     831           3 :     }
     832             : 
                           // Not known yet: ask every known node for the data of nodeID.
     833           0 :     LBDEBUG << "Connecting node " << nodeID << std::endl;
     834           0 :     for( NodesCIter i = nodes.begin(); i != nodes.end(); ++i )
     835             :     {
     836           0 :         NodePtr peer = *i;
     837           0 :         NodePtr node = _connect( nodeID, peer );
     838           0 :         if( node )
     839           0 :             return node;
     840           0 :     }
     841             : 
     842           0 :     NodePtr node = _connectFromZeroconf( nodeID );
     843           0 :     if( node )
     844           0 :         return node;
     845             : 
     846             :     // check again if node connected by itself by now
     847           0 :     nodes.clear();
     848           0 :     getNodes( nodes );
     849           0 :     for( NodesCIter i = nodes.begin(); i != nodes.end(); ++i )
     850             :     {
     851           0 :         node = *i;
     852           0 :         if( node->getNodeID() == nodeID && node->isReachable( ))
     853           0 :             return node;
     854             :     }
     855             : 
     856           0 :     LBWARN << "Node " << nodeID << " connection failed" << std::endl;
     857          73 :     return 0;
     858             : }
     859             : 
     860           0 : NodePtr LocalNode::_connect( const NodeID& nodeID, NodePtr peer )
     861             : {
                           // Connects the node with the given ID, obtaining its data from peer
                           // when it is not in the local node table. Returns the node once it is
                           // reachable, or 0 if peer does not know it, the handshake times out or
                           // hits a bad state, or all attempts are used up.
     862           0 :     LBASSERT( nodeID != 0 );
     863             : 
     864           0 :     NodePtr node;
     865             :     {
     866           0 :         lunchbox::ScopedFastRead mutexNodes( _impl->nodes );
     867           0 :         NodeHash::const_iterator i = _impl->nodes->find( nodeID );
     868           0 :         if( i != _impl->nodes->end( ))
     869           0 :             node = i->second;
     870             :     }
     871             : 
     872           0 :     LBASSERT( getNodeID() != nodeID );
     873           0 :     if( !node )
     874             :     {
                               // Ask peer for the node data; the request yields a Node pointer
                               // as void*, or null when peer does not know the node.
     875           0 :         lunchbox::Request< void* > request = registerRequest< void* >();
     876           0 :         peer->send( CMD_NODE_GET_NODE_DATA ) << nodeID << request;
     877           0 :         node = reinterpret_cast< Node* >( request.wait( ));
     878           0 :         if( !node )
     879             :         {
     880           0 :             LBINFO << "Node " << nodeID << " not found on " << peer->getNodeID()
     881           0 :                    << std::endl;
     882           0 :             return 0;
     883             :         }
     884           0 :         node->unref( this ); // ref'd before serveRequest()
     885             :     }
     886             : 
     887           0 :     if( node->isReachable( ))
     888           0 :         return node;
     889             : 
                           // Pre-decrement in the loop condition: at most 9 connect attempts.
     890           0 :     size_t tries = 10;
     891           0 :     while( --tries )
     892             :     {
     893           0 :         switch( _connect( node ))
     894             :         {
     895             :           case CONNECT_OK:
     896           0 :               return node;
     897             :           case CONNECT_TRY_AGAIN:
     898             :           {
     899           0 :               lunchbox::RNG rng;
     900             :               // collision avoidance
                                 // random uint8_t back-off - presumably milliseconds; confirm
                                 // against lunchbox::sleep
     901           0 :               lunchbox::sleep( rng.get< uint8_t >( ));
     902           0 :               break;
     903             :           }
     904             :           case CONNECT_BAD_STATE:
     905           0 :               LBWARN << "Internal connect error" << std::endl;
     906             :               // no break;
     907             :           case CONNECT_TIMEOUT:
     908           0 :               return 0;
     909             : 
     910             :           case CONNECT_UNREACHABLE:
     911           0 :               break; // maybe peer talks to us
     912             :         }
     913             : 
     914           0 :         lunchbox::ScopedFastRead mutexNodes( _impl->nodes );
     915             :         // connect failed - check for simultaneous connect from peer
     916           0 :         NodeHash::const_iterator i = _impl->nodes->find( nodeID );
     917           0 :         if( i != _impl->nodes->end( ))
     918           0 :             node = i->second;
     919           0 :     }
     920             : 
     921           0 :     return node->isReachable() ? node : 0;
     922             : }
     923             : 
     924           0 : NodePtr LocalNode::_connectFromZeroconf( const NodeID& nodeID )
     925             : {
                           // Looks for nodeID among the instances discovered through the zeroconf
                           // service, builds a node from the announced "co_type", "co_numPorts"
                           // and "co_port<N>" values and connects it.
                           // Returns the connected node, or 0 if no matching instance is found,
                           // its type is not announced, or connecting fails.
     926           0 :     lunchbox::ScopedWrite mutex( _impl->service );
     927             : 
     928             :     const Strings& instances =
     929           0 :         _impl->service->discover( lunchbox::Servus::IF_ALL, 500 );
     930           0 :     for( StringsCIter i = instances.begin(); i != instances.end(); ++i )
     931             :     {
     932           0 :         const std::string& instance = *i;
                               // The instance name is interpreted as the announcing node's ID.
     933           0 :         const NodeID candidate( instance );
     934           0 :         if( candidate != nodeID )
     935           0 :             continue;
     936             : 
     937           0 :         const std::string& typeStr = _impl->service->get( instance, "co_type" );
     938           0 :         if( typeStr.empty( ))
     939           0 :             return 0;
     940             : 
     941           0 :         std::istringstream in( typeStr );
     942           0 :         uint32_t type = 0;
     943           0 :         in >> type;
     944             : 
     945           0 :         NodePtr node = createNode( type );
     946           0 :         if( !node )
     947             :         {
     948           0 :             LBINFO << "Can't create node of type " << type << std::endl;
     949           0 :             continue;
     950             :         }
     951             : 
     952             :         const std::string& numStr = _impl->service->get( instance,
     953           0 :                                                          "co_numPorts" );
     954           0 :         uint32_t num = 0;
     955             : 
                               // Reuse the stream for the port count: clear its state first.
     956           0 :         in.clear();
     957           0 :         in.str( numStr );
     958           0 :         in >> num;
     959           0 :         LBASSERT( num > 0 );
     960           0 :         for( size_t j = 0; j < num; ++j )
     961             :         {
     962           0 :             ConnectionDescriptionPtr desc = new ConnectionDescription;
     963           0 :             std::ostringstream out;
     964           0 :             out << "co_port" << j;
     965             : 
     966           0 :             std::string descStr = _impl->service->get( instance, out.str( ));
     967           0 :             LBASSERT( !descStr.empty( ));
     968           0 :             LBCHECK( desc->fromString( descStr ));
                                   // fromString() is expected to have consumed the whole string
     969           0 :             LBASSERT( descStr.empty( ));
     970           0 :             node->addConnectionDescription( desc );
     971           0 :         }
                               // NOTE(review): the service lock is released here, but a failed
                               // connect continues the loop without it - confirm that a second
                               // matching instance (and a second leave()) cannot occur.
     972           0 :         mutex.leave();
                               // _connect() returns a ConnectResult where CONNECT_OK is 0, so the
                               // result has to be compared explicitly: testing it for truthiness
                               // returned the node on failure and dropped it on success.
     973           0 :         if( _connect( node ) == CONNECT_OK )
     974           0 :             return node;
     975           0 :     }
     976           0 :     return 0;
     977             : }
     978             : 
     979          33 : bool LocalNode::connect( NodePtr node )
     980             : {
                           // Connects node while holding the connect lock. Returns true only if
                           // _connect( node ) reports CONNECT_OK.
     981          33 :     lunchbox::ScopedWrite mutex( _impl->connectLock );
     982          33 :     return ( _connect( node ) == CONNECT_OK );
     983             : }
     984             : 
     985          33 : uint32_t LocalNode::_connect( NodePtr node )
     986             : {
                           // Connects node using the first of its connection descriptions that
                           // yields a connected connection, then performs the handshake through
                           // _connect( node, connection ). Returns a ConnectResult value:
                           // CONNECT_OK if the node is already reachable, the handshake result
                           // otherwise, or CONNECT_UNREACHABLE if no description could be
                           // connected. Note that CONNECT_OK is 0.
     987          33 :     LBASSERTINFO( isListening(), *this );
     988          33 :     if( node->isReachable( ))
     989           0 :         return CONNECT_OK;
     990             : 
     991          33 :     LBASSERT( node->isClosed( ));
     992          33 :     LBDEBUG << "Connecting " << node << std::endl;
     993             : 
     994             :     // try connecting using the given descriptions
     995          33 :     const ConnectionDescriptions& cds = node->getConnectionDescriptions();
     996          99 :     for( ConnectionDescriptionsCIter i = cds.begin();
     997          66 :         i != cds.end(); ++i )
     998             :     {
     999          33 :         ConnectionDescriptionPtr description = *i;
    1000          33 :         if( description->type >= CONNECTIONTYPE_MULTICAST )
    1001           0 :             continue; // Don't use multicast for primary connections
    1002             : 
    1003          33 :         ConnectionPtr connection = Connection::create( description );
    1004          33 :         if( !connection || !connection->connect( ))
    1005           0 :             continue;
    1006             : 
                               // The first connection that connects decides the outcome; the
                               // remaining descriptions are not tried.
    1007          33 :         return _connect( node, connection );
    1008           0 :     }
    1009             : 
    1010           0 :     LBDEBUG << "Node " << node
    1011           0 :            << " unreachable, all connections failed to connect" <<std::endl;
    1012           0 :     return CONNECT_UNREACHABLE;
    1013             : }
    1014             : 
    1015           0 : bool LocalNode::connect( NodePtr node, ConnectionPtr connection )
    1016             : {
                           // Connects node over the given connection. Returns true only if the
                           // handshake reports CONNECT_OK. Unlike connect( NodePtr ), no connect
                           // lock is taken here.
    1017           0 :     return ( _connect( node, connection ) == CONNECT_OK );
    1018             : }
    1019             : 
    1020          33 : uint32_t LocalNode::_connect( NodePtr node, ConnectionPtr connection )
    1021             : {
                           // Performs the connect handshake with node over an already connected
                           // connection and returns a ConnectResult value:
                           //   CONNECT_BAD_STATE  node is null or not closed, this node is not
                           //                      listening, or connection is not connected
                           //   CONNECT_TIMEOUT    no reply to the connect command within 10 s
                           //   CONNECT_TRY_AGAIN  the reply was negative or the node is not
                           //                      connected afterwards
                           //   CONNECT_OK         the node is connected
    1022          33 :     LBASSERT( connection );
                           // NOTE(review): this assertion dereferences node before the !node
                           // check below.
    1023          33 :     LBASSERT( node->getNodeID() != getNodeID( ));
    1024             : 
    1025          66 :     if( !node || !isListening() || !connection->isConnected() ||
    1026          33 :         !node->isClosed( ))
    1027             :     {
    1028           0 :         return CONNECT_BAD_STATE;
    1029             :     }
    1030             : 
                           // NOTE(review): no matching _removeConnection() is visible on the
                           // timeout and try-again paths below - confirm it is handled elsewhere.
    1031          33 :     _addConnection( connection );
    1032             : 
    1033             :     // send connect command to peer
    1034          33 :     lunchbox::Request< bool > request = registerRequest< bool >( node.get( ));
    1035             : #ifdef COMMON_BIGENDIAN
    1036             :     uint32_t cmd = CMD_NODE_CONNECT_BE;
    1037             :     lunchbox::byteswap( cmd );
    1038             : #else
    1039          33 :     const uint32_t cmd = CMD_NODE_CONNECT;
    1040             : #endif
                           // The command carries this node's ID, the request, its type and its
                           // serialized form.
    1041             :     OCommand( Connections( 1, connection ), cmd )
    1042          33 :         << getNodeID() << request << getType() << serialize();
    1043             : 
    1044          33 :     bool connected = false;
    1045             :     try
    1046             :     {
    1047          33 :         connected = request.wait( 10000 /*ms*/ );
    1048             :     }
    1049           0 :     catch( const lunchbox::FutureTimeout& )
    1050             :     {
    1051           0 :         LBWARN << "Node connection handshake timeout - " << node
    1052           0 :                << " not a Collage node?" << std::endl;
    1053           0 :         request.unregister();
    1054           0 :         return CONNECT_TIMEOUT;
    1055             :     }
    1056             : 
    1057             :     // In simultaneous connect case, depending on the connection type
    1058             :     // (e.g. RDMA), a check on the connection state of the node is required
    1059          33 :     if( !connected || !node->isConnected( ))
    1060           0 :         return CONNECT_TRY_AGAIN;
    1061             : 
    1062          33 :     LBASSERT( node->getNodeID() != 0 );
    1063          33 :     LBASSERTINFO( node->getNodeID() != getNodeID(), getNodeID() );
    1064          33 :     LBDEBUG << node << " connected to " << *(Node*)this << std::endl;
    1065          33 :     return CONNECT_OK;
    1066             : }
    1067             : 
    1068          39 : NodePtr LocalNode::connectObjectMaster( const uint128_t& id )
    1069             : {
                           // Finds the master node of the object with the given id through the
                           // object store and connects to it. Returns the master node, or 0 if id
                           // is not a UUID, no master node ID is found, or the connected node is
                           // missing or closed.
    1070          39 :     LBASSERTINFO( id.isUUID(), id );
                           // Checked again at run time, with a warning instead of an assertion.
    1071          39 :     if( !id.isUUID( ))
    1072             :     {
    1073           0 :         LBWARN << "Invalid object id " << id << std::endl;
    1074           0 :         return 0;
    1075             :     }
    1076             : 
    1077          39 :     const NodeID masterNodeID = _impl->objectStore->findMasterNodeID( id );
    1078          39 :     if( masterNodeID == 0 )
    1079             :     {
    1080           0 :         LBWARN << "Can't find master node for object " << id << std::endl;
    1081           0 :         return 0;
    1082             :     }
    1083             : 
    1084          39 :     NodePtr master = connect( masterNodeID );
    1085          39 :     if( master && !master->isClosed( ))
    1086          39 :         return master;
    1087             : 
    1088           0 :     LBWARN << "Can't connect master node with id " << masterNodeID
    1089           0 :            << " for object " << id << std::endl;
    1090           0 :     return 0;
    1091             : }
    1092             : 
    1093          33 : NodePtr LocalNode::createNode( const uint32_t type )
    1094             : {
                           // Creates a new Node of the given type. Only NODETYPE_NODE is expected
                           // here, which is asserted.
    1095          33 :     LBASSERTINFO( type == NODETYPE_NODE, type );
    1096          33 :     return new Node( type );
    1097             : }
    1098             : 
    1099           0 : NodePtr LocalNode::getNode( const NodeID& id ) const
    1100             : {
                           // Looks up id in the node table under a read lock. Returns the node
                           // only if it is present and reachable, otherwise 0.
    1101           0 :     lunchbox::ScopedFastRead mutex( _impl->nodes );
    1102           0 :     NodeHash::const_iterator i = _impl->nodes->find( id );
    1103           0 :     if( i == _impl->nodes->end() || !i->second->isReachable( ))
    1104           0 :         return 0;
    1105           0 :     return i->second;
    1106             : }
    1107             : 
    1108         112 : void LocalNode::getNodes( Nodes& nodes, const bool addSelf ) const
    1109             : {
                           // Appends every reachable node of the node table to nodes, under a read
                           // lock. This local node itself is appended only if addSelf is true.
                           // Existing entries of nodes are kept.
    1110         112 :     lunchbox::ScopedFastRead mutex( _impl->nodes );
    1111         328 :     for( NodeHashCIter i = _impl->nodes->begin(); i != _impl->nodes->end(); ++i)
    1112             :     {
    1113         216 :         NodePtr node = i->second;
    1114         216 :         if( node->isReachable() && ( addSelf || node != this ))
    1115         216 :             nodes.push_back( i->second );
    1116         328 :     }
    1117         112 : }
    1118             : 
    1119         138 : CommandQueue* LocalNode::getCommandThreadQueue()
    1120             : {
                           // Returns the worker queue of the command thread.
    1121         138 :     return _impl->commandThread->getWorkerQueue();
    1122             : }
    1123             : 
    1124           7 : bool LocalNode::inCommandThread() const
    1125             : {
                           // Returns whether the calling thread is the command thread.
    1126           7 :     return _impl->commandThread->isCurrent();
    1127             : }
    1128             : 
    1129       72410 : int64_t LocalNode::getTime64() const
    1130             : {
                           // Returns the current time of this node's clock as a 64-bit value
                           // (unit as defined by the clock's getTime64()).
    1131       72410 :     return _impl->clock.getTime64();
    1132             : }
    1133             : 
    1134           0 : ssize_t LocalNode::getCounter( const Counter counter ) const
    1135             : {
                           // Returns the value stored for counter; counter is used as an index
                           // without any range check here.
    1136           0 :     return _impl->counters[ counter ];
    1137             : }
    1138             : 
    1139         193 : void LocalNode::flushCommands()
    1140             : {
                           // Interrupts the incoming connection set. The receiver thread handles
                           // the resulting EVENT_INTERRUPT by calling _redispatchCommands().
    1141         193 :     _impl->incoming.interrupt();
    1142         193 : }
    1143             : 
    1144             : //----------------------------------------------------------------------
    1145             : // receiver thread functions
    1146             : //----------------------------------------------------------------------
    1147          46 : void LocalNode::_runReceiverThread()
    1148             : {
    1149          46 :     LB_TS_THREAD( _rcvThread );
    1150          46 :     _initService();
    1151             : 
    1152          46 :     int nErrors = 0;
    1153       72611 :     while( isListening( ))
    1154             :     {
    1155       72520 :         const ConnectionSet::Event result = _impl->incoming.select();
    1156       72519 :         switch( result )
    1157             :         {
    1158             :             case ConnectionSet::EVENT_CONNECT:
    1159          33 :                 _handleConnect();
    1160          33 :                 break;
    1161             : 
    1162             :             case ConnectionSet::EVENT_DATA:
    1163       72157 :                 _handleData();
    1164       72157 :                 break;
    1165             : 
    1166             :             case ConnectionSet::EVENT_DISCONNECT:
    1167             :             case ConnectionSet::EVENT_INVALID_HANDLE:
    1168          33 :                 _handleDisconnect();
    1169          33 :                 break;
    1170             : 
    1171             :             case ConnectionSet::EVENT_TIMEOUT:
    1172           0 :                 LBINFO << "select timeout" << std::endl;
    1173           0 :                 break;
    1174             : 
    1175             :             case ConnectionSet::EVENT_ERROR:
    1176           0 :                 ++nErrors;
    1177           0 :                 LBWARN << "Connection error during select" << std::endl;
    1178           0 :                 if( nErrors > 100 )
    1179             :                 {
    1180           0 :                     LBWARN << "Too many errors in a row, capping connection"
    1181           0 :                            << std::endl;
    1182           0 :                     _handleDisconnect();
    1183             :                 }
    1184           0 :                 break;
    1185             : 
    1186             :             case ConnectionSet::EVENT_SELECT_ERROR:
    1187           0 :                 LBWARN << "Error during select" << std::endl;
    1188           0 :                 ++nErrors;
    1189           0 :                 if( nErrors > 10 )
    1190             :                 {
    1191           0 :                     LBWARN << "Too many errors in a row" << std::endl;
    1192           0 :                     LBUNIMPLEMENTED;
    1193             :                 }
    1194           0 :                 break;
    1195             : 
    1196             :             case ConnectionSet::EVENT_INTERRUPT:
    1197         296 :                 _redispatchCommands();
    1198         296 :                 break;
    1199             : 
    1200             :             default:
    1201           0 :                 LBUNIMPLEMENTED;
    1202             :         }
    1203       72519 :         if( result != ConnectionSet::EVENT_ERROR &&
    1204             :             result != ConnectionSet::EVENT_SELECT_ERROR )
    1205             :         {
    1206       72519 :             nErrors = 0;
    1207             :         }
    1208             :     }
    1209             : 
    1210          45 :     if( !_impl->pendingCommands.empty( ))
    1211           0 :         LBWARN << _impl->pendingCommands.size()
    1212           0 :                << " commands pending while leaving command thread" << std::endl;
    1213             : 
    1214          45 :     _impl->pendingCommands.clear();
    1215          45 :     LBCHECK( _impl->commandThread->join( ));
    1216             : 
    1217          45 :     ConnectionPtr connection = getConnection();
    1218          90 :     PipeConnectionPtr pipe = LBSAFECAST( PipeConnection*, connection.get( ));
    1219          45 :     connection = pipe->getSibling();
    1220          45 :     _removeConnection( connection );
    1221          45 :     _impl->connectionNodes.erase( connection );
    1222          45 :     _disconnect();
    1223             : 
    1224          45 :     const Connections& connections = _impl->incoming.getConnections();
    1225         160 :     while( !connections.empty( ))
    1226             :     {
    1227          70 :         connection = connections.back();
    1228          70 :         NodePtr node = _impl->connectionNodes[ connection ];
    1229             : 
    1230          70 :         if( node )
    1231          70 :             _closeNode( node );
    1232          70 :         _removeConnection( connection );
    1233          70 :     }
    1234             : 
    1235          45 :     _impl->objectStore->clear();
    1236          45 :     _impl->pendingCommands.clear();
    1237          45 :     _impl->smallBuffers.flush();
    1238          45 :     _impl->bigBuffers.flush();
    1239             : 
    1240         225 :     LBDEBUG << "Leaving receiver thread of " << lunchbox::className( this )
    1241         225 :            << std::endl;
    1242          45 : }
    1243             : // Accepts the connection pending on the listener that raised the connect event.
    1244          33 : void LocalNode::_handleConnect()
    1245             : {
    1246          33 :     ConnectionPtr connection = _impl->incoming.getConnection();
    1247          66 :     ConnectionPtr newConn = connection->acceptSync();
    1248          33 :     connection->acceptNB(); // presumably re-arms the listener for the next accept — confirm
    1249             :     // a null newConn means the accept failed; this is only logged
    1250          33 :     if( newConn )
    1251          33 :         _addConnection( newConn );
    1252             :     else
    1253          33 :         LBINFO << "Received connect event, but accept() failed" << std::endl;
    1254          33 : }
    1255             : // Tears down the currently signalled connection: drains it, detaches its node and removes it.
    1256          33 : void LocalNode::_handleDisconnect()
    1257             : {
    1258          33 :     while( _handleData( )) ; // read remaining data off connection
    1259             : 
    1260          33 :     ConnectionPtr connection = _impl->incoming.getConnection();
    1261          33 :     ConnectionNodeHash::iterator i = _impl->connectionNodes.find( connection );
    1262             :     // connections without a mapped node are only removed
    1263          33 :     if( i != _impl->connectionNodes.end( ))
    1264             :     {
    1265          33 :         NodePtr node = i->second;
    1266             : 
    1267          33 :         node->ref(); // extend lifetime to give cmd handler a chance
    1268             : 
    1269             :         // local command dispatching: send CMD_NODE_REMOVE_NODE to ourselves
    1270             :         OCommand( this, this, CMD_NODE_REMOVE_NODE )
    1271          33 :                 << node.get() << uint32_t( LB_UNDEFINED_UINT32 );
    1272             :         // close the node if this was its own connection, else drop the multicast link
    1273          33 :         if( node->getConnection() == connection )
    1274          33 :             _closeNode( node );
    1275           0 :         else if( connection->isMulticast( ))
    1276           0 :             node->_removeMulticast( connection );
    1277             :     }
    1278             : 
    1279          33 :     _removeConnection( connection );
    1280          33 : }
    1281             : // Reads one command from the signalled connection and dispatches it; true if one was dispatched.
    1282       72190 : bool LocalNode::_handleData()
    1283             : {
    1284       72190 :     _impl->smallBuffers.compact();
    1285       72190 :     _impl->bigBuffers.compact();
    1286             : 
    1287       72190 :     ConnectionPtr connection = _impl->incoming.getConnection();
    1288       72190 :     LBASSERT( connection );
    1289             : 
    1290      144380 :     BufferPtr buffer = _readHead( connection );
    1291       72190 :     if( !buffer ) // fluke signal
    1292          66 :         return false;
    1293             :     // buffer holds the command head; _readTail fetches the rest if the command is larger
    1294      144248 :     ICommand command = _setupCommand( connection, buffer );
    1295       72124 :     const bool gotCommand = _readTail( command, buffer, connection );
    1296       72124 :     LBASSERT( gotCommand );
    1297             : 
    1298             :     // start next receive (posted before dispatching, and also when the tail read failed)
    1299      144248 :     BufferPtr nextBuffer = _impl->smallBuffers.alloc( COMMAND_ALLOCSIZE );
    1300       72124 :     connection->recvNB( nextBuffer, COMMAND_MINSIZE );
    1301             : 
    1302       72124 :     if( gotCommand )
    1303             :     {
    1304       72124 :         _dispatchCommand( command );
    1305       72124 :         return true;
    1306             :     }
    1307             :     // only reached when _readTail reported a failed read
    1308           0 :     LBERROR << "Incomplete command read: " << command << std::endl;
    1309       72190 :     return false;
    1310             : }
    1311             : // Finishes the pending receive on connection; returns the buffer with the command head, or 0.
    1312       72190 : BufferPtr LocalNode::_readHead( ConnectionPtr connection )
    1313             : {
    1314       72190 :     BufferPtr buffer;
    1315       72190 :     const bool gotSize = connection->recvSync( buffer, false ); // NOTE(review): meaning of 'false' not visible here — confirm
    1316             :     // recvSync fills in buffer; it stays null if there was nothing to hand back
    1317       72190 :     if( !buffer ) // fluke signal
    1318             :     {
    1319           0 :         LBWARN << "Erronous network event on " << connection->getDescription()
    1320           0 :                << std::endl;
    1321           0 :         _impl->incoming.setDirty();
    1322           0 :         return 0;
    1323             :     }
    1324             :     // a buffer came back; gotSize tells whether the read itself succeeded
    1325       72190 :     if( gotSize )
    1326       72124 :         return buffer;
    1327             : 
    1328             :     // Some systems signal data on dead connections.
    1329          66 :     buffer->setSize( 0 ); // reset the buffer and re-post the receive with it
    1330          66 :     connection->recvNB( buffer, COMMAND_MINSIZE );
    1331          66 :     return 0;
    1332             : }
    1333             : // Builds the ICommand for buffer, resolving the sending node and whether to byte-swap.
    1334       72124 : ICommand LocalNode::_setupCommand( ConnectionPtr connection,
    1335             :                                    ConstBufferPtr buffer )
    1336             : {
    1337       72124 :     NodePtr node;
    1338       72124 :     ConnectionNodeHashCIter i = _impl->connectionNodes.find( connection );
    1339       72124 :     if( i != _impl->connectionNodes.end( ))
    1340       72058 :         node = i->second;
    1341       72124 :     LBVERB << "Handle data from " << node << std::endl;
    1342             :     // swap when the sender's endianness differs from ours; never for an unknown sender
    1343             : #ifdef COMMON_BIGENDIAN
    1344             :     const bool swapping = node ? !node->isBigEndian() : false;
    1345             : #else
    1346       72124 :     const bool swapping = node ? node->isBigEndian() : false;
    1347             : #endif
    1348      144248 :     ICommand command( this, node, buffer, swapping );
    1349             : 
    1350       72124 :     if( node )
    1351             :     {
    1352       72058 :         node->_setLastReceive( getTime64( ));
    1353       72058 :         return command;
    1354             :     }
    1355             :     // no node is mapped to this connection yet: only the commands listed below are accepted
    1356          66 :     uint32_t cmd = command.getCommand();
    1357             : #ifdef COMMON_BIGENDIAN
    1358             :     lunchbox::byteswap( cmd ); // pre-node commands are sent little endian
    1359             : #endif
    1360          66 :     switch( cmd )
    1361             :     {
    1362             :     case CMD_NODE_CONNECT: // non-_BE variants: payload is swapped on big-endian hosts only
    1363             :     case CMD_NODE_CONNECT_REPLY:
    1364             :     case CMD_NODE_ID:
    1365             : #ifdef COMMON_BIGENDIAN
    1366             :         command = ICommand( this, node, buffer, true );
    1367             : #endif
    1368          66 :         break;
    1369             :     // _BE variants: payload is swapped on little-endian hosts only
    1370             :     case CMD_NODE_CONNECT_BE:
    1371             :     case CMD_NODE_CONNECT_REPLY_BE:
    1372             :     case CMD_NODE_ID_BE:
    1373             : #ifndef COMMON_BIGENDIAN
    1374           0 :         command = ICommand( this, node, buffer, true );
    1375             : #endif
    1376           0 :         break;
    1377             :     // any other command from an unmapped connection yields a default-constructed ICommand
    1378             :     default:
    1379           0 :         LBUNIMPLEMENTED;
    1380           0 :         return ICommand();
    1381             :     }
    1382             : 
    1383          66 :     command.setCommand( cmd ); // reset correctly swapped version
    1384       72190 :     return command;
    1385             : }
    1386             : // Receives the part of command that buffer does not hold yet; true if nothing is missing afterwards.
    1387       72124 : bool LocalNode::_readTail( ICommand& command, BufferPtr buffer,
    1388             :                            ConnectionPtr connection )
    1389             : {
    1390       72124 :     const uint64_t needed = command.getSize();
    1391       72124 :     if( needed <= buffer->getSize( ))
    1392       62113 :         return true;
    1393             :     // the command is larger than what was received so far
    1394       10011 :     if( needed > buffer->getMaxSize( ))
    1395             :     {
    1396           0 :         LBASSERT( needed > COMMAND_ALLOCSIZE );
    1397           0 :         LBASSERTINFO( needed < LB_BIT48,
    1398             :                       "Out-of-sync network stream: " << command << "?" );
    1399             :         // not enough space for remaining data, alloc and copy to new buffer
    1400           0 :         BufferPtr newBuffer = _impl->bigBuffers.alloc( needed );
    1401           0 :         newBuffer->replace( *buffer );
    1402           0 :         buffer = newBuffer; // by-value parameter: the caller gets the new buffer through command only
    1403             :         // rebind the command to the new buffer, keeping its node and swap flag
    1404           0 :         command = ICommand( this, command.getRemoteNode(), buffer,
    1405           0 :                             command.isSwapping( ));
    1406             :     }
    1407             : 
    1408             :     // read remaining data; the result of the synchronous read is the return value
    1409       10011 :     connection->recvNB( buffer, command.getSize() - buffer->getSize( ));
    1410       10011 :     return connection->recvSync( buffer );
    1411             : }
    1412             : // Allocates a command buffer: from the big cache above COMMAND_ALLOCSIZE, else a small one.
    1413          34 : BufferPtr LocalNode::allocBuffer( const uint64_t size )
    1414             : {
    1415          34 :     LBASSERT( _impl->receiverThread->isStopped() || _impl->inReceiverThread( )); // receiver thread only, unless it is stopped
    1416             :     BufferPtr buffer = size > COMMAND_ALLOCSIZE ?
    1417             :         _impl->bigBuffers.alloc( size ) :
    1418          34 :         _impl->smallBuffers.alloc( COMMAND_ALLOCSIZE ); // small buffers always request the fixed size
    1419          34 :     return buffer;
    1420             : }
    1421             : // Dispatches command and retries the pending list; an undispatched command is appended to it.
    1422       72169 : void LocalNode::_dispatchCommand( ICommand& command )
    1423             : {
    1424       72169 :     LBASSERTINFO( command.isValid(), command );
    1425             :     // pending commands are retried in both cases; a failed command is queued after the retry
    1426       72169 :     if( dispatchCommand( command ))
    1427       72169 :         _redispatchCommands();
    1428             :     else
    1429             :     {
    1430           0 :         _redispatchCommands();
    1431           0 :         _impl->pendingCommands.push_back( command );
    1432             :     }
    1433       72169 : }
    1434             : // Routes command by type; returns the object store's result for object commands, true otherwise.
    1435       72205 : bool LocalNode::dispatchCommand( ICommand& command )
    1436             : {
    1437       72205 :     LBVERB << "dispatch " << command << " by " << getNodeID() << std::endl;
    1438       72205 :     LBASSERTINFO( command.isValid(), command );
    1439             : 
    1440       72205 :     const uint32_t type = command.getType();
    1441       72205 :     switch( type )
    1442             :     {
    1443             :         case COMMANDTYPE_NODE:
    1444       71697 :             LBCHECK( Dispatcher::dispatchCommand( command ));
    1445       71697 :             return true;
    1446             :         // the object store's return value is passed through to the caller
    1447             :         case COMMANDTYPE_OBJECT:
    1448         508 :             return _impl->objectStore->dispatchObjectCommand( command );
    1449             : 
    1450             :         default:
    1451           0 :             LBABORT( "Unknown command type " << type << " for " << command );
    1452           0 :             return true;
    1453             :     }
    1454             : }
    1455             : // Retries pending commands until the list is empty or a full pass dispatches none.
    1456       72465 : void LocalNode::_redispatchCommands()
    1457             : {
    1458       72465 :     bool changes = true;
    1459      144930 :     while( changes && !_impl->pendingCommands.empty( ))
    1460             :     {
    1461           0 :         changes = false;
    1462             :         // look for the first pending command that can be dispatched now
    1463           0 :         for( CommandList::iterator i = _impl->pendingCommands.begin();
    1464           0 :              i != _impl->pendingCommands.end(); ++i )
    1465             :         {
    1466           0 :             ICommand& command = *i;
    1467           0 :             LBASSERT( command.isValid( ));
    1468             : 
    1469           0 :             if( dispatchCommand( command ))
    1470             :             {
    1471           0 :                 _impl->pendingCommands.erase( i );
    1472           0 :                 changes = true;
    1473           0 :                 break; // i is not used after the erase; the outer loop rescans from the start
    1474             :             }
    1475             :         }
    1476             :     }
    1477             :     // builds without NDEBUG only: report leftovers and assert on the pending list size
    1478             : #ifndef NDEBUG
    1479       72465 :     if( !_impl->pendingCommands.empty( ))
    1480           0 :         LBVERB << _impl->pendingCommands.size() << " undispatched commands"
    1481           0 :                << std::endl;
    1482       72465 :     LBASSERT( _impl->pendingCommands.size() < 200 );
    1483             : #endif
    1484       72465 : }
    1485             : // Refreshes the keys published through _impl->service and re-announces this node.
    1486          46 : void LocalNode::_initService()
    1487             : {
    1488          46 :     LB_TS_SCOPED( _rcvThread );
    1489          46 :     _impl->service->withdraw(); // go silent during k/v update
    1490             : 
    1491          90 :     const ConnectionDescriptions& descs = getConnectionDescriptions();
    1492          46 :     if( descs.empty( ))
    1493          48 :         return; // nothing to publish; the service stays withdrawn
    1494             :     // co_type: the value of getType(), formatted through the stream
    1495          88 :     std::ostringstream out;
    1496          44 :     out << getType();
    1497          44 :     _impl->service->set( "co_type", out.str( ));
    1498             :     // co_numPorts: the number of connection descriptions
    1499          44 :     out.str("");
    1500          44 :     out << descs.size();
    1501          44 :     _impl->service->set( "co_numPorts", out.str( ));
    1502             :     // co_port<index>: one key per connection description, holding its string form
    1503          88 :     for( ConnectionDescriptionsCIter i = descs.begin(); i != descs.end(); ++i )
    1504             :     {
    1505          44 :         ConnectionDescriptionPtr desc = *i;
    1506          44 :         out.str("");
    1507          44 :         out << "co_port" << i - descs.begin();
    1508          44 :         _impl->service->set( out.str(), desc->toString( ));
    1509          44 :     }
    1510             :     // announce with the first description's port and our node ID string
    1511          88 :     _impl->service->announce( descs.front()->port, getNodeID().getString( ));
    1512             : }
    1513             : // Withdraws this node's announcement from _impl->service.
    1514          45 : void LocalNode::_exitService()
    1515             : {
    1516          45 :     _impl->service->withdraw();
    1517          45 : }
    1518             : // Runs a discovery through _impl->service and returns a Zeroconf built from the service data.
    1519           0 : Zeroconf LocalNode::getZeroconf()
    1520             : {
    1521           0 :     lunchbox::ScopedWrite mutex( _impl->service ); // held until the function returns
    1522           0 :     _impl->service->discover( lunchbox::Servus::IF_ALL, 500 ); // NOTE(review): 500 is presumably a timeout in ms — confirm
    1523           0 :     return Zeroconf( _impl->service.data );
    1524             : }
    1525             : 
    1526             : 
    1527             : //----------------------------------------------------------------------
    1528             : // command thread functions
    1529             : //----------------------------------------------------------------------
    1530          46 : bool LocalNode::_startCommandThread( const int32_t threadID )
    1531             : { // stores threadID on the command thread object, then starts it and returns start()'s result
    1532          46 :     _impl->commandThread->threadID = threadID;
    1533          46 :     return _impl->commandThread->start();
    1534             : }
    1535             : // Forwards the idle notification to the object store and returns its result.
    1536       51173 : bool LocalNode::_notifyCommandThreadIdle()
    1537             : {
    1538       51173 :     return _impl->objectStore->notifyCommandThreadIdle();
    1539             : }
    1540             : // Command handler: serves the local request whose ID is carried by command.
    1541       20001 : bool LocalNode::_cmdAckRequest( ICommand& command )
    1542             : {
    1543       20001 :     const uint32_t requestID = command.get< uint32_t >();
    1544       20001 :     LBASSERT( requestID != LB_UNDEFINED_UINT32 );
    1545             :     // an undefined request ID is not expected here (asserted above)
    1546       20001 :     serveRequest( requestID );
    1547       20001 :     return true;
    1548             : }
    1549             : // Command handler on the receiver thread: withdraws the service, starts closing, stops the cmd thread.
    1550          45 : bool LocalNode::_cmdStopRcv( ICommand& command )
    1551             : {
    1552          45 :     LB_TS_THREAD( _rcvThread );
    1553          45 :     LBASSERT( isListening( ));
    1554             : 
    1555          45 :     _exitService();
    1556          45 :     _setClosing(); // causes rcv thread exit
    1557             :     // the same command object is reused: retagged and dispatched again
    1558          45 :     command.setCommand( CMD_NODE_STOP_CMD ); // causes cmd thread exit
    1559          45 :     _dispatchCommand( command );
    1560          45 :     return true;
    1561             : }
    1562             : // Command handler on the command thread: marks the node closed; expects it to be closing already.
    1563          45 : bool LocalNode::_cmdStopCmd( ICommand& )
    1564             : {
    1565          45 :     LB_TS_THREAD( _cmdThread );
    1566          45 :     LBASSERTINFO( isClosing(), *this );
    1567             : 
    1568          45 :     _setClosed();
    1569          45 :     return true;
    1570             : }
    1571             : // Command handler: passes the int32 affinity carried by command to lunchbox::Thread::setAffinity.
    1572           0 : bool LocalNode::_cmdSetAffinity( ICommand& command )
    1573             : {
    1574           0 :     const int32_t affinity = command.get< int32_t >();
    1575             :     // NOTE(review): presumably applies to the thread running this handler — confirm
    1576           0 :     lunchbox::Thread::setAffinity( affinity );
    1577           0 :     return true;
    1578             : }
    1579             : // Command handler for a peer's connect request: registers the peer and replies, or refuses.
    1580          33 : bool LocalNode::_cmdConnect( ICommand& command )
    1581             : {
    1582          33 :     LBASSERTINFO( !command.getRemoteNode(), command );
    1583          33 :     LBASSERT( _impl->inReceiverThread( ));
    1584             :     // payload, in read order: peer node ID, request ID, node type, serialized node data
    1585          33 :     const NodeID& nodeID = command.get< NodeID >();
    1586          33 :     const uint32_t requestID = command.get< uint32_t >();
    1587          33 :     const uint32_t nodeType = command.get< uint32_t >();
    1588          33 :     std::string data = command.get< std::string >();
    1589             : 
    1590          33 :     LBVERB << "handle connect " << command << " req " << requestID << " type "
    1591          33 :            << nodeType << " data " << data << std::endl;
    1592             : 
    1593          66 :     ConnectionPtr connection = _impl->incoming.getConnection();
    1594             : 
    1595          33 :     LBASSERT( connection );
    1596          33 :     LBASSERT( nodeID != getNodeID() );
    1597          33 :     LBASSERT( _impl->connectionNodes.find( connection ) ==
    1598             :               _impl->connectionNodes.end( ));
    1599             : 
    1600          66 :     NodePtr peer;
    1601             : #ifdef COMMON_BIGENDIAN
    1602             :     uint32_t cmd = CMD_NODE_CONNECT_REPLY_BE;
    1603             :     lunchbox::byteswap( cmd );
    1604             : #else
    1605          33 :     const uint32_t cmd = CMD_NODE_CONNECT_REPLY;
    1606             : #endif
    1607             : 
    1608             :     // No locking needed, only recv thread modifies
    1609          33 :     NodeHashCIter i = _impl->nodes->find( nodeID );
    1610          33 :     if( i != _impl->nodes->end( ))
    1611             :     {
    1612           0 :         peer = i->second;
    1613           0 :         if( peer->isReachable( ))
    1614             :         {
    1615             :             // Node exists, probably simultaneous connect from peer
    1616           0 :             LBINFO << "Already got node " << nodeID << ", refusing connect"
    1617           0 :                    << std::endl;
    1618             : 
    1619             :             // refuse connection: the reply carries a default-constructed NodeID
    1620             :             OCommand( Connections( 1, connection ), cmd )
    1621           0 :                 << NodeID() << requestID;
    1622             : 
    1623             :             // NOTE: There is no close() here. The reply command above has to be
    1624             :             // received by the peer first, before closing the connection.
    1625           0 :             _removeConnection( connection );
    1626           0 :             return true;
    1627             :         }
    1628             :     }
    1629             : 
    1630             :     // create and add connected node (a known but unreachable node is reused)
    1631          33 :     if( !peer )
    1632          33 :         peer = createNode( nodeType );
    1633          33 :     if( !peer )
    1634             :     {
    1635           0 :         LBDEBUG << "Can't create node of type " << nodeType << ", disconnecting"
    1636           0 :                << std::endl;
    1637             : 
    1638             :         // refuse connection
    1639           0 :         OCommand( Connections( 1, connection ), cmd ) << NodeID() << requestID;
    1640             : 
    1641             :         // NOTE: There is no close() here. The reply command above has to be
    1642             :         // received by the peer first, before closing the connection.
    1643           0 :         _removeConnection( connection );
    1644           0 :         return true;
    1645             :     }
    1646             :     // a failed deserialize is only logged; the handshake continues
    1647          33 :     if( !peer->deserialize( data ))
    1648           0 :         LBWARN << "Error during node initialization" << std::endl;
    1649          33 :     LBASSERTINFO( data.empty(), data );
    1650          33 :     LBASSERTINFO( peer->getNodeID() == nodeID,
    1651             :                   peer->getNodeID() << "!=" << nodeID );
    1652          33 :     LBASSERT( peer->getType() == nodeType );
    1653             :     // register the peer for this connection and, under the write lock, in the node table
    1654          33 :     _impl->connectionNodes[ connection ] = peer;
    1655             :     {
    1656          33 :         lunchbox::ScopedFastWrite mutex( _impl->nodes );
    1657          33 :         _impl->nodes.data[ peer->getNodeID() ] = peer;
    1658             :     }
    1659          33 :     LBVERB << "Added node " << nodeID << std::endl;
    1660             : 
    1661             :     // send our information as reply
    1662             :     OCommand( Connections( 1, connection ), cmd )
    1663          33 :         << getNodeID() << requestID << getType() << serialize();
    1664             : 
    1665          66 :     return true;
    1666             : }
    1667             : // Command handler for the peer's reply to our connect request: completes the handshake or gives up.
    1668          33 : bool LocalNode::_cmdConnectReply( ICommand& command )
    1669             : {
    1670          33 :     LBASSERT( !command.getRemoteNode( ));
    1671          33 :     LBASSERT( _impl->inReceiverThread( ));
    1672             : 
    1673          33 :     ConnectionPtr connection = _impl->incoming.getConnection();
    1674          33 :     LBASSERT( _impl->connectionNodes.find( connection ) ==
    1675             :               _impl->connectionNodes.end( ));
    1676             :     // payload starts with the peer node ID and our request ID
    1677          33 :     const NodeID& nodeID = command.get< NodeID >();
    1678          33 :     const uint32_t requestID = command.get< uint32_t >();
    1679             : 
    1680             :     // connection refused: the peer answered with a zero node ID
    1681          33 :     if( nodeID == 0 )
    1682             :     {
    1683           0 :         LBINFO << "Connection refused, node already connected by peer"
    1684           0 :                << std::endl;
    1685             : 
    1686           0 :         _removeConnection( connection );
    1687           0 :         serveRequest( requestID, false );
    1688           0 :         return true;
    1689             :     }
    1690             :     // an accepted reply additionally carries the node type and the serialized node data
    1691          33 :     const uint32_t nodeType = command.get< uint32_t >();
    1692          66 :     std::string data = command.get< std::string >();
    1693             : 
    1694          33 :     LBVERB << "handle connect reply " << command << " req " << requestID
    1695          33 :            << " type " << nodeType << " data " << data << std::endl;
    1696             : 
    1697             :     // No locking needed, only recv thread modifies
    1698          33 :     NodeHash::const_iterator i = _impl->nodes->find( nodeID );
    1699          66 :     NodePtr peer;
    1700          33 :     if( i != _impl->nodes->end( ))
    1701           0 :         peer = i->second;
    1702             : 
    1703          33 :     if( peer && peer->isReachable( )) // simultaneous connect
    1704             :     {
    1705           0 :         LBINFO << "Closing simultaneous connection from " << peer << " on "
    1706           0 :                << connection << std::endl;
    1707             : 
    1708           0 :         _removeConnection( connection );
    1709           0 :         _closeNode( peer );
    1710           0 :         serveRequest( requestID, false );
    1711           0 :         return true;
    1712             :     }
    1713             : 
    1714             :     // create and add node: with a request ID, the node is the data stored with that request
    1715          33 :     if( !peer )
    1716             :     {
    1717          33 :         if( requestID != LB_UNDEFINED_UINT32 )
    1718          33 :             peer = static_cast< Node* >( getRequestData( requestID )); // same cast as in _cmdDisconnect
    1719             :         else
    1720           0 :             peer = createNode( nodeType );
    1721             :     }
    1722          33 :     if( !peer )
    1723             :     {
    1724           0 :         LBINFO << "Can't create node of type " << nodeType << ", disconnecting"
    1725           0 :                << std::endl;
    1726           0 :         _removeConnection( connection );
    1727           0 :         return true;
    1728             :     }
    1729             : 
    1730          33 :     LBASSERTINFO( peer->getType() == nodeType,
    1731             :                   peer->getType() << " != " << nodeType );
    1732          33 :     LBASSERT( peer->isClosed( ));
    1733             :     // a failed deserialize is only logged; the handshake continues
    1734          33 :     if( !peer->deserialize( data ))
    1735           0 :         LBWARN << "Error during node initialization" << std::endl;
    1736          33 :     LBASSERT( data.empty( ));
    1737          33 :     LBASSERT( peer->getNodeID() == nodeID );
    1738             : 
    1739             :     // send ACK to peer
    1740             :     // cppcheck-suppress unusedScopedObject
    1741          33 :     OCommand( Connections( 1, connection ), CMD_NODE_CONNECT_ACK );
    1742             :     // connect the peer and register it for this connection and in the node table
    1743          33 :     peer->_connect( connection );
    1744          33 :     _impl->connectionNodes[ connection ] = peer;
    1745             :     {
    1746          33 :         lunchbox::ScopedFastWrite mutex( _impl->nodes );
    1747          33 :         _impl->nodes.data[ peer->getNodeID() ] = peer;
    1748             :     }
    1749          33 :     _connectMulticast( peer );
    1750          33 :     LBVERB << "Added node " << nodeID << std::endl;
    1751             :     // the request is served as successful before notifyConnect is called
    1752          33 :     serveRequest( requestID, true );
    1753             : 
    1754          33 :     notifyConnect( peer );
    1755          66 :     return true;
    1756             : }
    1757             : // Command handler for the peer's connect ACK: connects the sending node and notifies about it.
    1758          33 : bool LocalNode::_cmdConnectAck( ICommand& command )
    1759             : {
    1760          33 :     NodePtr node = command.getRemoteNode();
    1761          33 :     LBASSERT( node );
    1762          33 :     LBASSERT( _impl->inReceiverThread( ));
    1763          33 :     LBVERB << "handle connect ack" << std::endl;
    1764             :     // the node is connected through the connection currently being handled
    1765          33 :     node->_connect( _impl->incoming.getConnection( ));
    1766          33 :     _connectMulticast( node );
    1767          33 :     notifyConnect( node );
    1768          33 :     return true;
    1769             : }
    1770             : // Command handler for a node ID received on a multicast connection: maps the connection to a node.
    1771           0 : bool LocalNode::_cmdID( ICommand& command )
    1772             : {
    1773           0 :     LBASSERT( _impl->inReceiverThread( ));
    1774             :     // payload, in read order: node ID, node type, serialized node data
    1775           0 :     const NodeID& nodeID = command.get< NodeID >();
    1776           0 :     uint32_t nodeType = command.get< uint32_t >();
    1777           0 :     std::string data = command.get< std::string >();
    1778             :     // a command that already has a remote node needs no further handling
    1779           0 :     if( command.getRemoteNode( ))
    1780             :     {
    1781           0 :         LBASSERT( nodeID == command.getRemoteNode()->getNodeID( ));
    1782           0 :         LBASSERT( command.getRemoteNode()->_getMulticast( ));
    1783           0 :         return true;
    1784             :     }
    1785             : 
    1786           0 :     LBDEBUG << "handle ID " << command << " node " << nodeID << std::endl;
    1787             : 
    1788           0 :     ConnectionPtr connection = _impl->incoming.getConnection();
    1789           0 :     LBASSERT( connection->isMulticast( ));
    1790           0 :     LBASSERT( _impl->connectionNodes.find( connection ) ==
    1791             :               _impl->connectionNodes.end( ));
    1792             : 
    1793           0 :     NodePtr node;
    1794           0 :     if( nodeID == getNodeID() ) // 'self' multicast connection
    1795           0 :         node = this;
    1796             :     else
    1797             :     {
    1798             :         // No locking needed, only recv thread writes
    1799           0 :         NodeHash::const_iterator i = _impl->nodes->find( nodeID );
    1800             : 
    1801           0 :         if( i == _impl->nodes->end( ))
    1802             :         {
    1803             :             // unknown node: create and add unconnected node
    1804           0 :             node = createNode( nodeType );
                     :             if( !node ) // createNode can fail, as handled in _cmdConnect
                     :             {
                     :                 // NOTE(review): the multicast connection stays unmapped; confirm
                     :                 // that ignoring the ID is the desired recovery.
                     :                 LBWARN << "Can't create node of type " << nodeType
                     :                        << ", ignoring ID of " << nodeID << std::endl;
                     :                 return true;
                     :             }
    1805             : 
    1806           0 :             if( !node->deserialize( data ))
    1807           0 :                 LBWARN << "Error during node initialization" << std::endl;
    1808           0 :             LBASSERTINFO( data.empty(), data );
    1809             : 
    1810             :             {
    1811           0 :                 lunchbox::ScopedFastWrite mutex( _impl->nodes );
    1812           0 :                 _impl->nodes.data[ nodeID ] = node;
    1813             :             }
    1814           0 :             LBVERB << "Added node " << nodeID << " with multicast "
    1815           0 :                    << connection << std::endl;
    1816             :         }
    1817             :         else
    1818           0 :             node = i->second;
    1819             :     }
    1820           0 :     LBASSERT( node );
    1821           0 :     LBASSERTINFO( node->getNodeID() == nodeID,
    1822             :                   node->getNodeID() << "!=" << nodeID );
    1823             :     // attach the multicast connection to the node and remember the mapping
    1824           0 :     _connectMulticast( node, connection );
    1825           0 :     _impl->connectionNodes[ connection ] = node;
    1826           0 :     LBDEBUG << "Added multicast connection " << connection << " from " << nodeID
    1827           0 :            << " to " << getNodeID() << std::endl;
    1828           0 :     return true;
    1829             : }
    1830             : 
    1831           7 : bool LocalNode::_cmdDisconnect( ICommand& command )
    1832             : {
    1833           7 :     LBASSERT( _impl->inReceiverThread( ));
    1834             : 
    1835           7 :     const uint32_t requestID = command.get< uint32_t >();
    1836             : 
    1837           7 :     NodePtr node = static_cast<Node*>( getRequestData( requestID ));
    1838           7 :     LBASSERT( node );
    1839             : 
    1840           7 :     _closeNode( node );
    1841           7 :     LBASSERT( node->isClosed( ));
    1842           7 :     serveRequest( requestID );
    1843           7 :     return true;
    1844             : }
    1845             : 
    1846           0 : bool LocalNode::_cmdGetNodeData( ICommand& command )
    1847             : {
    1848           0 :     const NodeID& nodeID = command.get< NodeID >();
    1849           0 :     const uint32_t requestID = command.get< uint32_t >();
    1850             : 
    1851           0 :     LBVERB << "cmd get node data: " << command << " req " << requestID
    1852           0 :            << " nodeID " << nodeID << std::endl;
    1853             : 
    1854           0 :     NodePtr node = getNode( nodeID );
    1855           0 :     NodePtr toNode = command.getRemoteNode();
    1856             : 
    1857           0 :     uint32_t nodeType = NODETYPE_INVALID;
    1858           0 :     std::string nodeData;
    1859           0 :     if( node )
    1860             :     {
    1861           0 :         nodeType = node->getType();
    1862           0 :         nodeData = node->serialize();
    1863           0 :         LBDEBUG << "Sent node data '" << nodeData << "' for " << nodeID << " to "
    1864           0 :                << toNode << std::endl;
    1865             :     }
    1866             : 
    1867             :     toNode->send( CMD_NODE_GET_NODE_DATA_REPLY )
    1868           0 :         << nodeID << requestID << nodeType << nodeData;
    1869           0 :     return true;
    1870             : }
    1871             : 
    1872           0 : bool LocalNode::_cmdGetNodeDataReply( ICommand& command )
    1873             : {
    1874           0 :     LBASSERT( _impl->inReceiverThread( ));
    1875             : 
    1876           0 :     const NodeID& nodeID = command.get< NodeID >();
    1877           0 :     const uint32_t requestID = command.get< uint32_t >();
    1878           0 :     const uint32_t nodeType = command.get< uint32_t >();
    1879           0 :     std::string nodeData = command.get< std::string >();
    1880             : 
    1881           0 :     LBVERB << "cmd get node data reply: " << command << " req " << requestID
    1882           0 :            << " type " << nodeType << " data " << nodeData << std::endl;
    1883             : 
    1884             :     // No locking needed, only recv thread writes
    1885           0 :     NodeHash::const_iterator i = _impl->nodes->find( nodeID );
    1886           0 :     if( i != _impl->nodes->end( ))
    1887             :     {
    1888             :         // Requested node connected to us in the meantime
    1889           0 :         NodePtr node = i->second;
    1890             : 
    1891           0 :         node->ref( this );
    1892           0 :         serveRequest( requestID, node.get( ));
    1893           0 :         return true;
    1894             :     }
    1895             : 
    1896           0 :     if( nodeType == NODETYPE_INVALID )
    1897             :     {
    1898           0 :         serveRequest( requestID, (void*)0 );
    1899           0 :         return true;
    1900             :     }
    1901             : 
    1902             :     // new node: create and add unconnected node
    1903           0 :     NodePtr node = createNode( nodeType );
    1904           0 :     if( node )
    1905             :     {
    1906           0 :         LBASSERT( node );
    1907             : 
    1908           0 :         if( !node->deserialize( nodeData ))
    1909           0 :             LBWARN << "Failed to initialize node data" << std::endl;
    1910           0 :         LBASSERT( nodeData.empty( ));
    1911           0 :         node->ref( this );
    1912             :     }
    1913             :     else
    1914           0 :         LBINFO << "Can't create node of type " << nodeType << std::endl;
    1915             : 
    1916           0 :     serveRequest( requestID, node.get( ));
    1917           0 :     return true;
    1918             : }
    1919             : 
    1920           0 : bool LocalNode::_cmdAcquireSendToken( ICommand& command )
    1921             : {
    1922           0 :     LBASSERT( inCommandThread( ));
    1923           0 :     if( !_impl->sendToken ) // enqueue command if no token available
    1924             :     {
    1925           0 :         const uint32_t timeout = Global::getTimeout();
    1926           0 :         if( timeout == LB_TIMEOUT_INDEFINITE ||
    1927           0 :             ( getTime64() - _impl->lastSendToken <= timeout ))
    1928             :         {
    1929           0 :             _impl->sendTokenQueue.push_back( command );
    1930           0 :             return true;
    1931             :         }
    1932             : 
    1933             :         // timeout! - clear old requests
    1934           0 :         _impl->sendTokenQueue.clear();
    1935             :         // 'generate' new token - release is robust
    1936             :     }
    1937             : 
    1938           0 :     _impl->sendToken = false;
    1939             : 
    1940           0 :     const uint32_t requestID = command.get< uint32_t >();
    1941           0 :     command.getRemoteNode()->send( CMD_NODE_ACQUIRE_SEND_TOKEN_REPLY )
    1942           0 :             << requestID;
    1943           0 :     return true;
    1944             : }
    1945             : 
    1946           0 : bool LocalNode::_cmdAcquireSendTokenReply( ICommand& command )
    1947             : {
    1948           0 :     const uint32_t requestID = command.get< uint32_t >();
    1949           0 :     serveRequest( requestID );
    1950           0 :     return true;
    1951             : }
    1952             : 
    1953           0 : bool LocalNode::_cmdReleaseSendToken( ICommand& )
    1954             : {
    1955           0 :     LBASSERT( inCommandThread( ));
    1956           0 :     _impl->lastSendToken = getTime64();
    1957             : 
    1958           0 :     if( _impl->sendToken )
    1959           0 :         return true; // double release due to timeout
    1960           0 :     if( _impl->sendTokenQueue.empty( ))
    1961             :     {
    1962           0 :         _impl->sendToken = true;
    1963           0 :         return true;
    1964             :     }
    1965             : 
    1966           0 :     ICommand& request = _impl->sendTokenQueue.front();
    1967             : 
    1968           0 :     const uint32_t requestID = request.get< uint32_t >();
    1969           0 :     request.getRemoteNode()->send( CMD_NODE_ACQUIRE_SEND_TOKEN_REPLY )
    1970           0 :             << requestID;
    1971           0 :     _impl->sendTokenQueue.pop_front();
    1972           0 :     return true;
    1973             : }
    1974             : 
    1975           0 : bool LocalNode::_cmdAddListener( ICommand& command )
    1976             : {
    1977             :     Connection* rawConnection =
    1978           0 :         reinterpret_cast< Connection* >( command.get< uint64_t >( ));
    1979           0 :     std::string data = command.get< std::string >();
    1980             : 
    1981           0 :     ConnectionDescriptionPtr description = new ConnectionDescription( data );
    1982           0 :     command.getRemoteNode()->_addConnectionDescription( description );
    1983             : 
    1984           0 :     if( command.getRemoteNode() != this )
    1985           0 :         return true;
    1986             : 
    1987           0 :     ConnectionPtr connection = rawConnection;
    1988           0 :     connection->unref();
    1989           0 :     LBASSERT( connection );
    1990             : 
    1991           0 :     _impl->connectionNodes[ connection ] = this;
    1992           0 :     if( connection->isMulticast( ))
    1993           0 :         _addMulticast( this, connection );
    1994             : 
    1995           0 :     connection->acceptNB();
    1996           0 :     _impl->incoming.addConnection( connection );
    1997             : 
    1998           0 :     _initService(); // update zeroconf
    1999           0 :     return true;
    2000             : }
    2001             : 
    2002           0 : bool LocalNode::_cmdRemoveListener( ICommand& command )
    2003             : {
    2004           0 :     const uint32_t requestID = command.get< uint32_t >();
    2005           0 :     Connection* rawConnection = command.get< Connection* >();
    2006           0 :     std::string data = command.get< std::string >();
    2007             : 
    2008           0 :     ConnectionDescriptionPtr description = new ConnectionDescription( data );
    2009           0 :     LBCHECK(
    2010             :           command.getRemoteNode()->_removeConnectionDescription( description ));
    2011             : 
    2012           0 :     if( command.getRemoteNode() != this )
    2013           0 :         return true;
    2014             : 
    2015           0 :     _initService(); // update zeroconf
    2016             : 
    2017           0 :     ConnectionPtr connection = rawConnection;
    2018           0 :     connection->unref( this );
    2019           0 :     LBASSERT( connection );
    2020             : 
    2021           0 :     if( connection->isMulticast( ))
    2022           0 :         _removeMulticast( connection );
    2023             : 
    2024           0 :     _impl->incoming.removeConnection( connection );
    2025           0 :     LBASSERT( _impl->connectionNodes.find( connection ) !=
    2026             :               _impl->connectionNodes.end( ));
    2027           0 :     _impl->connectionNodes.erase( connection );
    2028           0 :     serveRequest( requestID );
    2029           0 :     return true;
    2030             : }
    2031             : 
    2032           0 : bool LocalNode::_cmdPing( ICommand& command )
    2033             : {
    2034           0 :     LBASSERT( inCommandThread( ));
    2035           0 :     command.getRemoteNode()->send( CMD_NODE_PING_REPLY );
    2036           0 :     return true;
    2037             : }
    2038             : 
    2039           2 : bool LocalNode::_cmdCommand( ICommand& command )
    2040             : {
    2041           2 :     const uint128_t& commandID = command.get< uint128_t >();
    2042           2 :     CommandHandler func;
    2043             :     {
    2044           2 :         lunchbox::ScopedFastRead mutex( _impl->commandHandlers );
    2045           2 :         CommandHashCIter i = _impl->commandHandlers->find( commandID );
    2046           2 :         if( i == _impl->commandHandlers->end( ))
    2047           0 :             return false;
    2048             : 
    2049           2 :         CommandQueue* queue = i->second.second;
    2050           2 :         if( queue )
    2051             :         {
    2052             :             command.setDispatchFunction( CmdFunc( this,
    2053           1 :                                                 &LocalNode::_cmdCommandAsync ));
    2054           1 :             queue->push( command );
    2055           1 :             return true;
    2056             :         }
    2057             :         // else
    2058             : 
    2059           1 :         func = i->second.first;
    2060             :     }
    2061             : 
    2062           2 :     CustomICommand customCmd( command );
    2063           3 :     return func( customCmd );
    2064             : }
    2065             : 
    2066           1 : bool LocalNode::_cmdCommandAsync( ICommand& command )
    2067             : {
    2068           1 :     const uint128_t& commandID = command.get< uint128_t >();
    2069           1 :     CommandHandler func;
    2070             :     {
    2071           1 :         lunchbox::ScopedFastRead mutex( _impl->commandHandlers );
    2072           1 :         CommandHashCIter i = _impl->commandHandlers->find( commandID );
    2073           1 :         LBASSERT( i != _impl->commandHandlers->end( ));
    2074           1 :         if( i ==  _impl->commandHandlers->end( ))
    2075           0 :             return true; // deregistered between dispatch and now
    2076           1 :         func = i->second.first;
    2077             :     }
    2078           2 :     CustomICommand customCmd( command );
    2079           2 :     return func( customCmd );
    2080             : }
    2081             : 
    2082          33 : bool LocalNode::_cmdAddConnection( ICommand& command )
    2083             : {
    2084          33 :     LBASSERT( _impl->inReceiverThread( ));
    2085             : 
    2086          33 :     ConnectionPtr connection = command.get< ConnectionPtr >();
    2087          33 :     _addConnection( connection );
    2088          33 :     connection->unref(); // ref'd by _addConnection
    2089          33 :     return true;
    2090             : }
    2091             : 
    2092          63 : }

Generated by: LCOV version 1.11