LCOV - code coverage report
Current view: top level - co - objectStore.cpp (source / functions) Hit Total Coverage
Test: Collage Lines: 536 743 72.1 %
Date: 2018-01-09 16:37:03 Functions: 42 48 87.5 %

          Line data    Source code
       1             : 
       2             : /* Copyright (c) 2005-2016, Stefan Eilemann <eile@equalizergraphics.com>
       3             :  *                          Cedric Stalder <cedric.stalder@gmail.com>
       4             :  *                          Daniel Nachbaur <danielnachbaur@gmail.com>
       5             :  *
       6             :  * This file is part of Collage <https://github.com/Eyescale/Collage>
       7             :  *
       8             :  * This library is free software; you can redistribute it and/or modify it under
       9             :  * the terms of the GNU Lesser General Public License version 2.1 as published
      10             :  * by the Free Software Foundation.
      11             :  *
      12             :  * This library is distributed in the hope that it will be useful, but WITHOUT
      13             :  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
      14             :  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
      15             :  * details.
      16             :  *
      17             :  * You should have received a copy of the GNU Lesser General Public License
      18             :  * along with this library; if not, write to the Free Software Foundation, Inc.,
      19             :  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
      20             :  */
      21             : 
      22             : #include "objectStore.h"
      23             : 
      24             : #include "connection.h"
      25             : #include "connectionDescription.h"
      26             : #include "global.h"
      27             : #include "instanceCache.h"
      28             : #include "log.h"
      29             : #include "masterCMCommand.h"
      30             : #include "nodeCommand.h"
      31             : #include "oCommand.h"
      32             : #include "objectCM.h"
      33             : #include "objectCommand.h"
      34             : #include "objectDataICommand.h"
      35             : #include "objectDataIStream.h"
      36             : 
      37             : #include <lunchbox/futureFunction.h>
      38             : #include <lunchbox/scopedMutex.h>
      39             : 
      40             : #include <boost/bind.hpp>
      41             : 
      42             : #include <limits>
      43             : 
      44             : //#define DEBUG_DISPATCH
      45             : #ifdef DEBUG_DISPATCH
      46             : #include <set>
      47             : #endif
      48             : 
      49             : namespace co
      50             : {
      51             : typedef CommandFunc<ObjectStore> CmdFunc;
      52             : typedef lunchbox::FutureFunction<bool> FuturebImpl;
      53             : 
      54          53 : ObjectStore::ObjectStore(LocalNode* localNode, a_ssize_t* counters)
      55             :     : _localNode(localNode)
      56             :     , _instanceIDs(-0x7FFFFFFF)
      57             :     , _instanceCache(new InstanceCache(
      58          53 :           Global::getIAttribute(Global::IATTR_INSTANCE_CACHE_SIZE) * LB_1MB))
      59         106 :     , _counters(counters)
      60             : {
      61          53 :     LBASSERT(localNode);
      62          53 :     CommandQueue* queue = localNode->getCommandThreadQueue();
      63             : 
      64             :     localNode->_registerCommand(CMD_NODE_FIND_MASTER_NODE_ID,
      65         106 :                                 CmdFunc(this,
      66             :                                         &ObjectStore::_cmdFindMasterNodeID),
      67          53 :                                 queue);
      68             :     localNode->_registerCommand(
      69             :         CMD_NODE_FIND_MASTER_NODE_ID_REPLY,
      70          53 :         CmdFunc(this, &ObjectStore::_cmdFindMasterNodeIDReply), 0);
      71             :     localNode->_registerCommand(CMD_NODE_ATTACH_OBJECT,
      72          53 :                                 CmdFunc(this, &ObjectStore::_cmdAttach), 0);
      73             :     localNode->_registerCommand(CMD_NODE_DETACH_OBJECT,
      74          53 :                                 CmdFunc(this, &ObjectStore::_cmdDetach), 0);
      75             :     localNode->_registerCommand(CMD_NODE_REGISTER_OBJECT,
      76         106 :                                 CmdFunc(this, &ObjectStore::_cmdRegister),
      77          53 :                                 queue);
      78             :     localNode->_registerCommand(CMD_NODE_DEREGISTER_OBJECT,
      79         106 :                                 CmdFunc(this, &ObjectStore::_cmdDeregister),
      80          53 :                                 queue);
      81             :     localNode->_registerCommand(CMD_NODE_MAP_OBJECT,
      82          53 :                                 CmdFunc(this, &ObjectStore::_cmdMap), queue);
      83             :     localNode->_registerCommand(CMD_NODE_MAP_OBJECT_SUCCESS,
      84          53 :                                 CmdFunc(this, &ObjectStore::_cmdMapSuccess), 0);
      85             :     localNode->_registerCommand(CMD_NODE_MAP_OBJECT_REPLY,
      86          53 :                                 CmdFunc(this, &ObjectStore::_cmdMapReply), 0);
      87             :     localNode->_registerCommand(CMD_NODE_UNMAP_OBJECT,
      88          53 :                                 CmdFunc(this, &ObjectStore::_cmdUnmap), 0);
      89             :     localNode->_registerCommand(CMD_NODE_UNSUBSCRIBE_OBJECT,
      90         106 :                                 CmdFunc(this, &ObjectStore::_cmdUnsubscribe),
      91          53 :                                 queue);
      92             :     localNode->_registerCommand(CMD_NODE_OBJECT_INSTANCE,
      93          53 :                                 CmdFunc(this, &ObjectStore::_cmdInstance), 0);
      94             :     localNode->_registerCommand(CMD_NODE_OBJECT_INSTANCE_MAP,
      95          53 :                                 CmdFunc(this, &ObjectStore::_cmdInstance), 0);
      96             :     localNode->_registerCommand(CMD_NODE_OBJECT_INSTANCE_COMMIT,
      97          53 :                                 CmdFunc(this, &ObjectStore::_cmdInstance), 0);
      98             :     localNode->_registerCommand(CMD_NODE_OBJECT_INSTANCE_PUSH,
      99          53 :                                 CmdFunc(this, &ObjectStore::_cmdInstance), 0);
     100             :     localNode->_registerCommand(CMD_NODE_OBJECT_INSTANCE_SYNC,
     101          53 :                                 CmdFunc(this, &ObjectStore::_cmdInstance), 0);
     102             :     localNode->_registerCommand(
     103             :         CMD_NODE_DISABLE_SEND_ON_REGISTER,
     104          53 :         CmdFunc(this, &ObjectStore::_cmdDisableSendOnRegister), queue);
     105             :     localNode->_registerCommand(CMD_NODE_REMOVE_NODE,
     106         106 :                                 CmdFunc(this, &ObjectStore::_cmdRemoveNode),
     107          53 :                                 queue);
     108             :     localNode->_registerCommand(CMD_NODE_OBJECT_PUSH,
     109          53 :                                 CmdFunc(this, &ObjectStore::_cmdPush), queue);
     110             :     localNode->_registerCommand(CMD_NODE_SYNC_OBJECT,
     111          53 :                                 CmdFunc(this, &ObjectStore::_cmdSync), queue);
     112             :     localNode->_registerCommand(CMD_NODE_SYNC_OBJECT_REPLY,
     113          53 :                                 CmdFunc(this, &ObjectStore::_cmdSyncReply), 0);
     114          53 : }
     115             : 
     116         153 : ObjectStore::~ObjectStore()
     117             : {
     118          51 :     LBVERB << "Delete ObjectStore @" << (void*)this << std::endl;
     119             : 
     120             : #ifndef NDEBUG
     121          51 :     if (!_objects->empty())
     122             :     {
     123           0 :         LBWARN << _objects->size() << " attached objects in destructor"
     124           0 :                << std::endl;
     125             : 
     126           0 :         for (ObjectsHash::const_iterator i = _objects->begin();
     127           0 :              i != _objects->end(); ++i)
     128             :         {
     129           0 :             const Objects& objects = i->second;
     130           0 :             LBWARN << "  " << objects.size() << " objects with id " << i->first
     131           0 :                    << std::endl;
     132             : 
     133           0 :             for (Objects::const_iterator j = objects.begin();
     134           0 :                  j != objects.end(); ++j)
     135             :             {
     136           0 :                 const Object* object = *j;
     137           0 :                 LBINFO << "    object type " << lunchbox::className(object)
     138           0 :                        << std::endl;
     139             :             }
     140             :         }
     141             :     }
     142             : // LBASSERT( _objects->empty( ))
     143             : #endif
     144          51 :     clear();
     145          51 :     delete _instanceCache;
     146          51 :     _instanceCache = 0;
     147         102 : }
     148             : 
     149          98 : void ObjectStore::clear()
     150             : {
     151          98 :     LBASSERT(_objects->empty());
     152          98 :     expireInstanceData(0);
     153          98 :     LBASSERT(!_instanceCache || _instanceCache->isEmpty());
     154             : 
     155          98 :     _objects->clear();
     156          98 :     _sendQueue.clear();
     157          98 : }
     158             : 
     159           0 : void ObjectStore::disableInstanceCache()
     160             : {
     161           0 :     LBASSERT(_localNode->isClosed());
     162           0 :     delete _instanceCache;
     163           0 :     _instanceCache = 0;
     164           0 : }
     165             : 
     166          98 : void ObjectStore::expireInstanceData(const int64_t age)
     167             : {
     168          98 :     if (_instanceCache)
     169          98 :         _instanceCache->expire(age);
     170          98 : }
     171             : 
     172         111 : void ObjectStore::removeInstanceData(const NodeID& nodeID)
     173             : {
     174         111 :     if (_instanceCache)
     175         111 :         _instanceCache->remove(nodeID);
     176         111 : }
     177             : 
     178           0 : void ObjectStore::enableSendOnRegister()
     179             : {
     180           0 :     ++_sendOnRegister;
     181           0 : }
     182             : 
     183           0 : void ObjectStore::disableSendOnRegister()
     184             : {
     185           0 :     if (Global::getIAttribute(Global::IATTR_NODE_SEND_QUEUE_SIZE) > 0)
     186             :     {
     187           0 :         lunchbox::Request<void> request = _localNode->registerRequest<void>();
     188           0 :         _localNode->send(CMD_NODE_DISABLE_SEND_ON_REGISTER) << request;
     189             :     }
     190             :     else // OPT
     191           0 :         --_sendOnRegister;
     192           0 : }
     193             : 
     194             : //---------------------------------------------------------------------------
     195             : // identifier master node mapping
     196             : //---------------------------------------------------------------------------
     197          34 : NodeID ObjectStore::findMasterNodeID(const uint128_t& identifier)
     198             : {
     199          34 :     LB_TS_NOT_THREAD(_commandThread);
     200             : 
     201             :     // OPT: look up locally first?
     202          68 :     const Nodes& nodes = _localNode->getNodes();
     203             : 
     204             :     // OPT: send to multiple nodes at once?
     205          34 :     for (NodePtr node : nodes)
     206             :     {
     207             :         lunchbox::Request<NodeID> request =
     208          34 :             _localNode->registerRequest<NodeID>();
     209             : 
     210          68 :         LBLOG(LOG_OBJECTS) << "Finding " << identifier << " on " << node
     211         102 :                            << " req " << request.getID() << std::endl;
     212          34 :         node->send(CMD_NODE_FIND_MASTER_NODE_ID) << identifier << request;
     213             : 
     214             :         try
     215             :         {
     216          34 :             const NodeID& masterNodeID = request.wait(Global::getTimeout());
     217             : 
     218          34 :             if (masterNodeID != 0)
     219             :             {
     220          34 :                 LBLOG(LOG_OBJECTS) << "Found " << identifier << " on "
     221          34 :                                    << masterNodeID << std::endl;
     222          34 :                 return masterNodeID;
     223             :             }
     224             :         }
     225           0 :         catch (const lunchbox::FutureTimeout&)
     226             :         {
     227           0 :             _localNode->unregisterRequest(request.getID());
     228           0 :             throw;
     229             :         }
     230             :     }
     231             : 
     232           0 :     return NodeID();
     233             : }
     234             : 
     235             : //---------------------------------------------------------------------------
     236             : // object mapping
     237             : //---------------------------------------------------------------------------
     238          15 : void ObjectStore::attach(Object* object, const uint128_t& id,
     239             :                          const uint32_t instanceID)
     240             : {
     241          15 :     LBASSERT(object);
     242          15 :     LB_TS_NOT_THREAD(_receiverThread);
     243             : 
     244          30 :     lunchbox::Request<void> request = _localNode->registerRequest<void>(object);
     245          15 :     _localNode->send(CMD_NODE_ATTACH_OBJECT) << id << instanceID << request;
     246          15 : }
     247             : 
     248             : namespace
     249             : {
     250          51 : uint32_t _genNextID(lunchbox::a_int32_t& val)
     251             : {
     252             :     uint32_t result;
     253           0 :     do
     254             :     {
     255          51 :         const long id = ++val;
     256          51 :         result = static_cast<uint32_t>(static_cast<int64_t>(id) + 0x7FFFFFFFu);
     257          51 :     } while (result > CO_INSTANCE_MAX);
     258             : 
     259          51 :     return result;
     260             : }
     261             : }
     262             : 
     263          51 : void ObjectStore::_attach(Object* object, const uint128_t& id,
     264             :                           const uint32_t inInstanceID)
     265             : {
     266          51 :     LBASSERT(object);
     267          51 :     LB_TS_THREAD(_receiverThread);
     268             : 
     269          51 :     uint32_t instanceID = inInstanceID;
     270          51 :     if (inInstanceID == CO_INSTANCE_INVALID)
     271          15 :         instanceID = _genNextID(_instanceIDs);
     272             : 
     273          51 :     object->attach(id, instanceID);
     274             : 
     275             :     {
     276         102 :         lunchbox::ScopedFastWrite mutex(_objects);
     277          51 :         Objects& objects = _objects.data[id];
     278          51 :         LBASSERTINFO(!object->isMaster() || objects.empty(),
     279             :                      "Attaching master "
     280             :                          << *object << ", " << objects.size()
     281             :                          << " attached objects with same ID, "
     282             :                          << "first is "
     283             :                          << (objects[0]->isMaster() ? "master " : "slave ")
     284             :                          << *objects[0]);
     285          51 :         objects.push_back(object);
     286             :     }
     287             : 
     288          51 :     _localNode->flushCommands(); // redispatch pending commands
     289             : 
     290          51 :     LBLOG(LOG_OBJECTS) << "attached " << *object << " @"
     291          51 :                        << static_cast<void*>(object) << std::endl;
     292          51 : }
     293             : 
     294          17 : void ObjectStore::detach(Object* object)
     295             : {
     296          17 :     LBASSERT(object);
     297          17 :     LB_TS_NOT_THREAD(_receiverThread);
     298             : 
     299          34 :     lunchbox::Request<void> request = _localNode->registerRequest<void>();
     300          34 :     _localNode->send(CMD_NODE_DETACH_OBJECT)
     301          51 :         << object->getID() << object->getInstanceID() << request;
     302          17 : }
     303             : 
     304           0 : void ObjectStore::swap(Object* oldObject, Object* newObject)
     305             : {
     306           0 :     LBASSERT(newObject);
     307           0 :     LBASSERT(oldObject);
     308           0 :     LBASSERT(oldObject->isMaster());
     309           0 :     LB_TS_THREAD(_receiverThread);
     310             : 
     311           0 :     if (!oldObject->isAttached())
     312           0 :         return;
     313             : 
     314           0 :     LBLOG(LOG_OBJECTS) << "Swap " << lunchbox::className(oldObject)
     315           0 :                        << std::endl;
     316           0 :     const uint128_t& id = oldObject->getID();
     317             : 
     318           0 :     lunchbox::ScopedFastWrite mutex(_objects);
     319           0 :     ObjectsHash::iterator i = _objects->find(id);
     320           0 :     LBASSERT(i != _objects->end());
     321           0 :     if (i == _objects->end())
     322           0 :         return;
     323             : 
     324           0 :     Objects& objects = i->second;
     325           0 :     Objects::iterator j = find(objects.begin(), objects.end(), oldObject);
     326           0 :     LBASSERT(j != objects.end());
     327           0 :     if (j == objects.end())
     328           0 :         return;
     329             : 
     330           0 :     newObject->transfer(oldObject);
     331           0 :     *j = newObject;
     332             : }
     333             : 
     334          50 : void ObjectStore::_detach(Object* object)
     335             : {
     336             :     // check also _cmdUnmapObject when modifying!
     337          50 :     LBASSERT(object);
     338          50 :     LB_TS_THREAD(_receiverThread);
     339             : 
     340          50 :     if (!object->isAttached())
     341           0 :         return;
     342             : 
     343          50 :     const uint128_t& id = object->getID();
     344             : 
     345          50 :     LBASSERT(_objects->find(id) != _objects->end());
     346          50 :     LBLOG(LOG_OBJECTS) << "Detach " << *object << std::endl;
     347             : 
     348          50 :     Objects& objects = _objects.data[id];
     349          50 :     Objects::iterator i = find(objects.begin(), objects.end(), object);
     350          50 :     LBASSERT(i != objects.end());
     351             : 
     352             :     {
     353         100 :         lunchbox::ScopedFastWrite mutex(_objects);
     354          50 :         objects.erase(i);
     355          50 :         if (objects.empty())
     356          49 :             _objects->erase(id);
     357             :     }
     358             : 
     359          50 :     LBASSERT(object->getInstanceID() != CO_INSTANCE_INVALID);
     360          50 :     object->detach();
     361          50 :     return;
     362             : }
     363             : 
     364          36 : uint32_t ObjectStore::mapNB(Object* object, const uint128_t& id,
     365             :                             const uint128_t& version, NodePtr master)
     366             : {
     367          36 :     LB_TS_NOT_THREAD(_receiverThread);
     368          72 :     LBLOG(LOG_OBJECTS) << "Mapping " << lunchbox::className(object) << " to id "
     369         108 :                        << id << " version " << version << std::endl;
     370          36 :     LBASSERT(object);
     371          36 :     LBASSERTINFO(id.isUUID(), id);
     372             : 
     373          36 :     if (!master)
     374          34 :         master = _localNode->connectObjectMaster(id);
     375             : 
     376          36 :     if (!master || !master->isReachable())
     377             :     {
     378           0 :         LBWARN << "Mapping of object " << id << " failed, invalid master node"
     379           0 :                << std::endl;
     380           0 :         return LB_UNDEFINED_UINT32;
     381             :     }
     382             : 
     383          36 :     if (!object || !id.isUUID())
     384             :     {
     385           0 :         LBWARN << "Invalid object " << object << " or id " << id << std::endl;
     386           0 :         return LB_UNDEFINED_UINT32;
     387             :     }
     388             : 
     389          36 :     const bool isAttached = object->isAttached();
     390          36 :     const bool isMaster = object->isMaster();
     391          36 :     LBASSERTINFO(!isAttached, *object);
     392          36 :     LBASSERT(!isMaster);
     393          36 :     if (isAttached || isMaster)
     394             :     {
     395           0 :         LBWARN << "Invalid object state: attached " << isAttached << " master "
     396           0 :                << isMaster << std::endl;
     397           0 :         return LB_UNDEFINED_UINT32;
     398             :     }
     399             : 
     400          36 :     const uint32_t request = _localNode->registerRequest(object);
     401          36 :     uint128_t minCachedVersion = VERSION_HEAD;
     402          36 :     uint128_t maxCachedVersion = VERSION_NONE;
     403          36 :     uint32_t masterInstanceID = 0;
     404             :     const bool useCache =
     405          36 :         _checkInstanceCache(id, minCachedVersion, maxCachedVersion,
     406          36 :                             masterInstanceID);
     407          36 :     object->notifyAttach();
     408          72 :     master->send(CMD_NODE_MAP_OBJECT)
     409          36 :         << version << minCachedVersion << maxCachedVersion << id
     410         108 :         << object->getMaxVersions() << request << _genNextID(_instanceIDs)
     411          36 :         << masterInstanceID << useCache;
     412          36 :     return request;
     413             : }
     414             : 
     415          40 : bool ObjectStore::_checkInstanceCache(const uint128_t& id, uint128_t& from,
     416             :                                       uint128_t& to, uint32_t& instanceID)
     417             : {
     418          40 :     if (!_instanceCache)
     419           0 :         return false;
     420             : 
     421          40 :     const InstanceCache::Data& cached = (*_instanceCache)[id];
     422          40 :     if (cached == InstanceCache::Data::NONE)
     423          40 :         return false;
     424             : 
     425           0 :     const ObjectDataIStreamDeque& versions = cached.versions;
     426           0 :     LBASSERT(!cached.versions.empty());
     427           0 :     instanceID = cached.masterInstanceID;
     428           0 :     from = versions.front()->getVersion();
     429           0 :     to = versions.back()->getVersion();
     430           0 :     LBLOG(LOG_OBJECTS) << "Object " << id << " have v" << from << ".." << to
     431           0 :                        << std::endl;
     432           0 :     return true;
     433             : }
     434             : 
     435          36 : bool ObjectStore::mapSync(const uint32_t requestID)
     436             : {
     437          36 :     if (requestID == LB_UNDEFINED_UINT32)
     438           0 :         return false;
     439             : 
     440          36 :     void* data = _localNode->getRequestData(requestID);
     441          36 :     if (data == 0)
     442           0 :         return false;
     443             : 
     444          36 :     Object* object = LBSAFECAST(Object*, data);
     445          36 :     uint128_t version = VERSION_NONE;
     446          36 :     _localNode->waitRequest(requestID, version);
     447             : 
     448          36 :     const bool mapped = object->isAttached();
     449          36 :     if (mapped)
     450          36 :         object->applyMapData(version); // apply initial instance data
     451             : 
     452          36 :     object->notifyAttached();
     453          36 :     LBLOG(LOG_OBJECTS) << "Mapped " << lunchbox::className(object) << std::endl;
     454          36 :     return mapped;
     455             : }
     456             : 
     457           4 : f_bool_t ObjectStore::sync(Object* object, const uint128_t& id, NodePtr master,
     458             :                            const uint32_t instanceID)
     459             : {
     460           4 :     const uint32_t request = _startSync(object, id, master, instanceID);
     461             :     const FuturebImpl::Func& func =
     462           8 :         boost::bind(&ObjectStore::_finishSync, this, request, object);
     463           8 :     return f_bool_t(new FuturebImpl(func));
     464             : }
     465             : 
     466           4 : uint32_t ObjectStore::_startSync(Object* object, const uint128_t& id,
     467             :                                  NodePtr master, const uint32_t instanceID)
     468             : {
     469           4 :     LB_TS_NOT_THREAD(_receiverThread);
     470           8 :     LBLOG(LOG_OBJECTS) << "Syncing " << lunchbox::className(object)
     471          12 :                        << " with id " << id << std::endl;
     472           4 :     LBASSERT(object);
     473           4 :     LBASSERTINFO(id.isUUID(), id);
     474             : 
     475           4 :     if (!object || !id.isUUID())
     476             :     {
     477           0 :         LBWARN << "Invalid object " << object << " or id " << id << std::endl;
     478           0 :         return LB_UNDEFINED_UINT32;
     479             :     }
     480             : 
     481           4 :     if (!master)
     482           0 :         master = _localNode->connectObjectMaster(id);
     483             : 
     484           4 :     if (!master || !master->isReachable())
     485             :     {
     486           0 :         LBWARN << "Mapping of object " << id << " failed, invalid master node"
     487           0 :                << std::endl;
     488           0 :         return LB_UNDEFINED_UINT32;
     489             :     }
     490             : 
     491           4 :     const uint32_t request = _localNode->registerRequest(new ObjectDataIStream);
     492           4 :     uint128_t minCachedVersion = VERSION_HEAD;
     493           4 :     uint128_t maxCachedVersion = VERSION_NONE;
     494           4 :     uint32_t cacheInstanceID = 0;
     495             : 
     496           4 :     bool useCache = _checkInstanceCache(id, minCachedVersion, maxCachedVersion,
     497           4 :                                         cacheInstanceID);
     498           4 :     if (useCache)
     499             :     {
     500           0 :         switch (instanceID)
     501             :         {
     502             :         case CO_INSTANCE_ALL:
     503           0 :             break;
     504             :         default:
     505           0 :             if (instanceID == cacheInstanceID)
     506           0 :                 break;
     507             : 
     508           0 :             useCache = false;
     509           0 :             LBCHECK(_instanceCache->release(id, 1));
     510           0 :             break;
     511             :         }
     512             :     }
     513             : 
     514             :     // Use stream expected by MasterCMCommand
     515           8 :     master->send(CMD_NODE_SYNC_OBJECT)
     516           4 :         << VERSION_NEWEST << minCachedVersion << maxCachedVersion << id
     517          12 :         << uint64_t(0) /* maxVersions */ << request << instanceID
     518           4 :         << cacheInstanceID << useCache;
     519           4 :     return request;
     520             : }
     521             : 
     522           4 : bool ObjectStore::_finishSync(const uint32_t requestID, Object* object)
     523             : {
     524           4 :     if (requestID == LB_UNDEFINED_UINT32)
     525           0 :         return false;
     526             : 
     527           4 :     void* data = _localNode->getRequestData(requestID);
     528           4 :     if (data == 0)
     529           0 :         return false;
     530             : 
     531           4 :     ObjectDataIStream* is = LBSAFECAST(ObjectDataIStream*, data);
     532             : 
     533           4 :     bool ok = false;
     534           4 :     _localNode->waitRequest(requestID, ok);
     535             : 
     536           4 :     if (!ok)
     537             :     {
     538           0 :         LBWARN << "Object synchronization failed" << std::endl;
     539           0 :         delete is;
     540           0 :         return false;
     541             :     }
     542             : 
     543           4 :     is->waitReady();
     544           4 :     object->applyInstanceData(*is);
     545           4 :     LBLOG(LOG_OBJECTS) << "Synced " << lunchbox::className(object) << std::endl;
     546           4 :     delete is;
     547           4 :     return true;
     548             : }
     549             : 
     550          36 : void ObjectStore::unmap(Object* object)
     551             : {
     552          36 :     LBASSERT(object);
     553          36 :     if (!object->isAttached()) // not registered
     554          35 :         return;
     555             : 
     556          35 :     const uint128_t& id = object->getID();
     557             : 
     558          35 :     LBLOG(LOG_OBJECTS) << "Unmap " << object << std::endl;
     559             : 
     560          35 :     object->notifyDetach();
     561             : 
     562             :     // send unsubscribe to master, master will send detach command.
     563          35 :     LBASSERT(!object->isMaster());
     564          35 :     LB_TS_NOT_THREAD(_commandThread);
     565             : 
     566          35 :     const uint32_t masterInstanceID = object->getMasterInstanceID();
     567          35 :     if (masterInstanceID != CO_INSTANCE_INVALID)
     568             :     {
     569          33 :         NodePtr master = object->getMasterNode();
     570          33 :         LBASSERT(master)
     571             : 
     572          33 :         if (master && master->isReachable())
     573             :         {
     574             :             lunchbox::Request<void> request =
     575          66 :                 _localNode->registerRequest<void>();
     576          66 :             master->send(CMD_NODE_UNSUBSCRIBE_OBJECT)
     577          99 :                 << id << request << masterInstanceID << object->getInstanceID();
     578          33 :             request.wait();
     579          33 :             object->notifyDetached();
     580          33 :             return;
     581             :         }
     582           0 :         LBERROR << "Master node for object id " << id << " not connected"
     583           0 :                 << std::endl;
     584             :     }
     585             : 
     586             :     // no unsubscribe sent: Detach directly
     587           2 :     detach(object);
     588           2 :     object->setupChangeManager(Object::NONE, false, 0, CO_INSTANCE_INVALID);
     589           2 :     object->notifyDetached();
     590             : }
     591             : 
     592          15 : bool ObjectStore::register_(Object* object)
     593             : {
     594          15 :     LBASSERT(object);
     595          15 :     LBASSERT(!object->isAttached());
     596             : 
     597          15 :     const uint128_t& id = object->getID();
     598          15 :     LBASSERTINFO(id.isUUID(), id);
     599             : 
     600          15 :     object->notifyAttach();
     601          30 :     object->setupChangeManager(object->getChangeType(), true, _localNode,
     602          15 :                                CO_INSTANCE_INVALID);
     603          15 :     attach(object, id, CO_INSTANCE_INVALID);
     604             : 
     605          15 :     if (Global::getIAttribute(Global::IATTR_NODE_SEND_QUEUE_SIZE) > 0)
     606          15 :         _localNode->send(CMD_NODE_REGISTER_OBJECT) << object;
     607             : 
     608          15 :     object->notifyAttached();
     609             : 
     610          15 :     LBLOG(LOG_OBJECTS) << "Registered " << object << std::endl;
     611          15 :     return true;
     612             : }
     613             : 
     614          15 : void ObjectStore::deregister(Object* object)
     615             : {
     616          15 :     LBASSERT(object);
     617          15 :     if (!object->isAttached()) // not registered
     618           0 :         return;
     619             : 
     620          15 :     LBLOG(LOG_OBJECTS) << "Deregister " << *object << std::endl;
     621          15 :     LBASSERT(object->isMaster());
     622             : 
     623          15 :     object->notifyDetach();
     624             : 
     625          15 :     if (Global::getIAttribute(Global::IATTR_NODE_SEND_QUEUE_SIZE) > 0)
     626             :     {
     627             :         // remove from send queue
     628          30 :         lunchbox::Request<void> request = _localNode->registerRequest<void>();
     629          15 :         _localNode->send(CMD_NODE_DEREGISTER_OBJECT) << request;
     630             :     }
     631             : 
     632          15 :     const uint128_t id = object->getID();
     633          15 :     detach(object);
     634          15 :     object->setupChangeManager(Object::NONE, true, 0, CO_INSTANCE_INVALID);
     635          15 :     if (_instanceCache)
     636          15 :         _instanceCache->erase(id);
     637          15 :     object->notifyDetached();
     638             : }
     639             : 
     640       51502 : bool ObjectStore::notifyCommandThreadIdle()
     641             : {
     642       51502 :     LB_TS_THREAD(_commandThread);
     643       51502 :     if (_sendQueue.empty())
     644       51502 :         return false;
     645             : 
     646           0 :     LBASSERT(_sendOnRegister > 0);
     647           0 :     SendQueueItem& item = _sendQueue.front();
     648             : 
     649           0 :     if (item.age > _localNode->getTime64())
     650             :     {
     651           0 :         const Nodes& nodes = _localNode->getNodes(false);
     652           0 :         if (nodes.empty())
     653             :         {
     654           0 :             lunchbox::Thread::yield();
     655           0 :             return !_sendQueue.empty();
     656             :         }
     657             : 
     658           0 :         item.object->sendInstanceData(nodes);
     659             :     }
     660           0 :     _sendQueue.pop_front();
     661           0 :     return !_sendQueue.empty();
     662             : }
     663             : 
     664           7 : void ObjectStore::removeNode(NodePtr node)
     665             : {
     666          14 :     lunchbox::Request<void> request = _localNode->registerRequest<void>();
     667           7 :     _localNode->send(CMD_NODE_REMOVE_NODE) << node.get() << request;
     668           7 : }
     669             : 
     670             : //===========================================================================
     671             : // ICommand handling
     672             : //===========================================================================
     673         541 : bool ObjectStore::dispatchObjectCommand(ICommand& cmd)
     674             : {
     675         541 :     LB_TS_THREAD(_receiverThread);
     676        1082 :     ObjectICommand command(cmd);
     677         541 :     const uint128_t& id = command.getObjectID();
     678         541 :     const uint32_t instanceID = command.getInstanceID();
     679             : 
     680         541 :     ObjectsHash::const_iterator i = _objects->find(id);
     681             : 
     682         541 :     if (i == _objects->end())
     683             :         // When the instance ID is set to none, we only care about the command
     684             :         // when we have an object of the given ID (multicast)
     685           0 :         return (instanceID == CO_INSTANCE_NONE);
     686             : 
     687         541 :     const Objects& objects = i->second;
     688         541 :     LBASSERTINFO(!objects.empty(), command);
     689             : 
     690         541 :     if (instanceID <= CO_INSTANCE_MAX)
     691             :     {
     692          54 :         for (Objects::const_iterator j = objects.begin(); j != objects.end();
     693             :              ++j)
     694             :         {
     695          54 :             Object* object = *j;
     696          54 :             if (instanceID == object->getInstanceID())
     697             :             {
     698          48 :                 LBCHECK(object->dispatchCommand(command));
     699          48 :                 return true;
     700             :             }
     701             :         }
     702           0 :         LBERROR << "Can't find object instance " << instanceID << " for "
     703           0 :                 << command << std::endl;
     704           0 :         LBUNREACHABLE;
     705           0 :         return false;
     706             :     }
     707             : 
     708         493 :     Objects::const_iterator j = objects.begin();
     709         493 :     Object* object = *j;
     710         493 :     LBCHECK(object->dispatchCommand(command));
     711             : 
     712         493 :     for (++j; j != objects.end(); ++j)
     713             :     {
     714           0 :         object = *j;
     715           0 :         LBCHECK(object->dispatchCommand(command));
     716             :     }
     717         493 :     return true;
     718             : }
     719             : 
     720          34 : bool ObjectStore::_cmdFindMasterNodeID(ICommand& command)
     721             : {
     722          34 :     LB_TS_THREAD(_commandThread);
     723             : 
     724          34 :     const uint128_t& id = command.get<uint128_t>();
     725          34 :     const uint32_t requestID = command.get<uint32_t>();
     726          34 :     LBASSERT(id.isUUID());
     727             : 
     728          34 :     NodeID masterNodeID;
     729             :     {
     730          68 :         lunchbox::ScopedFastRead mutex(_objects);
     731          34 :         ObjectsHashCIter i = _objects->find(id);
     732             : 
     733          34 :         if (i != _objects->end())
     734             :         {
     735          34 :             const Objects& objects = i->second;
     736          34 :             LBASSERT(!objects.empty());
     737             : 
     738          34 :             for (ObjectsCIter j = objects.begin(); j != objects.end(); ++j)
     739             :             {
     740          34 :                 Object* object = *j;
     741          34 :                 if (object->isMaster())
     742          34 :                     masterNodeID = _localNode->getNodeID();
     743             :                 else
     744             :                 {
     745           0 :                     NodePtr master = object->getMasterNode();
     746           0 :                     if (master.isValid())
     747           0 :                         masterNodeID = master->getNodeID();
     748             :                 }
     749          34 :                 if (masterNodeID != 0)
     750          34 :                     break;
     751             :             }
     752             :         }
     753             :     }
     754             : 
     755          34 :     LBLOG(LOG_OBJECTS) << "Object " << id << " master " << masterNodeID
     756          34 :                        << " req " << requestID << std::endl;
     757          68 :     command.getNode()->send(CMD_NODE_FIND_MASTER_NODE_ID_REPLY) << masterNodeID
     758          34 :                                                                 << requestID;
     759          34 :     return true;
     760             : }
     761             : 
     762          34 : bool ObjectStore::_cmdFindMasterNodeIDReply(ICommand& command)
     763             : {
     764          34 :     const NodeID& masterNodeID = command.get<NodeID>();
     765          34 :     const uint32_t requestID = command.get<uint32_t>();
     766          34 :     _localNode->serveRequest(requestID, masterNodeID);
     767          34 :     return true;
     768             : }
     769             : 
     770          15 : bool ObjectStore::_cmdAttach(ICommand& command)
     771             : {
     772          15 :     LB_TS_THREAD(_receiverThread);
     773          15 :     LBLOG(LOG_OBJECTS) << "Cmd attach object " << command << std::endl;
     774             : 
     775          15 :     const uint128_t& objectID = command.get<uint128_t>();
     776          15 :     const uint32_t instanceID = command.get<uint32_t>();
     777          15 :     const uint32_t requestID = command.get<uint32_t>();
     778             : 
     779             :     Object* object =
     780          15 :         static_cast<Object*>(_localNode->getRequestData(requestID));
     781          15 :     _attach(object, objectID, instanceID);
     782          15 :     _localNode->serveRequest(requestID);
     783          15 :     return true;
     784             : }
     785             : 
     786          50 : bool ObjectStore::_cmdDetach(ICommand& command)
     787             : {
     788          50 :     LB_TS_THREAD(_receiverThread);
     789          50 :     LBLOG(LOG_OBJECTS) << "Cmd detach object " << command << std::endl;
     790             : 
     791          50 :     const uint128_t& objectID = command.get<uint128_t>();
     792          50 :     const uint32_t instanceID = command.get<uint32_t>();
     793          50 :     const uint32_t requestID = command.get<uint32_t>();
     794             : 
     795          50 :     ObjectsHash::const_iterator i = _objects->find(objectID);
     796          50 :     if (i != _objects->end())
     797             :     {
     798          50 :         const Objects& objects = i->second;
     799             : 
     800          51 :         for (Objects::const_iterator j = objects.begin(); j != objects.end();
     801             :              ++j)
     802             :         {
     803          51 :             Object* object = *j;
     804          51 :             if (object->getInstanceID() == instanceID)
     805             :             {
     806          50 :                 _detach(object);
     807          50 :                 break;
     808             :             }
     809             :         }
     810             :     }
     811             : 
     812          50 :     LBASSERT(requestID != LB_UNDEFINED_UINT32);
     813          50 :     _localNode->serveRequest(requestID);
     814          50 :     return true;
     815             : }
     816             : 
     817          15 : bool ObjectStore::_cmdRegister(ICommand& command)
     818             : {
     819          15 :     LB_TS_THREAD(_commandThread);
     820          15 :     if (_sendOnRegister <= 0)
     821          15 :         return true;
     822             : 
     823           0 :     LBLOG(LOG_OBJECTS) << "Cmd register object " << command << std::endl;
     824             : 
     825           0 :     Object* object = reinterpret_cast<Object*>(command.get<void*>());
     826             : 
     827             :     const int32_t age =
     828           0 :         Global::getIAttribute(Global::IATTR_NODE_SEND_QUEUE_AGE);
     829           0 :     SendQueueItem item;
     830           0 :     item.age = age ? age + _localNode->getTime64()
     831             :                    : std::numeric_limits<int64_t>::max();
     832           0 :     item.object = object;
     833           0 :     _sendQueue.push_back(item);
     834             : 
     835             :     const uint32_t size =
     836           0 :         Global::getIAttribute(Global::IATTR_NODE_SEND_QUEUE_SIZE);
     837           0 :     while (_sendQueue.size() > size)
     838           0 :         _sendQueue.pop_front();
     839             : 
     840           0 :     return true;
     841             : }
     842             : 
     843          15 : bool ObjectStore::_cmdDeregister(ICommand& command)
     844             : {
     845          15 :     LB_TS_THREAD(_commandThread);
     846          15 :     LBLOG(LOG_OBJECTS) << "Cmd deregister object " << command << std::endl;
     847             : 
     848          15 :     const uint32_t requestID = command.get<uint32_t>();
     849             : 
     850          15 :     const void* object = _localNode->getRequestData(requestID);
     851             : 
     852          15 :     for (SendQueueIter i = _sendQueue.begin(); i != _sendQueue.end(); ++i)
     853             :     {
     854           0 :         if (i->object == object)
     855             :         {
     856           0 :             _sendQueue.erase(i);
     857           0 :             break;
     858             :         }
     859             :     }
     860             : 
     861          15 :     _localNode->serveRequest(requestID);
     862          15 :     return true;
     863             : }
     864             : 
     865          36 : bool ObjectStore::_cmdMap(ICommand& cmd)
     866             : {
     867          36 :     LB_TS_THREAD(_commandThread);
     868             : 
     869          72 :     MasterCMCommand command(cmd);
     870          36 :     const uint128_t& id = command.getObjectID();
     871             : 
     872          36 :     LBLOG(LOG_OBJECTS) << "Cmd map object " << command << " id " << id << "."
     873           0 :                        << command.getInstanceID() << " req "
     874          36 :                        << command.getRequestID() << std::endl;
     875             : 
     876          72 :     ObjectCMPtr masterCM;
     877             :     {
     878          72 :         lunchbox::ScopedFastRead mutex(_objects);
     879          36 :         ObjectsHash::const_iterator i = _objects->find(id);
     880          36 :         if (i != _objects->end())
     881             :         {
     882          36 :             const Objects& objects = i->second;
     883             : 
     884          36 :             for (ObjectsCIter j = objects.begin(); j != objects.end(); ++j)
     885             :             {
     886          36 :                 Object* object = *j;
     887          36 :                 if (object->isMaster())
     888             :                 {
     889          36 :                     masterCM = object->_getChangeManager();
     890          36 :                     break;
     891             :                 }
     892             :             }
     893             :         }
     894             :     }
     895             : 
     896          36 :     if (!masterCM || !masterCM->addSlave(command))
     897             :     {
     898           0 :         LBWARN << "Can't find master object to map " << id << std::endl;
     899           0 :         NodePtr node = command.getNode();
     900           0 :         node->send(CMD_NODE_MAP_OBJECT_REPLY)
     901           0 :             << node->getNodeID() << id << command.getRequestedVersion()
     902           0 :             << command.getRequestID() << false << command.useCache() << false;
     903             :     }
     904             : 
     905          36 :     ++_counters[LocalNode::COUNTER_MAP_OBJECT_REMOTE];
     906          72 :     return true;
     907             : }
     908             : 
     909          36 : bool ObjectStore::_cmdMapSuccess(ICommand& command)
     910             : {
     911          36 :     LB_TS_THREAD(_receiverThread);
     912             : 
     913          36 :     const uint128_t& nodeID = command.get<uint128_t>();
     914          36 :     const uint128_t& objectID = command.get<uint128_t>();
     915          36 :     const uint32_t requestID = command.get<uint32_t>();
     916          36 :     const uint32_t instanceID = command.get<uint32_t>();
     917          36 :     const Object::ChangeType changeType = command.get<Object::ChangeType>();
     918          36 :     const uint32_t masterInstanceID = command.get<uint32_t>();
     919             : 
     920             :     // Map success commands are potentially multicasted (see above)
     921             :     // verify that we are the intended receiver
     922          36 :     if (nodeID != _localNode->getNodeID())
     923           0 :         return true;
     924             : 
     925          36 :     LBLOG(LOG_OBJECTS) << "Cmd map object success " << command << " id "
     926           0 :                        << objectID << "." << instanceID << " req " << requestID
     927          36 :                        << std::endl;
     928             : 
     929             :     // set up change manager and attach object to dispatch table
     930             :     Object* object =
     931          36 :         static_cast<Object*>(_localNode->getRequestData(requestID));
     932          36 :     LBASSERT(object);
     933          36 :     LBASSERT(!object->isMaster());
     934             : 
     935          36 :     object->setupChangeManager(changeType, false, _localNode, masterInstanceID);
     936          36 :     _attach(object, objectID, instanceID);
     937          36 :     return true;
     938             : }
     939             : 
     940          36 : bool ObjectStore::_cmdMapReply(ICommand& command)
     941             : {
     942          36 :     LB_TS_THREAD(_receiverThread);
     943             : 
     944             :     // Map reply commands are potentially multicasted (see above)
     945             :     // verify that we are the intended receiver
     946          36 :     if (command.get<uint128_t>() != _localNode->getNodeID())
     947           0 :         return true;
     948             : 
     949          36 :     const uint128_t& objectID = command.get<uint128_t>();
     950          36 :     const uint128_t& version = command.get<uint128_t>();
     951          36 :     const uint32_t requestID = command.get<uint32_t>();
     952          36 :     const bool result = command.get<bool>();
     953          36 :     const bool releaseCache = command.get<bool>();
     954          36 :     const bool useCache = command.get<bool>();
     955             : 
     956          36 :     LBLOG(LOG_OBJECTS) << "Cmd map object reply " << command << " id "
     957          36 :                        << objectID << " req " << requestID << std::endl;
     958             : 
     959          36 :     LBASSERT(_localNode->getRequestData(requestID));
     960             : 
     961          36 :     if (result)
     962             :     {
     963             :         Object* object =
     964          36 :             static_cast<Object*>(_localNode->getRequestData(requestID));
     965          36 :         LBASSERT(object);
     966          36 :         LBASSERT(!object->isMaster());
     967             : 
     968          36 :         object->setMasterNode(command.getNode());
     969             : 
     970          36 :         if (useCache)
     971             :         {
     972           0 :             LBASSERT(releaseCache);
     973           0 :             LBASSERT(_instanceCache);
     974             : 
     975           0 :             const uint128_t& id = objectID;
     976           0 :             const InstanceCache::Data& cached = (*_instanceCache)[id];
     977           0 :             LBASSERT(cached != InstanceCache::Data::NONE);
     978           0 :             LBASSERT(!cached.versions.empty());
     979             : 
     980           0 :             object->addInstanceDatas(cached.versions, version);
     981           0 :             LBCHECK(_instanceCache->release(id, 2));
     982             :         }
     983          36 :         else if (releaseCache)
     984             :         {
     985           0 :             LBCHECK(_instanceCache->release(objectID, 1));
     986             :         }
     987             :     }
     988             :     else
     989             :     {
     990           0 :         if (releaseCache)
     991           0 :             _instanceCache->release(objectID, 1);
     992             : 
     993           0 :         LBWARN << "Could not map object " << objectID << std::endl;
     994             :     }
     995             : 
     996          36 :     _localNode->serveRequest(requestID, version);
     997          36 :     return true;
     998             : }
     999             : 
    1000          33 : bool ObjectStore::_cmdUnsubscribe(ICommand& command)
    1001             : {
    1002          33 :     LB_TS_THREAD(_commandThread);
    1003          33 :     LBLOG(LOG_OBJECTS) << "Cmd unsubscribe object  " << command << std::endl;
    1004             : 
    1005          33 :     const uint128_t& id = command.get<uint128_t>();
    1006          33 :     const uint32_t requestID = command.get<uint32_t>();
    1007          33 :     const uint32_t masterInstanceID = command.get<uint32_t>();
    1008          33 :     const uint32_t slaveInstanceID = command.get<uint32_t>();
    1009             : 
    1010          66 :     NodePtr node = command.getNode();
    1011             : 
    1012             :     {
    1013          66 :         lunchbox::ScopedFastWrite mutex(_objects);
    1014          33 :         ObjectsHash::const_iterator i = _objects->find(id);
    1015          33 :         if (i != _objects->end())
    1016             :         {
    1017          33 :             const Objects& objects = i->second;
    1018          33 :             for (ObjectsCIter j = objects.begin(); j != objects.end(); ++j)
    1019             :             {
    1020          33 :                 Object* object = *j;
    1021          66 :                 if (object->isMaster() &&
    1022          33 :                     object->getInstanceID() == masterInstanceID)
    1023             :                 {
    1024          33 :                     object->removeSlave(node, slaveInstanceID);
    1025          33 :                     break;
    1026             :                 }
    1027             :             }
    1028             :         }
    1029             :     }
    1030             : 
    1031          33 :     node->send(CMD_NODE_DETACH_OBJECT) << id << slaveInstanceID << requestID;
    1032          66 :     return true;
    1033             : }
    1034             : 
    1035           1 : bool ObjectStore::_cmdUnmap(ICommand& command)
    1036             : {
    1037           1 :     LB_TS_THREAD(_receiverThread);
    1038           1 :     LBLOG(LOG_OBJECTS) << "Cmd unmap object " << command << std::endl;
    1039             : 
    1040           1 :     const uint128_t& objectID = command.get<uint128_t>();
    1041             : 
    1042           1 :     if (_instanceCache)
    1043           1 :         _instanceCache->erase(objectID);
    1044             : 
    1045           1 :     ObjectsHash::iterator i = _objects->find(objectID);
    1046           1 :     if (i == _objects->end()) // nothing to do
    1047           0 :         return true;
    1048             : 
    1049           2 :     const Objects objects = i->second;
    1050             :     {
    1051           2 :         lunchbox::ScopedFastWrite mutex(_objects);
    1052           1 :         _objects->erase(i);
    1053             :     }
    1054             : 
    1055           2 :     for (Objects::const_iterator j = objects.begin(); j != objects.end(); ++j)
    1056             :     {
    1057           1 :         Object* object = *j;
    1058           1 :         object->detach();
    1059             :     }
    1060             : 
    1061           1 :     return true;
    1062             : }
    1063             : 
    1064           4 : bool ObjectStore::_cmdSync(ICommand& cmd)
    1065             : {
    1066           4 :     LB_TS_THREAD(_commandThread);
    1067           8 :     MasterCMCommand command(cmd);
    1068           4 :     const uint128_t& id = command.getObjectID();
    1069           4 :     LBINFO << command.getNode() << std::endl;
    1070           4 :     LBLOG(LOG_OBJECTS) << "Cmd sync object id " << id << "."
    1071           0 :                        << command.getInstanceID() << " req "
    1072           4 :                        << command.getRequestID() << std::endl;
    1073             : 
    1074           4 :     const uint32_t cacheInstanceID = command.getMasterInstanceID();
    1075           8 :     ObjectCMPtr cm;
    1076             :     {
    1077           8 :         lunchbox::ScopedFastRead mutex(_objects);
    1078           4 :         ObjectsHash::const_iterator i = _objects->find(id);
    1079           4 :         if (i != _objects->end())
    1080             :         {
    1081           4 :             const Objects& objects = i->second;
    1082             : 
    1083           8 :             for (Object* object : objects)
    1084             :             {
    1085           4 :                 if (command.getInstanceID() == object->getInstanceID())
    1086             :                 {
    1087           0 :                     cm = object->_getChangeManager();
    1088           0 :                     LBASSERT(cm);
    1089           0 :                     break;
    1090             :                 }
    1091             : 
    1092           4 :                 if (command.getInstanceID() != CO_INSTANCE_ALL)
    1093           0 :                     continue;
    1094             : 
    1095           4 :                 cm = object->_getChangeManager();
    1096           4 :                 LBASSERT(cm);
    1097           4 :                 if (cacheInstanceID == object->getInstanceID())
    1098           0 :                     break;
    1099             :             }
    1100           4 :             if (!cm)
    1101           0 :                 LBWARN << "Can't find object to sync " << id << "."
    1102           0 :                        << command.getInstanceID() << " in " << objects.size()
    1103           0 :                        << " instances" << std::endl;
    1104             :         }
    1105           4 :         if (!cm)
    1106           0 :             LBWARN << "Can't find object to sync " << id
    1107           0 :                    << ", no object with identifier" << std::endl;
    1108             :     }
    1109           4 :     if (!cm || !cm->sendSync(command))
    1110             :     {
    1111           0 :         NodePtr node = command.getNode();
    1112           0 :         node->send(CMD_NODE_SYNC_OBJECT_REPLY)
    1113           0 :             << node->getNodeID() << id << command.getRequestID() << false
    1114           0 :             << command.useCache() << false;
    1115             :     }
    1116           8 :     return true;
    1117             : }
    1118             : 
    1119           4 : bool ObjectStore::_cmdSyncReply(ICommand& command)
    1120             : {
    1121           4 :     LB_TS_THREAD(_receiverThread);
    1122             : 
    1123             :     // Sync reply commands are potentially multicasted (see above)
    1124             :     // verify that we are the intended receiver
    1125           4 :     if (command.get<uint128_t>() != _localNode->getNodeID())
    1126           0 :         return true;
    1127             : 
    1128           4 :     const NodeID& id = command.get<NodeID>();
    1129           4 :     const uint32_t requestID = command.get<uint32_t>();
    1130           4 :     const bool result = command.get<bool>();
    1131           4 :     const bool releaseCache = command.get<bool>();
    1132           4 :     const bool useCache = command.get<bool>();
    1133           4 :     void* const data = _localNode->getRequestData(requestID);
    1134           4 :     ObjectDataIStream* const is = LBSAFECAST(ObjectDataIStream*, data);
    1135             : 
    1136           4 :     LBLOG(LOG_OBJECTS) << "Cmd sync object reply " << command << " req "
    1137           4 :                        << requestID << std::endl;
    1138           4 :     if (result)
    1139             :     {
    1140           4 :         if (useCache)
    1141             :         {
    1142           0 :             LBASSERT(releaseCache);
    1143           0 :             LBASSERT(_instanceCache);
    1144             : 
    1145           0 :             const InstanceCache::Data& cached = (*_instanceCache)[id];
    1146           0 :             LBASSERT(cached != InstanceCache::Data::NONE);
    1147           0 :             LBASSERT(!cached.versions.empty());
    1148             : 
    1149           0 :             *is = *cached.versions.back();
    1150           0 :             LBCHECK(_instanceCache->release(id, 2));
    1151             :         }
    1152           4 :         else if (releaseCache)
    1153             :         {
    1154           0 :             LBCHECK(_instanceCache->release(id, 1));
    1155             :         }
    1156             :     }
    1157             :     else
    1158             :     {
    1159           0 :         if (releaseCache)
    1160           0 :             _instanceCache->release(id, 1);
    1161             : 
    1162           0 :         LBWARN << "Could not sync object " << id << " request " << requestID
    1163           0 :                << std::endl;
    1164             :     }
    1165             : 
    1166           4 :     _localNode->serveRequest(requestID, result);
    1167           4 :     return true;
    1168             : }
    1169             : 
    1170          51 : bool ObjectStore::_cmdInstance(ICommand& inCommand)
    1171             : {
    1172          51 :     LB_TS_THREAD(_receiverThread);
    1173          51 :     LBASSERT(_localNode);
    1174             : 
    1175         102 :     ObjectDataICommand command(inCommand);
    1176          51 :     const NodeID& nodeID = command.get<NodeID>();
    1177          51 :     const uint32_t masterInstanceID = command.get<uint32_t>();
    1178          51 :     const uint32_t cmd = command.getCommand();
    1179             : 
    1180          51 :     LBLOG(LOG_OBJECTS) << "Cmd instance " << command << " master "
    1181          51 :                        << masterInstanceID << " node " << nodeID << std::endl;
    1182             : 
    1183          51 :     command.setType(COMMANDTYPE_OBJECT);
    1184          51 :     command.setCommand(CMD_OBJECT_INSTANCE);
    1185             : 
    1186          51 :     const uint128_t& version = command.getVersion();
    1187          51 :     if (_instanceCache && version.high() == 0)
    1188             :     {
    1189          51 :         const ObjectVersion rev(command.getObjectID(), version);
    1190             : #ifndef CO_AGGRESSIVE_CACHING // Issue Equalizer#82:
    1191          51 :         if (cmd != CMD_NODE_OBJECT_INSTANCE_PUSH)
    1192             : #endif
    1193          44 :             _instanceCache->add(rev, masterInstanceID, command, 0);
    1194             :     }
    1195             : 
    1196          51 :     switch (cmd)
    1197             :     {
    1198             :     case CMD_NODE_OBJECT_INSTANCE:
    1199           0 :         LBASSERT(nodeID == 0);
    1200           0 :         LBASSERT(command.getInstanceID() == CO_INSTANCE_NONE);
    1201           0 :         return true;
    1202             : 
    1203             :     case CMD_NODE_OBJECT_INSTANCE_MAP:
    1204          32 :         if (nodeID != _localNode->getNodeID()) // not for me
    1205           0 :             return true;
    1206             : 
    1207          32 :         LBASSERT(command.getInstanceID() <= CO_INSTANCE_MAX);
    1208          32 :         return dispatchObjectCommand(command);
    1209             : 
    1210             :     case CMD_NODE_OBJECT_INSTANCE_COMMIT:
    1211           4 :         LBASSERT(nodeID == 0);
    1212           4 :         LBASSERT(command.getInstanceID() == CO_INSTANCE_NONE);
    1213           4 :         return dispatchObjectCommand(command);
    1214             : 
    1215             :     case CMD_NODE_OBJECT_INSTANCE_PUSH:
    1216           7 :         LBASSERT(nodeID == 0);
    1217           7 :         LBASSERT(command.getInstanceID() == CO_INSTANCE_NONE);
    1218           7 :         _pushData.addDataCommand(command.getObjectID(), command);
    1219           7 :         return true;
    1220             : 
    1221             :     case CMD_NODE_OBJECT_INSTANCE_SYNC:
    1222             :     {
    1223           8 :         if (nodeID != _localNode->getNodeID()) // not for me
    1224           0 :             return true;
    1225             : 
    1226           8 :         void* data = _localNode->getRequestData(command.getInstanceID());
    1227           8 :         LBASSERT(command.getInstanceID() != CO_INSTANCE_NONE);
    1228           8 :         LBASSERTINFO(data, this);
    1229             : 
    1230           8 :         ObjectDataIStream* is = LBSAFECAST(ObjectDataIStream*, data);
    1231           8 :         is->addDataCommand(command);
    1232           8 :         return true;
    1233             :     }
    1234             : 
    1235             :     default:
    1236           0 :         LBUNREACHABLE;
    1237           0 :         return false;
    1238             :     }
    1239             : }
    1240             : 
    1241           0 : bool ObjectStore::_cmdDisableSendOnRegister(ICommand& command)
    1242             : {
    1243           0 :     LB_TS_THREAD(_commandThread);
    1244           0 :     LBASSERTINFO(_sendOnRegister > 0, _sendOnRegister);
    1245             : 
    1246           0 :     if (--_sendOnRegister == 0)
    1247             :     {
    1248           0 :         _sendQueue.clear();
    1249             : 
    1250           0 :         const Nodes& nodes = _localNode->getNodes(false);
    1251           0 :         for (NodePtr node : nodes)
    1252             :         {
    1253           0 :             ConnectionPtr multicast = node->getConnection(true);
    1254           0 :             ConnectionPtr connection = node->getConnection(false);
    1255           0 :             if (multicast)
    1256           0 :                 multicast->finish();
    1257           0 :             if (connection && connection != multicast)
    1258           0 :                 connection->finish();
    1259             :         }
    1260             :     }
    1261             : 
    1262           0 :     const uint32_t requestID = command.get<uint32_t>();
    1263           0 :     _localNode->serveRequest(requestID);
    1264           0 :     return true;
    1265             : }
    1266             : 
    1267          40 : bool ObjectStore::_cmdRemoveNode(ICommand& command)
    1268             : {
    1269          40 :     LB_TS_THREAD(_commandThread);
    1270          40 :     LBLOG(LOG_OBJECTS) << "Cmd object  " << command << std::endl;
    1271             : 
    1272          40 :     Node* node = command.get<Node*>();
    1273          40 :     const uint32_t requestID = command.get<uint32_t>();
    1274             : 
    1275          80 :     lunchbox::ScopedFastWrite mutex(_objects);
    1276          65 :     for (ObjectsHashCIter i = _objects->begin(); i != _objects->end(); ++i)
    1277             :     {
    1278          25 :         const Objects& objects = i->second;
    1279          50 :         for (ObjectsCIter j = objects.begin(); j != objects.end(); ++j)
    1280          25 :             (*j)->removeSlaves(node);
    1281             :     }
    1282             : 
    1283          40 :     if (requestID != LB_UNDEFINED_UINT32)
    1284           7 :         _localNode->serveRequest(requestID);
    1285             :     else
    1286          33 :         node->unref(); // node was ref'd before LocalNode::_handleDisconnect()
    1287             : 
    1288          80 :     return true;
    1289             : }
    1290             : 
    1291           5 : bool ObjectStore::_cmdPush(ICommand& command)
    1292             : {
    1293           5 :     LB_TS_THREAD(_commandThread);
    1294             : 
    1295           5 :     const uint128_t& objectID = command.get<uint128_t>();
    1296           5 :     const uint128_t& groupID = command.get<uint128_t>();
    1297           5 :     const uint128_t& typeID = command.get<uint128_t>();
    1298             : 
    1299           5 :     ObjectDataIStream* is = _pushData.pull(objectID);
    1300             : 
    1301           5 :     _localNode->objectPush(groupID, typeID, objectID, *is);
    1302           5 :     _pushData.recycle(is);
    1303           5 :     return true;
    1304             : }
    1305             : 
    1306           0 : std::ostream& operator<<(std::ostream& os, ObjectStore* objectStore)
    1307             : {
    1308           0 :     if (!objectStore)
    1309             :     {
    1310           0 :         os << "NULL objectStore";
    1311           0 :         return os;
    1312             :     }
    1313             : 
    1314           0 :     os << "objectStore (" << (void*)objectStore << ")";
    1315             : 
    1316           0 :     return os;
    1317             : }
    1318          63 : }

Generated by: LCOV version 1.11