LCOV - code coverage report
Current view: top level - co - objectStore.cpp (source / functions) Hit Total Coverage
Test: lcov2.info Lines: 539 742 72.6 %
Date: 2014-10-06 Functions: 42 48 87.5 %

          Line data    Source code
       1             : 
       2             : /* Copyright (c) 2005-2014, Stefan Eilemann <eile@equalizergraphics.com>
       3             :  *                    2010, Cedric Stalder <cedric.stalder@gmail.com>
       4             :  *               2011-2012, Daniel Nachbaur <danielnachbaur@gmail.com>
       5             :  *
       6             :  * This file is part of Collage <https://github.com/Eyescale/Collage>
       7             :  *
       8             :  * This library is free software; you can redistribute it and/or modify it under
       9             :  * the terms of the GNU Lesser General Public License version 2.1 as published
      10             :  * by the Free Software Foundation.
      11             :  *
      12             :  * This library is distributed in the hope that it will be useful, but WITHOUT
      13             :  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
      14             :  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
      15             :  * details.
      16             :  *
      17             :  * You should have received a copy of the GNU Lesser General Public License
      18             :  * along with this library; if not, write to the Free Software Foundation, Inc.,
      19             :  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
      20             :  */
      21             : 
      22             : #include "objectStore.h"
      23             : 
      24             : #include "connection.h"
      25             : #include "connectionDescription.h"
      26             : #include "global.h"
      27             : #include "instanceCache.h"
      28             : #include "log.h"
      29             : #include "masterCMCommand.h"
      30             : #include "nodeCommand.h"
      31             : #include "oCommand.h"
      32             : #include "objectCM.h"
      33             : #include "objectCommand.h"
      34             : #include "objectDataICommand.h"
      35             : #include "objectDataIStream.h"
      36             : 
      37             : #include <lunchbox/futureFunction.h>
      38             : #include <lunchbox/scopedMutex.h>
      39             : 
      40             : #include <boost/bind.hpp>
      41             : 
      42             : #include <limits>
      43             : 
      44             : //#define DEBUG_DISPATCH
      45             : #ifdef DEBUG_DISPATCH
      46             : #  include <set>
      47             : #endif
      48             : 
      49             : namespace co
      50             : {
      51             : typedef CommandFunc< ObjectStore > CmdFunc;
      52             : typedef lunchbox::FutureFunction< bool > FuturebImpl;
      53             : 
      54          51 : ObjectStore::ObjectStore( LocalNode* localNode, a_ssize_t* counters )
      55             :         : _localNode( localNode )
      56             :         , _instanceIDs( -0x7FFFFFFF )
      57             :         , _instanceCache( new InstanceCache( Global::getIAttribute(
      58         102 :                               Global::IATTR_INSTANCE_CACHE_SIZE ) * LB_1MB ) )
      59         153 :         , _counters( counters )
      60             : {
      61          51 :     LBASSERT( localNode );
      62          51 :     CommandQueue* queue = localNode->getCommandThreadQueue();
      63             : 
      64             :     localNode->_registerCommand( CMD_NODE_FIND_MASTER_NODE_ID,
      65          51 :         CmdFunc( this, &ObjectStore::_cmdFindMasterNodeID ), queue );
      66             :     localNode->_registerCommand( CMD_NODE_FIND_MASTER_NODE_ID_REPLY,
      67          51 :         CmdFunc( this, &ObjectStore::_cmdFindMasterNodeIDReply ), 0 );
      68             :     localNode->_registerCommand( CMD_NODE_ATTACH_OBJECT,
      69          51 :         CmdFunc( this, &ObjectStore::_cmdAttach ), 0 );
      70             :     localNode->_registerCommand( CMD_NODE_DETACH_OBJECT,
      71          51 :         CmdFunc( this, &ObjectStore::_cmdDetach ), 0 );
      72             :     localNode->_registerCommand( CMD_NODE_REGISTER_OBJECT,
      73          51 :         CmdFunc( this, &ObjectStore::_cmdRegister ), queue );
      74             :     localNode->_registerCommand( CMD_NODE_DEREGISTER_OBJECT,
      75          51 :         CmdFunc( this, &ObjectStore::_cmdDeregister ), queue );
      76             :     localNode->_registerCommand( CMD_NODE_MAP_OBJECT,
      77          51 :         CmdFunc( this, &ObjectStore::_cmdMap ), queue );
      78             :     localNode->_registerCommand( CMD_NODE_MAP_OBJECT_SUCCESS,
      79          51 :         CmdFunc( this, &ObjectStore::_cmdMapSuccess ), 0 );
      80             :     localNode->_registerCommand( CMD_NODE_MAP_OBJECT_REPLY,
      81          51 :         CmdFunc( this, &ObjectStore::_cmdMapReply ), 0 );
      82             :     localNode->_registerCommand( CMD_NODE_UNMAP_OBJECT,
      83          51 :         CmdFunc( this, &ObjectStore::_cmdUnmap ), 0 );
      84             :     localNode->_registerCommand( CMD_NODE_UNSUBSCRIBE_OBJECT,
      85          51 :         CmdFunc( this, &ObjectStore::_cmdUnsubscribe ), queue );
      86             :     localNode->_registerCommand( CMD_NODE_OBJECT_INSTANCE,
      87          51 :         CmdFunc( this, &ObjectStore::_cmdInstance ), 0 );
      88             :     localNode->_registerCommand( CMD_NODE_OBJECT_INSTANCE_MAP,
      89          51 :         CmdFunc( this, &ObjectStore::_cmdInstance ), 0 );
      90             :     localNode->_registerCommand( CMD_NODE_OBJECT_INSTANCE_COMMIT,
      91          51 :         CmdFunc( this, &ObjectStore::_cmdInstance ), 0 );
      92             :     localNode->_registerCommand( CMD_NODE_OBJECT_INSTANCE_PUSH,
      93          51 :         CmdFunc( this, &ObjectStore::_cmdInstance ), 0 );
      94             :     localNode->_registerCommand( CMD_NODE_OBJECT_INSTANCE_SYNC,
      95          51 :         CmdFunc( this, &ObjectStore::_cmdInstance ), 0 );
      96             :     localNode->_registerCommand( CMD_NODE_DISABLE_SEND_ON_REGISTER,
      97          51 :         CmdFunc( this, &ObjectStore::_cmdDisableSendOnRegister ), queue );
      98             :     localNode->_registerCommand( CMD_NODE_REMOVE_NODE,
      99          51 :         CmdFunc( this, &ObjectStore::_cmdRemoveNode ), queue );
     100             :     localNode->_registerCommand( CMD_NODE_OBJECT_PUSH,
     101          51 :         CmdFunc( this, &ObjectStore::_cmdPush ), queue );
     102             :     localNode->_registerCommand( CMD_NODE_SYNC_OBJECT,
     103          51 :         CmdFunc( this, &ObjectStore::_cmdSync ), queue );
     104             :     localNode->_registerCommand( CMD_NODE_SYNC_OBJECT_REPLY,
     105          51 :         CmdFunc( this, &ObjectStore::_cmdSyncReply ), 0 );
     106          51 : }
     107             : 
     108         147 : ObjectStore::~ObjectStore()
     109             : {
     110          49 :     LBVERB << "Delete ObjectStore @" << (void*)this << std::endl;
     111             : 
     112             : #ifndef NDEBUG
     113          49 :     if( !_objects->empty( ))
     114             :     {
     115           0 :         LBWARN << _objects->size() << " attached objects in destructor"
     116           0 :                << std::endl;
     117             : 
     118           0 :         for( ObjectsHash::const_iterator i = _objects->begin();
     119           0 :              i != _objects->end(); ++i )
     120             :         {
     121           0 :             const Objects& objects = i->second;
     122           0 :             LBWARN << "  " << objects.size() << " objects with id "
     123           0 :                    << i->first << std::endl;
     124             : 
     125           0 :             for( Objects::const_iterator j = objects.begin();
     126           0 :                  j != objects.end(); ++j )
     127             :             {
     128           0 :                 const Object* object = *j;
     129           0 :                 LBINFO << "    object type " << lunchbox::className( object )
     130           0 :                        << std::endl;
     131             :             }
     132             :         }
     133             :     }
     134             :     //LBASSERT( _objects->empty( ))
     135             : #endif
     136          49 :    clear();
     137          49 :    delete _instanceCache;
     138          49 :    _instanceCache = 0;
     139          98 : }
     140             : 
     141          94 : void ObjectStore::clear( )
     142             : {
     143          94 :     LBASSERT( _objects->empty( ));
     144          94 :     expireInstanceData( 0 );
     145          94 :     LBASSERT( !_instanceCache || _instanceCache->isEmpty( ));
     146             : 
     147          94 :     _objects->clear();
     148          94 :     _sendQueue.clear();
     149          94 : }
     150             : 
     151           0 : void ObjectStore::disableInstanceCache()
     152             : {
     153           0 :     LBASSERT( _localNode->isClosed( ));
     154           0 :     delete _instanceCache;
     155           0 :     _instanceCache = 0;
     156           0 : }
     157             : 
     158          94 : void ObjectStore::expireInstanceData( const int64_t age )
     159             : {
     160          94 :     if( _instanceCache )
     161          94 :         _instanceCache->expire( age );
     162          94 : }
     163             : 
     164         110 : void ObjectStore::removeInstanceData( const NodeID& nodeID )
     165             : {
     166         110 :     if( _instanceCache )
     167         110 :         _instanceCache->remove( nodeID );
     168         110 : }
     169             : 
     170           0 : void ObjectStore::enableSendOnRegister()
     171             : {
     172           0 :     ++_sendOnRegister;
     173           0 : }
     174             : 
     175           0 : void ObjectStore::disableSendOnRegister()
     176             : {
     177           0 :     if( Global::getIAttribute( Global::IATTR_NODE_SEND_QUEUE_SIZE ) > 0 )
     178             :     {
     179             :         lunchbox::Request< void > request =
     180           0 :             _localNode->registerRequest< void >();
     181           0 :         _localNode->send( CMD_NODE_DISABLE_SEND_ON_REGISTER ) << request;
     182             :     }
     183             :     else // OPT
     184           0 :         --_sendOnRegister;
     185           0 : }
     186             : 
     187             : //---------------------------------------------------------------------------
     188             : // identifier master node mapping
     189             : //---------------------------------------------------------------------------
     190          39 : NodeID ObjectStore::findMasterNodeID( const uint128_t& identifier )
     191             : {
     192          39 :     LB_TS_NOT_THREAD( _commandThread );
     193             : 
     194             :     // OPT: look up locally first?
     195          39 :     Nodes nodes;
     196          39 :     _localNode->getNodes( nodes );
     197             : 
     198             :     // OPT: send to multiple nodes at once?
     199          39 :     for( NodesIter i = nodes.begin(); i != nodes.end(); ++i )
     200             :     {
     201          39 :         NodePtr node = *i;
     202             :         lunchbox::Request< NodeID > request =
     203          39 :             _localNode->registerRequest< NodeID >();
     204             : 
     205          78 :         LBLOG( LOG_OBJECTS ) << "Finding " << identifier << " on " << node
     206         117 :                              << " req " << request.getID() << std::endl;
     207          39 :         node->send( CMD_NODE_FIND_MASTER_NODE_ID ) << identifier << request;
     208             : 
     209          39 :         const NodeID& masterNodeID = request.wait( Global::getTimeout( ));
     210             : 
     211          39 :         if( masterNodeID != 0 )
     212             :         {
     213          39 :             LBLOG( LOG_OBJECTS ) << "Found " << identifier << " on "
     214          39 :                                  << masterNodeID << std::endl;
     215          39 :             return masterNodeID;
     216             :         }
     217           0 :     }
     218             : 
     219           0 :     return NodeID();
     220             : }
     221             : 
     222             : //---------------------------------------------------------------------------
     223             : // object mapping
     224             : //---------------------------------------------------------------------------
     225          20 : void ObjectStore::attach( Object* object, const uint128_t& id,
     226             :                           const uint32_t instanceID )
     227             : {
     228          20 :     LBASSERT( object );
     229          20 :     LB_TS_NOT_THREAD( _receiverThread );
     230             : 
     231             :     lunchbox::Request< void > request =
     232          20 :         _localNode->registerRequest< void >( object );
     233          20 :     _localNode->send( CMD_NODE_ATTACH_OBJECT ) << id << instanceID << request;
     234          20 : }
     235             : 
     236             : namespace
     237             : {
     238          61 : uint32_t _genNextID( lunchbox::a_int32_t& val )
     239             : {
     240             :     uint32_t result;
     241          61 :     do
     242             :     {
     243          61 :         const long id = ++val;
     244             :         result = static_cast< uint32_t >(
     245          61 :             static_cast< int64_t >( id ) + 0x7FFFFFFFu );
     246             :     }
     247             :     while( result > CO_INSTANCE_MAX );
     248             : 
     249          61 :     return result;
     250             : }
     251             : }
     252             : 
     253          61 : void ObjectStore::_attach( Object* object, const uint128_t& id,
     254             :                            const uint32_t inInstanceID )
     255             : {
     256          61 :     LBASSERT( object );
     257          61 :     LB_TS_THREAD( _receiverThread );
     258             : 
     259          61 :     uint32_t instanceID = inInstanceID;
     260          61 :     if( inInstanceID == CO_INSTANCE_INVALID )
     261          20 :         instanceID = _genNextID( _instanceIDs );
     262             : 
     263          61 :     object->attach( id, instanceID );
     264             : 
     265             :     {
     266          61 :         lunchbox::ScopedFastWrite mutex( _objects );
     267          61 :         Objects& objects = _objects.data[ id ];
     268          61 :         LBASSERTINFO( !object->isMaster() || objects.empty(),
     269             :                       "Attaching master " << *object << ", " <<
     270             :                       objects.size() << " attached objects with same ID, " <<
     271             :                       "first is " << ( objects[0]->isMaster() ? "master " :
     272             :                                        "slave " ) << *objects[0] );
     273          61 :         objects.push_back( object );
     274             :     }
     275             : 
     276          61 :     _localNode->flushCommands(); // redispatch pending commands
     277             : 
     278          61 :     LBLOG( LOG_OBJECTS ) << "attached " << *object << " @"
     279          61 :                          << static_cast< void* >( object ) << std::endl;
     280          61 : }
     281             : 
     282          28 : void ObjectStore::detach( Object* object )
     283             : {
     284          28 :     LBASSERT( object );
     285          28 :     LB_TS_NOT_THREAD( _receiverThread );
     286             : 
     287          28 :     lunchbox::Request< void > request = _localNode->registerRequest< void >();
     288             :     _localNode->send( CMD_NODE_DETACH_OBJECT )
     289          28 :         << object->getID() << object->getInstanceID() << request;
     290          28 : }
     291             : 
     292           0 : void ObjectStore::swap( Object* oldObject, Object* newObject )
     293             : {
     294           0 :     LBASSERT( newObject );
     295           0 :     LBASSERT( oldObject );
     296           0 :     LBASSERT( oldObject->isMaster() );
     297           0 :     LB_TS_THREAD( _receiverThread );
     298             : 
     299           0 :     if( !oldObject->isAttached() )
     300           0 :         return;
     301             : 
     302           0 :     LBLOG( LOG_OBJECTS ) << "Swap " << lunchbox::className( oldObject )
     303           0 :                          << std::endl;
     304           0 :     const uint128_t& id = oldObject->getID();
     305             : 
     306           0 :     lunchbox::ScopedFastWrite mutex( _objects );
     307           0 :     ObjectsHash::iterator i = _objects->find( id );
     308           0 :     LBASSERT( i != _objects->end( ));
     309           0 :     if( i == _objects->end( ))
     310           0 :         return;
     311             : 
     312           0 :     Objects& objects = i->second;
     313           0 :     Objects::iterator j = find( objects.begin(), objects.end(), oldObject );
     314           0 :     LBASSERT( j != objects.end( ));
     315           0 :     if( j == objects.end( ))
     316           0 :         return;
     317             : 
     318           0 :     newObject->transfer( oldObject );
     319           0 :     *j = newObject;
     320             : }
     321             : 
     322          59 : void ObjectStore::_detach( Object* object )
     323             : {
     324             :     // check also _cmdUnmapObject when modifying!
     325          59 :     LBASSERT( object );
     326          59 :     LB_TS_THREAD( _receiverThread );
     327             : 
     328          59 :     if( !object->isAttached() )
     329           0 :         return;
     330             : 
     331          59 :     const uint128_t& id = object->getID();
     332             : 
     333          59 :     LBASSERT( _objects->find( id ) != _objects->end( ));
     334          59 :     LBLOG( LOG_OBJECTS ) << "Detach " << *object << std::endl;
     335             : 
     336          59 :     Objects& objects = _objects.data[ id ];
     337          59 :     Objects::iterator i = find( objects.begin(),objects.end(), object );
     338          59 :     LBASSERT( i != objects.end( ));
     339             : 
     340             :     {
     341          59 :         lunchbox::ScopedFastWrite mutex( _objects );
     342          59 :         objects.erase( i );
     343          59 :         if( objects.empty( ))
     344          58 :             _objects->erase( id );
     345             :     }
     346             : 
     347          59 :     LBASSERT( object->getInstanceID() != CO_INSTANCE_INVALID );
     348          59 :     object->detach();
     349          59 :     return;
     350             : }
     351             : 
     352          41 : uint32_t ObjectStore::mapNB( Object* object, const uint128_t& id,
     353             :                              const uint128_t& version, NodePtr master )
     354             : {
     355          41 :     LB_TS_NOT_THREAD( _receiverThread );
     356          41 :     LBLOG( LOG_OBJECTS )
     357          41 :         << "Mapping " << lunchbox::className( object ) << " to id " << id
     358         123 :         << " version " << version << std::endl;
     359          41 :     LBASSERT( object );
     360          41 :     LBASSERTINFO( id.isUUID(), id );
     361             : 
     362          41 :     if( !master )
     363          39 :         master = _localNode->connectObjectMaster( id );
     364             : 
     365          41 :     if( !master || !master->isReachable( ))
     366             :     {
     367           0 :         LBWARN << "Mapping of object " << id << " failed, invalid master node"
     368           0 :                << std::endl;
     369           0 :         return LB_UNDEFINED_UINT32;
     370             :     }
     371             : 
     372          41 :     if( !object || !id.isUUID( ))
     373             :     {
     374           0 :         LBWARN << "Invalid object " << object << " or id " << id << std::endl;
     375           0 :         return LB_UNDEFINED_UINT32;
     376             :     }
     377             : 
     378          41 :     const bool isAttached = object->isAttached();
     379          41 :     const bool isMaster = object->isMaster();
     380          41 :     LBASSERT( !isAttached );
     381          41 :     LBASSERT( !isMaster ) ;
     382          41 :     if( isAttached || isMaster )
     383             :     {
     384           0 :         LBWARN << "Invalid object state: attached " << isAttached << " master "
     385           0 :                << isMaster << std::endl;
     386           0 :         return LB_UNDEFINED_UINT32;
     387             :     }
     388             : 
     389             :     lunchbox::Request< void > request =
     390          41 :         _localNode->registerRequest< void >( object );
     391          41 :     uint128_t minCachedVersion = VERSION_HEAD;
     392          41 :     uint128_t maxCachedVersion = VERSION_NONE;
     393          41 :     uint32_t masterInstanceID = 0;
     394             :     const bool useCache = _checkInstanceCache( id, minCachedVersion,
     395             :                                                maxCachedVersion,
     396          41 :                                                masterInstanceID );
     397          41 :     object->notifyAttach();
     398             :     master->send( CMD_NODE_MAP_OBJECT )
     399          82 :         << version << minCachedVersion << maxCachedVersion << id
     400         123 :         << object->getMaxVersions() << request << _genNextID( _instanceIDs )
     401          41 :         << masterInstanceID << useCache;
     402          41 :     request.relinquish();
     403          41 :     return request.getID();
     404             : }
     405             : 
     406          45 : bool ObjectStore::_checkInstanceCache( const uint128_t& id, uint128_t& from,
     407             :                                        uint128_t& to, uint32_t& instanceID )
     408             : {
     409          45 :     if( !_instanceCache )
     410           0 :         return false;
     411             : 
     412          45 :     const InstanceCache::Data& cached = (*_instanceCache)[ id ];
     413          45 :     if( cached == InstanceCache::Data::NONE )
     414          45 :         return false;
     415             : 
     416           0 :     const ObjectDataIStreamDeque& versions = cached.versions;
     417           0 :     LBASSERT( !cached.versions.empty( ));
     418           0 :     instanceID = cached.masterInstanceID;
     419           0 :     from = versions.front()->getVersion();
     420           0 :     to = versions.back()->getVersion();
     421           0 :     LBLOG( LOG_OBJECTS ) << "Object " << id << " have v" << from << ".." << to
     422           0 :                          << std::endl;
     423           0 :     return true;
     424             : }
     425             : 
     426          41 : bool ObjectStore::mapSync( const uint32_t requestID )
     427             : {
     428          41 :     if( requestID == LB_UNDEFINED_UINT32 )
     429           0 :         return false;
     430             : 
     431          41 :     void* data = _localNode->getRequestData( requestID );
     432          41 :     if( data == 0 )
     433           0 :         return false;
     434             : 
     435          41 :     Object* object = LBSAFECAST( Object*, data );
     436          41 :     uint128_t version = VERSION_NONE;
     437          41 :     _localNode->waitRequest( requestID, version );
     438             : 
     439          41 :     const bool mapped = object->isAttached();
     440          41 :     if( mapped )
     441          41 :         object->applyMapData( version ); // apply initial instance data
     442             : 
     443          41 :     object->notifyAttached();
     444          41 :     LBLOG( LOG_OBJECTS )
     445          82 :         << "Mapped " << lunchbox::className( object ) << std::endl;
     446          41 :     return mapped;
     447             : }
     448             : 
     449             : 
     450           4 : f_bool_t ObjectStore::sync( Object* object, NodePtr master, const uint128_t& id,
     451             :                             const uint32_t instanceID )
     452             : {
     453           4 :     const uint32_t request = _startSync( object, master, id, instanceID );
     454             :     const FuturebImpl::Func& func = boost::bind( &ObjectStore::_finishSync,
     455           4 :                                                  this, request, object );
     456           4 :     return f_bool_t( new FuturebImpl( func ));
     457             : }
     458             : 
     459           4 : uint32_t ObjectStore::_startSync( Object* object, NodePtr master,
     460             :                                   const uint128_t& id,
     461             :                                   const uint32_t instanceID )
     462             : {
     463           4 :     LB_TS_NOT_THREAD( _receiverThread );
     464           4 :     LBLOG( LOG_OBJECTS )
     465           4 :         << "Syncing " << lunchbox::className( object ) << " with id " << id
     466          12 :         << std::endl;
     467           4 :     LBASSERT( object );
     468           4 :     LBASSERTINFO( id.isUUID(), id );
     469             : 
     470           4 :     if( !object || !id.isUUID( ))
     471             :     {
     472           0 :         LBWARN << "Invalid object " << object << " or id " << id << std::endl;
     473           0 :         return LB_UNDEFINED_UINT32;
     474             :     }
     475             : 
     476           4 :     if( !master )
     477           0 :         master = _localNode->connectObjectMaster( id );
     478             : 
     479           4 :     if( !master || !master->isReachable( ))
     480             :     {
     481           0 :         LBWARN << "Mapping of object " << id << " failed, invalid master node"
     482           0 :                << std::endl;
     483           0 :         return LB_UNDEFINED_UINT32;
     484             :     }
     485             : 
     486             :     lunchbox::Request< void > request =
     487           4 :         _localNode->registerRequest< void >( new ObjectDataIStream );
     488           4 :     uint128_t minCachedVersion = VERSION_HEAD;
     489           4 :     uint128_t maxCachedVersion = VERSION_NONE;
     490           4 :     uint32_t cacheInstanceID = 0;
     491             : 
     492             :     bool useCache = _checkInstanceCache( id, minCachedVersion, maxCachedVersion,
     493           4 :                                          cacheInstanceID );
     494           4 :     if( useCache )
     495             :     {
     496           0 :         switch( instanceID )
     497             :         {
     498             :         case CO_INSTANCE_ALL:
     499           0 :             break;
     500             :         default:
     501           0 :             if( instanceID == cacheInstanceID )
     502           0 :                 break;
     503             : 
     504           0 :             useCache = false;
     505           0 :             LBCHECK( _instanceCache->release( id, 1 ));
     506           0 :             break;
     507             :         }
     508             :     }
     509             : 
     510             :     // Use stream expected by MasterCMCommand
     511             :     master->send( CMD_NODE_SYNC_OBJECT )
     512           8 :         << VERSION_NEWEST << minCachedVersion << maxCachedVersion << id
     513          12 :         << uint64_t(0) /* maxVersions */ << request << instanceID
     514           4 :         << cacheInstanceID << useCache;
     515           4 :     request.relinquish();
     516           4 :     return request.getID();
     517             : }
     518             : 
     519           4 : bool ObjectStore::_finishSync( const uint32_t requestID, Object* object )
     520             : {
     521           4 :     if( requestID == LB_UNDEFINED_UINT32 )
     522           0 :         return false;
     523             : 
     524           4 :     void* data = _localNode->getRequestData( requestID );
     525           4 :     if( data == 0 )
     526           0 :         return false;
     527             : 
     528           4 :     ObjectDataIStream* is = LBSAFECAST( ObjectDataIStream*, data );
     529             : 
     530           4 :     bool ok = false;
     531           4 :     _localNode->waitRequest( requestID, ok );
     532             : 
     533           4 :     if( !ok )
     534             :     {
     535           0 :         LBWARN << "Object synchronization failed" << std::endl;
     536           0 :         delete is;
     537           0 :         return false;
     538             :     }
     539             : 
     540           4 :     is->waitReady();
     541           4 :     object->applyInstanceData( *is );
     542           8 :     LBLOG( LOG_OBJECTS ) << "Synced " << lunchbox::className( object )
     543          12 :                          << std::endl;
     544           4 :     delete is;
     545           4 :     return true;
     546             : }
     547             : 
     548          41 : void ObjectStore::unmap( Object* object )
     549             : {
     550          41 :     LBASSERT( object );
     551          41 :     if( !object->isAttached( )) // not registered
     552          34 :         return;
     553             : 
     554          40 :     const uint128_t& id = object->getID();
     555             : 
     556          40 :     LBLOG( LOG_OBJECTS ) << "Unmap " << object << std::endl;
     557             : 
     558          40 :     object->notifyDetach();
     559             : 
     560             :     // send unsubscribe to master, master will send detach command.
     561          40 :     LBASSERT( !object->isMaster( ));
     562          40 :     LB_TS_NOT_THREAD( _commandThread );
     563             : 
     564          40 :     const uint32_t masterInstanceID = object->getMasterInstanceID();
     565          40 :     if( masterInstanceID != CO_INSTANCE_INVALID )
     566             :     {
     567          32 :         NodePtr master = object->getMasterNode();
     568          32 :         LBASSERT( master )
     569             : 
     570          32 :         if( master && master->isReachable( ))
     571             :         {
     572             :             lunchbox::Request< void > request =
     573          32 :                 _localNode->registerRequest< void >();
     574             :             master->send( CMD_NODE_UNSUBSCRIBE_OBJECT )
     575          32 :                 << id << request << masterInstanceID << object->getInstanceID();
     576          32 :             request.wait();
     577          32 :             object->notifyDetached();
     578          32 :             return;
     579             :         }
     580           0 :         LBERROR << "Master node for object id " << id << " not connected"
     581           0 :                 << std::endl;
     582             :     }
     583             : 
     584             :     // no unsubscribe sent: Detach directly
     585           8 :     detach( object );
     586           8 :     object->setupChangeManager( Object::NONE, false, 0, CO_INSTANCE_INVALID );
     587           8 :     object->notifyDetached();
     588             : }
     589             : 
     590          20 : bool ObjectStore::register_( Object* object )
     591             : {
     592          20 :     LBASSERT( object );
     593          20 :     LBASSERT( !object->isAttached( ));
     594             : 
     595          20 :     const uint128_t& id = object->getID( );
     596          20 :     LBASSERTINFO( id.isUUID(), id );
     597             : 
     598          20 :     object->notifyAttach();
     599          20 :     object->setupChangeManager( object->getChangeType(), true, _localNode,
     600          40 :                                 CO_INSTANCE_INVALID );
     601          20 :     attach( object, id, CO_INSTANCE_INVALID );
     602             : 
     603          20 :     if( Global::getIAttribute( Global::IATTR_NODE_SEND_QUEUE_SIZE ) > 0 )
     604          20 :         _localNode->send( CMD_NODE_REGISTER_OBJECT ) << object;
     605             : 
     606          20 :     object->notifyAttached();
     607             : 
     608          20 :     LBLOG( LOG_OBJECTS ) << "Registered " << object << std::endl;
     609          20 :     return true;
     610             : }
     611             : 
     612          20 : void ObjectStore::deregister( Object* object )
     613             : {
     614          20 :     LBASSERT( object );
     615          20 :     if( !object->isAttached( )) // not registered
     616          20 :         return;
     617             : 
     618          20 :     LBLOG( LOG_OBJECTS ) << "Deregister " << *object << std::endl;
     619          20 :     LBASSERT( object->isMaster( ));
     620             : 
     621          20 :     object->notifyDetach();
     622             : 
     623          20 :     if( Global::getIAttribute( Global::IATTR_NODE_SEND_QUEUE_SIZE ) > 0  )
     624             :     {
     625             :         // remove from send queue
     626             :         lunchbox::Request< void > request =
     627          20 :             _localNode->registerRequest< void >();
     628          20 :         _localNode->send( CMD_NODE_DEREGISTER_OBJECT ) << request;
     629             :     }
     630             : 
     631          20 :     const uint128_t id = object->getID();
     632          20 :     detach( object );
     633          20 :     object->setupChangeManager( Object::NONE, true, 0, CO_INSTANCE_INVALID );
     634          20 :     if( _instanceCache )
     635          20 :         _instanceCache->erase( id );
     636          20 :     object->notifyDetached();
     637             : }
     638             : 
     639       50596 : bool ObjectStore::notifyCommandThreadIdle()
     640             : {
     641       50596 :     LB_TS_THREAD( _commandThread );
     642       50597 :     if( _sendQueue.empty( ))
     643       50597 :         return false;
     644             : 
     645           0 :     LBASSERT( _sendOnRegister > 0 );
     646           0 :     SendQueueItem& item = _sendQueue.front();
     647             : 
     648           0 :     if( item.age > _localNode->getTime64( ))
     649             :     {
     650           0 :         Nodes nodes;
     651           0 :         _localNode->getNodes( nodes, false );
     652           0 :         if( nodes.empty( ))
     653             :         {
     654           0 :             lunchbox::Thread::yield();
     655           0 :             return !_sendQueue.empty();
     656             :         }
     657             : 
     658           0 :         item.object->sendInstanceData( nodes );
     659             :     }
     660           0 :     _sendQueue.pop_front();
     661           0 :     return !_sendQueue.empty();
     662             : }
     663             : 
     664           7 : void ObjectStore::removeNode( NodePtr node )
     665             : {
     666           7 :     lunchbox::Request< void > request = _localNode->registerRequest< void >();
     667           7 :     _localNode->send( CMD_NODE_REMOVE_NODE ) << node.get() << request;
     668           7 : }
     669             : 
     670             : //===========================================================================
     671             : // ICommand handling
     672             : //===========================================================================
     673         548 : bool ObjectStore::dispatchObjectCommand( ICommand& cmd )
     674             : {
     675         548 :     LB_TS_THREAD( _receiverThread );
     676         548 :     ObjectICommand command( cmd );
     677         548 :     const uint128_t& id = command.getObjectID();
     678         548 :     const uint32_t instanceID = command.getInstanceID();
     679             : 
     680         548 :     ObjectsHash::const_iterator i = _objects->find( id );
     681             : 
     682         548 :     if( i == _objects->end( ))
     683             :         // When the instance ID is set to none, we only care about the command
     684             :         // when we have an object of the given ID (multicast)
     685           0 :         return ( instanceID == CO_INSTANCE_NONE );
     686             : 
     687         548 :     const Objects& objects = i->second;
     688         548 :     LBASSERTINFO( !objects.empty(), command );
     689             : 
     690         548 :     if( instanceID <= CO_INSTANCE_MAX )
     691             :     {
     692          59 :         for( Objects::const_iterator j = objects.begin(); j!=objects.end(); ++j)
     693             :         {
     694          59 :             Object* object = *j;
     695          59 :             if( instanceID == object->getInstanceID( ))
     696             :             {
     697          53 :                 LBCHECK( object->dispatchCommand( command ));
     698          53 :                 return true;
     699             :             }
     700             :         }
     701           0 :         LBUNREACHABLE;
     702           0 :         return false;
     703             :     }
     704             : 
     705         495 :     Objects::const_iterator j = objects.begin();
     706         495 :     Object* object = *j;
     707         495 :     LBCHECK( object->dispatchCommand( command ));
     708             : 
     709         495 :     for( ++j; j != objects.end(); ++j )
     710             :     {
     711           0 :         object = *j;
     712           0 :         LBCHECK( object->dispatchCommand( command ));
     713             :     }
     714         495 :     return true;
     715             : }
     716             : 
     717          39 : bool ObjectStore::_cmdFindMasterNodeID( ICommand& command )
     718             : {
     719          39 :     LB_TS_THREAD( _commandThread );
     720             : 
     721          39 :     const uint128_t& id = command.get< uint128_t >();
     722          39 :     const uint32_t requestID = command.get< uint32_t >();
     723          39 :     LBASSERT( id.isUUID( ));
     724             : 
     725          39 :     NodeID masterNodeID;
     726             :     {
     727          39 :         lunchbox::ScopedFastRead mutex( _objects );
     728          39 :         ObjectsHashCIter i = _objects->find( id );
     729             : 
     730          39 :         if( i != _objects->end( ))
     731             :         {
     732          39 :             const Objects& objects = i->second;
     733          39 :             LBASSERT( !objects.empty( ));
     734             : 
     735          39 :             for( ObjectsCIter j = objects.begin(); j != objects.end(); ++j )
     736             :             {
     737          39 :                 Object* object = *j;
     738          39 :                 if( object->isMaster( ))
     739          39 :                     masterNodeID = _localNode->getNodeID();
     740             :                 else
     741             :                 {
     742           0 :                     NodePtr master = object->getMasterNode();
     743           0 :                     if( master.isValid( ))
     744           0 :                         masterNodeID = master->getNodeID();
     745             :                 }
     746          39 :                 if( masterNodeID != 0 )
     747          39 :                     break;
     748             :             }
     749          39 :         }
     750             :     }
     751             : 
     752          39 :     LBLOG( LOG_OBJECTS ) << "Object " << id << " master " << masterNodeID
     753          39 :                          << " req " << requestID << std::endl;
     754             :     command.getNode()->send( CMD_NODE_FIND_MASTER_NODE_ID_REPLY )
     755          39 :             << masterNodeID << requestID;
     756          39 :     return true;
     757             : }
     758             : 
     759          39 : bool ObjectStore::_cmdFindMasterNodeIDReply( ICommand& command )
     760             : {
     761          39 :     const NodeID& masterNodeID = command.get< NodeID >();
     762          39 :     const uint32_t requestID = command.get< uint32_t >();
     763          39 :     _localNode->serveRequest( requestID, masterNodeID );
     764          39 :     return true;
     765             : }
     766             : 
     767          20 : bool ObjectStore::_cmdAttach( ICommand& command )
     768             : {
     769          20 :     LB_TS_THREAD( _receiverThread );
     770          20 :     LBLOG( LOG_OBJECTS ) << "Cmd attach object " << command << std::endl;
     771             : 
     772          20 :     const uint128_t& objectID = command.get< uint128_t >();
     773          20 :     const uint32_t instanceID = command.get< uint32_t >();
     774          20 :     const uint32_t requestID = command.get< uint32_t >();
     775             : 
     776             :     Object* object = static_cast< Object* >( _localNode->getRequestData(
     777          20 :                                                  requestID ));
     778          20 :     _attach( object, objectID, instanceID );
     779          20 :     _localNode->serveRequest( requestID );
     780          20 :     return true;
     781             : }
     782             : 
     783          60 : bool ObjectStore::_cmdDetach( ICommand& command )
     784             : {
     785          60 :     LB_TS_THREAD( _receiverThread );
     786          60 :     LBLOG( LOG_OBJECTS ) << "Cmd detach object " << command << std::endl;
     787             : 
     788          60 :     const uint128_t& objectID = command.get< uint128_t >();
     789          60 :     const uint32_t instanceID = command.get< uint32_t >();
     790          60 :     const uint32_t requestID = command.get< uint32_t >();
     791             : 
     792          60 :     ObjectsHash::const_iterator i = _objects->find( objectID );
     793          60 :     if( i != _objects->end( ))
     794             :     {
     795          59 :         const Objects& objects = i->second;
     796             : 
     797         180 :         for( Objects::const_iterator j = objects.begin();
     798         120 :              j != objects.end(); ++j )
     799             :         {
     800          60 :             Object* object = *j;
     801          60 :             if( object->getInstanceID() == instanceID )
     802             :             {
     803          59 :                 _detach( object );
     804          59 :                 break;
     805             :             }
     806             :         }
     807             :     }
     808             : 
     809          60 :     LBASSERT( requestID != LB_UNDEFINED_UINT32 );
     810          60 :     _localNode->serveRequest( requestID );
     811          60 :     return true;
     812             : }
     813             : 
     814          20 : bool ObjectStore::_cmdRegister( ICommand& command )
     815             : {
     816          20 :     LB_TS_THREAD( _commandThread );
     817          20 :     if( _sendOnRegister <= 0 )
     818          20 :         return true;
     819             : 
     820           0 :     LBLOG( LOG_OBJECTS ) << "Cmd register object " << command << std::endl;
     821             : 
     822           0 :     Object* object = reinterpret_cast< Object* >( command.get< void* >( ));
     823             : 
     824             :     const int32_t age = Global::getIAttribute(
     825           0 :                             Global::IATTR_NODE_SEND_QUEUE_AGE );
     826             :     SendQueueItem item;
     827           0 :     item.age = age ? age + _localNode->getTime64() :
     828           0 :                      std::numeric_limits< int64_t >::max();
     829           0 :     item.object = object;
     830           0 :     _sendQueue.push_back( item );
     831             : 
     832             :     const uint32_t size = Global::getIAttribute(
     833           0 :                              Global::IATTR_NODE_SEND_QUEUE_SIZE );
     834           0 :     while( _sendQueue.size() > size )
     835           0 :         _sendQueue.pop_front();
     836             : 
     837           0 :     return true;
     838             : }
     839             : 
     840          20 : bool ObjectStore::_cmdDeregister( ICommand& command )
     841             : {
     842          20 :     LB_TS_THREAD( _commandThread );
     843          20 :     LBLOG( LOG_OBJECTS ) << "Cmd deregister object " << command << std::endl;
     844             : 
     845          20 :     const uint32_t requestID = command.get< uint32_t >();
     846             : 
     847          20 :     const void* object = _localNode->getRequestData( requestID );
     848             : 
     849          20 :     for( SendQueueIter i = _sendQueue.begin(); i != _sendQueue.end(); ++i )
     850             :     {
     851           0 :         if( i->object == object )
     852             :         {
     853           0 :             _sendQueue.erase( i );
     854           0 :             break;
     855             :         }
     856             :     }
     857             : 
     858          20 :     _localNode->serveRequest( requestID );
     859          20 :     return true;
     860             : }
     861             : 
     862          41 : bool ObjectStore::_cmdMap( ICommand& cmd )
     863             : {
     864          41 :     LB_TS_THREAD( _commandThread );
     865             : 
     866          41 :     MasterCMCommand command( cmd );
     867          41 :     const uint128_t& id = command.getObjectID();
     868             : 
     869          41 :     LBLOG( LOG_OBJECTS ) << "Cmd map object " << command << " id " << id << "."
     870           0 :                          << command.getInstanceID() << " req "
     871          41 :                          << command.getRequestID() << std::endl;
     872             : 
     873          82 :     ObjectCMPtr masterCM;
     874             :     {
     875          41 :         lunchbox::ScopedFastRead mutex( _objects );
     876          41 :         ObjectsHash::const_iterator i = _objects->find( id );
     877          41 :         if( i != _objects->end( ))
     878             :         {
     879          41 :             const Objects& objects = i->second;
     880             : 
     881          41 :             for( ObjectsCIter j = objects.begin(); j != objects.end(); ++j )
     882             :             {
     883          41 :                 Object* object = *j;
     884          41 :                 if( object->isMaster( ))
     885             :                 {
     886          41 :                     masterCM = object->_getChangeManager();
     887          41 :                     break;
     888             :                 }
     889             :             }
     890          41 :         }
     891             :     }
     892             : 
     893          41 :     if( !masterCM || !masterCM->addSlave( command ))
     894             :     {
     895           0 :         LBWARN << "Can't find master object to map " << id << std::endl;
     896           0 :         NodePtr node = command.getNode();
     897             :         node->send( CMD_NODE_MAP_OBJECT_REPLY )
     898           0 :             << node->getNodeID() << id << command.getRequestedVersion()
     899           0 :             << command.getRequestID() << false << command.useCache() << false;
     900             :     }
     901             : 
     902          41 :     ++_counters[ LocalNode::COUNTER_MAP_OBJECT_REMOTE ];
     903          82 :     return true;
     904             : }
     905             : 
     906          41 : bool ObjectStore::_cmdMapSuccess( ICommand& command )
     907             : {
     908          41 :     LB_TS_THREAD( _receiverThread );
     909             : 
     910          41 :     const uint128_t& nodeID = command.get< uint128_t >();
     911          41 :     const uint128_t& objectID = command.get< uint128_t >();
     912          41 :     const uint32_t requestID = command.get< uint32_t >();
     913          41 :     const uint32_t instanceID = command.get< uint32_t >();
     914          41 :     const Object::ChangeType changeType = command.get< Object::ChangeType >();
     915          41 :     const uint32_t masterInstanceID = command.get< uint32_t >();
     916             : 
     917             :     // Map success commands are potentially multicasted (see above)
     918             :     // verify that we are the intended receiver
     919          41 :     if( nodeID != _localNode->getNodeID( ))
     920           0 :         return true;
     921             : 
     922          41 :     LBLOG( LOG_OBJECTS ) << "Cmd map object success " << command
     923           0 :                          << " id " << objectID << "." << instanceID
     924          41 :                          << " req " << requestID << std::endl;
     925             : 
     926             :     // set up change manager and attach object to dispatch table
     927             :     Object* object = static_cast< Object* >(
     928          41 :         _localNode->getRequestData( requestID ));
     929          41 :     LBASSERT( object );
     930          41 :     LBASSERT( !object->isMaster( ));
     931             : 
     932             :     object->setupChangeManager( Object::ChangeType( changeType ), false,
     933          41 :                                 _localNode, masterInstanceID );
     934          41 :     _attach( object, objectID, instanceID );
     935          41 :     return true;
     936             : }
     937             : 
     938          41 : bool ObjectStore::_cmdMapReply( ICommand& command )
     939             : {
     940          41 :     LB_TS_THREAD( _receiverThread );
     941             : 
     942             :     // Map reply commands are potentially multicasted (see above)
     943             :     // verify that we are the intended receiver
     944          41 :     if( command.get< uint128_t >() != _localNode->getNodeID( ))
     945           0 :         return true;
     946             : 
     947          41 :     const uint128_t& objectID = command.get< uint128_t >();
     948          41 :     const uint128_t& version = command.get< uint128_t >();
     949          41 :     const uint32_t requestID = command.get< uint32_t >();
     950          41 :     const bool result = command.get< bool >();
     951          41 :     const bool releaseCache = command.get< bool >();
     952          41 :     const bool useCache = command.get< bool >();
     953             : 
     954          41 :     LBLOG( LOG_OBJECTS ) << "Cmd map object reply " << command << " id "
     955          41 :                          << objectID << " req " << requestID << std::endl;
     956             : 
     957          41 :     LBASSERT( _localNode->getRequestData( requestID ));
     958             : 
     959          41 :     if( result )
     960             :     {
     961             :         Object* object = static_cast<Object*>(
     962          41 :             _localNode->getRequestData( requestID ));
     963          41 :         LBASSERT( object );
     964          41 :         LBASSERT( !object->isMaster( ));
     965             : 
     966          41 :         object->setMasterNode( command.getNode( ));
     967             : 
     968          41 :         if( useCache )
     969             :         {
     970           0 :             LBASSERT( releaseCache );
     971           0 :             LBASSERT( _instanceCache );
     972             : 
     973           0 :             const uint128_t& id = objectID;
     974           0 :             const InstanceCache::Data& cached = (*_instanceCache)[ id ];
     975           0 :             LBASSERT( cached != InstanceCache::Data::NONE );
     976           0 :             LBASSERT( !cached.versions.empty( ));
     977             : 
     978           0 :             object->addInstanceDatas( cached.versions, version );
     979           0 :             LBCHECK( _instanceCache->release( id, 2 ));
     980             :         }
     981          41 :         else if( releaseCache )
     982             :         {
     983           0 :             LBCHECK( _instanceCache->release( objectID, 1 ));
     984             :         }
     985             :     }
     986             :     else
     987             :     {
     988           0 :         if( releaseCache )
     989           0 :             _instanceCache->release( objectID, 1 );
     990             : 
     991           0 :         LBWARN << "Could not map object " << objectID << std::endl;
     992             :     }
     993             : 
     994          41 :     _localNode->serveRequest( requestID, version );
     995          41 :     return true;
     996             : }
     997             : 
     998          32 : bool ObjectStore::_cmdUnsubscribe( ICommand& command )
     999             : {
    1000          32 :     LB_TS_THREAD( _commandThread );
    1001          32 :     LBLOG( LOG_OBJECTS ) << "Cmd unsubscribe object  " << command << std::endl;
    1002             : 
    1003          32 :     const uint128_t& id = command.get< uint128_t >();
    1004          32 :     const uint32_t requestID = command.get< uint32_t >();
    1005          32 :     const uint32_t masterInstanceID = command.get< uint32_t >();
    1006          32 :     const uint32_t slaveInstanceID = command.get< uint32_t >();
    1007             : 
    1008          32 :     NodePtr node = command.getNode();
    1009             : 
    1010             :     {
    1011          32 :         lunchbox::ScopedFastWrite mutex( _objects );
    1012          32 :         ObjectsHash::const_iterator i = _objects->find( id );
    1013          32 :         if( i != _objects->end( ))
    1014             :         {
    1015          32 :             const Objects& objects = i->second;
    1016          32 :             for( ObjectsCIter j = objects.begin(); j != objects.end(); ++j )
    1017             :             {
    1018          32 :                 Object* object = *j;
    1019          64 :                 if( object->isMaster() &&
    1020          32 :                     object->getInstanceID() == masterInstanceID )
    1021             :                 {
    1022          32 :                     object->removeSlave( node, slaveInstanceID );
    1023          32 :                     break;
    1024             :                 }
    1025             :             }
    1026          32 :         }
    1027             :     }
    1028             : 
    1029          32 :     node->send( CMD_NODE_DETACH_OBJECT ) << id << slaveInstanceID << requestID;
    1030          32 :     return true;
    1031             : }
    1032             : 
    1033           2 : bool ObjectStore::_cmdUnmap( ICommand& command )
    1034             : {
    1035           2 :     LB_TS_THREAD( _receiverThread );
    1036           2 :     LBLOG( LOG_OBJECTS ) << "Cmd unmap object " << command << std::endl;
    1037             : 
    1038           2 :     const uint128_t& objectID = command.get< uint128_t >();
    1039             : 
    1040           2 :     if( _instanceCache )
    1041           2 :         _instanceCache->erase( objectID );
    1042             : 
    1043           2 :     ObjectsHash::iterator i = _objects->find( objectID );
    1044           2 :     if( i == _objects->end( )) // nothing to do
    1045           0 :         return true;
    1046             : 
    1047           2 :     const Objects objects = i->second;
    1048             :     {
    1049           2 :         lunchbox::ScopedFastWrite mutex( _objects );
    1050           2 :         _objects->erase( i );
    1051             :     }
    1052             : 
    1053           4 :     for( Objects::const_iterator j = objects.begin(); j != objects.end(); ++j )
    1054             :     {
    1055           2 :         Object* object = *j;
    1056           2 :         object->detach();
    1057             :     }
    1058             : 
    1059           2 :     return true;
    1060             : }
    1061             : 
    1062           4 : bool ObjectStore::_cmdSync( ICommand& cmd )
    1063             : {
    1064           4 :     LB_TS_THREAD( _commandThread );
    1065           4 :     MasterCMCommand command( cmd );
    1066           4 :     const uint128_t& id = command.getObjectID();
    1067           4 :     LBINFO << command.getNode() << std::endl;
    1068             : 
    1069           4 :     LBLOG( LOG_OBJECTS ) << "Cmd sync object id " << id << "."
    1070           0 :                          << command.getInstanceID() << " req "
    1071           4 :                          << command.getRequestID() << std::endl;
    1072             : 
    1073           4 :     const uint32_t cacheInstanceID = command.getMasterInstanceID();
    1074           8 :     ObjectCMPtr cm;
    1075             :     {
    1076           4 :         lunchbox::ScopedFastRead mutex( _objects );
    1077           4 :         ObjectsHash::const_iterator i = _objects->find( id );
    1078           4 :         if( i != _objects->end( ))
    1079             :         {
    1080           4 :             const Objects& objects = i->second;
    1081             : 
    1082           8 :             for( ObjectsCIter j = objects.begin(); j != objects.end(); ++j )
    1083             :             {
    1084           4 :                 Object* object = *j;
    1085           4 :                 if( command.getInstanceID() == object->getInstanceID( ))
    1086             :                 {
    1087           0 :                     cm = object->_getChangeManager();
    1088           0 :                     LBASSERT( cm );
    1089           0 :                     break;
    1090             :                 }
    1091             : 
    1092           4 :                 if( command.getInstanceID() != CO_INSTANCE_ALL )
    1093           0 :                     continue;
    1094             : 
    1095           4 :                 cm = object->_getChangeManager();
    1096           4 :                 LBASSERT( cm );
    1097           4 :                 if( cacheInstanceID == object->getInstanceID( ))
    1098           0 :                     break;
    1099             :             }
    1100           4 :             if( !cm )
    1101           0 :                 LBWARN << "Can't find object to sync " << id << "."
    1102           0 :                        << command.getInstanceID() << " in " << objects.size()
    1103           0 :                        << " instances" << std::endl;
    1104             :         }
    1105           4 :         if( !cm )
    1106           0 :             LBWARN << "Can't find object to sync " << id
    1107           0 :                    << ", no object with identifier" << std::endl;
    1108             :     }
    1109           4 :     if( !cm || !cm->sendSync( command ))
    1110             :     {
    1111           0 :         NodePtr node = command.getNode();
    1112             :         node->send( CMD_NODE_SYNC_OBJECT_REPLY )
    1113           0 :             << node->getNodeID() << id << command.getRequestID()
    1114           0 :             << false << command.useCache() << false;
    1115             :     }
    1116           8 :     return true;
    1117             : }
    1118             : 
    1119           4 : bool ObjectStore::_cmdSyncReply( ICommand& command )
    1120             : {
    1121           4 :     LB_TS_THREAD( _receiverThread );
    1122             : 
    1123             :     // Sync reply commands are potentially multicasted (see above)
    1124             :     // verify that we are the intended receiver
    1125           4 :     if( command.get< uint128_t >() != _localNode->getNodeID( ))
    1126           0 :         return true;
    1127             : 
    1128           4 :     const NodeID& id = command.get< NodeID >();
    1129           4 :     const uint32_t requestID = command.get< uint32_t >();
    1130           4 :     const bool result = command.get< bool >();
    1131           4 :     const bool releaseCache = command.get< bool >();
    1132           4 :     const bool useCache = command.get< bool >();
    1133           4 :     void* const data = _localNode->getRequestData( requestID );
    1134           4 :     ObjectDataIStream* const is = LBSAFECAST( ObjectDataIStream*, data );
    1135             : 
    1136           4 :     LBLOG( LOG_OBJECTS ) << "Cmd sync object reply " << command << " req "
    1137           4 :                          << requestID << std::endl;
    1138           4 :     if( result )
    1139             :     {
    1140           4 :         if( useCache )
    1141             :         {
    1142           0 :             LBASSERT( releaseCache );
    1143           0 :             LBASSERT( _instanceCache );
    1144             : 
    1145           0 :             const InstanceCache::Data& cached = (*_instanceCache)[ id ];
    1146           0 :             LBASSERT( cached != InstanceCache::Data::NONE );
    1147           0 :             LBASSERT( !cached.versions.empty( ));
    1148             : 
    1149           0 :             *is = *cached.versions.back();
    1150           0 :             LBCHECK( _instanceCache->release( id, 2 ));
    1151             :         }
    1152           4 :         else if( releaseCache )
    1153             :         {
    1154           0 :             LBCHECK( _instanceCache->release( id, 1 ));
    1155             :         }
    1156             :     }
    1157             :     else
    1158             :     {
    1159           0 :         if( releaseCache )
    1160           0 :             _instanceCache->release( id, 1 );
    1161             : 
    1162           0 :         LBWARN << "Could not sync object " << id << " request " << requestID
    1163           0 :                << std::endl;
    1164             :     }
    1165             : 
    1166           4 :     _localNode->serveRequest( requestID, result );
    1167           4 :     return true;
    1168             : }
    1169             : // Handles incoming object instance data: optionally stores it in the instance cache, then routes it according to the command id it arrived with. Returns true when handled here, or the result of dispatchObjectCommand().
    1170          54 : bool ObjectStore::_cmdInstance( ICommand& inCommand )
    1171             : {
    1172          54 :     LB_TS_THREAD( _receiverThread ); // thread-affinity check macro; presumably expects the receiver thread - confirm
    1173          54 :     LBASSERT( _localNode );
    1174             :     // Re-interpret the generic command as an object data command and read its leading fields.
    1175          54 :     ObjectDataICommand command( inCommand );
    1176          54 :     const NodeID& nodeID = command.get< NodeID >();
    1177          54 :     const uint32_t masterInstanceID = command.get< uint32_t >();
    1178          54 :     const uint32_t cmd = command.getCommand(); // keep the original command id; it is overwritten below
    1179             : 
    1180          54 :     LBLOG( LOG_OBJECTS ) << "Cmd instance " << command << " master "
    1181          54 :                          << masterInstanceID << " node " << nodeID << std::endl;
    1182             :     // Retarget the command as an object-level instance command; routing below still uses the saved cmd.
    1183          54 :     command.setType( COMMANDTYPE_OBJECT );
    1184          54 :     command.setCommand( CMD_OBJECT_INSTANCE );
    1185             :     // Cache the data only if an instance cache exists and the version's high() part is zero.
    1186          54 :     const uint128_t& version = command.getVersion();
    1187          54 :     if( _instanceCache && version.high() == 0 )
    1188             :     {
    1189          54 :         const ObjectVersion rev( command.getObjectID(), version );
    1190             : #ifndef CO_AGGRESSIVE_CACHING // Issue Equalizer#82:
    1191          54 :         if( cmd != CMD_NODE_OBJECT_INSTANCE_PUSH )
    1192             : #endif
    1193          48 :             _instanceCache->add( rev, masterInstanceID, command, 0 ); // pushed data is cached only when CO_AGGRESSIVE_CACHING is defined
    1194             :     }
    1195             :     // Route on the command id the data originally arrived with.
    1196          54 :     switch( cmd )
    1197             :     {
    1198             :       case CMD_NODE_OBJECT_INSTANCE: // nothing is dispatched here; the data was at most cached above
    1199           0 :         LBASSERT( nodeID == 0 );
    1200           0 :         LBASSERT( command.getInstanceID() == CO_INSTANCE_NONE );
    1201           0 :         return true;
    1202             : 
    1203             :       case CMD_NODE_OBJECT_INSTANCE_MAP: // addressed to one node; forwarded to the object if that node is us
    1204          37 :         if( nodeID != _localNode->getNodeID( )) // not for me
    1205           0 :             return true;
    1206             : 
    1207          37 :         LBASSERT( command.getInstanceID() <= CO_INSTANCE_MAX );
    1208          37 :         return dispatchObjectCommand( command );
    1209             : 
    1210             :       case CMD_NODE_OBJECT_INSTANCE_COMMIT: // asserted to carry no node or instance id; forwarded to the object
    1211           3 :         LBASSERT( nodeID == 0 );
    1212           3 :         LBASSERT( command.getInstanceID() == CO_INSTANCE_NONE );
    1213           3 :         return dispatchObjectCommand( command );
    1214             : 
    1215             :       case CMD_NODE_OBJECT_INSTANCE_PUSH: // collected per object ID in _pushData; _cmdPush() pulls it from there
    1216           6 :         LBASSERT( nodeID == 0 );
    1217           6 :         LBASSERT( command.getInstanceID() == CO_INSTANCE_NONE );
    1218           6 :         _pushData.addDataCommand( command.getObjectID(), command );
    1219           6 :         return true;
    1220             : 
    1221             :       case CMD_NODE_OBJECT_INSTANCE_SYNC: // the instance ID field is used as the request ID of the pending sync
    1222             :       {
    1223           8 :         if( nodeID != _localNode->getNodeID( )) // not for me
    1224           0 :             return true;
    1225             :         // The request data registered for that ID is the stream which receives the command.
    1226           8 :         void* data = _localNode->getRequestData( command.getInstanceID( ));
    1227           8 :         LBASSERT( command.getInstanceID() != CO_INSTANCE_NONE );
    1228           8 :         LBASSERTINFO( data, this ); // NOTE(review): data is dereferenced below; confirm it cannot be null when assertions are disabled
    1229             : 
    1230           8 :         ObjectDataIStream* is = LBSAFECAST( ObjectDataIStream*, data );
    1231           8 :         is->addDataCommand( command );
    1232           8 :         return true;
    1233             :       }
    1234             : 
    1235             :       default: // any other command id is treated as a programming error
    1236           0 :         LBUNREACHABLE;
    1237           0 :         return false;
    1238          54 :     }
    1239             : }
    1240             : // Decrements the send-on-register counter. When it reaches zero the send queue is cleared and the connections of all nodes are finished. The request is served in either case; always returns true.
    1241           0 : bool ObjectStore::_cmdDisableSendOnRegister( ICommand& command )
    1242             : {
    1243           0 :     LB_TS_THREAD( _commandThread ); // thread-affinity check macro; presumably expects the command thread - confirm
    1244           0 :     LBASSERTINFO( _sendOnRegister > 0, _sendOnRegister ); // the counter is decremented right below
    1245             : 
    1246           0 :     if( --_sendOnRegister == 0 )
    1247             :     {
    1248           0 :         _sendQueue.clear();
    1249             :         // Collect nodes from the local node; the meaning of the 'false' argument is not visible here.
    1250           0 :         Nodes nodes;
    1251           0 :         _localNode->getNodes( nodes, false );
    1252           0 :         for( NodesCIter i = nodes.begin(); i != nodes.end(); ++i )
    1253             :         {
    1254           0 :             NodePtr node = *i;
    1255           0 :             ConnectionPtr multicast = node->getConnection( true ); // argument presumably selects the multicast connection - confirm
    1256           0 :             ConnectionPtr connection = node->getConnection( false );
    1257           0 :             if( multicast )
    1258           0 :                 multicast->finish();
    1259           0 :             if( connection && connection != multicast ) // do not finish the same connection twice
    1260           0 :                 connection->finish();
    1261           0 :         }
    1262             :     }
    1263             :     // Serve the request whether or not the counter reached zero.
    1264           0 :     const uint32_t requestID = command.get< uint32_t >();
    1265           0 :     _localNode->serveRequest( requestID );
    1266           0 :     return true;
    1267             : }
    1268             : // Calls removeSlaves( node ) on every object held in _objects, then either serves the pending request or drops a reference on the node. Always returns true.
    1269          40 : bool ObjectStore::_cmdRemoveNode( ICommand& command )
    1270             : {
    1271          40 :     LB_TS_THREAD( _commandThread ); // thread-affinity check macro; presumably expects the command thread - confirm
    1272          40 :     LBLOG( LOG_OBJECTS ) << "Cmd object  " << command << std::endl;
    1273             :     // The command carries a raw node pointer followed by a request ID.
    1274          40 :     Node* node = command.get< Node* >();
    1275          40 :     const uint32_t requestID = command.get< uint32_t >();
    1276             :     // Scoped lock object on _objects, declared before the traversal; presumably held until the function returns - confirm.
    1277          40 :     lunchbox::ScopedFastWrite mutex( _objects );
    1278          65 :     for( ObjectsHashCIter i = _objects->begin(); i != _objects->end(); ++i )
    1279             :     {
    1280          25 :         const Objects& objects = i->second; // all object instances stored under one hash key
    1281          50 :         for( ObjectsCIter j = objects.begin(); j != objects.end(); ++j )
    1282          25 :             (*j)->removeSlaves( node );
    1283             :     }
    1284             :     // LB_UNDEFINED_UINT32 means there is no request to serve; the node reference is released instead.
    1285          40 :     if( requestID != LB_UNDEFINED_UINT32 )
    1286           7 :         _localNode->serveRequest( requestID );
    1287             :     else
    1288          33 :         node->unref(); // node was ref'd before LocalNode::_handleDisconnect()
    1289             : 
    1290          40 :     return true;
    1291             : }
    1292             : // Pulls the collected push data stream of one object from _pushData, passes it to LocalNode::objectPush() and recycles the stream. Always returns true.
    1293           4 : bool ObjectStore::_cmdPush( ICommand& command )
    1294             : {
    1295           4 :     LB_TS_THREAD( _commandThread ); // thread-affinity check macro; presumably expects the command thread - confirm
    1296             :     // Fields are read in the order object ID, group ID, type ID.
    1297           4 :     const uint128_t& objectID = command.get< uint128_t >();
    1298           4 :     const uint128_t& groupID = command.get< uint128_t >();
    1299           4 :     const uint128_t& typeID = command.get< uint128_t >();
    1300             :     // NOTE(review): 'is' is dereferenced below without a check - confirm pull() never returns a null stream for this objectID.
    1301           4 :     ObjectDataIStream* is = _pushData.pull( objectID );
    1302             : 
    1303           4 :     _localNode->objectPush( groupID, typeID, objectID, *is );
    1304           4 :     _pushData.recycle( is ); // hand the stream back to _pushData once objectPush() has returned
    1305           4 :     return true;
    1306             : }
    1307             : // Writes a short description of an ObjectStore pointer to the stream: a fixed text for a null pointer, otherwise the pointer value. Returns the stream.
    1308           0 : std::ostream& operator << ( std::ostream& os, ObjectStore* objectStore )
    1309             : {
    1310           0 :     if( !objectStore ) // null pointer: print a placeholder instead of an address
    1311             :     {
    1312           0 :         os << "NULL objectStore";
    1313           0 :         return os;
    1314             :     }
    1315             :     // The cast to void* prints the pointer value itself and cannot re-enter this operator.
    1316           0 :     os << "objectStore (" << (void*)objectStore << ")";
    1317             : 
    1318           0 :     return os;
    1319             : }
    1320          60 : }

Generated by: LCOV version 1.10