LCOV - code coverage report
Current view: top level - co - oCommand.cpp (source / functions) Hit Total Coverage
Test: Collage Lines: 83 87 95.4 %
Date: 2016-12-14 01:26:48 Functions: 11 14 78.6 %

          Line data    Source code
       1             : 
       2             : /* Copyright (c) 2012-2016, Daniel Nachbaur <danielnachbaur@gmail.com>
       3             :  *                          Stefan.Eilemann@epfl.ch
       4             :  *
       5             :  * This file is part of Collage <https://github.com/Eyescale/Collage>
       6             :  *
       7             :  * This library is free software; you can redistribute it and/or modify it under
       8             :  * the terms of the GNU Lesser General Public License version 2.1 as published
       9             :  * by the Free Software Foundation.
      10             :  *
      11             :  * This library is distributed in the hope that it will be useful, but WITHOUT
      12             :  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
      13             :  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
      14             :  * details.
      15             :  *
      16             :  * You should have received a copy of the GNU Lesser General Public License
      17             :  * along with this library; if not, write to the Free Software Foundation, Inc.,
      18             :  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
      19             :  */
      20             : 
#include "oCommand.h"

#include "buffer.h"
#include "iCommand.h"

#include <cstring> // std::memset
      25             : 
      26             : namespace co
      27             : {
      28             : namespace detail
      29             : {
      30             : 
      31       72182 : class OCommand
      32             : {
      33             : public:
      34       72182 :     OCommand( co::Dispatcher* const dispatcher_, LocalNodePtr localNode_ )
      35       72182 :         : isLocked( false )
      36             :         , size( 0 )
      37             :         , dispatcher( dispatcher_ )
      38       72182 :         , localNode( localNode_ )
      39       72182 :     {}
      40             : 
      41             :     bool isLocked;
      42             :     uint64_t size;
      43             :     co::Dispatcher* const dispatcher;
      44             :     LocalNodePtr localNode;
      45             : };
      46             : 
      47             : }
      48             : 
      49       72148 : OCommand::OCommand( const Connections& receivers, const uint32_t cmd,
      50       72148 :                     const uint32_t type )
      51             :     : DataOStream()
      52       72148 :     , _impl( new detail::OCommand( 0, 0 ))
      53             : {
      54       72147 :     _setupConnections( receivers );
      55       72146 :     _init( cmd, type );
      56       72143 : }
      57             : 
      58          34 : OCommand::OCommand( Dispatcher* const dispatcher, LocalNodePtr localNode,
      59          34 :                     const uint32_t cmd, const uint32_t type )
      60             :     : DataOStream()
      61          34 :     , _impl( new detail::OCommand( dispatcher, localNode ))
      62             : {
      63          34 :     _init( cmd, type );
      64          34 : }
      65             : 
      66           0 : OCommand::OCommand( const OCommand& rhs )
      67             :     : DataOStream( const_cast< OCommand& >( rhs ))
      68           0 :     , _impl( new detail::OCommand( *rhs._impl ))
      69             : {
      70           0 : }
      71             : 
      72      144364 : OCommand::~OCommand()
      73             : {
      74       72182 :     if( _impl->isLocked )
      75             :     {
      76         165 :         LBASSERT( _impl->size > 0 );
      77         165 :         const uint64_t size = _impl->size + getBuffer().getSize();
      78         165 :         const Connections& connections = getConnections();
      79         165 :         if( size < COMMAND_MINSIZE ) // Fill send to minimal size
      80             :         {
      81         152 :             const size_t delta = COMMAND_MINSIZE - size;
      82         152 :             void* padding = alloca( delta );
      83         912 :             for( ConnectionsCIter i = connections.begin();
      84         608 :                  i != connections.end(); ++i )
      85             :             {
      86         304 :                 ConnectionPtr connection = *i;
      87         152 :                 connection->send( padding, delta, true );
      88             :             }
      89             :         }
      90         990 :         for( ConnectionsCIter i = connections.begin();
      91         660 :              i != connections.end(); ++i )
      92             :         {
      93         330 :             ConnectionPtr connection = *i;
      94         165 :             connection->unlockSend();
      95             :         }
      96         165 :         _impl->isLocked = false;
      97         165 :         _impl->size = 0;
      98         165 :         reset();
      99             :     }
     100             :     else
     101       72017 :         disable();
     102             : 
     103       72182 :     if( _impl->dispatcher )
     104             :     {
     105          34 :         LBASSERT( _impl->localNode );
     106             : 
     107             :         // #145 proper local command dispatch?
     108          34 :         LBASSERT( _impl->size == 0 );
     109          34 :         const uint64_t size = getBuffer().getSize();
     110          68 :         BufferPtr buffer = _impl->localNode->allocBuffer( size );
     111          34 :         buffer->swap( getBuffer( ));
     112          34 :         reinterpret_cast< uint64_t* >( buffer->getData( ))[ 0 ] = size;
     113             : 
     114          68 :         ICommand cmd( _impl->localNode, _impl->localNode, buffer, false );
     115          34 :         _impl->dispatcher->dispatchCommand( cmd );
     116             :     }
     117             : 
     118       72182 :     delete _impl;
     119       72182 : }
     120             : 
     121       72180 : void OCommand::_init( const uint32_t cmd, const uint32_t type )
     122             : {
     123             : #ifndef COLLAGE_BIGENDIAN
     124             :     // big endian hosts swap handshake commands to little endian...
     125       72180 :     LBASSERTINFO( cmd < CMD_NODE_MAXIMUM, std::hex << "0x" << cmd << std::dec );
     126             : #endif
     127       72180 :     enableSave();
     128       72180 :     _enable();
     129       72181 :     *this << uint64_t( 0 )/* size */ << type << cmd;
     130       72177 : }
     131             : 
     132         165 : void OCommand::sendHeader( const uint64_t additionalSize )
     133             : {
     134         165 :     LBASSERT( !_impl->dispatcher );
     135         165 :     LBASSERT( !_impl->isLocked );
     136         165 :     LBASSERT( additionalSize > 0 );
     137             : 
     138         165 :     const Connections& connections = getConnections();
     139         330 :     for( ConnectionsCIter i = connections.begin(); i != connections.end(); ++i )
     140             :     {
     141         330 :         ConnectionPtr connection = *i;
     142         165 :         connection->lockSend();
     143             :     }
     144         165 :     _impl->isLocked = true;
     145         165 :     _impl->size = additionalSize;
     146         165 :     flush( true );
     147         165 : }
     148             : 
     149           3 : size_t OCommand::getSize()
     150             : {
     151           3 :     return sizeof( uint64_t ) + sizeof( uint32_t ) + sizeof( uint32_t );
     152             : }
     153             : 
     154       72140 : void OCommand::sendData( const void* buffer LB_UNUSED, const uint64_t size,
     155             :                          const bool last LB_UNUSED )
     156             : {
     157       72140 :     LBASSERT( !_impl->dispatcher );
     158       72140 :     LBASSERT( last );
     159       72140 :     LBASSERTINFO( size >= 16, size );
     160       72139 :     LBASSERT( getBuffer().getData() == buffer );
     161       72136 :     LBASSERT( getBuffer().getSize() == size );
     162       72134 :     LBASSERT( getBuffer().getMaxSize() >= COMMAND_MINSIZE );
     163             : 
     164             :     // Update size field
     165             :     // cppcheck-suppress unreadVariable
     166       72129 :     uint8_t* bytes = getBuffer().getData();
     167       72128 :     reinterpret_cast< uint64_t* >( bytes )[ 0 ] = _impl->size + size;
     168       72128 :     const uint64_t sendSize = _impl->isLocked ? size : LB_MAX( size,
     169             :                                                                COMMAND_MINSIZE);
     170       72128 :     const Connections& connections = getConnections();
     171      144273 :     for( auto connection : connections )
     172             :     {
     173       72146 :         if ( connection )
     174       72146 :             connection->send( bytes, sendSize, _impl->isLocked );
     175             :         else
     176           0 :             LBERROR << "Can't send data, node is closed" << std::endl;
     177             :     }
     178       72147 : }
     179             : 
     180          66 : }

Generated by: LCOV version 1.11