LCOV - code coverage report
Current view: top level - co - objectStore.cpp (source / functions) Hit Total Coverage
Test: Collage Lines: 537 746 72.0 %
Date: 2015-11-03 13:48:53 Functions: 42 48 87.5 %

          Line data    Source code
       1             : 
       2             : /* Copyright (c) 2005-2014, Stefan Eilemann <eile@equalizergraphics.com>
       3             :  *                    2010, Cedric Stalder <cedric.stalder@gmail.com>
       4             :  *               2011-2012, Daniel Nachbaur <danielnachbaur@gmail.com>
       5             :  *
       6             :  * This file is part of Collage <https://github.com/Eyescale/Collage>
       7             :  *
       8             :  * This library is free software; you can redistribute it and/or modify it under
       9             :  * the terms of the GNU Lesser General Public License version 2.1 as published
      10             :  * by the Free Software Foundation.
      11             :  *
      12             :  * This library is distributed in the hope that it will be useful, but WITHOUT
      13             :  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
      14             :  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
      15             :  * details.
      16             :  *
      17             :  * You should have received a copy of the GNU Lesser General Public License
      18             :  * along with this library; if not, write to the Free Software Foundation, Inc.,
      19             :  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
      20             :  */
      21             : 
      22             : #include "objectStore.h"
      23             : 
      24             : #include "connection.h"
      25             : #include "connectionDescription.h"
      26             : #include "global.h"
      27             : #include "instanceCache.h"
      28             : #include "log.h"
      29             : #include "masterCMCommand.h"
      30             : #include "nodeCommand.h"
      31             : #include "oCommand.h"
      32             : #include "objectCM.h"
      33             : #include "objectCommand.h"
      34             : #include "objectDataICommand.h"
      35             : #include "objectDataIStream.h"
      36             : 
      37             : #include <lunchbox/futureFunction.h>
      38             : #include <lunchbox/scopedMutex.h>
      39             : 
      40             : #include <boost/bind.hpp>
      41             : 
      42             : #include <limits>
      43             : 
      44             : //#define DEBUG_DISPATCH
      45             : #ifdef DEBUG_DISPATCH
      46             : #  include <set>
      47             : #endif
      48             : 
      49             : namespace co
      50             : {
      51             : typedef CommandFunc< ObjectStore > CmdFunc;
      52             : typedef lunchbox::FutureFunction< bool > FuturebImpl;
      53             : 
      54          51 : ObjectStore::ObjectStore( LocalNode* localNode, a_ssize_t* counters )
      55             :         : _localNode( localNode )
      56             :         , _instanceIDs( -0x7FFFFFFF )
      57             :         , _instanceCache( new InstanceCache( Global::getIAttribute(
      58         102 :                               Global::IATTR_INSTANCE_CACHE_SIZE ) * LB_1MB ) )
      59         153 :         , _counters( counters )
      60             : {
      61          51 :     LBASSERT( localNode );
      62          51 :     CommandQueue* queue = localNode->getCommandThreadQueue();
      63             : 
      64             :     localNode->_registerCommand( CMD_NODE_FIND_MASTER_NODE_ID,
      65          51 :         CmdFunc( this, &ObjectStore::_cmdFindMasterNodeID ), queue );
      66             :     localNode->_registerCommand( CMD_NODE_FIND_MASTER_NODE_ID_REPLY,
      67          51 :         CmdFunc( this, &ObjectStore::_cmdFindMasterNodeIDReply ), 0 );
      68             :     localNode->_registerCommand( CMD_NODE_ATTACH_OBJECT,
      69          51 :         CmdFunc( this, &ObjectStore::_cmdAttach ), 0 );
      70             :     localNode->_registerCommand( CMD_NODE_DETACH_OBJECT,
      71          51 :         CmdFunc( this, &ObjectStore::_cmdDetach ), 0 );
      72             :     localNode->_registerCommand( CMD_NODE_REGISTER_OBJECT,
      73          51 :         CmdFunc( this, &ObjectStore::_cmdRegister ), queue );
      74             :     localNode->_registerCommand( CMD_NODE_DEREGISTER_OBJECT,
      75          51 :         CmdFunc( this, &ObjectStore::_cmdDeregister ), queue );
      76             :     localNode->_registerCommand( CMD_NODE_MAP_OBJECT,
      77          51 :         CmdFunc( this, &ObjectStore::_cmdMap ), queue );
      78             :     localNode->_registerCommand( CMD_NODE_MAP_OBJECT_SUCCESS,
      79          51 :         CmdFunc( this, &ObjectStore::_cmdMapSuccess ), 0 );
      80             :     localNode->_registerCommand( CMD_NODE_MAP_OBJECT_REPLY,
      81          51 :         CmdFunc( this, &ObjectStore::_cmdMapReply ), 0 );
      82             :     localNode->_registerCommand( CMD_NODE_UNMAP_OBJECT,
      83          51 :         CmdFunc( this, &ObjectStore::_cmdUnmap ), 0 );
      84             :     localNode->_registerCommand( CMD_NODE_UNSUBSCRIBE_OBJECT,
      85          51 :         CmdFunc( this, &ObjectStore::_cmdUnsubscribe ), queue );
      86             :     localNode->_registerCommand( CMD_NODE_OBJECT_INSTANCE,
      87          51 :         CmdFunc( this, &ObjectStore::_cmdInstance ), 0 );
      88             :     localNode->_registerCommand( CMD_NODE_OBJECT_INSTANCE_MAP,
      89          51 :         CmdFunc( this, &ObjectStore::_cmdInstance ), 0 );
      90             :     localNode->_registerCommand( CMD_NODE_OBJECT_INSTANCE_COMMIT,
      91          51 :         CmdFunc( this, &ObjectStore::_cmdInstance ), 0 );
      92             :     localNode->_registerCommand( CMD_NODE_OBJECT_INSTANCE_PUSH,
      93          51 :         CmdFunc( this, &ObjectStore::_cmdInstance ), 0 );
      94             :     localNode->_registerCommand( CMD_NODE_OBJECT_INSTANCE_SYNC,
      95          51 :         CmdFunc( this, &ObjectStore::_cmdInstance ), 0 );
      96             :     localNode->_registerCommand( CMD_NODE_DISABLE_SEND_ON_REGISTER,
      97          51 :         CmdFunc( this, &ObjectStore::_cmdDisableSendOnRegister ), queue );
      98             :     localNode->_registerCommand( CMD_NODE_REMOVE_NODE,
      99          51 :         CmdFunc( this, &ObjectStore::_cmdRemoveNode ), queue );
     100             :     localNode->_registerCommand( CMD_NODE_OBJECT_PUSH,
     101          51 :         CmdFunc( this, &ObjectStore::_cmdPush ), queue );
     102             :     localNode->_registerCommand( CMD_NODE_SYNC_OBJECT,
     103          51 :         CmdFunc( this, &ObjectStore::_cmdSync ), queue );
     104             :     localNode->_registerCommand( CMD_NODE_SYNC_OBJECT_REPLY,
     105          51 :         CmdFunc( this, &ObjectStore::_cmdSyncReply ), 0 );
     106          51 : }
     107             : 
     108         147 : ObjectStore::~ObjectStore()
     109             : {
     110          49 :     LBVERB << "Delete ObjectStore @" << (void*)this << std::endl;
     111             : 
     112             : #ifndef NDEBUG
     113          49 :     if( !_objects->empty( ))
     114             :     {
     115           0 :         LBWARN << _objects->size() << " attached objects in destructor"
     116           0 :                << std::endl;
     117             : 
     118           0 :         for( ObjectsHash::const_iterator i = _objects->begin();
     119           0 :              i != _objects->end(); ++i )
     120             :         {
     121           0 :             const Objects& objects = i->second;
     122           0 :             LBWARN << "  " << objects.size() << " objects with id "
     123           0 :                    << i->first << std::endl;
     124             : 
     125           0 :             for( Objects::const_iterator j = objects.begin();
     126           0 :                  j != objects.end(); ++j )
     127             :             {
     128           0 :                 const Object* object = *j;
     129           0 :                 LBINFO << "    object type " << lunchbox::className( object )
     130           0 :                        << std::endl;
     131             :             }
     132             :         }
     133             :     }
     134             :     //LBASSERT( _objects->empty( ))
     135             : #endif
     136          49 :    clear();
     137          49 :    delete _instanceCache;
     138          49 :    _instanceCache = 0;
     139          98 : }
     140             : 
     141          94 : void ObjectStore::clear( )
     142             : {
     143          94 :     LBASSERT( _objects->empty( ));
     144          94 :     expireInstanceData( 0 );
     145          94 :     LBASSERT( !_instanceCache || _instanceCache->isEmpty( ));
     146             : 
     147          94 :     _objects->clear();
     148          94 :     _sendQueue.clear();
     149          94 : }
     150             : 
     151           0 : void ObjectStore::disableInstanceCache()
     152             : {
     153           0 :     LBASSERT( _localNode->isClosed( ));
     154           0 :     delete _instanceCache;
     155           0 :     _instanceCache = 0;
     156           0 : }
     157             : 
     158          94 : void ObjectStore::expireInstanceData( const int64_t age )
     159             : {
     160          94 :     if( _instanceCache )
     161          94 :         _instanceCache->expire( age );
     162          94 : }
     163             : 
     164         110 : void ObjectStore::removeInstanceData( const NodeID& nodeID )
     165             : {
     166         110 :     if( _instanceCache )
     167         110 :         _instanceCache->remove( nodeID );
     168         110 : }
     169             : 
     170           0 : void ObjectStore::enableSendOnRegister()
     171             : {
     172           0 :     ++_sendOnRegister;
     173           0 : }
     174             : 
     175           0 : void ObjectStore::disableSendOnRegister()
     176             : {
     177           0 :     if( Global::getIAttribute( Global::IATTR_NODE_SEND_QUEUE_SIZE ) > 0 )
     178             :     {
     179             :         lunchbox::Request< void > request =
     180           0 :             _localNode->registerRequest< void >();
     181           0 :         _localNode->send( CMD_NODE_DISABLE_SEND_ON_REGISTER ) << request;
     182             :     }
     183             :     else // OPT
     184           0 :         --_sendOnRegister;
     185           0 : }
     186             : 
     187             : //---------------------------------------------------------------------------
     188             : // identifier master node mapping
     189             : //---------------------------------------------------------------------------
     190          39 : NodeID ObjectStore::findMasterNodeID( const uint128_t& identifier )
     191             : {
     192          39 :     LB_TS_NOT_THREAD( _commandThread );
     193             : 
     194             :     // OPT: look up locally first?
     195          39 :     Nodes nodes;
     196          39 :     _localNode->getNodes( nodes );
     197             : 
     198             :     // OPT: send to multiple nodes at once?
     199          39 :     for( NodesIter i = nodes.begin(); i != nodes.end(); ++i )
     200             :     {
     201          39 :         NodePtr node = *i;
     202             :         lunchbox::Request< NodeID > request =
     203          39 :             _localNode->registerRequest< NodeID >();
     204             : 
     205          78 :         LBLOG( LOG_OBJECTS ) << "Finding " << identifier << " on " << node
     206         117 :                              << " req " << request.getID() << std::endl;
     207          39 :         node->send( CMD_NODE_FIND_MASTER_NODE_ID ) << identifier << request;
     208             : 
     209             :         try
     210             :         {
     211          39 :             const NodeID& masterNodeID = request.wait( Global::getTimeout( ));
     212             : 
     213          39 :             if( masterNodeID != 0 )
     214             :             {
     215          39 :                 LBLOG( LOG_OBJECTS ) << "Found " << identifier << " on "
     216          39 :                                      << masterNodeID << std::endl;
     217          39 :                 return masterNodeID;
     218             :             }
     219             :         }
     220           0 :         catch( const lunchbox::FutureTimeout& )
     221             :         {
     222           0 :             _localNode->unregisterRequest( request.getID( ));
     223           0 :             throw;
     224             :         }
     225           0 :     }
     226             : 
     227           0 :     return NodeID();
     228             : }
     229             : 
     230             : //---------------------------------------------------------------------------
     231             : // object mapping
     232             : //---------------------------------------------------------------------------
     233          20 : void ObjectStore::attach( Object* object, const uint128_t& id,
     234             :                           const uint32_t instanceID )
     235             : {
     236          20 :     LBASSERT( object );
     237          20 :     LB_TS_NOT_THREAD( _receiverThread );
     238             : 
     239             :     lunchbox::Request< void > request =
     240          20 :         _localNode->registerRequest< void >( object );
     241          20 :     _localNode->send( CMD_NODE_ATTACH_OBJECT ) << id << instanceID << request;
     242          20 : }
     243             : 
     244             : namespace
     245             : {
     246          61 : uint32_t _genNextID( lunchbox::a_int32_t& val )
     247             : {
     248             :     uint32_t result;
     249          61 :     do
     250             :     {
     251          61 :         const long id = ++val;
     252             :         result = static_cast< uint32_t >(
     253          61 :             static_cast< int64_t >( id ) + 0x7FFFFFFFu );
     254             :     }
     255             :     while( result > CO_INSTANCE_MAX );
     256             : 
     257          61 :     return result;
     258             : }
     259             : }
     260             : 
     261          61 : void ObjectStore::_attach( Object* object, const uint128_t& id,
     262             :                            const uint32_t inInstanceID )
     263             : {
     264          61 :     LBASSERT( object );
     265          61 :     LB_TS_THREAD( _receiverThread );
     266             : 
     267          61 :     uint32_t instanceID = inInstanceID;
     268          61 :     if( inInstanceID == CO_INSTANCE_INVALID )
     269          20 :         instanceID = _genNextID( _instanceIDs );
     270             : 
     271          61 :     object->attach( id, instanceID );
     272             : 
     273             :     {
     274          61 :         lunchbox::ScopedFastWrite mutex( _objects );
     275          61 :         Objects& objects = _objects.data[ id ];
     276          61 :         LBASSERTINFO( !object->isMaster() || objects.empty(),
     277             :                       "Attaching master " << *object << ", " <<
     278             :                       objects.size() << " attached objects with same ID, " <<
     279             :                       "first is " << ( objects[0]->isMaster() ? "master " :
     280             :                                        "slave " ) << *objects[0] );
     281          61 :         objects.push_back( object );
     282             :     }
     283             : 
     284          61 :     _localNode->flushCommands(); // redispatch pending commands
     285             : 
     286          61 :     LBLOG( LOG_OBJECTS ) << "attached " << *object << " @"
     287          61 :                          << static_cast< void* >( object ) << std::endl;
     288          61 : }
     289             : 
     290          28 : void ObjectStore::detach( Object* object )
     291             : {
     292          28 :     LBASSERT( object );
     293          28 :     LB_TS_NOT_THREAD( _receiverThread );
     294             : 
     295          28 :     lunchbox::Request< void > request = _localNode->registerRequest< void >();
     296             :     _localNode->send( CMD_NODE_DETACH_OBJECT )
     297          28 :         << object->getID() << object->getInstanceID() << request;
     298          28 : }
     299             : 
     300           0 : void ObjectStore::swap( Object* oldObject, Object* newObject )
     301             : {
     302           0 :     LBASSERT( newObject );
     303           0 :     LBASSERT( oldObject );
     304           0 :     LBASSERT( oldObject->isMaster() );
     305           0 :     LB_TS_THREAD( _receiverThread );
     306             : 
     307           0 :     if( !oldObject->isAttached() )
     308           0 :         return;
     309             : 
     310           0 :     LBLOG( LOG_OBJECTS ) << "Swap " << lunchbox::className( oldObject )
     311           0 :                          << std::endl;
     312           0 :     const uint128_t& id = oldObject->getID();
     313             : 
     314           0 :     lunchbox::ScopedFastWrite mutex( _objects );
     315           0 :     ObjectsHash::iterator i = _objects->find( id );
     316           0 :     LBASSERT( i != _objects->end( ));
     317           0 :     if( i == _objects->end( ))
     318           0 :         return;
     319             : 
     320           0 :     Objects& objects = i->second;
     321           0 :     Objects::iterator j = find( objects.begin(), objects.end(), oldObject );
     322           0 :     LBASSERT( j != objects.end( ));
     323           0 :     if( j == objects.end( ))
     324           0 :         return;
     325             : 
     326           0 :     newObject->transfer( oldObject );
     327           0 :     *j = newObject;
     328             : }
     329             : 
     330          59 : void ObjectStore::_detach( Object* object )
     331             : {
     332             :     // check also _cmdUnmapObject when modifying!
     333          59 :     LBASSERT( object );
     334          59 :     LB_TS_THREAD( _receiverThread );
     335             : 
     336          59 :     if( !object->isAttached() )
     337           0 :         return;
     338             : 
     339          59 :     const uint128_t& id = object->getID();
     340             : 
     341          59 :     LBASSERT( _objects->find( id ) != _objects->end( ));
     342          59 :     LBLOG( LOG_OBJECTS ) << "Detach " << *object << std::endl;
     343             : 
     344          59 :     Objects& objects = _objects.data[ id ];
     345          59 :     Objects::iterator i = find( objects.begin(),objects.end(), object );
     346          59 :     LBASSERT( i != objects.end( ));
     347             : 
     348             :     {
     349          59 :         lunchbox::ScopedFastWrite mutex( _objects );
     350          59 :         objects.erase( i );
     351          59 :         if( objects.empty( ))
     352          58 :             _objects->erase( id );
     353             :     }
     354             : 
     355          59 :     LBASSERT( object->getInstanceID() != CO_INSTANCE_INVALID );
     356          59 :     object->detach();
     357          59 :     return;
     358             : }
     359             : 
     360          41 : uint32_t ObjectStore::mapNB( Object* object, const uint128_t& id,
     361             :                              const uint128_t& version, NodePtr master )
     362             : {
     363          41 :     LB_TS_NOT_THREAD( _receiverThread );
     364          41 :     LBLOG( LOG_OBJECTS )
     365          41 :         << "Mapping " << lunchbox::className( object ) << " to id " << id
     366         123 :         << " version " << version << std::endl;
     367          41 :     LBASSERT( object );
     368          41 :     LBASSERTINFO( id.isUUID(), id );
     369             : 
     370          41 :     if( !master )
     371          39 :         master = _localNode->connectObjectMaster( id );
     372             : 
     373          41 :     if( !master || !master->isReachable( ))
     374             :     {
     375           0 :         LBWARN << "Mapping of object " << id << " failed, invalid master node"
     376           0 :                << std::endl;
     377           0 :         return LB_UNDEFINED_UINT32;
     378             :     }
     379             : 
     380          41 :     if( !object || !id.isUUID( ))
     381             :     {
     382           0 :         LBWARN << "Invalid object " << object << " or id " << id << std::endl;
     383           0 :         return LB_UNDEFINED_UINT32;
     384             :     }
     385             : 
     386          41 :     const bool isAttached = object->isAttached();
     387          41 :     const bool isMaster = object->isMaster();
     388          41 :     LBASSERT( !isAttached );
     389          41 :     LBASSERT( !isMaster ) ;
     390          41 :     if( isAttached || isMaster )
     391             :     {
     392           0 :         LBWARN << "Invalid object state: attached " << isAttached << " master "
     393           0 :                << isMaster << std::endl;
     394           0 :         return LB_UNDEFINED_UINT32;
     395             :     }
     396             : 
     397          41 :     const uint32_t request = _localNode->registerRequest( object );
     398          41 :     uint128_t minCachedVersion = VERSION_HEAD;
     399          41 :     uint128_t maxCachedVersion = VERSION_NONE;
     400          41 :     uint32_t masterInstanceID = 0;
     401             :     const bool useCache = _checkInstanceCache( id, minCachedVersion,
     402             :                                                maxCachedVersion,
     403          41 :                                                masterInstanceID );
     404          41 :     object->notifyAttach();
     405             :     master->send( CMD_NODE_MAP_OBJECT )
     406          82 :         << version << minCachedVersion << maxCachedVersion << id
     407         123 :         << object->getMaxVersions() << request << _genNextID( _instanceIDs )
     408          41 :         << masterInstanceID << useCache;
     409          41 :     return request;
     410             : }
     411             : 
     412          45 : bool ObjectStore::_checkInstanceCache( const uint128_t& id, uint128_t& from,
     413             :                                        uint128_t& to, uint32_t& instanceID )
     414             : {
     415          45 :     if( !_instanceCache )
     416           0 :         return false;
     417             : 
     418          45 :     const InstanceCache::Data& cached = (*_instanceCache)[ id ];
     419          45 :     if( cached == InstanceCache::Data::NONE )
     420          45 :         return false;
     421             : 
     422           0 :     const ObjectDataIStreamDeque& versions = cached.versions;
     423           0 :     LBASSERT( !cached.versions.empty( ));
     424           0 :     instanceID = cached.masterInstanceID;
     425           0 :     from = versions.front()->getVersion();
     426           0 :     to = versions.back()->getVersion();
     427           0 :     LBLOG( LOG_OBJECTS ) << "Object " << id << " have v" << from << ".." << to
     428           0 :                          << std::endl;
     429           0 :     return true;
     430             : }
     431             : 
     432          41 : bool ObjectStore::mapSync( const uint32_t requestID )
     433             : {
     434          41 :     if( requestID == LB_UNDEFINED_UINT32 )
     435           0 :         return false;
     436             : 
     437          41 :     void* data = _localNode->getRequestData( requestID );
     438          41 :     if( data == 0 )
     439           0 :         return false;
     440             : 
     441          41 :     Object* object = LBSAFECAST( Object*, data );
     442          41 :     uint128_t version = VERSION_NONE;
     443          41 :     _localNode->waitRequest( requestID, version );
     444             : 
     445          41 :     const bool mapped = object->isAttached();
     446          41 :     if( mapped )
     447          41 :         object->applyMapData( version ); // apply initial instance data
     448             : 
     449          41 :     object->notifyAttached();
     450          41 :     LBLOG( LOG_OBJECTS )
     451          82 :         << "Mapped " << lunchbox::className( object ) << std::endl;
     452          41 :     return mapped;
     453             : }
     454             : 
     455             : 
     456           4 : f_bool_t ObjectStore::sync( Object* object, NodePtr master, const uint128_t& id,
     457             :                             const uint32_t instanceID )
     458             : {
     459           4 :     const uint32_t request = _startSync( object, master, id, instanceID );
     460             :     const FuturebImpl::Func& func = boost::bind( &ObjectStore::_finishSync,
     461           4 :                                                  this, request, object );
     462           4 :     return f_bool_t( new FuturebImpl( func ));
     463             : }
     464             : 
     465           4 : uint32_t ObjectStore::_startSync( Object* object, NodePtr master,
     466             :                                   const uint128_t& id,
     467             :                                   const uint32_t instanceID )
     468             : {
     469           4 :     LB_TS_NOT_THREAD( _receiverThread );
     470           4 :     LBLOG( LOG_OBJECTS )
     471           4 :         << "Syncing " << lunchbox::className( object ) << " with id " << id
     472          12 :         << std::endl;
     473           4 :     LBASSERT( object );
     474           4 :     LBASSERTINFO( id.isUUID(), id );
     475             : 
     476           4 :     if( !object || !id.isUUID( ))
     477             :     {
     478           0 :         LBWARN << "Invalid object " << object << " or id " << id << std::endl;
     479           0 :         return LB_UNDEFINED_UINT32;
     480             :     }
     481             : 
     482           4 :     if( !master )
     483           0 :         master = _localNode->connectObjectMaster( id );
     484             : 
     485           4 :     if( !master || !master->isReachable( ))
     486             :     {
     487           0 :         LBWARN << "Mapping of object " << id << " failed, invalid master node"
     488           0 :                << std::endl;
     489           0 :         return LB_UNDEFINED_UINT32;
     490             :     }
     491             : 
     492             :     const uint32_t request =
     493           4 :         _localNode->registerRequest( new ObjectDataIStream );
     494           4 :     uint128_t minCachedVersion = VERSION_HEAD;
     495           4 :     uint128_t maxCachedVersion = VERSION_NONE;
     496           4 :     uint32_t cacheInstanceID = 0;
     497             : 
     498             :     bool useCache = _checkInstanceCache( id, minCachedVersion, maxCachedVersion,
     499           4 :                                          cacheInstanceID );
     500           4 :     if( useCache )
     501             :     {
     502           0 :         switch( instanceID )
     503             :         {
     504             :         case CO_INSTANCE_ALL:
     505           0 :             break;
     506             :         default:
     507           0 :             if( instanceID == cacheInstanceID )
     508           0 :                 break;
     509             : 
     510           0 :             useCache = false;
     511           0 :             LBCHECK( _instanceCache->release( id, 1 ));
     512           0 :             break;
     513             :         }
     514             :     }
     515             : 
     516             :     // Use stream expected by MasterCMCommand
     517             :     master->send( CMD_NODE_SYNC_OBJECT )
     518           8 :         << VERSION_NEWEST << minCachedVersion << maxCachedVersion << id
     519          12 :         << uint64_t(0) /* maxVersions */ << request << instanceID
     520           4 :         << cacheInstanceID << useCache;
     521           4 :     return request;
     522             : }
     523             : 
     524           4 : bool ObjectStore::_finishSync( const uint32_t requestID, Object* object )
     525             : {
     526           4 :     if( requestID == LB_UNDEFINED_UINT32 )
     527           0 :         return false;
     528             : 
     529           4 :     void* data = _localNode->getRequestData( requestID );
     530           4 :     if( data == 0 )
     531           0 :         return false;
     532             : 
     533           4 :     ObjectDataIStream* is = LBSAFECAST( ObjectDataIStream*, data );
     534             : 
     535           4 :     bool ok = false;
     536           4 :     _localNode->waitRequest( requestID, ok );
     537             : 
     538           4 :     if( !ok )
     539             :     {
     540           0 :         LBWARN << "Object synchronization failed" << std::endl;
     541           0 :         delete is;
     542           0 :         return false;
     543             :     }
     544             : 
     545           4 :     is->waitReady();
     546           4 :     object->applyInstanceData( *is );
     547           8 :     LBLOG( LOG_OBJECTS ) << "Synced " << lunchbox::className( object )
     548          12 :                          << std::endl;
     549           4 :     delete is;
     550           4 :     return true;
     551             : }
     552             : 
     553          41 : void ObjectStore::unmap( Object* object )
     554             : {
     555          41 :     LBASSERT( object );
     556          41 :     if( !object->isAttached( )) // not registered
     557          34 :         return;
     558             : 
     559          40 :     const uint128_t& id = object->getID();
     560             : 
     561          40 :     LBLOG( LOG_OBJECTS ) << "Unmap " << object << std::endl;
     562             : 
     563          40 :     object->notifyDetach();
     564             : 
     565             :     // send unsubscribe to master, master will send detach command.
     566          40 :     LBASSERT( !object->isMaster( ));
     567          40 :     LB_TS_NOT_THREAD( _commandThread );
     568             : 
     569          40 :     const uint32_t masterInstanceID = object->getMasterInstanceID();
     570          40 :     if( masterInstanceID != CO_INSTANCE_INVALID )
     571             :     {
     572          32 :         NodePtr master = object->getMasterNode();
     573          32 :         LBASSERT( master )
     574             : 
     575          32 :         if( master && master->isReachable( ))
     576             :         {
     577             :             lunchbox::Request< void > request =
     578          32 :                 _localNode->registerRequest< void >();
     579             :             master->send( CMD_NODE_UNSUBSCRIBE_OBJECT )
     580          32 :                 << id << request << masterInstanceID << object->getInstanceID();
     581          32 :             request.wait();
     582          32 :             object->notifyDetached();
     583          32 :             return;
     584             :         }
     585           0 :         LBERROR << "Master node for object id " << id << " not connected"
     586           0 :                 << std::endl;
     587             :     }
     588             : 
     589             :     // no unsubscribe sent: Detach directly
     590           8 :     detach( object );
     591           8 :     object->setupChangeManager( Object::NONE, false, 0, CO_INSTANCE_INVALID );
     592           8 :     object->notifyDetached();
     593             : }
     594             : 
     595          20 : bool ObjectStore::register_( Object* object )
     596             : {
     597          20 :     LBASSERT( object );
     598          20 :     LBASSERT( !object->isAttached( ));
     599             : 
     600          20 :     const uint128_t& id = object->getID( );
     601          20 :     LBASSERTINFO( id.isUUID(), id );
     602             : 
     603          20 :     object->notifyAttach();
     604          20 :     object->setupChangeManager( object->getChangeType(), true, _localNode,
     605          40 :                                 CO_INSTANCE_INVALID );
     606          20 :     attach( object, id, CO_INSTANCE_INVALID );
     607             : 
     608          20 :     if( Global::getIAttribute( Global::IATTR_NODE_SEND_QUEUE_SIZE ) > 0 )
     609          20 :         _localNode->send( CMD_NODE_REGISTER_OBJECT ) << object;
     610             : 
     611          20 :     object->notifyAttached();
     612             : 
     613          20 :     LBLOG( LOG_OBJECTS ) << "Registered " << object << std::endl;
     614          20 :     return true;
     615             : }
     616             : 
     617          20 : void ObjectStore::deregister( Object* object )
     618             : {
     619          20 :     LBASSERT( object );
     620          20 :     if( !object->isAttached( )) // not registered
     621          20 :         return;
     622             : 
     623          20 :     LBLOG( LOG_OBJECTS ) << "Deregister " << *object << std::endl;
     624          20 :     LBASSERT( object->isMaster( ));
     625             : 
     626          20 :     object->notifyDetach();
     627             : 
     628          20 :     if( Global::getIAttribute( Global::IATTR_NODE_SEND_QUEUE_SIZE ) > 0  )
     629             :     {
     630             :         // remove from send queue
     631             :         lunchbox::Request< void > request =
     632          20 :             _localNode->registerRequest< void >();
     633          20 :         _localNode->send( CMD_NODE_DEREGISTER_OBJECT ) << request;
     634             :     }
     635             : 
     636          20 :     const uint128_t id = object->getID();
     637          20 :     detach( object );
     638          20 :     object->setupChangeManager( Object::NONE, true, 0, CO_INSTANCE_INVALID );
     639          20 :     if( _instanceCache )
     640          20 :         _instanceCache->erase( id );
     641          20 :     object->notifyDetached();
     642             : }
     643             : 
     644       51172 : bool ObjectStore::notifyCommandThreadIdle()
     645             : {
     646       51172 :     LB_TS_THREAD( _commandThread );
     647       51173 :     if( _sendQueue.empty( ))
     648       51172 :         return false;
     649             : 
     650           0 :     LBASSERT( _sendOnRegister > 0 );
     651           0 :     SendQueueItem& item = _sendQueue.front();
     652             : 
     653           0 :     if( item.age > _localNode->getTime64( ))
     654             :     {
     655           0 :         Nodes nodes;
     656           0 :         _localNode->getNodes( nodes, false );
     657           0 :         if( nodes.empty( ))
     658             :         {
     659           0 :             lunchbox::Thread::yield();
     660           0 :             return !_sendQueue.empty();
     661             :         }
     662             : 
     663           0 :         item.object->sendInstanceData( nodes );
     664             :     }
     665           0 :     _sendQueue.pop_front();
     666           0 :     return !_sendQueue.empty();
     667             : }
     668             : 
     669           7 : void ObjectStore::removeNode( NodePtr node )
     670             : {
     671           7 :     lunchbox::Request< void > request = _localNode->registerRequest< void >();
     672           7 :     _localNode->send( CMD_NODE_REMOVE_NODE ) << node.get() << request;
     673           7 : }
     674             : 
     675             : //===========================================================================
     676             : // ICommand handling
     677             : //===========================================================================
     678         548 : bool ObjectStore::dispatchObjectCommand( ICommand& cmd )
     679             : {
     680         548 :     LB_TS_THREAD( _receiverThread );
     681         548 :     ObjectICommand command( cmd );
     682         548 :     const uint128_t& id = command.getObjectID();
     683         548 :     const uint32_t instanceID = command.getInstanceID();
     684             : 
     685         548 :     ObjectsHash::const_iterator i = _objects->find( id );
     686             : 
     687         548 :     if( i == _objects->end( ))
     688             :         // When the instance ID is set to none, we only care about the command
     689             :         // when we have an object of the given ID (multicast)
     690           0 :         return ( instanceID == CO_INSTANCE_NONE );
     691             : 
     692         548 :     const Objects& objects = i->second;
     693         548 :     LBASSERTINFO( !objects.empty(), command );
     694             : 
     695         548 :     if( instanceID <= CO_INSTANCE_MAX )
     696             :     {
     697          59 :         for( Objects::const_iterator j = objects.begin(); j!=objects.end(); ++j)
     698             :         {
     699          59 :             Object* object = *j;
     700          59 :             if( instanceID == object->getInstanceID( ))
     701             :             {
     702          53 :                 LBCHECK( object->dispatchCommand( command ));
     703          53 :                 return true;
     704             :             }
     705             :         }
     706           0 :         LBERROR << "Can't find object instance " << instanceID << " for "
     707           0 :                 << command << std::endl;
     708           0 :         LBUNREACHABLE;
     709           0 :         return false;
     710             :     }
     711             : 
     712         495 :     Objects::const_iterator j = objects.begin();
     713         495 :     Object* object = *j;
     714         495 :     LBCHECK( object->dispatchCommand( command ));
     715             : 
     716         495 :     for( ++j; j != objects.end(); ++j )
     717             :     {
     718           0 :         object = *j;
     719           0 :         LBCHECK( object->dispatchCommand( command ));
     720             :     }
     721         495 :     return true;
     722             : }
     723             : 
     724          39 : bool ObjectStore::_cmdFindMasterNodeID( ICommand& command )
     725             : {
     726          39 :     LB_TS_THREAD( _commandThread );
     727             : 
     728          39 :     const uint128_t& id = command.get< uint128_t >();
     729          39 :     const uint32_t requestID = command.get< uint32_t >();
     730          39 :     LBASSERT( id.isUUID( ));
     731             : 
     732          39 :     NodeID masterNodeID;
     733             :     {
     734          39 :         lunchbox::ScopedFastRead mutex( _objects );
     735          39 :         ObjectsHashCIter i = _objects->find( id );
     736             : 
     737          39 :         if( i != _objects->end( ))
     738             :         {
     739          39 :             const Objects& objects = i->second;
     740          39 :             LBASSERT( !objects.empty( ));
     741             : 
     742          39 :             for( ObjectsCIter j = objects.begin(); j != objects.end(); ++j )
     743             :             {
     744          39 :                 Object* object = *j;
     745          39 :                 if( object->isMaster( ))
     746          39 :                     masterNodeID = _localNode->getNodeID();
     747             :                 else
     748             :                 {
     749           0 :                     NodePtr master = object->getMasterNode();
     750           0 :                     if( master.isValid( ))
     751           0 :                         masterNodeID = master->getNodeID();
     752             :                 }
     753          39 :                 if( masterNodeID != 0 )
     754          39 :                     break;
     755             :             }
     756          39 :         }
     757             :     }
     758             : 
     759          39 :     LBLOG( LOG_OBJECTS ) << "Object " << id << " master " << masterNodeID
     760          39 :                          << " req " << requestID << std::endl;
     761             :     command.getNode()->send( CMD_NODE_FIND_MASTER_NODE_ID_REPLY )
     762          39 :             << masterNodeID << requestID;
     763          39 :     return true;
     764             : }
     765             : 
     766          39 : bool ObjectStore::_cmdFindMasterNodeIDReply( ICommand& command )
     767             : {
     768          39 :     const NodeID& masterNodeID = command.get< NodeID >();
     769          39 :     const uint32_t requestID = command.get< uint32_t >();
     770          39 :     _localNode->serveRequest( requestID, masterNodeID );
     771          39 :     return true;
     772             : }
     773             : 
     774          20 : bool ObjectStore::_cmdAttach( ICommand& command )
     775             : {
     776          20 :     LB_TS_THREAD( _receiverThread );
     777          20 :     LBLOG( LOG_OBJECTS ) << "Cmd attach object " << command << std::endl;
     778             : 
     779          20 :     const uint128_t& objectID = command.get< uint128_t >();
     780          20 :     const uint32_t instanceID = command.get< uint32_t >();
     781          20 :     const uint32_t requestID = command.get< uint32_t >();
     782             : 
     783             :     Object* object = static_cast< Object* >( _localNode->getRequestData(
     784          20 :                                                  requestID ));
     785          20 :     _attach( object, objectID, instanceID );
     786          20 :     _localNode->serveRequest( requestID );
     787          20 :     return true;
     788             : }
     789             : 
     790          60 : bool ObjectStore::_cmdDetach( ICommand& command )
     791             : {
     792          60 :     LB_TS_THREAD( _receiverThread );
     793          60 :     LBLOG( LOG_OBJECTS ) << "Cmd detach object " << command << std::endl;
     794             : 
     795          60 :     const uint128_t& objectID = command.get< uint128_t >();
     796          60 :     const uint32_t instanceID = command.get< uint32_t >();
     797          60 :     const uint32_t requestID = command.get< uint32_t >();
     798             : 
     799          60 :     ObjectsHash::const_iterator i = _objects->find( objectID );
     800          60 :     if( i != _objects->end( ))
     801             :     {
     802          59 :         const Objects& objects = i->second;
     803             : 
     804         180 :         for( Objects::const_iterator j = objects.begin();
     805         120 :              j != objects.end(); ++j )
     806             :         {
     807          60 :             Object* object = *j;
     808          60 :             if( object->getInstanceID() == instanceID )
     809             :             {
     810          59 :                 _detach( object );
     811          59 :                 break;
     812             :             }
     813             :         }
     814             :     }
     815             : 
     816          60 :     LBASSERT( requestID != LB_UNDEFINED_UINT32 );
     817          60 :     _localNode->serveRequest( requestID );
     818          60 :     return true;
     819             : }
     820             : 
     821          20 : bool ObjectStore::_cmdRegister( ICommand& command )
     822             : {
     823          20 :     LB_TS_THREAD( _commandThread );
     824          20 :     if( _sendOnRegister <= 0 )
     825          20 :         return true;
     826             : 
     827           0 :     LBLOG( LOG_OBJECTS ) << "Cmd register object " << command << std::endl;
     828             : 
     829           0 :     Object* object = reinterpret_cast< Object* >( command.get< void* >( ));
     830             : 
     831             :     const int32_t age = Global::getIAttribute(
     832           0 :                             Global::IATTR_NODE_SEND_QUEUE_AGE );
     833           0 :     SendQueueItem item;
     834           0 :     item.age = age ? age + _localNode->getTime64() :
     835           0 :                      std::numeric_limits< int64_t >::max();
     836           0 :     item.object = object;
     837           0 :     _sendQueue.push_back( item );
     838             : 
     839             :     const uint32_t size = Global::getIAttribute(
     840           0 :                              Global::IATTR_NODE_SEND_QUEUE_SIZE );
     841           0 :     while( _sendQueue.size() > size )
     842           0 :         _sendQueue.pop_front();
     843             : 
     844           0 :     return true;
     845             : }
     846             : 
     847          20 : bool ObjectStore::_cmdDeregister( ICommand& command )
     848             : {
     849          20 :     LB_TS_THREAD( _commandThread );
     850          20 :     LBLOG( LOG_OBJECTS ) << "Cmd deregister object " << command << std::endl;
     851             : 
     852          20 :     const uint32_t requestID = command.get< uint32_t >();
     853             : 
     854          20 :     const void* object = _localNode->getRequestData( requestID );
     855             : 
     856          20 :     for( SendQueueIter i = _sendQueue.begin(); i != _sendQueue.end(); ++i )
     857             :     {
     858           0 :         if( i->object == object )
     859             :         {
     860           0 :             _sendQueue.erase( i );
     861           0 :             break;
     862             :         }
     863             :     }
     864             : 
     865          20 :     _localNode->serveRequest( requestID );
     866          20 :     return true;
     867             : }
     868             : 
     869          41 : bool ObjectStore::_cmdMap( ICommand& cmd )
     870             : {
     871          41 :     LB_TS_THREAD( _commandThread );
     872             : 
     873          41 :     MasterCMCommand command( cmd );
     874          41 :     const uint128_t& id = command.getObjectID();
     875             : 
     876          41 :     LBLOG( LOG_OBJECTS ) << "Cmd map object " << command << " id " << id << "."
     877           0 :                          << command.getInstanceID() << " req "
     878          41 :                          << command.getRequestID() << std::endl;
     879             : 
     880          82 :     ObjectCMPtr masterCM;
     881             :     {
     882          41 :         lunchbox::ScopedFastRead mutex( _objects );
     883          41 :         ObjectsHash::const_iterator i = _objects->find( id );
     884          41 :         if( i != _objects->end( ))
     885             :         {
     886          41 :             const Objects& objects = i->second;
     887             : 
     888          41 :             for( ObjectsCIter j = objects.begin(); j != objects.end(); ++j )
     889             :             {
     890          41 :                 Object* object = *j;
     891          41 :                 if( object->isMaster( ))
     892             :                 {
     893          41 :                     masterCM = object->_getChangeManager();
     894          41 :                     break;
     895             :                 }
     896             :             }
     897          41 :         }
     898             :     }
     899             : 
     900          41 :     if( !masterCM || !masterCM->addSlave( command ))
     901             :     {
     902           0 :         LBWARN << "Can't find master object to map " << id << std::endl;
     903           0 :         NodePtr node = command.getNode();
     904             :         node->send( CMD_NODE_MAP_OBJECT_REPLY )
     905           0 :             << node->getNodeID() << id << command.getRequestedVersion()
     906           0 :             << command.getRequestID() << false << command.useCache() << false;
     907             :     }
     908             : 
     909          41 :     ++_counters[ LocalNode::COUNTER_MAP_OBJECT_REMOTE ];
     910          82 :     return true;
     911             : }
     912             : 
     913          41 : bool ObjectStore::_cmdMapSuccess( ICommand& command )
     914             : {
     915          41 :     LB_TS_THREAD( _receiverThread );
     916             : 
     917          41 :     const uint128_t& nodeID = command.get< uint128_t >();
     918          41 :     const uint128_t& objectID = command.get< uint128_t >();
     919          41 :     const uint32_t requestID = command.get< uint32_t >();
     920          41 :     const uint32_t instanceID = command.get< uint32_t >();
     921          41 :     const Object::ChangeType changeType = command.get< Object::ChangeType >();
     922          41 :     const uint32_t masterInstanceID = command.get< uint32_t >();
     923             : 
     924             :     // Map success commands are potentially multicasted (see above)
     925             :     // verify that we are the intended receiver
     926          41 :     if( nodeID != _localNode->getNodeID( ))
     927           0 :         return true;
     928             : 
     929          41 :     LBLOG( LOG_OBJECTS ) << "Cmd map object success " << command
     930           0 :                          << " id " << objectID << "." << instanceID
     931          41 :                          << " req " << requestID << std::endl;
     932             : 
     933             :     // set up change manager and attach object to dispatch table
     934             :     Object* object = static_cast< Object* >(
     935          41 :         _localNode->getRequestData( requestID ));
     936          41 :     LBASSERT( object );
     937          41 :     LBASSERT( !object->isMaster( ));
     938             : 
     939             :     object->setupChangeManager( Object::ChangeType( changeType ), false,
     940          41 :                                 _localNode, masterInstanceID );
     941          41 :     _attach( object, objectID, instanceID );
     942          41 :     return true;
     943             : }
     944             : 
     945          41 : bool ObjectStore::_cmdMapReply( ICommand& command )
     946             : {
     947          41 :     LB_TS_THREAD( _receiverThread );
     948             : 
     949             :     // Map reply commands are potentially multicasted (see above)
     950             :     // verify that we are the intended receiver
     951          41 :     if( command.get< uint128_t >() != _localNode->getNodeID( ))
     952           0 :         return true;
     953             : 
     954          41 :     const uint128_t& objectID = command.get< uint128_t >();
     955          41 :     const uint128_t& version = command.get< uint128_t >();
     956          41 :     const uint32_t requestID = command.get< uint32_t >();
     957          41 :     const bool result = command.get< bool >();
     958          41 :     const bool releaseCache = command.get< bool >();
     959          41 :     const bool useCache = command.get< bool >();
     960             : 
     961          41 :     LBLOG( LOG_OBJECTS ) << "Cmd map object reply " << command << " id "
     962          41 :                          << objectID << " req " << requestID << std::endl;
     963             : 
     964          41 :     LBASSERT( _localNode->getRequestData( requestID ));
     965             : 
     966          41 :     if( result )
     967             :     {
     968             :         Object* object = static_cast<Object*>(
     969          41 :             _localNode->getRequestData( requestID ));
     970          41 :         LBASSERT( object );
     971          41 :         LBASSERT( !object->isMaster( ));
     972             : 
     973          41 :         object->setMasterNode( command.getNode( ));
     974             : 
     975          41 :         if( useCache )
     976             :         {
     977           0 :             LBASSERT( releaseCache );
     978           0 :             LBASSERT( _instanceCache );
     979             : 
     980           0 :             const uint128_t& id = objectID;
     981           0 :             const InstanceCache::Data& cached = (*_instanceCache)[ id ];
     982           0 :             LBASSERT( cached != InstanceCache::Data::NONE );
     983           0 :             LBASSERT( !cached.versions.empty( ));
     984             : 
     985           0 :             object->addInstanceDatas( cached.versions, version );
     986           0 :             LBCHECK( _instanceCache->release( id, 2 ));
     987             :         }
     988          41 :         else if( releaseCache )
     989             :         {
     990           0 :             LBCHECK( _instanceCache->release( objectID, 1 ));
     991             :         }
     992             :     }
     993             :     else
     994             :     {
     995           0 :         if( releaseCache )
     996           0 :             _instanceCache->release( objectID, 1 );
     997             : 
     998           0 :         LBWARN << "Could not map object " << objectID << std::endl;
     999             :     }
    1000             : 
    1001          41 :     _localNode->serveRequest( requestID, version );
    1002          41 :     return true;
    1003             : }
    1004             : 
    1005          32 : bool ObjectStore::_cmdUnsubscribe( ICommand& command )
    1006             : {
    1007          32 :     LB_TS_THREAD( _commandThread );
    1008          32 :     LBLOG( LOG_OBJECTS ) << "Cmd unsubscribe object  " << command << std::endl;
    1009             : 
    1010          32 :     const uint128_t& id = command.get< uint128_t >();
    1011          32 :     const uint32_t requestID = command.get< uint32_t >();
    1012          32 :     const uint32_t masterInstanceID = command.get< uint32_t >();
    1013          32 :     const uint32_t slaveInstanceID = command.get< uint32_t >();
    1014             : 
    1015          32 :     NodePtr node = command.getNode();
    1016             : 
    1017             :     {
    1018          32 :         lunchbox::ScopedFastWrite mutex( _objects );
    1019          32 :         ObjectsHash::const_iterator i = _objects->find( id );
    1020          32 :         if( i != _objects->end( ))
    1021             :         {
    1022          32 :             const Objects& objects = i->second;
    1023          32 :             for( ObjectsCIter j = objects.begin(); j != objects.end(); ++j )
    1024             :             {
    1025          32 :                 Object* object = *j;
    1026          64 :                 if( object->isMaster() &&
    1027          32 :                     object->getInstanceID() == masterInstanceID )
    1028             :                 {
    1029          32 :                     object->removeSlave( node, slaveInstanceID );
    1030          32 :                     break;
    1031             :                 }
    1032             :             }
    1033          32 :         }
    1034             :     }
    1035             : 
    1036          32 :     node->send( CMD_NODE_DETACH_OBJECT ) << id << slaveInstanceID << requestID;
    1037          32 :     return true;
    1038             : }
    1039             : 
    1040           2 : bool ObjectStore::_cmdUnmap( ICommand& command )
    1041             : {
    1042           2 :     LB_TS_THREAD( _receiverThread );
    1043           2 :     LBLOG( LOG_OBJECTS ) << "Cmd unmap object " << command << std::endl;
    1044             : 
    1045           2 :     const uint128_t& objectID = command.get< uint128_t >();
    1046             : 
    1047           2 :     if( _instanceCache )
    1048           2 :         _instanceCache->erase( objectID );
    1049             : 
    1050           2 :     ObjectsHash::iterator i = _objects->find( objectID );
    1051           2 :     if( i == _objects->end( )) // nothing to do
    1052           0 :         return true;
    1053             : 
    1054           2 :     const Objects objects = i->second;
    1055             :     {
    1056           2 :         lunchbox::ScopedFastWrite mutex( _objects );
    1057           2 :         _objects->erase( i );
    1058             :     }
    1059             : 
    1060           4 :     for( Objects::const_iterator j = objects.begin(); j != objects.end(); ++j )
    1061             :     {
    1062           2 :         Object* object = *j;
    1063           2 :         object->detach();
    1064             :     }
    1065             : 
    1066           2 :     return true;
    1067             : }
    1068             : 
    1069           4 : bool ObjectStore::_cmdSync( ICommand& cmd )
    1070             : {
    1071           4 :     LB_TS_THREAD( _commandThread );
    1072           4 :     MasterCMCommand command( cmd );
    1073           4 :     const uint128_t& id = command.getObjectID();
    1074           4 :     LBINFO << command.getNode() << std::endl;
    1075             : 
    1076           4 :     LBLOG( LOG_OBJECTS ) << "Cmd sync object id " << id << "."
    1077           0 :                          << command.getInstanceID() << " req "
    1078           4 :                          << command.getRequestID() << std::endl;
    1079             : 
    1080           4 :     const uint32_t cacheInstanceID = command.getMasterInstanceID();
    1081           8 :     ObjectCMPtr cm;
    1082             :     {
    1083           4 :         lunchbox::ScopedFastRead mutex( _objects );
    1084           4 :         ObjectsHash::const_iterator i = _objects->find( id );
    1085           4 :         if( i != _objects->end( ))
    1086             :         {
    1087           4 :             const Objects& objects = i->second;
    1088             : 
    1089           8 :             for( ObjectsCIter j = objects.begin(); j != objects.end(); ++j )
    1090             :             {
    1091           4 :                 Object* object = *j;
    1092           4 :                 if( command.getInstanceID() == object->getInstanceID( ))
    1093             :                 {
    1094           0 :                     cm = object->_getChangeManager();
    1095           0 :                     LBASSERT( cm );
    1096           0 :                     break;
    1097             :                 }
    1098             : 
    1099           4 :                 if( command.getInstanceID() != CO_INSTANCE_ALL )
    1100           0 :                     continue;
    1101             : 
    1102           4 :                 cm = object->_getChangeManager();
    1103           4 :                 LBASSERT( cm );
    1104           4 :                 if( cacheInstanceID == object->getInstanceID( ))
    1105           0 :                     break;
    1106             :             }
    1107           4 :             if( !cm )
    1108           0 :                 LBWARN << "Can't find object to sync " << id << "."
    1109           0 :                        << command.getInstanceID() << " in " << objects.size()
    1110           0 :                        << " instances" << std::endl;
    1111             :         }
    1112           4 :         if( !cm )
    1113           0 :             LBWARN << "Can't find object to sync " << id
    1114           0 :                    << ", no object with identifier" << std::endl;
    1115             :     }
    1116           4 :     if( !cm || !cm->sendSync( command ))
    1117             :     {
    1118           0 :         NodePtr node = command.getNode();
    1119             :         node->send( CMD_NODE_SYNC_OBJECT_REPLY )
    1120           0 :             << node->getNodeID() << id << command.getRequestID()
    1121           0 :             << false << command.useCache() << false;
    1122             :     }
    1123           8 :     return true;
    1124             : }
    1125             : 
    1126           4 : bool ObjectStore::_cmdSyncReply( ICommand& command )
    1127             : {
    1128           4 :     LB_TS_THREAD( _receiverThread );
    1129             : 
    1130             :     // Sync reply commands are potentially multicasted (see above)
    1131             :     // verify that we are the intended receiver
    1132           4 :     if( command.get< uint128_t >() != _localNode->getNodeID( ))
    1133           0 :         return true;
    1134             : 
    1135           4 :     const NodeID& id = command.get< NodeID >();
    1136           4 :     const uint32_t requestID = command.get< uint32_t >();
    1137           4 :     const bool result = command.get< bool >();
    1138           4 :     const bool releaseCache = command.get< bool >();
    1139           4 :     const bool useCache = command.get< bool >();
    1140           4 :     void* const data = _localNode->getRequestData( requestID );
    1141           4 :     ObjectDataIStream* const is = LBSAFECAST( ObjectDataIStream*, data );
    1142             : 
    1143           4 :     LBLOG( LOG_OBJECTS ) << "Cmd sync object reply " << command << " req "
    1144           4 :                          << requestID << std::endl;
    1145           4 :     if( result )
    1146             :     {
    1147           4 :         if( useCache )
    1148             :         {
    1149           0 :             LBASSERT( releaseCache );
    1150           0 :             LBASSERT( _instanceCache );
    1151             : 
    1152           0 :             const InstanceCache::Data& cached = (*_instanceCache)[ id ];
    1153           0 :             LBASSERT( cached != InstanceCache::Data::NONE );
    1154           0 :             LBASSERT( !cached.versions.empty( ));
    1155             : 
    1156           0 :             *is = *cached.versions.back();
    1157           0 :             LBCHECK( _instanceCache->release( id, 2 ));
    1158             :         }
    1159           4 :         else if( releaseCache )
    1160             :         {
    1161           0 :             LBCHECK( _instanceCache->release( id, 1 ));
    1162             :         }
    1163             :     }
    1164             :     else
    1165             :     {
    1166           0 :         if( releaseCache )
    1167           0 :             _instanceCache->release( id, 1 );
    1168             : 
    1169           0 :         LBWARN << "Could not sync object " << id << " request " << requestID
    1170           0 :                << std::endl;
    1171             :     }
    1172             : 
    1173           4 :     _localNode->serveRequest( requestID, result );
    1174           4 :     return true;
    1175             : }
    1176             : 
    1177          54 : bool ObjectStore::_cmdInstance( ICommand& inCommand )
    1178             : {
    1179          54 :     LB_TS_THREAD( _receiverThread );
    1180          54 :     LBASSERT( _localNode );
    1181             : 
    1182          54 :     ObjectDataICommand command( inCommand );
    1183          54 :     const NodeID& nodeID = command.get< NodeID >();
    1184          54 :     const uint32_t masterInstanceID = command.get< uint32_t >();
    1185          54 :     const uint32_t cmd = command.getCommand();
    1186             : 
    1187          54 :     LBLOG( LOG_OBJECTS ) << "Cmd instance " << command << " master "
    1188          54 :                          << masterInstanceID << " node " << nodeID << std::endl;
    1189             : 
    1190          54 :     command.setType( COMMANDTYPE_OBJECT );
    1191          54 :     command.setCommand( CMD_OBJECT_INSTANCE );
    1192             : 
    1193          54 :     const uint128_t& version = command.getVersion();
    1194          54 :     if( _instanceCache && version.high() == 0 )
    1195             :     {
    1196          54 :         const ObjectVersion rev( command.getObjectID(), version );
    1197             : #ifndef CO_AGGRESSIVE_CACHING // Issue Equalizer#82:
    1198          54 :         if( cmd != CMD_NODE_OBJECT_INSTANCE_PUSH )
    1199             : #endif
    1200          48 :             _instanceCache->add( rev, masterInstanceID, command, 0 );
    1201             :     }
    1202             : 
    1203          54 :     switch( cmd )
    1204             :     {
    1205             :       case CMD_NODE_OBJECT_INSTANCE:
    1206           0 :         LBASSERT( nodeID == 0 );
    1207           0 :         LBASSERT( command.getInstanceID() == CO_INSTANCE_NONE );
    1208           0 :         return true;
    1209             : 
    1210             :       case CMD_NODE_OBJECT_INSTANCE_MAP:
    1211          37 :         if( nodeID != _localNode->getNodeID( )) // not for me
    1212           0 :             return true;
    1213             : 
    1214          37 :         LBASSERT( command.getInstanceID() <= CO_INSTANCE_MAX );
    1215          37 :         return dispatchObjectCommand( command );
    1216             : 
    1217             :       case CMD_NODE_OBJECT_INSTANCE_COMMIT:
    1218           3 :         LBASSERT( nodeID == 0 );
    1219           3 :         LBASSERT( command.getInstanceID() == CO_INSTANCE_NONE );
    1220           3 :         return dispatchObjectCommand( command );
    1221             : 
    1222             :       case CMD_NODE_OBJECT_INSTANCE_PUSH:
    1223           6 :         LBASSERT( nodeID == 0 );
    1224           6 :         LBASSERT( command.getInstanceID() == CO_INSTANCE_NONE );
    1225           6 :         _pushData.addDataCommand( command.getObjectID(), command );
    1226           6 :         return true;
    1227             : 
    1228             :       case CMD_NODE_OBJECT_INSTANCE_SYNC:
    1229             :       {
    1230           8 :         if( nodeID != _localNode->getNodeID( )) // not for me
    1231           0 :             return true;
    1232             : 
    1233           8 :         void* data = _localNode->getRequestData( command.getInstanceID( ));
    1234           8 :         LBASSERT( command.getInstanceID() != CO_INSTANCE_NONE );
    1235           8 :         LBASSERTINFO( data, this );
    1236             : 
    1237           8 :         ObjectDataIStream* is = LBSAFECAST( ObjectDataIStream*, data );
    1238           8 :         is->addDataCommand( command );
    1239           8 :         return true;
    1240             :       }
    1241             : 
    1242             :       default:
    1243           0 :         LBUNREACHABLE;
    1244           0 :         return false;
    1245          54 :     }
    1246             : }
    1247             : 
    1248           0 : bool ObjectStore::_cmdDisableSendOnRegister( ICommand& command )
    1249             : {
    1250           0 :     LB_TS_THREAD( _commandThread );
    1251           0 :     LBASSERTINFO( _sendOnRegister > 0, _sendOnRegister );
    1252             : 
    1253           0 :     if( --_sendOnRegister == 0 )
    1254             :     {
    1255           0 :         _sendQueue.clear();
    1256             : 
    1257           0 :         Nodes nodes;
    1258           0 :         _localNode->getNodes( nodes, false );
    1259           0 :         for( NodesCIter i = nodes.begin(); i != nodes.end(); ++i )
    1260             :         {
    1261           0 :             NodePtr node = *i;
    1262           0 :             ConnectionPtr multicast = node->getConnection( true );
    1263           0 :             ConnectionPtr connection = node->getConnection( false );
    1264           0 :             if( multicast )
    1265           0 :                 multicast->finish();
    1266           0 :             if( connection && connection != multicast )
    1267           0 :                 connection->finish();
    1268           0 :         }
    1269             :     }
    1270             : 
    1271           0 :     const uint32_t requestID = command.get< uint32_t >();
    1272           0 :     _localNode->serveRequest( requestID );
    1273           0 :     return true;
    1274             : }
    1275             : 
    1276          40 : bool ObjectStore::_cmdRemoveNode( ICommand& command )
    1277             : {
    1278          40 :     LB_TS_THREAD( _commandThread );
    1279          40 :     LBLOG( LOG_OBJECTS ) << "Cmd object  " << command << std::endl;
    1280             : 
    1281          40 :     Node* node = command.get< Node* >();
    1282          40 :     const uint32_t requestID = command.get< uint32_t >();
    1283             : 
    1284          40 :     lunchbox::ScopedFastWrite mutex( _objects );
    1285          65 :     for( ObjectsHashCIter i = _objects->begin(); i != _objects->end(); ++i )
    1286             :     {
    1287          25 :         const Objects& objects = i->second;
    1288          50 :         for( ObjectsCIter j = objects.begin(); j != objects.end(); ++j )
    1289          25 :             (*j)->removeSlaves( node );
    1290             :     }
    1291             : 
    1292          40 :     if( requestID != LB_UNDEFINED_UINT32 )
    1293           7 :         _localNode->serveRequest( requestID );
    1294             :     else
    1295          33 :         node->unref(); // node was ref'd before LocalNode::_handleDisconnect()
    1296             : 
    1297          40 :     return true;
    1298             : }
    1299             : 
    1300           4 : bool ObjectStore::_cmdPush( ICommand& command )
    1301             : {
    1302           4 :     LB_TS_THREAD( _commandThread );
    1303             : 
    1304           4 :     const uint128_t& objectID = command.get< uint128_t >();
    1305           4 :     const uint128_t& groupID = command.get< uint128_t >();
    1306           4 :     const uint128_t& typeID = command.get< uint128_t >();
    1307             : 
    1308           4 :     ObjectDataIStream* is = _pushData.pull( objectID );
    1309             : 
    1310           4 :     _localNode->objectPush( groupID, typeID, objectID, *is );
    1311           4 :     _pushData.recycle( is );
    1312           4 :     return true;
    1313             : }
    1314             : 
    1315           0 : std::ostream& operator << ( std::ostream& os, ObjectStore* objectStore )
    1316             : {
    1317           0 :     if( !objectStore )
    1318             :     {
    1319           0 :         os << "NULL objectStore";
    1320           0 :         return os;
    1321             :     }
    1322             : 
    1323           0 :     os << "objectStore (" << (void*)objectStore << ")";
    1324             : 
    1325           0 :     return os;
    1326             : }
    1327          63 : }

Generated by: LCOV version 1.11