LCOV - code coverage report
Current view: top level - co - objectStore.cpp (source / functions) Hit Total Coverage
Test: Collage Lines: 540 748 72.2 %
Date: 2016-12-14 01:26:48 Functions: 42 48 87.5 %

          Line data    Source code
       1             : 
       2             : /* Copyright (c) 2005-2016, Stefan Eilemann <eile@equalizergraphics.com>
       3             :  *                          Cedric Stalder <cedric.stalder@gmail.com>
       4             :  *                          Daniel Nachbaur <danielnachbaur@gmail.com>
       5             :  *
       6             :  * This file is part of Collage <https://github.com/Eyescale/Collage>
       7             :  *
       8             :  * This library is free software; you can redistribute it and/or modify it under
       9             :  * the terms of the GNU Lesser General Public License version 2.1 as published
      10             :  * by the Free Software Foundation.
      11             :  *
      12             :  * This library is distributed in the hope that it will be useful, but WITHOUT
      13             :  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
      14             :  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
      15             :  * details.
      16             :  *
      17             :  * You should have received a copy of the GNU Lesser General Public License
      18             :  * along with this library; if not, write to the Free Software Foundation, Inc.,
      19             :  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
      20             :  */
      21             : 
      22             : #include "objectStore.h"
      23             : 
      24             : #include "connection.h"
      25             : #include "connectionDescription.h"
      26             : #include "global.h"
      27             : #include "instanceCache.h"
      28             : #include "log.h"
      29             : #include "masterCMCommand.h"
      30             : #include "nodeCommand.h"
      31             : #include "oCommand.h"
      32             : #include "objectCM.h"
      33             : #include "objectCommand.h"
      34             : #include "objectDataICommand.h"
      35             : #include "objectDataIStream.h"
      36             : 
      37             : #include <lunchbox/futureFunction.h>
      38             : #include <lunchbox/scopedMutex.h>
      39             : 
      40             : #include <boost/bind.hpp>
      41             : 
      42             : #include <limits>
      43             : 
      44             : //#define DEBUG_DISPATCH
      45             : #ifdef DEBUG_DISPATCH
      46             : #  include <set>
      47             : #endif
      48             : 
      49             : namespace co
      50             : {
      51             : typedef CommandFunc< ObjectStore > CmdFunc;
      52             : typedef lunchbox::FutureFunction< bool > FuturebImpl;
      53             : 
      54          55 : ObjectStore::ObjectStore( LocalNode* localNode, a_ssize_t* counters )
      55             :         : _localNode( localNode )
      56             :         , _instanceIDs( -0x7FFFFFFF )
      57          55 :         , _instanceCache( new InstanceCache( Global::getIAttribute(
      58          55 :                               Global::IATTR_INSTANCE_CACHE_SIZE ) * LB_1MB ) )
      59         110 :         , _counters( counters )
      60             : {
      61          55 :     LBASSERT( localNode );
      62          55 :     CommandQueue* queue = localNode->getCommandThreadQueue();
      63             : 
      64             :     localNode->_registerCommand( CMD_NODE_FIND_MASTER_NODE_ID,
      65          55 :         CmdFunc( this, &ObjectStore::_cmdFindMasterNodeID ), queue );
      66             :     localNode->_registerCommand( CMD_NODE_FIND_MASTER_NODE_ID_REPLY,
      67          55 :         CmdFunc( this, &ObjectStore::_cmdFindMasterNodeIDReply ), 0 );
      68             :     localNode->_registerCommand( CMD_NODE_ATTACH_OBJECT,
      69          55 :         CmdFunc( this, &ObjectStore::_cmdAttach ), 0 );
      70             :     localNode->_registerCommand( CMD_NODE_DETACH_OBJECT,
      71          55 :         CmdFunc( this, &ObjectStore::_cmdDetach ), 0 );
      72             :     localNode->_registerCommand( CMD_NODE_REGISTER_OBJECT,
      73          55 :         CmdFunc( this, &ObjectStore::_cmdRegister ), queue );
      74             :     localNode->_registerCommand( CMD_NODE_DEREGISTER_OBJECT,
      75          55 :         CmdFunc( this, &ObjectStore::_cmdDeregister ), queue );
      76             :     localNode->_registerCommand( CMD_NODE_MAP_OBJECT,
      77          55 :         CmdFunc( this, &ObjectStore::_cmdMap ), queue );
      78             :     localNode->_registerCommand( CMD_NODE_MAP_OBJECT_SUCCESS,
      79          55 :         CmdFunc( this, &ObjectStore::_cmdMapSuccess ), 0 );
      80             :     localNode->_registerCommand( CMD_NODE_MAP_OBJECT_REPLY,
      81          55 :         CmdFunc( this, &ObjectStore::_cmdMapReply ), 0 );
      82             :     localNode->_registerCommand( CMD_NODE_UNMAP_OBJECT,
      83          55 :         CmdFunc( this, &ObjectStore::_cmdUnmap ), 0 );
      84             :     localNode->_registerCommand( CMD_NODE_UNSUBSCRIBE_OBJECT,
      85          55 :         CmdFunc( this, &ObjectStore::_cmdUnsubscribe ), queue );
      86             :     localNode->_registerCommand( CMD_NODE_OBJECT_INSTANCE,
      87          55 :         CmdFunc( this, &ObjectStore::_cmdInstance ), 0 );
      88             :     localNode->_registerCommand( CMD_NODE_OBJECT_INSTANCE_MAP,
      89          55 :         CmdFunc( this, &ObjectStore::_cmdInstance ), 0 );
      90             :     localNode->_registerCommand( CMD_NODE_OBJECT_INSTANCE_COMMIT,
      91          55 :         CmdFunc( this, &ObjectStore::_cmdInstance ), 0 );
      92             :     localNode->_registerCommand( CMD_NODE_OBJECT_INSTANCE_PUSH,
      93          55 :         CmdFunc( this, &ObjectStore::_cmdInstance ), 0 );
      94             :     localNode->_registerCommand( CMD_NODE_OBJECT_INSTANCE_SYNC,
      95          55 :         CmdFunc( this, &ObjectStore::_cmdInstance ), 0 );
      96             :     localNode->_registerCommand( CMD_NODE_DISABLE_SEND_ON_REGISTER,
      97          55 :         CmdFunc( this, &ObjectStore::_cmdDisableSendOnRegister ), queue );
      98             :     localNode->_registerCommand( CMD_NODE_REMOVE_NODE,
      99          55 :         CmdFunc( this, &ObjectStore::_cmdRemoveNode ), queue );
     100             :     localNode->_registerCommand( CMD_NODE_OBJECT_PUSH,
     101          55 :         CmdFunc( this, &ObjectStore::_cmdPush ), queue );
     102             :     localNode->_registerCommand( CMD_NODE_SYNC_OBJECT,
     103          55 :         CmdFunc( this, &ObjectStore::_cmdSync ), queue );
     104             :     localNode->_registerCommand( CMD_NODE_SYNC_OBJECT_REPLY,
     105          55 :         CmdFunc( this, &ObjectStore::_cmdSyncReply ), 0 );
     106          55 : }
     107             : 
     108         159 : ObjectStore::~ObjectStore()
     109             : {
     110          53 :     LBVERB << "Delete ObjectStore @" << (void*)this << std::endl;
     111             : 
     112             : #ifndef NDEBUG
     113          53 :     if( !_objects->empty( ))
     114             :     {
     115           0 :         LBWARN << _objects->size() << " attached objects in destructor"
     116           0 :                << std::endl;
     117             : 
     118           0 :         for( ObjectsHash::const_iterator i = _objects->begin();
     119           0 :              i != _objects->end(); ++i )
     120             :         {
     121           0 :             const Objects& objects = i->second;
     122           0 :             LBWARN << "  " << objects.size() << " objects with id "
     123           0 :                    << i->first << std::endl;
     124             : 
     125           0 :             for( Objects::const_iterator j = objects.begin();
     126           0 :                  j != objects.end(); ++j )
     127             :             {
     128           0 :                 const Object* object = *j;
     129           0 :                 LBINFO << "    object type " << lunchbox::className( object )
     130           0 :                        << std::endl;
     131             :             }
     132             :         }
     133             :     }
     134             :     //LBASSERT( _objects->empty( ))
     135             : #endif
     136          53 :    clear();
     137          53 :    delete _instanceCache;
     138          53 :    _instanceCache = 0;
     139         106 : }
     140             : 
     141         102 : void ObjectStore::clear( )
     142             : {
     143         102 :     LBASSERT( _objects->empty( ));
     144         102 :     expireInstanceData( 0 );
     145         102 :     LBASSERT( !_instanceCache || _instanceCache->isEmpty( ));
     146             : 
     147         102 :     _objects->clear();
     148         102 :     _sendQueue.clear();
     149         102 : }
     150             : 
     151           0 : void ObjectStore::disableInstanceCache()
     152             : {
     153           0 :     LBASSERT( _localNode->isClosed( ));
     154           0 :     delete _instanceCache;
     155           0 :     _instanceCache = 0;
     156           0 : }
     157             : 
     158         102 : void ObjectStore::expireInstanceData( const int64_t age )
     159             : {
     160         102 :     if( _instanceCache )
     161         102 :         _instanceCache->expire( age );
     162         102 : }
     163             : 
     164         115 : void ObjectStore::removeInstanceData( const NodeID& nodeID )
     165             : {
     166         115 :     if( _instanceCache )
     167         115 :         _instanceCache->remove( nodeID );
     168         115 : }
     169             : 
     170           0 : void ObjectStore::enableSendOnRegister()
     171             : {
     172           0 :     ++_sendOnRegister;
     173           0 : }
     174             : 
     175           0 : void ObjectStore::disableSendOnRegister()
     176             : {
     177           0 :     if( Global::getIAttribute( Global::IATTR_NODE_SEND_QUEUE_SIZE ) > 0 )
     178             :     {
     179             :         lunchbox::Request< void > request =
     180           0 :             _localNode->registerRequest< void >();
     181           0 :         _localNode->send( CMD_NODE_DISABLE_SEND_ON_REGISTER ) << request;
     182             :     }
     183             :     else // OPT
     184           0 :         --_sendOnRegister;
     185           0 : }
     186             : 
     187             : //---------------------------------------------------------------------------
     188             : // identifier master node mapping
     189             : //---------------------------------------------------------------------------
     190          40 : NodeID ObjectStore::findMasterNodeID( const uint128_t& identifier )
     191             : {
     192          40 :     LB_TS_NOT_THREAD( _commandThread );
     193             : 
     194             :     // OPT: look up locally first?
     195          80 :     const Nodes& nodes = _localNode->getNodes();
     196             : 
     197             :     // OPT: send to multiple nodes at once?
     198          40 :     for( NodePtr node : nodes )
     199             :     {
     200             :         lunchbox::Request< NodeID > request =
     201          40 :             _localNode->registerRequest< NodeID >();
     202             : 
     203          80 :         LBLOG( LOG_OBJECTS ) << "Finding " << identifier << " on " << node
     204         120 :                              << " req " << request.getID() << std::endl;
     205          40 :         node->send( CMD_NODE_FIND_MASTER_NODE_ID ) << identifier << request;
     206             : 
     207             :         try
     208             :         {
     209          40 :             const NodeID& masterNodeID = request.wait( Global::getTimeout( ));
     210             : 
     211          40 :             if( masterNodeID != 0 )
     212             :             {
     213          40 :                 LBLOG( LOG_OBJECTS ) << "Found " << identifier << " on "
     214          40 :                                      << masterNodeID << std::endl;
     215          40 :                 return masterNodeID;
     216             :             }
     217             :         }
     218           0 :         catch( const lunchbox::FutureTimeout& )
     219             :         {
     220           0 :             _localNode->unregisterRequest( request.getID( ));
     221           0 :             throw;
     222             :         }
     223             :     }
     224             : 
     225           0 :     return NodeID();
     226             : }
     227             : 
     228             : //---------------------------------------------------------------------------
     229             : // object mapping
     230             : //---------------------------------------------------------------------------
     231          21 : void ObjectStore::attach( Object* object, const uint128_t& id,
     232             :                           const uint32_t instanceID )
     233             : {
     234          21 :     LBASSERT( object );
     235          21 :     LB_TS_NOT_THREAD( _receiverThread );
     236             : 
     237             :     lunchbox::Request< void > request =
     238          42 :         _localNode->registerRequest< void >( object );
     239          21 :     _localNode->send( CMD_NODE_ATTACH_OBJECT ) << id << instanceID << request;
     240          21 : }
     241             : 
     242             : namespace
     243             : {
     244          63 : uint32_t _genNextID( lunchbox::a_int32_t& val )
     245             : {
     246             :     uint32_t result;
     247           0 :     do
     248             :     {
     249          63 :         const long id = ++val;
     250          63 :         result = static_cast< uint32_t >(
     251          63 :             static_cast< int64_t >( id ) + 0x7FFFFFFFu );
     252             :     }
     253          63 :     while( result > CO_INSTANCE_MAX );
     254             : 
     255          63 :     return result;
     256             : }
     257             : }
     258             : 
     259          63 : void ObjectStore::_attach( Object* object, const uint128_t& id,
     260             :                            const uint32_t inInstanceID )
     261             : {
     262          63 :     LBASSERT( object );
     263          63 :     LB_TS_THREAD( _receiverThread );
     264             : 
     265          63 :     uint32_t instanceID = inInstanceID;
     266          63 :     if( inInstanceID == CO_INSTANCE_INVALID )
     267          21 :         instanceID = _genNextID( _instanceIDs );
     268             : 
     269          63 :     object->attach( id, instanceID );
     270             : 
     271             :     {
     272         126 :         lunchbox::ScopedFastWrite mutex( _objects );
     273          63 :         Objects& objects = _objects.data[ id ];
     274          63 :         LBASSERTINFO( !object->isMaster() || objects.empty(),
     275             :                       "Attaching master " << *object << ", " <<
     276             :                       objects.size() << " attached objects with same ID, " <<
     277             :                       "first is " << ( objects[0]->isMaster() ? "master " :
     278             :                                        "slave " ) << *objects[0] );
     279          63 :         objects.push_back( object );
     280             :     }
     281             : 
     282          63 :     _localNode->flushCommands(); // redispatch pending commands
     283             : 
     284          63 :     LBLOG( LOG_OBJECTS ) << "attached " << *object << " @"
     285          63 :                          << static_cast< void* >( object ) << std::endl;
     286          63 : }
     287             : 
     288          29 : void ObjectStore::detach( Object* object )
     289             : {
     290          29 :     LBASSERT( object );
     291          29 :     LB_TS_NOT_THREAD( _receiverThread );
     292             : 
     293          58 :     lunchbox::Request< void > request = _localNode->registerRequest< void >();
     294          58 :     _localNode->send( CMD_NODE_DETACH_OBJECT )
     295          87 :         << object->getID() << object->getInstanceID() << request;
     296          29 : }
     297             : 
     298           0 : void ObjectStore::swap( Object* oldObject, Object* newObject )
     299             : {
     300           0 :     LBASSERT( newObject );
     301           0 :     LBASSERT( oldObject );
     302           0 :     LBASSERT( oldObject->isMaster() );
     303           0 :     LB_TS_THREAD( _receiverThread );
     304             : 
     305           0 :     if( !oldObject->isAttached() )
     306           0 :         return;
     307             : 
     308           0 :     LBLOG( LOG_OBJECTS ) << "Swap " << lunchbox::className( oldObject )
     309           0 :                          << std::endl;
     310           0 :     const uint128_t& id = oldObject->getID();
     311             : 
     312           0 :     lunchbox::ScopedFastWrite mutex( _objects );
     313           0 :     ObjectsHash::iterator i = _objects->find( id );
     314           0 :     LBASSERT( i != _objects->end( ));
     315           0 :     if( i == _objects->end( ))
     316           0 :         return;
     317             : 
     318           0 :     Objects& objects = i->second;
     319           0 :     Objects::iterator j = find( objects.begin(), objects.end(), oldObject );
     320           0 :     LBASSERT( j != objects.end( ));
     321           0 :     if( j == objects.end( ))
     322           0 :         return;
     323             : 
     324           0 :     newObject->transfer( oldObject );
     325           0 :     *j = newObject;
     326             : }
     327             : 
     328          62 : void ObjectStore::_detach( Object* object )
     329             : {
     330             :     // check also _cmdUnmapObject when modifying!
     331          62 :     LBASSERT( object );
     332          62 :     LB_TS_THREAD( _receiverThread );
     333             : 
     334          62 :     if( !object->isAttached() )
     335           0 :         return;
     336             : 
     337          62 :     const uint128_t& id = object->getID();
     338             : 
     339          62 :     LBASSERT( _objects->find( id ) != _objects->end( ));
     340          62 :     LBLOG( LOG_OBJECTS ) << "Detach " << *object << std::endl;
     341             : 
     342          62 :     Objects& objects = _objects.data[ id ];
     343          62 :     Objects::iterator i = find( objects.begin(),objects.end(), object );
     344          62 :     LBASSERT( i != objects.end( ));
     345             : 
     346             :     {
     347         124 :         lunchbox::ScopedFastWrite mutex( _objects );
     348          62 :         objects.erase( i );
     349          62 :         if( objects.empty( ))
     350          61 :             _objects->erase( id );
     351             :     }
     352             : 
     353          62 :     LBASSERT( object->getInstanceID() != CO_INSTANCE_INVALID );
     354          62 :     object->detach();
     355          62 :     return;
     356             : }
     357             : 
     358          42 : uint32_t ObjectStore::mapNB( Object* object, const uint128_t& id,
     359             :                              const uint128_t& version, NodePtr master )
     360             : {
     361          42 :     LB_TS_NOT_THREAD( _receiverThread );
     362          42 :     LBLOG( LOG_OBJECTS )
     363          42 :         << "Mapping " << lunchbox::className( object ) << " to id " << id
     364         126 :         << " version " << version << std::endl;
     365          42 :     LBASSERT( object );
     366          42 :     LBASSERTINFO( id.isUUID(), id );
     367             : 
     368          42 :     if( !master )
     369          40 :         master = _localNode->connectObjectMaster( id );
     370             : 
     371          42 :     if( !master || !master->isReachable( ))
     372             :     {
     373           0 :         LBWARN << "Mapping of object " << id << " failed, invalid master node"
     374           0 :                << std::endl;
     375           0 :         return LB_UNDEFINED_UINT32;
     376             :     }
     377             : 
     378          42 :     if( !object || !id.isUUID( ))
     379             :     {
     380           0 :         LBWARN << "Invalid object " << object << " or id " << id << std::endl;
     381           0 :         return LB_UNDEFINED_UINT32;
     382             :     }
     383             : 
     384          42 :     const bool isAttached = object->isAttached();
     385          42 :     const bool isMaster = object->isMaster();
     386          42 :     LBASSERTINFO( !isAttached, *object );
     387          42 :     LBASSERT( !isMaster ) ;
     388          42 :     if( isAttached || isMaster )
     389             :     {
     390           0 :         LBWARN << "Invalid object state: attached " << isAttached << " master "
     391           0 :                << isMaster << std::endl;
     392           0 :         return LB_UNDEFINED_UINT32;
     393             :     }
     394             : 
     395          42 :     const uint32_t request = _localNode->registerRequest( object );
     396          42 :     uint128_t minCachedVersion = VERSION_HEAD;
     397          42 :     uint128_t maxCachedVersion = VERSION_NONE;
     398          42 :     uint32_t masterInstanceID = 0;
     399          42 :     const bool useCache = _checkInstanceCache( id, minCachedVersion,
     400             :                                                maxCachedVersion,
     401          42 :                                                masterInstanceID );
     402          42 :     object->notifyAttach();
     403          84 :     master->send( CMD_NODE_MAP_OBJECT )
     404          42 :         << version << minCachedVersion << maxCachedVersion << id
     405         126 :         << object->getMaxVersions() << request << _genNextID( _instanceIDs )
     406          42 :         << masterInstanceID << useCache;
     407          42 :     return request;
     408             : }
     409             : 
     410          46 : bool ObjectStore::_checkInstanceCache( const uint128_t& id, uint128_t& from,
     411             :                                        uint128_t& to, uint32_t& instanceID )
     412             : {
     413          46 :     if( !_instanceCache )
     414           0 :         return false;
     415             : 
     416          46 :     const InstanceCache::Data& cached = (*_instanceCache)[ id ];
     417          46 :     if( cached == InstanceCache::Data::NONE )
     418          46 :         return false;
     419             : 
     420           0 :     const ObjectDataIStreamDeque& versions = cached.versions;
     421           0 :     LBASSERT( !cached.versions.empty( ));
     422           0 :     instanceID = cached.masterInstanceID;
     423           0 :     from = versions.front()->getVersion();
     424           0 :     to = versions.back()->getVersion();
     425           0 :     LBLOG( LOG_OBJECTS ) << "Object " << id << " have v" << from << ".." << to
     426           0 :                          << std::endl;
     427           0 :     return true;
     428             : }
     429             : 
     430          42 : bool ObjectStore::mapSync( const uint32_t requestID )
     431             : {
     432          42 :     if( requestID == LB_UNDEFINED_UINT32 )
     433           0 :         return false;
     434             : 
     435          42 :     void* data = _localNode->getRequestData( requestID );
     436          42 :     if( data == 0 )
     437           0 :         return false;
     438             : 
     439          42 :     Object* object = LBSAFECAST( Object*, data );
     440          42 :     uint128_t version = VERSION_NONE;
     441          42 :     _localNode->waitRequest( requestID, version );
     442             : 
     443          42 :     const bool mapped = object->isAttached();
     444          42 :     if( mapped )
     445          42 :         object->applyMapData( version ); // apply initial instance data
     446             : 
     447          42 :     object->notifyAttached();
     448          42 :     LBLOG( LOG_OBJECTS )
     449          84 :         << "Mapped " << lunchbox::className( object ) << std::endl;
     450          42 :     return mapped;
     451             : }
     452             : 
     453             : 
     454           4 : f_bool_t ObjectStore::sync( Object* object, const uint128_t& id, NodePtr master,
     455             :                             const uint32_t instanceID )
     456             : {
     457           4 :     const uint32_t request = _startSync( object, id, master, instanceID );
     458             :     const FuturebImpl::Func& func = boost::bind( &ObjectStore::_finishSync,
     459           8 :                                                  this, request, object );
     460           8 :     return f_bool_t( new FuturebImpl( func ));
     461             : }
     462             : 
     463           4 : uint32_t ObjectStore::_startSync( Object* object, const uint128_t& id,
     464             :                                   NodePtr master, const uint32_t instanceID )
     465             : {
     466           4 :     LB_TS_NOT_THREAD( _receiverThread );
     467           4 :     LBLOG( LOG_OBJECTS )
     468           4 :         << "Syncing " << lunchbox::className( object ) << " with id " << id
     469          12 :         << std::endl;
     470           4 :     LBASSERT( object );
     471           4 :     LBASSERTINFO( id.isUUID(), id );
     472             : 
     473           4 :     if( !object || !id.isUUID( ))
     474             :     {
     475           0 :         LBWARN << "Invalid object " << object << " or id " << id << std::endl;
     476           0 :         return LB_UNDEFINED_UINT32;
     477             :     }
     478             : 
     479           4 :     if( !master )
     480           0 :         master = _localNode->connectObjectMaster( id );
     481             : 
     482           4 :     if( !master || !master->isReachable( ))
     483             :     {
     484           0 :         LBWARN << "Mapping of object " << id << " failed, invalid master node"
     485           0 :                << std::endl;
     486           0 :         return LB_UNDEFINED_UINT32;
     487             :     }
     488             : 
     489             :     const uint32_t request =
     490           4 :         _localNode->registerRequest( new ObjectDataIStream );
     491           4 :     uint128_t minCachedVersion = VERSION_HEAD;
     492           4 :     uint128_t maxCachedVersion = VERSION_NONE;
     493           4 :     uint32_t cacheInstanceID = 0;
     494             : 
     495           4 :     bool useCache = _checkInstanceCache( id, minCachedVersion, maxCachedVersion,
     496           4 :                                          cacheInstanceID );
     497           4 :     if( useCache )
     498             :     {
     499           0 :         switch( instanceID )
     500             :         {
     501             :         case CO_INSTANCE_ALL:
     502           0 :             break;
     503             :         default:
     504           0 :             if( instanceID == cacheInstanceID )
     505           0 :                 break;
     506             : 
     507           0 :             useCache = false;
     508           0 :             LBCHECK( _instanceCache->release( id, 1 ));
     509           0 :             break;
     510             :         }
     511             :     }
     512             : 
     513             :     // Use stream expected by MasterCMCommand
     514           8 :     master->send( CMD_NODE_SYNC_OBJECT )
     515           4 :         << VERSION_NEWEST << minCachedVersion << maxCachedVersion << id
     516          12 :         << uint64_t(0) /* maxVersions */ << request << instanceID
     517           4 :         << cacheInstanceID << useCache;
     518           4 :     return request;
     519             : }
     520             : 
     521           4 : bool ObjectStore::_finishSync( const uint32_t requestID, Object* object )
     522             : {
     523           4 :     if( requestID == LB_UNDEFINED_UINT32 )
     524           0 :         return false;
     525             : 
     526           4 :     void* data = _localNode->getRequestData( requestID );
     527           4 :     if( data == 0 )
     528           0 :         return false;
     529             : 
     530           4 :     ObjectDataIStream* is = LBSAFECAST( ObjectDataIStream*, data );
     531             : 
     532           4 :     bool ok = false;
     533           4 :     _localNode->waitRequest( requestID, ok );
     534             : 
     535           4 :     if( !ok )
     536             :     {
     537           0 :         LBWARN << "Object synchronization failed" << std::endl;
     538           0 :         delete is;
     539           0 :         return false;
     540             :     }
     541             : 
     542           4 :     is->waitReady();
     543           4 :     object->applyInstanceData( *is );
     544           8 :     LBLOG( LOG_OBJECTS ) << "Synced " << lunchbox::className( object )
     545          12 :                          << std::endl;
     546           4 :     delete is;
     547           4 :     return true;
     548             : }
     549             : 
     550          42 : void ObjectStore::unmap( Object* object )
     551             : {
     552          42 :     LBASSERT( object );
     553          42 :     if( !object->isAttached( )) // not registered
     554          35 :         return;
     555             : 
     556          41 :     const uint128_t& id = object->getID();
     557             : 
     558          41 :     LBLOG( LOG_OBJECTS ) << "Unmap " << object << std::endl;
     559             : 
     560          41 :     object->notifyDetach();
     561             : 
     562             :     // send unsubscribe to master, master will send detach command.
     563          41 :     LBASSERT( !object->isMaster( ));
     564          41 :     LB_TS_NOT_THREAD( _commandThread );
     565             : 
     566          41 :     const uint32_t masterInstanceID = object->getMasterInstanceID();
     567          41 :     if( masterInstanceID != CO_INSTANCE_INVALID )
     568             :     {
     569          33 :         NodePtr master = object->getMasterNode();
     570          33 :         LBASSERT( master )
     571             : 
     572          33 :         if( master && master->isReachable( ))
     573             :         {
     574             :             lunchbox::Request< void > request =
     575          66 :                 _localNode->registerRequest< void >();
     576          66 :             master->send( CMD_NODE_UNSUBSCRIBE_OBJECT )
     577          99 :                 << id << request << masterInstanceID << object->getInstanceID();
     578          33 :             request.wait();
     579          33 :             object->notifyDetached();
     580          33 :             return;
     581             :         }
     582           0 :         LBERROR << "Master node for object id " << id << " not connected"
     583           0 :                 << std::endl;
     584             :     }
     585             : 
     586             :     // no unsubscribe sent: Detach directly
     587           8 :     detach( object );
     588           8 :     object->setupChangeManager( Object::NONE, false, 0, CO_INSTANCE_INVALID );
     589           8 :     object->notifyDetached();
     590             : }
     591             : 
     592          21 : bool ObjectStore::register_( Object* object )
     593             : {
     594          21 :     LBASSERT( object );
     595          21 :     LBASSERT( !object->isAttached( ));
     596             : 
     597          21 :     const uint128_t& id = object->getID( );
     598          21 :     LBASSERTINFO( id.isUUID(), id );
     599             : 
     600          21 :     object->notifyAttach();
     601          42 :     object->setupChangeManager( object->getChangeType(), true, _localNode,
     602          21 :                                 CO_INSTANCE_INVALID );
     603          21 :     attach( object, id, CO_INSTANCE_INVALID );
     604             : 
     605          21 :     if( Global::getIAttribute( Global::IATTR_NODE_SEND_QUEUE_SIZE ) > 0 )
     606          21 :         _localNode->send( CMD_NODE_REGISTER_OBJECT ) << object;
     607             : 
     608          21 :     object->notifyAttached();
     609             : 
     610          21 :     LBLOG( LOG_OBJECTS ) << "Registered " << object << std::endl;
     611          21 :     return true;
     612             : }
     613             : 
     614          21 : void ObjectStore::deregister( Object* object )
     615             : {
     616          21 :     LBASSERT( object );
     617          21 :     if( !object->isAttached( )) // not registered
     618           0 :         return;
     619             : 
     620          21 :     LBLOG( LOG_OBJECTS ) << "Deregister " << *object << std::endl;
     621          21 :     LBASSERT( object->isMaster( ));
     622             : 
     623          21 :     object->notifyDetach();
     624             : 
     625          21 :     if( Global::getIAttribute( Global::IATTR_NODE_SEND_QUEUE_SIZE ) > 0  )
     626             :     {
     627             :         // remove from send queue
     628             :         lunchbox::Request< void > request =
     629          42 :             _localNode->registerRequest< void >();
     630          21 :         _localNode->send( CMD_NODE_DEREGISTER_OBJECT ) << request;
     631             :     }
     632             : 
     633          21 :     const uint128_t id = object->getID();
     634          21 :     detach( object );
     635          21 :     object->setupChangeManager( Object::NONE, true, 0, CO_INSTANCE_INVALID );
     636          21 :     if( _instanceCache )
     637          21 :         _instanceCache->erase( id );
     638          21 :     object->notifyDetached();
     639             : }
     640             : 
     641       51574 : bool ObjectStore::notifyCommandThreadIdle()
     642             : {
     643       51574 :     LB_TS_THREAD( _commandThread );
     644       51574 :     if( _sendQueue.empty( ))
     645       51574 :         return false;
     646             : 
     647           0 :     LBASSERT( _sendOnRegister > 0 );
     648           0 :     SendQueueItem& item = _sendQueue.front();
     649             : 
     650           0 :     if( item.age > _localNode->getTime64( ))
     651             :     {
     652           0 :         const Nodes& nodes = _localNode->getNodes( false );
     653           0 :         if( nodes.empty( ))
     654             :         {
     655           0 :             lunchbox::Thread::yield();
     656           0 :             return !_sendQueue.empty();
     657             :         }
     658             : 
     659           0 :         item.object->sendInstanceData( nodes );
     660             :     }
     661           0 :     _sendQueue.pop_front();
     662           0 :     return !_sendQueue.empty();
     663             : }
     664             : 
     665           8 : void ObjectStore::removeNode( NodePtr node )
     666             : {
     667          16 :     lunchbox::Request< void > request = _localNode->registerRequest< void >();
     668           8 :     _localNode->send( CMD_NODE_REMOVE_NODE ) << node.get() << request;
     669           8 : }
     670             : 
     671             : //===========================================================================
     672             : // ICommand handling
     673             : //===========================================================================
     674         547 : bool ObjectStore::dispatchObjectCommand( ICommand& cmd )
     675             : {
     676         547 :     LB_TS_THREAD( _receiverThread );
     677        1094 :     ObjectICommand command( cmd );
     678         547 :     const uint128_t& id = command.getObjectID();
     679         547 :     const uint32_t instanceID = command.getInstanceID();
     680             : 
     681         547 :     ObjectsHash::const_iterator i = _objects->find( id );
     682             : 
     683         546 :     if( i == _objects->end( ))
     684             :         // When the instance ID is set to none, we only care about the command
     685             :         // when we have an object of the given ID (multicast)
     686           0 :         return ( instanceID == CO_INSTANCE_NONE );
     687             : 
     688         546 :     const Objects& objects = i->second;
     689         546 :     LBASSERTINFO( !objects.empty(), command );
     690             : 
     691         546 :     if( instanceID <= CO_INSTANCE_MAX )
     692             :     {
     693          60 :         for( Objects::const_iterator j = objects.begin(); j!=objects.end(); ++j)
     694             :         {
     695          60 :             Object* object = *j;
     696          60 :             if( instanceID == object->getInstanceID( ))
     697             :             {
     698          54 :                 LBCHECK( object->dispatchCommand( command ));
     699          54 :                 return true;
     700             :             }
     701             :         }
     702           0 :         LBERROR << "Can't find object instance " << instanceID << " for "
     703           0 :                 << command << std::endl;
     704           0 :         LBUNREACHABLE;
     705           0 :         return false;
     706             :     }
     707             : 
     708         492 :     Objects::const_iterator j = objects.begin();
     709         492 :     Object* object = *j;
     710         492 :     LBCHECK( object->dispatchCommand( command ));
     711             : 
     712         493 :     for( ++j; j != objects.end(); ++j )
     713             :     {
     714           0 :         object = *j;
     715           0 :         LBCHECK( object->dispatchCommand( command ));
     716             :     }
     717         493 :     return true;
     718             : }
     719             : 
     720          40 : bool ObjectStore::_cmdFindMasterNodeID( ICommand& command )
     721             : {
     722          40 :     LB_TS_THREAD( _commandThread );
     723             : 
     724          40 :     const uint128_t& id = command.get< uint128_t >();
     725          40 :     const uint32_t requestID = command.get< uint32_t >();
     726          40 :     LBASSERT( id.isUUID( ));
     727             : 
     728          40 :     NodeID masterNodeID;
     729             :     {
     730          80 :         lunchbox::ScopedFastRead mutex( _objects );
     731          40 :         ObjectsHashCIter i = _objects->find( id );
     732             : 
     733          40 :         if( i != _objects->end( ))
     734             :         {
     735          40 :             const Objects& objects = i->second;
     736          40 :             LBASSERT( !objects.empty( ));
     737             : 
     738          40 :             for( ObjectsCIter j = objects.begin(); j != objects.end(); ++j )
     739             :             {
     740          40 :                 Object* object = *j;
     741          40 :                 if( object->isMaster( ))
     742          40 :                     masterNodeID = _localNode->getNodeID();
     743             :                 else
     744             :                 {
     745           0 :                     NodePtr master = object->getMasterNode();
     746           0 :                     if( master.isValid( ))
     747           0 :                         masterNodeID = master->getNodeID();
     748             :                 }
     749          40 :                 if( masterNodeID != 0 )
     750          40 :                     break;
     751             :             }
     752             :         }
     753             :     }
     754             : 
     755          40 :     LBLOG( LOG_OBJECTS ) << "Object " << id << " master " << masterNodeID
     756          40 :                          << " req " << requestID << std::endl;
     757          80 :     command.getNode()->send( CMD_NODE_FIND_MASTER_NODE_ID_REPLY )
     758          40 :             << masterNodeID << requestID;
     759          40 :     return true;
     760             : }
     761             : 
     762          40 : bool ObjectStore::_cmdFindMasterNodeIDReply( ICommand& command )
     763             : {
     764          40 :     const NodeID& masterNodeID = command.get< NodeID >();
     765          40 :     const uint32_t requestID = command.get< uint32_t >();
     766          40 :     _localNode->serveRequest( requestID, masterNodeID );
     767          40 :     return true;
     768             : }
     769             : 
     770          21 : bool ObjectStore::_cmdAttach( ICommand& command )
     771             : {
     772          21 :     LB_TS_THREAD( _receiverThread );
     773          21 :     LBLOG( LOG_OBJECTS ) << "Cmd attach object " << command << std::endl;
     774             : 
     775          21 :     const uint128_t& objectID = command.get< uint128_t >();
     776          21 :     const uint32_t instanceID = command.get< uint32_t >();
     777          21 :     const uint32_t requestID = command.get< uint32_t >();
     778             : 
     779          21 :     Object* object = static_cast< Object* >( _localNode->getRequestData(
     780          21 :                                                  requestID ));
     781          21 :     _attach( object, objectID, instanceID );
     782          21 :     _localNode->serveRequest( requestID );
     783          21 :     return true;
     784             : }
     785             : 
     786          62 : bool ObjectStore::_cmdDetach( ICommand& command )
     787             : {
     788          62 :     LB_TS_THREAD( _receiverThread );
     789          62 :     LBLOG( LOG_OBJECTS ) << "Cmd detach object " << command << std::endl;
     790             : 
     791          62 :     const uint128_t& objectID = command.get< uint128_t >();
     792          62 :     const uint32_t instanceID = command.get< uint32_t >();
     793          62 :     const uint32_t requestID = command.get< uint32_t >();
     794             : 
     795          62 :     ObjectsHash::const_iterator i = _objects->find( objectID );
     796          62 :     if( i != _objects->end( ))
     797             :     {
     798          62 :         const Objects& objects = i->second;
     799             : 
     800         189 :         for( Objects::const_iterator j = objects.begin();
     801         126 :              j != objects.end(); ++j )
     802             :         {
     803          63 :             Object* object = *j;
     804          63 :             if( object->getInstanceID() == instanceID )
     805             :             {
     806          62 :                 _detach( object );
     807          62 :                 break;
     808             :             }
     809             :         }
     810             :     }
     811             : 
     812          62 :     LBASSERT( requestID != LB_UNDEFINED_UINT32 );
     813          62 :     _localNode->serveRequest( requestID );
     814          62 :     return true;
     815             : }
     816             : 
     817          21 : bool ObjectStore::_cmdRegister( ICommand& command )
     818             : {
     819          21 :     LB_TS_THREAD( _commandThread );
     820          21 :     if( _sendOnRegister <= 0 )
     821          21 :         return true;
     822             : 
     823           0 :     LBLOG( LOG_OBJECTS ) << "Cmd register object " << command << std::endl;
     824             : 
     825           0 :     Object* object = reinterpret_cast< Object* >( command.get< void* >( ));
     826             : 
     827             :     const int32_t age = Global::getIAttribute(
     828           0 :                             Global::IATTR_NODE_SEND_QUEUE_AGE );
     829           0 :     SendQueueItem item;
     830           0 :     item.age = age ? age + _localNode->getTime64() :
     831             :                      std::numeric_limits< int64_t >::max();
     832           0 :     item.object = object;
     833           0 :     _sendQueue.push_back( item );
     834             : 
     835           0 :     const uint32_t size = Global::getIAttribute(
     836           0 :                              Global::IATTR_NODE_SEND_QUEUE_SIZE );
     837           0 :     while( _sendQueue.size() > size )
     838           0 :         _sendQueue.pop_front();
     839             : 
     840           0 :     return true;
     841             : }
     842             : 
     843          21 : bool ObjectStore::_cmdDeregister( ICommand& command )
     844             : {
     845          21 :     LB_TS_THREAD( _commandThread );
     846          21 :     LBLOG( LOG_OBJECTS ) << "Cmd deregister object " << command << std::endl;
     847             : 
     848          21 :     const uint32_t requestID = command.get< uint32_t >();
     849             : 
     850          21 :     const void* object = _localNode->getRequestData( requestID );
     851             : 
     852          21 :     for( SendQueueIter i = _sendQueue.begin(); i != _sendQueue.end(); ++i )
     853             :     {
     854           0 :         if( i->object == object )
     855             :         {
     856           0 :             _sendQueue.erase( i );
     857           0 :             break;
     858             :         }
     859             :     }
     860             : 
     861          21 :     _localNode->serveRequest( requestID );
     862          21 :     return true;
     863             : }
     864             : 
     865          42 : bool ObjectStore::_cmdMap( ICommand& cmd )
     866             : {
     867          42 :     LB_TS_THREAD( _commandThread );
     868             : 
     869          84 :     MasterCMCommand command( cmd );
     870          42 :     const uint128_t& id = command.getObjectID();
     871             : 
     872          42 :     LBLOG( LOG_OBJECTS ) << "Cmd map object " << command << " id " << id << "."
     873           0 :                          << command.getInstanceID() << " req "
     874          42 :                          << command.getRequestID() << std::endl;
     875             : 
     876          84 :     ObjectCMPtr masterCM;
     877             :     {
     878          84 :         lunchbox::ScopedFastRead mutex( _objects );
     879          42 :         ObjectsHash::const_iterator i = _objects->find( id );
     880          42 :         if( i != _objects->end( ))
     881             :         {
     882          42 :             const Objects& objects = i->second;
     883             : 
     884          42 :             for( ObjectsCIter j = objects.begin(); j != objects.end(); ++j )
     885             :             {
     886          42 :                 Object* object = *j;
     887          42 :                 if( object->isMaster( ))
     888             :                 {
     889          42 :                     masterCM = object->_getChangeManager();
     890          42 :                     break;
     891             :                 }
     892             :             }
     893             :         }
     894             :     }
     895             : 
     896          42 :     if( !masterCM || !masterCM->addSlave( command ))
     897             :     {
     898           0 :         LBWARN << "Can't find master object to map " << id << std::endl;
     899           0 :         NodePtr node = command.getNode();
     900           0 :         node->send( CMD_NODE_MAP_OBJECT_REPLY )
     901           0 :             << node->getNodeID() << id << command.getRequestedVersion()
     902           0 :             << command.getRequestID() << false << command.useCache() << false;
     903             :     }
     904             : 
     905          42 :     ++_counters[ LocalNode::COUNTER_MAP_OBJECT_REMOTE ];
     906          84 :     return true;
     907             : }
     908             : 
     909          42 : bool ObjectStore::_cmdMapSuccess( ICommand& command )
     910             : {
     911          42 :     LB_TS_THREAD( _receiverThread );
     912             : 
     913          42 :     const uint128_t& nodeID = command.get< uint128_t >();
     914          42 :     const uint128_t& objectID = command.get< uint128_t >();
     915          42 :     const uint32_t requestID = command.get< uint32_t >();
     916          42 :     const uint32_t instanceID = command.get< uint32_t >();
     917          42 :     const Object::ChangeType changeType = command.get< Object::ChangeType >();
     918          42 :     const uint32_t masterInstanceID = command.get< uint32_t >();
     919             : 
     920             :     // Map success commands are potentially multicasted (see above)
     921             :     // verify that we are the intended receiver
     922          42 :     if( nodeID != _localNode->getNodeID( ))
     923           0 :         return true;
     924             : 
     925          42 :     LBLOG( LOG_OBJECTS ) << "Cmd map object success " << command
     926           0 :                          << " id " << objectID << "." << instanceID
     927          42 :                          << " req " << requestID << std::endl;
     928             : 
     929             :     // set up change manager and attach object to dispatch table
     930             :     Object* object = static_cast< Object* >(
     931          42 :         _localNode->getRequestData( requestID ));
     932          42 :     LBASSERT( object );
     933          42 :     LBASSERT( !object->isMaster( ));
     934             : 
     935          84 :     object->setupChangeManager( changeType, false, _localNode,
     936          42 :                                 masterInstanceID );
     937          42 :     _attach( object, objectID, instanceID );
     938          42 :     return true;
     939             : }
     940             : 
     941          42 : bool ObjectStore::_cmdMapReply( ICommand& command )
     942             : {
     943          42 :     LB_TS_THREAD( _receiverThread );
     944             : 
     945             :     // Map reply commands are potentially multicasted (see above)
     946             :     // verify that we are the intended receiver
     947          42 :     if( command.get< uint128_t >() != _localNode->getNodeID( ))
     948           0 :         return true;
     949             : 
     950          42 :     const uint128_t& objectID = command.get< uint128_t >();
     951          42 :     const uint128_t& version = command.get< uint128_t >();
     952          42 :     const uint32_t requestID = command.get< uint32_t >();
     953          42 :     const bool result = command.get< bool >();
     954          42 :     const bool releaseCache = command.get< bool >();
     955          42 :     const bool useCache = command.get< bool >();
     956             : 
     957          42 :     LBLOG( LOG_OBJECTS ) << "Cmd map object reply " << command << " id "
     958          42 :                          << objectID << " req " << requestID << std::endl;
     959             : 
     960          42 :     LBASSERT( _localNode->getRequestData( requestID ));
     961             : 
     962          42 :     if( result )
     963             :     {
     964             :         Object* object = static_cast<Object*>(
     965          42 :             _localNode->getRequestData( requestID ));
     966          42 :         LBASSERT( object );
     967          42 :         LBASSERT( !object->isMaster( ));
     968             : 
     969          42 :         object->setMasterNode( command.getNode( ));
     970             : 
     971          42 :         if( useCache )
     972             :         {
     973           0 :             LBASSERT( releaseCache );
     974           0 :             LBASSERT( _instanceCache );
     975             : 
     976           0 :             const uint128_t& id = objectID;
     977           0 :             const InstanceCache::Data& cached = (*_instanceCache)[ id ];
     978           0 :             LBASSERT( cached != InstanceCache::Data::NONE );
     979           0 :             LBASSERT( !cached.versions.empty( ));
     980             : 
     981           0 :             object->addInstanceDatas( cached.versions, version );
     982           0 :             LBCHECK( _instanceCache->release( id, 2 ));
     983             :         }
     984          42 :         else if( releaseCache )
     985             :         {
     986           0 :             LBCHECK( _instanceCache->release( objectID, 1 ));
     987             :         }
     988             :     }
     989             :     else
     990             :     {
     991           0 :         if( releaseCache )
     992           0 :             _instanceCache->release( objectID, 1 );
     993             : 
     994           0 :         LBWARN << "Could not map object " << objectID << std::endl;
     995             :     }
     996             : 
     997          42 :     _localNode->serveRequest( requestID, version );
     998          42 :     return true;
     999             : }
    1000             : 
    1001          33 : bool ObjectStore::_cmdUnsubscribe( ICommand& command )
    1002             : {
    1003          33 :     LB_TS_THREAD( _commandThread );
    1004          33 :     LBLOG( LOG_OBJECTS ) << "Cmd unsubscribe object  " << command << std::endl;
    1005             : 
    1006          33 :     const uint128_t& id = command.get< uint128_t >();
    1007          33 :     const uint32_t requestID = command.get< uint32_t >();
    1008          33 :     const uint32_t masterInstanceID = command.get< uint32_t >();
    1009          33 :     const uint32_t slaveInstanceID = command.get< uint32_t >();
    1010             : 
    1011          66 :     NodePtr node = command.getNode();
    1012             : 
    1013             :     {
    1014          66 :         lunchbox::ScopedFastWrite mutex( _objects );
    1015          33 :         ObjectsHash::const_iterator i = _objects->find( id );
    1016          33 :         if( i != _objects->end( ))
    1017             :         {
    1018          33 :             const Objects& objects = i->second;
    1019          33 :             for( ObjectsCIter j = objects.begin(); j != objects.end(); ++j )
    1020             :             {
    1021          33 :                 Object* object = *j;
    1022          66 :                 if( object->isMaster() &&
    1023          33 :                     object->getInstanceID() == masterInstanceID )
    1024             :                 {
    1025          33 :                     object->removeSlave( node, slaveInstanceID );
    1026          33 :                     break;
    1027             :                 }
    1028             :             }
    1029             :         }
    1030             :     }
    1031             : 
    1032          33 :     node->send( CMD_NODE_DETACH_OBJECT ) << id << slaveInstanceID << requestID;
    1033          66 :     return true;
    1034             : }
    1035             : 
    1036           1 : bool ObjectStore::_cmdUnmap( ICommand& command )
    1037             : {
    1038           1 :     LB_TS_THREAD( _receiverThread );
    1039           1 :     LBLOG( LOG_OBJECTS ) << "Cmd unmap object " << command << std::endl;
    1040             : 
    1041           1 :     const uint128_t& objectID = command.get< uint128_t >();
    1042             : 
    1043           1 :     if( _instanceCache )
    1044           1 :         _instanceCache->erase( objectID );
    1045             : 
    1046           1 :     ObjectsHash::iterator i = _objects->find( objectID );
    1047           1 :     if( i == _objects->end( )) // nothing to do
    1048           0 :         return true;
    1049             : 
    1050           2 :     const Objects objects = i->second;
    1051             :     {
    1052           2 :         lunchbox::ScopedFastWrite mutex( _objects );
    1053           1 :         _objects->erase( i );
    1054             :     }
    1055             : 
    1056           2 :     for( Objects::const_iterator j = objects.begin(); j != objects.end(); ++j )
    1057             :     {
    1058           1 :         Object* object = *j;
    1059           1 :         object->detach();
    1060             :     }
    1061             : 
    1062           1 :     return true;
    1063             : }
    1064             : 
    1065           4 : bool ObjectStore::_cmdSync( ICommand& cmd )
    1066             : {
    1067           4 :     LB_TS_THREAD( _commandThread );
    1068           8 :     MasterCMCommand command( cmd );
    1069           4 :     const uint128_t& id = command.getObjectID();
    1070           4 :     LBINFO << command.getNode() << std::endl;
    1071           4 :     LBLOG( LOG_OBJECTS ) << "Cmd sync object id " << id << "."
    1072           0 :                          << command.getInstanceID() << " req "
    1073           4 :                          << command.getRequestID() << std::endl;
    1074             : 
    1075           4 :     const uint32_t cacheInstanceID = command.getMasterInstanceID();
    1076           8 :     ObjectCMPtr cm;
    1077             :     {
    1078           8 :         lunchbox::ScopedFastRead mutex( _objects );
    1079           4 :         ObjectsHash::const_iterator i = _objects->find( id );
    1080           4 :         if( i != _objects->end( ))
    1081             :         {
    1082           4 :             const Objects& objects = i->second;
    1083             : 
    1084           8 :             for( Object* object : objects )
    1085             :             {
    1086           4 :                 if( command.getInstanceID() == object->getInstanceID( ))
    1087             :                 {
    1088           0 :                     cm = object->_getChangeManager();
    1089           0 :                     LBASSERT( cm );
    1090           0 :                     break;
    1091             :                 }
    1092             : 
    1093           4 :                 if( command.getInstanceID() != CO_INSTANCE_ALL )
    1094           0 :                     continue;
    1095             : 
    1096           4 :                 cm = object->_getChangeManager();
    1097           4 :                 LBASSERT( cm );
    1098           4 :                 if( cacheInstanceID == object->getInstanceID( ))
    1099           0 :                     break;
    1100             :             }
    1101           4 :             if( !cm )
    1102           0 :                 LBWARN << "Can't find object to sync " << id << "."
    1103           0 :                        << command.getInstanceID() << " in " << objects.size()
    1104           0 :                        << " instances" << std::endl;
    1105             :         }
    1106           4 :         if( !cm )
    1107           0 :             LBWARN << "Can't find object to sync " << id
    1108           0 :                    << ", no object with identifier" << std::endl;
    1109             :     }
    1110           4 :     if( !cm || !cm->sendSync( command ))
    1111             :     {
    1112           0 :         NodePtr node = command.getNode();
    1113           0 :         node->send( CMD_NODE_SYNC_OBJECT_REPLY )
    1114           0 :             << node->getNodeID() << id << command.getRequestID()
    1115           0 :             << false << command.useCache() << false;
    1116             :     }
    1117           8 :     return true;
    1118             : }
    1119             : 
    1120           4 : bool ObjectStore::_cmdSyncReply( ICommand& command )
    1121             : {
    1122           4 :     LB_TS_THREAD( _receiverThread );
    1123             : 
    1124             :     // Sync reply commands are potentially multicasted (see above)
    1125             :     // verify that we are the intended receiver
    1126           4 :     if( command.get< uint128_t >() != _localNode->getNodeID( ))
    1127           0 :         return true;
    1128             : 
    1129           4 :     const NodeID& id = command.get< NodeID >();
    1130           4 :     const uint32_t requestID = command.get< uint32_t >();
    1131           4 :     const bool result = command.get< bool >();
    1132           4 :     const bool releaseCache = command.get< bool >();
    1133           4 :     const bool useCache = command.get< bool >();
    1134           4 :     void* const data = _localNode->getRequestData( requestID );
    1135           4 :     ObjectDataIStream* const is = LBSAFECAST( ObjectDataIStream*, data );
    1136             : 
    1137           4 :     LBLOG( LOG_OBJECTS ) << "Cmd sync object reply " << command << " req "
    1138           4 :                          << requestID << std::endl;
    1139           4 :     if( result )
    1140             :     {
    1141           4 :         if( useCache )
    1142             :         {
    1143           0 :             LBASSERT( releaseCache );
    1144           0 :             LBASSERT( _instanceCache );
    1145             : 
    1146           0 :             const InstanceCache::Data& cached = (*_instanceCache)[ id ];
    1147           0 :             LBASSERT( cached != InstanceCache::Data::NONE );
    1148           0 :             LBASSERT( !cached.versions.empty( ));
    1149             : 
    1150           0 :             *is = *cached.versions.back();
    1151           0 :             LBCHECK( _instanceCache->release( id, 2 ));
    1152             :         }
    1153           4 :         else if( releaseCache )
    1154             :         {
    1155           0 :             LBCHECK( _instanceCache->release( id, 1 ));
    1156             :         }
    1157             :     }
    1158             :     else
    1159             :     {
    1160           0 :         if( releaseCache )
    1161           0 :             _instanceCache->release( id, 1 );
    1162             : 
    1163           0 :         LBWARN << "Could not sync object " << id << " request " << requestID
    1164           0 :                << std::endl;
    1165             :     }
    1166             : 
    1167           4 :     _localNode->serveRequest( requestID, result );
    1168           4 :     return true;
    1169             : }
    1170             : 
    1171          57 : bool ObjectStore::_cmdInstance( ICommand& inCommand )
    1172             : {
    1173          57 :     LB_TS_THREAD( _receiverThread );
    1174          57 :     LBASSERT( _localNode );
    1175             : 
    1176         114 :     ObjectDataICommand command( inCommand );
    1177          57 :     const NodeID& nodeID = command.get< NodeID >();
    1178          57 :     const uint32_t masterInstanceID = command.get< uint32_t >();
    1179          57 :     const uint32_t cmd = command.getCommand();
    1180             : 
    1181          57 :     LBLOG( LOG_OBJECTS ) << "Cmd instance " << command << " master "
    1182          57 :                          << masterInstanceID << " node " << nodeID << std::endl;
    1183             : 
    1184          57 :     command.setType( COMMANDTYPE_OBJECT );
    1185          57 :     command.setCommand( CMD_OBJECT_INSTANCE );
    1186             : 
    1187          57 :     const uint128_t& version = command.getVersion();
    1188          57 :     if( _instanceCache && version.high() == 0 )
    1189             :     {
    1190          57 :         const ObjectVersion rev( command.getObjectID(), version );
    1191             : #ifndef CO_AGGRESSIVE_CACHING // Issue Equalizer#82:
    1192          57 :         if( cmd != CMD_NODE_OBJECT_INSTANCE_PUSH )
    1193             : #endif
    1194          50 :             _instanceCache->add( rev, masterInstanceID, command, 0 );
    1195             :     }
    1196             : 
    1197          57 :     switch( cmd )
    1198             :     {
    1199             :     case CMD_NODE_OBJECT_INSTANCE:
    1200           0 :         LBASSERT( nodeID == 0 );
    1201           0 :         LBASSERT( command.getInstanceID() == CO_INSTANCE_NONE );
    1202           0 :         return true;
    1203             : 
    1204             :     case CMD_NODE_OBJECT_INSTANCE_MAP:
    1205          38 :         if( nodeID != _localNode->getNodeID( )) // not for me
    1206           0 :             return true;
    1207             : 
    1208          38 :         LBASSERT( command.getInstanceID() <= CO_INSTANCE_MAX );
    1209          38 :         return dispatchObjectCommand( command );
    1210             : 
    1211             :     case CMD_NODE_OBJECT_INSTANCE_COMMIT:
    1212           4 :         LBASSERT( nodeID == 0 );
    1213           4 :         LBASSERT( command.getInstanceID() == CO_INSTANCE_NONE );
    1214           4 :         return dispatchObjectCommand( command );
    1215             : 
    1216             :     case CMD_NODE_OBJECT_INSTANCE_PUSH:
    1217           7 :         LBASSERT( nodeID == 0 );
    1218           7 :         LBASSERT( command.getInstanceID() == CO_INSTANCE_NONE );
    1219           7 :         _pushData.addDataCommand( command.getObjectID(), command );
    1220           7 :         return true;
    1221             : 
    1222             :     case CMD_NODE_OBJECT_INSTANCE_SYNC:
    1223             :     {
    1224           8 :         if( nodeID != _localNode->getNodeID( )) // not for me
    1225           0 :             return true;
    1226             : 
    1227           8 :         void* data = _localNode->getRequestData( command.getInstanceID( ));
    1228           8 :         LBASSERT( command.getInstanceID() != CO_INSTANCE_NONE );
    1229           8 :         LBASSERTINFO( data, this );
    1230             : 
    1231           8 :         ObjectDataIStream* is = LBSAFECAST( ObjectDataIStream*, data );
    1232           8 :         is->addDataCommand( command );
    1233           8 :         return true;
    1234             :     }
    1235             : 
    1236             :     default:
    1237           0 :         LBUNREACHABLE;
    1238           0 :         return false;
    1239             :     }
    1240             : }
    1241             : 
    1242           0 : bool ObjectStore::_cmdDisableSendOnRegister( ICommand& command )
    1243             : {
    1244           0 :     LB_TS_THREAD( _commandThread );
    1245           0 :     LBASSERTINFO( _sendOnRegister > 0, _sendOnRegister );
    1246             : 
    1247           0 :     if( --_sendOnRegister == 0 )
    1248             :     {
    1249           0 :         _sendQueue.clear();
    1250             : 
    1251           0 :         const Nodes& nodes = _localNode->getNodes( false );
    1252           0 :         for( NodePtr node : nodes )
    1253             :         {
    1254           0 :             ConnectionPtr multicast = node->getConnection( true );
    1255           0 :             ConnectionPtr connection = node->getConnection( false );
    1256           0 :             if( multicast )
    1257           0 :                 multicast->finish();
    1258           0 :             if( connection && connection != multicast )
    1259           0 :                 connection->finish();
    1260             :         }
    1261             :     }
    1262             : 
    1263           0 :     const uint32_t requestID = command.get< uint32_t >();
    1264           0 :     _localNode->serveRequest( requestID );
    1265           0 :     return true;
    1266             : }
    1267             : 
    1268          42 : bool ObjectStore::_cmdRemoveNode( ICommand& command )
    1269             : {
    1270          42 :     LB_TS_THREAD( _commandThread );
    1271          42 :     LBLOG( LOG_OBJECTS ) << "Cmd object  " << command << std::endl;
    1272             : 
    1273          42 :     Node* node = command.get< Node* >();
    1274          42 :     const uint32_t requestID = command.get< uint32_t >();
    1275             : 
    1276          84 :     lunchbox::ScopedFastWrite mutex( _objects );
    1277          67 :     for( ObjectsHashCIter i = _objects->begin(); i != _objects->end(); ++i )
    1278             :     {
    1279          25 :         const Objects& objects = i->second;
    1280          50 :         for( ObjectsCIter j = objects.begin(); j != objects.end(); ++j )
    1281          25 :             (*j)->removeSlaves( node );
    1282             :     }
    1283             : 
    1284          42 :     if( requestID != LB_UNDEFINED_UINT32 )
    1285           8 :         _localNode->serveRequest( requestID );
    1286             :     else
    1287          34 :         node->unref(); // node was ref'd before LocalNode::_handleDisconnect()
    1288             : 
    1289          84 :     return true;
    1290             : }
    1291             : 
    1292           5 : bool ObjectStore::_cmdPush( ICommand& command )
    1293             : {
    1294           5 :     LB_TS_THREAD( _commandThread );
    1295             : 
    1296           5 :     const uint128_t& objectID = command.get< uint128_t >();
    1297           5 :     const uint128_t& groupID = command.get< uint128_t >();
    1298           5 :     const uint128_t& typeID = command.get< uint128_t >();
    1299             : 
    1300           5 :     ObjectDataIStream* is = _pushData.pull( objectID );
    1301             : 
    1302           5 :     _localNode->objectPush( groupID, typeID, objectID, *is );
    1303           5 :     _pushData.recycle( is );
    1304           5 :     return true;
    1305             : }
    1306             : 
    1307           0 : std::ostream& operator << ( std::ostream& os, ObjectStore* objectStore )
    1308             : {
    1309           0 :     if( !objectStore )
    1310             :     {
    1311           0 :         os << "NULL objectStore";
    1312           0 :         return os;
    1313             :     }
    1314             : 
    1315           0 :     os << "objectStore (" << (void*)objectStore << ")";
    1316             : 
    1317           0 :     return os;
    1318             : }
    1319          66 : }

Generated by: LCOV version 1.11