LCOV - code coverage report
Current view: top level - co - barrier.cpp (source / functions) Hit Total Coverage
Test: Collage Lines: 140 169 82.8 %
Date: 2018-01-09 16:37:03 Functions: 23 24 95.8 %

          Line data    Source code
       1             : 
       2             : /* Copyright (c) 2006-2017, Stefan Eilemann <eile@equalizergraphics.com>
       3             :  *                          Cedric Stalder <cedric.stalder@gmail.com>
       4             :  *                          Daniel Nachbaur <danielnachbaur@gmail.com>
       5             :  *
       6             :  * This file is part of Collage <https://github.com/Eyescale/Collage>
       7             :  *
       8             :  * This library is free software; you can redistribute it and/or modify it under
       9             :  * the terms of the GNU Lesser General Public License version 2.1 as published
      10             :  * by the Free Software Foundation.
      11             :  *
      12             :  * This library is distributed in the hope that it will be useful, but WITHOUT
      13             :  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
      14             :  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
      15             :  * details.
      16             :  *
      17             :  * You should have received a copy of the GNU Lesser General Public License
      18             :  * along with this library; if not, write to the Free Software Foundation, Inc.,
      19             :  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
      20             :  */
      21             : 
      22             : #include "barrier.h"
      23             : 
      24             : #include "barrierCommand.h"
      25             : #include "connection.h"
      26             : #include "dataIStream.h"
      27             : #include "dataOStream.h"
      28             : #include "exception.h"
      29             : #include "global.h"
      30             : #include "iCommand.h"
      31             : #include "log.h"
      32             : #include "objectICommand.h"
      33             : #include "objectOCommand.h"
      34             : 
      35             : #include <lunchbox/monitor.h>
      36             : 
      37             : #include <unordered_map>
      38             : 
      39             : namespace co
      40             : {
      41             : namespace
      42             : {
      43         109 : struct Request
      44             : {
      45         109 :     Request()
      46         109 :         : time(0)
      47             :         , timeout(LB_TIMEOUT_INDEFINITE)
      48         109 :         , incarnation(0)
      49             :     {
      50         109 :     }
      51             :     uint64_t time;
      52             :     uint32_t timeout;
      53             :     uint32_t incarnation;
      54             :     Nodes nodes;
      55             : };
      56             : 
      57             : typedef std::unordered_map<uint128_t, Request> RequestMap;
      58             : typedef RequestMap::iterator RequestMapIter;
      59             : }
      60             : 
      61             : namespace detail
      62             : {
      63          31 : class Barrier
      64             : {
      65             : public:
      66          26 :     Barrier()
      67          26 :         : height(0)
      68             :     {
      69          26 :     }
      70           5 :     Barrier(const uint128_t& masterID_, const uint32_t height_)
      71           5 :         : masterID(masterID_)
      72           5 :         , height(height_)
      73             :     {
      74           5 :     }
      75             : 
      76             :     /** The master barrier node. */
      77             :     NodeID masterID;
      78             : 
      79             :     /** The height of the barrier, only set on the master. */
      80             :     uint32_t height;
      81             : 
      82             :     /** The local, connected instantiation of the master node. */
      83             :     NodePtr master;
      84             : 
      85             :     /** Slave nodes which have entered the barrier, index per version. */
      86             :     RequestMap enteredNodes;
      87             : 
      88             :     /** The monitor used for barrier leave notification. */
      89             :     lunchbox::Monitor<uint32_t> incarnation;
      90             : };
      91             : }
      92             : 
      93             : typedef CommandFunc<Barrier> CmdFunc;
      94             : 
      95           5 : Barrier::Barrier(LocalNodePtr localNode, const uint128_t& masterNodeID,
      96           5 :                  const uint32_t height)
      97           5 :     : _impl(new detail::Barrier(masterNodeID.isUUID() ? masterNodeID
      98           0 :                                                       : localNode->getNodeID(),
      99          10 :                                 height))
     100             : {
     101           5 :     localNode->registerObject(this);
     102           5 : }
     103             : 
     104          26 : Barrier::Barrier(LocalNodePtr localNode, const ObjectVersion& id)
     105          26 :     : _impl(new detail::Barrier)
     106             : {
     107          26 :     localNode->mapObject(this, id);
     108          26 : }
     109             : 
     110          90 : Barrier::~Barrier()
     111             : {
     112          62 :     LocalNodePtr localNode = getLocalNode();
     113          31 :     if (localNode)
     114           1 :         localNode->releaseObject(this);
     115             : 
     116          31 :     delete _impl;
     117          59 : }
     118             : 
     119             : //---------------------------------------------------------------------------
     120             : // Serialization
     121             : //---------------------------------------------------------------------------
     122         106 : void Barrier::getInstanceData(DataOStream& os)
     123             : {
     124         106 :     LBASSERT(_impl->masterID != NodeID());
     125         106 :     os << _impl->height << _impl->masterID;
     126         106 : }
     127             : 
     128          26 : void Barrier::applyInstanceData(DataIStream& is)
     129             : {
     130          26 :     is >> _impl->height >> _impl->masterID;
     131          26 : }
     132             : 
     133         101 : void Barrier::pack(DataOStream& os)
     134             : {
     135         101 :     os << _impl->height;
     136         101 : }
     137             : 
     138         101 : void Barrier::unpack(DataIStream& is)
     139             : {
     140         101 :     is >> _impl->height;
     141         101 : }
     142             : //---------------------------------------------------------------------------
     143             : 
     144           1 : void Barrier::setHeight(const uint32_t height)
     145             : {
     146           1 :     _impl->height = height;
     147           1 : }
     148             : 
     149           0 : void Barrier::increase()
     150             : {
     151           0 :     ++_impl->height;
     152           0 : }
     153             : 
     154          28 : uint32_t Barrier::getHeight() const
     155             : {
     156          28 :     return _impl->height;
     157             : }
     158             : 
     159          31 : void Barrier::attach(const uint128_t& id, const uint32_t instanceID)
     160             : {
     161          31 :     Object::attach(id, instanceID);
     162             : 
     163          62 :     LocalNodePtr node = getLocalNode();
     164          31 :     CommandQueue* queue = node->getCommandThreadQueue();
     165             : 
     166          62 :     registerCommand(CMD_BARRIER_ENTER, CmdFunc(this, &Barrier::_cmdEnter),
     167          31 :                     queue);
     168          31 :     registerCommand(CMD_BARRIER_ENTER_REPLY,
     169          62 :                     CmdFunc(this, &Barrier::_cmdEnterReply), queue);
     170             : 
     171             : #ifdef COLLAGE_V1_API
     172          31 :     if (_impl->masterID == NodeID())
     173          26 :         _impl->masterID = node->getNodeID();
     174             : #else
     175             :     LBASSERT(_impl->masterID == NodeID());
     176             : #endif
     177          31 : }
     178             : 
     179         352 : bool Barrier::enter(const uint32_t timeout)
     180             : {
     181         352 :     LBASSERT(_impl->height > 0);
     182         352 :     LBASSERT(_impl->masterID != NodeID());
     183             : 
     184         352 :     if (_impl->height == 1) // trivial ;)
     185           0 :         return true;
     186             : 
     187         352 :     if (!_impl->master)
     188             :     {
     189          66 :         LocalNodePtr localNode = getLocalNode();
     190          33 :         _impl->master = localNode->connect(_impl->masterID);
     191             :     }
     192             : 
     193         352 :     LBASSERT(_impl->master);
     194         352 :     LBASSERT(_impl->master->isReachable());
     195         352 :     if (!_impl->master || !_impl->master->isReachable())
     196             :     {
     197           0 :         LBWARN << "Can't connect barrier master node " << _impl->masterID
     198           0 :                << std::endl;
     199           0 :         return false;
     200             :     }
     201             : 
     202         704 :     LBLOG(LOG_BARRIER) << "enter barrier " << getID() << " v" << getVersion()
     203        1056 :                        << ", height " << _impl->height << std::endl;
     204             : 
     205         352 :     const uint32_t leaveVal = _impl->incarnation.get() + 1;
     206             : 
     207         703 :     send(_impl->master, CMD_BARRIER_ENTER) << getVersion() << leaveVal - 1
     208         349 :                                            << timeout;
     209             : 
     210         352 :     if (timeout == LB_TIMEOUT_INDEFINITE)
     211         305 :         _impl->incarnation.waitEQ(leaveVal);
     212          47 :     else if (!_impl->incarnation.timedWaitEQ(leaveVal, timeout))
     213          23 :         return false;
     214             : 
     215         658 :     LBLOG(LOG_BARRIER) << "left barrier " << getID() << " v" << getVersion()
     216         987 :                        << ", height " << _impl->height << std::endl;
     217         329 :     return true;
     218             : }
     219             : 
     220         352 : bool Barrier::_cmdEnter(ICommand& cmd)
     221             : {
     222         352 :     LB_TS_THREAD(_thread);
     223         352 :     LBASSERTINFO(!_impl->master || _impl->master == getLocalNode(),
     224             :                  _impl->master);
     225             : 
     226         704 :     ObjectICommand command(cmd);
     227         352 :     const uint128_t version = command.get<uint128_t>();
     228         352 :     const uint32_t incarnation = command.get<uint32_t>();
     229         352 :     const uint32_t timeout = command.get<uint32_t>();
     230             : 
     231         352 :     LBLOG(LOG_BARRIER) << "handle barrier enter " << command << " v" << version
     232         704 :                        << " barrier v" << getVersion() << std::endl;
     233             : 
     234         352 :     Request& request = _impl->enteredNodes[version];
     235             : 
     236         352 :     LBLOG(LOG_BARRIER) << "enter barrier v" << version << ", has "
     237           0 :                        << request.nodes.size() << " of " << _impl->height
     238         352 :                        << std::endl;
     239             : 
     240         352 :     request.time = getLocalNode()->getTime64();
     241             : 
     242             :     // It's the first call to enter barrier
     243         352 :     if (request.nodes.empty())
     244             :     {
     245         109 :         request.incarnation = incarnation;
     246         109 :         request.timeout = timeout;
     247             :     }
     248         243 :     else if (request.timeout != LB_TIMEOUT_INDEFINITE)
     249             :     {
     250             :         // the incarnation belongs to an older barrier
     251          40 :         if (request.incarnation < incarnation)
     252             :         {
     253             :             // send directly the reply command to unblock the caller
     254           0 :             _sendNotify(version, command.getNode());
     255           0 :             return true;
     256             :         }
     257             :         // the previous enter had a timeout, start a new synchronization
     258             :         //  (same version means same group -> no member can run ahead)
     259          40 :         else if (request.incarnation > incarnation)
     260             :         {
     261           0 :             request.nodes.clear();
     262           0 :             request.incarnation = incarnation;
     263           0 :             request.timeout = timeout;
     264             :         }
     265             :     }
     266         352 :     request.nodes.push_back(command.getNode());
     267             : 
     268             :     // clean older data which was not removed during older synchronization
     269         352 :     if (request.timeout != LB_TIMEOUT_INDEFINITE)
     270          47 :         _cleanup(request.time);
     271             : 
     272             :     // If we got early entry requests for this barrier, just note their
     273             :     // appearance. This requires that another request for the later version
     274             :     // arrives once the barrier reaches this version. The only case when this is
     275             :     // not the case is when no contributor to the current version contributes to
     276             :     // the later version, in which case deadlocks might happen because the later
     277             :     // version never leaves the barrier. We simply assume this is not the case.
     278         352 :     if (version > getVersion())
     279           0 :         return true;
     280             : 
     281             :     // If it's an older version a timeout has been handled.
     282             :     // For performance, send directly the order to unblock the caller.
     283         352 :     if (timeout != LB_TIMEOUT_INDEFINITE && version < getVersion())
     284             :     {
     285           0 :         LBASSERT(incarnation == 0);
     286           0 :         _sendNotify(version, command.getNode());
     287           0 :         return true;
     288             :     }
     289             : 
     290         352 :     LBASSERTINFO(version == getVersion(),
     291             :                  "Barrier master updated to new version while in barrier "
     292             :                      << getID() << " (" << version << " != " << getVersion()
     293             :                      << ")");
     294             : 
     295         352 :     Nodes& nodes = request.nodes;
     296         352 :     if (nodes.size() < _impl->height)
     297         245 :         return true;
     298             : 
     299         107 :     LBASSERT(nodes.size() == _impl->height);
     300         107 :     LBLOG(LOG_BARRIER) << "Barrier reached " << getID() << " v" << version
     301         107 :                        << std::endl;
     302             : 
     303         107 :     lunchbox::usort(nodes);
     304             : 
     305         242 :     for (NodesIter i = nodes.begin(); i != nodes.end(); ++i)
     306         135 :         _sendNotify(version, *i);
     307             : 
     308             :     // delete node vector for version
     309         107 :     _impl->enteredNodes.erase(version);
     310         107 :     return true;
     311             : }
     312             : 
     313         135 : void Barrier::_sendNotify(const uint128_t& version, NodePtr node)
     314             : {
     315         135 :     LB_TS_THREAD(_thread);
     316         135 :     LBASSERTINFO(!_impl->master || _impl->master == getLocalNode(),
     317             :                  _impl->master);
     318             : 
     319         135 :     if (node->isLocal()) // OPT
     320             :     {
     321         104 :         LBLOG(LOG_BARRIER) << "Unlock local user(s)" << std::endl;
     322             :         // the case where we receive a different version of the barrier meant
     323             :         // that previosly we have detect a timeout true negative
     324         104 :         if (version == getVersion())
     325         104 :             ++_impl->incarnation;
     326             :     }
     327             :     else
     328             :     {
     329          31 :         LBLOG(LOG_BARRIER) << "Unlock " << node << std::endl;
     330          31 :         send(node, CMD_BARRIER_ENTER_REPLY) << version;
     331             :     }
     332         135 : }
     333             : 
     334          47 : void Barrier::_cleanup(const uint64_t time)
     335             : {
     336          47 :     LB_TS_THREAD(_thread);
     337          47 :     LBASSERTINFO(!_impl->master || _impl->master == getLocalNode(),
     338             :                  _impl->master);
     339             : 
     340          47 :     if (_impl->enteredNodes.size() < 2)
     341          47 :         return;
     342             : 
     343           0 :     for (RequestMapIter i = _impl->enteredNodes.begin();
     344           0 :          i != _impl->enteredNodes.end(); ++i)
     345             :     {
     346           0 :         Request& cleanNodes = i->second;
     347             : 
     348           0 :         if (cleanNodes.timeout == LB_TIMEOUT_INDEFINITE)
     349           0 :             continue;
     350             : 
     351             :         const uint32_t timeout =
     352           0 :             cleanNodes.timeout != LB_TIMEOUT_DEFAULT
     353           0 :                 ? cleanNodes.timeout
     354           0 :                 : Global::getIAttribute(Global::IATTR_TIMEOUT_DEFAULT);
     355             : 
     356           0 :         if (time > cleanNodes.time + timeout)
     357             :         {
     358           0 :             _impl->enteredNodes.erase(i);
     359           0 :             return;
     360             :         }
     361             :     }
     362             : }
     363             : 
     364          31 : bool Barrier::_cmdEnterReply(ICommand& cmd)
     365             : {
     366          62 :     ObjectICommand command(cmd);
     367          31 :     LB_TS_THREAD(_thread);
     368          31 :     LBLOG(LOG_BARRIER) << "Got ok, unlock local user(s)" << std::endl;
     369          31 :     const uint128_t version = command.get<uint128_t>();
     370             : 
     371          31 :     if (version == getVersion())
     372          31 :         ++_impl->incarnation;
     373             : 
     374          62 :     return true;
     375             : }
     376          63 : }

Generated by: LCOV version 1.11