LCOV - code coverage report
Current view: top level - co - barrier.cpp (source / functions) Hit Total Coverage
Test: Collage Lines: 136 165 82.4 %
Date: 2016-12-14 01:26:48 Functions: 23 24 95.8 %

          Line data    Source code
       1             : 
       2             : /* Copyright (c) 2006-2014, Stefan Eilemann <eile@equalizergraphics.com>
       3             :  *                    2011, Cedric Stalder <cedric.stalder@gmail.com>
       4             :  *               2012-2014, Daniel Nachbaur <danielnachbaur@gmail.com>
       5             :  *
       6             :  * This file is part of Collage <https://github.com/Eyescale/Collage>
       7             :  *
       8             :  * This library is free software; you can redistribute it and/or modify it under
       9             :  * the terms of the GNU Lesser General Public License version 2.1 as published
      10             :  * by the Free Software Foundation.
      11             :  *
      12             :  * This library is distributed in the hope that it will be useful, but WITHOUT
      13             :  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
      14             :  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
      15             :  * details.
      16             :  *
      17             :  * You should have received a copy of the GNU Lesser General Public License
      18             :  * along with this library; if not, write to the Free Software Foundation, Inc.,
      19             :  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
      20             :  */
      21             : 
      22             : #include "barrier.h"
      23             : 
      24             : #include "iCommand.h"
      25             : #include "connection.h"
      26             : #include "dataIStream.h"
      27             : #include "dataOStream.h"
      28             : #include "global.h"
      29             : #include "log.h"
      30             : #include "objectICommand.h"
      31             : #include "objectOCommand.h"
      32             : #include "barrierCommand.h"
      33             : #include "exception.h"
      34             : 
      35             : #include <lunchbox/monitor.h>
      36             : #include <lunchbox/stdExt.h>
      37             : 
      38             : #include <unordered_map>
      39             : 
      40             : namespace co
      41             : {
      42             : namespace
      43             : {
      44         109 : struct Request
      45             : {
      46         109 :     Request()
      47         109 :             : time( 0 ), timeout( LB_TIMEOUT_INDEFINITE ), incarnation( 0 ) {}
      48             :     uint64_t time;
      49             :     uint32_t timeout;
      50             :     uint32_t incarnation;
      51             :     Nodes nodes;
      52             : };
      53             : 
      54             : typedef std::unordered_map< uint128_t, Request > RequestMap;
      55             : typedef RequestMap::iterator RequestMapIter;
      56             : }
      57             : 
      58             : namespace detail
      59             : {
      60          31 : class Barrier
      61             : {
      62             : public:
      63          26 :     Barrier() : height( 0 ) {}
      64           5 :     Barrier( const uint128_t& masterID_, const uint32_t height_ )
      65           5 :         : masterID( masterID_ )
      66           5 :         , height( height_ )
      67           5 :     {}
      68             : 
      69             :     /** The master barrier node. */
      70             :     NodeID masterID;
      71             : 
      72             :     /** The height of the barrier, only set on the master. */
      73             :     uint32_t height;
      74             : 
      75             :     /** The local, connected instantiation of the master node. */
      76             :     NodePtr master;
      77             : 
      78             :     /** Slave nodes which have entered the barrier, index per version. */
      79             :     RequestMap enteredNodes;
      80             : 
      81             :     /** The monitor used for barrier leave notification. */
      82             :     lunchbox::Monitor< uint32_t > incarnation;
      83             : };
      84             : }
      85             : 
      86             : typedef CommandFunc<Barrier> CmdFunc;
      87             : 
      88           5 : Barrier::Barrier( LocalNodePtr localNode, const uint128_t& masterNodeID,
      89           5 :                   const uint32_t height )
      90           5 :     : _impl( new detail::Barrier( masterNodeID.isUUID() ?
      91           0 :                                   masterNodeID : localNode->getNodeID(),
      92          10 :                                   height ))
      93             : {
      94           5 :     localNode->registerObject( this );
      95           5 : }
      96             : 
      97          26 : Barrier::Barrier( LocalNodePtr localNode, const ObjectVersion& id )
      98          26 :     : _impl( new detail::Barrier )
      99             : {
     100          26 :     localNode->mapObject( this, id );
     101          26 : }
     102             : 
     103          90 : Barrier::~Barrier()
     104             : {
     105          62 :     LocalNodePtr localNode = getLocalNode();
     106          31 :     if( localNode )
     107           1 :         localNode->releaseObject( this );
     108             : 
     109          31 :     delete _impl;
     110          59 : }
     111             : 
     112             : //---------------------------------------------------------------------------
     113             : // Serialization
     114             : //---------------------------------------------------------------------------
     115         106 : void Barrier::getInstanceData( DataOStream& os )
     116             : {
     117         106 :     LBASSERT( _impl->masterID != NodeID( ));
     118         106 :     os << _impl->height << _impl->masterID;
     119         106 : }
     120             : 
     121          26 : void Barrier::applyInstanceData( DataIStream& is )
     122             : {
     123          26 :     is >> _impl->height >> _impl->masterID;
     124          26 : }
     125             : 
     126         101 : void Barrier::pack( DataOStream& os )
     127             : {
     128         101 :     os << _impl->height;
     129         101 : }
     130             : 
     131         101 : void Barrier::unpack( DataIStream& is )
     132             : {
     133         101 :     is >> _impl->height;
     134         101 : }
     135             : //---------------------------------------------------------------------------
     136             : 
     137           1 : void Barrier::setHeight( const uint32_t height )
     138             : {
     139           1 :     _impl->height = height;
     140           1 : }
     141             : 
     142           0 : void Barrier::increase()
     143             : {
     144           0 :     ++_impl->height;
     145           0 : }
     146             : 
     147          28 : uint32_t Barrier::getHeight() const
     148             : {
     149          28 :     return _impl->height;
     150             : }
     151             : 
     152          31 : void Barrier::attach( const uint128_t& id, const uint32_t instanceID )
     153             : {
     154          31 :     Object::attach( id, instanceID );
     155             : 
     156          62 :     LocalNodePtr node = getLocalNode();
     157          31 :     CommandQueue* queue = node->getCommandThreadQueue();
     158             : 
     159          31 :     registerCommand( CMD_BARRIER_ENTER,
     160          62 :                      CmdFunc( this, &Barrier::_cmdEnter ), queue );
     161          31 :     registerCommand( CMD_BARRIER_ENTER_REPLY,
     162          62 :                      CmdFunc( this, &Barrier::_cmdEnterReply ), queue );
     163             : 
     164             : #ifdef COLLAGE_V1_API
     165          31 :     if( _impl->masterID == NodeID( ))
     166          26 :         _impl->masterID = node->getNodeID();
     167             : #else
     168             :     LBASSERT( _impl->masterID == NodeID( ));
     169             : #endif
     170          31 : }
     171             : 
     172         352 : bool Barrier::enter( const uint32_t timeout )
     173             : {
     174         352 :     LBASSERT( _impl->height > 0 );
     175         352 :     LBASSERT( _impl->masterID != NodeID( ));
     176             : 
     177         352 :     if( _impl->height == 1 ) // trivial ;)
     178           0 :         return true;
     179             : 
     180         352 :     if( !_impl->master )
     181             :     {
     182          66 :         LocalNodePtr localNode = getLocalNode();
     183          33 :         _impl->master = localNode->connect( _impl->masterID );
     184             :     }
     185             : 
     186         352 :     LBASSERT( _impl->master );
     187         352 :     LBASSERT( _impl->master->isReachable( ));
     188         352 :     if( !_impl->master || !_impl->master->isReachable( ))
     189             :     {
     190           0 :         LBWARN << "Can't connect barrier master node " << _impl->masterID
     191           0 :                << std::endl;
     192           0 :         return false;
     193             :     }
     194             : 
     195         704 :     LBLOG( LOG_BARRIER ) << "enter barrier " << getID() << " v" << getVersion()
     196        1056 :                          << ", height " << _impl->height << std::endl;
     197             : 
     198         352 :     const uint32_t leaveVal = _impl->incarnation.get() + 1;
     199             : 
     200         701 :     send( _impl->master, CMD_BARRIER_ENTER )
     201        1054 :         << getVersion() << leaveVal - 1 << timeout;
     202             : 
     203         352 :     if( timeout == LB_TIMEOUT_INDEFINITE )
     204         305 :         _impl->incarnation.waitEQ( leaveVal );
     205          47 :     else if( !_impl->incarnation.timedWaitEQ( leaveVal, timeout ))
     206          23 :         return false;
     207             : 
     208         658 :     LBLOG( LOG_BARRIER ) << "left barrier " << getID() << " v" << getVersion()
     209         987 :                          << ", height " << _impl->height << std::endl;
     210         329 :     return true;
     211             : }
     212             : 
/**
 * Handler for CMD_BARRIER_ENTER; asserts it runs on the master instance.
 *
 * Records the sending node in the request for the entered version. Once as
 * many nodes as the barrier height have entered the current version, every
 * recorded node is notified and the request is discarded. Requests for a
 * later version are only recorded; timed requests for an older version are
 * answered immediately.
 *
 * @param cmd the enter command carrying version, incarnation and timeout.
 * @return always true (the command is consumed).
 */
bool Barrier::_cmdEnter( ICommand& cmd )
{
    LB_TS_THREAD( _thread );
    LBASSERTINFO( !_impl->master || _impl->master == getLocalNode(),
                  _impl->master );

    // Payload order matches the send in enter(): version, incarnation, timeout
    ObjectICommand command( cmd );
    const uint128_t version = command.get< uint128_t >();
    const uint32_t incarnation = command.get< uint32_t >();
    const uint32_t timeout = command.get< uint32_t >();

    LBLOG( LOG_BARRIER ) << "handle barrier enter " << command
                         << " v" << version
                         << " barrier v" << getVersion() << std::endl;

    // operator[] creates an empty Request the first time a version is seen
    Request& request = _impl->enteredNodes[ version ];

    LBLOG( LOG_BARRIER ) << "enter barrier v" << version
                         << ", has " << request.nodes.size() << " of "
                         << _impl->height << std::endl;

    // Refreshed on every enter; _cleanup() measures expiry against it
    request.time = getLocalNode()->getTime64();

    // It's the first call to enter the barrier for this version: remember
    // the caller's incarnation and timeout for the whole synchronization
    if( request.nodes.empty( ))
    {
        request.incarnation = incarnation;
        request.timeout = timeout;
    }
    else if( request.timeout != LB_TIMEOUT_INDEFINITE )
    {
        // the incarnation belongs to an older barrier
        if( request.incarnation < incarnation )
        {
            // send the reply command directly to unblock the caller
            _sendNotify( version, command.getNode( ));
            return true;
        }
        // the previous enter had a timeout, start a new synchronization
        //  (same version means same group -> no member can run ahead)
        else if( request.incarnation > incarnation )
        {
            request.nodes.clear();
            request.incarnation = incarnation;
            request.timeout = timeout;
        }
    }
    request.nodes.push_back( command.getNode( ));

    // clean older data which was not removed during older synchronization
    if( request.timeout != LB_TIMEOUT_INDEFINITE )
        _cleanup( request.time );

    // If we got early entry requests for this barrier, just note their
    // appearance. This requires that another request for the later version
    // arrives once the barrier reaches this version. The only case when this is
    // not the case is when no contributor to the current version contributes to
    // the later version, in which case deadlocks might happen because the later
    // version never leaves the barrier. We simply assume this is not the case.
    if( version > getVersion( ))
        return true;

    // If it's an older version a timeout has been handled.
    // For performance, send directly the order to unblock the caller.
    // NOTE(review): why this path asserts incarnation == 0 is not evident
    // from this file -- confirm the invariant.
    if( timeout != LB_TIMEOUT_INDEFINITE && version < getVersion( ))
    {
        LBASSERT( incarnation == 0 );
        _sendNotify( version, command.getNode( ) );
        return true;
    }

    LBASSERTINFO( version == getVersion(),
                  "Barrier master updated to new version while in barrier " <<
                  getID() << " (" << version << " != " << getVersion() << ")" );

    // Keep waiting until 'height' participants have entered this version
    Nodes& nodes = request.nodes;
    if( nodes.size() < _impl->height )
        return true;

    LBASSERT( nodes.size() == _impl->height );
    LBLOG( LOG_BARRIER ) << "Barrier reached " << getID() << " v" << version
                         << std::endl;

    // presumably sorts and removes duplicate nodes (lunchbox stdExt) so each
    // node is notified once -- TODO confirm
    stde::usort( nodes );

    for( NodesIter i = nodes.begin(); i != nodes.end(); ++i )
        _sendNotify( version, *i );

    // delete node vector for version
    _impl->enteredNodes.erase( version );
    return true;
}
     305             : 
     306         135 : void Barrier::_sendNotify( const uint128_t& version, NodePtr node )
     307             : {
     308         135 :     LB_TS_THREAD( _thread );
     309         135 :     LBASSERTINFO( !_impl->master || _impl->master == getLocalNode(),
     310             :                   _impl->master );
     311             : 
     312         135 :     if( node->isLocal( )) // OPT
     313             :     {
     314         104 :         LBLOG( LOG_BARRIER ) << "Unlock local user(s)" << std::endl;
     315             :         // the case where we receive a different version of the barrier meant
     316             :         // that previosly we have detect a timeout true negative
     317         104 :         if( version == getVersion() )
     318         104 :             ++_impl->incarnation;
     319             :     }
     320             :     else
     321             :     {
     322          31 :         LBLOG( LOG_BARRIER ) << "Unlock " << node << std::endl;
     323          31 :         send( node, CMD_BARRIER_ENTER_REPLY ) << version;
     324             :     }
     325         135 : }
     326             : 
     327          47 : void Barrier::_cleanup( const uint64_t time )
     328             : {
     329          47 :     LB_TS_THREAD( _thread );
     330          47 :     LBASSERTINFO( !_impl->master || _impl->master == getLocalNode(),
     331             :                   _impl->master );
     332             : 
     333          47 :     if( _impl->enteredNodes.size() < 2 )
     334          47 :         return;
     335             : 
     336           0 :     for( RequestMapIter i = _impl->enteredNodes.begin();
     337           0 :          i != _impl->enteredNodes.end(); ++i )
     338             :     {
     339           0 :         Request& cleanNodes = i->second;
     340             : 
     341           0 :         if( cleanNodes.timeout == LB_TIMEOUT_INDEFINITE )
     342           0 :             continue;
     343             : 
     344           0 :         const uint32_t timeout = cleanNodes.timeout != LB_TIMEOUT_DEFAULT ?
     345             :                         cleanNodes.timeout :
     346           0 :                         Global::getIAttribute( Global::IATTR_TIMEOUT_DEFAULT );
     347             : 
     348           0 :         if( time > cleanNodes.time + timeout )
     349             :         {
     350           0 :             _impl->enteredNodes.erase( i );
     351           0 :             return;
     352             :         }
     353             :     }
     354             : }
     355             : 
     356          31 : bool Barrier::_cmdEnterReply( ICommand& cmd )
     357             : {
     358          62 :     ObjectICommand command( cmd );
     359          31 :     LB_TS_THREAD( _thread );
     360          31 :     LBLOG( LOG_BARRIER ) << "Got ok, unlock local user(s)" << std::endl;
     361          31 :     const uint128_t version = command.get< uint128_t >();
     362             : 
     363          31 :     if( version == getVersion( ))
     364          31 :         ++_impl->incarnation;
     365             : 
     366          62 :     return true;
     367             : }
     368             : 
     369          66 : }

Generated by: LCOV version 1.11