LCOV - code coverage report
Current view: top level - co - fullMasterCM.cpp (source / functions) Hit Total Coverage
Test: Collage Lines: 121 177 68.4 %
Date: 2018-01-09 16:37:03 Functions: 17 19 89.5 %

          Line data    Source code
       1             : 
       2             : /* Copyright (c) 2007-2016, Stefan Eilemann <eile@equalizergraphics.com>
       3             :  *                          Daniel Nachbaur <danielnachbaur@gmail.com>
       4             :  *
       5             :  * This file is part of Collage <https://github.com/Eyescale/Collage>
       6             :  *
       7             :  * This library is free software; you can redistribute it and/or modify it under
       8             :  * the terms of the GNU Lesser General Public License version 2.1 as published
       9             :  * by the Free Software Foundation.
      10             :  *
      11             :  * This library is distributed in the hope that it will be useful, but WITHOUT
      12             :  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
      13             :  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
      14             :  * details.
      15             :  *
      16             :  * You should have received a copy of the GNU Lesser General Public License
      17             :  * along with this library; if not, write to the Free Software Foundation, Inc.,
      18             :  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
      19             :  */
      20             : 
      21             : #include "fullMasterCM.h"
      22             : 
      23             : #include "log.h"
      24             : #include "node.h"
      25             : #include "nodeCommand.h"
      26             : #include "oCommand.h"
      27             : #include "object.h"
      28             : #include "objectDataIStream.h"
      29             : 
      30             : //#define CO_INSTRUMENT
      31             : 
      32             : namespace co
      33             : {
      34             : namespace
      35             : {
      36             : #ifdef CO_INSTRUMENT
      37             : lunchbox::a_int32_t _bytesBuffered;
      38             : #endif
      39             : }
      40             : 
      41          11 : FullMasterCM::FullMasterCM(Object* object)
      42             :     : VersionedMasterCM(object)
      43             :     , _commitCount(0)
      44          11 :     , _nVersions(0)
      45             : {
      46          11 : }
      47             : 
      48          26 : FullMasterCM::~FullMasterCM()
      49             : {
      50          72 :     for (InstanceDataDeque::const_iterator i = _instanceDatas.begin();
      51          48 :          i != _instanceDatas.end(); ++i)
      52             :     {
      53          13 :         delete *i;
      54             :     }
      55          11 :     _instanceDatas.clear();
      56             : 
      57          33 :     for (InstanceDatas::const_iterator i = _instanceDataCache.begin();
      58          22 :          i != _instanceDataCache.end(); ++i)
      59             :     {
      60           0 :         delete *i;
      61             :     }
      62          11 :     _instanceDataCache.clear();
      63          15 : }
      64             : 
      65           0 : void FullMasterCM::sendInstanceData(const Nodes& nodes)
      66             : {
      67           0 :     LB_TS_THREAD(_cmdThread);
      68           0 :     Mutex mutex(_slaves);
      69           0 :     if (!_slaves->empty())
      70           0 :         return;
      71             : 
      72           0 :     InstanceData* data = _instanceDatas.back();
      73           0 :     data->os.sendInstanceData(nodes);
      74             : }
      75             : 
      76          11 : void FullMasterCM::init()
      77             : {
      78          11 :     LBASSERT(_commitCount == 0);
      79          11 :     VersionedMasterCM::init();
      80             : 
      81          11 :     InstanceData* data = _newInstanceData();
      82             : 
      83          11 :     data->os.enableCommit(VERSION_FIRST, *_slaves);
      84          11 :     _object->getInstanceData(data->os);
      85          11 :     data->os.disable();
      86             : 
      87          11 :     _instanceDatas.push_back(data);
      88          11 :     ++_version;
      89          11 :     ++_commitCount;
      90          11 : }
      91             : 
      92           1 : void FullMasterCM::setAutoObsolete(const uint32_t count)
      93             : {
      94           2 :     Mutex mutex(_slaves);
      95           1 :     _nVersions = count;
      96           1 :     _obsolete();
      97           1 : }
      98             : 
      99         110 : void FullMasterCM::_updateCommitCount(const uint32_t incarnation)
     100             : {
     101         110 :     LBASSERT(!_instanceDatas.empty());
     102         110 :     if (incarnation == CO_COMMIT_NEXT)
     103             :     {
     104         110 :         ++_commitCount;
     105         110 :         return;
     106             :     }
     107             : 
     108           0 :     if (incarnation >= _commitCount)
     109             :     {
     110           0 :         _commitCount = incarnation;
     111           0 :         return;
     112             :     }
     113             : 
     114           0 :     LBASSERTINFO(incarnation >= _commitCount,
     115             :                  "Detected decreasing commit incarnation counter");
     116           0 :     _commitCount = incarnation;
     117             : 
     118             :     // obsolete 'future' old packages
     119           0 :     while (_instanceDatas.size() > 1)
     120             :     {
     121           0 :         InstanceData* data = _instanceDatas.back();
     122           0 :         if (data->commitCount <= _commitCount)
     123           0 :             break;
     124             : 
     125             : #ifdef CO_INSTRUMENT
     126             :         _bytesBuffered -= data->os.getSaveBuffer().getSize();
     127             :         LBINFO << _bytesBuffered << " bytes used" << std::endl;
     128             : #endif
     129           0 :         _releaseInstanceData(data);
     130           0 :         _instanceDatas.pop_back();
     131             :     }
     132             : 
     133           0 :     InstanceData* data = _instanceDatas.back();
     134           0 :     if (data->commitCount > _commitCount)
     135             :     {
     136             :         // tweak commitCount of minimum retained version for correct obsoletion
     137           0 :         data->commitCount = 0;
     138           0 :         _version = data->os.getVersion();
     139             :     }
     140             : }
     141             : 
     142         111 : void FullMasterCM::_obsolete()
     143             : {
     144         111 :     LBASSERT(!_instanceDatas.empty());
     145         327 :     while (_instanceDatas.size() > 1 && _commitCount > _nVersions)
     146             :     {
     147         207 :         InstanceData* data = _instanceDatas.front();
     148         207 :         if (data->commitCount >= (_commitCount - _nVersions))
     149          99 :             break;
     150             : 
     151             : #ifdef CO_INSTRUMENT
     152             :         _bytesBuffered -= data->os.getSaveBuffer().getSize();
     153             :         LBINFO << _bytesBuffered << " bytes used" << std::endl;
     154             : #endif
     155             : #if 0
     156             :         LBINFO
     157             :             << "Remove v" << data->os.getVersion() << " c" << data->commitCount
     158             :             << "@" << _commitCount << "/" << _nVersions << " from "
     159             :             << lunchbox::className( _object ) << " " << ObjectVersion( _object )
     160             :             << std::endl;
     161             : #endif
     162         108 :         _releaseInstanceData(data);
     163         108 :         _instanceDatas.pop_front();
     164             :     }
     165         111 :     _checkConsistency();
     166         111 : }
     167             : 
     168          30 : bool FullMasterCM::_initSlave(const MasterCMCommand& command,
     169             :                               const uint128_t& /*replyVersion*/,
     170             :                               bool replyUseCache)
     171             : {
     172          30 :     _checkConsistency();
     173             : 
     174          30 :     const uint128_t& version = command.getRequestedVersion();
     175          30 :     const uint128_t oldest = _instanceDatas.front()->os.getVersion();
     176             :     uint128_t start =
     177          30 :         (version == VERSION_OLDEST || version < oldest) ? oldest : version;
     178          30 :     uint128_t end = _version;
     179             : 
     180             : #ifndef NDEBUG
     181          30 :     if (version != VERSION_OLDEST && version < start)
     182           0 :         LBINFO << "Mapping version " << start << " instead of requested "
     183           0 :                << version << " for " << lunchbox::className(_object) << " "
     184           0 :                << ObjectVersion(_object->getID(), _version) << " of "
     185           0 :                << _instanceDatas.size() << "/" << _nVersions << std::endl;
     186             : #endif
     187             : 
     188          30 :     const uint128_t& minCachedVersion = command.getMinCachedVersion();
     189          30 :     const uint128_t& maxCachedVersion = command.getMaxCachedVersion();
     190          30 :     const uint128_t replyVersion = start;
     191          30 :     if (replyUseCache)
     192             :     {
     193           0 :         if (minCachedVersion <= start && maxCachedVersion >= start)
     194             :         {
     195             : #ifdef CO_INSTRUMENT_MULTICAST
     196             :             _hit += maxCachedVersion + 1 - start;
     197             : #endif
     198           0 :             start = maxCachedVersion + 1;
     199             :         }
     200           0 :         else if (maxCachedVersion == end)
     201             :         {
     202           0 :             end = LB_MAX(start, minCachedVersion - 1);
     203             : #ifdef CO_INSTRUMENT_MULTICAST
     204             :             _hit += _version - end;
     205             : #endif
     206             :         }
     207             :         // TODO else cached block in the middle, send head and tail elements
     208             :     }
     209             : 
     210             : #if 0
     211             :     LBLOG( LOG_OBJECTS )
     212             :         << *_object << ", instantiate on " << node->getNodeID() << " with v"
     213             :         << ((requested == VERSION_OLDEST) ? oldest : requested) << " ("
     214             :         << requested << ") sending " << start << ".." << end << " have "
     215             :         << _version - _nVersions << ".." << _version << " "
     216             :         << _instanceDatas.size() << std::endl;
     217             : #endif
     218          30 :     LBASSERT(start >= oldest);
     219             : 
     220          30 :     bool dataSent = false;
     221             : 
     222             :     // send all instance datas from start..end
     223          30 :     InstanceDataDeque::iterator i = _instanceDatas.begin();
     224          30 :     while (i != _instanceDatas.end() && (*i)->os.getVersion() < start)
     225           0 :         ++i;
     226             : 
     227          90 :     for (; i != _instanceDatas.end() && (*i)->os.getVersion() <= end; ++i)
     228             :     {
     229          30 :         if (!dataSent)
     230             :         {
     231          30 :             _sendMapSuccess(command, true);
     232          30 :             dataSent = true;
     233             :         }
     234             : 
     235          30 :         InstanceData* data = *i;
     236          30 :         LBASSERT(data);
     237          30 :         data->os.sendMapData(command.getRemoteNode(), command.getInstanceID());
     238             : 
     239             : #ifdef CO_INSTRUMENT_MULTICAST
     240             :         ++_miss;
     241             : #endif
     242             :     }
     243             : 
     244          30 :     if (!dataSent)
     245             :     {
     246           0 :         _sendMapSuccess(command, false);
     247           0 :         _sendMapReply(command, replyVersion, true, replyUseCache, false);
     248             :     }
     249             :     else
     250          30 :         _sendMapReply(command, replyVersion, true, replyUseCache, true);
     251             : 
     252             : #ifdef CO_INSTRUMENT_MULTICAST
     253             :     if (_miss % 100 == 0)
     254             :         LBINFO << "Cached " << _hit << "/" << _hit + _miss
     255             :                << " instance data transmissions" << std::endl;
     256             : #endif
     257          30 :     return true;
     258             : }
     259             : 
     260         141 : void FullMasterCM::_checkConsistency() const
     261             : {
     262             : #ifndef NDEBUG
     263         141 :     LBASSERT(!_instanceDatas.empty());
     264         141 :     LBASSERT(_object->isAttached());
     265             : 
     266         141 :     if (_version == VERSION_NONE)
     267           0 :         return;
     268             : 
     269         141 :     uint128_t version = _version;
     270        1440 :     for (InstanceDataDeque::const_reverse_iterator i = _instanceDatas.rbegin();
     271         960 :          i != _instanceDatas.rend(); ++i)
     272             :     {
     273         339 :         const InstanceData* data = *i;
     274         339 :         LBASSERT(data->os.getVersion() != VERSION_NONE);
     275         339 :         LBASSERTINFO(data->os.getVersion() == version,
     276             :                      data->os.getVersion() << " != " << version);
     277         339 :         if (data != _instanceDatas.front())
     278             :         {
     279         198 :             LBASSERTINFO(data->commitCount + _nVersions >= _commitCount,
     280             :                          data->commitCount << ", " << _commitCount << " ["
     281             :                                            << _nVersions << "]");
     282             :         }
     283         339 :         --version;
     284             :     }
     285             : #endif
     286             : }
     287             : 
     288             : //---------------------------------------------------------------------------
     289             : // cache handling
     290             : //---------------------------------------------------------------------------
     291         121 : FullMasterCM::InstanceData* FullMasterCM::_newInstanceData()
     292             : {
     293             :     InstanceData* instanceData;
     294             : 
     295         121 :     if (_instanceDataCache.empty())
     296         121 :         instanceData = new InstanceData(this);
     297             :     else
     298             :     {
     299           0 :         instanceData = _instanceDataCache.back();
     300           0 :         _instanceDataCache.pop_back();
     301             :     }
     302             : 
     303         121 :     instanceData->commitCount = _commitCount;
     304         121 :     instanceData->os.reset();
     305         121 :     instanceData->os.enableSave();
     306         121 :     return instanceData;
     307             : }
     308             : 
     309         110 : void FullMasterCM::_addInstanceData(InstanceData* data)
     310             : {
     311         110 :     LBASSERT(data->os.getVersion() != VERSION_NONE);
     312         110 :     LBASSERT(data->os.getVersion() != VERSION_INVALID);
     313             : 
     314         110 :     _instanceDatas.push_back(data);
     315             : #ifdef CO_INSTRUMENT
     316             :     _bytesBuffered += data->os.getSaveBuffer().getSize();
     317             :     LBINFO << _bytesBuffered << " bytes used" << std::endl;
     318             : #endif
     319         110 : }
     320             : 
     321         108 : void FullMasterCM::_releaseInstanceData(InstanceData* data)
     322             : {
     323             : #ifdef CO_AGGRESSIVE_CACHING
     324             :     _instanceDataCache.push_back(data);
     325             : #else
     326         108 :     delete data;
     327             : #endif
     328         108 : }
     329             : 
     330         110 : uint128_t FullMasterCM::commit(const uint32_t incarnation)
     331             : {
     332         110 :     LBASSERT(_version != VERSION_NONE);
     333             : 
     334         110 :     if (!_object->isDirty())
     335             :     {
     336           0 :         Mutex mutex(_slaves);
     337           0 :         _updateCommitCount(incarnation);
     338           0 :         _obsolete();
     339           0 :         return _version;
     340             :     }
     341             : 
     342         110 :     _maxVersion.waitGE(_version.low() + 1);
     343         220 :     Mutex mutex(_slaves);
     344             : #if 0
     345             :     LBLOG( LOG_OBJECTS ) << "commit v" << _version << " " << command
     346             :                          << std::endl;
     347             : #endif
     348         110 :     _updateCommitCount(incarnation);
     349         110 :     _commit();
     350         110 :     _obsolete();
     351         110 :     return _version;
     352             : }
     353             : 
     354           6 : void FullMasterCM::_commit()
     355             : {
     356           6 :     InstanceData* instanceData = _newInstanceData();
     357           6 :     instanceData->os.enableCommit(_version + 1, *_slaves);
     358           6 :     _object->getInstanceData(instanceData->os);
     359           6 :     instanceData->os.disable();
     360             : 
     361           6 :     if (instanceData->os.hasSentData())
     362             :     {
     363           6 :         ++_version;
     364           6 :         LBASSERT(_version != VERSION_NONE);
     365             : #if 0
     366             :         LBINFO << "Committed v" << _version << "@" << _commitCount << ", id "
     367             :                << _object->getID() << std::endl;
     368             : #endif
     369           6 :         _addInstanceData(instanceData);
     370             :     }
     371             :     else
     372           0 :         _instanceDataCache.push_back(instanceData);
     373           6 : }
     374             : 
     375           3 : void FullMasterCM::push(const uint128_t& groupID, const uint128_t& typeID,
     376             :                         const Nodes& nodes)
     377             : {
     378           6 :     Mutex mutex(_slaves);
     379           3 :     InstanceData* instanceData = _instanceDatas.back();
     380           3 :     instanceData->os.push(nodes, _object->getID(), groupID, typeID);
     381           3 : }
     382             : 
     383           0 : bool FullMasterCM::sendSync(const MasterCMCommand& command)
     384             : {
     385             :     // const uint128_t& version = command.getRequestedVersion();
     386           0 :     const uint128_t& maxCachedVersion = command.getMaxCachedVersion();
     387             :     const bool useCache =
     388           0 :         command.useCache() &&
     389           0 :         command.getMasterInstanceID() == _object->getInstanceID() &&
     390           0 :         maxCachedVersion == _version;
     391             : 
     392           0 :     if (!useCache)
     393             :     {
     394           0 :         Mutex mutex(_slaves);
     395           0 :         InstanceData* instanceData = _instanceDatas.back();
     396           0 :         instanceData->os.sync(command);
     397             :     }
     398             : 
     399           0 :     NodePtr node = command.getNode();
     400           0 :     node->send(CMD_NODE_SYNC_OBJECT_REPLY, useCache /*preferMulticast*/)
     401           0 :         << node->getNodeID() << command.getObjectID() << command.getRequestID()
     402           0 :         << true << command.useCache() << useCache;
     403           0 :     return true;
     404             : }
     405          63 : }

Generated by: LCOV version 1.11