LCOV - code coverage report
Current view: top level - co - connection.cpp (source / functions) Hit Total Coverage
Test: Collage Lines: 145 177 81.9 %
Date: 2016-06-07 01:19:44 Functions: 23 25 92.0 %

          Line data    Source code
       1             : 
       2             : /* Copyright (c) 2005-2016, Stefan Eilemann <eile@equalizergraphics.com>
       3             :  *                          Daniel Nachbaur <danielnachbaur@gmail.com>
       4             :  *
       5             :  * This file is part of Collage <https://github.com/Eyescale/Collage>
       6             :  *
       7             :  * This library is free software; you can redistribute it and/or modify it under
       8             :  * the terms of the GNU Lesser General Public License version 2.1 as published
       9             :  * by the Free Software Foundation.
      10             :  *
      11             :  * This library is distributed in the hope that it will be useful, but WITHOUT
      12             :  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
      13             :  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
      14             :  * details.
      15             :  *
      16             :  * You should have received a copy of the GNU Lesser General Public License
      17             :  * along with this library; if not, write to the Free Software Foundation, Inc.,
      18             :  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
      19             :  */
      20             : 
      21             : #include "connection.h"
      22             : 
      23             : #include "buffer.h"
      24             : #include "connectionDescription.h"
      25             : #include "connectionListener.h"
      26             : #include "log.h"
      27             : #include "pipeConnection.h"
      28             : #include "socketConnection.h"
      29             : #include "rspConnection.h"
      30             : 
      31             : #ifdef _WIN32
      32             : #  include "namedPipeConnection.h"
      33             : #endif
      34             : 
      35             : #include <co/exception.h>
      36             : 
      37             : #ifdef COLLAGE_USE_OFED
      38             : #  include "rdmaConnection.h"
      39             : #endif
      40             : #ifdef COLLAGE_USE_UDT
      41             : #  include "udtConnection.h"
      42             : #endif
      43             : 
      44             : #include <lunchbox/scopedMutex.h>
      45             : #include <lunchbox/stdExt.h>
      46             : 
      47             : //#define STATISTICS
      48             : #ifdef STATISTICS
      49             : typedef std::map< uint64_t, size_t > Histogram;
      50             : typedef Histogram::const_iterator HistogramCIter;
      51             : lunchbox::Lockable< Histogram, lunchbox::SpinLock > _histogram;
      52             : 
      53             : #  define ADD_STATISTIC( x )                            \
      54             :     {                                                   \
      55             :         lunchbox::ScopedFastWrite mutex( _histogram );  \
      56             :         ++(*_histogram)[ x ];                           \
      57             :     }
      58             : #  define DUMP_STATISTIC                                                \
      59             :     {                                                                   \
      60             :         lunchbox::ScopedFastRead mutex( _histogram );                   \
      61             :         LBINFO << lunchbox::disableFlush << lunchbox::disableHeader;    \
      62             :         for( HistogramCIter i=_histogram->begin(); i!=_histogram->end(); ++i) \
      63             :             LBINFO << i->first << ", " << i->second << std::endl;       \
      64             :         LBINFO << lunchbox::enableHeader << lunchbox::enableFlush <<std::endl; \
      65             :     }
      66             : #else
      67             : #  define ADD_STATISTIC( x )
      68             : #  define DUMP_STATISTIC
      69             : #endif
      70             : 
      71             : namespace co
      72             : {
      73             : namespace detail
      74             : {
      75             : class Connection
      76             : {
      77             : public:
      78             :     co::Connection::State state; //!< The connection state
      79             :     ConnectionDescriptionPtr description; //!< The connection parameters
      80             : 
      81             :     /** The lock used to protect concurrent write calls. */
      82             :     mutable lunchbox::Lock sendLock;
      83             : 
      84             :     BufferPtr buffer; //!< Current async read buffer
      85             :     uint64_t bytes; //!< Current read request size
      86             : 
      87             :     /** The listeners on state changes */
      88             :     ConnectionListeners listeners;
      89             : 
      90         429 :     Connection()
      91             :             : state( co::Connection::STATE_CLOSED )
      92         429 :             , description( new ConnectionDescription )
      93         858 :             , bytes( 0 )
      94             :     {
      95         429 :         description->type = CONNECTIONTYPE_NONE;
      96         429 :     }
      97             : 
      98         421 :     ~Connection()
      99         421 :     {
     100         421 :         LBASSERT( state == co::Connection::STATE_CLOSED );
     101         421 :         state = co::Connection::STATE_CLOSED;
     102         421 :         description = 0;
     103             : 
     104         421 :         LBASSERTINFO( !buffer,
     105             :                       "Pending read operation during connection destruction" );
     106         421 :     }
     107             : 
     108        1127 :     void fireStateChanged( co::Connection* connection )
     109             :     {
     110        3513 :         for( ConnectionListeners::const_iterator i= listeners.begin();
     111        2342 :              i != listeners.end(); ++i )
     112             :         {
     113          44 :             (*i)->notifyStateChanged( connection );
     114             :         }
     115        1127 :     }
     116             : };
     117             : }
     118             : 
     119         429 : Connection::Connection()
     120         429 :         : _impl( new detail::Connection )
     121             : {
     122         429 :     LBVERB << "New Connection @" << (void*)this << std::endl;
     123         429 : }
     124             : 
     125         842 : Connection::~Connection()
     126             : {
     127         421 :     delete _impl;
     128         421 :     LBVERB << "Delete Connection @" << (void*)this << std::endl;
     129             :     DUMP_STATISTIC;
     130         421 : }
     131             : 
     132           0 : bool Connection::operator == ( const Connection& rhs ) const
     133             : {
     134           0 :     if( this == &rhs )
     135           0 :         return true;
     136           0 :     if( _impl->description->type != CONNECTIONTYPE_PIPE )
     137           0 :         return false;
     138           0 :     Connection* pipe = const_cast< Connection* >( this );
     139           0 :     return pipe->acceptSync().get() == &rhs;
     140             : }
     141             : 
     142         106 : ConnectionPtr Connection::create( ConnectionDescriptionPtr description )
     143             : {
     144         106 :     ConnectionPtr connection;
     145         106 :     switch( description->type )
     146             :     {
     147             :         case CONNECTIONTYPE_TCPIP:
     148          98 :             connection = new SocketConnection;
     149          98 :             break;
     150             : 
     151             :         case CONNECTIONTYPE_PIPE:
     152           3 :             connection = new PipeConnection;
     153           3 :             break;
     154             : 
     155             : #ifdef _WIN32
     156             :         case CONNECTIONTYPE_NAMEDPIPE:
     157             :             connection = new NamedPipeConnection;
     158             :             break;
     159             : #endif
     160             : 
     161             :         case CONNECTIONTYPE_RSP:
     162           0 :             connection = new RSPConnection;
     163           0 :             break;
     164             : 
     165             : #ifdef COLLAGE_USE_OFED
     166             :         case CONNECTIONTYPE_RDMA:
     167           4 :             connection = new RDMAConnection;
     168           4 :             break;
     169             : #endif
     170             : #ifdef COLLAGE_USE_UDT
     171             :         case CONNECTIONTYPE_UDT:
     172             :             connection = new UDTConnection;
     173             :             break;
     174             : #endif
     175             : 
     176             :         default:
     177           3 :             LBWARN << "Connection type " << description->type
     178           3 :                    << " not supported" << std::endl;
     179           1 :             return 0;
     180             :     }
     181             : 
     182         105 :     if( description->bandwidth == 0 )
     183          78 :         description->bandwidth = connection->getDescription()->bandwidth;
     184             : 
     185         105 :     connection->_setDescription( description );
     186         105 :     return connection;
     187             : }
     188             : 
     189     3256119 : Connection::State Connection::getState() const
     190             : {
     191     3256119 :     return _impl->state;
     192             : }
     193             : 
     194         105 : void Connection::_setDescription( ConnectionDescriptionPtr description )
     195             : {
     196         105 :     LBASSERT( description.isValid( ));
     197         105 :     LBASSERTINFO( _impl->description->type == description->type,
     198             :                   "Wrong connection type in description" );
     199         105 :     _impl->description = description;
     200         105 :     LBASSERT( description->bandwidth > 0 );
     201         105 : }
     202             : 
     203        1181 : void Connection::_setState( const State state )
     204             : {
     205        1181 :     if( _impl->state == state )
     206        1235 :         return;
     207        1127 :     _impl->state = state;
     208        1127 :     _impl->fireStateChanged( this );
     209             : }
     210             : 
     211         183 : void Connection::lockSend() const
     212             : {
     213         183 :     _impl->sendLock.set();
     214         183 : }
     215             : 
     216         183 : void Connection::unlockSend() const
     217             : {
     218         183 :     _impl->sendLock.unset();
     219         183 : }
     220             : 
     221         175 : void Connection::addListener( ConnectionListener* listener )
     222             : {
     223         175 :     _impl->listeners.push_back( listener );
     224         175 : }
     225             : 
     226         174 : void Connection::removeListener( ConnectionListener* listener )
     227             : {
     228             :     ConnectionListeners::iterator i = find( _impl->listeners.begin(),
     229         174 :                                             _impl->listeners.end(), listener );
     230         174 :     if( i != _impl->listeners.end( ))
     231         174 :         _impl->listeners.erase( i );
     232         174 : }
     233             : 
     234             : //----------------------------------------------------------------------
     235             : // read
     236             : //----------------------------------------------------------------------
     237     2207507 : void Connection::recvNB( BufferPtr buffer, const uint64_t bytes )
     238             : {
     239     2207507 :     LBASSERT( !_impl->buffer );
     240     2207507 :     LBASSERT( _impl->bytes == 0 );
     241     2207507 :     LBASSERT( buffer );
     242     2207507 :     LBASSERT( bytes > 0 );
     243     2207507 :     LBASSERTINFO( bytes < LB_BIT48,
     244             :                   "Out-of-sync network stream: read size " << bytes << "?" );
     245             : 
     246     2207507 :     _impl->buffer = buffer;
     247     2207507 :     _impl->bytes = bytes;
     248     2207507 :     buffer->reserve( buffer->getSize() + bytes );
     249     2207507 :     readNB( buffer->getData() + buffer->getSize(), bytes );
     250     2207506 : }
     251             : 
     252     2207389 : bool Connection::recvSync( BufferPtr& outBuffer, const bool block )
     253             : {
     254     2207389 :     LBASSERTINFO( _impl->buffer,
     255             :                   "No pending receive on " << getDescription()->toString( ));
     256             : 
     257             :     // reset async IO data
     258     2207389 :     outBuffer = _impl->buffer;
     259     2207389 :     const uint64_t bytes = _impl->bytes;
     260     2207389 :     _impl->buffer = 0;
     261     2207389 :     _impl->bytes = 0;
     262             : 
     263     2207389 :     if( _impl->state != STATE_CONNECTED || !outBuffer || bytes == 0 )
     264          34 :         return false;
     265     2207355 :     LBASSERTINFO( bytes < LB_BIT48,
     266             :                   "Out-of-sync network stream: read size " << bytes << "?" );
     267             : 
     268             :     // 'Iterators' for receive loop
     269     2207355 :     uint8_t* ptr = outBuffer->getData() + outBuffer->getSize();
     270     2207355 :     uint64_t bytesLeft = bytes;
     271     2207355 :     int64_t got = readSync( ptr, bytesLeft, block );
     272             : 
     273             :     // WAR: fluke notification: On Win32, we get occasionally a data
     274             :     // notification and then deadlock when reading from the connection. The
     275             :     // callee (Node::handleData) will flag the first read, the underlying
     276             :     // SocketConnection will not block and we will restore the AIO operation if
     277             :     // no data was present.
     278     2207355 :     if( got == READ_TIMEOUT )
     279             :     {
     280           0 :         _impl->buffer = outBuffer;
     281           0 :         _impl->bytes = bytes;
     282           0 :         outBuffer = 0;
     283           0 :         return true;
     284             :     }
     285             : 
     286             :     // From here on, receive loop until all data read or error
     287             :     while( true )
     288             :     {
     289     2501257 :         if( got < 0 ) // error
     290             :         {
     291          40 :             const uint64_t read = bytes - bytesLeft;
     292          40 :             outBuffer->resize( outBuffer->getSize() + read );
     293          40 :             if( bytes == bytesLeft )
     294          40 :                 LBDEBUG << "Read on dead connection" << std::endl;
     295             :             else
     296           0 :                 LBERROR << "Error during read after " << read << " bytes on "
     297           0 :                         << _impl->description << std::endl;
     298          40 :             return false;
     299             :         }
     300     2501217 :         else if( got == 0 )
     301             :         {
     302             :             // ConnectionSet::select may report data on an 'empty' connection.
     303             :             // If we have nothing read so far, we have hit this case.
     304           0 :             if( bytes == bytesLeft )
     305           0 :                 return false;
     306           0 :             LBVERB << "Zero bytes read" << std::endl;
     307             :         }
     308     2501217 :         if( bytesLeft > static_cast< uint64_t >( got )) // partial read
     309             :         {
     310      293902 :             ptr += got;
     311      293902 :             bytesLeft -= got;
     312             : 
     313      293902 :             readNB( ptr, bytesLeft );
     314      293902 :             got = readSync( ptr, bytesLeft, true );
     315      293902 :             continue;
     316             :         }
     317             : 
     318             :         // read done
     319     2207315 :         LBASSERTINFO( static_cast< uint64_t >( got ) == bytesLeft,
     320             :                       got << " != " << bytesLeft );
     321             : 
     322     2207315 :         outBuffer->resize( outBuffer->getSize() + bytes );
     323             : #ifndef NDEBUG
     324     2207315 :         if( bytes <= 1024 && ( lunchbox::Log::topics & LOG_PACKETS ))
     325             :         {
     326           0 :             ptr -= (bytes - bytesLeft); // rewind
     327           0 :             LBINFO << "recv:" << lunchbox::format( ptr, bytes ) << std::endl;
     328             :         }
     329             : #endif
     330     2207315 :         return true;
     331             :     }
     332             : 
     333             :     LBUNREACHABLE;
     334      293902 :     return true;
     335             : }
     336             : 
     337         224 : BufferPtr Connection::resetRecvData()
     338             : {
     339         224 :     BufferPtr buffer = _impl->buffer;
     340         224 :     _impl->buffer = 0;
     341         224 :     _impl->bytes = 0;
     342         224 :     return buffer;
     343             : }
     344             : 
     345             : //----------------------------------------------------------------------
     346             : // write
     347             : //----------------------------------------------------------------------
     348     2455002 : bool Connection::send( const void* buffer, const uint64_t bytes,
     349             :                        const bool isLocked )
     350             : {
     351             :     ADD_STATISTIC( bytes );
     352     2455002 :     LBASSERT( bytes > 0 );
     353     2451907 :     if( bytes == 0 )
     354           0 :         return true;
     355             : 
     356     2451907 :     const uint8_t* ptr = static_cast< const uint8_t* >( buffer );
     357             : 
     358             :     // possible OPT: We need to lock here to guarantee an atomic transmission of
     359             :     // the buffer. Possible improvements are:
     360             :     // 1) Disassemble buffer into 'small enough' pieces and use a header to
     361             :     //    reassemble correctly on the other side (aka reliable UDP)
     362             :     // 2) Introduce a send thread with a thread-safe task queue
     363     2451907 :     lunchbox::ScopedMutex<> mutex( isLocked ? 0 : &_impl->sendLock );
     364             : 
     365             : #ifndef NDEBUG
     366     2458719 :     if( bytes <= 1024 && ( lunchbox::Log::topics & LOG_PACKETS ))
     367           0 :         LBINFO << "send:" << lunchbox::format( ptr, bytes ) << std::endl;
     368             : #endif
     369             : 
     370     2458719 :     uint64_t bytesLeft = bytes;
     371     7390561 :     while( bytesLeft )
     372             :     {
     373             :         try
     374             :         {
     375     2479108 :             const int64_t wrote = this->write( ptr, bytesLeft );
     376     2473123 :             if( wrote == -1 ) // error
     377             :             {
     378           0 :                 LBERROR << "Error during write after " << bytes - bytesLeft
     379           0 :                         << " bytes, closing connection" << std::endl;
     380           0 :                 close();
     381           0 :                 return false;
     382             :             }
     383     2473123 :             else if( wrote == 0 )
     384           0 :                 LBINFO << "Zero bytes write" << std::endl;
     385             : 
     386     2473123 :             bytesLeft -= wrote;
     387     2473123 :             ptr += wrote;
     388             :         }
     389           0 :         catch( const co::Exception& e )
     390             :         {
     391           0 :             LBERROR << e.what() << " after " << bytes - bytesLeft
     392           0 :                     << " bytes, closing connection" << std::endl;
     393           0 :             close();
     394           0 :             return false;
     395             :         }
     396             : 
     397             :     }
     398     2452734 :     return true;
     399             : }
     400             : 
     401         166 : bool Connection::isMulticast() const
     402             : {
     403         166 :     return getDescription()->type >= CONNECTIONTYPE_MULTICAST;
     404             : }
     405             : 
     406         681 : ConstConnectionDescriptionPtr Connection::getDescription() const
     407             : {
     408         681 :     return _impl->description;
     409             : }
     410             : 
     411         527 : ConnectionDescriptionPtr Connection::_getDescription()
     412             : {
     413         527 :     return _impl->description;
     414             : }
     415             : 
     416          81 : std::ostream& operator << ( std::ostream& os, const Connection& connection )
     417             : {
     418          81 :     const Connection::State state = connection.getState();
     419          81 :     ConstConnectionDescriptionPtr desc = connection.getDescription();
     420             : 
     421         162 :     os << lunchbox::className( connection ) << " " << (void*)&connection
     422          81 :        << " state " << ( state == Connection::STATE_CLOSED     ? "closed" :
     423             :                          state == Connection::STATE_CONNECTING ? "connecting" :
     424             :                          state == Connection::STATE_CONNECTED  ? "connected" :
     425             :                          state == Connection::STATE_LISTENING  ? "listening" :
     426             :                          state == Connection::STATE_CLOSING    ? "closing" :
     427         162 :                          "UNKNOWN" );
     428          81 :     if( desc.isValid( ))
     429          81 :         os << " description " << desc->toString();
     430             : 
     431          81 :     return os;
     432             : }
     433          66 : }

Generated by: LCOV version 1.11