LCOV - code coverage report
Current view: top level - co - barrier.cpp (source / functions) Hit Total Coverage
Test: lcov2.info Lines: 132 165 80.0 %
Date: 2014-10-06 Functions: 23 25 92.0 %

          Line data    Source code
       1             : 
       2             : /* Copyright (c) 2006-2014, Stefan Eilemann <eile@equalizergraphics.com>
       3             :  *                    2011, Cedric Stalder <cedric.stalder@gmail.com>
       4             :  *               2012-2014, Daniel Nachbaur <danielnachbaur@gmail.com>
       5             :  *
       6             :  * This file is part of Collage <https://github.com/Eyescale/Collage>
       7             :  *
       8             :  * This library is free software; you can redistribute it and/or modify it under
       9             :  * the terms of the GNU Lesser General Public License version 2.1 as published
      10             :  * by the Free Software Foundation.
      11             :  *
      12             :  * This library is distributed in the hope that it will be useful, but WITHOUT
      13             :  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
      14             :  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
      15             :  * details.
      16             :  *
      17             :  * You should have received a copy of the GNU Lesser General Public License
      18             :  * along with this library; if not, write to the Free Software Foundation, Inc.,
      19             :  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
      20             :  */
      21             : 
      22             : #include "barrier.h"
      23             : 
      24             : #include "iCommand.h"
      25             : #include "connection.h"
      26             : #include "dataIStream.h"
      27             : #include "dataOStream.h"
      28             : #include "global.h"
      29             : #include "log.h"
      30             : #include "objectICommand.h"
      31             : #include "objectOCommand.h"
      32             : #include "barrierCommand.h"
      33             : #include "exception.h"
      34             : 
      35             : #include <lunchbox/monitor.h>
      36             : #include <lunchbox/stdExt.h>
      37             : 
      38             : namespace co
      39             : {
      40             : namespace
      41             : {
      42         109 : struct Request
      43             : {
      44         109 :     Request()
      45         109 :             : time( 0 ), timeout( LB_TIMEOUT_INDEFINITE ), incarnation( 0 ) {}
      46             :     uint64_t time;
      47             :     uint32_t timeout;
      48             :     uint32_t incarnation;
      49             :     Nodes nodes;
      50             : };
      51             : 
      52             : typedef stde::hash_map< uint128_t, Request > RequestMap;
      53             : typedef RequestMap::iterator RequestMapIter;
      54             : }
      55             : 
      56             : namespace detail
      57             : {
      58          31 : class Barrier
      59             : {
      60             : public:
      61          26 :     Barrier() : height( 0 ) {}
      62           5 :     Barrier( const uint128_t& masterID_, const uint32_t height_ )
      63             :         : masterID( masterID_ )
      64           5 :         , height( height_ )
      65           5 :     {}
      66             : 
      67             :     /** The master barrier node. */
      68             :     NodeID masterID;
      69             : 
      70             :     /** The height of the barrier, only set on the master. */
      71             :     uint32_t height;
      72             : 
      73             :     /** The local, connected instantiation of the master node. */
      74             :     NodePtr master;
      75             : 
      76             :     /** Slave nodes which have entered the barrier, indexed by version. */
      77             :     RequestMap enteredNodes;
      78             : 
      79             :     /** The monitor used for barrier leave notification. */
      80             :     lunchbox::Monitor< uint32_t > incarnation;
      81             : };
      82             : }
      83             : 
      84             : typedef CommandFunc<Barrier> CmdFunc;
      85             : 
      86             : #ifdef COLLAGE_V1_API
      87           0 : Barrier::Barrier( NodePtr master, const uint32_t height )
      88           0 :     : _impl( new detail::Barrier( master ? master->getNodeID() : NodeID(),
      89           0 :                                   height ))
      90           0 : {}
      91             : #endif
      92             : 
      93           5 : Barrier::Barrier( LocalNodePtr localNode, const uint128_t& masterNodeID,
      94             :                   const uint32_t height )
      95           5 :     : _impl( new detail::Barrier( masterNodeID.isUUID() ?
      96           0 :                                   masterNodeID : localNode->getNodeID(),
      97          10 :                                   height ))
      98             : {
      99           5 :     localNode->registerObject( this );
     100           5 : }
     101             : 
     102          26 : Barrier::Barrier( LocalNodePtr localNode, const ObjectVersion& id )
     103          26 :     : _impl( new detail::Barrier )
     104             : {
     105          26 :     localNode->mapObject( this, id );
     106          26 : }
     107             : 
     108          90 : Barrier::~Barrier()
     109             : {
     110          31 :     LocalNodePtr localNode = getLocalNode();
     111          31 :     if( localNode )
     112           1 :         localNode->releaseObject( this );
     113             : 
     114          31 :     delete _impl;
     115          59 : }
     116             : 
     117             : //---------------------------------------------------------------------------
     118             : // Serialization
     119             : //---------------------------------------------------------------------------
     120         109 : void Barrier::getInstanceData( DataOStream& os )
     121             : {
     122         109 :     LBASSERT( _impl->masterID != NodeID( ));
     123         109 :     os << _impl->height << _impl->masterID;
     124         109 : }
     125             : 
     126          26 : void Barrier::applyInstanceData( DataIStream& is )
     127             : {
     128          26 :     is >> _impl->height >> _impl->masterID;
     129          26 : }
     130             : 
     131         104 : void Barrier::pack( DataOStream& os )
     132             : {
     133         104 :     os << _impl->height;
     134         104 : }
     135             : 
     136         101 : void Barrier::unpack( DataIStream& is )
     137             : {
     138         101 :     is >> _impl->height;
     139         101 : }
     140             : //---------------------------------------------------------------------------
     141             : 
     142           1 : void Barrier::setHeight( const uint32_t height )
     143             : {
     144           1 :     _impl->height = height;
     145           1 : }
     146             : 
     147           0 : void Barrier::increase()
     148             : {
     149           0 :     ++_impl->height;
     150           0 : }
     151             : 
     152          28 : uint32_t Barrier::getHeight() const
     153             : {
     154          28 :     return _impl->height;
     155             : }
     156             : 
     157          31 : void Barrier::attach( const uint128_t& id, const uint32_t instanceID )
     158             : {
     159          31 :     Object::attach( id, instanceID );
     160             : 
     161          31 :     LocalNodePtr node = getLocalNode();
     162          31 :     CommandQueue* queue = node->getCommandThreadQueue();
     163             : 
     164             :     registerCommand( CMD_BARRIER_ENTER,
     165          31 :                      CmdFunc( this, &Barrier::_cmdEnter ), queue );
     166             :     registerCommand( CMD_BARRIER_ENTER_REPLY,
     167          31 :                      CmdFunc( this, &Barrier::_cmdEnterReply ), queue );
     168             : 
     169             : #ifdef COLLAGE_V1_API
     170          31 :     if( _impl->masterID == NodeID( ))
     171          26 :         _impl->masterID = node->getNodeID();
     172             : #else
     173             :     LBASSERT( _impl->masterID == NodeID( ));
     174             : #endif
     175          31 : }
     176             : 
     177         352 : void Barrier::enter( const uint32_t timeout )
     178             : {
     179         352 :     LBASSERT( _impl->height > 0 );
     180         352 :     LBASSERT( _impl->masterID != NodeID( ));
     181             : 
     182         352 :     if( _impl->height == 1 ) // trivial ;)
     183           0 :         return;
     184             : 
     185         352 :     if( !_impl->master )
     186             :     {
     187          32 :         LocalNodePtr localNode = getLocalNode();
     188          32 :         _impl->master = localNode->connect( _impl->masterID );
     189             :     }
     190             : 
     191         352 :     LBASSERT( _impl->master );
     192         352 :     LBASSERT( _impl->master->isReachable( ));
     193         352 :     if( !_impl->master || !_impl->master->isReachable( ))
     194             :     {
     195           0 :         LBWARN << "Can't connect barrier master node " << _impl->masterID
     196           0 :                << std::endl;
     197           0 :         return;
     198             :     }
     199             : 
     200         704 :     LBLOG( LOG_BARRIER ) << "enter barrier " << getID() << " v" << getVersion()
     201        1056 :                          << ", height " << _impl->height << std::endl;
     202             : 
     203         352 :     const uint32_t leaveVal = _impl->incarnation.get() + 1;
     204             : 
     205             :     send( _impl->master, CMD_BARRIER_ENTER )
     206         352 :         << getVersion() << leaveVal - 1 << timeout;
     207             : 
     208         352 :     if( timeout == LB_TIMEOUT_INDEFINITE )
     209         305 :         _impl->incarnation.waitEQ( leaveVal );
     210          47 :     else if( !_impl->incarnation.timedWaitEQ( leaveVal, timeout ))
     211          23 :         throw Exception( Exception::TIMEOUT_BARRIER );
     212             : 
     213         654 :     LBLOG( LOG_BARRIER ) << "left barrier " << getID() << " v" << getVersion()
     214         981 :                          << ", height " << _impl->height << std::endl;
     215             : }
     216             : 
     217         352 : bool Barrier::_cmdEnter( ICommand& cmd )
     218             : {
     219         352 :     LB_TS_THREAD( _thread );
     220         352 :     LBASSERTINFO( !_impl->master || _impl->master == getLocalNode(),
     221             :                   _impl->master );
     222             : 
     223         352 :     ObjectICommand command( cmd );
     224         352 :     const uint128_t version = command.get< uint128_t >();
     225         352 :     const uint32_t incarnation = command.get< uint32_t >();
     226         352 :     const uint32_t timeout = command.get< uint32_t >();
     227             : 
     228         352 :     LBLOG( LOG_BARRIER ) << "handle barrier enter " << command
     229           0 :                          << " v" << version
     230         704 :                          << " barrier v" << getVersion() << std::endl;
     231             : 
     232         352 :     Request& request = _impl->enteredNodes[ version ];
     233             : 
     234         352 :     LBLOG( LOG_BARRIER ) << "enter barrier v" << version
     235           0 :                          << ", has " << request.nodes.size() << " of "
     236         352 :                          << _impl->height << std::endl;
     237             : 
     238         352 :     request.time = getLocalNode()->getTime64();
     239             : 
     240             :     // It's the first call to enter barrier
     241         352 :     if( request.nodes.empty( ))
     242             :     {
     243         109 :         request.incarnation = incarnation;
     244         109 :         request.timeout = timeout;
     245             :     }
     246         243 :     else if( request.timeout != LB_TIMEOUT_INDEFINITE )
     247             :     {
     248             :         // the incarnation belongs to an older barrier
     249          40 :         if( request.incarnation < incarnation )
     250             :         {
     251             :             // send the reply command directly to unblock the caller
     252           0 :             _sendNotify( version, command.getNode( ));
     253           0 :             return true;
     254             :         }
     255             :         // the previous enter had a timeout, start a new synchronization
     256             :         //  (same version means same group -> no member can run ahead)
     257          40 :         else if( request.incarnation > incarnation )
     258             :         {
     259           0 :             request.nodes.clear();
     260           0 :             request.incarnation = incarnation;
     261           0 :             request.timeout = timeout;
     262             :         }
     263             :     }
     264         352 :     request.nodes.push_back( command.getNode( ));
     265             : 
     266             :     // clean older data which was not removed during older synchronization
     267         352 :     if( request.timeout != LB_TIMEOUT_INDEFINITE )
     268          47 :         _cleanup( request.time );
     269             : 
     270             :     // If we got early entry requests for this barrier, just note their
     271             :     // appearance. This requires that another request for the later version
     272             :     // arrives once the barrier reaches this version. The only case when this is
     273             :     // not the case is when no contributor to the current version contributes to
     274             :     // the later version, in which case deadlocks might happen because the later
     275             :     // version never leaves the barrier. We simply assume this is not the case.
     276         352 :     if( version > getVersion( ))
     277           0 :         return true;
     278             : 
     279             :     // If it's an older version, a timeout has been handled.
     280             :     // For performance, directly send the order to unblock the caller.
     281         352 :     if( timeout != LB_TIMEOUT_INDEFINITE && version < getVersion( ))
     282             :     {
     283           0 :         LBASSERT( incarnation == 0 );
     284           0 :         _sendNotify( version, command.getNode( ) );
     285           0 :         return true;
     286             :     }
     287             : 
     288         352 :     LBASSERTINFO( version == getVersion(),
     289             :                   "Barrier master updated to new version while in barrier " <<
     290             :                   getID() << " (" << version << " != " << getVersion() << ")" );
     291             : 
     292         352 :     Nodes& nodes = request.nodes;
     293         352 :     if( nodes.size() < _impl->height )
     294         245 :         return true;
     295             : 
     296         107 :     LBASSERT( nodes.size() == _impl->height );
     297         107 :     LBLOG( LOG_BARRIER ) << "Barrier reached " << getID() << " v" << version
     298         107 :                          << std::endl;
     299             : 
     300         107 :     stde::usort( nodes );
     301             : 
     302         242 :     for( NodesIter i = nodes.begin(); i != nodes.end(); ++i )
     303         135 :         _sendNotify( version, *i );
     304             : 
     305             :     // delete node vector for version
     306         107 :     RequestMapIter i = _impl->enteredNodes.find( version );
     307         107 :     LBASSERT( i != _impl->enteredNodes.end( ));
     308         107 :     _impl->enteredNodes.erase( i );
     309         107 :     return true;
     310             : }
     311             : 
     312         135 : void Barrier::_sendNotify( const uint128_t& version, NodePtr node )
     313             : {
     314         135 :     LB_TS_THREAD( _thread );
     315         135 :     LBASSERTINFO( !_impl->master || _impl->master == getLocalNode(),
     316             :                   _impl->master );
     317             : 
     318         135 :     if( node->isLocal( )) // OPT
     319             :     {
     320         104 :         LBLOG( LOG_BARRIER ) << "Unlock local user(s)" << std::endl;
     321             :         // receiving a different version of the barrier means that we have
     322             :         // previously detected a timeout (true negative)
     323         104 :         if( version == getVersion() )
     324         104 :             ++_impl->incarnation;
     325             :     }
     326             :     else
     327             :     {
     328          31 :         LBLOG( LOG_BARRIER ) << "Unlock " << node << std::endl;
     329          31 :         send( node, CMD_BARRIER_ENTER_REPLY ) << version;
     330             :     }
     331         135 : }
     332             : 
     333          47 : void Barrier::_cleanup( const uint64_t time )
     334             : {
     335          47 :     LB_TS_THREAD( _thread );
     336          47 :     LBASSERTINFO( !_impl->master || _impl->master == getLocalNode(),
     337             :                   _impl->master );
     338             : 
     339          47 :     if( _impl->enteredNodes.size() < 2 )
     340          47 :         return;
     341             : 
     342           0 :     for( RequestMapIter i = _impl->enteredNodes.begin();
     343           0 :          i != _impl->enteredNodes.end(); ++i )
     344             :     {
     345           0 :         Request& cleanNodes = i->second;
     346             : 
     347           0 :         if( cleanNodes.timeout == LB_TIMEOUT_INDEFINITE )
     348           0 :             continue;
     349             : 
     350           0 :         const uint32_t timeout = cleanNodes.timeout != LB_TIMEOUT_DEFAULT ?
     351             :                         cleanNodes.timeout :
     352           0 :                         Global::getIAttribute( Global::IATTR_TIMEOUT_DEFAULT );
     353             : 
     354           0 :         if( time > cleanNodes.time + timeout )
     355             :         {
     356           0 :             _impl->enteredNodes.erase( i );
     357           0 :             return;
     358             :         }
     359             :     }
     360             : }
     361             : 
     362          28 : bool Barrier::_cmdEnterReply( ICommand& cmd )
     363             : {
     364          28 :     ObjectICommand command( cmd );
     365          28 :     LB_TS_THREAD( _thread );
     366          28 :     LBLOG( LOG_BARRIER ) << "Got ok, unlock local user(s)" << std::endl;
     367          28 :     const uint128_t version = command.get< uint128_t >();
     368             : 
     369          28 :     if( version == getVersion( ))
     370          29 :         ++_impl->incarnation;
     371             : 
     372          29 :     return true;
     373             : }
     374             : 
     375          60 : }

Generated by: LCOV version 1.10