LCOV - code coverage report
Current view: top level - co - versionedSlaveCM.cpp (source / functions) Hit Total Coverage
Test: Collage Lines: 78 149 52.3 %
Date: 2015-11-03 13:48:53 Functions: 11 15 73.3 %

          Line data    Source code
       1             : 
       2             : /* Copyright (c) 2007-2014, Stefan Eilemann <eile@equalizergraphics.com>
       3             :  *               2011-2012, Daniel Nachbaur <danielnachbaur@gmail.com>
       4             :  *
       5             :  * This file is part of Collage <https://github.com/Eyescale/Collage>
       6             :  *
       7             :  * This library is free software; you can redistribute it and/or modify it under
       8             :  * the terms of the GNU Lesser General Public License version 2.1 as published
       9             :  * by the Free Software Foundation.
      10             :  *
      11             :  * This library is distributed in the hope that it will be useful, but WITHOUT
      12             :  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
      13             :  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
      14             :  * details.
      15             :  *
      16             :  * You should have received a copy of the GNU Lesser General Public License
      17             :  * along with this library; if not, write to the Free Software Foundation, Inc.,
      18             :  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
      19             :  */
      20             : 
      21             : #include "versionedSlaveCM.h"
      22             : 
      23             : #include "log.h"
      24             : #include "object.h"
      25             : #include "objectCommand.h"
      26             : #include "objectDataICommand.h"
      27             : #include "objectDataIStream.h"
      28             : #include "objectDataOCommand.h"
      29             : #include <lunchbox/scopedMutex.h>
      30             : #include <limits>
      31             : 
      32             : namespace co
      33             : {
      34             : typedef CommandFunc< VersionedSlaveCM > CmdFunc;
      35             : 
      36          33 : VersionedSlaveCM::VersionedSlaveCM( Object* object, uint32_t masterInstanceID )
      37             :         : ObjectCM( object )
      38             :         , _version( VERSION_NONE )
      39             :         , _currentIStream( 0 )
      40             : #pragma warning(push)
      41             : #pragma warning(disable: 4355)
      42             :         , _ostream( this )
      43             : #pragma warning(pop)
      44          33 :         , _masterInstanceID( masterInstanceID )
      45             : {
      46          33 :     LBASSERT( object );
      47             : 
      48             :     object->registerCommand( CMD_OBJECT_INSTANCE,
      49          33 :                              CmdFunc( this, &VersionedSlaveCM::_cmdData ), 0 );
      50             :     object->registerCommand( CMD_OBJECT_DELTA,
      51          33 :                              CmdFunc( this, &VersionedSlaveCM::_cmdData ), 0 );
      52          33 : }
      53             : 
      54          99 : VersionedSlaveCM::~VersionedSlaveCM()
      55             : {
      56          69 :     while( !_queuedVersions.isEmpty( ))
      57           3 :         delete _queuedVersions.pop();
      58             : 
      59          33 :     LBASSERT( !_currentIStream );
      60          33 :     delete _currentIStream;
      61          33 :     _currentIStream = 0;
      62             : 
      63          33 :     _version = VERSION_NONE;
      64          33 :     _master = 0;
      65          66 : }
      66             : 
      67           0 : uint128_t VersionedSlaveCM::commit( const uint32_t )
      68             : {
      69             : #if 0
      70             :     LBLOG( LOG_OBJECTS ) << "commit v" << _version << " " << command
      71             :                          << std::endl;
      72             : #endif
      73           0 :     if( !_object->isDirty() || !_master || !_master->isReachable( ))
      74           0 :         return VERSION_NONE;
      75             : 
      76           0 :     _ostream.enableSlaveCommit( _master );
      77           0 :     _object->pack( _ostream );
      78           0 :     _ostream.disable();
      79             : 
      80           0 :     return _ostream.hasSentData() ? _ostream.getVersion() : VERSION_NONE;
      81             : }
      82             : 
      83         308 : uint128_t VersionedSlaveCM::sync( const uint128_t& v )
      84             : {
      85             : #if 0
      86             :     LBLOG( LOG_OBJECTS ) << "sync to v" << v << ", id " << _object->getID()
      87             :                          << "." << _object->getInstanceID() << std::endl;
      88             : #endif
      89         308 :     if( _version == v )
      90         200 :         return _version;
      91             : 
      92         108 :     if( v == VERSION_HEAD )
      93             :     {
      94           0 :         _syncToHead();
      95           0 :         return _version;
      96             :     }
      97             : 
      98         108 :     const uint128_t version = ( v == VERSION_NEXT ) ? _version + 1 : v;
      99         108 :     LBASSERTINFO( version.high() == 0, "Not a master version: " << version )
     100         108 :     LBASSERTINFO( _version <= version,
     101             :                   "can't sync to older version of object " <<
     102             :                   lunchbox::className( _object ) << " " << _object->getID() <<
     103             :                   " (" << _version << ", " << version <<")" );
     104             : 
     105         325 :     while( _version < version )
     106         109 :         _unpackOneVersion( _queuedVersions.pop( ));
     107             : 
     108         108 :     LocalNodePtr node = _object->getLocalNode();
     109         108 :     if( node.isValid( ))
     110         108 :         node->flushCommands();
     111             : 
     112         108 :     return _version;
     113             : }
     114             : 
     115           0 : void VersionedSlaveCM::_syncToHead()
     116             : {
     117           0 :     if( _queuedVersions.isEmpty( ))
     118           0 :         return;
     119             : 
     120           0 :     ObjectDataIStream* is = 0;
     121           0 :     while( _queuedVersions.tryPop( is ))
     122           0 :         _unpackOneVersion( is );
     123             : 
     124           0 :     LocalNodePtr localNode = _object->getLocalNode();
     125           0 :     if( localNode.isValid( ))
     126           0 :         localNode->flushCommands();
     127             : }
     128             : 
     129         142 : void VersionedSlaveCM::_releaseStream( ObjectDataIStream* stream )
     130             : {
     131             : #ifdef CO_AGGRESSIVE_CACHING
     132             :     stream->reset();
     133             :     _iStreamCache.release( stream );
     134             : #else
     135         142 :     delete stream;
     136             : #endif
     137         142 : }
     138             : 
     139           0 : uint128_t VersionedSlaveCM::getHeadVersion() const
     140             : {
     141           0 :     ObjectDataIStream* is = 0;
     142           0 :     if( _queuedVersions.getBack( is ))
     143             :     {
     144           0 :         LBASSERT( is );
     145           0 :         return is->getVersion();
     146             :     }
     147           0 :     return _version;
     148             : }
     149             : 
     150         109 : void VersionedSlaveCM::_unpackOneVersion( ObjectDataIStream* is )
     151             : {
     152         109 :     LBASSERT( is );
     153         109 :     LBASSERTINFO( _version == is->getVersion() - 1 || _version == VERSION_NONE,
     154             :                   "Expected version " << _version + 1 << " or 0, got "
     155             :                   << is->getVersion() << " for " << *_object );
     156             : 
     157         109 :     if( is->hasInstanceData( ))
     158           3 :         _object->applyInstanceData( *is );
     159             :     else
     160         106 :         _object->unpack( *is );
     161             : 
     162         109 :     _version = is->getVersion();
     163         109 :     _sendAck();
     164             : 
     165         109 :     LBASSERT( _version != VERSION_INVALID );
     166         109 :     LBASSERT( _version != VERSION_NONE );
     167         109 :     LBASSERTINFO( is->getRemainingBufferSize()==0 && is->nRemainingBuffers()==0,
     168             :                   "Object " << typeid( *_object ).name() <<
     169             :                   " did not unpack all data" );
     170             : 
     171             : #if 0
     172             :     LBLOG( LOG_OBJECTS ) << "applied v" << _version << ", id "
     173             :                          << _object->getID() << "." << _object->getInstanceID()
     174             :                          << std::endl;
     175             : #endif
     176         109 :     _releaseStream( is );
     177         109 : }
     178             : 
     179         109 : void VersionedSlaveCM::_sendAck()
     180             : {
     181         109 :     const uint64_t maxVersion = _version.low() + _object->getMaxVersions();
     182         109 :     if( maxVersion <= _version.low( )) // overflow: default unblocking commit
     183         216 :         return;
     184             : 
     185             :     _object->send( _master, CMD_OBJECT_MAX_VERSION, _masterInstanceID )
     186           2 :             << maxVersion << _object->getInstanceID();
     187             : }
     188             : 
     189          33 : void VersionedSlaveCM::applyMapData( const uint128_t& version )
     190             : {
     191             :     while( true )
     192             :     {
     193          33 :         ObjectDataIStream* is = _queuedVersions.pop();
     194          33 :         if( is->getVersion() == version )
     195             :         {
     196          33 :             LBASSERTINFO( is->hasInstanceData(), *_object );
     197             : 
     198          33 :             if( is->hasData( )) // not VERSION_NONE
     199          30 :                 _object->applyInstanceData( *is );
     200          33 :             _version = is->getVersion();
     201             : 
     202          33 :             LBASSERT( _version != VERSION_INVALID );
     203          33 :             LBASSERTINFO( !is->hasData(),
     204             :                           lunchbox::className( _object ) <<
     205             :                           " did not unpack all data, " <<
     206             :                           is->getRemainingBufferSize() << " bytes, " <<
     207             :                           is->nRemainingBuffers() << " buffer(s)" );
     208             : 
     209          33 :             _releaseStream( is );
     210             : #if 0
     211             :             LBLOG( LOG_OBJECTS ) << "Mapped initial data of " << _object
     212             :                                  << std::endl;
     213             : #endif
     214          33 :             return;
     215             :         }
     216             :         else
     217             :         {
     218             :             // Found the following case:
     219             :             // - p1, t1 calls commit
     220             :             // - p1, t2 calls mapObject
     221             :             // - p1, cmd commits new version
     222             :             // - p1, cmd subscribes object
     223             :             // - p1, rcv attaches object
     224             :             // - p1, cmd receives commit data
     225             :             // -> newly attached object recv new commit data before map data,
     226             :             //    ignore it
     227           0 :             LBASSERTINFO( is->getVersion() > version,
     228             :                           is->getVersion() << " <= " << version );
     229           0 :             _releaseStream( is );
     230             :         }
     231           0 :     }
     232             : }
     233             : 
     234           0 : void VersionedSlaveCM::addInstanceDatas( const ObjectDataIStreamDeque& cache,
     235             :                                          const uint128_t& startVersion )
     236             : {
     237           0 :     LB_TS_THREAD( _rcvThread );
     238             : #if 0
     239             :     LBLOG( LOG_OBJECTS ) << lunchbox::disableFlush << "Adding data front ";
     240             : #endif
     241             : 
     242           0 :     uint128_t oldest = VERSION_NONE;
     243           0 :     uint128_t newest = VERSION_NONE;
     244           0 :     if( !_queuedVersions.isEmpty( ))
     245             :     {
     246           0 :         ObjectDataIStream* is = 0;
     247             : 
     248           0 :         LBCHECK( _queuedVersions.getFront( is ));
     249           0 :         oldest = is->getVersion();
     250             : 
     251           0 :         LBCHECK( _queuedVersions.getBack( is ));
     252           0 :         newest = is->getVersion();
     253             :     }
     254             : 
     255           0 :     ObjectDataIStreamDeque head;
     256           0 :     ObjectDataIStreams tail;
     257             : 
     258           0 :     for( ObjectDataIStreamDeque::const_iterator i = cache.begin();
     259           0 :          i != cache.end(); ++i )
     260             :     {
     261           0 :         ObjectDataIStream* stream = *i;
     262           0 :         const uint128_t& version = stream->getVersion();
     263           0 :         if( version < startVersion )
     264           0 :             continue;
     265             : 
     266           0 :         LBASSERT( stream->isReady( ));
     267           0 :         LBASSERT( stream->hasInstanceData( ));
     268           0 :         if( !stream->isReady( ))
     269           0 :             break;
     270             : 
     271           0 :         if( version < oldest )
     272           0 :             head.push_front( stream );
     273           0 :         else if( version > newest )
     274           0 :             tail.push_back( stream );
     275             :     }
     276             : 
     277           0 :     for( ObjectDataIStreamDeque::const_iterator i = head.begin();
     278           0 :          i != head.end(); ++i )
     279             :     {
     280           0 :         const ObjectDataIStream* stream = *i;
     281             : #ifndef NDEBUG
     282           0 :         ObjectDataIStream* debugStream = 0;
     283           0 :         _queuedVersions.getFront( debugStream );
     284           0 :         if( debugStream )
     285           0 :             LBASSERT( debugStream->getVersion() == stream->getVersion() + 1);
     286             : #endif
     287           0 :         _queuedVersions.pushFront( new ObjectDataIStream( *stream ));
     288             : #if 0
     289             :         LBLOG( LOG_OBJECTS ) << stream->getVersion() << ' ';
     290             : #endif
     291             :     }
     292             : 
     293             : #if 0
     294             :     LBLOG( LOG_OBJECTS ) << " back ";
     295             : #endif
     296           0 :     for( ObjectDataIStreams::const_iterator i = tail.begin();
     297           0 :          i != tail.end(); ++i )
     298             :     {
     299           0 :         const ObjectDataIStream* stream = *i;
     300             : #ifndef NDEBUG
     301           0 :         ObjectDataIStream* debugStream = 0;
     302           0 :         _queuedVersions.getBack( debugStream );
     303           0 :         if( debugStream )
     304             :         {
     305           0 :             LBASSERT( debugStream->getVersion() + 1 == stream->getVersion( ));
     306             :         }
     307             : #endif
     308           0 :         _queuedVersions.push( new ObjectDataIStream( *stream ));
     309             : #if 0
     310             :         LBLOG( LOG_OBJECTS ) << stream->getVersion() << ' ';
     311             : #endif
     312           0 :     }
     313             : #if 0
     314             :     LBLOG( LOG_OBJECTS ) << std::endl << lunchbox::enableFlush;
     315             : #endif
     316           0 : }
     317             : 
     318             : //---------------------------------------------------------------------------
     319             : // command handlers
     320             : //---------------------------------------------------------------------------
     321         145 : bool VersionedSlaveCM::_cmdData( ICommand& cmd )
     322             : {
     323         145 :     ObjectDataICommand command( cmd );
     324             : 
     325         145 :     LB_TS_THREAD( _rcvThread );
     326         145 :     LBASSERT( command.getNode().isValid( ));
     327             : 
     328         145 :     if( !_currentIStream )
     329         145 :         _currentIStream = _iStreamCache.alloc();
     330             : 
     331         145 :     _currentIStream->addDataCommand( command );
     332         145 :     if( _currentIStream->isReady( ))
     333             :     {
     334         145 :         const uint128_t& version = _currentIStream->getVersion();
     335             : #if 0
     336             :         LBLOG( LOG_OBJECTS ) << "v" << version << ", id " << _object->getID()
     337             :                              << "." << _object->getInstanceID() << " ready"
     338             :                              << std::endl;
     339             : #endif
     340             : #ifndef NDEBUG
     341         145 :         ObjectDataIStream* debugStream = 0;
     342         145 :         _queuedVersions.getBack( debugStream );
     343         145 :         if ( debugStream )
     344             :         {
     345         101 :             LBASSERT( debugStream->getVersion() + 1 == version ||
     346             :                       debugStream->getVersion() == VERSION_NONE );
     347             :         }
     348             : #endif
     349         145 :         _queuedVersions.push( _currentIStream );
     350         145 :         _object->notifyNewHeadVersion( version );
     351         145 :         _currentIStream = 0;
     352             :     }
     353         145 :     return true;
     354             : }
     355             : 
     356          63 : }

Generated by: LCOV version 1.11