LCOV - code coverage report
Current view: top level - co - barrier.cpp (source / functions) Hit Total Coverage
Test: Collage Lines: 133 162 82.1 %
Date: 2015-11-03 13:48:53 Functions: 23 24 95.8 %

          Line data    Source code
       1             : 
       2             : /* Copyright (c) 2006-2014, Stefan Eilemann <eile@equalizergraphics.com>
       3             :  *                    2011, Cedric Stalder <cedric.stalder@gmail.com>
       4             :  *               2012-2014, Daniel Nachbaur <danielnachbaur@gmail.com>
       5             :  *
       6             :  * This file is part of Collage <https://github.com/Eyescale/Collage>
       7             :  *
       8             :  * This library is free software; you can redistribute it and/or modify it under
       9             :  * the terms of the GNU Lesser General Public License version 2.1 as published
      10             :  * by the Free Software Foundation.
      11             :  *
      12             :  * This library is distributed in the hope that it will be useful, but WITHOUT
      13             :  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
      14             :  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
      15             :  * details.
      16             :  *
      17             :  * You should have received a copy of the GNU Lesser General Public License
      18             :  * along with this library; if not, write to the Free Software Foundation, Inc.,
      19             :  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
      20             :  */
      21             : 
      22             : #include "barrier.h"
      23             : 
      24             : #include "iCommand.h"
      25             : #include "connection.h"
      26             : #include "dataIStream.h"
      27             : #include "dataOStream.h"
      28             : #include "global.h"
      29             : #include "log.h"
      30             : #include "objectICommand.h"
      31             : #include "objectOCommand.h"
      32             : #include "barrierCommand.h"
      33             : #include "exception.h"
      34             : 
      35             : #include <lunchbox/monitor.h>
      36             : #include <lunchbox/stdExt.h>
      37             : 
      38             : namespace co
      39             : {
      40             : namespace
      41             : {
      42         109 : struct Request
      43             : {
      44         109 :     Request()
      45         109 :             : time( 0 ), timeout( LB_TIMEOUT_INDEFINITE ), incarnation( 0 ) {}
      46             :     uint64_t time;
      47             :     uint32_t timeout;
      48             :     uint32_t incarnation;
      49             :     Nodes nodes;
      50             : };
      51             : 
      52             : typedef stde::hash_map< uint128_t, Request > RequestMap;
      53             : typedef RequestMap::iterator RequestMapIter;
      54             : }
      55             : 
      56             : namespace detail
      57             : {
      58          31 : class Barrier
      59             : {
      60             : public:
      61          26 :     Barrier() : height( 0 ) {}
      62           5 :     Barrier( const uint128_t& masterID_, const uint32_t height_ )
      63             :         : masterID( masterID_ )
      64           5 :         , height( height_ )
      65           5 :     {}
      66             : 
      67             :     /** The master barrier node. */
      68             :     NodeID masterID;
      69             : 
      70             :     /** The height of the barrier, only set on the master. */
      71             :     uint32_t height;
      72             : 
      73             :     /** The local, connected instantiation of the master node. */
      74             :     NodePtr master;
      75             : 
      76             :     /** Slave nodes which have entered the barrier, index per version. */
      77             :     RequestMap enteredNodes;
      78             : 
      79             :     /** The monitor used for barrier leave notification. */
      80             :     lunchbox::Monitor< uint32_t > incarnation;
      81             : };
      82             : }
      83             : 
      84             : typedef CommandFunc<Barrier> CmdFunc;
      85             : 
      86           5 : Barrier::Barrier( LocalNodePtr localNode, const uint128_t& masterNodeID,
      87             :                   const uint32_t height )
      88           5 :     : _impl( new detail::Barrier( masterNodeID.isUUID() ?
      89           0 :                                   masterNodeID : localNode->getNodeID(),
      90          10 :                                   height ))
      91             : {
      92           5 :     localNode->registerObject( this );
      93           5 : }
      94             : 
      95          26 : Barrier::Barrier( LocalNodePtr localNode, const ObjectVersion& id )
      96          26 :     : _impl( new detail::Barrier )
      97             : {
      98          26 :     localNode->mapObject( this, id );
      99          26 : }
     100             : 
     101          90 : Barrier::~Barrier()
     102             : {
     103          31 :     LocalNodePtr localNode = getLocalNode();
     104          31 :     if( localNode )
     105           1 :         localNode->releaseObject( this );
     106             : 
     107          31 :     delete _impl;
     108          59 : }
     109             : 
     110             : //---------------------------------------------------------------------------
     111             : // Serialization
     112             : //---------------------------------------------------------------------------
     113         109 : void Barrier::getInstanceData( DataOStream& os )
     114             : {
     115         109 :     LBASSERT( _impl->masterID != NodeID( ));
     116         109 :     os << _impl->height << _impl->masterID;
     117         109 : }
     118             : 
     119          26 : void Barrier::applyInstanceData( DataIStream& is )
     120             : {
     121          26 :     is >> _impl->height >> _impl->masterID;
     122          26 : }
     123             : 
     124         104 : void Barrier::pack( DataOStream& os )
     125             : {
     126         104 :     os << _impl->height;
     127         104 : }
     128             : 
     129         101 : void Barrier::unpack( DataIStream& is )
     130             : {
     131         101 :     is >> _impl->height;
     132         101 : }
     133             : //---------------------------------------------------------------------------
     134             : 
     135           1 : void Barrier::setHeight( const uint32_t height )
     136             : {
     137           1 :     _impl->height = height;
     138           1 : }
     139             : 
     140           0 : void Barrier::increase()
     141             : {
     142           0 :     ++_impl->height;
     143           0 : }
     144             : 
     145          28 : uint32_t Barrier::getHeight() const
     146             : {
     147          28 :     return _impl->height;
     148             : }
     149             : 
     150          31 : void Barrier::attach( const uint128_t& id, const uint32_t instanceID )
     151             : {
     152          31 :     Object::attach( id, instanceID );
     153             : 
     154          31 :     LocalNodePtr node = getLocalNode();
     155          31 :     CommandQueue* queue = node->getCommandThreadQueue();
     156             : 
     157             :     registerCommand( CMD_BARRIER_ENTER,
     158          31 :                      CmdFunc( this, &Barrier::_cmdEnter ), queue );
     159             :     registerCommand( CMD_BARRIER_ENTER_REPLY,
     160          31 :                      CmdFunc( this, &Barrier::_cmdEnterReply ), queue );
     161             : 
     162             : #ifdef COLLAGE_V1_API
     163          31 :     if( _impl->masterID == NodeID( ))
     164          26 :         _impl->masterID = node->getNodeID();
     165             : #else
     166             :     LBASSERT( _impl->masterID == NodeID( ));
     167             : #endif
     168          31 : }
     169             : 
     170         352 : bool Barrier::enter( const uint32_t timeout )
     171             : {
     172         352 :     LBASSERT( _impl->height > 0 );
     173         352 :     LBASSERT( _impl->masterID != NodeID( ));
     174             : 
     175         352 :     if( _impl->height == 1 ) // trivial ;)
     176           0 :         return true;
     177             : 
     178         352 :     if( !_impl->master )
     179             :     {
     180          33 :         LocalNodePtr localNode = getLocalNode();
     181          33 :         _impl->master = localNode->connect( _impl->masterID );
     182             :     }
     183             : 
     184         352 :     LBASSERT( _impl->master );
     185         352 :     LBASSERT( _impl->master->isReachable( ));
     186         352 :     if( !_impl->master || !_impl->master->isReachable( ))
     187             :     {
     188           0 :         LBWARN << "Can't connect barrier master node " << _impl->masterID
     189           0 :                << std::endl;
     190           0 :         return false;
     191             :     }
     192             : 
     193         704 :     LBLOG( LOG_BARRIER ) << "enter barrier " << getID() << " v" << getVersion()
     194        1056 :                          << ", height " << _impl->height << std::endl;
     195             : 
     196         352 :     const uint32_t leaveVal = _impl->incarnation.get() + 1;
     197             : 
     198             :     send( _impl->master, CMD_BARRIER_ENTER )
     199         352 :         << getVersion() << leaveVal - 1 << timeout;
     200             : 
     201         352 :     if( timeout == LB_TIMEOUT_INDEFINITE )
     202         305 :         _impl->incarnation.waitEQ( leaveVal );
     203          47 :     else if( !_impl->incarnation.timedWaitEQ( leaveVal, timeout ))
     204          23 :         return false;
     205             : 
     206         658 :     LBLOG( LOG_BARRIER ) << "left barrier " << getID() << " v" << getVersion()
     207         987 :                          << ", height " << _impl->height << std::endl;
     208         329 :     return true;
     209             : }
     210             : 
     211         352 : bool Barrier::_cmdEnter( ICommand& cmd )
     212             : {
     213         352 :     LB_TS_THREAD( _thread );
     214         352 :     LBASSERTINFO( !_impl->master || _impl->master == getLocalNode(),
     215             :                   _impl->master );
     216             : 
     217         352 :     ObjectICommand command( cmd );
     218         352 :     const uint128_t version = command.get< uint128_t >();
     219         352 :     const uint32_t incarnation = command.get< uint32_t >();
     220         352 :     const uint32_t timeout = command.get< uint32_t >();
     221             : 
     222         352 :     LBLOG( LOG_BARRIER ) << "handle barrier enter " << command
     223           0 :                          << " v" << version
     224         704 :                          << " barrier v" << getVersion() << std::endl;
     225             : 
     226         352 :     Request& request = _impl->enteredNodes[ version ];
     227             : 
     228         352 :     LBLOG( LOG_BARRIER ) << "enter barrier v" << version
     229           0 :                          << ", has " << request.nodes.size() << " of "
     230         352 :                          << _impl->height << std::endl;
     231             : 
     232         352 :     request.time = getLocalNode()->getTime64();
     233             : 
     234             :     // It's the first call to enter barrier
     235         352 :     if( request.nodes.empty( ))
     236             :     {
     237         109 :         request.incarnation = incarnation;
     238         109 :         request.timeout = timeout;
     239             :     }
     240         243 :     else if( request.timeout != LB_TIMEOUT_INDEFINITE )
     241             :     {
     242             :         // the incarnation belongs to an older barrier
     243          40 :         if( request.incarnation < incarnation )
     244             :         {
     245             :             // send directly the reply command to unblock the caller
     246           0 :             _sendNotify( version, command.getNode( ));
     247           0 :             return true;
     248             :         }
     249             :         // the previous enter had a timeout, start a new synchronization
     250             :         //  (same version means same group -> no member can run ahead)
     251          40 :         else if( request.incarnation > incarnation )
     252             :         {
     253           0 :             request.nodes.clear();
     254           0 :             request.incarnation = incarnation;
     255           0 :             request.timeout = timeout;
     256             :         }
     257             :     }
     258         352 :     request.nodes.push_back( command.getNode( ));
     259             : 
     260             :     // clean older data which was not removed during older synchronization
     261         352 :     if( request.timeout != LB_TIMEOUT_INDEFINITE )
     262          47 :         _cleanup( request.time );
     263             : 
     264             :     // If we got early entry requests for this barrier, just note their
     265             :     // appearance. This requires that another request for the later version
     266             :     // arrives once the barrier reaches this version. The only case when this is
     267             :     // not the case is when no contributor to the current version contributes to
     268             :     // the later version, in which case deadlocks might happen because the later
     269             :     // version never leaves the barrier. We simply assume this is not the case.
     270         352 :     if( version > getVersion( ))
     271           0 :         return true;
     272             : 
     273             :     // If it's an older version a timeout has been handled.
     274             :     // For performance, send directly the order to unblock the caller.
     275         352 :     if( timeout != LB_TIMEOUT_INDEFINITE && version < getVersion( ))
     276             :     {
     277           0 :         LBASSERT( incarnation == 0 );
     278           0 :         _sendNotify( version, command.getNode( ) );
     279           0 :         return true;
     280             :     }
     281             : 
     282         352 :     LBASSERTINFO( version == getVersion(),
     283             :                   "Barrier master updated to new version while in barrier " <<
     284             :                   getID() << " (" << version << " != " << getVersion() << ")" );
     285             : 
     286         352 :     Nodes& nodes = request.nodes;
     287         352 :     if( nodes.size() < _impl->height )
     288         245 :         return true;
     289             : 
     290         107 :     LBASSERT( nodes.size() == _impl->height );
     291         107 :     LBLOG( LOG_BARRIER ) << "Barrier reached " << getID() << " v" << version
     292         107 :                          << std::endl;
     293             : 
     294         107 :     stde::usort( nodes );
     295             : 
     296         242 :     for( NodesIter i = nodes.begin(); i != nodes.end(); ++i )
     297         135 :         _sendNotify( version, *i );
     298             : 
     299             :     // delete node vector for version
     300         107 :     RequestMapIter i = _impl->enteredNodes.find( version );
     301         107 :     LBASSERT( i != _impl->enteredNodes.end( ));
     302         107 :     _impl->enteredNodes.erase( i );
     303         107 :     return true;
     304             : }
     305             : 
     306         135 : void Barrier::_sendNotify( const uint128_t& version, NodePtr node )
     307             : {
     308         135 :     LB_TS_THREAD( _thread );
     309         135 :     LBASSERTINFO( !_impl->master || _impl->master == getLocalNode(),
     310             :                   _impl->master );
     311             : 
     312         135 :     if( node->isLocal( )) // OPT
     313             :     {
     314         104 :         LBLOG( LOG_BARRIER ) << "Unlock local user(s)" << std::endl;
     315             :         // the case where we receive a different version of the barrier meant
     316             :         // that previosly we have detect a timeout true negative
     317         104 :         if( version == getVersion() )
     318         104 :             ++_impl->incarnation;
     319             :     }
     320             :     else
     321             :     {
     322          31 :         LBLOG( LOG_BARRIER ) << "Unlock " << node << std::endl;
     323          31 :         send( node, CMD_BARRIER_ENTER_REPLY ) << version;
     324             :     }
     325         135 : }
     326             : 
     327          47 : void Barrier::_cleanup( const uint64_t time )
     328             : {
     329          47 :     LB_TS_THREAD( _thread );
     330          47 :     LBASSERTINFO( !_impl->master || _impl->master == getLocalNode(),
     331             :                   _impl->master );
     332             : 
     333          47 :     if( _impl->enteredNodes.size() < 2 )
     334          47 :         return;
     335             : 
     336           0 :     for( RequestMapIter i = _impl->enteredNodes.begin();
     337           0 :          i != _impl->enteredNodes.end(); ++i )
     338             :     {
     339           0 :         Request& cleanNodes = i->second;
     340             : 
     341           0 :         if( cleanNodes.timeout == LB_TIMEOUT_INDEFINITE )
     342           0 :             continue;
     343             : 
     344           0 :         const uint32_t timeout = cleanNodes.timeout != LB_TIMEOUT_DEFAULT ?
     345             :                         cleanNodes.timeout :
     346           0 :                         Global::getIAttribute( Global::IATTR_TIMEOUT_DEFAULT );
     347             : 
     348           0 :         if( time > cleanNodes.time + timeout )
     349             :         {
     350           0 :             _impl->enteredNodes.erase( i );
     351           0 :             return;
     352             :         }
     353             :     }
     354             : }
     355             : 
     356          31 : bool Barrier::_cmdEnterReply( ICommand& cmd )
     357             : {
     358          31 :     ObjectICommand command( cmd );
     359          31 :     LB_TS_THREAD( _thread );
     360          31 :     LBLOG( LOG_BARRIER ) << "Got ok, unlock local user(s)" << std::endl;
     361          31 :     const uint128_t version = command.get< uint128_t >();
     362             : 
     363          31 :     if( version == getVersion( ))
     364          31 :         ++_impl->incarnation;
     365             : 
     366          31 :     return true;
     367             : }
     368             : 
     369          63 : }

Generated by: LCOV version 1.11