LCOV - code coverage report
Current view: top level - co - localNode.cpp (source / functions) Hit Total Coverage
Test: Collage Lines: 583 1094 53.3 %
Date: 2016-06-07 01:19:44 Functions: 75 107 70.1 %

          Line data    Source code
       1             : 
       2             : /* Copyright (c) 2005-2014, Stefan Eilemann <eile@equalizergraphics.com>
       3             :  *                    2010, Cedric Stalder <cedric.stalder@gmail.com>
       4             :  *               2012-2014, Daniel Nachbaur <danielnachbaur@gmail.com>
       5             :  *
       6             :  * This file is part of Collage <https://github.com/Eyescale/Collage>
       7             :  *
       8             :  * This library is free software; you can redistribute it and/or modify it under
       9             :  * the terms of the GNU Lesser General Public License version 2.1 as published
      10             :  * by the Free Software Foundation.
      11             :  *
      12             :  * This library is distributed in the hope that it will be useful, but WITHOUT
      13             :  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
      14             :  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
      15             :  * details.
      16             :  *
      17             :  * You should have received a copy of the GNU Lesser General Public License
      18             :  * along with this library; if not, write to the Free Software Foundation, Inc.,
      19             :  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
      20             :  */
      21             : 
      22             : #include "localNode.h"
      23             : 
      24             : #include "buffer.h"
      25             : #include "bufferCache.h"
      26             : #include "commandQueue.h"
      27             : #include "connectionDescription.h"
      28             : #include "connectionSet.h"
      29             : #include "customICommand.h"
      30             : #include "dataIStream.h"
      31             : #include "exception.h"
      32             : #include "global.h"
      33             : #include "iCommand.h"
      34             : #include "nodeCommand.h"
      35             : #include "oCommand.h"
      36             : #include "object.h"
      37             : #include "objectICommand.h"
      38             : #include "objectStore.h"
      39             : #include "pipeConnection.h"
      40             : #include "sendToken.h"
      41             : #include "worker.h"
      42             : #include "zeroconf.h"
      43             : 
      44             : #include <lunchbox/clock.h>
      45             : #include <lunchbox/futureFunction.h>
      46             : #include <lunchbox/hash.h>
      47             : #include <lunchbox/lockable.h>
      48             : #include <lunchbox/request.h>
      49             : #include <lunchbox/rng.h>
      50             : #include <lunchbox/servus.h>
      51             : #include <lunchbox/sleep.h>
      52             : 
      53             : #include <boost/bind.hpp>
      54             : #include <boost/date_time/posix_time/posix_time.hpp>
      55             : #include <boost/lexical_cast.hpp>
      56             : 
      57             : #include <list>
      58             : 
      59             : namespace bp = boost::posix_time;
      60             : 
      61             : namespace co
      62             : {
      /* File-local state and container aliases used by the LocalNode
       * implementation in this file. */
      63             : namespace
      64             : {
                       // Counter incremented by detail::ReceiverThread::init() to derive the
                       // numeric suffix of the "Rcv<N>" / "Cmd<N>" thread names.
      65          22 : lunchbox::a_int32_t _threadIDs;
      66             : 
                       // Member-function command handler bound to a LocalNode instance.
      67             : typedef CommandFunc< LocalNode > CmdFunc;
                       // Commands kept for a later dispatch attempt (see pendingCommands).
      68             : typedef std::list< ICommand > CommandList;
                       // Connection -> node lookup (see detail::LocalNode::connectionNodes).
      69             : typedef lunchbox::RefPtrHash< Connection, NodePtr > ConnectionNodeHash;
      70             : typedef ConnectionNodeHash::const_iterator ConnectionNodeHashCIter;
      71             : typedef ConnectionNodeHash::iterator ConnectionNodeHashIter;
                       // 128-bit identifier -> node lookup (see detail::LocalNode::nodes).
      72             : typedef stde::hash_map< uint128_t, NodePtr > NodeHash;
      73             : typedef NodeHash::const_iterator NodeHashCIter;
                       // 128-bit identifier -> registered push handler.
      74             : typedef stde::hash_map< uint128_t, LocalNode::PushHandler > HandlerHash;
      75             : typedef HandlerHash::const_iterator HandlerHashCIter;
                       // A custom command handler together with the queue it was registered with.
      76             : typedef std::pair< LocalNode::CommandHandler, CommandQueue* > CommandPair;
      77             : typedef stde::hash_map< uint128_t, CommandPair > CommandHash;
      78             : typedef CommandHash::const_iterator CommandHashCIter;
      79             : typedef lunchbox::FutureFunction< bool > FuturebImpl;
      80             : }
      81             : 
      82             : namespace detail
      83             : {
      /**
       * Thread whose body is co::LocalNode::_runReceiverThread().
       *
       * The thread does not own the LocalNode it points to; the pointer is
       * stored as given and must stay valid while the thread runs.
       */
      84         106 : class ReceiverThread : public lunchbox::Thread
      85             : {
      86             : public:
      87          55 :     explicit ReceiverThread( co::LocalNode* localNode )
      88          55 :         : _localNode( localNode ) {}
      89             : 
                           /**
                            * Name the thread "Rcv<ID>" and start the matching command thread.
                            *
                            * @return the result of LocalNode::_startCommandThread().
                            */
      90          50 :     bool init() override
      91             :     {
                               // Pre-increment minus one yields the counter value before this
                               // call, so consecutive receiver threads get consecutive IDs.
      92          50 :         const int32_t threadID = ++_threadIDs - 1;
      93         100 :         setName( std::string( "Rcv" ) +
      94          50 :                  boost::lexical_cast< std::string >( threadID ));
      95          50 :         return _localNode->_startCommandThread( threadID );
      96             :     }
      97             : 
      98          50 :     void run() override { _localNode->_runReceiverThread(); }
      99             : 
     100             : private:
     101             :     co::LocalNode* const _localNode;
     102             : };
     103             : 
     /**
      * Worker thread associated with a co::LocalNode.
      *
      * The Worker base is constructed with Global::getCommandQueueLimit(). The
      * thread keeps running until the node reports isClosed() and forwards idle
      * notifications to LocalNode::_notifyCommandThreadIdle().
      */
     104         106 : class CommandThread : public Worker
     105             : {
     106             : public:
     107          55 :     explicit CommandThread( co::LocalNode* localNode )
     108             :         : Worker( Global::getCommandQueueLimit( ))
     109             :         , threadID( 0 )
     110          55 :         , _localNode( localNode )
     111          55 :     {}
     112             : 
                           // Numeric suffix used for the thread name in init(); starts at 0.
                           // NOTE(review): presumably assigned by _startCommandThread() before
                           // the thread is started -- that code is not visible here, confirm.
     113             :     int32_t threadID;
     114             : 
     115             : protected:
                           /** Name the thread "Cmd<threadID>"; always succeeds. */
     116          50 :     bool init() override
     117             :     {
     118         100 :         setName( std::string( "Cmd" ) +
     119          50 :                  boost::lexical_cast< std::string >( threadID ));
     120          50 :         return true;
     121             :     }
     122             : 
     123      103335 :     bool stopRunning() override { return _localNode->isClosed(); }
     124       51606 :     bool notifyIdle() override { return _localNode->_notifyCommandThreadIdle();}
     125             : 
     126             : private:
     127             :     co::LocalNode* const _localNode;
     128             : };
     129             : 
     /**
      * Private implementation data of co::LocalNode (held through its _impl
      * pointer).
      *
      * The default constructor leaves objectStore, receiverThread and
      * commandThread null; co::LocalNode's constructor allocates them. The
      * destructor deletes all three.
      */
     130             : class LocalNode
     131             : {
     132             : public:
     133          55 :     LocalNode()
                               // NOTE(review): the meaning of the BufferCache constructor
                               // arguments (200 / 20) is not visible in this file -- confirm
                               // against bufferCache.h.
     134             :         : smallBuffers( 200 )
     135             :         , bigBuffers( 20 )
     136             :         , sendToken( true )
     137             :         , lastSendToken( 0 )
     138             :         , objectStore( 0 )
     139             :         , receiverThread( 0 )
     140             :         , commandThread( 0 )
     141          55 :         , service( "_collage._tcp" )
     142          55 :     {}
     143             : 
                           /**
                            * Check (in assert-enabled builds) that all containers are empty and
                            * both threads are stopped, then delete the owned objects.
                            *
                            * The thread assertions dereference commandThread and
                            * receiverThread, so both must have been set before destruction.
                            */
     144          53 :     ~LocalNode()
     145          53 :     {
     146          53 :         LBASSERT( incoming.isEmpty( ));
     147          53 :         LBASSERT( connectionNodes.empty( ));
     148          53 :         LBASSERT( pendingCommands.empty( ));
     149          53 :         LBASSERT( nodes->empty( ));
     150             : 
     151          53 :         delete objectStore;
     152          53 :         objectStore = 0;
     153          53 :         LBASSERT( !commandThread->isRunning( ));
     154          53 :         delete commandThread;
     155          53 :         commandThread = 0;
     156             : 
     157          53 :         LBASSERT( !receiverThread->isRunning( ));
     158          53 :         delete receiverThread;
     159          53 :         receiverThread = 0;
     160          53 :     }
     161             : 
                           /** @return true if the calling thread is the receiver thread. */
     162         280 :     bool inReceiverThread() const { return receiverThread->isCurrent(); }
     163             : 
     164             :     /** Commands re-scheduled for dispatch. */
     165             :     CommandList pendingCommands;
     166             : 
     167             :     /** The command buffer 'allocator' for small packets */
     168             :     co::BufferCache smallBuffers;
     169             : 
     170             :     /** The command buffer 'allocator' for big packets */
     171             :     co::BufferCache bigBuffers;
     172             : 
     173             :     bool sendToken; //!< send token availability.
     174             :     uint64_t lastSendToken; //!< last used time for timeout detection
     175             :     std::deque< co::ICommand > sendTokenQueue; //!< pending requests
     176             : 
     177             :     /** Manager of distributed object */
     178             :     ObjectStore* objectStore;
     179             : 
     180             :     /** Needed for thread-safety during nodeID-based connect() */
     181             :     lunchbox::Lock connectLock;
     182             : 
     183             :     /** The node for each connection. */
     184             :     ConnectionNodeHash connectionNodes; // read and write: recv only
     185             : 
     186             :     /** The connected nodes. */
     187             :     lunchbox::Lockable< NodeHash, lunchbox::SpinLock > nodes; // r: all, w: recv
     188             : 
     189             :     /** The connection set of all connections from/to this node. */
     190             :     co::ConnectionSet incoming;
     191             : 
     192             :     /** The process-global clock. */
     193             :     lunchbox::Clock clock;
     194             : 
     195             :     /** The registered push handlers. */
     196             :     lunchbox::Lockable< HandlerHash, lunchbox::Lock > pushHandlers;
     197             : 
     198             :     /** The registered custom command handlers. */
     199             :     lunchbox::Lockable< CommandHash, lunchbox::SpinLock > commandHandlers;
     200             : 
                           /** Receiver thread; allocated by co::LocalNode, deleted here. */
     201             :     ReceiverThread* receiverThread;
                           /** Command thread; allocated by co::LocalNode, deleted here. */
     202             :     CommandThread* commandThread;
     203             : 
                           /** Servus service handle, constructed with "_collage._tcp". */
     204             :     lunchbox::Lockable< servus::Servus > service;
     205             : 
     206             :     // Performance counters:
     207             :     a_ssize_t counters[ co::LocalNode::COUNTER_ALL ];
     208             : };
     209             : }
     210             : 
     /**
      * Construct a local node of the given type.
      *
      * Allocates the private implementation, its receiver thread, command
      * thread and object store, then registers the handlers for all built-in
      * node commands.
      *
      * @param type forwarded unchanged to the Node base class.
      */
     211          55 : LocalNode::LocalNode( const uint32_t type )
     212             :     : Node( type )
     213          55 :     , _impl( new detail::LocalNode )
     214             : {
     215          55 :     _impl->receiverThread = new detail::ReceiverThread( this );
     216          55 :     _impl->commandThread  = new detail::CommandThread( this );
     217          55 :     _impl->objectStore = new ObjectStore( this, _impl->counters );
     218             : 
                           // Handlers are registered either with the command thread queue or
                           // with a null queue.
                           // NOTE(review): a null queue presumably means the handler is invoked
                           // directly by the dispatching (receiver) thread -- confirm against
                           // registerCommand()/dispatch, which are not visible here.
     219          55 :     CommandQueue* queue = getCommandThreadQueue();
     220             :     registerCommand( CMD_NODE_CONNECT,
     221          55 :                      CmdFunc( this, &LocalNode::_cmdConnect ), 0 );
     222             :     registerCommand( CMD_NODE_CONNECT_BE,
     223          55 :                      CmdFunc( this, &LocalNode::_cmdConnect ), 0 );
     224             :     registerCommand( CMD_NODE_CONNECT_REPLY,
     225          55 :                      CmdFunc( this, &LocalNode::_cmdConnectReply ), 0 );
     226             :     registerCommand( CMD_NODE_CONNECT_REPLY_BE,
     227          55 :                      CmdFunc( this, &LocalNode::_cmdConnectReply ), 0 );
     228             :     registerCommand( CMD_NODE_ID,
     229          55 :                      CmdFunc( this, &LocalNode::_cmdID ), 0 );
     230             :     registerCommand( CMD_NODE_ID_BE,
     231          55 :                      CmdFunc( this, &LocalNode::_cmdID ), 0 );
     232             :     registerCommand( CMD_NODE_ACK_REQUEST,
     233          55 :                      CmdFunc( this, &LocalNode::_cmdAckRequest ), 0 );
     234             :     registerCommand( CMD_NODE_STOP_RCV,
     235          55 :                      CmdFunc( this, &LocalNode::_cmdStopRcv ), 0 );
     236             :     registerCommand( CMD_NODE_STOP_CMD,
     237          55 :                      CmdFunc( this, &LocalNode::_cmdStopCmd ), queue );
                           // The two SET_AFFINITY commands share one handler but are registered
                           // with different queues (null vs. command thread queue).
     238             :     registerCommand( CMD_NODE_SET_AFFINITY_RCV,
     239          55 :                      CmdFunc( this, &LocalNode::_cmdSetAffinity ), 0 );
     240             :     registerCommand( CMD_NODE_SET_AFFINITY_CMD,
     241          55 :                      CmdFunc( this, &LocalNode::_cmdSetAffinity ), queue );
     242             :     registerCommand( CMD_NODE_CONNECT_ACK,
     243          55 :                      CmdFunc( this, &LocalNode::_cmdConnectAck ), 0 );
     244             :     registerCommand( CMD_NODE_DISCONNECT,
     245          55 :                      CmdFunc( this, &LocalNode::_cmdDisconnect ), 0 );
     246             :     registerCommand( CMD_NODE_GET_NODE_DATA,
     247          55 :                      CmdFunc( this, &LocalNode::_cmdGetNodeData ), queue );
     248             :     registerCommand( CMD_NODE_GET_NODE_DATA_REPLY,
     249          55 :                      CmdFunc( this, &LocalNode::_cmdGetNodeDataReply ), 0 );
     250             :     registerCommand( CMD_NODE_ACQUIRE_SEND_TOKEN,
     251          55 :                      CmdFunc( this, &LocalNode::_cmdAcquireSendToken ), queue );
     252             :     registerCommand( CMD_NODE_ACQUIRE_SEND_TOKEN_REPLY,
     253          55 :                      CmdFunc( this, &LocalNode::_cmdAcquireSendTokenReply ), 0);
     254             :     registerCommand( CMD_NODE_RELEASE_SEND_TOKEN,
     255          55 :                      CmdFunc( this, &LocalNode::_cmdReleaseSendToken ), queue );
     256             :     registerCommand( CMD_NODE_ADD_LISTENER,
     257          55 :                      CmdFunc( this, &LocalNode::_cmdAddListener ), 0 );
     258             :     registerCommand( CMD_NODE_REMOVE_LISTENER,
     259          55 :                      CmdFunc( this, &LocalNode::_cmdRemoveListener ), 0 );
     260             :     registerCommand( CMD_NODE_PING,
     261          55 :                      CmdFunc( this, &LocalNode::_cmdPing ), queue );
                           // Ping replies are routed to _cmdDiscard.
     262             :     registerCommand( CMD_NODE_PING_REPLY,
     263          55 :                      CmdFunc( this, &LocalNode::_cmdDiscard ), 0 );
     264             :     registerCommand( CMD_NODE_COMMAND,
     265          55 :                      CmdFunc( this, &LocalNode::_cmdCommand ), 0 );
     266             :     registerCommand( CMD_NODE_ADD_CONNECTION,
     267          55 :                      CmdFunc( this, &LocalNode::_cmdAddConnection ), 0 );
     268          55 : }
     269             : 
     /**
      * Destroy the node and its private implementation.
      *
      * Asserts that no requests are pending. Deleting _impl in turn deletes the
      * object store and both threads (see detail::LocalNode's destructor).
      */
     270         148 : LocalNode::~LocalNode( )
     271             : {
     272          53 :     LBASSERT( !hasPendingRequests( ));
     273          53 :     delete _impl;
     274          95 : }
     275             : 
     /**
      * Parse the node-related command line options, then start listening.
      *
      * Recognized options, each taking one argument which must not start with
      * '-':
      *  - --co-listen <description> (and the deprecated alias --eq-listen): add
      *    a listening connection description.
      *  - --co-globals <string>: passed to Global::fromString().
      * Unknown arguments are ignored. Invalid or missing option arguments only
      * produce a warning.
      *
      * @param argc number of entries in argv.
      * @param argv the argument vector; argv[0] is skipped.
      * @return the success of listen().
      */
     276           1 : bool LocalNode::initLocal( const int argc, char** argv )
     277             : {
     278             :     // We do not use getopt_long because it really does not work due to the
     279             :     // following aspects:
     280             :     // - reordering of arguments
     281             :     // - different behavior of GNU and BSD implementations
     282             :     // - incomplete man pages
     283           1 :     for( int i=1; i<argc; ++i )
     284             :     {
     285           0 :         if( std::string( "--eq-listen" ) == argv[i] )
     286           0 :             LBWARN << "Deprecated --eq-listen, use --co-listen" << std::endl;
     287           0 :         if( std::string( "--eq-listen" ) == argv[i] ||
     288           0 :             std::string( "--co-listen" ) == argv[i] )
     289             :         {
     290           0 :             if( (i+1)<argc && argv[i+1][0] != '-' )
     291             :             {
                                       // ++i consumes the option argument, so argv[i] below refers
                                       // to the description string, not to the option itself.
     292           0 :                 std::string data = argv[++i];
     293           0 :                 ConnectionDescriptionPtr desc = new ConnectionDescription;
     294           0 :                 desc->port = Global::getDefaultPort();
     295             : 
     296           0 :                 if( desc->fromString( data ))
     297             :                 {
     298           0 :                     addConnectionDescription( desc );
                                           // NOTE(review): presumably fromString() consumes the
                                           // parsed text from 'data', so leftovers indicate
                                           // trailing garbage -- confirm.
     299           0 :                     LBASSERTINFO( data.empty(), data );
     300             :                 }
     301             :                 else
     302           0 :                     LBWARN << "Ignoring listen option: " << argv[i] <<std::endl;
     303             :             }
     304             :             else
     305             :             {
     306           0 :                 LBWARN << "No argument given to --co-listen!" << std::endl;
     307             :             }
     308             :         }
     309           0 :         else if ( std::string( "--co-globals" ) == argv[i] )
     310             :         {
     311           0 :             if( (i+1)<argc && argv[i+1][0] != '-' )
     312             :             {
     313           0 :                 const std::string data = argv[++i];
     314           0 :                 if( !Global::fromString( data ))
     315             :                 {
     316           0 :                     LBWARN << "Invalid global variables string: " << data
     317           0 :                            << ", using default global variables." << std::endl;
     318           0 :                 }
     319             :             }
     320             :             else
     321             :             {
     322           0 :                 LBWARN << "No argument given to --co-globals!" << std::endl;
     323             :             }
     324             :         }
     325             :     }
     326             : 
     327           1 :     if( !listen( ))
     328             :     {
     329           0 :         LBWARN << "Can't setup listener(s) on " << *static_cast< Node* >( this )
     330           0 :                << std::endl;
     331           0 :         return false;
     332             :     }
     333           1 :     return true;
     334             : }
     335             : 
     /**
      * Open all configured listening connections and start the receiver thread.
      *
      * The node must be in the closed state. A self-connection is set up first
      * via _connectSelf(), then one listening connection is created for every
      * connection description of this node.
      *
      * @return false if the node is not closed, the self-connection fails or a
      *         listener cannot be created; true otherwise.
      */
     336          50 : bool LocalNode::listen()
     337             : {
     338          50 :     LBVERB << "Listener data: " << serialize() << std::endl;
     339          50 :     if( !isClosed() || !_connectSelf( ))
     340           0 :         return false;
     341             : 
     342          50 :     const ConnectionDescriptions& descriptions = getConnectionDescriptions();
     343         291 :     for( ConnectionDescriptionsCIter i = descriptions.begin();
     344         194 :          i != descriptions.end(); ++i )
     345             :     {
     346          47 :         ConnectionDescriptionPtr description = *i;
     347          94 :         ConnectionPtr connection = Connection::create( description );
     348             : 
     349          47 :         if( !connection || !connection->listen( ))
     350             :         {
     351           0 :             LBWARN << "Can't create listener connection: " << description
     352           0 :                    << std::endl;
                                   // NOTE(review): this returns without undoing the
                                   // self-connection or the listeners registered by earlier
                                   // iterations.
     353           0 :             return false;
     354             :         }
     355             : 
                               // Listening connections are mapped to this node itself.
     356          47 :         _impl->connectionNodes[ connection ] = this;
     357          47 :         if( connection->isMulticast( ))
     358           0 :             _addMulticast( this, connection );
     359             : 
     360          47 :         connection->acceptNB();
     361          47 :         _impl->incoming.addConnection( connection );
     362             : 
     363          94 :         LBVERB << "Added node " << getNodeID() << " using " << connection
     364         141 :                << std::endl;
     365          47 :     }
     366             : 
     367         100 :     LBVERB << lunchbox::className(this) << " start command and receiver thread "
     368         150 :            << std::endl;
     369             : 
                           // The state is switched to listening before the receiver thread
                           // starts. The command thread is started from ReceiverThread::init().
     370          50 :     _setListening();
     371          50 :     _impl->receiverThread->start();
     372             : 
     373          50 :     LBDEBUG << *this << std::endl;
     374          50 :     return true;
     375             : }
     376             : 
     /**
      * Stop the receiver thread and release connection and node bookkeeping.
      *
      * @return false if the node is not listening, true after the receiver
      *         thread has been joined and _cleanup() has run.
      */
     377          49 : bool LocalNode::close()
     378             : {
     379          49 :     if( !isListening() )
     380           0 :         return false;
     381             : 
                           // Ask the receiver thread to stop (handled by _cmdStopRcv), then wait
                           // for it to exit before touching its data in _cleanup().
     382          49 :     send( CMD_NODE_STOP_RCV );
     383             : 
     384          49 :     LBCHECK( _impl->receiverThread->join( ));
     385          49 :     _cleanup();
     386             : 
     387         147 :     LBDEBUG << _impl->incoming.getSize() << " connections open after close"
     388         147 :            << std::endl;
     389             : #ifndef NDEBUG
                           // Debug builds list every connection still in the incoming set.
     390          49 :     const Connections& connections = _impl->incoming.getConnections();
     391         147 :     for( Connections::const_iterator i = connections.begin();
     392          98 :          i != connections.end(); ++i )
     393             :     {
     394           0 :         LBDEBUG << "    " << *i << std::endl;
     395             :     }
     396             : #endif
     397             : 
     398          49 :     LBASSERTINFO( !hasPendingRequests(),
     399             :                   *static_cast< lunchbox::RequestHandler* >( this ));
     400          49 :     return true;
     401             : }
     402             : 
     /**
      * Apply a thread affinity to the node's threads.
      *
      * Sends the affinity value with CMD_NODE_SET_AFFINITY_RCV and
      * CMD_NODE_SET_AFFINITY_CMD (both handled by _cmdSetAffinity), then calls
      * lunchbox::Thread::setAffinity() directly.
      *
      * @param affinity the affinity value, passed through unchanged.
      */
     403           0 : void LocalNode::setAffinity( const int32_t affinity )
     404             : {
     405           0 :     send( CMD_NODE_SET_AFFINITY_RCV ) << affinity;
     406           0 :     send( CMD_NODE_SET_AFFINITY_CMD ) << affinity;
     407             : 
                           // NOTE(review): presumably affects the calling thread -- confirm
                           // against lunchbox::Thread.
     408           0 :     lunchbox::Thread::setAffinity( affinity );
     409           0 : }
     410             : 
     /**
      * Hand a connection over by sending CMD_NODE_ADD_CONNECTION.
      *
      * An extra reference is taken before sending so the connection stays alive
      * while the command is in flight; the handler (_cmdAddConnection) is
      * expected to release it.
      *
      * @param connection the connection to add.
      */
     411          34 : void LocalNode::addConnection( ConnectionPtr connection )
     412             : {
     413          34 :     connection->ref(); // unref in _cmdAddConnection
     414          34 :     send( CMD_NODE_ADD_CONNECTION ) << connection;
     415          34 : }
     416             : 
     /**
      * Create a listening connection from a description and add it as listener.
      *
      * @param desc the description passed to Connection::create().
      * @return the new listening connection, or a null pointer if it could not
      *         be created or put into listening state.
      */
     417           0 : ConnectionPtr LocalNode::addListener( ConnectionDescriptionPtr desc )
     418             : {
     419           0 :     LBASSERT( isListening( ));
     420             : 
     421           0 :     ConnectionPtr connection = Connection::create( desc );
     422           0 :     if( connection && connection->listen( ))
     423             :     {
     424           0 :         addListener( connection );
     425           0 :         return connection;
     426             :     }
     427             : 
     428           0 :     return 0;
     429             : }
     430             : 
     /**
      * Announce an already listening connection as a new listener.
      *
      * Sends CMD_NODE_ADD_LISTENER to every node returned by getNodes(),
      * carrying the connection's address as a 64-bit integer and its
      * description string. Does nothing if this node or the connection is not
      * listening (also asserted).
      *
      * @param connection the listening connection to announce.
      */
     431           0 : void LocalNode::addListener( ConnectionPtr connection )
     432             : {
     433           0 :     LBASSERT( isListening( ));
     434           0 :     LBASSERT( connection->isListening( ));
                           // Same checks again so builds without assertions bail out quietly.
     435           0 :     if( !isListening() || !connection->isListening( ))
     436           0 :         return;
     437             : 
     438           0 :     connection->ref( this ); // unref in self handler
     439             : 
     440             :     // Update everybody's description list of me, add the listener to myself in
     441             :     // my handler
     442           0 :     Nodes nodes;
     443           0 :     getNodes( nodes );
     444             : 
     445           0 :     for( NodesIter i = nodes.begin(); i != nodes.end(); ++i )
     446           0 :         (*i)->send( CMD_NODE_ADD_LISTENER )
     447           0 :             << (uint64_t)(connection.get( ))
     448           0 :             << connection->getDescription()->toString();
     449             : }
     450             : 
     /**
      * Remove several listening connections.
      *
      * Issues one _removeListener() request per connection and collects the
      * returned request objects in a local vector, which is destroyed when the
      * function returns.
      * NOTE(review): presumably destroying a lunchbox::Request blocks until it
      * has been served, so all removals overlap and this call returns once all
      * are done -- confirm against lunchbox/request.h.
      *
      * @param connections the listeners to remove.
      */
     451           0 : void LocalNode::removeListeners( const Connections& connections )
     452             : {
     453           0 :     std::vector< lunchbox::Request< void > > requests;
     454           0 :     for( ConnectionsCIter i = connections.begin(); i != connections.end(); ++i )
     455             :     {
     456           0 :         ConnectionPtr connection = *i;
     457           0 :         requests.push_back( _removeListener( connection ));
     458           0 :     }
     459           0 : }
     460             : 
     /**
      * Start the removal of one listening connection.
      *
      * Registers a request and sends CMD_NODE_REMOVE_LISTENER to every node
      * returned by getNodes(), carrying the request, the connection pointer and
      * the connection's description string.
      *
      * @param conn the listener to remove; asserted not to be connected.
      * @return the request registered for this removal.
      */
     461           0 : lunchbox::Request< void > LocalNode::_removeListener( ConnectionPtr conn )
     462             : {
     463           0 :     LBASSERT( isListening( ));
     464           0 :     LBASSERTINFO( !conn->isConnected(), conn );
     465             : 
                           // Extra reference held on behalf of this node.
                           // NOTE(review): presumably released by the remove-listener handler --
                           // confirm, the handler is not visible here.
     466           0 :     conn->ref( this );
     467           0 :     const lunchbox::Request< void > request = registerRequest< void >();
     468           0 :     Nodes nodes;
     469           0 :     getNodes( nodes );
     470             : 
     471           0 :     for( NodesIter i = nodes.begin(); i != nodes.end(); ++i )
     472           0 :         (*i)->send( CMD_NODE_REMOVE_LISTENER ) << request << conn.get()
     473           0 :                                           << conn->getDescription()->toString();
     474           0 :     return request;
     475             : }
     476             : 
     /**
      * Add a connection to the set of incoming connections.
      *
      * If the receiver thread is running and this is called from another
      * thread, the work is forwarded through addConnection() (a command)
      * instead of touching the connection set directly. Otherwise a
      * non-blocking receive is started on the connection and it is added to
      * the incoming set.
      *
      * @param connection the connection to add.
      */
     477         152 : void LocalNode::_addConnection( ConnectionPtr connection )
     478             : {
     479         152 :     if( _impl->receiverThread->isRunning() && !_impl->inReceiverThread( ))
     480             :     {
     481          34 :         addConnection( connection );
     482         186 :         return;
     483             :     }
     484             : 
                           // Start the first receive (COMMAND_MINSIZE bytes) into a buffer taken
                           // from the small-buffer cache.
     485         118 :     BufferPtr buffer = _impl->smallBuffers.alloc( COMMAND_ALLOCSIZE );
     486         118 :     connection->recvNB( buffer, COMMAND_MINSIZE );
     487         118 :     _impl->incoming.addConnection( connection );
     488             : }
     489             : 
     /**
      * Remove a connection from the incoming set and shut it down.
      *
      * Resets the connection's receive data and closes the connection unless it
      * is already closed.
      *
      * @param connection the connection to remove; asserted to be non-null.
      */
     490         224 : void LocalNode::_removeConnection( ConnectionPtr connection )
     491             : {
     492         224 :     LBASSERT( connection );
     493             : 
     494         224 :     _impl->incoming.removeConnection( connection );
     495         224 :     connection->resetRecvData();
     496         224 :     if( !connection->isClosed( ))
     497         130 :         connection->close(); // cancel pending IO's
     498         224 : }
     499             : 
     /**
      * Drop all connection and node bookkeeping of a stopped node.
      *
      * Called from close() after the receiver thread has been joined. Logs what
      * is still registered (with per-entry details in debug builds), then
      * clears the connection->node map and the node map. The node is asserted
      * to be closed.
      */
     500          49 : void LocalNode::_cleanup()
     501             : {
     502          49 :     LBVERB << "Clean up stopped node" << std::endl;
     503          49 :     LBASSERTINFO( isClosed(), *this );
     504             : 
     505          49 :     if( !_impl->connectionNodes.empty( ))
     506         141 :         LBDEBUG << _impl->connectionNodes.size()
     507         141 :                << " open connections during cleanup" << std::endl;
     508             : #ifndef NDEBUG
     509         288 :     for( ConnectionNodeHashCIter i = _impl->connectionNodes.begin();
     510         192 :          i != _impl->connectionNodes.end(); ++i )
     511             :     {
     512          47 :         NodePtr node = i->second;
     513          47 :         LBDEBUG << "    " << i->first << " : " << node << std::endl;
     514          47 :     }
     515             : #endif
     516             : 
     517          49 :     _impl->connectionNodes.clear();
     518             : 
     519          49 :     if( !_impl->nodes->empty( ))
     520           6 :         LBDEBUG << _impl->nodes->size() << " nodes connected during cleanup"
     521           6 :                << std::endl;
     522             : 
     523             : #ifndef NDEBUG
     524          51 :     for( NodeHashCIter i = _impl->nodes->begin(); i != _impl->nodes->end(); ++i)
     525             :     {
     526           2 :         NodePtr node = i->second;
     527           2 :         LBDEBUG << "    " << node << std::endl;
     528           2 :     }
     529             : #endif
     530             : 
                           // The node map is accessed without taking its lock here.
     531          49 :     _impl->nodes->clear();
     532          49 : }
     533             : 
     /**
      * Disconnect a node and remove every trace of it from this local node.
      *
      * Removes the node's unicast and multicast connections, its instance data
      * in the object store and its entry in the node map, then calls
      * notifyDisconnect().
      *
      * @param node the node to close.
      */
     534         115 : void LocalNode::_closeNode( NodePtr node )
     535             : {
                           // Both connections are fetched before _disconnect() so they can
                           // still be removed from the local bookkeeping afterwards.
     536         115 :     ConnectionPtr connection = node->getConnection();
     537         230 :     ConnectionPtr mcConnection = node->_getMulticast();
     538             : 
     539         115 :     node->_disconnect();
     540             : 
     541         115 :     if( connection )
     542             :     {
     543          68 :         LBASSERTINFO( _impl->connectionNodes.find( connection ) !=
     544             :                       _impl->connectionNodes.end(), connection );
     545             : 
     546          68 :         _removeConnection( connection );
     547          68 :         _impl->connectionNodes.erase( connection );
     548             :     }
     549             : 
     550         115 :     if( mcConnection )
     551             :     {
     552           0 :         _removeConnection( mcConnection );
     553           0 :         _impl->connectionNodes.erase( mcConnection );
     554             :     }
     555             : 
     556         115 :     _impl->objectStore->removeInstanceData( node->getNodeID( ));
     557             : 
                           // NOTE(review): the scoped write lock on the node map is held until
                           // the end of the function, i.e. also while notifyDisconnect() and
                           // the log statement run.
     558         230 :     lunchbox::ScopedFastWrite mutex( _impl->nodes );
     559         115 :     _impl->nodes->erase( node->getNodeID( ));
     560         115 :     notifyDisconnect( node );
     561         230 :     LBDEBUG << node << " disconnected from " << *this << std::endl;
     562         115 : }
     563             : 
     /**
      * Create the pipe connection this node uses to talk to itself.
      *
      * One end of the pipe (the sibling) becomes this node's own outgoing
      * connection; the other end is registered for this node and added through
      * _addConnection().
      *
      * @return false if the pipe connection cannot be established, true
      *         otherwise.
      */
     564          50 : bool LocalNode::_connectSelf()
     565             : {
     566             :     // setup local connection to myself
     567          50 :     PipeConnectionPtr connection = new PipeConnection;
     568          50 :     if( !connection->connect( ))
     569             :     {
     570           0 :         LBERROR << "Could not create local connection to receiver thread."
     571           0 :                 << std::endl;
     572           0 :         return false;
     573             :     }
     574             : 
     575          50 :     Node::_connect( connection->getSibling( ));
     576          50 :     _setClosed(); // reset state after _connect set it to connected
     577             : 
     578             :     // add to connection set
     579          50 :     LBASSERT( connection->getDescription( ));
     580          50 :     LBASSERT( _impl->connectionNodes.find( connection ) ==
     581             :               _impl->connectionNodes.end( ));
     582             : 
     583          50 :     _impl->connectionNodes[ connection ] = this;
                           // Writes the node map directly through .data, without locking.
     584          50 :     _impl->nodes.data[ getNodeID() ] = this;
     585          50 :     _addConnection( connection );
     586             : 
     587         100 :     LBVERB << "Added node " << getNodeID() << " using " << connection
     588         150 :            << std::endl;
     589          50 :     return true;
     590             : }
     591             : 
     592           8 : bool LocalNode::disconnect( NodePtr node )
     593             : {
     594           8 :     if( !node || !isListening() )
     595           0 :         return false;
     596             : 
     597           8 :     if( !node->isConnected( ))
     598           0 :         return true;
     599             : 
     600           8 :     LBASSERT( !inCommandThread( ));
     601           8 :     lunchbox::Request< void > request = registerRequest< void >( node.get( ));
     602           8 :     send( CMD_NODE_DISCONNECT ) << request;
     603             : 
     604           8 :     request.wait();
     605           8 :     _impl->objectStore->removeNode( node );
     606           8 :     return true;
     607             : }
     608             : 
     609       20001 : void LocalNode::ackRequest( NodePtr node, const uint32_t requestID )
     610             : {
     611       20001 :     if( requestID == LB_UNDEFINED_UINT32 ) // no need to ack operation
     612       20001 :         return;
     613             : 
     614       20001 :     if( node == this ) // OPT
     615           0 :         serveRequest( requestID );
     616             :     else
     617       20001 :         node->send( CMD_NODE_ACK_REQUEST ) << requestID;
     618             : }
     619             : 
     620           0 : void LocalNode::ping( NodePtr peer )
     621             : {
     622           0 :     LBASSERT( !_impl->inReceiverThread( ));
     623           0 :     peer->send( CMD_NODE_PING );
     624           0 : }
     625             : 
     626           0 : bool LocalNode::pingIdleNodes()
     627             : {
     628           0 :     LBASSERT( !_impl->inReceiverThread( ) );
     629           0 :     const int64_t timeout = Global::getKeepaliveTimeout() / 2;
     630           0 :     Nodes nodes;
     631           0 :     getNodes( nodes, false );
     632             : 
     633           0 :     bool pinged = false;
     634           0 :     for( NodesCIter i = nodes.begin(); i != nodes.end(); ++i )
     635             :     {
     636           0 :         NodePtr node = *i;
     637           0 :         if( getTime64() - node->getLastReceiveTime() > timeout )
     638             :         {
     639           0 :             LBINFO << " Ping Node: " <<  node->getNodeID() << " last seen "
     640           0 :                    << node->getLastReceiveTime() << std::endl;
     641           0 :             node->send( CMD_NODE_PING );
     642           0 :             pinged = true;
     643             :         }
     644           0 :     }
     645           0 :     return pinged;
     646             : }
     647             : 
     648             : //----------------------------------------------------------------------
     649             : // Object functionality
     650             : //----------------------------------------------------------------------
     651           0 : void LocalNode::disableInstanceCache()
     652             : {
     653           0 :     _impl->objectStore->disableInstanceCache();
     654           0 : }
     655             : 
     656           0 : void LocalNode::expireInstanceData( const int64_t age )
     657             : {
     658           0 :     _impl->objectStore->expireInstanceData( age );
     659           0 : }
     660             : 
     661           0 : void LocalNode::enableSendOnRegister()
     662             : {
     663           0 :     _impl->objectStore->enableSendOnRegister();
     664           0 : }
     665             : 
     666           0 : void LocalNode::disableSendOnRegister()
     667             : {
     668           0 :     _impl->objectStore->disableSendOnRegister();
     669           0 : }
     670             : 
     671          21 : bool LocalNode::registerObject( Object* object )
     672             : {
     673          21 :     return _impl->objectStore->register_( object );
     674             : }
     675             : 
     676          21 : void LocalNode::deregisterObject( Object* object )
     677             : {
     678          21 :     _impl->objectStore->deregister( object );
     679          21 : }
     680             : 
     681          40 : f_bool_t LocalNode::mapObject( Object* object, const uint128_t& id,
     682             :                                NodePtr master, const uint128_t& version )
     683             : {
     684             :     const uint32_t request = _impl->objectStore->mapNB( object, id, version,
     685          40 :                                                         master );
     686             :     const FuturebImpl::Func& func = boost::bind( &ObjectStore::mapSync,
     687          40 :                                                  _impl->objectStore, request );
     688          40 :     return f_bool_t( new FuturebImpl( func ));
     689             : }
     690             : 
     691           0 : uint32_t LocalNode::mapObjectNB( Object* object, const uint128_t& id,
     692             :                                  const uint128_t& version )
     693             : {
     694           0 :     return _impl->objectStore->mapNB( object, id, version, 0 );
     695             : }
     696             : 
     697           2 : uint32_t LocalNode::mapObjectNB( Object* object, const uint128_t& id,
     698             :                                  const uint128_t& version, NodePtr master )
     699             : {
     700           2 :     return _impl->objectStore->mapNB( object, id, version, master );
     701             : }
     702             : 
     703             : 
     704           2 : bool LocalNode::mapObjectSync( const uint32_t requestID )
     705             : {
     706           2 :     return _impl->objectStore->mapSync( requestID );
     707             : }
     708             : 
     709           4 : f_bool_t LocalNode::syncObject( Object* object, NodePtr master, const uint128_t& id,
     710             :                                const uint32_t instanceID )
     711             : {
     712           4 :     return _impl->objectStore->sync( object, master, id, instanceID );
     713             : }
     714             : 
     715          42 : void LocalNode::unmapObject( Object* object )
     716             : {
     717          42 :     _impl->objectStore->unmap( object );
     718          42 : }
     719             : 
     720           0 : void LocalNode::swapObject( Object* oldObject, Object* newObject )
     721             : {
     722           0 :     _impl->objectStore->swap( oldObject, newObject );
     723           0 : }
     724             : 
     725           0 : void LocalNode::objectPush( const uint128_t& groupID,
     726             :                             const uint128_t& objectType,
     727             :                             const uint128_t& objectID, DataIStream& istream )
     728             : {
     729           0 :     lunchbox::ScopedRead mutex( _impl->pushHandlers );
     730           0 :     HandlerHashCIter i = _impl->pushHandlers->find( groupID );
     731           0 :     if( i != _impl->pushHandlers->end( ))
     732           0 :         i->second( groupID, objectType, objectID, istream );
     733             :     else
     734           0 :         LBWARN << "No custom handler for push group " << groupID
     735           0 :                << " registered" << std::endl;
     736             : 
     737           0 :     if( istream.wasUsed() && istream.hasData( ))
     738           0 :         LBWARN << "Incomplete Object::push for group " << groupID << " type "
     739           0 :                << objectType << " object " << objectID << std::endl;
     740           0 : }
     741             : 
     742           0 : void LocalNode::registerPushHandler( const uint128_t& groupID,
     743             :                                      const PushHandler& handler )
     744             : {
     745           0 :     lunchbox::ScopedWrite mutex( _impl->pushHandlers );
     746           0 :     (*_impl->pushHandlers)[ groupID ] = handler;
     747           0 : }
     748             : 
     749           2 : bool LocalNode::registerCommandHandler( const uint128_t& command,
     750             :                                         const CommandHandler& func,
     751             :                                         CommandQueue* queue )
     752             : {
     753           2 :     lunchbox::ScopedFastWrite mutex( _impl->commandHandlers );
     754           2 :     if( _impl->commandHandlers->find(command) != _impl->commandHandlers->end( ))
     755             :     {
     756           0 :         LBWARN << "Already got a registered handler for custom command "
     757           0 :                << command << std::endl;
     758           0 :         return false;
     759             :     }
     760             : 
     761             :     _impl->commandHandlers->insert( std::make_pair( command,
     762           2 :                                                 std::make_pair( func, queue )));
     763           2 :     return true;
     764             : }
     765             : 
     766           0 : LocalNode::SendToken LocalNode::acquireSendToken( NodePtr node )
     767             : {
     768           0 :     LBASSERT( !inCommandThread( ));
     769           0 :     LBASSERT( !_impl->inReceiverThread( ));
     770             : 
     771           0 :     lunchbox::Request< void > request = registerRequest< void >();
     772           0 :     node->send( CMD_NODE_ACQUIRE_SEND_TOKEN ) << request;
     773             : 
     774             :     try
     775             :     {
     776           0 :         request.wait( Global::getTimeout( ));
     777             :     }
     778           0 :     catch( const lunchbox::FutureTimeout& )
     779             :     {
     780           0 :         LBERROR << "Timeout while acquiring send token " << request.getID()
     781           0 :                 << std::endl;
     782           0 :         request.unregister();
     783           0 :         return 0;
     784             :     }
     785           0 :     return new co::SendToken( node );
     786             : }
     787             : 
     788           0 : void LocalNode::releaseSendToken( SendToken token )
     789             : {
     790           0 :     LBASSERT( !_impl->inReceiverThread( ));
     791           0 :     if( token )
     792           0 :         token->release();
     793           0 : }
     794             : 
     795             : //----------------------------------------------------------------------
     796             : // Connecting a node
     797             : //----------------------------------------------------------------------
     798             : namespace
     799             : {
     800             : enum ConnectResult
     801             : {
     802             :     CONNECT_OK,
     803             :     CONNECT_TRY_AGAIN,
     804             :     CONNECT_BAD_STATE,
     805             :     CONNECT_TIMEOUT,
     806             :     CONNECT_UNREACHABLE
     807             : };
     808             : }
     809             : 
     810          73 : NodePtr LocalNode::connect( const NodeID& nodeID )
     811             : {
     812          73 :     LBASSERT( nodeID != 0 );
     813          73 :     LBASSERT( isListening( ));
     814             : 
     815             :     // Make sure that only one connection request based on the node identifier
     816             :     // is pending at a given time. Otherwise a node with the same id might be
     817             :     // instantiated twice in _cmdGetNodeDataReply(). The alternative to this
     818             :     // mutex is to register connecting nodes with this local node, and handle
     819             :     // all cases correctly, which is far more complex. Node connections only
     820             :     // happen a lot during initialization, and are therefore not time-critical.
     821          73 :     lunchbox::ScopedWrite mutex( _impl->connectLock );
     822             : 
     823         146 :     Nodes nodes;
     824          73 :     getNodes( nodes );
     825             : 
     826          76 :     for( NodesCIter i = nodes.begin(); i != nodes.end(); ++i )
     827             :     {
     828          76 :         NodePtr peer = *i;
     829          76 :         if( peer->getNodeID() == nodeID && peer->isReachable( )) // early out
     830          73 :             return peer;
     831           3 :     }
     832             : 
     833           0 :     LBDEBUG << "Connecting node " << nodeID << std::endl;
     834           0 :     for( NodesCIter i = nodes.begin(); i != nodes.end(); ++i )
     835             :     {
     836           0 :         NodePtr peer = *i;
     837           0 :         NodePtr node = _connect( nodeID, peer );
     838           0 :         if( node )
     839           0 :             return node;
     840           0 :     }
     841             : 
     842           0 :     NodePtr node = _connectFromZeroconf( nodeID );
     843           0 :     if( node )
     844           0 :         return node;
     845             : 
     846             :     // check again if node connected by itself by now
     847           0 :     nodes.clear();
     848           0 :     getNodes( nodes );
     849           0 :     for( NodesCIter i = nodes.begin(); i != nodes.end(); ++i )
     850             :     {
     851           0 :         node = *i;
     852           0 :         if( node->getNodeID() == nodeID && node->isReachable( ))
     853           0 :             return node;
     854             :     }
     855             : 
     856           0 :     LBWARN << "Node " << nodeID << " connection failed" << std::endl;
     857          73 :     return 0;
     858             : }
     859             : 
     860           0 : NodePtr LocalNode::_connect( const NodeID& nodeID, NodePtr peer )
     861             : {
     862           0 :     LBASSERT( nodeID != 0 );
     863             : 
     864           0 :     NodePtr node;
     865             :     {
     866           0 :         lunchbox::ScopedFastRead mutexNodes( _impl->nodes );
     867           0 :         NodeHash::const_iterator i = _impl->nodes->find( nodeID );
     868           0 :         if( i != _impl->nodes->end( ))
     869           0 :             node = i->second;
     870             :     }
     871             : 
     872           0 :     LBASSERT( getNodeID() != nodeID );
     873           0 :     if( !node )
     874             :     {
     875           0 :         lunchbox::Request< void* > request = registerRequest< void* >();
     876           0 :         peer->send( CMD_NODE_GET_NODE_DATA ) << nodeID << request;
     877           0 :         node = reinterpret_cast< Node* >( request.wait( ));
     878           0 :         if( !node )
     879             :         {
     880           0 :             LBINFO << "Node " << nodeID << " not found on " << peer->getNodeID()
     881           0 :                    << std::endl;
     882           0 :             return 0;
     883             :         }
     884           0 :         node->unref( this ); // ref'd before serveRequest()
     885             :     }
     886             : 
     887           0 :     if( node->isReachable( ))
     888           0 :         return node;
     889             : 
     890           0 :     size_t tries = 10;
     891           0 :     while( --tries )
     892             :     {
     893           0 :         switch( _connect( node ))
     894             :         {
     895             :           case CONNECT_OK:
     896           0 :               return node;
     897             :           case CONNECT_TRY_AGAIN:
     898             :           {
     899           0 :               lunchbox::RNG rng;
     900             :               // collision avoidance
     901           0 :               lunchbox::sleep( rng.get< uint8_t >( ));
     902           0 :               break;
     903             :           }
     904             :           case CONNECT_BAD_STATE:
     905           0 :               LBWARN << "Internal connect error" << std::endl;
     906             :               // no break;
     907             :           case CONNECT_TIMEOUT:
     908           0 :               return 0;
     909             : 
     910             :           case CONNECT_UNREACHABLE:
     911           0 :               break; // maybe peer talks to us
     912             :         }
     913             : 
     914           0 :         lunchbox::ScopedFastRead mutexNodes( _impl->nodes );
     915             :         // connect failed - check for simultaneous connect from peer
     916           0 :         NodeHash::const_iterator i = _impl->nodes->find( nodeID );
     917           0 :         if( i != _impl->nodes->end( ))
     918           0 :             node = i->second;
     919           0 :     }
     920             : 
     921           0 :     return node->isReachable() ? node : 0;
     922             : }
     923             : 
     924           0 : NodePtr LocalNode::_connectFromZeroconf( const NodeID& nodeID )
     925             : {
     926           0 :     lunchbox::ScopedWrite mutex( _impl->service );
     927             : 
     928             :     const Strings& instances =
     929           0 :         _impl->service->discover( servus::Servus::IF_ALL, 500 );
     930           0 :     for( StringsCIter i = instances.begin(); i != instances.end(); ++i )
     931             :     {
     932           0 :         const std::string& instance = *i;
     933           0 :         const NodeID candidate( instance );
     934           0 :         if( candidate != nodeID )
     935           0 :             continue;
     936             : 
     937           0 :         const std::string& typeStr = _impl->service->get( instance, "co_type" );
     938           0 :         if( typeStr.empty( ))
     939           0 :             return 0;
     940             : 
     941           0 :         std::istringstream in( typeStr );
     942           0 :         uint32_t type = 0;
     943           0 :         in >> type;
     944             : 
     945           0 :         NodePtr node = createNode( type );
     946           0 :         if( !node )
     947             :         {
     948           0 :             LBINFO << "Can't create node of type " << type << std::endl;
     949           0 :             continue;
     950             :         }
     951             : 
     952             :         const std::string& numStr = _impl->service->get( instance,
     953           0 :                                                          "co_numPorts" );
     954           0 :         uint32_t num = 0;
     955             : 
     956           0 :         in.clear();
     957           0 :         in.str( numStr );
     958           0 :         in >> num;
     959           0 :         LBASSERT( num > 0 );
     960           0 :         for( size_t j = 0; j < num; ++j )
     961             :         {
     962           0 :             ConnectionDescriptionPtr desc = new ConnectionDescription;
     963           0 :             std::ostringstream out;
     964           0 :             out << "co_port" << j;
     965             : 
     966           0 :             std::string descStr = _impl->service->get( instance, out.str( ));
     967           0 :             LBASSERT( !descStr.empty( ));
     968           0 :             LBCHECK( desc->fromString( descStr ));
     969           0 :             LBASSERT( descStr.empty( ));
     970           0 :             node->addConnectionDescription( desc );
     971           0 :         }
     972           0 :         mutex.leave();
     973           0 :         if( _connect( node ))
     974           0 :             return node;
     975           0 :     }
     976           0 :     return 0;
     977             : }
     978             : 
     979          34 : bool LocalNode::connect( NodePtr node )
     980             : {
     981          34 :     lunchbox::ScopedWrite mutex( _impl->connectLock );
     982          34 :     return ( _connect( node ) == CONNECT_OK );
     983             : }
     984             : 
     985          34 : uint32_t LocalNode::_connect( NodePtr node )
     986             : {
     987          34 :     LBASSERTINFO( isListening(), *this );
     988          34 :     if( node->isReachable( ))
     989           0 :         return CONNECT_OK;
     990             : 
     991          34 :     LBASSERT( node->isClosed( ));
     992          34 :     LBDEBUG << "Connecting " << node << std::endl;
     993             : 
     994             :     // try connecting using the given descriptions
     995          34 :     const ConnectionDescriptions& cds = node->getConnectionDescriptions();
     996         102 :     for( ConnectionDescriptionsCIter i = cds.begin();
     997          68 :         i != cds.end(); ++i )
     998             :     {
     999          34 :         ConnectionDescriptionPtr description = *i;
    1000          34 :         if( description->type >= CONNECTIONTYPE_MULTICAST )
    1001           0 :             continue; // Don't use multicast for primary connections
    1002             : 
    1003          34 :         ConnectionPtr connection = Connection::create( description );
    1004          34 :         if( !connection || !connection->connect( ))
    1005           0 :             continue;
    1006             : 
    1007          34 :         return _connect( node, connection );
    1008           0 :     }
    1009             : 
    1010           0 :     LBDEBUG << "Node " << node
    1011           0 :            << " unreachable, all connections failed to connect" <<std::endl;
    1012           0 :     return CONNECT_UNREACHABLE;
    1013             : }
    1014             : 
    1015           0 : bool LocalNode::connect( NodePtr node, ConnectionPtr connection )
    1016             : {
    1017           0 :     return ( _connect( node, connection ) == CONNECT_OK );
    1018             : }
    1019             : 
    1020          34 : uint32_t LocalNode::_connect( NodePtr node, ConnectionPtr connection )
    1021             : {
    1022          34 :     LBASSERT( connection );
    1023          34 :     LBASSERT( node->getNodeID() != getNodeID( ));
    1024             : 
    1025          68 :     if( !node || !isListening() || !connection->isConnected() ||
    1026          34 :         !node->isClosed( ))
    1027             :     {
    1028           0 :         return CONNECT_BAD_STATE;
    1029             :     }
    1030             : 
    1031          34 :     _addConnection( connection );
    1032             : 
    1033             :     // send connect command to peer
    1034          34 :     lunchbox::Request< bool > request = registerRequest< bool >( node.get( ));
    1035             : #ifdef COLLAGE_BIGENDIAN
    1036             :     uint32_t cmd = CMD_NODE_CONNECT_BE;
    1037             :     lunchbox::byteswap( cmd );
    1038             : #else
    1039          34 :     const uint32_t cmd = CMD_NODE_CONNECT;
    1040             : #endif
    1041             :     OCommand( Connections( 1, connection ), cmd )
    1042          34 :         << getNodeID() << request << getType() << serialize();
    1043             : 
    1044          34 :     bool connected = false;
    1045             :     try
    1046             :     {
    1047          34 :         connected = request.wait( 10000 /*ms*/ );
    1048             :     }
    1049           0 :     catch( const lunchbox::FutureTimeout& )
    1050             :     {
    1051           0 :         LBWARN << "Node connection handshake timeout - " << node
    1052           0 :                << " not a Collage node?" << std::endl;
    1053           0 :         request.unregister();
    1054           0 :         return CONNECT_TIMEOUT;
    1055             :     }
    1056             : 
    1057             :     // In simultaneous connect case, depending on the connection type
    1058             :     // (e.g. RDMA), a check on the connection state of the node is required
    1059          34 :     if( !connected || !node->isConnected( ))
    1060           0 :         return CONNECT_TRY_AGAIN;
    1061             : 
    1062          34 :     LBASSERT( node->getNodeID() != 0 );
    1063          34 :     LBASSERTINFO( node->getNodeID() != getNodeID(), getNodeID() );
    1064          34 :     LBDEBUG << node << " connected to " << *(Node*)this << std::endl;
    1065          34 :     return CONNECT_OK;
    1066             : }
    1067             : 
    1068          40 : NodePtr LocalNode::connectObjectMaster( const uint128_t& id )
    1069             : {
    1070          40 :     LBASSERTINFO( id.isUUID(), id );
    1071          40 :     if( !id.isUUID( ))
    1072             :     {
    1073           0 :         LBWARN << "Invalid object id " << id << std::endl;
    1074           0 :         return 0;
    1075             :     }
    1076             : 
    1077          40 :     const NodeID masterNodeID = _impl->objectStore->findMasterNodeID( id );
    1078          40 :     if( masterNodeID == 0 )
    1079             :     {
    1080           0 :         LBWARN << "Can't find master node for object " << id << std::endl;
    1081           0 :         return 0;
    1082             :     }
    1083             : 
    1084          40 :     NodePtr master = connect( masterNodeID );
    1085          40 :     if( master && !master->isClosed( ))
    1086          40 :         return master;
    1087             : 
    1088           0 :     LBWARN << "Can't connect master node with id " << masterNodeID
    1089           0 :            << " for object " << id << std::endl;
    1090           0 :     return 0;
    1091             : }
    1092             : 
    1093          34 : NodePtr LocalNode::createNode( const uint32_t type )
    1094             : {
    1095          34 :     LBASSERTINFO( type == NODETYPE_NODE, type );
    1096          34 :     return new Node( type );
    1097             : }
    1098             : 
    1099           0 : NodePtr LocalNode::getNode( const NodeID& id ) const
    1100             : {
    1101           0 :     lunchbox::ScopedFastRead mutex( _impl->nodes );
    1102           0 :     NodeHash::const_iterator i = _impl->nodes->find( id );
    1103           0 :     if( i == _impl->nodes->end() || !i->second->isReachable( ))
    1104           0 :         return 0;
    1105           0 :     return i->second;
    1106             : }
    1107             : 
    1108         113 : void LocalNode::getNodes( Nodes& nodes, const bool addSelf ) const
    1109             : {
    1110         113 :     lunchbox::ScopedFastRead mutex( _impl->nodes );
    1111         332 :     for( NodeHashCIter i = _impl->nodes->begin(); i != _impl->nodes->end(); ++i)
    1112             :     {
    1113         219 :         NodePtr node = i->second;
    1114         219 :         if( node->isReachable() && ( addSelf || node != this ))
    1115         219 :             nodes.push_back( i->second );
    1116         332 :     }
    1117         113 : }
    1118             : 
    1119         146 : CommandQueue* LocalNode::getCommandThreadQueue()
    1120             : {
    1121         146 :     return _impl->commandThread->getWorkerQueue();
    1122             : }
    1123             : 
    1124           8 : bool LocalNode::inCommandThread() const
    1125             : {
    1126           8 :     return _impl->commandThread->isCurrent();
    1127             : }
    1128             : 
    1129       72429 : int64_t LocalNode::getTime64() const
    1130             : {
    1131       72429 :     return _impl->clock.getTime64();
    1132             : }
    1133             : 
    1134           0 : ssize_t LocalNode::getCounter( const Counter counter ) const
    1135             : {
    1136           0 :     return _impl->counters[ counter ];
    1137             : }
    1138             : 
    1139         196 : void LocalNode::flushCommands()
    1140             : {
    1141         196 :     _impl->incoming.interrupt();
    1142         196 : }
    1143             : 
    1144             : //----------------------------------------------------------------------
    1145             : // receiver thread functions
    1146             : //----------------------------------------------------------------------
    1147          50 : void LocalNode::_runReceiverThread()
    1148             : {
    1149          50 :     LB_TS_THREAD( _rcvThread );
    1150          50 :     _initService();
    1151             : 
    1152          50 :     int nErrors = 0;
    1153       72650 :     while( isListening( ))
    1154             :     {
    1155       72551 :         const ConnectionSet::Event result = _impl->incoming.select();
    1156       72550 :         switch( result )
    1157             :         {
    1158             :             case ConnectionSet::EVENT_CONNECT:
    1159          34 :                 _handleConnect();
    1160          34 :                 break;
    1161             : 
    1162             :             case ConnectionSet::EVENT_DATA:
    1163       72179 :                 nErrors = 0;
    1164       72179 :                 _handleData();
    1165       72179 :                 break;
    1166             : 
    1167             :             case ConnectionSet::EVENT_DISCONNECT:
    1168             :             case ConnectionSet::EVENT_INVALID_HANDLE:
    1169          34 :                 _handleDisconnect();
    1170          34 :                 break;
    1171             : 
    1172             :             case ConnectionSet::EVENT_TIMEOUT:
    1173           0 :                 LBINFO << "select timeout" << std::endl;
    1174           0 :                 break;
    1175             : 
    1176             :             case ConnectionSet::EVENT_ERROR:
    1177           0 :                 ++nErrors;
    1178           0 :                 LBWARN << "Connection error during select" << std::endl;
    1179           0 :                 if( nErrors > 100 )
    1180             :                 {
    1181           0 :                     LBWARN << "Too many errors in a row, capping connection"
    1182           0 :                            << std::endl;
    1183           0 :                     _handleDisconnect();
    1184             :                 }
    1185           0 :                 break;
    1186             : 
    1187             :             case ConnectionSet::EVENT_SELECT_ERROR:
    1188           0 :                 LBWARN << "Error during select" << std::endl;
    1189           0 :                 ++nErrors;
    1190           0 :                 if( nErrors > 10 )
    1191             :                 {
    1192           0 :                     LBWARN << "Too many errors in a row" << std::endl;
    1193           0 :                     LBUNIMPLEMENTED;
    1194             :                 }
    1195           0 :                 break;
    1196             : 
    1197             :             case ConnectionSet::EVENT_INTERRUPT:
    1198         303 :                 _redispatchCommands();
    1199         303 :                 break;
    1200             : 
    1201             :             default:
    1202           0 :                 LBUNIMPLEMENTED;
    1203             :         }
    1204             :     }
    1205             : 
    1206          49 :     if( !_impl->pendingCommands.empty( ))
    1207           0 :         LBWARN << _impl->pendingCommands.size()
    1208           0 :                << " commands pending while leaving command thread" << std::endl;
    1209             : 
    1210          49 :     _impl->pendingCommands.clear();
    1211          49 :     LBCHECK( _impl->commandThread->join( ));
    1212             : 
    1213          49 :     ConnectionPtr connection = getConnection();
    1214          98 :     PipeConnectionPtr pipe = LBSAFECAST( PipeConnection*, connection.get( ));
    1215          49 :     connection = pipe->getSibling();
    1216          49 :     _removeConnection( connection );
    1217          49 :     _impl->connectionNodes.erase( connection );
    1218          49 :     _disconnect();
    1219             : 
    1220          49 :     const Connections& connections = _impl->incoming.getConnections();
    1221         171 :     while( !connections.empty( ))
    1222             :     {
    1223          73 :         connection = connections.back();
    1224          73 :         NodePtr node = _impl->connectionNodes[ connection ];
    1225             : 
    1226          73 :         if( node )
    1227          73 :             _closeNode( node );
    1228          73 :         _removeConnection( connection );
    1229          73 :     }
    1230             : 
    1231          49 :     _impl->objectStore->clear();
    1232          49 :     _impl->pendingCommands.clear();
    1233          49 :     _impl->smallBuffers.flush();
    1234          49 :     _impl->bigBuffers.flush();
    1235             : 
    1236         245 :     LBDEBUG << "Leaving receiver thread of " << lunchbox::className( this )
    1237         245 :            << std::endl;
    1238          49 : }
    1239             : 
    1240          34 : void LocalNode::_handleConnect()
    1241             : {
    1242          34 :     ConnectionPtr connection = _impl->incoming.getConnection();
    1243          68 :     ConnectionPtr newConn = connection->acceptSync();
    1244          34 :     connection->acceptNB();
    1245             : 
    1246          34 :     if( newConn )
    1247          34 :         _addConnection( newConn );
    1248             :     else
    1249          34 :         LBINFO << "Received connect event, but accept() failed" << std::endl;
    1250          34 : }
    1251             : 
    1252          34 : void LocalNode::_handleDisconnect()
    1253             : {
    1254          34 :     while( _handleData( )) ; // read remaining data off connection
    1255             : 
    1256          34 :     ConnectionPtr connection = _impl->incoming.getConnection();
    1257          34 :     ConnectionNodeHash::iterator i = _impl->connectionNodes.find( connection );
    1258             : 
    1259          34 :     if( i != _impl->connectionNodes.end( ))
    1260             :     {
    1261          34 :         NodePtr node = i->second;
    1262             : 
    1263          34 :         node->ref(); // extend lifetime to give cmd handler a chance
    1264             : 
    1265             :         // local command dispatching
    1266             :         OCommand( this, this, CMD_NODE_REMOVE_NODE )
    1267          34 :                 << node.get() << uint32_t( LB_UNDEFINED_UINT32 );
    1268             : 
    1269          34 :         if( node->getConnection() == connection )
    1270          34 :             _closeNode( node );
    1271           0 :         else if( connection->isMulticast( ))
    1272           0 :             node->_removeMulticast( connection );
    1273             :     }
    1274             : 
    1275          34 :     _removeConnection( connection );
    1276          34 : }
    1277             : 
    1278       72213 : bool LocalNode::_handleData()
    1279             : {
    1280       72213 :     _impl->smallBuffers.compact();
    1281       72213 :     _impl->bigBuffers.compact();
    1282             : 
    1283       72213 :     ConnectionPtr connection = _impl->incoming.getConnection();
    1284       72213 :     LBASSERT( connection );
    1285             : 
    1286      144426 :     BufferPtr buffer = _readHead( connection );
    1287       72213 :     if( !buffer ) // fluke signal
    1288          68 :         return false;
    1289             : 
    1290      144290 :     ICommand command = _setupCommand( connection, buffer );
    1291       72145 :     const bool gotCommand = _readTail( command, buffer, connection );
    1292       72145 :     LBASSERT( gotCommand );
    1293             : 
    1294             :     // start next receive
    1295      144290 :     BufferPtr nextBuffer = _impl->smallBuffers.alloc( COMMAND_ALLOCSIZE );
    1296       72145 :     connection->recvNB( nextBuffer, COMMAND_MINSIZE );
    1297             : 
    1298       72145 :     if( gotCommand )
    1299             :     {
    1300       72145 :         _dispatchCommand( command );
    1301       72145 :         return true;
    1302             :     }
    1303             : 
    1304           0 :     LBERROR << "Incomplete command read: " << command << std::endl;
    1305       72213 :     return false;
    1306             : }
    1307             : 
    1308       72213 : BufferPtr LocalNode::_readHead( ConnectionPtr connection )
    1309             : {
    1310       72213 :     BufferPtr buffer;
    1311       72213 :     const bool gotSize = connection->recvSync( buffer, false );
    1312             : 
    1313       72213 :     if( !buffer ) // fluke signal
    1314             :     {
    1315           0 :         LBWARN << "Erronous network event on " << connection->getDescription()
    1316           0 :                << std::endl;
    1317           0 :         _impl->incoming.setDirty();
    1318           0 :         return 0;
    1319             :     }
    1320             : 
    1321       72213 :     if( gotSize )
    1322       72145 :         return buffer;
    1323             : 
    1324             :     // Some systems signal data on dead connections.
    1325          68 :     buffer->setSize( 0 );
    1326          68 :     connection->recvNB( buffer, COMMAND_MINSIZE );
    1327          68 :     return 0;
    1328             : }
    1329             : 
    1330       72145 : ICommand LocalNode::_setupCommand( ConnectionPtr connection,
    1331             :                                    ConstBufferPtr buffer )
    1332             : {
    1333       72145 :     NodePtr node;
    1334       72145 :     ConnectionNodeHashCIter i = _impl->connectionNodes.find( connection );
    1335       72145 :     if( i != _impl->connectionNodes.end( ))
    1336       72077 :         node = i->second;
    1337       72145 :     LBVERB << "Handle data from " << node << std::endl;
    1338             : 
    1339             : #ifdef COLLAGE_BIGENDIAN
    1340             :     const bool swapping = node ? !node->isBigEndian() : false;
    1341             : #else
    1342       72145 :     const bool swapping = node ? node->isBigEndian() : false;
    1343             : #endif
    1344      144290 :     ICommand command( this, node, buffer, swapping );
    1345             : 
    1346       72145 :     if( node )
    1347             :     {
    1348       72077 :         node->_setLastReceive( getTime64( ));
    1349       72077 :         return command;
    1350             :     }
    1351             : 
    1352          68 :     uint32_t cmd = command.getCommand();
    1353             : #ifdef COLLAGE_BIGENDIAN
    1354             :     lunchbox::byteswap( cmd ); // pre-node commands are sent little endian
    1355             : #endif
    1356          68 :     switch( cmd )
    1357             :     {
    1358             :     case CMD_NODE_CONNECT:
    1359             :     case CMD_NODE_CONNECT_REPLY:
    1360             :     case CMD_NODE_ID:
    1361             : #ifdef COLLAGE_BIGENDIAN
    1362             :         command = ICommand( this, node, buffer, true );
    1363             : #endif
    1364          68 :         break;
    1365             : 
    1366             :     case CMD_NODE_CONNECT_BE:
    1367             :     case CMD_NODE_CONNECT_REPLY_BE:
    1368             :     case CMD_NODE_ID_BE:
    1369             : #ifndef COLLAGE_BIGENDIAN
    1370           0 :         command = ICommand( this, node, buffer, true );
    1371             : #endif
    1372           0 :         break;
    1373             : 
    1374             :     default:
    1375           0 :         LBUNIMPLEMENTED;
    1376           0 :         return ICommand();
    1377             :     }
    1378             : 
    1379          68 :     command.setCommand( cmd ); // reset correctly swapped version
    1380       72213 :     return command;
    1381             : }
    1382             : 
    1383       72145 : bool LocalNode::_readTail( ICommand& command, BufferPtr buffer,
    1384             :                            ConnectionPtr connection )
    1385             : {
    1386       72145 :     const uint64_t needed = command.getSize();
    1387       72145 :     if( needed <= buffer->getSize( ))
    1388       62134 :         return true;
    1389             : 
    1390       10011 :     if( needed > buffer->getMaxSize( ))
    1391             :     {
    1392           0 :         LBASSERT( needed > COMMAND_ALLOCSIZE );
    1393           0 :         LBASSERTINFO( needed < LB_BIT48,
    1394             :                       "Out-of-sync network stream: " << command << "?" );
    1395             :         // not enough space for remaining data, alloc and copy to new buffer
    1396           0 :         BufferPtr newBuffer = _impl->bigBuffers.alloc( needed );
    1397           0 :         newBuffer->replace( *buffer );
    1398           0 :         buffer = newBuffer;
    1399             : 
    1400           0 :         command = ICommand( this, command.getRemoteNode(), buffer,
    1401           0 :                             command.isSwapping( ));
    1402             :     }
    1403             : 
    1404             :     // read remaining data
    1405       10011 :     connection->recvNB( buffer, command.getSize() - buffer->getSize( ));
    1406       10011 :     return connection->recvSync( buffer );
    1407             : }
    1408             : 
    1409          35 : BufferPtr LocalNode::allocBuffer( const uint64_t size )
    1410             : {
    1411          35 :     LBASSERT( _impl->receiverThread->isStopped() || _impl->inReceiverThread( ));
    1412             :     BufferPtr buffer = size > COMMAND_ALLOCSIZE ?
    1413             :         _impl->bigBuffers.alloc( size ) :
    1414          35 :         _impl->smallBuffers.alloc( COMMAND_ALLOCSIZE );
    1415          35 :     return buffer;
    1416             : }
    1417             : 
    1418       72194 : void LocalNode::_dispatchCommand( ICommand& command )
    1419             : {
    1420       72194 :     LBASSERTINFO( command.isValid(), command );
    1421             : 
    1422       72194 :     if( dispatchCommand( command ))
    1423       72194 :         _redispatchCommands();
    1424             :     else
    1425             :     {
    1426           0 :         _redispatchCommands();
    1427           0 :         _impl->pendingCommands.push_back( command );
    1428             :     }
    1429       72194 : }
    1430             : 
    1431       72231 : bool LocalNode::dispatchCommand( ICommand& command )
    1432             : {
    1433       72231 :     LBVERB << "dispatch " << command << " by " << getNodeID() << std::endl;
    1434       72231 :     LBASSERTINFO( command.isValid(), command );
    1435             : 
    1436       72231 :     const uint32_t type = command.getType();
    1437       72231 :     switch( type )
    1438             :     {
    1439             :         case COMMANDTYPE_NODE:
    1440       71726 :             LBCHECK( Dispatcher::dispatchCommand( command ));
    1441       71726 :             return true;
    1442             : 
    1443             :         case COMMANDTYPE_OBJECT:
    1444         505 :             return _impl->objectStore->dispatchObjectCommand( command );
    1445             : 
    1446             :         default:
    1447           0 :             LBABORT( "Unknown command type " << type << " for " << command );
    1448           0 :             return true;
    1449             :     }
    1450             : }
    1451             : 
    1452       72497 : void LocalNode::_redispatchCommands()
    1453             : {
    1454       72497 :     bool changes = true;
    1455      144994 :     while( changes && !_impl->pendingCommands.empty( ))
    1456             :     {
    1457           0 :         changes = false;
    1458             : 
    1459           0 :         for( CommandList::iterator i = _impl->pendingCommands.begin();
    1460           0 :              i != _impl->pendingCommands.end(); ++i )
    1461             :         {
    1462           0 :             ICommand& command = *i;
    1463           0 :             LBASSERT( command.isValid( ));
    1464             : 
    1465           0 :             if( dispatchCommand( command ))
    1466             :             {
    1467           0 :                 _impl->pendingCommands.erase( i );
    1468           0 :                 changes = true;
    1469           0 :                 break;
    1470             :             }
    1471             :         }
    1472             :     }
    1473             : 
    1474             : #ifndef NDEBUG
    1475       72497 :     if( !_impl->pendingCommands.empty( ))
    1476           0 :         LBVERB << _impl->pendingCommands.size() << " undispatched commands"
    1477           0 :                << std::endl;
    1478       72497 :     LBASSERT( _impl->pendingCommands.size() < 200 );
    1479             : #endif
    1480       72497 : }
    1481             : 
    1482          50 : void LocalNode::_initService()
    1483             : {
    1484          50 :     LB_TS_SCOPED( _rcvThread );
    1485          50 :     _impl->service->withdraw(); // go silent during k/v update
    1486             : 
    1487          97 :     const ConnectionDescriptions& descs = getConnectionDescriptions();
    1488          50 :     if( descs.empty( ))
    1489          53 :         return;
    1490             : 
    1491          94 :     std::ostringstream out;
    1492          47 :     out << getType();
    1493          47 :     _impl->service->set( "co_type", out.str( ));
    1494             : 
    1495          47 :     out.str("");
    1496          47 :     out << descs.size();
    1497          47 :     _impl->service->set( "co_numPorts", out.str( ));
    1498             : 
    1499          94 :     for( ConnectionDescriptionsCIter i = descs.begin(); i != descs.end(); ++i )
    1500             :     {
    1501          47 :         ConnectionDescriptionPtr desc = *i;
    1502          47 :         out.str("");
    1503          47 :         out << "co_port" << i - descs.begin();
    1504          47 :         _impl->service->set( out.str(), desc->toString( ));
    1505          47 :     }
    1506             : 
    1507          94 :     _impl->service->announce( descs.front()->port, getNodeID().getString( ));
    1508             : }
    1509             : 
    1510          49 : void LocalNode::_exitService()
    1511             : {
    1512          49 :     _impl->service->withdraw();
    1513          49 : }
    1514             : 
    1515           1 : Zeroconf LocalNode::getZeroconf()
    1516             : {
    1517           1 :     lunchbox::ScopedWrite mutex( _impl->service );
    1518           1 :     _impl->service->discover( servus::Servus::IF_ALL, 500 );
    1519           1 :     return Zeroconf( _impl->service.data );
    1520             : }
    1521             : 
    1522             : 
    1523             : //----------------------------------------------------------------------
    1524             : // command thread functions
    1525             : //----------------------------------------------------------------------
    1526          50 : bool LocalNode::_startCommandThread( const int32_t threadID )
    1527             : {
    1528          50 :     _impl->commandThread->threadID = threadID;
    1529          50 :     return _impl->commandThread->start();
    1530             : }
    1531             : 
    1532       51606 : bool LocalNode::_notifyCommandThreadIdle()
    1533             : {
    1534       51606 :     return _impl->objectStore->notifyCommandThreadIdle();
    1535             : }
    1536             : 
    1537       20001 : bool LocalNode::_cmdAckRequest( ICommand& command )
    1538             : {
    1539       20001 :     const uint32_t requestID = command.get< uint32_t >();
    1540       20001 :     LBASSERT( requestID != LB_UNDEFINED_UINT32 );
    1541             : 
    1542       20001 :     serveRequest( requestID );
    1543       20001 :     return true;
    1544             : }
    1545             : 
    1546          49 : bool LocalNode::_cmdStopRcv( ICommand& command )
    1547             : {
    1548          49 :     LB_TS_THREAD( _rcvThread );
    1549          49 :     LBASSERT( isListening( ));
    1550             : 
    1551          49 :     _exitService();
    1552          49 :     _setClosing(); // causes rcv thread exit
    1553             : 
    1554          49 :     command.setCommand( CMD_NODE_STOP_CMD ); // causes cmd thread exit
    1555          49 :     _dispatchCommand( command );
    1556          49 :     return true;
    1557             : }
    1558             : 
    1559          49 : bool LocalNode::_cmdStopCmd( ICommand& )
    1560             : {
    1561          49 :     LB_TS_THREAD( _cmdThread );
    1562          49 :     LBASSERTINFO( isClosing(), *this );
    1563             : 
    1564          49 :     _setClosed();
    1565          49 :     return true;
    1566             : }
    1567             : 
    1568           0 : bool LocalNode::_cmdSetAffinity( ICommand& command )
    1569             : {
    1570           0 :     const int32_t affinity = command.get< int32_t >();
    1571             : 
    1572           0 :     lunchbox::Thread::setAffinity( affinity );
    1573           0 :     return true;
    1574             : }
    1575             : 
    1576          34 : bool LocalNode::_cmdConnect( ICommand& command )
    1577             : {
    1578          34 :     LBASSERTINFO( !command.getRemoteNode(), command );
    1579          34 :     LBASSERT( _impl->inReceiverThread( ));
    1580             : 
    1581          34 :     const NodeID& nodeID = command.get< NodeID >();
    1582          34 :     const uint32_t requestID = command.get< uint32_t >();
    1583          34 :     const uint32_t nodeType = command.get< uint32_t >();
    1584          34 :     std::string data = command.get< std::string >();
    1585             : 
    1586          34 :     LBVERB << "handle connect " << command << " req " << requestID << " type "
    1587          34 :            << nodeType << " data " << data << std::endl;
    1588             : 
    1589          68 :     ConnectionPtr connection = _impl->incoming.getConnection();
    1590             : 
    1591          34 :     LBASSERT( connection );
    1592          34 :     LBASSERT( nodeID != getNodeID() );
    1593          34 :     LBASSERT( _impl->connectionNodes.find( connection ) ==
    1594             :               _impl->connectionNodes.end( ));
    1595             : 
    1596          68 :     NodePtr peer;
    1597             : #ifdef COLLAGE_BIGENDIAN
    1598             :     uint32_t cmd = CMD_NODE_CONNECT_REPLY_BE;
    1599             :     lunchbox::byteswap( cmd );
    1600             : #else
    1601          34 :     const uint32_t cmd = CMD_NODE_CONNECT_REPLY;
    1602             : #endif
    1603             : 
    1604             :     // No locking needed, only recv thread modifies
    1605          34 :     NodeHashCIter i = _impl->nodes->find( nodeID );
    1606          34 :     if( i != _impl->nodes->end( ))
    1607             :     {
    1608           0 :         peer = i->second;
    1609           0 :         if( peer->isReachable( ))
    1610             :         {
    1611             :             // Node exists, probably simultaneous connect from peer
    1612           0 :             LBINFO << "Already got node " << nodeID << ", refusing connect"
    1613           0 :                    << std::endl;
    1614             : 
    1615             :             // refuse connection
    1616             :             OCommand( Connections( 1, connection ), cmd )
    1617           0 :                 << NodeID() << requestID;
    1618             : 
    1619             :             // NOTE: There is no close() here. The reply command above has to be
    1620             :             // received by the peer first, before closing the connection.
    1621           0 :             _removeConnection( connection );
    1622           0 :             return true;
    1623             :         }
    1624             :     }
    1625             : 
    1626             :     // create and add connected node
    1627          34 :     if( !peer )
    1628          34 :         peer = createNode( nodeType );
    1629          34 :     if( !peer )
    1630             :     {
    1631           0 :         LBDEBUG << "Can't create node of type " << nodeType << ", disconnecting"
    1632           0 :                << std::endl;
    1633             : 
    1634             :         // refuse connection
    1635           0 :         OCommand( Connections( 1, connection ), cmd ) << NodeID() << requestID;
    1636             : 
    1637             :         // NOTE: There is no close() here. The reply command above has to be
    1638             :         // received by the peer first, before closing the connection.
    1639           0 :         _removeConnection( connection );
    1640           0 :         return true;
    1641             :     }
    1642             : 
    1643          34 :     if( !peer->deserialize( data ))
    1644           0 :         LBWARN << "Error during node initialization" << std::endl;
    1645          34 :     LBASSERTINFO( data.empty(), data );
    1646          34 :     LBASSERTINFO( peer->getNodeID() == nodeID,
    1647             :                   peer->getNodeID() << "!=" << nodeID );
    1648          34 :     LBASSERT( peer->getType() == nodeType );
    1649             : 
    1650          34 :     _impl->connectionNodes[ connection ] = peer;
    1651             :     {
    1652          34 :         lunchbox::ScopedFastWrite mutex( _impl->nodes );
    1653          34 :         _impl->nodes.data[ peer->getNodeID() ] = peer;
    1654             :     }
    1655          34 :     LBVERB << "Added node " << nodeID << std::endl;
    1656             : 
    1657             :     // send our information as reply
    1658             :     OCommand( Connections( 1, connection ), cmd )
    1659          34 :         << getNodeID() << requestID << getType() << serialize();
    1660             : 
    1661          68 :     return true;
    1662             : }
    1663             : 
    1664          34 : bool LocalNode::_cmdConnectReply( ICommand& command )
    1665             : {
    1666          34 :     LBASSERT( !command.getRemoteNode( ));
    1667          34 :     LBASSERT( _impl->inReceiverThread( ));
    1668             : 
    1669          34 :     ConnectionPtr connection = _impl->incoming.getConnection();
    1670          34 :     LBASSERT( _impl->connectionNodes.find( connection ) ==
    1671             :               _impl->connectionNodes.end( ));
    1672             : 
    1673          34 :     const NodeID& nodeID = command.get< NodeID >();
    1674          34 :     const uint32_t requestID = command.get< uint32_t >();
    1675             : 
    1676             :     // connection refused
    1677          34 :     if( nodeID == 0 )
    1678             :     {
    1679           0 :         LBINFO << "Connection refused, node already connected by peer"
    1680           0 :                << std::endl;
    1681             : 
    1682           0 :         _removeConnection( connection );
    1683           0 :         serveRequest( requestID, false );
    1684           0 :         return true;
    1685             :     }
    1686             : 
    1687          34 :     const uint32_t nodeType = command.get< uint32_t >();
    1688          68 :     std::string data = command.get< std::string >();
    1689             : 
    1690          34 :     LBVERB << "handle connect reply " << command << " req " << requestID
    1691          34 :            << " type " << nodeType << " data " << data << std::endl;
    1692             : 
    1693             :     // No locking needed, only recv thread modifies
    1694          34 :     NodeHash::const_iterator i = _impl->nodes->find( nodeID );
    1695          68 :     NodePtr peer;
    1696          34 :     if( i != _impl->nodes->end( ))
    1697           0 :         peer = i->second;
    1698             : 
    1699          34 :     if( peer && peer->isReachable( )) // simultaneous connect
    1700             :     {
    1701           0 :         LBINFO << "Closing simultaneous connection from " << peer << " on "
    1702           0 :                << connection << std::endl;
    1703             : 
    1704           0 :         _removeConnection( connection );
    1705           0 :         _closeNode( peer );
    1706           0 :         serveRequest( requestID, false );
    1707           0 :         return true;
    1708             :     }
    1709             : 
    1710             :     // create and add node
    1711          34 :     if( !peer )
    1712             :     {
    1713          34 :         if( requestID != LB_UNDEFINED_UINT32 )
    1714          34 :             peer = reinterpret_cast< Node* >( getRequestData( requestID ));
    1715             :         else
    1716           0 :             peer = createNode( nodeType );
    1717             :     }
    1718          34 :     if( !peer )
    1719             :     {
    1720           0 :         LBINFO << "Can't create node of type " << nodeType << ", disconnecting"
    1721           0 :                << std::endl;
    1722           0 :         _removeConnection( connection );
    1723           0 :         return true;
    1724             :     }
    1725             : 
    1726          34 :     LBASSERTINFO( peer->getType() == nodeType,
    1727             :                   peer->getType() << " != " << nodeType );
    1728          34 :     LBASSERT( peer->isClosed( ));
    1729             : 
    1730          34 :     if( !peer->deserialize( data ))
    1731           0 :         LBWARN << "Error during node initialization" << std::endl;
    1732          34 :     LBASSERT( data.empty( ));
    1733          34 :     LBASSERT( peer->getNodeID() == nodeID );
    1734             : 
    1735             :     // send ACK to peer
    1736             :     // cppcheck-suppress unusedScopedObject
    1737          34 :     OCommand( Connections( 1, connection ), CMD_NODE_CONNECT_ACK );
    1738             : 
    1739          34 :     peer->_connect( connection );
    1740          34 :     _impl->connectionNodes[ connection ] = peer;
    1741             :     {
    1742          34 :         lunchbox::ScopedFastWrite mutex( _impl->nodes );
    1743          34 :         _impl->nodes.data[ peer->getNodeID() ] = peer;
    1744             :     }
    1745          34 :     _connectMulticast( peer );
    1746          34 :     LBVERB << "Added node " << nodeID << std::endl;
    1747             : 
    1748          34 :     serveRequest( requestID, true );
    1749             : 
    1750          34 :     notifyConnect( peer );
    1751          68 :     return true;
    1752             : }
    1753             : 
    1754          34 : bool LocalNode::_cmdConnectAck( ICommand& command )
    1755             : {
    1756          34 :     NodePtr node = command.getRemoteNode();
    1757          34 :     LBASSERT( node );
    1758          34 :     LBASSERT( _impl->inReceiverThread( ));
    1759          34 :     LBVERB << "handle connect ack" << std::endl;
    1760             : 
    1761          34 :     node->_connect( _impl->incoming.getConnection( ));
    1762          34 :     _connectMulticast( node );
    1763          34 :     notifyConnect( node );
    1764          34 :     return true;
    1765             : }
    1766             : 
    1767           0 : bool LocalNode::_cmdID( ICommand& command )
    1768             : {
    1769           0 :     LBASSERT( _impl->inReceiverThread( ));
    1770             : 
    1771           0 :     const NodeID& nodeID = command.get< NodeID >();
    1772           0 :     uint32_t nodeType = command.get< uint32_t >();
    1773           0 :     std::string data = command.get< std::string >();
    1774             : 
    1775           0 :     if( command.getRemoteNode( ))
    1776             :     {
    1777           0 :         LBASSERT( nodeID == command.getRemoteNode()->getNodeID( ));
    1778           0 :         LBASSERT( command.getRemoteNode()->_getMulticast( ));
    1779           0 :         return true;
    1780             :     }
    1781             : 
    1782           0 :     LBDEBUG << "handle ID " << command << " node " << nodeID << std::endl;
    1783             : 
    1784           0 :     ConnectionPtr connection = _impl->incoming.getConnection();
    1785           0 :     LBASSERT( connection->isMulticast( ));
    1786           0 :     LBASSERT( _impl->connectionNodes.find( connection ) ==
    1787             :               _impl->connectionNodes.end( ));
    1788             : 
    1789           0 :     NodePtr node;
    1790           0 :     if( nodeID == getNodeID() ) // 'self' multicast connection
    1791           0 :         node = this;
    1792             :     else
    1793             :     {
    1794             :         // No locking needed, only recv thread writes
    1795           0 :         NodeHash::const_iterator i = _impl->nodes->find( nodeID );
    1796             : 
    1797           0 :         if( i == _impl->nodes->end( ))
    1798             :         {
    1799             :             // unknown node: create and add unconnected node
    1800           0 :             node = createNode( nodeType );
    1801             : 
    1802           0 :             if( !node->deserialize( data ))
    1803           0 :                 LBWARN << "Error during node initialization" << std::endl;
    1804           0 :             LBASSERTINFO( data.empty(), data );
    1805             : 
    1806             :             {
    1807           0 :                 lunchbox::ScopedFastWrite mutex( _impl->nodes );
    1808           0 :                 _impl->nodes.data[ nodeID ] = node;
    1809             :             }
    1810           0 :             LBVERB << "Added node " << nodeID << " with multicast "
    1811           0 :                    << connection << std::endl;
    1812             :         }
    1813             :         else
    1814           0 :             node = i->second;
    1815             :     }
    1816           0 :     LBASSERT( node );
    1817           0 :     LBASSERTINFO( node->getNodeID() == nodeID,
    1818             :                   node->getNodeID() << "!=" << nodeID );
    1819             : 
    1820           0 :     _connectMulticast( node, connection );
    1821           0 :     _impl->connectionNodes[ connection ] = node;
    1822           0 :     LBDEBUG << "Added multicast connection " << connection << " from " << nodeID
    1823           0 :            << " to " << getNodeID() << std::endl;
    1824           0 :     return true;
    1825             : }
    1826             : 
    1827           8 : bool LocalNode::_cmdDisconnect( ICommand& command ) // Handler: closes the node stored as data of the request named in the command, then serves that request. Always returns true.
    1828             : {
    1829           8 :     LBASSERT( _impl->inReceiverThread( )); // handler is expected to run in the receiver thread
    1830             : 
    1831           8 :     const uint32_t requestID = command.get< uint32_t >(); // only payload field read from the command
    1832             : 
    1833           8 :     NodePtr node = static_cast<Node*>( getRequestData( requestID )); // the request's data pointer is interpreted as the Node to close
    1834           8 :     LBASSERT( node );
    1835             : 
    1836           8 :     _closeNode( node );
    1837           8 :     LBASSERT( node->isClosed( )); // _closeNode is expected to leave the node in the closed state
    1838           8 :     serveRequest( requestID ); // completes the request without a result value; presumably unblocks the requester - confirm
    1839           8 :     return true;
    1840             : }
    1841             : 
    1842           0 : bool LocalNode::_cmdGetNodeData( ICommand& command ) // Handler: answers a query for the type and serialized data of the node with the given ID. Always returns true.
    1843             : {
    1844           0 :     const NodeID& nodeID = command.get< NodeID >(); // payload order: node ID, then request ID
    1845           0 :     const uint32_t requestID = command.get< uint32_t >();
    1846             : 
    1847           0 :     LBVERB << "cmd get node data: " << command << " req " << requestID
    1848           0 :            << " nodeID " << nodeID << std::endl;
    1849             : 
    1850           0 :     NodePtr node = getNode( nodeID ); // may be null when the ID is not known locally (checked below)
    1851           0 :     NodePtr toNode = command.getRemoteNode(); // the reply goes back to the sender of the command
    1852             : 
    1853           0 :     uint32_t nodeType = NODETYPE_INVALID; // stays invalid, with empty data, when the node is unknown
    1854           0 :     std::string nodeData;
    1855           0 :     if( node )
    1856             :     {
    1857           0 :         nodeType = node->getType();
    1858           0 :         nodeData = node->serialize();
    1859           0 :         LBDEBUG << "Sent node data '" << nodeData << "' for " << nodeID << " to " // NOTE(review): logged before the send below is actually issued
    1860           0 :                << toNode << std::endl;
    1861             :     }
    1862             : 
    1863             :     toNode->send( CMD_NODE_GET_NODE_DATA_REPLY ) // a reply is sent in every case; the ID and request ID are echoed back
    1864           0 :         << nodeID << requestID << nodeType << nodeData;
    1865           0 :     return true;
    1866             : }
    1867             : 
    1868           0 : bool LocalNode::_cmdGetNodeDataReply( ICommand& command ) // Handler for the node data reply: serves the pending request with a known node, a newly created node, or null. Always returns true.
    1869             : {
    1870           0 :     LBASSERT( _impl->inReceiverThread( ));
    1871             : 
    1872           0 :     const NodeID& nodeID = command.get< NodeID >(); // payload order: node ID, request ID, node type, serialized node data
    1873           0 :     const uint32_t requestID = command.get< uint32_t >();
    1874           0 :     const uint32_t nodeType = command.get< uint32_t >();
    1875           0 :     std::string nodeData = command.get< std::string >(); // non-const: passed to deserialize() below, which is asserted to leave it empty
    1876             : 
    1877           0 :     LBVERB << "cmd get node data reply: " << command << " req " << requestID
    1878           0 :            << " type " << nodeType << " data " << nodeData << std::endl;
    1879             : 
    1880             :     // No locking needed, only recv thread writes
    1881           0 :     NodeHash::const_iterator i = _impl->nodes->find( nodeID );
    1882           0 :     if( i != _impl->nodes->end( ))
    1883             :     {
    1884             :         // Requested node connected to us in the meantime
    1885           0 :         NodePtr node = i->second;
    1886             : 
    1887           0 :         node->ref( this ); // extra reference for the raw pointer handed out below; presumably released by the requester - confirm
    1888           0 :         serveRequest( requestID, node.get( ));
    1889           0 :         return true;
    1890             :     }
    1891             : 
    1892           0 :     if( nodeType == NODETYPE_INVALID ) // reply carries no node type: the request is served with a null pointer
    1893             :     {
    1894           0 :         serveRequest( requestID, (void*)0 );
    1895           0 :         return true;
    1896             :     }
    1897             : 
    1898             :     // new node: create and add unconnected node
    1899           0 :     NodePtr node = createNode( nodeType ); // may yield a null node, handled by the else branch below
    1900           0 :     if( node )
    1901             :     {
    1902           0 :         LBASSERT( node ); // NOTE(review): redundant, already guarded by the enclosing if
    1903             : 
    1904           0 :         if( !node->deserialize( nodeData )) // a failure is only logged; the node is still handed to the requester
    1905           0 :             LBWARN << "Failed to initialize node data" << std::endl;
    1906           0 :         LBASSERT( nodeData.empty( )); // deserialize() is expected to consume the whole string
    1907           0 :         node->ref( this ); // same extra reference as in the already-known-node path above
    1908             :     }
    1909             :     else
    1910           0 :         LBINFO << "Can't create node of type " << nodeType << std::endl;
    1911             : 
    1912           0 :     serveRequest( requestID, node.get( )); // node.get() is null here if createNode() failed
    1913           0 :     return true;
    1914             : }
    1915             : 
    1916           0 : bool LocalNode::_cmdAcquireSendToken( ICommand& command ) // Handler: grants the send token to the requester, or queues the request while the token is taken. Always returns true.
    1917             : {
    1918           0 :     LBASSERT( inCommandThread( ));
    1919           0 :     if( !_impl->sendToken ) // enqueue command if no token available
    1920             :     {
    1921           0 :         const uint32_t timeout = Global::getTimeout();
    1922           0 :         if( timeout == LB_TIMEOUT_INDEFINITE ||
    1923           0 :             ( getTime64() - _impl->lastSendToken <= timeout )) // lastSendToken is the time of the last release (set in _cmdReleaseSendToken)
    1924             :         {
    1925           0 :             _impl->sendTokenQueue.push_back( command ); // answered later from _cmdReleaseSendToken
    1926           0 :             return true;
    1927             :         }
    1928             : 
    1929             :         // timeout! - clear old requests
    1930           0 :         _impl->sendTokenQueue.clear(); // NOTE(review): the dropped requests get no reply here - confirm the requesters time out on their own
    1931             :         // 'generate' new token - release is robust
    1932             :     }
    1933             : 
    1934           0 :     _impl->sendToken = false; // token is now held by the requester
    1935             : 
    1936           0 :     const uint32_t requestID = command.get< uint32_t >();
    1937           0 :     command.getRemoteNode()->send( CMD_NODE_ACQUIRE_SEND_TOKEN_REPLY ) // the reply echoes the request ID back to the requester
    1938           0 :             << requestID;
    1939           0 :     return true;
    1940             : }
    1941             : 
    1942           0 : bool LocalNode::_cmdAcquireSendTokenReply( ICommand& command ) // Handler for the token grant reply: serves the local request named in the reply. Always returns true.
    1943             : {
    1944           0 :     const uint32_t requestID = command.get< uint32_t >(); // request ID echoed back by the granting node
    1945           0 :     serveRequest( requestID ); // served without a result value
    1946           0 :     return true;
    1947             : }
    1948             : 
    1949           0 : bool LocalNode::_cmdReleaseSendToken( ICommand& ) // Handler: takes the send token back and passes it to the oldest queued requester, if any. Always returns true.
    1950             : {
    1951           0 :     LBASSERT( inCommandThread( ));
    1952           0 :     _impl->lastSendToken = getTime64(); // release time, read by the timeout check in _cmdAcquireSendToken
    1953             : 
    1954           0 :     if( _impl->sendToken )
    1955           0 :         return true; // double release due to timeout
    1956           0 :     if( _impl->sendTokenQueue.empty( )) // nobody waiting: mark the token as available
    1957             :     {
    1958           0 :         _impl->sendToken = true;
    1959           0 :         return true;
    1960             :     }
    1961             : 
    1962           0 :     ICommand& request = _impl->sendTokenQueue.front(); // oldest queued acquire request; sendToken stays false since the token passes straight on
    1963             : 
    1964           0 :     const uint32_t requestID = request.get< uint32_t >();
    1965           0 :     request.getRemoteNode()->send( CMD_NODE_ACQUIRE_SEND_TOKEN_REPLY )
    1966           0 :             << requestID;
    1967           0 :     _impl->sendTokenQueue.pop_front(); // popped only after the last use of the 'request' reference
    1968           0 :     return true;
    1969             : }
    1970             : 
    1971           0 : bool LocalNode::_cmdAddListener( ICommand& command ) // Handler: adds a connection description to the sending node; if the sender is this node, also starts listening on the transported connection. Always returns true.
    1972             : {
    1973             :     Connection* rawConnection =
    1974           0 :         reinterpret_cast< Connection* >( command.get< uint64_t >( )); // pointer transported as a 64-bit integer; only used when the sender is this node (see below)
    1975           0 :     std::string data = command.get< std::string >(); // serialized connection description
    1976             : 
    1977           0 :     ConnectionDescriptionPtr description = new ConnectionDescription( data );
    1978           0 :     command.getRemoteNode()->_addConnectionDescription( description );
    1979             : 
    1980           0 :     if( command.getRemoteNode() != this ) // sent by another node: only the description is recorded
    1981           0 :         return true;
    1982             : 
    1983           0 :     ConnectionPtr connection = rawConnection;
    1984           0 :     connection->unref(); // presumably drops a reference taken by the sender to keep the connection alive in transit - confirm. NOTE(review): _cmdRemoveListener uses unref( this ) and get< Connection* >() instead
    1985           0 :     LBASSERT( connection ); // NOTE(review): checked only after the dereference on the previous line
    1986             : 
    1987           0 :     _impl->connectionNodes[ connection ] = this; // the listening connection is associated with the local node
    1988           0 :     if( connection->isMulticast( ))
    1989           0 :         _addMulticast( this, connection );
    1990             : 
    1991           0 :     connection->acceptNB(); // presumably starts a non-blocking accept - confirm
    1992           0 :     _impl->incoming.addConnection( connection );
    1993             : 
    1994           0 :     _initService(); // update zeroconf
    1995           0 :     return true;
    1996             : }
    1997             : 
    1998           0 : bool LocalNode::_cmdRemoveListener( ICommand& command ) // Handler: removes a connection description from the sending node; if the sender is this node, also stops listening on the connection and serves the request. Always returns true.
    1999             : {
    2000           0 :     const uint32_t requestID = command.get< uint32_t >(); // payload order: request ID, connection pointer, serialized description
    2001           0 :     Connection* rawConnection = command.get< Connection* >(); // only used when the sender is this node (see below)
    2002           0 :     std::string data = command.get< std::string >();
    2003             : 
    2004           0 :     ConnectionDescriptionPtr description = new ConnectionDescription( data );
    2005           0 :     LBCHECK(
    2006             :           command.getRemoteNode()->_removeConnectionDescription( description ));
    2007             : 
    2008           0 :     if( command.getRemoteNode() != this ) // sent by another node: only the description is removed; the request is not served here
    2009           0 :         return true;
    2010             : 
    2011           0 :     _initService(); // update zeroconf
    2012             : 
    2013           0 :     ConnectionPtr connection = rawConnection;
    2014           0 :     connection->unref( this ); // presumably releases a reference taken by the sender on behalf of this node - confirm
    2015           0 :     LBASSERT( connection ); // NOTE(review): checked only after the dereference on the previous line
    2016             : 
    2017           0 :     if( connection->isMulticast( ))
    2018           0 :         _removeMulticast( connection );
    2019             : 
    2020           0 :     _impl->incoming.removeConnection( connection );
    2021           0 :     LBASSERT( _impl->connectionNodes.find( connection ) !=
    2022             :               _impl->connectionNodes.end( ));
    2023           0 :     _impl->connectionNodes.erase( connection );
    2024           0 :     serveRequest( requestID ); // served only after the connection has been removed from all local bookkeeping
    2025           0 :     return true;
    2026             : }
    2027             : 
    2028           0 : bool LocalNode::_cmdPing( ICommand& command ) // Handler: answers a ping by sending CMD_NODE_PING_REPLY to the sender. Always returns true.
    2029             : {
    2030           0 :     LBASSERT( inCommandThread( ));
    2031           0 :     command.getRemoteNode()->send( CMD_NODE_PING_REPLY ); // the reply carries no payload
    2032           0 :     return true;
    2033             : }
    2034             : 
    2035           2 : bool LocalNode::_cmdCommand( ICommand& command ) // Handler for custom commands: looks up the handler registered for the command ID and either queues the command or runs the handler directly. Returns false if no handler is registered.
    2036             : {
    2037           2 :     const uint128_t& commandID = command.get< uint128_t >(); // first payload field is the custom command ID
    2038           2 :     CommandHandler func; // receives a copy of the handler so it can be called after the lock scope ends
    2039             :     {
    2040           2 :         lunchbox::ScopedFastRead mutex( _impl->commandHandlers ); // scoped read lock covering the lookup only
    2041           2 :         CommandHashCIter i = _impl->commandHandlers->find( commandID );
    2042           2 :         if( i == _impl->commandHandlers->end( ))
    2043           0 :             return false;
    2044             : 
    2045           2 :         CommandQueue* queue = i->second.second; // map entries are (handler function, optional queue) pairs
    2046           2 :         if( queue )
    2047             :         {
    2048             :             command.setDispatchFunction( CmdFunc( this,
    2049           1 :                                                 &LocalNode::_cmdCommandAsync )); // queued commands are later dispatched to _cmdCommandAsync
    2050           1 :             queue->push( command );
    2051           1 :             return true;
    2052             :         }
    2053             :         // else
    2054             : 
    2055           1 :         func = i->second.first;
    2056             :     }
    2057             : 
    2058           2 :     CustomICommand customCmd( command ); // handlers take the command wrapped as a CustomICommand
    2059           3 :     return func( customCmd ); // no queue registered: the handler runs in the calling thread and its result is returned
    2060             : }
    2061             : 
    2062           1 : bool LocalNode::_cmdCommandAsync( ICommand& command ) // Dispatch target set by _cmdCommand for queued custom commands: looks the handler up again and invokes it.
    2063             : {
    2064           1 :     const uint128_t& commandID = command.get< uint128_t >(); // NOTE(review): reads the ID from the command a second time; presumably the read position was reset on dispatch - confirm
    2065           1 :     CommandHandler func; // receives a copy of the handler so it can be called after the lock scope ends
    2066             :     {
    2067           1 :         lunchbox::ScopedFastRead mutex( _impl->commandHandlers ); // scoped read lock covering the lookup only
    2068           1 :         CommandHashCIter i = _impl->commandHandlers->find( commandID );
    2069           1 :         LBASSERT( i != _impl->commandHandlers->end( )); // NOTE(review): asserts on the same condition the next branch treats as a legal race - confirm which is intended
    2070           1 :         if( i ==  _impl->commandHandlers->end( ))
    2071           0 :             return true; // deregistered between dispatch and now
    2072           1 :         func = i->second.first;
    2073             :     }
    2074           2 :     CustomICommand customCmd( command );
    2075           2 :     return func( customCmd ); // the handler's result is returned to the caller
    2076             : }
    2077             : 
    2078          34 : bool LocalNode::_cmdAddConnection( ICommand& command ) // Handler: passes the connection carried in the command to _addConnection(). Always returns true.
    2079             : {
    2080          34 :     LBASSERT( _impl->inReceiverThread( ));
    2081             : 
    2082          34 :     ConnectionPtr connection = command.get< ConnectionPtr >(); // the connection travels inside the command payload
    2083          34 :     _addConnection( connection );
    2084          34 :     connection->unref(); // ref'd by _addConnection
    2085          34 :     return true;
    2086             : }
    2087             : 
    2088          66 : }

Generated by: LCOV version 1.11