LCOV - code coverage report
Current view: top level - co - oCommand.cpp (source / functions) Hit Total Coverage
Test: Collage Lines: 81 85 95.3 %
Date: 2018-01-09 16:37:03 Functions: 11 14 78.6 %

          Line data    Source code
       1             : 
       2             : /* Copyright (c) 2012-2017, Daniel Nachbaur <danielnachbaur@gmail.com>
       3             :  *                          Stefan.Eilemann@epfl.ch
       4             :  *
       5             :  * This file is part of Collage <https://github.com/Eyescale/Collage>
       6             :  *
       7             :  * This library is free software; you can redistribute it and/or modify it under
       8             :  * the terms of the GNU Lesser General Public License version 2.1 as published
       9             :  * by the Free Software Foundation.
      10             :  *
      11             :  * This library is distributed in the hope that it will be useful, but WITHOUT
      12             :  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
      13             :  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
      14             :  * details.
      15             :  *
      16             :  * You should have received a copy of the GNU Lesser General Public License
      17             :  * along with this library; if not, write to the Free Software Foundation, Inc.,
      18             :  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
      19             :  */
      20             : 
      21             : #include "oCommand.h"
      22             : 
      23             : #include "buffer.h"
      24             : #include "iCommand.h"
      25             : 
      26             : namespace co
      27             : {
      28             : namespace detail
      29             : {
      30       72107 : class OCommand
      31             : {
      32             : public:
      33       72104 :     OCommand(co::Dispatcher* const dispatcher_, LocalNodePtr localNode_)
      34       72104 :         : isLocked(false)
      35             :         , size(0)
      36             :         , dispatcher(dispatcher_)
      37       72104 :         , localNode(localNode_)
      38             :     {
      39       72105 :     }
      40             : 
      41             :     bool isLocked;
      42             :     uint64_t size;
      43             :     co::Dispatcher* const dispatcher;
      44             :     LocalNodePtr localNode;
      45             : };
      46             : }
      47             : 
      48       72074 : OCommand::OCommand(const Connections& receivers, const uint32_t cmd,
      49       72074 :                    const uint32_t type)
      50             :     : DataOStream(std::numeric_limits<size_t>::max())
      51       72074 :     , _impl(new detail::OCommand(0, 0))
      52             : {
      53       72072 :     _setupConnections(receivers);
      54       72071 :     _init(cmd, type);
      55       72072 : }
      56             : 
      57          33 : OCommand::OCommand(Dispatcher* const dispatcher, LocalNodePtr localNode,
      58          33 :                    const uint32_t cmd, const uint32_t type)
      59             :     : DataOStream(std::numeric_limits<size_t>::max())
      60          33 :     , _impl(new detail::OCommand(dispatcher, localNode))
      61             : {
      62          33 :     _init(cmd, type);
      63          33 : }
      64             : 
      65           0 : OCommand::OCommand(const OCommand& rhs)
      66             :     : DataOStream(const_cast<OCommand&>(rhs))
      67           0 :     , _impl(new detail::OCommand(*rhs._impl))
      68             : {
      69           0 : }
      70             : 
      71      144212 : OCommand::~OCommand()
      72             : {
      73       72105 :     if (_impl->isLocked)
      74             :     {
      75         159 :         LBASSERT(_impl->size > 0);
      76         159 :         const uint64_t size = _impl->size + getBuffer().getSize();
      77         159 :         const Connections& connections = getConnections();
      78         159 :         if (size < COMMAND_MINSIZE) // Fill send to minimal size
      79             :         {
      80         146 :             const size_t delta = COMMAND_MINSIZE - size;
      81         146 :             void* padding = alloca(delta);
      82         876 :             for (ConnectionsCIter i = connections.begin();
      83         584 :                  i != connections.end(); ++i)
      84             :             {
      85         292 :                 ConnectionPtr connection = *i;
      86         146 :                 connection->send(padding, delta, true);
      87             :             }
      88             :         }
      89         318 :         for (ConnectionsCIter i = connections.begin(); i != connections.end();
      90             :              ++i)
      91             :         {
      92         318 :             ConnectionPtr connection = *i;
      93         159 :             connection->unlockSend();
      94             :         }
      95         159 :         _impl->isLocked = false;
      96         159 :         _impl->size = 0;
      97         159 :         reset();
      98             :     }
      99             :     else
     100       71946 :         disable();
     101             : 
     102       72107 :     if (_impl->dispatcher)
     103             :     {
     104          33 :         LBASSERT(_impl->localNode);
     105             : 
     106             :         // #145 proper local command dispatch?
     107          33 :         LBASSERT(_impl->size == 0);
     108          33 :         const uint64_t size = getBuffer().getSize();
     109          66 :         BufferPtr buffer = _impl->localNode->allocBuffer(size);
     110          33 :         buffer->swap(getBuffer());
     111          33 :         reinterpret_cast<uint64_t*>(buffer->getData())[0] = size;
     112             : 
     113          66 :         ICommand cmd(_impl->localNode, _impl->localNode, buffer);
     114          33 :         _impl->dispatcher->dispatchCommand(cmd);
     115             :     }
     116             : 
     117       72107 :     delete _impl;
     118       72107 : }
     119             : 
     120       72104 : void OCommand::_init(const uint32_t cmd, const uint32_t type)
     121             : {
     122       72104 :     enableSave();
     123       72104 :     _enable();
     124       72107 :     *this << uint64_t(0) /* size */ << type << cmd;
     125       72105 : }
     126             : 
     127         159 : void OCommand::sendHeader(const uint64_t additionalSize)
     128             : {
     129         159 :     LBASSERT(!_impl->dispatcher);
     130         159 :     LBASSERT(!_impl->isLocked);
     131         159 :     LBASSERT(additionalSize > 0);
     132             : 
     133         159 :     const Connections& connections = getConnections();
     134         318 :     for (ConnectionsCIter i = connections.begin(); i != connections.end(); ++i)
     135             :     {
     136         318 :         ConnectionPtr connection = *i;
     137         159 :         connection->lockSend();
     138             :     }
     139         159 :     _impl->isLocked = true;
     140         159 :     _impl->size = additionalSize;
     141         159 :     flush(true);
     142         159 : }
     143             : 
     144           3 : size_t OCommand::getSize()
     145             : {
     146           3 :     return sizeof(uint64_t) + sizeof(uint32_t) + sizeof(uint32_t);
     147             : }
     148             : 
     149       72067 : void OCommand::sendData(const void* buffer LB_UNUSED, const uint64_t size,
     150             :                         const bool last LB_UNUSED)
     151             : {
     152       72067 :     LBASSERT(!_impl->dispatcher);
     153       72069 :     LBASSERT(last);
     154       72069 :     LBASSERTINFO(size >= 16, size);
     155       72068 :     LBASSERT(getBuffer().getData() == buffer);
     156       72063 :     LBASSERT(getBuffer().getSize() == size);
     157       72056 :     LBASSERT(getBuffer().getMaxSize() >= COMMAND_MINSIZE);
     158             : 
     159             :     // Update size field
     160             :     // cppcheck-suppress unreadVariable
     161       72056 :     uint8_t* bytes = getBuffer().getData();
     162       72054 :     reinterpret_cast<uint64_t*>(bytes)[0] = _impl->size + size;
     163             :     const uint64_t sendSize =
     164       72054 :         _impl->isLocked ? size : LB_MAX(size, COMMAND_MINSIZE);
     165       72054 :     const Connections& connections = getConnections();
     166      144128 :     for (auto connection : connections)
     167             :     {
     168       72071 :         if (connection)
     169       72071 :             connection->send(bytes, sendSize, _impl->isLocked);
     170             :         else
     171           0 :             LBERROR << "Can't send data, node is closed" << std::endl;
     172             :     }
     173       72073 : }
     174          63 : }

Generated by: LCOV version 1.11