LCOV - code coverage report
Current view: top level - co - versionedSlaveCM.cpp (source / functions) Hit Total Coverage
Test: Collage Lines: 77 147 52.4 %
Date: 2018-01-09 16:37:03 Functions: 11 15 73.3 %

          Line data    Source code
       1             : 
       2             : /* Copyright (c) 2007-2016, Stefan Eilemann <eile@equalizergraphics.com>
       3             :  *                          Daniel Nachbaur <danielnachbaur@gmail.com>
       4             :  *
       5             :  * This file is part of Collage <https://github.com/Eyescale/Collage>
       6             :  *
       7             :  * This library is free software; you can redistribute it and/or modify it under
       8             :  * the terms of the GNU Lesser General Public License version 2.1 as published
       9             :  * by the Free Software Foundation.
      10             :  *
      11             :  * This library is distributed in the hope that it will be useful, but WITHOUT
      12             :  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
      13             :  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
      14             :  * details.
      15             :  *
      16             :  * You should have received a copy of the GNU Lesser General Public License
      17             :  * along with this library; if not, write to the Free Software Foundation, Inc.,
      18             :  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
      19             :  */
      20             : 
      21             : #include "versionedSlaveCM.h"
      22             : 
      23             : #include "log.h"
      24             : #include "object.h"
      25             : #include "objectCommand.h"
      26             : #include "objectDataICommand.h"
      27             : #include "objectDataIStream.h"
      28             : #include "objectDataOCommand.h"
      29             : #include <limits>
      30             : #include <lunchbox/scopedMutex.h>
      31             : 
      32             : namespace co
      33             : {
      34             : typedef CommandFunc<VersionedSlaveCM> CmdFunc;
      35             : 
      36          34 : VersionedSlaveCM::VersionedSlaveCM(Object* object, uint32_t masterInstanceID)
      37             :     : ObjectCM(object)
      38             :     , _version(VERSION_NONE)
      39             :     , _currentIStream(0)
      40             : #pragma warning(push)
      41             : #pragma warning(disable : 4355)
      42             :     , _ostream(this)
      43             : #pragma warning(pop)
      44          34 :     , _masterInstanceID(masterInstanceID)
      45             : {
      46          34 :     LBASSERT(object);
      47             : 
      48          34 :     object->registerCommand(CMD_OBJECT_INSTANCE,
      49          68 :                             CmdFunc(this, &VersionedSlaveCM::_cmdData), 0);
      50          34 :     object->registerCommand(CMD_OBJECT_DELTA,
      51          68 :                             CmdFunc(this, &VersionedSlaveCM::_cmdData), 0);
      52          34 : }
      53             : 
      54         102 : VersionedSlaveCM::~VersionedSlaveCM()
      55             : {
      56          34 :     while (!_queuedVersions.isEmpty())
      57           0 :         delete _queuedVersions.pop();
      58             : 
      59          34 :     LBASSERT(!_currentIStream);
      60          34 :     delete _currentIStream;
      61          68 : }
      62             : 
      63           0 : uint128_t VersionedSlaveCM::commit(const uint32_t)
      64             : {
      65             : #if 0
      66             :     LBLOG( LOG_OBJECTS ) << "commit v" << _version << " " << command
      67             :                          << std::endl;
      68             : #endif
      69           0 :     if (!_object->isDirty() || !_master || !_master->isReachable())
      70           0 :         return VERSION_NONE;
      71             : 
      72           0 :     _ostream.enableSlaveCommit(_master);
      73           0 :     _object->pack(_ostream);
      74           0 :     _ostream.disable();
      75             : 
      76           0 :     return _ostream.hasSentData() ? _ostream.getVersion() : VERSION_NONE;
      77             : }
      78             : 
      79         309 : uint128_t VersionedSlaveCM::sync(const uint128_t& v)
      80             : {
      81             : #if 0
      82             :     LBLOG( LOG_OBJECTS ) << "sync to v" << v << ", id " << _object->getID()
      83             :                          << "." << _object->getInstanceID() << std::endl;
      84             : #endif
      85         309 :     if (_version == v)
      86         200 :         return _version;
      87             : 
      88         109 :     if (v == VERSION_HEAD)
      89             :     {
      90           0 :         _syncToHead();
      91           0 :         return _version;
      92             :     }
      93             : 
      94         109 :     const uint128_t version = (v == VERSION_NEXT) ? _version + 1 : v;
      95         109 :     LBASSERTINFO(version.high() == 0, "Not a master version: " << version)
      96         109 :     LBASSERTINFO(_version <= version,
      97             :                  "can't sync to older version of object "
      98             :                      << lunchbox::className(_object) << " " << _object->getID()
      99             :                      << " (" << _version << ", " << version << ")");
     100             : 
     101         329 :     while (_version < version)
     102         110 :         _unpackOneVersion(_queuedVersions.pop());
     103             : 
     104         218 :     LocalNodePtr node = _object->getLocalNode();
     105         109 :     if (node.isValid())
     106         109 :         node->flushCommands();
     107             : 
     108         109 :     return _version;
     109             : }
     110             : 
     111           0 : void VersionedSlaveCM::_syncToHead()
     112             : {
     113           0 :     if (_queuedVersions.isEmpty())
     114           0 :         return;
     115             : 
     116           0 :     ObjectDataIStream* is = 0;
     117           0 :     while (_queuedVersions.tryPop(is))
     118           0 :         _unpackOneVersion(is);
     119             : 
     120           0 :     LocalNodePtr localNode = _object->getLocalNode();
     121           0 :     if (localNode.isValid())
     122           0 :         localNode->flushCommands();
     123             : }
     124             : 
     125         144 : void VersionedSlaveCM::_releaseStream(ObjectDataIStream* stream)
     126             : {
     127             : #ifdef CO_AGGRESSIVE_CACHING
     128             :     stream->reset();
     129             :     _iStreamCache.release(stream);
     130             : #else
     131         144 :     delete stream;
     132             : #endif
     133         144 : }
     134             : 
     135           0 : uint128_t VersionedSlaveCM::getHeadVersion() const
     136             : {
     137           0 :     ObjectDataIStream* is = 0;
     138           0 :     if (_queuedVersions.getBack(is))
     139             :     {
     140           0 :         LBASSERT(is);
     141           0 :         return is->getVersion();
     142             :     }
     143           0 :     return _version;
     144             : }
     145             : 
     146         110 : void VersionedSlaveCM::_unpackOneVersion(ObjectDataIStream* is)
     147             : {
     148         110 :     LBASSERT(is);
     149         110 :     LBASSERTINFO(_version == is->getVersion() - 1 || _version == VERSION_NONE,
     150             :                  "Expected version " << _version + 1 << " or 0, got "
     151             :                                      << is->getVersion() << " for "
     152             :                                      << *_object);
     153             : 
     154         110 :     if (is->hasInstanceData())
     155           4 :         _object->applyInstanceData(*is);
     156             :     else
     157         106 :         _object->unpack(*is);
     158             : 
     159         110 :     _version = is->getVersion();
     160         110 :     _sendAck();
     161             : 
     162         110 :     LBASSERT(_version != VERSION_INVALID);
     163         110 :     LBASSERT(_version != VERSION_NONE);
     164         110 :     LBASSERTINFO(is->getRemainingBufferSize() == 0 &&
     165             :                      is->nRemainingBuffers() == 0,
     166             :                  "Object " << typeid(*_object).name()
     167             :                            << " did not unpack all data");
     168             : 
     169             : #if 0
     170             :     LBLOG( LOG_OBJECTS ) << "applied v" << _version << ", id "
     171             :                          << _object->getID() << "." << _object->getInstanceID()
     172             :                          << std::endl;
     173             : #endif
     174         110 :     _releaseStream(is);
     175         110 : }
     176             : 
     177         110 : void VersionedSlaveCM::_sendAck()
     178             : {
     179         110 :     const uint64_t maxVersion = _version.low() + _object->getMaxVersions();
     180         110 :     if (maxVersion <= _version.low()) // overflow: default unblocking commit
     181         108 :         return;
     182             : 
     183           4 :     _object->send(_master, CMD_OBJECT_MAX_VERSION, _masterInstanceID)
     184           6 :         << maxVersion << _object->getInstanceID();
     185             : }
     186             : 
     187          34 : void VersionedSlaveCM::applyMapData(const uint128_t& version)
     188             : {
     189             :     while (true)
     190             :     {
     191          34 :         ObjectDataIStream* is = _queuedVersions.pop();
     192          34 :         if (is->getVersion() == version)
     193             :         {
     194          34 :             LBASSERTINFO(is->hasInstanceData(), *_object);
     195             : 
     196          34 :             if (is->hasData()) // not VERSION_NONE
     197          31 :                 _object->applyInstanceData(*is);
     198          34 :             _version = is->getVersion();
     199             : 
     200          34 :             LBASSERT(_version != VERSION_INVALID);
     201          34 :             LBASSERTINFO(!is->hasData(),
     202             :                          lunchbox::className(_object)
     203             :                              << " did not unpack all data, "
     204             :                              << is->getRemainingBufferSize() << " bytes, "
     205             :                              << is->nRemainingBuffers() << " buffer(s)");
     206             : 
     207          34 :             _releaseStream(is);
     208          34 :             return;
     209             :         }
     210             :         else
     211             :         {
     212             :             // Found the following case:
     213             :             // - p1, t1 calls commit
     214             :             // - p1, t2 calls mapObject
     215             :             // - p1, cmd commits new version
     216             :             // - p1, cmd subscribes object
     217             :             // - p1, rcv attaches object
     218             :             // - p1, cmd receives commit data
     219             :             // -> newly attached object recv new commit data before map data,
     220             :             //    ignore it
     221           0 :             LBASSERTINFO(is->getVersion() > version, is->getVersion()
     222             :                                                          << " <= " << version);
     223           0 :             _releaseStream(is);
     224             :         }
     225           0 :     }
     226             : }
     227             : 
     228           0 : void VersionedSlaveCM::addInstanceDatas(const ObjectDataIStreamDeque& cache,
     229             :                                         const uint128_t& startVersion)
     230             : {
     231           0 :     LB_TS_THREAD(_rcvThread);
     232             : #if 0
     233             :     LBLOG( LOG_OBJECTS ) << lunchbox::disableFlush << "Adding data front ";
     234             : #endif
     235             : 
     236           0 :     uint128_t oldest = VERSION_NONE;
     237           0 :     uint128_t newest = VERSION_NONE;
     238           0 :     if (!_queuedVersions.isEmpty())
     239             :     {
     240           0 :         ObjectDataIStream* is = 0;
     241             : 
     242           0 :         LBCHECK(_queuedVersions.getFront(is));
     243           0 :         oldest = is->getVersion();
     244             : 
     245           0 :         LBCHECK(_queuedVersions.getBack(is));
     246           0 :         newest = is->getVersion();
     247             :     }
     248             : 
     249           0 :     ObjectDataIStreamDeque head;
     250           0 :     ObjectDataIStreams tail;
     251             : 
     252           0 :     for (ObjectDataIStreamDeque::const_iterator i = cache.begin();
     253           0 :          i != cache.end(); ++i)
     254             :     {
     255           0 :         ObjectDataIStream* stream = *i;
     256           0 :         const uint128_t& version = stream->getVersion();
     257           0 :         if (version < startVersion)
     258           0 :             continue;
     259             : 
     260           0 :         LBASSERT(stream->isReady());
     261           0 :         LBASSERT(stream->hasInstanceData());
     262           0 :         if (!stream->isReady())
     263           0 :             break;
     264             : 
     265           0 :         if (version < oldest)
     266           0 :             head.push_front(stream);
     267           0 :         else if (version > newest)
     268           0 :             tail.push_back(stream);
     269             :     }
     270             : 
     271           0 :     for (ObjectDataIStreamDeque::const_iterator i = head.begin();
     272           0 :          i != head.end(); ++i)
     273             :     {
     274           0 :         const ObjectDataIStream* stream = *i;
     275             : #ifndef NDEBUG
     276           0 :         ObjectDataIStream* debugStream = 0;
     277           0 :         _queuedVersions.getFront(debugStream);
     278           0 :         if (debugStream)
     279           0 :             LBASSERT(debugStream->getVersion() == stream->getVersion() + 1);
     280             : #endif
     281           0 :         _queuedVersions.pushFront(new ObjectDataIStream(*stream));
     282             :     }
     283             : 
     284           0 :     for (ObjectDataIStreams::const_iterator i = tail.begin(); i != tail.end();
     285             :          ++i)
     286             :     {
     287           0 :         const ObjectDataIStream* stream = *i;
     288             : #ifndef NDEBUG
     289           0 :         ObjectDataIStream* debugStream = 0;
     290           0 :         _queuedVersions.getBack(debugStream);
     291           0 :         if (debugStream)
     292             :         {
     293           0 :             LBASSERT(debugStream->getVersion() + 1 == stream->getVersion());
     294             :         }
     295             : #endif
     296           0 :         _queuedVersions.push(new ObjectDataIStream(*stream));
     297             :     }
     298           0 : }
     299             : 
     300             : //---------------------------------------------------------------------------
     301             : // command handlers
     302             : //---------------------------------------------------------------------------
     303         144 : bool VersionedSlaveCM::_cmdData(ICommand& cmd)
     304             : {
     305         288 :     ObjectDataICommand command(cmd);
     306             : 
     307         144 :     LB_TS_THREAD(_rcvThread);
     308         144 :     LBASSERT(command.getNode().isValid());
     309             : 
     310         144 :     if (!_currentIStream)
     311         144 :         _currentIStream = _iStreamCache.alloc();
     312             : 
     313         144 :     _currentIStream->addDataCommand(command);
     314         144 :     if (_currentIStream->isReady())
     315             :     {
     316         144 :         const uint128_t& version = _currentIStream->getVersion();
     317             : #ifndef NDEBUG
     318         144 :         ObjectDataIStream* debugStream = 0;
     319         144 :         _queuedVersions.getBack(debugStream);
     320         144 :         if (debugStream)
     321             :         {
     322          98 :             LBASSERT(debugStream->getVersion() + 1 == version ||
     323             :                      debugStream->getVersion() == VERSION_NONE);
     324             :         }
     325             : #endif
     326         144 :         _queuedVersions.push(_currentIStream);
     327         144 :         _object->notifyNewHeadVersion(version);
     328         144 :         _currentIStream = 0;
     329             :     }
     330         288 :     return true;
     331             : }
     332          63 : }

Generated by: LCOV version 1.11