LCOV - code coverage report
Current view: top level - co - queueSlave.cpp (source / functions) Hit Total Coverage
Test: Collage Lines: 41 46 89.1 %
Date: 2018-01-09 16:37:03 Functions: 10 10 100.0 %

          Line data    Source code
       1             : 
       2             : /* Copyright (c) 2011-2017, Stefan Eilemann <eile@eyescale.ch>
       3             :  *                          Carsten Rohn <carsten.rohn@rtt.ag>
       4             :  *                          Daniel Nachbaur <danielnachbaur@gmail.com>
       5             :  *
       6             :  * This file is part of Collage <https://github.com/Eyescale/Collage>
       7             :  *
       8             :  * This library is free software; you can redistribute it and/or modify it under
       9             :  * the terms of the GNU Lesser General Public License version 2.1 as published
      10             :  * by the Free Software Foundation.
      11             :  *
      12             :  * This library is distributed in the hope that it will be useful, but WITHOUT
      13             :  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
      14             :  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
      15             :  * details.
      16             :  *
      17             :  * You should have received a copy of the GNU Lesser General Public License
      18             :  * along with this library; if not, write to the Free Software Foundation, Inc.,
      19             :  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
      20             :  */
      21             : 
      22             : #include "queueSlave.h"
      23             : 
      24             : #include "buffer.h"
      25             : #include "commandQueue.h"
      26             : #include "dataIStream.h"
      27             : #include "exception.h"
      28             : #include "global.h"
      29             : #include "objectICommand.h"
      30             : #include "objectOCommand.h"
      31             : #include "queueCommand.h"
      32             : 
      33             : namespace co
      34             : {
      35             : namespace detail
      36             : {
      37           1 : class QueueSlave
      38             : {
      39             : public:
      40           1 :     QueueSlave(const uint32_t mark, const uint32_t amount)
      41           1 :         : masterInstanceID(CO_INSTANCE_ALL)
      42             :         , prefetchMark(
      43             :               mark == LB_UNDEFINED_UINT32
      44           1 :                   ? Global::getIAttribute(Global::IATTR_TILE_QUEUE_MIN_SIZE)
      45             :                   : mark)
      46             :         , prefetchAmount(
      47             :               amount == LB_UNDEFINED_UINT32
      48           1 :                   ? Global::getIAttribute(Global::IATTR_TILE_QUEUE_REFILL)
      49           3 :                   : amount)
      50             :     {
      51           1 :     }
      52             : 
      53             :     co::CommandQueue queue;
      54             :     NodePtr master;
      55             :     uint32_t masterInstanceID;
      56             : 
      57             :     const uint32_t prefetchMark;
      58             :     const uint32_t prefetchAmount;
      59             : };
      60             : }
      61             : 
      62           1 : QueueSlave::QueueSlave(const uint32_t prefetchMark,
      63           1 :                        const uint32_t prefetchAmount)
      64           1 :     : _impl(new detail::QueueSlave(prefetchMark, prefetchAmount))
      65             : {
      66           1 : }
      67             : 
      68           3 : QueueSlave::~QueueSlave()
      69             : {
      70           1 :     delete _impl;
      71           2 : }
      72             : 
      73           1 : void QueueSlave::attach(const uint128_t& id, const uint32_t instanceID)
      74             : {
      75           1 :     Object::attach(id, instanceID);
      76           1 :     registerCommand(CMD_QUEUE_ITEM, CommandFunc<Object>(0, 0), &_impl->queue);
      77           1 :     registerCommand(CMD_QUEUE_EMPTY, CommandFunc<Object>(0, 0), &_impl->queue);
      78           1 : }
      79             : 
      80           1 : void QueueSlave::applyInstanceData(co::DataIStream& is)
      81             : {
      82           1 :     NodeID masterNodeID;
      83           1 :     is >> _impl->masterInstanceID >> masterNodeID;
      84             : 
      85           1 :     LBASSERT(masterNodeID != 0);
      86           1 :     LBASSERT(!_impl->master);
      87           2 :     LocalNodePtr localNode = getLocalNode();
      88           1 :     _impl->master = localNode->connect(masterNodeID);
      89           1 : }
      90             : 
      91           5 : ObjectICommand QueueSlave::pop(const uint32_t timeout)
      92             : {
      93           5 :     static lunchbox::a_int32_t _request;
      94           5 :     const int32_t request = ++_request;
      95             : 
      96             :     while (true)
      97             :     {
      98           5 :         const size_t queueSize = _impl->queue.getSize();
      99           5 :         if (queueSize <= _impl->prefetchMark)
     100             :         {
     101          10 :             send(_impl->master, CMD_QUEUE_GET_ITEM, _impl->masterInstanceID)
     102          15 :                 << _impl->prefetchAmount << getInstanceID() << request;
     103             :         }
     104             : 
     105           5 :         ObjectICommand cmd(_impl->queue.pop(timeout));
     106           5 :         if (!cmd.isValid())
     107             :         {
     108           0 :             LBERROR << "Timeout during QueueSlave::pop()" << std::endl;
     109           0 :             return ObjectICommand(0, 0, 0);
     110             :         }
     111             : 
     112           5 :         switch (cmd.getCommand())
     113             :         {
     114             :         case CMD_QUEUE_ITEM:
     115           4 :             return ObjectICommand(cmd);
     116             : 
     117             :         default:
     118           0 :             LBUNIMPLEMENTED;
     119             :         case CMD_QUEUE_EMPTY:
     120           1 :             if (cmd.get<int32_t>() == request)
     121           1 :                 return ObjectICommand(0, 0, 0);
     122             :             // else left-over or not our empty command, discard and retry
     123           0 :             break;
     124             :         }
     125           0 :     }
     126             : }
     127          63 : }

Generated by: LCOV version 1.11