LCOV - code coverage report
Current view: top level - co - localNode.cpp (source / functions) Hit Total Coverage
Test: lcov2.info Lines: 580 1088 53.3 %
Date: 2014-10-06 Functions: 73 107 68.2 %

          Line data    Source code
       1             : 
       2             : /* Copyright (c) 2005-2014, Stefan Eilemann <eile@equalizergraphics.com>
       3             :  *                    2010, Cedric Stalder <cedric.stalder@gmail.com>
       4             :  *               2012-2014, Daniel Nachbaur <danielnachbaur@gmail.com>
       5             :  *
       6             :  * This file is part of Collage <https://github.com/Eyescale/Collage>
       7             :  *
       8             :  * This library is free software; you can redistribute it and/or modify it under
       9             :  * the terms of the GNU Lesser General Public License version 2.1 as published
      10             :  * by the Free Software Foundation.
      11             :  *
      12             :  * This library is distributed in the hope that it will be useful, but WITHOUT
      13             :  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
      14             :  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
      15             :  * details.
      16             :  *
      17             :  * You should have received a copy of the GNU Lesser General Public License
      18             :  * along with this library; if not, write to the Free Software Foundation, Inc.,
      19             :  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
      20             :  */
      21             : 
      22             : #include "localNode.h"
      23             : 
      24             : #include "buffer.h"
      25             : #include "bufferCache.h"
      26             : #include "commandQueue.h"
      27             : #include "connectionDescription.h"
      28             : #include "connectionSet.h"
      29             : #include "customICommand.h"
      30             : #include "dataIStream.h"
      31             : #include "exception.h"
      32             : #include "global.h"
      33             : #include "iCommand.h"
      34             : #include "nodeCommand.h"
      35             : #include "oCommand.h"
      36             : #include "object.h"
      37             : #include "objectICommand.h"
      38             : #include "objectStore.h"
      39             : #include "pipeConnection.h"
      40             : #include "sendToken.h"
      41             : #include "worker.h"
      42             : #include "zeroconf.h"
      43             : 
      44             : #include <lunchbox/clock.h>
      45             : #include <lunchbox/futureFunction.h>
      46             : #include <lunchbox/hash.h>
      47             : #include <lunchbox/lockable.h>
      48             : #include <lunchbox/request.h>
      49             : #include <lunchbox/rng.h>
      50             : #include <lunchbox/servus.h>
      51             : #include <lunchbox/sleep.h>
      52             : 
      53             : #include <boost/bind.hpp>
      54             : #include <boost/date_time/posix_time/posix_time.hpp>
      55             : #include <boost/lexical_cast.hpp>
      56             : 
      57             : #include <list>
      58             : 
      59             : namespace bp = boost::posix_time;
      60             : 
      61             : namespace co
      62             : {
      63             : namespace
      64             : {
      65          20 : lunchbox::a_int32_t _threadIDs;
      66             : 
      67             : typedef CommandFunc< LocalNode > CmdFunc;
      68             : typedef std::list< ICommand > CommandList;
      69             : typedef lunchbox::RefPtrHash< Connection, NodePtr > ConnectionNodeHash;
      70             : typedef ConnectionNodeHash::const_iterator ConnectionNodeHashCIter;
      71             : typedef ConnectionNodeHash::iterator ConnectionNodeHashIter;
      72             : typedef stde::hash_map< uint128_t, NodePtr > NodeHash;
      73             : typedef NodeHash::const_iterator NodeHashCIter;
      74             : typedef stde::hash_map< uint128_t, LocalNode::PushHandler > HandlerHash;
      75             : typedef HandlerHash::const_iterator HandlerHashCIter;
      76             : typedef std::pair< LocalNode::CommandHandler, CommandQueue* > CommandPair;
      77             : typedef stde::hash_map< uint128_t, CommandPair > CommandHash;
      78             : typedef CommandHash::const_iterator CommandHashCIter;
      79             : typedef lunchbox::FutureFunction< bool > FuturebImpl;
      80             : }
      81             : 
      82             : namespace detail
      83             : {
      84          98 : class ReceiverThread : public lunchbox::Thread
      85             : {
      86             : public:
      87          51 :     ReceiverThread( co::LocalNode* localNode ) : _localNode( localNode ) {}
      88          46 :     bool init() override
      89             :     {
      90          46 :         const int32_t threadID = ++_threadIDs - 1;
      91          92 :         setName( std::string( "Rcv" ) +
      92          46 :                  boost::lexical_cast< std::string >( threadID ));
      93          46 :         return _localNode->_startCommandThread( threadID );
      94             :     }
      95             : 
      96          46 :     void run() override { _localNode->_runReceiverThread(); }
      97             : 
      98             : private:
      99             :     co::LocalNode* const _localNode;
     100             : };
     101             : 
     102          98 : class CommandThread : public Worker
     103             : {
     104             : public:
     105          51 :     CommandThread( co::LocalNode* localNode )
     106             :         : Worker( Global::getCommandQueueLimit( ))
     107             :         , threadID( 0 )
     108          51 :         , _localNode( localNode )
     109          51 :     {}
     110             : 
     111             :     int32_t threadID;
     112             : 
     113             : protected:
     114          46 :     bool init() override
     115             :     {
     116          92 :         setName( std::string( "Cmd" ) +
     117          46 :                  boost::lexical_cast< std::string >( threadID ));
     118          46 :         return true;
     119             :     }
     120             : 
     121      103286 :     bool stopRunning() override { return _localNode->isClosed(); }
     122       50597 :     bool notifyIdle() override { return _localNode->_notifyCommandThreadIdle();}
     123             : 
     124             : private:
     125             :     co::LocalNode* const _localNode;
     126             : };
     127             : 
     128             : class LocalNode
     129             : {
     130             : public:
     131          51 :     LocalNode()
     132             :         : smallBuffers( 200 )
     133             :         , bigBuffers( 20 )
     134             :         , sendToken( true )
     135             :         , lastSendToken( 0 )
     136             :         , objectStore( 0 )
     137             :         , receiverThread( 0 )
     138             :         , commandThread( 0 )
     139          51 :         , service( "_collage._tcp" )
     140          51 :     {}
     141             : 
     142          49 :     ~LocalNode()
     143          49 :     {
     144          49 :         LBASSERT( incoming.isEmpty( ));
     145          49 :         LBASSERT( connectionNodes.empty( ));
     146          49 :         LBASSERT( pendingCommands.empty( ));
     147          49 :         LBASSERT( nodes->empty( ));
     148             : 
     149          49 :         delete objectStore;
     150          49 :         objectStore = 0;
     151          49 :         LBASSERT( !commandThread->isRunning( ));
     152          49 :         delete commandThread;
     153          49 :         commandThread = 0;
     154             : 
     155          49 :         LBASSERT( !receiverThread->isRunning( ));
     156          49 :         delete receiverThread;
     157          49 :         receiverThread = 0;
     158          49 :     }
     159             : 
     160         271 :     bool inReceiverThread() const { return receiverThread->isCurrent(); }
     161             : 
     162             :     /** Commands re-scheduled for dispatch. */
     163             :     CommandList pendingCommands;
     164             : 
     165             :     /** The command buffer 'allocator' for small packets */
     166             :     co::BufferCache smallBuffers;
     167             : 
     168             :     /** The command buffer 'allocator' for big packets */
     169             :     co::BufferCache bigBuffers;
     170             : 
     171             :     bool sendToken; //!< send token availability.
     172             :     uint64_t lastSendToken; //!< last used time for timeout detection
     173             :     std::deque< co::ICommand > sendTokenQueue; //!< pending requests
     174             : 
     175             :     /** Manager of distributed object */
     176             :     ObjectStore* objectStore;
     177             : 
     178             :     /** Needed for thread-safety during nodeID-based connect() */
     179             :     lunchbox::Lock connectLock;
     180             : 
     181             :     /** The node for each connection. */
     182             :     ConnectionNodeHash connectionNodes; // read and write: recv only
     183             : 
     184             :     /** The connected nodes. */
     185             :     lunchbox::Lockable< NodeHash, lunchbox::SpinLock > nodes; // r: all, w: recv
     186             : 
     187             :     /** The connection set of all connections from/to this node. */
     188             :     co::ConnectionSet incoming;
     189             : 
     190             :     /** The process-global clock. */
     191             :     lunchbox::Clock clock;
     192             : 
     193             :     /** The registered push handlers. */
     194             :     lunchbox::Lockable< HandlerHash, lunchbox::Lock > pushHandlers;
     195             : 
     196             :     /** The registered custom command handlers. */
     197             :     lunchbox::Lockable< CommandHash, lunchbox::SpinLock > commandHandlers;
     198             : 
     199             :     ReceiverThread* receiverThread;
     200             :     CommandThread* commandThread;
     201             : 
     202             :     lunchbox::Lockable< lunchbox::Servus > service;
     203             : 
     204             :     // Performance counters:
     205             :     a_ssize_t counters[ co::LocalNode::COUNTER_ALL ];
     206             : };
     207             : }
     208             : 
     209          51 : LocalNode::LocalNode( const uint32_t type )
     210             :         : Node( type )
     211          51 :         , _impl( new detail::LocalNode )
     212             : {
     213          51 :     _impl->receiverThread = new detail::ReceiverThread( this );
     214          51 :     _impl->commandThread  = new detail::CommandThread( this );
     215          51 :     _impl->objectStore = new ObjectStore( this, _impl->counters );
     216             : 
     217          51 :     CommandQueue* queue = getCommandThreadQueue();
     218             :     registerCommand( CMD_NODE_CONNECT,
     219          51 :                      CmdFunc( this, &LocalNode::_cmdConnect ), 0 );
     220             :     registerCommand( CMD_NODE_CONNECT_BE,
     221          51 :                      CmdFunc( this, &LocalNode::_cmdConnect ), 0 );
     222             :     registerCommand( CMD_NODE_CONNECT_REPLY,
     223          51 :                      CmdFunc( this, &LocalNode::_cmdConnectReply ), 0 );
     224             :     registerCommand( CMD_NODE_CONNECT_REPLY_BE,
     225          51 :                      CmdFunc( this, &LocalNode::_cmdConnectReply ), 0 );
     226             :     registerCommand( CMD_NODE_ID,
     227          51 :                      CmdFunc( this, &LocalNode::_cmdID ), 0 );
     228             :     registerCommand( CMD_NODE_ID_BE,
     229          51 :                      CmdFunc( this, &LocalNode::_cmdID ), 0 );
     230             :     registerCommand( CMD_NODE_ACK_REQUEST,
     231          51 :                      CmdFunc( this, &LocalNode::_cmdAckRequest ), 0 );
     232             :     registerCommand( CMD_NODE_STOP_RCV,
     233          51 :                      CmdFunc( this, &LocalNode::_cmdStopRcv ), 0 );
     234             :     registerCommand( CMD_NODE_STOP_CMD,
     235          51 :                      CmdFunc( this, &LocalNode::_cmdStopCmd ), queue );
     236             :     registerCommand( CMD_NODE_SET_AFFINITY_RCV,
     237          51 :                      CmdFunc( this, &LocalNode::_cmdSetAffinity ), 0 );
     238             :     registerCommand( CMD_NODE_SET_AFFINITY_CMD,
     239          51 :                      CmdFunc( this, &LocalNode::_cmdSetAffinity ), queue );
     240             :     registerCommand( CMD_NODE_CONNECT_ACK,
     241          51 :                      CmdFunc( this, &LocalNode::_cmdConnectAck ), 0 );
     242             :     registerCommand( CMD_NODE_DISCONNECT,
     243          51 :                      CmdFunc( this, &LocalNode::_cmdDisconnect ), 0 );
     244             :     registerCommand( CMD_NODE_GET_NODE_DATA,
     245          51 :                      CmdFunc( this, &LocalNode::_cmdGetNodeData ), queue );
     246             :     registerCommand( CMD_NODE_GET_NODE_DATA_REPLY,
     247          51 :                      CmdFunc( this, &LocalNode::_cmdGetNodeDataReply ), 0 );
     248             :     registerCommand( CMD_NODE_ACQUIRE_SEND_TOKEN,
     249          51 :                      CmdFunc( this, &LocalNode::_cmdAcquireSendToken ), queue );
     250             :     registerCommand( CMD_NODE_ACQUIRE_SEND_TOKEN_REPLY,
     251          51 :                      CmdFunc( this, &LocalNode::_cmdAcquireSendTokenReply ), 0);
     252             :     registerCommand( CMD_NODE_RELEASE_SEND_TOKEN,
     253          51 :                      CmdFunc( this, &LocalNode::_cmdReleaseSendToken ), queue );
     254             :     registerCommand( CMD_NODE_ADD_LISTENER,
     255          51 :                      CmdFunc( this, &LocalNode::_cmdAddListener ), 0 );
     256             :     registerCommand( CMD_NODE_REMOVE_LISTENER,
     257          51 :                      CmdFunc( this, &LocalNode::_cmdRemoveListener ), 0 );
     258             :     registerCommand( CMD_NODE_PING,
     259          51 :                      CmdFunc( this, &LocalNode::_cmdPing ), queue );
     260             :     registerCommand( CMD_NODE_PING_REPLY,
     261          51 :                      CmdFunc( this, &LocalNode::_cmdDiscard ), 0 );
     262             :     registerCommand( CMD_NODE_COMMAND,
     263          51 :                      CmdFunc( this, &LocalNode::_cmdCommand ), 0 );
     264             :     registerCommand( CMD_NODE_ADD_CONNECTION,
     265          51 :                      CmdFunc( this, &LocalNode::_cmdAddConnection ), 0 );
     266          51 : }
     267             : 
     268         137 : LocalNode::~LocalNode( )
     269             : {
     270          49 :     LBASSERT( !hasPendingRequests( ));
     271          49 :     delete _impl;
     272          88 : }
     273             : 
     274           1 : bool LocalNode::initLocal( const int argc, char** argv )
     275             : {
     276             : #ifndef NDEBUG
     277           1 :     LBVERB << lunchbox::disableFlush << "args: ";
     278           2 :     for( int i=0; i<argc; i++ )
     279           1 :          LBVERB << argv[i] << ", ";
     280           1 :     LBVERB << std::endl << lunchbox::enableFlush;
     281             : #endif
     282             : 
     283             :     // We do not use getopt_long because it really does not work due to the
     284             :     // following aspects:
     285             :     // - reordering of arguments
     286             :     // - different behavior of GNU and BSD implementations
     287             :     // - incomplete man pages
     288           1 :     for( int i=1; i<argc; ++i )
     289             :     {
     290           0 :         if( std::string( "--eq-listen" ) == argv[i] )
     291           0 :             LBWARN << "Deprecated --eq-listen, use --co-listen" << std::endl;
     292           0 :         if( std::string( "--eq-listen" ) == argv[i] ||
     293           0 :             std::string( "--co-listen" ) == argv[i] )
     294             :         {
     295           0 :             if( (i+1)<argc && argv[i+1][0] != '-' )
     296             :             {
     297           0 :                 std::string data = argv[++i];
     298           0 :                 ConnectionDescriptionPtr desc = new ConnectionDescription;
     299           0 :                 desc->port = Global::getDefaultPort();
     300             : 
     301           0 :                 if( desc->fromString( data ))
     302             :                 {
     303           0 :                     addConnectionDescription( desc );
     304           0 :                     LBASSERTINFO( data.empty(), data );
     305             :                 }
     306             :                 else
     307           0 :                     LBWARN << "Ignoring listen option: " << argv[i] <<std::endl;
     308             :             }
     309             :             else
     310             :             {
     311           0 :                 LBWARN << "No argument given to --co-listen!" << std::endl;
     312             :             }
     313             :         }
     314           0 :         else if ( std::string( "--co-globals" ) == argv[i] )
     315             :         {
     316           0 :             if( (i+1)<argc && argv[i+1][0] != '-' )
     317             :             {
     318           0 :                 const std::string data = argv[++i];
     319           0 :                 if( !Global::fromString( data ))
     320             :                 {
     321           0 :                     LBWARN << "Invalid global variables string: " << data
     322           0 :                            << ", using default global variables." << std::endl;
     323           0 :                 }
     324             :             }
     325             :             else
     326             :             {
     327           0 :                 LBWARN << "No argument given to --co-globals!" << std::endl;
     328             :             }
     329             :         }
     330             :     }
     331             : 
     332           1 :     if( !listen( ))
     333             :     {
     334           0 :         LBWARN << "Can't setup listener(s) on " << *static_cast< Node* >( this )
     335           0 :                << std::endl;
     336           0 :         return false;
     337             :     }
     338           1 :     return true;
     339             : }
     340             : 
     341          46 : bool LocalNode::listen()
     342             : {
     343          46 :     LBVERB << "Listener data: " << serialize() << std::endl;
     344          46 :     if( !isClosed() || !_connectSelf( ))
     345           0 :         return false;
     346             : 
     347          46 :     const ConnectionDescriptions& descriptions = getConnectionDescriptions();
     348         270 :     for( ConnectionDescriptionsCIter i = descriptions.begin();
     349         180 :          i != descriptions.end(); ++i )
     350             :     {
     351          44 :         ConnectionDescriptionPtr description = *i;
     352          88 :         ConnectionPtr connection = Connection::create( description );
     353             : 
     354          44 :         if( !connection || !connection->listen( ))
     355             :         {
     356           0 :             LBWARN << "Can't create listener connection: " << description
     357           0 :                    << std::endl;
     358           0 :             return false;
     359             :         }
     360             : 
     361          44 :         _impl->connectionNodes[ connection ] = this;
     362          44 :         if( connection->isMulticast( ))
     363           0 :             _addMulticast( this, connection );
     364             : 
     365          44 :         connection->acceptNB();
     366          44 :         _impl->incoming.addConnection( connection );
     367             : 
     368          88 :         LBVERB << "Added node " << getNodeID() << " using " << connection
     369         132 :                << std::endl;
     370          44 :     }
     371             : 
     372          92 :     LBVERB << lunchbox::className(this) << " start command and receiver thread "
     373         138 :            << std::endl;
     374             : 
     375          46 :     _setListening();
     376          46 :     _impl->receiverThread->start();
     377             : 
     378          46 :     LBINFO << *this << std::endl;
     379          46 :     return true;
     380             : }
     381             : 
     382           0 : bool LocalNode::listen( ConnectionPtr connection )
     383             : {
     384           0 :     if( !listen( ))
     385           0 :         return false;
     386           0 :     _addConnection( connection );
     387           0 :     return true;
     388             : }
     389             : 
     390          45 : bool LocalNode::close()
     391             : {
     392          45 :     if( !isListening() )
     393           0 :         return false;
     394             : 
     395          45 :     send( CMD_NODE_STOP_RCV );
     396             : 
     397          45 :     LBCHECK( _impl->receiverThread->join( ));
     398          45 :     _cleanup();
     399             : 
     400         135 :     LBINFO << _impl->incoming.getSize() << " connections open after close"
     401         135 :            << std::endl;
     402             : #ifndef NDEBUG
     403          45 :     const Connections& connections = _impl->incoming.getConnections();
     404         135 :     for( Connections::const_iterator i = connections.begin();
     405          90 :          i != connections.end(); ++i )
     406             :     {
     407           0 :         LBINFO << "    " << *i << std::endl;
     408             :     }
     409             : #endif
     410             : 
     411          45 :     LBASSERTINFO( !hasPendingRequests(),
     412             :                   *static_cast< lunchbox::RequestHandler* >( this ));
     413          45 :     return true;
     414             : }
     415             : 
     416           0 : void LocalNode::setAffinity( const int32_t affinity )
     417             : {
     418           0 :     send( CMD_NODE_SET_AFFINITY_RCV ) << affinity;
     419           0 :     send( CMD_NODE_SET_AFFINITY_CMD ) << affinity;
     420             : 
     421           0 :     lunchbox::Thread::setAffinity( affinity );
     422           0 : }
     423             : 
     424           0 : ConnectionPtr LocalNode::addListener( ConnectionDescriptionPtr desc )
     425             : {
     426           0 :     LBASSERT( isListening( ));
     427             : 
     428           0 :     ConnectionPtr connection = Connection::create( desc );
     429           0 :     if( connection && connection->listen( ))
     430             :     {
     431           0 :         addListener( connection );
     432           0 :         return connection;
     433             :     }
     434             : 
     435           0 :     return 0;
     436             : }
     437             : 
     438           0 : void LocalNode::addListener( ConnectionPtr connection )
     439             : {
     440           0 :     LBASSERT( isListening( ));
     441           0 :     LBASSERT( connection->isListening( ));
     442           0 :     if( !isListening() || !connection->isListening( ))
     443           0 :         return;
     444             : 
     445           0 :     connection->ref( this ); // unref in self handler
     446             : 
     447             :     // Update everybody's description list of me, add the listener to myself in
     448             :     // my handler
     449           0 :     Nodes nodes;
     450           0 :     getNodes( nodes );
     451             : 
     452           0 :     for( NodesIter i = nodes.begin(); i != nodes.end(); ++i )
     453           0 :         (*i)->send( CMD_NODE_ADD_LISTENER )
     454           0 :             << (uint64_t)(connection.get( ))
     455           0 :             << connection->getDescription()->toString();
     456             : }
     457             : 
     458           0 : void LocalNode::removeListeners( const Connections& connections )
     459             : {
     460           0 :     std::vector< lunchbox::Request< void > > requests;
     461           0 :     for( ConnectionsCIter i = connections.begin(); i != connections.end(); ++i )
     462             :     {
     463           0 :         ConnectionPtr connection = *i;
     464           0 :         requests.push_back( _removeListener( connection ));
     465           0 :     }
     466           0 : }
     467             : 
     468           0 : lunchbox::Request< void > LocalNode::_removeListener( ConnectionPtr conn )
     469             : {
     470           0 :     LBASSERT( isListening( ));
     471           0 :     LBASSERTINFO( !conn->isConnected(), conn );
     472             : 
     473           0 :     conn->ref( this );
     474           0 :     const lunchbox::Request< void > request = registerRequest< void >();
     475           0 :     Nodes nodes;
     476           0 :     getNodes( nodes );
     477             : 
     478           0 :     for( NodesIter i = nodes.begin(); i != nodes.end(); ++i )
     479           0 :         (*i)->send( CMD_NODE_REMOVE_LISTENER ) << request << conn.get()
     480           0 :                                           << conn->getDescription()->toString();
     481           0 :     return request;
     482             : }
     483             : 
     484         145 : void LocalNode::_addConnection( ConnectionPtr connection )
     485             : {
     486         145 :     if( _impl->receiverThread->isRunning() && !_impl->inReceiverThread( ))
     487             :     {
     488          33 :         connection->ref(); // unref in _cmdAddConnection
     489          33 :         send( CMD_NODE_ADD_CONNECTION ) << connection;
     490         178 :         return;
     491             :     }
     492             : 
     493         112 :     BufferPtr buffer = _impl->smallBuffers.alloc( COMMAND_ALLOCSIZE );
     494         112 :     connection->recvNB( buffer, COMMAND_MINSIZE );
     495         112 :     _impl->incoming.addConnection( connection );
     496             : }
     497             : 
     498         214 : void LocalNode::_removeConnection( ConnectionPtr connection )
     499             : {
     500         214 :     LBASSERT( connection );
     501             : 
     502         214 :     _impl->incoming.removeConnection( connection );
     503         214 :     connection->resetRecvData();
     504         214 :     if( !connection->isClosed( ))
     505         122 :         connection->close(); // cancel pending IO's
     506         214 : }
     507             : 
     508          45 : void LocalNode::_cleanup()
     509             : {
     510          45 :     LBVERB << "Clean up stopped node" << std::endl;
     511          45 :     LBASSERTINFO( isClosed(), *this );
     512             : 
     513          45 :     if( !_impl->connectionNodes.empty( ))
     514         132 :         LBINFO << _impl->connectionNodes.size()
     515         132 :                << " open connections during cleanup" << std::endl;
     516             : #ifndef NDEBUG
     517         267 :     for( ConnectionNodeHashCIter i = _impl->connectionNodes.begin();
     518         178 :          i != _impl->connectionNodes.end(); ++i )
     519             :     {
     520          44 :         NodePtr node = i->second;
     521          44 :         LBINFO << "    " << i->first << " : " << node << std::endl;
     522          44 :     }
     523             : #endif
     524             : 
     525          45 :     _impl->connectionNodes.clear();
     526             : 
     527          45 :     if( !_impl->nodes->empty( ))
     528           3 :         LBINFO << _impl->nodes->size() << " nodes connected during cleanup"
     529           3 :                << std::endl;
     530             : 
     531             : #ifndef NDEBUG
     532          46 :     for( NodeHashCIter i = _impl->nodes->begin(); i != _impl->nodes->end(); ++i)
     533             :     {
     534           1 :         NodePtr node = i->second;
     535           1 :         LBINFO << "    " << node << std::endl;
     536           1 :     }
     537             : #endif
     538             : 
     539          45 :     _impl->nodes->clear();
     540          45 : }
     541             : 
     542         110 : void LocalNode::_closeNode( NodePtr node )
     543             : {
     544         110 :     ConnectionPtr connection = node->getConnection();
     545         220 :     ConnectionPtr mcConnection = node->_getMulticast();
     546             : 
     547         110 :     node->_disconnect();
     548             : 
     549         110 :     if( connection )
     550             :     {
     551          66 :         LBASSERTINFO( _impl->connectionNodes.find( connection ) !=
     552             :                       _impl->connectionNodes.end(), connection );
     553             : 
     554          66 :         _removeConnection( connection );
     555          66 :         _impl->connectionNodes.erase( connection );
     556             :     }
     557             : 
     558         110 :     if( mcConnection )
     559             :     {
     560           0 :         _removeConnection( mcConnection );
     561           0 :         _impl->connectionNodes.erase( mcConnection );
     562             :     }
     563             : 
     564         110 :     _impl->objectStore->removeInstanceData( node->getNodeID( ));
     565             : 
     566         220 :     lunchbox::ScopedFastWrite mutex( _impl->nodes );
     567         110 :     _impl->nodes->erase( node->getNodeID( ));
     568         110 :     notifyDisconnect( node );
     569         220 :     LBINFO << node << " disconnected from " << *this << std::endl;
     570         110 : }
     571             : 
     572          46 : bool LocalNode::_connectSelf()
     573             : {
                           // Builds the pipe connection this node uses to talk to itself and
                           // registers it in the connection bookkeeping. Returns false only
                           // when the pipe fails to connect, true otherwise.
     574             :     // setup local connection to myself
     575          46 :     PipeConnectionPtr connection = new PipeConnection;
     576          46 :     if( !connection->connect( ))
     577             :     {
     578           0 :         LBERROR << "Could not create local connection to receiver thread."
     579           0 :                 << std::endl;
     580           0 :         return false;
     581             :     }
     582             : 
                           // The node side gets the sibling end; the pipe itself is what is
                           // added to the connection set below.
     583          46 :     Node::_connect( connection->getSibling( ));
     584          46 :     _setClosed(); // reset state after _connect set it to connected
     585             : 
     586             :     // add to connection set
     587          46 :     LBASSERT( connection->getDescription( ));
     588          46 :     LBASSERT( _impl->connectionNodes.find( connection ) ==
     589             :               _impl->connectionNodes.end( ));
     590             : 
                           // Map the pipe to this node and list this node under its own id.
                           // NOTE(review): nodes is written through .data here, while other
                           // functions in this file take a Scoped* lock on _impl->nodes first
                           // - confirm this is intentional.
     591          46 :     _impl->connectionNodes[ connection ] = this;
     592          46 :     _impl->nodes.data[ getNodeID() ] = this;
     593          46 :     _addConnection( connection );
     594             : 
     595          92 :     LBVERB << "Added node " << getNodeID() << " using " << connection
     596         138 :            << std::endl;
     597          46 :     return true;
     598             : }
     599             : 
     600           7 : bool LocalNode::disconnect( NodePtr node )
     601             : {
                           // Disconnects the given node and blocks until the request issued
                           // below has been served. Returns false for a null node or when
                           // this node is not listening, true otherwise (including when the
                           // node was not connected in the first place).
     602           7 :     if( !node || !isListening() )
     603           0 :         return false;
     604             : 
     605           7 :     if( !node->isConnected( ))
     606           0 :         return true;
     607             : 
                           // The request carries the node pointer; CMD_NODE_DISCONNECT is
                           // sent through this node's own send().
     608           7 :     LBASSERT( !inCommandThread( ));
     609           7 :     lunchbox::Request< void > request = registerRequest< void >( node.get( ));
     610           7 :     send( CMD_NODE_DISCONNECT ) << request;
     611             : 
     612           7 :     request.wait();
     613           7 :     _impl->objectStore->removeNode( node );
     614           7 :     return true;
     615             : }
     616             : 
     617       20001 : void LocalNode::ackRequest( NodePtr node, const uint32_t requestID )
     618             : {
                           // Acknowledges requestID towards node. An undefined id means no
                           // acknowledgement is wanted; a request owned by this node is
                           // served directly instead of sending a command to ourselves.
     619       20001 :     if( requestID == LB_UNDEFINED_UINT32 ) // no need to ack operation
     620       20001 :         return;
     621             : 
     622       20001 :     if( node == this ) // OPT
     623           0 :         serveRequest( requestID );
     624             :     else
     625       20001 :         node->send( CMD_NODE_ACK_REQUEST ) << requestID;
     626             : }
     627             : 
     628           0 : void LocalNode::ping( NodePtr peer )
     629             : {
                           // Sends CMD_NODE_PING to peer. Asserts that the caller is not the
                           // receiver thread.
     630           0 :     LBASSERT( !_impl->inReceiverThread( ));
     631           0 :     peer->send( CMD_NODE_PING );
     632           0 : }
     633             : 
     634           0 : bool LocalNode::pingIdleNodes()
     635             : {
                           // Pings every node from which nothing was received for more than
                           // half the global keepalive timeout. Returns true if at least one
                           // ping was sent.
     636           0 :     LBASSERT( !_impl->inReceiverThread( ) );
     637           0 :     const int64_t timeout = Global::getKeepaliveTimeout() / 2;
     638           0 :     Nodes nodes;
                           // false == addSelf: this node is left out of the list.
     639           0 :     getNodes( nodes, false );
     640             : 
     641           0 :     bool pinged = false;
     642           0 :     for( NodesCIter i = nodes.begin(); i != nodes.end(); ++i )
     643             :     {
     644           0 :         NodePtr node = *i;
     645           0 :         if( getTime64() - node->getLastReceiveTime() > timeout )
     646             :         {
     647           0 :             LBINFO << " Ping Node: " <<  node->getNodeID() << " last seen "
     648           0 :                    << node->getLastReceiveTime() << std::endl;
     649           0 :             node->send( CMD_NODE_PING );
     650           0 :             pinged = true;
     651             :         }
     652           0 :     }
     653           0 :     return pinged;
     654             : }
     655             : 
     656             : //----------------------------------------------------------------------
     657             : // Object functionality
     658             : //----------------------------------------------------------------------
     659           0 : void LocalNode::disableInstanceCache()
     660             : {
                           // Forwards to ObjectStore::disableInstanceCache().
     661           0 :     _impl->objectStore->disableInstanceCache();
     662           0 : }
     663             : 
     664           0 : void LocalNode::expireInstanceData( const int64_t age )
     665             : {
                           // Forwards age unchanged to ObjectStore::expireInstanceData().
     666           0 :     _impl->objectStore->expireInstanceData( age );
     667           0 : }
     668             : 
     669           0 : void LocalNode::enableSendOnRegister()
     670             : {
                           // Forwards to ObjectStore::enableSendOnRegister().
     671           0 :     _impl->objectStore->enableSendOnRegister();
     672           0 : }
     673             : 
     674           0 : void LocalNode::disableSendOnRegister()
     675             : {
                           // Forwards to ObjectStore::disableSendOnRegister().
     676           0 :     _impl->objectStore->disableSendOnRegister();
     677           0 : }
     678             : 
     679          20 : bool LocalNode::registerObject( Object* object )
     680             : {
                           // Forwards to ObjectStore::register_() and returns its result.
     681          20 :     return _impl->objectStore->register_( object );
     682             : }
     683             : 
     684          20 : void LocalNode::deregisterObject( Object* object )
     685             : {
                           // Forwards to ObjectStore::deregister().
     686          20 :     _impl->objectStore->deregister( object );
     687          20 : }
     688             : 
     689          39 : f_bool_t LocalNode::mapObject( Object* object, const uint128_t& id,
     690             :                                NodePtr master, const uint128_t& version )
     691             : {
                           // Starts the mapping with ObjectStore::mapNB() and returns a future
                           // built around ObjectStore::mapSync() bound to the resulting
                           // request id.
     692             :     const uint32_t request = _impl->objectStore->mapNB( object, id, version,
     693          39 :                                                         master );
     694             :     const FuturebImpl::Func& func = boost::bind( &ObjectStore::mapSync,
     695          39 :                                                  _impl->objectStore, request );
     696          39 :     return f_bool_t( new FuturebImpl( func ));
     697             : }
     698             : 
     699           0 : uint32_t LocalNode::mapObjectNB( Object* object, const uint128_t& id,
     700             :                                  const uint128_t& version )
     701             : {
                           // Same as the overload taking a master node, with 0 passed as the
                           // master. Returns the id produced by ObjectStore::mapNB().
     702           0 :     return _impl->objectStore->mapNB( object, id, version, 0 );
     703             : }
     704             : 
     705           2 : uint32_t LocalNode::mapObjectNB( Object* object, const uint128_t& id,
     706             :                                  const uint128_t& version, NodePtr master )
     707             : {
                           // Forwards all arguments to ObjectStore::mapNB() and returns its
                           // result, which mapObjectSync() accepts as requestID.
     708           2 :     return _impl->objectStore->mapNB( object, id, version, master );
     709             : }
     710             : 
     711             : 
     712           2 : bool LocalNode::mapObjectSync( const uint32_t requestID )
     713             : {
                           // Forwards to ObjectStore::mapSync() and returns its result.
     714           2 :     return _impl->objectStore->mapSync( requestID );
     715             : }
     716             : 
     717           4 : f_bool_t LocalNode::syncObject( Object* object, NodePtr master, const uint128_t& id,
     718             :                                const uint32_t instanceID )
     719             : {
                           // Forwards all arguments to ObjectStore::sync() and returns the
                           // future it produces.
     720           4 :     return _impl->objectStore->sync( object, master, id, instanceID );
     721             : }
     722             : 
     723          41 : void LocalNode::unmapObject( Object* object )
     724             : {
                           // Forwards to ObjectStore::unmap().
     725          41 :     _impl->objectStore->unmap( object );
     726          41 : }
     727             : 
     728           0 : void LocalNode::swapObject( Object* oldObject, Object* newObject )
     729             : {
                           // Forwards both objects, in this order, to ObjectStore::swap().
     730           0 :     _impl->objectStore->swap( oldObject, newObject );
     731           0 : }
     732             : 
     733           0 : void LocalNode::objectPush( const uint128_t& groupID,
     734             :                             const uint128_t& objectType,
     735             :                             const uint128_t& objectID, DataIStream& istream )
     736             : {
                           // Hands a pushed object to the handler registered for groupID, or
                           // warns when no handler is registered for that group.
     737           0 :     lunchbox::ScopedRead mutex( _impl->pushHandlers );
     738           0 :     HandlerHashCIter i = _impl->pushHandlers->find( groupID );
     739           0 :     if( i != _impl->pushHandlers->end( ))
     740           0 :         i->second( groupID, objectType, objectID, istream );
     741             :     else
     742           0 :         LBWARN << "No custom handler for push group " << groupID
     743           0 :                << " registered" << std::endl;
     744             : 
                           // A stream that was read from but still holds data is reported;
                           // this check runs whether or not a handler was found.
     745           0 :     if( istream.wasUsed() && istream.hasData( ))
     746           0 :         LBWARN << "Incomplete Object::push for group " << groupID << " type "
     747           0 :                << objectType << " object " << objectID << std::endl;
     748           0 : }
     749             : 
     750           0 : void LocalNode::registerPushHandler( const uint128_t& groupID,
     751             :                                      const PushHandler& handler )
     752             : {
                           // Stores handler under groupID while holding the write lock. The
                           // assignment replaces any handler already stored for that group.
     753           0 :     lunchbox::ScopedWrite mutex( _impl->pushHandlers );
     754           0 :     (*_impl->pushHandlers)[ groupID ] = handler;
     755           0 : }
     756             : 
     757           2 : bool LocalNode::registerCommandHandler( const uint128_t& command,
     758             :                                         const CommandHandler& func,
     759             :                                         CommandQueue* queue )
     760             : {
                           // Registers func together with queue for the given custom command.
                           // Returns false, leaving the existing entry untouched, when the
                           // command already has a handler; true after a new entry is added.
     761           2 :     lunchbox::ScopedFastWrite mutex( _impl->commandHandlers );
     762           2 :     if( _impl->commandHandlers->find(command) != _impl->commandHandlers->end( ))
     763             :     {
     764           0 :         LBWARN << "Already got a registered handler for custom command "
     765           0 :                << command << std::endl;
     766           0 :         return false;
     767             :     }
     768             : 
     769             :     _impl->commandHandlers->insert( std::make_pair( command,
     770           2 :                                                 std::make_pair( func, queue )));
     771           2 :     return true;
     772             : }
     773             : 
     774           0 : LocalNode::SendToken LocalNode::acquireSendToken( NodePtr node )
     775             : {
                           // Requests a send token from node and waits up to
                           // Global::getTimeout() for the request to be served. Returns a new
                           // co::SendToken for node, or 0 when the wait times out.
     776           0 :     LBASSERT( !inCommandThread( ));
     777           0 :     LBASSERT( !_impl->inReceiverThread( ));
     778             : 
     779           0 :     lunchbox::Request< void > request = registerRequest< void >();
     780           0 :     node->send( CMD_NODE_ACQUIRE_SEND_TOKEN ) << request;
     781             : 
     782             :     try
     783             :     {
     784           0 :         request.wait(  Global::getTimeout() );
     785             :     }
     786             :     catch ( lunchbox::FutureTimeout& )
     787             :     {
     788             :         LBERROR << "Timeout while acquiring send token " << request.getID()
     789             :                 << std::endl;
     790             :         request.relinquish();
     791             :         return 0;
     792             :     }
     793           0 :     return new co::SendToken( node );
     794             : }
     795             : 
     796           0 : void LocalNode::releaseSendToken( SendToken token )
     797             : {
                           // Releases token if it is non-null; a null token is ignored.
     798           0 :     LBASSERT( !_impl->inReceiverThread( ));
     799           0 :     if( token )
     800           0 :         token->release();
     801           0 : }
     802             : 
     803             : //----------------------------------------------------------------------
     804             : // Connecting a node
     805             : //----------------------------------------------------------------------
     806             : namespace
     807             : {
                       // Outcome of the _connect() overloads below, which return it as uint32_t.
                       // CONNECT_OK is the first enumerator and therefore has the value 0, so a
                       // result must be compared against CONNECT_OK, not tested for truthiness.
     808             : enum ConnectResult
     809             : {
     810             :     CONNECT_OK, // node already reachable, or handshake succeeded
     811             :     CONNECT_TRY_AGAIN, // handshake answered, but node is not connected
     812             :     CONNECT_BAD_STATE, // preconditions for the handshake were not met
     813             :     CONNECT_TIMEOUT, // no handshake answer within the wait period
     814             :     CONNECT_UNREACHABLE // none of the connection descriptions connected
     815             : };
     816             : }
     817             : 
     818          72 : NodePtr LocalNode::connect( const NodeID& nodeID )
     819             : {
                           // Returns a reachable node with the given identifier, connecting it
                           // if needed, or 0 when every strategy below fails. Strategies, in
                           // order: already-known nodes, asking each known node through
                           // _connect( nodeID, peer ), Zeroconf, and a final re-check of the
                           // known nodes.
     820          72 :     LBASSERT( nodeID != 0 );
     821          72 :     LBASSERT( isListening( ));
     822             : 
     823             :     // Make sure that only one connection request based on the node identifier
     824             :     // is pending at a given time. Otherwise a node with the same id might be
     825             :     // instantiated twice in _cmdGetNodeDataReply(). The alternative to this
     826             :     // mutex is to register connecting nodes with this local node, and handle
     827             :     // all cases correctly, which is far more complex. Node connections only
     828             :     // happen a lot during initialization, and are therefore not time-critical.
     829          72 :     lunchbox::ScopedWrite mutex( _impl->connectLock );
     830             : 
     831         144 :     Nodes nodes;
     832          72 :     getNodes( nodes );
     833             : 
     834          75 :     for( NodesCIter i = nodes.begin(); i != nodes.end(); ++i )
     835             :     {
     836          75 :         NodePtr peer = *i;
     837          75 :         if( peer->getNodeID() == nodeID && peer->isReachable( )) // early out
     838          72 :             return peer;
     839           3 :     }
     840             : 
                           // Not known yet: try to obtain and connect the node through each
                           // of the nodes collected above.
     841           0 :     LBINFO << "Connecting node " << nodeID << std::endl;
     842           0 :     for( NodesCIter i = nodes.begin(); i != nodes.end(); ++i )
     843             :     {
     844           0 :         NodePtr peer = *i;
     845           0 :         NodePtr node = _connect( nodeID, peer );
     846           0 :         if( node )
     847           0 :             return node;
     848           0 :     }
     849             : 
     850           0 :     NodePtr node = _connectFromZeroconf( nodeID );
     851           0 :     if( node )
     852           0 :         return node;
     853             : 
     854             :     // check again if node connected by itself by now
     855           0 :     nodes.clear();
     856           0 :     getNodes( nodes );
     857           0 :     for( NodesCIter i = nodes.begin(); i != nodes.end(); ++i )
     858             :     {
     859           0 :         node = *i;
     860           0 :         if( node->getNodeID() == nodeID && node->isReachable( ))
     861           0 :             return node;
     862             :     }
     863             : 
     864           0 :     LBWARN << "Node " << nodeID << " connection failed" << std::endl;
     865          72 :     return 0;
     866             : }
     867             : 
     868           0 : NodePtr LocalNode::_connect( const NodeID& nodeID, NodePtr peer )
     869             : {
                           // Connects the node with the given identifier, asking peer for the
                           // node data when the node is not in the local node map. Returns the
                           // node when it is reachable, 0 otherwise.
     870           0 :     LBASSERT( nodeID != 0 );
     871             : 
     872           0 :     NodePtr node;
     873             :     {
     874           0 :         lunchbox::ScopedFastRead mutexNodes( _impl->nodes );
     875           0 :         NodeHash::const_iterator i = _impl->nodes->find( nodeID );
     876           0 :         if( i != _impl->nodes->end( ))
     877           0 :             node = i->second;
     878             :     }
     879             : 
     880           0 :     LBASSERT( getNodeID() != nodeID );
     881           0 :     if( !node )
     882             :     {
                               // The request is served with a Node pointer; a null pointer
                               // is treated as "peer does not know the node".
     883           0 :         lunchbox::Request< void* > request = registerRequest< void* >();
     884           0 :         peer->send( CMD_NODE_GET_NODE_DATA ) << nodeID << request;
     885           0 :         node = reinterpret_cast< Node* >( request.wait( ));
     886           0 :         if( !node )
     887             :         {
     888           0 :             LBINFO << "Node " << nodeID << " not found on " << peer->getNodeID()
     889           0 :                    << std::endl;
     890           0 :             return 0;
     891             :         }
     892           0 :         node->unref( this ); // ref'd before serveRequest()
     893             :     }
     894             : 
     895           0 :     if( node->isReachable( ))
     896           0 :         return node;
     897             : 
                           // Pre-decrement: the loop body runs at most 9 times.
     898           0 :     size_t tries = 10;
     899           0 :     while( --tries )
     900             :     {
     901           0 :         switch( _connect( node ))
     902             :         {
     903             :           case CONNECT_OK:
     904           0 :               return node;
     905             :           case CONNECT_TRY_AGAIN:
     906             :           {
     907           0 :               lunchbox::RNG rng;
     908             :               // collision avoidance
     909           0 :               lunchbox::sleep( rng.get< uint8_t >( ));
     910           0 :               break;
     911             :           }
     912             :           case CONNECT_BAD_STATE:
     913           0 :               LBWARN << "Internal connect error" << std::endl;
     914             :               // no break;
     915             :           case CONNECT_TIMEOUT:
     916           0 :               return 0;
     917             : 
     918             :           case CONNECT_UNREACHABLE:
     919           0 :               break; // maybe peer talks to us
     920             :         }
     921             : 
                               // Only reached after CONNECT_TRY_AGAIN or CONNECT_UNREACHABLE:
                               // pick up the map entry for nodeID, if there is one, so the
                               // next attempt and the final check use it.
     922           0 :         lunchbox::ScopedFastRead mutexNodes( _impl->nodes );
     923             :         // connect failed - check for simultaneous connect from peer
     924           0 :         NodeHash::const_iterator i = _impl->nodes->find( nodeID );
     925           0 :         if( i != _impl->nodes->end( ))
     926           0 :             node = i->second;
     927           0 :     }
     928             : 
     929           0 :     return node->isReachable() ? node : 0;
     930             : }
     931             : 
     932           0 : NodePtr LocalNode::_connectFromZeroconf( const NodeID& nodeID )
     933             : {
                           // Looks for a discovered service instance whose name equals nodeID,
                           // builds a node from the co_type, co_numPorts and co_port<j> values
                           // published for that instance, and tries to connect it. Returns the
                           // connected node, or 0 when no matching instance could be connected.
     934           0 :     lunchbox::ScopedWrite mutex( _impl->service );
     935             : 
     936             :     const Strings& instances =
     937           0 :         _impl->service->discover( lunchbox::Servus::IF_ALL, 500 );
     938           0 :     for( StringsCIter i = instances.begin(); i != instances.end(); ++i )
     939             :     {
     940           0 :         const std::string& instance = *i;
     941           0 :         const NodeID candidate( instance );
     942           0 :         if( candidate != nodeID )
     943           0 :             continue;
     944             : 
     945           0 :         const std::string& typeStr = _impl->service->get( instance, "co_type" );
     946           0 :         if( typeStr.empty( ))
     947           0 :             return 0;
     948             : 
     949           0 :         std::istringstream in( typeStr );
     950           0 :         uint32_t type = 0;
     951           0 :         in >> type;
     952             : 
     953           0 :         NodePtr node = createNode( type );
     954           0 :         if( !node )
     955             :         {
     956           0 :             LBINFO << "Can't create node of type " << type << std::endl;
     957           0 :             continue;
     958             :         }
     959             : 
     960             :         const std::string& numStr = _impl->service->get( instance,
     961           0 :                                                          "co_numPorts" );
     962           0 :         uint32_t num = 0;
     963             : 
                               // Reuse the stream for the port count: clear the state left
                               // by the previous extraction before replacing the buffer.
     964           0 :         in.clear();
     965           0 :         in.str( numStr );
     966           0 :         in >> num;
     967           0 :         LBASSERT( num > 0 );
     968           0 :         for( size_t j = 0; j < num; ++j )
     969             :         {
     970           0 :             ConnectionDescriptionPtr desc = new ConnectionDescription;
     971           0 :             std::ostringstream out;
     972           0 :             out << "co_port" << j;
     973             : 
                                   // The assertions expect fromString() to leave descStr
                                   // empty once the description has been parsed.
     974           0 :             std::string descStr = _impl->service->get( instance, out.str( ));
     975           0 :             LBASSERT( !descStr.empty( ));
     976           0 :             LBCHECK( desc->fromString( descStr ));
     977           0 :             LBASSERT( descStr.empty( ));
     978           0 :             node->addConnectionDescription( desc );
     979           0 :         }
     980           0 :         mutex.leave();
                               // _connect() reports success as CONNECT_OK, which is 0, so the
                               // result has to be compared explicitly. Testing it for
                               // truthiness returned the node only when connecting failed.
                               // NOTE(review): after a failed attempt the loop continues and
                               // queries _impl->service again although mutex.leave() was
                               // already called - confirm whether that access needs the lock.
     981           0 :         if( _connect( node ) == CONNECT_OK )
     982           0 :             return node;
     983           0 :     }
     984           0 :     return 0;
     985             : }
     986             : 
     987          33 : bool LocalNode::connect( NodePtr node )
     988             : {
                           // Connects node while holding the connect lock. Returns true only
                           // when _connect() reports CONNECT_OK.
     989          33 :     lunchbox::ScopedWrite mutex( _impl->connectLock );
     990          33 :     return ( _connect( node ) == CONNECT_OK );
     991             : }
     992             : 
     993          33 : uint32_t LocalNode::_connect( NodePtr node )
     994             : {
                           // Tries the node's connection descriptions in order and performs
                           // the handshake on the first one that connects. Returns a
                           // ConnectResult: CONNECT_OK if the node is already reachable, the
                           // handshake result for the first connected description, or
                           // CONNECT_UNREACHABLE if none connected.
     995          33 :     LBASSERTINFO( isListening(), *this );
     996          33 :     if( node->isReachable( ))
     997           0 :         return CONNECT_OK;
     998             : 
     999          33 :     LBASSERT( node->isClosed( ));
    1000          33 :     LBINFO << "Connecting " << node << std::endl;
    1001             : 
    1002             :     // try connecting using the given descriptions
    1003          33 :     const ConnectionDescriptions& cds = node->getConnectionDescriptions();
    1004          99 :     for( ConnectionDescriptionsCIter i = cds.begin();
    1005          66 :         i != cds.end(); ++i )
    1006             :     {
    1007          33 :         ConnectionDescriptionPtr description = *i;
    1008          33 :         if( description->type >= CONNECTIONTYPE_MULTICAST )
    1009           0 :             continue; // Don't use multicast for primary connections
    1010             : 
    1011          33 :         ConnectionPtr connection = Connection::create( description );
    1012          33 :         if( !connection || !connection->connect( ))
    1013           0 :             continue;
    1014             : 
                               // The first description that connects decides the outcome;
                               // later descriptions are not tried even if the handshake fails.
    1015          33 :         return _connect( node, connection );
    1016             :     }
    1017             : 
    1018           0 :     LBWARN << "Node unreachable, all connections failed to connect" <<std::endl;
    1019           0 :     return CONNECT_UNREACHABLE;
    1020             : }
    1021             : 
    1022           0 : bool LocalNode::connect( NodePtr node, ConnectionPtr connection )
    1023             : {
                           // Performs the handshake for node over the given connection.
                           // Returns true only for CONNECT_OK. Unlike connect( NodePtr ), this
                           // overload does not take _impl->connectLock.
    1024           0 :     return ( _connect( node, connection ) == CONNECT_OK );
    1025             : }
    1026             : 
    1027          33 : uint32_t LocalNode::_connect( NodePtr node, ConnectionPtr connection )
    1028             : {
                           // Performs the connect handshake with node over an already
                           // connected connection. Returns a ConnectResult:
                           //   CONNECT_BAD_STATE  node or connection is null, this node is not
                           //                      listening, the connection is not connected
                           //                      or the node is not closed
                           //   CONNECT_TIMEOUT    no answer within the wait period below
                           //   CONNECT_TRY_AGAIN  negative answer, or node not connected
                           //   CONNECT_OK         handshake succeeded
    1029          33 :     LBASSERT( connection );
                           // The assertion tolerates a null node; the guard below rejects it.
    1030          33 :     LBASSERT( !node || node->getNodeID() != getNodeID( ));
    1031             : 
                           // Test both pointers before anything is dereferenced.
    1032          66 :     if( !node || !connection || !isListening() ||
    1033          33 :         !connection->isConnected() || !node->isClosed( ))
    1034             :     {
    1035           0 :         return CONNECT_BAD_STATE;
    1036             :     }
    1037             : 
    1038          33 :     _addConnection( connection );
    1039             : 
    1040             :     // send connect command to peer
    1041          33 :     lunchbox::Request< bool > request = registerRequest< bool >( node.get( ));
    1042             : #ifdef COLLAGE_BIGENDIAN
    1043             :     uint32_t cmd = CMD_NODE_CONNECT_BE;
    1044             :     lunchbox::byteswap( cmd );
    1045             : #else
    1046          33 :     const uint32_t cmd = CMD_NODE_CONNECT;
    1047             : #endif
    1048             :     OCommand( Connections( 1, connection ), cmd )
    1049          33 :         << getNodeID() << request << getType() << serialize();
    1050             : 
    1051          33 :     bool connected = false;
    1052             :     try
    1053             :     {
    1054          33 :         connected = request.wait( 10000 /*ms*/ );
    1055             :     }
    1056             :     catch( lunchbox::FutureTimeout& )
    1057             :     {
    1058             :         LBWARN << "Node connection handshake timeout - " << node
    1059             :                << " not a Collage node?" << std::endl;
    1060             :         request.relinquish();
                               // NOTE(review): the connection added through _addConnection()
                               // above is not removed on this path - confirm who cleans it up.
    1061             :         return CONNECT_TIMEOUT;
    1062             :     }
    1063             : 
    1064             :     // In simultaneous connect case, depending on the connection type
    1065             :     // (e.g. RDMA), a check on the connection state of the node is required
    1066          33 :     if( !connected || !node->isConnected( ))
    1067           0 :         return CONNECT_TRY_AGAIN;
    1068             : 
    1069          33 :     LBASSERT( node->getNodeID() != 0 );
    1070          33 :     LBASSERTINFO( node->getNodeID() != getNodeID(), getNodeID() );
    1071          33 :     LBINFO << node << " connected to " << *(Node*)this << std::endl;
    1072          33 :     return CONNECT_OK;
    1073             : }
    1074             : 
    1075          39 : NodePtr LocalNode::connectObjectMaster( const uint128_t& id )
    1076             : {
                           // Finds the master node of the object with the given id through the
                           // object store and connects to it. Returns the master node, or 0
                           // when the id is not a UUID, no master node id is found, or the
                           // master is missing or closed after connect().
    1077          39 :     LBASSERTINFO( id.isUUID(), id );
    1078          39 :     if( !id.isUUID( ))
    1079             :     {
    1080           0 :         LBWARN << "Invalid object id " << id << std::endl;
    1081           0 :         return 0;
    1082             :     }
    1083             : 
    1084          39 :     const NodeID masterNodeID = _impl->objectStore->findMasterNodeID( id );
    1085          39 :     if( masterNodeID == 0 )
    1086             :     {
    1087           0 :         LBWARN << "Can't find master node for object " << id << std::endl;
    1088           0 :         return 0;
    1089             :     }
    1090             : 
    1091          39 :     NodePtr master = connect( masterNodeID );
    1092          39 :     if( master && !master->isClosed( ))
    1093          39 :         return master;
    1094             : 
    1095           0 :     LBWARN << "Can't connect master node with id " << masterNodeID
    1096           0 :            << " for object " << id << std::endl;
    1097           0 :     return 0;
    1098             : }
    1099             : 
    1100          33 : NodePtr LocalNode::createNode( const uint32_t type )
    1101             : {
                           // Creates a plain Node of the given type. This implementation
                           // asserts that type is NODETYPE_NODE.
    1102          33 :     LBASSERTINFO( type == NODETYPE_NODE, type );
    1103          33 :     return new Node( type );
    1104             : }
    1105             : 
    1106           0 : NodePtr LocalNode::getNode( const NodeID& id ) const
    1107             : {
                           // Looks up id in the node map under a read lock. Returns the node,
                           // or 0 when it is unknown or not reachable.
    1108           0 :     lunchbox::ScopedFastRead mutex( _impl->nodes );
    1109           0 :     NodeHash::const_iterator i = _impl->nodes->find( id );
    1110           0 :     if( i == _impl->nodes->end() || !i->second->isReachable( ))
    1111           0 :         return 0;
    1112           0 :     return i->second;
    1113             : }
    1114             : 
    1115         111 : void LocalNode::getNodes( Nodes& nodes, const bool addSelf ) const
    1116             : {
                           // Appends every reachable node of the node map to nodes; this node
                           // itself is appended only when addSelf is true. Entries already in
                           // nodes are kept, the vector is not cleared here.
    1117         111 :     lunchbox::ScopedFastRead mutex( _impl->nodes );
    1118         326 :     for( NodeHashCIter i = _impl->nodes->begin(); i != _impl->nodes->end(); ++i)
    1119             :     {
    1120         215 :         NodePtr node = i->second;
    1121         215 :         if( node->isReachable() && ( addSelf || node != this ))
    1122         215 :             nodes.push_back( i->second );
    1123         326 :     }
    1124         111 : }
    1125             : 
    1126         138 : CommandQueue* LocalNode::getCommandThreadQueue()
    1127             : {
                           // Returns the worker queue of the command thread.
    1128         138 :     return _impl->commandThread->getWorkerQueue();
    1129             : }
    1130             : 
    1131           7 : bool LocalNode::inCommandThread() const
    1132             : {
                           // Returns the command thread's isCurrent() result.
    1133           7 :     return _impl->commandThread->isCurrent();
    1134             : }
    1135             : 
    1136       72410 : int64_t LocalNode::getTime64() const
    1137             : {
                           // Returns the current reading of this node's clock.
    1138       72410 :     return _impl->clock.getTime64();
    1139             : }
    1140             : 
    1141           0 : ssize_t LocalNode::getCounter( const Counter counter ) const
    1142             : {
                           // Returns the value stored for counter; counter is used directly
                           // as the index and is not range-checked here.
    1143           0 :     return _impl->counters[ counter ];
    1144             : }
    1145             : 
    1146         193 : void LocalNode::flushCommands()
    1147             : {
                           // Interrupts the incoming connection set. The receiver thread
                           // answers EVENT_INTERRUPT by calling _redispatchCommands().
    1148         193 :     _impl->incoming.interrupt();
    1149         193 : }
    1150             : 
    1151             : //----------------------------------------------------------------------
    1152             : // receiver thread functions
    1153             : //----------------------------------------------------------------------
    1154          46 : void LocalNode::_runReceiverThread()
    1155             : {
    1156          46 :     LB_TS_THREAD( _rcvThread );
    1157          46 :     _initService();
    1158             : 
    1159          46 :     int nErrors = 0;
    1160       72611 :     while( isListening( ))
    1161             :     {
    1162       72520 :         const ConnectionSet::Event result = _impl->incoming.select();
    1163       72519 :         switch( result )
    1164             :         {
    1165             :             case ConnectionSet::EVENT_CONNECT:
    1166          33 :                 _handleConnect();
    1167          33 :                 break;
    1168             : 
    1169             :             case ConnectionSet::EVENT_DATA:
    1170       72157 :                 _handleData();
    1171       72157 :                 break;
    1172             : 
    1173             :             case ConnectionSet::EVENT_DISCONNECT:
    1174             :             case ConnectionSet::EVENT_INVALID_HANDLE:
    1175          33 :                 _handleDisconnect();
    1176          33 :                 break;
    1177             : 
    1178             :             case ConnectionSet::EVENT_TIMEOUT:
    1179           0 :                 LBINFO << "select timeout" << std::endl;
    1180           0 :                 break;
    1181             : 
    1182             :             case ConnectionSet::EVENT_ERROR:
    1183           0 :                 ++nErrors;
    1184           0 :                 LBWARN << "Connection error during select" << std::endl;
    1185           0 :                 if( nErrors > 100 )
    1186             :                 {
    1187           0 :                     LBWARN << "Too many errors in a row, capping connection"
    1188           0 :                            << std::endl;
    1189           0 :                     _handleDisconnect();
    1190             :                 }
    1191           0 :                 break;
    1192             : 
    1193             :             case ConnectionSet::EVENT_SELECT_ERROR:
    1194           0 :                 LBWARN << "Error during select" << std::endl;
    1195           0 :                 ++nErrors;
    1196           0 :                 if( nErrors > 10 )
    1197             :                 {
    1198           0 :                     LBWARN << "Too many errors in a row" << std::endl;
    1199           0 :                     LBUNIMPLEMENTED;
    1200             :                 }
    1201           0 :                 break;
    1202             : 
    1203             :             case ConnectionSet::EVENT_INTERRUPT:
    1204         296 :                 _redispatchCommands();
    1205         296 :                 break;
    1206             : 
    1207             :             default:
    1208           0 :                 LBUNIMPLEMENTED;
    1209             :         }
    1210       72519 :         if( result != ConnectionSet::EVENT_ERROR &&
    1211             :             result != ConnectionSet::EVENT_SELECT_ERROR )
    1212             :         {
    1213       72519 :             nErrors = 0;
    1214             :         }
    1215             :     }
    1216             : 
    1217          45 :     if( !_impl->pendingCommands.empty( ))
    1218           0 :         LBWARN << _impl->pendingCommands.size()
    1219           0 :                << " commands pending while leaving command thread" << std::endl;
    1220             : 
    1221          45 :     _impl->pendingCommands.clear();
    1222          45 :     LBCHECK( _impl->commandThread->join( ));
    1223             : 
    1224          45 :     ConnectionPtr connection = getConnection();
    1225          90 :     PipeConnectionPtr pipe = LBSAFECAST( PipeConnection*, connection.get( ));
    1226          45 :     connection = pipe->getSibling();
    1227          45 :     _removeConnection( connection );
    1228          45 :     _impl->connectionNodes.erase( connection );
    1229          45 :     _disconnect();
    1230             : 
    1231          45 :     const Connections& connections = _impl->incoming.getConnections();
    1232         160 :     while( !connections.empty( ))
    1233             :     {
    1234          70 :         connection = connections.back();
    1235          70 :         NodePtr node = _impl->connectionNodes[ connection ];
    1236             : 
    1237          70 :         if( node )
    1238          70 :             _closeNode( node );
    1239          70 :         _removeConnection( connection );
    1240          70 :     }
    1241             : 
    1242          45 :     _impl->objectStore->clear();
    1243          45 :     _impl->pendingCommands.clear();
    1244          45 :     _impl->smallBuffers.flush();
    1245          45 :     _impl->bigBuffers.flush();
    1246             : 
    1247         225 :     LBINFO << "Leaving receiver thread of " << lunchbox::className( this )
    1248         225 :            << std::endl;
    1249          45 : }
    1250             : 
    1251          33 : void LocalNode::_handleConnect()
    1252             : {
    1253          33 :     ConnectionPtr connection = _impl->incoming.getConnection();
    1254          66 :     ConnectionPtr newConn = connection->acceptSync();
    1255          33 :     connection->acceptNB();
    1256             : 
    1257          33 :     if( newConn )
    1258          33 :         _addConnection( newConn );
    1259             :     else
    1260          33 :         LBINFO << "Received connect event, but accept() failed" << std::endl;
    1261          33 : }
    1262             : 
    1263          33 : void LocalNode::_handleDisconnect()
    1264             : {
    1265          33 :     while( _handleData( )) ; // read remaining data off connection
    1266             : 
    1267          33 :     ConnectionPtr connection = _impl->incoming.getConnection();
    1268          33 :     ConnectionNodeHash::iterator i = _impl->connectionNodes.find( connection );
    1269             : 
    1270          33 :     if( i != _impl->connectionNodes.end( ))
    1271             :     {
    1272          33 :         NodePtr node = i->second;
    1273             : 
    1274          33 :         node->ref(); // extend lifetime to give cmd handler a chance
    1275             : 
    1276             :         // local command dispatching
    1277             :         OCommand( this, this, CMD_NODE_REMOVE_NODE )
    1278          33 :                 << node.get() << uint32_t( LB_UNDEFINED_UINT32 );
    1279             : 
    1280          33 :         if( node->getConnection() == connection )
    1281          33 :             _closeNode( node );
    1282           0 :         else if( connection->isMulticast( ))
    1283           0 :             node->_removeMulticast( connection );
    1284             :     }
    1285             : 
    1286          33 :     _removeConnection( connection );
    1287          33 : }
    1288             : 
    1289       72190 : bool LocalNode::_handleData()
    1290             : {
    1291       72190 :     _impl->smallBuffers.compact();
    1292       72190 :     _impl->bigBuffers.compact();
    1293             : 
    1294       72190 :     ConnectionPtr connection = _impl->incoming.getConnection();
    1295       72190 :     LBASSERT( connection );
    1296             : 
    1297      144380 :     BufferPtr buffer = _readHead( connection );
    1298       72190 :     if( !buffer ) // fluke signal
    1299          66 :         return false;
    1300             : 
    1301      144248 :     ICommand command = _setupCommand( connection, buffer );
    1302       72124 :     const bool gotCommand = _readTail( command, buffer, connection );
    1303       72124 :     LBASSERT( gotCommand );
    1304             : 
    1305             :     // start next receive
    1306      144248 :     BufferPtr nextBuffer = _impl->smallBuffers.alloc( COMMAND_ALLOCSIZE );
    1307       72124 :     connection->recvNB( nextBuffer, COMMAND_MINSIZE );
    1308             : 
    1309       72124 :     if( gotCommand )
    1310             :     {
    1311       72124 :         _dispatchCommand( command );
    1312       72124 :         return true;
    1313             :     }
    1314             : 
    1315           0 :     LBERROR << "Incomplete command read: " << command << std::endl;
    1316       72190 :     return false;
    1317             : }
    1318             : 
    1319       72190 : BufferPtr LocalNode::_readHead( ConnectionPtr connection )
    1320             : {
    1321       72190 :     BufferPtr buffer;
    1322       72190 :     const bool gotSize = connection->recvSync( buffer, false );
    1323             : 
    1324       72190 :     if( !buffer ) // fluke signal
    1325             :     {
    1326           0 :         LBWARN << "Erronous network event on " << connection->getDescription()
    1327           0 :                << std::endl;
    1328           0 :         _impl->incoming.setDirty();
    1329           0 :         return 0;
    1330             :     }
    1331             : 
    1332       72190 :     if( gotSize )
    1333       72124 :         return buffer;
    1334             : 
    1335             :     // Some systems signal data on dead connections.
    1336          66 :     buffer->setSize( 0 );
    1337          66 :     connection->recvNB( buffer, COMMAND_MINSIZE );
    1338          66 :     return 0;
    1339             : }
    1340             : 
    1341       72124 : ICommand LocalNode::_setupCommand( ConnectionPtr connection,
    1342             :                                    ConstBufferPtr buffer )
    1343             : {
    1344       72124 :     NodePtr node;
    1345       72124 :     ConnectionNodeHashCIter i = _impl->connectionNodes.find( connection );
    1346       72124 :     if( i != _impl->connectionNodes.end( ))
    1347       72058 :         node = i->second;
    1348       72124 :     LBVERB << "Handle data from " << node << std::endl;
    1349             : 
    1350             : #ifdef COLLAGE_BIGENDIAN
    1351             :     const bool swapping = node ? !node->isBigEndian() : false;
    1352             : #else
    1353       72124 :     const bool swapping = node ? node->isBigEndian() : false;
    1354             : #endif
    1355      144248 :     ICommand command( this, node, buffer, swapping );
    1356             : 
    1357       72124 :     if( node )
    1358             :     {
    1359       72058 :         node->_setLastReceive( getTime64( ));
    1360       72058 :         return command;
    1361             :     }
    1362             : 
    1363          66 :     uint32_t cmd = command.getCommand();
    1364             : #ifdef COLLAGE_BIGENDIAN
    1365             :     lunchbox::byteswap( cmd ); // pre-node commands are sent little endian
    1366             : #endif
    1367          66 :     switch( cmd )
    1368             :     {
    1369             :     case CMD_NODE_CONNECT:
    1370             :     case CMD_NODE_CONNECT_REPLY:
    1371             :     case CMD_NODE_ID:
    1372             : #ifdef COLLAGE_BIGENDIAN
    1373             :         command = ICommand( this, node, buffer, true );
    1374             : #endif
    1375          66 :         break;
    1376             : 
    1377             :     case CMD_NODE_CONNECT_BE:
    1378             :     case CMD_NODE_CONNECT_REPLY_BE:
    1379             :     case CMD_NODE_ID_BE:
    1380             : #ifndef COLLAGE_BIGENDIAN
    1381           0 :         command = ICommand( this, node, buffer, true );
    1382             : #endif
    1383           0 :         break;
    1384             : 
    1385             :     default:
    1386           0 :         LBUNIMPLEMENTED;
    1387           0 :         return ICommand();
    1388             :     }
    1389             : 
    1390          66 :     command.setCommand( cmd ); // reset correctly swapped version
    1391       72190 :     return command;
    1392             : }
    1393             : 
    1394       72124 : bool LocalNode::_readTail( ICommand& command, BufferPtr buffer,
    1395             :                            ConnectionPtr connection )
    1396             : {
    1397       72124 :     const uint64_t needed = command.getSize();
    1398       72124 :     if( needed <= buffer->getSize( ))
    1399       62112 :         return true;
    1400             : 
    1401       10011 :     if( needed > buffer->getMaxSize( ))
    1402             :     {
    1403           0 :         LBASSERT( needed > COMMAND_ALLOCSIZE );
    1404           0 :         LBASSERTINFO( needed < LB_BIT48,
    1405             :                       "Out-of-sync network stream: " << command << "?" );
    1406             :         // not enough space for remaining data, alloc and copy to new buffer
    1407           0 :         BufferPtr newBuffer = _impl->bigBuffers.alloc( needed );
    1408           0 :         newBuffer->replace( *buffer );
    1409           0 :         buffer = newBuffer;
    1410             : 
    1411           0 :         command = ICommand( this, command.getRemoteNode(), buffer,
    1412           0 :                             command.isSwapping( ));
    1413             :     }
    1414             : 
    1415             :     // read remaining data
    1416       10011 :     connection->recvNB( buffer, command.getSize() - buffer->getSize( ));
    1417       10011 :     return connection->recvSync( buffer );
    1418             : }
    1419             : 
    1420          34 : BufferPtr LocalNode::allocBuffer( const uint64_t size )
    1421             : {
    1422          34 :     LBASSERT( _impl->receiverThread->isStopped() || _impl->inReceiverThread( ));
    1423             :     BufferPtr buffer = size > COMMAND_ALLOCSIZE ?
    1424             :         _impl->bigBuffers.alloc( size ) :
    1425          34 :         _impl->smallBuffers.alloc( COMMAND_ALLOCSIZE );
    1426          34 :     return buffer;
    1427             : }
    1428             : 
    1429       72169 : void LocalNode::_dispatchCommand( ICommand& command )
    1430             : {
    1431       72169 :     LBASSERTINFO( command.isValid(), command );
    1432             : 
    1433       72169 :     if( dispatchCommand( command ))
    1434       72169 :         _redispatchCommands();
    1435             :     else
    1436             :     {
    1437           0 :         _redispatchCommands();
    1438           0 :         _impl->pendingCommands.push_back( command );
    1439             :     }
    1440       72169 : }
    1441             : 
    1442       72205 : bool LocalNode::dispatchCommand( ICommand& command )
    1443             : {
    1444       72205 :     LBVERB << "dispatch " << command << " by " << getNodeID() << std::endl;
    1445       72205 :     LBASSERTINFO( command.isValid(), command );
    1446             : 
    1447       72205 :     const uint32_t type = command.getType();
    1448       72205 :     switch( type )
    1449             :     {
    1450             :         case COMMANDTYPE_NODE:
    1451       71697 :             LBCHECK( Dispatcher::dispatchCommand( command ));
    1452       71697 :             return true;
    1453             : 
    1454             :         case COMMANDTYPE_OBJECT:
    1455         508 :             return _impl->objectStore->dispatchObjectCommand( command );
    1456             : 
    1457             :         default:
    1458           0 :             LBABORT( "Unknown command type " << type << " for " << command );
    1459           0 :             return true;
    1460             :     }
    1461             : }
    1462             : 
    1463       72465 : void LocalNode::_redispatchCommands()
    1464             : {
    1465       72465 :     bool changes = true;
    1466      144930 :     while( changes && !_impl->pendingCommands.empty( ))
    1467             :     {
    1468           0 :         changes = false;
    1469             : 
    1470           0 :         for( CommandList::iterator i = _impl->pendingCommands.begin();
    1471           0 :              i != _impl->pendingCommands.end(); ++i )
    1472             :         {
    1473           0 :             ICommand& command = *i;
    1474           0 :             LBASSERT( command.isValid( ));
    1475             : 
    1476           0 :             if( dispatchCommand( command ))
    1477             :             {
    1478           0 :                 _impl->pendingCommands.erase( i );
    1479           0 :                 changes = true;
    1480           0 :                 break;
    1481             :             }
    1482             :         }
    1483             :     }
    1484             : 
    1485             : #ifndef NDEBUG
    1486       72465 :     if( !_impl->pendingCommands.empty( ))
    1487           0 :         LBVERB << _impl->pendingCommands.size() << " undispatched commands"
    1488           0 :                << std::endl;
    1489       72465 :     LBASSERT( _impl->pendingCommands.size() < 200 );
    1490             : #endif
    1491       72465 : }
    1492             : 
    1493          46 : void LocalNode::_initService()
    1494             : {
    1495          46 :     LB_TS_SCOPED( _rcvThread );
    1496          46 :     _impl->service->withdraw(); // go silent during k/v update
    1497             : 
    1498          90 :     const ConnectionDescriptions& descs = getConnectionDescriptions();
    1499          46 :     if( descs.empty( ))
    1500          48 :         return;
    1501             : 
    1502          88 :     std::ostringstream out;
    1503          44 :     out << getType();
    1504          44 :     _impl->service->set( "co_type", out.str( ));
    1505             : 
    1506          44 :     out.str("");
    1507          44 :     out << descs.size();
    1508          44 :     _impl->service->set( "co_numPorts", out.str( ));
    1509             : 
    1510          88 :     for( ConnectionDescriptionsCIter i = descs.begin(); i != descs.end(); ++i )
    1511             :     {
    1512          44 :         ConnectionDescriptionPtr desc = *i;
    1513          44 :         out.str("");
    1514          44 :         out << "co_port" << i - descs.begin();
    1515          44 :         _impl->service->set( out.str(), desc->toString( ));
    1516          44 :     }
    1517             : 
    1518          88 :     _impl->service->announce( descs.front()->port, getNodeID().getString( ));
    1519             : }
    1520             : 
    1521          45 : void LocalNode::_exitService()
    1522             : {
    1523          45 :     _impl->service->withdraw();
    1524          45 : }
    1525             : 
    1526           0 : Zeroconf LocalNode::getZeroconf()
    1527             : {
    1528           0 :     lunchbox::ScopedWrite mutex( _impl->service );
    1529           0 :     _impl->service->discover( lunchbox::Servus::IF_ALL, 500 );
    1530           0 :     return Zeroconf( _impl->service.data );
    1531             : }
    1532             : 
    1533             : 
    1534             : //----------------------------------------------------------------------
    1535             : // command thread functions
    1536             : //----------------------------------------------------------------------
    1537          46 : bool LocalNode::_startCommandThread( const int32_t threadID )
    1538             : {
    1539          46 :     _impl->commandThread->threadID = threadID;
    1540          46 :     return _impl->commandThread->start();
    1541             : }
    1542             : 
    1543       50596 : bool LocalNode::_notifyCommandThreadIdle()
    1544             : {
    1545       50596 :     return _impl->objectStore->notifyCommandThreadIdle();
    1546             : }
    1547             : 
    1548       20001 : bool LocalNode::_cmdAckRequest( ICommand& command )
    1549             : {
    1550       20001 :     const uint32_t requestID = command.get< uint32_t >();
    1551       20001 :     LBASSERT( requestID != LB_UNDEFINED_UINT32 );
    1552             : 
    1553       20001 :     serveRequest( requestID );
    1554       20001 :     return true;
    1555             : }
    1556             : 
    1557          45 : bool LocalNode::_cmdStopRcv( ICommand& command )
    1558             : {
    1559          45 :     LB_TS_THREAD( _rcvThread );
    1560          45 :     LBASSERT( isListening( ));
    1561             : 
    1562          45 :     _exitService();
    1563          45 :     _setClosing(); // causes rcv thread exit
    1564             : 
    1565          45 :     command.setCommand( CMD_NODE_STOP_CMD ); // causes cmd thread exit
    1566          45 :     _dispatchCommand( command );
    1567          45 :     return true;
    1568             : }
    1569             : 
    1570          45 : bool LocalNode::_cmdStopCmd( ICommand& )
    1571             : {
    1572          45 :     LB_TS_THREAD( _cmdThread );
    1573          45 :     LBASSERTINFO( isClosing(), *this );
    1574             : 
    1575          45 :     _setClosed();
    1576          45 :     return true;
    1577             : }
    1578             : 
    1579           0 : bool LocalNode::_cmdSetAffinity( ICommand& command )
    1580             : {
    1581           0 :     const int32_t affinity = command.get< int32_t >();
    1582             : 
    1583           0 :     lunchbox::Thread::setAffinity( affinity );
    1584           0 :     return true;
    1585             : }
    1586             : 
    1587          33 : bool LocalNode::_cmdConnect( ICommand& command )
    1588             : {
    1589          33 :     LBASSERTINFO( !command.getRemoteNode(), command );
    1590          33 :     LBASSERT( _impl->inReceiverThread( ));
    1591             : 
    1592          33 :     const NodeID& nodeID = command.get< NodeID >();
    1593          33 :     const uint32_t requestID = command.get< uint32_t >();
    1594          33 :     const uint32_t nodeType = command.get< uint32_t >();
    1595          33 :     std::string data = command.get< std::string >();
    1596             : 
    1597          33 :     LBVERB << "handle connect " << command << " req " << requestID << " type "
    1598          33 :            << nodeType << " data " << data << std::endl;
    1599             : 
    1600          66 :     ConnectionPtr connection = _impl->incoming.getConnection();
    1601             : 
    1602          33 :     LBASSERT( connection );
    1603          33 :     LBASSERT( nodeID != getNodeID() );
    1604          33 :     LBASSERT( _impl->connectionNodes.find( connection ) ==
    1605             :               _impl->connectionNodes.end( ));
    1606             : 
    1607          66 :     NodePtr peer;
    1608             : #ifdef COLLAGE_BIGENDIAN
    1609             :     uint32_t cmd = CMD_NODE_CONNECT_REPLY_BE;
    1610             :     lunchbox::byteswap( cmd );
    1611             : #else
    1612          33 :     const uint32_t cmd = CMD_NODE_CONNECT_REPLY;
    1613             : #endif
    1614             : 
    1615             :     // No locking needed, only recv thread modifies
    1616          33 :     NodeHashCIter i = _impl->nodes->find( nodeID );
    1617          33 :     if( i != _impl->nodes->end( ))
    1618             :     {
    1619           0 :         peer = i->second;
    1620           0 :         if( peer->isReachable( ))
    1621             :         {
    1622             :             // Node exists, probably simultaneous connect from peer
    1623           0 :             LBINFO << "Already got node " << nodeID << ", refusing connect"
    1624           0 :                    << std::endl;
    1625             : 
    1626             :             // refuse connection
    1627             :             OCommand( Connections( 1, connection ), cmd )
    1628           0 :                 << NodeID() << requestID;
    1629             : 
    1630             :             // NOTE: There is no close() here. The reply command above has to be
    1631             :             // received by the peer first, before closing the connection.
    1632           0 :             _removeConnection( connection );
    1633           0 :             return true;
    1634             :         }
    1635             :     }
    1636             : 
    1637             :     // create and add connected node
    1638          33 :     if( !peer )
    1639          33 :         peer = createNode( nodeType );
    1640          33 :     if( !peer )
    1641             :     {
    1642           0 :         LBINFO << "Can't create node of type " << nodeType << ", disconnecting"
    1643           0 :                << std::endl;
    1644             : 
    1645             :         // refuse connection
    1646           0 :         OCommand( Connections( 1, connection ), cmd ) << NodeID() << requestID;
    1647             : 
    1648             :         // NOTE: There is no close() here. The reply command above has to be
    1649             :         // received by the peer first, before closing the connection.
    1650           0 :         _removeConnection( connection );
    1651           0 :         return true;
    1652             :     }
    1653             : 
    1654          33 :     if( !peer->deserialize( data ))
    1655           0 :         LBWARN << "Error during node initialization" << std::endl;
    1656          33 :     LBASSERTINFO( data.empty(), data );
    1657          33 :     LBASSERTINFO( peer->getNodeID() == nodeID,
    1658             :                   peer->getNodeID() << "!=" << nodeID );
    1659          33 :     LBASSERT( peer->getType() == nodeType );
    1660             : 
    1661          33 :     _impl->connectionNodes[ connection ] = peer;
    1662             :     {
    1663          33 :         lunchbox::ScopedFastWrite mutex( _impl->nodes );
    1664          33 :         _impl->nodes.data[ peer->getNodeID() ] = peer;
    1665             :     }
    1666          33 :     LBVERB << "Added node " << nodeID << std::endl;
    1667             : 
    1668             :     // send our information as reply
    1669             :     OCommand( Connections( 1, connection ), cmd )
    1670          33 :         << getNodeID() << requestID << getType() << serialize();
    1671             : 
    1672          66 :     return true;
    1673             : }
    1674             : 
    1675          33 : bool LocalNode::_cmdConnectReply( ICommand& command )
    1676             : {
    1677          33 :     LBASSERT( !command.getRemoteNode( ));
    1678          33 :     LBASSERT( _impl->inReceiverThread( ));
    1679             : 
    1680          33 :     ConnectionPtr connection = _impl->incoming.getConnection();
    1681          33 :     LBASSERT( _impl->connectionNodes.find( connection ) ==
    1682             :               _impl->connectionNodes.end( ));
    1683             : 
    1684          33 :     const NodeID& nodeID = command.get< NodeID >();
    1685          33 :     const uint32_t requestID = command.get< uint32_t >();
    1686             : 
    1687             :     // connection refused
    1688          33 :     if( nodeID == 0 )
    1689             :     {
    1690           0 :         LBINFO << "Connection refused, node already connected by peer"
    1691           0 :                << std::endl;
    1692             : 
    1693           0 :         _removeConnection( connection );
    1694           0 :         serveRequest( requestID, false );
    1695           0 :         return true;
    1696             :     }
    1697             : 
    1698          33 :     const uint32_t nodeType = command.get< uint32_t >();
    1699          66 :     std::string data = command.get< std::string >();
    1700             : 
    1701          33 :     LBVERB << "handle connect reply " << command << " req " << requestID
    1702          33 :            << " type " << nodeType << " data " << data << std::endl;
    1703             : 
    1704             :     // No locking needed, only recv thread modifies
    1705          33 :     NodeHash::const_iterator i = _impl->nodes->find( nodeID );
    1706          66 :     NodePtr peer;
    1707          33 :     if( i != _impl->nodes->end( ))
    1708           0 :         peer = i->second;
    1709             : 
    1710          33 :     if( peer && peer->isReachable( )) // simultaneous connect
    1711             :     {
    1712           0 :         LBINFO << "Closing simultaneous connection from " << peer << " on "
    1713           0 :                << connection << std::endl;
    1714             : 
    1715           0 :         _removeConnection( connection );
    1716           0 :         _closeNode( peer );
    1717           0 :         serveRequest( requestID, false );
    1718           0 :         return true;
    1719             :     }
    1720             : 
    1721             :     // create and add node
    1722          33 :     if( !peer )
    1723             :     {
    1724          33 :         if( requestID != LB_UNDEFINED_UINT32 )
    1725          33 :             peer = reinterpret_cast< Node* >( getRequestData( requestID ));
    1726             :         else
    1727           0 :             peer = createNode( nodeType );
    1728             :     }
    1729          33 :     if( !peer )
    1730             :     {
    1731           0 :         LBINFO << "Can't create node of type " << nodeType << ", disconnecting"
    1732           0 :                << std::endl;
    1733           0 :         _removeConnection( connection );
    1734           0 :         return true;
    1735             :     }
    1736             : 
    1737          33 :     LBASSERTINFO( peer->getType() == nodeType,
    1738             :                   peer->getType() << " != " << nodeType );
    1739          33 :     LBASSERT( peer->isClosed( ));
    1740             : 
    1741          33 :     if( !peer->deserialize( data ))
    1742           0 :         LBWARN << "Error during node initialization" << std::endl;
    1743          33 :     LBASSERT( data.empty( ));
    1744          33 :     LBASSERT( peer->getNodeID() == nodeID );
    1745             : 
    1746             :     // send ACK to peer
    1747             :     // cppcheck-suppress unusedScopedObject
    1748          33 :     OCommand( Connections( 1, connection ), CMD_NODE_CONNECT_ACK );
    1749             : 
    1750          33 :     peer->_connect( connection );
    1751          33 :     _impl->connectionNodes[ connection ] = peer;
    1752             :     {
    1753          33 :         lunchbox::ScopedFastWrite mutex( _impl->nodes );
    1754          33 :         _impl->nodes.data[ peer->getNodeID() ] = peer;
    1755             :     }
    1756          33 :     _connectMulticast( peer );
    1757          33 :     LBVERB << "Added node " << nodeID << std::endl;
    1758             : 
    1759          33 :     serveRequest( requestID, true );
    1760             : 
    1761          33 :     notifyConnect( peer );
    1762          66 :     return true;
    1763             : }
    1764             : 
    1765          33 : bool LocalNode::_cmdConnectAck( ICommand& command )
    1766             : {
    1767          33 :     NodePtr node = command.getRemoteNode();
    1768          33 :     LBASSERT( node );
    1769          33 :     LBASSERT( _impl->inReceiverThread( ));
    1770          33 :     LBVERB << "handle connect ack" << std::endl;
    1771             : 
    1772          33 :     node->_connect( _impl->incoming.getConnection( ));
    1773          33 :     _connectMulticast( node );
    1774          33 :     notifyConnect( node );
    1775          33 :     return true;
    1776             : }
    1777             : 
    1778           0 : bool LocalNode::_cmdID( ICommand& command )
    1779             : {
    1780           0 :     LBASSERT( _impl->inReceiverThread( ));
    1781             : 
    1782           0 :     const NodeID& nodeID = command.get< NodeID >();
    1783           0 :     uint32_t nodeType = command.get< uint32_t >();
    1784           0 :     std::string data = command.get< std::string >();
    1785             : 
    1786           0 :     if( command.getRemoteNode( ))
    1787             :     {
    1788           0 :         LBASSERT( nodeID == command.getRemoteNode()->getNodeID( ));
    1789           0 :         LBASSERT( command.getRemoteNode()->_getMulticast( ));
    1790           0 :         return true;
    1791             :     }
    1792             : 
    1793           0 :     LBINFO << "handle ID " << command << " node " << nodeID << std::endl;
    1794             : 
    1795           0 :     ConnectionPtr connection = _impl->incoming.getConnection();
    1796           0 :     LBASSERT( connection->isMulticast( ));
    1797           0 :     LBASSERT( _impl->connectionNodes.find( connection ) ==
    1798             :               _impl->connectionNodes.end( ));
    1799             : 
    1800           0 :     NodePtr node;
    1801           0 :     if( nodeID == getNodeID() ) // 'self' multicast connection
    1802           0 :         node = this;
    1803             :     else
    1804             :     {
    1805             :         // No locking needed, only recv thread writes
    1806           0 :         NodeHash::const_iterator i = _impl->nodes->find( nodeID );
    1807             : 
    1808           0 :         if( i == _impl->nodes->end( ))
    1809             :         {
    1810             :             // unknown node: create and add unconnected node
    1811           0 :             node = createNode( nodeType );
    1812             : 
    1813           0 :             if( !node->deserialize( data ))
    1814           0 :                 LBWARN << "Error during node initialization" << std::endl;
    1815           0 :             LBASSERTINFO( data.empty(), data );
    1816             : 
    1817             :             {
    1818           0 :                 lunchbox::ScopedFastWrite mutex( _impl->nodes );
    1819           0 :                 _impl->nodes.data[ nodeID ] = node;
    1820             :             }
    1821           0 :             LBVERB << "Added node " << nodeID << " with multicast "
    1822           0 :                    << connection << std::endl;
    1823             :         }
    1824             :         else
    1825           0 :             node = i->second;
    1826             :     }
    1827           0 :     LBASSERT( node );
    1828           0 :     LBASSERTINFO( node->getNodeID() == nodeID,
    1829             :                   node->getNodeID() << "!=" << nodeID );
    1830             : 
    1831           0 :     _connectMulticast( node, connection );
    1832           0 :     _impl->connectionNodes[ connection ] = node;
    1833           0 :     LBINFO << "Added multicast connection " << connection << " from " << nodeID
    1834           0 :            << " to " << getNodeID() << std::endl;
    1835           0 :     return true;
    1836             : }
    1837             : 
    1838           7 : bool LocalNode::_cmdDisconnect( ICommand& command )
    1839             : {
    1840           7 :     LBASSERT( _impl->inReceiverThread( ));
    1841             : 
    1842           7 :     const uint32_t requestID = command.get< uint32_t >();
    1843             : 
    1844           7 :     NodePtr node = static_cast<Node*>( getRequestData( requestID ));
    1845           7 :     LBASSERT( node );
    1846             : 
    1847           7 :     _closeNode( node );
    1848           7 :     LBASSERT( node->isClosed( ));
    1849           7 :     serveRequest( requestID );
    1850           7 :     return true;
    1851             : }
    1852             : 
    1853           0 : bool LocalNode::_cmdGetNodeData( ICommand& command )
    1854             : {
    1855           0 :     const NodeID& nodeID = command.get< NodeID >();
    1856           0 :     const uint32_t requestID = command.get< uint32_t >();
    1857             : 
    1858           0 :     LBVERB << "cmd get node data: " << command << " req " << requestID
    1859           0 :            << " nodeID " << nodeID << std::endl;
    1860             : 
    1861           0 :     NodePtr node = getNode( nodeID );
    1862           0 :     NodePtr toNode = command.getRemoteNode();
    1863             : 
    1864           0 :     uint32_t nodeType = NODETYPE_INVALID;
    1865           0 :     std::string nodeData;
    1866           0 :     if( node )
    1867             :     {
    1868           0 :         nodeType = node->getType();
    1869           0 :         nodeData = node->serialize();
    1870           0 :         LBINFO << "Sent node data '" << nodeData << "' for " << nodeID << " to "
    1871           0 :                << toNode << std::endl;
    1872             :     }
    1873             : 
    1874             :     toNode->send( CMD_NODE_GET_NODE_DATA_REPLY )
    1875           0 :         << nodeID << requestID << nodeType << nodeData;
    1876           0 :     return true;
    1877             : }
    1878             : 
    1879           0 : bool LocalNode::_cmdGetNodeDataReply( ICommand& command )
    1880             : {
    1881           0 :     LBASSERT( _impl->inReceiverThread( ));
    1882             : 
    1883           0 :     const NodeID& nodeID = command.get< NodeID >();
    1884           0 :     const uint32_t requestID = command.get< uint32_t >();
    1885           0 :     const uint32_t nodeType = command.get< uint32_t >();
    1886           0 :     std::string nodeData = command.get< std::string >();
    1887             : 
    1888           0 :     LBVERB << "cmd get node data reply: " << command << " req " << requestID
    1889           0 :            << " type " << nodeType << " data " << nodeData << std::endl;
    1890             : 
    1891             :     // No locking needed, only recv thread writes
    1892           0 :     NodeHash::const_iterator i = _impl->nodes->find( nodeID );
    1893           0 :     if( i != _impl->nodes->end( ))
    1894             :     {
    1895             :         // Requested node connected to us in the meantime
    1896           0 :         NodePtr node = i->second;
    1897             : 
    1898           0 :         node->ref( this );
    1899           0 :         serveRequest( requestID, node.get( ));
    1900           0 :         return true;
    1901             :     }
    1902             : 
    1903           0 :     if( nodeType == NODETYPE_INVALID )
    1904             :     {
    1905           0 :         serveRequest( requestID, (void*)0 );
    1906           0 :         return true;
    1907             :     }
    1908             : 
    1909             :     // new node: create and add unconnected node
    1910           0 :     NodePtr node = createNode( nodeType );
    1911           0 :     if( node )
    1912             :     {
    1913           0 :         LBASSERT( node );
    1914             : 
    1915           0 :         if( !node->deserialize( nodeData ))
    1916           0 :             LBWARN << "Failed to initialize node data" << std::endl;
    1917           0 :         LBASSERT( nodeData.empty( ));
    1918           0 :         node->ref( this );
    1919             :     }
    1920             :     else
    1921           0 :         LBINFO << "Can't create node of type " << nodeType << std::endl;
    1922             : 
    1923           0 :     serveRequest( requestID, node.get( ));
    1924           0 :     return true;
    1925             : }
    1926             : 
    1927           0 : bool LocalNode::_cmdAcquireSendToken( ICommand& command )
    1928             : {
    1929           0 :     LBASSERT( inCommandThread( ));
    1930           0 :     if( !_impl->sendToken ) // enqueue command if no token available
    1931             :     {
    1932           0 :         const uint32_t timeout = Global::getTimeout();
    1933           0 :         if( timeout == LB_TIMEOUT_INDEFINITE ||
    1934           0 :             ( getTime64() - _impl->lastSendToken <= timeout ))
    1935             :         {
    1936           0 :             _impl->sendTokenQueue.push_back( command );
    1937           0 :             return true;
    1938             :         }
    1939             : 
    1940             :         // timeout! - clear old requests
    1941           0 :         _impl->sendTokenQueue.clear();
    1942             :         // 'generate' new token - release is robust
    1943             :     }
    1944             : 
    1945           0 :     _impl->sendToken = false;
    1946             : 
    1947           0 :     const uint32_t requestID = command.get< uint32_t >();
    1948           0 :     command.getRemoteNode()->send( CMD_NODE_ACQUIRE_SEND_TOKEN_REPLY )
    1949           0 :             << requestID;
    1950           0 :     return true;
    1951             : }
    1952             : 
    1953           0 : bool LocalNode::_cmdAcquireSendTokenReply( ICommand& command )
    1954             : {
    1955           0 :     const uint32_t requestID = command.get< uint32_t >();
    1956           0 :     serveRequest( requestID );
    1957           0 :     return true;
    1958             : }
    1959             : 
    1960           0 : bool LocalNode::_cmdReleaseSendToken( ICommand& )
    1961             : {
    1962           0 :     LBASSERT( inCommandThread( ));
    1963           0 :     _impl->lastSendToken = getTime64();
    1964             : 
    1965           0 :     if( _impl->sendToken )
    1966           0 :         return true; // double release due to timeout
    1967           0 :     if( _impl->sendTokenQueue.empty( ))
    1968             :     {
    1969           0 :         _impl->sendToken = true;
    1970           0 :         return true;
    1971             :     }
    1972             : 
    1973           0 :     ICommand& request = _impl->sendTokenQueue.front();
    1974             : 
    1975           0 :     const uint32_t requestID = request.get< uint32_t >();
    1976           0 :     request.getRemoteNode()->send( CMD_NODE_ACQUIRE_SEND_TOKEN_REPLY )
    1977           0 :             << requestID;
    1978           0 :     _impl->sendTokenQueue.pop_front();
    1979           0 :     return true;
    1980             : }
    1981             : 
    1982           0 : bool LocalNode::_cmdAddListener( ICommand& command )
    1983             : {
    1984             :     Connection* rawConnection =
    1985           0 :         reinterpret_cast< Connection* >( command.get< uint64_t >( ));
    1986           0 :     std::string data = command.get< std::string >();
    1987             : 
    1988           0 :     ConnectionDescriptionPtr description = new ConnectionDescription( data );
    1989           0 :     command.getRemoteNode()->_addConnectionDescription( description );
    1990             : 
    1991           0 :     if( command.getRemoteNode() != this )
    1992           0 :         return true;
    1993             : 
    1994           0 :     ConnectionPtr connection = rawConnection;
    1995           0 :     connection->unref();
    1996           0 :     LBASSERT( connection );
    1997             : 
    1998           0 :     _impl->connectionNodes[ connection ] = this;
    1999           0 :     if( connection->isMulticast( ))
    2000           0 :         _addMulticast( this, connection );
    2001             : 
    2002           0 :     connection->acceptNB();
    2003           0 :     _impl->incoming.addConnection( connection );
    2004             : 
    2005           0 :     _initService(); // update zeroconf
    2006           0 :     return true;
    2007             : }
    2008             : 
    2009           0 : bool LocalNode::_cmdRemoveListener( ICommand& command )
    2010             : {
    2011           0 :     const uint32_t requestID = command.get< uint32_t >();
    2012           0 :     Connection* rawConnection = command.get< Connection* >();
    2013           0 :     std::string data = command.get< std::string >();
    2014             : 
    2015           0 :     ConnectionDescriptionPtr description = new ConnectionDescription( data );
    2016           0 :     LBCHECK(
    2017             :           command.getRemoteNode()->_removeConnectionDescription( description ));
    2018             : 
    2019           0 :     if( command.getRemoteNode() != this )
    2020           0 :         return true;
    2021             : 
    2022           0 :     _initService(); // update zeroconf
    2023             : 
    2024           0 :     ConnectionPtr connection = rawConnection;
    2025           0 :     connection->unref( this );
    2026           0 :     LBASSERT( connection );
    2027             : 
    2028           0 :     if( connection->isMulticast( ))
    2029           0 :         _removeMulticast( connection );
    2030             : 
    2031           0 :     _impl->incoming.removeConnection( connection );
    2032           0 :     LBASSERT( _impl->connectionNodes.find( connection ) !=
    2033             :               _impl->connectionNodes.end( ));
    2034           0 :     _impl->connectionNodes.erase( connection );
    2035           0 :     serveRequest( requestID );
    2036           0 :     return true;
    2037             : }
    2038             : 
    2039           0 : bool LocalNode::_cmdPing( ICommand& command )
    2040             : {
    2041           0 :     LBASSERT( inCommandThread( ));
    2042           0 :     command.getRemoteNode()->send( CMD_NODE_PING_REPLY );
    2043           0 :     return true;
    2044             : }
    2045             : 
    2046           2 : bool LocalNode::_cmdCommand( ICommand& command )
    2047             : {
    2048           2 :     const uint128_t& commandID = command.get< uint128_t >();
    2049           2 :     CommandHandler func;
    2050             :     {
    2051           2 :         lunchbox::ScopedFastRead mutex( _impl->commandHandlers );
    2052           2 :         CommandHashCIter i = _impl->commandHandlers->find( commandID );
    2053           2 :         if( i == _impl->commandHandlers->end( ))
    2054           0 :             return false;
    2055             : 
    2056           2 :         CommandQueue* queue = i->second.second;
    2057           2 :         if( queue )
    2058             :         {
    2059             :             command.setDispatchFunction( CmdFunc( this,
    2060           1 :                                                 &LocalNode::_cmdCommandAsync ));
    2061           1 :             queue->push( command );
    2062           1 :             return true;
    2063             :         }
    2064             :         // else
    2065             : 
    2066           1 :         func = i->second.first;
    2067             :     }
    2068             : 
    2069           2 :     CustomICommand customCmd( command );
    2070           3 :     return func( customCmd );
    2071             : }
    2072             : 
    2073           1 : bool LocalNode::_cmdCommandAsync( ICommand& command )
    2074             : {
    2075           1 :     const uint128_t& commandID = command.get< uint128_t >();
    2076           1 :     CommandHandler func;
    2077             :     {
    2078           1 :         lunchbox::ScopedFastRead mutex( _impl->commandHandlers );
    2079           1 :         CommandHashCIter i = _impl->commandHandlers->find( commandID );
    2080           1 :         LBASSERT( i != _impl->commandHandlers->end( ));
    2081           1 :         if( i ==  _impl->commandHandlers->end( ))
    2082           0 :             return true; // deregistered between dispatch and now
    2083           1 :         func = i->second.first;
    2084             :     }
    2085           2 :     CustomICommand customCmd( command );
    2086           2 :     return func( customCmd );
    2087             : }
    2088             : 
    2089          33 : bool LocalNode::_cmdAddConnection( ICommand& command )
    2090             : {
    2091          33 :     LBASSERT( _impl->inReceiverThread( ));
    2092             : 
    2093          33 :     ConnectionPtr connection = command.get< ConnectionPtr >();
    2094          33 :     _addConnection( connection );
    2095          33 :     connection->unref(); // ref'd by _addConnection
    2096          33 :     return true;
    2097             : }
    2098             : 
    2099          60 : }

Generated by: LCOV version 1.10