LCOV - code coverage report
Current view: top level - co - connection.cpp (source / functions) Hit Total Coverage
Test: Collage Lines: 150 185 81.1 %
Date: 2016-12-14 01:26:48 Functions: 23 25 92.0 %

          Line data    Source code
       1             : 
       2             : /* Copyright (c) 2005-2016, Stefan Eilemann <eile@equalizergraphics.com>
       3             :  *                          Daniel Nachbaur <danielnachbaur@gmail.com>
       4             :  *
       5             :  * This file is part of Collage <https://github.com/Eyescale/Collage>
       6             :  *
       7             :  * This library is free software; you can redistribute it and/or modify it under
       8             :  * the terms of the GNU Lesser General Public License version 2.1 as published
       9             :  * by the Free Software Foundation.
      10             :  *
      11             :  * This library is distributed in the hope that it will be useful, but WITHOUT
      12             :  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
      13             :  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
      14             :  * details.
      15             :  *
      16             :  * You should have received a copy of the GNU Lesser General Public License
      17             :  * along with this library; if not, write to the Free Software Foundation, Inc.,
      18             :  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
      19             :  */
      20             : 
      21             : #include "connection.h"
      22             : 
      23             : #include "buffer.h"
      24             : #include "connectionDescription.h"
      25             : #include "connectionListener.h"
      26             : #include "log.h"
      27             : #include "pipeConnection.h"
      28             : #include "socketConnection.h"
      29             : #include "rspConnection.h"
      30             : 
      31             : #ifdef _WIN32
      32             : #  include "namedPipeConnection.h"
      33             : #endif
      34             : 
      35             : #include <co/exception.h>
      36             : 
      37             : #ifdef COLLAGE_USE_OFED
      38             : #  include "rdmaConnection.h"
      39             : #endif
      40             : #ifdef COLLAGE_USE_UDT
      41             : #  include "udtConnection.h"
      42             : #endif
      43             : 
      44             : #include <lunchbox/scopedMutex.h>
      45             : #include <lunchbox/stdExt.h>
      46             : 
      47             : #define STATISTICS
      48             : namespace co
      49             : {
      50             : namespace detail
      51             : {
      52             : class Connection
      53             : {
      54             : public:
      55             :     co::Connection::State state; //!< The connection state
      56             :     ConnectionDescriptionPtr description; //!< The connection parameters
      57             : 
      58             :     /** The lock used to protect concurrent write calls. */
      59             :     mutable lunchbox::Lock sendLock;
      60             : 
      61             :     BufferPtr buffer; //!< Current async read buffer
      62             :     uint64_t bytes; //!< Current read request size
      63             : 
      64             :     /** The listeners on state changes */
      65             :     ConnectionListeners listeners;
      66             : 
      67             :     uint64_t outBytes; //!< Statistic: written bytes
      68             :     uint64_t inBytes; //!< Statistic: read bytes
      69             : 
      70         429 :     Connection()
      71         429 :             : state( co::Connection::STATE_CLOSED )
      72         429 :             , description( new ConnectionDescription )
      73             :             , bytes( 0 )
      74             :             , outBytes( 0 )
      75         858 :             , inBytes( 0 )
      76             :     {
      77         429 :         description->type = CONNECTIONTYPE_NONE;
      78         429 :     }
      79             : 
      80         421 :     ~Connection()
      81         421 :     {
      82         421 :         LBASSERT( state == co::Connection::STATE_CLOSED );
      83         421 :         state = co::Connection::STATE_CLOSED;
      84         421 :         description = 0;
      85             : 
      86         421 :         LBASSERTINFO( !buffer,
      87             :                       "Pending read operation during connection destruction" );
      88         421 :     }
      89             : 
      90        1127 :     void fireStateChanged( co::Connection* connection )
      91             :     {
      92        3509 :         for( ConnectionListeners::const_iterator i= listeners.begin();
      93        2338 :              i != listeners.end(); ++i )
      94             :         {
      95          44 :             (*i)->notifyStateChanged( connection );
      96             :         }
      97        1125 :     }
      98             : };
      99             : }
     100             : 
     101         428 : Connection::Connection()
     102         428 :         : _impl( new detail::Connection )
     103             : {
     104         429 :     LBVERB << "New Connection @" << (void*)this << std::endl;
     105         429 : }
     106             : 
     107         842 : Connection::~Connection()
     108             : {
     109         421 :     LBVERB << "Delete Connection @" << (void*)this << std::endl;
     110             : #ifdef STATISTICS
     111        1263 :     LBDEBUG << *this << ": " << (_impl->outBytes >> 20) << " MB out, "
     112        1263 :             << (_impl->inBytes >> 20) << " MB in" << std::endl;
     113             : #endif
     114         421 :     delete _impl;
     115         421 : }
     116             : 
     117           0 : bool Connection::operator == ( const Connection& rhs ) const
     118             : {
     119           0 :     if( this == &rhs )
     120           0 :         return true;
     121           0 :     if( _impl->description->type != CONNECTIONTYPE_PIPE )
     122           0 :         return false;
     123           0 :     Connection* pipe = const_cast< Connection* >( this );
     124           0 :     return pipe->acceptSync().get() == &rhs;
     125             : }
     126             : 
     127         106 : ConnectionPtr Connection::create( ConnectionDescriptionPtr description )
     128             : {
     129         212 :     ConnectionPtr connection;
     130         106 :     switch( description->type )
     131             :     {
     132             :         case CONNECTIONTYPE_TCPIP:
     133          98 :             connection = new SocketConnection;
     134          98 :             break;
     135             : 
     136             :         case CONNECTIONTYPE_PIPE:
     137           3 :             connection = new PipeConnection;
     138           3 :             break;
     139             : 
     140             : #ifdef _WIN32
     141             :         case CONNECTIONTYPE_NAMEDPIPE:
     142             :             connection = new NamedPipeConnection;
     143             :             break;
     144             : #endif
     145             : 
     146             :         case CONNECTIONTYPE_RSP:
     147           0 :             connection = new RSPConnection;
     148           0 :             break;
     149             : 
     150             : #ifdef COLLAGE_USE_OFED
     151             :         case CONNECTIONTYPE_RDMA:
     152           4 :             connection = new RDMAConnection;
     153           4 :             break;
     154             : #endif
     155             : #ifdef COLLAGE_USE_UDT
     156             :         case CONNECTIONTYPE_UDT:
     157             :             connection = new UDTConnection;
     158             :             break;
     159             : #endif
     160             : 
     161             :         default:
     162           3 :             LBWARN << "Connection type " << description->type
     163           3 :                    << " not supported" << std::endl;
     164           1 :             return 0;
     165             :     }
     166             : 
     167         105 :     if( description->bandwidth == 0 )
     168          78 :         description->bandwidth = connection->getDescription()->bandwidth;
     169             : 
     170         105 :     connection->_setDescription( description );
     171         105 :     return connection;
     172             : }
     173             : 
     174     3244745 : Connection::State Connection::getState() const
     175             : {
     176     3244745 :     return _impl->state;
     177             : }
     178             : 
     179         105 : void Connection::_setDescription( ConnectionDescriptionPtr description )
     180             : {
     181         105 :     LBASSERT( description.isValid( ));
     182         105 :     LBASSERTINFO( _impl->description->type == description->type,
     183             :                   "Wrong connection type in description" );
     184         105 :     _impl->description = description;
     185         105 :     LBASSERT( description->bandwidth > 0 );
     186         105 : }
     187             : 
     188        1181 : void Connection::_setState( const State state )
     189             : {
     190        1181 :     if( _impl->state == state )
     191          54 :         return;
     192        1127 :     _impl->state = state;
     193        1127 :     _impl->fireStateChanged( this );
     194             : }
     195             : 
     196         183 : void Connection::lockSend() const
     197             : {
     198         183 :     _impl->sendLock.set();
     199         183 : }
     200             : 
     201         183 : void Connection::unlockSend() const
     202             : {
     203         183 :     _impl->sendLock.unset();
     204         183 : }
     205             : 
     206         175 : void Connection::addListener( ConnectionListener* listener )
     207             : {
     208         175 :     _impl->listeners.push_back( listener );
     209         175 : }
     210             : 
     211         174 : void Connection::removeListener( ConnectionListener* listener )
     212             : {
     213         174 :     ConnectionListeners::iterator i = find( _impl->listeners.begin(),
     214         348 :                                             _impl->listeners.end(), listener );
     215         174 :     if( i != _impl->listeners.end( ))
     216         174 :         _impl->listeners.erase( i );
     217         174 : }
     218             : 
     219             : //----------------------------------------------------------------------
     220             : // read
     221             : //----------------------------------------------------------------------
     222     2225616 : void Connection::recvNB( BufferPtr buffer, const uint64_t bytes )
     223             : {
     224     2225616 :     LBASSERT( !_impl->buffer );
     225     2225616 :     LBASSERT( _impl->bytes == 0 );
     226     2225616 :     LBASSERT( buffer );
     227     2225616 :     LBASSERT( bytes > 0 );
     228     2225616 :     LBASSERTINFO( bytes < LB_BIT48,
     229             :                   "Out-of-sync network stream: read size " << bytes << "?" );
     230             : 
     231     2225616 :     _impl->buffer = buffer;
     232     2225616 :     _impl->bytes = bytes;
     233     2225616 :     buffer->reserve( buffer->getSize() + bytes );
     234     2225616 :     readNB( buffer->getData() + buffer->getSize(), bytes );
     235     2225616 : }
     236             : 
     237     2225498 : bool Connection::recvSync( BufferPtr& outBuffer, const bool block )
     238             : {
     239     2225498 :     LBASSERTINFO( _impl->buffer,
     240             :                   "No pending receive on " << getDescription()->toString( ));
     241             : 
     242             :     // reset async IO data
     243     2225497 :     outBuffer = _impl->buffer;
     244     2225498 :     const uint64_t bytes = _impl->bytes;
     245     2225498 :     _impl->buffer = 0;
     246     2225498 :     _impl->bytes = 0;
     247             : 
     248     2225498 :     if( _impl->state != STATE_CONNECTED || !outBuffer || bytes == 0 )
     249          34 :         return false;
     250     2225464 :     LBASSERTINFO( bytes < LB_BIT48,
     251             :                   "Out-of-sync network stream: read size " << bytes << "?" );
     252             : #ifdef STATISTICS
     253     2225464 :     _impl->inBytes += bytes;
     254             : #endif
     255             : 
     256             :     // 'Iterators' for receive loop
     257     2225464 :     uint8_t* ptr = outBuffer->getData() + outBuffer->getSize();
     258     2225464 :     uint64_t bytesLeft = bytes;
     259     2225464 :     int64_t got = readSync( ptr, bytesLeft, block );
     260             : 
     261             :     // WAR: fluke notification: On Win32, we get occasionally a data
     262             :     // notification and then deadlock when reading from the connection. The
     263             :     // callee (Node::handleData) will flag the first read, the underlying
     264             :     // SocketConnection will not block and we will restore the AIO operation if
     265             :     // no data was present.
     266     2225464 :     if( got == READ_TIMEOUT )
     267             :     {
     268           0 :         _impl->buffer = outBuffer;
     269           0 :         _impl->bytes = bytes;
     270           0 :         outBuffer = 0;
     271           0 :         return true;
     272             :     }
     273             : 
     274             :     // From here on, receive loop until all data read or error
     275             :     while( true )
     276             :     {
     277     2512302 :         if( got < 0 ) // error
     278             :         {
     279          40 :             const uint64_t read = bytes - bytesLeft;
     280          40 :             outBuffer->resize( outBuffer->getSize() + read );
     281          40 :             if( bytes == bytesLeft )
     282          40 :                 LBDEBUG << "Read on dead connection" << std::endl;
     283             :             else
     284           0 :                 LBERROR << "Error during read after " << read << " bytes on "
     285           0 :                         << _impl->description << std::endl;
     286          40 :             return false;
     287             :         }
     288     2512262 :         else if( got == 0 )
     289             :         {
     290             :             // ConnectionSet::select may report data on an 'empty' connection.
     291             :             // If we have nothing read so far, we have hit this case.
     292           0 :             if( bytes == bytesLeft )
     293           0 :                 return false;
     294           0 :             LBVERB << "Zero bytes read" << std::endl;
     295             :         }
     296     2512262 :         if( bytesLeft > static_cast< uint64_t >( got )) // partial read
     297             :         {
     298      286838 :             ptr += got;
     299      286838 :             bytesLeft -= got;
     300             : 
     301      286838 :             readNB( ptr, bytesLeft );
     302      286838 :             got = readSync( ptr, bytesLeft, true );
     303      286838 :             continue;
     304             :         }
     305             : 
     306             :         // read done
     307     2225424 :         LBASSERTINFO( static_cast< uint64_t >( got ) == bytesLeft,
     308             :                       got << " != " << bytesLeft );
     309             : 
     310     2225424 :         outBuffer->resize( outBuffer->getSize() + bytes );
     311             : #ifndef NDEBUG
     312     2225424 :         if( bytes <= 1024 && ( lunchbox::Log::topics & LOG_PACKETS ))
     313             :         {
     314           0 :             ptr -= (bytes - bytesLeft); // rewind
     315           0 :             LBINFO << "recv:" << lunchbox::format( ptr, bytes ) << std::endl;
     316             :         }
     317             : #endif
     318     2225424 :         return true;
     319      286838 :     }
     320             : 
     321             :     LBUNREACHABLE;
     322             :     return true;
     323             : }
     324             : 
     325         224 : BufferPtr Connection::resetRecvData()
     326             : {
     327         224 :     BufferPtr buffer = _impl->buffer;
     328         224 :     _impl->buffer = 0;
     329         224 :     _impl->bytes = 0;
     330         224 :     return buffer;
     331             : }
     332             : 
     333             : //----------------------------------------------------------------------
     334             : // write
     335             : //----------------------------------------------------------------------
     336     2472874 : bool Connection::send( const void* buffer, const uint64_t bytes,
     337             :                        const bool isLocked )
     338             : {
     339             : #ifdef STATISTICS
     340     2472874 :     _impl->outBytes += bytes;
     341             : #endif
     342     2472874 :     LBASSERT( bytes > 0 );
     343     2468799 :     if( bytes == 0 )
     344           0 :         return true;
     345             : 
     346     2468799 :     const uint8_t* ptr = static_cast< const uint8_t* >( buffer );
     347             : 
     348             :     // possible OPT: We need to lock here to guarantee an atomic transmission of
     349             :     // the buffer. Possible improvements are:
     350             :     // 1) Disassemble buffer into 'small enough' pieces and use a header to
     351             :     //    reassemble correctly on the other side (aka reliable UDP)
     352             :     // 2) Introduce a send thread with a thread-safe task queue
     353     4939100 :     lunchbox::ScopedMutex<> mutex( isLocked ? 0 : &_impl->sendLock );
     354             : 
     355             : #ifndef NDEBUG
     356     2476679 :     if( bytes <= 1024 && ( lunchbox::Log::topics & LOG_PACKETS ))
     357           0 :         LBINFO << "send:" << lunchbox::format( ptr, bytes ) << std::endl;
     358             : #endif
     359             : 
     360     2476679 :     uint64_t bytesLeft = bytes;
     361     7460911 :     while( bytesLeft )
     362             :     {
     363             :         try
     364             :         {
     365     2498494 :             const int64_t wrote = this->write( ptr, bytesLeft );
     366     2492116 :             if( wrote == -1 ) // error
     367             :             {
     368           0 :                 LBERROR << "Error during write after " << bytes - bytesLeft
     369           0 :                         << " bytes, closing connection" << std::endl;
     370           0 :                 close();
     371           0 :                 return false;
     372             :             }
     373     2492116 :             else if( wrote == 0 )
     374           0 :                 LBINFO << "Zero bytes write" << std::endl;
     375             : 
     376     2492116 :             bytesLeft -= wrote;
     377     2492116 :             ptr += wrote;
     378             :         }
     379           0 :         catch( const co::Exception& e )
     380             :         {
     381           0 :             LBERROR << e.what() << " after " << bytes - bytesLeft
     382           0 :                     << " bytes, closing connection" << std::endl;
     383           0 :             close();
     384           0 :             return false;
     385             :         }
     386             : 
     387             :     }
     388     2470301 :     return true;
     389             : }
     390             : 
     391         166 : bool Connection::isMulticast() const
     392             : {
     393         166 :     return getDescription()->type >= CONNECTIONTYPE_MULTICAST;
     394             : }
     395             : 
     396        1101 : ConstConnectionDescriptionPtr Connection::getDescription() const
     397             : {
     398        1101 :     return _impl->description;
     399             : }
     400             : 
     401         528 : ConnectionDescriptionPtr Connection::_getDescription()
     402             : {
     403         528 :     return _impl->description;
     404             : }
     405             : 
     406         502 : std::ostream& operator << ( std::ostream& os, const Connection& connection )
     407             : {
     408         502 :     const Connection::State state = connection.getState();
     409        1004 :     ConstConnectionDescriptionPtr desc = connection.getDescription();
     410             : 
     411        1004 :     os << lunchbox::className( connection ) << " " << (void*)&connection
     412             :        << " state " << ( state == Connection::STATE_CLOSED     ? "closed" :
     413           0 :                          state == Connection::STATE_CONNECTING ? "connecting" :
     414           0 :                          state == Connection::STATE_CONNECTED  ? "connected" :
     415           0 :                          state == Connection::STATE_LISTENING  ? "listening" :
     416             :                          state == Connection::STATE_CLOSING    ? "closing" :
     417        1004 :                          "UNKNOWN" );
     418         502 :     if( desc.isValid( ))
     419         502 :         os << " description " << desc->toString();
     420             : 
     421        1004 :     return os;
     422             : }
     423          66 : }

Generated by: LCOV version 1.11