LCOV - code coverage report
Current view: top level - co - fullMasterCM.cpp (source / functions) Hit Total Coverage
Test: Collage Lines: 123 179 68.7 %
Date: 2016-12-14 01:26:48 Functions: 17 19 89.5 %

          Line data    Source code
       1             : 
       2             : /* Copyright (c) 2007-2016, Stefan Eilemann <eile@equalizergraphics.com>
       3             :  *                          Daniel Nachbaur <danielnachbaur@gmail.com>
       4             :  *
       5             :  * This file is part of Collage <https://github.com/Eyescale/Collage>
       6             :  *
       7             :  * This library is free software; you can redistribute it and/or modify it under
       8             :  * the terms of the GNU Lesser General Public License version 2.1 as published
       9             :  * by the Free Software Foundation.
      10             :  *
      11             :  * This library is distributed in the hope that it will be useful, but WITHOUT
      12             :  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
      13             :  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
      14             :  * details.
      15             :  *
      16             :  * You should have received a copy of the GNU Lesser General Public License
      17             :  * along with this library; if not, write to the Free Software Foundation, Inc.,
      18             :  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
      19             :  */
      20             : 
      21             : #include "fullMasterCM.h"
      22             : 
      23             : #include "log.h"
      24             : #include "node.h"
      25             : #include "nodeCommand.h"
      26             : #include "oCommand.h"
      27             : #include "object.h"
      28             : #include "objectDataIStream.h"
      29             : 
      30             : //#define CO_INSTRUMENT
      31             : 
      32             : namespace co
      33             : {
      #ifdef CO_INSTRUMENT
      namespace
      {
      // Instrumentation-only counter, compiled in with CO_INSTRUMENT: the size of
      // an instance data's save buffer is added when the data is stored and
      // subtracted when it is released; the total is logged after each change.
      lunchbox::a_int32_t _bytesBuffered;
      }
      #endif
      40             : 
      41          11 : FullMasterCM::FullMasterCM( Object* object )
      42             :         : VersionedMasterCM( object )
      43             :         , _commitCount( 0 )
      44          11 :         , _nVersions( 0 )
      45          11 : {}
      46             : 
      47          26 : FullMasterCM::~FullMasterCM()
      48             : {
      49          72 :     for( InstanceDataDeque::const_iterator i = _instanceDatas.begin();
      50          48 :          i != _instanceDatas.end(); ++i )
      51             :     {
      52          13 :         delete *i;
      53             :     }
      54          11 :     _instanceDatas.clear();
      55             : 
      56          33 :     for( InstanceDatas::const_iterator i = _instanceDataCache.begin();
      57          22 :          i != _instanceDataCache.end(); ++i )
      58             :     {
      59           0 :         delete *i;
      60             :     }
      61          11 :     _instanceDataCache.clear();
      62          15 : }
      63             : 
      64           0 : void FullMasterCM::sendInstanceData( const Nodes& nodes )
      65             : {
      66           0 :     LB_TS_THREAD( _cmdThread );
      67           0 :     Mutex mutex( _slaves );
      68           0 :     if( !_slaves->empty( ))
      69           0 :         return;
      70             : 
      71           0 :     InstanceData* data = _instanceDatas.back();
      72           0 :     data->os.sendInstanceData( nodes );
      73             : }
      74             : 
      75          11 : void FullMasterCM::init()
      76             : {
      77          11 :     LBASSERT( _commitCount == 0 );
      78          11 :     VersionedMasterCM::init();
      79             : 
      80          11 :     InstanceData* data = _newInstanceData();
      81             : 
      82          11 :     data->os.enableCommit( VERSION_FIRST, *_slaves );
      83          11 :     _object->getInstanceData( data->os );
      84          11 :     data->os.disable();
      85             : 
      86          11 :     _instanceDatas.push_back( data );
      87          11 :     ++_version;
      88          11 :     ++_commitCount;
      89          11 : }
      90             : 
      91           1 : void FullMasterCM::setAutoObsolete( const uint32_t count )
      92             : {
      93           2 :     Mutex mutex( _slaves );
      94           1 :     _nVersions = count;
      95           1 :     _obsolete();
      96           1 : }
      97             : 
      98         110 : void FullMasterCM::_updateCommitCount( const uint32_t incarnation )
      99             : {
     100         110 :     LBASSERT( !_instanceDatas.empty( ));
     101         110 :     if( incarnation == CO_COMMIT_NEXT )
     102             :     {
     103         110 :         ++_commitCount;
     104         110 :         return;
     105             :     }
     106             : 
     107           0 :     if( incarnation >= _commitCount )
     108             :     {
     109           0 :         _commitCount = incarnation;
     110           0 :         return;
     111             :     }
     112             : 
     113           0 :     LBASSERTINFO( incarnation >= _commitCount,
     114             :           "Detected decreasing commit incarnation counter" );
     115           0 :     _commitCount = incarnation;
     116             : 
     117             :     // obsolete 'future' old packages
     118           0 :     while( _instanceDatas.size() > 1 )
     119             :     {
     120           0 :         InstanceData* data = _instanceDatas.back();
     121           0 :         if( data->commitCount <= _commitCount )
     122           0 :             break;
     123             : 
     124             : #ifdef CO_INSTRUMENT
     125             :         _bytesBuffered -= data->os.getSaveBuffer().getSize();
     126             :         LBINFO << _bytesBuffered << " bytes used" << std::endl;
     127             : #endif
     128           0 :         _releaseInstanceData( data );
     129           0 :         _instanceDatas.pop_back();
     130             :     }
     131             : 
     132           0 :     InstanceData* data = _instanceDatas.back();
     133           0 :     if( data->commitCount > _commitCount )
     134             :     {
     135             :         // tweak commitCount of minimum retained version for correct obsoletion
     136           0 :         data->commitCount = 0;
     137           0 :         _version = data->os.getVersion();
     138             :     }
     139             : }
     140             : 
     141         111 : void FullMasterCM::_obsolete()
     142             : {
     143         111 :     LBASSERT( !_instanceDatas.empty( ));
     144         327 :     while( _instanceDatas.size() > 1 && _commitCount > _nVersions )
     145             :     {
     146         207 :         InstanceData* data = _instanceDatas.front();
     147         207 :         if( data->commitCount >= (_commitCount - _nVersions))
     148          99 :             break;
     149             : 
     150             : #ifdef CO_INSTRUMENT
     151             :         _bytesBuffered -= data->os.getSaveBuffer().getSize();
     152             :         LBINFO << _bytesBuffered << " bytes used" << std::endl;
     153             : #endif
     154             : #if 0
     155             :         LBINFO
     156             :             << "Remove v" << data->os.getVersion() << " c" << data->commitCount
     157             :             << "@" << _commitCount << "/" << _nVersions << " from "
     158             :             << lunchbox::className( _object ) << " " << ObjectVersion( _object )
     159             :             << std::endl;
     160             : #endif
     161         108 :         _releaseInstanceData( data );
     162         108 :         _instanceDatas.pop_front();
     163             :     }
     164         111 :     _checkConsistency();
     165         111 : }
     166             : 
     167          30 : bool FullMasterCM::_initSlave( const MasterCMCommand& command,
     168             :                                const uint128_t& /*replyVersion*/,
     169             :                                bool replyUseCache )
     170             : {
     171          30 :     _checkConsistency();
     172             : 
     173          30 :     const uint128_t& version = command.getRequestedVersion();
     174          30 :     const uint128_t oldest = _instanceDatas.front()->os.getVersion();
     175          30 :     uint128_t start = (version == VERSION_OLDEST || version < oldest ) ?
     176          30 :                           oldest : version;
     177          30 :     uint128_t end = _version;
     178             : 
     179             : #ifndef NDEBUG
     180          30 :     if( version != VERSION_OLDEST && version < start )
     181           0 :         LBINFO << "Mapping version " << start << " instead of requested "
     182           0 :                << version << " for " << lunchbox::className( _object )
     183           0 :                << " " << ObjectVersion( _object->getID(), _version ) << " of "
     184           0 :                << _instanceDatas.size() << "/" << _nVersions << std::endl;
     185             : #endif
     186             : 
     187          30 :     const uint128_t& minCachedVersion = command.getMinCachedVersion();
     188          30 :     const uint128_t& maxCachedVersion = command.getMaxCachedVersion();
     189          30 :     const uint128_t replyVersion = start;
     190          30 :     if( replyUseCache )
     191             :     {
     192           0 :         if( minCachedVersion <= start && maxCachedVersion >= start )
     193             :         {
     194             : #ifdef CO_INSTRUMENT_MULTICAST
     195             :             _hit += maxCachedVersion + 1 - start;
     196             : #endif
     197           0 :             start = maxCachedVersion + 1;
     198             :         }
     199           0 :         else if( maxCachedVersion == end )
     200             :         {
     201           0 :             end = LB_MAX( start, minCachedVersion - 1 );
     202             : #ifdef CO_INSTRUMENT_MULTICAST
     203             :             _hit += _version - end;
     204             : #endif
     205             :         }
     206             :         // TODO else cached block in the middle, send head and tail elements
     207             :     }
     208             : 
     209             : #if 0
     210             :     LBLOG( LOG_OBJECTS )
     211             :         << *_object << ", instantiate on " << node->getNodeID() << " with v"
     212             :         << ((requested == VERSION_OLDEST) ? oldest : requested) << " ("
     213             :         << requested << ") sending " << start << ".." << end << " have "
     214             :         << _version - _nVersions << ".." << _version << " "
     215             :         << _instanceDatas.size() << std::endl;
     216             : #endif
     217          30 :     LBASSERT( start >= oldest );
     218             : 
     219          30 :     bool dataSent = false;
     220             : 
     221             :     // send all instance datas from start..end
     222          30 :     InstanceDataDeque::iterator i = _instanceDatas.begin();
     223          30 :     while( i != _instanceDatas.end() && (*i)->os.getVersion() < start )
     224           0 :         ++i;
     225             : 
     226          90 :     for( ; i != _instanceDatas.end() && (*i)->os.getVersion() <= end; ++i )
     227             :     {
     228          30 :         if( !dataSent )
     229             :         {
     230          30 :             _sendMapSuccess( command, true );
     231          30 :             dataSent = true;
     232             :         }
     233             : 
     234          30 :         InstanceData* data = *i;
     235          30 :         LBASSERT( data );
     236          60 :         data->os.sendMapData( command.getRemoteNode(),
     237          30 :                               command.getInstanceID( ));
     238             : 
     239             : #ifdef CO_INSTRUMENT_MULTICAST
     240             :         ++_miss;
     241             : #endif
     242             :     }
     243             : 
     244          30 :     if( !dataSent )
     245             :     {
     246           0 :         _sendMapSuccess( command, false );
     247           0 :         _sendMapReply( command, replyVersion, true, replyUseCache, false );
     248             :     }
     249             :     else
     250          30 :         _sendMapReply( command, replyVersion, true, replyUseCache, true );
     251             : 
     252             : #ifdef CO_INSTRUMENT_MULTICAST
     253             :     if( _miss % 100 == 0 )
     254             :         LBINFO << "Cached " << _hit << "/" << _hit + _miss
     255             :                << " instance data transmissions" << std::endl;
     256             : #endif
     257          30 :     return true;
     258             : }
     259             : 
     260         141 : void FullMasterCM::_checkConsistency() const
     261             : {
     262             : #ifndef NDEBUG
     263         141 :     LBASSERT( !_instanceDatas.empty( ));
     264         141 :     LBASSERT( _object->isAttached() );
     265             : 
     266         141 :     if( _version == VERSION_NONE )
     267           0 :         return;
     268             : 
     269         141 :     uint128_t version = _version;
     270        1440 :     for( InstanceDataDeque::const_reverse_iterator i = _instanceDatas.rbegin();
     271         960 :          i != _instanceDatas.rend(); ++i )
     272             :     {
     273         339 :         const InstanceData* data = *i;
     274         339 :         LBASSERT( data->os.getVersion() != VERSION_NONE );
     275         339 :         LBASSERTINFO( data->os.getVersion() == version,
     276             :                       data->os.getVersion() << " != " << version );
     277         339 :         if( data != _instanceDatas.front( ))
     278             :         {
     279         198 :             LBASSERTINFO( data->commitCount + _nVersions >= _commitCount,
     280             :                           data->commitCount << ", " << _commitCount << " [" <<
     281             :                           _nVersions << "]" );
     282             :         }
     283         339 :         --version;
     284             :     }
     285             : #endif
     286             : }
     287             : 
     288             : //---------------------------------------------------------------------------
     289             : // cache handling
     290             : //---------------------------------------------------------------------------
     291         121 : FullMasterCM::InstanceData* FullMasterCM::_newInstanceData()
     292             : {
     293             :     InstanceData* instanceData;
     294             : 
     295         121 :     if( _instanceDataCache.empty( ))
     296         121 :         instanceData = new InstanceData( this );
     297             :     else
     298             :     {
     299           0 :         instanceData = _instanceDataCache.back();
     300           0 :         _instanceDataCache.pop_back();
     301             :     }
     302             : 
     303         121 :     instanceData->commitCount = _commitCount;
     304         121 :     instanceData->os.reset();
     305         121 :     instanceData->os.enableSave();
     306         121 :     return instanceData;
     307             : }
     308             : 
     309         110 : void FullMasterCM::_addInstanceData( InstanceData* data )
     310             : {
     311         110 :     LBASSERT( data->os.getVersion() != VERSION_NONE );
     312         110 :     LBASSERT( data->os.getVersion() != VERSION_INVALID );
     313             : 
     314         110 :     _instanceDatas.push_back( data );
     315             : #ifdef CO_INSTRUMENT
     316             :     _bytesBuffered += data->os.getSaveBuffer().getSize();
     317             :     LBINFO << _bytesBuffered << " bytes used" << std::endl;
     318             : #endif
     319         110 : }
     320             : 
     321         108 : void FullMasterCM::_releaseInstanceData( InstanceData* data )
     322             : {
     323             : #ifdef CO_AGGRESSIVE_CACHING
     324             :     _instanceDataCache.push_back( data );
     325             : #else
     326         108 :     delete data;
     327             : #endif
     328         108 : }
     329             : 
     330         110 : uint128_t FullMasterCM::commit( const uint32_t incarnation )
     331             : {
     332         110 :     LBASSERT( _version != VERSION_NONE );
     333             : 
     334         110 :     if( !_object->isDirty( ))
     335             :     {
     336           0 :         Mutex mutex( _slaves );
     337           0 :         _updateCommitCount( incarnation );
     338           0 :         _obsolete();
     339           0 :         return _version;
     340             :     }
     341             : 
     342         110 :     _maxVersion.waitGE( _version.low() + 1 );
     343         220 :     Mutex mutex( _slaves );
     344             : #if 0
     345             :     LBLOG( LOG_OBJECTS ) << "commit v" << _version << " " << command
     346             :                          << std::endl;
     347             : #endif
     348         110 :     _updateCommitCount( incarnation );
     349         110 :     _commit();
     350         110 :     _obsolete();
     351         110 :     return _version;
     352             : }
     353             : 
     354           6 : void FullMasterCM::_commit()
     355             : {
     356           6 :     InstanceData* instanceData = _newInstanceData();
     357           6 :     instanceData->os.enableCommit( _version + 1, *_slaves );
     358           6 :     _object->getInstanceData( instanceData->os );
     359           6 :     instanceData->os.disable();
     360             : 
     361           6 :     if( instanceData->os.hasSentData( ))
     362             :     {
     363           6 :         ++_version;
     364           6 :         LBASSERT( _version != VERSION_NONE );
     365             : #if 0
     366             :         LBINFO << "Committed v" << _version << "@" << _commitCount << ", id "
     367             :                << _object->getID() << std::endl;
     368             : #endif
     369           6 :         _addInstanceData( instanceData );
     370             :     }
     371             :     else
     372           0 :         _instanceDataCache.push_back( instanceData );
     373           6 : }
     374             : 
     375           3 : void FullMasterCM::push( const uint128_t& groupID, const uint128_t& typeID,
     376             :                          const Nodes& nodes )
     377             : {
     378           6 :     Mutex mutex( _slaves );
     379           3 :     InstanceData* instanceData = _instanceDatas.back();
     380           3 :     instanceData->os.push( nodes, _object->getID(), groupID, typeID );
     381           3 : }
     382             : 
     383           0 : bool FullMasterCM::sendSync( const MasterCMCommand& command )
     384             : {
     385             :     //const uint128_t& version = command.getRequestedVersion();
     386           0 :     const uint128_t& maxCachedVersion = command.getMaxCachedVersion();
     387             :     const bool useCache =
     388           0 :         command.useCache() &&
     389           0 :         command.getMasterInstanceID() == _object->getInstanceID() &&
     390           0 :         maxCachedVersion == _version;
     391             : 
     392           0 :     if( !useCache )
     393             :     {
     394           0 :         Mutex mutex( _slaves );
     395           0 :         InstanceData* instanceData = _instanceDatas.back();
     396           0 :         instanceData->os.sync( command );
     397             :     }
     398             : 
     399           0 :     NodePtr node = command.getNode();
     400           0 :     node->send( CMD_NODE_SYNC_OBJECT_REPLY, useCache /*preferMulticast*/ )
     401           0 :         << node->getNodeID() << command.getObjectID() << command.getRequestID()
     402           0 :         << true << command.useCache() << useCache;
     403           0 :     return true;
     404             : }
     405             : 
     406          66 : }

Generated by: LCOV version 1.11