LCOV - code coverage report
Current view: top level - co - versionedSlaveCM.cpp (source / functions) Hit Total Coverage
Test: Collage Lines: 80 151 53.0 %
Date: 2016-12-14 01:26:48 Functions: 11 15 73.3 %

          Line data    Source code
       1             : 
       2             : /* Copyright (c) 2007-2016, Stefan Eilemann <eile@equalizergraphics.com>
       3             :  *                          Daniel Nachbaur <danielnachbaur@gmail.com>
       4             :  *
       5             :  * This file is part of Collage <https://github.com/Eyescale/Collage>
       6             :  *
       7             :  * This library is free software; you can redistribute it and/or modify it under
       8             :  * the terms of the GNU Lesser General Public License version 2.1 as published
       9             :  * by the Free Software Foundation.
      10             :  *
      11             :  * This library is distributed in the hope that it will be useful, but WITHOUT
      12             :  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
      13             :  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
      14             :  * details.
      15             :  *
      16             :  * You should have received a copy of the GNU Lesser General Public License
      17             :  * along with this library; if not, write to the Free Software Foundation, Inc.,
      18             :  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
      19             :  */
      20             : 
      21             : #include "versionedSlaveCM.h"
      22             : 
      23             : #include "log.h"
      24             : #include "object.h"
      25             : #include "objectCommand.h"
      26             : #include "objectDataICommand.h"
      27             : #include "objectDataIStream.h"
      28             : #include "objectDataOCommand.h"
      29             : #include <lunchbox/scopedMutex.h>
      30             : #include <limits>
      31             : 
      32             : namespace co
      33             : {
      34             : typedef CommandFunc< VersionedSlaveCM > CmdFunc;
      35             : 
      36          34 : VersionedSlaveCM::VersionedSlaveCM( Object* object, uint32_t masterInstanceID )
      37             :         : ObjectCM( object )
      38             :         , _version( VERSION_NONE )
      39             :         , _currentIStream( 0 )
      40             : #pragma warning(push)
      41             : #pragma warning(disable: 4355)
      42             :         , _ostream( this )
      43             : #pragma warning(pop)
      44          34 :         , _masterInstanceID( masterInstanceID )
      45             : {
      46          34 :     LBASSERT( object );
      47             : 
      48          34 :     object->registerCommand( CMD_OBJECT_INSTANCE,
      49          68 :                              CmdFunc( this, &VersionedSlaveCM::_cmdData ), 0 );
      50          34 :     object->registerCommand( CMD_OBJECT_DELTA,
      51          68 :                              CmdFunc( this, &VersionedSlaveCM::_cmdData ), 0 );
      52          34 : }
      53             : 
      54         102 : VersionedSlaveCM::~VersionedSlaveCM()
      55             : {
      56          34 :     while( !_queuedVersions.isEmpty( ))
      57           0 :         delete _queuedVersions.pop();
      58             : 
      59          34 :     LBASSERT( !_currentIStream );
      60          34 :     delete _currentIStream;
      61          34 :     _currentIStream = 0;
      62             : 
      63          34 :     _version = VERSION_NONE;
      64          34 :     _master = 0;
      65          68 : }
      66             : 
      67           0 : uint128_t VersionedSlaveCM::commit( const uint32_t )
      68             : {
      69             : #if 0
      70             :     LBLOG( LOG_OBJECTS ) << "commit v" << _version << " " << command
      71             :                          << std::endl;
      72             : #endif
      73           0 :     if( !_object->isDirty() || !_master || !_master->isReachable( ))
      74           0 :         return VERSION_NONE;
      75             : 
      76           0 :     _ostream.enableSlaveCommit( _master );
      77           0 :     _object->pack( _ostream );
      78           0 :     _ostream.disable();
      79             : 
      80           0 :     return _ostream.hasSentData() ? _ostream.getVersion() : VERSION_NONE;
      81             : }
      82             : 
      83         309 : uint128_t VersionedSlaveCM::sync( const uint128_t& v )
      84             : {
      85             : #if 0
      86             :     LBLOG( LOG_OBJECTS ) << "sync to v" << v << ", id " << _object->getID()
      87             :                          << "." << _object->getInstanceID() << std::endl;
      88             : #endif
      89         309 :     if( _version == v )
      90         200 :         return _version;
      91             : 
      92         109 :     if( v == VERSION_HEAD )
      93             :     {
      94           0 :         _syncToHead();
      95           0 :         return _version;
      96             :     }
      97             : 
      98         109 :     const uint128_t version = ( v == VERSION_NEXT ) ? _version + 1 : v;
      99         109 :     LBASSERTINFO( version.high() == 0, "Not a master version: " << version )
     100         109 :     LBASSERTINFO( _version <= version,
     101             :                   "can't sync to older version of object " <<
     102             :                   lunchbox::className( _object ) << " " << _object->getID() <<
     103             :                   " (" << _version << ", " << version <<")" );
     104             : 
     105         329 :     while( _version < version )
     106         110 :         _unpackOneVersion( _queuedVersions.pop( ));
     107             : 
     108         218 :     LocalNodePtr node = _object->getLocalNode();
     109         109 :     if( node.isValid( ))
     110         109 :         node->flushCommands();
     111             : 
     112         109 :     return _version;
     113             : }
     114             : 
     115           0 : void VersionedSlaveCM::_syncToHead()
     116             : {
     117           0 :     if( _queuedVersions.isEmpty( ))
     118           0 :         return;
     119             : 
     120           0 :     ObjectDataIStream* is = 0;
     121           0 :     while( _queuedVersions.tryPop( is ))
     122           0 :         _unpackOneVersion( is );
     123             : 
     124           0 :     LocalNodePtr localNode = _object->getLocalNode();
     125           0 :     if( localNode.isValid( ))
     126           0 :         localNode->flushCommands();
     127             : }
     128             : 
     129         144 : void VersionedSlaveCM::_releaseStream( ObjectDataIStream* stream )
     130             : {
     131             : #ifdef CO_AGGRESSIVE_CACHING
     132             :     stream->reset();
     133             :     _iStreamCache.release( stream );
     134             : #else
     135         144 :     delete stream;
     136             : #endif
     137         144 : }
     138             : 
     139           0 : uint128_t VersionedSlaveCM::getHeadVersion() const
     140             : {
     141           0 :     ObjectDataIStream* is = 0;
     142           0 :     if( _queuedVersions.getBack( is ))
     143             :     {
     144           0 :         LBASSERT( is );
     145           0 :         return is->getVersion();
     146             :     }
     147           0 :     return _version;
     148             : }
     149             : 
     150         110 : void VersionedSlaveCM::_unpackOneVersion( ObjectDataIStream* is )
     151             : {
     152         110 :     LBASSERT( is );
     153         110 :     LBASSERTINFO( _version == is->getVersion() - 1 || _version == VERSION_NONE,
     154             :                   "Expected version " << _version + 1 << " or 0, got "
     155             :                   << is->getVersion() << " for " << *_object );
     156             : 
     157         110 :     if( is->hasInstanceData( ))
     158           4 :         _object->applyInstanceData( *is );
     159             :     else
     160         106 :         _object->unpack( *is );
     161             : 
     162         110 :     _version = is->getVersion();
     163         110 :     _sendAck();
     164             : 
     165         110 :     LBASSERT( _version != VERSION_INVALID );
     166         110 :     LBASSERT( _version != VERSION_NONE );
     167         110 :     LBASSERTINFO( is->getRemainingBufferSize()==0 && is->nRemainingBuffers()==0,
     168             :                   "Object " << typeid( *_object ).name() <<
     169             :                   " did not unpack all data" );
     170             : 
     171             : #if 0
     172             :     LBLOG( LOG_OBJECTS ) << "applied v" << _version << ", id "
     173             :                          << _object->getID() << "." << _object->getInstanceID()
     174             :                          << std::endl;
     175             : #endif
     176         110 :     _releaseStream( is );
     177         110 : }
     178             : 
     179         110 : void VersionedSlaveCM::_sendAck()
     180             : {
     181         110 :     const uint64_t maxVersion = _version.low() + _object->getMaxVersions();
     182         110 :     if( maxVersion <= _version.low( )) // overflow: default unblocking commit
     183         108 :         return;
     184             : 
     185           4 :     _object->send( _master, CMD_OBJECT_MAX_VERSION, _masterInstanceID )
     186           6 :             << maxVersion << _object->getInstanceID();
     187             : }
     188             : 
     189          34 : void VersionedSlaveCM::applyMapData( const uint128_t& version )
     190             : {
     191             :     while( true )
     192             :     {
     193          34 :         ObjectDataIStream* is = _queuedVersions.pop();
     194          34 :         if( is->getVersion() == version )
     195             :         {
     196          34 :             LBASSERTINFO( is->hasInstanceData(), *_object );
     197             : 
     198          34 :             if( is->hasData( )) // not VERSION_NONE
     199          31 :                 _object->applyInstanceData( *is );
     200          34 :             _version = is->getVersion();
     201             : 
     202          34 :             LBASSERT( _version != VERSION_INVALID );
     203          34 :             LBASSERTINFO( !is->hasData(),
     204             :                           lunchbox::className( _object ) <<
     205             :                           " did not unpack all data, " <<
     206             :                           is->getRemainingBufferSize() << " bytes, " <<
     207             :                           is->nRemainingBuffers() << " buffer(s)" );
     208             : 
     209          34 :             _releaseStream( is );
     210          34 :             return;
     211             :         }
     212             :         else
     213             :         {
     214             :             // Found the following case:
     215             :             // - p1, t1 calls commit
     216             :             // - p1, t2 calls mapObject
     217             :             // - p1, cmd commits new version
     218             :             // - p1, cmd subscribes object
     219             :             // - p1, rcv attaches object
     220             :             // - p1, cmd receives commit data
     221             :             // -> newly attached object recv new commit data before map data,
     222             :             //    ignore it
     223           0 :             LBASSERTINFO( is->getVersion() > version,
     224             :                           is->getVersion() << " <= " << version );
     225           0 :             _releaseStream( is );
     226             :         }
     227           0 :     }
     228             : }
     229             : 
     230           0 : void VersionedSlaveCM::addInstanceDatas( const ObjectDataIStreamDeque& cache,
     231             :                                          const uint128_t& startVersion )
     232             : {
     233           0 :     LB_TS_THREAD( _rcvThread );
     234             : #if 0
     235             :     LBLOG( LOG_OBJECTS ) << lunchbox::disableFlush << "Adding data front ";
     236             : #endif
     237             : 
     238           0 :     uint128_t oldest = VERSION_NONE;
     239           0 :     uint128_t newest = VERSION_NONE;
     240           0 :     if( !_queuedVersions.isEmpty( ))
     241             :     {
     242           0 :         ObjectDataIStream* is = 0;
     243             : 
     244           0 :         LBCHECK( _queuedVersions.getFront( is ));
     245           0 :         oldest = is->getVersion();
     246             : 
     247           0 :         LBCHECK( _queuedVersions.getBack( is ));
     248           0 :         newest = is->getVersion();
     249             :     }
     250             : 
     251           0 :     ObjectDataIStreamDeque head;
     252           0 :     ObjectDataIStreams tail;
     253             : 
     254           0 :     for( ObjectDataIStreamDeque::const_iterator i = cache.begin();
     255           0 :          i != cache.end(); ++i )
     256             :     {
     257           0 :         ObjectDataIStream* stream = *i;
     258           0 :         const uint128_t& version = stream->getVersion();
     259           0 :         if( version < startVersion )
     260           0 :             continue;
     261             : 
     262           0 :         LBASSERT( stream->isReady( ));
     263           0 :         LBASSERT( stream->hasInstanceData( ));
     264           0 :         if( !stream->isReady( ))
     265           0 :             break;
     266             : 
     267           0 :         if( version < oldest )
     268           0 :             head.push_front( stream );
     269           0 :         else if( version > newest )
     270           0 :             tail.push_back( stream );
     271             :     }
     272             : 
     273           0 :     for( ObjectDataIStreamDeque::const_iterator i = head.begin();
     274           0 :          i != head.end(); ++i )
     275             :     {
     276           0 :         const ObjectDataIStream* stream = *i;
     277             : #ifndef NDEBUG
     278           0 :         ObjectDataIStream* debugStream = 0;
     279           0 :         _queuedVersions.getFront( debugStream );
     280           0 :         if( debugStream )
     281           0 :             LBASSERT( debugStream->getVersion() == stream->getVersion() + 1);
     282             : #endif
     283           0 :         _queuedVersions.pushFront( new ObjectDataIStream( *stream ));
     284             :     }
     285             : 
     286           0 :     for( ObjectDataIStreams::const_iterator i = tail.begin();
     287           0 :          i != tail.end(); ++i )
     288             :     {
     289           0 :         const ObjectDataIStream* stream = *i;
     290             : #ifndef NDEBUG
     291           0 :         ObjectDataIStream* debugStream = 0;
     292           0 :         _queuedVersions.getBack( debugStream );
     293           0 :         if( debugStream )
     294             :         {
     295           0 :             LBASSERT( debugStream->getVersion() + 1 == stream->getVersion( ));
     296             :         }
     297             : #endif
     298           0 :         _queuedVersions.push( new ObjectDataIStream( *stream ));
     299             :     }
     300           0 : }
     301             : 
     302             : //---------------------------------------------------------------------------
     303             : // command handlers
     304             : //---------------------------------------------------------------------------
     305         144 : bool VersionedSlaveCM::_cmdData( ICommand& cmd )
     306             : {
     307         288 :     ObjectDataICommand command( cmd );
     308             : 
     309         144 :     LB_TS_THREAD( _rcvThread );
     310         144 :     LBASSERT( command.getNode().isValid( ));
     311             : 
     312         144 :     if( !_currentIStream )
     313         144 :         _currentIStream = _iStreamCache.alloc();
     314             : 
     315         144 :     _currentIStream->addDataCommand( command );
     316         144 :     if( _currentIStream->isReady( ))
     317             :     {
     318         144 :         const uint128_t& version = _currentIStream->getVersion();
     319             : #ifndef NDEBUG
     320         144 :         ObjectDataIStream* debugStream = 0;
     321         144 :         _queuedVersions.getBack( debugStream );
     322         144 :         if ( debugStream )
     323             :         {
     324          98 :             LBASSERT( debugStream->getVersion() + 1 == version ||
     325             :                       debugStream->getVersion() == VERSION_NONE );
     326             :         }
     327             : #endif
     328         144 :         _queuedVersions.push( _currentIStream );
     329         144 :         _object->notifyNewHeadVersion( version );
     330         144 :         _currentIStream = 0;
     331             :     }
     332         288 :     return true;
     333             : }
     334             : 
     335          66 : }

Generated by: LCOV version 1.11